270 lines
11 KiB
Python
270 lines
11 KiB
Python
#!/usr/bin/env python3
"""
Custom LinkML instance validator for heritage custodian data.

Uses YAML parsing and schema field checking instead of linkml-validate.
"""
|
|
|
|
import yaml
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from typing import List, Dict, Any, Set
|
|
|
|
# Schema field definitions from schemas/core.yaml, schemas/enums.yaml, schemas/provenance.yaml
|
|
# Mandatory field names, keyed by schema class name.
REQUIRED_FIELDS = {
    'HeritageCustodian': [
        'id',
        'name',
        'institution_type',
        'provenance',
    ],
    'Provenance': [
        'data_source',
        'data_tier',
        'extraction_date',
        'extraction_method',
        'confidence_score',
    ],
    'Location': [
        'country',
    ],
    'Identifier': [
        'identifier_scheme',
        'identifier_value',
    ],
    'DigitalPlatform': [
        'platform_name',
    ],
    'Collection': [
        'collection_name',
    ],
    'ChangeEvent': [
        'change_type',
        'event_date',
        'event_description',
    ],
}
|
|
|
|
# Non-mandatory field names, keyed by schema class name.
OPTIONAL_FIELDS = {
    'HeritageCustodian': [
        'alternative_names',
        'description',
        'locations',
        'identifiers',
        'digital_platforms',
        'collections',
        'change_history',
        'related_organizations',
    ],
    'Provenance': [
        'conversation_id',
        'source_url',
        'verified_date',
        'verified_by',
        'notes',
    ],
    'Location': [
        'city',
        'street_address',
        'postal_code',
        'region',
        'latitude',
        'longitude',
        'geonames_id',
        'osm_id',
    ],
    'Identifier': [
        'identifier_url',
    ],
    'DigitalPlatform': [
        'platform_url',
        'platform_type',
        'metadata_standards',
    ],
    'Collection': [
        'collection_type',
        'subject_areas',
        'temporal_coverage',
        'extent',
        'access_rights',
    ],
    'ChangeEvent': [
        'event_id',
        'affected_organization',
        'resulting_organization',
        'related_organizations',
        'source_documentation',
    ],
}
|
|
|
|
# Permitted values for each enumerated field, keyed by field name.
# List order matters: the lists are joined verbatim into error messages.
VALID_ENUMS = {
    'institution_type': [
        'GALLERY',
        'LIBRARY',
        'ARCHIVE',
        'MUSEUM',
        'OFFICIAL_INSTITUTION',
        'RESEARCH_CENTER',
        'CORPORATION',
        'UNKNOWN',
        'BOTANICAL_ZOO',
        'EDUCATION_PROVIDER',
        'PERSONAL_COLLECTION',
        'COLLECTING_SOCIETY',
        'MIXED',
    ],
    'data_source': [
        'CSV_REGISTRY',
        'CONVERSATION_NLP',
        'INSTITUTIONAL_WEBSITE',
        'WIKIDATA',
        'OPENSTREETMAP',
        'API',
        'MANUAL_ENTRY',
    ],
    'data_tier': [
        'TIER_1_AUTHORITATIVE',
        'TIER_2_VERIFIED',
        'TIER_3_CROWD_SOURCED',
        'TIER_4_INFERRED',
    ],
    'change_type': [
        'FOUNDING',
        'CLOSURE',
        'MERGER',
        'SPLIT',
        'ACQUISITION',
        'RELOCATION',
        'NAME_CHANGE',
        'TYPE_CHANGE',
        'STATUS_CHANGE',
        'RESTRUCTURING',
        'LEGAL_CHANGE',
    ],
    'platform_type': [
        'COLLECTION_MANAGEMENT',
        'DISCOVERY_PORTAL',
        'DIGITAL_REPOSITORY',
        'SPARQL_ENDPOINT',
        'API',
        'LEARNING_MANAGEMENT',
        'OTHER',
    ],
}
|
|
|
|
|
|
class ValidationReport:
    """Collects the findings produced while validating one instance file.

    Attributes:
        filename: Path of the file being validated, as given by the caller.
        errors: One dict per error with keys ``record_id``, ``field``, ``message``.
        warnings: Same shape as ``errors``, for non-fatal findings.
        record_count: Number of records in the file (assigned by the caller).
        valid_count: Number of records without errors (assigned by the caller).
        field_stats: Per-field counter, incremented through ``track_field``.
    """

    def __init__(self, filename: str):
        self.filename = filename
        self.errors: List[Dict[str, Any]] = []
        self.warnings: List[Dict[str, Any]] = []
        self.record_count = 0
        self.valid_count = 0
        self.field_stats = defaultdict(int)

    def add_error(self, record_id: str, field: str, message: str):
        """Append an error entry for ``field`` of record ``record_id``."""
        entry = dict(record_id=record_id, field=field, message=message)
        self.errors.append(entry)

    def add_warning(self, record_id: str, field: str, message: str):
        """Append a warning entry for ``field`` of record ``record_id``."""
        entry = dict(record_id=record_id, field=field, message=message)
        self.warnings.append(entry)

    def track_field(self, field: str):
        """Count one more occurrence of ``field`` for the coverage table."""
        self.field_stats[field] = self.field_stats[field] + 1

    @staticmethod
    def _print_findings(findings, limit, noun):
        """Print the first ``limit`` findings, then a line counting the rest."""
        for position, finding in enumerate(findings[:limit], 1):
            print(f" {position}. [{finding['record_id']}] {finding['field']}: {finding['message']}")
        hidden = len(findings) - limit
        if hidden > 0:
            print(f" ... and {hidden} more {noun}")

    def print_report(self):
        """Print the report to stdout.

        The output has a summary, up to 20 errors, up to 10 warnings and the
        15 most frequently tracked fields with their share of ``record_count``.

        Returns:
            True when no errors were recorded, False otherwise.
        """
        banner = '=' * 80

        print(f"\n{banner}")
        print(f"VALIDATION REPORT: {Path(self.filename).name}")
        print(f"{banner}\n")

        print("📊 **Summary**")
        print(f" Total records: {self.record_count}")
        print(f" Valid records: {self.valid_count}")
        # Distinct record ids that appear in at least one error entry.
        print(f" Records with errors: {len({entry['record_id'] for entry in self.errors})}")
        print(f" Total errors: {len(self.errors)}")
        print(f" Total warnings: {len(self.warnings)}")

        if not self.errors:
            print("\n✅ **No errors found!**")
        else:
            print(f"\n❌ **ERRORS** ({len(self.errors)})")
            self._print_findings(self.errors, 20, "errors")

        if self.warnings:
            print(f"\n⚠️ **WARNINGS** ({len(self.warnings)})")
            self._print_findings(self.warnings, 10, "warnings")

        print("\n📈 **Field Coverage**")
        total = self.record_count
        by_frequency = sorted(self.field_stats.items(), key=lambda pair: pair[1], reverse=True)
        for name, hits in by_frequency[:15]:
            if total > 0:
                share = hits / total * 100
            else:
                share = 0
            print(f" {name:30s}: {hits:4d} ({share:5.1f}%)")

        print(f"\n{banner}\n")

        return not self.errors
|
|
|
|
|
|
def _require_fields(item, fields, record_id, path, label, report):
    """Report every name in ``fields`` that is absent or null in ``item``.

    Returns True when all required fields are present and non-null.
    """
    complete = True
    for field in fields:
        if item.get(field) is None:
            report.add_error(record_id, f'{path}.{field}',
                             f"Required {label} field '{field}' is missing")
            complete = False
    return complete


def _enum_ok(value, enum_name, record_id, path, report, noun='value'):
    """Return True if ``value`` is allowed for ``enum_name``; otherwise report it."""
    allowed = VALID_ENUMS[enum_name]
    if value in allowed:
        return True
    report.add_error(record_id, path,
                     f"Invalid {noun} '{value}'. Must be one of: {', '.join(allowed)}")
    return False


def _mapping_entries(container, path, record_id, report):
    """Return ``(entries, ok)`` for a field expected to hold a list of mappings.

    ``entries`` is a list of ``(index, mapping)`` pairs for the usable items.
    ``ok`` is False when the container is not a list or an item is not a
    mapping; each such problem is reported as an error.
    """
    if not isinstance(container, list):
        report.add_error(record_id, path,
                         f"Expected a list, got {type(container).__name__}")
        return [], False
    entries = []
    ok = True
    for index, entry in enumerate(container):
        if isinstance(entry, dict):
            entries.append((index, entry))
        else:
            report.add_error(record_id, f'{path}[{index}]',
                             f"Expected a mapping, got {type(entry).__name__}")
            ok = False
    return entries, ok


def _validate_provenance(prov, record_id, report):
    """Check required fields, enums and the confidence score of a provenance block."""
    if not isinstance(prov, dict):
        report.add_error(record_id, 'provenance',
                         f"Provenance must be a mapping, got {type(prov).__name__}")
        return False

    ok = _require_fields(prov, REQUIRED_FIELDS['Provenance'], record_id,
                         'provenance', 'provenance', report)

    # A null enum value was already reported as missing above; don't report it twice.
    for enum_field in ('data_source', 'data_tier'):
        value = prov.get(enum_field)
        if value is not None and not _enum_ok(value, enum_field, record_id,
                                              f'provenance.{enum_field}', report):
            ok = False

    score = prov.get('confidence_score')
    if score is not None:
        # bool is a subclass of int, so exclude it explicitly: `true` is not a score.
        if isinstance(score, bool) or not isinstance(score, (int, float)):
            report.add_error(record_id, 'provenance.confidence_score',
                             f"Confidence score {score!r} must be a number between 0.0 and 1.0")
            ok = False
        elif not (0.0 <= score <= 1.0):
            report.add_error(record_id, 'provenance.confidence_score',
                             f"Confidence score {score} must be between 0.0 and 1.0")
            ok = False
    return ok


def validate_record(record: Dict[str, Any], report: ValidationReport) -> bool:
    """Validate a single HeritageCustodian record.

    Checks required top-level fields, the ``institution_type`` enum, the
    provenance block, and the ``locations``, ``identifiers`` and
    ``change_history`` lists. Findings are added to ``report``; populated
    fields are counted through ``report.track_field``.

    Malformed input (a non-mapping record, provenance or list entry, a
    non-list container, a non-numeric confidence score) is reported as an
    error instead of raising. Null values of required fields count as missing
    at every nesting level.

    NOTE(review): ``digital_platforms`` and ``collections`` entries, and the
    ``platform_type`` enum, are declared in the module tables but not checked
    here. Confirm whether that is intentional.

    Args:
        record: One parsed record from the instance file.
        report: Report that receives errors, warnings and field statistics.

    Returns:
        True if no error was recorded for this record, False otherwise.
    """
    if not isinstance(record, dict):
        report.add_error('UNKNOWN', 'record',
                         f"Record must be a mapping, got {type(record).__name__}")
        return False

    # Coerce to str so the id always formats and hashes cleanly in the report.
    raw_id = record.get('id')
    record_id = 'UNKNOWN' if raw_id is None else str(raw_id)
    is_valid = True

    # Check required fields
    for field in REQUIRED_FIELDS['HeritageCustodian']:
        if record.get(field) is None:
            report.add_error(record_id, field, f"Required field '{field}' is missing")
            is_valid = False
        else:
            report.track_field(field)

    # Validate institution_type enum (a null value is already reported as missing)
    institution_type = record.get('institution_type')
    if institution_type is not None and not _enum_ok(
            institution_type, 'institution_type', record_id, 'institution_type', report):
        is_valid = False

    # Track optional fields
    for field in OPTIONAL_FIELDS['HeritageCustodian']:
        if record.get(field):
            report.track_field(field)

    # Validate provenance (a null block is already reported as missing)
    prov = record.get('provenance')
    if prov is not None and not _validate_provenance(prov, record_id, report):
        is_valid = False

    # Validate locations
    locations = record.get('locations')
    if locations:
        entries, well_formed = _mapping_entries(locations, 'locations', record_id, report)
        if not well_formed:
            is_valid = False
        for i, loc in entries:
            if loc.get('country') is None:
                report.add_error(record_id, f'locations[{i}].country',
                                 "Country is required in Location")
                is_valid = False
            # Warn if no city
            if not loc.get('city'):
                report.add_warning(record_id, f'locations[{i}].city', "City name not provided")
    else:
        report.add_warning(record_id, 'locations', "No location information provided")

    # Validate identifiers
    identifiers = record.get('identifiers')
    if identifiers:
        entries, well_formed = _mapping_entries(identifiers, 'identifiers', record_id, report)
        if not well_formed:
            is_valid = False
        for i, ident in entries:
            if not _require_fields(ident, REQUIRED_FIELDS['Identifier'], record_id,
                                   f'identifiers[{i}]', 'identifier', report):
                is_valid = False

    # Validate change_history events
    change_history = record.get('change_history')
    if change_history:
        entries, well_formed = _mapping_entries(change_history, 'change_history',
                                                record_id, report)
        if not well_formed:
            is_valid = False
        for i, event in entries:
            if not _require_fields(event, REQUIRED_FIELDS['ChangeEvent'], record_id,
                                   f'change_history[{i}]', 'change event', report):
                is_valid = False
            change_type = event.get('change_type')
            if change_type is not None and not _enum_ok(
                    change_type, 'change_type', record_id,
                    f'change_history[{i}].change_type', report, noun='change_type'):
                is_valid = False

    # Warn if no description
    if not record.get('description'):
        report.add_warning(record_id, 'description', "No description provided")

    return is_valid
|
|
|
|
|
|
def validate_file(filepath: Path) -> ValidationReport:
    """Validate a YAML file containing heritage custodian instances.

    The file must hold a list of records at top level. Each record is passed
    to ``validate_record``. Load failures and per-record failures are recorded
    in the report rather than raised.

    Args:
        filepath: Path of the YAML file to validate.

    Returns:
        The populated ValidationReport for the file.
    """
    report = ValidationReport(str(filepath))

    # Only the load step sits in this try, so a failure inside the validator
    # is never reported as a YAML loading problem.
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except (OSError, ValueError, yaml.YAMLError) as e:
        # ValueError covers UnicodeDecodeError and scalars PyYAML cannot
        # construct (e.g. an impossible date such as 2023-02-30).
        report.add_error('FILE', 'parsing', f"Error loading YAML file: {e}")
        return report

    if not isinstance(data, list):
        report.add_error('FILE', 'structure', "File must contain a list of records at top level")
        return report

    report.record_count = len(data)

    for index, record in enumerate(data):
        try:
            if validate_record(record, report):
                report.valid_count += 1
        except Exception as e:
            # Best effort: one record that trips the validator must not stop
            # the remaining records from being checked.
            report.add_error(f'record[{index}]', 'validation',
                             f"Unexpected error while validating record: {e}")

    return report
|
|
|
|
|
|
def main(files=None):
    """Validate instance files and print a report for each.

    Args:
        files: Optional list of YAML file paths to validate. When omitted, the
            three curated instance files are used.

    Returns:
        0 when at least one file was validated and none had errors, otherwise
        1. Files that do not exist are skipped with a warning; if nothing
        could be validated the result is a failure, not a success.
    """
    if files is None:
        files = [
            'data/instances/chilean_institutions_curated.yaml',
            'data/instances/mexican_institutions_curated.yaml',
            'data/instances/brazilian_institutions_geocoded_v3.yaml',
        ]

    all_valid = True
    validated = 0

    for filepath in files:
        path = Path(filepath)
        if not path.exists():
            print(f"⚠️ File not found: {filepath}")
            continue

        report = validate_file(path)
        validated += 1
        # print_report() is called unconditionally so every report is shown,
        # even after an earlier file has already failed.
        if not report.print_report():
            all_valid = False

    # Overall summary
    print(f"\n{'='*80}")
    print("OVERALL VALIDATION RESULT")
    print(f"{'='*80}")
    if validated == 0:
        print("❌ No instance files were found - nothing was validated")
    elif all_valid:
        print("✅ All files validated successfully!")
    else:
        print("❌ Some files have validation errors - see reports above")
    print(f"{'='*80}\n")

    return 0 if (all_valid and validated > 0) else 1
|
|
|
|
|
|
if __name__ == '__main__':
    # Raise SystemExit directly: the bare exit() helper is injected by the
    # site module and is not guaranteed to exist (e.g. under `python -S`).
    raise SystemExit(main())
|