#!/usr/bin/env python3
"""
Comprehensive Provenance Validation Script

Validates provenance metadata across:
1. Custodian YAML files (~29K files)
2. Person entity JSON files (~8.9K files)

Checks:
- Presence of _provenance sub-keys in enrichment sections
- content_hash structure and format
- wasDerivedFrom URLs
- prov:Activity metadata
- verification status
- Root provenance summary consistency

Usage: python scripts/validate_provenance.py [--sample N] [--verbose]
"""

import argparse
import hashlib  # NOTE(review): currently unused — confirm before removing
import json
import random
import re
import sys
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Optional

# Use ruamel.yaml for YAML processing. Fail fast with an actionable message:
# every YAML check depends on it, so there is no point continuing without it.
try:
    from ruamel.yaml import YAML  # type: ignore

    yaml = YAML()
    yaml.preserve_quotes = True
except ImportError:
    print("ERROR: ruamel.yaml not installed. Run: pip install ruamel.yaml")
    sys.exit(1)


class ProvenanceValidator:
    """Validates provenance metadata in GLAM custodian and person files."""

    # Enrichment sections that may carry a `_provenance` sub-key.
    ENRICHMENT_SECTIONS = [
        'wikidata_enrichment',
        'google_maps_enrichment',
        'web_enrichment',
        'youtube_enrichment',
        'zcbs_enrichment',
    ]

    # Expected wasDerivedFrom URL patterns, keyed by enrichment section.
    # Matched with re.match (anchored at the start of the URL).
    DERIVED_FROM_PATTERNS = {
        'wikidata_enrichment': r'https://www\.wikidata\.org/wiki/Q\d+',
        'google_maps_enrichment': r'https://maps\.googleapis\.com/',
        'youtube_enrichment': r'https://(www\.)?(youtube\.com|youtu\.be)/',
        'web_enrichment': r'https?://',
        'zcbs_enrichment': r'https?://',
    }

    def __init__(self, verbose: bool = False):
        """Initialize counters for YAML and JSON validation passes.

        Args:
            verbose: When True, print per-file issues during YAML validation.
        """
        self.verbose = verbose
        # Aggregated statistics, split by input format. Mutated in place by
        # the validate_*_directory methods and rendered by generate_report.
        self.stats = {
            'yaml': {
                'total_files': 0,
                'files_with_enrichment': 0,
                'files_with_provenance': 0,
                'sections_checked': 0,
                'sections_with_provenance': 0,
                'sections_with_content_hash': 0,
                'sections_with_derived_from': 0,
                'valid_content_hashes': 0,
                'invalid_content_hashes': 0,
                'errors': [],
                'warnings': [],
                # Per-section breakdown; defaultdict so unseen sections need
                # no explicit initialization.
                'by_section': defaultdict(lambda: {
                    'total': 0,
                    'with_provenance': 0,
                    'with_content_hash': 0,
                    'with_derived_from': 0,
                }),
            },
            'json': {
                'total_files': 0,
                'files_with_web_claims': 0,
                'claims_checked': 0,
                'claims_with_provenance': 0,
                'errors': [],
                'warnings': [],
            },
        }

    def validate_content_hash(self, hash_data: dict) -> tuple[bool, str]:
        """Validate content_hash structure and format.

        Args:
            hash_data: The `content_hash` mapping from a `_provenance` block.

        Returns:
            (is_valid, message) — message describes the first failure, or
            "Valid" on success.
        """
        required_fields = ['algorithm', 'value', 'scope', 'computed_at']
        for field in required_fields:
            if field not in hash_data:
                return False, f"Missing required field: {field}"

        # Validate algorithm — only sha256 is expected in this dataset.
        if hash_data['algorithm'] != 'sha256':
            return False, f"Unexpected algorithm: {hash_data['algorithm']}"

        # Validate value format (sha256-BASE64)
        value = hash_data['value']
        if not value.startswith('sha256-'):
            return False, f"Invalid hash prefix: {value[:20]}..."

        # Validate base64 portion (should be 44 chars for SHA-256; allow a
        # little slack, but anything under 40 chars is definitely truncated).
        base64_part = value[7:]  # Remove 'sha256-' prefix
        if len(base64_part) < 40:
            return False, f"Hash value too short: {len(base64_part)} chars"

        # Validate computed_at is ISO format ('Z' suffix normalized so that
        # fromisoformat accepts it on Pythons that predate native 'Z' support).
        try:
            datetime.fromisoformat(hash_data['computed_at'].replace('Z', '+00:00'))
        except (ValueError, AttributeError):
            return False, f"Invalid computed_at timestamp: {hash_data.get('computed_at')}"

        return True, "Valid"

    def validate_prov_section(self, prov_data: dict,
                              section_name: str) -> tuple[bool, list[str]]:
        """Validate prov: section structure.

        Args:
            prov_data: The `prov` mapping from a `_provenance` block.
            section_name: Enrichment section name, used to select the
                expected wasDerivedFrom URL pattern.

        Returns:
            (is_valid, issues) — issues is empty when everything checks out.
        """
        issues = []

        # wasDerivedFrom is required; its URL must match the section pattern.
        derived_from = prov_data.get('wasDerivedFrom')
        if not derived_from:
            issues.append("Missing wasDerivedFrom")
        else:
            # Fall back to a generic URL pattern for unknown sections.
            pattern = self.DERIVED_FROM_PATTERNS.get(section_name, r'https?://')
            if not re.match(pattern, str(derived_from)):
                issues.append(f"wasDerivedFrom URL doesn't match expected pattern for {section_name}")

        # generatedAtTime is optional, but must be ISO 8601 when present.
        gen_time = prov_data.get('generatedAtTime')
        if gen_time:
            try:
                datetime.fromisoformat(str(gen_time).replace('Z', '+00:00'))
            except (ValueError, AttributeError):
                issues.append(f"Invalid generatedAtTime: {gen_time}")

        # wasGeneratedBy is optional, but when present must be a dict
        # carrying an @type (the prov:Activity marker).
        generated_by = prov_data.get('wasGeneratedBy')
        if generated_by:
            if not isinstance(generated_by, dict):
                issues.append("wasGeneratedBy should be a dict")
            elif '@type' not in generated_by:
                issues.append("wasGeneratedBy missing @type")

        return len(issues) == 0, issues

    def validate_yaml_file(self, filepath: Path) -> dict:
        """Validate a single YAML custodian file.

        Updates self.stats['yaml'] section counters as a side effect.

        Returns:
            A per-file result dict with 'sections', 'errors' and 'warnings'.
        """
        result = {
            'filepath': str(filepath),
            'has_enrichment': False,
            'has_provenance': False,
            'sections': {},
            'errors': [],
            'warnings': [],
        }

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.load(f)
        except Exception as e:
            result['errors'].append(f"Failed to parse YAML: {e}")
            return result

        if not isinstance(data, dict):
            result['errors'].append("Root element is not a dict")
            return result

        # Check each enrichment section present in the file.
        for section_name in self.ENRICHMENT_SECTIONS:
            if section_name not in data:
                continue

            result['has_enrichment'] = True
            section = data[section_name]

            if not isinstance(section, dict):
                result['warnings'].append(f"{section_name} is not a dict")
                continue

            section_result = {
                'exists': True,
                'has_provenance': False,
                'has_content_hash': False,
                'has_derived_from': False,
                'content_hash_valid': None,
                'prov_valid': None,
                'issues': [],
            }

            self.stats['yaml']['sections_checked'] += 1
            self.stats['yaml']['by_section'][section_name]['total'] += 1

            # A section only counts as covered when _provenance is a dict.
            provenance = section.get('_provenance')
            if provenance and isinstance(provenance, dict):
                section_result['has_provenance'] = True
                result['has_provenance'] = True
                self.stats['yaml']['sections_with_provenance'] += 1
                self.stats['yaml']['by_section'][section_name]['with_provenance'] += 1

                # Validate content_hash when present.
                content_hash = provenance.get('content_hash')
                if content_hash:
                    section_result['has_content_hash'] = True
                    self.stats['yaml']['sections_with_content_hash'] += 1
                    self.stats['yaml']['by_section'][section_name]['with_content_hash'] += 1

                    is_valid, msg = self.validate_content_hash(content_hash)
                    section_result['content_hash_valid'] = is_valid
                    if is_valid:
                        self.stats['yaml']['valid_content_hashes'] += 1
                    else:
                        self.stats['yaml']['invalid_content_hashes'] += 1
                        section_result['issues'].append(f"content_hash: {msg}")

                # Validate prov section when present.
                prov = provenance.get('prov')
                if prov:
                    if prov.get('wasDerivedFrom'):
                        section_result['has_derived_from'] = True
                        self.stats['yaml']['sections_with_derived_from'] += 1
                        self.stats['yaml']['by_section'][section_name]['with_derived_from'] += 1

                    is_valid, issues = self.validate_prov_section(prov, section_name)
                    section_result['prov_valid'] = is_valid
                    section_result['issues'].extend(issues)

            result['sections'][section_name] = section_result

        return result

    def validate_json_file(self, filepath: Path) -> dict:
        """Validate a single JSON person entity file.

        A claim counts as having provenance when it carries both source_url
        and retrieved_on (retrieval_agent is informational, not required).

        Returns:
            A per-file result dict with claim counts and warnings.
        """
        result = {
            'filepath': str(filepath),
            'has_web_claims': False,
            'claims_checked': 0,
            'claims_with_provenance': 0,
            'errors': [],
            'warnings': [],
        }

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except Exception as e:
            result['errors'].append(f"Failed to parse JSON: {e}")
            return result

        if not isinstance(data, dict):
            result['errors'].append("Root element is not a dict")
            return result

        # Check web_claims; files without any are counted but not inspected.
        web_claims = data.get('web_claims', [])
        if not web_claims:
            return result

        result['has_web_claims'] = True

        for i, claim in enumerate(web_claims):
            if not isinstance(claim, dict):
                continue

            result['claims_checked'] += 1

            # Check for the required provenance fields.
            has_source_url = bool(claim.get('source_url'))
            has_retrieved_on = bool(claim.get('retrieved_on'))

            if has_source_url and has_retrieved_on:
                result['claims_with_provenance'] += 1
            else:
                missing = []
                if not has_source_url:
                    missing.append('source_url')
                if not has_retrieved_on:
                    missing.append('retrieved_on')
                result['warnings'].append(f"Claim {i}: missing {', '.join(missing)}")

        return result

    def validate_yaml_directory(self, directory: Path,
                                sample_size: Optional[int] = None) -> None:
        """Validate all YAML files in directory (non-recursive).

        Args:
            directory: Directory containing *.yaml custodian files.
            sample_size: When set and smaller than the file count, validate a
                random sample of that size instead of every file.
        """
        yaml_files = list(directory.glob('*.yaml'))
        if sample_size and sample_size < len(yaml_files):
            yaml_files = random.sample(yaml_files, sample_size)

        print(f"\nValidating {len(yaml_files)} YAML files...")

        for i, filepath in enumerate(yaml_files):
            if (i + 1) % 1000 == 0:
                print(f" Progress: {i + 1}/{len(yaml_files)}")

            self.stats['yaml']['total_files'] += 1
            result = self.validate_yaml_file(filepath)

            if result['has_enrichment']:
                self.stats['yaml']['files_with_enrichment'] += 1
            if result['has_provenance']:
                self.stats['yaml']['files_with_provenance'] += 1

            if result['errors']:
                self.stats['yaml']['errors'].extend(
                    [(str(filepath), e) for e in result['errors']]
                )

            if self.verbose and (result['errors'] or any(
                s.get('issues') for s in result['sections'].values()
            )):
                print(f"\n Issues in {filepath.name}:")
                for err in result['errors']:
                    print(f" ERROR: {err}")
                for section, data in result['sections'].items():
                    for issue in data.get('issues', []):
                        print(f" {section}: {issue}")

    def validate_json_directory(self, directory: Path,
                                sample_size: Optional[int] = None) -> None:
        """Validate all JSON person entity files in directory (non-recursive).

        Args:
            directory: Directory containing *.json person entity files.
            sample_size: When set and smaller than the file count, validate a
                random sample of that size instead of every file.
        """
        json_files = list(directory.glob('*.json'))
        if sample_size and sample_size < len(json_files):
            json_files = random.sample(json_files, sample_size)

        print(f"\nValidating {len(json_files)} JSON files...")

        for i, filepath in enumerate(json_files):
            if (i + 1) % 1000 == 0:
                print(f" Progress: {i + 1}/{len(json_files)}")

            self.stats['json']['total_files'] += 1
            result = self.validate_json_file(filepath)

            if result['has_web_claims']:
                self.stats['json']['files_with_web_claims'] += 1

            self.stats['json']['claims_checked'] += result['claims_checked']
            self.stats['json']['claims_with_provenance'] += result['claims_with_provenance']

            if result['errors']:
                self.stats['json']['errors'].extend(
                    [(str(filepath), e) for e in result['errors']]
                )

    def generate_report(self) -> str:
        """Generate validation report.

        Returns:
            The full plain-text report as a single newline-joined string.
        """
        lines = [
            "=" * 70,
            "PROVENANCE VALIDATION REPORT",
            f"Generated: {datetime.now().isoformat()}",
            "=" * 70,
            "",
            "## YAML Custodian Files",
            "-" * 40,
            f"Total files scanned: {self.stats['yaml']['total_files']:,}",
            f"Files with enrichment: {self.stats['yaml']['files_with_enrichment']:,}",
            f"Files with provenance: {self.stats['yaml']['files_with_provenance']:,}",
            "",
            f"Sections checked: {self.stats['yaml']['sections_checked']:,}",
            f"Sections with _provenance: {self.stats['yaml']['sections_with_provenance']:,}",
            f"Sections with content_hash: {self.stats['yaml']['sections_with_content_hash']:,}",
            f"Sections with wasDerivedFrom: {self.stats['yaml']['sections_with_derived_from']:,}",
            "",
            f"Valid content_hashes: {self.stats['yaml']['valid_content_hashes']:,}",
            f"Invalid content_hashes: {self.stats['yaml']['invalid_content_hashes']:,}",
            "",
        ]

        # Coverage calculation (guarded against division by zero when no
        # sections were found at all).
        if self.stats['yaml']['sections_checked'] > 0:
            prov_coverage = (self.stats['yaml']['sections_with_provenance'] /
                             self.stats['yaml']['sections_checked'] * 100)
            hash_coverage = (self.stats['yaml']['sections_with_content_hash'] /
                             self.stats['yaml']['sections_checked'] * 100)
            derived_coverage = (self.stats['yaml']['sections_with_derived_from'] /
                                self.stats['yaml']['sections_checked'] * 100)
            lines.extend([
                "### Coverage Rates",
                f" _provenance coverage: {prov_coverage:.1f}%",
                f" content_hash coverage: {hash_coverage:.1f}%",
                f" wasDerivedFrom coverage: {derived_coverage:.1f}%",
                "",
            ])

        # By section breakdown
        lines.extend([
            "### By Enrichment Section",
            "-" * 40,
        ])
        for section_name, section_stats in sorted(self.stats['yaml']['by_section'].items()):
            if section_stats['total'] > 0:
                prov_pct = section_stats['with_provenance'] / section_stats['total'] * 100
                hash_pct = section_stats['with_content_hash'] / section_stats['total'] * 100
                derived_pct = section_stats['with_derived_from'] / section_stats['total'] * 100
                lines.extend([
                    f"\n{section_name}:",
                    f" Total sections: {section_stats['total']:,}",
                    f" With _provenance: {section_stats['with_provenance']:,} ({prov_pct:.1f}%)",
                    f" With content_hash: {section_stats['with_content_hash']:,} ({hash_pct:.1f}%)",
                    f" With wasDerivedFrom: {section_stats['with_derived_from']:,} ({derived_pct:.1f}%)",
                ])

        # JSON section
        lines.extend([
            "",
            "",
            "## JSON Person Entity Files",
            "-" * 40,
            f"Total files scanned: {self.stats['json']['total_files']:,}",
            f"Files with web_claims: {self.stats['json']['files_with_web_claims']:,}",
            f"Claims checked: {self.stats['json']['claims_checked']:,}",
            f"Claims with provenance: {self.stats['json']['claims_with_provenance']:,}",
        ])

        if self.stats['json']['claims_checked'] > 0:
            claims_coverage = (self.stats['json']['claims_with_provenance'] /
                               self.stats['json']['claims_checked'] * 100)
            lines.append(f"Claims provenance coverage: {claims_coverage:.1f}%")

        # Errors summary
        yaml_errors = len(self.stats['yaml']['errors'])
        json_errors = len(self.stats['json']['errors'])

        lines.extend([
            "",
            "",
            "## Error Summary",
            "-" * 40,
            f"YAML parsing errors: {yaml_errors}",
            f"JSON parsing errors: {json_errors}",
        ])

        # FIX: was gated on `errors <= 20`, which hid the error list exactly
        # when there were many errors; now always list the first 20.
        if yaml_errors > 0:
            lines.append("\nYAML Errors:")
            for filepath, error in self.stats['yaml']['errors'][:20]:
                lines.append(f" {Path(filepath).name}: {error}")

        if json_errors > 0:
            lines.append("\nJSON Errors:")
            for filepath, error in self.stats['json']['errors'][:20]:
                lines.append(f" {Path(filepath).name}: {error}")

        # Final status
        lines.extend([
            "",
            "",
            "=" * 70,
            "VALIDATION STATUS",
            "=" * 70,
        ])

        # Determine overall status
        issues = []
        if self.stats['yaml']['invalid_content_hashes'] > 0:
            issues.append(f"{self.stats['yaml']['invalid_content_hashes']} invalid content hashes")
        if yaml_errors > 0:
            issues.append(f"{yaml_errors} YAML parsing errors")
        if json_errors > 0:
            issues.append(f"{json_errors} JSON parsing errors")

        # Check coverage thresholds
        if self.stats['yaml']['sections_checked'] > 0:
            prov_coverage = (self.stats['yaml']['sections_with_provenance'] /
                             self.stats['yaml']['sections_checked'] * 100)
            if prov_coverage < 95:
                issues.append(f"_provenance coverage below 95% ({prov_coverage:.1f}%)")

        if issues:
            lines.append("STATUS: ISSUES FOUND")
            for issue in issues:
                lines.append(f" - {issue}")
        else:
            lines.append("STATUS: PASSED")
            lines.append(" All provenance metadata validated successfully!")

        lines.append("")
        return "\n".join(lines)


def main():
    """CLI entry point: parse args, run validators, print/save the report.

    Exits 1 when any invalid content hash or parse error was found, else 0.
    """
    parser = argparse.ArgumentParser(
        description='Validate provenance metadata across GLAM dataset'
    )
    parser.add_argument(
        '--sample', '-s', type=int, default=None,
        help='Sample size for validation (default: all files)'
    )
    parser.add_argument(
        '--verbose', '-v', action='store_true',
        help='Show detailed validation issues'
    )
    parser.add_argument(
        '--yaml-only', action='store_true',
        help='Only validate YAML files'
    )
    parser.add_argument(
        '--json-only', action='store_true',
        help='Only validate JSON files'
    )
    parser.add_argument(
        '--output', '-o', type=str, default=None,
        help='Output report to file'
    )
    args = parser.parse_args()

    # Determine base path relative to this script's location.
    script_dir = Path(__file__).parent
    base_dir = script_dir.parent
    custodian_dir = base_dir / 'data' / 'custodian'
    person_dir = custodian_dir / 'person' / 'entity'

    validator = ProvenanceValidator(verbose=args.verbose)

    # Validate YAML files
    if not args.json_only:
        if custodian_dir.exists():
            validator.validate_yaml_directory(custodian_dir, args.sample)
        else:
            print(f"WARNING: Custodian directory not found: {custodian_dir}")

    # Validate JSON files
    if not args.yaml_only:
        if person_dir.exists():
            validator.validate_json_directory(person_dir, args.sample)
        else:
            print(f"WARNING: Person entity directory not found: {person_dir}")

    # Generate report
    report = validator.generate_report()
    print(report)

    # Save report if requested
    if args.output:
        output_path = Path(args.output)
        output_path.write_text(report)
        print(f"\nReport saved to: {output_path}")

    # Return exit code based on status
    has_errors = (
        validator.stats['yaml']['invalid_content_hashes'] > 0
        or len(validator.stats['yaml']['errors']) > 0
        or len(validator.stats['json']['errors']) > 0
    )
    sys.exit(1 if has_errors else 0)


if __name__ == '__main__':
    main()