198 lines
6.2 KiB
Python
Executable file
198 lines
6.2 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Validate custodian YAML files against LinkML schema.
|
|
|
|
This script generates a JSON Schema from the LinkML schema and applies
|
|
a fix for the exactly_one_of + default_range conflict that causes
|
|
integer values to be rejected when they should be allowed.
|
|
|
|
Usage:
|
|
python scripts/validate_custodian_schema.py [file_or_dir] [--sample N]
|
|
|
|
Examples:
|
|
# Validate a single file
|
|
python scripts/validate_custodian_schema.py data/custodian/NL-NH-AMS-M-RMA.yaml
|
|
|
|
# Validate all files in directory
|
|
python scripts/validate_custodian_schema.py data/custodian/
|
|
|
|
# Validate random sample of N files
|
|
python scripts/validate_custodian_schema.py data/custodian/ --sample 50
|
|
"""
|
|
|
|
import json
|
|
import yaml
|
|
import sys
|
|
import os
|
|
import random
|
|
import warnings
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional
|
|
from dataclasses import dataclass
|
|
|
|
# Suppress deprecation warnings (intended for those emitted by jsonschema).
# NOTE: the filter carries no module restriction, so every DeprecationWarning
# raised after this point is ignored, not only the ones from jsonschema.
warnings.filterwarnings("ignore", category=DeprecationWarning)
|
|
|
|
from linkml.generators.jsonschemagen import JsonSchemaGenerator
|
|
from jsonschema import Draft7Validator
|
|
|
|
# Default LinkML schema file path. It is resolved relative to this script
# rather than the current working directory: start from the parent of the
# directory containing this file, then descend into schemas/20251121/linkml/.
SCHEMA_FILE = Path(__file__).parent.parent / "schemas/20251121/linkml/custodian_source.yaml"
|
|
|
|
|
|
@dataclass
class ValidationResult:
    """Outcome of checking one YAML file against the schema.

    Attributes:
        file_path: Path of the checked file, stored as a string.
        valid: True when no problems were recorded for the file.
        errors: Human-readable problem descriptions; empty for a valid file.
    """

    file_path: str
    valid: bool
    errors: List[str]
|
|
|
|
|
|
def fix_oneof_types(obj: Any) -> int:
    """
    Strip a 'type' keyword that sits next to 'oneOf' or 'anyOf' in a JSON Schema.

    LinkML emits both the union keyword ('oneOf'/'anyOf') and a plain 'type'
    when exactly_one_of or any_of is combined with a default_range; the extra
    'type' makes validation reject values of the other permitted types.

    The structure is walked recursively through dicts and lists and is
    modified in place. Anything that is neither a dict nor a list is ignored.

    Returns the number of 'type' entries that were removed.
    """
    if isinstance(obj, dict):
        removed = 0
        is_union = 'oneOf' in obj or 'anyOf' in obj
        if is_union and 'type' in obj:
            # Drop the conflicting keyword before descending into the children
            del obj['type']
            removed = 1
        return removed + sum(fix_oneof_types(child) for child in obj.values())

    if isinstance(obj, list):
        return sum(fix_oneof_types(element) for element in obj)

    # Scalars carry no schema keywords
    return 0
|
|
|
|
|
|
def generate_json_schema(schema_file: Path) -> Dict[str, Any]:
    """Build a JSON Schema from the LinkML schema file and patch it.

    The LinkML generator output is parsed into a dict, then every 'type'
    keyword that conflicts with a sibling 'oneOf'/'anyOf' is removed. When at
    least one such removal happens, a one-line notice is printed.

    Returns the patched JSON Schema as a dict.
    """
    serialized = JsonSchemaGenerator(str(schema_file)).serialize()
    schema = json.loads(serialized)

    # Work around the exactly_one_of + default_range conflict
    patched = fix_oneof_types(schema)
    if patched > 0:
        print(f"Applied {patched} JSON Schema fixes for mixed-type fields")

    return schema
|
|
|
|
|
|
def validate_file(file_path: Path, validator: Draft7Validator) -> ValidationResult:
    """Load one YAML file and check it with the given validator.

    Every validation error becomes one message of the form
    "<message> at /<path>", where the path is the error's location joined
    with '/'. A YAML syntax error, or any other exception raised while
    reading or validating, is reported as a single error message instead of
    being propagated.

    Returns a ValidationResult that is valid only when no messages were
    collected.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            document = yaml.safe_load(handle)

        problems = []
        for issue in validator.iter_errors(document):
            pointer = '/'.join(map(str, issue.path))
            problems.append(f"{issue.message} at /{pointer}")
    except yaml.YAMLError as exc:
        problems = [f"YAML parse error: {exc}"]
    except Exception as exc:
        # Best-effort: one unreadable file must not abort the whole run
        problems = [f"Error reading file: {exc}"]

    return ValidationResult(
        file_path=str(file_path),
        valid=not problems,
        errors=problems,
    )
|
|
|
|
|
|
def validate_files(files: List[Path], schema_file: Path = SCHEMA_FILE) -> List[ValidationResult]:
    """Validate a list of YAML files against the schema.

    The JSON Schema is generated once from schema_file and reused for every
    file. A progress line is printed for each invalid file and for every
    100th file. For invalid files, up to three errors are shown, each cut to
    100 characters, followed by a count of the remaining ones.

    Returns one ValidationResult per input file, in input order.
    """
    print(f"Generating JSON Schema from {schema_file}...")
    validator = Draft7Validator(generate_json_schema(schema_file))

    results = []
    for index, file_path in enumerate(files, 1):
        outcome = validate_file(file_path, validator)
        results.append(outcome)

        failed = not outcome.valid
        if not (failed or index % 100 == 0):
            continue

        marker = "✗" if failed else "✓"
        print(f"[{index}/{len(files)}] {marker} {file_path.name}")

        if failed:
            for message in outcome.errors[:3]:
                print(f" - {message[:100]}")
            overflow = len(outcome.errors) - 3
            if overflow > 0:
                print(f" ... and {overflow} more errors")

    return results
|
|
|
|
|
|
def main():
    """Command-line entry point: collect files, validate them, print a summary.

    Arguments (all optional):
        path      File or directory to validate (default: data/custodian/).
                  For a directory, only '*.yaml' files directly inside it
                  are considered.
        --sample  Validate a random sample of N files from the directory.
                  Ignored when N is 0 or not smaller than the file count.
        --schema  Path to the LinkML schema file.

    Exit status:
        0  every file is valid
        1  schema or path not found, no YAML files found, or at least one
           file is invalid
        2  invalid command-line arguments (reported by argparse)
    """
    import argparse

    parser = argparse.ArgumentParser(description='Validate custodian YAML files against LinkML schema')
    parser.add_argument('path', nargs='?', default='data/custodian/',
                        help='File or directory to validate (default: data/custodian/)')
    parser.add_argument('--sample', type=int, default=None,
                        help='Validate random sample of N files')
    parser.add_argument('--schema', type=str, default=str(SCHEMA_FILE),
                        help='Path to LinkML schema file')

    args = parser.parse_args()

    # random.sample() raises ValueError for a negative size; report it as a
    # usage error instead of letting a traceback escape.
    if args.sample is not None and args.sample < 0:
        parser.error('--sample must not be negative')

    path = Path(args.path)
    schema_file = Path(args.schema)

    if not schema_file.exists():
        print(f"Schema file not found: {schema_file}")
        sys.exit(1)

    # Collect files to validate
    if path.is_file():
        files = [path]
    elif path.is_dir():
        # glob() yields entries in arbitrary order; sort for reproducible output
        files = sorted(path.glob('*.yaml'))
        if args.sample and args.sample < len(files):
            files = random.sample(files, args.sample)
    else:
        print(f"Path not found: {path}")
        sys.exit(1)

    # An empty directory would otherwise trigger schema generation for
    # nothing and then a ZeroDivisionError in the percentage summary below.
    if not files:
        print(f"No YAML files found in {path}")
        sys.exit(1)

    print(f"Validating {len(files)} files...")
    results = validate_files(files, schema_file)

    # Summary
    total = len(results)
    valid_count = sum(1 for r in results if r.valid)
    invalid_count = total - valid_count

    print(f"\n{'='*60}")
    print("VALIDATION SUMMARY")
    print(f"{'='*60}")
    print(f"Total files: {total}")
    print(f"Valid: {valid_count} ({100*valid_count/total:.1f}%)")
    print(f"Invalid: {invalid_count} ({100*invalid_count/total:.1f}%)")

    if invalid_count > 0:
        print("\nInvalid files:")
        for r in results:
            if not r.valid:
                print(f" - {r.file_path}")
        sys.exit(1)
    else:
        print("\n✓ All files valid!")
        sys.exit(0)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
|