glam/scripts/load_linkml_to_postgres.py
2025-12-07 00:26:01 +01:00

435 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Load LinkML schemas from YAML files into PostgreSQL database.
This script parses LinkML schema files (classes, slots, enums) and stores them
in the database for queryable access via the API.
Usage:
    python load_linkml_to_postgres.py [--version VERSION] [--schema-dir DIR]

Example:
    python load_linkml_to_postgres.py --version 20251121 --schema-dir schemas/20251121/linkml
"""
import argparse
import asyncio
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import asyncpg
import yaml
# Default configuration
DEFAULT_SCHEMA_DIR = "schemas/20251121/linkml"
DEFAULT_VERSION = "20251121"
DEFAULT_SCHEMA_NAME = "heritage_custodian"

# Database connection from environment; the fallback targets a local
# development database on the default PostgreSQL port.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://glam_api:glam_api_password@localhost:5432/glam"
)
class LinkMLLoader:
"""Loads LinkML schema files into PostgreSQL."""
def __init__(self, conn: asyncpg.Connection, version: str, schema_name: str):
self.conn = conn
self.version = version
self.schema_name = schema_name
self.version_id: int | None = None
self.class_id_map: dict[str, int] = {} # class_name -> id
self.slot_id_map: dict[str, int] = {} # slot_name -> id
self.enum_id_map: dict[str, int] = {} # enum_name -> id
async def load_schema(self, schema_dir: Path) -> dict[str, int]:
"""Load complete LinkML schema from directory.
Returns:
Dict with counts: classes, slots, enums, enum_values
"""
stats = {"classes": 0, "slots": 0, "enums": 0, "enum_values": 0, "prefixes": 0}
# Create schema version record
await self._create_schema_version(schema_dir)
# Load slots first (classes reference them)
slots_dir = schema_dir / "modules" / "slots"
if slots_dir.exists():
stats["slots"] = await self._load_slots(slots_dir)
# Load enums
enums_dir = schema_dir / "modules" / "enums"
if enums_dir.exists():
enum_stats = await self._load_enums(enums_dir)
stats["enums"] = enum_stats["enums"]
stats["enum_values"] = enum_stats["values"]
# Load classes
classes_dir = schema_dir / "modules" / "classes"
if classes_dir.exists():
stats["classes"] = await self._load_classes(classes_dir)
# Mark this version as current (unmark previous)
await self._set_current_version()
return stats
async def _create_schema_version(self, schema_dir: Path) -> None:
"""Create schema version record."""
# Check if version already exists
existing = await self.conn.fetchval(
"SELECT id FROM linkml_schema_versions WHERE version = $1",
self.version
)
if existing:
# Delete existing version and its data (cascade)
await self.conn.execute(
"DELETE FROM linkml_schema_versions WHERE id = $1",
existing
)
print(f" Replaced existing version {self.version}")
# Get git commit if in git repo
git_commit = None
try:
import subprocess
result = subprocess.run(
["git", "rev-parse", "HEAD"],
capture_output=True,
text=True,
cwd=schema_dir.parent.parent.parent
)
if result.returncode == 0:
git_commit = result.stdout.strip()[:40]
except Exception:
pass
# Insert new version
self.version_id = await self.conn.fetchval(
"""
INSERT INTO linkml_schema_versions
(version, schema_name, description, source_path, git_commit, loaded_by, is_current)
VALUES ($1, $2, $3, $4, $5, $6, FALSE)
RETURNING id
""",
self.version,
self.schema_name,
f"LinkML schema version {self.version} for Heritage Custodian Ontology",
str(schema_dir),
git_commit,
os.environ.get("USER", "unknown")
)
print(f" Created schema version: {self.version} (id={self.version_id})")
async def _set_current_version(self) -> None:
"""Mark this version as current, unmark others."""
await self.conn.execute(
"UPDATE linkml_schema_versions SET is_current = FALSE WHERE is_current = TRUE"
)
await self.conn.execute(
"UPDATE linkml_schema_versions SET is_current = TRUE WHERE id = $1",
self.version_id
)
print(f" Set version {self.version} as current")
async def _load_slots(self, slots_dir: Path) -> int:
"""Load all slot definitions from directory."""
count = 0
for yaml_file in sorted(slots_dir.glob("*.yaml")):
try:
with open(yaml_file, "r", encoding="utf-8") as f:
content = f.read()
data = yaml.safe_load(content)
if not data or "slots" not in data:
continue
for slot_name, slot_def in data.get("slots", {}).items():
if slot_def is None:
slot_def = {}
slot_id = await self.conn.fetchval(
"""
INSERT INTO linkml_slots (
version_id, slot_name, slot_id, range, slot_uri,
required, multivalued, identifier, inlined, inlined_as_list,
pattern, description, comments, examples, yaml_content
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
RETURNING id
""",
self.version_id,
slot_name,
data.get("id", f"https://nde.nl/ontology/hc/slot/{slot_name}"),
slot_def.get("range"),
slot_def.get("slot_uri"),
slot_def.get("required", False),
slot_def.get("multivalued", False),
slot_def.get("identifier", False),
slot_def.get("inlined"),
slot_def.get("inlined_as_list"),
slot_def.get("pattern"),
slot_def.get("description"),
slot_def.get("comments", []),
self._to_jsonb(slot_def.get("examples")),
content
)
self.slot_id_map[slot_name] = slot_id
count += 1
except Exception as e:
print(f" Warning: Error loading slot {yaml_file.name}: {e}")
print(f" Loaded {count} slots")
return count
async def _load_enums(self, enums_dir: Path) -> dict[str, int]:
"""Load all enum definitions from directory."""
enum_count = 0
value_count = 0
for yaml_file in sorted(enums_dir.glob("*.yaml")):
try:
with open(yaml_file, "r", encoding="utf-8") as f:
content = f.read()
data = yaml.safe_load(content)
if not data or "enums" not in data:
continue
for enum_name, enum_def in data.get("enums", {}).items():
if enum_def is None:
enum_def = {}
enum_id = await self.conn.fetchval(
"""
INSERT INTO linkml_enums (
version_id, enum_name, enum_id, title,
description, comments, yaml_content
) VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id
""",
self.version_id,
enum_name,
data.get("id", f"https://nde.nl/ontology/hc/enum/{enum_name}"),
data.get("title"),
data.get("description") or enum_def.get("description"),
enum_def.get("comments", []),
content
)
self.enum_id_map[enum_name] = enum_id
enum_count += 1
# Load permissible values
permissible_values = enum_def.get("permissible_values", {})
for order, (value_name, value_def) in enumerate(permissible_values.items()):
if value_def is None:
value_def = {}
await self.conn.execute(
"""
INSERT INTO linkml_enum_values (
enum_id, value_name, meaning, description, comments, value_order
) VALUES ($1, $2, $3, $4, $5, $6)
""",
enum_id,
value_name,
value_def.get("meaning"),
value_def.get("description"),
value_def.get("comments", []),
order
)
value_count += 1
except Exception as e:
print(f" Warning: Error loading enum {yaml_file.name}: {e}")
print(f" Loaded {enum_count} enums with {value_count} values")
return {"enums": enum_count, "values": value_count}
async def _load_classes(self, classes_dir: Path) -> int:
"""Load all class definitions from directory."""
count = 0
class_data: dict[str, tuple[dict, str]] = {} # class_name -> (class_def, yaml_content)
# First pass: load all class definitions
for yaml_file in sorted(classes_dir.glob("*.yaml")):
try:
with open(yaml_file, "r", encoding="utf-8") as f:
content = f.read()
data = yaml.safe_load(content)
if not data or "classes" not in data:
continue
for class_name, class_def in data.get("classes", {}).items():
if class_def is None:
class_def = {}
class_data[class_name] = (class_def, content, data.get("id"))
except Exception as e:
print(f" Warning: Error parsing class file {yaml_file.name}: {e}")
# Second pass: insert classes
for class_name, (class_def, content, file_id) in class_data.items():
try:
class_id = await self.conn.fetchval(
"""
INSERT INTO linkml_classes (
version_id, class_name, class_id, title, is_a, class_uri, abstract,
description, comments, exact_mappings, close_mappings,
broad_mappings, narrow_mappings, yaml_content
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
RETURNING id
""",
self.version_id,
class_name,
file_id or f"https://nde.nl/ontology/hc/class/{class_name}",
class_def.get("title"),
class_def.get("is_a"),
class_def.get("class_uri"),
class_def.get("abstract", False),
class_def.get("description"),
class_def.get("comments", []),
class_def.get("exact_mappings", []),
class_def.get("close_mappings", []),
class_def.get("broad_mappings", []),
class_def.get("narrow_mappings", []),
content
)
self.class_id_map[class_name] = class_id
count += 1
# Link class to its slots
slots = class_def.get("slots", [])
slot_usage = class_def.get("slot_usage", {})
for order, slot_name in enumerate(slots):
slot_id = self.slot_id_map.get(slot_name)
if slot_id:
await self.conn.execute(
"""
INSERT INTO linkml_class_slots (
version_id, class_id, slot_id, slot_usage, slot_order
) VALUES ($1, $2, $3, $4, $5)
""",
self.version_id,
class_id,
slot_id,
self._to_jsonb(slot_usage.get(slot_name)),
order
)
except Exception as e:
print(f" Warning: Error loading class {class_name}: {e}")
print(f" Loaded {count} classes")
return count
def _to_jsonb(self, data: Any) -> str | None:
"""Convert Python object to JSONB-compatible string."""
if data is None:
return None
import json
return json.dumps(data)
async def main():
    """CLI entry point: parse arguments and load a LinkML schema into Postgres."""
    parser = argparse.ArgumentParser(
        description="Load LinkML schemas into PostgreSQL database"
    )
    parser.add_argument(
        "--version", default=DEFAULT_VERSION,
        help=f"Schema version (default: {DEFAULT_VERSION})"
    )
    parser.add_argument(
        "--schema-dir", default=DEFAULT_SCHEMA_DIR,
        help=f"Path to LinkML schema directory (default: {DEFAULT_SCHEMA_DIR})"
    )
    parser.add_argument(
        "--schema-name", default=DEFAULT_SCHEMA_NAME,
        help=f"Schema name (default: {DEFAULT_SCHEMA_NAME})"
    )
    parser.add_argument(
        "--database-url", default=DATABASE_URL,
        help="PostgreSQL connection URL (or set DATABASE_URL env var)"
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Parse files but don't write to database"
    )
    opts = parser.parse_args()

    schema_dir = Path(opts.schema_dir)
    if not schema_dir.exists():
        print(f"Error: Schema directory not found: {schema_dir}")
        sys.exit(1)

    # Hide credentials when echoing the connection target.
    db_url = opts.database_url
    shown_db = db_url.split('@')[-1] if '@' in db_url else db_url
    print(f"Loading LinkML schema from: {schema_dir}")
    print(f"Version: {opts.version}")
    print(f"Database: {shown_db}")
    print()

    if opts.dry_run:
        print("DRY RUN - not writing to database")
        # Just count the YAML files per module; no database access at all.
        stats = {"classes": 0, "slots": 0, "enums": 0}
        for section in ("classes", "slots", "enums"):
            module_dir = schema_dir / "modules" / section
            if module_dir.exists():
                stats[section] = len(list(module_dir.glob("*.yaml")))
        print(f"\nFound: {stats['classes']} class files, {stats['slots']} slot files, {stats['enums']} enum files")
        return

    # Connect to database
    try:
        connection = await asyncpg.connect(db_url)
    except Exception as e:
        print(f"Error connecting to database: {e}")
        sys.exit(1)

    try:
        # All inserts happen inside a single transaction.
        async with connection.transaction():
            loader = LinkMLLoader(connection, opts.version, opts.schema_name)
            stats = await loader.load_schema(schema_dir)
        print()
        print("=" * 60)
        print("LinkML Schema Loaded Successfully!")
        print("=" * 60)
        print(f" Classes: {stats['classes']}")
        print(f" Slots: {stats['slots']}")
        print(f" Enums: {stats['enums']}")
        print(f" Enum Values: {stats['enum_values']}")
        print()
        print("Query with:")
        print(" SELECT * FROM linkml_schema_stats;")
        print(" SELECT * FROM linkml_current_classes;")
        print(" SELECT * FROM search_linkml_schema('Custodian');")
    except Exception as e:
        print(f"Error loading schema: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        await connection.close()
if __name__ == "__main__":
    # Script entry point: drive the async loader to completion.
    asyncio.run(main())