#!/usr/bin/env python3
"""
Find Wikidata IDs for NDE entries that don't have them.

Uses Wikidata's search API to find matching entities based on:
- Organization name
- City (plaatsnaam)
- Type (museum, archive, library)

Usage:
    python scripts/find_wikidata_for_missing_nde.py

Output:
    data/nde/wikidata_candidates.yaml - Candidates for review
    data/nde/wikidata_matches.yaml - High-confidence matches
"""

import logging
import os
import sys
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx
import yaml
from rapidfuzz import fuzz

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
WIKIDATA_SEARCH_API = "https://www.wikidata.org/w/api.php"
BASE_DELAY = 1.5  # Seconds between requests (polite rate limiting)
WIKIMEDIA_CONTACT_EMAIL = os.getenv("WIKIMEDIA_CONTACT_EMAIL", "glam-data@example.com")
USER_AGENT = f"GLAMDataExtractor/1.0 ({WIKIMEDIA_CONTACT_EMAIL}) Python/httpx"

# Paths
PROJECT_ROOT = Path(__file__).parent.parent
NDE_YAML = PROJECT_ROOT / "data" / "nde" / "nde_register_nl.yaml"
ENTRIES_DIR = PROJECT_ROOT / "data" / "nde" / "enriched" / "entries"
OUTPUT_CANDIDATES = PROJECT_ROOT / "data" / "nde" / "wikidata_candidates.yaml"
OUTPUT_MATCHES = PROJECT_ROOT / "data" / "nde" / "wikidata_matches.yaml"

# Headers sent with every API request (Wikimedia requires a descriptive UA)
HEADERS = {
    "Accept": "application/json",
    "User-Agent": USER_AGENT,
}

# Institution type codes -> keywords used to bias Wikidata search & scoring
TYPE_KEYWORDS = {
    "M": ["museum", "musea"],
    "A": ["archief", "archive", "gemeentearchief", "regionaal archief"],
    "L": ["bibliotheek", "library", "openbare bibliotheek"],
    "S": ["historische vereniging", "heemkundekring", "stichting"],
}

# Scoring thresholds (fuzzy name score 0-100 plus heuristic boosts)
MIN_CANDIDATE_SCORE = 70    # below this, a match is discarded entirely
HIGH_CONFIDENCE_SCORE = 90  # at/above this, a match is recorded as high-confidence


@dataclass
class WikidataCandidate:
    """A potential Wikidata match for an NDE entry."""
    qid: str            # Wikidata item id, e.g. "Q1234"
    label: str          # item label returned by the search API
    description: str    # item description (may be empty)
    match_score: float  # fuzzy name score plus heuristic boosts
    search_query: str   # query string that produced this hit
    entry_index: int    # index of the entry in the source YAML list
    entry_name: str     # organization name from the NDE entry
    entry_city: str     # city from the NDE entry


def search_wikidata(
    query: str,
    limit: int = 5,
    client: Optional[httpx.Client] = None,
) -> List[Dict]:
    """Search Wikidata for entities matching the query.

    Args:
        query: Free-text search string (searched with Dutch language/uselang).
        limit: Maximum number of results to request.
        client: Optional httpx.Client to reuse across calls. When None, a
            one-shot client is created per call (backward compatible, but
            slower because each call opens a new connection).

    Returns:
        The list of result dicts from the API response's "search" key,
        or an empty list on any error (errors are logged, not raised).
    """
    params = {
        "action": "wbsearchentities",
        "format": "json",
        "language": "nl",
        "uselang": "nl",
        "type": "item",
        "limit": limit,
        "search": query,
    }
    try:
        if client is not None:
            response = client.get(WIKIDATA_SEARCH_API, params=params)
        else:
            with httpx.Client(headers=HEADERS, timeout=30.0) as one_shot:
                response = one_shot.get(WIKIDATA_SEARCH_API, params=params)
        response.raise_for_status()
        data = response.json()
        return data.get("search", [])
    except Exception as e:
        # Best-effort: a failed search for one query must not abort the run.
        logger.error(f"Search error for '{query}': {e}")
        return []


def find_best_match(entry: Dict, entry_index: int) -> Optional[WikidataCandidate]:
    """Find the best Wikidata match for an NDE entry.

    Tries several search strategies (name, name + city, name + type
    keyword), scores each hit with fuzzy name matching plus heuristic
    boosts for city / Netherlands / institution-type mentions in the
    item description, and keeps the highest-scoring candidate.

    Args:
        entry: One NDE register record (dict loaded from YAML).
        entry_index: Index of the entry in the source list, stored on
            the candidate for later reconciliation.

    Returns:
        The best candidate if its score is >= MIN_CANDIDATE_SCORE,
        otherwise None.
    """
    name = entry.get("organisatie", "")
    city = entry.get("plaatsnaam_bezoekadres", "")
    org_type = entry.get("type", [""])[0] if entry.get("type") else ""

    if not name:
        return None

    # Build search strategies, then strip and de-duplicate so we don't
    # spend rate-limited API calls on effectively identical queries
    # (e.g. "name" vs "name " when the city is empty).
    queries = [
        name,               # Full name
        f"{name} {city}",   # Name + city
    ]
    if org_type in TYPE_KEYWORDS:
        for keyword in TYPE_KEYWORDS[org_type][:1]:
            queries.append(f"{name} {keyword}")
    queries = list(dict.fromkeys(q.strip() for q in queries if q.strip()))

    best_candidate: Optional[WikidataCandidate] = None
    best_score = 0.0

    # Reuse a single HTTP connection for all of this entry's queries.
    with httpx.Client(headers=HEADERS, timeout=30.0) as client:
        for query in queries:
            results = search_wikidata(query, client=client)
            time.sleep(BASE_DELAY)  # Rate limiting

            for result in results:
                qid = result.get("id", "")
                label = result.get("label", "")
                description = result.get("description", "")
                desc_lower = description.lower()

                # Fuzzy name similarity, 0-100
                name_score = fuzz.ratio(name.lower(), label.lower())

                # Boost if the city appears in the description. The `city and`
                # guard matters: "" is a substring of everything, so an empty
                # city would otherwise hand every result a spurious +10.
                city_boost = 10 if city and city.lower() in desc_lower else 0

                # Boost for Netherlands mentions
                nl_boost = 5 if any(
                    x in desc_lower for x in ("nederland", "netherlands", "dutch", "nl-")
                ) else 0

                # Boost if any keyword for this institution type appears
                type_boost = 0
                for keyword in TYPE_KEYWORDS.get(org_type, ()):
                    if keyword.lower() in desc_lower:
                        type_boost = 10
                        break

                total_score = name_score + city_boost + nl_boost + type_boost
                # Strict '>' keeps the first hit on ties (earlier queries win).
                if total_score > best_score:
                    best_score = total_score
                    best_candidate = WikidataCandidate(
                        qid=qid,
                        label=label,
                        description=description,
                        match_score=total_score,
                        search_query=query,
                        entry_index=entry_index,
                        entry_name=name,
                        entry_city=city,
                    )

    if best_candidate and best_candidate.match_score >= MIN_CANDIDATE_SCORE:
        return best_candidate
    return None


def load_entries_without_wikidata() -> List[tuple]:
    """Load NDE entries that don't have a valid Wikidata ID.

    Returns:
        List of (index, entry) tuples for entries whose "wikidata_id"
        field is missing, empty, or doesn't look like a Q-identifier.
    """
    with open(NDE_YAML, "r", encoding="utf-8") as f:
        # safe_load returns None for an empty file; treat that as no entries
        # rather than crashing on enumerate(None).
        entries = yaml.safe_load(f) or []

    missing = []
    for i, entry in enumerate(entries):
        wikidata_id = entry.get("wikidata_id", "")
        # Missing or malformed (Wikidata item ids always start with "Q")
        if not wikidata_id or not str(wikidata_id).startswith("Q"):
            missing.append((i, entry))
    return missing


def main():
    """Search Wikidata for every NDE entry lacking an ID and save results."""
    logger.info("Finding Wikidata IDs for NDE entries without them...")

    # Load entries without Wikidata IDs
    missing_entries = load_entries_without_wikidata()
    logger.info(f"Found {len(missing_entries)} entries without Wikidata IDs")

    candidates = []
    high_confidence_matches = []

    for idx, (entry_index, entry) in enumerate(missing_entries):
        name = entry.get("organisatie", "Unknown")
        logger.info(f"[{idx+1}/{len(missing_entries)}] Searching for: {name}")

        candidate = find_best_match(entry, entry_index)
        if candidate:
            candidates.append(asdict(candidate))
            logger.info(f"  Found: {candidate.qid} - {candidate.label} (score: {candidate.match_score:.0f})")

            # High confidence matches are recorded separately for auto-merge
            if candidate.match_score >= HIGH_CONFIDENCE_SCORE:
                high_confidence_matches.append({
                    "entry_index": entry_index,
                    "entry_name": name,
                    "wikidata_id": candidate.qid,
                    "wikidata_label": candidate.label,
                    "match_score": candidate.match_score,
                })
        else:
            logger.info("  No match found")

        # Checkpoint every 50 entries so a crash doesn't lose all progress
        if (idx + 1) % 50 == 0:
            save_results(candidates, high_confidence_matches)

    # Final save
    save_results(candidates, high_confidence_matches)

    logger.info(f"\nDone! Found {len(candidates)} candidates, {len(high_confidence_matches)} high-confidence matches")
    logger.info(f"Candidates saved to: {OUTPUT_CANDIDATES}")
    logger.info(f"High-confidence matches saved to: {OUTPUT_MATCHES}")


def save_results(candidates: List[Dict], matches: List[Dict]):
    """Save candidates and high-confidence matches to their YAML files.

    Both files get a UTC timestamp and a count alongside the payload;
    existing file contents are overwritten (this is a checkpoint, not
    an append).
    """
    with open(OUTPUT_CANDIDATES, "w", encoding="utf-8") as f:
        yaml.dump({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "total_candidates": len(candidates),
            "candidates": candidates,
        }, f, default_flow_style=False, allow_unicode=True)

    with open(OUTPUT_MATCHES, "w", encoding="utf-8") as f:
        yaml.dump({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "total_matches": len(matches),
            "matches": matches,
        }, f, default_flow_style=False, allow_unicode=True)


if __name__ == "__main__":
    main()