#!/usr/bin/env python3
"""
Fix remaining non-ASCII GHCID files:
1. Bulgarian files with Cyrillic - add year suffixes for collision resolution
2. Swiss files with parentheses - fix city codes

Per AGENTS.md:
- Rule about diacritics normalization
- Rule about special characters in abbreviations
- Collision resolution via year suffix
"""

import os
import re
import uuid
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Tuple

import yaml


# Directory scanned by main() when no explicit directory is passed.
DEFAULT_CUSTODIAN_DIR = Path('/Users/kempersc/apps/glam/data/custodian')

# Namespace used for the UUID v5 identifiers. The value is identical to the
# RFC 4122 DNS namespace (uuid.NAMESPACE_DNS); it is kept as a literal so the
# generated identifiers can never drift.
HERITAGE_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')

# Swiss GHCID layout: CH-{region}-{city}-{type}-{abbrev}
# E.g., CH-BE-G(-A-SCCMMA or CH-GE-V(--L-BLUGUAAO
# The city part may itself contain '-' or '(' so it is matched greedily and
# the single-letter institution type is used as the anchor.
_SWISS_GHCID_RE = re.compile(
    r'^([A-Z]{2})-([A-Z]{2})-(.+)-([GLAMORCSEFIBXPHDNT])-(.+)$'
)


# Custom YAML handling for anchors
class NoAliasDumper(yaml.SafeDumper):
    """SafeDumper that never emits YAML anchors/aliases for repeated objects."""

    def ignore_aliases(self, data):
        return True


def generate_ghcid_uuid_v5(ghcid: str) -> str:
    """Generate UUID v5 from GHCID string using heritage namespace.

    Args:
        ghcid: The GHCID string to derive the identifier from.

    Returns:
        The canonical string form of ``uuid5(HERITAGE_NAMESPACE, ghcid)``.
    """
    return str(uuid.uuid5(HERITAGE_NAMESPACE, ghcid))


def generate_ghcid_uuid_sha256(ghcid: str) -> str:
    """Generate UUID v8 (SHA-256 based) from GHCID string.

    The first 16 bytes of the SHA-256 digest of the UTF-8 encoded GHCID are
    used, with the version nibble forced to 8 and the variant bits forced to
    ``10``.

    Args:
        ghcid: The GHCID string to derive the identifier from.

    Returns:
        The canonical string form of the resulting UUID.
    """
    raw = bytearray(hashlib.sha256(ghcid.encode('utf-8')).digest()[:16])
    raw[6] = (raw[6] & 0x0F) | 0x80  # Version 8
    raw[8] = (raw[8] & 0x3F) | 0x80  # Variant
    return str(uuid.UUID(bytes=bytes(raw)))


def generate_ghcid_numeric(ghcid: str) -> int:
    """Generate 64-bit numeric ID from GHCID.

    Args:
        ghcid: The GHCID string to derive the identifier from.

    Returns:
        The first 8 bytes of the SHA-256 digest, read as a big-endian
        unsigned integer.
    """
    digest = hashlib.sha256(ghcid.encode('utf-8')).digest()
    return int.from_bytes(digest[:8], byteorder='big')


def _load_record(filepath: str) -> Optional[dict]:
    """Load a custodian YAML file, or return None if it has no current GHCID."""
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    # An empty file loads as None; a record without ghcid.ghcid_current
    # cannot be re-keyed, so report it instead of crashing mid-run.
    ghcid_section = data.get('ghcid') if isinstance(data, dict) else None
    if not isinstance(ghcid_section, dict) or not isinstance(
        ghcid_section.get('ghcid_current'), str
    ):
        print(f" ERROR: No ghcid.ghcid_current found in: {filepath}")
        return None
    return data


def _split_ghcid_prefix(ghcid: str) -> Optional[Tuple[str, str, str, str]]:
    """Return (country, region, city, type) from a GHCID, or None if too short.

    The split is done from the LEFT: the first four components are fixed,
    while the trailing abbreviation may itself carry a '-suffix'
    (e.g. ``BG-11-RAY-L-БПНЧП-55``). Splitting from the right would shift
    every component by one for such IDs.
    """
    parts = ghcid.split('-', 4)
    if len(parts) < 5:
        return None
    country, region, city, inst_type = parts[:4]
    return country, region, city, inst_type


def _apply_new_ghcid(data: dict, new_ghcid: str, reason: str) -> None:
    """Rewrite every GHCID-derived value in *data* for *new_ghcid*.

    Updates the ``ghcid`` section, prepends a history entry carrying
    *reason*, and refreshes matching entries in the ``identifiers`` list.
    """
    new_uuid = generate_ghcid_uuid_v5(new_ghcid)
    new_uuid_sha256 = generate_ghcid_uuid_sha256(new_ghcid)
    new_numeric = generate_ghcid_numeric(new_ghcid)
    timestamp = datetime.now(timezone.utc).isoformat()

    # Update GHCID section
    ghcid_section = data['ghcid']
    ghcid_section['ghcid_current'] = new_ghcid
    ghcid_section['ghcid_uuid'] = new_uuid
    ghcid_section['ghcid_uuid_sha256'] = new_uuid_sha256
    ghcid_section['ghcid_numeric'] = new_numeric

    # Insert the new history entry at the beginning (newest first).
    # NOTE(review): the previously current entry's 'valid_to' is left
    # untouched here - confirm whether the schema expects it to be closed.
    history = ghcid_section.get('ghcid_history')
    if history is None:
        history = ghcid_section['ghcid_history'] = []
    history.insert(0, {
        'ghcid': new_ghcid,
        'valid_from': timestamp,
        'valid_to': None,
        'reason': reason,
    })

    # Update identifiers list
    replacements = {
        'GHCID': new_ghcid,
        'GHCID_UUID': new_uuid,
        'GHCID_UUID_SHA256': new_uuid_sha256,
        'GHCID_NUMERIC': str(new_numeric),
    }
    for ident in data.get('identifiers') or []:
        scheme = ident.get('identifier_scheme')
        if isinstance(scheme, str) and scheme in replacements:
            ident['identifier_value'] = replacements[scheme]


def _write_and_replace(old_filepath: str, new_ghcid: str, data: dict) -> None:
    """Write *data* to ``{new_ghcid}.yaml`` next to the old file, then drop the old file.

    Refuses to overwrite a different, already existing file, and never
    deletes the file it has just written.
    """
    new_filename = f"{new_ghcid}.yaml"
    new_filepath = os.path.join(os.path.dirname(old_filepath), new_filename)

    target_exists = os.path.exists(new_filepath)
    same_file = target_exists and os.path.samefile(new_filepath, old_filepath)
    if target_exists and not same_file:
        # Overwriting would silently destroy another custodian record.
        print(f" ERROR: Target already exists, not overwriting: {new_filename}")
        return

    # Write new file
    with open(new_filepath, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, Dumper=NoAliasDumper, allow_unicode=True,
                  default_flow_style=False, sort_keys=False)
    print(f" Created: {new_filename}")

    # Remove old file (unless the rename was a no-op)
    if not same_file:
        os.remove(old_filepath)
        print(f" Deleted: {os.path.basename(old_filepath)}")


def fix_bulgarian_file(old_filepath: str, year_suffix: str, new_abbrev: str):
    """Fix Bulgarian file with Cyrillic characters and add year suffix.

    Args:
        old_filepath: Path of the existing YAML file.
        year_suffix: Year appended to the GHCID for collision resolution.
        new_abbrev: ASCII abbreviation replacing the Cyrillic one.

    The record is rewritten under ``{new GHCID}.yaml`` in the same directory
    and the old file is removed. Problems are printed and the file is left
    untouched.
    """
    print(f"\n=== Processing: {old_filepath} ===")

    data = _load_record(old_filepath)
    if data is None:
        return

    old_ghcid = data['ghcid']['ghcid_current']

    # Extract components (country-region-city-type-abbrev)
    prefix = _split_ghcid_prefix(old_ghcid)
    if prefix is None:
        print(f" ERROR: Could not parse GHCID: {old_ghcid}")
        return
    country, region, city, inst_type = prefix

    # New GHCID with ASCII abbreviation and year suffix
    new_ghcid = f"{country}-{region}-{city}-{inst_type}-{new_abbrev}-{year_suffix}"
    print(f" Old GHCID: {old_ghcid}")
    print(f" New GHCID: {new_ghcid}")

    _apply_new_ghcid(
        data,
        new_ghcid,
        f'Collision resolution: Added year suffix -{year_suffix} to differentiate from existing GHCID. Cyrillic to ASCII transliteration applied.',
    )
    _write_and_replace(old_filepath, new_ghcid, data)


def fix_swiss_file(old_filepath: str, new_city_code: str):
    """Fix Swiss file with parentheses in city code.

    Args:
        old_filepath: Path of the existing YAML file.
        new_city_code: Replacement city code without parentheses.

    The record is rewritten under ``{new GHCID}.yaml`` in the same directory
    and the old file is removed. Problems are printed and the file is left
    untouched.
    """
    print(f"\n=== Processing: {old_filepath} ===")

    data = _load_record(old_filepath)
    if data is None:
        return

    old_ghcid = data['ghcid']['ghcid_current']

    # Parse old GHCID to get components, anchoring on the institution type
    # (single letter followed by -).
    match = _SWISS_GHCID_RE.match(old_ghcid)
    if match is None:
        print(f" ERROR: Could not parse GHCID: {old_ghcid}")
        return
    country, region, old_city, inst_type, abbrev = match.groups()

    new_ghcid = f"{country}-{region}-{new_city_code}-{inst_type}-{abbrev}"
    print(f" Old GHCID: {old_ghcid}")
    print(f" New GHCID: {new_ghcid}")

    # Update location_resolution city_code
    location = data['ghcid'].get('location_resolution')
    if isinstance(location, dict):
        location['city_code'] = new_city_code

    _apply_new_ghcid(
        data,
        new_ghcid,
        f'Fixed city code: Removed parentheses from city code per ABBREV-CHAR-FILTER rule. Old city code: {old_city}',
    )
    _write_and_replace(old_filepath, new_ghcid, data)


def _is_ascii(text: str) -> bool:
    """Return True when *text* can be encoded as pure ASCII."""
    try:
        text.encode('ascii')
    except UnicodeEncodeError:
        return False
    return True


def main(custodian_dir: Optional[Path] = None):
    """Apply the hard-coded Bulgarian and Swiss fixes, then verify filenames.

    Args:
        custodian_dir: Directory holding the custodian YAML files. Defaults
            to ``DEFAULT_CUSTODIAN_DIR`` when omitted.
    """
    custodian_dir = (
        DEFAULT_CUSTODIAN_DIR if custodian_dir is None else Path(custodian_dir)
    )
    if not custodian_dir.is_dir():
        print(f" ERROR: Custodian directory not found: {custodian_dir}")
        return

    # Bulgarian files with Cyrillic that need year suffixes
    # These collide with existing ASCII files, so need year suffix resolution
    bulgarian_fixes = [
        # (filename, year_suffix, ascii_abbreviation)
        ('BG-11-RAY-L-БПНЧП-55.yaml', '1955', 'BPNCHP'),  # "Просвета 55" - assuming 1955
        ('BG-11-RAY-L-БПНЧИ.yaml', '1930', 'BPNCHI'),  # "Изгрев-1930"
        ('BG-23-TRU-L-БПНЧСГ.yaml', '1906', 'BPNCHSG'),  # "Светлина – 1906 г."
    ]

    # Swiss files with parentheses in city code
    swiss_fixes = [
        # (filename, new_city_code)
        ('CH-BE-G(-A-SCCMMA.yaml', 'GWA'),  # Gwatt (Thun) -> GWA
        ('CH-GE-V(--L-BLUGUAAO.yaml', 'VER'),  # Versoix (Sauverny) -> VER
    ]

    print("=== FIXING BULGARIAN CYRILLIC FILES ===")
    for filename, year_suffix, abbrev in bulgarian_fixes:
        filepath = custodian_dir / filename
        if filepath.exists():
            fix_bulgarian_file(str(filepath), year_suffix, abbrev)
        else:
            print(f" SKIP: {filename} not found")

    print("\n=== FIXING SWISS PARENTHESES FILES ===")
    for filename, new_city_code in swiss_fixes:
        filepath = custodian_dir / filename
        if filepath.exists():
            fix_swiss_file(str(filepath), new_city_code)
        else:
            print(f" SKIP: {filename} not found")

    print("\n=== VERIFICATION ===")
    # Check for any remaining non-ASCII filenames
    remaining = [
        entry.name
        for entry in custodian_dir.iterdir()
        if entry.is_file() and entry.suffix == '.yaml' and not _is_ascii(entry.name)
    ]

    if remaining:
        print(f"WARNING: {len(remaining)} files still have non-ASCII names:")
        for name in remaining:
            print(f" - {name}")
    else:
        print("SUCCESS: All custodian files now have ASCII-only filenames!")


if __name__ == '__main__':
    main()