#!/usr/bin/env python3
"""
Process Entity Files to PPID Format

This script processes LinkedIn entity extractions from data/custodian/person/entity/
and creates PPID-formatted profiles in data/person/ for heritage-relevant individuals.

Workflow:
1. Scan entity files for heritage_relevance.is_heritage_relevant == true
2. Check for duplicates against existing data/person/ profiles by LinkedIn slug
3. Generate PPID filename based on inferred locations and dates
4. Create new profile or merge with existing profile

Rules Applied:
- Rule 12: Person Data Reference Pattern
- Rule 20: Person Entity Profiles - Individual File Storage
- Rule 27: Person-Custodian Data Architecture
- Rule 44: PPID Birth Date Enrichment and EDTF Unknown Date Notation
- Rule 45: Inferred Data Must Be Explicit with Provenance

Usage:
    python scripts/process_entity_to_ppid.py --dry-run   # Preview changes
    python scripts/process_entity_to_ppid.py --limit 100 # Process first 100
    python scripts/process_entity_to_ppid.py             # Process all
"""

import argparse
import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from collections import defaultdict

# Directories
ENTITY_DIR = Path("data/custodian/person/entity")
PERSON_DIR = Path("data/person")

# PPID filename pattern: ID_{birth-loc}_{decade}_{work-loc}_{custodian}_{NAME}.json
# Decade is either a full year (1987), an EDTF-style decade (198X) or fully
# unknown (XXXX); location codes follow {country}-{province}-{city}.
PPID_PATTERN = re.compile(
    r"ID_([A-Z]{2}-[A-Z]{2}-[A-Z]{3})_(\d{3}X|\d{4}|XXXX)_"
    r"([A-Z]{2}-[A-Z]{2}-[A-Z]{3}|XX-XX-XXX)_([A-Z]{4}|XXXX)_(.+)\.json"
)


def get_existing_profiles() -> Dict[str, Path]:
    """Build an index of existing PPID profiles keyed by LinkedIn slug.

    Returns:
        Mapping of linkedin_slug -> profile file path. Profiles without a
        ``linkedin_slug`` field, or files that cannot be read/parsed, are
        skipped (best-effort index; this is only used for dedup lookups).
    """
    profiles: Dict[str, Path] = {}

    if not PERSON_DIR.exists():
        return profiles

    for f in PERSON_DIR.glob("*.json"):
        try:
            with open(f, "r", encoding="utf-8") as fp:
                data = json.load(fp)
            slug = data.get("linkedin_slug")
            if slug:
                profiles[slug] = f
        except (json.JSONDecodeError, IOError):
            # Unreadable/corrupt profile: leave it out of the index rather
            # than aborting the whole run.
            continue

    return profiles


def extract_linkedin_slug(entity: Dict) -> Optional[str]:
    """Extract the LinkedIn slug identifying this entity.

    Prefers the explicit ``person_id`` field; otherwise parses the slug out
    of ``profile_data.linkedin_url`` (the path segment after ``/in/``).

    Returns:
        The slug string, or None if neither source yields one.
    """
    # Direct slug
    if entity.get("person_id"):
        return entity["person_id"]

    # From profile_data.linkedin_url
    # "or {}" guards against an explicit null value in the JSON.
    profile_data = entity.get("profile_data", {}) or {}
    linkedin_url = profile_data.get("linkedin_url", "")

    if linkedin_url:
        # Extract slug from URL like https://www.linkedin.com/in/john-doe-123abc
        match = re.search(r"/in/([^/?\s]+)", linkedin_url)
        if match:
            return match.group(1)

    return None


def infer_birth_decade(entity: Dict) -> Tuple[str, Dict]:
    """Infer a birth decade from the earliest education/career year.

    Heuristic (Rule 44/45): find the earliest year mentioned in education or
    experience dates, subtract a standard entry age (~20 for education, ~24
    for career start), and round down to the decade.

    Returns:
        ``(decade_str, provenance)`` where ``decade_str`` is EDTF-style
        (e.g. "197X") or "XXXX" when no year was found, and ``provenance``
        records the full inference chain per Rule 45.
    """
    provenance: Dict[str, Any] = {
        "method": "earliest_observation_heuristic",
        "inference_chain": [],
        "confidence": "very_low",
        "inferred_at": datetime.now(timezone.utc).isoformat(),
        "inferred_by": "process_entity_to_ppid.py"
    }

    profile_data = entity.get("profile_data", {}) or {}
    education = profile_data.get("education") or []
    experience = profile_data.get("experience") or []

    earliest_year: Optional[int] = None
    earliest_source: Optional[str] = None

    # Years 1900-2029 only; anything else is treated as noise.
    year_re = re.compile(r"(19\d{2}|20[0-2]\d)")

    # Check education dates
    for edu in education:
        if isinstance(edu, dict):
            dates = edu.get("dates", "") or ""
            for y in year_re.findall(str(dates)):
                year = int(y)
                if earliest_year is None or year < earliest_year:
                    earliest_year = year
                    earliest_source = f"education: {edu.get('school', 'unknown')}"

    # Check experience dates
    for exp in experience:
        if isinstance(exp, dict):
            dates = exp.get("dates", "") or exp.get("duration", "")
            for y in year_re.findall(str(dates)):
                year = int(y)
                if earliest_year is None or year < earliest_year:
                    earliest_year = year
                    earliest_source = f"experience: {exp.get('title', 'unknown')}"

    if earliest_year is None:
        return "XXXX", {"method": "none_found", "confidence": "none"}

    # Assume university entry at 18-22, career start at 22-26
    if "education" in earliest_source:
        assumed_age = 20
        provenance["inference_chain"].append({
            "step": 1,
            "observation": f"Earliest education year: {earliest_year}",
            "source_field": earliest_source
        })
        provenance["inference_chain"].append({
            "step": 2,
            "assumption": f"University/education entry at age ~{assumed_age}",
            "rationale": "Standard education entry age"
        })
    else:
        assumed_age = 24
        provenance["inference_chain"].append({
            "step": 1,
            "observation": f"Earliest career year: {earliest_year}",
            "source_field": earliest_source
        })
        provenance["inference_chain"].append({
            "step": 2,
            "assumption": f"Career start at age ~{assumed_age}",
            "rationale": "Standard career entry age"
        })

    birth_year = earliest_year - assumed_age
    decade = (birth_year // 10) * 10
    decade_str = f"{decade // 10}X"  # e.g., "197X"

    provenance["inference_chain"].append({
        "step": 3,
        "calculation": f"{earliest_year} - {assumed_age} = {birth_year}",
        "result": f"Birth year ~{birth_year}"
    })
    provenance["inference_chain"].append({
        "step": 4,
        "generalization": "Round to decade",
        "result": decade_str
    })
    provenance["confidence"] = "low"

    return decade_str, provenance


def infer_location_code(entity: Dict) -> str:
    """Infer a PPID location code from the profile's free-text location.

    Uses a substring match against a fixed table of major Dutch cities,
    falling back to country-level "NL-XX-XXX" for the Netherlands.

    Returns:
        A {country}-{province}-{city} code, or "XX-XX-XXX" if unknown.
    """
    profile_data = entity.get("profile_data", {}) or {}
    location = profile_data.get("location", "")

    if not location:
        return "XX-XX-XXX"

    # Simple heuristic for Netherlands
    location_lower = location.lower()

    nl_cities = {
        "amsterdam": "NL-NH-AMS",
        "rotterdam": "NL-ZH-ROT",
        "the hague": "NL-ZH-DHA",
        "den haag": "NL-ZH-DHA",
        "utrecht": "NL-UT-UTR",
        "eindhoven": "NL-NB-EIN",
        "groningen": "NL-GR-GRO",
        "leiden": "NL-ZH-LEI",
        "maastricht": "NL-LI-MAA",
        "nijmegen": "NL-GE-NIJ",
        "tilburg": "NL-NB-TIL",
        "delft": "NL-ZH-DEL",
        "haarlem": "NL-NH-HAA",
        "arnhem": "NL-GE-ARN",
        "breda": "NL-NB-BRE",
        "apeldoorn": "NL-GE-APE",
        "zwolle": "NL-OV-ZWO",
        "almere": "NL-FL-ALM",
        "lelystad": "NL-FL-LEL",
    }

    for city, code in nl_cities.items():
        if city in location_lower:
            return code

    # Check country
    if "netherlands" in location_lower or "nederland" in location_lower:
        return "NL-XX-XXX"

    return "XX-XX-XXX"


def generate_ppid_filename(entity: Dict) -> str:
    """Generate the PPID filename for an entity.

    Format: ``ID_{birth-loc}_{decade}_{work-loc}_{custodian}_{NAME}.json``.
    Birth and work location are currently both taken from the profile
    location; the custodian code is always "XXXX" (unknown) for now.
    """
    profile_data = entity.get("profile_data", {}) or {}
    # "or" (not a .get default) so an explicit null name also falls back.
    name = profile_data.get("name") or "UNKNOWN"

    # Normalize name for filename: keep letters/spaces/hyphens, uppercase,
    # join words with hyphens.
    name_slug = re.sub(r"[^a-zA-Z\s-]", "", name)
    name_slug = "-".join(name_slug.upper().split())
    if not name_slug:
        name_slug = "UNKNOWN"

    # Get location code
    location_code = infer_location_code(entity)

    # Get birth decade (provenance is recorded separately by the converter)
    decade, _ = infer_birth_decade(entity)

    # Work location (same as profile location for now)
    work_location = location_code

    # Custodian code (XXXX for unknown).
    # TODO: derive from the affiliations' custodian GHCID in the future.
    custodian_code = "XXXX"

    return f"ID_{location_code}_{decade}_{work_location}_{custodian_code}_{name_slug}.json"


def convert_entity_to_ppid(entity: Dict, existing_profile: Optional[Dict] = None) -> Dict:
    """Convert entity format to PPID format, optionally merging with an existing profile.

    Merging is additive: entity data never deletes existing fields, and
    affiliations/web claims are deduplicated so reprocessing the same
    entity is idempotent.

    Args:
        entity: The raw entity record.
        existing_profile: Previously written PPID profile to merge into.

    Returns:
        The (new or merged) PPID profile dict.
    """
    profile_data = entity.get("profile_data", {}) or {}
    extraction_metadata = entity.get("extraction_metadata", {})

    # Start with existing profile or empty template (shallow copy: nested
    # structures are intentionally shared and mutated in place below).
    ppid = existing_profile.copy() if existing_profile else {}

    # Core fields
    ppid["name"] = profile_data.get("name") or ppid.get("name")
    ppid["linkedin_slug"] = extract_linkedin_slug(entity) or ppid.get("linkedin_slug")

    # Birth date inference (Rule 44/45: inferred values carry provenance)
    decade, decade_prov = infer_birth_decade(entity)
    if decade != "XXXX":
        ppid["inferred_birth_decade"] = {
            "value": decade,
            "edtf": decade,
            "inference_provenance": decade_prov
        }

    # Heritage relevance (keep from entity)
    hr = entity.get("heritage_relevance", {})
    ppid["heritage_relevance"] = {
        "is_heritage_relevant": hr.get("is_heritage_relevant", False),
        "heritage_types": hr.get("heritage_types", []),
        "rationale": hr.get("rationale")
    }

    # Profile data: only overwrite fields the entity actually provides
    ppid["profile_data"] = ppid.get("profile_data", {})
    if profile_data.get("headline"):
        ppid["profile_data"]["headline"] = profile_data["headline"]
    if profile_data.get("location"):
        ppid["profile_data"]["location"] = profile_data["location"]
    if profile_data.get("education"):
        ppid["profile_data"]["education"] = profile_data["education"]
    if profile_data.get("experience"):
        ppid["profile_data"]["experience"] = profile_data["experience"]

    # Affiliations (merge, deduplicated by custodian_slug)
    affiliations = entity.get("affiliations", [])
    if affiliations:
        ppid["affiliations"] = ppid.get("affiliations", [])
        # Hoisted set instead of rebuilding a list per iteration (O(n) total)
        existing_slugs = {a.get("custodian_slug") for a in ppid["affiliations"]}
        for aff in affiliations:
            slug = aff.get("custodian_slug")
            if slug not in existing_slugs:
                ppid["affiliations"].append(aff)
                existing_slugs.add(slug)

    # Web claims (merge with existing, skipping exact duplicates so that
    # reprocessing the same entity does not grow the list unboundedly)
    web_claims = entity.get("web_claims", [])
    if web_claims:
        ppid["web_claims"] = ppid.get("web_claims", [])
        for claim in web_claims:
            if claim not in ppid["web_claims"]:
                ppid["web_claims"].append(claim)

    # Extraction provenance: track every source file plus last-modified info
    ppid["extraction_provenance"] = ppid.get("extraction_provenance", {})
    ppid["extraction_provenance"]["source_files"] = ppid["extraction_provenance"].get("source_files", [])
    source_file = extraction_metadata.get("source_file")
    if source_file and source_file not in ppid["extraction_provenance"]["source_files"]:
        ppid["extraction_provenance"]["source_files"].append(source_file)
    ppid["extraction_provenance"]["modified_at"] = datetime.now(timezone.utc).isoformat()
    ppid["extraction_provenance"]["modified_by"] = "process_entity_to_ppid.py"

    return ppid


def process_entities(dry_run: bool = True, limit: Optional[int] = None, verbose: bool = False):
    """Process entity files and create/update PPID profiles.

    Args:
        dry_run: When True, report what would change but write nothing.
        limit: Optional cap on the number of entity files scanned.
        verbose: Print per-file actions and read errors.
    """
    if not ENTITY_DIR.exists():
        print(f"Entity directory not found: {ENTITY_DIR}")
        return

    # Build existing profile index
    print("Building existing profile index...")
    existing_profiles = get_existing_profiles()
    print(f"Found {len(existing_profiles)} existing profiles")

    # Stats
    stats = {
        "total_scanned": 0,
        "heritage_relevant": 0,
        "already_exists": 0,
        "new_profiles": 0,
        "updated_profiles": 0,
        "errors": 0
    }

    # Scan entity files
    entity_files = list(ENTITY_DIR.glob("*.json"))
    if limit:
        entity_files = entity_files[:limit]

    print(f"Scanning {len(entity_files)} entity files...")

    for i, entity_file in enumerate(entity_files):
        if i > 0 and i % 1000 == 0:
            print(f"  Processed {i}/{len(entity_files)}...")

        stats["total_scanned"] += 1

        try:
            with open(entity_file, "r", encoding="utf-8") as fp:
                entity = json.load(fp)
        except (json.JSONDecodeError, IOError) as e:
            stats["errors"] += 1
            if verbose:
                print(f"  Error reading {entity_file.name}: {e}")
            continue

        # Check heritage relevance: only heritage-relevant entities get PPIDs
        hr = entity.get("heritage_relevance", {})
        if not hr.get("is_heritage_relevant"):
            continue

        stats["heritage_relevant"] += 1

        # Get LinkedIn slug (required as the dedup key)
        slug = extract_linkedin_slug(entity)
        if not slug:
            if verbose:
                print(f"  No LinkedIn slug for {entity_file.name}")
            continue

        # Check for existing profile
        existing_path = existing_profiles.get(slug)
        existing_profile = None

        if existing_path:
            stats["already_exists"] += 1
            try:
                with open(existing_path, "r", encoding="utf-8") as fp:
                    existing_profile = json.load(fp)
            except (json.JSONDecodeError, IOError):
                # Unreadable existing profile: rebuild it from the entity
                existing_profile = None

        # Convert to PPID format
        ppid = convert_entity_to_ppid(entity, existing_profile)

        # Reuse the existing filename on update; generate a PPID name for new
        if existing_path:
            output_path = existing_path
            stats["updated_profiles"] += 1
        else:
            filename = generate_ppid_filename(entity)
            output_path = PERSON_DIR / filename
            stats["new_profiles"] += 1

        if verbose:
            action = "UPDATE" if existing_path else "CREATE"
            print(f"  {action}: {output_path.name}")

        if not dry_run:
            # Ensure directory exists
            output_path.parent.mkdir(parents=True, exist_ok=True)

            with open(output_path, "w", encoding="utf-8") as fp:
                json.dump(ppid, fp, indent=2, ensure_ascii=False)

    # Print summary
    print("\n" + "=" * 60)
    print("PROCESSING SUMMARY")
    print("=" * 60)
    print(f"Total scanned:     {stats['total_scanned']:,}")
    print(f"Heritage relevant: {stats['heritage_relevant']:,}")
    print(f"Already exists:    {stats['already_exists']:,}")
    print(f"New profiles:      {stats['new_profiles']:,}")
    print(f"Updated profiles:  {stats['updated_profiles']:,}")
    print(f"Errors:            {stats['errors']:,}")
    print("=" * 60)

    if dry_run:
        print("\n** DRY RUN - No files were written **")
        print("Run without --dry-run to apply changes")


def main():
    """CLI entry point: parse arguments and run the processor."""
    parser = argparse.ArgumentParser(description="Process entity files to PPID format")
    parser.add_argument("--dry-run", action="store_true", help="Preview changes without writing")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")

    args = parser.parse_args()

    process_entities(
        dry_run=args.dry_run,
        limit=args.limit,
        verbose=args.verbose
    )


if __name__ == "__main__":
    main()