glam/scripts/reenrich_wikidata_with_verification.py
kempersc 486bbee813 feat(wikidata): add re-enrichment and duplicate removal scripts
- Add reenrich_wikidata_with_verification.py for re-running enrichment
- Add remove_wikidata_duplicates.py for deduplication
2025-12-08 14:59:38 +01:00

958 lines
36 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Re-enrich heritage institutions with Wikidata using GLM-4.6 CH Annotator verification.
This script:
1. Finds files marked for re-enrichment (after duplicate cleanup)
2. Queries Wikidata API for candidates by institution name
3. Uses GLM-4.6 to verify matches based on CH Annotator entity types (GRP.HER)
4. Only adds Wikidata enrichment if entity is verified as heritage institution
5. Updates files with verified Wikidata data
CH Annotator Convention (v1.7.0):
- Heritage institutions are type GRP.HER (glam:HeritageCustodian)
- Maps to: org:FormalOrganization, rov:RegisteredOrganization, schema:Museum, schema:Library, schema:ArchiveOrganization
- Subtypes: GRP.HER.GAL (Gallery), GRP.HER.LIB (Library), GRP.HER.ARC (Archive), GRP.HER.MUS (Museum)
Wikidata "instance of" (P31) values for heritage institutions:
- Q33506 (museum)
- Q7075 (library)
- Q166118 (archive)
- Q1007870 (art gallery)
- Q207694 (art museum)
- Q1970365 (natural history museum)
- Q18388277 (history museum)
- Q23413 (castle) - when used as museum
- Q839954 (archaeological site)
- Q174782 (town square) - NOT heritage institution
- Q515 (city) - NOT heritage institution
"""
import asyncio
import json
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import yaml
import httpx
import logging

# Load environment variables from .env file
# (the .env is expected one directory above this script, i.e. the project root)
from dotenv import load_dotenv
env_path = Path(__file__).parent.parent / ".env"
load_dotenv(env_path)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Add src to path so sibling project modules are importable when run as a script
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
# =============================================================================
# WIKIDATA HERITAGE INSTITUTION TYPE CLASSES (P31 values)
# =============================================================================
# Valid Wikidata "instance of" (P31) values for heritage institutions.
# NOTE: this map must stay disjoint from NON_HERITAGE_P31_TYPES — the
# verifier's quick accept/reject heuristic intersects an entity's P31 values
# with both maps, and any QID present in both makes the entity permanently
# ambiguous.
HERITAGE_P31_TYPES = {
    # Museums
    "Q33506": "museum",
    "Q207694": "art museum",
    "Q1970365": "natural history museum",
    "Q18388277": "history museum",
    "Q2087181": "university museum",
    "Q17431399": "national museum",
    "Q16735822": "museum building",
    "Q1788742": "war museum",
    "Q4989906": "monuments and memorials",
    "Q57660343": "maritime museum",
    "Q15206070": "transport museum",
    "Q214090": "ethnographic museum",
    "Q2522387": "aviation museum",
    "Q841573": "archaeological museum",
    "Q28737012": "memorial museum",
    "Q588140": "railway museum",
    "Q515034": "science museum",
    "Q4287745": "local museum",
    # Galleries (listed once; the original repeated these two keys in a
    # second "Galleries" section — duplicate dict keys silently collapse)
    "Q1007870": "art gallery",
    "Q7889618": "gallery of art",
    # Libraries
    "Q7075": "library",
    "Q856234": "national library",
    "Q1078570": "academic library",
    "Q11294": "public library",
    "Q13226383": "research library",
    # Archives
    "Q166118": "archive",
    "Q473972": "national archives",
    "Q1423895": "film archive",
    "Q2066131": "regional archive",
    "Q63400100": "historical archive",
    "Q63400127": "municipal archive",
    "Q1026954": "photo archive",
    # Research centers
    "Q31855": "research institute",
    "Q327333": "heritage organisation",
    # Botanical/Zoo
    # (Q43229 was wrongly listed here as "botanical garden"; it is the generic
    # Wikidata "organization" class and also appeared in NON_HERITAGE_P31_TYPES,
    # which made every Q43229 entity ambiguous. It now lives only in the
    # non-heritage map below.)
    "Q45585": "botanical garden",
    "Q43501": "zoo",
    # Holy sites (when managing heritage collections)
    "Q317557": "monastery",
    "Q83405": "abbey",
    "Q1088552": "cathedral chapter",
    # Educational (with collections)
    "Q3918": "university",
    "Q875538": "public university",
}

# These P31 values indicate NOT a heritage institution
NON_HERITAGE_P31_TYPES = {
    "Q515": "city",
    "Q174782": "square",
    "Q5": "human",
    "Q4830453": "business",
    "Q891723": "public company",
    "Q783794": "company",
    "Q6881511": "enterprise",
    "Q43229": "organization",  # Too generic to count as a heritage institution
    "Q55678": "movie",
    "Q7366": "song",
    "Q5398426": "television series",
}
# =============================================================================
# WIKIDATA API CLIENT
# =============================================================================
class WikidataSearchClient:
    """Async client for the Wikidata search and entity-data APIs."""

    SEARCH_URL = "https://www.wikidata.org/w/api.php"
    ENTITY_URL = "https://www.wikidata.org/wiki/Special:EntityData/{qid}.json"

    def __init__(self, contact_email: Optional[str] = None):
        # Wikimedia API etiquette asks for a descriptive User-Agent that
        # includes a way to contact the operator.
        self.contact_email = contact_email or os.environ.get("WIKIMEDIA_CONTACT_EMAIL", "glam@example.org")
        self.client = httpx.AsyncClient(
            timeout=30.0,
            headers={
                "User-Agent": f"GLAMBot/1.0 ({self.contact_email})",
            }
        )

    @staticmethod
    def _extract_entityid_values(claims: Dict[str, Any], prop: str) -> List[str]:
        """Return the QIDs of all wikibase-entityid value snaks for *prop*.

        Skips novalue/somevalue snaks and non-entity datavalues. Shared by
        the P31 / P131 / P17 extraction in get_entity_claims (the original
        repeated this loop three times).
        """
        qids: List[str] = []
        for claim in claims.get(prop, []):
            mainsnak = claim.get("mainsnak", {})
            if mainsnak.get("snaktype") != "value":
                continue
            datavalue = mainsnak.get("datavalue", {})
            if datavalue.get("type") != "wikibase-entityid":
                continue
            qid = datavalue.get("value", {}).get("id")
            if qid:
                qids.append(qid)
        return qids

    async def search_entity(self, name: str, language: str = "en", limit: int = 5) -> List[Dict[str, Any]]:
        """
        Search Wikidata for entities matching a name.

        Returns a list of candidate dicts with qid, label, description and
        url; returns [] on any API error (logged).
        """
        params = {
            "action": "wbsearchentities",
            "format": "json",
            "language": language,
            "type": "item",
            "limit": limit,
            "search": name,
        }
        try:
            response = await self.client.get(self.SEARCH_URL, params=params)
            response.raise_for_status()
            data = response.json()
            return [
                {
                    "qid": item.get("id"),
                    "label": item.get("label"),
                    "description": item.get("description", ""),
                    "url": item.get("concepturi"),
                }
                for item in data.get("search", [])
            ]
        except Exception as e:
            logger.error(f"Wikidata search error for '{name}': {e}")
            return []

    async def get_entity_claims(self, qid: str) -> Dict[str, Any]:
        """
        Fetch an entity's data and distill the claims this script cares about.

        Returns a dict with labels, descriptions, P31 (instance of), P131
        (located in), P17 country (first value only) and P625 coordinates;
        returns {} on any API error (logged).
        """
        url = self.ENTITY_URL.format(qid=qid)
        try:
            response = await self.client.get(url)
            response.raise_for_status()
            data = response.json()
            entity = data.get("entities", {}).get(qid, {})
            claims = entity.get("claims", {})
            labels = entity.get("labels", {})
            descriptions = entity.get("descriptions", {})
            # Entity-valued properties share one extraction path.
            p31_values = self._extract_entityid_values(claims, "P31")
            p131_values = self._extract_entityid_values(claims, "P131")
            p17_values = self._extract_entityid_values(claims, "P17")
            # P625 (coordinates): first globecoordinate value snak wins.
            coordinates = None
            for claim in claims.get("P625", []):
                mainsnak = claim.get("mainsnak", {})
                if mainsnak.get("snaktype") == "value":
                    datavalue = mainsnak.get("datavalue", {})
                    if datavalue.get("type") == "globecoordinate":
                        value = datavalue.get("value", {})
                        coordinates = {
                            "latitude": value.get("latitude"),
                            "longitude": value.get("longitude"),
                        }
                        break
            return {
                "qid": qid,
                "labels": {k: v.get("value") for k, v in labels.items()},
                "descriptions": {k: v.get("value") for k, v in descriptions.items()},
                "p31": p31_values,
                "p131": p131_values,
                "p17_country": p17_values[0] if p17_values else None,
                "coordinates": coordinates,
            }
        except Exception as e:
            logger.error(f"Wikidata entity fetch error for {qid}: {e}")
            return {}

    async def close(self):
        """Release the underlying HTTP client."""
        await self.client.aclose()
# =============================================================================
# GLM-4.6 CH ANNOTATOR VERIFICATION
# =============================================================================
class GLMHeritageVerifier:
    """
    Verify Wikidata entity matches using GLM-4.6 CH Annotator.

    Uses CH Annotator v1.7.0 entity type GRP.HER to verify that
    a Wikidata entity is actually a heritage institution. A cheap
    P31-based heuristic answers clear-cut cases; only ambiguous
    entities are sent to the LLM.
    """

    # Z.AI Coding Plan endpoint (different from regular BigModel API)
    ZAI_API_URL = "https://api.z.ai/api/coding/paas/v4/chat/completions"

    VERIFICATION_PROMPT = """You are a heritage institution classifier following CH-Annotator v1.7.0 convention.
Your task is to determine if a Wikidata entity is a heritage institution (type GRP.HER).
## CH-Annotator GRP.HER Definition
Heritage institutions are organizations that:
- Collect, preserve, and provide access to cultural heritage materials
- Include: museums (GRP.HER.MUS), libraries (GRP.HER.LIB), archives (GRP.HER.ARC), galleries (GRP.HER.GAL)
- May also include: research centers, botanical gardens, educational institutions WITH collections
## Entity Types That Are NOT Heritage Institutions
- Cities, towns, municipalities (these are places, not institutions)
- General businesses or companies (unless they manage heritage collections)
- People (individuals are AGT.PER, not GRP.HER)
- Events, festivals, exhibitions (temporary, not institutions)
- Buildings without institutional function (just architecture)
## Your Task
Analyze the Wikidata entity data and determine:
1. Is this entity a heritage institution (GRP.HER)?
2. If yes, what subtype? (MUS/LIB/ARC/GAL/OTHER)
3. Confidence score (0.0-1.0)
Respond in JSON format:
```json
{{
"is_heritage_institution": true/false,
"subtype": "MUS|LIB|ARC|GAL|RES|BOT|EDU|OTHER|null",
"confidence": 0.95,
"reasoning": "Brief explanation"
}}
```
## Entity to Analyze
Institution name from our data: {institution_name}
Location from our data: {institution_location}
Wikidata entity:
- QID: {qid}
- Label: {wd_label}
- Description: {wd_description}
- Instance of (P31): {p31_types}
- Located in (P131): {p131_location}
"""

    def __init__(self, api_key: Optional[str] = None, model: str = "glm-4.6", use_claude: bool = False):
        """Configure the HTTP client for either the Z.AI (GLM) or Anthropic (Claude) API.

        Raises ValueError when the relevant API key is missing from the environment.
        """
        self.use_claude = use_claude
        if use_claude:
            self.api_key = api_key or os.environ.get("CLAUDE_API_KEY")
            self.model = "claude-3-5-haiku-20241022"  # Fast, cheap model
            self.api_url = "https://api.anthropic.com/v1/messages"
            if not self.api_key:
                raise ValueError("CLAUDE_API_KEY not found in environment")
            self.client = httpx.AsyncClient(
                timeout=60.0,
                headers={
                    "x-api-key": self.api_key,
                    "anthropic-version": "2023-06-01",
                    "Content-Type": "application/json",
                }
            )
        else:
            self.api_key = api_key or os.environ.get("ZAI_API_TOKEN")
            self.model = model
            # Use Z.AI Coding Plan endpoint (same as OpenCode); reuse the class
            # constant instead of repeating the URL literal.
            self.api_url = self.ZAI_API_URL
            if not self.api_key:
                raise ValueError("ZAI_API_TOKEN not found in environment")
            self.client = httpx.AsyncClient(
                timeout=60.0,
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json",
                }
            )

    async def verify_heritage_institution(
        self,
        institution_name: str,
        institution_location: str,
        qid: str,
        wd_label: str,
        wd_description: str,
        p31_types: List[str],
        p131_location: List[str],
    ) -> Dict[str, Any]:
        """
        Verify if a Wikidata entity matches a heritage institution.

        Tries the P31 heuristic first (no LLM cost) and only calls the LLM
        for ambiguous cases. Returns a dict with is_heritage_institution,
        subtype, confidence, reasoning and verification_method.
        """
        p31_set = set(p31_types)
        heritage_matches = p31_set & set(HERITAGE_P31_TYPES.keys())
        non_heritage_matches = p31_set & set(NON_HERITAGE_P31_TYPES.keys())
        # If clear non-heritage type, reject without LLM call
        if non_heritage_matches and not heritage_matches:
            logger.debug(f"Quick reject {qid}: P31 indicates non-heritage ({non_heritage_matches})")
            return {
                "is_heritage_institution": False,
                "subtype": None,
                "confidence": 0.95,
                "reasoning": f"P31 types indicate non-heritage: {[NON_HERITAGE_P31_TYPES.get(t, t) for t in non_heritage_matches]}",
                "verification_method": "p31_heuristic",
            }
        # If clear heritage type, high confidence without LLM
        if heritage_matches and not non_heritage_matches:
            subtype = self._infer_subtype_from_p31(p31_types)
            logger.debug(f"Quick accept {qid}: P31 indicates heritage ({heritage_matches})")
            return {
                "is_heritage_institution": True,
                "subtype": subtype,
                "confidence": 0.9,
                "reasoning": f"P31 types indicate heritage: {[HERITAGE_P31_TYPES.get(t, t) for t in heritage_matches]}",
                "verification_method": "p31_heuristic",
            }
        # Ambiguous case - build the prompt and ask the LLM
        p31_labels = [HERITAGE_P31_TYPES.get(t, NON_HERITAGE_P31_TYPES.get(t, t)) for t in p31_types]
        prompt = self.VERIFICATION_PROMPT.format(
            institution_name=institution_name,
            institution_location=institution_location,
            qid=qid,
            wd_label=wd_label,
            wd_description=wd_description,
            p31_types=", ".join(p31_labels) if p31_labels else "None specified",
            p131_location=", ".join(p131_location) if p131_location else "Not specified",
        )
        try:
            content, verification_method = await self._call_llm(prompt, qid)
            return self._parse_verification_response(content, qid, verification_method)
        except Exception as e:
            logger.error(f"LLM verification error for {qid}: {e}")
            return {
                "is_heritage_institution": False,
                "subtype": None,
                "confidence": 0.0,
                "reasoning": f"API error: {e}",
                "verification_method": "llm_api_error",
            }

    async def _call_llm(self, prompt: str, qid: str) -> Tuple[str, str]:
        """POST *prompt* to the configured LLM API; return (raw content, method tag)."""
        if self.use_claude:
            # Claude API request format (system prompt is a top-level field)
            response = await self.client.post(
                self.api_url,
                json={
                    "model": self.model,
                    "max_tokens": 512,
                    "messages": [
                        {"role": "user", "content": prompt},
                    ],
                    "system": "You are a heritage institution classifier. Respond only in valid JSON. Start your response with { and end with }.",
                }
            )
            response.raise_for_status()
            data = response.json()
            content = data.get("content", [{}])[0].get("text", "")
            logger.debug(f"Claude raw response for {qid}: {content[:300]}")
            return content, "claude_ch_annotator"
        # GLM/Z.AI API request format (OpenAI-style chat completions)
        response = await self.client.post(
            self.api_url,
            json={
                "model": self.model,
                "messages": [
                    {"role": "system", "content": "You are a heritage institution classifier. Respond only in valid JSON."},
                    {"role": "user", "content": prompt},
                ],
                "temperature": 0.1,
                "max_tokens": 512,
            }
        )
        response.raise_for_status()
        data = response.json()
        content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
        return content, "glm_4.6_ch_annotator"

    @staticmethod
    def _extract_json_payload(content: str) -> str:
        """Best-effort extraction of a JSON object from an LLM reply.

        Strips markdown code fences, then slices out the first balanced
        {...} object; as a last resort wraps a bare field list in braces.
        """
        if "```json" in content:
            content = content.split("```json")[1].split("```")[0]
        elif "```" in content:
            content = content.split("```")[1].split("```")[0]
        content = content.strip()
        if not content.startswith("{"):
            start_idx = content.find("{")
            if start_idx != -1:
                # Find the matching closing brace for the first "{"
                brace_count = 0
                end_idx = start_idx
                for i, char in enumerate(content[start_idx:], start_idx):
                    if char == "{":
                        brace_count += 1
                    elif char == "}":
                        brace_count -= 1
                        if brace_count == 0:
                            end_idx = i
                            break
                content = content[start_idx:end_idx + 1]
            elif '"is_heritage_institution"' in content:
                # No "{" at all - wrap what looks like a JSON body in braces
                content = "{" + content.rstrip().rstrip(",") + "}"
        return content

    @staticmethod
    def _regex_fallback_result(content: str) -> Optional[Dict[str, Any]]:
        """Scrape verification fields out of a non-JSON reply with regexes.

        Returns None when the is_heritage_institution verdict cannot be
        determined at all.
        """
        is_heritage = None
        # Prefer a field-targeted match; the original scanned the whole text
        # for 'true'/'false', which a reasoning sentence could trip up.
        verdict = re.search(r'"is_heritage_institution"\s*:\s*(true|false)', content, re.IGNORECASE)
        if verdict:
            is_heritage = verdict.group(1).lower() == "true"
        elif '"is_heritage_institution"' in content:
            lowered = content.lower()
            if 'true' in lowered:
                is_heritage = True
            elif 'false' in lowered:
                is_heritage = False
        if is_heritage is None:
            return None
        subtype = None
        subtype_match = re.search(r'"subtype"\s*:\s*"([^"]+)"', content)
        if subtype_match:
            subtype = subtype_match.group(1)
        confidence = 0.5
        conf_match = re.search(r'"confidence"\s*:\s*([\d.]+)', content)
        if conf_match:
            try:
                confidence = float(conf_match.group(1))
            except ValueError:
                pass
        reasoning = "Parsed from non-JSON response"
        reason_match = re.search(r'"reasoning"\s*:\s*"([^"]+)"', content)
        if reason_match:
            reasoning = reason_match.group(1)
        return {
            "is_heritage_institution": is_heritage,
            "subtype": subtype,
            "confidence": confidence,
            "reasoning": reasoning,
        }

    def _parse_verification_response(self, content: str, qid: str, verification_method: str) -> Dict[str, Any]:
        """Turn a raw LLM reply into a verification dict (JSON, then regex fallback)."""
        payload = self._extract_json_payload(content)
        try:
            result = json.loads(payload)
            result["verification_method"] = verification_method
            return result
        except json.JSONDecodeError as e:
            logger.debug(f"JSON parse failed for {qid}, trying regex fallback: {payload[:200]}")
            fallback = self._regex_fallback_result(payload)
            if fallback is not None:
                fallback["verification_method"] = f"{verification_method}_regex_fallback"
                return fallback
            logger.warning(f"Failed to parse LLM response for {qid}: {str(e)[:100]} - content: {payload[:200]}")
            return {
                "is_heritage_institution": False,
                "subtype": None,
                "confidence": 0.0,
                "reasoning": f"Failed to parse LLM response: {str(e)}",
                "verification_method": f"{verification_method}_parse_error",
            }

    def _infer_subtype_from_p31(self, p31_types: List[str]) -> str:
        """Infer heritage institution subtype from P31 values (first category wins)."""
        p31_set = set(p31_types)
        # Museum types
        museum_types = {"Q33506", "Q207694", "Q1970365", "Q18388277", "Q2087181", "Q17431399",
                        "Q1788742", "Q57660343", "Q15206070", "Q214090", "Q2522387",
                        "Q841573", "Q28737012", "Q588140", "Q515034", "Q4287745"}
        if p31_set & museum_types:
            return "MUS"
        # Library types
        library_types = {"Q7075", "Q856234", "Q1078570", "Q11294", "Q13226383"}
        if p31_set & library_types:
            return "LIB"
        # Archive types
        archive_types = {"Q166118", "Q473972", "Q1423895", "Q2066131", "Q63400100", "Q63400127", "Q1026954"}
        if p31_set & archive_types:
            return "ARC"
        # Gallery types
        gallery_types = {"Q1007870", "Q7889618"}
        if p31_set & gallery_types:
            return "GAL"
        # Research centers
        if "Q31855" in p31_set or "Q327333" in p31_set:
            return "RES"
        # Botanical/Zoo
        if "Q43229" in p31_set or "Q45585" in p31_set or "Q43501" in p31_set:
            return "BOT"
        # Educational
        if "Q3918" in p31_set or "Q875538" in p31_set:
            return "EDU"
        return "OTHER"

    async def close(self):
        """Release the underlying HTTP client."""
        await self.client.aclose()
# =============================================================================
# MAIN ENRICHMENT LOGIC
# =============================================================================
async def find_files_needing_reenrichment(custodian_dir: Path) -> List[Path]:
    """Collect every YAML file in *custodian_dir* whose text carries the
    'Re-enrichment required' marker; unreadable files are logged and skipped."""
    marked: List[Path] = []
    for candidate in custodian_dir.glob("*.yaml"):
        try:
            text = candidate.read_text(encoding='utf-8')
        except Exception as exc:
            logger.warning(f"Error reading {candidate}: {exc}")
            continue
        if "Re-enrichment required" in text:
            marked.append(candidate)
    return marked
def _clear_reenrichment_note(entry: Dict[str, Any]) -> None:
    """Strip the 'Re-enrichment required' marker (and anything after it) from
    provenance notes, if present. (The original repeated this in three places.)"""
    if "provenance" in entry and "notes" in entry["provenance"]:
        notes = entry["provenance"]["notes"]
        if "Re-enrichment required" in notes:
            entry["provenance"]["notes"] = notes.split("Re-enrichment required")[0].strip()


def _write_entry(file_path: Path, entry: Dict[str, Any]) -> None:
    """Persist *entry* back to its YAML file, keeping key order and unicode."""
    with open(file_path, 'w', encoding='utf-8') as f:
        yaml.dump(entry, f, allow_unicode=True, default_flow_style=False, sort_keys=False)


def _institution_name(entry: Dict[str, Any]) -> Optional[str]:
    """Best-effort institution name: custodian claim, then Google Maps data,
    then the original source row; None when nothing is found."""
    name = None
    if "custodian_name" in entry and isinstance(entry["custodian_name"], dict):
        name = entry["custodian_name"].get("claim_value")
    if not name and "google_maps_enrichment" in entry:
        name = entry["google_maps_enrichment"].get("name")
    if not name and "original_entry" in entry:
        name = entry["original_entry"].get("organisatie") or entry["original_entry"].get("instelling")
    return name


def _institution_location(entry: Dict[str, Any]) -> str:
    """Location string for LLM verification, preferring Google Maps data over
    the original source row; "" when neither is available."""
    location = ""
    if "google_maps_enrichment" in entry:
        gm = entry["google_maps_enrichment"]
        parts = []
        if gm.get("short_address"):
            parts.append(gm["short_address"])
        elif gm.get("formatted_address"):
            parts.append(gm["formatted_address"])
        location = ", ".join(parts)
    elif "original_entry" in entry:
        oe = entry["original_entry"]
        parts = []
        if oe.get("plaatsnaam_bezoekadres"):
            parts.append(oe["plaatsnaam_bezoekadres"])
        if oe.get("provincie"):
            parts.append(oe["provincie"])
        location = ", ".join(parts)
    return location


def _search_languages(entry: Dict[str, Any]) -> List[str]:
    """Wikidata search languages derived from the GHCID country prefix.

    English is always included; local languages come first so local labels
    are found. Defaults to NL (the dominant dataset) when no GHCID exists.
    """
    country_code = "NL"  # Default
    if "ghcid" in entry:
        ghcid = entry["ghcid"].get("ghcid_current", "")
        if ghcid and len(ghcid) >= 2:
            country_code = ghcid[:2]
    if country_code == "NL":
        return ["nl", "en"]
    if country_code == "BE":
        return ["nl", "fr", "en"]
    if country_code == "DE":
        return ["de", "en"]
    if country_code == "FR":
        return ["fr", "en"]
    if country_code in ["BR", "PT"]:
        return ["pt", "en"]
    if country_code in ["ES", "MX", "AR", "CL", "CO"]:
        return ["es", "en"]
    return ["en"]


async def enrich_file_with_wikidata(
    file_path: Path,
    wd_client: WikidataSearchClient,
    verifier: GLMHeritageVerifier,
) -> Dict[str, Any]:
    """
    Enrich a single file with verified Wikidata data.

    Searches Wikidata by institution name, verifies candidates with the
    P31 heuristic / LLM, and writes the best verified match (confidence
    >= 0.5) back into the YAML file. Returns a result dict whose "status"
    is one of: empty_file, no_name, not_found, no_verified_match,
    verified_match.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        entry = yaml.safe_load(f)
    if not entry:
        return {"status": "empty_file", "file": str(file_path)}
    name = _institution_name(entry)
    if not name:
        return {"status": "no_name", "file": str(file_path)}
    location = _institution_location(entry)
    search_langs = _search_languages(entry)
    # Search Wikidata in each language and merge the candidate lists
    all_candidates: List[Dict[str, Any]] = []
    for lang in search_langs:
        all_candidates.extend(await wd_client.search_entity(name, language=lang, limit=5))
        await asyncio.sleep(0.2)  # Rate limiting
    # Deduplicate by QID, keeping first-seen order
    seen_qids = set()
    unique_candidates = []
    for c in all_candidates:
        if c["qid"] not in seen_qids:
            seen_qids.add(c["qid"])
            unique_candidates.append(c)
    if not unique_candidates:
        # Update file to mark as not found
        entry["wikidata_enrichment_status"] = "NOT_FOUND"
        entry["wikidata_search_timestamp"] = datetime.now(timezone.utc).isoformat()
        _clear_reenrichment_note(entry)
        _write_entry(file_path, entry)
        return {"status": "not_found", "file": str(file_path), "name": name}
    # Verify each candidate; keep the highest-confidence verified match
    best_match = None
    best_confidence = 0.0
    for candidate in unique_candidates[:5]:  # Limit to top 5
        qid = candidate["qid"]
        entity_data = await wd_client.get_entity_claims(qid)
        await asyncio.sleep(0.2)
        if not entity_data:
            continue
        verification = await verifier.verify_heritage_institution(
            institution_name=name,
            institution_location=location,
            qid=qid,
            wd_label=candidate.get("label", ""),
            wd_description=candidate.get("description", ""),
            p31_types=entity_data.get("p31", []),
            p131_location=[str(x) for x in entity_data.get("p131", [])],
        )
        if verification.get("is_heritage_institution") and verification.get("confidence", 0) > best_confidence:
            best_match = {
                "qid": qid,
                "label": candidate.get("label"),
                "description": candidate.get("description"),
                "entity_data": entity_data,
                "verification": verification,
            }
            best_confidence = verification.get("confidence", 0)
    if not best_match or best_confidence < 0.5:
        # No verified match found
        entry["wikidata_enrichment_status"] = "NO_VERIFIED_MATCH"
        entry["wikidata_search_timestamp"] = datetime.now(timezone.utc).isoformat()
        entry["wikidata_candidates_checked"] = len(unique_candidates)
        _clear_reenrichment_note(entry)
        _write_entry(file_path, entry)
        return {"status": "no_verified_match", "file": str(file_path), "name": name, "candidates": len(unique_candidates)}
    # Add verified Wikidata enrichment
    qid = best_match["qid"]
    entity_data = best_match["entity_data"]
    verification = best_match["verification"]
    entry["wikidata_enrichment"] = {
        "wikidata_id": qid,
        "wikidata_url": f"https://www.wikidata.org/wiki/{qid}",
        "wikidata_label": best_match.get("label"),
        "wikidata_description": best_match.get("description"),
        "labels": entity_data.get("labels", {}),
        "descriptions": entity_data.get("descriptions", {}),
        "instance_of": entity_data.get("p31", []),
        "located_in": entity_data.get("p131", []),
        "country": entity_data.get("p17_country"),
        "coordinates": entity_data.get("coordinates"),
        "enrichment_timestamp": datetime.now(timezone.utc).isoformat(),
        "verification": {
            "method": verification.get("verification_method"),
            "confidence": verification.get("confidence"),
            "subtype": verification.get("subtype"),
            "reasoning": verification.get("reasoning"),
            "ch_annotator_version": "v1.7.0",
        },
    }
    entry["wikidata_enrichment_status"] = "VERIFIED"
    entry["wikidata_search_timestamp"] = datetime.now(timezone.utc).isoformat()
    # Add Wikidata ID to identifiers (once)
    if "identifiers" not in entry:
        entry["identifiers"] = []
    existing_schemes = {i.get("identifier_scheme") for i in entry["identifiers"] if isinstance(i, dict)}
    if "Wikidata" not in existing_schemes:
        entry["identifiers"].append({
            "identifier_scheme": "Wikidata",
            "identifier_value": qid,
            "identifier_url": f"https://www.wikidata.org/wiki/{qid}",
        })
    _clear_reenrichment_note(entry)
    _write_entry(file_path, entry)
    logger.info(f"✓ Enriched {file_path.name} with {qid} ({best_match.get('label')}) - confidence: {best_confidence:.2f}")
    return {
        "status": "verified_match",
        "file": str(file_path),
        "name": name,
        "qid": qid,
        "label": best_match.get("label"),
        "confidence": best_confidence,
        "subtype": verification.get("subtype"),
    }
async def main():
    """CLI entry point: find marked files, enrich them, and write a YAML report."""
    import argparse
    parser = argparse.ArgumentParser(description="Re-enrich Wikidata with LLM verification")
    parser.add_argument("--limit", type=int, default=100, help="Max files to process")
    parser.add_argument("--dry-run", action="store_true", help="Don't modify files")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--use-claude", action="store_true", help="Use Claude instead of GLM-4.6")
    # These paths were previously hard-coded; they are now overridable, with
    # the old locations kept as defaults for backward compatibility.
    parser.add_argument("--custodian-dir", type=Path,
                        default=Path("/Users/kempersc/apps/glam/data/custodian"),
                        help="Directory containing custodian YAML files")
    parser.add_argument("--reports-dir", type=Path,
                        default=Path("/Users/kempersc/apps/glam/reports"),
                        help="Directory where the results report is written")
    args = parser.parse_args()
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    custodian_dir = args.custodian_dir
    llm_name = "Claude" if args.use_claude else "GLM-4.6"
    print("=" * 60)
    print(f"Wikidata Re-enrichment with {llm_name} CH Annotator Verification")
    print("=" * 60)
    print()
    # Find files needing re-enrichment
    print("Finding files needing re-enrichment...")
    files = await find_files_needing_reenrichment(custodian_dir)
    print(f"Found {len(files)} files needing re-enrichment")
    if not files:
        print("No files to process!")
        return
    # Limit files
    files = files[:args.limit]
    print(f"Processing {len(files)} files (limit: {args.limit})")
    print()
    if args.dry_run:
        print("DRY RUN - no files will be modified")
        for f in files[:20]:
            print(f" Would process: {f.name}")
        return
    # Initialize clients
    wd_client = WikidataSearchClient()
    verifier = GLMHeritageVerifier(use_claude=args.use_claude)
    # Process files, bucketing results by status
    results = {
        "verified_match": [],
        "no_verified_match": [],
        "not_found": [],
        "no_name": [],
        "error": [],
    }
    try:
        for i, file_path in enumerate(files, 1):
            print(f"\n[{i}/{len(files)}] Processing {file_path.name}...")
            try:
                result = await enrich_file_with_wikidata(file_path, wd_client, verifier)
                status = result.get("status", "error")
                results.setdefault(status, []).append(result)
                if status == "verified_match":
                    # Prefix matches the other status lines (the original omitted it)
                    print(f" ✓ {result.get('qid')} ({result.get('label')}) - {result.get('confidence', 0):.2f}")
                elif status == "no_verified_match":
                    print(f" ✗ No verified match (checked {result.get('candidates', 0)} candidates)")
                elif status == "not_found":
                    print(f" ✗ No Wikidata candidates found")
                elif status == "no_name":
                    print(f" ⚠ No institution name found")
            except Exception as e:
                logger.error(f"Error processing {file_path}: {e}")
                results["error"].append({"file": str(file_path), "error": str(e)})
            # Rate limiting between files
            await asyncio.sleep(0.5)
    finally:
        # Always release HTTP clients, even on interrupt/error
        await wd_client.close()
        await verifier.close()
    # Print summary
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"Verified matches: {len(results.get('verified_match', []))}")
    print(f"No verified match: {len(results.get('no_verified_match', []))}")
    print(f"Not found: {len(results.get('not_found', []))}")
    print(f"No name: {len(results.get('no_name', []))}")
    print(f"Errors: {len(results.get('error', []))}")
    print()
    # Save results report
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    results_file = args.reports_dir / f"wikidata_reenrichment_{timestamp}.yaml"
    results_file.parent.mkdir(parents=True, exist_ok=True)
    with open(results_file, 'w', encoding='utf-8') as f:
        yaml.dump({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "files_processed": len(files),
            "results": results,
        }, f, allow_unicode=True, default_flow_style=False)
    print(f"Results saved to: {results_file}")


if __name__ == "__main__":
    asyncio.run(main())