glam/scripts/validate_custodian_name.py
(595 lines, 22 KiB, Python)
#!/usr/bin/env python3
"""
Validate custodian_name against authoritative enrichment sources.
This script validates extracted web claims against:
1. wikidata_label_nl (authoritative)
2. google_maps_enrichment.name (high confidence)
3. original_entry.organisatie (source CSV)
Uses fuzzy string matching to detect mismatches and flags entries for review.
Usage:
python scripts/validate_custodian_name.py [--limit N] [--entry ENTRY_NUM] [--fix]
"""
import argparse
import csv
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
import yaml
# rapidfuzz is optional: when missing, fuzzy_match_score() falls back to
# basic substring matching (see its non-rapidfuzz branch).
try:
    from rapidfuzz import fuzz
    RAPIDFUZZ_AVAILABLE = True
except ImportError:
    RAPIDFUZZ_AVAILABLE = False
    print("Warning: rapidfuzz not installed. Using basic string matching.")
# Directories (absolute paths to the enriched entry files and report output)
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')
REPORTS_DIR = Path('/Users/kempersc/apps/glam/reports')
# Trust levels for different sources (0-1).  Used as weights when turning a
# fuzzy match score into a confidence value in validate_name_against_sources().
SOURCE_TRUST_LEVELS = {
    'wikidata': 1.0,  # Community-verified, highest trust
    'google_maps': 0.9,  # Google-verified business data
    'isil': 0.85,  # Official ISIL registry
    'original_entry': 0.7,  # CSV source, may be outdated
    'web_og_site_name': 0.8,  # Website self-declaration
    'web_schema_org': 0.75,  # Structured data, sometimes wrong
    'web_h1_tag': 0.6,  # May be exhibition title
    'web_title_tag': 0.5,  # Often has taglines, events
}
# Expanded blocklist for invalid names.  Each entry is a regex applied with
# re.search (case-insensitive) to the lowercased candidate name in
# is_obviously_invalid(); a match means the string is NOT an organisation name.
INVALID_NAME_PATTERNS = [
    # Navigation/UI elements (Dutch + English)
    r'^(home|welkom|welcome|menu|navigation?|nav|header|footer|sidebar)$',
    r'^(contact|over ons|about|info|informatie)$',
    r'^(nieuws|news|agenda|calendar|events?|activiteiten)$',
    r'^(zoeken?|search|filter|sort)$',
    r'^zoeken in de',  # "Zoeken in de archieven" etc.
    r'^doorzoek ',  # "Doorzoek de collectie" etc.
    r'^bekijk ',  # "Bekijk onze..." etc.
    r'^ontdek ',  # "Ontdek de..." etc.
    # Archive/library search UI elements
    r'^(zoeken|search|browse|bladeren)\s+(in|door|de|het|onze)',
    r'in de archieven$',
    r'in de collectie$',
    # Cookie/privacy popups
    r'cookie', r'privacy', r'gdpr', r'consent',
    r'waarom gebruiken wij',
    # Generic page titles
    r'^(default|untitled|index|main|pagina|page)\s*\d*$',
    r'^(foto|image|picture|afbeelding)\s*\d+$',
    r'^(oproep|call|melding|bericht)$',  # Generic action words
    # Exhibition/event titles (Dutch)
    r'tentoonstelling', r'expositie', r'exhibition',
    r'^[A-Z][a-z]+:\s',  # Pattern like "K-pop: A Snapshot"
    r'verlengd',  # Exhibition extension notice
    # Tagline/slogan patterns
    r'^het verhaal van\s',  # "Het verhaal van Heerenveen" is tagline
    r'^de geschiedenis van\s',
    r'^welkom bij\s',
    r'^over het museum$',
    r'^over de',
    r'^over ons$',
    # Webdesign companies (wrong site scraped)
    r'webdesign', r'web design', r'website by',
    r'endless webdesign',
    # Social media
    r'^(facebook|instagram|twitter|linkedin|youtube|tiktok)',
    # Library/archive systems (wrong extraction)
    r'libraries\.org',
    r'NLmapNew\.com',
    r'fotobeeldbank',  # Photo database UI, not org name
    # Multi-institution pages (extracted wrong institution)
    r'amelander musea',  # Umbrella term, not specific museum
    r'musea noardeast',  # Regional umbrella
    # Homepage indicators
    r'^homepage\s',
]
def normalize_name(name: str) -> str:
    """Normalize a name for fuzzy comparison.

    Lowercases the input and collapses every run of whitespace to a single
    space.  Returns "" for None or empty input.

    Note: a previous version looped over common Dutch prefixes
    ("stichting ", "vereniging ", ...) but deliberately did nothing with
    them; that dead loop has been removed.  Prefixes are intentionally kept
    in the string because token-based fuzzy matching handles them.
    """
    if not name:
        return ""
    # Lowercase, split on any whitespace, re-join with single spaces.
    return ' '.join(name.lower().split())
def fuzzy_match_score(name1: str, name2: str) -> float:
    """Score how well two names match after normalization.

    Returns a value in [0, 1], where 1.0 means the normalized forms are
    identical.  Uses rapidfuzz when available; otherwise degrades to a
    substring-containment heuristic.
    """
    if not name1 or not name2:
        return 0.0
    left = normalize_name(name1)
    right = normalize_name(name2)
    if left == right:
        return 1.0
    if not RAPIDFUZZ_AVAILABLE:
        # Fallback: containment scaled by relative length.
        if left in right or right in left:
            return min(len(left), len(right)) / max(len(left), len(right))
        return 0.0
    # token_set_ratio copes with word reordering and subset names
    # (e.g. "Museum X" vs "Stichting Museum X"); partial_ratio adds
    # credit for shared substrings.
    token = fuzz.token_set_ratio(left, right) / 100.0
    partial = fuzz.partial_ratio(left, right) / 100.0
    # Weighted blend, but never below the raw token score, since
    # token_set alone already handles the prefix cases well.
    blended = token * 0.8 + partial * 0.2
    return max(blended, token)
def is_obviously_invalid(name: str) -> Tuple[bool, str]:
    """Classify a candidate name as obviously invalid.

    Checks, in order: emptiness, the INVALID_NAME_PATTERNS blocklist,
    minimum length, and digit density.

    Returns:
        (is_invalid, reason) — reason is "" when the name passes.
    """
    if not name:
        return True, "empty"
    lowered = name.lower().strip()
    # Blocklist is checked first so the reported reason names the pattern.
    for pattern in INVALID_NAME_PATTERNS:
        if re.search(pattern, lowered, re.IGNORECASE):
            return True, f"matches blocklist: {pattern}"
    if len(name.strip()) < 3:
        return True, "too short"
    # Reject strings dominated by digits (IDs, phone numbers, dates).
    digit_count = sum(ch.isdigit() for ch in name)
    if digit_count > len(name) * 0.5:
        return True, "mostly numbers"
    return False, ""
def get_authoritative_names(entry_data: Dict) -> Dict[str, str]:
    """Extract candidate names from an entry's authoritative sources.

    Consults, in order: Wikidata (NL label preferred, EN fallback),
    Google Maps, the ISIL registry, and the original CSV row.

    Args:
        entry_data: Parsed YAML entry document.

    Returns:
        Mapping of source key (keys align with SOURCE_TRUST_LEVELS) to
        name.  Sources without a usable name are omitted.
    """
    names: Dict[str, str] = {}
    # `or {}` guards against sections that are explicitly null in the YAML
    # (yaml.safe_load yields None for an empty mapping), which would
    # otherwise crash on the subsequent .get() call.
    # Wikidata (highest authority)
    wikidata = entry_data.get('wikidata_enrichment') or {}
    if wikidata.get('wikidata_label_nl'):
        names['wikidata'] = wikidata['wikidata_label_nl']
    elif wikidata.get('wikidata_label_en'):
        names['wikidata'] = wikidata['wikidata_label_en']
    # Google Maps
    google = entry_data.get('google_maps_enrichment') or {}
    if google.get('name'):
        names['google_maps'] = google['name']
    # ISIL registry
    isil = entry_data.get('isil_enrichment') or {}
    if isil.get('name'):
        names['isil'] = isil['name']
    # Original CSV entry
    original = entry_data.get('original_entry') or {}
    if original.get('organisatie'):
        names['original_entry'] = original['organisatie']
    return names
def validate_name_against_sources(
    candidate_name: str,
    authoritative_names: Dict[str, str],
    extraction_method: str = ''
) -> Dict[str, Any]:
    """
    Validate a candidate name against authoritative sources.

    Args:
        candidate_name: Name extracted from a web page or other claim.
        authoritative_names: Source-key -> name mapping, as produced by
            get_authoritative_names().
        extraction_method: How the candidate was obtained (e.g. 'h1_tag');
            h1/title extractions incur a confidence penalty.

    Returns:
        Dict with keys: is_valid, confidence_score (0-1), match_scores,
        warnings, best_alternative, recommendation.
    """
    result = {
        'is_valid': True,
        'confidence_score': 0.5,  # Default medium confidence
        'match_scores': {},
        'warnings': [],
        'best_alternative': None,
        'recommendation': None,
    }
    if not authoritative_names:
        # Nothing to compare against: accept at default confidence.
        result['warnings'].append("No authoritative sources to validate against")
        return result
    # Check for obvious invalidity first (blocklist / too short / numeric).
    is_invalid, reason = is_obviously_invalid(candidate_name)
    if is_invalid:
        result['is_valid'] = False
        result['confidence_score'] = 0.0
        result['warnings'].append(f"Obviously invalid: {reason}")
        # Suggest the first authoritative name that itself passes the
        # invalidity check (iteration order follows insertion order in
        # get_authoritative_names: wikidata first).
        for source, name in authoritative_names.items():
            if not is_obviously_invalid(name)[0]:
                result['best_alternative'] = {'source': source, 'name': name}
                result['recommendation'] = f"Use {source}: '{name}'"
                break
        return result
    # Calculate fuzzy match scores against each source.
    match_scores = {}
    for source, auth_name in authoritative_names.items():
        score = fuzzy_match_score(candidate_name, auth_name)
        match_scores[source] = {
            'score': score,
            'authoritative_name': auth_name,
            'trust_level': SOURCE_TRUST_LEVELS.get(source, 0.5),
        }
    result['match_scores'] = match_scores
    # Confidence = best (score * trust) over all sources.
    if match_scores:
        weighted_scores = []
        for source, data in match_scores.items():
            weighted = data['score'] * data['trust_level']
            weighted_scores.append(weighted)
        # Use best match, not average (one good match is enough)
        best_match_score = max(data['score'] for data in match_scores.values())
        weighted_confidence = max(weighted_scores)
        result['confidence_score'] = weighted_confidence
        # Check for mismatches against ALL sources.
        if best_match_score < 0.5:
            result['is_valid'] = False
            result['warnings'].append(f"Low match with all sources (best: {best_match_score:.2f})")
            # Alternative = the name from the MOST TRUSTED source (not the
            # best-scoring one — nothing scored well here anyway).
            best_source = max(match_scores.keys(),
                              key=lambda s: match_scores[s]['trust_level'])
            best_name = match_scores[best_source]['authoritative_name']
            result['best_alternative'] = {'source': best_source, 'name': best_name}
            result['recommendation'] = f"Use {best_source}: '{best_name}'"
        elif best_match_score < 0.7:
            result['warnings'].append(f"Moderate match (best: {best_match_score:.2f}) - review recommended")
    # Penalize extraction methods that are less reliable.
    if extraction_method in ('h1_tag', 'title_tag') and result['confidence_score'] < 0.8:
        # h1 and title frequently carry exhibition/page titles, not org names.
        result['confidence_score'] *= 0.9
        if not result['warnings']:
            result['warnings'].append(f"Extracted from {extraction_method} - may be page/event title")
    return result
def select_best_name(entry_data: Dict) -> Tuple[Optional[str], str, Dict]:
    """Pick the best-validated name for an entry.

    Web claims are tried first, most reliable extraction methods first;
    the first claim that validates with confidence >= 0.6 wins.  Failing
    that, authoritative sources are tried in trust order, and finally the
    strongest rejected web candidate is returned for reporting.

    Returns:
        (name, source_label, validation_result)
    """
    authoritative = get_authoritative_names(entry_data)
    claims = entry_data.get('web_claims', {}).get('claims', [])
    name_claims = [c for c in claims if c.get('claim_type') == 'org_name']

    # Rank extraction methods by reliability (unknown methods last).
    method_rank = {'og_site_name': 4, 'schema_org_name': 3,
                   'h1_tag': 2, 'title_tag': 1}
    name_claims.sort(
        key=lambda c: method_rank.get(c.get('extraction_method', ''), 0),
        reverse=True)

    top_name = None
    top_source = None
    top_validation = None
    for claim in name_claims:
        candidate = claim.get('claim_value', '')
        method = claim.get('extraction_method', '')
        validation = validate_name_against_sources(candidate, authoritative, method)
        if validation['is_valid'] and validation['confidence_score'] >= 0.6:
            # First sufficiently confident valid claim wins.
            return candidate, f"web:{method}", validation
        # Remember the strongest rejected candidate for the final fallback.
        previous_best = top_validation['confidence_score'] if top_validation else 0
        if top_name is None or validation['confidence_score'] > previous_best:
            top_name = candidate
            top_source = f"web:{method}"
            top_validation = validation

    # No acceptable web claim: fall back through sources by trust level.
    for source in ('wikidata', 'google_maps', 'isil', 'original_entry'):
        fallback = authoritative.get(source)
        if fallback and not is_obviously_invalid(fallback)[0]:
            return fallback, source, {
                'is_valid': True,
                'confidence_score': SOURCE_TRUST_LEVELS.get(source, 0.5),
                'match_scores': {},
                'warnings': [f"Fallback to {source} (no valid web claims)"],
                'best_alternative': None,
                'recommendation': None,
            }

    # Return best candidate even if invalid (for reporting).
    if top_name:
        return top_name, top_source, top_validation
    return None, "none", {'is_valid': False, 'confidence_score': 0,
                          'warnings': ['No name candidates found'],
                          'match_scores': {}, 'best_alternative': None,
                          'recommendation': None}
def _save_entry(filepath: Path, data: Dict) -> None:
    """Write an entry back to disk, preserving key order and unicode."""
    with open(filepath, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)


def process_entry(filepath: Path, fix: bool = False) -> Dict[str, Any]:
    """
    Process a single entry file, validating or fixing custodian_name.

    Args:
        filepath: Path to the entry YAML file.
        fix: When True, write corrections back to the file.

    Returns:
        Result dict containing entry_file, entry_index, status,
        current_name, current_source, validation, fixed, new_name and
        new_source.  Status is one of: ok, review, invalid, fixed,
        missing_valid, missing_invalid, derived, error.
    """
    # Build the full result skeleton up front so EVERY exit path — including
    # errors — yields the keys that main() and generate_report() index into.
    # (Previously the empty-file path returned only {'status','message'},
    # which made main()'s printing crash with KeyError.)
    result: Dict[str, Any] = {
        'entry_file': filepath.name,
        'entry_index': '',
        'status': 'ok',
        'current_name': None,
        'current_source': None,
        'validation': None,
        'fixed': False,
        'new_name': None,
        'new_source': None,
    }
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except yaml.YAMLError as e:
        # Malformed YAML should be reported, not abort the whole run.
        result['status'] = 'error'
        result['message'] = f'YAML parse error: {e}'
        return result
    if not data:
        result['status'] = 'error'
        result['message'] = 'Empty file'
        return result
    result['entry_index'] = data.get('entry_index', '')
    # Current custodian_name claim (`or {}` tolerates an explicit null).
    current = data.get('custodian_name') or {}
    result['current_name'] = current.get('claim_value')
    result['current_source'] = current.get('extraction_method') or current.get('source', 'unknown')
    # Get authoritative names for validation.
    authoritative_names = get_authoritative_names(data)
    if result['current_name']:
        # Validate the existing name.
        validation = validate_name_against_sources(
            result['current_name'],
            authoritative_names,
            result['current_source']
        )
        result['validation'] = validation
        if not validation['is_valid']:
            result['status'] = 'invalid'
            if fix and validation['best_alternative']:
                # Replace the bad value, keeping provenance of the old one.
                alt = validation['best_alternative']
                data['custodian_name'] = {
                    'claim_type': 'custodian_name',
                    'claim_value': alt['name'],
                    'source': alt['source'],
                    'provenance_note': f"Auto-corrected from '{result['current_name']}' ({result['current_source']}) - validation failed",
                    'previous_value': result['current_name'],
                    'previous_source': result['current_source'],
                    'correction_reason': '; '.join(validation['warnings']),
                    'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
                }
                _save_entry(filepath, data)
                result['fixed'] = True
                result['new_name'] = alt['name']
                result['new_source'] = alt['source']
                result['status'] = 'fixed'
        elif validation['warnings']:
            result['status'] = 'review'
    else:
        # No custodian_name present - try to derive one.
        best_name, best_source, validation = select_best_name(data)
        result['validation'] = validation
        if best_name and validation['is_valid']:
            result['status'] = 'missing_valid'
            result['new_name'] = best_name
            result['new_source'] = best_source
            if fix:
                data['custodian_name'] = {
                    'claim_type': 'custodian_name',
                    'claim_value': best_name,
                    'source': best_source,
                    'provenance_note': 'Auto-derived with validation',
                    'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
                }
                _save_entry(filepath, data)
                result['fixed'] = True
                result['status'] = 'derived'
        else:
            result['status'] = 'missing_invalid'
    return result
def generate_report(results: List[Dict], output_path: Path):
    """Write a CSV report of validation results to output_path.

    Creates parent directories as needed and prints the saved location.
    Missing result keys are emitted as empty cells.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    header = [
        'entry_file', 'entry_index', 'status',
        'current_name', 'current_source', 'confidence',
        'new_name', 'new_source', 'warnings', 'recommendation'
    ]
    with open(output_path, 'w', newline='', encoding='utf-8') as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        for entry in results:
            # validation may be missing or explicitly None.
            validation = entry.get('validation', {}) or {}
            writer.writerow([
                entry.get('entry_file', ''),
                entry.get('entry_index', ''),
                entry.get('status', ''),
                entry.get('current_name', ''),
                entry.get('current_source', ''),
                f"{validation.get('confidence_score', 0):.2f}",
                entry.get('new_name', ''),
                entry.get('new_source', ''),
                '; '.join(validation.get('warnings', [])),
                validation.get('recommendation', ''),
            ])
    print(f"\nReport saved to: {output_path}")
def main():
    """CLI entry point: validate (or fix) custodian_name across entry files.

    Exit code is always 0; problems are surfaced via stdout and the CSV
    report.
    """
    parser = argparse.ArgumentParser(description='Validate custodian_name against authoritative sources')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--fix', action='store_true', help='Auto-fix invalid names')
    parser.add_argument('--report', type=str, default=None, help='Output report CSV path')
    parser.add_argument('--show-all', action='store_true', help='Show all entries, not just problems')
    args = parser.parse_args()
    # Find entry files: either a single entry (by filename prefix) or all.
    if args.entry:
        files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = sorted([f for f in ENTRIES_DIR.glob('*.yaml') if f.is_file() and not f.name.startswith('.')])
    if args.limit:
        files = files[:args.limit]
    print(f"Validating {len(files)} entries...")
    print(f"Mode: {'FIX' if args.fix else 'VALIDATE ONLY'}")
    print()
    results = []
    # Counter per status returned by process_entry().
    stats = {'ok': 0, 'invalid': 0, 'fixed': 0, 'review': 0, 'missing_valid': 0, 'missing_invalid': 0, 'derived': 0, 'error': 0}
    for filepath in files:
        if filepath.is_dir():
            continue
        result = process_entry(filepath, fix=args.fix)
        results.append(result)
        stats[result['status']] = stats.get(result['status'], 0) + 1
        # Print one line per entry (only problem entries unless --show-all).
        if args.show_all or result['status'] not in ('ok',):
            status_icon = {
                'ok': ' ',
                'review': ' ?',
                'invalid': ' X',
                'fixed': ' !',
                'missing_valid': ' +',
                'missing_invalid': ' -',
                'derived': ' +',
                'error': '!!',
            }.get(result['status'], '??')
            line = f"{status_icon} {result['entry_file']}: "
            if result['status'] == 'fixed':
                line += f"'{result['current_name']}' -> '{result['new_name']}' ({result['new_source']})"
            elif result['status'] == 'invalid':
                validation = result.get('validation', {})
                line += f"'{result['current_name']}' INVALID"
                if validation.get('recommendation'):
                    line += f" -> {validation['recommendation']}"
            elif result['status'] == 'review':
                validation = result.get('validation', {})
                line += f"'{result['current_name']}' [{validation.get('confidence_score', 0):.2f}]"
                if validation.get('warnings'):
                    line += f" - {validation['warnings'][0]}"
            elif result['status'] in ('missing_valid', 'derived'):
                line += f"NEW: '{result['new_name']}' ({result['new_source']})"
            elif result['status'] == 'missing_invalid':
                line += "No valid name found"
            else:
                line += f"'{result['current_name']}'"
            print(line)
    # Summary
    print()
    print("=" * 60)
    print("Summary:")
    print(f" Valid (OK): {stats['ok']}")
    print(f" Needs review: {stats['review']}")
    print(f" Invalid: {stats['invalid']}")
    print(f" Fixed: {stats['fixed']}")
    print(f" Missing (derivable): {stats['missing_valid']}")
    print(f" Missing (no source): {stats['missing_invalid']}")
    print(f" Auto-derived: {stats['derived']}")
    print(f" Errors: {stats['error']}")
    print("=" * 60)
    # Generate report: explicit path wins; otherwise auto-report problems.
    if args.report:
        generate_report(results, Path(args.report))
    elif stats['invalid'] + stats['review'] > 0:
        # Auto-generate a timestamped report for problem entries only.
        report_path = REPORTS_DIR / f"custodian_name_validation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        problem_results = [r for r in results if r['status'] in ('invalid', 'review', 'missing_invalid')]
        if problem_results:
            generate_report(problem_results, report_path)
    return 0


if __name__ == '__main__':
    sys.exit(main())