266 lines
8.2 KiB
Python
266 lines
8.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Fix Austrian region codes in custodian YAML files.
|
|
|
|
Problem:
|
|
- Some files use AT-0X (leading zeros) instead of AT-X
|
|
- Some files use AT-XX letter codes (vehicle registration) instead of ISO 3166-2
|
|
|
|
ISO 3166-2:AT codes (single digit, no leading zeros):
|
|
- AT-1 = Burgenland
|
|
- AT-2 = Kärnten (Carinthia)
|
|
- AT-3 = Niederösterreich (Lower Austria)
|
|
- AT-4 = Oberösterreich (Upper Austria)
|
|
- AT-5 = Salzburg
|
|
- AT-6 = Steiermark (Styria)
|
|
- AT-7 = Tirol (Tyrol)
|
|
- AT-8 = Vorarlberg
|
|
- AT-9 = Wien (Vienna)
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
|
|
# Mapping from letter codes (vehicle registration) to ISO 3166-2 numbers
|
|
LETTER_TO_ISO = {
|
|
'B': '1', # Burgenland
|
|
'K': '2', # Kärnten
|
|
'NO': '3', # Niederösterreich
|
|
'OO': '4', # Oberösterreich
|
|
'S': '5', # Salzburg
|
|
'ST': '6', # Steiermark
|
|
'T': '7', # Tirol
|
|
'V': '8', # Vorarlberg
|
|
'W': '9', # Wien
|
|
}
|
|
|
|
# Region names for documentation
|
|
REGION_NAMES = {
|
|
'1': 'Burgenland',
|
|
'2': 'Kärnten',
|
|
'3': 'Niederösterreich',
|
|
'4': 'Oberösterreich',
|
|
'5': 'Salzburg',
|
|
'6': 'Steiermark',
|
|
'7': 'Tirol',
|
|
'8': 'Vorarlberg',
|
|
'9': 'Wien',
|
|
}
|
|
|
|
|
|
def get_correct_region_code(old_code: str) -> tuple[str, str]:
|
|
"""
|
|
Convert old region code to correct ISO 3166-2 code.
|
|
|
|
Returns: (correct_code, correction_type)
|
|
"""
|
|
# Check for leading zero (AT-01 -> AT-1)
|
|
if old_code.startswith('0') and len(old_code) == 2:
|
|
return old_code[1], 'leading_zero'
|
|
|
|
# Check for letter code
|
|
if old_code in LETTER_TO_ISO:
|
|
return LETTER_TO_ISO[old_code], 'letter_code'
|
|
|
|
# Already correct
|
|
if old_code in REGION_NAMES:
|
|
return old_code, 'already_correct'
|
|
|
|
return None, 'unknown'
|
|
|
|
|
|
def fix_yaml_content(content: str, old_ghcid: str, new_ghcid: str,
|
|
old_region: str, new_region: str, correction_type: str) -> str:
|
|
"""Fix the YAML content with new GHCID and region codes."""
|
|
|
|
timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
|
|
region_name = REGION_NAMES.get(new_region, 'Unknown')
|
|
|
|
# Determine reason text
|
|
if correction_type == 'leading_zero':
|
|
reason = f"Corrected region code from AT-0{new_region} to AT-{new_region} (removed leading zero per ISO 3166-2:AT)"
|
|
else:
|
|
old_letter = [k for k, v in LETTER_TO_ISO.items() if v == new_region][0]
|
|
reason = f"Corrected region code from AT-{old_letter} (vehicle registration code) to AT-{new_region} ({region_name}) per ISO 3166-2:AT"
|
|
|
|
# Replace GHCID in ghcid_current
|
|
content = re.sub(
|
|
r'(ghcid_current:\s*)' + re.escape(old_ghcid),
|
|
r'\g<1>' + new_ghcid,
|
|
content
|
|
)
|
|
|
|
# Replace GHCID in identifiers
|
|
content = re.sub(
|
|
r'(identifier_value:\s*)' + re.escape(old_ghcid),
|
|
r'\g<1>' + new_ghcid,
|
|
content
|
|
)
|
|
|
|
# Replace region_code in location_resolution
|
|
content = re.sub(
|
|
r'(location_resolution:.*?region_code:\s*)' + re.escape(old_region),
|
|
r'\g<1>' + new_region,
|
|
content,
|
|
flags=re.DOTALL
|
|
)
|
|
|
|
# Replace region_code in location section
|
|
content = re.sub(
|
|
r'(location:.*?region_code:\s*)' + re.escape(old_region),
|
|
r'\g<1>' + new_region,
|
|
content,
|
|
flags=re.DOTALL
|
|
)
|
|
|
|
# Add history entry after ghcid_current line
|
|
history_entry = f'''
|
|
ghcid_history:
|
|
- ghcid: {new_ghcid}
|
|
valid_from: "{timestamp}"
|
|
valid_to: null
|
|
reason: "{reason}"
|
|
- ghcid: {old_ghcid}
|
|
valid_from: null
|
|
valid_to: "{timestamp}"
|
|
reason: "Original GHCID with incorrect region code"'''
|
|
|
|
# Check if ghcid_history already exists
|
|
if 'ghcid_history:' in content:
|
|
# Insert new entry at the beginning of existing history
|
|
new_history_items = f'''- ghcid: {new_ghcid}
|
|
valid_from: "{timestamp}"
|
|
valid_to: null
|
|
reason: "{reason}"
|
|
- ghcid: {old_ghcid}
|
|
valid_from: null
|
|
valid_to: "{timestamp}"
|
|
reason: "Previous GHCID with incorrect region code"
|
|
'''
|
|
content = re.sub(
|
|
r'(ghcid_history:\s*\n\s*)',
|
|
r'\g<1>' + new_history_items,
|
|
content
|
|
)
|
|
else:
|
|
# Add ghcid_history after ghcid_current
|
|
content = re.sub(
|
|
r'(ghcid_current:\s*' + re.escape(new_ghcid) + r')',
|
|
r'\g<1>' + history_entry,
|
|
content
|
|
)
|
|
|
|
return content
|
|
|
|
|
|
def process_file(filepath: Path, dry_run: bool = False) -> dict:
|
|
"""Process a single YAML file and return results."""
|
|
|
|
filename = filepath.name
|
|
|
|
# Extract current GHCID from filename (e.g., AT-09-VIE-A-OSA.yaml)
|
|
match = re.match(r'AT-([A-Z0-9]+)-([A-Z]+)-([A-Z])-(.+)\.yaml', filename)
|
|
if not match:
|
|
return {'status': 'skipped', 'reason': 'filename pattern mismatch'}
|
|
|
|
old_region = match.group(1)
|
|
city = match.group(2)
|
|
inst_type = match.group(3)
|
|
abbrev = match.group(4)
|
|
|
|
# Get correct region code
|
|
new_region, correction_type = get_correct_region_code(old_region)
|
|
|
|
if correction_type == 'already_correct':
|
|
return {'status': 'skipped', 'reason': 'already correct'}
|
|
|
|
if correction_type == 'unknown':
|
|
return {'status': 'error', 'reason': f'unknown region code: {old_region}'}
|
|
|
|
old_ghcid = f"AT-{old_region}-{city}-{inst_type}-{abbrev}"
|
|
new_ghcid = f"AT-{new_region}-{city}-{inst_type}-{abbrev}"
|
|
new_filename = f"{new_ghcid}.yaml"
|
|
new_filepath = filepath.parent / new_filename
|
|
|
|
if dry_run:
|
|
return {
|
|
'status': 'would_fix',
|
|
'old_ghcid': old_ghcid,
|
|
'new_ghcid': new_ghcid,
|
|
'old_file': filename,
|
|
'new_file': new_filename,
|
|
'correction_type': correction_type
|
|
}
|
|
|
|
# Read file content
|
|
content = filepath.read_text(encoding='utf-8')
|
|
|
|
# Fix content
|
|
new_content = fix_yaml_content(content, old_ghcid, new_ghcid, old_region, new_region, correction_type)
|
|
|
|
# Write to new file
|
|
new_filepath.write_text(new_content, encoding='utf-8')
|
|
|
|
# Remove old file if different name
|
|
if filepath != new_filepath:
|
|
filepath.unlink()
|
|
|
|
return {
|
|
'status': 'fixed',
|
|
'old_ghcid': old_ghcid,
|
|
'new_ghcid': new_ghcid,
|
|
'old_file': filename,
|
|
'new_file': new_filename,
|
|
'correction_type': correction_type
|
|
}
|
|
|
|
|
|
def main():
|
|
import argparse
|
|
parser = argparse.ArgumentParser(description='Fix Austrian region codes in custodian files')
|
|
parser.add_argument('--dry-run', action='store_true', help='Show what would be changed without making changes')
|
|
parser.add_argument('--dir', default='/Users/kempersc/apps/glam/data/custodian', help='Directory containing custodian files')
|
|
args = parser.parse_args()
|
|
|
|
custodian_dir = Path(args.dir)
|
|
|
|
# Find all AT-*.yaml files
|
|
at_files = list(custodian_dir.glob('AT-*.yaml'))
|
|
print(f"Found {len(at_files)} Austrian files")
|
|
|
|
results = {
|
|
'fixed': [],
|
|
'would_fix': [],
|
|
'skipped': [],
|
|
'errors': []
|
|
}
|
|
|
|
for filepath in sorted(at_files):
|
|
result = process_file(filepath, dry_run=args.dry_run)
|
|
status = result['status']
|
|
|
|
if status in ('fixed', 'would_fix'):
|
|
results[status].append(result)
|
|
action = 'Would fix' if args.dry_run else 'Fixed'
|
|
print(f" {action}: {result['old_ghcid']} -> {result['new_ghcid']} ({result['correction_type']})")
|
|
elif status == 'error':
|
|
results['errors'].append((filepath.name, result['reason']))
|
|
print(f" ERROR: {filepath.name} - {result['reason']}")
|
|
# Skip already correct files silently
|
|
|
|
print(f"\n{'DRY RUN - ' if args.dry_run else ''}Summary:")
|
|
print(f" Fixed/Would fix: {len(results['fixed']) + len(results['would_fix'])}")
|
|
print(f" Already correct: {len(at_files) - len(results['fixed']) - len(results['would_fix']) - len(results['errors'])}")
|
|
print(f" Errors: {len(results['errors'])}")
|
|
|
|
if results['errors']:
|
|
print("\nErrors:")
|
|
for filename, reason in results['errors']:
|
|
print(f" {filename}: {reason}")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|