- Add fix_ghcid_diacritics.py for normalizing non-ASCII in GHCIDs - Add resolve_diacritics_collisions.py for collision handling - Add transliterate_emic_names.py for non-Latin script handling - Add transliteration tests
325 lines
10 KiB
Python
325 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Fix GHCID abbreviations containing diacritics.
|
|
|
|
This script normalizes diacritics in GHCID abbreviation components to ASCII,
|
|
regenerates UUIDs and numeric IDs, updates GHCID history, and renames files.
|
|
|
|
Rule: ABBREV-DIACRITICS
|
|
See: .opencode/ABBREVIATION_SPECIAL_CHAR_RULE.md
|
|
|
|
Usage:
|
|
python scripts/fix_ghcid_diacritics.py --dry-run # Preview changes
|
|
python scripts/fix_ghcid_diacritics.py # Apply changes
|
|
"""
|
|
|
|
import argparse
|
|
import hashlib
|
|
import os
|
|
import re
|
|
import shutil
|
|
import unicodedata
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import yaml
|
|
|
|
# Namespace UUID used as the base for deterministic (uuid5) GHCID UUIDs.
# NOTE: this value is the RFC 4122 *DNS* namespace (equal to
# uuid.NAMESPACE_DNS), not the URL namespace. Changing it would change every
# UUID produced by generate_uuid_v5().
GHCID_NAMESPACE = uuid.UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")

# Character class of the accented / special Latin letters that are treated as
# "diacritics" when scanning GHCIDs and file names. It is an explicit list, so
# letters that are not enumerated here are not detected.
DIACRITICS_PATTERN = re.compile(r'[ČŘŠŽĚŮÁÉÍÓÚÝÄÖÜßÑÇÀÈÌÒÙŁŃŚŹŻĄĘÅØÆŐŰÂÊÎÔÛčřšžěůáéíóúýäöüñçàèìòùłńśźżąęåøæőűâêîôû]')
|
|
|
|
|
|
# Letters that have no canonical Unicode decomposition: NFD leaves them
# untouched, so they need an explicit ASCII replacement.
_NON_DECOMPOSABLE_TO_ASCII = str.maketrans({
    'ß': 'ss',
    'Æ': 'AE',
    'æ': 'ae',
    'Œ': 'OE',
    'œ': 'oe',
    'Ø': 'O',
    'ø': 'o',
    'Ł': 'L',
    'ł': 'l',
    'Đ': 'D',
    'đ': 'd',
})


def normalize_diacritics(text: str) -> str:
    """
    Normalize diacritics to ASCII equivalents.

    Works in two passes:

    1. Letters without a Unicode decomposition (e.g. "Ł", "Ø", "Æ", "ß") are
       replaced through an explicit lookup table.
    2. The result is NFD-decomposed, which separates base characters from
       combining marks, and the combining marks are dropped.

    Characters that are neither in the table nor decomposable are returned
    unchanged.

    Args:
        text: Input string; may be empty.

    Returns:
        The string with the handled diacritics replaced.

    Examples:
        "Č" → "C"
        "Ř" → "R"
        "Ö" → "O"
        "ñ" → "n"
        "Ł" → "L"
        "ß" → "ss"
    """
    # Pass 1: fold stroke letters and ligatures that NFD cannot split.
    folded = text.translate(_NON_DECOMPOSABLE_TO_ASCII)
    # Pass 2: split base letters from combining marks ...
    decomposed = unicodedata.normalize('NFD', folded)
    # ... and drop the marks (category 'Mn' = Mark, Nonspacing).
    return ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
|
|
|
|
|
|
def has_diacritics_in_ghcid(ghcid: str) -> bool:
    """Report whether a GHCID contains a listed diacritic in any component.

    The complete identifier string is scanned against ``DIACRITICS_PATTERN``,
    so a hit anywhere counts, for example:
    - City code (e.g., 'TŘE' for Třebíč, or 'ČB')
    - Abbreviation (e.g., 'VHSPAOČRZS')

    A purely numeric region code such as '31' never matches.
    """
    first_hit = DIACRITICS_PATTERN.search(ghcid)
    return first_hit is not None
|
|
|
|
|
|
def has_diacritics_in_abbreviation(ghcid: str) -> bool:
    """Report whether the abbreviation part of a GHCID contains a diacritic.

    The GHCID is expected to look like CC-RR-CCC-T-ABBREV (optionally followed
    by ``-suffix``); the abbreviation is the fifth dash-separated component.
    Identifiers with fewer than five components yield ``False``.
    """
    components = ghcid.split('-')
    if len(components) < 5:
        # Too short to carry an abbreviation component at all.
        return False
    return DIACRITICS_PATTERN.search(components[4]) is not None
|
|
|
|
|
|
def fix_ghcid_diacritics(ghcid: str) -> str:
    """
    Return the GHCID with diacritics normalized in every component.

    The identifier is split on '-', each piece (country, region, city, type,
    abbreviation and any suffix parts) is passed through
    ``normalize_diacritics``, and the pieces are joined with '-' again.
    """
    return '-'.join(map(normalize_diacritics, ghcid.split('-')))
|
|
|
|
|
|
def generate_uuid_v5(ghcid_string: str) -> str:
    """Return the name-based (version 5) UUID of a GHCID as a string.

    The UUID is derived from ``GHCID_NAMESPACE`` and the GHCID text, so the
    same GHCID always produces the same UUID.
    """
    derived = uuid.uuid5(GHCID_NAMESPACE, ghcid_string)
    return str(derived)
|
|
|
|
|
|
def generate_uuid_v8_sha256(ghcid_string: str) -> str:
    """Return a version-8 UUID string built from the SHA-256 of a GHCID.

    The first 16 bytes of the digest of the UTF-8 encoded GHCID become the
    UUID payload; the version nibble is forced to 8 and the variant bits to
    the RFC 4122 pattern (binary ``10``).
    """
    raw = hashlib.sha256(ghcid_string.encode('utf-8')).digest()[:16]
    # Byte 6: keep the low nibble, put version 8 in the high nibble.
    version_byte = (raw[6] & 0x0f) | 0x80
    # Byte 8: keep the low six bits, put the RFC 4122 variant in the top two.
    variant_byte = (raw[8] & 0x3f) | 0x80
    patched = raw[:6] + bytes([version_byte]) + raw[7:8] + bytes([variant_byte]) + raw[9:]
    return str(uuid.UUID(bytes=patched))
|
|
|
|
|
|
def generate_numeric_id(ghcid_string: str) -> int:
    """Return an unsigned 64-bit integer ID derived from a GHCID.

    The value is the first 8 bytes of the SHA-256 digest of the UTF-8 encoded
    GHCID, read as a big-endian number.
    """
    leading_bytes = hashlib.sha256(ghcid_string.encode('utf-8')).digest()[:8]
    # Hex digits are written most-significant first, i.e. big-endian.
    return int(leading_bytes.hex(), 16)
|
|
|
|
|
|
def process_file(file_path: Path, dry_run: bool = True) -> Optional[dict]:
    """
    Process a single YAML file to fix GHCID diacritics.

    Reads the record, normalizes ``ghcid.ghcid_current``, regenerates the
    derived identifiers (UUID v5, SHA-256 based UUID v8, numeric ID), prepends
    a history entry, updates matching entries in ``identifiers``, rewrites the
    file and renames it to ``<new GHCID>.yaml``.

    If the rename target already exists, the file is skipped *before* anything
    is written (in dry-run mode as well), so a record is never rewritten to a
    GHCID that another file on disk already uses.

    Args:
        file_path: Path of the YAML record to process.
        dry_run: When True, compute and return the change without touching
            the file system.

    Returns:
        A dict with the keys ``file``, ``old_ghcid``, ``new_ghcid``,
        ``old_uuid``, ``new_uuid``, ``old_numeric`` and ``new_numeric`` (plus
        ``new_file`` when the file was renamed), or None when the file cannot
        be read, has no usable GHCID, needs no change, or would collide with
        an existing file.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        # Deliberately broad: one unreadable or malformed file must not abort
        # the whole batch run.
        print(f" Error reading {file_path}: {e}")
        return None

    # Guard against empty documents and documents whose top level or 'ghcid'
    # entry is not a mapping (e.g. "ghcid: null").
    if not isinstance(data, dict):
        return None
    ghcid_section = data.get('ghcid')
    if not isinstance(ghcid_section, dict):
        return None

    old_ghcid = ghcid_section.get('ghcid_current')
    if not isinstance(old_ghcid, str) or not has_diacritics_in_ghcid(old_ghcid):
        return None

    # Fix the GHCID
    new_ghcid = fix_ghcid_diacritics(old_ghcid)
    if new_ghcid == old_ghcid:
        return None

    # Resolve the rename target up front so that a collision is detected
    # before the record is modified.
    new_file_path = file_path.parent / f"{new_ghcid}.yaml"
    needs_rename = file_path.name != new_file_path.name
    if needs_rename and new_file_path.exists():
        print(f" Warning: Target file already exists: {new_file_path}")
        return None

    # Generate new identifiers
    new_uuid_v5 = generate_uuid_v5(new_ghcid)
    new_uuid_v8 = generate_uuid_v8_sha256(new_ghcid)
    new_numeric = generate_numeric_id(new_ghcid)
    timestamp_now = datetime.now(timezone.utc).isoformat()

    change_info = {
        'file': str(file_path),
        'old_ghcid': old_ghcid,
        'new_ghcid': new_ghcid,
        'old_uuid': ghcid_section.get('ghcid_uuid', ''),
        'new_uuid': new_uuid_v5,
        'old_numeric': ghcid_section.get('ghcid_numeric', 0),
        'new_numeric': new_numeric,
    }

    if dry_run:
        return change_info

    # Update ghcid section; other keys of the section are left as they are.
    ghcid_section['ghcid_current'] = new_ghcid
    ghcid_section['ghcid_uuid'] = new_uuid_v5
    ghcid_section['ghcid_uuid_sha256'] = new_uuid_v8
    ghcid_section['ghcid_numeric'] = new_numeric

    # "or []" also covers an explicit "ghcid_history: null".
    ghcid_history = ghcid_section.get('ghcid_history') or []

    new_history_entry = {
        'ghcid': new_ghcid,
        'ghcid_numeric': new_numeric,
        'valid_from': timestamp_now,
        'reason': f"Normalized diacritics to ASCII per ABBREV-DIACRITICS rule (was: {old_ghcid})"
    }

    # Mark the previous (most recent) entry as superseded unless it already
    # carries an end date.
    if ghcid_history and 'valid_to' not in ghcid_history[0]:
        ghcid_history[0]['valid_to'] = timestamp_now

    # Newest entry goes first.
    ghcid_section['ghcid_history'] = [new_history_entry] + ghcid_history
    data['ghcid'] = ghcid_section

    # Update identifiers section: each known scheme maps to its new value.
    replacements = {
        'GHCID': new_ghcid,
        'GHCID_UUID': new_uuid_v5,
        'GHCID_UUID_SHA256': new_uuid_v8,
        'GHCID_NUMERIC': str(new_numeric),
    }
    # "or []" also covers an explicit "identifiers: null".
    identifiers = data.get('identifiers') or []
    for ident in identifiers:
        if not isinstance(ident, dict):
            continue
        scheme = ident.get('identifier_scheme')
        if isinstance(scheme, str) and scheme in replacements:
            ident['identifier_value'] = replacements[scheme]
    data['identifiers'] = identifiers

    # Write updated file
    with open(file_path, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

    # Rename file to match new GHCID (target was verified to be free above).
    if needs_rename:
        shutil.move(str(file_path), str(new_file_path))
        change_info['new_file'] = str(new_file_path)

    return change_info
|
|
|
|
|
|
def find_affected_files(custodian_dir: Path) -> list[Path]:
    """Find all YAML files whose file name contains a listed diacritic.

    Detection is filename-based (the file stem is matched against
    ``DIACRITICS_PATTERN``) rather than parsing each YAML document; this
    relies on file names mirroring the GHCID.

    The directory tree is searched recursively with ``Path.rglob`` instead of
    an external ``find`` process, so the function behaves the same on every
    platform. Only regular files are returned (symlinks are skipped), and the
    result is sorted so that repeated runs, and therefore ``--limit``, see the
    files in a stable order.

    Args:
        custodian_dir: Root directory to search.

    Returns:
        Sorted list of matching ``*.yaml`` paths.
    """
    return sorted(
        candidate
        for candidate in custodian_dir.rglob('*.yaml')
        # Cheap regex test first; the stat() calls only run for name matches.
        if DIACRITICS_PATTERN.search(candidate.stem)
        and candidate.is_file()
        and not candidate.is_symlink()
    )
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Parses the options (--dry-run, --limit, --custodian-dir), collects the
    affected files, runs ``process_file`` on each of them and prints a summary
    including a per-country count of changed GHCIDs.

    Returns:
        1 if the custodian directory does not exist, otherwise 0.
    """
    cli = argparse.ArgumentParser(
        description="Fix GHCID abbreviations containing diacritics"
    )
    cli.add_argument(
        '--dry-run',
        action='store_true',
        help="Preview changes without modifying files",
    )
    cli.add_argument(
        '--limit',
        type=int,
        default=0,
        help="Limit number of files to process (0 = no limit)",
    )
    cli.add_argument(
        '--custodian-dir',
        type=Path,
        default=Path('data/custodian'),
        help="Path to custodian directory",
    )
    options = cli.parse_args()

    root = options.custodian_dir
    if not root.exists():
        print(f"Error: Directory not found: {root}")
        return 1

    print(f"Scanning {root} for files with diacritics in GHCID abbreviation...")
    targets = find_affected_files(root)
    print(f"Found {len(targets)} affected files")

    if options.limit > 0:
        targets = targets[:options.limit]
        print(f"Limited to {options.limit} files")

    if options.dry_run:
        banner = "\n=== DRY RUN (no changes will be made) ===\n"
    else:
        banner = "\n=== APPLYING CHANGES ===\n"
    print(banner)

    total = len(targets)
    applied = []
    for position, target in enumerate(targets, 1):
        print(f"[{position}/{total}] Processing {target.name}...")
        outcome = process_file(target, dry_run=options.dry_run)
        if not outcome:
            continue
        applied.append(outcome)
        print(f" {outcome['old_ghcid']} → {outcome['new_ghcid']}")

    print("\n=== SUMMARY ===")
    print(f"Files processed: {total}")
    print(f"Files changed: {len(applied)}")

    if options.dry_run and applied:
        print("\nTo apply changes, run without --dry-run flag")

    # Show country distribution (country code = first GHCID component),
    # most frequent first.
    if applied:
        per_country = {}
        for outcome in applied:
            code = outcome['old_ghcid'].split('-')[0]
            per_country[code] = per_country.get(code, 0) + 1

        print("\nBy country:")
        for code, tally in sorted(per_country.items(), key=lambda pair: -pair[1]):
            print(f" {code}: {tally}")

    return 0
|
|
|
|
|
|
if __name__ == '__main__':
    # Raise SystemExit directly instead of calling exit(): the ``exit`` helper
    # is installed by the ``site`` module and is not available when Python is
    # started with -S. main()'s return value becomes the process exit status.
    raise SystemExit(main())
|