#!/usr/bin/env python3 """ Create custodian files from CH-Annotator data for unmatched institutions. This script: 1. Loads CH-Annotator files from data/instances/*_ch_annotator.yaml 2. Checks which institutions don't have custodian files yet 3. Generates GHCID for each new institution 4. Creates custodian files in data/custodian/ Usage: python scripts/create_custodian_from_ch_annotator.py [--dry-run] [--limit N] """ import os import sys import yaml import json import re import uuid import hashlib import argparse from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional, Tuple, Any # Paths PROJECT_ROOT = Path(__file__).parent.parent CH_ANNOTATOR_DIR = PROJECT_ROOT / "data" / "instances" CUSTODIAN_DIR = PROJECT_ROOT / "data" / "custodian" REPORTS_DIR = PROJECT_ROOT / "reports" INDEX_FILE = Path("/tmp/custodian_index.json") # GHCID namespace UUID for deterministic UUID generation GHCID_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8') # URL namespace # Institution type to GHCID code mapping TYPE_TO_CODE = { 'GALLERY': 'G', 'LIBRARY': 'L', 'ARCHIVE': 'A', 'MUSEUM': 'M', 'OFFICIAL_INSTITUTION': 'O', 'RESEARCH_CENTER': 'R', 'CORPORATION': 'C', 'UNKNOWN': 'U', 'BOTANICAL_ZOO': 'B', 'EDUCATION_PROVIDER': 'E', 'COLLECTING_SOCIETY': 'S', 'FEATURES': 'F', 'INTANGIBLE_HERITAGE_GROUP': 'I', 'MIXED': 'X', 'PERSONAL_COLLECTION': 'P', 'HOLY_SITES': 'H', 'DIGITAL_PLATFORM': 'D', 'NGO': 'N', 'TASTE_SMELL': 'T', } # Prepositions/articles to skip in abbreviations SKIP_WORDS = { 'de', 'het', 'een', 'van', 'voor', 'in', 'op', 'te', 'den', 'der', 'des', 'a', 'an', 'the', 'of', 'at', 'on', 'to', 'for', 'with', 'from', 'by', 'le', 'la', 'les', 'un', 'une', 'des', 'du', 'à', 'au', 'aux', 'en', 'der', 'die', 'das', 'dem', 'ein', 'eine', 'von', 'zu', 'für', 'mit', 'el', 'la', 'los', 'las', 'un', 'una', 'del', 'al', 'con', 'por', 'para', 'o', 'os', 'as', 'um', 'uma', 'do', 'da', 'dos', 'das', 'em', 'no', 'na', 'il', 'lo', 'i', 'gli', 
def normalize_name(name: str) -> str:
    """Lower-case *name*, drop punctuation and collapse whitespace for comparison."""
    if not name:
        return ""
    folded = re.sub(r'[^\w\s]', '', name.lower())
    return re.sub(r'\s+', ' ', folded).strip()


def normalize_wikidata(qid: str) -> str:
    """Normalize a Wikidata ID: accepts a bare QID or a full entity URL."""
    if not qid:
        return ""
    text = str(qid)
    if '/' in text:
        # Full URL: the QID is the last path segment.
        text = text.split('/')[-1]
    return text.strip().upper()


def generate_abbreviation(name: str, max_len: int = 10) -> str:
    """Build an initialism from *name*, skipping articles/prepositions.

    Falls back to the first three words when everything was filtered out,
    and to "UNK" when no letters remain at all.
    """
    if not name:
        return "UNK"
    # Replace punctuation by spaces so it acts as a word separator.
    tokens = re.sub(r'[^\w\s]', ' ', name).split()
    kept = [t for t in tokens if t.lower() not in SKIP_WORDS and not t.isdigit()]
    if not kept:
        kept = tokens[:3]  # fallback: first 3 words regardless of filtering
    initials = ''.join(t[0].upper() for t in kept if t)
    return initials[:max_len] if initials else "UNK"


def name_to_snake_case(name: str) -> str:
    """Convert *name* to an ASCII snake_case suffix (at most 50 chars)."""
    import unicodedata

    # Strip diacritics via NFD decomposition (drop combining marks).
    decomposed = unicodedata.normalize('NFD', name)
    ascii_name = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
    # Remove punctuation, map whitespace/hyphens to '_', drop leftovers,
    # then squeeze repeated underscores and trim the edges.
    text = re.sub(r"[''`\",.:;!?()[\]{}]", '', ascii_name.lower())
    text = re.sub(r'[\s\-]+', '_', text)
    text = re.sub(r'[^a-z0-9_]', '', text)
    return re.sub(r'_+', '_', text).strip('_')[:50]


def generate_ghcid(
    country_code: str,
    region_code: str,
    city_code: str,
    institution_type: str,
    abbreviation: str,
    name_suffix: Optional[str] = None
) -> str:
    """Assemble the GHCID string: COUNTRY-REGION-CITY-TYPE-ABBREV[-suffix]."""
    parts = [
        country_code,
        region_code,
        city_code,
        TYPE_TO_CODE.get(institution_type, 'U'),
        abbreviation,
    ]
    if name_suffix:
        parts.append(name_suffix)
    return '-'.join(parts)
def generate_ghcid_uuid(ghcid: str) -> str:
    """Generate a deterministic UUID v5 from the GHCID string."""
    return str(uuid.uuid5(GHCID_NAMESPACE, ghcid))


def generate_ghcid_uuid_sha256(ghcid: str) -> str:
    """Generate a UUID-v8-style identifier (SHA-256 based) from the GHCID string.

    NOTE(review): this format skips hex digit 12 of the digest (the literal
    '8' is followed by digest[13:16]) and never sets the RFC 4122 variant
    bits, so it is not a strictly valid UUID v8.  Kept byte-for-byte as-is
    because already-written custodian files embed IDs in this exact format.
    """
    sha256_hash = hashlib.sha256(ghcid.encode()).hexdigest()
    # Format as UUID v8 (8-4-4-4-12 groups, version nibble forced to '8')
    uuid_str = f"{sha256_hash[:8]}-{sha256_hash[8:12]}-8{sha256_hash[13:16]}-{sha256_hash[16:20]}-{sha256_hash[20:32]}"
    return uuid_str


def generate_ghcid_numeric(ghcid: str) -> int:
    """Generate a 64-bit numeric ID: first 8 bytes of SHA-256(ghcid), big-endian."""
    sha256_hash = hashlib.sha256(ghcid.encode()).digest()
    return int.from_bytes(sha256_hash[:8], 'big')


def load_custodian_index() -> Dict:
    """Load the cached custodian index, or build it by scanning CUSTODIAN_DIR.

    The index maps Wikidata QIDs, normalized names and GHCIDs to custodian
    file paths.  ('by_isil' is reserved but never populated here.)
    The scan is best-effort: unreadable files are skipped.
    """
    if INDEX_FILE.exists():
        with open(INDEX_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)

    # Build index
    print("Building custodian index...")
    index = {'by_wikidata': {}, 'by_name': {}, 'by_isil': {}, 'by_ghcid': {}}
    for f in CUSTODIAN_DIR.glob("*.yaml"):
        try:
            # Custodian files are written with utf-8 elsewhere in this script,
            # so read them the same way (the old code used the platform codec).
            with open(f, 'r', encoding='utf-8') as fh:
                content = fh.read()
            # The GHCID doubles as the filename stem.
            ghcid = f.stem
            index['by_ghcid'][ghcid] = str(f)
            # Extract Wikidata QID (cheap regex scan; avoids a full YAML parse)
            match = re.search(r'wikidata_entity_id:\s*["\']?(Q\d+)', content)
            if match:
                index['by_wikidata'][match.group(1).upper()] = str(f)
            # Extract organisation name
            match = re.search(r'organisatie:\s*(.+?)$', content, re.MULTILINE)
            if match:
                name = match.group(1).strip().strip('"\'')
                index['by_name'][normalize_name(name)] = str(f)
        except Exception:
            # Best-effort: skip broken/unreadable files.  (Was a bare
            # `except:`, which also swallowed KeyboardInterrupt/SystemExit.)
            pass
    with open(INDEX_FILE, 'w') as f:
        json.dump(index, f)
    return index


def institution_exists(inst: Dict, index: Dict) -> bool:
    """Return True if *inst* already has a custodian file (by Wikidata QID or name)."""
    # Check Wikidata identifiers first: strongest match
    for ident in inst.get('identifiers', []):
        if ident.get('identifier_scheme', '').upper() == 'WIKIDATA':
            qid = normalize_wikidata(ident.get('identifier_value', ''))
            if qid and qid in index['by_wikidata']:
                return True
    # Fall back to a normalized-name match
    name = normalize_name(inst.get('name', ''))
    if name and name in index['by_name']:
        return True
    return False
def sanitize_code(code: str, max_len: int = 2) -> str:
    """Sanitize a code for use in filenames and GHCIDs.

    - Removes diacritics
    - Keeps only alphanumeric chars
    - Converts to uppercase
    - Truncates to max_len

    Falls back to "XX" (max_len == 2) or "XXX" when nothing usable remains.
    """
    import unicodedata
    if not code:
        return "XX" if max_len == 2 else "XXX"
    # Normalize unicode and remove diacritics (combining marks)
    normalized = unicodedata.normalize('NFD', str(code))
    ascii_only = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')
    # Keep only alphanumeric
    clean = re.sub(r'[^a-zA-Z0-9]', '', ascii_only)
    if not clean:
        return "XX" if max_len == 2 else "XXX"
    return clean[:max_len].upper()


def extract_location_info(inst: Dict) -> Tuple[str, str, str]:
    """Extract (country, region, city) codes from the first location of *inst*.

    Returns placeholder codes ("XX", "XX", "XXX") for missing pieces.
    NOTE(review): the country code is passed through unsanitized, unlike
    region/city — confirm upstream always supplies ISO-style 2-letter codes.
    """
    locations = inst.get('locations', [])
    country_code = "XX"
    region_code = "XX"
    city_code = "XXX"
    if locations:
        loc = locations[0]
        country_code = loc.get('country', 'XX') or 'XX'
        # Region: if it's a 2-letter code, use it; otherwise derive 2 letters
        region_raw = loc.get('region', 'XX') or 'XX'
        if len(region_raw) == 2 and region_raw.isalpha():
            region_code = region_raw.upper()
        else:
            region_code = sanitize_code(region_raw, 2)
        # City: generate 3-letter code
        city = loc.get('city', '')
        if city:
            city_code = sanitize_code(city, 3)
    return country_code, region_code, city_code


def create_custodian_file(inst: Dict, source_file: str, index: Dict) -> Tuple[Optional[Path], str]:
    """
    Create a custodian file for an institution.

    Args:
        inst: institution record from a CH-Annotator file.
        source_file: name of the CH-Annotator file (recorded in provenance).
        index: custodian index; updated in place on success.

    Returns:
        (file_path, status) where status is 'created', 'exists', or 'error'
    """
    try:
        name = inst.get('name', 'Unknown Institution')
        institution_type = inst.get('institution_type', 'UNKNOWN')

        # Extract location
        country_code, region_code, city_code = extract_location_info(inst)

        # Generate abbreviation
        abbreviation = generate_abbreviation(name)

        # Generate base GHCID
        base_ghcid = generate_ghcid(country_code, region_code, city_code,
                                    institution_type, abbreviation)

        # Check for collision
        ghcid = base_ghcid
        if ghcid in index['by_ghcid']:
            # Add name suffix to resolve collision
            name_suffix = name_to_snake_case(name)
            ghcid = generate_ghcid(country_code, region_code, city_code,
                                   institution_type, abbreviation, name_suffix)
            # FIX: if even the suffixed GHCID is taken, report 'exists'
            # instead of silently overwriting the existing custodian file.
            if ghcid in index['by_ghcid']:
                return Path(index['by_ghcid'][ghcid]), 'exists'

        # Generate UUIDs
        ghcid_uuid = generate_ghcid_uuid(ghcid)
        ghcid_uuid_sha256 = generate_ghcid_uuid_sha256(ghcid)
        ghcid_numeric = generate_ghcid_numeric(ghcid)
        record_id = str(uuid.uuid4())
        timestamp = datetime.now(timezone.utc).isoformat()

        # Build custodian data structure
        custodian_data = {
            'original_entry': {
                'name': name,
                'institution_type': institution_type,
                'source': f'CH-Annotator ({source_file})',
                'identifiers': inst.get('identifiers', []),
                'locations': inst.get('locations', []),
            },
            'processing_timestamp': timestamp,
            'ghcid': {
                'ghcid_current': ghcid,
                'ghcid_original': ghcid,
                'ghcid_uuid': ghcid_uuid,
                'ghcid_uuid_sha256': ghcid_uuid_sha256,
                'ghcid_numeric': ghcid_numeric,
                'record_id': record_id,
                'generation_timestamp': timestamp,
                'location_resolution': {
                    'country_code': country_code,
                    'region_code': region_code,
                    'city_code': city_code,
                    'method': 'CH_ANNOTATOR_SOURCE',
                },
                'ghcid_history': [{
                    'ghcid': ghcid,
                    'ghcid_numeric': ghcid_numeric,
                    'valid_from': timestamp,
                    'reason': f'Initial GHCID from CH-Annotator ({source_file})',
                }],
            },
            'custodian_name': {
                'claim_type': 'custodian_name',
                'claim_value': name,
                'source_type': 'ch_annotator',
            },
            'identifiers': [
                {'identifier_scheme': 'GHCID', 'identifier_value': ghcid},
                {'identifier_scheme': 'GHCID_UUID', 'identifier_value': ghcid_uuid},
                {'identifier_scheme': 'GHCID_UUID_SHA256', 'identifier_value': ghcid_uuid_sha256},
                {'identifier_scheme': 'GHCID_NUMERIC', 'identifier_value': str(ghcid_numeric)},
                {'identifier_scheme': 'RECORD_ID', 'identifier_value': record_id},
            ],
            'provenance': {
                'data_source': inst.get('provenance', {}).get('data_source', 'CH_ANNOTATOR'),
                'data_tier': inst.get('provenance', {}).get('data_tier', 'TIER_3_CROWD_SOURCED'),
                'extraction_date': inst.get('provenance', {}).get('extraction_date', timestamp),
                'extraction_method': f'Created from CH-Annotator file: {source_file}',
                'confidence_score': inst.get('provenance', {}).get('confidence_score', 0.8),
            },
            'ch_annotator': inst.get('ch_annotator', {}),
        }

        # Carry over the institution's own identifiers, minus the
        # GHCID-family schemes we just generated ourselves.
        for ident in inst.get('identifiers', []):
            scheme = ident.get('identifier_scheme', '').upper()
            if scheme not in ['GHCID', 'GHCID_UUID', 'GHCID_UUID_SHA256',
                              'GHCID_NUMERIC', 'RECORD_ID']:
                custodian_data['identifiers'].append(ident)

        # Add Wikidata enrichment if available (first WIKIDATA identifier wins)
        for ident in inst.get('identifiers', []):
            if ident.get('identifier_scheme', '').upper() == 'WIKIDATA':
                custodian_data['wikidata_enrichment'] = {
                    'wikidata_entity_id': ident.get('identifier_value', '').split('/')[-1],
                    'wikidata_label_en': name,
                }
                break

        # Add integration note to ch_annotator
        if 'ch_annotator' in custodian_data and custodian_data['ch_annotator']:
            custodian_data['ch_annotator']['integration_note'] = {
                'created_from': source_file,
                'creation_date': timestamp,
                'creation_method': 'create_custodian_from_ch_annotator.py',
            }

        # Create file (GHCID doubles as the filename stem)
        file_path = CUSTODIAN_DIR / f"{ghcid}.yaml"
        with open(file_path, 'w', encoding='utf-8') as f:
            yaml.dump(custodian_data, f, allow_unicode=True,
                      default_flow_style=False, sort_keys=False, width=120)

        # Update index so later institutions in this run see the new file
        index['by_ghcid'][ghcid] = str(file_path)
        if normalize_name(name):
            index['by_name'][normalize_name(name)] = str(file_path)

        return file_path, 'created'

    except Exception as e:
        # Deliberate catch-all: one bad record must not abort the batch run;
        # the caller counts these via the 'error: ...' status string.
        return None, f'error: {e}'
def load_ch_annotator_file(path: Path) -> List[Dict]:
    """Load institutions from a CH-Annotator YAML file.

    Accepts either a bare list of institutions or a mapping with an
    'institutions' key; anything else yields an empty list.
    """
    with open(path, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    if isinstance(data, list):
        return data
    elif isinstance(data, dict):
        return data.get('institutions', [])
    return []


def main():
    """Scan CH-Annotator files, create custodian files for unmatched
    institutions, print a summary and (unless --dry-run) write a report."""
    parser = argparse.ArgumentParser(description='Create custodian files from CH-Annotator data')
    parser.add_argument('--dry-run', action='store_true', help='Preview without creating files')
    parser.add_argument('--limit', type=int, default=0, help='Limit institutions per file (0=unlimited)')
    parser.add_argument('--skip-large', action='store_true', help='Skip files with >5000 institutions')
    args = parser.parse_args()

    print("=" * 60)
    print("Create Custodian Files from CH-Annotator Data")
    print("=" * 60)
    if args.dry_run:
        print("DRY RUN MODE - No files will be created")

    # Load index
    print("\n1. Loading custodian index...")
    index = load_custodian_index()
    print(f"   Indexed: {len(index.get('by_ghcid', {}))} GHCIDs, "
          f"{len(index.get('by_wikidata', {}))} Wikidata, "
          f"{len(index.get('by_name', {}))} names")

    # Find CH-Annotator files
    ch_files = sorted(CH_ANNOTATOR_DIR.glob("*_ch_annotator.yaml"))
    print(f"\n2. Found {len(ch_files)} CH-Annotator files")

    # Process files
    total_stats = {
        'processed': 0,
        'created': 0,
        'skipped_exists': 0,
        'errors': 0,
        'by_source': {},
    }

    for ch_file in ch_files:
        print(f"\n--- {ch_file.name} ---")
        try:
            institutions = load_ch_annotator_file(ch_file)
            print(f"   Loaded {len(institutions)} institutions")

            if args.skip_large and len(institutions) > 5000:
                print("   SKIPPING (>5000 institutions)")
                continue

            file_stats = {'processed': 0, 'created': 0, 'skipped': 0, 'errors': 0}

            for i, inst in enumerate(institutions):
                if args.limit and file_stats['processed'] >= args.limit:
                    print(f"   Reached limit of {args.limit}")
                    break
                # Lightweight progress indicator for large files
                if i % 500 == 0 and i > 0:
                    print(f"   Progress: {i}/{len(institutions)}, created: {file_stats['created']}")

                file_stats['processed'] += 1
                total_stats['processed'] += 1

                # Check if exists
                if institution_exists(inst, index):
                    file_stats['skipped'] += 1
                    total_stats['skipped_exists'] += 1
                    continue

                # Create file (in dry-run mode just count what would be created)
                if not args.dry_run:
                    path, status = create_custodian_file(inst, ch_file.name, index)
                    if status == 'created':
                        file_stats['created'] += 1
                        total_stats['created'] += 1
                    elif 'error' in status:
                        file_stats['errors'] += 1
                        total_stats['errors'] += 1
                else:
                    file_stats['created'] += 1
                    total_stats['created'] += 1

            print(f"   Processed: {file_stats['processed']}, Created: {file_stats['created']}, "
                  f"Skipped: {file_stats['skipped']}, Errors: {file_stats['errors']}")
            total_stats['by_source'][ch_file.name] = file_stats

        except Exception as e:
            # One unreadable source file must not abort the whole run
            print(f"   ERROR: {e}")
            total_stats['errors'] += 1

    # Print summary
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"Total processed: {total_stats['processed']}")
    print(f"Files created: {total_stats['created']}")
    print(f"Skipped (already exist): {total_stats['skipped_exists']}")
    print(f"Errors: {total_stats['errors']}")

    # Save report
    if not args.dry_run:
        # FIX: parents=True so a missing intermediate directory doesn't crash
        REPORTS_DIR.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_path = REPORTS_DIR / f"CUSTODIAN_CREATION_REPORT_{timestamp}.md"
        report = f"""# Custodian File Creation Report

Generated: {datetime.now(timezone.utc).isoformat()}

## Summary

| Metric | Count |
|--------|-------|
| Institutions processed | {total_stats['processed']} |
| Custodian files created | {total_stats['created']} |
| Skipped (already exist) | {total_stats['skipped_exists']} |
| Errors | {total_stats['errors']} |

## By Source File

| Source File | Processed | Created | Skipped | Errors |
|-------------|-----------|---------|---------|--------|
"""
        for source, stats in total_stats['by_source'].items():
            report += f"| {source} | {stats['processed']} | {stats['created']} | {stats['skipped']} | {stats['errors']} |\n"
        with open(report_path, 'w') as f:
            f.write(report)
        print(f"\nReport saved to: {report_path}")

    return 0


if __name__ == '__main__':
    sys.exit(main())