glam/docs/dspy_rag/03-chunking-strategy.md

Schema-Aware Chunking Strategy

Overview

Traditional chunking strategies (fixed-size windows, recursive character splitting) lose ontological context. This document describes a schema-aware chunking strategy that respects the structure of the Heritage Custodian ontology.

Chunking Hierarchy

Document
    │
    ├── Custodian Chunks (entity boundaries)
    │   ├── Name mentions
    │   ├── Type indicators
    │   └── Description blocks
    │
    ├── Observation Chunks (evidence units)
    │   ├── Source attribution
    │   ├── Claim statements
    │   └── XPath pointers
    │
    ├── Collection Chunks (holdings descriptions)
    │   ├── Collection names
    │   ├── Subject areas
    │   └── Extent information
    │
    ├── Relationship Chunks (inter-entity links)
    │   ├── Membership statements
    │   ├── Collaboration mentions
    │   └── Historical events
    │
    └── Context Chunks (background information)
        ├── Country/region context
        ├── Domain background
        └── Temporal context
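All of the chunkers below emit Chunk objects. The actual container lives elsewhere in the codebase; a minimal sketch consistent with how it is used here (a text field, a metadata dict, and an embedding slot) could be:

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Chunk:
    """Minimal chunk container: text plus retrieval metadata."""
    text: str
    metadata: dict = field(default_factory=dict)
    embedding: Optional[List[float]] = None  # filled in later by the embedder
```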

Chunking Strategies by Source Type

1. Conversation JSON Chunking

Conversations are chunked by message boundaries with entity co-reference:

class ConversationChunker:
    """Chunk conversation JSON respecting entity boundaries."""
    
    def __init__(self, entity_extractor):
        self.entity_extractor = entity_extractor
        self.coref_resolver = EntityCorefResolver()
    
    def chunk(self, conversation: dict) -> List[Chunk]:
        chunks = []
        entity_mentions = {}  # Track entity mentions across messages
        
        for i, message in enumerate(conversation["chat_messages"]):
            text = message["text"]
            sender = message["sender"]
            
            # Skip short messages
            if len(text) < 100:
                continue
            
            # Extract entities from this message
            entities = self.entity_extractor(text)
            
            # Resolve coreferences with previous mentions
            for entity in entities:
                resolved = self.coref_resolver.resolve(entity, entity_mentions)
                entity_mentions[resolved.name] = resolved
            
            # Create chunk with entity metadata
            chunk = Chunk(
                text=text,
                metadata={
                    "source_type": "conversation",
                    "conversation_id": conversation["uuid"],
                    "message_index": i,
                    "sender": sender,
                    "mentioned_entities": [e.name for e in entities],
                    "entity_types": list(set(e.custodian_type for e in entities)),
                }
            )
            chunks.append(chunk)
        
        return chunks

Chunk boundaries:

  • Message boundaries (natural conversation turns)
  • Long messages split at paragraph breaks
  • Entity mention continuity preserved
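The second bullet says long messages are split at paragraph breaks, a step not shown in the chunker above. A sketch of how such a splitter might look, greedily packing paragraphs up to a size limit (`split_at_paragraphs` and the `max_chars` threshold are illustrative, not the pipeline's actual helper):

```python
from typing import List

def split_at_paragraphs(text: str, max_chars: int = 2000) -> List[str]:
    """Split a long message at blank-line paragraph breaks,
    packing consecutive paragraphs until max_chars is reached."""
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    pieces, current = [], ""
    for p in paragraphs:
        # +2 accounts for the rejoined blank line between paragraphs
        if current and len(current) + len(p) + 2 > max_chars:
            pieces.append(current)
            current = p
        else:
            current = f"{current}\n\n{p}" if current else p
    if current:
        pieces.append(current)
    return pieces
```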

Metadata added:

  • conversation_id: Source conversation UUID
  • message_index: Position in conversation
  • sender: human/assistant
  • mentioned_entities: Entity names in chunk
  • entity_types: GLAMORCUBESFIXPHDNT types
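EntityCorefResolver is assumed above. To illustrate the kind of matching involved, a toy resolver could canonicalize on normalized names (this heuristic is a stand-in for the real resolver, not part of it):

```python
def resolve_coref(mention: str, known_entities: dict) -> str:
    """Map a mention to a known canonical name, if one matches.

    Toy heuristic: exact match after normalization, or a single-word
    mention that appears as a token of a longer canonical name.
    """
    norm = " ".join(mention.lower().replace(".", " ").split())
    for canonical in known_entities:
        c_norm = " ".join(canonical.lower().replace(".", " ").split())
        if norm == c_norm:
            return canonical
        if " " not in norm and len(norm) > 3 and norm in c_norm.split():
            return canonical
    return mention  # unresolved: keep the surface form
```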

2. Website HTML Chunking

Websites are chunked by semantic sections detected via Docling:

class WebsiteChunker:
    """Chunk website HTML using Docling structure analysis."""
    
    def __init__(self):
        self.docling = DocumentConverter()
        self.section_classifier = SectionClassifier()
    
    def chunk(self, html_path: str, source_url: str) -> List[Chunk]:
        # Convert to structured document
        doc = self.docling.convert(html_path)
        
        chunks = []
        for section in doc.document.sections:
            # Classify section type
            section_type = self.section_classifier.classify(section)
            
            # Skip navigation, footer, etc.
            if section_type in ["navigation", "footer", "sidebar"]:
                continue
            
            # Extract XPath for provenance
            xpath = self._get_xpath(section)
            
            chunk = Chunk(
                text=section.text,
                metadata={
                    "source_type": "website",
                    "source_url": source_url,
                    "section_type": section_type,  # about, contact, collection, etc.
                    "xpath": xpath,
                    "has_images": section.has_pictures,
                    "html_tag": section.original_tag,
                }
            )
            chunks.append(chunk)
        
        return chunks
    
    def _get_xpath(self, section) -> str:
        """Generate an XPath expression for the section element."""
        # Implementation depends on the Docling document structure
        raise NotImplementedError

Section types detected:

  • about: Organization description (maps to Custodian.description)
  • contact: Contact information (maps to CustodianPlace)
  • collection: Collection descriptions (maps to CustodianCollection)
  • visit: Visitor information
  • news: Recent updates
  • history: Historical information (maps to ChangeEvent)
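SectionClassifier is assumed above. A minimal keyword-based sketch of the classification step (the labels follow the list above; the keyword lists are illustrative):

```python
SECTION_KEYWORDS = {
    "about": ["about us", "over ons", "our mission", "who we are"],
    "contact": ["contact", "address", "opening hours", "e-mail"],
    "collection": ["collection", "collectie", "holdings", "fonds"],
    "history": ["history", "geschiedenis", "founded in"],
}

def classify_section(text: str) -> str:
    """Score each section type by keyword hits; fall back to 'other'."""
    lower = text.lower()
    scores = {label: sum(kw in lower for kw in kws)
              for label, kws in SECTION_KEYWORDS.items()}
    best = max(scores, key=scores.get)
    return best if scores[best] > 0 else "other"
```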

3. CSV Registry Chunking

CSV rows become individual chunks with schema mapping:

class CSVRegistryChunker:
    """Chunk CSV registry data with schema mapping."""
    
    SCHEMA_MAPPINGS = {
        # ISIL registry columns → LinkML slots
        "Instelling": "preferred_label",
        "Plaats": "settlement",
        "ISIL code": "isil_code",
        "Toegekend op": "registration_date",
        "Opmerking": "provenance_note",
        
        # Dutch organizations CSV columns
        "naam_organisatie": "preferred_label",
        "postadres_plaats": "settlement",
        "ISIL-code": "isil_code",
        "KvK-nummer": "registration_number",
        "Collectie Nederland": "aggregator_membership",
    }
    
    def chunk(self, csv_path: str, registry_type: str) -> List[Chunk]:
        df = pd.read_csv(csv_path, encoding='utf-8-sig')
        
        chunks = []
        for idx, row in df.iterrows():
            # Build text representation
            text_parts = []
            metadata = {
                "source_type": "csv_registry",
                "registry_type": registry_type,
                "row_index": idx,
                "data_tier": "TIER_1_AUTHORITATIVE",
            }
            
            for col, slot in self.SCHEMA_MAPPINGS.items():
                if col in row and pd.notna(row[col]):
                    value = str(row[col])
                    text_parts.append(f"{slot}: {value}")
                    metadata[slot] = value
            
            chunk = Chunk(
                text="\n".join(text_parts),
                metadata=metadata
            )
            chunks.append(chunk)
        
        return chunks

CSV sources:

  • ISIL registry (ISIL-codes_2025-08-01.csv)
  • Dutch organizations (voorbeeld_lijst_organisaties_en_diensten-totaallijst_nederland.csv)
  • Country-specific registries
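The row-to-text mapping can be exercised on a tiny in-memory CSV (the values below are made up; only the column names come from the mapping table above):

```python
import csv
import io

SCHEMA_MAPPINGS = {
    "Instelling": "preferred_label",
    "Plaats": "settlement",
    "ISIL code": "isil_code",
}

csv_text = "Instelling,Plaats,ISIL code\nVoorbeeldarchief,Utrecht,NL-0000\n"
row = next(csv.DictReader(io.StringIO(csv_text)))

# Same mapping logic as CSVRegistryChunker.chunk, for a single row
text = "\n".join(f"{slot}: {row[col]}"
                 for col, slot in SCHEMA_MAPPINGS.items()
                 if row.get(col))
```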

4. YAML Instance Chunking

LinkML instance files are chunked by entity:

class YAMLInstanceChunker:
    """Chunk YAML instance files by entity."""
    
    def chunk(self, yaml_path: str) -> List[Chunk]:
        with open(yaml_path) as f:
            data = yaml.safe_load(f)
        
        # Handle single entity or list
        entities = data if isinstance(data, list) else [data]
        
        chunks = []
        for entity in entities:
            # Build text from entity fields
            text = self._entity_to_text(entity)
            
            # Extract metadata from entity
            metadata = {
                "source_type": "yaml_instance",
                "yaml_path": yaml_path,
                "hc_id": entity.get("hc_id"),
                "ghcid": entity.get("ghcid"),
                "custodian_type": entity.get("custodian_type"),
                "country_code": self._extract_country(entity),
            }
            
            chunk = Chunk(
                text=text,
                metadata=metadata
            )
            chunks.append(chunk)
        
        return chunks
    
    def _entity_to_text(self, entity: dict) -> str:
        """Convert entity dict to searchable text."""
        parts = []
        
        # Core identity
        if "preferred_label" in entity:
            parts.append(f"Name: {entity['preferred_label']}")
        if "description" in entity:
            parts.append(f"Description: {entity['description']}")
        
        # Location
        if "custodian_place" in entity:
            place = entity["custodian_place"]
            parts.append(f"Location: {place.get('settlement', '')} {place.get('country', '')}")
        
        # Collections
        if "collections" in entity:
            for coll in entity["collections"]:
                parts.append(f"Collection: {coll.get('collection_name', '')}")
        
        # Identifiers
        if "identifiers" in entity:
            for ident in entity["identifiers"]:
                parts.append(f"{ident['identifier_scheme']}: {ident['identifier_value']}")
        
        return "\n".join(parts)
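For reference, an instance file of the shape this chunker expects might look like the fragment below (all field values, the hc_id format, and the custodian_type code are illustrative, not real records):

```yaml
- preferred_label: Voorbeeld Stadsarchief
  description: Municipal archive holding local government records.
  hc_id: HC-NL-0001        # illustrative identifier
  custodian_type: AR       # illustrative type code
  custodian_place:
    settlement: Utrecht
    country: NL
  identifiers:
    - identifier_scheme: ISIL
      identifier_value: NL-0000   # made-up value
```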

Chunk Metadata Schema

All chunks share a common metadata schema for consistent retrieval:

class ChunkMetadata(BaseModel):
    """Universal chunk metadata schema."""
    
    # Source identification
    source_type: Literal["conversation", "website", "csv_registry", "yaml_instance", "wikidata"]
    source_id: str  # Unique source identifier
    
    # Ontology mapping
    primary_class: Optional[str] = None  # LinkML class name
    mentioned_classes: List[str] = []
    
    # Entity information
    custodian_type: Optional[str] = None  # GLAMORCUBESFIXPHDNT code
    mentioned_entities: List[str] = []
    ghcid: Optional[str] = None
    
    # Geographic scope
    country_code: Optional[str] = None  # ISO 3166-1 alpha-2
    region_code: Optional[str] = None
    settlement: Optional[str] = None
    
    # Provenance
    data_tier: str = "TIER_4_INFERRED"
    extraction_date: str
    confidence_score: float = 0.5
    
    # Retrieval hints
    language: str = "en"
    has_identifiers: bool = False
    has_collections: bool = False
    has_relationships: bool = False

Embedding Strategy

Multilingual Embeddings

Use BGE-M3 for multilingual support (NL, EN, DE, FR, ES, PT, etc.):

from sentence_transformers import SentenceTransformer

class HeritageEmbedder:
    """Domain-aware embedding generator."""
    
    def __init__(self):
        self.model = SentenceTransformer("BAAI/bge-m3")
        
        # Heritage domain prefixes for query/passage distinction
        self.QUERY_PREFIX = "Represent this heritage institution query: "
        self.PASSAGE_PREFIX = "Represent this heritage institution text: "
    
    def embed_passage(self, text: str, metadata: ChunkMetadata) -> List[float]:
        """Embed passage with domain context."""
        
        # Add type context for better discrimination
        context = ""
        if metadata.custodian_type:
            type_label = CUSTODIAN_TYPE_LABELS.get(metadata.custodian_type, "")
            context = f"[{type_label}] "
        
        if metadata.country_code:
            context += f"[{metadata.country_code}] "
        
        full_text = self.PASSAGE_PREFIX + context + text
        return self.model.encode(full_text).tolist()
    
    def embed_query(self, query: str, type_filter: str = None) -> List[float]:
        """Embed query with optional type context."""
        
        context = ""
        if type_filter:
            type_label = CUSTODIAN_TYPE_LABELS.get(type_filter, "")
            context = f"[{type_label}] "
        
        full_query = self.QUERY_PREFIX + context + query
        return self.model.encode(full_query).tolist()

Hybrid Embeddings

Combine dense and sparse embeddings for better retrieval:

class HybridEmbedder:
    """Combine dense and sparse embeddings."""
    
    def __init__(self):
        self.dense_model = SentenceTransformer("BAAI/bge-m3")
        self.sparse_model = BM25Encoder()  # Or SPLADE
    
    def embed(self, text: str, metadata: ChunkMetadata) -> dict:
        return {
            "dense": self.dense_model.encode(text).tolist(),
            "sparse": self.sparse_model.encode(text),
            "metadata": metadata.dict(),
        }
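Downstream, the dense and sparse scores have to be combined at query time. One common approach is a weighted sum over min-max-normalized scores, sketched here (`fuse_scores` and `alpha` are illustrative names, not part of the codebase):

```python
def fuse_scores(dense: dict, sparse: dict, alpha: float = 0.7) -> dict:
    """Weighted sum of min-max-normalized dense and sparse scores.

    alpha weights the dense component; documents missing from one
    scorer contribute 0 for that component.
    """
    def normalize(scores: dict) -> dict:
        if not scores:
            return {}
        lo, hi = min(scores.values()), max(scores.values())
        span = (hi - lo) or 1.0
        return {doc: (s - lo) / span for doc, s in scores.items()}

    d, s = normalize(dense), normalize(sparse)
    return {doc: alpha * d.get(doc, 0.0) + (1 - alpha) * s.get(doc, 0.0)
            for doc in set(d) | set(s)}
```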

Chunk Size Guidelines

Source Type      Target Size   Max Size      Overlap
Conversation     500 tokens    1000 tokens   100 tokens
Website          300 tokens    800 tokens    50 tokens
CSV Registry     1 row         1 row         None
YAML Instance    1 entity      1 entity      None
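For conversation and website sources, the target/overlap figures above imply a sliding token window. A sketch of that windowing over a pre-tokenized list (`window_tokens` is an illustrative helper, not the pipeline's actual splitter):

```python
from typing import List

def window_tokens(tokens: List[str], target: int = 500,
                  overlap: int = 100) -> List[List[str]]:
    """Slide a window of `target` tokens, stepping target - overlap each time."""
    if target <= overlap:
        raise ValueError("target must be larger than overlap")
    step = target - overlap
    windows = []
    for start in range(0, len(tokens), step):
        windows.append(tokens[start:start + target])
        if start + target >= len(tokens):
            break  # last window already reaches the end
    return windows
```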

Deduplication

Prevent duplicate chunks from multiple sources:

class ChunkDeduplicator:
    """Deduplicate chunks using content hashing and entity matching."""
    
    def __init__(self):
        self.seen_hashes = set()
        self.entity_chunks = {}  # entity_name → best chunk
    
    def deduplicate(self, chunks: List[Chunk]) -> List[Chunk]:
        unique_chunks = []
        
        for chunk in chunks:
            # Content hash deduplication
            content_hash = hashlib.md5(chunk.text.encode("utf-8")).hexdigest()
            if content_hash in self.seen_hashes:
                continue
            self.seen_hashes.add(content_hash)
            
            # Entity-based deduplication: keep the highest-tier chunk per entity.
            # Chunks that mention no entities (e.g. CSV rows) are always kept.
            keep = True
            for entity in chunk.metadata.get("mentioned_entities", []):
                existing = self.entity_chunks.get(entity)
                if existing is None:
                    self.entity_chunks[entity] = chunk
                elif self._tier_priority(chunk) > self._tier_priority(existing):
                    # Higher-tier source: replace the previously kept chunk
                    if existing in unique_chunks:
                        unique_chunks.remove(existing)
                    self.entity_chunks[entity] = chunk
                else:
                    # An equal- or higher-tier chunk already covers this entity
                    keep = False
            
            if keep:
                unique_chunks.append(chunk)
        
        return unique_chunks
    
    def _tier_priority(self, chunk: Chunk) -> int:
        tier = chunk.metadata.get("data_tier", "TIER_4_INFERRED")
        return {
            "TIER_1_AUTHORITATIVE": 4,
            "TIER_2_VERIFIED": 3,
            "TIER_3_CROWD_SOURCED": 2,
            "TIER_4_INFERRED": 1,
        }.get(tier, 0)

Implementation

Full Chunking Pipeline

class HeritageChunkingPipeline:
    """Complete chunking pipeline for heritage documents."""
    
    def __init__(self, schema_path: str, entity_extractor):
        self.schema = load_linkml_schema(schema_path)
        self.conversation_chunker = ConversationChunker(entity_extractor)
        self.website_chunker = WebsiteChunker()
        self.csv_chunker = CSVRegistryChunker()
        self.yaml_chunker = YAMLInstanceChunker()
        self.embedder = HeritageEmbedder()
        self.deduplicator = ChunkDeduplicator()
    
    def process(self, source_path: str, source_type: str, **source_kwargs) -> List[Chunk]:
        # Route to the appropriate chunker; website and CSV sources take
        # extra context (source_url, registry_type) via source_kwargs
        if source_type == "conversation":
            with open(source_path) as f:
                chunks = self.conversation_chunker.chunk(json.load(f))
        elif source_type == "website":
            chunks = self.website_chunker.chunk(source_path, source_kwargs["source_url"])
        elif source_type == "csv":
            chunks = self.csv_chunker.chunk(source_path, source_kwargs["registry_type"])
        elif source_type == "yaml":
            chunks = self.yaml_chunker.chunk(source_path)
        else:
            raise ValueError(f"Unknown source type: {source_type}")
        
        # Deduplicate
        chunks = self.deduplicator.deduplicate(chunks)
        
        # Generate embeddings
        for chunk in chunks:
            chunk.embedding = self.embedder.embed_passage(
                chunk.text, 
                ChunkMetadata(**chunk.metadata)
            )
        
        return chunks