# glam/scripts/validate_provenance.py
# Snapshot: 2025-12-30 03:43:31 +01:00 (571 lines, 21 KiB, Python)
#!/usr/bin/env python3
"""
Comprehensive Provenance Validation Script
Validates provenance metadata across:
1. Custodian YAML files (~29K files)
2. Person entity JSON files (~8.9K files)
Checks:
- Presence of _provenance sub-keys in enrichment sections
- content_hash structure and format
- wasDerivedFrom URLs
- prov:Activity metadata
- verification status
- Root provenance summary consistency
Usage:
python scripts/validate_provenance.py [--sample N] [--verbose]
"""
import argparse
import hashlib
import json
import re
import sys
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Optional

# Use ruamel.yaml for YAML processing; the round-trip loader preserves
# formatting details (e.g. quoting) if files are ever re-serialized.
try:
    from ruamel.yaml import YAML  # type: ignore

    yaml = YAML()
    yaml.preserve_quotes = True
except ImportError:
    # Hard dependency: abort immediately with an actionable install hint.
    print("ERROR: ruamel.yaml not installed. Run: pip install ruamel.yaml")
    sys.exit(1)
class ProvenanceValidator:
    """Validates provenance metadata in GLAM custodian and person files.

    Aggregate counters accumulate in ``self.stats`` as directories are
    scanned; the per-file ``validate_*`` methods also return a result dict
    describing that single file.
    """

    # Enrichment sections whose _provenance sub-key is validated.
    ENRICHMENT_SECTIONS = [
        'wikidata_enrichment',
        'google_maps_enrichment',
        'web_enrichment',
        'youtube_enrichment',
        'zcbs_enrichment',
    ]

    # Expected wasDerivedFrom URL patterns, keyed by enrichment section.
    DERIVED_FROM_PATTERNS = {
        'wikidata_enrichment': r'https://www\.wikidata\.org/wiki/Q\d+',
        'google_maps_enrichment': r'https://maps\.googleapis\.com/',
        'youtube_enrichment': r'https://(www\.)?(youtube\.com|youtu\.be)/',
        'web_enrichment': r'https?://',
        'zcbs_enrichment': r'https?://',
    }

    def __init__(self, verbose: bool = False):
        """Initialize empty counters.

        Args:
            verbose: When True, per-file issues are printed while scanning.
        """
        self.verbose = verbose
        self.stats = {
            'yaml': {
                'total_files': 0,
                'files_with_enrichment': 0,
                'files_with_provenance': 0,
                'sections_checked': 0,
                'sections_with_provenance': 0,
                'sections_with_content_hash': 0,
                'sections_with_derived_from': 0,
                'valid_content_hashes': 0,
                'invalid_content_hashes': 0,
                'errors': [],
                'warnings': [],
                # Per-section breakdown, auto-created on first access.
                'by_section': defaultdict(lambda: {
                    'total': 0,
                    'with_provenance': 0,
                    'with_content_hash': 0,
                    'with_derived_from': 0,
                }),
            },
            'json': {
                'total_files': 0,
                'files_with_web_claims': 0,
                'claims_checked': 0,
                'claims_with_provenance': 0,
                'errors': [],
                'warnings': [],
            },
        }

    def validate_content_hash(self, hash_data: dict) -> tuple[bool, str]:
        """Validate content_hash structure and format.

        Args:
            hash_data: The ``content_hash`` mapping from a _provenance block.

        Returns:
            ``(is_valid, message)`` where message names the first failure,
            or ``"Valid"`` on success.
        """
        # Fix: a non-mapping value previously fell through to `in` checks
        # (substring semantics on a str, TypeError on e.g. int) instead of
        # failing cleanly.
        if not isinstance(hash_data, dict):
            return False, f"content_hash is not a mapping: {type(hash_data).__name__}"
        required_fields = ['algorithm', 'value', 'scope', 'computed_at']
        for field in required_fields:
            if field not in hash_data:
                return False, f"Missing required field: {field}"
        # Only sha256 is expected from the enrichment pipeline.
        if hash_data['algorithm'] != 'sha256':
            return False, f"Unexpected algorithm: {hash_data['algorithm']}"
        # Value format is 'sha256-' followed by the base64 digest.
        value = hash_data['value']
        if not value.startswith('sha256-'):
            return False, f"Invalid hash prefix: {value[:20]}..."
        # Base64 of a SHA-256 digest is 44 chars; anything under 40 is
        # certainly truncated (the loose bound tolerates unpadded encodings).
        base64_part = value[7:]  # strip the 'sha256-' prefix
        if len(base64_part) < 40:
            return False, f"Hash value too short: {len(base64_part)} chars"
        # computed_at must be ISO-8601; a trailing 'Z' is normalized for
        # Python versions where fromisoformat rejects it.
        try:
            datetime.fromisoformat(hash_data['computed_at'].replace('Z', '+00:00'))
        except (ValueError, AttributeError):
            return False, f"Invalid computed_at timestamp: {hash_data.get('computed_at')}"
        return True, "Valid"

    def validate_prov_section(self, prov_data: dict, section_name: str) -> tuple[bool, list[str]]:
        """Validate a ``prov:`` section structure.

        Args:
            prov_data: The ``prov`` mapping inside _provenance.
            section_name: Enrichment section name, used to pick the
                expected wasDerivedFrom URL pattern.

        Returns:
            ``(is_valid, issues)`` — issues is empty when valid.
        """
        issues = []
        # wasDerivedFrom is mandatory and must match the section's pattern.
        derived_from = prov_data.get('wasDerivedFrom')
        if not derived_from:
            issues.append("Missing wasDerivedFrom")
        else:
            # Unknown sections fall back to a permissive http(s) pattern.
            pattern = self.DERIVED_FROM_PATTERNS.get(section_name, r'https?://')
            if not re.match(pattern, str(derived_from)):
                issues.append(f"wasDerivedFrom URL doesn't match expected pattern for {section_name}")
        # generatedAtTime is optional, but must parse as ISO-8601 if present.
        gen_time = prov_data.get('generatedAtTime')
        if gen_time:
            try:
                datetime.fromisoformat(str(gen_time).replace('Z', '+00:00'))
            except (ValueError, AttributeError):
                issues.append(f"Invalid generatedAtTime: {gen_time}")
        # wasGeneratedBy is optional; if present it must be a typed dict.
        generated_by = prov_data.get('wasGeneratedBy')
        if generated_by:
            if not isinstance(generated_by, dict):
                issues.append("wasGeneratedBy should be a dict")
            elif '@type' not in generated_by:
                issues.append("wasGeneratedBy missing @type")
        return len(issues) == 0, issues

    def validate_yaml_file(self, filepath: Path) -> dict:
        """Validate a single YAML custodian file.

        Updates aggregate ``self.stats['yaml']`` counters as a side effect.

        Returns:
            Per-file result dict with per-section findings under 'sections'.
        """
        result = {
            'filepath': str(filepath),
            'has_enrichment': False,
            'has_provenance': False,
            'sections': {},
            'errors': [],
            'warnings': [],
        }
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.load(f)
        except Exception as e:
            result['errors'].append(f"Failed to parse YAML: {e}")
            return result
        if not isinstance(data, dict):
            result['errors'].append("Root element is not a dict")
            return result
        # Check each known enrichment section present in the file.
        for section_name in self.ENRICHMENT_SECTIONS:
            if section_name not in data:
                continue
            result['has_enrichment'] = True
            section = data[section_name]
            if not isinstance(section, dict):
                result['warnings'].append(f"{section_name} is not a dict")
                continue
            section_result = {
                'exists': True,
                'has_provenance': False,
                'has_content_hash': False,
                'has_derived_from': False,
                'content_hash_valid': None,  # None = no hash to validate
                'prov_valid': None,          # None = no prov block present
                'issues': [],
            }
            self.stats['yaml']['sections_checked'] += 1
            self.stats['yaml']['by_section'][section_name]['total'] += 1
            # _provenance must be a mapping to count as present.
            provenance = section.get('_provenance')
            if provenance and isinstance(provenance, dict):
                section_result['has_provenance'] = True
                result['has_provenance'] = True
                self.stats['yaml']['sections_with_provenance'] += 1
                self.stats['yaml']['by_section'][section_name]['with_provenance'] += 1
                # Validate content_hash when present.
                content_hash = provenance.get('content_hash')
                if content_hash:
                    section_result['has_content_hash'] = True
                    self.stats['yaml']['sections_with_content_hash'] += 1
                    self.stats['yaml']['by_section'][section_name]['with_content_hash'] += 1
                    is_valid, msg = self.validate_content_hash(content_hash)
                    section_result['content_hash_valid'] = is_valid
                    if is_valid:
                        self.stats['yaml']['valid_content_hashes'] += 1
                    else:
                        self.stats['yaml']['invalid_content_hashes'] += 1
                        section_result['issues'].append(f"content_hash: {msg}")
                # Validate the prov sub-section when present.
                prov = provenance.get('prov')
                if prov:
                    if prov.get('wasDerivedFrom'):
                        section_result['has_derived_from'] = True
                        self.stats['yaml']['sections_with_derived_from'] += 1
                        self.stats['yaml']['by_section'][section_name]['with_derived_from'] += 1
                    is_valid, issues = self.validate_prov_section(prov, section_name)
                    section_result['prov_valid'] = is_valid
                    section_result['issues'].extend(issues)
            result['sections'][section_name] = section_result
        return result

    def validate_json_file(self, filepath: Path) -> dict:
        """Validate a single JSON person entity file.

        Checks that each entry in ``web_claims`` carries source_url and
        retrieved_on provenance fields.

        Returns:
            Per-file result dict with claim counts and warnings.
        """
        result = {
            'filepath': str(filepath),
            'has_web_claims': False,
            'claims_checked': 0,
            'claims_with_provenance': 0,
            'errors': [],
            'warnings': [],
        }
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except Exception as e:
            result['errors'].append(f"Failed to parse JSON: {e}")
            return result
        if not isinstance(data, dict):
            result['errors'].append("Root element is not a dict")
            return result
        web_claims = data.get('web_claims', [])
        if not web_claims:
            return result
        result['has_web_claims'] = True
        for i, claim in enumerate(web_claims):
            if not isinstance(claim, dict):
                continue
            result['claims_checked'] += 1
            has_source_url = bool(claim.get('source_url'))
            has_retrieved_on = bool(claim.get('retrieved_on'))
            # retrieval_agent is informational only; not required for the
            # claim to count as provenanced.
            has_retrieval_agent = bool(claim.get('retrieval_agent'))
            if has_source_url and has_retrieved_on:
                result['claims_with_provenance'] += 1
            else:
                missing = []
                if not has_source_url:
                    missing.append('source_url')
                if not has_retrieved_on:
                    missing.append('retrieved_on')
                result['warnings'].append(f"Claim {i}: missing {', '.join(missing)}")
        return result

    def validate_yaml_directory(self, directory: Path, sample_size: Optional[int] = None) -> None:
        """Validate all YAML files in *directory*, updating self.stats.

        Args:
            directory: Directory containing custodian ``*.yaml`` files.
            sample_size: If set and smaller than the file count, validate a
                random sample of that size instead of every file.
        """
        yaml_files = list(directory.glob('*.yaml'))
        if sample_size and sample_size < len(yaml_files):
            import random
            yaml_files = random.sample(yaml_files, sample_size)
        print(f"\nValidating {len(yaml_files)} YAML files...")
        for i, filepath in enumerate(yaml_files):
            if (i + 1) % 1000 == 0:
                print(f"  Progress: {i + 1}/{len(yaml_files)}")
            self.stats['yaml']['total_files'] += 1
            result = self.validate_yaml_file(filepath)
            if result['has_enrichment']:
                self.stats['yaml']['files_with_enrichment'] += 1
            if result['has_provenance']:
                self.stats['yaml']['files_with_provenance'] += 1
            if result['errors']:
                self.stats['yaml']['errors'].extend(
                    [(str(filepath), e) for e in result['errors']]
                )
            if self.verbose and (result['errors'] or any(
                s.get('issues') for s in result['sections'].values()
            )):
                print(f"\n  Issues in {filepath.name}:")
                for err in result['errors']:
                    print(f"    ERROR: {err}")
                for section, data in result['sections'].items():
                    for issue in data.get('issues', []):
                        print(f"    {section}: {issue}")

    def validate_json_directory(self, directory: Path, sample_size: Optional[int] = None) -> None:
        """Validate all JSON person entity files, updating self.stats.

        Args:
            directory: Directory containing person ``*.json`` files.
            sample_size: If set and smaller than the file count, validate a
                random sample of that size instead of every file.
        """
        json_files = list(directory.glob('*.json'))
        if sample_size and sample_size < len(json_files):
            import random
            json_files = random.sample(json_files, sample_size)
        print(f"\nValidating {len(json_files)} JSON files...")
        for i, filepath in enumerate(json_files):
            if (i + 1) % 1000 == 0:
                print(f"  Progress: {i + 1}/{len(json_files)}")
            self.stats['json']['total_files'] += 1
            result = self.validate_json_file(filepath)
            if result['has_web_claims']:
                self.stats['json']['files_with_web_claims'] += 1
            self.stats['json']['claims_checked'] += result['claims_checked']
            self.stats['json']['claims_with_provenance'] += result['claims_with_provenance']
            if result['errors']:
                self.stats['json']['errors'].extend(
                    [(str(filepath), e) for e in result['errors']]
                )

    def generate_report(self) -> str:
        """Generate a human-readable validation report from self.stats.

        Returns:
            The full report as a single newline-joined string.
        """
        lines = [
            "=" * 70,
            "PROVENANCE VALIDATION REPORT",
            f"Generated: {datetime.now().isoformat()}",
            "=" * 70,
            "",
            "## YAML Custodian Files",
            "-" * 40,
            f"Total files scanned: {self.stats['yaml']['total_files']:,}",
            f"Files with enrichment: {self.stats['yaml']['files_with_enrichment']:,}",
            f"Files with provenance: {self.stats['yaml']['files_with_provenance']:,}",
            "",
            f"Sections checked: {self.stats['yaml']['sections_checked']:,}",
            f"Sections with _provenance: {self.stats['yaml']['sections_with_provenance']:,}",
            f"Sections with content_hash: {self.stats['yaml']['sections_with_content_hash']:,}",
            f"Sections with wasDerivedFrom: {self.stats['yaml']['sections_with_derived_from']:,}",
            "",
            f"Valid content_hashes: {self.stats['yaml']['valid_content_hashes']:,}",
            f"Invalid content_hashes: {self.stats['yaml']['invalid_content_hashes']:,}",
            "",
        ]
        # Coverage rates (guard against division by zero on empty runs).
        if self.stats['yaml']['sections_checked'] > 0:
            prov_coverage = (self.stats['yaml']['sections_with_provenance'] /
                             self.stats['yaml']['sections_checked'] * 100)
            hash_coverage = (self.stats['yaml']['sections_with_content_hash'] /
                             self.stats['yaml']['sections_checked'] * 100)
            derived_coverage = (self.stats['yaml']['sections_with_derived_from'] /
                                self.stats['yaml']['sections_checked'] * 100)
            lines.extend([
                "### Coverage Rates",
                f"  _provenance coverage: {prov_coverage:.1f}%",
                f"  content_hash coverage: {hash_coverage:.1f}%",
                f"  wasDerivedFrom coverage: {derived_coverage:.1f}%",
                "",
            ])
        # Per-enrichment-section breakdown.
        lines.extend([
            "### By Enrichment Section",
            "-" * 40,
        ])
        for section_name, section_stats in sorted(self.stats['yaml']['by_section'].items()):
            if section_stats['total'] > 0:
                prov_pct = section_stats['with_provenance'] / section_stats['total'] * 100
                hash_pct = section_stats['with_content_hash'] / section_stats['total'] * 100
                derived_pct = section_stats['with_derived_from'] / section_stats['total'] * 100
                lines.extend([
                    f"\n{section_name}:",
                    f"  Total sections: {section_stats['total']:,}",
                    f"  With _provenance: {section_stats['with_provenance']:,} ({prov_pct:.1f}%)",
                    f"  With content_hash: {section_stats['with_content_hash']:,} ({hash_pct:.1f}%)",
                    f"  With wasDerivedFrom: {section_stats['with_derived_from']:,} ({derived_pct:.1f}%)",
                ])
        # JSON person entity summary.
        lines.extend([
            "",
            "",
            "## JSON Person Entity Files",
            "-" * 40,
            f"Total files scanned: {self.stats['json']['total_files']:,}",
            f"Files with web_claims: {self.stats['json']['files_with_web_claims']:,}",
            f"Claims checked: {self.stats['json']['claims_checked']:,}",
            f"Claims with provenance: {self.stats['json']['claims_with_provenance']:,}",
        ])
        if self.stats['json']['claims_checked'] > 0:
            claims_coverage = (self.stats['json']['claims_with_provenance'] /
                               self.stats['json']['claims_checked'] * 100)
            lines.append(f"Claims provenance coverage: {claims_coverage:.1f}%")
        # Error summary.
        yaml_errors = len(self.stats['yaml']['errors'])
        json_errors = len(self.stats['json']['errors'])
        lines.extend([
            "",
            "",
            "## Error Summary",
            "-" * 40,
            f"YAML parsing errors: {yaml_errors}",
            f"JSON parsing errors: {json_errors}",
        ])
        # Fix: the old `and yaml_errors <= 20` guard meant that when MORE
        # than 20 errors occurred, none were listed at all — the [:20]
        # slice shows the intent is "list the first 20".
        if yaml_errors > 0:
            lines.append("\nYAML Errors:")
            for filepath, error in self.stats['yaml']['errors'][:20]:
                lines.append(f"  {Path(filepath).name}: {error}")
        if json_errors > 0:
            lines.append("\nJSON Errors:")
            for filepath, error in self.stats['json']['errors'][:20]:
                lines.append(f"  {Path(filepath).name}: {error}")
        # Final status banner.
        lines.extend([
            "",
            "",
            "=" * 70,
            "VALIDATION STATUS",
            "=" * 70,
        ])
        # Collect hard failures and coverage shortfalls.
        issues = []
        if self.stats['yaml']['invalid_content_hashes'] > 0:
            issues.append(f"{self.stats['yaml']['invalid_content_hashes']} invalid content hashes")
        if yaml_errors > 0:
            issues.append(f"{yaml_errors} YAML parsing errors")
        if json_errors > 0:
            issues.append(f"{json_errors} JSON parsing errors")
        # Coverage threshold: _provenance must cover at least 95% of sections.
        if self.stats['yaml']['sections_checked'] > 0:
            prov_coverage = (self.stats['yaml']['sections_with_provenance'] /
                             self.stats['yaml']['sections_checked'] * 100)
            if prov_coverage < 95:
                issues.append(f"_provenance coverage below 95% ({prov_coverage:.1f}%)")
        if issues:
            lines.append("STATUS: ISSUES FOUND")  # was a placeholder-free f-string
            for issue in issues:
                lines.append(f"  - {issue}")
        else:
            lines.append("STATUS: PASSED")
            lines.append("  All provenance metadata validated successfully!")
        lines.append("")
        return "\n".join(lines)
def main():
    """CLI entry point: scan the dataset and print/save a validation report.

    Exits with status 1 when parse errors or invalid content hashes were
    found, 0 otherwise, so CI can gate on the result.
    """
    parser = argparse.ArgumentParser(
        description='Validate provenance metadata across GLAM dataset'
    )
    parser.add_argument(
        '--sample', '-s', type=int, default=None,
        help='Sample size for validation (default: all files)'
    )
    parser.add_argument(
        '--verbose', '-v', action='store_true',
        help='Show detailed validation issues'
    )
    parser.add_argument(
        '--yaml-only', action='store_true',
        help='Only validate YAML files'
    )
    parser.add_argument(
        '--json-only', action='store_true',
        help='Only validate JSON files'
    )
    parser.add_argument(
        '--output', '-o', type=str, default=None,
        help='Output report to file'
    )
    args = parser.parse_args()

    # Paths are resolved relative to the repository layout:
    # <repo>/scripts/validate_provenance.py -> <repo>/data/custodian
    script_dir = Path(__file__).parent
    base_dir = script_dir.parent
    custodian_dir = base_dir / 'data' / 'custodian'
    person_dir = custodian_dir / 'person' / 'entity'

    validator = ProvenanceValidator(verbose=args.verbose)

    # Validate YAML custodian files unless --json-only was given.
    if not args.json_only:
        if custodian_dir.exists():
            validator.validate_yaml_directory(custodian_dir, args.sample)
        else:
            print(f"WARNING: Custodian directory not found: {custodian_dir}")

    # Validate JSON person entity files unless --yaml-only was given.
    if not args.yaml_only:
        if person_dir.exists():
            validator.validate_json_directory(person_dir, args.sample)
        else:
            print(f"WARNING: Person entity directory not found: {person_dir}")

    report = validator.generate_report()
    print(report)

    if args.output:
        output_path = Path(args.output)
        # Fix: write UTF-8 explicitly — the platform default encoding can
        # raise UnicodeEncodeError when the report echoes non-ASCII file
        # names or content.
        output_path.write_text(report, encoding='utf-8')
        print(f"\nReport saved to: {output_path}")

    # Exit code mirrors the hard-failure conditions in the report.
    has_errors = (
        validator.stats['yaml']['invalid_content_hashes'] > 0 or
        len(validator.stats['yaml']['errors']) > 0 or
        len(validator.stats['json']['errors']) > 0
    )
    sys.exit(1 if has_errors else 0)


if __name__ == '__main__':
    main()