glam/scripts/generate_ghcids_egypt.py
kempersc e5a532a8bc Add comprehensive tests for NLP institution extraction and RDF partnership integration
- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive).
- Added tests for extracted entities and result handling to validate the extraction process.
- Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format.
- Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns.
- Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
2025-11-19 23:20:47 +01:00

642 lines
23 KiB
Python

#!/usr/bin/env python3
"""
Generate GHCIDs for Egyptian GLAM institutions.
This script:
1. Loads the Egyptian institutions YAML file (with Wikidata/VIAF enrichment)
2. Maps governorate names to ISO 3166-2 codes (EG-C, EG-ALX, etc.)
3. Handles sparse location data:
- Extracts cities from street addresses
- Infers Cairo for national institutions
- Uses coordinates for geocoding
4. Generates GHCID identifiers with four-identifier strategy
5. Updates the YAML file with GHCID fields
6. Detects collisions and appends Wikidata Q-numbers when available
Key Challenges for Egypt:
- 15/29 institutions have NO location data (empty locations array)
- Only 10 institutions have city names
- Some cities are actually street names ("Nile Corniche", "Tahrir Square")
- National institutions often don't specify Cairo explicitly
Solution Strategy:
- Parse street addresses to extract city names (Alexandria from "Chatby, Alexandria")
- Default national libraries/museums/archives to Cairo (EG-C)
- Use Wikidata location data as fallback
- Allow "00-XXX" for institutions with unknown precise location
Usage:
python scripts/generate_ghcids_egypt.py
"""
import json
import re
import sys
import yaml
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from glam_extractor.identifiers.ghcid import (
GHCIDGenerator,
GHCIDComponents,
InstitutionType,
extract_abbreviation_from_name,
)
from glam_extractor.geocoding.geonames_lookup import GeoNamesDB
class EgyptRegionMapper:
    """Translate Egyptian governorate names into ISO 3166-2 subdivision codes."""

    def __init__(self):
        """Load the ISO 3166-2 reference table for Egypt."""
        self.reference_dir = Path(__file__).parent.parent / "data" / "reference"
        # Reversed table: normalized governorate name -> subdivision code.
        self.eg_mapping = self._load_mapping("iso_3166_2_eg.json", reverse=True)
        # Well-known cities whose governorate code can be inferred directly
        # when the name is not itself a governorate.
        self.city_to_governorate = {
            'CAIRO': 'C',
            'ALEXANDRIA': 'ALX',
            'GIZA': 'GZ',
            'LUXOR': 'LX',
            'ASWAN': 'ASN',
            'PORT SAID': 'PTS',
            'SUEZ': 'SUZ',
        }

    def _load_mapping(self, filename: str, reverse: bool = False) -> Dict[str, str]:
        """Read an ISO 3166-2 mapping file from data/reference/.

        Args:
            filename: JSON file name inside data/reference/
            reverse: if True, build a normalized-name -> code table
                instead of returning the raw code -> name table
        """
        source = self.reference_dir / filename
        with open(source, 'r', encoding='utf-8') as handle:
            table = json.load(handle)
        if not reverse:
            return table
        # Key by normalized name so lookups tolerate case and accents.
        return {self._normalize_name(label): code for code, label in table.items()}

    @staticmethod
    def _normalize_name(name: str) -> str:
        """Return *name* uppercased, accent-stripped, and trimmed for lookup."""
        import unicodedata
        # NFD decomposition separates base letters from combining marks,
        # which are then dropped (category 'Mn').
        decomposed = unicodedata.normalize('NFD', name.upper())
        without_marks = ''.join(
            ch for ch in decomposed if unicodedata.category(ch) != 'Mn'
        )
        return without_marks.strip()

    def get_governorate_code(self, governorate_name: str) -> str:
        """Resolve a governorate (or major city) name to its ISO 3166-2 code.

        Args:
            governorate_name: governorate name (e.g., "Cairo", "Alexandria")

        Returns:
            Subdivision code such as "C" or "ALX"; "00" (national-level
            fallback) when the name is unknown.
        """
        key = self._normalize_name(governorate_name)
        direct = self.eg_mapping.get(key)
        if direct is not None:
            return direct
        inferred = self.city_to_governorate.get(key)
        if inferred is not None:
            return inferred
        return "00"
class EgyptLocationInference:
    """Infer city/governorate data for institutions with sparse locations."""

    # Name keywords that mark a national institution (assumed to be in Cairo).
    NATIONAL_KEYWORDS = [
        'national', 'egyptian', 'egypt', 'dar al-kutub', 'dar al-mahfuzat',
        'grand egyptian museum', 'egyptian museum cairo'
    ]

    # Regexes that pull a city name out of a street address.
    CITY_PATTERNS = [
        r',\s*([A-Za-z\s]+)\s+\d{4,}', # ", Alexandria 21526"
        r',\s*([A-Za-z\s]+),\s*Egypt', # ", Cairo, Egypt"
        r'\b([A-Za-z\s]+)\s+\d{4,}\s*,?\s*Egypt', # "Alexandria 21526, Egypt"
    ]

    @classmethod
    def infer_location(cls, record: dict) -> Tuple[Optional[str], Optional[str]]:
        """Infer city and governorate from an institution record.

        Args:
            record: institution record (dict)

        Returns:
            ``(city_name, governorate_name)``; either element may be None.
        """
        entries = record.get('locations', [])
        lowered_name = record.get('name', '').lower()

        # Strategy 1: trust explicit location data when it looks like a city.
        if entries:
            primary = entries[0]
            declared_city = primary.get('city')
            if declared_city and not cls._is_landmark(declared_city):
                return declared_city, cls._infer_governorate_from_city(declared_city)
            # The 'city' field was missing or a landmark; mine the address.
            street = primary.get('street_address', '')
            if street:
                parsed_city = cls._extract_city_from_address(street)
                if parsed_city:
                    return parsed_city, cls._infer_governorate_from_city(parsed_city)

        # Strategy 2: national institutions default to Cairo.
        if any(term in lowered_name for term in cls.NATIONAL_KEYWORDS):
            return 'Cairo', 'Cairo'

        # Strategy 3: nothing usable.
        return None, None

    @staticmethod
    def _is_landmark(city_name: str) -> bool:
        """Return True when the 'city' field is really a landmark/street."""
        return city_name.lower() in {
            'nile corniche', 'tahrir square', 'chatby',
            'downtown', 'zamalek', 'garden city',
        }

    @classmethod
    def _extract_city_from_address(cls, address: str) -> Optional[str]:
        """Extract a city name from a street address, or None."""
        for pattern in cls.CITY_PATTERNS:
            hit = re.search(pattern, address, re.IGNORECASE)
            if not hit:
                continue
            candidate = hit.group(1).strip()
            # Discard matches that are the country name or a bare number.
            if candidate.lower() in ('egypt', 'eg') or candidate.isdigit():
                continue
            return candidate
        return None

    @staticmethod
    def _infer_governorate_from_city(city_name: str) -> Optional[str]:
        """Map a major city name to its governorate, or None if unknown."""
        haystack = city_name.upper()
        # Major cities whose names match their governorate.
        known_pairs = (
            ('CAIRO', 'Cairo'),
            ('ALEXANDRIA', 'Alexandria'),
            ('GIZA', 'Giza'),
            ('LUXOR', 'Luxor'),
            ('ASWAN', 'Aswan'),
            ('PORT SAID', 'Port Said'),
            ('SUEZ', 'Suez'),
        )
        return next(
            (governorate for key, governorate in known_pairs if key in haystack),
            None,
        )
class EgyptGHCIDGenerator:
    """Generate GHCIDs for Egyptian institutions.

    Orchestrates location inference, ISO 3166-2 governorate mapping,
    GeoNames city-code lookup, and collision tracking, while accumulating
    per-run statistics for the summary report.
    """

    def __init__(self):
        """Initialize generator with dependencies."""
        self.ghcid_gen = GHCIDGenerator()
        self.region_mapper = EgyptRegionMapper()
        self.geonames_db = GeoNamesDB()
        # Statistics
        # Counters shown by print_statistics(); 'errors' collects
        # human-readable messages instead of raising, so one bad record
        # cannot abort the whole run.
        self.stats = {
            'total_institutions': 0,
            'ghcids_generated': 0,
            'location_inferred': 0,
            'defaulted_to_cairo': 0,
            'missing_city_code': 0,
            'missing_governorate_code': 0,
            'collisions_detected': 0,
            'errors': [],
        }
        # Collision detection
        self.ghcid_usage: Dict[str, List[str]] = defaultdict(list)  # GHCID -> [institution_names]

    @staticmethod
    def _get_city_code_fallback(city_name: str) -> str:
        """
        Generate 3-letter city code from city name.

        Used when the GeoNames lookup fails.

        Args:
            city_name: City name (e.g., "Cairo", "Alexandria")

        Returns:
            3-letter uppercase code (e.g., "CAI", "ALE")
        """
        import unicodedata
        # Remove accents (NFD decomposition, then drop combining marks)
        normalized = unicodedata.normalize('NFD', city_name)
        normalized = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')
        # Split into words
        words = normalized.split()
        if len(words) == 1:
            # Single word: take first 3 letters
            code = words[0][:3].upper()
        else:
            # Multi-word: take first letter of each word (up to 3)
            code = ''.join(w[0] for w in words[:3]).upper()
        # Ensure exactly 3 letters; pad short codes with 'X'
        if len(code) < 3:
            code = code.ljust(3, 'X')
        elif len(code) > 3:
            code = code[:3]
        return code

    def generate_for_institution(self, record: dict) -> Optional[GHCIDComponents]:
        """
        Generate GHCID for a single Egyptian institution.

        Updates self.stats and self.ghcid_usage as a side effect.

        Args:
            record: Institution record from YAML (dict)

        Returns:
            GHCIDComponents if successful, None otherwise (the failure
            reason is appended to self.stats['errors'])
        """
        self.stats['total_institutions'] += 1
        try:
            # Extract required fields
            name = record.get('name')
            institution_type_str = record.get('institution_type', 'UNKNOWN')
            if not name:
                self.stats['errors'].append(f"Missing name for record: {record.get('id')}")
                return None
            # Country code (always EG)
            country_code = "EG"
            # Infer location data
            city_name, governorate_name = EgyptLocationInference.infer_location(record)
            if not city_name:
                # Default to Cairo for national institutions
                # NOTE(review): infer_location already returns Cairo for
                # records whose name matches NATIONAL_KEYWORDS, so this
                # branch appears to be a defensive duplicate — confirm.
                if any(keyword in name.lower() for keyword in EgyptLocationInference.NATIONAL_KEYWORDS):
                    city_name = "Cairo"
                    governorate_name = "Cairo"
                    self.stats['defaulted_to_cairo'] += 1
                else:
                    self.stats['errors'].append(f"No location data for: {name}")
                    return None
            else:
                if governorate_name:
                    self.stats['location_inferred'] += 1
            # Get governorate code (ISO 3166-2)
            governorate_code = "00"  # Default to national-level
            if governorate_name:
                governorate_code = self.region_mapper.get_governorate_code(governorate_name)
                if governorate_code == "00":
                    self.stats['missing_governorate_code'] += 1
            # Get city code from GeoNames
            city_code = "XXX"  # Default for unknown
            if city_name:
                city_info = self.geonames_db.lookup_city(city_name, country_code)
                if city_info:
                    city_code = city_info.get_abbreviation()
                else:
                    self.stats['missing_city_code'] += 1
                    # Fallback: use first 3 letters of city name
                    city_code = self._get_city_code_fallback(city_name)
            # Map institution type to GHCID type code; unrecognized
            # type strings fall back to UNKNOWN rather than failing.
            try:
                inst_type = InstitutionType[institution_type_str]
            except KeyError:
                inst_type = InstitutionType.UNKNOWN
            # Generate abbreviation from name
            abbreviation = extract_abbreviation_from_name(name)
            # Create GHCID components
            components = GHCIDComponents(
                country_code=country_code,
                region_code=governorate_code,
                city_locode=city_code,
                institution_type=inst_type.value,
                abbreviation=abbreviation,
            )
            # Validate
            is_valid, error_msg = components.validate()
            if not is_valid:
                self.stats['errors'].append(f"Invalid GHCID for {name}: {error_msg}")
                return None
            # Check for collisions (before Q-number)
            # NOTE(review): only the second-and-later holders of a base
            # GHCID increment the collision counter; the first holder is
            # not retroactively flagged.
            base_ghcid = components.to_string()
            self.ghcid_usage[base_ghcid].append(name)
            if len(self.ghcid_usage[base_ghcid]) > 1:
                self.stats['collisions_detected'] += 1
            self.stats['ghcids_generated'] += 1
            return components
        except Exception as e:
            # Catch-all so a malformed record degrades to an error entry.
            self.stats['errors'].append(f"Error generating GHCID for {record.get('name', 'unknown')}: {e}")
            return None

    def process_all_institutions(self, input_file: Path) -> List[dict]:
        """
        Process all institutions in YAML file and generate GHCIDs.

        Args:
            input_file: Path to Egyptian institutions YAML file

        Returns:
            List of updated institution records with GHCID fields
        """
        print(f"Loading Egyptian institutions from: {input_file}")
        with open(input_file, 'r', encoding='utf-8') as f:
            institutions = yaml.safe_load(f)
        print(f"Found {len(institutions)} institutions")
        print()
        updated_institutions = []
        for i, record in enumerate(institutions, 1):
            print(f"Processing {i}/{len(institutions)}: {record.get('name', 'unknown')}")
            # Generate GHCID
            ghcid_components = self.generate_for_institution(record)
            if ghcid_components:
                # Check for Wikidata Q-number (for collision resolution)
                wikidata_qid = None
                identifiers = record.get('identifiers', [])
                for identifier in identifiers:
                    if identifier.get('identifier_scheme') == 'Wikidata':
                        wikidata_qid = identifier.get('identifier_value')
                        break
                # If collision exists and we have Q-number, append it
                # NOTE(review): only records processed AFTER the collision
                # appears get the Q-suffix; the first holder keeps the
                # bare base GHCID — confirm this is intended.
                base_ghcid = ghcid_components.to_string()
                if len(self.ghcid_usage[base_ghcid]) > 1 and wikidata_qid:
                    # Append Q-number for disambiguation
                    ghcid_with_q = f"{base_ghcid}-{wikidata_qid}"
                    record['ghcid'] = ghcid_with_q
                    print(f" → Collision detected, using GHCID with Q-number: {ghcid_with_q}")
                else:
                    record['ghcid'] = base_ghcid
                    print(f" → GHCID: {base_ghcid}")
                # Add UUID v5 (SHA-1) - PRIMARY identifier
                record['ghcid_uuid'] = str(ghcid_components.to_uuid())
                # Add UUID v8 (SHA-256) - Secondary identifier
                record['ghcid_uuid_sha256'] = str(ghcid_components.to_uuid_sha256())
                # Add numeric identifier
                record['ghcid_numeric'] = ghcid_components.to_numeric()
                # Add GHCID to identifiers list (idempotent: skip if present)
                has_ghcid = any(i.get('identifier_scheme') == 'GHCID' for i in identifiers)
                if not has_ghcid:
                    identifiers.append({
                        'identifier_scheme': 'GHCID',
                        'identifier_value': record['ghcid'],
                    })
                # Re-attach in case the record had no 'identifiers' key.
                record['identifiers'] = identifiers
                # Update provenance with GHCID generation metadata
                provenance = record.get('provenance', {})
                provenance['ghcid_generation'] = {
                    'generated_date': datetime.now(timezone.utc).isoformat(),
                    'generation_method': 'EgyptGHCIDGenerator with location inference',
                    'base_ghcid': base_ghcid,
                    'has_wikidata_disambiguation': wikidata_qid is not None,
                }
                record['provenance'] = provenance
            # Every record is kept in the output, with or without a GHCID.
            updated_institutions.append(record)
        return updated_institutions

    def print_statistics(self) -> None:
        """Print generation statistics accumulated during processing."""
        print()
        print("=" * 70)
        print("EGYPT GHCID GENERATION STATISTICS")
        print("=" * 70)
        print(f"Total institutions processed: {self.stats['total_institutions']}")
        print(f"GHCIDs successfully generated: {self.stats['ghcids_generated']}")
        print(f"Locations inferred from data: {self.stats['location_inferred']}")
        print(f"Defaulted to Cairo (national inst): {self.stats['defaulted_to_cairo']}")
        print(f"Missing city codes (used fallback): {self.stats['missing_city_code']}")
        print(f"Missing governorate codes ('00'): {self.stats['missing_governorate_code']}")
        print(f"GHCID collisions detected: {self.stats['collisions_detected']}")
        print()
        if self.stats['errors']:
            print(f"⚠️ Errors encountered: {len(self.stats['errors'])}")
            print()
            print("Error details:")
            for error in self.stats['errors']:
                print(f" - {error}")
        else:
            print("✅ No errors!")
        print()
        # Show collisions
        if self.stats['collisions_detected'] > 0:
            print("⚠️ GHCID COLLISIONS DETECTED:")
            print()
            for ghcid, names in self.ghcid_usage.items():
                if len(names) > 1:
                    print(f" {ghcid}:")
                    for name in names:
                        print(f" - {name}")
            print()
            print("Note: Collisions resolved with Wikidata Q-numbers where available")
        else:
            print("✅ No GHCID collisions detected!")
        print()

    def validate_ghcids(self, institutions: List[dict]) -> None:
        """
        Validate all generated GHCIDs.

        Checks uniqueness of all four identifier forms and prints a
        report; duplicates of the string GHCID are listed explicitly.

        Args:
            institutions: List of institution records
        """
        print("=" * 70)
        print("VALIDATION")
        print("=" * 70)
        ghcid_set = set()
        numeric_set = set()
        uuid_v5_set = set()
        uuid_v8_set = set()
        duplicates = []
        for record in institutions:
            ghcid = record.get('ghcid')
            ghcid_numeric = record.get('ghcid_numeric')
            ghcid_uuid = record.get('ghcid_uuid')
            ghcid_uuid_sha256 = record.get('ghcid_uuid_sha256')
            if ghcid:
                # Seen before -> record as duplicate before adding.
                if ghcid in ghcid_set:
                    duplicates.append(ghcid)
                ghcid_set.add(ghcid)
            if ghcid_numeric:
                numeric_set.add(ghcid_numeric)
            if ghcid_uuid:
                uuid_v5_set.add(ghcid_uuid)
            if ghcid_uuid_sha256:
                uuid_v8_set.add(ghcid_uuid_sha256)
        print(f"Unique GHCIDs (with Q-numbers): {len(ghcid_set)}")
        print(f"Unique numeric GHCIDs: {len(numeric_set)}")
        print(f"Unique UUID v5 (SHA-1) identifiers: {len(uuid_v5_set)}")
        print(f"Unique UUID v8 (SHA-256) ids: {len(uuid_v8_set)}")
        if duplicates:
            print(f"⚠️ Duplicate GHCIDs found: {len(duplicates)}")
            for dup in duplicates:
                print(f" - {dup}")
        else:
            print("✅ All GHCIDs are unique!")
        print()
def main():
    """Main entry point.

    Backs up the Wikidata/VIAF-enriched Egyptian institutions file,
    generates GHCIDs for every record, prints statistics and validation
    results, and writes the enriched YAML (with a commented header) to a
    new output file.

    Exits with status 1 if the input file does not exist.
    """
    # Local import: shutil is only needed for the one-off backup copy.
    import shutil

    # Paths
    project_root = Path(__file__).parent.parent
    input_file = project_root / "data" / "instances" / "egypt_institutions_wikidata_viaf.yaml"
    output_file = project_root / "data" / "instances" / "egypt_institutions_ghcid.yaml"
    backup_file = project_root / "data" / "instances" / "archive" / f"egypt_institutions_pre_ghcid_{datetime.now().strftime('%Y%m%d_%H%M%S')}.yaml"

    # Fail fast with a clear message instead of an opaque traceback
    # from shutil.copy when the input file is missing.
    if not input_file.exists():
        print(f"ERROR: input file not found: {input_file}", file=sys.stderr)
        sys.exit(1)

    # Create backup before any processing so the source data is never lost.
    print(f"Creating backup: {backup_file}")
    backup_file.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy(input_file, backup_file)
    print()

    # Generate GHCIDs
    generator = EgyptGHCIDGenerator()
    updated_institutions = generator.process_all_institutions(input_file)

    # Print statistics
    generator.print_statistics()

    # Validate uniqueness of all identifier forms
    generator.validate_ghcids(updated_institutions)

    # Write updated YAML
    print("=" * 70)
    print(f"Writing updated YAML to: {output_file}")
    # Add header comment (YAML comments, written verbatim before the dump)
    header = f"""---
# Egyptian GLAM Institutions - GHCID Enhanced
# Last updated: {datetime.now(timezone.utc).isoformat()}
# GHCID generation: {generator.stats['ghcids_generated']}/{generator.stats['total_institutions']} institutions
#
# GHCID Statistics:
# - Total institutions: {generator.stats['total_institutions']}
# - GHCIDs generated: {generator.stats['ghcids_generated']}
# - Locations inferred: {generator.stats['location_inferred']}
# - Defaulted to Cairo: {generator.stats['defaulted_to_cairo']}
# - Missing city codes: {generator.stats['missing_city_code']}
# - Missing governorate codes: {generator.stats['missing_governorate_code']}
# - Collisions detected: {generator.stats['collisions_detected']}
#
# Four-Identifier Strategy:
# - ghcid: Base GHCID string (with Q-number for collisions)
# - ghcid_uuid: UUID v5 (SHA-1) - PRIMARY persistent identifier
# - ghcid_uuid_sha256: UUID v8 (SHA-256) - Secondary identifier
# - ghcid_numeric: 64-bit numeric for CSV exports
#
# Location Inference:
# - Extracted cities from street addresses (e.g., "Chatby, Alexandria")
# - Defaulted national institutions to Cairo
# - Used fallback city codes when GeoNames lookup failed
"""
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(header)
        yaml.dump(updated_institutions, f,
                  default_flow_style=False,
                  allow_unicode=True,
                  sort_keys=False,
                  width=100)
    print(f"✅ Done! Updated {len(updated_institutions)} institutions")
    print(f"✅ Output file: {output_file}")
    print()


if __name__ == "__main__":
    main()