Person Enrichment Scripts: - enrich_person_comprehensive.py: Full-featured web search enrichment via Linkup with Rule 6/21/26/34/35 compliance (dual timestamps, no fabrication) - enrich_ppids_linkup.py: Batch PPID enrichment pipeline - extract_persons_with_provenance.py: Extract person data from LinkedIn HTML with XPath provenance tracking LinkML Slot Management: - update_slot_mappings.py: Update slots for RiC-O naming (Rule 39) and semantic URI requirements (Rule 38) - update_class_slot_references.py: Update class files referencing renamed slots - validate_slot_mappings.py: Validate slot definitions against ontology rules All scripts follow established project conventions for provenance and ontology alignment.
374 lines
13 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
PPID Enrichment via Linkup Web Search (Rule 34 & 44 Compliant)
|
|
|
|
Uses Linkup search to find birth years and biographical data from:
|
|
- Academic profiles (university pages, ResearchGate, Academia.edu)
|
|
- News articles and press releases
|
|
- Institutional websites
|
|
- Wikipedia, Wikidata
|
|
|
|
Per Rule 34: Linkup is the preferred web scraper.
|
|
Per Rule 44: Birth dates use EDTF notation with web search enrichment.
|
|
Per Rule 45: All inferred data includes explicit provenance.
|
|
|
|
Usage:
|
|
python scripts/enrich_ppids_linkup.py [--limit N] [--dry-run]
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import time
|
|
import argparse
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, List, Tuple
|
|
import httpx
|
|
|
|
# Linkup API configuration
|
|
LINKUP_API_URL = "https://api.linkup.so/v1/search"
|
|
|
|
|
|
def get_linkup_api_key() -> str:
    """Return the Linkup API key, preferring the project ``.env`` file.

    Looks for a ``LINKUP_API_KEY=...`` line in ``<project_root>/.env``
    first, then falls back to the ``LINKUP_API_KEY`` environment
    variable.

    Raises:
        ValueError: when neither source provides a key.
    """
    dotenv_file = Path(__file__).parent.parent / ".env"
    if dotenv_file.exists():
        for raw_line in dotenv_file.read_text().splitlines():
            if raw_line.startswith("LINKUP_API_KEY="):
                # Take everything after the first '=' and drop any quoting.
                value = raw_line.strip().partition("=")[2]
                return value.strip('"\'')

    api_key = os.environ.get("LINKUP_API_KEY", "")
    if api_key:
        return api_key
    raise ValueError("LINKUP_API_KEY not found in .env or environment")
|
|
|
|
|
|
def search_linkup(query: str, api_key: str, depth: str = "standard") -> Dict[str, Any]:
    """Run a single Linkup search and return the decoded JSON response.

    On success the payload contains 'answer' (synthesized response) and
    'sources' (list of source records).  The MCP tool returns 'results'
    instead, but the HTTP API used here returns 'answer' + 'sources'.

    Any failure (HTTP error, timeout, bad JSON) is reported as
    ``{"error": <message>}`` rather than raised, so callers can retry
    with the next query.
    """
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    request_body = {
        "q": query,
        "depth": depth,
        "outputType": "sourcedAnswer",
    }

    try:
        with httpx.Client(timeout=30.0) as http:
            reply = http.post(LINKUP_API_URL, headers=request_headers, json=request_body)
            reply.raise_for_status()
            return reply.json()
    except Exception as exc:  # deliberate best-effort: caller inspects "error"
        return {"error": str(exc)}
|
|
|
|
|
|
def extract_birth_year_from_text(text: str, name: str) -> Optional[Tuple[int, str, float]]:
    """
    Extract a plausible birth year from text mentioning the person.

    Tries a list of regex patterns in decreasing order of specificity and
    returns the first in-range hit as ``(year, source_snippet, confidence)``:

    - ``year``: four-digit birth year, restricted to 1920-2010,
    - ``source_snippet``: ~100 chars of surrounding context for provenance,
    - ``confidence``: heuristic score attached to the matching pattern.

    Returns None when *text*/*name* is empty or no pattern yields an
    in-range year.  Note: *name* is only used as a presence check; the
    patterns themselves do not anchor on the name.
    """
    if not text or not name:
        return None

    # Patterns to find birth year (ordered by specificity)
    patterns = [
        # "born on 11 February 1948" or "born December 3, 1951"
        (r'born\s+(?:on\s+)?(?:\d{1,2}\s+)?\w+\s+(?:\d{1,2},?\s+)?(\d{4})', 0.95),
        # "was born in 1955" or "born in Amsterdam in 1955"
        (r'(?:was\s+)?born\s+(?:in\s+\w+\s+)?in\s+(\d{4})', 0.95),
        # "geboren in 1955" (Dutch)
        (r'geboren\s+(?:in\s+)?(\d{4})', 0.95),
        # "Name (born 1951)"
        (r'\(born\s+(\d{4})\)', 0.95),
        # "Name (1951)" - common Wikipedia format
        (r'\((\d{4})\)', 0.90),
        # "born in 1951"
        (r'born\s+(?:in\s+)?(\d{4})', 0.90),
        # "Name, born in New York City, USA, in 1951"
        (r'born\s+in\s+[\w\s,]+,?\s+in\s+(\d{4})', 0.85),
        # Fallback: just find a year after "born"
        (r'born.*?(\d{4})', 0.80),
    ]

    for pattern, confidence in patterns:
        # Fixed: was re.search (first match only), which let a single
        # out-of-range year (e.g. a historical date earlier in the text)
        # mask a later valid match of the same pattern.  finditer skips
        # implausible years and keeps scanning.
        for match in re.finditer(pattern, text, re.IGNORECASE):
            year = int(match.group(1))
            if 1920 <= year <= 2010:  # Reasonable birth year range
                # Get context around match for the provenance snippet
                start = max(0, match.start() - 50)
                end = min(len(text), match.end() + 50)
                snippet = text[start:end].strip()
                return (year, snippet, confidence)

    return None
|
|
|
|
|
|
def search_person_birth_year(name: str, affiliations: List[str], api_key: str) -> Optional[Dict[str, Any]]:
    """
    Search for a person's birth year using Linkup.

    Builds up to two targeted queries (seeded with a heritage-related
    affiliation when one is available, for name disambiguation) and scans
    the synthesized 'answer' field of each response.  The API returns
    'answer' (synthesized) and 'sources' (URLs).

    Returns a dict with birth year, EDTF string, provenance fields and
    confidence, or None when no query produced an in-range year.
    """
    heritage_keywords = ('museum', 'archive', 'library', 'university', 'heritage', 'curator')

    # Prefer a heritage-related affiliation among the first two; otherwise
    # fall back to whatever affiliation comes first.
    affiliation_context = ""
    for candidate in affiliations[:2]:
        lowered = candidate.lower()
        if any(keyword in lowered for keyword in heritage_keywords):
            affiliation_context = candidate
            break
    if not affiliation_context and affiliations:
        affiliation_context = affiliations[0]

    # Search queries to try, most specific first
    queries = [
        f'"{name}" born biography {affiliation_context}',
        f'"{name}" biography age born year',
    ]

    for query in queries:
        response = search_linkup(query, api_key)
        if "error" in response:
            continue

        # The API returns an 'answer' field with the synthesized response
        answer = response.get("answer", "")
        if answer:
            birth_info = extract_birth_year_from_text(answer, name)
            if birth_info:
                year, snippet, confidence = birth_info
                # Attribute the finding to the first listed source, if any
                source_records = response.get("sources", [])
                first_source = source_records[0] if source_records else {}
                return {
                    "birth_year": year,
                    "edtf": str(year),
                    "source_snippet": snippet,
                    "source_url": first_source.get("url", ""),
                    "source_title": first_source.get("name", ""),
                    "confidence": confidence,
                    "search_query": query,
                    "source_type": "linkup_answer",
                }

        # Rate limit between queries
        time.sleep(0.5)

    return None
|
|
|
|
|
|
def enrich_ppid_file(filepath: Path, api_key: str, dry_run: bool = False) -> Dict[str, Any]:
    """
    Enrich a single PPID file with Linkup search data.

    Workflow: load the JSON profile, skip it when enrichment is not
    applicable (already has a precise birth year, no usable name, not
    heritage-relevant, or no affiliation context for disambiguation);
    otherwise search the web for a birth year and — unless ``dry_run`` —
    write the enrichment (with Rule 45 provenance) back to ``filepath``.

    Returns a status dict: ``{"status": "skipped"|"not_found"|"enriched", ...}``.
    """
    with open(filepath) as f:
        data = json.load(f)

    # Skip if already has confirmed birth year.
    # EDTF "XXXX" = fully unknown; a trailing "X" = decade precision only —
    # both still qualify for enrichment.
    birth_date = data.get("birth_date", {})
    if birth_date.get("edtf") and birth_date.get("edtf") != "XXXX":
        if not birth_date.get("edtf", "").endswith("X"):
            return {"status": "skipped", "reason": "already_has_birth_year"}

    # Get name
    name_data = data.get("name", {})
    full_name = name_data.get("full_name") or name_data.get("display_name", "")
    if not full_name or full_name == "LinkedIn Member":
        # "LinkedIn Member" is the anonymized placeholder name — unsearchable.
        return {"status": "skipped", "reason": "no_name"}

    # Skip if not heritage relevant
    heritage = data.get("heritage_relevance", {})
    if not heritage.get("is_heritage_relevant"):
        return {"status": "skipped", "reason": "not_heritage_relevant"}

    # Get affiliations for context (used to disambiguate common names)
    affiliations = []
    for aff in data.get("affiliations", []):
        if isinstance(aff, dict):
            org = aff.get("organization") or aff.get("company", "")
            if org:
                affiliations.append(org)

    # Also check profile_data; the headline goes first so it becomes the
    # primary search context in search_person_birth_year().
    profile = data.get("profile_data", {})
    headline = profile.get("headline", "")
    if headline:
        affiliations.insert(0, headline)

    if not affiliations:
        return {"status": "skipped", "reason": "no_affiliations"}

    # Search for birth year
    result = search_person_birth_year(full_name, affiliations, api_key)

    if not result:
        return {"status": "not_found", "name": full_name}

    # Build enrichment data with provenance (Rule 45)
    timestamp = datetime.now(timezone.utc).isoformat()

    enrichment = {
        "web_search_enrichment": {
            "birth_year_discovery": {
                "value": result["birth_year"],
                "edtf": result["edtf"],
                "confidence": result["confidence"],
                "provenance": {
                    "statement_created_at": timestamp,
                    "source_archived_at": timestamp,  # Search result is ephemeral
                    "retrieval_agent": "enrich_ppids_linkup.py",
                    "method": "linkup_web_search",
                    "search_query": result["search_query"],
                    "source_url": result.get("source_url", ""),
                    "source_title": result.get("source_title", ""),
                    "source_snippet": result["source_snippet"],
                    "source_type": result["source_type"]
                }
            }
        }
    }

    if not dry_run:
        # Merge with existing data (preserve any other enrichment keys)
        if "web_search_enrichment" not in data:
            data["web_search_enrichment"] = {}
        data["web_search_enrichment"]["birth_year_discovery"] = enrichment["web_search_enrichment"]["birth_year_discovery"]

        # Update birth_date if we found a specific year (better than XXXX or decade)
        current_birth = data.get("birth_date", {}).get("edtf", "XXXX")
        if current_birth == "XXXX" or current_birth.endswith("X"):
            # NOTE(review): this 0.80 floor is independent of the CLI's
            # --min-confidence option (default 0.70) — confirm which should win.
            if result["confidence"] >= 0.80:
                data["birth_date"] = {
                    "edtf": result["edtf"],
                    "precision": "year",
                    "source": "web_search_enrichment",
                    "confidence": result["confidence"]
                }

        # Save in place (overwrites the original PPID file)
        with open(filepath, "w") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    return {
        "status": "enriched",
        "name": full_name,
        "birth_year": result["birth_year"],
        "confidence": result["confidence"],
        "source": result.get("source_url", result["source_type"])
    }
|
|
|
|
|
|
def main():
    """CLI entry point: find PPID files lacking a precise birth year and enrich them.

    Scans data/person/ID_*.json for heritage-relevant profiles whose
    birth_date EDTF is "XXXX" (unknown) or ends in "X" (decade precision),
    enriches up to --limit of them via Linkup, and prints a summary.
    """
    parser = argparse.ArgumentParser(description="Enrich PPID files using Linkup web search")
    parser.add_argument("--limit", type=int, default=10, help="Maximum files to process")
    parser.add_argument("--dry-run", action="store_true", help="Don't write changes")
    # NOTE(review): --min-confidence is parsed but never applied below;
    # enrich_ppid_file() uses its own 0.80 floor. TODO: wire through or drop.
    parser.add_argument("--min-confidence", type=float, default=0.70, help="Minimum confidence threshold")
    # NOTE(review): store_true with default=True means this flag cannot be
    # disabled from the CLI; kept as-is for interface compatibility.
    parser.add_argument("--heritage-only", action="store_true", default=True, help="Only process heritage-relevant profiles")
    args = parser.parse_args()

    # Get API key
    try:
        api_key = get_linkup_api_key()
        print("✓ Linkup API key loaded")
    except ValueError as e:
        print(f"✗ {e}")
        return

    # Find PPID files relative to this script's location
    ppid_dir = Path(__file__).parent.parent / "data" / "person"
    if not ppid_dir.exists():
        print(f"✗ PPID directory not found: {ppid_dir}")
        return

    ppid_files = list(ppid_dir.glob("ID_*.json"))
    print(f"Found {len(ppid_files)} PPID files")

    # Filter to files needing enrichment (unknown or decade-only birth dates)
    candidates = []
    for f in ppid_files:
        try:
            with open(f) as fp:
                data = json.load(fp)

            # Check heritage relevance
            if args.heritage_only:
                heritage = data.get("heritage_relevance", {})
                if not heritage.get("is_heritage_relevant"):
                    continue

            # Check if birth date needs enrichment
            birth = data.get("birth_date", {}).get("edtf", "XXXX")
            if birth == "XXXX" or birth.endswith("X"):
                # Prioritize those with good names
                name = data.get("name", {}).get("full_name", "")
                if name and name != "LinkedIn Member":
                    candidates.append(f)
        except Exception:
            # Fixed: was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit during the scan. Unreadable or
            # malformed files are still skipped deliberately.
            continue

    print(f"Found {len(candidates)} files needing birth year enrichment")

    # Process
    stats = {"enriched": 0, "not_found": 0, "skipped": 0, "errors": 0}
    results = []

    for i, filepath in enumerate(candidates[:args.limit]):
        print(f"\n[{i+1}/{min(len(candidates), args.limit)}] Processing {filepath.name}...")

        try:
            result = enrich_ppid_file(filepath, api_key, args.dry_run)
            status = result.get("status", "errors")
            stats[status] = stats.get(status, 0) + 1

            if result["status"] == "enriched":
                print(f" ✓ Found birth year: {result['birth_year']} (confidence: {result['confidence']:.0%})")
                results.append(result)
            elif result["status"] == "not_found":
                print(f" ✗ No birth year found for {result.get('name', 'unknown')}")
            else:
                print(f" - Skipped: {result.get('reason', 'unknown')}")

            # Rate limit between files (queries also pause internally)
            time.sleep(1.0)

        except Exception as e:
            print(f" ✗ Error: {e}")
            stats["errors"] += 1

    # Summary
    print(f"\n{'='*50}")
    print("ENRICHMENT SUMMARY")
    print(f"{'='*50}")
    print(f"Processed: {sum(stats.values())}")
    print(f"Enriched: {stats['enriched']}")
    print(f"Not found: {stats['not_found']}")
    print(f"Skipped: {stats['skipped']}")
    print(f"Errors: {stats['errors']}")

    if results:
        print("\nEnriched profiles:")
        for r in results:
            print(f" - {r['name']}: born {r['birth_year']} ({r['confidence']:.0%})")
|