glam/backend/rag/ontology_mapping.py
kempersc d1c9aebd84 feat(rag): Add hybrid language detection and enhanced ontology mapping
Implement Heritage RAG pipeline enhancements:

1. Ontology Mapping (new file: ontology_mapping.py)
   - Hybrid language detection: heritage vocabulary -> fast-langdetect -> English default
   - HERITAGE_VOCABULARY dict (~40 terms) for domain-specific accuracy
   - FastText-based ML detection with 0.6 confidence threshold
   - Support for Dutch, French, German, Spanish, Italian, Portuguese, English
   - Dynamic synonym extraction from LinkML enum values
   - 93 comprehensive tests (all passing)

2. Schema Loader Enhancements (schema_loader.py)
   - Language-tagged multilingual synonym extraction for DSPy signatures
   - Enhanced enum value parsing with annotations support
   - Better error handling for malformed schema files

3. DSPy Heritage RAG (dspy_heritage_rag.py)
   - Fixed all 10 mypy type errors
   - Enhanced type annotations throughout
   - Improved query routing with multilingual support

4. Dependencies (pyproject.toml)
   - Added fast-langdetect ^1.0.0 (primary language detection)
   - Added types-pyyaml ^6.0.12 (mypy type stubs)

Tests: 93 new tests for ontology_mapping, all passing
Mypy: Clean (no type errors)
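
Usage sketch (illustrative, mirroring the module docstring; enum and value
names depend on the loaded schema):

    from backend.rag.ontology_mapping import get_ontology_mapper

    mapper = get_ontology_mapper()
    mapper.match_natural_language("virtueel museum", "DigitalPlatformTypeEnum")
    # -> "VIRTUAL_MUSEUM"
    mapper.get_heritage_type_code("MUSEUM")
    # -> "M"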
2025-12-14 15:55:18 +01:00


"""
Dynamic Ontology Mapping from LinkML Schema Files
This module provides dynamic loading and matching of LinkML schema enumerations
for the Heritage RAG pipeline. The LinkML schema files are the SINGLE SOURCE OF TRUTH -
no hardcoded enum values.
Key features:
1. Dynamically loads enum files from schemas/20251121/linkml/modules/enums/
2. Extracts multilingual synonyms from the 'comments' field in YAML
3. Provides fuzzy matching for natural language queries
4. Supports cache invalidation based on file modification times
5. Generates filter mappings for Qdrant queries
Usage:
from backend.rag.ontology_mapping import get_ontology_mapper, match_custodian_type
mapper = get_ontology_mapper()
# Match natural language to schema enum value
result = mapper.match_natural_language("virtueel museum", "DigitalPlatformTypeEnum")
# Returns: "VIRTUAL_MUSEUM"
# Get heritage type code for Qdrant filtering
code = mapper.get_heritage_type_code("MUSEUM")
# Returns: "M"
# Get custodian type to code mapping (replaces hardcoded dict)
type_to_code = mapper.get_custodian_type_to_code_mapping()
# Returns: {"GALLERY": "G", "LIBRARY": "L", "ARCHIVE": "A", ...}
"""
from __future__ import annotations

import logging
import re
import unicodedata
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path

import yaml
logger = logging.getLogger(__name__)
# Default schema directory - matches schema_loader.py
SCHEMA_BASE_DIR = Path(__file__).parent.parent.parent / "schemas" / "20251121" / "linkml"
# Languages supported for synonym extraction (ISO 639-1 codes)
SUPPORTED_LANGUAGES = {"en", "nl", "de", "fr", "es", "it", "pt"}
# Heritage-specific vocabulary for domain-specific language detection.
# These are short heritage terms that general-purpose language detectors
# often misclassify. The vocabulary is checked BEFORE fast-langdetect in
# detect_term_language(), so known problem terms are resolved
# deterministically; fast-langdetect handles everything else, and detections
# below LANGDETECT_CONFIDENCE_THRESHOLD fall back to the defaults there.
#
# NOTE: This is a REDUCED vocabulary focused only on disambiguation cases.
# General-purpose language detection handles most terms correctly.
HERITAGE_VOCABULARY: dict[str, set[str]] = {
"nl": {
# Dutch terms that fast-langdetect often misclassifies
# (e.g., "musea" detected as Italian, "bibliotheken" as German)
"musea", "bibliotheek", "bibliotheken", "archief", "archieven",
"galerij", "galerijen", "collectie", "collecties", "verzameling",
"heemkundige", "kring", "vereniging", "genootschap", "erfgoed",
"rijks", "gemeentelijk", "provinciale",
},
"de": {
# German terms - most are detected correctly, keep only ambiguous ones
"museen", "archiv", "sammlung", "sammlungen",
"landesarchiv", "stadtarchiv", "bundesarchiv",
},
"fr": {
# French terms with diacritics are usually detected correctly
# Keep only terms without diacritics that could be confused
"musee", "musees", "bibliotheque", "bibliotheques",
},
"es": {
# Spanish - biblioteca/museo overlap with Italian
"archivos", "bibliotecas",
},
"it": {
# Italian terms
"musei", "archivi", "biblioteche", "galleria", "gallerie",
},
"pt": {
# Portuguese - museu is distinctive
"museu", "museus", "arquivo", "arquivos",
},
"en": {
# English heritage terms - these should match English
"library", "libraries", "museum", "museums", "archive", "archives",
"gallery", "galleries", "collection", "collections",
"society", "association", "foundation", "trust", "institute",
},
}
# Confidence threshold for fast-langdetect
# Below this, fall back to heritage vocabulary matching
LANGDETECT_CONFIDENCE_THRESHOLD = 0.6
# Flag to track if fast-langdetect is available
_FAST_LANGDETECT_AVAILABLE: bool | None = None
def _is_fast_langdetect_available() -> bool:
"""Check if fast-langdetect is available."""
global _FAST_LANGDETECT_AVAILABLE
if _FAST_LANGDETECT_AVAILABLE is None:
try:
from fast_langdetect import detect # noqa: F401
_FAST_LANGDETECT_AVAILABLE = True
except ImportError:
_FAST_LANGDETECT_AVAILABLE = False
logger.warning(
"fast-langdetect not installed. Using heritage vocabulary fallback only. "
"Install with: pip install fast-langdetect"
)
return _FAST_LANGDETECT_AVAILABLE
def _match_heritage_vocabulary(term: str) -> str | None:
    """Match a term against the heritage-specific vocabulary.

    This check runs BEFORE fast-langdetect in detect_term_language() so that
    known problem terms are resolved deterministically. It is also the only
    detection method available when fast-langdetect is not installed.

    Args:
        term: The term to match

    Returns:
        Language code or None if no match
    """
normalized = normalize_text(term)
original_lower = term.lower().strip()
# Single-word exact match
for lang, vocab in HERITAGE_VOCABULARY.items():
normalized_vocab = {normalize_text(v) for v in vocab}
if normalized in normalized_vocab:
return lang
# Also check with original (preserves diacritics)
if original_lower in {v.lower() for v in vocab}:
return lang
# Prefix matching for morphological variations
# e.g., "bibliotheken" should match "bibliotheek"
for lang, vocab in HERITAGE_VOCABULARY.items():
normalized_vocab = {normalize_text(v) for v in vocab}
for marker in normalized_vocab:
if len(marker) >= 5 and len(normalized) >= 5:
if normalized.startswith(marker[:5]) or marker.startswith(normalized[:5]):
return lang
return None
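
# Illustrative vocabulary matches (deterministic; a sketch based on the sets
# defined in HERITAGE_VOCABULARY above):
#
#     _match_heritage_vocabulary("Musea")     # -> "nl" (exact match, case-folded)
#     _match_heritage_vocabulary("archivos")  # -> "es" (exact match)
#     _match_heritage_vocabulary("museu")     # -> "pt" (exact match)
#     _match_heritage_vocabulary("qwerty")    # -> None (no vocabulary entry)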
def detect_term_language(term: str) -> str | None:
    """Detect the language of a term using a hybrid approach.

    Detection order:
    1. Empty input -> "en"; multi-word phrases containing common English
       indicator words -> "en"
    2. Heritage-specific vocabulary (overrides the detector for known
       problem terms)
    3. fast-langdetect (FastText model, 176 languages), accepted only when
       its confidence is >= LANGDETECT_CONFIDENCE_THRESHOLD
    4. Remaining multi-word phrases default to "en"; single words return None

    Args:
        term: A single term to analyze (e.g., "bibliotheken", "museos")

    Returns:
        ISO 639-1 language code, or None if no stage produces a match

    Examples:
        >>> detect_term_language("bibliotheken")
        "nl"
        >>> detect_term_language("bibliothèques")
        "fr"
        >>> detect_term_language("Public libraries")
        "en"
    """
    if not term or not term.strip():
        return "en"  # Default for empty strings
normalized = normalize_text(term)
words = normalized.split()
# Multi-word phrase detection
if len(words) > 1:
# English phrase indicators - these words strongly suggest English
english_indicators = {
"public", "national", "special", "digital", "academic", "local",
"art", "history", "science", "natural", "city", "state",
"corporate", "government", "religious", "university",
}
if any(word in english_indicators for word in words):
return "en"
# Try heritage vocabulary first for known terms
# This catches terms that fast-langdetect misclassifies
heritage_match = _match_heritage_vocabulary(term)
if heritage_match:
return heritage_match
# Use fast-langdetect if available
if _is_fast_langdetect_available():
try:
from fast_langdetect import detect
result = detect(term)
if isinstance(result, dict):
lang = result.get("lang")
score = result.get("score", 0)
elif isinstance(result, list) and result:
lang = result[0].get("lang")
score = result[0].get("score", 0)
else:
lang = None
score = 0
# Return if confidence is high enough
if lang and score >= LANGDETECT_CONFIDENCE_THRESHOLD:
# Map to supported languages (fast-langdetect returns ISO 639-1)
if lang in SUPPORTED_LANGUAGES:
return str(lang)
# Some language codes need mapping
lang_mapping: dict[str, str] = {"af": "nl"} # Afrikaans often confused with Dutch
mapped = lang_mapping.get(str(lang), str(lang))
return mapped if mapped in SUPPORTED_LANGUAGES else None
            # Low confidence - fall through to the defaults below
            logger.debug(f"Low confidence ({score:.2f}) for term '{term}'; using fallback defaults")
        except Exception as e:
            logger.debug(f"fast-langdetect error for '{term}': {e}")
# For multi-word terms without clear indicators, default to English
if len(words) > 1:
return "en"
# Single word with no match - return None
return None
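
# A minimal sketch of the full cascade; the fast-langdetect stage depends on
# the bundled FastText model, so those results are indicative only:
#
#     detect_term_language("bibliotheken")         # -> "nl" (vocabulary override)
#     detect_term_language("national archives")    # -> "en" (English indicator word)
#     detect_term_language("patrimoine culturel")  # -> likely "fr" (detector stage;
#                                                  #    "en" on low confidence)
#     detect_term_language("")                     # -> "en" (empty-input default)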
# GLAMORCUBESFIXPHDNT taxonomy mapping - enum value name to single-letter code.
# The codes themselves are STABLE (defined by the taxonomy), but the enum VALUE
# NAMES may evolve, so enums are still loaded dynamically and matched against
# this fixed mapping.
GLAMORCUBESFIXPHDNT_CODES: dict[str, str] = {
# Primary type enum values -> single letter codes
"GALLERY": "G",
"LIBRARY": "L",
"ARCHIVE": "A",
"MUSEUM": "M",
"OFFICIAL_INSTITUTION": "O",
"RESEARCH_CENTER": "R",
"COMMERCIAL": "C",
"UNSPECIFIED": "U",
"BIO_CUSTODIAN": "B",
"EDUCATION_PROVIDER": "E",
"HERITAGE_SOCIETY": "S",
"FEATURE_CUSTODIAN": "F",
"INTANGIBLE_HERITAGE_GROUP": "I",
"MIXED": "X",
"PERSONAL_COLLECTION": "P",
"HOLY_SACRED_SITE": "H",
"DIGITAL_PLATFORM": "D",
"NON_PROFIT": "N",
"TASTE_SCENT_HERITAGE": "T",
}
@dataclass
class EnumValueInfo:
"""Detailed information for a single enum value.
Attributes:
name: The enum value name (e.g., "VIRTUAL_MUSEUM")
description: Human-readable description
wikidata_id: Wikidata entity ID from 'meaning' field (e.g., "Q1225034")
synonyms: Language-tagged synonyms extracted from comments
all_synonyms_normalized: Flattened list of normalized synonyms for matching
"""
name: str
description: str | None = None
wikidata_id: str | None = None
synonyms: dict[str, list[str]] = field(default_factory=dict) # lang_code -> synonyms
all_synonyms_normalized: list[str] = field(default_factory=list)
@dataclass
class EnumMapping:
"""Complete mapping for an enum type.
Attributes:
enum_name: Name of the enum (e.g., "DigitalPlatformTypeEnum")
source_file: Path to the source YAML file
values: Dictionary mapping value names to EnumValueInfo
last_loaded: When this enum was last loaded
file_mtime: File modification time for cache invalidation
description: Enum-level description
"""
enum_name: str
source_file: Path
values: dict[str, EnumValueInfo] = field(default_factory=dict)
last_loaded: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
file_mtime: float = 0.0
description: str | None = None
def normalize_text(text: str) -> str:
"""Normalize text for matching: lowercase, remove accents, strip whitespace.
Args:
text: Input text to normalize
Returns:
Normalized text for comparison
Examples:
>>> normalize_text("Digitales Museum")
"digitales museum"
>>> normalize_text("musée virtuel")
"musee virtuel"
>>> normalize_text("Bibliothèque")
"bibliotheque"
"""
# NFD decomposition separates base characters from combining marks
normalized = unicodedata.normalize('NFD', text)
# Remove combining marks (category 'Mn' = Mark, Nonspacing)
ascii_text = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')
# Lowercase and strip
return ascii_text.lower().strip()
def parse_language_tag(comment: str) -> tuple[str | None, str]:
"""Parse a language-tagged comment string.
Format: "term (lang_code)" -> ("lang_code", "term")
Args:
comment: Comment string, possibly with language tag
Returns:
Tuple of (language_code, term) where language_code may be None
Examples:
>>> parse_language_tag("Digitales Museum (de)")
("de", "Digitales Museum")
>>> parse_language_tag("museo virtual (es)")
("es", "museo virtual")
>>> parse_language_tag("Some plain comment")
(None, "Some plain comment")
"""
# Pattern: text (lang_code) at end of string
pattern = r'^(.+?)\s*\(([a-z]{2})\)\s*$'
match = re.match(pattern, comment, re.IGNORECASE)
if match:
term = match.group(1).strip()
lang = match.group(2).lower()
if lang in SUPPORTED_LANGUAGES:
return (lang, term)
return (None, comment)
def extract_comma_separated_terms(comment: str) -> list[str]:
"""Extract comma-separated terms from comments like "Includes X, Y, Z".
Handles patterns commonly found in CustodianPrimaryTypeEnum.yaml:
- "Includes bibliotheken, bibliotecas, bibliothèques"
- "Public libraries, academic libraries, national libraries"
- "Kunsthallen, art galleries, visual arts centers"
Args:
comment: A comment string that may contain comma-separated terms
Returns:
List of individual terms extracted from the comment
Examples:
>>> extract_comma_separated_terms("Includes musea, museos, musées")
["musea", "museos", "musées"]
>>> extract_comma_separated_terms("Public libraries, academic libraries")
["Public libraries", "academic libraries"]
>>> extract_comma_separated_terms("Some single term comment")
[] # Empty list - no commas
"""
terms: list[str] = []
# Skip if no commas (not a list)
if ',' not in comment:
return terms
# Strip common prefixes like "Includes", "Examples:", etc.
cleaned = comment
prefixes_to_strip = [
r'^Includes\s+',
r'^Examples?:?\s*',
r'^Types?:?\s*',
r'^Such as\s+',
r'^E\.g\.?,?\s*',
r'^I\.e\.?,?\s*',
]
for prefix in prefixes_to_strip:
cleaned = re.sub(prefix, '', cleaned, flags=re.IGNORECASE)
# Split by comma
parts = cleaned.split(',')
for part in parts:
# Clean up each term
term = part.strip()
# Skip empty terms
if not term:
continue
# Skip terms that look like full sentences (long descriptions)
if len(term) > 50:
continue
# Skip terms that are just references like "(Q123456)"
if re.match(r'^\(Q\d+\)$', term):
continue
# Handle trailing parentheses like "botanical gardens (Q473972)"
# Extract just the term part
paren_match = re.match(r'^(.+?)\s*\([^)]+\)\s*$', term)
if paren_match:
term = paren_match.group(1).strip()
# Add valid terms
if term and len(term) >= 2:
terms.append(term)
return terms
def extract_wikidata_id(meaning: str | None) -> str | None:
"""Extract Wikidata ID from meaning field.
Args:
meaning: The meaning field value (e.g., "wikidata:Q1225034")
Returns:
The Wikidata ID (e.g., "Q1225034") or None
"""
if not meaning:
return None
# Handle "wikidata:Q12345" format
if meaning.startswith("wikidata:"):
return meaning.replace("wikidata:", "")
# Handle full URI format
if "wikidata.org" in meaning:
match = re.search(r'(Q\d+)', meaning)
if match:
return match.group(1)
return None
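
# Illustrative extractions (deterministic, given the two formats handled above):
#
#     extract_wikidata_id("wikidata:Q1225034")                       # -> "Q1225034"
#     extract_wikidata_id("https://www.wikidata.org/wiki/Q1225034")  # -> "Q1225034"
#     extract_wikidata_id(None)                                      # -> None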
class OntologyMapper:
"""Dynamic ontology mapping from LinkML schema files.
This class loads enum definitions from the LinkML schema directory and provides:
- Multilingual synonym extraction from YAML comments
- Natural language matching to schema enum values
- Cache invalidation based on file modification times
- Integration helpers for Qdrant filtering
Usage:
mapper = OntologyMapper(schema_dir=Path("schemas/20251121/linkml"))
# Load specific enum
digital_platforms = mapper.load_enum("DigitalPlatformTypeEnum")
print(len(digital_platforms.values)) # 53
# Match natural language (Dutch)
result = mapper.match_natural_language("virtueel museum", "DigitalPlatformTypeEnum")
# Returns: "VIRTUAL_MUSEUM"
# Get heritage type code for Qdrant filtering
code = mapper.get_heritage_type_code("MUSEUM")
# Returns: "M"
"""
def __init__(self, schema_dir: Path | None = None, watch_for_changes: bool = True):
"""Initialize the OntologyMapper.
Args:
schema_dir: Path to LinkML schema directory. Defaults to schemas/20251121/linkml/
watch_for_changes: Whether to check file mtimes for cache invalidation
"""
self.schema_dir = schema_dir or SCHEMA_BASE_DIR
self.enums_dir = self.schema_dir / "modules" / "enums"
self.watch_for_changes = watch_for_changes
# Cache of loaded enums
self._cache: dict[str, EnumMapping] = {}
# File modification times for cache invalidation
self._file_mtimes: dict[str, float] = {}
logger.info(f"OntologyMapper initialized with schema_dir: {self.schema_dir}")
def _get_enum_file_path(self, enum_name: str) -> Path:
"""Get the file path for an enum.
Args:
enum_name: Name of the enum (e.g., "DigitalPlatformTypeEnum")
Returns:
Path to the enum YAML file
"""
return self.enums_dir / f"{enum_name}.yaml"
def _is_cache_stale(self, enum_name: str) -> bool:
"""Check if cached enum is stale based on file mtime.
Args:
enum_name: Name of the enum to check
Returns:
True if cache is stale and needs reload
"""
if not self.watch_for_changes:
return False
if enum_name not in self._cache:
return True
filepath = self._get_enum_file_path(enum_name)
if not filepath.exists():
return True
current_mtime = filepath.stat().st_mtime
cached_mtime = self._file_mtimes.get(enum_name, 0.0)
return current_mtime > cached_mtime
def _parse_comments_to_synonyms(
self,
comments: list[str] | None
) -> tuple[dict[str, list[str]], list[str]]:
"""Parse comments field to extract multilingual synonyms.
Handles three formats:
1. Language-tagged: "Digitales Museum (de)" -> {"de": ["Digitales Museum"]}
2. Comma-separated with auto-detection: "Includes musea, museos, musées"
-> {"nl": ["musea"], "es": ["museos"], "fr": ["musées"]}
3. Plain terms: Added to all_normalized for fuzzy matching
        The auto-detection uses detect_term_language() (heritage vocabulary
        plus fast-langdetect) to identify which language each extracted term
        belongs to.
Args:
comments: List of comment strings from YAML
Returns:
Tuple of (synonyms_by_language, all_normalized_synonyms)
Example:
Input: ["Digitales Museum (de)", "Includes musea, museos, musées"]
Output: (
{"de": ["Digitales Museum"], "nl": ["musea"], "es": ["museos"], "fr": ["musées"]},
["digitales museum", "musea", "museos", "musees", ...]
)
"""
synonyms_by_lang: dict[str, list[str]] = {}
all_normalized: list[str] = []
if not comments:
return synonyms_by_lang, all_normalized
def add_to_lang_dict(lang: str, term: str) -> None:
"""Helper to add term to language-specific dict."""
if lang not in synonyms_by_lang:
synonyms_by_lang[lang] = []
# Avoid duplicates
if term not in synonyms_by_lang[lang]:
synonyms_by_lang[lang].append(term)
for comment in comments:
# Try to parse explicit language tag first
lang, term = parse_language_tag(comment)
# Add to language-specific dict if explicitly tagged
if lang:
add_to_lang_dict(lang, term)
# Add normalized version to flat list
normalized = normalize_text(term)
if normalized and normalized not in all_normalized:
all_normalized.append(normalized)
# Extract comma-separated terms within the comment
# This handles patterns like "Includes bibliotheken, bibliotecas, bibliothèques"
comma_terms = extract_comma_separated_terms(comment)
for cterm in comma_terms:
cterm_normalized = normalize_text(cterm)
if cterm_normalized and cterm_normalized not in all_normalized:
all_normalized.append(cterm_normalized)
# Try to detect language for this term
detected_lang = detect_term_language(cterm)
if detected_lang:
# Store the original (unnormalized) term with its language
add_to_lang_dict(detected_lang, cterm)
return synonyms_by_lang, all_normalized
def load_enum(self, enum_name: str, force_reload: bool = False) -> EnumMapping | None:
"""Load a single enum with cache invalidation.
Args:
enum_name: Name of the enum (e.g., "DigitalPlatformTypeEnum")
force_reload: Force reload even if cached
Returns:
EnumMapping object or None if file doesn't exist
"""
# Check cache
if not force_reload and not self._is_cache_stale(enum_name):
cached = self._cache.get(enum_name)
if cached:
return cached
# Load from file
filepath = self._get_enum_file_path(enum_name)
if not filepath.exists():
logger.warning(f"Enum file not found: {filepath}")
return None
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                # safe_load returns None for an empty file; coerce to {} so
                # the .get() calls below don't raise AttributeError
                yaml_content = yaml.safe_load(f) or {}
        except Exception as e:
            logger.error(f"Failed to load enum {enum_name}: {e}")
            return None
# Parse the enum
file_mtime = filepath.stat().st_mtime
enums_section = yaml_content.get("enums", {})
enum_def = enums_section.get(enum_name, {})
if not enum_def:
# Try to find any enum in the file
if enums_section:
enum_name = next(iter(enums_section.keys()))
enum_def = enums_section[enum_name]
permissible_values = enum_def.get("permissible_values", {})
# Build EnumMapping
mapping = EnumMapping(
enum_name=enum_name,
source_file=filepath,
file_mtime=file_mtime,
description=yaml_content.get("description") or enum_def.get("description"),
)
for value_name, value_info in permissible_values.items():
if value_info is None:
value_info = {}
comments = value_info.get("comments", [])
synonyms, all_normalized = self._parse_comments_to_synonyms(comments)
# Add description to normalized synonyms
description = value_info.get("description")
if description:
desc_normalized = normalize_text(description)
if desc_normalized and desc_normalized not in all_normalized:
all_normalized.append(desc_normalized)
# Add the value name itself as a synonym
name_normalized = normalize_text(value_name.replace("_", " "))
if name_normalized and name_normalized not in all_normalized:
all_normalized.insert(0, name_normalized)
mapping.values[value_name] = EnumValueInfo(
name=value_name,
description=description,
wikidata_id=extract_wikidata_id(value_info.get("meaning")),
synonyms=synonyms,
all_synonyms_normalized=all_normalized,
)
# Update cache
self._cache[enum_name] = mapping
self._file_mtimes[enum_name] = file_mtime
logger.debug(f"Loaded enum {enum_name} with {len(mapping.values)} values")
return mapping
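
    # Minimal illustrative shape of an enum YAML file this loader expects
    # (hypothetical values; the real files live under modules/enums/):
    #
    #     enums:
    #       DigitalPlatformTypeEnum:
    #         description: Types of digital heritage platforms
    #         permissible_values:
    #           VIRTUAL_MUSEUM:
    #             description: A museum that exists only online
    #             meaning: wikidata:Q1225034
    #             comments:
    #               - Digitales Museum (de)
    #               - Includes virtueel museum, museo virtual, musee virtuel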
def load_all_enums(self) -> dict[str, EnumMapping]:
"""Load all enum files from schema directory.
Returns:
Dictionary mapping enum names to EnumMapping objects
"""
if not self.enums_dir.exists():
logger.warning(f"Enums directory not found: {self.enums_dir}")
return {}
loaded = {}
for filepath in self.enums_dir.glob("*.yaml"):
enum_name = filepath.stem
mapping = self.load_enum(enum_name)
if mapping:
loaded[enum_name] = mapping
logger.info(f"Loaded {len(loaded)} enums from {self.enums_dir}")
return loaded
def get_synonyms(self, enum_name: str, value: str) -> list[str]:
"""Get all synonyms for an enum value.
Args:
enum_name: Name of the enum
value: Enum value name
Returns:
List of normalized synonyms
"""
mapping = self.load_enum(enum_name)
if not mapping:
return []
value_info = mapping.values.get(value)
if not value_info:
return []
return value_info.all_synonyms_normalized
def get_enum_value_info(self, value_name: str, enum_name: str) -> EnumValueInfo | None:
"""Get detailed EnumValueInfo for a specific enum value.
This method provides access to the full EnumValueInfo dataclass,
including both language-tagged synonyms and all normalized synonyms.
Args:
value_name: The enum value name (e.g., "MUSEUM", "LIBRARY")
enum_name: Name of the enum (e.g., "CustodianPrimaryTypeEnum")
Returns:
EnumValueInfo object or None if not found
Example:
>>> mapper = get_ontology_mapper()
>>> info = mapper.get_enum_value_info("LIBRARY", "CustodianPrimaryTypeEnum")
>>> print(info.synonyms) # Language-tagged synonyms
{"nl": ["bibliotheken"], "es": ["bibliotecas"], "fr": ["bibliothèques"]}
>>> print(info.all_synonyms_normalized[:5]) # All normalized
["library", "bibliotheken", "bibliotecas", "bibliotheques", ...]
"""
mapping = self.load_enum(enum_name)
if not mapping:
logger.debug(f"Enum {enum_name} not found for get_enum_value_info")
return None
return mapping.values.get(value_name)
def match_natural_language(
self,
text: str,
enum_name: str,
threshold: float = 0.8
) -> str | None:
"""Fuzzy match natural language text to schema enum value.
Args:
text: Natural language text to match (e.g., "virtueel museum")
enum_name: Name of the enum to match against
threshold: Similarity threshold for fuzzy matching (0.0-1.0)
Returns:
Matched enum value name or None
Examples:
>>> mapper.match_natural_language("virtueel museum", "DigitalPlatformTypeEnum")
"VIRTUAL_MUSEUM"
>>> mapper.match_natural_language("Digitales Museum", "DigitalPlatformTypeEnum")
"VIRTUAL_MUSEUM"
"""
mapping = self.load_enum(enum_name)
if not mapping:
return None
normalized_query = normalize_text(text)
if not normalized_query:
return None
# 1. Exact match against normalized synonyms
for value_name, value_info in mapping.values.items():
if normalized_query in value_info.all_synonyms_normalized:
return value_name
# 2. Substring match (query is contained in synonym or vice versa)
for value_name, value_info in mapping.values.items():
for synonym in value_info.all_synonyms_normalized:
if normalized_query in synonym or synonym in normalized_query:
return value_name
# 3. Fuzzy match using basic similarity
best_match: str | None = None
best_score = 0.0
for value_name, value_info in mapping.values.items():
for synonym in value_info.all_synonyms_normalized:
score = self._simple_similarity(normalized_query, synonym)
if score > best_score and score >= threshold:
best_score = score
best_match = value_name
return best_match
    def _simple_similarity(self, s1: str, s2: str) -> float:
        """Calculate a simple similarity ratio between two strings.

        Uses multiple approaches, in order:
        1. Exact match (1.0)
        2. Prefix match for singular/plural variations (0.95)
        3. Shared-stem prefix match (0.90)
        4. Word-level Jaccard similarity
        5. Character bigram Jaccard similarity

        Args:
            s1: First string
            s2: Second string

        Returns:
            Similarity ratio (0.0-1.0)
        """
if not s1 or not s2:
return 0.0
# Exact match
if s1 == s2:
return 1.0
# Prefix match - handles singular/plural variations
# e.g., "bibliotheek" matches "bibliotheken" (Dutch)
# e.g., "archief" matches "archieven" (Dutch)
min_len = min(len(s1), len(s2))
max_len = max(len(s1), len(s2))
# If one is a prefix of the other (with reasonable length overlap)
if min_len >= 5 and max_len - min_len <= 3:
shorter, longer = (s1, s2) if len(s1) < len(s2) else (s2, s1)
if longer.startswith(shorter):
return 0.95 # High score for prefix match
# Common stem match - handle variations like archief/archieven, museum/musea
# Use shared prefix ratio
shared_prefix_len = 0
for i in range(min_len):
if s1[i] == s2[i]:
shared_prefix_len += 1
else:
break
# If they share a significant prefix (>= 70% of shorter word)
if shared_prefix_len >= 4 and shared_prefix_len / min_len >= 0.7:
return 0.90
# Word-level comparison
words1 = set(s1.split())
words2 = set(s2.split())
if words1 and words2:
intersection = len(words1 & words2)
union = len(words1 | words2)
word_similarity = intersection / union if union > 0 else 0.0
# Boost if high word overlap
if word_similarity > 0.5:
return word_similarity
# Character-level bigram comparison
def get_bigrams(s: str) -> set[str]:
return {s[i:i+2] for i in range(len(s) - 1)} if len(s) > 1 else {s}
bigrams1 = get_bigrams(s1)
bigrams2 = get_bigrams(s2)
intersection = len(bigrams1 & bigrams2)
union = len(bigrams1 | bigrams2)
return intersection / union if union > 0 else 0.0
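
    # Worked example: "museum" vs "galerie" reaches the bigram stage (no prefix
    # or word overlap). Bigrams {mu, us, se, eu, um} and {ga, al, le, er, ri, ie}
    # share nothing, so the score is 0 / 11 = 0.0. By contrast, "musea" vs
    # "museo" never gets that far: 4 of 5 leading characters match, so the
    # shared-stem stage returns 0.90.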
def get_heritage_type_code(self, custodian_type: str) -> str | None:
"""Map CustodianPrimaryTypeEnum value to single-letter heritage code.
Args:
custodian_type: Enum value (e.g., "MUSEUM", "ARCHIVE")
Returns:
Single-letter GLAMORCUBESFIXPHDNT code or None
Example:
>>> mapper.get_heritage_type_code("MUSEUM")
"M"
>>> mapper.get_heritage_type_code("ARCHIVE")
"A"
"""
return GLAMORCUBESFIXPHDNT_CODES.get(custodian_type)
def get_custodian_type_to_code_mapping(self) -> dict[str, str]:
"""Generate CustodianPrimaryTypeEnum -> single-letter code mapping.
This replaces the hardcoded CUSTODIAN_TYPE_TO_HERITAGE_CODE dict
in hybrid_retriever.py.
Returns:
Dict mapping enum values to single-letter codes
"""
# Load the enum to get actual values
mapping = self.load_enum("CustodianPrimaryTypeEnum")
result = {}
if mapping:
for value_name in mapping.values:
code = GLAMORCUBESFIXPHDNT_CODES.get(value_name)
if code:
result[value_name] = code
else:
# Fall back to static mapping if enum can't be loaded
result = GLAMORCUBESFIXPHDNT_CODES.copy()
return result
def get_synonyms_for_value(self, value_name: str, enum_name: str) -> set[str]:
"""Get all synonyms for a specific enum value.
This method retrieves all synonyms associated with an enum value,
useful for building prompt context or understanding what natural language
terms map to a given enum value.
Collects synonyms from:
        1. Language-tagged synonyms in comments (e.g., "bibliotheek (nl)")
        2. Normalized synonyms from comma-separated lists (e.g., "Includes bibliotheken, bibliotecas")
Args:
value_name: The enum value name (e.g., "MUSEUM", "LIBRARY")
enum_name: Name of the enum (e.g., "CustodianPrimaryTypeEnum")
Returns:
Set of synonym strings. Returns empty set if enum or value not found.
Example:
>>> mapper = get_ontology_mapper()
>>> synonyms = mapper.get_synonyms_for_value("LIBRARY", "CustodianPrimaryTypeEnum")
>>> print(synonyms)
{"bibliotheken", "bibliotecas", "bibliotheques", "library", ...}
"""
mapping = self.load_enum(enum_name)
if not mapping:
logger.debug(f"Enum {enum_name} not found for get_synonyms_for_value")
return set()
value_info = mapping.values.get(value_name)
if not value_info:
logger.debug(f"Value {value_name} not found in enum {enum_name}")
return set()
# Collect all synonyms from multiple sources
all_synonyms: set[str] = set()
        # 1. Add language-tagged synonyms (from patterns like "bibliotheek (nl)")
        for lang_synonyms in value_info.synonyms.values():
            all_synonyms.update(lang_synonyms)
# 2. Add normalized synonyms (from comma-separated lists in comments)
# These are extracted during load_enum() from patterns like
# "Includes bibliotheken, bibliotecas, bibliothèques"
all_synonyms.update(value_info.all_synonyms_normalized)
return all_synonyms
def get_all_synonyms_by_language(
self,
value_name: str,
enum_name: str
) -> dict[str, set[str]]:
"""Get synonyms for a value organized by language.
Returns language-tagged synonyms from comments, plus an "all" key
containing all normalized synonyms (not language-specific).
Args:
value_name: The enum value name (e.g., "MUSEUM", "LIBRARY")
enum_name: Name of the enum (e.g., "CustodianPrimaryTypeEnum")
Returns:
Dict mapping language codes to sets of synonyms. The special key "all"
contains all normalized synonyms regardless of language.
Returns empty dict if enum or value not found.
Example:
>>> mapper = get_ontology_mapper()
>>> by_lang = mapper.get_all_synonyms_by_language("LIBRARY", "CustodianPrimaryTypeEnum")
>>> print(by_lang)
{
"nl": {"bibliotheek", "bibliotheken"},
"de": {"Bibliothek"},
"all": {"library", "bibliotheken", "bibliotecas", "bibliotheques", ...}
}
"""
mapping = self.load_enum(enum_name)
if not mapping:
return {}
value_info = mapping.values.get(value_name)
if not value_info:
return {}
# Start with language-tagged synonyms
result = {lang: set(syns) for lang, syns in value_info.synonyms.items()}
# Add "all" key with all normalized synonyms
result["all"] = set(value_info.all_synonyms_normalized)
return result
def get_enum_values_for_prompt(
self,
enum_name: str,
max_values: int = 20,
include_descriptions: bool = True
) -> str:
"""Format enum values for DSPy prompt injection.
Args:
enum_name: Name of the enum
max_values: Maximum number of values to include
include_descriptions: Whether to include value descriptions
Returns:
Formatted string for prompt injection
"""
mapping = self.load_enum(enum_name)
if not mapping:
return f"[Enum {enum_name} not found]"
lines = [f"Valid values for {enum_name}:"]
for i, (value_name, value_info) in enumerate(mapping.values.items()):
if i >= max_values:
remaining = len(mapping.values) - max_values
lines.append(f" ... and {remaining} more values")
break
if include_descriptions and value_info.description:
# Truncate long descriptions
desc = value_info.description[:60]
if len(value_info.description) > 60:
desc += "..."
lines.append(f" - {value_name}: {desc}")
else:
lines.append(f" - {value_name}")
return "\n".join(lines)
def get_valid_filter_values(self, enum_name: str) -> list[str]:
"""Get list of valid values for filtering (e.g., Qdrant).
Args:
enum_name: Name of the enum
Returns:
List of valid enum value names
"""
mapping = self.load_enum(enum_name)
if not mapping:
return []
return list(mapping.values.keys())
def invalidate_cache_if_changed(self) -> bool:
"""Check all cached enums and invalidate stale entries.
Returns:
True if any cache entries were invalidated
"""
if not self.watch_for_changes:
return False
invalidated = False
for enum_name in list(self._cache.keys()):
if self._is_cache_stale(enum_name):
del self._cache[enum_name]
del self._file_mtimes[enum_name]
invalidated = True
logger.info(f"Invalidated stale cache for {enum_name}")
return invalidated
def clear_cache(self) -> None:
"""Clear all cached enums."""
self._cache.clear()
self._file_mtimes.clear()
logger.info("Cleared ontology mapper cache")
# =========================================================================
# Role Category Mapping (for person search)
# =========================================================================
def get_role_category_keywords(self) -> dict[str, list[str]]:
"""Load role category keywords from RoleCategoryEnum.
This replaces the hardcoded ROLE_CATEGORY_KEYWORDS dict.
Keywords are extracted from the 'comments' field of each enum value.
Returns:
Dict mapping role category to list of keywords
"""
# Try to load from StaffRole.yaml which contains RoleCategoryEnum
staff_role_path = self.schema_dir / "modules" / "classes" / "StaffRole.yaml"
if not staff_role_path.exists():
logger.warning(f"StaffRole.yaml not found: {staff_role_path}")
return {}
try:
with open(staff_role_path, 'r', encoding='utf-8') as f:
yaml_content = yaml.safe_load(f)
except Exception as e:
logger.error(f"Failed to load StaffRole.yaml: {e}")
return {}
enums = yaml_content.get("enums", {})
role_category_enum = enums.get("RoleCategoryEnum", {})
permissible_values = role_category_enum.get("permissible_values", {})
result = {}
for category_name, category_info in permissible_values.items():
if category_info is None:
continue
# Extract keywords from comments and description
keywords = []
# Get keywords from comments
comments = category_info.get("comments", [])
for comment in comments:
# Parse language tag if present
_, term = parse_language_tag(comment)
normalized = normalize_text(term)
if normalized:
keywords.append(normalized)
# Add keywords from description
description = category_info.get("description")
if description:
# Split description into words and add significant ones
words = description.lower().split()
for word in words:
if len(word) > 3 and word not in {"with", "that", "from", "have", "this"}:
keywords.append(normalize_text(word))
# Add the category name itself
keywords.append(normalize_text(category_name))
# Remove duplicates while preserving order
seen = set()
unique_keywords = []
for kw in keywords:
if kw and kw not in seen:
seen.add(kw)
unique_keywords.append(kw)
result[category_name] = unique_keywords
return result
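
    # Illustrative return shape (category names and keywords are hypothetical;
    # the real ones come from RoleCategoryEnum in StaffRole.yaml):
    #
    #     {"CURATORIAL": ["curator", "conservator", ...],
    #      "MANAGEMENT": ["director", "manager", ...]}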
# =============================================================================
# Singleton Access Pattern
# =============================================================================
_ontology_mapper: OntologyMapper | None = None
def get_ontology_mapper() -> OntologyMapper:
"""Get singleton OntologyMapper instance.
Returns:
Shared OntologyMapper instance
"""
global _ontology_mapper
if _ontology_mapper is None:
_ontology_mapper = OntologyMapper(SCHEMA_BASE_DIR)
return _ontology_mapper
def reset_ontology_mapper() -> None:
"""Reset the singleton instance (useful for testing)."""
global _ontology_mapper
_ontology_mapper = None
# =============================================================================
# Convenience Functions
# =============================================================================
def match_custodian_type(text: str) -> str | None:
"""Match text to CustodianPrimaryTypeEnum value.
Args:
text: Natural language text describing institution type
Returns:
Matched enum value or None
Example:
>>> match_custodian_type("museum")
"MUSEUM"
>>> match_custodian_type("bibliotheek")
"LIBRARY"
"""
return get_ontology_mapper().match_natural_language(text, "CustodianPrimaryTypeEnum")
def match_museum_type(text: str) -> str | None:
"""Match text to MuseumTypeEnum value.
Args:
text: Natural language text describing museum type
Returns:
Matched enum value or None
"""
return get_ontology_mapper().match_natural_language(text, "MuseumTypeEnum")
def match_digital_platform_type(text: str) -> str | None:
"""Match text to DigitalPlatformTypeEnum value.
Args:
text: Natural language text describing digital platform type
Returns:
Matched enum value or None
Example:
>>> match_digital_platform_type("virtueel museum")
"VIRTUAL_MUSEUM"
"""
return get_ontology_mapper().match_natural_language(text, "DigitalPlatformTypeEnum")
def get_heritage_code(custodian_type: str) -> str | None:
"""Get single-letter heritage code for custodian type.
Args:
custodian_type: CustodianPrimaryTypeEnum value
Returns:
Single-letter GLAMORCUBESFIXPHDNT code
Example:
>>> get_heritage_code("MUSEUM")
"M"
"""
return get_ontology_mapper().get_heritage_type_code(custodian_type)
def get_custodian_type_mapping() -> dict[str, str]:
"""Get custodian type to heritage code mapping.
Replaces hardcoded CUSTODIAN_TYPE_TO_HERITAGE_CODE in hybrid_retriever.py.
Returns:
Dict mapping CustodianPrimaryTypeEnum values to single-letter codes
"""
return get_ontology_mapper().get_custodian_type_to_code_mapping()
def get_role_keywords() -> dict[str, list[str]]:
"""Get role category to keywords mapping.
Replaces hardcoded ROLE_CATEGORY_KEYWORDS in hybrid_retriever.py.
Returns:
Dict mapping RoleCategoryEnum values to keyword lists
"""
return get_ontology_mapper().get_role_category_keywords()
# =============================================================================
# Main (for testing)
# =============================================================================
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
print("\n=== Testing OntologyMapper ===\n")
mapper = get_ontology_mapper()
# Test loading an enum
print("1. Loading DigitalPlatformTypeEnum...")
dp_enum = mapper.load_enum("DigitalPlatformTypeEnum")
if dp_enum:
print(f" Loaded {len(dp_enum.values)} values")
print(f" Sample values: {list(dp_enum.values.keys())[:5]}")
# Test natural language matching
print("\n2. Testing natural language matching...")
test_queries = [
("virtueel museum", "DigitalPlatformTypeEnum"),
("Digitales Museum", "DigitalPlatformTypeEnum"),
("museo virtual", "DigitalPlatformTypeEnum"),
("musée virtuel", "DigitalPlatformTypeEnum"),
("digital library", "DigitalPlatformTypeEnum"),
("museum", "CustodianPrimaryTypeEnum"),
("bibliotheek", "CustodianPrimaryTypeEnum"),
("archief", "CustodianPrimaryTypeEnum"),
]
for query, enum_name in test_queries:
result = mapper.match_natural_language(query, enum_name)
print(f" '{query}' -> {result}")
# Test heritage code mapping
print("\n3. Testing heritage code mapping...")
type_to_code = mapper.get_custodian_type_to_code_mapping()
print(f" Loaded {len(type_to_code)} mappings")
for k, v in list(type_to_code.items())[:5]:
print(f" {k} -> {v}")
# Test loading all enums
print("\n4. Loading all enums...")
all_enums = mapper.load_all_enums()
print(f" Loaded {len(all_enums)} enums")
# Show enum value counts
print("\n5. Enum value counts:")
for enum_name, enum_mapping in sorted(all_enums.items(), key=lambda x: len(x[1].values), reverse=True)[:10]:
print(f" {enum_name}: {len(enum_mapping.values)} values")
# Test prompt formatting
print("\n6. Testing prompt formatting...")
prompt = mapper.get_enum_values_for_prompt("CustodianPrimaryTypeEnum", max_values=5)
print(prompt)
print("\n=== Tests Complete ===")