glam/scripts/add_timespan_to_custodians.py
2025-12-07 23:08:02 +01:00

330 lines
9.6 KiB
Python

#!/usr/bin/env python3
"""
Add CIDOC-CRM TimeSpan fields to heritage custodian YAML files.
This script enriches custodian records with temporal data from:
1. conflict_status/time_of_destruction - For destroyed/damaged institutions
2. wikidata_inception - For founding dates from Wikidata
3. wikidata_claims - For inception and dissolution dates
TimeSpan follows CIDOC-CRM E52_Time-Span pattern:
- begin_of_the_begin: Earliest possible start (P82a)
- end_of_the_begin: Latest possible start (P81a)
- begin_of_the_end: Earliest possible end (P81b)
- end_of_the_end: Latest possible end (P82b)
Usage:
python scripts/add_timespan_to_custodians.py [--dry-run] [--verbose]
"""
import argparse
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any

import yaml
# Preserve YAML formatting
class PreserveQuotesLoader(yaml.SafeLoader):
    """Dedicated ``yaml.SafeLoader`` subclass for this script.

    The class body adds nothing of its own, so loading through it behaves
    exactly like ``yaml.SafeLoader``.
    """
class PreserveQuotesDumper(yaml.SafeDumper):
    """Dedicated ``yaml.SafeDumper`` subclass used when writing custodian files.

    The class body adds nothing of its own; it exists so that custom
    representers can be registered on it rather than on ``yaml.SafeDumper``.
    """
def str_representer(dumper, data):
    """Represent a string scalar, switching to block style for multi-line text.

    Args:
        dumper: Object exposing ``represent_scalar(tag, value, style=...)``.
        data: The string being serialised.

    Returns:
        Whatever ``dumper.represent_scalar`` returns: called with
        ``style='|'`` when ``data`` contains a newline, and without a style
        argument otherwise.
    """
    str_tag = 'tag:yaml.org,2002:str'
    if '\n' not in data:
        # Single-line text: let the dumper pick its default scalar style.
        return dumper.represent_scalar(str_tag, data)
    # Multi-line text reads best as a literal block.
    return dumper.represent_scalar(str_tag, data, style='|')
# Route every ``str`` dumped through PreserveQuotesDumper via str_representer.
PreserveQuotesDumper.add_representer(str, str_representer)
# Matches ISO 8601 date-times such as "2020-01-31T00:00:00Z", including the
# signed-year form used by Wikidata ("+1950-00-00T00:00:00Z").
_ISO_DATETIME_RE = re.compile(r'^[+-]?\d{4,}-\d{2}-\d{2}T\d{2}:\d{2}')

# Tried in order. Day-first is attempted before month-first, so an ambiguous
# value like "03/04/2020" is read as 3 April 2020.
_DATE_FORMATS = (
    '%Y-%m-%d',
    '%Y-%m',
    '%Y',
    '%d/%m/%Y',
    '%m/%d/%Y',
)


def parse_date(date_str: str) -> Optional[str]:
    """Normalise a date value to an ISO 8601 date-time string.

    Args:
        date_str: The raw value. Usually a string, but any object is accepted
            and stringified; ``datetime`` instances (as produced by a YAML
            loader for unquoted timestamps) are formatted directly so their
            month, day and time are not lost.

    Returns:
        - ``None`` for falsy or unparseable input.
        - The stripped input itself when it already looks like an ISO 8601
          date-time (``YYYY-MM-DDTHH:MM...``, optional leading sign).
        - ``YYYY-MM-DDT00:00:00Z`` when one of the known date formats matches.
        - ``YYYY-01-01T00:00:00Z`` when only a leading four-digit year in the
          range 1000-2100 can be recovered (e.g. ``"1950s"``).
    """
    if not date_str:
        return None

    if isinstance(date_str, datetime):
        # str(datetime) uses a space separator, which none of the formats
        # below accept; format it explicitly instead of degrading to a year.
        if date_str.tzinfo is not None:
            date_str = date_str.astimezone(timezone.utc)
        # NOTE(review): naive datetimes are labelled "Z" as-is — assumes they
        # are already UTC; confirm against the data source.
        return date_str.strftime('%Y-%m-%dT%H:%M:%SZ')

    text = str(date_str).strip()
    if not text:
        return None

    # Only pass through values that really have a date-time shape; merely
    # containing the letter "T" (e.g. "Tuesday") is not enough.
    if _ISO_DATETIME_RE.match(text):
        return text

    for fmt in _DATE_FORMATS:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue
        return parsed.strftime('%Y-%m-%dT00:00:00Z')

    # Last resort: salvage a leading year from values like "1950s".
    try:
        year = int(text[:4])
    except ValueError:
        return None
    if 1000 <= year <= 2100:
        return f"{year}-01-01T00:00:00Z"
    return None
def create_timespan_from_destruction(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Build a TimeSpan from ``conflict_status`` / ``time_of_destruction``.

    For the date, description and sources, ``conflict_status`` is consulted
    first and ``time_of_destruction`` second. Either entry is ignored when it
    is not a mapping (for example an empty YAML key that loads as ``None``,
    or a bare string), instead of raising ``AttributeError``.

    Args:
        data: The loaded custodian record.

    Returns:
        A dict with both begin bounds set to ``None`` (founding unknown),
        both end bounds set to the parsed destruction date, a ``notes``
        entry and, when available, ``sources``. ``None`` when no destruction
        date is present or it cannot be parsed.
    """
    conflict_status = data.get('conflict_status')
    if not isinstance(conflict_status, dict):
        conflict_status = {}
    time_of_destruction = data.get('time_of_destruction')
    if not isinstance(time_of_destruction, dict):
        time_of_destruction = {}

    destruction_date = conflict_status.get('date') or time_of_destruction.get('date')
    if not destruction_date:
        return None

    destruction_iso = parse_date(destruction_date)
    if not destruction_iso:
        return None

    description = (
        conflict_status.get('description')
        or time_of_destruction.get('description')
        or 'Institution destroyed or severely damaged.'
    )
    sources = conflict_status.get('sources') or time_of_destruction.get('sources') or []

    # Only the end of the span is known; the founding date stays open.
    timespan: Dict[str, Any] = {
        'begin_of_the_begin': None,
        'end_of_the_begin': None,
        'begin_of_the_end': destruction_iso,
        'end_of_the_end': destruction_iso,
        'notes': description,
    }
    if sources:
        timespan['sources'] = sources
    return timespan
def _wikidata_claim_value(claims: Dict[str, Any], key: str) -> Any:
    """Return a claim's value, whether stored as ``{'value': ...}`` or a bare string."""
    claim = claims.get(key)
    if isinstance(claim, dict):
        return claim.get('value')
    if isinstance(claim, str):
        return claim
    return None


def create_timespan_from_wikidata(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Build a TimeSpan from Wikidata inception / dissolution dates.

    The inception date is read from
    ``wikidata_enrichment.wikidata_inception``, falling back to the
    ``P571_inception`` entry of ``wikidata_enrichment.wikidata_claims``.
    The dissolution date is read from the ``P576_dissolved`` claim.
    A ``wikidata_enrichment`` or ``wikidata_claims`` value that is not a
    mapping is treated as absent instead of raising ``AttributeError``.

    Args:
        data: The loaded custodian record.

    Returns:
        A dict whose begin bounds are both the parsed inception date and
        whose end bounds are both the parsed dissolution date (``None`` when
        there is no parseable dissolution date), plus a ``sources`` entry
        when ``wikidata_entity_id`` is set. ``None`` when there is no
        parseable inception date.
    """
    wikidata = data.get('wikidata_enrichment')
    if not isinstance(wikidata, dict):
        return None

    claims = wikidata.get('wikidata_claims')
    if not isinstance(claims, dict):
        claims = {}

    inception = wikidata.get('wikidata_inception') or _wikidata_claim_value(
        claims, 'P571_inception'
    )
    if not inception:
        return None
    inception_iso = parse_date(inception)
    if not inception_iso:
        return None

    dissolution = _wikidata_claim_value(claims, 'P576_dissolved')
    dissolution_iso = parse_date(dissolution) if dissolution else None

    # Begin bounds coincide because a single inception value is available;
    # end bounds stay None when no dissolution date was found.
    timespan: Dict[str, Any] = {
        'begin_of_the_begin': inception_iso,
        'end_of_the_begin': inception_iso,
        'begin_of_the_end': dissolution_iso,
        'end_of_the_end': dissolution_iso,
    }

    wikidata_id = wikidata.get('wikidata_entity_id', '')
    if wikidata_id:
        timespan['sources'] = [f'Wikidata: {wikidata_id}']
    return timespan
_TIMESPAN_BOUND_FIELDS = (
    'begin_of_the_begin',
    'end_of_the_begin',
    'begin_of_the_end',
    'end_of_the_end',
)


def _timespan_sources(value: Any) -> list:
    """Coerce a ``sources`` entry to a list (``None`` -> ``[]``, scalar -> one item)."""
    if not value:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def merge_timespans(existing: Optional[Dict], new: Optional[Dict]) -> Optional[Dict]:
    """Merge two TimeSpan dicts, letting ``existing`` win on conflicts.

    Args:
        existing: TimeSpan already on the record, or ``None``.
        new: Newly derived TimeSpan, or ``None``.

    Returns:
        ``existing`` when ``new`` is empty, ``new`` when ``existing`` is
        empty, otherwise a new dict where:

        - each of the four bound fields takes the first truthy value of
          ``existing`` then ``new`` (``None`` if neither has one);
        - ``notes`` joins both notes with a newline, unless the new note is
          already present as a whole block of lines in the existing notes,
          so repeated runs do not keep appending the same text;
        - ``sources`` is the order-preserving union of both lists;
        - any other key found on either input is carried over, ``existing``
          first, rather than being dropped.
    """
    if not new:
        return existing
    if not existing:
        return new

    merged: Dict[str, Any] = {
        field: existing.get(field) or new.get(field)
        for field in _TIMESPAN_BOUND_FIELDS
    }

    existing_notes = existing.get('notes')
    new_notes = new.get('notes')
    if existing_notes and new_notes:
        # Compare on newline boundaries so a previously merged note is
        # recognised without matching arbitrary substrings of other notes.
        already_present = f'\n{new_notes}\n' in f'\n{existing_notes}\n'
        merged['notes'] = (
            existing_notes if already_present else f'{existing_notes}\n{new_notes}'
        )
    elif existing_notes or new_notes:
        merged['notes'] = existing_notes or new_notes

    sources = _timespan_sources(existing.get('sources'))
    for source in _timespan_sources(new.get('sources')):
        if source not in sources:
            sources.append(source)
    if sources:
        merged['sources'] = sources

    # Keep any additional keys instead of silently discarding them.
    handled = set(_TIMESPAN_BOUND_FIELDS) | {'notes', 'sources'}
    for origin in (existing, new):
        for key, value in origin.items():
            if key not in handled:
                merged.setdefault(key, value)

    return merged
def process_file(filepath: Path, dry_run: bool = False, verbose: bool = False) -> bool:
    """Add or extend the ``timespan`` entry of one custodian YAML file.

    The record is loaded, TimeSpans are derived from destruction data and
    from Wikidata, and both are merged into any existing ``timespan``
    (existing values win). The file is rewritten only when the merged
    result differs from what was already there.

    Args:
        filepath: YAML file to process.
        dry_run: When true, report what would change but do not write.
        verbose: When true, print the record name and resulting bounds.

    Returns:
        ``True`` when the file was updated (or would be, in a dry run);
        ``False`` when nothing changed, the file does not contain a
        mapping, or reading/writing failed (the error is printed).

    Note:
        The file is re-serialised from the loaded data, so anything the
        YAML loader does not retain (such as comments) is not written back.
    """
    # Broad catch is deliberate: one bad file must not abort the batch, and
    # the loader can raise more than YAMLError (e.g. on invalid timestamps).
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        print(f"Error reading {filepath}: {e}")
        return False

    # Empty documents and non-mapping roots (lists, scalars) have no fields
    # to enrich; bail out rather than failing on ``.get``.
    if not data or not isinstance(data, dict):
        return False

    existing_timespan = data.get('timespan')

    final_timespan = existing_timespan
    for candidate in (
        create_timespan_from_destruction(data),
        create_timespan_from_wikidata(data),
    ):
        if candidate:
            final_timespan = merge_timespans(final_timespan, candidate)

    if not final_timespan or final_timespan == existing_timespan:
        return False

    if verbose:
        name_claim = data.get('custodian_name')
        if isinstance(name_claim, dict):
            name = name_claim.get('claim_value', filepath.stem)
        else:
            name = filepath.stem
        print(f"Adding TimeSpan to: {name}")
        print(f" begin: {final_timespan.get('begin_of_the_begin')}")
        print(f" end: {final_timespan.get('begin_of_the_end')}")

    if dry_run:
        return True

    data['timespan'] = final_timespan

    try:
        # Serialise before opening the file: opening with 'w' truncates it,
        # so a dump error after that point would destroy the record.
        serialized = yaml.dump(
            data,
            Dumper=PreserveQuotesDumper,
            default_flow_style=False,
            allow_unicode=True,
            sort_keys=False,
        )
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(serialized)
    except Exception as e:
        print(f"Error writing {filepath}: {e}")
        return False
    return True
def main():
    """Command-line entry point: enrich every ``*.yaml`` file in a directory.

    Options:
        --dry-run       Report changes without writing files.
        --verbose / -v  Print details for each updated record.
        --dir           Directory to scan (default ``data/custodian``).

    Exits with status 1 when the directory does not exist. Files are first
    screened with a cheap text search so that only records mentioning
    destruction data or a Wikidata inception are parsed as YAML.
    """
    parser = argparse.ArgumentParser(description='Add TimeSpan to custodian files')
    parser.add_argument('--dry-run', action='store_true', help='Do not write changes')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    parser.add_argument('--dir', default='data/custodian', help='Directory to process')
    args = parser.parse_args()

    custodian_dir = Path(args.dir)
    if not custodian_dir.is_dir():
        print(f"Directory not found: {custodian_dir}")
        sys.exit(1)

    print(f"Processing custodian files in: {custodian_dir}")
    print(f"Dry run: {args.dry_run}")
    print()

    total = 0
    updated = 0
    destroyed_count = 0
    inception_count = 0

    # Sorted so that output and processing order are reproducible.
    for filepath in sorted(custodian_dir.glob('*.yaml')):
        total += 1

        # An unreadable file is reported and skipped, not fatal to the run.
        try:
            content = filepath.read_text(encoding='utf-8')
        except (OSError, UnicodeDecodeError) as e:
            print(f"Error reading {filepath}: {e}")
            continue

        has_destruction = 'conflict_status:' in content or 'time_of_destruction:' in content
        # Match the key regardless of how the value is quoted, and also the
        # P571 claim that create_timespan_from_wikidata falls back to.
        has_inception = 'wikidata_inception:' in content or 'P571_inception' in content
        if not (has_destruction or has_inception):
            continue

        if process_file(filepath, args.dry_run, args.verbose):
            updated += 1
            # Attribution is based on the text screen above: a file that
            # mentions destruction data is counted there even if the change
            # actually came from Wikidata.
            if has_destruction:
                destroyed_count += 1
            else:
                inception_count += 1

    print()
    print(f"Total files scanned: {total}")
    print(f"Files updated: {updated}")
    print(f" - From destruction data: {destroyed_count}")
    print(f" - From Wikidata inception: {inception_count}")
    if args.dry_run:
        print("\n(Dry run - no files were modified)")


if __name__ == '__main__':
    main()