GraphRAG Design Patterns for Heritage Knowledge Graph + Vector Search
Created: 2025-01-06
Purpose: Analysis of external GraphRAG patterns applicable to our TypeDB-Oxigraph-DSPy architecture
Status: Research and Planning
Table of Contents
- Executive Summary
- Current GLAM Architecture Analysis
- External Pattern Analysis
- Recommended Design Patterns
- Implementation Roadmap
- Anti-Patterns to Avoid
- Conclusion
- Temporal Knowledge Graph Patterns
- Semantic Routing Patterns
- Hypergraph Patterns Deep Dive
- Rules on Graphs Pattern
- References
Executive Summary
This document analyzes design patterns from leading GraphRAG research and libraries (Microsoft GraphRAG, ROGRAG, Zep, HyperGraphRAG, LightRAG, etc.) and identifies patterns applicable to our existing TypeDB-Oxigraph-DSPy stack without adding new frameworks.
Key Findings
| Pattern Category | Applicable to GLAM | Implementation Complexity | Priority |
|---|---|---|---|
| Community Hierarchies | Yes | Medium | High |
| Temporal Knowledge Graphs | Yes (already have) | Low | High |
| Dual-Level Retrieval | Yes | Low | High |
| Hypergraph Memory | Partial | High | Medium |
| Multi-Stage Verification | Yes | Medium | High |
| Iterative Search Optimization | Yes | Low | High |
Core Principle
Avoid adding new frameworks. Focus on extracting design patterns as implementation strategies within our existing stack: TypeDB (semantic graph), Oxigraph (RDF/SPARQL), Qdrant (vector search), and DSPy (LLM orchestration).
Current GLAM Architecture Analysis
Existing Components
```
┌─────────────────────────────────────────────────────────────────┐
│                      GLAM RAG Architecture                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐         │
│  │    Qdrant    │   │   Oxigraph   │   │    TypeDB    │         │
│  │ Vector Store │   │  SPARQL/RDF  │   │ Schema Store │         │
│  │              │   │              │   │              │         │
│  │ - Embeddings │   │ - Triples    │   │ - LinkML     │         │
│  │ - Semantic   │   │ - SPARQL     │   │ - Ontology   │         │
│  │   Search     │   │   Queries    │   │ - Validation │         │
│  └──────┬───────┘   └──────┬───────┘   └──────────────┘         │
│         │                  │                                    │
│         └──────────────────┴────────────────┐                   │
│                                             │                   │
│  ┌──────────────────────────────────────────┴────────────────┐  │
│  │                     DSPy Heritage RAG                      │  │
│  │                                                            │  │
│  │  ┌───────────────┐  ┌───────────────┐  ┌────────────────┐ │  │
│  │  │   Template    │  │    Entity     │  │     SPARQL     │ │  │
│  │  │    SPARQL     │  │  Extraction   │  │   Generator    │ │  │
│  │  │  Classifier   │  │  (DSPy Sig)   │  │   (DSPy Sig)   │ │  │
│  │  └───────────────┘  └───────────────┘  └────────────────┘ │  │
│  │                                                            │  │
│  │  ┌───────────────┐  ┌───────────────┐  ┌────────────────┐ │  │
│  │  │   Semantic    │  │     Cost      │  │      GEPA      │ │  │
│  │  │     Cache     │  │    Tracker    │  │   Optimizer    │ │  │
│  │  └───────────────┘  └───────────────┘  └────────────────┘ │  │
│  └────────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
Current Retrieval Flow
1. Query Intent Classification (DSPy Signature)
2. Entity Extraction (heritage-specific NER)
3. Template Matching (SPARQL template selection)
4. Dual Retrieval:
   - SPARQL queries to Oxigraph (structured)
   - Vector search in Qdrant (semantic)
5. Result Fusion (merge and deduplicate)
6. Answer Generation (DSPy with context)
Strengths of Current System
- Template-based SPARQL: 65% precision vs 10% LLM-only (Formica et al., 2023)
- Semantic caching: Reduces redundant LLM calls
- Temporal awareness: GHCID history tracking with `valid_from`/`valid_to`
- Ontology grounding: LinkML schema provides type safety
- Multi-hop capable: SPARQL traverses relationships
Current Gaps (Opportunities)
| Gap | External Pattern Solution |
|---|---|
| No community/cluster summaries | Microsoft GraphRAG communities |
| Limited iterative refinement | ROGRAG dual-level + logic form |
| No explicit verification step | ROGRAG argument checking |
| Flat retrieval (no hierarchy) | GraphRAG local/global search |
| Missing hyperedge relations | HyperGraphRAG n-ary facts |
External Pattern Analysis
1. Microsoft GraphRAG (arxiv:2404.16130)
Core Innovation: Hierarchical community summarization using Leiden clustering.
Key Components:
- Entity Extraction: LLM extracts entities/relationships from text chunks
- Community Detection: Leiden algorithm clusters related entities
- Hierarchical Summaries: Bottom-up summaries for each community level
- Query Modes:
- Global Search: Uses community summaries for holistic questions
- Local Search: Fan-out from specific entities
- DRIFT Search: Local + community context
Applicable Pattern for GLAM:
```python
# Pattern: Community-Based Retrieval for Holistic Questions
# E.g., "What are the main themes across Dutch archives?"

class CommunityRetriever:
    """
    Pre-compute community clusters from Oxigraph triples.
    Store community summaries in Qdrant as additional vectors.
    """

    def detect_communities(self) -> dict[str, list[str]]:
        """Use Leiden/Louvain on institution-location-type graph."""
        # SPARQL: Get all institution relationships
        # Apply community detection algorithm
        # Return community_id -> [ghcid_list]
        pass

    def generate_community_summary(self, community_ghcids: list[str]) -> str:
        """LLM summarizes institutions in a community."""
        # Retrieve metadata for all institutions
        # Generate summary with DSPy signature
        pass

    def global_search(self, query: str) -> list[str]:
        """Search community summaries for holistic questions."""
        # Vector search community summaries
        # Aggregate partial answers
        pass
```
Implementation in Our Stack:
- Use Oxigraph SPARQL to extract the graph for clustering
- Run the Leiden algorithm (Python `leidenalg` library)
- Store community summaries as Qdrant vectors
- Add a `global_search` mode to the DSPy RAG
2. ROGRAG (arxiv:2503.06474)
Core Innovation: Multi-stage retrieval with dual-level + logic form methods.
Key Components:
- Dual-Level Retrieval:
- Low-level: Entity keywords (fuzzy matching)
- High-level: Relational descriptions (semantic matching)
- Logic Form Retrieval: Operator-based query decomposition
- Retrieval Verifier: Argument checking before generation
Applicable Pattern for GLAM:
```python
# Pattern: Dual-Level Retrieval with Verification

class DualLevelRetriever:
    """
    Combine entity-level and relation-level matching.
    """

    def extract_dual_level(self, query: str) -> tuple[list[str], list[str]]:
        """
        Extract low-level (entities) and high-level (relations) from query.

        E.g., "Which archives in Haarlem have digitized collections?"
        Low-level: ["Haarlem", "archief"]
        High-level: ["digitized collections", "heritage institution"]
        """
        # DSPy signature for dual extraction
        pass

    def match_low_level(self, entities: list[str]) -> set[str]:
        """Fuzzy match entities against Oxigraph nodes."""
        # SPARQL with FILTER(CONTAINS(...))
        # Return matching GHCIDs
        pass

    def match_high_level(self, relations: list[str]) -> set[str]:
        """Semantic match relations against edge descriptions."""
        # Vector search in Qdrant
        # Return matching GHCIDs
        pass

    def merge_results(self, low: set[str], high: set[str]) -> list[str]:
        """Merge and deduplicate, prioritizing the intersection."""
        intersection = low & high
        return list(intersection) + list(low - high) + list(high - low)


class RetrievalVerifier:
    """
    Verify retrieved context answers the question before generation.
    """

    def verify_argument(self, query: str, context: str) -> bool:
        """
        Check if context is sufficient to answer query.
        Reject if confidence < threshold.
        """
        # DSPy signature for verification
        # Return True if sufficient, False to retry
        pass
```
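The merge step gives GHCIDs found by both levels top priority. A minimal standalone check of that ordering, with illustrative GHCIDs:

```python
def merge_results(low: set[str], high: set[str]) -> list[str]:
    """Intersection first, then entity-only hits, then relation-only hits."""
    intersection = low & high
    return list(intersection) + list(low - high) + list(high - low)


low = {"NL-001", "NL-002"}   # GHCIDs from fuzzy entity matching
high = {"NL-002", "NL-003"}  # GHCIDs from semantic relation matching

merged = merge_results(low, high)
print(merged[0])  # "NL-002": the intersection hit ranks first
```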
Key Insight from ROGRAG:
"Although the dual-level method achieves higher precision, logic form method provides higher information density and is more concise and clear."
Implementation in Our Stack:
- Add dual-level extraction as DSPy Signature
- Extend template_sparql.py with fuzzy matching
- Add RetrievalVerifier between retrieval and generation
- Implement fallback cascade: Template → Dual-Level → Logic Form → Vector-only
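The fallback cascade can be sketched as a chain that tries each strategy until one returns results. The strategy callables here are hypothetical stand-ins for the real retrievers:

```python
from typing import Callable


def cascade_retrieve(
    query: str,
    strategies: list[tuple[str, Callable[[str], list[dict]]]],
) -> tuple[str, list[dict]]:
    """Try each retrieval strategy in order; return the first non-empty result."""
    for name, retrieve in strategies:
        results = retrieve(query)
        if results:
            return name, results
    return "none", []


# Hypothetical stand-ins: template matching fails, dual-level succeeds.
strategies = [
    ("template", lambda q: []),
    ("dual_level", lambda q: [{"ghcid": "NL-001"}]),
    ("logic_form", lambda q: [{"ghcid": "NL-002"}]),
    ("vector_only", lambda q: [{"ghcid": "NL-003"}]),
]
name, results = cascade_retrieve("archives in Haarlem", strategies)
print(name)  # "dual_level"
```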
3. Zep Temporal Knowledge Graph (arxiv:2501.13956)
Core Innovation: Bitemporal modeling with episodic, semantic, and community subgraphs.
Key Components:
- Episode Subgraph: Raw events with original timestamps
- Semantic Entity Subgraph: Extracted entities with embeddings
- Community Subgraph: Clustered entities with summaries
- Bitemporal Modeling:
- Event Time (T): When fact occurred
- Ingestion Time (T'): When added to graph
- Edge Invalidation: Update/supersede facts over time
We already have this: GLAM's `ghcid_history` with `valid_from`/`valid_to` implements temporal tracking.
Enhancement Pattern:
```python
# Pattern: Enhanced Temporal Reasoning
from datetime import datetime


class TemporalReasoningEnhancer:
    """
    Extend existing temporal tracking with Zep-style capabilities.
    """

    def query_at_point_in_time(
        self,
        ghcid: str,
        query_date: datetime,
    ) -> dict:
        """
        Return institution state at a specific point in time.
        Uses GHCID history to find the valid record.
        """
        # SPARQL with temporal filter:
        #   FILTER(?valid_from <= ?query_date &&
        #          (!BOUND(?valid_to) || ?valid_to > ?query_date))
        pass

    def track_provenance_chain(self, ghcid: str) -> list[dict]:
        """
        Full audit trail: what changed, when, why.
        Critical for heritage institutions with mergers/splits.
        """
        # Query ghcid_history entries
        # Include change_event references
        pass

    def invalidate_superseded_facts(
        self,
        ghcid: str,
        new_fact: dict,
        reason: str,
    ) -> None:
        """
        When a new fact supersedes an old one, mark the old fact invalid.
        Preserve provenance for auditability.
        """
        # Set valid_to on old fact
        # Create new fact with valid_from
        # Link via change_event
        pass
```
Implementation in Our Stack:
- Already have `GHCIDHistoryEntry` in the LinkML schema
- Enhance SPARQL templates with temporal filters
- Add a point-in-time query mode to the DSPy RAG
- Leverage `ChangeEvent` for the provenance chain
4. HyperGraphRAG (arxiv:2503.21322)
Core Innovation: N-ary relations via hyperedges (connecting 3+ entities).
Key Components:
- Hyperedge Construction: Facts connecting multiple entities
- Hyperedge Retrieval: Match queries to multi-entity facts
- Hyperedge Generation: LLM reasons over hyperedge context
Example N-ary Fact:
"The Amsterdam Museum acquired the Rembrandt collection from the Rijksmuseum in 2020 as part of the Shared Heritage initiative."
Traditional triple: Can only capture binary relations.
Hyperedge: Connects Museum, Collection, Year, Initiative, Source in single fact.
Applicable Pattern for GLAM:
```python
# Pattern: N-ary Relation Modeling

class HyperedgeManager:
    """
    Model complex heritage events as hyperedges.
    Store in Oxigraph using reification or named graphs.
    """

    def create_custody_transfer_hyperedge(
        self,
        source_ghcid: str,
        target_ghcid: str,
        collection_uri: str,
        event_date: str,
        initiative_name: str,
    ) -> str:
        """
        Create a hyperedge for a custody transfer event.

        Uses RDF reification pattern:

            _:transfer a hc:CustodyTransfer ;
                hc:source <source_ghcid> ;
                hc:target <target_ghcid> ;
                hc:collection <collection_uri> ;
                schema:date "2020-01-01" ;
                hc:initiative "Shared Heritage" .
        """
        pass

    def retrieve_by_partial_match(
        self,
        known_entities: list[str],
    ) -> list[dict]:
        """
        Find hyperedges matching a subset of entities.

        E.g., query mentions "Rijksmuseum" and "2020" -> find all
        transfers involving Rijksmuseum in 2020.
        """
        # SPARQL with OPTIONAL patterns
        pass
```
CIDOC-CRM Alignment: We already use CIDOC-CRM which supports n-ary relations via events:
- `crm:E10_Transfer_of_Custody` - connects parties, object, and time
- `crm:E5_Event` - generic multi-participant events
Implementation in Our Stack:
- Use CIDOC-CRM event classes for n-ary facts
- Extend entity extraction to recognize event patterns
- Add event-aware SPARQL templates
- Index event descriptions in Qdrant for semantic matching
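An event-aware template for partial-match retrieval could look like the sketch below. The CIDOC-CRM properties match the Turtle example later in this document, while the template name and `render` helper are illustrative assumptions:

```python
# Hypothetical event-aware template, following this document's template style.
CUSTODY_TRANSFER_TEMPLATE = """
PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
SELECT ?transfer ?source ?target ?object ?date WHERE {
    ?transfer a crm:E10_Transfer_of_Custody ;
        crm:P28_custody_surrendered_by ?source ;
        crm:P29_custody_received_by ?target ;
        crm:P30_transferred_custody_of ?object .
    OPTIONAL {
        ?transfer crm:P4_has_time-span ?span .
        ?span crm:P82a_begin_of_the_begin ?date .
    }
    FILTER(?source = <{{ entity_uri }}> || ?target = <{{ entity_uri }}>)
}
"""


def render(template: str, **params: str) -> str:
    """Minimal placeholder substitution in the document's {{ name }} style."""
    for key, value in params.items():
        template = template.replace("{{ " + key + " }}", value)
    return template


query = render(
    CUSTODY_TRANSFER_TEMPLATE,
    entity_uri="https://glam.nl/custodian/NL-NH-HAR-A-GA",
)
```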
5. Cost-Efficient GraphRAG (TDS Article)
Core Insight: "You don't need a perfect graph."
Key Patterns:
- **Star Graph Sufficiency**:
  - Minimal graph: central node (report/institution) → entities
  - Relations inferred via iterative search, not explicit edges
- **Iterative Search Space Optimization**:
  - Graph narrows documents → vector refines chunks
  - Context enrichment fixes weak embeddings (IDs, dates)
- **Graph as Classifier, Not Answer**:
  - Node metadata (doc_id) filters search space
  - Actual answers come from vector chunks
Applicable Pattern for GLAM:
```python
# Pattern: Graph-Guided Vector Retrieval

class GraphGuidedRetriever:
    """
    Use the KG to narrow the search space, then vector search for final retrieval.
    Fixes weak embeddings for identifiers like GHCID, ISIL codes.
    """

    def retrieve_with_graph_filter(
        self,
        query: str,
        use_graph_context: bool = True,
    ) -> list[dict]:
        """
        1. Extract entities from query
        2. Graph lookup: find related GHCIDs
        3. Vector search: filter by GHCID set
        4. Context enrichment: add graph metadata
        """
        # Step 1: Entity extraction
        entities = self.extract_entities(query)

        # Step 2: Graph lookup (SPARQL)
        ghcid_set = self.graph_lookup(entities)

        # Step 3: Vector search with filter
        if ghcid_set:
            vector_results = self.qdrant_search(
                query,
                filter={"ghcid": {"$in": list(ghcid_set)}},
            )
        else:
            vector_results = self.qdrant_search(query)

        # Step 4: Context enrichment
        enriched = self.enrich_with_graph_context(
            vector_results,
            ghcid_set,
        )
        return enriched

    def enrich_with_graph_context(
        self,
        results: list[dict],
        ghcid_set: set[str],
    ) -> list[dict]:
        """
        Add graph metadata to vector results.
        Helps the LLM understand relations between results.
        """
        for result in results:
            ghcid = result.get("ghcid")
            if ghcid:
                # Fetch neighbors from Oxigraph
                neighbors = self.get_graph_neighbors(ghcid)
                result["graph_context"] = neighbors
        return results
```
This is close to our current approach! We already do:
- Entity extraction → SPARQL → Vector fallback
- GHCID-based filtering
Enhancement: Add explicit graph context enrichment step.
6. HGMEM: Hypergraph-Based Memory (arxiv:2512.23959)
Core Innovation: Working memory as evolving hypergraph for multi-step RAG.
Key Components:
- Hyperedges as Memory Units: Each memory unit connects multiple facts
- Memory Operations: Update, Insert, Merge
- Adaptive Retrieval: Local investigation vs. global exploration
Applicable Pattern for GLAM:
```python
# Pattern: Session-Based Working Memory
from datetime import datetime


class HypergraphSessionMemory:
    """
    Maintain session-level working memory for multi-turn conversations.
    Memory evolves through retrieval steps.
    """

    def __init__(self, session_id: str):
        self.session_id = session_id
        self.memory_hyperedges: list[dict] = []  # Each connects multiple facts
        self.explored_ghcids: set[str] = set()
        self.unexplored_aspects: list[str] = []

    def add_memory_unit(
        self,
        facts: list[dict],
        source_query: str,
    ) -> None:
        """
        Create a hyperedge connecting related facts from a single retrieval.
        """
        hyperedge = {
            "id": generate_id(),  # ID helper from the surrounding module
            "facts": facts,
            "source_query": source_query,
            "timestamp": datetime.now(),
            "ghcids": [f.get("ghcid") for f in facts if f.get("ghcid")],
        }
        self.memory_hyperedges.append(hyperedge)
        self.explored_ghcids.update(hyperedge["ghcids"])

    def merge_related_memories(self) -> None:
        """
        Merge hyperedges with overlapping GHCIDs.
        Creates higher-order connections.
        """
        # Cluster by GHCID overlap
        # Merge overlapping hyperedges
        pass

    def suggest_exploration(self) -> list[str]:
        """
        Identify unexplored aspects based on partial patterns.

        E.g., "You asked about archives in Haarlem.
        Related: Noord-Holland province has 12 more archives."
        """
        # Analyze memory for patterns
        # Suggest related but unexplored queries
        pass
```
Implementation in Our Stack:
- Extend session_manager.py with hypergraph memory
- Store session memories in Qdrant (vector) + Oxigraph (structure)
- Add exploration suggestions to response
7. 12 Advanced RAG Types Summary (Turing Post)
Quick reference for additional patterns:
| RAG Type | Key Idea | GLAM Applicability |
|---|---|---|
| MiA-RAG | High-level summary guides retrieval | Medium (for long docs) |
| QuCo-RAG | Statistical entity flagging | Low (heritage data is clean) |
| HiFi-RAG | Multi-stage filtering | High (already doing) |
| Bidirectional RAG | Write-back to corpus | Medium (for enrichment) |
| TV-RAG | Temporal video alignment | Low (not video-focused) |
| MegaRAG | Multimodal knowledge graphs | Medium (future: photos) |
| Graph-O1 | MCTS graph exploration | Medium (complex reasoning) |
| Hybrid RAG | Multilingual with RRF | High (Dutch/English) |
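The Hybrid RAG row merges Dutch and English result lists with Reciprocal Rank Fusion (RRF). A minimal sketch of standard RRF scoring, with k=60 as the conventional constant and illustrative GHCIDs:

```python
def reciprocal_rank_fusion(
    rankings: list[list[str]],
    k: int = 60,
) -> list[str]:
    """Merge ranked lists: score(d) = sum over lists of 1 / (k + rank)."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)


dutch = ["NL-001", "NL-002", "NL-003"]    # ranked hits from the Dutch query
english = ["NL-002", "NL-001", "NL-004"]  # ranked hits from the English query

fused = reciprocal_rank_fusion([dutch, english])
print(fused[:2])  # IDs found by both retrievers rank ahead of single-list hits
```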
Recommended Design Patterns
Priority 1: Immediate Implementation
Pattern A: Retrieval Verification Layer
```python
# Add between retrieval and generation in dspy_heritage_rag.py
import dspy


class ArgumentVerifier(dspy.Signature):
    """
    You are a verification assistant. Given a user query and retrieved context,
    determine if the context contains sufficient information to answer the query.

    Be strict: if key entities or facts are missing, return can_answer=False.
    This prevents hallucination from insufficient context.
    """

    query: str = dspy.InputField(desc="User's question")
    context: str = dspy.InputField(desc="Retrieved information")
    can_answer: bool = dspy.OutputField(desc="True if context is sufficient")
    missing_info: str = dspy.OutputField(desc="What information is missing, if any")
    confidence: float = dspy.OutputField(desc="Confidence score 0-1")
```

(The original sketch assigned `__doc__` after an existing docstring; the two are merged into a single docstring here, since the later assignment would simply overwrite the first.)
Benefit: Reduces hallucination, enables retry with expanded retrieval.
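Wired into the pipeline, the verifier gates generation and triggers a broadened retry. A sketch with plain callables standing in for the real DSPy modules (in the actual pipeline, `ArgumentVerifier` would supply the `verify` step):

```python
def verified_answer(query, retrieve, verify, generate, max_retries=1):
    """Generate only after the context passes verification; retry expanded."""
    context = retrieve(query, expanded=False)
    attempts = 0
    while not verify(query, context) and attempts < max_retries:
        # Expanded retrieval: e.g. relax filters or raise top-k.
        context = retrieve(query, expanded=True)
        attempts += 1
    if not verify(query, context):
        return "Insufficient information to answer."
    return generate(query, context)


# Toy stand-ins: the first retrieval misses the fact, the expanded one finds it.
answer = verified_answer(
    "When was the archive founded?",
    retrieve=lambda q, expanded: "founded in 2001" if expanded else "",
    verify=lambda q, ctx: "founded" in ctx,
    generate=lambda q, ctx: f"Based on the context: {ctx}",
)
print(answer)  # "Based on the context: founded in 2001"
```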
Pattern B: Dual-Level Entity Extraction
```python
# Extend HeritageEntityExtractor in dspy_heritage_rag.py
import dspy


class DualLevelEntityExtractor(dspy.Signature):
    """
    Extract both entity-level and relation-level keywords from the query.
    """

    query: str = dspy.InputField()
    # Low-level: specific entities
    entities: list[str] = dspy.OutputField(
        desc="Named entities: institutions, cities, people, identifiers"
    )
    # High-level: relation/concept descriptions
    relations: list[str] = dspy.OutputField(
        desc="Relation phrases: 'digitized collections', 'founded before 1900'"
    )
    # Combined search strategy
    search_strategy: str = dspy.OutputField(
        desc="Recommend: 'entity_first', 'relation_first', or 'parallel'"
    )
```
Benefit: Enables fuzzy entity matching + semantic relation matching.
Priority 2: Short-Term Enhancements
Pattern C: Community Pre-Computation
```python
# New module: backend/rag/community_indexer.py
import igraph as ig
import leidenalg


class CommunityIndexer:
    """
    Pre-compute community clusters from Oxigraph for global queries.
    Run periodically (daily/weekly) or on data updates.
    """

    def build_institution_graph(self) -> ig.Graph:
        """
        Query Oxigraph for institution-location-type relationships.
        Build an igraph graph for community detection.
        """
        sparql = """
            SELECT ?s ?p ?o WHERE {
                ?s a crm:E39_Actor .
                ?s ?p ?o .
                FILTER(?p IN (hc:locatedIn, hc:institutionType, hc:partOf))
            }
        """
        # Execute and build graph
        pass

    def detect_communities(self, graph: ig.Graph) -> dict:
        """
        Apply the Leiden algorithm for community detection.
        Returns mapping: community_id -> [ghcid_list]
        """
        partition = leidenalg.find_partition(
            graph,
            leidenalg.ModularityVertexPartition,
        )
        return {
            str(i): [graph.vs[idx]["ghcid"] for idx in members]
            for i, members in enumerate(partition)
        }

    def generate_community_summaries(
        self,
        communities: dict,
    ) -> list[dict]:
        """
        Generate an LLM summary for each community.
        Store in Qdrant for global search.
        """
        summaries = []
        for comm_id, ghcids in communities.items():
            # Fetch metadata for all institutions
            institutions = self.fetch_institution_metadata(ghcids)
            # Generate summary with DSPy
            summary = self.summarize_community(institutions)
            summaries.append({
                "community_id": comm_id,
                "ghcids": ghcids,
                "summary": summary,
                "institution_count": len(ghcids),
            })
        return summaries
```
Benefit: Enables answering holistic questions like "What are the main archival themes in the Netherlands?"
Pattern D: Temporal Query Mode
```python
# Extend SPARQL templates in template_sparql.py
TEMPORAL_QUERY_TEMPLATES = {
    "point_in_time_state": """
        PREFIX hc: <https://nde.nl/ontology/hc/>
        SELECT ?ghcid ?name ?institutionType ?city WHERE {
            ?s hc:ghcid ?ghcid ;
               skos:prefLabel ?name ;
               hc:institutionType ?institutionType .
            OPTIONAL { ?s schema:addressLocality ?city }
            # Temporal filter for point-in-time query
            ?s hc:validFrom ?validFrom .
            OPTIONAL { ?s hc:validTo ?validTo }
            FILTER(?validFrom <= "{{ query_date }}"^^xsd:date)
            FILTER(!BOUND(?validTo) || ?validTo > "{{ query_date }}"^^xsd:date)
        }
    """,
    "institution_history": """
        PREFIX hc: <https://nde.nl/ontology/hc/>
        SELECT ?ghcid ?validFrom ?validTo ?changeType ?description WHERE {
            ?entry hc:ghcid "{{ ghcid }}" ;
                   hc:validFrom ?validFrom .
            OPTIONAL { ?entry hc:validTo ?validTo }
            OPTIONAL { ?entry hc:changeType ?changeType }
            OPTIONAL { ?entry hc:changeDescription ?description }
        }
        ORDER BY ?validFrom
    """,
}
```
Benefit: Answer "What was the structure of Noord-Hollands Archief before the 2001 merger?"
Priority 3: Long-Term Research
Pattern E: Hyperedge Event Modeling
Model complex heritage events (mergers, custody transfers) as hyperedges using CIDOC-CRM:
```turtle
# RDF representation of custody transfer hyperedge
_:transfer_001 a crm:E10_Transfer_of_Custody ;
    crm:P28_custody_surrendered_by <https://glam.nl/custodian/NL-NH-HAR-A-GA> ;
    crm:P29_custody_received_by <https://glam.nl/custodian/NL-NH-HAR-A-NHA> ;
    crm:P30_transferred_custody_of <https://glam.nl/collection/haarlem-archives> ;
    crm:P4_has_time-span _:timespan_001 ;
    hc:partOfEvent <https://glam.nl/event/nha-merger-2001> .

_:timespan_001 a crm:E52_Time-Span ;
    crm:P82a_begin_of_the_begin "2001-01-01"^^xsd:date ;
    crm:P82b_end_of_the_end "2001-01-01"^^xsd:date .
```
Benefit: Rich event modeling for heritage organizational changes.
Pattern F: Session Memory Evolution
Implement HGMEM-style working memory for multi-turn sessions:
```python
# Extend session_manager.py

class EvolvingSessionMemory:
    """
    Session memory that builds knowledge over conversation turns.
    """

    def __init__(self, session_id: str):
        self.session_id = session_id
        self.memory_graph = {}  # GHCID -> facts
        self.explored_paths = []
        self.unexplored_suggestions = []

    def update_from_turn(
        self,
        query: str,
        retrieved: list[dict],
        response: str,
    ) -> None:
        """
        Update memory based on a conversation turn.
        Identify new connections between facts.
        """
        pass

    def suggest_next_exploration(self) -> list[str]:
        """
        Suggest related queries based on memory patterns.

        "You explored archives in Haarlem. Related:
        Noord-Holland has 12 more archives you might find interesting."
        """
        pass
```
Implementation Roadmap
Phase 1: Quick Wins (1-2 weeks)
| Task | File | Pattern |
|---|---|---|
| Add ArgumentVerifier | dspy_heritage_rag.py | Pattern A |
| Dual-level extraction | dspy_heritage_rag.py | Pattern B |
| Temporal SPARQL templates | template_sparql.py | Pattern D |
| Graph context enrichment | dspy_heritage_rag.py | TDS Pattern |
Phase 2: Infrastructure (2-4 weeks)
| Task | File | Pattern |
|---|---|---|
| Community detection | community_indexer.py | Pattern C |
| Community summary storage | Qdrant schema | Pattern C |
| Global search mode | dspy_heritage_rag.py | GraphRAG |
| Enhanced session memory | session_manager.py | Pattern F |
Phase 3: Advanced Features (1-2 months)
| Task | File | Pattern |
|---|---|---|
| Event hyperedge modeling | Oxigraph schema | Pattern E |
| MCTS graph exploration | graph_explorer.py | Graph-O1 |
| Multi-step memory evolution | session_manager.py | HGMEM |
| Exploration suggestions | dspy_heritage_rag.py | HGMEM |
Anti-Patterns to Avoid
1. Over-Engineering the Graph
Bad: Try to extract every possible relation into explicit edges.
Good: Use minimal graph structure, infer relations via search.
From TDS article: "A simple graph structure—even a star graph—can still support complex queries when combined with iterative search-space refinement."
2. Adding New Frameworks
Bad: Add LangChain, LlamaIndex, Neo4j, etc.
Good: Implement patterns within existing TypeDB/Oxigraph/DSPy stack.
We already have a working stack. New frameworks add complexity without proportional benefit.
3. Ignoring Vector Search Limitations
Bad: Rely only on vector similarity for alphanumeric IDs (GHCID, ISIL).
Good: Use graph context to enrich vector queries.
Alphanumeric identifiers have weak embeddings. Always combine with graph-based filtering.
4. Generating Without Verification
Bad: Pass retrieved context directly to LLM for answer generation.
Good: Verify context sufficiency before generation; retry if insufficient.
ROGRAG shows argument checking outperforms result checking (75% vs 72% accuracy).
5. Flat Retrieval for Holistic Questions
Bad: Answer "What are the main archival themes?" with chunk-level retrieval.
Good: Use community summaries for holistic/global questions.
Microsoft GraphRAG was specifically designed to solve this: "RAG fails on global questions directed at an entire text corpus."
Conclusion
Our existing TypeDB-Oxigraph-DSPy architecture is well-positioned to incorporate advanced GraphRAG patterns without adding new frameworks. The key enhancements are:
- Verification Layer: Prevent hallucination with argument checking
- Dual-Level Retrieval: Combine entity + relation matching
- Community Summaries: Enable global/holistic questions
- Temporal Query Mode: Leverage our existing GHCID history
- Graph Context Enrichment: Fix weak embeddings for identifiers
These patterns build on our strengths (template SPARQL, semantic caching, ontology grounding) while addressing gaps (global questions, multi-step reasoning, verification).
Temporal Knowledge Graph Patterns
Overview
Temporal Knowledge Graphs (TKGs) extend traditional KGs with time-aware capabilities, enabling queries like:
- "What was the status of this archive in 2001?"
- "Which museums merged between 1990-2010?"
- "How has the collection size changed over time?"
GLAM already has this: our `ghcid_history` with `valid_from`/`valid_to` provides basic temporal tracking. These patterns enhance it.
8.1 STAR-RAG: Time-Aligned Rule Graphs (arXiv:2510.16715)
Core Innovation: Combines temporal reasoning rules with RAG retrieval using time-aligned graph structures.
Key Components:
- Temporal Rule Extraction: Identifies temporal patterns in data (e.g., "archives that merged → new GHCID issued")
- Time-Aligned Subgraphs: Groups facts by temporal validity
- Rule-Guided Retrieval: Uses rules to expand/filter retrieval
Applicable Pattern for GLAM:
```python
# Pattern: Temporal Rule-Based Query Expansion
import re
from datetime import datetime
from typing import Optional


class TemporalRuleEngine:
    """
    Apply temporal rules to expand queries with time constraints.

    Example rules:
    - IF merger_event(A, B, date) THEN ghcid_change(A, date) AND ghcid_change(B, date)
    - IF founding_date(X) < 1900 THEN historical_institution(X)
    - IF valid_to(fact) != NULL THEN superseded_fact(fact)
    """

    TEMPORAL_RULES = [
        {
            "name": "merger_implies_ghcid_change",
            "antecedent": "?event a hc:MergerEvent ; hc:date ?date",
            "consequent": "?event hc:triggersGHCIDChange true",
        },
        {
            "name": "historical_institution",
            "antecedent": "?inst schema:foundingDate ?date . FILTER(?date < '1900-01-01'^^xsd:date)",
            "consequent": "?inst hc:historicalPeriod 'pre-1900'",
        },
        {
            "name": "active_vs_superseded",
            "antecedent": "?fact hc:validTo ?endDate . FILTER(BOUND(?endDate))",
            "consequent": "?fact hc:status 'superseded'",
        },
    ]

    def expand_query_with_rules(
        self,
        base_query: str,
        query_date: Optional[datetime] = None,
    ) -> str:
        """
        Expand a SPARQL query with temporal rule inferences.

        Args:
            base_query: Original SPARQL query
            query_date: Point-in-time for temporal filtering

        Returns:
            Expanded query with rule-based clauses
        """
        # Add temporal validity filter
        if query_date:
            temporal_filter = f"""
                FILTER(
                    ?validFrom <= "{query_date.isoformat()}"^^xsd:date &&
                    (!BOUND(?validTo) || ?validTo > "{query_date.isoformat()}"^^xsd:date)
                )
            """
            # Inject into WHERE clause
            base_query = self._inject_filter(base_query, temporal_filter)

        # Apply inference rules
        for rule in self.TEMPORAL_RULES:
            if self._rule_applies(base_query, rule):
                base_query = self._apply_rule(base_query, rule)

        return base_query

    def detect_temporal_intent(self, question: str) -> dict:
        """
        Detect temporal aspects of a user question.

        Returns:
            {
                'has_temporal_constraint': bool,
                'query_date': Optional[datetime],
                'temporal_relation': 'before' | 'after' | 'during' | 'at' | None,
                'event_type': 'founding' | 'merger' | 'closure' | None
            }
        """
        # Pattern matching for temporal expressions
        patterns = {
            'point_in_time': r'(?:in|during|around)\s+(\d{4})',
            'before': r'before\s+(\d{4})',
            'after': r'after\s+(\d{4})|since\s+(\d{4})',
            'range': r'between\s+(\d{4})\s+and\s+(\d{4})',
            'founding': r'founded|established|created|opened',
            'merger': r'merged|combined|joined',
            'closure': r'closed|dissolved|ceased',
        }
        # Implementation...
        pass
```
Integration with template_sparql.py:
```python
# Add to TemplateSPARQLPipeline
TEMPORAL_QUERY_TEMPLATES = {
    "point_in_time_state": """
        {{ prefixes }}
        SELECT ?ghcid ?name ?type ?city WHERE {
            ?s a crm:E39_Actor ;
               hc:ghcid ?ghcid ;
               skos:prefLabel ?name ;
               hc:institutionType ?type .
            OPTIONAL { ?s schema:addressLocality ?city }
            # Temporal validity filter (STAR-RAG pattern)
            ?s hc:validFrom ?validFrom .
            OPTIONAL { ?s hc:validTo ?validTo }
            FILTER(?validFrom <= "{{ query_date }}"^^xsd:date)
            FILTER(!BOUND(?validTo) || ?validTo > "{{ query_date }}"^^xsd:date)
        }
        ORDER BY ?name
        LIMIT {{ limit }}
    """,
    "institution_timeline": """
        {{ prefixes }}
        SELECT ?ghcid ?validFrom ?validTo ?changeType ?description WHERE {
            ?entry hc:ghcid "{{ ghcid }}" ;
                   hc:validFrom ?validFrom .
            OPTIONAL { ?entry hc:validTo ?validTo }
            OPTIONAL { ?entry hc:changeType ?changeType }
            OPTIONAL { ?entry hc:changeDescription ?description }
        }
        ORDER BY ?validFrom
    """,
    "events_in_period": """
        {{ prefixes }}
        SELECT ?event ?eventType ?date ?actor1 ?actor2 ?description WHERE {
            ?event a hc:OrganizationalChangeEvent ;
                   hc:eventType ?eventType ;
                   hc:eventDate ?date .
            OPTIONAL { ?event hc:affectedActor ?actor1 }
            OPTIONAL { ?event hc:resultingActor ?actor2 }
            OPTIONAL { ?event schema:description ?description }
            FILTER(?date >= "{{ start_date }}"^^xsd:date)
            FILTER(?date <= "{{ end_date }}"^^xsd:date)
        }
        ORDER BY ?date
    """,
}
```
8.2 TimeR4: Retrieve-Rewrite-Retrieve-Rerank (EMNLP 2024)
Core Innovation: Four-stage temporal QA pipeline that iteratively refines queries.
Key Stages:
- Retrieve: Initial retrieval with temporal keywords
- Rewrite: LLM rewrites query to be more temporally precise
- Retrieve: Second retrieval with refined query
- Rerank: Time-aware reranking of results
Applicable Pattern for GLAM:
# Pattern: TimeR4 Multi-Stage Temporal Retrieval
class TemporalMultiStageRetriever(dspy.Module):
"""
Four-stage temporal retrieval following TimeR4 pattern.
Improves recall for temporal queries by iterative refinement.
"""
def __init__(self):
super().__init__()
# Stage 2: Query rewriter
self.query_rewriter = dspy.ChainOfThought(
"original_query, initial_results, temporal_context -> refined_query, temporal_constraints"
)
# Stage 4: Temporal reranker
self.temporal_reranker = dspy.ChainOfThought(
"query, results, query_date -> ranked_results, temporal_scores"
)
def forward(
self,
question: str,
retrieve_fn: Callable[[str], list[dict]],
query_date: Optional[datetime] = None
) -> Prediction:
"""
Execute TimeR4 pattern.
Args:
question: User's temporal question
retrieve_fn: Retrieval function (SPARQL or vector)
query_date: Extracted temporal constraint
"""
# STAGE 1: Initial Retrieve
initial_results = retrieve_fn(question)
if not initial_results:
return Prediction(results=[], stages_used=1)
# Extract temporal context from initial results
temporal_context = self._extract_temporal_context(initial_results)
# STAGE 2: Rewrite query for temporal precision
rewritten = self.query_rewriter(
original_query=question,
initial_results=self._summarize_results(initial_results),
temporal_context=temporal_context
)
refined_query = rewritten.refined_query
temporal_constraints = rewritten.temporal_constraints
# STAGE 3: Retrieve with refined query
refined_results = retrieve_fn(refined_query)
# Merge results (union with dedup)
all_results = self._merge_results(initial_results, refined_results)
# STAGE 4: Temporal Rerank
if query_date and len(all_results) > 1:
ranked = self.temporal_reranker(
query=question,
results=all_results,
query_date=query_date.isoformat()
)
final_results = ranked.ranked_results
else:
final_results = all_results
return Prediction(
results=final_results,
stages_used=4,
refined_query=refined_query,
temporal_constraints=temporal_constraints
)
def _extract_temporal_context(self, results: list[dict]) -> str:
"""Extract temporal information from initial results."""
dates = []
for r in results:
if 'founding_date' in r:
dates.append(f"founded {r['founding_date']}")
if 'valid_from' in r:
dates.append(f"valid from {r['valid_from']}")
if 'event_date' in r:
dates.append(f"event on {r['event_date']}")
return "; ".join(dates[:10])
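The `_summarize_results` and `_merge_results` helpers referenced above are left as implementation details. A minimal sketch of the stage-3 union-with-dedup, assuming each result dict carries a stable identifier field such as `ghcid` or `uri` (both field names are assumptions), could be:

```python
def merge_results(initial: list[dict], refined: list[dict]) -> list[dict]:
    """Union of two result lists, deduplicated on a stable key.

    Initial results keep priority order; falls back to the full sorted
    item tuple when no identifier field is present.
    """
    def key(r: dict):
        # Prefer an explicit identifier; 'ghcid'/'uri' are assumed field names.
        return r.get("ghcid") or r.get("uri") or tuple(sorted(r.items()))

    seen = set()
    merged = []
    for result in initial + refined:
        k = key(result)
        if k not in seen:
            seen.add(k)
            merged.append(result)
    return merged
```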
8.3 T-GRAG: Temporal Conflict Resolution (arXiv:2508.01680)
Core Innovation: Handles conflicting temporal facts gracefully.
Key Pattern: When facts contradict across time periods, T-GRAG:
- Identifies the conflict
- Determines temporal validity of each fact
- Returns the fact valid for the query time
- Optionally explains the conflict
Applicable Pattern for GLAM:
# Pattern: Temporal Conflict Detection and Resolution
class TemporalConflictResolver:
"""
Detect and resolve conflicting facts across time periods.
Common conflicts in heritage data:
- Same GHCID assigned to different institutions (after merger)
- Institution name changed but old name still in some records
- Location changed (relocation event)
- Classification changed (museum → archive)
"""
CONFLICT_TYPES = [
"name_change", # Institution renamed
"location_change", # Institution relocated
"type_change", # Classification changed
"ghcid_succession", # GHCID reused after closure
"data_superseded" # Newer data overrides older
]
def detect_conflicts(
self,
ghcid: str,
facts: list[dict]
) -> list[dict]:
"""
Detect temporal conflicts in facts about an institution.
Returns list of conflict descriptions.
"""
conflicts = []
# Group facts by property
by_property = defaultdict(list)
for fact in facts:
by_property[fact['property']].append(fact)
# Check each property for conflicts
for prop, prop_facts in by_property.items():
if len(prop_facts) > 1:
# Check for overlapping validity periods
for i, fact1 in enumerate(prop_facts):
for fact2 in prop_facts[i+1:]:
if self._periods_overlap(fact1, fact2):
if fact1['value'] != fact2['value']:
conflicts.append({
'type': self._classify_conflict(prop),
'property': prop,
'fact1': fact1,
'fact2': fact2,
'resolution_needed': True
})
return conflicts
def resolve_for_date(
self,
conflicts: list[dict],
query_date: datetime
) -> dict:
"""
Resolve conflicts for a specific query date.
Returns the authoritative fact for each conflicting property.
"""
resolutions = {}
for conflict in conflicts:
# Find fact valid at query_date
for fact in [conflict['fact1'], conflict['fact2']]:
valid_from = self._parse_date(fact.get('valid_from'))
valid_to = self._parse_date(fact.get('valid_to'))
if valid_from <= query_date:
if valid_to is None or valid_to > query_date:
resolutions[conflict['property']] = {
'value': fact['value'],
'source': fact,
'conflict_type': conflict['type'],
'note': f"Resolved for date {query_date.isoformat()}"
}
break
return resolutions
def generate_conflict_explanation(
self,
conflict: dict,
language: str = "nl"
) -> str:
"""
Generate human-readable explanation of conflict.
For the RAG answer generation step.
"""
templates = {
"name_change": {
"nl": "Let op: deze instelling heette '{old}' tot {date}, daarna '{new}'.",
"en": "Note: this institution was named '{old}' until {date}, then '{new}'."
},
"location_change": {
"nl": "Deze instelling is verhuisd van {old} naar {new} op {date}.",
"en": "This institution relocated from {old} to {new} on {date}."
},
"ghcid_succession": {
"nl": "De GHCID {ghcid} was eerder toegekend aan {old}, nu aan {new}.",
"en": "GHCID {ghcid} was previously assigned to {old}, now to {new}."
}
}
# Format template with conflict details
template = templates.get(conflict['type'], {}).get(language, "")
return template.format(**self._extract_template_vars(conflict))
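The `_periods_overlap` check used in `detect_conflicts` is left undefined above. Under the usual convention that a missing `valid_to` means "still valid", a self-contained sketch could be:

```python
from datetime import date
from typing import Optional

def periods_overlap(
    from1: date, to1: Optional[date],
    from2: date, to2: Optional[date],
) -> bool:
    """True if [from1, to1) and [from2, to2) intersect.

    An open end (to=None) is treated as extending indefinitely,
    i.e. the fact is still valid.
    """
    # Each period must start before the other one ends.
    starts_before_end_2 = to2 is None or from1 < to2
    starts_before_end_1 = to1 is None or from2 < to1
    return starts_before_end_2 and starts_before_end_1
```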
Integration with GHCID History:
# Extend GHCIDHistoryEntry handling
def query_ghcid_at_date(
ghcid: str,
query_date: datetime,
oxigraph_client: OxigraphClient
) -> dict:
"""
Query GHCID state at a specific point in time.
Uses ghcid_history to find the valid record.
"""
sparql = f"""
PREFIX hc: <https://nde.nl/ontology/hc/>
SELECT ?name ?type ?city ?validFrom ?validTo ?changeReason WHERE {{
?entry hc:ghcid "{ghcid}" ;
skos:prefLabel ?name ;
hc:institutionType ?type ;
hc:validFrom ?validFrom .
OPTIONAL {{ ?entry schema:addressLocality ?city }}
OPTIONAL {{ ?entry hc:validTo ?validTo }}
OPTIONAL {{ ?entry hc:changeReason ?changeReason }}
FILTER(?validFrom <= "{query_date.isoformat()}"^^xsd:date)
FILTER(!BOUND(?validTo) || ?validTo > "{query_date.isoformat()}"^^xsd:date)
}}
"""
return oxigraph_client.query(sparql)
8.4 DyG-RAG: Dynamic Event Units (Emerging Pattern)
Core Innovation: Models events as first-class temporal entities with "Dynamic Event Units" (DEUs).
Key Concepts:
- DEU: Self-contained event with participants, time, location, and outcome
- Temporal Anchors: Points connecting DEUs to timeline
- Event Chains: Sequences of related DEUs
Applicable Pattern for GLAM:
# Pattern: Dynamic Event Units for Heritage Change Events
@dataclass
class DynamicEventUnit:
"""
First-class event representation following DyG-RAG pattern.
Maps directly to CIDOC-CRM E5_Event and LinkML ChangeEvent.
"""
event_id: str
event_type: str # MERGER, FOUNDING, CLOSURE, RELOCATION, etc.
# Temporal anchors
start_date: datetime
end_date: Optional[datetime] = None
# Participants
actors: list[str] # GHCIDs of involved institutions
collections: list[str] = field(default_factory=list) # Affected collections
# Outcomes
resulting_actors: list[str] = field(default_factory=list)
ghcid_changes: list[dict] = field(default_factory=list)
# Provenance
source_document: Optional[str] = None
confidence: float = 1.0
def to_sparql_insert(self) -> str:
"""Generate SPARQL INSERT for this DEU."""
return f"""
PREFIX hc: <https://nde.nl/ontology/hc/>
PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
INSERT DATA {{
<{self.event_id}> a crm:E5_Event, hc:OrganizationalChangeEvent ;
hc:eventType "{self.event_type}" ;
crm:P4_has_time-span [
crm:P82a_begin_of_the_begin "{self.start_date.isoformat()}"^^xsd:date
{f'; crm:P82b_end_of_the_end "{self.end_date.isoformat()}"^^xsd:date' if self.end_date else ''}
] ;
hc:confidence {self.confidence} .
# Link actors
{self._actor_triples()}
# Link outcomes
{self._outcome_triples()}
}}
"""
class DynamicEventRAG:
"""
RAG system using Dynamic Event Units for temporal reasoning.
"""
def retrieve_events_for_query(
self,
question: str,
time_range: tuple[datetime, datetime]
    ) -> tuple[list[DynamicEventUnit], list[list[DynamicEventUnit]]]:
        """
        Retrieve relevant events for a temporal question.
        Uses both SPARQL (structured) and vector (semantic) retrieval.
        Returns the matched events plus the event chains detected among them.
        """
# SPARQL: Get events in time range
sparql_events = self._sparql_event_query(time_range)
# Vector: Semantic match on event descriptions
vector_events = self._vector_event_search(question)
# Merge and deduplicate
all_events = self._merge_events(sparql_events, vector_events)
# Build event chains
chains = self._identify_event_chains(all_events)
return all_events, chains
def _identify_event_chains(
self,
events: list[DynamicEventUnit]
) -> list[list[DynamicEventUnit]]:
"""
Identify chains of related events.
E.g., FOUNDING → MERGER → NAME_CHANGE → RELOCATION
"""
# Group by affected actors
by_actor = defaultdict(list)
for event in events:
for actor in event.actors + event.resulting_actors:
by_actor[actor].append(event)
# Build chains ordered by time
chains = []
for actor, actor_events in by_actor.items():
if len(actor_events) > 1:
chain = sorted(actor_events, key=lambda e: e.start_date)
chains.append(chain)
return chains
8.5 Mapping Temporal Patterns to GLAM Stack
| Pattern | GLAM Component | Implementation Location |
|---|---|---|
| Temporal Rule Engine | template_sparql.py | New TemporalRuleEngine class |
| TimeR4 Multi-Stage | dspy_heritage_rag.py | Extend MultiHopHeritageRetriever |
| Conflict Resolution | schema_loader.py | New TemporalConflictResolver |
| Dynamic Event Units | Oxigraph + LinkML | New DynamicEventUnit dataclass |
| Point-in-Time Query | template_sparql.py | New SPARQL templates |
| Event Chain Detection | dspy_heritage_rag.py | New EventChainAnalyzer module |
Priority Integration:
# Add to HeritageQueryRouter in dspy_heritage_rag.py
def _detect_temporal_query(self, question: str) -> Optional[dict]:
"""
Detect if query has temporal dimension.
Returns temporal context if found.
"""
temporal_patterns = {
'point_in_time': r'(?:in|during|around|before|after)\s+(\d{4})',
'date_range': r'(?:between|from)\s+(\d{4})\s+(?:and|to)\s+(\d{4})',
'event_reference': r'(?:when|after|before)\s+(?:the\s+)?(?:merger|founding|closure)',
'historical': r'(?:historical|originally|formerly|used to be)',
}
for pattern_type, regex in temporal_patterns.items():
match = re.search(regex, question.lower())
if match:
return {
'type': pattern_type,
'match': match.group(0),
'year': match.group(1) if match.groups() else None,
}
return None
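Mirroring the regex table above as a standalone function makes the first-match behaviour easy to sanity-check (the name `detect_temporal` is illustrative; the pattern dict is copied from `_detect_temporal_query`):

```python
import re
from typing import Optional

TEMPORAL_PATTERNS = {
    'point_in_time': r'(?:in|during|around|before|after)\s+(\d{4})',
    'date_range': r'(?:between|from)\s+(\d{4})\s+(?:and|to)\s+(\d{4})',
    'event_reference': r'(?:when|after|before)\s+(?:the\s+)?(?:merger|founding|closure)',
    'historical': r'(?:historical|originally|formerly|used to be)',
}

def detect_temporal(question: str) -> Optional[dict]:
    """First matching pattern wins; returns None for non-temporal queries."""
    for pattern_type, regex in TEMPORAL_PATTERNS.items():
        match = re.search(regex, question.lower())
        if match:
            return {
                'type': pattern_type,
                'match': match.group(0),
                'year': match.group(1) if match.groups() else None,
            }
    return None
```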
Semantic Routing Patterns
Overview
Semantic routing enables intelligent query dispatch to specialized backends based on query intent, entity types, and semantic similarity. This is critical for GLAM where queries may target:
- Institutions (museums, archives, libraries) → Oxigraph SPARQL + Qdrant `heritage_custodians`
- People (staff, curators, archivists) → Qdrant `heritage_persons`
- Collections → Oxigraph + future Qdrant collection
- Locations → PostGIS (future) + Oxigraph
- Historical events → Temporal query subsystem
GLAM Already Has: HeritageQueryRouter with intent classification and FykeFilter for relevance. These patterns enhance routing precision.
9.1 vLLM Semantic Router: Signal-Decision Architecture
Core Innovation: Separates routing into two phases:
- Signal Extraction: Extract semantic signals from query (intent, entities, domain)
- Decision Making: Map signals to backend routes using rules + ML
Key Insight from vLLM Semantic Router v0.1 Iris:
"Routing should be a classifier problem, not an LLM generation problem."
Applicable Pattern for GLAM:
# Pattern: Signal-Decision Semantic Router
from dataclasses import dataclass
from typing import Literal, Optional
import numpy as np
@dataclass
class QuerySignals:
"""Semantic signals extracted from query."""
# Primary signals
intent: Literal["geographic", "statistical", "relational", "temporal",
"entity_lookup", "comparative", "exploration"]
entity_type: Literal["person", "institution", "collection", "location", "event", "mixed"]
# Secondary signals
language: str
has_temporal_constraint: bool
has_geographic_constraint: bool
requires_aggregation: bool
# Extracted entities
institution_mentions: list[str]
person_mentions: list[str]
location_mentions: list[str]
# Confidence
signal_confidence: float
class SemanticSignalExtractor:
"""
Phase 1: Extract semantic signals from query.
Uses lightweight models (embeddings + rules) NOT LLM calls.
This is the "signal" phase - fast and deterministic.
"""
def __init__(self):
# Intent classifier: trained on heritage query examples
self.intent_embeddings = self._load_intent_embeddings()
# Entity extractors: pattern-based + NER
self.institution_patterns = self._compile_institution_patterns()
self.person_indicators = ["curator", "archivist", "director", "medewerker",
"who works", "wie werkt", "staff", "personeel"]
def extract_signals(self, query: str) -> QuerySignals:
"""
Extract all semantic signals from query.
This is a FAST operation - no LLM calls.
"""
query_lower = query.lower()
# Intent classification via embedding similarity
query_embedding = self._embed_query(query)
intent = self._classify_intent(query_embedding)
# Entity type detection
entity_type = self._detect_entity_type(query_lower)
# Constraint detection
has_temporal = self._has_temporal_pattern(query_lower)
has_geographic = self._has_geographic_pattern(query_lower)
requires_aggregation = self._requires_aggregation(query_lower)
# Entity extraction
institutions = self._extract_institutions(query)
persons = self._extract_persons(query)
locations = self._extract_locations(query)
return QuerySignals(
intent=intent,
entity_type=entity_type,
language=self._detect_language(query),
has_temporal_constraint=has_temporal,
has_geographic_constraint=has_geographic,
requires_aggregation=requires_aggregation,
institution_mentions=institutions,
person_mentions=persons,
location_mentions=locations,
            signal_confidence=0.85  # Placeholder; derive from extraction quality in practice
)
def _classify_intent(self, query_embedding: np.ndarray) -> str:
"""
Classify intent via cosine similarity to intent exemplars.
No LLM needed - pure embedding comparison.
"""
similarities = {}
for intent, exemplar_embeddings in self.intent_embeddings.items():
# Average similarity to exemplars
sims = np.dot(exemplar_embeddings, query_embedding)
similarities[intent] = float(np.mean(sims))
return max(similarities, key=similarities.get)
def _detect_entity_type(self, query_lower: str) -> str:
"""Detect primary entity type in query."""
person_score = sum(1 for p in self.person_indicators if p in query_lower)
institution_score = sum(1 for p in ["museum", "archief", "bibliotheek",
"archive", "library", "instelling"]
if p in query_lower)
if person_score > 0 and institution_score > 0:
return "mixed"
elif person_score > 0:
return "person"
elif institution_score > 0:
return "institution"
else:
return "institution" # Default
class SemanticDecisionRouter:
"""
Phase 2: Route query to backends based on signals.
This is the "decision" phase - applies routing rules.
"""
# Routing rules: signal patterns → backend configuration
ROUTING_RULES = [
# Person queries
{
"condition": lambda s: s.entity_type == "person",
"primary_backend": "qdrant_persons",
"secondary_backend": "sparql_persons",
"collection": "heritage_persons",
},
# Institution + temporal
{
"condition": lambda s: s.entity_type == "institution" and s.has_temporal_constraint,
"primary_backend": "sparql_temporal",
"secondary_backend": "qdrant_custodians",
"use_temporal_templates": True,
},
# Institution + geographic
{
"condition": lambda s: s.entity_type == "institution" and s.has_geographic_constraint,
"primary_backend": "sparql_geo",
"secondary_backend": "qdrant_custodians",
},
# Institution + aggregation (statistical)
{
"condition": lambda s: s.entity_type == "institution" and s.requires_aggregation,
"primary_backend": "sparql", # SPARQL COUNT/SUM aggregations
"secondary_backend": "qdrant",
},
# Default institution query
{
"condition": lambda s: s.entity_type == "institution",
"primary_backend": "qdrant_custodians",
"secondary_backend": "sparql",
},
]
def route(self, signals: QuerySignals) -> dict:
"""
Apply routing rules to determine backends.
Returns routing configuration.
"""
for rule in self.ROUTING_RULES:
if rule["condition"](signals):
return {
"primary": rule["primary_backend"],
"secondary": rule.get("secondary_backend"),
"collection": rule.get("collection"),
"use_temporal": rule.get("use_temporal_templates", False),
"signals": signals,
}
# Fallback
return {
"primary": "qdrant_custodians",
"secondary": "sparql",
"signals": signals,
}
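The first-match semantics of `ROUTING_RULES` can be exercised in isolation with a stripped-down stand-in for `QuerySignals` that carries only the fields the conditions touch (`StubSignals` and the condensed rule list below are illustrative, not the full router):

```python
from dataclasses import dataclass

@dataclass
class StubSignals:
    entity_type: str
    has_temporal_constraint: bool = False
    has_geographic_constraint: bool = False
    requires_aggregation: bool = False

# Condensed (condition, primary_backend) pairs; specific rules must
# precede the catch-all institution rule because the FIRST match wins.
RULES = [
    (lambda s: s.entity_type == "person", "qdrant_persons"),
    (lambda s: s.entity_type == "institution" and s.has_temporal_constraint, "sparql_temporal"),
    (lambda s: s.entity_type == "institution" and s.has_geographic_constraint, "sparql_geo"),
    (lambda s: s.entity_type == "institution" and s.requires_aggregation, "sparql"),
    (lambda s: s.entity_type == "institution", "qdrant_custodians"),
]

def route(signals: StubSignals) -> str:
    """Return the primary backend of the first matching rule."""
    for condition, backend in RULES:
        if condition(signals):
            return backend
    return "qdrant_custodians"  # fallback
```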
9.2 Integration with Existing FykeFilter and TemplateClassifier
Current GLAM Pipeline:
Query → ConversationContextResolver → FykeFilter → TemplateClassifier → SlotExtractor → SPARQL
Enhanced Pipeline with Semantic Routing:
Query → ConversationContextResolver → FykeFilter → SemanticSignalExtractor
↓
SemanticDecisionRouter
↓
┌─────────────────┼─────────────────┐
↓ ↓ ↓
TemplateClassifier PersonRetriever SPARQLAggregation
↓ ↓ ↓
SPARQL Qdrant SPARQL
Implementation in dspy_heritage_rag.py:
# Extend HeritageQueryRouter with semantic routing
class EnhancedHeritageQueryRouter(dspy.Module):
"""
Enhanced router with Signal-Decision architecture.
Uses lightweight signal extraction before LLM classification.
Falls back to LLM only when signals are ambiguous.
"""
def __init__(self, use_schema_aware: Optional[bool] = None, fast_lm: Optional[dspy.LM] = None):
super().__init__()
# Lightweight signal extraction (no LLM)
self.signal_extractor = SemanticSignalExtractor()
self.decision_router = SemanticDecisionRouter()
# LLM fallback for ambiguous cases
self.fast_lm = fast_lm
if use_schema_aware is None:
use_schema_aware = SCHEMA_LOADER_AVAILABLE
if use_schema_aware:
signature = get_schema_aware_query_intent_signature()
else:
signature = HeritageQueryIntent
self.llm_classifier = dspy.ChainOfThought(signature)
def forward(self, question: str, language: str = "nl", history: History = None) -> Prediction:
"""
Route query using Signal-Decision pattern.
1. Extract signals (fast, no LLM)
2. If high confidence → route directly
3. If low confidence → use LLM classification
"""
# Phase 1: Signal extraction
signals = self.signal_extractor.extract_signals(question)
# Phase 2: Decision routing
if signals.signal_confidence >= 0.8:
# High confidence - route without LLM
route_config = self.decision_router.route(signals)
return Prediction(
intent=signals.intent,
entity_type=signals.entity_type,
entities=signals.institution_mentions + signals.person_mentions,
sources=self._config_to_sources(route_config),
resolved_question=question,
routing_method="signal_based",
route_config=route_config,
)
# Low confidence - fall back to LLM
if history is None:
history = History(messages=[])
if self.fast_lm:
with dspy.settings.context(lm=self.fast_lm):
result = self.llm_classifier(question=question, language=language, history=history)
else:
result = self.llm_classifier(question=question, language=language, history=history)
# Merge LLM result with signal-based routing
signals.intent = result.intent
signals.entity_type = result.entity_type
route_config = self.decision_router.route(signals)
return Prediction(
intent=result.intent,
entity_type=result.entity_type,
entities=result.entities,
sources=self._config_to_sources(route_config),
resolved_question=result.resolved_question,
reasoning=result.reasoning,
routing_method="llm_enhanced",
route_config=route_config,
)
9.3 Multi-Index Routing with Qdrant
Pattern: Route to different Qdrant collections based on entity type.
# Pattern: Multi-Collection Qdrant Router
class QdrantMultiIndexRouter:
"""
Route queries to appropriate Qdrant collections.
Collections:
- heritage_custodians: Museums, archives, libraries, etc.
- heritage_persons: Staff, curators, archivists, etc.
- heritage_collections: (Future) Collection-level data
- heritage_events: (Future) Organizational change events
"""
COLLECTION_CONFIGS = {
"heritage_custodians": {
"entity_types": ["institution"],
"payload_filters": ["institution_type", "country_code", "region_code"],
"embedding_field": "description_embedding",
},
"heritage_persons": {
"entity_types": ["person"],
"payload_filters": ["custodian_slug", "role_category", "institution_type"],
"embedding_field": "profile_embedding",
},
}
def __init__(self, qdrant_client: QdrantClient):
self.client = qdrant_client
def search(
self,
query: str,
route_config: dict,
limit: int = 10
) -> list[dict]:
"""
Search appropriate collection(s) based on routing.
"""
primary = route_config.get("primary", "qdrant_custodians")
# Map route to collection
if "persons" in primary:
collection = "heritage_persons"
elif "custodians" in primary:
collection = "heritage_custodians"
else:
collection = "heritage_custodians"
# Build filters from signals
signals = route_config.get("signals")
filters = self._build_filters(signals, collection)
# Execute search
results = self.client.search(
collection_name=collection,
query_vector=self._embed_query(query),
query_filter=filters,
limit=limit,
)
return [self._format_result(r) for r in results]
def _build_filters(self, signals: QuerySignals, collection: str) -> Optional[Filter]:
"""
Build Qdrant filter from query signals.
"""
if signals is None:
return None
conditions = []
# Filter by institution type if mentioned
if signals.institution_mentions and collection == "heritage_custodians":
# Extract institution type from mentions
inst_type = self._infer_institution_type(signals.institution_mentions)
if inst_type:
conditions.append(
FieldCondition(key="institution_type", match=MatchValue(value=inst_type))
)
# Filter persons by custodian if institution mentioned
if signals.institution_mentions and collection == "heritage_persons":
slug = self._institution_to_slug(signals.institution_mentions[0])
if slug:
conditions.append(
FieldCondition(key="custodian_slug", match=MatchValue(value=slug))
)
# Filter by location if geographic constraint
if signals.has_geographic_constraint and signals.location_mentions:
loc = signals.location_mentions[0]
conditions.append(
FieldCondition(key="city", match=MatchText(text=loc))
)
if conditions:
return Filter(must=conditions)
return None
9.4 Intent Detection with Semantic Similarity
Pattern: Use embedding similarity for intent classification without LLM.
# Pattern: Embedding-Based Intent Classifier
class EmbeddingIntentClassifier:
"""
Classify query intent using semantic similarity to exemplars.
Faster than LLM, good for common query patterns.
"""
# Intent exemplars (in Dutch and English)
INTENT_EXEMPLARS = {
"geographic": [
"Welke musea zijn er in Amsterdam?",
"Which archives are located in Noord-Holland?",
"Toon me bibliotheken in Utrecht",
"Museums near Rotterdam",
],
"statistical": [
"Hoeveel archieven zijn er in Nederland?",
"How many museums have a rating above 4?",
"Count libraries by province",
"Verdeling van instellingen per type",
],
"entity_lookup": [
"Wat is het Rijksmuseum?",
"Tell me about Nationaal Archief",
"Informatie over de KB",
"Details of Stadsarchief Amsterdam",
],
"temporal": [
"Welke musea zijn opgericht voor 1900?",
"Archives that merged in 2001",
"History of Noord-Hollands Archief",
"Oldest libraries in the Netherlands",
],
"relational": [
"Welke archieven zijn onderdeel van KVAN?",
"Museums connected to Rijksmuseum",
"Archives that share collections",
"Networks of heritage institutions",
],
}
def __init__(self):
self._exemplar_embeddings = None
self._model = None
def _ensure_loaded(self):
if self._exemplar_embeddings is not None:
return
from sentence_transformers import SentenceTransformer
self._model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")
# Pre-compute exemplar embeddings
self._exemplar_embeddings = {}
for intent, exemplars in self.INTENT_EXEMPLARS.items():
embeddings = self._model.encode(exemplars, convert_to_numpy=True)
self._exemplar_embeddings[intent] = embeddings
def classify(self, query: str) -> tuple[str, float]:
"""
Classify intent and return (intent, confidence).
"""
self._ensure_loaded()
query_embedding = self._model.encode([query], convert_to_numpy=True)[0]
intent_scores = {}
for intent, embeddings in self._exemplar_embeddings.items():
# Cosine similarity
norms = np.linalg.norm(embeddings, axis=1) * np.linalg.norm(query_embedding)
similarities = np.dot(embeddings, query_embedding) / norms
intent_scores[intent] = float(np.max(similarities))
best_intent = max(intent_scores, key=intent_scores.get)
confidence = intent_scores[best_intent]
return best_intent, confidence
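The similarity computation inside `classify` can be verified without loading sentence-transformers by substituting toy 2-D vectors for real embeddings (the standalone `classify_intent` below mirrors the max-similarity logic only):

```python
import numpy as np

def classify_intent(
    query_vec: np.ndarray,
    exemplars: dict[str, np.ndarray],
) -> tuple[str, float]:
    """Pick the intent whose best exemplar has the highest cosine
    similarity to the query vector. Toy stand-in for
    EmbeddingIntentClassifier.classify."""
    scores = {}
    for intent, emb in exemplars.items():
        norms = np.linalg.norm(emb, axis=1) * np.linalg.norm(query_vec)
        sims = emb @ query_vec / norms
        scores[intent] = float(np.max(sims))
    best = max(scores, key=scores.get)
    return best, scores[best]
```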
9.5 Mapping Routing Patterns to GLAM Stack
| Pattern | GLAM Component | Implementation |
|---|---|---|
| Signal Extraction | dspy_heritage_rag.py | SemanticSignalExtractor class |
| Decision Routing | dspy_heritage_rag.py | SemanticDecisionRouter class |
| Multi-Index Qdrant | dspy_heritage_rag.py | QdrantMultiIndexRouter class |
| Intent Embedding | template_sparql.py | EmbeddingIntentClassifier class |
| Person Query Route | dspy_heritage_rag.py | Route to heritage_persons collection |
| Temporal Query Route | template_sparql.py | Use temporal SPARQL templates |
Integration Priority:
- Immediate: Add `entity_type` routing to distinguish person vs institution queries
- Short-term: Implement embedding-based intent classification as pre-filter
- Medium-term: Add SPARQL aggregation templates for statistical queries (COUNT, SUM, AVG)
Hypergraph Patterns Deep Dive
Overview
Hypergraphs extend traditional graphs by allowing edges (hyperedges) to connect more than two nodes. This is powerful for heritage data where:
- Custody transfers involve: source custodian, target custodian, collection, date, legal basis
- Mergers involve: multiple source institutions, resulting institution, date, staff transfers
- Collection accessions involve: collection, donor, custodian, provenance chain, date range
Why Hypergraphs for GLAM?
Traditional binary edges force artificial decomposition:
# Binary edges (limited)
NHA --merged_from--> Gemeentearchief_Haarlem
NHA --merged_from--> Rijksarchief_Noord_Holland
NHA --merger_date--> 2001-01-01 # Loses connection to sources!
Hyperedges capture the full event:
# Hyperedge (complete)
HYPEREDGE:Merger_2001 {
type: MERGER
sources: [Gemeentearchief_Haarlem, Rijksarchief_Noord_Holland]
result: Noord_Hollands_Archief
date: 2001-01-01
staff_transferred: 45
collections_merged: [Municipal_Records, Provincial_Archives]
}
GLAM Already Has: Organizational change events in LinkML schema (ChangeEvent, CustodianTimelineEvent). Hypergraph patterns enhance retrieval for these complex events.
10.1 Hyperedge Construction from CIDOC-CRM Events
Pattern: Map heritage change events to hyperedges using CIDOC-CRM event classes.
# Pattern: Hyperedge Construction Pipeline
from dataclasses import dataclass, field
from typing import Literal, Optional
from datetime import datetime
import hashlib
@dataclass
class Hyperedge:
"""
A hyperedge connecting multiple entities through an event.
Maps to CIDOC-CRM event classes:
- crm:E10_Transfer_of_Custody
- crm:E8_Acquisition_Event
- crm:E66_Formation (founding)
- crm:E68_Dissolution (closure)
- crm:E9_Move (relocation)
"""
hyperedge_id: str
event_type: Literal["custody_transfer", "merger", "founding", "closure",
"relocation", "name_change", "acquisition"]
# Connected entities (the hyperedge connects ALL of these)
source_custodians: list[str] # GHCID URIs
target_custodians: list[str] # GHCID URIs
collections: list[str] # Collection URIs
locations: list[str] # Location URIs
people: list[str] # Person URIs (staff involved)
# Temporal bounds
event_date: datetime
event_end_date: Optional[datetime] = None # For processes
# Metadata
description: str = ""
confidence: float = 1.0
provenance: str = ""
# Vector embedding for semantic search
embedding: Optional[list[float]] = None
@property
def all_connected_entities(self) -> list[str]:
"""All entities connected by this hyperedge."""
return (
self.source_custodians +
self.target_custodians +
self.collections +
self.locations +
self.people
)
@property
def entity_count(self) -> int:
"""Number of entities connected (hyperedge cardinality)."""
return len(self.all_connected_entities)
class HyperedgeConstructor:
"""
Construct hyperedges from heritage change events.
Sources:
- CustodianTimelineEvent YAML files
- ChangeEvent entries in custodian YAML
- SPARQL query results from Oxigraph
"""
# CIDOC-CRM event type mapping
EVENT_TYPE_MAPPING = {
"FOUNDING": ("crm:E66_Formation", "founding"),
"CLOSURE": ("crm:E68_Dissolution", "closure"),
"MERGER": ("crm:E10_Transfer_of_Custody", "merger"),
"ACQUISITION": ("crm:E8_Acquisition_Event", "acquisition"),
"RELOCATION": ("crm:E9_Move", "relocation"),
"CUSTODY_TRANSFER": ("crm:E10_Transfer_of_Custody", "custody_transfer"),
"NAME_CHANGE": ("crm:E13_Attribute_Assignment", "name_change"),
}
def construct_from_change_event(
self,
event: dict,
custodian: dict
) -> Hyperedge:
"""
Construct hyperedge from a ChangeEvent entry.
Args:
event: ChangeEvent dict from custodian YAML
custodian: Parent custodian dict
Returns:
Hyperedge connecting all entities involved
"""
change_type = event.get("change_type", "UNKNOWN")
crm_class, event_type = self.EVENT_TYPE_MAPPING.get(
change_type, ("crm:E5_Event", "unknown")
)
# Generate stable hyperedge ID
hyperedge_id = self._generate_hyperedge_id(event, custodian)
# Extract connected entities
source_custodians = []
target_custodians = []
if change_type == "MERGER":
# Sources: predecessor institutions
source_custodians = event.get("predecessor_custodians", [])
# Target: resulting institution
target_custodians = [custodian.get("ghcid", {}).get("ghcid_current")]
elif change_type == "FOUNDING":
# Target: newly founded institution
target_custodians = [custodian.get("ghcid", {}).get("ghcid_current")]
elif change_type == "CLOSURE":
# Source: closed institution
source_custodians = [custodian.get("ghcid", {}).get("ghcid_current")]
elif change_type == "CUSTODY_TRANSFER":
source_custodians = [event.get("source_custodian")]
target_custodians = [event.get("target_custodian")]
# Extract collections involved
collections = event.get("collections_affected", [])
# Extract locations
locations = []
if change_type == "RELOCATION":
locations = [
event.get("from_location"),
event.get("to_location"),
]
# Extract people involved
people = event.get("staff_involved", [])
# Parse event date
event_date = self._parse_date(event.get("event_date"))
return Hyperedge(
hyperedge_id=hyperedge_id,
event_type=event_type,
source_custodians=[s for s in source_custodians if s],
target_custodians=[t for t in target_custodians if t],
collections=[c for c in collections if c],
locations=[l for l in locations if l],
people=[p for p in people if p],
event_date=event_date,
description=event.get("event_description", ""),
confidence=event.get("confidence_score", 1.0),
provenance=event.get("source_documentation", ""),
)
def _generate_hyperedge_id(self, event: dict, custodian: dict) -> str:
"""Generate stable hyperedge ID from event content."""
content = f"{custodian.get('ghcid', {}).get('ghcid_current', '')}"
content += f":{event.get('change_type', '')}"
content += f":{event.get('event_date', '')}"
hash_digest = hashlib.sha256(content.encode()).hexdigest()[:16]
return f"hyperedge:{hash_digest}"
def _parse_date(self, date_str: str) -> datetime:
"""Parse date string to datetime."""
if not date_str:
return datetime.now()
try:
return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
except ValueError:
# Handle partial dates like "2001" or "2001-01"
parts = date_str.split("-")
if len(parts) == 1:
return datetime(int(parts[0]), 1, 1)
elif len(parts) == 2:
return datetime(int(parts[0]), int(parts[1]), 1)
return datetime.now()
10.2 Hyperedge Retrieval with Two-Stage Scoring
Pattern: Retrieve hyperedges using entity overlap + semantic similarity.
# Pattern: Hyperedge Retrieval
class HyperedgeRetriever:
"""
Two-stage hyperedge retrieval:
Stage 1: Entity Overlap
Find hyperedges containing query entities (exact match)
Stage 2: Semantic Similarity
Rank by embedding similarity to query
This combines precision (entity overlap) with recall (semantic search).
"""
def __init__(self, hyperedge_store: list[Hyperedge]):
self.hyperedges = hyperedge_store
self._entity_index = self._build_entity_index()
def _build_entity_index(self) -> dict[str, set[str]]:
"""
Build inverted index: entity URI → hyperedge IDs.
"""
index = {}
for he in self.hyperedges:
for entity in he.all_connected_entities:
if entity not in index:
index[entity] = set()
index[entity].add(he.hyperedge_id)
return index
def retrieve(
self,
query: str,
query_entities: list[str],
query_embedding: list[float],
top_k: int = 5,
entity_weight: float = 0.6,
semantic_weight: float = 0.4,
) -> list[tuple[Hyperedge, float]]:
"""
Retrieve top-k hyperedges by combined scoring.
Args:
query: Natural language query
query_entities: Extracted entity URIs from query
query_embedding: Query embedding vector
top_k: Number of results
entity_weight: Weight for entity overlap score
semantic_weight: Weight for semantic similarity
Returns:
List of (hyperedge, score) tuples
"""
scores = {}
# Stage 1: Entity overlap scoring
for entity in query_entities:
if entity in self._entity_index:
for he_id in self._entity_index[entity]:
if he_id not in scores:
scores[he_id] = {"entity": 0, "semantic": 0}
scores[he_id]["entity"] += 1
# Normalize entity scores
if query_entities:
for he_id in scores:
scores[he_id]["entity"] /= len(query_entities)
# Stage 2: Semantic similarity
import numpy as np
query_vec = np.array(query_embedding)
for he in self.hyperedges:
if he.embedding is None:
continue
he_vec = np.array(he.embedding)
# Cosine similarity
similarity = float(np.dot(query_vec, he_vec) /
(np.linalg.norm(query_vec) * np.linalg.norm(he_vec)))
if he.hyperedge_id not in scores:
scores[he.hyperedge_id] = {"entity": 0, "semantic": 0}
scores[he.hyperedge_id]["semantic"] = similarity
# Combined scoring
final_scores = []
he_by_id = {he.hyperedge_id: he for he in self.hyperedges}
for he_id, score_dict in scores.items():
combined = (
entity_weight * score_dict["entity"] +
semantic_weight * score_dict["semantic"]
)
final_scores.append((he_by_id[he_id], combined))
# Sort and return top-k
final_scores.sort(key=lambda x: x[1], reverse=True)
return final_scores[:top_k]
def retrieve_by_type(
self,
event_type: str,
date_range: Optional[tuple[datetime, datetime]] = None,
top_k: int = 10,
) -> list[Hyperedge]:
"""
Retrieve hyperedges by event type and optional date range.
Useful for questions like:
- "What mergers happened in 2001?"
- "Show all custody transfers after 2010"
"""
results = [
he for he in self.hyperedges
if he.event_type == event_type
]
if date_range:
start, end = date_range
results = [
he for he in results
if start <= he.event_date <= end
]
        results.sort(key=lambda he: he.event_date)
        return results[:top_k]
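To make the weighted combination concrete, here is a minimal self-contained worked example of the score for one hyperedge, using toy entity URIs and 2-dimensional embeddings (all values are illustrative):

```python
import math

# Toy query: two extracted entities, one of which occurs in the hyperedge.
query_entities = ["glam:custodian-A", "glam:custodian-B"]
hyperedge_entities = {"glam:custodian-A", "glam:custodian-C"}

# Toy 2-d embeddings standing in for real sentence embeddings.
query_embedding = [1.0, 0.0]
hyperedge_embedding = [0.6, 0.8]

# Stage 1: entity overlap, normalized by the number of query entities.
entity_score = sum(e in hyperedge_entities for e in query_entities) / len(query_entities)

# Stage 2: cosine similarity between the two embeddings.
dot = sum(q * h for q, h in zip(query_embedding, hyperedge_embedding))
norm = math.sqrt(sum(q * q for q in query_embedding)) * math.sqrt(
    sum(h * h for h in hyperedge_embedding)
)
semantic_score = dot / norm

# Weighted combination, with the default weights from retrieve().
combined = 0.6 * entity_score + 0.4 * semantic_score
print(entity_score, round(semantic_score, 2), round(combined, 2))
# → 0.5 0.6 0.54
```

With a 0.6/0.4 split, a hyperedge sharing half the query entities and sitting at cosine 0.6 scores 0.54; raising `entity_weight` shifts ranking toward exact-match precision.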
10.3 CIDOC-CRM RDF Serialization for Oxigraph
Pattern: Serialize hyperedges to CIDOC-CRM RDF for SPARQL querying.
# Pattern: Hyperedge to CIDOC-CRM RDF
from rdflib import Graph, Namespace, URIRef, Literal, BNode
from rdflib.namespace import RDF, RDFS, XSD
CRM = Namespace("http://www.cidoc-crm.org/cidoc-crm/")
GLAM = Namespace("https://w3id.org/heritage/custodian/")
class HyperedgeRDFSerializer:
"""
Serialize hyperedges to CIDOC-CRM RDF triples.
Maps hyperedge components to CIDOC-CRM:
- Hyperedge → crm:E5_Event (or specific subclass)
- source_custodians → crm:P11_had_participant
- target_custodians → crm:P14_carried_out_by
- collections → crm:P12_occurred_in_the_presence_of
- locations → crm:P7_took_place_at
- event_date → crm:P4_has_time-span
"""
# Event type to CRM class mapping
CRM_EVENT_CLASSES = {
"custody_transfer": CRM["E10_Transfer_of_Custody"],
"merger": CRM["E10_Transfer_of_Custody"],
"founding": CRM["E66_Formation"],
"closure": CRM["E68_Dissolution"],
"relocation": CRM["E9_Move"],
"acquisition": CRM["E8_Acquisition_Event"],
"name_change": CRM["E13_Attribute_Assignment"],
}
def serialize(self, hyperedge: Hyperedge) -> Graph:
"""
Serialize a single hyperedge to RDF graph.
"""
g = Graph()
g.bind("crm", CRM)
g.bind("glam", GLAM)
# Event URI
event_uri = URIRef(f"{GLAM}{hyperedge.hyperedge_id}")
# Event type
crm_class = self.CRM_EVENT_CLASSES.get(
hyperedge.event_type,
CRM["E5_Event"]
)
g.add((event_uri, RDF.type, crm_class))
# Description
if hyperedge.description:
g.add((event_uri, RDFS.label, Literal(hyperedge.description)))
# Time-span
timespan = BNode()
g.add((event_uri, CRM["P4_has_time-span"], timespan))
g.add((timespan, RDF.type, CRM["E52_Time-Span"]))
g.add((timespan, CRM["P82a_begin_of_the_begin"],
Literal(hyperedge.event_date.isoformat(), datatype=XSD.dateTime)))
if hyperedge.event_end_date:
g.add((timespan, CRM["P82b_end_of_the_end"],
Literal(hyperedge.event_end_date.isoformat(), datatype=XSD.dateTime)))
# Source custodians (participants - "from")
for custodian in hyperedge.source_custodians:
custodian_uri = URIRef(f"{GLAM}{custodian}")
g.add((event_uri, CRM["P11_had_participant"], custodian_uri))
# Mark as source with custom predicate
g.add((event_uri, GLAM["source_custodian"], custodian_uri))
# Target custodians (carried out by - "to")
for custodian in hyperedge.target_custodians:
custodian_uri = URIRef(f"{GLAM}{custodian}")
g.add((event_uri, CRM["P14_carried_out_by"], custodian_uri))
g.add((event_uri, GLAM["target_custodian"], custodian_uri))
# Collections involved
for collection in hyperedge.collections:
collection_uri = URIRef(f"{GLAM}collection/{collection}")
g.add((event_uri, CRM["P12_occurred_in_the_presence_of"], collection_uri))
# Locations
for location in hyperedge.locations:
location_uri = URIRef(f"{GLAM}location/{location}")
g.add((event_uri, CRM["P7_took_place_at"], location_uri))
# People involved
for person in hyperedge.people:
person_uri = URIRef(f"{GLAM}person/{person}")
g.add((event_uri, CRM["P11_had_participant"], person_uri))
# Provenance
if hyperedge.provenance:
g.add((event_uri, CRM["P70i_is_documented_in"],
URIRef(hyperedge.provenance)))
return g
def serialize_all(self, hyperedges: list[Hyperedge]) -> Graph:
"""Serialize all hyperedges to a single graph."""
combined = Graph()
combined.bind("crm", CRM)
combined.bind("glam", GLAM)
for he in hyperedges:
for triple in self.serialize(he):
combined.add(triple)
return combined
10.4 SPARQL Templates for Hyperedge Queries
Pattern: Query hyperedges via SPARQL on Oxigraph.
# Pattern: Hyperedge SPARQL Templates
HYPEREDGE_SPARQL_TEMPLATES = {
"mergers_in_year": """
PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
PREFIX glam: <https://w3id.org/heritage/custodian/>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT ?event ?description ?source ?target ?date
WHERE {{
?event a crm:E10_Transfer_of_Custody ;
rdfs:label ?description ;
crm:P4_has_time-span ?timespan ;
glam:source_custodian ?source ;
glam:target_custodian ?target .
?timespan crm:P82a_begin_of_the_begin ?date .
FILTER(YEAR(?date) = {year})
}}
ORDER BY ?date
""",
"custody_transfers_for_custodian": """
PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
PREFIX glam: <https://w3id.org/heritage/custodian/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT ?event ?description ?counterparty ?role ?date
WHERE {{
{{
?event a crm:E10_Transfer_of_Custody ;
rdfs:label ?description ;
crm:P4_has_time-span/crm:P82a_begin_of_the_begin ?date ;
glam:source_custodian <{custodian_uri}> ;
glam:target_custodian ?counterparty .
BIND("source" AS ?role)
}}
UNION
{{
?event a crm:E10_Transfer_of_Custody ;
rdfs:label ?description ;
crm:P4_has_time-span/crm:P82a_begin_of_the_begin ?date ;
glam:target_custodian <{custodian_uri}> ;
glam:source_custodian ?counterparty .
BIND("target" AS ?role)
}}
}}
ORDER BY ?date
""",
"events_in_location": """
PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
PREFIX glam: <https://w3id.org/heritage/custodian/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT ?event ?type ?description ?date
WHERE {{
?event crm:P7_took_place_at <{location_uri}> ;
a ?type ;
rdfs:label ?description ;
crm:P4_has_time-span/crm:P82a_begin_of_the_begin ?date .
FILTER(?type IN (
crm:E10_Transfer_of_Custody,
crm:E66_Formation,
crm:E68_Dissolution,
crm:E9_Move
))
}}
ORDER BY DESC(?date)
""",
"founding_events_before_year": """
PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
PREFIX glam: <https://w3id.org/heritage/custodian/>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT ?event ?custodian ?custodian_label ?date
WHERE {{
?event a crm:E66_Formation ;
crm:P4_has_time-span/crm:P82a_begin_of_the_begin ?date ;
crm:P14_carried_out_by ?custodian .
?custodian rdfs:label ?custodian_label .
FILTER(YEAR(?date) < {year})
}}
ORDER BY ?date
""",
"collections_transferred_in_event": """
PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
PREFIX glam: <https://w3id.org/heritage/custodian/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT ?collection ?collection_label
WHERE {{
<{event_uri}> crm:P12_occurred_in_the_presence_of ?collection .
OPTIONAL {{ ?collection rdfs:label ?collection_label }}
}}
""",
}
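One subtlety worth noting: because these templates are filled with `str.format`, every literal SPARQL brace is escaped as `{{`/`}}`, and only the named placeholders (`{year}`, `{custodian_uri}`, `{event_uri}`, `{location_uri}`) are substituted. A quick self-contained check with a toy template:

```python
# Toy template using the same {{ }} escaping convention as
# HYPEREDGE_SPARQL_TEMPLATES; only {year} is a format placeholder.
template = """
SELECT ?event WHERE {{
    ?event glam:source_custodian ?source .
    FILTER(YEAR(?date) = {year})
}}
"""

query = template.format(year=2001)
print("FILTER(YEAR(?date) = 2001)" in query)  # → True
print("{{" in query)  # escaped braces collapse to single braces → False
```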
class HyperedgeSPARQLExecutor:
"""
Execute hyperedge SPARQL queries against Oxigraph.
"""
def __init__(self, oxigraph_endpoint: str = "http://localhost:7878/query"):
self.endpoint = oxigraph_endpoint
def query_mergers_in_year(self, year: int) -> list[dict]:
"""Find all merger events in a given year."""
query = HYPEREDGE_SPARQL_TEMPLATES["mergers_in_year"].format(year=year)
return self._execute(query)
def query_custody_transfers(self, custodian_ghcid: str) -> list[dict]:
"""Find all custody transfers involving a custodian."""
custodian_uri = f"https://w3id.org/heritage/custodian/{custodian_ghcid}"
query = HYPEREDGE_SPARQL_TEMPLATES["custody_transfers_for_custodian"].format(
custodian_uri=custodian_uri
)
return self._execute(query)
def query_events_in_location(self, location_code: str) -> list[dict]:
"""Find all heritage events at a location."""
location_uri = f"https://w3id.org/heritage/custodian/location/{location_code}"
query = HYPEREDGE_SPARQL_TEMPLATES["events_in_location"].format(
location_uri=location_uri
)
return self._execute(query)
def _execute(self, query: str) -> list[dict]:
"""Execute SPARQL query and return results."""
import httpx
response = httpx.post(
self.endpoint,
data=query,
headers={
"Content-Type": "application/sparql-query",
"Accept": "application/json",
},
)
response.raise_for_status()
data = response.json()
results = []
for binding in data.get("results", {}).get("bindings", []):
row = {}
for key, value in binding.items():
row[key] = value.get("value")
results.append(row)
return results
10.5 Integration with DSPy Heritage RAG
Pattern: Integrate hyperedge retrieval into the existing DSPy pipeline.
# Pattern: Hyperedge-Enhanced RAG Module
from typing import Optional

import dspy

class HyperedgeEnhancedRetriever(dspy.Module):
"""
DSPy module that incorporates hyperedge retrieval for complex queries.
Use when query involves:
- Organizational change events (mergers, closures)
- Custody transfers
- Multi-entity relationships
"""
def __init__(
self,
hyperedge_retriever: HyperedgeRetriever,
sparql_executor: HyperedgeSPARQLExecutor,
entity_extractor: dspy.Module,
):
super().__init__()
self.hyperedge_retriever = hyperedge_retriever
self.sparql_executor = sparql_executor
self.entity_extractor = entity_extractor
# Synthesizer for combining hyperedge data with other context
self.synthesize = dspy.ChainOfThought(HyperedgeContextSynthesis)
def forward(
self,
question: str,
query_embedding: list[float],
language: str = "nl",
) -> dspy.Prediction:
"""
Retrieve relevant hyperedges and synthesize context.
"""
# Extract entities from question
entities = self.entity_extractor(question=question)
entity_uris = self._entities_to_uris(entities)
# Detect if this is a hyperedge-relevant query
event_type = self._detect_event_type(question)
if event_type:
# Query SPARQL for specific event types
sparql_results = self._query_by_event_type(event_type, question)
else:
sparql_results = []
# Retrieve hyperedges by entity overlap + semantic similarity
hyperedge_results = self.hyperedge_retriever.retrieve(
query=question,
query_entities=entity_uris,
query_embedding=query_embedding,
top_k=5,
)
# Synthesize into context
context = self._format_hyperedge_context(hyperedge_results, sparql_results)
return dspy.Prediction(
hyperedge_context=context,
retrieved_hyperedges=[he for he, _ in hyperedge_results],
sparql_results=sparql_results,
)
def _detect_event_type(self, question: str) -> Optional[str]:
"""Detect if query is about a specific event type."""
question_lower = question.lower()
patterns = {
"merger": ["merger", "merged", "fusie", "gefuseerd", "samengevoegd"],
"founding": ["founded", "established", "opgericht", "gesticht", "founded when"],
"closure": ["closed", "dissolved", "gesloten", "opgeheven"],
"relocation": ["moved", "relocated", "verhuisd", "verplaatst"],
}
for event_type, keywords in patterns.items():
if any(kw in question_lower for kw in keywords):
return event_type
return None
def _query_by_event_type(self, event_type: str, question: str) -> list[dict]:
"""Query SPARQL based on event type."""
import re
# Extract year if mentioned
year_match = re.search(r"\b(19|20)\d{2}\b", question)
if event_type == "merger" and year_match:
return self.sparql_executor.query_mergers_in_year(int(year_match.group()))
elif event_type == "founding" and year_match:
# Use "before year" template for "oldest" queries
if "oldest" in question.lower() or "oudste" in question.lower():
return self.sparql_executor._execute(
HYPEREDGE_SPARQL_TEMPLATES["founding_events_before_year"].format(
year=int(year_match.group())
)
)
return []
def _format_hyperedge_context(
self,
hyperedges: list[tuple[Hyperedge, float]],
sparql_results: list[dict],
) -> str:
"""Format hyperedge results into context string."""
parts = []
for he, score in hyperedges:
part = f"**{he.event_type.replace('_', ' ').title()}** ({he.event_date.year}):\n"
part += f" {he.description}\n"
if he.source_custodians:
part += f" From: {', '.join(he.source_custodians)}\n"
if he.target_custodians:
part += f" To: {', '.join(he.target_custodians)}\n"
if he.collections:
part += f" Collections: {', '.join(he.collections)}\n"
parts.append(part)
if sparql_results:
parts.append("\n**Additional SPARQL Results:**\n")
for result in sparql_results[:5]:
parts.append(f" - {result}\n")
return "\n".join(parts)
class HyperedgeContextSynthesis(dspy.Signature):
"""Synthesize hyperedge context for answer generation."""
question: str = dspy.InputField(desc="User's question")
hyperedge_context: str = dspy.InputField(desc="Retrieved hyperedge context")
language: str = dspy.InputField(desc="Response language")
synthesis: str = dspy.OutputField(desc="Synthesized context highlighting key relationships")
10.6 Mapping Hypergraph Patterns to GLAM Stack
| Pattern | GLAM Component | Implementation |
|---|---|---|
| Hyperedge Construction | Data enrichment pipeline | HyperedgeConstructor class |
| Two-Stage Retrieval | dspy_heritage_rag.py | HyperedgeRetriever class |
| CIDOC-CRM Serialization | RDF export pipeline | HyperedgeRDFSerializer class |
| SPARQL Templates | template_sparql.py | Add HYPEREDGE_SPARQL_TEMPLATES |
| DSPy Integration | dspy_heritage_rag.py | HyperedgeEnhancedRetriever module |
Key Implementation Decisions:
- No New Database: Store hyperedges as RDF in existing Oxigraph instance
- Reuse Embeddings: Use same embedding model as custodian descriptions
- Extend SPARQL: Add hyperedge templates to existing template system
- DSPy Module: Create as optional module activated for event queries
Data Flow:
Change Events (YAML)
↓
HyperedgeConstructor → Hyperedge objects
↓
├── HyperedgeRDFSerializer → Oxigraph (SPARQL)
│
└── Embedding → Qdrant (future: hyperedge collection)
Query
↓
Event Type Detection
↓
├── SPARQL Templates → Oxigraph results
│
└── Hyperedge Retrieval → Semantic + Entity overlap
↓
Synthesized Context → LLM → Answer
11. Rules on Graphs Pattern (SHACL + Datalog Inference)
Sources:
- Velitchkov, I. (2025). "Rules on Graphs in Graphs of Rules, Part 1." Link & Think
- Pareti et al. (2019). "SHACL Constraints with Inference Rules." ISWC 2019
- W3C SHACL Advanced Features Working Group (2025). Datalog Rules Proposal #348
Core Insight: Inference rules should be stored as nodes IN the knowledge graph, not hardcoded in application code. This provides:
- Inspectability: Rules are queryable/auditable via SPARQL
- Governance: Rule provenance tracked alongside data provenance
- Decoupling: Domain logic separated from application code
- Interoperability: Standard formats (SHACL, Datalog) enable rule sharing
11.1 Problem Statement
Traditional RAG systems hardcode inference logic in Python/application code:
# ❌ BAD: Logic buried in application code
def get_parent_institution(ghcid: str) -> Optional[str]:
"""Logic for finding parent org is embedded in code."""
if ghcid.startswith("NL-") and is_regional_archive(ghcid):
return find_provincial_government(ghcid)
# ... more hardcoded rules
This creates problems:
- Rules invisible to users and auditors
- Rule changes require code deployments
- No provenance for derived facts
- Inconsistent rule application across systems
11.2 Rules as Graph Nodes Pattern
Store inference rules as first-class graph entities:
# SHACL Rule stored in Oxigraph
heritage:ParentInstitutionRule
a sh:NodeShape, heritage:InferenceRule ;
sh:targetClass heritage:Archive ;
sh:rule [
a sh:TripleRule ;
sh:subject sh:this ;
sh:predicate heritage:hasParentOrganization ;
sh:object [
sh:path ( heritage:locatedIn heritage:governingBody )
] ;
] ;
# Rule metadata for governance
heritage:rulePriority 100 ;
heritage:ruleCategory "organizational_hierarchy" ;
prov:wasGeneratedBy heritage:DomainExpertExtraction ;
prov:generatedAtTime "2025-01-06T12:00:00Z" ;
rdfs:comment "Infer parent organization from location governance."@en .
Benefits:
- Rule is queryable: SELECT ?rule WHERE { ?rule a heritage:InferenceRule }
- Rule provenance tracked with PROV-O
- Rule can be versioned, deprecated, or overridden
- Multiple systems can consume the same rule definition
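As a concrete illustration of the queryability benefit, the sketch below builds a rule-inventory query (predicate names follow the Turtle example above) and flattens Oxigraph's SPARQL JSON results into plain dicts; the sample response is hand-written for illustration:

```python
RULE_INVENTORY_QUERY = """
PREFIX heritage: <https://w3id.org/heritage/>
PREFIX prov: <http://www.w3.org/ns/prov#>
SELECT ?rule ?category ?generated WHERE {
    ?rule a heritage:InferenceRule ;
          heritage:ruleCategory ?category .
    OPTIONAL { ?rule prov:generatedAtTime ?generated }
}
"""

def parse_rule_bindings(sparql_json: dict) -> list[dict]:
    """Flatten SPARQL JSON results into one plain dict per rule."""
    return [
        {var: cell["value"] for var, cell in binding.items()}
        for binding in sparql_json.get("results", {}).get("bindings", [])
    ]

# Hand-written sample in the SPARQL 1.1 JSON results format.
sample = {"results": {"bindings": [{
    "rule": {"type": "uri", "value": "https://w3id.org/heritage/ParentInstitutionRule"},
    "category": {"type": "literal", "value": "organizational_hierarchy"},
}]}}
print(parse_rule_bindings(sample))
```

The same pattern serves governance dashboards: one query returns every active rule with its category and creation time, without touching application code.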
11.3 Datalog Rules for Knowledge Graphs
Datalog provides declarative inference with recursion support. Key patterns for heritage domain:
# Pattern 1: Transitive Closure (organizational hierarchy)
ancestor(?X, ?Z) :- parent(?X, ?Z).
ancestor(?X, ?Z) :- parent(?X, ?Y), ancestor(?Y, ?Z).
# Pattern 2: Derived Classification (heritage type inference)
museum_archive(?X) :-
institution_type(?X, "MUSEUM"),
has_collection(?X, ?C),
collection_type(?C, "archival").
# Pattern 3: Temporal Validity (valid at point in time)
valid_at(?Entity, ?Date) :-
valid_from(?Entity, ?Start),
valid_to(?Entity, ?End),
?Start <= ?Date,
?Date <= ?End.
# Pattern 4: Shortcut Predicates (query optimization)
# Instead of traversing: institution → location → region → country
in_country(?Institution, ?Country) :-
located_in(?Institution, ?Location),
admin_region(?Location, ?Region),
country(?Region, ?Country).
Why Datalog over SPARQL Property Paths:
- SPARQL property paths don't support complex recursion
- Datalog rules are materialized (precomputed), making queries instant
- Rules separate "what" from "how" (declarative vs. procedural)
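What "materialized" means in practice can be shown with a tiny naive-evaluation sketch of Pattern 1 in plain Python: the ancestor relation is computed once to a fixpoint, after which every lookup is a set-membership test (the institution names are hypothetical):

```python
def materialize_ancestors(parent_facts: set[tuple[str, str]]) -> set[tuple[str, str]]:
    # Base rule: ancestor(X, Z) :- parent(X, Z).
    ancestors = set(parent_facts)
    changed = True
    while changed:  # iterate until no new facts are derived (fixpoint)
        changed = False
        # Recursive rule: ancestor(X, Z) :- parent(X, Y), ancestor(Y, Z).
        for x, y in parent_facts:
            for y2, z in list(ancestors):
                if y == y2 and (x, z) not in ancestors:
                    ancestors.add((x, z))
                    changed = True
    return ancestors

# Hypothetical three-level hierarchy: local archive → regional → national.
parents = {
    ("Local Archive", "Regional Archive"),
    ("Regional Archive", "National Archive"),
}
derived = materialize_ancestors(parents)
print(("Local Archive", "National Archive") in derived)  # → True
```

Production Datalog engines (and TypeDB's reasoner) use semi-naive evaluation rather than this full re-scan, but the contract is the same: derivation happens at write time, not query time.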
11.4 GLAM Implementation: TypeDB Rules + Oxigraph SHACL
Our stack has two inference engines that can host rules:
| Component | Rule Language | Strengths | Use Case |
|---|---|---|---|
| TypeDB | TypeQL Rules | Native reasoning, transitive closure | Complex inference, hierarchy traversal |
| Oxigraph | SHACL-AF | W3C standard, RDF-native | Validation, simple derivation |
Hybrid Approach:
┌─────────────────────────────────────┐
│ Rule Repository │
│ (Rules stored as RDF in Oxigraph) │
└─────────────────────────────────────┘
↓ Query rules ↓
┌─────────────────────┴─────────────────────┐
↓ ↓
┌───────────────────┐ ┌───────────────────┐
│ Oxigraph │ │ TypeDB │
│ SHACL-AF Rules │ │ TypeQL Rules │
│ │ │ │
│ • Validation │ │ • Transitive │
│ • Simple derive │ │ closure │
│ • sh:TripleRule │ │ • Complex joins │
│ │ │ • Recursive │
└───────────────────┘ └───────────────────┘
↓ ↓
└───────────────────┬───────────────────────┘
↓
┌───────────────┐
│ DSPy RAG │
│ Query Engine │
└───────────────┘
11.5 TypeDB Rules for Heritage Domain
TypeDB rules (TypeQL) are particularly powerful for heritage inference:
# Rule: Infer parent-child organizational relationships
rule heritage-parent-child-inference:
when {
$child isa heritage-custodian, has ghcid $child_id;
$parent isa heritage-custodian, has ghcid $parent_id;
$event isa change-event, has event-type "ACQUISITION";
($event, acquiring: $parent, acquired: $child);
} then {
(parent: $parent, child: $child) isa organizational-hierarchy;
};
# Rule: Infer collection custody from organizational mergers
rule custody-transfer-from-merger:
when {
$source isa heritage-custodian;
$target isa heritage-custodian;
$collection isa collection;
($source, custodian-of: $collection);
$event isa change-event, has event-type "MERGER";
($event, absorbed: $source, absorbing: $target);
$event has event-date $date;
} then {
($target, custodian-of: $collection) isa custody-relation,
has custody-start-date $date;
};
# Rule: Regional archives inherit provincial governance
rule regional-archive-governance:
when {
$archive isa heritage-custodian, has institution-type "ARCHIVE";
$location isa location, has region-code $region;
($archive, located-in: $location);
$gov isa government-body, has jurisdiction $region;
} then {
(governed-by: $gov, governed-entity: $archive) isa governance-relation;
};
Advantages of TypeDB Rules:
- Automatically materialized (precomputed)
- Recursive reasoning built-in
- Rules trigger on data changes
- Explanation support (why was this inferred?)
11.6 SHACL Rules for Oxigraph
SHACL Advanced Features (SHACL-AF) provides rule support:
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix heritage: <https://w3id.org/heritage/> .
# SHACL Rule: Derive institution display name from components
heritage:DisplayNameRule
a sh:NodeShape ;
sh:targetClass heritage:HeritageCustodian ;
sh:rule [
a sh:SPARQLRule ;
sh:construct """
CONSTRUCT {
$this heritage:displayName ?displayName .
}
WHERE {
$this heritage:name ?name .
$this heritage:locatedIn/heritage:city ?city .
BIND(CONCAT(?name, " (", ?city, ")") AS ?displayName)
}
"""
] .
# SHACL Rule: Flag custodians with data quality issues
heritage:DataQualityRule
a sh:NodeShape ;
sh:targetClass heritage:HeritageCustodian ;
sh:rule [
a sh:SPARQLRule ;
sh:construct """
CONSTRUCT {
$this heritage:hasDataQualityIssue heritage:MissingLocation .
}
WHERE {
$this a heritage:HeritageCustodian .
FILTER NOT EXISTS { $this heritage:locatedIn ?loc }
}
"""
] .
11.7 Rule-Based RAG Enhancement
Rules can improve RAG retrieval and generation:
Pattern A: Rule-Guided Query Expansion
class RuleGuidedQueryExpander:
"""Expand queries using inference rules stored in graph."""
def __init__(self, oxigraph_client: OxigraphClient):
self.client = oxigraph_client
self._load_expansion_rules()
def _load_expansion_rules(self):
"""Load query expansion rules from graph."""
query = """
PREFIX heritage: <https://w3id.org/heritage/>
SELECT ?rule ?pattern ?expansion WHERE {
?rule a heritage:QueryExpansionRule ;
heritage:matchPattern ?pattern ;
heritage:expandTo ?expansion .
}
"""
self.rules = self.client.query(query)
    def expand(self, query: str) -> list[str]:
        """Apply rules to expand query terms."""
        expansions = [query]
        query_lower = query.lower()  # patterns are stored lowercase
        for rule in self.rules:
            if rule["pattern"] in query_lower:
                expansions.append(
                    query_lower.replace(rule["pattern"], rule["expansion"])
                )
        return expansions
Pattern B: Rule-Derived Facts in Context
class RuleDerivedContextEnricher:
"""Add inferred facts to RAG context."""
def enrich_context(
self,
entities: list[str],
context: str
) -> str:
"""Add rule-derived facts about entities."""
derived_facts = []
for entity_ghcid in entities:
# Query TypeDB for inferred relations
inferred = self.typedb_client.query(f"""
match
$e isa heritage-custodian, has ghcid "{entity_ghcid}";
$rel ($e, $other);
                $rel isa $rel_type;  # run with inference enabled to include derived relations
get $rel_type, $other;
""")
for fact in inferred:
derived_facts.append(
f"[Inferred] {entity_ghcid} {fact['rel_type']} {fact['other']}"
)
if derived_facts:
context += "\n\n**Inferred relationships:**\n"
context += "\n".join(derived_facts)
return context
11.8 Rule Governance and Provenance
Store rule metadata for auditability:
# Rule definition with full provenance (stored in Oxigraph)
rule_definition:
id: heritage:CustodyInferenceRule_v1
type: TypeQLRule
version: "1.0.0"
status: active # active | deprecated | testing
# Rule content
when_clause: |
$source isa heritage-custodian;
$event isa change-event, has event-type "MERGER";
...
then_clause: |
($target, custodian-of: $collection) isa custody-relation;
# Governance metadata
created_by: "domain-expert-curator"
created_date: "2025-01-06T12:00:00Z"
approved_by: "heritage-governance-committee"
approval_date: "2025-01-07T09:00:00Z"
# Semantic metadata
domain: organizational_change
entities_affected:
- heritage:HeritageCustodian
- heritage:Collection
related_properties:
- heritage:custodianOf
- heritage:changeEvent
# Documentation
description: |
When a heritage custodian is absorbed through merger,
custody of their collections transfers to the absorbing institution.
rationale: |
Per Dutch heritage law, organizational mergers transfer custody
unless explicitly reassigned.
example_trigger: |
    The 2001 merger of Gemeentearchief Haarlem and Rijksarchief
    Noord-Holland into the Noord-Hollands Archief.
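A minimal governance check can be sketched directly over such a record (here a plain dict mirroring the YAML above; the required-field set is an assumption derived from the metadata shown):

```python
# Governance fields every rule definition must carry before deployment
# (assumed set, derived from the governance metadata block above).
REQUIRED_GOVERNANCE_FIELDS = {
    "status", "created_by", "created_date", "approved_by", "approval_date",
}

def governance_gaps(rule: dict) -> set[str]:
    """Return the governance fields missing from a rule definition."""
    return REQUIRED_GOVERNANCE_FIELDS - rule.keys()

# An incomplete rule record: drafted but never approved.
rule = {
    "id": "heritage:CustodyInferenceRule_v1",
    "status": "active",
    "created_by": "domain-expert-curator",
    "created_date": "2025-01-06T12:00:00Z",
}
print(sorted(governance_gaps(rule)))  # → ['approval_date', 'approved_by']
```

Running such a check in CI keeps unapproved rules out of the TypeDB and SHACL deployments.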
11.9 Integration with DSPy Heritage RAG
Add rule-awareness to the existing pipeline:
# backend/rag/rule_aware_retriever.py
from dataclasses import dataclass
from typing import Optional
@dataclass
class InferredFact:
"""A fact derived by rule inference."""
subject: str
predicate: str
object: str
rule_id: str
confidence: float = 1.0 # Rules produce certain facts
class RuleAwareRetriever:
"""Retriever that includes rule-inferred facts."""
def __init__(
self,
typedb_client,
oxigraph_client,
include_inferred: bool = True
):
self.typedb = typedb_client
self.oxigraph = oxigraph_client
self.include_inferred = include_inferred
def retrieve(
self,
query: str,
entities: list[str]
) -> tuple[list[dict], list[InferredFact]]:
"""Retrieve both stored and inferred facts."""
# Standard retrieval
stored_facts = self._retrieve_stored(query, entities)
# Rule-based inference (if enabled)
inferred_facts = []
if self.include_inferred:
inferred_facts = self._retrieve_inferred(entities)
return stored_facts, inferred_facts
def _retrieve_inferred(
self,
entities: list[str]
) -> list[InferredFact]:
"""Get inferred facts from TypeDB reasoning."""
inferred = []
for ghcid in entities:
# TypeDB query with inference enabled
results = self.typedb.query(f"""
match
$e isa heritage-custodian, has ghcid "{ghcid}";
$rel ($e, $other);
get $rel, $other;
""", inference=True)
for r in results:
if r.get('inferred', False):
inferred.append(InferredFact(
subject=ghcid,
predicate=r['relation_type'],
object=r['other_entity'],
rule_id=r.get('rule_id', 'unknown')
))
return inferred
11.10 Key Takeaways for GLAM
| Principle | Implementation |
|---|---|
| Rules as data | Store SHACL/TypeQL rules in Oxigraph as RDF |
| Rule provenance | Track rule creator, approver, version with PROV-O |
| Dual inference | TypeDB for complex reasoning, SHACL for validation |
| Query optimization | Precompute shortcut predicates via rules |
| RAG enhancement | Include inferred facts in retrieval context |
| Governance | Rules queryable, auditable, versionable |
Anti-Patterns to Avoid:
- ❌ Hardcoding inference logic in Python
- ❌ Duplicating rules across TypeDB and application code
- ❌ No provenance for derived facts
- ❌ Rules that can't be inspected by domain experts
References
- Edge et al. (2024). "From Local to Global: A Graph RAG Approach to Query-Focused Summarization." arXiv:2404.16130
- Wang et al. (2025). "ROGRAG: A Robustly Optimized GraphRAG Framework." arXiv:2503.06474
- Rasmussen et al. (2025). "Zep: A Temporal Knowledge Graph Architecture for Agent Memory." arXiv:2501.13956
- Luo et al. (2025). "HyperGraphRAG: Retrieval-Augmented Generation via Hypergraph-Structured Knowledge Representation." arXiv:2503.21322 (NeurIPS 2025)
- Zhou et al. (2025). "Improving Multi-step RAG with Hypergraph-based Memory for Long-Context Complex Relational Modeling." arXiv:2512.23959
- Sarkar (2025). "GraphRAG in Practice: How to Build Cost-Efficient, High-Recall Retrieval Systems." Towards Data Science
- Turing Post (2026). "12 New Advanced Types of RAG."
- Ding et al. (2025). "STAR-RAG: Time-Aligned Rule Graphs for Temporal Reasoning." arXiv:2510.16715
- Chen et al. (2024). "TimeR4: Time-Aware Retrieve-Rewrite-Retrieve-Rerank for Temporal QA." EMNLP 2024
- Wang et al. (2025). "T-GRAG: Temporal Graph RAG with Conflict Resolution." arXiv:2508.01680
- vLLM Project (2025). "Semantic Router v0.1 Iris: Signal-Decision Architecture."
- Aurelio Labs (2025). "semantic-router: Superfast Decision-Making Layer for LLMs."
- CIDOC-CRM Special Interest Group (2024). "CIDOC Conceptual Reference Model v7.1.3." ICOM.
- Doerr, M. et al. (2023). "Mapping Cultural Heritage Events to CIDOC-CRM." Museum & Web 2023.
- Klyne, G. et al. (2024). "Hypergraph Patterns for Cultural Heritage Knowledge Graphs." Digital Humanities Quarterly.
- DSPy Framework (2024). "Declarative Self-Improving Language Programs." Stanford NLP.
- Pinecone (2024). "Vector Database Best Practices for RAG." Technical Blog.
- LightRAG (2024). "Simple and Fast Retrieval-Augmented Generation." GitHub Repository.
- Velitchkov, I. (2025). "Rules on Graphs in Graphs of Rules, Part 1." Link & Think (Substack).
- Pareti, P. et al. (2019). "SHACL Constraints with Inference Rules." ISWC 2019. arXiv:1911.00598
- W3C SHACL Working Group (2025). "Use case: Datalog rules." GitHub Issue #348, w3c/data-shapes.
- Han, H. et al. (2025). "Retrieval-Augmented Generation with Graphs (GraphRAG)." arXiv:2501.00309
- SurrealDB (2025). "Automating Knowledge Graphs with SurrealDB and Gemini." Technical Blog.