#!/usr/bin/env python3
"""
Validate Wikidata enrichments using GLM-4.6 with CH-Annotator v1.7.0.

This script validates that Wikidata entities linked to custodian records
actually represent heritage institutions (GRP.HER) and that temporal data
is reasonable.

Validation checks:
1. Entity Type Match: Wikidata P31 (instance of) aligns with expected heritage types
2. Name Match: Wikidata label reasonably matches custodian name
3. Temporal Plausibility: Inception dates are reasonable (not future, not impossibly old)
4. Location Consistency: P131 (located in) aligns with custodian location data

Uses GLM-4.6 for ambiguous cases where heuristics are insufficient.

Usage:
    python scripts/validate_wikidata_enrichments.py [--country XX] [--limit N] [--dry-run]
    python scripts/validate_wikidata_enrichments.py --country JP --limit 50

Options:
    --country XX     Only validate files for country code XX (e.g., JP, CZ)
    --limit N        Process only first N files (for testing)
    --dry-run        Show what would be validated without making API calls
    --use-llm        Use GLM-4.6 to verify ambiguous cases
    --report FILE    Write validation report to FILE (default: validation_report.json)

Environment Variables:
    ZAI_API_TOKEN - Required for GLM-4.6 verification (from Z.AI Coding Plan)

CH-Annotator Reference:
    Entity type: GRP.HER (Heritage Institution)
    Subtypes: GRP.HER.MUS, GRP.HER.LIB, GRP.HER.ARC, GRP.HER.GAL, etc.
    See: data/entity_annotation/ch_annotator-v1_7_0.yaml
"""

import argparse
import asyncio
import json
import logging
import os
import re
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

import httpx
import yaml

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"
REPORT_DIR = Path(__file__).parent.parent / "reports" / "validation"

# Z.AI API Configuration (from AGENTS.md Rule 11)
ZAI_API_URL = "https://api.z.ai/api/coding/paas/v4/chat/completions"
ZAI_MODEL = "glm-4.6"
ZAI_API_TOKEN = os.getenv("ZAI_API_TOKEN", "")

# Request delay for API rate limiting
REQUEST_DELAY = 0.5  # 0.5s between requests (well under 5000/hr limit)

# ============================================================================
# Heritage Institution Type Mappings (from CH-Annotator v1.7.0)
# ============================================================================

# Wikidata P31 types that indicate heritage institutions (GRP.HER).
# Only the key set drives validation; the values are human-readable labels
# used in messages and prompts.
#
# NOTE(review): the original mapping listed Q1007870 twice ("natural history
# museum", then "historical archive") and Q1060829 twice ("open-air museum",
# then "art gallery"). Python keeps only the LAST value for a duplicated
# key, so the last-wins labels are retained here and the shadowed entries
# removed. The labels for these two QIDs should be verified against Wikidata.
HERITAGE_P31_TYPES = {
    "Q33506": "museum",
    "Q207694": "art museum",
    "Q17431399": "national museum",
    "Q2772772": "history museum",
    "Q1007870": "historical archive",  # TODO confirm label on Wikidata
    "Q15243209": "science museum",
    "Q1060829": "art gallery",  # TODO confirm label on Wikidata
    "Q16735822": "ethnographic museum",
    "Q2598870": "archaeological museum",
    "Q2889680": "military museum",
    "Q210272": "library",
    "Q7075": "library",
    "Q856234": "national library",
    "Q28564": "public library",
    "Q7298645": "research library",
    "Q166118": "archive",
    "Q473972": "national archives",
    "Q2668072": "state archive",
    "Q1030034": "municipal archive",
    "Q207628": "musical ensemble",  # May have archives
    "Q3152824": "cultural institution",
    "Q1137809": "botanical garden",
    "Q43229": "organization",  # Too generic, needs LLM verification
    "Q327333": "government agency",  # May be heritage-related
    "Q16917": "hospital",  # May have medical archives
}

# Wikidata P31 types that are definitely NOT heritage institutions.
NON_HERITAGE_P31_TYPES = {
    "Q5": "human",
    "Q515": "city",
    "Q486972": "human settlement",
    "Q532": "village",
    "Q3957": "town",
    "Q15284": "municipality",
    "Q1115575": "populated place",
    "Q7930989": "urban area",
    "Q41176": "building",  # Just a building, not institution
    "Q811979": "architectural structure",
    "Q4989906": "monument",  # Physical monument, not institution
    "Q571": "book",
    "Q11424": "film",
    "Q7889": "video game",
    "Q215380": "musical group",  # Music bands, not heritage
    "Q5398426": "television series",
    "Q1344": "opera",
    "Q35127": "website",
    "Q4830453": "business",
    "Q783794": "company",
    "Q891723": "public company",
    "Q6881511": "enterprise",
}

# Expected inception date range (heritage institutions founded after year 500)
MIN_INCEPTION_YEAR = 500
MAX_INCEPTION_YEAR = datetime.now().year


# ============================================================================
# Validation Results
# ============================================================================

@dataclass
class ValidationIssue:
    """A single validation issue found in a custodian record."""

    issue_type: str  # entity_type_mismatch, name_mismatch, temporal_invalid, location_mismatch
    severity: str  # error, warning, info
    message: str
    expected: Optional[str] = None
    actual: Optional[str] = None
    wikidata_field: Optional[str] = None
    suggestion: Optional[str] = None


@dataclass
class ValidationResult:
    """Validation result for a single custodian file."""

    file_path: str
    custodian_name: str
    wikidata_id: Optional[str]
    is_valid: bool
    issues: List[ValidationIssue] = field(default_factory=list)
    verification_method: str = "heuristic"
    verified_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())


# ============================================================================
# Validation Logic
# ============================================================================

def extract_wikidata_info(data: Dict) -> Optional[Dict]:
    """Extract Wikidata enrichment info from custodian data.

    Returns None when the record carries no ``wikidata_enrichment`` mapping;
    otherwise a normalized dict with defaulted fields so downstream
    validators never see missing keys.
    """
    wd = data.get("wikidata_enrichment", {})
    if not wd:
        return None
    return {
        "entity_id": wd.get("wikidata_entity_id"),
        "label_en": wd.get("wikidata_label_en", ""),
        "label_nl": wd.get("wikidata_label_nl", ""),
        "description_en": wd.get("wikidata_description_en", ""),
        "instance_of": wd.get("wikidata_instance_of", []),
        "inception": wd.get("wikidata_inception"),
        "located_in": wd.get("wikidata_located_in", {}),
        "country": wd.get("wikidata_country", {}),
        "labels": wd.get("wikidata_labels", {}),
    }


def validate_entity_types(
    wd_info: Dict,
    custodian_name: str,
) -> List[ValidationIssue]:
    """Validate that Wikidata P31 types indicate a heritage institution.

    Emits an error when the entity matches only non-heritage types, a
    warning when no known type matches, and nothing when at least one
    heritage type is present.
    """
    issues = []

    instance_of = wd_info.get("instance_of", [])
    if not instance_of:
        issues.append(ValidationIssue(
            issue_type="entity_type_missing",
            severity="warning",
            message="No instance_of (P31) data in Wikidata enrichment",
            wikidata_field="wikidata_instance_of",
        ))
        return issues

    # Extract QIDs from instance_of list; entries may be dicts ({"id": ...})
    # or plain QID strings.
    qids = set()
    for item in instance_of:
        if isinstance(item, dict):
            qid = item.get("id", "")
            if qid:
                qids.add(qid)
        elif isinstance(item, str):
            qids.add(item)

    # Check for heritage types
    heritage_matches = qids & set(HERITAGE_P31_TYPES.keys())
    non_heritage_matches = qids & set(NON_HERITAGE_P31_TYPES.keys())

    if non_heritage_matches and not heritage_matches:
        # Definite mismatch - this is NOT a heritage institution
        non_heritage_labels = [NON_HERITAGE_P31_TYPES.get(q, q) for q in non_heritage_matches]
        issues.append(ValidationIssue(
            issue_type="entity_type_mismatch",
            severity="error",
            message=f"Wikidata entity appears to be: {', '.join(non_heritage_labels)}",
            expected="Heritage institution (museum, library, archive, gallery)",
            actual=", ".join(non_heritage_labels),
            wikidata_field="wikidata_instance_of",
            suggestion="Review Wikidata link - may be wrong entity or entity needs verification",
        ))
    elif not heritage_matches and not non_heritage_matches:
        # Unknown types - needs review
        unknown_qids = list(qids)[:5]  # First 5 for brevity
        issues.append(ValidationIssue(
            issue_type="entity_type_unknown",
            severity="warning",
            message=f"Unknown P31 types: {', '.join(unknown_qids)}",
            wikidata_field="wikidata_instance_of",
            suggestion="Verify entity types manually or with LLM",
        ))

    return issues


def validate_inception_date(
    wd_info: Dict,
    custodian_name: str,
) -> List[ValidationIssue]:
    """Validate that the inception date is plausible.

    Accepts either an int year or a date string like "1850-01-01" / "1850"
    (a leading "-" marks a BCE year). Missing inception is not an issue.
    """
    issues = []

    inception = wd_info.get("inception")
    if not inception:
        # Missing inception is not an error, just informational
        return issues

    # Parse inception date
    try:
        # Handle various date formats
        if isinstance(inception, str):
            # Extract year from date string like "1850-01-01" or "1850"
            year_match = re.match(r'^-?(\d{4})', inception)
            if year_match:
                year = int(year_match.group(1))
                if inception.startswith('-'):
                    year = -year
            else:
                issues.append(ValidationIssue(
                    issue_type="temporal_parse_error",
                    severity="warning",
                    message=f"Could not parse inception date: {inception}",
                    wikidata_field="wikidata_inception",
                ))
                return issues
        else:
            year = int(inception)

        # Validate year range
        if year > MAX_INCEPTION_YEAR:
            issues.append(ValidationIssue(
                issue_type="temporal_future",
                severity="error",
                message=f"Inception date is in the future: {year}",
                expected=f"Year <= {MAX_INCEPTION_YEAR}",
                actual=str(year),
                wikidata_field="wikidata_inception",
                suggestion="Check Wikidata - inception date may be wrong",
            ))
        elif year < MIN_INCEPTION_YEAR:
            issues.append(ValidationIssue(
                issue_type="temporal_implausible",
                severity="warning",
                message=f"Inception date seems very old for modern institution: {year}",
                expected=f"Year >= {MIN_INCEPTION_YEAR}",
                actual=str(year),
                wikidata_field="wikidata_inception",
                suggestion="Verify - may be founding date of predecessor organization",
            ))
    except (ValueError, TypeError) as e:
        issues.append(ValidationIssue(
            issue_type="temporal_parse_error",
            severity="warning",
            message=f"Error parsing inception: {e}",
            wikidata_field="wikidata_inception",
        ))

    return issues


def validate_name_match(
    wd_info: Dict,
    custodian_name: str,
    file_path: str,
) -> List[ValidationIssue]:
    """Validate that a Wikidata label reasonably matches the custodian name."""
    issues = []

    wd_label_en = wd_info.get("label_en", "")
    wd_labels = wd_info.get("labels", {})

    if not wd_label_en and not wd_labels:
        issues.append(ValidationIssue(
            issue_type="name_missing",
            severity="warning",
            message="No Wikidata labels found",
            wikidata_field="wikidata_labels",
        ))
        return issues

    # Collect all labels for comparison
    all_labels = []
    if wd_label_en:
        all_labels.append(wd_label_en.lower())
    for lang, label in wd_labels.items():
        if label:
            all_labels.append(label.lower())

    # Normalize custodian name
    name_lower = custodian_name.lower()

    # Check for any reasonable match
    # A match is found if:
    # 1. Exact match
    # 2. One contains the other
    # 3. Significant word overlap (>50%)
    has_match = False
    for label in all_labels:
        if name_lower == label:
            has_match = True
            break
        if name_lower in label or label in name_lower:
            has_match = True
            break
        # Word overlap check
        name_words = set(re.findall(r'\w+', name_lower))
        label_words = set(re.findall(r'\w+', label))
        if name_words and label_words:
            overlap = len(name_words & label_words)
            max_words = max(len(name_words), len(label_words))
            if overlap / max_words >= 0.5:
                has_match = True
                break

    if not has_match:
        issues.append(ValidationIssue(
            issue_type="name_mismatch",
            severity="warning",
            message="Custodian name doesn't match Wikidata labels",
            expected=custodian_name,
            actual=wd_label_en or str(list(wd_labels.values())[:3]),
            wikidata_field="wikidata_labels",
            suggestion="Verify Wikidata entity is correct match",
        ))

    return issues


def validate_custodian_file(file_path: Path, dry_run: bool = False) -> ValidationResult:
    """Validate a single custodian YAML file.

    All heuristic validators are run; the result is invalid only if at
    least one error-severity issue was found. ``dry_run`` is accepted for
    interface compatibility but heuristic validation performs no API calls.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)

        if not data:
            return ValidationResult(
                file_path=str(file_path),
                custodian_name="",
                wikidata_id=None,
                is_valid=False,
                issues=[ValidationIssue(
                    issue_type="file_empty",
                    severity="error",
                    message="Empty YAML file",
                )],
            )

        # Extract custodian name; fall back through alternate schema
        # locations, then to the file stem.
        # NOTE(review): assumes "custodian_name" is a mapping with an
        # "emic_name" key - a plain-string value would raise here.
        custodian_name = data.get("custodian_name", {}).get("emic_name", "")
        if not custodian_name:
            custodian_name = data.get("name", data.get("original_entry", {}).get("name", file_path.stem))

        # Extract Wikidata info
        wd_info = extract_wikidata_info(data)
        if not wd_info or not wd_info.get("entity_id"):
            return ValidationResult(
                file_path=str(file_path),
                custodian_name=custodian_name,
                wikidata_id=None,
                is_valid=True,  # No Wikidata = nothing to validate
                issues=[ValidationIssue(
                    issue_type="no_wikidata",
                    severity="info",
                    message="No Wikidata enrichment to validate",
                )],
            )

        wikidata_id = wd_info["entity_id"]
        all_issues = []

        # Run validations
        all_issues.extend(validate_entity_types(wd_info, custodian_name))
        all_issues.extend(validate_inception_date(wd_info, custodian_name))
        all_issues.extend(validate_name_match(wd_info, custodian_name, str(file_path)))

        # Determine overall validity
        has_errors = any(issue.severity == "error" for issue in all_issues)

        return ValidationResult(
            file_path=str(file_path),
            custodian_name=custodian_name,
            wikidata_id=wikidata_id,
            is_valid=not has_errors,
            issues=all_issues,
            verification_method="heuristic",
        )

    except Exception as e:
        return ValidationResult(
            file_path=str(file_path),
            custodian_name="",
            wikidata_id=None,
            is_valid=False,
            issues=[ValidationIssue(
                issue_type="file_error",
                severity="error",
                message=f"Error reading file: {e}",
            )],
        )


# ============================================================================
# GLM-4.6 Verification for Ambiguous Cases
# ============================================================================

class GLMValidator:
    """Use GLM-4.6 to verify ambiguous Wikidata matches."""

    VERIFICATION_PROMPT = """You are a heritage institution validator following CH-Annotator v1.7.0 convention.

Your task is to determine if a Wikidata entity correctly represents a heritage institution.

## CH-Annotator GRP.HER Definition

Heritage institutions (GRP.HER) are organizations that:
- Collect, preserve, and provide access to cultural heritage materials
- Include: museums (MUS), libraries (LIB), archives (ARC), galleries (GAL)
- Also includes: research centers with collections, botanical gardens, educational institutions WITH heritage collections

## Validation Task

Analyze if the Wikidata entity matches the expected custodian and is actually a heritage institution.

## Custodian Record

Name: {custodian_name}
Location: {custodian_location}
Expected type: Heritage institution

## Wikidata Entity

QID: {wikidata_id}
Label: {wd_label}
Description: {wd_description}
Instance of (P31): {p31_types}
Inception: {inception}

## Validation Issues Found

{issues_summary}

## Respond in JSON format:

```json
{{
  "is_correct_match": true/false,
  "is_heritage_institution": true/false,
  "heritage_subtype": "MUS|LIB|ARC|GAL|RES|BOT|EDU|OTHER|null",
  "confidence": 0.95,
  "issues_assessment": [
    {{
      "issue_type": "...",
      "valid_concern": true/false,
      "explanation": "..."
    }}
  ],
  "recommendation": "accept|reject|needs_review",
  "reasoning": "Brief explanation"
}}
```"""

    def __init__(self):
        self.api_key = ZAI_API_TOKEN
        if not self.api_key:
            raise ValueError("ZAI_API_TOKEN not set in environment")
        self.client = httpx.AsyncClient(
            timeout=60.0,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            }
        )

    async def verify_ambiguous_case(
        self,
        result: ValidationResult,
        wd_info: Dict,
        custodian_location: str = "",
    ) -> Dict[str, Any]:
        """Use GLM-4.6 to verify an ambiguous validation result.

        Returns the model's parsed JSON verdict, or a needs_review stub
        if the request or JSON parsing fails.
        """
        # Format issues for prompt
        issues_summary = "\n".join([
            f"- {issue.issue_type}: {issue.message}"
            for issue in result.issues
        ]) or "None"

        # Format P31 types; label each QID from the known mappings where
        # possible, falling back to the bare QID.
        p31_list = []
        for item in wd_info.get("instance_of", []):
            if isinstance(item, dict):
                qid = item.get("id", "")
                label = HERITAGE_P31_TYPES.get(qid, NON_HERITAGE_P31_TYPES.get(qid, qid))
                p31_list.append(f"{qid} ({label})")
            elif isinstance(item, str):
                label = HERITAGE_P31_TYPES.get(item, NON_HERITAGE_P31_TYPES.get(item, item))
                p31_list.append(f"{item} ({label})")

        prompt = self.VERIFICATION_PROMPT.format(
            custodian_name=result.custodian_name,
            custodian_location=custodian_location,
            wikidata_id=result.wikidata_id,
            wd_label=wd_info.get("label_en", ""),
            wd_description=wd_info.get("description_en", ""),
            p31_types=", ".join(p31_list) if p31_list else "None",
            inception=wd_info.get("inception", "Unknown"),
            issues_summary=issues_summary,
        )

        try:
            response = await self.client.post(
                ZAI_API_URL,
                json={
                    "model": ZAI_MODEL,
                    "messages": [
                        {"role": "system", "content": "You are a heritage institution validator. Respond only in valid JSON."},
                        {"role": "user", "content": prompt},
                    ],
                    "temperature": 0.1,
                    "max_tokens": 1024,
                }
            )
            response.raise_for_status()
            data = response.json()
            content = data.get("choices", [{}])[0].get("message", {}).get("content", "")

            # Parse JSON from response: strip a markdown code fence if
            # present, then fall back to the outermost {...} span.
            if "```json" in content:
                content = content.split("```json")[1].split("```")[0]
            elif "```" in content:
                content = content.split("```")[1].split("```")[0]

            content = content.strip()
            if not content.startswith("{"):
                start_idx = content.find("{")
                if start_idx != -1:
                    end_idx = content.rfind("}")
                    if end_idx != -1:
                        content = content[start_idx:end_idx + 1]

            verification = json.loads(content)
            verification["verification_method"] = "glm_4.6_ch_annotator"
            return verification

        except Exception as e:
            logger.error(f"GLM verification error: {e}")
            return {
                "is_correct_match": None,
                "is_heritage_institution": None,
                "confidence": 0.0,
                "recommendation": "needs_review",
                "reasoning": f"Verification failed: {e}",
                "verification_method": "glm_4.6_error",
            }

    async def close(self):
        """Release the underlying HTTP client."""
        await self.client.aclose()


# ============================================================================
# Main Processing
# ============================================================================

async def process_files(
    files: List[Path],
    use_llm: bool = False,
    dry_run: bool = False,
) -> List[ValidationResult]:
    """Process multiple custodian files for validation.

    Heuristic validation always runs; when ``use_llm`` is set (and the
    token is configured), files with error-level issues get a second
    opinion from GLM-4.6, which may flip them to valid.
    """
    results = []

    llm_validator = None
    if use_llm and ZAI_API_TOKEN:
        try:
            llm_validator = GLMValidator()
        except ValueError as e:
            logger.warning(f"LLM validation disabled: {e}")

    try:
        for i, file_path in enumerate(files):
            logger.info(f"[{i+1}/{len(files)}] Validating {file_path.name}")

            result = validate_custodian_file(file_path, dry_run)

            # For files with warnings/errors and LLM enabled, do additional
            # verification
            if llm_validator and result.issues and not result.is_valid:
                # Load file again to get full wd_info
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = yaml.safe_load(f)
                wd_info = extract_wikidata_info(data) or {}

                if not dry_run:
                    verification = await llm_validator.verify_ambiguous_case(result, wd_info)
                    result.verification_method = verification.get("verification_method", "glm_4.6")

                    # Update validity based on LLM verification
                    if verification.get("recommendation") == "accept":
                        result.is_valid = True

                    # Add LLM reasoning to issues
                    result.issues.append(ValidationIssue(
                        issue_type="llm_verification",
                        severity="info",
                        message=f"GLM-4.6: {verification.get('reasoning', 'No reasoning')}",
                        suggestion=f"Recommendation: {verification.get('recommendation', 'unknown')}",
                    ))

                    # Rate-limit only after an actual API call
                    await asyncio.sleep(REQUEST_DELAY)

            results.append(result)
    finally:
        if llm_validator:
            await llm_validator.close()

    return results


def generate_report(results: List[ValidationResult]) -> Dict:
    """Generate a validation report (JSON-serializable dict) from results."""
    report = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "total_files": len(results),
        "valid_count": sum(1 for r in results if r.is_valid),
        "invalid_count": sum(1 for r in results if not r.is_valid),
        "issues_by_type": {},
        "issues_by_severity": {"error": 0, "warning": 0, "info": 0},
        "files_with_issues": [],
    }

    # Aggregate issues
    for result in results:
        for issue in result.issues:
            # Count by type
            if issue.issue_type not in report["issues_by_type"]:
                report["issues_by_type"][issue.issue_type] = 0
            report["issues_by_type"][issue.issue_type] += 1
            # Count by severity
            report["issues_by_severity"][issue.severity] += 1

        # Track files with issues (info-only files are excluded)
        if result.issues and any(i.severity in ("error", "warning") for i in result.issues):
            report["files_with_issues"].append({
                "file": result.file_path,
                "name": result.custodian_name,
                "wikidata_id": result.wikidata_id,
                "is_valid": result.is_valid,
                "issues": [
                    {
                        "type": i.issue_type,
                        "severity": i.severity,
                        "message": i.message,
                        "suggestion": i.suggestion,
                    }
                    for i in result.issues
                ],
            })

    return report


def main():
    """CLI entry point: select files, validate, and write the JSON report."""
    parser = argparse.ArgumentParser(description="Validate Wikidata enrichments using CH-Annotator")
    parser.add_argument("--country", type=str, help="Only validate files for country code XX (e.g., JP, CZ)")
    parser.add_argument("--limit", type=int, default=0, help="Process only first N files (0 = no limit)")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be validated without API calls")
    parser.add_argument("--use-llm", action="store_true", help="Use GLM-4.6 for ambiguous case verification")
    parser.add_argument("--report", type=str, default="validation_report.json", help="Output report filename")
    args = parser.parse_args()

    # Find files to process
    pattern = f"{args.country}-*.yaml" if args.country else "*.yaml"
    yaml_files = sorted(CUSTODIAN_DIR.glob(pattern))
    logger.info(f"Found {len(yaml_files)} YAML files in {CUSTODIAN_DIR}")

    if args.limit > 0:
        yaml_files = yaml_files[:args.limit]
        logger.info(f"Limited to first {args.limit} files")

    if args.dry_run:
        logger.info("DRY RUN - No API calls will be made")

    if args.use_llm:
        if ZAI_API_TOKEN:
            logger.info("LLM verification enabled (GLM-4.6)")
        else:
            logger.warning("LLM requested but ZAI_API_TOKEN not set - using heuristics only")

    # Process files
    results = asyncio.run(process_files(yaml_files, use_llm=args.use_llm, dry_run=args.dry_run))

    # Generate report
    report = generate_report(results)

    # Ensure report directory exists
    REPORT_DIR.mkdir(parents=True, exist_ok=True)
    report_path = REPORT_DIR / args.report

    # ensure_ascii=False keeps non-ASCII custodian names (JP, CZ, ...)
    # human-readable in the report.
    with open(report_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    # Print summary
    logger.info("\n" + "=" * 60)
    logger.info("VALIDATION SUMMARY")
    logger.info("=" * 60)
    logger.info(f"Total files validated: {report['total_files']}")
    logger.info(f"Valid: {report['valid_count']}")
    logger.info(f"Invalid: {report['invalid_count']}")
    logger.info("\nIssues by severity:")
    for severity, count in report["issues_by_severity"].items():
        logger.info(f"  {severity}: {count}")
    logger.info("\nIssues by type:")
    for issue_type, count in sorted(report["issues_by_type"].items(), key=lambda x: -x[1]):
        logger.info(f"  {issue_type}: {count}")
    logger.info(f"\nReport saved to: {report_path}")
    logger.info("=" * 60)


if __name__ == "__main__":
    main()