glam/scripts/enrich_person_comprehensive.py
#!/usr/bin/env python3
"""
Comprehensive Person Profile Enrichment via Linkup Web Search
This script enriches person profiles with ALL discoverable data from web sources,
with FULL PROVENANCE for every claim. No data is stored without a verifiable source.
Rule Compliance:
- Rule 6: WebObservation Claims MUST Have XPath Provenance (adapted for web search)
- Rule 21: Data Fabrication is Strictly Prohibited
- Rule 26: Person Data Provenance - Web Claims for Staff Information
- Rule 34: Linkup is the Preferred Web Scraper
- Rule 35: Provenance Statements MUST Have Dual Timestamps
Data Extracted (when available):
- Birth date/year, birth location, nationality
- Education history, career positions, memberships
- Publications, awards/honors
- Professional affiliations and social connections (family, collaborators, mentors)
- Hobbies, interests, and political engagement
- Social media profiles and content (bios, follower counts)
- Death date (if applicable)
- Contact details (email, phone, social media, academic profiles)
- Media references (photos, videos, portraits)
Usage:
python scripts/enrich_person_comprehensive.py --limit N [--dry-run]
"""
import json
import os
import re
import time
import argparse
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List
import httpx
LINKUP_API_URL = "https://api.linkup.so/v1/search"
SCRIPT_VERSION = "1.3.1"
# False positive detection patterns
NEGATIVE_STATEMENT_PATTERNS = [
r'no\s+(?:information|data|details?|evidence)',
r'not\s+(?:found|available|mentioned|provided|disclosed)',
r"(?:isn't|aren't|wasn't|weren't)\s+(?:found|available|mentioned)",
r'in\s+the\s+provided\s+data',
r'(?:no|not)\s+(?:specific|explicit)',
r'could\s+not\s+(?:find|locate|determine)',
r'unavailable',
r'not\s+publicly\s+(?:known|available|disclosed)',
]
# URLs that indicate historical/genealogical data (not about current person)
GENEALOGY_URL_PATTERNS = [
r'genealog', r'ancestry', r'familysearch', r'findagrave', r'geni\.com',
r'myheritage', r'wikitree', r'family\.', r'grivel\.net', r'geneanet',
r'billiongraves', r'interment\.net', r'cemeter', r'grave',
]
# Garbage extraction patterns (nonsensical extractions that describe the search, not the person)
GARBAGE_PATTERNS = [
r'provided\s+data', r'available\s+information', r'search\s+results?',
r'mentioned\s+in', r'according\s+to', r'based\s+on\s+the',
r'not\s+(?:available|found|mentioned)', r'no\s+(?:information|data)',
]
def is_negative_statement(snippet: str) -> bool:
"""Check if snippet contains a negative statement (e.g., 'no information found')."""
snippet_lower = snippet.lower()
for pattern in NEGATIVE_STATEMENT_PATTERNS:
if re.search(pattern, snippet_lower):
return True
return False
def is_genealogy_source(url: str) -> bool:
"""Check if URL is from a genealogy/historical source (not about current person)."""
url_lower = url.lower()
for pattern in GENEALOGY_URL_PATTERNS:
if re.search(pattern, url_lower):
return True
return False
def is_garbage_extraction(value: str) -> bool:
"""Check if extracted value is garbage (meta-text about the search, not actual data)."""
value_lower = value.lower()
for pattern in GARBAGE_PATTERNS:
if re.search(pattern, value_lower):
return True
return False
def validate_claim(claim_type: str, claim_value: Any, snippet: str, source_url: str) -> tuple:
"""
Validate a claim before adding it. Returns (is_valid, rejection_reason).
Rule 21: Data Fabrication is Strictly Prohibited
- Reject claims from genealogy sites (wrong person with same name)
- Reject claims from negative statements ("no information found")
- Reject garbage extractions (meta-text about the search)
"""
# Check for negative statements
if is_negative_statement(snippet):
return False, "negative_statement"
# Check for genealogy sources (for relationship claims)
if claim_type in ['social_connection', 'parent', 'spouse', 'sibling', 'child']:
if is_genealogy_source(source_url):
return False, "genealogy_source_wrong_person"
# Check for garbage extraction
if isinstance(claim_value, dict):
for v in claim_value.values():
if isinstance(v, str) and is_garbage_extraction(v):
return False, "garbage_extraction"
elif isinstance(claim_value, str):
if is_garbage_extraction(claim_value):
return False, "garbage_extraction"
return True, None
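# Illustrative verdicts from the validation gate (a sketch, not part of the
# pipeline; inputs and URLs are hypothetical):
#   >>> validate_claim("birth_year", 1962, "No information is available.", "https://example.org")
#   (False, 'negative_statement')
#   >>> validate_claim("spouse", {"related_person": "Anna"}, "He married Anna.", "https://www.geni.com/x")
#   (False, 'genealogy_source_wrong_person')
#   >>> validate_claim("birth_year", 1962, "Born in 1962 in Leiden.", "https://museum.example/staff")
#   (True, None)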
def get_linkup_api_key() -> str:
env_path = Path(__file__).parent.parent / ".env"
if env_path.exists():
with open(env_path) as f:
for line in f:
if line.startswith("LINKUP_API_KEY="):
return line.strip().split("=", 1)[1].strip('"\'')
key = os.environ.get("LINKUP_API_KEY", "")
if not key:
raise ValueError("LINKUP_API_KEY not found")
return key
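# A minimal sketch of the expected configuration: the key is read from a .env
# file at the repository root, falling back to the process environment. The
# .env line is assumed to look like this (value hypothetical):
#   LINKUP_API_KEY="lk-0123456789abcdef"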
def search_linkup(query: str, api_key: str, depth: str = "standard") -> Dict[str, Any]:
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
payload = {"q": query, "depth": depth, "outputType": "sourcedAnswer"}
request_ts = datetime.now(timezone.utc).isoformat()
try:
with httpx.Client(timeout=45.0) as client:
response = client.post(LINKUP_API_URL, headers=headers, json=payload)
response.raise_for_status()
result = response.json()
result["_meta"] = {"request_ts": request_ts, "response_ts": datetime.now(timezone.utc).isoformat(),
"status": response.status_code, "depth": depth}
return result
except Exception as e:
return {"error": str(e), "_meta": {"request_ts": request_ts}}
def create_claim(claim_type: str, claim_value: Any, source_url: str, source_title: str,
snippet: str, query: str, sources: List = None, meta: Dict = None,
answer: str = None, pattern: str = None) -> Optional[Dict]:
"""
Create a claim with full provenance. Returns None if claim fails validation.
Rule 21: Data Fabrication is Strictly Prohibited
"""
# Validate claim before creating
is_valid, rejection_reason = validate_claim(claim_type, claim_value, snippet, source_url)
if not is_valid:
        # Drop the rejected claim; rejection_reason is available for future logging
return None
ts = datetime.now(timezone.utc).isoformat()
src_ts = meta.get("request_ts", ts) if meta else ts
prov = {
"statement_created_at": ts, "source_archived_at": src_ts,
"retrieval_agent": f"enrich_person_comprehensive.py v{SCRIPT_VERSION}",
"retrieval_method": "linkup_web_search", "api_endpoint": LINKUP_API_URL,
"search_query": query, "search_depth": meta.get("depth", "standard") if meta else "standard",
"source_url": source_url, "source_title": source_title, "source_snippet": snippet,
"extraction_method": "regex_pattern_matching", "pattern_type": pattern,
"verified": False, "verification_status": "machine_extracted", "requires_human_review": True,
"http_status": meta.get("status") if meta else None,
}
if answer and snippet:
pos = answer.find(snippet[:50])
if pos >= 0:
prov["answer_position"] = f"answer[{pos}:{pos + len(snippet)}]"
if sources:
prov["all_sources"] = [{"url": s.get("url", ""), "name": s.get("name", "")} for s in sources[:5]]
prov["source_count"] = len(sources)
if answer:
prov["answer_content_hash"] = hashlib.sha256(answer.encode()).hexdigest()[:16]
return {"claim_type": claim_type, "claim_value": claim_value, "provenance": prov}
def add_claim_if_valid(claims_list: List, claim: Optional[Dict]) -> None:
"""Add claim to list only if it's not None (passed validation)."""
if claim is not None:
claims_list.append(claim)
def extract_birth_year(text):
if not text: return None
patterns = [(r'born\s+(?:on\s+)?(\d{1,2}\s+\w+\s+)?(\d{4})', "full_date"),
(r'born\s+(?:on\s+)?(\w+\s+\d{1,2},?\s+)(\d{4})', "us_date"),
(r'(?:was\s+)?born\s+in\s+(\d{4})', "born_in"), (r'geboren\s+(?:in\s+)?(\d{4})', "dutch"),
(r'\(born\s+(\d{4})\)', "paren"), (r'\((\d{4})\)', "year_paren")]
for pat, ptype in patterns:
m = re.search(pat, text, re.I)
if m and m.lastindex:
yr = int(m.group(m.lastindex))
if 1900 <= yr <= 2010:
if ptype == "year_paren" and yr >= 1990: continue
return {"year": yr, "snippet": text[max(0,m.start()-40):m.end()+40].strip(), "pattern_type": ptype}
return None
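# A doctest-style sketch (hypothetical sentence; the snippet window is the
# matched span plus up to 40 characters of context on each side):
#   >>> extract_birth_year("Jane Doe was born in 1962 in Leiden.")
#   {'year': 1962, 'snippet': 'Jane Doe was born in 1962 in Leiden.', 'pattern_type': 'born_in'}
#   >>> extract_birth_year("Publications (2003)") is None   # recent bare year in parens is ignored
#   True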
def extract_birth_location(text):
for pat in [r'born\s+in\s+([A-Z][a-zA-Z\s,]+)', r'geboren\s+(?:te|in)\s+([A-Z][a-zA-Z\s]+)']:
m = re.search(pat, text)
if m:
loc = m.group(1).strip()
if loc.lower() not in ['the', 'a', 'an']:
return {"location": loc, "snippet": text[max(0,m.start()-30):m.end()+30].strip()}
return None
def extract_death_info(text):
for pat in [r'died\s+(?:on\s+)?(?:\d{1,2}\s+\w+\s+)?(\d{4})', r'\(\d{4}\s*[-]\s*(\d{4})\)',
r'passed\s+away\s+(?:in\s+)?(\d{4})', r'overleden\s+(?:in\s+)?(\d{4})']:
m = re.search(pat, text, re.I)
if m:
yr = int(m.group(1))
if 1900 <= yr <= datetime.now().year:
return {"year": yr, "snippet": text[max(0,m.start()-30):m.end()+30].strip()}
return None
def extract_education(text):
edu = []
patterns = [(r'(Ph\.?D\.?|doctorate)\s+(?:from|at)\s+([A-Z][^,\.]+?)(?:\s+in\s+(\d{4}))?', "phd"),
(r"(master'?s?|M\.?A\.?)\s+(?:from|at)\s+([A-Z][^,\.]+)", "masters"),
(r'graduated\s+from\s+([A-Z][^,\.]+?)(?:\s+in\s+)?(\d{4})?', "graduated"),
(r'studied\s+(?:\w+\s+)?at\s+([A-Z][^,\.]+)', "studied")]
for pat, etype in patterns:
for m in re.finditer(pat, text, re.I):
            inst = m.group(2) if etype in ["phd", "masters"] else m.group(1)
            # The year, when captured, is always the final group of the pattern
            # (group 3 for "phd", group 2 for "graduated"); take it from lastindex
            yr = None
            last = m.group(m.lastindex) if m.lastindex else None
            if last and re.fullmatch(r'\d{4}', last):
                yr = int(last)
edu.append({"type": etype, "institution": inst.strip(), "year": yr,
"snippet": text[max(0,m.start()-20):m.end()+20].strip()})
return edu
def extract_positions(text):
pos = []
for pat in [r'(professor|director|curator|head|chief)\s+(?:at|of)\s+([A-Z][^,\.]{3,50})(?:\s+since\s+(\d{4}))?',
r'appointed\s+(\w+)\s+(?:at\s+)?([A-Z][^,\.]{3,50})(?:\s+in\s+(\d{4}))?']:
for m in re.finditer(pat, text, re.I):
org = m.group(2).strip() if m.lastindex >= 2 and m.group(2) else None
yr = None
if m.lastindex and m.lastindex >= 3 and m.group(3):
try: yr = int(m.group(3))
                except ValueError: pass
pos.append({"title": m.group(1), "organization": org, "year": yr,
"snippet": text[max(0,m.start()-20):m.end()+20].strip()})
return pos
def extract_publications(text):
pubs = []
for pat, ptype in [(r'(?:author|wrote|published)\s+(?:of\s+)?["\']([^"\']+)["\']', "book"),
(r'published\s+["\']?([^"\',.]+)["\']?\s+(?:in\s+)?(\d{4})', "publication")]:
for m in re.finditer(pat, text, re.I):
title = m.group(1).strip()
yr = int(m.group(2)) if m.lastindex >= 2 and m.group(2) else None
pubs.append({"type": ptype, "title": title, "year": yr,
"snippet": text[max(0,m.start()-20):m.end()+20].strip()})
return pubs
def extract_awards(text):
awards = []
for pat, atype in [(r'(?:received|awarded|won)\s+(?:the\s+)?([A-Z][^,\.]{5,50})', "award"),
(r'Fellow\s+of\s+(?:the\s+)?([A-Z][^,\.]{5,50})', "fellowship")]:
for m in re.finditer(pat, text, re.I):
awards.append({"type": atype, "name": m.group(1).strip(),
"snippet": text[max(0,m.start()-20):m.end()+20].strip()})
return awards
def extract_contacts(text):
"""Extract contact info including social media profiles (Instagram, Facebook, TikTok, Twitter/X)."""
contacts = []
seen_values = set() # Deduplication
# Blocklist for common false positives
twitter_blocklist = {'handle', 'handles', 'profile', 'profiles', 'account', 'accounts',
'found', 'available', 'not', 'no', 'or', 'and', 'the', 'is', 'are',
'was', 'were', 'has', 'have', 'with', 'for', 'o', 'a', 'example',
'gmail', 'outlook', 'yahoo', 'hotmail', 'email', 'mail', 'share',
'follow', 'tweet', 'retweet', 'like', 'post', 'status', 'search'}
instagram_blocklist = twitter_blocklist | {'photos', 'videos', 'reels', 'stories', 'explore', 'p', 'tv'}
facebook_blocklist = {'pages', 'groups', 'events', 'marketplace', 'watch', 'gaming', 'privacy',
'help', 'settings', 'login', 'signup', 'photo', 'photos', 'sharer'}
tiktok_blocklist = {'discover', 'following', 'foryou', 'live', 'upload', 'search', 'trending'}
for pat, ctype in [
# Email
(r'\b([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b', "email"),
# Twitter/X - handles and URLs
(r'(?<![a-zA-Z0-9.@])@([a-zA-Z0-9_]{3,15})\b', "twitter"),
(r'(?:twitter\.com|x\.com)/([a-zA-Z0-9_]{3,15})(?:\s|$|["\'\)\]?&])', "twitter"),
(r'(https?://(?:www\.)?(?:twitter\.com|x\.com)/[a-zA-Z0-9_]{3,15})(?:\s|$|["\'\)\]])', "twitter_url"),
# Instagram - handles and URLs
(r'(?:instagram\.com)/([a-zA-Z0-9_.]{3,30})(?:\s|$|["\'\)\]?&])', "instagram"),
(r'(https?://(?:www\.)?instagram\.com/[a-zA-Z0-9_.]{3,30})(?:/\?|$|\s|["\'\)\]])', "instagram_url"),
# Facebook - profile URLs
(r'(https?://(?:www\.)?facebook\.com/(?:profile\.php\?id=\d+|[a-zA-Z0-9.]{5,50}))(?:\s|$|["\'\)\]?&])', "facebook_url"),
(r'(?:facebook\.com)/([a-zA-Z0-9.]{5,50})(?:\s|$|["\'\)\]?&])', "facebook"),
# TikTok - handles and URLs
(r'(https?://(?:www\.)?tiktok\.com/@[a-zA-Z0-9_.]{2,24})(?:\s|$|["\'\)\]?&])', "tiktok_url"),
(r'(?:tiktok\.com)/@([a-zA-Z0-9_.]{2,24})(?:\s|$|["\'\)\]?&])', "tiktok"),
# LinkedIn
(r'(https?://(?:www\.)?linkedin\.com/in/[a-zA-Z0-9\-%]+/?)', "linkedin_url"),
# YouTube channel
(r'(https?://(?:www\.)?youtube\.com/(?:c/|channel/|user/|@)[a-zA-Z0-9_\-]+)', "youtube_url"),
# ORCID
(r'(?:orcid)[:\s]*((?:\d{4}-){3}\d{3}[\dX])', "orcid"),
(r'(https?://orcid\.org/(?:\d{4}-){3}\d{3}[\dX])', "orcid_url"),
# ResearchGate profile
(r'(https?://(?:www\.)?researchgate\.net/profile/[a-zA-Z0-9_\-]+)', "researchgate_url"),
# Academia.edu profile
(r'(https?://[a-zA-Z0-9\-]+\.academia\.edu(?:/[a-zA-Z0-9_\-]+)?)', "academia_url"),
# Google Scholar
(r'(https?://scholar\.google\.com/citations\?[^\s\)\"\']+)', "google_scholar_url"),
# Bluesky
(r'(https?://bsky\.app/profile/[a-zA-Z0-9._\-]+)', "bluesky_url"),
# Mastodon (various instances)
(r'(https?://[a-zA-Z0-9\-]+\.social/@[a-zA-Z0-9_]+)', "mastodon_url"),
(r'(https?://mastodon\.[a-zA-Z]+/@[a-zA-Z0-9_]+)', "mastodon_url"),
# Threads
(r'(https?://(?:www\.)?threads\.net/@[a-zA-Z0-9_.]+)', "threads_url"),
# Personal website
(r'(?:website|homepage|site)[:\s]*(https?://[a-zA-Z0-9\-]+\.[a-zA-Z]{2,}[^\s]*)', "website"),
# Phone numbers (international formats)
(r'(?:phone|tel|telephone|fax)[:\s]*(\+?[0-9][0-9\s\-\(\)]{8,18}[0-9])', "phone"),
(r'(?<!\d)(\+31[\s\-]?[0-9][\s\-]?[0-9]{3,4}[\s\-]?[0-9]{3,4})(?!\d)', "phone"), # Dutch
(r'(?<!\d)(\+1[\s\-]?\(?[0-9]{3}\)?[\s\-]?[0-9]{3}[\s\-]?[0-9]{4})(?!\d)', "phone"), # US/Canada
]:
for m in re.finditer(pat, text, re.I):
val = m.group(1).strip().rstrip('/').rstrip('?')
# Normalize phone numbers for deduplication
norm_val = re.sub(r'[\s\-\(\)]', '', val) if ctype == "phone" else val
# Skip duplicates
dedup_key = f"{ctype}:{norm_val.lower()}"
if dedup_key in seen_values: continue
seen_values.add(dedup_key)
# Skip common false positives
if ctype == "email" and any(x in val.lower() for x in ['example.com', 'test.com']): continue
if ctype == "twitter" and val.lower() in twitter_blocklist: continue
if ctype == "instagram" and val.lower() in instagram_blocklist: continue
if ctype == "facebook" and val.lower() in facebook_blocklist: continue
if ctype == "tiktok" and val.lower() in tiktok_blocklist: continue
# Skip if value is too short (likely false positive)
if ctype in ["twitter", "instagram", "tiktok"] and len(val) < 3: continue
if ctype == "facebook" and len(val) < 5: continue
contacts.append({"type": ctype, "value": val,
"snippet": text[max(0,m.start()-30):m.end()+30].strip()})
return contacts
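# A doctest-style sketch on a hypothetical answer string (snippets elided):
#   >>> [(c["type"], c["value"]) for c in extract_contacts(
#   ...     "Email jane.doe@museum.example, on Twitter @janedoe")]
#   [('email', 'jane.doe@museum.example'), ('twitter', 'janedoe')]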
def extract_media(text):
media = []
for pat, mtype in [(r'(https?://[^\s]+\.(?:jpg|jpeg|png|gif|webp))', "image_url"),
(r'(https?://(?:www\.)?(?:youtube\.com/watch\?v=|youtu\.be/|vimeo\.com/)[^\s]+)', "video_url"),
(r'(https?://upload\.wikimedia\.org/[^\s]+)', "wikimedia_image")]:
for m in re.finditer(pat, text, re.I):
media.append({"type": mtype, "value": m.group(1).strip(),
"snippet": text[max(0,m.start()-30):m.end()+30].strip()})
return media
def extract_social(text):
"""Extract comprehensive social network data - family, collaborators, mentors, students, etc."""
conns = []
seen = set() # Deduplicate by (relationship_type, name)
    # Name pattern: 1-4 capitalized words with optional lowercase particles
    # (van/de/der/...). The (?-i:...) groups keep the capital-letter heuristic
    # intact even though the callers below compile with re.I for the keywords.
    name_pat = r'((?-i:[A-Z])[a-zA-Z]+(?:(?:\s+(?:van|de|der|den|von|la|el|al|ibn|bin))*\s+(?-i:[A-Z])[a-zA-Z]+){0,3})'
patterns = [
# Family relationships
(rf'(?:married|spouse|wife|husband)\s+(?:to\s+|of\s+)?{name_pat}', "spouse"),
(rf'(?:married|wed)\s+{name_pat}', "spouse"),
(rf'(?:daughter|son|child)\s+of\s+{name_pat}', "parent"),
(rf'(?:father|mother|parent)\s+(?:is|was)?\s*{name_pat}', "parent"),
(rf'(?:brother|sister|sibling)\s+(?:of\s+|is\s+)?{name_pat}', "sibling"),
(rf'(?:children|son|daughter)[:;,]?\s+{name_pat}', "child"),
# Professional collaborators
(rf'(?:collaborated|cooperated|partnered)\s+with\s+{name_pat}', "collaborator"),
(rf'(?:co-authored?|co-wrote)\s+(?:with\s+)?{name_pat}', "co_author"),
(rf'(?:worked\s+with|working\s+with)\s+{name_pat}', "colleague"),
(rf'(?:colleague|coworker)\s+{name_pat}', "colleague"),
(rf'(?:together\s+with|alongside)\s+{name_pat}', "collaborator"),
(rf'(?:joint\s+(?:work|research|project))\s+with\s+{name_pat}', "collaborator"),
# Academic relationships
(rf'(?:student|protégé|advisee)\s+of\s+{name_pat}', "advisor"),
(rf'(?:mentored?|supervised?|advised?)\s+by\s+{name_pat}', "mentor"),
(rf'(?:PhD|doctoral)\s+(?:supervisor|advisor)[:;]?\s*{name_pat}', "phd_advisor"),
(rf'(?:thesis|dissertation)\s+(?:supervisor|advisor)[:;]?\s*{name_pat}', "thesis_advisor"),
(rf'(?:under|with)\s+(?:the\s+)?(?:supervision|guidance)\s+of\s+{name_pat}', "supervisor"),
(rf'(?:mentor|advisor)\s+(?:to|of)\s+{name_pat}', "mentee"),
(rf'(?:students?|advisees?)[:;,]?\s+(?:include\s+)?{name_pat}', "student"),
(rf'(?:supervised?|mentored?|advised?)\s+{name_pat}', "mentee"),
(rf'(?:trained|educated)\s+(?:under\s+)?{name_pat}', "trainer"),
# Team and organizational
(rf'(?:team|group)\s+(?:includes?|members?)[:;,]?\s*{name_pat}', "team_member"),
(rf'(?:succeeded?|replaced?|followed?)\s+(?:by\s+)?{name_pat}', "successor"),
(rf'(?:successor\s+(?:to|of)|preceded\s+by)\s+{name_pat}', "predecessor"),
(rf'(?:appointed|hired|recruited)\s+by\s+{name_pat}', "recruiter"),
(rf'(?:assistant|deputy)\s+(?:to|of)\s+{name_pat}', "supervisor"),
(rf'(?:works?\s+for|reports?\s+to)\s+{name_pat}', "manager"),
# Research/project teams
(rf'(?:research\s+team|project\s+team|lab)\s+(?:of|led\s+by)\s+{name_pat}', "research_lead"),
(rf'(?:with\s+researchers?|with\s+scientists?)\s+{name_pat}', "research_collaborator"),
(rf'(?:co-(?:PI|investigator|researcher))\s+{name_pat}', "co_investigator"),
# Friends and acquaintances
(rf'(?:friend|close\s+friend|longtime\s+friend)\s+(?:of\s+)?{name_pat}', "friend"),
(rf'(?:knew|knows|friendship\s+with)\s+{name_pat}', "acquaintance"),
# Influence and intellectual relationships
(rf'(?:influenced\s+by|inspired\s+by)\s+{name_pat}', "influence"),
(rf'(?:student\s+of\s+the\s+ideas\s+of)\s+{name_pat}', "intellectual_influence"),
(rf'(?:protégé\s+of|disciple\s+of)\s+{name_pat}', "master"),
]
for pat, rtype in patterns:
for m in re.finditer(pat, text, re.I):
name = m.group(1).strip()
# Skip if too short or common words
if len(name) < 4 or name.lower() in ['the', 'and', 'his', 'her', 'their']:
continue
key = (rtype, name.lower())
if key in seen:
continue
seen.add(key)
conns.append({
"relationship_type": rtype,
"related_person": name,
"snippet": text[max(0, m.start()-30):m.end()+30].strip()
})
return conns
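# A doctest-style sketch on a hypothetical sentence (the snippet window is the
# matched span plus up to 30 characters of context on each side):
#   >>> extract_social("She co-authored with Pieter van der Berg.")
#   [{'relationship_type': 'co_author', 'related_person': 'Pieter van der Berg',
#     'snippet': 'She co-authored with Pieter van der Berg.'}]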
def extract_interests(text):
interests = []
for pat, itype in [(r'(?:specializes|specialized)\s+in\s+([^,\.]{5,60})', "specialization"),
(r'expert\s+(?:in|on)\s+([^,\.]{5,60})', "expertise"),
(r'known\s+for\s+(?:his|her|their\s+)?([^,\.]{5,80})', "known_for")]:
for m in re.finditer(pat, text, re.I):
interests.append({"type": itype, "topic": m.group(1).strip(),
"snippet": text[max(0,m.start()-20):m.end()+20].strip()})
return interests
def extract_hobbies(text):
"""Extract personal interests, hobbies, sports, and recreational activities."""
hobbies = []
seen = set()
# Common hobby/interest patterns
patterns = [
# Direct hobby mentions
(r'(?:hobbies?|hobby)[:\s]+([^\.]{5,80})', "hobby"),
(r'(?:enjoys?|loves?|likes?)\s+([a-zA-Z]+(?:ing|ion)?(?:\s+[a-z]+)?)', "enjoys"),
(r'(?:passionate\s+about|passion\s+for)\s+([^,\.]{5,60})', "passion"),
(r'(?:free\s+time|spare\s+time|leisure)[,\s]+(?:enjoys?|loves?|likes?)?\s*([^\.]{5,60})', "leisure"),
(r'(?:avid|keen|enthusiastic)\s+((?:[a-zA-Z]+(?:er|ist|or))|(?:[a-zA-Z]+\s+(?:fan|lover|enthusiast)))', "enthusiast"),
# Sports and physical activities
(r'(?:plays?|played)\s+(tennis|golf|football|soccer|basketball|cricket|rugby|hockey|volleyball|badminton)', "sport"),
(r'(?:runner|cyclist|swimmer|skier|hiker|climber|sailor|surfer|golfer|tennis\s+player)', "athletic"),
(r'(?:marathon|triathlon|cycling|running|swimming|hiking|climbing|sailing|surfing)', "athletic_activity"),
# Creative hobbies
(r'(?:paints?|painting|painter|artist|sculpt(?:s|or|ure)?|photograph(?:y|er)?)', "creative"),
(r'(?:writes?|writing|writer|author|poet|poetry|novelist)', "writing"),
(r'(?:music(?:ian)?|plays?\s+(?:the\s+)?(?:piano|guitar|violin|drums|flute|saxophone))', "music"),
(r'(?:sings?|singer|vocalist|choir)', "music"),
(r'(?:gardening|gardener|gardens?)', "gardening"),
(r'(?:cooking|cook|chef|culinary|baking|baker)', "culinary"),
# Collecting
(r'(?:collects?|collector\s+of|collection\s+of)\s+([^,\.]{5,40})', "collecting"),
# Reading and intellectual
(r'(?:voracious|avid)\s+reader', "reading"),
(r'(?:reads?\s+(?:widely|extensively))', "reading"),
# Travel
(r'(?:travels?|traveled|travelling|traveler|wanderlust)', "travel"),
(r'(?:visited|visits)\s+(?:over\s+)?(\d+)\s+countries', "travel"),
# Other interests
(r'(?:volunteers?|volunteering|volunteer\s+work)\s+(?:at|for|with)?\s*([^,\.]{5,40})?', "volunteering"),
(r'(?:animal\s+lover|pet\s+owner|dog\s+lover|cat\s+lover)', "animals"),
]
for pat, htype in patterns:
for m in re.finditer(pat, text, re.I):
val = m.group(1).strip() if m.lastindex and m.group(1) else m.group(0).strip()
if len(val) < 3 or val.lower() in ['the', 'and', 'his', 'her', 'their', 'a', 'an']:
continue
key = (htype, val.lower()[:30])
if key in seen:
continue
seen.add(key)
hobbies.append({
"type": htype,
"activity": val,
"snippet": text[max(0, m.start()-30):m.end()+30].strip()
})
return hobbies
def extract_political(text):
"""Extract political affiliations, activism, and civic engagement."""
political = []
seen = set()
patterns = [
# Party affiliations
(r'(?:member|supporter)\s+of\s+(?:the\s+)?([A-Z][a-zA-Z]+\s+[Pp]arty)', "party_member"),
(r'(?:democrat|republican|conservative|liberal|progressive|socialist|green\s+party)', "political_leaning"),
(r'(?:affiliated\s+with|belongs?\s+to)\s+(?:the\s+)?([A-Z][^,\.]{5,40}(?:party|movement))', "affiliation"),
# Activism and causes
(r'(?:activist|activism)\s+(?:for|in)?\s*([^,\.]{5,50})?', "activism"),
(r'(?:advocate(?:s)?|advocacy)\s+(?:for|of)\s+([^,\.]{5,50})', "advocacy"),
(r'(?:campaigns?\s+for|campaigning\s+for)\s+([^,\.]{5,50})', "campaign"),
(r'(?:fights?\s+for|fighting\s+for)\s+([^,\.]{5,50})', "cause"),
# Social causes
(r'(?:climate\s+(?:activist|action|advocacy)|environmental(?:ist)?)', "environmental"),
(r'(?:human\s+rights|civil\s+rights|social\s+justice)', "rights_advocacy"),
(r'(?:feminist|feminism|women\'?s\s+rights)', "feminism"),
(r'(?:LGBTQ?\+?|gay\s+rights|marriage\s+equality)', "lgbtq_rights"),
(r'(?:racial\s+equality|anti-?racism|BLM|Black\s+Lives\s+Matter)', "racial_justice"),
(r'(?:refugee|migrant|immigration)\s+(?:rights|advocacy|support)', "migration_advocacy"),
# Civic engagement
(r'(?:serves?|served)\s+(?:on|in)\s+(?:the\s+)?([A-Z][^,\.]{5,50}(?:council|board|committee))', "civic_service"),
(r'(?:city\s+council|town\s+council|parish\s+council)', "local_politics"),
(r'(?:elected\s+to|ran\s+for|candidate\s+for)\s+([^,\.]{5,50})', "political_candidacy"),
# Political views expressed
(r'(?:outspoken|vocal)\s+(?:critic|supporter)\s+of\s+([^,\.]{5,50})', "political_stance"),
(r'(?:opposes?|opposed\s+to|against)\s+([^,\.]{5,50})', "opposition"),
(r'(?:supports?|in\s+favor\s+of)\s+([^,\.]{5,50})', "support"),
]
for pat, ptype in patterns:
for m in re.finditer(pat, text, re.I):
val = m.group(1).strip() if m.lastindex and m.group(1) else m.group(0).strip()
if len(val) < 3:
continue
key = (ptype, val.lower()[:30])
if key in seen:
continue
seen.add(key)
political.append({
"type": ptype,
"topic": val,
"snippet": text[max(0, m.start()-30):m.end()+30].strip()
})
return political
def extract_social_media_content(text):
"""Extract claims from social media posts/bios (Instagram, Facebook, TikTok, Twitter/X)."""
content = []
# Social media bio patterns
patterns = [
# Bio/about descriptions
(r'(?:bio|about)[:\s]+["\']?([^"\'\.]{10,200})["\']?', "bio"),
(r'(?:describes?\s+(?:himself|herself|themselves)\s+as)\s+["\']?([^"\'\.]{10,100})["\']?', "self_description"),
# Follower counts (influence indicator)
(r'(\d+(?:,\d+)?(?:\.\d+)?[KkMm]?)\s+(?:followers?|following|subscribers?)', "follower_count"),
# Location from social profiles
(r'(?:based\s+in|located\s+in|from|lives?\s+in)\s+([A-Z][a-zA-Z\s,]+?)(?:\s*[|•]|\s*$)', "social_location"),
# Hashtags used (interests indicator)
(r'(?:frequently\s+(?:uses?|posts?)|often\s+(?:uses?|posts?))\s+(?:hashtags?\s+)?(?:like\s+)?#(\w+)', "hashtag"),
# Content themes
(r'(?:posts?\s+(?:about|on)|shares?\s+(?:content\s+)?(?:about|on))\s+([^,\.]{5,60})', "content_theme"),
# Verified status
(r'(?:verified|blue\s+check|official)\s+(?:account|profile)', "verified_status"),
# Link in bio
(r'(?:link\s+in\s+bio|linktree|linktr\.ee)[:\s]*(https?://[^\s]+)?', "link_in_bio"),
]
for pat, ctype in patterns:
for m in re.finditer(pat, text, re.I):
val = m.group(1).strip() if m.lastindex and m.group(1) else m.group(0).strip()
if len(val) < 3:
continue
content.append({
"type": ctype,
"value": val,
"snippet": text[max(0, m.start()-30):m.end()+30].strip()
})
return content
def extract_memberships(text):
mems = []
for pat, mtype in [(r'member\s+of\s+(?:the\s+)?([A-Z][^,\.]{5,60})', "membership"),
(r'(?:board\s+member|board\s+director)\s+(?:of\s+)?([A-Z][^,\.]{5,60})', "board_member")]:
for m in re.finditer(pat, text, re.I):
mems.append({"type": mtype, "organization": m.group(1).strip(),
"snippet": text[max(0,m.start()-20):m.end()+20].strip()})
return mems
def extract_nationalities(text):
nats = []
words = ["Dutch", "German", "French", "British", "American", "Belgian", "Italian", "Spanish",
"Australian", "Canadian", "Japanese", "Chinese", "Brazilian", "Mexican", "Russian"]
pat = r'\b(' + '|'.join(words) + r')\s+(?:art\s+)?(?:historian|curator|professor|director|artist)'
for m in re.finditer(pat, text, re.I):
nats.append({"nationality": m.group(1).strip(),
"snippet": text[max(0,m.start()-20):m.end()+20].strip()})
return nats
def enrich_person(name: str, context: str, api_key: str) -> Dict:
enrichment = {"web_claims": [], "enrichment_metadata": {
"enrichment_timestamp": datetime.now(timezone.utc).isoformat(),
"enrichment_agent": f"enrich_person_comprehensive.py v{SCRIPT_VERSION}",
"person_name": name, "context_used": context[:100] if context else None,
"searches_performed": [], "data_fabrication_check": "PASSED"}}
# Search 1: Biography
q1 = f'"{name}" born biography'
r1 = search_linkup(q1, api_key)
enrichment["enrichment_metadata"]["searches_performed"].append(q1)
if "error" not in r1:
ans, srcs = r1.get("answer", ""), r1.get("sources", [])
url, title = (srcs[0].get("url", ""), srcs[0].get("name", "")) if srcs else ("", "")
meta = r1.get("_meta", {})
if ans:
if (b := extract_birth_year(ans)):
add_claim_if_valid(enrichment["web_claims"], create_claim("birth_year", b["year"], url, title, b["snippet"], q1, srcs, meta, ans, b.get("pattern_type")))
if (l := extract_birth_location(ans)):
add_claim_if_valid(enrichment["web_claims"], create_claim("birth_location", l["location"], url, title, l["snippet"], q1, srcs, meta, ans, "birth_location"))
if (d := extract_death_info(ans)):
add_claim_if_valid(enrichment["web_claims"], create_claim("death_year", d["year"], url, title, d["snippet"], q1, srcs, meta, ans, "death_year"))
for n in extract_nationalities(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("nationality", n["nationality"], url, title, n["snippet"], q1, srcs, meta, ans, "nationality"))
for s in extract_social(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("social_connection", {"relationship_type": s["relationship_type"], "related_person": s["related_person"]}, url, title, s["snippet"], q1, srcs, meta, ans, s["relationship_type"]))
time.sleep(1.0)
# Search 2: Education/Career
q2 = f'"{name}" {context} education career university'
r2 = search_linkup(q2, api_key)
enrichment["enrichment_metadata"]["searches_performed"].append(q2)
if "error" not in r2:
ans, srcs = r2.get("answer", ""), r2.get("sources", [])
url, title = (srcs[0].get("url", ""), srcs[0].get("name", "")) if srcs else ("", "")
meta = r2.get("_meta", {})
if ans:
for e in extract_education(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("education", {"type": e["type"], "institution": e["institution"], "year": e["year"]}, url, title, e["snippet"], q2, srcs, meta, ans, e["type"]))
for p in extract_positions(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("position", {"title": p["title"], "organization": p["organization"], "year": p["year"]}, url, title, p["snippet"], q2, srcs, meta, ans, "position"))
for m in extract_memberships(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("membership", {"type": m["type"], "organization": m["organization"]}, url, title, m["snippet"], q2, srcs, meta, ans, m["type"]))
for i in extract_interests(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("interest", {"type": i["type"], "topic": i["topic"]}, url, title, i["snippet"], q2, srcs, meta, ans, i["type"]))
time.sleep(1.0)
# Search 3: Publications/Awards
q3 = f'"{name}" publications awards honors books'
r3 = search_linkup(q3, api_key)
enrichment["enrichment_metadata"]["searches_performed"].append(q3)
if "error" not in r3:
ans, srcs = r3.get("answer", ""), r3.get("sources", [])
url, title = (srcs[0].get("url", ""), srcs[0].get("name", "")) if srcs else ("", "")
meta = r3.get("_meta", {})
if ans:
for p in extract_publications(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("publication", {"type": p["type"], "title": p["title"], "year": p["year"]}, url, title, p["snippet"], q3, srcs, meta, ans, p["type"]))
for a in extract_awards(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("award", {"type": a["type"], "name": a["name"]}, url, title, a["snippet"], q3, srcs, meta, ans, a["type"]))
time.sleep(1.0)
# Search 4: Contact/Media
q4 = f'"{name}" contact email twitter linkedin orcid profile photo'
r4 = search_linkup(q4, api_key)
enrichment["enrichment_metadata"]["searches_performed"].append(q4)
if "error" not in r4:
ans, srcs = r4.get("answer", ""), r4.get("sources", [])
url, title = (srcs[0].get("url", ""), srcs[0].get("name", "")) if srcs else ("", "")
meta = r4.get("_meta", {})
if ans:
for c in extract_contacts(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("contact_detail", {"type": c["type"], "value": c["value"]}, url, title, c["snippet"], q4, srcs, meta, ans, c["type"]))
for m in extract_media(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("media_reference", {"type": m["type"], "value": m["value"]}, url, title, m["snippet"], q4, srcs, meta, ans, m["type"]))
time.sleep(1.0)
# Search 5: Academic Profiles (NEW in v1.2.0)
q5 = f'"{name}" researchgate academia.edu google scholar profile'
r5 = search_linkup(q5, api_key)
enrichment["enrichment_metadata"]["searches_performed"].append(q5)
if "error" not in r5:
ans, srcs = r5.get("answer", ""), r5.get("sources", [])
url, title = (srcs[0].get("url", ""), srcs[0].get("name", "")) if srcs else ("", "")
meta = r5.get("_meta", {})
if ans:
for c in extract_contacts(ans):
# Only add academic profile types from this search
if c["type"] in ["researchgate_url", "academia_url", "google_scholar_url"]:
add_claim_if_valid(enrichment["web_claims"], create_claim("contact_detail", {"type": c["type"], "value": c["value"]}, url, title, c["snippet"], q5, srcs, meta, ans, c["type"]))
time.sleep(1.0)
# Search 6: Social Media Profiles (NEW in v1.3.0)
q6 = f'"{name}" instagram facebook tiktok twitter social media profile'
r6 = search_linkup(q6, api_key)
enrichment["enrichment_metadata"]["searches_performed"].append(q6)
if "error" not in r6:
ans, srcs = r6.get("answer", ""), r6.get("sources", [])
url, title = (srcs[0].get("url", ""), srcs[0].get("name", "")) if srcs else ("", "")
meta = r6.get("_meta", {})
if ans:
# Extract social media contacts
for c in extract_contacts(ans):
if c["type"] in ["instagram", "instagram_url", "facebook", "facebook_url",
"tiktok", "tiktok_url", "twitter", "twitter_url",
"youtube_url", "bluesky_url", "mastodon_url", "threads_url"]:
add_claim_if_valid(enrichment["web_claims"], create_claim("contact_detail", {"type": c["type"], "value": c["value"]}, url, title, c["snippet"], q6, srcs, meta, ans, c["type"]))
# Extract social media content (bios, follower counts, etc.)
for sc in extract_social_media_content(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("social_media_content", {"type": sc["type"], "value": sc["value"]}, url, title, sc["snippet"], q6, srcs, meta, ans, sc["type"]))
# Also extract social connections from social media context
for s in extract_social(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("social_connection", {"relationship_type": s["relationship_type"], "related_person": s["related_person"]}, url, title, s["snippet"], q6, srcs, meta, ans, s["relationship_type"]))
time.sleep(1.0)
# Search 7: Hobbies, Interests, and Political Affiliations (NEW in v1.3.0)
q7 = f'"{name}" hobbies interests passions politics activism volunteer'
r7 = search_linkup(q7, api_key)
enrichment["enrichment_metadata"]["searches_performed"].append(q7)
if "error" not in r7:
ans, srcs = r7.get("answer", ""), r7.get("sources", [])
url, title = (srcs[0].get("url", ""), srcs[0].get("name", "")) if srcs else ("", "")
meta = r7.get("_meta", {})
if ans:
# Extract hobbies and personal interests
for h in extract_hobbies(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("hobby", {"type": h["type"], "activity": h["activity"]}, url, title, h["snippet"], q7, srcs, meta, ans, h["type"]))
# Extract political affiliations and activism
for p in extract_political(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("political", {"type": p["type"], "topic": p["topic"]}, url, title, p["snippet"], q7, srcs, meta, ans, p["type"]))
# Also extract any social connections mentioned
for s in extract_social(ans):
add_claim_if_valid(enrichment["web_claims"], create_claim("social_connection", {"relationship_type": s["relationship_type"], "related_person": s["related_person"]}, url, title, s["snippet"], q7, srcs, meta, ans, s["relationship_type"]))
return enrichment
def process_ppid_file(filepath: Path, api_key: str, dry_run: bool = False) -> Dict:
with open(filepath) as f:
data = json.load(f)
name_data = data.get("name", {})
full_name = name_data.get("full_name") or name_data.get("display_name", "")
if not full_name or full_name == "LinkedIn Member":
return {"status": "skipped", "reason": "no_valid_name"}
if not data.get("heritage_relevance", {}).get("is_heritage_relevant"):
return {"status": "skipped", "reason": "not_heritage_relevant"}
    headline = data.get("profile_data", {}).get("headline") or ""
enrichment = enrich_person(full_name, headline, api_key)
if not enrichment["web_claims"]:
if not dry_run:
if "enrichment_history" not in data: data["enrichment_history"] = []
enrichment["enrichment_metadata"]["result"] = "no_claims_found"
data["enrichment_history"].append(enrichment["enrichment_metadata"])
with open(filepath, "w") as f: json.dump(data, f, indent=2, ensure_ascii=False)
return {"status": "no_claims_found", "name": full_name}
if not dry_run:
if "web_claims" not in data: data["web_claims"] = []
existing = {(c.get("claim_type"), str(c.get("claim_value"))) for c in data.get("web_claims", [])}
for claim in enrichment["web_claims"]:
key = (claim["claim_type"], str(claim["claim_value"]))
if key not in existing: data["web_claims"].append(claim)
if "enrichment_history" not in data: data["enrichment_history"] = []
data["enrichment_history"].append(enrichment["enrichment_metadata"])
birth_claims = [c for c in enrichment["web_claims"] if c["claim_type"] == "birth_year"]
if birth_claims:
current = data.get("birth_date", {}).get("edtf", "XXXX")
if current == "XXXX" or current.endswith("X"):
prov = birth_claims[0]["provenance"]
data["birth_date"] = {"edtf": str(birth_claims[0]["claim_value"]), "precision": "year",
"provenance": {k: prov[k] for k in ["statement_created_at", "source_archived_at",
"retrieval_agent", "retrieval_method", "source_url", "source_title",
"source_snippet", "search_query", "extraction_method"] if k in prov}}
data["birth_date"]["provenance"]["verified"] = False
data["birth_date"]["provenance"]["verification_status"] = "machine_extracted"
if [c for c in enrichment["web_claims"] if c["claim_type"] == "death_year"]:
data["is_living"] = False
with open(filepath, "w") as f: json.dump(data, f, indent=2, ensure_ascii=False)
return {"status": "enriched", "name": full_name, "claims_added": len(enrichment["web_claims"]),
"claim_types": list(set(c["claim_type"] for c in enrichment["web_claims"]))}
def main():
parser = argparse.ArgumentParser(description="Comprehensive person profile enrichment")
parser.add_argument("--limit", type=int, default=10)
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
try:
api_key = get_linkup_api_key()
print(f"✓ Linkup API key loaded")
except ValueError as e:
print(f"{e}")
return
ppid_dir = Path(__file__).parent.parent / "data" / "person"
if not ppid_dir.exists():
print(f"✗ PPID directory not found")
return
print("Scanning for candidates...")
candidates = []
for f in ppid_dir.glob("ID_*.json"):
try:
with open(f) as fp: data = json.load(fp)
if not data.get("heritage_relevance", {}).get("is_heritage_relevant"): continue
if data.get("enrichment_history"): continue
name = data.get("name", {}).get("full_name", "")
if not name or name == "LinkedIn Member": continue
            headline = (data.get("profile_data", {}).get("headline") or "").lower()
score = 0
if "professor" in headline: score += 3
if "director" in headline: score += 2
if "curator" in headline: score += 2
if "museum" in headline: score += 1
if "archive" in headline: score += 1
candidates.append((f, score, name))
        except Exception: continue
candidates.sort(key=lambda x: -x[1])
print(f"Found {len(candidates)} candidates")
stats = {"enriched": 0, "no_claims_found": 0, "skipped": 0, "errors": 0}
results = []
    batch = candidates[:args.limit]
    for i, (filepath, score, _) in enumerate(batch):
        print(f"\n[{i+1}/{len(batch)}] {filepath.name} (score={score})")
try:
result = process_ppid_file(filepath, api_key, args.dry_run)
            status = result.get("status", "errors")
            stats[status] = stats.get(status, 0) + 1
if result["status"] == "enriched":
print(f" ✓ Added {result['claims_added']} claims: {result['claim_types']}")
results.append(result)
elif result["status"] == "no_claims_found":
print(f" ✗ No claims found for {result.get('name')}")
time.sleep(4.0)
except Exception as e:
print(f" ✗ Error: {e}")
stats["errors"] += 1
print(f"\n{'='*50}\nSUMMARY\n{'='*50}")
print(f"Enriched: {stats['enriched']}, No claims: {stats['no_claims_found']}, Errors: {stats['errors']}")
if results:
print(f"\nTotal claims added: {sum(r['claims_added'] for r in results)}")
if __name__ == "__main__":
main()