#!/usr/bin/env python3
"""
Fix remaining non-ASCII GHCID files:

1. Bulgarian files with Cyrillic - add year suffixes for collision resolution
2. Swiss files with parentheses - fix city codes

Per AGENTS.md:
- Rule about diacritics normalization
- Rule about special characters in abbreviations
- Collision resolution via year suffix
"""

import os
import re
import uuid
import hashlib
from datetime import datetime, timezone
from pathlib import Path

import yaml

# Custom YAML handling for anchors
class NoAliasDumper(yaml.SafeDumper):
    """SafeDumper variant that never emits YAML anchors/aliases.

    By reporting every object as alias-free, repeated references to the
    same Python object are serialized inline instead of as &id/*id pairs.
    """

    def ignore_aliases(self, data):
        # Unconditionally suppress aliasing for all node types.
        return True
|
||
|
||
|
||
def generate_ghcid_uuid_v5(ghcid: str) -> str:
    """Generate UUID v5 from GHCID string using heritage namespace.

    Note: the namespace value is numerically identical to
    ``uuid.NAMESPACE_DNS``; it is spelled out so the derivation is
    self-contained.
    """
    HERITAGE_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')
    derived = uuid.uuid5(HERITAGE_NAMESPACE, ghcid)
    return str(derived)
|
||
|
||
|
||
def generate_ghcid_uuid_sha256(ghcid: str) -> str:
    """Generate UUID v8 (SHA-256 based) from GHCID string.

    Takes the first 16 bytes of SHA-256(ghcid) and stamps the RFC 4122
    version and variant fields into bytes 6 and 8 respectively.
    """
    raw = bytearray(hashlib.sha256(ghcid.encode('utf-8')).digest()[:16])
    raw[6] = (raw[6] & 0x0F) | 0x80  # high nibble of byte 6 -> version 8
    raw[8] = (raw[8] & 0x3F) | 0x80  # top bits of byte 8 -> variant 10xx
    return str(uuid.UUID(bytes=bytes(raw)))
|
||
|
||
|
||
def generate_ghcid_numeric(ghcid: str) -> int:
    """Generate 64-bit numeric ID from GHCID.

    The value is the big-endian interpretation of the first 8 bytes of
    SHA-256(ghcid), so it always fits in an unsigned 64-bit integer.
    """
    prefix = hashlib.sha256(ghcid.encode('utf-8')).digest()[:8]
    return int.from_bytes(prefix, byteorder='big')
|
||
|
||
|
||
def fix_bulgarian_file(old_filepath: str, year_suffix: str, new_abbrev: str):
    """Fix Bulgarian file with Cyrillic characters and add year suffix.

    Rebuilds the GHCID with an ASCII abbreviation plus a year suffix,
    regenerates all derived identifiers (UUID v5, SHA-256 UUID, numeric),
    prepends a history entry, writes the record to ``{new_ghcid}.yaml``
    and deletes the old file.

    Args:
        old_filepath: Path to the existing YAML record (non-ASCII filename).
        year_suffix: Founding year used to resolve the GHCID collision.
        new_abbrev: ASCII transliteration of the Cyrillic abbreviation.
    """
    print(f"\n=== Processing: {old_filepath} ===")

    with open(old_filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    old_ghcid = data['ghcid']['ghcid_current']

    # Extract components (country-region-city-type-abbrev).
    # BUGFIX: split from the LEFT, not the right. With rsplit('-', 4) a
    # six-segment GHCID such as 'BG-11-RAY-L-БПНЧП-55' parsed as
    # country='BG-11', region='RAY', city='L', inst_type='БПНЧП', so the
    # rebuilt GHCID kept the Cyrillic abbreviation. split('-', 4) keeps any
    # hyphenated abbreviation intact in the final field.
    parts = old_ghcid.split('-', 4)
    if len(parts) >= 5:
        country, region, city, inst_type = parts[0], parts[1], parts[2], parts[3]
    else:
        print(f" ERROR: Could not parse GHCID: {old_ghcid}")
        return

    # New GHCID with ASCII abbreviation and year suffix
    new_ghcid = f"{country}-{region}-{city}-{inst_type}-{new_abbrev}-{year_suffix}"
    print(f" Old GHCID: {old_ghcid}")
    print(f" New GHCID: {new_ghcid}")

    # Regenerate every identifier derived from the GHCID string
    new_uuid = generate_ghcid_uuid_v5(new_ghcid)
    new_uuid_sha256 = generate_ghcid_uuid_sha256(new_ghcid)
    new_numeric = generate_ghcid_numeric(new_ghcid)

    timestamp = datetime.now(timezone.utc).isoformat()

    # Update GHCID section
    data['ghcid']['ghcid_current'] = new_ghcid
    data['ghcid']['ghcid_uuid'] = new_uuid
    data['ghcid']['ghcid_uuid_sha256'] = new_uuid_sha256
    data['ghcid']['ghcid_numeric'] = new_numeric

    # Add history entry (create the list on first rename)
    if 'ghcid_history' not in data['ghcid']:
        data['ghcid']['ghcid_history'] = []

    # Insert new history entry at the beginning (most recent first)
    data['ghcid']['ghcid_history'].insert(0, {
        'ghcid': new_ghcid,
        'valid_from': timestamp,
        'valid_to': None,
        'reason': f'Collision resolution: Added year suffix -{year_suffix} to differentiate from existing GHCID. Cyrillic to ASCII transliteration applied.'
    })

    # Update identifiers list so every derived scheme matches the new GHCID
    if 'identifiers' in data:
        for ident in data['identifiers']:
            if ident.get('identifier_scheme') == 'GHCID':
                ident['identifier_value'] = new_ghcid
            elif ident.get('identifier_scheme') == 'GHCID_UUID':
                ident['identifier_value'] = new_uuid
            elif ident.get('identifier_scheme') == 'GHCID_UUID_SHA256':
                ident['identifier_value'] = new_uuid_sha256
            elif ident.get('identifier_scheme') == 'GHCID_NUMERIC':
                ident['identifier_value'] = str(new_numeric)

    # New filename derived from the new GHCID (ASCII by construction)
    new_filename = f"{new_ghcid}.yaml"
    new_filepath = os.path.join(os.path.dirname(old_filepath), new_filename)

    # Write new file first so a failure here cannot lose the record
    with open(new_filepath, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, Dumper=NoAliasDumper, allow_unicode=True, default_flow_style=False, sort_keys=False)

    print(f" Created: {new_filename}")

    # Remove old file only after the replacement exists on disk
    os.remove(old_filepath)
    print(f" Deleted: {os.path.basename(old_filepath)}")
|
||
|
||
|
||
def fix_swiss_file(old_filepath: str, new_city_code: str):
    """Fix Swiss file with parentheses in city code.

    Replaces the malformed city code in the GHCID, regenerates the derived
    identifiers, records the change in the GHCID history, writes the record
    under the corrected filename and removes the old file.
    """
    print(f"\n=== Processing: {old_filepath} ===")

    with open(old_filepath, 'r', encoding='utf-8') as fh:
        record = yaml.safe_load(fh)

    old_ghcid = record['ghcid']['ghcid_current']

    # GHCID layout: CH-{region}-{city}-{type}-{abbrev}, where the broken
    # city codes contain '(' — e.g. CH-BE-G(-A-SCCMMA, CH-GE-V(--L-BLUGUAAO.
    # The institution type is a single letter bracketed by hyphens.
    m = re.match(r'^([A-Z]{2})-([A-Z]{2})-(.+)-([GLAMORCSEFIBXPHDNT])-(.+)$', old_ghcid)
    if not m:
        print(f" ERROR: Could not parse GHCID: {old_ghcid}")
        return
    country, region, old_city, inst_type, abbrev = m.groups()

    new_ghcid = f"{country}-{region}-{new_city_code}-{inst_type}-{abbrev}"
    print(f" Old GHCID: {old_ghcid}")
    print(f" New GHCID: {new_ghcid}")

    # Derive the full identifier set from the corrected GHCID
    new_uuid = generate_ghcid_uuid_v5(new_ghcid)
    new_uuid_sha256 = generate_ghcid_uuid_sha256(new_ghcid)
    new_numeric = generate_ghcid_numeric(new_ghcid)

    timestamp = datetime.now(timezone.utc).isoformat()

    ghcid_section = record['ghcid']
    ghcid_section['ghcid_current'] = new_ghcid
    ghcid_section['ghcid_uuid'] = new_uuid
    ghcid_section['ghcid_uuid_sha256'] = new_uuid_sha256
    ghcid_section['ghcid_numeric'] = new_numeric

    # Keep the resolved city code in sync with the GHCID
    if 'location_resolution' in ghcid_section:
        ghcid_section['location_resolution']['city_code'] = new_city_code

    # Prepend a history entry documenting the rename (most recent first)
    history = ghcid_section.setdefault('ghcid_history', [])
    history.insert(0, {
        'ghcid': new_ghcid,
        'valid_from': timestamp,
        'valid_to': None,
        'reason': f'Fixed city code: Removed parentheses from city code per ABBREV-CHAR-FILTER rule. Old city code: {old_city}'
    })

    # Refresh every derived identifier scheme via a dispatch table
    replacements = {
        'GHCID': new_ghcid,
        'GHCID_UUID': new_uuid,
        'GHCID_UUID_SHA256': new_uuid_sha256,
        'GHCID_NUMERIC': str(new_numeric),
    }
    if 'identifiers' in record:
        for ident in record['identifiers']:
            scheme = ident.get('identifier_scheme')
            if scheme in replacements:
                ident['identifier_value'] = replacements[scheme]

    # Write the record under its corrected filename, then drop the old file
    new_filename = f"{new_ghcid}.yaml"
    new_filepath = os.path.join(os.path.dirname(old_filepath), new_filename)

    with open(new_filepath, 'w', encoding='utf-8') as fh:
        yaml.dump(record, fh, Dumper=NoAliasDumper, allow_unicode=True, default_flow_style=False, sort_keys=False)

    print(f" Created: {new_filename}")

    os.remove(old_filepath)
    print(f" Deleted: {os.path.basename(old_filepath)}")
|
||
|
||
|
||
def main():
    """Apply the Bulgarian and Swiss GHCID fixes, then verify filenames.

    Processes a hard-coded list of known-bad custodian records and finally
    scans the custodian directory for any remaining non-ASCII YAML filenames.
    """
    custodian_dir = Path('/Users/kempersc/apps/glam/data/custodian')

    # Bulgarian files with Cyrillic that need year suffixes
    # These collide with existing ASCII files, so need year suffix resolution
    bulgarian_fixes = [
        # (filename, year_suffix, ascii_abbreviation)
        ('BG-11-RAY-L-БПНЧП-55.yaml', '1955', 'BPNCHP'),  # "Просвета 55" - assuming 1955
        ('BG-11-RAY-L-БПНЧИ.yaml', '1930', 'BPNCHI'),  # "Изгрев-1930"
        ('BG-23-TRU-L-БПНЧСГ.yaml', '1906', 'BPNCHSG'),  # "Светлина – 1906 г."
    ]

    # Swiss files with parentheses in city code
    swiss_fixes = [
        # (filename, new_city_code)
        ('CH-BE-G(-A-SCCMMA.yaml', 'GWA'),  # Gwatt (Thun) -> GWA
        ('CH-GE-V(--L-BLUGUAAO.yaml', 'VER'),  # Versoix (Sauverny) -> VER
    ]

    print("=== FIXING BULGARIAN CYRILLIC FILES ===")
    for filename, year_suffix, abbrev in bulgarian_fixes:
        filepath = custodian_dir / filename
        if filepath.exists():
            fix_bulgarian_file(str(filepath), year_suffix, abbrev)
        else:
            # BUGFIX: interpolate the filename — the message previously
            # printed a literal placeholder, hiding which file was missing.
            print(f" SKIP: {filename} not found")

    print("\n=== FIXING SWISS PARENTHESES FILES ===")
    for filename, new_city_code in swiss_fixes:
        filepath = custodian_dir / filename
        if filepath.exists():
            fix_swiss_file(str(filepath), new_city_code)
        else:
            print(f" SKIP: {filename} not found")

    print("\n=== VERIFICATION ===")
    # Check for any remaining non-ASCII filenames
    remaining = []
    for f in custodian_dir.iterdir():
        if f.is_file() and f.suffix == '.yaml':
            try:
                f.name.encode('ascii')
            except UnicodeEncodeError:
                remaining.append(f.name)

    if remaining:
        print(f"WARNING: {len(remaining)} files still have non-ASCII names:")
        for name in remaining:
            print(f" - {name}")
    else:
        print("SUCCESS: All custodian files now have ASCII-only filenames!")
|
||
|
||
|
||
# Script entry point: run all fixes when executed directly.
if __name__ == '__main__':
    main()
|