# Implementation Guide: GraphRAG Patterns for GLAM
**Purpose**: Concrete implementation patterns for integrating external GraphRAG techniques into our TypeDB-Oxigraph-DSPy stack.
---
## Pattern A: Retrieval Verification Layer
### Rationale
From ROGRAG research: argument checking (verifying the retrieved context *before* generation) outperforms result checking (verifying the answer *after* generation), at 75% vs. 72% accuracy.
### Implementation
Add to `dspy_heritage_rag.py`:
```python
# =============================================================================
# RETRIEVAL VERIFICATION (ROGRAG Pattern)
# =============================================================================
# Imports used by this snippet (no-ops if already present at the top of the module):
import logging
from typing import Optional

import dspy

logger = logging.getLogger(__name__)
class ArgumentVerifier(dspy.Signature):
    """
    Verify that the retrieved context can answer the query before generation,
    preventing hallucination from insufficient context. Based on the ROGRAG
    (arxiv:2503.06474) finding that argument checking outperforms result
    checking (75% vs 72% accuracy).

    You are a verification assistant for heritage institution queries.
    Given a user query and retrieved context, determine whether the context
    contains sufficient information to answer the query accurately.

    Be strict:
    - If key entities (institutions, cities, dates) are mentioned in the query
      but not found in the context, return can_answer=False
    - If the query asks for counts but the context does not provide them, return False
    - If the query asks about relationships but the context only has entity lists, return False

    Examples of INSUFFICIENT context:
    - Query: "How many archives are in Haarlem?" / Context: mentions Haarlem archives but no count
    - Query: "When was the Rijksmuseum founded?" / Context: describes the Rijksmuseum but no founding date

    Examples of SUFFICIENT context:
    - Query: "What archives are in Haarlem?" / Context: lists 3 specific archives in Haarlem
    - Query: "Tell me about the Rijksmuseum" / Context: contains name, location, type, description
    """
query: str = dspy.InputField(desc="User's original question")
context: str = dspy.InputField(desc="Retrieved information from KG and vector search")
can_answer: bool = dspy.OutputField(
desc="True if context contains sufficient information to answer accurately"
)
missing_info: str = dspy.OutputField(
desc="What specific information is missing (empty if can_answer=True)"
)
confidence: float = dspy.OutputField(
desc="Confidence score 0-1 that context is sufficient"
)
suggested_refinement: str = dspy.OutputField(
desc="Suggested query refinement if context is insufficient (empty if can_answer=True)"
)
class VerifiedHeritageRAG(dspy.Module):
"""
RAG pipeline with verification layer before answer generation.
"""
def __init__(self, max_verification_retries: int = 2):
super().__init__()
self.max_retries = max_verification_retries
self.verifier = dspy.ChainOfThought(ArgumentVerifier)
self.retriever = HeritageRetriever() # Existing retriever
self.generator = dspy.ChainOfThought(HeritageAnswerSignature) # Existing generator
def forward(
self,
query: str,
conversation_history: Optional[list[dict]] = None
) -> dspy.Prediction:
"""
Retrieve, verify, then generate - with retry on insufficient context.
"""
context = ""
verification_attempts = []
for attempt in range(self.max_retries + 1):
# Expand search if this is a retry
expand_search = attempt > 0
# Retrieve context
retrieval_result = self.retriever(
query=query,
expand=expand_search,
previous_context=context
)
context = retrieval_result.context
# Verify sufficiency
verification = self.verifier(query=query, context=context)
verification_attempts.append({
"attempt": attempt,
"can_answer": verification.can_answer,
"confidence": verification.confidence,
"missing": verification.missing_info
})
if verification.can_answer and verification.confidence >= 0.7:
break
# Log retry
logger.info(
f"Verification attempt {attempt + 1}/{self.max_retries + 1}: "
f"Insufficient context. Missing: {verification.missing_info}"
)
# Generate answer (with caveat if low confidence)
if not verification.can_answer:
context = f"[NOTE: Limited information available]\n\n{context}"
answer = self.generator(query=query, context=context)
return dspy.Prediction(
answer=answer.response,
context=context,
verification=verification_attempts[-1],
retries=len(verification_attempts) - 1
)
```
### Integration Point
In `dspy_heritage_rag.py`, modify `HeritageRAGModule.forward()` to use verification:
```python
# Before (current):
# answer = self.generate_answer(query, context)
# After (with verification):
verification = self.verifier(query=query, context=context)
if not verification.can_answer and verification.confidence < 0.7:
# Expand search and retry
context = self._expand_retrieval(query, context, verification.missing_info)
verification = self.verifier(query=query, context=context)
answer = self.generate_answer(query, context)
```
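Stripped of the DSPy specifics, the retry-verify pattern reduces to a small control loop. The sketch below shows that loop with plain callables; the `retrieve`/`verify`/`generate` stand-ins and the `Verification` record are illustrative, not the module's actual API:

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class Verification:
    can_answer: bool
    confidence: float
    missing_info: str

def verified_answer(
    query: str,
    retrieve: Callable,   # (query, expand, previous_context) -> context string
    verify: Callable,     # (query, context) -> Verification
    generate: Callable,   # (query, context) -> answer string
    max_retries: int = 2,
):
    """Retrieve, verify sufficiency, retry with expanded search, then generate."""
    context, verification = "", None
    for attempt in range(max_retries + 1):
        # Expand the search on every retry after the first attempt
        context = retrieve(query, expand=attempt > 0, previous_context=context)
        verification = verify(query, context)
        if verification.can_answer and verification.confidence >= 0.7:
            break
    if not verification.can_answer:
        # Same caveat the module prepends when context stays insufficient
        context = f"[NOTE: Limited information available]\n\n{context}"
    return generate(query, context), verification
```

The `0.7` confidence gate mirrors the threshold used in `VerifiedHeritageRAG.forward()`.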
---
## Pattern B: Dual-Level Entity Extraction
### Rationale
From ROGRAG: Separating low-level (entities) from high-level (relations) enables:
- Low-level: Fuzzy string matching for names, places, IDs
- High-level: Semantic similarity for concepts, relationships
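To make the split concrete: low-level matching can tolerate spelling variants with plain character-level similarity, no embedding model required. A minimal stdlib sketch using `difflib` (the helper name and `0.8` threshold are illustrative choices):

```python
from difflib import SequenceMatcher

def fuzzy_match(entity: str, candidates: list[str], threshold: float = 0.8) -> list[str]:
    """Low-level matching: accept substring hits or high character-level similarity."""
    needle = entity.lower()
    return [
        c for c in candidates
        if needle in c.lower()
        or SequenceMatcher(None, needle, c.lower()).ratio() >= threshold
    ]
```

`fuzzy_match("Rijksmuseum", ...)` also catches the misspelling "Rijksmusum"; high-level relation phrases like "digitized collections" instead go through embedding similarity, where meaning matters more than exact characters.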
### Implementation
Add to `dspy_heritage_rag.py`:
```python
# =============================================================================
# DUAL-LEVEL EXTRACTION (ROGRAG Pattern)
# =============================================================================
# Imports used by this snippet (no-ops if already present at the top of the module):
from typing import Literal, Optional

import dspy
from qdrant_client import models
class DualLevelEntityExtractor(dspy.Signature):
    """
    Extract both entity-level and relation-level keywords from heritage queries.
    Based on the ROGRAG (arxiv:2503.06474) dual-level retrieval method.

    Low-level: named entities for fuzzy graph matching
    High-level: relation descriptions for semantic vector matching

    You are a heritage query analyzer. Extract two types of information:

    LOW-LEVEL (Entities):
    - Institution names: Rijksmuseum, Nationaal Archief, etc.
    - Place names: Amsterdam, Limburg, Noord-Holland
    - Person names: staff, directors, curators
    - Identifiers: GHCID, ISIL codes (NL-XXXX)
    - Dates: years, date ranges

    HIGH-LEVEL (Relations/Concepts):
    - Collection types: "digitized collections", "medieval manuscripts"
    - Institution attributes: "oldest", "largest", "founded before 1900"
    - Relationship phrases: "collaborated with", "merged into", "part of"
    - Activities: "preserves", "exhibits", "researches"

    Examples:

    Query: "Which archives in Haarlem have digitized medieval manuscripts?"
    Entities: ["Haarlem", "archives"]
    Relations: ["digitized collections", "medieval manuscripts"]
    Strategy: entity_first (narrow by location, then filter by collection type)

    Query: "What museums were founded before 1850 in the Netherlands?"
    Entities: ["Netherlands", "museums", "1850"]
    Relations: ["founded before", "historical institution"]
    Strategy: relation_first (semantic search for founding dates, then verify entities)

    Query: "Tell me about the Rijksmuseum"
    Entities: ["Rijksmuseum"]
    Relations: ["general information", "institution overview"]
    Strategy: entity_first (direct lookup)
    """
query: str = dspy.InputField(desc="User's heritage question")
entities: list[str] = dspy.OutputField(
desc="Low-level: Named entities (institutions, places, people, dates, IDs)"
)
relations: list[str] = dspy.OutputField(
desc="High-level: Relation/concept phrases for semantic matching"
)
search_strategy: Literal["entity_first", "relation_first", "parallel"] = dspy.OutputField(
desc="Recommended search strategy based on query structure"
)
entity_types: list[str] = dspy.OutputField(
desc="Types of entities found: institution, place, person, date, identifier"
)
class DualLevelRetriever(dspy.Module):
"""
Combines entity-level graph search with relation-level semantic search.
"""
def __init__(self, qdrant_client, oxigraph_endpoint: str):
super().__init__()
self.extractor = dspy.ChainOfThought(DualLevelEntityExtractor)
self.qdrant = qdrant_client
self.oxigraph = oxigraph_endpoint
def match_entities_in_graph(self, entities: list[str]) -> set[str]:
"""
Fuzzy match entities against Oxigraph nodes.
Returns matching GHCIDs.
"""
ghcids = set()
for entity in entities:
# Use FILTER with CONTAINS for fuzzy matching
sparql = f"""
PREFIX hc: <https://example.org/hc#>   # placeholder: substitute the project's actual hc namespace IRI
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX schema: <https://schema.org/>
SELECT DISTINCT ?ghcid WHERE {{
    ?s hc:ghcid ?ghcid .
    {{
        ?s skos:prefLabel ?name .
        FILTER(CONTAINS(LCASE(?name), LCASE("{entity}")))
    }} UNION {{
        ?s schema:addressLocality ?city .
        FILTER(CONTAINS(LCASE(?city), LCASE("{entity}")))
    }} UNION {{
        ?s hc:ghcid ?ghcid .
        FILTER(CONTAINS(?ghcid, "{entity.upper()}"))
    }}
}}
LIMIT 50
"""
results = self._execute_sparql(sparql)
ghcids.update(r["ghcid"] for r in results)
return ghcids
def match_relations_semantically(
self,
relations: list[str],
ghcid_filter: Optional[set[str]] = None
) -> list[dict]:
"""
Semantic search for relation descriptions in vector store.
Optionally filter by GHCID set from entity matching.
"""
# Combine relation phrases into search query
relation_query = " ".join(relations)
# Build filter
qdrant_filter = None
if ghcid_filter:
qdrant_filter = models.Filter(
must=[
models.FieldCondition(
key="ghcid",
match=models.MatchAny(any=list(ghcid_filter))
)
]
)
# Vector search
results = self.qdrant.search(
collection_name="heritage_chunks",
query_vector=self._embed(relation_query),
query_filter=qdrant_filter,
limit=20
)
return [
{
"ghcid": r.payload.get("ghcid"),
"text": r.payload.get("text"),
"score": r.score
}
for r in results
]
def forward(self, query: str) -> dspy.Prediction:
    """
    Dual-level retrieval: entities narrow the search, relations refine the results.
    """
    # Extract dual levels
    extraction = self.extractor(query=query)
    ghcid_set: set[str] = set()
    if extraction.search_strategy == "entity_first":
        # Step 1: Entity matching in graph
        ghcid_set = self.match_entities_in_graph(extraction.entities)
        # Step 2: Relation matching with GHCID filter
        results = self.match_relations_semantically(
            extraction.relations,
            ghcid_filter=ghcid_set or None
        )
    elif extraction.search_strategy == "relation_first":
        # Step 1: Broad relation matching
        results = self.match_relations_semantically(extraction.relations)
        # Step 2: Filter by entity matching
        result_ghcids = {r["ghcid"] for r in results if r.get("ghcid")}
        ghcid_set = self.match_entities_in_graph(extraction.entities)
        # Prioritize the intersection
        intersection = result_ghcids & ghcid_set
        if intersection:
            results = [r for r in results if r.get("ghcid") in intersection]
    else:  # parallel
        # Run both levels, then boost results that match at both
        ghcid_set = self.match_entities_in_graph(extraction.entities)
        semantic_results = self.match_relations_semantically(extraction.relations)
        for r in semantic_results:
            if r.get("ghcid") in ghcid_set:
                r["score"] *= 1.5  # Boost intersection
        results = sorted(semantic_results, key=lambda x: -x["score"])
    return dspy.Prediction(
        results=results,
        entities=extraction.entities,
        relations=extraction.relations,
        strategy=extraction.search_strategy,
        ghcid_set=sorted(ghcid_set)
    )
```
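One caveat with `match_entities_in_graph` above: interpolating user-derived strings straight into SPARQL risks injection and syntax errors (a stray `"` in an institution name terminates the literal). A minimal escaping helper for double-quoted SPARQL literals, sketched as an illustration rather than taken from any SPARQL library:

```python
def sparql_escape(value: str) -> str:
    """Escape a string for safe inclusion in a double-quoted SPARQL literal."""
    return (
        value.replace("\\", "\\\\")   # backslashes first, so later escapes survive
             .replace('"', '\\"')
             .replace("\n", "\\n")
             .replace("\r", "\\r")
    )
```

Wrapping each interpolated value, e.g. `LCASE("{sparql_escape(entity)}")`, keeps quotes and newlines from breaking the generated query.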
---
## Pattern C: Community Detection and Summaries
### Rationale
From Microsoft GraphRAG: Community summaries enable answering holistic questions like "What are the main archival themes in the Netherlands?"
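Leiden itself requires `leidenalg` and `python-igraph`, but the grouping intuition can be previewed with a stdlib union-find over the same shared-attribute edges. This is a crude stand-in (plain connected components, with no edge weights or modularity optimization), shown only to make the clustering step tangible:

```python
from collections import defaultdict

def connected_components(nodes: list[str], edges: list[tuple[str, str]]) -> list[set[str]]:
    """Union-find grouping; Leiden further splits components using edge weights and modularity."""
    parent = {n: n for n in nodes}

    def find(n: str) -> str:
        while parent[n] != n:
            parent[n] = parent[parent[n]]  # path compression
            n = parent[n]
        return n

    # Union the endpoints of every edge
    for a, b in edges:
        parent[find(a)] = find(b)

    groups: dict[str, set[str]] = defaultdict(set)
    for n in nodes:
        groups[find(n)].add(n)
    return list(groups.values())
```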
### Implementation
Create new file `backend/rag/community_indexer.py`:
```python
"""
Community Detection and Summary Indexing for Global Search
Based on Microsoft GraphRAG (arxiv:2404.16130) community hierarchy pattern.
Uses Leiden algorithm for community detection on institution graph.
"""
import json
import logging
from dataclasses import dataclass
from typing import Optional
import dspy
import igraph as ig
import leidenalg
from qdrant_client import QdrantClient, models
logger = logging.getLogger(__name__)
@dataclass
class Community:
"""A community of related heritage institutions."""
community_id: str
ghcids: list[str]
summary: str
institution_count: int
dominant_type: str # Most common institution type
dominant_region: str # Most common region
themes: list[str] # Extracted themes
class CommunitySummarizer(dspy.Signature):
    """
    Generate a summary for a community of heritage institutions.

    You are a heritage domain expert. Given a list of institutions in a community,
    generate a concise summary describing:
    1. What types of institutions are in this community
    2. Geographic concentration (if any)
    3. Common themes or specializations
    4. Notable relationships between institutions

    Keep the summary to 2-3 sentences. Focus on what makes this community distinctive.
    """
institutions: str = dspy.InputField(desc="JSON list of institution metadata")
summary: str = dspy.OutputField(desc="2-3 sentence community summary")
themes: list[str] = dspy.OutputField(desc="Key themes (3-5 keywords)")
notable_features: str = dspy.OutputField(desc="What makes this community distinctive")
class CommunityIndexer:
"""
Builds and indexes institution communities for global search.
Usage:
indexer = CommunityIndexer(oxigraph_url, qdrant_client)
indexer.build_communities()
indexer.index_summaries()
"""
def __init__(
self,
oxigraph_endpoint: str,
qdrant_client: QdrantClient,
collection_name: str = "heritage_communities"
):
self.oxigraph = oxigraph_endpoint
self.qdrant = qdrant_client
self.collection_name = collection_name
self.summarizer = dspy.ChainOfThought(CommunitySummarizer)
def build_institution_graph(self) -> ig.Graph:
"""
Query Oxigraph for institution relationships.
Build igraph for community detection.
"""
# Get all institutions with their properties
sparql = """
PREFIX hc: <https://example.org/hc#>   # placeholder: substitute the project's actual hc namespace IRI
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX schema: <https://schema.org/>
SELECT ?ghcid ?name ?type ?city ?region WHERE {
    ?s hc:ghcid ?ghcid ;
       skos:prefLabel ?name ;
       hc:institutionType ?type .
    OPTIONAL { ?s schema:addressLocality ?city }
    OPTIONAL { ?s hc:regionCode ?region }
}
"""
institutions = self._execute_sparql(sparql)
# Build graph: nodes are institutions, edges connect those sharing:
# - Same city
# - Same region
# - Same type
# - Part-of relationships
g = ig.Graph()
ghcid_to_idx = {}
# Add nodes
for inst in institutions:
idx = g.add_vertex(
ghcid=inst["ghcid"],
name=inst.get("name", ""),
type=inst.get("type", ""),
city=inst.get("city", ""),
region=inst.get("region", "")
)
ghcid_to_idx[inst["ghcid"]] = idx.index
# Add edges based on shared properties
for i, inst1 in enumerate(institutions):
for j, inst2 in enumerate(institutions[i+1:], i+1):
weight = 0
# Same city: strong connection
if inst1.get("city") and inst1["city"] == inst2.get("city"):
weight += 2
# Same region: medium connection
if inst1.get("region") and inst1["region"] == inst2.get("region"):
weight += 1
# Same type: weak connection
if inst1.get("type") and inst1["type"] == inst2.get("type"):
weight += 0.5
if weight > 0:
g.add_edge(
ghcid_to_idx[inst1["ghcid"]],
ghcid_to_idx[inst2["ghcid"]],
weight=weight
)
return g
def detect_communities(self, graph: ig.Graph) -> dict[str, list[str]]:
"""
Apply Leiden algorithm for community detection.
Returns mapping: community_id -> [ghcid_list]
"""
# Leiden with modularity optimization
partition = leidenalg.find_partition(
graph,
leidenalg.ModularityVertexPartition,
weights="weight"
)
communities = {}
for comm_idx, members in enumerate(partition):
comm_id = f"comm_{comm_idx:04d}"
ghcids = [graph.vs[idx]["ghcid"] for idx in members]
communities[comm_id] = ghcids
logger.info(f"Detected {len(communities)} communities")
return communities
def generate_community_summary(
self,
community_id: str,
ghcids: list[str]
) -> Community:
"""
Generate LLM summary for a community.
"""
# Fetch metadata for all institutions
institutions = self._fetch_institutions(ghcids)
# Generate summary
result = self.summarizer(
institutions=json.dumps(institutions, indent=2)
)
# Determine dominant type and region
types = [i.get("type", "") for i in institutions]
regions = [i.get("region", "") for i in institutions]
dominant_type = max(set(types), key=types.count) if types else ""
dominant_region = max(set(regions), key=regions.count) if regions else ""
return Community(
community_id=community_id,
ghcids=ghcids,
summary=result.summary,
institution_count=len(ghcids),
dominant_type=dominant_type,
dominant_region=dominant_region,
themes=result.themes
)
def index_summaries(self, communities: list[Community]) -> None:
"""
Store community summaries in Qdrant for global search.
"""
# Create collection if not exists
self.qdrant.recreate_collection(
collection_name=self.collection_name,
vectors_config=models.VectorParams(
size=384, # MiniLM embedding size
distance=models.Distance.COSINE
)
)
# Index each community (deterministic IDs so re-runs update points in place;
# the built-in hash() is salted per process and would yield new IDs every run)
import hashlib  # local import keeps this snippet self-contained
points = []
for comm in communities:
    embedding = self._embed(comm.summary)
    point_id = int.from_bytes(
        hashlib.sha256(comm.community_id.encode("utf-8")).digest()[:8], "big"
    ) % (2 ** 63)
    points.append(models.PointStruct(
        id=point_id,
        vector=embedding,
        payload={
            "community_id": comm.community_id,
            "summary": comm.summary,
            "ghcids": comm.ghcids,
            "institution_count": comm.institution_count,
            "dominant_type": comm.dominant_type,
            "dominant_region": comm.dominant_region,
            "themes": comm.themes
        }
    ))
self.qdrant.upsert(
collection_name=self.collection_name,
points=points
)
logger.info(f"Indexed {len(points)} community summaries")
def global_search(self, query: str, limit: int = 5) -> list[dict]:
"""
Search community summaries for holistic questions.
"""
embedding = self._embed(query)
results = self.qdrant.search(
collection_name=self.collection_name,
query_vector=embedding,
limit=limit
)
return [
{
"community_id": r.payload["community_id"],
"summary": r.payload["summary"],
"themes": r.payload["themes"],
"institution_count": r.payload["institution_count"],
"score": r.score
}
for r in results
]
def build_and_index(self) -> int:
"""
Full pipeline: build graph, detect communities, generate summaries, index.
Returns number of communities indexed.
"""
logger.info("Building institution graph...")
graph = self.build_institution_graph()
logger.info("Detecting communities...")
community_map = self.detect_communities(graph)
logger.info("Generating community summaries...")
communities = []
for comm_id, ghcids in community_map.items():
if len(ghcids) >= 3: # Only summarize communities with 3+ members
comm = self.generate_community_summary(comm_id, ghcids)
communities.append(comm)
logger.info(f"Indexing {len(communities)} community summaries...")
self.index_summaries(communities)
return len(communities)
```
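A note on Qdrant point IDs: they must be stable across runs so that re-indexing updates existing points instead of creating duplicates. Python's built-in `hash()` is salted per process, so a `hashlib`-based derivation is safer; a small sketch (the function name is illustrative):

```python
import hashlib

def stable_point_id(key: str) -> int:
    """Deterministic 63-bit integer ID from a string key (stable across processes)."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % (2 ** 63)
```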
---
## Pattern D: Temporal Query Templates
### Rationale
From Zep: Bitemporal modeling enables point-in-time queries and provenance tracking.
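The point-in-time semantics these templates encode can be stated in plain Python: a record is visible at date `d` iff `valid_from <= d` and (`valid_to` is unset or `valid_to > d`). A stdlib sketch with illustrative field names:

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional

@dataclass
class VersionedFact:
    ghcid: str
    name: str
    valid_from: date
    valid_to: Optional[date] = None  # None means still current

def state_at(facts: list[VersionedFact], query_date: date) -> list[VersionedFact]:
    """Facts valid at query_date: valid_from <= d and (no valid_to or valid_to > d)."""
    return [
        f for f in facts
        if f.valid_from <= query_date
        and (f.valid_to is None or f.valid_to > query_date)
    ]
```

Note the half-open interval: on the exact day a fact is superseded, only the successor version is returned.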
### Implementation
Add to `template_sparql.py`:
```python
# =============================================================================
# TEMPORAL QUERY TEMPLATES (Zep Pattern)
# =============================================================================
TEMPORAL_QUERY_TEMPLATES = {
"point_in_time_state": TemplateDefinition(
id="temporal_pit",
name="Point-in-Time Institution State",
description="Get institution state at a specific point in time",
intent_patterns=["what was", "in [year]", "before", "after", "at that time"],
sparql_template="""
PREFIX hc: <https://example.org/hc#>   # placeholder: substitute the project's actual hc namespace IRI
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX schema: <https://schema.org/>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
SELECT ?ghcid ?name ?type ?city ?validFrom ?validTo WHERE {
    ?s hc:ghcid ?ghcid ;
       skos:prefLabel ?name ;
       hc:institutionType ?type ;
       hc:validFrom ?validFrom .
    OPTIONAL { ?s schema:addressLocality ?city }
    OPTIONAL { ?s hc:validTo ?validTo }
    # Temporal filter: valid at query date
    FILTER(?validFrom <= "{{ query_date }}"^^xsd:date)
    FILTER(!BOUND(?validTo) || ?validTo > "{{ query_date }}"^^xsd:date)
    {% if ghcid_filter %}
    FILTER(STRSTARTS(?ghcid, "{{ ghcid_filter }}"))
    {% endif %}
}
ORDER BY ?ghcid
""",
slots=[
SlotDefinition(type=SlotType.STRING, name="query_date", required=True),
SlotDefinition(type=SlotType.STRING, name="ghcid_filter", required=False)
]
),
"institution_history": TemplateDefinition(
id="temporal_history",
name="Institution Change History",
description="Get full history of changes for an institution",
intent_patterns=["history of", "changes to", "evolution of", "timeline"],
sparql_template="""
PREFIX hc: <https://example.org/hc#>   # placeholder: substitute the project's actual hc namespace IRI
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
SELECT ?ghcid ?name ?validFrom ?validTo ?changeType ?description WHERE {
?entry hc:ghcid "{{ ghcid }}" ;
skos:prefLabel ?name ;
hc:validFrom ?validFrom .
OPTIONAL { ?entry hc:validTo ?validTo }
OPTIONAL { ?entry hc:changeType ?changeType }
OPTIONAL { ?entry hc:changeDescription ?description }
}
ORDER BY ?validFrom
""",
slots=[
SlotDefinition(type=SlotType.STRING, name="ghcid", required=True)
]
),
"institutions_founded_before": TemplateDefinition(
id="temporal_founded_before",
name="Institutions Founded Before Date",
description="Find institutions founded before a specific date",
intent_patterns=["founded before", "established before", "older than", "before [year]"],
sparql_template="""
PREFIX hc: <https://example.org/hc#>   # placeholder: substitute the project's actual hc namespace IRI
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX schema: <https://schema.org/>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
SELECT ?ghcid ?name ?type ?city ?foundingDate WHERE {
?s hc:ghcid ?ghcid ;
skos:prefLabel ?name ;
hc:institutionType ?type ;
schema:foundingDate ?foundingDate .
OPTIONAL { ?s schema:addressLocality ?city }
FILTER(?foundingDate < "{{ cutoff_date }}"^^xsd:date)
{% if institution_type %}
FILTER(?type = "{{ institution_type }}")
{% endif %}
}
ORDER BY ?foundingDate
LIMIT {{ limit | default(50) }}
""",
slots=[
SlotDefinition(type=SlotType.STRING, name="cutoff_date", required=True),
SlotDefinition(type=SlotType.INSTITUTION_TYPE, name="institution_type", required=False),
SlotDefinition(type=SlotType.INTEGER, name="limit", required=False, default="50")
]
),
"merger_history": TemplateDefinition(
id="temporal_mergers",
name="Institution Merger History",
description="Find institutions that merged or were absorbed",
intent_patterns=["merged", "merger", "combined", "absorbed", "joined"],
sparql_template="""
PREFIX hc: <https://example.org/hc#>   # placeholder: substitute the project's actual hc namespace IRI
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>
SELECT ?event ?eventDate ?description
?sourceGhcid ?sourceName
?targetGhcid ?targetName WHERE {
?event a hc:MergerEvent ;
hc:eventDate ?eventDate ;
hc:description ?description .
OPTIONAL {
?event hc:sourceInstitution ?source .
?source hc:ghcid ?sourceGhcid ;
skos:prefLabel ?sourceName .
}
OPTIONAL {
?event hc:resultingInstitution ?target .
?target hc:ghcid ?targetGhcid ;
skos:prefLabel ?targetName .
}
{% if region_filter %}
FILTER(STRSTARTS(?sourceGhcid, "{{ region_filter }}") ||
STRSTARTS(?targetGhcid, "{{ region_filter }}"))
{% endif %}
}
ORDER BY ?eventDate
""",
slots=[
SlotDefinition(type=SlotType.STRING, name="region_filter", required=False)
]
)
}
```
---
## Integration Checklist
### Immediate Actions
- [ ] Add `ArgumentVerifier` signature to `dspy_heritage_rag.py`
- [ ] Add `DualLevelEntityExtractor` signature
- [ ] Integrate verification into retrieval pipeline
- [ ] Add temporal query templates to `template_sparql.py`
### Short-Term Actions
- [ ] Create `backend/rag/community_indexer.py`
- [ ] Add Leiden algorithm dependency: `pip install leidenalg python-igraph`
- [ ] Create Qdrant collection for community summaries
- [ ] Add global search mode to RAG pipeline
### Testing
```bash
# Test verification layer
python -c "
from backend.rag.dspy_heritage_rag import ArgumentVerifier
import dspy
dspy.configure(lm=...)
verifier = dspy.ChainOfThought(ArgumentVerifier)
result = verifier(
query='How many archives are in Haarlem?',
context='Haarlem has several heritage institutions including archives.'
)
print(f'Can answer: {result.can_answer}')
print(f'Missing: {result.missing_info}')
"
# Test dual-level extraction
python -c "
from backend.rag.dspy_heritage_rag import DualLevelEntityExtractor
import dspy
dspy.configure(lm=...)
extractor = dspy.ChainOfThought(DualLevelEntityExtractor)
result = extractor(query='Which archives in Haarlem have digitized medieval manuscripts?')
print(f'Entities: {result.entities}')
print(f'Relations: {result.relations}')
print(f'Strategy: {result.search_strategy}')
"
```