#!/usr/bin/env python3
"""Load LinkML schemas from YAML files into PostgreSQL database.

This script parses LinkML schema files (classes, slots, enums) and stores them
in the database for queryable access via the API.

Usage:
    python load_linkml_to_postgres.py [--version VERSION] [--schema-dir DIR]

Example:
    python load_linkml_to_postgres.py --version 20251121 --schema-dir schemas/20251121/linkml
"""

import argparse
import asyncio
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import asyncpg
import yaml

# NOTE(review): re, datetime and timezone appear unused in this file — kept
# for backward compatibility; confirm before removing.

# Defaults used when the corresponding CLI flags are omitted.
DEFAULT_SCHEMA_DIR = "schemas/20251121/linkml"
DEFAULT_VERSION = "20251121"
DEFAULT_SCHEMA_NAME = "heritage_custodian"

# Connection string, overridable via the DATABASE_URL environment variable.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://glam_api:glam_api_password@localhost:5432/glam"
)
|
|
|
|
|
|
class LinkMLLoader:
|
|
"""Loads LinkML schema files into PostgreSQL."""
|
|
|
|
def __init__(self, conn: asyncpg.Connection, version: str, schema_name: str):
|
|
self.conn = conn
|
|
self.version = version
|
|
self.schema_name = schema_name
|
|
self.version_id: int | None = None
|
|
self.class_id_map: dict[str, int] = {} # class_name -> id
|
|
self.slot_id_map: dict[str, int] = {} # slot_name -> id
|
|
self.enum_id_map: dict[str, int] = {} # enum_name -> id
|
|
|
|
async def load_schema(self, schema_dir: Path) -> dict[str, int]:
|
|
"""Load complete LinkML schema from directory.
|
|
|
|
Returns:
|
|
Dict with counts: classes, slots, enums, enum_values
|
|
"""
|
|
stats = {"classes": 0, "slots": 0, "enums": 0, "enum_values": 0, "prefixes": 0}
|
|
|
|
# Create schema version record
|
|
await self._create_schema_version(schema_dir)
|
|
|
|
# Load slots first (classes reference them)
|
|
slots_dir = schema_dir / "modules" / "slots"
|
|
if slots_dir.exists():
|
|
stats["slots"] = await self._load_slots(slots_dir)
|
|
|
|
# Load enums
|
|
enums_dir = schema_dir / "modules" / "enums"
|
|
if enums_dir.exists():
|
|
enum_stats = await self._load_enums(enums_dir)
|
|
stats["enums"] = enum_stats["enums"]
|
|
stats["enum_values"] = enum_stats["values"]
|
|
|
|
# Load classes
|
|
classes_dir = schema_dir / "modules" / "classes"
|
|
if classes_dir.exists():
|
|
stats["classes"] = await self._load_classes(classes_dir)
|
|
|
|
# Mark this version as current (unmark previous)
|
|
await self._set_current_version()
|
|
|
|
return stats
|
|
|
|
async def _create_schema_version(self, schema_dir: Path) -> None:
|
|
"""Create schema version record."""
|
|
# Check if version already exists
|
|
existing = await self.conn.fetchval(
|
|
"SELECT id FROM linkml_schema_versions WHERE version = $1",
|
|
self.version
|
|
)
|
|
|
|
if existing:
|
|
# Delete existing version and its data (cascade)
|
|
await self.conn.execute(
|
|
"DELETE FROM linkml_schema_versions WHERE id = $1",
|
|
existing
|
|
)
|
|
print(f" Replaced existing version {self.version}")
|
|
|
|
# Get git commit if in git repo
|
|
git_commit = None
|
|
try:
|
|
import subprocess
|
|
result = subprocess.run(
|
|
["git", "rev-parse", "HEAD"],
|
|
capture_output=True,
|
|
text=True,
|
|
cwd=schema_dir.parent.parent.parent
|
|
)
|
|
if result.returncode == 0:
|
|
git_commit = result.stdout.strip()[:40]
|
|
except Exception:
|
|
pass
|
|
|
|
# Insert new version
|
|
self.version_id = await self.conn.fetchval(
|
|
"""
|
|
INSERT INTO linkml_schema_versions
|
|
(version, schema_name, description, source_path, git_commit, loaded_by, is_current)
|
|
VALUES ($1, $2, $3, $4, $5, $6, FALSE)
|
|
RETURNING id
|
|
""",
|
|
self.version,
|
|
self.schema_name,
|
|
f"LinkML schema version {self.version} for Heritage Custodian Ontology",
|
|
str(schema_dir),
|
|
git_commit,
|
|
os.environ.get("USER", "unknown")
|
|
)
|
|
print(f" Created schema version: {self.version} (id={self.version_id})")
|
|
|
|
async def _set_current_version(self) -> None:
|
|
"""Mark this version as current, unmark others."""
|
|
await self.conn.execute(
|
|
"UPDATE linkml_schema_versions SET is_current = FALSE WHERE is_current = TRUE"
|
|
)
|
|
await self.conn.execute(
|
|
"UPDATE linkml_schema_versions SET is_current = TRUE WHERE id = $1",
|
|
self.version_id
|
|
)
|
|
print(f" Set version {self.version} as current")
|
|
|
|
async def _load_slots(self, slots_dir: Path) -> int:
|
|
"""Load all slot definitions from directory."""
|
|
count = 0
|
|
for yaml_file in sorted(slots_dir.glob("*.yaml")):
|
|
try:
|
|
with open(yaml_file, "r", encoding="utf-8") as f:
|
|
content = f.read()
|
|
data = yaml.safe_load(content)
|
|
|
|
if not data or "slots" not in data:
|
|
continue
|
|
|
|
for slot_name, slot_def in data.get("slots", {}).items():
|
|
if slot_def is None:
|
|
slot_def = {}
|
|
|
|
slot_id = await self.conn.fetchval(
|
|
"""
|
|
INSERT INTO linkml_slots (
|
|
version_id, slot_name, slot_id, range, slot_uri,
|
|
required, multivalued, identifier, inlined, inlined_as_list,
|
|
pattern, description, comments, examples, yaml_content
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
|
|
RETURNING id
|
|
""",
|
|
self.version_id,
|
|
slot_name,
|
|
data.get("id", f"https://nde.nl/ontology/hc/slot/{slot_name}"),
|
|
slot_def.get("range"),
|
|
slot_def.get("slot_uri"),
|
|
slot_def.get("required", False),
|
|
slot_def.get("multivalued", False),
|
|
slot_def.get("identifier", False),
|
|
slot_def.get("inlined"),
|
|
slot_def.get("inlined_as_list"),
|
|
slot_def.get("pattern"),
|
|
slot_def.get("description"),
|
|
slot_def.get("comments", []),
|
|
self._to_jsonb(slot_def.get("examples")),
|
|
content
|
|
)
|
|
self.slot_id_map[slot_name] = slot_id
|
|
count += 1
|
|
|
|
except Exception as e:
|
|
print(f" Warning: Error loading slot {yaml_file.name}: {e}")
|
|
|
|
print(f" Loaded {count} slots")
|
|
return count
|
|
|
|
async def _load_enums(self, enums_dir: Path) -> dict[str, int]:
|
|
"""Load all enum definitions from directory."""
|
|
enum_count = 0
|
|
value_count = 0
|
|
|
|
for yaml_file in sorted(enums_dir.glob("*.yaml")):
|
|
try:
|
|
with open(yaml_file, "r", encoding="utf-8") as f:
|
|
content = f.read()
|
|
data = yaml.safe_load(content)
|
|
|
|
if not data or "enums" not in data:
|
|
continue
|
|
|
|
for enum_name, enum_def in data.get("enums", {}).items():
|
|
if enum_def is None:
|
|
enum_def = {}
|
|
|
|
enum_id = await self.conn.fetchval(
|
|
"""
|
|
INSERT INTO linkml_enums (
|
|
version_id, enum_name, enum_id, title,
|
|
description, comments, yaml_content
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7)
|
|
RETURNING id
|
|
""",
|
|
self.version_id,
|
|
enum_name,
|
|
data.get("id", f"https://nde.nl/ontology/hc/enum/{enum_name}"),
|
|
data.get("title"),
|
|
data.get("description") or enum_def.get("description"),
|
|
enum_def.get("comments", []),
|
|
content
|
|
)
|
|
self.enum_id_map[enum_name] = enum_id
|
|
enum_count += 1
|
|
|
|
# Load permissible values
|
|
permissible_values = enum_def.get("permissible_values", {})
|
|
for order, (value_name, value_def) in enumerate(permissible_values.items()):
|
|
if value_def is None:
|
|
value_def = {}
|
|
|
|
await self.conn.execute(
|
|
"""
|
|
INSERT INTO linkml_enum_values (
|
|
enum_id, value_name, meaning, description, comments, value_order
|
|
) VALUES ($1, $2, $3, $4, $5, $6)
|
|
""",
|
|
enum_id,
|
|
value_name,
|
|
value_def.get("meaning"),
|
|
value_def.get("description"),
|
|
value_def.get("comments", []),
|
|
order
|
|
)
|
|
value_count += 1
|
|
|
|
except Exception as e:
|
|
print(f" Warning: Error loading enum {yaml_file.name}: {e}")
|
|
|
|
print(f" Loaded {enum_count} enums with {value_count} values")
|
|
return {"enums": enum_count, "values": value_count}
|
|
|
|
async def _load_classes(self, classes_dir: Path) -> int:
|
|
"""Load all class definitions from directory."""
|
|
count = 0
|
|
class_data: dict[str, tuple[dict, str]] = {} # class_name -> (class_def, yaml_content)
|
|
|
|
# First pass: load all class definitions
|
|
for yaml_file in sorted(classes_dir.glob("*.yaml")):
|
|
try:
|
|
with open(yaml_file, "r", encoding="utf-8") as f:
|
|
content = f.read()
|
|
data = yaml.safe_load(content)
|
|
|
|
if not data or "classes" not in data:
|
|
continue
|
|
|
|
for class_name, class_def in data.get("classes", {}).items():
|
|
if class_def is None:
|
|
class_def = {}
|
|
class_data[class_name] = (class_def, content, data.get("id"))
|
|
|
|
except Exception as e:
|
|
print(f" Warning: Error parsing class file {yaml_file.name}: {e}")
|
|
|
|
# Second pass: insert classes
|
|
for class_name, (class_def, content, file_id) in class_data.items():
|
|
try:
|
|
class_id = await self.conn.fetchval(
|
|
"""
|
|
INSERT INTO linkml_classes (
|
|
version_id, class_name, class_id, title, is_a, class_uri, abstract,
|
|
description, comments, exact_mappings, close_mappings,
|
|
broad_mappings, narrow_mappings, yaml_content
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
|
|
RETURNING id
|
|
""",
|
|
self.version_id,
|
|
class_name,
|
|
file_id or f"https://nde.nl/ontology/hc/class/{class_name}",
|
|
class_def.get("title"),
|
|
class_def.get("is_a"),
|
|
class_def.get("class_uri"),
|
|
class_def.get("abstract", False),
|
|
class_def.get("description"),
|
|
class_def.get("comments", []),
|
|
class_def.get("exact_mappings", []),
|
|
class_def.get("close_mappings", []),
|
|
class_def.get("broad_mappings", []),
|
|
class_def.get("narrow_mappings", []),
|
|
content
|
|
)
|
|
self.class_id_map[class_name] = class_id
|
|
count += 1
|
|
|
|
# Link class to its slots
|
|
slots = class_def.get("slots", [])
|
|
slot_usage = class_def.get("slot_usage", {})
|
|
|
|
for order, slot_name in enumerate(slots):
|
|
slot_id = self.slot_id_map.get(slot_name)
|
|
if slot_id:
|
|
await self.conn.execute(
|
|
"""
|
|
INSERT INTO linkml_class_slots (
|
|
version_id, class_id, slot_id, slot_usage, slot_order
|
|
) VALUES ($1, $2, $3, $4, $5)
|
|
""",
|
|
self.version_id,
|
|
class_id,
|
|
slot_id,
|
|
self._to_jsonb(slot_usage.get(slot_name)),
|
|
order
|
|
)
|
|
|
|
except Exception as e:
|
|
print(f" Warning: Error loading class {class_name}: {e}")
|
|
|
|
print(f" Loaded {count} classes")
|
|
return count
|
|
|
|
def _to_jsonb(self, data: Any) -> str | None:
|
|
"""Convert Python object to JSONB-compatible string."""
|
|
if data is None:
|
|
return None
|
|
import json
|
|
return json.dumps(data)
|
|
|
|
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser for this script."""
    parser = argparse.ArgumentParser(
        description="Load LinkML schemas into PostgreSQL database"
    )
    parser.add_argument(
        "--version",
        default=DEFAULT_VERSION,
        help=f"Schema version (default: {DEFAULT_VERSION})"
    )
    parser.add_argument(
        "--schema-dir",
        default=DEFAULT_SCHEMA_DIR,
        help=f"Path to LinkML schema directory (default: {DEFAULT_SCHEMA_DIR})"
    )
    parser.add_argument(
        "--schema-name",
        default=DEFAULT_SCHEMA_NAME,
        help=f"Schema name (default: {DEFAULT_SCHEMA_NAME})"
    )
    parser.add_argument(
        "--database-url",
        default=DATABASE_URL,
        help="PostgreSQL connection URL (or set DATABASE_URL env var)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Parse files but don't write to database"
    )
    return parser


def _report_dry_run(schema_dir: Path) -> None:
    """Count schema YAML files per module without touching the database."""
    print("DRY RUN - not writing to database")
    # Just parse and count
    stats = {"classes": 0, "slots": 0, "enums": 0}
    for subdir, key in [("classes", "classes"), ("slots", "slots"), ("enums", "enums")]:
        path = schema_dir / "modules" / subdir
        if path.exists():
            stats[key] = len(list(path.glob("*.yaml")))
    print(f"\nFound: {stats['classes']} class files, {stats['slots']} slot files, {stats['enums']} enum files")


async def main():
    """Entry point: parse arguments, connect, and load the schema transactionally."""
    args = _build_parser().parse_args()

    schema_dir = Path(args.schema_dir)
    if not schema_dir.exists():
        print(f"Error: Schema directory not found: {schema_dir}")
        sys.exit(1)

    print(f"Loading LinkML schema from: {schema_dir}")
    print(f"Version: {args.version}")
    # Show only the host/database part so credentials are not echoed.
    print(f"Database: {args.database_url.split('@')[-1] if '@' in args.database_url else args.database_url}")
    print()

    if args.dry_run:
        _report_dry_run(schema_dir)
        return

    # Connect to database
    try:
        conn = await asyncpg.connect(args.database_url)
    except Exception as e:
        print(f"Error connecting to database: {e}")
        sys.exit(1)

    try:
        # Run in a single transaction so a failed load leaves no partial data.
        async with conn.transaction():
            loader = LinkMLLoader(conn, args.version, args.schema_name)
            stats = await loader.load_schema(schema_dir)

        print()
        print("=" * 60)
        print("LinkML Schema Loaded Successfully!")
        print("=" * 60)
        print(f" Classes: {stats['classes']}")
        print(f" Slots: {stats['slots']}")
        print(f" Enums: {stats['enums']}")
        print(f" Enum Values: {stats['enum_values']}")
        print()
        print("Query with:")
        print(" SELECT * FROM linkml_schema_stats;")
        print(" SELECT * FROM linkml_current_classes;")
        print(" SELECT * FROM search_linkml_schema('Custodian');")

    except Exception as e:
        print(f"Error loading schema: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        await conn.close()
|
|
|
|
|
|
# Script entry point: drive the async loader on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
|