749 lines
No EOL
26 KiB
Python
Executable file
749 lines
No EOL
26 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Complete LinkedIn profile fetching system that processes all staff directories
|
|
and fetches only new profiles not already in entity directory.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import hashlib
|
|
import argparse
|
|
from pathlib import Path
|
|
|
|
# Load environment variables from .env file
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Set, Tuple
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import re
|
|
import subprocess
|
|
|
|
import httpx
|
|
import yaml
|
|
from tqdm import tqdm
|
|
|
|
|
|
# Absolute path to the PiCo integration YAML module that carries the GLM
# CH-Annotator system prompt (read by load_pico_system_prompt below).
# NOTE(review): machine-specific path — consider making this configurable.
PICO_MODULE_PATH = Path("/Users/kempersc/apps/glam/data/entity_annotation/modules/integrations/pico.yaml")
|
|
|
|
|
|
def load_pico_system_prompt() -> Optional[str]:
    """Read the GLM CH-Annotator system prompt out of the PiCo YAML module.

    Returns:
        The stripped system prompt string, or None when the module file is
        missing, the prompt key is absent, or any read/parse error occurs.
    """
    try:
        if not PICO_MODULE_PATH.exists():
            print(f"Warning: PiCo module not found at {PICO_MODULE_PATH}")
            return None

        with open(PICO_MODULE_PATH, 'r', encoding='utf-8') as handle:
            module_doc = yaml.safe_load(handle)

        # Prompt lives under glm_annotator_config.system_prompt in the YAML.
        prompt = module_doc.get('glm_annotator_config', {}).get('system_prompt')
        if not prompt:
            print("Warning: system_prompt not found in PiCo module")
            return None
        return prompt.strip()
    except Exception as e:
        # Broad catch is deliberate: a broken module must not crash the run.
        print(f"Error loading PiCo module: {e}")
        return None
|
|
|
|
|
|
# Process-wide cache for the PiCo system prompt so the YAML file is read at
# most once per run (None until the first successful load).
_CACHED_SYSTEM_PROMPT: Optional[str] = None
|
|
|
|
|
|
def get_pico_system_prompt() -> Optional[str]:
    """Return the PiCo system prompt, loading it lazily on first use.

    NOTE(review): a failed load leaves the cache as None, so loading is
    retried on every call — presumably intentional as a retry mechanism.
    """
    global _CACHED_SYSTEM_PROMPT
    if _CACHED_SYSTEM_PROMPT is not None:
        return _CACHED_SYSTEM_PROMPT
    _CACHED_SYSTEM_PROMPT = load_pico_system_prompt()
    return _CACHED_SYSTEM_PROMPT
|
|
|
|
|
|
def extract_linkedin_urls(staff_data: Dict) -> List[str]:
    """Extract unique LinkedIn URLs from a parsed staff file.

    Args:
        staff_data: Parsed staff JSON, expected to hold a 'staff' list of
            person dicts. Each person may carry the URL under either
            'linkedin_url' or 'linkedin_profile_url'.

    Returns:
        LinkedIn URLs in first-seen order, without duplicates.
    """
    urls: List[str] = []
    # Set gives O(1) membership checks; the original `url not in urls` list
    # scan made deduplication O(n^2) over large staff files.
    seen: Set[str] = set()
    for person in staff_data.get('staff', []):
        # Staff files are inconsistent about the field name; accept both.
        url = person.get('linkedin_url') or person.get('linkedin_profile_url')
        if url and url not in seen:
            seen.add(url)
            urls.append(url)
    return urls
|
|
|
|
|
|
def extract_slug_from_url(url: str) -> Optional[str]:
    """Return the LinkedIn vanity slug from a profile URL, or None.

    Matches the segment after 'linkedin.com/in/'; only letters, digits and
    hyphens are accepted as slug characters.
    """
    found = re.search(r"linkedin\.com/in/([a-zA-Z0-9\-]+)", url)
    return found.group(1) if found else None
|
|
|
|
|
|
def generate_filename(slug: str) -> str:
    """Build a UTC-timestamped JSON filename for a profile.

    Example: 'jane-doe_20240101T120000Z.json'.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return "{}_{}.json".format(slug, stamp)
|
|
|
|
|
|
def fetch_profile_with_exa(url: str, session: httpx.Client, use_pico_annotator: bool = True, source_type: str = "modern_digital") -> Optional[Dict]:
    """Fetch profile data using Exa contents API, optionally with PiCo annotation.

    Returns the RAW Exa API response plus optional PiCo structured data.
    This preserves all source data while adding semantic structure.

    Args:
        url: LinkedIn profile URL
        session: HTTP client session
        use_pico_annotator: If True, call GLM-4.6 to structure the data using PiCo
        source_type: Source category (modern_digital, historical_indices, etc.)

    Returns:
        Dict with keys 'exa_raw_response', 'linkedin_url' and (when annotation
        succeeds) 'pico_structured'; None on any failure.
    """
    exa_api_key = os.environ.get('EXA_API_KEY')

    if not exa_api_key:
        print("Error: EXA_API_KEY not set")
        return None

    try:
        # Use Exa contents endpoint to fetch LinkedIn page content.
        # "always" forces a live crawl (fresh content) instead of Exa's cache;
        # the previous comment claiming "fallback" mode was stale.
        exa_response = session.post(
            "https://api.exa.ai/contents",
            headers={
                "x-api-key": exa_api_key,
                "Content-Type": "application/json",
            },
            json={
                "urls": [url],
                "text": True,
                "livecrawl": "always",  # Always fetch fresh content
                "livecrawlTimeout": 30000,
            },
            timeout=60.0
        )

        if exa_response.status_code != 200:
            # Truncate body to keep the log line readable.
            print(f"Exa API error for {url}: {exa_response.status_code} - {exa_response.text[:200]}")
            return None

        exa_result = exa_response.json()

        # Validate we got results
        if not exa_result.get("results") or len(exa_result["results"]) == 0:
            print(f"No content returned from Exa for {url}")
            return None

        result = exa_result["results"][0]
        page_content = result.get("text", "")

        # Very short text is usually an auth-wall or error page, not a profile.
        if not page_content or len(page_content) < 50:
            print(f"Insufficient content from Exa for {url} (len={len(page_content)})")
            return None

        # Build response with raw Exa data
        response_data = {
            "exa_raw_response": exa_result,
            "linkedin_url": url
        }

        # Optionally add PiCo structured data; annotation failure is
        # non-fatal — the raw response is still returned.
        if use_pico_annotator:
            pico_structured = structure_with_pico_annotator(
                exa_content=page_content,
                url=url,
                session=session,
                source_type=source_type
            )
            if pico_structured:
                response_data["pico_structured"] = pico_structured

        return response_data

    except httpx.RequestError as e:
        print(f"HTTP error fetching {url}: {e}")
        return None
    except Exception as e:
        # Catch-all so a single bad profile never aborts a batch run.
        print(f"Exception fetching {url}: {e}")
        return None
|
|
|
|
|
|
def parse_linkedin_content(content: str, url: str, exa_result: Dict) -> Optional[Dict]:
    """Parse LinkedIn profile content from Exa response.

    Exa returns structured text content that we can parse directly.
    The response typically includes:
    - title: "Name | Headline" format
    - author: Name
    - text: Structured profile content with sections

    Returns a flat profile dict, or None when no plausible name is found.
    NOTE(review): this helper is not called anywhere in the visible file
    (fetching now stores the raw Exa response) — possibly used elsewhere.
    """
    if not content:
        return None

    # Extract name from title or first line
    name = None
    headline = None
    location = None
    about = None
    connections = None

    lines = content.strip().split('\n')

    # Try to extract name from Exa metadata - author field is most reliable
    if exa_result.get("author"):
        name = exa_result["author"].strip()

    # Also check title for name and headline
    if exa_result.get("title"):
        # Title often contains "Name | Headline" format
        title = exa_result["title"]
        if " | " in title:
            title_parts = title.split(" | ", 1)
            if not name:
                name = title_parts[0].strip()
            # The part after the pipe is often the headline
            headline_candidate = title_parts[1].strip()
            if headline_candidate and "LinkedIn" not in headline_candidate:
                headline = headline_candidate
        elif " - " in title:
            # Fallback separator format: "Name - Headline"
            title_parts = title.split(" - ", 1)
            if not name:
                name = title_parts[0].strip()
            headline_candidate = title_parts[1].strip()
            if headline_candidate and "LinkedIn" not in headline_candidate:
                headline = headline_candidate
        elif not name:
            name = title.strip()

    # Remove common suffixes if still present
    if name:
        name = re.sub(r'\s*\|\s*LinkedIn.*$', '', name)
        name = re.sub(r'\s*-\s*LinkedIn.*$', '', name)

    # Parse structured sections from content (English and Dutch headings).
    current_section = None
    experience = []
    education = []
    skills = []
    about_lines = []

    for i, line in enumerate(lines):
        line = line.strip()
        if not line:
            continue

        # Skip markdown headers that are just the name
        if line.startswith('#') and name and name.lower() in line.lower():
            continue

        # Detect sections; heading lines switch the current bucket and are
        # not themselves collected.
        lower_line = line.lower().strip('#').strip()
        if lower_line in ['experience', 'ervaring', 'werkervaring']:
            current_section = 'experience'
            continue
        elif lower_line in ['education', 'opleiding', 'educatie', 'opleidingen']:
            current_section = 'education'
            continue
        elif lower_line in ['skills', 'vaardigheden', 'skills & endorsements']:
            current_section = 'skills'
            continue
        elif lower_line in ['about', 'over', 'info', 'summary', 'samenvatting']:
            current_section = 'about'
            continue
        elif lower_line in ['languages', 'talen']:
            # NOTE(review): a 'languages' section is detected but no branch
            # below collects its lines, so the returned "languages" list is
            # always empty — confirm whether that is intentional.
            current_section = 'languages'
            continue

        # Extract headline if not already found (usually near the top);
        # i indexes the original line list, so blank lines still count.
        if not headline and i < 10 and len(line) > 10 and len(line) < 200:
            # Skip lines that are clearly not headlines
            skip_patterns = ['linkedin', 'connection', 'follower', '#', 'http', 'experience', 'education']
            if not any(p in line.lower() for p in skip_patterns):
                if name and name.lower() not in line.lower():
                    headline = line

        # Extract location — first short line containing a Dutch place/region
        # keyword wins (heuristic; tuned for Netherlands-based profiles).
        if not location:
            location_keywords = ['netherlands', 'amsterdam', 'rotterdam', 'utrecht', 'nederland',
                                 'den haag', 'eindhoven', 'groningen', 'area', 'region']
            if any(x in line.lower() for x in location_keywords):
                if len(line) < 100:  # Location lines are usually short
                    location = line

        # Extract connections (last matching line wins)
        if 'connection' in line.lower() or 'follower' in line.lower():
            connections = line

        # Parse sections: append the line to whichever bucket is active.
        if current_section == 'about':
            about_lines.append(line)
        elif current_section == 'skills' and line and not line.startswith('#'):
            # Clean up skill entries (strip bullet markers)
            skill = line.strip('•-* ').strip()
            if skill and len(skill) > 1 and len(skill) < 50:
                skills.append(skill)
        elif current_section == 'experience' and line and not line.startswith('#'):
            experience.append(line)
        elif current_section == 'education' and line and not line.startswith('#'):
            education.append(line)

    # Combine about lines into a single paragraph
    if about_lines:
        about = ' '.join(about_lines)

    # Validate name — a profile without a usable name is worthless.
    if not name or len(name) < 2:
        return None

    # Clean up name and reject LinkedIn placeholder values
    name = name.strip()
    if name.lower() in ['linkedin member', 'unknown', 'n/a', 'linkedin']:
        return None

    return {
        "name": name,
        "linkedin_url": url,
        "headline": headline or "",
        "location": location or "",
        "connections": connections or "",
        "about": about or "",
        "experience": experience,
        "education": education,
        "skills": skills,
        "languages": [],
        "profile_image_url": ""
    }
|
|
|
|
|
|
def structure_with_pico_annotator(exa_content: str, url: str, session: httpx.Client, source_type: str = "modern_digital") -> Optional[Dict]:
    """Use GLM-4.6 with PiCo/CH-Annotator convention to structure profile data.

    This creates a structured profile following PiCo (Person in Context) ontology patterns:
    - PersonObservation: The profile as observed (source-bound)
    - PNV PersonName: Structured name components
    - sdo:hasOccupation: Occupational roles
    - Heritage-relevant experience tagging

    The system prompt is loaded from the PiCo YAML module to ensure consistency
    with the CH-Annotator v1.7.0 convention.

    Args:
        exa_content: Raw text content from source
        url: Source URL
        session: HTTP client session
        source_type: Source category (modern_digital, historical_indices, archival_descriptions, biographical_dictionaries)

    Returns:
        Structured profile data following PiCo conventions, or None if failed
    """
    zai_api_token = os.environ.get('ZAI_API_TOKEN')
    if not zai_api_token:
        print("Warning: ZAI_API_TOKEN not set - skipping PiCo annotation")
        return None

    # Load system prompt from PiCo YAML module
    system_prompt = get_pico_system_prompt()
    if not system_prompt:
        print("Warning: Could not load PiCo system prompt - using fallback")
        # Minimal fallback prompt if YAML loading fails
        system_prompt = """You are a CH-Annotator extracting person data. Return ONLY valid JSON with:
{
"pico_observation": {"observation_id": "...", "observed_at": "...", "source_type": "...", "source_reference": "..."},
"persons": [{"pnv_name": {"literalName": "...", "givenName": null, "surnamePrefix": null, "baseSurname": null}, "roles": [], "heritage_relevant_experience": []}],
"organizations_mentioned": []
}
CRITICAL: ONLY extract data that EXISTS. NEVER fabricate."""

    try:
        # Build user message with source context.
        # Content is truncated to 12k characters to stay within model limits.
        user_message = f"""Source Type: {source_type}
Source URL: {url}

Content:
{exa_content[:12000]}"""

        glm_response = session.post(
            "https://api.z.ai/api/coding/paas/v4/chat/completions",
            headers={
                "Authorization": f"Bearer {zai_api_token}",
                "Content-Type": "application/json",
            },
            json={
                "model": "glm-4.6",
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_message}
                ],
                # Low temperature for deterministic extraction output.
                "temperature": 0.1,
                "max_tokens": 4000
            },
            timeout=90.0  # Increased timeout for GLM processing
        )

        if glm_response.status_code != 200:
            print(f"GLM API error for {url}: {glm_response.status_code}")
            return None

        result = glm_response.json()
        # OpenAI-compatible response shape: choices[0].message.content.
        glm_content = result.get('choices', [{}])[0].get('message', {}).get('content', '')

        if not glm_content:
            print(f"Empty GLM response for {url}")
            return None

        # Try to parse JSON from response using three fallbacks, strictest
        # first. First try: direct JSON parse.
        try:
            structured = json.loads(glm_content)
            return structured
        except json.JSONDecodeError:
            pass

        # Second try: extract from markdown code block
        json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', glm_content)
        if json_match:
            try:
                structured = json.loads(json_match.group(1))
                return structured
            except json.JSONDecodeError:
                pass

        # Third try: find JSON object in text (greedy first-{ to last-})
        json_match = re.search(r'\{[\s\S]*\}', glm_content)
        if json_match:
            try:
                structured = json.loads(json_match.group())
                return structured
            except json.JSONDecodeError:
                pass

        print(f"Could not parse GLM response as JSON for {url}")
        return None

    except httpx.RequestError as e:
        print(f"HTTP error calling GLM for {url}: {e}")
        return None
    except Exception as e:
        # Catch-all so a single annotation failure never aborts a batch run.
        print(f"Exception in PiCo annotation for {url}: {e}")
        return None
|
|
|
|
|
|
def enhance_with_glm(profile_data: Dict, content: str, url: str, session: httpx.Client) -> Dict:
    """DEPRECATED: Use structure_with_pico_annotator instead.

    Kept for backward compatibility. Delegates to the PiCo annotator and
    returns its 'profile_data' payload when present; otherwise returns the
    caller-supplied profile_data unchanged.
    """
    annotated = structure_with_pico_annotator(content, url, session)
    if not annotated:
        return profile_data
    return annotated.get("profile_data") or profile_data
|
|
|
|
|
|
def save_profile(slug: str, profile_data: Dict, source_url: str, entity_dir: Path) -> Optional[str]:
    """Persist an Exa profile payload (plus optional PiCo annotation) to disk.

    The complete raw Exa API response is stored alongside extraction metadata,
    preserving all source data while adding semantic structure when available.

    Returns:
        The filename written into entity_dir, or None when the payload is
        missing or has no results.
    """
    exa_response = profile_data.get("exa_raw_response")
    if not exa_response:
        print(f"Skipping save for {slug} - no Exa response data")
        return None

    # An empty results list means nothing usable was fetched.
    if not exa_response.get("results"):
        print(f"Skipping save for {slug} - empty results")
        return None

    # PiCo annotation is optional; its presence changes the recorded method.
    has_pico = profile_data.get("pico_structured") is not None
    method = "exa_contents_raw+pico_glm" if has_pico else "exa_contents_raw"

    record = {
        "extraction_metadata": {
            "source_file": "staff_parsing",
            "staff_id": f"{slug}_profile",
            "extraction_date": datetime.now(timezone.utc).isoformat(),
            "extraction_method": method,
            "extraction_agent": "",
            "linkedin_url": source_url,
            # Exa reports cost per request; default to 0 when absent.
            "cost_usd": exa_response.get("costDollars", {}).get("total", 0),
            # Fall back to a deterministic hash when Exa gave no request id.
            "request_id": exa_response.get("requestId", hashlib.md5(source_url.encode()).hexdigest()),
        },
        "exa_raw_response": exa_response,
    }
    if has_pico:
        record["pico_structured"] = profile_data["pico_structured"]

    filename = generate_filename(slug)
    with open(entity_dir / filename, 'w', encoding='utf-8') as out:
        json.dump(record, out, indent=2, ensure_ascii=False)

    return filename
|
|
|
|
|
|
def load_existing_profiles(entity_dir: Path) -> Set[str]:
    """Return the set of profile slugs already saved under entity_dir.

    Creates the directory (and parents) when it does not exist yet. Only
    filenames shaped like '<slug>_<YYYYMMDD>T<HHMMSS>Z.json' contribute a
    slug; other JSON files are ignored.
    """
    if not entity_dir.exists():
        entity_dir.mkdir(parents=True, exist_ok=True)
        return set()

    # Compile once; matched against every *.json filename in the directory.
    name_pattern = re.compile(r"([a-zA-Z0-9\-]+)_\d{8}T\d{6}Z\.json")
    slugs: Set[str] = set()
    for candidate in entity_dir.glob("*.json"):
        hit = name_pattern.match(candidate.name)
        if hit:
            slugs.add(hit.group(1))
    return slugs
|
|
|
|
|
|
def load_all_staff_files(staff_dirs: List[str]) -> List[str]:
    """Collect LinkedIn URLs from every staff JSON file in the given directories.

    Missing directories and unreadable files are reported and skipped.

    Returns:
        Unique LinkedIn URLs in first-seen order.
    """
    collected: List[str] = []
    file_count = 0

    for staff_dir in staff_dirs:
        root = Path(staff_dir)
        if not root.exists():
            print(f"Warning: Directory not found: {staff_dir}")
            continue

        for json_file in root.glob("*.json"):
            file_count += 1
            try:
                with open(json_file, 'r', encoding='utf-8') as fh:
                    staff_data = json.load(fh)
                collected.extend(extract_linkedin_urls(staff_data))
            except Exception as e:
                # A single bad file must not abort the whole scan.
                print(f"Error loading {json_file}: {e}")

    # dict preserves insertion order (3.7+), so this dedupes in-order.
    unique_urls = list(dict.fromkeys(collected))

    print(f"\nProcessed {file_count} staff files")
    print(f"Found {len(collected)} total LinkedIn URLs")
    print(f"Found {len(unique_urls)} unique LinkedIn URLs")

    return unique_urls
|
|
|
|
|
|
def process_person(url: str, session: httpx.Client, existing_profiles: Set[str], entity_dir: Path, use_pico: bool = True, source_type: str = "modern_digital") -> Tuple[str, bool, Optional[str]]:
    """Fetch, optionally annotate, and save one LinkedIn profile.

    Args:
        url: LinkedIn profile URL
        session: HTTP client session
        existing_profiles: Set of already-processed profile slugs
        entity_dir: Directory to save profiles
        use_pico: Whether to use PiCo annotator for structuring
        source_type: Source category for PiCo annotation

    Returns:
        (url, succeeded, detail) — detail is the saved filename on success,
        a short reason string on a known failure, or None when fetching failed.
    """
    slug = extract_slug_from_url(url)
    if not slug:
        return url, False, "No slug found"

    # Skip profiles we already have on disk.
    if slug in existing_profiles:
        return url, False, "Already exists"

    # Rate limiting - 2 second delay between requests to avoid overloading APIs
    time.sleep(2)

    # Fetch raw Exa response (plus optional PiCo structured data).
    profile_data = fetch_profile_with_exa(url, session, use_pico_annotator=use_pico, source_type=source_type)
    if not profile_data:
        return url, False, None

    filename = save_profile(slug, profile_data, url, entity_dir)
    if filename:
        return url, True, filename
    return url, False, "Save failed"
|
|
|
|
|
|
def main():
    """Main entry point.

    Parses CLI args, discovers LinkedIn URLs from staff files, filters out
    profiles already on disk, fetches the remainder sequentially, and writes
    a per-run result log.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(
        description="Fetch LinkedIn profiles and optionally structure with PiCo/CH-Annotator"
    )
    parser.add_argument(
        "batch_size",
        type=int,
        nargs="?",
        default=None,
        help="Number of profiles to process (omit for interactive mode)"
    )
    parser.add_argument(
        "--no-pico",
        action="store_true",
        help="Disable PiCo/CH-Annotator structuring (only save raw Exa data)"
    )
    parser.add_argument(
        "--source-type",
        type=str,
        choices=["modern_digital", "historical_indices", "archival_descriptions", "biographical_dictionaries"],
        default="modern_digital",
        help="Source type category for PiCo annotation (default: modern_digital)"
    )
    args = parser.parse_args()

    batch_size = args.batch_size
    use_pico = not args.no_pico
    source_type = args.source_type

    if batch_size is not None:
        print(f"Will process first {batch_size} profiles")

    if use_pico:
        print(f"PiCo/CH-Annotator enabled (source_type: {source_type})")
    else:
        print("PiCo/CH-Annotator disabled (raw mode only)")

    # Check for required API keys — Exa is mandatory, so exit early.
    if not os.environ.get('EXA_API_KEY'):
        print("Error: EXA_API_KEY environment variable not set")
        print("Please set it in your .env file or environment")
        sys.exit(1)

    # ZAI_API_TOKEN required for PiCo annotation; its absence only warns
    # because the annotator skips itself gracefully at call time.
    if use_pico and not os.environ.get('ZAI_API_TOKEN'):
        print("Warning: ZAI_API_TOKEN not set - PiCo annotation will be skipped")
        print("Set ZAI_API_TOKEN or use --no-pico to disable PiCo annotation")
    elif not os.environ.get('ZAI_API_TOKEN'):
        print("Note: ZAI_API_TOKEN not set (not needed in --no-pico mode)")

    # Setup paths (NOTE(review): machine-specific absolute paths)
    entity_dir = Path("/Users/kempersc/apps/glam/data/custodian/person/entity")

    # Load existing profiles
    print("Loading existing profiles...")
    existing_profiles = load_existing_profiles(entity_dir)
    print(f"Found {len(existing_profiles)} existing profiles")

    # Path to the staff directory
    staff_dir = "/Users/kempersc/apps/glam/data/custodian/person/affiliated/parsed"

    # Load all staff files
    print("\n" + "="*60)
    print("LOADING STAFF FILES")
    print("="*60)
    urls = load_all_staff_files([staff_dir])

    if not urls:
        print("No LinkedIn URLs found to process.")
        return

    # Filter out URLs we already have
    new_urls = []
    for url in urls:
        slug = extract_slug_from_url(url)
        if slug and slug not in existing_profiles:
            new_urls.append(url)

    print(f"\nNeed to fetch {len(new_urls)} new profiles")
    print(f"Skipping {len(urls) - len(new_urls)} already existing profiles")

    if not new_urls:
        print("\nAll profiles already exist!")
        return

    # Determine how many to process
    if batch_size is not None:
        # Use command line batch size
        new_urls = new_urls[:batch_size]
        print(f"Processing first {len(new_urls)} profiles (batch size: {batch_size})...")
    else:
        # Ask user interactively; any parse error or Ctrl-C means "all".
        print(f"\nThere are {len(new_urls)} new profiles to fetch.")
        print("Enter number to process (or press Enter for all): ", end="")
        try:
            response = input()
            if response.strip():
                limit = int(response.strip())
                new_urls = new_urls[:limit]
                print(f"Processing first {len(new_urls)} profiles...")
            else:
                print("Processing all profiles...")
        except (ValueError, KeyboardInterrupt, EOFError):
            print("\nProcessing all profiles...")

    # Process with threading (executor kept for structure even though
    # max_workers=1 makes fetching effectively sequential)
    success_count = 0
    failed_count = 0
    results = []

    print("\n" + "="*60)
    print("FETCHING PROFILES")
    print("="*60)

    with httpx.Client(timeout=90.0) as session:  # Increased timeout to 90s
        with ThreadPoolExecutor(max_workers=1) as executor:  # Reduced to 1 to avoid rate limits
            # Submit all tasks
            future_to_url = {
                executor.submit(process_person, url, session, existing_profiles, entity_dir, use_pico, source_type): url
                for url in new_urls
            }

            # Process with progress bar
            with tqdm(total=len(new_urls), desc="Fetching profiles") as pbar:
                for future in as_completed(future_to_url):
                    url, success, result = future.result()
                    if success:
                        success_count += 1
                        results.append(f"✓ {url} -> {result}")
                    else:
                        failed_count += 1
                        results.append(f"✗ {url} -> {result}")
                    pbar.update(1)

    # Save results log in the current working directory.
    log_filename = f"fetch_log_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.txt"
    with open(log_filename, 'w') as f:
        f.write(f"LinkedIn Profile Fetch Results\n")
        f.write(f"Timestamp: {datetime.now(timezone.utc).isoformat()}\n")
        f.write(f"Total attempted: {len(new_urls)}\n")
        f.write(f"Successful: {success_count}\n")
        f.write(f"Failed: {failed_count}\n")
        f.write(f"\n" + "="*60 + "\n")
        for result in results:
            f.write(result + "\n")

    print("\n" + "="*60)
    print("RESULTS")
    print("="*60)
    print(f"Successfully fetched: {success_count}")
    print(f"Failed: {failed_count}")
    print(f"Profiles saved to: {entity_dir}")
    print(f"Results log: {log_filename}")

    # Show some successful profiles
    if success_count > 0:
        print(f"\nFirst 5 successfully fetched profiles:")
        for i, result in enumerate([r for r in results if r.startswith("✓")][:5]):
            print(f"  {i+1}. {result}")
|
|
|
|
|
# Script entry point; guard keeps main() from running on import.
if __name__ == "__main__":
    main()