#!/usr/bin/env python3
"""
Fix non-standard Palestinian GHCID file naming.

These files have patterns like:
- PS-GAZ-AKKAD_MUSEUM.yaml
- PS-DEI-DEIR_AL_BALAH_MUSEUM.yaml

They should be:
- PS-GZ-GAZ-M-AM.yaml (with snake_case suffix if collision)

This script:
1. Reads each non-standard file
2. Determines region (GZ for Gaza, WE for West Bank)
3. Determines city code using GeoNames
4. Determines type code from GRP.HER.* type
5. Generates abbreviation from name
6. Creates proper GHCID with UUID generation
7. Renames file
"""

import os
import re
import uuid
import hashlib
import unicodedata
from pathlib import Path
from datetime import datetime, timezone

import yaml
import sqlite3

# UUID v5 namespace for GHCID (RFC 4122 DNS namespace reused as project namespace)
GHCID_NAMESPACE = uuid.UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")

# Palestine region mapping: lowercase place name -> 2/3-letter region code
REGION_MAP = {
    'gaza': 'GZ',
    'gaza strip': 'GZ',
    'gaza city': 'GZ',
    'khan yunis': 'GZ',
    'khan younis': 'GZ',
    'rafah': 'GZ',
    'deir al-balah': 'GZ',
    'al-qarara': 'GZ',
    'beit lahia': 'GZ',
    'beit hanoun': 'GZ',
    'jabalia': 'GZ',
    'west bank': 'WE',
    'ramallah': 'WE',
    'nablus': 'WE',
    'hebron': 'WE',
    'bethlehem': 'WE',
    'jenin': 'WE',
    'tulkarm': 'WE',
    'qalqilya': 'WE',
    'jericho': 'WE',
    'jerusalem': 'JEM',  # Special case - East Jerusalem
    'birzeit': 'WE',
}

# City code mapping: lowercase city name -> 3-letter city code
CITY_MAP = {
    'gaza city': 'GAZ',
    'gaza': 'GAZ',
    'khan yunis': 'KYN',
    'khan younis': 'KYN',
    'rafah': 'RAF',
    'deir al-balah': 'DEB',
    'al-qarara': 'QAR',
    'beit lahia': 'BLA',
    'jabalia': 'JAB',
    'ramallah': 'RAM',
    'nablus': 'NAB',
    'hebron': 'HEB',
    'bethlehem': 'BTH',
    'jenin': 'JEN',
    'tulkarm': 'TUL',
    'qalqilya': 'QAL',
    'jericho': 'JER',
    'jerusalem': 'JER',
    'birzeit': 'BIR',
    'beirut': 'BEI',  # For LB files
    'beit hanoun': 'BHA',
}

# Type code mapping from GRP.HER.* types -> single-letter type code
TYPE_MAP = {
    'GRP.HER.MUS': 'M',
    'GRP.HER.LIB': 'L',
    'GRP.HER.ARC': 'A',
    'GRP.HER.GAL': 'G',
    'GRP.HER': 'U',  # Unknown if only base type
    'GRP.HER.EDU': 'E',
    'GRP.HER.RES': 'R',
    'GRP.HER.HOL': 'H',
}

# Words skipped when building an abbreviation from a name (articles,
# prepositions, and common Arabic/French particles).
SKIP_WORDS = {
    'the', 'a', 'an', 'of', 'in', 'at', 'on', 'to', 'for', 'with',
    'from', 'by', 'as', 'al', 'el', 'de', 'la', 'le', 'les', 'du', 'des',
}


def generate_abbreviation(name: str, max_length: int = 10) -> str:
    """Generate abbreviation from first letters of significant words.

    Words in SKIP_WORDS (articles/particles) are ignored; the result is
    truncated to ``max_length`` and falls back to 'UNK' for empty input.
    """
    words = re.split(r'[\s\-]+', name)
    abbrev = ''
    for word in words:
        # Strip non-letters so e.g. "al-Balah," contributes only letters
        clean = re.sub(r'[^a-zA-Z]', '', word).lower()
        if clean and clean not in SKIP_WORDS:
            abbrev += clean[0].upper()
    return abbrev[:max_length] if abbrev else 'UNK'


def generate_name_suffix(name: str) -> str:
    """Convert name to snake_case suffix (ASCII-folded, punctuation stripped)."""
    # NFD decomposition + dropping combining marks folds accents to ASCII
    normalized = unicodedata.normalize('NFD', name)
    ascii_name = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')
    lowercase = ascii_name.lower()
    no_punct = re.sub(r"[''`\",.:;!?()[\]{}]", '', lowercase)
    underscored = re.sub(r'[\s\-]+', '_', no_punct)
    clean = re.sub(r'[^a-z0-9_]', '', underscored)
    final = re.sub(r'_+', '_', clean).strip('_')
    return final


def generate_uuid_v5(ghcid_string: str) -> str:
    """Deterministic UUIDv5 of the GHCID string in the project namespace."""
    return str(uuid.uuid5(GHCID_NAMESPACE, ghcid_string))


def generate_uuid_v8_sha256(ghcid_string: str) -> str:
    """Deterministic UUIDv8 built from the SHA-256 of the GHCID string.

    Takes the first 16 hash bytes, then overwrites the version nibble
    (byte 6 -> version 8) and variant bits (byte 8 -> RFC variant).
    """
    hash_bytes = hashlib.sha256(ghcid_string.encode('utf-8')).digest()
    uuid_bytes = bytearray(hash_bytes[:16])
    uuid_bytes[6] = (uuid_bytes[6] & 0x0F) | 0x80  # version 8
    uuid_bytes[8] = (uuid_bytes[8] & 0x3F) | 0x80  # RFC 4122/9562 variant
    return str(uuid.UUID(bytes=bytes(uuid_bytes)))


def generate_numeric_id(ghcid_string: str) -> int:
    """Deterministic positive 63-bit integer ID derived from SHA-256."""
    hash_bytes = hashlib.sha256(ghcid_string.encode('utf-8')).digest()
    # Mask to 63 bits so the value fits a signed 64-bit DB column
    return int.from_bytes(hash_bytes[:8], 'big') & ((1 << 63) - 1)


def get_region_code(city: str, location: str = '') -> str:
    """Determine region code from city and location.

    Falls back to 'GZ' if 'gaza' appears anywhere in the text, else 'WE'.
    """
    search = f"{city} {location}".lower()
    # Check for specific cities first
    for key, region in REGION_MAP.items():
        if key in search:
            return region
    # Default to GZ if Gaza mentioned, WE otherwise
    if 'gaza' in search:
        return 'GZ'
    return 'WE'


def get_city_code(city: str) -> str:
    """Get 3-letter city code.

    Tries exact lookup, then substring match in either direction, then
    falls back to the first three letters of the cleaned name ('UNK' if
    fewer than three letters remain).
    """
    city_lower = city.lower().strip()
    # Direct lookup
    if city_lower in CITY_MAP:
        return CITY_MAP[city_lower]
    # Try partial match
    for key, code in CITY_MAP.items():
        if key in city_lower or city_lower in key:
            return code
    # Generate from first 3 letters
    clean = re.sub(r'[^a-zA-Z]', '', city)
    return clean[:3].upper() if len(clean) >= 3 else 'UNK'


def get_type_code(type_str: str, subtype: str = '') -> str:
    """Get single-letter type code from a GRP.HER.* type, else infer from subtype."""
    if type_str in TYPE_MAP:
        return TYPE_MAP[type_str]
    # Infer from subtype keywords when the type string is unrecognized
    subtype_lower = subtype.lower() if subtype else ''
    if 'museum' in subtype_lower:
        return 'M'
    elif 'library' in subtype_lower or 'lib' in subtype_lower:
        return 'L'
    elif 'archive' in subtype_lower:
        return 'A'
    elif 'gallery' in subtype_lower:
        return 'G'
    return 'U'


def is_non_standard_filename(filename: str) -> bool:
    """Check if filename doesn't follow standard GHCID pattern."""
    # Standard: PS-XX-XXX-X-ABBREV.yaml or PS-XX-XXX-X-ABBREV-suffix.yaml
    standard_pattern = r'^PS-[A-Z]{2,3}-[A-Z]{3}-[A-Z]-[A-Z0-9]+(-[a-z0-9_]+)?\.yaml$'
    return not re.match(standard_pattern, filename)


def process_file(filepath: Path, existing_ghcids: set, dry_run: bool = True):
    """Process a single non-standard file.

    Computes the canonical GHCID for the record, appending a snake_case
    name suffix on collision with ``existing_ghcids``. In execute mode
    the YAML is rewritten under the new name and the old file removed.
    Returns the new Path on migration, else None (skipped or dry run).
    """
    filename = filepath.name
    if not is_non_standard_filename(filename):
        return None

    # Load YAML (trusted project-local files, hence safe_load)
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    original = data.get('original_entry', {})
    name = original.get('name', '')
    city = original.get('city', '')
    location = original.get('location', '')
    type_str = original.get('type', '')
    subtype = original.get('subtype', '')

    if not name:
        # BUGFIX: message previously printed the literal placeholder
        # "(unknown)" instead of the file being skipped.
        print(f"  SKIP: {filename} (no name)")
        return None

    # Determine GHCID components
    country = 'PS'
    region = get_region_code(city, location)
    city_code = get_city_code(city)
    type_code = get_type_code(type_str, subtype)
    abbreviation = generate_abbreviation(name)

    # Build base GHCID
    base_ghcid = f"{country}-{region}-{city_code}-{type_code}-{abbreviation}"

    # Check for collision
    if base_ghcid in existing_ghcids:
        name_suffix = generate_name_suffix(name)
        new_ghcid = f"{base_ghcid}-{name_suffix}"
    else:
        new_ghcid = base_ghcid

    print(f"\n  File: {filename}")
    print(f"  Name: {name}")
    print(f"  City: {city} → Region: {region}, City Code: {city_code}")
    print(f"  Type: {type_str} → {type_code}")
    print(f"  Abbreviation: {abbreviation}")
    print(f"  New GHCID: {new_ghcid}")

    # Generate UUIDs
    new_uuid_v5 = generate_uuid_v5(new_ghcid)
    new_uuid_v8 = generate_uuid_v8_sha256(new_ghcid)
    new_numeric = generate_numeric_id(new_ghcid)

    if not dry_run:
        now = datetime.now(timezone.utc).isoformat()

        # Create/update GHCID section
        data['ghcid'] = {
            'ghcid_current': new_ghcid,
            'ghcid_original': base_ghcid,
            'ghcid_uuid': new_uuid_v5,
            'ghcid_uuid_sha256': new_uuid_v8,
            'ghcid_numeric': new_numeric,
            'location_resolution': {
                'method': 'INFERRED_FROM_CITY',
                'country_code': country,
                'region_code': region,
                'city_code': city_code,
                'city_name': city,
                'resolution_date': now,
                # BUGFIX: note previously recorded the placeholder
                # "(unknown)" instead of the original filename.
                'notes': f'Migrated from non-standard filename: {filename}'
            }
        }

        # Write new file
        new_filename = f"{new_ghcid}.yaml"
        new_filepath = filepath.parent / new_filename
        with open(new_filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, allow_unicode=True,
                      default_flow_style=False, sort_keys=False)

        # Delete old file if different
        if new_filepath != filepath:
            filepath.unlink()

        print(f"  MIGRATED: {filename} → {new_filename}")

        # Track new GHCID
        existing_ghcids.add(base_ghcid)
        return new_filepath
    else:
        print(f"  Would become: {new_ghcid}.yaml")
        # BUGFIX: track the base GHCID in dry-run mode too, so that two
        # candidate files that would collide with each other preview the
        # suffixed GHCID the real run would actually produce.
        existing_ghcids.add(base_ghcid)
        return None


def main():
    """CLI entry point: scan the custodian directory and fix PS filenames."""
    import argparse
    parser = argparse.ArgumentParser(description='Fix non-standard PS GHCID file naming')
    parser.add_argument('--execute', action='store_true',
                        help='Actually perform migration')
    parser.add_argument('--path', default='/Users/kempersc/apps/glam/data/custodian',
                        help='Path to custodian files')
    args = parser.parse_args()

    custodian_dir = Path(args.path)

    # Build set of existing base GHCIDs (prefix without snake_case suffix)
    existing_ghcids = set()
    for f in custodian_dir.glob('PS-*.yaml'):
        match = re.match(r'^(PS-[A-Z]{2,3}-[A-Z]{3}-[A-Z]-[A-Z0-9]+)', f.name)
        if match:
            existing_ghcids.add(match.group(1))
    print(f"Found {len(existing_ghcids)} existing standard PS GHCIDs")

    # Find non-standard files
    ps_files = list(custodian_dir.glob('PS-*.yaml'))
    non_standard = [f for f in ps_files if is_non_standard_filename(f.name)]
    print(f"Found {len(non_standard)} non-standard PS files to fix")

    if args.execute:
        print("\n=== EXECUTING MIGRATION ===")
    else:
        print("\n=== DRY RUN ===")

    migrated = 0
    for filepath in sorted(non_standard):
        result = process_file(filepath, existing_ghcids, dry_run=not args.execute)
        if result:
            migrated += 1

    if args.execute:
        print(f"\n=== MIGRATION COMPLETE: {migrated} files migrated ===")
    else:
        print(f"\n=== DRY RUN COMPLETE: {len(non_standard)} files would be migrated ===")


if __name__ == '__main__':
    main()