# glam/scripts/validate_wikidata_enrichments.py
# Listing metadata: retrieved 2025-12-09 07:56:35 +01:00; 769 lines; 27 KiB; Python.
#!/usr/bin/env python3
"""
Validate Wikidata enrichments using GLM-4.6 with CH-Annotator v1.7.0.
This script validates that Wikidata entities linked to custodian records
actually represent heritage institutions (GRP.HER) and that temporal data
is reasonable.
Validation checks:
1. Entity Type Match: Wikidata P31 (instance of) aligns with expected heritage types
2. Name Match: Wikidata label reasonably matches custodian name
3. Temporal Plausibility: Inception dates are reasonable (not future, not impossibly old)
4. Location Consistency: P131 (located in) aligns with custodian location data
Uses GLM-4.6 for ambiguous cases where heuristics are insufficient.
Usage:
python scripts/validate_wikidata_enrichments.py [--country XX] [--limit N] [--dry-run]
python scripts/validate_wikidata_enrichments.py --country JP --limit 50
Options:
--country XX Only validate files for country code XX (e.g., JP, CZ)
--limit N Process only first N files (for testing)
--dry-run Show what would be validated without making API calls
--use-llm Use GLM-4.6 verification for ambiguous cases
--report FILE Write validation report to FILE (default: validation_report.json)
Environment Variables:
ZAI_API_TOKEN - Required for GLM-4.6 verification (from Z.AI Coding Plan)
CH-Annotator Reference:
Entity type: GRP.HER (Heritage Institution)
Subtypes: GRP.HER.MUS, GRP.HER.LIB, GRP.HER.ARC, GRP.HER.GAL, etc.
See: data/entity_annotation/ch_annotator-v1_7_0.yaml
"""
import argparse
import asyncio
import json
import logging
import os
import re
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
import httpx
import yaml
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration: custodian YAML records live under data/custodian,
# validation reports are written under reports/validation.
CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"
REPORT_DIR = Path(__file__).parent.parent / "reports" / "validation"

# Z.AI API Configuration (from AGENTS.md Rule 11)
ZAI_API_URL = "https://api.z.ai/api/coding/paas/v4/chat/completions"
ZAI_MODEL = "glm-4.6"
# Read once at import time; empty string when unset (GLMValidator checks before use).
ZAI_API_TOKEN = os.getenv("ZAI_API_TOKEN", "")

# Request delay for API rate limiting
REQUEST_DELAY = 0.5  # 0.5s between requests (well under 5000/hr limit)
# ============================================================================
# Heritage Institution Type Mappings (from CH-Annotator v1.7.0)
# ============================================================================

# Wikidata P31 types that indicate heritage institutions (GRP.HER).
#
# NOTE(review): the original literal contained duplicate keys — Q1007870
# ("natural history museum" then "historical archive") and Q1060829
# ("open-air museum" then "art gallery"). Python silently keeps only the
# later value of a duplicated literal key, so the first entries were dead.
# The shadowed entries are removed here to make the effective mapping
# explicit. TODO: re-add "natural history museum" and "open-air museum"
# under their correct QIDs after verifying them on Wikidata.
HERITAGE_P31_TYPES = {
    # Museums
    "Q33506": "museum",
    "Q207694": "art museum",
    "Q17431399": "national museum",
    "Q2772772": "history museum",
    "Q15243209": "science museum",
    "Q16735822": "ethnographic museum",
    "Q2598870": "archaeological museum",
    "Q2889680": "military museum",
    # Libraries
    "Q210272": "library",
    "Q7075": "library",
    "Q856234": "national library",
    "Q28564": "public library",
    "Q7298645": "research library",
    # Archives
    "Q166118": "archive",
    "Q473972": "national archives",
    "Q2668072": "state archive",
    "Q1030034": "municipal archive",
    "Q1007870": "historical archive",
    # Galleries and other collecting institutions
    "Q1060829": "art gallery",
    "Q207628": "musical ensemble",  # May have archives
    "Q3152824": "cultural institution",
    "Q1137809": "botanical garden",
    "Q43229": "organization",  # Too generic, needs LLM verification
    "Q327333": "government agency",  # May be heritage-related
    "Q16917": "hospital",  # May have medical archives
}

# Wikidata P31 types that definitely NOT heritage institutions
NON_HERITAGE_P31_TYPES = {
    "Q5": "human",
    "Q515": "city",
    "Q486972": "human settlement",
    "Q532": "village",
    "Q3957": "town",
    "Q15284": "municipality",
    "Q1115575": "populated place",
    "Q7930989": "urban area",
    "Q41176": "building",  # Just a building, not institution
    "Q811979": "architectural structure",
    "Q4989906": "monument",  # Physical monument, not institution
    "Q571": "book",
    "Q11424": "film",
    "Q7889": "video game",
    "Q215380": "musical group",  # Music bands, not heritage
    "Q5398426": "television series",
    "Q1344": "opera",
    "Q35127": "website",
    "Q4830453": "business",
    "Q783794": "company",
    "Q891723": "public company",
    "Q6881511": "enterprise",
}

# Expected inception date range (heritage institutions founded after year 500).
MIN_INCEPTION_YEAR = 500
# Upper bound is "this year", captured once at import time.
MAX_INCEPTION_YEAR = datetime.now().year
# ============================================================================
# Validation Results
# ============================================================================
@dataclass
class ValidationIssue:
    """A single validation issue found in a custodian record."""
    # Issue category, e.g. entity_type_mismatch, name_mismatch,
    # temporal_invalid, location_mismatch.
    issue_type: str
    # One of: "error" (makes the record invalid), "warning", "info".
    severity: str
    # Human-readable description of the problem.
    message: str
    # What the validator expected to find (mismatch issues only).
    expected: Optional[str] = None
    # What was actually found (mismatch issues only).
    actual: Optional[str] = None
    # Name of the Wikidata enrichment field the issue refers to.
    wikidata_field: Optional[str] = None
    # Suggested remediation for a human reviewer.
    suggestion: Optional[str] = None
@dataclass
class ValidationResult:
    """Validation result for a single custodian file."""
    # Path of the custodian YAML file that was validated.
    file_path: str
    # Best-effort display name of the custodian (may be empty on file errors).
    custodian_name: str
    # Linked Wikidata QID, or None when the record has no enrichment.
    wikidata_id: Optional[str]
    # True when no error-severity issue was found.
    is_valid: bool
    # All issues collected by the individual validators.
    issues: List[ValidationIssue] = field(default_factory=list)
    # "heuristic" by default; overwritten when GLM-4.6 is consulted.
    verification_method: str = "heuristic"
    # UTC ISO-8601 timestamp of when this result was produced.
    verified_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
# ============================================================================
# Validation Logic
# ============================================================================
def extract_wikidata_info(data: Dict) -> Optional[Dict]:
    """Extract Wikidata enrichment info from custodian data.

    Args:
        data: Parsed custodian YAML record.

    Returns:
        A normalized dict of the enrichment fields, or None when the record
        has no (or an empty/null) ``wikidata_enrichment`` section.

    YAML records may carry explicit nulls (e.g. ``wikidata_labels:`` with no
    value); ``dict.get(key, default)`` then returns None rather than the
    default, and callers that iterate (e.g. ``labels.items()``) would crash.
    The ``or`` fallbacks below normalize nulls to empty containers.
    """
    wd = data.get("wikidata_enrichment") or {}
    if not wd:
        return None
    return {
        "entity_id": wd.get("wikidata_entity_id"),
        "label_en": wd.get("wikidata_label_en") or "",
        "label_nl": wd.get("wikidata_label_nl") or "",
        "description_en": wd.get("wikidata_description_en") or "",
        "instance_of": wd.get("wikidata_instance_of") or [],
        "inception": wd.get("wikidata_inception"),
        "located_in": wd.get("wikidata_located_in") or {},
        "country": wd.get("wikidata_country") or {},
        "labels": wd.get("wikidata_labels") or {},
    }
def validate_entity_types(
    wd_info: Dict,
    custodian_name: str,
) -> List[ValidationIssue]:
    """Check whether the Wikidata P31 (instance-of) types point at a heritage institution.

    Returns at most one issue: a warning when P31 data is absent or entirely
    unrecognized, or an error when the only recognized types are known
    non-heritage ones. A mix of heritage and non-heritage hits is accepted.
    """
    instance_of = wd_info.get("instance_of", [])
    if not instance_of:
        return [ValidationIssue(
            issue_type="entity_type_missing",
            severity="warning",
            message="No instance_of (P31) data in Wikidata enrichment",
            wikidata_field="wikidata_instance_of",
        )]

    # Normalize P31 entries ({"id": "Qxx"} dicts or bare "Qxx" strings) to a QID set.
    qids = set()
    for entry in instance_of:
        if isinstance(entry, dict):
            entry_qid = entry.get("id", "")
            if entry_qid:
                qids.add(entry_qid)
        elif isinstance(entry, str):
            qids.add(entry)

    heritage_hits = HERITAGE_P31_TYPES.keys() & qids
    non_heritage_hits = NON_HERITAGE_P31_TYPES.keys() & qids

    if non_heritage_hits and not heritage_hits:
        # Definite mismatch — every recognized type is a non-heritage one.
        hit_labels = [NON_HERITAGE_P31_TYPES.get(q, q) for q in non_heritage_hits]
        return [ValidationIssue(
            issue_type="entity_type_mismatch",
            severity="error",
            message=f"Wikidata entity appears to be: {', '.join(hit_labels)}",
            expected="Heritage institution (museum, library, archive, gallery)",
            actual=", ".join(hit_labels),
            wikidata_field="wikidata_instance_of",
            suggestion="Review Wikidata link - may be wrong entity or entity needs verification",
        )]

    if not heritage_hits and not non_heritage_hits:
        # Nothing recognized either way — flag for manual/LLM review.
        unknown_qids = list(qids)[:5]  # First 5 for brevity
        return [ValidationIssue(
            issue_type="entity_type_unknown",
            severity="warning",
            message=f"Unknown P31 types: {', '.join(unknown_qids)}",
            wikidata_field="wikidata_instance_of",
            suggestion="Verify entity types manually or with LLM",
        )]

    return []
def validate_inception_date(
    wd_info: Dict,
    custodian_name: str,
) -> List[ValidationIssue]:
    """Check that the Wikidata inception (founding) year is plausible.

    Accepts plain years (int or "1850"), date strings ("1850-01-01"), and
    signed timestamps in Wikidata's time format ("+1850-01-01T00:00:00Z",
    "-0500-01-01..."). The previous regex (``^-?(\\d{4})``) rejected the
    explicit "+" prefix that Wikidata time values carry, which produced
    spurious temporal_parse_error warnings for otherwise valid dates.

    Args:
        wd_info: Normalized enrichment dict from extract_wikidata_info().
        custodian_name: Unused here; kept for signature parity with the
            other validators.

    Returns:
        A list of ValidationIssue (empty when inception is absent or OK).
    """
    issues = []
    inception = wd_info.get("inception")
    if not inception:
        # Missing inception is not an error, just informational
        return issues
    try:
        if isinstance(inception, str):
            # Extract a signed 4-digit year, tolerating an explicit
            # "+" or "-" sign (Wikidata time format).
            year_match = re.match(r'^([+-]?)(\d{4})', inception)
            if year_match:
                year = int(year_match.group(2))
                if year_match.group(1) == '-':
                    # BCE dates are represented with a leading minus.
                    year = -year
            else:
                issues.append(ValidationIssue(
                    issue_type="temporal_parse_error",
                    severity="warning",
                    message=f"Could not parse inception date: {inception}",
                    wikidata_field="wikidata_inception",
                ))
                return issues
        else:
            # Non-string values are expected to be plain year numbers.
            year = int(inception)
        # Validate year range: not in the future, not implausibly ancient.
        if year > MAX_INCEPTION_YEAR:
            issues.append(ValidationIssue(
                issue_type="temporal_future",
                severity="error",
                message=f"Inception date is in the future: {year}",
                expected=f"Year <= {MAX_INCEPTION_YEAR}",
                actual=str(year),
                wikidata_field="wikidata_inception",
                suggestion="Check Wikidata - inception date may be wrong",
            ))
        elif year < MIN_INCEPTION_YEAR:
            issues.append(ValidationIssue(
                issue_type="temporal_implausible",
                severity="warning",
                message=f"Inception date seems very old for modern institution: {year}",
                expected=f"Year >= {MIN_INCEPTION_YEAR}",
                actual=str(year),
                wikidata_field="wikidata_inception",
                suggestion="Verify - may be founding date of predecessor organization",
            ))
    except (ValueError, TypeError) as e:
        # int() failed (e.g. inception is a dict or non-numeric scalar).
        issues.append(ValidationIssue(
            issue_type="temporal_parse_error",
            severity="warning",
            message=f"Error parsing inception: {e}",
            wikidata_field="wikidata_inception",
        ))
    return issues
def validate_name_match(
    wd_info: Dict,
    custodian_name: str,
    file_path: str,
) -> List[ValidationIssue]:
    """Check that at least one Wikidata label plausibly matches the custodian name.

    A label is accepted when it equals the name (case-insensitively), when
    either string contains the other, or when at least half of the words of
    the longer name also appear in the shorter one.
    """
    label_en = wd_info.get("label_en", "")
    labels_by_lang = wd_info.get("labels", {})
    if not label_en and not labels_by_lang:
        return [ValidationIssue(
            issue_type="name_missing",
            severity="warning",
            message="No Wikidata labels found",
            wikidata_field="wikidata_labels",
        )]

    # Candidate labels, lowercased, in the same order the original collected them.
    candidates = [label_en.lower()] if label_en else []
    candidates.extend(label.lower() for label in labels_by_lang.values() if label)

    target = custodian_name.lower()

    def plausible(candidate: str) -> bool:
        # Exact match or substring containment in either direction.
        if target == candidate or target in candidate or candidate in target:
            return True
        # Otherwise require >= 50% word overlap relative to the longer name.
        target_words = set(re.findall(r'\w+', target))
        candidate_words = set(re.findall(r'\w+', candidate))
        if not target_words or not candidate_words:
            return False
        shared = len(target_words & candidate_words)
        return shared / max(len(target_words), len(candidate_words)) >= 0.5

    if any(plausible(candidate) for candidate in candidates):
        return []

    return [ValidationIssue(
        issue_type="name_mismatch",
        severity="warning",
        message="Custodian name doesn't match Wikidata labels",
        expected=custodian_name,
        actual=label_en or str(list(labels_by_lang.values())[:3]),
        wikidata_field="wikidata_labels",
        suggestion="Verify Wikidata entity is correct match",
    )]
def _extract_custodian_name(data: Dict, file_path: Path) -> str:
    """Best-effort custodian display name from a parsed record.

    Tries, in order: custodian_name.emic_name, a plain-string custodian_name,
    top-level name, original_entry.name, and finally the file stem. Unlike
    the previous chained ``.get()`` calls, this tolerates fields that are
    plain strings (or null) instead of the expected mappings — such records
    used to raise AttributeError and be misreported as ``file_error``.
    """
    custodian = data.get("custodian_name")
    if isinstance(custodian, dict):
        name = custodian.get("emic_name", "")
        if name:
            return name
    elif isinstance(custodian, str) and custodian:
        return custodian
    name = data.get("name")
    if name:
        return name
    original = data.get("original_entry")
    if isinstance(original, dict):
        name = original.get("name")
        if name:
            return name
    return file_path.stem


def validate_custodian_file(file_path: Path, dry_run: bool = False) -> ValidationResult:
    """Validate a single custodian YAML file with the heuristic checks.

    Args:
        file_path: Path to the custodian YAML record.
        dry_run: Accepted for interface parity with callers; the heuristic
            checks make no API calls, so it is currently unused here.

    Returns:
        A ValidationResult. Read/parse failures are reported as a single
        ``file_error`` issue rather than raised; a record without Wikidata
        enrichment is considered valid (nothing to validate).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        if not data:
            return ValidationResult(
                file_path=str(file_path),
                custodian_name="",
                wikidata_id=None,
                is_valid=False,
                issues=[ValidationIssue(
                    issue_type="file_empty",
                    severity="error",
                    message="Empty YAML file",
                )],
            )
        custodian_name = _extract_custodian_name(data, file_path)
        # Extract Wikidata info
        wd_info = extract_wikidata_info(data)
        if not wd_info or not wd_info.get("entity_id"):
            return ValidationResult(
                file_path=str(file_path),
                custodian_name=custodian_name,
                wikidata_id=None,
                is_valid=True,  # No Wikidata = nothing to validate
                issues=[ValidationIssue(
                    issue_type="no_wikidata",
                    severity="info",
                    message="No Wikidata enrichment to validate",
                )],
            )
        wikidata_id = wd_info["entity_id"]
        all_issues = []
        # Run validations
        all_issues.extend(validate_entity_types(wd_info, custodian_name))
        all_issues.extend(validate_inception_date(wd_info, custodian_name))
        all_issues.extend(validate_name_match(wd_info, custodian_name, str(file_path)))
        # A record is valid as long as no error-severity issue was raised.
        has_errors = any(issue.severity == "error" for issue in all_issues)
        return ValidationResult(
            file_path=str(file_path),
            custodian_name=custodian_name,
            wikidata_id=wikidata_id,
            is_valid=not has_errors,
            issues=all_issues,
            verification_method="heuristic",
        )
    except Exception as e:
        # Deliberate catch-all: one unreadable file must not abort the batch.
        return ValidationResult(
            file_path=str(file_path),
            custodian_name="",
            wikidata_id=None,
            is_valid=False,
            issues=[ValidationIssue(
                issue_type="file_error",
                severity="error",
                message=f"Error reading file: {e}",
            )],
        )
# ============================================================================
# GLM-4.6 Verification for Ambiguous Cases
# ============================================================================
class GLMValidator:
    """Use GLM-4.6 to verify ambiguous Wikidata matches."""

    # Prompt template filled via str.format() in verify_ambiguous_case().
    # The doubled braces ({{ }}) around the JSON example are literal braces
    # under str.format; single-brace fields are the substitution slots.
    VERIFICATION_PROMPT = """You are a heritage institution validator following CH-Annotator v1.7.0 convention.
Your task is to determine if a Wikidata entity correctly represents a heritage institution.
## CH-Annotator GRP.HER Definition
Heritage institutions (GRP.HER) are organizations that:
- Collect, preserve, and provide access to cultural heritage materials
- Include: museums (MUS), libraries (LIB), archives (ARC), galleries (GAL)
- Also includes: research centers with collections, botanical gardens, educational institutions WITH heritage collections
## Validation Task
Analyze if the Wikidata entity matches the expected custodian and is actually a heritage institution.
## Custodian Record
Name: {custodian_name}
Location: {custodian_location}
Expected type: Heritage institution
## Wikidata Entity
QID: {wikidata_id}
Label: {wd_label}
Description: {wd_description}
Instance of (P31): {p31_types}
Inception: {inception}
## Validation Issues Found
{issues_summary}
## Respond in JSON format:
```json
{{
"is_correct_match": true/false,
"is_heritage_institution": true/false,
"heritage_subtype": "MUS|LIB|ARC|GAL|RES|BOT|EDU|OTHER|null",
"confidence": 0.95,
"issues_assessment": [
{{
"issue_type": "...",
"valid_concern": true/false,
"explanation": "..."
}}
],
"recommendation": "accept|reject|needs_review",
"reasoning": "Brief explanation"
}}
```"""

    def __init__(self):
        """Create the validator; raises ValueError when ZAI_API_TOKEN is unset."""
        self.api_key = ZAI_API_TOKEN
        if not self.api_key:
            raise ValueError("ZAI_API_TOKEN not set in environment")
        # One shared async HTTP client for all verification calls;
        # released in close().
        self.client = httpx.AsyncClient(
            timeout=60.0,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            }
        )

    async def verify_ambiguous_case(
        self,
        result: ValidationResult,
        wd_info: Dict,
        custodian_location: str = "",
    ) -> Dict[str, Any]:
        """Use GLM-4.6 to verify an ambiguous validation result.

        Args:
            result: Heuristic validation result whose issues need a second opinion.
            wd_info: Normalized enrichment dict (from extract_wikidata_info).
            custodian_location: Optional location string for the prompt.

        Returns:
            The model's parsed JSON verdict with a "verification_method" key
            added; on any failure, a fallback dict recommending "needs_review".
        """
        # Format issues for prompt (bullet list, or "None" when empty).
        issues_summary = "\n".join([
            f"- {issue.issue_type}: {issue.message}"
            for issue in result.issues
        ]) or "None"
        # Format P31 types as "QID (label)", resolving the label from either
        # mapping table and falling back to the raw QID when unknown.
        p31_list = []
        for item in wd_info.get("instance_of", []):
            if isinstance(item, dict):
                qid = item.get("id", "")
                label = HERITAGE_P31_TYPES.get(qid, NON_HERITAGE_P31_TYPES.get(qid, qid))
                p31_list.append(f"{qid} ({label})")
            elif isinstance(item, str):
                label = HERITAGE_P31_TYPES.get(item, NON_HERITAGE_P31_TYPES.get(item, item))
                p31_list.append(f"{item} ({label})")
        prompt = self.VERIFICATION_PROMPT.format(
            custodian_name=result.custodian_name,
            custodian_location=custodian_location,
            wikidata_id=result.wikidata_id,
            wd_label=wd_info.get("label_en", ""),
            wd_description=wd_info.get("description_en", ""),
            p31_types=", ".join(p31_list) if p31_list else "None",
            inception=wd_info.get("inception", "Unknown"),
            issues_summary=issues_summary,
        )
        try:
            response = await self.client.post(
                ZAI_API_URL,
                json={
                    "model": ZAI_MODEL,
                    "messages": [
                        {"role": "system", "content": "You are a heritage institution validator. Respond only in valid JSON."},
                        {"role": "user", "content": prompt},
                    ],
                    # Low temperature for deterministic-ish classification.
                    "temperature": 0.1,
                    "max_tokens": 1024,
                },
            )
            response.raise_for_status()
            data = response.json()
            content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
            # Strip a Markdown code fence if the model wrapped its JSON in one.
            if "```json" in content:
                content = content.split("```json")[1].split("```")[0]
            elif "```" in content:
                content = content.split("```")[1].split("```")[0]
            content = content.strip()
            # Last resort: slice out the outermost {...} span from any
            # surrounding chatter before parsing.
            if not content.startswith("{"):
                start_idx = content.find("{")
                if start_idx != -1:
                    end_idx = content.rfind("}")
                    if end_idx != -1:
                        content = content[start_idx:end_idx + 1]
            verification = json.loads(content)
            verification["verification_method"] = "glm_4.6_ch_annotator"
            return verification
        except Exception as e:
            # Deliberate catch-all: network/HTTP/JSON failures degrade to a
            # "needs_review" verdict instead of aborting the batch.
            logger.error(f"GLM verification error: {e}")
            return {
                "is_correct_match": None,
                "is_heritage_institution": None,
                "confidence": 0.0,
                "recommendation": "needs_review",
                "reasoning": f"Verification failed: {e}",
                "verification_method": "glm_4.6_error",
            }

    async def close(self):
        """Release the underlying HTTP client."""
        await self.client.aclose()
# ============================================================================
# Main Processing
# ============================================================================
async def process_files(
    files: List[Path],
    use_llm: bool = False,
    dry_run: bool = False,
) -> List[ValidationResult]:
    """Validate multiple custodian files, optionally escalating failures to GLM-4.6.

    Args:
        files: Custodian YAML files to validate.
        use_llm: When True (and ZAI_API_TOKEN is set), results that failed
            heuristic validation are re-checked with GLM-4.6.
        dry_run: Skip all API calls; only heuristic validation runs.

    Returns:
        One ValidationResult per input file, in input order.
    """
    results = []
    llm_validator = None
    if use_llm and ZAI_API_TOKEN:
        try:
            llm_validator = GLMValidator()
        except ValueError as e:
            logger.warning(f"LLM validation disabled: {e}")
    try:
        for i, file_path in enumerate(files):
            logger.info(f"[{i+1}/{len(files)}] Validating {file_path.name}")
            result = validate_custodian_file(file_path, dry_run)
            # Escalate heuristic failures (error-severity results) to GLM-4.6.
            # The dry_run check is part of the condition so a dry run no longer
            # re-reads and re-parses the file only to skip the API call.
            if llm_validator and not dry_run and result.issues and not result.is_valid:
                # Re-load the record to get the full enrichment payload for the prompt.
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = yaml.safe_load(f)
                wd_info = extract_wikidata_info(data) or {}
                verification = await llm_validator.verify_ambiguous_case(result, wd_info)
                result.verification_method = verification.get("verification_method", "glm_4.6")
                # An explicit "accept" from the model overrides heuristic errors.
                if verification.get("recommendation") == "accept":
                    result.is_valid = True
                # Record the model's reasoning as an informational issue.
                result.issues.append(ValidationIssue(
                    issue_type="llm_verification",
                    severity="info",
                    message=f"GLM-4.6: {verification.get('reasoning', 'No reasoning')}",
                    suggestion=f"Recommendation: {verification.get('recommendation', 'unknown')}",
                ))
                # Simple client-side rate limiting between API calls.
                await asyncio.sleep(REQUEST_DELAY)
            results.append(result)
    finally:
        # Always release the HTTP client, even if a file blows up mid-batch.
        if llm_validator:
            await llm_validator.close()
    return results
def generate_report(results: List[ValidationResult]) -> Dict:
    """Build a summary dict (counts plus per-file issue details) from validation results."""
    issues_by_type = {}
    issues_by_severity = {"error": 0, "warning": 0, "info": 0}
    flagged_files = []

    for result in results:
        # Tally every issue by type and by severity.
        for issue in result.issues:
            issues_by_type[issue.issue_type] = issues_by_type.get(issue.issue_type, 0) + 1
            issues_by_severity[issue.severity] += 1
        # A file is listed in detail only when it has at least one
        # error- or warning-severity issue (info-only files are omitted).
        if any(issue.severity in ("error", "warning") for issue in result.issues):
            flagged_files.append({
                "file": result.file_path,
                "name": result.custodian_name,
                "wikidata_id": result.wikidata_id,
                "is_valid": result.is_valid,
                "issues": [
                    {
                        "type": issue.issue_type,
                        "severity": issue.severity,
                        "message": issue.message,
                        "suggestion": issue.suggestion,
                    }
                    for issue in result.issues
                ],
            })

    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "total_files": len(results),
        "valid_count": sum(1 for r in results if r.is_valid),
        "invalid_count": sum(1 for r in results if not r.is_valid),
        "issues_by_type": issues_by_type,
        "issues_by_severity": issues_by_severity,
        "files_with_issues": flagged_files,
    }
def main():
    """CLI entry point: find custodian YAML files, validate them, write a JSON report."""
    parser = argparse.ArgumentParser(description="Validate Wikidata enrichments using CH-Annotator")
    parser.add_argument("--country", type=str, help="Only validate files for country code XX (e.g., JP, CZ)")
    parser.add_argument("--limit", type=int, default=0, help="Process only first N files (0 = no limit)")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be validated without API calls")
    parser.add_argument("--use-llm", action="store_true", help="Use GLM-4.6 for ambiguous case verification")
    parser.add_argument("--report", type=str, default="validation_report.json", help="Output report filename")
    args = parser.parse_args()
    # Find files to process; file names are expected to start with the country code.
    pattern = f"{args.country}-*.yaml" if args.country else "*.yaml"
    yaml_files = sorted(CUSTODIAN_DIR.glob(pattern))
    logger.info(f"Found {len(yaml_files)} YAML files in {CUSTODIAN_DIR}")
    if args.limit > 0:
        yaml_files = yaml_files[:args.limit]
        logger.info(f"Limited to first {args.limit} files")
    if args.dry_run:
        logger.info("DRY RUN - No API calls will be made")
    if args.use_llm:
        if ZAI_API_TOKEN:
            logger.info("LLM verification enabled (GLM-4.6)")
        else:
            logger.warning("LLM requested but ZAI_API_TOKEN not set - using heuristics only")
    # Process files (async because LLM verification uses an async HTTP client).
    results = asyncio.run(process_files(yaml_files, use_llm=args.use_llm, dry_run=args.dry_run))
    # Generate report
    report = generate_report(results)
    # Ensure report directory exists
    REPORT_DIR.mkdir(parents=True, exist_ok=True)
    report_path = REPORT_DIR / args.report
    with open(report_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2)
    # Print summary
    logger.info("\n" + "=" * 60)
    logger.info("VALIDATION SUMMARY")
    logger.info("=" * 60)
    logger.info(f"Total files validated: {report['total_files']}")
    logger.info(f"Valid: {report['valid_count']}")
    logger.info(f"Invalid: {report['invalid_count']}")
    logger.info(f"\nIssues by severity:")
    for severity, count in report["issues_by_severity"].items():
        logger.info(f" {severity}: {count}")
    # Most frequent issue types first.
    logger.info(f"\nIssues by type:")
    for issue_type, count in sorted(report["issues_by_type"].items(), key=lambda x: -x[1]):
        logger.info(f" {issue_type}: {count}")
    logger.info(f"\nReport saved to: {report_path}")
    logger.info("=" * 60)


if __name__ == "__main__":
    main()