#!/usr/bin/env python3
"""
Replace ad-hoc confidence scores with XPath-based provenance.

This script:
1. Reads claims from web_enrichment.claims
2. Matches claim values to XPath extractions in metadata.yaml
3. Replaces 'confidence' with 'xpath' and 'html_file' references
4. Provides verifiable provenance instead of arbitrary confidence scores

The new claim structure:
- claim_type: full_name
  claim_value: "Museum Name"
  source_url: https://example.com
  extraction_timestamp: '2025-11-28T12:00:00+00:00'
  xpath: '/html[1]/body[1]/div[2]/h1[1]'
  html_file: 'web/0001/example.com/rendered.html'

Usage:
    python scripts/add_xpath_provenance.py [--limit N] [--entry ENTRY_NUM] [--dry-run]
"""

import argparse
import re
import sys
from datetime import datetime, timezone
from difflib import SequenceMatcher
from pathlib import Path

import yaml

# Directories
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')
WEB_DIR = ENTRIES_DIR / 'web'


def normalize_text(text: str) -> str:
    """Normalize text for fuzzy matching.

    Lowercases, strips, and collapses internal whitespace runs to a
    single space. Returns "" for falsy input so callers can treat an
    empty result as "nothing to match".
    """
    if not text:
        return ""
    # Lowercase, remove extra whitespace, normalize unicode
    text = str(text).lower().strip()
    text = re.sub(r'\s+', ' ', text)
    return text


def fuzzy_match(claim_value: str, extraction_text: str, threshold: float = 0.6) -> float:
    """
    Calculate similarity between claim value and extracted text.

    Args:
        claim_value: The claim string to verify.
        extraction_text: Text content of one XPath extraction.
        threshold: Minimum ratio to report; ratios below it are
            reported as 0.0. (Fix: this parameter was previously
            accepted but ignored.)

    Returns similarity ratio (0.0 to 1.0).
    """
    claim_norm = normalize_text(claim_value)
    extract_norm = normalize_text(extraction_text)

    if not claim_norm or not extract_norm:
        return 0.0

    # Exact substring match (either direction) counts as a perfect hit.
    if claim_norm in extract_norm or extract_norm in claim_norm:
        return 1.0

    # Sequence matcher for fuzzy matching; suppress sub-threshold noise.
    ratio = SequenceMatcher(None, claim_norm, extract_norm).ratio()
    return ratio if ratio >= threshold else 0.0


def load_metadata(web_archive_dir: Path) -> dict | None:
    """Load metadata.yaml from web archive directory.

    Returns the parsed mapping, or None when the file is missing or
    unreadable (a warning is printed in the unreadable case — this is
    deliberately best-effort, not fatal).
    """
    metadata_file = web_archive_dir / 'metadata.yaml'
    if not metadata_file.exists():
        return None
    try:
        with open(metadata_file, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)
    except Exception as e:
        print(f"  Warning: Failed to load {metadata_file}: {e}")
        return None


def find_best_xpath_match(claim_value: str, extractions: list[dict],
                          threshold: float = 0.5) -> dict | None:
    """
    Find the best matching XPath extraction for a claim value.

    Args:
        claim_value: Claim string to locate in the archived HTML text.
        extractions: XPath extraction dicts from metadata.yaml; each is
            expected to carry 'text', 'xpath' and 'tag' keys.
        threshold: Minimum fuzzy-match score to accept.

    Returns dict with xpath and match_score, or None if no good match.
    """
    if not extractions:
        return None

    best_match = None
    best_score = 0.0

    for extraction in extractions:
        text = extraction.get('text', '')
        score = fuzzy_match(claim_value, text, threshold)
        # Strictly-greater comparison keeps the FIRST extraction on ties,
        # which keeps the output deterministic for a given metadata file.
        if score > best_score and score >= threshold:
            best_score = score
            best_match = {
                'xpath': extraction.get('xpath'),
                'matched_text': text[:100],  # Truncate for readability
                'match_score': round(score, 3),
                'tag': extraction.get('tag'),
            }

    return best_match


def get_web_archive_path(entry_data: dict, entry_num: str) -> Path | None:
    """Get the web archive directory path for an entry.

    Prefers the directory recorded in web_enrichment.web_archives;
    falls back to the first subdirectory of web/{entry_num}/ on disk.
    Returns None when neither source yields a path.
    """
    web_enrichment = entry_data.get('web_enrichment', {})
    web_archives = web_enrichment.get('web_archives', [])

    if web_archives:
        # Use first archive
        archive = web_archives[0]
        directory = archive.get('directory')
        if directory:
            return ENTRIES_DIR / directory

    # Fallback: look for directory in web/{entry_num}/
    entry_web_dir = WEB_DIR / entry_num
    if entry_web_dir.exists():
        subdirs = [d for d in entry_web_dir.iterdir() if d.is_dir()]
        if subdirs:
            return subdirs[0]

    return None


def process_claims(claims: list[dict], extractions: list[dict],
                   html_file_path: str,
                   remove_unverified: bool = True) -> tuple[list[dict], list[dict]]:
    """
    Process claims to replace confidence with XPath provenance.

    Claims without XPath verification are either removed
    (fabricated/hallucinated) or moved to a separate unverified list.

    Args:
        claims: List of claim dicts with confidence scores
        extractions: List of XPath extractions from metadata.yaml
        html_file_path: Relative path to HTML file for provenance
        remove_unverified: If True, unverified claims are removed entirely
            NOTE(review): this flag is currently never consulted — unmatched
            claims are always moved to the removed list regardless. Kept for
            interface compatibility; wire it up or drop it deliberately.

    Returns:
        Tuple of (verified_claims, removed_claims)
    """
    verified_claims = []
    removed_claims = []

    for claim in claims:
        # Try to find matching XPath
        claim_value = str(claim.get('claim_value', ''))
        match = find_best_xpath_match(claim_value, extractions)

        if match:
            # Verified claim - has XPath provenance
            new_claim = {
                'claim_type': claim.get('claim_type'),
                'claim_value': claim.get('claim_value'),
                'source_url': claim.get('source_url'),
                'extraction_timestamp': claim.get('extraction_timestamp'),
                'xpath': match['xpath'],
                'html_file': html_file_path,
                'xpath_match_score': match['match_score'],
            }
            # Keep matched_text for debugging if not exact match
            if match['match_score'] < 1.0:
                new_claim['xpath_matched_text'] = match['matched_text']
            verified_claims.append(new_claim)
        else:
            # No XPath match - claim cannot be verified from archived HTML
            # This means either:
            # 1. The claim was fabricated/hallucinated by LLM
            # 2. The value is in an attribute (href, src) not text content
            # 3. The value was dynamically generated
            # In all cases, we cannot verify it, so remove it
            removed_claims.append({
                'claim_type': claim.get('claim_type'),
                'claim_value': claim.get('claim_value'),
                'reason': 'Cannot verify - value not found in archived HTML text content'
            })

    return verified_claims, removed_claims


def extract_entry_number(filename: str) -> str:
    """Extract entry number from filename.

    Returns the leading run of digits; if the name does not start with
    digits, falls back to the name with any '.yaml' suffix stripped.
    """
    match = re.match(r'^(\d+)', filename)
    return match.group(1) if match else filename.replace('.yaml', '')


def process_entry(filepath: Path, dry_run: bool = False) -> tuple[int, int, list[str]]:
    """
    Process a single entry file to add XPath provenance.

    Args:
        filepath: Path to the entry YAML file.
        dry_run: When True, compute results but do not write anything back.

    Returns:
        (claims_updated, claims_unmatched, errors)
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    if not data:
        return 0, 0, ["Empty file"]

    web_enrichment = data.get('web_enrichment', {})
    claims = web_enrichment.get('claims', [])

    if not claims:
        return 0, 0, []  # No claims to process

    # Check if already processed (has xpath instead of confidence)
    if claims and 'xpath' in claims[0] and 'confidence' not in claims[0]:
        return 0, 0, []  # Already migrated

    entry_num = extract_entry_number(filepath.name)

    # Get web archive path
    archive_path = get_web_archive_path(data, entry_num)
    if not archive_path:
        return 0, 0, [f"No web archive found for entry {entry_num}"]

    # Load metadata with extractions
    metadata = load_metadata(archive_path)
    if not metadata:
        return 0, 0, [f"No metadata.yaml in {archive_path}"]

    extractions = metadata.get('extractions', [])
    if not extractions:
        return 0, 0, [f"No extractions in metadata for {entry_num}"]

    # HTML file path (relative to entries dir)
    # NOTE(review): relative_to raises ValueError if metadata points at a
    # directory outside ENTRIES_DIR — presumably never happens for this
    # dataset; confirm before running against untrusted metadata.
    html_file_path = str(archive_path.relative_to(ENTRIES_DIR) / 'rendered.html')

    # Process claims - remove unverified ones
    verified_claims, removed_claims = process_claims(claims, extractions, html_file_path)

    # Count results
    matched = len(verified_claims)
    removed = len(removed_claims)

    if not dry_run:
        # Update the data - only keep verified claims
        data['web_enrichment']['claims'] = verified_claims
        data['web_enrichment']['xpath_provenance_added'] = datetime.now(timezone.utc).isoformat()

        # Store removed claims for audit trail
        if removed_claims:
            data['web_enrichment']['removed_unverified_claims'] = removed_claims

        # Write back
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    errors = []
    if removed > 0:
        errors.append(f"{removed} unverified claims removed (not found in HTML)")

    return matched, removed, errors


def main():
    """CLI entry point: migrate claims in entry files to XPath provenance."""
    parser = argparse.ArgumentParser(description='Add XPath provenance to web_enrichment claims')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without writing')
    args = parser.parse_args()

    # Find entry files with claims
    # NOTE(review): '--entry 1' also matches 10*, 100*, ... because the glob
    # is a prefix match — presumably entry numbers are zero-padded; confirm.
    if args.entry:
        files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = sorted([f for f in ENTRIES_DIR.glob('*.yaml') if not f.name.startswith('.')])

    if args.limit:
        files = files[:args.limit]

    total_verified = 0
    total_removed = 0
    entries_processed = 0
    entries_with_claims = 0

    for filepath in files:
        if filepath.is_dir():
            continue

        # Quick check if file has claims with confidence (cheap substring
        # pre-filter before paying for a full YAML parse in process_entry)
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()

        if 'confidence:' not in content or 'claims:' not in content:
            continue

        entries_with_claims += 1
        print(f"Processing: {filepath.name}")

        verified, removed, errors = process_entry(filepath, dry_run=args.dry_run)

        if verified or removed:
            entries_processed += 1
            total_verified += verified
            total_removed += removed
            print(f"  Verified: {verified}, Removed: {removed}")

        for e in errors:
            print(f"  {e}")

    print(f"\n{'DRY RUN - ' if args.dry_run else ''}Summary:")
    print(f"  Entries with claims: {entries_with_claims}")
    print(f"  Entries processed: {entries_processed}")
    print(f"  Claims verified with XPath: {total_verified}")
    print(f"  Claims removed (unverified): {total_removed}")

    if total_removed > 0:
        print(f"\n  Removed claims are stored in 'removed_unverified_claims' for audit.")
        print(f"  These claims could not be verified against archived HTML content.")

    return 0


if __name__ == '__main__':
    sys.exit(main())