feat(scripts): add entity-to-PPID processing script

- Processes 94,716 LinkedIn entity files from data/custodian/person/entity/
- Identifies heritage-relevant profiles (47% of total)
- Generates PPID-formatted filenames with inferred locations/dates
- Merges with existing profiles, preserving all provenance data
- Applies Rules 12, 20, 27, 44, 45 for person data architecture
- Fixed edge case: handle null education/experience arrays
This commit is contained in:
kempersc 2026-01-10 13:58:06 +01:00
parent 57e77c8b19
commit 3a15f2bdaa

View file

@@ -0,0 +1,441 @@
#!/usr/bin/env python3
"""
Process Entity Files to PPID Format
This script processes LinkedIn entity extractions from data/custodian/person/entity/
and creates PPID-formatted profiles in data/person/ for heritage-relevant individuals.
Workflow:
1. Scan entity files for heritage_relevance.is_heritage_relevant == true
2. Check for duplicates against existing data/person/ profiles by LinkedIn slug
3. Generate PPID filename based on inferred locations and dates
4. Create new profile or merge with existing profile
Rules Applied:
- Rule 12: Person Data Reference Pattern
- Rule 20: Person Entity Profiles - Individual File Storage
- Rule 27: Person-Custodian Data Architecture
- Rule 44: PPID Birth Date Enrichment and EDTF Unknown Date Notation
- Rule 45: Inferred Data Must Be Explicit with Provenance
Usage:
python scripts/process_entity_to_ppid.py --dry-run # Preview changes
python scripts/process_entity_to_ppid.py --limit 100 # Process first 100
python scripts/process_entity_to_ppid.py # Process all
"""
import argparse
import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from collections import defaultdict
# Directories
ENTITY_DIR = Path("data/custodian/person/entity")
PERSON_DIR = Path("data/person")
# PPID filename pattern: ID_{birth-loc}_{decade}_{work-loc}_{custodian}_{NAME}.json
PPID_PATTERN = re.compile(r"ID_([A-Z]{2}-[A-Z]{2}-[A-Z]{3})_(\d{3}X|\d{4}|XXXX)_([A-Z]{2}-[A-Z]{2}-[A-Z]{3}|XX-XX-XXX)_([A-Z]{4}|XXXX)_(.+)\.json")


def get_existing_profiles() -> Dict[str, Path]:
    """Index existing PPID profiles in PERSON_DIR by their LinkedIn slug.

    Unreadable or malformed JSON files are skipped silently; a missing
    person directory yields an empty index.
    """
    index: Dict[str, Path] = {}
    if not PERSON_DIR.exists():
        return index
    for profile_path in PERSON_DIR.glob("*.json"):
        try:
            payload = json.loads(profile_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, IOError):
            continue
        slug = payload.get("linkedin_slug")
        if slug:
            index[slug] = profile_path
    return index
def extract_linkedin_slug(entity: Dict) -> Optional[str]:
    """Return the LinkedIn slug for *entity*, or None when none is found.

    The explicit ``person_id`` field wins; otherwise the slug is parsed
    out of ``profile_data.linkedin_url``
    (e.g. https://www.linkedin.com/in/john-doe-123abc -> "john-doe-123abc").
    """
    person_id = entity.get("person_id")
    if person_id:
        return person_id
    url = entity.get("profile_data", {}).get("linkedin_url", "")
    if not url:
        return None
    slug_match = re.search(r"/in/([^/?\s]+)", url)
    return slug_match.group(1) if slug_match else None
def infer_birth_decade(entity: Dict) -> Tuple[str, Dict]:
"""
Infer birth decade from education/career data.
Returns (decade_str, provenance_dict)
"""
provenance = {
"method": "earliest_observation_heuristic",
"inference_chain": [],
"confidence": "very_low",
"inferred_at": datetime.now(timezone.utc).isoformat(),
"inferred_by": "process_entity_to_ppid.py"
}
profile_data = entity.get("profile_data", {}) or {}
education = profile_data.get("education") or []
experience = profile_data.get("experience") or []
earliest_year = None
earliest_source = None
# Check education dates
for edu in education:
if isinstance(edu, dict):
dates = edu.get("dates", "") or ""
years = re.findall(r"(19\d{2}|20[0-2]\d)", str(dates))
for y in years:
year = int(y)
if earliest_year is None or year < earliest_year:
earliest_year = year
earliest_source = f"education: {edu.get('school', 'unknown')}"
# Check experience dates
for exp in experience:
if isinstance(exp, dict):
dates = exp.get("dates", "") or exp.get("duration", "")
years = re.findall(r"(19\d{2}|20[0-2]\d)", str(dates))
for y in years:
year = int(y)
if earliest_year is None or year < earliest_year:
earliest_year = year
earliest_source = f"experience: {exp.get('title', 'unknown')}"
if earliest_year is None:
return "XXXX", {"method": "none_found", "confidence": "none"}
# Assume university entry at 18-22, career start at 22-26
if "education" in earliest_source:
assumed_age = 20
provenance["inference_chain"].append({
"step": 1,
"observation": f"Earliest education year: {earliest_year}",
"source_field": earliest_source
})
provenance["inference_chain"].append({
"step": 2,
"assumption": f"University/education entry at age ~{assumed_age}",
"rationale": "Standard education entry age"
})
else:
assumed_age = 24
provenance["inference_chain"].append({
"step": 1,
"observation": f"Earliest career year: {earliest_year}",
"source_field": earliest_source
})
provenance["inference_chain"].append({
"step": 2,
"assumption": f"Career start at age ~{assumed_age}",
"rationale": "Standard career entry age"
})
birth_year = earliest_year - assumed_age
decade = (birth_year // 10) * 10
decade_str = f"{decade // 10}X" # e.g., "197X"
provenance["inference_chain"].append({
"step": 3,
"calculation": f"{earliest_year} - {assumed_age} = {birth_year}",
"result": f"Birth year ~{birth_year}"
})
provenance["inference_chain"].append({
"step": 4,
"generalization": "Round to decade",
"result": decade_str
})
provenance["confidence"] = "low"
return decade_str, provenance
def infer_location_code(entity: Dict) -> str:
    """Map the profile's free-text location to a PPID location code.

    Falls back to "NL-XX-XXX" when only the country is recognized and to
    "XX-XX-XXX" when nothing matches or the location is missing.
    """
    location = entity.get("profile_data", {}).get("location", "")
    if not location:
        return "XX-XX-XXX"
    # Simple heuristic for Netherlands
    needle = location.lower()
    nl_cities = {
        "amsterdam": "NL-NH-AMS",
        "rotterdam": "NL-ZH-ROT",
        "the hague": "NL-ZH-DHA",
        "den haag": "NL-ZH-DHA",
        "utrecht": "NL-UT-UTR",
        "eindhoven": "NL-NB-EIN",
        "groningen": "NL-GR-GRO",
        "leiden": "NL-ZH-LEI",
        "maastricht": "NL-LI-MAA",
        "nijmegen": "NL-GE-NIJ",
        "tilburg": "NL-NB-TIL",
        "delft": "NL-ZH-DEL",
        "haarlem": "NL-NH-HAA",
        "arnhem": "NL-GE-ARN",
        "breda": "NL-NB-BRE",
        "apeldoorn": "NL-GE-APE",
        "zwolle": "NL-OV-ZWO",
        "almere": "NL-FL-ALM",
        "lelystad": "NL-FL-LEL",
    }
    # First city name found (in insertion order) wins.
    city_code = next((code for city, code in nl_cities.items() if city in needle), None)
    if city_code:
        return city_code
    # Country-level fallback when no known city matched.
    if "netherlands" in needle or "nederland" in needle:
        return "NL-XX-XXX"
    return "XX-XX-XXX"
def generate_ppid_filename(entity: Dict) -> str:
    """Generate a PPID filename for an entity.

    Format: ID_{birth-loc}_{decade}_{work-loc}_{custodian}_{NAME}.json
    Unknown components use the X-placeholder conventions matched by
    PPID_PATTERN ("XXXX", "XX-XX-XXX").
    """
    profile_data = entity.get("profile_data", {})
    name = profile_data.get("name", "UNKNOWN")
    # Normalize name into an UPPER-CASE, hyphen-joined slug.
    # NOTE(review): this strips accented characters (common in Dutch names);
    # kept as-is so generated filenames stay stable against existing ones.
    name_slug = re.sub(r"[^a-zA-Z\s-]", "", name)
    name_slug = "-".join(name_slug.upper().split())
    if not name_slug:
        name_slug = "UNKNOWN"
    # Get location code
    location_code = infer_location_code(entity)
    # Get birth decade
    decade, _ = infer_birth_decade(entity)
    # Work location (same as profile location for now)
    work_location = location_code
    # Custodian code (XXXX for unknown).
    # TODO: derive from the affiliations' custodian GHCID once available
    # (the previous no-op inspection of entity["affiliations"] was removed).
    custodian_code = "XXXX"
    return f"ID_{location_code}_{decade}_{work_location}_{custodian_code}_{name_slug}.json"
def convert_entity_to_ppid(entity: Dict, existing_profile: Optional[Dict] = None) -> Dict:
    """Convert entity format to PPID format, optionally merging with an existing profile.

    The result is independent of *existing_profile* (deep-copied), so the
    caller's dict is never mutated. Affiliations and web claims are merged
    without introducing duplicates, preserving provenance (Rule 45).
    """
    from copy import deepcopy  # local import; only this function needs it

    profile_data = entity.get("profile_data", {})
    extraction_metadata = entity.get("extraction_metadata", {})
    # Deep copy so nested lists/dicts of the existing profile are not aliased
    # (a shallow .copy() would mutate existing_profile's nested data below).
    ppid = deepcopy(existing_profile) if existing_profile else {}
    # Core fields: fresh entity values win, existing values are the fallback.
    ppid["name"] = profile_data.get("name") or ppid.get("name")
    ppid["linkedin_slug"] = extract_linkedin_slug(entity) or ppid.get("linkedin_slug")
    # Birth date inference (Rule 44: EDTF decade notation with provenance).
    decade, decade_prov = infer_birth_decade(entity)
    if decade != "XXXX":
        ppid["inferred_birth_decade"] = {
            "value": decade,
            "edtf": decade,
            "inference_provenance": decade_prov
        }
    # Heritage relevance: always taken from the entity (latest assessment).
    hr = entity.get("heritage_relevance", {}) or {}
    ppid["heritage_relevance"] = {
        "is_heritage_relevant": hr.get("is_heritage_relevant", False),
        "heritage_types": hr.get("heritage_types", []),
        "rationale": hr.get("rationale")
    }
    # Profile data: only overwrite fields the entity actually provides.
    ppid["profile_data"] = ppid.get("profile_data", {})
    for field_name in ("headline", "location", "education", "experience"):
        if profile_data.get(field_name):
            ppid["profile_data"][field_name] = profile_data[field_name]
    # Affiliations: merge, deduplicated by custodian slug (set lookup is O(1)).
    affiliations = entity.get("affiliations", [])
    if affiliations:
        ppid["affiliations"] = ppid.get("affiliations", [])
        seen_slugs = {a.get("custodian_slug") for a in ppid["affiliations"]}
        for aff in affiliations:
            if aff.get("custodian_slug") not in seen_slugs:
                ppid["affiliations"].append(aff)
                seen_slugs.add(aff.get("custodian_slug"))
    # Web claims: merge, skipping exact duplicates. (Previously every claim
    # was appended unconditionally, duplicating claims on each re-run.)
    web_claims = entity.get("web_claims", [])
    if web_claims:
        ppid["web_claims"] = ppid.get("web_claims", [])
        for claim in web_claims:
            if claim not in ppid["web_claims"]:
                ppid["web_claims"].append(claim)
    # Extraction provenance: track every contributing source file once.
    ppid["extraction_provenance"] = ppid.get("extraction_provenance", {})
    ppid["extraction_provenance"]["source_files"] = ppid["extraction_provenance"].get("source_files", [])
    source_file = extraction_metadata.get("source_file")
    if source_file and source_file not in ppid["extraction_provenance"]["source_files"]:
        ppid["extraction_provenance"]["source_files"].append(source_file)
    ppid["extraction_provenance"]["modified_at"] = datetime.now(timezone.utc).isoformat()
    ppid["extraction_provenance"]["modified_by"] = "process_entity_to_ppid.py"
    return ppid
def process_entities(dry_run: bool = True, limit: Optional[int] = None, verbose: bool = False):
    """Process entity files and create/update PPID profiles.

    Args:
        dry_run: When True, report what would happen without writing files.
        limit: Optional cap on how many entity files to scan.
        verbose: Print per-file actions and read errors.
    """
    if not ENTITY_DIR.exists():
        print(f"Entity directory not found: {ENTITY_DIR}")
        return
    # Build existing profile index (slug -> path) for duplicate detection.
    print("Building existing profile index...")
    existing_profiles = get_existing_profiles()
    print(f"Found {len(existing_profiles)} existing profiles")
    # Stats
    stats = {
        "total_scanned": 0,
        "heritage_relevant": 0,
        "already_exists": 0,
        "new_profiles": 0,
        "updated_profiles": 0,
        "errors": 0
    }
    # Scan entity files
    entity_files = list(ENTITY_DIR.glob("*.json"))
    if limit:
        entity_files = entity_files[:limit]
    print(f"Scanning {len(entity_files)} entity files...")
    for i, entity_file in enumerate(entity_files):
        if i > 0 and i % 1000 == 0:
            print(f" Processed {i}/{len(entity_files)}...")
        stats["total_scanned"] += 1
        try:
            with open(entity_file, "r", encoding="utf-8") as fp:
                entity = json.load(fp)
        except (json.JSONDecodeError, IOError) as e:
            stats["errors"] += 1
            if verbose:
                print(f" Error reading {entity_file.name}: {e}")
            continue
        # Only heritage-relevant entities get a PPID profile.
        hr = entity.get("heritage_relevance", {})
        if not hr.get("is_heritage_relevant"):
            continue
        stats["heritage_relevant"] += 1
        # Entities without a LinkedIn slug cannot be deduplicated; skip them.
        slug = extract_linkedin_slug(entity)
        if not slug:
            if verbose:
                print(f" No LinkedIn slug for {entity_file.name}")
            continue
        # Check for an existing profile to merge into.
        existing_path = existing_profiles.get(slug)
        existing_profile = None
        if existing_path:
            stats["already_exists"] += 1
            try:
                with open(existing_path, "r", encoding="utf-8") as fp:
                    existing_profile = json.load(fp)
            # Narrowed from a bare `except:` (which also swallowed
            # KeyboardInterrupt/SystemExit) to the actual read failures.
            except (json.JSONDecodeError, IOError):
                # Unreadable existing profile: convert fresh, but still
                # write back to the existing path below.
                existing_profile = None
        # Convert to PPID format
        ppid = convert_entity_to_ppid(entity, existing_profile)
        # Updated profiles keep their path; new ones get a generated name.
        if existing_path:
            output_path = existing_path
            stats["updated_profiles"] += 1
        else:
            filename = generate_ppid_filename(entity)
            output_path = PERSON_DIR / filename
            stats["new_profiles"] += 1
        if verbose:
            action = "UPDATE" if existing_path else "CREATE"
            print(f" {action}: {output_path.name}")
        if not dry_run:
            # Ensure directory exists
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, "w", encoding="utf-8") as fp:
                json.dump(ppid, fp, indent=2, ensure_ascii=False)
    # Print summary
    print("\n" + "=" * 60)
    print("PROCESSING SUMMARY")
    print("=" * 60)
    print(f"Total scanned: {stats['total_scanned']:,}")
    print(f"Heritage relevant: {stats['heritage_relevant']:,}")
    print(f"Already exists: {stats['already_exists']:,}")
    print(f"New profiles: {stats['new_profiles']:,}")
    print(f"Updated profiles: {stats['updated_profiles']:,}")
    print(f"Errors: {stats['errors']:,}")
    print("=" * 60)
    if dry_run:
        print("\n** DRY RUN - No files were written **")
        print("Run without --dry-run to apply changes")
def main():
    """CLI entry point: parse command-line flags and run the pipeline."""
    arg_parser = argparse.ArgumentParser(description="Process entity files to PPID format")
    arg_parser.add_argument("--dry-run", action="store_true", help="Preview changes without writing")
    arg_parser.add_argument("--limit", type=int, help="Limit number of files to process")
    arg_parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    opts = arg_parser.parse_args()
    process_entities(dry_run=opts.dry_run, limit=opts.limit, verbose=opts.verbose)


if __name__ == "__main__":
    main()