""" SPARQL Language Server Protocol (SPARQL-LSP) A JSON-RPC based protocol for providing language intelligence for SPARQL queries against the Heritage Custodian ontology. Designed like the Language Server Protocol to be reusable across different AI agents, IDEs, and tools. Architecture: ┌─────────────────┐ JSON-RPC ┌─────────────────────┐ │ AI Agent │◄──────────────────►│ SPARQL-LSP │ │ (Client) │ │ Server │ └─────────────────┘ └─────────────────────┘ │ ┌─────────────────┐ JSON-RPC ┌────────▼────────────┐ │ IDE/Editor │◄──────────────────►│ Knowledge Sources │ │ (Client) │ │ - SHACL Shapes │ └─────────────────┘ │ - LinkML Schema │ │ - TypeDB Rules │ ┌─────────────────┐ JSON-RPC │ - SPARQL Endpoint │ │ Web UI │◄──────────────────►└─────────────────────┘ │ (Client) │ └─────────────────┘ LSP Methods Implemented: - initialize: Server capabilities handshake - textDocument/publishDiagnostics: SHACL-based validation errors - textDocument/completion: Prefix, class, property completion - textDocument/hover: Documentation on hover - textDocument/signatureHelp: Function signatures - sparql/execute: Execute query and return results - sparql/explain: Explain what a query does - sparql/suggest: Suggest novel connections from vector DB Author: Heritage Custodian Ontology Project Date: 2025-12-27 Protocol Version: 1.0.0 """ import json import logging import re import uuid from dataclasses import dataclass, field, asdict from enum import Enum, IntEnum from typing import Any, Dict, List, Optional, Union, Callable, Sequence from abc import ABC, abstractmethod logger = logging.getLogger(__name__) # ============================================================================= # JSON-RPC Protocol Types (LSP Standard) # ============================================================================= class ErrorCode(IntEnum): """Standard JSON-RPC and LSP error codes.""" # JSON-RPC errors ParseError = -32700 InvalidRequest = -32600 MethodNotFound = -32601 InvalidParams = -32602 InternalError = -32603 # LSP errors ServerNotInitialized = -32002 UnknownErrorCode = -32001 RequestCancelled = -32800 ContentModified = -32801 @dataclass class Position: """Position in a text document (0-indexed).""" line: int character: int @dataclass class Range: """Range in a text document.""" start: Position end: Position @dataclass class Location: """Location in a document.""" uri: str range: Range @dataclass class TextDocumentIdentifier: """Identifies a text document.""" uri: str @dataclass class TextDocumentItem: """Text document with content.""" uri: str languageId: str version: int text: str class DiagnosticSeverity(IntEnum): """Diagnostic severity levels.""" Error = 1 Warning = 2 Information = 3 Hint = 4 @dataclass class Diagnostic: """Represents a diagnostic (error, warning, etc.).""" range: Range message: str severity: DiagnosticSeverity = DiagnosticSeverity.Error code: Optional[str] = None source: str = "sparql-lsp" relatedInformation: Optional[List[Dict]] = None def to_dict(self) -> Dict[str, Any]: return { "range": { "start": {"line": self.range.start.line, "character": self.range.start.character}, "end": {"line": self.range.end.line, "character": self.range.end.character}, }, "message": self.message, "severity": self.severity, "code": self.code, "source": self.source, } class CompletionItemKind(IntEnum): """Completion item kinds.""" Text = 1 Method = 2 Function = 3 Constructor = 4 Field = 5 Variable = 6 Class = 7 Interface = 8 Module = 9 Property = 10 Unit = 11 Value = 12 Enum = 13 Keyword = 14 Snippet = 15 Color = 16 File = 17 Reference = 18 Folder = 19 EnumMember = 20 Constant = 21 Struct = 22 Event = 23 Operator = 24 TypeParameter = 25 @dataclass class CompletionItem: """Completion item returned by completion requests.""" label: str kind: CompletionItemKind detail: Optional[str] = None documentation: Optional[str] = None insertText: Optional[str] = None insertTextFormat: int = 1 # 1 = PlainText, 2 = Snippet def to_dict(self) -> Dict[str, Any]: result: Dict[str, Any] = { "label": self.label, "kind": self.kind, } if self.detail: result["detail"] = self.detail if self.documentation: result["documentation"] = {"kind": "markdown", "value": self.documentation} if self.insertText: result["insertText"] = self.insertText result["insertTextFormat"] = self.insertTextFormat return result @dataclass class Hover: """Hover information.""" contents: str # Markdown content range: Optional[Range] = None def to_dict(self) -> Dict[str, Any]: result: Dict[str, Any] = {"contents": {"kind": "markdown", "value": self.contents}} if self.range: result["range"] = { "start": {"line": self.range.start.line, "character": self.range.start.character}, "end": {"line": self.range.end.line, "character": self.range.end.character}, } return result @dataclass class SignatureInformation: """Signature information for a function.""" label: str documentation: Optional[str] = None parameters: Optional[List[Dict[str, Any]]] = None def to_dict(self) -> Dict[str, Any]: result: Dict[str, Any] = {"label": self.label} if self.documentation: result["documentation"] = {"kind": "markdown", "value": self.documentation} if self.parameters: result["parameters"] = self.parameters return result @dataclass class SignatureHelp: """Signature help result.""" signatures: List[SignatureInformation] activeSignature: int = 0 activeParameter: int = 0 def to_dict(self) -> Dict[str, Any]: return { "signatures": [s.to_dict() for s in self.signatures], "activeSignature": self.activeSignature, "activeParameter": self.activeParameter, } # ============================================================================= # SPARQL-LSP Specific Types # ============================================================================= @dataclass class SPARQLExecuteResult: """Result of executing a SPARQL query.""" success: bool results: Optional[Dict] = None error: Optional[str] = None executionTimeMs: Optional[float] = None def to_dict(self) -> Dict[str, Any]: return { "success": self.success, "results": self.results, "error": self.error, "executionTimeMs": self.executionTimeMs, } @dataclass class SPARQLExplanation: """Explanation of what a SPARQL query does.""" summary: str steps: List[str] estimatedComplexity: str # "simple", "moderate", "complex" suggestedOptimizations: Optional[List[str]] = None def to_dict(self) -> Dict[str, Any]: return { "summary": self.summary, "steps": self.steps, "estimatedComplexity": self.estimatedComplexity, "suggestedOptimizations": self.suggestedOptimizations, } @dataclass class SPARQLSuggestion: """Novel connection suggestion from vector DB.""" type: str # "relationship", "entity", "pattern" description: str sparqlFragment: str confidence: float source: str # "qdrant", "typedb", "inference" def to_dict(self) -> Dict[str, Any]: return asdict(self) # ============================================================================= # Knowledge Base (SHACL, LinkML, TypeDB) # ============================================================================= class OntologyKnowledgeBase: """ Knowledge base for SPARQL-LSP, derived from: - SHACL shapes (validation rules) - LinkML schema (class/property definitions) - TypeDB rules (inference patterns) """ # Prefixes PREFIXES: Dict[str, Dict[str, str]] = { "hc": { "uri": "https://nde.nl/ontology/hc/class/", "description": "Heritage Custodian classes", "example": "hc:Custodian", }, "hcp": { "uri": "https://nde.nl/ontology/hc/", "description": "Heritage Custodian properties", "example": "hcp:institutionType", }, "schema": { "uri": "http://schema.org/", "description": "Schema.org vocabulary", "example": "schema:name", }, "skos": { "uri": "http://www.w3.org/2004/02/skos/core#", "description": "SKOS vocabulary for labels", "example": "skos:prefLabel", }, "rdfs": { "uri": "http://www.w3.org/2000/01/rdf-schema#", "description": "RDF Schema", "example": "rdfs:label", }, "wd": { "uri": "http://www.wikidata.org/entity/", "description": "Wikidata entities", "example": "wd:Q55 (Netherlands)", }, "wdt": { "uri": "http://www.wikidata.org/prop/direct/", "description": "Wikidata direct properties", "example": "wdt:P17 (country)", }, "foaf": { "uri": "http://xmlns.com/foaf/0.1/", "description": "FOAF vocabulary", "example": "foaf:name", }, "dct": { "uri": "http://purl.org/dc/terms/", "description": "Dublin Core Terms", "example": "dct:description", }, } # Classes (from SHACL NodeShapes) CLASSES: Dict[str, Dict[str, Any]] = { "hc:Custodian": { "description": "Heritage custodian institution (museum, archive, library, etc.)", "properties": ["hcp:institutionType", "hcp:ghcid", "hcp:isil", "skos:prefLabel", "schema:addressCountry"], "example": "?s a hc:Custodian .", }, } # Properties (from SHACL PropertyShapes) PROPERTIES: Dict[str, Dict[str, Any]] = { "hcp:institutionType": { "description": "Single-letter institution type code", "domain": "hc:Custodian", "range": "xsd:string", "pattern": "^[MLAGSBREDONFHICUT]$", "values": { "M": "Museum", "L": "Library", "A": "Archive", "G": "Gallery", "S": "Collecting Society", "B": "Botanical/Zoo", "R": "Research Center", "E": "Education Provider", "O": "Official Institution", "D": "Digital Platform", "N": "NGO", "H": "Holy Site", "F": "Feature", "I": "Intangible Heritage", "C": "Corporation", "U": "Unknown", "T": "Trade Association", }, "example": '?s hcp:institutionType "M" .', }, "hcp:ghcid": { "description": "Global Heritage Custodian ID", "domain": "hc:Custodian", "range": "xsd:string", "pattern": "^[A-Z]{2}-[A-Z]{2,3}-[A-Z]{2,4}-[A-Z]-[A-Z0-9]+$", "example": '?s hcp:ghcid "NL-NH-AMS-M-RIJKS" .', }, "hcp:isil": { "description": "ISIL code (International Standard Identifier for Libraries)", "domain": "hc:Custodian", "range": "xsd:string", "pattern": "^[A-Z]{2}-[A-Za-z0-9]+$", "example": '?s hcp:isil "NL-AmRMA" .', }, "hcp:wikidataId": { "description": "Wikidata Q-number (without wd: prefix)", "domain": "hc:Custodian", "range": "xsd:string", "pattern": "^Q[0-9]+$", "example": '?s hcp:wikidataId "Q190804" .', }, "skos:prefLabel": { "description": "Preferred label/name of the institution", "domain": "hc:Custodian", "range": "xsd:string", "example": "?s skos:prefLabel ?name .", }, "schema:name": { "description": "Name of the institution", "domain": "hc:Custodian", "range": "xsd:string", "example": "?s schema:name ?name .", }, "schema:addressCountry": { "description": "Country as Wikidata entity URI", "domain": "hc:Custodian", "range": "IRI", "example": "?s schema:addressCountry wd:Q55 . # Netherlands", }, "schema:url": { "description": "Website URL", "domain": "hc:Custodian", "range": "xsd:anyURI", "example": "?s schema:url ?website .", }, } # SPARQL Keywords KEYWORDS: List[str] = [ "SELECT", "CONSTRUCT", "ASK", "DESCRIBE", "WHERE", "FILTER", "OPTIONAL", "UNION", "MINUS", "GROUP BY", "ORDER BY", "HAVING", "LIMIT", "OFFSET", "DISTINCT", "REDUCED", "AS", "BIND", "VALUES", "COUNT", "SUM", "AVG", "MIN", "MAX", "SAMPLE", "GROUP_CONCAT", "STR", "LANG", "LANGMATCHES", "DATATYPE", "BOUND", "IRI", "URI", "BNODE", "RAND", "ABS", "CEIL", "FLOOR", "ROUND", "CONCAT", "STRLEN", "UCASE", "LCASE", "ENCODE_FOR_URI", "CONTAINS", "STRSTARTS", "STRENDS", "STRBEFORE", "STRAFTER", "YEAR", "MONTH", "DAY", "HOURS", "MINUTES", "SECONDS", "TIMEZONE", "TZ", "NOW", "UUID", "STRUUID", "MD5", "SHA1", "SHA256", "SHA384", "SHA512", "COALESCE", "IF", "STRLANG", "STRDT", "SAMETERM", "ISIRI", "ISURI", "ISBLANK", "ISLITERAL", "ISNUMERIC", "REGEX", "REPLACE", "EXISTS", "NOT EXISTS", "PREFIX", "BASE", "FROM", "FROM NAMED", "GRAPH", "SERVICE", "SILENT", "IN", "NOT IN", "a", # shorthand for rdf:type ] # Province codes for filtering DUTCH_PROVINCES: Dict[str, str] = { "NH": "Noord-Holland", "ZH": "Zuid-Holland", "NB": "Noord-Brabant", "GE": "Gelderland", "UT": "Utrecht", "OV": "Overijssel", "LI": "Limburg", "FR": "Friesland", "GR": "Groningen", "DR": "Drenthe", "FL": "Flevoland", "ZE": "Zeeland", } # Country Wikidata IDs COUNTRIES: Dict[str, str] = { "Q55": "Netherlands", "Q17": "Japan", "Q213": "Czech Republic", "Q31": "Belgium", "Q40": "Austria", "Q183": "Germany", "Q145": "United Kingdom", "Q142": "France", "Q30": "United States", } # TypeDB Functions (inference rules) TYPEDB_FUNCTIONS: Dict[str, Dict[str, str]] = { "get-reconstructions-by-observation-name": { "parameters": "$name: string", "returns": "{ custodian-reconstruction }", "description": "Get all reconstructions derived from observations with given name", }, "get-high-confidence-observations": { "parameters": "", "returns": "{ custodian-observation }", "description": "Get observations that have multiple sources (high confidence)", }, "get-entity-names": { "parameters": "$recon: custodian-reconstruction", "returns": "{ string }", "description": "Get all observed names for a given reconstruction", }, "get-all-descendants": { "parameters": "$parent: custodian-reconstruction", "returns": "{ custodian-reconstruction }", "description": "Get all child organizations recursively", }, "get-name-successors": { "parameters": "$name: custodian-name", "returns": "{ custodian-name }", "description": "Get all successor names in temporal order", }, } # ============================================================================= # SPARQL-LSP Server # ============================================================================= class SPARQLLanguageServer: """ SPARQL Language Server implementing LSP-like protocol. Provides: - Diagnostics (SHACL-based validation) - Code completion (prefixes, classes, properties) - Hover information (documentation) - Signature help (SPARQL functions) - Query execution - Query explanation - Novel connection suggestions """ def __init__( self, sparql_endpoint: str = "https://bronhouder.nl/sparql", qdrant_host: Optional[str] = None, typedb_host: Optional[str] = None, ): self.sparql_endpoint = sparql_endpoint self.qdrant_host = qdrant_host self.typedb_host = typedb_host self.kb = OntologyKnowledgeBase() self.initialized = False self.documents: Dict[str, TextDocumentItem] = {} # Method handlers self._methods: Dict[str, Callable] = { "initialize": self._handle_initialize, "initialized": self._handle_initialized, "shutdown": self._handle_shutdown, "textDocument/didOpen": self._handle_did_open, "textDocument/didChange": self._handle_did_change, "textDocument/didClose": self._handle_did_close, "textDocument/completion": self._handle_completion, "textDocument/hover": self._handle_hover, "textDocument/signatureHelp": self._handle_signature_help, "sparql/validate": self._handle_validate, "sparql/execute": self._handle_execute, "sparql/explain": self._handle_explain, "sparql/suggest": self._handle_suggest, } # ========================================================================= # JSON-RPC Message Handling # ========================================================================= def handle_message(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ Handle incoming JSON-RPC message. Args: message: JSON-RPC request/notification Returns: JSON-RPC response (None for notifications) """ try: # Validate JSON-RPC format if "jsonrpc" not in message or message["jsonrpc"] != "2.0": return self._error_response(None, ErrorCode.InvalidRequest, "Invalid JSON-RPC version") method = message.get("method") params = message.get("params", {}) msg_id = message.get("id") # None for notifications if not method: return self._error_response(msg_id, ErrorCode.InvalidRequest, "Missing method") # Find handler handler = self._methods.get(method) if not handler: return self._error_response(msg_id, ErrorCode.MethodNotFound, f"Unknown method: {method}") # Check initialization if not self.initialized and method not in ("initialize", "initialized", "shutdown"): return self._error_response(msg_id, ErrorCode.ServerNotInitialized, "Server not initialized") # Call handler result = handler(params) # Return response (None for notifications) if msg_id is not None: return self._success_response(msg_id, result) return None except Exception as e: logger.exception(f"Error handling message: {e}") return self._error_response( message.get("id"), ErrorCode.InternalError, str(e) ) def _success_response(self, msg_id: Any, result: Any) -> Dict[str, Any]: """Create JSON-RPC success response.""" return { "jsonrpc": "2.0", "id": msg_id, "result": result, } def _error_response(self, msg_id: Any, code: ErrorCode, message: str) -> Dict[str, Any]: """Create JSON-RPC error response.""" return { "jsonrpc": "2.0", "id": msg_id, "error": { "code": code, "message": message, }, } def _notification(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]: """Create JSON-RPC notification (no id).""" return { "jsonrpc": "2.0", "method": method, "params": params, } # ========================================================================= # LSP Lifecycle Methods # ========================================================================= def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]: """Handle initialize request.""" self.initialized = True return { "capabilities": { "textDocumentSync": { "openClose": True, "change": 1, # Full sync }, "completionProvider": { "triggerCharacters": [":", "<", "?", "$", '"'], "resolveProvider": False, }, "hoverProvider": True, "signatureHelpProvider": { "triggerCharacters": ["(", ","], }, "diagnosticProvider": { "interFileDependencies": False, "workspaceDiagnostics": False, }, # Custom SPARQL capabilities "sparqlExecuteProvider": True, "sparqlExplainProvider": True, "sparqlSuggestProvider": True, }, "serverInfo": { "name": "sparql-lsp", "version": "1.0.0", }, } def _handle_initialized(self, params: Dict[str, Any]) -> None: """Handle initialized notification.""" logger.info("SPARQL-LSP server initialized") return None def _handle_shutdown(self, params: Dict[str, Any]) -> None: """Handle shutdown request.""" self.initialized = False return None # ========================================================================= # Document Sync Methods # ========================================================================= def _handle_did_open(self, params: Dict[str, Any]) -> None: """Handle textDocument/didOpen notification.""" doc = params.get("textDocument", {}) self.documents[doc["uri"]] = TextDocumentItem( uri=doc["uri"], languageId=doc.get("languageId", "sparql"), version=doc.get("version", 0), text=doc.get("text", ""), ) return None def _handle_did_change(self, params: Dict[str, Any]) -> None: """Handle textDocument/didChange notification.""" uri = params.get("textDocument", {}).get("uri") changes = params.get("contentChanges", []) if uri in self.documents and changes: # Full sync - take the whole text self.documents[uri].text = changes[0].get("text", "") self.documents[uri].version = params.get("textDocument", {}).get("version", 0) return None def _handle_did_close(self, params: Dict[str, Any]) -> None: """Handle textDocument/didClose notification.""" uri = params.get("textDocument", {}).get("uri") if uri in self.documents: del self.documents[uri] return None # ========================================================================= # Diagnostics (SHACL-based Validation) # ========================================================================= def _handle_validate(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle sparql/validate request. Returns diagnostics for a SPARQL query, plus auto-corrected version if applicable. """ uri = params.get("textDocument", {}).get("uri") doc = self.documents.get(uri) if uri else None text = params.get("text") or (doc.text if doc else "") # Import auto_correct here to avoid circular imports try: from .sparql_linter import auto_correct_sparql corrected_text, was_corrected = auto_correct_sparql(text) except ImportError: try: from sparql_linter import auto_correct_sparql corrected_text, was_corrected = auto_correct_sparql(text) except ImportError: corrected_text, was_corrected = text, False # Validate the ORIGINAL query to show what's wrong diagnostics = self._validate_sparql(text) result: Dict[str, Any] = { "uri": uri, "diagnostics": [d.to_dict() for d in diagnostics], } # Include corrected query if auto-correction was applied if was_corrected: result["corrected_query"] = corrected_text result["auto_corrected"] = True return result def _validate_sparql(self, text: str) -> List[Diagnostic]: """Validate SPARQL query and return diagnostics.""" diagnostics = [] lines = text.split("\n") # Check for deprecated prefixes/classes # NOTE: Only flag patterns that are ACTUALLY wrong in our triplestore. # DO NOT flag crm:E39_Actor - it works correctly (dual typing with hcc:Custodian) # DO NOT suggest hcp: prefix - our ontology uses hc: for BOTH classes and properties deprecated_patterns = [ (r"w3id\.org/heritage/custodian", "WRONG_PREFIX_URI", "Use https://nde.nl/ontology/hc/ prefix"), (r'institutionType\s+"Museum"', "WRONG_TYPE_VALUE", 'Use "M" instead of "Museum"'), (r'institutionType\s+"Library"', "WRONG_TYPE_VALUE", 'Use "L" instead of "Library"'), (r'institutionType\s+"Archive"', "WRONG_TYPE_VALUE", 'Use "A" instead of "Archive"'), (r'addressCountry\s+"NL"', "WRONG_COUNTRY_FORMAT", "Use wd:Q55 for Netherlands"), # NOTE: The following rules were REMOVED because they broke queries: # - crm:E39_Actor works correctly in our triplestore (dual typing) # - hc: prefix is used for BOTH classes and properties, hcp: is undefined # - Suggesting hc:Custodian is wrong - the correct class is hcc:Custodian ] for line_num, line in enumerate(lines): for pattern, code, message in deprecated_patterns: match = re.search(pattern, line, re.IGNORECASE) if match: diagnostics.append(Diagnostic( range=Range( start=Position(line_num, match.start()), end=Position(line_num, match.end()), ), message=message, severity=DiagnosticSeverity.Error, code=code, )) # Check for syntax issues open_braces = text.count("{") close_braces = text.count("}") if open_braces != close_braces: diagnostics.append(Diagnostic( range=Range(start=Position(0, 0), end=Position(0, 1)), message=f"Unbalanced braces: {open_braces} opening, {close_braces} closing", severity=DiagnosticSeverity.Error, code="SYNTAX_ERROR", )) # Check for SELECT without WHERE if re.search(r"\bSELECT\b", text, re.IGNORECASE) and not re.search(r"\bWHERE\b", text, re.IGNORECASE): diagnostics.append(Diagnostic( range=Range(start=Position(0, 0), end=Position(0, 6)), message="SELECT query missing WHERE clause", severity=DiagnosticSeverity.Error, code="MISSING_WHERE", )) # Warning for province filtering without URI pattern for code, name in self.kb.DUTCH_PROVINCES.items(): if name.lower() in text.lower() and f"NL-{code}" not in text: line_num = next((i for i, l in enumerate(lines) if name.lower() in l.lower()), 0) diagnostics.append(Diagnostic( range=Range(start=Position(line_num, 0), end=Position(line_num, len(lines[line_num]))), message=f"Province '{name}' - consider URI filtering", severity=DiagnosticSeverity.Warning, code="SUGGEST_URI_FILTER", )) return diagnostics # ========================================================================= # Completion # ========================================================================= def _handle_completion(self, params: Dict[str, Any]) -> Dict[str, Any]: """Handle textDocument/completion request.""" uri = params.get("textDocument", {}).get("uri") position = params.get("position", {}) doc = self.documents.get(uri) if uri else None text = doc.text if doc else "" line = position.get("line", 0) character = position.get("character", 0) # Get the current line and context lines = text.split("\n") current_line = lines[line] if line < len(lines) else "" prefix_text = current_line[:character] items = [] # Prefix completion (after PREFIX keyword) if re.search(r"PREFIX\s+\w*$", prefix_text, re.IGNORECASE): for prefix, info in self.kb.PREFIXES.items(): items.append(CompletionItem( label=prefix, kind=CompletionItemKind.Module, detail=info["description"], documentation=f"**URI:** `{info['uri']}`\n\n**Example:** `{info['example']}`", insertText=f"{prefix}: <{info['uri']}>", )) # Class completion (after "a " or "rdf:type") elif re.search(r"(\ba\s+|\brdf:type\s+)\w*$", prefix_text, re.IGNORECASE): for cls, info in self.kb.CLASSES.items(): items.append(CompletionItem( label=cls, kind=CompletionItemKind.Class, detail=info["description"], documentation=f"**Properties:** {', '.join(info['properties'])}\n\n**Example:**\n```sparql\n{info['example']}\n```", insertText=cls, )) # Property completion (after prefix like "hcp:") elif re.search(r"(hcp|schema|skos|rdfs|foaf|dct):\w*$", prefix_text): prefix_match = re.search(r"(hcp|schema|skos|rdfs|foaf|dct):(\w*)$", prefix_text) if prefix_match: prefix = prefix_match.group(1) for prop, info in self.kb.PROPERTIES.items(): if prop.startswith(f"{prefix}:"): prop_name = prop.split(":")[1] items.append(CompletionItem( label=prop_name, kind=CompletionItemKind.Property, detail=info["description"], documentation=f"**Range:** `{info['range']}`\n\n**Example:**\n```sparql\n{info['example']}\n```", insertText=prop_name, )) # Institution type value completion (after institutionType) elif re.search(r'institutionType\s+"?\w*$', prefix_text): for code, name in self.kb.PROPERTIES["hcp:institutionType"]["values"].items(): items.append(CompletionItem( label=f'"{code}"', kind=CompletionItemKind.Value, detail=name, documentation=f"Institution type code for **{name}**", insertText=f'"{code}"', )) # Country completion (after addressCountry) elif re.search(r"addressCountry\s+\w*$", prefix_text): for qid, name in self.kb.COUNTRIES.items(): items.append(CompletionItem( label=f"wd:{qid}", kind=CompletionItemKind.Value, detail=name, documentation=f"Wikidata entity for **{name}**", insertText=f"wd:{qid}", )) # Keyword completion elif re.search(r"\b[A-Z]*$", prefix_text): keyword_prefix = re.search(r"\b([A-Z]*)$", prefix_text) if keyword_prefix: prefix_str = keyword_prefix.group(1).upper() for kw in self.kb.KEYWORDS: if kw.upper().startswith(prefix_str): items.append(CompletionItem( label=kw, kind=CompletionItemKind.Keyword, detail="SPARQL keyword", insertText=kw, )) return { "isIncomplete": False, "items": [item.to_dict() for item in items[:50]], # Limit to 50 items } # ========================================================================= # Hover # ========================================================================= def _handle_hover(self, params: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Handle textDocument/hover request.""" uri = params.get("textDocument", {}).get("uri") position = params.get("position", {}) doc = self.documents.get(uri) if uri else None text = doc.text if doc else "" line = position.get("line", 0) character = position.get("character", 0) lines = text.split("\n") current_line = lines[line] if line < len(lines) else "" # Find word at position word_match = None for match in re.finditer(r"[\w:]+", current_line): if match.start() <= character <= match.end(): word_match = match break if not word_match: return None word = word_match.group() # Check if it's a class if word in self.kb.CLASSES: info = self.kb.CLASSES[word] return Hover( contents=f"### {word}\n\n{info['description']}\n\n**Properties:** {', '.join(info['properties'])}\n\n```sparql\n{info['example']}\n```", range=Range( start=Position(line, word_match.start()), end=Position(line, word_match.end()), ), ).to_dict() # Check if it's a property if word in self.kb.PROPERTIES: info = self.kb.PROPERTIES[word] content = f"### {word}\n\n{info['description']}\n\n**Range:** `{info['range']}`" if "values" in info: content += "\n\n**Valid values:**\n" for code, name in info["values"].items(): content += f"- `\"{code}\"` = {name}\n" content += f"\n\n```sparql\n{info['example']}\n```" return Hover( contents=content, range=Range( start=Position(line, word_match.start()), end=Position(line, word_match.end()), ), ).to_dict() # Check if it's a prefix prefix = word.rstrip(":") if prefix in self.kb.PREFIXES: info = self.kb.PREFIXES[prefix] return Hover( contents=f"### PREFIX {prefix}:\n\n{info['description']}\n\n**URI:** `{info['uri']}`\n\n**Example:** `{info['example']}`", range=Range( start=Position(line, word_match.start()), end=Position(line, word_match.end()), ), ).to_dict() # Check for Wikidata entity if word.startswith("wd:Q") or word.startswith("Q"): qid = word.replace("wd:", "") if qid in self.kb.COUNTRIES: return Hover( contents=f"### {word}\n\n**Country:** {self.kb.COUNTRIES[qid]}\n\nWikidata entity for country filtering.", range=Range( start=Position(line, word_match.start()), end=Position(line, word_match.end()), ), ).to_dict() return None # ========================================================================= # Signature Help # ========================================================================= def _handle_signature_help(self, params: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Handle textDocument/signatureHelp request.""" uri = params.get("textDocument", {}).get("uri") position = params.get("position", {}) doc = self.documents.get(uri) if uri else None text = doc.text if doc else "" line = position.get("line", 0) character = position.get("character", 0) lines = text.split("\n") current_line = lines[line] if line < len(lines) else "" prefix_text = current_line[:character] # SPARQL aggregate functions functions: Dict[str, Dict[str, Any]] = { "COUNT": { "signature": "COUNT(expression) or COUNT(DISTINCT expression)", "documentation": "Returns the count of bindings. Use DISTINCT to count unique values.", "parameters": [{"label": "expression", "documentation": "Variable or expression to count"}], }, "SUM": { "signature": "SUM(expression)", "documentation": "Returns the sum of numeric values.", "parameters": [{"label": "expression", "documentation": "Numeric variable or expression"}], }, "AVG": { "signature": "AVG(expression)", "documentation": "Returns the average of numeric values.", "parameters": [{"label": "expression", "documentation": "Numeric variable or expression"}], }, "MIN": { "signature": "MIN(expression)", "documentation": "Returns the minimum value.", "parameters": [{"label": "expression", "documentation": "Variable or expression"}], }, "MAX": { "signature": "MAX(expression)", "documentation": "Returns the maximum value.", "parameters": [{"label": "expression", "documentation": "Variable or expression"}], }, "FILTER": { "signature": "FILTER(condition)", "documentation": "Filters results based on a boolean condition.", "parameters": [{"label": "condition", "documentation": "Boolean expression (e.g., CONTAINS(?name, 'Museum'))"}], }, "CONTAINS": { "signature": "CONTAINS(string, substring)", "documentation": "Returns true if string contains substring. Use with STR() for URIs.", "parameters": [ {"label": "string", "documentation": "String to search in"}, {"label": "substring", "documentation": "String to search for"}, ], }, "STR": { "signature": "STR(term)", "documentation": "Converts a term (URI, literal) to its string representation.", "parameters": [{"label": "term", "documentation": "URI or literal to convert"}], }, "BIND": { "signature": "BIND(expression AS ?variable)", "documentation": "Binds the result of an expression to a variable.", "parameters": [ {"label": "expression", "documentation": "Expression to evaluate"}, {"label": "variable", "documentation": "Variable to bind to"}, ], }, } # Find which function we're in for func_name, func_info in functions.items(): pattern = rf"\b{func_name}\s*\($" if re.search(pattern, prefix_text, re.IGNORECASE): return SignatureHelp( signatures=[SignatureInformation( label=func_info["signature"], documentation=func_info["documentation"], parameters=func_info["parameters"], )], ).to_dict() return None # ========================================================================= # SPARQL Execution # ========================================================================= def _handle_execute(self, params: Dict[str, Any]) -> Dict[str, Any]: """Handle sparql/execute request.""" import time import httpx query = params.get("query", "") try: start_time = time.time() response = httpx.post( self.sparql_endpoint, content=query, headers={ "Content-Type": "application/sparql-query", "Accept": "application/json", }, timeout=30.0, ) execution_time = (time.time() - start_time) * 1000 if response.status_code == 200: return SPARQLExecuteResult( success=True, results=response.json(), executionTimeMs=execution_time, ).to_dict() else: return SPARQLExecuteResult( success=False, error=f"HTTP {response.status_code}: {response.text}", executionTimeMs=execution_time, ).to_dict() except Exception as e: return SPARQLExecuteResult( success=False, error=str(e), ).to_dict() # ========================================================================= # SPARQL Explanation # ========================================================================= def _handle_explain(self, params: Dict[str, Any]) -> Dict[str, Any]: """Handle sparql/explain request.""" query = params.get("query", "") steps = [] complexity = "simple" optimizations = [] # Analyze query structure if re.search(r"\bSELECT\b", query, re.IGNORECASE): steps.append("SELECT query - retrieves specific variables") elif re.search(r"\bCONSTRUCT\b", query, re.IGNORECASE): steps.append("CONSTRUCT query - builds RDF graph") complexity = "moderate" elif re.search(r"\bASK\b", query, re.IGNORECASE): steps.append("ASK query - returns boolean result") # Check for patterns if "a hc:Custodian" in query or "rdf:type hc:Custodian" in query: steps.append("Filters to heritage custodian institutions") if "hcp:institutionType" in query: type_match = re.search(r'institutionType\s+"([A-Z])"', query) if type_match: type_code = type_match.group(1) type_name = self.kb.PROPERTIES["hcp:institutionType"]["values"].get(type_code, "Unknown") steps.append(f"Filters by institution type: {type_name} ({type_code})") if "schema:addressCountry" in query: country_match = re.search(r"addressCountry\s+wd:(Q\d+)", query) if country_match: qid = country_match.group(1) country_name = self.kb.COUNTRIES.get(qid, "Unknown") steps.append(f"Filters by country: {country_name}") if re.search(r"FILTER.*CONTAINS.*STR.*NL-[A-Z]{2}", query): province_match = re.search(r"NL-([A-Z]{2})", query) if province_match: code = province_match.group(1) province_name = self.kb.DUTCH_PROVINCES.get(code, "Unknown") steps.append(f"Filters by Dutch province: {province_name} ({code})") if re.search(r"\bGROUP BY\b", query, re.IGNORECASE): steps.append("Groups results for aggregation") complexity = "moderate" if re.search(r"\bCOUNT\b", query, re.IGNORECASE): steps.append("Counts matching results") if re.search(r"\bOPTIONAL\b", query, re.IGNORECASE): steps.append("Includes optional patterns (may return nulls)") complexity = "moderate" if re.search(r"\bUNION\b", query, re.IGNORECASE): steps.append("Combines multiple patterns with UNION") complexity = "complex" if re.search(r"\bSERVICE\b", query, re.IGNORECASE): steps.append("Federated query to external endpoint") complexity = "complex" optimizations.append("Federated queries can be slow - consider caching results") # Generate summary summary = f"This SPARQL query {'retrieves' if 'SELECT' in query.upper() else 'processes'} data from the Heritage Custodian knowledge graph." return SPARQLExplanation( summary=summary, steps=steps if steps else ["Basic query pattern"], estimatedComplexity=complexity, suggestedOptimizations=optimizations if optimizations else None, ).to_dict() # ========================================================================= # Novel Connection Suggestions (Vector DB Integration) # ========================================================================= def _handle_suggest(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle sparql/suggest request. Uses vector database to suggest novel connections that could enhance the SPARQL query. """ query = params.get("query", "") context = params.get("context", "") suggestions: List[SPARQLSuggestion] = [] # Extract entities from query entities: List[str] = [] if "skos:prefLabel" in query or "schema:name" in query: # Query is looking for names - suggest related entities suggestions.append(SPARQLSuggestion( type="relationship", description="Consider adding organizational hierarchy", sparqlFragment="OPTIONAL { ?s schema:containedInPlace ?parent . ?parent skos:prefLabel ?parentName . }", confidence=0.7, source="inference", )) if "hcp:institutionType" in query: # Query filters by type - suggest cross-type relationships suggestions.append(SPARQLSuggestion( type="pattern", description="Find related institutions in same location", sparqlFragment=""" OPTIONAL { ?related a hc:Custodian ; schema:containedInPlace ?location . ?s schema:containedInPlace ?location . FILTER(?related != ?s) }""", confidence=0.6, source="inference", )) # If we have Qdrant configured, query for semantic suggestions if self.qdrant_host: # This would integrate with the actual Qdrant retriever suggestions.append(SPARQLSuggestion( type="entity", description="Semantically similar institutions found in vector index", sparqlFragment="# Use vector similarity to find: [entities from Qdrant]", confidence=0.8, source="qdrant", )) # If we have TypeDB configured, suggest inference patterns if self.typedb_host: suggestions.append(SPARQLSuggestion( type="relationship", description="TypeDB can infer: observation → reconstruction chains", sparqlFragment="# Consider TypeQL: get-reconstructions-by-observation-name($name)", confidence=0.9, source="typedb", )) return { "suggestions": [s.to_dict() for s in suggestions], } # ============================================================================= # Convenience Functions for AI Agent Integration # ============================================================================= def create_lsp_request(method: str, params: Dict[str, Any], request_id: Optional[int] = None) -> Dict[str, Any]: """ Create a JSON-RPC request for the SPARQL-LSP server. Args: method: LSP method name (e.g., "sparql/validate") params: Method parameters request_id: Optional request ID (None for notifications) Returns: JSON-RPC request dictionary """ request: Dict[str, Any] = { "jsonrpc": "2.0", "method": method, "params": params, } if request_id is not None: request["id"] = request_id return request def validate_sparql_query(query: str, server: Optional[SPARQLLanguageServer] = None) -> Dict[str, Any]: """ Convenience function to validate a SPARQL query. Args: query: SPARQL query string server: Optional LSP server instance (creates one if not provided) Returns: Validation result with diagnostics """ if server is None: server = SPARQLLanguageServer() server.initialized = True request = create_lsp_request("sparql/validate", {"text": query}, request_id=1) response = server.handle_message(request) if response is None: return {} result: Dict[str, Any] = response.get("result", {}) return result def get_sparql_completions(query: str, line: int, character: int, server: Optional[SPARQLLanguageServer] = None) -> List[Dict[str, Any]]: """ Convenience function to get completions for a SPARQL query. Args: query: SPARQL query string line: Line number (0-indexed) character: Character position (0-indexed) server: Optional LSP server instance Returns: List of completion items """ if server is None: server = SPARQLLanguageServer() server.initialized = True # Open document doc_uri = "inmemory://query.sparql" server.handle_message(create_lsp_request("textDocument/didOpen", { "textDocument": { "uri": doc_uri, "languageId": "sparql", "version": 1, "text": query, } })) # Get completions request = create_lsp_request("textDocument/completion", { "textDocument": {"uri": doc_uri}, "position": {"line": line, "character": character}, }, request_id=1) response = server.handle_message(request) if response is None: return [] result_dict: Dict[str, Any] = response.get("result", {}) items: List[Dict[str, Any]] = result_dict.get("items", []) return items # ============================================================================= # Example Usage # ============================================================================= if __name__ == "__main__": # Create server server = SPARQLLanguageServer() # Initialize init_response = server.handle_message({ "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {}, }) print("Initialize:", json.dumps(init_response, indent=2)) # Validate a query with issues bad_query = """ PREFIX hc: SELECT ?museum ?name WHERE { ?museum a crm:E39_Actor ; hc:institutionType "Museum" ; schema:addressCountry "NL" ; skos:prefLabel ?name . } """ validate_response = server.handle_message({ "jsonrpc": "2.0", "id": 2, "method": "sparql/validate", "params": {"text": bad_query}, }) print("\nValidation:", json.dumps(validate_response, indent=2)) # Get completions server.handle_message({ "jsonrpc": "2.0", "method": "textDocument/didOpen", "params": { "textDocument": { "uri": "test://query.sparql", "languageId": "sparql", "version": 1, "text": "SELECT ?s WHERE { ?s hcp:", } } }) completion_response = server.handle_message({ "jsonrpc": "2.0", "id": 3, "method": "textDocument/completion", "params": { "textDocument": {"uri": "test://query.sparql"}, "position": {"line": 0, "character": 25}, }, }) print("\nCompletions:", json.dumps(completion_response, indent=2)) # Explain a query good_query = """ PREFIX hc: PREFIX hcp: SELECT (COUNT(?s) as ?count) WHERE { ?s a hc:Custodian ; hcp:institutionType "M" ; schema:addressCountry wd:Q55 . FILTER(CONTAINS(STR(?s), "NL-NH")) } """ explain_response = server.handle_message({ "jsonrpc": "2.0", "id": 4, "method": "sparql/explain", "params": {"query": good_query}, }) print("\nExplanation:", json.dumps(explain_response, indent=2))