glam/scripts/add_xpath_provenance.py
2025-11-29 18:05:16 +01:00

321 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Replace ad-hoc confidence scores with XPath-based provenance.
This script:
1. Reads claims from web_enrichment.claims
2. Matches claim values to XPath extractions in metadata.yaml
3. Replaces 'confidence' with 'xpath' and 'html_file' references
4. Provides verifiable provenance instead of arbitrary confidence scores
The new claim structure:
- claim_type: full_name
claim_value: "Museum Name"
source_url: https://example.com
extraction_timestamp: '2025-11-28T12:00:00+00:00'
xpath: '/html[1]/body[1]/div[2]/h1[1]'
html_file: 'web/0001/example.com/rendered.html'
Usage:
python scripts/add_xpath_provenance.py [--limit N] [--entry ENTRY_NUM] [--dry-run]
"""
import argparse
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from difflib import SequenceMatcher
import yaml
# Directories
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')
WEB_DIR = ENTRIES_DIR / 'web'
def normalize_text(text: str) -> str:
    """Normalize text for fuzzy matching.

    Lowercases, trims, and collapses internal whitespace runs to a single
    space. Falsy inputs (None, "", 0, ...) normalize to the empty string.
    """
    if not text:
        return ""
    lowered = str(text).lower()
    # Collapse any run of whitespace into one space after trimming the ends.
    return re.sub(r'\s+', ' ', lowered.strip())
def fuzzy_match(claim_value: str, extraction_text: str, threshold: float = 0.6) -> float:
    """
    Calculate similarity between a claim value and extracted text.

    Args:
        claim_value: The claim's value to look for.
        extraction_text: Text content from an XPath extraction.
        threshold: NOTE(review): accepted for backward compatibility with
            existing callers but never consulted here — the score cut-off is
            applied by find_best_xpath_match, not by this function.

    Returns:
        Similarity ratio in [0.0, 1.0]. 1.0 when either normalized string
        contains the other; 0.0 when either side normalizes to empty.
    """
    claim_norm = normalize_text(claim_value)
    extract_norm = normalize_text(extraction_text)
    if not claim_norm or not extract_norm:
        return 0.0
    # Exact substring containment (in either direction) counts as a full match.
    if claim_norm in extract_norm or extract_norm in claim_norm:
        return 1.0
    # Otherwise fall back to difflib's sequence similarity ratio.
    return SequenceMatcher(None, claim_norm, extract_norm).ratio()
def load_metadata(web_archive_dir: Path) -> dict | None:
    """Load and parse metadata.yaml from a web archive directory.

    Returns the parsed mapping, or None when the file is missing or fails
    to load (a warning is printed in the failure case).
    """
    metadata_file = web_archive_dir / 'metadata.yaml'
    if not metadata_file.exists():
        return None
    try:
        return yaml.safe_load(metadata_file.read_text(encoding='utf-8'))
    except Exception as e:
        # Best-effort: report the problem and let the caller skip this entry.
        print(f" Warning: Failed to load {metadata_file}: {e}")
        return None
def find_best_xpath_match(claim_value: str, extractions: list[dict], threshold: float = 0.5) -> dict | None:
    """
    Pick the extraction whose text best matches the claim value.

    Args:
        claim_value: Value to locate in the archived HTML text.
        extractions: XPath extraction dicts from metadata.yaml.
        threshold: Minimum similarity a candidate must reach to qualify.

    Returns:
        Dict with 'xpath', 'matched_text' (truncated), 'match_score' and
        'tag' for the best-scoring candidate, or None when no candidate
        reaches the threshold (or there are no extractions at all).
    """
    if not extractions:
        return None
    best: dict | None = None
    top_score = 0.0
    for candidate in extractions:
        candidate_text = candidate.get('text', '')
        score = fuzzy_match(claim_value, candidate_text, threshold)
        # Keep only the highest-scoring candidate that clears the threshold.
        if score >= threshold and score > top_score:
            top_score = score
            best = {
                'xpath': candidate.get('xpath'),
                'matched_text': candidate_text[:100],  # Truncate for readability
                'match_score': round(score, 3),
                'tag': candidate.get('tag'),
            }
    return best
def get_web_archive_path(entry_data: dict, entry_num: str) -> Path | None:
    """
    Resolve the web archive directory for an entry.

    Prefers the directory recorded on the first entry of
    web_enrichment.web_archives; otherwise falls back to the first
    subdirectory (in sorted order) under web/{entry_num}/.

    Args:
        entry_data: Parsed entry YAML mapping.
        entry_num: Entry number used for the fallback directory lookup.

    Returns:
        Path to the archive directory, or None when none can be found.
    """
    web_enrichment = entry_data.get('web_enrichment', {})
    web_archives = web_enrichment.get('web_archives', [])
    if web_archives:
        # Use the first archive's recorded directory when present.
        directory = web_archives[0].get('directory')
        if directory:
            return ENTRIES_DIR / directory
    # Fallback: look for a directory in web/{entry_num}/
    entry_web_dir = WEB_DIR / entry_num
    if entry_web_dir.exists():
        # Sort so the fallback choice is deterministic: iterdir() yields
        # entries in arbitrary, filesystem-dependent order.
        subdirs = sorted(d for d in entry_web_dir.iterdir() if d.is_dir())
        if subdirs:
            return subdirs[0]
    return None
def process_claims(claims: list[dict], extractions: list[dict], html_file_path: str,
                   remove_unverified: bool = True) -> tuple[list[dict], list[dict]]:
    """
    Replace ad-hoc confidence scores on claims with XPath provenance.

    Each claim's value is matched against the archived-HTML extractions.
    Matching claims are rebuilt with 'xpath', 'html_file' and
    'xpath_match_score' fields; non-matching claims are returned separately
    so the caller can drop them and keep an audit trail.

    Args:
        claims: List of claim dicts (claim_type/claim_value/source_url/...).
        extractions: XPath extractions loaded from metadata.yaml.
        html_file_path: Relative path to the archived HTML, stored on each
            verified claim as provenance.
        remove_unverified: NOTE(review): reserved flag, currently never
            consulted — unverified claims are always returned in the second
            list and the caller decides whether to discard them.

    Returns:
        Tuple of (verified_claims, removed_claims).
    """
    verified_claims: list[dict] = []
    removed_claims: list[dict] = []
    for claim in claims:
        # Guard against a missing/None value: str(None) would become the
        # literal text 'None' and could spuriously fuzzy-match page content.
        raw_value = claim.get('claim_value')
        claim_value = '' if raw_value is None else str(raw_value)
        match = find_best_xpath_match(claim_value, extractions)
        if match:
            # Verified claim - rebuild it with XPath provenance in place of
            # the old 'confidence' field.
            new_claim = {
                'claim_type': claim.get('claim_type'),
                'claim_value': claim.get('claim_value'),
                'source_url': claim.get('source_url'),
                'extraction_timestamp': claim.get('extraction_timestamp'),
                'xpath': match['xpath'],
                'html_file': html_file_path,
                'xpath_match_score': match['match_score'],
            }
            # Keep matched_text for debugging if not exact match
            if match['match_score'] < 1.0:
                new_claim['xpath_matched_text'] = match['matched_text']
            verified_claims.append(new_claim)
        else:
            # No XPath match - claim cannot be verified from archived HTML.
            # Either it was fabricated/hallucinated by the LLM, the value
            # lives in an attribute (href, src) rather than text content, or
            # it was dynamically generated. In all cases it is unverifiable.
            removed_claims.append({
                'claim_type': claim.get('claim_type'),
                'claim_value': claim.get('claim_value'),
                'reason': 'Cannot verify - value not found in archived HTML text content'
            })
    return verified_claims, removed_claims
def extract_entry_number(filename: str) -> str:
    """Extract the leading numeric entry number from a filename.

    Falls back to the filename with any '.yaml' removed when it does not
    start with digits.
    """
    if (m := re.match(r'^(\d+)', filename)):
        return m.group(1)
    return filename.replace('.yaml', '')
def process_entry(filepath: Path, dry_run: bool = False) -> tuple[int, int, list[str]]:
    """
    Process a single entry file to add XPath provenance.

    Loads the entry YAML, locates its web archive and XPath extractions,
    rewrites the claims via process_claims, and (unless dry_run) writes the
    updated YAML back in place, keeping removed claims for audit.

    Args:
        filepath: Path to the entry's YAML file.
        dry_run: When True, compute results but do not write the file.

    Returns: (claims_updated, claims_unmatched, errors)
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    if not data:
        return 0, 0, ["Empty file"]
    web_enrichment = data.get('web_enrichment', {})
    claims = web_enrichment.get('claims', [])
    if not claims:
        return 0, 0, []  # No claims to process
    # Check if already processed (has xpath instead of confidence); only the
    # first claim is inspected — assumes all claims were migrated together.
    if claims and 'xpath' in claims[0] and 'confidence' not in claims[0]:
        return 0, 0, []  # Already migrated
    entry_num = extract_entry_number(filepath.name)
    # Get web archive path
    archive_path = get_web_archive_path(data, entry_num)
    if not archive_path:
        return 0, 0, [f"No web archive found for entry {entry_num}"]
    # Load metadata with extractions
    metadata = load_metadata(archive_path)
    if not metadata:
        return 0, 0, [f"No metadata.yaml in {archive_path}"]
    extractions = metadata.get('extractions', [])
    if not extractions:
        return 0, 0, [f"No extractions in metadata for {entry_num}"]
    # HTML file path (relative to entries dir) — stored on each verified
    # claim as provenance pointing at the archived page.
    html_file_path = str(archive_path.relative_to(ENTRIES_DIR) / 'rendered.html')
    # Process claims - remove unverified ones
    verified_claims, removed_claims = process_claims(claims, extractions, html_file_path)
    # Count results
    matched = len(verified_claims)
    removed = len(removed_claims)
    if not dry_run:
        # Update the data - only keep verified claims
        data['web_enrichment']['claims'] = verified_claims
        data['web_enrichment']['xpath_provenance_added'] = datetime.now(timezone.utc).isoformat()
        # Store removed claims for audit trail
        if removed_claims:
            data['web_enrichment']['removed_unverified_claims'] = removed_claims
        # Write back in place, preserving key order and unicode text.
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
    errors = []
    if removed > 0:
        errors.append(f"{removed} unverified claims removed (not found in HTML)")
    return matched, removed, errors
def main() -> int:
    """CLI entry point: migrate claims to XPath provenance across entries.

    Scans entry YAML files (optionally limited or filtered to one entry),
    processes those that still carry confidence-scored claims, and prints a
    summary. Returns 0 as the process exit code.
    """
    parser = argparse.ArgumentParser(description='Add XPath provenance to web_enrichment claims')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without writing')
    args = parser.parse_args()
    # Find entry files with claims
    if args.entry:
        files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = sorted([f for f in ENTRIES_DIR.glob('*.yaml') if not f.name.startswith('.')])
    if args.limit:
        files = files[:args.limit]
    total_verified = 0
    total_removed = 0
    entries_processed = 0
    entries_with_claims = 0
    for filepath in files:
        if filepath.is_dir():
            continue
        # Quick check if file has claims with confidence — a raw text scan
        # avoids a full YAML parse for entries already migrated.
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()
        if 'confidence:' not in content or 'claims:' not in content:
            continue
        entries_with_claims += 1
        print(f"Processing: {filepath.name}")
        verified, removed, errors = process_entry(filepath, dry_run=args.dry_run)
        if verified or removed:
            entries_processed += 1
            total_verified += verified
            total_removed += removed
            print(f" Verified: {verified}, Removed: {removed}")
        for e in errors:
            print(f" {e}")
    print(f"\n{'DRY RUN - ' if args.dry_run else ''}Summary:")
    print(f" Entries with claims: {entries_with_claims}")
    print(f" Entries processed: {entries_processed}")
    print(f" Claims verified with XPath: {total_verified}")
    print(f" Claims removed (unverified): {total_removed}")
    if total_removed > 0:
        print(f"\n Removed claims are stored in 'removed_unverified_claims' for audit.")
        print(f" These claims could not be verified against archived HTML content.")
    return 0
# Script entry point: exit with main()'s return code.
if __name__ == '__main__':
    sys.exit(main())