#!/usr/bin/env python3
"""
Convert Palestinian heritage consolidated JSON to individual custodian YAML files.
This script reads data/extracted/palestinian_heritage_consolidated.json and creates
individual YAML files in data/custodian/ following the established format.
Features:
- Converts all 84 institutions to custodian YAML format
- Maps conflict_status to time_of_destruction for destroyed institutions
- Preserves all enrichment data (Wikidata, Google Maps, etc.)
- Generates proper identifiers list
- Creates provenance tracking
"""
import json
import yaml
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional
import re
import unicodedata


def slugify(text: Optional[str]) -> str:
    """Convert text to a safe filename slug."""
    if not text:
        return 'XXX'
    # Normalize unicode and remove diacritics (combining marks)
    normalized = unicodedata.normalize('NFD', str(text))
    ascii_text = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')
    # Drop other special characters, then replace runs of spaces/hyphens with underscores
    slug = re.sub(r'[^\w\s-]', '', ascii_text)
    slug = re.sub(r'[\s-]+', '_', slug)
    return slug.strip('_').upper() or 'XXX'
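# Illustrative behavior (hypothetical inputs, not taken from the source data):
#   slugify('Khan Younis')   -> 'KHAN_YOUNIS'
#   slugify('Café al-Quds')  -> 'CAFE_AL_QUDS'  (diacritics stripped via NFD)
#   slugify(None)            -> 'XXX'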
def map_type_to_glam_code(type_str: str) -> str:
    """Map an institution type string to a GLAM type code."""
    type_mapping = {
        'GRP.HER.MUS': 'M',  # Museum
        'GRP.HER.LIB': 'L',  # Library
        'GRP.HER.ARC': 'A',  # Archive
        'GRP.HER': 'R',      # Research/Heritage center (default)
        'GRP.HER.GAL': 'G',  # Gallery
        'GRP.EDU': 'E',      # Education
        'museum': 'M',
        'library': 'L',
        'archive': 'A',
        'oral_history_archive': 'A',
        'photographic_archive': 'A',
        'research': 'R',
    }
    return type_mapping.get(type_str, 'R')
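# Illustrative mappings (unknown types fall back to 'R'; 'mosque' is a made-up input):
#   map_type_to_glam_code('GRP.HER.MUS')  -> 'M'
#   map_type_to_glam_code('library')      -> 'L'
#   map_type_to_glam_code('mosque')       -> 'R'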
def convert_institution_to_custodian(inst: dict, metadata: dict) -> dict:
    """Convert a Palestinian heritage institution to custodian YAML format."""
    custodian = {}

    # Original entry section
    original_entry = {
        'id': inst.get('id'),
        'name': inst.get('name'),
        'name_arabic': inst.get('name_arabic'),
        'type': inst.get('type'),
        'subtype': inst.get('subtype'),
        'country': inst.get('country'),
        'city': inst.get('city'),
        'location': inst.get('location'),
    }
    # Add optional fields if present
    for field in ['founded', 'founded_by', 'affiliation', 'notes', 'confidence']:
        if inst.get(field):
            original_entry[field] = inst[field]
    # Add coordinates if present
    if inst.get('coordinates'):
        original_entry['coordinates'] = inst['coordinates']
    # Add Wikidata reference if present
    if inst.get('wikidata'):
        original_entry['wikidata'] = inst['wikidata']
    # Add original identifiers if present
    if inst.get('identifiers'):
        original_entry['identifiers'] = inst['identifiers']
    if inst.get('website'):
        original_entry['website'] = inst['website']
    custodian['original_entry'] = original_entry

    # Entry index (taken from the source record id)
    custodian['entry_index'] = inst.get('id', 'unknown')

    # Processing timestamp
    custodian['processing_timestamp'] = datetime.now(timezone.utc).isoformat()

    # Wikidata enrichment - convert to the expected format if wikidata info is present
    if inst.get('wikidata'):
        custodian['wikidata_enrichment'] = {
            'wikidata_entity_id': inst['wikidata'].get('id'),
            'wikidata_url': inst['wikidata'].get('url'),
            'wikidata_description': inst['wikidata'].get('description'),
        }

    # Enrichment status
    custodian['enrichment_status'] = inst.get('enrichment_status', 'not_enriched')

    # Provenance
    sources = inst.get('sources', ['palestinian_heritage_consolidated'])
    custodian['provenance'] = {
        'schema_version': '1.0.0',
        'generated_at': datetime.now(timezone.utc).isoformat(),
        'sources': {
            'original_entry': [{
                'source_type': 'palestinian_heritage_extraction',
                'data_tier': 'TIER_4_INFERRED',
                'extraction_source': sources,
            }]
        },
        'data_tier_summary': {
            'TIER_1_AUTHORITATIVE': [],
            'TIER_2_VERIFIED': [],
            'TIER_3_CROWD_SOURCED': ['wikidata'] if inst.get('wikidata') else [],
            'TIER_4_INFERRED': ['palestinian_heritage_extraction'],
        },
        'notes': [
            f"Extracted from palestinian_heritage_consolidated.json v{metadata.get('version', '2.4.0')}",
            f"Confidence score: {inst.get('confidence', 'unknown')}",
        ]
    }

    # Google Maps enrichment
    if inst.get('google_maps_enrichment'):
        custodian['google_maps_enrichment'] = inst['google_maps_enrichment']
        custodian['google_maps_status'] = 'SUCCESS'
        if inst.get('google_maps_search_query'):
            custodian['google_maps_search_query'] = inst['google_maps_search_query']
    elif inst.get('google_maps_status'):
        custodian['google_maps_status'] = inst['google_maps_status']
        if inst.get('google_maps_search_query'):
            custodian['google_maps_search_query'] = inst['google_maps_search_query']

    # GHCID section
    if inst.get('ghcid'):
        ghcid_section = {
            'ghcid_current': inst['ghcid'],
            'ghcid_original': inst['ghcid'],
            'ghcid_uuid': inst.get('ghcid_uuid'),
            'ghcid_uuid_sha256': inst.get('ghcid_uuid_sha256'),
            'ghcid_numeric': inst.get('ghcid_numeric'),
            'generation_timestamp': inst.get('ghcid_generated'),
            'ghcid_history': [{
                'ghcid': inst['ghcid'],
                'ghcid_numeric': inst.get('ghcid_numeric'),
                'valid_from': inst.get('ghcid_generated'),
                'valid_to': None,
                'reason': 'Initial GHCID generation from Palestinian heritage extraction'
            }],
        }
        if inst.get('ghcid_components'):
            ghcid_section['location_resolution'] = {
                'method': 'COORDINATE_LOOKUP' if inst.get('coordinates') else 'NAME_LOOKUP',
                'country_code': inst['ghcid_components'].get('country'),
                'region_code': inst['ghcid_components'].get('region'),
                'city_code': inst['ghcid_components'].get('city'),
            }
        custodian['ghcid'] = ghcid_section

    # Identifiers list
    identifiers = []
    # Add GHCID identifiers
    if inst.get('ghcid'):
        identifiers.append({
            'identifier_scheme': 'GHCID',
            'identifier_value': inst['ghcid']
        })
    if inst.get('ghcid_uuid'):
        identifiers.append({
            'identifier_scheme': 'GHCID_UUID',
            'identifier_value': inst['ghcid_uuid'],
            'identifier_url': f"urn:uuid:{inst['ghcid_uuid']}"
        })
    if inst.get('ghcid_uuid_sha256'):
        # A SHA-256 digest is not a UUID, so no urn:uuid URL is emitted here
        identifiers.append({
            'identifier_scheme': 'GHCID_UUID_SHA256',
            'identifier_value': inst['ghcid_uuid_sha256']
        })
    if inst.get('ghcid_numeric'):
        identifiers.append({
            'identifier_scheme': 'GHCID_NUMERIC',
            'identifier_value': str(inst['ghcid_numeric'])
        })
    # Add Wikidata identifier
    if inst.get('wikidata', {}).get('id'):
        identifiers.append({
            'identifier_scheme': 'Wikidata',
            'identifier_value': inst['wikidata']['id'],
            'identifier_url': inst['wikidata'].get('url')
        })
    # Add other identifiers from the institution
    if inst.get('identifiers'):
        for scheme, value in inst['identifiers'].items():
            if scheme.lower() == 'viaf':
                identifiers.append({
                    'identifier_scheme': 'VIAF',
                    'identifier_value': value,
                    'identifier_url': f'https://viaf.org/viaf/{value}'
                })
            elif scheme.lower() == 'gnd':
                identifiers.append({
                    'identifier_scheme': 'GND',
                    'identifier_value': value,
                    'identifier_url': f'https://d-nb.info/gnd/{value}'
                })
            elif scheme.lower() == 'lcnaf':
                identifiers.append({
                    'identifier_scheme': 'LCNAF',
                    'identifier_value': value,
                    'identifier_url': f'https://id.loc.gov/authorities/names/{value}'
                })
            elif scheme.lower() == 'isni':
                identifiers.append({
                    'identifier_scheme': 'ISNI',
                    'identifier_value': value,
                    'identifier_url': f'https://isni.org/isni/{value}'
                })
            else:
                identifiers.append({
                    'identifier_scheme': scheme.upper(),
                    'identifier_value': value
                })
    if identifiers:
        custodian['identifiers'] = identifiers

    # Conflict status → time_of_destruction
    if inst.get('conflict_status'):
        conflict = inst['conflict_status']
        custodian['conflict_status'] = conflict
        # Map to time_of_destruction if destroyed
        if conflict.get('status') == 'destroyed':
            custodian['time_of_destruction'] = {
                'date': conflict.get('date'),
                'reported_date': conflict.get('reported_date'),
                'description': conflict.get('description'),
                'sources': conflict.get('sources', []),
            }

    # Custodian name (consensus name)
    custodian['custodian_name'] = {
        'claim_type': 'custodian_name',
        'claim_value': inst.get('name'),
        'claim_value_arabic': inst.get('name_arabic'),
        'source': 'palestinian_heritage_extraction',
        'confidence': inst.get('confidence', 0.9),
        'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
    }

    # Digital platforms (from website)
    if inst.get('website'):
        custodian['digital_platforms'] = [{
            'platform_name': f"{inst.get('name')} Website",
            'platform_url': inst['website'],
            'platform_type': 'WEBSITE',
            'enrichment_timestamp': datetime.now(timezone.utc).isoformat(),
            'source_method': 'palestinian_heritage_extraction',
        }]

    return custodian
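# Minimal illustrative call (hypothetical record, not from the real dataset):
#   inst = {'id': 'pal-001', 'name': 'Example Library', 'type': 'library',
#           'country': 'PS', 'city': 'Gaza', 'confidence': 0.8}
#   custodian = convert_institution_to_custodian(inst, {'version': '2.4.0'})
#   custodian['entry_index']                    -> 'pal-001'
#   custodian['custodian_name']['claim_value']  -> 'Example Library'
#   'time_of_destruction' in custodian          -> False (no conflict_status)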
def generate_filename(inst: dict) -> str:
    """Generate a filename for the custodian YAML file."""
    # Use GHCID if available
    if inst.get('ghcid'):
        return f"{inst['ghcid']}.yaml"
    # Fall back to a country-city-name pattern ('or' guards against explicit None values)
    country = inst.get('country') or 'XX'
    city = slugify(inst.get('city', 'unknown'))[:3]
    name = slugify(inst.get('name', 'unknown'))[:20]
    return f"{country}-{city}-{name}.yaml"
def main():
    """Main conversion function."""
    # Paths, resolved relative to the repository root so the script is portable
    repo_root = Path(__file__).resolve().parents[1]
    source_file = repo_root / 'data' / 'extracted' / 'palestinian_heritage_consolidated.json'
    output_dir = repo_root / 'data' / 'custodian'

    # Ensure output directory exists
    output_dir.mkdir(parents=True, exist_ok=True)

    # Load source data
    print(f"Loading source file: {source_file}")
    with open(source_file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    metadata = data.get('metadata', {})
    institutions = data.get('institutions', [])
    print(f"Found {len(institutions)} institutions to convert")
    print(f"Source version: {metadata.get('version', 'unknown')}")

    # Track statistics
    stats = {
        'total': len(institutions),
        'converted': 0,
        'with_ghcid': 0,
        'with_google_maps': 0,
        'with_wikidata': 0,
        'with_conflict_status': 0,
        'destroyed': 0,
        'by_country': {},
    }

    # Convert each institution
    for inst in institutions:
        try:
            # Generate custodian format
            custodian = convert_institution_to_custodian(inst, metadata)
            # Generate filename
            filename = generate_filename(inst)
            output_path = output_dir / filename
            # Write YAML file
            with open(output_path, 'w', encoding='utf-8') as f:
                yaml.dump(custodian, f,
                          default_flow_style=False,
                          allow_unicode=True,
                          sort_keys=False,
                          width=120)
            # Update statistics
            stats['converted'] += 1
            if inst.get('ghcid'):
                stats['with_ghcid'] += 1
            if inst.get('google_maps_enrichment'):
                stats['with_google_maps'] += 1
            if inst.get('wikidata'):
                stats['with_wikidata'] += 1
            if inst.get('conflict_status'):
                stats['with_conflict_status'] += 1
                if inst['conflict_status'].get('status') == 'destroyed':
                    stats['destroyed'] += 1
            country = inst.get('country', 'XX')
            stats['by_country'][country] = stats['by_country'].get(country, 0) + 1
            print(f"  ✓ {filename}")
        except Exception as e:
            print(f"  ✗ Error converting {inst.get('id', 'unknown')}: {e}")

    # Print summary
    print("\n" + "=" * 60)
    print("CONVERSION SUMMARY")
    print("=" * 60)
    print(f"Total institutions: {stats['total']}")
    print(f"Successfully converted: {stats['converted']}")
    print(f"With GHCID: {stats['with_ghcid']}")
    print(f"With Google Maps: {stats['with_google_maps']}")
    print(f"With Wikidata: {stats['with_wikidata']}")
    print(f"With conflict status: {stats['with_conflict_status']}")
    print(f"Destroyed institutions: {stats['destroyed']}")
    print("\nBy country:")
    for country, count in sorted(stats['by_country'].items(), key=lambda x: (x[0] or 'ZZ')):
        print(f"  {country or 'Unknown'}: {count}")
    print(f"\nOutput directory: {output_dir}")


if __name__ == '__main__':
    main()
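# Typical invocation (assumes PyYAML is installed and the consolidated JSON exists):
#   python scripts/convert_palestinian_to_custodian.py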