#!/usr/bin/env python3
"""
Comprehensive Person Profile Enrichment via Linkup Web Search

This script enriches person profiles with ALL discoverable data from web
sources, with FULL PROVENANCE for every claim. No data is stored without a
verifiable source.

Rule Compliance:
- Rule 6: WebObservation Claims MUST Have XPath Provenance (adapted for web search)
- Rule 21: Data Fabrication is Strictly Prohibited
- Rule 26: Person Data Provenance - Web Claims for Staff Information
- Rule 34: Linkup is the Preferred Web Scraper
- Rule 35: Provenance Statements MUST Have Dual Timestamps

Data Extracted (when available):
- Birth date/year
- Birth location
- Education history
- Career milestones
- Publications
- Awards/honors
- Professional affiliations
- Death date (if applicable)

Usage:
    python scripts/enrich_person_comprehensive.py --limit N [--dry-run]
"""

import argparse
import json
import os
import re
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

# NOTE: the third-party `httpx` package is imported lazily inside
# search_linkup() (its only user) so the pure regex-extraction helpers in
# this module can be imported and unit-tested without the HTTP stack.

# Constants
LINKUP_API_URL = "https://api.linkup.so/v1/search"
SCRIPT_VERSION = "1.0.0"


def get_linkup_api_key() -> str:
    """Return the Linkup API key from ../.env or the process environment.

    Raises:
        ValueError: if no key is found in either location.
    """
    env_path = Path(__file__).parent.parent / ".env"
    if env_path.exists():
        with open(env_path, encoding="utf-8") as f:
            for line in f:
                if line.startswith("LINKUP_API_KEY="):
                    # Strip surrounding single/double quotes if present.
                    return line.strip().split("=", 1)[1].strip('"\'')
    key = os.environ.get("LINKUP_API_KEY", "")
    if not key:
        raise ValueError("LINKUP_API_KEY not found")
    return key


def search_linkup(query: str, api_key: str, depth: str = "standard") -> Dict[str, Any]:
    """Execute one Linkup search query.

    Returns the parsed JSON response, or ``{"error": "<message>"}`` on any
    transport/HTTP failure. This is a deliberate best-effort boundary:
    callers check for the "error" key rather than handling exceptions.
    """
    # Deferred import: a missing httpx raises ImportError here, loudly,
    # instead of being swallowed by the best-effort except below.
    import httpx

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {"q": query, "depth": depth, "outputType": "sourcedAnswer"}
    try:
        with httpx.Client(timeout=45.0) as client:
            response = client.post(LINKUP_API_URL, headers=headers, json=payload)
            response.raise_for_status()
            return response.json()
    except Exception as e:  # noqa: BLE001 - best-effort network boundary
        return {"error": str(e)}


def create_web_claim(
    claim_type: str,
    claim_value: Any,
    source_url: str,
    source_title: str,
    source_snippet: str,
    search_query: str,
) -> Dict[str, Any]:
    """Create a web claim with full provenance per Rules 6, 26, 35.

    CRITICAL: Every claim MUST have verifiable source information.
    NO confidence scores - provenance is the only measure of quality.
    """
    timestamp = datetime.now(timezone.utc).isoformat()
    return {
        "claim_type": claim_type,
        "claim_value": claim_value,
        "provenance": {
            "statement_created_at": timestamp,
            "source_archived_at": timestamp,  # Web search result is ephemeral
            "retrieval_agent": f"enrich_person_comprehensive.py v{SCRIPT_VERSION}",
            "retrieval_method": "linkup_web_search",
            "search_query": search_query,
            "source_url": source_url,
            "source_title": source_title,
            "source_snippet": source_snippet,
            "extraction_method": "regex_pattern_matching",
            "verified": False,  # Requires human verification
            "verification_status": "machine_extracted",
        },
    }


def extract_birth_year(text: str) -> Optional[Dict[str, Any]]:
    """Extract a birth year with a context snippet, or None.

    Patterns are ordered by specificity; the first acceptable match wins.
    The bare "(YYYY)" pattern only accepts years before 1990 to avoid
    false positives from position tenure dates like "(2001-2014)".
    """
    if not text:
        return None
    patterns = [
        # "born on 7 September 1968" or "born 7 September 1968" (day before month)
        (r'born\s+(?:on\s+)?(\d{1,2}\s+\w+\s+)?(\d{4})', "full_date"),
        # "born on September 28, 1954" (US format: month before day)
        (r'born\s+(?:on\s+)?(\w+\s+\d{1,2},?\s+)(\d{4})', "us_date"),
        # "was born in 1968" or "born in 1968"
        (r'(?:was\s+)?born\s+in\s+(\d{4})', "born_in_year"),
        # "geboren in 1968" (Dutch)
        (r'geboren\s+(?:in\s+)?(\d{4})', "dutch"),
        # "(born 1968)"
        (r'\(born\s+(\d{4})\)', "parenthetical"),
        # "(1960)" alone - only years before 1990 (see docstring)
        (r'\((\d{4})\)', "year_only_paren"),
    ]
    for pattern, pattern_type in patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match and match.lastindex is not None:
            # The last *matched* group is always the year.
            year = int(match.group(match.lastindex))
            if not (1900 <= year <= 2010):
                continue
            if pattern_type == "year_only_paren" and year >= 1990:
                continue
            start = max(0, match.start() - 40)
            end = min(len(text), match.end() + 40)
            return {
                "year": year,
                "snippet": text[start:end].strip(),
                "pattern_type": pattern_type,
            }
    return None


def extract_birth_location(text: str) -> Optional[Dict[str, Any]]:
    """Extract a birth location with a context snippet, or None."""
    patterns = [
        r'born\s+in\s+([A-Z][a-zA-Z\s]+(?:,\s*[A-Z][a-zA-Z\s]+)?)',
        r'geboren\s+(?:te|in)\s+([A-Z][a-zA-Z\s]+)',  # Dutch
        r'native\s+of\s+([A-Z][a-zA-Z\s]+)',
    ]
    for pattern in patterns:
        match = re.search(pattern, text)
        if match:
            location = match.group(1).strip()
            # Filter out common false positives
            if location.lower() not in ['the', 'a', 'an', 'new']:
                start = max(0, match.start() - 30)
                end = min(len(text), match.end() + 30)
                return {
                    "location": location,
                    "snippet": text[start:end].strip(),
                }
    return None


# Which regex group holds the institution / year for each education pattern.
# (The "graduated" year is group 2, NOT group 3 - reading group 3 for every
# pattern silently dropped all graduation years.)
_EDU_INSTITUTION_GROUP = {"phd": 2, "masters": 2, "graduated": 1, "studied": 1}
_EDU_YEAR_GROUP = {"phd": 3, "graduated": 2}


def extract_education(text: str) -> List[Dict[str, Any]]:
    """Extract education entries: type, institution, optional year, snippet.

    The phd/graduated patterns anchor the institution with a lookahead
    `(?=[,\\.]|$)`; without it the lazy quantifier matched minimally and
    truncated institutions to two characters (e.g. "Le" for "Leiden
    University").
    """
    education: List[Dict[str, Any]] = []
    patterns = [
        # "PhD from University X in 1995"
        (r'(Ph\.?D\.?|doctorate|doctoral)\s+(?:degree\s+)?(?:from|at)\s+'
         r'([A-Z][^,\.]+?)(?:\s+in\s+(\d{4}))?(?=[,\.]|$)', "phd"),
        # "master's degree from University X"
        (r"(master'?s?|M\.?A\.?|M\.?Sc\.?)\s+(?:degree\s+)?(?:from|at)\s+([A-Z][^,\.]+)", "masters"),
        # "graduated from University X in 1995"
        (r'graduated\s+from\s+([A-Z][^,\.]+?)(?:\s+(?:in|with)\s+)?(\d{4})?(?=[,\.]|$)', "graduated"),
        # "studied at University X"
        (r'studied\s+(?:\w+\s+)?at\s+([A-Z][^,\.]+)', "studied"),
    ]
    for pattern, edu_type in patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            institution = match.group(_EDU_INSTITUTION_GROUP[edu_type])
            year = None
            year_group = _EDU_YEAR_GROUP.get(edu_type)
            if year_group is not None:
                raw_year = match.group(year_group)
                if raw_year:
                    try:
                        year = int(raw_year)
                    except (ValueError, TypeError):
                        pass
            start = max(0, match.start() - 20)
            end = min(len(text), match.end() + 20)
            education.append({
                "type": edu_type,
                "institution": institution.strip(),
                "year": year,
                "snippet": text[start:end].strip(),
            })
    return education


def extract_positions(text: str) -> List[Dict[str, Any]]:
    """Extract professional positions: title, organization, optional year.

    Pattern notes:
    - "titled" patterns use a lazy org capture plus `(?=[,\\.]|$)` so that
      trailing "since YYYY" / "in YYYY" is not swallowed into the org name.
    - "worked_at" groups are (org, start_year, end_year); they were
      previously mis-mapped as (title, org, year).
    """
    positions: List[Dict[str, Any]] = []
    patterns = [
        # "professor at University X since 2010"
        (r'(professor|director|curator|head|chief)\s+(?:of\s+\w+\s+)?(?:at|of)\s+'
         r'([A-Z][^,\.]{3,50}?)(?:\s+since\s+(\d{4}))?(?=[,\.]|$)', "titled"),
        # "assistant professor at University X" (keep the "assistant" rank)
        (r'(assistant\s+professor)\s+(?:at|of)\s+([A-Z][^,\.]{3,50})', "titled"),
        # "appointed professor at X in 2015"
        (r'appointed\s+(\w+)\s+(?:at\s+)?([A-Z][^,\.]{3,50}?)(?:\s+in\s+(\d{4}))?(?=[,\.]|$)', "titled"),
        # "worked at X from 1990 to 2000"
        (r'worked\s+at\s+([A-Z][^,\.]{3,50})\s+from\s+(\d{4})\s+to\s+(\d{4})', "worked_at"),
    ]
    for pattern, tag in patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            if tag == "worked_at":
                title = None
                organization = match.group(1).strip()
                year = int(match.group(2))  # start of tenure
            else:
                title = match.group(1)
                org_group = match.group(2)
                organization = org_group.strip() if org_group else None
                year = None
                # Not every titled pattern defines a year group.
                if match.re.groups >= 3 and match.group(3):
                    try:
                        year = int(match.group(3))
                    except (ValueError, TypeError):
                        pass
            start = max(0, match.start() - 20)
            end = min(len(text), match.end() + 20)
            positions.append({
                "title": title,
                "organization": organization,
                "year": year,
                "snippet": text[start:end].strip(),
            })
    return positions


def extract_death_info(text: str) -> Optional[Dict[str, Any]]:
    """Extract a death year if the person is deceased, or None."""
    patterns = [
        # "(?:in\s+)?" added: the common phrasing "died in 1944" never
        # matched without it.
        r'died\s+(?:on\s+)?(?:in\s+)?(?:\d{1,2}\s+\w+\s+)?(\d{4})',
        # Lifespan "(1900-1980)"; may false-positive on tenure ranges.
        r'\(\d{4}\s*[-\u2013]\s*(\d{4})\)',
        r'passed\s+away\s+(?:in\s+)?(\d{4})',
        r'overleden\s+(?:in\s+)?(\d{4})',  # Dutch
    ]
    for pattern in patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            year = int(match.group(1))
            if 1900 <= year <= datetime.now().year:
                start = max(0, match.start() - 30)
                end = min(len(text), match.end() + 30)
                return {
                    "year": year,
                    "snippet": text[start:end].strip(),
                }
    return None


def _primary_source(result: Dict[str, Any]) -> tuple:
    """Return (url, title) of the first source of a Linkup result, or ('','')."""
    sources = result.get("sources", [])
    if sources:
        return sources[0].get("url", ""), sources[0].get("name", "")
    return "", ""


def enrich_person(name: str, context: str, api_key: str) -> Dict[str, Any]:
    """Comprehensively enrich a person profile using two Linkup searches.

    Returns {"web_claims": [...], "enrichment_metadata": {...}}; every claim
    carries full provenance (Rules 21/26/35).
    """
    enrichment: Dict[str, Any] = {
        "web_claims": [],
        "enrichment_metadata": {
            "enrichment_timestamp": datetime.now(timezone.utc).isoformat(),
            "enrichment_agent": f"enrich_person_comprehensive.py v{SCRIPT_VERSION}",
            "person_name": name,
            "context_used": context[:100] if context else None,
            "searches_performed": [],
            "data_fabrication_check": "PASSED - All claims have source provenance",
        },
    }

    # Search 1: Biography / birth info
    query1 = f'"{name}" born biography'
    result1 = search_linkup(query1, api_key)
    enrichment["enrichment_metadata"]["searches_performed"].append(query1)

    if "error" not in result1:
        answer = result1.get("answer", "")
        source_url, source_title = _primary_source(result1)
        if answer:
            birth_info = extract_birth_year(answer)
            if birth_info:
                enrichment["web_claims"].append(create_web_claim(
                    claim_type="birth_year",
                    claim_value=birth_info["year"],
                    source_url=source_url,
                    source_title=source_title,
                    source_snippet=birth_info["snippet"],
                    search_query=query1,
                ))
            birth_loc = extract_birth_location(answer)
            if birth_loc:
                enrichment["web_claims"].append(create_web_claim(
                    claim_type="birth_location",
                    claim_value=birth_loc["location"],
                    source_url=source_url,
                    source_title=source_title,
                    source_snippet=birth_loc["snippet"],
                    search_query=query1,
                ))
            death_info = extract_death_info(answer)
            if death_info:
                enrichment["web_claims"].append(create_web_claim(
                    claim_type="death_year",
                    claim_value=death_info["year"],
                    source_url=source_url,
                    source_title=source_title,
                    source_snippet=death_info["snippet"],
                    search_query=query1,
                ))

    time.sleep(1.0)  # polite pause between API calls

    # Search 2: Education / career
    query2 = f'"{name}" {context} education career university'
    result2 = search_linkup(query2, api_key)
    enrichment["enrichment_metadata"]["searches_performed"].append(query2)

    if "error" not in result2:
        answer = result2.get("answer", "")
        source_url, source_title = _primary_source(result2)
        if answer:
            for edu in extract_education(answer):
                enrichment["web_claims"].append(create_web_claim(
                    claim_type="education",
                    claim_value={
                        "type": edu["type"],
                        "institution": edu["institution"],
                        "year": edu["year"],
                    },
                    source_url=source_url,
                    source_title=source_title,
                    source_snippet=edu["snippet"],
                    search_query=query2,
                ))
            for pos in extract_positions(answer):
                enrichment["web_claims"].append(create_web_claim(
                    claim_type="position",
                    claim_value={
                        "title": pos["title"],
                        "organization": pos["organization"],
                        "year": pos["year"],
                    },
                    source_url=source_url,
                    source_title=source_title,
                    source_snippet=pos["snippet"],
                    search_query=query2,
                ))

    return enrichment


def process_ppid_file(filepath: Path, api_key: str, dry_run: bool = False) -> Dict[str, Any]:
    """Enrich a single PPID JSON file in place.

    Returns a status dict whose "status" is one of "skipped",
    "no_claims_found", or "enriched" (with name/reason/claim details).
    """
    with open(filepath, encoding="utf-8") as f:
        data = json.load(f)

    # Resolve a usable display name
    name_data = data.get("name", {})
    full_name = name_data.get("full_name") or name_data.get("display_name", "")
    if not full_name or full_name == "LinkedIn Member":
        return {"status": "skipped", "reason": "no_valid_name"}

    # Skip non-heritage-relevant profiles
    heritage = data.get("heritage_relevance", {})
    if not heritage.get("is_heritage_relevant"):
        return {"status": "skipped", "reason": "not_heritage_relevant"}

    # Headline is used as disambiguation context in the search query
    profile = data.get("profile_data", {})
    headline = profile.get("headline", "")

    enrichment = enrich_person(full_name, headline, api_key)
    if not enrichment["web_claims"]:
        return {"status": "no_claims_found", "name": full_name}

    if not dry_run:
        # Merge claims, deduplicating on (claim_type, str(claim_value))
        data.setdefault("web_claims", [])
        existing_claims = {
            (c.get("claim_type"), str(c.get("claim_value")))
            for c in data["web_claims"]
        }
        for claim in enrichment["web_claims"]:
            key = (claim["claim_type"], str(claim["claim_value"]))
            if key not in existing_claims:
                data["web_claims"].append(claim)

        data.setdefault("enrichment_history", []).append(
            enrichment["enrichment_metadata"]
        )

        # Fill birth_date only when it is currently unknown or imprecise
        birth_claims = [c for c in enrichment["web_claims"]
                        if c["claim_type"] == "birth_year"]
        if birth_claims:
            # First claim wins; all carry provenance (no confidence scores)
            best_claim = birth_claims[0]
            current_birth = data.get("birth_date", {}).get("edtf", "XXXX")
            if current_birth == "XXXX" or current_birth.endswith("X"):
                data["birth_date"] = {
                    "edtf": str(best_claim["claim_value"]),
                    "precision": "year",
                    # FULL provenance copy (Rules 26/35), not a reference
                    "provenance": dict(best_claim["provenance"]),
                }

        # Mark deceased if any death claim was found
        if any(c["claim_type"] == "death_year" for c in enrichment["web_claims"]):
            data["is_living"] = False

        # utf-8 required: we serialize with ensure_ascii=False
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    return {
        "status": "enriched",
        "name": full_name,
        "claims_added": len(enrichment["web_claims"]),
        "claim_types": list({c["claim_type"] for c in enrichment["web_claims"]}),
    }


def _headline_score(headline: str) -> int:
    """Priority heuristic: roles likely to have public biographies score higher.

    Expects an already-lowercased headline.
    """
    weights = (
        ("professor", 3), ("director", 2), ("curator", 2), ("head of", 1),
        ("phd", 1), ("museum", 1), ("archive", 1), ("library", 1),
    )
    return sum(w for kw, w in weights if kw in headline)


def main():
    parser = argparse.ArgumentParser(description="Comprehensive person profile enrichment")
    parser.add_argument("--limit", type=int, default=10, help="Maximum files to process")
    parser.add_argument("--dry-run", action="store_true", help="Don't write changes")
    # NOTE: store_true with default=True means this filter is always on.
    parser.add_argument("--heritage-only", action="store_true", default=True)
    args = parser.parse_args()

    try:
        api_key = get_linkup_api_key()
        print("✓ Linkup API key loaded")
    except ValueError as e:
        print(f"✗ {e}")
        return

    ppid_dir = Path(__file__).parent.parent / "data" / "person"
    if not ppid_dir.exists():
        print(f"✗ PPID directory not found: {ppid_dir}")
        return

    # Find candidates with priority scoring
    ppid_files = list(ppid_dir.glob("ID_*.json"))
    print(f"Found {len(ppid_files)} PPID files")

    candidates = []
    for f in ppid_files:
        try:
            with open(f, encoding="utf-8") as fp:
                data = json.load(fp)
            if args.heritage_only:
                if not data.get("heritage_relevance", {}).get("is_heritage_relevant"):
                    continue
            # Prioritize profiles without web_claims or with unknown birth date
            has_claims = bool(data.get("web_claims"))
            birth_known = data.get("birth_date", {}).get("edtf", "XXXX") not in ["XXXX"]
            if not has_claims or not birth_known:
                name = data.get("name", {}).get("full_name", "")
                if name and name != "LinkedIn Member":
                    headline = data.get("profile_data", {}).get("headline", "").lower()
                    candidates.append((f, _headline_score(headline), name))
        except Exception:
            # Unreadable/malformed file: skip it, never abort the scan
            continue

    # Sort by priority score (highest first)
    candidates.sort(key=lambda x: -x[1])
    print(f"Found {len(candidates)} candidates for enrichment")
    if candidates:
        high_priority = sum(1 for _, s, _ in candidates if s >= 2)
        print(f"  High priority (score >= 2): {high_priority}")

    # Process
    stats = {"enriched": 0, "no_claims_found": 0, "skipped": 0, "errors": 0}
    results = []
    for i, (filepath, score, cand_name) in enumerate(candidates[:args.limit]):
        print(f"\n[{i+1}/{min(len(candidates), args.limit)}] {filepath.name} (score={score})")
        try:
            result = process_ppid_file(filepath, api_key, args.dry_run)
            status = result.get("status", "errors")
            stats[status] = stats.get(status, 0) + 1
            if result["status"] == "enriched":
                print(f"  ✓ Added {result['claims_added']} claims: {result['claim_types']}")
                results.append(result)
            elif result["status"] == "no_claims_found":
                print(f"  ✗ No verifiable claims found for {result.get('name')}")
            else:
                print(f"  - Skipped: {result.get('reason')}")
            time.sleep(2.0)  # Rate limit between files (2 searches per file)
        except Exception as e:
            print(f"  ✗ Error: {e}")
            stats["errors"] += 1

    # Summary
    print(f"\n{'='*60}")
    print("COMPREHENSIVE ENRICHMENT SUMMARY")
    print(f"{'='*60}")
    print(f"Processed: {sum(stats.values())}")
    print(f"Enriched: {stats['enriched']}")
    print(f"No claims found: {stats['no_claims_found']}")
    print(f"Skipped: {stats['skipped']}")
    print(f"Errors: {stats['errors']}")

    if results:
        total_claims = sum(r['claims_added'] for r in results)
        print(f"\nTotal web claims added: {total_claims}")
        print(f"\nEnriched profiles:")
        for r in results:
            print(f"  - {r['name']}: {r['claims_added']} claims ({', '.join(r['claim_types'])})")


if __name__ == "__main__":
    main()