- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive).
- Added tests for extracted entities and result handling to validate the extraction process.
- Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format.
- Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns.
- Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
154 lines
4.4 KiB
Python
Executable file
#!/usr/bin/env python3
"""
Validate all YAML instance files and report errors by type.

This script systematically validates all heritage institution YAML files
and groups errors by type for systematic fixing.
"""
import subprocess
import sys
from pathlib import Path
from collections import defaultdict
import re
def get_active_yaml_files():
|
|
"""Get all active YAML files (excluding archives, backups, tests)."""
|
|
instance_dir = Path.cwd() / "data" / "instances"
|
|
all_files = list(instance_dir.rglob("*.yaml"))
|
|
|
|
# Filter out archives, backups, test files
|
|
active_files = [
|
|
f for f in all_files
|
|
if "archive" not in str(f)
|
|
and "backups" not in str(f)
|
|
and "test_outputs" not in str(f)
|
|
]
|
|
|
|
return sorted(active_files)
|
|
|
|
def validate_file(filepath):
|
|
"""Validate a single YAML file using linkml-validate."""
|
|
cmd = [
|
|
"linkml-validate",
|
|
"-s", "schemas/heritage_custodian.yaml",
|
|
"--target-class", "HeritageCustodian",
|
|
str(filepath)
|
|
]
|
|
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
cwd=Path.cwd()
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
return None # No errors
|
|
|
|
# Parse errors from stderr
|
|
errors = []
|
|
for line in result.stderr.split('\n'):
|
|
if line.strip().startswith('[ERROR]'):
|
|
errors.append(line.strip())
|
|
|
|
return errors if errors else None
|
|
|
|
def categorize_errors(errors):
|
|
"""Categorize errors by type."""
|
|
categories = defaultdict(list)
|
|
|
|
for error in errors:
|
|
# Extract error type
|
|
if "does not match" in error:
|
|
if "temporal_coverage" in error:
|
|
categories["temporal_coverage_pattern"].append(error)
|
|
else:
|
|
categories["pattern_mismatch"].append(error)
|
|
elif "is not a valid" in error:
|
|
categories["invalid_value"].append(error)
|
|
elif "Additional properties are not allowed" in error:
|
|
categories["invalid_field"].append(error)
|
|
elif "required" in error.lower():
|
|
categories["missing_required"].append(error)
|
|
else:
|
|
categories["other"].append(error)
|
|
|
|
return categories
|
|
|
|
def main():
|
|
print("🔍 GLAM Data Validation Report")
|
|
print("=" * 80)
|
|
print()
|
|
|
|
# Get all files
|
|
files = get_active_yaml_files()
|
|
print(f"📄 Found {len(files)} active YAML files to validate")
|
|
print()
|
|
|
|
# Validate each file
|
|
files_with_errors = {}
|
|
files_ok = []
|
|
|
|
for i, filepath in enumerate(files, 1):
|
|
relative_path = filepath.relative_to(Path.cwd())
|
|
print(f"[{i}/{len(files)}] Validating {relative_path}...", end=" ")
|
|
|
|
errors = validate_file(filepath)
|
|
|
|
if errors:
|
|
print(f"❌ {len(errors)} errors")
|
|
files_with_errors[str(relative_path)] = errors
|
|
else:
|
|
print("✅")
|
|
files_ok.append(str(relative_path))
|
|
|
|
print()
|
|
print("=" * 80)
|
|
print(f"✅ Valid files: {len(files_ok)}")
|
|
print(f"❌ Files with errors: {len(files_with_errors)}")
|
|
print()
|
|
|
|
if not files_with_errors:
|
|
print("🎉 All files pass validation!")
|
|
return 0
|
|
|
|
# Categorize all errors
|
|
print("📊 ERROR SUMMARY BY TYPE")
|
|
print("=" * 80)
|
|
|
|
all_errors_by_category = defaultdict(list)
|
|
|
|
for filepath, errors in files_with_errors.items():
|
|
categories = categorize_errors(errors)
|
|
for category, error_list in categories.items():
|
|
for error in error_list:
|
|
all_errors_by_category[category].append((filepath, error))
|
|
|
|
# Report by category
|
|
for category, error_list in sorted(all_errors_by_category.items()):
|
|
print(f"\n{category.upper().replace('_', ' ')}: {len(error_list)} errors")
|
|
print("-" * 80)
|
|
|
|
# Show first 5 examples
|
|
for filepath, error in error_list[:5]:
|
|
print(f" {filepath}")
|
|
print(f" {error}")
|
|
|
|
if len(error_list) > 5:
|
|
print(f" ... and {len(error_list) - 5} more")
|
|
|
|
# List all files with errors
|
|
print()
|
|
print("=" * 80)
|
|
print(f"FILES NEEDING FIXES ({len(files_with_errors)}):")
|
|
print("-" * 80)
|
|
|
|
for filepath in sorted(files_with_errors.keys()):
|
|
error_count = len(files_with_errors[filepath])
|
|
print(f" {filepath} ({error_count} errors)")
|
|
|
|
return 1
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|