glam/scripts/validate_all_instances.py
kempersc e5a532a8bc Add comprehensive tests for NLP institution extraction and RDF partnership integration
- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive).
- Added tests for extracted entities and result handling to validate the extraction process.
- Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format.
- Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns.
- Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
2025-11-19 23:20:47 +01:00

154 lines
4.4 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Validate all YAML instance files and report errors by type.
This script systematically validates all heritage institution YAML files
and groups errors by type for systematic fixing.
"""
import subprocess
import sys
from pathlib import Path
from collections import defaultdict
import re
def get_active_yaml_files():
    """Collect instance YAML files, skipping archived/backup/test copies.

    Any path whose string form contains 'archive', 'backups', or
    'test_outputs' anywhere is excluded. Returns a sorted list of Paths
    found under <cwd>/data/instances.
    """
    skip_tokens = ("archive", "backups", "test_outputs")
    instance_dir = Path.cwd() / "data" / "instances"
    active = [
        path
        for path in instance_dir.rglob("*.yaml")
        if not any(token in str(path) for token in skip_tokens)
    ]
    return sorted(active)
def validate_file(filepath):
    """Run linkml-validate on a single instance file.

    Returns a list of stripped '[ERROR]' lines parsed from the tool's
    stderr, or None when validation succeeds (or when a non-zero exit
    produced no parseable error lines).
    """
    command = [
        "linkml-validate",
        "-s", "schemas/heritage_custodian.yaml",
        "--target-class", "HeritageCustodian",
        str(filepath),
    ]
    outcome = subprocess.run(
        command,
        capture_output=True,
        text=True,
        cwd=Path.cwd(),
    )
    if outcome.returncode == 0:
        # Clean exit: nothing to report.
        return None
    # Only lines the validator tags with [ERROR] are kept.
    reported = []
    for raw in outcome.stderr.split('\n'):
        candidate = raw.strip()
        if candidate.startswith('[ERROR]'):
            reported.append(candidate)
    return reported or None
def categorize_errors(errors):
    """Bucket error message strings by recognizable fragments.

    Returns a defaultdict mapping category name -> list of the original
    error strings that matched it. Match order mirrors the validator's
    message precedence: pattern mismatches first, then invalid values,
    unexpected fields, missing required slots, and finally 'other'.
    """
    buckets = defaultdict(list)
    for message in errors:
        if "does not match" in message:
            # temporal_coverage patterns get their own bucket for triage.
            if "temporal_coverage" in message:
                key = "temporal_coverage_pattern"
            else:
                key = "pattern_mismatch"
        elif "is not a valid" in message:
            key = "invalid_value"
        elif "Additional properties are not allowed" in message:
            key = "invalid_field"
        elif "required" in message.lower():
            key = "missing_required"
        else:
            key = "other"
        buckets[key].append(message)
    return buckets
def main():
    """Validate every active instance file and print a grouped report.

    Returns 0 when all files validate, 1 when any file has errors
    (intended to be passed to sys.exit).
    """
    print("🔍 GLAM Data Validation Report")
    print("=" * 80)
    print()

    targets = get_active_yaml_files()
    total = len(targets)
    print(f"📄 Found {total} active YAML files to validate")
    print()

    failing = {}   # relative path (str) -> list of error lines
    passing = []   # relative paths (str) that validated cleanly
    for position, target in enumerate(targets, 1):
        rel = target.relative_to(Path.cwd())
        print(f"[{position}/{total}] Validating {rel}...", end=" ")
        problems = validate_file(target)
        if problems:
            print(f"{len(problems)} errors")
            failing[str(rel)] = problems
        else:
            print("")
            passing.append(str(rel))

    print()
    print("=" * 80)
    print(f"✅ Valid files: {len(passing)}")
    print(f"❌ Files with errors: {len(failing)}")
    print()

    if not failing:
        print("🎉 All files pass validation!")
        return 0

    # Flatten all errors, tagging each with its file, and group by category.
    print("📊 ERROR SUMMARY BY TYPE")
    print("=" * 80)
    grouped = defaultdict(list)
    for path, problems in failing.items():
        for bucket, messages in categorize_errors(problems).items():
            for message in messages:
                grouped[bucket].append((path, message))

    # One section per category: count, then at most five examples.
    for bucket, tagged in sorted(grouped.items()):
        print(f"\n{bucket.upper().replace('_', ' ')}: {len(tagged)} errors")
        print("-" * 80)
        for path, message in tagged[:5]:
            print(f" {path}")
            print(f" {message}")
        if len(tagged) > 5:
            print(f" ... and {len(tagged) - 5} more")

    # Final roll-up of every file that needs attention.
    print()
    print("=" * 80)
    print(f"FILES NEEDING FIXES ({len(failing)}):")
    print("-" * 80)
    for path in sorted(failing):
        print(f" {path} ({len(failing[path])} errors)")
    return 1
if __name__ == "__main__":
    # Exit status mirrors main(): 0 on success, 1 when errors were found.
    raise SystemExit(main())