#!/usr/bin/env python3
"""
Patch all missing wasDerivedFrom fields in YAML enrichment sections.

Handles:
- youtube_enrichment: Uses channel_url, then source_url, then a URL built
  from channel_id
- wikidata_enrichment: Constructs URL from wikidata_entity_id
- zcbs_enrichment: Uses source field, then platform_urls, then a URL built
  from zcbs_id

Usage:
    python scripts/patch_all_missing_derived_from.py [--dry-run]
"""

import argparse
import io
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

try:
    from ruamel.yaml import YAML  # type: ignore
    from ruamel.yaml.comments import CommentedMap  # type: ignore
except ImportError:
    # The missing dependency is reported by main() (same message, same exit
    # code) so that importing this module never terminates the interpreter.
    YAML = None  # type: ignore
    CommentedMap = dict  # type: ignore


def get_youtube_derived_from(section: dict) -> Optional[str]:
    """Return the wasDerivedFrom URL for a youtube_enrichment section.

    Priority: ``channel_url`` > ``source_url`` > URL constructed from
    ``channel_id``. Returns None when none of these is set.
    """
    for key in ('channel_url', 'source_url'):
        if section.get(key):
            return section[key]
    if section.get('channel_id'):
        return f"https://www.youtube.com/channel/{section['channel_id']}"
    return None


def get_wikidata_derived_from(section: dict) -> Optional[str]:
    """Return the wasDerivedFrom URL for a wikidata_enrichment section.

    Only string ids starting with ``Q`` are accepted; anything else
    (missing, empty, non-string, other prefix) yields None.
    """
    entity_id = section.get('wikidata_entity_id')
    # isinstance guard: a YAML scalar such as an integer has no startswith().
    if isinstance(entity_id, str) and entity_id.startswith('Q'):
        return f"https://www.wikidata.org/wiki/{entity_id}"
    return None


def get_zcbs_derived_from(section: dict) -> Optional[str]:
    """Return the wasDerivedFrom URL for a zcbs_enrichment section.

    Priority: ``source`` > preferred ``platform_urls`` entries
    (website, catalog, main) > any non-empty ``platform_urls`` value >
    URL constructed from ``zcbs_id``. Returns None when nothing is available.
    """
    if section.get('source'):
        return section['source']

    platform_urls = section.get('platform_urls')
    # Only a mapping can be searched by key; other shapes are skipped
    # instead of raising AttributeError.
    if isinstance(platform_urls, dict) and platform_urls:
        for key in ('website', 'catalog', 'main'):
            if platform_urls.get(key):
                return platform_urls[key]
        for url in platform_urls.values():
            if url:
                return url

    zcbs_id = section.get('zcbs_id')
    if zcbs_id:
        return f"https://www.zcbs.nl/organisatie/{zcbs_id}"
    return None


# Enrichment sections handled by this script, in processing order.
_SECTION_NAMES = ('youtube_enrichment', 'wikidata_enrichment', 'zcbs_enrichment')

# Section name -> function extracting the wasDerivedFrom URL from the section.
_EXTRACTORS = {
    'youtube_enrichment': get_youtube_derived_from,
    'wikidata_enrichment': get_wikidata_derived_from,
    'zcbs_enrichment': get_zcbs_derived_from,
}

# Section name -> (activity name, API URL) recorded under wasGeneratedBy.
_ACTIVITIES = {
    'youtube_enrichment': ('youtube_api_fetch', 'https://www.googleapis.com/youtube/v3'),
    'wikidata_enrichment': ('wikidata_api_fetch', 'https://www.wikidata.org/w/rest.php/wikibase/v1'),
    'zcbs_enrichment': ('zcbs_registry_fetch', 'https://www.zcbs.nl'),
}


def patch_section(section: dict, section_name: str) -> bool:
    """Add wasDerivedFrom to the section's ``_provenance.prov`` if missing.

    Also fills in ``generatedAtTime`` and ``wasGeneratedBy`` when absent.
    The section is left completely untouched unless a source URL was found.

    Returns True if patched, False if already present, if the section has no
    usable ``_provenance`` mapping, if the section name is not handled, or if
    no source URL is available.
    """
    provenance = section.get('_provenance')
    if not provenance or not isinstance(provenance, dict):
        return False

    prov = provenance.get('prov')
    if prov:
        # An existing non-mapping value is left alone rather than clobbered.
        if not isinstance(prov, dict):
            return False
        if prov.get('wasDerivedFrom'):
            return False

    extractor = _EXTRACTORS.get(section_name)
    if extractor is None:
        return False

    derived_from = extractor(section)
    if not derived_from:
        return False

    # Create the prov mapping only now, so that a section which cannot be
    # patched never gains an empty ``prov`` entry as a side effect.
    if not prov:
        prov = CommentedMap()
        provenance['prov'] = prov

    prov['wasDerivedFrom'] = derived_from

    if not prov.get('generatedAtTime'):
        # Prefer a timestamp recorded in the section; fall back to "now" (UTC).
        prov['generatedAtTime'] = (
            section.get('fetch_timestamp')
            or section.get('enrichment_timestamp')
            or section.get('retrieval_timestamp')
            or datetime.now(timezone.utc).isoformat()
        )

    if not prov.get('wasGeneratedBy'):
        activity_name, api_url = _ACTIVITIES.get(section_name, ('unknown', ''))
        generated_by = CommentedMap()
        generated_by['@type'] = 'prov:Activity'
        generated_by['name'] = activity_name
        generated_by['used'] = api_url
        prov['wasGeneratedBy'] = generated_by

    return True


def process_file(filepath: Path, yaml: "YAML", dry_run: bool = False) -> dict:
    """Process a single YAML file.

    Loads the file with ``yaml``, patches every handled enrichment section
    and, unless ``dry_run`` is set, writes the document back.

    Returns a dict with keys ``filepath``, ``modified``, ``patched_sections``
    and ``error`` (None on success, otherwise the error text).
    """
    result = {
        'filepath': str(filepath),
        'modified': False,
        'patched_sections': [],
        'error': None,
    }

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.load(f)
    except Exception as e:  # boundary: any load failure is reported per file
        result['error'] = str(e)
        return result

    if not isinstance(data, dict):
        return result

    for section_name in _SECTION_NAMES:
        section = data.get(section_name)
        if not isinstance(section, dict):
            continue
        if patch_section(section, section_name):
            result['patched_sections'].append(section_name)
            result['modified'] = True

    # Write back if modified and not dry run
    if result['modified'] and not dry_run:
        try:
            # Serialize into memory first: if dumping fails, the file on disk
            # has not been opened for writing yet and keeps its content.
            buffer = io.StringIO()
            yaml.dump(data, buffer)
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(buffer.getvalue())
        except Exception as e:  # boundary: report instead of aborting the run
            result['error'] = f"Write error: {e}"
            result['modified'] = False

    return result


def main():
    """Command-line entry point: patch all custodian YAML files and report."""
    # Checked before anything else so the script behaves exactly as before
    # when the dependency is missing.
    if YAML is None:
        print("ERROR: ruamel.yaml not installed. Run: pip install ruamel.yaml")
        sys.exit(1)

    parser = argparse.ArgumentParser(
        description='Patch missing wasDerivedFrom fields in YAML enrichment sections'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be changed without modifying files'
    )
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Show detailed output'
    )
    args = parser.parse_args()

    # Setup YAML
    yaml = YAML()
    yaml.preserve_quotes = True
    yaml.default_flow_style = False
    yaml.width = 4096

    # Find files: <repo>/data/custodian/*.yaml relative to this script.
    script_dir = Path(__file__).parent
    base_dir = script_dir.parent
    custodian_dir = base_dir / 'data' / 'custodian'

    # Sorted so runs are reproducible regardless of directory listing order.
    yaml_files = sorted(custodian_dir.glob('*.yaml'))
    total_files = len(yaml_files)

    print(f"{'[DRY RUN] ' if args.dry_run else ''}Processing {total_files} YAML files...")
    print()

    # Stats
    stats = {
        'files_processed': 0,
        'files_modified': 0,
        'files_with_errors': 0,
        'sections_patched': {name: 0 for name in _SECTION_NAMES},
    }

    for count, filepath in enumerate(yaml_files, start=1):
        if count % 2000 == 0:
            print(f"  Progress: {count}/{total_files}")

        result = process_file(filepath, yaml, dry_run=args.dry_run)
        stats['files_processed'] += 1

        if result['error']:
            stats['files_with_errors'] += 1
            if args.verbose:
                print(f"  ERROR: {filepath.name}: {result['error']}")
        elif result['modified']:
            stats['files_modified'] += 1
            for section_name in result['patched_sections']:
                stats['sections_patched'][section_name] += 1
            if args.verbose:
                print(f"  Patched: {filepath.name}: {result['patched_sections']}")

    # Summary
    print()
    print("=" * 60)
    print("PATCH SUMMARY")
    print("=" * 60)
    print(f"Files processed:     {stats['files_processed']:,}")
    print(f"Files modified:      {stats['files_modified']:,}")
    print(f"Files with errors:   {stats['files_with_errors']:,}")
    print()
    print("Sections patched:")
    for section_name, count in stats['sections_patched'].items():
        print(f"  {section_name}: {count:,}")
    print()
    total_patched = sum(stats['sections_patched'].values())
    print(f"Total wasDerivedFrom fields added: {total_patched:,}")

    if args.dry_run:
        print()
        print("This was a DRY RUN - no files were modified.")
        print("Run without --dry-run to apply changes.")


if __name__ == '__main__':
    main()