# Retrieval Patterns for Heritage Custodian RAG
Overview
This document describes hybrid retrieval patterns that combine vector similarity, knowledge graph traversal, and structured queries for the Heritage Custodian domain.
Retrieval Strategy Matrix
| Query Type | Primary Strategy | Secondary Strategy | Example Query |
|---|---|---|---|
| Factual | Vector + KG lookup | SPARQL (Wikidata) | "What is the ISIL code for Rijksmuseum?" |
| Exploratory | Vector (broad) | KG traversal | "What museums exist in Limburg?" |
| Relationship | KG traversal | Vector context | "Which institutions are members of NDE?" |
| Comparative | Multi-vector | KG aggregation | "Compare Dutch and Belgian archive systems" |
| Temporal | KG (events) | Vector context | "How has Noord-Hollands Archief changed?" |
| Collection | Vector (semantic) | KG links | "Find archives with WWII collections" |
1. Vector Retrieval Patterns
Basic Semantic Search
class SemanticRetriever:
    """Basic vector similarity retrieval."""

    def __init__(self, vector_store):
        # vector_store: client exposing query(query_embedding, n_results, where).
        self.vector_store = vector_store
        self.embedder = HeritageEmbedder()

    def retrieve(
        self,
        query: str,
        k: int = 10,
        type_filter: Optional[str] = None,
        country_filter: Optional[str] = None,
    ) -> List[RetrievalResult]:
        """Return the top-*k* chunks semantically closest to *query*.

        Args:
            query: Free-text search query.
            k: Maximum number of results.
            type_filter: Optional custodian-type code; also passed to the
                embedder so it can produce a type-aware query embedding.
            country_filter: Optional country-code metadata filter.

        Returns:
            RetrievalResults whose ``score`` is the raw vector store
            *distance* (lower means more similar).
        """
        # Embed the query (type-aware when a type filter is supplied).
        query_embedding = self.embedder.embed_query(query, type_filter)
        # Build the metadata filter only from supplied constraints.
        filters = {}
        if type_filter:
            filters["custodian_type"] = type_filter
        if country_filter:
            filters["country_code"] = country_filter
        # Pass None instead of an empty dict so the store skips filtering.
        results = self.vector_store.query(
            query_embedding=query_embedding,
            n_results=k,
            where=filters if filters else None,
        )
        return [
            RetrievalResult(
                text=r["document"],
                metadata=r["metadata"],
                score=r["distance"],
            )
            for r in results
        ]
Filtered Retrieval by Type
class TypeFilteredRetriever:
    """Retrieval with GLAMORCUBESFIXPHDNT type filtering."""

    # Named groups of single-letter custodian type codes.
    TYPE_GROUPS = {
        "CULTURAL": ["G", "M", "A", "L"],  # Core GLAM
        "HERITAGE_SOCIETIES": ["S", "I", "N"],  # Community organizations
        "INSTITUTIONAL": ["O", "R", "E"],  # Government/academic
        "SPECIALIZED": ["B", "H", "T", "F"],  # Domain-specific
        "DIGITAL": ["D"],  # Digital platforms
        "PRIVATE": ["C", "P"],  # Commercial/personal
    }

    def retrieve_by_group(
        self,
        query: str,
        type_group: str,
        k: int = 10,
    ) -> List[RetrievalResult]:
        """Run one filtered search per type code in *type_group*, merge the
        hits, and return the *k* best overall.

        An unknown group name maps to no type codes and yields an empty list.
        """
        type_codes = self.TYPE_GROUPS.get(type_group, [])
        merged = []
        for code in type_codes:
            # Give each type a proportional share of k, rounded up.
            share = k // len(type_codes) + 1
            merged += self.semantic_retriever.retrieve(
                query=query,
                k=share,
                type_filter=code,
            )
        # Lower score = smaller vector distance = better match.
        return sorted(merged, key=lambda hit: hit.score)[:k]
Multi-Aspect Retrieval
Retrieve across different ontology aspects (Name, Place, Collection, etc.):
class MultiAspectRetriever:
    """Retrieve from multiple ontology aspect collections."""

    # Maps aspect name -> vector-store collection holding that aspect's chunks.
    ASPECT_COLLECTIONS = {
        "custodian": "custodian_chunks",
        "collection": "collection_chunks",
        "place": "place_chunks",
        "platform": "platform_chunks",
        "project": "project_chunks",
    }

    def retrieve(
        self,
        query: str,
        aspects: Optional[List[str]] = None,
        k_per_aspect: int = 5,
    ) -> Dict[str, List[RetrievalResult]]:
        """Search one collection per requested aspect.

        Args:
            query: Free-text search query; embedded exactly once.
            aspects: Aspect names to search; defaults to all known aspects.
                Unknown aspect names are silently skipped.
            k_per_aspect: Result count requested from each collection.

        Returns:
            Mapping of aspect name -> that collection's query results.
            NOTE(review): values are the raw vector-store return value, not
            wrapped in RetrievalResult — confirm against the store's API.
        """
        aspects = aspects or list(self.ASPECT_COLLECTIONS.keys())
        # Hoisted out of the loop: the embedding does not depend on the aspect.
        query_embedding = self.embedder.embed_query(query)
        results = {}
        for aspect in aspects:
            collection = self.ASPECT_COLLECTIONS.get(aspect)
            if collection:
                results[aspect] = self.vector_store.query(
                    collection=collection,
                    query_embedding=query_embedding,
                    n_results=k_per_aspect,
                )
        return results
2. Knowledge Graph Retrieval Patterns
Entity Lookup
class KGEntityRetriever:
    """Retrieve entities from TypeDB knowledge graph.

    NOTE(review): relies on _parse_custodian, defined elsewhere in the project.
    """

    def __init__(self, typedb_client):
        # typedb_client must expose query(typeql: str) -> list of answer maps.
        self.client = typedb_client

    @staticmethod
    def _escape(value: str) -> str:
        """Escape backslashes and double quotes so *value* can be embedded in
        a double-quoted TypeQL string literal without breaking (or injecting
        into) the query text."""
        return value.replace("\\", "\\\\").replace('"', '\\"')

    def get_by_identifier(self, scheme: str, value: str) -> Optional[CustodianEntity]:
        """Lookup entity by identifier (ISIL, Wikidata, etc.).

        Returns the first matching custodian, or None when nothing matches.
        """
        # Escape user-supplied strings before interpolating into the query.
        scheme = self._escape(scheme)
        value = self._escape(value)
        query = f"""
        match
        $c isa custodian;
        $i isa identifier, has scheme "{scheme}", has value "{value}";
        (custodian: $c, identifier: $i) isa has-identifier;
        get $c;
        """
        results = self.client.query(query)
        if results:
            return self._parse_custodian(results[0])
        return None

    def get_by_ghcid(self, ghcid: str) -> Optional[CustodianEntity]:
        """Lookup entity by GHCID, or None when it does not exist."""
        query = f"""
        match
        $c isa custodian, has ghcid "{self._escape(ghcid)}";
        get $c;
        """
        results = self.client.query(query)
        if results:
            return self._parse_custodian(results[0])
        return None
Relationship Traversal
class KGRelationshipRetriever:
    """Traverse relationships in knowledge graph.

    NOTE(review): relies on _execute_and_parse, defined elsewhere in the project.
    """

    @staticmethod
    def _escape(value: str) -> str:
        """Escape backslashes and double quotes so *value* can sit inside a
        double-quoted TypeQL string literal without breaking the query."""
        return value.replace("\\", "\\\\").replace('"', '\\"')

    def get_members_of(self, body_name: str) -> List[CustodianEntity]:
        """Get all members of an encompassing body, looked up by its name."""
        query = f"""
        match
        $body isa encompassing-body, has name "{self._escape(body_name)}";
        $c isa custodian;
        (member: $c, body: $body) isa member-of;
        get $c;
        """
        return self._execute_and_parse(query)

    def get_collections_of(self, custodian_ghcid: str) -> List[Collection]:
        """Get all collections managed by a custodian."""
        query = f"""
        match
        $c isa custodian, has ghcid "{self._escape(custodian_ghcid)}";
        $coll isa collection;
        (custodian: $c, collection: $coll) isa manages-collection;
        get $coll;
        """
        return self._execute_and_parse(query)

    def get_related_projects(self, custodian_ghcid: str) -> List[Project]:
        """Get projects a custodian participated in."""
        query = f"""
        match
        $c isa custodian, has ghcid "{self._escape(custodian_ghcid)}";
        $p isa project;
        (participant: $c, project: $p) isa participated-in-project;
        get $p;
        """
        return self._execute_and_parse(query)

    def get_change_events(self, custodian_ghcid: str) -> List[ChangeEvent]:
        """Get organizational change events for a custodian, oldest first."""
        query = f"""
        match
        $c isa custodian, has ghcid "{self._escape(custodian_ghcid)}";
        $e isa change-event;
        (affected: $c, event: $e) isa affected-by-event;
        get $e;
        order by $e.event-date asc;
        """
        return self._execute_and_parse(query)
Graph-Based Exploration
class KGExplorationRetriever:
    """Explore knowledge graph neighborhoods."""

    @staticmethod
    def _escape(value: str) -> str:
        """Escape backslashes and double quotes for safe embedding in a
        double-quoted TypeQL string literal."""
        return value.replace("\\", "\\\\").replace('"', '\\"')

    def get_neighborhood(
        self,
        entity_ghcid: str,
        depth: int = 2,
        relationship_types: Optional[List[str]] = None,
    ) -> "nx.Graph":
        """Get the neighborhood around an entity as an undirected graph.

        Fixed: the return annotation previously referenced ``NetworkX.Graph``,
        an undefined name (the module is used as ``nx``).

        NOTE(review): despite the *depth* parameter, the query below only
        matches direct relations, so the graph is effectively 1-hop — confirm
        whether multi-hop expansion was intended.
        """
        rel_filter = ""
        if relationship_types:
            # Relationship types are TypeQL type labels, not string literals,
            # so they are interpolated unquoted.
            rel_filter = f"$r type in [{', '.join(relationship_types)}];"
        query = f"""
        match
        $c isa custodian, has ghcid "{self._escape(entity_ghcid)}";
        $c2 isa custodian;
        $r ($c, $c2) isa relation;
        {rel_filter}
        get $c, $c2, $r;
        """
        # Build an undirected graph keyed by GHCID, edge-labelled by relation type.
        G = nx.Graph()
        for result in self.client.query(query):
            G.add_edge(
                result["c"]["ghcid"],
                result["c2"]["ghcid"],
                relationship=result["r"]["type"],
            )
        return G

    def find_path(
        self,
        source_ghcid: str,
        target_ghcid: str,
        max_depth: int = 4,
    ) -> Optional[List[str]]:
        """Find shortest path between two entities, or None if unreachable."""
        G = self.get_neighborhood(source_ghcid, depth=max_depth)
        try:
            return nx.shortest_path(G, source_ghcid, target_ghcid)
        except (nx.NetworkXNoPath, nx.NodeNotFound):
            # NodeNotFound: either endpoint absent from the neighborhood graph —
            # previously this escaped as an unhandled exception.
            return None
3. SPARQL Federation Patterns
Wikidata Enrichment
class WikidataRetriever:
    """Retrieve from Wikidata via SPARQL.

    NOTE(review): relies on _execute_sparql, defined elsewhere in the project.
    """

    @staticmethod
    def _validate_qid(value: str) -> str:
        """Return *value* unchanged if it is a well-formed Wikidata
        Q-identifier (``Q`` followed by digits); raise ValueError otherwise.

        Guards the f-string interpolations below against SPARQL injection.
        """
        if not (isinstance(value, str) and value.startswith("Q") and value[1:].isdigit()):
            raise ValueError(f"Not a valid Wikidata QID: {value!r}")
        return value

    def enrich_custodian(self, wikidata_id: str) -> dict:
        """Get additional identifier/location data for one entity.

        Raises:
            ValueError: if *wikidata_id* is not a well-formed QID.
        """
        qid = self._validate_qid(wikidata_id)
        query = f"""
        SELECT ?instanceOf ?country ?coords ?website ?viaf ?isni WHERE {{
          wd:{qid} wdt:P31 ?instanceOf .
          OPTIONAL {{ wd:{qid} wdt:P17 ?country }}
          OPTIONAL {{ wd:{qid} wdt:P625 ?coords }}
          OPTIONAL {{ wd:{qid} wdt:P856 ?website }}
          OPTIONAL {{ wd:{qid} wdt:P214 ?viaf }}
          OPTIONAL {{ wd:{qid} wdt:P213 ?isni }}
        }}
        """
        return self._execute_sparql(query)

    def find_similar_institutions(
        self,
        instance_of: str,  # e.g., Q33506 (museum)
        country: str,  # e.g., Q55 (Netherlands)
        limit: int = 50,
    ) -> List[dict]:
        """Find institutions of the same class in the same country.

        Raises:
            ValueError: if *instance_of* or *country* is not a QID.
        """
        instance_qid = self._validate_qid(instance_of)
        country_qid = self._validate_qid(country)
        query = f"""
        SELECT ?item ?itemLabel ?coords WHERE {{
          ?item wdt:P31 wd:{instance_qid} ;
                wdt:P17 wd:{country_qid} .
          OPTIONAL {{ ?item wdt:P625 ?coords }}
          SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en,nl" }}
        }}
        LIMIT {int(limit)}
        """
        return self._execute_sparql(query)
Cross-Source Linking
class CrossSourceLinker:
    """Link entities across sources using identifiers."""

    @staticmethod
    def _escape_sparql_literal(value: str) -> str:
        """Escape backslashes and double quotes so *value* can be embedded in
        a double-quoted SPARQL string literal without breaking the query."""
        return value.replace("\\", "\\\\").replace('"', '\\"')

    def link_by_isil(self, isil_code: str) -> dict:
        """Link entity across all sources by ISIL.

        Returns a dict with the local KG entity, the Wikidata QID (if any),
        and the top vector chunks mentioning the code.
        """
        results = {
            "local_kg": self.kg_retriever.get_by_identifier("ISIL", isil_code),
            "wikidata": self._search_wikidata_by_isil(isil_code),
            "vector_chunks": self.vector_retriever.retrieve(
                query=f"ISIL {isil_code}",
                k=5,
            ),
        }
        return results

    def _search_wikidata_by_isil(self, isil_code: str) -> Optional[str]:
        """Find Wikidata QID by ISIL code (wdt:P791), or None if not found."""
        # Escape before interpolating into the quoted SPARQL literal.
        isil_literal = self._escape_sparql_literal(isil_code)
        query = f"""
        SELECT ?item WHERE {{
          ?item wdt:P791 "{isil_literal}" .
        }}
        """
        results = wikidata_execute_sparql(query)
        if results:
            # The bound value is a full entity URI; keep only the QID tail.
            return results[0]["item"]["value"].split("/")[-1]
        return None
4. Hybrid Retrieval Patterns
Query-Adaptive Retrieval
class AdaptiveRetriever:
    """Select retrieval strategy based on query intent.

    NOTE(review): the KG methods used below (search_by_name, get_neighborhood,
    get_change_events) are not all defined on KGEntityRetriever as shown
    elsewhere in this document — verify the concrete retriever's API.
    """

    def __init__(self):
        self.query_router = dspy.ChainOfThought(QueryRouter)
        self.semantic_retriever = SemanticRetriever()
        self.kg_retriever = KGEntityRetriever()
        self.sparql_retriever = WikidataRetriever()

    def retrieve(self, query: str, k: int = 10) -> List[RetrievalResult]:
        """Classify *query* intent and route to the matching strategy.

        Fixed: exploratory queries with no detected custodian types, and
        unrecognized intents (e.g. "comparative"), previously returned an
        empty list; both now fall back to a plain semantic search.
        """
        # Classify query intent via the DSPy router.
        intent = self.query_router(query=query).intent
        results = []
        if intent.intent_type == "factual":
            # KG lookup for any mentioned entities, then vector augmentation.
            if intent.entity_mentions:
                for entity in intent.entity_mentions:
                    kg_result = self.kg_retriever.search_by_name(entity)
                    if kg_result:
                        results.append(kg_result)
            results.extend(self.semantic_retriever.retrieve(query, k=k))
        elif intent.intent_type == "exploratory":
            custodian_types = intent.custodian_types or []
            if custodian_types:
                # Give each type a share of k, but never request 0 results.
                per_type_k = max(1, k // len(custodian_types))
                for ctype in custodian_types:
                    results.extend(self.semantic_retriever.retrieve(
                        query=query,
                        k=per_type_k,
                        type_filter=ctype,
                        country_filter=intent.geographic_scope,
                    ))
            else:
                # No type hints detected: single broad semantic search.
                results.extend(self.semantic_retriever.retrieve(query, k=k))
        elif intent.intent_type == "relationship":
            # KG traversal primary, vector search for supporting context.
            for entity in intent.entity_mentions:
                results.extend(self.kg_retriever.get_neighborhood(entity))
            results.extend(self.semantic_retriever.retrieve(query, k=k // 2))
        elif intent.intent_type == "temporal":
            # Change events from the KG plus vector context.
            for entity in intent.entity_mentions:
                results.extend(self.kg_retriever.get_change_events(entity))
            results.extend(self.semantic_retriever.retrieve(query, k=k))
        else:
            # Unrecognized intent: fall back rather than return nothing.
            results.extend(self.semantic_retriever.retrieve(query, k=k))
        return self._dedupe_and_rank(results)[:k]
Reciprocal Rank Fusion
class RRFRetriever:
    """Fuse rankings from several retrievers with Reciprocal Rank Fusion."""

    def __init__(self, k: int = 60):
        # Larger k dampens the influence of rank position differences.
        self.k = k  # RRF parameter
        self.retrievers = {
            "semantic": SemanticRetriever(),
            "kg": KGEntityRetriever(),
            "sparse": SparseRetriever(),  # BM25 or similar
        }

    def retrieve(self, query: str, n: int = 10) -> List[RetrievalResult]:
        """Query every retriever, pool the hits by document identity, score
        each document by summed reciprocal ranks, and return the top *n*."""
        pooled = {}
        for name, retriever in self.retrievers.items():
            ranked = retriever.retrieve(query, k=n * 2)
            for position, hit in enumerate(ranked):
                # Prefer the stable GHCID; fall back to a text hash.
                doc_key = hit.metadata.get("ghcid") or hash(hit.text)
                entry = pooled.setdefault(doc_key, {"result": hit, "scores": {}})
                entry["scores"][name] = position
        # RRF: each retriever contributes 1 / (k + rank) per document.
        for entry in pooled.values():
            entry["rrf_score"] = sum(
                1.0 / (self.k + position)
                for position in entry["scores"].values()
            )
        ordered = sorted(
            pooled.values(),
            key=lambda entry: entry["rrf_score"],
            reverse=True,
        )
        return [entry["result"] for entry in ordered[:n]]
5. Context Aggregation
Multi-Hop Context Building
class ContextBuilder:
    """Build rich context from multiple retrieval results."""

    def build_context(
        self,
        query: str,
        primary_results: List[RetrievalResult],
        max_tokens: int = 4000,
    ) -> str:
        """Assemble an LLM context string: source/tier-attributed chunks from
        *primary_results* up to a rough token budget, then relationship
        context when budget remains.

        NOTE(review): *query* is currently unused in this method.
        """
        context_parts = []
        token_count = 0
        # Add primary results
        for result in primary_results:
            text = result.text
            tokens = len(text.split()) * 1.3  # Rough token estimate
            # Stop at the first chunk that would overflow the budget.
            if token_count + tokens > max_tokens:
                break
            # Format with source attribution
            source = result.metadata.get("source_type", "unknown")
            tier = result.metadata.get("data_tier", "TIER_4")
            formatted = f"[Source: {source}, Tier: {tier}]\n{text}\n"
            context_parts.append(formatted)
            token_count += tokens
        # Add relationship context if space
        # NOTE(review): the relationship text itself is appended without being
        # counted against max_tokens, so the final context can exceed the budget.
        if token_count < max_tokens * 0.8:
            rel_context = self._get_relationship_context(primary_results)
            context_parts.append(f"\n[Relationships]\n{rel_context}")
        return "\n---\n".join(context_parts)

    def _get_relationship_context(self, results: List[RetrievalResult]) -> str:
        """Extract and format relationship information for each result that
        carries a GHCID in its metadata."""
        relationships = []
        for result in results:
            ghcid = result.metadata.get("ghcid")
            if ghcid:
                # Get relationships from KG
                # NOTE(review): get_members_of_body is not defined on the KG
                # retrievers shown elsewhere (they expose get_members_of, keyed
                # by body *name*, not GHCID) — verify this API exists.
                members = self.kg_retriever.get_members_of_body(ghcid)
                projects = self.kg_retriever.get_related_projects(ghcid)
                if members:
                    relationships.append(f"Members of {ghcid}: {', '.join(m.name for m in members)}")
                if projects:
                    relationships.append(f"Projects: {', '.join(p.name for p in projects)}")
        return "\n".join(relationships)
Performance Optimization
Caching Strategy
class CachedRetriever:
    """Retriever with multi-level caching."""

    def __init__(self):
        # Three tiers: exact query+args results, TTL-bounded entity lookups,
        # and embeddings persisted to disk.
        self.query_cache = LRUCache(maxsize=1000)  # Query → results
        self.entity_cache = TTLCache(maxsize=5000, ttl=3600)  # Entity lookups
        self.embedding_cache = DiskCache("embeddings/")  # Persistent embeddings

    def retrieve(self, query: str, **kwargs) -> List[RetrievalResult]:
        """Serve from the query cache when possible; otherwise embed the query
        (through its own cache) and run the underlying retrieval."""
        cache_key = f"{query}:{json.dumps(kwargs, sort_keys=True)}"
        try:
            # Fast path: identical query/arguments seen before.
            return self.query_cache[cache_key]
        except KeyError:
            pass
        try:
            embedding = self.embedding_cache[query]
        except KeyError:
            embedding = self.embedder.embed_query(query)
            self.embedding_cache[query] = embedding
        hits = self._do_retrieve(embedding, **kwargs)
        self.query_cache[cache_key] = hits
        return hits
Batch Processing
class BatchRetriever:
    """Efficient batch retrieval for multiple queries."""

    def batch_retrieve(
        self,
        queries: List[str],
        k: int = 10,
    ) -> Dict[str, List[RetrievalResult]]:
        """Embed all queries in one call, search in one call, and map each
        input query to its own result list (position-aligned)."""
        # Single embedding round trip for the whole batch.
        batch_embeddings = self.embedder.batch_encode(queries)
        # Single vector-store round trip for the whole batch.
        batch_hits = self.vector_store.batch_query(
            query_embeddings=batch_embeddings,
            n_results=k,
        )
        return {q: batch_hits[idx] for idx, q in enumerate(queries)}