#!/usr/bin/env python3
"""
Extract structured claims from archived website HTML with XPath provenance.
This script extracts verifiable data from archived HTML files following
the WebObservation provenance rules defined in AGENTS.md Rule 6.
EVERY claim MUST have:
- claim_type: Type of claim (org_name, description, email, phone, address, etc.)
- claim_value: The extracted value
- source_url: URL the claim was extracted from
- retrieved_on: ISO 8601 timestamp when page was archived
- xpath: XPath to the element containing this value
- html_file: Relative path to archived HTML file
- xpath_match_score: 1.0 for exact match, <1.0 for fuzzy match
Claims WITHOUT XPath provenance are FABRICATED and must NOT be stored.
Usage:
python scripts/extract_html_claims.py [--limit N] [--entry ENTRY_NUM] [--dry-run]
"""
import argparse
import json
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple
from urllib.parse import urlparse

import yaml
# Load environment variables from .env
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass # dotenv is optional, rely on shell environment
# Optional httpx for z.ai API calls
try:
import httpx
HAS_HTTPX = True
except ImportError:
HAS_HTTPX = False
httpx = None # type: ignore
# Type hints for optional dependencies
etree: Any = None
BeautifulSoup: Any = None
try:
from lxml import etree as _etree
etree = _etree
HAS_LXML = True
except ImportError:
HAS_LXML = False
print("Warning: Missing dependency: lxml")
print("Install with: pip install lxml")
try:
from bs4 import BeautifulSoup as _BeautifulSoup
BeautifulSoup = _BeautifulSoup
HAS_BS4 = True
except ImportError:
HAS_BS4 = False
print("Warning: Missing dependency: beautifulsoup4")
print("Install with: pip install beautifulsoup4")
HAS_DEPS = HAS_LXML # Only lxml is required for this script
# Directories.
# The historical default is a hard-coded absolute path; allow overriding it
# with the GLAM_ENTRIES_DIR environment variable so the script is usable on
# other machines/CI without editing the source. Default behavior unchanged.
ENTRIES_DIR = Path(os.getenv('GLAM_ENTRIES_DIR',
                             '/Users/kempersc/apps/glam/data/nde/enriched/entries'))
WEB_DIR = ENTRIES_DIR / 'web'
# Claim types to extract.
# Maps claim_type identifier -> human-readable description. Every claim
# emitted by this script should use one of these keys as its claim_type.
CLAIM_TYPES = {
    'org_name': 'Organization/institution official name',
    'org_name_alt': 'Alternative organization name',
    'tagline': 'Organization tagline or slogan',
    'description': 'Organization description',
    'description_short': 'Short description (meta description)',
    'email': 'Email address',
    'phone': 'Phone number',
    'address': 'Physical address',
    'postal_code': 'Postal code',
    'city': 'City name',
    'opening_hours_text': 'Opening hours as text',
    'social_twitter': 'Twitter/X URL',
    'social_facebook': 'Facebook URL',
    'social_instagram': 'Instagram URL',
    'social_linkedin': 'LinkedIn URL',
    'social_youtube_channel': 'YouTube channel URL (official channel)',
    'social_youtube_video': 'YouTube video URL (individual video, NOT institution channel)',
    'social_tiktok': 'TikTok URL',
    'social_pinterest': 'Pinterest URL',
    # Video embeds
    'video_youtube': 'YouTube video embed (ID or URL)',
    'video_vimeo': 'Vimeo video embed (ID or URL)',
    'video_other': 'Other video embed (Dailymotion, etc.)',
    # Gallery/collection indicators
    'gallery_detected': 'Gallery/slideshow detected on page',
    'collection_page': 'Collection/exhibition page detected',
    'image_count': 'Number of images in gallery container',
    # External marketplace links
    'external_boekwinkeltjes': 'Link to boekwinkeltjes.nl (book sales)',
    # Page elements
    'page_title': 'HTML page title',
    'favicon': 'Favicon URL',
    'logo': 'Logo image URL',
    # Authentication UI elements
    'ui_login': 'Login button/link detected',
    'ui_signup': 'Signup/register button/link detected',
    # Person/role NER claims (PiCO-aligned, extracted via z.ai API)
    # Following Gado2 v1.5.0 annotation conventions with PiCO/PNV ontology
    'person_name': 'Person name (picom:PersonObservation, pnv:literalName)',
    'person_given_name': 'Given/first name (pnv:givenName)',
    'person_family_name': 'Family/surname (pnv:baseSurname)',
    'person_name_prefix': 'Name prefix like van, de (pnv:surnamePrefix)',
    'job_title': 'Job title or professional role (rico:Position, sdo:Occupation)',
    'title_rank': 'Honorific title or rank (rico:Title, DENOMINATION/TITLERANK)',
    'department': 'Department or organizational unit (rico:CorporateBody)',
    'role': 'Functional role in context (picom:Role)',
}
# =============================================================================
# DEFAULT PAGE / HOSTING PROVIDER BLOCKLIST
# =============================================================================
# These patterns indicate a web server default page (not institution content).
# When detected, extraction should be skipped to avoid attributing hosting
# provider social links (e.g., Plesk's YouTube channel) to institutions.
# NOTE(review): presumably matched by substring at the call site — keep
# entries specific enough to avoid false positives on real institution pages.
DEFAULT_PAGE_INDICATORS = [
    # Plesk (common hosting control panel)
    "Web Server's Default Page",
    "Congratulations! Your Plesk is working",
    "This page is used to test the proper operation of",
    "web hosting platform",
    # cPanel
    "Great success! You've configured your",
    "Default Web Site Page",
    "cPanel, Inc.",  # More specific than just "cPanel"
    # DirectAdmin
    "DirectAdmin default page",
    # Apache
    "Apache2 Ubuntu Default Page",
    "Apache2 Debian Default Page",
    "If you can read this page",  # Apache default
    # nginx
    "Welcome to nginx!",
    "If you see this page, the nginx web server is successfully installed",
    # IIS
    "Internet Information Services",
    "IIS Windows Server",
    # Generic hosting defaults
    "Website Coming Soon",
    "Under Construction",
    "Parked Domain",
    "This domain is parked",
    "Domain Parking",
    "This site is parked free",
    # Dutch equivalents
    "Website binnenkort beschikbaar",
    "In aanbouw",
    "Domein geparkeerd",
]
# YouTube channels known to belong to hosting providers (not institutions).
# These should NEVER be attributed to heritage institutions. Contains both
# channel IDs and handle-style names.
BLOCKED_YOUTUBE_CHANNELS = {
    # Plesk
    "UCeU-_6YHGQFcVSHLbEXLNlA",  # Plesk official channel
    "plesk",
    "@plesk",
    # cPanel
    "UCDGXoXJxAFYAGhXN7r62wvA",  # cPanel official
    "cpanel",
    "@cpanel",
    # Other hosting providers (add as discovered)
}
# =============================================================================
# Z.AI API CLIENT FOR NER (Anthropic-compatible endpoint for GLM Coding Plan)
# =============================================================================
# z.ai API configuration - Using Anthropic-compatible endpoint for GLM Coding Plan
# See: https://docs.z.ai/devpack/quick-start
ZAI_API_URL = "https://api.z.ai/api/anthropic/v1/messages"
# Token read from the environment (possibly populated from .env above).
# An empty string disables all API-backed features (NER + validation).
ZAI_API_TOKEN = os.getenv("ZAI_API_TOKEN", "")
ZAI_MODEL = "glm-4.6"  # Default model (z.ai's latest via Anthropic endpoint)
# NER extraction enabled flag: requires both a token and the httpx client.
NER_ENABLED = bool(ZAI_API_TOKEN and HAS_HTTPX)
# Global flags for processing modes (set via CLI args)
FAST_MODE = False  # Skip email NER, use pattern matching only
SKIP_VALIDATION = False  # Skip LLM claim validation
def call_zai_api(
    prompt: str,
    system_prompt: Optional[str] = None,
    model: str = ZAI_MODEL,
    max_tokens: int = 1024,
    temperature: float = 0.0,
) -> Optional[str]:
    """
    Call z.ai API for NER extraction using Anthropic-compatible endpoint.

    The GLM Coding Plan uses an Anthropic-compatible endpoint, which is
    different from the standard z.ai OpenAI-style API. This function uses the
    Anthropic message format as documented at https://docs.z.ai/devpack/tool/goose

    Args:
        prompt: The user prompt with context to analyze
        system_prompt: Optional system instructions
        model: Model to use (default: glm-4.6)
        max_tokens: Maximum response tokens
        temperature: Sampling temperature (0.0 for deterministic)

    Returns:
        Response text or None if failed
    """
    if not ZAI_API_TOKEN:
        return None
    if not HAS_HTTPX:
        return None
    # Anthropic message format: system is separate, messages are user/assistant
    messages = [{"role": "user", "content": prompt}]
    payload: Dict[str, Any] = {
        "model": model,
        "max_tokens": max_tokens,
        "messages": messages,
    }
    # Add system prompt as top-level field (Anthropic style)
    if system_prompt:
        payload["system"] = system_prompt
    # Only add temperature if non-zero (some models may not support it)
    if temperature > 0:
        payload["temperature"] = temperature
    # Anthropic-style headers: x-api-key instead of Authorization Bearer
    headers = {
        "x-api-key": ZAI_API_TOKEN,
        "Content-Type": "application/json",
        "anthropic-version": "2023-06-01",  # Anthropic API version
    }
    # Retry loop for transient failures. FIX: initialize `result` so it can
    # never be unbound; previously a dead `for/else` masked that risk, and
    # `import time` was re-executed inside the loop. Connection errors
    # (resets, DNS blips) are now retried alongside timeouts.
    max_retries = 3
    result = None
    for attempt in range(max_retries):
        try:
            response = httpx.post(
                ZAI_API_URL,
                json=payload,
                headers=headers,
                timeout=60.0,  # Increased timeout for stability
            )
            response.raise_for_status()
            result = response.json()
            break  # Success, exit retry loop
        except (httpx.ReadTimeout, httpx.ConnectTimeout, httpx.ConnectError) as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s
                print(f"z.ai API timeout, retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})...", file=sys.stderr)
                time.sleep(wait_time)
                continue
            print(f"z.ai API timeout after {max_retries} attempts: {e}", file=sys.stderr)
            return None
        except httpx.HTTPStatusError as e:
            print(f"z.ai API HTTP error: {e.response.status_code} - {e.response.text}", file=sys.stderr)
            return None
        except Exception as e:
            print(f"z.ai API error: {e}", file=sys.stderr)
            return None
    if result is None:
        return None  # Defensive: no successful response captured
    # Anthropic response format: content is a list of content blocks
    if "content" in result and len(result["content"]) > 0:
        # Get text from the first text block
        for block in result["content"]:
            if block.get("type") == "text":
                return block.get("text")
        # Fallback: return first block's text if type not specified
        return result["content"][0].get("text")
    return None
# =============================================================================
# NER SYSTEM PROMPT - Gado2 Annotation Convention
# =============================================================================
#
# This NER extraction uses:
# - MODEL: GLM-4.6 (via z.ai Anthropic-compatible endpoint)
# - CONVENTION: see NER_CONVENTION_VERSION below (the prompt text itself
#   still cites the v1.5.0 guidelines, which v1.6.0-unified extends)
# - ONTOLOGIES: PiCO (Persons in Context Ontology), PNV (Person Name Vocabulary)
#
# Gado2 Reference: https://github.com/knaw-huc/golden-agents-htr
# PiCO Ontology: https://data.goldenagents.org/ontology/pico/
# PNV Ontology: https://w3id.org/pnv
# =============================================================================
NER_CONVENTION_VERSION = "Gado2 v1.6.0-unified"
NER_MODEL = ZAI_MODEL  # GLM-4.6
NER_SYSTEM_PROMPT = f"""You are an expert Named Entity Recognition (NER) system for Dutch heritage institution contact information.
=== ANNOTATION CONVENTION: {NER_CONVENTION_VERSION} ===
This extraction follows the Gado2 v1.5.0 annotation guidelines from the Golden Agents project,
combined with PiCO (Persons in Context Ontology) and PNV (Person Name Vocabulary) standards.
=== ENTITY TYPES AND ONTOLOGY MAPPING ===
1. PERSON NAMES (PNV - Person Name Vocabulary, https://w3id.org/pnv):
- pnv:literalName → Full name as written (e.g., "Jan van der Berg")
- pnv:givenName → First/given name (e.g., "Jan", "Maria", "Pieter")
- pnv:surnamePrefix → Dutch surname prefixes (e.g., "van", "de", "van der", "ter", "ten")
- pnv:baseSurname → Base surname without prefix (e.g., "Berg", "Vries", "Groot")
2. DENOMINATIONS (Gado2 v1.5.0 DENOMINATION category):
- DENOMINATION/PROF → Job title, profession, occupation
Maps to: rico:Position, schema:Occupation
Examples: voorzitter, secretaris, archivaris, bibliothecaris, conservator
- DENOMINATION/TITLERANK → Honorific title or rank
Maps to: rico:Title
Examples: dr., prof., ir., mr., drs., ing.
3. ROLES (PiCO - Persons in Context Ontology):
- picom:Role → Functional role in organizational context
Examples: contactpersoon, coördinator, beheerder, medewerker
- picom:PersonObservation → Observation of a person in a specific context
4. ORGANIZATIONAL UNITS (RiC-O - Records in Contexts Ontology):
- rico:CorporateBody → Department or organizational unit
Examples: bestuur, redactie, beeldbank, archief, bibliotheek
=== DUTCH HERITAGE INSTITUTION CONTEXT ===
Common Dutch job titles in heritage institutions:
- Board: voorzitter (chair), secretaris (secretary), penningmeester (treasurer), bestuurslid (board member)
- Editorial: redacteur (editor), hoofdredacteur (editor-in-chief), redactie (editorial board)
- Technical: webmaster, ICT-medewerker, beheerder (administrator)
- Collections: archivaris (archivist), bibliothecaris (librarian), conservator (curator)
- Contact: contactpersoon (contact person), coördinator (coordinator), medewerker (staff member)
Dutch surname prefixes (always lowercase, attached to surname):
van, de, het, den, der, ter, ten, van de, van der, van den, van het, in 't, op de, op 't
=== OUTPUT FORMAT ===
Return valid JSON only. No markdown code blocks. No explanatory text.
{{
"persons": [
{{
"full_name": "string or null - pnv:literalName",
"given_name": "string or null - pnv:givenName",
"surname_prefix": "string or null - pnv:surnamePrefix",
"base_surname": "string or null - pnv:baseSurname",
"job_title": "string or null - DENOMINATION/PROF",
"job_title_en": "string or null - English translation",
"title_rank": "string or null - DENOMINATION/TITLERANK",
"department": "string or null - rico:CorporateBody",
"department_en": "string or null - English translation",
"role": "string or null - picom:Role",
"email": "associated email address if identifiable"
}}
],
"confidence": 0.0-1.0,
"convention": "{NER_CONVENTION_VERSION}"
}}
If no person/role information is found, return: {{"persons": [], "confidence": 1.0, "convention": "{NER_CONVENTION_VERSION}"}}"""
def extract_ner_from_context(
    context_text: str,
    email: str,
    html_context: Optional[str] = None,
    verbose: bool = False,
) -> Optional[Dict[str, Any]]:
    """Run person/role NER over the text surrounding an email address.

    Sends the context to GLM-4.6 through the z.ai Anthropic-compatible
    endpoint (Gado2 annotation convention) and parses the JSON reply.

    Args:
        context_text: Plain text context around the email.
        email: The email address being contextualized.
        html_context: Optional raw HTML context for additional signals.
        verbose: If True, log model and convention info to stderr.

    Returns:
        Parsed entity dict with 'ner_model'/'ner_convention' metadata
        added, or None if NER is unavailable or the reply is unusable.
    """
    if not NER_ENABLED:
        return None
    if verbose:
        print(f" [NER] Using model: {NER_MODEL}", file=sys.stderr)
        print(f" [NER] Convention: {NER_CONVENTION_VERSION}", file=sys.stderr)
    # Assemble the user prompt around the email and its context.
    prompt = f"""Extract person and role information from this Dutch heritage institution contact context.
EMAIL: {email}
CONTEXT TEXT:
{context_text}
{f"HTML CONTEXT:{chr(10)}{html_context[:500]}" if html_context else ""}
Extract any person names, job titles, roles, or departments associated with this email contact.
Follow the {NER_CONVENTION_VERSION} annotation guidelines provided in the system prompt.
Return JSON only."""
    raw = call_zai_api(prompt, system_prompt=NER_SYSTEM_PROMPT, model=NER_MODEL)
    if not raw:
        return None
    # Strip a markdown code fence if the model wrapped its JSON in one:
    # keep only the lines between the ``` markers.
    if raw.startswith("```"):
        inside = False
        kept = []
        for line in raw.split("\n"):
            if line.startswith("```"):
                inside = not inside
            elif inside:
                kept.append(line)
        raw = "\n".join(kept)
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return None
    # Attach provenance metadata before returning.
    parsed['ner_model'] = NER_MODEL
    parsed['ner_convention'] = NER_CONVENTION_VERSION
    return parsed
# =============================================================================
# CLAIM VALIDATION SYSTEM - Gado2 v1.6.0-unified Compliance Check
# =============================================================================
#
# This validation uses GLM-4.6 to check if extracted claims are valid according
# to the Gado2 v1.6.0-unified annotation convention. This unified convention
# handles BOTH Early Modern Dutch texts AND modern web content through a
# multi-domain architecture with source_domains (EARLY_MODERN_TEXT, MODERN_WEB).
#
# For web content extraction, the WEB_EXC001-007 exclusion rules apply.
#
# Convention file: docs/convention/schema/20251202/entity_annotation_rules_v1.6.0_unified.yaml
# =============================================================================
VALIDATION_SYSTEM_PROMPT = f"""You are an expert claim validator for heritage institution web data extraction.
=== ANNOTATION CONVENTION: {NER_CONVENTION_VERSION} ===
You validate extracted org_name claims from MODERN DUTCH HERITAGE INSTITUTION WEBSITES
against the Gado2 v1.6.0-unified annotation guidelines.
This is a UNIFIED convention that handles both Early Modern Dutch texts and modern web content.
For web extraction, apply the MODERN_WEB source domain rules.
=== ORGANISATION ENTITY DEFINITION (v1.6.0) ===
Entity Type: ORGANISATION (ORG) / HERINST (Heritage Institution subcategory)
Description: Organizations including heritage institutions (museums, archives, libraries,
historical societies), companies, governments, branches, associations, legislative bodies,
political parties, military forces, sports teams, meetings, bands, religious orders, and ships.
Ontology Classes: rico:CorporateBody, rico:Group, crm:E74_Group, schema:Organization
=== HERITAGE INSTITUTION SUBCATEGORIES (HERINST - valid org_name) ===
HERINST/MUSEUM: "Rijksmuseum", "Amsterdam Museum", "Stedelijk Museum", "Smalspoormuseum"
HERINST/ARCHIVE: "Nationaal Archief", "Gemeentearchief", "Stadsarchief Rotterdam"
HERINST/LIBRARY: "Koninklijke Bibliotheek", "Universiteitsbibliotheek", "OBA"
HERINST/HISTSOC: "Historische Vereniging Nijeveen", "Heemkundige Kring De Goede Stede"
HERINST/RESEARCH: "NIOD", "Huygens Instituut", "Fryske Akademy"
HERINST/FOUNDATION: "Stichting Erfgoed", "Hidde Nijland Stichting"
=== OTHER ORGANISATION SUBCATEGORIES (from v1.5.0-ontology-pico) ===
COMP (Companies): "Philips", "ING", "Shell"
BRANCH (Branches): "ING Rotterdam", "Rekenkamer Gemeente Rotterdam"
ASSOC (Associations): "NVM", "de vakbond"
PUBFAC (Public Facilities): "Middelbare school", "Technische Universiteit Delft"
AUTH (Authorities): "Ministerie van Financiën", "Raad voor Aangelegenheden"
INTORG (International Orgs): "Verenigde Naties", "Europese Unie"
=== WEB INCLUSION RULES (WEB_INC) - v1.6.0-unified ===
WEB_INC001: Tag heritage institution names with specific identifiers
- VALID: "Nationaal Archief" (specific name)
- VALID: "Historische Vereniging Nijeveen" (place-qualified)
- VALID: "Smalspoormuseum" (distinctive compound)
WEB_INC002: Tag organization names in structured data (schema.org)
- VALID: Names from JSON-LD schema:Organization blocks
WEB_INC003: Tag organization names with legal form indicators
- VALID: "Stichting Openbare Bibliotheek" (Stichting = foundation)
- VALID: "Vereniging Oud-Haarlem" (Vereniging = association)
=== WEB EXCLUSION RULES (WEB_EXC) - v1.6.0-unified - CRITICAL ===
WEB_EXC001: Do NOT tag navigation menu items
- INVALID: "Home", "Menu", "Contact", "Contact opnemen", "Over ons"
- INVALID: "Nieuws", "Zoeken", "Welkom", "Informatie", "Terug", "Volgende"
- semantic_category: navigation
- Rationale: UI chrome, not organization identifiers
WEB_EXC002: Do NOT tag call-to-action buttons/links
- INVALID: "Lees meer", "Meer lezen", "Bekijk", "Download", "Bestel"
- INVALID: "Word lid", "Meld je aan", "Subscribe", "Doneer"
- semantic_category: cta
- Rationale: Interactive UI elements, not organization names
WEB_EXC003: Do NOT tag social media platform names
- INVALID: "Facebook", "Twitter", "Instagram", "LinkedIn", "YouTube", "X"
- INVALID: "TikTok", "Pinterest", "Flickr", "Vimeo"
- semantic_category: social_media
- Rationale: Third-party platforms, not the heritage institution itself
WEB_EXC004: Do NOT tag CMS placeholder/boilerplate text
- INVALID: "Hello world!", "Lorem ipsum", "Sample Page", "Just another WordPress site"
- INVALID: "Colofon", "Powered by WordPress", "Theme by..."
- semantic_category: cms_default
- Rationale: Template artifacts, not meaningful institution names
WEB_EXC005: Do NOT tag legal/policy page titles
- INVALID: "Privacy Policy", "Privacyverklaring", "Disclaimer", "Cookie Policy"
- INVALID: "Algemene voorwaarden", "Terms of Service", "ANBI"
- semantic_category: legal_boilerplate
- Rationale: Standard legal pages, not organization identifiers
WEB_EXC006: Do NOT tag web functionality labels
- INVALID: "Login", "Logout", "Inloggen", "Winkelwagen", "Cart", "Search"
- INVALID: "Sitemap", "RSS", "Print", "Share", "Delen"
- semantic_category: web_functionality
- Rationale: Web application UI, not organization names
WEB_EXC007: Do NOT tag generic single words without institution context
- INVALID: "Archief", "Museum", "Bibliotheek" (standalone)
- INVALID: "Collectie", "Expositie", "Tentoonstelling" (standalone)
- VALID: "Nationaal Archief", "Smalspoormuseum" (with qualifier)
- semantic_category: generic_word
- Rationale: Category labels need qualifying words to be institution names
=== BASE ORG EXCLUSION RULES (ORG_EXC from v1.5.0) ===
ORG_EXC001: Strip articles from organization names
- "de Tweede Kamer" → "Tweede Kamer"
ORG_EXC002: Don't tag abbreviations separately
- "Nederlandse Vereniging van Makelaars (NVM)" → tag full name only
ORG_EXC003: Don't tag generic group references
- INVALID: "De jongerenbeweging" (= DENOMINATION, not ORG)
=== VALIDATION RESPONSE FORMAT ===
Provide your FULL REASONING as provenance. This reasoning IS the validation evidence.
Return ONLY valid JSON (no markdown code blocks):
{{
"is_valid": true/false,
"reasoning": "Your complete analysis: What is this text? Why is it (in)valid? Which v1.6.0 convention rules apply? What semantic category does it belong to?",
"convention_rules": ["WEB_EXC001", "WEB_EXC003"] or ["WEB_INC001"] or [] if no specific rules,
"semantic_category": "navigation|cta|social_media|cms_default|legal_boilerplate|web_functionality|generic_word|heritage_institution|other",
"confidence": 0.0-1.0
}}
IMPORTANT: Your "reasoning" field is stored as provenance. Be thorough and explicit.
Reference specific v1.6.0-unified rule IDs (WEB_EXC001-007, WEB_INC001-003, ORG_EXC001-003) in your analysis.
"""
# Cache for validation results (to avoid repeated API calls).
# Keyed by "claim_type:claim_value:extraction_method".
_validation_cache: Dict[str, Dict[str, Any]] = {}
def validate_claim_with_llm(
    claim_type: str,
    claim_value: str,
    extraction_method: str,
    source_context: Optional[str] = None,
    verbose: bool = False,
) -> Dict[str, Any]:
    """Validate one extracted claim against the Gado2 convention via GLM-4.6.

    Args:
        claim_type: Type of claim (org_name, description, etc.).
        claim_value: The extracted value to validate.
        extraction_method: How the claim was extracted (h1_tag, title_tag, ...).
        source_context: Optional surrounding HTML/text context.
        verbose: If True, log validation details to stderr.

    Returns:
        Dict with at least: is_valid (bool), reason (str),
        convention_rule (str or None), confidence (float 0.0-1.0).
        New-format fields (reasoning, convention_rules, semantic_category)
        are also populated for forward compatibility.
    """
    # Only a handful of error-prone claim types warrant an LLM round-trip.
    VALIDATE_CLAIM_TYPES = {'org_name', 'org_name_alt', 'tagline', 'description_short'}
    if claim_type not in VALIDATE_CLAIM_TYPES:
        return {'is_valid': True, 'reason': 'Claim type not subject to LLM validation', 'convention_rule': None, 'confidence': 1.0}
    # Without an API token there is nothing we can check; accept with low confidence.
    if not NER_ENABLED:
        return {'is_valid': True, 'reason': 'LLM validation unavailable (no API token)', 'convention_rule': None, 'confidence': 0.5}
    cache_key = f"{claim_type}:{claim_value}:{extraction_method}"
    cached = _validation_cache.get(cache_key)
    if cached is not None:
        if verbose:
            print(f" [VALIDATE] Cache hit for {claim_value[:30]}", file=sys.stderr)
        return cached
    if verbose:
        print(f" [VALIDATE] Checking claim: {claim_type}={claim_value[:50]}", file=sys.stderr)
    prompt = f"""Validate this extracted claim from a Dutch heritage institution website:
CLAIM TYPE: {claim_type}
CLAIM VALUE: "{claim_value}"
EXTRACTION METHOD: {extraction_method}
{f"SOURCE CONTEXT: {source_context[:500]}" if source_context else ""}
Is this a valid {claim_type} according to Gado2 v1.5.0 convention?
For org_name claims: Is this the actual name of a heritage institution (museum, archive, library, historical society, etc.) or is it generic page text (navigation, headings, UI elements)?
Return JSON only."""
    response = call_zai_api(prompt, system_prompt=VALIDATION_SYSTEM_PROMPT, model=NER_MODEL)
    # Fail-open result used whenever the API call or parsing fails.
    default_result = {'is_valid': True, 'reason': 'API validation unavailable', 'convention_rule': None, 'confidence': 0.5}
    if not response:
        _validation_cache[cache_key] = default_result
        return default_result
    # Strip a markdown code fence if the model wrapped its JSON in one.
    if response.startswith("```"):
        inside = False
        body_lines = []
        for line in response.split("\n"):
            if line.startswith("```"):
                inside = not inside
            elif inside:
                body_lines.append(line)
        response = "\n".join(body_lines)
    try:
        result = json.loads(response)
    except json.JSONDecodeError:
        _validation_cache[cache_key] = default_result
        return default_result
    result.setdefault('is_valid', True)
    result.setdefault('confidence', 0.5)
    # New-format replies carry 'reasoning'; mirror into legacy 'reason'.
    if 'reasoning' in result:
        result['reason'] = result['reasoning']  # Alias for compatibility
    else:
        result.setdefault('reason', 'Unknown')
        result['reasoning'] = result['reason']
    # Likewise 'convention_rules' (list) vs legacy 'convention_rule' (single).
    if 'convention_rules' in result:
        result['convention_rule'] = result['convention_rules'][0] if result['convention_rules'] else None
    else:
        result.setdefault('convention_rule', None)
        result['convention_rules'] = [result['convention_rule']] if result['convention_rule'] else []
    result.setdefault('semantic_category', 'other')
    _validation_cache[cache_key] = result
    if verbose:
        status = "VALID" if result['is_valid'] else "INVALID"
        reasoning_preview = result['reasoning'][:100] + "..." if len(result['reasoning']) > 100 else result['reasoning']
        print(f" [VALIDATE] {status}: {reasoning_preview}", file=sys.stderr)
    return result
def filter_claims_with_validation(
    claims: List[Dict],
    verbose: bool = False,
) -> Tuple[List[Dict], List[Dict]]:
    """Partition claims into (valid, invalid) using LLM validation.

    Rejected claims are annotated in place with full validation
    provenance (the model's reasoning serves as the audit trail).

    Args:
        claims: List of claim dicts to validate.
        verbose: If True, log validation progress.

    Returns:
        Tuple of (valid_claims, invalid_claims).
    """
    accepted: List[Dict] = []
    rejected: List[Dict] = []
    for claim in claims:
        verdict = validate_claim_with_llm(
            claim_type=claim.get('claim_type', ''),
            claim_value=claim.get('claim_value', ''),
            extraction_method=claim.get('extraction_method', ''),
            verbose=verbose,
        )
        if verdict['is_valid']:
            accepted.append(claim)
            continue
        reasoning = verdict.get('reasoning', verdict.get('reason', 'Unknown'))
        # Store FULL validation provenance for audit (the reasoning IS the provenance).
        claim['validation_provenance'] = {
            'reasoning': reasoning,
            'convention_rules': verdict.get('convention_rules', []),
            'semantic_category': verdict.get('semantic_category', 'other'),
            'confidence': verdict.get('confidence', 0.5),
            'model': NER_MODEL,
            'convention_version': NER_CONVENTION_VERSION,
        }
        # Keep backward-compatible flat fields.
        claim['validation_reason'] = reasoning
        claim['validation_rule'] = verdict.get('convention_rule')
        rejected.append(claim)
    return accepted, rejected
# Pattern-based role extraction (fallback when NER API unavailable).
# Dutch job titles/roles commonly found in heritage institution contacts.
# Keys are word-bounded regexes matched against lowercased context text;
# values are merged into the extracted claim.
DUTCH_ROLE_PATTERNS = {
    # Board positions
    r'\bvoorzitter\b': {'job_title': 'voorzitter', 'job_title_en': 'chairperson'},
    r'\bsecretaris\b': {'job_title': 'secretaris', 'job_title_en': 'secretary'},
    r'\bsecretariaat\b': {'job_title': 'secretaris', 'job_title_en': 'secretary'},
    r'\bpenningmeester\b': {'job_title': 'penningmeester', 'job_title_en': 'treasurer'},
    r'\bbestuur\b': {'job_title': 'bestuurslid', 'job_title_en': 'board member'},
    r'\bbestuurslid\b': {'job_title': 'bestuurslid', 'job_title_en': 'board member'},
    # Editorial/content
    r'\bredactie\b': {'job_title': 'redacteur', 'job_title_en': 'editor', 'department': 'redactie'},
    r'\bredacteur\b': {'job_title': 'redacteur', 'job_title_en': 'editor'},
    r'\bhoofdredacteur\b': {'job_title': 'hoofdredacteur', 'job_title_en': 'editor-in-chief'},
    # Technical/digital
    r'\bwebmaster\b': {'job_title': 'webmaster', 'job_title_en': 'webmaster'},
    r'\bwebmast\b': {'job_title': 'webmaster', 'job_title_en': 'webmaster'},
    r'\bict\b': {'job_title': 'ICT-medewerker', 'job_title_en': 'IT staff'},
    r'\bbeheerder\b': {'job_title': 'beheerder', 'job_title_en': 'administrator'},
    # Collections
    r'\barchivaris\b': {'job_title': 'archivaris', 'job_title_en': 'archivist'},
    r'\bbibliothecaris\b': {'job_title': 'bibliothecaris', 'job_title_en': 'librarian'},
    r'\bconservator\b': {'job_title': 'conservator', 'job_title_en': 'curator'},
    r'\bcurator\b': {'job_title': 'curator', 'job_title_en': 'curator'},
    r'\bcollectiebeheer\b': {'job_title': 'collectiebeheerder', 'job_title_en': 'collection manager'},
    # General
    r'\bdirecteur\b': {'job_title': 'directeur', 'job_title_en': 'director'},
    r'\bcoördinator\b': {'job_title': 'coördinator', 'job_title_en': 'coordinator'},
    r'\bmedewerker\b': {'job_title': 'medewerker', 'job_title_en': 'staff member'},
    r'\bvrijwilliger\b': {'job_title': 'vrijwilliger', 'job_title_en': 'volunteer'},
    # Departments/sections
    r'\bbeeldbank\b': {'department': 'beeldbank', 'department_en': 'image archive'},
    r'\bdocumentenbank\b': {'department': 'documentenbank', 'department_en': 'document archive'},
    r'\bvoorwerpen\b': {'department': 'voorwerpenbank', 'department_en': 'object collection'},
}


def extract_role_from_context_pattern(
    context_text: str,
    email: str,
) -> Optional[Dict[str, Any]]:
    """
    Extract role/job title information using pattern matching.

    This is a fallback when the LLM-based NER is unavailable.
    Works well for structured Dutch heritage institution contacts.

    Args:
        context_text: Plain text context around the email (e.g., "- de voorzitter:")
        email: The email address for context

    Returns:
        Dict with extracted role info (plus 'confidence' and
        'extraction_method' keys) or None if no patterns match

    Examples:
        extract_role_from_context_pattern("- de voorzitter:", "voorzitter@example.nl")
        -> {'job_title': 'voorzitter', 'job_title_en': 'chairperson', ...}
    """
    if not context_text:
        return None
    context_lower = context_text.lower()
    result: Dict[str, Any] = {}
    confidence = 0.0
    # Pass 1: scan the surrounding text for role/department keywords.
    for pattern, info in DUTCH_ROLE_PATTERNS.items():
        if re.search(pattern, context_lower):
            result.update(info)
            # Higher confidence for more specific matches
            if 'job_title' in info:
                confidence = max(confidence, 0.85)
            if 'department' in info:
                confidence = max(confidence, 0.8)
    # Pass 2: the email local part often encodes the role (voorzitter@,
    # secretariaat@, ...). BUGFIX: the previous plain substring search made
    # e.g. "benedictus" match the 'ict' pattern; require the keyword to start
    # at a token boundary within the prefix instead.
    email_prefix = email.split('@')[0].lower() if '@' in email else ''
    for pattern, info in DUTCH_ROLE_PATTERNS.items():
        clean_pattern = pattern.replace(r'\b', '')
        if re.search(r'(?<![a-z0-9])' + clean_pattern, email_prefix):
            # Merge info but don't overwrite what the context already gave us.
            for k, v in info.items():
                if k not in result:
                    result[k] = v
            confidence = max(confidence, 0.9)  # High confidence when email matches
    if not result:
        return None
    result['confidence'] = confidence
    result['extraction_method'] = 'pattern_matching'
    return result
def get_xpath_lxml(element) -> str:
    """Return the absolute XPath of an lxml element within its document tree."""
    return element.getroottree().getpath(element)
def get_xpath_bs4(element) -> str:
    """Generate an absolute XPath for a BeautifulSoup element.

    Walks up the parent chain, emitting a 1-based positional step for each
    ancestor (counting previous siblings of the same tag name).

    BUGFIX: BeautifulSoup's synthetic document root is named "[document]";
    the previous version included it in the path, producing invalid XPaths
    like "/[document][1]/html[1]/...". We now stop before that node.

    Args:
        element: A bs4 Tag (or compatible object with .name, .parent and
            .find_previous_siblings).

    Returns:
        An XPath string such as "/html[1]/body[1]/div[2]", or "/" if the
        element yields no path components.
    """
    parts = []
    current = element
    while current and current.name:
        # Skip the synthetic BeautifulSoup root — not a real element.
        if current.name == '[document]':
            break
        position = len(current.find_previous_siblings(current.name)) + 1
        parts.insert(0, f"{current.name}[{position}]")
        current = current.parent
    return '/' + '/'.join(parts) if parts else '/'
def get_institution_domain(entry_data: dict) -> Optional[str]:
    """
    Extract the institution's primary domain from entry data.

    Used to determine if email addresses belong to the institution
    or are external (e.g., partner organizations, village associations).

    Args:
        entry_data: The entry dict loaded from YAML

    Returns:
        Normalized domain (e.g., 'dewolden.nl') or None if not found

    Example:
        >>> get_institution_domain({'original_entry': {'webadres_organisatie': 'https://www.dewolden.nl/'}})
        'dewolden.nl'
    """
    # Guard against 'web_archives' being present but an EMPTY list: the
    # previous code used .get('web_archives', [{}])[0], whose default only
    # applies when the key is missing, so an empty list raised IndexError.
    web_archives = entry_data.get('web_enrichment', {}).get('web_archives') or [{}]
    # Priority order for finding the institution URL.
    url_sources = [
        # Primary: original_entry.webadres_organisatie (from NDE CSV)
        entry_data.get('original_entry', {}).get('webadres_organisatie', ''),
        # Fallback: web_enrichment source URL
        web_archives[0].get('url', ''),
        # Fallback: any website identifier
        *[ident.get('identifier_value', '') for ident in entry_data.get('identifiers', [])
          if ident.get('identifier_scheme') == 'website'],
    ]
    for url in url_sources:
        if not url:
            continue
        try:
            parsed = urlparse(url)
            domain = parsed.netloc.lower()
            # Strip www. so comparisons are prefix-insensitive.
            if domain.startswith('www.'):
                domain = domain[4:]
            if domain:
                return domain
        except Exception:
            # Malformed URL: try the next candidate (best-effort lookup).
            continue
    return None
def is_email_external(email: str, institution_domain: Optional[str]) -> bool:
    """Decide whether *email* belongs to an organization other than the institution.

    Args:
        email: Email address to check.
        institution_domain: The institution's primary domain (e.g., 'dewolden.nl').

    Returns:
        True when the email's domain is neither the institution domain nor one
        of its subdomains. False when it matches, or when no determination can
        be made (unknown institution domain or malformed email).

    Examples:
        >>> is_email_external('gemeente@dewolden.nl', 'dewolden.nl')
        False
        >>> is_email_external('info@stichtingoco.nl', 'dewolden.nl')
        True
        >>> is_email_external('info@example.com', None)  # Unknown institution domain
        False
    """
    # Without a reference domain or a parseable address, assume internal.
    if not institution_domain or '@' not in email:
        return False
    domain = email.split('@')[1].lower()
    # Normalize away a leading www. before comparing.
    if domain.startswith('www.'):
        domain = domain[4:]
    # Internal when equal to, or a subdomain of, the institution domain.
    is_internal = domain == institution_domain or domain.endswith('.' + institution_domain)
    return not is_internal
def create_claim(
    claim_type: str,
    claim_value: str,
    xpath: str,
    html_file: str,
    source_url: str,
    retrieved_on: str,
    raw_value: Optional[str] = None,
    extraction_method: str = 'html_parser',
    xpath_match_score: float = 1.0,
    **extra_fields,
) -> Dict[str, Any]:
    """Build a claim dict carrying full provenance metadata.

    Args:
        claim_type: Type of claim (email, phone, org_name, etc.)
        claim_value: The extracted value (stored stripped)
        xpath: XPath to the element containing this value
        html_file: Relative path to archived HTML file
        source_url: URL the claim was extracted from
        retrieved_on: ISO 8601 timestamp when page was archived
        raw_value: Original unprocessed value; defaults to claim_value
        extraction_method: How the value was extracted (e.g. 'mailto_link')
        xpath_match_score: 1.0 for exact match, <1.0 for fuzzy
        **extra_fields: Additional metadata (e.g. is_external, email_domain);
            these override the base keys on collision, matching dict.update.

    Returns:
        Dict with the full claim structure and provenance fields.
    """
    base = {
        'claim_type': claim_type,
        'claim_value': claim_value.strip() if claim_value else '',
        'raw_value': raw_value or claim_value,
        'source_url': source_url,
        'retrieved_on': retrieved_on,
        'xpath': xpath,
        'html_file': html_file,
        'xpath_match_score': xpath_match_score,
        'extraction_method': extraction_method,
        # Record when this extraction ran, in UTC.
        'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
    }
    # Merge caller-supplied extras on top of the base structure.
    return {**base, **extra_fields}
# === Extractors for specific claim types ===
def extract_title_claims(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract organization-name claims from <title> tags.

    The title text is split on common separators (' - ', ' | ', ...) and the
    first segment is kept as the cleaned organization name; the full title is
    preserved as raw_value for provenance.
    """
    results: List[Dict] = []
    for node in tree.xpath('//title'):
        if not node.text:
            continue
        full_title = node.text.strip()
        cleaned = full_title
        # First matching separator wins; the org name is usually the first segment.
        for separator in (' - ', ' | ', ' – ', ' — ', ': '):
            if separator in full_title:
                cleaned = full_title.split(separator)[0].strip()
                break
        results.append(create_claim(
            claim_type='org_name',
            claim_value=cleaned,
            xpath=get_xpath_lxml(node),
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            raw_value=full_title,
            extraction_method='title_tag',
        ))
    return results
def extract_meta_description(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract short-description claims from standard and OpenGraph meta tags."""
    claims: List[Dict] = []

    def harvest(selector: str, method: str) -> None:
        # Pair each @content value with its element so we can emit an exact
        # XPath; if the counts ever diverge, fall back to the raw selector.
        contents = tree.xpath(selector + '/@content')
        elements = tree.xpath(selector)
        for i, content in enumerate(contents):
            if not (content and content.strip()):
                continue
            xpath = get_xpath_lxml(elements[i]) if i < len(elements) else selector
            claims.append(create_claim(
                claim_type='description_short',
                claim_value=content.strip(),
                xpath=xpath,
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method=method,
            ))

    # Standard meta description, then the OpenGraph variant.
    harvest('//meta[@name="description"]', 'meta_description')
    harvest('//meta[@property="og:description"]', 'og_description')
    return claims
def extract_og_site_name(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract an organization-name claim from the og:site_name meta tag."""
    selector = '//meta[@property="og:site_name"]'
    contents = tree.xpath(selector + '/@content')
    elements = tree.xpath(selector)
    claims: List[Dict] = []
    for idx, value in enumerate(contents):
        if not (value and value.strip()):
            continue
        # Prefer the element's exact XPath; fall back to the selector string.
        xpath = get_xpath_lxml(elements[idx]) if idx < len(elements) else selector
        claims.append(create_claim(
            claim_type='org_name',
            claim_value=value.strip(),
            xpath=xpath,
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            extraction_method='og_site_name',
        ))
    return claims
def extract_schema_org(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract claims from schema.org JSON-LD script blocks.

    Parses every ``<script type="application/ld+json">`` element; both a
    single top-level object and a top-level array of objects are supported.
    Malformed JSON is skipped silently (best-effort extraction).
    """
    claims: List[Dict] = []
    # NOTE: the redundant function-local `import json` was removed; json is
    # already imported at module level.
    for script in tree.xpath('//script[@type="application/ld+json"]'):
        if not script.text:
            continue
        try:
            data = json.loads(script.text)
        except json.JSONDecodeError:
            continue  # best-effort: ignore malformed JSON-LD blocks
        # Normalize to a list so single objects and arrays share one path.
        items = data if isinstance(data, list) else [data]
        script_xpath = get_xpath_lxml(script)
        for item in items:
            claims.extend(_extract_schema_item(item, script_xpath, html_file, source_url, retrieved_on))
    return claims
def _classify_youtube_url_inline(url: str) -> str:
"""
Inline YouTube URL classifier for use before classify_youtube_url is defined.
Returns 'social_youtube_channel' for channel URLs, 'social_youtube_video' otherwise.
"""
# Channel URL patterns - these ARE official channel links
channel_patterns = [
'/@', # Handle format: /@username
'/channel/UC', # Channel ID format: /channel/UCxxxxx
'/user/', # Legacy user format: /user/username
'/c/', # Custom URL format: /c/customname
]
for pattern in channel_patterns:
if pattern in url:
return 'social_youtube_channel'
# Everything else (watch?v=, youtu.be/, shorts/, etc.) is a video
return 'social_youtube_video'
def _extract_schema_item(item: dict, xpath: str, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract claims from a single schema.org item.

    Distinguishes organization-like types (org_name/description are extracted)
    from event types (those fields are skipped), then pulls address, phone,
    email and sameAs social links when present.

    Robustness fix: JSON-LD values can be nested objects or numbers; every
    field is now type-checked before use, where previously a non-string
    'name', 'telephone', etc. raised TypeError/AttributeError downstream.

    Args:
        item: Parsed JSON-LD object.
        xpath: XPath of the <script> element the item came from.
        html_file: Relative path to the archived HTML file.
        source_url: URL the page was fetched from.
        retrieved_on: ISO 8601 archival timestamp.

    Returns:
        List of claim dicts with full provenance.
    """
    claims: List[Dict] = []

    def add(claim_type: str, value: str, method: str) -> None:
        # All claims from one JSON-LD item share the same script XPath.
        claims.append(create_claim(
            claim_type=claim_type,
            claim_value=value,
            xpath=xpath,
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            extraction_method=method,
        ))

    # @type may be a plain string or a list of strings; use the first entry.
    item_type = item.get('@type', '')
    if isinstance(item_type, list):
        item_type = item_type[0] if item_type else ''
    if not isinstance(item_type, str):
        item_type = ''
    # Organization types that should have org_name extracted
    org_types = {
        'Organization', 'LocalBusiness', 'Museum', 'Library', 'Archive',
        'EducationalOrganization', 'GovernmentOrganization', 'NGO',
        'Corporation', 'Place', 'CivicStructure', 'LandmarksOrHistoricalBuildings',
        'PerformingArtsTheater', 'MovieTheater', 'Zoo', 'Aquarium',
    }
    # Event types - extract as event data, not org_name
    event_types = {'Event', 'BusinessEvent', 'ChildrensEvent', 'ComedyEvent',
                   'CourseInstance', 'DanceEvent', 'DeliveryEvent', 'EducationEvent',
                   'EventSeries', 'ExhibitionEvent', 'Festival', 'FoodEvent',
                   'Hackathon', 'LiteraryEvent', 'MusicEvent', 'PublicationEvent',
                   'SaleEvent', 'ScreeningEvent', 'SocialEvent', 'SportsEvent',
                   'TheaterEvent', 'VisualArtsEvent'}
    # Missing @type is treated as organization-like (permissive default).
    is_org = any(t in item_type for t in org_types) or not item_type
    is_event = any(t in item_type for t in event_types)

    # Organization name - only for org types or if @type is missing.
    name_value = item.get('name')
    if name_value and is_org and not is_event:
        # Skip non-strings and anything that looks like HTML/code.
        if isinstance(name_value, str) and '<' not in name_value and len(name_value) < 200:
            add('org_name', name_value, 'schema_org_name')

    # Description - only for organizations; skip HTML/page-builder markup.
    desc_value = item.get('description')
    if desc_value and is_org and not is_event:
        if isinstance(desc_value, str) and '<' not in desc_value and 'vc_row' not in desc_value:
            add('description', desc_value, 'schema_org_description')

    # Address: either a plain string or a PostalAddress-style object.
    addr = item.get('address')
    if isinstance(addr, str):
        add('address', addr, 'schema_org_address')
    elif isinstance(addr, dict):
        if isinstance(addr.get('streetAddress'), str):
            add('address', addr['streetAddress'], 'schema_org_streetAddress')
        if isinstance(addr.get('postalCode'), str):
            add('postal_code', addr['postalCode'], 'schema_org_postalCode')
        if isinstance(addr.get('addressLocality'), str):
            add('city', addr['addressLocality'], 'schema_org_addressLocality')

    # Phone
    if isinstance(item.get('telephone'), str):
        add('phone', item['telephone'], 'schema_org_telephone')

    # Email
    if isinstance(item.get('email'), str):
        add('email', item['email'], 'schema_org_email')

    # Social media profiles advertised via sameAs.
    if 'sameAs' in item:
        same_as = item['sameAs'] if isinstance(item['sameAs'], list) else [item['sameAs']]
        for url in same_as:
            if not isinstance(url, str):
                continue  # ignore malformed sameAs entries
            if 'twitter.com' in url or 'x.com' in url:
                add('social_twitter', url, 'schema_org_sameAs')
            elif 'facebook.com' in url:
                add('social_facebook', url, 'schema_org_sameAs')
            elif 'instagram.com' in url:
                add('social_instagram', url, 'schema_org_sameAs')
            elif 'linkedin.com' in url:
                add('social_linkedin', url, 'schema_org_sameAs')
            elif 'youtube.com' in url or 'youtu.be' in url:
                # Distinguish channel links from video links.
                add(_classify_youtube_url_inline(url), url, 'schema_org_sameAs')
    return claims
def extract_email_context(link) -> dict:
    """Extract contextual information around an email mailto link.

    Tries three strategies in order, stopping at the first valid label:
      1. Text inside an enclosing <li> that precedes the link.
      2. Tail text of the immediately preceding sibling, or the parent's
         leading text.
      3. Text of a preceding <strong>/<b>/<span>/<label> sibling.

    Args:
        link: lxml element for the mailto link

    Returns:
        Dict with two keys (both may be None):
        - label: The descriptive label found before the email
        - organization_context: Set from the <li> text when it looks like an
          organization name (contains uppercase and has >= 2 words).
        NOTE(review): no 'person_name' key is ever produced here despite what
        callers might expect from similar extractors.
    """
    # Words that are NOT useful as labels (common prepositions, conjunctions, etc.)
    SKIP_WORDS = {
        # Dutch
        'of', 'en', 'via', 'naar', 'per', 'op', 'bij', 'aan', 'met', 'voor',
        'door', 'om', 'dan', 'als', 'maar', 'want', 'dus', 'toch', 'nog',
        'mail', 'e-mail', 'email',
        # English
        'or', 'and', 'via', 'to', 'at', 'by', 'for', 'with', 'from',
        'the', 'a', 'an',
    }

    def is_valid_label(text: str) -> bool:
        """Check if text is a meaningful label (not just a common word)."""
        if not text:
            return False
        text_lower = text.lower().strip()
        # Skip if it's a single common word
        if text_lower in SKIP_WORDS:
            return False
        # Skip if it's too short (less than 3 chars)
        if len(text_lower) < 3:
            return False
        return True

    context: dict = {
        'label': None,
        'organization_context': None,
    }
    # Strategy 1: if the link sits inside an <li>, gather all text that
    # appears before the link (the li's own text plus earlier children).
    parent = link.getparent()
    if parent is not None and parent.tag == 'li':
        li_text = parent.text or ''
        # Also collect text/tails of child elements that precede the link.
        for child in parent:
            if child == link:
                break
            child_text = child.text or ''
            child_tail = child.tail or ''
            li_text += child_text + child_tail
        li_text = li_text.strip()
        # Clean up: drop a trailing colon and non-breaking spaces.
        li_text = li_text.rstrip(':').rstrip('\xa0').strip()
        if is_valid_label(li_text):
            context['label'] = li_text
            # Heuristic: title case with multiple words suggests an org name.
            if any(c.isupper() for c in li_text) and len(li_text.split()) >= 2:
                context['organization_context'] = li_text
    # Strategy 2: tail text of the preceding sibling, else the parent's
    # leading text directly before this element.
    if not context['label']:
        prev = link.getprevious()
        if prev is not None and prev.tail:
            tail_text = prev.tail.strip().rstrip(':').rstrip('\xa0').strip()
            if is_valid_label(tail_text):
                context['label'] = tail_text
        elif parent is not None and parent.text:
            parent_text = parent.text.strip().rstrip(':').rstrip('\xa0').strip()
            if is_valid_label(parent_text):
                context['label'] = parent_text
    # Strategy 3: a label held in a preceding <strong>, <b>, <span> or
    # <label> sibling element.
    if not context['label'] and parent is not None:
        for sibling in parent:
            if sibling == link:
                break
            if sibling.tag in ('strong', 'b', 'span', 'label'):
                sib_text = (sibling.text or '').strip().rstrip(':').strip()
                if is_valid_label(sib_text):
                    context['label'] = sib_text
                    break
    return context
def get_broader_context(link, max_chars: int = 500) -> Tuple[str, str]:
    """
    Get broader text and HTML context around an element for NER extraction.

    Walks up the DOM tree (at most 5 levels) to find a meaningful container
    (paragraph, list item, div, section, ...) and serializes its content.

    Args:
        link: lxml element
        max_chars: Maximum characters to extract (applied to both outputs)

    Returns:
        Tuple of (plain_text_context, html_context); both empty strings when
        the element has no parent at all.
    """
    # Element tags considered meaningful context containers.
    container_tags = {'p', 'li', 'div', 'td', 'section', 'article', 'aside', 'header', 'footer', 'address'}
    current = link.getparent()
    container = None
    # Walk up to find a good container (max 5 levels).
    for _ in range(5):
        if current is None:
            break
        if current.tag in container_tags:
            container = current
            # Small <li>/<td> containers (<100 chars of text) often hold only
            # the link itself; prefer their parent when it is also a container.
            text_len = len(etree.tostring(current, method='text', encoding='unicode') or '')
            if text_len < 100 and current.tag in {'li', 'td'}:
                parent = current.getparent()
                if parent is not None and parent.tag in container_tags:
                    container = parent
            break
        current = current.getparent()
    # Fall back to the direct parent when no listed container was found.
    if container is None:
        container = link.getparent()
    if container is None:
        return "", ""
    # Plain-text serialization, whitespace-normalized and truncated.
    text_content = etree.tostring(container, method='text', encoding='unicode') or ''
    text_content = ' '.join(text_content.split())[:max_chars]
    # Raw HTML serialization carries extra signals (tag names, classes).
    try:
        html_content = etree.tostring(container, encoding='unicode')[:max_chars]
    except Exception:
        html_content = ""
    return text_content, html_content
def extract_email_links(tree, html_file: str, source_url: str, retrieved_on: str,
                        institution_domain: Optional[str] = None,
                        enable_ner: bool = True) -> List[Dict]:
    """Extract email addresses from mailto: links with optional NER for person/role extraction.

    Args:
        tree: Parsed lxml HTML tree
        html_file: Relative path to HTML file
        source_url: URL where page was fetched from
        retrieved_on: ISO timestamp of archival
        institution_domain: Primary domain of the institution (e.g., 'dewolden.nl')
            Used to determine if emails are internal or external.
        enable_ner: Whether to run NER extraction for person names/roles (default: True)

    Returns:
        List of claims including:
        - email claims with context
        - person_name, job_title, title_rank, department, role claims
          (if NER enabled and successful)
    """
    claims = []
    mailto_links = tree.xpath('//a[starts-with(@href, "mailto:")]')
    for link in mailto_links:
        href = link.get('href', '')
        if href.startswith('mailto:'):
            # Strip the 'mailto:' scheme and any ?subject=... query params.
            email = href[7:].split('?')[0]
            if email and '@' in email:
                email_domain = email.split('@')[1].lower()
                external = is_email_external(email, institution_domain)
                email_xpath = get_xpath_lxml(link)
                # Extract label/organization context around the email link.
                context = extract_email_context(link)
                # Create base email claim
                claims.append(create_claim(
                    claim_type='email',
                    claim_value=email,
                    xpath=email_xpath,
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    extraction_method='mailto_link',
                    # Additional metadata for email classification
                    is_external=external,
                    email_domain=email_domain,
                    # Context information
                    context_label=context.get('label'),
                    context_organization=context.get('organization_context'),
                ))
                # Run NER/pattern extraction if enabled
                if enable_ner:
                    text_context, html_context = get_broader_context(link)
                    ner_result = None
                    extraction_method = None
                    # Try LLM-based NER first (if API available and not in fast mode).
                    # NOTE(review): NER_ENABLED / FAST_MODE / NER_MODEL /
                    # NER_CONVENTION_VERSION are module-level globals defined
                    # elsewhere in this file.
                    if NER_ENABLED and text_context and not FAST_MODE:
                        ner_result = extract_ner_from_context(
                            context_text=text_context,
                            email=email,
                            html_context=html_context,
                        )
                        # Include model and convention in extraction_method
                        extraction_method = f'ner_zai_{NER_MODEL}_{NER_CONVENTION_VERSION.replace(" ", "_")}'
                    # Fallback to pattern-based extraction when NER produced
                    # nothing (no result or no persons).
                    if not ner_result or not ner_result.get('persons'):
                        # Use context_label if available, otherwise use broader text context
                        pattern_context = context.get('label') or text_context or ''
                        pattern_result = extract_role_from_context_pattern(
                            context_text=pattern_context,
                            email=email,
                        )
                        if pattern_result:
                            # Convert pattern result to NER-like format
                            ner_result = {
                                'persons': [{
                                    'job_title': pattern_result.get('job_title'),
                                    'job_title_en': pattern_result.get('job_title_en'),
                                    'department': pattern_result.get('department'),
                                    'department_en': pattern_result.get('department_en'),
                                }],
                                'confidence': pattern_result.get('confidence', 0.8),
                            }
                            extraction_method = 'pattern_matching'
                    # Process NER results (from either source)
                    if ner_result and ner_result.get('persons'):
                        # Get model/convention from NER result if available
                        ner_model = ner_result.get('ner_model', NER_MODEL)
                        ner_convention = ner_result.get('ner_convention', NER_CONVENTION_VERSION)
                        # Ensure extraction_method has a default
                        if not extraction_method:
                            extraction_method = f'ner_zai_{NER_MODEL}_{NER_CONVENTION_VERSION.replace(" ", "_")}'
                        # Create claims for each extracted person
                        for person in ner_result['persons']:
                            # Confidence doubles as the xpath_match_score on
                            # derived claims (the XPath is the email's, not
                            # the person's own element).
                            confidence = ner_result.get('confidence', 0.8)
                            # Person name claim (full name)
                            if person.get('full_name'):
                                claims.append(create_claim(
                                    claim_type='person_name',
                                    claim_value=person['full_name'],
                                    xpath=email_xpath,  # XPath of associated email
                                    html_file=html_file,
                                    source_url=source_url,
                                    retrieved_on=retrieved_on,
                                    extraction_method=extraction_method,
                                    xpath_match_score=confidence,
                                    # Associated email for linking
                                    associated_email=email,
                                    # Name components (PNV aligned)
                                    given_name=person.get('given_name'),
                                    surname_prefix=person.get('surname_prefix'),
                                    base_surname=person.get('base_surname'),
                                    # NER provenance
                                    ner_model=ner_model,
                                    ner_convention=ner_convention,
                                ))
                            # Job title claim
                            if person.get('job_title'):
                                claims.append(create_claim(
                                    claim_type='job_title',
                                    claim_value=person['job_title'],
                                    xpath=email_xpath,
                                    html_file=html_file,
                                    source_url=source_url,
                                    retrieved_on=retrieved_on,
                                    extraction_method=extraction_method,
                                    xpath_match_score=confidence,
                                    associated_email=email,
                                    associated_person=person.get('full_name'),
                                    # English translation if available
                                    job_title_en=person.get('job_title_en'),
                                    # NER provenance
                                    ner_model=ner_model,
                                    ner_convention=ner_convention,
                                ))
                            # Title/rank claim
                            if person.get('title_rank'):
                                claims.append(create_claim(
                                    claim_type='title_rank',
                                    claim_value=person['title_rank'],
                                    xpath=email_xpath,
                                    html_file=html_file,
                                    source_url=source_url,
                                    retrieved_on=retrieved_on,
                                    extraction_method=extraction_method,
                                    xpath_match_score=confidence,
                                    associated_email=email,
                                    associated_person=person.get('full_name'),
                                    # NER provenance
                                    ner_model=ner_model,
                                    ner_convention=ner_convention,
                                ))
                            # Department claim
                            if person.get('department'):
                                claims.append(create_claim(
                                    claim_type='department',
                                    claim_value=person['department'],
                                    xpath=email_xpath,
                                    html_file=html_file,
                                    source_url=source_url,
                                    retrieved_on=retrieved_on,
                                    extraction_method=extraction_method,
                                    xpath_match_score=confidence,
                                    associated_email=email,
                                    associated_person=person.get('full_name'),
                                    # English translation if available
                                    department_en=person.get('department_en'),
                                    # NER provenance
                                    ner_model=ner_model,
                                    ner_convention=ner_convention,
                                ))
                            # Role claim
                            if person.get('role'):
                                claims.append(create_claim(
                                    claim_type='role',
                                    claim_value=person['role'],
                                    xpath=email_xpath,
                                    html_file=html_file,
                                    source_url=source_url,
                                    retrieved_on=retrieved_on,
                                    extraction_method=extraction_method,
                                    xpath_match_score=confidence,
                                    associated_email=email,
                                    associated_person=person.get('full_name'),
                                    # NER provenance
                                    ner_model=ner_model,
                                    ner_convention=ner_convention,
                                ))
    return claims
def extract_phone_links(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract phone-number claims from tel: anchor links."""
    results: List[Dict] = []
    for anchor in tree.xpath('//a[starts-with(@href, "tel:")]'):
        href = anchor.get('href', '')
        if not href.startswith('tel:'):
            continue
        number = href[4:]  # drop the 'tel:' scheme prefix
        if not number:
            continue
        results.append(create_claim(
            claim_type='phone',
            claim_value=number,
            xpath=get_xpath_lxml(anchor),
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            extraction_method='tel_link',
        ))
    return results
def classify_youtube_url(url: str) -> str:
    """
    Classify a YouTube URL as either a channel link or a video link.

    CRITICAL: This distinction prevents wrong YouTube channel assignment!
    A video link pointing to content ABOUT an institution is NOT the institution's
    official channel. For example, a news report about Fryske Akademy hosted on
    NOS Jeugdjournaal's channel should NOT be classified as Fryske Akademy's YouTube.

    Returns:
        'social_youtube_channel' - Official channel URLs (/@handle, /channel/, /user/, /c/)
        'social_youtube_video' - Individual video URLs (watch?v=, youtu.be/, /shorts/)
    """
    # NOTE: the redundant function-local `import re` was removed; re is
    # already imported at module level.
    # Channel URL patterns - these ARE official channel links
    channel_patterns = [
        r'youtube\.com/channel/UC[^/?&]+',  # Channel ID format: /channel/UCxxxxx
        r'youtube\.com/user/[^/?&]+',       # Legacy user format: /user/username
        r'youtube\.com/c/[^/?&]+',          # Custom URL format: /c/customname
        r'youtube\.com/@[^/?&]+',           # Handle format: /@username (modern format)
    ]
    for pattern in channel_patterns:
        if re.search(pattern, url):
            return 'social_youtube_channel'
    # Video URL patterns - these are NOT official channels
    video_patterns = [
        r'youtube\.com/watch\?v=',          # Standard video URL
        r'youtu\.be/',                      # Short video URL
        r'youtube\.com/shorts/',            # Shorts video URL
        r'youtube\.com/live/',              # Live stream URL
        r'youtube\.com/embed/',             # Embed URL
        r'youtube-nocookie\.com/embed/',    # Privacy-enhanced embed
    ]
    for pattern in video_patterns:
        if re.search(pattern, url):
            return 'social_youtube_video'
    # Playlist URLs - also not direct channel links
    if 'youtube.com/playlist' in url:
        return 'social_youtube_video'  # Treat playlists like videos (not channel)
    # Default: if it's youtube.com but doesn't match channel patterns,
    # treat it as potentially a video or unknown (safer to not assume channel)
    return 'social_youtube_video'
def extract_social_links(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract social media links.

    IMPORTANT:
    - Filters out share/intent URLs which are NOT actual profiles.
    - Distinguishes YouTube channel links from video links to prevent
      incorrect channel attribution (e.g., news video ABOUT an institution
      being mistaken for the institution's official channel).
    - Matches on the parsed hostname rather than raw substrings: the old
      substring check classified any URL whose host merely CONTAINED a
      platform domain (e.g. 'linux.com' contains 'x.com') as social.
    """
    claims: List[Dict] = []
    social_patterns = {
        'social_twitter': ['twitter.com', 'x.com'],
        'social_facebook': ['facebook.com'],
        'social_instagram': ['instagram.com'],
        'social_linkedin': ['linkedin.com'],
        # NOTE: YouTube handled separately with classify_youtube_url()
        'social_tiktok': ['tiktok.com'],
        'social_pinterest': ['pinterest.com', 'pinterest.nl'],
    }
    # Share URL patterns to EXCLUDE (not actual profiles)
    share_patterns = [
        '/sharer', '/share', '/intent/',
        'shareArticle', '/pin/create',
        '/submit', 'addthis.com', 'sharethis.com',
        # Pinterest pin URLs (not profile pages)
        '/pin/',
    ]

    def _host(url: str) -> str:
        # Hostname, lowercased, without a leading 'www.'; empty string for
        # relative or unparseable URLs (which can never be social profiles).
        try:
            netloc = urlparse(url).netloc.lower()
        except ValueError:
            return ''
        return netloc[4:] if netloc.startswith('www.') else netloc

    def _host_matches(host: str, domain: str) -> bool:
        # Exact host or true subdomain only - no substring false positives.
        return host == domain or host.endswith('.' + domain)

    for link in tree.xpath('//a[@href]'):
        href = link.get('href', '')
        # Skip share/intent URLs
        if any(pattern in href for pattern in share_patterns):
            continue
        host = _host(href)
        if not host:
            continue
        # Handle YouTube URLs specially - classify as channel vs video
        if any(_host_matches(host, d) for d in ('youtube.com', 'youtu.be', 'youtube-nocookie.com')):
            # Check if this is a blocked hosting provider channel
            is_blocked = False
            for blocked_id in BLOCKED_YOUTUBE_CHANNELS:
                if blocked_id in href:
                    print(f"  ⚠️ BLOCKED YouTube channel: {blocked_id} in {href}")
                    is_blocked = True
                    break
            if is_blocked:
                continue  # Skip this link entirely
            youtube_claim_type = classify_youtube_url(href)
            claims.append(create_claim(
                claim_type=youtube_claim_type,
                claim_value=href,
                xpath=get_xpath_lxml(link),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='social_link',
            ))
            continue
        # Handle other social platforms
        for claim_type, domains in social_patterns.items():
            if any(_host_matches(host, domain) for domain in domains):
                claims.append(create_claim(
                    claim_type=claim_type,
                    claim_value=href,
                    xpath=get_xpath_lxml(link),
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    extraction_method='social_link',
                ))
                break
    return claims
def extract_h1_org_name(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract organization name from first h1.

    IMPORTANT: Filters out generic UI text that is not an organization name.
    Updated 2025-12-02 based on Gado2 v1.5.0 convention analysis.

    Only the FIRST <h1> on the page is considered; the claim is emitted with
    xpath_match_score 0.9 because an h1 is a weaker org-name signal than
    <title> or og:site_name.

    Returns:
        At most one org_name claim, or an empty list when the h1 is missing,
        too short/long, or matches any exclusion rule below.
    """
    claims = []
    # Generic UI text that should NOT be org names
    # Based on analysis of 40,149 org_name claims across 1,630 entries
    # Following Gado2 v1.5.0 ORGANISATION exclusion rules (ORG_EXC001-006)
    INVALID_ORG_NAMES = {
        # Navigation (Dutch + English)
        'Home', 'home', 'HOME', 'Menu', 'menu', 'Contact', 'contact', 'Contact us',
        'Over ons', 'About us', 'Nieuws', 'News', 'Zoeken', 'Search', 'Terug', 'Back',
        'Volgende', 'Next', 'Vorige', 'Previous', 'Close', 'Sluiten',
        # Section headers (Dutch)
        'Welkom', 'Informatie', 'Homepage', 'Startpagina', 'Algemeen',
        'Collectie', 'Collection', 'Agenda', 'Activiteiten', 'Activities',
        'Vacatures', 'Organisatie', 'Nieuwsbrief', 'Newsletter', 'Bestuur',
        'Publicaties', 'Publications', 'Openingstijden', 'Opening hours',
        'Geschiedenis', 'History', 'Educatie', 'Education',
        'Vrijwilligers', 'Volunteers', 'Tentoonstellingen', 'Exhibitions',
        'Boeken', 'Books', 'Winkel', 'Shop', 'Werkgroepen', 'Genealogie',
        'Exposities', 'Archief', 'Archive', 'Collecties', 'Collections',
        'Jeugd', 'Youth', 'Onderwijs', 'Lidmaatschap', 'Membership',
        'Jaarverslagen', 'Annual reports', 'Historie', 'Arrangementen',
        'Rondleidingen', 'Tours', 'Partners', 'Actueel', 'Current',
        'Tickets', 'Projecten', 'Projects', 'Contactformulier',
        'Webshop', 'Vrienden', 'Friends', 'Pers', 'Press', 'Bezoek', 'Visit',
        'Contactgegevens', 'Contact details', 'Bezoekersinformatie',
        'Visitor information', 'Scholen', 'Schools', 'Medewerkers', 'Staff',
        'Lezingen', 'Lectures', 'Groepsbezoek', 'Group visits',
        'Expositie', 'Exhibition', 'Evenementen', 'Events',
        'Donateurs', 'Donors', 'Colofon', 'Imprint', 'Links',
        'Bibliotheek', 'Library', 'Museumwinkel', 'Museum shop',
        'Beeldbank', 'Image bank', 'Archieven', 'Archives',
        'Nieuwsbrieven', 'Newsletters', 'Sponsors', 'Sponsoren',
        'Museum', 'Archeologie', 'Archaeology', 'Artikelen', 'Articles',
        'Bereikbaarheid', 'Accessibility', 'Groepen', 'Groups',
        # CTA / Call-to-action
        'Lees meer', 'Meer lezen', 'Read more', 'Bekijk', 'View',
        'Download', 'Steun ons', 'Support us', 'Lid worden', 'Become member',
        'Word vrijwilliger', 'Become volunteer', 'Doneer', 'Donate',
        'Plan je bezoek', 'Plan your visit', 'Word Vriend', 'Become Friend',
        'Vrijwilliger worden', 'Schrijf je in', 'Sign up', 'Aanmelden',
        # UI elements / Social
        'Facebook', 'Twitter', 'Instagram', 'LinkedIn', 'YouTube', 'X',
        'Zoekresultaten', 'Search results', 'Winkelwagen', 'Shopping cart',
        'Sitemap', 'Login', 'Logout', 'Inloggen', 'Uitloggen',
        'Chevron left', 'Chevron right', 'Arrow left', 'Arrow right',
        'Eye', 'Share', 'Delen', 'Print', 'Oproep',
        'Opent in externe pagina', 'Opens in new window',
        'Loading...', 'Laden...', 'Wachtwoord kwijt', 'Forgot password',
        # Legal / Policy pages
        'ANBI', 'Privacyverklaring', 'Privacy statement', 'Disclaimer',
        'Privacybeleid', 'Privacy policy', 'Cookies', 'Cookie policy',
        'Algemene voorwaarden', 'Terms and conditions', 'Huisregels',
        'House rules', 'Toegankelijkheid', 'Privacy verklaring', 'Privacy',
        'Datenschutzerklärung', 'Impressum',
        # FAQ / Generic
        'Veelgestelde vragen', 'FAQ', 'Over het museum', 'About the museum',
        'Verplicht', 'Required', 'Uncategorized', 'Geen categorie',
        'admin', 'Het museum', 'The museum', 'Praktische informatie',
        'Practical information', 'Tarieven', 'Rates', 'Toegangsprijzen',
        'Admission', 'Werken bij', 'Work with us', 'Nu te zien', 'Now showing',
        'English', 'Nederlands', 'Deutsch', 'Français',
        # Content types / categories
        'Natuur & Dieren', 'Nature & Animals', 'Kunst & Cultuur', 'Art & Culture',
        'Sport', 'Koken & Eten', 'Food & Cooking', 'Biografie & Waargebeurd',
        'Economie & Management', 'Spiritualiteit & Filosofie', 'Romantiek',
        'Films', 'Podcasts', 'Video', 'Audio', 'Foto', "Foto's", 'Photos',
        'Duurzaamheid', 'Sustainability', 'Schenkingen', 'Donations',
        'Voortgezet onderwijs', 'Secondary education', 'Verhalen', 'Stories',
        'Catalogus', 'Catalogue', 'Vaste collectie', 'Permanent collection',
        'Kinderfeestje', "Children's party", 'Kinderactiviteiten',
        "Children's activities", 'Museumcafé', 'Museum café',
        'Online leren', 'Online learning', 'Gesproken boeken', 'Audiobooks',
        'Spanning', 'Thriller', 'Bidprentjes', 'Prayer cards',
        'Wetenschappelijke boeken lenen', 'Borrow scientific books',
        'BoekStart', 'Grootletterboeken', 'Large print books',
        'Engelse boeken', 'English books', 'Meer boeken', 'More books',
        'Boeken op onderwerp', 'Books by subject',
        'Informatiepunt Digitale Overheid', 'Hulp in de Bibliotheek',
        'Ontdekken & Onderzoeken', 'Gezin & Gezondheid', 'Family & Health',
        # Events
        'Open Monumentendag', 'Monument Day', 'Evenementen in',
        # Generic single words
        'Doel', 'Goal', 'Boek', 'Book', 'Kaart', 'Map', 'Film',
        'Wie zijn wij?', 'Who are we?',
        # Technical/placeholder
        'Gemeentearchief',  # Generic term, not specific org name
    }
    # Patterns that indicate invalid org names (matched as case-insensitive
    # substrings, not true regexes)
    INVALID_PATTERNS = [
        # Month patterns
        'januari', 'februari', 'maart', 'april', 'mei', 'juni',
        'juli', 'augustus', 'september', 'oktober', 'november', 'december',
        # UI/icon references
        '-svg', '-icoon', '-icon', 'icon-', 'svg-',
        # Scroll/navigation
        'scroll naar', 'scroll to',
        # Archive patterns
        'archieven',  # when not exact "Archieven"
        # Link labels
        'externe-link',
    ]
    h1s = tree.xpath('//h1')
    if h1s:
        h1 = h1s[0]
        # itertext() flattens nested markup (e.g. <span>s) inside the h1.
        text = ''.join(h1.itertext()).strip()
        # Filter out invalid org names; plausible names are 3-149 chars.
        if text and len(text) > 2 and len(text) < 150:
            # Check exact match (case-insensitive)
            if text in INVALID_ORG_NAMES or text.lower() in {v.lower() for v in INVALID_ORG_NAMES}:
                return claims
            # Check substring patterns
            text_lower = text.lower()
            if any(pattern in text_lower for pattern in INVALID_PATTERNS):
                return claims
            # Check if it's just a year (e.g., "2023")
            if text.isdigit() and len(text) == 4:
                return claims
            # Check if starts with "Evenementen in" (events calendar)
            if text_lower.startswith('evenementen in '):
                return claims
            # Check for "News Archives" patterns
            if 'nieuws archieven' in text_lower:
                return claims
            claims.append(create_claim(
                claim_type='org_name',
                claim_value=text,
                xpath=get_xpath_lxml(h1),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='h1_tag',
                xpath_match_score=0.9,  # Slightly lower confidence
            ))
    return claims
def extract_youtube_embeds(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract YouTube video embeds from iframes and data attributes.

    Finds:
    - youtube.com/embed/VIDEO_ID
    - youtube-nocookie.com/embed/VIDEO_ID
    - youtu.be/VIDEO_ID (in data attributes)

    Args:
        tree: Parsed lxml HTML tree.
        html_file: Relative path to the archived HTML file.
        source_url: URL the page was fetched from.
        retrieved_on: ISO 8601 archive timestamp.

    Returns:
        List of 'video_youtube' claim dicts with XPath provenance.
    """
    claims = []
    # Standard YouTube iframes
    youtube_iframes = tree.xpath(
        '//iframe[contains(@src, "youtube.com/embed/") or contains(@src, "youtube-nocookie.com/embed/")]'
    )
    for iframe in youtube_iframes:
        src = iframe.get('src', '')
        # Extract video ID from URL (YouTube IDs are exactly 11 URL-safe chars)
        video_id_match = re.search(r'embed/([a-zA-Z0-9_-]{11})', src)
        if video_id_match:
            video_id = video_id_match.group(1)
            claims.append(create_claim(
                claim_type='video_youtube',
                claim_value=f'https://www.youtube.com/watch?v={video_id}',
                xpath=get_xpath_lxml(iframe),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                raw_value=src,
                extraction_method='youtube_iframe',
            ))
    # Also check for YouTube links in data attributes (lazy-loaded videos).
    # Match on "youtu" (not "youtube") so short youtu.be/ URLs -- which the
    # docstring promises to find -- are detected too; the old "youtube"
    # substring silently missed them both here and in the guard below.
    youtube_data_attrs = tree.xpath(
        '//*[@data-video-url[contains(., "youtu")] or @data-src[contains(., "youtu")]]'
    )
    for elem in youtube_data_attrs:
        for attr in ['data-video-url', 'data-src', 'data-video-id']:
            value = elem.get(attr, '')
            if 'youtu' in value.lower():
                video_id_match = re.search(r'(?:embed/|v=|youtu\.be/)([a-zA-Z0-9_-]{11})', value)
                if video_id_match:
                    video_id = video_id_match.group(1)
                    claims.append(create_claim(
                        claim_type='video_youtube',
                        claim_value=f'https://www.youtube.com/watch?v={video_id}',
                        xpath=get_xpath_lxml(elem),
                        html_file=html_file,
                        source_url=source_url,
                        retrieved_on=retrieved_on,
                        raw_value=value,
                        extraction_method='youtube_data_attr',
                    ))
            elif attr == 'data-video-id' and value and len(value) == 11:
                # Direct video ID in data attribute (no URL to parse)
                claims.append(create_claim(
                    claim_type='video_youtube',
                    claim_value=f'https://www.youtube.com/watch?v={value}',
                    xpath=get_xpath_lxml(elem),
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    raw_value=value,
                    extraction_method='youtube_video_id_attr',
                ))
    return claims
def extract_vimeo_embeds(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract Vimeo video embeds from iframes.

    Finds:
    - player.vimeo.com/video/VIDEO_ID
    - vimeo.com/VIDEO_ID (in data attributes)
    """
    results: List[Dict] = []
    # One compiled pattern serves both the iframe and data-attribute scans.
    id_pattern = re.compile(r'vimeo\.com/(?:video/)?(\d+)')
    # Standard Vimeo iframes
    for frame in tree.xpath('//iframe[contains(@src, "vimeo.com")]'):
        embed_src = frame.get('src', '')
        found = id_pattern.search(embed_src)
        if found:
            results.append(create_claim(
                claim_type='video_vimeo',
                claim_value=f'https://vimeo.com/{found.group(1)}',
                xpath=get_xpath_lxml(frame),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                raw_value=embed_src,
                extraction_method='vimeo_iframe',
            ))
    # Lazy-loaded players keep the URL in data attributes instead of src
    lazy_nodes = tree.xpath(
        '//*[@data-video-url[contains(., "vimeo")] or @data-src[contains(., "vimeo")]]'
    )
    for node in lazy_nodes:
        for attr_name in ('data-video-url', 'data-src'):
            attr_value = node.get(attr_name, '')
            if 'vimeo' not in attr_value.lower():
                continue
            found = id_pattern.search(attr_value)
            if found:
                results.append(create_claim(
                    claim_type='video_vimeo',
                    claim_value=f'https://vimeo.com/{found.group(1)}',
                    xpath=get_xpath_lxml(node),
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    raw_value=attr_value,
                    extraction_method='vimeo_data_attr',
                ))
    return results
def extract_gallery_patterns(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Detect gallery/slideshow patterns indicating collection displays.

    Finds common gallery plugins/patterns:
    - Lightbox galleries
    - WordPress gallery blocks
    - Carousel/slider components
    - Collection display patterns

    Returns 'gallery_detected' claims plus at most one 'image_count' claim.
    """
    claims = []
    # (class substring, gallery type label) pairs to detect
    # (derived from analysis of 115K+ files)
    gallery_patterns = [
        # Lightbox patterns
        ('lightbox', 'lightbox'),
        ('fancybox', 'fancybox'),
        ('simplelightbox', 'simplelightbox'),
        # Gallery patterns
        ('gallery', 'gallery'),
        ('ngg-gallery', 'nextgen_gallery'),
        ('spectra-image-gallery', 'spectra_gallery'),
        ('et_pb_gallery', 'divi_gallery'),
        ('kadence-blocks-gallery', 'kadence_gallery'),
        ('elementor-gallery', 'elementor_gallery'),
        ('woocommerce-product-gallery', 'woocommerce_gallery'),
        # Carousel/slider patterns
        ('carousel', 'carousel'),
        ('slider', 'slider'),
        ('swiper', 'swiper'),
        ('slick', 'slick'),
        # Collection page indicators
        ('collection', 'collection'),
        ('exhibit', 'exhibition'),
        ('artwork', 'artwork'),
    ]
    detected_galleries = {}
    for class_substring, gallery_type in gallery_patterns:
        try:
            elements = tree.xpath(f'//*[contains(@class, "{class_substring}")]')
            if elements and gallery_type not in detected_galleries:
                detected_galleries[gallery_type] = {
                    'count': len(elements),
                    'first_xpath': get_xpath_lxml(elements[0]),
                    # Remember the real class fragment: the type label is not
                    # always a valid class substring (e.g. label
                    # 'nextgen_gallery' vs. class 'ngg-gallery'), and the
                    # image-count pass below must query the same elements.
                    'class_substring': class_substring,
                }
        except Exception:
            continue
    # Create claims for detected galleries
    for gallery_type, info in detected_galleries.items():
        claims.append(create_claim(
            claim_type='gallery_detected',
            claim_value=gallery_type,
            xpath=info['first_xpath'],
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            raw_value=f"{gallery_type}: {info['count']} elements",
            extraction_method='gallery_pattern',
            xpath_match_score=0.85,  # Pattern detection has slightly lower confidence
        ))
    # Count images within gallery containers (at most one image_count claim)
    for gallery_type, info in detected_galleries.items():
        try:
            # Query by the detected class fragment, NOT the type label --
            # the label does not match the HTML class for several plugins.
            gallery_images = tree.xpath(f'//*[contains(@class, "{info["class_substring"]}")]//img')
            if len(gallery_images) >= 3:  # Only report if 3+ images (likely a gallery)
                claims.append(create_claim(
                    claim_type='image_count',
                    claim_value=str(len(gallery_images)),
                    xpath=info['first_xpath'],
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    raw_value=f"{len(gallery_images)} images in {gallery_type} container",
                    extraction_method='gallery_image_count',
                    xpath_match_score=0.8,
                ))
                break  # Only count once
        except Exception:
            continue
    return claims
def extract_collection_page_indicators(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Detect collection/exhibition page patterns from URL and content.

    Heritage institutions typically have:
    - /collectie/ or /collection/ URLs
    - /tentoonstelling/ or /exhibition/ URLs
    - /object/ or /item/ pages

    Emits 'collection_page' claims from the source URL, canonical link
    tags, and schema.org JSON-LD content.
    """
    claims = []
    # Check URL patterns (from source_url)
    collection_url_patterns = [
        ('collectie', 'collection_nl'),
        ('collection', 'collection_en'),
        ('tentoonstelling', 'exhibition_nl'),
        ('exhibition', 'exhibition_en'),
        ('expositie', 'exhibition_nl'),
        ('/object/', 'object_page'),
        ('/item/', 'item_page'),
        ('/artwork/', 'artwork_page'),
        ('/archief/', 'archive_nl'),
        ('/archive/', 'archive_en'),
        ('/catalogus/', 'catalog_nl'),
        ('/catalog/', 'catalog_en'),
    ]
    url_lower = source_url.lower()
    for pattern, indicator_type in collection_url_patterns:
        if pattern in url_lower:
            claims.append(create_claim(
                claim_type='collection_page',
                claim_value=indicator_type,
                xpath='/',  # URL-based detection
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                raw_value=f"URL contains '{pattern}'",
                extraction_method='url_pattern',
                xpath_match_score=0.9,
            ))
    # Check for canonical collection page meta tags.
    # Iterate the <link> elements themselves so each claim's XPath points at
    # the canonical tag that actually matched (the previous code always used
    # the FIRST canonical link for provenance, even when a later one matched).
    for link_elem in tree.xpath('//link[@rel="canonical"]'):
        href = str(link_elem.get('href', ''))
        href_lower = href.lower()
        for pattern, indicator_type in collection_url_patterns:
            if pattern in href_lower:
                claims.append(create_claim(
                    claim_type='collection_page',
                    claim_value=f'{indicator_type}_canonical',
                    xpath=get_xpath_lxml(link_elem),
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    raw_value=href,
                    extraction_method='canonical_url_pattern',
                    xpath_match_score=0.95,
                ))
                break  # one claim per canonical link
    # Check for collection-related structured data
    try:
        scripts = tree.xpath('//script[@type="application/ld+json"]')
        for script in scripts:
            if script.text:
                text_lower = script.text.lower()
                if any(term in text_lower for term in ['collection', 'museum', 'exhibition', 'artwork', 'archivecomponent']):
                    claims.append(create_claim(
                        claim_type='collection_page',
                        claim_value='structured_data_collection',
                        xpath=get_xpath_lxml(script),
                        html_file=html_file,
                        source_url=source_url,
                        retrieved_on=retrieved_on,
                        raw_value='JSON-LD contains collection-related schema',
                        extraction_method='schema_org_collection',
                        xpath_match_score=0.85,
                    ))
                    break  # one structured-data claim per page is enough
    except Exception:
        pass
    return claims
def extract_boekwinkeltjes_links(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract links to boekwinkeltjes.nl (Dutch secondhand book marketplace).

    Heritage institutions often sell publications through boekwinkeltjes.nl.
    Links may point to:
    - Shop pages (/v/shopname/)
    - Search results (/s/?q=...)
    - General homepage references
    """
    claims = []
    # Find all links to boekwinkeltjes.nl
    boekwinkeltjes_links = tree.xpath('//a[contains(@href, "boekwinkeltjes.nl")]')
    for link in boekwinkeltjes_links:
        href = str(link.get('href', ''))
        if not href:
            continue
        # Determine link type
        if '/v/' in href:
            # Shop page: /v/shopname/ -- capture the shop name segment.
            # Uses the module-level `re` import; the previous per-link
            # `import re` inside this branch was redundant.
            link_type = 'shop_page'
            match = re.search(r'/v/([^/]+)/?', href)
            shop_name = match.group(1) if match else None
        elif '/s/' in href or '/su/' in href:
            # Search results
            link_type = 'search_results'
            shop_name = None
        else:
            # Generic link to homepage
            link_type = 'homepage'
            shop_name = None
        claims.append(create_claim(
            claim_type='external_boekwinkeltjes',
            claim_value=href,
            xpath=get_xpath_lxml(link),
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            raw_value=f"link_type={link_type}" + (f", shop={shop_name}" if shop_name else ""),
            extraction_method='boekwinkeltjes_link',
            xpath_match_score=1.0,
        ))
    return claims
def extract_page_title(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract the full page title.

    Unlike extract_title_claims which tries to parse org name,
    this extracts the complete tag content.
    """
    title_claims: List[Dict] = []
    for node in tree.xpath('//title'):
        text = (node.text or '').strip()
        if not text:
            continue
        title_claims.append(create_claim(
            claim_type='page_title',
            claim_value=text,
            xpath=get_xpath_lxml(node),
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            extraction_method='title_tag_full',
        ))
    return title_claims
def extract_favicon(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract favicon URLs from link tags.

    Scans the common rel variants (icon, shortcut icon, any rel containing
    "icon", apple-touch-icon, apple-touch-icon-precomposed) and
    de-duplicates results by href.
    """
    favicon_claims: List[Dict] = []
    rel_selectors = (
        '//link[@rel="icon"]',
        '//link[@rel="shortcut icon"]',
        '//link[contains(@rel, "icon")]',
        '//link[@rel="apple-touch-icon"]',
        '//link[@rel="apple-touch-icon-precomposed"]',
    )
    seen_hrefs = set()
    for rel_selector in rel_selectors:
        for link_node in tree.xpath(rel_selector):
            icon_href = link_node.get('href', '')
            if not icon_href or icon_href in seen_hrefs:
                continue
            seen_hrefs.add(icon_href)
            # Record declared size/mime type alongside the URL
            details = []
            icon_sizes = link_node.get('sizes', '')
            mime_type = link_node.get('type', '')
            if icon_sizes:
                details.append(f"sizes={icon_sizes}")
            if mime_type:
                details.append(f"type={mime_type}")
            favicon_claims.append(create_claim(
                claim_type='favicon',
                claim_value=str(icon_href),
                xpath=get_xpath_lxml(link_node),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                raw_value=", ".join(details) or None,
                extraction_method='favicon_link',
            ))
    return favicon_claims
def extract_logo(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract logo images from various patterns.

    Looks for:
    - Images with 'logo' in class, id, alt, or src
    - Images inside elements with 'logo' in class/id
    - Schema.org logo property
    - OpenGraph image (often the logo)

    De-duplicates by image URL across all patterns.
    """
    claims = []
    seen_srcs = set()
    # Pattern 1: Images with 'logo' in attributes
    # (translate() lowercases so "Logo"/"LOGO" in alt/src also match)
    logo_images = tree.xpath(
        '//img[contains(@class, "logo") or contains(@id, "logo") or '
        'contains(translate(@alt, "LOGO", "logo"), "logo") or '
        'contains(translate(@src, "LOGO", "logo"), "logo")]'
    )
    for img in logo_images:
        src = img.get('src', '')
        if src and src not in seen_srcs:
            seen_srcs.add(src)
            alt = img.get('alt', '')
            claims.append(create_claim(
                claim_type='logo',
                claim_value=str(src),
                xpath=get_xpath_lxml(img),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                raw_value=f"alt={alt}" if alt else None,
                extraction_method='logo_img_attr',
            ))
    # Pattern 2: Images inside logo containers
    logo_container_images = tree.xpath(
        '//*[contains(@class, "logo") or contains(@id, "logo")]//img'
    )
    for img in logo_container_images:
        src = img.get('src', '')
        if src and src not in seen_srcs:
            seen_srcs.add(src)
            alt = img.get('alt', '')
            claims.append(create_claim(
                claim_type='logo',
                claim_value=str(src),
                xpath=get_xpath_lxml(img),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                raw_value=f"alt={alt}" if alt else None,
                extraction_method='logo_container_img',
            ))
    # Pattern 3: OpenGraph image.
    # Iterate the <meta> elements and read @content directly -- the old code
    # paired a @content value list with an element list by index, which
    # attached the wrong XPath whenever a meta tag lacked @content.
    for meta in tree.xpath('//meta[@property="og:image"]'):
        content = meta.get('content', '')
        if content and content not in seen_srcs:
            seen_srcs.add(content)
            claims.append(create_claim(
                claim_type='logo',
                claim_value=content,
                xpath=get_xpath_lxml(meta),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='og_image',
                xpath_match_score=0.7,  # Lower confidence - og:image might not be logo
            ))
    # Pattern 4: Elements with itemprop="logo".
    # Same element-first iteration: an element carrying two of
    # content/src/href would have misaligned the old value/element indexing.
    for node in tree.xpath('//*[@itemprop="logo"]'):
        for attr in ('content', 'src', 'href'):
            value = node.get(attr, '')
            if value and value not in seen_srcs:
                seen_srcs.add(value)
                claims.append(create_claim(
                    claim_type='logo',
                    claim_value=value,
                    xpath=get_xpath_lxml(node),
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    extraction_method='schema_logo',
                ))
    return claims
def extract_login_signup(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract login and signup buttons/links.

    Detects authentication UI elements indicating member portals,
    user accounts, or restricted content areas.
    """
    found: List[Dict] = []
    # Login patterns (Dutch + English + German + French)
    login_patterns = [
        # Dutch
        'inloggen', 'log in', 'login', 'aanmelden', 'mijn account', 'mijn profiel',
        # English
        'sign in', 'signin', 'log on', 'logon', 'my account', 'member login',
        # German
        'anmelden', 'einloggen', 'mein konto',
        # French
        'connexion', 'se connecter', 'mon compte',
    ]
    # Signup patterns (Dutch + English + German + French)
    signup_patterns = [
        # Dutch
        'registreren', 'registreer', 'account aanmaken', 'word lid', 'lid worden',
        'nieuw account', 'schrijf in', 'inschrijven',
        # English
        'sign up', 'signup', 'register', 'create account', 'join', 'become a member',
        'new account', 'subscribe',
        # German
        'registrieren', 'konto erstellen', 'mitglied werden',
        # French
        'inscription', 's\'inscrire', 'créer un compte',
    ]
    # Each group is matched independently: an element may yield both a
    # login claim and a signup claim, but at most one of each.
    pattern_groups = (
        (login_patterns, 'ui_login', 'login_button'),
        (signup_patterns, 'ui_signup', 'signup_button'),
    )
    # Scan every clickable element for the phrases above
    for node in tree.xpath('//a | //button | //input[@type="submit"] | //input[@type="button"]'):
        node_text = ''.join(node.itertext()).strip().lower()
        link_href = str(node.get('href', '')).lower()
        node_title = str(node.get('title', '')).lower()
        node_aria = str(node.get('aria-label', '')).lower()
        node_class = str(node.get('class', '')).lower()
        node_id = str(node.get('id', '')).lower()
        node_value = str(node.get('value', '')).lower()
        # Combine all searchable text
        haystack = f"{node_text} {link_href} {node_title} {node_aria} {node_class} {node_id} {node_value}"
        for patterns, claim_type, method in pattern_groups:
            for phrase in patterns:
                if phrase not in haystack:
                    continue
                # Prefer the visible label, then value/title/aria fallbacks
                label = node_text or node_value or node_title or node_aria or phrase
                found.append(create_claim(
                    claim_type=claim_type,
                    claim_value=node.get('href', '') or label,
                    xpath=get_xpath_lxml(node),
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    raw_value=f"text={label}, pattern={phrase}",
                    extraction_method=method,
                ))
                break  # Only one match per element per group
    # Forms whose action/id/class look like authentication endpoints
    auth_forms = tree.xpath(
        '//form[contains(@action, "login") or contains(@action, "signin") or '
        'contains(@action, "auth") or contains(@id, "login") or contains(@class, "login")]'
    )
    for form in auth_forms:
        form_action = form.get('action', '')
        found.append(create_claim(
            claim_type='ui_login',
            claim_value=str(form_action) if form_action else 'login_form_detected',
            xpath=get_xpath_lxml(form),
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            raw_value='login_form',
            extraction_method='login_form',
            xpath_match_score=0.9,
        ))
    return found
def extract_all_claims(html_content: str, html_file: str, source_url: str, retrieved_on: str,
                       institution_domain: Optional[str] = None) -> List[Dict]:
    """Extract all claims from HTML content.

    Args:
        html_content: Raw HTML string
        html_file: Relative path to HTML file
        source_url: URL where page was fetched from
        retrieved_on: ISO timestamp of archival
        institution_domain: Primary domain of the institution (for email classification)

    Returns:
        List of claim dictionaries, or empty list if default page detected
    """
    claims = []
    # ==========================================================================
    # BLOCKLIST CHECK: Skip extraction for hosting provider default pages
    # ==========================================================================
    # These pages contain social links to hosting providers (e.g., Plesk's YouTube)
    # which should NOT be attributed to heritage institutions.
    for indicator in DEFAULT_PAGE_INDICATORS:
        if indicator in html_content:
            print(f" ⚠️ BLOCKED: Default page detected ('{indicator}')")
            print(f" Skipping extraction to avoid hosting provider attribution")
            return []  # Return empty list - no claims from default pages
    try:
        # Parse with lxml for proper XPath support
        tree = etree.HTML(html_content)
        if tree is None:
            # lxml returns None for empty or completely unparseable input;
            # without this guard every extractor below raises on tree.xpath,
            # flooding the log with one warning per extractor.
            return claims
        # Standard extractors (all take same 4 params)
        standard_extractors = [
            extract_title_claims,
            extract_meta_description,
            extract_og_site_name,
            extract_schema_org,
            extract_phone_links,
            extract_social_links,
            extract_h1_org_name,
            # Video embeds
            extract_youtube_embeds,
            extract_vimeo_embeds,
            # Gallery and collection patterns
            extract_gallery_patterns,
            extract_collection_page_indicators,
            # External marketplace links
            extract_boekwinkeltjes_links,
            # Page metadata and branding
            extract_page_title,
            extract_favicon,
            extract_logo,
            # UI patterns (login/signup detection)
            extract_login_signup,
        ]
        # Each extractor is isolated: one failure must not abort the rest.
        for extractor in standard_extractors:
            try:
                claims.extend(extractor(tree, html_file, source_url, retrieved_on))
            except Exception as e:
                print(f" Warning: Extractor {extractor.__name__} failed: {e}")
        # Email extractor gets additional institution_domain parameter
        try:
            claims.extend(extract_email_links(tree, html_file, source_url, retrieved_on, institution_domain))
        except Exception as e:
            print(f" Warning: Extractor extract_email_links failed: {e}")
    except Exception as e:
        print(f" Error parsing HTML: {e}")
    return claims
def deduplicate_claims(claims: List[Dict]) -> List[Dict]:
    """Remove duplicate claims, keeping highest confidence.

    Claims are considered duplicates when they share (claim_type,
    claim_value); ties keep the earliest claim seen.
    """
    best: Dict[tuple, Dict] = {}
    for candidate in claims:
        dedup_key = (candidate['claim_type'], candidate['claim_value'])
        current = best.get(dedup_key)
        if current is None or candidate['xpath_match_score'] > current['xpath_match_score']:
            best[dedup_key] = candidate
    return list(best.values())
def get_web_archive_path(entry_data: dict, entry_num: str) -> Optional[Path]:
    """Get the web archive directory path for an entry.

    Prefers the directory recorded in the entry's first web_archives
    record; otherwise falls back to the first subdirectory found under
    web/{entry_num}/. Returns None when neither exists.
    """
    archives = entry_data.get('web_enrichment', {}).get('web_archives', [])
    if archives:
        directory = archives[0].get('directory')
        if directory:
            return ENTRIES_DIR / directory
    # Fallback: first subdirectory in web/{entry_num}/ (filesystem order)
    fallback_root = WEB_DIR / entry_num
    if fallback_root.exists():
        for child in fallback_root.iterdir():
            if child.is_dir():
                return child
    return None
def load_metadata(archive_path: Path) -> Optional[dict]:
    """Load metadata.yaml from archive directory.

    Returns the parsed mapping, or None when the file is missing or
    unreadable (a warning is printed in the latter case).
    """
    metadata_file = archive_path / 'metadata.yaml'
    if not metadata_file.exists():
        return None
    try:
        with open(metadata_file, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)
    except Exception as e:
        print(f" Warning: Failed to load {metadata_file}: {e}")
        return None
def find_html_files(archive_path: Path) -> List[Path]:
    """Find all HTML files in archive directory.

    Collects, in order: pages/*.html (flat snapshots), mirror/**/*.html
    (recursive site mirror), then a top-level rendered.html if present.
    """
    found: List[Path] = []
    pages_dir = archive_path / 'pages'
    if pages_dir.exists():
        found += list(pages_dir.glob('*.html'))
    mirror_dir = archive_path / 'mirror'
    if mirror_dir.exists():
        found += list(mirror_dir.rglob('*.html'))
    rendered = archive_path / 'rendered.html'
    if rendered.exists():
        found.append(rendered)
    return found
def extract_entry_number(filename: str) -> str:
    """Extract entry number from filename.

    Returns the leading digits of *filename*; when there are none, falls
    back to the filename with any '.yaml' suffix removed.
    """
    leading_digits = re.match(r'^(\d+)', filename)
    if leading_digits:
        return leading_digits.group(1)
    return filename.replace('.yaml', '')
def process_entry(filepath: Path, dry_run: bool = False) -> tuple[int, List[str]]:
    """
    Process a single entry file to extract HTML claims.

    Loads the entry YAML, locates its archived website, runs all extractors
    over up to MAX_HTML_FILES archived pages (priority pages first),
    deduplicates the claims, optionally filters them with LLM validation,
    and writes the result back into the entry file under 'web_claims'.

    Args:
        filepath: Path to the entry YAML file.
        dry_run: When True, extract and count claims but do not write back.

    Returns: (claims_count, errors)
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    if not data:
        return 0, ["Empty file"]
    entry_num = extract_entry_number(filepath.name)
    errors = []
    all_claims = []
    # Get web archive path
    archive_path = get_web_archive_path(data, entry_num)
    if not archive_path or not archive_path.exists():
        return 0, [f"No web archive found for entry {entry_num}"]
    # Load metadata for timestamps
    metadata = load_metadata(archive_path)
    source_url = metadata.get('url', '') if metadata else ''
    retrieved_on = metadata.get('archive_timestamp', '') if metadata else ''
    if not source_url:
        # Try to get URL from entry data
        source_url = data.get('web_enrichment', {}).get('web_archives', [{}])[0].get('url', '')
    if not source_url:
        # Last resort: the original register entry's website field
        source_url = data.get('original_entry', {}).get('webadres_organisatie', '')
    # Extract institution domain for email classification
    institution_domain = get_institution_domain(data)
    # Find and process HTML files
    html_files = find_html_files(archive_path)
    if not html_files:
        return 0, [f"No HTML files found in {archive_path}"]
    # Process HTML files with smart prioritization:
    # 1. Prioritize pages with known interesting content patterns
    # 2. Process all prioritized files + sample of others
    MAX_HTML_FILES = 100
    # Patterns that indicate interesting subpages
    priority_patterns = [
        'bibliotheek', 'collectie', 'collection', 'publicat', 'uitgave',
        'winkel', 'shop', 'boek', 'book', 'contact', 'over-ons', 'about',
        'social', 'link', 'partner', 'sponsor'
    ]
    # Separate priority files from others
    priority_files = []
    other_files = []
    for f in html_files:
        # Match against the full path, so directory names count too
        filename_lower = str(f).lower()
        if any(p in filename_lower for p in priority_patterns):
            priority_files.append(f)
        else:
            other_files.append(f)
    # Process all priority files + fill remaining slots with others
    files_to_process = priority_files[:MAX_HTML_FILES]
    remaining_slots = MAX_HTML_FILES - len(files_to_process)
    if remaining_slots > 0:
        files_to_process.extend(other_files[:remaining_slots])
    for html_file in files_to_process:
        try:
            # errors='replace' keeps going on mojibake in archived pages
            with open(html_file, 'r', encoding='utf-8', errors='replace') as f:
                html_content = f.read()
            # Store paths relative to ENTRIES_DIR so provenance is portable
            html_file_rel = str(html_file.relative_to(ENTRIES_DIR))
            claims = extract_all_claims(html_content, html_file_rel, source_url, retrieved_on, institution_domain)
            all_claims.extend(claims)
        except Exception as e:
            errors.append(f"Failed to process {html_file}: {e}")
    # Deduplicate claims
    all_claims = deduplicate_claims(all_claims)
    # Validate claims using LLM (Gado2 v1.5.0 convention)
    invalid_claims = []
    if NER_ENABLED and all_claims and not SKIP_VALIDATION:
        # Filter claims using GLM-4.6 validation
        all_claims, invalid_claims = filter_claims_with_validation(all_claims, verbose=False)
    if not dry_run:
        # Store claims in entry data (even if empty, to clear old bad data)
        if 'web_claims' not in data:
            data['web_claims'] = {}
        web_claims_data = {
            'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            'source_archive': str(archive_path.relative_to(ENTRIES_DIR)),
            'claims_count': len(all_claims),
            'claims': all_claims,
        }
        # Store invalid claims for audit (filtered by LLM validation)
        if invalid_claims:
            web_claims_data['removed_invalid_claims'] = invalid_claims
            web_claims_data['validation_metadata'] = {
                'model': NER_MODEL,
                'convention': NER_CONVENTION_VERSION,
                'validated_at': datetime.now(timezone.utc).isoformat(),
                'invalid_count': len(invalid_claims),
            }
        data['web_claims'] = web_claims_data
        # Write back
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
    return len(all_claims), errors
def main():
    """CLI entry point: select entry files, extract claims, print a summary.

    Returns 0 on success, 1 when required dependencies are missing.
    """
    parser = argparse.ArgumentParser(description='Extract structured claims from archived HTML')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without writing')
    parser.add_argument('--force', action='store_true', help='Re-extract even if web_claims exists')
    parser.add_argument('--fast', action='store_true',
                        help='Fast mode: skip email NER (use patterns only), keep claim validation')
    parser.add_argument('--no-validation', action='store_true',
                        help='Skip LLM claim validation (fastest, but may include invalid claims)')
    args = parser.parse_args()
    # Propagate CLI switches to the module-level flags read by extractors
    global FAST_MODE, SKIP_VALIDATION
    FAST_MODE = args.fast
    SKIP_VALIDATION = args.no_validation
    if not HAS_DEPS:
        print("Error: Required dependencies not installed.")
        print("Run: pip install beautifulsoup4 lxml")
        return 1
    # Select entry files (a specific entry, or all visible YAML files)
    if args.entry:
        entry_files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        entry_files = sorted(
            f for f in ENTRIES_DIR.glob('*.yaml')
            if f.is_file() and not f.name.startswith('.')
        )
    if args.limit:
        entry_files = entry_files[:args.limit]
    with_claims = 0
    claims_sum = 0
    skipped = 0
    failed = 0
    print(f"Processing {len(entry_files)} entries...")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE'}")
    print()
    for entry_path in entry_files:
        if entry_path.is_dir():
            continue
        # Skip entries that already carry claims (unless --force)
        if not args.force:
            with open(entry_path, 'r', encoding='utf-8') as f:
                existing = yaml.safe_load(f)
            if existing and existing.get('web_claims', {}).get('claims'):
                skipped += 1
                continue
        count, problems = process_entry(entry_path, dry_run=args.dry_run)
        if count > 0:
            with_claims += 1
            claims_sum += count
            print(f" ✓ {entry_path.name}: {count} claims")
        elif problems:
            failed += 1
            for problem in problems:
                print(f" ✗ {entry_path.name}: {problem}")
        else:
            failed += 1
            print(f" ✗ {entry_path.name}: No claims extracted")
    print()
    print(f"{'DRY RUN - ' if args.dry_run else ''}Summary:")
    print(f" Entries with claims: {with_claims}")
    print(f" Total claims extracted: {claims_sum}")
    print(f" Skipped (already have claims): {skipped}")
    print(f" Failed (no archive/claims): {failed}")
    return 0
if __name__ == '__main__':
    # Script entry point: main() returns 0 on success, 1 on missing deps.
    sys.exit(main())