glam/scripts/enrich_nde_entries_ghcid.py
2025-12-01 00:37:24 +01:00

689 lines
23 KiB
Python

#!/usr/bin/env python3
"""
Enrich NDE Heritage Institution Entries with GHCID Persistent Identifiers.
This script:
1. Loads all YAML files from data/nde/enriched/entries/
2. Extracts location data (city, region, coordinates)
3. Generates base GHCIDs using NL-REGION-CITY-TYPE-ABBREV format
4. Detects collisions and applies First Batch rule (all get name suffixes)
5. Generates all 4 identifier formats:
- Human-readable GHCID string
- UUID v5 (SHA-1, RFC 4122 compliant) - PRIMARY
- UUID v8 (SHA-256, SOTA cryptographic strength) - Future-proof
- Numeric (64-bit integer for database PKs)
6. Adds GHCID fields to each entry
7. Generates collision statistics report
## GHCID Format
Base: NL-{Region}-{City}-{Type}-{Abbreviation}
With collision suffix: NL-{Region}-{City}-{Type}-{Abbreviation}-{name_suffix}
## Collision Resolution (First Batch Rule)
Since this is a batch import (all entries processed together), when multiple
institutions generate the same base GHCID:
- ALL colliding institutions receive native language name suffixes
- Name suffix: snake_case of institution name
Example:
- Two societies with NL-OV-ZWO-S-HK both become:
- NL-OV-ZWO-S-HK-historische_kring_zwolle
- NL-OV-ZWO-S-HK-heemkundige_kring_zwolle
Usage:
python scripts/enrich_nde_entries_ghcid.py [--dry-run]
Options:
--dry-run Preview changes without writing to files
"""
import argparse
import hashlib
import json
import re
import sys
import unicodedata
import uuid
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import yaml
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from glam_extractor.identifiers.ghcid import (
GHCIDComponents,
GHCIDGenerator,
InstitutionType,
extract_abbreviation_from_name,
normalize_city_name,
)
# Dutch province -> ISO 3166-2:NL region code (code only, no "NL-" prefix).
# Keys are matched after lowercasing and NFD accent-stripping in
# get_region_code(), so lookups through that function are accent-insensitive;
# accented spellings like "fryslân" only matter for direct dict access.
DUTCH_PROVINCE_CODES = {
    # Standard names plus English and spacing/hyphenation variants.
    "drenthe": "DR",
    "flevoland": "FL",
    "friesland": "FR",
    "fryslan": "FR",
    "fryslân": "FR",
    "gelderland": "GE",
    "groningen": "GR",
    "limburg": "LI",
    "noord-brabant": "NB",
    "north brabant": "NB",
    "noord brabant": "NB",
    "noord-holland": "NH",
    "north holland": "NH",
    "noord holland": "NH",
    "overijssel": "OV",
    "utrecht": "UT",
    "zeeland": "ZE",
    "zuid-holland": "ZH",
    "south holland": "ZH",
    "zuid holland": "ZH",
}
# Institution type code mapping (from the original entry's 'type' field).
# Currently an identity map over the known one-letter codes: it declares the
# valid code set in one place and lets callers coerce unknown codes with
# TYPE_CODE_MAP.get(code, 'U').
TYPE_CODE_MAP = {
    "G": "G",  # Gallery
    "L": "L",  # Library
    "A": "A",  # Archive
    "M": "M",  # Museum
    "O": "O",  # Official Institution
    "R": "R",  # Research Center
    "C": "C",  # Corporation
    "U": "U",  # Unknown
    "B": "B",  # Botanical/Zoo
    "E": "E",  # Education Provider
    "S": "S",  # Collecting Society
    "P": "P",  # Personal Collection
    "F": "F",  # Features (monuments, etc.)
    "I": "I",  # Intangible Heritage Group
    "X": "X",  # Mixed
    "H": "H",  # Holy Sites
    "D": "D",  # Digital Platform
    "N": "N",  # NGO
    "T": "T",  # Taste/Smell Heritage
}
def get_region_code(region_name: Optional[str]) -> str:
    """Map a Dutch province name (Dutch or English) to its ISO 3166-2 code.

    The name is lowercased and stripped of diacritics (NFD decomposition,
    combining marks dropped) before the lookup, so e.g. "Fryslân" and
    "Friesland" both resolve to "FR".

    Args:
        region_name: Province/region name, or None.

    Returns:
        2-letter region code or "00" if not found.
    """
    if not region_name:
        return "00"
    decomposed = unicodedata.normalize('NFD', region_name.lower())
    without_marks = ''.join(
        ch for ch in decomposed if unicodedata.category(ch) != 'Mn'
    )
    return DUTCH_PROVINCE_CODES.get(without_marks.strip(), "00")
def get_city_code(city_name: str) -> str:
    """Derive a 3-letter uppercase city code from a city name.

    Rules (in priority order):
    1. Single word: first 3 letters uppercase.
    2. Leading Dutch article/preposition (de, het, den, ...): article's
       first letter plus the first 2 letters of the next word.
    3. Multi-word: initial letter of each of the first 3 words.

    The result is padded/truncated to exactly 3 characters and any
    non-A-Z character is replaced with 'X'. Returns "XXX" when the name
    is empty or normalizes to nothing.

    Args:
        city_name: City name.

    Returns:
        3-letter uppercase city code.
    """
    if not city_name:
        return "XXX"
    # Accent/special-character handling is delegated to the shared helper.
    words = normalize_city_name(city_name).split()
    if not words:
        return "XXX"
    # Dutch articles and prepositions that should not dominate the code.
    particles = {'de', 'het', 'den', "'s", 'op', 'aan', 'bij', 'ter'}
    if len(words) == 1:
        raw = words[0][:3]
    elif words[0].lower() in particles:
        raw = words[0][0] + words[1][:2]
    else:
        raw = ''.join(word[0] for word in words[:3])
    # Force exactly three characters, padding with 'X' when short.
    code = raw.upper().ljust(3, 'X')[:3]
    # Replace anything outside A-Z (digits, apostrophes, ...) with 'X'.
    return re.sub(r'[^A-Z]', 'X', code)
def generate_name_suffix(institution_name: str) -> str:
    """Convert an institution name into a snake_case collision suffix.

    Diacritics are stripped via NFD decomposition, punctuation is removed,
    and whitespace/hyphens/slashes become single underscores. The result is
    lowercase ASCII, at most 50 characters, and never empty (falls back to
    "unknown").

    Args:
        institution_name: Full institution name (native language).

    Returns:
        snake_case suffix (e.g., "historische_kring_zwolle").
    """
    if not institution_name:
        return "unknown"
    # Decompose and drop combining marks to remove accents.
    decomposed = unicodedata.normalize('NFD', institution_name)
    text = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
    text = text.lower()
    # Strip punctuation, turn separators into underscores, then keep only
    # [a-z0-9_] and collapse/trim underscore runs.
    text = re.sub(r"[''`\",.:;!?()[\]{}]", '', text)
    text = re.sub(r'[\s\-/]+', '_', text)
    text = re.sub(r'[^a-z0-9_]', '', text)
    suffix = re.sub(r'_+', '_', text).strip('_')
    # Cap the suffix length at 50 characters, avoiding a trailing underscore.
    if len(suffix) > 50:
        suffix = suffix[:50].rstrip('_')
    return suffix if suffix else "unknown"
def extract_entry_data(entry: dict) -> dict:
"""
Extract relevant data from an entry for GHCID generation.
Looks in multiple sources for location data:
1. locations[] array (if already enriched)
2. original_entry.plaatsnaam_bezoekadres (NDE CSV city field)
3. google_maps_enrichment.address / city
4. museum_register_enrichment.province
5. wikidata_enrichment.wikidata_claims.location
Args:
entry: Entry dictionary from YAML
Returns:
Dict with: name, type_code, city, region, wikidata_id
"""
# Get institution name
name = None
if 'original_entry' in entry:
name = entry['original_entry'].get('organisatie')
if not name and 'wikidata_enrichment' in entry:
name = entry['wikidata_enrichment'].get('wikidata_label_nl')
if not name:
name = entry['wikidata_enrichment'].get('wikidata_label_en')
if not name:
name = "Unknown Institution"
# Get institution type
type_codes = []
if 'original_entry' in entry and 'type' in entry['original_entry']:
types = entry['original_entry']['type']
if isinstance(types, list):
type_codes = types
elif isinstance(types, str):
type_codes = [types]
# Use first type, default to U (Unknown)
type_code = type_codes[0] if type_codes else 'U'
# Get location - try multiple sources
city = None
region = None
# Source 1: locations[] array (already enriched)
if 'locations' in entry and entry['locations']:
loc = entry['locations'][0]
city = loc.get('city')
region = loc.get('region')
# Source 2: original_entry.plaatsnaam_bezoekadres (NDE CSV)
if not city and 'original_entry' in entry:
city = entry['original_entry'].get('plaatsnaam_bezoekadres')
# Source 3: google_maps_enrichment
if not city and 'google_maps_enrichment' in entry:
gm = entry['google_maps_enrichment']
# Try to extract city from address
address = gm.get('address', '')
if address:
# Dutch addresses: "Street Nr, Postcode City"
# Try to extract city from last part
parts = address.split(',')
if len(parts) >= 2:
last_part = parts[-1].strip()
# Remove postcode (4 digits + 2 letters)
import re
city_match = re.sub(r'^\d{4}\s*[A-Z]{2}\s*', '', last_part)
if city_match:
city = city_match
# Also try 'city' field if present
if not city:
city = gm.get('city')
# Source 4: museum_register_enrichment.province (for region)
if not region and 'museum_register_enrichment' in entry:
region = entry['museum_register_enrichment'].get('province')
# Source 5: wikidata_enrichment.wikidata_claims.location
if not city and 'wikidata_enrichment' in entry:
claims = entry['wikidata_enrichment'].get('wikidata_claims', {})
if 'location' in claims:
loc_data = claims['location']
if isinstance(loc_data, dict):
city = loc_data.get('label_en') or loc_data.get('label_nl')
# Source 6: Try wikidata description for city hint
if not city and 'wikidata_enrichment' in entry:
desc_nl = entry['wikidata_enrichment'].get('wikidata_description_nl', '')
# Try to extract city from "museum in [City], Nederland"
import re
city_match = re.search(r'in\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?),?\s*(?:Nederland|Netherlands)', desc_nl)
if city_match:
city = city_match.group(1)
# Get Wikidata ID
wikidata_id = None
if 'wikidata_enrichment' in entry:
wikidata_id = entry['wikidata_enrichment'].get('wikidata_entity_id')
if not wikidata_id and 'original_entry' in entry:
wikidata_id = entry['original_entry'].get('wikidata_id')
return {
'name': name,
'type_code': TYPE_CODE_MAP.get(type_code, 'U'),
'city': city,
'region': region,
'wikidata_id': wikidata_id,
}
def generate_base_ghcid(data: dict) -> Tuple[str, GHCIDComponents]:
    """Build the collision-unaware base GHCID for one institution.

    Args:
        data: Dict with name, type_code, city, region

    Returns:
        Tuple of (base_ghcid_string, GHCIDComponents)
    """
    # Abbreviation falls back to a generic placeholder when the name
    # yields nothing usable.
    abbreviation = extract_abbreviation_from_name(data['name']) or "INST"
    city = data['city']
    components = GHCIDComponents(
        country_code="NL",
        region_code=get_region_code(data['region']),
        city_locode=get_city_code(city) if city else "XXX",
        institution_type=data['type_code'],
        abbreviation=abbreviation,
        # Collisions are resolved via name suffixes, not Wikidata QIDs.
        wikidata_qid=None,
    )
    return components.to_string(), components
def process_entries(entries_dir: Path, dry_run: bool = False) -> dict:
"""
Process all entry files and generate GHCIDs.
Args:
entries_dir: Path to entries directory
dry_run: If True, don't write changes
Returns:
Statistics dictionary
"""
stats = {
'total': 0,
'success': 0,
'skipped_no_location': 0,
'skipped_not_custodian': 0,
'collisions': 0,
'collision_groups': 0,
'files_updated': 0,
'errors': [],
}
# Timestamp for this batch
generation_timestamp = datetime.now(timezone.utc).isoformat()
# Phase 1: Load all entries and generate base GHCIDs
print("Phase 1: Loading entries and generating base GHCIDs...")
entries_data = [] # List of (filepath, entry, extracted_data, base_ghcid, components)
yaml_files = sorted(entries_dir.glob("*.yaml"))
stats['total'] = len(yaml_files)
for filepath in yaml_files:
try:
with open(filepath, 'r', encoding='utf-8') as f:
entry = yaml.safe_load(f)
if not entry:
continue
# Check if NOT_CUSTODIAN (skip these)
if entry.get('google_maps_status') == 'NOT_CUSTODIAN':
stats['skipped_not_custodian'] += 1
continue
# Extract data
data = extract_entry_data(entry)
# Check if we have location data
if not data['city']:
stats['skipped_no_location'] += 1
continue
# Generate base GHCID
base_ghcid, components = generate_base_ghcid(data)
entries_data.append({
'filepath': filepath,
'entry': entry,
'data': data,
'base_ghcid': base_ghcid,
'components': components,
})
except Exception as e:
stats['errors'].append(f"{filepath.name}: {str(e)}")
print(f" Loaded {len(entries_data)} entries with location data")
print(f" Skipped {stats['skipped_no_location']} entries without city")
print(f" Skipped {stats['skipped_not_custodian']} NOT_CUSTODIAN entries")
# Phase 2: Detect collisions
print("\nPhase 2: Detecting GHCID collisions...")
collision_groups = defaultdict(list)
for ed in entries_data:
collision_groups[ed['base_ghcid']].append(ed)
# Count collisions
for base_ghcid, group in collision_groups.items():
if len(group) > 1:
stats['collision_groups'] += 1
stats['collisions'] += len(group)
print(f" Found {stats['collision_groups']} collision groups ({stats['collisions']} entries)")
# Phase 3: Resolve collisions and generate final GHCIDs
print("\nPhase 3: Resolving collisions and generating final GHCIDs...")
collision_report = []
for base_ghcid, group in collision_groups.items():
if len(group) > 1:
# COLLISION: Apply First Batch rule - ALL get name suffixes
collision_report.append({
'base_ghcid': base_ghcid,
'count': len(group),
'institutions': [ed['data']['name'] for ed in group],
})
for ed in group:
# Generate name suffix
name_suffix = generate_name_suffix(ed['data']['name'])
ed['final_ghcid'] = f"{base_ghcid}-{name_suffix}"
ed['had_collision'] = True
else:
# No collision: use base GHCID
ed = group[0]
ed['final_ghcid'] = base_ghcid
ed['had_collision'] = False
# Phase 4: Generate all identifier formats and update entries
print("\nPhase 4: Generating identifier formats and updating entries...")
for ed in entries_data:
final_ghcid = ed['final_ghcid']
# Create final components with the resolved GHCID string
# We need to parse it back or generate UUIDs directly
# For simplicity, hash the final GHCID string directly
import hashlib
import uuid
# GHCID UUID v5 Namespace
GHCID_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')
# Generate UUID v5 (SHA-1)
ghcid_uuid = uuid.uuid5(GHCID_NAMESPACE, final_ghcid)
# Generate UUID v8 (SHA-256)
hash_bytes = hashlib.sha256(final_ghcid.encode('utf-8')).digest()
uuid_bytes = bytearray(hash_bytes[:16])
uuid_bytes[6] = (uuid_bytes[6] & 0x0F) | 0x80 # Version 8
uuid_bytes[8] = (uuid_bytes[8] & 0x3F) | 0x80 # Variant RFC 4122
ghcid_uuid_sha256 = uuid.UUID(bytes=bytes(uuid_bytes))
# Generate numeric (64-bit)
ghcid_numeric = int.from_bytes(hash_bytes[:8], byteorder='big', signed=False)
# Generate record ID (UUID v7 - time-ordered, non-deterministic)
record_id = GHCIDComponents.generate_uuid_v7()
# Create GHCID block for entry
ghcid_block = {
'ghcid_current': final_ghcid,
'ghcid_original': final_ghcid, # Same for first assignment
'ghcid_uuid': str(ghcid_uuid),
'ghcid_uuid_sha256': str(ghcid_uuid_sha256),
'ghcid_numeric': ghcid_numeric,
'record_id': str(record_id),
'generation_timestamp': generation_timestamp,
'ghcid_history': [
{
'ghcid': final_ghcid,
'ghcid_numeric': ghcid_numeric,
'valid_from': generation_timestamp,
'valid_to': None,
'reason': 'Initial GHCID assignment (NDE batch import December 2025)'
+ (' - name suffix added to resolve collision' if ed.get('had_collision') else ''),
}
],
}
# Add collision info if applicable
if ed.get('had_collision'):
ghcid_block['collision_resolved'] = True
ghcid_block['base_ghcid_before_collision'] = ed['base_ghcid']
# Update entry
entry = ed['entry']
entry['ghcid'] = ghcid_block
# Also add to identifiers list
if 'identifiers' not in entry:
entry['identifiers'] = []
# Remove any existing GHCID identifiers
entry['identifiers'] = [
i for i in entry['identifiers']
if i.get('identifier_scheme') not in ['GHCID', 'GHCID_NUMERIC', 'GHCID_UUID', 'GHCID_UUID_SHA256', 'RECORD_ID']
]
# Add new GHCID identifiers
entry['identifiers'].extend([
{
'identifier_scheme': 'GHCID',
'identifier_value': final_ghcid,
},
{
'identifier_scheme': 'GHCID_UUID',
'identifier_value': str(ghcid_uuid),
'identifier_url': f'urn:uuid:{ghcid_uuid}',
},
{
'identifier_scheme': 'GHCID_UUID_SHA256',
'identifier_value': str(ghcid_uuid_sha256),
'identifier_url': f'urn:uuid:{ghcid_uuid_sha256}',
},
{
'identifier_scheme': 'GHCID_NUMERIC',
'identifier_value': str(ghcid_numeric),
},
{
'identifier_scheme': 'RECORD_ID',
'identifier_value': str(record_id),
'identifier_url': f'urn:uuid:{record_id}',
},
])
ed['entry'] = entry
stats['success'] += 1
# Phase 5: Write updated entries
if not dry_run:
print("\nPhase 5: Writing updated entry files...")
for ed in entries_data:
filepath = ed['filepath']
entry = ed['entry']
try:
with open(filepath, 'w', encoding='utf-8') as f:
yaml.dump(entry, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
stats['files_updated'] += 1
except Exception as e:
stats['errors'].append(f"Write error {filepath.name}: {str(e)}")
print(f" Updated {stats['files_updated']} files")
else:
print("\nPhase 5: DRY RUN - no files written")
# Phase 6: Generate collision report
print("\nPhase 6: Generating collision report...")
if collision_report:
report_path = entries_dir.parent / "ghcid_collision_report.json"
report = {
'generation_timestamp': generation_timestamp,
'total_entries': stats['total'],
'entries_with_ghcid': stats['success'],
'collision_groups': stats['collision_groups'],
'entries_with_collisions': stats['collisions'],
'collision_resolution_strategy': 'first_batch_all_get_name_suffix',
'collisions': collision_report,
}
if not dry_run:
with open(report_path, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f" Collision report written to: {report_path}")
else:
print(f" Would write collision report to: {report_path}")
return stats
def main():
    """Parse CLI options, run the GHCID enrichment, and print a summary."""
    parser = argparse.ArgumentParser(description="Enrich NDE entries with GHCID identifiers")
    parser.add_argument('--dry-run', action='store_true', help="Preview changes without writing")
    args = parser.parse_args()

    # Resolve the data directory relative to the repository root.
    repo_root = Path(__file__).parent.parent
    entries_dir = repo_root / "data" / "nde" / "enriched" / "entries"

    divider = "=" * 70
    print(divider)
    print("NDE HERITAGE INSTITUTION GHCID ENRICHMENT")
    print(divider)
    print(f"Entries directory: {entries_dir}")
    print(f"Dry run: {args.dry_run}")
    print()

    if not entries_dir.exists():
        print(f"ERROR: Entries directory not found: {entries_dir}")
        sys.exit(1)

    stats = process_entries(entries_dir, dry_run=args.dry_run)

    # Summary block.
    print()
    print(divider)
    print("GHCID ENRICHMENT SUMMARY")
    print(divider)
    print(f"Total entry files: {stats['total']}")
    print(f"Entries with GHCID generated: {stats['success']}")
    print(f"Skipped (no city): {stats['skipped_no_location']}")
    print(f"Skipped (NOT_CUSTODIAN): {stats['skipped_not_custodian']}")
    print(f"Collision groups: {stats['collision_groups']}")
    print(f"Entries with collisions: {stats['collisions']}")
    print(f"Files updated: {stats['files_updated']}")

    errors = stats['errors']
    if errors:
        # Show at most ten errors to keep the summary readable.
        print(f"\nErrors ({len(errors)}):")
        for message in errors[:10]:
            print(f" - {message}")
        if len(errors) > 10:
            print(f" ... and {len(errors) - 10} more")

    print()
    print(divider)
    if args.dry_run:
        print("DRY RUN COMPLETE - No files were modified")
    else:
        print("GHCID ENRICHMENT COMPLETE")
    print(divider)
# Script entry point: run the enrichment when executed directly.
if __name__ == "__main__":
    main()