
Entity Extraction for Heritage Custodians

Overview

This document defines Named Entity Recognition (NER) patterns for extracting heritage institution entities from text, following the CH-Annotator v1.7.0 convention and aligning with the Heritage Custodian Ontology LinkML schema.

Hypernym Entity Types

CH-Annotator defines 9 hypernym categories relevant to heritage extraction:

| Code | Hypernym | Primary Ontology | Description |
|------|----------|------------------|-------------|
| AGT | AGENT | crm:E39_Actor | Persons, staff, curators |
| GRP | GROUP | crm:E74_Group | Organizations, institutions |
| TOP | TOPONYM | crm:E53_Place | Place names (nominal) |
| GEO | GEOMETRY | geo:Geometry | Coordinates, shapes |
| TMP | TEMPORAL | time:TemporalEntity | Dates, periods |
| APP | APPELLATION | crm:E41_Appellation | Titles, collection names |
| ROL | ROLE | org:Role | Positions, occupations |
| WRK | WORK | frbr:Work | Documents, artworks |
| QTY | QUANTITY | crm:E54_Dimension | Counts, measurements |
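Because every full type code begins with its hypernym, the table above can double as a lookup for the primary ontology class. A minimal sketch (the helper name is illustrative):

```python
from typing import Optional

# Hypernym code -> primary ontology class, copied from the table above.
HYPERNYM_ONTOLOGY = {
    "AGT": "crm:E39_Actor",
    "GRP": "crm:E74_Group",
    "TOP": "crm:E53_Place",
    "GEO": "geo:Geometry",
    "TMP": "time:TemporalEntity",
    "APP": "crm:E41_Appellation",
    "ROL": "org:Role",
    "WRK": "frbr:Work",
    "QTY": "crm:E54_Dimension",
}

def ontology_class(type_code: str) -> Optional[str]:
    """Resolve a CH-Annotator type code (e.g. 'GRP.HER.MUS') to its primary ontology class."""
    return HYPERNYM_ONTOLOGY.get(type_code.split(".")[0])
```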

Heritage Institution Subtypes (GRP.HER)

GRP.HER:  # Heritage Custodian
  subtypes:
    GRP.HER.GAL:  # G - Gallery
    GRP.HER.LIB:  # L - Library
    GRP.HER.ARC:  # A - Archive
    GRP.HER.MUS:  # M - Museum
    GRP.HER.OFF:  # O - Official institution
    GRP.HER.RES:  # R - Research center
    GRP.HER.COR:  # C - Commercial heritage
    GRP.HER.UNK:  # U - Unknown/unspecified
    GRP.HER.BIO:  # B - Botanical/zoo
    GRP.HER.EDU:  # E - Education provider
    GRP.HER.SOC:  # S - Heritage society
    GRP.HER.FEA:  # F - Feature custodian
    GRP.HER.INT:  # I - Intangible heritage
    GRP.HER.MIX:  # X - Mixed types
    GRP.HER.PER:  # P - Personal collection
    GRP.HER.HOL:  # H - Holy/sacred site
    GRP.HER.DIG:  # D - Digital platform
    GRP.HER.NGO:  # N - Non-profit organization
    GRP.HER.TAS:  # T - Taste/scent heritage

Pattern-Based Entity Extraction

Heritage Institution Patterns

import re
from typing import List, Tuple

# Institution name patterns by language
INSTITUTION_PATTERNS = {
    "dutch": {
        "museum": r"\b(?:(?:Nationaal|Koninklijk|Stedelijk|Rijks|Gemeentelijk|Historisch|Maritiem)\s+)?(?:Museum|Musea)\s+[\w\s-]+\b",
        "archive": r"\b(?:(?:Nationaal|Regionaal|Gemeentelijk|Stads|Rijks)\s+)?(?:Archief|Archieven)\s*[\w\s-]*\b",
        "library": r"\b(?:(?:Koninklijke|Nationale|Openbare|Universiteits)\s+)?(?:Bibliotheek|Bibliotheken)\s*[\w\s-]*\b",
        "society": r"\b(?:Historische\s+)?(?:Vereniging|Stichting|Genootschap|Kring)\s+[\w\s-]+\b",
    },
    "english": {
        "museum": r"\b(?:(?:National|Royal|State|City|County)\s+)?Museum(?:\s+of\s+[\w\s]+)?\b",
        "archive": r"\b(?:(?:National|State|County)\s+)?Archives?(?:\s+of\s+[\w\s]+)?\b",
        "library": r"\b(?:(?:National|State|Public|University)\s+)?Library(?:\s+of\s+[\w\s]+)?\b",
    },
    "german": {
        "museum": r"\b(?:(?:Staatliches|Deutsches|Historisches)\s+)?(?:Museum|Museen)\s+[\w\s-]+\b",
        "archive": r"\b(?:(?:Bundes|Landes|Stadt)\s+)?(?:Archiv|Archive)\s*[\w\s-]*\b",
        "library": r"\b(?:(?:Staats|Landes|Stadt|Universitäts)\s+)?(?:Bibliothek|Bücherei)\s*[\w\s-]*\b",
    },
}

def extract_institutions_by_pattern(text: str, language: str = "dutch") -> List[Tuple[str, str, int, int]]:
    """Extract heritage institutions using regex patterns.
    
    Returns: List of (entity_text, entity_type, start_offset, end_offset)
    """
    results = []
    patterns = INSTITUTION_PATTERNS.get(language, INSTITUTION_PATTERNS["english"])
    
    for inst_type, pattern in patterns.items():
        for match in re.finditer(pattern, text, re.IGNORECASE):
            results.append((
                match.group(),
                f"GRP.HER.{inst_type.upper()[:3]}",
                match.start(),
                match.end()
            ))
    
    return results

Identifier Patterns

IDENTIFIER_PATTERNS = {
    "isil": {
        "pattern": r"\b([A-Z]{2}-[A-Za-z0-9]{2,12})\b",
        "validation": lambda x: len(x) >= 5 and "-" in x,
        "scheme": "ISIL"
    },
    "wikidata": {
        "pattern": r"\b(Q\d{1,10})\b",
        "validation": lambda x: x.startswith("Q") and x[1:].isdigit(),
        "scheme": "Wikidata"
    },
    "viaf": {
        "pattern": r"viaf\.org/viaf/(\d+)",
        "validation": lambda x: x.isdigit() and len(x) >= 4,
        "scheme": "VIAF"
    },
    "kvk": {
        "pattern": r"\bKvK[:\s#]*(\d{8})\b|\b(\d{8})\s*(?:KvK|Chamber)",
        "validation": lambda x: len(x) == 8 and x.isdigit(),
        "scheme": "KvK"
    },
    "isni": {
        "pattern": r"\b((?:\d{4}[\s-]?){4})\b",
        "validation": lambda x: len(x.replace(" ", "").replace("-", "")) == 16,
        "scheme": "ISNI"
    },
    "ror": {
        "pattern": r"\b(0[a-z0-9]{8})\b",
        "validation": lambda x: len(x) == 9 and x.startswith("0"),
        "scheme": "ROR"
    },
}

def extract_identifiers(text: str) -> List[dict]:
    """Extract and validate external identifiers from text."""
    results = []
    
    for id_type, config in IDENTIFIER_PATTERNS.items():
        for match in re.finditer(config["pattern"], text, re.IGNORECASE):
            # Alternation patterns such as KvK leave one capture group empty,
            # so take the first group that actually matched (group(1) may be None).
            value = next((g for g in match.groups() if g), match.group())
            if config["validation"](value):
                results.append({
                    "scheme": config["scheme"],
                    "value": value,
                    "span": (match.start(), match.end()),
                    "valid": True
                })
    
    return results
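The ISNI validation above only checks length. The 16th ISNI character is actually an ISO 7064 MOD 11-2 check character (the same scheme ORCID uses, and it may be 'X'), so a stricter check is possible. A sketch, with illustrative helper names:

```python
def isni_check_char(first15: str) -> str:
    """Compute the ISO 7064 MOD 11-2 check character over the first 15 ISNI digits."""
    total = 0
    for d in first15:
        total = (total + int(d)) * 2
    result = (12 - total % 11) % 11
    return "X" if result == 10 else str(result)

def validate_isni(raw: str) -> bool:
    """Validate a 16-character ISNI, including its check character."""
    compact = raw.replace(" ", "").replace("-", "").upper()
    if len(compact) != 16 or not compact[:15].isdigit():
        return False
    return isni_check_char(compact[:15]) == compact[15]
```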

Temporal Patterns (TIMEX3-aligned)

TEMPORAL_PATTERNS = {
    # TMP.DAB - Datable (absolute dates)
    "full_date": r"\b(\d{1,2}[-/]\d{1,2}[-/]\d{2,4}|\d{4}[-/]\d{1,2}[-/]\d{1,2})\b",
    "year": r"\b(1[5-9]\d{2}|20[0-2]\d)\b",  # 1500-2029
    "month_year": r"\b((?:January|February|March|April|May|June|July|August|September|October|November|December|Jan|Feb|Mar|Apr|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\.?\s+\d{4})\b",
    
    # TMP.DRL - Deictic/Relative (context-dependent)
    "relative": r"\b(last\s+(?:year|month|week)|next\s+(?:year|month)|recently|currently|now|today)\b",
    
    # TMP.DUR - Durations
    "duration": r"\b(\d+\s+(?:years?|months?|weeks?|days?|centuries?|decades?))\b",
    
    # TMP.SET - Recurring/periodic
    "recurring": r"\b(every\s+(?:day|week|month|year)|daily|weekly|monthly|annually|(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)days?)\b",
    
    # TMP.RNG - Ranges
    "range": r"\b(\d{4}[-]\d{4}|\d{4}\s*(?:to|through|until)\s*\d{4})\b",
    
    # Century references
    "century": r"\b((?:\d{1,2}(?:st|nd|rd|th)|(?:first|second|third|fourth|fifth|sixth|seventh|eighth|ninth|tenth|eleventh|twelfth|thirteenth|fourteenth|fifteenth|sixteenth|seventeenth|eighteenth|nineteenth|twentieth|twenty-first))\s+century)\b",
}

def extract_temporal(text: str) -> List[dict]:
    """Extract temporal expressions following TIMEX3 typology."""
    results = []
    
    for temp_type, pattern in TEMPORAL_PATTERNS.items():
        for match in re.finditer(pattern, text, re.IGNORECASE):
            results.append({
                "value": match.group(1) if match.groups() else match.group(),
                "type": temp_type,
                "span": (match.start(), match.end())
            })
    
    return results
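Downstream normalization can turn a century match into an explicit year range (the `1800/1899` notation shown in the output format section). A sketch covering the numeric ordinals only:

```python
import re
from typing import Optional

_CENTURY = re.compile(r"(\d{1,2})(?:st|nd|rd|th)\s+century", re.IGNORECASE)

def normalize_century(expr: str) -> Optional[str]:
    """Normalize e.g. '19th century' to the year range '1800/1899'."""
    m = _CENTURY.search(expr)
    if not m:
        return None
    start = (int(m.group(1)) - 1) * 100  # the nth century spans (n-1)*100 .. n*100-1
    return f"{start:04d}/{start + 99:04d}"
```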

DSPy Entity Extraction Module

CustodianNER Signature

import dspy
from typing import List, Optional
from pydantic import BaseModel, Field

class EntityMention(BaseModel):
    """A single entity mention in text."""
    text: str = Field(description="The entity text as it appears")
    entity_type: str = Field(description="CH-Annotator type code (e.g., GRP.HER.MUS)")
    start_offset: int = Field(description="Character start offset")
    end_offset: int = Field(description="Character end offset")
    confidence: float = Field(ge=0.0, le=1.0)
    
    # Optional normalization
    normalized_name: Optional[str] = Field(default=None, description="Normalized form of entity")
    wikidata_candidate: Optional[str] = Field(default=None, description="Candidate Wikidata ID")

class CustodianNEROutput(BaseModel):
    """Output of heritage entity extraction."""
    entities: List[EntityMention]
    text_language: str = Field(description="Detected language (ISO 639-1)")

class CustodianNER(dspy.Signature):
    """Extract heritage institution entities from text.
    
    Entity types to extract (GRP.HER subtypes):
    - GRP.HER.MUS: Museums (art, history, science, natural history)
    - GRP.HER.ARC: Archives (national, regional, municipal, corporate)
    - GRP.HER.LIB: Libraries (national, public, academic, special)
    - GRP.HER.GAL: Galleries (art galleries, exhibition spaces)
    - GRP.HER.SOC: Heritage societies (historische vereniging, heemkundige kring)
    - GRP.HER.RES: Research centers, documentation centers
    - GRP.HER.EDU: Universities with heritage collections
    - GRP.HER.HOL: Religious sites with heritage collections
    - GRP.HER.DIG: Digital platforms, online archives
    
    Also extract:
    - AGT.STF: Staff members with titles/roles
    - TOP: Place names (cities, regions, countries)
    - TMP: Temporal expressions (founding dates, periods)
    - APP.COL: Collection names
    
    Follow CH-Annotator v1.7.0 convention for type codes.
    """
    
    text: str = dspy.InputField(desc="Text to extract entities from")
    language_hint: Optional[str] = dspy.InputField(desc="Language hint (nl, en, de, fr)", default=None)
    
    extracted: CustodianNEROutput = dspy.OutputField(desc="Extracted entities")

Hybrid Extraction Pipeline

class HybridEntityExtractor(dspy.Module):
    """Combines pattern-based and LLM-based entity extraction."""
    
    def __init__(self):
        super().__init__()
        self.llm_extractor = dspy.ChainOfThought(CustodianNER)
    
    def forward(self, text: str, language: str = "nl") -> CustodianNEROutput:
        # Map ISO 639-1 codes onto the INSTITUTION_PATTERNS keys; passing "nl"
        # directly would silently fall back to the English patterns.
        lang_key = {"nl": "dutch", "en": "english", "de": "german"}.get(language, language)
        
        # 1. Pattern-based extraction (high precision)
        pattern_entities = []
        
        # Extract institutions by pattern
        for entity_text, entity_type, start, end in extract_institutions_by_pattern(text, lang_key):
            pattern_entities.append(EntityMention(
                text=entity_text,
                entity_type=entity_type,
                start_offset=start,
                end_offset=end,
                confidence=0.9  # High confidence for pattern matches
            ))
        
        # Extract identifiers
        for ident in extract_identifiers(text):
            pattern_entities.append(EntityMention(
                text=f"{ident['scheme']}:{ident['value']}",
                entity_type="IDENTIFIER",
                start_offset=ident["span"][0],
                end_offset=ident["span"][1],
                confidence=0.95
            ))
        
        # 2. LLM-based extraction (high recall)
        llm_result = self.llm_extractor(text=text, language_hint=language)
        
        # 3. Merge results (deduplicate by span overlap)
        merged = self._merge_entities(pattern_entities, llm_result.extracted.entities)
        
        return CustodianNEROutput(
            entities=merged,
            text_language=language
        )
    
    def _merge_entities(self, pattern_entities: List[EntityMention], 
                        llm_entities: List[EntityMention]) -> List[EntityMention]:
        """Merge pattern and LLM entities, preferring pattern matches."""
        merged = list(pattern_entities)
        pattern_spans = {(e.start_offset, e.end_offset) for e in pattern_entities}
        
        for llm_entity in llm_entities:
            # Check for overlap with pattern entities
            overlaps = any(
                self._spans_overlap((llm_entity.start_offset, llm_entity.end_offset), span)
                for span in pattern_spans
            )
            if not overlaps:
                merged.append(llm_entity)
        
        return sorted(merged, key=lambda e: e.start_offset)
    
    @staticmethod
    def _spans_overlap(span1: tuple, span2: tuple) -> bool:
        return not (span1[1] <= span2[0] or span2[1] <= span1[0])
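The `_spans_overlap` predicate treats spans as half-open `[start, end)` intervals, so two entities that merely touch (one ending exactly where the next begins) are kept as distinct matches:

```python
def spans_overlap(a: tuple, b: tuple) -> bool:
    """True when half-open intervals [a0, a1) and [b0, b1) share at least one position."""
    return not (a[1] <= b[0] or b[1] <= a[0])
```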

Staff and Role Extraction

AGT.STF (Staff Members)

class StaffMember(BaseModel):
    """Extracted staff member."""
    name: str
    role: Optional[str] = None
    role_type: str = Field(description="ROL.OCC, ROL.POS, ROL.HON, etc.")
    institution: Optional[str] = None
    current: bool = True

STAFF_PATTERNS = {
    # Dutch patterns
    "dutch_role_name": r"(?P<role>(?:directeur|curator|archivaris|conservator|bibliothecaris|hoofd|medewerker)\s+(?:van\s+)?)?(?P<name>[A-Z][a-z]+(?:\s+(?:van\s+(?:de|den|der|het)\s+)?[A-Z][a-z]+)+)",
    "dutch_name_role": r"(?P<name>[A-Z][a-z]+(?:\s+(?:van\s+(?:de|den|der|het)\s+)?[A-Z][a-z]+)+),?\s+(?P<role>directeur|curator|archivaris|conservator|bibliothecaris)",
    
    # English patterns
    "english_role_name": r"(?P<role>(?:Director|Curator|Archivist|Librarian|Head|Chief)\s+(?:of\s+)?)?(?P<name>[A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)",
    
    # Title + name patterns
    "titled_name": r"(?P<title>(?:Prof\.?|Dr\.?|Mr\.?|Ms\.?|Drs\.?)\s+)?(?P<name>[A-Z][a-z]+(?:\s+(?:van\s+(?:de|den|der|het)\s+)?[A-Z][a-z]+)+)",
}

def extract_staff(text: str, institution_context: Optional[str] = None) -> List[StaffMember]:
    """Extract staff members with their roles."""
    results = []
    
    for pattern_name, pattern in STAFF_PATTERNS.items():
        for match in re.finditer(pattern, text):
            groups = match.groupdict()
            results.append(StaffMember(
                name=groups.get("name", "").strip(),
                role=groups.get("role", "").strip() if groups.get("role") else None,
                role_type="ROL.OCC" if groups.get("role") else "ROL.POS",
                institution=institution_context
            ))
    
    return results
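The staff patterns overlap (e.g. `titled_name` matches the same names as the role patterns), so the same person is typically extracted more than once. A small dedup pass that prefers mentions carrying a role, sketched here on plain `(name, role)` pairs rather than `StaffMember` objects:

```python
from typing import List, Optional, Tuple

def dedupe_staff(mentions: List[Tuple[str, Optional[str]]]) -> List[Tuple[str, Optional[str]]]:
    """Collapse duplicate name mentions, preferring the mention that has a role."""
    best = {}
    for name, role in mentions:
        key = name.lower()
        # Keep the first mention, unless a later one adds a role the kept one lacks.
        if key not in best or (role and not best[key][1]):
            best[key] = (name, role)
    return list(best.values())
```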

Collection Name Extraction (APP.COL)

COLLECTION_PATTERNS = {
    # Named collections
    "named_collection": r"(?:collectie|collection|verzameling|fonds|archief)\s+(?:van\s+)?([A-Z][a-z]+(?:\s+[A-Za-z]+)*)",
    
    # Archival fonds
    "archival_fonds": r"(?:Fonds|Archief)\s+([A-Z][a-z]+(?:\s+(?:van\s+(?:de|den|der|het)\s+)?[A-Za-z]+)*)",
    
    # Subject collections
    "subject_collection": r"([A-Z][a-z]+(?:\s+[A-Za-z]+)*)\s+(?:collectie|collection|verzameling)",
}

def extract_collections(text: str) -> List[dict]:
    """Extract collection names from text."""
    results = []
    
    for pattern_name, pattern in COLLECTION_PATTERNS.items():
        for match in re.finditer(pattern, text, re.IGNORECASE):
            results.append({
                "name": match.group(1).strip(),
                "type": pattern_name,
                "span": (match.start(), match.end())
            })
    
    return results

Change Event Detection

CHANGE_EVENT_PATTERNS = {
    "FOUNDING": [
        r"(?:opgericht|gesticht|founded|established)\s+(?:in\s+)?(\d{4})",
        r"(?:since|sinds)\s+(\d{4})",
        r"(?:founded|opgericht)\s+(?:by\s+|door\s+)?[\w\s]+\s+in\s+(\d{4})",
    ],
    "MERGER": [
        r"(?:fusie|merger|merged)\s+(?:met|with)\s+([\w\s]+)",
        r"(?:samengevoegd|combined)\s+(?:met|with)\s+([\w\s]+)",
        r"(?:arose|ontstaan)\s+(?:from|uit)\s+(?:the\s+)?(?:merger|fusie)\s+(?:of|van)\s+([\w\s]+)",
    ],
    "CLOSURE": [
        r"(?:gesloten|closed|dissolved)\s+(?:in\s+)?(\d{4})",
        r"(?:ceased\s+operations|opgeheven)\s+(?:in\s+)?(\d{4})",
    ],
    "RELOCATION": [
        r"(?:verhuisd|moved|relocated)\s+(?:naar|to)\s+([\w\s]+)",
        r"(?:new\s+location|nieuwe\s+locatie)\s+(?:in|at)\s+([\w\s]+)",
    ],
    "NAME_CHANGE": [
        r"(?:formerly|voorheen)\s+(?:known\s+as\s+)?([\w\s]+)",
        r"(?:renamed|hernoemd)\s+(?:to|naar)\s+([\w\s]+)",
    ],
}

def extract_change_events(text: str) -> List[dict]:
    """Extract organizational change events."""
    results = []
    
    for event_type, patterns in CHANGE_EVENT_PATTERNS.items():
        for pattern in patterns:
            for match in re.finditer(pattern, text, re.IGNORECASE):
                results.append({
                    "event_type": event_type,
                    "extracted_value": match.group(1).strip() if match.groups() else match.group(),
                    "span": (match.start(), match.end()),
                    "full_match": match.group()
                })
    
    return results
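The captured values are raw strings, so a plausibility gate helps before treating a FOUNDING capture as a year. A sketch (the bounds are illustrative assumptions, not part of the convention):

```python
def plausible_founding_year(value: str, earliest: int = 1200, latest: int = 2029) -> bool:
    """Sanity-check a year string captured by the FOUNDING patterns."""
    return value.isdigit() and earliest <= int(value) <= latest
```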

Integration with DSPy Pipeline

Full Extraction Pipeline

class HeritageNERPipeline(dspy.Module):
    """Complete NER pipeline for heritage institution extraction."""
    
    def __init__(self):
        super().__init__()
        self.entity_extractor = HybridEntityExtractor()
        # CustodianTypeClassifier: classification signature assumed defined elsewhere in this series
        self.type_classifier = dspy.ChainOfThought(CustodianTypeClassifier)
    
    def forward(self, text: str, source_metadata: Optional[dict] = None) -> dict:
        # 1. Detect language
        language = self._detect_language(text)
        
        # 2. Extract all entities
        ner_result = self.entity_extractor(text=text, language=language)
        
        # 3. Extract identifiers (high precision)
        identifiers = extract_identifiers(text)
        
        # 4. Extract temporal expressions
        temporals = extract_temporal(text)
        
        # 5. Extract collections
        collections = extract_collections(text)
        
        # 6. Extract change events
        events = extract_change_events(text)
        
        # 7. Classify heritage institutions
        heritage_entities = [
            e for e in ner_result.entities 
            if e.entity_type.startswith("GRP.HER")
        ]
        
        classified = []
        for entity in heritage_entities:
            context = self._get_entity_context(text, entity)
            classification = self.type_classifier(text=context)
            classified.append({
                "entity": entity,
                "classification": classification.classification
            })
        
        return {
            "entities": ner_result.entities,
            "identifiers": identifiers,
            "temporals": temporals,
            "collections": collections,
            "events": events,
            "classified_institutions": classified,
            "language": language,
            "source_metadata": source_metadata
        }
    
    def _detect_language(self, text: str) -> str:
        """Simple language detection."""
        dutch_indicators = ["de", "het", "van", "en", "voor", "museum", "archief"]
        german_indicators = ["der", "die", "das", "und", "für", "archiv"]
        
        text_lower = text.lower()
        dutch_count = sum(1 for w in dutch_indicators if f" {w} " in text_lower)
        german_count = sum(1 for w in german_indicators if f" {w} " in text_lower)
        
        if dutch_count > german_count:
            return "nl"
        elif german_count > dutch_count:
            return "de"
        return "en"
    
    def _get_entity_context(self, text: str, entity: EntityMention, window: int = 200) -> str:
        """Get surrounding context for an entity."""
        start = max(0, entity.start_offset - window)
        end = min(len(text), entity.end_offset + window)
        return text[start:end]

Output Format

LinkML-Compliant Entity Output

# Example extraction output conforming to Heritage Custodian Ontology

entities:
  - id: "extraction_001"
    text: "Rijksmuseum Amsterdam"
    entity_type: "GRP.HER.MUS"
    start_offset: 45
    end_offset: 66
    confidence: 0.95
    normalized:
      custodian_name: "Rijksmuseum"
      city: "Amsterdam"
      country_code: "NL"
    linking_candidates:
      - wikidata_id: "Q190804"
        confidence: 0.98
        
  - id: "extraction_002"
    text: "ISIL code NL-AmRM"
    entity_type: "IDENTIFIER"
    start_offset: 120
    end_offset: 137
    confidence: 0.99
    normalized:
      scheme: "ISIL"
      value: "NL-AmRM"
      valid: true

temporals:
  - text: "founded in 1808"
    type: "FOUNDING"
    normalized: "1808-01-01"
    precision: "year"
    
  - text: "19th century"
    type: "century"
    normalized: "1800/1899"
    precision: "century"

collections:
  - name: "Nachtwacht"
    type: "named_collection"
    custodian: "Rijksmuseum"

provenance:
  extraction_date: "2025-12-12T10:00:00Z"
  extraction_method: "ch_annotator-v1_7_0"
  extraction_agent: "HybridEntityExtractor"
  source_file: "conversations/dutch_glam_01.json"

See Also