#!/usr/bin/env python3
"""
Comprehensive Provenance Validation Script

Validates provenance metadata across:
1. Custodian YAML files (~29K files)
2. Person entity JSON files (~8.9K files)

Checks:
- Presence of _provenance sub-keys in enrichment sections
- content_hash structure and format
- wasDerivedFrom URLs
- prov:Activity metadata
- verification status
- Root provenance summary consistency

Usage:
    python scripts/validate_provenance.py [--sample N] [--verbose]
"""

import argparse
import hashlib
import json
import re
import sys
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Optional

# Use ruamel.yaml for YAML processing
try:
    from ruamel.yaml import YAML  # type: ignore

    yaml = YAML()
    yaml.preserve_quotes = True
except ImportError:
    # BUGFIX: diagnostics belong on stderr so a piped report stays clean.
    print(
        "ERROR: ruamel.yaml not installed. Run: pip install ruamel.yaml",
        file=sys.stderr,
    )
    sys.exit(1)
class ProvenanceValidator:
    """Validates provenance metadata in GLAM custodian and person files.

    Aggregate counters accumulate in ``self.stats`` (top-level keys
    'yaml' and 'json'); the per-file ``validate_*_file`` methods also
    return a detailed result dict for each individual file.
    """

    # Enrichment sections whose _provenance sub-key is checked.
    ENRICHMENT_SECTIONS = [
        'wikidata_enrichment',
        'google_maps_enrichment',
        'web_enrichment',
        'youtube_enrichment',
        'zcbs_enrichment',
    ]

    # Expected wasDerivedFrom URL patterns, keyed by section name.
    DERIVED_FROM_PATTERNS = {
        'wikidata_enrichment': r'https://www\.wikidata\.org/wiki/Q\d+',
        'google_maps_enrichment': r'https://maps\.googleapis\.com/',
        'youtube_enrichment': r'https://(www\.)?(youtube\.com|youtu\.be)/',
        'web_enrichment': r'https?://',
        'zcbs_enrichment': r'https?://',
    }

    # Base64 alphabet (with optional '=' padding) for the hash payload.
    _BASE64_RE = re.compile(r'[A-Za-z0-9+/]+={0,2}$')

    def __init__(self, verbose: bool = False):
        self.verbose = verbose
        # Aggregate statistics, split by input format.
        self.stats = {
            'yaml': {
                'total_files': 0,
                'files_with_enrichment': 0,
                'files_with_provenance': 0,
                'sections_checked': 0,
                'sections_with_provenance': 0,
                'sections_with_content_hash': 0,
                'sections_with_derived_from': 0,
                'valid_content_hashes': 0,
                'invalid_content_hashes': 0,
                'errors': [],
                'warnings': [],
                'by_section': defaultdict(lambda: {
                    'total': 0,
                    'with_provenance': 0,
                    'with_content_hash': 0,
                    'with_derived_from': 0,
                }),
            },
            'json': {
                'total_files': 0,
                'files_with_web_claims': 0,
                'claims_checked': 0,
                'claims_with_provenance': 0,
                'errors': [],
                'warnings': [],
            },
        }

    def validate_content_hash(self, hash_data: dict) -> tuple[bool, str]:
        """Validate content_hash structure and format.

        Returns (is_valid, message); the message describes the first
        problem found, or "Valid".
        """
        # BUGFIX: a non-dict payload previously raised instead of being
        # reported as invalid.
        if not isinstance(hash_data, dict):
            return False, f"content_hash is not a dict: {type(hash_data).__name__}"

        required_fields = ['algorithm', 'value', 'scope', 'computed_at']
        for field in required_fields:
            if field not in hash_data:
                return False, f"Missing required field: {field}"

        # Validate algorithm
        if hash_data['algorithm'] != 'sha256':
            return False, f"Unexpected algorithm: {hash_data['algorithm']}"

        # Validate value format (sha256-BASE64)
        value = hash_data['value']
        # BUGFIX: a non-string value previously crashed on .startswith().
        if not isinstance(value, str) or not value.startswith('sha256-'):
            return False, f"Invalid hash prefix: {str(value)[:20]}..."

        # Validate base64 portion (should be 44 chars for SHA-256)
        base64_part = value[7:]  # Remove 'sha256-' prefix
        if len(base64_part) < 40:
            return False, f"Hash value too short: {len(base64_part)} chars"
        # NEW: reject payloads containing characters outside the base64
        # alphabet (previously only the length was checked).
        if not self._BASE64_RE.match(base64_part):
            return False, "Hash value contains non-base64 characters"

        # Validate computed_at is ISO format ('Z' suffix normalized for
        # pre-3.11 fromisoformat compatibility).
        try:
            datetime.fromisoformat(hash_data['computed_at'].replace('Z', '+00:00'))
        except (ValueError, AttributeError):
            return False, f"Invalid computed_at timestamp: {hash_data.get('computed_at')}"

        return True, "Valid"

    def validate_prov_section(self, prov_data: dict, section_name: str) -> tuple[bool, list[str]]:
        """Validate prov: section structure.

        Returns (is_valid, issues) where issues lists every problem
        found (empty when valid).
        """
        issues = []

        # wasDerivedFrom is required and must match the per-section URL
        # pattern (unknown sections fall back to a generic http(s) check).
        derived_from = prov_data.get('wasDerivedFrom')
        if not derived_from:
            issues.append("Missing wasDerivedFrom")
        else:
            pattern = self.DERIVED_FROM_PATTERNS.get(section_name, r'https?://')
            if not re.match(pattern, str(derived_from)):
                issues.append(f"wasDerivedFrom URL doesn't match expected pattern for {section_name}")

        # generatedAtTime is optional, but must be ISO-8601 when present.
        gen_time = prov_data.get('generatedAtTime')
        if gen_time:
            try:
                datetime.fromisoformat(str(gen_time).replace('Z', '+00:00'))
            except (ValueError, AttributeError):
                issues.append(f"Invalid generatedAtTime: {gen_time}")

        # wasGeneratedBy is optional, but must be a typed dict when present.
        generated_by = prov_data.get('wasGeneratedBy')
        if generated_by:
            if not isinstance(generated_by, dict):
                issues.append("wasGeneratedBy should be a dict")
            elif '@type' not in generated_by:
                issues.append("wasGeneratedBy missing @type")

        return len(issues) == 0, issues

    def validate_yaml_file(self, filepath: Path) -> dict:
        """Validate a single YAML custodian file.

        Updates aggregate stats as a side effect and returns a per-file
        result dict (parse errors are reported, never raised).
        """
        result = {
            'filepath': str(filepath),
            'has_enrichment': False,
            'has_provenance': False,
            'sections': {},
            'errors': [],
            'warnings': [],
        }

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.load(f)
        except Exception as e:
            result['errors'].append(f"Failed to parse YAML: {e}")
            return result

        if not isinstance(data, dict):
            result['errors'].append("Root element is not a dict")
            return result

        # Check each known enrichment section that is present.
        for section_name in self.ENRICHMENT_SECTIONS:
            if section_name not in data:
                continue

            result['has_enrichment'] = True
            section = data[section_name]

            if not isinstance(section, dict):
                result['warnings'].append(f"{section_name} is not a dict")
                continue

            section_result = {
                'exists': True,
                'has_provenance': False,
                'has_content_hash': False,
                'has_derived_from': False,
                'content_hash_valid': None,
                'prov_valid': None,
                'issues': [],
            }

            self.stats['yaml']['sections_checked'] += 1
            self.stats['yaml']['by_section'][section_name]['total'] += 1

            # Check for _provenance
            provenance = section.get('_provenance')
            if provenance and isinstance(provenance, dict):
                section_result['has_provenance'] = True
                result['has_provenance'] = True
                self.stats['yaml']['sections_with_provenance'] += 1
                self.stats['yaml']['by_section'][section_name]['with_provenance'] += 1

                # Validate content_hash
                content_hash = provenance.get('content_hash')
                if content_hash:
                    section_result['has_content_hash'] = True
                    self.stats['yaml']['sections_with_content_hash'] += 1
                    self.stats['yaml']['by_section'][section_name]['with_content_hash'] += 1

                    is_valid, msg = self.validate_content_hash(content_hash)
                    section_result['content_hash_valid'] = is_valid
                    if is_valid:
                        self.stats['yaml']['valid_content_hashes'] += 1
                    else:
                        self.stats['yaml']['invalid_content_hashes'] += 1
                        section_result['issues'].append(f"content_hash: {msg}")

                # Validate prov section
                prov = provenance.get('prov')
                # BUGFIX: a non-dict prov value (e.g. a YAML string)
                # previously crashed the whole run on prov.get(); record
                # it as an issue instead.
                if prov and not isinstance(prov, dict):
                    section_result['issues'].append("prov should be a dict")
                elif prov:
                    if prov.get('wasDerivedFrom'):
                        section_result['has_derived_from'] = True
                        self.stats['yaml']['sections_with_derived_from'] += 1
                        self.stats['yaml']['by_section'][section_name]['with_derived_from'] += 1

                    is_valid, issues = self.validate_prov_section(prov, section_name)
                    section_result['prov_valid'] = is_valid
                    section_result['issues'].extend(issues)

            result['sections'][section_name] = section_result

        return result

    def validate_json_file(self, filepath: Path) -> dict:
        """Validate a single JSON person entity file.

        A claim counts as provenanced when it carries both source_url
        and retrieved_on (retrieval_agent is informational only).
        """
        result = {
            'filepath': str(filepath),
            'has_web_claims': False,
            'claims_checked': 0,
            'claims_with_provenance': 0,
            'errors': [],
            'warnings': [],
        }

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except Exception as e:
            result['errors'].append(f"Failed to parse JSON: {e}")
            return result

        if not isinstance(data, dict):
            result['errors'].append("Root element is not a dict")
            return result

        # Files without web_claims are valid but uninteresting.
        web_claims = data.get('web_claims', [])
        if not web_claims:
            return result

        result['has_web_claims'] = True

        for i, claim in enumerate(web_claims):
            if not isinstance(claim, dict):
                continue

            result['claims_checked'] += 1

            # Check for provenance fields
            has_source_url = bool(claim.get('source_url'))
            has_retrieved_on = bool(claim.get('retrieved_on'))
            has_retrieval_agent = bool(claim.get('retrieval_agent'))

            if has_source_url and has_retrieved_on:
                result['claims_with_provenance'] += 1
            else:
                missing = []
                if not has_source_url:
                    missing.append('source_url')
                if not has_retrieved_on:
                    missing.append('retrieved_on')
                result['warnings'].append(f"Claim {i}: missing {', '.join(missing)}")

        return result

    def validate_yaml_directory(self, directory: Path, sample_size: Optional[int] = None) -> None:
        """Validate all YAML files in directory (optionally a random sample)."""
        yaml_files = list(directory.glob('*.yaml'))

        if sample_size and sample_size < len(yaml_files):
            import random
            yaml_files = random.sample(yaml_files, sample_size)

        print(f"\nValidating {len(yaml_files)} YAML files...")

        for i, filepath in enumerate(yaml_files):
            # Progress heartbeat for long runs.
            if (i + 1) % 1000 == 0:
                print(f"  Progress: {i + 1}/{len(yaml_files)}")

            self.stats['yaml']['total_files'] += 1
            result = self.validate_yaml_file(filepath)

            if result['has_enrichment']:
                self.stats['yaml']['files_with_enrichment'] += 1
            if result['has_provenance']:
                self.stats['yaml']['files_with_provenance'] += 1

            if result['errors']:
                self.stats['yaml']['errors'].extend(
                    [(str(filepath), e) for e in result['errors']]
                )

            if self.verbose and (result['errors'] or any(
                s.get('issues') for s in result['sections'].values()
            )):
                print(f"\n  Issues in {filepath.name}:")
                for err in result['errors']:
                    print(f"    ERROR: {err}")
                for section, data in result['sections'].items():
                    for issue in data.get('issues', []):
                        print(f"    {section}: {issue}")

    def validate_json_directory(self, directory: Path, sample_size: Optional[int] = None) -> None:
        """Validate all JSON person entity files (optionally a random sample)."""
        json_files = list(directory.glob('*.json'))

        if sample_size and sample_size < len(json_files):
            import random
            json_files = random.sample(json_files, sample_size)

        print(f"\nValidating {len(json_files)} JSON files...")

        for i, filepath in enumerate(json_files):
            # Progress heartbeat for long runs.
            if (i + 1) % 1000 == 0:
                print(f"  Progress: {i + 1}/{len(json_files)}")

            self.stats['json']['total_files'] += 1
            result = self.validate_json_file(filepath)

            if result['has_web_claims']:
                self.stats['json']['files_with_web_claims'] += 1

            self.stats['json']['claims_checked'] += result['claims_checked']
            self.stats['json']['claims_with_provenance'] += result['claims_with_provenance']

            if result['errors']:
                self.stats['json']['errors'].extend(
                    [(str(filepath), e) for e in result['errors']]
                )

    def generate_report(self) -> str:
        """Generate the human-readable validation report as a string."""
        lines = [
            "=" * 70,
            "PROVENANCE VALIDATION REPORT",
            f"Generated: {datetime.now().isoformat()}",
            "=" * 70,
            "",
            "## YAML Custodian Files",
            "-" * 40,
            f"Total files scanned: {self.stats['yaml']['total_files']:,}",
            f"Files with enrichment: {self.stats['yaml']['files_with_enrichment']:,}",
            f"Files with provenance: {self.stats['yaml']['files_with_provenance']:,}",
            "",
            f"Sections checked: {self.stats['yaml']['sections_checked']:,}",
            f"Sections with _provenance: {self.stats['yaml']['sections_with_provenance']:,}",
            f"Sections with content_hash: {self.stats['yaml']['sections_with_content_hash']:,}",
            f"Sections with wasDerivedFrom: {self.stats['yaml']['sections_with_derived_from']:,}",
            "",
            f"Valid content_hashes: {self.stats['yaml']['valid_content_hashes']:,}",
            f"Invalid content_hashes: {self.stats['yaml']['invalid_content_hashes']:,}",
            "",
        ]

        # Coverage calculation (guarded against division by zero).
        if self.stats['yaml']['sections_checked'] > 0:
            prov_coverage = (self.stats['yaml']['sections_with_provenance'] /
                             self.stats['yaml']['sections_checked'] * 100)
            hash_coverage = (self.stats['yaml']['sections_with_content_hash'] /
                             self.stats['yaml']['sections_checked'] * 100)
            derived_coverage = (self.stats['yaml']['sections_with_derived_from'] /
                                self.stats['yaml']['sections_checked'] * 100)

            lines.extend([
                "### Coverage Rates",
                f"  _provenance coverage: {prov_coverage:.1f}%",
                f"  content_hash coverage: {hash_coverage:.1f}%",
                f"  wasDerivedFrom coverage: {derived_coverage:.1f}%",
                "",
            ])

        # By section breakdown
        lines.extend([
            "### By Enrichment Section",
            "-" * 40,
        ])

        for section_name, section_stats in sorted(self.stats['yaml']['by_section'].items()):
            if section_stats['total'] > 0:
                prov_pct = section_stats['with_provenance'] / section_stats['total'] * 100
                hash_pct = section_stats['with_content_hash'] / section_stats['total'] * 100
                derived_pct = section_stats['with_derived_from'] / section_stats['total'] * 100

                lines.extend([
                    f"\n{section_name}:",
                    f"  Total sections: {section_stats['total']:,}",
                    f"  With _provenance: {section_stats['with_provenance']:,} ({prov_pct:.1f}%)",
                    f"  With content_hash: {section_stats['with_content_hash']:,} ({hash_pct:.1f}%)",
                    f"  With wasDerivedFrom: {section_stats['with_derived_from']:,} ({derived_pct:.1f}%)",
                ])

        # JSON section
        lines.extend([
            "",
            "",
            "## JSON Person Entity Files",
            "-" * 40,
            f"Total files scanned: {self.stats['json']['total_files']:,}",
            f"Files with web_claims: {self.stats['json']['files_with_web_claims']:,}",
            f"Claims checked: {self.stats['json']['claims_checked']:,}",
            f"Claims with provenance: {self.stats['json']['claims_with_provenance']:,}",
        ])

        if self.stats['json']['claims_checked'] > 0:
            claims_coverage = (self.stats['json']['claims_with_provenance'] /
                               self.stats['json']['claims_checked'] * 100)
            lines.append(f"Claims provenance coverage: {claims_coverage:.1f}%")

        # Errors summary
        yaml_errors = len(self.stats['yaml']['errors'])
        json_errors = len(self.stats['json']['errors'])

        lines.extend([
            "",
            "",
            "## Error Summary",
            "-" * 40,
            f"YAML parsing errors: {yaml_errors}",
            f"JSON parsing errors: {json_errors}",
        ])

        # Only list individual errors when there are few enough to read.
        if yaml_errors > 0 and yaml_errors <= 20:
            lines.append("\nYAML Errors:")
            for filepath, error in self.stats['yaml']['errors'][:20]:
                lines.append(f"  {Path(filepath).name}: {error}")

        if json_errors > 0 and json_errors <= 20:
            lines.append("\nJSON Errors:")
            for filepath, error in self.stats['json']['errors'][:20]:
                lines.append(f"  {Path(filepath).name}: {error}")

        # Final status
        lines.extend([
            "",
            "",
            "=" * 70,
            "VALIDATION STATUS",
            "=" * 70,
        ])

        # Determine overall status
        issues = []

        if self.stats['yaml']['invalid_content_hashes'] > 0:
            issues.append(f"{self.stats['yaml']['invalid_content_hashes']} invalid content hashes")

        if yaml_errors > 0:
            issues.append(f"{yaml_errors} YAML parsing errors")

        if json_errors > 0:
            issues.append(f"{json_errors} JSON parsing errors")

        # Check coverage thresholds
        if self.stats['yaml']['sections_checked'] > 0:
            prov_coverage = (self.stats['yaml']['sections_with_provenance'] /
                             self.stats['yaml']['sections_checked'] * 100)
            if prov_coverage < 95:
                issues.append(f"_provenance coverage below 95% ({prov_coverage:.1f}%)")

        if issues:
            # FIX: was a placeholder-free f-string.
            lines.append("STATUS: ISSUES FOUND")
            for issue in issues:
                lines.append(f"  - {issue}")
        else:
            lines.append("STATUS: PASSED")
            lines.append("  All provenance metadata validated successfully!")

        lines.append("")

        return "\n".join(lines)
def main():
    """CLI entry point: validate the dataset, print/save the report.

    Exits 1 when any parse error or invalid content hash was found,
    0 otherwise.
    """
    parser = argparse.ArgumentParser(
        description='Validate provenance metadata across GLAM dataset'
    )
    parser.add_argument(
        '--sample', '-s', type=int, default=None,
        help='Sample size for validation (default: all files)'
    )
    parser.add_argument(
        '--verbose', '-v', action='store_true',
        help='Show detailed validation issues'
    )
    parser.add_argument(
        '--yaml-only', action='store_true',
        help='Only validate YAML files'
    )
    parser.add_argument(
        '--json-only', action='store_true',
        help='Only validate JSON files'
    )
    parser.add_argument(
        '--output', '-o', type=str, default=None,
        help='Output report to file'
    )

    args = parser.parse_args()

    # Determine base path relative to this script's location
    # (assumes the script lives in <repo>/scripts/).
    script_dir = Path(__file__).parent
    base_dir = script_dir.parent

    custodian_dir = base_dir / 'data' / 'custodian'
    person_dir = custodian_dir / 'person' / 'entity'

    validator = ProvenanceValidator(verbose=args.verbose)

    # Validate YAML files
    if not args.json_only:
        if custodian_dir.exists():
            validator.validate_yaml_directory(custodian_dir, args.sample)
        else:
            print(f"WARNING: Custodian directory not found: {custodian_dir}")

    # Validate JSON files
    if not args.yaml_only:
        if person_dir.exists():
            validator.validate_json_directory(person_dir, args.sample)
        else:
            print(f"WARNING: Person entity directory not found: {person_dir}")

    # Generate report
    report = validator.generate_report()
    print(report)

    # Save report if requested
    if args.output:
        output_path = Path(args.output)
        # BUGFIX: write UTF-8 explicitly instead of the locale default.
        output_path.write_text(report, encoding='utf-8')
        print(f"\nReport saved to: {output_path}")

    # Return exit code based on status
    has_errors = (
        validator.stats['yaml']['invalid_content_hashes'] > 0 or
        len(validator.stats['yaml']['errors']) > 0 or
        len(validator.stats['json']['errors']) > 0
    )

    sys.exit(1 if has_errors else 0)


if __name__ == '__main__':
    main()
|