glam/scripts/fix_inst_abbreviations.py
2025-12-10 13:01:13 +01:00

286 lines
9.7 KiB
Python

#!/usr/bin/env python3
"""
Fix INST abbreviations by properly transliterating emic names.
This script:
1. Finds all files with INST as abbreviation
2. Extracts emic_name and name_language
3. Transliterates using transliterate_emic_names.py
4. Generates proper abbreviation
5. Updates GHCID and renames file
Usage:
python scripts/fix_inst_abbreviations.py --dry-run
python scripts/fix_inst_abbreviations.py
"""
import argparse
import os
import re
import unicodedata
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, List, Tuple
import yaml
# Import transliteration function
from scripts.transliterate_emic_names import transliterate_for_abbreviation
# Stop-words dropped when deriving an abbreviation: articles, prepositions
# and conjunctions, grouped per language for maintainability. Overlap between
# languages is harmless because everything is folded into one set.
# NOTE(review): the Dutch entry "'s" can never match as written, because
# extract_abbreviation strips apostrophes before the skip-word test — confirm
# intended behavior before relying on it.
_STOP_WORDS_BY_LANGUAGE = {
    'english': "a an the of in at on to for with from by as under and or but",
    'dutch': ("de het een van voor in op te den der des 's aan bij met naar "
              "om tot uit over onder door en of"),
    'french': ("le la les un une des de d du à au aux en dans sur sous pour "
               "par avec l et ou"),
    'german': ("der die das den dem des ein eine einer einem einen von zu "
               "für mit bei nach aus vor über unter durch und oder"),
    'spanish': ("el la los las un una unos unas de del a al en con por para "
                "sobre bajo y o e u"),
    'portuguese': ("o a os as um uma uns umas de do da dos das em no na nos "
                   "nas para por com sobre sob e ou"),
    'italian': ("il lo la i gli le un uno una di del dello della dei degli "
                "delle a al allo alla ai agli alle da dal dallo dalla dai "
                "dagli dalle in nel nello nella nei negli nelle su sul sullo "
                "sulla sui sugli sulle con per tra fra e ed o od"),
    # Common particles in romanised Arabic / Hebrew / Japanese names.
    'arabic': "al el wa bi li fi min",
    'hebrew': "ha ve be le me",
    'japanese': "no wo ga ni de to wa e",
}

# Flat lowercase set consulted by extract_abbreviation().
SKIP_WORDS = set()
for _words in _STOP_WORDS_BY_LANGUAGE.values():
    SKIP_WORDS.update(_words.split())
def extract_abbreviation(name: str) -> str:
    """Build an uppercase initialism from a transliterated institution name.

    Diacritics are stripped (NFD decomposition, combining marks removed),
    punctuation is deleted, stop-words and purely numeric tokens are skipped,
    and the first letters of up to ten remaining words are joined. If every
    word is a stop-word, the first three alphabetic words are used instead.

    Args:
        name: Transliterated (Latin-script) name; may be empty.

    Returns:
        An ASCII uppercase abbreviation, or "UNK" when none can be derived.
    """
    if not name:
        return "UNK"

    # Decompose accented characters and drop the combining marks ("é" -> "e").
    decomposed = unicodedata.normalize('NFD', name)
    no_marks = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')

    # Strip punctuation before tokenising. NOTE(review): apostrophes are
    # removed here, which fuses elisions like "l'Homme" -> "lHomme" before
    # the stop-word test below — confirm this is intended.
    stripped = re.sub(r"[''`\",.:;!?()[\]{}]", '', no_marks)
    tokens = stripped.split()

    def _keeps(token: str) -> bool:
        # Keep tokens that are not stop-words, not pure digits, and that
        # start with a letter (split() never yields empty tokens).
        return (token.lower() not in SKIP_WORDS
                and not token.isdigit()
                and token[0].isalpha())

    chosen = [tok for tok in tokens if _keeps(tok)]
    if not chosen:
        # Every word was filtered: fall back to the first three alphabetic
        # words, stop-words included.
        chosen = [tok for tok in tokens[:3] if tok and tok[0].isalpha()]

    initials = ''.join(tok[0].upper() for tok in chosen[:10] if tok)
    # Defensive final pass: keep only ASCII letters.
    initials = ''.join(ch for ch in initials if ord(ch) < 128 and ch.isalpha())
    return initials if initials else "UNK"
def fix_file(filepath: Path, dry_run: bool = False) -> Dict:
    """Fix a single custodian file whose GHCID abbreviation is the INST placeholder.

    Loads the YAML file, transliterates ``custodian_name.emic_name``, derives a
    real abbreviation, rewrites ``ghcid.ghcid_current`` (appending a history
    entry), then renames the file to match the new GHCID.

    Args:
        filepath: Path to the custodian YAML file.
        dry_run: When True, report what would change without writing anything.

    Returns:
        A dict whose 'status' key is one of 'updated', 'would_update',
        'collision', 'skip' or 'error', plus status-specific detail fields.
    """
    filename = filepath.name
    with open(filepath, 'r', encoding='utf-8') as f:
        content = f.read()
    # safe_load: never construct arbitrary Python objects from the file.
    data = yaml.safe_load(content)
    if not data:
        return {'status': 'error', 'reason': 'empty file'}
    if not isinstance(data, dict):
        # A scalar or list document would crash the .get() calls below.
        return {'status': 'error', 'reason': 'not a mapping'}
    # Get emic name and language; tolerate a malformed (non-dict) value.
    custodian_name = data.get('custodian_name')
    if not isinstance(custodian_name, dict):
        custodian_name = {}
    emic_name = custodian_name.get('emic_name')
    lang = custodian_name.get('name_language')
    if not emic_name:
        return {'status': 'skip', 'reason': 'no emic_name'}
    if not lang:
        return {'status': 'skip', 'reason': 'no name_language'}
    # Transliterate to Latin script so an abbreviation can be extracted.
    try:
        transliterated = transliterate_for_abbreviation(emic_name, lang)
    except Exception as e:
        return {'status': 'error', 'reason': f'transliteration failed: {e}'}
    new_abbrev = extract_abbreviation(transliterated)
    if new_abbrev == "UNK" or not new_abbrev:
        return {'status': 'error', 'reason': f'could not extract abbreviation from "{transliterated}"'}
    # Get current GHCID components; tolerate a malformed (non-dict) value.
    ghcid = data.get('ghcid')
    if not isinstance(ghcid, dict):
        ghcid = {}
    current_ghcid = ghcid.get('ghcid_current', '')
    # GHCID shape: COUNTRY(2)-REGION-CITY-TYPE(1)-ABBREV
    match = re.match(r'^([A-Z]{2})-([A-Z0-9]+)-([A-Z]+)-([A-Z])-(.+)$', current_ghcid)
    if not match:
        return {'status': 'error', 'reason': f'could not parse GHCID: {current_ghcid}'}
    country, region, city, inst_type, old_abbrev = match.groups()
    if old_abbrev != 'INST':
        return {'status': 'skip', 'reason': f'not INST abbreviation: {old_abbrev}'}
    new_ghcid = f"{country}-{region}-{city}-{inst_type}-{new_abbrev}"
    new_filename = f"{new_ghcid}.yaml"
    new_filepath = filepath.parent / new_filename
    # Refuse to clobber another custodian file with the same derived GHCID.
    if new_filepath.exists() and new_filepath != filepath:
        return {
            'status': 'collision',
            'old_file': filename,
            'new_file': new_filename,
            'reason': 'target file exists'
        }
    if dry_run:
        return {
            'status': 'would_update',
            'old_file': filename,
            'new_file': new_filename,
            'old_abbrev': old_abbrev,
            'new_abbrev': new_abbrev,
            'emic_name': emic_name,
            'transliterated': transliterated,
            'lang': lang
        }
    # A successful parse implies data['ghcid'] exists and is this dict, so
    # mutating `ghcid` updates `data` in place.
    ghcid['ghcid_current'] = new_ghcid
    # Append an audit entry with a timezone-aware UTC timestamp.
    timestamp = datetime.now(timezone.utc).isoformat()
    history_entry = {
        'ghcid': new_ghcid,
        'valid_from': timestamp,
        'reason': f'Abbreviation fixed via transliteration: "{emic_name}" ({lang}) → "{transliterated}"{new_abbrev}'
    }
    history = ghcid.get('ghcid_history')
    if not isinstance(history, list):
        # Covers both a missing key and an explicit null in the YAML.
        history = []
        ghcid['ghcid_history'] = history
    history.append(history_entry)
    # Write updated file, then rename so the filename matches the new GHCID.
    with open(filepath, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
    if new_filepath != filepath:
        filepath.rename(new_filepath)
    return {
        'status': 'updated',
        'old_file': filename,
        'new_file': new_filename,
        'old_abbrev': old_abbrev,
        'new_abbrev': new_abbrev,
        'emic_name': emic_name,
        'transliterated': transliterated,
        'lang': lang
    }
def main():
    """CLI entry point: scan data/custodian for *-INST.yaml files and fix them."""
    parser = argparse.ArgumentParser(description='Fix INST abbreviations using transliteration')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done')
    parser.add_argument('--limit', type=int, default=0, help='Limit number of files to process')
    args = parser.parse_args()

    # Collect candidate files (abbreviation placeholder encoded in the name).
    targets = list(Path('data/custodian').glob('*-INST.yaml'))
    if args.limit > 0:
        targets = targets[:args.limit]

    banner = "=" * 60
    print(banner)
    print(f"FIX INST ABBREVIATIONS {'(DRY RUN)' if args.dry_run else ''}")
    print(banner)
    print(f"\nFound {len(targets)} files with INST abbreviation\n")

    # Bucket results by status for the summary.
    results = {key: [] for key in ('updated', 'would_update', 'collision', 'skip', 'error')}

    for path in sorted(targets):
        outcome = fix_file(path, dry_run=args.dry_run)
        status = outcome['status']
        results[status].append(outcome)
        if status in ('updated', 'would_update'):
            print(f"{outcome['old_file']}")
            print(f"{outcome['new_file']}")
            print(f" Emic: {outcome['emic_name']} ({outcome['lang']})")
            print(f" Trans: {outcome['transliterated']}")
            print(f" Abbrev: {outcome['old_abbrev']}{outcome['new_abbrev']}")
            print()
        elif status == 'collision':
            print(f"⚠ COLLISION: {outcome['old_file']}{outcome['new_file']}")
        elif status == 'error':
            print(f"✗ ERROR: {path.name}: {outcome['reason']}")

    # Summary
    print("\n" + banner)
    print("SUMMARY")
    print(banner)
    if args.dry_run:
        print(f"Would update: {len(results['would_update'])}")
    else:
        print(f"Updated: {len(results['updated'])}")
    print(f"Collisions: {len(results['collision'])}")
    print(f"Skipped: {len(results['skip'])}")
    print(f"Errors: {len(results['error'])}")
    if results['collision']:
        print("\nCollisions:")
        for item in results['collision']:
            print(f" {item['old_file']}{item['new_file']}")
    if results['error']:
        print("\nErrors:")
        for item in results['error']:
            print(f" {item.get('old_file', 'unknown')}: {item['reason']}")
# Run as a script (no side effects when imported, e.g. for testing).
if __name__ == '__main__':
    main()