#!/usr/bin/env python3
"""
Add CIDOC-CRM TimeSpan fields to heritage custodian YAML files.

This script enriches custodian records with temporal data from:
1. conflict_status/time_of_destruction - For destroyed/damaged institutions
2. wikidata_inception - For founding dates from Wikidata
3. wikidata_claims - For inception and dissolution dates

TimeSpan follows CIDOC-CRM E52_Time-Span pattern:
- begin_of_the_begin: Earliest possible start (P82a)
- end_of_the_begin: Latest possible start (P81a)
- begin_of_the_end: Earliest possible end (P81b)
- end_of_the_end: Latest possible end (P82b)

Usage:
    python scripts/add_timespan_to_custodians.py [--dry-run] [--verbose] [--dir DIR]
"""

import argparse
import os
import sys
from datetime import date, datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence

import yaml

# The four CIDOC-CRM boundary fields, in the order they are written out.
TIMESPAN_FIELDS = (
    'begin_of_the_begin',
    'end_of_the_begin',
    'begin_of_the_end',
    'end_of_the_end',
)

# strptime patterns tried in order; day-first wins over month-first when a
# slash-separated date is ambiguous.
DATE_FORMATS = ('%Y-%m-%d', '%Y-%m', '%Y', '%d/%m/%Y', '%m/%d/%Y')


# Preserve YAML formatting
class PreserveQuotesLoader(yaml.SafeLoader):
    pass


class PreserveQuotesDumper(yaml.SafeDumper):
    pass


def str_representer(dumper, data):
    """Represent multi-line strings as YAML literal blocks ('|')."""
    if '\n' in data:
        return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
    return dumper.represent_scalar('tag:yaml.org,2002:str', data)


PreserveQuotesDumper.add_representer(str, str_representer)


def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a mapping, otherwise an empty dict.

    A YAML key with an empty value loads as None (not as a missing key), so
    ``data.get(key, {})`` alone is not enough to guarantee a dict.
    """
    return value if isinstance(value, dict) else {}


def _as_list(value: Any) -> List[Any]:
    """Normalize None / scalar / list-like *value* to a new list."""
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def _format_day(year: int, month: int, day: int) -> str:
    """Format a calendar day as a zero-padded midnight-UTC timestamp string."""
    # An f-string is used instead of strftime because strftime('%Y') does not
    # zero-pad years below 1000 on every platform.
    return f"{year:04d}-{month:02d}-{day:02d}T00:00:00Z"


def _claim_value(claims: Dict[str, Any], key: str) -> Any:
    """Extract the raw date value of a claim stored as a mapping or a scalar."""
    claim = claims.get(key)
    if isinstance(claim, dict):
        return claim.get('value')
    if isinstance(claim, bool):
        return None
    # yaml.safe_load may already have converted an unquoted scalar into an
    # int (year) or a date/datetime object.
    if isinstance(claim, (str, int, date)):
        return claim
    return None


def parse_date(date_str: Any) -> Optional[str]:
    """Parse various date formats to ISO 8601.

    Args:
        date_str: A date as a string, an int year, or a ``date``/``datetime``
            object (as produced by YAML for unquoted dates).

    Returns:
        The date as a string. Inputs that already look like a date-time
        (digits followed by a 'T' part) are returned unchanged apart from
        surrounding whitespace and one leading '+'. Everything else that
        parses is rendered as ``YYYY-MM-DDT00:00:00Z``. Returns None when the
        value is empty or cannot be interpreted.
    """
    if not date_str:
        return None

    # datetime is a subclass of date, so this covers both.
    if isinstance(date_str, date):
        return _format_day(date_str.year, date_str.month, date_str.day)

    text = str(date_str).strip()
    # Wikidata time values carry an explicit leading sign ("+1950-...").
    if text.startswith('+'):
        text = text[1:]
    if not text:
        return None

    # Already a date-time string. Require a leading digit so free text that
    # merely contains a capital "T" is not passed through as a date.
    if 'T' in text and text.lstrip('-')[:1].isdigit():
        return text

    for fmt in DATE_FORMATS:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue
        return _format_day(parsed.year, parsed.month, parsed.day)

    # Last resort: a leading four-digit year (e.g. "1950s").
    try:
        year = int(text[:4])
    except ValueError:
        return None
    if 1000 <= year <= 2100:
        return _format_day(year, 1, 1)
    return None


def create_timespan_from_destruction(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Create TimeSpan from conflict_status or time_of_destruction.

    Args:
        data: The loaded custodian record.

    Returns:
        A TimeSpan dict whose end fields hold the destruction date and whose
        begin fields are None, or None when no parseable date is present.
        ``conflict_status`` takes precedence over ``time_of_destruction``.
    """
    conflict_status = _as_dict(data.get('conflict_status'))
    time_of_destruction = _as_dict(data.get('time_of_destruction'))

    # Use the first section whose date actually parses, so an unusable
    # conflict_status date does not hide a usable time_of_destruction date.
    destruction_iso = None
    for section in (conflict_status, time_of_destruction):
        destruction_iso = parse_date(section.get('date'))
        if destruction_iso:
            break
    if not destruction_iso:
        return None

    description = (
        conflict_status.get('description')
        or time_of_destruction.get('description')
        or 'Institution destroyed or severely damaged.'
    )
    sources = _as_list(
        conflict_status.get('sources') or time_of_destruction.get('sources')
    )

    # We know the end but not the beginning.
    timespan: Dict[str, Any] = {
        'begin_of_the_begin': None,  # Unknown founding
        'end_of_the_begin': None,  # Unknown founding
        'begin_of_the_end': destruction_iso,
        'end_of_the_end': destruction_iso,
        'notes': description,
    }
    if sources:
        timespan['sources'] = sources
    return timespan


def create_timespan_from_wikidata(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Create TimeSpan from Wikidata inception/dissolution dates.

    Args:
        data: The loaded custodian record.

    Returns:
        A TimeSpan dict with both begin fields set to the inception date and
        both end fields set to the dissolution date (or None when there is no
        dissolution claim). Returns None when no parseable inception date is
        found in ``wikidata_inception`` or the ``P571_inception`` claim.
    """
    wikidata = _as_dict(data.get('wikidata_enrichment'))
    claims = _as_dict(wikidata.get('wikidata_claims'))

    # Prefer the dedicated field; fall back to the raw claim.
    inception = wikidata.get('wikidata_inception') or _claim_value(claims, 'P571_inception')
    inception_iso = parse_date(inception)
    if not inception_iso:
        return None

    dissolution_iso = parse_date(_claim_value(claims, 'P576_dissolved'))

    timespan: Dict[str, Any] = {
        'begin_of_the_begin': inception_iso,
        'end_of_the_begin': inception_iso,  # Precise date known
        # None on both end fields means no dissolution date is recorded.
        'begin_of_the_end': dissolution_iso,
        'end_of_the_end': dissolution_iso,
    }

    wikidata_id = wikidata.get('wikidata_entity_id', '')
    if wikidata_id:
        timespan['sources'] = [f'Wikidata: {wikidata_id}']
    return timespan


def merge_timespans(existing: Optional[Dict], new: Optional[Dict]) -> Optional[Dict]:
    """Merge existing and new TimeSpan, preferring more specific data.

    Args:
        existing: The TimeSpan already present, if any.
        new: The TimeSpan to merge in, if any.

    Returns:
        ``existing`` when ``new`` is empty, ``new`` when ``existing`` is
        empty, otherwise a new dict in which values from ``existing`` win
        over values from ``new``, notes are joined with newlines, sources are
        de-duplicated in order, and any additional keys are carried over.
    """
    if not new:
        return existing
    if not existing:
        return new

    merged: Dict[str, Any] = {
        field: existing.get(field) or new.get(field) for field in TIMESPAN_FIELDS
    }

    notes: List[Any] = []
    if existing.get('notes'):
        notes.append(existing['notes'])
    if new.get('notes') and new['notes'] not in notes:
        notes.append(new['notes'])
    if notes:
        merged['notes'] = '\n'.join(notes)

    # _as_list guards against a None value and against a single string
    # source being split into characters.
    sources = _as_list(existing.get('sources'))
    for source in _as_list(new.get('sources')):
        if source not in sources:
            sources.append(source)
    if sources:
        merged['sources'] = sources

    # Carry over keys this script does not manage instead of dropping them.
    handled = set(TIMESPAN_FIELDS) | {'notes', 'sources'}
    for timespan in (existing, new):
        for key, value in timespan.items():
            if key not in handled:
                merged.setdefault(key, value)

    return merged


def process_file(filepath: Path, dry_run: bool = False, verbose: bool = False) -> bool:
    """Process a single YAML file, adding TimeSpan if possible.

    Args:
        filepath: Path of the YAML file to enrich.
        dry_run: When True, report the change but do not write the file.
        verbose: When True, print the name and dates of each change.

    Returns:
        True when the file was (or, in a dry run, would be) updated; False
        when nothing changed or the file could not be read or written.
    """
    # Per-file boundary: any failure to load is reported and the file is
    # skipped so one bad file cannot abort the batch.
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        print(f"Error reading {filepath}: {e}")
        return False

    # Empty documents and non-mapping documents have nothing to enrich.
    if not data or not isinstance(data, dict):
        return False

    existing_timespan = data.get('timespan')
    if existing_timespan and not isinstance(existing_timespan, dict):
        # Refuse to overwrite a timespan whose shape we do not understand.
        print(f"Skipping {filepath}: existing 'timespan' is not a mapping")
        return False

    destruction_timespan = create_timespan_from_destruction(data)
    wikidata_timespan = create_timespan_from_wikidata(data)

    # Merge order sets precedence: existing, then destruction, then Wikidata.
    final_timespan = existing_timespan
    if destruction_timespan:
        final_timespan = merge_timespans(final_timespan, destruction_timespan)
    if wikidata_timespan:
        final_timespan = merge_timespans(final_timespan, wikidata_timespan)

    if not final_timespan:
        return False

    # Nothing to do when the merge produced what is already on disk.
    if final_timespan == existing_timespan:
        return False

    if verbose:
        name = _as_dict(data.get('custodian_name')).get('claim_value', filepath.stem)
        print(f"Adding TimeSpan to: {name}")
        print(f"  begin: {final_timespan.get('begin_of_the_begin')}")
        print(f"  end: {final_timespan.get('begin_of_the_end')}")

    if dry_run:
        return True

    data['timespan'] = final_timespan

    try:
        # Serialize fully before opening the file for writing, so a
        # serialization error cannot leave a truncated file behind.
        serialized = yaml.dump(
            data,
            Dumper=PreserveQuotesDumper,
            default_flow_style=False,
            allow_unicode=True,
            sort_keys=False,
        )
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(serialized)
    except Exception as e:
        print(f"Error writing {filepath}: {e}")
        return False
    return True


def main(argv: Optional[Sequence[str]] = None) -> None:
    """Command-line entry point: scan a directory and enrich its YAML files.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description='Add TimeSpan to custodian files')
    parser.add_argument('--dry-run', action='store_true', help='Do not write changes')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    parser.add_argument('--dir', default='data/custodian', help='Directory to process')
    args = parser.parse_args(argv)

    custodian_dir = Path(args.dir)
    if not custodian_dir.is_dir():
        print(f"Directory not found: {custodian_dir}")
        sys.exit(1)

    print(f"Processing custodian files in: {custodian_dir}")
    print(f"Dry run: {args.dry_run}")
    print()

    total = 0
    updated = 0
    destroyed_count = 0
    inception_count = 0

    # Sorted so that output order is reproducible between runs.
    for filepath in sorted(custodian_dir.glob('*.yaml')):
        total += 1

        # Quick textual check so irrelevant files are not parsed as YAML.
        try:
            content = filepath.read_text(encoding='utf-8')
        except (OSError, UnicodeDecodeError) as e:
            print(f"Error reading {filepath}: {e}")
            continue

        has_destruction = 'conflict_status:' in content or 'time_of_destruction:' in content
        # Match any quoting style, and files that only carry the raw claim.
        has_inception = 'wikidata_inception:' in content or 'P571_inception' in content

        if not has_destruction and not has_inception:
            continue

        if process_file(filepath, args.dry_run, args.verbose):
            updated += 1
            if has_destruction:
                destroyed_count += 1
            elif has_inception:
                inception_count += 1

    print()
    print(f"Total files scanned: {total}")
    print(f"Files updated: {updated}")
    print(f"  - From destruction data: {destroyed_count}")
    print(f"  - From Wikidata inception: {inception_count}")

    if args.dry_run:
        print("\n(Dry run - no files were modified)")


if __name__ == '__main__':
    main()