#!/usr/bin/env python3
"""
Generate PPIDs for Person Entity Files

This script:
1. Reads all person entity files from data/custodian/person/entity/
2. Deduplicates by keeping the latest timestamp per LinkedIn slug
3. Filters to heritage_relevant: true only
4. Generates ID-class identifiers for living persons
5. Creates the data/person/ directory structure with PPID filenames

PPID Format (for living persons with unknown dates/locations):
    ID_XX-XX-XXX_XXXX_XX-XX-XXX_XXXX_FIRSTNAME-LASTNAME

Per Rule 44: EDTF notation is used for unknown dates (X = unspecified digit).
"""

import json
import re
import unicodedata
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from urllib.parse import unquote

try:
    from unidecode import unidecode as _unidecode
    HAS_UNIDECODE = True
except ImportError:
    HAS_UNIDECODE = False
    _unidecode = None
    print("WARNING: unidecode not installed. "
          "Non-Latin names may not be transliterated correctly.")

# Dutch tussenvoegsels (particles) to skip in the last-name token.
# NOTE: currently unused -- particle matching in extract_name_tokens is
# per-token via INTERNATIONAL_PARTICLES, so the multi-word entries here
# would never match anyway. Retained for reference.
DUTCH_PARTICLES = {
    'van', 'de', 'den', 'der', 'het', "'t", 'te', 'ter', 'ten',
    'van de', 'van den', 'van der', 'van het', "van 't",
    'in de', 'in den', 'in het', "in 't",
    'op de', 'op den', 'op het', "op 't",
    'aan de', 'aan den', 'aan het',
}

# International particles to skip (matched one token at a time, so the
# multi-word Spanish entries are likewise only documentary)
INTERNATIONAL_PARTICLES = {
    # Dutch
    'van', 'de', 'den', 'der', 'het', "'t", 'te', 'ter', 'ten',
    # German
    'von', 'vom', 'zu', 'zum', 'zur',
    # French
    'de', 'du', 'des', 'la', 'le', 'les', "l'", "d'",
    # Spanish/Portuguese
    'da', 'das', 'do', 'dos', 'del', 'de la', 'de los', 'de las',
    # Italian
    'di', 'della', 'dello', 'dei', 'degli', 'delle',
    # Arabic
    'al', 'el', 'bin', 'ibn', 'abu',
}


def normalize_name(name: str) -> str:
    """Normalize a name to its ASCII equivalent.

    Uses NFD decomposition for Latin scripts with diacritics, and
    unidecode for non-Latin scripts (Hebrew, Arabic, Chinese, etc.).
    """
    if not name:
        return ""

    # Detect non-Latin characters: if any letter falls outside the
    # Latin Extended-B range (above U+024F), fall back to unidecode.
    has_non_latin = any(
        ord(c) > 0x024F and unicodedata.category(c).startswith('L')
        for c in name
    )

    if has_non_latin and HAS_UNIDECODE and _unidecode is not None:
        # Use unidecode for Hebrew, Arabic, Chinese, etc.
        ascii_name = _unidecode(name)
    else:
        # NFD decomposition for Latin scripts: decompose, then drop the
        # combining marks (Unicode category 'Mn').
        normalized = unicodedata.normalize('NFD', name)
        ascii_name = ''.join(
            c for c in normalized if unicodedata.category(c) != 'Mn'
        )

    return ascii_name
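
# A minimal illustration of normalize_name (the Latin examples are exact NFD
# behavior; the non-Latin result depends on the installed unidecode version,
# so only its general shape is indicated):
#
#   normalize_name("José Müller")     -> "Jose Muller"
#   normalize_name("Noël de Brouwer") -> "Noel de Brouwer"
#   normalize_name("יוסי כהן")         -> ASCII transliteration via unidecode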


def extract_name_tokens(full_name: str) -> tuple[str, str]:
    """
    Extract first- and last-name tokens for the PPID.

    Rules:
    - Skip tussenvoegsels (van, de, den, der, etc.)
    - Use UPPERCASE
    - Normalize diacritics to ASCII

    Examples:
        "Jan van den Berg" -> ("JAN", "BERG")
        "Maria de la Cruz" -> ("MARIA", "CRUZ")
        "Vincent van Gogh" -> ("VINCENT", "GOGH")
    """
    if not full_name:
        return ("UNKNOWN", "UNKNOWN")

    # Normalize diacritics
    name = normalize_name(full_name)

    # Split into words
    words = name.split()
    if not words:
        return ("UNKNOWN", "UNKNOWN")

    # The first token is always the first word
    first_token = words[0].upper()
    # Remove any non-alpha characters
    first_token = re.sub(r'[^A-Z]', '', first_token)
    if not first_token:
        first_token = "UNKNOWN"

    # Find the last token, scanning backwards and skipping particles
    last_token = "UNKNOWN"
    for word in reversed(words):
        word_lower = word.lower()
        if word_lower not in INTERNATIONAL_PARTICLES:
            last_token = word.upper()
            # Remove any non-alpha characters
            last_token = re.sub(r'[^A-Z]', '', last_token)
            if last_token:
                break
    if not last_token:
        last_token = "UNKNOWN"

    return (first_token, last_token)


def extract_slug_and_timestamp(filename: str) -> tuple[str, str]:
    """Extract the LinkedIn slug and timestamp from a filename.

    Format: {linkedin-slug}_{ISO-timestamp}.json
    Example: iris-van-meer-34329131_20251211T000000Z.json
    """
    # Remove the .json extension
    base = filename.replace('.json', '')

    # Split on the last underscore (the timestamp is always last)
    parts = base.rsplit('_', 1)
    if len(parts) == 2:
        slug = unquote(parts[0])  # URL-decode the slug
        timestamp = parts[1]
        return slug, timestamp
    return unquote(base), ''


def parse_timestamp(ts: str) -> datetime:
    """Parse an ISO basic timestamp like 20251211T000000Z."""
    try:
        return datetime.strptime(ts, '%Y%m%dT%H%M%SZ')
    except ValueError:
        return datetime.min


def generate_ppid(
    name: str,
    birth_location: Optional[str] = None,
    birth_date: Optional[str] = None,
    death_location: Optional[str] = None,
    death_date: Optional[str] = None,
    is_living: bool = True
) -> str:
    """
    Generate a PPID for a person.

    Format: {TYPE}_{FL}_{FD}_{LL}_{LD}_{NT}

    For living persons with unknown data:
        ID_XX-XX-XXX_XXXX_XX-XX-XXX_XXXX_FIRSTNAME-LASTNAME
    """
    # Type: ID for living/unverified, PID for deceased+verified
    id_type = "ID" if is_living else "PID"

    # First Location (birth place)
    first_location = birth_location if birth_location else "XX-XX-XXX"

    # First Date (birth date) - EDTF format
    first_date = birth_date if birth_date else "XXXX"

    # Last Location (death place or current location)
    last_location = death_location if death_location else "XX-XX-XXX"

    # Last Date (death date) - EDTF format; XXXX (unknown) for living persons
    last_date = death_date if death_date else "XXXX"

    # Name Tokens
    first_token, last_token = extract_name_tokens(name)
    name_tokens = f"{first_token}-{last_token}"

    # Combine
    ppid = f"{id_type}_{first_location}_{first_date}_{last_location}_{last_date}_{name_tokens}"
    return ppid


def load_person_entity(filepath: Path) -> Optional[dict]:
    """Load and parse a person entity JSON file."""
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        print(f"  ERROR: Failed to parse {filepath.name}: {e}")
        return None
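
# A quick sketch of generate_ppid output with the defaults above (the names
# are hypothetical; results follow from extract_name_tokens and the XX/XXXX
# placeholders):
#
#   generate_ppid("Jan van den Berg")
#       -> "ID_XX-XX-XXX_XXXX_XX-XX-XXX_XXXX_JAN-BERG"
#   generate_ppid("Vincent van Gogh", is_living=False)
#       -> "PID_XX-XX-XXX_XXXX_XX-XX-XXX_XXXX_VINCENT-GOGH"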


def get_person_name_both(data: dict) -> tuple[str, str]:
    """Extract the person name from entity data, returning both forms.

    Returns:
        tuple: (display_name, original_name)
        - display_name: romanized/ASCII name for the PPID
        - original_name: original name (may be in a non-Latin script)
    """
    # Get the original name
    original_name = (
        data.get('profile_data', {}).get('name')
        or data.get('source_staff_info', {}).get('name')
        or data.get('fallback_data', {}).get('name')
        or ''
    ).strip()

    # Prefer the romanized name if available
    name_romanized = data.get('profile_data', {}).get('name_romanized')
    if name_romanized:
        return name_romanized.strip(), original_name

    # No romanization: return the original name for both
    return original_name, original_name


def get_person_name(data: dict) -> str:
    """Extract the person name from entity data.

    Priority:
    1. name_romanized (already transliterated)
    2. name from profile_data
    3. name from source_staff_info
    4. name from fallback_data
    """
    # First try the romanized name (for Hebrew, Arabic, etc.)
    name_romanized = data.get('profile_data', {}).get('name_romanized')
    if name_romanized:
        return name_romanized.strip()

    # Fall back to the regular name fields
    name = (
        data.get('profile_data', {}).get('name')
        or data.get('source_staff_info', {}).get('name')
        or data.get('fallback_data', {}).get('name')
        or ''
    )
    return name.strip()


def is_heritage_relevant(data: dict) -> bool:
    """Check whether a person is heritage-relevant."""
    # Check the nested heritage_relevance structure
    hr = data.get('heritage_relevance', {})
    if isinstance(hr, dict):
        return hr.get('is_heritage_relevant', False) is True

    # Check the direct field
    return data.get('heritage_relevant', False) is True


def get_current_location(data: dict) -> Optional[str]:
    """Try to extract the current work location from affiliations."""
    affiliations = data.get('affiliations', [])
    for aff in affiliations:
        if isinstance(aff, dict):
            location = aff.get('location')
            if location:
                # TODO: Convert the location string to CC-RR-PPP format.
                # For now, return None (needs a GeoNames lookup).
                pass
    return None


def create_ppid_entity(data: dict, ppid: str, source_file: str) -> dict:
    """Create a new PPID entity structure from source data."""
    display_name, original_name = get_person_name_both(data)

    entity = {
        "ppid": ppid,
        "ppid_type": "ID",  # All living persons are ID class
        "ppid_components": {
            "type": "ID",
            "first_location": "XX-XX-XXX",
            "first_date": "XXXX",
            "last_location": "XX-XX-XXX",
            "last_date": "XXXX",
            "name_tokens": extract_name_tokens(display_name)
        },
        "name": {
            "full_name": original_name,
            "display_name": display_name,
            "name_romanized": display_name if display_name != original_name else None,
            "name_tokens": extract_name_tokens(display_name),
            "source": "linkedin_profile"
        },
        "birth_date": {
            "edtf": "XXXX",
            "precision": "unknown"
        },
        "is_living": True,
        "heritage_relevance": data.get('heritage_relevance', {}),
        "affiliations": data.get('affiliations', []),
        "profile_data": data.get('profile_data', {}),
        "web_claims": data.get('web_claims', []),
        "source_observations": [
            {
                "source_file": source_file,
                "observed_on": data.get('extraction_metadata', {}).get('extraction_date'),
                "extraction_agent": data.get('extraction_metadata', {}).get('extraction_agent')
            }
        ],
        "enrichment_metadata": {
            "birth_date_search": {
                "attempted": False,
                "notes": "Not yet searched - requires manual enrichment"
            }
        },
        "provenance": {
            "created_at": datetime.now(timezone.utc).isoformat(),
            "created_by": "generate_ppids.py",
            "source_files": [source_file]
        }
    }
    return entity
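
# For reference, the two input shapes accepted by is_heritage_relevant
# (sketched from the checks above; field names match the source files):
#
#   {"heritage_relevance": {"is_heritage_relevant": true, ...}}  # nested form
#   {"heritage_relevant": true, ...}                             # direct form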


def main():
    """Main entry point."""
    entity_dir = Path('/Users/kempersc/apps/glam/data/custodian/person/entity')
    output_dir = Path('/Users/kempersc/apps/glam/data/person')

    if not entity_dir.exists():
        print(f"ERROR: Entity directory not found: {entity_dir}")
        return

    # Create the output directory
    output_dir.mkdir(parents=True, exist_ok=True)

    print("=" * 60)
    print("PPID GENERATION FOR PERSON ENTITIES")
    print("=" * 60)

    # Collect all JSON files
    json_files = list(entity_dir.glob('*.json'))
    print(f"\nFound {len(json_files)} JSON files")

    # Group by LinkedIn slug, keeping only the latest timestamp
    slug_to_latest = {}  # slug -> (filepath, timestamp, data)
    errors = []

    print("\n📂 STEP 1: Loading and deduplicating files...")
    for i, filepath in enumerate(json_files):
        if i % 1000 == 0 and i > 0:
            print(f"  Processing {i}/{len(json_files)}...")

        slug, timestamp = extract_slug_and_timestamp(filepath.name)
        data = load_person_entity(filepath)
        if data is None:
            errors.append(str(filepath))
            continue

        # Keep only the latest timestamp per slug (compared as parsed
        # datetimes, so malformed timestamps sort as datetime.min)
        if slug not in slug_to_latest:
            slug_to_latest[slug] = (filepath, timestamp, data)
        else:
            existing_ts = slug_to_latest[slug][1]
            if parse_timestamp(timestamp) > parse_timestamp(existing_ts):
                slug_to_latest[slug] = (filepath, timestamp, data)

    print(f"  Loaded: {len(slug_to_latest)} unique persons")
    print(f"  Errors: {len(errors)}")

    # Filter to heritage-relevant persons only
    print("\n🏛️ STEP 2: Filtering to heritage-relevant persons...")
    heritage_relevant = {}
    non_heritage = 0
    unknown_heritage = 0

    for slug, (filepath, timestamp, data) in slug_to_latest.items():
        hr = data.get('heritage_relevance', {})
        if is_heritage_relevant(data):
            heritage_relevant[slug] = (filepath, timestamp, data)
        elif isinstance(hr, dict) and hr.get('is_heritage_relevant') is False:
            non_heritage += 1
        else:
            unknown_heritage += 1

    print(f"  Heritage relevant: {len(heritage_relevant)}")
    print(f"  Non-heritage: {non_heritage}")
    print(f"  Unknown: {unknown_heritage}")

    # Generate PPIDs
    print("\n🆔 STEP 3: Generating PPIDs...")
    ppid_entities = []
    ppid_collisions = defaultdict(list)  # ppid -> list of slugs

    for slug, (filepath, timestamp, data) in heritage_relevant.items():
        name = get_person_name(data)
        if not name:
            print(f"  WARNING: No name found for {slug}, skipping")
            continue

        # Generate the PPID (all living persons, unknown dates/locations)
        ppid = generate_ppid(name=name, is_living=True)

        # Track collisions
        ppid_collisions[ppid].append(slug)

        # Create the entity
        entity = create_ppid_entity(data, ppid, str(filepath))
        entity['linkedin_slug'] = slug
        ppid_entities.append(entity)

    print(f"  Generated {len(ppid_entities)} PPIDs")

    # Handle collisions
    collision_count = sum(1 for slugs in ppid_collisions.values() if len(slugs) > 1)
    print(f"  Collisions detected: {collision_count}")

    if collision_count > 0:
        print("\n⚠️ STEP 3b: Resolving collisions with LinkedIn slug suffix...")
        # Append the LinkedIn slug to disambiguate colliding PPIDs
        for entity in ppid_entities:
            base_ppid = entity['ppid']
            if len(ppid_collisions[base_ppid]) > 1:
                # Convert the slug to a safe suffix (replace special chars)
                slug = entity['linkedin_slug']
                safe_slug = re.sub(r'[^a-z0-9]', '_', slug.lower())
                entity['ppid'] = f"{base_ppid}-{safe_slug}"
                entity['ppid_collision_suffix'] = safe_slug

    # Save the entities
    print(f"\n💾 STEP 4: Saving {len(ppid_entities)} entities to {output_dir}...")
    saved = 0
    save_errors = 0

    for entity in ppid_entities:
        ppid = entity['ppid']
        # Create a safe filename (replace problematic characters)
        safe_filename = ppid.replace('/', '_').replace('\\', '_')
        output_path = output_dir / f"{safe_filename}.json"

        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(entity, f, indent=2, ensure_ascii=False)
            saved += 1
        except Exception as e:
            print(f"  ERROR saving {ppid}: {e}")
            save_errors += 1

    print(f"  Saved: {saved}")
    print(f"  Errors: {save_errors}")
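
    # Final reporting: print a run summary, then write a manifest that
    # captures the same statistics plus any PPID collisions for auditing.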

    # Summary
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"  Input files: {len(json_files)}")
    print(f"  Unique persons: {len(slug_to_latest)}")
    print(f"  Heritage relevant: {len(heritage_relevant)}")
    print(f"  PPIDs generated: {len(ppid_entities)}")
    print(f"  Collisions resolved: {collision_count}")
    print(f"  Files saved: {saved}")
    print(f"  Output directory: {output_dir}")

    # Save the manifest
    manifest = {
        "generation_timestamp": datetime.now(timezone.utc).isoformat(),
        "input_directory": str(entity_dir),
        "output_directory": str(output_dir),
        "statistics": {
            "input_files": len(json_files),
            "unique_persons": len(slug_to_latest),
            "heritage_relevant": len(heritage_relevant),
            "ppids_generated": len(ppid_entities),
            "collisions_resolved": collision_count,
            "files_saved": saved
        },
        "collisions": {
            ppid: slugs
            for ppid, slugs in ppid_collisions.items()
            if len(slugs) > 1
        }
    }

    manifest_path = output_dir / "_manifest.json"
    with open(manifest_path, 'w', encoding='utf-8') as f:
        json.dump(manifest, f, indent=2, ensure_ascii=False)
    print(f"\n  Manifest saved to: {manifest_path}")


if __name__ == '__main__':
    main()
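
# Usage note (a sketch; entity_dir and output_dir are hard-coded in main()
# and must be edited for a different checkout):
#
#   pip install unidecode   # optional, improves non-Latin transliteration
#   python3 generate_ppids.py
#
# Output: one JSON file per PPID under the output directory, plus
# _manifest.json with run statistics and any collision records.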