glam/scripts/enrich_person_comprehensive.py
2026-01-11 12:15:27 +01:00

1128 lines
54 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
╔══════════════════════════════════════════════════════════════════════════════╗
║ ║
║ 🚫🚫🚫 THIS SCRIPT IS DEPRECATED - DO NOT USE 🚫🚫🚫 ║
║ ║
║ Automated web enrichment has been DISABLED due to catastrophic ║
║ entity resolution failures discovered in January 2026. ║
║ ║
║ WHAT HAPPENED: ║
║ - 540+ false claims were attributed to wrong people with similar names ║
║ - Birth years from Venezuelan actresses attributed to UK curators ║
║ - Death years attributed to LIVING people ║
║ - Social media from random namesakes attributed to heritage workers ║
║ ║
║ ALL PERSON ENRICHMENT MUST NOW BE DONE MANUALLY. ║
║ See: .opencode/rules/entity-resolution-no-heuristics.md (Rule 46) ║
║ See: docs/MANUAL_PERSON_ENRICHMENT_WORKFLOW.md ║
║ ║
║ This script is preserved for reference only. Running it will exit ║
║ immediately unless --force-deprecated is passed. ║
║ ║
╚══════════════════════════════════════════════════════════════════════════════╝
Comprehensive Person Profile Enrichment via Linkup Web Search
This script enriches person profiles with ALL discoverable data from web sources,
with FULL PROVENANCE for every claim. No data is stored without a verifiable source.
⚠️ DATA QUALITY IS OF UTMOST IMPORTANCE ⚠️
Wrong data is worse than no data. All enrichments are double-checked via entity
resolution validation before being committed.
Rule Compliance:
- Rule 6: WebObservation Claims MUST Have XPath Provenance (adapted for web search)
- Rule 21: Data Fabrication is Strictly Prohibited
- Rule 26: Person Data Provenance - Web Claims for Staff Information
- Rule 34: Linkup is the Preferred Web Scraper
- Rule 35: Provenance Statements MUST Have Dual Timestamps
- Rule 46: Entity Resolution - Names Are NEVER Sufficient
Data Extracted (when available):
- Birth date/year, birth location
- Education history, career milestones
- Publications, awards/honors
- Professional affiliations
- Death date (if applicable)
- Contact details (email, phone, social media)
- Media references (photos, videos, portraits)
Usage:
THIS SCRIPT IS DEPRECATED. Use manual enrichment instead.
If you must run it (NOT RECOMMENDED):
python scripts/enrich_person_comprehensive.py --force-deprecated --limit N [--dry-run]
"""
import json
import os
import re
import time
import argparse
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List
import httpx
# Endpoint used for every Linkup search request.
LINKUP_API_URL = "https://api.linkup.so/v1/search"
SCRIPT_VERSION = "1.4.0"  # Added entity resolution validation

# Snippets matching any of these regexes say that nothing was found,
# so no claim may be derived from them.
NEGATIVE_STATEMENT_PATTERNS = [
    r'no\s+(?:information|data|details?|evidence)',
    r'not\s+(?:found|available|mentioned|provided|disclosed)',
    r"(?:isn't|aren't|wasn't|weren't)\s+(?:found|available|mentioned)",
    r'in\s+the\s+provided\s+data',
    r'(?:no|not)\s+(?:specific|explicit)',
    r'could\s+not\s+(?:find|locate|determine)',
    r'unavailable',
    r'not\s+publicly\s+(?:known|available|disclosed)',
]

# URL fragments of genealogy/historical sites (data there is not about
# the current person).
GENEALOGY_URL_PATTERNS = [
    r'genealog',
    r'ancestry',
    r'familysearch',
    r'findagrave',
    r'geni\.com',
    r'myheritage',
    r'wikitree',
    r'family\.',
    r'grivel\.net',
    r'geneanet',
    r'billiongraves',
    r'interment\.net',
    r'cemeter',
    r'grave',
]

# Extracted values matching these regexes are meta-text about the search
# itself rather than real data.
GARBAGE_PATTERNS = [
    r'provided\s+data',
    r'available\s+information',
    r'search\s+results?',
    r'mentioned\s+in',
    r'according\s+to',
    r'based\s+on\s+the',
    r'not\s+(?:available|found|mentioned)',
    r'no\s+(?:information|data)',
]

# Sources that require entity resolution before a claim is accepted (Rule 46).
HIGH_RISK_SOURCES = [
    r'wikipedia\.org',
    r'imdb\.com',
    r'instagram\.com',
    r'tiktok\.com',
    r'facebook\.com',
    r'researchgate\.net',
    r'academia\.edu',
    r'scholar\.google',
]

# Keyword lists for profession conflict detection (Rule 46).
HERITAGE_PROFESSIONS = [
    'curator',
    'archivist',
    'librarian',
    'conservator',
    'registrar',
    'collection',
    'heritage',
    'museum',
    'archive',
    'library',
    'preservation',
]
CONFLICTING_PROFESSIONS = [
    'actress',
    'actor',
    'singer',
    'footballer',
    'politician',
    'model',
    'athlete',
    'film',
    'movie',
    'band',
    'music',
    'sports',
]
def is_negative_statement(snippet: str) -> bool:
    """Return True when the snippet reads as a "nothing was found" statement.

    The snippet is lower-cased and searched with every regex in
    NEGATIVE_STATEMENT_PATTERNS; a single hit is enough.
    """
    lowered = snippet.lower()
    return any(re.search(rx, lowered) for rx in NEGATIVE_STATEMENT_PATTERNS)
def is_genealogy_source(url: str) -> bool:
    """Return True when the URL matches a genealogy/historical-site pattern.

    The URL is lower-cased and searched with every regex in
    GENEALOGY_URL_PATTERNS; a single hit is enough.
    """
    lowered = url.lower()
    return any(re.search(rx, lowered) for rx in GENEALOGY_URL_PATTERNS)
def is_garbage_extraction(value: str) -> bool:
    """Return True when the extracted value is meta-text about the search.

    The value is lower-cased and searched with every regex in
    GARBAGE_PATTERNS; a single hit is enough.
    """
    lowered = value.lower()
    return any(re.search(rx, lowered) for rx in GARBAGE_PATTERNS)
def is_high_risk_source(url: str) -> bool:
    """Return True when the URL belongs to a high-risk source (Rule 46).

    The URL is lower-cased and searched with every regex in
    HIGH_RISK_SOURCES; a single hit is enough.
    """
    lowered = url.lower()
    return any(re.search(rx, lowered) for rx in HIGH_RISK_SOURCES)
def has_profession_conflict(profile_role: str, source_text: str) -> bool:
    """
    Detect a profession conflict between profile and source (Rule 46).

    A conflict exists when the profile role contains any keyword from
    HERITAGE_PROFESSIONS while the source text contains any keyword from
    CONFLICTING_PROFESSIONS. Both comparisons are case-insensitive
    substring checks. In that situation the profile and the source are
    treated as DIFFERENT PEOPLE.
    """
    role = profile_role.lower()
    text = source_text.lower()
    heritage_profile = any(term in role for term in HERITAGE_PROFESSIONS)
    entertainment_source = any(term in text for term in CONFLICTING_PROFESSIONS)
    return heritage_profile and entertainment_source
def validate_entity_resolution(profile: Dict, source_text: str, source_url: str, claim_type: str) -> tuple:
    """
    Validate entity resolution before attributing a claim (Rule 46).

    🚨 SIMILAR OR IDENTICAL NAMES ARE NEVER SUFFICIENT FOR ENTITY RESOLUTION.

    Args:
        profile: Person profile. Read keys: ``affiliations`` (list whose first
            item may carry ``custodian_name``) and ``profile_data`` (may carry
            ``headline`` and ``location``). Missing or None values are treated
            as empty strings.
        source_text: Text of the source the claim was extracted from.
        source_url: URL of that source.
        claim_type: Claim type, e.g. ``birth_year``.

    Returns:
        ``(is_valid, reason, match_count)``.
    """
    # Identity attributes of the profile; `or` guards against explicit None
    # values, which would otherwise crash on .lower().
    affiliations = profile.get('affiliations') or []
    first_affiliation = affiliations[0] if affiliations else {}
    profile_employer = (first_affiliation.get('custodian_name') or '').lower()
    profile_data = profile.get('profile_data') or {}
    profile_role = (profile_data.get('headline') or '').lower()
    profile_location = (profile_data.get('location') or '').lower()
    source_text = source_text or ''
    source_lower = source_text.lower()

    # AUTOMATIC REJECTION: Genealogy sources (Rule 46)
    if is_genealogy_source(source_url):
        return False, "genealogy_source_always_reject", 0

    # AUTOMATIC REJECTION: Profession conflicts (Rule 46)
    if profile_role and has_profession_conflict(profile_role, source_text):
        return False, "profession_conflict", 0

    # Lower-risk sources pass through
    if not is_high_risk_source(source_url):
        return True, "low_risk_source", 0

    # High-risk sources (Wikipedia, IMDB, social media) require identity
    # attributes of the profile to appear in the source text.
    match_details = []

    # 1. Employer match (very short employer names are ignored)
    if profile_employer and len(profile_employer) > 3 and profile_employer in source_lower:
        match_details.append('employer')

    # 2. Role match: any headline word longer than 4 characters
    if profile_role:
        role_words = [w for w in profile_role.split() if len(w) > 4]
        if any(w in source_lower for w in role_words):
            match_details.append('role')

    # 3. Location match: any comma-separated part longer than 2 characters
    if profile_location:
        loc_parts = [p.strip() for p in profile_location.split(',') if len(p.strip()) > 2]
        if any(p in source_lower for p in loc_parts):
            match_details.append('location')

    matches = len(match_details)
    # Birth/death claims need at least 2 identity matches, all other claims 1.
    # 'death_date' is included so the list agrees with validate_claim.
    strict_claims = ('birth_year', 'birth_date', 'death_year', 'death_date')
    min_matches = 2 if claim_type in strict_claims else 1
    if matches < min_matches:
        return False, f"insufficient_identity_verification ({matches} matches, need {min_matches})", matches
    return True, f"verified ({matches} matches: {', '.join(match_details)})", matches
def validate_claim(claim_type: str, claim_value: Any, snippet: str, source_url: str) -> tuple:
    """
    Decide whether a claim may be stored. Returns (is_valid, rejection_reason).

    Rule 21: Data Fabrication is Strictly Prohibited
    Rule 46: Entity Resolution - Names Are NEVER Sufficient

    Checks, in order:
    - snippet is a negative statement ("no information found")
    - source is a genealogy site (ALWAYS rejected)
    - birth/death claim coming from a high-risk source
    - claim value (a string, or any string value of a dict) is garbage meta-text
    """
    if is_negative_statement(snippet):
        return False, "negative_statement"

    # RULE 46: genealogy sources are rejected for every claim type
    if is_genealogy_source(source_url):
        return False, "genealogy_source_always_reject"

    # RULE 46: birth/death claims from high-risk sources are too likely to
    # belong to a different person with the same name
    is_vital_claim = claim_type in ('birth_year', 'birth_date', 'death_year', 'death_date')
    if is_vital_claim and is_high_risk_source(source_url):
        return False, "high_risk_source_birth_death_rejected"

    # Gather the string values that need the garbage check
    if isinstance(claim_value, dict):
        candidates = [v for v in claim_value.values() if isinstance(v, str)]
    elif isinstance(claim_value, str):
        candidates = [claim_value]
    else:
        candidates = []
    if any(is_garbage_extraction(candidate) for candidate in candidates):
        return False, "garbage_extraction"

    return True, None
def get_linkup_api_key() -> str:
    """Return the Linkup API key.

    Lookup order:
    1. A ``LINKUP_API_KEY=...`` line in the ``.env`` file located two
       directories above this script (surrounding quotes are stripped).
    2. The ``LINKUP_API_KEY`` environment variable.

    An empty value in ``.env`` is skipped rather than returned, so the
    environment variable can still supply the key.

    Raises:
        ValueError: if neither location provides a non-empty key.
    """
    env_path = Path(__file__).parent.parent / ".env"
    if env_path.exists():
        with open(env_path, encoding="utf-8") as f:
            for line in f:
                if line.startswith("LINKUP_API_KEY="):
                    value = line.strip().split("=", 1)[1].strip('"\'')
                    if value:
                        return value
    key = os.environ.get("LINKUP_API_KEY", "")
    if not key:
        raise ValueError("LINKUP_API_KEY not found")
    return key
def search_linkup(query: str, api_key: str, depth: str = "standard") -> Dict[str, Any]:
    """POST a search query to the Linkup API and return the decoded JSON.

    On success the response dict gains a ``_meta`` entry holding the request
    and response timestamps (UTC, ISO format), the HTTP status and the depth.
    On any exception a dict with ``error`` (the exception text) and a
    ``_meta`` holding only the request timestamp is returned instead.
    """
    auth_headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    body = {"q": query, "depth": depth, "outputType": "sourcedAnswer"}
    started_at = datetime.now(timezone.utc).isoformat()
    try:
        with httpx.Client(timeout=45.0) as client:
            response = client.post(LINKUP_API_URL, headers=auth_headers, json=body)
            response.raise_for_status()
            result = response.json()
            finished_at = datetime.now(timezone.utc).isoformat()
            result["_meta"] = {
                "request_ts": started_at,
                "response_ts": finished_at,
                "status": response.status_code,
                "depth": depth,
            }
            return result
    except Exception as exc:
        # Best effort: callers check for the "error" key instead of catching.
        return {"error": str(exc), "_meta": {"request_ts": started_at}}
def create_claim(claim_type: str, claim_value: Any, source_url: str, source_title: str,
                 snippet: str, query: str, sources: List = None, meta: Dict = None,
                 answer: str = None, pattern: str = None) -> Optional[Dict]:
    """
    Build a claim dict with full provenance, or return None if validation fails.

    Rule 21: Data Fabrication is Strictly Prohibited

    The claim is first passed through validate_claim(); a rejected claim
    yields None. Otherwise the result has the keys ``claim_type``,
    ``claim_value`` and ``provenance``. Optional provenance entries:
    ``answer_position`` (when the first 50 characters of the snippet occur in
    the answer), ``all_sources``/``source_count`` (when sources are given) and
    ``answer_content_hash`` (first 16 hex digits of the answer's SHA-256).
    """
    accepted, _rejection_reason = validate_claim(claim_type, claim_value, snippet, source_url)
    if not accepted:
        # The rejection reason is not used here; rejected claims are dropped.
        return None

    created_at = datetime.now(timezone.utc).isoformat()
    meta_info = meta if meta else {}
    provenance = {
        "statement_created_at": created_at,
        "source_archived_at": meta_info.get("request_ts", created_at),
        "retrieval_agent": f"enrich_person_comprehensive.py v{SCRIPT_VERSION}",
        "retrieval_method": "linkup_web_search",
        "api_endpoint": LINKUP_API_URL,
        "search_query": query,
        "search_depth": meta_info.get("depth", "standard"),
        "source_url": source_url,
        "source_title": source_title,
        "source_snippet": snippet,
        "extraction_method": "regex_pattern_matching",
        "pattern_type": pattern,
        "verified": False,
        "verification_status": "machine_extracted",
        "requires_human_review": True,
        "http_status": meta_info.get("status"),
    }

    if answer and snippet:
        # Locate the snippet inside the answer using its first 50 characters.
        offset = answer.find(snippet[:50])
        if offset >= 0:
            provenance["answer_position"] = f"answer[{offset}:{offset + len(snippet)}]"

    if sources:
        # Only the first five sources are listed; the count covers all of them.
        provenance["all_sources"] = [
            {"url": src.get("url", ""), "name": src.get("name", "")} for src in sources[:5]
        ]
        provenance["source_count"] = len(sources)

    if answer:
        provenance["answer_content_hash"] = hashlib.sha256(answer.encode()).hexdigest()[:16]

    return {"claim_type": claim_type, "claim_value": claim_value, "provenance": provenance}
def add_claim_if_valid(claims_list: List, claim: Optional[Dict]) -> None:
    """Append ``claim`` to ``claims_list`` unless it is None (failed validation)."""
    if claim is None:
        return
    claims_list.append(claim)
def extract_birth_year(text):
    """Find a birth year in ``text``.

    Patterns are tried in order and only the first match of each is
    considered. A year is accepted when it lies in 1900..2010; a bare
    "(YYYY)" is additionally rejected from 1990 onwards.

    Returns a dict with ``year``, ``snippet`` (the match plus up to 40
    characters of context on each side) and ``pattern_type``, or None.
    """
    if not text:
        return None
    birth_patterns = (
        (r'born\s+(?:on\s+)?(\d{1,2}\s+\w+\s+)?(\d{4})', "full_date"),
        (r'born\s+(?:on\s+)?(\w+\s+\d{1,2},?\s+)(\d{4})', "us_date"),
        (r'(?:was\s+)?born\s+in\s+(\d{4})', "born_in"),
        (r'geboren\s+(?:in\s+)?(\d{4})', "dutch"),
        (r'\(born\s+(\d{4})\)', "paren"),
        (r'\((\d{4})\)', "year_paren"),
    )
    for regex, label in birth_patterns:
        match = re.search(regex, text, re.I)
        if not (match and match.lastindex):
            continue
        # The year is always the last capturing group that took part.
        year = int(match.group(match.lastindex))
        if not 1900 <= year <= 2010:
            continue
        if label == "year_paren" and year >= 1990:
            continue
        context = text[max(0, match.start() - 40):match.end() + 40].strip()
        return {"year": year, "snippet": context, "pattern_type": label}
    return None
def extract_birth_location(text):
    """Find a birth location after "born in" / "geboren te|in".

    Matching is case-sensitive: the location must start with a capital
    letter. Returns a dict with ``location`` and ``snippet`` (the match plus
    up to 30 characters of context on each side), or None.
    """
    location_patterns = (
        r'born\s+in\s+([A-Z][a-zA-Z\s,]+)',
        r'geboren\s+(?:te|in)\s+([A-Z][a-zA-Z\s]+)',
    )
    for regex in location_patterns:
        match = re.search(regex, text)
        if match is None:
            continue
        place = match.group(1).strip()
        if place.lower() in ['the', 'a', 'an']:
            continue
        context = text[max(0, match.start() - 30):match.end() + 30].strip()
        return {"location": place, "snippet": context}
    return None
def extract_death_info(text):
    """Find a death year in ``text``.

    Recognised forms: "died [on|in] [D Month] YYYY", a lifespan such as
    "(1920-1990)" written with a hyphen, en dash or em dash,
    "passed away [in] YYYY" and Dutch "overleden [in] YYYY". Patterns are
    tried in order and only the first match of each is considered. The year
    must lie between 1900 and the current year.

    Returns a dict with ``year`` and ``snippet`` (the match plus up to 30
    characters of context on each side), or None. Empty or None text
    returns None.
    """
    if not text:
        return None
    death_patterns = (
        # "died in 1999" is accepted as well as "died 1999" / "died on 3 May 1999"
        r'died\s+(?:(?:on|in)\s+)?(?:\d{1,2}\s+\w+\s+)?(\d{4})',
        # Lifespans are commonly typeset with an en dash, not only a hyphen
        r'\(\d{4}\s*[-\u2013\u2014]\s*(\d{4})\)',
        r'passed\s+away\s+(?:in\s+)?(\d{4})',
        r'overleden\s+(?:in\s+)?(\d{4})',
    )
    current_year = datetime.now().year
    for pat in death_patterns:
        m = re.search(pat, text, re.I)
        if not m:
            continue
        yr = int(m.group(1))
        if 1900 <= yr <= current_year:
            return {"year": yr, "snippet": text[max(0, m.start() - 30):m.end() + 30].strip()}
    return None
def extract_education(text):
    """Extract education entries (PhD, master's, graduation, studies).

    Returns a list of dicts with ``type`` ("phd", "masters", "graduated" or
    "studied"), ``institution``, ``year`` (int or None) and ``snippet`` (the
    match plus up to 20 characters of context on each side). Empty or None
    text returns an empty list.

    For "phd" and "graduated" the institution ends either at a trailing
    "in YYYY" (which becomes the year) or at the next comma/period/end of
    text. A lazy capture followed only by optional groups would stop after
    two characters, so the end is anchored explicitly.
    """
    if not text:
        return []
    # Institution followed by either " in YYYY" or a delimiter.
    inst_with_year = r'(?P<inst>[A-Z][^,.]+?)(?:\s+in\s+(?P<year>\d{4})\b|(?=[,.]|$))'
    patterns = [
        (r'(?:Ph\.?D\.?|doctorate)\s+(?:from|at)\s+' + inst_with_year, "phd"),
        (r"(?:master'?s?|M\.?A\.?)\s+(?:from|at)\s+(?P<inst>[A-Z][^,.]+)", "masters"),
        (r'graduated\s+from\s+' + inst_with_year, "graduated"),
        (r'studied\s+(?:\w+\s+)?at\s+(?P<inst>[A-Z][^,.]+)', "studied"),
    ]
    edu = []
    for pat, etype in patterns:
        for m in re.finditer(pat, text, re.I):
            # Only some patterns define a "year" group; it is always 4 digits.
            year = m.groupdict().get("year")
            edu.append({
                "type": etype,
                "institution": m.group("inst").strip(),
                "year": int(year) if year else None,
                "snippet": text[max(0, m.start() - 20):m.end() + 20].strip(),
            })
    return edu
def extract_positions(text):
    """Extract positions ("director of X since 2010", "appointed curator at X in 2015").

    Returns a list of dicts with ``title``, ``organization``, ``year`` (int or
    None) and ``snippet`` (the match plus up to 20 characters of context on
    each side). Empty or None text returns an empty list.

    The organisation capture is greedy, so a trailing "since YYYY" /
    "in YYYY" normally ends up inside it. It is therefore split off
    afterwards: the organisation is cut at that phrase and the year taken
    from it.
    """
    if not text:
        return []
    # (pattern, keyword that introduces the year for this pattern)
    patterns = [
        # \b keeps "head" from matching inside words such as "ahead"
        (r'\b(professor|director|curator|head|chief)\s+(?:at|of)\s+([A-Z][^,\.]{3,50})(?:\s+since\s+(\d{4}))?',
         'since'),
        (r'appointed\s+(\w+)\s+(?:at\s+)?([A-Z][^,\.]{3,50})(?:\s+in\s+(\d{4}))?',
         'in'),
    ]
    pos = []
    for pat, year_kw in patterns:
        trailing_year = re.compile(rf'\s+{year_kw}\s+(\d{{4}})\b', re.I)
        for m in re.finditer(pat, text, re.I):
            org = m.group(2).strip()
            yr = int(m.group(3)) if m.group(3) else None
            if yr is None:
                tail = trailing_year.search(org)
                if tail:
                    org, yr = org[:tail.start()].strip(), int(tail.group(1))
            pos.append({"title": m.group(1), "organization": org, "year": yr,
                        "snippet": text[max(0, m.start() - 20):m.end() + 20].strip()})
    return pos
def extract_publications(text):
    """Extract publications mentioned in ``text``.

    Two forms are recognised: a quoted title after "author/wrote/published"
    (type "book", no year) and "published <title> [in] YYYY" (type
    "publication"). Returns a list of dicts with ``type``, ``title``,
    ``year`` (int or None) and ``snippet`` (the match plus up to 20
    characters of context on each side). Empty or None text returns an
    empty list.
    """
    if not text:
        return []
    patterns = [
        (r'(?:author|wrote|published)\s+(?:of\s+)?["\']([^"\']+)["\']', "book"),
        # Lazy title: a greedy one would absorb the " in" that precedes the year.
        (r'published\s+["\']?([^"\',.]+?)["\']?\s+(?:in\s+)?(\d{4})', "publication"),
    ]
    pubs = []
    for pat, ptype in patterns:
        for m in re.finditer(pat, text, re.I):
            title = m.group(1).strip()
            # Only the second pattern has a year group.
            yr = int(m.group(2)) if m.lastindex >= 2 and m.group(2) else None
            pubs.append({"type": ptype, "title": title, "year": yr,
                         "snippet": text[max(0, m.start() - 20):m.end() + 20].strip()})
    return pubs
def extract_awards(text):
    """Extract awards ("received/awarded/won ...") and fellowships ("Fellow of ...").

    Returns a list of dicts with ``type`` ("award" or "fellowship"), ``name``
    and ``snippet`` (the match plus up to 20 characters of context on each
    side). Matching is case-insensitive.
    """
    rules = (
        (r'(?:received|awarded|won)\s+(?:the\s+)?([A-Z][^,\.]{5,50})', "award"),
        (r'Fellow\s+of\s+(?:the\s+)?([A-Z][^,\.]{5,50})', "fellowship"),
    )
    found = []
    for regex, kind in rules:
        found.extend(
            {
                "type": kind,
                "name": hit.group(1).strip(),
                "snippet": text[max(0, hit.start() - 20):hit.end() + 20].strip(),
            }
            for hit in re.finditer(regex, text, re.I)
        )
    return found
def extract_contacts(text):
    """Extract contact info including social media profiles (Instagram, Facebook, TikTok, Twitter/X).

    Returns a list of dicts with ``type``, ``value`` and ``snippet`` (the
    match plus up to 30 characters of context on each side). Values are
    de-duplicated per type (phone numbers after removing spaces, dashes and
    parentheses). Empty or None text returns an empty list.

    Two sources of false "twitter" entries are excluded:
    - a bare ``@handle`` that is part of a URL path (e.g. ``tiktok.com/@name``)
    - ``x.com`` matched as the tail of another domain (e.g. ``dropbox.com``)
    """
    if not text:
        return []

    # Blocklists for common false positives
    twitter_blocklist = {'handle', 'handles', 'profile', 'profiles', 'account', 'accounts',
                         'found', 'available', 'not', 'no', 'or', 'and', 'the', 'is', 'are',
                         'was', 'were', 'has', 'have', 'with', 'for', 'o', 'a', 'example',
                         'gmail', 'outlook', 'yahoo', 'hotmail', 'email', 'mail', 'share',
                         'follow', 'tweet', 'retweet', 'like', 'post', 'status', 'search'}
    instagram_blocklist = twitter_blocklist | {'photos', 'videos', 'reels', 'stories', 'explore', 'p', 'tv'}
    facebook_blocklist = {'pages', 'groups', 'events', 'marketplace', 'watch', 'gaming', 'privacy',
                          'help', 'settings', 'login', 'signup', 'photo', 'photos', 'sharer'}
    tiktok_blocklist = {'discover', 'following', 'foryou', 'live', 'upload', 'search', 'trending'}
    blocklists = {"twitter": twitter_blocklist, "instagram": instagram_blocklist,
                  "facebook": facebook_blocklist, "tiktok": tiktok_blocklist}
    # Handles shorter than this are treated as false positives
    min_lengths = {"twitter": 3, "instagram": 3, "tiktok": 3, "facebook": 5}

    patterns = [
        # Email
        (r'\b([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b', "email"),
        # Twitter/X - handles and URLs ("/" in the lookbehind skips @names inside URL paths)
        (r'(?<![a-zA-Z0-9.@/])@([a-zA-Z0-9_]{3,15})\b', "twitter"),
        # The lookbehind stops "x.com" from matching inside e.g. "dropbox.com"
        (r'(?<![a-zA-Z0-9\-])(?:twitter\.com|x\.com)/([a-zA-Z0-9_]{3,15})(?:\s|$|["\'\)\]?&])', "twitter"),
        (r'(https?://(?:www\.)?(?:twitter\.com|x\.com)/[a-zA-Z0-9_]{3,15})(?:\s|$|["\'\)\]])', "twitter_url"),
        # Instagram - handles and URLs
        (r'(?:instagram\.com)/([a-zA-Z0-9_.]{3,30})(?:\s|$|["\'\)\]?&])', "instagram"),
        (r'(https?://(?:www\.)?instagram\.com/[a-zA-Z0-9_.]{3,30})(?:/\?|$|\s|["\'\)\]])', "instagram_url"),
        # Facebook - profile URLs
        (r'(https?://(?:www\.)?facebook\.com/(?:profile\.php\?id=\d+|[a-zA-Z0-9.]{5,50}))(?:\s|$|["\'\)\]?&])', "facebook_url"),
        (r'(?:facebook\.com)/([a-zA-Z0-9.]{5,50})(?:\s|$|["\'\)\]?&])', "facebook"),
        # TikTok - handles and URLs
        (r'(https?://(?:www\.)?tiktok\.com/@[a-zA-Z0-9_.]{2,24})(?:\s|$|["\'\)\]?&])', "tiktok_url"),
        (r'(?:tiktok\.com)/@([a-zA-Z0-9_.]{2,24})(?:\s|$|["\'\)\]?&])', "tiktok"),
        # LinkedIn
        (r'(https?://(?:www\.)?linkedin\.com/in/[a-zA-Z0-9\-%]+/?)', "linkedin_url"),
        # YouTube channel
        (r'(https?://(?:www\.)?youtube\.com/(?:c/|channel/|user/|@)[a-zA-Z0-9_\-]+)', "youtube_url"),
        # ORCID
        (r'(?:orcid)[:\s]*((?:\d{4}-){3}\d{3}[\dX])', "orcid"),
        (r'(https?://orcid\.org/(?:\d{4}-){3}\d{3}[\dX])', "orcid_url"),
        # ResearchGate profile
        (r'(https?://(?:www\.)?researchgate\.net/profile/[a-zA-Z0-9_\-]+)', "researchgate_url"),
        # Academia.edu profile
        (r'(https?://[a-zA-Z0-9\-]+\.academia\.edu(?:/[a-zA-Z0-9_\-]+)?)', "academia_url"),
        # Google Scholar
        (r'(https?://scholar\.google\.com/citations\?[^\s\)\"\']+)', "google_scholar_url"),
        # Bluesky
        (r'(https?://bsky\.app/profile/[a-zA-Z0-9._\-]+)', "bluesky_url"),
        # Mastodon (various instances)
        (r'(https?://[a-zA-Z0-9\-]+\.social/@[a-zA-Z0-9_]+)', "mastodon_url"),
        (r'(https?://mastodon\.[a-zA-Z]+/@[a-zA-Z0-9_]+)', "mastodon_url"),
        # Threads
        (r'(https?://(?:www\.)?threads\.net/@[a-zA-Z0-9_.]+)', "threads_url"),
        # Personal website
        (r'(?:website|homepage|site)[:\s]*(https?://[a-zA-Z0-9\-]+\.[a-zA-Z]{2,}[^\s]*)', "website"),
        # Phone numbers (international formats)
        (r'(?:phone|tel|telephone|fax)[:\s]*(\+?[0-9][0-9\s\-\(\)]{8,18}[0-9])', "phone"),
        (r'(?<!\d)(\+31[\s\-]?[0-9][\s\-]?[0-9]{3,4}[\s\-]?[0-9]{3,4})(?!\d)', "phone"),  # Dutch
        (r'(?<!\d)(\+1[\s\-]?\(?[0-9]{3}\)?[\s\-]?[0-9]{3}[\s\-]?[0-9]{4})(?!\d)', "phone"),  # US/Canada
    ]

    contacts = []
    seen_values = set()  # Deduplication keys: "<type>:<normalized value>"
    for pat, ctype in patterns:
        for m in re.finditer(pat, text, re.I):
            val = m.group(1).strip().rstrip('/').rstrip('?')
            # Normalize phone numbers for deduplication
            norm_val = re.sub(r'[\s\-\(\)]', '', val) if ctype == "phone" else val
            dedup_key = f"{ctype}:{norm_val.lower()}"
            if dedup_key in seen_values:
                continue
            seen_values.add(dedup_key)

            lowered = val.lower()
            # Skip common false positives
            if ctype == "email" and any(x in lowered for x in ('example.com', 'test.com')):
                continue
            if lowered in blocklists.get(ctype, ()):
                continue
            if len(val) < min_lengths.get(ctype, 0):
                continue
            contacts.append({"type": ctype, "value": val,
                             "snippet": text[max(0, m.start() - 30):m.end() + 30].strip()})
    return contacts
def extract_media(text):
    """Extract media URLs: images by file extension, YouTube/Vimeo videos, Wikimedia uploads.

    Returns a list of dicts with ``type`` ("image_url", "video_url" or
    "wikimedia_image"), ``value`` (the URL) and ``snippet`` (the match plus
    up to 30 characters of context on each side).
    """
    rules = (
        (r'(https?://[^\s]+\.(?:jpg|jpeg|png|gif|webp))', "image_url"),
        (r'(https?://(?:www\.)?(?:youtube\.com/watch\?v=|youtu\.be/|vimeo\.com/)[^\s]+)', "video_url"),
        (r'(https?://upload\.wikimedia\.org/[^\s]+)', "wikimedia_image"),
    )
    found = []
    for regex, kind in rules:
        for hit in re.finditer(regex, text, re.I):
            lo = max(0, hit.start() - 30)
            hi = hit.end() + 30
            found.append({"type": kind, "value": hit.group(1).strip(), "snippet": text[lo:hi].strip()})
    return found
def extract_social(text):
    """Extract comprehensive social network data - family, collaborators, mentors, students, etc.

    Each rule is a cue phrase (matched case-insensitively) followed by a
    person name. The name is matched CASE-SENSITIVELY: 1-4 capitalised words
    separated by whitespace, with optional lowercase particles (van, de, der,
    ...) before the later words. If the whole pattern were case-insensitive,
    ordinary lowercase words following a cue would be captured as names.

    Returns a list of dicts with ``relationship_type``, ``related_person``
    and ``snippet`` (the match plus up to 30 characters of context on each
    side), de-duplicated by (relationship_type, lower-cased name). Empty or
    None text returns an empty list.
    """
    if not text:
        return []

    particles = r'(?:van|de|der|den|von|la|el|al|ibn|bin)'
    # Name pattern: 1-4 capitalized words (handles multi-part names)
    name_pat = rf'([A-Z][a-zA-Z]+(?:(?:\s+{particles})*\s+[A-Z][a-zA-Z]+){{0,3}})'

    # (cue phrase regex, relationship type); the name pattern is appended below
    cues = [
        # Family relationships
        (r'(?:married|spouse|wife|husband)\s+(?:to\s+|of\s+)?', "spouse"),
        (r'(?:married|wed)\s+', "spouse"),
        (r'(?:daughter|son|child)\s+of\s+', "parent"),
        (r'(?:father|mother|parent)\s+(?:is|was)?\s*', "parent"),
        (r'(?:brother|sister|sibling)\s+(?:of\s+|is\s+)?', "sibling"),
        (r'(?:children|son|daughter)[:;,]?\s+', "child"),
        # Professional collaborators
        (r'(?:collaborated|cooperated|partnered)\s+with\s+', "collaborator"),
        (r'(?:co-authored?|co-wrote)\s+(?:with\s+)?', "co_author"),
        (r'(?:worked\s+with|working\s+with)\s+', "colleague"),
        (r'(?:colleague|coworker)\s+', "colleague"),
        (r'(?:together\s+with|alongside)\s+', "collaborator"),
        (r'(?:joint\s+(?:work|research|project))\s+with\s+', "collaborator"),
        # Academic relationships
        (r'(?:student|protégé|advisee)\s+of\s+', "advisor"),
        (r'(?:mentored?|supervised?|advised?)\s+by\s+', "mentor"),
        (r'(?:PhD|doctoral)\s+(?:supervisor|advisor)[:;]?\s*', "phd_advisor"),
        (r'(?:thesis|dissertation)\s+(?:supervisor|advisor)[:;]?\s*', "thesis_advisor"),
        (r'(?:under|with)\s+(?:the\s+)?(?:supervision|guidance)\s+of\s+', "supervisor"),
        (r'(?:mentor|advisor)\s+(?:to|of)\s+', "mentee"),
        (r'(?:students?|advisees?)[:;,]?\s+(?:include\s+)?', "student"),
        (r'(?:supervised?|mentored?|advised?)\s+', "mentee"),
        (r'(?:trained|educated)\s+(?:under\s+)?', "trainer"),
        # Team and organizational
        (r'(?:team|group)\s+(?:includes?|members?)[:;,]?\s*', "team_member"),
        (r'(?:succeeded?|replaced?|followed?)\s+(?:by\s+)?', "successor"),
        (r'(?:successor\s+(?:to|of)|preceded\s+by)\s+', "predecessor"),
        (r'(?:appointed|hired|recruited)\s+by\s+', "recruiter"),
        (r'(?:assistant|deputy)\s+(?:to|of)\s+', "supervisor"),
        (r'(?:works?\s+for|reports?\s+to)\s+', "manager"),
        # Research/project teams
        (r'(?:research\s+team|project\s+team|lab)\s+(?:of|led\s+by)\s+', "research_lead"),
        (r'(?:with\s+researchers?|with\s+scientists?)\s+', "research_collaborator"),
        (r'(?:co-(?:PI|investigator|researcher))\s+', "co_investigator"),
        # Friends and acquaintances
        (r'(?:friend|close\s+friend|longtime\s+friend)\s+(?:of\s+)?', "friend"),
        (r'(?:knew|knows|friendship\s+with)\s+', "acquaintance"),
        # Influence and intellectual relationships
        (r'(?:influenced\s+by|inspired\s+by)\s+', "influence"),
        (r'(?:student\s+of\s+the\s+ideas\s+of)\s+', "intellectual_influence"),
        (r'(?:protégé\s+of|disciple\s+of)\s+', "master"),
    ]

    conns = []
    seen = set()  # Deduplicate by (relationship_type, name)
    for cue, rtype in cues:
        # (?i:...) limits case-insensitivity to the cue phrase
        pat = rf'(?i:{cue}){name_pat}'
        for m in re.finditer(pat, text):
            name = m.group(1).strip()
            # Skip if too short or common words
            if len(name) < 4 or name.lower() in ['the', 'and', 'his', 'her', 'their']:
                continue
            key = (rtype, name.lower())
            if key in seen:
                continue
            seen.add(key)
            conns.append({
                "relationship_type": rtype,
                "related_person": name,
                "snippet": text[max(0, m.start() - 30):m.end() + 30].strip(),
            })
    return conns
def extract_interests(text):
    """Extract professional interests: specialisations, expertise and "known for" topics.

    Returns a list of dicts with ``type`` ("specialization", "expertise" or
    "known_for"), ``topic`` and ``snippet`` (the match plus up to 20
    characters of context on each side). Empty or None text returns an
    empty list.
    """
    if not text:
        return []
    patterns = [
        (r'(?:specializes|specialized)\s+in\s+([^,\.]{5,60})', "specialization"),
        (r'expert\s+(?:in|on)\s+([^,\.]{5,60})', "expertise"),
        # The pronoun must be a whole word followed by whitespace; otherwise
        # "her"/"his" is stripped from words such as "heritage" or "history".
        (r'known\s+for\s+(?:(?:his|her|their)\s+)?([^,\.]{5,80})', "known_for"),
    ]
    interests = []
    for pat, itype in patterns:
        for m in re.finditer(pat, text, re.I):
            interests.append({"type": itype, "topic": m.group(1).strip(),
                              "snippet": text[max(0, m.start() - 20):m.end() + 20].strip()})
    return interests
def extract_hobbies(text):
    """Extract personal interests, hobbies, sports, and recreational activities.

    Every rule is applied case-insensitively. The activity is the rule's
    capture group when it has one that matched, otherwise the whole match.
    Results are de-duplicated by (type, first 30 lower-cased characters of
    the activity).

    Returns a list of dicts with ``type``, ``activity`` and ``snippet`` (the
    match plus up to 30 characters of context on each side).
    """
    rules = [
        # Direct hobby mentions
        (r'(?:hobbies?|hobby)[:\s]+([^\.]{5,80})', "hobby"),
        (r'(?:enjoys?|loves?|likes?)\s+([a-zA-Z]+(?:ing|ion)?(?:\s+[a-z]+)?)', "enjoys"),
        (r'(?:passionate\s+about|passion\s+for)\s+([^,\.]{5,60})', "passion"),
        (r'(?:free\s+time|spare\s+time|leisure)[,\s]+(?:enjoys?|loves?|likes?)?\s*([^\.]{5,60})', "leisure"),
        (r'(?:avid|keen|enthusiastic)\s+((?:[a-zA-Z]+(?:er|ist|or))|(?:[a-zA-Z]+\s+(?:fan|lover|enthusiast)))', "enthusiast"),
        # Sports and physical activities
        (r'(?:plays?|played)\s+(tennis|golf|football|soccer|basketball|cricket|rugby|hockey|volleyball|badminton)', "sport"),
        (r'(?:runner|cyclist|swimmer|skier|hiker|climber|sailor|surfer|golfer|tennis\s+player)', "athletic"),
        (r'(?:marathon|triathlon|cycling|running|swimming|hiking|climbing|sailing|surfing)', "athletic_activity"),
        # Creative hobbies
        (r'(?:paints?|painting|painter|artist|sculpt(?:s|or|ure)?|photograph(?:y|er)?)', "creative"),
        (r'(?:writes?|writing|writer|author|poet|poetry|novelist)', "writing"),
        (r'(?:music(?:ian)?|plays?\s+(?:the\s+)?(?:piano|guitar|violin|drums|flute|saxophone))', "music"),
        (r'(?:sings?|singer|vocalist|choir)', "music"),
        (r'(?:gardening|gardener|gardens?)', "gardening"),
        (r'(?:cooking|cook|chef|culinary|baking|baker)', "culinary"),
        # Collecting
        (r'(?:collects?|collector\s+of|collection\s+of)\s+([^,\.]{5,40})', "collecting"),
        # Reading and intellectual
        (r'(?:voracious|avid)\s+reader', "reading"),
        (r'(?:reads?\s+(?:widely|extensively))', "reading"),
        # Travel
        (r'(?:travels?|traveled|travelling|traveler|wanderlust)', "travel"),
        (r'(?:visited|visits)\s+(?:over\s+)?(\d+)\s+countries', "travel"),
        # Other interests
        (r'(?:volunteers?|volunteering|volunteer\s+work)\s+(?:at|for|with)?\s*([^,\.]{5,40})?', "volunteering"),
        (r'(?:animal\s+lover|pet\s+owner|dog\s+lover|cat\s+lover)', "animals"),
    ]
    stop_words = ['the', 'and', 'his', 'her', 'their', 'a', 'an']

    found = []
    already_seen = set()
    for regex, kind in rules:
        for hit in re.finditer(regex, text, re.I):
            # Rules without a (matching) capture group fall back to the full match.
            captured = hit.group(1) if hit.lastindex else None
            activity = (captured or hit.group(0)).strip()
            if len(activity) < 3 or activity.lower() in stop_words:
                continue
            dedup_key = (kind, activity.lower()[:30])
            if dedup_key in already_seen:
                continue
            already_seen.add(dedup_key)
            context = text[max(0, hit.start() - 30):hit.end() + 30].strip()
            found.append({"type": kind, "activity": activity, "snippet": context})
    return found
def extract_political(text):
    """Extract political affiliations, activism, and civic engagement.

    Every rule is applied case-insensitively. The topic is the rule's capture
    group when it has one that matched, otherwise the whole match. Results
    are de-duplicated by (type, first 30 lower-cased characters of the topic).

    Returns a list of dicts with ``type``, ``topic`` and ``snippet`` (the
    match plus up to 30 characters of context on each side).
    """
    rules = [
        # Party affiliations
        (r'(?:member|supporter)\s+of\s+(?:the\s+)?([A-Z][a-zA-Z]+\s+[Pp]arty)', "party_member"),
        (r'(?:democrat|republican|conservative|liberal|progressive|socialist|green\s+party)', "political_leaning"),
        (r'(?:affiliated\s+with|belongs?\s+to)\s+(?:the\s+)?([A-Z][^,\.]{5,40}(?:party|movement))', "affiliation"),
        # Activism and causes
        (r'(?:activist|activism)\s+(?:for|in)?\s*([^,\.]{5,50})?', "activism"),
        (r'(?:advocate(?:s)?|advocacy)\s+(?:for|of)\s+([^,\.]{5,50})', "advocacy"),
        (r'(?:campaigns?\s+for|campaigning\s+for)\s+([^,\.]{5,50})', "campaign"),
        (r'(?:fights?\s+for|fighting\s+for)\s+([^,\.]{5,50})', "cause"),
        # Social causes
        (r'(?:climate\s+(?:activist|action|advocacy)|environmental(?:ist)?)', "environmental"),
        (r'(?:human\s+rights|civil\s+rights|social\s+justice)', "rights_advocacy"),
        (r'(?:feminist|feminism|women\'?s\s+rights)', "feminism"),
        (r'(?:LGBTQ?\+?|gay\s+rights|marriage\s+equality)', "lgbtq_rights"),
        (r'(?:racial\s+equality|anti-?racism|BLM|Black\s+Lives\s+Matter)', "racial_justice"),
        (r'(?:refugee|migrant|immigration)\s+(?:rights|advocacy|support)', "migration_advocacy"),
        # Civic engagement
        (r'(?:serves?|served)\s+(?:on|in)\s+(?:the\s+)?([A-Z][^,\.]{5,50}(?:council|board|committee))', "civic_service"),
        (r'(?:city\s+council|town\s+council|parish\s+council)', "local_politics"),
        (r'(?:elected\s+to|ran\s+for|candidate\s+for)\s+([^,\.]{5,50})', "political_candidacy"),
        # Political views expressed
        (r'(?:outspoken|vocal)\s+(?:critic|supporter)\s+of\s+([^,\.]{5,50})', "political_stance"),
        (r'(?:opposes?|opposed\s+to|against)\s+([^,\.]{5,50})', "opposition"),
        (r'(?:supports?|in\s+favor\s+of)\s+([^,\.]{5,50})', "support"),
    ]

    found = []
    already_seen = set()
    for regex, kind in rules:
        for hit in re.finditer(regex, text, re.I):
            # Rules without a (matching) capture group fall back to the full match.
            captured = hit.group(1) if hit.lastindex else None
            topic = (captured or hit.group(0)).strip()
            if len(topic) < 3:
                continue
            dedup_key = (kind, topic.lower()[:30])
            if dedup_key in already_seen:
                continue
            already_seen.add(dedup_key)
            context = text[max(0, hit.start() - 30):hit.end() + 30].strip()
            found.append({"type": kind, "topic": topic, "snippet": context})
    return found
def extract_social_media_content(text):
    """Extract claims from social media posts/bios (Instagram, Facebook, TikTok, Twitter/X).

    Every rule is applied case-insensitively. The value is the rule's capture
    group when it has one that matched, otherwise the whole match. Values
    shorter than 3 characters are dropped; results are not de-duplicated.

    Returns a list of dicts with ``type``, ``value`` and ``snippet`` (the
    match plus up to 30 characters of context on each side).
    """
    rules = [
        # Bio/about descriptions
        (r'(?:bio|about)[:\s]+["\']?([^"\'\.]{10,200})["\']?', "bio"),
        (r'(?:describes?\s+(?:himself|herself|themselves)\s+as)\s+["\']?([^"\'\.]{10,100})["\']?', "self_description"),
        # Follower counts (influence indicator)
        (r'(\d+(?:,\d+)?(?:\.\d+)?[KkMm]?)\s+(?:followers?|following|subscribers?)', "follower_count"),
        # Location from social profiles
        (r'(?:based\s+in|located\s+in|from|lives?\s+in)\s+([A-Z][a-zA-Z\s,]+?)(?:\s*[|•]|\s*$)', "social_location"),
        # Hashtags used (interests indicator)
        (r'(?:frequently\s+(?:uses?|posts?)|often\s+(?:uses?|posts?))\s+(?:hashtags?\s+)?(?:like\s+)?#(\w+)', "hashtag"),
        # Content themes
        (r'(?:posts?\s+(?:about|on)|shares?\s+(?:content\s+)?(?:about|on))\s+([^,\.]{5,60})', "content_theme"),
        # Verified status
        (r'(?:verified|blue\s+check|official)\s+(?:account|profile)', "verified_status"),
        # Link in bio
        (r'(?:link\s+in\s+bio|linktree|linktr\.ee)[:\s]*(https?://[^\s]+)?', "link_in_bio"),
    ]

    found = []
    for regex, kind in rules:
        for hit in re.finditer(regex, text, re.I):
            # Rules without a (matching) capture group fall back to the full match.
            captured = hit.group(1) if hit.lastindex else None
            value = (captured or hit.group(0)).strip()
            if len(value) < 3:
                continue
            context = text[max(0, hit.start() - 30):hit.end() + 30].strip()
            found.append({"type": kind, "value": value, "snippet": context})
    return found
def extract_memberships(text):
    """Extract organisation memberships ("member of ...") and board seats.

    Returns a list of dicts with ``type`` ("membership" or "board_member"),
    ``organization`` and ``snippet`` (the match plus up to 20 characters of
    context on each side). Matching is case-insensitive.
    """
    rules = (
        (r'member\s+of\s+(?:the\s+)?([A-Z][^,\.]{5,60})', "membership"),
        (r'(?:board\s+member|board\s+director)\s+(?:of\s+)?([A-Z][^,\.]{5,60})', "board_member"),
    )
    found = []
    for regex, kind in rules:
        found.extend(
            {
                "type": kind,
                "organization": hit.group(1).strip(),
                "snippet": text[max(0, hit.start() - 20):hit.end() + 20].strip(),
            }
            for hit in re.finditer(regex, text, re.I)
        )
    return found
def extract_nationalities(text):
    """Extract nationalities that directly precede a profession word.

    A nationality adjective from a fixed list counts only when followed by
    an optional "art" and one of historian/curator/professor/director/artist.
    Matching is case-insensitive.

    Returns a list of dicts with ``nationality`` (as written in the text) and
    ``snippet`` (the match plus up to 20 characters of context on each side).
    """
    adjectives = ["Dutch", "German", "French", "British", "American", "Belgian", "Italian", "Spanish",
                  "Australian", "Canadian", "Japanese", "Chinese", "Brazilian", "Mexican", "Russian"]
    regex = r'\b(' + '|'.join(adjectives) + r')\s+(?:art\s+)?(?:historian|curator|professor|director|artist)'
    return [
        {
            "nationality": hit.group(1).strip(),
            "snippet": text[max(0, hit.start() - 20):hit.end() + 20].strip(),
        }
        for hit in re.finditer(regex, text, re.I)
    ]
def enrich_person(name: str, context: str, api_key: str) -> Dict:
    """Run seven Linkup searches for *name* and collect the extracted claims.

    Each search answer is passed to the matching ``extract_*`` helpers; every
    hit is wrapped with ``create_claim`` (carrying the first source's URL and
    title, the query, the sources, the response ``_meta`` and the full answer)
    and offered to ``add_claim_if_valid``.  A one-second pause separates
    consecutive searches.

    Args:
        name: Person name; quoted verbatim in every query.
        context: Optional free-text context (e.g. a profile headline) added to
            the education/career query.  Empty or ``None`` is left out of the
            query rather than being interpolated.
        api_key: Linkup API key forwarded to ``search_linkup``.

    Returns:
        A dict with ``web_claims`` (list of accepted claims) and
        ``enrichment_metadata`` (timestamp, agent, person name, first 100
        characters of context, and the list of queries performed).
    """
    claims: list = []
    metadata = {
        "enrichment_timestamp": datetime.now(timezone.utc).isoformat(),
        "enrichment_agent": f"enrich_person_comprehensive.py v{SCRIPT_VERSION}",
        "person_name": name,
        "context_used": context[:100] if context else None,
        "searches_performed": [],
        "data_fabrication_check": "PASSED",
    }
    enrichment = {"web_claims": claims, "enrichment_metadata": metadata}

    def run_search(query):
        """Execute one query; return ``(answer, record)`` or ``("", None)`` on error.

        ``record(claim_type, value, snippet, pattern_type)`` builds a claim
        bound to this search's provenance and submits it for validation.
        """
        response = search_linkup(query, api_key)
        metadata["searches_performed"].append(query)
        if "error" in response:
            return "", None
        answer = response.get("answer", "")
        sources = response.get("sources", [])
        # Provenance URL/title come from the first listed source only.
        if sources:
            url, title = sources[0].get("url", ""), sources[0].get("name", "")
        else:
            url, title = "", ""
        meta = response.get("_meta", {})

        def record(claim_type, value, snippet, pattern_type):
            add_claim_if_valid(
                claims,
                create_claim(claim_type, value, url, title, snippet, query,
                             sources, meta, answer, pattern_type),
            )

        return answer, record

    def record_social_connections(answer, record):
        """Record every relationship that ``extract_social`` finds in *answer*."""
        for s in extract_social(answer):
            record(
                "social_connection",
                {"relationship_type": s["relationship_type"], "related_person": s["related_person"]},
                s["snippet"],
                s["relationship_type"],
            )

    def record_contacts(answer, record, allowed_types=None):
        """Record contact details, optionally restricted to *allowed_types*."""
        for c in extract_contacts(answer):
            if allowed_types is None or c["type"] in allowed_types:
                record("contact_detail", {"type": c["type"], "value": c["value"]},
                       c["snippet"], c["type"])

    # Search 1: Biography
    ans, record = run_search(f'"{name}" born biography')
    if ans:
        if (b := extract_birth_year(ans)):
            record("birth_year", b["year"], b["snippet"], b.get("pattern_type"))
        if (loc := extract_birth_location(ans)):
            record("birth_location", loc["location"], loc["snippet"], "birth_location")
        if (d := extract_death_info(ans)):
            record("death_year", d["year"], d["snippet"], "death_year")
        for n in extract_nationalities(ans):
            record("nationality", n["nationality"], n["snippet"], "nationality")
        record_social_connections(ans, record)
    time.sleep(1.0)

    # Search 2: Education/Career
    # Only interpolate the context when there is one; a None context would
    # otherwise put the literal word "None" into the search query.
    context_part = f" {context}" if context else ""
    ans, record = run_search(f'"{name}"{context_part} education career university')
    if ans:
        for e in extract_education(ans):
            record("education",
                   {"type": e["type"], "institution": e["institution"], "year": e["year"]},
                   e["snippet"], e["type"])
        for p in extract_positions(ans):
            record("position",
                   {"title": p["title"], "organization": p["organization"], "year": p["year"]},
                   p["snippet"], "position")
        for m in extract_memberships(ans):
            record("membership", {"type": m["type"], "organization": m["organization"]},
                   m["snippet"], m["type"])
        for i in extract_interests(ans):
            record("interest", {"type": i["type"], "topic": i["topic"]},
                   i["snippet"], i["type"])
    time.sleep(1.0)

    # Search 3: Publications/Awards
    ans, record = run_search(f'"{name}" publications awards honors books')
    if ans:
        for p in extract_publications(ans):
            record("publication", {"type": p["type"], "title": p["title"], "year": p["year"]},
                   p["snippet"], p["type"])
        for a in extract_awards(ans):
            record("award", {"type": a["type"], "name": a["name"]}, a["snippet"], a["type"])
    time.sleep(1.0)

    # Search 4: Contact/Media
    ans, record = run_search(f'"{name}" contact email twitter linkedin orcid profile photo')
    if ans:
        record_contacts(ans, record)
        for m in extract_media(ans):
            record("media_reference", {"type": m["type"], "value": m["value"]},
                   m["snippet"], m["type"])
    time.sleep(1.0)

    # Search 5: Academic Profiles (NEW in v1.2.0)
    ans, record = run_search(f'"{name}" researchgate academia.edu google scholar profile')
    if ans:
        # Only add academic profile types from this search
        record_contacts(ans, record, ("researchgate_url", "academia_url", "google_scholar_url"))
    time.sleep(1.0)

    # Search 6: Social Media Profiles (NEW in v1.3.0)
    ans, record = run_search(f'"{name}" instagram facebook tiktok twitter social media profile')
    if ans:
        # Extract social media contacts
        record_contacts(ans, record, (
            "instagram", "instagram_url", "facebook", "facebook_url",
            "tiktok", "tiktok_url", "twitter", "twitter_url",
            "youtube_url", "bluesky_url", "mastodon_url", "threads_url",
        ))
        # Extract social media content (bios, follower counts, etc.)
        for sc in extract_social_media_content(ans):
            record("social_media_content", {"type": sc["type"], "value": sc["value"]},
                   sc["snippet"], sc["type"])
        # Also extract social connections from social media context
        record_social_connections(ans, record)
    time.sleep(1.0)

    # Search 7: Hobbies, Interests, and Political Affiliations (NEW in v1.3.0)
    ans, record = run_search(f'"{name}" hobbies interests passions politics activism volunteer')
    if ans:
        # Extract hobbies and personal interests
        for h in extract_hobbies(ans):
            record("hobby", {"type": h["type"], "activity": h["activity"]},
                   h["snippet"], h["type"])
        # Extract political affiliations and activism
        for p in extract_political(ans):
            record("political", {"type": p["type"], "topic": p["topic"]},
                   p["snippet"], p["type"])
        # Also extract any social connections mentioned
        record_social_connections(ans, record)

    return enrichment
def _write_json(filepath, data) -> None:
    """Write *data* to *filepath* as UTF-8 JSON.

    The document is serialized completely before the file is opened for
    writing, so a serialization error cannot leave a truncated file behind.
    """
    payload = json.dumps(data, indent=2, ensure_ascii=False)
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(payload)


def process_ppid_file(filepath: Path, api_key: str, dry_run: bool = False) -> Dict:
    """Enrich one person JSON file with web claims and write it back.

    Args:
        filepath: Path of the person JSON document.
        api_key: Linkup API key forwarded to ``enrich_person``.
        dry_run: When true, searches still run but the file is not modified.

    Returns:
        ``{"status": "skipped", "reason": ...}`` when the profile has no usable
        name or is not heritage relevant; ``{"status": "no_claims_found",
        "name": ...}`` when no claim was extracted; otherwise
        ``{"status": "enriched", "name": ..., "claims_added": ...,
        "claim_types": ...}`` where the last two describe the claims that were
        not already stored (or would not be, in a dry run).
    """
    # Explicit UTF-8: the file is written with ensure_ascii=False, so relying
    # on the platform default encoding can fail on non-ASCII names.
    with open(filepath, encoding="utf-8") as f:
        data = json.load(f)

    # ``or {}`` also covers keys that are present but null in the JSON.
    name_data = data.get("name") or {}
    full_name = name_data.get("full_name") or name_data.get("display_name", "")
    if not full_name or full_name == "LinkedIn Member":
        return {"status": "skipped", "reason": "no_valid_name"}
    if not (data.get("heritage_relevance") or {}).get("is_heritage_relevant"):
        return {"status": "skipped", "reason": "not_heritage_relevant"}

    headline = (data.get("profile_data") or {}).get("headline") or ""
    enrichment = enrich_person(full_name, headline, api_key)
    found = enrichment["web_claims"]
    metadata = enrichment["enrichment_metadata"]

    if not found:
        if not dry_run:
            metadata["result"] = "no_claims_found"
            data.setdefault("enrichment_history", []).append(metadata)
            _write_json(filepath, data)
        return {"status": "no_claims_found", "name": full_name}

    # De-duplicate on (type, value) against stored claims AND against claims
    # accepted earlier in this batch (several searches can yield the same hit).
    stored = data.get("web_claims") or []
    seen = {(c.get("claim_type"), str(c.get("claim_value"))) for c in stored}
    new_claims = []
    for claim in found:
        key = (claim["claim_type"], str(claim["claim_value"]))
        if key not in seen:
            seen.add(key)
            new_claims.append(claim)

    if not dry_run:
        data["web_claims"] = [*stored, *new_claims]
        data.setdefault("enrichment_history", []).append(metadata)

        birth_claims = [c for c in found if c["claim_type"] == "birth_year"]
        if birth_claims:
            current = str((data.get("birth_date") or {}).get("edtf") or "XXXX")
            # Only fill in a birth date that is unknown or has unspecified digits.
            if current == "XXXX" or current.endswith("X"):
                prov = birth_claims[0]["provenance"]
                copied_keys = ("statement_created_at", "source_archived_at",
                               "retrieval_agent", "retrieval_method", "source_url", "source_title",
                               "source_snippet", "search_query", "extraction_method")
                data["birth_date"] = {
                    "edtf": str(birth_claims[0]["claim_value"]),
                    "precision": "year",
                    "provenance": {k: prov[k] for k in copied_keys if k in prov},
                }
                data["birth_date"]["provenance"]["verified"] = False
                data["birth_date"]["provenance"]["verification_status"] = "machine_extracted"

        # NOTE(review): this marks the person as deceased from an unverified,
        # machine-extracted claim; any such change needs manual verification.
        if any(c["claim_type"] == "death_year" for c in found):
            data["is_living"] = False

        _write_json(filepath, data)

    return {
        "status": "enriched",
        "name": full_name,
        "claims_added": len(new_claims),
        "claim_types": sorted({c["claim_type"] for c in new_claims}),
    }
def _headline_priority(headline: str) -> int:
    """Score a lower-cased headline by summing the weights of keywords it contains."""
    weights = (("professor", 3), ("director", 2), ("curator", 2), ("museum", 1), ("archive", 1))
    return sum(weight for keyword, weight in weights if keyword in headline)


def _collect_candidates(ppid_dir):
    """Return ``(path, score, name)`` tuples for not-yet-enriched profiles, best score first.

    A profile qualifies when it is heritage relevant, has no enrichment
    history and has a usable full name.  Unreadable or malformed files are
    skipped.
    """
    candidates = []
    for path in ppid_dir.glob("ID_*.json"):
        try:
            with open(path, encoding="utf-8") as fp:
                data = json.load(fp)
            if not (data.get("heritage_relevance") or {}).get("is_heritage_relevant"):
                continue
            if data.get("enrichment_history"):
                continue
            name = (data.get("name") or {}).get("full_name", "")
            if not name or name == "LinkedIn Member":
                continue
            # A null headline must not disqualify the profile; treat it as empty.
            headline = ((data.get("profile_data") or {}).get("headline") or "").lower()
            candidates.append((path, _headline_priority(headline), name))
        except (OSError, ValueError, AttributeError, TypeError):
            # Best-effort scan: skip broken files, but never swallow
            # KeyboardInterrupt/SystemExit as a bare ``except`` would.
            continue
    candidates.sort(key=lambda x: -x[1])
    return candidates


def main():
    """CLI entry point for the deprecated enrichment script.

    Prints the deprecation banner and returns unless ``--force-deprecated``
    is given.  Otherwise loads the Linkup API key, scans ``data/person`` for
    candidate profiles, processes up to ``--limit`` of them (highest headline
    score first) and prints a summary.
    """
    parser = argparse.ArgumentParser(
        description="⚠️ DEPRECATED: Comprehensive person profile enrichment. Use manual enrichment instead.",
        epilog="See docs/MANUAL_PERSON_ENRICHMENT_WORKFLOW.md for the recommended approach."
    )
    parser.add_argument("--limit", type=int, default=10)
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--force-deprecated", action="store_true",
                        help="Force run this deprecated script (NOT RECOMMENDED)")
    args = parser.parse_args()

    # DEPRECATION CHECK - Added January 2026
    if not args.force_deprecated:
        print("""
╔══════════════════════════════════════════════════════════════════════════════╗
║ ║
║ 🚫🚫🚫 THIS SCRIPT IS DEPRECATED - DO NOT USE 🚫🚫🚫 ║
║ ║
║ Automated web enrichment caused 540+ false claims to be attributed ║
║ to wrong people with similar names (Rule 46 violations). ║
║ ║
║ ALL PERSON ENRICHMENT MUST NOW BE DONE MANUALLY. ║
║ ║
║ See: .opencode/rules/entity-resolution-no-heuristics.md ║
║ See: docs/MANUAL_PERSON_ENRICHMENT_WORKFLOW.md ║
║ ║
║ To force run anyway (NOT RECOMMENDED): ║
║ python scripts/enrich_person_comprehensive.py --force-deprecated ║
║ ║
╚══════════════════════════════════════════════════════════════════════════════╝
""")
        return

    print("⚠️ WARNING: Running deprecated script with --force-deprecated")
    print("⚠️ Any enrichments MUST be manually verified before committing!")
    print()

    try:
        api_key = get_linkup_api_key()
        print("✓ Linkup API key loaded")
    except ValueError as e:
        print(f"{e}")
        return

    ppid_dir = Path(__file__).parent.parent / "data" / "person"
    if not ppid_dir.exists():
        print("✗ PPID directory not found")
        return

    print("Scanning for candidates...")
    candidates = _collect_candidates(ppid_dir)
    print(f"Found {len(candidates)} candidates")

    stats = {"enriched": 0, "no_claims_found": 0, "skipped": 0, "errors": 0}
    results = []
    selected = candidates[:args.limit]
    # Report progress against what will actually be processed, which may be
    # fewer than --limit.
    total = len(selected)
    for position, (filepath, score, _) in enumerate(selected, start=1):
        print(f"\n[{position}/{total}] {filepath.name} (score={score})")
        try:
            result = process_ppid_file(filepath, api_key, args.dry_run)
            status = result.get("status", "errors")
            stats[status] = stats.get(status, 0) + 1
            if status == "enriched":
                print(f" ✓ Added {result['claims_added']} claims: {result['claim_types']}")
                results.append(result)
            elif status == "no_claims_found":
                print(f" ✗ No claims found for {result.get('name')}")
            # Pause between profiles to stay gentle on the search API.
            time.sleep(4.0)
        except Exception as e:
            # Top-level boundary: report and carry on with the next profile.
            print(f" ✗ Error: {e}")
            stats["errors"] += 1

    print(f"\n{'='*50}\nSUMMARY\n{'='*50}")
    print(f"Enriched: {stats['enriched']}, No claims: {stats['no_claims_found']}, Errors: {stats['errors']}")
    if results:
        print(f"\nTotal claims added: {sum(r['claims_added'] for r in results)}")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()