glam/scripts/fix_austrian_region_codes.py
2025-12-10 13:01:13 +01:00

266 lines
8.2 KiB
Python

#!/usr/bin/env python3
"""
Fix Austrian region codes in custodian YAML files.
Problem:
- Some files use AT-0X (leading zeros) instead of AT-X
- Some files use AT-XX letter codes (vehicle registration) instead of ISO 3166-2
ISO 3166-2:AT codes (single digit, no leading zeros):
- AT-1 = Burgenland
- AT-2 = Kärnten (Carinthia)
- AT-3 = Niederösterreich (Lower Austria)
- AT-4 = Oberösterreich (Upper Austria)
- AT-5 = Salzburg
- AT-6 = Steiermark (Styria)
- AT-7 = Tirol (Tyrol)
- AT-8 = Vorarlberg
- AT-9 = Wien (Vienna)
"""
import os
import re
import sys
from pathlib import Path
from datetime import datetime, timezone
# Legacy letter abbreviations (vehicle-registration style) found in region
# codes, keyed to the numeric part of the matching ISO 3166-2:AT code.
LETTER_TO_ISO = {
    'B': '1',    # Burgenland
    'K': '2',    # Kärnten
    'NO': '3',   # Niederösterreich
    'OO': '4',   # Oberösterreich
    'S': '5',    # Salzburg
    'ST': '6',   # Steiermark
    'T': '7',    # Tirol
    'V': '8',    # Vorarlberg
    'W': '9',    # Wien
}
# Human-readable state names keyed by the numeric ISO 3166-2:AT suffix;
# the keys double as the set of valid region codes.
REGION_NAMES = {
    '1': 'Burgenland',
    '2': 'Kärnten',
    '3': 'Niederösterreich',
    '4': 'Oberösterreich',
    '5': 'Salzburg',
    '6': 'Steiermark',
    '7': 'Tirol',
    '8': 'Vorarlberg',
    '9': 'Wien',
}
def get_correct_region_code(old_code: str) -> "tuple[str | None, str]":
    """
    Convert old region code to correct ISO 3166-2 code.

    Args:
        old_code: Region part of an identifier without the ``AT-`` prefix,
            e.g. ``'09'``, ``'W'`` or ``'9'``.

    Returns: (correct_code, correction_type)
        ``correction_type`` is one of ``'leading_zero'``, ``'letter_code'``,
        ``'already_correct'`` or ``'unknown'``. ``correct_code`` is ``None``
        only when the type is ``'unknown'``.
    """
    # Leading zero (AT-01 -> AT-1). The remaining character must itself be a
    # valid region, otherwise '00' or '0X' would be "corrected" to AT-0 / AT-X.
    if len(old_code) == 2 and old_code.startswith('0') and old_code[1] in REGION_NAMES:
        return old_code[1], 'leading_zero'
    # Letter abbreviation (AT-W -> AT-9)
    if old_code in LETTER_TO_ISO:
        return LETTER_TO_ISO[old_code], 'letter_code'
    # Already a valid single-digit code
    if old_code in REGION_NAMES:
        return old_code, 'already_correct'
    return None, 'unknown'
def fix_yaml_content(content: str, old_ghcid: str, new_ghcid: str,
                     old_region: str, new_region: str, correction_type: str) -> str:
    """
    Fix the YAML content with new GHCID and region codes.

    The text is edited with regular expressions; it is not parsed as YAML.

    - ``ghcid_current`` and ``identifier_value`` entries starting with
      ``old_ghcid`` are rewritten to ``new_ghcid``.
    - ``region_code`` values equal to ``old_region`` that follow a
      ``location_resolution:`` or ``location:`` key are rewritten to
      ``new_region``; surrounding quotes, if present, are kept.
    - Two history items (new GHCID, then old GHCID) are prepended to an
      existing ``ghcid_history`` list, or a new ``ghcid_history`` list is
      added directly below the ``ghcid_current`` line. Inserted lines reuse
      the indentation of the line they attach to, so nesting is preserved.

    Args:
        content: Full text of the custodian YAML file.
        old_ghcid: GHCID currently used in the file.
        new_ghcid: GHCID with the corrected region code.
        old_region: Region code as found (e.g. ``'09'`` or ``'W'``).
        new_region: Corrected region code (e.g. ``'9'``).
        correction_type: ``'leading_zero'`` selects the leading-zero wording
            for the history reason; any other value uses the letter-code
            wording.

    Returns:
        The updated file text.
    """
    timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

    # Build the reason from the region code actually seen in the file rather
    # than reverse-looking it up in LETTER_TO_ISO.
    if correction_type == 'leading_zero':
        reason = (
            f"Corrected region code from AT-{old_region} to AT-{new_region} "
            f"(removed leading zero per ISO 3166-2:AT)"
        )
    else:
        region_name = REGION_NAMES.get(new_region, 'Unknown')
        reason = (
            f"Corrected region code from AT-{old_region} (vehicle registration code) "
            f"to AT-{new_region} ({region_name}) per ISO 3166-2:AT"
        )

    def _swap(pattern: str, value: str, text: str, flags: int = 0) -> str:
        # A function replacement inserts `value` literally, so backslashes or
        # group-like sequences in it are never interpreted as a template.
        return re.sub(pattern, lambda m: m.group(1) + value, text, flags=flags)

    def _history_items(indent: str, old_reason: str) -> "list[str]":
        # The two history items, each line prefixed with `indent`.
        return [
            f'{indent}- ghcid: {new_ghcid}',
            f'{indent}  valid_from: "{timestamp}"',
            f'{indent}  valid_to: null',
            f'{indent}  reason: "{reason}"',
            f'{indent}- ghcid: {old_ghcid}',
            f'{indent}  valid_from: null',
            f'{indent}  valid_to: "{timestamp}"',
            f'{indent}  reason: "{old_reason}"',
        ]

    # Replace GHCID in ghcid_current and in identifiers
    content = _swap(r'(ghcid_current:\s*)' + re.escape(old_ghcid), new_ghcid, content)
    content = _swap(r'(identifier_value:\s*)' + re.escape(old_ghcid), new_ghcid, content)

    # Replace region_code in the location_resolution and location sections.
    # The look-ahead stops a short code such as 'S' from matching the start
    # of a longer one such as 'ST'; an opening quote is kept in group 1.
    old_region_value = re.escape(old_region) + r'(?![A-Za-z0-9])'
    for section in ('location_resolution', 'location'):
        content = _swap(
            r'(%s:.*?region_code:\s*["\']?)' % section + old_region_value,
            new_region,
            content,
            flags=re.DOTALL,
        )

    if 'ghcid_history:' in content:
        # Prepend to the existing list. Group 2 is the indentation of the
        # first existing entry; it is reused for the new items and then put
        # back in front of the entry it was taken from.
        def _prepend(m: "re.Match[str]") -> str:
            indent = m.group(2)
            items = '\n'.join(
                _history_items(indent, 'Previous GHCID with incorrect region code')
            )
            return f'{m.group(1)}{items}\n{indent}'

        content = re.sub(r'(ghcid_history:\s*\n)([ \t]*)', _prepend, content)
    else:
        # Add a ghcid_history key as a sibling right below ghcid_current.
        def _append(m: "re.Match[str]") -> str:
            # A leading "- " (key is first in a list item) counts as indent.
            indent = m.group(1).replace('-', ' ')
            lines = [f'{indent}ghcid_history:']
            lines += _history_items(indent, 'Original GHCID with incorrect region code')
            return m.group(0) + '\n' + '\n'.join(lines)

        content = re.sub(
            r'^([ \t]*(?:-[ \t]+)?)ghcid_current:\s*' + re.escape(new_ghcid) + r'[^\n]*',
            _append,
            content,
            flags=re.MULTILINE,
        )
    return content
def process_file(filepath: Path, dry_run: bool = False) -> dict:
    """
    Process a single YAML file and return results.

    The region code is taken from the filename
    (``AT-<region>-<city>-<type>-<abbrev>.yaml``). When it needs correcting,
    the content is rewritten with ``fix_yaml_content``, saved under the
    corrected filename in the same directory, and the old file is removed.

    Args:
        filepath: Path of the custodian YAML file.
        dry_run: When true, nothing is written or deleted; the result
            describes what would be done.

    Returns:
        A dict with a ``'status'`` key:

        - ``'skipped'`` or ``'error'``: also carries ``'reason'``.
        - ``'would_fix'`` (dry run) or ``'fixed'``: also carries
          ``'old_ghcid'``, ``'new_ghcid'``, ``'old_file'``, ``'new_file'``
          and ``'correction_type'``.
    """
    filename = filepath.name
    # Extract current GHCID from filename (e.g., AT-09-VIE-A-OSA.yaml).
    # fullmatch: names with a trailing suffix such as ".yaml.bak" must not be
    # treated as custodian files (and renamed to ".yaml").
    match = re.fullmatch(r'AT-([A-Z0-9]+)-([A-Z]+)-([A-Z])-(.+)\.yaml', filename)
    if not match:
        return {'status': 'skipped', 'reason': 'filename pattern mismatch'}
    old_region, city, inst_type, abbrev = match.groups()

    # Get correct region code
    new_region, correction_type = get_correct_region_code(old_region)
    if correction_type == 'already_correct':
        return {'status': 'skipped', 'reason': 'already correct'}
    if correction_type == 'unknown':
        return {'status': 'error', 'reason': f'unknown region code: {old_region}'}

    old_ghcid = f"AT-{old_region}-{city}-{inst_type}-{abbrev}"
    new_ghcid = f"AT-{new_region}-{city}-{inst_type}-{abbrev}"
    new_filename = f"{new_ghcid}.yaml"
    new_filepath = filepath.parent / new_filename

    # Never overwrite a different file that already carries the corrected
    # name; report it so it can be merged by hand. Checked before the dry-run
    # return so the preview shows the same outcome as a real run.
    if new_filepath != filepath and new_filepath.exists():
        return {'status': 'error', 'reason': f'target file already exists: {new_filename}'}

    outcome = {
        'old_ghcid': old_ghcid,
        'new_ghcid': new_ghcid,
        'old_file': filename,
        'new_file': new_filename,
        'correction_type': correction_type,
    }
    if dry_run:
        return {'status': 'would_fix', **outcome}

    # Read, fix and write under the corrected name
    content = filepath.read_text(encoding='utf-8')
    new_content = fix_yaml_content(content, old_ghcid, new_ghcid, old_region, new_region, correction_type)
    new_filepath.write_text(new_content, encoding='utf-8')
    # Remove old file if different name
    if filepath != new_filepath:
        filepath.unlink()
    return {'status': 'fixed', **outcome}
def main():
    """
    Command-line entry point.

    Parses ``--dry-run`` and ``--dir``, runs ``process_file`` on every
    ``AT-*.yaml`` file in the directory (in sorted order), prints one line per
    fixed or failed file, and finishes with a summary.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Fix Austrian region codes in custodian files')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be changed without making changes')
    parser.add_argument('--dir', default='/Users/kempersc/apps/glam/data/custodian', help='Directory containing custodian files')
    args = parser.parse_args()

    # Collect all AT-*.yaml files
    at_files = list(Path(args.dir).glob('AT-*.yaml'))
    print(f"Found {len(at_files)} Austrian files")

    results = {'fixed': [], 'would_fix': [], 'skipped': [], 'errors': []}
    action = 'Would fix' if args.dry_run else 'Fixed'

    for filepath in sorted(at_files):
        result = process_file(filepath, dry_run=args.dry_run)
        status = result['status']
        if status == 'error':
            results['errors'].append((filepath.name, result['reason']))
            print(f" ERROR: {filepath.name} - {result['reason']}")
            continue
        if status not in ('fixed', 'would_fix'):
            # Skipped files are not reported individually
            continue
        results[status].append(result)
        print(f" {action}: {result['old_ghcid']} -> {result['new_ghcid']} ({result['correction_type']})")

    changed_count = len(results['fixed']) + len(results['would_fix'])
    error_count = len(results['errors'])
    # Everything that was neither changed nor an error is counted here
    remaining_count = len(at_files) - len(results['fixed']) - len(results['would_fix']) - error_count
    banner = 'DRY RUN - ' if args.dry_run else ''

    print(f"\n{banner}Summary:")
    print(f" Fixed/Would fix: {changed_count}")
    print(f" Already correct: {remaining_count}")
    print(f" Errors: {error_count}")

    if results['errors']:
        print("\nErrors:")
        for filename, reason in results['errors']:
            print(f" {filename}: {reason}")


if __name__ == '__main__':
    main()