#!/usr/bin/env python3
|
|
"""
|
|
Validate custodian_name against authoritative enrichment sources.
|
|
|
|
This script validates extracted web claims against:
|
|
1. wikidata_label_nl (authoritative)
|
|
2. google_maps_enrichment.name (high confidence)
|
|
3. original_entry.organisatie (source CSV)
|
|
|
|
Uses fuzzy string matching to detect mismatches and flags entries for review.
|
|
|
|
Usage:
|
|
python scripts/validate_custodian_name.py [--limit N] [--entry ENTRY_NUM] [--fix]
|
|
"""
|
|
|
|
import argparse
import csv
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple

import yaml

# rapidfuzz is an optional dependency: when absent, fuzzy_match_score()
# falls back to basic substring matching (see RAPIDFUZZ_AVAILABLE checks).
try:
    from rapidfuzz import fuzz
    RAPIDFUZZ_AVAILABLE = True
except ImportError:
    RAPIDFUZZ_AVAILABLE = False
    print("Warning: rapidfuzz not installed. Using basic string matching.")
|
|
|
|
|
|
# Directories
# NOTE(review): absolute, machine-specific paths — assumes this exact checkout
# location; consider making these configurable.
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')
REPORTS_DIR = Path('/Users/kempersc/apps/glam/reports')


# Trust levels for different sources (0-1).
# Used as weights in validate_name_against_sources(); unknown sources
# default to 0.5 there.
SOURCE_TRUST_LEVELS = {
    'wikidata': 1.0,          # Community-verified, highest trust
    'google_maps': 0.9,       # Google-verified business data
    'isil': 0.85,             # Official ISIL registry
    'original_entry': 0.7,    # CSV source, may be outdated
    'web_og_site_name': 0.8,  # Website self-declaration
    'web_schema_org': 0.75,   # Structured data, sometimes wrong
    'web_h1_tag': 0.6,        # May be exhibition title
    'web_title_tag': 0.5,     # Often has taglines, events
}


# Expanded blocklist for invalid names.
# Each entry is a regex matched case-insensitively (re.search) against the
# lowercased candidate name in is_obviously_invalid(). Unanchored patterns
# match anywhere in the name.
INVALID_NAME_PATTERNS = [
    # Navigation/UI elements (Dutch + English)
    r'^(home|welkom|welcome|menu|navigation?|nav|header|footer|sidebar)$',
    r'^(contact|over ons|about|info|informatie)$',
    r'^(nieuws|news|agenda|calendar|events?|activiteiten)$',
    r'^(zoeken?|search|filter|sort)$',
    r'^zoeken in de',  # "Zoeken in de archieven" etc.
    r'^doorzoek ',     # "Doorzoek de collectie" etc.
    r'^bekijk ',       # "Bekijk onze..." etc.
    r'^ontdek ',       # "Ontdek de..." etc.

    # Archive/library search UI elements
    r'^(zoeken|search|browse|bladeren)\s+(in|door|de|het|onze)',
    r'in de archieven$',
    r'in de collectie$',

    # Cookie/privacy popups
    r'cookie', r'privacy', r'gdpr', r'consent',
    r'waarom gebruiken wij',

    # Generic page titles
    r'^(default|untitled|index|main|pagina|page)\s*\d*$',
    r'^(foto|image|picture|afbeelding)\s*\d+$',
    r'^(oproep|call|melding|bericht)$',  # Generic action words

    # Exhibition/event titles (Dutch)
    r'tentoonstelling', r'expositie', r'exhibition',
    r'^[A-Z][a-z]+:\s',  # Pattern like "K-pop: A Snapshot"
    r'verlengd',         # Exhibition extension notice

    # Tagline/slogan patterns
    r'^het verhaal van\s',  # "Het verhaal van Heerenveen" is tagline
    r'^de geschiedenis van\s',
    r'^welkom bij\s',
    r'^over het museum$',
    r'^over de',
    r'^over ons$',

    # Webdesign companies (wrong site scraped)
    r'webdesign', r'web design', r'website by',
    r'endless webdesign',

    # Social media
    r'^(facebook|instagram|twitter|linkedin|youtube|tiktok)',

    # Library/archive systems (wrong extraction)
    r'libraries\.org',
    r'NLmapNew\.com',
    r'fotobeeldbank',  # Photo database UI, not org name

    # Multi-institution pages (extracted wrong institution)
    r'amelander musea',  # Umbrella term, not specific museum
    r'musea noardeast',  # Regional umbrella

    # Homepage indicators
    r'^homepage\s',
]
|
|
|
|
|
|
def normalize_name(name: str) -> str:
    """Normalize a name for fuzzy comparison.

    Lowercases the name and collapses all whitespace runs (spaces, tabs,
    newlines) to single spaces.

    Args:
        name: Raw organisation name; may be empty or None-like.

    Returns:
        The normalized string, or "" for falsy input.
    """
    if not name:
        return ""
    # split() with no args splits on any whitespace and drops empties,
    # so join gives a single-spaced, trimmed result.
    return ' '.join(name.lower().split())
    # Fix: removed a dead loop that iterated common prefixes
    # ('stichting ', 'vereniging ', ...) but only executed `pass`;
    # it had no effect on the returned value.
|
|
|
|
|
|
def fuzzy_match_score(name1: str, name2: str) -> float:
    """
    Calculate fuzzy match score between two names.

    Returns score 0-1 where 1 is exact match.
    """
    if not name1 or not name2:
        return 0.0

    left = normalize_name(name1)
    right = normalize_name(name2)

    if left == right:
        return 1.0

    if not RAPIDFUZZ_AVAILABLE:
        # Fallback without rapidfuzz: score substring containment by the
        # ratio of the shorter to the longer name; otherwise no match.
        if left in right or right in left:
            return min(len(left), len(right)) / max(len(left), len(right))
        return 0.0

    # token_set_ratio copes with word-order differences and subset names
    # (e.g. "Museum X" vs "Stichting Museum X"); partial_ratio rewards
    # substring-style overlap. Token score dominates the blend.
    token_score = fuzz.token_set_ratio(left, right) / 100.0
    partial_score = fuzz.partial_ratio(left, right) / 100.0
    blended = 0.8 * token_score + 0.2 * partial_score
    return max(blended, token_score)
|
|
|
|
|
|
def is_obviously_invalid(name: str) -> Tuple[bool, str]:
    """
    Check if a name is obviously invalid using pattern matching.

    Returns (is_invalid, reason)
    """
    if not name:
        return True, "empty"

    lowered = name.lower().strip()

    # Blocklist of navigation text, taglines, exhibition titles, etc.
    matched = next(
        (pattern for pattern in INVALID_NAME_PATTERNS
         if re.search(pattern, lowered, re.IGNORECASE)),
        None,
    )
    if matched is not None:
        return True, f"matches blocklist: {matched}"

    # Real organisation names are essentially never this short.
    if len(name.strip()) < 3:
        return True, "too short"

    # Names dominated by digits are usually IDs or dates, not org names.
    digit_count = sum(ch.isdigit() for ch in name)
    if digit_count > len(name) * 0.5:
        return True, "mostly numbers"

    return False, ""
|
|
|
|
|
|
def get_authoritative_names(entry_data: Dict) -> Dict[str, str]:
    """
    Extract names from authoritative sources in entry.

    Returns dict of {source: name}
    """
    names: Dict[str, str] = {}

    # Wikidata is the highest authority; prefer the Dutch label and fall
    # back to the English one.
    wikidata = entry_data.get('wikidata_enrichment', {})
    wikidata_label = (wikidata.get('wikidata_label_nl')
                      or wikidata.get('wikidata_label_en'))
    if wikidata_label:
        names['wikidata'] = wikidata_label

    # Remaining sources are plain field lookups: (source key, section, field).
    field_sources = (
        ('google_maps', 'google_maps_enrichment', 'name'),
        ('isil', 'isil_enrichment', 'name'),
        ('original_entry', 'original_entry', 'organisatie'),
    )
    for source, section, field in field_sources:
        value = entry_data.get(section, {}).get(field)
        if value:
            names[source] = value

    return names
|
|
|
|
|
|
def validate_name_against_sources(
    candidate_name: str,
    authoritative_names: Dict[str, str],
    extraction_method: str = ''
) -> Dict[str, Any]:
    """
    Validate a candidate name against authoritative sources.

    Args:
        candidate_name: The name to validate.
        authoritative_names: {source: name} as returned by
            get_authoritative_names().
        extraction_method: Where the candidate came from (e.g. 'h1_tag');
            less reliable methods get a confidence penalty.

    Returns:
        Dict with keys: 'is_valid' (bool), 'confidence_score' (0-1),
        'match_scores' ({source: {score, authoritative_name, trust_level}}),
        'warnings' (list of str), 'best_alternative'
        ({'source', 'name'} or None), 'recommendation' (str or None).
    """
    result = {
        'is_valid': True,
        'confidence_score': 0.5,  # Default medium confidence
        'match_scores': {},
        'warnings': [],
        'best_alternative': None,
        'recommendation': None,
    }

    # With nothing to compare against, we can neither confirm nor deny:
    # return the default medium-confidence result with a warning.
    if not authoritative_names:
        result['warnings'].append("No authoritative sources to validate against")
        return result

    # Check for obvious invalidity first (blocklist, too short, numeric).
    is_invalid, reason = is_obviously_invalid(candidate_name)
    if is_invalid:
        result['is_valid'] = False
        result['confidence_score'] = 0.0
        result['warnings'].append(f"Obviously invalid: {reason}")
        # Find best alternative: first source (dict insertion order, which
        # follows get_authoritative_names priority) whose name is itself valid.
        for source, name in authoritative_names.items():
            if not is_obviously_invalid(name)[0]:
                result['best_alternative'] = {'source': source, 'name': name}
                result['recommendation'] = f"Use {source}: '{name}'"
                break
        return result

    # Calculate fuzzy match scores against each source.
    match_scores = {}
    for source, auth_name in authoritative_names.items():
        score = fuzzy_match_score(candidate_name, auth_name)
        match_scores[source] = {
            'score': score,
            'authoritative_name': auth_name,
            'trust_level': SOURCE_TRUST_LEVELS.get(source, 0.5),
        }

    result['match_scores'] = match_scores

    # Calculate weighted confidence (score * trust level per source).
    if match_scores:
        weighted_scores = []
        for source, data in match_scores.items():
            weighted = data['score'] * data['trust_level']
            weighted_scores.append(weighted)

        # Use best match, not average (one good match is enough)
        best_match_score = max(data['score'] for data in match_scores.values())
        weighted_confidence = max(weighted_scores)

        result['confidence_score'] = weighted_confidence

        # Check for mismatches: thresholds on the raw (unweighted) best match.
        if best_match_score < 0.5:
            result['is_valid'] = False
            result['warnings'].append(f"Low match with all sources (best: {best_match_score:.2f})")

            # Find best alternative: the most-trusted source, regardless of
            # its match score (the candidate matched nothing well anyway).
            best_source = max(match_scores.keys(),
                              key=lambda s: match_scores[s]['trust_level'])
            best_name = match_scores[best_source]['authoritative_name']
            result['best_alternative'] = {'source': best_source, 'name': best_name}
            result['recommendation'] = f"Use {best_source}: '{best_name}'"

        elif best_match_score < 0.7:
            result['warnings'].append(f"Moderate match (best: {best_match_score:.2f}) - review recommended")

    # Penalize extraction methods that are less reliable.
    if extraction_method in ('h1_tag', 'title_tag') and result['confidence_score'] < 0.8:
        # h1 and title are more likely to have exhibition names
        result['confidence_score'] *= 0.9
        # Only add this generic warning when no more specific one exists.
        if not result['warnings']:
            result['warnings'].append(f"Extracted from {extraction_method} - may be page/event title")

    return result
|
|
|
|
|
|
def select_best_name(entry_data: Dict) -> Tuple[Optional[str], str, Dict]:
    """
    Select the best name for an entry with validation.

    Strategy: try web-extracted org_name claims (best extraction method
    first); if none validates, fall back to authoritative sources in
    priority order; as a last resort return the best invalid web candidate
    so it can be reported.

    Returns (name, source, validation_result) — name is None and source is
    "none" when no candidate exists at all.
    """
    authoritative_names = get_authoritative_names(entry_data)

    # Get web claims if available; only org_name claims are relevant here.
    web_claims = entry_data.get('web_claims', {}).get('claims', [])
    org_name_claims = [c for c in web_claims if c.get('claim_type') == 'org_name']

    best_candidate = None
    best_source = None
    best_validation = None

    # Try each web claim, most reliable extraction method first
    # (og_site_name > schema_org_name > h1_tag > title_tag > unknown).
    for claim in sorted(org_name_claims,
                        key=lambda c: {'og_site_name': 4, 'schema_org_name': 3,
                                       'h1_tag': 2, 'title_tag': 1}.get(c.get('extraction_method', ''), 0),
                        reverse=True):
        name = claim.get('claim_value', '')
        method = claim.get('extraction_method', '')

        validation = validate_name_against_sources(name, authoritative_names, method)

        # First claim clearing the bar wins (claims are pre-sorted by method).
        if validation['is_valid'] and validation['confidence_score'] >= 0.6:
            # Good candidate found
            return name, f"web:{method}", validation

        # Track best invalid candidate for reporting
        if best_candidate is None or (validation['confidence_score'] >
                                      (best_validation['confidence_score'] if best_validation else 0)):
            best_candidate = name
            best_source = f"web:{method}"
            best_validation = validation

    # If no valid web claim, fall back to authoritative sources
    # Priority: wikidata > google_maps > isil > original_entry
    for source in ['wikidata', 'google_maps', 'isil', 'original_entry']:
        if source in authoritative_names:
            name = authoritative_names[source]
            if not is_obviously_invalid(name)[0]:
                # Synthesize a validation result: confidence is the source's
                # trust level since there was no cross-source comparison.
                validation = {
                    'is_valid': True,
                    'confidence_score': SOURCE_TRUST_LEVELS.get(source, 0.5),
                    'match_scores': {},
                    'warnings': [f"Fallback to {source} (no valid web claims)"],
                    'best_alternative': None,
                    'recommendation': None,
                }
                return name, source, validation

    # Return best candidate even if invalid (for reporting)
    if best_candidate:
        return best_candidate, best_source, best_validation

    # Nothing usable anywhere: empty-shaped validation result.
    return None, "none", {'is_valid': False, 'confidence_score': 0,
                          'warnings': ['No name candidates found'],
                          'match_scores': {}, 'best_alternative': None, 'recommendation': None}
|
|
|
|
|
|
def process_entry(filepath: Path, fix: bool = False) -> Dict[str, Any]:
    """
    Process a single entry file, validating or fixing custodian_name.

    Args:
        filepath: Path to the entry YAML file.
        fix: When True, write corrections/derivations back to the file.

    Returns:
        Result dict with 'status' ('ok', 'review', 'invalid', 'fixed',
        'missing_valid', 'derived', 'missing_invalid', 'error') plus the
        current/new name and source, validation details, and 'fixed' flag.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    if not data:
        return {'status': 'error', 'message': 'Empty file'}

    result = {
        'entry_file': filepath.name,
        'entry_index': data.get('entry_index', ''),
        'status': 'ok',
        'current_name': None,
        'current_source': None,
        'validation': None,
        'fixed': False,
        'new_name': None,
        'new_source': None,
    }

    # Get current custodian_name (claim dict; may be absent).
    current = data.get('custodian_name', {})
    result['current_name'] = current.get('claim_value')
    # Prefer the extraction method; older entries may only carry 'source'.
    result['current_source'] = current.get('extraction_method') or current.get('source', 'unknown')

    # Get authoritative names for validation
    authoritative_names = get_authoritative_names(data)

    # Validate current name if exists
    if result['current_name']:
        validation = validate_name_against_sources(
            result['current_name'],
            authoritative_names,
            result['current_source']
        )
        result['validation'] = validation

        if not validation['is_valid']:
            result['status'] = 'invalid'

            # Only auto-fix when a concrete alternative was recommended.
            if fix and validation['best_alternative']:
                # Apply fix: replace the claim, preserving provenance of the
                # previous (rejected) value for auditability.
                alt = validation['best_alternative']
                new_custodian_name = {
                    'claim_type': 'custodian_name',
                    'claim_value': alt['name'],
                    'source': alt['source'],
                    'provenance_note': f"Auto-corrected from '{result['current_name']}' ({result['current_source']}) - validation failed",
                    'previous_value': result['current_name'],
                    'previous_source': result['current_source'],
                    'correction_reason': '; '.join(validation['warnings']),
                    'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
                }

                data['custodian_name'] = new_custodian_name

                # Rewrite the whole entry file with the corrected claim.
                with open(filepath, 'w', encoding='utf-8') as f:
                    yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

                result['fixed'] = True
                result['new_name'] = alt['name']
                result['new_source'] = alt['source']
                result['status'] = 'fixed'

        elif validation['warnings']:
            # Valid but with warnings (e.g. moderate match): flag for review.
            result['status'] = 'review'
    else:
        # No custodian_name - try to derive one
        best_name, best_source, validation = select_best_name(data)
        result['validation'] = validation

        if best_name and validation['is_valid']:
            result['status'] = 'missing_valid'
            result['new_name'] = best_name
            result['new_source'] = best_source

            if fix:
                new_custodian_name = {
                    'claim_type': 'custodian_name',
                    'claim_value': best_name,
                    'source': best_source,
                    'provenance_note': 'Auto-derived with validation',
                    'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
                }
                data['custodian_name'] = new_custodian_name

                with open(filepath, 'w', encoding='utf-8') as f:
                    yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

                result['fixed'] = True
                result['status'] = 'derived'
        else:
            # No candidate survived validation; nothing to derive from.
            result['status'] = 'missing_invalid'

    return result
|
|
|
|
|
|
def generate_report(results: List[Dict], output_path: Path):
    """Generate CSV report of validation results.

    One row per entry result; the parent directory is created on demand.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)

    header = [
        'entry_file', 'entry_index', 'status',
        'current_name', 'current_source', 'confidence',
        'new_name', 'new_source', 'warnings', 'recommendation',
    ]

    with open(output_path, 'w', newline='', encoding='utf-8') as fh:
        writer = csv.writer(fh)
        writer.writerow(header)

        for item in results:
            # Entries without validation (or with None) get an empty dict
            # so the .get() defaults below apply.
            validation = item.get('validation') or {}
            row = [
                item.get('entry_file', ''),
                item.get('entry_index', ''),
                item.get('status', ''),
                item.get('current_name', ''),
                item.get('current_source', ''),
                f"{validation.get('confidence_score', 0):.2f}",
                item.get('new_name', ''),
                item.get('new_source', ''),
                '; '.join(validation.get('warnings', [])),
                validation.get('recommendation', ''),
            ]
            writer.writerow(row)

    print(f"\nReport saved to: {output_path}")
|
|
|
|
|
|
def main():
    """CLI entry point: validate (and optionally fix) all entry files.

    Prints a per-entry status line plus a summary, and writes a CSV report
    (explicitly via --report, or automatically when problems were found).
    Returns 0 (process exit code).
    """
    parser = argparse.ArgumentParser(description='Validate custodian_name against authoritative sources')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--fix', action='store_true', help='Auto-fix invalid names')
    parser.add_argument('--report', type=str, default=None, help='Output report CSV path')
    parser.add_argument('--show-all', action='store_true', help='Show all entries, not just problems')
    args = parser.parse_args()

    # Find entry files: a specific entry by filename prefix, or all YAMLs
    # (skipping hidden files like .DS_Store siblings).
    if args.entry:
        files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = sorted([f for f in ENTRIES_DIR.glob('*.yaml') if f.is_file() and not f.name.startswith('.')])

    if args.limit:
        files = files[:args.limit]

    print(f"Validating {len(files)} entries...")
    print(f"Mode: {'FIX' if args.fix else 'VALIDATE ONLY'}")
    print()

    results = []
    # Known statuses; unexpected ones are still counted via stats.get below.
    stats = {'ok': 0, 'invalid': 0, 'fixed': 0, 'review': 0, 'missing_valid': 0, 'missing_invalid': 0, 'derived': 0, 'error': 0}

    for filepath in files:
        if filepath.is_dir():
            continue

        result = process_entry(filepath, fix=args.fix)
        results.append(result)
        stats[result['status']] = stats.get(result['status'], 0) + 1

        # Print result (problems only, unless --show-all).
        if args.show_all or result['status'] not in ('ok',):
            status_icon = {
                'ok': '  ',
                'review': ' ?',
                'invalid': ' X',
                'fixed': ' !',
                'missing_valid': ' +',
                'missing_invalid': ' -',
                'derived': ' +',
                'error': '!!',
            }.get(result['status'], '??')

            line = f"{status_icon} {result['entry_file']}: "

            # Per-status detail formatting.
            if result['status'] == 'fixed':
                line += f"'{result['current_name']}' -> '{result['new_name']}' ({result['new_source']})"
            elif result['status'] == 'invalid':
                validation = result.get('validation', {})
                line += f"'{result['current_name']}' INVALID"
                if validation.get('recommendation'):
                    line += f" -> {validation['recommendation']}"
            elif result['status'] == 'review':
                validation = result.get('validation', {})
                line += f"'{result['current_name']}' [{validation.get('confidence_score', 0):.2f}]"
                if validation.get('warnings'):
                    line += f" - {validation['warnings'][0]}"
            elif result['status'] in ('missing_valid', 'derived'):
                line += f"NEW: '{result['new_name']}' ({result['new_source']})"
            elif result['status'] == 'missing_invalid':
                line += "No valid name found"
            else:
                line += f"'{result['current_name']}'"

            print(line)

    # Summary
    print()
    print("=" * 60)
    print("Summary:")
    print(f"  Valid (OK):          {stats['ok']}")
    print(f"  Needs review:        {stats['review']}")
    print(f"  Invalid:             {stats['invalid']}")
    print(f"  Fixed:               {stats['fixed']}")
    print(f"  Missing (derivable): {stats['missing_valid']}")
    print(f"  Missing (no source): {stats['missing_invalid']}")
    print(f"  Auto-derived:        {stats['derived']}")
    print(f"  Errors:              {stats['error']}")
    print("=" * 60)

    # Generate report if requested
    if args.report:
        generate_report(results, Path(args.report))
    elif stats['invalid'] + stats['review'] > 0:
        # Auto-generate report for problems
        report_path = REPORTS_DIR / f"custodian_name_validation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        problem_results = [r for r in results if r['status'] in ('invalid', 'review', 'missing_invalid')]
        if problem_results:
            generate_report(problem_results, report_path)

    return 0
|
|
|
|
|
|
# Script entry point: exit with main()'s return code (always 0).
if __name__ == '__main__':
    sys.exit(main())
|