glam/src/glam_extractor/extractors/nlp_extractor.py
2025-12-05 15:30:23 +01:00

1514 lines
59 KiB
Python

"""
NLP-based extraction of heritage institutions from conversation text.
This module provides the InstitutionExtractor class that uses pattern matching,
NER, and heuristics to extract structured institution data from unstructured
conversation text. Extracted records include confidence scores and provenance metadata.
"""
import re
import uuid
from datetime import datetime, timezone
from typing import List, Optional, Tuple, Set, Dict, Any
from dataclasses import dataclass
try:
from rapidfuzz import fuzz
RAPIDFUZZ_AVAILABLE = True
except ImportError:
RAPIDFUZZ_AVAILABLE = False
from glam_extractor.models import (
HeritageCustodian,
InstitutionType,
OrganizationStatus,
DataSource,
DataTier,
Provenance,
Location,
Identifier,
)
from glam_extractor.parsers.conversation import Conversation
# =============================================================================
# RESULT PATTERN FOR ERROR HANDLING
# =============================================================================
@dataclass
class Result:
    """Lightweight success/error container (Result pattern).

    Exactly one of ``value`` (on success) or ``error`` (on failure) is
    meaningful; construct instances via :meth:`ok` / :meth:`err` rather
    than calling the constructor directly.
    """
    success: bool
    value: Any = None
    error: Optional[str] = None

    @classmethod
    def ok(cls, value: Any) -> "Result":
        """Wrap *value* in a successful Result."""
        return cls(True, value, None)

    @classmethod
    def err(cls, error: str) -> "Result":
        """Wrap an *error* message in a failed Result."""
        return cls(False, None, error)
# =============================================================================
# EXTRACTION DATA STRUCTURES
# =============================================================================
@dataclass
class ExtractedEntity:
    """
    An entity extracted from text with metadata.

    Used as an intermediate representation before converting to HeritageCustodian.

    Note: institution_type is a string (e.g., 'MUSEUM', 'LIBRARY') since LinkML
    PermissibleValue enum objects are not hashable and can't be used as dict keys.
    """
    name: str
    institution_type: Optional[str] = None  # String key e.g. 'MUSEUM', 'LIBRARY'
    city: Optional[str] = None
    country: Optional[str] = None  # ISO 3166-1 alpha-2 code
    identifiers: Optional[List[Identifier]] = None
    confidence_score: float = 0.5  # 0.0-1.0, default mid confidence
    text_snippet: Optional[str] = None  # The text where this was found

    def __post_init__(self):
        # Materialize an empty list per instance; a mutable default on the
        # field itself would be shared across all instances.
        if self.identifiers is None:
            self.identifiers = []
# =============================================================================
# PATTERN DEFINITIONS
# =============================================================================
class ExtractionPatterns:
    """Regular expression patterns for extracting identifiers and keywords.

    Holds only class-level constants: identifier regexes, multilingual
    keyword tables, a country-name lookup, and blacklists used to
    suppress common false positives.
    """
    # -------------------------------------------------------------------------
    # Identifier patterns
    # -------------------------------------------------------------------------
    # Enhanced ISIL pattern (Nov 2025) - supports multiple context formats:
    # 1. Standard: "ISIL: NL-AsdAM" or "ISIL code: NL-AsdAM"
    # 2. Standalone: "NL-AsdAM" (basic word boundary)
    # 3. In context: "code NL-AsdAM" or "code: NL-AsdAM"
    # 4. Parenthetical: "(NL-AsdAM)"
    ISIL_PATTERN = re.compile(r'\b([A-Z]{2}-[A-Za-z0-9]+)\b')
    # Additional context-aware ISIL patterns (Nov 2025)
    # These catch ISIL codes that appear with common prefixes
    ISIL_CONTEXT_PATTERNS = [
        re.compile(r'\bISIL[:\s]+([A-Z]{2}-[A-Za-z0-9]+)\b', re.IGNORECASE),  # "ISIL: NL-..."
        re.compile(r'\bcode[:\s]+([A-Z]{2}-[A-Za-z0-9]+)\b', re.IGNORECASE),  # "code: NL-..."
        re.compile(r'\(([A-Z]{2}-[A-Za-z0-9]+)\)'),  # "(NL-AsdAM)"
    ]
    # Wikidata Q-identifier, e.g. "Q42"
    WIKIDATA_PATTERN = re.compile(r'\b(Q\d+)\b')
    # VIAF id captured from a viaf.org URL
    VIAF_URL_PATTERN = re.compile(r'viaf\.org/viaf/(\d+)')
    # NOTE(review): matches ANY standalone 8-digit number; presumably a Dutch
    # KvK company number -- confirm callers apply contextual filtering.
    KVK_PATTERN = re.compile(r'\b(\d{8})\b')
    # ISIL code blacklist (common false positives)
    # NOTE: Matching is case-insensitive, but stored in original case for reference
    ISIL_BLACKLIST = {
        'CD-ROM',  # CD-ROM is not Congo ISIL code
        'US-ASCII',
        'UTF-8',
        'ISO-8859',
        'AI-Powered',  # AI tool descriptions (English)
        'AI-processed',
        'AI-driven',
        'AI-based',
        'AI-enhanced',
        'AI-assisted',
        'AI-generated',
        'AI-gedreven',  # Dutch: AI-driven
        'AI-aangedreven',  # Dutch: AI-powered
        'AI-ondersteund',  # Dutch: AI-supported
        'MS-DOS',  # Operating systems
        'US-WEST',  # AWS regions
        'EU-GDPR',  # Legal frameworks
        'US-EN',  # Language codes
    }
    # -------------------------------------------------------------------------
    # Institution type keywords (multilingual)
    # -------------------------------------------------------------------------
    # Map keyword variants to canonical English form for normalization
    # Note: Use string keys (not PermissibleValue) because LinkML enum values are unhashable
    INSTITUTION_KEYWORDS = {
        'MUSEUM': [
            'museum', 'museo', 'museu', 'musée', 'muzeum', 'muzeul',
            'kunstmuseum', 'kunsthalle', 'muzej', 'μουσείο'
        ],
        'LIBRARY': [
            'library', 'biblioteca', 'bibliothek', 'bibliotheek', 'bibliothèque',
            'biblioteka', 'knihovna', 'βιβλιοθήκη', 'national library'
        ],
        'ARCHIVE': [
            'archive', 'archivo', 'archiv', 'archief', 'archives',
            'arkiv', 'arkivet', 'αρχείο', 'arquivos', 'national archive'
        ],
        'GALLERY': [
            'gallery', 'galerie', 'galerija', 'γκαλερί', 'art gallery',
            'kunstgalerie', 'galería'
        ],
        'RESEARCH_CENTER': [
            'research center', 'research centre', 'research institute',
            'onderzoekscentrum', 'forschungszentrum', 'centre de recherche'
        ],
        'BOTANICAL_ZOO': [
            'botanical garden', 'botanic garden', 'zoo', 'aquarium',
            'botanische tuin', 'jardin botanique', 'dierentuin'
        ],
        'EDUCATION_PROVIDER': [
            'university', 'universiteit', 'universidad', 'université',
            'college', 'school', 'educational institution'
        ],
    }
    # Keyword normalization map: variant -> canonical English form
    KEYWORD_NORMALIZATION = {
        'museo': 'Museum',
        'museu': 'Museum',
        'musée': 'Museum',
        'muzeum': 'Museum',
        'muzeul': 'Museum',
        'muzej': 'Museum',
        'μουσείο': 'Museum',
        'museum': 'Museum',
        'biblioteca': 'Library',
        'bibliothek': 'Library',
        'bibliotheek': 'Library',
        'bibliothèque': 'Library',
        'biblioteka': 'Library',
        'knihovna': 'Library',
        'βιβλιοθήκη': 'Library',
        'library': 'Library',
        'archivo': 'Archive',
        'archiv': 'Archive',
        'archief': 'Archive',
        'archives': 'Archives',
        'arkiv': 'Archive',
        'arkivet': 'Archive',
        'αρχείο': 'Archive',
        'arquivos': 'Archives',
        'archive': 'Archive',
        'galerie': 'Gallery',
        'galerija': 'Gallery',
        'γκαλερί': 'Gallery',
        'galería': 'Gallery',
        'gallery': 'Gallery',
    }
    # Location extraction patterns
    CITY_PATTERN = re.compile(
        r'\bin\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)'  # "in Amsterdam", "in New York"
    )
    # Country name to ISO 3166-1 alpha-2 code mapping
    # Used to infer country from conversation filename
    # (also includes demonyms like 'brazilian' and Dutch provinces -> 'NL')
    COUNTRY_NAME_TO_CODE = {
        'brazilian': 'BR', 'brazil': 'BR',
        'vietnamese': 'VN', 'vietnam': 'VN',
        'chilean': 'CL', 'chile': 'CL',
        'japanese': 'JP', 'japan': 'JP',
        'mexican': 'MX', 'mexico': 'MX',
        'canadian': 'CA', 'canada': 'CA',
        'polish': 'PL', 'poland': 'PL',
        'hungarian': 'HU', 'hungary': 'HU',
        'norwegian': 'NO', 'norway': 'NO',
        'portuguese': 'PT', 'portugal': 'PT',
        'thai': 'TH', 'thailand': 'TH',
        'taiwan': 'TW',
        'turkish': 'TR', 'turkey': 'TR',
        'belgian': 'BE', 'belgium': 'BE',
        'swedish': 'SE', 'sweden': 'SE',
        'azerbaijan': 'AZ',
        'estonian': 'EE', 'estonia': 'EE',
        'south african': 'ZA', 'south africa': 'ZA',
        'namibian': 'NA', 'namibia': 'NA',
        'iraqi': 'IQ', 'iraq': 'IQ',
        'algeria': 'DZ', 'algerian': 'DZ',
        'argentine': 'AR', 'argentina': 'AR',
        'moroccan': 'MA', 'morocco': 'MA',
        'tunisian': 'TN', 'tunisia': 'TN',
        'libyan': 'LY', 'libya': 'LY',
        'mali': 'ML',
        'senegal': 'SN',
        'mauritania': 'MR',
        'egyptian': 'EG', 'egypt': 'EG',
        'ghana': 'GH',
        'jordanian': 'JO', 'jordan': 'JO',
        'iranian': 'IR', 'iran': 'IR',
        'russian': 'RU', 'russia': 'RU',
        'uzbekistan': 'UZ',
        'armenian': 'AM', 'armenia': 'AM',
        'georgia': 'GE', 'georgian': 'GE',
        'croatian': 'HR', 'croatia': 'HR',
        'greece': 'GR', 'greek': 'GR',
        'nigerian': 'NG', 'nigeria': 'NG',
        'somali': 'SO', 'somalia': 'SO',
        'yemen': 'YE',
        'oman': 'OM',
        'korean': 'KR', 'korea': 'KR', 'south korea': 'KR',
        'north korean': 'KP', 'north korea': 'KP',
        'malaysian': 'MY', 'malaysia': 'MY',
        'colombian': 'CO', 'colombia': 'CO',
        'swiss': 'CH', 'switzerland': 'CH',
        'nepal': 'NP',
        'united states': 'US', 'american': 'US',
        'serbian': 'RS', 'serbia': 'RS',
        'moldavian': 'MD', 'moldova': 'MD',
        'bulgarian': 'BG', 'bulgaria': 'BG',
        'romanian': 'RO', 'romania': 'RO',
        'albanian': 'AL', 'albania': 'AL',
        'bosnian': 'BA', 'bosnia': 'BA',
        'india': 'IN', 'indian': 'IN',
        'bhutan': 'BT',
        'pakistan': 'PK',
        'suriname': 'SR',
        'nicaragua': 'NI',
        'congo': 'CG',
        'danish': 'DK', 'denmark': 'DK',
        'austrian': 'AT', 'austria': 'AT',
        'australian': 'AU', 'australia': 'AU',
        'burma': 'MM', 'myanmar': 'MM',
        'cambodian': 'KH', 'cambodia': 'KH',
        'afghan': 'AF', 'afghanistan': 'AF',
        'sri lankan': 'LK', 'sri lanka': 'LK',
        'laos': 'LA',
        'tajikistan': 'TJ',
        'turkmenistan': 'TM',
        'uruguay': 'UY',
        'philippine': 'PH', 'philippines': 'PH',
        'finnish': 'FI', 'finland': 'FI',
        'latvian': 'LV', 'latvia': 'LV',
        'israeli': 'IL', 'israel': 'IL',
        'palestinian': 'PS', 'palestine': 'PS',
        'cyprus': 'CY',
        # Dutch provinces map to NL (conversation names often use them)
        'overijssel': 'NL', 'limburg': 'NL', 'north brabant': 'NL',
        'zeeland': 'NL', 'zuid holland': 'NL', 'noord holland': 'NL',
        'gelderland': 'NL', 'drenthe': 'NL', 'groningen': 'NL',
        'friesland': 'NL', 'flevoland': 'NL',
        'dutch': 'NL', 'netherlands': 'NL',
        'slovak': 'SK', 'slovakia': 'SK',
        'slovenian': 'SI', 'slovenia': 'SI',
        'north macedonia': 'MK', 'macedonia': 'MK',
        'peruvian': 'PE', 'peru': 'PE',
        'ethiopian': 'ET', 'ethiopia': 'ET',
        'kenyan': 'KE', 'kenya': 'KE',
        'paraguay': 'PY',
        'honduran': 'HN', 'honduras': 'HN',
        'panamanian': 'PA', 'panama': 'PA',
        'madagascar': 'MG',
        'mozambique': 'MZ',
        'eritrean': 'ER', 'eritrea': 'ER',
        'sudan': 'SD',
        'rwandan': 'RW', 'rwanda': 'RW',
        'kiribati': 'KI',
        'new zealand': 'NZ',
        'haiti': 'HT',
        'jamaican': 'JM', 'jamaica': 'JM',
        'cuban': 'CU', 'cuba': 'CU',
        'indonesian': 'ID', 'indonesia': 'ID',
        'vatican': 'VA',
        'italian': 'IT', 'italy': 'IT',
        'zimbabwe': 'ZW',
        'east timor': 'TL',
        'qatar': 'QA',
        'arabic emirates': 'AE', 'uae': 'AE',
        'kuwait': 'KW',
        'lebanese': 'LB', 'lebanon': 'LB',
        'syrian': 'SY', 'syria': 'SY',
        'saudi arabian': 'SA', 'saudi arabia': 'SA',
        'maldives': 'MV',
        'burkina faso': 'BF',
        'togo': 'TG',
        'benin': 'BJ',
        'liberian': 'LR', 'liberia': 'LR',
    }
    # Common patterns for institution names
    # Match capitalized sequences that could be institution names
    INSTITUTION_NAME_PATTERN = re.compile(
        r'\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,5})\b'
    )
# =============================================================================
# INSTITUTION EXTRACTOR
# =============================================================================
class InstitutionExtractor:
"""
Extract heritage institution data from conversation text using NLP techniques.
This extractor uses pattern matching and heuristics to identify institutions,
classify their types, extract locations, and find associated identifiers.
Each extraction includes a confidence score for data quality tracking.
Usage:
extractor = InstitutionExtractor()
result = extractor.extract_from_conversation(conversation)
if result.success:
for institution in result.value:
print(f"{institution.name} - {institution.institution_type}")
"""
def __init__(self):
    """Initialize the extractor with the shared pattern definitions."""
    # All regexes and lookup tables live on ExtractionPatterns as class
    # constants; the instance only holds a reference to them.
    self.patterns = ExtractionPatterns()
def extract_from_conversation(
    self,
    conversation: Conversation
) -> Result:
    """
    Extract heritage institutions from a conversation.

    Only assistant messages are scanned, since those are the most likely
    to contain institution data.  The conversation name is used to infer
    a fallback country code.

    Args:
        conversation: Parsed Conversation object

    Returns:
        Result containing List[HeritageCustodian] on success, error
        message on failure
    """
    try:
        # Assistant messages are the richest source of institution data.
        text = conversation.extract_all_text(sender="assistant")
        if not text.strip():
            # Nothing to scan: an empty result, not an error.
            return Result.ok([])
        country_hint = self._infer_country_from_name(conversation.name)
        found = self._extract_entities(text, inferred_country=country_hint)
        records = [
            self._entity_to_custodian(
                entity,
                conversation_id=conversation.uuid,
                conversation_name=conversation.name,
            )
            for entity in found
        ]
        return Result.ok(records)
    except Exception as exc:
        # Surface failures through the Result pattern rather than raising.
        return Result.err(f"Extraction failed: {str(exc)}")
def extract_from_text(
    self,
    text: str,
    conversation_id: Optional[str] = None,
    conversation_name: Optional[str] = None
) -> Result:
    """
    Extract heritage institutions from raw text.

    Args:
        text: Text to extract from
        conversation_id: Optional conversation UUID for provenance
        conversation_name: Optional conversation name for context

    Returns:
        Result containing List[HeritageCustodian] on success, error
        message on failure
    """
    try:
        if not text.strip():
            # Empty input is a valid no-op, not a failure.
            return Result.ok([])
        records = [
            self._entity_to_custodian(
                entity,
                conversation_id=conversation_id,
                conversation_name=conversation_name,
            )
            for entity in self._extract_entities(text)
        ]
        return Result.ok(records)
    except Exception as exc:
        # Report failures through the Result pattern instead of raising.
        return Result.err(f"Extraction failed: {str(exc)}")
# =========================================================================
# HELPER METHODS
# =========================================================================
def _infer_country_from_name(self, conversation_name: Optional[str]) -> Optional[str]:
"""
Infer country code from conversation name.
Example: "Brazilian_GLAM_collection_inventories""BR"
Args:
conversation_name: Conversation name/title
Returns:
ISO 3166-1 alpha-2 country code if found, otherwise None
"""
if not conversation_name:
return None
name_lower = conversation_name.lower()
# Check against country name mapping
for country_name, country_code in self.patterns.COUNTRY_NAME_TO_CODE.items():
if country_name in name_lower:
return country_code
return None
# =========================================================================
# ENTITY EXTRACTION
# =========================================================================
def _extract_entities(
    self,
    text: str,
    inferred_country: Optional[str] = None
) -> List[ExtractedEntity]:
    """
    Extract institution entities from text.

    Per-sentence pipeline: keyword gate -> type classification -> name
    extraction -> location extraction -> country validation ->
    identifier extraction -> per-name filtering and scoring.  The
    inferred_country serves as a fallback when the sentence itself
    carries no location signal.

    Args:
        text: Text to extract from
        inferred_country: ISO country code inferred from conversation
            name (optional)

    Returns:
        Deduplicated list of extracted entities
    """
    found: List[ExtractedEntity] = []
    for sentence in self._split_sentences(text):
        if not self._is_institution_sentence(sentence):
            continue
        inst_type = self._classify_institution_type(sentence)
        candidate_names = self._extract_institution_names(sentence)
        city, country = self._extract_location(sentence)
        # V5: prefer an explicitly mentioned country over the inferred one.
        validated_country, country_source = self._validate_country_context(
            sentence, candidate_names[0] if candidate_names else "", inferred_country
        )
        if validated_country:
            country = validated_country
        identifiers = self._extract_identifiers(sentence)
        for candidate in candidate_names:
            # V5 filters: drop umbrella organizations and generic names.
            if self._is_organization_or_network(candidate, sentence):
                continue
            if not self._is_proper_institutional_name(candidate, sentence):
                continue
            score = self._calculate_confidence_v5(
                name=candidate,
                institution_type=inst_type,
                city=city,
                country=country,
                identifiers=identifiers,
                sentence=sentence,
                country_source=country_source,
            )
            found.append(
                ExtractedEntity(
                    name=candidate,
                    institution_type=inst_type,
                    city=city,
                    country=country,
                    identifiers=identifiers,
                    confidence_score=score,
                    text_snippet=sentence[:200],  # short provenance snippet
                )
            )
    # Collapse duplicate mentions of the same institution.
    return self._deduplicate_entities(found)
def _split_sentences(self, text: str) -> List[str]:
"""
Split text into sentences.
Simple sentence splitting on common delimiters.
Args:
text: Text to split
Returns:
List of sentences
"""
# Split on period, exclamation, question mark followed by space and capital
sentences = re.split(r'[.!?]\s+(?=[A-Z])', text)
return [s.strip() for s in sentences if s.strip()]
def _is_institution_sentence(self, sentence: str) -> bool:
"""
Check if sentence likely mentions a heritage institution.
Args:
sentence: Sentence to check
Returns:
True if sentence contains institution-related keywords
"""
sentence_lower = sentence.lower()
# Check for any institution type keyword
for keywords in self.patterns.INSTITUTION_KEYWORDS.values():
for keyword in keywords:
if keyword in sentence_lower:
return True
# Check for identifier patterns
if (self.patterns.ISIL_PATTERN.search(sentence) or
self.patterns.WIKIDATA_PATTERN.search(sentence) or
self.patterns.VIAF_URL_PATTERN.search(sentence)):
return True
return False
def _classify_institution_type(self, sentence: str) -> Optional[str]:
"""
Classify institution type based on keywords in sentence.
Args:
sentence: Sentence to classify
Returns:
Institution type string (e.g., 'MUSEUM', 'LIBRARY') if detected, otherwise None
"""
sentence_lower = sentence.lower()
# Check for type keywords, prioritize more specific types
for inst_type, keywords in self.patterns.INSTITUTION_KEYWORDS.items():
for keyword in keywords:
if keyword in sentence_lower:
return inst_type
return None
def _extract_institution_names(self, sentence: str) -> List[str]:
    """
    Extract potential institution names from sentence.

    Handles multiple patterns:
    1. "[Type] of [Name]": "Museum of Modern Art"
    2. "[Name] + [Type]": "British Museum", "National Library"
    3. Compound words: "Rijksmuseum" (contains "museum")
    4. ISIL-based: "NL-AsdAM for Amsterdam Museum"

    Normalizes multilingual keyword variants to canonical English forms.

    Args:
        sentence: Sentence to extract from

    Returns:
        List of potential institution names (normalized, deduplicated)
    """
    names = []
    sentence_lower = sentence.lower()
    # Pattern 1: Keyword appears AT THE START of institution name
    # "Museum of Modern Art", "Library of Congress"
    for inst_type, keywords in self.patterns.INSTITUTION_KEYWORDS.items():
        for keyword in keywords:
            if keyword not in sentence_lower:
                continue
            # Find all occurrences of the keyword, advancing idx manually.
            idx = 0
            while True:
                idx = sentence_lower.find(keyword, idx)
                if idx == -1:
                    break
                # Check if keyword is at word boundary (start of institution name)
                if idx > 0 and sentence_lower[idx-1].isalnum():
                    # Pattern 3: Compound word (e.g., "Rijksmuseum")
                    # Extract backward to find start of the capitalized compound word
                    start = idx
                    while start > 0 and sentence[start-1].isalpha():
                        start -= 1
                    # Extract forward to find end of compound word
                    end = idx + len(keyword)
                    while end < len(sentence) and sentence[end].isalpha():
                        end += 1
                    # Check if we found a capitalized word
                    if start < idx and sentence[start].isupper():
                        compound_word = sentence[start:end]
                        # Verify it's a proper noun (capitalized)
                        if compound_word[0].isupper():
                            # Check if there are capitalized words BEFORE this compound word
                            # (indicates multi-word name like "Van Abbemuseum")
                            prefix = sentence[max(0, start-50):start].strip()
                            if prefix:
                                words_before = prefix.split()
                                # Check last few words before compound
                                has_prefix_name = False
                                for word in words_before[-3:]:  # Check last 3 words
                                    if word and word[0].isupper() and word.lower() not in ['the', 'a', 'an', 'in', 'at']:
                                        has_prefix_name = True
                                        break
                                if has_prefix_name:
                                    # This is a multi-word name, use Pattern 2 logic below
                                    # Don't append compound_word, let Pattern 2 handle it
                                    # NOTE(review): idx is incremented here AND again
                                    # below before falling through -- confirm the
                                    # double advance is intentional.
                                    idx += 1
                                    # Fall through to Pattern 2 (don't continue)
                                else:
                                    # True compound word (single word like "Rijksmuseum")
                                    names.append(compound_word)
                                    idx += 1
                                    continue
                            else:
                                # No prefix, true compound word
                                names.append(compound_word)
                                idx += 1
                                continue
                    idx += 1
                    # Fall through to Pattern 2
                # Extract text AFTER keyword (for "Museum of Modern Art" and "Museu Nacional" patterns)
                text_after = sentence[idx:idx+100]
                # Pattern 1a: keyword + "of/for/and" + capitalized words (English)
                # "Museum of Modern Art", "Library of Congress"
                match = re.match(
                    r'(' + re.escape(keyword) + r'(?:\s+(?:of|for|and)\s+[A-Z][a-zA-Z]+)+)',
                    text_after,
                    re.IGNORECASE
                )
                if match:
                    full_name = match.group(1).strip()
                    # Normalize keyword to canonical English form
                    normalized_name = full_name
                    for variant, canonical in self.patterns.KEYWORD_NORMALIZATION.items():
                        if full_name.lower().startswith(variant):
                            normalized_name = canonical + full_name[len(variant):]
                            break
                    names.append(normalized_name)
                else:
                    # Pattern 1b: keyword + capitalized words (stop at location markers)
                    # "Museu Nacional", "Biblioteca Nacional"
                    # Match until we hit location markers (in, at, from) or lowercase word
                    parts = [keyword.title() if keyword.lower() in self.patterns.KEYWORD_NORMALIZATION else keyword]
                    words_after = text_after[len(keyword):].strip().split()
                    for word in words_after:
                        # Stop at location indicators
                        if word.lower() in ['in', 'at', 'from', 'located', 'on', 'near']:
                            break
                        # Stop at lowercase words (articles, prepositions)
                        # NOTE(review): 'de la' can never equal a single
                        # whitespace-split token -- dead entry, verify intent.
                        if word.lower() in ['the', 'a', 'an', 'de', 'do', 'da', 'del', 'de la', 'di']:
                            break
                        # Include capitalized words
                        if word[0].isupper():
                            parts.append(word)
                        else:
                            break
                    if len(parts) > 1:  # At least keyword + one name part
                        # Normalize keyword to canonical English form
                        normalized_keyword = self.patterns.KEYWORD_NORMALIZATION.get(
                            keyword.lower(),
                            keyword.title()
                        )
                        full_name = ' '.join([normalized_keyword] + parts[1:])
                        names.append(full_name)
                # Pattern 2: Keyword appears AFTER the name
                # "British Museum", "National Library"
                # Extract text BEFORE keyword (up to 50 chars)
                prefix = sentence[max(0, idx-50):idx].strip()
                if prefix:
                    words = prefix.split()
                    name_parts = []
                    for i, word in enumerate(reversed(words)):
                        # Skip leading articles only (nothing collected yet)
                        if word.lower() in ['the', 'a', 'an'] and len(name_parts) == 0:
                            continue  # Skip leading article
                        # Stop at location indicators
                        if word.lower() in ['in', 'at', 'from', 'located']:
                            break
                        # Include capitalized words or mid-name prepositions
                        if word[0].isupper() or word.lower() in ['of', 'for', 'and']:
                            name_parts.insert(0, word)
                        else:
                            break
                    if name_parts:
                        # Normalize the keyword to canonical English form
                        normalized_keyword = self.patterns.KEYWORD_NORMALIZATION.get(
                            keyword.lower(),
                            keyword.title()
                        )
                        full_name = ' '.join(name_parts + [normalized_keyword])
                        names.append(full_name)
                idx += 1
    # Pattern 4: Match ISIL pattern followed by institution name
    # Example: "NL-AsdAM for Amsterdam Museum"
    isil_matches = self.patterns.ISIL_PATTERN.finditer(sentence)
    for match in isil_matches:
        # Look for "for" or "identifies" after ISIL code
        text_after = sentence[match.end():match.end()+50]
        for_match = re.search(r'\s+(?:for|identifies)\s+([A-Z][a-zA-Z\s]+)', text_after)
        if for_match:
            name = for_match.group(1).strip()
            # Clean up name (stop at punctuation)
            name = re.split(r'[,.\(\)]', name)[0].strip()
            if name:
                names.append(name)
    return list(set(names))  # Remove duplicates (order not guaranteed)
def _extract_location(self, sentence: str) -> Tuple[Optional[str], Optional[str]]:
"""
Extract location information from sentence.
Args:
sentence: Sentence to extract from
Returns:
Tuple of (city, country) or (None, None)
"""
city = None
country = None
# Extract city using "in [City]" pattern
city_match = self.patterns.CITY_PATTERN.search(sentence)
if city_match:
city = city_match.group(1).strip()
# Extract country code if present (e.g., "NL-", "US-")
isil_match = self.patterns.ISIL_PATTERN.search(sentence)
if isil_match:
isil_code = isil_match.group(1)
country = isil_code.split('-')[0] # First part is country code
# If no country from ISIL, scan sentence for country names
if not country:
sentence_lower = sentence.lower()
# Sort by length (longest first) to match "United States" before "States"
sorted_countries = sorted(
self.patterns.COUNTRY_NAME_TO_CODE.items(),
key=lambda x: len(x[0]),
reverse=True
)
for country_name, country_code in sorted_countries:
# Use word boundary matching to avoid false positives
# e.g., "Austria" shouldn't match "Australian"
if f' {country_name} ' in f' {sentence_lower} ' or \
sentence_lower.startswith(f'{country_name} ') or \
sentence_lower.endswith(f' {country_name}'):
country = country_code
break
return city, country
# =========================================================================
# V5 VALIDATION METHODS
# =========================================================================
def _validate_country_context(
self,
sentence: str,
name: str,
inferred_country: Optional[str]
) -> Tuple[Optional[str], str]:
"""
Validate country assignment by checking for explicit mentions in context.
V5 enhancement to prevent geographic errors like assigning Malaysian
institutions to Netherlands based solely on conversation filename.
Args:
sentence: The sentence containing the institution mention
name: Institution name being validated
inferred_country: Country code inferred from conversation filename
Returns:
Tuple of (validated_country, country_source) where:
- validated_country: ISO 3166-1 alpha-2 country code or None
- country_source: 'explicit' | 'inferred' | 'none'
Examples:
>>> # Explicit country mention overrides inferred
>>> validate("University Malaysia in Kuala Lumpur", "University Malaysia", "NL")
('MY', 'explicit')
>>> # No explicit country, use inferred
>>> validate("Amsterdam Museum holds...", "Amsterdam Museum", "NL")
('NL', 'inferred')
>>> # Contradiction: reject extraction
>>> validate("Islamic University Malaysia", "Islamic University", "NL")
(None, 'none')
"""
# First, check if sentence explicitly mentions ANY country
sentence_lower = sentence.lower()
name_lower = name.lower()
# Look for explicit country mentions near the institution name
# Pattern 1: "[Name] in [Country]"
# Pattern 2: "[Name], [City], [Country]"
# Pattern 3: "[Country]'s [Name]" or "[Country] [Name]"
explicit_country = None
# Sort countries by length (longest first) to match multi-word names
sorted_countries = sorted(
self.patterns.COUNTRY_NAME_TO_CODE.items(),
key=lambda x: len(x[0]),
reverse=True
)
for country_name, country_code in sorted_countries:
# Check if country name appears in sentence
if f' {country_name} ' in f' {sentence_lower} ' or \
sentence_lower.startswith(f'{country_name} ') or \
sentence_lower.endswith(f' {country_name}'):
# Check if country mention is near institution name (within 50 chars)
name_pos = sentence_lower.find(name_lower)
country_pos = sentence_lower.find(country_name)
if name_pos != -1 and country_pos != -1:
distance = abs(name_pos - country_pos)
if distance < 50: # Within 50 characters = likely related
explicit_country = country_code
break
# Check for ISIL codes (strongest signal for country)
isil_match = self.patterns.ISIL_PATTERN.search(sentence)
if isil_match:
isil_code = isil_match.group(1)
explicit_country = isil_code.split('-')[0]
# Decision logic
if explicit_country:
# Found explicit country mention in text
if inferred_country and explicit_country != inferred_country:
# Contradiction: explicit country contradicts inferred country
# Example: sentence mentions "Malaysia" but inferred is "NL"
# REJECT: This institution doesn't belong to inferred country
return None, 'none'
else:
# Explicit country matches inferred (or no inferred country)
return explicit_country, 'explicit'
# No explicit country found
if inferred_country:
# Use inferred country, but mark as less confident
return inferred_country, 'inferred'
# No country information available
return None, 'none'
def _is_organization_or_network(self, name: str, sentence: str) -> bool:
"""
Check if extracted name is an organization/network rather than an institution.
V5 enhancement to filter out:
- International organizations (IFLA, UNESCO, ICOM)
- Networks and platforms (Archive Net, Museum Association)
- Federations and consortia
Args:
name: Institution name to validate
sentence: Context sentence
Returns:
True if name should be filtered (is an organization/network)
Examples:
>>> _is_organization_or_network("IFLA", "IFLA sets library standards")
True
>>> _is_organization_or_network("National Library", "The National Library...")
False
"""
name_lower = name.lower()
sentence_lower = sentence.lower()
# Organization blacklist (case-insensitive)
ORGANIZATION_BLACKLIST = {
# International organizations
'ifla', 'unesco', 'icom', 'icomos', 'ica',
'international federation of library associations',
'international council of museums',
'international council on archives',
# Networks and platforms
'archive net', 'netwerk oorlogsbronnen',
'museum association', 'archives association',
'library consortium', 'museum network',
# Generic organizational terms (when standalone)
'federation', 'consortium', 'network', 'association',
'platform', 'union', 'alliance',
}
# Check exact match against blacklist
if name_lower in ORGANIZATION_BLACKLIST:
return True
# Check if any blacklist term is contained in name
for org_term in ORGANIZATION_BLACKLIST:
if org_term in name_lower and len(name_lower) - len(org_term) < 5:
# Name is mostly just the blacklisted term
return True
# Pattern detection: "X is a network of Y"
NETWORK_PATTERNS = [
r'\b' + re.escape(name_lower) + r'\s+is\s+a\s+network',
r'\b' + re.escape(name_lower) + r'\s+is\s+an?\s+organization',
r'\b' + re.escape(name_lower) + r'\s+is\s+an?\s+association',
r'\b' + re.escape(name_lower) + r'\s+is\s+an?\s+federation',
r'\bnetwork\s+of\s+\d+\s+\w+\s+including\s+' + re.escape(name_lower),
]
for pattern in NETWORK_PATTERNS:
if re.search(pattern, sentence_lower):
return True
return False
def _is_proper_institutional_name(self, name: str, sentence: str) -> bool:
"""
Validate that extracted name is a proper institution name, not a generic term.
V5 enhancement to filter out:
- Generic descriptors (Library FabLab, Museum Café)
- Concepts/services rather than named institutions
- Academic departments without proper institutional context
Args:
name: Institution name to validate
sentence: Context sentence
Returns:
True if name is valid, False if should be filtered
Examples:
>>> _is_proper_institutional_name("Library FabLab", "The Library FabLab...")
False # Generic service descriptor
>>> _is_proper_institutional_name("Rijksmuseum", "Rijksmuseum in Amsterdam")
True # Valid compound name
>>> _is_proper_institutional_name("Southeast Asian Studies", "...")
False # Academic department without institution
"""
name_lower = name.lower()
# Generic descriptor blacklist
GENERIC_DESCRIPTORS = {
'library fablab', 'library makerspace', 'library café', 'library cafe',
'museum café', 'museum cafe', 'museum shop', 'museum store',
'archive reading room', 'archive portal',
'library services', 'museum services', 'archive services',
# Too generic (without specific qualifier)
'dutch museum', 'dutch library', 'dutch archive',
'local archive', 'local museum', 'local library',
'university library', 'university archive', 'university museum',
'city archive', 'city museum', 'city library',
}
# Check against generic descriptor blacklist
if name_lower in GENERIC_DESCRIPTORS:
return False
# Check for generic patterns
GENERIC_PATTERNS = [
r'^(library|museum|archive)\s+(fablab|makerspace|café|cafe|shop)$',
r'^(university|city|local|regional)\s+(library|museum|archive)$',
]
for pattern in GENERIC_PATTERNS:
if re.match(pattern, name_lower):
return False
# Check for academic department patterns (without institutional context)
# "Southeast Asian Studies" is too vague
# "Southeast Asian Studies Library, Leiden University" would be OK
if name_lower.endswith(' studies') and not any(
keyword in name_lower for keyword in ['library', 'museum', 'archive', 'center', 'centre']
):
return False
# Minimum name requirements
words = name.split()
# Check for compound words (Rijksmuseum, Tropenmuseum)
# These are allowed as single-word names
institution_keywords = [
'museum', 'museu', 'museo', 'musée', 'muzeum',
'library', 'biblioteca', 'bibliothek', 'bibliotheek',
'archive', 'archivo', 'archiv', 'archief',
'gallery', 'galerie',
]
is_compound_word = False
if len(words) == 1:
# Check if single word contains institution keyword as suffix
for keyword in institution_keywords:
if name_lower.endswith(keyword) and len(name) > len(keyword):
# Has prefix before keyword (Rijks-museum)
is_compound_word = True
break
# Single-word names only allowed if compound
if len(words) == 1 and not is_compound_word:
return False
# Multi-word names must have at least one word that's NOT just the keyword
if len(words) >= 2:
# At least one word should be a proper name (not just "National Museum")
has_specific_name = False
for word in words:
word_lower = word.lower()
# Skip articles, prepositions, and generic adjectives
if word_lower in ['the', 'of', 'for', 'and', 'national', 'state',
'public', 'royal', 'central', 'general']:
continue
# Skip institution keywords
if word_lower in institution_keywords:
continue
# Found a specific name component
has_specific_name = True
break
if not has_specific_name:
# Name is too generic (e.g., "National Museum" without specific qualifier)
return False
return True
def _calculate_confidence_v5(
self,
name: str,
institution_type: Optional[str],
city: Optional[str],
country: Optional[str],
identifiers: List[Identifier],
sentence: str,
country_source: str
) -> float:
"""
Calculate confidence score for extracted institution (V5 algorithm).
V5 changes from V4:
- Lower base score (0.2 vs 0.3) - more conservative
- Penalties for weak signals (single-word names, inferred country)
- Bonuses for strong signals (explicit location mentions, identifiers)
- Higher threshold (0.6 vs 0.5) for final filtering
Scoring algorithm:
Base: 0.2 (lower than v4's 0.3)
Positive signals:
- +0.3 Has institution type
- +0.2 Has explicit location (city OR country)
- +0.4 Has identifier (ISIL/Wikidata/VIAF)
- +0.2 Name is 2-6 words (optimal length)
- +0.2 Explicit "is a" pattern
- +0.1 Country from explicit mention (not inferred)
Negative signals:
- -0.2 Single-word name without compound validation
- -0.3 Generic descriptor pattern
- -0.2 Country only inferred
- -0.5 Organization/network blacklist match
Args:
name: Institution name
institution_type: Detected type (MUSEUM, LIBRARY, etc.)
city: Detected city
country: Validated country code
identifiers: List of extracted identifiers
sentence: Source sentence for context analysis
country_source: 'explicit', 'inferred', or 'none'
Returns:
Confidence score (0.0-1.0)
"""
score = 0.2 # Lower base than v4 (was 0.3)
# Positive signals
# +0.3 Has institution type
if institution_type:
score += 0.3
# +0.2 Has explicit location (city OR country with explicit source)
has_explicit_location = bool(city) or (country and country_source == 'explicit')
if has_explicit_location:
score += 0.2
# +0.4 Has identifier (strong signal of real institution)
if identifiers:
score += 0.4
# +0.2 Name is 2-6 words (optimal length, not too short/long)
word_count = len(name.split())
if 2 <= word_count <= 6:
score += 0.2
# +0.2 Explicit "is a" pattern in sentence
is_a_patterns = [
rf'\b{re.escape(name)}\b\s+is\s+a\s+(museum|library|archive|gallery)',
rf'\b{re.escape(name)}\b,\s+a\s+(museum|library|archive|gallery)',
]
if any(re.search(pattern, sentence, re.IGNORECASE) for pattern in is_a_patterns):
score += 0.2
# +0.1 Country from explicit mention (bonus beyond location bonus)
if country_source == 'explicit':
score += 0.1
# Negative signals
# -0.2 Single-word name (risky unless compound word)
if word_count == 1:
# Check if it's a compound word (e.g., Rijksmuseum)
is_compound = len(name) > 12 or any(
substring in name.lower()
for substring in ['museum', 'archief', 'bibliotheek', 'gallery']
)
if not is_compound:
score -= 0.2
# -0.3 Generic descriptor pattern
generic_patterns = [
r'\b(national|state|provincial|regional|local)\s+(museum|library|archive)\b',
r'\b(university|college|school)\s+(library|archive)\b',
r'\b(public|city|town)\s+library\b',
]
if any(re.search(pattern, name, re.IGNORECASE) for pattern in generic_patterns):
# But allow if it has a specific qualifier (e.g., "National Museum of Brazil")
has_specific = re.search(r'\b(of|in|for|at)\s+\w+', name, re.IGNORECASE)
if not has_specific:
score -= 0.3
# -0.2 Country only inferred (weak geographic signal)
if country_source == 'inferred':
score -= 0.2
# -0.5 Organization/network (should be filtered, but penalty if missed)
# Note: This should be caught by _is_organization_or_network() first
org_keywords = ['IFLA', 'UNESCO', 'ICOM', 'Network', 'Association', 'Foundation']
if any(keyword in name for keyword in org_keywords):
score -= 0.5
# Clamp to [0.0, 1.0]
return max(0.0, min(1.0, score))
def _extract_identifiers(self, sentence: str) -> List[Identifier]:
    """
    Extract external identifiers (ISIL, Wikidata, VIAF) from a sentence.

    ISIL codes are gathered from both the base pattern and the
    context-aware patterns (Nov 2025), filtered against a blacklist of
    common false positives (CD-ROM, UTF-8, etc.), and required to carry a
    2-character country prefix. All three schemes are deduplicated, so a
    code mentioned twice in one sentence yields a single Identifier
    (previously only ISIL codes were deduplicated; Wikidata/VIAF repeats
    produced duplicate records).

    Args:
        sentence: Sentence to extract from

    Returns:
        List of Identifier objects, unique per (scheme, value)
    """
    identifiers: List[Identifier] = []

    def _is_valid_isil(code: str) -> bool:
        # Reject blacklisted lookalikes (case-insensitive) and codes whose
        # country prefix is not exactly 2 characters.
        if any(code.lower() == blacklisted.lower()
               for blacklisted in self.patterns.ISIL_BLACKLIST):
            return False
        return len(code.split('-')[0]) == 2

    # ISIL: base pattern + context-aware patterns, collected into a set
    # to avoid duplicates across the two pattern families.
    isil_codes_found: Set[str] = set()
    for match in self.patterns.ISIL_PATTERN.finditer(sentence):
        isil_code = match.group(1)
        if _is_valid_isil(isil_code):
            isil_codes_found.add(isil_code)
    for context_pattern in self.patterns.ISIL_CONTEXT_PATTERNS:
        for match in context_pattern.finditer(sentence):
            isil_code = match.group(1)
            if _is_valid_isil(isil_code):
                isil_codes_found.add(isil_code)
    for isil_code in isil_codes_found:
        identifiers.append(Identifier(
            identifier_scheme="ISIL",
            identifier_value=isil_code,
            identifier_url=None,
            assigned_date=None
        ))

    # Wikidata IDs (deduplicated, first-occurrence order preserved)
    seen_wikidata: Set[str] = set()
    for match in self.patterns.WIKIDATA_PATTERN.finditer(sentence):
        wikidata_id = match.group(1)
        if wikidata_id in seen_wikidata:
            continue
        seen_wikidata.add(wikidata_id)
        identifiers.append(Identifier(
            identifier_scheme="Wikidata",
            identifier_value=wikidata_id,
            identifier_url=f"https://www.wikidata.org/entity/{wikidata_id}",  # type: ignore[arg-type]
            assigned_date=None
        ))

    # VIAF IDs (deduplicated, first-occurrence order preserved)
    seen_viaf: Set[str] = set()
    for match in self.patterns.VIAF_URL_PATTERN.finditer(sentence):
        viaf_id = match.group(1)
        if viaf_id in seen_viaf:
            continue
        seen_viaf.add(viaf_id)
        identifiers.append(Identifier(
            identifier_scheme="VIAF",
            identifier_value=viaf_id,
            identifier_url=f"https://viaf.org/viaf/{viaf_id}",  # type: ignore[arg-type]
            assigned_date=None
        ))

    return identifiers
def _calculate_confidence(
self,
name: str,
institution_type: Optional[InstitutionType],
city: Optional[str],
identifiers: List[Identifier],
sentence: str
) -> float:
"""
Calculate confidence score for extracted entity.
Scoring criteria:
- Has institution type: +0.2
- Has location: +0.1
- Has identifier: +0.3
- Name length appropriate (2-6 words): +0.2
- Explicit context ("The X is a museum"): +0.2
Base score: 0.3
Args:
name: Institution name
institution_type: Classified type
city: Extracted city
identifiers: Extracted identifiers
sentence: Source sentence
Returns:
Confidence score between 0.0 and 1.0
"""
score = 0.3 # Base score
# Has institution type
if institution_type:
score += 0.2
# Has location
if city:
score += 0.1
# Has identifiers (strong signal)
if identifiers:
score += 0.3
# Name length (2-6 words is typical)
word_count = len(name.split())
if 2 <= word_count <= 6:
score += 0.2
# Explicit "is a" pattern (strong signal)
sentence_lower = sentence.lower()
if f"{name.lower()} is a" in sentence_lower:
score += 0.2
# Cap at 1.0
return min(1.0, score)
def _deduplicate_entities(self, entities: List[ExtractedEntity]) -> List[ExtractedEntity]:
"""
Remove duplicate entities, keeping highest confidence version.
Uses fuzzy matching to detect near-duplicates like:
- "National Museum" vs "National Museu" (truncation)
- "Archive" vs "Archives" (pluralization)
- "Vietnam Museum" vs "Vietnamese Museum" (minor variations)
Args:
entities: List of extracted entities
Returns:
Deduplicated list
"""
if not entities:
return []
# Group by normalized name (case-insensitive + fuzzy matching)
deduplicated: List[ExtractedEntity] = []
for entity in entities:
# Check if this entity is similar to any already deduplicated
is_duplicate = False
for i, existing in enumerate(deduplicated):
# Use fuzzy matching if available, otherwise exact case-insensitive match
if RAPIDFUZZ_AVAILABLE:
similarity = fuzz.ratio(
entity.name.lower(),
existing.name.lower()
)
# 85% similarity threshold catches most duplicates
# "Vietnam Museum" vs "Vietnamese Museum" = 88.9%
# "Archive" vs "Archives" = 93.3%
# "National Museum" vs "National Museu" = 96.0%
if similarity >= 85:
is_duplicate = True
# Keep entity with higher confidence
if entity.confidence_score > existing.confidence_score:
deduplicated[i] = entity
break
else:
# Fallback: exact case-insensitive match
if entity.name.lower() == existing.name.lower():
is_duplicate = True
if entity.confidence_score > existing.confidence_score:
deduplicated[i] = entity
break
if not is_duplicate:
deduplicated.append(entity)
return deduplicated
# =========================================================================
# ENTITY TO HERITAGE CUSTODIAN CONVERSION
# =========================================================================
def _entity_to_custodian(
    self,
    entity: ExtractedEntity,
    conversation_id: Optional[str] = None,
    conversation_name: Optional[str] = None
) -> HeritageCustodian:
    """
    Convert an extracted entity to a HeritageCustodian record.

    Builds a primary Location from the entity's city/country when either
    is present, attaches Tier-4 (inferred) provenance pointing back at the
    source conversation, and defaults the type/status enums to UNKNOWN
    when the entity carries none.

    Args:
        entity: Extracted entity
        conversation_id: UUID of source conversation
        conversation_name: Name of source conversation

    Returns:
        HeritageCustodian record with provenance metadata
    """
    # Generate a unique, resolvable ID for this record
    record_id = f"https://w3id.org/heritage/custodian/{uuid.uuid4()}"

    # Build location only if city/country available
    locations = []
    if entity.city or entity.country:
        locations.append(Location(
            location_type=None,
            street_address=None,
            city=entity.city,
            postal_code=None,
            region=None,
            country=entity.country,
            latitude=None,
            longitude=None,
            geonames_id=None,
            is_primary=True
        ))

    # Provenance: NLP extraction from conversations is Tier-4 inferred data
    provenance = Provenance(
        data_source=DataSource.CONVERSATION_NLP,
        data_tier=DataTier.TIER_4_INFERRED,
        extraction_date=datetime.now(timezone.utc),
        extraction_method="Pattern matching + heuristic NER",
        confidence_score=entity.confidence_score,
        conversation_id=conversation_id,
        source_url=None,
        verified_date=None,
        verified_by=None
    )

    # Build description from whatever context is available. Joining parts
    # fixes the prior failure mode where a text snippet without a
    # conversation name was appended to a None description (or dropped).
    description_parts = []
    if conversation_name:
        description_parts.append(f"Extracted from conversation: {conversation_name}")
    if entity.text_snippet:
        description_parts.append(f"Context: {entity.text_snippet}")
    description = "\n\n".join(description_parts) if description_parts else None

    # Create HeritageCustodian record
    custodian = HeritageCustodian(
        id=record_id,
        name=entity.name,
        institution_type=entity.institution_type or InstitutionType.UNKNOWN,
        organization_status=OrganizationStatus.UNKNOWN,
        description=description,
        parent_organization=None,
        founded_date=None,
        closed_date=None,
        homepage=None,
        ghcid_numeric=None,
        ghcid_current=None,
        ghcid_original=None,
        ghcid_history=None,
        contact_info=None,
        locations=locations,
        identifiers=entity.identifiers if entity.identifiers else [],
        provenance=provenance
    )
    return custodian