glam/scripts/enrich_kb_libraries_exa_provenance.py

1041 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Enrich KB Netherlands library entries with claim-level provenance tracking.
This script implements proper data provenance following the web_enrichment_provenance.yaml schema:
- Each factual claim has precise source references
- Character offsets in source markdown
- Markdown heading paths for structural context
- SHA-256 hashes for content verification
- Exa highlight indices when available
Usage:
python scripts/enrich_kb_libraries_exa_provenance.py [--dry-run] [--limit N] [--file FILENAME]
Schema: schemas/web_enrichment_provenance.yaml
"""
import os
import sys
import json
import yaml
import time
import hashlib
import re
import uuid
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field, asdict
import logging
import argparse
# Set up module-wide logging: timestamped INFO-level messages to stderr.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Paths
# NOTE(review): absolute, machine-specific paths — consider an env var or CLI flag.
ENTRIES_DIR = Path("/Users/kempersc/apps/glam/data/nde/enriched/entries")
REPORTS_DIR = Path("/Users/kempersc/apps/glam/reports")
# Rate limiting
REQUEST_DELAY = 1.5  # seconds between Exa requests (not referenced in the code visible here)
# =============================================================================
# DATA CLASSES (matching web_enrichment_provenance.yaml schema)
# =============================================================================
@dataclass
class SourceReference:
    """Precise reference to source text supporting a claim.

    Mirrors the SourceReference object of web_enrichment_provenance.yaml.
    """
    # ID of the WebSource this excerpt came from (see generate_source_id).
    source_id: str
    # Verbatim excerpt from the source markdown that supports the claim.
    text_excerpt: str
    # Character offsets of the excerpt within the source's raw_markdown;
    # both are -1 when the excerpt could not be located exactly.
    char_start: int
    char_end: int
    # Heading breadcrumb above the excerpt, e.g. "# H1 > ## H2 > ### H3".
    markdown_heading_path: Optional[str] = None
    # Approximate zero-based index of the sentence containing the excerpt.
    sentence_index: Optional[int] = None
    # Index into the owning WebSource.exa_highlights list, when applicable.
    exa_highlight_index: Optional[int] = None
    # Exa relevance score of the matching highlight, when applicable.
    relevance_score: Optional[float] = None
@dataclass
class Claim:
    """A single factual assertion extracted from web sources."""
    claim_id: str
    claim_type: str  # ClaimTypeEnum value (e.g. DESCRIPTIVE, SERVICE, CONTACT)
    # Dotted path of the target field in the enriched entry, e.g. "contact.phone".
    field_path: str
    value: Any
    value_type: str  # ValueTypeEnum value (e.g. STRING, LIST_STRING, OBJECT)
    # One or more precise pointers into source markdown backing this claim.
    source_references: List[SourceReference]
    # Extractor confidence in the claim, in [0.0, 1.0].
    confidence_score: float
    # Human-verification metadata; claims always start out unverified.
    verified: bool = False
    verified_by: Optional[str] = None
    verified_date: Optional[str] = None  # set when a human verifies the claim
    claim_notes: Optional[str] = None
@dataclass
class WebSource:
    """A web page fetched and used as source for claims."""
    # Stable ID derived from the URL hash (see generate_source_id).
    source_id: str
    url: str
    # ISO 8601 timestamp of when the page content was fetched.
    fetch_timestamp: str
    http_status: Optional[int] = None  # Exa does not report this; filled with 200
    content_type: Optional[str] = None
    title: Optional[str] = None
    author: Optional[str] = None
    published_date: Optional[str] = None
    # Full fetched page text as markdown (can be large; not always persisted).
    raw_markdown: Optional[str] = None
    # SHA-256 hex digest of raw_markdown, for content verification.
    raw_markdown_hash: Optional[str] = None
    # Exa highlight snippets and their parallel relevance scores.
    exa_highlights: List[str] = field(default_factory=list)
    exa_highlight_scores: List[float] = field(default_factory=list)
@dataclass
class WebEnrichment:
    """Container for all web-enriched data with full provenance tracking."""
    # Unique ID of this enrichment run (see generate_enrichment_id).
    enrichment_id: str
    search_query: str
    search_timestamp: str  # ISO 8601; also used as the fetch timestamp
    search_engine: str
    # Claims extracted across all sources in this run.
    claims: List[Claim]
    # The pages the claims were extracted from.
    raw_sources: List[WebSource]
    enrichment_status: str  # EnrichmentStatusEnum: SUCCESS / PARTIAL / NO_RESULTS
    enrichment_notes: Optional[str] = None
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def generate_enrichment_id() -> str:
    """Build a unique enrichment ID of the form enrich-YYYYMMDDTHHMMSS-xxxxxxxx.

    Combines a UTC timestamp with 8 hex chars of a random UUID so IDs
    sort chronologically while staying collision-resistant.
    """
    stamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')
    random_tail = uuid.uuid4().hex[:8]
    return "-".join(("enrich", stamp, random_tail))
def generate_source_id(url: str) -> str:
    """Derive a stable, deterministic source ID from the SHA-256 of *url*."""
    digest = hashlib.sha256(url.encode()).hexdigest()
    return "src-" + digest[:12]
def generate_claim_id(field_path: str, index: int = 1) -> str:
    """Turn a dotted field path into an ID like claim-<safe_path>-<index>.

    Lowercases the path and replaces everything outside [a-z0-9_] with '_'.
    """
    sanitized = re.sub(r'[^a-z0-9_]', '_', field_path.lower())
    return "-".join(["claim", sanitized, str(index)])
def compute_content_hash(content: str) -> str:
    """Return the hex SHA-256 digest of *content* (UTF-8 encoded)."""
    hasher = hashlib.sha256()
    hasher.update(content.encode('utf-8'))
    return hasher.hexdigest()
def find_text_position(haystack: str, needle: str) -> Tuple[int, int]:
    """Locate *needle* in *haystack*, returning (start, end) offsets.

    Tries an exact match first, then a case-insensitive one; returns
    (-1, -1) when the needle is absent under both comparisons.
    """
    begin = haystack.find(needle)
    if begin < 0:
        # Fall back to a case-insensitive scan.
        begin = haystack.lower().find(needle.lower())
    if begin < 0:
        return (-1, -1)
    return (begin, begin + len(needle))
def extract_markdown_heading_path(markdown: str, char_position: int) -> Optional[str]:
    """Return the heading breadcrumb covering *char_position*.

    Scans the lines preceding (and including) the line that starts at or
    before the position, maintaining a stack of open headings. Output has
    the form "# H1 > ## H2 > ### H3"; None for out-of-range positions or
    when no heading precedes the position.
    """
    if not (0 <= char_position < len(markdown)):
        return None
    stack: List[Tuple[int, str]] = []
    offset = 0  # character offset of the start of the current line
    for line in markdown.split('\n'):
        if offset > char_position:
            break
        hit = re.match(r'^(#{1,6})\s+(.+)$', line)
        if hit:
            depth = len(hit.group(1))
            # A new heading closes any sibling or deeper headings on the stack.
            stack = [entry for entry in stack if entry[0] < depth]
            stack.append((depth, hit.group(2).strip()))
        offset += len(line) + 1  # +1 accounts for the stripped newline
    if not stack:
        return None
    return " > ".join("#" * depth + " " + title for depth, title in stack)
def get_sentence_index(text: str, char_position: int) -> Optional[int]:
    """Approximate zero-based index of the sentence containing *char_position*.

    Sentences are delimited by '.', '!' or '?' followed by whitespace; only a
    window slightly past the position is split, so this stays cheap on long
    texts. Returns None for negative positions.
    """
    if char_position < 0:
        return None
    # Split a window just past the position into sentences.
    window = text[:char_position + 100]
    parts = re.split(r'(?<=[.!?])\s+', window)
    cursor = 0
    for idx, part in enumerate(parts):
        end = cursor + len(part)
        if char_position <= end:
            return idx
        cursor = end + 1
    return len(parts) - 1
# =============================================================================
# CLAIM EXTRACTION WITH PROVENANCE
# =============================================================================
class ClaimExtractor:
    """Extract claims from text with source references.

    One extractor works over a single WebSource's raw markdown and produces
    Claim objects whose SourceReferences point back into that markdown
    (character offsets, heading path, sentence index). All extract_* methods
    are regex-driven heuristics tuned to Dutch library websites.
    """
    def __init__(self, source: WebSource):
        self.source = source
        # Tolerate sources fetched without any content.
        self.markdown = source.raw_markdown or ""
        # NOTE(review): self.claims is never appended to within this class;
        # callers use the return value of extract_all_claims() instead.
        self.claims: List[Claim] = []
        # Per-field-path counters so repeated claims get distinct IDs.
        self.claim_counter: Dict[str, int] = {}
    def _next_claim_id(self, field_path: str) -> str:
        """Get the next sequential claim ID for a field path (…-1, …-2, …)."""
        if field_path not in self.claim_counter:
            self.claim_counter[field_path] = 0
        self.claim_counter[field_path] += 1
        return generate_claim_id(field_path, self.claim_counter[field_path])
    def _create_source_reference(
        self,
        excerpt: str,
        exa_highlight_index: Optional[int] = None,
        relevance_score: Optional[float] = None
    ) -> Optional[SourceReference]:
        """Create a source reference with character offsets.

        NOTE(review): despite the Optional annotation this always returns a
        SourceReference; when the excerpt cannot be located, an approximate
        reference with char offsets of -1 is returned instead of None.
        """
        char_start, char_end = find_text_position(self.markdown, excerpt)
        if char_start == -1:
            # Text not found - use approximate reference
            return SourceReference(
                source_id=self.source.source_id,
                # Parses as (excerpt[:200] + "...") if len > 200, else excerpt.
                text_excerpt=excerpt[:200] + "..." if len(excerpt) > 200 else excerpt,
                char_start=-1,
                char_end=-1,
                markdown_heading_path=None,
                sentence_index=None,
                exa_highlight_index=exa_highlight_index,
                relevance_score=relevance_score
            )
        heading_path = extract_markdown_heading_path(self.markdown, char_start)
        sentence_idx = get_sentence_index(self.markdown, char_start)
        return SourceReference(
            source_id=self.source.source_id,
            text_excerpt=excerpt[:500] if len(excerpt) > 500 else excerpt,
            char_start=char_start,
            char_end=char_end,
            markdown_heading_path=heading_path,
            sentence_index=sentence_idx,
            exa_highlight_index=exa_highlight_index,
            relevance_score=relevance_score
        )
    def extract_description(self) -> Optional[Claim]:
        """Extract a description claim from the first meaningful paragraph."""
        # Look for first substantial paragraph (>100 chars, not navigation)
        paragraphs = re.split(r'\n\n+', self.markdown)
        for para in paragraphs:
            # Skip short paragraphs, navigation, lists
            if len(para) < 100:
                continue
            if para.strip().startswith(('- ', '* ', '|', '#')):
                continue
            if 'skip to' in para.lower() or 'jump to' in para.lower():
                continue
            # Found a good paragraph; collapse internal whitespace.
            clean_para = re.sub(r'\s+', ' ', para).strip()
            if len(clean_para) > 50:
                excerpt = clean_para[:500]
                ref = self._create_source_reference(excerpt)
                if ref:
                    return Claim(
                        claim_id=self._next_claim_id("description"),
                        claim_type="DESCRIPTIVE",
                        field_path="description",
                        # Parses as (excerpt + "...") if len(clean_para) > 500, else clean_para.
                        value=excerpt + "..." if len(clean_para) > 500 else clean_para,
                        value_type="STRING",
                        source_references=[ref],
                        confidence_score=0.8,
                        verified=False
                    )
        return None
    def extract_green_library_features(self) -> List[Claim]:
        """Extract claims about 'green library' features (specific to the Schiedam case)."""
        claims = []
        text_lower = self.markdown.lower()
        # Only bother with the detailed patterns if the page mentions the concept.
        if "'green' library" in text_lower or "green library" in text_lower:
            # (pattern, target field path) pairs; first match per pattern wins.
            patterns = [
                (r"first 'green' library in the Netherlands", "notable_features.green_library.distinction"),
                (r"first green library", "notable_features.green_library.distinction"),
                (r"trees? (?:that )?(?:are |weigh(?:ing)? )?(\d+)[- ]to[- ](\d+)\s*(?:kilos?|kg)", "notable_features.green_library.tree_weights"),
                (r"(\d+)[- ](?:to|-)[- ](\d+)\s*met(?:er|re)s?\s*high", "notable_features.green_library.tree_heights"),
                (r"large trees?\s+(\d+)[- ](?:to|-)[- ](\d+)\s*met", "notable_features.green_library.tree_heights"),
            ]
            for pattern, field_path in patterns:
                match = re.search(pattern, self.markdown, re.IGNORECASE)
                if match:
                    excerpt = match.group(0)
                    ref = self._create_source_reference(excerpt)
                    if ref:
                        # Normalize numeric ranges; otherwise store the raw match.
                        if "weights" in field_path:
                            value = f"{match.group(1)}-{match.group(2)} kg"
                        elif "heights" in field_path:
                            value = f"{match.group(1)}-{match.group(2)} meters"
                        else:
                            value = excerpt
                        claims.append(Claim(
                            claim_id=self._next_claim_id(field_path),
                            claim_type="ARCHITECTURAL",
                            field_path=field_path,
                            value=value,
                            value_type="STRING",
                            source_references=[ref],
                            confidence_score=0.9,
                            verified=False
                        ))
        return claims
    def extract_sustainability_features(self) -> List[Claim]:
        """Extract sustainability/green design features."""
        claims = []
        # (pattern, field path, canonical value) — the canonical value is stored,
        # while the raw match is only used for the source reference.
        sustainability_patterns = [
            (r"recycled bookcases?.*?cardboard", "notable_features.sustainability", "Recycled bookcases made from industrial cardboard"),
            (r"LED lighting", "notable_features.sustainability", "LED lighting"),
            (r"climate control.*?planters?", "notable_features.sustainability", "Climate control system in planters"),
            (r"chairs?.*?PET bottles?", "notable_features.sustainability", "Chairs made from recycled PET bottles"),
        ]
        for pattern, field_path, default_value in sustainability_patterns:
            match = re.search(pattern, self.markdown, re.IGNORECASE)
            if match:
                excerpt = match.group(0)
                ref = self._create_source_reference(excerpt)
                if ref:
                    claims.append(Claim(
                        claim_id=self._next_claim_id(field_path),
                        claim_type="ARCHITECTURAL",
                        field_path=field_path,
                        value=default_value,
                        value_type="STRING",
                        source_references=[ref],
                        confidence_score=0.85,
                        verified=False
                    ))
        return claims
    def extract_tree_species(self) -> List[Claim]:
        """Extract tree species mentions."""
        claims = []
        # Pattern for tree species (Bucida, Tamarinde, Ficus, etc.)
        # NOTE(review): "Ficus" precedes "Ficus benjamina" in the alternation,
        # so the full species name can never match — reorder if that matters.
        species_pattern = r'\b(Bucida|Tamarinde?|Ficus|Ficus benjamina)\b'
        matches = list(re.finditer(species_pattern, self.markdown, re.IGNORECASE))
        if matches:
            # Deduplicate and title-case; set order makes list order non-deterministic.
            species_list = list(set(m.group(1).title() for m in matches))
            # Use first match for source reference
            first_match = matches[0]
            # Get surrounding context (±20 chars) for the excerpt.
            start = max(0, first_match.start() - 20)
            end = min(len(self.markdown), first_match.end() + 20)
            excerpt = self.markdown[start:end]
            ref = self._create_source_reference(excerpt)
            if ref:
                claims.append(Claim(
                    claim_id=self._next_claim_id("notable_features.green_library.tree_species"),
                    claim_type="DESCRIPTIVE",
                    field_path="notable_features.green_library.tree_species",
                    value=species_list,
                    value_type="LIST_STRING",
                    source_references=[ref],
                    confidence_score=0.95,
                    verified=False
                ))
        return claims
    def extract_accessibility_features(self) -> List[Claim]:
        """Extract accessibility information."""
        claims = []
        # (pattern, field path, canonical value) — same scheme as sustainability.
        accessibility_patterns = [
            (r"wheelchair", "notable_features.accessibility", "Wheelchair accessible"),
            (r"ramps?.*?disabled access", "notable_features.accessibility", "Ramps for disabled access"),
            (r"lift|elevator", "notable_features.accessibility", "Lift/elevator available"),
            (r"wheelchairs? available", "notable_features.accessibility", "Wheelchairs available for use"),
        ]
        for pattern, field_path, default_value in accessibility_patterns:
            match = re.search(pattern, self.markdown, re.IGNORECASE)
            if match:
                excerpt = match.group(0)
                ref = self._create_source_reference(excerpt)
                if ref:
                    claims.append(Claim(
                        claim_id=self._next_claim_id(field_path),
                        claim_type="SERVICE",
                        field_path=field_path,
                        value=default_value,
                        value_type="STRING",
                        source_references=[ref],
                        confidence_score=0.85,
                        verified=False
                    ))
        return claims
    def extract_historic_building_info(self) -> List[Claim]:
        """Extract historic building information (architect, monument status)."""
        claims = []
        # Architect pattern: "designed by Name" or "architect: Name"
        # (case-sensitive — expects capitalized proper names).
        architect_match = re.search(
            r'(?:designed by|architect[:\s]+)([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)',
            self.markdown
        )
        if architect_match:
            ref = self._create_source_reference(architect_match.group(0))
            if ref:
                claims.append(Claim(
                    claim_id=self._next_claim_id("historic_building.architect"),
                    claim_type="DESCRIPTIVE",
                    field_path="historic_building.architect",
                    value=architect_match.group(1),
                    value_type="STRING",
                    source_references=[ref],
                    confidence_score=0.85,
                    verified=False
                ))
        # Monument status ("monument" also matches Dutch "rijksmonument" contexts)
        if re.search(r'\bmonument\b', self.markdown, re.IGNORECASE):
            match = re.search(r'monument', self.markdown, re.IGNORECASE)
            if match:
                # Get surrounding context (±30 chars) for the excerpt.
                start = max(0, match.start() - 30)
                end = min(len(self.markdown), match.end() + 30)
                excerpt = self.markdown[start:end]
                ref = self._create_source_reference(excerpt)
                if ref:
                    claims.append(Claim(
                        claim_id=self._next_claim_id("historic_building.status"),
                        claim_type="DESCRIPTIVE",
                        field_path="historic_building.status",
                        value="Monument",
                        value_type="STRING",
                        source_references=[ref],
                        confidence_score=0.8,
                        verified=False
                    ))
        return claims
    def extract_services(self) -> List[Claim]:
        """Extract services offered by the library (Dutch and English keywords)."""
        claims = []
        # Canonical service name -> detection regex.
        service_patterns = {
            "Large book collection": r'large (?:collection of )?books?|book collection',
            "DVDs and Blu-rays": r'DVDs?|Blu-?rays?',
            "Comics collection": r'comics?|graphic novels?',
            "Study spaces": r'study (?:spaces?|room)|reading (?:room|table)|leestafel',
            "Free WiFi": r'free Wi-?Fi|gratis (?:wifi|internet)|Wifi',
            "Coffee service": r'roasted coffee|koffie|coffee',
            "DigiTaalhuis": r'DigiTaalhuis|digitaalhuis',
            "Digicafé": r'digicaf[eé]',
            "Taalcafé": r'taalcaf[eé]',
            "Tax filing help": r'invulhulp|belastingaangifte',
            "Digital skills training": r'klik\s*[&+]\s*tik|digitale vaardigheden',
            "Internet access": r'internettoegang',
            "Interlibrary loan": r'interbibliothecair|boeken afhalen',
        }
        services_found = []
        refs_found = []
        for service, pattern in service_patterns.items():
            match = re.search(pattern, self.markdown, re.IGNORECASE)
            if match:
                services_found.append(service)
                ref = self._create_source_reference(match.group(0))
                if ref:
                    refs_found.append(ref)
        # All detected services become one LIST_STRING claim.
        if services_found and refs_found:
            claims.append(Claim(
                claim_id=self._next_claim_id("services"),
                claim_type="SERVICE",
                field_path="services",
                value=services_found,
                value_type="LIST_STRING",
                source_references=refs_found[:3],  # Limit to 3 references
                confidence_score=0.8,
                verified=False
            ))
        return claims
    def extract_contact_info(self) -> List[Claim]:
        """Extract contact information (phone, email, address)."""
        claims = []
        # Phone numbers (Dutch format: 0xx-xxx-xxxx or +31 variants)
        phone_match = re.search(r'\b(0\d{2,3}[\s-]?\d{3}[\s-]?\d{3,4}|\+31\s?\d{2,3}[\s-]?\d{3}[\s-]?\d{3,4})\b', self.markdown)
        if phone_match:
            ref = self._create_source_reference(phone_match.group(0))
            if ref:
                claims.append(Claim(
                    claim_id=self._next_claim_id("contact.phone"),
                    claim_type="CONTACT",
                    field_path="contact.phone",
                    value=phone_match.group(1),
                    value_type="STRING",
                    source_references=[ref],
                    confidence_score=0.9,
                    verified=False
                ))
        # Email addresses
        # NOTE(review): the TLD class [A-Z|a-z] also matches a literal '|';
        # [A-Za-z] was probably intended.
        email_match = re.search(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', self.markdown)
        if email_match:
            ref = self._create_source_reference(email_match.group(0))
            if ref:
                claims.append(Claim(
                    claim_id=self._next_claim_id("contact.email"),
                    claim_type="CONTACT",
                    field_path="contact.email",
                    value=email_match.group(0),
                    value_type="STRING",
                    source_references=[ref],
                    confidence_score=0.9,
                    verified=False
                ))
        # Dutch postal addresses: "<Street><number>, <1234 AB> <City>"
        address_match = re.search(r'([A-Z][a-z]+(?:straat|weg|plein|laan)\s+\d+)\s*(?:,\s*)?\n?(\d{4}\s*[A-Z]{2})\s+([A-Z][a-z]+)', self.markdown)
        if address_match:
            full_address = f"{address_match.group(1)}, {address_match.group(2)} {address_match.group(3)}"
            ref = self._create_source_reference(address_match.group(0))
            if ref:
                claims.append(Claim(
                    claim_id=self._next_claim_id("contact.address"),
                    claim_type="GEOGRAPHIC",
                    field_path="contact.address",
                    value=full_address,
                    value_type="STRING",
                    source_references=[ref],
                    confidence_score=0.85,
                    verified=False
                ))
        return claims
    def extract_organizational_info(self) -> List[Claim]:
        """Extract organizational structure information."""
        claims = []
        # Legal entity names (Stichting = Dutch foundation)
        stichting_match = re.search(r'Stichting\s+(?:de\s+)?([A-Z][a-zA-Z\s]+?)(?:\s+is|\s+in|\.|\,)', self.markdown)
        if stichting_match:
            ref = self._create_source_reference(stichting_match.group(0))
            if ref:
                claims.append(Claim(
                    claim_id=self._next_claim_id("organization.legal_name"),
                    claim_type="ORGANIZATIONAL",
                    field_path="organization.legal_name",
                    value=f"Stichting {stichting_match.group(1).strip()}",
                    value_type="STRING",
                    source_references=[ref],
                    confidence_score=0.9,
                    verified=False
                ))
        # Fusion/merger info ("fusie" = merger, "ontstaan uit" = originated from)
        fusie_match = re.search(r'(?:fusie|ontstaan uit)\s+(?:tussen\s+)?(?:de\s+)?(.+?)\s+en\s+(.+?)(?:\s+en|\s+is|\.|,)', self.markdown, re.IGNORECASE)
        if fusie_match:
            ref = self._create_source_reference(fusie_match.group(0))
            if ref:
                claims.append(Claim(
                    claim_id=self._next_claim_id("organization.fusion_components"),
                    claim_type="ORGANIZATIONAL",
                    field_path="organization.fusion_components",
                    value=[fusie_match.group(1).strip(), fusie_match.group(2).strip()],
                    value_type="LIST_STRING",
                    source_references=[ref],
                    confidence_score=0.85,
                    verified=False
                ))
        # Municipalities served ("werkzaam in" = active in, "gemeenten" = municipalities)
        gemeenten_match = re.search(r'(?:werkzaam in|gemeenten?)\s+(.+?(?:,\s*.+?)*\s+en\s+[A-Z][a-z]+)', self.markdown, re.IGNORECASE)
        if gemeenten_match:
            municipalities_text = gemeenten_match.group(1)
            # Parse comma-separated list with "en" ("and") joining the last item.
            municipalities = re.split(r',\s*|\s+en\s+', municipalities_text)
            municipalities = [m.strip() for m in municipalities if m.strip()]
            ref = self._create_source_reference(gemeenten_match.group(0))
            if ref and municipalities:
                claims.append(Claim(
                    claim_id=self._next_claim_id("organization.municipalities_served"),
                    claim_type="GEOGRAPHIC",
                    field_path="organization.municipalities_served",
                    value=municipalities,
                    value_type="LIST_STRING",
                    source_references=[ref],
                    confidence_score=0.85,
                    verified=False
                ))
        return claims
    def extract_opening_hours(self) -> List[Claim]:
        """Extract opening hours information from Dutch day-name/time patterns."""
        claims = []
        # e.g. "maandag: 10.00 - 17.30" — accepts ':' or '.' as time separator.
        days_pattern = r'(maandag|dinsdag|woensdag|donderdag|vrijdag|zaterdag|zondag)[:\s]+(\d{1,2}[:.]\d{2})\s*[-]\s*(\d{1,2}[:.]\d{2})'
        matches = list(re.finditer(days_pattern, self.markdown, re.IGNORECASE))
        if matches:
            hours = {}
            refs = []
            for match in matches:
                day = match.group(1).lower()
                # Later matches for the same day overwrite earlier ones.
                hours[day] = f"{match.group(2)} - {match.group(3)}"
                ref = self._create_source_reference(match.group(0))
                if ref:
                    refs.append(ref)
            if hours and refs:
                claims.append(Claim(
                    claim_id=self._next_claim_id("opening_hours"),
                    claim_type="TEMPORAL",
                    field_path="opening_hours",
                    value=hours,
                    value_type="OBJECT",
                    source_references=refs[:3],
                    confidence_score=0.9,
                    verified=False
                ))
        return claims
    def extract_from_exa_highlights(self) -> List[Claim]:
        """Extract claims specifically from Exa highlights (score >= 0.7 only)."""
        claims = []
        for i, highlight in enumerate(self.source.exa_highlights):
            # Scores list may be shorter than highlights; default to 0.0.
            score = self.source.exa_highlight_scores[i] if i < len(self.source.exa_highlight_scores) else 0.0
            # Find position of highlight in markdown
            ref = self._create_source_reference(highlight, exa_highlight_index=i, relevance_score=score)
            if ref and score >= 0.7:  # Only high-relevance highlights
                # Create a descriptive claim from the highlight
                claims.append(Claim(
                    claim_id=self._next_claim_id("exa_highlight"),
                    claim_type="DESCRIPTIVE",
                    field_path=f"highlights[{i}]",
                    value=highlight,
                    value_type="STRING",
                    source_references=[ref],
                    confidence_score=score,
                    verified=False,
                    claim_notes=f"Exa highlight with relevance score {score:.2f}"
                ))
        return claims
    def extract_all_claims(self) -> List[Claim]:
        """Run every extractor over the source and return the combined claims."""
        all_claims = []
        # Description (always try)
        desc_claim = self.extract_description()
        if desc_claim:
            all_claims.append(desc_claim)
        # Specific feature extractions
        all_claims.extend(self.extract_green_library_features())
        all_claims.extend(self.extract_sustainability_features())
        all_claims.extend(self.extract_tree_species())
        all_claims.extend(self.extract_accessibility_features())
        all_claims.extend(self.extract_historic_building_info())
        all_claims.extend(self.extract_services())
        # New extractors
        all_claims.extend(self.extract_contact_info())
        all_claims.extend(self.extract_organizational_info())
        all_claims.extend(self.extract_opening_hours())
        all_claims.extend(self.extract_from_exa_highlights())
        return all_claims
# =============================================================================
# EXA INTEGRATION (placeholder for MCP-based calls)
# =============================================================================
def process_exa_result(result: Dict[str, Any], fetch_timestamp: str) -> WebSource:
    """Wrap one raw Exa result dict in a WebSource, hashing the fetched markdown."""
    page_url = result.get('url', '')
    body = result.get('text', '')
    return WebSource(
        source_id=generate_source_id(page_url),
        url=page_url,
        fetch_timestamp=fetch_timestamp,
        http_status=200,  # Exa doesn't return this
        content_type="text/markdown",
        title=result.get('title'),
        author=result.get('author'),
        published_date=result.get('publishedDate'),
        raw_markdown=body,
        raw_markdown_hash=compute_content_hash(body) if body else None,
        exa_highlights=result.get('highlights', []),
        exa_highlight_scores=result.get('highlightScores', []),
    )
def create_web_enrichment_from_exa_results(
    results: List[Dict[str, Any]],
    search_query: str
) -> WebEnrichment:
    """Bundle Exa search results into a WebEnrichment with claim-level provenance.

    Every result becomes a WebSource; a ClaimExtractor is run over each one
    and all claims are merged into a single enrichment record.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    sources = [process_exa_result(item, stamp) for item in results]
    # Run the extractor over every fetched source.
    claims: List[Claim] = []
    for src in sources:
        claims.extend(ClaimExtractor(src).extract_all_claims())
    # Status: SUCCESS if anything was extracted, PARTIAL if only pages were
    # fetched, NO_RESULTS when the search returned nothing.
    if claims:
        status = "SUCCESS"
    else:
        status = "PARTIAL" if sources else "NO_RESULTS"
    return WebEnrichment(
        enrichment_id=generate_enrichment_id(),
        search_query=search_query,
        search_timestamp=stamp,
        search_engine="exa",
        claims=claims,
        raw_sources=sources,
        enrichment_status=status,
        enrichment_notes=f"Extracted {len(claims)} claims from {len(sources)} sources"
    )
# =============================================================================
# YAML CONVERSION
# =============================================================================
def source_reference_to_dict(ref: SourceReference) -> Dict[str, Any]:
    """Serialize a SourceReference for YAML, omitting unset optional fields."""
    payload: Dict[str, Any] = {
        'source_id': ref.source_id,
        'text_excerpt': ref.text_excerpt,
        'char_start': ref.char_start,
        'char_end': ref.char_end,
    }
    # Heading path is skipped when empty or None (truthiness check).
    if ref.markdown_heading_path:
        payload['markdown_heading_path'] = ref.markdown_heading_path
    # The numeric fields may legitimately be 0/0.0, so test against None.
    for key in ('sentence_index', 'exa_highlight_index', 'relevance_score'):
        value = getattr(ref, key)
        if value is not None:
            payload[key] = value
    return payload
def claim_to_dict(claim: Claim) -> Dict[str, Any]:
    """Serialize a Claim for YAML, dropping unset verification metadata."""
    payload: Dict[str, Any] = {
        'claim_id': claim.claim_id,
        'claim_type': claim.claim_type,
        'field_path': claim.field_path,
        'value': claim.value,
        'value_type': claim.value_type,
        'source_references': [source_reference_to_dict(item) for item in claim.source_references],
        'confidence_score': claim.confidence_score,
        'verified': claim.verified,
    }
    # Verification fields only appear once set (truthy).
    for key in ('verified_by', 'verified_date', 'claim_notes'):
        extra = getattr(claim, key)
        if extra:
            payload[key] = extra
    return payload
def web_source_to_dict(source: 'WebSource', include_raw: bool = False) -> Dict[str, Any]:
    """Convert WebSource to dict for YAML.

    Fix: `content_type` was populated by process_exa_result but silently
    dropped during serialization; it is now emitted like the other
    optional fields.

    Args:
        source: The WebSource to serialize.
        include_raw: When True, embed the full raw_markdown (can be large).

    Returns:
        Dict with mandatory identity fields plus any optional fields that
        are set (truthy).
    """
    d: Dict[str, Any] = {
        'source_id': source.source_id,
        'url': source.url,
        'fetch_timestamp': source.fetch_timestamp,
    }
    if source.http_status:
        d['http_status'] = source.http_status
    if source.content_type:
        d['content_type'] = source.content_type
    if source.title:
        d['title'] = source.title
    if source.author:
        d['author'] = source.author
    if source.published_date:
        d['published_date'] = source.published_date
    if source.raw_markdown_hash:
        d['raw_markdown_hash'] = source.raw_markdown_hash
    if source.exa_highlights:
        d['exa_highlights'] = source.exa_highlights
    if source.exa_highlight_scores:
        d['exa_highlight_scores'] = source.exa_highlight_scores
    # Optionally include full raw content (can be hundreds of KB per entry)
    if include_raw and source.raw_markdown:
        d['raw_markdown'] = source.raw_markdown
    return d
def web_enrichment_to_dict(enrichment: WebEnrichment, include_raw: bool = False) -> Dict[str, Any]:
    """Serialize a WebEnrichment (run metadata, claims, sources) for YAML storage."""
    doc: Dict[str, Any] = {}
    doc['enrichment_id'] = enrichment.enrichment_id
    doc['search_query'] = enrichment.search_query
    doc['search_timestamp'] = enrichment.search_timestamp
    doc['search_engine'] = enrichment.search_engine
    doc['enrichment_status'] = enrichment.enrichment_status
    doc['enrichment_notes'] = enrichment.enrichment_notes
    doc['claims'] = [claim_to_dict(item) for item in enrichment.claims]
    # include_raw is forwarded so callers can choose whether to embed page text.
    doc['raw_sources'] = [web_source_to_dict(item, include_raw) for item in enrichment.raw_sources]
    return doc
# =============================================================================
# FILE OPERATIONS
# =============================================================================
def load_kb_library_file(filepath: Path) -> Dict[str, Any]:
    """Read one KB library YAML entry file into a dict (safe loader)."""
    with filepath.open('r', encoding='utf-8') as handle:
        return yaml.safe_load(handle)
def save_kb_library_file(filepath: Path, data: Dict[str, Any]):
    """Write a KB library dict back to YAML (unicode-safe, key order preserved)."""
    with filepath.open('w', encoding='utf-8') as handle:
        yaml.dump(data, handle, default_flow_style=False, allow_unicode=True, sort_keys=False)
def get_library_website(entry: Dict[str, Any]) -> Optional[str]:
    """Pick the best website URL: Google Maps enrichment first, then Wikidata."""
    gmaps = entry.get('google_maps_enrichment', {})
    site = gmaps.get('website')
    if site:
        return site
    wikidata_ids = entry.get('wikidata_enrichment', {}).get('wikidata_identifiers', {})
    # Falsy values (missing/empty) normalize to None.
    return wikidata_ids.get('Website') or None
def get_library_name(entry: Dict[str, Any]) -> str:
    """Library name from the original NDE record ('organisatie'), with a fallback."""
    return entry.get('original_entry', {}).get('organisatie', 'Unknown Library')
def get_library_city(entry: Dict[str, Any]) -> str:
    """Visiting-address city from the original NDE record, or '' when missing."""
    return entry.get('original_entry', {}).get('plaatsnaam_bezoekadres', '')
# =============================================================================
# MAIN PROCESSING
# =============================================================================
def process_single_file(
    filepath: Path,
    exa_results: List[Dict[str, Any]],
    search_query: str,
    dry_run: bool = False
) -> bool:
    """Attach claim-level provenance built from Exa results to one entry file.

    Args:
        filepath: Path to the entry's YAML file.
        exa_results: Raw Exa search results for this library.
        search_query: Query string that produced the results.
        dry_run: When True, log what would happen but write nothing.

    Returns:
        True on success, False when any step raised (the error is logged).
    """
    try:
        data = load_kb_library_file(filepath)
        library_name = get_library_name(data)
        logger.info(f"Processing: {library_name}")
        # Build the provenance-tracked enrichment from the search results.
        enrichment = create_web_enrichment_from_exa_results(exa_results, search_query)
        logger.info(f" - Extracted {len(enrichment.claims)} claims from {len(enrichment.raw_sources)} sources")
        # Drop superseded enrichment formats before writing the new one.
        for stale_key in ('exa_enrichment', 'website_enrichment'):
            data.pop(stale_key, None)
        # Raw page markdown is deliberately excluded to keep entry files small.
        data['web_enrichment'] = web_enrichment_to_dict(enrichment, include_raw=False)
        if dry_run:
            logger.info(f" - [DRY RUN] Would save to {filepath.name}")
        else:
            save_kb_library_file(filepath, data)
            logger.info(f" - Saved to {filepath.name}")
        return True
    except Exception as e:
        # Broad catch is intentional: one bad file must not abort the batch.
        logger.error(f"Error processing {filepath}: {e}")
        return False
def main():
    """Main entry point.

    Parses CLI flags and lists the KB library files that would be processed,
    reporting each one's current enrichment status. Actual Exa results must
    be supplied externally (e.g. via MCP); this entry point is informational.

    Fix: the limit check now compares against None, so an explicit
    `--limit 0` is honored instead of being silently ignored by the old
    truthiness test (`if args.limit:`).

    Returns:
        Exit code: 0 on success, 1 when --file names a missing file.
    """
    parser = argparse.ArgumentParser(
        description='Enrich KB libraries with claim-level provenance tracking'
    )
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be done without making changes')
    parser.add_argument('--limit', type=int, default=None,
                        help='Limit number of entries to process')
    parser.add_argument('--file', type=str, default=None,
                        help='Process a specific file (e.g., 1377_kb_isil.yaml)')
    args = parser.parse_args()
    # NOTE(review): args.dry_run is parsed but not used in this preview flow;
    # it applies once process_single_file is wired up to real Exa results.
    logger.info("=" * 70)
    logger.info("KB Netherlands Libraries - Claim-Level Provenance Enrichment")
    logger.info("=" * 70)
    logger.info("Schema: schemas/web_enrichment_provenance.yaml")
    logger.info("")
    # Find files to process
    if args.file:
        files = [ENTRIES_DIR / args.file]
        if not files[0].exists():
            logger.error(f"File not found: {files[0]}")
            return 1
    else:
        files = sorted(ENTRIES_DIR.glob("*_kb_isil.yaml"))
        logger.info(f"Found {len(files)} KB library files")
        if args.limit is not None:
            files = files[:args.limit]
            logger.info(f"Limited to {len(files)} files")
    # Print instructions for MCP-based enrichment
    logger.info("")
    logger.info("=" * 70)
    logger.info("This script processes Exa results into claim-level provenance.")
    logger.info("To use: Pass Exa search results as JSON to stdin, or integrate with MCP.")
    logger.info("=" * 70)
    logger.info("")
    # Preview the first few files and their current enrichment status.
    for f in files[:5]:
        data = load_kb_library_file(f)
        name = get_library_name(data)
        city = get_library_city(data)
        website = get_library_website(data)
        has_old = 'exa_enrichment' in data or 'website_enrichment' in data
        has_new = 'web_enrichment' in data
        status = "NEW" if has_new else ("LEGACY" if has_old else "NONE")
        logger.info(f"  {f.name}: {name} ({city})")
        logger.info(f"    Website: {website or 'Not found'}")
        logger.info(f"    Status: {status}")
    if len(files) > 5:
        logger.info(f"  ... and {len(files) - 5} more files")
    return 0
# Script entry point: exit with main()'s return code when run directly.
if __name__ == "__main__":
    sys.exit(main())