#!/usr/bin/env python3
"""
Enrich KB Netherlands library entries with claim-level provenance tracking.

This script implements proper data provenance following the
web_enrichment_provenance.yaml schema:
- Each factual claim has precise source references
- Character offsets in source markdown
- Markdown heading paths for structural context
- SHA-256 hashes for content verification
- Exa highlight indices when available

Usage:
    python scripts/enrich_kb_libraries_exa_provenance.py [--dry-run] [--limit N] [--file FILENAME]

Schema: schemas/web_enrichment_provenance.yaml
"""

import os
import sys
import json
import yaml
import time
import hashlib
import re
import uuid
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field, asdict
import logging
import argparse

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Paths
ENTRIES_DIR = Path("/Users/kempersc/apps/glam/data/nde/enriched/entries")
REPORTS_DIR = Path("/Users/kempersc/apps/glam/reports")

# Rate limiting
REQUEST_DELAY = 1.5  # seconds between Exa requests


# =============================================================================
# DATA CLASSES (matching web_enrichment_provenance.yaml schema)
# =============================================================================

@dataclass
class SourceReference:
    """Precise reference to source text supporting a claim.

    char_start/char_end are offsets into the source's raw_markdown; both are
    -1 when the excerpt could not be located in the fetched text.
    """
    source_id: str
    text_excerpt: str
    char_start: int
    char_end: int
    markdown_heading_path: Optional[str] = None
    sentence_index: Optional[int] = None
    exa_highlight_index: Optional[int] = None
    relevance_score: Optional[float] = None


@dataclass
class Claim:
    """A single factual assertion extracted from web sources."""
    claim_id: str
    claim_type: str  # ClaimTypeEnum value
    field_path: str
    value: Any
    value_type: str  # ValueTypeEnum value
    source_references: List[SourceReference]
    confidence_score: float
    verified: bool = False
    verified_by: Optional[str] = None
    verified_date: Optional[str] = None
    claim_notes: Optional[str] = None


@dataclass
class WebSource:
    """A web page fetched and used as source for claims."""
    source_id: str
    url: str
    fetch_timestamp: str
    http_status: Optional[int] = None
    content_type: Optional[str] = None
    title: Optional[str] = None
    author: Optional[str] = None
    published_date: Optional[str] = None
    raw_markdown: Optional[str] = None
    raw_markdown_hash: Optional[str] = None
    exa_highlights: List[str] = field(default_factory=list)
    exa_highlight_scores: List[float] = field(default_factory=list)


@dataclass
class WebEnrichment:
    """Container for all web-enriched data with full provenance tracking."""
    enrichment_id: str
    search_query: str
    search_timestamp: str
    search_engine: str
    claims: List[Claim]
    raw_sources: List[WebSource]
    enrichment_status: str  # EnrichmentStatusEnum value
    enrichment_notes: Optional[str] = None


# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================

def generate_enrichment_id() -> str:
    """Generate unique enrichment ID: enrich-YYYYMMDDTHHMMSS-xxxxxxxx"""
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')
    suffix = uuid.uuid4().hex[:8]
    return f"enrich-{timestamp}-{suffix}"


def generate_source_id(url: str) -> str:
    """Generate source ID from URL hash."""
    return f"src-{hashlib.sha256(url.encode()).hexdigest()[:12]}"


def generate_claim_id(field_path: str, index: int = 1) -> str:
    """Generate claim ID from field path."""
    safe_path = re.sub(r'[^a-z0-9_]', '_', field_path.lower())
    return f"claim-{safe_path}-{index}"


def compute_content_hash(content: str) -> str:
    """Compute SHA-256 hash of content."""
    return hashlib.sha256(content.encode('utf-8')).hexdigest()


def find_text_position(haystack: str, needle: str) -> Tuple[int, int]:
    """Find character positions of needle in haystack.

    Tries an exact match first, then a case-insensitive fallback. The
    fallback uses a regex over the *original* haystack (not a lowercased
    copy) so returned offsets stay valid even for Unicode text where
    str.lower() can change string length (e.g. 'İ' lowercases to 2 chars).

    Returns:
        (start, end) offsets, or (-1, -1) when not found.
    """
    start = haystack.find(needle)
    if start != -1:
        return (start, start + len(needle))
    # Case-insensitive fallback with offsets in the original string
    match = re.search(re.escape(needle), haystack, re.IGNORECASE)
    if match:
        return (match.start(), match.end())
    return (-1, -1)


def extract_markdown_heading_path(markdown: str, char_position: int) -> Optional[str]:
    """
    Extract the markdown heading path for a given character position.
    Returns format: "# H1 > ## H2 > ### H3"
    """
    if char_position < 0 or char_position >= len(markdown):
        return None

    # Find all headings before this position
    heading_pattern = r'^(#{1,6})\s+(.+)$'
    headings = []
    current_pos = 0
    for line in markdown.split('\n'):
        line_end = current_pos + len(line)
        if current_pos > char_position:
            break
        match = re.match(heading_pattern, line)
        if match:
            level = len(match.group(1))
            title = match.group(2).strip()
            # Remove headings of same or lower level (they're siblings or
            # parents being replaced)
            headings = [(l, t) for l, t in headings if l < level]
            headings.append((level, title))
        current_pos = line_end + 1  # +1 for newline

    if not headings:
        return None
    return " > ".join([f"{'#' * level} {title}" for level, title in headings])


def get_sentence_index(text: str, char_position: int) -> Optional[int]:
    """Get the sentence index for a character position."""
    if char_position < 0:
        return None
    # Simple sentence splitting
    sentences = re.split(r'(?<=[.!?])\s+', text[:char_position + 100])
    current_pos = 0
    for i, sentence in enumerate(sentences):
        sentence_end = current_pos + len(sentence)
        if char_position <= sentence_end:
            return i
        current_pos = sentence_end + 1
    return len(sentences) - 1


# =============================================================================
# CLAIM EXTRACTION WITH PROVENANCE
# =============================================================================

class ClaimExtractor:
    """Extract claims from text with source references."""

    def __init__(self, source: WebSource):
        self.source = source
        self.markdown = source.raw_markdown or ""
        self.claims: List[Claim] = []
        # Per-field counter so repeated claims on one field get unique IDs
        self.claim_counter: Dict[str, int] = {}

    def _next_claim_id(self, field_path: str) -> str:
        """Get next claim ID for a field path."""
        if field_path not in self.claim_counter:
            self.claim_counter[field_path] = 0
        self.claim_counter[field_path] += 1
        return generate_claim_id(field_path, self.claim_counter[field_path])

    def _create_source_reference(
        self,
        excerpt: str,
        exa_highlight_index: Optional[int] = None,
        relevance_score: Optional[float] = None
    ) -> Optional[SourceReference]:
        """Create a source reference with character offsets."""
        char_start, char_end = find_text_position(self.markdown, excerpt)

        if char_start == -1:
            # Text not found - use approximate reference
            return SourceReference(
                source_id=self.source.source_id,
                text_excerpt=(excerpt[:200] + "...") if len(excerpt) > 200 else excerpt,
                char_start=-1,
                char_end=-1,
                markdown_heading_path=None,
                sentence_index=None,
                exa_highlight_index=exa_highlight_index,
                relevance_score=relevance_score
            )

        heading_path = extract_markdown_heading_path(self.markdown, char_start)
        sentence_idx = get_sentence_index(self.markdown, char_start)

        return SourceReference(
            source_id=self.source.source_id,
            text_excerpt=excerpt[:500] if len(excerpt) > 500 else excerpt,
            char_start=char_start,
            char_end=char_end,
            markdown_heading_path=heading_path,
            sentence_index=sentence_idx,
            exa_highlight_index=exa_highlight_index,
            relevance_score=relevance_score
        )

    def extract_description(self) -> Optional[Claim]:
        """Extract description claim from first meaningful paragraph."""
        # Look for first substantial paragraph (>100 chars, not navigation)
        paragraphs = re.split(r'\n\n+', self.markdown)

        for para in paragraphs:
            # Skip short paragraphs, navigation, lists
            if len(para) < 100:
                continue
            if para.strip().startswith(('- ', '* ', '|', '#')):
                continue
            if 'skip to' in para.lower() or 'jump to' in para.lower():
                continue

            # Found a good paragraph
            clean_para = re.sub(r'\s+', ' ', para).strip()
            if len(clean_para) > 50:
                excerpt = clean_para[:500]
                ref = self._create_source_reference(excerpt)
                if ref:
                    return Claim(
                        claim_id=self._next_claim_id("description"),
                        claim_type="DESCRIPTIVE",
                        field_path="description",
                        value=(excerpt + "...") if len(clean_para) > 500 else clean_para,
                        value_type="STRING",
                        source_references=[ref],
                        confidence_score=0.8,
                        verified=False
                    )
        return None

    def extract_green_library_features(self) -> List[Claim]:
        """Extract claims about 'green library' features (specific to Schiedam case)."""
        claims = []
        text_lower = self.markdown.lower()

        # Check for "green library" mention (both quoted and unquoted forms)
        if "'green' library" in text_lower or "green library" in text_lower:
            # Find the specific text
            patterns = [
                (r"first 'green' library in the Netherlands",
                 "notable_features.green_library.distinction"),
                (r"first green library",
                 "notable_features.green_library.distinction"),
                (r"trees? (?:that )?(?:are |weigh(?:ing)? )?(\d+)[- ]to[- ](\d+)\s*(?:kilos?|kg)",
                 "notable_features.green_library.tree_weights"),
                (r"(\d+)[- ](?:to|-)[- ](\d+)\s*met(?:er|re)s?\s*high",
                 "notable_features.green_library.tree_heights"),
                (r"large trees?\s+(\d+)[- ](?:to|-)[- ](\d+)\s*met",
                 "notable_features.green_library.tree_heights"),
            ]

            for pattern, field_path in patterns:
                match = re.search(pattern, self.markdown, re.IGNORECASE)
                if match:
                    excerpt = match.group(0)
                    ref = self._create_source_reference(excerpt)
                    if ref:
                        # Determine value based on field
                        if "weights" in field_path:
                            value = f"{match.group(1)}-{match.group(2)} kg"
                        elif "heights" in field_path:
                            value = f"{match.group(1)}-{match.group(2)} meters"
                        else:
                            value = excerpt
                        claims.append(Claim(
                            claim_id=self._next_claim_id(field_path),
                            claim_type="ARCHITECTURAL",
                            field_path=field_path,
                            value=value,
                            value_type="STRING",
                            source_references=[ref],
                            confidence_score=0.9,
                            verified=False
                        ))
        return claims

    def extract_sustainability_features(self) -> List[Claim]:
        """Extract sustainability/green design features."""
        claims = []

        sustainability_patterns = [
            (r"recycled bookcases?.*?cardboard", "notable_features.sustainability",
             "Recycled bookcases made from industrial cardboard"),
            (r"LED lighting", "notable_features.sustainability",
             "LED lighting"),
            (r"climate control.*?planters?", "notable_features.sustainability",
             "Climate control system in planters"),
            (r"chairs?.*?PET bottles?", "notable_features.sustainability",
             "Chairs made from recycled PET bottles"),
        ]

        for pattern, field_path, default_value in sustainability_patterns:
            match = re.search(pattern, self.markdown, re.IGNORECASE)
            if match:
                excerpt = match.group(0)
                ref = self._create_source_reference(excerpt)
                if ref:
                    claims.append(Claim(
                        claim_id=self._next_claim_id(field_path),
                        claim_type="ARCHITECTURAL",
                        field_path=field_path,
                        value=default_value,
                        value_type="STRING",
                        source_references=[ref],
                        confidence_score=0.85,
                        verified=False
                    ))
        return claims

    def extract_tree_species(self) -> List[Claim]:
        """Extract tree species mentions."""
        claims = []

        # Pattern for tree species (Bucida, Tamarinde, Ficus, etc.)
        # NOTE: regex alternation is left-to-right, so the longer
        # "Ficus benjamina" must come before the bare "Ficus" or it can
        # never match.
        species_pattern = r'\b(Bucida|Tamarinde?|Ficus benjamina|Ficus)\b'
        matches = list(re.finditer(species_pattern, self.markdown, re.IGNORECASE))

        if matches:
            species_list = list(set(m.group(1).title() for m in matches))
            # Use first match for source reference
            first_match = matches[0]
            # Get surrounding context
            start = max(0, first_match.start() - 20)
            end = min(len(self.markdown), first_match.end() + 20)
            excerpt = self.markdown[start:end]

            ref = self._create_source_reference(excerpt)
            if ref:
                claims.append(Claim(
                    claim_id=self._next_claim_id("notable_features.green_library.tree_species"),
                    claim_type="DESCRIPTIVE",
                    field_path="notable_features.green_library.tree_species",
                    value=species_list,
                    value_type="LIST_STRING",
                    source_references=[ref],
                    confidence_score=0.95,
                    verified=False
                ))
        return claims

    def extract_accessibility_features(self) -> List[Claim]:
        """Extract accessibility information."""
        claims = []

        accessibility_patterns = [
            (r"wheelchair", "notable_features.accessibility",
             "Wheelchair accessible"),
            (r"ramps?.*?disabled access", "notable_features.accessibility",
             "Ramps for disabled access"),
            (r"lift|elevator", "notable_features.accessibility",
             "Lift/elevator available"),
            (r"wheelchairs? available", "notable_features.accessibility",
             "Wheelchairs available for use"),
        ]

        for pattern, field_path, default_value in accessibility_patterns:
            match = re.search(pattern, self.markdown, re.IGNORECASE)
            if match:
                excerpt = match.group(0)
                ref = self._create_source_reference(excerpt)
                if ref:
                    claims.append(Claim(
                        claim_id=self._next_claim_id(field_path),
                        claim_type="SERVICE",
                        field_path=field_path,
                        value=default_value,
                        value_type="STRING",
                        source_references=[ref],
                        confidence_score=0.85,
                        verified=False
                    ))
        return claims

    def extract_historic_building_info(self) -> List[Claim]:
        """Extract historic building information."""
        claims = []

        # Architect pattern
        architect_match = re.search(
            r'(?:designed by|architect[:\s]+)([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)',
            self.markdown
        )
        if architect_match:
            ref = self._create_source_reference(architect_match.group(0))
            if ref:
                claims.append(Claim(
                    claim_id=self._next_claim_id("historic_building.architect"),
                    claim_type="DESCRIPTIVE",
                    field_path="historic_building.architect",
                    value=architect_match.group(1),
                    value_type="STRING",
                    source_references=[ref],
                    confidence_score=0.85,
                    verified=False
                ))

        # Monument status
        if re.search(r'\bmonument\b', self.markdown, re.IGNORECASE):
            match = re.search(r'monument', self.markdown, re.IGNORECASE)
            if match:
                # Get context
                start = max(0, match.start() - 30)
                end = min(len(self.markdown), match.end() + 30)
                excerpt = self.markdown[start:end]
                ref = self._create_source_reference(excerpt)
                if ref:
                    claims.append(Claim(
                        claim_id=self._next_claim_id("historic_building.status"),
                        claim_type="DESCRIPTIVE",
                        field_path="historic_building.status",
                        value="Monument",
                        value_type="STRING",
                        source_references=[ref],
                        confidence_score=0.8,
                        verified=False
                    ))
        return claims

    def extract_services(self) -> List[Claim]:
        """Extract services offered by the library."""
        claims = []

        service_patterns = {
            "Large book collection": r'large (?:collection of )?books?|book collection',
            "DVDs and Blu-rays": r'DVDs?|Blu-?rays?',
            "Comics collection": r'comics?|graphic novels?',
            "Study spaces": r'study (?:spaces?|room)|reading (?:room|table)|leestafel',
            "Free WiFi": r'free Wi-?Fi|gratis (?:wifi|internet)|Wifi',
            "Coffee service": r'roasted coffee|koffie|coffee',
            "DigiTaalhuis": r'DigiTaalhuis|digitaalhuis',
            "Digicafé": r'digicaf[eé]',
            "Taalcafé": r'taalcaf[eé]',
            "Tax filing help": r'invulhulp|belastingaangifte',
            "Digital skills training": r'klik\s*[&+]\s*tik|digitale vaardigheden',
            "Internet access": r'internettoegang',
            "Interlibrary loan": r'interbibliothecair|boeken afhalen',
        }

        services_found = []
        refs_found = []
        for service, pattern in service_patterns.items():
            match = re.search(pattern, self.markdown, re.IGNORECASE)
            if match:
                services_found.append(service)
                ref = self._create_source_reference(match.group(0))
                if ref:
                    refs_found.append(ref)

        if services_found and refs_found:
            claims.append(Claim(
                claim_id=self._next_claim_id("services"),
                claim_type="SERVICE",
                field_path="services",
                value=services_found,
                value_type="LIST_STRING",
                source_references=refs_found[:3],  # Limit to 3 references
                confidence_score=0.8,
                verified=False
            ))
        return claims

    def extract_contact_info(self) -> List[Claim]:
        """Extract contact information (phone, email, address)."""
        claims = []

        # Phone numbers (Dutch format)
        phone_match = re.search(
            r'\b(0\d{2,3}[\s-]?\d{3}[\s-]?\d{3,4}|\+31\s?\d{2,3}[\s-]?\d{3}[\s-]?\d{3,4})\b',
            self.markdown)
        if phone_match:
            ref = self._create_source_reference(phone_match.group(0))
            if ref:
                claims.append(Claim(
                    claim_id=self._next_claim_id("contact.phone"),
                    claim_type="CONTACT",
                    field_path="contact.phone",
                    value=phone_match.group(1),
                    value_type="STRING",
                    source_references=[ref],
                    confidence_score=0.9,
                    verified=False
                ))

        # Email addresses
        email_match = re.search(
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', self.markdown)
        if email_match:
            ref = self._create_source_reference(email_match.group(0))
            if ref:
                claims.append(Claim(
                    claim_id=self._next_claim_id("contact.email"),
                    claim_type="CONTACT",
                    field_path="contact.email",
                    value=email_match.group(0),
                    value_type="STRING",
                    source_references=[ref],
                    confidence_score=0.9,
                    verified=False
                ))

        # Dutch postal addresses
        address_match = re.search(
            r'([A-Z][a-z]+(?:straat|weg|plein|laan)\s+\d+)\s*(?:,\s*)?\n?(\d{4}\s*[A-Z]{2})\s+([A-Z][a-z]+)',
            self.markdown)
        if address_match:
            full_address = f"{address_match.group(1)}, {address_match.group(2)} {address_match.group(3)}"
            ref = self._create_source_reference(address_match.group(0))
            if ref:
                claims.append(Claim(
                    claim_id=self._next_claim_id("contact.address"),
                    claim_type="GEOGRAPHIC",
                    field_path="contact.address",
                    value=full_address,
                    value_type="STRING",
                    source_references=[ref],
                    confidence_score=0.85,
                    verified=False
                ))
        return claims

    def extract_organizational_info(self) -> List[Claim]:
        """Extract organizational structure information."""
        claims = []

        # Legal entity names (Stichting = foundation)
        stichting_match = re.search(
            r'Stichting\s+(?:de\s+)?([A-Z][a-zA-Z\s]+?)(?:\s+is|\s+in|\.|\,)',
            self.markdown)
        if stichting_match:
            ref = self._create_source_reference(stichting_match.group(0))
            if ref:
                claims.append(Claim(
                    claim_id=self._next_claim_id("organization.legal_name"),
                    claim_type="ORGANIZATIONAL",
                    field_path="organization.legal_name",
                    value=f"Stichting {stichting_match.group(1).strip()}",
                    value_type="STRING",
                    source_references=[ref],
                    confidence_score=0.9,
                    verified=False
                ))

        # Fusion/merger info
        fusie_match = re.search(
            r'(?:fusie|ontstaan uit)\s+(?:tussen\s+)?(?:de\s+)?(.+?)\s+en\s+(.+?)(?:\s+en|\s+is|\.|,)',
            self.markdown, re.IGNORECASE)
        if fusie_match:
            ref = self._create_source_reference(fusie_match.group(0))
            if ref:
                claims.append(Claim(
                    claim_id=self._next_claim_id("organization.fusion_components"),
                    claim_type="ORGANIZATIONAL",
                    field_path="organization.fusion_components",
                    value=[fusie_match.group(1).strip(), fusie_match.group(2).strip()],
                    value_type="LIST_STRING",
                    source_references=[ref],
                    confidence_score=0.85,
                    verified=False
                ))

        # Municipalities served
        gemeenten_match = re.search(
            r'(?:werkzaam in|gemeenten?)\s+(.+?(?:,\s*.+?)*\s+en\s+[A-Z][a-z]+)',
            self.markdown, re.IGNORECASE)
        if gemeenten_match:
            municipalities_text = gemeenten_match.group(1)
            # Parse comma-separated list with "en" for last item
            municipalities = re.split(r',\s*|\s+en\s+', municipalities_text)
            municipalities = [m.strip() for m in municipalities if m.strip()]
            ref = self._create_source_reference(gemeenten_match.group(0))
            if ref and municipalities:
                claims.append(Claim(
                    claim_id=self._next_claim_id("organization.municipalities_served"),
                    claim_type="GEOGRAPHIC",
                    field_path="organization.municipalities_served",
                    value=municipalities,
                    value_type="LIST_STRING",
                    source_references=[ref],
                    confidence_score=0.85,
                    verified=False
                ))
        return claims

    def extract_opening_hours(self) -> List[Claim]:
        """Extract opening hours information."""
        claims = []

        # Look for Dutch day patterns with times
        days_pattern = r'(maandag|dinsdag|woensdag|donderdag|vrijdag|zaterdag|zondag)[:\s]+(\d{1,2}[:.]\d{2})\s*[-–]\s*(\d{1,2}[:.]\d{2})'
        matches = list(re.finditer(days_pattern, self.markdown, re.IGNORECASE))

        if matches:
            hours = {}
            refs = []
            for match in matches:
                day = match.group(1).lower()
                hours[day] = f"{match.group(2)} - {match.group(3)}"
                ref = self._create_source_reference(match.group(0))
                if ref:
                    refs.append(ref)

            if hours and refs:
                claims.append(Claim(
                    claim_id=self._next_claim_id("opening_hours"),
                    claim_type="TEMPORAL",
                    field_path="opening_hours",
                    value=hours,
                    value_type="OBJECT",
                    source_references=refs[:3],
                    confidence_score=0.9,
                    verified=False
                ))
        return claims

    def extract_from_exa_highlights(self) -> List[Claim]:
        """Extract claims specifically from Exa highlights."""
        claims = []

        for i, highlight in enumerate(self.source.exa_highlights):
            # Scores list may be shorter than highlights list; default to 0.0
            score = (self.source.exa_highlight_scores[i]
                     if i < len(self.source.exa_highlight_scores) else 0.0)
            # Find position of highlight in markdown
            ref = self._create_source_reference(
                highlight, exa_highlight_index=i, relevance_score=score)
            if ref and score >= 0.7:  # Only high-relevance highlights
                # Create a descriptive claim from the highlight
                claims.append(Claim(
                    claim_id=self._next_claim_id("exa_highlight"),
                    claim_type="DESCRIPTIVE",
                    field_path=f"highlights[{i}]",
                    value=highlight,
                    value_type="STRING",
                    source_references=[ref],
                    confidence_score=score,
                    verified=False,
                    claim_notes=f"Exa highlight with relevance score {score:.2f}"
                ))
        return claims

    def extract_all_claims(self) -> List[Claim]:
        """Extract all claims from the source."""
        all_claims = []

        # Description (always try)
        desc_claim = self.extract_description()
        if desc_claim:
            all_claims.append(desc_claim)

        # Specific feature extractions
        all_claims.extend(self.extract_green_library_features())
        all_claims.extend(self.extract_sustainability_features())
        all_claims.extend(self.extract_tree_species())
        all_claims.extend(self.extract_accessibility_features())
        all_claims.extend(self.extract_historic_building_info())
        all_claims.extend(self.extract_services())

        # New extractors
        all_claims.extend(self.extract_contact_info())
        all_claims.extend(self.extract_organizational_info())
        all_claims.extend(self.extract_opening_hours())
        all_claims.extend(self.extract_from_exa_highlights())

        return all_claims


# =============================================================================
# EXA INTEGRATION (placeholder for MCP-based calls)
# =============================================================================

def process_exa_result(result: Dict[str, Any], fetch_timestamp: str) -> WebSource:
    """Convert Exa search result to WebSource with full content."""
    url = result.get('url', '')
    source_id = generate_source_id(url)
    raw_markdown = result.get('text', '')

    return WebSource(
        source_id=source_id,
        url=url,
        fetch_timestamp=fetch_timestamp,
        http_status=200,  # Exa doesn't return this
        content_type="text/markdown",
        title=result.get('title'),
        author=result.get('author'),
        published_date=result.get('publishedDate'),
        raw_markdown=raw_markdown,
        raw_markdown_hash=compute_content_hash(raw_markdown) if raw_markdown else None,
        exa_highlights=result.get('highlights', []),
        exa_highlight_scores=result.get('highlightScores', [])
    )


def create_web_enrichment_from_exa_results(
    results: List[Dict[str, Any]],
    search_query: str
) -> WebEnrichment:
    """Create WebEnrichment from Exa search results with claim-level provenance."""
    enrichment_id = generate_enrichment_id()
    fetch_timestamp = datetime.now(timezone.utc).isoformat()

    # Process sources
    sources = [process_exa_result(r, fetch_timestamp) for r in results]

    # Extract claims from each source
    all_claims = []
    for source in sources:
        extractor = ClaimExtractor(source)
        claims = extractor.extract_all_claims()
        all_claims.extend(claims)

    # Determine status
    if all_claims:
        status = "SUCCESS"
    elif sources:
        status = "PARTIAL"
    else:
        status = "NO_RESULTS"

    return WebEnrichment(
        enrichment_id=enrichment_id,
        search_query=search_query,
        search_timestamp=fetch_timestamp,
        search_engine="exa",
        claims=all_claims,
        raw_sources=sources,
        enrichment_status=status,
        enrichment_notes=f"Extracted {len(all_claims)} claims from {len(sources)} sources"
    )


# =============================================================================
# YAML CONVERSION
# =============================================================================

def source_reference_to_dict(ref: SourceReference) -> Dict[str, Any]:
    """Convert SourceReference to dict for YAML."""
    d = {
        'source_id': ref.source_id,
        'text_excerpt': ref.text_excerpt,
        'char_start': ref.char_start,
        'char_end': ref.char_end,
    }
    if ref.markdown_heading_path:
        d['markdown_heading_path'] = ref.markdown_heading_path
    if ref.sentence_index is not None:
        d['sentence_index'] = ref.sentence_index
    if ref.exa_highlight_index is not None:
        d['exa_highlight_index'] = ref.exa_highlight_index
    if ref.relevance_score is not None:
        d['relevance_score'] = ref.relevance_score
    return d


def claim_to_dict(claim: Claim) -> Dict[str, Any]:
    """Convert Claim to dict for YAML."""
    d = {
        'claim_id': claim.claim_id,
        'claim_type': claim.claim_type,
        'field_path': claim.field_path,
        'value': claim.value,
        'value_type': claim.value_type,
        'source_references': [source_reference_to_dict(r) for r in claim.source_references],
        'confidence_score': claim.confidence_score,
        'verified': claim.verified,
    }
    if claim.verified_by:
        d['verified_by'] = claim.verified_by
    if claim.verified_date:
        d['verified_date'] = claim.verified_date
    if claim.claim_notes:
        d['claim_notes'] = claim.claim_notes
    return d


def web_source_to_dict(source: WebSource, include_raw: bool = False) -> Dict[str, Any]:
    """Convert WebSource to dict for YAML."""
    d = {
        'source_id': source.source_id,
        'url': source.url,
        'fetch_timestamp': source.fetch_timestamp,
    }
    # Use an explicit None check so a legitimate falsy status is preserved
    if source.http_status is not None:
        d['http_status'] = source.http_status
    # content_type is set by process_exa_result and belongs in the output
    if source.content_type:
        d['content_type'] = source.content_type
    if source.title:
        d['title'] = source.title
    if source.author:
        d['author'] = source.author
    if source.published_date:
        d['published_date'] = source.published_date
    if source.raw_markdown_hash:
        d['raw_markdown_hash'] = source.raw_markdown_hash
    if source.exa_highlights:
        d['exa_highlights'] = source.exa_highlights
    if source.exa_highlight_scores:
        d['exa_highlight_scores'] = source.exa_highlight_scores
    # Optionally include full raw content (can be large)
    if include_raw and source.raw_markdown:
        d['raw_markdown'] = source.raw_markdown
    return d


def web_enrichment_to_dict(enrichment: WebEnrichment, include_raw: bool = False) -> Dict[str, Any]:
    """Convert WebEnrichment to dict for YAML storage."""
    return {
        'enrichment_id': enrichment.enrichment_id,
        'search_query': enrichment.search_query,
        'search_timestamp': enrichment.search_timestamp,
        'search_engine': enrichment.search_engine,
        'enrichment_status': enrichment.enrichment_status,
        'enrichment_notes': enrichment.enrichment_notes,
        'claims': [claim_to_dict(c) for c in enrichment.claims],
        'raw_sources': [web_source_to_dict(s, include_raw) for s in enrichment.raw_sources],
    }


# =============================================================================
# FILE OPERATIONS
# =============================================================================

def load_kb_library_file(filepath: Path) -> Dict[str, Any]:
    """Load a single KB library YAML file."""
    with open(filepath, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)


def save_kb_library_file(filepath: Path, data: Dict[str, Any]):
    """Save KB library data to YAML file."""
    with open(filepath, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)


def get_library_website(entry: Dict[str, Any]) -> Optional[str]:
    """Extract website URL from entry."""
    # Try Google Maps enrichment first
    google_enrichment = entry.get('google_maps_enrichment', {})
    if google_enrichment.get('website'):
        return google_enrichment['website']

    # Try Wikidata enrichment
    wikidata_enrichment = entry.get('wikidata_enrichment', {})
    wikidata_ids = wikidata_enrichment.get('wikidata_identifiers', {})
    if wikidata_ids.get('Website'):
        return wikidata_ids['Website']

    return None


def get_library_name(entry: Dict[str, Any]) -> str:
    """Extract library name from entry."""
    original = entry.get('original_entry', {})
    return original.get('organisatie', 'Unknown Library')


def get_library_city(entry: Dict[str, Any]) -> str:
    """Extract library city from entry."""
    original = entry.get('original_entry', {})
    return original.get('plaatsnaam_bezoekadres', '')


# =============================================================================
# MAIN PROCESSING
# =============================================================================

def process_single_file(
    filepath: Path,
    exa_results: List[Dict[str, Any]],
    search_query: str,
    dry_run: bool = False
) -> bool:
    """
    Process a single file with Exa results and add claim-level provenance.

    Args:
        filepath: Path to YAML file
        exa_results: Results from Exa search
        search_query: The search query used
        dry_run: If True, don't write changes

    Returns:
        True if successful, False otherwise
    """
    try:
        # Load existing data
        data = load_kb_library_file(filepath)
        library_name = get_library_name(data)
        logger.info(f"Processing: {library_name}")

        # Create web enrichment with provenance
        enrichment = create_web_enrichment_from_exa_results(exa_results, search_query)
        logger.info(f"  - Extracted {len(enrichment.claims)} claims from {len(enrichment.raw_sources)} sources")

        # Remove old enrichment formats
        if 'exa_enrichment' in data:
            del data['exa_enrichment']
        if 'website_enrichment' in data:
            del data['website_enrichment']

        # Add new provenance-tracked enrichment
        data['web_enrichment'] = web_enrichment_to_dict(enrichment, include_raw=False)

        if not dry_run:
            save_kb_library_file(filepath, data)
            logger.info(f"  - Saved to {filepath.name}")
        else:
            logger.info(f"  - [DRY RUN] Would save to {filepath.name}")

        return True
    except Exception:
        # Boundary handler: log with traceback so failures are diagnosable,
        # then report failure to the caller instead of aborting the batch.
        logger.exception(f"Error processing {filepath}")
        return False


def main():
    """Main entry point.

    NOTE: this entry point currently only previews the entries to process;
    actual enrichment happens by feeding Exa results to process_single_file
    (e.g. via MCP integration). --dry-run is therefore accepted but only
    meaningful once that integration calls process_single_file.
    """
    parser = argparse.ArgumentParser(
        description='Enrich KB libraries with claim-level provenance tracking'
    )
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be done without making changes')
    parser.add_argument('--limit', type=int, default=None,
                        help='Limit number of entries to process')
    parser.add_argument('--file', type=str, default=None,
                        help='Process a specific file (e.g., 1377_kb_isil.yaml)')
    args = parser.parse_args()

    logger.info("=" * 70)
    logger.info("KB Netherlands Libraries - Claim-Level Provenance Enrichment")
    logger.info("=" * 70)
    logger.info(f"Schema: schemas/web_enrichment_provenance.yaml")
    logger.info("")

    # Find files to process
    if args.file:
        files = [ENTRIES_DIR / args.file]
        if not files[0].exists():
            logger.error(f"File not found: {files[0]}")
            return 1
    else:
        files = sorted(ENTRIES_DIR.glob("*_kb_isil.yaml"))

    logger.info(f"Found {len(files)} KB library files")

    if args.limit:
        files = files[:args.limit]
        logger.info(f"Limited to {len(files)} files")

    # Print instructions for MCP-based enrichment
    logger.info("")
    logger.info("=" * 70)
    logger.info("This script processes Exa results into claim-level provenance.")
    logger.info("To use: Pass Exa search results as JSON to stdin, or integrate with MCP.")
    logger.info("=" * 70)
    logger.info("")

    # Example: show what would be processed
    for f in files[:5]:
        data = load_kb_library_file(f)
        name = get_library_name(data)
        city = get_library_city(data)
        website = get_library_website(data)
        has_old = 'exa_enrichment' in data or 'website_enrichment' in data
        has_new = 'web_enrichment' in data
        status = "NEW" if has_new else ("LEGACY" if has_old else "NONE")
        logger.info(f"  {f.name}: {name} ({city})")
        logger.info(f"    Website: {website or 'Not found'}")
        logger.info(f"    Status: {status}")

    if len(files) > 5:
        logger.info(f"  ... and {len(files) - 5} more files")

    return 0


if __name__ == "__main__":
    sys.exit(main())