282 lines
9.3 KiB
Python
Executable file
282 lines
9.3 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""Verify external ontology mappings used in LinkML YAML files.
|
|
|
|
Default behavior targets changed/untracked YAML files under:
|
|
schemas/20251121/linkml/
|
|
|
|
It validates mapping CURIEs under mapping keys:
|
|
exact_mappings, close_mappings, broad_mappings, narrow_mappings, related_mappings
|
|
|
|
Supported prefixes:
|
|
- la (Linked Art)
|
|
- rdac (RDA classes)
|
|
- rdau (RDA unconstrained properties)
|
|
- pav (PAV 2.3)
|
|
- ardo (ArDO)
|
|
- pca (POSC Caesar RDS)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import urllib.error
|
|
import urllib.request
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
|
|
|
|
# LinkML slot names whose list values contain the mapping CURIEs we verify.
MAPPING_KEYS = {
    "exact_mappings",
    "close_mappings",
    "broad_mappings",
    "narrow_mappings",
    "related_mappings",
}

# CURIE prefixes this script knows how to verify (see module docstring).
SUPPORTED_PREFIXES = {"la", "rdac", "rdau", "pav", "ardo", "pca"}

# Matches a CURIE such as "la:Payment": a lowercase prefix, a colon, then a
# local name restricted to URL-safe characters.
CURIE_RE = re.compile(r"^(?P<prefix>[a-z][a-z0-9_-]*):(?P<local>[A-Za-z0-9_./-]+)$")
|
|
|
|
|
|
def fetch_text(url: str, timeout: int = 60) -> str:
    """Download *url* and return the response body as UTF-8 text.

    Bytes that cannot be decoded are silently dropped.
    """
    with urllib.request.urlopen(url, timeout=timeout) as response:
        payload = response.read()
    return payload.decode("utf-8", errors="ignore")
|
|
|
|
|
|
def fetch_bytes(url: str, timeout: int = 60) -> bytes:
    """Download *url* and return the raw response body."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        payload = response.read()
    return payload
|
|
|
|
|
|
def parse_mapping_curies(file_path: Path) -> list[tuple[int, str, str]]:
    """Return (line_number, prefix, local) mapping CURIEs from mapping blocks.

    The file is scanned line by line rather than parsed as YAML: a mapping
    block starts at a bare ``<mapping_key>:`` header and collects ``- curie``
    list items until a non-list line dedents back to the header's level.
    """

    def opens_mapping_block(text: str) -> bool:
        # A bare "exact_mappings:"-style header: a known mapping key followed
        # by a colon with nothing after it.
        return text.endswith(":") and text.split(":", 1)[0].strip() in MAPPING_KEYS

    found: list[tuple[int, str, str]] = []
    inside = False
    header_indent = -1

    content = file_path.read_text(encoding="utf-8", errors="ignore")
    for lineno, raw in enumerate(content.splitlines(), start=1):
        text = raw.strip()
        depth = len(raw) - len(raw.lstrip(" "))

        if not inside:
            if text and not text.startswith("#") and opens_mapping_block(text):
                inside = True
                header_indent = depth
            continue

        # A non-list, non-comment line at or above the header's indent ends
        # the block; it may itself open the next mapping block.
        if text and not text.startswith("#") and depth <= header_indent and not text.startswith("-"):
            inside = opens_mapping_block(text)
            header_indent = depth if inside else -1
            continue

        if text.startswith("-"):
            entry = text[1:].strip()
            # Drop any trailing inline comment before matching.
            entry = entry.split(" #", 1)[0].strip()
            match = CURIE_RE.match(entry)
            if match:
                found.append((lineno, match.group("prefix"), match.group("local")))

    return found
|
|
|
|
|
|
def changed_yaml_files(repo_root: Path, scope: Path) -> list[Path]:
    """Collect changed and untracked YAML files inside scope.

    Returns a sorted list of absolute paths to existing ``.yaml`` files that
    git reports as modified (tracked) or untracked under *repo_root* and
    that live inside *scope*. If git is unavailable, *repo_root* does not
    exist, or a git command fails, that listing is treated as empty rather
    than aborting the run.
    """
    files: set[Path] = set()
    # Resolve once instead of per candidate path.
    scope_resolved = scope.resolve()

    def run(cmd: list[str]) -> list[str]:
        # Best effort: OSError covers a missing git executable or a bad cwd
        # (FileNotFoundError/NotADirectoryError), which previously crashed.
        try:
            out = subprocess.check_output(cmd, cwd=repo_root)
        except (subprocess.CalledProcessError, OSError):
            return []
        return [x for x in out.decode().splitlines() if x]

    tracked = run(["git", "diff", "--name-only"])
    untracked = run(["git", "ls-files", "--others", "--exclude-standard"])

    for rel in tracked + untracked:
        if not rel.endswith(".yaml"):
            continue
        p = (repo_root / rel).resolve()
        try:
            # Keep only paths located under the scope directory.
            p.relative_to(scope_resolved)
        except ValueError:
            continue
        if p.is_file():
            files.add(p)

    return sorted(files)
|
|
|
|
|
|
def load_linked_art_terms() -> tuple[set[str], set[str]]:
    """Return (property names, class names) published at linked.art/ns/terms.

    Downloads the RDF/XML terms document and extracts the final path segment
    of every rdf:about URI under the linked.art terms namespace.
    """
    namespaces = {
        "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
    }
    about_attr = "{http://www.w3.org/1999/02/22-rdf-syntax-ns#}about"
    base = "https://linked.art/ns/terms/"

    root = ET.fromstring(fetch_bytes("https://linked.art/ns/terms/"))

    def local_names(tag: str) -> set[str]:
        # Collect the trailing URI segment of matching elements in-namespace.
        names: set[str] = set()
        for node in root.findall(tag, namespaces):
            uri = node.attrib.get(about_attr, "")
            if uri.startswith(base):
                names.add(uri.rsplit("/", 1)[-1])
        return names

    return local_names("rdf:Property"), local_names("rdfs:Class")
|
|
|
|
|
|
def load_rda_ids(path: str, marker: str) -> set[str]:
    """Fetch the RDA registry JSON-LD for *path* and return IDs matching *marker*.

    *marker* is a regex with one capturing group (e.g. ``r"Elements/c/(C\\d+)"``);
    the set of captured values is returned.
    """
    url = f"https://www.rdaregistry.info/jsonld/Elements/{path}.jsonld"
    document = fetch_text(url)
    return {identifier for identifier in re.findall(marker, document)}
|
|
|
|
|
|
def _collect_occurrences(files: list[Path]) -> dict[str, list[tuple[Path, int, str]]]:
    """Group supported-prefix mapping CURIEs found in *files* by prefix.

    Each value is a list of (file, line number, local name) occurrences.
    Missing files and non-.yaml paths are silently skipped.
    """
    occurrences: dict[str, list[tuple[Path, int, str]]] = {}
    for file_path in files:
        if not file_path.exists() or file_path.suffix != ".yaml":
            continue
        for line_no, pfx, local in parse_mapping_curies(file_path):
            if pfx in SUPPORTED_PREFIXES:
                occurrences.setdefault(pfx, []).append((file_path, line_no, local))
    return occurrences


def _check_remote_terms(
    occurrences: dict[str, list[tuple[Path, int, str]]],
    prefix: str,
    url_template: str,
    failures: list[str],
) -> None:
    """Verify *prefix* occurrences by dereferencing each term's URL.

    Each unique local name is fetched at most once (previously every
    occurrence re-fetched the same URL) and the response body is checked
    for a self-reference; problems are appended to *failures*.
    """
    # local name -> (found in response, fetch-error message or None)
    cache: dict[str, tuple[bool, str | None]] = {}
    for file_path, line_no, local in occurrences.get(prefix, []):
        url = url_template.format(local=local)
        if local not in cache:
            try:
                cache[local] = (local in fetch_text(url), None)
            except urllib.error.URLError as e:
                cache[local] = (False, str(e))
        found, error = cache[local]
        if error is not None:
            failures.append(f"{file_path}:{line_no} {prefix}:{local} fetch error: {error}")
        elif not found:
            failures.append(f"{file_path}:{line_no} {prefix}:{local} not found at {url}")


def main() -> int:
    """Verify mapping CURIEs in target YAML files against source ontologies.

    Returns 0 when every checked CURIE resolves (or there is nothing to
    check), 1 when any verification or ontology download fails.
    """
    parser = argparse.ArgumentParser(description="Verify LinkML external mappings")
    parser.add_argument(
        "files",
        nargs="*",
        help="YAML files to verify (defaults to changed/untracked files under scope)",
    )
    parser.add_argument(
        "--scope",
        default="schemas/20251121/linkml",
        help="Default scope used when no files are provided",
    )
    parser.add_argument(
        "--all",
        action="store_true",
        help="Scan all YAML files under --scope (instead of changed/untracked files)",
    )
    args = parser.parse_args()

    repo_root = Path(__file__).resolve().parents[1]
    scope = (repo_root / args.scope).resolve()

    # File selection priority: explicit args > --all scan > git changed/untracked.
    if args.files:
        files = [Path(f).resolve() for f in args.files]
    elif args.all:
        files = sorted(scope.rglob("*.yaml"))
    else:
        files = changed_yaml_files(repo_root, scope)

    if not files:
        print("No target YAML files found. Nothing to verify.")
        return 0

    occurrences = _collect_occurrences(files)
    if not occurrences:
        print("No supported external mapping CURIEs found in selected files.")
        return 0

    failures: list[str] = []

    # Download the bulk reference vocabularies up front; a failed download is
    # recorded as a failure but does not abort the remaining checks.
    la_props: set[str] = set()
    la_classes: set[str] = set()
    rdac_ids: set[str] = set()
    rdau_ids: set[str] = set()
    pav_text = ""

    try:
        la_props, la_classes = load_linked_art_terms()
    except Exception as e:  # pragma: no cover - network failures
        failures.append(f"[load] Linked Art: {e}")

    try:
        rdac_ids = load_rda_ids("c", r"Elements/c/(C\d+)")
    except Exception as e:  # pragma: no cover
        failures.append(f"[load] RDA c.jsonld: {e}")

    try:
        rdau_ids = load_rda_ids("u", r"Elements/u/(P\d+)")
    except Exception as e:  # pragma: no cover
        failures.append(f"[load] RDA u.jsonld: {e}")

    try:
        pav_text = fetch_text("https://purl.org/pav/2.3")
    except Exception as e:  # pragma: no cover
        failures.append(f"[load] PAV 2.3: {e}")

    print("Verifying mapping CURIEs:")
    for prefix in sorted(occurrences):
        locals_unique = sorted({x[2] for x in occurrences[prefix]})
        print(f"- {prefix}: {', '.join(locals_unique)}")

    # Membership checks against the pre-fetched vocabularies. `in` is set
    # membership for the Linked Art/RDA sets and substring search for the
    # PAV ontology text, matching the original per-prefix checks.
    vocab_checks: list[tuple[str, set[str] | str, str]] = [
        ("la", la_props | la_classes, "not found in linked.art/ns/terms"),
        ("rdac", rdac_ids, "not found in RDA Elements/c.jsonld"),
        ("rdau", rdau_ids, "not found in RDA Elements/u.jsonld"),
        ("pav", pav_text, "not found in PAV 2.3 ontology"),
    ]
    for prefix, vocabulary, reason in vocab_checks:
        for file_path, line_no, local in occurrences.get(prefix, []):
            if local not in vocabulary:
                failures.append(f"{file_path}:{line_no} {prefix}:{local} {reason}")

    # ardo/pca have no single downloadable vocabulary; dereference per term.
    _check_remote_terms(occurrences, "ardo", "https://w3id.org/ardo/2.0/{local}", failures)
    _check_remote_terms(
        occurrences, "pca", "https://rds.posccaesar.org/ontology/plm/rdl/{local}", failures
    )

    if failures:
        print("\nFAIL")
        for f in failures:
            print(f"- {f}")
        return 1

    print("\nOK: all checked mapping CURIEs were verified against source ontologies.")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s status to the shell (0 = verified, 1 = failures).
    sys.exit(main())
|