#!/usr/bin/env python3
"""
Enrich person entity profiles that have empty experience data using Linkup API.

This script:
1. Identifies profiles with is_heritage_relevant=true but empty experience
2. Uses Linkup search to find career information
3. Stores raw responses in data/custodian/web/linkedin/{slug}/
4. Updates entity profiles with enrichment data

Per AGENTS.md Rule 27: Person entity files are the SINGLE SOURCE OF TRUTH.
"""
import json
import os
import sys
import time
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any

# NOTE: httpx is imported lazily inside the network helpers (search_linkup /
# fetch_linkup) so the module can be imported — e.g. to reuse the pure parsing
# helpers — without the third-party dependency installed.

# Configuration
# Single project root; all other paths (and the relative raw-response path
# recorded in entity files) are derived from it.
BASE_DIR = Path("/Users/kempersc/apps/glam")
ENTITY_DIR = BASE_DIR / "data/custodian/person/entity"
WEB_DIR = BASE_DIR / "data/custodian/web/linkedin"
BATCH_SIZE = 10
DELAY_BETWEEN_REQUESTS = 2.0  # seconds between API calls

# Linkup API configuration
LINKUP_API_BASE = "https://api.linkup.so/v1"


def get_linkup_api_key() -> Optional[str]:
    """Return the Linkup API key from the environment, or None if unset."""
    return os.environ.get('LINKUP_API_KEY')


def search_linkup(query: str, depth: str = "deep") -> Optional[Dict]:
    """
    Search using Linkup API.

    Args:
        query: Search query string
        depth: Search depth - "standard" or "deep"

    Returns:
        Search results dict or None on error
    """
    api_key = get_linkup_api_key()
    if not api_key:
        print(" ERROR: LINKUP_API_KEY not set")
        return None

    import httpx  # deferred: only required when actually calling the API

    try:
        with httpx.Client(timeout=60.0) as client:
            response = client.post(
                f"{LINKUP_API_BASE}/search",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                },
                json={
                    "q": query,
                    "depth": depth,
                    "outputType": "searchResults"
                }
            )
            if response.status_code == 200:
                return response.json()
            print(f" Linkup search error: {response.status_code}")
            return None
    except Exception as e:
        # Best-effort: a network failure must not abort the whole batch.
        print(f" Error calling Linkup: {e}")
        return None


def fetch_linkup(url: str, render_js: bool = False) -> Optional[Dict]:
    """
    Fetch URL content using Linkup API.

    Args:
        url: URL to fetch
        render_js: Whether to render JavaScript

    Returns:
        Fetch result dict or None on error
    """
    api_key = get_linkup_api_key()
    if not api_key:
        print(" ERROR: LINKUP_API_KEY not set")
        return None

    import httpx  # deferred: only required when actually calling the API

    try:
        with httpx.Client(timeout=60.0) as client:
            response = client.post(
                f"{LINKUP_API_BASE}/fetch",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                },
                json={
                    "url": url,
                    "renderJs": render_js
                }
            )
            if response.status_code == 200:
                return response.json()
            print(f" Linkup fetch error: {response.status_code}")
            return None
    except Exception as e:
        print(f" Error calling Linkup fetch: {e}")
        return None


def extract_linkedin_slug(url: str) -> str:
    """Extract the LinkedIn slug (segment after '/in/', sans trailing slash).

    NOTE(review): query strings (e.g. '?trk=...') are not stripped and would
    end up in the slug — confirm upstream URLs are clean before changing.
    """
    if '/in/' in url:
        return url.split('/in/')[-1].rstrip('/')
    return url.rstrip('/')


def load_profiles_needing_enrichment() -> List[Dict]:
    """
    Load profiles that:
    1. Have is_heritage_relevant = true
    2. Have empty experience array
    3. Have valid LinkedIn URL
    """
    profiles = []

    for file_path in ENTITY_DIR.glob("*.json"):
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            # Check heritage relevance
            hr = data.get('heritage_relevance', {})
            if not hr.get('is_heritage_relevant', False):
                continue

            # Check experience is empty
            profile_data = data.get('profile_data', {})
            if profile_data.get('experience', []):
                continue  # Already has experience data

            # Check for LinkedIn URL
            linkedin_url = (
                data.get('extraction_metadata', {}).get('linkedin_url')
                or profile_data.get('linkedin_url')
            )
            if not linkedin_url or 'linkedin.com/in/' not in linkedin_url:
                continue

            # Skip profiles already enriched by a previous run (idempotence).
            if data.get('timeline_enrichment', {}).get('enriched_on'):
                continue

            slug = extract_linkedin_slug(linkedin_url)

            # Get source info for better search
            source_info = data.get('source_staff_info', {})

            profiles.append({
                'file_path': str(file_path),
                'name': profile_data.get('name') or source_info.get('name', 'Unknown'),
                'headline': profile_data.get('headline') or source_info.get('headline', ''),
                'linkedin_url': linkedin_url,
                'slug': slug,
                'custodian': source_info.get('custodian', ''),
                'heritage_type': source_info.get('heritage_type', '')
            })
        except Exception as e:
            # A single corrupt entity file must not abort the whole scan.
            print(f"Error loading {file_path}: {e}")

    return profiles


def build_search_query(profile: Dict) -> str:
    """Build optimal search query for profile."""
    name = profile['name']
    headline = profile.get('headline', '')
    custodian = profile.get('custodian', '')

    # Extract organization from headline if not in custodian
    org = custodian
    if not org and headline:
        # Common patterns: "Title at Organization" or "Title bij Organization"
        for sep in [' at ', ' bij ', ' | ', ' - ']:
            if sep in headline:
                parts = headline.split(sep)
                if len(parts) > 1:
                    org = parts[-1].split('|')[0].strip()
                    break

    # Build query: quoted name, then organization (if any), then LinkedIn hints.
    query_parts = [f'"{name}"']
    if org:
        query_parts.append(org)
    query_parts.append('linkedin.com/in career experience education')

    return ' '.join(query_parts)


def parse_experience_from_results(results: List[Dict], name: str) -> List[Dict]:
    """Parse experience data from Linkup search results.

    Heuristic extraction: scans result text for "Title at Company" /
    "Company · Title" patterns, keeps only results mentioning *name*, and
    deduplicates case-insensitively. Returns at most 10 entries.
    """
    experience = []

    for result in results:
        content = result.get('content', '') or result.get('snippet', '')

        # Skip if doesn't mention the person
        if name.lower() not in content.lower():
            continue

        # Look for job patterns
        # Pattern: "Title at Company" or "Company · Title"
        # NOTE(review): with re.IGNORECASE the [A-Z] classes also match
        # lowercase — kept as-is to preserve existing matching behavior.
        job_patterns = [
            r'(?:as|is|was)\s+(?:a\s+)?([^·|]+?)\s+(?:at|for|bij)\s+([^·|]+)',
            r'([A-Z][^·|]+?)\s+(?:at|@|bij)\s+([A-Z][^·|]+)',
            r'([^·]+)\s+·\s+([A-Z][^·\n]+)',
        ]

        for pattern in job_patterns:
            for match in re.findall(pattern, content, re.IGNORECASE):
                if len(match) >= 2:
                    title = match[0].strip()[:100]
                    company = match[1].strip()[:100]

                    # Skip if too short or generic
                    if len(title) < 3 or len(company) < 3:
                        continue

                    experience.append({
                        'title': title,
                        'company': company,
                        'source': 'linkup_search',
                        'current': False
                    })

    # Deduplicate (case-insensitive on title+company), preserving order.
    seen = set()
    unique_exp = []
    for exp in experience:
        key = (exp['title'].lower(), exp['company'].lower())
        if key not in seen:
            seen.add(key)
            unique_exp.append(exp)

    return unique_exp[:10]  # Limit to 10 positions


def save_raw_response(slug: str, response_type: str, data: Dict) -> Path:
    """Save a raw Linkup response under WEB_DIR/{slug}/ and return its path."""
    dir_path = WEB_DIR / slug
    dir_path.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    filename = f"linkup_{response_type}_{timestamp}.json"
    file_path = dir_path / filename

    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump({
            'metadata': {
                'retrieval_agent': 'linkup',
                'retrieval_timestamp': datetime.now(timezone.utc).isoformat(),
                'response_type': response_type
            },
            'data': data
        }, f, indent=2, ensure_ascii=False)

    return file_path


def enrich_profile(profile: Dict) -> bool:
    """
    Enrich a single profile using Linkup.

    Returns True if enrichment was successful (experience data was found).
    The entity file is updated (timeline_enrichment) even when no experience
    was extracted, so the profile is not retried on the next run.
    """
    name = profile['name']
    slug = profile['slug']
    file_path = Path(profile['file_path'])

    print(f" Enriching: {name} ({slug})")

    # Build search query
    query = build_search_query(profile)
    print(f" Query: {query[:60]}...")

    # Search using Linkup
    search_result = search_linkup(query, depth="deep")
    if not search_result:
        print(" No search results")
        return False

    # Save raw response
    raw_path = save_raw_response(slug, "search", search_result)
    print(f" Raw response saved: {raw_path.name}")

    # Parse results
    results = search_result.get('results', [])
    if not results:
        print(" No results in response")
        return False

    # Extract experience data
    experience = parse_experience_from_results(results, name)

    # Also try to extract from LinkedIn-specific results
    linkedin_results = [r for r in results if 'linkedin.com/in/' in r.get('url', '')]

    # Load current entity file
    with open(file_path, 'r', encoding='utf-8') as f:
        entity_data = json.load(f)

    # Update with enrichment data
    timestamp = datetime.now(timezone.utc).isoformat()
    entity_data['timeline_enrichment'] = {
        'enriched_on': timestamp,
        'retrieval_agent': 'linkup',
        'search_query': query,
        'results_count': len(results),
        'linkedin_results_count': len(linkedin_results),
        'raw_response_path': str(raw_path.relative_to(BASE_DIR)),
        'extracted_experience_count': len(experience)
    }

    # Update profile_data with experience if found.
    # setdefault: entity files selected by load_profiles_needing_enrichment()
    # may lack these sections entirely (the loader uses .get()), so create
    # them rather than raising KeyError.
    if experience:
        entity_data.setdefault('profile_data', {})['experience'] = experience
        metadata = entity_data.setdefault('extraction_metadata', {})
        metadata['extraction_method'] = (
            metadata.get('extraction_method', 'unknown') + '_linkup_enriched'
        )
        metadata['notes'] = (
            metadata.get('notes', '') +
            f" Enriched via Linkup search on {timestamp[:10]} with {len(experience)} experience entries."
        )

    # Save updated entity
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(entity_data, f, indent=2, ensure_ascii=False)

    print(f" ✓ Enriched with {len(experience)} experience entries")
    return len(experience) > 0


def main():
    """Main enrichment workflow."""
    print("=" * 60)
    print("Linkup Enrichment for Profiles with Empty Experience")
    print("=" * 60)

    # Check API key
    if not get_linkup_api_key():
        print("\nERROR: LINKUP_API_KEY environment variable not set")
        print("Please set it before running this script.")
        sys.exit(1)

    # Load profiles
    profiles = load_profiles_needing_enrichment()
    print(f"\nFound {len(profiles)} profiles needing enrichment")

    if not profiles:
        print("No profiles need enrichment.")
        return

    # Limit batch size
    batch = profiles[:BATCH_SIZE]
    print(f"Processing batch of {len(batch)} profiles\n")

    # Process batch
    enriched = 0
    failed = 0

    for i, profile in enumerate(batch, 1):
        print(f"\n[{i}/{len(batch)}]")
        try:
            if enrich_profile(profile):
                enriched += 1
            else:
                failed += 1
        except Exception as e:
            print(f" ERROR: {e}")
            failed += 1

        # Rate limiting between API calls (skip after the last profile)
        if i < len(batch):
            time.sleep(DELAY_BETWEEN_REQUESTS)

    # Summary
    print("\n" + "=" * 60)
    print("Enrichment Complete")
    print("=" * 60)
    print(f"Total processed: {len(batch)}")
    print(f"Successfully enriched: {enriched}")
    print(f"Failed/No data: {failed}")
    print(f"Remaining profiles: {len(profiles) - len(batch)}")


if __name__ == '__main__':
    main()