glam/scripts/enrich_nde_entries.py
2025-11-30 23:30:29 +01:00

1043 lines
36 KiB
Python

#!/usr/bin/env python3
"""
Unified NDE Entry Enrichment Script
This script provides a flexible way to enrich NDE entries with:
- Wikidata data (Q-numbers, coordinates, founding dates, identifiers)
- Google Maps data (place IDs, coordinates, ratings, reviews, opening hours)
Supports different entry types through configuration profiles:
- museum_register: Museum Register Nederland entries (1515-1655)
- kb_isil: KB Netherlands library entries
- all: All entries without enrichment
- custom: Custom entry range or pattern
Usage:
# Enrich Museum Register entries with Wikidata
python scripts/enrich_nde_entries.py --profile museum_register --source wikidata
# Enrich KB libraries with Google Maps
python scripts/enrich_nde_entries.py --profile kb_isil --source google_maps
# Enrich specific range with both sources
python scripts/enrich_nde_entries.py --start 1515 --end 1600 --source both
# Enrich all entries missing Wikidata
python scripts/enrich_nde_entries.py --profile all --source wikidata --skip-enriched
Environment Variables:
GOOGLE_PLACES_TOKEN - Required for Google Maps enrichment
"""
import os
import sys
import time
import json
import yaml
import re
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any, Tuple, Callable
from dataclasses import dataclass, field, asdict
from difflib import SequenceMatcher
import logging
import argparse
try:
import httpx
except ImportError:
print("httpx is required. Install with: pip install httpx")
sys.exit(1)
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass # dotenv is optional
# Set up logging: timestamped INFO-level messages for the whole script.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# =============================================================================
# Configuration Profiles
# =============================================================================
# Each profile selects a subset of entry files and tunes the enrichment:
#   entry_range         - inclusive (start, end) range of numeric filename prefixes
#   file_pattern        - glob pattern for entry filenames (None = use range)
#   has_field           - keep only entries whose original_entry has this key set
#   institution_type    - hint used when building the Google Places search query
#   wikidata_query_type - which P31 type filter the SPARQL query uses
PROFILES = {
    "museum_register": {
        "description": "Museum Register Nederland entries",
        "entry_range": (1515, 1655),
        "file_pattern": None,
        "institution_type": "museum",
        "wikidata_query_type": "museum",
    },
    "kb_isil": {
        "description": "KB Netherlands library entries",
        "entry_range": None,
        "file_pattern": "*_kb_isil.yaml",
        "institution_type": "library",
        "wikidata_query_type": "library",
    },
    "na_isil": {
        "description": "NA Netherlands archive entries",
        "entry_range": None,
        "file_pattern": None,
        "has_field": "isil-code_na",
        "institution_type": "archive",
        "wikidata_query_type": "archive",
    },
    "all": {
        "description": "All entries",
        "entry_range": None,
        "file_pattern": "*.yaml",
        "institution_type": None,
        "wikidata_query_type": "heritage",
    },
}
# =============================================================================
# API Configuration
# =============================================================================
SPARQL_URL = "https://query.wikidata.org/sparql"
TEXT_SEARCH_URL = "https://places.googleapis.com/v1/places:searchText"
# Identifying User-Agent, as requested by the Wikidata Query Service policy.
USER_AGENT = "GLAM-NDE-Enricher/1.0 (https://github.com/sst/glam)"
REQUEST_DELAY = 0.4  # Seconds between requests (simple rate limiting)
# Field mask for Places API (New) Text Search responses; limits both the
# payload and, per Google's pricing model, the billed SKU tier.
GOOGLE_PLACE_FIELDS = [
    "id", "displayName", "formattedAddress", "addressComponents",
    "location", "types", "businessStatus", "internationalPhoneNumber",
    "nationalPhoneNumber", "regularOpeningHours", "websiteUri",
    "rating", "userRatingCount", "googleMapsUri", "primaryType",
    "shortFormattedAddress", "editorialSummary",
]
# =============================================================================
# Data Classes
# =============================================================================
@dataclass
class EnrichmentStats:
    """Track enrichment statistics across one processing run."""
    total_files: int = 0        # files matching the profile (before --limit)
    already_enriched: int = 0   # skipped because requested data already present
    website_matches: int = 0    # NOTE(review): never incremented in this file
    isil_matches: int = 0       # Wikidata matches found via exact ISIL code
    name_matches: int = 0       # Wikidata matches found via fuzzy name match
    not_found: int = 0          # entries with no match from the queried source
    skipped: int = 0            # empty files or entries without a name
    errors: int = 0             # load/save failures

    @property
    def total_enriched(self) -> int:
        # Sum of all match strategies that produced an enrichment.
        return self.website_matches + self.isil_matches + self.name_matches

    def to_dict(self) -> Dict[str, int]:
        # asdict() serializes only the dataclass fields (not this property).
        return asdict(self)
# =============================================================================
# Name Normalization and Matching
# =============================================================================
def normalize_name(name: str) -> str:
    """Normalize an institution name for fuzzy matching.

    Lowercases the name, strips parenthetical content, common Dutch
    prefixes/suffixes, a trailing Dutch city name, and "museum" noise
    words, then collapses punctuation and whitespace.
    """
    if not name:
        return ""
    result = name.lower()
    # Drop parenthetical content (e.g. "(incl. Kunsthal)").
    result = re.sub(r'\s*\([^)]*\)', '', result)
    # Strip common Dutch organizational/article prefixes (start of name only).
    for prefix in (r'^stichting\s+', r'^vereniging\s+',
                   r'^het\s+', r'^de\s+', r'^nationaal\s+', r'^gemeentelijk\s+',
                   r'^openbare\s+bibliotheek\s+'):
        result = re.sub(prefix, '', result)
    # Strip common suffixes.
    for suffix in (r'\s+nederland$', r'\s+stichting$'):
        result = re.sub(suffix, '', result)
    # Drop a trailing Dutch city name, e.g. "Rijksmuseum Amsterdam" -> "Rijksmuseum".
    result = re.sub(r'\s+(amsterdam|rotterdam|den haag|utrecht|eindhoven|groningen|tilburg|almere|breda|nijmegen|enschede|haarlem|arnhem|zaanstad|amersfoort|apeldoorn|hoofddorp|maastricht|leiden|dordrecht|zoetermeer|zwolle|deventer|delft|alkmaar|heerlen|venlo|leeuwarden|hilversum)$', '', result)
    # Reduce "museum" noise: compound words keep their stem
    # ("molenmuseum" -> "molen", "scheepvaartmuseum" -> "scheepvaart");
    # standalone leading/trailing "museum" words are dropped entirely.
    result = re.sub(r'(\w{3,})museum\b', r'\1', result)
    result = re.sub(r'\bmuseum\s+', '', result)
    result = re.sub(r'\s+museum$', '', result)
    # Remove articles that appear mid-name.
    result = re.sub(r'\b(het|de)\b', ' ', result)
    # Replace punctuation with spaces and collapse runs of whitespace.
    result = re.sub(r'[^\w\s]', ' ', result)
    return ' '.join(result.split()).strip()
def similarity_score(name1: str, name2: str) -> float:
    """Return a similarity score in [0, 1] between two institution names.

    Both names are normalized first; if either normalizes to the empty
    string the score is 0. The base score is the SequenceMatcher ratio,
    raised when one normalized name is contained in the other (scaled by
    how much of the longer name the shorter one covers).
    """
    a = normalize_name(name1)
    b = normalize_name(name2)
    if not a or not b:
        return 0.0
    score = SequenceMatcher(None, a, b).ratio()
    # Containment bonus: helps match e.g. "molen valk" with "valk" or
    # "naturalis" with "naturalis biodiversity center".
    short, long_ = (a, b) if len(a) <= len(b) else (b, a)
    if short and short in long_:
        coverage = len(short) / len(long_)
        score = max(score, 0.65 + 0.35 * coverage)
    return score
# =============================================================================
# Wikidata Functions
# =============================================================================
def get_wikidata_query(query_type: str) -> str:
    """Return a SPARQL query for Dutch institutions of the given type.

    query_type selects the wdt:P31/wdt:P279* class filter: "museum"
    (Q33506), "library" (Q7075) or "archive" (Q166118); any other value
    falls back to the combined "heritage" filter (all three, unioned).
    The query restricts to country = Netherlands (Q55), optionally pulls
    ISIL, VIAF, coordinates, website and inception, requests Dutch then
    English labels, and is capped at 3000 rows.
    """
    heritage_filter = """
        { ?item wdt:P31/wdt:P279* wd:Q33506 . }  # museum
        UNION { ?item wdt:P31/wdt:P279* wd:Q7075 . }  # library
        UNION { ?item wdt:P31/wdt:P279* wd:Q166118 . }  # archive
    """
    type_filters = {
        "museum": "?item wdt:P31/wdt:P279* wd:Q33506 .",
        "library": "?item wdt:P31/wdt:P279* wd:Q7075 .",
        "archive": "?item wdt:P31/wdt:P279* wd:Q166118 .",
        "heritage": heritage_filter,
    }
    selected = type_filters.get(query_type, heritage_filter)
    return f"""
    SELECT DISTINCT ?item ?itemLabel ?itemDescription ?isil ?viaf ?coords ?website ?inception
    WHERE {{
        {selected}
        ?item wdt:P17 wd:Q55 .  # country: Netherlands
        OPTIONAL {{ ?item wdt:P791 ?isil . }}
        OPTIONAL {{ ?item wdt:P214 ?viaf . }}
        OPTIONAL {{ ?item wdt:P625 ?coords . }}
        OPTIONAL {{ ?item wdt:P856 ?website . }}
        OPTIONAL {{ ?item wdt:P571 ?inception . }}
        SERVICE wikibase:label {{ bd:serviceParam wikibase:language "nl,en" . }}
    }}
    LIMIT 3000
    """
def query_wikidata_institutions(client: httpx.Client, query_type: str) -> Dict[str, Dict[str, Any]]:
    """Query Wikidata for Dutch institutions of the given type.

    Returns a mapping of Q-number -> record with keys "qid", "name",
    "description", "identifiers" (VIAF/Website) and, when present,
    "isil", "founding_date", "latitude"/"longitude". Any request or
    parse failure is logged and yields an empty dict.
    """
    query = get_wikidata_query(query_type)
    headers = {
        "Accept": "application/sparql-results+json",
        "User-Agent": USER_AGENT,
    }
    try:
        logger.info(f"Querying Wikidata for Dutch {query_type} institutions...")
        response = client.get(
            SPARQL_URL,
            params={"query": query, "format": "json"},
            headers=headers,
            timeout=120.0
        )
        response.raise_for_status()
        data = response.json()
        results = {}
        for binding in data.get("results", {}).get("bindings", []):
            item_uri = binding.get("item", {}).get("value", "")
            # Entity URIs look like http://www.wikidata.org/entity/Q123.
            qid = item_uri.split("/")[-1] if item_uri else None
            # Keep only the first row seen per entity (duplicates are
            # presumably produced by multiple OPTIONAL values).
            if not qid or not qid.startswith("Q") or qid in results:
                continue
            result = {
                "qid": qid,
                "name": binding.get("itemLabel", {}).get("value", ""),
                "description": binding.get("itemDescription", {}).get("value", ""),
                "identifiers": {}
            }
            if "isil" in binding:
                result["isil"] = binding["isil"]["value"]
            if "viaf" in binding:
                result["identifiers"]["VIAF"] = binding["viaf"]["value"]
            if "website" in binding:
                result["identifiers"]["Website"] = binding["website"]["value"]
            if "inception" in binding:
                # Keep only the date part of the dateTime literal.
                result["founding_date"] = binding["inception"]["value"].split("T")[0]
            if "coords" in binding:
                coords_str = binding["coords"]["value"]
                # WKT literal "Point(lon lat)" — longitude comes first.
                if coords_str.startswith("Point("):
                    try:
                        lon, lat = coords_str[6:-1].split()
                        result["latitude"] = float(lat)
                        result["longitude"] = float(lon)
                    except (ValueError, IndexError):
                        pass
            results[qid] = result
        logger.info(f"Found {len(results)} institutions in Wikidata")
        return results
    except Exception as e:
        logger.error(f"Error querying Wikidata: {e}")
        return {}
def query_wikidata_by_isil(client: httpx.Client, isil_codes: List[str]) -> Dict[str, Dict[str, Any]]:
    """Query Wikidata for institutions by their ISIL codes (P791).

    Only the first 100 codes are sent (batch-size guard); callers with
    more codes would silently lose the remainder. Returns a mapping of
    ISIL code -> record shaped like query_wikidata_institutions() output,
    or {} on failure.
    """
    if not isil_codes:
        return {}
    isil_values = " ".join(f'"{code}"' for code in isil_codes[:100])  # Limit batch size
    # No P31 type filter here: an exact ISIL match is specific enough.
    query = f"""
    SELECT DISTINCT ?item ?itemLabel ?itemDescription ?isil ?viaf ?coords ?website ?inception
    WHERE {{
        VALUES ?isil {{ {isil_values} }}
        ?item wdt:P791 ?isil .
        OPTIONAL {{ ?item wdt:P214 ?viaf . }}
        OPTIONAL {{ ?item wdt:P625 ?coords . }}
        OPTIONAL {{ ?item wdt:P856 ?website . }}
        OPTIONAL {{ ?item wdt:P571 ?inception . }}
        SERVICE wikibase:label {{ bd:serviceParam wikibase:language "nl,en" . }}
    }}
    """
    headers = {
        "Accept": "application/sparql-results+json",
        "User-Agent": USER_AGENT,
    }
    try:
        response = client.get(
            SPARQL_URL,
            params={"query": query, "format": "json"},
            headers=headers,
            timeout=60.0
        )
        response.raise_for_status()
        data = response.json()
        results = {}
        for binding in data.get("results", {}).get("bindings", []):
            isil = binding.get("isil", {}).get("value", "")
            item_uri = binding.get("item", {}).get("value", "")
            qid = item_uri.split("/")[-1] if item_uri else None
            if not isil or not qid:
                continue
            result = {
                "qid": qid,
                "name": binding.get("itemLabel", {}).get("value", ""),
                "description": binding.get("itemDescription", {}).get("value", ""),
                "isil": isil,
                "identifiers": {}
            }
            if "viaf" in binding:
                result["identifiers"]["VIAF"] = binding["viaf"]["value"]
            if "website" in binding:
                result["identifiers"]["Website"] = binding["website"]["value"]
            if "inception" in binding:
                # Keep only the date part of the dateTime literal.
                result["founding_date"] = binding["inception"]["value"].split("T")[0]
            if "coords" in binding:
                coords_str = binding["coords"]["value"]
                # WKT literal "Point(lon lat)" — longitude comes first.
                if coords_str.startswith("Point("):
                    try:
                        lon, lat = coords_str[6:-1].split()
                        result["latitude"] = float(lat)
                        result["longitude"] = float(lon)
                    except (ValueError, IndexError):
                        pass
            # Later rows for the same ISIL overwrite earlier ones.
            results[isil] = result
        return results
    except Exception as e:
        logger.error(f"Error querying Wikidata by ISIL: {e}")
        return {}
def find_wikidata_match(
    name: str,
    city: Optional[str],
    province: Optional[str],
    institutions: Dict[str, Dict[str, Any]],
    threshold: float = 0.70
) -> Optional[Dict[str, Any]]:
    """Find the best fuzzy-name match for an entry among Wikidata candidates.

    Scores every candidate by name similarity plus a small boost when
    the entry's city (0.12) or province (0.08) appears in the candidate's
    label or description; the city boost outweighs the province boost.

    Returns a copy of the best candidate with "match_score" added, but
    only if its score reaches *threshold*; otherwise None. The decision
    is made after scanning all candidates, so the overall best match
    wins rather than the first one to cross the threshold.
    """
    best_score = 0.0
    best_match: Optional[Dict[str, Any]] = None
    for inst_data in institutions.values():
        inst_name = inst_data.get("name", "")
        if not inst_name:
            continue
        name_score = similarity_score(name, inst_name)
        # Boost for location match in label/description text.
        location_boost = 0.0
        search_text = (inst_name + " " + inst_data.get("description", "")).lower()
        if city and city.lower() in search_text:
            location_boost = 0.12
        if province and province.lower() in search_text:
            location_boost = max(location_boost, 0.08)
        total_score = name_score + location_boost
        if total_score > best_score:
            best_score = total_score
            # Copy so adding "match_score" never mutates the shared cache.
            best_match = inst_data.copy()
    if best_match and best_score >= threshold:
        best_match["match_score"] = best_score
        return best_match
    return None
def create_wikidata_enrichment(wikidata: Dict[str, Any], match_method: str) -> Dict[str, Any]:
    """Build the wikidata_enrichment section from a match record.

    Always includes the entity id, label, description, fetch timestamp
    and match method. Coordinates, inception date, identifiers, ISIL and
    match confidence are added only when present in *wikidata*. Key
    insertion order is deliberate (it drives YAML output order).
    """
    section: Dict[str, Any] = {
        "wikidata_entity_id": wikidata["qid"],
        "wikidata_label": wikidata.get("name"),
        "wikidata_description": wikidata.get("description"),
        "fetch_timestamp": datetime.now(timezone.utc).isoformat(),
        "match_method": match_method,
    }
    has_coords = "latitude" in wikidata and "longitude" in wikidata
    if has_coords:
        section["wikidata_coordinates"] = {
            "latitude": wikidata["latitude"],
            "longitude": wikidata["longitude"],
        }
    if "founding_date" in wikidata:
        section["wikidata_inception"] = wikidata["founding_date"]
    if wikidata.get("identifiers"):
        section["wikidata_identifiers"] = wikidata["identifiers"]
    if "isil" in wikidata:
        section["wikidata_isil"] = wikidata["isil"]
    if "match_score" in wikidata:
        # Rounded for stable, readable YAML output.
        section["match_confidence"] = round(wikidata["match_score"], 3)
    return section
# =============================================================================
# Google Maps Functions
# =============================================================================
def search_google_place(
    query: str,
    client: httpx.Client,
    api_key: str,
    location_bias: Optional[Tuple[float, float]] = None,
) -> Optional[Dict[str, Any]]:
    """Search for a place using the Google Places Text Search API (v1).

    query         - free-text search string.
    client        - shared httpx client.
    api_key       - Places API key, sent via the X-Goog-Api-Key header.
    location_bias - optional (lat, lng); results are biased toward a
                    50 km circle around it.

    Returns the single top-ranked place dict, or None when nothing was
    found or the request failed. Errors are logged, never raised.
    """
    headers = {
        "Content-Type": "application/json",
        "X-Goog-Api-Key": api_key,
        # Restrict the response to exactly the fields we store.
        "X-Goog-FieldMask": ",".join([f"places.{f}" for f in GOOGLE_PLACE_FIELDS]),
    }
    body = {
        "textQuery": query,
        "languageCode": "nl",
        "regionCode": "NL",
        "maxResultCount": 1,
    }
    if location_bias:
        lat, lng = location_bias
        body["locationBias"] = {
            "circle": {
                "center": {"latitude": lat, "longitude": lng},
                "radius": 50000.0
            }
        }
    try:
        response = client.post(TEXT_SEARCH_URL, headers=headers, json=body)
        response.raise_for_status()
        data = response.json()
        places = data.get("places", [])
        return places[0] if places else None
    except httpx.HTTPStatusError as e:
        # Prefer the API's own error message for diagnostics when available.
        error_data = {}
        try:
            error_data = e.response.json()
        except Exception:
            pass
        error_msg = error_data.get("error", {}).get("message", str(e))
        logger.error(f"Google API error: {error_msg}")
        return None
    except Exception as e:
        logger.error(f"Error searching Google: {e}")
        return None
def create_google_maps_enrichment(place: Dict[str, Any]) -> Dict[str, Any]:
    """Build the google_maps_enrichment section from a Places API result.

    Always includes place_id, name, fetch timestamp and api_status;
    every other key is added only when the API returned a value. Key
    insertion order is deliberate (it drives YAML output order).
    """
    result: Dict[str, Any] = {
        "place_id": place.get("id", ""),
        "name": place.get("displayName", {}).get("text", ""),
        "fetch_timestamp": datetime.now(timezone.utc).isoformat(),
        "api_status": "OK",
    }
    loc = place.get("location", {})
    if loc.get("latitude") and loc.get("longitude"):
        result["coordinates"] = {
            "latitude": loc["latitude"],
            "longitude": loc["longitude"],
        }
    # Simple fields copied verbatim when truthy, in output order.
    for src, dest in (("formattedAddress", "formatted_address"),
                      ("shortFormattedAddress", "short_address")):
        if place.get(src):
            result[dest] = place[src]
    components = place.get("addressComponents")
    if components:
        # Rename the v1 camelCase keys to the legacy snake_case shape.
        result["address_components"] = [
            {
                "long_name": c.get("longText"),
                "short_name": c.get("shortText"),
                "types": c.get("types", []),
            }
            for c in components
        ]
    for src, dest in (("nationalPhoneNumber", "phone_local"),
                      ("internationalPhoneNumber", "phone_international"),
                      ("websiteUri", "website"),
                      ("types", "google_place_types"),
                      ("primaryType", "primary_type"),
                      ("businessStatus", "business_status")):
        if place.get(src):
            result[dest] = place[src]
    hours = place.get("regularOpeningHours")
    if hours:
        result["opening_hours"] = {
            "periods": hours.get("periods"),
            "weekday_text": hours.get("weekdayDescriptions"),
        }
    # Ratings use an explicit None check so a 0 value is still recorded.
    for src, dest in (("rating", "rating"),
                      ("userRatingCount", "total_ratings")):
        if place.get(src) is not None:
            result[dest] = place[src]
    if place.get("editorialSummary"):
        result["editorial_summary"] = place["editorialSummary"].get("text")
    if place.get("googleMapsUri"):
        result["google_maps_url"] = place["googleMapsUri"]
    return result
# =============================================================================
# Entry Processing
# =============================================================================
def get_entry_info(entry: Dict[str, Any]) -> Dict[str, Any]:
    """Extract the fields used for matching from an entry dict.

    Falls back across the original entry and any existing Museum
    Register / KB enrichment sections; every value defaults to "".
    """
    original = entry.get("original_entry", {})
    museum_reg = entry.get("museum_register_enrichment", {})
    kb = entry.get("kb_enrichment", {})

    def first_truthy(*values: Any) -> Any:
        # Mirrors `a or b or ""`: first truthy value, else "".
        for value in values:
            if value:
                return value
        return ""

    return {
        "name": first_truthy(
            original.get("organisatie"),
            museum_reg.get("museum_name"),
            kb.get("name"),
        ),
        "website": first_truthy(
            original.get("webadres_organisatie"),
            museum_reg.get("website_url"),
        ),
        "city": first_truthy(
            original.get("plaatsnaam_bezoekadres"),
            kb.get("city"),
        ),
        "province": first_truthy(
            original.get("provincie"),
            museum_reg.get("province"),
        ),
        "street": first_truthy(original.get("straat_en_huisnummer_bezoekadres")),
        "isil_na": first_truthy(original.get("isil-code_na")),
        "isil_kb": first_truthy(
            original.get("isil_code_kb"),
            kb.get("isil_code"),
        ),
        "type": first_truthy(original.get("type_organisatie")),
    }
def build_google_search_query(info: Dict[str, Any], institution_type: Optional[str]) -> str:
    """Compose a Places text-search query: name, street, city, country.

    When the institution-type hint is not already present in the name,
    "Bibliotheek" is prepended for libraries and "museum" is appended
    for museums to disambiguate the search.
    """
    name = info["name"]
    lowered = name.lower()
    if institution_type == "library" and "bibliotheek" not in lowered:
        name = f"Bibliotheek {name}"
    elif institution_type == "museum" and "museum" not in lowered:
        name = f"{name} museum"
    components = [name, info["street"], info["city"], "Netherlands"]
    # Drop empty components before joining.
    return ", ".join(part for part in components if part)
def get_entry_files(
    entries_dir: Path,
    profile: Dict[str, Any],
    entry_range: Optional[Tuple[int, int]] = None,
) -> List[Path]:
    """Get the list of entry files to process for a profile.

    Selection logic:
      * If the profile has a specific file_pattern (other than "*.yaml"),
        glob for it directly.
      * Otherwise scan all *.yaml files, skipping "_"-prefixed ones and
        files without a numeric "<number>_" prefix, and keep those whose
        number falls within the profile's entry_range — or the explicit
        *entry_range* argument when the profile defines none.
      * If the profile declares has_field, additionally load each file
        and keep only entries whose original_entry has a truthy value
        for that field.
    """
    yaml_files = []
    # Get pattern from profile or use entry range
    file_pattern = profile.get("file_pattern")
    # The profile's own range takes precedence over the caller-supplied one.
    profile_range = profile.get("entry_range") or entry_range
    has_field = profile.get("has_field")
    if file_pattern and file_pattern != "*.yaml":
        # Use specific file pattern
        yaml_files = sorted(entries_dir.glob(file_pattern))
    else:
        # Use entry range
        for f in sorted(entries_dir.glob("*.yaml")):
            if f.name.startswith("_"):
                continue
            # Entry files are expected to be named "<number>_<slug>.yaml".
            match = re.match(r'^(\d+)_', f.name)
            if not match:
                continue
            entry_num = int(match.group(1))
            if profile_range:
                start, end = profile_range
                # Inclusive range on both ends.
                if entry_num < start or entry_num > end:
                    continue
            yaml_files.append(f)
    # Filter by has_field if specified (requires loading each file)
    if has_field:
        filtered = []
        for f in yaml_files:
            try:
                with open(f, 'r', encoding='utf-8') as fh:
                    entry = yaml.safe_load(fh)
                    if entry and entry.get("original_entry", {}).get(has_field):
                        filtered.append(f)
            except Exception:
                # Unreadable/invalid YAML is silently excluded here; such
                # files surface as load errors during processing instead.
                pass
        yaml_files = filtered
    return yaml_files
def process_entries(
    entries_dir: Path,
    profile: Dict[str, Any],
    source: str,
    dry_run: bool = False,
    limit: Optional[int] = None,
    entry_range: Optional[Tuple[int, int]] = None,
    force: bool = False,
    google_api_key: Optional[str] = None,
) -> EnrichmentStats:
    """Run the enrichment pipeline over the selected entry files.

    entries_dir    - directory of per-entry YAML files.
    profile        - a PROFILES value controlling selection and queries.
    source         - "wikidata", "google_maps" or "both".
    dry_run        - when True, nothing is written back to disk.
    limit          - optional cap on the number of files processed.
    entry_range    - optional (start, end) passed through to get_entry_files.
    force          - re-enrich entries that already have enrichment data.
    google_api_key - required whenever source includes Google Maps.

    Modified entries are re-dumped to their YAML files in place (key
    order preserved, not sorted). Returns accumulated EnrichmentStats.
    """
    stats = EnrichmentStats()
    # Get files to process
    yaml_files = get_entry_files(entries_dir, profile, entry_range)
    stats.total_files = len(yaml_files)
    if limit:
        yaml_files = yaml_files[:limit]
    logger.info(f"Found {stats.total_files} entry files matching profile")
    logger.info(f"Processing {len(yaml_files)} files (limit: {limit or 'none'})")
    # Determine which enrichments to run
    do_wikidata = source in ("wikidata", "both")
    do_google = source in ("google_maps", "both")
    if do_google and not google_api_key:
        logger.error("GOOGLE_PLACES_TOKEN required for Google Maps enrichment")
        return stats
    # Phase 1: load entries, decide what each one needs, and gather ISIL
    # codes so Wikidata can be queried in one batch before processing.
    entries_data = []
    isil_codes = []
    for yaml_file in yaml_files:
        try:
            with open(yaml_file, 'r', encoding='utf-8') as f:
                entry = yaml.safe_load(f)
            if not entry:
                stats.skipped += 1
                continue
            # Check existing enrichment
            has_wikidata = bool(entry.get("wikidata_enrichment"))
            has_google = bool(entry.get("google_maps_enrichment"))
            if not force:
                # Skip when every requested enrichment is already present.
                if do_wikidata and has_wikidata and do_google and has_google:
                    stats.already_enriched += 1
                    continue
                if do_wikidata and not do_google and has_wikidata:
                    stats.already_enriched += 1
                    continue
                if do_google and not do_wikidata and has_google:
                    stats.already_enriched += 1
                    continue
            info = get_entry_info(entry)
            if not info["name"]:
                # Without a name there is nothing to match on.
                stats.skipped += 1
                continue
            # Collect ISIL codes for the batched Wikidata lookup
            if info["isil_na"]:
                isil_codes.append(info["isil_na"])
            if info["isil_kb"]:
                isil_codes.append(info["isil_kb"])
            entries_data.append({
                "file": yaml_file,
                "entry": entry,
                "info": info,
                "needs_wikidata": do_wikidata and (force or not has_wikidata),
                "needs_google": do_google and (force or not has_google),
            })
        except Exception as e:
            logger.error(f"Error loading {yaml_file.name}: {e}")
            stats.errors += 1
    if not entries_data:
        logger.info("No entries to process")
        return stats
    logger.info(f"Collected {len(entries_data)} entries for enrichment")
    # Phase 2: fetch reference data, then enrich entry by entry.
    wikidata_institutions = {}
    isil_results = {}
    with httpx.Client(timeout=120.0) as client:
        if do_wikidata:
            # Bulk query of all Dutch institutions of the profile's type
            query_type = profile.get("wikidata_query_type", "heritage")
            wikidata_institutions = query_wikidata_institutions(client, query_type)
            time.sleep(REQUEST_DELAY)
            # Also query by ISIL codes (deduplicated) for exact matches
            if isil_codes:
                logger.info(f"Querying Wikidata for {len(isil_codes)} ISIL codes...")
                isil_results = query_wikidata_by_isil(client, list(set(isil_codes)))
                logger.info(f"Found {len(isil_results)} by ISIL")
                time.sleep(REQUEST_DELAY)
        # Process each entry
        for entry_data in entries_data:
            yaml_file = entry_data["file"]
            entry = entry_data["entry"]
            info = entry_data["info"]
            modified = False
            logger.info(f"\nProcessing: {info['name'][:60]}")
            # Wikidata enrichment
            if entry_data["needs_wikidata"]:
                wikidata_match: Optional[Dict[str, Any]] = None
                match_method: str = "unknown"
                # Try ISIL match first — exact identifier, highest confidence
                for isil in [info["isil_na"], info["isil_kb"]]:
                    if isil and isil in isil_results:
                        wikidata_match = isil_results[isil]
                        match_method = "isil_code_match"
                        stats.isil_matches += 1
                        logger.info(f" -> ISIL match: {wikidata_match['name']} ({wikidata_match['qid']})")
                        break
                # Fall back to fuzzy name matching
                if not wikidata_match:
                    wikidata_match = find_wikidata_match(
                        info["name"], info["city"], info["province"],
                        wikidata_institutions, threshold=0.75
                    )
                    if wikidata_match:
                        match_method = "fuzzy_name_match"
                        stats.name_matches += 1
                        score = wikidata_match.get("match_score", 0)
                        logger.info(f" -> Name match: {wikidata_match['name']} ({wikidata_match['qid']}) [{score:.2f}]")
                if wikidata_match:
                    entry["wikidata_enrichment"] = create_wikidata_enrichment(wikidata_match, match_method)
                    modified = True
                else:
                    # Record the miss so future runs can skip re-searching.
                    entry["wikidata_enrichment_status"] = "NOT_FOUND"
                    entry["wikidata_search_timestamp"] = datetime.now(timezone.utc).isoformat()
                    # Count not_found only when Google won't also run for
                    # this entry, to avoid double-counting it below.
                    if entry_data["needs_wikidata"] and not entry_data["needs_google"]:
                        stats.not_found += 1
                    logger.info(" -> No Wikidata match")
            # Google Maps enrichment
            if entry_data["needs_google"]:
                # google_api_key is guaranteed non-None here (validated above)
                assert google_api_key is not None
                institution_type = profile.get("institution_type")
                query = build_google_search_query(info, institution_type)
                # Approximate center of the Netherlands, used as location bias.
                NL_CENTER = (52.1326, 5.2913)
                place = search_google_place(query, client, google_api_key, NL_CENTER)
                if place:
                    entry["google_maps_enrichment"] = create_google_maps_enrichment(place)
                    entry["google_maps_status"] = "SUCCESS"
                    entry["google_maps_search_query"] = query
                    modified = True
                    gm_name = place.get("displayName", {}).get("text", "")
                    rating = place.get("rating", "N/A")
                    logger.info(f" -> Google: {gm_name} ({rating}★)")
                else:
                    entry["google_maps_status"] = "NOT_FOUND"
                    entry["google_maps_search_query"] = query
                    entry["google_maps_search_timestamp"] = datetime.now(timezone.utc).isoformat()
                    stats.not_found += 1
                    logger.info(" -> No Google match")
                # Rate-limit Google requests.
                time.sleep(REQUEST_DELAY)
            # Save entry back to its YAML file
            if modified and not dry_run:
                try:
                    with open(yaml_file, 'w', encoding='utf-8') as f:
                        yaml.dump(entry, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
                except Exception as e:
                    logger.error(f"Error saving {yaml_file.name}: {e}")
                    stats.errors += 1
    return stats
# =============================================================================
# Main Entry Point
# =============================================================================
def main() -> int:
    """CLI entry point.

    Parses arguments, resolves the profile and optional entry range,
    runs process_entries(), logs a summary, and (outside dry runs)
    writes a timestamped stats JSON file next to the entries directory.
    Returns a process exit code (0 success, 1 configuration error).
    """
    parser = argparse.ArgumentParser(
        description="Unified NDE entry enrichment with Wikidata and Google Maps",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Enrich Museum Register entries with Wikidata
  %(prog)s --profile museum_register --source wikidata
  # Enrich KB libraries with Google Maps
  %(prog)s --profile kb_isil --source google_maps
  # Enrich custom range with both sources
  %(prog)s --start 1515 --end 1600 --source both
  # Dry run to see what would be done
  %(prog)s --profile museum_register --source both --dry-run
"""
    )
    parser.add_argument(
        "--profile",
        choices=list(PROFILES.keys()),
        default="all",
        help="Entry profile to process (default: all)"
    )
    parser.add_argument(
        "--source",
        choices=["wikidata", "google_maps", "both"],
        default="both",
        help="Enrichment source (default: both)"
    )
    parser.add_argument(
        "--start",
        type=int,
        help="Start entry number (overrides profile range)"
    )
    parser.add_argument(
        "--end",
        type=int,
        help="End entry number (overrides profile range)"
    )
    parser.add_argument(
        "--limit",
        type=int,
        help="Limit number of entries to process"
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Re-enrich entries that already have data"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Don't save changes, just show what would be done"
    )
    parser.add_argument(
        "--entries-dir",
        type=Path,
        default=Path(__file__).parent.parent / "data" / "nde" / "enriched" / "entries",
        help="Path to entries directory"
    )
    args = parser.parse_args()
    # Get profile (copied so range overrides never mutate PROFILES)
    profile = PROFILES[args.profile].copy()
    logger.info(f"Profile: {args.profile} - {profile['description']}")
    # Override range if --start/--end specified
    entry_range = None
    if args.start is not None or args.end is not None:
        # NOTE(review): `or` makes an explicit 0 fall back to the default.
        start = args.start or 0
        end = args.end or 99999
        entry_range = (start, end)
        logger.info(f"Entry range: {start} to {end}")
    if args.dry_run:
        logger.info("DRY RUN MODE - no changes will be saved")
    if not args.entries_dir.exists():
        logger.error(f"Entries directory not found: {args.entries_dir}")
        return 1
    # Get Google API key if needed
    google_api_key = None
    if args.source in ("google_maps", "both"):
        google_api_key = os.getenv("GOOGLE_PLACES_TOKEN", "")
        if not google_api_key:
            logger.error("GOOGLE_PLACES_TOKEN environment variable required for Google Maps enrichment")
            return 1
    # Process entries
    stats = process_entries(
        entries_dir=args.entries_dir,
        profile=profile,
        source=args.source,
        dry_run=args.dry_run,
        limit=args.limit,
        entry_range=entry_range,
        force=args.force,
        google_api_key=google_api_key,
    )
    # Print summary
    logger.info("\n" + "=" * 60)
    logger.info("ENRICHMENT COMPLETE")
    logger.info("=" * 60)
    logger.info(f"Total files: {stats.total_files}")
    logger.info(f"Already enriched: {stats.already_enriched}")
    logger.info(f"ISIL matches: {stats.isil_matches}")
    logger.info(f"Name matches: {stats.name_matches}")
    logger.info(f"Not found: {stats.not_found}")
    logger.info(f"Skipped: {stats.skipped}")
    logger.info(f"Errors: {stats.errors}")
    logger.info(f"Total enriched: {stats.total_enriched}")
    # Save run parameters + stats as JSON beside the entries directory
    if not args.dry_run:
        stats_file = args.entries_dir.parent / f"enrichment_stats_{args.profile}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(stats_file, 'w') as f:
            json.dump({
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "profile": args.profile,
                "source": args.source,
                "dry_run": args.dry_run,
                "limit": args.limit,
                "entry_range": list(entry_range) if entry_range else None,
                **stats.to_dict()
            }, f, indent=2)
        logger.info(f"Stats saved to: {stats_file}")
    return 0


if __name__ == "__main__":
    sys.exit(main())