#!/usr/bin/env python3
"""
Complete LinkedIn profile fetching system that processes all staff directories
and fetches only new profiles not already in entity directory.
"""
import json
import os
import sys
import time
import hashlib
import argparse

# Load environment variables from .env file
from dotenv import load_dotenv
load_dotenv()

import re
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

import httpx
import yaml
from tqdm import tqdm

# Path to PiCo integration module with GLM system prompt
PICO_MODULE_PATH = Path("/Users/kempersc/apps/glam/data/entity_annotation/modules/integrations/pico.yaml")


def load_pico_system_prompt() -> Optional[str]:
    """Load the GLM CH-Annotator system prompt from PiCo YAML module.

    Returns:
        The system prompt string, or None if loading fails.
    """
    try:
        if not PICO_MODULE_PATH.exists():
            print(f"Warning: PiCo module not found at {PICO_MODULE_PATH}")
            return None

        with open(PICO_MODULE_PATH, 'r', encoding='utf-8') as f:
            pico_module = yaml.safe_load(f)

        system_prompt = pico_module.get('glm_annotator_config', {}).get('system_prompt')
        if not system_prompt:
            print("Warning: system_prompt not found in PiCo module")
            return None

        return system_prompt.strip()
    except Exception as e:
        print(f"Error loading PiCo module: {e}")
        return None


def load_linkedin_system_prompt() -> Optional[str]:
    """Load the LinkedIn-specific system prompt from PiCo YAML module.

    This prompt is optimized for extracting structured data from LinkedIn
    profiles fetched via Exa API. It instructs GLM to extract connected
    entities including time periods, locations, organization URLs, and
    company metadata.

    Returns:
        The LinkedIn-specific system prompt string, or None if loading fails.
    """
    try:
        if not PICO_MODULE_PATH.exists():
            print(f"Warning: PiCo module not found at {PICO_MODULE_PATH}")
            return None

        with open(PICO_MODULE_PATH, 'r', encoding='utf-8') as f:
            pico_module = yaml.safe_load(f)

        # Try LinkedIn-specific prompt first
        linkedin_prompt = pico_module.get('glm_annotator_config', {}).get('linkedin_system_prompt')
        if linkedin_prompt:
            return linkedin_prompt.strip()

        # Fall back to generic prompt if LinkedIn prompt not found
        print("Warning: linkedin_system_prompt not found in PiCo module, using generic prompt")
        return pico_module.get('glm_annotator_config', {}).get('system_prompt', '').strip()
    except Exception as e:
        print(f"Error loading LinkedIn prompt: {e}")
        return None


# Cache the system prompts to avoid re-reading file for each profile
_CACHED_SYSTEM_PROMPT: Optional[str] = None
_CACHED_LINKEDIN_PROMPT: Optional[str] = None


def get_pico_system_prompt() -> Optional[str]:
    """Get cached PiCo system prompt, loading if needed."""
    global _CACHED_SYSTEM_PROMPT
    if _CACHED_SYSTEM_PROMPT is None:
        _CACHED_SYSTEM_PROMPT = load_pico_system_prompt()
    return _CACHED_SYSTEM_PROMPT


def get_linkedin_system_prompt() -> Optional[str]:
    """Get cached LinkedIn-specific system prompt, loading if needed."""
    global _CACHED_LINKEDIN_PROMPT
    if _CACHED_LINKEDIN_PROMPT is None:
        _CACHED_LINKEDIN_PROMPT = load_linkedin_system_prompt()
    return _CACHED_LINKEDIN_PROMPT


def extract_linkedin_urls(staff_data: Dict) -> List[str]:
    """Extract LinkedIn URLs from staff data (deduplicated, order preserved)."""
    urls = []
    if 'staff' in staff_data:
        for person in staff_data['staff']:
            # Check both possible field names
            url = person.get('linkedin_url') or person.get('linkedin_profile_url')
            if url and url not in urls:
                urls.append(url)
    return urls


def extract_slug_from_url(url: str) -> Optional[str]:
    """Extract LinkedIn slug (the /in/<slug> segment) from a profile URL."""
    match = re.search(r"linkedin\.com/in/([a-zA-Z0-9\-]+)", url)
    if match:
        return match.group(1)
    return None


def generate_filename(slug: str) -> str:
    """Generate a timestamped JSON filename for profile data."""
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return f"{slug}_{timestamp}.json"


def fetch_profile_with_exa(url: str, session: httpx.Client,
                           use_pico_annotator: bool = True,
                           source_type: str = "modern_digital") -> Optional[Dict]:
    """Fetch profile data using Exa contents API, optionally with PiCo annotation.

    Returns the RAW Exa API response plus optional PiCo structured data.
    This preserves all source data while adding semantic structure.

    Args:
        url: LinkedIn profile URL
        session: HTTP client session
        use_pico_annotator: If True, call GLM-4.6 to structure the data using PiCo
        source_type: Source category (modern_digital, historical_indices, etc.)
    """
    exa_api_key = os.environ.get('EXA_API_KEY')
    if not exa_api_key:
        print("Error: EXA_API_KEY not set")
        return None

    try:
        # Use Exa contents endpoint to fetch LinkedIn page content.
        # NOTE: livecrawl="always" forces a fresh crawl on every request;
        # switch to "fallback" if cached-first (faster, cheaper) is desired.
        exa_response = session.post(
            "https://api.exa.ai/contents",
            headers={
                "x-api-key": exa_api_key,
                "Content-Type": "application/json",
            },
            json={
                "urls": [url],
                "text": True,
                "livecrawl": "always",  # Always fetch fresh content
                "livecrawlTimeout": 30000,
            },
            timeout=60.0
        )

        if exa_response.status_code != 200:
            print(f"Exa API error for {url}: {exa_response.status_code} - {exa_response.text[:200]}")
            return None

        exa_result = exa_response.json()

        # Validate we got results
        if not exa_result.get("results") or len(exa_result["results"]) == 0:
            print(f"No content returned from Exa for {url}")
            return None

        result = exa_result["results"][0]
        page_content = result.get("text", "")

        # Reject empty or trivially short pages (login walls, errors, etc.)
        if not page_content or len(page_content) < 50:
            print(f"Insufficient content from Exa for {url} (len={len(page_content)})")
            return None

        # Build response with raw Exa data
        response_data = {
            "exa_raw_response": exa_result,
            "linkedin_url": url
        }

        # Optionally add PiCo structured data
        if use_pico_annotator:
            pico_structured = structure_with_pico_annotator(
                exa_content=page_content,
                url=url,
                session=session,
                source_type=source_type
            )
            if pico_structured:
                response_data["pico_structured"] = pico_structured
                # Log success with heritage experience count
                persons = pico_structured.get('persons', [])
                if persons:
                    heritage_exp = persons[0].get('heritage_relevant_experience', [])
                    print(f"✓ PiCo structured: {len(heritage_exp)} heritage experience entries for {url.split('/')[-1]}")

        return response_data

    except httpx.RequestError as e:
        print(f"HTTP error fetching {url}: {e}")
        return None
    except Exception as e:
        print(f"Exception fetching {url}: {e}")
        return None


def parse_linkedin_content(content: str, url: str, exa_result: Dict) -> Optional[Dict]:
    """Parse LinkedIn profile content from Exa response.

    Exa returns structured text content that we can parse directly.
    The response typically includes:
    - title: "Name | Headline" format
    - author: Name
    - text: Structured profile content with sections
    """
    if not content:
        return None

    # Extract name from title or first line
    name = None
    headline = None
    location = None
    about = None
    connections = None

    lines = content.strip().split('\n')

    # Try to extract name from Exa metadata - author field is most reliable
    if exa_result.get("author"):
        name = exa_result["author"].strip()

    # Also check title for name and headline
    if exa_result.get("title"):
        # Title often contains "Name | Headline" format
        title = exa_result["title"]
        if " | " in title:
            title_parts = title.split(" | ", 1)
            if not name:
                name = title_parts[0].strip()
            # The part after the pipe is often the headline
            headline_candidate = title_parts[1].strip()
            if headline_candidate and "LinkedIn" not in headline_candidate:
                headline = headline_candidate
        elif " - " in title:
            title_parts = title.split(" - ", 1)
            if not name:
                name = title_parts[0].strip()
            headline_candidate = title_parts[1].strip()
            if headline_candidate and "LinkedIn" not in headline_candidate:
                headline = headline_candidate
        elif not name:
            name = title.strip()

    # Remove common suffixes if still present
    if name:
        name = re.sub(r'\s*\|\s*LinkedIn.*$', '', name)
        name = re.sub(r'\s*-\s*LinkedIn.*$', '', name)

    # Parse structured sections from content
    current_section = None
    experience = []
    education = []
    skills = []
    about_lines = []

    for i, line in enumerate(lines):
        line = line.strip()
        if not line:
            continue

        # Skip markdown headers that are just the name
        if line.startswith('#') and name and name.lower() in line.lower():
            continue

        # Detect sections (English + Dutch section headings)
        lower_line = line.lower().strip('#').strip()
        if lower_line in ['experience', 'ervaring', 'werkervaring']:
            current_section = 'experience'
            continue
        elif lower_line in ['education', 'opleiding', 'educatie', 'opleidingen']:
            current_section = 'education'
            continue
        elif lower_line in ['skills', 'vaardigheden', 'skills & endorsements']:
            current_section = 'skills'
            continue
        elif lower_line in ['about', 'over', 'info', 'summary', 'samenvatting']:
            current_section = 'about'
            continue
        elif lower_line in ['languages', 'talen']:
            current_section = 'languages'
            continue

        # Extract headline if not already found (usually near the top)
        if not headline and i < 10 and len(line) > 10 and len(line) < 200:
            # Skip lines that are clearly not headlines
            skip_patterns = ['linkedin', 'connection', 'follower', '#', 'http', 'experience', 'education']
            if not any(p in line.lower() for p in skip_patterns):
                if name and name.lower() not in line.lower():
                    headline = line

        # Extract location (heuristic keyword match, NL-centric)
        if not location:
            location_keywords = ['netherlands', 'amsterdam', 'rotterdam', 'utrecht', 'nederland',
                                 'den haag', 'eindhoven', 'groningen', 'area', 'region']
            if any(x in line.lower() for x in location_keywords):
                if len(line) < 100:  # Location lines are usually short
                    location = line

        # Extract connections
        if 'connection' in line.lower() or 'follower' in line.lower():
            connections = line

        # Parse sections
        if current_section == 'about':
            about_lines.append(line)
        elif current_section == 'skills' and line and not line.startswith('#'):
            # Clean up skill entries
            skill = line.strip('•-* ').strip()
            if skill and len(skill) > 1 and len(skill) < 50:
                skills.append(skill)
        elif current_section == 'experience' and line and not line.startswith('#'):
            experience.append(line)
        elif current_section == 'education' and line and not line.startswith('#'):
            education.append(line)

    # Combine about lines
    if about_lines:
        about = ' '.join(about_lines)

    # Validate name
    if not name or len(name) < 2:
        return None

    # Clean up name and reject anonymized/placeholder profiles
    name = name.strip()
    if name.lower() in ['linkedin member', 'unknown', 'n/a', 'linkedin']:
        return None

    return {
        "name": name,
        "linkedin_url": url,
        "headline": headline or "",
        "location": location or "",
        "connections": connections or "",
        "about": about or "",
        "experience": experience,
        "education": education,
        "skills": skills,
        "languages": [],
        "profile_image_url": ""
    }


def structure_with_pico_annotator(exa_content: str, url: str, session: httpx.Client,
                                  source_type: str = "modern_digital") -> Optional[Dict]:
    """Use GLM-4.6 with PiCo/CH-Annotator convention to structure profile data.

    This creates a structured profile following PiCo (Person in Context)
    ontology patterns:
    - PersonObservation: The profile as observed (source-bound)
    - PNV PersonName: Structured name components
    - sdo:hasOccupation: Occupational roles
    - Heritage-relevant experience tagging

    The system prompt is loaded from the PiCo YAML module to ensure
    consistency with the CH-Annotator v1.7.0 convention. For LinkedIn
    profiles (modern_digital), a specialized prompt is used that extracts
    connected entities with time periods, locations, and organization metadata.

    Args:
        exa_content: Raw text content from source
        url: Source URL
        session: HTTP client session
        source_type: Source category (modern_digital, historical_indices,
                     archival_descriptions, biographical_dictionaries)

    Returns:
        Structured profile data following PiCo conventions, or None if failed
    """
    zai_api_token = os.environ.get('ZAI_API_TOKEN')
    if not zai_api_token:
        print("Warning: ZAI_API_TOKEN not set - skipping PiCo annotation")
        return None

    # Choose appropriate system prompt based on source type
    # For LinkedIn profiles (modern_digital), use the specialized LinkedIn prompt
    # which extracts connected entities with time periods, locations, and org URLs
    if source_type == "modern_digital" and "linkedin.com" in url.lower():
        system_prompt = get_linkedin_system_prompt()
        prompt_source = "LinkedIn-specific"
    else:
        system_prompt = get_pico_system_prompt()
        prompt_source = "generic PiCo"

    if not system_prompt:
        print(f"Warning: Could not load {prompt_source} system prompt - using fallback")
        # Minimal fallback prompt if YAML loading fails
        system_prompt = """You are a CH-Annotator extracting person data. Return ONLY valid JSON with:
{
  "pico_observation": {"observation_id": "...", "observed_at": "...", "source_type": "...", "source_reference": "..."},
  "persons": [{"pnv_name": {"literalName": "...", "givenName": null, "surnamePrefix": null, "baseSurname": null}, "roles": [], "heritage_relevant_experience": []}],
  "organizations_mentioned": []
}
CRITICAL: ONLY extract data that EXISTS. NEVER fabricate."""

    try:
        # Build user message with source context; content is truncated to
        # keep the request within GLM's context budget
        user_message = f"""Source Type: {source_type}
Source URL: {url}

Content:
{exa_content[:12000]}"""

        glm_response = session.post(
            "https://api.z.ai/api/coding/paas/v4/chat/completions",
            headers={
                "Authorization": f"Bearer {zai_api_token}",
                "Content-Type": "application/json",
            },
            json={
                "model": "glm-4.6",
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_message}
                ],
                "temperature": 0.1,
                "max_tokens": 4000
            },
            timeout=90.0  # Increased timeout for GLM processing
        )

        if glm_response.status_code != 200:
            print(f"GLM API error for {url}: {glm_response.status_code}")
            return None

        result = glm_response.json()
        glm_content = result.get('choices', [{}])[0].get('message', {}).get('content', '')

        if not glm_content:
            print(f"Empty GLM response for {url}")
            return None

        # Try to parse JSON from response
        # First try: direct JSON parse
        try:
            return json.loads(glm_content)
        except json.JSONDecodeError:
            pass

        # Second try: extract from markdown code block
        json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', glm_content)
        if json_match:
            try:
                return json.loads(json_match.group(1))
            except json.JSONDecodeError:
                pass

        # Third try: find JSON object in text
        json_match = re.search(r'\{[\s\S]*\}', glm_content)
        if json_match:
            try:
                return json.loads(json_match.group())
            except json.JSONDecodeError:
                pass

        print(f"Could not parse GLM response as JSON for {url}")
        return None

    except httpx.RequestError as e:
        print(f"HTTP error calling GLM for {url}: {e}")
        return None
    except Exception as e:
        print(f"Exception in PiCo annotation for {url}: {e}")
        return None


def enhance_with_glm(profile_data: Dict, content: str, url: str, session: httpx.Client) -> Dict:
    """DEPRECATED: Use structure_with_pico_annotator instead.

    This function is kept for backward compatibility but delegates to the
    new PiCo annotator.
    """
    structured = structure_with_pico_annotator(content, url, session)
    if structured and structured.get("profile_data"):
        return structured["profile_data"]
    return profile_data


def save_profile(slug: str, profile_data: Dict, source_url: str, entity_dir: Path) -> Optional[str]:
    """Save Exa profile data with optional PiCo structured annotation.

    Stores the complete Exa API response plus PiCo structured data if available.
    This preserves all source data while adding semantic structure.

    Returns the written filename, or None if validation failed.
    """
    # Check we have raw Exa response
    if not profile_data.get("exa_raw_response"):
        print(f"Skipping save for {slug} - no Exa response data")
        return None

    exa_response = profile_data["exa_raw_response"]

    # Basic validation - ensure we have results
    if not exa_response.get("results") or len(exa_response["results"]) == 0:
        print(f"Skipping save for {slug} - empty results")
        return None

    filename = generate_filename(slug)
    filepath = entity_dir / filename

    # Determine extraction method based on whether PiCo was used
    has_pico = "pico_structured" in profile_data and profile_data["pico_structured"] is not None
    extraction_method = "exa_contents_raw+pico_glm" if has_pico else "exa_contents_raw"

    # Structure with metadata + raw Exa response + optional PiCo structured
    structured_data = {
        "extraction_metadata": {
            "source_file": "staff_parsing",
            "staff_id": f"{slug}_profile",
            "extraction_date": datetime.now(timezone.utc).isoformat(),
            "extraction_method": extraction_method,
            "extraction_agent": "",
            "linkedin_url": source_url,
            "cost_usd": exa_response.get("costDollars", {}).get("total", 0),
            # Fall back to an MD5 of the URL when Exa supplies no request id
            "request_id": exa_response.get("requestId", hashlib.md5(source_url.encode()).hexdigest())
        },
        "exa_raw_response": exa_response
    }

    # Add PiCo structured data if available
    if has_pico:
        structured_data["pico_structured"] = profile_data["pico_structured"]

    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(structured_data, f, indent=2, ensure_ascii=False)

    return filename


def load_existing_profiles(entity_dir: Path) -> Set[str]:
    """Load existing profile slugs (from saved filenames) to avoid duplicates."""
    existing = set()
    if not entity_dir.exists():
        entity_dir.mkdir(parents=True, exist_ok=True)
        return existing

    for file_path in entity_dir.glob("*.json"):
        # Filenames follow the generate_filename() pattern: <slug>_<UTC timestamp>.json
        match = re.match(r"([a-zA-Z0-9\-]+)_\d{8}T\d{6}Z\.json", file_path.name)
        if match:
            existing.add(match.group(1))

    return existing


# Priority order for core GLAM institutions
CORE_GLAM_INSTITUTIONS = [
    "rijksmuseum",
    "kb-nationale-bibliotheek",
    "nationaal-archief",
    "nederlands-instituut-voor-beeld-en-geluid",
    "eye-filmmuseum",
    "huygens-institute",
    "het-utrechts-archief",
    "collectie-overijssel",
    "regionaal-archief-zuid-utrecht",
    "netwerk-digitaal-erfgoed",
]

# Heritage type priority (lower = higher priority)
HERITAGE_TYPE_PRIORITY = {
    "A": 1,  # Archive
    "M": 2,  # Museum
    "L": 3,  # Library
    "R": 4,  # Research
    "D": 5,  # Digital
    "E": 6,  # Education
    "G": 7,  # Gallery
    "O": 8,  # Official
    "S": 9,  # Society
}


def load_all_staff_files_with_priority(staff_dirs: List[str]) -> List[Dict]:
    """Load all staff files and extract LinkedIn URLs with priority metadata.

    Returns list of dicts with url, heritage_relevant, heritage_type,
    custodian_slug, name sorted by priority (core GLAM institutions +
    heritage-relevant first).
    """
    all_staff = []
    file_count = 0

    for staff_dir in staff_dirs:
        dir_path = Path(staff_dir)
        if not dir_path.exists():
            print(f"Warning: Directory not found: {staff_dir}")
            continue

        for file_path in dir_path.glob("*.json"):
            if file_path.name.startswith("batch_results"):
                continue  # Skip aggregated results
            file_count += 1
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)

                custodian_slug = data.get('custodian_metadata', {}).get('custodian_slug', 'unknown')
                custodian_name = data.get('custodian_metadata', {}).get('custodian_name', 'Unknown')

                if 'staff' in data:
                    for person in data['staff']:
                        url = person.get('linkedin_url') or person.get('linkedin_profile_url')
                        if url:
                            all_staff.append({
                                'url': url,
                                'name': person.get('name', ''),
                                'heritage_relevant': person.get('heritage_relevant', False),
                                'heritage_type': person.get('heritage_type'),
                                'custodian_slug': custodian_slug,
                                'custodian_name': custodian_name,
                            })
            except Exception as e:
                print(f"Error loading {file_path}: {e}")

    # Remove duplicates by URL while keeping first occurrence
    seen_urls = set()
    unique_staff = []
    for staff in all_staff:
        if staff['url'] not in seen_urls:
            seen_urls.add(staff['url'])
            unique_staff.append(staff)

    # Sort by priority:
    # 1. Heritage-relevant from core GLAM institutions
    # 2. Heritage-relevant from other institutions
    # 3. Non-heritage-relevant from core GLAM
    # 4. All others
    def get_priority(staff: Dict) -> Tuple[int, int, int, str]:
        is_core_glam = staff['custodian_slug'] in CORE_GLAM_INSTITUTIONS
        core_glam_idx = CORE_GLAM_INSTITUTIONS.index(staff['custodian_slug']) if is_core_glam else 100
        is_heritage = staff['heritage_relevant']
        heritage_type_priority = HERITAGE_TYPE_PRIORITY.get(staff['heritage_type'], 99)

        # Primary: heritage-relevant first (0), then non-heritage (1)
        # Secondary: core GLAM first (0-9), others (100)
        # Tertiary: heritage type priority
        # Quaternary: alphabetical by name for stability
        return (
            0 if is_heritage else 1,
            core_glam_idx,
            heritage_type_priority,
            staff['name'].lower()
        )

    unique_staff.sort(key=get_priority)

    # Print stats
    heritage_count = sum(1 for s in unique_staff if s['heritage_relevant'])
    core_glam_count = sum(1 for s in unique_staff if s['custodian_slug'] in CORE_GLAM_INSTITUTIONS)
    core_heritage = sum(1 for s in unique_staff
                        if s['heritage_relevant'] and s['custodian_slug'] in CORE_GLAM_INSTITUTIONS)

    print(f"\nProcessed {file_count} staff files")
    print(f"Found {len(all_staff)} total LinkedIn URLs")
    print(f"Found {len(unique_staff)} unique LinkedIn URLs")
    print(f" - Heritage-relevant: {heritage_count}")
    print(f" - Core GLAM institutions: {core_glam_count}")
    print(f" - Heritage-relevant at core GLAM: {core_heritage}")

    return unique_staff


def load_all_staff_files(staff_dirs: List[str]) -> List[str]:
    """Load all staff files from multiple directories and extract LinkedIn URLs.

    DEPRECATED: Use load_all_staff_files_with_priority() instead for
    prioritized fetching.
    """
    all_urls = []
    file_count = 0

    for staff_dir in staff_dirs:
        dir_path = Path(staff_dir)
        if not dir_path.exists():
            print(f"Warning: Directory not found: {staff_dir}")
            continue

        for file_path in dir_path.glob("*.json"):
            file_count += 1
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                urls = extract_linkedin_urls(data)
                all_urls.extend(urls)
            except Exception as e:
                print(f"Error loading {file_path}: {e}")

    # Remove duplicates while preserving order
    seen = set()
    unique_urls = []
    for url in all_urls:
        if url not in seen:
            seen.add(url)
            unique_urls.append(url)

    print(f"\nProcessed {file_count} staff files")
    print(f"Found {len(all_urls)} total LinkedIn URLs")
    print(f"Found {len(unique_urls)} unique LinkedIn URLs")

    return unique_urls


def process_person(url: str, session: httpx.Client, existing_profiles: Set[str],
                   entity_dir: Path, use_pico: bool = True,
                   source_type: str = "modern_digital") -> Tuple[str, bool, Optional[str]]:
    """Process a single person's LinkedIn profile.

    Args:
        url: LinkedIn profile URL
        session: HTTP client session
        existing_profiles: Set of already-processed profile slugs
        entity_dir: Directory to save profiles
        use_pico: Whether to use PiCo annotator for structuring
        source_type: Source category for PiCo annotation

    Returns:
        (url, success flag, saved filename / skip reason / None)
    """
    slug = extract_slug_from_url(url)
    if not slug:
        return url, False, "No slug found"

    # Check if we already have this profile
    if slug in existing_profiles:
        return url, False, "Already exists"

    # Rate limiting - 2 second delay between requests to avoid overloading APIs
    time.sleep(2)

    # Fetch profile - returns raw Exa response + optional PiCo structured
    profile_data = fetch_profile_with_exa(url, session,
                                          use_pico_annotator=use_pico,
                                          source_type=source_type)
    if profile_data:
        filename = save_profile(slug, profile_data, url, entity_dir)
        if filename:
            return url, True, filename
        return url, False, "Save failed"

    return url, False, None


def main():
    """Main entry point."""
    # Parse command line arguments
    parser = argparse.ArgumentParser(
        description="Fetch LinkedIn profiles and optionally structure with PiCo/CH-Annotator"
    )
    parser.add_argument(
        "batch_size",
        type=int,
        nargs="?",
        default=None,
        help="Number of profiles to process (omit for interactive mode)"
    )
    parser.add_argument(
        "--no-pico",
        action="store_true",
        help="Disable PiCo/CH-Annotator structuring (only save raw Exa data)"
    )
    parser.add_argument(
        "--source-type",
        type=str,
        choices=["modern_digital", "historical_indices", "archival_descriptions", "biographical_dictionaries"],
        default="modern_digital",
        help="Source type category for PiCo annotation (default: modern_digital)"
    )
    args = parser.parse_args()

    batch_size = args.batch_size
    use_pico = not args.no_pico
    source_type = args.source_type

    if batch_size is not None:
        print(f"Will process first {batch_size} profiles")
    if use_pico:
        print(f"PiCo/CH-Annotator enabled (source_type: {source_type})")
    else:
        print("PiCo/CH-Annotator disabled (raw mode only)")

    # Check for required API keys
    if not os.environ.get('EXA_API_KEY'):
        print("Error: EXA_API_KEY environment variable not set")
        print("Please set it in your .env file or environment")
        sys.exit(1)

    # ZAI_API_TOKEN required for PiCo annotation
    if use_pico and not os.environ.get('ZAI_API_TOKEN'):
        print("Warning: ZAI_API_TOKEN not set - PiCo annotation will be skipped")
        print("Set ZAI_API_TOKEN or use --no-pico to disable PiCo annotation")
    elif not os.environ.get('ZAI_API_TOKEN'):
        print("Note: ZAI_API_TOKEN not set (not needed in --no-pico mode)")

    # Setup paths
    entity_dir = Path("/Users/kempersc/apps/glam/data/custodian/person/entity")

    # Load existing profiles
    print("Loading existing profiles...")
    existing_profiles = load_existing_profiles(entity_dir)
    print(f"Found {len(existing_profiles)} existing profiles")

    # Path to the staff directory
    staff_dir = "/Users/kempersc/apps/glam/data/custodian/person/affiliated/parsed"

    # Load all staff files with priority metadata
    print("\n" + "="*60)
    print("LOADING STAFF FILES WITH PRIORITY")
    print("="*60)
    all_staff = load_all_staff_files_with_priority([staff_dir])

    if not all_staff:
        print("No LinkedIn URLs found to process.")
        return

    # Filter out profiles we already have
    new_staff = []
    for staff in all_staff:
        slug = extract_slug_from_url(staff['url'])
        if slug and slug not in existing_profiles:
            new_staff.append(staff)

    # Calculate stats for new profiles
    heritage_new = sum(1 for s in new_staff if s['heritage_relevant'])
    core_glam_new = sum(1 for s in new_staff if s['custodian_slug'] in CORE_GLAM_INSTITUTIONS)
    core_heritage_new = sum(1 for s in new_staff
                            if s['heritage_relevant'] and s['custodian_slug'] in CORE_GLAM_INSTITUTIONS)

    print(f"\nNeed to fetch {len(new_staff)} new profiles")
    print(f" - Heritage-relevant: {heritage_new}")
    print(f" - Core GLAM institutions: {core_glam_new}")
    print(f" - Heritage-relevant at core GLAM: {core_heritage_new}")
    print(f"Skipping {len(all_staff) - len(new_staff)} already existing profiles")

    if not new_staff:
        print("\nAll profiles already exist!")
        return

    # Show first 10 profiles that will be fetched (priority order)
    print("\n" + "-"*60)
    print("PRIORITY ORDER (first 10 profiles):")
    print("-"*60)
    for i, staff in enumerate(new_staff[:10]):
        ht = staff['heritage_type'] or '-'
        hr = '✓' if staff['heritage_relevant'] else ' '
        core = '★' if staff['custodian_slug'] in CORE_GLAM_INSTITUTIONS else ' '
        print(f" {i+1:2}. [{ht}]{hr}{core} {staff['name'][:30]:<30} @ {staff['custodian_name'][:25]}")

    # Extract just the URLs for processing
    new_urls = [s['url'] for s in new_staff]

    # Determine how many to process
    if batch_size is not None:
        # Use command line batch size
        new_urls = new_urls[:batch_size]
        print(f"\nProcessing first {len(new_urls)} profiles (batch size: {batch_size})...")
    else:
        # Ask user interactively
        print(f"\nThere are {len(new_urls)} new profiles to fetch.")
        print("Enter number to process (or press Enter for all): ", end="")
        try:
            response = input()
            if response.strip():
                limit = int(response.strip())
                new_urls = new_urls[:limit]
                print(f"Processing first {len(new_urls)} profiles...")
            else:
                print("Processing all profiles...")
        except (ValueError, KeyboardInterrupt, EOFError):
            print("\nProcessing all profiles...")

    # Process with threading
    success_count = 0
    failed_count = 0
    results = []

    print("\n" + "="*60)
    print("FETCHING PROFILES")
    print("="*60)

    with httpx.Client(timeout=90.0) as session:  # Increased timeout to 90s
        with ThreadPoolExecutor(max_workers=1) as executor:  # Reduced to 1 to avoid rate limits
            # Submit all tasks
            future_to_url = {
                executor.submit(process_person, url, session, existing_profiles,
                                entity_dir, use_pico, source_type): url
                for url in new_urls
            }

            # Process with progress bar
            with tqdm(total=len(new_urls), desc="Fetching profiles") as pbar:
                for future in as_completed(future_to_url):
                    url, success, result = future.result()
                    if success:
                        success_count += 1
                        results.append(f"✓ {url} -> {result}")
                    else:
                        failed_count += 1
                        results.append(f"✗ {url} -> {result}")
                    pbar.update(1)

    # Save results log
    log_filename = f"fetch_log_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.txt"
    with open(log_filename, 'w') as f:
        f.write(f"LinkedIn Profile Fetch Results\n")
        f.write(f"Timestamp: {datetime.now(timezone.utc).isoformat()}\n")
        f.write(f"Total attempted: {len(new_urls)}\n")
        f.write(f"Successful: {success_count}\n")
        f.write(f"Failed: {failed_count}\n")
        f.write(f"\n" + "="*60 + "\n")
        for result in results:
            f.write(result + "\n")

    print("\n" + "="*60)
    print("RESULTS")
    print("="*60)
    print(f"Successfully fetched: {success_count}")
    print(f"Failed: {failed_count}")
    print(f"Profiles saved to: {entity_dir}")
    print(f"Results log: {log_filename}")

    # Show some successful profiles
    if success_count > 0:
        print(f"\nFirst 5 successfully fetched profiles:")
        for i, result in enumerate([r for r in results if r.startswith("✓")][:5]):
            print(f" {i+1}. {result}")


if __name__ == "__main__":
    main()