328 lines
10 KiB
Python
Executable file
328 lines
10 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Fix non-standard Palestinian GHCID file naming.
|
|
|
|
These files have patterns like:
|
|
- PS-GAZ-AKKAD_MUSEUM.yaml
|
|
- PS-DEI-DEIR_AL_BALAH_MUSEUM.yaml
|
|
|
|
They should be:
|
|
- PS-GZ-GAZ-M-AM.yaml (with snake_case suffix if collision)
|
|
|
|
This script:
|
|
1. Reads each non-standard file
|
|
2. Determines region (GZ for Gaza, WE for West Bank)
|
|
3. Determines city code using GeoNames
|
|
4. Determines type code from GRP.HER.* type
|
|
5. Generates abbreviation from name
|
|
6. Creates proper GHCID with UUID generation
|
|
7. Renames file
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import uuid
|
|
import hashlib
|
|
import unicodedata
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
import yaml
|
|
import sqlite3
|
|
|
|
# UUID v5 namespace for GHCID generation (input to uuid.uuid5 below).
# NOTE: this value is identical to the RFC 4122 DNS namespace
# (uuid.NAMESPACE_DNS) rather than a project-specific namespace.
GHCID_NAMESPACE = uuid.UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")
|
|
|
|
# Palestine region mapping: lowercase place-name substring -> region code.
# get_region_code() scans this dict in declaration order doing substring
# matches against "<city> <location>", so insertion order matters when
# keys overlap.
REGION_MAP = {
    'gaza': 'GZ',
    'gaza strip': 'GZ',
    'gaza city': 'GZ',
    'khan yunis': 'GZ',
    'khan younis': 'GZ',
    'rafah': 'GZ',
    'deir al-balah': 'GZ',
    'al-qarara': 'GZ',
    'beit lahia': 'GZ',
    'beit hanoun': 'GZ',
    'jabalia': 'GZ',
    'west bank': 'WE',
    'ramallah': 'WE',
    'nablus': 'WE',
    'hebron': 'WE',
    'bethlehem': 'WE',
    'jenin': 'WE',
    'tulkarm': 'WE',
    'qalqilya': 'WE',
    'jericho': 'WE',
    'jerusalem': 'JEM',  # Special case - East Jerusalem
    'birzeit': 'WE',
}
|
|
|
|
# City code mapping: lowercase city name -> 3-letter city code.
# get_city_code() tries an exact lookup first, then a bidirectional
# substring match in declaration order.
# NOTE(review): 'jericho' and 'jerusalem' both map to 'JER', so facilities
# in those two cities can share the same base GHCID — confirm intended.
CITY_MAP = {
    'gaza city': 'GAZ',
    'gaza': 'GAZ',
    'khan yunis': 'KYN',
    'khan younis': 'KYN',
    'rafah': 'RAF',
    'deir al-balah': 'DEB',
    'al-qarara': 'QAR',
    'beit lahia': 'BLA',
    'jabalia': 'JAB',
    'ramallah': 'RAM',
    'nablus': 'NAB',
    'hebron': 'HEB',
    'bethlehem': 'BTH',
    'jenin': 'JEN',
    'tulkarm': 'TUL',
    'qalqilya': 'QAL',
    'jericho': 'JER',
    'jerusalem': 'JER',
    'birzeit': 'BIR',
    'beirut': 'BEI',  # For LB files
    'beit hanoun': 'BHA',
}
|
|
|
|
# Type code mapping from GRP.HER.* taxonomy types to the single-letter
# type component of a GHCID.  Types absent from this table fall back to
# keyword sniffing on the subtype in get_type_code().
TYPE_MAP = {
    'GRP.HER.MUS': 'M',
    'GRP.HER.LIB': 'L',
    'GRP.HER.ARC': 'A',
    'GRP.HER.GAL': 'G',
    'GRP.HER': 'U',  # Unknown if only base type
    'GRP.HER.EDU': 'E',
    'GRP.HER.RES': 'R',
    'GRP.HER.HOL': 'H',
}
|
|
|
|
# Words ignored when building an abbreviation: English stop words plus
# common Arabic/Romance articles and particles found in institution names.
SKIP_WORDS = {
    'the', 'a', 'an', 'of', 'in', 'at', 'on', 'to', 'for', 'with', 'from', 'by', 'as',
    'al', 'el', 'de', 'la', 'le', 'les', 'du', 'des',
}


def generate_abbreviation(name: str, max_length: int = 10) -> str:
    """Build an acronym from the first letters of the significant words of *name*.

    Words are split on whitespace and hyphens, stripped of non-alphabetic
    characters, and dropped entirely when empty or listed in SKIP_WORDS.
    Returns at most *max_length* uppercase letters, or 'UNK' when no usable
    word remains.
    """
    initials = []
    for token in re.split(r'[\s\-]+', name):
        cleaned = re.sub(r'[^a-zA-Z]', '', token).lower()
        if cleaned and cleaned not in SKIP_WORDS:
            initials.append(cleaned[0].upper())
    if not initials:
        return 'UNK'
    return ''.join(initials)[:max_length]
|
|
|
|
def generate_name_suffix(name: str) -> str:
    """Convert *name* to an ASCII snake_case suffix for collision disambiguation.

    Accents are removed via NFD decomposition (dropping combining marks),
    punctuation is stripped, runs of whitespace/hyphens become single
    underscores, and any character outside [a-z0-9_] is discarded.
    """
    decomposed = unicodedata.normalize('NFD', name)
    # Drop combining marks (category 'Mn') left behind by decomposition.
    text = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
    text = text.lower()
    text = re.sub(r"[''`\",.:;!?()[\]{}]", '', text)
    text = re.sub(r'[\s\-]+', '_', text)
    text = re.sub(r'[^a-z0-9_]', '', text)
    return re.sub(r'_+', '_', text).strip('_')
|
|
|
|
def generate_uuid_v5(ghcid_string: str) -> str:
    """Return the deterministic UUIDv5 of *ghcid_string* under GHCID_NAMESPACE."""
    derived = uuid.uuid5(GHCID_NAMESPACE, ghcid_string)
    return str(derived)
|
|
|
|
def generate_uuid_v8_sha256(ghcid_string: str) -> str:
    """Derive a deterministic UUID string from the SHA-256 of *ghcid_string*.

    The first 16 digest bytes form the UUID, with the version nibble forced
    to 8 and the variant bits set to the RFC 4122 pattern (10xx).
    """
    digest = hashlib.sha256(ghcid_string.encode('utf-8')).digest()
    raw = bytearray(digest[:16])
    raw[6] = (raw[6] & 0x0F) | 0x80  # version nibble = 8
    raw[8] = (raw[8] & 0x3F) | 0x80  # variant = RFC 4122 (10xx)
    return str(uuid.UUID(bytes=bytes(raw)))
|
|
|
|
def generate_numeric_id(ghcid_string: str) -> int:
    """Return a stable non-negative 63-bit integer ID derived from SHA-256.

    Uses the first 8 digest bytes (big-endian) with the sign bit masked off
    so the value fits a signed 64-bit column.
    """
    digest = hashlib.sha256(ghcid_string.encode('utf-8')).digest()
    value = int.from_bytes(digest[:8], 'big')
    return value & ((1 << 63) - 1)
|
|
|
|
def get_region_code(city: str, location: str = '') -> str:
    """Determine the region code (e.g. GZ / WE / JEM) from city and location.

    Scans REGION_MAP in declaration order for a substring hit against the
    combined lowercase text, then falls back to 'GZ' when 'gaza' appears
    anywhere, otherwise 'WE'.
    """
    haystack = f"{city} {location}".lower()

    # First match in declaration order wins.
    for needle, code in REGION_MAP.items():
        if needle in haystack:
            return code

    # Fallback heuristic when no mapped place name was found.
    return 'GZ' if 'gaza' in haystack else 'WE'
|
|
|
|
def get_city_code(city: str) -> str:
    """Resolve a 3-letter city code for *city*.

    Tries an exact CITY_MAP lookup, then a bidirectional substring match in
    declaration order, and finally falls back to the first three alphabetic
    characters of the name (uppercased), or 'UNK' when fewer than three exist.
    """
    normalized = city.lower().strip()

    # Exact lookup (EAFP to avoid a double hash).
    try:
        return CITY_MAP[normalized]
    except KeyError:
        pass

    # Partial match: either string may contain the other.
    for known, code in CITY_MAP.items():
        if known in normalized or normalized in known:
            return code

    # Synthesize a code from the name itself.
    letters = re.sub(r'[^a-zA-Z]', '', city)
    if len(letters) >= 3:
        return letters[:3].upper()
    return 'UNK'
|
|
|
|
def get_type_code(type_str: str, subtype: str = '') -> str:
    """Map a GRP.HER.* type (or free-text subtype) to a single-letter code.

    Exact TYPE_MAP hits win; otherwise the subtype text is scanned for
    keywords (museum/library/archive/gallery).  'U' means unknown.
    """
    code = TYPE_MAP.get(type_str)
    if code is not None:
        return code

    # Fall back to keyword sniffing on the subtype text.
    hints = subtype.lower() if subtype else ''
    for keyword, letter in (('museum', 'M'), ('library', 'L'), ('lib', 'L'),
                            ('archive', 'A'), ('gallery', 'G')):
        if keyword in hints:
            return letter

    return 'U'
|
|
|
|
def is_non_standard_filename(filename: str) -> bool:
    """Return True when *filename* does not follow the standard GHCID pattern.

    Standard form: PS-<REGION>-<CITY>-<TYPE>-<ABBREV>[-<snake_suffix>].yaml
    where REGION is 2-3 uppercase letters, CITY is exactly 3, TYPE is a
    single letter, and ABBREV is uppercase alphanumeric.
    """
    standard = re.compile(r'^PS-[A-Z]{2,3}-[A-Z]{3}-[A-Z]-[A-Z0-9]+(-[a-z0-9_]+)?\.yaml$')
    return standard.match(filename) is None
|
|
|
|
def process_file(filepath: Path, existing_ghcids: set, dry_run: bool = True):
    """Migrate a single non-standard PS file to a proper GHCID filename.

    Loads the YAML, derives region/city/type codes and an abbreviation from
    the ``original_entry`` section, builds the new GHCID (appending a
    snake_case name suffix when the base GHCID is already taken), and —
    unless *dry_run* — writes the updated YAML under the new name and
    deletes the old file.

    ``existing_ghcids`` is mutated in-place: the claimed base GHCID is
    recorded in BOTH modes so that later files in the same batch see the
    collision exactly as a real run would.  (Previously this was done only
    on execute, so dry runs under-reported collisions.)

    Returns the new Path on migration, otherwise None.
    """
    filename = filepath.name

    # Already standard: nothing to do.
    if not is_non_standard_filename(filename):
        return None

    # Load YAML
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    original = data.get('original_entry', {})
    name = original.get('name', '')
    city = original.get('city', '')
    location = original.get('location', '')
    type_str = original.get('type', '')
    subtype = original.get('subtype', '')

    # Without a name we cannot build an abbreviation or suffix.
    if not name:
        print(f" SKIP: (unknown) (no name)")
        return None

    # Determine GHCID components
    country = 'PS'
    region = get_region_code(city, location)
    city_code = get_city_code(city)
    type_code = get_type_code(type_str, subtype)
    abbreviation = generate_abbreviation(name)

    # Build base GHCID
    base_ghcid = f"{country}-{region}-{city_code}-{type_code}-{abbreviation}"

    # Check for collision; disambiguate with a snake_case name suffix.
    if base_ghcid in existing_ghcids:
        name_suffix = generate_name_suffix(name)
        new_ghcid = f"{base_ghcid}-{name_suffix}"
    else:
        new_ghcid = base_ghcid

    # FIX: record the claimed base GHCID regardless of mode so a dry run
    # predicts intra-batch collisions the same way --execute resolves them.
    existing_ghcids.add(base_ghcid)

    print(f"\n File: (unknown)")
    print(f" Name: {name}")
    print(f" City: {city} → Region: {region}, City Code: {city_code}")
    print(f" Type: {type_str} → {type_code}")
    print(f" Abbreviation: {abbreviation}")
    print(f" New GHCID: {new_ghcid}")

    if dry_run:
        print(f" Would become: {new_ghcid}.yaml")
        return None

    # Generate deterministic identifiers from the final GHCID string.
    new_uuid_v5 = generate_uuid_v5(new_ghcid)
    new_uuid_v8 = generate_uuid_v8_sha256(new_ghcid)
    new_numeric = generate_numeric_id(new_ghcid)

    now = datetime.now(timezone.utc).isoformat()

    # Create/update GHCID section
    data['ghcid'] = {
        'ghcid_current': new_ghcid,
        'ghcid_original': base_ghcid,
        'ghcid_uuid': new_uuid_v5,
        'ghcid_uuid_sha256': new_uuid_v8,
        'ghcid_numeric': new_numeric,
        'location_resolution': {
            'method': 'INFERRED_FROM_CITY',
            'country_code': country,
            'region_code': region,
            'city_code': city_code,
            'city_name': city,
            'resolution_date': now,
            'notes': f'Migrated from non-standard filename: (unknown)'
        }
    }

    # Write new file
    new_filename = f"{new_ghcid}.yaml"
    new_filepath = filepath.parent / new_filename

    with open(new_filepath, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

    # Delete old file if different (always true here since the old name was
    # non-standard and the new one is standard, but guard against self-unlink).
    if new_filepath != filepath:
        filepath.unlink()
    print(f" MIGRATED: (unknown) → {new_filename}")

    return new_filepath
|
|
|
|
def main():
    """CLI entry point: scan the custodian directory and fix PS filenames."""
    import argparse

    parser = argparse.ArgumentParser(description='Fix non-standard PS GHCID file naming')
    parser.add_argument('--execute', action='store_true', help='Actually perform migration')
    parser.add_argument('--path', default='/Users/kempersc/apps/glam/data/custodian', help='Path to custodian files')
    args = parser.parse_args()

    custodian_dir = Path(args.path)

    # Collect the base GHCIDs already claimed by standard-named files.
    ghcid_prefix = re.compile(r'^(PS-[A-Z]{2,3}-[A-Z]{3}-[A-Z]-[A-Z0-9]+)')
    existing_ghcids = set()
    for candidate in custodian_dir.glob('PS-*.yaml'):
        hit = ghcid_prefix.match(candidate.name)
        if hit:
            existing_ghcids.add(hit.group(1))

    print(f"Found {len(existing_ghcids)} existing standard PS GHCIDs")

    # Identify the files that need migration.
    ps_files = list(custodian_dir.glob('PS-*.yaml'))
    non_standard = [f for f in ps_files if is_non_standard_filename(f.name)]

    print(f"Found {len(non_standard)} non-standard PS files to fix")

    if args.execute:
        print("\n=== EXECUTING MIGRATION ===")
    else:
        print("\n=== DRY RUN ===")

    # Process deterministically; count only files actually migrated.
    migrated = 0
    for filepath in sorted(non_standard):
        if process_file(filepath, existing_ghcids, dry_run=not args.execute):
            migrated += 1

    if args.execute:
        print(f"\n=== MIGRATION COMPLETE: {migrated} files migrated ===")
    else:
        print(f"\n=== DRY RUN COMPLETE: {len(non_standard)} files would be migrated ===")


if __name__ == '__main__':
    main()
|