""" Dynamic Ontology Mapping from LinkML Schema Files This module provides dynamic loading and matching of LinkML schema enumerations for the Heritage RAG pipeline. The LinkML schema files are the SINGLE SOURCE OF TRUTH - no hardcoded enum values. Key features: 1. Dynamically loads enum files from schemas/20251121/linkml/modules/enums/ 2. Extracts multilingual synonyms from the 'comments' field in YAML 3. Provides fuzzy matching for natural language queries 4. Supports cache invalidation based on file modification times 5. Generates filter mappings for Qdrant queries Usage: from backend.rag.ontology_mapping import get_ontology_mapper, match_custodian_type mapper = get_ontology_mapper() # Match natural language to schema enum value result = mapper.match_natural_language("virtueel museum", "DigitalPlatformTypeEnum") # Returns: "VIRTUAL_MUSEUM" # Get heritage type code for Qdrant filtering code = mapper.get_heritage_type_code("MUSEUM") # Returns: "M" # Get custodian type to code mapping (replaces hardcoded dict) type_to_code = mapper.get_custodian_type_to_code_mapping() # Returns: {"GALLERY": "G", "LIBRARY": "L", "ARCHIVE": "A", ...} """ from __future__ import annotations import logging import os import re import unicodedata from dataclasses import dataclass, field from datetime import datetime, timezone from functools import lru_cache from pathlib import Path from typing import Any import yaml logger = logging.getLogger(__name__) # Default schema directory - matches schema_loader.py SCHEMA_BASE_DIR = Path(__file__).parent.parent.parent / "schemas" / "20251121" / "linkml" # Languages supported for synonym extraction (ISO 639-1 codes) SUPPORTED_LANGUAGES = {"en", "nl", "de", "fr", "es", "it", "pt"} # Heritage-specific vocabulary for domain-specific language detection # These terms are where general-purpose language detectors often fail on short heritage terms. # fast-langdetect is used as primary detector; this vocabulary is used as fallback for: # 1. Low-confidence detections (score < CONFIDENCE_THRESHOLD) # 2. Known problematic terms that detectors consistently misclassify # # NOTE: This is a REDUCED vocabulary focused only on disambiguation cases. # General-purpose language detection handles most terms correctly. HERITAGE_VOCABULARY: dict[str, set[str]] = { "nl": { # Dutch terms that fast-langdetect often misclassifies # (e.g., "musea" detected as Italian, "bibliotheken" as German) "musea", "bibliotheek", "bibliotheken", "archief", "archieven", "galerij", "galerijen", "collectie", "collecties", "verzameling", "heemkundige", "kring", "vereniging", "genootschap", "erfgoed", "rijks", "gemeentelijk", "provinciale", }, "de": { # German terms - most are detected correctly, keep only ambiguous ones "museen", "archiv", "sammlung", "sammlungen", "landesarchiv", "stadtarchiv", "bundesarchiv", }, "fr": { # French terms with diacritics are usually detected correctly # Keep only terms without diacritics that could be confused "musee", "musees", "bibliotheque", "bibliotheques", }, "es": { # Spanish - biblioteca/museo overlap with Italian "archivos", "bibliotecas", }, "it": { # Italian terms "musei", "archivi", "biblioteche", "galleria", "gallerie", }, "pt": { # Portuguese - museu is distinctive "museu", "museus", "arquivo", "arquivos", }, "en": { # English heritage terms - these should match English "library", "libraries", "museum", "museums", "archive", "archives", "gallery", "galleries", "collection", "collections", "society", "association", "foundation", "trust", "institute", }, } # Confidence threshold for fast-langdetect # Below this, fall back to heritage vocabulary matching LANGDETECT_CONFIDENCE_THRESHOLD = 0.6 # Flag to track if fast-langdetect is available _FAST_LANGDETECT_AVAILABLE: bool | None = None def _is_fast_langdetect_available() -> bool: """Check if fast-langdetect is available.""" global _FAST_LANGDETECT_AVAILABLE if _FAST_LANGDETECT_AVAILABLE is None: try: from fast_langdetect import detect # noqa: F401 _FAST_LANGDETECT_AVAILABLE = True except ImportError: _FAST_LANGDETECT_AVAILABLE = False logger.warning( "fast-langdetect not installed. Using heritage vocabulary fallback only. " "Install with: pip install fast-langdetect" ) return _FAST_LANGDETECT_AVAILABLE def _match_heritage_vocabulary(term: str) -> str | None: """Match term against heritage-specific vocabulary. This is the fallback method when fast-langdetect is unavailable or returns low confidence. Uses domain-specific heritage terms that general-purpose language detectors often misclassify. Args: term: The term to match Returns: Language code or None if no match """ normalized = normalize_text(term) original_lower = term.lower().strip() # Single-word exact match for lang, vocab in HERITAGE_VOCABULARY.items(): normalized_vocab = {normalize_text(v) for v in vocab} if normalized in normalized_vocab: return lang # Also check with original (preserves diacritics) if original_lower in {v.lower() for v in vocab}: return lang # Prefix matching for morphological variations # e.g., "bibliotheken" should match "bibliotheek" for lang, vocab in HERITAGE_VOCABULARY.items(): normalized_vocab = {normalize_text(v) for v in vocab} for marker in normalized_vocab: if len(marker) >= 5 and len(normalized) >= 5: if normalized.startswith(marker[:5]) or marker.startswith(normalized[:5]): return lang return None def detect_term_language(term: str) -> str | None: """Detect language of a term using hybrid approach. Uses a two-stage detection strategy: 1. Primary: fast-langdetect library (FastText model, 176 languages) 2. Fallback: Heritage-specific vocabulary for domain terms The fallback is used when: - fast-langdetect is not installed - Detection confidence is below threshold (0.6) - The term matches known heritage vocabulary Args: term: A single term to analyze (e.g., "bibliotheken", "museos") Returns: ISO 639-1 language code or None if detection fails Examples: >>> detect_term_language("bibliotheken") "nl" >>> detect_term_language("museos") "es" >>> detect_term_language("bibliothèques") "fr" >>> detect_term_language("Public libraries") "en" >>> detect_term_language("unknown term") None """ if not term or not term.strip(): return "en" # Default for empty strings normalized = normalize_text(term) words = normalized.split() # Multi-word phrase detection if len(words) > 1: # English phrase indicators - these words strongly suggest English english_indicators = { "public", "national", "special", "digital", "academic", "local", "art", "history", "science", "natural", "city", "state", "corporate", "government", "religious", "university", } if any(word in english_indicators for word in words): return "en" # Try heritage vocabulary first for known terms # This catches terms that fast-langdetect misclassifies heritage_match = _match_heritage_vocabulary(term) if heritage_match: return heritage_match # Use fast-langdetect if available if _is_fast_langdetect_available(): try: from fast_langdetect import detect result = detect(term) if isinstance(result, dict): lang = result.get("lang") score = result.get("score", 0) elif isinstance(result, list) and result: lang = result[0].get("lang") score = result[0].get("score", 0) else: lang = None score = 0 # Return if confidence is high enough if lang and score >= LANGDETECT_CONFIDENCE_THRESHOLD: # Map to supported languages (fast-langdetect returns ISO 639-1) if lang in SUPPORTED_LANGUAGES: return str(lang) # Some language codes need mapping lang_mapping: dict[str, str] = {"af": "nl"} # Afrikaans often confused with Dutch mapped = lang_mapping.get(str(lang), str(lang)) return mapped if mapped in SUPPORTED_LANGUAGES else None # Low confidence - fall through to return None logger.debug(f"Low confidence ({score:.2f}) for term '{term}', returning None") except Exception as e: logger.debug(f"fast-langdetect error for '{term}': {e}") # For multi-word terms without clear indicators, default to English if len(words) > 1: return "en" # Single word with no match - return None return None # GLAMORCUBESFIXPHDNT taxonomy mapping - enum value name to single-letter code # This mapping is STABLE (defined by taxonomy) but the enum VALUE NAMES may evolve # So we still load dynamically and match to this fixed mapping GLAMORCUBESFIXPHDNT_CODES: dict[str, str] = { # Primary type enum values -> single letter codes "GALLERY": "G", "LIBRARY": "L", "ARCHIVE": "A", "MUSEUM": "M", "OFFICIAL_INSTITUTION": "O", "RESEARCH_CENTER": "R", "COMMERCIAL": "C", "UNSPECIFIED": "U", "BIO_CUSTODIAN": "B", "EDUCATION_PROVIDER": "E", "HERITAGE_SOCIETY": "S", "FEATURE_CUSTODIAN": "F", "INTANGIBLE_HERITAGE_GROUP": "I", "MIXED": "X", "PERSONAL_COLLECTION": "P", "HOLY_SACRED_SITE": "H", "DIGITAL_PLATFORM": "D", "NON_PROFIT": "N", "TASTE_SCENT_HERITAGE": "T", } @dataclass class EnumValueInfo: """Detailed information for a single enum value. Attributes: name: The enum value name (e.g., "VIRTUAL_MUSEUM") description: Human-readable description wikidata_id: Wikidata entity ID from 'meaning' field (e.g., "Q1225034") synonyms: Language-tagged synonyms extracted from comments all_synonyms_normalized: Flattened list of normalized synonyms for matching """ name: str description: str | None = None wikidata_id: str | None = None synonyms: dict[str, list[str]] = field(default_factory=dict) # lang_code -> synonyms all_synonyms_normalized: list[str] = field(default_factory=list) @dataclass class EnumMapping: """Complete mapping for an enum type. Attributes: enum_name: Name of the enum (e.g., "DigitalPlatformTypeEnum") source_file: Path to the source YAML file values: Dictionary mapping value names to EnumValueInfo last_loaded: When this enum was last loaded file_mtime: File modification time for cache invalidation description: Enum-level description """ enum_name: str source_file: Path values: dict[str, EnumValueInfo] = field(default_factory=dict) last_loaded: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) file_mtime: float = 0.0 description: str | None = None def normalize_text(text: str) -> str: """Normalize text for matching: lowercase, remove accents, strip whitespace. Args: text: Input text to normalize Returns: Normalized text for comparison Examples: >>> normalize_text("Digitales Museum") "digitales museum" >>> normalize_text("musée virtuel") "musee virtuel" >>> normalize_text("Bibliothèque") "bibliotheque" """ # NFD decomposition separates base characters from combining marks normalized = unicodedata.normalize('NFD', text) # Remove combining marks (category 'Mn' = Mark, Nonspacing) ascii_text = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn') # Lowercase and strip return ascii_text.lower().strip() def parse_language_tag(comment: str) -> tuple[str | None, str]: """Parse a language-tagged comment string. Format: "term (lang_code)" -> ("lang_code", "term") Args: comment: Comment string, possibly with language tag Returns: Tuple of (language_code, term) where language_code may be None Examples: >>> parse_language_tag("Digitales Museum (de)") ("de", "Digitales Museum") >>> parse_language_tag("museo virtual (es)") ("es", "museo virtual") >>> parse_language_tag("Some plain comment") (None, "Some plain comment") """ # Pattern: text (lang_code) at end of string pattern = r'^(.+?)\s*\(([a-z]{2})\)\s*$' match = re.match(pattern, comment, re.IGNORECASE) if match: term = match.group(1).strip() lang = match.group(2).lower() if lang in SUPPORTED_LANGUAGES: return (lang, term) return (None, comment) def extract_comma_separated_terms(comment: str) -> list[str]: """Extract comma-separated terms from comments like "Includes X, Y, Z". Handles patterns commonly found in CustodianPrimaryTypeEnum.yaml: - "Includes bibliotheken, bibliotecas, bibliothèques" - "Public libraries, academic libraries, national libraries" - "Kunsthallen, art galleries, visual arts centers" Args: comment: A comment string that may contain comma-separated terms Returns: List of individual terms extracted from the comment Examples: >>> extract_comma_separated_terms("Includes musea, museos, musées") ["musea", "museos", "musées"] >>> extract_comma_separated_terms("Public libraries, academic libraries") ["Public libraries", "academic libraries"] >>> extract_comma_separated_terms("Some single term comment") [] # Empty list - no commas """ terms: list[str] = [] # Skip if no commas (not a list) if ',' not in comment: return terms # Strip common prefixes like "Includes", "Examples:", etc. cleaned = comment prefixes_to_strip = [ r'^Includes\s+', r'^Examples?:?\s*', r'^Types?:?\s*', r'^Such as\s+', r'^E\.g\.?,?\s*', r'^I\.e\.?,?\s*', ] for prefix in prefixes_to_strip: cleaned = re.sub(prefix, '', cleaned, flags=re.IGNORECASE) # Split by comma parts = cleaned.split(',') for part in parts: # Clean up each term term = part.strip() # Skip empty terms if not term: continue # Skip terms that look like full sentences (long descriptions) if len(term) > 50: continue # Skip terms that are just references like "(Q123456)" if re.match(r'^\(Q\d+\)$', term): continue # Handle trailing parentheses like "botanical gardens (Q473972)" # Extract just the term part paren_match = re.match(r'^(.+?)\s*\([^)]+\)\s*$', term) if paren_match: term = paren_match.group(1).strip() # Add valid terms if term and len(term) >= 2: terms.append(term) return terms def extract_wikidata_id(meaning: str | None) -> str | None: """Extract Wikidata ID from meaning field. Args: meaning: The meaning field value (e.g., "wikidata:Q1225034") Returns: The Wikidata ID (e.g., "Q1225034") or None """ if not meaning: return None # Handle "wikidata:Q12345" format if meaning.startswith("wikidata:"): return meaning.replace("wikidata:", "") # Handle full URI format if "wikidata.org" in meaning: match = re.search(r'(Q\d+)', meaning) if match: return match.group(1) return None class OntologyMapper: """Dynamic ontology mapping from LinkML schema files. This class loads enum definitions from the LinkML schema directory and provides: - Multilingual synonym extraction from YAML comments - Natural language matching to schema enum values - Cache invalidation based on file modification times - Integration helpers for Qdrant filtering Usage: mapper = OntologyMapper(schema_dir=Path("schemas/20251121/linkml")) # Load specific enum digital_platforms = mapper.load_enum("DigitalPlatformTypeEnum") print(len(digital_platforms.values)) # 53 # Match natural language (Dutch) result = mapper.match_natural_language("virtueel museum", "DigitalPlatformTypeEnum") # Returns: "VIRTUAL_MUSEUM" # Get heritage type code for Qdrant filtering code = mapper.get_heritage_type_code("MUSEUM") # Returns: "M" """ def __init__(self, schema_dir: Path | None = None, watch_for_changes: bool = True): """Initialize the OntologyMapper. Args: schema_dir: Path to LinkML schema directory. Defaults to schemas/20251121/linkml/ watch_for_changes: Whether to check file mtimes for cache invalidation """ self.schema_dir = schema_dir or SCHEMA_BASE_DIR self.enums_dir = self.schema_dir / "modules" / "enums" self.watch_for_changes = watch_for_changes # Cache of loaded enums self._cache: dict[str, EnumMapping] = {} # File modification times for cache invalidation self._file_mtimes: dict[str, float] = {} logger.info(f"OntologyMapper initialized with schema_dir: {self.schema_dir}") def _get_enum_file_path(self, enum_name: str) -> Path: """Get the file path for an enum. Args: enum_name: Name of the enum (e.g., "DigitalPlatformTypeEnum") Returns: Path to the enum YAML file """ return self.enums_dir / f"{enum_name}.yaml" def _is_cache_stale(self, enum_name: str) -> bool: """Check if cached enum is stale based on file mtime. Args: enum_name: Name of the enum to check Returns: True if cache is stale and needs reload """ if not self.watch_for_changes: return False if enum_name not in self._cache: return True filepath = self._get_enum_file_path(enum_name) if not filepath.exists(): return True current_mtime = filepath.stat().st_mtime cached_mtime = self._file_mtimes.get(enum_name, 0.0) return current_mtime > cached_mtime def _parse_comments_to_synonyms( self, comments: list[str] | None ) -> tuple[dict[str, list[str]], list[str]]: """Parse comments field to extract multilingual synonyms. Handles three formats: 1. Language-tagged: "Digitales Museum (de)" -> {"de": ["Digitales Museum"]} 2. Comma-separated with auto-detection: "Includes musea, museos, musées" -> {"nl": ["musea"], "es": ["museos"], "fr": ["musées"]} 3. Plain terms: Added to all_normalized for fuzzy matching The auto-detection uses LANGUAGE_MARKERS to identify which language each term belongs to based on known heritage vocabulary patterns. Args: comments: List of comment strings from YAML Returns: Tuple of (synonyms_by_language, all_normalized_synonyms) Example: Input: ["Digitales Museum (de)", "Includes musea, museos, musées"] Output: ( {"de": ["Digitales Museum"], "nl": ["musea"], "es": ["museos"], "fr": ["musées"]}, ["digitales museum", "musea", "museos", "musees", ...] ) """ synonyms_by_lang: dict[str, list[str]] = {} all_normalized: list[str] = [] if not comments: return synonyms_by_lang, all_normalized def add_to_lang_dict(lang: str, term: str) -> None: """Helper to add term to language-specific dict.""" if lang not in synonyms_by_lang: synonyms_by_lang[lang] = [] # Avoid duplicates if term not in synonyms_by_lang[lang]: synonyms_by_lang[lang].append(term) for comment in comments: # Try to parse explicit language tag first lang, term = parse_language_tag(comment) # Add to language-specific dict if explicitly tagged if lang: add_to_lang_dict(lang, term) # Add normalized version to flat list normalized = normalize_text(term) if normalized and normalized not in all_normalized: all_normalized.append(normalized) # Extract comma-separated terms within the comment # This handles patterns like "Includes bibliotheken, bibliotecas, bibliothèques" comma_terms = extract_comma_separated_terms(comment) for cterm in comma_terms: cterm_normalized = normalize_text(cterm) if cterm_normalized and cterm_normalized not in all_normalized: all_normalized.append(cterm_normalized) # Try to detect language for this term detected_lang = detect_term_language(cterm) if detected_lang: # Store the original (unnormalized) term with its language add_to_lang_dict(detected_lang, cterm) return synonyms_by_lang, all_normalized def load_enum(self, enum_name: str, force_reload: bool = False) -> EnumMapping | None: """Load a single enum with cache invalidation. Args: enum_name: Name of the enum (e.g., "DigitalPlatformTypeEnum") force_reload: Force reload even if cached Returns: EnumMapping object or None if file doesn't exist """ # Check cache if not force_reload and not self._is_cache_stale(enum_name): cached = self._cache.get(enum_name) if cached: return cached # Load from file filepath = self._get_enum_file_path(enum_name) if not filepath.exists(): logger.warning(f"Enum file not found: {filepath}") return None try: with open(filepath, 'r', encoding='utf-8') as f: yaml_content = yaml.safe_load(f) except Exception as e: logger.error(f"Failed to load enum {enum_name}: {e}") return None # Parse the enum file_mtime = filepath.stat().st_mtime enums_section = yaml_content.get("enums", {}) enum_def = enums_section.get(enum_name, {}) if not enum_def: # Try to find any enum in the file if enums_section: enum_name = next(iter(enums_section.keys())) enum_def = enums_section[enum_name] permissible_values = enum_def.get("permissible_values", {}) # Build EnumMapping mapping = EnumMapping( enum_name=enum_name, source_file=filepath, file_mtime=file_mtime, description=yaml_content.get("description") or enum_def.get("description"), ) for value_name, value_info in permissible_values.items(): if value_info is None: value_info = {} comments = value_info.get("comments", []) synonyms, all_normalized = self._parse_comments_to_synonyms(comments) # Add description to normalized synonyms description = value_info.get("description") if description: desc_normalized = normalize_text(description) if desc_normalized and desc_normalized not in all_normalized: all_normalized.append(desc_normalized) # Add the value name itself as a synonym name_normalized = normalize_text(value_name.replace("_", " ")) if name_normalized and name_normalized not in all_normalized: all_normalized.insert(0, name_normalized) mapping.values[value_name] = EnumValueInfo( name=value_name, description=description, wikidata_id=extract_wikidata_id(value_info.get("meaning")), synonyms=synonyms, all_synonyms_normalized=all_normalized, ) # Update cache self._cache[enum_name] = mapping self._file_mtimes[enum_name] = file_mtime logger.debug(f"Loaded enum {enum_name} with {len(mapping.values)} values") return mapping def load_all_enums(self) -> dict[str, EnumMapping]: """Load all enum files from schema directory. Returns: Dictionary mapping enum names to EnumMapping objects """ if not self.enums_dir.exists(): logger.warning(f"Enums directory not found: {self.enums_dir}") return {} loaded = {} for filepath in self.enums_dir.glob("*.yaml"): enum_name = filepath.stem mapping = self.load_enum(enum_name) if mapping: loaded[enum_name] = mapping logger.info(f"Loaded {len(loaded)} enums from {self.enums_dir}") return loaded def get_synonyms(self, enum_name: str, value: str) -> list[str]: """Get all synonyms for an enum value. Args: enum_name: Name of the enum value: Enum value name Returns: List of normalized synonyms """ mapping = self.load_enum(enum_name) if not mapping: return [] value_info = mapping.values.get(value) if not value_info: return [] return value_info.all_synonyms_normalized def get_enum_value_info(self, value_name: str, enum_name: str) -> EnumValueInfo | None: """Get detailed EnumValueInfo for a specific enum value. This method provides access to the full EnumValueInfo dataclass, including both language-tagged synonyms and all normalized synonyms. Args: value_name: The enum value name (e.g., "MUSEUM", "LIBRARY") enum_name: Name of the enum (e.g., "CustodianPrimaryTypeEnum") Returns: EnumValueInfo object or None if not found Example: >>> mapper = get_ontology_mapper() >>> info = mapper.get_enum_value_info("LIBRARY", "CustodianPrimaryTypeEnum") >>> print(info.synonyms) # Language-tagged synonyms {"nl": ["bibliotheken"], "es": ["bibliotecas"], "fr": ["bibliothèques"]} >>> print(info.all_synonyms_normalized[:5]) # All normalized ["library", "bibliotheken", "bibliotecas", "bibliotheques", ...] """ mapping = self.load_enum(enum_name) if not mapping: logger.debug(f"Enum {enum_name} not found for get_enum_value_info") return None return mapping.values.get(value_name) def match_natural_language( self, text: str, enum_name: str, threshold: float = 0.8 ) -> str | None: """Fuzzy match natural language text to schema enum value. Args: text: Natural language text to match (e.g., "virtueel museum") enum_name: Name of the enum to match against threshold: Similarity threshold for fuzzy matching (0.0-1.0) Returns: Matched enum value name or None Examples: >>> mapper.match_natural_language("virtueel museum", "DigitalPlatformTypeEnum") "VIRTUAL_MUSEUM" >>> mapper.match_natural_language("Digitales Museum", "DigitalPlatformTypeEnum") "VIRTUAL_MUSEUM" """ mapping = self.load_enum(enum_name) if not mapping: return None normalized_query = normalize_text(text) if not normalized_query: return None # 1. Exact match against normalized synonyms for value_name, value_info in mapping.values.items(): if normalized_query in value_info.all_synonyms_normalized: return value_name # 2. Substring match (query is contained in synonym or vice versa) for value_name, value_info in mapping.values.items(): for synonym in value_info.all_synonyms_normalized: if normalized_query in synonym or synonym in normalized_query: return value_name # 3. Fuzzy match using basic similarity best_match: str | None = None best_score = 0.0 for value_name, value_info in mapping.values.items(): for synonym in value_info.all_synonyms_normalized: score = self._simple_similarity(normalized_query, synonym) if score > best_score and score >= threshold: best_score = score best_match = value_name return best_match def _simple_similarity(self, s1: str, s2: str) -> float: """Calculate simple similarity ratio between two strings. Uses multiple approaches: 1. Exact match (1.0) 2. Prefix match for singular/plural handling (0.9) 3. Word-level Jaccard similarity 4. Character bigram similarity Args: s1: First string s2: Second string Returns: Similarity ratio (0.0-1.0) """ if not s1 or not s2: return 0.0 # Exact match if s1 == s2: return 1.0 # Prefix match - handles singular/plural variations # e.g., "bibliotheek" matches "bibliotheken" (Dutch) # e.g., "archief" matches "archieven" (Dutch) min_len = min(len(s1), len(s2)) max_len = max(len(s1), len(s2)) # If one is a prefix of the other (with reasonable length overlap) if min_len >= 5 and max_len - min_len <= 3: shorter, longer = (s1, s2) if len(s1) < len(s2) else (s2, s1) if longer.startswith(shorter): return 0.95 # High score for prefix match # Common stem match - handle variations like archief/archieven, museum/musea # Use shared prefix ratio shared_prefix_len = 0 for i in range(min_len): if s1[i] == s2[i]: shared_prefix_len += 1 else: break # If they share a significant prefix (>= 70% of shorter word) if shared_prefix_len >= 4 and shared_prefix_len / min_len >= 0.7: return 0.90 # Word-level comparison words1 = set(s1.split()) words2 = set(s2.split()) if words1 and words2: intersection = len(words1 & words2) union = len(words1 | words2) word_similarity = intersection / union if union > 0 else 0.0 # Boost if high word overlap if word_similarity > 0.5: return word_similarity # Character-level bigram comparison def get_bigrams(s: str) -> set[str]: return {s[i:i+2] for i in range(len(s) - 1)} if len(s) > 1 else {s} bigrams1 = get_bigrams(s1) bigrams2 = get_bigrams(s2) intersection = len(bigrams1 & bigrams2) union = len(bigrams1 | bigrams2) return intersection / union if union > 0 else 0.0 def get_heritage_type_code(self, custodian_type: str) -> str | None: """Map CustodianPrimaryTypeEnum value to single-letter heritage code. Args: custodian_type: Enum value (e.g., "MUSEUM", "ARCHIVE") Returns: Single-letter GLAMORCUBESFIXPHDNT code or None Example: >>> mapper.get_heritage_type_code("MUSEUM") "M" >>> mapper.get_heritage_type_code("ARCHIVE") "A" """ return GLAMORCUBESFIXPHDNT_CODES.get(custodian_type) def get_custodian_type_to_code_mapping(self) -> dict[str, str]: """Generate CustodianPrimaryTypeEnum -> single-letter code mapping. This replaces the hardcoded CUSTODIAN_TYPE_TO_HERITAGE_CODE dict in hybrid_retriever.py. Returns: Dict mapping enum values to single-letter codes """ # Load the enum to get actual values mapping = self.load_enum("CustodianPrimaryTypeEnum") result = {} if mapping: for value_name in mapping.values: code = GLAMORCUBESFIXPHDNT_CODES.get(value_name) if code: result[value_name] = code else: # Fall back to static mapping if enum can't be loaded result = GLAMORCUBESFIXPHDNT_CODES.copy() return result def get_synonyms_for_value(self, value_name: str, enum_name: str) -> set[str]: """Get all synonyms for a specific enum value. This method retrieves all synonyms associated with an enum value, useful for building prompt context or understanding what natural language terms map to a given enum value. Collects synonyms from: 1. Language-tagged synonyms in comments (e.g., "bibliotheek [nl]") 2. Normalized synonyms from comma-separated lists (e.g., "Includes bibliotheken, bibliotecas") Args: value_name: The enum value name (e.g., "MUSEUM", "LIBRARY") enum_name: Name of the enum (e.g., "CustodianPrimaryTypeEnum") Returns: Set of synonym strings. Returns empty set if enum or value not found. Example: >>> mapper = get_ontology_mapper() >>> synonyms = mapper.get_synonyms_for_value("LIBRARY", "CustodianPrimaryTypeEnum") >>> print(synonyms) {"bibliotheken", "bibliotecas", "bibliotheques", "library", ...} """ mapping = self.load_enum(enum_name) if not mapping: logger.debug(f"Enum {enum_name} not found for get_synonyms_for_value") return set() value_info = mapping.values.get(value_name) if not value_info: logger.debug(f"Value {value_name} not found in enum {enum_name}") return set() # Collect all synonyms from multiple sources all_synonyms: set[str] = set() # 1. Add language-tagged synonyms (from patterns like "bibliotheek [nl]") for lang_code, lang_synonyms in value_info.synonyms.items(): all_synonyms.update(lang_synonyms) # 2. Add normalized synonyms (from comma-separated lists in comments) # These are extracted during load_enum() from patterns like # "Includes bibliotheken, bibliotecas, bibliothèques" all_synonyms.update(value_info.all_synonyms_normalized) return all_synonyms def get_all_synonyms_by_language( self, value_name: str, enum_name: str ) -> dict[str, set[str]]: """Get synonyms for a value organized by language. Returns language-tagged synonyms from comments, plus an "all" key containing all normalized synonyms (not language-specific). Args: value_name: The enum value name (e.g., "MUSEUM", "LIBRARY") enum_name: Name of the enum (e.g., "CustodianPrimaryTypeEnum") Returns: Dict mapping language codes to sets of synonyms. The special key "all" contains all normalized synonyms regardless of language. Returns empty dict if enum or value not found. Example: >>> mapper = get_ontology_mapper() >>> by_lang = mapper.get_all_synonyms_by_language("LIBRARY", "CustodianPrimaryTypeEnum") >>> print(by_lang) { "nl": {"bibliotheek", "bibliotheken"}, "de": {"Bibliothek"}, "all": {"library", "bibliotheken", "bibliotecas", "bibliotheques", ...} } """ mapping = self.load_enum(enum_name) if not mapping: return {} value_info = mapping.values.get(value_name) if not value_info: return {} # Start with language-tagged synonyms result = {lang: set(syns) for lang, syns in value_info.synonyms.items()} # Add "all" key with all normalized synonyms result["all"] = set(value_info.all_synonyms_normalized) return result def get_enum_values_for_prompt( self, enum_name: str, max_values: int = 20, include_descriptions: bool = True ) -> str: """Format enum values for DSPy prompt injection. Args: enum_name: Name of the enum max_values: Maximum number of values to include include_descriptions: Whether to include value descriptions Returns: Formatted string for prompt injection """ mapping = self.load_enum(enum_name) if not mapping: return f"[Enum {enum_name} not found]" lines = [f"Valid values for {enum_name}:"] for i, (value_name, value_info) in enumerate(mapping.values.items()): if i >= max_values: remaining = len(mapping.values) - max_values lines.append(f" ... and {remaining} more values") break if include_descriptions and value_info.description: # Truncate long descriptions desc = value_info.description[:60] if len(value_info.description) > 60: desc += "..." lines.append(f" - {value_name}: {desc}") else: lines.append(f" - {value_name}") return "\n".join(lines) def get_valid_filter_values(self, enum_name: str) -> list[str]: """Get list of valid values for filtering (e.g., Qdrant). Args: enum_name: Name of the enum Returns: List of valid enum value names """ mapping = self.load_enum(enum_name) if not mapping: return [] return list(mapping.values.keys()) def invalidate_cache_if_changed(self) -> bool: """Check all cached enums and invalidate stale entries. Returns: True if any cache entries were invalidated """ if not self.watch_for_changes: return False invalidated = False for enum_name in list(self._cache.keys()): if self._is_cache_stale(enum_name): del self._cache[enum_name] del self._file_mtimes[enum_name] invalidated = True logger.info(f"Invalidated stale cache for {enum_name}") return invalidated def clear_cache(self) -> None: """Clear all cached enums.""" self._cache.clear() self._file_mtimes.clear() logger.info("Cleared ontology mapper cache") # ========================================================================= # Role Category Mapping (for person search) # ========================================================================= def get_role_category_keywords(self) -> dict[str, list[str]]: """Load role category keywords from RoleCategoryEnum. This replaces the hardcoded ROLE_CATEGORY_KEYWORDS dict. Keywords are extracted from the 'comments' field of each enum value. Returns: Dict mapping role category to list of keywords """ # Try to load from StaffRole.yaml which contains RoleCategoryEnum staff_role_path = self.schema_dir / "modules" / "classes" / "StaffRole.yaml" if not staff_role_path.exists(): logger.warning(f"StaffRole.yaml not found: {staff_role_path}") return {} try: with open(staff_role_path, 'r', encoding='utf-8') as f: yaml_content = yaml.safe_load(f) except Exception as e: logger.error(f"Failed to load StaffRole.yaml: {e}") return {} enums = yaml_content.get("enums", {}) role_category_enum = enums.get("RoleCategoryEnum", {}) permissible_values = role_category_enum.get("permissible_values", {}) result = {} for category_name, category_info in permissible_values.items(): if category_info is None: continue # Extract keywords from comments and description keywords = [] # Get keywords from comments comments = category_info.get("comments", []) for comment in comments: # Parse language tag if present _, term = parse_language_tag(comment) normalized = normalize_text(term) if normalized: keywords.append(normalized) # Add keywords from description description = category_info.get("description") if description: # Split description into words and add significant ones words = description.lower().split() for word in words: if len(word) > 3 and word not in {"with", "that", "from", "have", "this"}: keywords.append(normalize_text(word)) # Add the category name itself keywords.append(normalize_text(category_name)) # Remove duplicates while preserving order seen = set() unique_keywords = [] for kw in keywords: if kw and kw not in seen: seen.add(kw) unique_keywords.append(kw) result[category_name] = unique_keywords return result # ============================================================================= # Singleton Access Pattern # ============================================================================= _ontology_mapper: OntologyMapper | None = None def get_ontology_mapper() -> OntologyMapper: """Get singleton OntologyMapper instance. Returns: Shared OntologyMapper instance """ global _ontology_mapper if _ontology_mapper is None: _ontology_mapper = OntologyMapper(SCHEMA_BASE_DIR) return _ontology_mapper def reset_ontology_mapper() -> None: """Reset the singleton instance (useful for testing).""" global _ontology_mapper _ontology_mapper = None # ============================================================================= # Convenience Functions # ============================================================================= def match_custodian_type(text: str) -> str | None: """Match text to CustodianPrimaryTypeEnum value. Args: text: Natural language text describing institution type Returns: Matched enum value or None Example: >>> match_custodian_type("museum") "MUSEUM" >>> match_custodian_type("bibliotheek") "LIBRARY" """ return get_ontology_mapper().match_natural_language(text, "CustodianPrimaryTypeEnum") def match_museum_type(text: str) -> str | None: """Match text to MuseumTypeEnum value. Args: text: Natural language text describing museum type Returns: Matched enum value or None """ return get_ontology_mapper().match_natural_language(text, "MuseumTypeEnum") def match_digital_platform_type(text: str) -> str | None: """Match text to DigitalPlatformTypeEnum value. Args: text: Natural language text describing digital platform type Returns: Matched enum value or None Example: >>> match_digital_platform_type("virtueel museum") "VIRTUAL_MUSEUM" """ return get_ontology_mapper().match_natural_language(text, "DigitalPlatformTypeEnum") def get_heritage_code(custodian_type: str) -> str | None: """Get single-letter heritage code for custodian type. Args: custodian_type: CustodianPrimaryTypeEnum value Returns: Single-letter GLAMORCUBESFIXPHDNT code Example: >>> get_heritage_code("MUSEUM") "M" """ return get_ontology_mapper().get_heritage_type_code(custodian_type) def get_custodian_type_mapping() -> dict[str, str]: """Get custodian type to heritage code mapping. Replaces hardcoded CUSTODIAN_TYPE_TO_HERITAGE_CODE in hybrid_retriever.py. Returns: Dict mapping CustodianPrimaryTypeEnum values to single-letter codes """ return get_ontology_mapper().get_custodian_type_to_code_mapping() def get_role_keywords() -> dict[str, list[str]]: """Get role category to keywords mapping. Replaces hardcoded ROLE_CATEGORY_KEYWORDS in hybrid_retriever.py. Returns: Dict mapping RoleCategoryEnum values to keyword lists """ return get_ontology_mapper().get_role_category_keywords() # ============================================================================= # Main (for testing) # ============================================================================= if __name__ == "__main__": logging.basicConfig(level=logging.INFO) print("\n=== Testing OntologyMapper ===\n") mapper = get_ontology_mapper() # Test loading an enum print("1. Loading DigitalPlatformTypeEnum...") dp_enum = mapper.load_enum("DigitalPlatformTypeEnum") if dp_enum: print(f" Loaded {len(dp_enum.values)} values") print(f" Sample values: {list(dp_enum.values.keys())[:5]}") # Test natural language matching print("\n2. Testing natural language matching...") test_queries = [ ("virtueel museum", "DigitalPlatformTypeEnum"), ("Digitales Museum", "DigitalPlatformTypeEnum"), ("museo virtual", "DigitalPlatformTypeEnum"), ("musée virtuel", "DigitalPlatformTypeEnum"), ("digital library", "DigitalPlatformTypeEnum"), ("museum", "CustodianPrimaryTypeEnum"), ("bibliotheek", "CustodianPrimaryTypeEnum"), ("archief", "CustodianPrimaryTypeEnum"), ] for query, enum_name in test_queries: result = mapper.match_natural_language(query, enum_name) print(f" '{query}' -> {result}") # Test heritage code mapping print("\n3. Testing heritage code mapping...") type_to_code = mapper.get_custodian_type_to_code_mapping() print(f" Loaded {len(type_to_code)} mappings") for k, v in list(type_to_code.items())[:5]: print(f" {k} -> {v}") # Test loading all enums print("\n4. Loading all enums...") all_enums = mapper.load_all_enums() print(f" Loaded {len(all_enums)} enums") # Show enum value counts print("\n5. Enum value counts:") for enum_name, enum_mapping in sorted(all_enums.items(), key=lambda x: len(x[1].values), reverse=True)[:10]: print(f" {enum_name}: {len(enum_mapping.values)} values") # Test prompt formatting print("\n6. Testing prompt formatting...") prompt = mapper.get_enum_values_for_prompt("CustodianPrimaryTypeEnum", max_values=5) print(prompt) print("\n=== Tests Complete ===")