16 KiB
16 KiB
Schema-Aware Chunking Strategy
Overview
Traditional chunking strategies (fixed-size, recursive split) lose ontological context. This document describes a schema-aware chunking strategy that respects the Heritage Custodian ontology structure.
Chunking Hierarchy
Document
│
├── Custodian Chunks (entity boundaries)
│ ├── Name mentions
│ ├── Type indicators
│ └── Description blocks
│
├── Observation Chunks (evidence units)
│ ├── Source attribution
│ ├── Claim statements
│ └── XPath pointers
│
├── Collection Chunks (holdings descriptions)
│ ├── Collection names
│ ├── Subject areas
│ └── Extent information
│
├── Relationship Chunks (inter-entity links)
│ ├── Membership statements
│ ├── Collaboration mentions
│ └── Historical events
│
└── Context Chunks (background information)
├── Country/region context
├── Domain background
└── Temporal context
Chunking Strategies by Source Type
1. Conversation JSON Chunking
Conversations are chunked by message boundaries with entity co-reference:
class ConversationChunker:
    """Chunk conversation JSON while keeping entity mentions linked across turns."""

    def __init__(self, entity_extractor):
        self.entity_extractor = entity_extractor
        self.coref_resolver = EntityCorefResolver()

    def chunk(self, conversation: dict) -> List[Chunk]:
        """Emit one chunk per sufficiently long message, tagged with entity metadata.

        Entity mentions are resolved against all earlier messages so that
        co-references ("the museum", "it") map back to the same entity name.
        """
        mentions_so_far = {}  # entity name -> resolved entity, across the whole conversation
        result = []
        for index, msg in enumerate(conversation["chat_messages"]):
            body = msg["text"]
            # Very short messages carry too little context to be useful chunks.
            if len(body) < 100:
                continue
            found = self.entity_extractor(body)
            for ent in found:
                resolved = self.coref_resolver.resolve(ent, mentions_so_far)
                mentions_so_far[resolved.name] = resolved
            result.append(
                Chunk(
                    text=body,
                    metadata={
                        "source_type": "conversation",
                        "conversation_id": conversation["uuid"],
                        "message_index": index,
                        "sender": msg["sender"],
                        "mentioned_entities": [ent.name for ent in found],
                        "entity_types": list({ent.custodian_type for ent in found}),
                    },
                )
            )
        return result
Chunk boundaries:
- Message boundaries (natural conversation turns)
- Long messages split at paragraph breaks
- Entity mention continuity preserved
Metadata added:
- conversation_id: Source conversation UUID
- message_index: Position in conversation
- sender: human/assistant
- mentioned_entities: Entity names in chunk
- entity_types: GLAMORCUBESFIXPHDNT types
2. Website HTML Chunking
Websites are chunked by semantic sections detected via Docling:
class WebsiteChunker:
    """Chunk website HTML using Docling structure analysis.

    Each content-bearing section becomes one chunk carrying its classified
    section type and an XPath pointer for provenance.
    """

    # Boilerplate section types that carry no custodian content.
    SKIPPED_SECTION_TYPES = ("navigation", "footer", "sidebar")

    def __init__(self):
        self.docling = DocumentConverter()
        self.section_classifier = SectionClassifier()

    def chunk(self, html_path: str, source_url: str) -> "List[Chunk]":
        """Convert *html_path* and emit one chunk per retained section.

        Args:
            html_path: Local path of the fetched HTML document.
            source_url: Original URL, recorded in chunk metadata for provenance.
        """
        # Convert to structured document
        doc = self.docling.convert(html_path)
        chunks = []
        for section in doc.document.sections:
            # Classify section type
            section_type = self.section_classifier.classify(section)
            # Menus, footers and sidebars add retrieval noise — drop them.
            if section_type in self.SKIPPED_SECTION_TYPES:
                continue
            chunks.append(Chunk(
                text=section.text,
                metadata={
                    "source_type": "website",
                    "source_url": source_url,
                    "section_type": section_type,  # about, contact, collection, etc.
                    "xpath": self._get_xpath(section),
                    "has_images": section.has_pictures,
                    "html_tag": section.original_tag,
                },
            ))
        return chunks

    def _get_xpath(self, section) -> str:
        """Generate an XPath for *section*.

        TODO: real implementation depends on the Docling document structure.
        Returns an empty string (instead of the previous implicit None) so
        the metadata value always matches the declared str return type.
        """
        return ""
Section types detected:
- about: Organization description (maps to Custodian.description)
- contact: Contact information (maps to CustodianPlace)
- collection: Collection descriptions (maps to CustodianCollection)
- visit: Visitor information
- news: Recent updates
- history: Historical information (maps to ChangeEvent)
3. CSV Registry Chunking
CSV rows become individual chunks with schema mapping:
class CSVRegistryChunker:
    """Chunk CSV registry data with schema mapping.

    Each row that yields at least one mapped field becomes one chunk whose
    text is a "slot: value" listing and whose metadata mirrors those slots.
    Registry CSVs are authoritative, so chunks are marked TIER_1.
    """

    # Registry column header → LinkML slot name.
    SCHEMA_MAPPINGS = {
        # ISIL registry columns → LinkML slots
        "Instelling": "preferred_label",
        "Plaats": "settlement",
        "ISIL code": "isil_code",
        "Toegekend op": "registration_date",
        "Opmerking": "provenance_note",
        # Dutch organizations CSV columns
        "naam_organisatie": "preferred_label",
        "postadres_plaats": "settlement",
        "ISIL-code": "isil_code",
        "KvK-nummer": "registration_number",
        "Collectie Nederland": "aggregator_membership",
    }

    def chunk(self, csv_path: str, registry_type: str) -> "List[Chunk]":
        """Read *csv_path* and emit one chunk per row with mapped data.

        Args:
            csv_path: Path to the registry CSV file.
            registry_type: Label identifying which registry this file is,
                stored verbatim in chunk metadata.
        """
        # utf-8-sig strips a BOM if the file was exported from Excel.
        df = pd.read_csv(csv_path, encoding='utf-8-sig')
        chunks = []
        for idx, row in df.iterrows():
            text_parts = []
            metadata = {
                "source_type": "csv_registry",
                "registry_type": registry_type,
                "row_index": idx,
                "data_tier": "TIER_1_AUTHORITATIVE",
            }
            for col, slot in self.SCHEMA_MAPPINGS.items():
                if col in row and pd.notna(row[col]):
                    value = str(row[col])
                    text_parts.append(f"{slot}: {value}")
                    metadata[slot] = value
            # Rows with no mapped columns would produce empty, unsearchable
            # chunks — skip them instead of polluting the index.
            if not text_parts:
                continue
            chunks.append(Chunk(
                text="\n".join(text_parts),
                metadata=metadata,
            ))
        return chunks
CSV sources:
- ISIL registry (ISIL-codes_2025-08-01.csv)
- Dutch organizations (voorbeeld_lijst_organisaties_en_diensten-totaallijst_nederland.csv)
- Country-specific registries
4. YAML Instance Chunking
LinkML instance files are chunked by entity:
class YAMLInstanceChunker:
    """Chunk YAML instance files by entity.

    Each LinkML instance (one mapping per entity) becomes exactly one chunk
    whose text is a human-readable field listing.
    """

    def chunk(self, yaml_path: str) -> "List[Chunk]":
        """Load *yaml_path* and emit one chunk per entity.

        Handles a file containing either a single entity mapping or a list
        of mappings; an empty file yields no chunks.
        """
        with open(yaml_path) as f:
            data = yaml.safe_load(f)
        # safe_load returns None for an empty document — nothing to chunk.
        if data is None:
            return []
        # Handle single entity or list
        entities = data if isinstance(data, list) else [data]
        chunks = []
        for entity in entities:
            chunks.append(Chunk(
                text=self._entity_to_text(entity),
                metadata={
                    "source_type": "yaml_instance",
                    "yaml_path": yaml_path,
                    "hc_id": entity.get("hc_id"),
                    "ghcid": entity.get("ghcid"),
                    "custodian_type": entity.get("custodian_type"),
                    "country_code": self._extract_country(entity),
                },
            ))
        return chunks

    def _entity_to_text(self, entity: dict) -> str:
        """Convert an entity dict into searchable plain text, one field per line."""
        parts = []
        # Core identity
        if "preferred_label" in entity:
            parts.append(f"Name: {entity['preferred_label']}")
        if "description" in entity:
            parts.append(f"Description: {entity['description']}")
        # Location
        if "custodian_place" in entity:
            place = entity["custodian_place"]
            parts.append(f"Location: {place.get('settlement', '')} {place.get('country', '')}")
        # Collections
        for coll in entity.get("collections", []):
            parts.append(f"Collection: {coll.get('collection_name', '')}")
        # Identifiers: tolerate incomplete records instead of raising KeyError
        # (the original indexed identifier_scheme/identifier_value directly).
        for ident in entity.get("identifiers", []):
            scheme = ident.get("identifier_scheme")
            value = ident.get("identifier_value")
            if scheme and value:
                parts.append(f"{scheme}: {value}")
        return "\n".join(parts)

    def _extract_country(self, entity: dict):
        """Best-effort ISO country code for *entity*; None when absent.

        This helper was called but never defined in the original document.
        It checks a top-level country_code slot first, then the nested
        custodian_place record — TODO confirm against the LinkML schema.
        """
        code = entity.get("country_code")
        if code:
            return code
        place = entity.get("custodian_place")
        if isinstance(place, dict):
            return place.get("country")
        return None
Chunk Metadata Schema
All chunks share a common metadata schema for consistent retrieval:
class ChunkMetadata(BaseModel):
    """Universal chunk metadata schema shared by all chunkers.

    A common shape lets retrieval filters (type, country, tier) work
    uniformly across source types. NOTE(review): pydantic copies mutable
    [] defaults per instance, so the shared-default pitfall does not apply.
    """
    # Source identification
    source_type: Literal["conversation", "website", "csv_registry", "yaml_instance", "wikidata"]
    source_id: str # Unique source identifier
    # Ontology mapping
    primary_class: Optional[str] = None # LinkML class name
    mentioned_classes: List[str] = []
    # Entity information
    custodian_type: Optional[str] = None # GLAMORCUBESFIXPHDNT code
    mentioned_entities: List[str] = []
    ghcid: Optional[str] = None
    # Geographic scope
    country_code: Optional[str] = None # ISO 3166-1 alpha-2
    region_code: Optional[str] = None
    settlement: Optional[str] = None
    # Provenance — defaults to the lowest-confidence tier unless a chunker overrides it
    data_tier: str = "TIER_4_INFERRED"
    extraction_date: str  # required: no default, so every caller must supply it
    confidence_score: float = 0.5
    # Retrieval hints
    language: str = "en"
    has_identifiers: bool = False
    has_collections: bool = False
    has_relationships: bool = False
Embedding Strategy
Multilingual Embeddings
Use BGE-M3 for multilingual support (NL, EN, DE, FR, ES, PT, etc.):
from sentence_transformers import SentenceTransformer
class HeritageEmbedder:
    """Domain-aware embedding generator built on the BGE-M3 model."""

    def __init__(self):
        self.model = SentenceTransformer("BAAI/bge-m3")
        # Heritage domain prefixes for query/passage distinction
        self.QUERY_PREFIX = "Represent this heritage institution query: "
        self.PASSAGE_PREFIX = "Represent this heritage institution text: "

    def embed_passage(self, text: str, metadata: ChunkMetadata) -> List[float]:
        """Embed a passage, prepending type/country tags for discrimination."""
        tags = []
        if metadata.custodian_type:
            label = CUSTODIAN_TYPE_LABELS.get(metadata.custodian_type, "")
            tags.append(f"[{label}] ")
        if metadata.country_code:
            tags.append(f"[{metadata.country_code}] ")
        prefixed = self.PASSAGE_PREFIX + "".join(tags) + text
        return self.model.encode(prefixed).tolist()

    def embed_query(self, query: str, type_filter: str = None) -> List[float]:
        """Embed a query, optionally scoped to one custodian type."""
        tag = ""
        if type_filter:
            label = CUSTODIAN_TYPE_LABELS.get(type_filter, "")
            tag = f"[{label}] "
        prefixed = self.QUERY_PREFIX + tag + query
        return self.model.encode(prefixed).tolist()
Hybrid Embeddings
Combine dense and sparse embeddings for better retrieval:
class HybridEmbedder:
    """Pair a dense vector with a sparse one for hybrid retrieval."""

    def __init__(self):
        self.dense_model = SentenceTransformer("BAAI/bge-m3")
        self.sparse_model = BM25Encoder()  # Or SPLADE

    def embed(self, text: str, metadata: ChunkMetadata) -> dict:
        """Return dense and sparse representations plus the chunk metadata."""
        dense_vector = self.dense_model.encode(text).tolist()
        sparse_vector = self.sparse_model.encode(text)
        return {
            "dense": dense_vector,
            "sparse": sparse_vector,
            "metadata": metadata.dict(),
        }
Chunk Size Guidelines
| Source Type | Target Size | Max Size | Overlap |
|---|---|---|---|
| Conversation | 500 tokens | 1000 tokens | 100 tokens |
| Website | 300 tokens | 800 tokens | 50 tokens |
| CSV Registry | 1 row | 1 row | None |
| YAML Instance | 1 entity | 1 entity | None |
Deduplication
Prevent duplicate chunks from multiple sources:
class ChunkDeduplicator:
    """Deduplicate chunks using content hashing and entity matching.

    Two checks run in a single sweep:
      1. exact-content dedup via an MD5 digest of the chunk text (MD5 is
         used only as a dedup key, not for security);
      2. per-entity dedup that keeps, for each mentioned entity, only the
         chunk from the highest-priority data tier.

    State (seen hashes, per-entity winners) persists across calls so one
    instance can deduplicate several batches consistently.
    """

    def __init__(self):
        self.seen_hashes = set()   # MD5 digests of chunk texts already emitted
        self.entity_chunks = {}    # entity_name → best chunk seen so far

    def deduplicate(self, chunks: "List[Chunk]") -> "List[Chunk]":
        """Return *chunks* with exact and entity-level duplicates removed.

        Fixes over the previous implementation:
        - a chunk mentioning several new entities is appended only once
          (it used to be appended once per new entity);
        - chunks with no mentioned_entities are kept (they used to be
          silently dropped after passing the content-hash check);
        - displacing a lower-tier chunk no longer raises ValueError when
          that chunk was emitted by an earlier call and is therefore not
          in the current output list.
        """
        unique_chunks = []
        for chunk in chunks:
            # Check 1: exact-content deduplication.
            content_hash = hashlib.md5(chunk.text.encode()).hexdigest()
            if content_hash in self.seen_hashes:
                continue
            self.seen_hashes.add(content_hash)

            # Check 2: entity-based deduplication (keep highest tier).
            entities = chunk.metadata.get("mentioned_entities", [])
            keep = not entities  # entity-free chunks survive on content hash alone
            for entity in entities:
                existing = self.entity_chunks.get(entity)
                if existing is None or self._tier_priority(chunk) > self._tier_priority(existing):
                    # Displace the lower-tier chunk if it is part of this batch.
                    if existing is not None and existing in unique_chunks:
                        unique_chunks.remove(existing)
                    self.entity_chunks[entity] = chunk
                    keep = True
            if keep:
                unique_chunks.append(chunk)
        return unique_chunks

    def _tier_priority(self, chunk: "Chunk") -> int:
        """Map a chunk's data tier to a numeric priority (higher wins; unknown → 0)."""
        tier = chunk.metadata.get("data_tier", "TIER_4_INFERRED")
        return {
            "TIER_1_AUTHORITATIVE": 4,
            "TIER_2_VERIFIED": 3,
            "TIER_3_CROWD_SOURCED": 2,
            "TIER_4_INFERRED": 1,
        }.get(tier, 0)
Implementation
Full Chunking Pipeline
class HeritageChunkingPipeline:
    """Complete chunking pipeline for heritage documents.

    Routes each source file to the matching chunker, deduplicates the
    resulting chunks, and attaches passage embeddings.
    """

    def __init__(self, schema_path: str):
        self.schema = load_linkml_schema(schema_path)
        # NOTE(review): `entity_extractor` is a free name not defined in this
        # document — confirm where it is supplied (module global? injected?).
        self.conversation_chunker = ConversationChunker(entity_extractor)
        self.website_chunker = WebsiteChunker()
        self.csv_chunker = CSVRegistryChunker()
        self.yaml_chunker = YAMLInstanceChunker()
        self.embedder = HeritageEmbedder()
        self.deduplicator = ChunkDeduplicator()

    def process(self, source_path: str, source_type: str) -> "List[Chunk]":
        """Chunk, deduplicate, and embed one source file.

        Args:
            source_path: Path to the source file.
            source_type: One of "conversation", "website", "csv", "yaml".

        Raises:
            ValueError: If *source_type* is not recognized.
        """
        # Route to appropriate chunker
        if source_type == "conversation":
            # Close the JSON file promptly instead of leaking the handle.
            with open(source_path, encoding="utf-8") as f:
                chunks = self.conversation_chunker.chunk(json.load(f))
        elif source_type == "website":
            # WebsiteChunker.chunk requires a source_url (the original call
            # omitted it and would raise TypeError); the local path is the
            # best provenance value available here — TODO thread the real URL.
            chunks = self.website_chunker.chunk(source_path, source_url=source_path)
        elif source_type == "csv":
            # CSVRegistryChunker.chunk requires a registry_type (also omitted
            # originally); default to a generic label — TODO pass the real one.
            chunks = self.csv_chunker.chunk(source_path, registry_type="csv")
        elif source_type == "yaml":
            chunks = self.yaml_chunker.chunk(source_path)
        else:
            raise ValueError(f"Unknown source type: {source_type}")
        # Deduplicate
        chunks = self.deduplicator.deduplicate(chunks)
        # Generate embeddings
        for chunk in chunks:
            chunk.embedding = self.embedder.embed_passage(
                chunk.text,
                ChunkMetadata(**chunk.metadata),
            )
        return chunks