#!/usr/bin/env python3
"""
Custom LinkML instance validator for heritage custodian data.

Uses YAML parsing and schema field checking instead of linkml-validate.
"""

import sys
from pathlib import Path
from collections import defaultdict
from datetime import datetime
from typing import List, Dict, Any, Set

# PyYAML is only needed when parsing files; defer the failure so that
# record-level validation stays usable (and testable) without it.
try:
    import yaml
except ImportError:
    yaml = None

# Schema field definitions from schemas/core.yaml, schemas/enums.yaml,
# schemas/provenance.yaml
REQUIRED_FIELDS = {
    'HeritageCustodian': ['id', 'name', 'institution_type', 'provenance'],
    'Provenance': ['data_source', 'data_tier', 'extraction_date',
                   'extraction_method', 'confidence_score'],
    'Location': ['country'],
    'Identifier': ['identifier_scheme', 'identifier_value'],
    'DigitalPlatform': ['platform_name'],
    'Collection': ['collection_name'],
    'ChangeEvent': ['change_type', 'event_date', 'event_description'],
}

OPTIONAL_FIELDS = {
    'HeritageCustodian': ['alternative_names', 'description', 'locations',
                          'identifiers', 'digital_platforms', 'collections',
                          'change_history', 'related_organizations'],
    'Provenance': ['conversation_id', 'source_url', 'verified_date',
                   'verified_by', 'notes'],
    'Location': ['city', 'street_address', 'postal_code', 'region',
                 'latitude', 'longitude', 'geonames_id', 'osm_id'],
    'Identifier': ['identifier_url'],
    'DigitalPlatform': ['platform_url', 'platform_type', 'metadata_standards'],
    'Collection': ['collection_type', 'subject_areas', 'temporal_coverage',
                   'extent', 'access_rights'],
    'ChangeEvent': ['event_id', 'affected_organization', 'resulting_organization',
                    'related_organizations', 'source_documentation'],
}

VALID_ENUMS = {
    'institution_type': [
        'GALLERY', 'LIBRARY', 'ARCHIVE', 'MUSEUM', 'OFFICIAL_INSTITUTION',
        'RESEARCH_CENTER', 'CORPORATION', 'UNKNOWN', 'BOTANICAL_ZOO',
        'EDUCATION_PROVIDER', 'PERSONAL_COLLECTION', 'COLLECTING_SOCIETY',
        'MIXED'
    ],
    'data_source': [
        'CSV_REGISTRY', 'CONVERSATION_NLP', 'INSTITUTIONAL_WEBSITE',
        'WIKIDATA', 'OPENSTREETMAP', 'API', 'MANUAL_ENTRY'
    ],
    'data_tier': [
        'TIER_1_AUTHORITATIVE', 'TIER_2_VERIFIED',
        'TIER_3_CROWD_SOURCED', 'TIER_4_INFERRED'
    ],
    'change_type': [
        'FOUNDING', 'CLOSURE', 'MERGER', 'SPLIT', 'ACQUISITION',
        'RELOCATION', 'NAME_CHANGE', 'TYPE_CHANGE', 'STATUS_CHANGE',
        'RESTRUCTURING', 'LEGAL_CHANGE'
    ],
    'platform_type': [
        'COLLECTION_MANAGEMENT', 'DISCOVERY_PORTAL', 'DIGITAL_REPOSITORY',
        'SPARQL_ENDPOINT', 'API', 'LEARNING_MANAGEMENT', 'OTHER'
    ],
}


class ValidationReport:
    """Accumulates errors, warnings and field-coverage statistics for one file."""

    def __init__(self, filename: str):
        self.filename = filename
        self.errors: List[Dict[str, Any]] = []
        self.warnings: List[Dict[str, Any]] = []
        self.record_count = 0
        self.valid_count = 0
        self.field_stats = defaultdict(int)  # field name -> number of records populating it

    def add_error(self, record_id: str, field: str, message: str):
        """Record a schema violation for one record/field."""
        self.errors.append({'record_id': record_id, 'field': field, 'message': message})

    def add_warning(self, record_id: str, field: str, message: str):
        """Record a non-fatal data-quality issue for one record/field."""
        self.warnings.append({'record_id': record_id, 'field': field, 'message': message})

    def track_field(self, field: str):
        """Count one record that populates `field` (for coverage reporting)."""
        self.field_stats[field] += 1

    def print_report(self):
        """Print a human-readable report; return True when the file had no errors."""
        print(f"\n{'='*80}")
        print(f"VALIDATION REPORT: {Path(self.filename).name}")
        print(f"{'='*80}\n")
        print(f"šŸ“Š **Summary**")
        print(f"   Total records: {self.record_count}")
        print(f"   Valid records: {self.valid_count}")
        print(f"   Records with errors: {len(set(e['record_id'] for e in self.errors))}")
        print(f"   Total errors: {len(self.errors)}")
        print(f"   Total warnings: {len(self.warnings)}")
        if self.errors:
            print(f"\nāŒ **ERRORS** ({len(self.errors)})")
            for i, error in enumerate(self.errors[:20], 1):  # Show first 20
                print(f"   {i}. [{error['record_id']}] {error['field']}: {error['message']}")
            if len(self.errors) > 20:
                print(f"   ... and {len(self.errors) - 20} more errors")
        else:
            print(f"\nāœ… **No errors found!**")
        if self.warnings:
            print(f"\nāš ļø **WARNINGS** ({len(self.warnings)})")
            for i, warning in enumerate(self.warnings[:10], 1):  # Show first 10
                print(f"   {i}. [{warning['record_id']}] {warning['field']}: {warning['message']}")
            if len(self.warnings) > 10:
                print(f"   ... and {len(self.warnings) - 10} more warnings")
        print(f"\nšŸ“ˆ **Field Coverage**")
        total = self.record_count
        for field, count in sorted(self.field_stats.items(), key=lambda x: x[1], reverse=True)[:15]:
            pct = (count / total * 100) if total > 0 else 0
            print(f"   {field:30s}: {count:4d} ({pct:5.1f}%)")
        print(f"\n{'='*80}\n")
        return len(self.errors) == 0


def _validate_provenance(record_id: str, prov: Any, report: ValidationReport) -> bool:
    """Validate one Provenance mapping: required fields, enums, confidence range."""
    # Guard: a null/scalar provenance used to raise TypeError on `field not in prov`.
    if not isinstance(prov, dict):
        report.add_error(record_id, 'provenance', "Provenance must be a mapping")
        return False
    ok = True
    for field in REQUIRED_FIELDS['Provenance']:
        if field not in prov or prov[field] is None:
            report.add_error(record_id, f'provenance.{field}',
                             f"Required provenance field '{field}' is missing")
            ok = False
    # Enum checks; None values were already reported as missing above.
    source = prov.get('data_source')
    if source is not None and source not in VALID_ENUMS['data_source']:
        report.add_error(record_id, 'provenance.data_source',
                         f"Invalid value '{source}'. Must be one of: "
                         f"{', '.join(VALID_ENUMS['data_source'])}")
        ok = False
    tier = prov.get('data_tier')
    if tier is not None and tier not in VALID_ENUMS['data_tier']:
        report.add_error(record_id, 'provenance.data_tier',
                         f"Invalid value '{tier}'. Must be one of: "
                         f"{', '.join(VALID_ENUMS['data_tier'])}")
        ok = False
    # Range check; non-numeric values used to raise TypeError on comparison.
    score = prov.get('confidence_score')
    if score is not None:
        if not isinstance(score, (int, float)) or not (0.0 <= score <= 1.0):
            report.add_error(record_id, 'provenance.confidence_score',
                             f"Confidence score {score} must be between 0.0 and 1.0")
            ok = False
    return ok


def _validate_locations(record_id: str, locations: List[Any], report: ValidationReport) -> bool:
    """Validate each Location: country is required; missing city is a warning."""
    ok = True
    for i, loc in enumerate(locations):
        if not isinstance(loc, dict):
            report.add_error(record_id, f'locations[{i}]', "Location must be a mapping")
            ok = False
            continue
        if 'country' not in loc:
            report.add_error(record_id, f'locations[{i}].country',
                             "Country is required in Location")
            ok = False
        if not loc.get('city'):
            report.add_warning(record_id, f'locations[{i}].city', "City name not provided")
    return ok


def _validate_identifiers(record_id: str, identifiers: List[Any], report: ValidationReport) -> bool:
    """Validate each Identifier's required fields."""
    ok = True
    for i, ident in enumerate(identifiers):
        if not isinstance(ident, dict):
            report.add_error(record_id, f'identifiers[{i}]', "Identifier must be a mapping")
            ok = False
            continue
        for field in REQUIRED_FIELDS['Identifier']:
            if field not in ident:
                report.add_error(record_id, f'identifiers[{i}].{field}',
                                 f"Required identifier field '{field}' is missing")
                ok = False
    return ok


def _validate_change_history(record_id: str, events: List[Any], report: ValidationReport) -> bool:
    """Validate each ChangeEvent: required fields and the change_type enum."""
    ok = True
    for i, event in enumerate(events):
        if not isinstance(event, dict):
            report.add_error(record_id, f'change_history[{i}]', "Change event must be a mapping")
            ok = False
            continue
        for field in REQUIRED_FIELDS['ChangeEvent']:
            if field not in event:
                report.add_error(record_id, f'change_history[{i}].{field}',
                                 f"Required change event field '{field}' is missing")
                ok = False
        ctype = event.get('change_type')
        if ctype is not None and ctype not in VALID_ENUMS['change_type']:
            report.add_error(record_id, f'change_history[{i}].change_type',
                             f"Invalid change_type '{ctype}'. Must be one of: "
                             f"{', '.join(VALID_ENUMS['change_type'])}")
            ok = False
    return ok


def validate_record(record: Dict[str, Any], report: ValidationReport) -> bool:
    """Validate a single HeritageCustodian record.

    Returns True when the record has no schema errors; warnings do not
    affect validity. All findings are accumulated on `report`.
    """
    record_id = record.get('id', 'UNKNOWN')
    is_valid = True

    # Required top-level fields (also feed field-coverage stats).
    for field in REQUIRED_FIELDS['HeritageCustodian']:
        if field not in record or record[field] is None:
            report.add_error(record_id, field, f"Required field '{field}' is missing")
            is_valid = False
        else:
            report.track_field(field)

    # Enum check; a None value was already reported as missing above.
    inst_type = record.get('institution_type')
    if inst_type is not None and inst_type not in VALID_ENUMS['institution_type']:
        report.add_error(record_id, 'institution_type',
                         f"Invalid value '{inst_type}'. Must be one of: "
                         f"{', '.join(VALID_ENUMS['institution_type'])}")
        is_valid = False

    # Track populated optional fields for coverage reporting.
    for field in OPTIONAL_FIELDS['HeritageCustodian']:
        if record.get(field):
            report.track_field(field)

    # Nested structures. A missing/None provenance is already an error above;
    # validating a None value used to raise TypeError.
    prov = record.get('provenance')
    if prov is not None:
        if not _validate_provenance(record_id, prov, report):
            is_valid = False

    locations = record.get('locations')
    if locations:
        if not _validate_locations(record_id, locations, report):
            is_valid = False
    else:
        report.add_warning(record_id, 'locations', "No location information provided")

    identifiers = record.get('identifiers')
    if identifiers:
        if not _validate_identifiers(record_id, identifiers, report):
            is_valid = False

    change_history = record.get('change_history')
    if change_history:
        if not _validate_change_history(record_id, change_history, report):
            is_valid = False

    # Warn if no description
    if not record.get('description'):
        report.add_warning(record_id, 'description', "No description provided")

    return is_valid


def validate_file(filepath: Path) -> ValidationReport:
    """Validate a YAML file containing heritage custodian instances."""
    report = ValidationReport(str(filepath))
    if yaml is None:
        report.add_error('FILE', 'parsing', "Error loading YAML file: PyYAML is not installed")
        return report
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        if not isinstance(data, list):
            report.add_error('FILE', 'structure',
                             "File must contain a list of records at top level")
            return report
        report.record_count = len(data)
        for i, record in enumerate(data):
            if not isinstance(record, dict):
                # A scalar/list entry would crash validate_record; report it instead.
                report.add_error(f'RECORD_{i}', 'structure', "Record must be a mapping")
                continue
            if validate_record(record, report):
                report.valid_count += 1
    except Exception as e:
        report.add_error('FILE', 'parsing', f"Error loading YAML file: {e}")
    return report


def main():
    """Validate all curated instance files; return a process exit code (0 = ok)."""
    files = [
        'data/instances/chilean_institutions_curated.yaml',
        'data/instances/mexican_institutions_curated.yaml',
        'data/instances/brazilian_institutions_geocoded_v3.yaml',
    ]
    all_valid = True
    for filepath in files:
        path = Path(filepath)
        if not path.exists():
            print(f"āš ļø File not found: {filepath}")
            continue
        report = validate_file(path)
        is_valid = report.print_report()
        all_valid = all_valid and is_valid

    # Overall summary
    print(f"\n{'='*80}")
    print(f"OVERALL VALIDATION RESULT")
    print(f"{'='*80}")
    if all_valid:
        print("āœ… All files validated successfully!")
    else:
        print("āŒ Some files have validation errors - see reports above")
    print(f"{'='*80}\n")
    return 0 if all_valid else 1


if __name__ == '__main__':
    sys.exit(main())