diff --git a/backend/rag/dspy_heritage_rag.py b/backend/rag/dspy_heritage_rag.py index bf8b983d92..07b55295c8 100644 --- a/backend/rag/dspy_heritage_rag.py +++ b/backend/rag/dspy_heritage_rag.py @@ -21,6 +21,7 @@ import asyncio import json import logging import random +import re from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum @@ -166,6 +167,7 @@ get_cacheable_sparql_docstring: Optional[Callable[[], str]] = None get_cacheable_entity_docstring: Optional[Callable[[], str]] = None get_cacheable_query_intent_docstring: Optional[Callable[[], str]] = None get_cacheable_answer_docstring: Optional[Callable[[], str]] = None +get_cacheable_person_sparql_docstring: Optional[Callable[[], str]] = None try: from .schema_loader import ( @@ -184,6 +186,7 @@ try: get_cacheable_entity_docstring as _get_cacheable_entity_docstring, get_cacheable_query_intent_docstring as _get_cacheable_query_intent_docstring, get_cacheable_answer_docstring as _get_cacheable_answer_docstring, + get_cacheable_person_sparql_docstring as _get_cacheable_person_sparql_docstring, ) get_heritage_schema = _get_heritage_schema get_sparql_prefixes = _get_sparql_prefixes @@ -200,6 +203,7 @@ try: get_cacheable_entity_docstring = _get_cacheable_entity_docstring get_cacheable_query_intent_docstring = _get_cacheable_query_intent_docstring get_cacheable_answer_docstring = _get_cacheable_answer_docstring + get_cacheable_person_sparql_docstring = _get_cacheable_person_sparql_docstring SCHEMA_LOADER_AVAILABLE = True except ImportError: logger.info("Schema loader not available - using static signatures") @@ -473,6 +477,123 @@ class HeritageSPARQLGenerator(dspy.Signature): explanation: str = dspy.OutputField(desc="What the query retrieves") +class HeritagePersonSPARQLGenerator(dspy.Signature): + """Generate SPARQL queries for heritage person/staff queries. + + You are an expert in SPARQL and the Heritage Person data model. 
+ Generate valid SPARQL queries for finding people in heritage institutions. + + REQUIRED PREFIXES: + PREFIX schema: + PREFIX foaf: + PREFIX hc: + + MAIN CLASS: + - schema:Person - Person records + + KEY PROPERTIES: + - schema:name - Person's full name (REQUIRED in SELECT) + - schema:jobTitle - Job title with embedded organization (e.g., "Manager bij Nationaal Archief") + - foaf:name - Alternative name field + - hc:custodianName - Associated institution name (may be empty) + + CRITICAL PATTERN: + Organization names are often embedded IN the jobTitle, not in a separate field. + Use FILTER(CONTAINS(LCASE(?jobTitle), "organization name")) to find people at specific organizations. + + ROLE TERMS (use in FILTER patterns with OR combinations): + + Leadership (English): director, executive director, CEO, deputy director, assistant director, + head, chief, manager, team lead, coordinator, supervisor + Leadership (Dutch): directeur, adjunct-directeur, hoofd, manager, teamleider, teammanager, + coördinator, leidinggevende, afdelingshoofd + + Governance (English): chair, chairman, chairperson, president, vice president, secretary, + treasurer, board member, trustee + Governance (Dutch): voorzitter, vice-voorzitter, secretaris, penningmeester, bestuurslid, + bestuursvoorzitter + + Curatorial (English): curator, senior curator, chief curator, collections manager, + registrar, conservator + Curatorial (Dutch): conservator, collectiebeheerder, registrar + + Archival (English): archivist, senior archivist, digital archivist, records manager, + archival manager, processing archivist + Archival (Dutch): archivaris, archiefmedewerker, informatiespecialist + + Library (English): librarian, chief librarian, reference librarian, cataloger + Library (Dutch): bibliothecaris, catalogiseur + + Research (English): researcher, historian, genealogist, research fellow + Research (Dutch): onderzoeker, historicus, genealoog + + Digital (English): digital preservation specialist, digitization 
specialist, data manager, + metadata specialist, developer, IT specialist + Digital (Dutch): digitaliseringsmedewerker, datamanager, ICT-medewerker + + Education (English): educator, education officer, tour guide, docent + Education (Dutch): educatiemedewerker, gids, rondleider + + ALWAYS EXCLUDE anonymous profiles: + FILTER(!CONTAINS(LCASE(?name), "linkedin member")) + + EXAMPLE QUERY - Find managers at Nationaal Archief: + ```sparql + PREFIX schema: + SELECT DISTINCT ?name ?jobTitle WHERE { + ?person a schema:Person ; + schema:name ?name ; + schema:jobTitle ?jobTitle . + FILTER(CONTAINS(LCASE(?jobTitle), "nationaal archief")) + FILTER(CONTAINS(LCASE(?jobTitle), "manager") || + CONTAINS(LCASE(?jobTitle), "hoofd") || + CONTAINS(LCASE(?jobTitle), "directeur") || + CONTAINS(LCASE(?jobTitle), "teamleider")) + FILTER(!CONTAINS(LCASE(?name), "linkedin member")) + } + ORDER BY ?name + LIMIT 50 + ``` + + EXAMPLE QUERY - Find all archivists: + ```sparql + PREFIX schema: + SELECT DISTINCT ?name ?jobTitle WHERE { + ?person a schema:Person ; + schema:name ?name ; + schema:jobTitle ?jobTitle . + FILTER(CONTAINS(LCASE(?jobTitle), "archiv") || + CONTAINS(LCASE(?jobTitle), "archivist")) + FILTER(!CONTAINS(LCASE(?name), "linkedin member")) + } + ORDER BY ?name + LIMIT 100 + ``` + + EXAMPLE QUERY - Find curators at a specific museum: + ```sparql + PREFIX schema: + SELECT DISTINCT ?name ?jobTitle WHERE { + ?person a schema:Person ; + schema:name ?name ; + schema:jobTitle ?jobTitle . 
+ FILTER(CONTAINS(LCASE(?jobTitle), "rijksmuseum")) + FILTER(CONTAINS(LCASE(?jobTitle), "curator") || + CONTAINS(LCASE(?jobTitle), "conservator")) + FILTER(!CONTAINS(LCASE(?name), "linkedin member")) + } + ORDER BY ?name + ``` + """ + + question: str = dspy.InputField(desc="Natural language question about people in heritage sector") + intent: str = dspy.InputField(desc="Query intent from classifier") + entities: list[str] = dspy.InputField(desc="Extracted entities (names, organizations, roles)", default=[]) + context: str = dspy.InputField(desc="Previous conversation context", default="") + + sparql: str = dspy.OutputField(desc="Valid SPARQL query for person data using schema:Person") + explanation: str = dspy.OutputField(desc="What the query retrieves and which roles/organizations are targeted") + class HeritageSQLGenerator(dspy.Signature): """Generate SQL queries for DuckLake heritage analytics database. @@ -809,6 +930,7 @@ CRITICAL LANGUAGE RULE: _schema_aware_sparql_signature = None _schema_aware_entity_signature = None _schema_aware_answer_signature = None +_schema_aware_person_sparql_signature = None def get_schema_aware_sparql_signature() -> type[dspy.Signature]: @@ -835,6 +957,48 @@ def get_schema_aware_answer_signature() -> type[dspy.Signature]: return _schema_aware_answer_signature +def _create_schema_aware_person_sparql_signature() -> type[dspy.Signature]: + """Factory to create Person SPARQL signature with schema-derived docstring. + + Uses LinkML schema to inject correct prefixes and properties from + PersonObservation slot_usage into the signature docstring. + + OpenAI Prompt Caching: Uses get_cacheable_person_sparql_docstring() which + prepends the full ontology context (1,200+ tokens) to ensure cache eligibility. 
+ """ + if not SCHEMA_LOADER_AVAILABLE or get_cacheable_person_sparql_docstring is None: + logger.warning("Schema loader unavailable, using static person SPARQL signature") + return HeritagePersonSPARQLGenerator + + try: + # Use cacheable docstring (1,500+ tokens) for OpenAI prompt caching + docstring = get_cacheable_person_sparql_docstring() + + class SchemaAwarePersonSPARQLGenerator(dspy.Signature): + __doc__ = docstring + + question: str = dspy.InputField(desc="Natural language question about people in heritage sector") + intent: str = dspy.InputField(desc="Query intent from classifier") + entities: list[str] = dspy.InputField(desc="Extracted entities (names, organizations, roles)", default=[]) + context: str = dspy.InputField(desc="Previous conversation context", default="") + + sparql: str = dspy.OutputField(desc="Valid SPARQL query for person data using schema:Person") + explanation: str = dspy.OutputField(desc="What the query retrieves and which roles/organizations are targeted") + + return SchemaAwarePersonSPARQLGenerator + except Exception as e: + logger.warning(f"Failed to create schema-aware person SPARQL signature: {e}") + return HeritagePersonSPARQLGenerator + + +def get_schema_aware_person_sparql_signature() -> type[dspy.Signature]: + """Get cached schema-aware person SPARQL signature.""" + global _schema_aware_person_sparql_signature + if _schema_aware_person_sparql_signature is None: + _schema_aware_person_sparql_signature = _create_schema_aware_person_sparql_signature() + return _schema_aware_person_sparql_signature + + def _create_schema_aware_query_intent_signature() -> type[dspy.Signature]: """Factory to create HeritageQueryIntent with schema-derived valid values. 
@@ -2923,12 +3088,21 @@ class HeritageRAGPipeline(dspy.Module): self.entity_extractor = dspy.Predict(HeritageEntityExtractor) # SPARQL generation - use schema-aware signature if available + # Institution SPARQL generator (crm:E39_Actor, hc:institutionType) if use_schema_aware and SCHEMA_LOADER_AVAILABLE: self.sparql_gen = dspy.ChainOfThought(get_schema_aware_sparql_signature()) logger.info("Using schema-aware SPARQL generator with LinkML-derived prefixes") else: self.sparql_gen = dspy.ChainOfThought(HeritageSPARQLGenerator) + # Person SPARQL generator - use schema-aware signature if available + if use_schema_aware and SCHEMA_LOADER_AVAILABLE: + self.person_sparql_gen = dspy.ChainOfThought(get_schema_aware_person_sparql_signature()) + logger.info("Using schema-aware Person SPARQL generator with LinkML-derived properties") + else: + self.person_sparql_gen = dspy.ChainOfThought(HeritagePersonSPARQLGenerator) + logger.info("Person SPARQL generator initialized with static predicates") + # Multi-hop retrieval (uses its own signatures internally) self.multi_hop = MultiHopHeritageRetriever(max_hops=max_hops) @@ -3089,20 +3263,32 @@ class HeritageRAGPipeline(dspy.Module): # Step 3: Generate SPARQL if needed (use resolved question) # Use fast_lm for SPARQL generation if available (performance optimization) + # Select the appropriate SPARQL generator based on entity_type: + # - person queries use HeritagePersonSPARQLGenerator (schema:Person predicates) + # - institution queries use HeritageSPARQLGenerator (crm:E39_Actor predicates) sparql = None if "sparql" in routing.sources: + # Select SPARQL generator based on entity_type from router + entity_type = getattr(routing, 'entity_type', 'institution') + if entity_type == "person": + sparql_generator = self.person_sparql_gen + logger.debug(f"Using person SPARQL generator for entity_type='{entity_type}'") + else: + sparql_generator = self.sparql_gen + logger.debug(f"Using institution SPARQL generator for 
entity_type='{entity_type}'") + if tracker: with tracker.track_llm_call("gpt-4o-mini") as llm_usage: if self.fast_lm: with dspy.settings.context(lm=self.fast_lm): - sparql_result = self.sparql_gen( + sparql_result = sparql_generator( question=resolved_question, intent=routing.intent, entities=routing.entities, context="", ) else: - sparql_result = self.sparql_gen( + sparql_result = sparql_generator( question=resolved_question, intent=routing.intent, entities=routing.entities, @@ -3111,14 +3297,14 @@ class HeritageRAGPipeline(dspy.Module): else: if self.fast_lm: with dspy.settings.context(lm=self.fast_lm): - sparql_result = self.sparql_gen( + sparql_result = sparql_generator( question=resolved_question, intent=routing.intent, entities=routing.entities, context="", ) else: - sparql_result = self.sparql_gen( + sparql_result = sparql_generator( question=resolved_question, intent=routing.intent, entities=routing.entities, @@ -3697,6 +3883,14 @@ class HeritageRAGPipeline(dspy.Module): retry_count = 0 max_stream_retries = 2 + # DSPy field marker pattern for filtering streaming output + # DSPy emits markers like [[ ## answer ## ]], [[ ## reasoning ## ]], etc. 
+ DSPY_FIELD_MARKER = re.compile(r'\[\[\s*##\s*(\w+)\s*##\s*\]\]') + + # State machine for DSPy streaming: only stream 'answer' field tokens + current_field = None # Track which DSPy output field we're in + STREAMABLE_FIELDS = {'answer'} # Only stream these fields to frontend + while not streaming_succeeded and retry_count <= max_stream_retries: try: # Create streamified version of the answer generator @@ -3721,8 +3915,21 @@ class HeritageRAGPipeline(dspy.Module): follow_up = getattr(value, 'follow_up', []) streaming_succeeded = True elif isinstance(value, str): - # Streaming token - yield {"type": "token", "content": value} + # Filter DSPy field markers and only stream answer field + # Check if this token contains a field marker + marker_match = DSPY_FIELD_MARKER.search(value) + if marker_match: + # Update current field state + current_field = marker_match.group(1).lower() + # Remove the marker from the token + cleaned_token = DSPY_FIELD_MARKER.sub('', value).strip() + # Only yield if we're in a streamable field and have content + if current_field in STREAMABLE_FIELDS and cleaned_token: + yield {"type": "token", "content": cleaned_token} + elif current_field in STREAMABLE_FIELDS: + # We're in the answer field, stream this token + yield {"type": "token", "content": value} + # Tokens from other fields (reasoning, citations, etc.) 
are silently consumed else: # Handle ModelResponseStream from litellm/DSPy # Token text is in choices[0].delta.content or .reasoning_content @@ -3740,9 +3947,16 @@ class HeritageRAGPipeline(dspy.Module): yield {"type": "status", "message": value.message} continue - # Yield extracted token if we got text + # Apply same field filtering to extracted token text if token_text: - yield {"type": "token", "content": token_text} + marker_match = DSPY_FIELD_MARKER.search(token_text) + if marker_match: + current_field = marker_match.group(1).lower() + cleaned_token = DSPY_FIELD_MARKER.sub('', token_text).strip() + if current_field in STREAMABLE_FIELDS and cleaned_token: + yield {"type": "token", "content": cleaned_token} + elif current_field in STREAMABLE_FIELDS: + yield {"type": "token", "content": token_text} # If we get here, streaming completed streaming_succeeded = True