# Entity Linking for Heritage Custodians

## Overview

This document defines entity linking strategies for resolving extracted heritage institution mentions to canonical knowledge bases (Wikidata, VIAF, ISIL registry) and the local Heritage Custodian Ontology knowledge graph.

## Entity Linking Architecture

```
┌──────────────────────────────────────────────────────────────────────┐
│                       Entity Linking Pipeline                        │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Extracted Entity ──► Candidate Generation ──► Candidate Ranking     │
│      (NER)              (Multi-source)          (Features + ML)      │
│                               │                       │              │
│                               ▼                       ▼              │
│                     ┌─────────────────┐     ┌─────────────────┐      │
│                     │   Knowledge     │     │ Disambiguation  │      │
│                     │     Bases       │     │     Module      │      │
│                     ├─────────────────┤     └────────┬────────┘      │
│                     │ • Wikidata      │              │               │
│                     │ • VIAF          │              ▼               │
│                     │ • ISIL Registry │     ┌─────────────────┐      │
│                     │ • GeoNames      │     │  NIL Detection  │      │
│                     │ • Local KG      │     │  (No KB Entry)  │      │
│                     └─────────────────┘     └────────┬────────┘      │
│                                                      │               │
│                                                      ▼               │
│                                       Linked Entity (or NIL)         │
└──────────────────────────────────────────────────────────────────────┘
```

## Knowledge Bases

### Primary Knowledge Bases

| KB | Property | Use Case | Lookup Method |
|----|----------|----------|---------------|
| **Wikidata** | Q-entities | Primary reference KB | SPARQL + API |
| **VIAF** | Authority IDs | Organization authorities | SRU API |
| **ISIL** | Library/archive codes | Unique institution IDs | Direct lookup |
| **GeoNames** | Place IDs | Location disambiguation | API + DB |
| **Local KG** | GHCID | Internal entity resolution | TypeDB query |

### Identifier Cross-Reference Table

```python
IDENTIFIER_PROPERTIES = {
    "wikidata": {
        "isil": "P791",   # ISIL identifier
        "viaf": "P214",   # VIAF ID
        "isni": "P213",   # ISNI
        "ror": "P6782",   # ROR ID
        "gnd": "P227",    # GND ID (German)
        "loc": "P244",    # Library of Congress
        "bnf": "P268",    # BnF (French)
        "nta": "P1006",   # Netherlands Thesaurus for Author names
    }
}
```

## DSPy Entity Linker Module

### EntityLinker Signature

```python
import dspy
from typing import List, Optional
from pydantic import BaseModel, Field


class LinkedEntity(BaseModel):
    """A linked entity with KB reference."""
    mention_text: str = Field(description="Original mention text")
    canonical_name: str = Field(description="Canonical name from KB")
    kb_id: str = Field(description="Knowledge base identifier")
    kb_source: str = Field(description="KB source: wikidata, viaf, isil, geonames, local")
    confidence: float = Field(ge=0.0, le=1.0)

    # Additional identifiers discovered
    wikidata_id: Optional[str] = None
    viaf_id: Optional[str] = None
    isil_code: Optional[str] = None
    ghcid: Optional[str] = None

    # Disambiguation features
    type_match: bool = Field(default=False, description="KB type matches expected type")
    location_match: bool = Field(default=False, description="Location context matches")


class EntityLinkerOutput(BaseModel):
    linked_entities: List[LinkedEntity]
    nil_entities: List[str] = Field(description="Mentions with no KB match (NIL)")


class EntityLinker(dspy.Signature):
    """Link extracted heritage institution mentions to knowledge bases.

    Linking strategy:
    1. Generate candidates from multiple KBs (Wikidata, VIAF, ISIL, local KG)
    2. Score candidates using name similarity, type matching, location context
    3. Apply disambiguation for ambiguous cases
    4.
       Detect NIL entities (no KB entry exists)

    Priority:
    - ISIL code match → highest confidence (unique identifier)
    - Wikidata exact match → high confidence
    - VIAF authority match → high confidence
    - Local KG GHCID match → medium confidence
    - Fuzzy name match → lower confidence, requires verification
    """
    entities: List[str] = dspy.InputField(desc="Extracted entity mentions to link")
    entity_types: List[str] = dspy.InputField(desc="Expected types (GLAMORCUBESFIXPHDNT)")
    context: str = dspy.InputField(desc="Surrounding text for disambiguation")
    country_hint: Optional[str] = dspy.InputField(default=None, desc="Country context")
    linked: EntityLinkerOutput = dspy.OutputField(desc="Linked entities")
```

## Candidate Generation

### Multi-Source Candidate Generator

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Candidate:
    """KB candidate for a mention (fields inferred from usage in this document)."""
    kb_id: str
    kb_source: str
    name: str
    score: float = 0.0
    description: str = ""
    isil: Optional[str] = None
    viaf: Optional[str] = None


class CandidateGenerator:
    """Generate entity candidates from multiple knowledge bases."""

    def __init__(self):
        self.wikidata_client = WikidataClient()
        self.viaf_client = VIAFClient()
        self.isil_registry = ISILRegistry()
        self.geonames_client = GeoNamesClient()
        self.local_kg = TypeDBClient()

    def generate_candidates(
        self,
        mention: str,
        entity_type: str,
        country_hint: str = None,
        max_candidates: int = 10,
    ) -> List[Candidate]:
        """Generate candidates from all sources."""
        candidates = []

        # 1. ISIL Registry (exact match for known codes)
        if self._looks_like_isil(mention):
            isil_candidate = self.isil_registry.lookup(mention)
            if isil_candidate:
                candidates.append(Candidate(
                    kb_id=mention,
                    kb_source="isil",
                    name=isil_candidate["name"],
                    score=1.0,  # Exact match
                ))

        # 2. Wikidata (label search + type filter)
        wd_candidates = self.wikidata_client.search_entities(
            query=mention,
            instance_of=self._type_to_wikidata_class(entity_type),
            country=country_hint,
            limit=max_candidates,
        )
        candidates.extend(wd_candidates)

        # 3.
        # VIAF (organization search)
        if entity_type in ["A", "L", "M", "O", "R"]:  # Formal organizations
            viaf_candidates = self.viaf_client.search_organizations(
                query=mention,
                limit=max_candidates // 2,
            )
            candidates.extend(viaf_candidates)

        # 4. Local KG (GHCID lookup)
        local_candidates = self.local_kg.search_custodians(
            name_query=mention,
            custodian_type=entity_type,
            country=country_hint,
            limit=max_candidates // 2,
        )
        candidates.extend(local_candidates)

        return self._deduplicate(candidates)

    def _type_to_wikidata_class(self, glamor_type: str) -> str:
        """Map GLAMORCUBESFIXPHDNT type to Wikidata class."""
        TYPE_MAP = {
            "G": "Q1007870",  # art gallery
            "L": "Q7075",     # library
            "A": "Q166118",   # archive
            "M": "Q33506",    # museum
            "O": "Q2659904",  # government agency
            "R": "Q31855",    # research institute
            "B": "Q167346",   # botanical garden
            "E": "Q3918",     # university
            "S": "Q988108",   # historical society
            "H": "Q16970",    # church (with collections)
            "D": "Q35127",    # website / digital platform
        }
        return TYPE_MAP.get(glamor_type, "Q43229")  # Default: organization

    def _looks_like_isil(self, text: str) -> bool:
        import re
        return bool(re.match(r"^[A-Z]{2}-[A-Za-z0-9]+$", text))
```

### Wikidata Candidate Search

```python
class WikidataClient:
    """Wikidata entity search and lookup."""

    ENDPOINT = "https://query.wikidata.org/sparql"

    def search_entities(
        self,
        query: str,
        instance_of: str = None,
        country: str = None,
        limit: int = 10,
    ) -> List[Candidate]:
        """Search Wikidata entities by label."""
        # Build SPARQL query with filters
        filters = []
        if instance_of:
            filters.append(f"?item wdt:P31/wdt:P279* wd:{instance_of} .")
        if country:
            country_qid = self._country_to_qid(country)
            if country_qid:
                filters.append(f"?item wdt:P17 wd:{country_qid} .")
        filter_clause = "\n".join(filters)

        sparql = f"""
        SELECT ?item ?itemLabel ?itemDescription ?isil ?viaf WHERE {{
          SERVICE wikibase:mwapi {{
            bd:serviceParam wikibase:api "EntitySearch" .
            bd:serviceParam wikibase:endpoint "www.wikidata.org" .
            bd:serviceParam mwapi:search "{query}" .
            bd:serviceParam mwapi:language "en,nl,de,fr" .
            ?item wikibase:apiOutputItem mwapi:item .
          }}
          {filter_clause}
          OPTIONAL {{ ?item wdt:P791 ?isil }}
          OPTIONAL {{ ?item wdt:P214 ?viaf }}
          SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en,nl,de,fr" }}
        }}
        LIMIT {limit}
        """

        results = self._execute_sparql(sparql)

        return [
            Candidate(
                kb_id=r["item"]["value"].split("/")[-1],
                kb_source="wikidata",
                name=r.get("itemLabel", {}).get("value", ""),
                description=r.get("itemDescription", {}).get("value", ""),
                isil=r.get("isil", {}).get("value"),
                viaf=r.get("viaf", {}).get("value"),
                score=0.0,  # Score computed later
            )
            for r in results
        ]

    def get_entity_details(self, qid: str) -> dict:
        """Get full entity details from Wikidata."""
        sparql = f"""
        SELECT ?prop ?propLabel ?value ?valueLabel WHERE {{
          wd:{qid} ?prop ?value .
          SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en,nl" }}
        }}
        """
        return self._execute_sparql(sparql)
```

### VIAF Authority Search

```python
import requests


class VIAFClient:
    """VIAF (Virtual International Authority File) client."""

    SRU_ENDPOINT = "https://viaf.org/viaf/search"

    def search_organizations(
        self,
        query: str,
        limit: int = 10,
    ) -> List[Candidate]:
        """Search VIAF for corporate bodies."""
        # SRU CQL query
        cql_query = f'local.corporateNames all "{query}"'

        params = {
            "query": cql_query,
            "maximumRecords": limit,
            "httpAccept": "application/json",
            "recordSchema": "BriefVIAF",
        }
        response = requests.get(self.SRU_ENDPOINT, params=params)
        data = response.json()

        candidates = []
        for record in data.get("records", []):
            viaf_id = record.get("viafID")
            main_heading = record.get("mainHeadingEl", {}).get("datafield", {})
            name = self._extract_name(main_heading)

            candidates.append(Candidate(
                kb_id=viaf_id,
                kb_source="viaf",
                name=name,
                score=0.0,
            ))
        return candidates

    def get_authority_cluster(self, viaf_id: str) -> dict:
        """Get all authority records linked to a VIAF cluster."""
        url = f"https://viaf.org/viaf/{viaf_id}/viaf.json"
        response = requests.get(url)
        if response.status_code == 200:
            return response.json()
        return {}
```

### ISIL Registry Lookup

```python
class ISILRegistry:
    """ISIL (International Standard Identifier for Libraries) registry."""

    def __init__(self, db_path: str = "data/reference/isil_registry.db"):
        self.db_path = db_path

    def lookup(self, isil_code: str) -> Optional[dict]:
        """Look up institution by ISIL code."""
        import sqlite3
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            SELECT name, city, country, institution_type, notes
            FROM isil_registry
            WHERE isil_code = ?
        """, (isil_code,))
        row = cursor.fetchone()
        conn.close()

        if row:
            return {
                "isil_code": isil_code,
                "name": row[0],
                "city": row[1],
                "country": row[2],
                "institution_type": row[3],
                "notes": row[4],
            }
        return None

    def search_by_name(
        self,
        name: str,
        country: str = None,
        limit: int = 10,
    ) -> List[dict]:
        """Search ISIL registry by institution name."""
        import sqlite3
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        query = """
            SELECT isil_code, name, city, country, institution_type
            FROM isil_registry
            WHERE name LIKE ?
        """
        params = [f"%{name}%"]
        if country:
            query += " AND country = ?"
            params.append(country)

        query += f" LIMIT {limit}"
        cursor.execute(query, params)
        rows = cursor.fetchall()
        conn.close()

        return [
            {
                "isil_code": row[0],
                "name": row[1],
                "city": row[2],
                "country": row[3],
                "institution_type": row[4],
            }
            for row in rows
        ]
```

## Candidate Ranking

### Feature-Based Ranking

```python
class CandidateRanker:
    """Rank entity candidates using multiple features."""

    def __init__(self):
        self.name_matcher = NameMatcher()
        self.type_checker = TypeChecker()
        self.location_matcher = LocationMatcher()
        # Sentence embedder used by _context_similarity below;
        # the model name is illustrative, not prescribed by this document.
        from sentence_transformers import SentenceTransformer
        self.embedder = SentenceTransformer("all-MiniLM-L6-v2")

    def rank_candidates(
        self,
        mention: str,
        candidates: List[Candidate],
        context: str,
        expected_type: str,
        location_context: str = None,
    ) -> List[Candidate]:
        """Rank candidates by combined feature score."""
        for candidate in candidates:
            # Feature 1: Name similarity
            name_score = self.name_matcher.similarity(mention, candidate.name)

            # Feature 2: Type match
            type_score = self.type_checker.type_match_score(
                candidate.kb_source,
                candidate.kb_id,
                expected_type,
            )

            # Feature 3: Location context
            location_score = 0.0
            if location_context:
                location_score = self.location_matcher.location_match_score(
                    candidate,
                    location_context,
                )

            # Feature 4: Context similarity
            context_score = self._context_similarity(candidate, context)

            # Feature 5: Source priority
            source_score = self._source_priority(candidate.kb_source)

            # Combine scores (weighted)
            candidate.score = (
                0.35 * name_score
                + 0.25 * type_score
                + 0.15 * location_score
                + 0.15 * context_score
                + 0.10 * source_score
            )

        # Sort by score descending
        candidates.sort(key=lambda c: c.score, reverse=True)
        return candidates

    def _source_priority(self, source: str) -> float:
        """Priority score for KB source (ISIL > Wikidata > VIAF > local)."""
        PRIORITIES = {
            "isil": 1.0,      # Unique identifier
            "wikidata": 0.9,  # Rich entity data
            "viaf": 0.8,      # Authority file
            "local": 0.7,     # Local KG
            "geonames": 0.6,  # Place data
        }
        return PRIORITIES.get(source, 0.5)

    def _context_similarity(self, candidate: Candidate, context: str) -> float:
        """Semantic similarity
        between candidate description and context."""
        if not candidate.description:
            return 0.5

        # Use sentence embeddings
        from sentence_transformers import util
        context_emb = self.embedder.encode(context)
        desc_emb = self.embedder.encode(candidate.description)
        return float(util.cos_sim(context_emb, desc_emb)[0][0])
```

### Name Matching

```python
class NameMatcher:
    """Fuzzy name matching for entity linking."""

    def __init__(self):
        self.normalizer = NameNormalizer()

    def similarity(self, mention: str, candidate_name: str) -> float:
        """Compute name similarity score."""
        # Normalize both names
        norm_mention = self.normalizer.normalize(mention)
        norm_candidate = self.normalizer.normalize(candidate_name)

        # Exact match
        if norm_mention == norm_candidate:
            return 1.0

        # Token overlap (Jaccard)
        mention_tokens = set(norm_mention.split())
        candidate_tokens = set(norm_candidate.split())
        jaccard = len(mention_tokens & candidate_tokens) / len(mention_tokens | candidate_tokens)

        # Levenshtein ratio
        from rapidfuzz import fuzz
        levenshtein = fuzz.ratio(norm_mention, norm_candidate) / 100.0

        # Token sort ratio (order-independent)
        token_sort = fuzz.token_sort_ratio(norm_mention, norm_candidate) / 100.0

        # Combine scores
        return 0.4 * jaccard + 0.3 * levenshtein + 0.3 * token_sort


class NameNormalizer:
    """Normalize institution names for matching."""

    # Skip words by language (legal forms, articles)
    SKIP_WORDS = {
        "nl": ["stichting", "de", "het", "van", "voor", "en", "te"],
        "en": ["the", "of", "and", "for", "foundation", "trust", "inc"],
        "de": ["der", "die", "das", "und", "für", "stiftung", "e.v."],
        "fr": ["le", "la", "les", "de", "du", "et", "fondation"],
    }

    def normalize(self, name: str, language: str = "nl") -> str:
        """Normalize institution name."""
        import unicodedata
        import re

        # Lowercase
        name = name.lower()

        # Remove diacritics
        name = unicodedata.normalize("NFD", name)
        name = "".join(c for c in name if unicodedata.category(c) != "Mn")

        # Remove punctuation
        name = re.sub(r"[^\w\s]", " ", name)

        # Remove skip
        # words
        skip = set(self.SKIP_WORDS.get(language, []))
        tokens = [t for t in name.split() if t not in skip]

        # Collapse whitespace
        return " ".join(tokens)
```

### Type Checking

```python
class TypeChecker:
    """Check if candidate type matches expected type."""

    # Wikidata class mappings for GLAMORCUBESFIXPHDNT
    WIKIDATA_TYPE_MAP = {
        "G": ["Q1007870", "Q207694"],   # art gallery, museum of art
        "L": ["Q7075", "Q856234"],      # library, national library
        "A": ["Q166118", "Q2860091"],   # archive, national archive
        "M": ["Q33506", "Q17431399"],   # museum, museum building
        "O": ["Q2659904", "Q327333"],   # government agency, public body
        "R": ["Q31855", "Q7315155"],    # research institute, research center
        "B": ["Q167346", "Q43501"],     # botanical garden, zoo
        "E": ["Q3918", "Q875538"],      # university, public university
        "S": ["Q988108", "Q15911314"],  # historical society, heritage organization
        "H": ["Q16970", "Q839954"],     # church, religious institute
        "D": ["Q35127", "Q856584"],     # website, digital library
    }

    def type_match_score(
        self,
        kb_source: str,
        kb_id: str,
        expected_type: str,
    ) -> float:
        """Score type compatibility."""
        if kb_source == "wikidata":
            return self._wikidata_type_match(kb_id, expected_type)
        elif kb_source == "isil":
            return 0.9  # ISIL implies library/archive type
        elif kb_source == "viaf":
            return 0.8  # VIAF implies organization
        return 0.5  # Unknown

    def _wikidata_type_match(self, qid: str, expected_type: str) -> float:
        """Check if Wikidata entity type matches expected."""
        expected_classes = self.WIKIDATA_TYPE_MAP.get(expected_type, [])
        if not expected_classes:
            return 0.5

        # Query Wikidata for instance_of
        sparql = f"""
        SELECT ?class WHERE {{
          wd:{qid} wdt:P31/wdt:P279* ?class .
          VALUES ?class {{ {' '.join(f'wd:{c}' for c in expected_classes)} }}
        }}
        LIMIT 1
        """
        results = wikidata_execute_sparql(sparql)
        if results:
            return 1.0  # Direct type match

        # Check for broader match
        sparql_broad = f"""
        SELECT ?class WHERE {{
          wd:{qid} wdt:P31 ?class .
        }}
        LIMIT 5
        """
        results_broad = wikidata_execute_sparql(sparql_broad)
        if results_broad:
            return 0.6  # Has some type, but not exact match

        return 0.3  # No type information
```

## Disambiguation Strategies

### Context-Based Disambiguation

```python
class DisambiguationModule(dspy.Module):
    """Disambiguate between multiple candidate matches."""

    def __init__(self):
        super().__init__()
        self.disambiguator = dspy.ChainOfThought(DisambiguationSignature)

    def forward(
        self,
        mention: str,
        candidates: List[Candidate],
        context: str,
    ) -> Candidate:
        # Format candidates for LLM
        candidate_descriptions = "\n".join([
            f"- {c.kb_source}:{c.kb_id} - {c.name}: {c.description or 'No description'}"
            for c in candidates[:5]  # Top 5
        ])

        result = self.disambiguator(
            mention=mention,
            candidates=candidate_descriptions,
            context=context,
        )

        # Parse result and find matching candidate
        selected_id = result.selected_id
        for candidate in candidates:
            if f"{candidate.kb_source}:{candidate.kb_id}" == selected_id:
                return candidate

        # Return top candidate if parsing fails
        return candidates[0] if candidates else None


class DisambiguationSignature(dspy.Signature):
    """Select the correct entity from candidates.

    Given a mention, multiple candidate matches, and surrounding context,
    determine which candidate is the correct entity reference.

    Consider:
    - Name similarity (exact vs partial match)
    - Type compatibility (is it the right kind of institution?)
    - Location context (does location match?)
    - Contextual clues (other entities, topics mentioned)
    """
    mention: str = dspy.InputField(desc="Entity mention text")
    candidates: str = dspy.InputField(desc="Formatted candidate list")
    context: str = dspy.InputField(desc="Surrounding text context")
    selected_id: str = dspy.OutputField(desc="Selected candidate ID (format: source:id)")
    reasoning: str = dspy.OutputField(desc="Explanation for selection")
```

### Geographic Disambiguation

```python
class LocationMatcher:
    """Disambiguate entities using location context."""

    def __init__(self):
        self.geonames = GeoNamesClient()

    def location_match_score(
        self,
        candidate: Candidate,
        location_context: str,
    ) -> float:
        """Score location compatibility."""
        # Extract location from context
        context_locations = self._extract_locations(location_context)
        if not context_locations:
            return 0.5  # No location to match

        # Get candidate location
        candidate_location = self._get_candidate_location(candidate)
        if not candidate_location:
            return 0.5  # No candidate location

        # Compare locations
        for context_loc in context_locations:
            # Same city
            if self._same_city(context_loc, candidate_location):
                return 1.0
            # Same region
            if self._same_region(context_loc, candidate_location):
                return 0.8
            # Same country
            if self._same_country(context_loc, candidate_location):
                return 0.6

        return 0.2  # No location match

    def _get_candidate_location(self, candidate: Candidate) -> Optional[dict]:
        """Get location for candidate from KB."""
        if candidate.kb_source == "wikidata":
            sparql = f"""
            SELECT ?city ?country ?coords WHERE {{
              OPTIONAL {{ wd:{candidate.kb_id} wdt:P131 ?city }}
              OPTIONAL {{ wd:{candidate.kb_id} wdt:P17 ?country }}
              OPTIONAL {{ wd:{candidate.kb_id} wdt:P625 ?coords }}
            }}
            LIMIT 1
            """
            results = wikidata_execute_sparql(sparql)
            if results:
                return {
                    "city": results[0].get("city", {}).get("value"),
                    "country": results[0].get("country", {}).get("value"),
                    "coords": results[0].get("coords", {}).get("value"),
                }
        elif candidate.kb_source == "isil":
            # ISIL country from code prefix
            country_code = candidate.kb_id.split("-")[0]
            return {"country_code": country_code}
        return None
```

## NIL Detection

### NIL Entity Classifier

```python
from datetime import datetime
from typing import Optional, Tuple


class NILDetector:
    """Detect entities with no knowledge base entry (NIL)."""

    def __init__(self, nil_threshold: float = 0.4):
        self.nil_threshold = nil_threshold

    def is_nil(
        self,
        mention: str,
        top_candidate: Optional[Candidate],
        context: str,
    ) -> Tuple[bool, str]:
        """Determine if mention refers to a NIL entity.

        Returns:
            (is_nil, reason)
        """
        # No candidates found
        if top_candidate is None:
            return True, "no_candidates_found"

        # Top candidate score below threshold
        if top_candidate.score < self.nil_threshold:
            return True, f"low_confidence_score_{top_candidate.score:.2f}"

        # Name too dissimilar
        name_sim = NameMatcher().similarity(mention, top_candidate.name)
        if name_sim < 0.5:
            return True, f"name_mismatch_{name_sim:.2f}"

        # Type mismatch (if type info available)
        # ...

        return False, "valid_match"

    def create_nil_entity(
        self,
        mention: str,
        entity_type: str,
        context: str,
        provenance: dict,
    ) -> dict:
        """Create a NIL entity record for later KB population."""
        return {
            "mention_text": mention,
            "entity_type": entity_type,
            "context_snippet": context[:500],
            "nil_reason": "no_kb_match",
            "provenance": provenance,
            "created_date": datetime.now().isoformat(),
            "status": "pending_verification",
        }
```

## Full Entity Linking Pipeline

```python
class EntityLinkingPipeline(dspy.Module):
    """Complete entity linking pipeline."""

    def __init__(self):
        super().__init__()
        self.candidate_generator = CandidateGenerator()
        self.candidate_ranker = CandidateRanker()
        self.disambiguator = DisambiguationModule()
        self.nil_detector = NILDetector()

    def forward(
        self,
        entities: List[dict],  # [{mention, type, context}]
        country_hint: str = None,
    ) -> EntityLinkerOutput:
        linked_entities = []
        nil_entities = []

        for entity in entities:
            mention = entity["mention"]
            entity_type = entity["type"]
            context = entity["context"]

            # 1.
            # Generate candidates
            candidates = self.candidate_generator.generate_candidates(
                mention=mention,
                entity_type=entity_type,
                country_hint=country_hint,
            )

            if not candidates:
                nil_entities.append(mention)
                continue

            # 2. Rank candidates
            ranked = self.candidate_ranker.rank_candidates(
                mention=mention,
                candidates=candidates,
                context=context,
                expected_type=entity_type,
                location_context=country_hint,
            )

            # 3. Disambiguate if needed
            if len(ranked) > 1 and ranked[0].score - ranked[1].score < 0.1:
                # Close scores - need disambiguation
                selected = self.disambiguator(
                    mention=mention,
                    candidates=ranked[:5],
                    context=context,
                )
            else:
                selected = ranked[0]

            # 4. NIL detection
            is_nil, nil_reason = self.nil_detector.is_nil(
                mention=mention,
                top_candidate=selected,
                context=context,
            )
            if is_nil:
                nil_entities.append(mention)
                continue

            # 5. Create linked entity
            linked_entities.append(LinkedEntity(
                mention_text=mention,
                canonical_name=selected.name,
                kb_id=selected.kb_id,
                kb_source=selected.kb_source,
                confidence=selected.score,
                wikidata_id=selected.kb_id if selected.kb_source == "wikidata" else None,
                viaf_id=selected.viaf,
                isil_code=selected.isil,
                type_match=selected.score > 0.7,
            ))

        return EntityLinkerOutput(
            linked_entities=linked_entities,
            nil_entities=nil_entities,
        )
```

## Confidence Thresholds

| Scenario | Threshold | Action |
|----------|-----------|--------|
| **Exact ISIL match** | 1.0 | Auto-link |
| **Wikidata exact name + type** | ≥0.9 | Auto-link |
| **Fuzzy match, high context** | ≥0.7 | Auto-link |
| **Fuzzy match, low context** | 0.5-0.7 | Flag for review |
| **Low score** | <0.5 | Mark as NIL |
| **No candidates** | 0.0 | Create NIL record |

## See Also

- [04-entity-extraction.md](./04-entity-extraction.md) - NER patterns and extraction
- [07-sparql-templates.md](./07-sparql-templates.md) - Wikidata SPARQL queries
- [06-retrieval-patterns.md](./06-retrieval-patterns.md) - KG retrieval strategies
- [AGENTS.md](../../AGENTS.md) - Rule 1 (Ontology consultation), Rule 10 (CH-Annotator)
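## Appendix: Standalone Name-Matching Sketch

The normalize-then-compare step behind `NameMatcher` and `NameNormalizer` can be exercised in isolation with only the standard library. This is a minimal sketch, not the pipeline implementation: it keeps only the Jaccard term (the rapidfuzz Levenshtein and token-sort terms are omitted) and hardcodes the Dutch skip-word list from `SKIP_WORDS`; the function names are illustrative.

```python
import re
import unicodedata

# Dutch skip words (legal forms, articles) from the SKIP_WORDS table above.
SKIP_WORDS_NL = {"stichting", "de", "het", "van", "voor", "en", "te"}


def normalize(name: str) -> str:
    """Lowercase, strip diacritics and punctuation, drop skip words."""
    name = name.lower()
    name = unicodedata.normalize("NFD", name)
    name = "".join(c for c in name if unicodedata.category(c) != "Mn")
    name = re.sub(r"[^\w\s]", " ", name)
    tokens = [t for t in name.split() if t not in SKIP_WORDS_NL]
    return " ".join(tokens)


def jaccard(mention: str, candidate: str) -> float:
    """Token-overlap score between two normalized institution names."""
    a = set(normalize(mention).split())
    b = set(normalize(candidate).split())
    if not (a | b):
        return 0.0
    return len(a & b) / len(a | b)


if __name__ == "__main__":
    # Legal form and articles drop out, so these match exactly:
    print(normalize("Stichting Het Rijksmuseum"))          # → rijksmuseum
    print(jaccard("Rijksmuseum", "Het Rijksmuseum te Amsterdam"))  # → 0.5
```

Dropping the legal form ("Stichting") and articles before comparing is what lets a mention like "Rijksmuseum" score well against the registered name "Stichting Het Rijksmuseum", which a raw edit distance would penalize heavily.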