#!/usr/bin/env python3
r"""Validate LinkML schema bundle integrity.

This is a lightweight validator intended for this repository's modular LinkML schemas.
It checks:
- all local imports resolve to readable YAML files
- all imported modules parse as YAML
- referenced class/slot/enum names exist somewhere in the import closure

It deliberately does not depend on LinkML's SchemaView/JsonSchemaGenerator, so it can
run even if the schema contains non-standard extensions/annotations.

Usage:
  python scripts/validate_linkml_schema_integrity.py \
    --schema schemas/20251121/linkml/custodian_source.yaml
"""

from __future__ import annotations

import argparse
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

import yaml


# Range names accepted without requiring a class/enum definition in the bundle.
PRIMITIVE_RANGES: Set[str] = {
    # LinkML core
    "string",
    "integer",
    "boolean",
    "float",
    "double",
    "decimal",
    "date",
    "datetime",
    "time",
    "uri",
    "uriorcurie",
    "curie",
    "ncname",
    "jsonpointer",
    # Common schema-local patterns
    "Any",
    "linkml:Any",
}


@dataclass
class Issue:
    """A single integrity problem found while validating a schema bundle."""

    # File in which the problem was detected.
    file: Path
    # Human-readable description of the problem.
    message: str


def _is_external_import(imp: str) -> bool:
    """Return True if *imp* is a prefixed import (e.g. ``linkml:types``).

    Anything containing ``:`` is treated as prefixed unless it starts with an
    explicit relative-path marker (``./`` or ``../``).
    """
    return ":" in imp and not imp.startswith(("./", "../"))


def _resolve_import(base_dir: Path, imp: str) -> Optional[Path]:
    """Map an import string to the YAML file it refers to.

    Args:
        base_dir: Directory of the schema file that declares the import.
        imp: Import string as written in the schema's ``imports`` list.

    Returns:
        The resolved path with a ``.yaml`` extension, or ``None`` when the
        import is external (prefixed) and therefore not checked on disk.
    """
    if _is_external_import(imp):
        return None

    candidate = (base_dir / imp).resolve()
    if candidate.suffix == ".yaml":
        return candidate
    # Append instead of using with_suffix(): with_suffix() would *replace*
    # whatever follows the last dot, turning "schema.org_types" into
    # "schema.yaml" rather than "schema.org_types.yaml".
    return candidate.with_name(candidate.name + ".yaml")


def _load_yaml(path: Path) -> Dict[str, Any]:
    """Parse *path* as YAML and return its top-level mapping.

    Raises:
        ValueError: If the document's top level is not a mapping.
        OSError, yaml.YAMLError: Propagated from opening/parsing the file.
    """
    with path.open("r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if not isinstance(data, dict):
        raise ValueError("Top-level YAML is not a mapping")
    return data


def _iter_imports(schema: Dict[str, Any]) -> Iterable[str]:
    """Return the string entries of the schema's ``imports`` list.

    A missing, empty, or non-list ``imports`` value yields no imports;
    non-string entries are skipped.
    """
    imports = schema.get("imports")
    if not isinstance(imports, list):
        return []
    return [entry for entry in imports if isinstance(entry, str)]


def _collect_definitions(schema: Dict[str, Any]) -> Tuple[Set[str], Set[str], Set[str]]:
    """Return the (class, slot, enum) names defined directly in *schema*.

    A section that is absent or not a mapping contributes an empty set.
    """

    def _names(section: str) -> Set[Any]:
        value = schema.get(section)
        return set(value.keys()) if isinstance(value, dict) else set()

    return _names("classes"), _names("slots"), _names("enums")


def _iter_named_defs(
    schema: Dict[str, Any], section: str
) -> List[Tuple[str, Dict[str, Any]]]:
    """Return ``(name, spec)`` pairs from a top-level mapping section.

    Entries whose name is not a string or whose spec is not a mapping are
    skipped; a missing or non-mapping section yields an empty list.
    """
    defs = schema.get(section)
    if not isinstance(defs, dict):
        return []
    return [
        (name, spec)
        for name, spec in defs.items()
        if isinstance(name, str) and isinstance(spec, dict)
    ]


def _iter_class_defs(schema: Dict[str, Any]) -> Iterable[Tuple[str, Dict[str, Any]]]:
    """Return ``(name, spec)`` pairs for the well-formed entries of ``classes``."""
    return _iter_named_defs(schema, "classes")


def _iter_slot_defs(schema: Dict[str, Any]) -> Iterable[Tuple[str, Dict[str, Any]]]:
    """Return ``(name, spec)`` pairs for the well-formed entries of ``slots``."""
    return _iter_named_defs(schema, "slots")


def _normalize_range(rng: str) -> str:
    """Map both spellings of Any (``Any``, ``linkml:Any``) to ``linkml:Any``."""
    # Accept prefixed form for Any.
    if rng in ("Any", "linkml:Any"):
        return "linkml:Any"
    return rng


def _unknown_range(
    spec: Dict[str, Any], classes: Dict[str, Path], enums: Dict[str, Path]
) -> Optional[str]:
    """Return the normalized ``range`` of *spec* if it cannot be resolved.

    Returns ``None`` when there is no non-empty string range, or when the
    range is a primitive, a class or enum known to the bundle, or a prefixed
    value.
    """
    rng = spec.get("range")
    if not isinstance(rng, str) or not rng:
        return None
    rng = _normalize_range(rng)
    if rng in PRIMITIVE_RANGES or rng in classes or rng in enums:
        return None
    # Permit prefixed values without validating prefix expansion here
    if ":" in rng:
        return None
    return rng


def _load_import_closure(
    root: Path, issues: List[Issue]
) -> Dict[Path, Dict[str, Any]]:
    """Load *root* and every local schema reachable through ``imports``.

    Load failures and missing imports are appended to *issues*; files that
    fail to load are left out of the returned mapping.
    """
    pending: List[Path] = [root.resolve()]
    visited: Set[Path] = set()
    schemas: Dict[Path, Dict[str, Any]] = {}

    while pending:
        path = pending.pop()
        if path in visited:
            continue
        visited.add(path)

        try:
            data = _load_yaml(path)
        except Exception as e:
            # Deliberately broad: a validator should report an unreadable
            # module as an issue and keep checking the rest of the bundle.
            issues.append(Issue(path, f"YAML parse/load error: {e}"))
            continue

        schemas[path] = data
        base_dir = path.parent
        for imp in _iter_imports(data):
            resolved = _resolve_import(base_dir, imp)
            if resolved is None:
                continue
            if not resolved.exists():
                issues.append(Issue(path, f"Import not found: {imp} -> {resolved}"))
                continue
            pending.append(resolved)

    return schemas


def _index_definitions(
    schemas: Dict[Path, Dict[str, Any]], issues: List[Issue]
) -> Tuple[Dict[str, Path], Dict[str, Path], Dict[str, Path]]:
    """Build name -> defining-file indexes for classes, slots and enums.

    The first file seen for a name wins; every later definition of the same
    name is reported in *issues* as a duplicate.
    """
    classes: Dict[str, Path] = {}
    slots: Dict[str, Path] = {}
    enums: Dict[str, Path] = {}

    def _register(reg: Dict[str, Path], kind: str, name: str, file: Path) -> None:
        if name in reg:
            issues.append(
                Issue(file, f"Duplicate {kind} name '{name}' also defined in {reg[name]}")
            )
        else:
            reg[name] = file

    for path, data in schemas.items():
        cset, sset, eset = _collect_definitions(data)
        for n in sorted(cset):
            _register(classes, "class", n, path)
        for n in sorted(sset):
            _register(slots, "slot", n, path)
        for n in sorted(eset):
            _register(enums, "enum", n, path)

    return classes, slots, enums


def validate_schema_bundle(schema_path: Path) -> List[Issue]:
    """Validate the schema at *schema_path* together with its local imports.

    Checks performed, in reporting order:
      1. every reachable module loads as a YAML mapping and every local
         import exists on disk;
      2. class, slot and enum names are not defined more than once;
      3. each class's ``is_a``, ``slots`` entries and ``slot_usage`` keys
         refer to names defined in the bundle;
      4. each slot ``range`` is a primitive, a known class/enum, or prefixed;
      5. the same range check for ``slot_usage`` overrides.

    Args:
        schema_path: Root schema file of the bundle.

    Returns:
        All issues found; an empty list means every check passed.
    """
    if not schema_path.exists():
        return [Issue(schema_path, "Schema file not found")]

    issues: List[Issue] = []

    # Load schema + imports closure
    schemas = _load_import_closure(schema_path, issues)

    # Aggregate definitions and detect duplicates
    classes, slots, enums = _index_definitions(schemas, issues)

    # Validate class references
    for path, data in schemas.items():
        for cname, cdef in _iter_class_defs(data):
            parent = cdef.get("is_a")
            if isinstance(parent, str) and parent and parent not in classes:
                issues.append(Issue(path, f"Class '{cname}' has unknown is_a '{parent}'"))

            # Only a list is a usable `slots` value. Iterating a stray string
            # would report each character as an unknown slot.
            class_slots = cdef.get("slots")
            if isinstance(class_slots, list):
                for slot_name in class_slots:
                    if isinstance(slot_name, str) and slot_name not in slots:
                        issues.append(
                            Issue(path, f"Class '{cname}' references unknown slot '{slot_name}'")
                        )

            su = cdef.get("slot_usage")
            if isinstance(su, dict):
                for slot_name in su:
                    if isinstance(slot_name, str) and slot_name not in slots:
                        issues.append(
                            Issue(
                                path,
                                f"Class '{cname}' has slot_usage for unknown slot '{slot_name}'",
                            )
                        )

    # Validate slot ranges
    for path, data in schemas.items():
        for sname, sdef in _iter_slot_defs(data):
            rng = _unknown_range(sdef, classes, enums)
            if rng is not None:
                issues.append(Issue(path, f"Slot '{sname}' has unknown range '{rng}'"))

    # Validate slot_usage range overrides
    for path, data in schemas.items():
        for cname, cdef in _iter_class_defs(data):
            su = cdef.get("slot_usage")
            if not isinstance(su, dict):
                continue
            for slot_name, override in su.items():
                if not isinstance(override, dict):
                    continue
                rng = _unknown_range(override, classes, enums)
                if rng is not None:
                    issues.append(
                        Issue(
                            path,
                            f"Class '{cname}' slot_usage '{slot_name}' has unknown range '{rng}'",
                        )
                    )

    return issues


def main() -> int:
    """Command-line entry point.

    Returns:
        0 when the bundle passes all checks, 1 when any issue is found.
    """
    parser = argparse.ArgumentParser(description="Validate LinkML schema bundle integrity")
    parser.add_argument(
        "--schema",
        default="schemas/20251121/linkml/custodian_source.yaml",
        help="Root schema file to validate",
    )
    args = parser.parse_args()

    schema_path = Path(args.schema)
    issues = validate_schema_bundle(schema_path)

    if issues:
        print(f"Found {len(issues)} issue(s) in schema bundle: {schema_path}")
        # Cap the listing so a badly broken bundle does not flood the terminal.
        for issue in issues[:50]:
            print(f"- {issue.file}: {issue.message}")
        if len(issues) > 50:
            print(f"... and {len(issues) - 50} more")
        return 1

    print(f"OK: schema bundle integrity checks passed: {schema_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())