270 lines
8.3 KiB
Python
270 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Patch all missing wasDerivedFrom fields in YAML enrichment sections.
|
|
|
|
Handles:
|
|
- youtube_enrichment: Uses channel_url field
|
|
- wikidata_enrichment: Constructs URL from wikidata_entity_id
|
|
- zcbs_enrichment: Uses source field or constructs from zcbs_id
|
|
|
|
Usage:
|
|
python scripts/patch_all_missing_derived_from.py [--dry-run]
|
|
"""
|
|
|
|
import argparse
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
try:
|
|
from ruamel.yaml import YAML # type: ignore
|
|
from ruamel.yaml.comments import CommentedMap # type: ignore
|
|
except ImportError:
|
|
print("ERROR: ruamel.yaml not installed. Run: pip install ruamel.yaml")
|
|
sys.exit(1)
|
|
|
|
|
|
def get_youtube_derived_from(section: dict) -> Optional[str]:
    """Return the provenance source URL for a youtube_enrichment section.

    Preference order: ``channel_url``, then ``source_url``, then a URL
    constructed from ``channel_id``. Returns None when no usable value
    is present.
    """
    # Direct URL fields win over a constructed one.
    for field in ('channel_url', 'source_url'):
        url = section.get(field)
        if url:
            return url
    channel_id = section.get('channel_id')
    if channel_id:
        return f"https://www.youtube.com/channel/{channel_id}"
    return None
|
|
|
|
|
|
def get_wikidata_derived_from(section: dict) -> Optional[str]:
    """Return the provenance source URL for a wikidata_enrichment section.

    Builds the canonical entity page URL from ``wikidata_entity_id``.
    Only item IDs (strings starting with 'Q') are accepted; anything
    else -- missing, empty, non-string (e.g. a bare YAML number), or a
    property/lexeme ID -- yields None.
    """
    entity_id = section.get('wikidata_entity_id')
    # isinstance guard: a non-string value (e.g. an int parsed from a
    # bare YAML scalar) would otherwise raise AttributeError on startswith.
    if isinstance(entity_id, str) and entity_id.startswith('Q'):
        return f"https://www.wikidata.org/wiki/{entity_id}"
    return None
|
|
|
|
|
|
def get_zcbs_derived_from(section: dict) -> Optional[str]:
    """Return the provenance source URL for a zcbs_enrichment section.

    Preference order: explicit ``source`` field, then a preferred key
    from ``platform_urls`` (website/catalog/main, in that order), then
    any other truthy URL in ``platform_urls``, then a URL constructed
    from ``zcbs_id``. Returns None when nothing usable is found.
    """
    source = section.get('source')
    if source:
        return source

    urls = section.get('platform_urls', {})
    if urls:
        for preferred in ('website', 'catalog', 'main'):
            if urls.get(preferred):
                return urls[preferred]
        # No preferred key present -- fall back to the first truthy URL.
        for candidate in urls.values():
            if candidate:
                return candidate

    identifier = section.get('zcbs_id')
    if identifier:
        return f"https://www.zcbs.nl/organisatie/{identifier}"
    return None
|
|
|
|
|
|
def patch_section(section: dict, section_name: str) -> bool:
    """Add ``wasDerivedFrom`` to a section's ``_provenance.prov`` mapping.

    Also backfills ``generatedAtTime`` and ``wasGeneratedBy`` when they
    are absent. Returns True when the section was patched, False when
    there is no ``_provenance``, the field is already set, the section
    name is unknown, or no source URL could be determined.
    """
    provenance = section.get('_provenance')
    if not provenance:
        return False

    # Get or create the prov sub-mapping (CommentedMap keeps the
    # ruamel round-trip representation intact on write-back).
    prov = provenance.get('prov')
    if not prov:
        prov = CommentedMap()
        provenance['prov'] = prov

    # Already present -- nothing to do.
    if prov.get('wasDerivedFrom'):
        return False

    # Section-specific extraction of the source URL.
    extractors = {
        'youtube_enrichment': get_youtube_derived_from,
        'wikidata_enrichment': get_wikidata_derived_from,
        'zcbs_enrichment': get_zcbs_derived_from,
    }
    extractor = extractors.get(section_name)
    if extractor is None:
        return False

    derived_from = extractor(section)
    if not derived_from:
        return False

    prov['wasDerivedFrom'] = derived_from

    # Prefer a timestamp the enrichment itself recorded; fall back to
    # "now" in UTC.
    if not prov.get('generatedAtTime'):
        prov['generatedAtTime'] = (
            section.get('fetch_timestamp')
            or section.get('enrichment_timestamp')
            or section.get('retrieval_timestamp')
            or datetime.now(timezone.utc).isoformat()
        )

    # Record the producing activity (PROV-style) when missing.
    if not prov.get('wasGeneratedBy'):
        activity_map = {
            'youtube_enrichment': ('youtube_api_fetch', 'https://www.googleapis.com/youtube/v3'),
            'wikidata_enrichment': ('wikidata_api_fetch', 'https://www.wikidata.org/w/rest.php/wikibase/v1'),
            'zcbs_enrichment': ('zcbs_registry_fetch', 'https://www.zcbs.nl'),
        }
        activity_name, api_url = activity_map.get(section_name, ('unknown', ''))

        activity = CommentedMap()
        activity['@type'] = 'prov:Activity'
        activity['name'] = activity_name
        activity['used'] = api_url
        prov['wasGeneratedBy'] = activity

    return True
|
|
|
|
|
|
def process_file(filepath: Path, yaml: YAML, dry_run: bool = False) -> dict:
    """Patch one YAML file and return a result record.

    The record has keys: ``filepath`` (str), ``modified`` (bool),
    ``patched_sections`` (list of section names), ``error`` (str or
    None). With ``dry_run=True`` nothing is written back to disk.
    """
    result = {
        'filepath': str(filepath),
        'modified': False,
        'patched_sections': [],
        'error': None,
    }

    # Load; any read/parse problem is reported in the record, not raised.
    try:
        with open(filepath, 'r', encoding='utf-8') as handle:
            data = yaml.load(handle)
    except Exception as exc:
        result['error'] = str(exc)
        return result

    if not isinstance(data, dict):
        return result

    for name in ('youtube_enrichment', 'wikidata_enrichment', 'zcbs_enrichment'):
        candidate = data.get(name)
        if isinstance(candidate, dict) and patch_section(candidate, name):
            result['patched_sections'].append(name)
            result['modified'] = True

    # Persist changes unless this is a dry run; a failed write is
    # downgraded to an error with modified reset to False.
    if result['modified'] and not dry_run:
        try:
            with open(filepath, 'w', encoding='utf-8') as handle:
                yaml.dump(data, handle)
        except Exception as exc:
            result['error'] = f"Write error: {exc}"
            result['modified'] = False

    return result
|
|
|
|
|
|
def main():
    """CLI entry point: patch every custodian YAML file and print a summary."""
    parser = argparse.ArgumentParser(
        description='Patch missing wasDerivedFrom fields in YAML enrichment sections'
    )
    parser.add_argument(
        '--dry-run', action='store_true',
        help='Show what would be changed without modifying files'
    )
    parser.add_argument(
        '--verbose', '-v', action='store_true',
        help='Show detailed output'
    )
    args = parser.parse_args()

    # Round-trip YAML loader: preserves quoting and layout of the
    # content we do not touch.
    yaml = YAML()
    yaml.preserve_quotes = True
    yaml.default_flow_style = False
    yaml.width = 4096

    # Custodian files live in <repo>/data/custodian, next to scripts/.
    base_dir = Path(__file__).parent.parent
    yaml_files = list((base_dir / 'data' / 'custodian').glob('*.yaml'))
    total_files = len(yaml_files)

    print(f"{'[DRY RUN] ' if args.dry_run else ''}Processing {total_files} YAML files...")
    print()

    counts = {
        'files_processed': 0,
        'files_modified': 0,
        'files_with_errors': 0,
    }
    section_counts = {
        'youtube_enrichment': 0,
        'wikidata_enrichment': 0,
        'zcbs_enrichment': 0,
    }

    for index, filepath in enumerate(yaml_files, start=1):
        if index % 2000 == 0:
            print(f" Progress: {index}/{total_files}")

        result = process_file(filepath, yaml, dry_run=args.dry_run)
        counts['files_processed'] += 1

        if result['error']:
            counts['files_with_errors'] += 1
            if args.verbose:
                print(f" ERROR: {filepath.name}: {result['error']}")
        elif result['modified']:
            counts['files_modified'] += 1
            for name in result['patched_sections']:
                section_counts[name] += 1
            if args.verbose:
                print(f" Patched: {filepath.name}: {result['patched_sections']}")

    # Summary
    print()
    print("=" * 60)
    print("PATCH SUMMARY")
    print("=" * 60)
    print(f"Files processed: {counts['files_processed']:,}")
    print(f"Files modified: {counts['files_modified']:,}")
    print(f"Files with errors: {counts['files_with_errors']:,}")
    print()
    print("Sections patched:")
    for name, patched in section_counts.items():
        print(f" {name}: {patched:,}")
    print()

    print(f"Total wasDerivedFrom fields added: {sum(section_counts.values()):,}")

    if args.dry_run:
        print()
        print("This was a DRY RUN - no files were modified.")
        print("Run without --dry-run to apply changes.")
|
|
|
if __name__ == '__main__':
|
|
main()
|