#!/usr/bin/env python3
"""
Patch wikidata_enrichment sections to add wasDerivedFrom using available fields.

Handles cases where:
- wikidata_entity_id is None but wikidata_id exists
- wikidata_url exists and can be used directly
- Neither exists (skip)

Usage:
    python scripts/patch_wikidata_derived_from.py [--dry-run] [--normalize]
"""

import argparse
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

try:
    from ruamel.yaml import YAML  # type: ignore
    from ruamel.yaml.comments import CommentedMap  # type: ignore
except ImportError:
    print("ERROR: ruamel.yaml not installed. Run: pip install ruamel.yaml")
    sys.exit(1)
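
# For orientation, a YAML sketch of the record shape this script works on.
# Field names are taken from the code below; the Q-number and timestamp are
# made-up illustrative values. Note that '_provenance' must already exist and
# be non-empty (sections without it are skipped):
#
#   wikidata_enrichment:
#     wikidata_id: Q42
#     enrichment_date: '2024-01-01T00:00:00+00:00'
#     _provenance:
#       prov:                         # created here if missing
#         wasDerivedFrom: https://www.wikidata.org/wiki/Q42
#         generatedAtTime: '2024-01-01T00:00:00+00:00'
#         wasGeneratedBy:
#           '@type': prov:Activity
#           name: wikidata_api_fetch
#           used: https://www.wikidata.org/w/rest.php/wikibase/v1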


def extract_entity_id_from_url(url: str) -> Optional[str]:
    """Extract Q-number from Wikidata URL."""
    if not url:
        return None
    match = re.search(r'(Q\d+)', url)
    return match.group(1) if match else None
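
# A few illustrative calls (URLs are hypothetical examples):
#   extract_entity_id_from_url('https://www.wikidata.org/wiki/Q42')   -> 'Q42'
#   extract_entity_id_from_url('http://www.wikidata.org/entity/Q42')  -> 'Q42'
#   extract_entity_id_from_url('https://example.org/not-wikidata')    -> None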


def get_wikidata_derived_from(section: dict) -> Optional[str]:
    """Get wasDerivedFrom URL from wikidata_enrichment section.

    Priority:
    1. wikidata_entity_id (if valid Q-number)
    2. wikidata_id (if valid Q-number)
    3. wikidata_url (use directly)
    """
    # Try wikidata_entity_id first
    entity_id = section.get('wikidata_entity_id')
    if entity_id and str(entity_id).startswith('Q'):
        return f"https://www.wikidata.org/wiki/{entity_id}"

    # Try wikidata_id
    wikidata_id = section.get('wikidata_id')
    if wikidata_id and str(wikidata_id).startswith('Q'):
        return f"https://www.wikidata.org/wiki/{wikidata_id}"

    # Try wikidata_url directly
    wikidata_url = section.get('wikidata_url')
    if wikidata_url:
        # Normalize to wiki URL format
        entity_id = extract_entity_id_from_url(wikidata_url)
        if entity_id:
            return f"https://www.wikidata.org/wiki/{entity_id}"
        # Use URL as-is if we can't extract entity ID
        return wikidata_url

    return None
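
# How the priority order plays out (section dicts are hypothetical):
#   {'wikidata_entity_id': 'Q42'}                           -> .../wiki/Q42
#   {'wikidata_entity_id': None, 'wikidata_id': 'Q42'}      -> .../wiki/Q42
#   {'wikidata_url': 'http://www.wikidata.org/entity/Q42'}  -> .../wiki/Q42 (normalized)
#   {}                                                      -> None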


def patch_section(section: dict, normalize: bool = False) -> tuple[bool, Optional[str]]:
    """Add wasDerivedFrom to section's _provenance if missing.

    Args:
        section: The wikidata_enrichment section dict
        normalize: If True, also copy wikidata_id to wikidata_entity_id

    Returns:
        Tuple of (was_patched, derived_from_url)
    """
    # Get _provenance
    provenance = section.get('_provenance')
    if not provenance:
        return False, None

    # Get or create prov section
    prov = provenance.get('prov')
    if not prov:
        prov = CommentedMap()
        provenance['prov'] = prov

    # Check if wasDerivedFrom already exists
    if prov.get('wasDerivedFrom'):
        return False, prov.get('wasDerivedFrom')

    # Get derived_from URL
    derived_from = get_wikidata_derived_from(section)
    if not derived_from:
        return False, None

    # Add wasDerivedFrom
    prov['wasDerivedFrom'] = derived_from

    # Add generatedAtTime if missing
    if not prov.get('generatedAtTime'):
        timestamp = (
            section.get('enrichment_date') or
            section.get('enrichment_timestamp') or
            datetime.now(timezone.utc).isoformat()
        )
        prov['generatedAtTime'] = timestamp

    # Add wasGeneratedBy if missing
    if not prov.get('wasGeneratedBy'):
        generated_by = CommentedMap()
        generated_by['@type'] = 'prov:Activity'
        generated_by['name'] = 'wikidata_api_fetch'
        generated_by['used'] = 'https://www.wikidata.org/w/rest.php/wikibase/v1'
        prov['wasGeneratedBy'] = generated_by

    # Optionally normalize wikidata_entity_id
    if normalize and not section.get('wikidata_entity_id'):
        wikidata_id = section.get('wikidata_id')
        if wikidata_id and str(wikidata_id).startswith('Q'):
            section['wikidata_entity_id'] = wikidata_id

    return True, derived_from
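
# Minimal illustration with plain dicts (hypothetical values; the 'source' key
# just stands in for whatever the '_provenance' block already contains):
#   section = {'wikidata_id': 'Q42', '_provenance': {'source': 'wikidata'}}
#   patch_section(section)  -> (True, 'https://www.wikidata.org/wiki/Q42')
#   patch_section(section)  -> (False, 'https://www.wikidata.org/wiki/Q42')  # already patched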


def process_file(filepath: Path, yaml: YAML, dry_run: bool = False, normalize: bool = False) -> dict:
    """Process a single YAML file."""
    result = {
        'filepath': str(filepath),
        'modified': False,
        'patched': False,
        'derived_from': None,
        'error': None,
    }

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.load(f)
    except Exception as e:
        result['error'] = str(e)
        return result

    if not isinstance(data, dict):
        return result

    # Check wikidata_enrichment
    if 'wikidata_enrichment' not in data:
        return result

    section = data['wikidata_enrichment']
    if not isinstance(section, dict):
        return result

    patched, derived_from = patch_section(section, normalize=normalize)

    if patched:
        result['patched'] = True
        result['derived_from'] = derived_from

        if not dry_run:
            try:
                with open(filepath, 'w', encoding='utf-8') as f:
                    yaml.dump(data, f)
                result['modified'] = True
            except Exception as e:
                result['error'] = f"Write error: {e}"

    return result
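
# Shape of the per-file result for a successfully patched file (path and
# Q-number are hypothetical):
#   {'filepath': 'data/custodian/example.yaml', 'modified': True,
#    'patched': True, 'derived_from': 'https://www.wikidata.org/wiki/Q42',
#    'error': None}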


def main():
    parser = argparse.ArgumentParser(
        description='Patch wikidata_enrichment sections to add wasDerivedFrom'
    )
    parser.add_argument(
        '--dry-run', action='store_true',
        help='Show what would be changed without modifying files'
    )
    parser.add_argument(
        '--normalize', action='store_true',
        help='Also copy wikidata_id to wikidata_entity_id if missing'
    )
    parser.add_argument(
        '--verbose', '-v', action='store_true',
        help='Show detailed output'
    )

    args = parser.parse_args()

    # Setup YAML
    yaml = YAML()
    yaml.preserve_quotes = True
    yaml.default_flow_style = False
    yaml.width = 4096

    # Find files
    script_dir = Path(__file__).parent
    base_dir = script_dir.parent
    custodian_dir = base_dir / 'data' / 'custodian'

    yaml_files = list(custodian_dir.glob('*.yaml'))
    total_files = len(yaml_files)

    print(f"{'[DRY RUN] ' if args.dry_run else ''}Processing {total_files} YAML files...")
    print()

    # Stats
    stats = {
        'files_processed': 0,
        'files_with_wikidata': 0,
        'files_modified': 0,
        'sections_patched': 0,
        'errors': 0,
    }

    for i, filepath in enumerate(yaml_files):
        if (i + 1) % 2000 == 0:
            print(f"  Progress: {i + 1}/{total_files}")

        result = process_file(filepath, yaml, dry_run=args.dry_run, normalize=args.normalize)
        stats['files_processed'] += 1

        if result['error']:
            stats['errors'] += 1
            if args.verbose:
                print(f"  ERROR: {filepath.name}: {result['error']}")

        if result['patched']:
            stats['files_with_wikidata'] += 1
            stats['sections_patched'] += 1
            if result['modified']:
                stats['files_modified'] += 1
            if args.verbose:
                print(f"  Patched: {filepath.name} -> {result['derived_from']}")

    # Summary
    print()
    print("=" * 60)
    print("PATCH SUMMARY")
    print("=" * 60)
    print(f"Files processed:  {stats['files_processed']:,}")
    print(f"Files modified:   {stats['files_modified']:,}")
    print(f"Sections patched: {stats['sections_patched']:,}")
    print(f"Errors:           {stats['errors']:,}")
    print()

    if args.dry_run:
        print("This was a DRY RUN - no files were modified.")
        print("Run without --dry-run to apply changes.")


if __name__ == '__main__':
    main()