#!/usr/bin/env python3 """ Cross-validate web_claims against other data sources (Wikidata, Google Maps, original entry). This script: 1. Compares org_name claims from websites against authoritative sources 2. Identifies claims that contradict known data 3. Flags suspicious claims for manual review 4. Generates a validation report Usage: python scripts/validate_web_claims.py [--fix] [--limit N] [--output FILE] """ import argparse import json import re import sys from collections import defaultdict from pathlib import Path from typing import Dict, List, Optional, Tuple import yaml try: from rapidfuzz import fuzz HAS_FUZZ = True except ImportError: HAS_FUZZ = False print("Warning: rapidfuzz not installed. Fuzzy matching disabled.") print("Install with: pip install rapidfuzz") ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries') def normalize_name(name: str) -> str: """Normalize a name for comparison.""" if not name: return '' # Lowercase, remove extra whitespace name = ' '.join(name.lower().split()) # Remove common prefixes/suffixes removals = ['stichting ', 'vereniging ', 'museum ', 'het ', 'de ', 'een '] for removal in removals: if name.startswith(removal): name = name[len(removal):] return name.strip() def compare_names(name1: str, name2: str) -> float: """Compare two names and return similarity score (0-1).""" if not name1 or not name2: return 0.0 n1 = normalize_name(name1) n2 = normalize_name(name2) # Exact match if n1 == n2: return 1.0 # One contains the other if n1 in n2 or n2 in n1: return 0.9 # Fuzzy match if HAS_FUZZ: ratio = fuzz.ratio(n1, n2) / 100.0 partial = fuzz.partial_ratio(n1, n2) / 100.0 return max(ratio, partial * 0.9) return 0.0 def get_authoritative_names(data: dict) -> Dict[str, str]: """Extract authoritative names from all sources.""" names = {} # Original entry (most authoritative) original = data.get('original_entry', {}) if original.get('organisatie'): names['original_entry'] = original['organisatie'] # Wikidata wikidata = data.get('wikidata_enrichment', {}) if wikidata.get('wikidata_label_nl'): names['wikidata_nl'] = wikidata['wikidata_label_nl'] if wikidata.get('wikidata_label_en'): names['wikidata_en'] = wikidata['wikidata_label_en'] # Google Maps google = data.get('google_maps_enrichment', {}) if google.get('name'): names['google_maps'] = google['name'] # Derived custodian name (if already validated) custodian = data.get('custodian_name', {}) if custodian.get('claim_value') and custodian.get('confidence', 0) > 0.7: names['custodian_name'] = custodian['claim_value'] return names def validate_web_claims(data: dict) -> Dict: """Validate web claims against authoritative sources.""" results = { 'valid_claims': [], 'suspicious_claims': [], 'invalid_claims': [], 'authoritative_names': {}, 'best_match': None, } # Get authoritative names auth_names = get_authoritative_names(data) results['authoritative_names'] = auth_names if not auth_names: return results # Get web claims web_claims = data.get('web_claims', {}) claims = web_claims.get('claims', []) # Find org_name claims org_name_claims = [c for c in claims if c.get('claim_type') == 'org_name'] for claim in org_name_claims: claim_value = claim.get('claim_value', '') if not claim_value: continue # Compare against all authoritative sources best_score = 0 best_source = None for source, auth_name in auth_names.items(): score = compare_names(claim_value, auth_name) if score > best_score: best_score = score best_source = source claim_info = { 'claim_value': claim_value, 'extraction_method': claim.get('extraction_method', 'unknown'), 'xpath': claim.get('xpath', ''), 'best_match_score': best_score, 'best_match_source': best_source, 'matched_name': auth_names.get(best_source, '') if best_source else '', } if best_score >= 0.8: results['valid_claims'].append(claim_info) if results['best_match'] is None or best_score > results['best_match']['score']: results['best_match'] = { 'claim_value': claim_value, 'source': best_source, 'score': best_score, } elif best_score >= 0.5: results['suspicious_claims'].append(claim_info) else: # Score below 0.5 - likely invalid results['invalid_claims'].append(claim_info) return results def process_entry(filepath: Path) -> Optional[Dict]: """Process a single entry for validation.""" with open(filepath, 'r', encoding='utf-8') as f: data = yaml.safe_load(f) if not data: return None # Skip entries without web_claims if 'web_claims' not in data: return None validation = validate_web_claims(data) return { 'entry_file': filepath.name, 'validation': validation, } def main(): parser = argparse.ArgumentParser(description='Validate web claims against authoritative sources') parser.add_argument('--limit', type=int, default=None, help='Limit entries') parser.add_argument('--output', type=str, default=None, help='Output JSON file') parser.add_argument('--verbose', action='store_true', help='Show detailed output') parser.add_argument('--show-suspicious', action='store_true', help='List all suspicious claims') args = parser.parse_args() files = sorted([f for f in ENTRIES_DIR.glob('*.yaml') if f.is_file() and not f.name.startswith('.')]) if args.limit: files = files[:args.limit] print(f"Validating web claims in {len(files)} entries...") # Statistics total_with_claims = 0 total_valid = 0 total_suspicious = 0 total_invalid = 0 suspicious_examples = [] invalid_examples = [] results = [] for filepath in files: result = process_entry(filepath) if result: results.append(result) total_with_claims += 1 v = result['validation'] total_valid += len(v['valid_claims']) total_suspicious += len(v['suspicious_claims']) total_invalid += len(v['invalid_claims']) # Collect examples if v['suspicious_claims']: for claim in v['suspicious_claims'][:2]: suspicious_examples.append({ 'file': result['entry_file'], 'claim': claim['claim_value'], 'score': claim['best_match_score'], 'matched': claim['matched_name'], }) if v['invalid_claims']: for claim in v['invalid_claims'][:2]: invalid_examples.append({ 'file': result['entry_file'], 'claim': claim['claim_value'], 'score': claim['best_match_score'], 'auth_names': list(v['authoritative_names'].values())[:2], }) # Print summary print("\n" + "=" * 60) print("WEB CLAIMS VALIDATION SUMMARY") print("=" * 60) print(f"Entries with web_claims: {total_with_claims}") print(f"Valid org_name claims: {total_valid}") print(f"Suspicious claims (0.5-0.8 match): {total_suspicious}") print(f"Invalid claims (<0.5 match): {total_invalid}") if total_valid + total_suspicious + total_invalid > 0: total = total_valid + total_suspicious + total_invalid print(f"\nValidation rates:") print(f" Valid: {total_valid/total:.1%}") print(f" Suspicious: {total_suspicious/total:.1%}") print(f" Invalid: {total_invalid/total:.1%}") if args.show_suspicious and suspicious_examples: print("\n" + "-" * 40) print("SUSPICIOUS CLAIMS (sample):") for ex in suspicious_examples[:20]: print(f" {ex['file']}") print(f" Claim: {ex['claim'][:50]}") print(f" Score: {ex['score']:.2f} vs '{ex['matched'][:30]}'") if args.verbose and invalid_examples: print("\n" + "-" * 40) print("INVALID CLAIMS (sample):") for ex in invalid_examples[:20]: print(f" {ex['file']}") print(f" Claim: {ex['claim'][:50]}") print(f" Auth names: {ex['auth_names']}") # Save output if args.output: with open(args.output, 'w', encoding='utf-8') as f: json.dump(results, f, indent=2, ensure_ascii=False) print(f"\nSaved detailed results to {args.output}") return 0 if __name__ == '__main__': sys.exit(main())