"""
Automated Spot Checks for Wikidata Fuzzy Matches

Programmatically detects obvious errors in fuzzy matches to prioritize
manual review:

1. City name mismatches (different cities = likely wrong match)
2. Institution type mismatches (detected from Wikidata)
3. ISIL code conflicts (if Wikidata has different ISIL)
4. Name pattern issues (branch suffixes, gymnasium libraries)
5. Very low scores (<87%) with no ISIL confirmation

Generates prioritized review list with auto-detected issues.
"""

import json
import csv
import re
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from rapidfuzz import fuzz
from SPARQLWrapper import SPARQLWrapper, JSON as SPARQL_JSON

# A Wikidata item identifier: "Q" followed by a positive integer.
_QID_PATTERN = re.compile(r'Q[1-9]\d*')

# Maps our institution types to substrings expected in Wikidata type labels.
_TYPE_MAPPINGS = {
    'LIBRARY': ['library', 'public library', 'academic library',
                'university library', 'national library'],
    'ARCHIVE': ['archive', 'archives', 'archival institution',
                'state archive', 'national archives'],
    'MUSEUM': ['museum', 'art museum', 'history museum'],
}

# Prefixes of the messages produced by check_name_patterns(); used by
# generate_summary() to count name-pattern issues without matching on
# words that may also occur in city names or Wikidata type labels.
_NAME_PATTERN_MARKERS = ('Our name has', 'Low name similarity')


def load_csv_matches(csv_path: Path) -> List[Dict]:
    """Load fuzzy matches from CSV.

    Args:
        csv_path: Path to a UTF-8 CSV file with a header row.

    Returns:
        One dict per data row, keyed by the header names.
    """
    with open(csv_path, 'r', encoding='utf-8') as f:
        return list(csv.DictReader(f))


def query_wikidata_entity(qid: str) -> Optional[Dict]:
    """Query Wikidata for entity details.

    Args:
        qid: Wikidata item identifier such as ``"Q42"``.

    Returns:
        Dict with ``types`` (sorted unique P31 labels), ``isil`` (P791) and
        ``city`` (P131 label), or ``None`` when the QID is malformed, the
        query returns no rows, or the request fails.
    """
    qid = (qid or '').strip()
    # Validate before interpolating into the query text: a malformed value
    # would otherwise cost a network round trip just to fail (or alter the
    # query if it contained SPARQL syntax).
    if not _QID_PATTERN.fullmatch(qid):
        print(f" ⚠️ Skipping invalid Wikidata QID: {qid!r}")
        return None

    query = f"""
    SELECT ?type ?typeLabel ?isil ?city ?cityLabel WHERE {{
      wd:{qid} wdt:P31 ?type .
      OPTIONAL {{ wd:{qid} wdt:P791 ?isil }}
      OPTIONAL {{ wd:{qid} wdt:P131 ?city }}
      SERVICE wikibase:label {{ bd:serviceParam wikibase:language "da,en" }}
    }}
    LIMIT 5
    """

    endpoint = SPARQLWrapper("https://query.wikidata.org/sparql")
    endpoint.setQuery(query)
    endpoint.setReturnFormat(SPARQL_JSON)
    endpoint.addCustomHttpHeader('User-Agent', 'GLAM-Spot-Check/1.0')

    # Best-effort lookup: any network/parse failure is reported and the
    # caller simply skips the Wikidata-based checks for this match.
    try:
        results = endpoint.query().convert()
        bindings = results['results']['bindings']
        if not bindings:
            return None

        # Aggregate results: one row per type; ISIL/city taken from row 0.
        type_labels = {
            b.get('typeLabel', {}).get('value') for b in bindings
        }
        type_labels.discard(None)
        type_labels.discard('')
        first = bindings[0]
        return {
            # Sorted so that flag messages are reproducible between runs.
            'types': sorted(type_labels),
            'isil': first.get('isil', {}).get('value'),
            'city': first.get('cityLabel', {}).get('value'),
        }
    except Exception as e:
        print(f" ⚠️ Error querying {qid}: {e}")
        return None


def check_city_mismatch(our_city: str, wd_city: Optional[str]) -> Tuple[bool, str]:
    """Check if cities match (accounting for variations).

    Returns:
        ``(True, message)`` when both cities are known and differ beyond
        containment or minor spelling variation; otherwise ``(False, "")``.
    """
    if not our_city or not wd_city:
        return False, ""

    our_city_clean = our_city.lower().strip()
    wd_city_clean = wd_city.lower().strip()

    # Exact match
    if our_city_clean == wd_city_clean:
        return False, ""

    # One contains the other
    if our_city_clean in wd_city_clean or wd_city_clean in our_city_clean:
        return False, ""

    # Fuzzy match (allow minor spelling variations)
    if fuzz.ratio(our_city_clean, wd_city_clean) > 85:
        return False, ""

    # Cities don't match
    return True, f"City mismatch: '{our_city}' vs Wikidata '{wd_city}'"


def check_isil_conflict(our_isil: str, wd_isil: Optional[str]) -> Tuple[bool, str]:
    """Check if ISIL codes conflict.

    A conflict is reported only when both sides carry a non-blank code and
    the codes differ after trimming surrounding whitespace.
    """
    our_isil_clean = (our_isil or '').strip()
    wd_isil_clean = (wd_isil or '').strip()

    # A blank or whitespace-only code means "unknown", not "different".
    if not our_isil_clean or not wd_isil_clean:
        return False, ""

    if our_isil_clean != wd_isil_clean:
        return True, f"ISIL conflict: our '{our_isil}' vs Wikidata '{wd_isil}'"
    return False, ""


def check_type_mismatch(our_type: str, wd_types: List[str]) -> Tuple[bool, str]:
    """Check if institution types match.

    Args:
        our_type: Our institution type (e.g. ``"LIBRARY"``).
        wd_types: Type labels reported by Wikidata.

    Returns:
        ``(True, message)`` when our type has a known mapping and none of
        the Wikidata labels contains an expected term; ``(False, "")``
        otherwise, including when either side is unknown.
    """
    if not wd_types:
        return False, ""

    expected_types = _TYPE_MAPPINGS.get((our_type or '').strip().upper())
    # Without a mapping there is nothing to compare against, so do not
    # report a mismatch we cannot actually establish.
    if not expected_types:
        return False, ""

    # Check if any Wikidata type matches our expected types
    for wd_type in wd_types:
        wd_type_lower = wd_type.lower()
        if any(expected in wd_type_lower for expected in expected_types):
            return False, ""

    # No match found
    return True, f"Type mismatch: our {our_type} vs Wikidata {', '.join(wd_types[:3])}"


def check_name_patterns(inst_name: str, wd_label: str) -> Tuple[bool, str]:
    """Check for problematic name patterns.

    Returns:
        ``(True, "issue; issue; ...")`` when at least one pattern fires,
        otherwise ``(False, "")``.
    """
    issues = []

    # Pattern 1: Branch suffix in our name
    if ', Biblioteket' in inst_name and ', Biblioteket' not in wd_label:
        issues.append("Our name has ', Biblioteket' suffix (branch?), Wikidata doesn't")

    # Pattern 2: Gymnasium library
    if 'Gymnasium' in inst_name and 'Gymnasium' not in wd_label:
        issues.append("Our name has 'Gymnasium' (school library?), Wikidata doesn't")

    # Pattern 3: Different institution names entirely
    # Extract base names (remove suffixes)
    our_base = re.sub(r',\s*Biblioteket$', '', inst_name)
    wd_base = re.sub(r'\s+Bibliotek$', '', wd_label)

    similarity = fuzz.ratio(our_base.lower(), wd_base.lower())
    if similarity < 60:
        # fuzz.ratio returns a float; round it for a readable message.
        issues.append(
            f"Low name similarity ({similarity:.1f}%) - possibly different institutions"
        )

    if issues:
        return True, "; ".join(issues)
    return False, ""


def check_low_score_no_isil(match_score: float, our_isil: str) -> Tuple[bool, str]:
    """Flag low scores without ISIL confirmation.

    A blank or whitespace-only ISIL counts as "no ISIL".
    """
    if match_score < 87 and not (our_isil or '').strip():
        return True, f"Low score ({match_score}%) with no ISIL to verify"
    return False, ""


def run_spot_checks(matches: List[Dict]) -> List[Dict]:
    """Run automated spot checks on all fuzzy matches.

    Queries Wikidata once per match, sleeping between requests.

    Args:
        matches: Rows as loaded by ``load_csv_matches``; each must provide
            ``wikidata_qid``, ``institution_name``, ``wikidata_label``,
            ``city``, ``institution_type``, ``isil_code`` and ``match_score``.

    Returns:
        Copies of the input rows with ``spot_check_issues`` and
        ``auto_flag`` (``'REVIEW_URGENT'`` or ``'OK'``) added.
    """
    total = len(matches)
    print(f"\nRunning automated spot checks on {total} fuzzy matches...")
    print("This will query Wikidata for each Q-number (may take ~5 minutes)\n")

    results = []
    issue_count = 0

    for i, match in enumerate(matches, 1):
        if i % 20 == 0:
            print(f" Progress: {i}/{total} ({i/total*100:.1f}%)")

        qid = match['wikidata_qid']
        inst_name = match['institution_name']
        wd_label = match['wikidata_label']
        our_city = match['city']
        our_type = match['institution_type']
        our_isil = match['isil_code']
        match_score = float(match['match_score'])

        issues = []

        # Check 1: Low score without ISIL
        has_issue, msg = check_low_score_no_isil(match_score, our_isil)
        if has_issue:
            issues.append(f"⚠️ {msg}")

        # Check 2: Name patterns
        has_issue, msg = check_name_patterns(inst_name, wd_label)
        if has_issue:
            issues.append(f"🔍 {msg}")

        # Query Wikidata for entity details
        wd_data = query_wikidata_entity(qid)
        time.sleep(0.5)  # Rate limiting (2 req/sec)

        if wd_data:
            # Check 3: City mismatch
            has_issue, msg = check_city_mismatch(our_city, wd_data.get('city'))
            if has_issue:
                issues.append(f"🚨 {msg}")

            # Check 4: ISIL conflict
            has_issue, msg = check_isil_conflict(our_isil, wd_data.get('isil'))
            if has_issue:
                issues.append(f"🚨 {msg}")

            # Check 5: Type mismatch
            has_issue, msg = check_type_mismatch(our_type, wd_data.get('types', []))
            if has_issue:
                issues.append(f"⚠️ {msg}")

        # Add spot check results to match (copy so the input is untouched)
        match_with_issues = match.copy()
        if issues:
            match_with_issues['spot_check_issues'] = " | ".join(issues)
            match_with_issues['auto_flag'] = 'REVIEW_URGENT'
            issue_count += 1
        else:
            match_with_issues['spot_check_issues'] = ''
            match_with_issues['auto_flag'] = 'OK'

        results.append(match_with_issues)

    print(f"\n ✅ Spot checks complete: {issue_count}/{total} matches flagged")
    return results


def generate_flagged_report(results: List[Dict], output_path: Path) -> None:
    """Generate CSV with spot check results.

    Rows are sorted by auto_flag (REVIEW_URGENT first), then priority,
    then score.

    Args:
        results: Rows returned by ``run_spot_checks``.
        output_path: Destination CSV path (overwritten if it exists).
    """
    def sort_key(r):
        flag_priority = 0 if r['auto_flag'] == 'REVIEW_URGENT' else 1
        return (flag_priority, int(r['priority']), float(r['match_score']))

    results_sorted = sorted(results, key=sort_key)

    # Write CSV with new columns
    fieldnames = [
        'auto_flag',
        'spot_check_issues',
        'priority',
        'match_score',
        'institution_name',
        'wikidata_label',
        'city',
        'institution_type',
        'isil_code',
        'ghcid',
        'wikidata_qid',
        'wikidata_url',
        'validation_status',
        'validation_notes',
        'institution_id',
    ]

    with open(output_path, 'w', newline='', encoding='utf-8') as f:
        # extrasaction='ignore': rows come straight from the input CSV and
        # may carry columns not listed above; the default ('raise') would
        # abort the report after all Wikidata queries have already run.
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(results_sorted)


def generate_summary(results: List[Dict]) -> None:
    """Print summary of spot check findings.

    Args:
        results: Rows returned by ``run_spot_checks``. An empty list is
            reported with 0.0% figures rather than raising.
    """
    total = len(results)
    flagged_records = [r for r in results if r['auto_flag'] == 'REVIEW_URGENT']
    flagged = len(flagged_records)
    ok = total - flagged

    def pct(count: int) -> float:
        # Guard against an empty input file (total == 0).
        return count / total * 100 if total else 0.0

    print("\n" + "=" * 70)
    print("Automated Spot Check Summary")
    print("=" * 70)

    print("\n📊 Overall Results")
    print(f" Total fuzzy matches: {total}")
    print(f" Flagged issues: {flagged} ({pct(flagged):.1f}%)")
    print(f" No issues detected: {ok} ({pct(ok):.1f}%)")

    # Count issue types
    issue_types = {
        'City mismatch': 0,
        'ISIL conflict': 0,
        'Type mismatch': 0,
        'Low score no ISIL': 0,
        'Name pattern issue': 0,
    }

    for result in results:
        issues = result.get('spot_check_issues', '')
        if 'City mismatch' in issues:
            issue_types['City mismatch'] += 1
        if 'ISIL conflict' in issues:
            issue_types['ISIL conflict'] += 1
        if 'Type mismatch' in issues:
            issue_types['Type mismatch'] += 1
        if 'Low score' in issues and 'no ISIL' in issues:
            issue_types['Low score no ISIL'] += 1
        if any(marker in issues for marker in _NAME_PATTERN_MARKERS):
            issue_types['Name pattern issue'] += 1

    print("\n🚨 Issue Breakdown")
    for issue_type, count in sorted(issue_types.items(), key=lambda x: -x[1]):
        if count > 0:
            print(f" {issue_type:<25}: {count:3d} matches")

    # Sample flagged records
    print("\n🔍 Sample Flagged Records (Top 5)")
    for i, record in enumerate(flagged_records[:5], 1):
        print(f"\n {i}. Priority {record['priority']} - Score {record['match_score']}%")
        print(f" Institution: {record['institution_name']}")
        print(f" Wikidata: {record['wikidata_label']}")
        print(f" Issues: {record['spot_check_issues']}")

    print("\n" + "=" * 70)
    print("Next Steps")
    print("=" * 70)
    print(f"""
 1. Review flagged CSV: data/review/denmark_wikidata_fuzzy_matches_flagged.csv
 2. Focus on REVIEW_URGENT rows first ({flagged} matches)
 3. Fill validation_status for flagged rows:
    - City/ISIL conflicts → Likely INCORRECT
    - Type mismatches → Likely INCORRECT
    - Name pattern issues → Needs manual judgment
 4. Then review remaining OK rows ({ok} matches)
 5. Run: python scripts/apply_wikidata_validation.py
""")
    print("=" * 70)


def main():
    """Load the fuzzy-match CSV, run spot checks, write the report, print a summary."""
    print("=" * 70)
    print("Automated Spot Checks for Wikidata Fuzzy Matches")
    print("=" * 70)

    # Load fuzzy matches
    csv_path = Path('data/review/denmark_wikidata_fuzzy_matches.csv')
    print(f"\nLoading fuzzy matches: {csv_path}")
    matches = load_csv_matches(csv_path)
    print(f" ✅ Loaded {len(matches)} matches")

    # Run spot checks (queries Wikidata)
    results = run_spot_checks(matches)

    # Generate flagged report
    output_path = Path('data/review/denmark_wikidata_fuzzy_matches_flagged.csv')
    print(f"\nGenerating flagged report: {output_path}")
    generate_flagged_report(results, output_path)
    print(" ✅ Saved flagged report")

    # Print summary
    generate_summary(results)

    print("\n✅ Automated Spot Checks Complete")


if __name__ == '__main__':
    main()