# GraphRAG Design Patterns for Heritage Knowledge Graph + Vector Search **Created**: 2025-01-06 **Purpose**: Analysis of external GraphRAG patterns applicable to our TypeDB-Oxigraph-DSPy architecture **Status**: Research and Planning --- ## Table of Contents 1. [Executive Summary](#executive-summary) 2. [Current GLAM Architecture Analysis](#current-glam-architecture-analysis) 3. [External Pattern Analysis](#external-pattern-analysis) 4. [Recommended Design Patterns](#recommended-design-patterns) 5. [Implementation Roadmap](#implementation-roadmap) 6. [Anti-Patterns to Avoid](#anti-patterns-to-avoid) 7. [Conclusion](#conclusion) 8. [Temporal Knowledge Graph Patterns](#temporal-knowledge-graph-patterns) 9. [Semantic Routing Patterns](#semantic-routing-patterns) 10. [Hypergraph Patterns Deep Dive](#hypergraph-patterns-deep-dive) 11. [Rules on Graphs Pattern](#11-rules-on-graphs-pattern-shacl--datalog-inference) 12. [References](#updated-references) --- ## Executive Summary This document analyzes design patterns from leading GraphRAG research and libraries (Microsoft GraphRAG, ROGRAG, Zep, HyperGraphRAG, LightRAG, etc.) and identifies **patterns applicable to our existing TypeDB-Oxigraph-DSPy stack** without adding new frameworks. ### Key Findings | Pattern Category | Applicable to GLAM | Implementation Complexity | Priority | |------------------|-------------------|--------------------------|----------| | Community Hierarchies | Yes | Medium | High | | Temporal Knowledge Graphs | Yes (already have) | Low | High | | Dual-Level Retrieval | Yes | Low | High | | Hypergraph Memory | Partial | High | Medium | | Multi-Stage Verification | Yes | Medium | High | | Iterative Search Optimization | Yes | Low | High | ### Core Principle > **Avoid adding new frameworks.** Focus on extracting design patterns as implementation strategies within our existing stack: **TypeDB** (semantic graph), **Oxigraph** (RDF/SPARQL), **Qdrant** (vector search), and **DSPy** (LLM orchestration). 
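In practice the core principle reduces to one composition: the graph stores narrow the candidate set (by GHCID) and the vector store ranks within it. A minimal, store-agnostic sketch of that composition — function and stub names are hypothetical; real backends would be Oxigraph and Qdrant clients:

```python
from typing import Callable, Optional

def hybrid_retrieve(
    query: str,
    sparql_lookup: Callable[[str], set],          # e.g. SPARQL against Oxigraph -> GHCID set
    vector_search: Callable[[str, Optional[set]], list],  # e.g. filtered Qdrant search
) -> list:
    """Graph narrows the search space; vectors pick the final chunks."""
    ghcids = sparql_lookup(query)
    # Restrict vector search to graph hits when available,
    # otherwise fall back to unfiltered vector search.
    return vector_search(query, ghcids or None)

# Stub backends standing in for the real clients:
hits = hybrid_retrieve(
    "archieven in Haarlem",
    sparql_lookup=lambda q: {"GHC-001", "GHC-002"},
    vector_search=lambda q, f: [{"ghcid": g} for g in sorted(f or [])],
)
```

The same shape recurs throughout this document as "graph as classifier, vectors as answer".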
--- ## Current GLAM Architecture Analysis ### Existing Components ``` ┌─────────────────────────────────────────────────────────────────────┐ │ GLAM RAG Architecture │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ Qdrant │ │ Oxigraph │ │ TypeDB │ │ │ │ Vector Store │ │ SPARQL/RDF │ │ Schema Store │ │ │ │ │ │ │ │ │ │ │ │ - Embeddings │ │ - Triples │ │ - LinkML │ │ │ │ - Semantic │ │ - SPARQL │ │ - Ontology │ │ │ │ Search │ │ Queries │ │ - Validation │ │ │ └──────┬───────┘ └──────┬───────┘ └──────────────┘ │ │ │ │ │ │ └───────────────────┴──────────────────────────────────┐ │ │ │ │ │ ┌────────────────────────────────────────────────────────────┐│ │ │ │ DSPy Heritage RAG ││ │ │ │ ││ │ │ │ ┌───────────────┐ ┌───────────────┐ ┌────────────────┐ ││ │ │ │ │ Template │ │ Entity │ │ SPARQL │ ││ │ │ │ │ SPARQL │ │ Extraction │ │ Generator │ ││ │ │ │ │ Classifier │ │ (DSPy Sig) │ │ (DSPy Sig) │ ││ │ │ │ └───────────────┘ └───────────────┘ └────────────────┘ ││ │ │ │ ││ │ │ │ ┌───────────────┐ ┌───────────────┐ ┌────────────────┐ ││ │ │ │ │ Semantic │ │ Cost │ │ GEPA │ ││ │ │ │ │ Cache │ │ Tracker │ │ Optimizer │ ││ │ │ │ └───────────────┘ └───────────────┘ └────────────────┘ ││ │ │ └────────────────────────────────────────────────────────────┘│ │ │ │ │ └─────────────────────────────────────────────────────────────────────┘ ``` ### Current Retrieval Flow 1. **Query Intent Classification** (DSPy Signature) 2. **Entity Extraction** (Heritage-specific NER) 3. **Template Matching** (SPARQL template selection) 4. **Dual Retrieval**: - SPARQL queries to Oxigraph (structured) - Vector search in Qdrant (semantic) 5. **Result Fusion** (merge and deduplicate) 6. 
**Answer Generation** (DSPy with context) ### Strengths of Current System - **Template-based SPARQL**: 65% precision vs 10% LLM-only (Formica et al., 2023) - **Semantic caching**: Reduces redundant LLM calls - **Temporal awareness**: GHCID history tracking with `valid_from`/`valid_to` - **Ontology grounding**: LinkML schema provides type safety - **Multi-hop capable**: SPARQL traverses relationships ### Current Gaps (Opportunities) | Gap | External Pattern Solution | |-----|--------------------------| | No community/cluster summaries | Microsoft GraphRAG communities | | Limited iterative refinement | ROGRAG dual-level + logic form | | No explicit verification step | ROGRAG argument checking | | Flat retrieval (no hierarchy) | GraphRAG local/global search | | Missing hyperedge relations | HyperGraphRAG n-ary facts | --- ## External Pattern Analysis ### 1. Microsoft GraphRAG (arxiv:2404.16130) **Core Innovation**: Hierarchical community summarization using Leiden clustering. **Key Components**: - **Entity Extraction**: LLM extracts entities/relationships from text chunks - **Community Detection**: Leiden algorithm clusters related entities - **Hierarchical Summaries**: Bottom-up summaries for each community level - **Query Modes**: - **Global Search**: Uses community summaries for holistic questions - **Local Search**: Fan-out from specific entities - **DRIFT Search**: Local + community context **Applicable Pattern for GLAM**: ```python # Pattern: Community-Based Retrieval for Holistic Questions # E.g., "What are the main themes across Dutch archives?" class CommunityRetriever: """ Pre-compute community clusters from Oxigraph triples. Store community summaries in Qdrant as additional vectors. 
""" def detect_communities(self) -> dict[str, list[str]]: """Use Leiden/Louvain on institution-location-type graph.""" # SPARQL: Get all institution relationships # Apply community detection algorithm # Return community_id -> [ghcid_list] pass def generate_community_summary(self, community_ghcids: list[str]) -> str: """LLM summarizes institutions in a community.""" # Retrieve metadata for all institutions # Generate summary with DSPy signature pass def global_search(self, query: str) -> list[str]: """Search community summaries for holistic questions.""" # Vector search community summaries # Aggregate partial answers pass ``` **Implementation in Our Stack**: - Use **Oxigraph SPARQL** to extract graph for clustering - Run **Leiden algorithm** (Python `leidenalg` library) - Store **community summaries as Qdrant vectors** - Add **global_search mode** to DSPy RAG --- ### 2. ROGRAG (arxiv:2503.06474) **Core Innovation**: Multi-stage retrieval with dual-level + logic form methods. **Key Components**: - **Dual-Level Retrieval**: - Low-level: Entity keywords (fuzzy matching) - High-level: Relational descriptions (semantic matching) - **Logic Form Retrieval**: Operator-based query decomposition - **Retrieval Verifier**: Argument checking before generation **Applicable Pattern for GLAM**: ```python # Pattern: Dual-Level Retrieval with Verification class DualLevelRetriever: """ Combine entity-level and relation-level matching. """ def extract_dual_level(self, query: str) -> tuple[list[str], list[str]]: """ Extract low-level (entities) and high-level (relations) from query. E.g., "Which archives in Haarlem have digitized collections?" 
Low-level: ["Haarlem", "archief"] High-level: ["digitized collections", "heritage institution"] """ # DSPy signature for dual extraction pass def match_low_level(self, entities: list[str]) -> set[str]: """Fuzzy match entities against Oxigraph nodes.""" # SPARQL with FILTER(CONTAINS(...)) # Return matching GHCIDs pass def match_high_level(self, relations: list[str]) -> set[str]: """Semantic match relations against edge descriptions.""" # Vector search in Qdrant # Return matching GHCIDs pass def merge_results(self, low: set[str], high: set[str]) -> list[str]: """Merge and deduplicate, prioritize intersection.""" intersection = low & high return list(intersection) + list(low - high) + list(high - low) class RetrievalVerifier: """ Verify retrieved context answers the question before generation. """ def verify_argument(self, query: str, context: str) -> bool: """ Check if context is sufficient to answer query. Reject if confidence < threshold. """ # DSPy signature for verification # Return True if sufficient, False to retry pass ``` **Key Insight from ROGRAG**: > "Although the dual-level method achieves higher precision, logic form method provides higher information density and is more concise and clear." **Implementation in Our Stack**: - Add **dual-level extraction** as DSPy Signature - Extend **template_sparql.py** with fuzzy matching - Add **RetrievalVerifier** between retrieval and generation - Implement **fallback cascade**: Template → Dual-Level → Logic Form → Vector-only --- ### 3. Zep Temporal Knowledge Graph (arxiv:2501.13956) **Core Innovation**: Bitemporal modeling with episodic, semantic, and community subgraphs. 
**Key Components**: - **Episode Subgraph**: Raw events with original timestamps - **Semantic Entity Subgraph**: Extracted entities with embeddings - **Community Subgraph**: Clustered entities with summaries - **Bitemporal Modeling**: - **Event Time (T)**: When fact occurred - **Ingestion Time (T')**: When added to graph - **Edge Invalidation**: Update/supersede facts over time **We Already Have This!** GLAM's `ghcid_history` with `valid_from`/`valid_to` implements temporal tracking. **Enhancement Pattern**: ```python # Pattern: Enhanced Temporal Reasoning class TemporalReasoningEnhancer: """ Extend existing temporal tracking with Zep-style capabilities. """ def query_at_point_in_time( self, ghcid: str, query_date: datetime ) -> dict: """ Return institution state at specific point in time. Uses GHCID history to find valid record. """ # SPARQL with temporal filter: # FILTER(?valid_from <= ?query_date && # (?valid_to IS NULL || ?valid_to > ?query_date)) pass def track_provenance_chain(self, ghcid: str) -> list[dict]: """ Full audit trail: what changed, when, why. Critical for heritage institutions with mergers/splits. """ # Query ghcid_history entries # Include change_event references pass def invalidate_superseded_facts( self, ghcid: str, new_fact: dict, reason: str ) -> None: """ When new fact supersedes old, mark old as invalid. Preserve provenance for auditability. """ # Set valid_to on old fact # Create new fact with valid_from # Link via change_event pass ``` **Implementation in Our Stack**: - Already have `GHCIDHistoryEntry` in LinkML schema - Enhance **SPARQL templates** with temporal filters - Add **point-in-time query mode** to DSPy RAG - Leverage **ChangeEvent** for provenance chain --- ### 4. HyperGraphRAG (arxiv:2503.21322) **Core Innovation**: N-ary relations via hyperedges (connecting 3+ entities). 
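The hyperedge idea in data-structure terms: one fact node linked to every entity it mentions, retrieved by partial entity overlap rather than exact triple match. A minimal in-memory sketch (all names hypothetical; the facts are illustrative):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Hyperedge:
    """One n-ary fact: a fact node linked to all entities it mentions."""
    fact: str
    entities: frozenset

def partial_match(edges: list, mentioned: set) -> list:
    """Rank hyperedges by how many query entities they share,
    keeping only those with at least one shared entity."""
    scored = [(len(e.entities & mentioned), e) for e in edges]
    return [e for score, e in sorted(scored, key=lambda t: -t[0]) if score > 0]

edges = [
    Hyperedge(
        "The Amsterdam Museum acquired the Rembrandt collection from the "
        "Rijksmuseum in 2020 as part of the Shared Heritage initiative.",
        frozenset({"Amsterdam Museum", "Rembrandt collection",
                   "Rijksmuseum", "2020", "Shared Heritage"})),
    Hyperedge("The Rijksmuseum reopened after renovation in 2013.",
              frozenset({"Rijksmuseum", "2013"})),
]
hits = partial_match(edges, {"Rijksmuseum", "2020"})
# The 2020 acquisition fact ranks first (two shared entities).
```

In the actual stack this overlap scoring would be done with SPARQL OPTIONAL patterns over event nodes, as described below.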
**Key Components**:
- **Hyperedge Construction**: Facts connecting multiple entities
- **Hyperedge Retrieval**: Match queries to multi-entity facts
- **Hyperedge Generation**: LLM reasons over hyperedge context

**Example N-ary Fact**:
> "The Amsterdam Museum acquired the Rembrandt collection from the Rijksmuseum in 2020 as part of the Shared Heritage initiative."

Traditional triple: Can only capture binary relations.
Hyperedge: Connects Museum, Collection, Year, Initiative, Source in a single fact.

**Applicable Pattern for GLAM**:

```python
# Pattern: N-ary Relation Modeling
class HyperedgeManager:
    """
    Model complex heritage events as hyperedges.
    Store in Oxigraph using reification or named graphs.
    """

    def create_custody_transfer_hyperedge(
        self,
        source_ghcid: str,
        target_ghcid: str,
        collection_uri: str,
        event_date: str,
        initiative_name: str
    ) -> str:
        """
        Create hyperedge for custody transfer event.

        Uses RDF reification pattern (objects shown here as
        the method's parameters):

            _:transfer a hc:CustodyTransfer ;
                hc:source <{source_ghcid}> ;
                hc:target <{target_ghcid}> ;
                hc:collection <{collection_uri}> ;
                schema:date "2020-01-01" ;
                hc:initiative "Shared Heritage" .
        """
        pass

    def retrieve_by_partial_match(
        self,
        known_entities: list[str]
    ) -> list[dict]:
        """
        Find hyperedges matching subset of entities.
        E.g., query mentions "Rijksmuseum" and "2020" →
        find all transfers involving Rijksmuseum in 2020.
        """
        # SPARQL with OPTIONAL patterns
        pass
```

**CIDOC-CRM Alignment**: We already use CIDOC-CRM, which supports n-ary relations via events:
- `crm:E10_Transfer_of_Custody` - connects parties, object, time
- `crm:E5_Event` - generic multi-participant events

**Implementation in Our Stack**:
- Use **CIDOC-CRM event classes** for n-ary facts
- Extend **entity extraction** to recognize event patterns
- Add **event-aware SPARQL templates**
- Index **event descriptions** in Qdrant for semantic matching

---

### 5. Cost-Efficient GraphRAG (TDS Article)

**Core Insight**: "You don't need a perfect graph."

**Key Patterns**: 1. 
**Star Graph Sufficiency**: - Minimal graph: Central node (report/institution) → entities - Relations inferred via iterative search, not explicit edges 2. **Iterative Search Space Optimization**: - Graph narrows documents → Vector refines chunks - Context enrichment fixes weak embeddings (IDs, dates) 3. **Graph as Classifier, Not Answer**: - Node metadata (doc_id) filters search space - Actual answers from vector chunks **Applicable Pattern for GLAM**: ```python # Pattern: Graph-Guided Vector Retrieval class GraphGuidedRetriever: """ Use KG to narrow search space, then vector for final retrieval. Fixes weak embeddings for identifiers like GHCID, ISIL codes. """ def retrieve_with_graph_filter( self, query: str, use_graph_context: bool = True ) -> list[dict]: """ 1. Extract entities from query 2. Graph lookup: Find related GHCIDs 3. Vector search: Filter by GHCID set 4. Context enrichment: Add graph metadata """ # Step 1: Entity extraction entities = self.extract_entities(query) # Step 2: Graph lookup (SPARQL) ghcid_set = self.graph_lookup(entities) # Step 3: Vector search with filter if ghcid_set: vector_results = self.qdrant_search( query, filter={"ghcid": {"$in": list(ghcid_set)}} ) else: vector_results = self.qdrant_search(query) # Step 4: Context enrichment enriched = self.enrich_with_graph_context( vector_results, ghcid_set ) return enriched def enrich_with_graph_context( self, results: list[dict], ghcid_set: set[str] ) -> list[dict]: """ Add graph metadata to vector results. Helps LLM understand relations between results. """ for result in results: ghcid = result.get("ghcid") if ghcid: # Fetch neighbors from Oxigraph neighbors = self.get_graph_neighbors(ghcid) result["graph_context"] = neighbors return results ``` **This is close to our current approach!** We already do: - Entity extraction → SPARQL → Vector fallback - GHCID-based filtering **Enhancement**: Add explicit **graph context enrichment** step. --- ### 6. 
HGMEM: Hypergraph-Based Memory (arxiv:2512.23959) **Core Innovation**: Working memory as evolving hypergraph for multi-step RAG. **Key Components**: - **Hyperedges as Memory Units**: Each memory unit connects multiple facts - **Memory Operations**: Update, Insert, Merge - **Adaptive Retrieval**: Local investigation vs. global exploration **Applicable Pattern for GLAM**: ```python # Pattern: Session-Based Working Memory class HypergraphSessionMemory: """ Maintain session-level working memory for multi-turn conversations. Memory evolves through retrieval steps. """ def __init__(self, session_id: str): self.session_id = session_id self.memory_hyperedges: list[dict] = [] # Each connects facts self.explored_ghcids: set[str] = set() self.unexplored_aspects: list[str] = [] def add_memory_unit( self, facts: list[dict], source_query: str ) -> None: """ Create hyperedge connecting related facts from single retrieval. """ hyperedge = { "id": generate_id(), "facts": facts, "source_query": source_query, "timestamp": datetime.now(), "ghcids": [f.get("ghcid") for f in facts if f.get("ghcid")] } self.memory_hyperedges.append(hyperedge) self.explored_ghcids.update(hyperedge["ghcids"]) def merge_related_memories(self) -> None: """ Merge hyperedges with overlapping GHCIDs. Creates higher-order connections. """ # Cluster by GHCID overlap # Merge overlapping hyperedges pass def suggest_exploration(self) -> list[str]: """ Identify unexplored aspects based on partial patterns. E.g., "You asked about archives in Haarlem. Related: Noord-Holland province has 12 more archives." """ # Analyze memory for patterns # Suggest related but unexplored queries pass ``` **Implementation in Our Stack**: - Extend **session_manager.py** with hypergraph memory - Store session memories in **Qdrant** (vector) + **Oxigraph** (structure) - Add **exploration suggestions** to response --- ### 7. 
12 Advanced RAG Types Summary (Turing Post)

Quick reference for additional patterns:

| RAG Type | Key Idea | GLAM Applicability |
|----------|----------|--------------------|
| MiA-RAG | High-level summary guides retrieval | Medium (for long docs) |
| QuCo-RAG | Statistical entity flagging | Low (heritage data is clean) |
| HiFi-RAG | Multi-stage filtering | High (already doing) |
| Bidirectional RAG | Write-back to corpus | Medium (for enrichment) |
| TV-RAG | Temporal video alignment | Low (not video-focused) |
| MegaRAG | Multimodal knowledge graphs | Medium (future: photos) |
| Graph-O1 | MCTS graph exploration | Medium (complex reasoning) |
| Hybrid RAG | Multilingual with RRF | High (Dutch/English) |

---

## Recommended Design Patterns

### Priority 1: Immediate Implementation

#### Pattern A: Retrieval Verification Layer

```python
# Add between retrieval and generation in dspy_heritage_rag.py
class ArgumentVerifier(dspy.Signature):
    """
    Verify if retrieved context can answer the query.
    Prevents hallucination from insufficient context.

    You are a verification assistant. Given a user query and retrieved
    context, determine if the context contains sufficient information
    to answer the query. Be strict: if key entities or facts are
    missing, return can_answer=False.
    """

    query: str = dspy.InputField(desc="User's question")
    context: str = dspy.InputField(desc="Retrieved information")
    can_answer: bool = dspy.OutputField(desc="True if context is sufficient")
    missing_info: str = dspy.OutputField(desc="What information is missing, if any")
    confidence: float = dspy.OutputField(desc="Confidence score 0-1")
```

**Benefit**: Reduces hallucination, enables retry with expanded retrieval.

#### Pattern B: Dual-Level Entity Extraction

```python
# Extend HeritageEntityExtractor in dspy_heritage_rag.py
class DualLevelEntityExtractor(dspy.Signature):
    """
    Extract both entity-level and relation-level keywords from query.
""" query: str = dspy.InputField() # Low-level: Specific entities entities: list[str] = dspy.OutputField( desc="Named entities: institutions, cities, people, identifiers" ) # High-level: Relation/concept descriptions relations: list[str] = dspy.OutputField( desc="Relation phrases: 'digitized collections', 'founded before 1900'" ) # Combined search strategy search_strategy: str = dspy.OutputField( desc="Recommend: 'entity_first', 'relation_first', or 'parallel'" ) ``` **Benefit**: Enables fuzzy entity matching + semantic relation matching. --- ### Priority 2: Short-Term Enhancements #### Pattern C: Community Pre-Computation ```python # New module: backend/rag/community_indexer.py import leidenalg import igraph as ig class CommunityIndexer: """ Pre-compute community clusters from Oxigraph for global queries. Run periodically (daily/weekly) or on data updates. """ def build_institution_graph(self) -> ig.Graph: """ Query Oxigraph for institution-location-type relationships. Build igraph for community detection. """ sparql = """ SELECT ?s ?p ?o WHERE { ?s a crm:E39_Actor . ?s ?p ?o . FILTER(?p IN (hc:locatedIn, hc:institutionType, hc:partOf)) } """ # Execute and build graph pass def detect_communities(self, graph: ig.Graph) -> dict: """ Apply Leiden algorithm for community detection. Returns mapping: community_id -> [ghcid_list] """ partition = leidenalg.find_partition( graph, leidenalg.ModularityVertexPartition ) return { str(i): [graph.vs[idx]["ghcid"] for idx in members] for i, members in enumerate(partition) } def generate_community_summaries( self, communities: dict ) -> list[dict]: """ Generate LLM summary for each community. Store in Qdrant for global search. 
""" summaries = [] for comm_id, ghcids in communities.items(): # Fetch metadata for all institutions institutions = self.fetch_institution_metadata(ghcids) # Generate summary with DSPy summary = self.summarize_community(institutions) summaries.append({ "community_id": comm_id, "ghcids": ghcids, "summary": summary, "institution_count": len(ghcids) }) return summaries ``` **Benefit**: Enables answering holistic questions like "What are the main archival themes in the Netherlands?" #### Pattern D: Temporal Query Mode ```python # Extend SPARQL templates in template_sparql.py TEMPORAL_QUERY_TEMPLATES = { "point_in_time_state": """ PREFIX hc: SELECT ?ghcid ?name ?institutionType ?city WHERE { ?s hc:ghcid ?ghcid ; skos:prefLabel ?name ; hc:institutionType ?institutionType . OPTIONAL { ?s schema:addressLocality ?city } # Temporal filter for point-in-time query ?s hc:validFrom ?validFrom . OPTIONAL { ?s hc:validTo ?validTo } FILTER(?validFrom <= "{{ query_date }}"^^xsd:date) FILTER(!BOUND(?validTo) || ?validTo > "{{ query_date }}"^^xsd:date) } """, "institution_history": """ PREFIX hc: SELECT ?ghcid ?validFrom ?validTo ?changeType ?description WHERE { ?entry hc:ghcid "{{ ghcid }}" ; hc:validFrom ?validFrom . OPTIONAL { ?entry hc:validTo ?validTo } OPTIONAL { ?entry hc:changeType ?changeType } OPTIONAL { ?entry hc:changeDescription ?description } } ORDER BY ?validFrom """ } ``` **Benefit**: Answer "What was the structure of Noord-Hollands Archief before the 2001 merger?" --- ### Priority 3: Long-Term Research #### Pattern E: Hyperedge Event Modeling Model complex heritage events (mergers, custody transfers) as hyperedges using CIDOC-CRM: ```turtle # RDF representation of custody transfer hyperedge _:transfer_001 a crm:E10_Transfer_of_Custody ; crm:P28_custody_surrendered_by ; crm:P29_custody_received_by ; crm:P30_transferred_custody_of ; crm:P4_has_time-span _:timespan_001 ; hc:partOfEvent . 
_:timespan_001 a crm:E52_Time-Span ; crm:P82a_begin_of_the_begin "2001-01-01"^^xsd:date ; crm:P82b_end_of_the_end "2001-01-01"^^xsd:date . ``` **Benefit**: Rich event modeling for heritage organizational changes. #### Pattern F: Session Memory Evolution Implement HGMEM-style working memory for multi-turn sessions: ```python # Extend session_manager.py class EvolvingSessionMemory: """ Session memory that builds knowledge over conversation turns. """ def __init__(self, session_id: str): self.session_id = session_id self.memory_graph = {} # GHCID -> facts self.explored_paths = [] self.unexplored_suggestions = [] def update_from_turn( self, query: str, retrieved: list[dict], response: str ) -> None: """ Update memory based on conversation turn. Identify new connections between facts. """ pass def suggest_next_exploration(self) -> list[str]: """ Suggest related queries based on memory patterns. "You explored archives in Haarlem. Related: Noord-Holland has 12 more archives you might find interesting." 
""" pass ``` --- ## Implementation Roadmap ### Phase 1: Quick Wins (1-2 weeks) | Task | File | Pattern | |------|------|---------| | Add ArgumentVerifier | dspy_heritage_rag.py | Pattern A | | Dual-level extraction | dspy_heritage_rag.py | Pattern B | | Temporal SPARQL templates | template_sparql.py | Pattern D | | Graph context enrichment | dspy_heritage_rag.py | TDS Pattern | ### Phase 2: Infrastructure (2-4 weeks) | Task | File | Pattern | |------|------|---------| | Community detection | community_indexer.py | Pattern C | | Community summary storage | Qdrant schema | Pattern C | | Global search mode | dspy_heritage_rag.py | GraphRAG | | Enhanced session memory | session_manager.py | Pattern F | ### Phase 3: Advanced Features (1-2 months) | Task | File | Pattern | |------|------|---------| | Event hyperedge modeling | Oxigraph schema | Pattern E | | MCTS graph exploration | graph_explorer.py | Graph-O1 | | Multi-step memory evolution | session_manager.py | HGMEM | | Exploration suggestions | dspy_heritage_rag.py | HGMEM | --- ## Anti-Patterns to Avoid ### 1. Over-Engineering the Graph > **Bad**: Try to extract every possible relation into explicit edges. > **Good**: Use minimal graph structure, infer relations via search. From TDS article: "A simple graph structure—even a star graph—can still support complex queries when combined with iterative search-space refinement." ### 2. Adding New Frameworks > **Bad**: Add LangChain, LlamaIndex, Neo4j, etc. > **Good**: Implement patterns within existing TypeDB/Oxigraph/DSPy stack. We already have a working stack. New frameworks add complexity without proportional benefit. ### 3. Ignoring Vector Search Limitations > **Bad**: Rely only on vector similarity for alphanumeric IDs (GHCID, ISIL). > **Good**: Use graph context to enrich vector queries. Alphanumeric identifiers have weak embeddings. Always combine with graph-based filtering. ### 4. 
Generating Without Verification > **Bad**: Pass retrieved context directly to LLM for answer generation. > **Good**: Verify context sufficiency before generation; retry if insufficient. ROGRAG shows argument checking outperforms result checking (75% vs 72% accuracy). ### 5. Flat Retrieval for Holistic Questions > **Bad**: Answer "What are the main archival themes?" with chunk-level retrieval. > **Good**: Use community summaries for holistic/global questions. Microsoft GraphRAG was specifically designed to solve this: "RAG fails on global questions directed at an entire text corpus." --- ## Conclusion Our existing TypeDB-Oxigraph-DSPy architecture is well-positioned to incorporate advanced GraphRAG patterns without adding new frameworks. The key enhancements are: 1. **Verification Layer**: Prevent hallucination with argument checking 2. **Dual-Level Retrieval**: Combine entity + relation matching 3. **Community Summaries**: Enable global/holistic questions 4. **Temporal Query Mode**: Leverage our existing GHCID history 5. **Graph Context Enrichment**: Fix weak embeddings for identifiers These patterns build on our strengths (template SPARQL, semantic caching, ontology grounding) while addressing gaps (global questions, multi-step reasoning, verification). --- ## Temporal Knowledge Graph Patterns ### Overview Temporal Knowledge Graphs (TKGs) extend traditional KGs with time-aware capabilities, enabling queries like: - "What was the status of this archive in 2001?" - "Which museums merged between 1990-2010?" - "How has the collection size changed over time?" **GLAM Already Has**: Our `ghcid_history` with `valid_from`/`valid_to` provides basic temporal tracking. These patterns enhance it. --- ### 8.1 STAR-RAG: Time-Aligned Rule Graphs (arXiv:2510.16715) **Core Innovation**: Combines temporal reasoning rules with RAG retrieval using time-aligned graph structures. 
**Key Components**: - **Temporal Rule Extraction**: Identifies temporal patterns in data (e.g., "archives that merged → new GHCID issued") - **Time-Aligned Subgraphs**: Groups facts by temporal validity - **Rule-Guided Retrieval**: Uses rules to expand/filter retrieval **Applicable Pattern for GLAM**: ```python # Pattern: Temporal Rule-Based Query Expansion class TemporalRuleEngine: """ Apply temporal rules to expand queries with time constraints. Example Rules: - IF merger_event(A, B, date) THEN ghcid_change(A, date) AND ghcid_change(B, date) - IF founding_date(X) < 1900 THEN historical_institution(X) - IF valid_to(fact) != NULL THEN superseded_fact(fact) """ TEMPORAL_RULES = [ { "name": "merger_implies_ghcid_change", "antecedent": "?event a hc:MergerEvent ; hc:date ?date", "consequent": "?event hc:triggersGHCIDChange true" }, { "name": "historical_institution", "antecedent": "?inst schema:foundingDate ?date . FILTER(?date < '1900-01-01'^^xsd:date)", "consequent": "?inst hc:historicalPeriod 'pre-1900'" }, { "name": "active_vs_superseded", "antecedent": "?fact hc:validTo ?endDate . FILTER(BOUND(?endDate))", "consequent": "?fact hc:status 'superseded'" } ] def expand_query_with_rules( self, base_query: str, query_date: Optional[datetime] = None ) -> str: """ Expand SPARQL query with temporal rule inferences. 
Args: base_query: Original SPARQL query query_date: Point-in-time for temporal filtering Returns: Expanded query with rule-based clauses """ # Add temporal validity filter if query_date: temporal_filter = f""" FILTER( ?validFrom <= "{query_date.isoformat()}"^^xsd:date && (!BOUND(?validTo) || ?validTo > "{query_date.isoformat()}"^^xsd:date) ) """ # Inject into WHERE clause base_query = self._inject_filter(base_query, temporal_filter) # Apply inference rules for rule in self.TEMPORAL_RULES: if self._rule_applies(base_query, rule): base_query = self._apply_rule(base_query, rule) return base_query def detect_temporal_intent(self, question: str) -> dict: """ Detect temporal aspects of user question. Returns: { 'has_temporal_constraint': bool, 'query_date': Optional[datetime], 'temporal_relation': 'before' | 'after' | 'during' | 'at' | None, 'event_type': 'founding' | 'merger' | 'closure' | None } """ # Pattern matching for temporal expressions patterns = { 'point_in_time': r'(?:in|during|around)\s+(\d{4})', 'before': r'before\s+(\d{4})', 'after': r'after\s+(\d{4})|since\s+(\d{4})', 'range': r'between\s+(\d{4})\s+and\s+(\d{4})', 'founding': r'founded|established|created|opened', 'merger': r'merged|combined|joined', 'closure': r'closed|dissolved|ceased' } # Implementation... pass ``` **Integration with template_sparql.py**: ```python # Add to TemplateSPARQLPipeline TEMPORAL_QUERY_TEMPLATES = { "point_in_time_state": """ {{ prefixes }} SELECT ?ghcid ?name ?type ?city WHERE { ?s a crm:E39_Actor ; hc:ghcid ?ghcid ; skos:prefLabel ?name ; hc:institutionType ?type . OPTIONAL { ?s schema:addressLocality ?city } # Temporal validity filter (STAR-RAG pattern) ?s hc:validFrom ?validFrom . 
OPTIONAL { ?s hc:validTo ?validTo } FILTER(?validFrom <= "{{ query_date }}"^^xsd:date) FILTER(!BOUND(?validTo) || ?validTo > "{{ query_date }}"^^xsd:date) } ORDER BY ?name LIMIT {{ limit }} """, "institution_timeline": """ {{ prefixes }} SELECT ?ghcid ?validFrom ?validTo ?changeType ?description WHERE { ?entry hc:ghcid "{{ ghcid }}" ; hc:validFrom ?validFrom . OPTIONAL { ?entry hc:validTo ?validTo } OPTIONAL { ?entry hc:changeType ?changeType } OPTIONAL { ?entry hc:changeDescription ?description } } ORDER BY ?validFrom """, "events_in_period": """ {{ prefixes }} SELECT ?event ?eventType ?date ?actor1 ?actor2 ?description WHERE { ?event a hc:OrganizationalChangeEvent ; hc:eventType ?eventType ; hc:eventDate ?date . OPTIONAL { ?event hc:affectedActor ?actor1 } OPTIONAL { ?event hc:resultingActor ?actor2 } OPTIONAL { ?event schema:description ?description } FILTER(?date >= "{{ start_date }}"^^xsd:date) FILTER(?date <= "{{ end_date }}"^^xsd:date) } ORDER BY ?date """ } ``` --- ### 8.2 TimeR4: Retrieve-Rewrite-Retrieve-Rerank (EMNLP 2024) **Core Innovation**: Four-stage temporal QA pipeline that iteratively refines queries. **Key Stages**: 1. **Retrieve**: Initial retrieval with temporal keywords 2. **Rewrite**: LLM rewrites query to be more temporally precise 3. **Retrieve**: Second retrieval with refined query 4. **Rerank**: Time-aware reranking of results **Applicable Pattern for GLAM**: ```python # Pattern: TimeR4 Multi-Stage Temporal Retrieval class TemporalMultiStageRetriever(dspy.Module): """ Four-stage temporal retrieval following TimeR4 pattern. Improves recall for temporal queries by iterative refinement. 
""" def __init__(self): super().__init__() # Stage 2: Query rewriter self.query_rewriter = dspy.ChainOfThought( "original_query, initial_results, temporal_context -> refined_query, temporal_constraints" ) # Stage 4: Temporal reranker self.temporal_reranker = dspy.ChainOfThought( "query, results, query_date -> ranked_results, temporal_scores" ) def forward( self, question: str, retrieve_fn: Callable[[str], list[dict]], query_date: Optional[datetime] = None ) -> Prediction: """ Execute TimeR4 pattern. Args: question: User's temporal question retrieve_fn: Retrieval function (SPARQL or vector) query_date: Extracted temporal constraint """ # STAGE 1: Initial Retrieve initial_results = retrieve_fn(question) if not initial_results: return Prediction(results=[], stages_used=1) # Extract temporal context from initial results temporal_context = self._extract_temporal_context(initial_results) # STAGE 2: Rewrite query for temporal precision rewritten = self.query_rewriter( original_query=question, initial_results=self._summarize_results(initial_results), temporal_context=temporal_context ) refined_query = rewritten.refined_query temporal_constraints = rewritten.temporal_constraints # STAGE 3: Retrieve with refined query refined_results = retrieve_fn(refined_query) # Merge results (union with dedup) all_results = self._merge_results(initial_results, refined_results) # STAGE 4: Temporal Rerank if query_date and len(all_results) > 1: ranked = self.temporal_reranker( query=question, results=all_results, query_date=query_date.isoformat() ) final_results = ranked.ranked_results else: final_results = all_results return Prediction( results=final_results, stages_used=4, refined_query=refined_query, temporal_constraints=temporal_constraints ) def _extract_temporal_context(self, results: list[dict]) -> str: """Extract temporal information from initial results.""" dates = [] for r in results: if 'founding_date' in r: dates.append(f"founded {r['founding_date']}") if 'valid_from' in r: 
dates.append(f"valid from {r['valid_from']}") if 'event_date' in r: dates.append(f"event on {r['event_date']}") return "; ".join(dates[:10]) ``` --- ### 8.3 T-GRAG: Temporal Conflict Resolution (arXiv:2508.01680) **Core Innovation**: Handles conflicting temporal facts gracefully. **Key Pattern**: When facts contradict across time periods, T-GRAG: 1. Identifies the conflict 2. Determines temporal validity of each fact 3. Returns the fact valid for the query time 4. Optionally explains the conflict **Applicable Pattern for GLAM**: ```python # Pattern: Temporal Conflict Detection and Resolution class TemporalConflictResolver: """ Detect and resolve conflicting facts across time periods. Common conflicts in heritage data: - Same GHCID assigned to different institutions (after merger) - Institution name changed but old name still in some records - Location changed (relocation event) - Classification changed (museum → archive) """ CONFLICT_TYPES = [ "name_change", # Institution renamed "location_change", # Institution relocated "type_change", # Classification changed "ghcid_succession", # GHCID reused after closure "data_superseded" # Newer data overrides older ] def detect_conflicts( self, ghcid: str, facts: list[dict] ) -> list[dict]: """ Detect temporal conflicts in facts about an institution. Returns list of conflict descriptions. 
""" conflicts = [] # Group facts by property by_property = defaultdict(list) for fact in facts: by_property[fact['property']].append(fact) # Check each property for conflicts for prop, prop_facts in by_property.items(): if len(prop_facts) > 1: # Check for overlapping validity periods for i, fact1 in enumerate(prop_facts): for fact2 in prop_facts[i+1:]: if self._periods_overlap(fact1, fact2): if fact1['value'] != fact2['value']: conflicts.append({ 'type': self._classify_conflict(prop), 'property': prop, 'fact1': fact1, 'fact2': fact2, 'resolution_needed': True }) return conflicts def resolve_for_date( self, conflicts: list[dict], query_date: datetime ) -> dict: """ Resolve conflicts for a specific query date. Returns the authoritative fact for each conflicting property. """ resolutions = {} for conflict in conflicts: # Find fact valid at query_date for fact in [conflict['fact1'], conflict['fact2']]: valid_from = self._parse_date(fact.get('valid_from')) valid_to = self._parse_date(fact.get('valid_to')) if valid_from <= query_date: if valid_to is None or valid_to > query_date: resolutions[conflict['property']] = { 'value': fact['value'], 'source': fact, 'conflict_type': conflict['type'], 'note': f"Resolved for date {query_date.isoformat()}" } break return resolutions def generate_conflict_explanation( self, conflict: dict, language: str = "nl" ) -> str: """ Generate human-readable explanation of conflict. For the RAG answer generation step. """ templates = { "name_change": { "nl": "Let op: deze instelling heette '{old}' tot {date}, daarna '{new}'.", "en": "Note: this institution was named '{old}' until {date}, then '{new}'." }, "location_change": { "nl": "Deze instelling is verhuisd van {old} naar {new} op {date}.", "en": "This institution relocated from {old} to {new} on {date}." }, "ghcid_succession": { "nl": "De GHCID {ghcid} was eerder toegekend aan {old}, nu aan {new}.", "en": "GHCID {ghcid} was previously assigned to {old}, now to {new}." 
            }
        }
        # Format template with conflict details
        template = templates.get(conflict['type'], {}).get(language, "")
        return template.format(**self._extract_template_vars(conflict))
```

**Integration with GHCID History**:

```python
# Extend GHCIDHistoryEntry handling
def query_ghcid_at_date(
    ghcid: str,
    query_date: datetime,
    oxigraph_client: OxigraphClient
) -> dict:
    """
    Query GHCID state at a specific point in time.

    Uses ghcid_history to find the valid record.
    """
    # NOTE: the hc: namespace URI is assumed here; align it with the
    # project's actual custodian namespace before use.
    sparql = f"""
    PREFIX hc: <https://w3id.org/heritage/custodian/>
    PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
    PREFIX schema: <https://schema.org/>
    PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

    SELECT ?name ?type ?city ?validFrom ?validTo ?changeReason
    WHERE {{
        ?entry hc:ghcid "{ghcid}" ;
               skos:prefLabel ?name ;
               hc:institutionType ?type ;
               hc:validFrom ?validFrom .
        OPTIONAL {{ ?entry schema:addressLocality ?city }}
        OPTIONAL {{ ?entry hc:validTo ?validTo }}
        OPTIONAL {{ ?entry hc:changeReason ?changeReason }}
        FILTER(?validFrom <= "{query_date.date().isoformat()}"^^xsd:date)
        FILTER(!BOUND(?validTo) || ?validTo > "{query_date.date().isoformat()}"^^xsd:date)
    }}
    """
    return oxigraph_client.query(sparql)
```

---

### 8.4 DyG-RAG: Dynamic Event Units (Emerging Pattern)

**Core Innovation**: Models events as first-class temporal entities with "Dynamic Event Units" (DEUs).

**Key Concepts**:
- **DEU**: Self-contained event with participants, time, location, and outcome
- **Temporal Anchors**: Points connecting DEUs to timeline
- **Event Chains**: Sequences of related DEUs

**Applicable Pattern for GLAM**:

```python
# Pattern: Dynamic Event Units for Heritage Change Events
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class DynamicEventUnit:
    """
    First-class event representation following DyG-RAG pattern.

    Maps directly to CIDOC-CRM E5_Event and LinkML ChangeEvent.
    """
    event_id: str
    event_type: str  # MERGER, FOUNDING, CLOSURE, RELOCATION, etc.
    # Temporal anchors
    start_date: datetime

    # Participants
    actors: list[str]  # GHCIDs of involved institutions

    # Fields with defaults must follow the required fields above
    end_date: Optional[datetime] = None  # For processes
    collections: list[str] = field(default_factory=list)  # Affected collections

    # Outcomes
    resulting_actors: list[str] = field(default_factory=list)
    ghcid_changes: list[dict] = field(default_factory=list)

    # Provenance
    source_document: Optional[str] = None
    confidence: float = 1.0

    def to_sparql_insert(self) -> str:
        """Generate SPARQL INSERT for this DEU."""
        # NOTE: the hc: namespace URI is assumed; align with project prefixes.
        return f"""
        PREFIX hc: <https://w3id.org/heritage/custodian/>
        PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        INSERT DATA {{
            <{self.event_id}> a crm:E5_Event, hc:OrganizationalChangeEvent ;
                hc:eventType "{self.event_type}" ;
                crm:P4_has_time-span [
                    crm:P82a_begin_of_the_begin "{self.start_date.date().isoformat()}"^^xsd:date
                    {f'; crm:P82b_end_of_the_end "{self.end_date.date().isoformat()}"^^xsd:date' if self.end_date else ''}
                ] ;
                hc:confidence {self.confidence} .

            # Link actors
            {self._actor_triples()}

            # Link outcomes
            {self._outcome_triples()}
        }}
        """


class DynamicEventRAG:
    """
    RAG system using Dynamic Event Units for temporal reasoning.
    """

    def retrieve_events_for_query(
        self,
        question: str,
        time_range: tuple[datetime, datetime]
    ) -> tuple[list[DynamicEventUnit], list[list[DynamicEventUnit]]]:
        """
        Retrieve relevant events for a temporal question.

        Uses both SPARQL (structured) and vector (semantic) retrieval.
        Returns the merged events together with detected event chains.
        """
        # SPARQL: Get events in time range
        sparql_events = self._sparql_event_query(time_range)

        # Vector: Semantic match on event descriptions
        vector_events = self._vector_event_search(question)

        # Merge and deduplicate
        all_events = self._merge_events(sparql_events, vector_events)

        # Build event chains
        chains = self._identify_event_chains(all_events)

        return all_events, chains

    def _identify_event_chains(
        self,
        events: list[DynamicEventUnit]
    ) -> list[list[DynamicEventUnit]]:
        """
        Identify chains of related events.
E.g., FOUNDING → MERGER → NAME_CHANGE → RELOCATION """ # Group by affected actors by_actor = defaultdict(list) for event in events: for actor in event.actors + event.resulting_actors: by_actor[actor].append(event) # Build chains ordered by time chains = [] for actor, actor_events in by_actor.items(): if len(actor_events) > 1: chain = sorted(actor_events, key=lambda e: e.start_date) chains.append(chain) return chains ``` --- ### 8.5 Mapping Temporal Patterns to GLAM Stack | Pattern | GLAM Component | Implementation Location | |---------|----------------|------------------------| | Temporal Rule Engine | template_sparql.py | New `TemporalRuleEngine` class | | TimeR4 Multi-Stage | dspy_heritage_rag.py | Extend `MultiHopHeritageRetriever` | | Conflict Resolution | schema_loader.py | New `TemporalConflictResolver` | | Dynamic Event Units | Oxigraph + LinkML | New `DynamicEventUnit` dataclass | | Point-in-Time Query | template_sparql.py | New SPARQL templates | | Event Chain Detection | dspy_heritage_rag.py | New `EventChainAnalyzer` module | **Priority Integration**: ```python # Add to HeritageQueryRouter in dspy_heritage_rag.py def _detect_temporal_query(self, question: str) -> Optional[dict]: """ Detect if query has temporal dimension. Returns temporal context if found. 
""" temporal_patterns = { 'point_in_time': r'(?:in|during|around|before|after)\s+(\d{4})', 'date_range': r'(?:between|from)\s+(\d{4})\s+(?:and|to)\s+(\d{4})', 'event_reference': r'(?:when|after|before)\s+(?:the\s+)?(?:merger|founding|closure)', 'historical': r'(?:historical|originally|formerly|used to be)', } for pattern_type, regex in temporal_patterns.items(): match = re.search(regex, question.lower()) if match: return { 'type': pattern_type, 'match': match.group(0), 'year': match.group(1) if match.groups() else None, } return None ``` --- ## Semantic Routing Patterns ### Overview Semantic routing enables intelligent query dispatch to specialized backends based on query intent, entity types, and semantic similarity. This is critical for GLAM where queries may target: - **Institutions** (museums, archives, libraries) → Oxigraph SPARQL + Qdrant `heritage_custodians` - **People** (staff, curators, archivists) → Qdrant `heritage_persons` - **Collections** → Oxigraph + future Qdrant collection - **Locations** → PostGIS (future) + Oxigraph - **Historical events** → Temporal query subsystem **GLAM Already Has**: `HeritageQueryRouter` with intent classification and `FykeFilter` for relevance. These patterns enhance routing precision. --- ### 9.1 vLLM Semantic Router: Signal-Decision Architecture **Core Innovation**: Separates routing into two phases: 1. **Signal Extraction**: Extract semantic signals from query (intent, entities, domain) 2. **Decision Making**: Map signals to backend routes using rules + ML **Key Insight from vLLM Semantic Router v0.1 Iris**: > "Routing should be a classifier problem, not an LLM generation problem." 
**Applicable Pattern for GLAM**: ```python # Pattern: Signal-Decision Semantic Router from dataclasses import dataclass from typing import Literal, Optional import numpy as np @dataclass class QuerySignals: """Semantic signals extracted from query.""" # Primary signals intent: Literal["geographic", "statistical", "relational", "temporal", "entity_lookup", "comparative", "exploration"] entity_type: Literal["person", "institution", "collection", "location", "event", "mixed"] # Secondary signals language: str has_temporal_constraint: bool has_geographic_constraint: bool requires_aggregation: bool # Extracted entities institution_mentions: list[str] person_mentions: list[str] location_mentions: list[str] # Confidence signal_confidence: float class SemanticSignalExtractor: """ Phase 1: Extract semantic signals from query. Uses lightweight models (embeddings + rules) NOT LLM calls. This is the "signal" phase - fast and deterministic. """ def __init__(self): # Intent classifier: trained on heritage query examples self.intent_embeddings = self._load_intent_embeddings() # Entity extractors: pattern-based + NER self.institution_patterns = self._compile_institution_patterns() self.person_indicators = ["curator", "archivist", "director", "medewerker", "who works", "wie werkt", "staff", "personeel"] def extract_signals(self, query: str) -> QuerySignals: """ Extract all semantic signals from query. This is a FAST operation - no LLM calls. 
""" query_lower = query.lower() # Intent classification via embedding similarity query_embedding = self._embed_query(query) intent = self._classify_intent(query_embedding) # Entity type detection entity_type = self._detect_entity_type(query_lower) # Constraint detection has_temporal = self._has_temporal_pattern(query_lower) has_geographic = self._has_geographic_pattern(query_lower) requires_aggregation = self._requires_aggregation(query_lower) # Entity extraction institutions = self._extract_institutions(query) persons = self._extract_persons(query) locations = self._extract_locations(query) return QuerySignals( intent=intent, entity_type=entity_type, language=self._detect_language(query), has_temporal_constraint=has_temporal, has_geographic_constraint=has_geographic, requires_aggregation=requires_aggregation, institution_mentions=institutions, person_mentions=persons, location_mentions=locations, signal_confidence=0.85 # Based on extraction quality ) def _classify_intent(self, query_embedding: np.ndarray) -> str: """ Classify intent via cosine similarity to intent exemplars. No LLM needed - pure embedding comparison. """ similarities = {} for intent, exemplar_embeddings in self.intent_embeddings.items(): # Average similarity to exemplars sims = np.dot(exemplar_embeddings, query_embedding) similarities[intent] = float(np.mean(sims)) return max(similarities, key=similarities.get) def _detect_entity_type(self, query_lower: str) -> str: """Detect primary entity type in query.""" person_score = sum(1 for p in self.person_indicators if p in query_lower) institution_score = sum(1 for p in ["museum", "archief", "bibliotheek", "archive", "library", "instelling"] if p in query_lower) if person_score > 0 and institution_score > 0: return "mixed" elif person_score > 0: return "person" elif institution_score > 0: return "institution" else: return "institution" # Default class SemanticDecisionRouter: """ Phase 2: Route query to backends based on signals. 
This is the "decision" phase - applies routing rules. """ # Routing rules: signal patterns → backend configuration ROUTING_RULES = [ # Person queries { "condition": lambda s: s.entity_type == "person", "primary_backend": "qdrant_persons", "secondary_backend": "sparql_persons", "collection": "heritage_persons", }, # Institution + temporal { "condition": lambda s: s.entity_type == "institution" and s.has_temporal_constraint, "primary_backend": "sparql_temporal", "secondary_backend": "qdrant_custodians", "use_temporal_templates": True, }, # Institution + geographic { "condition": lambda s: s.entity_type == "institution" and s.has_geographic_constraint, "primary_backend": "sparql_geo", "secondary_backend": "qdrant_custodians", }, # Institution + aggregation (statistical) { "condition": lambda s: s.entity_type == "institution" and s.requires_aggregation, "primary_backend": "sparql", # SPARQL COUNT/SUM aggregations "secondary_backend": "qdrant", }, # Default institution query { "condition": lambda s: s.entity_type == "institution", "primary_backend": "qdrant_custodians", "secondary_backend": "sparql", }, ] def route(self, signals: QuerySignals) -> dict: """ Apply routing rules to determine backends. Returns routing configuration. 
""" for rule in self.ROUTING_RULES: if rule["condition"](signals): return { "primary": rule["primary_backend"], "secondary": rule.get("secondary_backend"), "collection": rule.get("collection"), "use_temporal": rule.get("use_temporal_templates", False), "signals": signals, } # Fallback return { "primary": "qdrant_custodians", "secondary": "sparql", "signals": signals, } ``` --- ### 9.2 Integration with Existing FykeFilter and TemplateClassifier **Current GLAM Pipeline**: ``` Query → ConversationContextResolver → FykeFilter → TemplateClassifier → SlotExtractor → SPARQL ``` **Enhanced Pipeline with Semantic Routing**: ``` Query → ConversationContextResolver → FykeFilter → SemanticSignalExtractor ↓ SemanticDecisionRouter ↓ ┌─────────────────┼─────────────────┐ ↓ ↓ ↓ TemplateClassifier PersonRetriever SPARQLAggregation ↓ ↓ ↓ SPARQL Qdrant SPARQL ``` **Implementation in dspy_heritage_rag.py**: ```python # Extend HeritageQueryRouter with semantic routing class EnhancedHeritageQueryRouter(dspy.Module): """ Enhanced router with Signal-Decision architecture. Uses lightweight signal extraction before LLM classification. Falls back to LLM only when signals are ambiguous. """ def __init__(self, use_schema_aware: Optional[bool] = None, fast_lm: Optional[dspy.LM] = None): super().__init__() # Lightweight signal extraction (no LLM) self.signal_extractor = SemanticSignalExtractor() self.decision_router = SemanticDecisionRouter() # LLM fallback for ambiguous cases self.fast_lm = fast_lm if use_schema_aware is None: use_schema_aware = SCHEMA_LOADER_AVAILABLE if use_schema_aware: signature = get_schema_aware_query_intent_signature() else: signature = HeritageQueryIntent self.llm_classifier = dspy.ChainOfThought(signature) def forward(self, question: str, language: str = "nl", history: History = None) -> Prediction: """ Route query using Signal-Decision pattern. 1. Extract signals (fast, no LLM) 2. If high confidence → route directly 3. 
If low confidence → use LLM classification """ # Phase 1: Signal extraction signals = self.signal_extractor.extract_signals(question) # Phase 2: Decision routing if signals.signal_confidence >= 0.8: # High confidence - route without LLM route_config = self.decision_router.route(signals) return Prediction( intent=signals.intent, entity_type=signals.entity_type, entities=signals.institution_mentions + signals.person_mentions, sources=self._config_to_sources(route_config), resolved_question=question, routing_method="signal_based", route_config=route_config, ) # Low confidence - fall back to LLM if history is None: history = History(messages=[]) if self.fast_lm: with dspy.settings.context(lm=self.fast_lm): result = self.llm_classifier(question=question, language=language, history=history) else: result = self.llm_classifier(question=question, language=language, history=history) # Merge LLM result with signal-based routing signals.intent = result.intent signals.entity_type = result.entity_type route_config = self.decision_router.route(signals) return Prediction( intent=result.intent, entity_type=result.entity_type, entities=result.entities, sources=self._config_to_sources(route_config), resolved_question=result.resolved_question, reasoning=result.reasoning, routing_method="llm_enhanced", route_config=route_config, ) ``` --- ### 9.3 Multi-Index Routing with Qdrant **Pattern**: Route to different Qdrant collections based on entity type. ```python # Pattern: Multi-Collection Qdrant Router class QdrantMultiIndexRouter: """ Route queries to appropriate Qdrant collections. Collections: - heritage_custodians: Museums, archives, libraries, etc. - heritage_persons: Staff, curators, archivists, etc. 
- heritage_collections: (Future) Collection-level data - heritage_events: (Future) Organizational change events """ COLLECTION_CONFIGS = { "heritage_custodians": { "entity_types": ["institution"], "payload_filters": ["institution_type", "country_code", "region_code"], "embedding_field": "description_embedding", }, "heritage_persons": { "entity_types": ["person"], "payload_filters": ["custodian_slug", "role_category", "institution_type"], "embedding_field": "profile_embedding", }, } def __init__(self, qdrant_client: QdrantClient): self.client = qdrant_client def search( self, query: str, route_config: dict, limit: int = 10 ) -> list[dict]: """ Search appropriate collection(s) based on routing. """ primary = route_config.get("primary", "qdrant_custodians") # Map route to collection if "persons" in primary: collection = "heritage_persons" elif "custodians" in primary: collection = "heritage_custodians" else: collection = "heritage_custodians" # Build filters from signals signals = route_config.get("signals") filters = self._build_filters(signals, collection) # Execute search results = self.client.search( collection_name=collection, query_vector=self._embed_query(query), query_filter=filters, limit=limit, ) return [self._format_result(r) for r in results] def _build_filters(self, signals: QuerySignals, collection: str) -> Optional[Filter]: """ Build Qdrant filter from query signals. 
""" if signals is None: return None conditions = [] # Filter by institution type if mentioned if signals.institution_mentions and collection == "heritage_custodians": # Extract institution type from mentions inst_type = self._infer_institution_type(signals.institution_mentions) if inst_type: conditions.append( FieldCondition(key="institution_type", match=MatchValue(value=inst_type)) ) # Filter persons by custodian if institution mentioned if signals.institution_mentions and collection == "heritage_persons": slug = self._institution_to_slug(signals.institution_mentions[0]) if slug: conditions.append( FieldCondition(key="custodian_slug", match=MatchValue(value=slug)) ) # Filter by location if geographic constraint if signals.has_geographic_constraint and signals.location_mentions: loc = signals.location_mentions[0] conditions.append( FieldCondition(key="city", match=MatchText(text=loc)) ) if conditions: return Filter(must=conditions) return None ``` --- ### 9.4 Intent Detection with Semantic Similarity **Pattern**: Use embedding similarity for intent classification without LLM. ```python # Pattern: Embedding-Based Intent Classifier class EmbeddingIntentClassifier: """ Classify query intent using semantic similarity to exemplars. Faster than LLM, good for common query patterns. 
""" # Intent exemplars (in Dutch and English) INTENT_EXEMPLARS = { "geographic": [ "Welke musea zijn er in Amsterdam?", "Which archives are located in Noord-Holland?", "Toon me bibliotheken in Utrecht", "Museums near Rotterdam", ], "statistical": [ "Hoeveel archieven zijn er in Nederland?", "How many museums have a rating above 4?", "Count libraries by province", "Verdeling van instellingen per type", ], "entity_lookup": [ "Wat is het Rijksmuseum?", "Tell me about Nationaal Archief", "Informatie over de KB", "Details of Stadsarchief Amsterdam", ], "temporal": [ "Welke musea zijn opgericht voor 1900?", "Archives that merged in 2001", "History of Noord-Hollands Archief", "Oldest libraries in the Netherlands", ], "relational": [ "Welke archieven zijn onderdeel van KVAN?", "Museums connected to Rijksmuseum", "Archives that share collections", "Networks of heritage institutions", ], } def __init__(self): self._exemplar_embeddings = None self._model = None def _ensure_loaded(self): if self._exemplar_embeddings is not None: return from sentence_transformers import SentenceTransformer self._model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2") # Pre-compute exemplar embeddings self._exemplar_embeddings = {} for intent, exemplars in self.INTENT_EXEMPLARS.items(): embeddings = self._model.encode(exemplars, convert_to_numpy=True) self._exemplar_embeddings[intent] = embeddings def classify(self, query: str) -> tuple[str, float]: """ Classify intent and return (intent, confidence). 
""" self._ensure_loaded() query_embedding = self._model.encode([query], convert_to_numpy=True)[0] intent_scores = {} for intent, embeddings in self._exemplar_embeddings.items(): # Cosine similarity norms = np.linalg.norm(embeddings, axis=1) * np.linalg.norm(query_embedding) similarities = np.dot(embeddings, query_embedding) / norms intent_scores[intent] = float(np.max(similarities)) best_intent = max(intent_scores, key=intent_scores.get) confidence = intent_scores[best_intent] return best_intent, confidence ``` --- ### 9.5 Mapping Routing Patterns to GLAM Stack | Pattern | GLAM Component | Implementation | |---------|----------------|----------------| | Signal Extraction | dspy_heritage_rag.py | `SemanticSignalExtractor` class | | Decision Routing | dspy_heritage_rag.py | `SemanticDecisionRouter` class | | Multi-Index Qdrant | dspy_heritage_rag.py | `QdrantMultiIndexRouter` class | | Intent Embedding | template_sparql.py | `EmbeddingIntentClassifier` class | | Person Query Route | dspy_heritage_rag.py | Route to `heritage_persons` collection | | Temporal Query Route | template_sparql.py | Use temporal SPARQL templates | **Integration Priority**: 1. **Immediate**: Add `entity_type` routing to distinguish person vs institution queries 2. **Short-term**: Implement embedding-based intent classification as pre-filter 3. **Medium-term**: Add SPARQL aggregation templates for statistical queries (COUNT, SUM, AVG) --- ## Hypergraph Patterns Deep Dive ### Overview Hypergraphs extend traditional graphs by allowing edges (hyperedges) to connect **more than two nodes**. 
This is powerful for heritage data where: - **Custody transfers** involve: source custodian, target custodian, collection, date, legal basis - **Mergers** involve: multiple source institutions, resulting institution, date, staff transfers - **Collection accessions** involve: collection, donor, custodian, provenance chain, date range **Why Hypergraphs for GLAM?** Traditional binary edges force artificial decomposition: ``` # Binary edges (limited) NHA --merged_from--> Gemeentearchief_Haarlem NHA --merged_from--> Rijksarchief_Noord_Holland NHA --merger_date--> 2001-01-01 # Loses connection to sources! ``` Hyperedges capture the full event: ``` # Hyperedge (complete) HYPEREDGE:Merger_2001 { type: MERGER sources: [Gemeentearchief_Haarlem, Rijksarchief_Noord_Holland] result: Noord_Hollands_Archief date: 2001-01-01 staff_transferred: 45 collections_merged: [Municipal_Records, Provincial_Archives] } ``` **GLAM Already Has**: Organizational change events in LinkML schema (`ChangeEvent`, `CustodianTimelineEvent`). Hypergraph patterns enhance retrieval for these complex events. --- ### 10.1 Hyperedge Construction from CIDOC-CRM Events **Pattern**: Map heritage change events to hyperedges using CIDOC-CRM event classes. ```python # Pattern: Hyperedge Construction Pipeline from dataclasses import dataclass, field from typing import Literal, Optional from datetime import datetime import hashlib @dataclass class Hyperedge: """ A hyperedge connecting multiple entities through an event. 
Maps to CIDOC-CRM event classes: - crm:E10_Transfer_of_Custody - crm:E8_Acquisition_Event - crm:E66_Formation (founding) - crm:E68_Dissolution (closure) - crm:E9_Move (relocation) """ hyperedge_id: str event_type: Literal["custody_transfer", "merger", "founding", "closure", "relocation", "name_change", "acquisition"] # Connected entities (the hyperedge connects ALL of these) source_custodians: list[str] # GHCID URIs target_custodians: list[str] # GHCID URIs collections: list[str] # Collection URIs locations: list[str] # Location URIs people: list[str] # Person URIs (staff involved) # Temporal bounds event_date: datetime event_end_date: Optional[datetime] = None # For processes # Metadata description: str = "" confidence: float = 1.0 provenance: str = "" # Vector embedding for semantic search embedding: Optional[list[float]] = None @property def all_connected_entities(self) -> list[str]: """All entities connected by this hyperedge.""" return ( self.source_custodians + self.target_custodians + self.collections + self.locations + self.people ) @property def entity_count(self) -> int: """Number of entities connected (hyperedge cardinality).""" return len(self.all_connected_entities) class HyperedgeConstructor: """ Construct hyperedges from heritage change events. Sources: - CustodianTimelineEvent YAML files - ChangeEvent entries in custodian YAML - SPARQL query results from Oxigraph """ # CIDOC-CRM event type mapping EVENT_TYPE_MAPPING = { "FOUNDING": ("crm:E66_Formation", "founding"), "CLOSURE": ("crm:E68_Dissolution", "closure"), "MERGER": ("crm:E10_Transfer_of_Custody", "merger"), "ACQUISITION": ("crm:E8_Acquisition_Event", "acquisition"), "RELOCATION": ("crm:E9_Move", "relocation"), "CUSTODY_TRANSFER": ("crm:E10_Transfer_of_Custody", "custody_transfer"), "NAME_CHANGE": ("crm:E13_Attribute_Assignment", "name_change"), } def construct_from_change_event( self, event: dict, custodian: dict ) -> Hyperedge: """ Construct hyperedge from a ChangeEvent entry. 
Args: event: ChangeEvent dict from custodian YAML custodian: Parent custodian dict Returns: Hyperedge connecting all entities involved """ change_type = event.get("change_type", "UNKNOWN") crm_class, event_type = self.EVENT_TYPE_MAPPING.get( change_type, ("crm:E5_Event", "unknown") ) # Generate stable hyperedge ID hyperedge_id = self._generate_hyperedge_id(event, custodian) # Extract connected entities source_custodians = [] target_custodians = [] if change_type == "MERGER": # Sources: predecessor institutions source_custodians = event.get("predecessor_custodians", []) # Target: resulting institution target_custodians = [custodian.get("ghcid", {}).get("ghcid_current")] elif change_type == "FOUNDING": # Target: newly founded institution target_custodians = [custodian.get("ghcid", {}).get("ghcid_current")] elif change_type == "CLOSURE": # Source: closed institution source_custodians = [custodian.get("ghcid", {}).get("ghcid_current")] elif change_type == "CUSTODY_TRANSFER": source_custodians = [event.get("source_custodian")] target_custodians = [event.get("target_custodian")] # Extract collections involved collections = event.get("collections_affected", []) # Extract locations locations = [] if change_type == "RELOCATION": locations = [ event.get("from_location"), event.get("to_location"), ] # Extract people involved people = event.get("staff_involved", []) # Parse event date event_date = self._parse_date(event.get("event_date")) return Hyperedge( hyperedge_id=hyperedge_id, event_type=event_type, source_custodians=[s for s in source_custodians if s], target_custodians=[t for t in target_custodians if t], collections=[c for c in collections if c], locations=[l for l in locations if l], people=[p for p in people if p], event_date=event_date, description=event.get("event_description", ""), confidence=event.get("confidence_score", 1.0), provenance=event.get("source_documentation", ""), ) def _generate_hyperedge_id(self, event: dict, custodian: dict) -> str: """Generate 
stable hyperedge ID from event content.""" content = f"{custodian.get('ghcid', {}).get('ghcid_current', '')}" content += f":{event.get('change_type', '')}" content += f":{event.get('event_date', '')}" hash_digest = hashlib.sha256(content.encode()).hexdigest()[:16] return f"hyperedge:{hash_digest}" def _parse_date(self, date_str: str) -> datetime: """Parse date string to datetime.""" if not date_str: return datetime.now() try: return datetime.fromisoformat(date_str.replace("Z", "+00:00")) except ValueError: # Handle partial dates like "2001" or "2001-01" parts = date_str.split("-") if len(parts) == 1: return datetime(int(parts[0]), 1, 1) elif len(parts) == 2: return datetime(int(parts[0]), int(parts[1]), 1) return datetime.now() ``` --- ### 10.2 Hyperedge Retrieval with Two-Stage Scoring **Pattern**: Retrieve hyperedges using entity overlap + semantic similarity. ```python # Pattern: Hyperedge Retrieval class HyperedgeRetriever: """ Two-stage hyperedge retrieval: Stage 1: Entity Overlap Find hyperedges containing query entities (exact match) Stage 2: Semantic Similarity Rank by embedding similarity to query This combines precision (entity overlap) with recall (semantic search). """ def __init__(self, hyperedge_store: list[Hyperedge]): self.hyperedges = hyperedge_store self._entity_index = self._build_entity_index() def _build_entity_index(self) -> dict[str, set[str]]: """ Build inverted index: entity URI → hyperedge IDs. """ index = {} for he in self.hyperedges: for entity in he.all_connected_entities: if entity not in index: index[entity] = set() index[entity].add(he.hyperedge_id) return index def retrieve( self, query: str, query_entities: list[str], query_embedding: list[float], top_k: int = 5, entity_weight: float = 0.6, semantic_weight: float = 0.4, ) -> list[tuple[Hyperedge, float]]: """ Retrieve top-k hyperedges by combined scoring. 
Args: query: Natural language query query_entities: Extracted entity URIs from query query_embedding: Query embedding vector top_k: Number of results entity_weight: Weight for entity overlap score semantic_weight: Weight for semantic similarity Returns: List of (hyperedge, score) tuples """ scores = {} # Stage 1: Entity overlap scoring for entity in query_entities: if entity in self._entity_index: for he_id in self._entity_index[entity]: if he_id not in scores: scores[he_id] = {"entity": 0, "semantic": 0} scores[he_id]["entity"] += 1 # Normalize entity scores if query_entities: for he_id in scores: scores[he_id]["entity"] /= len(query_entities) # Stage 2: Semantic similarity import numpy as np query_vec = np.array(query_embedding) for he in self.hyperedges: if he.embedding is None: continue he_vec = np.array(he.embedding) # Cosine similarity similarity = float(np.dot(query_vec, he_vec) / (np.linalg.norm(query_vec) * np.linalg.norm(he_vec))) if he.hyperedge_id not in scores: scores[he.hyperedge_id] = {"entity": 0, "semantic": 0} scores[he.hyperedge_id]["semantic"] = similarity # Combined scoring final_scores = [] he_by_id = {he.hyperedge_id: he for he in self.hyperedges} for he_id, score_dict in scores.items(): combined = ( entity_weight * score_dict["entity"] + semantic_weight * score_dict["semantic"] ) final_scores.append((he_by_id[he_id], combined)) # Sort and return top-k final_scores.sort(key=lambda x: x[1], reverse=True) return final_scores[:top_k] def retrieve_by_type( self, event_type: str, date_range: Optional[tuple[datetime, datetime]] = None, top_k: int = 10, ) -> list[Hyperedge]: """ Retrieve hyperedges by event type and optional date range. Useful for questions like: - "What mergers happened in 2001?" 
- "Show all custody transfers after 2010"
        """
        results = [
            he for he in self.hyperedges
            if he.event_type == event_type
        ]

        if date_range:
            start, end = date_range
            results = [
                he for he in results
                if start <= he.event_date <= end
            ]

        return results[:top_k]
```

---

### 10.3 CIDOC-CRM RDF Serialization for Oxigraph

**Pattern**: Serialize hyperedges to CIDOC-CRM RDF for SPARQL querying.

```python
# Pattern: Hyperedge to CIDOC-CRM RDF
from rdflib import Graph, Namespace, URIRef, Literal, BNode
from rdflib.namespace import RDF, RDFS, XSD

CRM = Namespace("http://www.cidoc-crm.org/cidoc-crm/")
GLAM = Namespace("https://w3id.org/heritage/custodian/")


class HyperedgeRDFSerializer:
    """
    Serialize hyperedges to CIDOC-CRM RDF triples.

    Maps hyperedge components to CIDOC-CRM:
    - Hyperedge → crm:E5_Event (or specific subclass)
    - source_custodians → crm:P11_had_participant
    - target_custodians → crm:P14_carried_out_by
    - collections → crm:P12_occurred_in_the_presence_of
    - locations → crm:P7_took_place_at
    - event_date → crm:P4_has_time-span
    """

    # Event type to CRM class mapping
    CRM_EVENT_CLASSES = {
        "custody_transfer": CRM["E10_Transfer_of_Custody"],
        "merger": CRM["E10_Transfer_of_Custody"],
        "founding": CRM["E66_Formation"],
        "closure": CRM["E68_Dissolution"],
        "relocation": CRM["E9_Move"],
        "acquisition": CRM["E8_Acquisition"],
        "name_change": CRM["E13_Attribute_Assignment"],
    }

    def serialize(self, hyperedge: Hyperedge) -> Graph:
        """
        Serialize a single hyperedge to RDF graph.
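        # A self-contained sketch of the class-level mapping above using plain
        # string tuples instead of rdflib terms (toy event, hypothetical GHCIDs):
        CRM_BASE = "http://www.cidoc-crm.org/cidoc-crm/"
        GLAM_BASE = "https://w3id.org/heritage/custodian/"

        toy_event = {
            "id": "hyperedge:abc123",    # hypothetical hyperedge ID
            "sources": ["NL-HlmGA"],     # hypothetical source custodian
            "targets": ["NL-HlmNHA"],    # hypothetical target custodian
        }
        toy_subject = GLAM_BASE + toy_event["id"]
        toy_triples = [(toy_subject, "rdf:type", CRM_BASE + "E10_Transfer_of_Custody")]
        for ghcid in toy_event["sources"]:
            toy_triples.append((toy_subject, CRM_BASE + "P11_had_participant", GLAM_BASE + ghcid))
        for ghcid in toy_event["targets"]:
            toy_triples.append((toy_subject, CRM_BASE + "P14_carried_out_by", GLAM_BASE + ghcid))
        # toy_triples now holds three (subject, predicate, object) tuples
        # mirroring what serialize() emits as rdflib triples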
""" g = Graph() g.bind("crm", CRM) g.bind("glam", GLAM) # Event URI event_uri = URIRef(f"{GLAM}{hyperedge.hyperedge_id}") # Event type crm_class = self.CRM_EVENT_CLASSES.get( hyperedge.event_type, CRM["E5_Event"] ) g.add((event_uri, RDF.type, crm_class)) # Description if hyperedge.description: g.add((event_uri, RDFS.label, Literal(hyperedge.description))) # Time-span timespan = BNode() g.add((event_uri, CRM["P4_has_time-span"], timespan)) g.add((timespan, RDF.type, CRM["E52_Time-Span"])) g.add((timespan, CRM["P82a_begin_of_the_begin"], Literal(hyperedge.event_date.isoformat(), datatype=XSD.dateTime))) if hyperedge.event_end_date: g.add((timespan, CRM["P82b_end_of_the_end"], Literal(hyperedge.event_end_date.isoformat(), datatype=XSD.dateTime))) # Source custodians (participants - "from") for custodian in hyperedge.source_custodians: custodian_uri = URIRef(f"{GLAM}{custodian}") g.add((event_uri, CRM["P11_had_participant"], custodian_uri)) # Mark as source with custom predicate g.add((event_uri, GLAM["source_custodian"], custodian_uri)) # Target custodians (carried out by - "to") for custodian in hyperedge.target_custodians: custodian_uri = URIRef(f"{GLAM}{custodian}") g.add((event_uri, CRM["P14_carried_out_by"], custodian_uri)) g.add((event_uri, GLAM["target_custodian"], custodian_uri)) # Collections involved for collection in hyperedge.collections: collection_uri = URIRef(f"{GLAM}collection/{collection}") g.add((event_uri, CRM["P12_occurred_in_the_presence_of"], collection_uri)) # Locations for location in hyperedge.locations: location_uri = URIRef(f"{GLAM}location/{location}") g.add((event_uri, CRM["P7_took_place_at"], location_uri)) # People involved for person in hyperedge.people: person_uri = URIRef(f"{GLAM}person/{person}") g.add((event_uri, CRM["P11_had_participant"], person_uri)) # Provenance if hyperedge.provenance: g.add((event_uri, CRM["P70i_is_documented_in"], URIRef(hyperedge.provenance))) return g def serialize_all(self, hyperedges: list[Hyperedge]) -> 
Graph:
        """Serialize all hyperedges to a single graph."""
        combined = Graph()
        combined.bind("crm", CRM)
        combined.bind("glam", GLAM)

        for he in hyperedges:
            for triple in self.serialize(he):
                combined.add(triple)

        return combined
```

---

### 10.4 SPARQL Templates for Hyperedge Queries

**Pattern**: Query hyperedges via SPARQL on Oxigraph.

```python
# Pattern: Hyperedge SPARQL Templates

HYPEREDGE_SPARQL_TEMPLATES = {
    "mergers_in_year": """
        PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
        PREFIX glam: <https://w3id.org/heritage/custodian/>
        PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        SELECT ?event ?description ?source ?target ?date
        WHERE {{
            ?event a crm:E10_Transfer_of_Custody ;
                rdfs:label ?description ;
                crm:P4_has_time-span ?timespan ;
                glam:source_custodian ?source ;
                glam:target_custodian ?target .
            ?timespan crm:P82a_begin_of_the_begin ?date .
            FILTER(YEAR(?date) = {year})
        }}
        ORDER BY ?date
    """,
    "custody_transfers_for_custodian": """
        PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
        PREFIX glam: <https://w3id.org/heritage/custodian/>
        PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

        SELECT ?event ?description ?counterparty ?role ?date
        WHERE {{
            {{
                ?event a crm:E10_Transfer_of_Custody ;
                    rdfs:label ?description ;
                    crm:P4_has_time-span/crm:P82a_begin_of_the_begin ?date ;
                    glam:source_custodian <{custodian_uri}> ;
                    glam:target_custodian ?counterparty .
                BIND("source" AS ?role)
            }}
            UNION
            {{
                ?event a crm:E10_Transfer_of_Custody ;
                    rdfs:label ?description ;
                    crm:P4_has_time-span/crm:P82a_begin_of_the_begin ?date ;
                    glam:target_custodian <{custodian_uri}> ;
                    glam:source_custodian ?counterparty .
                BIND("target" AS ?role)
            }}
        }}
        ORDER BY ?date
    """,
    "events_in_location": """
        PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
        PREFIX glam: <https://w3id.org/heritage/custodian/>
        PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

        SELECT ?event ?type ?description ?date
        WHERE {{
            ?event crm:P7_took_place_at <{location_uri}> ;
                a ?type ;
                rdfs:label ?description ;
                crm:P4_has_time-span/crm:P82a_begin_of_the_begin ?date .
FILTER(?type IN (
                crm:E10_Transfer_of_Custody,
                crm:E66_Formation,
                crm:E68_Dissolution,
                crm:E9_Move
            ))
        }}
        ORDER BY DESC(?date)
    """,
    "founding_events_before_year": """
        PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
        PREFIX glam: <https://w3id.org/heritage/custodian/>
        PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
        PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

        SELECT ?event ?custodian ?custodian_label ?date
        WHERE {{
            ?event a crm:E66_Formation ;
                crm:P4_has_time-span/crm:P82a_begin_of_the_begin ?date ;
                crm:P14_carried_out_by ?custodian .
            ?custodian rdfs:label ?custodian_label .
            FILTER(YEAR(?date) < {year})
        }}
        ORDER BY ?date
    """,
    "collections_transferred_in_event": """
        PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
        PREFIX glam: <https://w3id.org/heritage/custodian/>
        PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

        SELECT ?collection ?collection_label
        WHERE {{
            <{event_uri}> crm:P12_occurred_in_the_presence_of ?collection .
            OPTIONAL {{ ?collection rdfs:label ?collection_label }}
        }}
    """,
}


class HyperedgeSPARQLExecutor:
    """
    Execute hyperedge SPARQL queries against Oxigraph.
    """

    def __init__(self, oxigraph_endpoint: str = "http://localhost:7878/query"):
        self.endpoint = oxigraph_endpoint

    def query_mergers_in_year(self, year: int) -> list[dict]:
        """Find all merger events in a given year."""
        query = HYPEREDGE_SPARQL_TEMPLATES["mergers_in_year"].format(year=year)
        return self._execute(query)

    def query_custody_transfers(self, custodian_ghcid: str) -> list[dict]:
        """Find all custody transfers involving a custodian."""
        custodian_uri = f"https://w3id.org/heritage/custodian/{custodian_ghcid}"
        query = HYPEREDGE_SPARQL_TEMPLATES["custody_transfers_for_custodian"].format(
            custodian_uri=custodian_uri
        )
        return self._execute(query)

    def query_events_in_location(self, location_code: str) -> list[dict]:
        """Find all heritage events at a location."""
        location_uri = f"https://w3id.org/heritage/custodian/location/{location_code}"
        query = HYPEREDGE_SPARQL_TEMPLATES["events_in_location"].format(
            location_uri=location_uri
        )
        return self._execute(query)

    def _execute(self, query: str) -> list[dict]:
        """Execute SPARQL query and return results."""
        import httpx

        response = httpx.post(
            self.endpoint,
            content=query,
            headers={
                "Content-Type": "application/sparql-query",
                "Accept":
"application/json", }, ) response.raise_for_status() data = response.json() results = [] for binding in data.get("results", {}).get("bindings", []): row = {} for key, value in binding.items(): row[key] = value.get("value") results.append(row) return results ``` --- ### 10.5 Integration with DSPy Heritage RAG **Pattern**: Integrate hyperedge retrieval into the existing DSPy pipeline. ```python # Pattern: Hyperedge-Enhanced RAG Module class HyperedgeEnhancedRetriever(dspy.Module): """ DSPy module that incorporates hyperedge retrieval for complex queries. Use when query involves: - Organizational change events (mergers, closures) - Custody transfers - Multi-entity relationships """ def __init__( self, hyperedge_retriever: HyperedgeRetriever, sparql_executor: HyperedgeSPARQLExecutor, entity_extractor: dspy.Module, ): super().__init__() self.hyperedge_retriever = hyperedge_retriever self.sparql_executor = sparql_executor self.entity_extractor = entity_extractor # Synthesizer for combining hyperedge data with other context self.synthesize = dspy.ChainOfThought(HyperedgeContextSynthesis) def forward( self, question: str, query_embedding: list[float], language: str = "nl", ) -> dspy.Prediction: """ Retrieve relevant hyperedges and synthesize context. 
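        # A self-contained sketch of the SPARQL JSON bindings flattening done in
        # HyperedgeSPARQLExecutor._execute above (hypothetical payload, no HTTP):
        toy_payload = {
            "results": {
                "bindings": [
                    {
                        "event": {
                            "type": "uri",
                            "value": "https://w3id.org/heritage/custodian/hyperedge:abc123",
                        },
                        "date": {"type": "literal", "value": "2001-01-01T00:00:00"},
                    }
                ]
            }
        }
        rows = [
            {key: value.get("value") for key, value in binding.items()}
            for binding in toy_payload.get("results", {}).get("bindings", [])
        ]
        # each row maps a SPARQL variable name to its bound value string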
""" # Extract entities from question entities = self.entity_extractor(question=question) entity_uris = self._entities_to_uris(entities) # Detect if this is a hyperedge-relevant query event_type = self._detect_event_type(question) if event_type: # Query SPARQL for specific event types sparql_results = self._query_by_event_type(event_type, question) else: sparql_results = [] # Retrieve hyperedges by entity overlap + semantic similarity hyperedge_results = self.hyperedge_retriever.retrieve( query=question, query_entities=entity_uris, query_embedding=query_embedding, top_k=5, ) # Synthesize into context context = self._format_hyperedge_context(hyperedge_results, sparql_results) return dspy.Prediction( hyperedge_context=context, retrieved_hyperedges=[he for he, _ in hyperedge_results], sparql_results=sparql_results, ) def _detect_event_type(self, question: str) -> Optional[str]: """Detect if query is about a specific event type.""" question_lower = question.lower() patterns = { "merger": ["merger", "merged", "fusie", "gefuseerd", "samengevoegd"], "founding": ["founded", "established", "opgericht", "gesticht", "founded when"], "closure": ["closed", "dissolved", "gesloten", "opgeheven"], "relocation": ["moved", "relocated", "verhuisd", "verplaatst"], } for event_type, keywords in patterns.items(): if any(kw in question_lower for kw in keywords): return event_type return None def _query_by_event_type(self, event_type: str, question: str) -> list[dict]: """Query SPARQL based on event type.""" import re # Extract year if mentioned year_match = re.search(r"\b(19|20)\d{2}\b", question) if event_type == "merger" and year_match: return self.sparql_executor.query_mergers_in_year(int(year_match.group())) elif event_type == "founding" and year_match: # Use "before year" template for "oldest" queries if "oldest" in question.lower() or "oudste" in question.lower(): return self.sparql_executor._execute( HYPEREDGE_SPARQL_TEMPLATES["founding_events_before_year"].format( 
year=int(year_match.group()) ) ) return [] def _format_hyperedge_context( self, hyperedges: list[tuple[Hyperedge, float]], sparql_results: list[dict], ) -> str: """Format hyperedge results into context string.""" parts = [] for he, score in hyperedges: part = f"**{he.event_type.replace('_', ' ').title()}** ({he.event_date.year}):\n" part += f" {he.description}\n" if he.source_custodians: part += f" From: {', '.join(he.source_custodians)}\n" if he.target_custodians: part += f" To: {', '.join(he.target_custodians)}\n" if he.collections: part += f" Collections: {', '.join(he.collections)}\n" parts.append(part) if sparql_results: parts.append("\n**Additional SPARQL Results:**\n") for result in sparql_results[:5]: parts.append(f" - {result}\n") return "\n".join(parts) class HyperedgeContextSynthesis(dspy.Signature): """Synthesize hyperedge context for answer generation.""" question: str = dspy.InputField(desc="User's question") hyperedge_context: str = dspy.InputField(desc="Retrieved hyperedge context") language: str = dspy.InputField(desc="Response language") synthesis: str = dspy.OutputField(desc="Synthesized context highlighting key relationships") ``` --- ### 10.6 Mapping Hypergraph Patterns to GLAM Stack | Pattern | GLAM Component | Implementation | |---------|----------------|----------------| | Hyperedge Construction | Data enrichment pipeline | `HyperedgeConstructor` class | | Two-Stage Retrieval | dspy_heritage_rag.py | `HyperedgeRetriever` class | | CIDOC-CRM Serialization | RDF export pipeline | `HyperedgeRDFSerializer` class | | SPARQL Templates | template_sparql.py | Add `HYPEREDGE_SPARQL_TEMPLATES` | | DSPy Integration | dspy_heritage_rag.py | `HyperedgeEnhancedRetriever` module | **Key Implementation Decisions**: 1. **No New Database**: Store hyperedges as RDF in existing Oxigraph instance 2. **Reuse Embeddings**: Use same embedding model as custodian descriptions 3. **Extend SPARQL**: Add hyperedge templates to existing template system 4. 
**DSPy Module**: Create as optional module activated for event queries **Data Flow**: ``` Change Events (YAML) ↓ HyperedgeConstructor → Hyperedge objects ↓ ├── HyperedgeRDFSerializer → Oxigraph (SPARQL) │ └── Embedding → Qdrant (future: hyperedge collection) Query ↓ Event Type Detection ↓ ├── SPARQL Templates → Oxigraph results │ └── Hyperedge Retrieval → Semantic + Entity overlap ↓ Synthesized Context → LLM → Answer ``` --- ## 11. Rules on Graphs Pattern (SHACL + Datalog Inference) **Sources**: - Velitchkov, I. (2025). "Rules on Graphs in Graphs of Rules, Part 1." Link & Think - Pareti et al. (2019). "SHACL Constraints with Inference Rules." ISWC 2019 - W3C SHACL Advanced Features Working Group (2025). Datalog Rules Proposal #348 **Core Insight**: Inference rules should be **stored as nodes IN the knowledge graph**, not hardcoded in application code. This provides: 1. **Inspectability**: Rules are queryable/auditable via SPARQL 2. **Governance**: Rule provenance tracked alongside data provenance 3. **Decoupling**: Domain logic separated from application code 4. **Interoperability**: Standard formats (SHACL, Datalog) enable rule sharing --- ### 11.1 Problem Statement Traditional RAG systems hardcode inference logic in Python/application code: ```python # ❌ BAD: Logic buried in application code def get_parent_institution(ghcid: str) -> Optional[str]: """Logic for finding parent org is embedded in code.""" if ghcid.startswith("NL-") and is_regional_archive(ghcid): return find_provincial_government(ghcid) # ... 
more hardcoded rules ``` This creates problems: - Rules invisible to users and auditors - Rule changes require code deployments - No provenance for derived facts - Inconsistent rule application across systems --- ### 11.2 Rules as Graph Nodes Pattern Store inference rules as first-class graph entities: ```turtle # SHACL Rule stored in Oxigraph heritage:ParentInstitutionRule a sh:NodeShape, heritage:InferenceRule ; sh:targetClass heritage:Archive ; sh:rule [ a sh:TripleRule ; sh:subject sh:this ; sh:predicate heritage:hasParentOrganization ; sh:object [ sh:path ( heritage:locatedIn heritage:governingBody ) ] ; ] ; # Rule metadata for governance heritage:rulePriority 100 ; heritage:ruleCategory "organizational_hierarchy" ; prov:wasGeneratedBy heritage:DomainExpertExtraction ; prov:generatedAtTime "2025-01-06T12:00:00Z" ; rdfs:comment "Infer parent organization from location governance."@en . ``` **Benefits**: - Rule is queryable: `SELECT ?rule WHERE { ?rule a heritage:InferenceRule }` - Rule provenance tracked with PROV-O - Rule can be versioned, deprecated, or overridden - Multiple systems can consume the same rule definition --- ### 11.3 Datalog Rules for Knowledge Graphs Datalog provides declarative inference with recursion support. Key patterns for heritage domain: ```datalog # Pattern 1: Transitive Closure (organizational hierarchy) ancestor(?X, ?Z) :- parent(?X, ?Z). ancestor(?X, ?Z) :- parent(?X, ?Y), ancestor(?Y, ?Z). # Pattern 2: Derived Classification (heritage type inference) museum_archive(?X) :- institution_type(?X, "MUSEUM"), has_collection(?X, ?C), collection_type(?C, "archival"). # Pattern 3: Temporal Validity (valid at point in time) valid_at(?Entity, ?Date) :- valid_from(?Entity, ?Start), valid_to(?Entity, ?End), ?Start <= ?Date, ?Date <= ?End. 
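# Worked example for Pattern 1 (hypothetical identifiers): given the facts
#   parent(gemeentearchief_haarlem, noord_hollands_archief).
#   parent(noord_hollands_archief, nationaal_archieven_netwerk).
# the recursive rules materialize, among others:
#   ancestor(gemeentearchief_haarlem, nationaal_archieven_netwerk).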
# Pattern 4: Shortcut Predicates (query optimization) # Instead of traversing: institution → location → region → country in_country(?Institution, ?Country) :- located_in(?Institution, ?Location), admin_region(?Location, ?Region), country(?Region, ?Country). ``` **Why Datalog over SPARQL Property Paths**: - SPARQL property paths don't support complex recursion - Datalog rules are **materialized** (precomputed), making queries instant - Rules separate "what" from "how" (declarative vs. procedural) --- ### 11.4 GLAM Implementation: TypeDB Rules + Oxigraph SHACL Our stack has **two inference engines** that can host rules: | Component | Rule Language | Strengths | Use Case | |-----------|--------------|-----------|----------| | **TypeDB** | TypeQL Rules | Native reasoning, transitive closure | Complex inference, hierarchy traversal | | **Oxigraph** | SHACL-AF | W3C standard, RDF-native | Validation, simple derivation | **Hybrid Approach**: ``` ┌─────────────────────────────────────┐ │ Rule Repository │ │ (Rules stored as RDF in Oxigraph) │ └─────────────────────────────────────┘ ↓ Query rules ↓ ┌─────────────────────┴─────────────────────┐ ↓ ↓ ┌───────────────────┐ ┌───────────────────┐ │ Oxigraph │ │ TypeDB │ │ SHACL-AF Rules │ │ TypeQL Rules │ │ │ │ │ │ • Validation │ │ • Transitive │ │ • Simple derive │ │ closure │ │ • sh:TripleRule │ │ • Complex joins │ │ │ │ • Recursive │ └───────────────────┘ └───────────────────┘ ↓ ↓ └───────────────────┬───────────────────────┘ ↓ ┌───────────────┐ │ DSPy RAG │ │ Query Engine │ └───────────────┘ ``` --- ### 11.5 TypeDB Rules for Heritage Domain TypeDB rules (TypeQL) are particularly powerful for heritage inference: ```typeql # Rule: Infer parent-child organizational relationships rule heritage-parent-child-inference: when { $child isa heritage-custodian, has ghcid $child_id; $parent isa heritage-custodian, has ghcid $parent_id; $event isa change-event, has event-type "ACQUISITION"; ($event, acquiring: $parent, acquired: $child); 
} then {
    (parent: $parent, child: $child) isa organizational-hierarchy;
};

# Rule: Infer collection custody from organizational mergers
# (a TypeQL rule concludes exactly one relation or attribute ownership,
# so the custody start date is attached separately at load time)
rule custody-transfer-from-merger:
when {
    $source isa heritage-custodian;
    $target isa heritage-custodian;
    $collection isa collection;
    ($source, custodian-of: $collection);
    $event isa change-event, has event-type "MERGER";
    ($event, absorbed: $source, absorbing: $target);
    $event has event-date $date;
} then {
    ($target, custodian-of: $collection) isa custody-relation;
};

# Rule: Regional archives inherit provincial governance
rule regional-archive-governance:
when {
    $archive isa heritage-custodian, has institution-type "ARCHIVE";
    $location isa location, has region-code $region;
    ($archive, located-in: $location);
    $gov isa government-body, has jurisdiction $region;
} then {
    (governed-by: $gov, governed-entity: $archive) isa governance-relation;
};
```

**Advantages of TypeDB Rules**:
- Automatically materialized (precomputed)
- Recursive reasoning built-in
- Rules trigger on data changes
- Explanation support (why was this inferred?)

---

### 11.6 SHACL Rules for Oxigraph

SHACL Advanced Features (SHACL-AF) provides rule support:

```turtle
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix heritage: <https://w3id.org/heritage/> .

# SHACL Rule: Derive institution display name from components
heritage:DisplayNameRule
    a sh:NodeShape ;
    sh:targetClass heritage:HeritageCustodian ;
    sh:rule [
        a sh:SPARQLRule ;
        sh:construct """
            CONSTRUCT {
                $this heritage:displayName ?displayName .
            }
            WHERE {
                $this heritage:name ?name .
                $this heritage:locatedIn/heritage:city ?city .
                BIND(CONCAT(?name, " (", ?city, ")") AS ?displayName)
            }
        """
    ] .

# SHACL Rule: Flag custodians with data quality issues
heritage:DataQualityRule
    a sh:NodeShape ;
    sh:targetClass heritage:HeritageCustodian ;
    sh:rule [
        a sh:SPARQLRule ;
        sh:construct """
            CONSTRUCT {
                $this heritage:hasDataQualityIssue heritage:MissingLocation .
            }
            WHERE {
                $this a heritage:HeritageCustodian .
FILTER NOT EXISTS { $this heritage:locatedIn ?loc }
            }
        """
    ] .
```

---

### 11.7 Rule-Based RAG Enhancement

Rules can improve RAG retrieval and generation:

**Pattern A: Rule-Guided Query Expansion**

```python
import re


class RuleGuidedQueryExpander:
    """Expand queries using inference rules stored in graph."""

    def __init__(self, oxigraph_client: OxigraphClient):
        self.client = oxigraph_client
        self._load_expansion_rules()

    def _load_expansion_rules(self):
        """Load query expansion rules from graph."""
        query = """
            PREFIX heritage: <https://w3id.org/heritage/>

            SELECT ?rule ?pattern ?expansion
            WHERE {
                ?rule a heritage:QueryExpansionRule ;
                      heritage:matchPattern ?pattern ;
                      heritage:expandTo ?expansion .
            }
        """
        self.rules = self.client.query(query)

    def expand(self, query: str) -> list[str]:
        """Apply rules to expand query terms."""
        expansions = [query]
        for rule in self.rules:
            if rule['pattern'] in query.lower():
                # case-insensitive replacement of the matched pattern
                expanded = re.sub(
                    re.escape(rule['pattern']),
                    rule['expansion'],
                    query,
                    flags=re.IGNORECASE,
                )
                expansions.append(expanded)
        return expansions
```

**Pattern B: Rule-Derived Facts in Context**

```python
class RuleDerivedContextEnricher:
    """Add inferred facts to RAG context."""

    def enrich_context(
        self,
        entities: list[str],
        context: str
    ) -> str:
        """Add rule-derived facts about entities."""
        derived_facts = []

        for entity_ghcid in entities:
            # Query TypeDB for inferred relations
            inferred = self.typedb_client.query(f"""
                match
                $e isa heritage-custodian, has ghcid "{entity_ghcid}";
                $rel ($e, $other);
                $rel isa!
$rel_type;  # bind the exact relation type (inferred relations are filtered client-side)
                get $rel_type, $other;
            """)

            for fact in inferred:
                derived_facts.append(
                    f"[Inferred] {entity_ghcid} {fact['rel_type']} {fact['other']}"
                )

        if derived_facts:
            context += "\n\n**Inferred relationships:**\n"
            context += "\n".join(derived_facts)

        return context
```

---

### 11.8 Rule Governance and Provenance

Store rule metadata for auditability:

```yaml
# Rule definition with full provenance (stored in Oxigraph)
rule_definition:
  id: heritage:CustodyInferenceRule_v1
  type: TypeQLRule
  version: "1.0.0"
  status: active  # active | deprecated | testing

  # Rule content
  when_clause: |
    $source isa heritage-custodian;
    $event isa change-event, has event-type "MERGER";
    ...
  then_clause: |
    ($target, custodian-of: $collection) isa custody-relation;

  # Governance metadata
  created_by: "domain-expert-curator"
  created_date: "2025-01-06T12:00:00Z"
  approved_by: "heritage-governance-committee"
  approval_date: "2025-01-07T09:00:00Z"

  # Semantic metadata
  domain: organizational_change
  entities_affected:
    - heritage:HeritageCustodian
    - heritage:Collection
  related_properties:
    - heritage:custodianOf
    - heritage:changeEvent

  # Documentation
  description: |
    When a heritage custodian is absorbed through merger, custody of
    their collections transfers to the absorbing institution.
  rationale: |
    Per Dutch heritage law, organizational mergers transfer custody
    unless explicitly reassigned.
  example_trigger: |
    Noord-Hollands Archief, formed in 2001 through the merger of
    Gemeentearchief Haarlem and Rijksarchief Noord-Holland.
``` --- ### 11.9 Integration with DSPy Heritage RAG Add rule-awareness to the existing pipeline: ```python # backend/rag/rule_aware_retriever.py from dataclasses import dataclass from typing import Optional @dataclass class InferredFact: """A fact derived by rule inference.""" subject: str predicate: str object: str rule_id: str confidence: float = 1.0 # Rules produce certain facts class RuleAwareRetriever: """Retriever that includes rule-inferred facts.""" def __init__( self, typedb_client, oxigraph_client, include_inferred: bool = True ): self.typedb = typedb_client self.oxigraph = oxigraph_client self.include_inferred = include_inferred def retrieve( self, query: str, entities: list[str] ) -> tuple[list[dict], list[InferredFact]]: """Retrieve both stored and inferred facts.""" # Standard retrieval stored_facts = self._retrieve_stored(query, entities) # Rule-based inference (if enabled) inferred_facts = [] if self.include_inferred: inferred_facts = self._retrieve_inferred(entities) return stored_facts, inferred_facts def _retrieve_inferred( self, entities: list[str] ) -> list[InferredFact]: """Get inferred facts from TypeDB reasoning.""" inferred = [] for ghcid in entities: # TypeDB query with inference enabled results = self.typedb.query(f""" match $e isa heritage-custodian, has ghcid "{ghcid}"; $rel ($e, $other); get $rel, $other; """, inference=True) for r in results: if r.get('inferred', False): inferred.append(InferredFact( subject=ghcid, predicate=r['relation_type'], object=r['other_entity'], rule_id=r.get('rule_id', 'unknown') )) return inferred ``` --- ### 11.10 Key Takeaways for GLAM | Principle | Implementation | |-----------|----------------| | **Rules as data** | Store SHACL/TypeQL rules in Oxigraph as RDF | | **Rule provenance** | Track rule creator, approver, version with PROV-O | | **Dual inference** | TypeDB for complex reasoning, SHACL for validation | | **Query optimization** | Precompute shortcut predicates via rules | | **RAG enhancement** | 
Include inferred facts in retrieval context | | **Governance** | Rules queryable, auditable, versionable | **Anti-Patterns to Avoid**: - ❌ Hardcoding inference logic in Python - ❌ Duplicating rules across TypeDB and application code - ❌ No provenance for derived facts - ❌ Rules that can't be inspected by domain experts --- ## Updated References 1. Edge et al. (2024). "From Local to Global: A Graph RAG Approach to Query-Focused Summarization." arXiv:2404.16130 2. Wang et al. (2025). "ROGRAG: A Robustly Optimized GraphRAG Framework." arXiv:2503.06474 3. Rasmussen et al. (2025). "Zep: A Temporal Knowledge Graph Architecture for Agent Memory." arXiv:2501.13956 4. Luo et al. (2025). "HyperGraphRAG: Retrieval-Augmented Generation via Hypergraph-Structured Knowledge Representation." arXiv:2503.21322 (NeurIPS 2025) 5. Zhou et al. (2025). "Improving Multi-step RAG with Hypergraph-based Memory for Long-Context Complex Relational Modeling." arXiv:2512.23959 6. Sarkar (2025). "GraphRAG in Practice: How to Build Cost-Efficient, High-Recall Retrieval Systems." Towards Data Science 7. Turing Post (2026). "12 New Advanced Types of RAG." 8. Ding et al. (2025). "STAR-RAG: Time-Aligned Rule Graphs for Temporal Reasoning." arXiv:2510.16715 9. Chen et al. (2024). "TimeR4: Time-Aware Retrieve-Rewrite-Retrieve-Rerank for Temporal QA." EMNLP 2024 10. Wang et al. (2025). "T-GRAG: Temporal Graph RAG with Conflict Resolution." arXiv:2508.01680 11. vLLM Project (2025). "Semantic Router v0.1 Iris: Signal-Decision Architecture." 12. Aurelio Labs (2025). "semantic-router: Superfast Decision-Making Layer for LLMs." 13. CIDOC-CRM Special Interest Group (2024). "CIDOC Conceptual Reference Model v7.1.3." ICOM. 14. Doerr, M. et al. (2023). "Mapping Cultural Heritage Events to CIDOC-CRM." Museum & Web 2023. 15. Klyne, G. et al. (2024). "Hypergraph Patterns for Cultural Heritage Knowledge Graphs." Digital Humanities Quarterly. 16. DSPy Framework (2024). "Declarative Self-Improving Language Programs." 
Stanford NLP. 17. Pinecone (2024). "Vector Database Best Practices for RAG." Technical Blog. 18. LightRAG (2024). "Simple and Fast Retrieval-Augmented Generation." GitHub Repository. 19. Velitchkov, I. (2025). "Rules on Graphs in Graphs of Rules, Part 1." Link & Think (Substack). 20. Pareti, P. et al. (2019). "SHACL Constraints with Inference Rules." ISWC 2019. arXiv:1911.00598 21. W3C SHACL Working Group (2025). "Use case: Datalog rules." GitHub Issue #348, w3c/data-shapes. 22. Han, H. et al. (2025). "Retrieval-Augmented Generation with Graphs (GraphRAG)." arXiv:2501.00309 23. SurrealDB (2025). "Automating Knowledge Graphs with SurrealDB and Gemini." Technical Blog.