glam/scripts/resolve_diacritics_collisions.py
kempersc 891692a4d6 feat(ghcid): add diacritics normalization and transliteration scripts
- Add fix_ghcid_diacritics.py for normalizing non-ASCII in GHCIDs
- Add resolve_diacritics_collisions.py for collision handling
- Add transliterate_emic_names.py for non-Latin script handling
- Add transliteration tests
2025-12-08 14:59:28 +01:00

270 lines
9 KiB
Python

#!/usr/bin/env python3
"""
Resolve GHCID collisions caused by diacritics normalization.
When a file with diacritics normalizes to the same GHCID as an existing file,
the diacritics file gets a name suffix per AGENTS.md collision rules.
Usage:
python scripts/resolve_diacritics_collisions.py --dry-run # Preview changes
python scripts/resolve_diacritics_collisions.py # Apply changes
"""
import argparse
import hashlib
import os
import re
import shutil
import unicodedata
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import yaml
# Namespace UUID passed to uuid.uuid5() so that the same GHCID string always
# yields the same UUID.
# NOTE(review): this value is identical to the standard library constant
# uuid.NAMESPACE_DNS - confirm that reusing the DNS namespace is intentional.
GHCID_NAMESPACE = uuid.UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")
# Precompiled character class of accented / special Latin letters (upper and
# lower case). It is an explicit list: a letter that does not appear in it is
# not matched by this pattern.
DIACRITICS_PATTERN = re.compile(r'[ČŘŠŽĚŮÁÉÍÓÚÝÄÖÜßÑÇÀÈÌÒÙŁŃŚŹŻĄĘÅØÆŐŰÂÊÎÔÛčřšžěůáéíóúýäöüñçàèìòùłńśźżąęåøæőűâêîôûĎŇŤďňť]')
# Letters that have no canonical (NFD) decomposition: stripping combining
# marks leaves them untouched, so they need an explicit ASCII replacement.
_ASCII_FALLBACKS = str.maketrans({
    'ß': 'ss', 'ẞ': 'SS',
    'Æ': 'AE', 'æ': 'ae',
    'Œ': 'OE', 'œ': 'oe',
    'Ø': 'O', 'ø': 'o',
    'Ł': 'L', 'ł': 'l',
    'Đ': 'D', 'đ': 'd',
})


def normalize_diacritics(text: str) -> str:
    """Return *text* with diacritics replaced by ASCII equivalents.

    The text is NFD-decomposed and every combining mark (category ``Mn``) is
    dropped, which turns e.g. ``Č`` into ``C``.  Letters that do not decompose
    (``ß``, ``Ł``, ``Ø``, ``Æ`` ...) are then mapped through an explicit
    fallback table; without that step they would be returned unchanged.

    Characters outside both mechanisms (e.g. non-Latin scripts) are passed
    through as they are.
    """
    decomposed = unicodedata.normalize('NFD', text)
    without_marks = ''.join(
        ch for ch in decomposed if unicodedata.category(ch) != 'Mn'
    )
    # Translate after decomposition so that stacked forms such as "Ǿ"
    # (Ø + acute) are handled as well.
    return without_marks.translate(_ASCII_FALLBACKS)


def generate_name_suffix(native_name: str) -> str:
    """Convert a native-language institution name to a snake_case suffix.

    The result contains only ``a-z``, ``0-9`` and single underscores, with no
    leading or trailing underscore.  It is empty when the name contains no
    character that maps to an ASCII letter or digit.
    """
    # Reuse the shared normalizer so letters like "ł" or "ß" become "l"/"ss"
    # instead of being deleted by the alphanumeric filter below.
    ascii_name = normalize_diacritics(native_name)
    lowercase = ascii_name.lower()
    # Remove apostrophes, commas, and other punctuation
    no_punct = re.sub(r"[''`\",.:;!?()[\]{}‒–—]", '', lowercase)
    # Replace runs of whitespace and hyphens with a single underscore
    underscored = re.sub(r'[\s\-]+', '_', no_punct)
    # Remove any remaining non-alphanumeric characters (except underscores)
    clean = re.sub(r'[^a-z0-9_]', '', underscored)
    # Collapse repeated underscores and trim them from both ends
    return re.sub(r'_+', '_', clean).strip('_')
def generate_uuid_v5(ghcid_string: str) -> str:
    """Return the name-based (version 5) UUID of a GHCID as a string.

    The UUID is derived from ``GHCID_NAMESPACE`` and *ghcid_string*, so the
    same GHCID always produces the same value.
    """
    derived = uuid.uuid5(GHCID_NAMESPACE, ghcid_string)
    return str(derived)
def generate_uuid_v8_sha256(ghcid_string: str) -> str:
    """Return a version-8 UUID built from the SHA-256 digest of a GHCID.

    The first 16 bytes of the digest of the UTF-8 encoded string are used as
    the UUID payload; the version nibble is forced to 8 and the two variant
    bits to ``10``.
    """
    digest = hashlib.sha256(ghcid_string.encode('utf-8')).digest()
    value = int.from_bytes(digest[:16], byteorder='big')
    # Bits 76-79 are the high nibble of byte 6: the UUID version field.
    value = (value & ~(0xF << 76)) | (0x8 << 76)
    # Bits 62-63 are the top two bits of byte 8: the UUID variant field.
    value = (value & ~(0x3 << 62)) | (0x2 << 62)
    return str(uuid.UUID(int=value))
def generate_numeric_id(ghcid_string: str) -> int:
    """Return an unsigned 64-bit integer derived from a GHCID.

    The value is the first 8 bytes (big-endian) of the SHA-256 digest of the
    UTF-8 encoded string.
    """
    hex_digest = hashlib.sha256(ghcid_string.encode('utf-8')).hexdigest()
    # 16 hex digits == 8 bytes, read most-significant first.
    return int(hex_digest[:16], 16)
def find_collision_pairs(custodian_dir: Path) -> list[tuple[Path, Path, str]]:
    """Find files with diacritics that collide with existing ASCII files.

    Every ``*.yaml`` file directly inside *custodian_dir* whose stem matches
    ``DIACRITICS_PATTERN`` is normalized with ``normalize_diacritics``; if a
    different file with the normalized stem already exists, the pair is
    reported.

    Returns a list of ``(diacritics_file, ascii_file, ascii_ghcid)`` tuples,
    ordered by the path of the diacritics file.
    """
    collisions = []
    # glob() yields entries in arbitrary order; sort so that the report and
    # the order of any subsequent renames are reproducible.
    for yaml_file in sorted(custodian_dir.glob("*.yaml")):
        filename = yaml_file.stem  # Without .yaml
        if not DIACRITICS_PATTERN.search(filename):
            continue
        # Normalize to ASCII
        ascii_filename = normalize_diacritics(filename)
        if ascii_filename == filename:
            # Normalization changed nothing, so the candidate path would be
            # this very file - it must not be reported as colliding with
            # itself.
            continue
        ascii_file = custodian_dir / f"{ascii_filename}.yaml"
        if ascii_file.exists():
            collisions.append((yaml_file, ascii_file, ascii_filename))
    return collisions
def resolve_collision(diacritics_file: Path, ascii_ghcid: str, dry_run: bool = True) -> Optional[dict]:
    """
    Resolve a collision by adding a name suffix to the diacritics file.

    The new GHCID is ``"<ascii_ghcid>-<name_suffix>"`` where the suffix is
    derived from ``original_entry.name`` in the YAML file.  When not in
    dry-run mode the ``ghcid`` section, its history and the matching entries
    in ``identifiers`` are updated, the file is rewritten and then renamed to
    ``<new_ghcid>.yaml``.

    Args:
        diacritics_file: YAML file whose stem contains diacritics.
        ascii_ghcid: Normalized GHCID that is already taken by another file.
        dry_run: If true, only compute and return the planned change.

    Returns:
        A dict describing the change (``file``, ``institution_name``,
        ``old_ghcid``, ``new_ghcid``, ``name_suffix`` and, once applied,
        ``new_file``), or ``None`` when the file cannot be read, has no usable
        institution name, or the target filename is already taken.  No file is
        modified in any of the ``None`` cases.
    """
    try:
        with open(diacritics_file, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
        print(f" Error reading {diacritics_file}: {e}")
        return None
    if not data:
        return None
    if not isinstance(data, dict):
        print(f" Warning: Unexpected YAML structure in {diacritics_file}")
        return None

    # Get institution name for suffix.  "or {}" also covers keys that are
    # present in the YAML but empty (parsed as None).
    original_entry = data.get('original_entry') or {}
    inst_name = original_entry.get('name') or ''
    if not inst_name:
        print(f" Warning: No institution name found in {diacritics_file}")
        return None

    # Generate name suffix; a name without any ASCII-mappable character would
    # otherwise produce a GHCID ending in a dangling "-".
    name_suffix = generate_name_suffix(inst_name)
    if not name_suffix:
        print(f" Warning: Institution name yields an empty suffix in {diacritics_file}")
        return None

    # Create new GHCID with name suffix
    new_ghcid = f"{ascii_ghcid}-{name_suffix}"

    # Check the rename target before touching anything, so the file content
    # and the filename can never end up disagreeing about the GHCID.
    new_file_path = diacritics_file.parent / f"{new_ghcid}.yaml"
    if new_file_path.exists():
        print(f" Warning: Target file already exists: {new_file_path}")
        return None

    # Get old GHCID from file
    ghcid_section = data.get('ghcid') or {}
    old_ghcid = ghcid_section.get('ghcid_current', diacritics_file.stem)

    change_info = {
        'file': str(diacritics_file),
        'institution_name': inst_name,
        'old_ghcid': old_ghcid,
        'new_ghcid': new_ghcid,
        'name_suffix': name_suffix,
    }
    if dry_run:
        return change_info

    # Generate new identifiers
    new_uuid_v5 = generate_uuid_v5(new_ghcid)
    new_uuid_v8 = generate_uuid_v8_sha256(new_ghcid)
    new_numeric = generate_numeric_id(new_ghcid)
    timestamp_now = datetime.now(timezone.utc).isoformat()

    # Update ghcid section
    ghcid_section['ghcid_current'] = new_ghcid
    ghcid_section['ghcid_uuid'] = new_uuid_v5
    ghcid_section['ghcid_uuid_sha256'] = new_uuid_v8
    ghcid_section['ghcid_numeric'] = new_numeric

    # Add history entry (newest first) and close the previously open one
    ghcid_history = ghcid_section.get('ghcid_history') or []
    new_history_entry = {
        'ghcid': new_ghcid,
        'ghcid_numeric': new_numeric,
        'valid_from': timestamp_now,
        'reason': f"Name suffix added to resolve collision with {ascii_ghcid} (was: {old_ghcid})"
    }
    if ghcid_history and 'valid_to' not in ghcid_history[0]:
        ghcid_history[0]['valid_to'] = timestamp_now
    ghcid_section['ghcid_history'] = [new_history_entry] + ghcid_history
    data['ghcid'] = ghcid_section

    # Update identifiers section
    replacements = {
        'GHCID': new_ghcid,
        'GHCID_UUID': new_uuid_v5,
        'GHCID_UUID_SHA256': new_uuid_v8,
        'GHCID_NUMERIC': str(new_numeric),
    }
    identifiers = data.get('identifiers') or []
    for ident in identifiers:
        if not isinstance(ident, dict):
            continue
        scheme = ident.get('identifier_scheme')
        if isinstance(scheme, str) and scheme in replacements:
            ident['identifier_value'] = replacements[scheme]
    data['identifiers'] = identifiers

    # Write updated file
    with open(diacritics_file, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

    # Rename file to match new GHCID
    shutil.move(str(diacritics_file), str(new_file_path))
    change_info['new_file'] = str(new_file_path)
    return change_info
def main(argv: Optional[list[str]] = None) -> int:
    """Command-line entry point: find and resolve diacritics collisions.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success, 1 if the custodian directory does
        not exist.
    """
    parser = argparse.ArgumentParser(
        description="Resolve GHCID collisions caused by diacritics normalization"
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help="Preview changes without modifying files"
    )
    parser.add_argument(
        '--custodian-dir',
        type=Path,
        default=Path('data/custodian'),
        help="Path to custodian directory"
    )
    args = parser.parse_args(argv)

    custodian_dir = args.custodian_dir
    # is_dir() rather than exists(): a regular file at this path is not a
    # usable custodian directory either.
    if not custodian_dir.is_dir():
        print(f"Error: Directory not found: {custodian_dir}")
        return 1

    print(f"Scanning {custodian_dir} for diacritics collision pairs...")
    collisions = find_collision_pairs(custodian_dir)
    print(f"Found {len(collisions)} collision pairs\n")

    if args.dry_run:
        print("=== DRY RUN (no changes will be made) ===\n")
    else:
        print("=== APPLYING CHANGES ===\n")

    changes = []
    for i, (diacritics_file, ascii_file, ascii_ghcid) in enumerate(collisions, 1):
        print(f"[{i}/{len(collisions)}] Collision:")
        print(f" Diacritics file: {diacritics_file.name}")
        print(f" Collides with: {ascii_file.name}")
        change = resolve_collision(diacritics_file, ascii_ghcid, dry_run=args.dry_run)
        if change:
            changes.append(change)
            print(f" Institution: {change['institution_name']}")
            # Separate old and new GHCID; printed back-to-back they are
            # unreadable.
            print(f" GHCID change: {change['old_ghcid']} -> {change['new_ghcid']}")
            if 'new_file' in change:
                print(f" New file: {Path(change['new_file']).name}")
        print()

    print("=== SUMMARY ===")
    print(f"Collisions found: {len(collisions)}")
    print(f"Files resolved: {len(changes)}")
    if args.dry_run and changes:
        print("\nTo apply changes, run without --dry-run flag")
    return 0
if __name__ == '__main__':
    # The bare exit() helper is injected by the site module for interactive
    # sessions and is not guaranteed to exist (e.g. under "python -S");
    # raising SystemExit propagates main()'s status without relying on it.
    raise SystemExit(main())