glam/scripts/enrich_linkedin_profiles_linkup.py
2025-12-16 11:57:34 +01:00

289 lines
9.4 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Batch LinkedIn Profile Enrichment using Linkup API
This script processes heritage professional profiles that have empty experience[] arrays
and enriches them using the Linkup API for web search.
Usage:
python scripts/enrich_linkedin_profiles_linkup.py [--limit N] [--dry-run]
Environment:
LINKUP_API_KEY - Required API key for Linkup
"""
import json
import os
import sys
import argparse
import time
from datetime import datetime, timezone
from pathlib import Path
import httpx
# Configuration
# Directory of per-person entity JSON profiles (read and rewritten in place).
ENTITY_DIR = Path("data/custodian/person/entity")
# Directory where raw Linkup search results are archived, one subdir per slug.
LINKUP_DIR = Path("data/custodian/web/linkedin")
# Linkup web-search endpoint (POST with bearer-token auth).
LINKUP_API_URL = "https://api.linkup.so/v1/search"
# Heritage custodian keywords for filtering (Dutch and English terms,
# matched as case-insensitive substrings of the custodian name).
HERITAGE_KEYWORDS = [
    "museum", "archief", "bibliotheek", "erfgoed", "collectie",
    "monumenten", "rijks", "nationaal", "koninklijk", "gallery",
    "archive", "library", "heritage", "collection"
]
# Non-heritage organizations to skip; a substring match on any of these
# disqualifies a custodian even when a heritage keyword also matches.
NON_HERITAGE_ORGS = [
    "vvd", "pvda", "cda", "d66", "groenlinks", "pvv", "bbb",  # political parties
    "politie", "justitie", "defensie",  # law enforcement/military
    "inspectie", "autoriteit",  # regulatory bodies
    "ministerie",  # ministries (unless heritage-specific)
    "belastingdienst", "uwv", "svb",  # government services
]
def load_api_key() -> str:
    """Return the Linkup API key from the environment or a local .env file.

    Raises:
        ValueError: if LINKUP_API_KEY cannot be found in either place.
    """
    api_key = os.environ.get("LINKUP_API_KEY")
    if api_key:
        return api_key
    # Fall back to a .env file in the current working directory.
    dotenv = Path(".env")
    if dotenv.exists():
        with open(dotenv) as fh:
            for raw_line in fh:
                if raw_line.startswith("LINKUP_API_KEY="):
                    # Keep only the value; drop surrounding whitespace/quotes.
                    api_key = raw_line.split("=", 1)[1].strip().strip('"\'')
                    break
    if not api_key:
        raise ValueError("LINKUP_API_KEY not found in environment or .env file")
    return api_key
def is_heritage_custodian(custodian: str) -> bool:
    """Heuristically decide whether *custodian* names a heritage institution.

    An empty name, or any substring match against NON_HERITAGE_ORGS, yields
    False; otherwise the name must contain at least one HERITAGE_KEYWORDS
    entry (all comparisons case-insensitive).
    """
    if not custodian:
        return False
    lowered = custodian.lower()
    # Exclusion list wins over heritage keywords.
    if any(org in lowered for org in NON_HERITAGE_ORGS):
        return False
    return any(keyword in lowered for keyword in HERITAGE_KEYWORDS)
def find_candidates(limit: int | None = None) -> list[Path]:
    """Find profile JSON files that still need Linkup enrichment.

    A candidate must: (1) not already be marked linkup/enriched in its
    extraction metadata, (2) have an empty profile_data.experience list,
    and (3) belong to a heritage custodian per is_heritage_custodian().

    Args:
        limit: stop after collecting this many candidates (None/0 = no limit).

    Returns:
        Matching file paths in sorted (deterministic) order.
    """
    candidates: list[Path] = []
    # sorted() makes runs reproducible; glob order is filesystem-dependent.
    for json_file in sorted(ENTITY_DIR.glob("*_20251214T*.json")):
        try:
            with open(json_file) as f:
                data = json.load(f)
            # Skip profiles already enriched by a previous run.
            method = data.get("extraction_metadata", {}).get("extraction_method", "")
            if "linkup" in method.lower() or "enriched" in method.lower():
                continue
            # Only profiles with no experience entries need enrichment.
            exp = data.get("profile_data", {}).get("experience", [])
            if len(exp) > 0:
                continue
            # Restrict to heritage institutions.
            custodian = data.get("source_staff_info", {}).get("custodian", "")
            if not is_heritage_custodian(custodian):
                continue
            candidates.append(json_file)
            if limit and len(candidates) >= limit:
                break
        except (json.JSONDecodeError, KeyError, OSError) as e:
            # Best-effort scan: report unreadable/malformed files and move on.
            # OSError added so one unreadable file can't abort the whole scan.
            print(f"Warning: Error reading {json_file}: {e}", file=sys.stderr)
            continue
    return candidates
def search_linkup(api_key: str, query: str, depth: str = "standard") -> dict:
"""Search using Linkup API."""
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"q": query,
"depth": depth,
"outputType": "searchResults"
}
try:
with httpx.Client(timeout=60.0) as client:
response = client.post(LINKUP_API_URL, json=payload, headers=headers)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
print(f"Linkup API error: {e}", file=sys.stderr)
return {"error": str(e)}
def build_search_query(profile_data: dict) -> str:
    """Build a web-search query string from a profile's staff info.

    Combines the person's name and custodian with up to two significant
    terms (length > 3, not a Dutch/English stopword) taken from the
    headline, in their original order.

    Args:
        profile_data: full profile dict; only its "source_staff_info"
            block is read.

    Returns:
        Space-joined query string. Empty fields are omitted, so a missing
        name or custodian no longer produces leading/doubled spaces.
    """
    staff = profile_data.get("source_staff_info", {})
    name = staff.get("name", "")
    custodian = staff.get("custodian", "")
    headline = staff.get("headline", "")
    query_parts = [name, custodian]
    if headline:
        # Extract up to two role-related terms from the headline.
        stopwords = {"bij", "van", "het", "een", "and", "the"}
        role_terms = []
        for term in headline.split():
            if len(term) > 3 and term.lower() not in stopwords:
                role_terms.append(term)
            if len(role_terms) >= 2:
                break
        query_parts.extend(role_terms)
    # Drop empty parts so missing fields don't inject stray whitespace.
    return " ".join(part for part in query_parts if part)
def enrich_profile(json_file: Path, api_key: str, dry_run: bool = False) -> dict:
    """Enrich a single profile JSON file with Linkup search results.

    Runs the search, archives the raw results under LINKUP_DIR/<slug>/,
    and stamps the profile with enrichment metadata (rewritten in place).

    Args:
        json_file: path to the profile JSON file.
        api_key: Linkup bearer token (may be None when dry_run is True).
        dry_run: if True, only print the query; no API call, no writes.

    Returns:
        Status dict whose "status" is "dry_run", "error", or "success".
    """
    with open(json_file) as f:
        data = json.load(f)
    name = data.get("source_staff_info", {}).get("name", "Unknown")
    custodian = data.get("source_staff_info", {}).get("custodian", "Unknown")
    print(f"\nProcessing: {name} @ {custodian}")
    query = build_search_query(data)
    print(f" Query: {query}")
    if dry_run:
        print(" [DRY RUN] Would search Linkup API")
        return {"status": "dry_run", "file": str(json_file)}
    # Search Linkup; search_linkup() reports transport errors as {"error": ...}.
    results = search_linkup(api_key, query, depth="standard")
    if "error" in results:
        print(f" Error: {results['error']}")
        return {"status": "error", "file": str(json_file), "error": results["error"]}
    # Archive raw results under LINKUP_DIR/<slug>/ (slug = filename prefix).
    slug = json_file.stem.split("_")[0]
    linkup_dir = LINKUP_DIR / slug
    linkup_dir.mkdir(parents=True, exist_ok=True)
    # NOTE(review): filename uses local time while the metadata below uses
    # UTC — kept as-is to preserve existing file naming.
    results_file = linkup_dir / f"linkup_search_{datetime.now().strftime('%Y%m%dT%H%M%S')}.json"
    with open(results_file, "w") as f:
        json.dump({"query": query, "results": results}, f, indent=2)
    print(f" Saved results to: {results_file}")
    # Update profile metadata. setdefault guards profiles that lack the
    # extraction_metadata block entirely (find_candidates tolerates those
    # via .get(), so indexing here used to raise KeyError).
    meta = data.setdefault("extraction_metadata", {})
    meta["extraction_method"] = "fallback_basic_linkup_enriched"
    meta["notes"] = (
        meta.get("notes", "") +
        f" Linkup search performed on {datetime.now(timezone.utc).strftime('%Y-%m-%d')}."
    ).strip()
    # Add linkup_enrichment block
    data["linkup_enrichment"] = {
        "enrichment_date": datetime.now(timezone.utc).isoformat(),
        "enrichment_method": "linkup_search_standard",
        "enrichment_agent": "enrich_linkedin_profiles_linkup.py",
        "queries_used": [query],
        "results_file": str(results_file),
        "confidence_score": 0.5,  # Base score, needs manual review
        "notes": "Automated search - requires manual review and scoring"
    }
    # Save updated profile
    with open(json_file, "w") as f:
        json.dump(data, f, indent=2)
    print(f" Updated profile: {json_file}")
    return {
        "status": "success",
        "file": str(json_file),
        "results_file": str(results_file),
        "num_results": len(results.get("results", []))
    }
def main():
    """CLI entry point: locate candidate profiles and enrich each in turn."""
    parser = argparse.ArgumentParser(description="Batch LinkedIn profile enrichment using Linkup API")
    parser.add_argument("--limit", type=int, default=10, help="Maximum profiles to process (default: 10)")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be done without making API calls")
    parser.add_argument("--delay", type=float, default=1.0, help="Delay between API calls in seconds (default: 1.0)")
    args = parser.parse_args()

    print("LinkedIn Profile Enrichment using Linkup API")
    print("=" * 50)

    # An API key is only required for live runs.
    api_key = None
    if not args.dry_run:
        try:
            api_key = load_api_key()
        except ValueError as e:
            print(f"Error: {e}", file=sys.stderr)
            sys.exit(1)
        print(f"API key loaded (length: {len(api_key)})")

    # Find candidates
    print(f"\nFinding candidates (limit: {args.limit})...")
    candidates = find_candidates(limit=args.limit)
    print(f"Found {len(candidates)} candidates for enrichment")
    if not candidates:
        print("No candidates found. Exiting.")
        return

    # Process each candidate, pausing between live API calls.
    outcomes = []
    total = len(candidates)
    for idx, path in enumerate(candidates, 1):
        print(f"\n[{idx}/{total}]", end="")
        outcomes.append(enrich_profile(path, api_key, dry_run=args.dry_run))
        if not args.dry_run and idx < total:
            time.sleep(args.delay)

    # Summary
    print("\n" + "=" * 50)
    print("SUMMARY")
    print("=" * 50)
    tally = {"success": 0, "error": 0, "dry_run": 0}
    for outcome in outcomes:
        status = outcome["status"]
        tally[status] = tally.get(status, 0) + 1
    print(f"Processed: {len(outcomes)}")
    print(f" Success: {tally['success']}")
    print(f" Errors: {tally['error']}")
    if tally["dry_run"]:
        print(f" Dry run: {tally['dry_run']}")
if __name__ == "__main__":
    main()