#!/usr/bin/env python3
"""
Add enhanced provenance to enrichment sections in custodian YAML files.

This script enhances existing enrichment sections with FAIR-compliant provenance elements
following the YAML_PROVENANCE_SCHEMA.md specification:

1. **content_hash** - SHA-256 hash of enrichment section for integrity verification
2. **prov.wasDerivedFrom** - Source URL/entity for PROV-O alignment
3. **prov.generatedAtTime** - Timestamp from existing fetch_timestamp
4. **verification.status** - Verification status tracking

CRITICAL RULES:
- DATA_PRESERVATION_RULES: Never delete existing enriched content - additive only
- Use ruamel.yaml to preserve formatting, comments, and key ordering
- Idempotent processing - skip files already processed
- Hash generation is deterministic (computed from actual content)

Enrichment types processed:
- wikidata_enrichment (17,900 files)
- google_maps_enrichment (3,564 files)
- youtube_enrichment
- web_enrichment (1,708 files)
- zcbs_enrichment (142 files)

Usage:
    python scripts/add_yaml_provenance.py [--limit N] [--dry-run] [--verbose]
    python scripts/add_yaml_provenance.py --file path/to/file.yaml
    python scripts/add_yaml_provenance.py --validate  # Validate without modifying

Author: OpenCode/Claude
Created: 2025-12-28
"""
import argparse
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import sys
|
|
import unicodedata
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
# Use ruamel.yaml to preserve formatting and comments
|
|
try:
|
|
from ruamel.yaml import YAML
|
|
from ruamel.yaml.comments import CommentedMap
|
|
HAS_RUAMEL = True
|
|
except ImportError:
|
|
HAS_RUAMEL = False
|
|
print("Warning: ruamel.yaml not installed. Install with: pip install ruamel.yaml", file=sys.stderr)
|
|
|
|
# Constants
|
|
CUSTODIAN_DIR = Path("data/custodian")
|
|
PROVENANCE_SCHEMA_VERSION = "2.0"
|
|
|
|
# Enrichment sections to process with their source URL patterns
|
|
ENRICHMENT_SECTIONS = {
|
|
'wikidata_enrichment': {
|
|
'source_key': 'wikidata_entity_id',
|
|
'source_url_template': 'https://www.wikidata.org/wiki/{value}',
|
|
'api_endpoint': 'https://www.wikidata.org/w/rest.php/wikibase/v1',
|
|
'timestamp_key': 'fetch_timestamp',
|
|
},
|
|
'google_maps_enrichment': {
|
|
'source_key': 'place_id',
|
|
'source_url_template': 'https://maps.googleapis.com/maps/api/place/details/json?place_id={value}',
|
|
'api_endpoint': 'https://maps.googleapis.com/maps/api/place',
|
|
'timestamp_key': 'fetch_timestamp',
|
|
},
|
|
'youtube_enrichment': {
|
|
'source_key': 'source_url',
|
|
'source_url_template': None, # Direct URL
|
|
'source_url_template': 'https://www.googleapis.com/youtube/v3/channels?id={value}',
|
|
'api_endpoint': 'https://www.googleapis.com/youtube/v3',
|
|
'timestamp_key': 'fetch_timestamp',
|
|
},
|
|
'web_enrichment': {
|
|
'source_key': None, # Uses web_archives[0].url
|
|
'source_url_template': None,
|
|
'api_endpoint': None,
|
|
'timestamp_key': 'full_site_archive_timestamp',
|
|
},
|
|
'zcbs_enrichment': {
|
|
'source_key': 'source_url',
|
|
'source_url_template': None, # Direct URL
|
|
'api_endpoint': None,
|
|
'timestamp_key': 'fetch_timestamp',
|
|
},
|
|
'linkup_timespan': {
|
|
'source_key': 'search_query',
|
|
'source_url_template': None,
|
|
'api_endpoint': 'https://api.linkup.so',
|
|
'timestamp_key': 'fetch_timestamp',
|
|
},
|
|
}
|
|
|
|
|
|
def normalize_for_hash(data: Any) -> Any:
|
|
"""
|
|
Normalize data for consistent hashing.
|
|
|
|
- Floating point: round to 6 decimal places
|
|
- Timestamps: normalize to ISO 8601 UTC
|
|
- Unicode: apply NFD normalization
|
|
- Remove _provenance to avoid circular dependency
|
|
"""
|
|
if isinstance(data, dict):
|
|
return {k: normalize_for_hash(v) for k, v in sorted(data.items()) if k != '_provenance'}
|
|
elif isinstance(data, list):
|
|
return [normalize_for_hash(item) for item in data]
|
|
elif isinstance(data, float):
|
|
return round(data, 6)
|
|
elif isinstance(data, str):
|
|
# Apply Unicode normalization
|
|
return unicodedata.normalize('NFD', data)
|
|
else:
|
|
return data
|
|
|
|
|
|
def generate_content_hash(enrichment_data: dict) -> Dict[str, str]:
|
|
"""
|
|
Generate SHA-256 hash for enrichment section integrity.
|
|
|
|
Excludes '_provenance' key to avoid circular dependency.
|
|
Uses canonical JSON (sorted keys, no extra whitespace).
|
|
|
|
Args:
|
|
enrichment_data: The enrichment section dict
|
|
|
|
Returns:
|
|
Dict with algorithm, value (base64), scope, and computed_at
|
|
"""
|
|
# Normalize and remove _provenance
|
|
data_to_hash = normalize_for_hash(enrichment_data)
|
|
|
|
# Canonical JSON
|
|
canonical = json.dumps(data_to_hash, sort_keys=True, separators=(',', ':'), ensure_ascii=False)
|
|
|
|
# SHA-256 hash
|
|
hash_bytes = hashlib.sha256(canonical.encode('utf-8')).digest()
|
|
hash_b64 = base64.b64encode(hash_bytes).decode('ascii')
|
|
|
|
return {
|
|
"algorithm": "sha256",
|
|
"value": f"sha256-{hash_b64}",
|
|
"scope": "enrichment_section",
|
|
"computed_at": datetime.now(timezone.utc).isoformat()
|
|
}
|
|
|
|
|
|
def get_source_url(section_name: str, section_data: dict) -> Optional[str]:
|
|
"""
|
|
Extract or construct the source URL for an enrichment section.
|
|
|
|
Args:
|
|
section_name: Name of the enrichment section
|
|
section_data: The enrichment section data
|
|
|
|
Returns:
|
|
Source URL string or None
|
|
"""
|
|
config = ENRICHMENT_SECTIONS.get(section_name, {})
|
|
|
|
# Special handling for web_enrichment
|
|
if section_name == 'web_enrichment':
|
|
web_archives = section_data.get('web_archives', [])
|
|
if web_archives and isinstance(web_archives, list) and len(web_archives) > 0:
|
|
return web_archives[0].get('url')
|
|
return None
|
|
|
|
# Special handling for zcbs_enrichment - use source_url directly
|
|
if section_name == 'zcbs_enrichment':
|
|
return section_data.get('source_url')
|
|
|
|
# Get source key and template
|
|
source_key = config.get('source_key')
|
|
template = config.get('source_url_template')
|
|
|
|
if not source_key:
|
|
return None
|
|
|
|
value = section_data.get(source_key)
|
|
if not value:
|
|
# Try nested paths (e.g., youtube_enrichment.channel.channel_id)
|
|
if section_name == 'youtube_enrichment':
|
|
channel = section_data.get('channel', {})
|
|
value = channel.get('channel_id')
|
|
|
|
if not value:
|
|
return None
|
|
|
|
if template:
|
|
return template.format(value=value)
|
|
else:
|
|
return str(value)
|
|
|
|
|
|
def get_timestamp(section_name: str, section_data: dict) -> Optional[str]:
|
|
"""
|
|
Extract the timestamp from an enrichment section.
|
|
|
|
Args:
|
|
section_name: Name of the enrichment section
|
|
section_data: The enrichment section data
|
|
|
|
Returns:
|
|
ISO 8601 timestamp string or None
|
|
"""
|
|
config = ENRICHMENT_SECTIONS.get(section_name, {})
|
|
timestamp_key = config.get('timestamp_key', 'fetch_timestamp')
|
|
|
|
timestamp = section_data.get(timestamp_key)
|
|
|
|
# Handle nested timestamps
|
|
if not timestamp and section_name == 'linkup_timespan':
|
|
# Look in provenance.sources.linkup_timespan
|
|
pass # Already at root level in most cases
|
|
|
|
return timestamp
|
|
|
|
|
|
def create_provenance_block(
|
|
section_name: str,
|
|
section_data: dict,
|
|
content_hash: dict
|
|
) -> dict:
|
|
"""
|
|
Create a _provenance block for an enrichment section.
|
|
|
|
Args:
|
|
section_name: Name of the enrichment section
|
|
section_data: The enrichment section data
|
|
content_hash: Pre-computed content hash
|
|
|
|
Returns:
|
|
Complete _provenance dict
|
|
"""
|
|
config = ENRICHMENT_SECTIONS.get(section_name, {})
|
|
|
|
source_url = get_source_url(section_name, section_data)
|
|
timestamp = get_timestamp(section_name, section_data)
|
|
api_endpoint = config.get('api_endpoint')
|
|
|
|
provenance = {
|
|
'content_hash': content_hash,
|
|
'prov': {},
|
|
'verification': {
|
|
'status': 'verified',
|
|
'last_verified': datetime.now(timezone.utc).isoformat()
|
|
}
|
|
}
|
|
|
|
# Add PROV-O elements
|
|
if source_url:
|
|
provenance['prov']['wasDerivedFrom'] = source_url
|
|
|
|
if timestamp:
|
|
provenance['prov']['generatedAtTime'] = timestamp
|
|
|
|
if api_endpoint:
|
|
provenance['prov']['wasGeneratedBy'] = {
|
|
'@type': 'prov:Activity',
|
|
'name': f'{section_name.replace("_enrichment", "")}_api_fetch',
|
|
'used': api_endpoint
|
|
}
|
|
|
|
# Special handling for web_enrichment - add archive info
|
|
if section_name == 'web_enrichment':
|
|
web_archives = section_data.get('web_archives', [])
|
|
if web_archives and len(web_archives) > 0:
|
|
archive = web_archives[0]
|
|
provenance['archive'] = {
|
|
'local_path': f"{archive.get('directory', '')}/{archive.get('warc_file', '')}",
|
|
'format': archive.get('warc_format', 'ISO 28500 WARC'),
|
|
'size_bytes': archive.get('warc_size_bytes')
|
|
}
|
|
|
|
return provenance
|
|
|
|
|
|
def needs_provenance(section_data: dict) -> bool:
|
|
"""
|
|
Check if an enrichment section needs provenance added.
|
|
|
|
Args:
|
|
section_data: The enrichment section dict
|
|
|
|
Returns:
|
|
True if provenance should be added
|
|
"""
|
|
if not isinstance(section_data, dict):
|
|
return False
|
|
|
|
# Check if _provenance already exists with content_hash
|
|
existing_prov = section_data.get('_provenance', {})
|
|
if existing_prov and 'content_hash' in existing_prov:
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def add_provenance_to_section(
|
|
section_name: str,
|
|
section_data: dict,
|
|
verbose: bool = False
|
|
) -> Tuple[dict, bool]:
|
|
"""
|
|
Add provenance to a single enrichment section.
|
|
|
|
Args:
|
|
section_name: Name of the enrichment section
|
|
section_data: The enrichment section dict (will be modified)
|
|
verbose: Print progress
|
|
|
|
Returns:
|
|
Tuple of (modified_section, was_modified)
|
|
"""
|
|
if not needs_provenance(section_data):
|
|
if verbose:
|
|
print(f" [{section_name}] Already has provenance, skipping")
|
|
return section_data, False
|
|
|
|
# Generate content hash
|
|
content_hash = generate_content_hash(section_data)
|
|
|
|
# Create provenance block
|
|
provenance = create_provenance_block(section_name, section_data, content_hash)
|
|
|
|
# Add to section
|
|
section_data['_provenance'] = provenance
|
|
|
|
if verbose:
|
|
source_url = get_source_url(section_name, section_data)
|
|
print(f" [{section_name}] Added provenance (hash: {content_hash['value'][:30]}...)")
|
|
if source_url:
|
|
print(f" wasDerivedFrom: {source_url[:60]}...")
|
|
|
|
return section_data, True
|
|
|
|
|
|
def update_root_provenance(data: dict, enrichment_summary: dict) -> dict:
|
|
"""
|
|
Update the root provenance section with enrichment-level summary.
|
|
|
|
Args:
|
|
data: The full YAML data dict
|
|
enrichment_summary: Dict mapping section names to their content hashes
|
|
|
|
Returns:
|
|
Modified provenance dict
|
|
"""
|
|
if 'provenance' not in data:
|
|
data['provenance'] = {}
|
|
|
|
prov = data['provenance']
|
|
|
|
# Update schema version
|
|
prov['schema_version'] = '2.0.0'
|
|
|
|
# Add enrichment provenance summary
|
|
prov['enrichment_provenance'] = {}
|
|
for section_name, hash_info in enrichment_summary.items():
|
|
prov['enrichment_provenance'][section_name] = {
|
|
'content_hash': hash_info['value'],
|
|
'verified_at': hash_info['computed_at']
|
|
}
|
|
|
|
# Add provenance schema version
|
|
prov['provenance_schema_version'] = PROVENANCE_SCHEMA_VERSION
|
|
|
|
# Add standards compliance
|
|
prov['standards_compliance'] = [
|
|
'W3C PROV-O',
|
|
'W3C SRI (content hashes)'
|
|
]
|
|
|
|
# Update generated_at
|
|
prov['generated_at'] = datetime.now(timezone.utc).isoformat()
|
|
|
|
return prov
|
|
|
|
|
|
def process_file(
|
|
filepath: Path,
|
|
yaml_handler: 'YAML',
|
|
dry_run: bool = False,
|
|
verbose: bool = False
|
|
) -> Tuple[bool, int, int]:
|
|
"""
|
|
Process a single custodian YAML file.
|
|
|
|
Args:
|
|
filepath: Path to the YAML file
|
|
yaml_handler: Configured ruamel.yaml handler
|
|
dry_run: If True, don't write changes
|
|
verbose: Print progress
|
|
|
|
Returns:
|
|
Tuple of (file_was_modified, sections_updated, sections_total)
|
|
"""
|
|
try:
|
|
with open(filepath, 'r', encoding='utf-8') as f:
|
|
data = yaml_handler.load(f)
|
|
except Exception as e:
|
|
print(f"Error reading {filepath}: {e}", file=sys.stderr)
|
|
return False, 0, 0
|
|
|
|
if not isinstance(data, dict):
|
|
return False, 0, 0
|
|
|
|
sections_total = 0
|
|
sections_updated = 0
|
|
enrichment_summary = {}
|
|
file_modified = False
|
|
|
|
if verbose:
|
|
print(f"\n Processing {filepath.name}")
|
|
|
|
# Process each enrichment section
|
|
for section_name in ENRICHMENT_SECTIONS.keys():
|
|
if section_name not in data:
|
|
continue
|
|
|
|
section_data = data[section_name]
|
|
if not isinstance(section_data, dict):
|
|
continue
|
|
|
|
sections_total += 1
|
|
|
|
updated_section, was_modified = add_provenance_to_section(
|
|
section_name=section_name,
|
|
section_data=section_data,
|
|
verbose=verbose
|
|
)
|
|
|
|
if was_modified:
|
|
data[section_name] = updated_section
|
|
sections_updated += 1
|
|
file_modified = True
|
|
|
|
# Track for root provenance summary
|
|
prov = updated_section.get('_provenance', {})
|
|
if 'content_hash' in prov:
|
|
enrichment_summary[section_name] = prov['content_hash']
|
|
|
|
# Update root provenance if any sections were modified
|
|
if file_modified and enrichment_summary:
|
|
data['provenance'] = update_root_provenance(data, enrichment_summary)
|
|
|
|
if verbose:
|
|
print(f" [provenance] Updated root provenance with {len(enrichment_summary)} sections")
|
|
|
|
# Write file if modified
|
|
if file_modified and not dry_run:
|
|
try:
|
|
with open(filepath, 'w', encoding='utf-8') as f:
|
|
yaml_handler.dump(data, f)
|
|
if verbose:
|
|
print(f" Saved {filepath.name}")
|
|
except Exception as e:
|
|
print(f"Error writing {filepath}: {e}", file=sys.stderr)
|
|
return False, sections_updated, sections_total
|
|
elif file_modified and dry_run:
|
|
if verbose:
|
|
print(f" [DRY-RUN] Would save {filepath.name}")
|
|
|
|
return file_modified, sections_updated, sections_total
|
|
|
|
|
|
def validate_file(filepath: Path, yaml_handler: 'YAML', verbose: bool = False) -> List[str]:
|
|
"""
|
|
Validate provenance completeness for a file.
|
|
|
|
Args:
|
|
filepath: Path to the YAML file
|
|
yaml_handler: Configured ruamel.yaml handler
|
|
verbose: Print progress
|
|
|
|
Returns:
|
|
List of validation errors
|
|
"""
|
|
errors = []
|
|
|
|
try:
|
|
with open(filepath, 'r', encoding='utf-8') as f:
|
|
data = yaml_handler.load(f)
|
|
except Exception as e:
|
|
return [f"Error reading file: {e}"]
|
|
|
|
if not isinstance(data, dict):
|
|
return ["File does not contain a dict"]
|
|
|
|
for section_name in ENRICHMENT_SECTIONS.keys():
|
|
if section_name not in data:
|
|
continue
|
|
|
|
section = data[section_name]
|
|
if not isinstance(section, dict):
|
|
continue
|
|
|
|
prov = section.get('_provenance', {})
|
|
|
|
# Check mandatory elements
|
|
if 'content_hash' not in prov:
|
|
errors.append(f"{section_name}: missing content_hash")
|
|
if 'verification' not in prov:
|
|
errors.append(f"{section_name}: missing verification")
|
|
if 'prov' not in prov or 'wasDerivedFrom' not in prov.get('prov', {}):
|
|
errors.append(f"{section_name}: missing prov.wasDerivedFrom")
|
|
|
|
return errors
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Add enhanced provenance to enrichment sections in custodian YAML files"
|
|
)
|
|
parser.add_argument(
|
|
"--limit", type=int, default=None,
|
|
help="Limit number of files to process"
|
|
)
|
|
parser.add_argument(
|
|
"--file", type=str, default=None,
|
|
help="Process a specific file"
|
|
)
|
|
parser.add_argument(
|
|
"--dry-run", action="store_true",
|
|
help="Don't write changes, just report what would be done"
|
|
)
|
|
parser.add_argument(
|
|
"--verbose", "-v", action="store_true",
|
|
help="Print detailed progress"
|
|
)
|
|
parser.add_argument(
|
|
"--validate", action="store_true",
|
|
help="Validate provenance completeness without modifying"
|
|
)
|
|
parser.add_argument(
|
|
"--pattern", type=str, default="*.yaml",
|
|
help="File pattern to match (default: *.yaml)"
|
|
)
|
|
parser.add_argument(
|
|
"--section", type=str, default=None,
|
|
choices=list(ENRICHMENT_SECTIONS.keys()),
|
|
help="Process only a specific enrichment section type"
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Check requirements
|
|
if not HAS_RUAMEL:
|
|
print("Error: ruamel.yaml is required. Install with: pip install ruamel.yaml", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Configure YAML handler to preserve formatting
|
|
yaml = YAML()
|
|
yaml.preserve_quotes = True
|
|
yaml.default_flow_style = False
|
|
yaml.indent(mapping=2, sequence=4, offset=2)
|
|
yaml.width = 120
|
|
|
|
# Get files to process
|
|
if args.file:
|
|
files = [Path(args.file)]
|
|
if not files[0].exists():
|
|
print(f"Error: File not found: {args.file}", file=sys.stderr)
|
|
sys.exit(1)
|
|
else:
|
|
files = sorted(CUSTODIAN_DIR.glob(args.pattern))
|
|
# Only process files at the root level of data/custodian/
|
|
files = [f for f in files if f.parent == CUSTODIAN_DIR]
|
|
|
|
if args.limit:
|
|
files = files[:args.limit]
|
|
|
|
print(f"Processing {len(files)} files...")
|
|
print(f" Dry run: {args.dry_run}")
|
|
print(f" Validate only: {args.validate}")
|
|
if args.section:
|
|
print(f" Section filter: {args.section}")
|
|
|
|
# Validation mode
|
|
if args.validate:
|
|
total_errors = 0
|
|
files_with_errors = 0
|
|
|
|
for i, filepath in enumerate(files):
|
|
if args.verbose or (i + 1) % 100 == 0:
|
|
print(f"\n[{i+1}/{len(files)}] Validating {filepath.name}")
|
|
|
|
errors = validate_file(filepath, yaml, args.verbose)
|
|
|
|
if errors:
|
|
files_with_errors += 1
|
|
total_errors += len(errors)
|
|
if args.verbose:
|
|
for error in errors:
|
|
print(f" ERROR: {error}")
|
|
|
|
print(f"\n{'='*60}")
|
|
print(f"VALIDATION SUMMARY")
|
|
print(f"{'='*60}")
|
|
print(f"Files validated: {len(files)}")
|
|
print(f"Files with errors: {files_with_errors}")
|
|
print(f"Total errors: {total_errors}")
|
|
|
|
sys.exit(0 if total_errors == 0 else 1)
|
|
|
|
# Processing mode
|
|
files_modified = 0
|
|
total_sections_updated = 0
|
|
total_sections = 0
|
|
|
|
for i, filepath in enumerate(files):
|
|
if args.verbose or (i + 1) % 100 == 0:
|
|
print(f"\n[{i+1}/{len(files)}] {filepath.name}")
|
|
|
|
modified, sections_updated, sections_total = process_file(
|
|
filepath=filepath,
|
|
yaml_handler=yaml,
|
|
dry_run=args.dry_run,
|
|
verbose=args.verbose
|
|
)
|
|
|
|
if modified:
|
|
files_modified += 1
|
|
total_sections_updated += sections_updated
|
|
total_sections += sections_total
|
|
|
|
# Summary
|
|
print(f"\n{'='*60}")
|
|
print(f"SUMMARY")
|
|
print(f"{'='*60}")
|
|
print(f"Files processed: {len(files)}")
|
|
print(f"Files modified: {files_modified}")
|
|
print(f"Sections total: {total_sections}")
|
|
print(f"Sections updated: {total_sections_updated}")
|
|
|
|
if args.dry_run:
|
|
print(f"\n[DRY-RUN] No files were actually modified.")
|
|
else:
|
|
print(f"\nDone!")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|