#!/usr/bin/env python3
"""
Complete LinkedIn profile fetching system that processes all staff
directories and fetches only new profiles not already in entity directory.
"""
import json
import os
import sys
import time
import hashlib
import argparse
from pathlib import Path

# Load environment variables from .env file
from dotenv import load_dotenv
load_dotenv()

from datetime import datetime, timezone
from typing import Dict, List, Optional, Set, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
import re
import subprocess
import httpx
import yaml
from tqdm import tqdm

# Path to PiCo integration module with GLM system prompt
PICO_MODULE_PATH = Path("/Users/kempersc/apps/glam/data/entity_annotation/modules/integrations/pico.yaml")


def load_pico_system_prompt() -> Optional[str]:
    """Load the GLM CH-Annotator system prompt from PiCo YAML module.

    Returns:
        The system prompt string, or None if loading fails.
    """
    try:
        if not PICO_MODULE_PATH.exists():
            print(f"Warning: PiCo module not found at {PICO_MODULE_PATH}")
            return None

        with open(PICO_MODULE_PATH, 'r', encoding='utf-8') as f:
            pico_module = yaml.safe_load(f)

        system_prompt = pico_module.get('glm_annotator_config', {}).get('system_prompt')
        if not system_prompt:
            print("Warning: system_prompt not found in PiCo module")
            return None

        return system_prompt.strip()
    except Exception as e:
        print(f"Error loading PiCo module: {e}")
        return None


# Cache the system prompt to avoid re-reading file for each profile
_CACHED_SYSTEM_PROMPT: Optional[str] = None


def get_pico_system_prompt() -> Optional[str]:
    """Get cached PiCo system prompt, loading if needed."""
    global _CACHED_SYSTEM_PROMPT
    if _CACHED_SYSTEM_PROMPT is None:
        _CACHED_SYSTEM_PROMPT = load_pico_system_prompt()
    return _CACHED_SYSTEM_PROMPT


def extract_linkedin_urls(staff_data: Dict) -> List[str]:
    """Extract LinkedIn URLs from staff data."""
    urls = []
    if 'staff' in staff_data:
        for person in staff_data['staff']:
            # Check both possible field names
            url = person.get('linkedin_url') or person.get('linkedin_profile_url')
            if url and url not in urls:
                urls.append(url)
    return urls


def extract_slug_from_url(url: str) -> Optional[str]:
    """Extract LinkedIn slug from URL."""
    match = re.search(r"linkedin\.com/in/([a-zA-Z0-9\-]+)", url)
    if match:
        return match.group(1)
    return None


def generate_filename(slug: str) -> str:
    """Generate filename for profile data."""
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return f"{slug}_{timestamp}.json"


def fetch_profile_with_exa(url: str, session: httpx.Client,
                           use_pico_annotator: bool = True,
                           source_type: str = "modern_digital") -> Optional[Dict]:
    """Fetch profile data using Exa contents API, optionally with PiCo annotation.

    Returns the RAW Exa API response plus optional PiCo structured data.
    This preserves all source data while adding semantic structure.

    Args:
        url: LinkedIn profile URL
        session: HTTP client session
        use_pico_annotator: If True, call GLM-4.6 to structure the data using PiCo
        source_type: Source category (modern_digital, historical_indices, etc.)
    """
    exa_api_key = os.environ.get('EXA_API_KEY')
    if not exa_api_key:
        print("Error: EXA_API_KEY not set")
        return None

    try:
        # Use Exa contents endpoint to fetch LinkedIn page content.
        # livecrawl "always" forces a fresh crawl on every request (slower and
        # more expensive than cached content, but guarantees up-to-date data).
        exa_response = session.post(
            "https://api.exa.ai/contents",
            headers={
                "x-api-key": exa_api_key,
                "Content-Type": "application/json",
            },
            json={
                "urls": [url],
                "text": True,
                "livecrawl": "always",  # Always fetch fresh content
                "livecrawlTimeout": 30000,
            },
            timeout=60.0
        )

        if exa_response.status_code != 200:
            print(f"Exa API error for {url}: {exa_response.status_code} - {exa_response.text[:200]}")
            return None

        exa_result = exa_response.json()

        # Validate we got results
        if not exa_result.get("results") or len(exa_result["results"]) == 0:
            print(f"No content returned from Exa for {url}")
            return None

        result = exa_result["results"][0]
        page_content = result.get("text", "")

        if not page_content or len(page_content) < 50:
            print(f"Insufficient content from Exa for {url} (len={len(page_content)})")
            return None

        # Build response with raw Exa data
        response_data = {
            "exa_raw_response": exa_result,
            "linkedin_url": url
        }

        # Optionally add PiCo structured data
        if use_pico_annotator:
            pico_structured = structure_with_pico_annotator(
                exa_content=page_content,
                url=url,
                session=session,
                source_type=source_type
            )
            if pico_structured:
                response_data["pico_structured"] = pico_structured

        return response_data

    except httpx.RequestError as e:
        print(f"HTTP error fetching {url}: {e}")
        return None
    except Exception as e:
        print(f"Exception fetching {url}: {e}")
        return None


def parse_linkedin_content(content: str, url: str, exa_result: Dict) -> Optional[Dict]:
    """Parse LinkedIn profile content from Exa response.

    Exa returns structured text content that we can parse directly.
    The response typically includes:
    - title: "Name | Headline" format
    - author: Name
    - text: Structured profile content with sections
    """
    if not content:
        return None

    # Extract name from title or first line
    name = None
    headline = None
    location = None
    about = None
    connections = None

    lines = content.strip().split('\n')

    # Try to extract name from Exa metadata - author field is most reliable
    if exa_result.get("author"):
        name = exa_result["author"].strip()

    # Also check title for name and headline
    if exa_result.get("title"):
        # Title often contains "Name | Headline" format
        title = exa_result["title"]
        if " | " in title:
            title_parts = title.split(" | ", 1)
            if not name:
                name = title_parts[0].strip()
            # The part after the pipe is often the headline
            headline_candidate = title_parts[1].strip()
            if headline_candidate and "LinkedIn" not in headline_candidate:
                headline = headline_candidate
        elif " - " in title:
            title_parts = title.split(" - ", 1)
            if not name:
                name = title_parts[0].strip()
            headline_candidate = title_parts[1].strip()
            if headline_candidate and "LinkedIn" not in headline_candidate:
                headline = headline_candidate
        elif not name:
            name = title.strip()

    # Remove common suffixes if still present
    if name:
        name = re.sub(r'\s*\|\s*LinkedIn.*$', '', name)
        name = re.sub(r'\s*-\s*LinkedIn.*$', '', name)

    # Parse structured sections from content
    current_section = None
    experience = []
    education = []
    skills = []
    about_lines = []

    for i, line in enumerate(lines):
        line = line.strip()
        if not line:
            continue

        # Skip markdown headers that are just the name
        if line.startswith('#') and name and name.lower() in line.lower():
            continue

        # Detect sections (English and Dutch section headings)
        lower_line = line.lower().strip('#').strip()
        if lower_line in ['experience', 'ervaring', 'werkervaring']:
            current_section = 'experience'
            continue
        elif lower_line in ['education', 'opleiding', 'educatie', 'opleidingen']:
            current_section = 'education'
            continue
        elif lower_line in ['skills', 'vaardigheden', 'skills & endorsements']:
            current_section = 'skills'
            continue
        elif lower_line in ['about', 'over', 'info', 'summary', 'samenvatting']:
            current_section = 'about'
            continue
        elif lower_line in ['languages', 'talen']:
            current_section = 'languages'
            continue

        # Extract headline if not already found (usually near the top)
        if not headline and i < 10 and len(line) > 10 and len(line) < 200:
            # Skip lines that are clearly not headlines
            skip_patterns = ['linkedin', 'connection', 'follower', '#', 'http', 'experience', 'education']
            if not any(p in line.lower() for p in skip_patterns):
                if name and name.lower() not in line.lower():
                    headline = line

        # Extract location
        if not location:
            location_keywords = ['netherlands', 'amsterdam', 'rotterdam', 'utrecht',
                                 'nederland', 'den haag', 'eindhoven', 'groningen',
                                 'area', 'region']
            if any(x in line.lower() for x in location_keywords):
                if len(line) < 100:  # Location lines are usually short
                    location = line

        # Extract connections
        if 'connection' in line.lower() or 'follower' in line.lower():
            connections = line

        # Parse sections
        if current_section == 'about':
            about_lines.append(line)
        elif current_section == 'skills' and line and not line.startswith('#'):
            # Clean up skill entries
            skill = line.strip('•-* ').strip()
            if skill and len(skill) > 1 and len(skill) < 50:
                skills.append(skill)
        elif current_section == 'experience' and line and not line.startswith('#'):
            experience.append(line)
        elif current_section == 'education' and line and not line.startswith('#'):
            education.append(line)

    # Combine about lines
    if about_lines:
        about = ' '.join(about_lines)

    # Validate name
    if not name or len(name) < 2:
        return None

    # Clean up name
    name = name.strip()
    if name.lower() in ['linkedin member', 'unknown', 'n/a', 'linkedin']:
        return None

    return {
        "name": name,
        "linkedin_url": url,
        "headline": headline or "",
        "location": location or "",
        "connections": connections or "",
        "about": about or "",
        "experience": experience,
        "education": education,
        "skills": skills,
        "languages": [],
        "profile_image_url": ""
    }


def structure_with_pico_annotator(exa_content: str, url: str, session: httpx.Client,
                                  source_type: str = "modern_digital") -> Optional[Dict]:
    """Use GLM-4.6 with PiCo/CH-Annotator convention to structure profile data.

    This creates a structured profile following PiCo (Person in Context)
    ontology patterns:
    - PersonObservation: The profile as observed (source-bound)
    - PNV PersonName: Structured name components
    - sdo:hasOccupation: Occupational roles
    - Heritage-relevant experience tagging

    The system prompt is loaded from the PiCo YAML module to ensure
    consistency with the CH-Annotator v1.7.0 convention.

    Args:
        exa_content: Raw text content from source
        url: Source URL
        session: HTTP client session
        source_type: Source category (modern_digital, historical_indices,
                     archival_descriptions, biographical_dictionaries)

    Returns:
        Structured profile data following PiCo conventions, or None if failed
    """
    zai_api_token = os.environ.get('ZAI_API_TOKEN')
    if not zai_api_token:
        print("Warning: ZAI_API_TOKEN not set - skipping PiCo annotation")
        return None

    # Load system prompt from PiCo YAML module
    system_prompt = get_pico_system_prompt()
    if not system_prompt:
        print("Warning: Could not load PiCo system prompt - using fallback")
        # Minimal fallback prompt if YAML loading fails
        system_prompt = """You are a CH-Annotator extracting person data.
Return ONLY valid JSON with:
{
  "pico_observation": {"observation_id": "...", "observed_at": "...", "source_type": "...", "source_reference": "..."},
  "persons": [{"pnv_name": {"literalName": "...", "givenName": null, "surnamePrefix": null, "baseSurname": null}, "roles": [], "heritage_relevant_experience": []}],
  "organizations_mentioned": []
}
CRITICAL: ONLY extract data that EXISTS. NEVER fabricate."""

    try:
        # Build user message with source context
        user_message = f"""Source Type: {source_type}
Source URL: {url}

Content:
{exa_content[:12000]}"""

        glm_response = session.post(
            "https://api.z.ai/api/coding/paas/v4/chat/completions",
            headers={
                "Authorization": f"Bearer {zai_api_token}",
                "Content-Type": "application/json",
            },
            json={
                "model": "glm-4.6",
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_message}
                ],
                "temperature": 0.1,
                "max_tokens": 4000
            },
            timeout=90.0  # Increased timeout for GLM processing
        )

        if glm_response.status_code != 200:
            print(f"GLM API error for {url}: {glm_response.status_code}")
            return None

        result = glm_response.json()
        glm_content = result.get('choices', [{}])[0].get('message', {}).get('content', '')

        if not glm_content:
            print(f"Empty GLM response for {url}")
            return None

        # Try to parse JSON from response
        # First try: direct JSON parse
        try:
            structured = json.loads(glm_content)
            return structured
        except json.JSONDecodeError:
            pass

        # Second try: extract from markdown code block
        json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', glm_content)
        if json_match:
            try:
                structured = json.loads(json_match.group(1))
                return structured
            except json.JSONDecodeError:
                pass

        # Third try: find JSON object in text
        json_match = re.search(r'\{[\s\S]*\}', glm_content)
        if json_match:
            try:
                structured = json.loads(json_match.group())
                return structured
            except json.JSONDecodeError:
                pass

        print(f"Could not parse GLM response as JSON for {url}")
        return None

    except httpx.RequestError as e:
        print(f"HTTP error calling GLM for {url}: {e}")
        return None
    except Exception as e:
        print(f"Exception in PiCo annotation for {url}: {e}")
        return None


def enhance_with_glm(profile_data: Dict, content: str, url: str, session: httpx.Client) -> Dict:
    """DEPRECATED: Use structure_with_pico_annotator instead.

    This function is kept for backward compatibility but delegates
    to the new PiCo annotator.
    """
    structured = structure_with_pico_annotator(content, url, session)
    if structured and structured.get("profile_data"):
        return structured["profile_data"]
    return profile_data


def save_profile(slug: str, profile_data: Dict, source_url: str, entity_dir: Path) -> Optional[str]:
    """Save Exa profile data with optional PiCo structured annotation.

    Stores the complete Exa API response plus PiCo structured data if available.
    This preserves all source data while adding semantic structure.
    """
    # Check we have raw Exa response
    if not profile_data.get("exa_raw_response"):
        print(f"Skipping save for {slug} - no Exa response data")
        return None

    exa_response = profile_data["exa_raw_response"]

    # Basic validation - ensure we have results
    if not exa_response.get("results") or len(exa_response["results"]) == 0:
        print(f"Skipping save for {slug} - empty results")
        return None

    filename = generate_filename(slug)
    filepath = entity_dir / filename

    # Determine extraction method based on whether PiCo was used
    has_pico = "pico_structured" in profile_data and profile_data["pico_structured"] is not None
    extraction_method = "exa_contents_raw+pico_glm" if has_pico else "exa_contents_raw"

    # Structure with metadata + raw Exa response + optional PiCo structured
    structured_data = {
        "extraction_metadata": {
            "source_file": "staff_parsing",
            "staff_id": f"{slug}_profile",
            "extraction_date": datetime.now(timezone.utc).isoformat(),
            "extraction_method": extraction_method,
            "extraction_agent": "",
            "linkedin_url": source_url,
            "cost_usd": exa_response.get("costDollars", {}).get("total", 0),
            "request_id": exa_response.get("requestId", hashlib.md5(source_url.encode()).hexdigest())
        },
        "exa_raw_response": exa_response
    }

    # Add PiCo structured data if available
    if has_pico:
        structured_data["pico_structured"] = profile_data["pico_structured"]

    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(structured_data, f, indent=2, ensure_ascii=False)

    return filename


def load_existing_profiles(entity_dir: Path) -> Set[str]:
    """Load existing profile filenames to avoid duplicates."""
    existing = set()
    if not entity_dir.exists():
        entity_dir.mkdir(parents=True, exist_ok=True)
        return existing

    for file_path in entity_dir.glob("*.json"):
        match = re.match(r"([a-zA-Z0-9\-]+)_\d{8}T\d{6}Z\.json", file_path.name)
        if match:
            existing.add(match.group(1))

    return existing


def load_all_staff_files(staff_dirs: List[str]) -> List[str]:
    """Load all staff files from multiple directories and extract LinkedIn URLs."""
    all_urls = []
    file_count = 0

    for staff_dir in staff_dirs:
        dir_path = Path(staff_dir)
        if not dir_path.exists():
            print(f"Warning: Directory not found: {staff_dir}")
            continue

        for file_path in dir_path.glob("*.json"):
            file_count += 1
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                urls = extract_linkedin_urls(data)
                all_urls.extend(urls)
            except Exception as e:
                print(f"Error loading {file_path}: {e}")

    # Remove duplicates while preserving order
    seen = set()
    unique_urls = []
    for url in all_urls:
        if url not in seen:
            seen.add(url)
            unique_urls.append(url)

    print(f"\nProcessed {file_count} staff files")
    print(f"Found {len(all_urls)} total LinkedIn URLs")
    print(f"Found {len(unique_urls)} unique LinkedIn URLs")

    return unique_urls


def process_person(url: str, session: httpx.Client, existing_profiles: Set[str],
                   entity_dir: Path, use_pico: bool = True,
                   source_type: str = "modern_digital") -> Tuple[str, bool, Optional[str]]:
    """Process a single person's LinkedIn profile.

    Args:
        url: LinkedIn profile URL
        session: HTTP client session
        existing_profiles: Set of already-processed profile slugs
        entity_dir: Directory to save profiles
        use_pico: Whether to use PiCo annotator for structuring
        source_type: Source category for PiCo annotation
    """
    slug = extract_slug_from_url(url)
    if not slug:
        return url, False, "No slug found"

    # Check if we already have this profile
    if slug in existing_profiles:
        return url, False, "Already exists"

    # Rate limiting - 2 second delay between requests to avoid overloading APIs
    time.sleep(2)

    # Fetch profile - returns raw Exa response + optional PiCo structured
    profile_data = fetch_profile_with_exa(url, session,
                                          use_pico_annotator=use_pico,
                                          source_type=source_type)

    if profile_data:
        filename = save_profile(slug, profile_data, url, entity_dir)
        if filename:
            return url, True, filename
        return url, False, "Save failed"

    return url, False, None


def main():
    """Main entry point."""
    # Parse command line arguments
    parser = argparse.ArgumentParser(
        description="Fetch LinkedIn profiles and optionally structure with PiCo/CH-Annotator"
    )
    parser.add_argument(
        "batch_size",
        type=int,
        nargs="?",
        default=None,
        help="Number of profiles to process (omit for interactive mode)"
    )
    parser.add_argument(
        "--no-pico",
        action="store_true",
        help="Disable PiCo/CH-Annotator structuring (only save raw Exa data)"
    )
    parser.add_argument(
        "--source-type",
        type=str,
        choices=["modern_digital", "historical_indices", "archival_descriptions", "biographical_dictionaries"],
        default="modern_digital",
        help="Source type category for PiCo annotation (default: modern_digital)"
    )
    args = parser.parse_args()

    batch_size = args.batch_size
    use_pico = not args.no_pico
    source_type = args.source_type

    if batch_size is not None:
        print(f"Will process first {batch_size} profiles")
    if use_pico:
        print(f"PiCo/CH-Annotator enabled (source_type: {source_type})")
    else:
        print("PiCo/CH-Annotator disabled (raw mode only)")

    # Check for required API keys
    if not os.environ.get('EXA_API_KEY'):
        print("Error: EXA_API_KEY environment variable not set")
        print("Please set it in your .env file or environment")
        sys.exit(1)

    # ZAI_API_TOKEN required for PiCo annotation
    if use_pico and not os.environ.get('ZAI_API_TOKEN'):
        print("Warning: ZAI_API_TOKEN not set - PiCo annotation will be skipped")
        print("Set ZAI_API_TOKEN or use --no-pico to disable PiCo annotation")
    elif not os.environ.get('ZAI_API_TOKEN'):
        print("Note: ZAI_API_TOKEN not set (not needed in --no-pico mode)")

    # Setup paths
    entity_dir = Path("/Users/kempersc/apps/glam/data/custodian/person/entity")

    # Load existing profiles
    print("Loading existing profiles...")
    existing_profiles = load_existing_profiles(entity_dir)
    print(f"Found {len(existing_profiles)} existing profiles")

    # Path to the staff directory
    staff_dir = "/Users/kempersc/apps/glam/data/custodian/person/affiliated/parsed"

    # Load all staff files
    print("\n" + "="*60)
    print("LOADING STAFF FILES")
    print("="*60)
    urls = load_all_staff_files([staff_dir])

    if not urls:
        print("No LinkedIn URLs found to process.")
        return

    # Filter out URLs we already have
    new_urls = []
    for url in urls:
        slug = extract_slug_from_url(url)
        if slug and slug not in existing_profiles:
            new_urls.append(url)

    print(f"\nNeed to fetch {len(new_urls)} new profiles")
    print(f"Skipping {len(urls) - len(new_urls)} already existing profiles")

    if not new_urls:
        print("\nAll profiles already exist!")
        return

    # Determine how many to process
    if batch_size is not None:
        # Use command line batch size
        new_urls = new_urls[:batch_size]
        print(f"Processing first {len(new_urls)} profiles (batch size: {batch_size})...")
    else:
        # Ask user interactively
        print(f"\nThere are {len(new_urls)} new profiles to fetch.")
        print("Enter number to process (or press Enter for all): ", end="")
        try:
            response = input()
            if response.strip():
                limit = int(response.strip())
                new_urls = new_urls[:limit]
                print(f"Processing first {len(new_urls)} profiles...")
            else:
                print("Processing all profiles...")
        except (ValueError, KeyboardInterrupt, EOFError):
            print("\nProcessing all profiles...")

    # Process with threading
    success_count = 0
    failed_count = 0
    results = []

    print("\n" + "="*60)
    print("FETCHING PROFILES")
    print("="*60)

    with httpx.Client(timeout=90.0) as session:  # Increased timeout to 90s
        with ThreadPoolExecutor(max_workers=1) as executor:  # Reduced to 1 to avoid rate limits
            # Submit all tasks
            future_to_url = {
                executor.submit(process_person, url, session, existing_profiles,
                                entity_dir, use_pico, source_type): url
                for url in new_urls
            }

            # Process with progress bar
            with tqdm(total=len(new_urls), desc="Fetching profiles") as pbar:
                for future in as_completed(future_to_url):
                    url, success, result = future.result()
                    if success:
                        success_count += 1
                        results.append(f"✓ {url} -> {result}")
                    else:
                        failed_count += 1
                        results.append(f"✗ {url} -> {result}")
                    pbar.update(1)

    # Save results log
    log_filename = f"fetch_log_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.txt"
    with open(log_filename, 'w') as f:
        f.write(f"LinkedIn Profile Fetch Results\n")
        f.write(f"Timestamp: {datetime.now(timezone.utc).isoformat()}\n")
        f.write(f"Total attempted: {len(new_urls)}\n")
        f.write(f"Successful: {success_count}\n")
        f.write(f"Failed: {failed_count}\n")
        f.write(f"\n" + "="*60 + "\n")
        for result in results:
            f.write(result + "\n")

    print("\n" + "="*60)
    print("RESULTS")
    print("="*60)
    print(f"Successfully fetched: {success_count}")
    print(f"Failed: {failed_count}")
    print(f"Profiles saved to: {entity_dir}")
    print(f"Results log: {log_filename}")

    # Show some successful profiles
    if success_count > 0:
        print(f"\nFirst 5 successfully fetched profiles:")
        for i, result in enumerate([r for r in results if r.startswith("✓")][:5]):
            print(f"  {i+1}. {result}")


if __name__ == "__main__":
    main()