glam/docs/dspy_rag/02-dspy-signatures.md
2025-12-12 12:51:10 +01:00

20 KiB

DSPy Signatures for Heritage Custodian Extraction

Overview

This document defines the DSPy signatures (module interfaces) for the Heritage Custodian RAG pipeline. Each signature maps to LinkML classes and follows the CH-Annotator v1.7.0 convention.

Core Signatures

1. CustodianTypeClassifier

Classifies a text chunk into one of the 19 single-letter GLAMORCUBESFIXPHDNT type codes, with optional secondary types for mixed institutions.

import dspy
from typing import List, Optional
from pydantic import BaseModel, Field

# Structured result of one type classification: a primary taxonomy code, its
# label, a confidence score, optional secondary codes and a rationale.
# NOTE(review): the docstring and Field descriptions are presumably exposed to
# the LM through the model's JSON schema — edit them as prompt text, not prose.
class CustodianTypeOutput(BaseModel):
    """Output schema for custodian type classification."""
    # Declared as a plain str: the letter set listed in the description is not
    # enforced by validation.
    primary_type: str = Field(description="Primary GLAMORCUBESFIXPHDNT type code (G, L, A, M, O, R, C, U, B, E, S, F, I, X, P, H, D, N, T)")
    # Free-text label accompanying primary_type.
    primary_type_label: str = Field(description="Human-readable type label")
    # Validated to lie in the closed interval [0.0, 1.0].
    confidence: float = Field(ge=0.0, le=1.0, description="Confidence score")
    # Optional; defaults to an empty list.
    secondary_types: List[str] = Field(default=[], description="Additional types if MIXED")
    # Required free-text justification.
    reasoning: str = Field(description="Explanation for classification")

# Signature mapping a text chunk (plus optional context) to a
# CustodianTypeOutput.
# NOTE(review): DSPy presumably uses the class docstring below as the LM task
# instructions — keep its wording stable and document for humans in # comments.
class CustodianTypeClassifier(dspy.Signature):
    """Classify heritage institution text into GLAMORCUBESFIXPHDNT taxonomy.
    
    The 19 types are:
    - G: GALLERY (art gallery, exhibition space)
    - L: LIBRARY (public, academic, national library)
    - A: ARCHIVE (government, corporate, personal archive)
    - M: MUSEUM (art, history, science museum)
    - O: OFFICIAL_INSTITUTION (government heritage agency)
    - R: RESEARCH_CENTER (research institute, documentation center)
    - C: COMMERCIAL (corporate heritage, company archive)
    - U: UNSPECIFIED (cannot determine type)
    - B: BIO_CUSTODIAN (botanical garden, zoo, aquarium)
    - E: EDUCATION_PROVIDER (university with collections)
    - S: HERITAGE_SOCIETY (historical society, heemkundige kring)
    - F: FEATURE_CUSTODIAN (monument, historic mansion)
    - I: INTANGIBLE_HERITAGE_GROUP (folklore, oral tradition)
    - X: MIXED (multiple types simultaneously)
    - P: PERSONAL_COLLECTION (private collector)
    - H: HOLY_SACRED_SITE (church archive, monastery library)
    - D: DIGITAL_PLATFORM (online archive, virtual museum)
    - N: NON_PROFIT (heritage NGO)
    - T: TASTE_SCENT_HERITAGE (culinary, perfumery heritage)
    """
    
    # Inputs: `text` is required; `context` defaults to None when omitted.
    text: str = dspy.InputField(desc="Text describing a heritage institution")
    context: Optional[str] = dspy.InputField(desc="Additional context (country, source)", default=None)
    
    # Output: a single structured classification.
    classification: CustodianTypeOutput = dspy.OutputField(desc="Classification result")


# Example usage
class CustodianTypeModule(dspy.Module):
    """Chain-of-thought wrapper around the CustodianTypeClassifier signature."""

    def __init__(self):
        super().__init__()
        self.classifier = dspy.ChainOfThought(CustodianTypeClassifier)

    def forward(self, text: str, context: Optional[str] = None) -> CustodianTypeOutput:
        """Classify ``text`` and return only the structured classification.

        Args:
            text: Text describing a heritage institution.
            context: Optional extra context passed through to the signature's
                ``context`` input; ``None`` when there is none.

        Returns:
            The ``classification`` field of the predictor's result.
        """
        result = self.classifier(text=text, context=context)
        return result.classification

2. CustodianEntityExtractor

Extracts heritage institution entities from text.

# One extracted institution. Only name, custodian_type, text_span and
# confidence are required; every other field has a default.
# NOTE(review): the docstring and Field descriptions are presumably exposed to
# the LM through the model's JSON schema — edit them as prompt text.
class CustodianEntity(BaseModel):
    """Extracted custodian entity conforming to LinkML Custodian class."""
    name: str = Field(description="Official emic name of institution")
    # Plain str: the taxonomy code is not validated against a fixed set here.
    custodian_type: str = Field(description="GLAMORCUBESFIXPHDNT type code")
    description: Optional[str] = Field(default=None, description="Institution description")
    alternative_names: List[str] = Field(default=[], description="Alternative names, abbreviations")
    # Stored as a string; the ISO 8601 format is requested but not enforced.
    founding_date: Optional[str] = Field(default=None, description="Founding date (ISO 8601)")
    
    # Location (maps to CustodianPlace)
    city: Optional[str] = Field(default=None)
    country_code: Optional[str] = Field(default=None, description="ISO 3166-1 alpha-2")
    address: Optional[str] = Field(default=None)
    
    # Identifiers
    # All three are free-form optional strings; no format check is applied
    # in this model.
    wikidata_id: Optional[str] = Field(default=None, description="Wikidata Q-number")
    isil_code: Optional[str] = Field(default=None, description="ISIL identifier")
    website: Optional[str] = Field(default=None)
    
    # Extraction metadata
    # text_span is the provenance snippet; confidence is validated to [0.0, 1.0].
    text_span: str = Field(description="Original text span mentioning entity")
    confidence: float = Field(ge=0.0, le=1.0)

# Container returned by the entity extractor.
# No docstring is added on purpose: it would presumably change the JSON schema
# description that is shown to the LM.
class CustodianExtractorOutput(BaseModel):
    entities: List[CustodianEntity] = Field(description="Extracted custodian entities")
    # Reported as its own required field; nothing in this class checks that it
    # equals len(entities).
    entity_count: int = Field(description="Number of entities extracted")

# Signature turning free text into a CustodianExtractorOutput.
# NOTE(review): DSPy presumably uses the class docstring below as the LM task
# instructions — keep its wording stable.
class CustodianEntityExtractor(dspy.Signature):
    """Extract heritage institution entities from text.
    
    Focus on:
    - Institution names (museums, archives, libraries, etc.)
    - Associated locations
    - Identifiers mentioned (ISIL, Wikidata, etc.)
    - Organizational details (founding date, type)
    
    Return structured entities conforming to the Heritage Custodian ontology.
    """
    
    # Required input.
    text: str = dspy.InputField(desc="Text potentially containing heritage institution mentions")
    # Optional hint; the description's example uses type names, not the
    # single-letter codes.
    expected_types: List[str] = dspy.InputField(
        desc="Expected custodian types to look for (e.g., ['MUSEUM', 'ARCHIVE'])",
        default=[]
    )
    # Optional hint; defaults to None.
    country_hint: Optional[str] = dspy.InputField(
        desc="Country context for better extraction",
        default=None
    )
    
    # Output: the entities together with their reported count.
    extracted: CustodianExtractorOutput = dspy.OutputField(desc="Extracted entities")

3. IdentifierExtractor

Extracts and validates external identifiers.

# One external identifier found in the text.
# NOTE(review): the docstring and Field descriptions are presumably exposed to
# the LM through the model's JSON schema — edit them as prompt text.
class Identifier(BaseModel):
    """Extracted identifier conforming to LinkML Identifier class."""
    # ROR is listed so the advertised schemes match the ones the extractor's
    # instructions describe and the validation module can check.
    scheme: str = Field(description="Identifier scheme (ISIL, Wikidata, VIAF, KvK, ISNI, ROR)")
    value: str = Field(description="Identifier value")
    url: Optional[str] = Field(default=None, description="Resolvable URL")
    # Validated to lie in the closed interval [0.0, 1.0].
    confidence: float = Field(ge=0.0, le=1.0)
    # Required from the LM; IdentifierExtractorModule.forward overwrites it
    # with the result of its regex check.
    valid: bool = Field(description="Whether identifier passes format validation")

# Container for all identifiers extracted from one text; the list is required.
# No docstring is added on purpose: it would presumably change the JSON schema
# description that is shown to the LM.
class IdentifierExtractorOutput(BaseModel):
    identifiers: List[Identifier]
    
# Signature turning free text into an IdentifierExtractorOutput.
# NOTE(review): DSPy presumably uses the class docstring below as the LM task
# instructions — keep its wording stable.
class IdentifierExtractor(dspy.Signature):
    """Extract external identifiers from text.
    
    Identifier patterns:
    - ISIL: XX-XXXXX (e.g., NL-AmRM, US-DLC)
    - Wikidata: Q followed by digits (Q190804)
    - VIAF: viaf.org/viaf/NNNNNN
    - KvK: 8-digit Dutch chamber of commerce number
    - ISNI: 16-digit ISNI (0000 0001 2345 6789)
    - ROR: 0xxxxxxxx (Research Organization Registry)
    
    Validate format and return structured identifiers.
    """
    
    # Single required input.
    text: str = dspy.InputField(desc="Text containing potential identifiers")
    
    # Output: the list wrapper holding every identifier found.
    extracted: IdentifierExtractorOutput = dspy.OutputField(desc="Extracted identifiers")


class IdentifierExtractorModule(dspy.Module):
    """Module with validation logic for identifiers.

    The LM proposes identifiers; ``forward`` then overwrites each identifier's
    ``valid`` flag with the result of a regex check, so the flag does not
    depend on the LM's own judgement.
    """

    # Each pattern must match the WHOLE identifier value (see
    # ``validate_identifier``, which uses ``re.fullmatch``).
    PATTERNS = {
        "ISIL": r"[A-Z]{2}-[A-Za-z0-9]+",
        "Wikidata": r"Q\d+",
        # Accept the bare "viaf.org/viaf/N" form as well as a full http(s) URL.
        "VIAF": r"(?:https?://)?(?:www\.)?viaf\.org/viaf/(\d+)/?",
        "KvK": r"\b\d{8}\b",
        # The 16th ISNI character is a check character and may be "X".
        "ISNI": r"\d{4}\s?\d{4}\s?\d{4}\s?\d{3}[\dX]",
        "ROR": r"0[a-z0-9]{8}",
    }

    def __init__(self):
        super().__init__()
        self.extractor = dspy.ChainOfThought(IdentifierExtractor)

    def validate_identifier(self, scheme: str, value: str) -> bool:
        """Return True when ``value`` is a well-formed identifier of ``scheme``.

        Args:
            scheme: Scheme name. An exact key of ``PATTERNS`` is tried first,
                then a case-insensitive match, because the name is produced by
                an LM and its casing may vary.
            value: Identifier value to check.

        Returns:
            False for an unknown scheme or a value that does not match the
            scheme's pattern in full; True otherwise.
        """
        import re

        pattern = self.PATTERNS.get(scheme)
        if pattern is None:
            wanted = scheme.casefold()
            pattern = next(
                (regex for name, regex in self.PATTERNS.items() if name.casefold() == wanted),
                None,
            )
        if not pattern:
            return False
        # fullmatch, not match: match() anchors only at the start, which let
        # values with trailing garbage (e.g. "Q42abc") pass as valid.
        return re.fullmatch(pattern, value) is not None

    def forward(self, text: str) -> IdentifierExtractorOutput:
        """Extract identifiers from ``text`` and re-validate each one.

        Returns:
            The extractor's ``extracted`` output with every ``valid`` flag
            replaced by the regex validation result.
        """
        result = self.extractor(text=text)
        # Post-validate identifiers
        for ident in result.extracted.identifiers:
            ident.valid = self.validate_identifier(ident.scheme, ident.value)
        return result.extracted

4. CollectionExtractor

Extracts collection metadata.

# One extracted collection. Only `name` is required; every other field is
# optional free text (or an empty list) when the source does not mention it.
# NOTE(review): the docstring and Field descriptions are presumably exposed to
# the LM through the model's JSON schema — edit them as prompt text.
class Collection(BaseModel):
    """Extracted collection conforming to LinkML CustodianCollection class."""
    name: str = Field(description="Collection name")
    description: Optional[str] = Field(default=None)
    # Plain str: the listed collection types are suggestions, not a validated enum.
    collection_type: Optional[str] = Field(
        default=None, 
        description="archival, bibliographic, museum_objects, audio_visual, etc."
    )
    subject_areas: List[str] = Field(default=[], description="Subject/topic areas")
    # Free-form string; the description allows both year ranges and period names.
    temporal_extent: Optional[str] = Field(
        default=None, 
        description="Time period covered (e.g., '1800-1900', 'medieval')"
    )
    # Free-form string; quantity and unit are not parsed.
    extent: Optional[str] = Field(
        default=None,
        description="Size/extent (e.g., '10,000 items', '500 linear meters')"
    )
    # Holder is referenced by name only, not by a structured entity.
    custodian_name: Optional[str] = Field(
        default=None,
        description="Name of institution holding collection"
    )

# Container for all collections extracted from one text; the list is required.
# No docstring is added on purpose: it would presumably change the JSON schema
# description that is shown to the LM.
class CollectionExtractorOutput(BaseModel):
    collections: List[Collection]

# Signature turning free text into a CollectionExtractorOutput.
# NOTE(review): DSPy presumably uses the class docstring below as the LM task
# instructions — keep its wording stable.
# NOTE(review): the instructions ask for "Digital availability", but the
# Collection model has no field for it — confirm where it should land.
class CollectionExtractor(dspy.Signature):
    """Extract heritage collection information from text.
    
    Look for:
    - Named collections (e.g., "Van Gogh collection")
    - Collection descriptions
    - Subject areas and themes
    - Temporal coverage (centuries, periods)
    - Extent/size information
    - Digital availability
    
    Map to CIDOC-CRM E78_Curated_Holding / PREMIS vocabulary.
    """
    
    # Required input.
    text: str = dspy.InputField(desc="Text describing collections")
    # Optional free-text context; defaults to None.
    custodian_context: Optional[str] = dspy.InputField(
        desc="Institution context",
        default=None
    )
    
    # Output: the list wrapper holding every collection found.
    extracted: CollectionExtractorOutput = dspy.OutputField(desc="Extracted collections")

5. RelationshipExtractor

Extracts relationships between entities.

# One directed relationship (source -> target) between two entities that are
# referenced by name.
# NOTE(review): the docstring and Field descriptions are presumably exposed to
# the LM through the model's JSON schema — edit them as prompt text.
class Relationship(BaseModel):
    """Extracted relationship between heritage entities."""
    source_entity: str = Field(description="Source entity name")
    # Plain str: the listed relationship types are not enforced by validation.
    relationship_type: str = Field(
        description="Relationship type: member_of, part_of, collaborated_with, merged_with, etc."
    )
    target_entity: str = Field(description="Target entity name")
    # Optional free-form time expression.
    temporal: Optional[str] = Field(default=None, description="When relationship held")
    # Validated to lie in the closed interval [0.0, 1.0].
    confidence: float = Field(ge=0.0, le=1.0)

# Container for all relationships extracted from one text; the list is required.
# No docstring is added on purpose: it would presumably change the JSON schema
# description that is shown to the LM.
class RelationshipExtractorOutput(BaseModel):
    relationships: List[Relationship]

# Signature turning free text (plus optional known entity names) into a
# RelationshipExtractorOutput.
# NOTE(review): DSPy presumably uses the class docstring below as the LM task
# instructions — keep its wording stable.
class RelationshipExtractor(dspy.Signature):
    """Extract relationships between heritage institutions.
    
    Relationship types (from ontology):
    - member_of: Institution is member of EncompassingBody
    - part_of: Institution is organizational unit of another
    - collaborated_with: Partnership/project collaboration
    - merged_with: Historical merger (ChangeEvent)
    - split_from: Historical split (ChangeEvent)
    - succeeded_by: Institution succession
    - participated_in_project: Project participation
    - manages_collection: Collection management responsibility
    
    Map to W3C ORG ontology (org:memberOf, org:subOrganizationOf, etc.)
    """
    
    # Required input.
    text: str = dspy.InputField(desc="Text describing institutional relationships")
    # Optional list of entity names; defaults to an empty list.
    known_entities: List[str] = dspy.InputField(
        desc="Already-extracted entity names for reference",
        default=[]
    )
    
    # Output: the list wrapper holding every relationship found.
    extracted: RelationshipExtractorOutput = dspy.OutputField(desc="Extracted relationships")

6. ChangeEventExtractor

Extracts organizational change events.

# One organizational change event. event_type, description, affected_entities
# and confidence are required; the rest have defaults.
# NOTE(review): the docstring and Field descriptions are presumably exposed to
# the LM through the model's JSON schema — edit them as prompt text.
class ChangeEvent(BaseModel):
    """Extracted change event conforming to LinkML ChangeEvent class."""
    # Plain str: the enumerated event types are listed but not enforced.
    event_type: str = Field(
        description="FOUNDING, CLOSURE, MERGER, SPLIT, ACQUISITION, RELOCATION, NAME_CHANGE, TYPE_CHANGE, STATUS_CHANGE, RESTRUCTURING, LEGAL_CHANGE"
    )
    # Stored as a string; the ISO 8601 format is requested but not enforced.
    event_date: Optional[str] = Field(default=None, description="ISO 8601 date")
    description: str = Field(description="Event description")
    # Entities are referenced by name.
    affected_entities: List[str] = Field(description="Entities affected by event")
    resulting_entities: List[str] = Field(default=[], description="Entities resulting from event")
    # Validated to lie in the closed interval [0.0, 1.0].
    confidence: float = Field(ge=0.0, le=1.0)

# Container for all change events extracted from one text; the list is required.
# No docstring is added on purpose: it would presumably change the JSON schema
# description that is shown to the LM.
class ChangeEventExtractorOutput(BaseModel):
    events: List[ChangeEvent]

# Signature turning free text into a ChangeEventExtractorOutput. The docstring
# pairs each event type with trigger phrases.
# NOTE(review): DSPy presumably uses the class docstring below as the LM task
# instructions — keep its wording stable.
class ChangeEventExtractor(dspy.Signature):
    """Extract organizational change events from text.
    
    Event types (from ChangeTypeEnum):
    - FOUNDING: "established", "founded", "created", "opened"
    - CLOSURE: "closed", "dissolved", "ceased operations"
    - MERGER: "merged with", "combined with", "absorbed"
    - SPLIT: "split into", "divided into", "spun off"
    - ACQUISITION: "acquired", "took over"
    - RELOCATION: "moved to", "relocated to"
    - NAME_CHANGE: "renamed to", "formerly known as"
    - TYPE_CHANGE: "became a museum", "converted to archive"
    - STATUS_CHANGE: "reopened", "temporarily closed"
    - RESTRUCTURING: "reorganized", "restructured"
    - LEGAL_CHANGE: "incorporated as", "became a foundation"
    
    Map to CIDOC-CRM E5_Event / PROV-O Activity.
    """
    
    # Single required input.
    text: str = dspy.InputField(desc="Text describing organizational changes")
    
    # Output: the list wrapper holding every event found.
    extracted: ChangeEventExtractorOutput = dspy.OutputField(desc="Extracted events")

7. WebClaimExtractor

Extracts claims from web pages with XPath provenance.

# One claim extracted from a web page, with its XPath provenance. All fields
# are required.
# NOTE(review): the docstring and Field descriptions are presumably exposed to
# the LM through the model's JSON schema — edit them as prompt text.
class WebClaim(BaseModel):
    """Extracted web claim conforming to CanonicalClaimTypes enum."""
    # Plain str: membership in the canonical claim types is not validated here.
    claim_type: str = Field(
        description="Canonical claim type: full_name, description, email, phone, address, etc."
    )
    claim_value: str = Field(description="Extracted value")
    # Required, but only as a string: the expression is not checked for being
    # well-formed or for resolving in the source page.
    xpath: str = Field(description="XPath to source element")
    # Validated to lie in the closed interval [0.0, 1.0].
    confidence: float = Field(ge=0.0, le=1.0)
    # Validated to be 1, 2 or 3.
    tier: int = Field(ge=1, le=3, description="Reliability tier (1=structural, 2=pattern, 3=NLP)")

# Container for the claims extracted from one page plus page-level provenance.
# All three fields are required.
# No docstring is added on purpose: it would presumably change the JSON schema
# description that is shown to the LM.
class WebClaimExtractorOutput(BaseModel):
    claims: List[WebClaim]
    # Echo of the page URL as a plain string.
    source_url: str
    # NOTE(review): WebClaimExtractor takes no retrieval timestamp as input,
    # so this value would be produced by the LM — confirm the caller sets it.
    retrieved_on: str

# Signature turning page content and its URL into a WebClaimExtractorOutput.
# NOTE(review): DSPy presumably uses the class docstring below as the LM task
# instructions — keep its wording stable.
# NOTE(review): the input may be Docling markdown, which does not obviously
# retain the HTML element structure that XPath pointers need — confirm.
class WebClaimExtractor(dspy.Signature):
    """Extract claims from web page content with XPath provenance.
    
    CRITICAL: Every claim MUST have an XPath pointer to the source element.
    Claims without XPath are considered fabricated per AGENTS.md Rule 6.
    
    Claim tiers:
    - Tier 1 (STRUCTURAL): page_title, page_count, image_count (from HTML structure)
    - Tier 2 (PATTERN): social_facebook, isil_code, kvk_number (regex patterns)
    - Tier 3 (NLP): full_name, description, address (requires XPath verification)
    
    Use Docling output for structured extraction.
    """
    
    # Both inputs are required.
    html_content: str = dspy.InputField(desc="HTML content or Docling markdown")
    source_url: str = dspy.InputField(desc="URL of source page")
    
    # Output: the claims plus page-level provenance.
    extracted: WebClaimExtractorOutput = dspy.OutputField(desc="Extracted claims with XPath")

Composite Modules

HeritageEntityPipeline

Combines extractors into a full pipeline.

class HeritageEntityPipeline(dspy.Module):
    """Full heritage entity extraction pipeline.

    Runs type classification, entity extraction, identifier extraction,
    collection extraction, relationship extraction and change-event extraction
    over the same text and returns all results in one dict.
    """

    # Taxonomy code -> type name, mirroring the list in the
    # CustodianTypeClassifier instructions. Used to turn the classifier's
    # single-letter code into the name form that `expected_types` uses.
    TYPE_NAMES = {
        "G": "GALLERY",
        "L": "LIBRARY",
        "A": "ARCHIVE",
        "M": "MUSEUM",
        "O": "OFFICIAL_INSTITUTION",
        "R": "RESEARCH_CENTER",
        "C": "COMMERCIAL",
        "U": "UNSPECIFIED",
        "B": "BIO_CUSTODIAN",
        "E": "EDUCATION_PROVIDER",
        "S": "HERITAGE_SOCIETY",
        "F": "FEATURE_CUSTODIAN",
        "I": "INTANGIBLE_HERITAGE_GROUP",
        "X": "MIXED",
        "P": "PERSONAL_COLLECTION",
        "H": "HOLY_SACRED_SITE",
        "D": "DIGITAL_PLATFORM",
        "N": "NON_PROFIT",
        "T": "TASTE_SCENT_HERITAGE",
    }

    def __init__(self):
        super().__init__()
        self.type_classifier = CustodianTypeModule()
        self.entity_extractor = dspy.ChainOfThought(CustodianEntityExtractor)
        self.identifier_extractor = IdentifierExtractorModule()
        self.collection_extractor = dspy.ChainOfThought(CollectionExtractor)
        self.relationship_extractor = dspy.ChainOfThought(RelationshipExtractor)
        self.change_event_extractor = dspy.ChainOfThought(ChangeEventExtractor)

    def forward(
        self,
        text: str,
        expected_types: Optional[List[str]] = None,
        country_hint: Optional[str] = None,
    ) -> dict:
        """Run every extractor over ``text``.

        Args:
            text: Source text to analyse.
            expected_types: Custodian type names to look for. When omitted or
                empty, the classifier's primary type is used instead.
            country_hint: Optional country context, forwarded to the type
                classifier (as ``context``) and to the entity extractor.

        Returns:
            A dict with the keys ``document_type``, ``entities``,
            ``identifiers``, ``collections``, ``relationships`` and ``events``.
        """
        # 1. Classify document type
        type_result = self.type_classifier(text=text, context=country_hint)

        # 2. Extract entities
        if not expected_types:
            # The entity extractor describes its hint as type names
            # (e.g. 'MUSEUM'), so translate the single-letter code. An
            # unrecognised code is passed through unchanged.
            code = type_result.primary_type
            expected_types = [self.TYPE_NAMES.get(code, code)]
        entities = self.entity_extractor(
            text=text,
            expected_types=expected_types,
            country_hint=country_hint,
        )

        # 3. Extract identifiers (the module already returns the unwrapped output)
        identifiers = self.identifier_extractor(text=text)

        # 4. Extract collections
        collections = self.collection_extractor(text=text)

        # 5. Extract relationships, seeded with the entity names found in step 2
        entity_names = [e.name for e in entities.extracted.entities]
        relationships = self.relationship_extractor(
            text=text,
            known_entities=entity_names,
        )

        # 6. Extract change events
        events = self.change_event_extractor(text=text)

        return {
            "document_type": type_result,
            "entities": entities.extracted,
            "identifiers": identifiers,
            "collections": collections.extracted,
            "relationships": relationships.extracted,
            "events": events.extracted,
        }

QueryRouter

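Classifies the intent of a user query so that it can be routed to an appropriate retrieval strategy. The signature itself returns only the classified intent; mapping an intent to a retrieval backend is left to the caller.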

# Structured description of what a user query is asking for. intent_type and
# custodian_types are required; the scopes and entity mentions are optional.
# NOTE(review): the docstring and Field descriptions are presumably exposed to
# the LM through the model's JSON schema — edit them as prompt text.
class QueryIntent(BaseModel):
    """Classified query intent."""
    # Plain str: the five listed intent types are not enforced by validation.
    intent_type: str = Field(
        description="factual, comparative, exploratory, relationship, temporal"
    )
    custodian_types: List[str] = Field(
        description="Relevant GLAMORCUBESFIXPHDNT types"
    )
    # Free-form filters; None means no restriction was identified.
    geographic_scope: Optional[str] = Field(default=None, description="Country/region filter")
    temporal_scope: Optional[str] = Field(default=None, description="Time period filter")
    entity_mentions: List[str] = Field(default=[], description="Mentioned entity names")

# Signature mapping a user query to a QueryIntent.
# NOTE(review): DSPy presumably uses the class docstring below as the LM task
# instructions — keep its wording stable.
# NOTE(review): the instructions list retrieval strategies under "Route to",
# but QueryIntent has no field naming the chosen strategy — the routing
# decision is presumably made by the caller from intent_type; confirm.
class QueryRouter(dspy.Signature):
    """Route heritage queries to appropriate retrieval strategies.
    
    Intent types:
    - factual: "What is the ISIL code for Rijksmuseum?"
    - comparative: "Compare Dutch and Belgian archive systems"
    - exploratory: "What museums exist in Limburg?"
    - relationship: "Which institutions are members of NDE?"
    - temporal: "How has the Noord-Hollands Archief changed since 2000?"
    
    Route to:
    - Vector retrieval for semantic similarity
    - Graph traversal for relationships
    - SPARQL for structured queries
    - Hybrid for complex questions
    """
    
    # Single required input.
    query: str = dspy.InputField(desc="User query")
    
    # Output: the classified intent.
    intent: QueryIntent = dspy.OutputField(desc="Classified query intent")

Optimizers and Metrics

Extraction Accuracy Metric

def extraction_accuracy(pred: CustodianExtractorOutput, gold: CustodianExtractorOutput) -> float:
    """Score predicted entity names against gold names with an F1 measure.

    Names are compared as lower-cased strings, and duplicates collapse because
    both sides are reduced to sets.

    Args:
        pred: Predicted extraction; only ``entities[*].name`` is read.
        gold: Reference extraction; only ``entities[*].name`` is read.

    Returns:
        F1 of the two name sets. When ``gold`` has no names the score is 1.0
        if ``pred`` is empty too and 0.0 otherwise.
    """
    predicted = set()
    for entity in pred.entities:
        predicted.add(entity.name.lower())

    expected = set()
    for entity in gold.entities:
        expected.add(entity.name.lower())

    # Nothing to find: any prediction at all is a false positive.
    if len(expected) == 0:
        if predicted:
            return 0.0
        return 1.0

    hits = len(predicted.intersection(expected))

    if predicted:
        precision = hits / len(predicted)
    else:
        precision = 0.0
    recall = hits / len(expected)

    # Guard the harmonic mean against a zero denominator.
    if precision + recall == 0:
        return 0.0

    return 2 * (precision * recall) / (precision + recall)

Type Classification Metric

def type_classification_accuracy(pred: CustodianTypeOutput, gold: CustodianTypeOutput) -> float:
    """Score a type classification against the gold classification.

    The result is a weighted sum: 0.7 for an exact primary-type match plus
    0.3 times a secondary-type score.

    Args:
        pred: Predicted classification; ``primary_type`` and
            ``secondary_types`` are read.
        gold: Reference classification; the same two attributes are read.

    Returns:
        A score between 0.0 and 1.0.
    """
    if pred.primary_type == gold.primary_type:
        primary_score = 1.0
    else:
        primary_score = 0.0

    if not gold.secondary_types:
        # No secondary types expected: predicting some anyway earns half credit.
        secondary_score = 0.5 if pred.secondary_types else 1.0
    else:
        # Jaccard overlap between the predicted and expected secondary types.
        predicted = set(pred.secondary_types)
        expected = set(gold.secondary_types)
        combined = predicted | expected
        secondary_score = len(predicted & expected) / len(combined) if combined else 1.0

    return 0.7 * primary_score + 0.3 * secondary_score

DSPy Optimizer Configuration

from dspy.teleprompt import BootstrapFewShot

# Training data. Each example carries the pipeline inputs plus a gold
# `entities` label; without a label the metric has nothing to score against.
trainset = [
    dspy.Example(
        text="The Rijksmuseum in Amsterdam is one of the most famous art museums in the world...",
        expected_types=["MUSEUM"],
        country_hint="NL",
        entities=CustodianExtractorOutput(
            entities=[
                CustodianEntity(
                    name="Rijksmuseum",
                    custodian_type="M",
                    city="Amsterdam",
                    country_code="NL",
                    text_span="The Rijksmuseum in Amsterdam",
                    confidence=1.0,
                ),
            ],
            entity_count=1,
        ),
    ).with_inputs("text", "expected_types", "country_hint"),
    # ... more examples
]


def pipeline_extraction_metric(example, prediction, trace=None) -> float:
    """Adapt extraction_accuracy to the DSPy metric calling convention.

    DSPy calls a metric as ``metric(example, prediction, trace)``, whereas
    ``extraction_accuracy`` takes ``(pred, gold)`` extractor outputs.

    Args:
        example: Training example holding the gold ``entities`` label.
        prediction: The dict returned by ``HeritageEntityPipeline.forward``.
        trace: Supplied by the optimizer during bootstrapping; unused here.

    Returns:
        The F1 score of predicted versus gold entity names.
    """
    return extraction_accuracy(prediction["entities"], example.entities)


# Optimizer
optimizer = BootstrapFewShot(
    metric=pipeline_extraction_metric,
    max_bootstrapped_demos=4,
    max_labeled_demos=16,
)

# Compile
compiled_pipeline = optimizer.compile(
    HeritageEntityPipeline(),
    trainset=trainset
)

Example Invocations

Extract from Conversation JSON

import json

# Load conversation. The encoding is explicit so non-ASCII names decode the
# same way on every platform instead of depending on the locale default.
with open("data/conversations/brazilian_glam.json", encoding="utf-8") as f:
    conv = json.load(f)

# Extract from assistant messages
pipeline = HeritageEntityPipeline()
for msg in conv["chat_messages"]:
    if msg["sender"] != "assistant":
        continue

    text = msg["text"]
    if not text or not text.strip():
        # Nothing to extract; skip rather than spend six LM calls on it.
        continue

    result = pipeline(text=text, country_hint="BR")

    for entity in result["entities"].entities:
        print(f"Found: {entity.name} ({entity.custodian_type})")

Extract from Web Archive

from docling.document_converter import DocumentConverter

# Render the archived HTML page to Markdown with Docling.
converter = DocumentConverter()
doc = converter.convert("data/web/rijksmuseum.html")
markdown = doc.document.export_to_markdown()

# Run the web-claim signature over the rendered page.
# NOTE(review): the input here is Markdown, while the signature demands an
# XPath for every claim — confirm XPaths can be recovered from Docling output.
claim_extractor = dspy.ChainOfThought(WebClaimExtractor)
claims = claim_extractor(html_content=markdown, source_url="https://www.rijksmuseum.nl/")

# Print each claim together with its provenance pointer.
for claim in claims.extracted.claims:
    summary = f"{claim.claim_type}: {claim.claim_value} (xpath: {claim.xpath})"
    print(summary)