#!/usr/bin/env python3
"""
PPID Enrichment Script (Rule 45 Compliant)

Enriches PPID files with EXPLICIT inferred data:
1. inferred_birth_decade - From earliest career observations
2. inferred_birth_settlement - From earliest school/university location
3. inferred_current_settlement - From current work location

All inferred data includes full provenance chains per Rule 45:
- Each inference step is documented
- Source observations are linked
- Confidence levels are assigned
- Inferred values NEVER silently replace canonical fields

Reference:
- .opencode/rules/inferred-data-explicit-provenance-rule.md (Rule 45)
- .opencode/rules/ppid-birth-date-enrichment-rule.md (Rule 44)
"""

import json
import os
import re
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Tuple, List, Dict, Any

# GeoNames admin1_code to ISO 3166-2 mapping for Netherlands
# (GeoNames has no "08" for NL; "XX" is used downstream for unmapped codes)
NL_ADMIN1_TO_ISO = {
    "01": "DR",  # Drenthe
    "02": "FR",  # Friesland
    "03": "GE",  # Gelderland
    "04": "GR",  # Groningen
    "05": "LI",  # Limburg
    "06": "NB",  # Noord-Brabant
    "07": "NH",  # Noord-Holland
    "09": "UT",  # Utrecht
    "10": "ZE",  # Zeeland
    "11": "ZH",  # Zuid-Holland
    "15": "OV",  # Overijssel
    "16": "FL",  # Flevoland
}

# Common country-specific admin1 mappings
COUNTRY_ADMIN1_MAPPINGS = {
    "NL": NL_ADMIN1_TO_ISO,
}

# ISO country code -> English country name, in the exact spelling that
# geocode_location() recognizes when extracting a country from a query string.
COUNTRY_NAMES = {
    "NL": "Netherlands",
    "BE": "Belgium",
    "DE": "Germany",
}

# Known university/institution -> (city, ISO country code) location mappings.
# Keys are matched case-insensitively as substrings of the profile's
# institution name; longest keys are tried first (see infer_birth_settlement).
DUTCH_UNI_LOCATIONS = {
    "Universiteit Utrecht": ("Utrecht", "NL"),
    "Utrecht University": ("Utrecht", "NL"),
    "UU": ("Utrecht", "NL"),
    "Universiteit van Amsterdam": ("Amsterdam", "NL"),
    "University of Amsterdam": ("Amsterdam", "NL"),
    "UvA": ("Amsterdam", "NL"),
    "VU Amsterdam": ("Amsterdam", "NL"),
    "Vrije Universiteit": ("Amsterdam", "NL"),
    "Leiden University": ("Leiden", "NL"),
    "Universiteit Leiden": ("Leiden", "NL"),
    "TU Delft": ("Delft", "NL"),
    "Technische Universiteit Delft": ("Delft", "NL"),
    "TU Eindhoven": ("Eindhoven", "NL"),
    "Technische Universiteit Eindhoven": ("Eindhoven", "NL"),
    "Radboud": ("Nijmegen", "NL"),
    "Radboud Universiteit": ("Nijmegen", "NL"),
    "Rijksuniversiteit Groningen": ("Groningen", "NL"),
    "University of Groningen": ("Groningen", "NL"),
    "RUG": ("Groningen", "NL"),
    "Maastricht University": ("Maastricht", "NL"),
    "Universiteit Maastricht": ("Maastricht", "NL"),
    "Erasmus": ("Rotterdam", "NL"),
    "Erasmus Universiteit": ("Rotterdam", "NL"),
    "Erasmus University Rotterdam": ("Rotterdam", "NL"),
    "Tilburg University": ("Tilburg", "NL"),
    "Universiteit Tilburg": ("Tilburg", "NL"),
    "Wageningen": ("Wageningen", "NL"),
    "Wageningen University": ("Wageningen", "NL"),
    "Hogeschool": ("", "NL"),  # Generic, location from name
    # Additional Dutch institutions
    "Hogeschool van Arnhem en Nijmegen": ("Nijmegen", "NL"),
    "HAN": ("Nijmegen", "NL"),
    "Hogeschool Utrecht": ("Utrecht", "NL"),
    "HU": ("Utrecht", "NL"),
    "Hogeschool van Amsterdam": ("Amsterdam", "NL"),
    "HvA": ("Amsterdam", "NL"),
    "Hogeschool Rotterdam": ("Rotterdam", "NL"),
    "Hogeschool Inholland": ("Amsterdam", "NL"),
    "Fontys": ("Eindhoven", "NL"),
    "Fontys Hogescholen": ("Eindhoven", "NL"),
    "Saxion": ("Enschede", "NL"),
    "Saxion Hogeschool": ("Enschede", "NL"),
    "Stenden": ("Leeuwarden", "NL"),
    "NHL Stenden": ("Leeuwarden", "NL"),
    "Hanzehogeschool": ("Groningen", "NL"),
    "Hanze": ("Groningen", "NL"),
    "Christelijke Hogeschool Ede": ("Ede", "NL"),
    "CHE": ("Ede", "NL"),
    "Avans": ("Breda", "NL"),
    "Avans Hogeschool": ("Breda", "NL"),
    "Windesheim": ("Zwolle", "NL"),
    "Hogeschool Windesheim": ("Zwolle", "NL"),
    "Zuyd Hogeschool": ("Maastricht", "NL"),
    "Archiefschool": ("Amsterdam", "NL"),
    "Archiefschool Amsterdam": ("Amsterdam", "NL"),
    "Reinwardt Academie": ("Amsterdam", "NL"),
    "KABK": ("Den Haag", "NL"),
    "Koninklijke Academie van Beeldende Kunsten": ("Den Haag", "NL"),
    "Gerrit Rietveld Academie": ("Amsterdam", "NL"),
    "Design Academy Eindhoven": ("Eindhoven", "NL"),
    "Art & Design College Utrecht": ("Utrecht", "NL"),
    "ArtEZ": ("Arnhem", "NL"),
    "IOPS": ("Amsterdam", "NL"),
    "Interuniversity Graduate School of Psychometrics": ("Amsterdam", "NL"),
    "Sioo": ("Utrecht", "NL"),
    # Belgian institutions
    "KU Leuven": ("Leuven", "BE"),
    "University of Leuven": ("Leuven", "BE"),
    "Katholieke Universiteit Leuven": ("Leuven", "BE"),
    "Vrije Universiteit Brussel": ("Brussel", "BE"),
    "VUB": ("Brussel", "BE"),
    "Universiteit Gent": ("Gent", "BE"),
    "Ghent University": ("Gent", "BE"),
    "UGent": ("Gent", "BE"),
    "Universiteit Antwerpen": ("Antwerpen", "BE"),
    "University of Antwerp": ("Antwerpen", "BE"),
    # German institutions
    "Universität Bremen": ("Bremen", "DE"),
    "University of Bremen": ("Bremen", "DE"),
    "Westfälische Wilhelms-Universität Münster": ("Münster", "DE"),
    "WWU Münster": ("Münster", "DE"),
    "Humboldt-Universität": ("Berlin", "DE"),
    "Freie Universität Berlin": ("Berlin", "DE"),
    "FU Berlin": ("Berlin", "DE"),
    "Universität zu Köln": ("Köln", "DE"),
    "University of Cologne": ("Köln", "DE"),
    "Ruprecht-Karls-Universität Heidelberg": ("Heidelberg", "DE"),
    "Heidelberg University": ("Heidelberg", "DE"),
    "Ludwig-Maximilians-Universität München": ("München", "DE"),
    "LMU München": ("München", "DE"),
    "Technische Universität München": ("München", "DE"),
    "TU München": ("München", "DE"),
    # International
    "Politecnico di Milano": ("Milano", "IT"),
    "Oberlin College": ("Oberlin", "US"),
}


def get_settlement_code(city_name: str) -> str:
    """Generate 3-letter settlement code from city name.

    Single word -> first three letters; Dutch article prefix ("De", "Den",
    "Het", "'s") -> article initial + two letters of the next word;
    otherwise the initials of the first three words.
    """
    words = city_name.split()
    dutch_articles = {"de", "het", "den", "'s"}
    if len(words) == 1:
        return city_name[:3].upper()
    elif words[0].lower() in dutch_articles:
        return (words[0][0] + words[1][:2]).upper()
    else:
        return "".join(w[0] for w in words[:3]).upper()


def geocode_location(location_str: str, db_path: str) -> Optional[dict]:
    """
    Geocode a location string to CC-RR-PPP format using GeoNames.

    Args:
        location_str: Free-text location (e.g. "Utrecht, Netherlands").
        db_path: Path to the GeoNames SQLite database (table ``cities``).

    Returns:
        Dict with country/region/settlement codes, ``formatted`` CC-RR-PPP
        string and raw ``geonames_data``, or None when the country cannot
        be determined, no city matches, or the lookup fails.
    """
    if not location_str:
        return None

    location_str = location_str.strip()

    # Extract country from common patterns; only NL/BE/DE are supported.
    country_code = None
    if "(NL)" in location_str or "Netherlands" in location_str or "Nederland" in location_str:
        country_code = "NL"
    elif "(BE)" in location_str or "Belgium" in location_str or "België" in location_str:
        country_code = "BE"
    elif "(DE)" in location_str or "Germany" in location_str or "Deutschland" in location_str:
        country_code = "DE"

    # Clean location for city lookup: keep the part before the first comma,
    # strip "Area"/"Region"/"(CC)" suffixes.
    city_candidate = location_str.split(",")[0].strip()
    city_candidate = re.sub(r"\s*(Area|Region|\([A-Z]{2}\)).*", "", city_candidate).strip()

    if not city_candidate or not country_code:
        return None

    try:
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            # Prefix match on name/ascii_name, restricted to populated-place
            # feature codes; the most populous match wins.
            cursor.execute("""
                SELECT name, ascii_name, admin1_code, admin1_name, country_code,
                       latitude, longitude, geonames_id, population, feature_code
                FROM cities
                WHERE (name LIKE ? OR ascii_name LIKE ?)
                  AND country_code = ?
                  AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')
                ORDER BY population DESC
                LIMIT 1
            """, (f"{city_candidate}%", f"{city_candidate}%", country_code))
            row = cursor.fetchone()
        finally:
            # BUGFIX: close the connection even when the query raises,
            # otherwise the handle leaks on error.
            conn.close()

        if not row:
            return None

        name, ascii_name, admin1_code, admin1_name, cc, lat, lon, geonames_id, pop, feature_code = row

        # Map admin1_code to ISO 3166-2; fall back to the raw code or "XX".
        region_code = "XX"
        if cc in COUNTRY_ADMIN1_MAPPINGS and admin1_code:
            region_code = COUNTRY_ADMIN1_MAPPINGS[cc].get(admin1_code, "XX")
        elif admin1_code:
            region_code = admin1_code[:2].upper()

        settlement_code = get_settlement_code(ascii_name)

        return {
            "country_code": cc,
            "region_code": region_code,
            "settlement_code": settlement_code,
            "settlement_name": name,
            "formatted": f"{cc}-{region_code}-{settlement_code}",
            "geonames_data": {
                "geonames_id": geonames_id,
                "geonames_name": name,
                "admin1_code": admin1_code,
                "admin1_name": admin1_name,
                "feature_code": feature_code,
                "latitude": lat,
                "longitude": lon,
            },
            "original_query": location_str,
        }
    except Exception as e:
        # Best-effort geocoding: log and treat any failure as "not found".
        print(f"  GeoNames error: {e}")
        return None


def parse_date_range(date_range: str) -> Tuple[Optional[int], Optional[int]]:
    """Parse date range string to extract start and end years.

    Returns (start_year, end_year); end_year is None when only one year
    (or none) is present in the string.
    """
    if not date_range:
        return None, None
    years = re.findall(r'\b(19\d{2}|20\d{2})\b', date_range)
    if not years:
        return None, None
    start_year = int(years[0]) if years else None
    end_year = int(years[-1]) if len(years) > 1 else None
    return start_year, end_year


def build_inference_chain(steps: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Build a numbered inference chain (adds 1-based "step" to each dict)."""
    return [{"step": i + 1, **step} for i, step in enumerate(steps)]


def is_near_decade_boundary(year: int, threshold: int = 3) -> bool:
    """
    Check if a year is within `threshold` years of a decade boundary.

    Examples:
        1968, threshold=3 → True (within 3 of 1970)
        1972, threshold=3 → True (within 3 of 1970)
        1975, threshold=3 → False (5 years from both boundaries)
    """
    year_in_decade = year % 10
    return year_in_decade >= (10 - threshold) or year_in_decade <= threshold


def get_decade_notation(year: int) -> str:
    """Convert year to EDTF decade notation (e.g., 1968 → 196X)."""
    decade = (year // 10) * 10
    return f"{decade // 10}X"


def get_adjacent_decades(year: int) -> Tuple[str, str]:
    """
    Get two adjacent decades for a year near a boundary.

    Examples:
        1968 → ("196X", "197X")
        1972 → ("196X", "197X")
        2001 → ("199X", "200X")
    """
    decade = (year // 10) * 10
    year_in_decade = year % 10
    if year_in_decade >= 7:
        # Late in decade (7, 8, 9) → spans to next
        return (get_decade_notation(year), get_decade_notation(year + 10))
    else:
        # Early in decade (0, 1, 2, 3) → spans to previous
        return (get_decade_notation(year - 10), get_decade_notation(year))


def infer_birth_decade(profile_data: dict) -> Optional[dict]:
    """
    Infer birth decade from earliest career observations.

    Returns explicit inferred_birth_decade with full provenance chain.
    Supports list-valued results for decade boundary cases (Rule 45 extension):
    - If estimated birth year is within 3 years of decade boundary,
      returns both adjacent decades as EDTF set notation: [196X,197X]
    """
    earliest_year = None
    inference_steps = []
    age_offset = 18
    age_variance = 3  # ±3 years typical variance in entry age
    education_record = None
    experience_record = None

    # Vocational-level terms get a younger entry age (16) than academic ones.
    # BUGFIX: these terms must NOT also appear in the academic list below,
    # otherwise the vocational branch is unreachable.
    vocational_terms = ["hbo", "mbo", "vocational", "associate", "pabo", "meao", "heao"]
    academic_terms = [
        # English degrees
        "bachelor", "bsc", "ba", "master", "msc", "ma",
        "phd", "doctor", "postgraduate",
        # Dutch academic degrees (vocational hbo/mbo/pabo/meao/heao handled separately)
        "doctoraal", "drs", "drs.", "mr", "mr.", "ing", "ing.", "ir", "ir.",
        # German degrees
        "magister", "diplom", "staatsexamen", "referendariat",
        # Italian degrees
        "laurea",
        # Generic
        "degree", "graduate", "undergraduate", "post doc", "postdoc",
    ]

    # Check education first (most reliable)
    education = profile_data.get("education") or []
    for edu in education:
        if edu is None:
            continue
        # Handle multiple date field names: "date_range", "period", "years", "year"
        date_range = edu.get("date_range") or edu.get("period") or edu.get("years") or edu.get("year") or ""
        degree = (edu.get("degree") or "").lower()  # Handle None
        # Handle both "institution" and "school" field names
        institution = edu.get("institution") or edu.get("school") or ""

        start_year, _ = parse_date_range(date_range)
        if not start_year:
            continue

        degree_lower = degree
        if any(term in degree_lower for term in academic_terms):
            if earliest_year is None or start_year < earliest_year:
                earliest_year = start_year
                # Determine age offset based on degree level
                if any(term in degree_lower for term in ["master", "msc", "ma", "drs", "drs.", "mr", "mr.", "ir", "ir.", "laurea magistrale", "magister"]):
                    age_offset = 22  # Master's typically starts at 22
                elif any(term in degree_lower for term in ["phd", "doctor", "post doc", "postdoc", "postgraduate"]):
                    age_offset = 24  # PhD typically starts at 24
                else:
                    age_offset = 18  # Bachelor's/undergraduate
                education_record = {
                    "institution": institution,
                    "degree": edu.get("degree", ""),
                    "date_range": date_range,
                }
        elif any(term in degree_lower for term in vocational_terms):
            if earliest_year is None or start_year < earliest_year:
                earliest_year = start_year
                age_offset = 16
                education_record = {
                    "institution": institution,
                    "degree": edu.get("degree", ""),
                    "date_range": date_range,
                }
        # Also accept education without clear degree type (use conservative estimate)
        elif earliest_year is None:
            earliest_year = start_year
            age_offset = 18  # Assume typical university entry age
            education_record = {
                "institution": institution,
                "degree": edu.get("degree", "") or "(no degree specified)",
                "date_range": date_range,
            }

    # If no education, check earliest job
    if earliest_year is None:
        experience = profile_data.get("experience") or []
        for exp in experience:
            if exp is None:
                continue
            # Handle multiple date field names
            date_range = exp.get("date_range") or exp.get("period") or ""
            start_year, _ = parse_date_range(date_range)
            if start_year:
                if earliest_year is None or start_year < earliest_year:
                    earliest_year = start_year
                    age_offset = 23
                    age_variance = 5  # Higher variance for first job
                    experience_record = {
                        "company": exp.get("company", ""),
                        "title": exp.get("title", ""),
                        "date_range": date_range,
                    }

    if earliest_year is None:
        return None

    # Build inference chain
    if education_record:
        inference_steps.append({
            "observation": "Education record found",
            "source_field": "profile_data.education",
            "source_value": education_record,
        })
        inference_steps.append({
            "extraction": "Start year extracted from date_range",
            "extracted_value": earliest_year,
        })
        inference_steps.append({
            "assumption": f"Education entry age is approximately {age_offset} (±{age_variance} years)",
            "rationale": "Standard entry age for this education level in Netherlands/Europe",
            "confidence_impact": f"Assumption introduces uncertainty; actual age may vary ±{age_variance} years",
        })
    else:
        inference_steps.append({
            "observation": "First job record found (no education data)",
            "source_field": "profile_data.experience",
            "source_value": experience_record,
        })
        inference_steps.append({
            "extraction": "Start year extracted from date_range",
            "extracted_value": earliest_year,
        })
        inference_steps.append({
            "assumption": f"First job age is approximately {age_offset} (±{age_variance} years)",
            "rationale": "Assumes first job after typical university completion",
            "confidence_impact": f"Higher uncertainty; first job age varies ±{age_variance} years",
        })

    estimated_birth_year = earliest_year - age_offset
    min_birth_year = earliest_year - age_offset - age_variance
    max_birth_year = earliest_year - age_offset + age_variance

    inference_steps.append({
        "calculation": f"{earliest_year} - {age_offset} = {estimated_birth_year}",
        "result": f"Estimated birth year: {estimated_birth_year}",
        "range": f"{min_birth_year}-{max_birth_year} (accounting for ±{age_variance} year variance)",
    })

    # Check if birth year range spans a decade boundary
    min_decade = (min_birth_year // 10) * 10
    max_decade = (max_birth_year // 10) * 10
    spans_decade_boundary = min_decade != max_decade

    if spans_decade_boundary:
        # Get decades directly from min/max range (not estimated year)
        decade1 = get_decade_notation(min_birth_year)
        decade2 = get_decade_notation(max_birth_year)
        # Primary is the decade containing the estimated birth year
        estimated_decade = get_decade_notation(estimated_birth_year)
        if estimated_decade == decade1:
            primary_value = decade1
            primary_rationale = f"{estimated_birth_year} is in {decade1}, but range extends into {decade2}"
        else:
            primary_value = decade2
            primary_rationale = f"{estimated_birth_year} is in {decade2}, but range extends into {decade1}"

        inference_steps.append({
            "generalization": "Birth year range spans decade boundary",
            "input_range": [min_birth_year, max_birth_year],
            "output": [decade1, decade2],
            "edtf": f"[{decade1},{decade2}]",
            "rationale": "Cannot determine which decade with certainty; using EDTF 'one of' set notation",
        })

        return {
            "values": [decade1, decade2],
            "edtf": f"[{decade1},{decade2}]",
            "edtf_meaning": f"one of: {decade1[:-1]}0s or {decade2[:-1]}0s",
            "precision": "decade_set",
            "primary_value": primary_value,
            "primary_rationale": primary_rationale,
            "confidence": "very_low",  # Lower confidence due to boundary uncertainty
            "inference_provenance": {
                "method": "earliest_observation_heuristic",
                "inference_chain": build_inference_chain(inference_steps),
                "assumptions": [
                    f"Entry age for education/first job: {age_offset} years (±{age_variance})",
                    "Career records are complete in LinkedIn profile",
                ],
                "boundary_note": f"Birth year estimate {estimated_birth_year} spans decades {decade1}/{decade2}",
                "inferred_at": datetime.now(timezone.utc).isoformat(),
                "inferred_by": "enrich_ppids.py",
            }
        }
    else:
        # Single decade - standard case
        edtf_decade = get_decade_notation(estimated_birth_year)
        inference_steps.append({
            "generalization": "Convert to EDTF decade notation",
            "input": estimated_birth_year,
            "output": edtf_decade,
            "rationale": "Decade precision appropriate for heuristic-based estimate",
        })
        return {
            "value": edtf_decade,
            "edtf": edtf_decade,
            "precision": "decade",
            "confidence": "low",
            "inference_provenance": {
                "method": "earliest_observation_heuristic",
                "inference_chain": build_inference_chain(inference_steps),
                "assumptions": [
                    f"Entry age for education/first job: {age_offset} years (±{age_variance})",
                    "Career records are complete in LinkedIn profile",
                ],
                "inferred_at": datetime.now(timezone.utc).isoformat(),
                "inferred_by": "enrich_ppids.py",
            }
        }


def infer_birth_settlement(profile_data: dict, db_path: str) -> Optional[dict]:
    """
    Infer birth settlement from earliest school/university location.

    Returns explicit inferred_birth_settlement with full provenance chain.
    Falls back to the earliest job location (lower confidence) when no
    education institution can be mapped.
    """
    inference_steps = []

    # Check education first
    education = profile_data.get("education") or []
    edu_with_years = []
    for edu in education:
        if edu is None:
            continue
        # Handle multiple date field names: "date_range", "period", "years", "year"
        date_range = edu.get("date_range") or edu.get("period") or edu.get("years") or edu.get("year") or ""
        start_year, _ = parse_date_range(date_range)
        if start_year:
            edu_with_years.append((start_year, edu))
    edu_with_years.sort(key=lambda x: x[0])

    # Institution keys sorted longest-first so specific entries
    # ("Hogeschool Utrecht") win over generic substrings ("Hogeschool").
    # BUGFIX: plain dict order let the generic empty-city "Hogeschool"
    # entry shadow every specific Hogeschool mapping.
    institutions_by_specificity = sorted(
        DUTCH_UNI_LOCATIONS.items(), key=lambda kv: len(kv[0]), reverse=True
    )

    for start_year, edu in edu_with_years:
        # Handle both "institution" and "school" field names
        institution = edu.get("institution") or edu.get("school") or ""

        # Look up institution location
        location = None
        for uni_name, (city, country) in institutions_by_specificity:
            if uni_name.lower() in institution.lower():
                if city:
                    # BUGFIX: use the institution's mapped country; previously
                    # every match was formatted as "..., Netherlands", sending
                    # Belgian/German institutions to the wrong country.
                    country_name = COUNTRY_NAMES.get(country, country)
                    location = f"{city}, {country_name}"
                break

        if not location:
            continue

        # Get date_range for provenance (handle multiple field names)
        edu_date_range = edu.get("date_range") or edu.get("period") or edu.get("years") or edu.get("year") or ""

        inference_steps.append({
            "observation": "Earliest education institution identified",
            "source_field": f"profile_data.education",
            "source_value": {
                "institution": institution,
                "date_range": edu_date_range,
                "degree": edu.get("degree") or "",
            },
        })
        inference_steps.append({
            "lookup": "Institution location mapping",
            "mapping_source": "DUTCH_UNI_LOCATIONS dictionary",
            "mapping_key": institution,
            "mapping_result": location,
        })

        geo = geocode_location(location, db_path)
        if geo:
            inference_steps.append({
                "geocoding": "GeoNames resolution",
                "query": location,
                "result": geo["geonames_data"],
            })
            inference_steps.append({
                "formatting": "CC-RR-PPP generation",
                "components": {
                    "country_code": geo["country_code"],
                    "region_code": geo["region_code"],
                    "settlement_code": geo["settlement_code"],
                },
                "result": geo["formatted"],
            })
            return {
                "value": geo["settlement_name"],
                "formatted": geo["formatted"],
                "country_code": geo["country_code"],
                "region_code": geo["region_code"],
                "settlement_code": geo["settlement_code"],
                "confidence": "low",
                "inference_provenance": {
                    "method": "earliest_education_location",
                    "inference_chain": build_inference_chain(inference_steps),
                    "assumptions": [
                        "Student attended school near birth/family residence",
                        "Institution location is representative of early life location",
                    ],
                    "assumption_note": "University location used as proxy for birth settlement; student may have relocated for education",
                    "geonames_data": geo["geonames_data"],
                    "inferred_at": datetime.now(timezone.utc).isoformat(),
                    "inferred_by": "enrich_ppids.py",
                }
            }

    # Fallback: earliest job location
    experience = profile_data.get("experience") or []
    exp_with_years = []
    for exp in experience:
        if exp is None:
            continue
        # Handle multiple date field names
        date_range = exp.get("date_range") or exp.get("period") or ""
        start_year, _ = parse_date_range(date_range)
        if start_year and exp.get("location"):
            exp_with_years.append((start_year, exp))
    exp_with_years.sort(key=lambda x: x[0])

    for start_year, exp in exp_with_years:
        location = exp.get("location", "")
        if not location:
            continue

        # Get date_range for provenance (handle multiple field names)
        exp_date_range = exp.get("date_range") or exp.get("period") or ""

        inference_steps.append({
            "observation": "Earliest job with location found (no education location available)",
            "source_field": "profile_data.experience",
            "source_value": {
                "company": exp.get("company", ""),
                "title": exp.get("title", ""),
                "date_range": exp_date_range,
                "location": location,
            },
        })

        geo = geocode_location(location, db_path)
        if geo:
            inference_steps.append({
                "geocoding": "GeoNames resolution",
                "query": location,
                "result": geo["geonames_data"],
            })
            inference_steps.append({
                "formatting": "CC-RR-PPP generation",
                "result": geo["formatted"],
            })
            return {
                "value": geo["settlement_name"],
                "formatted": geo["formatted"],
                "country_code": geo["country_code"],
                "region_code": geo["region_code"],
                "settlement_code": geo["settlement_code"],
                "confidence": "very_low",
                "inference_provenance": {
                    "method": "earliest_job_location",
                    "inference_chain": build_inference_chain(inference_steps),
                    "assumptions": [
                        "First job location represents early life region",
                    ],
                    "assumption_note": "Job location is weak proxy for birth location; person likely relocated for work",
                    "geonames_data": geo["geonames_data"],
                    "inferred_at": datetime.now(timezone.utc).isoformat(),
                    "inferred_by": "enrich_ppids.py",
                }
            }

    return None


def infer_current_settlement(profile_data: dict, db_path: str) -> Optional[dict]:
    """
    Infer current settlement from profile location or current job.

    Returns explicit inferred_current_settlement with full provenance chain.
    """
    inference_steps = []

    # Try profile location first (most reliable)
    profile_location = profile_data.get("location")
    if profile_location:
        inference_steps.append({
            "observation": "Profile location field found",
            "source_field": "profile_data.location",
            "source_value": profile_location,
        })
        geo = geocode_location(profile_location, db_path)
        if geo:
            inference_steps.append({
                "geocoding": "GeoNames resolution",
                "query": profile_location,
                "result": geo["geonames_data"],
            })
            inference_steps.append({
                "formatting": "CC-RR-PPP generation",
                "result": geo["formatted"],
            })
            return {
                "value": geo["settlement_name"],
                "formatted": geo["formatted"],
                "country_code": geo["country_code"],
                "region_code": geo["region_code"],
                "settlement_code": geo["settlement_code"],
                "confidence": "medium",
                "inference_provenance": {
                    "method": "profile_location",
                    "inference_chain": build_inference_chain(inference_steps),
                    "assumptions": [
                        "Profile location is up-to-date",
                        "Profile location represents current residence",
                    ],
                    "geonames_data": geo["geonames_data"],
                    "inferred_at": datetime.now(timezone.utc).isoformat(),
                    "inferred_by": "enrich_ppids.py",
                }
            }

    # Try current job location
    experience = profile_data.get("experience") or []
    for exp in experience:
        if exp is None:
            continue
        # Handle multiple date field names
        date_range = exp.get("date_range") or exp.get("period") or ""
        # Also check "current" field which some profiles have
        is_current = "Present" in date_range or exp.get("current") is True
        if not is_current:
            continue
        location = exp.get("location")
        if not location:
            continue

        inference_steps.append({
            "observation": "Current job with location found",
            "source_field": "profile_data.experience",
            "source_value": {
                "company": exp.get("company", ""),
                "title": exp.get("title", ""),
                "location": location,
            },
        })
        geo = geocode_location(location, db_path)
        if geo:
            inference_steps.append({
                "geocoding": "GeoNames resolution",
                "query": location,
                "result": geo["geonames_data"],
            })
            inference_steps.append({
                "formatting": "CC-RR-PPP generation",
                "result": geo["formatted"],
            })
            return {
                "value": geo["settlement_name"],
                "formatted": geo["formatted"],
                "country_code": geo["country_code"],
                "region_code": geo["region_code"],
                "settlement_code": geo["settlement_code"],
                "confidence": "medium",
                "inference_provenance": {
                    "method": "current_job_location",
                    "inference_chain": build_inference_chain(inference_steps),
                    "assumptions": [
                        "Current job location represents residence area",
                        "Person works near where they live",
                    ],
                    "geonames_data": geo["geonames_data"],
                    "inferred_at": datetime.now(timezone.utc).isoformat(),
                    "inferred_by": "enrich_ppids.py",
                }
            }

    return None


def regenerate_ppid(components: dict) -> str:
    """Regenerate PPID string from components.

    Format: TYPE_FIRSTLOC_FIRSTDATE_LASTLOC_LASTDATE_name-tokens
    """
    return (
        f"{components['type']}_"
        f"{components['first_location']}_{components['first_date']}_"
        f"{components['last_location']}_{components['last_date']}_"
        f"{'-'.join(components['name_tokens'])}"
    )


def enrich_ppid_file(filepath: Path, db_path: str, dry_run: bool = False, force: bool = False) -> dict:
    """
    Enrich a single PPID file with explicit inferred data (Rule 45 compliant).

    Args:
        filepath: Path to PPID JSON file
        db_path: Path to GeoNames SQLite database
        dry_run: Don't write changes
        force: Re-enrich already-enriched files (clears existing inferred_* fields)

    Returns:
        Per-file stats dict (which inferences fired, whether the PPID changed).
    """
    stats = {
        "birth_decade_inferred": False,
        "birth_decade_is_list": False,  # Track decade boundary cases
        "birth_settlement_inferred": False,
        "current_settlement_inferred": False,
        "ppid_changed": False,
    }

    with open(filepath, "r", encoding="utf-8") as f:
        data = json.load(f)

    profile_data = data.get("profile_data", {})
    if not profile_data:
        return stats

    # If force mode, clear existing inferred fields to re-enrich
    if force:
        for field in ["inferred_birth_decade", "inferred_birth_settlement", "inferred_current_settlement"]:
            if field in data:
                del data[field]
        # Reset components that were set from inferred data.
        # BUGFIX: the presence of a *_source key marks an inferred component;
        # the old check (`"_source" in str(value)`) tested the substring
        # against values like "inferred_birth_decade" and never matched,
        # so force mode never actually reset anything.
        existing_components = data.get("ppid_components", {})
        if existing_components.get("first_date_source"):
            existing_components["first_date"] = "XXXX"
            existing_components.pop("first_date_source", None)
            existing_components.pop("first_date_alternatives", None)
        if existing_components.get("first_location_source"):
            existing_components["first_location"] = "XX-XX-XXX"
            existing_components.pop("first_location_source", None)
        if existing_components.get("last_location_source"):
            existing_components["last_location"] = "XX-XX-XXX"
            existing_components.pop("last_location_source", None)

    original_ppid = data.get("ppid", "")
    components = data.get("ppid_components", {}).copy()
    changed = False

    # ===== INFER BIRTH DECADE =====
    # Only if we don't already have an inferred value AND birth_date is unknown
    if (data.get("birth_date", {}).get("edtf") == "XXXX" and
            "inferred_birth_decade" not in data):
        birth_info = infer_birth_decade(profile_data)
        if birth_info:
            # Store as EXPLICIT inferred field (Rule 45)
            data["inferred_birth_decade"] = birth_info
            # Handle list-valued (decade boundary) vs single value
            if "values" in birth_info:
                # List-valued: use primary_value for PPID
                components["first_date"] = birth_info["primary_value"]
                components["first_date_source"] = "inferred_birth_decade.primary_value"
                components["first_date_alternatives"] = [v for v in birth_info["values"] if v != birth_info["primary_value"]]
                stats["birth_decade_is_list"] = True
            else:
                # Single value
                components["first_date"] = birth_info["edtf"]
                components["first_date_source"] = "inferred_birth_decade"
            # Add note to canonical field pointing to inferred alternative
            data["birth_date"]["note"] = "See inferred_birth_decade for heuristic estimate"
            stats["birth_decade_inferred"] = True
            changed = True

    # ===== INFER BIRTH SETTLEMENT =====
    if (components.get("first_location") == "XX-XX-XXX" and
            "inferred_birth_settlement" not in data):
        birth_loc = infer_birth_settlement(profile_data, db_path)
        if birth_loc:
            data["inferred_birth_settlement"] = birth_loc
            components["first_location"] = birth_loc["formatted"]
            components["first_location_source"] = "inferred_birth_settlement"
            stats["birth_settlement_inferred"] = True
            changed = True

    # ===== INFER CURRENT SETTLEMENT =====
    if (components.get("last_location") == "XX-XX-XXX" and
            "inferred_current_settlement" not in data):
        current_loc = infer_current_settlement(profile_data, db_path)
        if current_loc:
            data["inferred_current_settlement"] = current_loc
            components["last_location"] = current_loc["formatted"]
            components["last_location_source"] = "inferred_current_settlement"
            stats["current_settlement_inferred"] = True
            changed = True

    # ===== REGENERATE PPID IF COMPONENTS CHANGED =====
    new_ppid = original_ppid
    if changed:
        new_ppid = regenerate_ppid(components)
        if new_ppid != original_ppid:
            data["ppid"] = new_ppid
            data["ppid_components"] = components
            stats["ppid_changed"] = True
            # Track PPID history
            if "ppid_history" not in data:
                data["ppid_history"] = []
            data["ppid_history"].append({
                "previous_ppid": original_ppid,
                "new_ppid": new_ppid,
                "changed_at": datetime.now(timezone.utc).isoformat(),
                "reason": "observation_based_inference",
                "inferred_fields": [
                    k for k in ["inferred_birth_decade", "inferred_birth_settlement", "inferred_current_settlement"]
                    if k in data
                ],
            })
        else:
            data["ppid_components"] = components

    # Update provenance (BUGFIX: create the dict if missing instead of KeyError)
    provenance = data.setdefault("provenance", {})
    provenance["modified_at"] = datetime.now(timezone.utc).isoformat()
    provenance["modified_by"] = "enrich_ppids.py"

    if not dry_run:
        # Write back to file
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        # Rename file if PPID changed (never clobber an existing file)
        if stats["ppid_changed"]:
            new_filename = f"{new_ppid}.json"
            new_filepath = filepath.parent / new_filename
            if new_filepath != filepath and not new_filepath.exists():
                filepath.rename(new_filepath)

    return stats


def main():
    """CLI entry point: enrich all ID_*.json PPID files and print a summary."""
    import argparse
    parser = argparse.ArgumentParser(description="Enrich PPID files with explicit inferred data (Rule 45)")
    parser.add_argument("--dry-run", action="store_true", help="Don't write changes")
    parser.add_argument("--limit", type=int, help="Process only N files")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--force", "-f", action="store_true", help="Re-enrich already-enriched files")
    args = parser.parse_args()

    # Paths
    person_dir = Path("/Users/kempersc/apps/glam/data/person")
    db_path = "/Users/kempersc/apps/glam/data/reference/geonames.db"

    # Get all PPID files
    ppid_files = list(person_dir.glob("ID_*.json"))
    if args.limit:
        ppid_files = ppid_files[:args.limit]

    print(f"Processing {len(ppid_files)} PPID files (Rule 45 compliant)...")
    if args.dry_run:
        print("DRY RUN - no changes will be written")
    if args.force:
        print("FORCE MODE - re-enriching all files")

    # Statistics
    total_stats = {
        "processed": 0,
        "birth_decade_inferred": 0,
        "birth_decade_list_valued": 0,  # Decade boundary cases
        "birth_settlement_inferred": 0,
        "current_settlement_inferred": 0,
        "ppid_changed": 0,
        "errors": 0,
    }

    for i, filepath in enumerate(ppid_files):
        try:
            stats = enrich_ppid_file(filepath, db_path, dry_run=args.dry_run, force=args.force)
            total_stats["processed"] += 1
            if stats["birth_decade_inferred"]:
                total_stats["birth_decade_inferred"] += 1
            if stats.get("birth_decade_is_list"):
                total_stats["birth_decade_list_valued"] += 1
            if stats["birth_settlement_inferred"]:
                total_stats["birth_settlement_inferred"] += 1
            if stats["current_settlement_inferred"]:
                total_stats["current_settlement_inferred"] += 1
            if stats["ppid_changed"]:
                total_stats["ppid_changed"] += 1

            if args.verbose and any(stats.values()):
                print(f"  {filepath.name}: {stats}")
            if (i + 1) % 500 == 0:
                print(f"  Processed {i + 1}/{len(ppid_files)}...")
        except Exception as e:
            # Per-file isolation: one bad file must not abort the batch.
            total_stats["errors"] += 1
            if args.verbose:
                print(f"  ERROR {filepath.name}: {e}")

    # Print summary
    print("\n" + "=" * 60)
    print("ENRICHMENT SUMMARY (Rule 45 Compliant)")
    print("=" * 60)
    print(f"Processed: {total_stats['processed']}")
    print(f"Birth decades inferred: {total_stats['birth_decade_inferred']}")
    print(f"  - List-valued (boundary): {total_stats['birth_decade_list_valued']}")
    print(f"Birth settlements inferred: {total_stats['birth_settlement_inferred']}")
    print(f"Current settlements inferred: {total_stats['current_settlement_inferred']}")
    print(f"PPIDs updated: {total_stats['ppid_changed']}")
    print(f"Errors: {total_stats['errors']}")

    # Coverage percentages
    if total_stats["processed"] > 0:
        print("\nCoverage:")
        print(f"  Birth decade: {total_stats['birth_decade_inferred'] / total_stats['processed'] * 100:.1f}%")
        if total_stats["birth_decade_inferred"] > 0:
            print(f"    - Boundary cases: {total_stats['birth_decade_list_valued'] / total_stats['birth_decade_inferred'] * 100:.1f}%")
        print(f"  Birth settlement: {total_stats['birth_settlement_inferred'] / total_stats['processed'] * 100:.1f}%")
        print(f"  Current settlement: {total_stats['current_settlement_inferred'] / total_stats['processed'] * 100:.1f}%")

    print("\nNote: All inferred data stored in explicit inferred_* fields with provenance chains.")
    print("Note: Decade boundary cases use EDTF set notation [196X,197X] with primary_value for PPID.")


if __name__ == "__main__":
    main()