#!/usr/bin/env python3
"""
Resolve NL-*-XXX-* files by looking up institutions in Wikidata and GeoNames.

This script:
1. Reads all NL-*-XXX-*.yaml files
2. Searches Wikidata for each institution
3. Gets coordinates and city from Wikidata
4. Looks up city code from GeoNames
5. Generates new GHCID with proper city code
6. Optionally renames files to new GHCID

Usage:
    python scripts/resolve_nl_xxx_locations.py --dry-run  # Preview changes
    python scripts/resolve_nl_xxx_locations.py            # Apply changes
"""

import argparse
import os
import re
import sqlite3
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

import requests
import yaml

# Load environment variables (optional dependency; fall back to os.environ).
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass  # dotenv not installed, rely on environment

# Configuration
CUSTODIAN_DIR = Path("data/custodian")
GEONAMES_DB = Path("data/reference/geonames.db")

# Wikidata API credentials from environment
WIKIDATA_API_TOKEN = os.getenv("WIKIDATA_API_TOKEN")
WIKIMEDIA_CONTACT_EMAIL = os.getenv(
    "WIKIMEDIA_CONTACT_EMAIL", "glam-ontology-bot@example.com"
)

# Wikidata API endpoints
WIKIDATA_SEARCH_URL = "https://www.wikidata.org/w/api.php"
WIKIDATA_SPARQL_URL = "https://query.wikidata.org/sparql"


def get_headers(include_auth: bool = True) -> Dict[str, str]:
    """Build request headers with a descriptive User-Agent (required by
    Wikimedia policy) and, optionally, the OAuth bearer token.

    Args:
        include_auth: Attach the Authorization header when a token is set.

    Returns:
        Header dict suitable for passing to ``requests``.
    """
    headers = {
        "User-Agent": f"GLAM-Ontology-Bot/1.0 ({WIKIMEDIA_CONTACT_EMAIL})",
        "Accept": "application/json",
    }
    if include_auth and WIKIDATA_API_TOKEN:
        headers["Authorization"] = f"Bearer {WIKIDATA_API_TOKEN}"
    return headers


# Netherlands province code mapping (GeoNames admin1 -> ISO 3166-2).
# NOTE(review): GeoNames defines no admin1 code "08" for NL — the gap
# between "07" and "09" is intentional, not a typo.
NL_PROVINCE_CODES = {
    "01": "DR",  # Drenthe
    "02": "FR",  # Friesland
    "03": "GE",  # Gelderland
    "04": "GR",  # Groningen
    "05": "LI",  # Limburg
    "06": "NB",  # Noord-Brabant
    "07": "NH",  # Noord-Holland
    "09": "UT",  # Utrecht
    "10": "ZE",  # Zeeland
    "11": "ZH",  # Zuid-Holland
    "15": "OV",  # Overijssel
    "16": "FL",  # Flevoland
}


def search_wikidata(name: str) -> Optional[str]:
    """Search Wikidata for an entity by name, return QID.

    Uses the ``wbsearchentities`` action with Dutch as the search language
    and returns the top-ranked hit, or None on no match / request failure.
    """
    params = {
        "action": "wbsearchentities",
        "search": name,
        "language": "nl",
        "format": "json",
        "limit": 5,
    }
    try:
        resp = requests.get(
            WIKIDATA_SEARCH_URL,
            params=params,
            headers=get_headers(include_auth=True),
            timeout=10,
        )
        resp.raise_for_status()
        data = resp.json()
        if data.get("search"):
            return data["search"][0]["id"]
    except Exception as e:
        # Best-effort lookup: a single failed search must not abort the run.
        print(f" Warning: Wikidata search failed for '{name}': {e}")
    return None


def get_wikidata_location(qid: str) -> Optional[Dict[str, Any]]:
    """Get location data from Wikidata using SPARQL.

    Returns a dict with qid, latitude, longitude, city_label and city_qid,
    or None when the entity has no P625 coordinates or the query fails.
    """
    query = f"""
    SELECT ?coords ?cityLabel ?city ?regionLabel ?region WHERE {{
      wd:{qid} wdt:P625 ?coords .
      OPTIONAL {{ wd:{qid} wdt:P131 ?city . }}
      OPTIONAL {{
        ?city wdt:P131* ?region .
        ?region wdt:P31 wd:Q134390 .
      }}
      SERVICE wikibase:label {{ bd:serviceParam wikibase:language "nl,en". }}
    }}
    LIMIT 1
    """
    try:
        resp = requests.get(
            WIKIDATA_SPARQL_URL,
            params={"query": query, "format": "json"},
            headers=get_headers(include_auth=False),  # SPARQL endpoint doesn't use OAuth
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
        bindings = data.get("results", {}).get("bindings")
        if bindings:
            result = bindings[0]
            coords_str = result.get("coords", {}).get("value", "")
            # WKT literal looks like "Point(lon lat)" — longitude first.
            match = re.search(r"Point\(([-\d.]+)\s+([-\d.]+)\)", coords_str)
            if match:
                lon, lat = float(match.group(1)), float(match.group(2))
                return {
                    "qid": qid,
                    "latitude": lat,
                    "longitude": lon,
                    "city_label": result.get("cityLabel", {}).get("value"),
                    "city_qid": result.get("city", {}).get("value", "").split("/")[-1],
                }
    except Exception as e:
        print(f" Warning: SPARQL query failed for {qid}: {e}")
    return None


# Shared settlement-lookup SQL; only the squared-degree radius and the
# ordering differ between the "largest nearby" and "nearest" strategies.
_SETTLEMENT_SQL = """
    SELECT name, ascii_name, admin1_code, admin1_name,
           latitude, longitude, geonames_id, population, feature_code,
           ((latitude - ?) * (latitude - ?) + (longitude - ?) * (longitude - ?)) as distance_sq
    FROM cities
    WHERE country_code = ?
      AND feature_code IN ('PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLS', 'PPLG')
      AND ((latitude - ?) * (latitude - ?) + (longitude - ?) * (longitude - ?)) < {radius_sq}
    ORDER BY {order_by}
    LIMIT 1
"""


def get_city_code_from_geonames(
    lat: float, lon: float, country_code: str = "NL"
) -> Optional[Dict[str, Any]]:
    """Reverse geocode coordinates to get city code from GeoNames database.

    Strategy: Find the largest settlement within ~10km. If no settlement
    found, fall back to nearest settlement within ~20km. This prevents
    small villages from being selected over nearby major cities (e.g.
    Apenheul should map to Apeldoorn, not Ugchelen).

    Returns a dict with city_name, city_code, geonames_id, admin1_code,
    province_code, feature_code, latitude, longitude — or None on failure.
    """
    if not GEONAMES_DB.exists():
        print(f" Warning: GeoNames database not found: {GEONAMES_DB}")
        return None

    # The same lat/lon pair feeds both the SELECTed distance expression and
    # the WHERE radius filter, hence the repeated parameters.
    params = (lat, lat, lon, lon, country_code, lat, lat, lon, lon)

    conn = sqlite3.connect(GEONAMES_DB)
    try:
        cursor = conn.cursor()

        # First try: largest settlement within ~10km
        # (0.01 squared degrees ≈ 10km at NL latitude); prefers major
        # cities over small villages.
        cursor.execute(
            _SETTLEMENT_SQL.format(radius_sq="0.01", order_by="population DESC"),
            params,
        )
        row = cursor.fetchone()

        # Fallback: nearest settlement within ~20km.
        if not row:
            cursor.execute(
                _SETTLEMENT_SQL.format(radius_sq="0.04", order_by="distance_sq"),
                params,
            )
            row = cursor.fetchone()

        if row:
            (name, ascii_name, admin1_code, admin1_name,
             g_lat, g_lon, geonames_id, pop, feature_code) = row[:9]

            # City code = first 3 letters of the ASCII name, uppercased.
            city_code = ascii_name[:3].upper() if ascii_name else name[:3].upper()
            # "XX" marks an admin1 code we have no ISO mapping for.
            province_code = NL_PROVINCE_CODES.get(admin1_code, "XX")

            return {
                "city_name": name,
                "city_code": city_code,
                "geonames_id": geonames_id,
                "admin1_code": admin1_code,
                "province_code": province_code,
                "feature_code": feature_code,
                "latitude": g_lat,
                "longitude": g_lon,
            }
    except Exception as e:
        print(f" Warning: GeoNames lookup failed: {e}")
    finally:
        conn.close()
    return None


def generate_abbreviation(name: str) -> str:
    """Generate institution abbreviation from name.

    Takes the first letter of every significant word (articles and
    prepositions skipped), capped at 10 characters.
    """
    # Skip words (Dutch/English articles, prepositions). Duplicates from
    # the original list ('of', 'in') removed — the set is unchanged.
    skip_words = {
        # Dutch
        'de', 'het', 'een', 'van', 'voor', 'in', 'op', 'te', 'den', 'der',
        'des', "'s", 'aan', 'bij', 'met', 'naar', 'om', 'tot', 'uit', 'over',
        'onder', 'door', 'en', 'of',
        # English
        'a', 'an', 'the', 'at', 'on', 'to', 'for', 'with', 'from', 'by',
        'as', 'and', 'or',
    }

    words = re.split(r'[\s\-]+', name)
    initials = []
    for word in words:
        # Strip punctuation so e.g. "Rijksmuseum," still contributes "R".
        clean = re.sub(r'[^\w]', '', word)
        if clean.lower() not in skip_words and clean:
            initials.append(clean[0].upper())
    return ''.join(initials[:10])  # Max 10 characters


def load_yaml(filepath: Path) -> Dict[str, Any]:
    """Load a YAML file and return its top-level mapping."""
    with open(filepath, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)


def save_yaml(filepath: Path, data: Dict[str, Any]):
    """Save *data* to a YAML file, preserving key order and unicode."""
    with open(filepath, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, default_flow_style=False, allow_unicode=True,
                  sort_keys=False)


def _apply_updates(
    filepath: Path,
    data: Dict[str, Any],
    qid: str,
    location: Dict[str, Any],
    geonames: Dict[str, Any],
    old_ghcid: str,
    new_ghcid: str,
) -> Optional[Path]:
    """Write resolved location/GHCID into the YAML file and rename it.

    Returns the new file path when the file was renamed, otherwise None.
    """
    timestamp = datetime.now(timezone.utc).isoformat()

    # Update location
    data["location"] = {
        "city": geonames["city_name"],
        "region": geonames["province_code"],
        "country": "NL",
        "coordinates": {
            "latitude": location["latitude"],
            "longitude": location["longitude"],
        }
    }

    # Update GHCID, prepending the new entry to any existing history.
    old_history = (data.get("ghcid") or {}).get("ghcid_history", [])
    data["ghcid"] = {
        "ghcid_current": new_ghcid,
        "ghcid_original": old_ghcid,
        "location_resolution": {
            "method": "WIKIDATA_GEONAMES_LOOKUP",
            "wikidata_id": qid,
            "geonames_id": geonames["geonames_id"],
            "city_name": geonames["city_name"],
            "city_code": geonames["city_code"],
            "region_code": geonames["province_code"],
            "country_code": "NL",
            "resolution_date": timestamp,
        },
        "ghcid_history": [
            {
                "ghcid": new_ghcid,
                "valid_from": timestamp,
                "valid_to": None,
                "reason": f"Location resolved via Wikidata ({qid}) + GeoNames reverse geocoding"
            }
        ] + old_history
    }

    # Add Wikidata identifier if not present ("or []" guards a null value).
    identifiers = data.get("identifiers") or []
    has_wikidata = any(i.get("identifier_scheme") == "Wikidata" for i in identifiers)
    if not has_wikidata:
        identifiers.append({
            "identifier_scheme": "Wikidata",
            "identifier_value": qid,
            "identifier_url": f"https://www.wikidata.org/wiki/{qid}"
        })
    data["identifiers"] = identifiers

    # Save updated file
    save_yaml(filepath, data)
    print(f" ✓ Updated file")

    # Rename file if GHCID changed
    if new_ghcid != old_ghcid:
        new_filepath = filepath.parent / f"{new_ghcid}.yaml"
        if new_filepath.exists():
            print(f" ⚠️ Cannot rename: {new_filepath} already exists!")
        else:
            filepath.rename(new_filepath)
            print(f" ✓ Renamed to {new_filepath.name}")
            return new_filepath
    return None


def resolve_institution(filepath: Path, dry_run: bool = True) -> Optional[Dict[str, Any]]:
    """Resolve location for a single institution file.

    Looks the institution up in Wikidata, reverse-geocodes its coordinates
    against GeoNames, and (unless *dry_run*) rewrites and renames the file.

    Returns:
        A summary dict describing the resolution, or None when any lookup
        step fails (the reason is printed).
    """
    data = load_yaml(filepath)

    # Get institution name ("or {}" guards a present-but-null mapping).
    name = (data.get("custodian_name") or {}).get("emic_name", "")
    if not name:
        print(f" Skipping: No emic_name found")
        return None
    print(f"\n Institution: {name}")

    # Search Wikidata
    time.sleep(0.5)  # Rate limiting
    qid = search_wikidata(name)
    if not qid:
        print(f" Warning: Not found in Wikidata")
        return None
    print(f" Wikidata: {qid}")

    # Get location from Wikidata
    time.sleep(0.5)
    location = get_wikidata_location(qid)
    if not location:
        print(f" Warning: No coordinates in Wikidata")
        return None
    print(f" Coords: ({location['latitude']}, {location['longitude']})")
    if location.get("city_label"):
        print(f" Wikidata city: {location['city_label']}")

    # Reverse geocode to GeoNames
    geonames = get_city_code_from_geonames(location["latitude"], location["longitude"])
    if not geonames:
        print(f" Warning: GeoNames lookup failed")
        return None
    print(f" GeoNames city: {geonames['city_name']} ({geonames['city_code']})")
    print(f" Province: {geonames['province_code']} (admin1: {geonames['admin1_code']})")

    # Get institution type and abbreviation
    inst_types = data.get("institution_type") or ["U"]
    inst_type = inst_types[0] if isinstance(inst_types, list) else inst_types
    abbrev = generate_abbreviation(name)

    # Generate new GHCID
    old_ghcid = (data.get("ghcid") or {}).get("ghcid_current", filepath.stem)
    new_ghcid = f"NL-{geonames['province_code']}-{geonames['city_code']}-{inst_type}-{abbrev}"
    print(f" Old GHCID: {old_ghcid}")
    print(f" New GHCID: {new_ghcid}")

    # Check if province changed (location was wrong)
    ghcid_parts = old_ghcid.split("-")
    old_province = ghcid_parts[1] if len(ghcid_parts) > 1 else "XX"
    province_changed = old_province != geonames['province_code']
    if province_changed:
        print(f" ⚠️ PROVINCE MISMATCH: Was {old_province}, should be {geonames['province_code']}")

    result = {
        "filepath": filepath,
        "name": name,
        "qid": qid,
        "old_ghcid": old_ghcid,
        "new_ghcid": new_ghcid,
        "city_name": geonames["city_name"],
        "city_code": geonames["city_code"],
        "province_code": geonames["province_code"],
        "geonames_id": geonames["geonames_id"],
        "latitude": location["latitude"],
        "longitude": location["longitude"],
        "province_changed": province_changed,
    }

    if not dry_run:
        new_filepath = _apply_updates(
            filepath, data, qid, location, geonames, old_ghcid, new_ghcid
        )
        if new_filepath:
            result["new_filepath"] = new_filepath

    return result


def main():
    """CLI entry point: resolve all NL-*-XXX-*.yaml files and print a summary."""
    parser = argparse.ArgumentParser(description="Resolve NL-*-XXX-* location files")
    parser.add_argument("--dry-run", action="store_true",
                        help="Preview changes without modifying files")
    parser.add_argument("--limit", type=int, default=None,
                        help="Limit number of files to process")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="Verbose output")
    args = parser.parse_args()

    # Check API token
    if WIKIDATA_API_TOKEN:
        print(f"✓ Wikidata API token loaded ({len(WIKIDATA_API_TOKEN)} chars)")
    else:
        print("⚠️ Warning: No WIKIDATA_API_TOKEN found in environment")
        print(" The script will use unauthenticated requests (may hit rate limits)")

    if args.verbose:
        print(f" User-Agent: GLAM-Ontology-Bot/1.0 ({WIKIMEDIA_CONTACT_EMAIL})")

    # Find all NL-*-XXX-*.yaml files
    pattern = "NL-*-XXX-*.yaml"
    files = sorted(CUSTODIAN_DIR.glob(pattern))
    print(f"Found {len(files)} NL-*-XXX-*.yaml files")

    if args.dry_run:
        print("DRY RUN - No changes will be made")

    if args.limit:
        files = files[:args.limit]
        print(f"Processing first {args.limit} files")

    results = []
    resolved = 0
    failed = 0
    province_mismatches = 0

    for filepath in files:
        print(f"\n{'='*60}")
        print(f"Processing: {filepath.name}")
        try:
            result = resolve_institution(filepath, dry_run=args.dry_run)
            if result:
                results.append(result)
                resolved += 1
                if result.get("province_changed"):
                    province_mismatches += 1
            else:
                failed += 1
        except Exception as e:
            # Keep going: one broken file must not stop the batch.
            print(f" ERROR: {e}")
            failed += 1

    # Summary
    print(f"\n{'='*60}")
    print("SUMMARY")
    print(f"{'='*60}")
    print(f"Total files: {len(files)}")
    print(f"Resolved: {resolved}")
    print(f"Failed/Skipped: {failed}")
    print(f"Province mismatches: {province_mismatches}")

    if province_mismatches > 0:
        print(f"\n⚠️ Province mismatches found - these institutions were assigned wrong province in LinkedIn import:")
        for r in results:
            if r.get("province_changed"):
                print(f" - {r['name']}: {r['old_ghcid']} → {r['new_ghcid']}")


if __name__ == "__main__":
    main()