glam/scripts/patch_all_missing_derived_from.py
2025-12-30 03:43:31 +01:00

270 lines
8.3 KiB
Python

#!/usr/bin/env python3
"""
Patch all missing wasDerivedFrom fields in YAML enrichment sections.
Handles:
- youtube_enrichment: Uses channel_url field
- wikidata_enrichment: Constructs URL from wikidata_entity_id
- zcbs_enrichment: Uses source field or constructs from zcbs_id
Usage:
python scripts/patch_all_missing_derived_from.py [--dry-run]
"""
import argparse
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
try:
from ruamel.yaml import YAML # type: ignore
from ruamel.yaml.comments import CommentedMap # type: ignore
except ImportError:
print("ERROR: ruamel.yaml not installed. Run: pip install ruamel.yaml")
sys.exit(1)
def get_youtube_derived_from(section: dict) -> Optional[str]:
    """Return the provenance source URL for a youtube_enrichment section.

    Resolution order: 'channel_url', then 'source_url', then a channel
    URL constructed from 'channel_id'. Returns None when no source can
    be determined from the section.
    """
    # Prefer an explicit URL field, in priority order.
    for field in ('channel_url', 'source_url'):
        url = section.get(field)
        if url:
            return url
    # Fall back to building the canonical channel URL from the id.
    channel_id = section.get('channel_id')
    if channel_id:
        return f"https://www.youtube.com/channel/{channel_id}"
    return None
def get_wikidata_derived_from(section: dict) -> Optional[str]:
    """Return the provenance source URL for a wikidata_enrichment section.

    Builds the canonical entity page URL from 'wikidata_entity_id'.
    Returns None when the id is absent, not a string, or not a
    Q-prefixed entity id.
    """
    entity_id = section.get('wikidata_entity_id')
    # isinstance guard: a non-string id (e.g. an int from hand-edited
    # YAML) would otherwise raise AttributeError on .startswith().
    if isinstance(entity_id, str) and entity_id.startswith('Q'):
        return f"https://www.wikidata.org/wiki/{entity_id}"
    return None
def get_zcbs_derived_from(section: dict) -> Optional[str]:
    """Return the provenance source URL for a zcbs_enrichment section.

    Resolution order: explicit 'source' field, then a preferred key in
    'platform_urls' ('website', 'catalog', 'main'), then any non-empty
    platform URL, then a URL constructed from 'zcbs_id'. Returns None
    when none of these yield a value.
    """
    source = section.get('source')
    if source:
        return source
    platform_urls = section.get('platform_urls', {})
    if platform_urls:
        # Preferred keys first, in fixed priority order.
        for preferred in ('website', 'catalog', 'main'):
            candidate = platform_urls.get(preferred)
            if candidate:
                return candidate
        # Otherwise take the first truthy URL of any kind.
        for candidate in platform_urls.values():
            if candidate:
                return candidate
    # Last resort: build the registry URL from the organisation id.
    zcbs_id = section.get('zcbs_id')
    return f"https://www.zcbs.nl/organisatie/{zcbs_id}" if zcbs_id else None
def patch_section(section: dict, section_name: str) -> bool:
    """Add wasDerivedFrom to the section's _provenance.prov if missing.

    Also backfills generatedAtTime and wasGeneratedBy when they are
    absent. Returns True when the section was patched; False when
    _provenance is missing, wasDerivedFrom already exists, the section
    name is not recognised, or no source URL could be derived.
    """
    provenance = section.get('_provenance')
    if not provenance:
        # No provenance container at all — nothing to patch into.
        return False

    prov = provenance.get('prov')
    if not prov:
        prov = CommentedMap()
        provenance['prov'] = prov

    if prov.get('wasDerivedFrom'):
        # Already present; never overwrite an existing value.
        return False

    # Dispatch to the per-source URL resolver for this section type.
    resolvers = {
        'youtube_enrichment': get_youtube_derived_from,
        'wikidata_enrichment': get_wikidata_derived_from,
        'zcbs_enrichment': get_zcbs_derived_from,
    }
    resolver = resolvers.get(section_name)
    if resolver is None:
        return False
    derived_from = resolver(section)
    if not derived_from:
        return False

    prov['wasDerivedFrom'] = derived_from

    # Backfill generatedAtTime from the section's own timestamps,
    # falling back to the current UTC time.
    if not prov.get('generatedAtTime'):
        prov['generatedAtTime'] = (
            section.get('fetch_timestamp')
            or section.get('enrichment_timestamp')
            or section.get('retrieval_timestamp')
            or datetime.now(timezone.utc).isoformat()
        )

    # Backfill wasGeneratedBy with a PROV activity describing the fetch.
    if not prov.get('wasGeneratedBy'):
        activity_map = {
            'youtube_enrichment': ('youtube_api_fetch', 'https://www.googleapis.com/youtube/v3'),
            'wikidata_enrichment': ('wikidata_api_fetch', 'https://www.wikidata.org/w/rest.php/wikibase/v1'),
            'zcbs_enrichment': ('zcbs_registry_fetch', 'https://www.zcbs.nl'),
        }
        activity_name, api_url = activity_map.get(section_name, ('unknown', ''))
        activity = CommentedMap()
        activity['@type'] = 'prov:Activity'
        activity['name'] = activity_name
        activity['used'] = api_url
        prov['wasGeneratedBy'] = activity

    return True
def process_file(filepath: Path, yaml: YAML, dry_run: bool = False) -> dict:
    """Patch one YAML file, writing it back unless dry_run is set.

    Returns a result dict with keys: 'filepath', 'modified',
    'patched_sections' (list of section names patched), and 'error'
    (message string, or None on success).
    """
    result = {
        'filepath': str(filepath),
        'modified': False,
        'patched_sections': [],
        'error': None,
    }

    # Load the document; any read/parse failure is reported, not raised.
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.load(f)
    except Exception as e:
        result['error'] = str(e)
        return result

    if not isinstance(data, dict):
        # Empty or non-mapping documents carry no enrichment sections.
        return result

    for section_name in ('youtube_enrichment', 'wikidata_enrichment', 'zcbs_enrichment'):
        section = data.get(section_name)
        if isinstance(section, dict) and patch_section(section, section_name):
            result['patched_sections'].append(section_name)
            result['modified'] = True

    # Persist only when something changed and we're not in dry-run mode.
    if result['modified'] and not dry_run:
        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                yaml.dump(data, f)
        except Exception as e:
            result['error'] = f"Write error: {e}"
            result['modified'] = False

    return result
def main():
    """CLI entry point: patch every YAML file under data/custodian."""
    parser = argparse.ArgumentParser(
        description='Patch missing wasDerivedFrom fields in YAML enrichment sections'
    )
    parser.add_argument(
        '--dry-run', action='store_true',
        help='Show what would be changed without modifying files'
    )
    parser.add_argument(
        '--verbose', '-v', action='store_true',
        help='Show detailed output'
    )
    args = parser.parse_args()

    # Round-trip loader configured to preserve the files' formatting:
    # keep quotes, use block style, and a very wide line limit so long
    # scalars are not re-wrapped on dump.
    yaml = YAML()
    yaml.preserve_quotes = True
    yaml.default_flow_style = False
    yaml.width = 4096

    # data/custodian lives one level above the scripts directory.
    base_dir = Path(__file__).parent.parent
    yaml_files = list((base_dir / 'data' / 'custodian').glob('*.yaml'))
    total_files = len(yaml_files)

    prefix = '[DRY RUN] ' if args.dry_run else ''
    print(f"{prefix}Processing {total_files} YAML files...")
    print()

    stats = {
        'files_processed': 0,
        'files_modified': 0,
        'files_with_errors': 0,
        'sections_patched': {
            'youtube_enrichment': 0,
            'wikidata_enrichment': 0,
            'zcbs_enrichment': 0,
        },
    }

    for index, filepath in enumerate(yaml_files, start=1):
        # Lightweight progress indicator for large collections.
        if index % 2000 == 0:
            print(f" Progress: {index}/{total_files}")
        result = process_file(filepath, yaml, dry_run=args.dry_run)
        stats['files_processed'] += 1
        if result['error']:
            stats['files_with_errors'] += 1
            if args.verbose:
                print(f" ERROR: {filepath.name}: {result['error']}")
        elif result['modified']:
            stats['files_modified'] += 1
            for section_name in result['patched_sections']:
                stats['sections_patched'][section_name] += 1
            if args.verbose:
                print(f" Patched: {filepath.name}: {result['patched_sections']}")

    # Summary report.
    print()
    print("=" * 60)
    print("PATCH SUMMARY")
    print("=" * 60)
    print(f"Files processed: {stats['files_processed']:,}")
    print(f"Files modified: {stats['files_modified']:,}")
    print(f"Files with errors: {stats['files_with_errors']:,}")
    print()
    print("Sections patched:")
    for section_name, count in stats['sections_patched'].items():
        print(f" {section_name}: {count:,}")
    print()
    total_patched = sum(stats['sections_patched'].values())
    print(f"Total wasDerivedFrom fields added: {total_patched:,}")

    if args.dry_run:
        print()
        print("This was a DRY RUN - no files were modified.")
        print("Run without --dry-run to apply changes.")
main()