# Implementation Guide: GraphRAG Patterns for GLAM

**Purpose**: Concrete implementation patterns for integrating external GraphRAG techniques into our TypeDB-Oxigraph-DSPy stack.

---

## Pattern A: Retrieval Verification Layer

### Rationale

From ROGRAG research: argument checking (verify context *before* generation) outperforms result checking (verify *after* generation), 75% vs. 72% accuracy.

### Implementation

Add to `dspy_heritage_rag.py`:

```python
# =============================================================================
# RETRIEVAL VERIFICATION (ROGRAG Pattern)
# =============================================================================
# Uses the module's existing imports (dspy, logger, typing.Optional) and the
# existing HeritageRetriever / HeritageAnswerSignature components.


# Verifies retrieved context BEFORE generation to prevent hallucination from
# insufficient context. Based on the ROGRAG (arxiv:2503.06474) finding that
# argument checking outperforms result checking (75% vs 72% accuracy).
class ArgumentVerifier(dspy.Signature):
    """
    You are a verification assistant for heritage institution queries.

    Given a user query and retrieved context, determine if the context
    contains sufficient information to answer the query accurately.

    Be strict:
    - If key entities (institutions, cities, dates) are mentioned in the query
      but not found in the context, return can_answer=False
    - If the query asks for counts but the context doesn't provide them,
      return False
    - If the query asks about relationships but the context only has entity
      lists, return False

    Examples of INSUFFICIENT context:
    - Query: "How many archives are in Haarlem?"
      Context: mentions Haarlem archives but no count
    - Query: "When was Rijksmuseum founded?"
      Context: describes Rijksmuseum but no founding date

    Examples of SUFFICIENT context:
    - Query: "What archives are in Haarlem?"
      Context: lists 3 specific archives in Haarlem
    - Query: "Tell me about the Rijksmuseum"
      Context: contains name, location, type, description
    """

    query: str = dspy.InputField(desc="User's original question")
    context: str = dspy.InputField(desc="Retrieved information from KG and vector search")
    can_answer: bool = dspy.OutputField(
        desc="True if context contains sufficient information to answer accurately"
    )
    missing_info: str = dspy.OutputField(
        desc="What specific information is missing (empty if can_answer=True)"
    )
    confidence: float = dspy.OutputField(
        desc="Confidence score 0-1 that context is sufficient"
    )
    suggested_refinement: str = dspy.OutputField(
        desc="Suggested query refinement if context is insufficient (empty if can_answer=True)"
    )


class VerifiedHeritageRAG(dspy.Module):
    """RAG pipeline with a verification layer before answer generation."""

    def __init__(self, max_verification_retries: int = 2):
        super().__init__()
        self.max_retries = max_verification_retries
        self.verifier = dspy.ChainOfThought(ArgumentVerifier)
        self.retriever = HeritageRetriever()  # Existing retriever
        self.generator = dspy.ChainOfThought(HeritageAnswerSignature)  # Existing generator

    def forward(
        self,
        query: str,
        conversation_history: Optional[list[dict]] = None
    ) -> dspy.Prediction:
        """Retrieve, verify, then generate - with retry on insufficient context."""
        context = ""
        verification_attempts = []

        for attempt in range(self.max_retries + 1):
            # Expand the search if this is a retry
            expand_search = attempt > 0

            # Retrieve context
            retrieval_result = self.retriever(
                query=query,
                expand=expand_search,
                previous_context=context
            )
            context = retrieval_result.context

            # Verify sufficiency
            verification = self.verifier(query=query, context=context)
            verification_attempts.append({
                "attempt": attempt,
                "can_answer": verification.can_answer,
                "confidence": verification.confidence,
                "missing": verification.missing_info
            })

            if verification.can_answer and verification.confidence >= 0.7:
                break

            # Log the retry
            logger.info(
                f"Verification attempt {attempt + 1}/{self.max_retries + 1}: "
                f"Insufficient context. Missing: {verification.missing_info}"
            )

        # Generate the answer (with a caveat if verification still failed)
        if not verification.can_answer:
            context = f"[NOTE: Limited information available]\n\n{context}"

        answer = self.generator(query=query, context=context)

        return dspy.Prediction(
            answer=answer.response,
            context=context,
            verification=verification_attempts[-1],
            retries=len(verification_attempts) - 1
        )
```

### Integration Point

In `dspy_heritage_rag.py`, modify `HeritageRAGModule.forward()` to use verification:

```python
# Before (current):
# answer = self.generate_answer(query, context)

# After (with verification):
verification = self.verifier(query=query, context=context)
if not verification.can_answer and verification.confidence < 0.7:
    # Expand the search and retry once
    context = self._expand_retrieval(query, context, verification.missing_info)
    verification = self.verifier(query=query, context=context)
answer = self.generate_answer(query, context)
```

---

## Pattern B: Dual-Level Entity Extraction

### Rationale

From ROGRAG: separating low-level (entities) from high-level (relations) retrieval enables:

- Low-level: fuzzy string matching for names, places, IDs
- High-level: semantic similarity for concepts, relationships

### Implementation

Add to `dspy_heritage_rag.py`:

```python
# =============================================================================
# DUAL-LEVEL EXTRACTION (ROGRAG Pattern)
# =============================================================================
# Required imports (skip any already present at the top of the module):
from typing import Literal, Optional

from qdrant_client import models


# Dual-level keyword extraction, based on the ROGRAG (arxiv:2503.06474)
# dual-level retrieval method:
#   low-level:  named entities for fuzzy graph matching
#   high-level: relation descriptions for semantic vector matching
class DualLevelEntityExtractor(dspy.Signature):
    """
    You are a heritage query analyzer. Extract two types of information:

    LOW-LEVEL (Entities):
    - Institution names: Rijksmuseum, Nationaal Archief, etc.
    - Place names: Amsterdam, Limburg, Noord-Holland
    - Person names: staff, directors, curators
    - Identifiers: GHCID, ISIL codes (NL-XXXX)
    - Dates: years, date ranges

    HIGH-LEVEL (Relations/Concepts):
    - Collection types: "digitized collections", "medieval manuscripts"
    - Institution attributes: "oldest", "largest", "founded before 1900"
    - Relationship phrases: "collaborated with", "merged into", "part of"
    - Activities: "preserves", "exhibits", "researches"

    Examples:

    Query: "Which archives in Haarlem have digitized medieval manuscripts?"
    Entities: ["Haarlem", "archives"]
    Relations: ["digitized collections", "medieval manuscripts"]
    Strategy: entity_first (narrow by location, then filter by collection type)

    Query: "What museums were founded before 1850 in the Netherlands?"
    Entities: ["Netherlands", "museums", "1850"]
    Relations: ["founded before", "historical institution"]
    Strategy: relation_first (semantic search for founding dates, then verify entities)

    Query: "Tell me about the Rijksmuseum"
    Entities: ["Rijksmuseum"]
    Relations: ["general information", "institution overview"]
    Strategy: entity_first (direct lookup)
    """

    query: str = dspy.InputField(desc="User's heritage question")
    entities: list[str] = dspy.OutputField(
        desc="Low-level: Named entities (institutions, places, people, dates, IDs)"
    )
    relations: list[str] = dspy.OutputField(
        desc="High-level: Relation/concept phrases for semantic matching"
    )
    search_strategy: Literal["entity_first", "relation_first", "parallel"] = dspy.OutputField(
        desc="Recommended search strategy based on query structure"
    )
    entity_types: list[str] = dspy.OutputField(
        desc="Types of entities found: institution, place, person, date, identifier"
    )


class DualLevelRetriever(dspy.Module):
    """
    Combines entity-level graph search with relation-level semantic search.

    Assumes `_execute_sparql` and `_embed` helper methods (SPARQL execution
    and text embedding) exist on this class.
    """

    def __init__(self, qdrant_client, oxigraph_endpoint: str):
        super().__init__()
        self.extractor = dspy.ChainOfThought(DualLevelEntityExtractor)
        self.qdrant = qdrant_client
        self.oxigraph = oxigraph_endpoint

    def match_entities_in_graph(self, entities: list[str]) -> set[str]:
        """
        Match entities against Oxigraph nodes using case-insensitive
        substring matching (CONTAINS). Returns the matching GHCIDs.
        """
        ghcids = set()
        for entity in entities:
            # NOTE: naive string interpolation; escape quotes in `entity`
            # before shipping this to production.
            sparql = f"""
                PREFIX hc: <...>  # hc IRI lost in source; substitute the project namespace
                PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
                PREFIX schema: <https://schema.org/>

                SELECT DISTINCT ?ghcid WHERE {{
                    ?s hc:ghcid ?ghcid .
                    {{
                        ?s skos:prefLabel ?name .
                        FILTER(CONTAINS(LCASE(?name), LCASE("{entity}")))
                    }} UNION {{
                        ?s schema:addressLocality ?city .
                        FILTER(CONTAINS(LCASE(?city), LCASE("{entity}")))
                    }} UNION {{
                        ?s hc:ghcid ?ghcid .
                        FILTER(CONTAINS(?ghcid, "{entity.upper()}"))
                    }}
                }}
                LIMIT 50
            """
            results = self._execute_sparql(sparql)
            ghcids.update(r["ghcid"] for r in results)
        return ghcids

    def match_relations_semantically(
        self,
        relations: list[str],
        ghcid_filter: Optional[set[str]] = None
    ) -> list[dict]:
        """
        Semantic search for relation descriptions in the vector store,
        optionally filtered by the GHCID set from entity matching.
        """
        # Combine relation phrases into one search query
        relation_query = " ".join(relations)

        # Build the filter
        qdrant_filter = None
        if ghcid_filter:
            qdrant_filter = models.Filter(
                must=[
                    models.FieldCondition(
                        key="ghcid",
                        match=models.MatchAny(any=list(ghcid_filter))
                    )
                ]
            )

        # Vector search
        results = self.qdrant.search(
            collection_name="heritage_chunks",
            query_vector=self._embed(relation_query),
            query_filter=qdrant_filter,
            limit=20
        )

        return [
            {
                "ghcid": r.payload.get("ghcid"),
                "text": r.payload.get("text"),
                "score": r.score
            }
            for r in results
        ]

    def forward(self, query: str) -> dspy.Prediction:
        """Dual-level retrieval: entities narrow the search, relations refine the results."""
        # Extract both levels
        extraction = self.extractor(query=query)
        ghcid_set: set[str] = set()

        if extraction.search_strategy == "entity_first":
            # Step 1: entity matching in the graph
            ghcid_set = self.match_entities_in_graph(extraction.entities)
            # Step 2: relation matching with a GHCID filter
            results = self.match_relations_semantically(
                extraction.relations,
                ghcid_filter=ghcid_set if ghcid_set else None
            )

        elif extraction.search_strategy == "relation_first":
            # Step 1: broad relation matching
            results = self.match_relations_semantically(extraction.relations)
            # Step 2: filter by entity matching
            result_ghcids = {r["ghcid"] for r in results if r.get("ghcid")}
            ghcid_set = self.match_entities_in_graph(extraction.entities)
            # Prioritize the intersection
            intersection = result_ghcids & ghcid_set
            if intersection:
                results = [r for r in results if r.get("ghcid") in intersection]

        else:  # parallel
            # Run both, then merge
            ghcid_set = self.match_entities_in_graph(extraction.entities)
            semantic_results = self.match_relations_semantically(extraction.relations)
            # Boost results that match both levels
            for r in semantic_results:
                if r.get("ghcid") in ghcid_set:
                    r["score"] *= 1.5
            results = sorted(semantic_results, key=lambda x: -x["score"])

        return dspy.Prediction(
            results=results,
            entities=extraction.entities,
            relations=extraction.relations,
            strategy=extraction.search_strategy,
            ghcid_set=list(ghcid_set)
        )
```

---

## Pattern C: Community Detection and Summaries

### Rationale

From Microsoft GraphRAG: community summaries enable answering holistic questions like "What are the main archival themes in the Netherlands?"

### Implementation

Create a new file `backend/rag/community_indexer.py`:

```python
"""
Community Detection and Summary Indexing for Global Search

Based on the Microsoft GraphRAG (arxiv:2404.16130) community hierarchy
pattern. Uses the Leiden algorithm for community detection on the
institution graph.
"""

import json
import logging
import uuid
from dataclasses import dataclass

import dspy
import igraph as ig
import leidenalg
from qdrant_client import QdrantClient, models

logger = logging.getLogger(__name__)


@dataclass
class Community:
    """A community of related heritage institutions."""
    community_id: str
    ghcids: list[str]
    summary: str
    institution_count: int
    dominant_type: str    # Most common institution type
    dominant_region: str  # Most common region
    themes: list[str]     # Extracted themes


class CommunitySummarizer(dspy.Signature):
    """
    You are a heritage domain expert. Given a list of institutions in a
    community, generate a concise summary describing:
    1. What types of institutions are in this community
    2. Geographic concentration (if any)
    3. Common themes or specializations
    4. Notable relationships between institutions

    Keep the summary to 2-3 sentences. Focus on what makes this community
    distinctive.
    """

    institutions: str = dspy.InputField(desc="JSON list of institution metadata")
    summary: str = dspy.OutputField(desc="2-3 sentence community summary")
    themes: list[str] = dspy.OutputField(desc="Key themes (3-5 keywords)")
    notable_features: str = dspy.OutputField(desc="What makes this community distinctive")


class CommunityIndexer:
    """
    Builds and indexes institution communities for global search.

    Usage:
        indexer = CommunityIndexer(oxigraph_url, qdrant_client)
        indexer.build_and_index()

    Assumes `_execute_sparql`, `_fetch_institutions`, and `_embed` helper
    methods (SPARQL execution, metadata lookup, text embedding).
    """

    def __init__(
        self,
        oxigraph_endpoint: str,
        qdrant_client: QdrantClient,
        collection_name: str = "heritage_communities"
    ):
        self.oxigraph = oxigraph_endpoint
        self.qdrant = qdrant_client
        self.collection_name = collection_name
        self.summarizer = dspy.ChainOfThought(CommunitySummarizer)

    def build_institution_graph(self) -> ig.Graph:
        """
        Query Oxigraph for institution relationships and build an igraph
        graph for community detection.
        """
        # Get all institutions with their properties
        sparql = """
            PREFIX hc: <...>  # hc IRI lost in source; substitute the project namespace
            PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
            PREFIX schema: <https://schema.org/>

            SELECT ?ghcid ?name ?type ?city ?region WHERE {
                ?s hc:ghcid ?ghcid ;
                   skos:prefLabel ?name ;
                   hc:institutionType ?type .
                OPTIONAL { ?s schema:addressLocality ?city }
                OPTIONAL { ?s hc:regionCode ?region }
            }
        """
        institutions = self._execute_sparql(sparql)

        # Build the graph: nodes are institutions; edges connect those sharing
        # a city, region, type, or part-of relationship.
        g = ig.Graph()
        ghcid_to_idx = {}

        # Add nodes
        for inst in institutions:
            vertex = g.add_vertex(
                ghcid=inst["ghcid"],
                name=inst.get("name", ""),
                type=inst.get("type", ""),
                city=inst.get("city", ""),
                region=inst.get("region", "")
            )
            ghcid_to_idx[inst["ghcid"]] = vertex.index

        # Add edges based on shared properties (O(n^2) pairwise scan)
        for i, inst1 in enumerate(institutions):
            for inst2 in institutions[i + 1:]:
                weight = 0
                # Same city: strong connection
                if inst1.get("city") and inst1["city"] == inst2.get("city"):
                    weight += 2
                # Same region: medium connection
                if inst1.get("region") and inst1["region"] == inst2.get("region"):
                    weight += 1
                # Same type: weak connection
                if inst1.get("type") and inst1["type"] == inst2.get("type"):
                    weight += 0.5
                if weight > 0:
                    g.add_edge(
                        ghcid_to_idx[inst1["ghcid"]],
                        ghcid_to_idx[inst2["ghcid"]],
                        weight=weight
                    )

        return g

    def detect_communities(self, graph: ig.Graph) -> dict[str, list[str]]:
        """
        Apply the Leiden algorithm for community detection.

        Returns a mapping: community_id -> [ghcid, ...]
        """
        # Leiden with modularity optimization
        partition = leidenalg.find_partition(
            graph,
            leidenalg.ModularityVertexPartition,
            weights="weight"
        )

        communities = {}
        for comm_idx, members in enumerate(partition):
            comm_id = f"comm_{comm_idx:04d}"
            ghcids = [graph.vs[idx]["ghcid"] for idx in members]
            communities[comm_id] = ghcids

        logger.info(f"Detected {len(communities)} communities")
        return communities

    def generate_community_summary(
        self,
        community_id: str,
        ghcids: list[str]
    ) -> Community:
        """Generate an LLM summary for a community."""
        # Fetch metadata for all member institutions
        institutions = self._fetch_institutions(ghcids)

        # Generate the summary
        result = self.summarizer(
            institutions=json.dumps(institutions, indent=2)
        )

        # Determine the dominant type and region
        types = [i.get("type", "") for i in institutions]
        regions = [i.get("region", "") for i in institutions]
        dominant_type = max(set(types), key=types.count) if types else ""
        dominant_region = max(set(regions), key=regions.count) if regions else ""

        return Community(
            community_id=community_id,
            ghcids=ghcids,
            summary=result.summary,
            institution_count=len(ghcids),
            dominant_type=dominant_type,
            dominant_region=dominant_region,
            themes=result.themes
        )

    def index_summaries(self, communities: list[Community]) -> None:
        """Store community summaries in Qdrant for global search."""
        # (Re)create the collection
        self.qdrant.recreate_collection(
            collection_name=self.collection_name,
            vectors_config=models.VectorParams(
                size=384,  # MiniLM embedding size
                distance=models.Distance.COSINE
            )
        )

        # Index each community
        points = []
        for comm in communities:
            embedding = self._embed(comm.summary)
            points.append(models.PointStruct(
                # Stable point ID (built-in hash() is salted per process)
                id=str(uuid.uuid5(uuid.NAMESPACE_URL, comm.community_id)),
                vector=embedding,
                payload={
                    "community_id": comm.community_id,
                    "summary": comm.summary,
                    "ghcids": comm.ghcids,
                    "institution_count": comm.institution_count,
                    "dominant_type": comm.dominant_type,
                    "dominant_region": comm.dominant_region,
                    "themes": comm.themes
                }
            ))

        self.qdrant.upsert(
            collection_name=self.collection_name,
            points=points
        )
        logger.info(f"Indexed {len(points)} community summaries")

    def global_search(self, query: str, limit: int = 5) -> list[dict]:
        """Search community summaries for holistic questions."""
        embedding = self._embed(query)
        results = self.qdrant.search(
            collection_name=self.collection_name,
            query_vector=embedding,
            limit=limit
        )
        return [
            {
                "community_id": r.payload["community_id"],
                "summary": r.payload["summary"],
                "themes": r.payload["themes"],
                "institution_count": r.payload["institution_count"],
                "score": r.score
            }
            for r in results
        ]

    def build_and_index(self) -> int:
        """
        Full pipeline: build the graph, detect communities, generate
        summaries, index. Returns the number of communities indexed.
        """
        logger.info("Building institution graph...")
        graph = self.build_institution_graph()

        logger.info("Detecting communities...")
        community_map = self.detect_communities(graph)

        logger.info("Generating community summaries...")
        communities = []
        for comm_id, ghcids in community_map.items():
            if len(ghcids) >= 3:  # Only summarize communities with 3+ members
                comm = self.generate_community_summary(comm_id, ghcids)
                communities.append(comm)

        logger.info(f"Indexing {len(communities)} community summaries...")
        self.index_summaries(communities)

        return len(communities)
```

---

## Pattern D: Temporal Query Templates

### Rationale

From Zep: bitemporal modeling enables point-in-time queries and provenance tracking.

### Implementation

Add to `template_sparql.py`:

```python
# =============================================================================
# TEMPORAL QUERY TEMPLATES (Zep Pattern)
# =============================================================================

TEMPORAL_QUERY_TEMPLATES = {
    "point_in_time_state": TemplateDefinition(
        id="temporal_pit",
        name="Point-in-Time Institution State",
        description="Get institution state at a specific point in time",
        intent_patterns=["what was", "in [year]", "before", "after", "at that time"],
        sparql_template="""
            PREFIX hc: <...>  # hc IRI lost in source; substitute the project namespace
            PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
            PREFIX schema: <https://schema.org/>
            PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

            SELECT ?ghcid ?name ?type ?city ?validFrom ?validTo WHERE {
                ?s hc:ghcid ?ghcid ;
                   skos:prefLabel ?name ;
                   hc:institutionType ?type ;
                   hc:validFrom ?validFrom .
                OPTIONAL { ?s schema:addressLocality ?city }
                OPTIONAL { ?s hc:validTo ?validTo }

                # Temporal filter: valid at the query date
                FILTER(?validFrom <= "{{ query_date }}"^^xsd:date)
                FILTER(!BOUND(?validTo) || ?validTo > "{{ query_date }}"^^xsd:date)

                {% if ghcid_filter %}
                FILTER(STRSTARTS(?ghcid, "{{ ghcid_filter }}"))
                {% endif %}
            }
            ORDER BY ?ghcid
        """,
        slots=[
            SlotDefinition(type=SlotType.STRING, name="query_date", required=True),
            SlotDefinition(type=SlotType.STRING, name="ghcid_filter", required=False)
        ]
    ),

    "institution_history": TemplateDefinition(
        id="temporal_history",
        name="Institution Change History",
        description="Get the full history of changes for an institution",
        intent_patterns=["history of", "changes to", "evolution of", "timeline"],
        sparql_template="""
            PREFIX hc: <...>  # hc IRI lost in source; substitute the project namespace
            PREFIX skos: <http://www.w3.org/2004/02/skos/core#>

            SELECT ?ghcid ?name ?validFrom ?validTo ?changeType ?description WHERE {
                BIND("{{ ghcid }}" AS ?ghcid)  # bind so ?ghcid appears in the results
                ?entry hc:ghcid ?ghcid ;
                       skos:prefLabel ?name ;
                       hc:validFrom ?validFrom .
                OPTIONAL { ?entry hc:validTo ?validTo }
                OPTIONAL { ?entry hc:changeType ?changeType }
                OPTIONAL { ?entry hc:changeDescription ?description }
            }
            ORDER BY ?validFrom
        """,
        slots=[
            SlotDefinition(type=SlotType.STRING, name="ghcid", required=True)
        ]
    ),

    "institutions_founded_before": TemplateDefinition(
        id="temporal_founded_before",
        name="Institutions Founded Before Date",
        description="Find institutions founded before a specific date",
        intent_patterns=["founded before", "established before", "older than", "before [year]"],
        sparql_template="""
            PREFIX hc: <...>  # hc IRI lost in source; substitute the project namespace
            PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
            PREFIX schema: <https://schema.org/>
            PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

            SELECT ?ghcid ?name ?type ?city ?foundingDate WHERE {
                ?s hc:ghcid ?ghcid ;
                   skos:prefLabel ?name ;
                   hc:institutionType ?type ;
                   schema:foundingDate ?foundingDate .
                OPTIONAL { ?s schema:addressLocality ?city }

                FILTER(?foundingDate < "{{ cutoff_date }}"^^xsd:date)

                {% if institution_type %}
                FILTER(?type = "{{ institution_type }}")
                {% endif %}
            }
            ORDER BY ?foundingDate
            LIMIT {{ limit | default(50) }}
        """,
        slots=[
            SlotDefinition(type=SlotType.STRING, name="cutoff_date", required=True),
            SlotDefinition(type=SlotType.INSTITUTION_TYPE, name="institution_type", required=False),
            SlotDefinition(type=SlotType.INTEGER, name="limit", required=False, default="50")
        ]
    ),

    "merger_history": TemplateDefinition(
        id="temporal_mergers",
        name="Institution Merger History",
        description="Find institutions that merged or were absorbed",
        intent_patterns=["merged", "merger", "combined", "absorbed", "joined"],
        sparql_template="""
            PREFIX hc: <...>  # hc IRI lost in source; substitute the project namespace
            PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
            PREFIX crm: <http://www.cidoc-crm.org/cidoc-crm/>

            SELECT ?event ?eventDate ?description
                   ?sourceGhcid ?sourceName ?targetGhcid ?targetName
            WHERE {
                ?event a hc:MergerEvent ;
                       hc:eventDate ?eventDate ;
                       hc:description ?description .
                OPTIONAL {
                    ?event hc:sourceInstitution ?source .
                    ?source hc:ghcid ?sourceGhcid ;
                            skos:prefLabel ?sourceName .
                }
                OPTIONAL {
                    ?event hc:resultingInstitution ?target .
                    ?target hc:ghcid ?targetGhcid ;
                            skos:prefLabel ?targetName .
                }
                {% if region_filter %}
                FILTER(STRSTARTS(?sourceGhcid, "{{ region_filter }}") ||
                       STRSTARTS(?targetGhcid, "{{ region_filter }}"))
                {% endif %}
            }
            ORDER BY ?eventDate
        """,
        slots=[
            SlotDefinition(type=SlotType.STRING, name="region_filter", required=False)
        ]
    )
}
```

---

## Integration Checklist

### Immediate Actions

- [ ] Add `ArgumentVerifier` signature to `dspy_heritage_rag.py`
- [ ] Add `DualLevelEntityExtractor` signature
- [ ] Integrate verification into the retrieval pipeline
- [ ] Add temporal query templates to `template_sparql.py`

### Short-Term Actions

- [ ] Create `backend/rag/community_indexer.py`
- [ ] Add Leiden algorithm dependencies: `pip install leidenalg python-igraph`
- [ ] Create a Qdrant collection for community summaries
- [ ] Add a global search mode to the RAG pipeline

### Testing

```bash
# Test the verification layer
python -c "
from backend.rag.dspy_heritage_rag import ArgumentVerifier
import dspy

dspy.configure(lm=...)
verifier = dspy.ChainOfThought(ArgumentVerifier)
result = verifier(
    query='How many archives are in Haarlem?',
    context='Haarlem has several heritage institutions including archives.'
)
print(f'Can answer: {result.can_answer}')
print(f'Missing: {result.missing_info}')
"

# Test dual-level extraction
python -c "
from backend.rag.dspy_heritage_rag import DualLevelEntityExtractor
import dspy

dspy.configure(lm=...)
extractor = dspy.ChainOfThought(DualLevelEntityExtractor)
result = extractor(query='Which archives in Haarlem have digitized medieval manuscripts?')
print(f'Entities: {result.entities}')
print(f'Relations: {result.relations}')
print(f'Strategy: {result.search_strategy}')
"
```
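### Missing Helper: `_execute_sparql`

Both `DualLevelRetriever` and `CommunityIndexer` call a `self._execute_sparql(...)` helper that is never defined in this guide. A minimal sketch of what it could look like, assuming a standard SPARQL Protocol endpoint (Oxigraph exposes one) returning the SPARQL 1.1 JSON results format; the names `execute_sparql` and `flatten_bindings` are illustrative, not existing code:

```python
import json
import urllib.parse
import urllib.request


def flatten_bindings(payload: dict) -> list[dict]:
    """Flatten a SPARQL 1.1 JSON results payload into plain {var: value} dicts,
    which is the row shape the callers above consume (e.g. r["ghcid"])."""
    return [
        {var: cell["value"] for var, cell in row.items()}
        for row in payload.get("results", {}).get("bindings", [])
    ]


def execute_sparql(endpoint: str, query: str) -> list[dict]:
    """POST a query to a SPARQL endpoint and return the result rows."""
    request = urllib.request.Request(
        endpoint,
        data=urllib.parse.urlencode({"query": query}).encode(),
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/sparql-results+json",
        },
    )
    with urllib.request.urlopen(request) as response:
        return flatten_bindings(json.load(response))
```

Wrapped as a method, `self._execute_sparql(query)` would pass `self.oxigraph` as the endpoint (Oxigraph's server serves queries at the `/query` path by default).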