glam/scripts/validate_web_claims.py
2025-12-02 14:36:01 +01:00

288 lines
9.1 KiB
Python

#!/usr/bin/env python3
"""
Cross-validate web_claims against other data sources (Wikidata, Google Maps, original entry).
This script:
1. Compares org_name claims from websites against authoritative sources
2. Identifies claims that contradict known data
3. Flags suspicious claims for manual review
4. Generates a validation report
Usage:
python scripts/validate_web_claims.py [--limit N] [--output FILE] [--verbose] [--show-suspicious]
"""
import argparse
import json
import re
import sys
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import yaml
# Optional dependency: rapidfuzz supplies fuzzy string similarity.
# HAS_FUZZ gates its use inside compare_names(); the script still runs
# without it, it just warns and degrades the matching quality.
try:
    from rapidfuzz import fuzz
    HAS_FUZZ = True
except ImportError:
    HAS_FUZZ = False
    print("Warning: rapidfuzz not installed. Fuzzy matching disabled.")
    print("Install with: pip install rapidfuzz")
# Directory containing one enriched YAML file per organisation entry.
# NOTE(review): hard-coded absolute, user-specific path — consider an env
# var or CLI flag override; confirm against the project's layout.
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')
def normalize_name(name: str) -> str:
    """Return a canonical comparison form of *name*.

    Lowercases, collapses runs of whitespace, then strips common Dutch
    organisation prefixes in a single fixed-order pass (so at most one
    occurrence of each prefix, checked in list order, is removed).
    """
    if not name:
        return ''
    canon = ' '.join(name.lower().split())
    for prefix in ('stichting ', 'vereniging ', 'museum ', 'het ', 'de ', 'een '):
        if canon.startswith(prefix):
            canon = canon[len(prefix):]
    return canon.strip()
def compare_names(name1: str, name2: str) -> float:
    """Return a similarity score in [0, 1] between two organisation names.

    Scoring ladder (applied to normalized forms):
      1.0  exact match
      0.9  one normalized name contains the other
      else a fuzzy ratio — rapidfuzz when installed, otherwise a stdlib
           difflib approximation.

    Fix: previously, without rapidfuzz every non-exact, non-containing
    pair scored 0.0, which misclassified all such claims as invalid.
    difflib.SequenceMatcher.ratio() approximates fuzz.ratio and keeps the
    fallback path useful.
    """
    if not name1 or not name2:
        return 0.0
    n1 = normalize_name(name1)
    n2 = normalize_name(name2)
    # Exact match on canonical forms
    if n1 == n2:
        return 1.0
    # Containment (e.g. "boijmans" vs "museum boijmans van beuningen")
    if n1 in n2 or n2 in n1:
        return 0.9
    if HAS_FUZZ:
        ratio = fuzz.ratio(n1, n2) / 100.0
        # partial_ratio rewards substring-like overlap; damp it slightly so
        # a full-string match still wins ties.
        partial = fuzz.partial_ratio(n1, n2) / 100.0
        return max(ratio, partial * 0.9)
    # Stdlib fallback when rapidfuzz is unavailable.
    from difflib import SequenceMatcher
    return SequenceMatcher(None, n1, n2).ratio()
def get_authoritative_names(data: dict) -> Dict[str, str]:
    """Extract authoritative organisation names from *data*, keyed by source.

    Sources collected (roughly most- to least-authoritative): the original
    entry's 'organisatie', Wikidata nl/en labels, the Google Maps 'name',
    and an already-validated 'custodian_name' claim (confidence > 0.7).

    Robustness fix: YAML entries may carry explicit nulls for these keys
    (e.g. ``wikidata_enrichment: ~``); ``data.get(key, {})`` then returns
    None and the chained ``.get`` raised AttributeError. ``or {}`` guards
    against null sections.
    """
    names: Dict[str, str] = {}
    # Original entry (most authoritative)
    original = data.get('original_entry') or {}
    if original.get('organisatie'):
        names['original_entry'] = original['organisatie']
    # Wikidata labels
    wikidata = data.get('wikidata_enrichment') or {}
    if wikidata.get('wikidata_label_nl'):
        names['wikidata_nl'] = wikidata['wikidata_label_nl']
    if wikidata.get('wikidata_label_en'):
        names['wikidata_en'] = wikidata['wikidata_label_en']
    # Google Maps
    google = data.get('google_maps_enrichment') or {}
    if google.get('name'):
        names['google_maps'] = google['name']
    # Derived custodian name, only if it was validated with high confidence
    custodian = data.get('custodian_name') or {}
    if custodian.get('claim_value') and custodian.get('confidence', 0) > 0.7:
        names['custodian_name'] = custodian['claim_value']
    return names
def validate_web_claims(data: dict) -> Dict:
    """Score each org_name web claim against the authoritative names.

    Buckets claims by best similarity score:
      >= 0.8  valid (also tracked for the single overall best match)
      >= 0.5  suspicious — flag for manual review
      <  0.5  invalid — likely a bad extraction

    Returns a report dict; if no authoritative names exist for this entry,
    the report is returned empty (nothing to validate against).
    """
    report = {
        'valid_claims': [],
        'suspicious_claims': [],
        'invalid_claims': [],
        'authoritative_names': {},
        'best_match': None,
    }
    auth_names = get_authoritative_names(data)
    report['authoritative_names'] = auth_names
    if not auth_names:
        return report
    all_claims = data.get('web_claims', {}).get('claims', [])
    for claim in all_claims:
        # Only organisation-name claims are validated here.
        if claim.get('claim_type') != 'org_name':
            continue
        value = claim.get('claim_value', '')
        if not value:
            continue
        # Best similarity across every authoritative source.
        top_score, top_source = 0, None
        for source, candidate in auth_names.items():
            score = compare_names(value, candidate)
            if score > top_score:
                top_score, top_source = score, source
        info = {
            'claim_value': value,
            'extraction_method': claim.get('extraction_method', 'unknown'),
            'xpath': claim.get('xpath', ''),
            'best_match_score': top_score,
            'best_match_source': top_source,
            'matched_name': auth_names.get(top_source, '') if top_source else '',
        }
        if top_score >= 0.8:
            report['valid_claims'].append(info)
            current_best = report['best_match']
            if current_best is None or top_score > current_best['score']:
                report['best_match'] = {
                    'claim_value': value,
                    'source': top_source,
                    'score': top_score,
                }
        elif top_score >= 0.5:
            report['suspicious_claims'].append(info)
        else:
            report['invalid_claims'].append(info)
    return report
def process_entry(filepath: Path) -> Optional[Dict]:
    """Load one enriched entry YAML file and validate its web claims.

    Returns None when the file is empty or carries no 'web_claims' section;
    otherwise a dict with the file name and the validation report.
    """
    with filepath.open('r', encoding='utf-8') as fh:
        data = yaml.safe_load(fh)
    # Nothing to do for empty files or entries never scraped for claims.
    if not data or 'web_claims' not in data:
        return None
    return {
        'entry_file': filepath.name,
        'validation': validate_web_claims(data),
    }
def main() -> int:
    """CLI entry point: validate web claims for every entry and print a summary.

    Reads all *.yaml files under ENTRIES_DIR, tallies valid / suspicious /
    invalid org_name claims, optionally prints samples, and can dump the
    full per-entry results as JSON via --output. Always returns 0.
    """
    parser = argparse.ArgumentParser(description='Validate web claims against authoritative sources')
    parser.add_argument('--limit', type=int, default=None, help='Limit entries')
    parser.add_argument('--output', type=str, default=None, help='Output JSON file')
    parser.add_argument('--verbose', action='store_true', help='Show detailed output')
    parser.add_argument('--show-suspicious', action='store_true', help='List all suspicious claims')
    args = parser.parse_args()
    # Deterministic ordering; skip dotfiles (editor/OS artifacts).
    files = sorted([f for f in ENTRIES_DIR.glob('*.yaml') if f.is_file() and not f.name.startswith('.')])
    if args.limit:
        files = files[:args.limit]
    print(f"Validating web claims in {len(files)} entries...")
    # Statistics accumulated across all entries
    total_with_claims = 0
    total_valid = 0
    total_suspicious = 0
    total_invalid = 0
    suspicious_examples = []
    invalid_examples = []
    results = []
    for filepath in files:
        result = process_entry(filepath)
        # process_entry returns None for empty entries / entries without claims
        if result:
            results.append(result)
            total_with_claims += 1
            v = result['validation']
            total_valid += len(v['valid_claims'])
            total_suspicious += len(v['suspicious_claims'])
            total_invalid += len(v['invalid_claims'])
            # Collect examples (capped at 2 per entry to keep samples diverse)
            if v['suspicious_claims']:
                for claim in v['suspicious_claims'][:2]:
                    suspicious_examples.append({
                        'file': result['entry_file'],
                        'claim': claim['claim_value'],
                        'score': claim['best_match_score'],
                        'matched': claim['matched_name'],
                    })
            if v['invalid_claims']:
                for claim in v['invalid_claims'][:2]:
                    invalid_examples.append({
                        'file': result['entry_file'],
                        'claim': claim['claim_value'],
                        'score': claim['best_match_score'],
                        'auth_names': list(v['authoritative_names'].values())[:2],
                    })
    # Print summary
    print("\n" + "=" * 60)
    print("WEB CLAIMS VALIDATION SUMMARY")
    print("=" * 60)
    print(f"Entries with web_claims: {total_with_claims}")
    print(f"Valid org_name claims: {total_valid}")
    print(f"Suspicious claims (0.5-0.8 match): {total_suspicious}")
    print(f"Invalid claims (<0.5 match): {total_invalid}")
    # Rates only make sense when at least one claim was scored
    if total_valid + total_suspicious + total_invalid > 0:
        total = total_valid + total_suspicious + total_invalid
        print(f"\nValidation rates:")
        print(f" Valid: {total_valid/total:.1%}")
        print(f" Suspicious: {total_suspicious/total:.1%}")
        print(f" Invalid: {total_invalid/total:.1%}")
    if args.show_suspicious and suspicious_examples:
        print("\n" + "-" * 40)
        print("SUSPICIOUS CLAIMS (sample):")
        # Sample is capped at 20 entries; claim/name text is truncated for display
        for ex in suspicious_examples[:20]:
            print(f" {ex['file']}")
            print(f" Claim: {ex['claim'][:50]}")
            print(f" Score: {ex['score']:.2f} vs '{ex['matched'][:30]}'")
    if args.verbose and invalid_examples:
        print("\n" + "-" * 40)
        print("INVALID CLAIMS (sample):")
        for ex in invalid_examples[:20]:
            print(f" {ex['file']}")
            print(f" Claim: {ex['claim'][:50]}")
            print(f" Auth names: {ex['auth_names']}")
    # Save full per-entry results as JSON (non-ASCII preserved for Dutch names)
    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        print(f"\nSaved detailed results to {args.output}")
    return 0
if __name__ == '__main__':
    # Propagate main()'s exit status (always 0) to the shell.
    sys.exit(main())