# glam/scripts/enrich_nde_entries_ghcid.py
# Snapshot metadata (file-browser export): 2025-12-03 17:38:46 +01:00, 1219 lines, 46 KiB, Python

#!/usr/bin/env python3
"""
Enrich NDE Heritage Institution Entries with GHCID Persistent Identifiers.
This script:
1. Loads all YAML files from data/nde/enriched/entries/
2. Extracts location data (city, region, coordinates)
3. Generates base GHCIDs using NL-REGION-CITY-TYPE-ABBREV format
4. Detects collisions and applies First Batch rule (all get name suffixes)
5. Generates all 4 identifier formats:
- Human-readable GHCID string
- UUID v5 (SHA-1, RFC 4122 compliant) - PRIMARY
- UUID v8 (SHA-256, SOTA cryptographic strength) - Future-proof
- Numeric (64-bit integer for database PKs)
6. Adds GHCID fields to each entry
7. Generates collision statistics report
## GHCID Format
Base: NL-{Region}-{City}-{Type}-{Abbreviation}
With collision suffix: NL-{Region}-{City}-{Type}-{Abbreviation}-{name_suffix}
## Collision Resolution (First Batch Rule)
Since this is a batch import (all entries processed together), when multiple
institutions generate the same base GHCID:
- ALL colliding institutions receive native language name suffixes
- Name suffix: snake_case of institution name
Example:
- Two societies with NL-OV-ZWO-S-HK both become:
- NL-OV-ZWO-S-HK-historische_kring_zwolle
- NL-OV-ZWO-S-HK-heemkundige_kring_zwolle
Usage:
python scripts/enrich_nde_entries_ghcid.py [--dry-run]
Options:
--dry-run Preview changes without writing to files
"""
import argparse
import json
import re
import sys
import unicodedata
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import yaml
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from glam_extractor.identifiers.ghcid import (
GHCIDComponents,
GHCIDGenerator,
InstitutionType,
extract_abbreviation_from_name,
normalize_city_name,
)
from glam_extractor.geocoding.geonames_lookup import GeoNamesDB
# Dutch province to ISO 3166-2 code mapping.
# Keys are lowercase, diacritics-stripped names (both Dutch and English
# spellings, with and without hyphens) as produced by get_region_code().
DUTCH_PROVINCE_CODES: Dict[str, str] = {
    # Standard names
    "drenthe": "DR",
    "flevoland": "FL",
    "friesland": "FR",
    "fryslan": "FR",
    "fryslân": "FR",
    "gelderland": "GE",
    "groningen": "GR",
    "limburg": "LI",
    "noord-brabant": "NB",
    "north brabant": "NB",
    "noord brabant": "NB",
    "noord-holland": "NH",
    "north holland": "NH",
    "noord holland": "NH",
    "overijssel": "OV",
    "utrecht": "UT",
    "zeeland": "ZE",
    "zuid-holland": "ZH",
    "south holland": "ZH",
    "zuid holland": "ZH",
}
# GeoNames admin1 code to ISO 3166-2 NL mapping
# Based on actual GeoNames database content (verified 2025-12-01)
GEONAMES_ADMIN1_TO_ISO_NL: Dict[str, str] = {
    "01": "DR",  # Drenthe
    "02": "FR",  # Friesland (NOT Flevoland!)
    "03": "GE",  # Gelderland
    "04": "GR",  # Groningen
    "05": "LI",  # Limburg
    "06": "NB",  # Noord-Brabant (North Brabant)
    "07": "NH",  # Noord-Holland (North Holland)
    "09": "UT",  # Utrecht
    "10": "ZE",  # Zeeland
    "11": "ZH",  # Zuid-Holland (South Holland)
    "15": "OV",  # Overijssel
    "16": "FL",  # Flevoland
}
# GeoNames admin1 code to ISO 3166-2 BE mapping
# Belgium uses region codes as admin1
GEONAMES_ADMIN1_TO_ISO_BE: Dict[str, str] = {
    "BRU": "BRU",  # Brussels Capital Region
    "VLG": "VLG",  # Flanders (Vlaanderen)
    "WAL": "WAL",  # Wallonia (Wallonie)
}
# GeoNames admin1 code to ISO 3166-2 DE mapping (placeholder; lookups
# for DE currently fall back to the raw admin1 code or "00")
GEONAMES_ADMIN1_TO_ISO_DE: Dict[str, str] = {
    # German federal states would go here
}
# Combined mapping by country (keyed by ISO 3166-1 alpha-2 code)
GEONAMES_ADMIN1_TO_ISO: Dict[str, Dict[str, str]] = {
    "NL": GEONAMES_ADMIN1_TO_ISO_NL,
    "BE": GEONAMES_ADMIN1_TO_ISO_BE,
    "DE": GEONAMES_ADMIN1_TO_ISO_DE,
}
# Global GeoNames database instance (initialized lazily)
_geonames_db: Optional[GeoNamesDB] = None


def get_geonames_db() -> GeoNamesDB:
    """Return the shared GeoNamesDB instance, creating it on first use."""
    global _geonames_db
    if _geonames_db is None:
        root = Path(__file__).parent.parent
        _geonames_db = GeoNamesDB(
            root / "data" / "reference" / "geonames.db",
            enable_disambiguation=True,
        )
    return _geonames_db
def reverse_geocode_to_city(latitude: float, longitude: float, country_code: str = "NL") -> Optional[dict]:
    """
    Reverse geocode coordinates to find the nearest city/town/village.

    Uses the GeoNames database and a squared-degree (Euclidean) distance
    approximation, which is adequate at country scale. EXCLUDES
    neighborhoods/districts (PPLX) - only returns proper settlements.

    Args:
        latitude: Latitude coordinate
        longitude: Longitude coordinate
        country_code: ISO 3166-1 alpha-2 country code (default: NL)

    Returns:
        Dict with 'city', 'region', 'region_code', 'city_code',
        'admin1_code', 'geonames_id', 'population', 'feature_code',
        'distance_km', or None if no settlement was found.
    """
    db = get_geonames_db()
    # Feature codes for proper settlements (cities, towns, villages):
    #   PPL   = populated place (city/town/village)
    #   PPLA  = seat of first-order admin division (provincial capital)
    #   PPLA2..PPLA4 = seats of lower-order admin divisions
    #   PPLC  = capital of a political entity (national capital)
    #   PPLS  = populated places (multiple)
    #   PPLG  = seat of government (when different from capital)
    # EXCLUDED:
    #   PPLX  = section of populated place (neighborhood, district, quarter)
    #           e.g., "Binnenstad" (city center), "Amsterdam Binnenstad"
    VALID_FEATURE_CODES = ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')
    # Nearest settlement by Euclidean distance approximation
    # (good enough for country-scale distances).
    query = """
        SELECT
            name,
            ascii_name,
            admin1_code,
            admin1_name,
            latitude,
            longitude,
            geonames_id,
            population,
            feature_code,
            ((latitude - ?) * (latitude - ?) + (longitude - ?) * (longitude - ?)) as distance_sq
        FROM cities
        WHERE country_code = ?
        AND feature_code IN (?, ?, ?, ?, ?, ?, ?, ?)
        ORDER BY distance_sq
        LIMIT 1
    """
    import sqlite3
    conn = sqlite3.connect(str(db.db_path))
    try:
        cursor = conn.cursor()
        cursor.execute(query, (latitude, latitude, longitude, longitude, country_code, *VALID_FEATURE_CODES))
        row = cursor.fetchone()
        if row:
            (name, ascii_name, admin1_code, admin1_name, lat, lon,
             geonameid, population, feature_code, distance_sq) = row
            # Get city code using disambiguation, with a local fallback
            city_code = db.get_city_abbreviation(name, country_code, use_disambiguation=True)
            if not city_code:
                city_code = get_city_code(name)
            # Map admin1 code to ISO 3166-2 (country-specific mapping)
            country_admin1_map = GEONAMES_ADMIN1_TO_ISO.get(country_code, {})
            region_code = country_admin1_map.get(admin1_code, admin1_code if admin1_code else "00")
            return {
                'city': name,
                'ascii_name': ascii_name,
                'region': admin1_name,
                'region_code': region_code,
                'city_code': city_code,
                'admin1_code': admin1_code,
                'geonames_id': geonameid,
                # BUGFIX: include population - callers use it for the
                # micro-hamlet threshold (it was fetched but never returned,
                # so the threshold check always saw 0).
                'population': population,
                'feature_code': feature_code,
                'distance_km': (distance_sq ** 0.5) * 111,  # Approximate km (1 degree ≈ 111km)
            }
    finally:
        conn.close()
    return None
# Institution type code mapping (from original entry 'type' field).
# NOTE: this is deliberately an identity map - it acts as an allow-list of
# known single-letter type codes. extract_entry_data() looks codes up with
# TYPE_CODE_MAP.get(code, 'U'), so any code not listed here falls back to
# 'U' (Unknown).
TYPE_CODE_MAP: Dict[str, str] = {
    "G": "G",  # Gallery
    "L": "L",  # Library
    "A": "A",  # Archive
    "M": "M",  # Museum
    "O": "O",  # Official Institution
    "R": "R",  # Research Center
    "C": "C",  # Corporation
    "U": "U",  # Unknown
    "B": "B",  # Botanical/Zoo
    "E": "E",  # Education Provider
    "S": "S",  # Collecting Society
    "P": "P",  # Personal Collection
    "F": "F",  # Features (monuments, etc.)
    "I": "I",  # Intangible Heritage Group
    "X": "X",  # Mixed
    "H": "H",  # Holy Sites
    "D": "D",  # Digital Platform
    "N": "N",  # NGO
    "T": "T",  # Taste/Smell Heritage
}
def get_region_code(region_name: Optional[str]) -> str:
    """
    Map a Dutch province name (Dutch or English spelling) to its
    ISO 3166-2 region code.

    Args:
        region_name: Province/region name, possibly with diacritics

    Returns:
        2-letter region code, or "00" when the name is missing or unknown
    """
    if not region_name:
        return "00"
    # Lowercase, then strip diacritics (NFD decomposition + drop combining marks).
    decomposed = unicodedata.normalize('NFD', region_name.lower())
    key = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn').strip()
    return DUTCH_PROVINCE_CODES.get(key, "00")
def get_city_code(city_name: str) -> str:
    """
    Derive a 3-letter uppercase city code from a city name.

    Rules:
      1. Single word: first 3 letters.
      2. Leading Dutch article/preposition (de, het, den, 's, op, aan,
         bij, ter): its first letter + first 2 letters of the next word.
      3. Other multi-word names: first letter of each of the first 3 words.
    The result is forced to exactly 3 characters (padded with 'X') and any
    non-A-Z character is replaced by 'X'.

    Args:
        city_name: City name (may be empty)

    Returns:
        3-letter uppercase city code ("XXX" when no usable name)
    """
    if not city_name:
        return "XXX"
    # Normalize: remove accents, handle special chars
    words = normalize_city_name(city_name).split()
    if not words:
        return "XXX"
    # Dutch articles and prepositions that trigger rule 2
    particles = {'de', 'het', 'den', "'s", 'op', 'aan', 'bij', 'ter'}
    if len(words) == 1:
        code = words[0][:3].upper()
    elif words[0].lower() in particles:
        code = (words[0][0] + words[1][:2]).upper()
    else:
        code = ''.join(word[0] for word in words[:3]).upper()
    # Force exactly 3 characters, then sanitize to A-Z only.
    code = code[:3].ljust(3, 'X')
    return re.sub(r'[^A-Z]', 'X', code)
def generate_name_suffix(institution_name: str) -> str:
    """
    Build a snake_case suffix from an institution name.

    Used for collision resolution: the native-language name is lowercased,
    diacritics and punctuation are stripped, separators become underscores,
    and the result is capped at 50 characters.

    Args:
        institution_name: Full institution name

    Returns:
        snake_case suffix (e.g., "historische_kring_zwolle"), or "unknown"
        when nothing usable remains
    """
    if not institution_name:
        return "unknown"
    # NFD decomposition, then drop combining marks to remove accents.
    decomposed = unicodedata.normalize('NFD', institution_name)
    text = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn').lower()
    # Strip punctuation, then turn separator runs into underscores.
    text = re.sub(r"[''`\",.:;!?()[\]{}]", '', text)
    text = re.sub(r'[\s\-/]+', '_', text)
    # Keep only [a-z0-9_], collapse repeats, trim edge underscores.
    text = re.sub(r'[^a-z0-9_]', '', text)
    text = re.sub(r'_+', '_', text).strip('_')
    # Cap the suffix length at 50 characters.
    if len(text) > 50:
        text = text[:50].rstrip('_')
    return text or "unknown"
def extract_entry_data(entry: dict) -> dict:
    """
    Extract relevant data from an entry for GHCID generation.

    Settlement Resolution Priority (Updated Dec 2025):
    1. Google Maps locality (address_components with 'locality' type) - AUTHORITATIVE
       - Avoids the micro-hamlet problem where GeoNames finds tiny settlements
       - Cross-referenced with GeoNames for geonames_id and city_code
    2. GeoNames reverse geocoding (if no Google Maps locality)
       - Uses MIN_POPULATION=100 threshold to skip micro-hamlets
    3. GeoNames name lookup (if only text city available) - FALLBACK
    4. Text-based city name (if GeoNames lookup fails) - LAST RESORT

    The micro-hamlet problem: GeoNames may return tiny settlements like "Duur"
    (pop 0) when the institution is clearly in "Olst" (pop 4,780) just because
    the coordinates are slightly closer to the hamlet. Using the Google Maps
    locality solves this.

    Args:
        entry: Entry dictionary loaded from YAML

    Returns:
        Dict with: name, type_code, city, region, country_code, wikidata_id,
        geonames_id, location_resolution
    """
    # === STEP 0: DETERMINE COUNTRY CODE FIRST ===
    # This is critical for correct GeoNames reverse geocoding!
    country_code = "NL"  # Default to Netherlands
    # Check zcbs_enrichment.country (most explicit source)
    if 'zcbs_enrichment' in entry and entry['zcbs_enrichment'].get('country'):
        country_code = entry['zcbs_enrichment']['country']
    # Check location.country
    elif 'location' in entry and entry['location'].get('country'):
        country_code = entry['location']['country']
    # Check locations[].country
    elif 'locations' in entry and entry['locations']:
        loc = entry['locations'][0]
        if loc.get('country'):
            country_code = loc['country']
    # Check original_entry for country indicators
    elif 'original_entry' in entry:
        # Check for explicit country field
        if entry['original_entry'].get('country'):
            country_code = entry['original_entry']['country']
        # Check for country hints in the organisation name
        elif entry['original_entry'].get('organisatie'):
            org_name = entry['original_entry']['organisatie'].lower()
            if 'belgium' in org_name or 'belgië' in org_name or 'belgique' in org_name:
                country_code = "BE"
            elif 'germany' in org_name or 'deutschland' in org_name:
                country_code = "DE"
    # Check google_maps_enrichment.address for country
    if country_code == "NL" and 'google_maps_enrichment' in entry:
        address = entry['google_maps_enrichment'].get('address', '')
        if address:
            if ', Belgium' in address or ', België' in address:
                country_code = "BE"
            elif ', Germany' in address or ', Deutschland' in address:
                country_code = "DE"
    # Check wikidata_enrichment for country/location hints
    if country_code == "NL" and 'wikidata_enrichment' in entry:
        wiki = entry['wikidata_enrichment']
        # Check located_in label for country hints
        located_in = wiki.get('located_in', {})
        if isinstance(located_in, dict):
            label = located_in.get('label', '').lower()
            if 'belgium' in label or 'belgië' in label:
                country_code = "BE"
            elif 'germany' in label or 'deutschland' in label:
                country_code = "DE"
    # Get institution name
    # Priority: custodian_name (verified) > original_entry > wikidata
    name = None
    # Try custodian_name first (XPath-verified from website or authoritative fallback)
    if 'custodian_name' in entry and entry['custodian_name'].get('claim_value'):
        name = entry['custodian_name']['claim_value']
    # Fallback to original_entry.organisatie
    if not name and 'original_entry' in entry:
        name = entry['original_entry'].get('organisatie')
    # Fallback to wikidata labels (Dutch first, then English)
    if not name and 'wikidata_enrichment' in entry:
        name = entry['wikidata_enrichment'].get('wikidata_label_nl')
        if not name:
            name = entry['wikidata_enrichment'].get('wikidata_label_en')
    if not name:
        name = "Unknown Institution"
    # Get institution type
    type_codes = []
    # Check organization.institution_type first (enriched data)
    if 'organization' in entry and 'institution_type' in entry['organization']:
        org_type = entry['organization']['institution_type']
        if isinstance(org_type, list):
            type_codes = org_type
        elif isinstance(org_type, str):
            type_codes = [org_type]
    # Fallback to original_entry.type
    if not type_codes and 'original_entry' in entry and 'type' in entry['original_entry']:
        types = entry['original_entry']['type']
        if isinstance(types, list):
            type_codes = types
        elif isinstance(types, str):
            type_codes = [types]
    # Use first type, default to U (Unknown)
    type_code = type_codes[0] if type_codes else 'U'
    # === STEP 1: EXTRACT COORDINATES FROM ALL SOURCES ===
    latitude = None
    longitude = None
    coord_source = None
    # Try google_maps_enrichment first (most accurate coordinates)
    if 'google_maps_enrichment' in entry:
        gm = entry['google_maps_enrichment']
        # Check nested 'coordinates' object first (new format)
        if isinstance(gm.get('coordinates'), dict):
            coords = gm['coordinates']
            if coords.get('latitude') and coords.get('longitude'):
                latitude = coords.get('latitude')
                longitude = coords.get('longitude')
                coord_source = 'google_maps'
        # Fallback to flat structure (old format)
        if latitude is None and gm.get('latitude') and gm.get('longitude'):
            latitude = gm.get('latitude')
            longitude = gm.get('longitude')
            coord_source = 'google_maps'
    # Try wikidata coordinates (multiple possible field names)
    if latitude is None and 'wikidata_enrichment' in entry:
        wiki = entry['wikidata_enrichment']
        # Check 'wikidata_coordinates' field first
        coords = wiki.get('wikidata_coordinates')
        if isinstance(coords, dict) and coords.get('latitude') and coords.get('longitude'):
            latitude = coords.get('latitude')
            longitude = coords.get('longitude')
            coord_source = 'wikidata'
        # Also check 'coordinates' field (alternative format)
        if latitude is None:
            coords = wiki.get('coordinates')
            if isinstance(coords, dict) and coords.get('latitude') and coords.get('longitude'):
                latitude = coords.get('latitude')
                longitude = coords.get('longitude')
                coord_source = 'wikidata'
        # Also check wikidata_claims for coordinates
        if latitude is None:
            claims = wiki.get('wikidata_claims', {})
            coords = claims.get('coordinate_location') or claims.get('coordinates')
            if isinstance(coords, dict) and coords.get('latitude') and coords.get('longitude'):
                latitude = coords.get('latitude')
                longitude = coords.get('longitude')
                coord_source = 'wikidata_claims'
    # Try locations[] array
    if latitude is None and 'locations' in entry and entry['locations']:
        loc = entry['locations'][0]
        if loc.get('latitude') and loc.get('longitude'):
            latitude = loc.get('latitude')
            longitude = loc.get('longitude')
            coord_source = 'locations'
    # Try location{} object (singular) with nested coordinates
    if latitude is None and 'location' in entry:
        loc = entry['location']
        # Check nested 'coordinates' object
        if isinstance(loc.get('coordinates'), dict):
            coords = loc['coordinates']
            if coords.get('latitude') and coords.get('longitude'):
                latitude = coords.get('latitude')
                longitude = coords.get('longitude')
                coord_source = 'location'
        # Fallback to flat structure
        elif loc.get('latitude') and loc.get('longitude'):
            latitude = loc.get('latitude')
            longitude = loc.get('longitude')
            coord_source = 'location'
    # === STEP 2: EXTRACT CITY FROM GOOGLE MAPS LOCALITY (AUTHORITATIVE) ===
    # Google Maps address_components with 'locality' type is the most reliable
    # city source; avoids the micro-hamlet problem described in the docstring.
    city = None
    region = None
    geonames_id = None
    location_resolution = None
    google_maps_locality = None
    google_maps_region = None
    if 'google_maps_enrichment' in entry:
        gm = entry['google_maps_enrichment']
        for comp in gm.get('address_components', []):
            types = comp.get('types', [])
            if 'locality' in types:
                google_maps_locality = comp.get('long_name')
            elif 'administrative_area_level_1' in types:
                # Extract region code from short_name (e.g., "OV" for Overijssel)
                google_maps_region = comp.get('short_name')
    # === STEP 3: USE GOOGLE MAPS LOCALITY OR FALL BACK TO GEONAMES ===
    if google_maps_locality:
        # Use Google Maps locality as the authoritative city name
        city = google_maps_locality
        region = google_maps_region
        # Look up in GeoNames to get geonames_id and city_code
        db = get_geonames_db()
        try:
            # Search for the city in GeoNames by name
            import sqlite3
            conn = sqlite3.connect(str(db.db_path))
            # BUGFIX: close the connection in a finally block so it is
            # released even when the query raises (it used to leak).
            try:
                cursor = conn.cursor()
                VALID_FEATURE_CODES = ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')
                cursor.execute("""
                    SELECT geonames_id, name, feature_code, population, admin1_code
                    FROM cities
                    WHERE country_code = ?
                    AND (name = ? OR ascii_name = ?)
                    AND feature_code IN (?, ?, ?, ?, ?, ?, ?, ?)
                    ORDER BY population DESC
                    LIMIT 1
                """, (country_code, google_maps_locality, google_maps_locality, *VALID_FEATURE_CODES))
                row = cursor.fetchone()
            finally:
                conn.close()
            if row:
                geonames_id, geonames_name, feature_code, population, admin1_code = row
                # Map admin1 code to ISO 3166-2 if we don't have region from Google Maps
                if not region:
                    country_admin1_map = GEONAMES_ADMIN1_TO_ISO.get(country_code, {})
                    region = country_admin1_map.get(admin1_code, admin1_code if admin1_code else "00")
                location_resolution = {
                    'method': 'GOOGLE_MAPS_LOCALITY',
                    'google_maps_locality': google_maps_locality,
                    'geonames_id': geonames_id,
                    'geonames_name': geonames_name,
                    'feature_code': feature_code,
                    'population': population,
                    'admin1_code': admin1_code,
                    'region_code': region,
                    'country_code': country_code,
                    'source_coordinates': {
                        'latitude': latitude,
                        'longitude': longitude,
                        'source': coord_source,
                    } if latitude and longitude else None,
                }
            else:
                # GeoNames lookup found nothing, but we still have the locality
                location_resolution = {
                    'method': 'GOOGLE_MAPS_LOCALITY',
                    'google_maps_locality': google_maps_locality,
                    'geonames_id': None,
                    'geonames_name': None,
                    'region_code': region,
                    'country_code': country_code,
                    'needs_geonames_entry': True,
                    'source_coordinates': {
                        'latitude': latitude,
                        'longitude': longitude,
                        'source': coord_source,
                    } if latitude and longitude else None,
                }
        except Exception as e:
            # GeoNames lookup failed, but we still have Google Maps locality
            location_resolution = {
                'method': 'GOOGLE_MAPS_LOCALITY',
                'google_maps_locality': google_maps_locality,
                'geonames_id': None,
                'geonames_name': None,
                'region_code': region,
                'country_code': country_code,
                'error': str(e),
            }
    elif latitude is not None and longitude is not None:
        # No Google Maps locality - fall back to GeoNames reverse geocoding
        # with a POPULATION THRESHOLD to avoid micro-hamlets
        MIN_POPULATION = 100  # Skip settlements with population < 100
        try:
            geo_result = reverse_geocode_to_city(latitude, longitude, country_code)
            if geo_result:
                population = geo_result.get('population', 0) or 0
                # If population is too low, try to find a larger nearby settlement
                if population < MIN_POPULATION:
                    # Search for nearest settlement with population >= MIN_POPULATION
                    import sqlite3
                    conn = sqlite3.connect(str(get_geonames_db().db_path))
                    # BUGFIX: close the connection even when the query raises
                    # (the leak was previously hidden by the outer except).
                    try:
                        cursor = conn.cursor()
                        VALID_FEATURE_CODES = ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')
                        cursor.execute("""
                            SELECT
                                name, ascii_name, admin1_code, admin1_name,
                                latitude, longitude, geonames_id, population, feature_code,
                                ((latitude - ?) * (latitude - ?) + (longitude - ?) * (longitude - ?)) as distance_sq
                            FROM cities
                            WHERE country_code = ?
                            AND feature_code IN (?, ?, ?, ?, ?, ?, ?, ?)
                            AND population >= ?
                            ORDER BY distance_sq
                            LIMIT 1
                        """, (latitude, latitude, longitude, longitude, country_code, *VALID_FEATURE_CODES, MIN_POPULATION))
                        row = cursor.fetchone()
                    finally:
                        conn.close()
                    if row:
                        # BUGFIX: unpack into settlement_name (was 'name'),
                        # which clobbered the institution name returned below.
                        (settlement_name, ascii_name, admin1_code, admin1_name,
                         lat, lon, geonameid, pop, fcode, dist_sq) = row
                        # Get city code using disambiguation
                        db = get_geonames_db()
                        city_code = db.get_city_abbreviation(settlement_name, country_code, use_disambiguation=True)
                        if not city_code:
                            city_code = get_city_code(settlement_name)
                        country_admin1_map = GEONAMES_ADMIN1_TO_ISO.get(country_code, {})
                        region_code = country_admin1_map.get(admin1_code, admin1_code if admin1_code else "00")
                        geo_result = {
                            'city': settlement_name,
                            'ascii_name': ascii_name,
                            'region': admin1_name,
                            'region_code': region_code,
                            'city_code': city_code,
                            'admin1_code': admin1_code,
                            'geonames_id': geonameid,
                            'feature_code': fcode,
                            'population': pop,
                            'distance_km': (dist_sq ** 0.5) * 111,
                            'micro_hamlet_skipped': True,
                        }
                city = geo_result.get('city')
                region = geo_result.get('region_code')  # ISO 3166-2 code
                geonames_id = geo_result.get('geonames_id')
                location_resolution = {
                    'method': 'REVERSE_GEOCODE',
                    'geonames_id': geonames_id,
                    'geonames_name': city,
                    'feature_code': geo_result.get('feature_code'),
                    'population': geo_result.get('population'),
                    'admin1_code': geo_result.get('admin1_code'),
                    'region_code': region,
                    'country_code': country_code,
                    'source_coordinates': {
                        'latitude': latitude,
                        'longitude': longitude,
                        'source': coord_source,
                    },
                    'distance_km': geo_result.get('distance_km'),
                }
                if geo_result.get('micro_hamlet_skipped'):
                    location_resolution['micro_hamlet_skipped'] = True
        except Exception:
            # Best-effort: swallow and fall back to text-based resolution below
            pass
    # === STEP 4: TEXT-BASED CITY EXTRACTION (LAST RESORT) ===
    text_city = None
    # Source 1: locations[] array (already enriched)
    if 'locations' in entry and entry['locations']:
        loc = entry['locations'][0]
        text_city = loc.get('city')
        if not region:
            region = loc.get('region')
    # Source 2: original_entry.plaatsnaam_bezoekadres (NDE CSV)
    if not text_city and 'original_entry' in entry:
        raw_city = entry['original_entry'].get('plaatsnaam_bezoekadres')
        if raw_city:
            # Handle formats like "Hoogeveen (en Zuidwolde)" - take first city
            clean_city = re.sub(r'\s*\([^)]+\)', '', raw_city).strip()
            if '/' in clean_city:
                clean_city = clean_city.split('/')[0].strip()
            if ' en ' in clean_city.lower():
                clean_city = re.split(r'\s+en\s+', clean_city, flags=re.IGNORECASE)[0].strip()
            text_city = clean_city if clean_city else raw_city
    # Source 3: google_maps_enrichment address
    if not text_city and 'google_maps_enrichment' in entry:
        gm = entry['google_maps_enrichment']
        address = gm.get('address', '')
        if address:
            parts = address.split(',')
            if len(parts) >= 2:
                last_part = parts[-1].strip()
                # Strip a leading Dutch postcode ("1234 AB ") if present
                city_match = re.sub(r'^\d{4}\s*[A-Z]{2}\s*', '', last_part)
                if city_match:
                    text_city = city_match
        if not text_city:
            text_city = gm.get('city')
    # Source 4: museum_register_enrichment.province (for region only)
    if not region and 'museum_register_enrichment' in entry:
        region = entry['museum_register_enrichment'].get('province')
    # Source 5: wikidata_enrichment.wikidata_claims.location
    if not text_city and 'wikidata_enrichment' in entry:
        claims = entry['wikidata_enrichment'].get('wikidata_claims', {})
        if 'location' in claims:
            loc_data = claims['location']
            if isinstance(loc_data, dict):
                text_city = loc_data.get('label_en') or loc_data.get('label_nl')
    # Source 6: wikidata description for city hint
    if not text_city and 'wikidata_enrichment' in entry:
        desc_nl = entry['wikidata_enrichment'].get('wikidata_description_nl', '')
        city_match = re.search(r'in\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?),?\s*(?:Nederland|Netherlands)', desc_nl)
        if city_match:
            text_city = city_match.group(1)
    # === STEP 5: USE TEXT CITY IF GEONAMES RESOLUTION FAILED ===
    if not city and text_city:
        city = text_city
        location_resolution = {
            'method': 'TEXT_FALLBACK',
            'text_source': 'various',
            'geonames_id': None,
            'needs_review': True,
        }
    # Get Wikidata ID
    wikidata_id = None
    if 'wikidata_enrichment' in entry:
        wikidata_id = entry['wikidata_enrichment'].get('wikidata_entity_id')
    if not wikidata_id and 'original_entry' in entry:
        wikidata_id = entry['original_entry'].get('wikidata_id')
    return {
        'name': name,
        'type_code': TYPE_CODE_MAP.get(type_code, 'U'),
        'city': city,
        'region': region,
        'country_code': country_code,
        'wikidata_id': wikidata_id,
        'geonames_id': geonames_id,
        'location_resolution': location_resolution,
    }
def generate_base_ghcid(data: dict) -> Tuple[str, GHCIDComponents]:
    """
    Build the base GHCID (without collision name suffix) for an institution.

    Args:
        data: Dict with name, type_code, city, region, country_code

    Returns:
        Tuple of (base_ghcid_string, GHCIDComponents)
    """
    country_code = data.get('country_code', 'NL')
    # Region may already be an ISO-style code (NL: 2 letters, BE: 3 letters)
    # or a province name that still needs mapping.
    region = data.get('region')
    if region and len(region) in (2, 3) and region.isupper():
        region_code = region
    elif region:
        region_code = get_region_code(region)
    else:
        region_code = "00"
    city_code = get_city_code(data['city']) if data['city'] else "XXX"
    # Derive the abbreviation from the institution name, with a generic fallback.
    abbreviation = extract_abbreviation_from_name(data['name']) or "INST"
    # Collisions are resolved later via name suffixes, so no Wikidata QID here.
    components = GHCIDComponents(
        country_code=country_code,
        region_code=region_code,
        city_locode=city_code,
        institution_type=data['type_code'],
        abbreviation=abbreviation,
        wikidata_qid=None,
    )
    return components.to_string(), components
def process_entries(entries_dir: Path, dry_run: bool = False) -> dict:
"""
Process all entry files and generate GHCIDs.
Args:
entries_dir: Path to entries directory
dry_run: If True, don't write changes
Returns:
Statistics dictionary
"""
stats = {
'total': 0,
'success': 0,
'skipped_no_location': 0,
'skipped_not_custodian': 0,
'collisions': 0,
'collision_groups': 0,
'files_updated': 0,
'google_maps_locality': 0, # Entries resolved via Google Maps locality (best)
'geonames_resolved': 0, # Entries resolved via GeoNames reverse geocoding
'text_fallback': 0, # Entries using text-based city (needs review)
'errors': [],
}
# Timestamp for this batch
generation_timestamp = datetime.now(timezone.utc).isoformat()
# Phase 1: Load all entries and generate base GHCIDs
print("Phase 1: Loading entries and generating base GHCIDs...")
entries_data = [] # List of (filepath, entry, extracted_data, base_ghcid, components)
yaml_files = sorted(entries_dir.glob("*.yaml"))
stats['total'] = len(yaml_files)
for filepath in yaml_files:
try:
with open(filepath, 'r', encoding='utf-8') as f:
entry = yaml.safe_load(f)
if not entry:
continue
# Check if NOT_CUSTODIAN (skip these)
if entry.get('google_maps_status') == 'NOT_CUSTODIAN':
stats['skipped_not_custodian'] += 1
continue
# Extract data
data = extract_entry_data(entry)
# Check if we have location data
if not data['city']:
stats['skipped_no_location'] += 1
continue
# Track resolution method
loc_resolution = data.get('location_resolution', {})
method = loc_resolution.get('method', '')
if method == 'GOOGLE_MAPS_LOCALITY':
stats['google_maps_locality'] += 1
elif method == 'REVERSE_GEOCODE':
stats['geonames_resolved'] += 1
elif method == 'TEXT_FALLBACK':
stats['text_fallback'] += 1
# Generate base GHCID
base_ghcid, components = generate_base_ghcid(data)
entries_data.append({
'filepath': filepath,
'entry': entry,
'data': data,
'base_ghcid': base_ghcid,
'components': components,
})
except Exception as e:
stats['errors'].append(f"{filepath.name}: {str(e)}")
print(f" Loaded {len(entries_data)} entries with location data")
print(f" - Google Maps locality (best): {stats['google_maps_locality']}")
print(f" - GeoNames reverse geocode: {stats['geonames_resolved']}")
print(f" - Text fallback (needs review): {stats['text_fallback']}")
print(f" Skipped {stats['skipped_no_location']} entries without city")
print(f" Skipped {stats['skipped_not_custodian']} NOT_CUSTODIAN entries")
# Phase 2: Detect collisions
print("\nPhase 2: Detecting GHCID collisions...")
collision_groups = defaultdict(list)
for ed in entries_data:
collision_groups[ed['base_ghcid']].append(ed)
# Count collisions
for base_ghcid, group in collision_groups.items():
if len(group) > 1:
stats['collision_groups'] += 1
stats['collisions'] += len(group)
print(f" Found {stats['collision_groups']} collision groups ({stats['collisions']} entries)")
# Phase 3: Resolve collisions and generate final GHCIDs
print("\nPhase 3: Resolving collisions and generating final GHCIDs...")
collision_report = []
for base_ghcid, group in collision_groups.items():
if len(group) > 1:
# COLLISION: Apply First Batch rule - ALL get name suffixes
collision_report.append({
'base_ghcid': base_ghcid,
'count': len(group),
'institutions': [ed['data']['name'] for ed in group],
})
for ed in group:
# Generate name suffix
name_suffix = generate_name_suffix(ed['data']['name'])
ed['final_ghcid'] = f"{base_ghcid}-{name_suffix}"
ed['had_collision'] = True
else:
# No collision: use base GHCID
ed = group[0]
ed['final_ghcid'] = base_ghcid
ed['had_collision'] = False
# Phase 4: Generate all identifier formats and update entries
print("\nPhase 4: Generating identifier formats and updating entries...")
for ed in entries_data:
final_ghcid = ed['final_ghcid']
# Create final components with the resolved GHCID string
# We need to parse it back or generate UUIDs directly
# For simplicity, hash the final GHCID string directly
import hashlib
import uuid
# GHCID UUID v5 Namespace
GHCID_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')
# Generate UUID v5 (SHA-1)
ghcid_uuid = uuid.uuid5(GHCID_NAMESPACE, final_ghcid)
# Generate UUID v8 (SHA-256)
hash_bytes = hashlib.sha256(final_ghcid.encode('utf-8')).digest()
uuid_bytes = bytearray(hash_bytes[:16])
uuid_bytes[6] = (uuid_bytes[6] & 0x0F) | 0x80 # Version 8
uuid_bytes[8] = (uuid_bytes[8] & 0x3F) | 0x80 # Variant RFC 4122
ghcid_uuid_sha256 = uuid.UUID(bytes=bytes(uuid_bytes))
# Generate numeric (64-bit)
ghcid_numeric = int.from_bytes(hash_bytes[:8], byteorder='big', signed=False)
# Generate record ID (UUID v7 - time-ordered, non-deterministic)
record_id = GHCIDComponents.generate_uuid_v7()
# Create GHCID block for entry
ghcid_block = {
'ghcid_current': final_ghcid,
'ghcid_original': final_ghcid, # Same for first assignment
'ghcid_uuid': str(ghcid_uuid),
'ghcid_uuid_sha256': str(ghcid_uuid_sha256),
'ghcid_numeric': ghcid_numeric,
'record_id': str(record_id),
'generation_timestamp': generation_timestamp,
'ghcid_history': [
{
'ghcid': final_ghcid,
'ghcid_numeric': ghcid_numeric,
'valid_from': generation_timestamp,
'valid_to': None,
'reason': 'Initial GHCID assignment (NDE batch import December 2025)'
+ (' - name suffix added to resolve collision' if ed.get('had_collision') else ''),
}
],
}
# Add location resolution metadata (GeoNames provenance)
if ed['data'].get('location_resolution'):
ghcid_block['location_resolution'] = ed['data']['location_resolution']
# Add GeoNames ID if available
if ed['data'].get('geonames_id'):
ghcid_block['geonames_id'] = ed['data']['geonames_id']
# Add collision info if applicable
if ed.get('had_collision'):
ghcid_block['collision_resolved'] = True
ghcid_block['base_ghcid_before_collision'] = ed['base_ghcid']
# Update entry
entry = ed['entry']
entry['ghcid'] = ghcid_block
# Also add to identifiers list
if 'identifiers' not in entry:
entry['identifiers'] = []
# Remove any existing GHCID identifiers
entry['identifiers'] = [
i for i in entry['identifiers']
if i.get('identifier_scheme') not in ['GHCID', 'GHCID_NUMERIC', 'GHCID_UUID', 'GHCID_UUID_SHA256', 'RECORD_ID']
]
# Add new GHCID identifiers
entry['identifiers'].extend([
{
'identifier_scheme': 'GHCID',
'identifier_value': final_ghcid,
},
{
'identifier_scheme': 'GHCID_UUID',
'identifier_value': str(ghcid_uuid),
'identifier_url': f'urn:uuid:{ghcid_uuid}',
},
{
'identifier_scheme': 'GHCID_UUID_SHA256',
'identifier_value': str(ghcid_uuid_sha256),
'identifier_url': f'urn:uuid:{ghcid_uuid_sha256}',
},
{
'identifier_scheme': 'GHCID_NUMERIC',
'identifier_value': str(ghcid_numeric),
},
{
'identifier_scheme': 'RECORD_ID',
'identifier_value': str(record_id),
'identifier_url': f'urn:uuid:{record_id}',
},
])
ed['entry'] = entry
stats['success'] += 1
# Phase 5: Write updated entries
if not dry_run:
print("\nPhase 5: Writing updated entry files...")
for ed in entries_data:
filepath = ed['filepath']
entry = ed['entry']
try:
with open(filepath, 'w', encoding='utf-8') as f:
yaml.dump(entry, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
stats['files_updated'] += 1
except Exception as e:
stats['errors'].append(f"Write error {filepath.name}: {str(e)}")
print(f" Updated {stats['files_updated']} files")
else:
print("\nPhase 5: DRY RUN - no files written")
# Phase 6: Generate collision report
print("\nPhase 6: Generating collision report...")
if collision_report:
report_path = entries_dir.parent / "ghcid_collision_report.json"
report = {
'generation_timestamp': generation_timestamp,
'total_entries': stats['total'],
'entries_with_ghcid': stats['success'],
'collision_groups': stats['collision_groups'],
'entries_with_collisions': stats['collisions'],
'collision_resolution_strategy': 'first_batch_all_get_name_suffix',
'collisions': collision_report,
}
if not dry_run:
with open(report_path, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f" Collision report written to: {report_path}")
else:
print(f" Would write collision report to: {report_path}")
return stats
def main():
    """Command-line entry point for the GHCID enrichment run.

    Parses the ``--dry-run`` flag, locates the NDE entries directory
    relative to the project root, delegates all work to
    ``process_entries``, and prints a human-readable summary of the
    returned statistics. Exits with status 1 if the entries directory
    does not exist.
    """
    cli = argparse.ArgumentParser(description="Enrich NDE entries with GHCID identifiers")
    cli.add_argument('--dry-run', action='store_true', help="Preview changes without writing")
    opts = cli.parse_args()

    # The script lives in scripts/, so the project root is one level up.
    project_root = Path(__file__).parent.parent
    entries_dir = project_root / "data" / "nde" / "enriched" / "entries"

    separator = "=" * 70
    print(separator)
    print("NDE HERITAGE INSTITUTION GHCID ENRICHMENT")
    print(separator)
    print(f"Entries directory: {entries_dir}")
    print(f"Dry run: {opts.dry_run}")
    print()

    if not entries_dir.exists():
        print(f"ERROR: Entries directory not found: {entries_dir}")
        sys.exit(1)

    # Run all enrichment phases; the returned dict carries run statistics.
    result = process_entries(entries_dir, dry_run=opts.dry_run)

    # Summary report.
    print()
    print(separator)
    print("GHCID ENRICHMENT SUMMARY")
    print(separator)
    print(f"Total entry files: {result['total']}")
    print(f"Entries with GHCID generated: {result['success']}")
    print(f" - Google Maps locality: {result['google_maps_locality']}")
    print(f" - GeoNames reverse geocode: {result['geonames_resolved']}")
    print(f" - Text fallback (review): {result['text_fallback']}")
    print(f"Skipped (no city): {result['skipped_no_location']}")
    print(f"Skipped (NOT_CUSTODIAN): {result['skipped_not_custodian']}")
    print(f"Collision groups: {result['collision_groups']}")
    print(f"Entries with collisions: {result['collisions']}")
    print(f"Files updated: {result['files_updated']}")

    # Show at most the first 10 errors; summarize any remainder.
    errors = result['errors']
    if errors:
        print(f"\nErrors ({len(errors)}):")
        for err in errors[:10]:
            print(f" - {err}")
        if len(errors) > 10:
            print(f" ... and {len(errors) - 10} more")

    print()
    print(separator)
    if opts.dry_run:
        print("DRY RUN COMPLETE - No files were modified")
    else:
        print("GHCID ENRICHMENT COMPLETE")
    print(separator)
# Script entry point: run the enrichment only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()