#!/usr/bin/env python3
"""
Process Entity Files to PPID Format

This script processes LinkedIn entity extractions from data/custodian/person/entity/
and creates PPID-formatted profiles in data/person/ for heritage-relevant individuals.

Workflow:
1. Scan entity files for heritage_relevance.is_heritage_relevant == true
2. Check for duplicates against existing data/person/ profiles by LinkedIn slug
3. Generate PPID filename based on inferred locations and dates
4. Create new profile or merge with existing profile

Rules Applied:
- Rule 12: Person Data Reference Pattern
- Rule 20: Person Entity Profiles - Individual File Storage
- Rule 27: Person-Custodian Data Architecture
- Rule 44: PPID Birth Date Enrichment and EDTF Unknown Date Notation
- Rule 45: Inferred Data Must Be Explicit with Provenance

Usage:
    python scripts/process_entity_to_ppid.py --dry-run   # Preview changes
    python scripts/process_entity_to_ppid.py --limit 100 # Process first 100
    python scripts/process_entity_to_ppid.py             # Process all
"""

import argparse
import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from collections import defaultdict

# Directories (relative to the repository root the script is run from)
ENTITY_DIR = Path("data/custodian/person/entity")
PERSON_DIR = Path("data/person")

# PPID filename pattern: ID_{birth-loc}_{decade}_{work-loc}_{custodian}_{NAME}.json
# Kept at module level so other tooling can import and validate generated names.
PPID_PATTERN = re.compile(
    r"ID_([A-Z]{2}-[A-Z]{2}-[A-Z]{3})_(\d{3}X|\d{4}|XXXX)_"
    r"([A-Z]{2}-[A-Z]{2}-[A-Z]{3}|XX-XX-XXX)_([A-Z]{4}|XXXX)_(.+)\.json"
)

# Four-digit years accepted as education/career observations (1900-2029).
# Compiled once here instead of inside the per-entity date loops.
_YEAR_RE = re.compile(r"(19\d{2}|20[0-2]\d)")

# Known Dutch city names -> ISO-style location codes used in PPID filenames.
_NL_CITIES = {
    "amsterdam": "NL-NH-AMS",
    "rotterdam": "NL-ZH-ROT",
    "the hague": "NL-ZH-DHA",
    "den haag": "NL-ZH-DHA",
    "utrecht": "NL-UT-UTR",
    "eindhoven": "NL-NB-EIN",
    "groningen": "NL-GR-GRO",
    "leiden": "NL-ZH-LEI",
    "maastricht": "NL-LI-MAA",
    "nijmegen": "NL-GE-NIJ",
    "tilburg": "NL-NB-TIL",
    "delft": "NL-ZH-DEL",
    "haarlem": "NL-NH-HAA",
    "arnhem": "NL-GE-ARN",
    "breda": "NL-NB-BRE",
    "apeldoorn": "NL-GE-APE",
    "zwolle": "NL-OV-ZWO",
    "almere": "NL-FL-ALM",
    "lelystad": "NL-FL-LEL",
}


def get_existing_profiles() -> Dict[str, Path]:
    """Build an index of existing PPID profiles keyed by LinkedIn slug.

    Scans every ``*.json`` file in PERSON_DIR and maps its ``linkedin_slug``
    field (when present) to the file path. Unreadable or malformed files are
    silently skipped so a single corrupt profile cannot abort indexing.

    Returns:
        Dict mapping LinkedIn slug -> profile file path.
    """
    profiles: Dict[str, Path] = {}
    if not PERSON_DIR.exists():
        return profiles
    for f in PERSON_DIR.glob("*.json"):
        try:
            with open(f, "r", encoding="utf-8") as fp:
                data = json.load(fp)
            slug = data.get("linkedin_slug")
            if slug:
                profiles[slug] = f
        except (json.JSONDecodeError, IOError):
            # Best-effort index: skip broken files rather than fail the run.
            continue
    return profiles


def extract_linkedin_slug(entity: Dict) -> Optional[str]:
    """Extract the LinkedIn slug from entity data.

    Prefers the explicit ``person_id`` field; otherwise parses the slug out
    of ``profile_data.linkedin_url`` (the path segment after ``/in/``).

    Returns:
        The slug string, or None if neither source yields one.
    """
    # Direct slug
    if entity.get("person_id"):
        return entity["person_id"]
    # From profile_data.linkedin_url
    profile_data = entity.get("profile_data", {})
    linkedin_url = profile_data.get("linkedin_url", "")
    if linkedin_url:
        # Extract slug from URL like https://www.linkedin.com/in/john-doe-123abc
        match = re.search(r"/in/([^/?\s]+)", linkedin_url)
        if match:
            return match.group(1)
    return None


def infer_birth_decade(entity: Dict) -> Tuple[str, Dict]:
    """Infer a birth decade from education/career dates (Rule 44/45).

    Heuristic: find the earliest year mentioned in education or experience
    dates, assume education entry at ~20 or career start at ~24, subtract,
    and round to the decade ("197X" style EDTF notation).

    Returns:
        (decade_str, provenance_dict) — decade_str is e.g. "197X", or "XXXX"
        when no usable year was found. The provenance dict records the full
        inference chain so the inference is explicit per Rule 45.
    """
    provenance = {
        "method": "earliest_observation_heuristic",
        "inference_chain": [],
        "confidence": "very_low",
        "inferred_at": datetime.now(timezone.utc).isoformat(),
        "inferred_by": "process_entity_to_ppid.py",
    }
    profile_data = entity.get("profile_data", {}) or {}
    education = profile_data.get("education") or []
    experience = profile_data.get("experience") or []

    earliest_year = None
    earliest_source = None

    # Check education dates
    for edu in education:
        if isinstance(edu, dict):
            dates = edu.get("dates", "") or ""
            for y in _YEAR_RE.findall(str(dates)):
                year = int(y)
                if earliest_year is None or year < earliest_year:
                    earliest_year = year
                    earliest_source = f"education: {edu.get('school', 'unknown')}"

    # Check experience dates ("dates" preferred, "duration" as fallback)
    for exp in experience:
        if isinstance(exp, dict):
            dates = exp.get("dates", "") or exp.get("duration", "")
            for y in _YEAR_RE.findall(str(dates)):
                year = int(y)
                if earliest_year is None or year < earliest_year:
                    earliest_year = year
                    earliest_source = f"experience: {exp.get('title', 'unknown')}"

    if earliest_year is None:
        return "XXXX", {"method": "none_found", "confidence": "none"}

    # Assume university entry at 18-22, career start at 22-26
    if "education" in earliest_source:
        assumed_age = 20
        provenance["inference_chain"].append({
            "step": 1,
            "observation": f"Earliest education year: {earliest_year}",
            "source_field": earliest_source,
        })
        provenance["inference_chain"].append({
            "step": 2,
            "assumption": f"University/education entry at age ~{assumed_age}",
            "rationale": "Standard education entry age",
        })
    else:
        assumed_age = 24
        provenance["inference_chain"].append({
            "step": 1,
            "observation": f"Earliest career year: {earliest_year}",
            "source_field": earliest_source,
        })
        provenance["inference_chain"].append({
            "step": 2,
            "assumption": f"Career start at age ~{assumed_age}",
            "rationale": "Standard career entry age",
        })

    birth_year = earliest_year - assumed_age
    decade = (birth_year // 10) * 10
    decade_str = f"{decade // 10}X"  # e.g., "197X"

    provenance["inference_chain"].append({
        "step": 3,
        "calculation": f"{earliest_year} - {assumed_age} = {birth_year}",
        "result": f"Birth year ~{birth_year}",
    })
    provenance["inference_chain"].append({
        "step": 4,
        "generalization": "Round to decade",
        "result": decade_str,
    })
    provenance["confidence"] = "low"
    return decade_str, provenance


def infer_location_code(entity: Dict) -> str:
    """Infer a PPID location code from ``profile_data.location``.

    Matches known Dutch city names (substring, case-insensitive), then falls
    back to a country-level code for the Netherlands.

    Returns:
        A code like "NL-NH-AMS", "NL-XX-XXX" for unrecognized Dutch
        locations, or "XX-XX-XXX" when unknown.
    """
    profile_data = entity.get("profile_data", {})
    location = profile_data.get("location", "")
    if not location:
        return "XX-XX-XXX"

    # Simple heuristic for Netherlands
    location_lower = location.lower()
    for city, code in _NL_CITIES.items():
        if city in location_lower:
            return code

    # Check country
    if "netherlands" in location_lower or "nederland" in location_lower:
        return "NL-XX-XXX"
    return "XX-XX-XXX"


def generate_ppid_filename(entity: Dict) -> str:
    """Generate a PPID filename for an entity.

    Format: ID_{birth-loc}_{decade}_{work-loc}_{custodian}_{NAME}.json
    Unknown components use the X-placeholder conventions (Rule 44).
    """
    profile_data = entity.get("profile_data", {})
    name = profile_data.get("name", "UNKNOWN")

    # Normalize name for filename: strip non-letters, uppercase, hyphen-join.
    name_slug = re.sub(r"[^a-zA-Z\s-]", "", name)
    name_slug = "-".join(name_slug.upper().split())
    if not name_slug:
        name_slug = "UNKNOWN"

    # Get location code
    location_code = infer_location_code(entity)
    # Get birth decade (provenance discarded here; filename only needs the value)
    decade, _ = infer_birth_decade(entity)
    # Work location (same as profile location for now)
    work_location = location_code

    # Custodian code (XXXX for unknown)
    custodian_code = "XXXX"
    affiliations = entity.get("affiliations", [])
    if affiliations and isinstance(affiliations[0], dict):
        # Could derive from custodian GHCID in future
        pass

    return f"ID_{location_code}_{decade}_{work_location}_{custodian_code}_{name_slug}.json"


def convert_entity_to_ppid(entity: Dict, existing_profile: Optional[Dict] = None) -> Dict:
    """Convert entity format to PPID format, optionally merging with an existing profile.

    Merge semantics: entity values win for core fields when present;
    affiliations are deduplicated by ``custodian_slug``; web claims are
    deduplicated by full equality so re-processing the same entity is
    idempotent; extraction provenance accumulates source files.

    Args:
        entity: Raw entity dict as read from ENTITY_DIR.
        existing_profile: Previously written PPID profile to merge into, if any.

    Returns:
        The merged PPID profile dict (a new dict; inputs are not mutated
        except via shallow-copied nested containers from existing_profile).
    """
    profile_data = entity.get("profile_data", {})
    extraction_metadata = entity.get("extraction_metadata", {})

    # Start with existing profile or empty template
    ppid = existing_profile.copy() if existing_profile else {}

    # Core fields: prefer fresh entity data, fall back to existing values.
    ppid["name"] = profile_data.get("name") or ppid.get("name")
    ppid["linkedin_slug"] = extract_linkedin_slug(entity) or ppid.get("linkedin_slug")

    # Birth date inference (Rule 44); only recorded when a decade was found.
    decade, decade_prov = infer_birth_decade(entity)
    if decade != "XXXX":
        ppid["inferred_birth_decade"] = {
            "value": decade,
            "edtf": decade,
            "inference_provenance": decade_prov,
        }

    # Heritage relevance (keep from entity)
    hr = entity.get("heritage_relevance", {})
    ppid["heritage_relevance"] = {
        "is_heritage_relevant": hr.get("is_heritage_relevant", False),
        "heritage_types": hr.get("heritage_types", []),
        "rationale": hr.get("rationale"),
    }

    # Profile data: overwrite only the fields the entity actually provides.
    ppid["profile_data"] = ppid.get("profile_data", {})
    if profile_data.get("headline"):
        ppid["profile_data"]["headline"] = profile_data["headline"]
    if profile_data.get("location"):
        ppid["profile_data"]["location"] = profile_data["location"]
    if profile_data.get("education"):
        ppid["profile_data"]["education"] = profile_data["education"]
    if profile_data.get("experience"):
        ppid["profile_data"]["experience"] = profile_data["experience"]

    # Affiliations: append only custodian slugs not already present.
    affiliations = entity.get("affiliations", [])
    if affiliations:
        ppid["affiliations"] = ppid.get("affiliations", [])
        for aff in affiliations:
            existing_slugs = [a.get("custodian_slug") for a in ppid["affiliations"]]
            if aff.get("custodian_slug") not in existing_slugs:
                ppid["affiliations"].append(aff)

    # Web claims: merge with existing, skipping exact duplicates so repeated
    # runs over the same entity do not grow the list unboundedly.
    web_claims = entity.get("web_claims", [])
    if web_claims:
        ppid["web_claims"] = ppid.get("web_claims", [])
        for claim in web_claims:
            if claim not in ppid["web_claims"]:
                ppid["web_claims"].append(claim)

    # Extraction provenance: accumulate distinct source files, stamp this run.
    ppid["extraction_provenance"] = ppid.get("extraction_provenance", {})
    ppid["extraction_provenance"]["source_files"] = ppid["extraction_provenance"].get("source_files", [])
    source_file = extraction_metadata.get("source_file")
    if source_file and source_file not in ppid["extraction_provenance"]["source_files"]:
        ppid["extraction_provenance"]["source_files"].append(source_file)
    ppid["extraction_provenance"]["modified_at"] = datetime.now(timezone.utc).isoformat()
    ppid["extraction_provenance"]["modified_by"] = "process_entity_to_ppid.py"

    return ppid


def process_entities(dry_run: bool = True, limit: Optional[int] = None, verbose: bool = False):
    """Process entity files and create/update PPID profiles.

    Args:
        dry_run: When True (default), report what would happen without writing.
        limit: Optional cap on the number of entity files scanned.
        verbose: Emit per-file diagnostics.
    """
    if not ENTITY_DIR.exists():
        print(f"Entity directory not found: {ENTITY_DIR}")
        return

    # Build existing profile index
    print("Building existing profile index...")
    existing_profiles = get_existing_profiles()
    print(f"Found {len(existing_profiles)} existing profiles")

    # Stats
    stats = {
        "total_scanned": 0,
        "heritage_relevant": 0,
        "already_exists": 0,
        "new_profiles": 0,
        "updated_profiles": 0,
        "errors": 0,
    }

    # Scan entity files
    entity_files = list(ENTITY_DIR.glob("*.json"))
    if limit:
        entity_files = entity_files[:limit]
    print(f"Scanning {len(entity_files)} entity files...")

    for i, entity_file in enumerate(entity_files):
        if i > 0 and i % 1000 == 0:
            print(f"  Processed {i}/{len(entity_files)}...")
        stats["total_scanned"] += 1

        try:
            with open(entity_file, "r", encoding="utf-8") as fp:
                entity = json.load(fp)
        except (json.JSONDecodeError, IOError) as e:
            stats["errors"] += 1
            if verbose:
                print(f"  Error reading {entity_file.name}: {e}")
            continue

        # Check heritage relevance — only relevant entities become profiles.
        hr = entity.get("heritage_relevance", {})
        if not hr.get("is_heritage_relevant"):
            continue
        stats["heritage_relevant"] += 1

        # Get LinkedIn slug (required as the dedup key)
        slug = extract_linkedin_slug(entity)
        if not slug:
            if verbose:
                print(f"  No LinkedIn slug for {entity_file.name}")
            continue

        # Check for existing profile
        existing_path = existing_profiles.get(slug)
        existing_profile = None
        if existing_path:
            stats["already_exists"] += 1
            try:
                with open(existing_path, "r", encoding="utf-8") as fp:
                    existing_profile = json.load(fp)
            except (json.JSONDecodeError, IOError):
                # Unreadable existing profile: fall back to a fresh conversion
                # written to the same path.
                existing_profile = None

        # Convert to PPID format
        ppid = convert_entity_to_ppid(entity, existing_profile)

        # Existing profiles are updated in place; new ones get a PPID filename.
        if existing_path:
            output_path = existing_path
            stats["updated_profiles"] += 1
        else:
            filename = generate_ppid_filename(entity)
            output_path = PERSON_DIR / filename
            stats["new_profiles"] += 1

        if verbose:
            action = "UPDATE" if existing_path else "CREATE"
            print(f"  {action}: {output_path.name}")

        if not dry_run:
            # Ensure directory exists
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, "w", encoding="utf-8") as fp:
                json.dump(ppid, fp, indent=2, ensure_ascii=False)

    # Print summary
    print("\n" + "=" * 60)
    print("PROCESSING SUMMARY")
    print("=" * 60)
    print(f"Total scanned:       {stats['total_scanned']:,}")
    print(f"Heritage relevant:   {stats['heritage_relevant']:,}")
    print(f"Already exists:      {stats['already_exists']:,}")
    print(f"New profiles:        {stats['new_profiles']:,}")
    print(f"Updated profiles:    {stats['updated_profiles']:,}")
    print(f"Errors:              {stats['errors']:,}")
    print("=" * 60)
    if dry_run:
        print("\n** DRY RUN - No files were written **")
        print("Run without --dry-run to apply changes")


def main():
    """Parse CLI arguments and run the entity -> PPID processing pass."""
    parser = argparse.ArgumentParser(description="Process entity files to PPID format")
    parser.add_argument("--dry-run", action="store_true", help="Preview changes without writing")
    parser.add_argument("--limit", type=int, help="Limit number of files to process")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    args = parser.parse_args()
    process_entities(
        dry_run=args.dry_run,
        limit=args.limit,
        verbose=args.verbose,
    )


if __name__ == "__main__":
    main()