- Introduced `has_api_version`, `has_appellation_language`, `has_appellation_type`, `has_appellation_value`, `has_applicable_country`, `has_application_deadline`, `has_application_opening_date`, `has_appraisal_note`, `has_approval_date`, `has_archdiocese_name`, `has_architectural_style`, `has_archival_reference`, `has_archive_description`, `has_archive_memento_uri`, `has_archive_name`, `has_archive_path`, `has_archive_search_score`, `has_arrangement`, `has_arrangement_level`, `has_arrangement_note`, `has_articles_archival_stage`, `has_articles_document_format`, `has_articles_document_url`, `has_articles_of_association`, `has_or_had_altitude`, `has_or_had_annotation`, `has_or_had_arrangement`, `has_or_had_document`, `has_or_had_reason`, `has_or_had_style`, `is_or_was_amended_through`, `is_or_was_approved_on`, `is_or_was_archived_as`, `is_or_was_due_on`, `is_or_was_opened_on`, and `is_or_was_used_in` slots. - Each slot includes detailed descriptions, range specifications, and appropriate mappings to existing ontologies.
2534 lines
100 KiB
Python
2534 lines
100 KiB
Python
"""
|
||
Hybrid Retriever: Vector Search + Knowledge Graph Expansion
|
||
|
||
Combines Qdrant vector similarity search with Oxigraph SPARQL graph expansion
|
||
to provide semantically-aware and structurally-enriched retrieval.
|
||
|
||
Architecture:
|
||
1. Vector Search (Qdrant) - Find semantically similar institutions AND persons
|
||
2. Graph Expansion (Oxigraph) - Expand via relationships:
|
||
- Same city/region
|
||
- Same institution type
|
||
- Related collections
|
||
- Organizational relationships
|
||
3. Re-ranking - Combine scores for final ranking
|
||
4. Query Routing - Detect if query is about institutions or persons
|
||
|
||
Collections:
|
||
- heritage_custodians: Institution data (27K+ records)
|
||
- heritage_persons: Staff/person data (10K+ records)
|
||
|
||
Example usage:
|
||
retriever = HybridRetriever(
|
||
qdrant_host="localhost",
|
||
qdrant_port=6333,
|
||
sparql_endpoint="http://localhost:7878/query"
|
||
)
|
||
|
||
# Institution search
|
||
results = retriever.search("museums with Dutch colonial history")
|
||
|
||
# Person search (auto-detected or explicit)
|
||
results = retriever.search("Who works at the Nationaal Archief?")
|
||
results = retriever.search_persons("archivist at Rijksmuseum")
|
||
"""
|
||
|
||
import hashlib
|
||
import logging
|
||
import os
|
||
import time
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from dataclasses import dataclass, field
|
||
from typing import Any, TYPE_CHECKING
|
||
|
||
import httpx
|
||
|
||
# Polygon filter for geographic containment testing (Dutch provinces)
|
||
from glam_extractor.geocoding.polygon_filter import (
|
||
get_polygon_filter,
|
||
ProvincePolygonFilter,
|
||
)
|
||
|
||
if TYPE_CHECKING:
|
||
from qdrant_client import QdrantClient
|
||
from openai import OpenAI
|
||
from sentence_transformers import SentenceTransformer
|
||
# Forward reference as string to avoid circular imports
|
||
MultiEmbeddingRetriever = Any # Actually from glam_extractor.api.multi_embedding_retriever
|
||
EmbeddingModel = Any # Actually from glam_extractor.api.multi_embedding_retriever
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# SPARQL endpoint configuration
|
||
DEFAULT_SPARQL_ENDPOINT = os.getenv("SPARQL_ENDPOINT", "http://localhost:7878/query")
|
||
DEFAULT_SPARQL_TIMEOUT = 30.0
|
||
|
||
# Ontology prefixes used in Oxigraph
|
||
SPARQL_PREFIXES = """
|
||
PREFIX hc: <https://nde.nl/ontology/hc/>
|
||
PREFIX hcc: <https://nde.nl/ontology/hc/class/>
|
||
PREFIX ghc: <https://w3id.org/heritage/custodian/>
|
||
PREFIX skos: <http://www.w3.org/2004/02/skos/core#>
|
||
PREFIX wdt: <http://www.wikidata.org/prop/direct/>
|
||
PREFIX wd: <http://www.wikidata.org/entity/>
|
||
PREFIX schema: <http://schema.org/>
|
||
PREFIX geo: <http://www.w3.org/2003/01/geo/wgs84_pos#>
|
||
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
|
||
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
|
||
"""
|
||
|
||
|
||
@dataclass
|
||
class RetrievedInstitution:
|
||
"""A retrieved heritage institution with combined scores."""
|
||
|
||
ghcid: str
|
||
name: str
|
||
uri: str
|
||
vector_score: float = 0.0
|
||
graph_score: float = 0.0
|
||
combined_score: float = 0.0
|
||
|
||
# Metadata from vector search
|
||
institution_type: str | None = None
|
||
country: str | None = None
|
||
city: str | None = None
|
||
description: str | None = None
|
||
|
||
# Geographic coordinates
|
||
latitude: float | None = None
|
||
longitude: float | None = None
|
||
|
||
# Graph expansion data
|
||
related_institutions: list[str] = field(default_factory=list)
|
||
expansion_reason: str | None = None # e.g., "same_city", "same_type", "related_collection"
|
||
|
||
def to_dict(self) -> dict[str, Any]:
|
||
"""Convert to dictionary for API responses."""
|
||
return {
|
||
"ghcid": self.ghcid,
|
||
"name": self.name,
|
||
"uri": self.uri,
|
||
"scores": {
|
||
"vector": round(self.vector_score, 4),
|
||
"graph": round(self.graph_score, 4),
|
||
"combined": round(self.combined_score, 4),
|
||
},
|
||
"metadata": {
|
||
"institution_type": self.institution_type,
|
||
"country": self.country,
|
||
"city": self.city,
|
||
"description": self.description,
|
||
"latitude": self.latitude,
|
||
"longitude": self.longitude,
|
||
},
|
||
"graph_expansion": {
|
||
"related_institutions": self.related_institutions,
|
||
"expansion_reason": self.expansion_reason,
|
||
}
|
||
}
|
||
|
||
|
||
# ===================================================================
|
||
# Linked Data URI Generation Utilities
|
||
# ===================================================================
|
||
# Generate stable ontology-aligned URIs for Person and PersonObservation
|
||
# following the LinkML schema at schemas/20251121/linkml/
|
||
# Namespace: https://nde.nl/ontology/hc/
|
||
# ===================================================================
|
||
|
||
import re
|
||
import unicodedata
|
||
|
||
# Ontology namespaces
|
||
ONTOLOGY_BASE = "https://nde.nl/ontology/hc"
|
||
PERSON_HUB_PREFIX = f"{ONTOLOGY_BASE}/person"
|
||
PERSON_OBS_PREFIX = f"{ONTOLOGY_BASE}/person-obs"
|
||
CUSTODIAN_PREFIX = f"{ONTOLOGY_BASE}/custodian"
|
||
|
||
# JSON-LD context for person search responses
|
||
PERSON_JSONLD_CONTEXT = {
|
||
"@vocab": f"{ONTOLOGY_BASE}/",
|
||
"schema": "http://schema.org/",
|
||
"pico": "https://personsincontext.org/model#",
|
||
"prov": "http://www.w3.org/ns/prov#",
|
||
"foaf": "http://xmlns.com/foaf/0.1/",
|
||
"name": "schema:name",
|
||
"jobTitle": "schema:jobTitle",
|
||
"affiliation": "schema:affiliation",
|
||
"sameAs": "schema:sameAs",
|
||
"refers_to_person": "pico:observationOf",
|
||
"observation_source": "prov:hadPrimarySource",
|
||
}
|
||
|
||
|
||
def generate_slug(text: str) -> str:
    """Generate URL-safe slug from text.

    Diacritics are stripped, the result is lowercased, and any run of
    non-alphanumeric characters becomes a single hyphen.

    Examples:
        "Kitty Bogte" → "kitty-bogte"
        "Dr. Jane Smith" → "dr-jane-smith"
        "Taco Dibbits" → "taco-dibbits"
    """
    if not text:
        return "unknown"

    # NFD decomposition splits accented characters into base + combining mark;
    # dropping category 'Mn' (nonspacing marks) removes the diacritics.
    decomposed = unicodedata.normalize('NFD', text)
    without_marks = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')

    # Lowercase, then collapse every non [a-z0-9] run into a hyphen.
    hyphenated = re.sub(r'[^a-z0-9]+', '-', without_marks.lower())

    # Squash repeated hyphens and trim them from both ends.
    cleaned = re.sub(r'-+', '-', hyphenated).strip('-')

    return cleaned or "unknown"
def generate_role_slug(headline: str | None) -> str:
    """Derive a URL-safe role slug from a job title/headline.

    Falls back to the generic slug "staff" when no headline is given.

    Examples:
        "Programmer/curator" → "programmer-curator"
        "Senior Archivist" → "senior-archivist"
        None → "staff"
    """
    return generate_slug(headline) if headline else "staff"
def generate_person_hub_uri(name: str, linkedin_slug: str | None = None) -> str:
    """Build the Person hub URI (abstract identity).

    Format: https://nde.nl/ontology/hc/person/{person-slug}

    The LinkedIn slug is preferred when available (more stable across
    name changes); otherwise the slug is derived from the name.

    Examples:
        generate_person_hub_uri("Kitty Bogte", "kittybogte")
            → "https://nde.nl/ontology/hc/person/kittybogte"
        generate_person_hub_uri("Dr. Jane Smith")
            → "https://nde.nl/ontology/hc/person/dr-jane-smith"
    """
    chosen_slug = linkedin_slug if linkedin_slug else generate_slug(name)
    return f"{PERSON_HUB_PREFIX}/{chosen_slug}"
def generate_observation_uri(
    custodian_slug: str | None,
    person_name: str,
    role_slug: str | None = None,
    linkedin_slug: str | None = None
) -> str:
    """Build a PersonObservation URI.

    Format: https://nde.nl/ontology/hc/person-obs/{custodian-slug}/{person-slug}/{role-slug}

    Missing segments fall back to "unknown-custodian", a name-derived
    slug, and "staff" respectively.

    Example:
        generate_observation_uri("nl-ga-nationaal-archief", "Kitty Bogte", "programmer-curator")
            → "https://nde.nl/ontology/hc/person-obs/nl-ga-nationaal-archief/kitty-bogte/programmer-curator"
    """
    segments = [
        PERSON_OBS_PREFIX,
        custodian_slug or "unknown-custodian",
        linkedin_slug or generate_slug(person_name),
        role_slug or "staff",
    ]
    return "/".join(segments)
def generate_custodian_uri(custodian_slug: str | None, ghcid: str | None = None) -> str | None:
    """Build a Custodian URI, preferring the GHCID over the slug.

    Format: https://nde.nl/ontology/hc/custodian/{ghcid-or-slug}

    Returns None when neither identifier is available (or both are empty).
    """
    identifier = ghcid or custodian_slug
    if not identifier:
        return None
    return f"{CUSTODIAN_PREFIX}/{identifier}"
def extract_linkedin_slug(linkedin_url: str | None) -> str | None:
|
||
"""Extract slug from LinkedIn URL.
|
||
|
||
Examples:
|
||
"https://www.linkedin.com/in/kittybogte" → "kittybogte"
|
||
"https://linkedin.com/in/jane-smith-12345" → "jane-smith-12345"
|
||
"""
|
||
if not linkedin_url:
|
||
return None
|
||
|
||
match = re.search(r'linkedin\.com/in/([^/?]+)', linkedin_url)
|
||
return match.group(1) if match else None
|
||
|
||
|
||
@dataclass
class RetrievedPerson:
    """A retrieved person/staff member with search scores and linked data URIs."""

    # Identifier of the person record in the vector store.
    person_id: str
    name: str
    vector_score: float = 0.0
    combined_score: float = 0.0
    richness_score: float = 0.0  # Metadata richness score (0-1)

    # Metadata from vector search
    headline: str | None = None  # Job title/role
    custodian_name: str | None = None  # Organization they work at
    custodian_slug: str | None = None
    location: str | None = None
    heritage_relevant: bool = False
    heritage_type: str | None = None  # GLAMORCUBESFIXPHDNT code
    source_type: str | None = None  # "staff_list" or "entity_profile"
    linkedin_url: str | None = None
    has_wcms: bool = False  # WCMS-registered profile (heritage sector user)

    # WCMS-specific fields for display on review page
    wcms_user_id: str | None = None
    wcms_abs_id: str | None = None  # NAN identifier
    wcms_crm_id: str | None = None
    wcms_username: str | None = None
    wcms_username_url: str | None = None
    wcms_status: str | None = None  # "Active" or "Blocked"
    wcms_roles: list[str] | None = None
    wcms_registered_since: str | None = None
    wcms_last_access: str | None = None

    # Contact details
    email: str | None = None
    email_domain: str | None = None

    # Linked data fields (generated)
    linkedin_profile_path: str | None = None  # Path to entity JSON file

    @property
    def linkedin_slug(self) -> str | None:
        """Extract LinkedIn slug from URL (None when no URL is set)."""
        return extract_linkedin_slug(self.linkedin_url)

    @property
    def person_hub_uri(self) -> str:
        """Generate Person hub URI (abstract identity).

        Prefers the LinkedIn slug for stability, falling back to a
        name-derived slug inside generate_person_hub_uri().
        """
        return generate_person_hub_uri(self.name, self.linkedin_slug)

    @property
    def observation_uri(self) -> str:
        """Generate PersonObservation URI."""
        # Role slug defaults to "staff" when there is no headline.
        role_slug = generate_role_slug(self.headline)
        return generate_observation_uri(
            self.custodian_slug,
            self.name,
            role_slug,
            self.linkedin_slug
        )

    @property
    def custodian_uri(self) -> str | None:
        """Generate Custodian URI.

        Only the slug is passed here; the ghcid-based form of
        generate_custodian_uri() is not used for persons.
        """
        return generate_custodian_uri(self.custodian_slug)

    def to_dict(self, include_jsonld: bool = True) -> dict[str, Any]:
        """Convert to dictionary for API responses.

        Args:
            include_jsonld: If True, include JSON-LD linked data fields (@id, @type, etc.)

        Returns:
            Dict with "person_id", "name", nested "scores" (rounded to 4
            decimals) and "metadata"; plus JSON-LD fields when requested.
        """
        result = {
            "person_id": self.person_id,
            "name": self.name,
            "scores": {
                "vector": round(self.vector_score, 4),
                "combined": round(self.combined_score, 4),
                "richness": round(self.richness_score, 4),
            },
            "metadata": {
                "headline": self.headline,
                "custodian_name": self.custodian_name,
                "custodian_slug": self.custodian_slug,
                "location": self.location,
                "heritage_relevant": self.heritage_relevant,
                "heritage_type": self.heritage_type,
                "source_type": self.source_type,
                "linkedin_url": self.linkedin_url,
                "has_wcms": self.has_wcms,
                # WCMS fields for review page
                "wcms_user_id": self.wcms_user_id,
                "wcms_abs_id": self.wcms_abs_id,
                "wcms_crm_id": self.wcms_crm_id,
                "wcms_username": self.wcms_username,
                "wcms_username_url": self.wcms_username_url,
                "wcms_status": self.wcms_status,
                "wcms_roles": self.wcms_roles,
                "wcms_registered_since": self.wcms_registered_since,
                "wcms_last_access": self.wcms_last_access,
                # Contact details
                "email": self.email,
                "email_domain": self.email_domain,
            }
        }

        if include_jsonld:
            # Add JSON-LD linked data fields
            result["@id"] = self.observation_uri
            result["@type"] = "pico:PersonObservation"
            result["refers_to_person"] = self.person_hub_uri

            # Add custodian affiliation if available
            if self.custodian_uri:
                result["unit_affiliation"] = self.custodian_uri

            # Add schema:sameAs for LinkedIn URL
            if self.linkedin_url:
                result["schema:sameAs"] = self.linkedin_url

            # Add linkedin_profile_path if available
            if self.linkedin_profile_path:
                result["linkedin_profile_path"] = self.linkedin_profile_path

        return result
# Query type detection patterns
|
||
PERSON_QUERY_PATTERNS = [
|
||
# Dutch
|
||
"wie werkt", "wie werk", "werken in", "werken bij", "medewerker", "personeel",
|
||
"staff", "werknemer", "expert", "experts", "specialist", "specialisten",
|
||
"directeur", "curator", "archivaris", "bibliothecaris", "conservator",
|
||
"team", "collega", "collegas", "mensen bij", "werkzaam",
|
||
# English
|
||
"who works", "staff at", "employees", "team at", "people at", "work at",
|
||
"director of", "curator at", "archivist", "librarian", "works at",
|
||
"experts at", "specialists", "professionals at",
|
||
# Generic
|
||
"linkedin", "person", "professional",
|
||
]
|
||
|
||
# ===================================================================
|
||
# Dutch Province/Subdivision Code Mapping (ISO 3166-2:NL)
|
||
# ===================================================================
|
||
# Maps province names (lowercase, various spellings) to ISO 3166-2 codes
|
||
# Used for filtering Qdrant queries by region
|
||
# Qdrant payload field: "region" (stores short codes like "NH", "ZH")
|
||
# ===================================================================
|
||
|
||
DUTCH_PROVINCE_CODES: dict[str, str] = {
|
||
# Noord-Holland
|
||
"noord-holland": "NH",
|
||
"noordholland": "NH",
|
||
"north holland": "NH",
|
||
"north-holland": "NH",
|
||
# Zuid-Holland
|
||
"zuid-holland": "ZH",
|
||
"zuidholland": "ZH",
|
||
"south holland": "ZH",
|
||
"south-holland": "ZH",
|
||
# Utrecht
|
||
"utrecht": "UT",
|
||
# Gelderland
|
||
"gelderland": "GE",
|
||
# Noord-Brabant
|
||
"noord-brabant": "NB",
|
||
"noordbrabant": "NB",
|
||
"brabant": "NB",
|
||
"north brabant": "NB",
|
||
# Limburg
|
||
"limburg": "LI",
|
||
# Overijssel
|
||
"overijssel": "OV",
|
||
# Friesland / Fryslân
|
||
"friesland": "FR",
|
||
"fryslân": "FR",
|
||
"fryslan": "FR",
|
||
# Groningen
|
||
"groningen": "GR",
|
||
# Drenthe
|
||
"drenthe": "DR",
|
||
# Flevoland
|
||
"flevoland": "FL",
|
||
# Zeeland
|
||
"zeeland": "ZE",
|
||
}
|
||
|
||
|
||
def get_province_code(province_name: str | None) -> str | None:
|
||
"""Convert Dutch province name to ISO 3166-2 subdivision code (without country prefix).
|
||
|
||
Args:
|
||
province_name: Province name in Dutch or English (case-insensitive)
|
||
|
||
Returns:
|
||
Two-letter province code (e.g., "NH", "ZH") or None if not found
|
||
|
||
Example:
|
||
>>> get_province_code("Noord-Holland")
|
||
'NH'
|
||
>>> get_province_code("south holland")
|
||
'ZH'
|
||
>>> get_province_code("Bavaria")
|
||
None
|
||
"""
|
||
if not province_name:
|
||
return None
|
||
return DUTCH_PROVINCE_CODES.get(province_name.lower().strip())
|
||
|
||
def looks_like_person_name(query: str) -> bool:
    """Heuristically decide whether a query is a bare person name.

    A query counts as a person name when it:
    - Contains 2-4 words, nearly all capitalized (first/last name pattern)
    - Does NOT contain question words, role keywords, surrounding
      prepositions, or punctuation typical of descriptive queries

    Args:
        query: Search query string

    Returns:
        True if query appears to be a person name

    Examples:
        >>> looks_like_person_name("Kitty Bogte")
        True
        >>> looks_like_person_name("Who works at the Rijksmuseum?")
        False
        >>> looks_like_person_name("archivist at Nationaal Archief")
        False
    """
    # Substrings whose presence rules out a bare-name query.
    non_name_indicators = [
        # Question words
        "who", "what", "where", "which", "how", "why",
        "wie", "wat", "waar", "welk", "hoe", "waarom",
        # Role/job indicators
        "works at", "working at", "werkt bij", "werkzaam",
        "archivist", "curator", "director", "librarian",
        "archivaris", "directeur", "bibliothecaris",
        # Prepositions indicating context
        " at ", " in ", " of ", " for ", " the ",
        " bij ", " in ", " van ", " voor ", " de ", " het ",
        # Punctuation that indicates non-name queries
        "?", "!",
    ]

    lowered = query.lower()
    if any(marker in lowered for marker in non_name_indicators):
        return False

    # Names are typically 2-4 words long.
    tokens = query.strip().split()
    if not (2 <= len(tokens) <= 4):
        return False

    # Count purely alphabetic, capitalized words (typical name components).
    cap_words = sum(1 for token in tokens if token[0].isupper() and token.isalpha())

    # Tolerate a single lowercase particle such as "van" or "de".
    return cap_words >= len(tokens) - 1
def calculate_name_match_boost(query: str, name: str) -> float:
    """Calculate a score boost for name matching.

    Uses case-insensitive substring matching to boost results where
    the query matches part or all of the person's name.

    Args:
        query: Search query (potential name)
        name: Person's name from search result

    Returns:
        Boost factor (1.0 = no boost, >1.0 = boosted):
        - 3.0: Exact match (case-insensitive)
        - 2.5: Query contains full name or name contains full query
        - (1.0, 2.0]: Partial word overlap; 1.0 + shared_words / larger_word_count
          (NOT a flat 2.0 — e.g. one shared word of three gives ~1.33)
        - 1.0: No match
    """
    query_lower = query.lower().strip()
    name_lower = name.lower().strip()

    # Guard: an empty query or name is never a meaningful match.
    # Without this, "" would hit the substring tier ("" is a substring of
    # everything) and receive a 2.5x boost.
    if not query_lower or not name_lower:
        return 1.0

    # Exact match
    if query_lower == name_lower:
        return 3.0

    # Query is substring of name or vice versa
    if query_lower in name_lower or name_lower in query_lower:
        return 2.5

    # Check for partial matches (first or last name)
    query_parts = set(query_lower.split())
    name_parts = set(name_lower.split())

    # How many query parts match name parts?
    matching_parts = query_parts & name_parts
    if matching_parts:
        # More matching parts = higher boost; ratio keeps this tier
        # strictly below the 2.5 substring tier.
        match_ratio = len(matching_parts) / max(len(query_parts), len(name_parts))
        return 1.0 + match_ratio

    return 1.0  # No boost
def detect_query_type(query: str, dspy_entity_type: str | None = None) -> str:
|
||
"""Detect if query is about institutions or persons.
|
||
|
||
Uses DSPy LLM classification if provided, falls back to keyword heuristics.
|
||
|
||
Args:
|
||
query: Search query string
|
||
dspy_entity_type: Optional entity_type from DSPy HeritageQueryRouter
|
||
("person", "institution", or "both")
|
||
|
||
Returns:
|
||
"person" or "institution"
|
||
"""
|
||
# Prefer DSPy semantic classification when available
|
||
if dspy_entity_type:
|
||
if dspy_entity_type in ("person", "both"):
|
||
return "person"
|
||
if dspy_entity_type == "institution":
|
||
return "institution"
|
||
|
||
# Fallback to keyword heuristics
|
||
query_lower = query.lower()
|
||
|
||
for pattern in PERSON_QUERY_PATTERNS:
|
||
if pattern in query_lower:
|
||
return "person"
|
||
|
||
return "institution"
|
||
|
||
|
||
# ===================================================================
# Schema-Aware Filter Mapping for DSPy Heritage Query Router
# ===================================================================
#
# These mappings are now loaded DYNAMICALLY from the LinkML schema files
# via the ontology_mapping module. This ensures:
# 1. Schema is the single source of truth (no hardcoded values)
# 2. Multilingual support (Dutch, German, French, Spanish, etc.)
# 3. Automatic updates when schema changes
#
# The ontology_mapping module extracts synonyms from YAML comments
# and provides fuzzy matching for natural language queries.
# ===================================================================

def _get_custodian_type_mapping() -> dict[str, str]:
    """Get custodian type to heritage code mapping from schema.

    Dynamically loads from CustodianPrimaryTypeEnum in LinkML schema.
    Falls back to minimal hardcoded mapping if schema unavailable
    (also when the loaded mapping is empty/falsy).

    Returns:
        Dict mapping custodian type (e.g., "MUSEUM") to heritage code (e.g., "M")
    """
    try:
        # Try backend.rag path first (when backend is in Python path)
        from backend.rag.ontology_mapping import get_custodian_type_mapping
        mapping = get_custodian_type_mapping()
        if mapping:
            return mapping
    except ImportError:
        try:
            # Fallback: try direct import (when ontology_mapping is in sys.path)
            from ontology_mapping import get_custodian_type_mapping  # type: ignore[import-not-found]
            mapping = get_custodian_type_mapping()
            if mapping:
                return mapping
        except ImportError:
            logger.warning("ontology_mapping not available, using fallback mapping")
        # NOTE(review): a non-ImportError raised inside this fallback branch
        # propagates to the caller — the sibling `except Exception` below
        # belongs to the outer try and does not cover this handler.
    except Exception as e:
        logger.warning(f"Failed to load custodian type mapping from schema: {e}")

    # Fallback: minimal GLAMORCUBESFIXPHDNT mapping
    return {
        "GALLERY": "G", "LIBRARY": "L", "ARCHIVE": "A", "MUSEUM": "M",
        "OFFICIAL_INSTITUTION": "O", "RESEARCH_CENTER": "R", "CORPORATION": "C",
        "UNKNOWN": "U", "BIO_CUSTODIAN": "B", "EDUCATION_PROVIDER": "E",
        "COLLECTING_SOCIETY": "S", "FEATURE": "F", "INTANGIBLE_HERITAGE_GROUP": "I",
        "MIXED": "X", "PERSONAL_COLLECTION": "P", "HOLY_SITE": "H",
        "DIGITAL_PLATFORM": "D", "NGO": "N", "TASTE_SMELL_HERITAGE": "T",
    }
def _get_role_category_keywords() -> dict[str, list[str]]:
    """Get role category keywords from schema.

    Dynamically loads from RoleCategoryEnum in LinkML schema.
    Falls back to hardcoded keywords if schema unavailable
    (also when the loaded mapping is empty/falsy).

    Returns:
        Dict mapping role category (e.g., "CURATORIAL") to keywords list
    """
    try:
        # Try backend.rag path first (when backend is in Python path)
        from backend.rag.ontology_mapping import get_role_keywords
        keywords = get_role_keywords()
        if keywords:
            return keywords
    except ImportError:
        try:
            # Fallback: try direct import (when ontology_mapping is in sys.path)
            from ontology_mapping import get_role_keywords  # type: ignore[import-not-found]
            keywords = get_role_keywords()
            if keywords:
                return keywords
        except ImportError:
            logger.warning("ontology_mapping not available, using fallback role keywords")
    except Exception as e:
        logger.warning(f"Failed to load role keywords from schema: {e}")

    # Fallback: essential role category keywords (hardcoded).
    # Keywords are matched case-insensitively against headlines in
    # filter_by_role_category_keywords().
    return {
        "CURATORIAL": [
            "curator", "curatorial", "collectie", "collection", "tentoonstellingen",
            "exhibitions", "acquisitions", "registrar", "museum professional"
        ],
        "CONSERVATION": [
            "conservator", "conservation", "restaurator", "restoration", "preservatie",
            "preservation", "materiaal", "material", "preventive"
        ],
        "ARCHIVAL": [
            "archivist", "archivaris", "archief", "archive", "records", "documentalist",
            "erfgoed", "heritage records", "acquisitie", "beschrijving"
        ],
        "LIBRARY": [
            "bibliothecaris", "librarian", "bibliotheek", "library", "catalogus",
            "cataloging", "metadata", "special collections", "reference"
        ],
        "DIGITAL": [
            "digital", "digitaal", "developer", "data", "software", "IT", "tech",
            "engineer", "digitalisering", "digitization", "web", "database"
        ],
        "EDUCATION": [
            "educatie", "education", "learning", "museum educator", "outreach",
            "public programs", "docent", "teacher", "rondleiding", "guide"
        ],
        "GOVERNANCE": [
            "bestuur", "board", "governance", "trustee", "raad", "council",
            "advisory", "commissie", "committee"
        ],
        "LEADERSHIP": [
            "director", "directeur", "manager", "head of", "hoofd", "chief",
            "CEO", "president", "leider", "leadership"
        ],
        "RESEARCH": [
            "onderzoek", "research", "researcher", "wetenschapper", "scientist",
            "academic", "scholar", "fellow", "postdoc", "PhD"
        ],
        "TECHNICAL": [
            "technical", "technisch", "facilities", "installation", "AV",
            "audiovisual", "lighting", "security", "beveiliging"
        ],
        "SUPPORT": [
            "support", "admin", "administratie", "office", "HR", "finance",
            "marketing", "communications", "front desk", "visitor services"
        ],
        "CREATIVE": [
            "design", "ontwerp", "creative", "graphic", "exhibition design",
            "multimedia", "artist", "kunstenaar", "visual"
        ],
        "EXTERNAL": [
            "volunteer", "vrijwilliger", "intern", "stagiair", "consultant",
            "advisor", "external", "contractor", "freelance"
        ],
    }
# Lazy-loaded module-level caches (populated on first access)
|
||
_CUSTODIAN_TYPE_MAPPING: dict[str, str] | None = None
|
||
_ROLE_CATEGORY_KEYWORDS: dict[str, list[str]] | None = None
|
||
|
||
|
||
def get_custodian_type_to_heritage_code() -> dict[str, str]:
    """Get cached custodian type to heritage code mapping.

    Loads via _get_custodian_type_mapping() on first call and memoizes
    the result in the module-level _CUSTODIAN_TYPE_MAPPING cache.

    Returns:
        Dict mapping custodian type (e.g., "MUSEUM") to heritage code (e.g., "M")
    """
    global _CUSTODIAN_TYPE_MAPPING
    if _CUSTODIAN_TYPE_MAPPING is None:
        _CUSTODIAN_TYPE_MAPPING = _get_custodian_type_mapping()
    return _CUSTODIAN_TYPE_MAPPING
def get_role_category_keywords() -> dict[str, list[str]]:
    """Get cached role category keywords.

    Loads via _get_role_category_keywords() on first call and memoizes
    the result in the module-level _ROLE_CATEGORY_KEYWORDS cache.

    Returns:
        Dict mapping role category (e.g., "CURATORIAL") to keyword list
    """
    global _ROLE_CATEGORY_KEYWORDS
    if _ROLE_CATEGORY_KEYWORDS is None:
        _ROLE_CATEGORY_KEYWORDS = _get_role_category_keywords()
    return _ROLE_CATEGORY_KEYWORDS
def build_schema_aware_person_filter(
|
||
heritage_type_code: str | None = None,
|
||
heritage_relevant_only: bool = False,
|
||
custodian_slug: str | None = None,
|
||
only_wcms: bool = False,
|
||
) -> dict[str, Any] | None:
|
||
"""Build Qdrant filter conditions for schema-aware person search.
|
||
|
||
Args:
|
||
heritage_type_code: Single-letter heritage type code (M, A, L, etc.)
|
||
heritage_relevant_only: Only return heritage-relevant staff
|
||
custodian_slug: Filter by specific custodian
|
||
only_wcms: Only return WCMS-registered profiles (heritage sector users)
|
||
|
||
Returns:
|
||
Dict of filter conditions for Qdrant, or None if no filters
|
||
"""
|
||
filters: dict[str, Any] = {}
|
||
|
||
if heritage_type_code and heritage_type_code not in ("U", "UNKNOWN", "UNSPECIFIED"):
|
||
filters["heritage_type"] = heritage_type_code
|
||
|
||
if heritage_relevant_only:
|
||
filters["heritage_relevant"] = True
|
||
|
||
if custodian_slug:
|
||
filters["custodian_slug"] = custodian_slug
|
||
|
||
if only_wcms:
|
||
filters["has_wcms"] = True
|
||
|
||
return filters if filters else None
|
||
|
||
|
||
def filter_by_role_category_keywords(
    results: list["RetrievedPerson"],
    role_category: str | None,
) -> list["RetrievedPerson"]:
    """Post-filter search results by role category using headline keywords.

    Since role_category is not indexed in Qdrant, headline keyword matching
    is applied after vector search. If filtering would discard every result,
    the original list is returned instead (better too loose than empty).

    Args:
        results: List of RetrievedPerson from vector search
        role_category: Target role category (CURATORIAL, ARCHIVAL, etc.)

    Returns:
        Filtered list of RetrievedPerson matching the role category
    """
    # Unknown/unspecified categories mean "no filtering".
    if not role_category or role_category in ("UNKNOWN", "UNSPECIFIED"):
        return results

    keywords = get_role_category_keywords().get(role_category, [])
    if not keywords:
        return results

    # Case-insensitive substring match of any keyword against the headline.
    lowered_keywords = [kw.lower() for kw in keywords]
    matched = [
        person for person in results
        if any(kw in (person.headline or "").lower() for kw in lowered_keywords)
    ]

    # If filtering removed all results, return original (don't be too strict)
    if not matched:
        logger.info(f"Role category filter '{role_category}' removed all results, returning unfiltered")
        return results

    logger.info(f"Role category filter '{role_category}' reduced results from {len(results)} to {len(matched)}")
    return matched
def get_heritage_type_code(custodian_type: str | None) -> str | None:
|
||
"""Convert CustodianPrimaryTypeEnum value to single-letter heritage code.
|
||
|
||
Args:
|
||
custodian_type: Custodian type from DSPy router (e.g., "MUSEUM", "ARCHIVE")
|
||
|
||
Returns:
|
||
Single-letter heritage code (e.g., "M", "A") or None if not mappable
|
||
"""
|
||
if not custodian_type or custodian_type in ("UNKNOWN", "UNSPECIFIED"):
|
||
return None
|
||
return get_custodian_type_to_heritage_code().get(custodian_type)
|
||
|
||
|
||
class SPARQLClient:
    """Client for querying Oxigraph SPARQL endpoint.

    Holds a lazily created, pooled httpx.Client; call close() when done
    to release connections.
    """

    def __init__(
        self,
        endpoint: str = DEFAULT_SPARQL_ENDPOINT,
        timeout: float = DEFAULT_SPARQL_TIMEOUT,
        max_connections: int = 20  # Allow concurrent connections for parallel queries
    ):
        self.endpoint = endpoint
        self.timeout = timeout
        self.max_connections = max_connections
        # Created on first access via the `client` property, so that
        # constructing a SPARQLClient stays cheap.
        self._client: httpx.Client | None = None

    @property
    def client(self) -> httpx.Client:
        """Lazy-initialize HTTP client with connection pooling."""
        if self._client is None:
            # Configure connection pool for parallel SPARQL queries
            limits = httpx.Limits(
                max_keepalive_connections=self.max_connections,
                max_connections=self.max_connections,
                keepalive_expiry=30.0  # Keep connections alive for reuse
            )
            self._client = httpx.Client(
                timeout=self.timeout,
                limits=limits,
                http2=False  # HTTP/1.1 is often faster for small queries
            )
        return self._client

    def query(self, sparql: str, log_timing: bool = False) -> list[dict[str, Any]]:
        """Execute SPARQL query and return results.

        The shared SPARQL_PREFIXES block is prepended to every query.
        Failures are logged and swallowed: any HTTP or parsing error
        yields an empty list rather than raising.

        Args:
            sparql: SPARQL query string
            log_timing: Whether to log query execution time

        Returns:
            List of result bindings as dictionaries
            (variable name -> string value)
        """
        full_query = SPARQL_PREFIXES + sparql
        start_time = time.time() if log_timing else 0

        try:
            # POST form-encoded query; request SPARQL JSON results format.
            response = self.client.post(
                self.endpoint,
                data={"query": full_query},
                headers={"Accept": "application/sparql-results+json"}
            )
            response.raise_for_status()

            data = response.json()
            bindings = data.get("results", {}).get("bindings", [])

            # Convert bindings to simple dicts: flatten the SPARQL JSON
            # {"var": {"type": ..., "value": ...}} shape to {"var": value}.
            results = []
            for binding in bindings:
                row = {}
                for key, value in binding.items():
                    row[key] = value.get("value", "")
                results.append(row)

            if log_timing:
                duration_ms = (time.time() - start_time) * 1000
                logger.debug(f"SPARQL query completed: {len(results)} results in {duration_ms:.0f}ms")

            return results

        except httpx.HTTPError as e:
            logger.error(f"SPARQL query failed: {e}")
            return []
        except Exception as e:
            logger.error(f"Unexpected error in SPARQL query: {e}")
            return []

    def close(self) -> None:
        """Close the HTTP client.

        Resets the cached client so a later query() transparently
        re-creates a fresh one.
        """
        if self._client:
            self._client.close()
            self._client = None
class HybridRetriever:
|
||
"""Hybrid retriever combining vector search with knowledge graph expansion.
|
||
|
||
The retrieval process:
|
||
1. Vector search finds semantically similar institutions
|
||
2. For each result, SPARQL expands to find related institutions:
|
||
- Institutions in the same city
|
||
- Institutions of the same type
|
||
- Institutions with related collections
|
||
3. Results are re-ranked based on combined vector + graph scores
|
||
|
||
Embedding Models:
|
||
- If OpenAI API key is available AND collection uses 1536-dim vectors: use OpenAI
|
||
- Otherwise: use sentence-transformers (all-MiniLM-L6-v2, 384-dim)
|
||
|
||
Multi-Embedding Support:
|
||
Set use_multi_embedding=True to enable support for multiple embedding models
|
||
via Qdrant's named vectors feature. This allows:
|
||
- A/B testing different embedding models
|
||
- Seamless migration between models
|
||
- Specifying which model to use per query
|
||
|
||
Args:
|
||
qdrant_host: Qdrant server hostname
|
||
qdrant_port: Qdrant REST API port
|
||
sparql_endpoint: Oxigraph SPARQL endpoint URL
|
||
vector_weight: Weight for vector similarity scores (0-1)
|
||
graph_weight: Weight for graph expansion scores (0-1)
|
||
collection_name: Qdrant collection name
|
||
embedding_model: Embedding model name (auto-detected if not specified)
|
||
k_vector: Number of initial vector search results
|
||
k_expand: Number of graph expansion results per seed
|
||
k_final: Final number of results to return
|
||
use_multi_embedding: Enable multi-embedding mode with named vectors
|
||
preferred_embedding_model: Preferred model for multi-embedding mode
|
||
"""
|
||
|
||
    # Class-level type annotations for instance attributes
    # (assigned in __init__; see that method for semantics).
    qdrant_host: str
    qdrant_port: int
    sparql_endpoint: str
    vector_weight: float          # weight for vector similarity in final ranking
    graph_weight: float           # weight for graph expansion in final ranking
    collection_name: str
    k_vector: int                 # initial vector search result count
    k_expand: int                 # expansion results per seed
    k_final: int                  # final result count returned to caller
    openai_api_key: str | None
    use_production_qdrant: bool
    use_multi_embedding: bool
    preferred_embedding_model: str | None
    sparql_client: "SPARQLClient"
    embedding_model: str

    # Private attributes with lazy initialization
    _qdrant_client: "QdrantClient | None"
    _openai_client: "OpenAI | None"
    _st_model: "SentenceTransformer | None"
    _use_sentence_transformers: bool
    _collection_vector_size: int | None
    _multi_retriever: "MultiEmbeddingRetriever | None"
    _selected_multi_model: "EmbeddingModel | None"
|
||
    def __init__(
        self,
        qdrant_host: str = "localhost",
        qdrant_port: int = 6333,
        sparql_endpoint: str = DEFAULT_SPARQL_ENDPOINT,
        vector_weight: float = 0.7,
        graph_weight: float = 0.3,
        collection_name: str = "heritage_custodians",
        embedding_model: str | None = None,  # Auto-detect if None
        k_vector: int = 10,
        k_expand: int = 5,
        k_final: int = 10,
        openai_api_key: str | None = None,
        use_production_qdrant: bool = False,
        use_multi_embedding: bool = False,
        preferred_embedding_model: str | None = None,
    ):
        """Store configuration, create the SPARQL client, and auto-detect the embedding model.

        All heavy clients (Qdrant, OpenAI, sentence-transformers, the
        multi-embedding retriever) are lazy-loaded on first use; only the
        lightweight SPARQLClient is constructed eagerly here.
        """
        self.qdrant_host = qdrant_host
        self.qdrant_port = qdrant_port
        self.sparql_endpoint = sparql_endpoint
        self.vector_weight = vector_weight
        self.graph_weight = graph_weight
        self.collection_name = collection_name
        self.k_vector = k_vector
        self.k_expand = k_expand
        self.k_final = k_final
        # Explicit argument wins; otherwise fall back to the environment.
        self.openai_api_key = openai_api_key or os.getenv("OPENAI_API_KEY")
        self.use_production_qdrant = use_production_qdrant
        self.use_multi_embedding = use_multi_embedding
        self.preferred_embedding_model = preferred_embedding_model

        # Initialize SPARQL client
        self.sparql_client = SPARQLClient(endpoint=sparql_endpoint)

        # Lazy-load Qdrant, OpenAI, and sentence-transformers clients
        self._qdrant_client = None
        self._openai_client = None
        self._st_model = None
        self._use_sentence_transformers = False
        self._collection_vector_size: int | None = None

        # Multi-embedding retriever (lazy-loaded)
        self._multi_retriever = None

        # Currently selected multi-embedding model (for multi-embedding mode)
        self._selected_multi_model = None

        # Determine embedding model to use. NOTE: auto-detection queries
        # Qdrant for the collection's vector size, so it may touch the network.
        self.embedding_model = embedding_model or self._auto_detect_embedding_model()

        logger.info(
            f"Initialized HybridRetriever: "
            f"Qdrant={qdrant_host}:{qdrant_port}, "
            f"SPARQL={sparql_endpoint}, "
            f"embedding_model={self.embedding_model}, "
            f"multi_embedding={use_multi_embedding}, "
            f"weights=vector:{vector_weight}/graph:{graph_weight}"
        )
||
@property
|
||
def qdrant_client(self) -> "QdrantClient":
|
||
"""Lazy-load Qdrant client."""
|
||
if self._qdrant_client is None:
|
||
from qdrant_client import QdrantClient
|
||
|
||
if self.use_production_qdrant:
|
||
# Connect via HTTPS to production
|
||
self._qdrant_client = QdrantClient(
|
||
host="bronhouder.nl",
|
||
port=443,
|
||
https=True,
|
||
prefix="qdrant",
|
||
prefer_grpc=False,
|
||
timeout=30
|
||
)
|
||
else:
|
||
self._qdrant_client = QdrantClient(
|
||
host=self.qdrant_host,
|
||
port=self.qdrant_port
|
||
)
|
||
return self._qdrant_client
|
||
|
||
@property
|
||
def openai_client(self) -> "OpenAI":
|
||
"""Lazy-load OpenAI client."""
|
||
if self._openai_client is None:
|
||
if not self.openai_api_key:
|
||
raise RuntimeError(
|
||
"OpenAI API key not available. Set OPENAI_API_KEY or use sentence-transformers."
|
||
)
|
||
import openai
|
||
self._openai_client = openai.OpenAI(api_key=self.openai_api_key)
|
||
return self._openai_client
|
||
|
||
def _get_collection_vector_size(self) -> int | None:
|
||
"""Get the vector size of the Qdrant collection."""
|
||
try:
|
||
info = self.qdrant_client.get_collection(self.collection_name)
|
||
if hasattr(info.config.params, 'vectors'):
|
||
vectors_config = info.config.params.vectors
|
||
if isinstance(vectors_config, dict):
|
||
# Named vectors
|
||
first_config = next(iter(vectors_config.values()), None)
|
||
return first_config.size if first_config else None
|
||
elif vectors_config is not None:
|
||
# Single vector config
|
||
return vectors_config.size
|
||
return None
|
||
except Exception as e:
|
||
logger.warning(f"Could not get collection vector size: {e}")
|
||
return None
|
||
|
||
def _auto_detect_embedding_model(self) -> str:
|
||
"""Auto-detect which embedding model to use based on collection and available APIs.
|
||
|
||
Detection priority:
|
||
1. Check main collection (heritage_custodians) vector size
|
||
2. If main collection doesn't exist, check heritage_persons collection
|
||
3. If OpenAI key available and collection uses 1536-dim, use OpenAI
|
||
4. Otherwise use sentence-transformers (384-dim, all-MiniLM-L6-v2)
|
||
"""
|
||
# Check main collection vector size first
|
||
vector_size = self._get_collection_vector_size()
|
||
self._collection_vector_size = vector_size
|
||
|
||
# If main collection doesn't exist, try heritage_persons collection
|
||
if vector_size is None:
|
||
logger.info(f"Collection '{self.collection_name}' not found, checking heritage_persons")
|
||
person_vector_size = self._get_person_collection_vector_size()
|
||
if person_vector_size:
|
||
vector_size = person_vector_size
|
||
logger.info(f"Using heritage_persons collection vector size: {vector_size}")
|
||
|
||
if vector_size == 384:
|
||
# Collection uses sentence-transformers dimensions
|
||
self._use_sentence_transformers = True
|
||
logger.info("Auto-detected 384-dim vectors, using sentence-transformers")
|
||
return "all-MiniLM-L6-v2"
|
||
elif vector_size == 1536 and self.openai_api_key:
|
||
# Collection uses OpenAI dimensions and we have API key
|
||
self._use_sentence_transformers = False
|
||
logger.info("Auto-detected 1536-dim vectors with OpenAI key, using OpenAI")
|
||
return "text-embedding-3-small"
|
||
elif self.openai_api_key:
|
||
# Default to OpenAI if we have key
|
||
self._use_sentence_transformers = False
|
||
return "text-embedding-3-small"
|
||
else:
|
||
# Fallback to sentence-transformers
|
||
self._use_sentence_transformers = True
|
||
logger.info("No OpenAI key, falling back to sentence-transformers")
|
||
return "all-MiniLM-L6-v2"
|
||
|
||
def _load_sentence_transformer(self) -> "SentenceTransformer":
|
||
"""Lazy-load sentence-transformers model."""
|
||
if self._st_model is None:
|
||
try:
|
||
from sentence_transformers import SentenceTransformer
|
||
self._st_model = SentenceTransformer(self.embedding_model)
|
||
logger.info(f"Loaded sentence-transformers model: {self.embedding_model}")
|
||
except ImportError:
|
||
raise RuntimeError("sentence-transformers not installed. Run: pip install sentence-transformers")
|
||
return self._st_model
|
||
|
||
    @property
    def multi_retriever(self) -> "MultiEmbeddingRetriever | None":
        """Lazy-load MultiEmbeddingRetriever when multi-embedding mode is enabled.

        On first access this also resolves which embedding model to use:
        the explicitly preferred model if it names a valid EmbeddingModel,
        otherwise whatever select_model() chooses for the collection.

        Returns:
            MultiEmbeddingRetriever instance or None if not in multi-embedding mode
        """
        if not self.use_multi_embedding:
            return None

        if self._multi_retriever is None:
            # Imported lazily so the dependency is only required in multi mode.
            from glam_extractor.api.multi_embedding_retriever import (
                MultiEmbeddingRetriever,
                MultiEmbeddingConfig,
                EmbeddingModel,
            )

            # Create config matching current settings
            config = MultiEmbeddingConfig(
                qdrant_host=self.qdrant_host,
                qdrant_port=self.qdrant_port,
                qdrant_https=self.use_production_qdrant,
                qdrant_prefix="qdrant" if self.use_production_qdrant else None,
                openai_api_key=self.openai_api_key,
                institutions_collection=self.collection_name,
            )

            self._multi_retriever = MultiEmbeddingRetriever(config)

            # Auto-select model if not specified
            if self.preferred_embedding_model:
                try:
                    self._selected_multi_model = EmbeddingModel(self.preferred_embedding_model)
                except ValueError:
                    # Unknown name: warn and fall back to automatic selection.
                    logger.warning(f"Unknown embedding model: {self.preferred_embedding_model}")
                    assert self._multi_retriever is not None  # Set above
                    self._selected_multi_model = self._multi_retriever.select_model(self.collection_name)
            else:
                assert self._multi_retriever is not None  # Set above
                self._selected_multi_model = self._multi_retriever.select_model(self.collection_name)

            logger.info(f"MultiEmbeddingRetriever initialized, selected model: {self._selected_multi_model}")

        return self._multi_retriever
|
||
    def _get_embedding(self, text: str, using: str | None = None) -> list[float]:
        """Get embedding vector for text using the appropriate model.

        In multi-embedding mode the call is delegated to the
        MultiEmbeddingRetriever; otherwise (or when no multi model could be
        resolved) it falls back to the legacy single-model path:
        sentence-transformers or the OpenAI embeddings API.

        Args:
            text: Text to embed
            using: Optional embedding model name (for multi-embedding mode)

        Returns:
            Embedding vector as list of floats
        """
        # If multi-embedding mode, delegate to MultiEmbeddingRetriever
        if self.use_multi_embedding and self.multi_retriever:
            from glam_extractor.api.multi_embedding_retriever import EmbeddingModel

            # Determine which model to use
            if using:
                try:
                    model = EmbeddingModel(using)
                except ValueError:
                    # Invalid override name: keep the auto-selected model.
                    logger.warning(f"Unknown model '{using}', using default")
                    model = self._selected_multi_model
            else:
                model = self._selected_multi_model

            if model:
                return self.multi_retriever.get_embedding(text, model)
            else:
                # Fallback to legacy mode (falls through to the code below)
                logger.warning("No multi-embedding model available, falling back to legacy")

        # Legacy single-model embedding
        if self._use_sentence_transformers:
            model = self._load_sentence_transformer()
            embedding = model.encode(text)
            # encode() returns a numpy array; callers expect a plain list.
            return embedding.tolist()
        else:
            response = self.openai_client.embeddings.create(
                input=text,
                model=self.embedding_model
            )
            return response.data[0].embedding
|
||
    def _vector_search(
        self,
        query: str,
        k: int,
        using: str | None = None,
        region_codes: list[str] | None = None,
        cities: list[str] | None = None,
        institution_types: list[str] | None = None,
        use_polygon_filter: bool = True,
    ) -> list[RetrievedInstitution]:
        """Perform vector similarity search in Qdrant.

        Args:
            query: Search query text
            k: Number of results to retrieve
            using: Optional embedding model name (for multi-embedding mode)
            region_codes: Optional list of ISO 3166-2 region codes (e.g., ["NH", "ZH"])
            cities: Optional list of city names (e.g., ["Amsterdam", "Rotterdam"])
            institution_types: Optional list of institution types (e.g., ["ARCHIVE", "MUSEUM"])
            use_polygon_filter: If True, apply polygon-based geographic filtering
                using actual province boundaries (default: True)

        Returns:
            List of RetrievedInstitution with vector scores
        """
        query_vector = self._get_embedding(query, using=using)

        # When polygon filtering is enabled and regions are specified,
        # over-fetch to ensure we have enough results after polygon filtering
        effective_limit = k
        if use_polygon_filter and region_codes:
            effective_limit = k * 3  # Over-fetch 3x for polygon filtering
            logger.debug(f"Over-fetching {effective_limit} results for polygon filtering")

        # Build query parameters
        search_params = {
            "collection_name": self.collection_name,
            "query": query_vector,
            "limit": effective_limit,
            "with_payload": True,
        }

        # Build geographic/type filter if any criteria provided
        # NOTE: Always apply region metadata filter to Qdrant first to get relevant results.
        # The polygon filter (if enabled) is an additional precision filter applied afterward.
        # Previously we disabled metadata region filter when polygon filter was enabled,
        # but this caused vector search to return results from wrong regions.
        if region_codes or cities or institution_types:
            from glam_extractor.ontology.qdrant_filters import QdrantFilterBuilder

            # Convert institution types from full names (LIBRARY, MUSEUM) to single-letter codes (L, M)
            # because Qdrant stores institution_type as single-letter codes per GLAMORCUBESFIXPHDNT
            type_codes = None
            if institution_types:
                type_mapping = get_custodian_type_to_heritage_code()
                type_codes = [type_mapping.get(t, t) for t in institution_types]
                # Filter out any that didn't map (keep original if 1 char already)
                type_codes = [c for c in type_codes if c and len(c) == 1]
                logger.debug(f"Converted institution types: {institution_types} -> {type_codes}")

            builder = QdrantFilterBuilder()
            filter_dict = builder.combined_filter(
                primary_types=type_codes,  # Use single-letter codes
                region_codes=region_codes,  # Always apply region filter to get relevant results
                cities=cities,
                combine_mode="must",
            )
            if filter_dict:
                query_filter = QdrantFilterBuilder.to_qdrant_models(filter_dict)
                search_params["query_filter"] = query_filter
                logger.info(
                    f"Applied Qdrant filter: types={type_codes}, "
                    f"regions={region_codes}, cities={cities}"
                )

        # Add named vector 'using' ONLY if collection actually has named vectors
        # Single-vector collections will error with "Not existing vector name" otherwise
        if self.use_multi_embedding and self.multi_retriever:
            uses_named = self.multi_retriever.uses_named_vectors(self.collection_name)
            if uses_named:
                if using:
                    search_params["using"] = using
                elif self._selected_multi_model:
                    search_params["using"] = self._selected_multi_model.value
            # else: single-vector collection, don't add 'using' parameter

        results = self.qdrant_client.query_points(**search_params)

        # Convert Qdrant points into RetrievedInstitution objects.
        institutions = []
        for point in results.points:
            payload = point.payload or {}

            inst = RetrievedInstitution(
                ghcid=payload.get("ghcid", ""),
                name=payload.get("name", ""),
                # Synthesize a canonical URI when the payload lacks one.
                uri=payload.get("uri", f"https://nde.nl/ontology/hc/custodian/{payload.get('ghcid', '')}"),
                vector_score=point.score,
                institution_type=payload.get("institution_type"),
                country=payload.get("country"),
                city=payload.get("city"),
                # Truncate the stored text to a short description snippet.
                description=payload.get("text", "")[:200] if payload.get("text") else None,
                latitude=payload.get("latitude"),
                longitude=payload.get("longitude"),
            )
            institutions.append(inst)

        # Apply polygon-based geographic filtering if enabled and regions specified
        if use_polygon_filter and region_codes and institutions:
            institutions = self._apply_polygon_filter(institutions, region_codes, k)

        return institutions
|
||
    def _apply_polygon_filter(
        self,
        institutions: list[RetrievedInstitution],
        region_codes: list[str],
        k: int,
    ) -> list[RetrievedInstitution]:
        """Filter institutions by polygon containment in specified regions.

        Uses actual province boundary polygons to ensure results are
        geographically within the requested regions, not just metadata matching.

        NOTE(review): institutions without coordinates are only kept when they
        are Dutch AND their GHCID region segment matches a requested region;
        coordinate-less non-NL institutions are silently dropped — confirm
        this is intended.

        Args:
            institutions: List of retrieved institutions with lat/lon
            region_codes: List of ISO 3166-2 region codes (e.g., ["NH", "ZH"])
            k: Maximum number of results to return

        Returns:
            Filtered list of institutions within the specified regions
        """
        polygon_filter = get_polygon_filter()

        # Handle case where polygon filter module is not available or not loaded
        if polygon_filter is None:
            logger.warning("Polygon filter not available, skipping geographic filtering")
            return institutions[:k]

        if not polygon_filter.is_loaded:
            logger.warning("Polygon filter not loaded, skipping geographic filtering")
            return institutions[:k]

        filtered = []
        for inst in institutions:
            if inst.latitude is None or inst.longitude is None:
                # No coordinates, check if metadata region matches
                if inst.country == "NL":
                    # For Dutch institutions without coords, fallback to metadata
                    # Extract region from GHCID (format: NL-{REGION}-...)
                    if inst.ghcid and len(inst.ghcid) > 3:
                        ghcid_region = inst.ghcid.split("-")[1] if "-" in inst.ghcid else None
                        if ghcid_region and ghcid_region.upper() in [r.upper() for r in region_codes]:
                            filtered.append(inst)
                # Never reach the polygon check without coordinates.
                continue

            # Check if point is within any of the requested regions
            for region_code in region_codes:
                if polygon_filter.point_in_province(inst.latitude, inst.longitude, region_code):
                    filtered.append(inst)
                    break  # Don't add same institution multiple times

        logger.info(
            f"Polygon filter: {len(filtered)}/{len(institutions)} institutions "
            f"in regions {region_codes}"
        )

        # Return up to k results
        return filtered[:k]
|
||
    def _build_batched_expansion_query(
        self,
        seed_institutions: list[RetrievedInstitution],
        exclude_ghcids: set[str],
        limit_per_expansion: int = 5
    ) -> tuple[str, dict[str, dict]]:
        """Build a single SPARQL query with UNION clauses for all expansions.

        DEDUPLICATES by city code and type+country to avoid redundant query patterns.
        For example, if 5 seeds are all from Amsterdam with type MUSEUM, we only
        create ONE city expansion (for AMS) and ONE type expansion (for NL + M),
        not 10 redundant UNIONs.

        NOTE(review): each deduplicated expansion records only the FIRST seed
        that produced it in expansion_metadata — later seeds sharing the same
        city/type pattern are not attributed. Confirm that is acceptable for
        result provenance.

        Args:
            seed_institutions: Seed institutions to expand from
            exclude_ghcids: GHCIDs to exclude from results
            limit_per_expansion: Max results per expansion type

        Returns:
            Tuple of (SPARQL query string, expansion_metadata dict)
            expansion_metadata maps expansion_key -> {seed, type, city/type_code}
        """
        unions = []
        expansion_metadata = {}

        # Track unique patterns to avoid duplicate queries
        seen_city_codes: set[str] = set()
        seen_type_patterns: set[str] = set()  # "country-type_code" pattern

        # Only the top 5 seeds are expanded, mirroring the parallel path.
        seeds_to_expand = seed_institutions[:5]
        city_idx = 0
        type_idx = 0

        for seed in seeds_to_expand:
            # City expansion - deduplicate by city code
            if seed.city:
                # City is matched via its 3-letter code embedded in the GHCID.
                city_code = seed.city[:3].upper()
                if city_code not in seen_city_codes:
                    seen_city_codes.add(city_code)
                    expansion_key = f"city_{city_idx}"
                    city_idx += 1
                    unions.append(f"""
        {{
            SELECT ?s ?name ?ghcid ?type ("{expansion_key}" AS ?expansion_key) WHERE {{
                ?s a hcc:Custodian ;
                   skos:prefLabel ?name ;
                   hc:ghcid ?ghcid .
                FILTER(CONTAINS(?ghcid, "-{city_code}-"))
                OPTIONAL {{ ?s hc:institutionType ?type }}
            }}
            LIMIT {limit_per_expansion + len(exclude_ghcids)}
        }}
        """)
                    expansion_metadata[expansion_key] = {
                        "seed": seed,
                        "type": "city",
                        "city": seed.city,
                        "city_code": city_code
                    }

            # Type expansion - deduplicate by country + type_code pattern
            if seed.institution_type and seed.country:
                type_code = get_custodian_type_to_heritage_code().get(seed.institution_type, "")
                if type_code:
                    pattern_key = f"{seed.country}-{type_code}"
                    if pattern_key not in seen_type_patterns:
                        seen_type_patterns.add(pattern_key)
                        expansion_key = f"type_{type_idx}"
                        type_idx += 1
                        unions.append(f"""
        {{
            SELECT ?s ?name ?ghcid ?city ("{expansion_key}" AS ?expansion_key) WHERE {{
                ?s a hcc:Custodian ;
                   skos:prefLabel ?name ;
                   hc:ghcid ?ghcid .
                FILTER(STRSTARTS(?ghcid, "{seed.country}-"))
                FILTER(CONTAINS(?ghcid, "-{type_code}-"))
                OPTIONAL {{ ?s schema:location ?city }}
            }}
            LIMIT {limit_per_expansion + len(exclude_ghcids)}
        }}
        """)
                        expansion_metadata[expansion_key] = {
                            "seed": seed,
                            "type": "type",
                            "institution_type": seed.institution_type,
                            "type_code": type_code,
                            "country": seed.country
                        }

        if not unions:
            return "", {}

        # Log deduplication stats
        logger.info(f"Batched SPARQL: {len(unions)} UNIONs (deduplicated from max {len(seeds_to_expand) * 2}). "
                    f"Unique cities: {seen_city_codes}, Unique types: {seen_type_patterns}")

        # Combine all unions into a single query
        query = f"""
        SELECT ?s ?name ?ghcid ?type ?city ?expansion_key WHERE {{
            {" UNION ".join(unions)}
        }}
        """

        return query, expansion_metadata
|
||
    def _graph_expand_batched(
        self,
        seed_institutions: list[RetrievedInstitution]
    ) -> list[RetrievedInstitution]:
        """Expand seed results using a SINGLE batched SPARQL query.

        This is a significant optimization over the parallel ThreadPoolExecutor
        approach. Instead of 10 HTTP requests (even in parallel), we execute
        ONE SPARQL query with UNION clauses.

        Performance comparison:
        - Sequential: 10 queries × ~100ms = 4+ seconds
        - Parallel (ThreadPool): ~500ms-1s (limited by GIL/connection pool)
        - Batched (this method): ONE query ~150-300ms

        Args:
            seed_institutions: Initial vector search results

        Returns:
            Additional institutions found via graph expansion
        """
        start_time = time.time()
        # Seeds themselves must never reappear as expansion results.
        exclude_ghcids = {inst.ghcid for inst in seed_institutions}
        expanded = []
        seen_ghcids = set(exclude_ghcids)

        # Build batched query
        query, expansion_metadata = self._build_batched_expansion_query(
            seed_institutions, exclude_ghcids, limit_per_expansion=self.k_expand
        )

        if not query:
            logger.debug("No graph expansion tasks to execute")
            return expanded

        # Execute single batched query
        query_start = time.time()
        results = self.sparql_client.query(query)
        query_duration = (time.time() - query_start) * 1000

        logger.debug(f"Batched SPARQL query: {len(results)} raw results in {query_duration:.0f}ms")

        # Group results by expansion_key
        results_by_expansion: dict[str, list[dict]] = {}
        for row in results:
            exp_key = row.get("expansion_key", "")
            if exp_key:
                if exp_key not in results_by_expansion:
                    results_by_expansion[exp_key] = []
                results_by_expansion[exp_key].append(row)

        # Process results, filtering and creating RetrievedInstitution objects
        for exp_key, rows in results_by_expansion.items():
            if exp_key not in expansion_metadata:
                continue

            meta = expansion_metadata[exp_key]
            seed = meta["seed"]
            exp_type = meta["type"]

            # Cap each expansion at k_expand NEW (unseen) institutions.
            count = 0
            for row in rows:
                ghcid = row.get("ghcid", "")
                if not ghcid or ghcid in seen_ghcids:
                    continue

                if count >= self.k_expand:
                    break

                seen_ghcids.add(ghcid)
                count += 1

                if exp_type == "city":
                    expanded.append(RetrievedInstitution(
                        ghcid=ghcid,
                        name=row.get("name", ""),
                        uri=row.get("s", ""),
                        graph_score=0.8,  # High score for same city
                        institution_type=row.get("type"),
                        expansion_reason="same_city",
                        related_institutions=[seed.ghcid]
                    ))
                elif exp_type == "type":
                    expanded.append(RetrievedInstitution(
                        ghcid=ghcid,
                        name=row.get("name", ""),
                        uri=row.get("s", ""),
                        graph_score=0.5,  # Medium score for same type
                        institution_type=seed.institution_type,
                        city=row.get("city"),
                        expansion_reason="same_type",
                        related_institutions=[seed.ghcid]
                    ))

            logger.debug(f"Expansion {exp_key}: {count} results for {seed.ghcid}")

        total_time = (time.time() - start_time) * 1000
        logger.info(f"Graph expansion (batched): 1 query, {len(results)} raw results, "
                    f"{len(expanded)} expanded in {total_time:.0f}ms")

        return expanded
|
||
def _expand_by_city(self, city: str, exclude_ghcids: set[str], limit: int = 5) -> list[dict]:
|
||
"""Find other institutions in the same city via SPARQL.
|
||
|
||
Note: This method is kept for backwards compatibility and direct calls.
|
||
For batch operations, use _graph_expand_batched() instead.
|
||
|
||
Args:
|
||
city: City name to search for
|
||
exclude_ghcids: GHCIDs to exclude from results
|
||
limit: Maximum number of results
|
||
|
||
Returns:
|
||
List of institution data dicts
|
||
"""
|
||
if not city:
|
||
return []
|
||
|
||
query = f"""
|
||
SELECT ?s ?name ?ghcid ?type WHERE {{
|
||
?s a hcc:Custodian ;
|
||
skos:prefLabel ?name ;
|
||
hc:ghcid ?ghcid .
|
||
|
||
# Match city in GHCID (format: CC-RR-CCC-T-ABBR)
|
||
FILTER(CONTAINS(?ghcid, "-{city[:3].upper()}-"))
|
||
|
||
OPTIONAL {{ ?s hc:institutionType ?type }}
|
||
}}
|
||
LIMIT {limit + len(exclude_ghcids)}
|
||
"""
|
||
|
||
results = self.sparql_client.query(query)
|
||
|
||
# Filter out excluded GHCIDs
|
||
filtered = []
|
||
for row in results:
|
||
ghcid = row.get("ghcid", "")
|
||
if ghcid not in exclude_ghcids:
|
||
filtered.append(row)
|
||
if len(filtered) >= limit:
|
||
break
|
||
|
||
return filtered
|
||
|
||
def _expand_by_type(self, institution_type: str, country: str, exclude_ghcids: set[str], limit: int = 5) -> list[dict]:
|
||
"""Find other institutions of the same type in the same country.
|
||
|
||
Args:
|
||
institution_type: Institution type (MUSEUM, LIBRARY, etc.)
|
||
country: Country code (ISO 3166-1 alpha-2)
|
||
exclude_ghcids: GHCIDs to exclude
|
||
limit: Maximum number of results
|
||
|
||
Returns:
|
||
List of institution data dicts
|
||
"""
|
||
if not institution_type:
|
||
return []
|
||
|
||
# Map institution type to GHCID type code using dynamic schema mapping
|
||
type_code = get_custodian_type_to_heritage_code().get(institution_type, "")
|
||
|
||
if not type_code or not country:
|
||
return []
|
||
|
||
query = f"""
|
||
SELECT ?s ?name ?ghcid ?city WHERE {{
|
||
?s a hcc:Custodian ;
|
||
skos:prefLabel ?name ;
|
||
hc:ghcid ?ghcid .
|
||
|
||
# Match country and type in GHCID
|
||
FILTER(STRSTARTS(?ghcid, "{country}-"))
|
||
FILTER(CONTAINS(?ghcid, "-{type_code}-"))
|
||
|
||
OPTIONAL {{ ?s schema:location ?city }}
|
||
}}
|
||
LIMIT {limit + len(exclude_ghcids)}
|
||
"""
|
||
|
||
results = self.sparql_client.query(query)
|
||
|
||
filtered = []
|
||
for row in results:
|
||
ghcid = row.get("ghcid", "")
|
||
if ghcid not in exclude_ghcids:
|
||
filtered.append(row)
|
||
if len(filtered) >= limit:
|
||
break
|
||
|
||
return filtered
|
||
|
||
def _expand_by_wikidata_country(self, wikidata_country: str, exclude_ghcids: set[str], limit: int = 5) -> list[dict]:
|
||
"""Find institutions in the same country using Wikidata P17 property.
|
||
|
||
Args:
|
||
wikidata_country: Wikidata entity ID for country (e.g., Q55 for Netherlands)
|
||
exclude_ghcids: GHCIDs to exclude
|
||
limit: Maximum number of results
|
||
|
||
Returns:
|
||
List of institution data dicts
|
||
"""
|
||
if not wikidata_country:
|
||
return []
|
||
|
||
query = f"""
|
||
SELECT ?s ?name ?ghcid ?type WHERE {{
|
||
?s a hcc:Custodian ;
|
||
skos:prefLabel ?name ;
|
||
hc:ghcid ?ghcid ;
|
||
wdt:P17 wd:{wikidata_country} .
|
||
|
||
OPTIONAL {{ ?s hc:institutionType ?type }}
|
||
}}
|
||
LIMIT {limit + len(exclude_ghcids)}
|
||
"""
|
||
|
||
results = self.sparql_client.query(query)
|
||
|
||
filtered = []
|
||
for row in results:
|
||
ghcid = row.get("ghcid", "")
|
||
if ghcid not in exclude_ghcids:
|
||
filtered.append(row)
|
||
if len(filtered) >= limit:
|
||
break
|
||
|
||
return filtered
|
||
|
||
def _graph_expand(
|
||
self,
|
||
seed_institutions: list[RetrievedInstitution],
|
||
use_batched: bool = True
|
||
) -> list[RetrievedInstitution]:
|
||
"""Expand seed results using knowledge graph relationships.
|
||
|
||
By default uses batched SPARQL (single query with UNION) for best performance.
|
||
Falls back to parallel ThreadPoolExecutor if batched fails.
|
||
|
||
Performance comparison:
|
||
- Sequential: 10 queries × ~100ms = 4+ seconds
|
||
- Parallel (ThreadPool): ~500ms-3s (limited by GIL/connection pool)
|
||
- Batched (UNION query): ONE query ~150-300ms ← DEFAULT
|
||
|
||
Args:
|
||
seed_institutions: Initial vector search results
|
||
use_batched: If True (default), use batched SPARQL query.
|
||
If False, use parallel ThreadPoolExecutor.
|
||
|
||
Returns:
|
||
Additional institutions found via graph expansion
|
||
"""
|
||
if use_batched:
|
||
try:
|
||
return self._graph_expand_batched(seed_institutions)
|
||
except Exception as e:
|
||
logger.warning(f"Batched graph expansion failed, falling back to parallel: {e}")
|
||
# Fall through to parallel implementation
|
||
|
||
return self._graph_expand_parallel(seed_institutions)
|
||
|
||
def _graph_expand_parallel(
    self,
    seed_institutions: list[RetrievedInstitution]
) -> list[RetrievedInstitution]:
    """Expand seed results using parallel SPARQL queries (fallback method).

    Uses ThreadPoolExecutor to parallelize SPARQL queries. This is slower than
    the batched approach but serves as a fallback.

    Two expansion strategies are run per seed (top 5 seeds only):
    - same city as the seed (graph_score 0.8)
    - same institution type and country (graph_score 0.5)

    Args:
        seed_institutions: Initial vector search results

    Returns:
        Additional institutions found via graph expansion, deduplicated
        against the seeds and against each other
    """
    start_time = time.time()
    # Institutions already found by vector search must not be re-returned.
    exclude_ghcids = {inst.ghcid for inst in seed_institutions}
    expanded = []
    seen_ghcids = set(exclude_ghcids)

    # Prepare all expansion tasks
    # Each task is a tuple: (task_type, seed, query_params)
    tasks = []
    seeds_to_expand = seed_institutions[:5]  # Expand top 5 seeds

    for seed in seeds_to_expand:
        # City expansion task
        if seed.city:
            tasks.append(("city", seed, {"city": seed.city}))

        # Type expansion task (needs both type and country to be meaningful)
        if seed.institution_type and seed.country:
            tasks.append(("type", seed, {
                "institution_type": seed.institution_type,
                "country": seed.country
            }))

    if not tasks:
        logger.debug("No graph expansion tasks to execute")
        return expanded

    # Execute SPARQL queries in parallel
    # Use min(10, len(tasks)) workers to avoid over-parallelization
    max_workers = min(10, len(tasks))

    def execute_expansion(task):
        """Execute a single expansion task and return results with metadata."""
        task_type, seed, params = task
        task_start = time.time()

        try:
            if task_type == "city":
                results = self._expand_by_city(
                    params["city"], exclude_ghcids, limit=self.k_expand
                )
                return {
                    "task_type": task_type,
                    "seed": seed,
                    "results": results,
                    "duration_ms": (time.time() - task_start) * 1000
                }
            elif task_type == "type":
                results = self._expand_by_type(
                    params["institution_type"],
                    params["country"],
                    exclude_ghcids,
                    limit=self.k_expand
                )
                return {
                    "task_type": task_type,
                    "seed": seed,
                    "results": results,
                    "duration_ms": (time.time() - task_start) * 1000
                }
        except Exception as e:
            # One failed task must not sink the whole expansion: return an
            # empty result set and record the error for diagnostics.
            logger.warning(f"Graph expansion task failed: {task_type} for {seed.ghcid}: {e}")
            return {
                "task_type": task_type,
                "seed": seed,
                "results": [],
                "duration_ms": (time.time() - task_start) * 1000,
                "error": str(e)
            }

    # Run all tasks in parallel
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(execute_expansion, task): task for task in tasks}

        for future in as_completed(futures):
            result = future.result()
            if result is None:
                # Defensive: execute_expansion only returns None for an
                # unrecognized task_type.
                continue

            task_type = result["task_type"]
            seed = result["seed"]
            rows = result["results"]
            duration = result.get("duration_ms", 0)

            logger.debug(f"Graph expansion {task_type} for {seed.ghcid}: "
                         f"{len(rows)} results in {duration:.0f}ms")

            # Process results based on task type; seen_ghcids dedups across
            # tasks as completions arrive.
            if task_type == "city":
                for row in rows:
                    ghcid = row.get("ghcid", "")
                    if ghcid and ghcid not in seen_ghcids:
                        seen_ghcids.add(ghcid)
                        expanded.append(RetrievedInstitution(
                            ghcid=ghcid,
                            name=row.get("name", ""),
                            uri=row.get("s", ""),
                            graph_score=0.8,  # High score for same city
                            institution_type=row.get("type"),
                            expansion_reason="same_city",
                            related_institutions=[seed.ghcid]
                        ))
            elif task_type == "type":
                for row in rows:
                    ghcid = row.get("ghcid", "")
                    if ghcid and ghcid not in seen_ghcids:
                        seen_ghcids.add(ghcid)
                        expanded.append(RetrievedInstitution(
                            ghcid=ghcid,
                            name=row.get("name", ""),
                            uri=row.get("s", ""),
                            graph_score=0.5,  # Medium score for same type
                            institution_type=seed.institution_type,
                            city=row.get("city"),
                            expansion_reason="same_type",
                            related_institutions=[seed.ghcid]
                        ))

    total_time = (time.time() - start_time) * 1000
    logger.info(f"Graph expansion completed: {len(tasks)} queries, "
                f"{len(expanded)} results in {total_time:.0f}ms (parallel)")

    return expanded
|
||
|
||
def _combine_and_rank(
    self,
    vector_results: list[RetrievedInstitution],
    graph_results: list[RetrievedInstitution],
    k: int
) -> list[RetrievedInstitution]:
    """Combine vector and graph results with weighted scoring and graph inheritance.

    This method implements a hybrid scoring approach:
    1. Direct merge: If a graph result matches a vector result (same GHCID),
       the graph_score is directly applied
    2. Graph inheritance: Vector results inherit a portion of graph scores from
       related institutions found via graph expansion (same city/type)

    The final combined_score is vector_weight * vector_score +
    graph_weight * graph_score, and the top-k by combined_score is returned.

    Args:
        vector_results: Results from vector search
        graph_results: Results from graph expansion
        k: Number of final results

    Returns:
        Combined and ranked results (at most k)
    """
    # Debug logging for investigation
    logger.debug(f"Combining {len(vector_results)} vector + {len(graph_results)} graph results")

    # Create lookup by GHCID for merging
    results_by_ghcid: dict[str, RetrievedInstitution] = {}

    # Track which vector GHCIDs we have for inheritance
    vector_ghcids = set()

    # Add vector results (entries without a GHCID cannot be merged/ranked)
    for inst in vector_results:
        if inst.ghcid:
            results_by_ghcid[inst.ghcid] = inst
            vector_ghcids.add(inst.ghcid)
            logger.debug(f" Vector: {inst.ghcid} ({inst.name[:30] if inst.name else '?'}...) "
                         f"v={inst.vector_score:.3f} g={inst.graph_score:.3f}")

    # Track direct merges and inheritance candidates
    direct_merges = 0
    inheritance_boosts = []

    # Merge graph results and build inheritance map
    # inheritance_map: vector_ghcid -> list of (related_ghcid, graph_score, reason)
    inheritance_map: dict[str, list[tuple[str, float, str]]] = {g: [] for g in vector_ghcids}

    for inst in graph_results:
        logger.debug(f" Graph: {inst.ghcid} ({inst.name[:30] if inst.name else '?'}...) "
                     f"g={inst.graph_score:.3f} reason={inst.expansion_reason} "
                     f"related_to={inst.related_institutions}")

        if inst.ghcid in results_by_ghcid:
            # Direct merge: graph result matches existing vector result.
            # Keep the higher graph_score rather than overwriting.
            existing = results_by_ghcid[inst.ghcid]
            old_graph_score = existing.graph_score
            existing.graph_score = max(existing.graph_score, inst.graph_score)
            existing.related_institutions.extend(inst.related_institutions)
            if inst.expansion_reason:
                existing.expansion_reason = inst.expansion_reason
            direct_merges += 1
            logger.debug(f" -> Direct merge! {inst.ghcid} graph_score: {old_graph_score:.3f} -> {existing.graph_score:.3f}")
        else:
            # New institution from graph expansion
            results_by_ghcid[inst.ghcid] = inst

            # Build inheritance: this graph result was expanded FROM a vector result
            # The related_institutions field contains the seed GHCID(s) it was expanded from
            for seed_ghcid in inst.related_institutions:
                if seed_ghcid in inheritance_map:
                    inheritance_map[seed_ghcid].append(
                        (inst.ghcid, inst.graph_score, inst.expansion_reason or "related")
                    )

    logger.debug(f"Direct merges: {direct_merges}")

    # Apply graph score inheritance to vector results
    # Vector results inherit a portion of graph scores from their related institutions
    INHERITANCE_FACTOR = 0.5  # Inherit 50% of related institutions' graph scores

    for vector_ghcid, related_list in inheritance_map.items():
        if related_list and vector_ghcid in results_by_ghcid:
            inst = results_by_ghcid[vector_ghcid]

            # Calculate inherited score: average of related graph scores * inheritance factor
            related_scores = [score for _, score, _ in related_list]
            inherited_score = (sum(related_scores) / len(related_scores)) * INHERITANCE_FACTOR

            old_graph_score = inst.graph_score
            # Inherit: take max of current graph_score and inherited score
            # (inheritance can only raise a score, never lower it)
            inst.graph_score = max(inst.graph_score, inherited_score)

            if inst.graph_score > old_graph_score:
                # Track related institutions for context
                related_ghcids = [ghcid for ghcid, _, _ in related_list]
                inst.related_institutions.extend(related_ghcids[:3])  # Add up to 3 related

                inheritance_boosts.append({
                    "ghcid": vector_ghcid,
                    "name": inst.name,
                    "old_graph": old_graph_score,
                    "new_graph": inst.graph_score,
                    "inherited_from": len(related_list),
                    "reasons": list(set(r for _, _, r in related_list))
                })
                logger.debug(f" Inheritance: {vector_ghcid} graph_score: {old_graph_score:.3f} -> "
                             f"{inst.graph_score:.3f} (from {len(related_list)} related institutions)")

    if inheritance_boosts:
        logger.info(f"Graph inheritance applied to {len(inheritance_boosts)} vector results: "
                    f"{[b['ghcid'] for b in inheritance_boosts[:3]]}...")

    # Calculate combined scores (weighted sum of the two signals)
    for inst in results_by_ghcid.values():
        inst.combined_score = (
            self.vector_weight * inst.vector_score +
            self.graph_weight * inst.graph_score
        )

    # Sort by combined score
    ranked = sorted(
        results_by_ghcid.values(),
        key=lambda x: x.combined_score,
        reverse=True
    )

    # Log top results for debugging
    logger.debug(f"Top {min(5, len(ranked))} combined results:")
    for i, inst in enumerate(ranked[:5]):
        logger.debug(f" {i+1}. {inst.ghcid} ({inst.name[:25] if inst.name else '?'}...) "
                     f"combined={inst.combined_score:.3f} (v={inst.vector_score:.3f}, g={inst.graph_score:.3f})")

    return ranked[:k]
|
||
|
||
def _get_person_collection_vector_size(self) -> int | None:
|
||
"""Get the vector size of the person collection."""
|
||
try:
|
||
info = self.qdrant_client.get_collection("heritage_persons")
|
||
if hasattr(info.config.params, 'vectors'):
|
||
vectors_config = info.config.params.vectors
|
||
if isinstance(vectors_config, dict):
|
||
first_config = next(iter(vectors_config.values()), None)
|
||
return first_config.size if first_config else None
|
||
elif vectors_config is not None:
|
||
return vectors_config.size # type: ignore[union-attr]
|
||
return None
|
||
except Exception as e:
|
||
logger.warning(f"Could not get person collection vector size: {e}")
|
||
return None
|
||
|
||
def _person_vector_search(
    self,
    query: str,
    k: int,
    using: str | None = None,
    filter_conditions: dict[str, Any] | None = None,
) -> list[RetrievedPerson]:
    """Perform vector similarity search in Qdrant heritage_persons collection.

    The person collection may be indexed with a different embedding model
    than the institution collection, so the collection's vector size is
    inspected first and a matching model is selected unless the caller
    pinned one via ``using``.

    Args:
        query: Search query text
        k: Number of results to retrieve
        using: Optional embedding model name (for multi-embedding mode)
        filter_conditions: Optional dict of field->value filters for Qdrant

    Returns:
        List of RetrievedPerson sorted by combined score (vector score
        adjusted by profile-richness and name-match boosts). Empty list
        when the Qdrant query fails.
    """
    from qdrant_client.http import models

    # Check person collection vector size and use appropriate model
    person_vector_size = self._get_person_collection_vector_size()
    person_model = using

    if person_vector_size == 384 and not using:
        # Person collection uses MiniLM (384-dim), override model selection
        person_model = "minilm_384"
        logger.info(f"Person collection uses 384-dim vectors, using MiniLM model")
    elif person_vector_size == 1536 and not using:
        person_model = "openai_1536"
    elif person_vector_size == 768 and not using:
        person_model = "bge_768"

    query_vector = self._get_embedding(query, using=person_model)

    try:
        # Build query parameters
        search_params: dict[str, Any] = {
            "collection_name": "heritage_persons",
            "query": query_vector,
            "limit": k,
            "with_payload": True,
        }

        # Add named vector 'using' ONLY if collection actually has named vectors
        # Single-vector collections will error with "Not existing vector name" otherwise
        if self.use_multi_embedding and self.multi_retriever:
            uses_named = self.multi_retriever.uses_named_vectors("heritage_persons")
            if uses_named:
                if using:
                    search_params["using"] = using
                elif self._selected_multi_model:
                    search_params["using"] = self._selected_multi_model.value
            # else: single-vector collection, don't add 'using' parameter

        # Add schema-aware filters if provided
        if filter_conditions:
            filter_list = []
            for key, value in filter_conditions.items():
                # Handle advanced match filters (e.g. {"email": {"match": {"text": "nos"}}})
                if isinstance(value, dict) and "match" in value:
                    filter_list.append(
                        models.FieldCondition(
                            key=key,
                            match=models.MatchText(**value["match"])
                        )
                    )
                else:
                    # Standard exact match value
                    filter_list.append(
                        models.FieldCondition(
                            key=key,
                            match=models.MatchValue(value=value),
                        )
                    )

            search_params["query_filter"] = models.Filter(must=filter_list)
            logger.info(f"[Qdrant] Applied person filters: {filter_conditions}")

        logger.info(f"[Qdrant] Searching '{search_params['collection_name']}' with params: query_filter={filter_conditions}, limit={k}")

        results = self.qdrant_client.query_points(**search_params)
    except Exception as e:
        # Best-effort: a failed person search yields no results rather
        # than propagating into the caller's pipeline.
        logger.warning(f"Person collection search failed: {e}")
        return []

    persons = []
    for point in results.points:
        payload = point.payload or {}

        # Extract richness score from payload (indexed by index_persons_qdrant.py)
        richness_score = payload.get("richness_score", 0.0)

        person = RetrievedPerson(
            # Fall back to a deterministic hash of slug+name when the
            # payload has no staff_id.
            person_id=payload.get("staff_id", "") or hashlib.md5(
                f"{payload.get('custodian_slug', '')}:{payload.get('name', '')}".encode()
            ).hexdigest()[:16],
            name=payload.get("name", ""),
            vector_score=point.score,
            richness_score=richness_score,
            headline=payload.get("headline"),
            custodian_name=payload.get("custodian_name"),
            custodian_slug=payload.get("custodian_slug"),
            location=payload.get("location"),
            heritage_relevant=payload.get("heritage_relevant", False),
            heritage_type=payload.get("heritage_type"),
            source_type=payload.get("source_type"),
            linkedin_url=payload.get("linkedin_url"),
            has_wcms=payload.get("has_wcms", False),
            # WCMS-specific fields
            wcms_user_id=payload.get("wcms_user_id"),
            wcms_abs_id=payload.get("wcms_abs_id"),
            wcms_crm_id=payload.get("wcms_crm_id"),
            wcms_username=payload.get("wcms_username"),
            wcms_username_url=payload.get("wcms_username_url"),
            wcms_status=payload.get("wcms_status"),
            wcms_roles=payload.get("wcms_roles"),
            wcms_registered_since=payload.get("wcms_registered_since"),
            wcms_last_access=payload.get("wcms_last_access"),
            # Contact details
            email=payload.get("email"),
            email_domain=payload.get("email_domain"),
        )

        # Apply richness score boosting
        # Formula: combined_score = vector_score * (0.7 + 0.3 * richness_score)
        # - Profiles with richness_score=0 get 70% of vector score
        # - Profiles with richness_score=1 get 100% of vector score
        # This ensures rich profiles rank higher than sparse ones at similar similarity
        richness_boost = 0.7 + 0.3 * richness_score
        person.combined_score = person.vector_score * richness_boost

        # Apply name-matching boost for queries that look like person names
        # This ensures that searching for "Kitty Bogte" returns Kitty Bogte first,
        # even if vector similarity ranks other Dutch names higher
        if looks_like_person_name(query) and person.name:
            name_boost = calculate_name_match_boost(query, person.name)
            if name_boost > 1.0:
                logger.debug(f"Name match boost {name_boost}x for '{person.name}' (query: '{query}')")
                person.combined_score *= name_boost

        persons.append(person)

    # Re-sort by combined score after name boosting
    persons.sort(key=lambda p: p.combined_score, reverse=True)

    return persons
|
||
|
||
def search_persons(
    self,
    query: str,
    k: int | None = None,
    filter_custodian: str | None = None,
    only_heritage_relevant: bool = False,
    only_wcms: bool = False,
    using: str | None = None,
    # Schema-aware filter parameters (from DSPy HeritageQueryRouter)
    target_role_category: str | None = None,
    target_custodian_type: str | None = None,
    # Extra filters for robust domain search (e.g. email substring)
    extra_filters: dict[str, Any] | None = None,
) -> list[RetrievedPerson]:
    """Search staff/person profiles in the heritage_persons collection.

    Schema-aware parameters are translated into Qdrant payload filters;
    the role category, which is not indexed, is applied afterwards as a
    headline keyword filter. Candidates are over-fetched so that
    post-filtering and name-match boosting still leave enough results.

    Args:
        query: Natural language search query
        k: Number of results to return (default: k_final)
        filter_custodian: Optional custodian slug to filter by
        only_heritage_relevant: Only return heritage-relevant staff
        only_wcms: Only return WCMS-registered profiles (heritage sector users)
        using: Optional embedding model name (for multi-embedding mode).
            One of: "openai_1536", "minilm_384", "bge_768"
        target_role_category: Role category from DSPy router (CURATORIAL,
            ARCHIVAL, ...), applied as headline-based post-filter
        target_custodian_type: Custodian type from DSPy router (MUSEUM,
            ARCHIVE, ...), converted to a heritage_type code for Qdrant
        extra_filters: Optional extra Qdrant filters
            (e.g. {"email": {"match": {"text": "nos"}}})

    Returns:
        List of RetrievedPerson with scores, best first
    """
    k = k or self.k_final

    # Translate schema-aware routing hints into Qdrant payload filters.
    conditions = build_schema_aware_person_filter(
        heritage_type_code=get_heritage_type_code(target_custodian_type),
        heritage_relevant_only=only_heritage_relevant,
        custodian_slug=filter_custodian,
        only_wcms=only_wcms,
    ) or {}
    conditions.update(extra_filters or {})
    filter_conditions = conditions if conditions else None

    logger.info(f"Person search for: {query[:50]}... (model: {using or 'auto'}, role_category: {target_role_category}, custodian_type: {target_custodian_type}, extras: {extra_filters})")

    # Over-fetch to allow for post-filtering and name boosting:
    # role-category filtering needs 3x candidates (keyword filter discards
    # many), everything else 2x; name-like queries fetch at least 100 so
    # exact-name boosting can surface matches that raw vector similarity
    # ranked below similar-sounding names.
    multiplier = 3 if target_role_category else 2
    floor = 100 if looks_like_person_name(query) else 0
    candidates = self._person_vector_search(
        query,
        max(k * multiplier, floor),
        using=using,
        filter_conditions=filter_conditions,
    )
    logger.info(f"Found {len(candidates)} person results after Qdrant filtering")

    # Role category is not indexed in Qdrant -> keyword-based post-filter.
    if target_role_category:
        candidates = filter_by_role_category_keywords(candidates, target_role_category)

    # Final ordering by combined score, truncated to the requested size.
    candidates.sort(key=lambda person: person.combined_score, reverse=True)
    return candidates[:k]
|
||
|
||
def search(
    self,
    query: str,
    k: int | None = None,
    expand_graph: bool = True,
    filter_conditions: dict[str, Any] | None = None,
    auto_route: bool = True,
    using: str | None = None,
    region_codes: list[str] | None = None,
    cities: list[str] | None = None,
    institution_types: list[str] | None = None,
) -> list[RetrievedInstitution] | list[RetrievedPerson]:
    """Hybrid vector + graph search with optional automatic query routing.

    With auto_route enabled, queries detected as person questions
    (e.g., "Who works at Rijksmuseum?") are delegated to search_persons.
    Institution queries run the full pipeline: vector search, optional
    knowledge-graph expansion, then weighted combine-and-rank.

    Args:
        query: Natural language search query
        k: Number of results to return (default: k_final)
        expand_graph: Whether to perform graph expansion (institution search only)
        filter_conditions: Optional Qdrant filter conditions (legacy, prefer new params)
        auto_route: Automatically detect and route person queries
        using: Optional embedding model name (for multi-embedding mode).
            One of: "openai_1536", "minilm_384", "bge_768"
        region_codes: Optional ISO 3166-2 region codes (e.g., ["NH", "ZH"])
            for filtering by province/subdivision
        cities: Optional city names (e.g., ["Amsterdam", "Rotterdam"])
        institution_types: Optional institution types (e.g., ["ARCHIVE", "MUSEUM"])

    Returns:
        List of RetrievedInstitution or RetrievedPerson with combined scores
    """
    k = k or self.k_final

    # Person-style questions go to the person index when routing is on.
    if auto_route and detect_query_type(query) == "person":
        logger.info(f"Auto-routing to person search for: {query[:50]}...")
        return self.search_persons(query, k=k, using=using)

    # Summarize any active geo/type filters for the log line.
    labeled_filters = (
        ("regions", region_codes),
        ("cities", cities),
        ("types", institution_types),
    )
    active = [f"{label}={value}" for label, value in labeled_filters if value]
    filter_str = f" [{', '.join(active)}]" if active else ""

    logger.info(f"Vector search for: {query[:50]}...{filter_str} (model: {using or 'auto'})")
    vector_results = self._vector_search(
        query,
        self.k_vector,
        using=using,
        region_codes=region_codes,
        cities=cities,
        institution_types=institution_types,
    )
    logger.info(f"Found {len(vector_results)} vector results")

    # Step 2: knowledge-graph expansion (only with seeds to expand from).
    graph_results = []
    if expand_graph and vector_results:
        logger.info("Expanding via knowledge graph...")
        graph_results = self._graph_expand(vector_results)
        logger.info(f"Found {len(graph_results)} graph expansion results")

    # Step 3: weighted merge of both signals, truncated to k.
    combined = self._combine_and_rank(vector_results, graph_results, k)
    logger.info(f"Returning {len(combined)} combined results")

    return combined
|
||
|
||
def search_institutions(
    self,
    query: str,
    k: int | None = None,
    expand_graph: bool = True,
    filter_conditions: dict[str, Any] | None = None,
    using: str | None = None,
    region_codes: list[str] | None = None,
    cities: list[str] | None = None,
    institution_types: list[str] | None = None,
) -> list[RetrievedInstitution]:
    """Institution-only search that bypasses person auto-routing.

    Thin wrapper around search() with auto_route pinned to False, which
    guarantees the result type is RetrievedInstitution.

    Args:
        query: Natural language search query
        k: Number of results to return (default: k_final)
        expand_graph: Whether to perform graph expansion
        filter_conditions: Optional Qdrant filter conditions (legacy, prefer new params)
        using: Optional embedding model name (for multi-embedding mode).
            One of: "openai_1536", "minilm_384", "bge_768"
        region_codes: Optional ISO 3166-2 region codes (e.g., ["NH", "ZH"])
            for filtering by province/subdivision
        cities: Optional city names (e.g., ["Amsterdam", "Rotterdam"])
        institution_types: Optional institution types (e.g., ["ARCHIVE", "MUSEUM"])

    Returns:
        List of RetrievedInstitution with combined scores
    """
    passthrough = dict(
        k=k,
        expand_graph=expand_graph,
        filter_conditions=filter_conditions,
        using=using,
        region_codes=region_codes,
        cities=cities,
        institution_types=institution_types,
    )
    # auto_route=False ensures we get RetrievedInstitution, not RetrievedPerson
    return self.search(query, auto_route=False, **passthrough)  # type: ignore[return-value]
|
||
|
||
def __call__(self, query: str, k: int | None = None) -> list[str]:
    """DSPy-compatible interface returning passage texts.

    Runs the auto-routed search() and renders each hit as a single
    human-readable line, handling both person and institution results.

    Args:
        query: Search query
        k: Number of results

    Returns:
        List of passage texts (institution/person descriptions)
    """
    def _render(hit) -> str:
        """Format one retrieval hit as a passage string."""
        if isinstance(hit, RetrievedPerson):
            # Person result
            org = f" at {hit.custodian_name}" if hit.custodian_name else ""
            role = hit.headline or "Unknown role"
            return f"{hit.name} ({role}{org})"
        # Institution result
        inst_type = hit.institution_type or "Unknown type"
        desc = hit.description or "No description"
        return f"{hit.name} ({inst_type}) - {desc}"

    return [_render(hit) for hit in self.search(query, k=k)]
|
||
|
||
def get_stats(self) -> dict[str, Any]:
    """Get retriever statistics.

    Collects point counts and status for both Qdrant collections, the
    custodian count from Oxigraph, and the retriever's scoring/fetch
    configuration. Backend failures are captured as per-section "error"
    entries instead of raising.

    Returns:
        Dict with Qdrant and Oxigraph stats
    """
    def _qdrant_collection_stats(name: str) -> dict[str, Any]:
        """Describe one Qdrant collection, or report the failure."""
        try:
            info = self.qdrant_client.get_collection(name)
            return {
                "collection": name,
                "points_count": info.points_count,
                "status": info.status.value if info.status else "unknown"
            }
        except Exception as e:
            return {"error": str(e)}

    stats: dict[str, Any] = {
        "qdrant": {
            "institutions": _qdrant_collection_stats(self.collection_name),
            "persons": _qdrant_collection_stats("heritage_persons"),
        },
        "oxigraph": {},
        "config": {
            "vector_weight": self.vector_weight,
            "graph_weight": self.graph_weight,
            "k_vector": self.k_vector,
            "k_expand": self.k_expand,
            "k_final": self.k_final
        }
    }

    # Oxigraph stats: total number of distinct custodians in the graph.
    try:
        result = self.sparql_client.query(
            "SELECT (COUNT(DISTINCT ?s) as ?count) WHERE { ?s a hcc:Custodian }"
        )
        if result:
            stats["oxigraph"]["custodian_count"] = int(result[0].get("count", 0))
    except Exception as e:
        stats["oxigraph"]["error"] = str(e)

    return stats
|
||
|
||
def close(self) -> None:
    """Clean up resources.

    Closes the SPARQL client and, if one was created, the lazily-held
    Qdrant client. The Qdrant close runs in a ``finally`` block so a
    failure while closing the SPARQL client cannot leak the Qdrant
    connection (the original version skipped it in that case).
    """
    try:
        self.sparql_client.close()
    finally:
        if self._qdrant_client:
            self._qdrant_client.close()
|
||
|
||
|
||
def create_hybrid_retriever(
    use_production: bool = False,
    **kwargs
) -> HybridRetriever:
    """Factory function to create a hybrid retriever.

    Chooses the endpoint set first, then constructs the retriever once:
    production points at the hosted bronhouder.nl services, while local
    mode reads host/port/endpoint overrides from the environment.

    Args:
        use_production: If True, connect to production endpoints
        **kwargs: Additional arguments for HybridRetriever

    Returns:
        Configured HybridRetriever instance
    """
    if use_production:
        endpoints: dict = {
            "qdrant_host": "bronhouder.nl",
            "qdrant_port": 443,
            "sparql_endpoint": "https://bronhouder.nl/sparql",
            "use_production_qdrant": True,
        }
    else:
        # Local/dev defaults, overridable via environment variables.
        endpoints = {
            "qdrant_host": os.getenv("QDRANT_HOST", "localhost"),
            "qdrant_port": int(os.getenv("QDRANT_PORT", "6333")),
            "sparql_endpoint": os.getenv("SPARQL_ENDPOINT", "http://localhost:7878/query"),
        }
    return HybridRetriever(**endpoints, **kwargs)
|