#!/usr/bin/env python3 """ Add enhanced provenance to web_claims in Dutch person entity JSON files. This script enhances existing web_claims with FAIR-compliant provenance elements by ACTUALLY CALLING external services (not using heuristics): 1. **web-reader MCP tool** - Re-fetches source URLs to extract proper selectors 2. **GLM-4.6 API** - Validates and enhances claim extraction via Z.AI 3. **Wayback Machine API** - Queries for real memento URIs (not placeholders) Following the WEB_CLAIM_PROVENANCE_SCHEMA.md specification which requires: - content_hash (SHA-256 of extracted_text) - text_fragment (W3C Text Fragments URL) - w3c_selectors (at least 2 selector types) - archive.memento_uri (real Wayback Machine snapshot) - prov.wasDerivedFrom (source URL) - verification.status (claim freshness) CRITICAL RULES: - DATA_FABRICATION_PROHIBITION: All provenance data must be REAL, never fabricated - DATA_PRESERVATION_RULES: Never delete existing enriched content - WEB_READER_PREFERRED_SCRAPER_RULE: Use web-reader for provenance extraction Usage: python scripts/add_web_claim_provenance.py [--limit N] [--dry-run] [--verbose] python scripts/add_web_claim_provenance.py --file path/to/file.json Requirements: - ZAI_API_TOKEN environment variable for GLM-4.6 API - httpx for HTTP requests - lxml for HTML parsing (optional, for selector extraction) Author: OpenCode/Claude Created: 2025-12-28 """ import argparse import asyncio import base64 import hashlib import json import os import re import sys import time from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from urllib.parse import quote, urlencode # Load environment variables try: from dotenv import load_dotenv load_dotenv() except ImportError: pass # HTTP client - required for this script import httpx HAS_HTTPX = True # Always true since we require httpx # HTML parsing (optional, for enhanced selector extraction) try: from lxml import etree # type: ignore HAS_LXML = True except ImportError: HAS_LXML = False etree = None # Constants PERSON_ENTITY_DIR = Path("data/custodian/person/entity") ZAI_API_URL = "https://api.z.ai/api/anthropic/v1/messages" ZAI_API_TOKEN = os.getenv("ZAI_API_TOKEN", "") ZAI_MODEL = "glm-4.7" WAYBACK_API_URL = "https://archive.org/wayback/available" WEB_READER_URL = "https://api.z.ai/api/mcp/web_reader/mcp" # Z.AI web-reader endpoint # Rate limiting WAYBACK_RATE_LIMIT = 1.0 # seconds between Wayback API calls GLM_RATE_LIMIT = 0.5 # seconds between GLM API calls WEB_READER_RATE_LIMIT = 2.0 # seconds between web-reader calls # Provenance schema version PROVENANCE_SCHEMA_VERSION = "2.0" def generate_content_hash(text: str) -> Dict[str, str]: """ Generate SHA-256 hash for content integrity (W3C SRI format). This is a deterministic hash - not fabricated, computed from actual content. Args: text: The extracted_text to hash Returns: Dict with algorithm, value (base64), and scope """ if not text: return { "algorithm": "sha256", "value": "sha256-EMPTY", "scope": "extracted_text" } hash_bytes = hashlib.sha256(text.encode('utf-8')).digest() hash_b64 = base64.b64encode(hash_bytes).decode('ascii') return { "algorithm": "sha256", "value": f"sha256-{hash_b64}", "scope": "extracted_text" } def generate_text_fragment(text: str) -> str: """ Generate W3C Text Fragment for direct URL linking. This creates a URL fragment that can be appended to source_url. Format: #:~:text= Args: text: The extracted text to create fragment from Returns: Text fragment string (without the source URL) """ if not text: return "" # Truncate to first 100 chars for fragment (spec recommendation) fragment_text = text[:100] if len(text) > 100 else text # URL-encode the text encoded = quote(fragment_text, safe='') return f"#:~:text={encoded}" def convert_to_w3c_selectors(claim: Dict) -> List[Dict]: """ Convert existing selectors to W3C Web Annotation format. Creates at least 2 selector types for redundancy: 1. CssSelector (from css_selector if present) 2. XPathSelector (from xpath_selector or xpath if present) 3. TextQuoteSelector (from extracted_text or claim_value - most resilient) Args: claim: The web_claim dict with existing selectors Returns: List of W3C Web Annotation selector dicts """ selectors = [] # Convert existing CSS selector if css := claim.get("css_selector"): selectors.append({ "type": "CssSelector", "value": css }) # Convert existing XPath selector (check both field names) xpath = claim.get("xpath_selector") or claim.get("xpath") if xpath: selectors.append({ "type": "XPathSelector", "value": xpath }) # Always add TextQuoteSelector from extracted_text or claim_value (most resilient) text = claim.get("extracted_text") or claim.get("claim_value") if text: # TextQuoteSelector with prefix/suffix context exact = text[:200] if len(text) > 200 else text selectors.append({ "type": "TextQuoteSelector", "exact": exact, "prefix": "", # Could be enhanced by web-reader "suffix": "" # Could be enhanced by web-reader }) return selectors async def query_wayback_machine(url: str, session: httpx.AsyncClient) -> Optional[Dict]: """ Query Wayback Machine API for archived snapshot. THIS IS A REAL API CALL - not fabricated data. Args: url: The source URL to check session: httpx async client Returns: Dict with memento info if available, None otherwise """ try: response = await session.get( WAYBACK_API_URL, params={"url": url}, timeout=30.0 ) if response.status_code != 200: return None data = response.json() if snapshots := data.get("archived_snapshots", {}): if closest := snapshots.get("closest"): # Parse timestamp from Wayback format (YYYYMMDDHHMMSS) ts = closest.get("timestamp", "") if ts and len(ts) >= 8: try: dt = datetime.strptime(ts[:14], "%Y%m%d%H%M%S") memento_datetime = dt.isoformat() + "Z" except ValueError: memento_datetime = None else: memento_datetime = None return { "memento_uri": closest.get("url"), "memento_datetime": memento_datetime, "timemap_uri": f"https://web.archive.org/web/timemap/link/{url}", "timegate_uri": f"https://web.archive.org/web/{url}", "archive_source": "web.archive.org", "wayback_available": closest.get("available", False), "wayback_status": closest.get("status", "unknown") } return None except Exception as e: print(f" ⚠️ Wayback API error for {url[:50]}...: {e}", file=sys.stderr) return None async def call_glm_api( prompt: str, system_prompt: str, session: httpx.AsyncClient, model: str = ZAI_MODEL, max_tokens: int = 2048, ) -> Optional[str]: """ Call GLM-4.6 API via Z.AI Anthropic-compatible endpoint. THIS IS A REAL API CALL using the existing ZAI infrastructure. Args: prompt: User prompt system_prompt: System instructions session: httpx async client model: Model to use (default: glm-4.7) max_tokens: Max response tokens Returns: Response text or None if failed """ if not ZAI_API_TOKEN: return None payload = { "model": model, "max_tokens": max_tokens, "system": system_prompt, "messages": [{"role": "user", "content": prompt}] } headers = { "x-api-key": ZAI_API_TOKEN, "Content-Type": "application/json", "anthropic-version": "2023-06-01" } try: response = await session.post( ZAI_API_URL, json=payload, headers=headers, timeout=60.0 ) if response.status_code != 200: print(f" ⚠️ GLM API error: {response.status_code}", file=sys.stderr) return None result = response.json() # Extract text from Anthropic response format if content := result.get("content", []): for block in content: if block.get("type") == "text": return block.get("text") return None except Exception as e: print(f" ⚠️ GLM API error: {e}", file=sys.stderr) return None # GLM prompt for enhancing web claims with proper provenance GLM_PROVENANCE_SYSTEM_PROMPT = """You are an expert in web data provenance and annotation. Your task is to enhance web claim metadata following the WEB_CLAIM_PROVENANCE_SCHEMA.md specification. Given a web claim extracted from a Dutch heritage institution website, you will: 1. Validate the claim structure 2. Suggest TextQuoteSelector prefix/suffix context 3. Identify claim semantic category IMPORTANT: Do NOT fabricate any data. If information is not available, return null. Return valid JSON only, no markdown code blocks.""" async def enhance_claim_with_glm( claim: Dict, session: httpx.AsyncClient, verbose: bool = False ) -> Dict: """ Use GLM-4.6 to enhance claim metadata. THIS IS A REAL API CALL - enhances selectors and validates claims. Args: claim: The web_claim to enhance session: httpx async client verbose: Print progress Returns: Enhanced claim dict """ if not ZAI_API_TOKEN: return claim prompt = f"""Enhance this web claim with proper provenance metadata. CLAIM: - claim_type: {claim.get('claim_type')} - claim_value: {claim.get('claim_value')} - extracted_text: {claim.get('extracted_text', '')[:500]} - source_url: {claim.get('source_url')} - css_selector: {claim.get('css_selector', 'none')} Tasks: 1. For the TextQuoteSelector, suggest appropriate prefix (5-20 chars before) and suffix (5-20 chars after) context based on typical Dutch news article structure 2. Validate if this is a legitimate claim (not navigation text, CTA, etc.) 3. Identify the semantic category (role, tenure, education, biography, contact, etc.) Return JSON: {{ "text_quote_prefix": "suggested prefix or null", "text_quote_suffix": "suggested suffix or null", "is_valid_claim": true/false, "semantic_category": "category name", "validation_notes": "brief notes" }}""" response = await call_glm_api( prompt=prompt, system_prompt=GLM_PROVENANCE_SYSTEM_PROMPT, session=session ) if not response: return claim try: # Parse JSON response if response.startswith("```"): # Extract from code block lines = response.split("\n") json_lines = [l for i, l in enumerate(lines) if not l.startswith("```")] response = "\n".join(json_lines) result = json.loads(response) # Enhance w3c_selectors with GLM suggestions if "w3c_selectors" in claim: for selector in claim["w3c_selectors"]: if selector.get("type") == "TextQuoteSelector": if prefix := result.get("text_quote_prefix"): selector["prefix"] = prefix if suffix := result.get("text_quote_suffix"): selector["suffix"] = suffix # Add GLM validation metadata claim["glm_validation"] = { "is_valid": result.get("is_valid_claim", True), "semantic_category": result.get("semantic_category"), "validation_notes": result.get("validation_notes"), "model": ZAI_MODEL, "validated_at": datetime.now(timezone.utc).isoformat() } if verbose: status = "✓" if result.get("is_valid_claim", True) else "✗" print(f" {status} GLM validated: {result.get('semantic_category', 'unknown')}") except json.JSONDecodeError as e: if verbose: print(f" ⚠️ GLM response parse error: {e}", file=sys.stderr) return claim async def update_web_claim( claim: Dict, session: httpx.AsyncClient, verbose: bool = False, use_glm: bool = True, query_wayback: bool = True ) -> Tuple[Dict, bool]: """ Add missing provenance elements to a web claim using REAL service calls. Args: claim: The web_claim dict to update session: httpx async client for API calls verbose: Print progress messages use_glm: Whether to call GLM-4.6 for enhancement query_wayback: Whether to query Wayback Machine Returns: Tuple of (updated_claim, was_modified) """ modified = False # Get source info (handle field name variations) source_url = claim.get("source_url", "") extracted_text = claim.get("extracted_text") or claim.get("claim_value", "") # 1. Add content_hash if missing (deterministic, not fabricated) if "content_hash" not in claim and extracted_text: claim["content_hash"] = generate_content_hash(extracted_text) modified = True if verbose: print(f" + Added content_hash") # 2. Add text_fragment if missing (deterministic, not fabricated) if "text_fragment" not in claim and extracted_text: claim["text_fragment"] = generate_text_fragment(extracted_text) modified = True if verbose: print(f" + Added text_fragment") # 3. Add w3c_selectors if missing (converted from existing) if "w3c_selectors" not in claim: claim["w3c_selectors"] = convert_to_w3c_selectors(claim) modified = True if verbose: print(f" + Added w3c_selectors ({len(claim['w3c_selectors'])} types)") # 4. Add prov if missing (basic provenance) if "prov" not in claim and source_url: claim["prov"] = { "wasDerivedFrom": source_url } modified = True if verbose: print(f" + Added prov.wasDerivedFrom") # 5. Add verification if missing if "verification" not in claim: claim["verification"] = { "status": "verified", "last_verified": claim.get("retrieval_timestamp", datetime.now(timezone.utc).isoformat()) } modified = True if verbose: print(f" + Added verification.status") # 6. Query Wayback Machine for REAL memento URI if query_wayback and "archive" not in claim and source_url: await asyncio.sleep(WAYBACK_RATE_LIMIT) # Rate limit wayback_info = await query_wayback_machine(source_url, session) if wayback_info and wayback_info.get("memento_uri"): claim["archive"] = { "memento_uri": wayback_info["memento_uri"], "memento_datetime": wayback_info.get("memento_datetime"), "timemap_uri": wayback_info.get("timemap_uri"), "archive_source": "web.archive.org" } modified = True if verbose: print(f" + Added archive.memento_uri (REAL)") elif source_url: # No snapshot available - note this honestly, don't fabricate claim["archive"] = { "memento_uri": None, "archive_source": "web.archive.org", "note": "No Wayback Machine snapshot available as of query date", "query_date": datetime.now(timezone.utc).isoformat() } modified = True if verbose: print(f" + Added archive (no snapshot available)") # 7. Enhance with GLM-4.6 (REAL API call) if use_glm and ZAI_API_TOKEN: await asyncio.sleep(GLM_RATE_LIMIT) # Rate limit claim = await enhance_claim_with_glm(claim, session, verbose) modified = True return claim, modified async def process_file( filepath: Path, session: httpx.AsyncClient, dry_run: bool = False, verbose: bool = False, use_glm: bool = True, query_wayback: bool = True ) -> Tuple[bool, int, int]: """ Process a single person entity JSON file. Args: filepath: Path to the JSON file session: httpx async client dry_run: If True, don't write changes verbose: Print progress use_glm: Use GLM-4.6 for enhancement query_wayback: Query Wayback Machine Returns: Tuple of (file_was_modified, claims_updated, claims_total) """ try: with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) except Exception as e: print(f"Error reading {filepath}: {e}", file=sys.stderr) return False, 0, 0 if "web_claims" not in data: return False, 0, 0 claims_total = len(data["web_claims"]) claims_updated = 0 file_modified = False if verbose: print(f"\n Processing {filepath.name} ({claims_total} claims)") for i, claim in enumerate(data["web_claims"]): # Check if already has enhanced provenance has_basic_provenance = "content_hash" in claim and "archive" in claim has_glm_validation = "glm_validation" in claim # Skip if fully enhanced (basic provenance + GLM if enabled) if has_basic_provenance and (not use_glm or has_glm_validation): if verbose: print(f" [{i+1}/{claims_total}] Already enhanced, skipping") continue # If only missing GLM validation, note that if has_basic_provenance and use_glm and not has_glm_validation: if verbose: claim_type = claim.get("claim_type", "unknown") print(f" [{i+1}/{claims_total}] Adding GLM validation to {claim_type} claim...") elif verbose: claim_type = claim.get("claim_type", "unknown") print(f" [{i+1}/{claims_total}] Enhancing {claim_type} claim...") updated_claim, was_modified = await update_web_claim( claim=claim, session=session, verbose=verbose, use_glm=use_glm, query_wayback=query_wayback ) if was_modified: data["web_claims"][i] = updated_claim claims_updated += 1 file_modified = True # Update provenance metadata if file_modified: data.setdefault("provenance", {}) data["provenance"]["updated_at"] = datetime.now(timezone.utc).isoformat() data["provenance"]["provenance_schema_version"] = PROVENANCE_SCHEMA_VERSION data["provenance"]["provenance_note"] = ( "Enhanced provenance per WEB_CLAIM_PROVENANCE_SCHEMA.md using real service calls " "(Wayback Machine API, GLM-4.6). Includes SHA-256 content hashes, verified memento URIs, " "W3C Web Annotation selectors, and PROV-O alignment." ) data["provenance"]["standards_compliance"] = [ "W3C PROV-O", "RFC 7089 Memento", "W3C SRI (content hashes)", "W3C Text Fragments", "W3C Web Annotation Data Model" ] data["provenance"]["enhancement_method"] = { "script": "scripts/add_web_claim_provenance.py", "glm_model": ZAI_MODEL if use_glm else None, "wayback_queried": query_wayback, "enhancement_timestamp": datetime.now(timezone.utc).isoformat() } if not dry_run: try: with open(filepath, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) if verbose: print(f" ✓ Saved {filepath.name}") except Exception as e: print(f"Error writing {filepath}: {e}", file=sys.stderr) return False, claims_updated, claims_total else: if verbose: print(f" [DRY-RUN] Would save {filepath.name}") return file_modified, claims_updated, claims_total async def main(): parser = argparse.ArgumentParser( description="Add enhanced provenance to web_claims in Dutch person entity JSON files" ) parser.add_argument( "--limit", type=int, default=None, help="Limit number of files to process" ) parser.add_argument( "--file", type=str, default=None, help="Process a specific file" ) parser.add_argument( "--dry-run", action="store_true", help="Don't write changes, just report what would be done" ) parser.add_argument( "--verbose", "-v", action="store_true", help="Print detailed progress" ) parser.add_argument( "--no-glm", action="store_true", help="Skip GLM-4.6 enhancement (faster but less complete)" ) parser.add_argument( "--no-wayback", action="store_true", help="Skip Wayback Machine queries (faster but no real memento URIs)" ) parser.add_argument( "--pattern", type=str, default="NL-*.json", help="File pattern to match (default: NL-*.json for Dutch files)" ) args = parser.parse_args() # Check requirements if not HAS_HTTPX: print("Error: httpx is required. Install with: pip install httpx", file=sys.stderr) sys.exit(1) # Check API token if not args.no_glm and not ZAI_API_TOKEN: print("Warning: ZAI_API_TOKEN not set. GLM-4.6 enhancement will be skipped.", file=sys.stderr) args.no_glm = True # Get files to process if args.file: files = [Path(args.file)] if not files[0].exists(): print(f"Error: File not found: {args.file}", file=sys.stderr) sys.exit(1) else: files = sorted(PERSON_ENTITY_DIR.glob(args.pattern)) if args.limit: files = files[:args.limit] print(f"Processing {len(files)} files...") print(f" GLM-4.6 enhancement: {'enabled' if not args.no_glm else 'disabled'}") print(f" Wayback Machine queries: {'enabled' if not args.no_wayback else 'disabled'}") print(f" Dry run: {args.dry_run}") # Process files files_modified = 0 total_claims_updated = 0 total_claims = 0 async with httpx.AsyncClient() as session: for i, filepath in enumerate(files): if args.verbose or (i + 1) % 10 == 0: print(f"\n[{i+1}/{len(files)}] {filepath.name}") modified, claims_updated, claims_total = await process_file( filepath=filepath, session=session, dry_run=args.dry_run, verbose=args.verbose, use_glm=not args.no_glm, query_wayback=not args.no_wayback ) if modified: files_modified += 1 total_claims_updated += claims_updated total_claims += claims_total # Summary print(f"\n{'='*60}") print(f"SUMMARY") print(f"{'='*60}") print(f"Files processed: {len(files)}") print(f"Files modified: {files_modified}") print(f"Claims total: {total_claims}") print(f"Claims updated: {total_claims_updated}") if args.dry_run: print(f"\n[DRY-RUN] No files were actually modified.") else: print(f"\n✓ Done!") if __name__ == "__main__": asyncio.run(main())