#!/usr/bin/env python3
"""
PPID Web Enrichment Script

Enriches PPID files with web-sourced claims using Exa AI and Linkup search.
Adds proper provenance statements per Rules 6, 26, and 35.

Enrichment targets:
1. Birth date/year - Search for biographical information
2. Publications - ORCID, Google Scholar, ResearchGate
3. News mentions - Press coverage, interviews
4. Wikidata entity - Authority file linking
5. Institutional affiliations - Verify current roles

All web claims include:
- source_url: Where the data was found
- retrieved_on: ISO 8601 timestamp
- retrieval_agent: Tool used (exa_web_search, linkup_search, etc.)
- claim_type: Type of claim (birth_date, publication, news_mention, etc.)
- claim_value: The extracted value
- provenance: Full provenance chain per Rule 35

Usage:
    python scripts/enrich_ppids_web.py --limit 10 --verbose
    python scripts/enrich_ppids_web.py --dry-run --sample stefankulk
"""

import json
import os
import re
import sys
import time
import argparse
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, List, Any, Tuple

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

# Default location of PPID person files; overridable via --person-dir
# (previously a hard-coded absolute path inside main()).
DEFAULT_PERSON_DIR = "/Users/kempersc/apps/glam/data/person"


def create_web_claim(
    claim_type: str,
    claim_value: str,
    source_url: str,
    retrieval_agent: str,
    confidence: str = "medium",
    notes: Optional[str] = None,
    raw_response: Optional[Dict] = None
) -> Dict[str, Any]:
    """
    Create a web claim with proper provenance per Rules 6, 26, and 35.

    Args:
        claim_type: Type of claim (birth_date, publication, news_mention, etc.)
        claim_value: The extracted value
        source_url: URL where the data was found
        retrieval_agent: Tool used (exa_web_search, linkup_search, etc.)
        confidence: Confidence level (high, medium, low, very_low)
        notes: Additional notes about the claim
        raw_response: Raw API response for audit

    Returns:
        Dict with claim structure per Rule 26
    """
    now = datetime.now(timezone.utc)
    claim = {
        "claim_type": claim_type,
        "claim_value": claim_value,
        "source_url": source_url,
        "retrieved_on": now.isoformat(),
        "retrieval_agent": retrieval_agent,
        "confidence": confidence,
        "provenance": {
            "statement_created_at": now.isoformat(),
            "source_archived_at": now.isoformat(),  # Same time for API responses
            "retrieval_method": retrieval_agent,
        }
    }
    if notes:
        claim["notes"] = notes
    if raw_response:
        # Store snippet of raw response for audit (not full response to save space)
        claim["provenance"]["response_snippet"] = str(raw_response)[:500]
    return claim


def extract_birth_year_from_text(text: str, full_name: str) -> Optional[Tuple[str, str]]:
    """
    Extract birth year from text using various patterns.

    Args:
        text: Free text (e.g. a search-result snippet) to scan.
        full_name: Person's full name; the last token is used as a
            cheap relevance check before any pattern matching.

    Returns:
        Tuple of (birth_year_edtf, extraction_note) or None
    """
    if not text:
        return None

    # Normalize text
    text_lower = text.lower()
    name_parts = full_name.lower().split()
    last_name = name_parts[-1] if name_parts else ""

    # Check if the text is about the right person (basic check)
    if last_name and last_name not in text_lower:
        return None

    # Pattern 1: "born in YYYY" or "born YYYY"
    born_match = re.search(r'born\s+(?:in\s+)?(\d{4})', text_lower)
    if born_match:
        year = born_match.group(1)
        return (year, f"Extracted from 'born {year}' pattern")

    # Pattern 2: "(YYYY - )" or "(YYYY-)" indicating birth year
    birth_dash_match = re.search(r'\((\d{4})\s*[-–—]\s*\)', text)
    if birth_dash_match:
        year = birth_dash_match.group(1)
        return (year, f"Extracted from '({year} - )' lifespan pattern")

    # Pattern 3: "YYYY - present" or "b. YYYY"
    b_match = re.search(r'(?:b\.|born)\s*(\d{4})', text_lower)
    if b_match:
        year = b_match.group(1)
        return (year, f"Extracted from 'b. {year}' pattern")

    # Pattern 4: Age patterns "X years old" with date context
    age_match = re.search(r'(\d{1,2})\s*(?:years?\s*old|jaar\s*oud)', text_lower)
    if age_match:
        age = int(age_match.group(1))
        if 20 <= age <= 100:  # Reasonable age range
            current_year = datetime.now().year
            estimated_birth = current_year - age
            # "~" is the EDTF approximate-date qualifier
            return (f"{estimated_birth}~", f"Estimated from age {age} (approximate)")

    # Pattern 5: Birthday patterns "birthday: Month DD, YYYY"
    birthday_match = re.search(
        r'(?:birthday|geboren|date of birth)[:\s]+(?:\w+\s+\d{1,2},?\s+)?(\d{4})',
        text_lower
    )
    if birthday_match:
        year = birthday_match.group(1)
        return (year, "Extracted from birthday/geboren pattern")

    return None


def extract_publications_from_text(text: str, full_name: str) -> List[Dict[str, str]]:
    """
    Extract publication references from search results.

    Args:
        text: Free text to scan for DOI and ORCID identifiers.
        full_name: Person's full name (currently unused; kept for
            interface stability with other extractors).

    Returns:
        List of publication dicts with type, value, note
    """
    publications = []
    if not text:
        return publications

    # Look for DOI patterns
    doi_matches = re.findall(r'10\.\d{4,}/[^\s]+', text)
    for doi in doi_matches[:5]:  # Limit to 5
        publications.append({
            "type": "doi",
            # Strip trailing sentence punctuation that the greedy
            # [^\s]+ match tends to capture (e.g. "10.1000/x.").
            "value": doi.strip().rstrip(".,;)"),
            "note": "DOI found in search results"
        })

    # Look for ORCID patterns
    orcid_match = re.search(r'orcid\.org/(\d{4}-\d{4}-\d{4}-\d{3}[\dX])', text)
    if orcid_match:
        publications.append({
            "type": "orcid",
            "value": orcid_match.group(1),
            "note": "ORCID identifier found"
        })

    return publications


def search_birth_date_exa(full_name: str, context_hints: Optional[List[str]] = None) -> Optional[Dict]:
    """
    Search for birth date using Exa AI web search.

    Note: This function is designed to be called via MCP tools.
    In actual execution, replace with MCP tool call.

    Args:
        full_name: Person's full name (quoted in the query).
        context_hints: Optional disambiguation terms; at most two are used.

    Returns:
        Query spec dict with a "pending_mcp_call" status.
    """
    # Build search query
    query_parts = [f'"{full_name}"', "born", "birthday"]
    if context_hints:
        query_parts.extend(context_hints[:2])  # Add up to 2 context hints
    query = " ".join(query_parts)

    # This would be replaced with actual MCP call:
    # result = exa_web_search_exa(query=query, numResults=5)
    return {
        "query": query,
        "tool": "exa_web_search_exa",
        "status": "pending_mcp_call"
    }


def search_publications_exa(full_name: str, institution: Optional[str] = None) -> Optional[Dict]:
    """
    Search for publications using Exa AI.

    Returns a query spec dict with a "pending_mcp_call" status; the
    actual search is executed by the caller via MCP tools.
    """
    query_parts = [f'"{full_name}"']
    if institution:
        query_parts.append(institution)
    query_parts.extend(["publications", "research", "ORCID"])
    query = " ".join(query_parts)
    return {
        "query": query,
        "tool": "exa_web_search_exa",
        "status": "pending_mcp_call"
    }


def search_news_mentions_exa(full_name: str, institution: Optional[str] = None) -> Optional[Dict]:
    """
    Search for news mentions using Exa AI.

    Returns a query spec dict with a "pending_mcp_call" status; the
    actual search is executed by the caller via MCP tools.
    """
    query_parts = [f'"{full_name}"']
    if institution:
        query_parts.append(institution)
    query = " ".join(query_parts)
    return {
        "query": query,
        "tool": "exa_web_search_exa",
        "status": "pending_mcp_call"
    }


def get_person_context(ppid_data: Dict) -> Dict[str, Any]:
    """
    Extract context from PPID data for better search queries.

    Args:
        ppid_data: Parsed PPID JSON; expected to contain "name" and
            optionally "profile_data" with experience/education entries.

    Returns:
        Dict with full_name, institutions (max 5, deduplicated),
        roles (max 5, deduplicated), location, linkedin_url, skills.
    """
    context = {
        "full_name": "",
        "institutions": [],
        "roles": [],
        "location": None,
        "linkedin_url": None,
        "skills": [],
    }

    # Get name
    name_data = ppid_data.get("name", {})
    context["full_name"] = name_data.get("full_name", "")

    # Get profile data
    profile = ppid_data.get("profile_data", {})
    if profile:
        context["linkedin_url"] = profile.get("linkedin_url")
        context["location"] = profile.get("location")
        context["skills"] = profile.get("skills", [])[:10]  # Top 10 skills

        # Extract institutions from experience
        for exp in profile.get("experience", []) or []:
            if exp and exp.get("company"):
                context["institutions"].append(exp["company"])
                if exp.get("title"):
                    context["roles"].append(exp["title"])

        # Extract from education
        for edu in profile.get("education", []) or []:
            if edu and edu.get("institution"):
                context["institutions"].append(edu["institution"])

    # Deduplicate while preserving order
    context["institutions"] = list(dict.fromkeys(context["institutions"]))[:5]
    context["roles"] = list(dict.fromkeys(context["roles"]))[:5]

    return context


def build_enrichment_queries(ppid_data: Dict) -> List[Dict[str, Any]]:
    """
    Build a list of enrichment queries for a PPID.

    Returns list of query specs to execute via MCP tools.
    """
    context = get_person_context(ppid_data)
    full_name = context["full_name"]

    if not full_name:
        return []

    queries = []

    # 1. Birth date search (only if not already known)
    birth_date = ppid_data.get("birth_date", {}).get("edtf", "XXXX")
    enrichment_meta = ppid_data.get("enrichment_metadata", {}).get("birth_date_search", {})

    if birth_date == "XXXX" and not enrichment_meta.get("attempted"):
        # Build birth date query with context
        hints = []
        if context["institutions"]:
            hints.append(context["institutions"][0])
        if context["location"]:
            # City part only ("Utrecht, Netherlands" -> "Utrecht")
            hints.append(context["location"].split(",")[0])

        queries.append({
            "type": "birth_date",
            "query": f'"{full_name}" born birthday biography',
            "context_hints": hints,
            "tool": "exa_web_search_exa",
            "priority": "high"
        })

    # 2. Publications search (for academics/researchers)
    academic_keywords = ["professor", "researcher", "phd", "doctor", "lecturer", "scientist"]
    is_academic = any(
        kw in " ".join(context["roles"]).lower()
        for kw in academic_keywords
    )

    if is_academic:
        institution = context["institutions"][0] if context["institutions"] else ""
        queries.append({
            "type": "publications",
            "query": f'"{full_name}" {institution} publications ORCID research',
            "tool": "exa_web_search_exa",
            "priority": "medium"
        })

    # 3. News/press mentions
    if context["institutions"]:
        queries.append({
            "type": "news_mentions",
            "query": f'"{full_name}" {context["institutions"][0]}',
            "tool": "exa_web_search_exa",
            "priority": "low"
        })

    # 4. Wikidata search (for notable persons)
    queries.append({
        "type": "wikidata",
        "query": full_name,
        "tool": "wikidata_search_entity",
        "priority": "medium"
    })

    return queries


def process_search_result(
    result: Dict[str, Any],
    query_type: str,
    full_name: str,
    ppid_data: Dict
) -> List[Dict[str, Any]]:
    """
    Process a search result and extract web claims.

    Args:
        result: Raw search result from Exa/Linkup
        query_type: Type of query (birth_date, publications, etc.)
        full_name: Person's full name
        ppid_data: Current PPID data (currently unused; kept for
            interface stability)

    Returns:
        List of web claims to add
    """
    claims = []

    if not result:
        return claims

    # Extract text content from result
    text = ""
    source_url = ""
    if isinstance(result, dict):
        text = result.get("text", "") or result.get("content", "") or ""
        source_url = result.get("url", "") or result.get("source_url", "")
    elif isinstance(result, str):
        text = result

    if query_type == "birth_date":
        birth_info = extract_birth_year_from_text(text, full_name)
        if birth_info:
            year, note = birth_info
            claims.append(create_web_claim(
                claim_type="birth_year",
                claim_value=year,
                source_url=source_url,
                retrieval_agent="exa_web_search_exa",
                # "~" marks an EDTF approximate year -> lower confidence
                confidence="medium" if "~" not in year else "low",
                notes=note,
                raw_response={"text_snippet": text[:200]}
            ))

    elif query_type == "publications":
        pubs = extract_publications_from_text(text, full_name)
        for pub in pubs:
            claims.append(create_web_claim(
                claim_type=f"identifier_{pub['type']}",
                claim_value=pub["value"],
                source_url=source_url,
                retrieval_agent="exa_web_search_exa",
                confidence="high" if pub["type"] in ["doi", "orcid"] else "medium",
                notes=pub.get("note")
            ))

    elif query_type == "news_mentions":
        # For news, we just record the mention
        if full_name.lower() in text.lower():
            claims.append(create_web_claim(
                claim_type="news_mention",
                claim_value=text[:500],  # First 500 chars
                source_url=source_url,
                retrieval_agent="exa_web_search_exa",
                confidence="medium",
                notes="News/press mention found"
            ))

    return claims


def enrich_ppid_file(
    filepath: Path,
    dry_run: bool = False,
    verbose: bool = False
) -> Dict[str, Any]:
    """
    Enrich a single PPID file with web-sourced claims.

    This function builds queries but does not execute them directly.
    Queries should be executed via MCP tools in the calling context.

    Args:
        filepath: Path to the PPID JSON file.
        dry_run: Accepted for CLI symmetry; this function never writes,
            so the flag currently has no effect here.
        verbose: Print the queries that were built.

    Returns:
        Dict with enrichment stats and pending queries
    """
    stats = {
        "filepath": str(filepath),
        "queries_built": 0,
        "claims_added": 0,
        "errors": [],
        "pending_queries": []
    }

    try:
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception as e:
        stats["errors"].append(f"Failed to read file: {e}")
        return stats

    # Build enrichment queries
    queries = build_enrichment_queries(data)
    stats["queries_built"] = len(queries)
    stats["pending_queries"] = queries

    if verbose:
        print(f"  Built {len(queries)} queries for {filepath.name}")
        for q in queries:
            print(f"    - {q['type']}: {q['query'][:50]}...")

    return stats


def main() -> None:
    """CLI entry point: scan PPID files, build enrichment queries, and
    dump them to pending_web_queries.json for MCP execution."""
    parser = argparse.ArgumentParser(
        description="Enrich PPID files with web-sourced claims (Rule 26 compliant)"
    )
    parser.add_argument("--dry-run", action="store_true", help="Don't write changes")
    parser.add_argument("--limit", type=int, help="Process only N files")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--sample", type=str, help="Process specific linkedin_slug")
    parser.add_argument(
        "--query-types",
        type=str,
        default="birth_date,publications,news_mentions,wikidata",
        help="Comma-separated list of query types to run"
    )
    parser.add_argument(
        "--person-dir",
        type=str,
        default=DEFAULT_PERSON_DIR,
        help="Directory containing ID_*.json PPID files"
    )
    args = parser.parse_args()

    person_dir = Path(args.person_dir)

    # Get PPID files
    if args.sample:
        # Find file by linkedin slug
        ppid_files = list(person_dir.glob(f"ID_*{args.sample.upper()}*.json"))
        if not ppid_files:
            # Try case-insensitive search
            ppid_files = [
                f for f in person_dir.glob("ID_*.json")
                if args.sample.lower() in f.stem.lower()
            ]
    else:
        ppid_files = list(person_dir.glob("ID_*.json"))

    if args.limit:
        ppid_files = ppid_files[:args.limit]

    print(f"Processing {len(ppid_files)} PPID files for web enrichment...")
    if args.dry_run:
        print("DRY RUN - no changes will be written")

    # Strip whitespace so "a, b" works; drop empty entries.
    query_types = {qt.strip() for qt in args.query_types.split(",") if qt.strip()}
    print(f"Query types: {query_types}")

    # Statistics
    total_stats = {
        "processed": 0,
        "queries_built": 0,
        "by_type": {qt: 0 for qt in query_types},
        "errors": 0,
    }

    all_pending_queries = []

    for i, filepath in enumerate(ppid_files):
        try:
            stats = enrich_ppid_file(filepath, dry_run=args.dry_run, verbose=args.verbose)
            total_stats["processed"] += 1
            total_stats["queries_built"] += stats["queries_built"]

            # Filter queries by requested types
            for q in stats["pending_queries"]:
                if q["type"] in query_types:
                    total_stats["by_type"][q["type"]] += 1
                    all_pending_queries.append({
                        "filepath": stats["filepath"],
                        **q
                    })

            if stats["errors"]:
                total_stats["errors"] += 1
                if args.verbose:
                    print(f"  ERROR {filepath.name}: {stats['errors']}")

            if (i + 1) % 100 == 0:
                print(f"  Processed {i + 1}/{len(ppid_files)}...")

        except Exception as e:
            total_stats["errors"] += 1
            if args.verbose:
                print(f"  ERROR {filepath.name}: {e}")

    # Print summary
    print("\n" + "=" * 60)
    print("WEB ENRICHMENT QUERY SUMMARY")
    print("=" * 60)
    print(f"Processed: {total_stats['processed']}")
    print(f"Queries built: {total_stats['queries_built']}")
    print(f"By query type:")
    for qt, count in total_stats["by_type"].items():
        print(f"  - {qt}: {count}")
    print(f"Errors: {total_stats['errors']}")

    # Output pending queries for MCP execution
    if all_pending_queries and not args.dry_run:
        output_file = person_dir.parent / "pending_web_queries.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump({
                "generated_at": datetime.now(timezone.utc).isoformat(),
                "total_queries": len(all_pending_queries),
                "queries": all_pending_queries
            }, f, indent=2, ensure_ascii=False)
        print(f"\nPending queries saved to: {output_file}")
        print("Execute these queries via MCP tools and run --apply-results to add claims.")

    print("\nNote: This script builds queries. Execute via MCP tools:")
    print("  - exa_web_search_exa for birth_date, publications, news_mentions")
    print("  - wikidata_search_entity for wikidata matching")


if __name__ == "__main__":
    main()