glam/scripts/enrich_ppids_linkup.py
kempersc 0845d9f30e feat(scripts): add person enrichment and slot mapping utilities
Person Enrichment Scripts:
- enrich_person_comprehensive.py: Full-featured web search enrichment via Linkup
  with Rule 6/21/26/34/35 compliance (dual timestamps, no fabrication)
- enrich_ppids_linkup.py: Batch PPID enrichment pipeline
- extract_persons_with_provenance.py: Extract person data from LinkedIn HTML
  with XPath provenance tracking

LinkML Slot Management:
- update_slot_mappings.py: Update slots for RiC-O naming (Rule 39) and
  semantic URI requirements (Rule 38)
- update_class_slot_references.py: Update class files referencing renamed slots
- validate_slot_mappings.py: Validate slot definitions against ontology rules

All scripts follow established project conventions for provenance and
ontology alignment.
2026-01-10 13:32:32 +01:00

374 lines
13 KiB
Python
Executable file

#!/usr/bin/env python3
"""
PPID Enrichment via Linkup Web Search (Rule 34 & 44 Compliant)
Uses Linkup search to find birth years and biographical data from:
- Academic profiles (university pages, ResearchGate, Academia.edu)
- News articles and press releases
- Institutional websites
- Wikipedia, Wikidata
Per Rule 34: Linkup is the preferred web scraper.
Per Rule 44: Birth dates use EDTF notation with web search enrichment.
Per Rule 45: All inferred data includes explicit provenance.
Usage:
python scripts/enrich_ppids_linkup.py [--limit N] [--dry-run]
"""
import json
import os
import re
import time
import argparse
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
import httpx
# Linkup API configuration
# Endpoint for Linkup's synchronous search API (POSTed to by search_linkup).
LINKUP_API_URL = "https://api.linkup.so/v1/search"
def get_linkup_api_key() -> str:
    """Return the Linkup API key.

    Looks for a ``LINKUP_API_KEY=`` entry in the project-root .env file
    first (one directory above scripts/), then falls back to the process
    environment.

    Returns:
        The non-empty API key string.

    Raises:
        ValueError: if no non-empty key is found in either location.
    """
    # Try .env file first.
    env_path = Path(__file__).parent.parent / ".env"
    if env_path.exists():
        with open(env_path) as f:
            for line in f:
                if line.startswith("LINKUP_API_KEY="):
                    key = line.strip().split("=", 1)[1].strip('"\'')
                    # Fix: an empty value in .env previously returned ""
                    # immediately, masking a key set in the environment and
                    # bypassing the "not found" validation below.
                    if key:
                        return key
    # Fall back to environment variable.
    key = os.environ.get("LINKUP_API_KEY", "")
    if not key:
        raise ValueError("LINKUP_API_KEY not found in .env or environment")
    return key
def search_linkup(query: str, api_key: str, depth: str = "standard") -> Dict[str, Any]:
    """Run one Linkup search and return the parsed JSON response.

    For the ``sourcedAnswer`` output type the response contains 'answer'
    (synthesized text) and 'sources' (a list of source records).  Note the
    MCP tool returns 'results' but the API returns 'answer' + 'sources'.
    On any failure (network, HTTP status, JSON decode) a dict with a single
    'error' key is returned instead of raising, so callers can skip and
    continue — this best-effort behaviour is intentional.
    """
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    request_body = {
        "q": query,
        "depth": depth,
        "outputType": "sourcedAnswer",
    }
    try:
        with httpx.Client(timeout=30.0) as client:
            resp = client.post(LINKUP_API_URL, headers=request_headers, json=request_body)
            resp.raise_for_status()
            return resp.json()
    except Exception as exc:
        return {"error": str(exc)}
def extract_birth_year_from_text(text: str, name: str) -> Optional[Tuple[int, str, float]]:
    """Extract a plausible birth year for a person from free text.

    Args:
        text: Text (e.g. a synthesized search answer) mentioning the person.
        name: The person's full name. Only used as a guard against empty
            lookups; the patterns themselves are name-independent.

    Returns:
        ``(year, source_snippet, confidence)`` for the first pattern whose
        matched year falls in the plausible 1920-2010 range, or None.

    Note:
        The previous version also split ``name`` into parts and a last name
        that were never used; that dead code has been removed without
        changing behaviour.
    """
    if not text or not name:
        return None
    # Patterns ordered by specificity, each paired with a confidence score.
    patterns = [
        # "born on 11 February 1948" or "born December 3, 1951"
        (r'born\s+(?:on\s+)?(?:\d{1,2}\s+)?\w+\s+(?:\d{1,2},?\s+)?(\d{4})', 0.95),
        # "was born in 1955" or "born in Amsterdam in 1955"
        (r'(?:was\s+)?born\s+(?:in\s+\w+\s+)?in\s+(\d{4})', 0.95),
        # "geboren in 1955" (Dutch)
        (r'geboren\s+(?:in\s+)?(\d{4})', 0.95),
        # "Name (born 1951)"
        (r'\(born\s+(\d{4})\)', 0.95),
        # "Name (1951)" - common Wikipedia format
        (r'\((\d{4})\)', 0.90),
        # "born in 1951"
        (r'born\s+(?:in\s+)?(\d{4})', 0.90),
        # "Name, born in New York City, USA, in 1951"
        (r'born\s+in\s+[\w\s,]+,?\s+in\s+(\d{4})', 0.85),
        # Fallback: just find a year after "born"
        (r'born.*?(\d{4})', 0.80),
    ]
    for pattern, confidence in patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            year = int(match.group(1))
            # Reject implausible years; a later, looser pattern may still
            # find a year in range, so we fall through rather than return.
            if 1920 <= year <= 2010:
                # ~50 chars of context either side of the match serves as
                # the provenance snippet for the extracted year.
                start = max(0, match.start() - 50)
                end = min(len(text), match.end() + 50)
                snippet = text[start:end].strip()
                return (year, snippet, confidence)
    return None
def search_person_birth_year(name: str, affiliations: List[str], api_key: str) -> Optional[Dict[str, Any]]:
    """Look up a person's birth year via Linkup web search.

    The API returns 'answer' (synthesized text) and 'sources' (URL records).
    Builds a query from the name plus the most heritage-relevant affiliation,
    tries a couple of query phrasings, and returns a result dict with the
    year, EDTF string, provenance fields and confidence — or None when no
    usable birth year was found.
    """
    heritage_keywords = ['museum', 'archive', 'library', 'university', 'heritage', 'curator']
    # Prefer a heritage-related affiliation (among the first two) as context.
    context = ""
    for candidate in affiliations[:2]:
        lowered = candidate.lower()
        if any(keyword in lowered for keyword in heritage_keywords):
            context = candidate
            break
    if not context and affiliations:
        context = affiliations[0]
    query_variants = [
        f'"{name}" born biography {context}',
        f'"{name}" biography age born year',
    ]
    for query in query_variants:
        result = search_linkup(query, api_key)
        # Failed searches are skipped outright (no sleep, matching the
        # original control flow).
        if "error" in result:
            continue
        answer = result.get("answer", "")
        if answer:
            extracted = extract_birth_year_from_text(answer, name)
            if extracted:
                year, snippet, confidence = extracted
                sources = result.get("sources", [])
                first_source = sources[0] if sources else {}
                return {
                    "birth_year": year,
                    "edtf": str(year),
                    "source_snippet": snippet,
                    "source_url": first_source.get("url", ""),
                    "source_title": first_source.get("name", ""),
                    "confidence": confidence,
                    "search_query": query,
                    "source_type": "linkup_answer",
                }
        # Rate limit between query attempts.
        time.sleep(0.5)
    return None
def enrich_ppid_file(filepath: Path, api_key: str, dry_run: bool = False) -> Dict[str, Any]:
    """Enrich one PPID JSON file with a Linkup-discovered birth year.

    Skips files that already have a fully-specified birth year, lack a
    usable name, are not heritage-relevant, or have no affiliation context.
    Unless ``dry_run`` is set, writes the enrichment (with Rule 45
    provenance) back into the file and upgrades ``birth_date`` when the
    discovered year is confident enough.

    Returns:
        A status dict: ``enriched`` / ``not_found`` / ``skipped`` plus
        contextual fields.
    """
    with open(filepath) as f:
        data = json.load(f)

    # Skip if a specific (non-"XXXX", non-decade) birth year is present.
    edtf = data.get("birth_date", {}).get("edtf")
    if edtf and edtf != "XXXX" and not edtf.endswith("X"):
        return {"status": "skipped", "reason": "already_has_birth_year"}

    name_info = data.get("name", {})
    full_name = name_info.get("full_name") or name_info.get("display_name", "")
    if not full_name or full_name == "LinkedIn Member":
        return {"status": "skipped", "reason": "no_name"}

    if not data.get("heritage_relevance", {}).get("is_heritage_relevant"):
        return {"status": "skipped", "reason": "not_heritage_relevant"}

    # Collect organisation names as search context, headline first.
    affiliations = []
    for aff in data.get("affiliations", []):
        if isinstance(aff, dict):
            org = aff.get("organization") or aff.get("company", "")
            if org:
                affiliations.append(org)
    headline = data.get("profile_data", {}).get("headline", "")
    if headline:
        affiliations.insert(0, headline)
    if not affiliations:
        return {"status": "skipped", "reason": "no_affiliations"}

    result = search_person_birth_year(full_name, affiliations, api_key)
    if result is None:
        return {"status": "not_found", "name": full_name}

    # Rule 45: every inferred statement carries explicit provenance.
    now = datetime.now(timezone.utc).isoformat()
    discovery = {
        "value": result["birth_year"],
        "edtf": result["edtf"],
        "confidence": result["confidence"],
        "provenance": {
            "statement_created_at": now,
            "source_archived_at": now,  # Search result is ephemeral
            "retrieval_agent": "enrich_ppids_linkup.py",
            "method": "linkup_web_search",
            "search_query": result["search_query"],
            "source_url": result.get("source_url", ""),
            "source_title": result.get("source_title", ""),
            "source_snippet": result["source_snippet"],
            "source_type": result["source_type"],
        },
    }

    if not dry_run:
        # Merge the discovery into any existing enrichment data.
        data.setdefault("web_search_enrichment", {})["birth_year_discovery"] = discovery
        # Upgrade birth_date only when it is unknown ("XXXX") or partial
        # (decade-only) and the discovered year is confident enough.
        current = data.get("birth_date", {}).get("edtf", "XXXX")
        if (current == "XXXX" or current.endswith("X")) and result["confidence"] >= 0.80:
            data["birth_date"] = {
                "edtf": result["edtf"],
                "precision": "year",
                "source": "web_search_enrichment",
                "confidence": result["confidence"],
            }
        with open(filepath, "w") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    return {
        "status": "enriched",
        "name": full_name,
        "birth_year": result["birth_year"],
        "confidence": result["confidence"],
        "source": result.get("source_url", result["source_type"]),
    }
def main():
    """CLI entry point: find PPID files lacking a birth year and enrich them."""
    parser = argparse.ArgumentParser(description="Enrich PPID files using Linkup web search")
    parser.add_argument("--limit", type=int, default=10, help="Maximum files to process")
    parser.add_argument("--dry-run", action="store_true", help="Don't write changes")
    # NOTE(review): --min-confidence is accepted but not enforced here;
    # enrich_ppid_file applies a fixed 0.80 threshold internally. Wiring it
    # through would require extending enrich_ppid_file's signature.
    parser.add_argument("--min-confidence", type=float, default=0.70, help="Minimum confidence threshold")
    # Fix: the old action="store_true" with default=True made this flag
    # impossible to disable. BooleanOptionalAction keeps --heritage-only
    # working and adds --no-heritage-only.
    parser.add_argument("--heritage-only", action=argparse.BooleanOptionalAction, default=True,
                        help="Only process heritage-relevant profiles")
    args = parser.parse_args()

    # Get API key
    try:
        api_key = get_linkup_api_key()
        print("✓ Linkup API key loaded")
    except ValueError as e:
        print(f"{e}")
        return

    # Find PPID files
    ppid_dir = Path(__file__).parent.parent / "data" / "person"
    if not ppid_dir.exists():
        print(f"✗ PPID directory not found: {ppid_dir}")
        return
    ppid_files = list(ppid_dir.glob("ID_*.json"))
    print(f"Found {len(ppid_files)} PPID files")

    # Select files whose birth date is unknown ("XXXX") or partial
    # (decade-only EDTF such as "195X") and that have a usable name.
    candidates = []
    for f in ppid_files:
        try:
            with open(f) as fp:
                data = json.load(fp)
            if args.heritage_only:
                if not data.get("heritage_relevance", {}).get("is_heritage_relevant"):
                    continue
            birth = data.get("birth_date", {}).get("edtf", "XXXX")
            if birth == "XXXX" or birth.endswith("X"):
                name = data.get("name", {}).get("full_name", "")
                if name and name != "LinkedIn Member":
                    candidates.append(f)
        except Exception:
            # Skip unreadable/malformed files. Fix: this was a bare
            # `except:`, which also swallowed KeyboardInterrupt/SystemExit.
            continue
    print(f"Found {len(candidates)} files needing birth year enrichment")

    # Process up to --limit candidates, tallying outcomes.
    stats = {"enriched": 0, "not_found": 0, "skipped": 0, "errors": 0}
    results = []
    for i, filepath in enumerate(candidates[:args.limit]):
        print(f"\n[{i+1}/{min(len(candidates), args.limit)}] Processing {filepath.name}...")
        try:
            result = enrich_ppid_file(filepath, api_key, args.dry_run)
            status = result.get("status", "errors")
            stats[status] = stats.get(status, 0) + 1
            if result["status"] == "enriched":
                print(f" ✓ Found birth year: {result['birth_year']} (confidence: {result['confidence']:.0%})")
                results.append(result)
            elif result["status"] == "not_found":
                print(f" ✗ No birth year found for {result.get('name', 'unknown')}")
            else:
                print(f" - Skipped: {result.get('reason', 'unknown')}")
            # Rate limit between profiles.
            time.sleep(1.0)
        except Exception as e:
            print(f" ✗ Error: {e}")
            stats["errors"] += 1

    # Summary
    print(f"\n{'='*50}")
    print("ENRICHMENT SUMMARY")
    print(f"{'='*50}")
    print(f"Processed: {sum(stats.values())}")
    print(f"Enriched: {stats['enriched']}")
    print(f"Not found: {stats['not_found']}")
    print(f"Skipped: {stats['skipped']}")
    print(f"Errors: {stats['errors']}")
    if results:
        print(f"\nEnriched profiles:")
        for r in results:
            print(f" - {r['name']}: born {r['birth_year']} ({r['confidence']:.0%})")
if __name__ == "__main__":
    main()