glam/scripts/validate_linkml_schema_integrity.py
kempersc 66adec257e Add scripts for normalizing LinkML schemas and validating schema integrity
- Implement `normalize_linkml_alt_descriptions.py` to convert structured alt_descriptions to the expected scalar form.
- Implement `normalize_linkml_structured_aliases.py` to flatten language-keyed structured_aliases into a standard list-of-objects format.
- Implement `validate_linkml_schema_integrity.py` to validate the integrity of LinkML schema bundles, checking for import resolution, YAML parsing, and reference existence.
2026-02-16 10:16:51 +01:00

264 lines
8.1 KiB
Python

#!/usr/bin/env python3
"""Validate LinkML schema bundle integrity.
This is a lightweight validator intended for this repository's modular LinkML
schemas. It checks:
- all local imports resolve to readable YAML files
- all imported modules parse as YAML
- referenced class/slot/enum names exist somewhere in the import closure
It deliberately does not depend on LinkML's SchemaView/JsonSchemaGenerator, so it
can run even if the schema contains non-standard extensions/annotations.
Usage:
python scripts/validate_linkml_schema_integrity.py \
--schema schemas/20251121/linkml/custodian_source.yaml
"""
from __future__ import annotations
import argparse
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
import yaml
# Range names that never require a class/enum definition in the bundle:
# LinkML's built-in scalar types plus the "Any" spellings used by these schemas.
PRIMITIVE_RANGES: Set[str] = {
    # LinkML core
    "string",
    "integer",
    "boolean",
    "float",
    "double",
    "decimal",
    "date",
    "datetime",
    "time",
    "uri",
    "uriorcurie",
    "curie",
    "ncname",
    "jsonpointer",
    # Common schema-local patterns
    "Any",
    "linkml:Any",
}
@dataclass
class Issue:
    """A single integrity problem found while validating a schema bundle."""

    # The YAML file in which the problem was detected.
    file: Path
    # Human-readable description of the problem.
    message: str
def _is_external_import(imp: str) -> bool:
# Examples: linkml:types
return ":" in imp and not imp.startswith("./") and not imp.startswith("../")
def _resolve_import(base_dir: Path, imp: str) -> Optional[Path]:
if _is_external_import(imp):
return None
p = (base_dir / imp).resolve()
if p.suffix != ".yaml":
p = p.with_suffix(".yaml")
return p
def _load_yaml(path: Path) -> Dict[str, Any]:
    """Parse *path* as YAML and require a mapping at the top level.

    Raises ValueError when the document root is not a mapping; YAML/IO
    errors propagate to the caller.
    """
    text = path.read_text(encoding="utf-8")
    document = yaml.safe_load(text)
    if not isinstance(document, dict):
        raise ValueError("Top-level YAML is not a mapping")
    return document
def _iter_imports(schema: Dict[str, Any]) -> Iterable[str]:
imports = schema.get("imports")
if not imports:
return []
if not isinstance(imports, list):
return []
return [i for i in imports if isinstance(i, str)]
def _collect_definitions(schema: Dict[str, Any]) -> Tuple[Set[str], Set[str], Set[str]]:
classes = schema.get("classes")
slots = schema.get("slots")
enums = schema.get("enums")
class_names = set(classes.keys()) if isinstance(classes, dict) else set()
slot_names = set(slots.keys()) if isinstance(slots, dict) else set()
enum_names = set(enums.keys()) if isinstance(enums, dict) else set()
return class_names, slot_names, enum_names
def _iter_class_defs(schema: Dict[str, Any]) -> Iterable[Tuple[str, Dict[str, Any]]]:
classes = schema.get("classes")
if not isinstance(classes, dict):
return []
out = []
for name, spec in classes.items():
if isinstance(name, str) and isinstance(spec, dict):
out.append((name, spec))
return out
def _iter_slot_defs(schema: Dict[str, Any]) -> Iterable[Tuple[str, Dict[str, Any]]]:
slots = schema.get("slots")
if not isinstance(slots, dict):
return []
out = []
for name, spec in slots.items():
if isinstance(name, str) and isinstance(spec, dict):
out.append((name, spec))
return out
def _normalize_range(rng: str) -> str:
# Accept prefixed form for Any.
if rng == "Any" or rng == "linkml:Any":
return "linkml:Any"
return rng
def _is_unknown_range(rng: str, classes: Dict[str, Path], enums: Dict[str, Path]) -> bool:
    """Return True when *rng* cannot be resolved to a known range.

    A range is acceptable when it is a primitive/built-in type, a class or
    enum defined somewhere in the import closure, or a prefixed (CURIE-style)
    value — prefix expansion is deliberately not validated here.
    """
    rng = _normalize_range(rng)
    if rng in PRIMITIVE_RANGES:
        return False
    if rng in classes or rng in enums:
        return False
    # Permit prefixed values without validating prefix expansion here.
    return ":" not in rng


def validate_schema_bundle(schema_path: Path) -> List[Issue]:
    """Validate the schema bundle rooted at *schema_path*.

    Loads the root schema and its transitive local imports, then checks:
    import resolution, YAML well-formedness, duplicate definition names,
    class references (is_a / slots / slot_usage keys), and slot ranges
    (including slot_usage range overrides).

    Returns a list of Issue records; an empty list means the bundle passed.
    """
    issues: List[Issue] = []
    if not schema_path.exists():
        return [Issue(schema_path, "Schema file not found")]

    # --- Load schema + imports closure (depth-first over local imports) ---
    to_visit: List[Path] = [schema_path.resolve()]
    visited: Set[Path] = set()
    schemas: Dict[Path, Dict[str, Any]] = {}
    while to_visit:
        path = to_visit.pop()
        if path in visited:
            continue
        visited.add(path)
        try:
            data = _load_yaml(path)
        except Exception as e:
            # Any load failure (IO, YAML syntax, non-mapping root) is
            # reported as an issue rather than aborting the whole run.
            issues.append(Issue(path, f"YAML parse/load error: {e}"))
            continue
        schemas[path] = data
        base_dir = path.parent
        for imp in _iter_imports(data):
            resolved = _resolve_import(base_dir, imp)
            if resolved is None:
                # External CURIE import (e.g. linkml:types): not checked here.
                continue
            if not resolved.exists():
                issues.append(Issue(path, f"Import not found: {imp} -> {resolved}"))
                continue
            to_visit.append(resolved)

    # --- Aggregate definitions across the closure and detect duplicates ---
    classes: Dict[str, Path] = {}
    slots: Dict[str, Path] = {}
    enums: Dict[str, Path] = {}

    def _register(reg: Dict[str, Path], kind: str, name: str, file: Path) -> None:
        # First definition wins; later definitions are flagged as duplicates.
        if name in reg:
            issues.append(Issue(file, f"Duplicate {kind} name '{name}' also defined in {reg[name]}"))
        else:
            reg[name] = file

    # Sorted iteration keeps the issue list deterministic across runs.
    for path, data in schemas.items():
        cset, sset, eset = _collect_definitions(data)
        for n in sorted(cset):
            _register(classes, "class", n, path)
        for n in sorted(sset):
            _register(slots, "slot", n, path)
        for n in sorted(eset):
            _register(enums, "enum", n, path)

    # --- Validate class references (is_a parents, slots, slot_usage keys) ---
    for path, data in schemas.items():
        for cname, cdef in _iter_class_defs(data):
            parent = cdef.get("is_a")
            if isinstance(parent, str) and parent and parent not in classes:
                issues.append(Issue(path, f"Class '{cname}' has unknown is_a '{parent}'"))
            for slot_name in cdef.get("slots") or []:
                if isinstance(slot_name, str) and slot_name not in slots:
                    issues.append(Issue(path, f"Class '{cname}' references unknown slot '{slot_name}'"))
            su = cdef.get("slot_usage")
            if isinstance(su, dict):
                for slot_name in su:
                    if isinstance(slot_name, str) and slot_name not in slots:
                        issues.append(Issue(path, f"Class '{cname}' has slot_usage for unknown slot '{slot_name}'"))

    # --- Validate slot ranges (shared helper keeps both range checks in sync) ---
    for path, data in schemas.items():
        for sname, sdef in _iter_slot_defs(data):
            rng = sdef.get("range")
            if isinstance(rng, str) and rng and _is_unknown_range(rng, classes, enums):
                issues.append(Issue(path, f"Slot '{sname}' has unknown range '{rng}'"))

    # --- Validate slot_usage range overrides ---
    for path, data in schemas.items():
        for cname, cdef in _iter_class_defs(data):
            su = cdef.get("slot_usage")
            if not isinstance(su, dict):
                continue
            for slot_name, override in su.items():
                if not isinstance(override, dict):
                    continue
                rng = override.get("range")
                if isinstance(rng, str) and rng and _is_unknown_range(rng, classes, enums):
                    issues.append(Issue(path, f"Class '{cname}' slot_usage '{slot_name}' has unknown range '{rng}'"))
    return issues
def main() -> int:
    """CLI entry point.

    Validates the bundle named by --schema, prints at most 50 issues, and
    returns 0 on success / 1 when any issue was found.
    """
    parser = argparse.ArgumentParser(description="Validate LinkML schema bundle integrity")
    parser.add_argument(
        "--schema",
        default="schemas/20251121/linkml/custodian_source.yaml",
        help="Root schema file to validate",
    )
    args = parser.parse_args()

    schema_path = Path(args.schema)
    issues = validate_schema_bundle(schema_path)
    if not issues:
        print(f"OK: schema bundle integrity checks passed: {schema_path}")
        return 0

    print(f"Found {len(issues)} issue(s) in schema bundle: {schema_path}")
    shown = issues[:50]
    for issue in shown:
        print(f"- {issue.file}: {issue.message}")
    hidden = len(issues) - len(shown)
    if hidden > 0:
        print(f"... and {hidden} more")
    return 1
if __name__ == "__main__":
    # Propagate main()'s integer return value as the process exit status.
    raise SystemExit(main())