#!/usr/bin/env python3
# glam/scripts/derive_custodian_name_v2.py
"""
Derive CustodianName by finding consensus across all enrichment sources.
APPROACH: Find the name that appears most consistently across sources.
Instead of a fixed priority, we compare all available names and pick
the one with the highest agreement (fuzzy matching).
Sources checked:
- wikidata_enrichment.wikidata_label_nl / wikidata_label_en
- google_maps_enrichment.name
- isil_enrichment.name
- original_entry.organisatie
- museum_register (if present)
- youtube_enrichment (if present)
- web_claims org_name (og:site_name, schema.org, h1, title)
The consensus approach automatically handles:
- Wrong Google Maps POIs (parking lots won't match other sources)
- Garbage web claims (exhibition titles won't match Wikidata)
- Outdated CSV names (if most sources agree on new name)
Usage:
python scripts/derive_custodian_name_v2.py [--limit N] [--entry ENTRY_NUM] [--dry-run] [--force]
"""
import argparse
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
import yaml
try:
from rapidfuzz import fuzz
RAPIDFUZZ_AVAILABLE = True
except ImportError:
RAPIDFUZZ_AVAILABLE = False
# Directories
# NOTE(review): absolute user-specific path — consider making this configurable
# via CLI flag or environment variable.
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')

# Source weights for tie-breaking (not primary selection).
# Higher = more trusted. The weight contributes 30% of a candidate's final
# score; cross-source agreement contributes the other 70%.
SOURCE_WEIGHTS = {
    'wikidata': 1.0,
    'google_maps': 0.9,
    'isil': 0.85,
    'original_entry': 0.8,
    'museum_register': 0.75,
    'youtube': 0.7,
    'web_og_site_name': 0.6,
    'web_schema_org': 0.55,
    'web_h1_tag': 0.4,
    'web_title_tag': 0.35,
}

# Patterns that indicate invalid/garbage names.
# Matched with re.search (substring match unless anchored) against the
# lowercased, stripped candidate name, case-insensitively.
INVALID_NAME_PATTERNS = [
    # Navigation/UI elements
    r'^(home|welkom|welcome|menu|nav|header|footer|sidebar)$',
    r'^(contact|over ons|about|info|informatie)$',
    r'^(nieuws|news|agenda|calendar|events?|activiteiten)$',
    r'^(zoeken?|search|filter|sort|browse|bladeren)$',
    r'^zoeken in', r'^doorzoek\s', r'^bekijk\s', r'^ontdek\s',
    # Cookie/privacy/legal
    r'cookie', r'privacy', r'gdpr', r'consent', r'waarom gebruiken wij',
    # Generic page elements
    r'^(default|untitled|index|main|pagina|page)\s*\d*$',
    r'^(foto|image|picture|afbeelding)\s*\d+$',
    r'^(oproep|call|melding|bericht|scroll)$',
    r'^(openingstijden|tickets|reserveer|plan je bezoek)$',
    r'^(main menu|hoofdmenu)$',
    # Exhibition/event titles
    r'tentoonstelling', r'expositie', r'exhibition', r'verlengd',
    r'^nu te zien', r'^te zien:',
    # Taglines/slogans
    r'^op het kruispunt van', r'^het verhaal van\s', r'^de geschiedenis van\s',
    r'^beleef je\s', r'^ontdek ook\s', r'^welkom bij\s',
    r'^over het museum$', r'^over de\s', r'^over ons$',
    r'binnen handbereik$', r'met een glimlach$',
    # Newsletter/marketing
    r'nieuwsbrief', r'newsletter', r'^schrijf je in', r'^sign up',
    # Wrong websites
    r'webdesign', r'libraries\.org', r'NLmapNew\.com', r'fotobeeldbank',
    # Wrong POIs from Google Maps
    r'^parkeerplaats$', r'^parking$', r'^bushalte$', r'^tramhalte$',
    # Generic/ambiguous
    r'^homepage\s', r'^homepagina\s', r'^chat$', r'^help$',
    r'onder constructie', r"web server's default page",
]
def normalize_name(name: str) -> str:
    """Return *name* lowercased with whitespace runs collapsed to single spaces."""
    if not name:
        return ""
    words = name.lower().split()
    return ' '.join(words)
def fuzzy_match_score(name1: str, name2: str) -> float:
    """Score the similarity of two organization names on a 0.0-1.0 scale.

    Exact matches (after normalization) score 1.0.  With rapidfuzz
    installed, token-set and partial ratios are blended 80/20 (never
    scoring below the token-set ratio alone).  Without rapidfuzz, only
    substring containment is detected, scored by relative length.
    """
    if not name1 or not name2:
        return 0.0
    a = normalize_name(name1)
    b = normalize_name(name2)
    if a == b:
        return 1.0
    if not RAPIDFUZZ_AVAILABLE:
        # Fallback: containment counts, anything else is a non-match.
        if a in b or b in a:
            return min(len(a), len(b)) / max(len(a), len(b))
        return 0.0
    token_score = fuzz.token_set_ratio(a, b) / 100.0
    partial_score = fuzz.partial_ratio(a, b) / 100.0
    blended = token_score * 0.8 + partial_score * 0.2
    return max(blended, token_score)
def is_obviously_invalid(name: str) -> bool:
    """Return True when *name* is clearly not an organization name.

    Rejects empty/too-short strings, anything matching a known garbage
    pattern (navigation text, cookie notices, slogans, ...), and strings
    that are mostly digits.
    """
    if not name or len(name.strip()) < 3:
        return True
    lowered = name.lower().strip()
    if any(re.search(pat, lowered, re.IGNORECASE) for pat in INVALID_NAME_PATTERNS):
        return True
    # Mostly numbers (phone numbers, postcodes, ...) cannot be a name.
    digits = sum(1 for ch in name if ch.isdigit())
    return digits > len(name) * 0.5
def clean_name(name: str) -> str:
    """Collapse internal whitespace in *name* and trim stray punctuation/dashes."""
    if not name:
        return ""
    collapsed = ' '.join(name.split())
    return collapsed.strip(' -–—|:.')
def extract_all_names(entry_data: Dict) -> List[Tuple[str, str, float]]:
    """
    Extract all candidate names from all enrichment sources.

    Returns a list of (name, source, weight) tuples.  Sources whose value
    is missing, explicitly null in the YAML, or obviously invalid are
    skipped rather than crashing or polluting the candidate pool.
    """
    candidates: List[Tuple[str, str, float]] = []

    def _add(raw: Optional[str], source: str, weight: float) -> bool:
        """Clean *raw* and append it when valid; report whether it was added."""
        name = clean_name(raw or '')
        if name and not is_obviously_invalid(name):
            candidates.append((name, source, weight))
            return True
        return False

    # Wikidata: prefer the Dutch label, falling back to English when the
    # Dutch label is absent *or* rejected as invalid.  (Bug fix: previously
    # any present NL label stopped the loop even if it was garbage, so the
    # EN fallback was never tried.)
    wikidata = entry_data.get('wikidata_enrichment') or {}
    for field in ('wikidata_label_nl', 'wikidata_label_en'):
        if wikidata.get(field) and _add(wikidata[field], 'wikidata', SOURCE_WEIGHTS['wikidata']):
            break  # Only use one wikidata name

    # Single-field sources: (entry key, field name, source label).
    # `or {}` guards each section against an explicit YAML null.
    simple_sources = [
        ('google_maps_enrichment', 'name', 'google_maps'),
        ('isil_enrichment', 'name', 'isil'),
        ('original_entry', 'organisatie', 'original_entry'),
        ('museum_register_enrichment', 'name', 'museum_register'),
        ('youtube_enrichment', 'channel_name', 'youtube'),
    ]
    for entry_key, field, source in simple_sources:
        section = entry_data.get(entry_key) or {}
        if section.get(field):
            _add(section[field], source, SOURCE_WEIGHTS[source])

    # Web claims: every org_name claim is a candidate, weighted by its
    # extraction method (og:site_name > schema.org > h1 > title).
    web_claims = (entry_data.get('web_claims') or {}).get('claims') or []
    for claim in web_claims:
        if claim.get('claim_type') == 'org_name':
            method = claim.get('extraction_method', '')
            source_key = f'web_{method}'
            # Unknown extraction methods get a conservative default weight.
            _add(claim.get('claim_value', ''), source_key,
                 SOURCE_WEIGHTS.get(source_key, 0.3))

    return candidates
def find_consensus_name(candidates: List[Tuple[str, str, float]]) -> Tuple[Optional[str], str, float, Dict]:
    """
    Pick the candidate name that agrees best with all other candidates.

    Each candidate's fuzzy similarity to every other candidate is summed,
    weighted by the other source's trust weight, then averaged and blended
    70/30 with the candidate's own source weight.  The top scorer wins; a
    small confidence boost is granted when 2+ sources corroborate it.

    Returns (best_name, best_source, confidence, match_details).
    """
    if not candidates:
        return None, 'none', 0.0, {}
    if len(candidates) == 1:
        only_name, only_source, only_weight = candidates[0]
        return only_name, only_source, only_weight, {'single_source': True}

    others = len(candidates) - 1
    scored = []
    for idx, (name, source, weight) in enumerate(candidates):
        agreement = 0.0
        matches = []
        for other_idx, (other_name, other_source, other_weight) in enumerate(candidates):
            if other_idx == idx:
                continue
            similarity = fuzzy_match_score(name, other_name)
            # Agreement from a trusted source counts for more.
            agreement += similarity * other_weight
            if similarity >= 0.6:
                matches.append({
                    'source': other_source,
                    'name': other_name,
                    'score': similarity,
                })
        avg_agreement = agreement / others if others else 0
        # 70% cross-source agreement, 30% own source trust.
        final_score = avg_agreement * 0.7 + weight * 0.3
        scored.append({
            'name': name,
            'source': source,
            'weight': weight,
            'avg_agreement': avg_agreement,
            'final_score': final_score,
            'matches': matches,
            'match_count': len(matches),
        })

    scored.sort(key=lambda c: (c['final_score'], c['match_count'], c['weight']), reverse=True)
    winner = scored[0]
    confidence = winner['final_score']
    if winner['match_count'] >= 2:
        confidence = min(1.0, confidence + 0.1)  # boost for multiple corroborating matches
    details = {
        'match_count': winner['match_count'],
        'matches': winner['matches'],
        'avg_agreement': winner['avg_agreement'],
        'all_candidates': [(c['name'], c['source'], c['final_score']) for c in scored],
    }
    return winner['name'], winner['source'], confidence, details
def process_entry(filepath: Path, dry_run: bool = False) -> Dict[str, Any]:
    """
    Process a single entry file to derive CustodianName by consensus.

    Reads the entry YAML, gathers candidate names from every enrichment
    source, picks the consensus winner, and (unless *dry_run*) writes a
    ``custodian_name`` record back into the same file.

    Returns a result dict whose 'status' is one of 'ok', 'new', 'updated',
    'no_source', 'no_consensus' or 'error', plus name/source/confidence
    details used by the caller for reporting.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    if not data:
        return {'status': 'error', 'message': 'Empty file', 'filepath': str(filepath)}

    result = {
        'filepath': str(filepath),
        'filename': filepath.name,
        'entry_index': data.get('entry_index', ''),
        'status': 'ok',
        'name': None,
        'source': None,
        'confidence': 0.0,
        'match_count': 0,
        'previous_name': None,
        'previous_source': None,
    }

    # Remember the existing custodian_name (if any) so changes can be
    # reported.  Bug fix: `or {}` guards against an explicit YAML null
    # (`custodian_name:`), which previously raised AttributeError.
    current = data.get('custodian_name') or {}
    if current.get('claim_value'):
        result['previous_name'] = current.get('claim_value')
        result['previous_source'] = current.get('source') or current.get('extraction_method', 'unknown')

    # Extract all candidate names from all sources.
    candidates = extract_all_names(data)
    if not candidates:
        result['status'] = 'no_source'
        result['message'] = 'No valid names found in any source'
        return result

    # Find consensus name among the candidates.
    best_name, best_source, confidence, details = find_consensus_name(candidates)
    if not best_name:
        result['status'] = 'no_consensus'
        result['message'] = 'Could not find consensus among candidates'
        return result

    result['name'] = best_name
    result['source'] = best_source
    result['confidence'] = confidence
    result['match_count'] = details.get('match_count', 0)
    result['candidates'] = len(candidates)

    # Build the custodian_name record that gets persisted into the entry.
    custodian_name = {
        'claim_type': 'custodian_name',
        'claim_value': best_name,
        'source': best_source,
        'confidence': round(confidence, 3),
        'consensus_method': True,
        'sources_checked': len(candidates),
        'sources_matched': details.get('match_count', 0) + 1,  # +1 for self
        'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
    }
    # Add match details for traceability.
    if details.get('matches'):
        custodian_name['matching_sources'] = [
            {'source': m['source'], 'name': m['name'], 'score': round(m['score'], 2)}
            for m in details['matches']
        ]

    # Track whether the derived name is new or replaces an earlier value.
    if result['previous_name'] and result['previous_name'] != best_name:
        custodian_name['previous_value'] = result['previous_name']
        custodian_name['previous_source'] = result['previous_source']
        result['status'] = 'updated'
    elif not result['previous_name']:
        result['status'] = 'new'

    # Persist only in live mode.
    if not dry_run:
        data['custodian_name'] = custodian_name
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
    return result
def main():
    """CLI entry point: derive custodian names for entry YAML files and print a summary.

    Always returns 0; per-file failures are counted and reported in the
    summary rather than aborting the run.
    """
    parser = argparse.ArgumentParser(description='Derive CustodianName by consensus across sources')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without writing')
    parser.add_argument('--force', action='store_true', help='Re-derive even if custodian_name exists')
    parser.add_argument('--show-all', action='store_true', help='Show all entries, not just changes')
    # NOTE(review): --verbose is parsed but not used anywhere in this function.
    parser.add_argument('--verbose', action='store_true', help='Show candidate details')
    args = parser.parse_args()
    # Find entry files: a single entry selected by filename prefix, or every
    # non-hidden YAML file in the entries directory.
    if args.entry:
        files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = sorted([f for f in ENTRIES_DIR.glob('*.yaml') if f.is_file() and not f.name.startswith('.')])
    if args.limit:
        files = files[:args.limit]
    print(f"Processing {len(files)} entries...")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE'}")
    print(f"Method: Consensus across all enrichment sources")
    print()
    # Track statistics (defaultdict so unseen keys read as 0 in the summary).
    stats = defaultdict(int)
    low_confidence = []  # results with confidence < 0.5, flagged for manual review
    for filepath in files:
        if filepath.is_dir():
            continue
        # Skip if already has custodian_name (unless --force); this requires an
        # extra read of the file before process_entry re-reads it.
        if not args.force:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if data and data.get('custodian_name', {}).get('claim_value'):
                stats['unchanged'] += 1
                if args.show_all:
                    name = data['custodian_name']['claim_value']
                    source = data['custodian_name'].get('source', 'unknown')
                    print(f" = {filepath.name}: '{name}' [{source}]")
                continue
        result = process_entry(filepath, dry_run=args.dry_run)
        # Update stats based on the per-file outcome.
        if result['status'] == 'error':
            stats['error'] += 1
            print(f" ! {filepath.name}: ERROR - {result.get('message', 'Unknown')}")
        elif result['status'] in ('no_source', 'no_consensus'):
            # Both failure modes are counted under 'no_source' in the summary.
            stats['no_source'] += 1
            print(f" - {filepath.name}: {result.get('message', 'No source')}")
        else:
            stats[result['source']] += 1
            stats['total_derived'] += 1
            # Track low confidence for review
            if result['confidence'] < 0.5:
                low_confidence.append(result)
            if result['status'] == 'updated':
                stats['updated'] += 1
                match_info = f"[{result['match_count']+1}/{result['candidates']} sources]"
                print(f" ~ {filepath.name}: '{result['previous_name']}' -> '{result['name']}' [{result['source']}] {match_info}")
            elif result['status'] == 'new':
                stats['new'] += 1
                match_info = f"[{result['match_count']+1}/{result['candidates']} sources]"
                print(f" + {filepath.name}: '{result['name']}' [{result['source']}] {match_info}")
            elif args.show_all:
                print(f" = {filepath.name}: '{result['name']}' [{result['source']}]")
    # Summary
    print()
    print("=" * 70)
    print(f"{'DRY RUN - ' if args.dry_run else ''}Summary:")
    print()
    print("Sources used:")
    for source in ['wikidata', 'google_maps', 'isil', 'original_entry', 'museum_register',
                   'youtube', 'web_og_site_name', 'web_schema_org', 'web_h1_tag', 'web_title_tag']:
        if stats[source] > 0:
            print(f" {source:20s}: {stats[source]}")
    print()
    print(f" New names derived: {stats['new']}")
    print(f" Names updated: {stats['updated']}")
    print(f" Unchanged (skipped): {stats['unchanged']}")
    print(f" No valid source: {stats['no_source']}")
    print(f" Errors: {stats['error']}")
    print()
    print(f" TOTAL DERIVED: {stats['total_derived']}")
    if low_confidence:
        print()
        print(f" Low confidence ({len(low_confidence)} entries) - may need review:")
        # Cap the listing at 10 entries to keep the summary readable.
        for r in low_confidence[:10]:
            print(f" {r['filename']}: '{r['name']}' (confidence: {r['confidence']:.2f})")
        if len(low_confidence) > 10:
            print(f" ... and {len(low_confidence) - 10} more")
    print("=" * 70)
    return 0
if __name__ == '__main__':
    # Propagate main()'s return code as the process exit status.
    sys.exit(main())