#!/usr/bin/env python3
"""
Validate custodian_name against authoritative enrichment sources.

This script validates extracted web claims against:
1. wikidata_label_nl (authoritative)
2. google_maps_enrichment.name (high confidence)
3. original_entry.organisatie (source CSV)

Uses fuzzy string matching to detect mismatches and flags entries for review.

Usage:
    python scripts/validate_custodian_name.py [--limit N] [--entry ENTRY_NUM] [--fix]
"""

import argparse
import csv
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple

# PyYAML is required to process entries; guard the import (mirroring the
# rapidfuzz guard below) so the module can still be imported without it.
# main() fails fast with a clear message when it is missing.
try:
    import yaml
    YAML_AVAILABLE = True
except ImportError:
    yaml = None
    YAML_AVAILABLE = False

try:
    from rapidfuzz import fuzz
    RAPIDFUZZ_AVAILABLE = True
except ImportError:
    RAPIDFUZZ_AVAILABLE = False
    print("Warning: rapidfuzz not installed. Using basic string matching.")

# Directories
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')
REPORTS_DIR = Path('/Users/kempersc/apps/glam/reports')

# Trust levels for different sources (0-1)
SOURCE_TRUST_LEVELS = {
    'wikidata': 1.0,           # Community-verified, highest trust
    'google_maps': 0.9,        # Google-verified business data
    'isil': 0.85,              # Official ISIL registry
    'original_entry': 0.7,     # CSV source, may be outdated
    'web_og_site_name': 0.8,   # Website self-declaration
    'web_schema_org': 0.75,    # Structured data, sometimes wrong
    'web_h1_tag': 0.6,         # May be exhibition title
    'web_title_tag': 0.5,      # Often has taglines, events
}

# Expanded blocklist for invalid names.
# Matched with re.search + re.IGNORECASE against the lowercased candidate.
INVALID_NAME_PATTERNS = [
    # Navigation/UI elements (Dutch + English)
    r'^(home|welkom|welcome|menu|navigation?|nav|header|footer|sidebar)$',
    r'^(contact|over ons|about|info|informatie)$',
    r'^(nieuws|news|agenda|calendar|events?|activiteiten)$',
    r'^(zoeken?|search|filter|sort)$',
    r'^zoeken in de',  # "Zoeken in de archieven" etc.
    r'^doorzoek ',     # "Doorzoek de collectie" etc.
    r'^bekijk ',       # "Bekijk onze..." etc.
    r'^ontdek ',       # "Ontdek de..." etc.
    # Archive/library search UI elements
    r'^(zoeken|search|browse|bladeren)\s+(in|door|de|het|onze)',
    r'in de archieven$',
    r'in de collectie$',
    # Cookie/privacy popups
    r'cookie',
    r'privacy',
    r'gdpr',
    r'consent',
    r'waarom gebruiken wij',
    # Generic page titles
    r'^(default|untitled|index|main|pagina|page)\s*\d*$',
    r'^(foto|image|picture|afbeelding)\s*\d+$',
    r'^(oproep|call|melding|bericht)$',  # Generic action words
    # Exhibition/event titles (Dutch)
    r'tentoonstelling',
    r'expositie',
    r'exhibition',
    r'^[A-Z][a-z]+:\s',  # Pattern like "K-pop: A Snapshot"
    r'verlengd',         # Exhibition extension notice
    # Tagline/slogan patterns
    r'^het verhaal van\s',  # "Het verhaal van Heerenveen" is tagline
    r'^de geschiedenis van\s',
    r'^welkom bij\s',
    r'^over het museum$',
    r'^over de',
    r'^over ons$',
    # Webdesign companies (wrong site scraped)
    r'webdesign',
    r'web design',
    r'website by',
    r'endless webdesign',
    # Social media
    r'^(facebook|instagram|twitter|linkedin|youtube|tiktok)',
    # Library/archive systems (wrong extraction)
    r'libraries\.org',
    r'NLmapNew\.com',
    r'fotobeeldbank',  # Photo database UI, not org name
    # Multi-institution pages (extracted wrong institution)
    r'amelander musea',   # Umbrella term, not specific museum
    r'musea noardeast',   # Regional umbrella
    # Homepage indicators
    r'^homepage\s',
]


def normalize_name(name: str) -> str:
    """Normalize a name for comparison: lowercase and collapse whitespace.

    Organization-type prefixes ("stichting", "museum", ...) are deliberately
    NOT stripped: token-set fuzzy matching already handles the
    "Stichting Museum X" vs "Museum X" case.
    """
    if not name:
        return ""
    return ' '.join(name.lower().split())


def fuzzy_match_score(name1: str, name2: str) -> float:
    """
    Calculate fuzzy match score between two names.
    Returns score 0-1 where 1 is exact match.

    Uses rapidfuzz when available; falls back to substring overlap otherwise.
    """
    if not name1 or not name2:
        return 0.0

    n1 = normalize_name(name1)
    n2 = normalize_name(name2)

    if n1 == n2:
        return 1.0

    if RAPIDFUZZ_AVAILABLE:
        # Use token_set_ratio for better handling of word order differences
        # and partial matches (e.g., "Museum X" vs "Stichting Museum X")
        token_score = fuzz.token_set_ratio(n1, n2) / 100.0
        partial_score = fuzz.partial_ratio(n1, n2) / 100.0
        # Weight token_set higher as it handles "Stichting X" vs "X" well
        return max(token_score * 0.8 + partial_score * 0.2, token_score)

    # Basic substring matching as fallback
    if n1 in n2 or n2 in n1:
        shorter = min(len(n1), len(n2))
        longer = max(len(n1), len(n2))
        return shorter / longer
    return 0.0


def is_obviously_invalid(name: str) -> Tuple[bool, str]:
    """
    Check if a name is obviously invalid using pattern matching.
    Returns (is_invalid, reason).
    """
    if not name:
        return True, "empty"

    name_lower = name.lower().strip()

    # Check against blocklist patterns
    for pattern in INVALID_NAME_PATTERNS:
        if re.search(pattern, name_lower, re.IGNORECASE):
            return True, f"matches blocklist: {pattern}"

    # Check for very short names
    if len(name.strip()) < 3:
        return True, "too short"

    # Check for names that are mostly numbers
    digits = sum(1 for c in name if c.isdigit())
    if digits > len(name) * 0.5:
        return True, "mostly numbers"

    return False, ""


def get_authoritative_names(entry_data: Dict) -> Dict[str, str]:
    """
    Extract names from authoritative sources in entry.

    Returns dict of {source: name}.
    """
    names = {}

    # Wikidata (highest authority); prefer the Dutch label
    wikidata = entry_data.get('wikidata_enrichment', {})
    if wikidata.get('wikidata_label_nl'):
        names['wikidata'] = wikidata['wikidata_label_nl']
    elif wikidata.get('wikidata_label_en'):
        names['wikidata'] = wikidata['wikidata_label_en']

    # Google Maps
    google = entry_data.get('google_maps_enrichment', {})
    if google.get('name'):
        names['google_maps'] = google['name']

    # ISIL registry
    isil = entry_data.get('isil_enrichment', {})
    if isil.get('name'):
        names['isil'] = isil['name']

    # Original CSV entry
    original = entry_data.get('original_entry', {})
    if original.get('organisatie'):
        names['original_entry'] = original['organisatie']

    return names


def validate_name_against_sources(
    candidate_name: str,
    authoritative_names: Dict[str, str],
    extraction_method: str = ''
) -> Dict[str, Any]:
    """
    Validate a candidate name against authoritative sources.

    Returns validation result with confidence score and recommendations:
    keys: is_valid, confidence_score, match_scores, warnings,
    best_alternative, recommendation.
    """
    result = {
        'is_valid': True,
        'confidence_score': 0.5,  # Default medium confidence
        'match_scores': {},
        'warnings': [],
        'best_alternative': None,
        'recommendation': None,
    }

    if not authoritative_names:
        result['warnings'].append("No authoritative sources to validate against")
        return result

    # Check for obvious invalidity first
    is_invalid, reason = is_obviously_invalid(candidate_name)
    if is_invalid:
        result['is_valid'] = False
        result['confidence_score'] = 0.0
        result['warnings'].append(f"Obviously invalid: {reason}")
        # Find best alternative: first authoritative name that itself is valid
        for source, name in authoritative_names.items():
            if not is_obviously_invalid(name)[0]:
                result['best_alternative'] = {'source': source, 'name': name}
                result['recommendation'] = f"Use {source}: '{name}'"
                break
        return result

    # Calculate match scores against each source
    match_scores = {}
    for source, auth_name in authoritative_names.items():
        score = fuzzy_match_score(candidate_name, auth_name)
        match_scores[source] = {
            'score': score,
            'authoritative_name': auth_name,
            'trust_level': SOURCE_TRUST_LEVELS.get(source, 0.5),
        }
    result['match_scores'] = match_scores

    # Calculate weighted confidence
    if match_scores:
        weighted_scores = []
        for source, data in match_scores.items():
            weighted = data['score'] * data['trust_level']
            weighted_scores.append(weighted)

        # Use best match, not average (one good match is enough)
        best_match_score = max(data['score'] for data in match_scores.values())
        weighted_confidence = max(weighted_scores)
        result['confidence_score'] = weighted_confidence

        # Check for mismatches
        if best_match_score < 0.5:
            result['is_valid'] = False
            result['warnings'].append(f"Low match with all sources (best: {best_match_score:.2f})")
            # Find best alternative: the most trusted source
            best_source = max(match_scores.keys(), key=lambda s: match_scores[s]['trust_level'])
            best_name = match_scores[best_source]['authoritative_name']
            result['best_alternative'] = {'source': best_source, 'name': best_name}
            result['recommendation'] = f"Use {best_source}: '{best_name}'"
        elif best_match_score < 0.7:
            result['warnings'].append(f"Moderate match (best: {best_match_score:.2f}) - review recommended")

    # Penalize extraction methods that are less reliable
    if extraction_method in ('h1_tag', 'title_tag') and result['confidence_score'] < 0.8:
        # h1 and title are more likely to have exhibition names
        result['confidence_score'] *= 0.9
        if not result['warnings']:
            result['warnings'].append(f"Extracted from {extraction_method} - may be page/event title")

    return result


def select_best_name(entry_data: Dict) -> Tuple[Optional[str], str, Dict]:
    """
    Select the best name for an entry with validation.

    Returns (name, source, validation_result). Web claims are tried in
    reliability order first; authoritative sources are used as fallback.
    """
    authoritative_names = get_authoritative_names(entry_data)

    # Get web claims if available
    web_claims = entry_data.get('web_claims', {}).get('claims', [])
    org_name_claims = [c for c in web_claims if c.get('claim_type') == 'org_name']

    best_candidate = None
    best_source = None
    best_validation = None

    # Try each web claim (most reliable extraction method first) and validate
    for claim in sorted(
        org_name_claims,
        key=lambda c: {'og_site_name': 4, 'schema_org_name': 3, 'h1_tag': 2,
                       'title_tag': 1}.get(c.get('extraction_method', ''), 0),
        reverse=True,
    ):
        name = claim.get('claim_value', '')
        method = claim.get('extraction_method', '')
        validation = validate_name_against_sources(name, authoritative_names, method)

        if validation['is_valid'] and validation['confidence_score'] >= 0.6:
            # Good candidate found
            return name, f"web:{method}", validation

        # Track best invalid candidate for reporting
        if best_candidate is None or (validation['confidence_score'] >
                                      (best_validation['confidence_score'] if best_validation else 0)):
            best_candidate = name
            best_source = f"web:{method}"
            best_validation = validation

    # If no valid web claim, fall back to authoritative sources
    # Priority: wikidata > google_maps > isil > original_entry
    for source in ['wikidata', 'google_maps', 'isil', 'original_entry']:
        if source in authoritative_names:
            name = authoritative_names[source]
            if not is_obviously_invalid(name)[0]:
                validation = {
                    'is_valid': True,
                    'confidence_score': SOURCE_TRUST_LEVELS.get(source, 0.5),
                    'match_scores': {},
                    'warnings': [f"Fallback to {source} (no valid web claims)"],
                    'best_alternative': None,
                    'recommendation': None,
                }
                return name, source, validation

    # Return best candidate even if invalid (for reporting)
    if best_candidate:
        return best_candidate, best_source, best_validation

    return None, "none", {'is_valid': False, 'confidence_score': 0,
                          'warnings': ['No name candidates found'], 'match_scores': {},
                          'best_alternative': None, 'recommendation': None}


def process_entry(filepath: Path, fix: bool = False) -> Dict[str, Any]:
    """
    Process a single entry file, validating or fixing custodian_name.

    Returns processing result with status and recommendations. Error results
    carry the same keys as normal results (plus 'message') so downstream
    reporting/printing never hits a KeyError.
    """
    result = {
        'entry_file': filepath.name,
        'entry_index': '',
        'status': 'ok',
        'current_name': None,
        'current_source': None,
        'validation': None,
        'fixed': False,
        'new_name': None,
        'new_source': None,
    }

    if yaml is None:
        result['status'] = 'error'
        result['message'] = 'PyYAML not installed'
        return result

    # Guard against malformed YAML so one bad file does not abort the run
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except yaml.YAMLError as exc:
        result['status'] = 'error'
        result['message'] = f'YAML parse error: {exc}'
        return result

    if not data:
        result['status'] = 'error'
        result['message'] = 'Empty file'
        return result

    result['entry_index'] = data.get('entry_index', '')

    # Get current custodian_name
    current = data.get('custodian_name', {})
    result['current_name'] = current.get('claim_value')
    result['current_source'] = current.get('extraction_method') or current.get('source', 'unknown')

    # Get authoritative names for validation
    authoritative_names = get_authoritative_names(data)

    # Validate current name if exists
    if result['current_name']:
        validation = validate_name_against_sources(
            result['current_name'],
            authoritative_names,
            result['current_source']
        )
        result['validation'] = validation

        if not validation['is_valid']:
            result['status'] = 'invalid'
            if fix and validation['best_alternative']:
                # Apply fix: replace with the best authoritative alternative,
                # keeping provenance of the previous (bad) value
                alt = validation['best_alternative']
                new_custodian_name = {
                    'claim_type': 'custodian_name',
                    'claim_value': alt['name'],
                    'source': alt['source'],
                    'provenance_note': f"Auto-corrected from '{result['current_name']}' ({result['current_source']}) - validation failed",
                    'previous_value': result['current_name'],
                    'previous_source': result['current_source'],
                    'correction_reason': '; '.join(validation['warnings']),
                    'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
                }
                data['custodian_name'] = new_custodian_name
                with open(filepath, 'w', encoding='utf-8') as f:
                    yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
                result['fixed'] = True
                result['new_name'] = alt['name']
                result['new_source'] = alt['source']
                result['status'] = 'fixed'
        elif validation['warnings']:
            result['status'] = 'review'
    else:
        # No custodian_name - try to derive one
        best_name, best_source, validation = select_best_name(data)
        result['validation'] = validation

        if best_name and validation['is_valid']:
            result['status'] = 'missing_valid'
            result['new_name'] = best_name
            result['new_source'] = best_source
            if fix:
                new_custodian_name = {
                    'claim_type': 'custodian_name',
                    'claim_value': best_name,
                    'source': best_source,
                    'provenance_note': 'Auto-derived with validation',
                    'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
                }
                data['custodian_name'] = new_custodian_name
                with open(filepath, 'w', encoding='utf-8') as f:
                    yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
                result['fixed'] = True
                result['status'] = 'derived'
        else:
            result['status'] = 'missing_invalid'

    return result


def generate_report(results: List[Dict], output_path: Path):
    """Generate CSV report of validation results."""
    output_path.parent.mkdir(parents=True, exist_ok=True)

    with open(output_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow([
            'entry_file', 'entry_index', 'status', 'current_name', 'current_source',
            'confidence', 'new_name', 'new_source', 'warnings', 'recommendation'
        ])
        for r in results:
            validation = r.get('validation', {}) or {}
            writer.writerow([
                r.get('entry_file', ''),
                r.get('entry_index', ''),
                r.get('status', ''),
                r.get('current_name', ''),
                r.get('current_source', ''),
                f"{validation.get('confidence_score', 0):.2f}",
                r.get('new_name', ''),
                r.get('new_source', ''),
                '; '.join(validation.get('warnings', [])),
                validation.get('recommendation', ''),
            ])

    print(f"\nReport saved to: {output_path}")


def main():
    parser = argparse.ArgumentParser(description='Validate custodian_name against authoritative sources')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--fix', action='store_true', help='Auto-fix invalid names')
    parser.add_argument('--report', type=str, default=None, help='Output report CSV path')
    parser.add_argument('--show-all', action='store_true', help='Show all entries, not just problems')
    args = parser.parse_args()

    # Fail fast with a clear message instead of per-entry errors
    if not YAML_AVAILABLE:
        print("Error: PyYAML is required. Install with: pip install pyyaml")
        return 1

    # Find entry files
    if args.entry:
        files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = sorted([f for f in ENTRIES_DIR.glob('*.yaml')
                        if f.is_file() and not f.name.startswith('.')])

    if args.limit:
        files = files[:args.limit]

    print(f"Validating {len(files)} entries...")
    print(f"Mode: {'FIX' if args.fix else 'VALIDATE ONLY'}")
    print()

    results = []
    stats = {'ok': 0, 'invalid': 0, 'fixed': 0, 'review': 0,
             'missing_valid': 0, 'missing_invalid': 0, 'derived': 0, 'error': 0}

    for filepath in files:
        if filepath.is_dir():
            continue
        result = process_entry(filepath, fix=args.fix)
        results.append(result)
        stats[result['status']] = stats.get(result['status'], 0) + 1

        # Print result (problems only, unless --show-all)
        if args.show_all or result['status'] not in ('ok',):
            status_icon = {
                'ok': '  ',
                'review': ' ?',
                'invalid': ' X',
                'fixed': ' !',
                'missing_valid': ' +',
                'missing_invalid': ' -',
                'derived': ' +',
                'error': '!!',
            }.get(result['status'], '??')

            line = f"{status_icon} {result['entry_file']}: "
            if result['status'] == 'fixed':
                line += f"'{result['current_name']}' -> '{result['new_name']}' ({result['new_source']})"
            elif result['status'] == 'invalid':
                validation = result.get('validation', {})
                line += f"'{result['current_name']}' INVALID"
                if validation.get('recommendation'):
                    line += f" -> {validation['recommendation']}"
            elif result['status'] == 'review':
                validation = result.get('validation', {})
                line += f"'{result['current_name']}' [{validation.get('confidence_score', 0):.2f}]"
                if validation.get('warnings'):
                    line += f" - {validation['warnings'][0]}"
            elif result['status'] in ('missing_valid', 'derived'):
                line += f"NEW: '{result['new_name']}' ({result['new_source']})"
            elif result['status'] == 'missing_invalid':
                line += "No valid name found"
            else:
                line += f"'{result['current_name']}'"
            print(line)

    # Summary
    print()
    print("=" * 60)
    print("Summary:")
    print(f"  Valid (OK):           {stats['ok']}")
    print(f"  Needs review:         {stats['review']}")
    print(f"  Invalid:              {stats['invalid']}")
    print(f"  Fixed:                {stats['fixed']}")
    print(f"  Missing (derivable):  {stats['missing_valid']}")
    print(f"  Missing (no source):  {stats['missing_invalid']}")
    print(f"  Auto-derived:         {stats['derived']}")
    print(f"  Errors:               {stats['error']}")
    print("=" * 60)

    # Generate report if requested
    if args.report:
        generate_report(results, Path(args.report))
    elif stats['invalid'] + stats['review'] > 0:
        # Auto-generate report for problems
        report_path = REPORTS_DIR / f"custodian_name_validation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        problem_results = [r for r in results if r['status'] in ('invalid', 'review', 'missing_invalid')]
        if problem_results:
            generate_report(problem_results, report_path)

    return 0


if __name__ == '__main__':
    sys.exit(main())