#!/usr/bin/env python3
"""
Analyze and clean up web_enrichment sections that lack xpath provenance.

Per Rule 6 in AGENTS.md: "Every claim extracted from a webpage MUST have
an XPath pointer to the exact location in archived HTML where that value
appears. Claims without XPath provenance are FABRICATED and must be
removed."

This script:
1. Identifies web_enrichment sections with vague confidence but no xpath
2. Removes these fabricated claims
3. Preserves valid web_enrichment sections that have xpath provenance
"""

import os
import sys
from pathlib import Path
from datetime import datetime
import subprocess
import re

# Default location of the custodian YAML files; overridable via --dir.
DEFAULT_CUSTODIAN_DIR = '/Users/kempersc/apps/glam/data/custodian'


def _is_top_level_key(line: str) -> bool:
    """Return True if *line* looks like a top-level YAML key.

    A top-level key is a non-empty, non-comment line at column 0 that
    contains a colon. Shared by extract_block() and
    remove_web_enrichment_block() so block boundaries are detected
    identically in both.
    """
    return bool(line) and not line.startswith((' ', '#')) and ':' in line


def has_xpath_in_block(content: str, start_line: int, block_content: str) -> bool:
    """Check if a block contains xpath provenance.

    ``content`` and ``start_line`` are unused but retained so the
    signature stays backward compatible with existing callers.
    """
    return 'xpath:' in block_content


def has_vague_confidence(block_content: str) -> bool:
    """Check if block has vague confidence scores (no xpath but has confidence).

    NOTE: the pattern only matches fractional scores of the form ``0.N``;
    a literal ``confidence: 1.0`` would not be flagged (kept as-is to
    preserve existing classification behavior).
    """
    has_confidence = bool(re.search(r'confidence:\s*0\.\d+', block_content))
    has_xpath = 'xpath:' in block_content
    return has_confidence and not has_xpath


def extract_block(content: str, key: str) -> tuple[int, int, str]:
    """
    Extract a top-level YAML block by key.

    Returns (start_line, end_line, block_content) with end_line exclusive,
    or (-1, -1, '') if not found. Only the first occurrence of *key* is
    considered.
    """
    lines = content.split('\n')
    start_idx = -1

    for i, line in enumerate(lines):
        if start_idx < 0:
            # Still searching for the key (top-level: column 0 by definition
            # of startswith on the full key prefix).
            if line.startswith(f'{key}:'):
                start_idx = i
            continue
        # Inside the block: the next top-level key terminates it.
        if _is_top_level_key(line):
            return start_idx, i, '\n'.join(lines[start_idx:i])

    if start_idx >= 0:
        # Block extends to end of file.
        return start_idx, len(lines), '\n'.join(lines[start_idx:])
    return -1, -1, ''


def analyze_file(filepath: Path) -> dict:
    """Analyze a single file for problematic web_enrichment sections.

    Returns a dict of flags plus 'claims_count'; on I/O or decode failure
    the dict gains an 'error' key instead of raising.
    """
    result = {
        'has_web_enrichment': False,
        'has_vague_confidence': False,
        'has_xpath': False,
        'should_remove': False,
        'claims_count': 0,
    }
    try:
        content = filepath.read_text(encoding='utf-8')
        start, _end, block = extract_block(content, 'web_enrichment')
        if start >= 0:
            result['has_web_enrichment'] = True
            result['has_xpath'] = 'xpath:' in block
            result['has_vague_confidence'] = has_vague_confidence(block)
            result['claims_count'] = block.count('claim_type:')
            # Remove only when confidence is claimed without any xpath
            # anywhere in the block (the defining mark of fabrication).
            result['should_remove'] = (
                result['has_vague_confidence'] and not result['has_xpath']
            )
    except Exception as e:
        result['error'] = str(e)
    return result


def remove_web_enrichment_block(filepath: Path, dry_run: bool = False) -> bool:
    """Remove the top-level web_enrichment block from a file.

    Returns True if a block was found (and, unless *dry_run*, rewritten
    out of the file). Blank lines and top-level comments between the
    block and the next top-level key are dropped along with it.
    """
    try:
        content = filepath.read_text(encoding='utf-8')

        kept_lines = []
        skipping = False
        removed = False

        for line in content.split('\n'):
            if line.startswith('web_enrichment:'):
                # Start of the block to delete.
                skipping = True
                removed = True
                continue
            if skipping:
                if _is_top_level_key(line):
                    skipping = False  # next section begins; keep this line
                else:
                    continue  # still inside the removed block
            kept_lines.append(line)

        if removed and not dry_run:
            filepath.write_text('\n'.join(kept_lines), encoding='utf-8')
        return removed
    except Exception as e:
        print(f" ERROR: {e}")
        return False


def _find_enrichment_files(directory: Path) -> list[Path]:
    """Return *.yaml files in *directory* with a top-level web_enrichment key.

    Pure-Python replacement for the previous ``grep -l "^web_enrichment:"``
    subprocess call: same match semantics (line anchored at column 0),
    but portable and free of shell-string construction. Unreadable files
    are skipped silently, mirroring grep's behavior with 2>/dev/null.
    """
    found = []
    for path in sorted(directory.glob('*.yaml')):
        try:
            text = path.read_text(encoding='utf-8')
        except OSError:
            continue
        if any(line.startswith('web_enrichment:') for line in text.split('\n')):
            found.append(path)
    return found


def main():
    """CLI entry point: analyze, report, and optionally clean files."""
    import argparse

    parser = argparse.ArgumentParser(
        description='Clean up fabricated web_enrichment claims')
    parser.add_argument('--dry-run', action='store_true',
                        help='Analyze without making changes')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Print details')
    parser.add_argument('--analyze-only', action='store_true',
                        help='Only analyze, show statistics')
    parser.add_argument('--dir', default=DEFAULT_CUSTODIAN_DIR,
                        help='Directory containing custodian YAML files '
                             '(default: %(default)s)')
    args = parser.parse_args()

    custodian_dir = Path(args.dir)

    print(f"{'[DRY RUN] ' if args.dry_run else ''}Analyzing web_enrichment sections...")
    print(f"Started at: {datetime.now().isoformat()}")
    print()

    files_to_check = _find_enrichment_files(custodian_dir)
    print(f"Found {len(files_to_check)} files with web_enrichment sections")
    print()

    stats = {
        'total': len(files_to_check),
        'with_xpath': 0,
        'without_xpath_fabricated': 0,
        'mixed': 0,
        'total_claims_removed': 0,
        'errors': 0,
    }
    files_to_remove = []

    for filepath in files_to_check:
        analysis = analyze_file(filepath)

        if 'error' in analysis:
            stats['errors'] += 1
            if args.verbose:
                print(f"  ERROR in {filepath.name}: {analysis['error']}")
            continue

        if analysis['has_xpath'] and not analysis['has_vague_confidence']:
            stats['with_xpath'] += 1
        elif analysis['should_remove']:
            stats['without_xpath_fabricated'] += 1
            stats['total_claims_removed'] += analysis['claims_count']
            files_to_remove.append(filepath)
            if args.verbose:
                print(f"  FABRICATED: {filepath.name} ({analysis['claims_count']} claims)")
        elif analysis['has_xpath'] and analysis['has_vague_confidence']:
            stats['mixed'] += 1

    print("=" * 60)
    print("ANALYSIS RESULTS")
    print("=" * 60)
    print(f"Total files with web_enrichment: {stats['total']}")
    print(f"  With proper xpath provenance: {stats['with_xpath']}")
    print(f"  FABRICATED (no xpath, vague conf): {stats['without_xpath_fabricated']}")
    print(f"  Mixed (has both): {stats['mixed']}")
    print(f"  Errors: {stats['errors']}")
    print(f"Total fabricated claims to remove: {stats['total_claims_removed']}")
    print()

    if args.analyze_only:
        print("[ANALYZE ONLY] No changes made.")
        return

    if not files_to_remove:
        print("No fabricated web_enrichment sections found.")
        return

    # Remove fabricated sections.
    print(f"{'[DRY RUN] ' if args.dry_run else ''}Removing {len(files_to_remove)} fabricated web_enrichment sections...")
    removed_count = 0
    for filepath in files_to_remove:
        if remove_web_enrichment_block(filepath, dry_run=args.dry_run):
            removed_count += 1
            if args.verbose:
                print(f"  Removed: {filepath.name}")

    print()
    print("=" * 60)
    print("CLEANUP RESULTS")
    print("=" * 60)
    print(f"web_enrichment sections removed: {removed_count}")
    print(f"Completed at: {datetime.now().isoformat()}")

    if args.dry_run:
        print()
        print("[DRY RUN] No changes were made. Run without --dry-run to apply changes.")


if __name__ == '__main__':
    main()