glam/backend/rag/hybrid_retriever.py
kempersc 80eb3d969c Add new slots for heritage custodian ontology
- Introduced `has_api_version`, `has_appellation_language`, `has_appellation_type`, `has_appellation_value`, `has_applicable_country`, `has_application_deadline`, `has_application_opening_date`, `has_appraisal_note`, `has_approval_date`, `has_archdiocese_name`, `has_architectural_style`, `has_archival_reference`, `has_archive_description`, `has_archive_memento_uri`, `has_archive_name`, `has_archive_path`, `has_archive_search_score`, `has_arrangement`, `has_arrangement_level`, `has_arrangement_note`, `has_articles_archival_stage`, `has_articles_document_format`, `has_articles_document_url`, `has_articles_of_association`, `has_or_had_altitude`, `has_or_had_annotation`, `has_or_had_arrangement`, `has_or_had_document`, `has_or_had_reason`, `has_or_had_style`, `is_or_was_amended_through`, `is_or_was_approved_on`, `is_or_was_archived_as`, `is_or_was_due_on`, `is_or_was_opened_on`, and `is_or_was_used_in` slots.
- Each slot includes detailed descriptions, range specifications, and appropriate mappings to existing ontologies.
2026-01-27 10:07:16 +01:00

2534 lines
100 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Hybrid Retriever: Vector Search + Knowledge Graph Expansion
Combines Qdrant vector similarity search with Oxigraph SPARQL graph expansion
to provide semantically-aware and structurally-enriched retrieval.
Architecture:
1. Vector Search (Qdrant) - Find semantically similar institutions AND persons
2. Graph Expansion (Oxigraph) - Expand via relationships:
- Same city/region
- Same institution type
- Related collections
- Organizational relationships
3. Re-ranking - Combine scores for final ranking
4. Query Routing - Detect if query is about institutions or persons
Collections:
- heritage_custodians: Institution data (27K+ records)
- heritage_persons: Staff/person data (10K+ records)
Example usage:
retriever = HybridRetriever(
qdrant_host="localhost",
qdrant_port=6333,
sparql_endpoint="http://localhost:7878/query"
)
# Institution search
results = retriever.search("museums with Dutch colonial history")
# Person search (auto-detected or explicit)
results = retriever.search("Who works at the Nationaal Archief?")
results = retriever.search_persons("archivist at Rijksmuseum")
"""
import hashlib
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from typing import Any, TYPE_CHECKING
import httpx
# Polygon filter for geographic containment testing (Dutch provinces)
from glam_extractor.geocoding.polygon_filter import (
get_polygon_filter,
ProvincePolygonFilter,
)
if TYPE_CHECKING:
from qdrant_client import QdrantClient
from openai import OpenAI
from sentence_transformers import SentenceTransformer
# Forward reference as string to avoid circular imports
MultiEmbeddingRetriever = Any # Actually from glam_extractor.api.multi_embedding_retriever
EmbeddingModel = Any # Actually from glam_extractor.api.multi_embedding_retriever
# Module-level logger; handlers/levels are configured by the application.
logger = logging.getLogger(__name__)

# SPARQL endpoint configuration
# Endpoint defaults to a local Oxigraph instance; override via SPARQL_ENDPOINT env var.
DEFAULT_SPARQL_ENDPOINT = os.getenv("SPARQL_ENDPOINT", "http://localhost:7878/query")
DEFAULT_SPARQL_TIMEOUT = 30.0

# Ontology prefixes used in Oxigraph
# Prepended verbatim to every query sent through SPARQLClient.query().
SPARQL_PREFIXES = """
PREFIX hc: <https://nde.nl/ontology/hc/>
PREFIX hcc: <https://nde.nl/ontology/hc/class/>
PREFIX ghc: <https://w3id.org/heritage/custodian/>
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
PREFIX wdt: <http://www.wikidata.org/prop/direct/>
PREFIX wd: <http://www.wikidata.org/entity/>
PREFIX schema: <http://schema.org/>
PREFIX geo: <http://www.w3.org/2003/01/geo/wgs84_pos#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
"""
@dataclass
class RetrievedInstitution:
"""A retrieved heritage institution with combined scores."""
ghcid: str
name: str
uri: str
vector_score: float = 0.0
graph_score: float = 0.0
combined_score: float = 0.0
# Metadata from vector search
institution_type: str | None = None
country: str | None = None
city: str | None = None
description: str | None = None
# Geographic coordinates
latitude: float | None = None
longitude: float | None = None
# Graph expansion data
related_institutions: list[str] = field(default_factory=list)
expansion_reason: str | None = None # e.g., "same_city", "same_type", "related_collection"
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for API responses."""
return {
"ghcid": self.ghcid,
"name": self.name,
"uri": self.uri,
"scores": {
"vector": round(self.vector_score, 4),
"graph": round(self.graph_score, 4),
"combined": round(self.combined_score, 4),
},
"metadata": {
"institution_type": self.institution_type,
"country": self.country,
"city": self.city,
"description": self.description,
"latitude": self.latitude,
"longitude": self.longitude,
},
"graph_expansion": {
"related_institutions": self.related_institutions,
"expansion_reason": self.expansion_reason,
}
}
# ===================================================================
# Linked Data URI Generation Utilities
# ===================================================================
# Generate stable ontology-aligned URIs for Person and PersonObservation
# following the LinkML schema at schemas/20251121/linkml/
# Namespace: https://nde.nl/ontology/hc/
# ===================================================================
# NOTE(review): mid-file imports — kept here next to the slug helpers
# that use them; consider moving to the top-of-file import block.
import re
import unicodedata

# Ontology namespaces — URI bases consumed by the generate_*_uri helpers below.
ONTOLOGY_BASE = "https://nde.nl/ontology/hc"
PERSON_HUB_PREFIX = f"{ONTOLOGY_BASE}/person"
PERSON_OBS_PREFIX = f"{ONTOLOGY_BASE}/person-obs"
CUSTODIAN_PREFIX = f"{ONTOLOGY_BASE}/custodian"

# JSON-LD context for person search responses
# Maps the short keys emitted by RetrievedPerson.to_dict() onto
# schema.org / PiCo / PROV / FOAF vocabulary terms.
PERSON_JSONLD_CONTEXT = {
    "@vocab": f"{ONTOLOGY_BASE}/",
    "schema": "http://schema.org/",
    "pico": "https://personsincontext.org/model#",
    "prov": "http://www.w3.org/ns/prov#",
    "foaf": "http://xmlns.com/foaf/0.1/",
    "name": "schema:name",
    "jobTitle": "schema:jobTitle",
    "affiliation": "schema:affiliation",
    "sameAs": "schema:sameAs",
    "refers_to_person": "pico:observationOf",
    "observation_source": "prov:hadPrimarySource",
}
def generate_slug(text: str) -> str:
    """Build a URL-safe slug from *text*.

    Diacritics are removed via NFD decomposition, the remainder is
    lowercased, and every run of non-alphanumeric characters collapses
    to a single hyphen. Returns "unknown" when nothing usable remains.

    Examples:
        "Kitty Bogte" -> "kitty-bogte"
        "Dr. Jane Smith" -> "dr-jane-smith"
        "Taco Dibbits" -> "taco-dibbits"
    """
    if not text:
        return "unknown"
    # Decompose accented characters and drop the combining marks (category Mn).
    decomposed = unicodedata.normalize('NFD', text)
    without_marks = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
    # Lowercase, then fold each run of non [a-z0-9] characters into one hyphen;
    # a single pass suffices because the regex matches whole runs (including '-').
    hyphenated = re.sub(r'[^a-z0-9]+', '-', without_marks.lower())
    cleaned = hyphenated.strip('-')
    return cleaned or "unknown"
def generate_role_slug(headline: str | None) -> str:
"""Generate role slug from job title/headline.
Examples:
"Programmer/curator""programmer-curator"
"Senior Archivist""senior-archivist"
None → "staff"
"""
if not headline:
return "staff"
return generate_slug(headline)
def generate_person_hub_uri(name: str, linkedin_slug: str | None = None) -> str:
    """Return the Person hub URI (abstract identity).

    Format: https://nde.nl/ontology/hc/person/{person-slug}

    Prefers the LinkedIn slug when present (stable across name changes);
    otherwise the slug is derived from the display name.

    Examples:
        generate_person_hub_uri("Kitty Bogte", "kittybogte")
            -> "https://nde.nl/ontology/hc/person/kittybogte"
        generate_person_hub_uri("Dr. Jane Smith")
            -> "https://nde.nl/ontology/hc/person/dr-jane-smith"
    """
    # Empty-string slugs are treated the same as None (fall back to the name).
    person_slug = linkedin_slug or generate_slug(name)
    return f"{PERSON_HUB_PREFIX}/{person_slug}"
def generate_observation_uri(
    custodian_slug: str | None,
    person_name: str,
    role_slug: str | None = None,
    linkedin_slug: str | None = None
) -> str:
    """Return a PersonObservation URI.

    Format: https://nde.nl/ontology/hc/person-obs/{custodian-slug}/{person-slug}/{role-slug}

    Each path segment falls back to a placeholder when its source value
    is missing ("unknown-custodian", name-derived slug, "staff").

    Examples:
        generate_observation_uri("nl-ga-nationaal-archief", "Kitty Bogte", "programmer-curator")
            -> "https://nde.nl/ontology/hc/person-obs/nl-ga-nationaal-archief/kitty-bogte/programmer-curator"
    """
    segments = (
        custodian_slug or "unknown-custodian",
        linkedin_slug or generate_slug(person_name),
        role_slug or "staff",
    )
    return f"{PERSON_OBS_PREFIX}/" + "/".join(segments)
def generate_custodian_uri(custodian_slug: str | None, ghcid: str | None = None) -> str | None:
"""Generate Custodian URI.
Format: https://nde.nl/ontology/hc/custodian/{ghcid-or-slug}
"""
if ghcid:
return f"{CUSTODIAN_PREFIX}/{ghcid}"
elif custodian_slug:
return f"{CUSTODIAN_PREFIX}/{custodian_slug}"
return None
def extract_linkedin_slug(linkedin_url: str | None) -> str | None:
"""Extract slug from LinkedIn URL.
Examples:
"https://www.linkedin.com/in/kittybogte""kittybogte"
"https://linkedin.com/in/jane-smith-12345""jane-smith-12345"
"""
if not linkedin_url:
return None
match = re.search(r'linkedin\.com/in/([^/?]+)', linkedin_url)
return match.group(1) if match else None
@dataclass
class RetrievedPerson:
    """A retrieved person/staff member with search scores and linked data URIs.

    Fields are populated from the Qdrant vector-search payload; the
    ``*_uri`` properties derive stable linked-data identifiers from them
    via the module-level URI helpers.
    """
    person_id: str
    name: str
    vector_score: float = 0.0
    combined_score: float = 0.0
    richness_score: float = 0.0  # Metadata richness score (0-1)
    # Metadata from vector search
    headline: str | None = None  # Job title/role
    custodian_name: str | None = None  # Organization they work at
    custodian_slug: str | None = None
    location: str | None = None
    heritage_relevant: bool = False
    heritage_type: str | None = None  # GLAMORCUBESFIXPHDNT code
    source_type: str | None = None  # "staff_list" or "entity_profile"
    linkedin_url: str | None = None
    has_wcms: bool = False  # WCMS-registered profile (heritage sector user)
    # WCMS-specific fields for display on review page
    wcms_user_id: str | None = None
    wcms_abs_id: str | None = None  # NAN identifier
    wcms_crm_id: str | None = None
    wcms_username: str | None = None
    wcms_username_url: str | None = None
    wcms_status: str | None = None  # "Active" or "Blocked"
    wcms_roles: list[str] | None = None
    wcms_registered_since: str | None = None
    wcms_last_access: str | None = None
    # Contact details
    email: str | None = None
    email_domain: str | None = None
    # Linked data fields (generated)
    linkedin_profile_path: str | None = None  # Path to entity JSON file

    @property
    def linkedin_slug(self) -> str | None:
        """Extract LinkedIn slug from URL (None when no URL is set)."""
        return extract_linkedin_slug(self.linkedin_url)

    @property
    def person_hub_uri(self) -> str:
        """Generate Person hub URI (abstract identity).

        Prefers the LinkedIn slug for stability, falling back to a
        name-derived slug.
        """
        return generate_person_hub_uri(self.name, self.linkedin_slug)

    @property
    def observation_uri(self) -> str:
        """Generate PersonObservation URI (custodian/person/role path)."""
        # The role path segment comes from the headline; "staff" when absent.
        role_slug = generate_role_slug(self.headline)
        return generate_observation_uri(
            self.custodian_slug,
            self.name,
            role_slug,
            self.linkedin_slug
        )

    @property
    def custodian_uri(self) -> str | None:
        """Generate Custodian URI (None when no custodian slug is known)."""
        return generate_custodian_uri(self.custodian_slug)

    def to_dict(self, include_jsonld: bool = True) -> dict[str, Any]:
        """Convert to dictionary for API responses.

        Args:
            include_jsonld: If True, include JSON-LD linked data fields (@id, @type, etc.)

        Returns:
            Plain dict with "person_id", "name", nested "scores" (rounded
            to 4 decimals) and "metadata"; JSON-LD keys are added at the
            top level when requested.
        """
        result = {
            "person_id": self.person_id,
            "name": self.name,
            "scores": {
                "vector": round(self.vector_score, 4),
                "combined": round(self.combined_score, 4),
                "richness": round(self.richness_score, 4),
            },
            "metadata": {
                "headline": self.headline,
                "custodian_name": self.custodian_name,
                "custodian_slug": self.custodian_slug,
                "location": self.location,
                "heritage_relevant": self.heritage_relevant,
                "heritage_type": self.heritage_type,
                "source_type": self.source_type,
                "linkedin_url": self.linkedin_url,
                "has_wcms": self.has_wcms,
                # WCMS fields for review page
                "wcms_user_id": self.wcms_user_id,
                "wcms_abs_id": self.wcms_abs_id,
                "wcms_crm_id": self.wcms_crm_id,
                "wcms_username": self.wcms_username,
                "wcms_username_url": self.wcms_username_url,
                "wcms_status": self.wcms_status,
                "wcms_roles": self.wcms_roles,
                "wcms_registered_since": self.wcms_registered_since,
                "wcms_last_access": self.wcms_last_access,
                # Contact details
                "email": self.email,
                "email_domain": self.email_domain,
            }
        }
        if include_jsonld:
            # Add JSON-LD linked data fields (terms defined in PERSON_JSONLD_CONTEXT)
            result["@id"] = self.observation_uri
            result["@type"] = "pico:PersonObservation"
            result["refers_to_person"] = self.person_hub_uri
            # Add custodian affiliation if available
            if self.custodian_uri:
                result["unit_affiliation"] = self.custodian_uri
            # Add schema:sameAs for LinkedIn URL
            if self.linkedin_url:
                result["schema:sameAs"] = self.linkedin_url
            # Add linkedin_profile_path if available
            if self.linkedin_profile_path:
                result["linkedin_profile_path"] = self.linkedin_profile_path
        return result
# Query type detection patterns
# Lowercase substrings that mark a query as person-oriented; used by
# detect_query_type() as the keyword-heuristic fallback when no DSPy
# classification is supplied.
PERSON_QUERY_PATTERNS = [
    # Dutch
    "wie werkt", "wie werk", "werken in", "werken bij", "medewerker", "personeel",
    "staff", "werknemer", "expert", "experts", "specialist", "specialisten",
    "directeur", "curator", "archivaris", "bibliothecaris", "conservator",
    "team", "collega", "collegas", "mensen bij", "werkzaam",
    # English
    "who works", "staff at", "employees", "team at", "people at", "work at",
    "director of", "curator at", "archivist", "librarian", "works at",
    "experts at", "specialists", "professionals at",
    # Generic
    "linkedin", "person", "professional",
]
# ===================================================================
# Dutch Province/Subdivision Code Mapping (ISO 3166-2:NL)
# ===================================================================
# Maps province names (lowercase, various spellings) to ISO 3166-2 codes
# Used for filtering Qdrant queries by region
# Qdrant payload field: "region" (stores short codes like "NH", "ZH")
# Keys must be lowercase: get_province_code() lowercases before lookup.
# ===================================================================
DUTCH_PROVINCE_CODES: dict[str, str] = {
    # Noord-Holland
    "noord-holland": "NH",
    "noordholland": "NH",
    "north holland": "NH",
    "north-holland": "NH",
    # Zuid-Holland
    "zuid-holland": "ZH",
    "zuidholland": "ZH",
    "south holland": "ZH",
    "south-holland": "ZH",
    # Utrecht
    "utrecht": "UT",
    # Gelderland
    "gelderland": "GE",
    # Noord-Brabant
    "noord-brabant": "NB",
    "noordbrabant": "NB",
    "brabant": "NB",
    "north brabant": "NB",
    # Limburg
    "limburg": "LI",
    # Overijssel
    "overijssel": "OV",
    # Friesland / Fryslân
    "friesland": "FR",
    "fryslân": "FR",
    "fryslan": "FR",
    # Groningen
    "groningen": "GR",
    # Drenthe
    "drenthe": "DR",
    # Flevoland
    "flevoland": "FL",
    # Zeeland
    "zeeland": "ZE",
}
def get_province_code(province_name: str | None) -> str | None:
"""Convert Dutch province name to ISO 3166-2 subdivision code (without country prefix).
Args:
province_name: Province name in Dutch or English (case-insensitive)
Returns:
Two-letter province code (e.g., "NH", "ZH") or None if not found
Example:
>>> get_province_code("Noord-Holland")
'NH'
>>> get_province_code("south holland")
'ZH'
>>> get_province_code("Bavaria")
None
"""
if not province_name:
return None
return DUTCH_PROVINCE_CODES.get(province_name.lower().strip())
def looks_like_person_name(query: str) -> bool:
    """Detect whether *query* looks like a bare person name.

    Heuristic: 2-4 words, mostly capitalized (one lowercase particle such
    as "van"/"de" is tolerated), and none of the question words, role
    words, or contextual prepositions that indicate a descriptive query.

    Args:
        query: Search query string

    Returns:
        True if query appears to be a person name

    Examples:
        >>> looks_like_person_name("Kitty Bogte")
        True
        >>> looks_like_person_name("Who works at the Rijksmuseum?")
        False
        >>> looks_like_person_name("archivist at Nationaal Archief")
        False
    """
    # Substrings whose presence rules out a plain-name query.
    blockers = (
        # Question words
        "who", "what", "where", "which", "how", "why",
        "wie", "wat", "waar", "welk", "hoe", "waarom",
        # Role/job indicators
        "works at", "working at", "werkt bij", "werkzaam",
        "archivist", "curator", "director", "librarian",
        "archivaris", "directeur", "bibliothecaris",
        # Prepositions indicating context
        " at ", " in ", " of ", " for ", " the ",
        " bij ", " in ", " van ", " voor ", " de ", " het ",
        # Punctuation that indicates non-name queries
        "?", "!",
    )
    lowered = query.lower()
    if any(term in lowered for term in blockers):
        return False
    # Names are typically 2-4 tokens ("First Last", optionally with particles).
    tokens = query.split()
    if not 2 <= len(tokens) <= 4:
        return False
    # Count tokens shaped like name components (alphabetic, leading capital).
    name_like = sum(1 for tok in tokens if tok.isalpha() and tok[0].isupper())
    # Allow one lowercase token (e.g., "van", "de").
    return name_like >= len(tokens) - 1
def calculate_name_match_boost(query: str, name: str) -> float:
    """Score multiplier rewarding overlap between a query and a person name.

    All comparisons are case-insensitive.

    Args:
        query: Search query (potential name)
        name: Person's name from search result

    Returns:
        Boost factor (1.0 = no boost, >1.0 = boosted):
        - 3.0: exact match
        - 2.5: one string contains the other
        - 1.0 + (shared words / larger word-set size): partial word
          overlap, i.e. values in (1.0, 2.0]
        - 1.0: no overlap at all
    """
    q = query.strip().lower()
    n = name.strip().lower()
    if q == n:
        return 3.0
    if q in n or n in q:
        return 2.5
    # Word-level overlap: compare the de-duplicated token sets.
    q_tokens = set(q.split())
    n_tokens = set(n.split())
    shared = q_tokens & n_tokens
    if not shared:
        return 1.0
    # More shared tokens relative to the larger set => stronger boost.
    overlap_ratio = len(shared) / max(len(q_tokens), len(n_tokens))
    return 1.0 + overlap_ratio
def detect_query_type(query: str, dspy_entity_type: str | None = None) -> str:
"""Detect if query is about institutions or persons.
Uses DSPy LLM classification if provided, falls back to keyword heuristics.
Args:
query: Search query string
dspy_entity_type: Optional entity_type from DSPy HeritageQueryRouter
("person", "institution", or "both")
Returns:
"person" or "institution"
"""
# Prefer DSPy semantic classification when available
if dspy_entity_type:
if dspy_entity_type in ("person", "both"):
return "person"
if dspy_entity_type == "institution":
return "institution"
# Fallback to keyword heuristics
query_lower = query.lower()
for pattern in PERSON_QUERY_PATTERNS:
if pattern in query_lower:
return "person"
return "institution"
# ===================================================================
# Schema-Aware Filter Mapping for DSPy Heritage Query Router
# ===================================================================
#
# These mappings are now loaded DYNAMICALLY from the LinkML schema files
# via the ontology_mapping module. This ensures:
# 1. Schema is the single source of truth (no hardcoded values)
# 2. Multilingual support (Dutch, German, French, Spanish, etc.)
# 3. Automatic updates when schema changes
#
# The ontology_mapping module extracts synonyms from YAML comments
# and provides fuzzy matching for natural language queries.
# ===================================================================
def _get_custodian_type_mapping() -> dict[str, str]:
    """Get custodian type to heritage code mapping from schema.

    Dynamically loads from CustodianPrimaryTypeEnum in LinkML schema.
    Falls back to minimal hardcoded mapping if schema unavailable.

    Returns:
        Dict mapping custodian type (e.g., "MUSEUM") to heritage code (e.g., "M")
    """
    try:
        # Try backend.rag path first (when backend is in Python path)
        from backend.rag.ontology_mapping import get_custodian_type_mapping
        mapping = get_custodian_type_mapping()
        if mapping:
            return mapping
        # An empty/falsy mapping deliberately falls through to the hardcoded fallback.
    except ImportError:
        try:
            # Fallback: try direct import (when ontology_mapping is in sys.path)
            from ontology_mapping import get_custodian_type_mapping  # type: ignore[import-not-found]
            mapping = get_custodian_type_mapping()
            if mapping:
                return mapping
        except ImportError:
            logger.warning("ontology_mapping not available, using fallback mapping")
        # NOTE(review): non-ImportError exceptions raised inside this handler
        # are NOT caught by the outer `except Exception` below — confirm intended.
    except Exception as e:
        logger.warning(f"Failed to load custodian type mapping from schema: {e}")
    # Fallback: minimal GLAMORCUBESFIXPHDNT mapping (one letter per custodian type)
    return {
        "GALLERY": "G", "LIBRARY": "L", "ARCHIVE": "A", "MUSEUM": "M",
        "OFFICIAL_INSTITUTION": "O", "RESEARCH_CENTER": "R", "CORPORATION": "C",
        "UNKNOWN": "U", "BIO_CUSTODIAN": "B", "EDUCATION_PROVIDER": "E",
        "COLLECTING_SOCIETY": "S", "FEATURE": "F", "INTANGIBLE_HERITAGE_GROUP": "I",
        "MIXED": "X", "PERSONAL_COLLECTION": "P", "HOLY_SITE": "H",
        "DIGITAL_PLATFORM": "D", "NGO": "N", "TASTE_SMELL_HERITAGE": "T",
    }
def _get_role_category_keywords() -> dict[str, list[str]]:
    """Get role category keywords from schema.

    Dynamically loads from RoleCategoryEnum in LinkML schema.
    Falls back to hardcoded keywords if schema unavailable.

    Returns:
        Dict mapping role category (e.g., "CURATORIAL") to keywords list
    """
    try:
        # Try backend.rag path first (when backend is in Python path)
        from backend.rag.ontology_mapping import get_role_keywords
        keywords = get_role_keywords()
        if keywords:
            return keywords
        # An empty/falsy result deliberately falls through to the hardcoded fallback.
    except ImportError:
        try:
            # Fallback: try direct import (when ontology_mapping is in sys.path)
            from ontology_mapping import get_role_keywords  # type: ignore[import-not-found]
            keywords = get_role_keywords()
            if keywords:
                return keywords
        except ImportError:
            logger.warning("ontology_mapping not available, using fallback role keywords")
        # NOTE(review): non-ImportError exceptions raised inside this handler
        # are NOT caught by the outer `except Exception` below — confirm intended.
    except Exception as e:
        logger.warning(f"Failed to load role keywords from schema: {e}")
    # Fallback: essential role category keywords (hardcoded, mixed Dutch/English).
    # Matched case-insensitively against headlines by filter_by_role_category_keywords().
    return {
        "CURATORIAL": [
            "curator", "curatorial", "collectie", "collection", "tentoonstellingen",
            "exhibitions", "acquisitions", "registrar", "museum professional"
        ],
        "CONSERVATION": [
            "conservator", "conservation", "restaurator", "restoration", "preservatie",
            "preservation", "materiaal", "material", "preventive"
        ],
        "ARCHIVAL": [
            "archivist", "archivaris", "archief", "archive", "records", "documentalist",
            "erfgoed", "heritage records", "acquisitie", "beschrijving"
        ],
        "LIBRARY": [
            "bibliothecaris", "librarian", "bibliotheek", "library", "catalogus",
            "cataloging", "metadata", "special collections", "reference"
        ],
        "DIGITAL": [
            "digital", "digitaal", "developer", "data", "software", "IT", "tech",
            "engineer", "digitalisering", "digitization", "web", "database"
        ],
        "EDUCATION": [
            "educatie", "education", "learning", "museum educator", "outreach",
            "public programs", "docent", "teacher", "rondleiding", "guide"
        ],
        "GOVERNANCE": [
            "bestuur", "board", "governance", "trustee", "raad", "council",
            "advisory", "commissie", "committee"
        ],
        "LEADERSHIP": [
            "director", "directeur", "manager", "head of", "hoofd", "chief",
            "CEO", "president", "leider", "leadership"
        ],
        "RESEARCH": [
            "onderzoek", "research", "researcher", "wetenschapper", "scientist",
            "academic", "scholar", "fellow", "postdoc", "PhD"
        ],
        "TECHNICAL": [
            "technical", "technisch", "facilities", "installation", "AV",
            "audiovisual", "lighting", "security", "beveiliging"
        ],
        "SUPPORT": [
            "support", "admin", "administratie", "office", "HR", "finance",
            "marketing", "communications", "front desk", "visitor services"
        ],
        "CREATIVE": [
            "design", "ontwerp", "creative", "graphic", "exhibition design",
            "multimedia", "artist", "kunstenaar", "visual"
        ],
        "EXTERNAL": [
            "volunteer", "vrijwilliger", "intern", "stagiair", "consultant",
            "advisor", "external", "contractor", "freelance"
        ],
    }
# Lazy-loaded module-level caches (populated on first access)
# Filled once by get_custodian_type_to_heritage_code() and
# get_role_category_keywords() respectively; None means "not loaded yet".
_CUSTODIAN_TYPE_MAPPING: dict[str, str] | None = None
_ROLE_CATEGORY_KEYWORDS: dict[str, list[str]] | None = None
def get_custodian_type_to_heritage_code() -> dict[str, str]:
    """Cached custodian-type → heritage-code mapping (loaded on first call)."""
    global _CUSTODIAN_TYPE_MAPPING
    if _CUSTODIAN_TYPE_MAPPING is not None:
        return _CUSTODIAN_TYPE_MAPPING
    _CUSTODIAN_TYPE_MAPPING = _get_custodian_type_mapping()
    return _CUSTODIAN_TYPE_MAPPING
def get_role_category_keywords() -> dict[str, list[str]]:
    """Cached role-category → keywords mapping (loaded on first call)."""
    global _ROLE_CATEGORY_KEYWORDS
    if _ROLE_CATEGORY_KEYWORDS is not None:
        return _ROLE_CATEGORY_KEYWORDS
    _ROLE_CATEGORY_KEYWORDS = _get_role_category_keywords()
    return _ROLE_CATEGORY_KEYWORDS
def build_schema_aware_person_filter(
heritage_type_code: str | None = None,
heritage_relevant_only: bool = False,
custodian_slug: str | None = None,
only_wcms: bool = False,
) -> dict[str, Any] | None:
"""Build Qdrant filter conditions for schema-aware person search.
Args:
heritage_type_code: Single-letter heritage type code (M, A, L, etc.)
heritage_relevant_only: Only return heritage-relevant staff
custodian_slug: Filter by specific custodian
only_wcms: Only return WCMS-registered profiles (heritage sector users)
Returns:
Dict of filter conditions for Qdrant, or None if no filters
"""
filters: dict[str, Any] = {}
if heritage_type_code and heritage_type_code not in ("U", "UNKNOWN", "UNSPECIFIED"):
filters["heritage_type"] = heritage_type_code
if heritage_relevant_only:
filters["heritage_relevant"] = True
if custodian_slug:
filters["custodian_slug"] = custodian_slug
if only_wcms:
filters["has_wcms"] = True
return filters if filters else None
def filter_by_role_category_keywords(
results: list["RetrievedPerson"],
role_category: str | None,
) -> list["RetrievedPerson"]:
"""Post-filter search results by role category using headline keywords.
Since role_category is not indexed in Qdrant, we use headline keyword matching
to filter results after vector search.
Args:
results: List of RetrievedPerson from vector search
role_category: Target role category (CURATORIAL, ARCHIVAL, etc.)
Returns:
Filtered list of RetrievedPerson matching the role category
"""
if not role_category or role_category in ("UNKNOWN", "UNSPECIFIED"):
return results
keywords = get_role_category_keywords().get(role_category, [])
if not keywords:
return results
filtered = []
for person in results:
headline = (person.headline or "").lower()
# Check if any keyword matches the headline
if any(kw.lower() in headline for kw in keywords):
filtered.append(person)
# If filtering removed all results, return original (don't be too strict)
if not filtered:
logger.info(f"Role category filter '{role_category}' removed all results, returning unfiltered")
return results
logger.info(f"Role category filter '{role_category}' reduced results from {len(results)} to {len(filtered)}")
return filtered
def get_heritage_type_code(custodian_type: str | None) -> str | None:
"""Convert CustodianPrimaryTypeEnum value to single-letter heritage code.
Args:
custodian_type: Custodian type from DSPy router (e.g., "MUSEUM", "ARCHIVE")
Returns:
Single-letter heritage code (e.g., "M", "A") or None if not mappable
"""
if not custodian_type or custodian_type in ("UNKNOWN", "UNSPECIFIED"):
return None
return get_custodian_type_to_heritage_code().get(custodian_type)
class SPARQLClient:
    """Client for querying Oxigraph SPARQL endpoint.

    The underlying HTTP client is created lazily and pooled so that
    parallel SPARQL fan-out reuses keep-alive connections.
    """

    def __init__(
        self,
        endpoint: str = DEFAULT_SPARQL_ENDPOINT,
        timeout: float = DEFAULT_SPARQL_TIMEOUT,
        max_connections: int = 20  # Allow concurrent connections for parallel queries
    ):
        self.endpoint = endpoint
        self.timeout = timeout
        self.max_connections = max_connections
        self._client: httpx.Client | None = None

    @property
    def client(self) -> httpx.Client:
        """Lazy-initialize HTTP client with connection pooling."""
        if self._client is not None:
            return self._client
        # Pool sized for parallel SPARQL queries; keep-alive avoids TCP setup cost.
        pool_limits = httpx.Limits(
            max_keepalive_connections=self.max_connections,
            max_connections=self.max_connections,
            keepalive_expiry=30.0  # Keep connections alive for reuse
        )
        self._client = httpx.Client(
            timeout=self.timeout,
            limits=pool_limits,
            http2=False  # HTTP/1.1 is often faster for small queries
        )
        return self._client

    def query(self, sparql: str, log_timing: bool = False) -> list[dict[str, Any]]:
        """Execute SPARQL query and return results.

        Args:
            sparql: SPARQL query string (prefixes are prepended automatically)
            log_timing: Whether to log query execution time

        Returns:
            List of result bindings as dictionaries; empty list on any error
        """
        full_query = SPARQL_PREFIXES + sparql
        start_time = time.time() if log_timing else 0
        try:
            response = self.client.post(
                self.endpoint,
                data={"query": full_query},
                headers={"Accept": "application/sparql-results+json"}
            )
            response.raise_for_status()
            payload = response.json()
            raw_bindings = payload.get("results", {}).get("bindings", [])
            # Flatten each binding {var: {"type":..., "value":...}} to {var: value}.
            rows = [
                {var: cell.get("value", "") for var, cell in binding.items()}
                for binding in raw_bindings
            ]
            if log_timing:
                duration_ms = (time.time() - start_time) * 1000
                logger.debug(f"SPARQL query completed: {len(rows)} results in {duration_ms:.0f}ms")
            return rows
        except httpx.HTTPError as e:
            logger.error(f"SPARQL query failed: {e}")
            return []
        except Exception as e:
            logger.error(f"Unexpected error in SPARQL query: {e}")
            return []

    def close(self) -> None:
        """Close the HTTP client."""
        if self._client is not None:
            self._client.close()
            self._client = None
class HybridRetriever:
"""Hybrid retriever combining vector search with knowledge graph expansion.
The retrieval process:
1. Vector search finds semantically similar institutions
2. For each result, SPARQL expands to find related institutions:
- Institutions in the same city
- Institutions of the same type
- Institutions with related collections
3. Results are re-ranked based on combined vector + graph scores
Embedding Models:
- If OpenAI API key is available AND collection uses 1536-dim vectors: use OpenAI
- Otherwise: use sentence-transformers (all-MiniLM-L6-v2, 384-dim)
Multi-Embedding Support:
Set use_multi_embedding=True to enable support for multiple embedding models
via Qdrant's named vectors feature. This allows:
- A/B testing different embedding models
- Seamless migration between models
- Specifying which model to use per query
Args:
qdrant_host: Qdrant server hostname
qdrant_port: Qdrant REST API port
sparql_endpoint: Oxigraph SPARQL endpoint URL
vector_weight: Weight for vector similarity scores (0-1)
graph_weight: Weight for graph expansion scores (0-1)
collection_name: Qdrant collection name
embedding_model: Embedding model name (auto-detected if not specified)
k_vector: Number of initial vector search results
k_expand: Number of graph expansion results per seed
k_final: Final number of results to return
use_multi_embedding: Enable multi-embedding mode with named vectors
preferred_embedding_model: Preferred model for multi-embedding mode
"""
# Class-level type annotations for instance attributes
qdrant_host: str
qdrant_port: int
sparql_endpoint: str
vector_weight: float
graph_weight: float
collection_name: str
k_vector: int
k_expand: int
k_final: int
openai_api_key: str | None
use_production_qdrant: bool
use_multi_embedding: bool
preferred_embedding_model: str | None
sparql_client: "SPARQLClient"
embedding_model: str
# Private attributes with lazy initialization
_qdrant_client: "QdrantClient | None"
_openai_client: "OpenAI | None"
_st_model: "SentenceTransformer | None"
_use_sentence_transformers: bool
_collection_vector_size: int | None
_multi_retriever: "MultiEmbeddingRetriever | None"
_selected_multi_model: "EmbeddingModel | None"
def __init__(
self,
qdrant_host: str = "localhost",
qdrant_port: int = 6333,
sparql_endpoint: str = DEFAULT_SPARQL_ENDPOINT,
vector_weight: float = 0.7,
graph_weight: float = 0.3,
collection_name: str = "heritage_custodians",
embedding_model: str | None = None, # Auto-detect if None
k_vector: int = 10,
k_expand: int = 5,
k_final: int = 10,
openai_api_key: str | None = None,
use_production_qdrant: bool = False,
use_multi_embedding: bool = False,
preferred_embedding_model: str | None = None,
):
self.qdrant_host = qdrant_host
self.qdrant_port = qdrant_port
self.sparql_endpoint = sparql_endpoint
self.vector_weight = vector_weight
self.graph_weight = graph_weight
self.collection_name = collection_name
self.k_vector = k_vector
self.k_expand = k_expand
self.k_final = k_final
self.openai_api_key = openai_api_key or os.getenv("OPENAI_API_KEY")
self.use_production_qdrant = use_production_qdrant
self.use_multi_embedding = use_multi_embedding
self.preferred_embedding_model = preferred_embedding_model
# Initialize SPARQL client
self.sparql_client = SPARQLClient(endpoint=sparql_endpoint)
# Lazy-load Qdrant, OpenAI, and sentence-transformers clients
self._qdrant_client = None
self._openai_client = None
self._st_model = None
self._use_sentence_transformers = False
self._collection_vector_size: int | None = None
# Multi-embedding retriever (lazy-loaded)
self._multi_retriever = None
# Currently selected multi-embedding model (for multi-embedding mode)
self._selected_multi_model = None
# Determine embedding model to use
self.embedding_model = embedding_model or self._auto_detect_embedding_model()
logger.info(
f"Initialized HybridRetriever: "
f"Qdrant={qdrant_host}:{qdrant_port}, "
f"SPARQL={sparql_endpoint}, "
f"embedding_model={self.embedding_model}, "
f"multi_embedding={use_multi_embedding}, "
f"weights=vector:{vector_weight}/graph:{graph_weight}"
)
@property
def qdrant_client(self) -> "QdrantClient":
"""Lazy-load Qdrant client."""
if self._qdrant_client is None:
from qdrant_client import QdrantClient
if self.use_production_qdrant:
# Connect via HTTPS to production
self._qdrant_client = QdrantClient(
host="bronhouder.nl",
port=443,
https=True,
prefix="qdrant",
prefer_grpc=False,
timeout=30
)
else:
self._qdrant_client = QdrantClient(
host=self.qdrant_host,
port=self.qdrant_port
)
return self._qdrant_client
@property
def openai_client(self) -> "OpenAI":
"""Lazy-load OpenAI client."""
if self._openai_client is None:
if not self.openai_api_key:
raise RuntimeError(
"OpenAI API key not available. Set OPENAI_API_KEY or use sentence-transformers."
)
import openai
self._openai_client = openai.OpenAI(api_key=self.openai_api_key)
return self._openai_client
def _get_collection_vector_size(self) -> int | None:
"""Get the vector size of the Qdrant collection."""
try:
info = self.qdrant_client.get_collection(self.collection_name)
if hasattr(info.config.params, 'vectors'):
vectors_config = info.config.params.vectors
if isinstance(vectors_config, dict):
# Named vectors
first_config = next(iter(vectors_config.values()), None)
return first_config.size if first_config else None
elif vectors_config is not None:
# Single vector config
return vectors_config.size
return None
except Exception as e:
logger.warning(f"Could not get collection vector size: {e}")
return None
def _auto_detect_embedding_model(self) -> str:
"""Auto-detect which embedding model to use based on collection and available APIs.
Detection priority:
1. Check main collection (heritage_custodians) vector size
2. If main collection doesn't exist, check heritage_persons collection
3. If OpenAI key available and collection uses 1536-dim, use OpenAI
4. Otherwise use sentence-transformers (384-dim, all-MiniLM-L6-v2)
"""
# Check main collection vector size first
vector_size = self._get_collection_vector_size()
self._collection_vector_size = vector_size
# If main collection doesn't exist, try heritage_persons collection
if vector_size is None:
logger.info(f"Collection '{self.collection_name}' not found, checking heritage_persons")
person_vector_size = self._get_person_collection_vector_size()
if person_vector_size:
vector_size = person_vector_size
logger.info(f"Using heritage_persons collection vector size: {vector_size}")
if vector_size == 384:
# Collection uses sentence-transformers dimensions
self._use_sentence_transformers = True
logger.info("Auto-detected 384-dim vectors, using sentence-transformers")
return "all-MiniLM-L6-v2"
elif vector_size == 1536 and self.openai_api_key:
# Collection uses OpenAI dimensions and we have API key
self._use_sentence_transformers = False
logger.info("Auto-detected 1536-dim vectors with OpenAI key, using OpenAI")
return "text-embedding-3-small"
elif self.openai_api_key:
# Default to OpenAI if we have key
self._use_sentence_transformers = False
return "text-embedding-3-small"
else:
# Fallback to sentence-transformers
self._use_sentence_transformers = True
logger.info("No OpenAI key, falling back to sentence-transformers")
return "all-MiniLM-L6-v2"
def _load_sentence_transformer(self) -> "SentenceTransformer":
"""Lazy-load sentence-transformers model."""
if self._st_model is None:
try:
from sentence_transformers import SentenceTransformer
self._st_model = SentenceTransformer(self.embedding_model)
logger.info(f"Loaded sentence-transformers model: {self.embedding_model}")
except ImportError:
raise RuntimeError("sentence-transformers not installed. Run: pip install sentence-transformers")
return self._st_model
@property
def multi_retriever(self) -> "MultiEmbeddingRetriever | None":
"""Lazy-load MultiEmbeddingRetriever when multi-embedding mode is enabled.
Returns:
MultiEmbeddingRetriever instance or None if not in multi-embedding mode
"""
if not self.use_multi_embedding:
return None
if self._multi_retriever is None:
from glam_extractor.api.multi_embedding_retriever import (
MultiEmbeddingRetriever,
MultiEmbeddingConfig,
EmbeddingModel,
)
# Create config matching current settings
config = MultiEmbeddingConfig(
qdrant_host=self.qdrant_host,
qdrant_port=self.qdrant_port,
qdrant_https=self.use_production_qdrant,
qdrant_prefix="qdrant" if self.use_production_qdrant else None,
openai_api_key=self.openai_api_key,
institutions_collection=self.collection_name,
)
self._multi_retriever = MultiEmbeddingRetriever(config)
# Auto-select model if not specified
if self.preferred_embedding_model:
try:
self._selected_multi_model = EmbeddingModel(self.preferred_embedding_model)
except ValueError:
logger.warning(f"Unknown embedding model: {self.preferred_embedding_model}")
assert self._multi_retriever is not None # Set above
self._selected_multi_model = self._multi_retriever.select_model(self.collection_name)
else:
assert self._multi_retriever is not None # Set above
self._selected_multi_model = self._multi_retriever.select_model(self.collection_name)
logger.info(f"MultiEmbeddingRetriever initialized, selected model: {self._selected_multi_model}")
return self._multi_retriever
def _get_embedding(self, text: str, using: str | None = None) -> list[float]:
"""Get embedding vector for text using the appropriate model.
Args:
text: Text to embed
using: Optional embedding model name (for multi-embedding mode)
Returns:
Embedding vector as list of floats
"""
# If multi-embedding mode, delegate to MultiEmbeddingRetriever
if self.use_multi_embedding and self.multi_retriever:
from glam_extractor.api.multi_embedding_retriever import EmbeddingModel
# Determine which model to use
if using:
try:
model = EmbeddingModel(using)
except ValueError:
logger.warning(f"Unknown model '{using}', using default")
model = self._selected_multi_model
else:
model = self._selected_multi_model
if model:
return self.multi_retriever.get_embedding(text, model)
else:
# Fallback to legacy mode
logger.warning("No multi-embedding model available, falling back to legacy")
# Legacy single-model embedding
if self._use_sentence_transformers:
model = self._load_sentence_transformer()
embedding = model.encode(text)
return embedding.tolist()
else:
response = self.openai_client.embeddings.create(
input=text,
model=self.embedding_model
)
return response.data[0].embedding
    def _vector_search(
        self,
        query: str,
        k: int,
        using: str | None = None,
        region_codes: list[str] | None = None,
        cities: list[str] | None = None,
        institution_types: list[str] | None = None,
        use_polygon_filter: bool = True,
    ) -> list[RetrievedInstitution]:
        """Perform vector similarity search in Qdrant.

        Args:
            query: Search query text
            k: Number of results to retrieve
            using: Optional embedding model name (for multi-embedding mode)
            region_codes: Optional list of ISO 3166-2 region codes (e.g., ["NH", "ZH"])
            cities: Optional list of city names (e.g., ["Amsterdam", "Rotterdam"])
            institution_types: Optional list of institution types (e.g., ["ARCHIVE", "MUSEUM"])
            use_polygon_filter: If True, apply polygon-based geographic filtering
                using actual province boundaries (default: True)

        Returns:
            List of RetrievedInstitution with vector scores
        """
        query_vector = self._get_embedding(query, using=using)
        # When polygon filtering is enabled and regions are specified,
        # over-fetch to ensure we have enough results after polygon filtering
        effective_limit = k
        if use_polygon_filter and region_codes:
            effective_limit = k * 3  # Over-fetch 3x for polygon filtering
            logger.debug(f"Over-fetching {effective_limit} results for polygon filtering")
        # Build query parameters
        search_params = {
            "collection_name": self.collection_name,
            "query": query_vector,
            "limit": effective_limit,
            "with_payload": True,
        }
        # Build geographic/type filter if any criteria provided
        # NOTE: Always apply region metadata filter to Qdrant first to get relevant results.
        # The polygon filter (if enabled) is an additional precision filter applied afterward.
        # Previously we disabled metadata region filter when polygon filter was enabled,
        # but this caused vector search to return results from wrong regions.
        if region_codes or cities or institution_types:
            from glam_extractor.ontology.qdrant_filters import QdrantFilterBuilder
            # Convert institution types from full names (LIBRARY, MUSEUM) to single-letter codes (L, M)
            # because Qdrant stores institution_type as single-letter codes per GLAMORCUBESFIXPHDNT
            type_codes = None
            if institution_types:
                type_mapping = get_custodian_type_to_heritage_code()
                type_codes = [type_mapping.get(t, t) for t in institution_types]
                # Filter out any that didn't map (keep original if 1 char already)
                type_codes = [c for c in type_codes if c and len(c) == 1]
                logger.debug(f"Converted institution types: {institution_types} -> {type_codes}")
            builder = QdrantFilterBuilder()
            filter_dict = builder.combined_filter(
                primary_types=type_codes,  # Use single-letter codes
                region_codes=region_codes,  # Always apply region filter to get relevant results
                cities=cities,
                combine_mode="must",
            )
            if filter_dict:
                query_filter = QdrantFilterBuilder.to_qdrant_models(filter_dict)
                search_params["query_filter"] = query_filter
                logger.info(
                    f"Applied Qdrant filter: types={type_codes}, "
                    f"regions={region_codes}, cities={cities}"
                )
        # Add named vector 'using' ONLY if collection actually has named vectors
        # Single-vector collections will error with "Not existing vector name" otherwise
        if self.use_multi_embedding and self.multi_retriever:
            uses_named = self.multi_retriever.uses_named_vectors(self.collection_name)
            if uses_named:
                if using:
                    search_params["using"] = using
                elif self._selected_multi_model:
                    search_params["using"] = self._selected_multi_model.value
            # else: single-vector collection, don't add 'using' parameter
        results = self.qdrant_client.query_points(**search_params)
        # Convert Qdrant points into lightweight result objects
        institutions = []
        for point in results.points:
            payload = point.payload or {}
            inst = RetrievedInstitution(
                ghcid=payload.get("ghcid", ""),
                name=payload.get("name", ""),
                # Fall back to a canonical URI derived from the GHCID when absent
                uri=payload.get("uri", f"https://nde.nl/ontology/hc/custodian/{payload.get('ghcid', '')}"),
                vector_score=point.score,
                institution_type=payload.get("institution_type"),
                country=payload.get("country"),
                city=payload.get("city"),
                # Truncate stored text to a short description snippet
                description=payload.get("text", "")[:200] if payload.get("text") else None,
                latitude=payload.get("latitude"),
                longitude=payload.get("longitude"),
            )
            institutions.append(inst)
        # Apply polygon-based geographic filtering if enabled and regions specified
        if use_polygon_filter and region_codes and institutions:
            institutions = self._apply_polygon_filter(institutions, region_codes, k)
        return institutions
def _apply_polygon_filter(
self,
institutions: list[RetrievedInstitution],
region_codes: list[str],
k: int,
) -> list[RetrievedInstitution]:
"""Filter institutions by polygon containment in specified regions.
Uses actual province boundary polygons to ensure results are
geographically within the requested regions, not just metadata matching.
Args:
institutions: List of retrieved institutions with lat/lon
region_codes: List of ISO 3166-2 region codes (e.g., ["NH", "ZH"])
k: Maximum number of results to return
Returns:
Filtered list of institutions within the specified regions
"""
polygon_filter = get_polygon_filter()
# Handle case where polygon filter module is not available or not loaded
if polygon_filter is None:
logger.warning("Polygon filter not available, skipping geographic filtering")
return institutions[:k]
if not polygon_filter.is_loaded:
logger.warning("Polygon filter not loaded, skipping geographic filtering")
return institutions[:k]
filtered = []
for inst in institutions:
if inst.latitude is None or inst.longitude is None:
# No coordinates, check if metadata region matches
if inst.country == "NL":
# For Dutch institutions without coords, fallback to metadata
# Extract region from GHCID (format: NL-{REGION}-...)
if inst.ghcid and len(inst.ghcid) > 3:
ghcid_region = inst.ghcid.split("-")[1] if "-" in inst.ghcid else None
if ghcid_region and ghcid_region.upper() in [r.upper() for r in region_codes]:
filtered.append(inst)
continue
# Check if point is within any of the requested regions
for region_code in region_codes:
if polygon_filter.point_in_province(inst.latitude, inst.longitude, region_code):
filtered.append(inst)
break # Don't add same institution multiple times
logger.info(
f"Polygon filter: {len(filtered)}/{len(institutions)} institutions "
f"in regions {region_codes}"
)
# Return up to k results
return filtered[:k]
    def _build_batched_expansion_query(
        self,
        seed_institutions: list[RetrievedInstitution],
        exclude_ghcids: set[str],
        limit_per_expansion: int = 5
    ) -> tuple[str, dict[str, dict]]:
        """Build a single SPARQL query with UNION clauses for all expansions.

        DEDUPLICATES by city code and type+country to avoid redundant query patterns.
        For example, if 5 seeds are all from Amsterdam with type MUSEUM, we only
        create ONE city expansion (for AMS) and ONE type expansion (for NL + M),
        not 10 redundant UNIONs.

        Args:
            seed_institutions: Seed institutions to expand from
            exclude_ghcids: GHCIDs to exclude from results
            limit_per_expansion: Max results per expansion type

        Returns:
            Tuple of (SPARQL query string, expansion_metadata dict)
            expansion_metadata maps expansion_key -> {seed, type, city/type_code}
        """
        unions = []
        expansion_metadata = {}
        # Track unique patterns to avoid duplicate queries
        seen_city_codes: set[str] = set()
        seen_type_patterns: set[str] = set()  # "country-type_code" pattern
        # Only the top 5 seeds are expanded to bound query size
        seeds_to_expand = seed_institutions[:5]
        city_idx = 0
        type_idx = 0
        for seed in seeds_to_expand:
            # City expansion - deduplicate by city code
            if seed.city:
                # GHCIDs embed a 3-letter city code (format: CC-RR-CCC-T-ABBR)
                city_code = seed.city[:3].upper()
                if city_code not in seen_city_codes:
                    seen_city_codes.add(city_code)
                    expansion_key = f"city_{city_idx}"
                    city_idx += 1
                    # Each UNION branch tags its rows with the expansion_key so
                    # results can later be attributed back to the seed.
                    # LIMIT over-fetches by len(exclude_ghcids) since excluded
                    # seeds may occupy result slots.
                    unions.append(f"""
                    {{
                        SELECT ?s ?name ?ghcid ?type ("{expansion_key}" AS ?expansion_key) WHERE {{
                            ?s a hcc:Custodian ;
                               skos:prefLabel ?name ;
                               hc:ghcid ?ghcid .
                            FILTER(CONTAINS(?ghcid, "-{city_code}-"))
                            OPTIONAL {{ ?s hc:institutionType ?type }}
                        }}
                        LIMIT {limit_per_expansion + len(exclude_ghcids)}
                    }}
                    """)
                    expansion_metadata[expansion_key] = {
                        "seed": seed,
                        "type": "city",
                        "city": seed.city,
                        "city_code": city_code
                    }
            # Type expansion - deduplicate by country + type_code pattern
            if seed.institution_type and seed.country:
                type_code = get_custodian_type_to_heritage_code().get(seed.institution_type, "")
                if type_code:
                    pattern_key = f"{seed.country}-{type_code}"
                    if pattern_key not in seen_type_patterns:
                        seen_type_patterns.add(pattern_key)
                        expansion_key = f"type_{type_idx}"
                        type_idx += 1
                        unions.append(f"""
                        {{
                            SELECT ?s ?name ?ghcid ?city ("{expansion_key}" AS ?expansion_key) WHERE {{
                                ?s a hcc:Custodian ;
                                   skos:prefLabel ?name ;
                                   hc:ghcid ?ghcid .
                                FILTER(STRSTARTS(?ghcid, "{seed.country}-"))
                                FILTER(CONTAINS(?ghcid, "-{type_code}-"))
                                OPTIONAL {{ ?s schema:location ?city }}
                            }}
                            LIMIT {limit_per_expansion + len(exclude_ghcids)}
                        }}
                        """)
                        expansion_metadata[expansion_key] = {
                            "seed": seed,
                            "type": "type",
                            "institution_type": seed.institution_type,
                            "type_code": type_code,
                            "country": seed.country
                        }
        if not unions:
            return "", {}
        # Log deduplication stats
        logger.info(f"Batched SPARQL: {len(unions)} UNIONs (deduplicated from max {len(seeds_to_expand) * 2}). "
                    f"Unique cities: {seen_city_codes}, Unique types: {seen_type_patterns}")
        # Combine all unions into a single query; variables that only appear
        # in some branches (?type / ?city) are left unbound in the others.
        query = f"""
        SELECT ?s ?name ?ghcid ?type ?city ?expansion_key WHERE {{
            {" UNION ".join(unions)}
        }}
        """
        return query, expansion_metadata
    def _graph_expand_batched(
        self,
        seed_institutions: list[RetrievedInstitution]
    ) -> list[RetrievedInstitution]:
        """Expand seed results using a SINGLE batched SPARQL query.

        This is a significant optimization over the parallel ThreadPoolExecutor
        approach. Instead of 10 HTTP requests (even in parallel), we execute
        ONE SPARQL query with UNION clauses.

        Performance comparison:
        - Sequential: 10 queries × ~100ms = 4+ seconds
        - Parallel (ThreadPool): ~500ms-1s (limited by GIL/connection pool)
        - Batched (this method): ONE query ~150-300ms

        Args:
            seed_institutions: Initial vector search results

        Returns:
            Additional institutions found via graph expansion
        """
        start_time = time.time()
        # Seeds must never reappear in the expansion results; they also seed
        # the de-duplication set for results shared between expansions.
        exclude_ghcids = {inst.ghcid for inst in seed_institutions}
        expanded = []
        seen_ghcids = set(exclude_ghcids)
        # Build batched query
        query, expansion_metadata = self._build_batched_expansion_query(
            seed_institutions, exclude_ghcids, limit_per_expansion=self.k_expand
        )
        if not query:
            logger.debug("No graph expansion tasks to execute")
            return expanded
        # Execute single batched query
        query_start = time.time()
        results = self.sparql_client.query(query)
        query_duration = (time.time() - query_start) * 1000
        logger.debug(f"Batched SPARQL query: {len(results)} raw results in {query_duration:.0f}ms")
        # Group results by expansion_key (the tag each UNION branch projected)
        results_by_expansion: dict[str, list[dict]] = {}
        for row in results:
            exp_key = row.get("expansion_key", "")
            if exp_key:
                if exp_key not in results_by_expansion:
                    results_by_expansion[exp_key] = []
                results_by_expansion[exp_key].append(row)
        # Process results, filtering and creating RetrievedInstitution objects
        for exp_key, rows in results_by_expansion.items():
            if exp_key not in expansion_metadata:
                continue
            meta = expansion_metadata[exp_key]
            seed = meta["seed"]
            exp_type = meta["type"]
            count = 0
            for row in rows:
                ghcid = row.get("ghcid", "")
                if not ghcid or ghcid in seen_ghcids:
                    continue
                # Cap accepted results per expansion at k_expand (the SPARQL
                # LIMIT over-fetched to compensate for excluded GHCIDs)
                if count >= self.k_expand:
                    break
                seen_ghcids.add(ghcid)
                count += 1
                if exp_type == "city":
                    expanded.append(RetrievedInstitution(
                        ghcid=ghcid,
                        name=row.get("name", ""),
                        uri=row.get("s", ""),
                        graph_score=0.8,  # High score for same city
                        institution_type=row.get("type"),
                        expansion_reason="same_city",
                        related_institutions=[seed.ghcid]
                    ))
                elif exp_type == "type":
                    expanded.append(RetrievedInstitution(
                        ghcid=ghcid,
                        name=row.get("name", ""),
                        uri=row.get("s", ""),
                        graph_score=0.5,  # Medium score for same type
                        institution_type=seed.institution_type,
                        city=row.get("city"),
                        expansion_reason="same_type",
                        related_institutions=[seed.ghcid]
                    ))
            logger.debug(f"Expansion {exp_key}: {count} results for {seed.ghcid}")
        total_time = (time.time() - start_time) * 1000
        logger.info(f"Graph expansion (batched): 1 query, {len(results)} raw results, "
                    f"{len(expanded)} expanded in {total_time:.0f}ms")
        return expanded
def _expand_by_city(self, city: str, exclude_ghcids: set[str], limit: int = 5) -> list[dict]:
"""Find other institutions in the same city via SPARQL.
Note: This method is kept for backwards compatibility and direct calls.
For batch operations, use _graph_expand_batched() instead.
Args:
city: City name to search for
exclude_ghcids: GHCIDs to exclude from results
limit: Maximum number of results
Returns:
List of institution data dicts
"""
if not city:
return []
query = f"""
SELECT ?s ?name ?ghcid ?type WHERE {{
?s a hcc:Custodian ;
skos:prefLabel ?name ;
hc:ghcid ?ghcid .
# Match city in GHCID (format: CC-RR-CCC-T-ABBR)
FILTER(CONTAINS(?ghcid, "-{city[:3].upper()}-"))
OPTIONAL {{ ?s hc:institutionType ?type }}
}}
LIMIT {limit + len(exclude_ghcids)}
"""
results = self.sparql_client.query(query)
# Filter out excluded GHCIDs
filtered = []
for row in results:
ghcid = row.get("ghcid", "")
if ghcid not in exclude_ghcids:
filtered.append(row)
if len(filtered) >= limit:
break
return filtered
def _expand_by_type(self, institution_type: str, country: str, exclude_ghcids: set[str], limit: int = 5) -> list[dict]:
"""Find other institutions of the same type in the same country.
Args:
institution_type: Institution type (MUSEUM, LIBRARY, etc.)
country: Country code (ISO 3166-1 alpha-2)
exclude_ghcids: GHCIDs to exclude
limit: Maximum number of results
Returns:
List of institution data dicts
"""
if not institution_type:
return []
# Map institution type to GHCID type code using dynamic schema mapping
type_code = get_custodian_type_to_heritage_code().get(institution_type, "")
if not type_code or not country:
return []
query = f"""
SELECT ?s ?name ?ghcid ?city WHERE {{
?s a hcc:Custodian ;
skos:prefLabel ?name ;
hc:ghcid ?ghcid .
# Match country and type in GHCID
FILTER(STRSTARTS(?ghcid, "{country}-"))
FILTER(CONTAINS(?ghcid, "-{type_code}-"))
OPTIONAL {{ ?s schema:location ?city }}
}}
LIMIT {limit + len(exclude_ghcids)}
"""
results = self.sparql_client.query(query)
filtered = []
for row in results:
ghcid = row.get("ghcid", "")
if ghcid not in exclude_ghcids:
filtered.append(row)
if len(filtered) >= limit:
break
return filtered
def _expand_by_wikidata_country(self, wikidata_country: str, exclude_ghcids: set[str], limit: int = 5) -> list[dict]:
"""Find institutions in the same country using Wikidata P17 property.
Args:
wikidata_country: Wikidata entity ID for country (e.g., Q55 for Netherlands)
exclude_ghcids: GHCIDs to exclude
limit: Maximum number of results
Returns:
List of institution data dicts
"""
if not wikidata_country:
return []
query = f"""
SELECT ?s ?name ?ghcid ?type WHERE {{
?s a hcc:Custodian ;
skos:prefLabel ?name ;
hc:ghcid ?ghcid ;
wdt:P17 wd:{wikidata_country} .
OPTIONAL {{ ?s hc:institutionType ?type }}
}}
LIMIT {limit + len(exclude_ghcids)}
"""
results = self.sparql_client.query(query)
filtered = []
for row in results:
ghcid = row.get("ghcid", "")
if ghcid not in exclude_ghcids:
filtered.append(row)
if len(filtered) >= limit:
break
return filtered
def _graph_expand(
self,
seed_institutions: list[RetrievedInstitution],
use_batched: bool = True
) -> list[RetrievedInstitution]:
"""Expand seed results using knowledge graph relationships.
By default uses batched SPARQL (single query with UNION) for best performance.
Falls back to parallel ThreadPoolExecutor if batched fails.
Performance comparison:
- Sequential: 10 queries × ~100ms = 4+ seconds
- Parallel (ThreadPool): ~500ms-3s (limited by GIL/connection pool)
- Batched (UNION query): ONE query ~150-300ms ← DEFAULT
Args:
seed_institutions: Initial vector search results
use_batched: If True (default), use batched SPARQL query.
If False, use parallel ThreadPoolExecutor.
Returns:
Additional institutions found via graph expansion
"""
if use_batched:
try:
return self._graph_expand_batched(seed_institutions)
except Exception as e:
logger.warning(f"Batched graph expansion failed, falling back to parallel: {e}")
# Fall through to parallel implementation
return self._graph_expand_parallel(seed_institutions)
    def _graph_expand_parallel(
        self,
        seed_institutions: list[RetrievedInstitution]
    ) -> list[RetrievedInstitution]:
        """Expand seed results using parallel SPARQL queries (fallback method).

        Uses ThreadPoolExecutor to parallelize SPARQL queries. This is slower than
        the batched approach but serves as a fallback.

        Args:
            seed_institutions: Initial vector search results

        Returns:
            Additional institutions found via graph expansion
        """
        start_time = time.time()
        # Seeds must never reappear in the expansion results; they also seed
        # the de-duplication set shared across all tasks.
        exclude_ghcids = {inst.ghcid for inst in seed_institutions}
        expanded = []
        seen_ghcids = set(exclude_ghcids)
        # Prepare all expansion tasks
        # Each task is a tuple: (task_type, seed, query_params)
        tasks = []
        seeds_to_expand = seed_institutions[:5]  # Expand top 5 seeds
        for seed in seeds_to_expand:
            # City expansion task
            if seed.city:
                tasks.append(("city", seed, {"city": seed.city}))
            # Type expansion task
            if seed.institution_type and seed.country:
                tasks.append(("type", seed, {
                    "institution_type": seed.institution_type,
                    "country": seed.country
                }))
        if not tasks:
            logger.debug("No graph expansion tasks to execute")
            return expanded
        # Execute SPARQL queries in parallel
        # Use min(10, len(tasks)) workers to avoid over-parallelization
        max_workers = min(10, len(tasks))
        def execute_expansion(task):
            """Execute a single expansion task and return results with metadata.

            Never raises: failures are logged and reported as an empty result
            set so one broken query cannot abort the whole expansion.
            """
            task_type, seed, params = task
            task_start = time.time()
            try:
                if task_type == "city":
                    results = self._expand_by_city(
                        params["city"], exclude_ghcids, limit=self.k_expand
                    )
                    return {
                        "task_type": task_type,
                        "seed": seed,
                        "results": results,
                        "duration_ms": (time.time() - task_start) * 1000
                    }
                elif task_type == "type":
                    results = self._expand_by_type(
                        params["institution_type"],
                        params["country"],
                        exclude_ghcids,
                        limit=self.k_expand
                    )
                    return {
                        "task_type": task_type,
                        "seed": seed,
                        "results": results,
                        "duration_ms": (time.time() - task_start) * 1000
                    }
            except Exception as e:
                logger.warning(f"Graph expansion task failed: {task_type} for {seed.ghcid}: {e}")
                return {
                    "task_type": task_type,
                    "seed": seed,
                    "results": [],
                    "duration_ms": (time.time() - task_start) * 1000,
                    "error": str(e)
                }
        # Run all tasks in parallel; results are merged in completion order.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(execute_expansion, task): task for task in tasks}
            for future in as_completed(futures):
                result = future.result()
                # execute_expansion falls off the end (returns None) only for
                # unknown task types.
                if result is None:
                    continue
                task_type = result["task_type"]
                seed = result["seed"]
                rows = result["results"]
                duration = result.get("duration_ms", 0)
                logger.debug(f"Graph expansion {task_type} for {seed.ghcid}: "
                             f"{len(rows)} results in {duration:.0f}ms")
                # Process results based on task type
                if task_type == "city":
                    for row in rows:
                        ghcid = row.get("ghcid", "")
                        if ghcid and ghcid not in seen_ghcids:
                            seen_ghcids.add(ghcid)
                            expanded.append(RetrievedInstitution(
                                ghcid=ghcid,
                                name=row.get("name", ""),
                                uri=row.get("s", ""),
                                graph_score=0.8,  # High score for same city
                                institution_type=row.get("type"),
                                expansion_reason="same_city",
                                related_institutions=[seed.ghcid]
                            ))
                elif task_type == "type":
                    for row in rows:
                        ghcid = row.get("ghcid", "")
                        if ghcid and ghcid not in seen_ghcids:
                            seen_ghcids.add(ghcid)
                            expanded.append(RetrievedInstitution(
                                ghcid=ghcid,
                                name=row.get("name", ""),
                                uri=row.get("s", ""),
                                graph_score=0.5,  # Medium score for same type
                                institution_type=seed.institution_type,
                                city=row.get("city"),
                                expansion_reason="same_type",
                                related_institutions=[seed.ghcid]
                            ))
        total_time = (time.time() - start_time) * 1000
        logger.info(f"Graph expansion completed: {len(tasks)} queries, "
                    f"{len(expanded)} results in {total_time:.0f}ms (parallel)")
        return expanded
    def _combine_and_rank(
        self,
        vector_results: list[RetrievedInstitution],
        graph_results: list[RetrievedInstitution],
        k: int
    ) -> list[RetrievedInstitution]:
        """Combine vector and graph results with weighted scoring and graph inheritance.

        This method implements a hybrid scoring approach:
        1. Direct merge: If a graph result matches a vector result (same GHCID),
           the graph_score is directly applied
        2. Graph inheritance: Vector results inherit a portion of graph scores from
           related institutions found via graph expansion (same city/type)

        Args:
            vector_results: Results from vector search
            graph_results: Results from graph expansion
            k: Number of final results

        Returns:
            Combined and ranked results (at most k), sorted by combined_score
            descending. Note: entries in results_by_ghcid are MUTATED in place
            (graph_score, related_institutions, expansion_reason).
        """
        # Debug logging for investigation
        logger.debug(f"Combining {len(vector_results)} vector + {len(graph_results)} graph results")
        # Create lookup by GHCID for merging
        results_by_ghcid: dict[str, RetrievedInstitution] = {}
        # Track which vector GHCIDs we have for inheritance
        vector_ghcids = set()
        # Add vector results
        for inst in vector_results:
            if inst.ghcid:
                results_by_ghcid[inst.ghcid] = inst
                vector_ghcids.add(inst.ghcid)
                logger.debug(f"  Vector: {inst.ghcid} ({inst.name[:30] if inst.name else '?'}...) "
                             f"v={inst.vector_score:.3f} g={inst.graph_score:.3f}")
        # Track direct merges and inheritance candidates
        direct_merges = 0
        inheritance_boosts = []
        # Merge graph results and build inheritance map
        # inheritance_map: vector_ghcid -> list of (related_ghcid, graph_score, reason)
        inheritance_map: dict[str, list[tuple[str, float, str]]] = {g: [] for g in vector_ghcids}
        for inst in graph_results:
            logger.debug(f"  Graph: {inst.ghcid} ({inst.name[:30] if inst.name else '?'}...) "
                         f"g={inst.graph_score:.3f} reason={inst.expansion_reason} "
                         f"related_to={inst.related_institutions}")
            if inst.ghcid in results_by_ghcid:
                # Direct merge: graph result matches existing vector result.
                # Keep the stronger graph score and accumulate relations.
                existing = results_by_ghcid[inst.ghcid]
                old_graph_score = existing.graph_score
                existing.graph_score = max(existing.graph_score, inst.graph_score)
                existing.related_institutions.extend(inst.related_institutions)
                if inst.expansion_reason:
                    existing.expansion_reason = inst.expansion_reason
                direct_merges += 1
                logger.debug(f"    -> Direct merge! {inst.ghcid} graph_score: {old_graph_score:.3f} -> {existing.graph_score:.3f}")
            else:
                # New institution from graph expansion
                results_by_ghcid[inst.ghcid] = inst
            # Build inheritance: this graph result was expanded FROM a vector result
            # The related_institutions field contains the seed GHCID(s) it was expanded from
            for seed_ghcid in inst.related_institutions:
                if seed_ghcid in inheritance_map:
                    inheritance_map[seed_ghcid].append(
                        (inst.ghcid, inst.graph_score, inst.expansion_reason or "related")
                    )
        logger.debug(f"Direct merges: {direct_merges}")
        # Apply graph score inheritance to vector results
        # Vector results inherit a portion of graph scores from their related institutions
        INHERITANCE_FACTOR = 0.5  # Inherit 50% of related institutions' graph scores
        for vector_ghcid, related_list in inheritance_map.items():
            if related_list and vector_ghcid in results_by_ghcid:
                inst = results_by_ghcid[vector_ghcid]
                # Calculate inherited score: average of related graph scores * inheritance factor
                related_scores = [score for _, score, _ in related_list]
                inherited_score = (sum(related_scores) / len(related_scores)) * INHERITANCE_FACTOR
                old_graph_score = inst.graph_score
                # Inherit: take max of current graph_score and inherited score
                inst.graph_score = max(inst.graph_score, inherited_score)
                if inst.graph_score > old_graph_score:
                    # Track related institutions for context
                    related_ghcids = [ghcid for ghcid, _, _ in related_list]
                    inst.related_institutions.extend(related_ghcids[:3])  # Add up to 3 related
                    inheritance_boosts.append({
                        "ghcid": vector_ghcid,
                        "name": inst.name,
                        "old_graph": old_graph_score,
                        "new_graph": inst.graph_score,
                        "inherited_from": len(related_list),
                        "reasons": list(set(r for _, _, r in related_list))
                    })
                    logger.debug(f"  Inheritance: {vector_ghcid} graph_score: {old_graph_score:.3f} -> "
                                 f"{inst.graph_score:.3f} (from {len(related_list)} related institutions)")
        if inheritance_boosts:
            logger.info(f"Graph inheritance applied to {len(inheritance_boosts)} vector results: "
                        f"{[b['ghcid'] for b in inheritance_boosts[:3]]}...")
        # Calculate combined scores as the configured weighted sum
        for inst in results_by_ghcid.values():
            inst.combined_score = (
                self.vector_weight * inst.vector_score +
                self.graph_weight * inst.graph_score
            )
        # Sort by combined score
        ranked = sorted(
            results_by_ghcid.values(),
            key=lambda x: x.combined_score,
            reverse=True
        )
        # Log top results for debugging
        logger.debug(f"Top {min(5, len(ranked))} combined results:")
        for i, inst in enumerate(ranked[:5]):
            logger.debug(f"  {i+1}. {inst.ghcid} ({inst.name[:25] if inst.name else '?'}...) "
                         f"combined={inst.combined_score:.3f} (v={inst.vector_score:.3f}, g={inst.graph_score:.3f})")
        return ranked[:k]
def _get_person_collection_vector_size(self) -> int | None:
"""Get the vector size of the person collection."""
try:
info = self.qdrant_client.get_collection("heritage_persons")
if hasattr(info.config.params, 'vectors'):
vectors_config = info.config.params.vectors
if isinstance(vectors_config, dict):
first_config = next(iter(vectors_config.values()), None)
return first_config.size if first_config else None
elif vectors_config is not None:
return vectors_config.size # type: ignore[union-attr]
return None
except Exception as e:
logger.warning(f"Could not get person collection vector size: {e}")
return None
def _person_vector_search(
    self,
    query: str,
    k: int,
    using: str | None = None,
    filter_conditions: dict[str, Any] | None = None,
) -> list[RetrievedPerson]:
    """Perform vector similarity search in Qdrant heritage_persons collection.

    Args:
        query: Search query text
        k: Number of results to retrieve
        using: Optional embedding model name (for multi-embedding mode)
        filter_conditions: Optional dict of field->value filters for Qdrant

    Returns:
        List of RetrievedPerson, sorted descending by combined_score
        (vector score adjusted by richness and name-match boosts).
        Returns an empty list if the Qdrant search fails.
    """
    from qdrant_client.http import models
    # Check person collection vector size and use appropriate model.
    # The query embedding's dimensionality must match the collection's,
    # otherwise Qdrant rejects the query. An explicit `using` always wins.
    person_vector_size = self._get_person_collection_vector_size()
    person_model = using
    if person_vector_size == 384 and not using:
        # Person collection uses MiniLM (384-dim), override model selection
        person_model = "minilm_384"
        logger.info(f"Person collection uses 384-dim vectors, using MiniLM model")
    elif person_vector_size == 1536 and not using:
        person_model = "openai_1536"
    elif person_vector_size == 768 and not using:
        person_model = "bge_768"
    query_vector = self._get_embedding(query, using=person_model)
    try:
        # Build query parameters
        search_params: dict[str, Any] = {
            "collection_name": "heritage_persons",
            "query": query_vector,
            "limit": k,
            "with_payload": True,
        }
        # Add named vector 'using' ONLY if collection actually has named vectors
        # Single-vector collections will error with "Not existing vector name" otherwise
        if self.use_multi_embedding and self.multi_retriever:
            uses_named = self.multi_retriever.uses_named_vectors("heritage_persons")
            if uses_named:
                if using:
                    search_params["using"] = using
                elif self._selected_multi_model:
                    search_params["using"] = self._selected_multi_model.value
            # else: single-vector collection, don't add 'using' parameter
        # Add schema-aware filters if provided; all conditions are AND-ed
        # together via Filter(must=...).
        if filter_conditions:
            filter_list = []
            for key, value in filter_conditions.items():
                # Handle advanced match filters (e.g. {"email": {"match": {"text": "nos"}}})
                if isinstance(value, dict) and "match" in value:
                    filter_list.append(
                        models.FieldCondition(
                            key=key,
                            match=models.MatchText(**value["match"])
                        )
                    )
                else:
                    # Standard exact match value
                    filter_list.append(
                        models.FieldCondition(
                            key=key,
                            match=models.MatchValue(value=value),
                        )
                    )
            search_params["query_filter"] = models.Filter(must=filter_list)
            logger.info(f"[Qdrant] Applied person filters: {filter_conditions}")
        logger.info(f"[Qdrant] Searching '{search_params['collection_name']}' with params: query_filter={filter_conditions}, limit={k}")
        results = self.qdrant_client.query_points(**search_params)
    except Exception as e:
        # Best-effort: a missing collection or dimension mismatch should not
        # crash the caller; person search simply yields no results.
        logger.warning(f"Person collection search failed: {e}")
        return []
    persons = []
    for point in results.points:
        payload = point.payload or {}
        # Extract richness score from payload (indexed by index_persons_qdrant.py)
        richness_score = payload.get("richness_score", 0.0)
        person = RetrievedPerson(
            # Fall back to a stable md5 of custodian_slug:name when the
            # payload carries no staff_id (keeps person_id deterministic).
            person_id=payload.get("staff_id", "") or hashlib.md5(
                f"{payload.get('custodian_slug', '')}:{payload.get('name', '')}".encode()
            ).hexdigest()[:16],
            name=payload.get("name", ""),
            vector_score=point.score,
            richness_score=richness_score,
            headline=payload.get("headline"),
            custodian_name=payload.get("custodian_name"),
            custodian_slug=payload.get("custodian_slug"),
            location=payload.get("location"),
            heritage_relevant=payload.get("heritage_relevant", False),
            heritage_type=payload.get("heritage_type"),
            source_type=payload.get("source_type"),
            linkedin_url=payload.get("linkedin_url"),
            has_wcms=payload.get("has_wcms", False),
            # WCMS-specific fields
            wcms_user_id=payload.get("wcms_user_id"),
            wcms_abs_id=payload.get("wcms_abs_id"),
            wcms_crm_id=payload.get("wcms_crm_id"),
            wcms_username=payload.get("wcms_username"),
            wcms_username_url=payload.get("wcms_username_url"),
            wcms_status=payload.get("wcms_status"),
            wcms_roles=payload.get("wcms_roles"),
            wcms_registered_since=payload.get("wcms_registered_since"),
            wcms_last_access=payload.get("wcms_last_access"),
            # Contact details
            email=payload.get("email"),
            email_domain=payload.get("email_domain"),
        )
        # Apply richness score boosting
        # Formula: combined_score = vector_score * (0.7 + 0.3 * richness_score)
        # - Profiles with richness_score=0 get 70% of vector score
        # - Profiles with richness_score=1 get 100% of vector score
        # This ensures rich profiles rank higher than sparse ones at similar similarity
        richness_boost = 0.7 + 0.3 * richness_score
        person.combined_score = person.vector_score * richness_boost
        # Apply name-matching boost for queries that look like person names
        # This ensures that searching for "Kitty Bogte" returns Kitty Bogte first,
        # even if vector similarity ranks other Dutch names higher
        if looks_like_person_name(query) and person.name:
            name_boost = calculate_name_match_boost(query, person.name)
            if name_boost > 1.0:
                logger.debug(f"Name match boost {name_boost}x for '{person.name}' (query: '{query}')")
                person.combined_score *= name_boost
        persons.append(person)
    # Re-sort by combined score after name boosting
    persons.sort(key=lambda p: p.combined_score, reverse=True)
    return persons
def search_persons(
    self,
    query: str,
    k: int | None = None,
    filter_custodian: str | None = None,
    only_heritage_relevant: bool = False,
    only_wcms: bool = False,
    using: str | None = None,
    # Schema-aware filter parameters (from DSPy HeritageQueryRouter)
    target_role_category: str | None = None,
    target_custodian_type: str | None = None,
    # Extra filters for robust domain search (e.g. email substring)
    extra_filters: dict[str, Any] | None = None,
) -> list[RetrievedPerson]:
    """Search for persons/staff in the heritage_persons collection.

    Args:
        query: Natural language search query
        k: Number of results to return (default: k_final)
        filter_custodian: Optional custodian slug to filter by
        only_heritage_relevant: Only return heritage-relevant staff
        only_wcms: Only return WCMS-registered profiles (heritage sector users)
        using: Optional embedding model name (for multi-embedding mode).
            One of: "openai_1536", "minilm_384", "bge_768"
        target_role_category: Role category from DSPy router (CURATORIAL, ARCHIVAL, etc.)
            Applied as a headline keyword post-filter (not indexed in Qdrant).
        target_custodian_type: Custodian type from DSPy router (MUSEUM, ARCHIVE, etc.)
            Mapped to a heritage_type code for Qdrant filtering.
        extra_filters: Optional extra Qdrant filters (e.g. {"email": {"match": {"text": "nos"}}})

    Returns:
        List of RetrievedPerson sorted by combined score, truncated to k.
    """
    limit = k or self.k_final
    # Translate schema-aware routing hints into Qdrant payload filters.
    conditions = build_schema_aware_person_filter(
        heritage_type_code=get_heritage_type_code(target_custodian_type),
        heritage_relevant_only=only_heritage_relevant,
        custodian_slug=filter_custodian,
        only_wcms=only_wcms,
    ) or {}
    if extra_filters:
        conditions.update(extra_filters)
    logger.info(f"Person search for: {query[:50]}... (model: {using or 'auto'}, role_category: {target_role_category}, custodian_type: {target_custodian_type}, extras: {extra_filters})")
    # Over-fetch so that post-filtering and name boosting still leave enough
    # candidates: 3x when a role-category keyword filter will run, 2x
    # otherwise, and at least 100 for name-like queries (vector similarity
    # often ranks similar-sounding names above the exact match).
    name_query = looks_like_person_name(query)
    multiplier = 3 if target_role_category else 2
    candidates = self._person_vector_search(
        query,
        max(limit * multiplier, 100 if name_query else 0),
        using=using,
        filter_conditions=conditions if conditions else None,
    )
    logger.info(f"Found {len(candidates)} person results after Qdrant filtering")
    # Role category is not indexed in Qdrant; filter by headline keywords.
    if target_role_category:
        candidates = filter_by_role_category_keywords(candidates, target_role_category)
    candidates.sort(key=lambda p: p.combined_score, reverse=True)
    return candidates[:limit]
def search(
    self,
    query: str,
    k: int | None = None,
    expand_graph: bool = True,
    filter_conditions: dict[str, Any] | None = None,
    auto_route: bool = True,
    using: str | None = None,
    region_codes: list[str] | None = None,
    cities: list[str] | None = None,
    institution_types: list[str] | None = None,
) -> list[RetrievedInstitution] | list[RetrievedPerson]:
    """Perform hybrid vector + graph search with automatic query routing.

    When auto_route is enabled, person-style queries (e.g. "Who works at
    Rijksmuseum?") are detected and dispatched to :meth:`search_persons`;
    everything else goes through the institution pipeline of vector
    search, optional graph expansion, and score fusion.

    Args:
        query: Natural language search query
        k: Number of results to return (default: k_final)
        expand_graph: Whether to perform graph expansion (institution search only)
        filter_conditions: Optional Qdrant filter conditions (legacy, prefer new params)
        auto_route: Automatically detect and route person queries
        using: Optional embedding model name (for multi-embedding mode).
            One of: "openai_1536", "minilm_384", "bge_768"
        region_codes: Optional list of ISO 3166-2 region codes (e.g., ["NH", "ZH"])
        cities: Optional list of city names (e.g., ["Amsterdam", "Rotterdam"])
        institution_types: Optional list of institution types (e.g., ["ARCHIVE", "MUSEUM"])

    Returns:
        List of RetrievedInstitution or RetrievedPerson with combined scores
    """
    limit = k or self.k_final
    # Route obvious person queries to the person index.
    if auto_route and detect_query_type(query) == "person":
        logger.info(f"Auto-routing to person search for: {query[:50]}...")
        return self.search_persons(query, k=limit, using=using)
    # Institution search (original behavior)
    active_filters = [
        text
        for text, present in (
            (f"regions={region_codes}", region_codes),
            (f"cities={cities}", cities),
            (f"types={institution_types}", institution_types),
        )
        if present
    ]
    filter_str = f" [{', '.join(active_filters)}]" if active_filters else ""
    logger.info(f"Vector search for: {query[:50]}...{filter_str} (model: {using or 'auto'})")
    vector_results = self._vector_search(
        query,
        self.k_vector,
        using=using,
        region_codes=region_codes,
        cities=cities,
        institution_types=institution_types,
    )
    logger.info(f"Found {len(vector_results)} vector results")
    # Step 2: optional expansion over the SPARQL knowledge graph.
    graph_results = []
    if expand_graph and vector_results:
        logger.info("Expanding via knowledge graph...")
        graph_results = self._graph_expand(vector_results)
        logger.info(f"Found {len(graph_results)} graph expansion results")
    # Step 3: fuse vector and graph scores into the final ranking.
    final_results = self._combine_and_rank(vector_results, graph_results, limit)
    logger.info(f"Returning {len(final_results)} combined results")
    return final_results
def search_institutions(
    self,
    query: str,
    k: int | None = None,
    expand_graph: bool = True,
    filter_conditions: dict[str, Any] | None = None,
    using: str | None = None,
    region_codes: list[str] | None = None,
    cities: list[str] | None = None,
    institution_types: list[str] | None = None,
) -> list[RetrievedInstitution]:
    """Explicit institution search (bypasses auto-routing).

    Thin wrapper around :meth:`search` with auto_route pinned to False so
    the caller is guaranteed RetrievedInstitution results, never persons.

    Args:
        query: Natural language search query
        k: Number of results to return (default: k_final)
        expand_graph: Whether to perform graph expansion
        filter_conditions: Optional Qdrant filter conditions (legacy, prefer new params)
        using: Optional embedding model name (for multi-embedding mode).
            One of: "openai_1536", "minilm_384", "bge_768"
        region_codes: Optional list of ISO 3166-2 region codes (e.g., ["NH", "ZH"])
        cities: Optional list of city names (e.g., ["Amsterdam", "Rotterdam"])
        institution_types: Optional list of institution types (e.g., ["ARCHIVE", "MUSEUM"])

    Returns:
        List of RetrievedInstitution with combined scores
    """
    return self.search(  # type: ignore[return-value]
        query,
        k=k,
        expand_graph=expand_graph,
        filter_conditions=filter_conditions,
        auto_route=False,  # guarantees institution results
        using=using,
        region_codes=region_codes,
        cities=cities,
        institution_types=institution_types,
    )
def __call__(self, query: str, k: int | None = None) -> list[str]:
    """DSPy-compatible interface returning passage texts.

    Runs the auto-routed :meth:`search` and flattens each hit (person or
    institution) into a single descriptive string.

    Args:
        query: Search query
        k: Number of results

    Returns:
        List of passage texts (institution/person descriptions)
    """
    passages: list[str] = []
    for hit in self.search(query, k=k):
        if isinstance(hit, RetrievedPerson):
            # Person hit: "Name (role at organization)"
            org = f" at {hit.custodian_name}" if hit.custodian_name else ""
            role = hit.headline or "Unknown role"
            passages.append(f"{hit.name} ({role}{org})")
        else:
            # Institution hit: "Name (type) - description"
            inst_type = hit.institution_type or "Unknown type"
            desc = hit.description or "No description"
            passages.append(f"{hit.name} ({inst_type}) - {desc}")
    return passages
def get_stats(self) -> dict[str, Any]:
    """Get retriever statistics.

    Collects point counts and status for both Qdrant collections and a
    custodian count from Oxigraph; each backend failure is recorded as an
    "error" entry instead of raising.

    Returns:
        Dict with Qdrant and Oxigraph stats
    """
    stats: dict[str, Any] = {
        "qdrant": {
            "institutions": {},
            "persons": {},
        },
        "oxigraph": {},
        "config": {
            "vector_weight": self.vector_weight,
            "graph_weight": self.graph_weight,
            "k_vector": self.k_vector,
            "k_expand": self.k_expand,
            "k_final": self.k_final
        }
    }

    def _collection_stats(name):
        # Shared shape for both Qdrant collection summaries.
        info = self.qdrant_client.get_collection(name)
        return {
            "collection": name,
            "points_count": info.points_count,
            "status": info.status.value if info.status else "unknown"
        }

    # Qdrant institution collection stats
    try:
        stats["qdrant"]["institutions"] = _collection_stats(self.collection_name)
    except Exception as e:
        stats["qdrant"]["institutions"]["error"] = str(e)
    # Qdrant person collection stats
    try:
        stats["qdrant"]["persons"] = _collection_stats("heritage_persons")
    except Exception as e:
        stats["qdrant"]["persons"]["error"] = str(e)
    # Oxigraph stats
    try:
        rows = self.sparql_client.query(
            "SELECT (COUNT(DISTINCT ?s) as ?count) WHERE { ?s a hcc:Custodian }"
        )
        if rows:
            stats["oxigraph"]["custodian_count"] = int(rows[0].get("count", 0))
    except Exception as e:
        stats["oxigraph"]["error"] = str(e)
    return stats
def close(self):
    """Clean up resources.

    Closes the SPARQL client and, if one was created, the Qdrant client.
    The Qdrant close runs in a ``finally`` block so that an exception from
    the SPARQL client cannot leak the Qdrant connection (the original code
    skipped it on failure).
    """
    try:
        self.sparql_client.close()
    finally:
        if self._qdrant_client:
            self._qdrant_client.close()
def create_hybrid_retriever(
    use_production: bool = False,
    **kwargs
) -> HybridRetriever:
    """Factory function to create a hybrid retriever.

    Selects connection settings for either the production endpoints or the
    local/dev endpoints (configurable via QDRANT_HOST, QDRANT_PORT, and
    SPARQL_ENDPOINT environment variables), then constructs the retriever.

    Args:
        use_production: If True, connect to production endpoints
        **kwargs: Additional arguments for HybridRetriever

    Returns:
        Configured HybridRetriever instance
    """
    if use_production:
        endpoint_kwargs = dict(
            qdrant_host="bronhouder.nl",
            qdrant_port=443,
            sparql_endpoint="https://bronhouder.nl/sparql",
            use_production_qdrant=True,
        )
    else:
        endpoint_kwargs = dict(
            qdrant_host=os.getenv("QDRANT_HOST", "localhost"),
            qdrant_port=int(os.getenv("QDRANT_PORT", "6333")),
            sparql_endpoint=os.getenv("SPARQL_ENDPOINT", "http://localhost:7878/query"),
        )
    return HybridRetriever(**endpoint_kwargs, **kwargs)