glam/scripts/generate_ghcids_latin_america.py
kempersc e5a532a8bc Add comprehensive tests for NLP institution extraction and RDF partnership integration
- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive).
- Added tests for extracted entities and result handling to validate the extraction process.
- Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format.
- Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns.
- Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
2025-11-19 23:20:47 +01:00

494 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Generate GHCIDs for Latin American GLAM institutions.
This script:
1. Loads the authoritative Latin American institutions YAML file
2. Maps region names to ISO 3166-2 codes (BR, CL, MX)
3. Looks up city codes from GeoNames database
4. Generates GHCID identifiers for each institution
5. Updates the YAML file with GHCID fields
6. Validates uniqueness and format
Usage:
python scripts/generate_ghcids_latin_america.py
"""
import json
import sys
import yaml
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Set
# NOTE(review): Counter and Set appear unused in this file — confirm before removing.
# Add src to path for imports (script lives in scripts/, package code in ../src)
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from glam_extractor.identifiers.ghcid import (
    GHCIDGenerator,
    GHCIDComponents,
    InstitutionType,
    extract_abbreviation_from_name,
)
from glam_extractor.geocoding.geonames_lookup import GeoNamesDB
class RegionMapper:
    """Maps full region names to ISO 3166-2 subdivision codes.

    Supported countries: BR, CL, MX, US, AR.  Lookups are
    accent-insensitive and case-insensitive; unknown regions (or
    unsupported countries) fall back to the national-level code "00".
    """

    def __init__(self):
        """Load ISO 3166-2 mappings from reference data."""
        self.reference_dir = Path(__file__).parent.parent / "data" / "reference"
        # Load reversed (normalized name -> code) mappings for each country.
        # Kept as individual attributes for backward compatibility.
        self.br_mapping = self._load_mapping("iso_3166_2_br.json", reverse=True)
        self.cl_mapping = self._load_mapping("iso_3166_2_cl.json", reverse=True)
        self.mx_mapping = self._load_mapping("iso_3166_2_mx.json", reverse=True)
        self.us_mapping = self._load_mapping("iso_3166_2_us.json", reverse=True)
        self.ar_mapping = self._load_mapping("iso_3166_2_ar.json", reverse=True)

    def _load_mapping(self, filename: str, reverse: bool = False) -> Dict[str, str]:
        """
        Load an ISO 3166-2 mapping from a JSON file.

        Args:
            filename: JSON file in data/reference/ (code -> name pairs)
            reverse: If True, return a normalized name -> code mapping
                (default is the raw code -> name mapping)

        Returns:
            Mapping dict; keys are accent-stripped, uppercased names
            when reverse=True.
        """
        filepath = self.reference_dir / filename
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if not reverse:
            return data
        # Invert to normalized-name -> code so lookups survive accents/case.
        return {
            self._normalize_region_name(name): code
            for code, name in data.items()
        }

    @staticmethod
    def _normalize_region_name(name: str) -> str:
        """
        Normalize a region name for lookup.

        - Uppercase
        - Remove accents via NFD decomposition (é->E, ã->A, ñ->N, etc.)
        - Strip surrounding whitespace
        """
        import unicodedata
        normalized = unicodedata.normalize('NFD', name.upper())
        # Drop combining marks (category 'Mn') left behind by NFD.
        normalized = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')
        return normalized.strip()

    def get_region_code(self, region_name: str, country_code: str) -> Optional[str]:
        """
        Get the ISO 3166-2 subdivision code for a region.

        Args:
            region_name: Full region name (e.g., "São Paulo", "Valparaíso")
            country_code: ISO 3166-1 country code (e.g., "BR", "CL", "MX")

        Returns:
            ISO 3166-2 subdivision code (e.g., "SP", "VS", "CMX"), or
            "00" (national-level fallback) when the region — or the
            country itself — is not found.
        """
        normalized = self._normalize_region_name(region_name)
        # Dict dispatch instead of an if/elif ladder per country.
        mapping = {
            "BR": self.br_mapping,
            "CL": self.cl_mapping,
            "MX": self.mx_mapping,
            "US": self.us_mapping,
            "AR": self.ar_mapping,
        }.get(country_code)
        if mapping and normalized in mapping:
            return mapping[normalized]
        # Fallback: "00" marks a national-level (unresolved) region.
        return "00"
class LatinAmericaGHCIDGenerator:
"""Generate GHCIDs for Latin American institutions."""
def __init__(self):
"""Initialize generator with dependencies."""
self.ghcid_gen = GHCIDGenerator()
self.region_mapper = RegionMapper()
self.geonames_db = GeoNamesDB()
# Statistics
self.stats = {
'total_institutions': 0,
'ghcids_generated': 0,
'missing_city_code': 0,
'missing_region_code': 0,
'collisions_detected': 0,
'errors': [],
}
# Collision detection
self.ghcid_usage: Dict[str, List[str]] = defaultdict(list) # GHCID -> [institution_names]
@staticmethod
def _get_city_code_fallback(city_name: str) -> str:
"""
Generate 3-letter city code from city name.
Handles city names with articles (La Serena, El Quisco, etc.)
by taking first letter of article + first 2 letters of main word.
Args:
city_name: City name (e.g., "La Serena", "São Paulo")
Returns:
3-letter uppercase code (e.g., "LSE", "SAO")
"""
import unicodedata
# Remove accents
normalized = unicodedata.normalize('NFD', city_name)
normalized = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')
# Split into words
words = normalized.split()
if len(words) == 1:
# Single word: take first 3 letters
code = words[0][:3].upper()
elif words[0].lower() in ['la', 'el', 'los', 'las', 'o', 'a']:
# City with article: first letter of article + first 2 of next word
if len(words) > 1:
code = (words[0][0] + words[1][:2]).upper()
else:
code = words[0][:3].upper()
else:
# Multi-word: take first letter of each word (up to 3)
code = ''.join(w[0] for w in words[:3]).upper()
# Ensure exactly 3 letters
if len(code) < 3:
code = code.ljust(3, 'X')
elif len(code) > 3:
code = code[:3]
return code
def generate_for_institution(self, record: dict) -> Optional[GHCIDComponents]:
"""
Generate GHCID for a single institution record.
Args:
record: Institution record from YAML (dict)
Returns:
GHCIDComponents if successful, None otherwise
"""
self.stats['total_institutions'] += 1
try:
# Extract required fields
name = record.get('name')
institution_type_str = record.get('institution_type', 'UNKNOWN')
locations = record.get('locations', [])
if not name:
self.stats['errors'].append(f"Missing name for record: {record.get('id')}")
return None
if not locations:
self.stats['errors'].append(f"No locations for: {name}")
return None
location = locations[0] # Use first location
country_code = location.get('country')
region_name = location.get('region')
city_name = location.get('city')
if not country_code:
self.stats['errors'].append(f"No country for: {name}")
return None
# Get region code (ISO 3166-2)
region_code = "00" # Default to national-level
if region_name:
region_code = self.region_mapper.get_region_code(region_name, country_code)
if region_code == "00":
self.stats['missing_region_code'] += 1
# Get city code from GeoNames
city_code = "XXX" # Default for unknown/region-level
if city_name:
city_info = self.geonames_db.lookup_city(city_name, country_code)
if city_info:
city_code = city_info.get_abbreviation()
else:
self.stats['missing_city_code'] += 1
# Fallback: use first 3 letters of city name
# Handle city names with articles (La Serena -> LSE, El Quisco -> ELQ)
city_code = self._get_city_code_fallback(city_name)
# Map institution type to GHCID type code
try:
inst_type = InstitutionType[institution_type_str]
except KeyError:
inst_type = InstitutionType.UNKNOWN
# Generate abbreviation from name
abbreviation = extract_abbreviation_from_name(name)
# Create GHCID components
components = GHCIDComponents(
country_code=country_code,
region_code=region_code,
city_locode=city_code,
institution_type=inst_type.value,
abbreviation=abbreviation,
)
# Validate
is_valid, error_msg = components.validate()
if not is_valid:
self.stats['errors'].append(f"Invalid GHCID for {name}: {error_msg}")
return None
# Check for collisions
ghcid_str = components.to_string()
self.ghcid_usage[ghcid_str].append(name)
if len(self.ghcid_usage[ghcid_str]) > 1:
self.stats['collisions_detected'] += 1
self.stats['ghcids_generated'] += 1
return components
except Exception as e:
self.stats['errors'].append(f"Error generating GHCID for {record.get('name', 'unknown')}: {e}")
return None
def process_all_institutions(self, input_file: Path) -> List[dict]:
"""
Process all institutions in YAML file and generate GHCIDs.
Args:
input_file: Path to authoritative YAML file
Returns:
List of updated institution records with GHCID fields
"""
print(f"Loading institutions from: {input_file}")
with open(input_file, 'r', encoding='utf-8') as f:
institutions = yaml.safe_load(f)
print(f"Found {len(institutions)} institutions")
print()
updated_institutions = []
for i, record in enumerate(institutions, 1):
if i % 50 == 0:
print(f"Processing institution {i}/{len(institutions)}...")
# Generate GHCID
ghcid_components = self.generate_for_institution(record)
if ghcid_components:
# Add GHCID fields to record
record['ghcid'] = ghcid_components.to_string()
record['ghcid_numeric'] = ghcid_components.to_numeric()
# Use numeric GHCID as the main ID
old_id = record.get('id', '')
record['id'] = ghcid_components.to_numeric()
# Add GHCID to identifiers list
identifiers = record.get('identifiers', [])
# Add old ID to identifiers if it exists and isn't already there
if old_id:
has_old_id = any(i.get('identifier_value') == old_id for i in identifiers)
if not has_old_id:
identifiers.append({
'identifier_scheme': 'OLD_ID',
'identifier_value': old_id,
})
# Add GHCID identifier (if not already present)
has_ghcid = any(i.get('identifier_scheme') == 'GHCID' for i in identifiers)
if not has_ghcid:
identifiers.append({
'identifier_scheme': 'GHCID',
'identifier_value': ghcid_components.to_string(),
})
# Add numeric GHCID to identifiers as well
has_numeric = any(i.get('identifier_scheme') == 'GHCID_NUMERIC' for i in identifiers)
if not has_numeric:
identifiers.append({
'identifier_scheme': 'GHCID_NUMERIC',
'identifier_value': str(ghcid_components.to_numeric()),
})
record['identifiers'] = identifiers
updated_institutions.append(record)
return updated_institutions
def print_statistics(self):
"""Print generation statistics."""
print()
print("=" * 70)
print("GHCID GENERATION STATISTICS")
print("=" * 70)
print(f"Total institutions processed: {self.stats['total_institutions']}")
print(f"GHCIDs successfully generated: {self.stats['ghcids_generated']}")
print(f"Missing city codes (used fallback): {self.stats['missing_city_code']}")
print(f"Missing region codes (used '00'): {self.stats['missing_region_code']}")
print(f"GHCID collisions detected: {self.stats['collisions_detected']}")
print()
if self.stats['errors']:
print(f"⚠️ Errors encountered: {len(self.stats['errors'])}")
print()
print("Error details:")
for error in self.stats['errors'][:10]: # Show first 10
print(f" - {error}")
if len(self.stats['errors']) > 10:
print(f" ... and {len(self.stats['errors']) - 10} more")
else:
print("✅ No errors!")
print()
# Show collisions
if self.stats['collisions_detected'] > 0:
print("⚠️ GHCID COLLISIONS DETECTED:")
print()
for ghcid, names in self.ghcid_usage.items():
if len(names) > 1:
print(f" {ghcid}:")
for name in names:
print(f" - {name}")
print()
print("Note: Collisions will need Wikidata Q-numbers for disambiguation")
else:
print("✅ No GHCID collisions detected!")
print()
def validate_ghcids(self, institutions: List[dict]):
"""
Validate all generated GHCIDs.
Args:
institutions: List of institution records
"""
print("=" * 70)
print("VALIDATION")
print("=" * 70)
ghcid_set = set()
numeric_set = set()
duplicates = []
for record in institutions:
ghcid = record.get('ghcid')
ghcid_numeric = record.get('ghcid_numeric')
if ghcid:
if ghcid in ghcid_set:
duplicates.append(ghcid)
ghcid_set.add(ghcid)
if ghcid_numeric:
numeric_set.add(ghcid_numeric)
print(f"Unique GHCIDs: {len(ghcid_set)}")
print(f"Unique numeric GHCIDs: {len(numeric_set)}")
if duplicates:
print(f"⚠️ Duplicate GHCIDs found: {len(duplicates)}")
for dup in duplicates[:5]:
print(f" - {dup}")
else:
print("✅ All GHCIDs are unique!")
print()
def main():
    """Main entry point: back up, generate GHCIDs, validate, rewrite YAML.

    The authoritative YAML file is updated in place; a timestamped
    backup is written to the archive directory first.
    """
    import shutil

    # Paths — one variable so input and output cannot drift apart.
    project_root = Path(__file__).parent.parent
    data_file = project_root / "data" / "instances" / "latin_american_institutions_AUTHORITATIVE.yaml"
    input_file = data_file
    output_file = data_file
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_file = (project_root / "data" / "instances" / "archive"
                   / f"latin_american_institutions_pre_ghcid_{timestamp}.yaml")

    # Create backup before any modification
    print(f"Creating backup: {backup_file}")
    backup_file.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy(input_file, backup_file)

    # Generate GHCIDs
    generator = LatinAmericaGHCIDGenerator()
    updated_institutions = generator.process_all_institutions(input_file)

    # Print statistics and validate uniqueness
    generator.print_statistics()
    generator.validate_ghcids(updated_institutions)

    # Write updated YAML with a stats header (header lines at column 0
    # so they remain valid YAML comments).
    print("=" * 70)
    print(f"Writing updated YAML to: {output_file}")
    header = f"""---
# Latin American GLAM Institutions - GHCID Enhanced
# Last updated: {datetime.now(timezone.utc).isoformat()}
# GHCID generation: {generator.stats['ghcids_generated']}/{generator.stats['total_institutions']} institutions
#
# GHCID Statistics:
# - Total institutions: {generator.stats['total_institutions']}
# - GHCIDs generated: {generator.stats['ghcids_generated']}
# - Missing city codes: {generator.stats['missing_city_code']}
# - Missing region codes: {generator.stats['missing_region_code']}
# - Collisions detected: {generator.stats['collisions_detected']}
"""
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(header)
        yaml.dump(updated_institutions, f,
                  default_flow_style=False,
                  allow_unicode=True,
                  sort_keys=False,
                  width=100)
    print(f"✅ Done! Updated {len(updated_institutions)} institutions")
    print()
# Run only when executed as a script (not on import).
if __name__ == "__main__":
    main()