#!/usr/bin/env python3
"""
Resolve GHCID collisions caused by diacritics normalization.

When a file with diacritics normalizes to the same GHCID as an existing file,
the diacritics file gets a name suffix per AGENTS.md collision rules.

Usage:
    python scripts/resolve_diacritics_collisions.py --dry-run  # Preview changes
    python scripts/resolve_diacritics_collisions.py            # Apply changes

Requires PyYAML only when actually reading/updating entry files
(imported lazily inside resolve_collision).
"""

import argparse
import hashlib
import re
import shutil
import sys
import unicodedata
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

# GHCID namespace UUID for deterministic UUID generation
GHCID_NAMESPACE = uuid.UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")

# Regex pattern for common diacritics (used to spot candidate filenames)
DIACRITICS_PATTERN = re.compile(r'[ČŘŠŽĚŮÁÉÍÓÚÝÄÖÜßÑÇÀÈÌÒÙŁŃŚŹŻĄĘÅØÆŐŰÂÊÎÔÛčřšžěůáéíóúýäöüñçàèìòùłńśźżąęåøæőűâêîôûĎŇŤďňť]')


def normalize_diacritics(text: str) -> str:
    """Normalize diacritics to ASCII equivalents.

    Decomposes via NFD and strips combining marks (Unicode category 'Mn').
    NOTE: characters with no combining-mark decomposition (e.g. ß, ø, æ, Ł)
    pass through unchanged — they are matched by DIACRITICS_PATTERN but
    cannot be folded to ASCII by this function.
    """
    normalized = unicodedata.normalize('NFD', text)
    return ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')


def generate_name_suffix(native_name: str) -> str:
    """Convert a native-language institution name to a snake_case suffix.

    Pipeline: strip diacritics -> lowercase -> drop punctuation ->
    spaces/hyphens to underscores -> drop any remaining non [a-z0-9_] ->
    collapse repeated underscores and trim leading/trailing ones.
    """
    # Normalize unicode (NFD decomposition) and remove diacritics
    normalized = unicodedata.normalize('NFD', native_name)
    ascii_name = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')
    # Convert to lowercase
    lowercase = ascii_name.lower()
    # Remove apostrophes, commas, and other punctuation
    no_punct = re.sub(r"[''`\",.:;!?()[\]{}‒–—]", '', lowercase)
    # Replace spaces and hyphens with underscores
    underscored = re.sub(r'[\s\-]+', '_', no_punct)
    # Remove any remaining non-alphanumeric characters (except underscores)
    clean = re.sub(r'[^a-z0-9_]', '', underscored)
    # Collapse multiple underscores and trim the ends
    return re.sub(r'_+', '_', clean).strip('_')


def generate_uuid_v5(ghcid_string: str) -> str:
    """Generate deterministic UUID v5 from a GHCID string."""
    return str(uuid.uuid5(GHCID_NAMESPACE, ghcid_string))


def generate_uuid_v8_sha256(ghcid_string: str) -> str:
    """Generate UUID v8 from the SHA-256 hash of a GHCID string.

    Takes the first 16 hash bytes, then stamps version 8 into byte 6 and
    the RFC variant (10x) into byte 8.
    """
    sha256_hash = hashlib.sha256(ghcid_string.encode('utf-8')).digest()
    uuid_bytes = bytearray(sha256_hash[:16])
    uuid_bytes[6] = (uuid_bytes[6] & 0x0f) | 0x80  # version 8
    uuid_bytes[8] = (uuid_bytes[8] & 0x3f) | 0x80  # variant 10x
    return str(uuid.UUID(bytes=bytes(uuid_bytes)))


def generate_numeric_id(ghcid_string: str) -> int:
    """Generate a 64-bit numeric ID from the SHA-256 hash (first 8 bytes, big-endian)."""
    sha256_hash = hashlib.sha256(ghcid_string.encode('utf-8')).digest()
    return int.from_bytes(sha256_hash[:8], byteorder='big')


def find_collision_pairs(custodian_dir: Path) -> list[tuple[Path, Path, str]]:
    """Find files with diacritics that collide with existing ASCII files.

    Returns list of (diacritics_file, ascii_file, ascii_ghcid).
    """
    collisions = []
    for yaml_file in custodian_dir.glob("*.yaml"):
        filename = yaml_file.stem  # Without .yaml
        if not DIACRITICS_PATTERN.search(filename):
            continue
        # Normalize to ASCII and see whether that name is already taken
        ascii_filename = normalize_diacritics(filename)
        ascii_file = custodian_dir / f"{ascii_filename}.yaml"
        if ascii_file.exists():
            collisions.append((yaml_file, ascii_file, ascii_filename))
    return collisions


def resolve_collision(diacritics_file: Path, ascii_ghcid: str, dry_run: bool = True) -> Optional[dict]:
    """
    Resolve a collision by adding a name suffix to the diacritics file.

    The diacritics file gets a name suffix since it's being added later.
    Returns a change-summary dict, or None when the entry cannot be resolved.

    In non-dry-run mode the file content is only rewritten after verifying
    that the rename target does not already exist, so an aborted resolution
    never leaves a file whose name and internal GHCID disagree.
    """
    # Lazy import: PyYAML is only needed when entries are actually read/written.
    import yaml

    try:
        with open(diacritics_file, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        print(f" Error reading {diacritics_file}: {e}")
        return None

    if not data:
        return None

    # Get institution name for suffix
    original_entry = data.get('original_entry', {})
    inst_name = original_entry.get('name', '')
    if not inst_name:
        print(f" Warning: No institution name found in {diacritics_file}")
        return None

    # Generate name suffix and the new, disambiguated GHCID
    name_suffix = generate_name_suffix(inst_name)
    new_ghcid = f"{ascii_ghcid}-{name_suffix}"

    # Get old GHCID from file (fall back to the filename stem)
    ghcid_section = data.get('ghcid', {})
    old_ghcid = ghcid_section.get('ghcid_current', diacritics_file.stem)

    # Generate new identifiers derived from the new GHCID
    new_uuid_v5 = generate_uuid_v5(new_ghcid)
    new_uuid_v8 = generate_uuid_v8_sha256(new_ghcid)
    new_numeric = generate_numeric_id(new_ghcid)
    timestamp_now = datetime.now(timezone.utc).isoformat()

    change_info = {
        'file': str(diacritics_file),
        'institution_name': inst_name,
        'old_ghcid': old_ghcid,
        'new_ghcid': new_ghcid,
        'name_suffix': name_suffix,
    }

    if dry_run:
        return change_info

    # Check the rename target BEFORE mutating anything, so we never write a
    # new GHCID into a file we then cannot rename to match.
    new_filename = f"{new_ghcid}.yaml"
    new_file_path = diacritics_file.parent / new_filename
    if new_file_path.exists():
        print(f" Warning: Target file already exists: {new_file_path}")
        return None

    # Update ghcid section
    ghcid_section['ghcid_current'] = new_ghcid
    ghcid_section['ghcid_uuid'] = new_uuid_v5
    ghcid_section['ghcid_uuid_sha256'] = new_uuid_v8
    ghcid_section['ghcid_numeric'] = new_numeric

    # Add history entry; close out the previous head entry if still open
    ghcid_history = ghcid_section.get('ghcid_history', [])
    new_history_entry = {
        'ghcid': new_ghcid,
        'ghcid_numeric': new_numeric,
        'valid_from': timestamp_now,
        'reason': f"Name suffix added to resolve collision with {ascii_ghcid} (was: {old_ghcid})"
    }
    if ghcid_history and 'valid_to' not in ghcid_history[0]:
        ghcid_history[0]['valid_to'] = timestamp_now
    ghcid_section['ghcid_history'] = [new_history_entry] + ghcid_history
    data['ghcid'] = ghcid_section

    # Update identifiers section to match the new GHCID
    identifiers = data.get('identifiers', [])
    for ident in identifiers:
        if ident.get('identifier_scheme') == 'GHCID':
            ident['identifier_value'] = new_ghcid
        elif ident.get('identifier_scheme') == 'GHCID_UUID':
            ident['identifier_value'] = new_uuid_v5
        elif ident.get('identifier_scheme') == 'GHCID_UUID_SHA256':
            ident['identifier_value'] = new_uuid_v8
        elif ident.get('identifier_scheme') == 'GHCID_NUMERIC':
            ident['identifier_value'] = str(new_numeric)
    data['identifiers'] = identifiers

    # Write updated file
    with open(diacritics_file, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

    # Rename file to match new GHCID (target existence checked above)
    shutil.move(str(diacritics_file), str(new_file_path))
    change_info['new_file'] = str(new_file_path)

    return change_info


def main():
    """CLI entry point: scan the custodian dir and resolve all collision pairs."""
    parser = argparse.ArgumentParser(
        description="Resolve GHCID collisions caused by diacritics normalization"
    )
    parser.add_argument(
        '--dry-run', action='store_true',
        help="Preview changes without modifying files"
    )
    parser.add_argument(
        '--custodian-dir', type=Path, default=Path('data/custodian'),
        help="Path to custodian directory"
    )
    args = parser.parse_args()

    custodian_dir = args.custodian_dir
    if not custodian_dir.exists():
        print(f"Error: Directory not found: {custodian_dir}")
        return 1

    print(f"Scanning {custodian_dir} for diacritics collision pairs...")
    collisions = find_collision_pairs(custodian_dir)
    print(f"Found {len(collisions)} collision pairs\n")

    if args.dry_run:
        print("=== DRY RUN (no changes will be made) ===\n")
    else:
        print("=== APPLYING CHANGES ===\n")

    changes = []
    for i, (diacritics_file, ascii_file, ascii_ghcid) in enumerate(collisions, 1):
        print(f"[{i}/{len(collisions)}] Collision:")
        print(f" Diacritics file: {diacritics_file.name}")
        print(f" Collides with: {ascii_file.name}")
        change = resolve_collision(diacritics_file, ascii_ghcid, dry_run=args.dry_run)
        if change:
            changes.append(change)
            print(f" Institution: {change['institution_name']}")
            print(f" GHCID change: {change['old_ghcid']} → {change['new_ghcid']}")
            if 'new_file' in change:
                print(f" New file: {Path(change['new_file']).name}")
        print()

    print("=== SUMMARY ===")
    print(f"Collisions found: {len(collisions)}")
    print(f"Files resolved: {len(changes)}")
    if args.dry_run and changes:
        print("\nTo apply changes, run without --dry-run flag")

    return 0


if __name__ == '__main__':
    sys.exit(main())