# Source note: Add reenrich_wikidata_with_verification.py for re-running
# enrichment; add remove_wikidata_duplicates.py for deduplication.
#!/usr/bin/env python3
"""
Re-enrich heritage institutions with Wikidata using GLM-4.6 CH Annotator verification.

This script:
1. Finds files marked for re-enrichment (after duplicate cleanup)
2. Queries Wikidata API for candidates by institution name
3. Uses GLM-4.6 to verify matches based on CH Annotator entity types (GRP.HER)
4. Only adds Wikidata enrichment if entity is verified as heritage institution
5. Updates files with verified Wikidata data

CH Annotator Convention (v1.7.0):
- Heritage institutions are type GRP.HER (glam:HeritageCustodian)
- Maps to: org:FormalOrganization, rov:RegisteredOrganization, schema:Museum, schema:Library, schema:ArchiveOrganization
- Subtypes: GRP.HER.GAL (Gallery), GRP.HER.LIB (Library), GRP.HER.ARC (Archive), GRP.HER.MUS (Museum)

Wikidata "instance of" (P31) values for heritage institutions:
- Q33506 (museum)
- Q7075 (library)
- Q166118 (archive)
- Q1007870 (art gallery)
- Q207694 (art museum)
- Q1970365 (natural history museum)
- Q18388277 (history museum)
- Q23413 (castle) - when used as museum
- Q839954 (archaeological site)
- Q174782 (town square) - NOT heritage institution
- Q515 (city) - NOT heritage institution
"""

import asyncio
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
import yaml
|
|
import httpx
|
|
import logging
|
|
|
|
# Load environment variables from .env file
|
|
from dotenv import load_dotenv
|
|
env_path = Path(__file__).parent.parent / ".env"
|
|
load_dotenv(env_path)
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Add src to path
|
|
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
|
|
|
# =============================================================================
|
|
# WIKIDATA HERITAGE INSTITUTION TYPE CLASSES (P31 values)
|
|
# =============================================================================
|
|
|
|
# These are valid Wikidata "instance of" values for heritage institutions
|
|
HERITAGE_P31_TYPES = {
|
|
# Museums
|
|
"Q33506": "museum",
|
|
"Q207694": "art museum",
|
|
"Q1970365": "natural history museum",
|
|
"Q18388277": "history museum",
|
|
"Q2087181": "university museum",
|
|
"Q1007870": "art gallery",
|
|
"Q17431399": "national museum",
|
|
"Q16735822": "museum building",
|
|
"Q1788742": "war museum",
|
|
"Q7889618": "gallery of art",
|
|
"Q4989906": "monuments and memorials",
|
|
"Q57660343": "maritime museum",
|
|
"Q15206070": "transport museum",
|
|
"Q214090": "ethnographic museum",
|
|
"Q2522387": "aviation museum",
|
|
"Q841573": "archaeological museum",
|
|
"Q28737012": "memorial museum",
|
|
"Q588140": "railway museum",
|
|
"Q515034": "science museum",
|
|
"Q4287745": "local museum",
|
|
|
|
# Libraries
|
|
"Q7075": "library",
|
|
"Q856234": "national library",
|
|
"Q1078570": "academic library",
|
|
"Q11294": "public library",
|
|
"Q13226383": "research library",
|
|
|
|
# Archives
|
|
"Q166118": "archive",
|
|
"Q473972": "national archives",
|
|
"Q1423895": "film archive",
|
|
"Q2066131": "regional archive",
|
|
"Q63400100": "historical archive",
|
|
"Q63400127": "municipal archive",
|
|
"Q1026954": "photo archive",
|
|
|
|
# Galleries
|
|
"Q1007870": "art gallery",
|
|
"Q7889618": "gallery of art",
|
|
|
|
# Research centers
|
|
"Q31855": "research institute",
|
|
"Q327333": "heritage organisation",
|
|
|
|
# Botanical/Zoo
|
|
"Q43229": "botanical garden",
|
|
"Q45585": "botanical garden",
|
|
"Q43501": "zoo",
|
|
|
|
# Holy sites (when managing heritage collections)
|
|
"Q317557": "monastery",
|
|
"Q83405": "abbey",
|
|
"Q1088552": "cathedral chapter",
|
|
|
|
# Educational (with collections)
|
|
"Q3918": "university",
|
|
"Q875538": "public university",
|
|
}
|
|
|
|
# These P31 values indicate NOT a heritage institution
|
|
NON_HERITAGE_P31_TYPES = {
|
|
"Q515": "city",
|
|
"Q174782": "square",
|
|
"Q5": "human",
|
|
"Q4830453": "business",
|
|
"Q891723": "public company",
|
|
"Q783794": "company",
|
|
"Q6881511": "enterprise",
|
|
"Q43229": "organization", # Too generic
|
|
"Q55678": "movie",
|
|
"Q7366": "song",
|
|
"Q5398426": "television series",
|
|
}
|
|
|
|
|
|
# =============================================================================
|
|
# WIKIDATA API CLIENT
|
|
# =============================================================================
|
|
|
|
class WikidataSearchClient:
|
|
"""Client for Wikidata search and entity API."""
|
|
|
|
SEARCH_URL = "https://www.wikidata.org/w/api.php"
|
|
ENTITY_URL = "https://www.wikidata.org/wiki/Special:EntityData/{qid}.json"
|
|
|
|
def __init__(self, contact_email: Optional[str] = None):
|
|
self.contact_email = contact_email or os.environ.get("WIKIMEDIA_CONTACT_EMAIL", "glam@example.org")
|
|
self.client = httpx.AsyncClient(
|
|
timeout=30.0,
|
|
headers={
|
|
"User-Agent": f"GLAMBot/1.0 ({self.contact_email})",
|
|
}
|
|
)
|
|
|
|
async def search_entity(self, name: str, language: str = "en", limit: int = 5) -> List[Dict[str, Any]]:
|
|
"""
|
|
Search Wikidata for entities matching a name.
|
|
|
|
Returns list of candidates with qid, label, description.
|
|
"""
|
|
params = {
|
|
"action": "wbsearchentities",
|
|
"format": "json",
|
|
"language": language,
|
|
"type": "item",
|
|
"limit": limit,
|
|
"search": name,
|
|
}
|
|
|
|
try:
|
|
response = await self.client.get(self.SEARCH_URL, params=params)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
results = []
|
|
for item in data.get("search", []):
|
|
results.append({
|
|
"qid": item.get("id"),
|
|
"label": item.get("label"),
|
|
"description": item.get("description", ""),
|
|
"url": item.get("concepturi"),
|
|
})
|
|
|
|
return results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Wikidata search error for '{name}': {e}")
|
|
return []
|
|
|
|
async def get_entity_claims(self, qid: str) -> Dict[str, Any]:
|
|
"""
|
|
Get entity claims (properties) from Wikidata.
|
|
|
|
Returns dict with P31 (instance of), P131 (located in), P625 (coordinates), etc.
|
|
"""
|
|
url = self.ENTITY_URL.format(qid=qid)
|
|
|
|
try:
|
|
response = await self.client.get(url)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
entity = data.get("entities", {}).get(qid, {})
|
|
claims = entity.get("claims", {})
|
|
labels = entity.get("labels", {})
|
|
descriptions = entity.get("descriptions", {})
|
|
|
|
# Extract P31 values (instance of)
|
|
p31_values = []
|
|
for claim in claims.get("P31", []):
|
|
mainsnak = claim.get("mainsnak", {})
|
|
if mainsnak.get("snaktype") == "value":
|
|
datavalue = mainsnak.get("datavalue", {})
|
|
if datavalue.get("type") == "wikibase-entityid":
|
|
p31_qid = datavalue.get("value", {}).get("id")
|
|
if p31_qid:
|
|
p31_values.append(p31_qid)
|
|
|
|
# Extract P131 (located in administrative entity)
|
|
p131_values = []
|
|
for claim in claims.get("P131", []):
|
|
mainsnak = claim.get("mainsnak", {})
|
|
if mainsnak.get("snaktype") == "value":
|
|
datavalue = mainsnak.get("datavalue", {})
|
|
if datavalue.get("type") == "wikibase-entityid":
|
|
p131_qid = datavalue.get("value", {}).get("id")
|
|
if p131_qid:
|
|
p131_values.append(p131_qid)
|
|
|
|
# Extract P625 (coordinates)
|
|
coordinates = None
|
|
for claim in claims.get("P625", []):
|
|
mainsnak = claim.get("mainsnak", {})
|
|
if mainsnak.get("snaktype") == "value":
|
|
datavalue = mainsnak.get("datavalue", {})
|
|
if datavalue.get("type") == "globecoordinate":
|
|
value = datavalue.get("value", {})
|
|
coordinates = {
|
|
"latitude": value.get("latitude"),
|
|
"longitude": value.get("longitude"),
|
|
}
|
|
break
|
|
|
|
# Extract P17 (country)
|
|
country = None
|
|
for claim in claims.get("P17", []):
|
|
mainsnak = claim.get("mainsnak", {})
|
|
if mainsnak.get("snaktype") == "value":
|
|
datavalue = mainsnak.get("datavalue", {})
|
|
if datavalue.get("type") == "wikibase-entityid":
|
|
country = datavalue.get("value", {}).get("id")
|
|
break
|
|
|
|
return {
|
|
"qid": qid,
|
|
"labels": {k: v.get("value") for k, v in labels.items()},
|
|
"descriptions": {k: v.get("value") for k, v in descriptions.items()},
|
|
"p31": p31_values,
|
|
"p131": p131_values,
|
|
"p17_country": country,
|
|
"coordinates": coordinates,
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Wikidata entity fetch error for {qid}: {e}")
|
|
return {}
|
|
|
|
async def close(self):
|
|
await self.client.aclose()
|
|
|
|
|
|
# =============================================================================
|
|
# GLM-4.6 CH ANNOTATOR VERIFICATION
|
|
# =============================================================================
|
|
|
|
class GLMHeritageVerifier:
|
|
"""
|
|
Verify Wikidata entity matches using GLM-4.6 CH Annotator.
|
|
|
|
Uses CH Annotator v1.7.0 entity type GRP.HER to verify that
|
|
a Wikidata entity is actually a heritage institution.
|
|
"""
|
|
|
|
# Z.AI Coding Plan endpoint (different from regular BigModel API)
|
|
ZAI_API_URL = "https://api.z.ai/api/coding/paas/v4/chat/completions"
|
|
|
|
VERIFICATION_PROMPT = """You are a heritage institution classifier following CH-Annotator v1.7.0 convention.
|
|
|
|
Your task is to determine if a Wikidata entity is a heritage institution (type GRP.HER).
|
|
|
|
## CH-Annotator GRP.HER Definition
|
|
Heritage institutions are organizations that:
|
|
- Collect, preserve, and provide access to cultural heritage materials
|
|
- Include: museums (GRP.HER.MUS), libraries (GRP.HER.LIB), archives (GRP.HER.ARC), galleries (GRP.HER.GAL)
|
|
- May also include: research centers, botanical gardens, educational institutions WITH collections
|
|
|
|
## Entity Types That Are NOT Heritage Institutions
|
|
- Cities, towns, municipalities (these are places, not institutions)
|
|
- General businesses or companies (unless they manage heritage collections)
|
|
- People (individuals are AGT.PER, not GRP.HER)
|
|
- Events, festivals, exhibitions (temporary, not institutions)
|
|
- Buildings without institutional function (just architecture)
|
|
|
|
## Your Task
|
|
Analyze the Wikidata entity data and determine:
|
|
1. Is this entity a heritage institution (GRP.HER)?
|
|
2. If yes, what subtype? (MUS/LIB/ARC/GAL/OTHER)
|
|
3. Confidence score (0.0-1.0)
|
|
|
|
Respond in JSON format:
|
|
```json
|
|
{{
|
|
"is_heritage_institution": true/false,
|
|
"subtype": "MUS|LIB|ARC|GAL|RES|BOT|EDU|OTHER|null",
|
|
"confidence": 0.95,
|
|
"reasoning": "Brief explanation"
|
|
}}
|
|
```
|
|
|
|
## Entity to Analyze
|
|
Institution name from our data: {institution_name}
|
|
Location from our data: {institution_location}
|
|
|
|
Wikidata entity:
|
|
- QID: {qid}
|
|
- Label: {wd_label}
|
|
- Description: {wd_description}
|
|
- Instance of (P31): {p31_types}
|
|
- Located in (P131): {p131_location}
|
|
"""
|
|
|
|
def __init__(self, api_key: Optional[str] = None, model: str = "glm-4.6", use_claude: bool = False):
|
|
self.use_claude = use_claude
|
|
|
|
if use_claude:
|
|
self.api_key = api_key or os.environ.get("CLAUDE_API_KEY")
|
|
self.model = "claude-3-5-haiku-20241022" # Fast, cheap model
|
|
self.api_url = "https://api.anthropic.com/v1/messages"
|
|
if not self.api_key:
|
|
raise ValueError("CLAUDE_API_KEY not found in environment")
|
|
self.client = httpx.AsyncClient(
|
|
timeout=60.0,
|
|
headers={
|
|
"x-api-key": self.api_key,
|
|
"anthropic-version": "2023-06-01",
|
|
"Content-Type": "application/json",
|
|
}
|
|
)
|
|
else:
|
|
self.api_key = api_key or os.environ.get("ZAI_API_TOKEN")
|
|
self.model = model
|
|
# Use Z.AI Coding Plan endpoint (same as OpenCode)
|
|
self.api_url = "https://api.z.ai/api/coding/paas/v4/chat/completions"
|
|
if not self.api_key:
|
|
raise ValueError("ZAI_API_TOKEN not found in environment")
|
|
self.client = httpx.AsyncClient(
|
|
timeout=60.0,
|
|
headers={
|
|
"Authorization": f"Bearer {self.api_key}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
)
|
|
|
|
async def verify_heritage_institution(
|
|
self,
|
|
institution_name: str,
|
|
institution_location: str,
|
|
qid: str,
|
|
wd_label: str,
|
|
wd_description: str,
|
|
p31_types: List[str],
|
|
p131_location: List[str],
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Verify if a Wikidata entity matches a heritage institution.
|
|
|
|
Returns verification result with confidence score.
|
|
"""
|
|
# First, quick heuristic check using P31 types
|
|
p31_set = set(p31_types)
|
|
|
|
# Check for definite heritage types
|
|
heritage_matches = p31_set & set(HERITAGE_P31_TYPES.keys())
|
|
non_heritage_matches = p31_set & set(NON_HERITAGE_P31_TYPES.keys())
|
|
|
|
# If clear non-heritage type, reject without LLM call
|
|
if non_heritage_matches and not heritage_matches:
|
|
logger.debug(f"Quick reject {qid}: P31 indicates non-heritage ({non_heritage_matches})")
|
|
return {
|
|
"is_heritage_institution": False,
|
|
"subtype": None,
|
|
"confidence": 0.95,
|
|
"reasoning": f"P31 types indicate non-heritage: {[NON_HERITAGE_P31_TYPES.get(t, t) for t in non_heritage_matches]}",
|
|
"verification_method": "p31_heuristic",
|
|
}
|
|
|
|
# If clear heritage type, high confidence without LLM
|
|
if heritage_matches and not non_heritage_matches:
|
|
subtype = self._infer_subtype_from_p31(p31_types)
|
|
logger.debug(f"Quick accept {qid}: P31 indicates heritage ({heritage_matches})")
|
|
return {
|
|
"is_heritage_institution": True,
|
|
"subtype": subtype,
|
|
"confidence": 0.9,
|
|
"reasoning": f"P31 types indicate heritage: {[HERITAGE_P31_TYPES.get(t, t) for t in heritage_matches]}",
|
|
"verification_method": "p31_heuristic",
|
|
}
|
|
|
|
# Ambiguous case - use GLM-4.6 for verification
|
|
p31_labels = [HERITAGE_P31_TYPES.get(t, NON_HERITAGE_P31_TYPES.get(t, t)) for t in p31_types]
|
|
|
|
prompt = self.VERIFICATION_PROMPT.format(
|
|
institution_name=institution_name,
|
|
institution_location=institution_location,
|
|
qid=qid,
|
|
wd_label=wd_label,
|
|
wd_description=wd_description,
|
|
p31_types=", ".join(p31_labels) if p31_labels else "None specified",
|
|
p131_location=", ".join(p131_location) if p131_location else "Not specified",
|
|
)
|
|
|
|
try:
|
|
if self.use_claude:
|
|
# Claude API request format
|
|
response = await self.client.post(
|
|
self.api_url,
|
|
json={
|
|
"model": self.model,
|
|
"max_tokens": 512,
|
|
"messages": [
|
|
{"role": "user", "content": prompt},
|
|
],
|
|
"system": "You are a heritage institution classifier. Respond only in valid JSON. Start your response with { and end with }.",
|
|
}
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
content = data.get("content", [{}])[0].get("text", "")
|
|
logger.debug(f"Claude raw response for {qid}: {content[:300]}")
|
|
verification_method = "claude_ch_annotator"
|
|
else:
|
|
# GLM/Z.AI API request format
|
|
response = await self.client.post(
|
|
self.api_url,
|
|
json={
|
|
"model": self.model,
|
|
"messages": [
|
|
{"role": "system", "content": "You are a heritage institution classifier. Respond only in valid JSON."},
|
|
{"role": "user", "content": prompt},
|
|
],
|
|
"temperature": 0.1,
|
|
"max_tokens": 512,
|
|
}
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
|
|
verification_method = "glm_4.6_ch_annotator"
|
|
|
|
# Parse JSON from response
|
|
try:
|
|
# Extract JSON from markdown code blocks if present
|
|
if "```json" in content:
|
|
content = content.split("```json")[1].split("```")[0]
|
|
elif "```" in content:
|
|
content = content.split("```")[1].split("```")[0]
|
|
|
|
# Try to find JSON object in content
|
|
content = content.strip()
|
|
|
|
# If content doesn't start with {, try to find first {
|
|
if not content.startswith("{"):
|
|
start_idx = content.find("{")
|
|
if start_idx != -1:
|
|
# Find matching closing brace
|
|
brace_count = 0
|
|
end_idx = start_idx
|
|
for i, char in enumerate(content[start_idx:], start_idx):
|
|
if char == "{":
|
|
brace_count += 1
|
|
elif char == "}":
|
|
brace_count -= 1
|
|
if brace_count == 0:
|
|
end_idx = i
|
|
break
|
|
content = content[start_idx:end_idx + 1]
|
|
else:
|
|
# No { found - wrap content in braces if it looks like JSON body
|
|
if '"is_heritage_institution"' in content:
|
|
content = "{" + content.rstrip().rstrip(",") + "}"
|
|
|
|
result = json.loads(content)
|
|
result["verification_method"] = verification_method
|
|
return result
|
|
|
|
except json.JSONDecodeError as e:
|
|
# Fallback: try to extract values with regex
|
|
logger.debug(f"JSON parse failed for {qid}, trying regex fallback: {content[:200]}")
|
|
|
|
is_heritage = None
|
|
subtype = None
|
|
confidence = 0.5
|
|
reasoning = "Parsed from non-JSON response"
|
|
|
|
# Check for is_heritage_institution value
|
|
if '"is_heritage_institution"' in content:
|
|
if 'true' in content.lower():
|
|
is_heritage = True
|
|
elif 'false' in content.lower():
|
|
is_heritage = False
|
|
|
|
# Extract subtype
|
|
subtype_match = re.search(r'"subtype"\s*:\s*"([^"]+)"', content)
|
|
if subtype_match:
|
|
subtype = subtype_match.group(1)
|
|
|
|
# Extract confidence
|
|
conf_match = re.search(r'"confidence"\s*:\s*([\d.]+)', content)
|
|
if conf_match:
|
|
try:
|
|
confidence = float(conf_match.group(1))
|
|
except ValueError:
|
|
pass
|
|
|
|
# Extract reasoning
|
|
reason_match = re.search(r'"reasoning"\s*:\s*"([^"]+)"', content)
|
|
if reason_match:
|
|
reasoning = reason_match.group(1)
|
|
|
|
if is_heritage is not None:
|
|
return {
|
|
"is_heritage_institution": is_heritage,
|
|
"subtype": subtype,
|
|
"confidence": confidence,
|
|
"reasoning": reasoning,
|
|
"verification_method": f"{verification_method}_regex_fallback",
|
|
}
|
|
|
|
logger.warning(f"Failed to parse LLM response for {qid}: {str(e)[:100]} - content: {content[:200]}")
|
|
return {
|
|
"is_heritage_institution": False,
|
|
"subtype": None,
|
|
"confidence": 0.0,
|
|
"reasoning": f"Failed to parse LLM response: {str(e)}",
|
|
"verification_method": f"{verification_method}_parse_error",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"LLM verification error for {qid}: {e}")
|
|
return {
|
|
"is_heritage_institution": False,
|
|
"subtype": None,
|
|
"confidence": 0.0,
|
|
"reasoning": f"API error: {e}",
|
|
"verification_method": "llm_api_error",
|
|
}
|
|
|
|
def _infer_subtype_from_p31(self, p31_types: List[str]) -> str:
|
|
"""Infer heritage institution subtype from P31 values."""
|
|
p31_set = set(p31_types)
|
|
|
|
# Museum types
|
|
museum_types = {"Q33506", "Q207694", "Q1970365", "Q18388277", "Q2087181", "Q17431399",
|
|
"Q1788742", "Q57660343", "Q15206070", "Q214090", "Q2522387",
|
|
"Q841573", "Q28737012", "Q588140", "Q515034", "Q4287745"}
|
|
if p31_set & museum_types:
|
|
return "MUS"
|
|
|
|
# Library types
|
|
library_types = {"Q7075", "Q856234", "Q1078570", "Q11294", "Q13226383"}
|
|
if p31_set & library_types:
|
|
return "LIB"
|
|
|
|
# Archive types
|
|
archive_types = {"Q166118", "Q473972", "Q1423895", "Q2066131", "Q63400100", "Q63400127", "Q1026954"}
|
|
if p31_set & archive_types:
|
|
return "ARC"
|
|
|
|
# Gallery types
|
|
gallery_types = {"Q1007870", "Q7889618"}
|
|
if p31_set & gallery_types:
|
|
return "GAL"
|
|
|
|
# Research centers
|
|
if "Q31855" in p31_set or "Q327333" in p31_set:
|
|
return "RES"
|
|
|
|
# Botanical/Zoo
|
|
if "Q43229" in p31_set or "Q45585" in p31_set or "Q43501" in p31_set:
|
|
return "BOT"
|
|
|
|
# Educational
|
|
if "Q3918" in p31_set or "Q875538" in p31_set:
|
|
return "EDU"
|
|
|
|
return "OTHER"
|
|
|
|
async def close(self):
|
|
await self.client.aclose()
|
|
|
|
|
|
# =============================================================================
|
|
# MAIN ENRICHMENT LOGIC
|
|
# =============================================================================
|
|
|
|
async def find_files_needing_reenrichment(custodian_dir: Path) -> List[Path]:
|
|
"""Find all files marked for re-enrichment."""
|
|
files = []
|
|
|
|
for file_path in custodian_dir.glob("*.yaml"):
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
if "Re-enrichment required" in content:
|
|
files.append(file_path)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error reading {file_path}: {e}")
|
|
|
|
return files
|
|
|
|
|
|
async def enrich_file_with_wikidata(
|
|
file_path: Path,
|
|
wd_client: WikidataSearchClient,
|
|
verifier: GLMHeritageVerifier,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Enrich a single file with verified Wikidata data.
|
|
|
|
Returns enrichment result.
|
|
"""
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
entry = yaml.safe_load(f)
|
|
|
|
if not entry:
|
|
return {"status": "empty_file", "file": str(file_path)}
|
|
|
|
# Get institution name
|
|
name = None
|
|
if "custodian_name" in entry and isinstance(entry["custodian_name"], dict):
|
|
name = entry["custodian_name"].get("claim_value")
|
|
if not name and "google_maps_enrichment" in entry:
|
|
name = entry["google_maps_enrichment"].get("name")
|
|
if not name and "original_entry" in entry:
|
|
name = entry["original_entry"].get("organisatie") or entry["original_entry"].get("instelling")
|
|
|
|
if not name:
|
|
return {"status": "no_name", "file": str(file_path)}
|
|
|
|
# Get location for verification
|
|
location = ""
|
|
if "google_maps_enrichment" in entry:
|
|
gm = entry["google_maps_enrichment"]
|
|
parts = []
|
|
if gm.get("short_address"):
|
|
parts.append(gm["short_address"])
|
|
elif gm.get("formatted_address"):
|
|
parts.append(gm["formatted_address"])
|
|
location = ", ".join(parts)
|
|
elif "original_entry" in entry:
|
|
oe = entry["original_entry"]
|
|
parts = []
|
|
if oe.get("plaatsnaam_bezoekadres"):
|
|
parts.append(oe["plaatsnaam_bezoekadres"])
|
|
if oe.get("provincie"):
|
|
parts.append(oe["provincie"])
|
|
location = ", ".join(parts)
|
|
|
|
# Get country for search language
|
|
country_code = "NL" # Default
|
|
if "ghcid" in entry:
|
|
ghcid = entry["ghcid"].get("ghcid_current", "")
|
|
if ghcid and len(ghcid) >= 2:
|
|
country_code = ghcid[:2]
|
|
|
|
# Determine search language based on country
|
|
search_langs = ["en"] # Always search English
|
|
if country_code == "NL":
|
|
search_langs = ["nl", "en"]
|
|
elif country_code == "BE":
|
|
search_langs = ["nl", "fr", "en"]
|
|
elif country_code == "DE":
|
|
search_langs = ["de", "en"]
|
|
elif country_code == "FR":
|
|
search_langs = ["fr", "en"]
|
|
elif country_code in ["BR", "PT"]:
|
|
search_langs = ["pt", "en"]
|
|
elif country_code in ["ES", "MX", "AR", "CL", "CO"]:
|
|
search_langs = ["es", "en"]
|
|
|
|
# Search Wikidata for candidates
|
|
all_candidates = []
|
|
for lang in search_langs:
|
|
candidates = await wd_client.search_entity(name, language=lang, limit=5)
|
|
all_candidates.extend(candidates)
|
|
await asyncio.sleep(0.2) # Rate limiting
|
|
|
|
# Deduplicate by QID
|
|
seen_qids = set()
|
|
unique_candidates = []
|
|
for c in all_candidates:
|
|
if c["qid"] not in seen_qids:
|
|
seen_qids.add(c["qid"])
|
|
unique_candidates.append(c)
|
|
|
|
if not unique_candidates:
|
|
# Update file to mark as not found
|
|
entry["wikidata_enrichment_status"] = "NOT_FOUND"
|
|
entry["wikidata_search_timestamp"] = datetime.now(timezone.utc).isoformat()
|
|
|
|
# Remove re-enrichment note from provenance
|
|
if "provenance" in entry and "notes" in entry["provenance"]:
|
|
notes = entry["provenance"]["notes"]
|
|
if "Re-enrichment required" in notes:
|
|
entry["provenance"]["notes"] = notes.split("Re-enrichment required")[0].strip()
|
|
|
|
with open(file_path, 'w', encoding='utf-8') as f:
|
|
yaml.dump(entry, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
|
|
|
|
return {"status": "not_found", "file": str(file_path), "name": name}
|
|
|
|
# Verify each candidate
|
|
best_match = None
|
|
best_confidence = 0.0
|
|
|
|
for candidate in unique_candidates[:5]: # Limit to top 5
|
|
qid = candidate["qid"]
|
|
|
|
# Get entity details
|
|
entity_data = await wd_client.get_entity_claims(qid)
|
|
await asyncio.sleep(0.2)
|
|
|
|
if not entity_data:
|
|
continue
|
|
|
|
# Verify with GLM-4.6
|
|
verification = await verifier.verify_heritage_institution(
|
|
institution_name=name,
|
|
institution_location=location,
|
|
qid=qid,
|
|
wd_label=candidate.get("label", ""),
|
|
wd_description=candidate.get("description", ""),
|
|
p31_types=entity_data.get("p31", []),
|
|
p131_location=[str(x) for x in entity_data.get("p131", [])],
|
|
)
|
|
|
|
if verification.get("is_heritage_institution") and verification.get("confidence", 0) > best_confidence:
|
|
best_match = {
|
|
"qid": qid,
|
|
"label": candidate.get("label"),
|
|
"description": candidate.get("description"),
|
|
"entity_data": entity_data,
|
|
"verification": verification,
|
|
}
|
|
best_confidence = verification.get("confidence", 0)
|
|
|
|
if not best_match or best_confidence < 0.5:
|
|
# No verified match found
|
|
entry["wikidata_enrichment_status"] = "NO_VERIFIED_MATCH"
|
|
entry["wikidata_search_timestamp"] = datetime.now(timezone.utc).isoformat()
|
|
entry["wikidata_candidates_checked"] = len(unique_candidates)
|
|
|
|
# Remove re-enrichment note
|
|
if "provenance" in entry and "notes" in entry["provenance"]:
|
|
notes = entry["provenance"]["notes"]
|
|
if "Re-enrichment required" in notes:
|
|
entry["provenance"]["notes"] = notes.split("Re-enrichment required")[0].strip()
|
|
|
|
with open(file_path, 'w', encoding='utf-8') as f:
|
|
yaml.dump(entry, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
|
|
|
|
return {"status": "no_verified_match", "file": str(file_path), "name": name, "candidates": len(unique_candidates)}
|
|
|
|
# Add verified Wikidata enrichment
|
|
qid = best_match["qid"]
|
|
entity_data = best_match["entity_data"]
|
|
verification = best_match["verification"]
|
|
|
|
entry["wikidata_enrichment"] = {
|
|
"wikidata_id": qid,
|
|
"wikidata_url": f"https://www.wikidata.org/wiki/{qid}",
|
|
"wikidata_label": best_match.get("label"),
|
|
"wikidata_description": best_match.get("description"),
|
|
"labels": entity_data.get("labels", {}),
|
|
"descriptions": entity_data.get("descriptions", {}),
|
|
"instance_of": entity_data.get("p31", []),
|
|
"located_in": entity_data.get("p131", []),
|
|
"country": entity_data.get("p17_country"),
|
|
"coordinates": entity_data.get("coordinates"),
|
|
"enrichment_timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"verification": {
|
|
"method": verification.get("verification_method"),
|
|
"confidence": verification.get("confidence"),
|
|
"subtype": verification.get("subtype"),
|
|
"reasoning": verification.get("reasoning"),
|
|
"ch_annotator_version": "v1.7.0",
|
|
},
|
|
}
|
|
|
|
entry["wikidata_enrichment_status"] = "VERIFIED"
|
|
entry["wikidata_search_timestamp"] = datetime.now(timezone.utc).isoformat()
|
|
|
|
# Add Wikidata ID to identifiers
|
|
if "identifiers" not in entry:
|
|
entry["identifiers"] = []
|
|
|
|
# Check if Wikidata ID already exists
|
|
existing_schemes = {i.get("identifier_scheme") for i in entry["identifiers"] if isinstance(i, dict)}
|
|
if "Wikidata" not in existing_schemes:
|
|
entry["identifiers"].append({
|
|
"identifier_scheme": "Wikidata",
|
|
"identifier_value": qid,
|
|
"identifier_url": f"https://www.wikidata.org/wiki/{qid}",
|
|
})
|
|
|
|
# Remove re-enrichment note
|
|
if "provenance" in entry and "notes" in entry["provenance"]:
|
|
notes = entry["provenance"]["notes"]
|
|
if "Re-enrichment required" in notes:
|
|
entry["provenance"]["notes"] = notes.split("Re-enrichment required")[0].strip()
|
|
|
|
# Save updated file
|
|
with open(file_path, 'w', encoding='utf-8') as f:
|
|
yaml.dump(entry, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
|
|
|
|
logger.info(f"✓ Enriched {file_path.name} with {qid} ({best_match.get('label')}) - confidence: {best_confidence:.2f}")
|
|
|
|
return {
|
|
"status": "verified_match",
|
|
"file": str(file_path),
|
|
"name": name,
|
|
"qid": qid,
|
|
"label": best_match.get("label"),
|
|
"confidence": best_confidence,
|
|
"subtype": verification.get("subtype"),
|
|
}
|
|
|
|
|
|
async def main():
|
|
"""Main entry point."""
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description="Re-enrich Wikidata with LLM verification")
|
|
parser.add_argument("--limit", type=int, default=100, help="Max files to process")
|
|
parser.add_argument("--dry-run", action="store_true", help="Don't modify files")
|
|
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
|
|
parser.add_argument("--use-claude", action="store_true", help="Use Claude instead of GLM-4.6")
|
|
args = parser.parse_args()
|
|
|
|
if args.verbose:
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
|
|
custodian_dir = Path("/Users/kempersc/apps/glam/data/custodian")
|
|
|
|
llm_name = "Claude" if args.use_claude else "GLM-4.6"
|
|
print("=" * 60)
|
|
print(f"Wikidata Re-enrichment with {llm_name} CH Annotator Verification")
|
|
print("=" * 60)
|
|
print()
|
|
|
|
# Find files needing re-enrichment
|
|
print("Finding files needing re-enrichment...")
|
|
files = await find_files_needing_reenrichment(custodian_dir)
|
|
print(f"Found {len(files)} files needing re-enrichment")
|
|
|
|
if not files:
|
|
print("No files to process!")
|
|
return
|
|
|
|
# Limit files
|
|
files = files[:args.limit]
|
|
print(f"Processing {len(files)} files (limit: {args.limit})")
|
|
print()
|
|
|
|
if args.dry_run:
|
|
print("DRY RUN - no files will be modified")
|
|
for f in files[:20]:
|
|
print(f" Would process: {f.name}")
|
|
return
|
|
|
|
# Initialize clients
|
|
wd_client = WikidataSearchClient()
|
|
verifier = GLMHeritageVerifier(use_claude=args.use_claude)
|
|
|
|
# Process files
|
|
results = {
|
|
"verified_match": [],
|
|
"no_verified_match": [],
|
|
"not_found": [],
|
|
"no_name": [],
|
|
"error": [],
|
|
}
|
|
|
|
try:
|
|
for i, file_path in enumerate(files, 1):
|
|
print(f"\n[{i}/{len(files)}] Processing {file_path.name}...")
|
|
|
|
try:
|
|
result = await enrich_file_with_wikidata(file_path, wd_client, verifier)
|
|
status = result.get("status", "error")
|
|
results.setdefault(status, []).append(result)
|
|
|
|
if status == "verified_match":
|
|
print(f" ✓ {result.get('qid')} ({result.get('label')}) - {result.get('confidence', 0):.2f}")
|
|
elif status == "no_verified_match":
|
|
print(f" ✗ No verified match (checked {result.get('candidates', 0)} candidates)")
|
|
elif status == "not_found":
|
|
print(f" ✗ No Wikidata candidates found")
|
|
elif status == "no_name":
|
|
print(f" ⚠ No institution name found")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing {file_path}: {e}")
|
|
results["error"].append({"file": str(file_path), "error": str(e)})
|
|
|
|
# Rate limiting
|
|
await asyncio.sleep(0.5)
|
|
|
|
finally:
|
|
await wd_client.close()
|
|
await verifier.close()
|
|
|
|
# Print summary
|
|
print("\n" + "=" * 60)
|
|
print("SUMMARY")
|
|
print("=" * 60)
|
|
print(f"Verified matches: {len(results.get('verified_match', []))}")
|
|
print(f"No verified match: {len(results.get('no_verified_match', []))}")
|
|
print(f"Not found: {len(results.get('not_found', []))}")
|
|
print(f"No name: {len(results.get('no_name', []))}")
|
|
print(f"Errors: {len(results.get('error', []))}")
|
|
print()
|
|
|
|
# Save results
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
results_file = Path(f"/Users/kempersc/apps/glam/reports/wikidata_reenrichment_{timestamp}.yaml")
|
|
results_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
with open(results_file, 'w', encoding='utf-8') as f:
|
|
yaml.dump({
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"files_processed": len(files),
|
|
"results": results,
|
|
}, f, allow_unicode=True, default_flow_style=False)
|
|
|
|
print(f"Results saved to: {results_file}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|