#!/usr/bin/env python3
|
||
"""
|
||
Extract structured claims from archived website HTML with XPath provenance.
|
||
|
||
This script extracts verifiable data from archived HTML files following
|
||
the WebObservation provenance rules defined in AGENTS.md Rule 6.
|
||
|
||
EVERY claim MUST have:
|
||
- claim_type: Type of claim (org_name, description, email, phone, address, etc.)
|
||
- claim_value: The extracted value
|
||
- source_url: URL the claim was extracted from
|
||
- retrieved_on: ISO 8601 timestamp when page was archived
|
||
- xpath: XPath to the element containing this value
|
||
- html_file: Relative path to archived HTML file
|
||
- xpath_match_score: 1.0 for exact match, <1.0 for fuzzy match
|
||
|
||
Claims WITHOUT XPath provenance are FABRICATED and must NOT be stored.
|
||
|
||
Usage:
|
||
python scripts/extract_html_claims.py [--limit N] [--entry ENTRY_NUM] [--dry-run]
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import re
|
||
import sys
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Optional, List, Dict, Any, Tuple
|
||
from urllib.parse import urlparse
|
||
|
||
import yaml
|
||
|
||
# Load environment variables from .env
|
||
try:
|
||
from dotenv import load_dotenv
|
||
load_dotenv()
|
||
except ImportError:
|
||
pass # dotenv is optional, rely on shell environment
|
||
|
||
# Optional httpx for z.ai API calls
|
||
try:
|
||
import httpx
|
||
HAS_HTTPX = True
|
||
except ImportError:
|
||
HAS_HTTPX = False
|
||
httpx = None # type: ignore
|
||
|
||
# Type hints for optional dependencies
|
||
etree: Any = None
|
||
BeautifulSoup: Any = None
|
||
|
||
try:
|
||
from lxml import etree as _etree
|
||
etree = _etree
|
||
HAS_LXML = True
|
||
except ImportError:
|
||
HAS_LXML = False
|
||
print("Warning: Missing dependency: lxml")
|
||
print("Install with: pip install lxml")
|
||
|
||
try:
|
||
from bs4 import BeautifulSoup as _BeautifulSoup
|
||
BeautifulSoup = _BeautifulSoup
|
||
HAS_BS4 = True
|
||
except ImportError:
|
||
HAS_BS4 = False
|
||
print("Warning: Missing dependency: beautifulsoup4")
|
||
print("Install with: pip install beautifulsoup4")
|
||
|
||
HAS_DEPS = HAS_LXML # Only lxml is required for this script
|
||
|
||
|
||
# Directories
# NOTE(review): BASE_DIR is an absolute, machine-specific path; the script only
# runs on this machine unless the path is made configurable (e.g. via env var).
BASE_DIR = Path('/Users/kempersc/apps/glam/data')
ENTRIES_DIR = BASE_DIR / 'nde/enriched/entries'  # enriched NDE entry files
WEB_DIR = BASE_DIR / 'custodian/web'  # archived website HTML snapshots


# Claim types to extract
#
# Closed vocabulary for the `claim_type` field of every extracted claim:
# maps each claim_type identifier to a human-readable description.
CLAIM_TYPES: Dict[str, str] = {
    'org_name': 'Organization/institution official name',
    'org_name_alt': 'Alternative organization name',
    'tagline': 'Organization tagline or slogan',
    'description': 'Organization description',
    'description_short': 'Short description (meta description)',
    'email': 'Email address',
    'phone': 'Phone number',
    'address': 'Physical address',
    'postal_code': 'Postal code',
    'city': 'City name',
    'opening_hours_text': 'Opening hours as text',
    'social_twitter': 'Twitter/X URL',
    'social_facebook': 'Facebook URL',
    'social_instagram': 'Instagram URL',
    'social_linkedin': 'LinkedIn URL',
    'social_youtube_channel': 'YouTube channel URL (official channel)',
    'social_youtube_video': 'YouTube video URL (individual video, NOT institution channel)',
    'social_tiktok': 'TikTok URL',
    'social_pinterest': 'Pinterest URL',
    # Video embeds
    'video_youtube': 'YouTube video embed (ID or URL)',
    'video_vimeo': 'Vimeo video embed (ID or URL)',
    'video_other': 'Other video embed (Dailymotion, etc.)',
    # Gallery/collection indicators
    'gallery_detected': 'Gallery/slideshow detected on page',
    'collection_page': 'Collection/exhibition page detected',
    'image_count': 'Number of images in gallery container',
    # External marketplace links
    'external_boekwinkeltjes': 'Link to boekwinkeltjes.nl (book sales)',
    # Page elements
    'page_title': 'HTML page title',
    'favicon': 'Favicon URL',
    'logo': 'Logo image URL',
    # Authentication UI elements
    'ui_login': 'Login button/link detected',
    'ui_signup': 'Signup/register button/link detected',
    # Person/role NER claims (PiCO-aligned, extracted via z.ai API)
    # Following Gado2 v1.5.0 annotation conventions with PiCO/PNV ontology
    'person_name': 'Person name (picom:PersonObservation, pnv:literalName)',
    'person_given_name': 'Given/first name (pnv:givenName)',
    'person_family_name': 'Family/surname (pnv:baseSurname)',
    'person_name_prefix': 'Name prefix like van, de (pnv:surnamePrefix)',
    'job_title': 'Job title or professional role (rico:Position, sdo:Occupation)',
    'title_rank': 'Honorific title or rank (rico:Title, DENOMINATION/TITLERANK)',
    'department': 'Department or organizational unit (rico:CorporateBody)',
    'role': 'Functional role in context (picom:Role)',
    # Financial documents (December 2025)
    # For extracting links to annual reports, financial statements, policy documents
    # from jaarverslag/organisatie pages on Dutch heritage institution websites
    'annual_report_url': 'Annual report PDF URL (Dutch: jaarverslag, publieksjaarverslag)',
    'financial_statement_url': 'Financial statement document URL (Dutch: jaarstukken, jaarrekening)',
    'anbi_publication_url': 'ANBI publication URL (Dutch charity tax status)',
    'policy_document_url': 'Multi-year policy document URL (Dutch: meerjarenbeleid, beleidsplan)',
    'financial_document_year': 'Fiscal year of financial document (extracted from filename/text)',
}
|
||
|
||
|
||
# =============================================================================
# DEFAULT PAGE / HOSTING PROVIDER BLOCKLIST
# =============================================================================
# These patterns indicate a web server default page (not institution content).
# When detected, extraction should be skipped to avoid attributing hosting
# provider social links (e.g., Plesk's YouTube channel) to institutions.

# Substrings matched against page text; any hit marks the page as a hosting
# default page rather than real institution content.
DEFAULT_PAGE_INDICATORS: List[str] = [
    # Plesk (common hosting control panel)
    "Web Server's Default Page",
    "Congratulations! Your Plesk is working",
    "This page is used to test the proper operation of",
    "web hosting platform",
    # cPanel
    "Great success! You've configured your",
    "Default Web Site Page",
    "cPanel, Inc.",  # More specific than just "cPanel"
    # DirectAdmin
    "DirectAdmin default page",
    # Apache
    "Apache2 Ubuntu Default Page",
    "Apache2 Debian Default Page",
    "If you can read this page",  # Apache default
    # nginx
    "Welcome to nginx!",
    "If you see this page, the nginx web server is successfully installed",
    # IIS
    "Internet Information Services",
    "IIS Windows Server",
    # Generic hosting defaults
    "Website Coming Soon",
    "Under Construction",
    "Parked Domain",
    "This domain is parked",
    "Domain Parking",
    "This site is parked free",
    # Dutch equivalents
    "Website binnenkort beschikbaar",
    "In aanbouw",
    "Domein geparkeerd",
]

# YouTube channels known to belong to hosting providers (not institutions)
# These should NEVER be attributed to heritage institutions
# Mixed identifiers: raw channel IDs, handles, and bare names.
BLOCKED_YOUTUBE_CHANNELS = {
    # Plesk
    "UCeU-_6YHGQFcVSHLbEXLNlA",  # Plesk official channel
    "plesk",
    "@plesk",
    # cPanel
    "UCDGXoXJxAFYAGhXN7r62wvA",  # cPanel official
    "cpanel",
    "@cpanel",
    # Other hosting providers (add as discovered)
}
|
||
|
||
|
||
# =============================================================================
# Z.AI API CLIENT FOR NER (Anthropic-compatible endpoint for GLM Coding Plan)
# =============================================================================

# z.ai API configuration - Using Anthropic-compatible endpoint for GLM Coding Plan
# See: https://docs.z.ai/devpack/quick-start
ZAI_API_URL = "https://api.z.ai/api/anthropic/v1/messages"
# Read token from environment (possibly populated by dotenv at import time);
# an empty string disables all API-backed features below.
ZAI_API_TOKEN = os.getenv("ZAI_API_TOKEN", "")
ZAI_MODEL = "glm-4.6"  # Default model (z.ai's latest via Anthropic endpoint)

# NER extraction enabled flag
# True only when a token is configured AND httpx imported successfully.
NER_ENABLED = bool(ZAI_API_TOKEN and HAS_HTTPX)

# Global flags for processing modes (set via CLI args)
FAST_MODE = False  # Skip email NER, use pattern matching only
SKIP_VALIDATION = False  # Skip LLM claim validation
|
||
|
||
|
||
def call_zai_api(
    prompt: str,
    system_prompt: Optional[str] = None,
    model: str = ZAI_MODEL,
    max_tokens: int = 1024,
    temperature: float = 0.0,
) -> Optional[str]:
    """
    Call z.ai API for NER extraction using Anthropic-compatible endpoint.

    The GLM Coding Plan uses an Anthropic-compatible endpoint, which is different
    from the standard z.ai OpenAI-style API. This function uses the Anthropic
    message format as documented at https://docs.z.ai/devpack/tool/goose

    Transient timeouts are retried up to 3 times with exponential backoff;
    HTTP status errors and any other exception fail fast and return None.

    Args:
        prompt: The user prompt with context to analyze
        system_prompt: Optional system instructions
        model: Model to use (default: glm-4.6)
        max_tokens: Maximum response tokens
        temperature: Sampling temperature (0.0 for deterministic)

    Returns:
        Response text or None if failed
    """
    # Hoisted out of the retry loop (was re-imported inside the except branch
    # on every retry); kept function-local so the module import block is unchanged.
    import time

    if not ZAI_API_TOKEN:
        return None

    if not HAS_HTTPX:
        return None

    # Anthropic message format: system is separate, messages are user/assistant
    messages = [{"role": "user", "content": prompt}]

    payload: Dict[str, Any] = {
        "model": model,
        "max_tokens": max_tokens,
        "messages": messages,
    }

    # Add system prompt as top-level field (Anthropic style)
    if system_prompt:
        payload["system"] = system_prompt

    # Only add temperature if non-zero (some models may not support it)
    if temperature > 0:
        payload["temperature"] = temperature

    # Anthropic-style headers: x-api-key instead of Authorization Bearer
    headers = {
        "x-api-key": ZAI_API_TOKEN,
        "Content-Type": "application/json",
        "anthropic-version": "2023-06-01",  # Anthropic API version
    }

    # Retry logic for transient failures (timeouts only)
    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = httpx.post(
                ZAI_API_URL,
                json=payload,
                headers=headers,
                timeout=60.0,  # Increased timeout for stability
            )
            response.raise_for_status()
            result = response.json()
            break  # Success, exit retry loop
        except (httpx.ReadTimeout, httpx.ConnectTimeout) as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff: 1s then 2s
                print(f"z.ai API timeout, retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})...", file=sys.stderr)
                time.sleep(wait_time)
                continue
            print(f"z.ai API timeout after {max_retries} attempts: {e}", file=sys.stderr)
            return None
        except httpx.HTTPStatusError as e:
            # Non-2xx response: do not retry, surface the status and body.
            print(f"z.ai API HTTP error: {e.response.status_code} - {e.response.text}", file=sys.stderr)
            return None
        except Exception as e:
            # Any other failure (network, JSON decode of response, ...): fail fast.
            print(f"z.ai API error: {e}", file=sys.stderr)
            return None
    else:
        return None  # All retries exhausted

    # Anthropic response format: content is a list of content blocks
    if "content" in result and len(result["content"]) > 0:
        # Get text from the first text block
        for block in result["content"]:
            if block.get("type") == "text":
                return block.get("text")
        # Fallback: return first block's text if type not specified
        return result["content"][0].get("text")
    return None
|
||
|
||
|
||
# =============================================================================
# NER SYSTEM PROMPT - Gado2 v1.5.0 Annotation Convention
# =============================================================================
#
# This NER extraction uses:
# - MODEL: GLM-4.6 (via z.ai Anthropic-compatible endpoint)
# - CONVENTION: Gado2 v1.5.0 (Golden Agents Data Annotations)
# - ONTOLOGIES: PiCO (Persons in Context Ontology), PNV (Person Name Vocabulary)
#
# Gado2 v1.5.0 Reference: https://github.com/knaw-huc/golden-agents-htr
# PiCO Ontology: https://data.goldenagents.org/ontology/pico/
# PNV Ontology: https://w3id.org/pnv
# =============================================================================

# NOTE(review): the constant below says "v1.6.0-unified" while the header
# comments and the prompt body still reference v1.5.0 -- confirm which
# convention version is actually in force and align the wording.
NER_CONVENTION_VERSION = "Gado2 v1.6.0-unified"
NER_MODEL = ZAI_MODEL  # GLM-4.6

# System prompt for extract_ner_from_context(); interpolates the convention
# version at module load. Literal braces in the JSON template are escaped as
# {{ }} because this is an f-string.
NER_SYSTEM_PROMPT = f"""You are an expert Named Entity Recognition (NER) system for Dutch heritage institution contact information.

=== ANNOTATION CONVENTION: {NER_CONVENTION_VERSION} ===
This extraction follows the Gado2 v1.5.0 annotation guidelines from the Golden Agents project,
combined with PiCO (Persons in Context Ontology) and PNV (Person Name Vocabulary) standards.

=== ENTITY TYPES AND ONTOLOGY MAPPING ===

1. PERSON NAMES (PNV - Person Name Vocabulary, https://w3id.org/pnv):
- pnv:literalName → Full name as written (e.g., "Jan van der Berg")
- pnv:givenName → First/given name (e.g., "Jan", "Maria", "Pieter")
- pnv:surnamePrefix → Dutch surname prefixes (e.g., "van", "de", "van der", "ter", "ten")
- pnv:baseSurname → Base surname without prefix (e.g., "Berg", "Vries", "Groot")

2. DENOMINATIONS (Gado2 v1.5.0 DENOMINATION category):
- DENOMINATION/PROF → Job title, profession, occupation
Maps to: rico:Position, schema:Occupation
Examples: voorzitter, secretaris, archivaris, bibliothecaris, conservator
- DENOMINATION/TITLERANK → Honorific title or rank
Maps to: rico:Title
Examples: dr., prof., ir., mr., drs., ing.

3. ROLES (PiCO - Persons in Context Ontology):
- picom:Role → Functional role in organizational context
Examples: contactpersoon, coördinator, beheerder, medewerker
- picom:PersonObservation → Observation of a person in a specific context

4. ORGANIZATIONAL UNITS (RiC-O - Records in Contexts Ontology):
- rico:CorporateBody → Department or organizational unit
Examples: bestuur, redactie, beeldbank, archief, bibliotheek

=== DUTCH HERITAGE INSTITUTION CONTEXT ===

Common Dutch job titles in heritage institutions:
- Board: voorzitter (chair), secretaris (secretary), penningmeester (treasurer), bestuurslid (board member)
- Editorial: redacteur (editor), hoofdredacteur (editor-in-chief), redactie (editorial board)
- Technical: webmaster, ICT-medewerker, beheerder (administrator)
- Collections: archivaris (archivist), bibliothecaris (librarian), conservator (curator)
- Contact: contactpersoon (contact person), coördinator (coordinator), medewerker (staff member)

Dutch surname prefixes (always lowercase, attached to surname):
van, de, het, den, der, ter, ten, van de, van der, van den, van het, in 't, op de, op 't

=== OUTPUT FORMAT ===
Return valid JSON only. No markdown code blocks. No explanatory text.

{{
"persons": [
{{
"full_name": "string or null - pnv:literalName",
"given_name": "string or null - pnv:givenName",
"surname_prefix": "string or null - pnv:surnamePrefix",
"base_surname": "string or null - pnv:baseSurname",
"job_title": "string or null - DENOMINATION/PROF",
"job_title_en": "string or null - English translation",
"title_rank": "string or null - DENOMINATION/TITLERANK",
"department": "string or null - rico:CorporateBody",
"department_en": "string or null - English translation",
"role": "string or null - picom:Role",
"email": "associated email address if identifiable"
}}
],
"confidence": 0.0-1.0,
"convention": "{NER_CONVENTION_VERSION}"
}}

If no person/role information is found, return: {{"persons": [], "confidence": 1.0, "convention": "{NER_CONVENTION_VERSION}"}}"""
|
||
|
||
|
||
def extract_ner_from_context(
    context_text: str,
    email: str,
    html_context: Optional[str] = None,
    verbose: bool = False,
) -> Optional[Dict[str, Any]]:
    """
    Run LLM-based person/role NER over the text surrounding an email address.

    Sends the context to GLM-4.6 through the z.ai Anthropic-compatible
    endpoint, using the Gado2 annotation convention for Dutch heritage
    institutions (see NER_SYSTEM_PROMPT), and parses the JSON reply.

    Args:
        context_text: Plain text found around the email address
        email: The email address the context belongs to
        html_context: Raw HTML around the email, if available (truncated
            to 500 characters before being sent)
        verbose: If True, log model and convention info to stderr

    Returns:
        Parsed entity dict with 'ner_model' and 'ner_convention' metadata
        added, or None when NER is disabled, the API call fails, or the
        reply is not valid JSON.
    """
    if not NER_ENABLED:
        return None

    if verbose:
        print(f" [NER] Using model: {NER_MODEL}", file=sys.stderr)
        print(f" [NER] Convention: {NER_CONVENTION_VERSION}", file=sys.stderr)

    # Assemble the user prompt; the HTML snippet is optional and capped at 500 chars.
    prompt = f"""Extract person and role information from this Dutch heritage institution contact context.

EMAIL: {email}

CONTEXT TEXT:
{context_text}

{f"HTML CONTEXT:{chr(10)}{html_context[:500]}" if html_context else ""}

Extract any person names, job titles, roles, or departments associated with this email contact.
Follow the {NER_CONVENTION_VERSION} annotation guidelines provided in the system prompt.
Return JSON only."""

    reply = call_zai_api(prompt, system_prompt=NER_SYSTEM_PROMPT, model=NER_MODEL)

    if not reply:
        return None

    try:
        # The model occasionally wraps its JSON in a ``` fence; unwrap it first.
        if reply.startswith("```"):
            inside_fence = False
            kept_lines = []
            for fragment in reply.split("\n"):
                if fragment.startswith("```"):
                    inside_fence = not inside_fence
                elif inside_fence:
                    kept_lines.append(fragment)
            reply = "\n".join(kept_lines)

        entities = json.loads(reply)

        # Record which model/convention produced this annotation (provenance).
        entities['ner_model'] = NER_MODEL
        entities['ner_convention'] = NER_CONVENTION_VERSION

        return entities
    except json.JSONDecodeError:
        return None
|
||
|
||
|
||
# =============================================================================
# CLAIM VALIDATION SYSTEM - Gado2 v1.6.0-unified Compliance Check
# =============================================================================
#
# This validation uses GLM-4.6 to check if extracted claims are valid according
# to the Gado2 v1.6.0-unified annotation convention. This unified convention
# handles BOTH Early Modern Dutch texts AND modern web content through a
# multi-domain architecture with source_domains (EARLY_MODERN_TEXT, MODERN_WEB).
#
# For web content extraction, the WEB_EXC001-007 exclusion rules apply.
#
# Convention file: docs/convention/schema/20251202/entity_annotation_rules_v1.6.0_unified.yaml
# =============================================================================

# System prompt for validate_claim_with_llm(); f-string interpolating the
# convention version, with literal JSON braces escaped as {{ }}.
VALIDATION_SYSTEM_PROMPT = f"""You are an expert claim validator for heritage institution web data extraction.

=== ANNOTATION CONVENTION: {NER_CONVENTION_VERSION} ===

You validate extracted org_name claims from MODERN DUTCH HERITAGE INSTITUTION WEBSITES
against the Gado2 v1.6.0-unified annotation guidelines.

This is a UNIFIED convention that handles both Early Modern Dutch texts and modern web content.
For web extraction, apply the MODERN_WEB source domain rules.

=== ORGANISATION ENTITY DEFINITION (v1.6.0) ===

Entity Type: ORGANISATION (ORG) / HERINST (Heritage Institution subcategory)
Description: Organizations including heritage institutions (museums, archives, libraries,
historical societies), companies, governments, branches, associations, legislative bodies,
political parties, military forces, sports teams, meetings, bands, religious orders, and ships.

Ontology Classes: rico:CorporateBody, rico:Group, crm:E74_Group, schema:Organization

=== HERITAGE INSTITUTION SUBCATEGORIES (HERINST - valid org_name) ===

HERINST/MUSEUM: "Rijksmuseum", "Amsterdam Museum", "Stedelijk Museum", "Smalspoormuseum"
HERINST/ARCHIVE: "Nationaal Archief", "Gemeentearchief", "Stadsarchief Rotterdam"
HERINST/LIBRARY: "Koninklijke Bibliotheek", "Universiteitsbibliotheek", "OBA"
HERINST/HISTSOC: "Historische Vereniging Nijeveen", "Heemkundige Kring De Goede Stede"
HERINST/RESEARCH: "NIOD", "Huygens Instituut", "Fryske Akademy"
HERINST/FOUNDATION: "Stichting Erfgoed", "Hidde Nijland Stichting"

=== OTHER ORGANISATION SUBCATEGORIES (from v1.5.0-ontology-pico) ===

COMP (Companies): "Philips", "ING", "Shell"
BRANCH (Branches): "ING Rotterdam", "Rekenkamer Gemeente Rotterdam"
ASSOC (Associations): "NVM", "de vakbond"
PUBFAC (Public Facilities): "Middelbare school", "Technische Universiteit Delft"
AUTH (Authorities): "Ministerie van Financiën", "Raad voor Aangelegenheden"
INTORG (International Orgs): "Verenigde Naties", "Europese Unie"

=== WEB INCLUSION RULES (WEB_INC) - v1.6.0-unified ===

WEB_INC001: Tag heritage institution names with specific identifiers
- VALID: "Nationaal Archief" (specific name)
- VALID: "Historische Vereniging Nijeveen" (place-qualified)
- VALID: "Smalspoormuseum" (distinctive compound)

WEB_INC002: Tag organization names in structured data (schema.org)
- VALID: Names from JSON-LD schema:Organization blocks

WEB_INC003: Tag organization names with legal form indicators
- VALID: "Stichting Openbare Bibliotheek" (Stichting = foundation)
- VALID: "Vereniging Oud-Haarlem" (Vereniging = association)

=== WEB EXCLUSION RULES (WEB_EXC) - v1.6.0-unified - CRITICAL ===

WEB_EXC001: Do NOT tag navigation menu items
- INVALID: "Home", "Menu", "Contact", "Contact opnemen", "Over ons"
- INVALID: "Nieuws", "Zoeken", "Welkom", "Informatie", "Terug", "Volgende"
- semantic_category: navigation
- Rationale: UI chrome, not organization identifiers

WEB_EXC002: Do NOT tag call-to-action buttons/links
- INVALID: "Lees meer", "Meer lezen", "Bekijk", "Download", "Bestel"
- INVALID: "Word lid", "Meld je aan", "Subscribe", "Doneer"
- semantic_category: cta
- Rationale: Interactive UI elements, not organization names

WEB_EXC003: Do NOT tag social media platform names
- INVALID: "Facebook", "Twitter", "Instagram", "LinkedIn", "YouTube", "X"
- INVALID: "TikTok", "Pinterest", "Flickr", "Vimeo"
- semantic_category: social_media
- Rationale: Third-party platforms, not the heritage institution itself

WEB_EXC004: Do NOT tag CMS placeholder/boilerplate text
- INVALID: "Hello world!", "Lorem ipsum", "Sample Page", "Just another WordPress site"
- INVALID: "Colofon", "Powered by WordPress", "Theme by..."
- semantic_category: cms_default
- Rationale: Template artifacts, not meaningful institution names

WEB_EXC005: Do NOT tag legal/policy page titles
- INVALID: "Privacy Policy", "Privacyverklaring", "Disclaimer", "Cookie Policy"
- INVALID: "Algemene voorwaarden", "Terms of Service", "ANBI"
- semantic_category: legal_boilerplate
- Rationale: Standard legal pages, not organization identifiers

WEB_EXC006: Do NOT tag web functionality labels
- INVALID: "Login", "Logout", "Inloggen", "Winkelwagen", "Cart", "Search"
- INVALID: "Sitemap", "RSS", "Print", "Share", "Delen"
- semantic_category: web_functionality
- Rationale: Web application UI, not organization names

WEB_EXC007: Do NOT tag generic single words without institution context
- INVALID: "Archief", "Museum", "Bibliotheek" (standalone)
- INVALID: "Collectie", "Expositie", "Tentoonstelling" (standalone)
- VALID: "Nationaal Archief", "Smalspoormuseum" (with qualifier)
- semantic_category: generic_word
- Rationale: Category labels need qualifying words to be institution names

=== BASE ORG EXCLUSION RULES (ORG_EXC from v1.5.0) ===

ORG_EXC001: Strip articles from organization names
- "de Tweede Kamer" → "Tweede Kamer"

ORG_EXC002: Don't tag abbreviations separately
- "Nederlandse Vereniging van Makelaars (NVM)" → tag full name only

ORG_EXC003: Don't tag generic group references
- INVALID: "De jongerenbeweging" (= DENOMINATION, not ORG)

=== VALIDATION RESPONSE FORMAT ===

Provide your FULL REASONING as provenance. This reasoning IS the validation evidence.

Return ONLY valid JSON (no markdown code blocks):

{{
"is_valid": true/false,
"reasoning": "Your complete analysis: What is this text? Why is it (in)valid? Which v1.6.0 convention rules apply? What semantic category does it belong to?",
"convention_rules": ["WEB_EXC001", "WEB_EXC003"] or ["WEB_INC001"] or [] if no specific rules,
"semantic_category": "navigation|cta|social_media|cms_default|legal_boilerplate|web_functionality|generic_word|heritage_institution|other",
"confidence": 0.0-1.0
}}

IMPORTANT: Your "reasoning" field is stored as provenance. Be thorough and explicit.
Reference specific v1.6.0-unified rule IDs (WEB_EXC001-007, WEB_INC001-003, ORG_EXC001-003) in your analysis.
"""

# Cache for validation results (to avoid repeated API calls)
# Keyed by "claim_type:claim_value:extraction_method"; lives for the process.
_validation_cache: Dict[str, Dict[str, Any]] = {}
|
||
|
||
|
||
def validate_claim_with_llm(
    claim_type: str,
    claim_value: str,
    extraction_method: str,
    source_context: Optional[str] = None,
    verbose: bool = False,
) -> Dict[str, Any]:
    """
    Validate a claim using GLM-4.6 against Gado2 v1.5.0 convention.

    Args:
        claim_type: Type of claim (org_name, description, etc.)
        claim_value: The extracted value to validate
        extraction_method: How the claim was extracted (h1_tag, title_tag, etc.)
        source_context: Optional surrounding HTML/text context
        verbose: If True, log validation details

    Returns:
        Dict with validation result:
        - is_valid: bool
        - reason: str explanation
        - convention_rule: str or None
        - confidence: float 0.0-1.0

    Notes:
        Results are memoized in the module-level _validation_cache. When the
        API is unavailable or its reply cannot be parsed, the claim is treated
        as valid with confidence 0.5 (fail-open).
    """
    # Only validate certain claim types that are prone to errors
    VALIDATE_CLAIM_TYPES = {'org_name', 'org_name_alt', 'tagline', 'description_short'}

    if claim_type not in VALIDATE_CLAIM_TYPES:
        return {'is_valid': True, 'reason': 'Claim type not subject to LLM validation', 'convention_rule': None, 'confidence': 1.0}

    # Skip validation if NER/API is not available
    if not NER_ENABLED:
        return {'is_valid': True, 'reason': 'LLM validation unavailable (no API token)', 'convention_rule': None, 'confidence': 0.5}

    # Check cache
    cache_key = f"{claim_type}:{claim_value}:{extraction_method}"
    if cache_key in _validation_cache:
        if verbose:
            print(f" [VALIDATE] Cache hit for {claim_value[:30]}", file=sys.stderr)
        return _validation_cache[cache_key]

    if verbose:
        print(f" [VALIDATE] Checking claim: {claim_type}={claim_value[:50]}", file=sys.stderr)

    # Build validation prompt (source context capped at 500 chars)
    prompt = f"""Validate this extracted claim from a Dutch heritage institution website:

CLAIM TYPE: {claim_type}
CLAIM VALUE: "{claim_value}"
EXTRACTION METHOD: {extraction_method}
{f"SOURCE CONTEXT: {source_context[:500]}" if source_context else ""}

Is this a valid {claim_type} according to Gado2 v1.5.0 convention?

For org_name claims: Is this the actual name of a heritage institution (museum, archive, library, historical society, etc.) or is it generic page text (navigation, headings, UI elements)?

Return JSON only."""

    response = call_zai_api(prompt, system_prompt=VALIDATION_SYSTEM_PROMPT, model=NER_MODEL)

    # Default result if API fails (fail-open: claim passes with low confidence)
    default_result = {'is_valid': True, 'reason': 'API validation unavailable', 'convention_rule': None, 'confidence': 0.5}

    if not response:
        _validation_cache[cache_key] = default_result
        return default_result

    # Parse JSON response
    try:
        # Handle potential markdown code blocks
        if response.startswith("```"):
            lines = response.split("\n")
            json_lines = []
            in_block = False
            for line in lines:
                if line.startswith("```"):
                    in_block = not in_block
                    continue
                if in_block:
                    json_lines.append(line)
            response = "\n".join(json_lines)

        result = json.loads(response)

        # Ensure required fields with new format (reasoning, convention_rules)
        result.setdefault('is_valid', True)
        result.setdefault('confidence', 0.5)

        # Handle both old format (reason/convention_rule) and new (reasoning/convention_rules).
        # Order matters: 'reason' must be populated before it is aliased into 'reasoning'.
        if 'reasoning' in result:
            result['reason'] = result['reasoning']  # Alias for compatibility
        else:
            result.setdefault('reason', 'Unknown')
            result['reasoning'] = result['reason']

        if 'convention_rules' in result:
            # Store full list, and keep first for backward compatibility
            result['convention_rule'] = result['convention_rules'][0] if result['convention_rules'] else None
        else:
            result.setdefault('convention_rule', None)
            result['convention_rules'] = [result['convention_rule']] if result['convention_rule'] else []

        result.setdefault('semantic_category', 'other')

        # Cache the result
        _validation_cache[cache_key] = result

        if verbose:
            status = "VALID" if result['is_valid'] else "INVALID"
            reasoning_preview = result['reasoning'][:100] + "..." if len(result['reasoning']) > 100 else result['reasoning']
            print(f" [VALIDATE] {status}: {reasoning_preview}", file=sys.stderr)

        return result

    except json.JSONDecodeError:
        # Unparseable reply: cache and return the fail-open default.
        _validation_cache[cache_key] = default_result
        return default_result
|
||
|
||
|
||
def filter_claims_with_validation(
    claims: List[Dict],
    verbose: bool = False,
) -> Tuple[List[Dict], List[Dict]]:
    """
    Partition claims into valid and invalid lists via LLM validation.

    Each claim is checked with validate_claim_with_llm(). Rejected claims are
    annotated in place with full validation provenance before being returned.

    Args:
        claims: List of claim dicts to validate
        verbose: If True, log validation progress

    Returns:
        Tuple of (valid_claims, invalid_claims)
    """
    accepted: List[Dict] = []
    rejected: List[Dict] = []

    for claim in claims:
        verdict = validate_claim_with_llm(
            claim_type=claim.get('claim_type', ''),
            claim_value=claim.get('claim_value', ''),
            extraction_method=claim.get('extraction_method', ''),
            verbose=verbose,
        )

        if verdict['is_valid']:
            accepted.append(claim)
            continue

        # Attach FULL validation provenance for audit (the reasoning IS the provenance).
        reasoning = verdict.get('reasoning', verdict.get('reason', 'Unknown'))
        claim['validation_provenance'] = {
            'reasoning': reasoning,
            'convention_rules': verdict.get('convention_rules', []),
            'semantic_category': verdict.get('semantic_category', 'other'),
            'confidence': verdict.get('confidence', 0.5),
            'model': NER_MODEL,
            'convention_version': NER_CONVENTION_VERSION,
        }
        # Backward-compatible flat fields for older consumers.
        claim['validation_reason'] = reasoning
        claim['validation_rule'] = verdict.get('convention_rule')
        rejected.append(claim)

    return accepted, rejected
|
||
|
||
|
||
# Pattern-based role extraction (fallback when NER API unavailable)
# Dutch job titles/roles commonly found in heritage institution contacts
#
# Maps a regex pattern (matched case-insensitively against context text and
# email prefixes) to the role/department fields it implies.
DUTCH_ROLE_PATTERNS: Dict[str, Dict[str, str]] = {
    # Board positions
    r'\bvoorzitter\b': {'job_title': 'voorzitter', 'job_title_en': 'chairperson'},
    r'\bsecretaris\b': {'job_title': 'secretaris', 'job_title_en': 'secretary'},
    # "secretariaat" (the office) is mapped to the same role as "secretaris"
    r'\bsecretariaat\b': {'job_title': 'secretaris', 'job_title_en': 'secretary'},
    r'\bpenningmeester\b': {'job_title': 'penningmeester', 'job_title_en': 'treasurer'},
    r'\bbestuur\b': {'job_title': 'bestuurslid', 'job_title_en': 'board member'},
    r'\bbestuurslid\b': {'job_title': 'bestuurslid', 'job_title_en': 'board member'},
    # Editorial/content
    r'\bredactie\b': {'job_title': 'redacteur', 'job_title_en': 'editor', 'department': 'redactie'},
    r'\bredacteur\b': {'job_title': 'redacteur', 'job_title_en': 'editor'},
    r'\bhoofdredacteur\b': {'job_title': 'hoofdredacteur', 'job_title_en': 'editor-in-chief'},
    # Technical/digital
    r'\bwebmaster\b': {'job_title': 'webmaster', 'job_title_en': 'webmaster'},
    # truncated form seen in some email prefixes
    r'\bwebmast\b': {'job_title': 'webmaster', 'job_title_en': 'webmaster'},
    r'\bict\b': {'job_title': 'ICT-medewerker', 'job_title_en': 'IT staff'},
    r'\bbeheerder\b': {'job_title': 'beheerder', 'job_title_en': 'administrator'},
    # Collections
    r'\barchivaris\b': {'job_title': 'archivaris', 'job_title_en': 'archivist'},
    r'\bbibliothecaris\b': {'job_title': 'bibliothecaris', 'job_title_en': 'librarian'},
    r'\bconservator\b': {'job_title': 'conservator', 'job_title_en': 'curator'},
    r'\bcurator\b': {'job_title': 'curator', 'job_title_en': 'curator'},
    r'\bcollectiebeheer\b': {'job_title': 'collectiebeheerder', 'job_title_en': 'collection manager'},
    # General
    r'\bdirecteur\b': {'job_title': 'directeur', 'job_title_en': 'director'},
    r'\bcoördinator\b': {'job_title': 'coördinator', 'job_title_en': 'coordinator'},
    r'\bmedewerker\b': {'job_title': 'medewerker', 'job_title_en': 'staff member'},
    r'\bvrijwilliger\b': {'job_title': 'vrijwilliger', 'job_title_en': 'volunteer'},
    # Departments/sections
    r'\bbeeldbank\b': {'department': 'beeldbank', 'department_en': 'image archive'},
    r'\bdocumentenbank\b': {'department': 'documentenbank', 'department_en': 'document archive'},
    r'\bvoorwerpen\b': {'department': 'voorwerpenbank', 'department_en': 'object collection'},
}
|
||
|
||
|
||
def extract_role_from_context_pattern(
    context_text: str,
    email: str,
) -> Optional[Dict[str, Any]]:
    """
    Extract role/job title information using pattern matching.

    This is a fallback when the LLM-based NER is unavailable.
    Works well for structured Dutch heritage institution contacts.

    Args:
        context_text: Plain text context around the email (e.g., "- de voorzitter:")
        email: The email address for context

    Returns:
        Dict with extracted role info or None if no patterns match

    Examples:
        >>> extract_role_from_context_pattern("- de voorzitter:", "voorzitter@example.nl")
        {'job_title': 'voorzitter', 'job_title_en': 'chairperson', 'confidence': 0.9}

        >>> extract_role_from_context_pattern("- de webmaster van de beeldbank:", "info@beeldbank.nl")
        {'job_title': 'webmaster', 'job_title_en': 'webmaster', 'department': 'beeldbank', ...}
    """
    if not context_text:
        return None

    lowered = context_text.lower()
    extracted: Dict[str, Any] = {}
    score = 0.0

    # Pass 1: match role patterns against the surrounding text.
    # Later patterns may overwrite earlier keys (dict iteration order).
    for regex, role_info in DUTCH_ROLE_PATTERNS.items():
        if not re.search(regex, lowered, re.IGNORECASE):
            continue
        extracted.update(role_info)
        # Higher confidence for more specific matches.
        if 'job_title' in role_info:
            score = max(score, 0.85)
        if 'department' in role_info:
            score = max(score, 0.8)

    # Pass 2: the email prefix itself often names the role
    # (e.g. voorzitter@, secretariaat@).
    prefix = email.split('@')[0].lower() if '@' in email else ''
    for regex, role_info in DUTCH_ROLE_PATTERNS.items():
        # Strip word-boundary anchors so the pattern matches inside the prefix.
        bare_pattern = regex.replace(r'\b', '')
        if not re.search(bare_pattern, prefix):
            continue
        # Merge info but never overwrite values found in the text context.
        for key, value in role_info.items():
            extracted.setdefault(key, value)
        score = max(score, 0.9)  # High confidence when email matches

    if not extracted:
        return None

    extracted['confidence'] = score
    extracted['extraction_method'] = 'pattern_matching'
    return extracted
|
||
|
||
|
||
def get_xpath_lxml(element) -> str:
    """Return the absolute XPath of *element* within its lxml document tree."""
    return element.getroottree().getpath(element)
|
||
|
||
|
||
def get_xpath_bs4(element) -> str:
    """Generate an absolute XPath for a BeautifulSoup element.

    Walks up through the element's ancestors, computing each tag's
    1-based position among same-named preceding siblings.

    Fix: the BeautifulSoup root object has ``name == '[document]'``,
    which previously leaked into the result as an invalid XPath segment
    (``/[document][1]/html[1]/...``); it is now skipped.

    Args:
        element: A BeautifulSoup Tag (anything exposing ``name``,
            ``find_previous_siblings`` and ``parent``).

    Returns:
        Absolute XPath string such as ``/html[1]/body[1]/a[2]``.
    """
    parts = []
    current = element
    while current and current.name:
        # Skip the synthetic soup root; it is not a real document node.
        if current.name == '[document]':
            current = current.parent
            continue
        siblings = [s for s in current.find_previous_siblings(current.name)]
        index = len(siblings) + 1
        parts.insert(0, f"{current.name}[{index}]")
        current = current.parent
    return '/' + '/'.join(parts) if parts else '/'
|
||
|
||
|
||
def get_institution_domain(entry_data: dict) -> Optional[str]:
    """
    Extract the institution's primary domain from entry data.

    Used to determine if email addresses belong to the institution
    or are external (e.g., partner organizations, village associations).

    Fix: ``entry_data['web_enrichment']['web_archives']`` may be present
    but empty; the previous ``.get('web_archives', [{}])[0]`` raised
    IndexError in that case (the default only applies when the key is
    missing). Now falls back to ``[{}]`` for any falsy value.

    Args:
        entry_data: The entry dict loaded from YAML

    Returns:
        Normalized domain (e.g., 'dewolden.nl') or None if not found

    Example:
        >>> get_institution_domain({'original_entry': {'webadres_organisatie': 'https://www.dewolden.nl/'}})
        'dewolden.nl'
    """
    # Guard against an existing-but-empty web_archives list.
    web_archives = entry_data.get('web_enrichment', {}).get('web_archives') or [{}]

    # Priority order for finding institution URL
    url_sources = [
        # Primary: original_entry.webadres_organisatie (from NDE CSV)
        entry_data.get('original_entry', {}).get('webadres_organisatie', ''),
        # Fallback: web_enrichment source URL
        web_archives[0].get('url', ''),
        # Fallback: any website identifier
        *[ident.get('identifier_value', '') for ident in entry_data.get('identifiers', [])
          if ident.get('identifier_scheme') == 'website'],
    ]

    for url in url_sources:
        if not url:
            continue
        try:
            parsed = urlparse(url)
            domain = parsed.netloc.lower()
            # Remove www. prefix for comparison
            if domain.startswith('www.'):
                domain = domain[4:]
            if domain:
                return domain
        except Exception:
            # Malformed URL — try the next candidate.
            continue

    return None
|
||
|
||
|
||
def is_email_external(email: str, institution_domain: Optional[str]) -> bool:
    """
    Check if an email address belongs to an external organization.

    Args:
        email: Email address to check
        institution_domain: The institution's primary domain (e.g., 'dewolden.nl')

    Returns:
        True if email is external, False if internal or domain unknown

    Examples:
        >>> is_email_external('gemeente@dewolden.nl', 'dewolden.nl')
        False
        >>> is_email_external('info@stichtingoco.nl', 'dewolden.nl')
        True
        >>> is_email_external('info@example.com', None)  # Unknown institution domain
        False
    """
    # Without a reference domain (or a parseable address) we cannot
    # classify, so conservatively treat the email as internal.
    if '@' not in email or not institution_domain:
        return False

    domain = email.split('@')[1].lower()
    # Normalize away a leading www. so comparisons line up.
    if domain.startswith('www.'):
        domain = domain[4:]

    # Internal = exact domain match or any subdomain of the institution.
    is_internal = (
        domain == institution_domain
        or domain.endswith('.' + institution_domain)
    )
    return not is_internal
|
||
|
||
|
||
def create_claim(
    claim_type: str,
    claim_value: str,
    xpath: str,
    html_file: str,
    source_url: str,
    retrieved_on: str,
    raw_value: Optional[str] = None,
    extraction_method: str = 'html_parser',
    xpath_match_score: float = 1.0,
    **extra_fields,
) -> Dict[str, Any]:
    """Build a claim dict carrying full WebObservation provenance.

    Args:
        claim_type: Type of claim (email, phone, org_name, etc.)
        claim_value: The extracted value
        xpath: XPath to the element containing this value
        html_file: Relative path to archived HTML file
        source_url: URL the claim was extracted from
        retrieved_on: ISO 8601 timestamp when page was archived
        raw_value: Original unprocessed value (optional)
        extraction_method: Method used to extract (e.g., 'mailto_link', 'schema_org')
        xpath_match_score: 1.0 for exact match, <1.0 for fuzzy
        **extra_fields: Additional metadata fields (e.g., is_external, email_domain)

    Returns:
        Dict with full claim structure and provenance
    """
    normalized_value = claim_value.strip() if claim_value else ''
    return {
        'claim_type': claim_type,
        'claim_value': normalized_value,
        # Keep the pre-normalization text for auditability.
        'raw_value': raw_value or claim_value,
        'source_url': source_url,
        'retrieved_on': retrieved_on,
        'xpath': xpath,
        'html_file': html_file,
        'xpath_match_score': xpath_match_score,
        'extraction_method': extraction_method,
        # Timestamp of when this extraction ran (UTC).
        'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
        # Caller-supplied extras (e.g. is_external, email_domain) ride along.
        **extra_fields,
    }
|
||
|
||
|
||
# === Extractors for specific claim types ===
|
||
|
||
def extract_title_claims(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract organization-name claims from <title> tags.

    The raw title is kept as ``raw_value``; the claim value is the text
    before the first recognized separator (" - ", " | ", etc.), which is
    usually the organization name.
    """
    results = []
    separators = (' - ', ' | ', ' – ', ' — ', ': ')
    for node in tree.xpath('//title'):
        if not node.text:
            continue
        raw_text = node.text.strip()
        # Default to the full title; trim at the first separator found.
        clean_name = raw_text
        for sep in separators:
            if sep in raw_text:
                clean_name = raw_text.split(sep)[0].strip()
                break
        results.append(create_claim(
            claim_type='org_name',
            claim_value=clean_name,
            xpath=get_xpath_lxml(node),
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            raw_value=raw_text,
            extraction_method='title_tag',
        ))
    return results
|
||
|
||
|
||
def extract_meta_description(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract short-description claims from standard and OpenGraph meta tags."""
    claims = []

    def _collect(selector: str, method: str) -> None:
        # Query both the content values and the matching elements so we can
        # pair each value with a precise element XPath.
        values = tree.xpath(selector + '/@content')
        nodes = tree.xpath(selector)
        for idx, text in enumerate(values):
            if not (text and text.strip()):
                continue
            # Fall back to the selector itself if element lookup misaligns.
            xp = get_xpath_lxml(nodes[idx]) if idx < len(nodes) else selector
            claims.append(create_claim(
                claim_type='description_short',
                claim_value=text.strip(),
                xpath=xp,
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method=method,
            ))

    # Standard meta description
    _collect('//meta[@name="description"]', 'meta_description')
    # OpenGraph description
    _collect('//meta[@property="og:description"]', 'og_description')
    return claims
|
||
|
||
|
||
def extract_og_site_name(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract organization-name claims from og:site_name meta tags."""
    results = []
    selector = '//meta[@property="og:site_name"]'
    values = tree.xpath(selector + '/@content')
    nodes = tree.xpath(selector)
    for idx, value in enumerate(values):
        if not (value and value.strip()):
            continue
        # Pair the content value with its element's XPath when possible.
        xp = get_xpath_lxml(nodes[idx]) if idx < len(nodes) else selector
        results.append(create_claim(
            claim_type='org_name',
            claim_value=value.strip(),
            xpath=xp,
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            extraction_method='og_site_name',
        ))
    return results
|
||
|
||
|
||
def extract_schema_org(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract claims from schema.org JSON-LD <script> blocks.

    Each block may contain a single object or a list of objects; every
    object is delegated to ``_extract_schema_item``.
    """
    import json

    claims = []
    for script in tree.xpath('//script[@type="application/ld+json"]'):
        payload = script.text
        if not payload:
            continue
        try:
            parsed = json.loads(payload)
        except json.JSONDecodeError:
            # Malformed JSON-LD is silently skipped.
            continue
        items = parsed if isinstance(parsed, list) else [parsed]
        script_xpath = get_xpath_lxml(script)
        for entry in items:
            claims.extend(_extract_schema_item(
                entry, script_xpath, html_file, source_url, retrieved_on))
    return claims
|
||
|
||
|
||
def _classify_youtube_url_inline(url: str) -> str:
|
||
"""
|
||
Inline YouTube URL classifier for use before classify_youtube_url is defined.
|
||
|
||
Returns 'social_youtube_channel' for channel URLs, 'social_youtube_video' otherwise.
|
||
"""
|
||
# Channel URL patterns - these ARE official channel links
|
||
channel_patterns = [
|
||
'/@', # Handle format: /@username
|
||
'/channel/UC', # Channel ID format: /channel/UCxxxxx
|
||
'/user/', # Legacy user format: /user/username
|
||
'/c/', # Custom URL format: /c/customname
|
||
]
|
||
|
||
for pattern in channel_patterns:
|
||
if pattern in url:
|
||
return 'social_youtube_channel'
|
||
|
||
# Everything else (watch?v=, youtu.be/, shorts/, etc.) is a video
|
||
return 'social_youtube_video'
|
||
|
||
|
||
def _extract_schema_item(item: dict, xpath: str, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract claims from a single schema.org JSON-LD item.

    Organization-like items yield org_name/description claims; event items
    are excluded from those claim types so an event page doesn't overwrite
    the institution's identity. Address, phone, email and sameAs social
    links are harvested regardless of @type.

    Fix: JSON-LD values are frequently nested objects or non-strings
    (e.g. ``name`` as ``{"@value": ...}``, numeric postal codes, object
    entries in ``sameAs``). The original code applied string operations
    (``in``, ``.strip()``) directly and raised TypeError/AttributeError;
    all values are now checked with ``isinstance(..., str)`` first.
    String-valued input behaves exactly as before.

    Args:
        item: One parsed JSON-LD object.
        xpath: XPath of the <script> element the JSON-LD came from.
        html_file: Relative path to archived HTML file.
        source_url: URL the claim was extracted from.
        retrieved_on: ISO 8601 timestamp when page was archived.

    Returns:
        List of claim dicts (possibly empty).
    """
    claims = []

    # Get the @type to distinguish organizations from events.
    # Per JSON-LD, @type may be a string or a list of strings.
    item_type = item.get('@type', '')
    if isinstance(item_type, list):
        item_type = item_type[0] if item_type else ''
    if not isinstance(item_type, str):
        # Defensive: malformed JSON-LD can place objects here.
        item_type = ''

    # Organization types that should have org_name extracted
    org_types = {
        'Organization', 'LocalBusiness', 'Museum', 'Library', 'Archive',
        'EducationalOrganization', 'GovernmentOrganization', 'NGO',
        'Corporation', 'Place', 'CivicStructure', 'LandmarksOrHistoricalBuildings',
        'PerformingArtsTheater', 'MovieTheater', 'Zoo', 'Aquarium',
    }

    # Event types - extract as event_name, not org_name
    event_types = {'Event', 'BusinessEvent', 'ChildrensEvent', 'ComedyEvent',
                   'CourseInstance', 'DanceEvent', 'DeliveryEvent', 'EducationEvent',
                   'EventSeries', 'ExhibitionEvent', 'Festival', 'FoodEvent',
                   'Hackathon', 'LiteraryEvent', 'MusicEvent', 'PublicationEvent',
                   'SaleEvent', 'ScreeningEvent', 'SocialEvent', 'SportsEvent',
                   'TheaterEvent', 'VisualArtsEvent'}

    # A missing @type is treated as organization-like (common on older sites).
    is_org = any(t in item_type for t in org_types) or not item_type
    is_event = any(t in item_type for t in event_types)

    # Organization name - only for org types or if @type is missing
    if 'name' in item and is_org and not is_event:
        name_value = item['name']
        # Only plain strings; skip HTML/code-looking or implausibly long values.
        if isinstance(name_value, str) and '<' not in name_value and len(name_value) < 200:
            claims.append(create_claim(
                claim_type='org_name',
                claim_value=name_value,
                xpath=xpath,
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='schema_org_name',
            ))

    # Description - only for organizations, skip HTML/code
    if 'description' in item and is_org and not is_event:
        desc_value = item['description']
        # 'vc_row' marks page-builder shortcode junk, not prose.
        if isinstance(desc_value, str) and '<' not in desc_value and 'vc_row' not in desc_value:
            claims.append(create_claim(
                claim_type='description',
                claim_value=desc_value,
                xpath=xpath,
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='schema_org_description',
            ))

    # Address: either a bare string or a PostalAddress object.
    if 'address' in item:
        addr = item['address']
        if isinstance(addr, str):
            claims.append(create_claim(
                claim_type='address',
                claim_value=addr,
                xpath=xpath,
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='schema_org_address',
            ))
        elif isinstance(addr, dict):
            street = addr.get('streetAddress')
            if isinstance(street, str):
                claims.append(create_claim(
                    claim_type='address',
                    claim_value=street,
                    xpath=xpath,
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    extraction_method='schema_org_streetAddress',
                ))
            postal = addr.get('postalCode')
            if isinstance(postal, str):
                claims.append(create_claim(
                    claim_type='postal_code',
                    claim_value=postal,
                    xpath=xpath,
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    extraction_method='schema_org_postalCode',
                ))
            locality = addr.get('addressLocality')
            if isinstance(locality, str):
                claims.append(create_claim(
                    claim_type='city',
                    claim_value=locality,
                    xpath=xpath,
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    extraction_method='schema_org_addressLocality',
                ))

    # Phone
    if isinstance(item.get('telephone'), str):
        claims.append(create_claim(
            claim_type='phone',
            claim_value=item['telephone'],
            xpath=xpath,
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            extraction_method='schema_org_telephone',
        ))

    # Email
    if isinstance(item.get('email'), str):
        claims.append(create_claim(
            claim_type='email',
            claim_value=item['email'],
            xpath=xpath,
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            extraction_method='schema_org_email',
        ))

    # Social media profiles via sameAs (string or list of strings).
    if 'sameAs' in item:
        same_as = item['sameAs'] if isinstance(item['sameAs'], list) else [item['sameAs']]
        for url in same_as:
            if not isinstance(url, str):
                continue
            if 'twitter.com' in url or 'x.com' in url:
                claims.append(create_claim(claim_type='social_twitter', claim_value=url, xpath=xpath, html_file=html_file, source_url=source_url, retrieved_on=retrieved_on, extraction_method='schema_org_sameAs'))
            elif 'facebook.com' in url:
                claims.append(create_claim(claim_type='social_facebook', claim_value=url, xpath=xpath, html_file=html_file, source_url=source_url, retrieved_on=retrieved_on, extraction_method='schema_org_sameAs'))
            elif 'instagram.com' in url:
                claims.append(create_claim(claim_type='social_instagram', claim_value=url, xpath=xpath, html_file=html_file, source_url=source_url, retrieved_on=retrieved_on, extraction_method='schema_org_sameAs'))
            elif 'linkedin.com' in url:
                claims.append(create_claim(claim_type='social_linkedin', claim_value=url, xpath=xpath, html_file=html_file, source_url=source_url, retrieved_on=retrieved_on, extraction_method='schema_org_sameAs'))
            elif 'youtube.com' in url or 'youtu.be' in url:
                # Classify YouTube URLs as channel vs video
                youtube_type = _classify_youtube_url_inline(url)
                claims.append(create_claim(claim_type=youtube_type, claim_value=url, xpath=xpath, html_file=html_file, source_url=source_url, retrieved_on=retrieved_on, extraction_method='schema_org_sameAs'))

    return claims
|
||
|
||
|
||
def extract_email_context(link) -> dict:
    """Extract contextual information around an email mailto link.

    Looks for:
    - Role/title text before the email (e.g., "Sportmedewerkers:", "Contact:")
    - Organization name for external emails (e.g., "Plaatselijk Belang Alteveer")
    - Person name if present

    Three strategies are tried in order; the first valid label wins:
    1. Text preceding the link inside an enclosing <li>.
    2. Tail text of the immediately preceding sibling, or the parent's
       leading text.
    3. Text of an earlier sibling <strong>/<b>/<span>/<label>.

    Args:
        link: lxml element for the mailto link

    Returns:
        Dict with optional context fields:
        - label: The descriptive label before the email
        - organization_context: Organization name if this is a contact listing
        - person_name: Person name if detectable
    """
    # Words that are NOT useful as labels (common prepositions, conjunctions, etc.)
    SKIP_WORDS = {
        # Dutch
        'of', 'en', 'via', 'naar', 'per', 'op', 'bij', 'aan', 'met', 'voor',
        'door', 'om', 'dan', 'als', 'maar', 'want', 'dus', 'toch', 'nog',
        'mail', 'e-mail', 'email',
        # English
        'or', 'and', 'via', 'to', 'at', 'by', 'for', 'with', 'from',
        'the', 'a', 'an',
    }

    def is_valid_label(text: str) -> bool:
        """Check if text is a meaningful label (not just a common word)."""
        if not text:
            return False
        text_lower = text.lower().strip()
        # Skip if it's a single common word
        if text_lower in SKIP_WORDS:
            return False
        # Skip if it's too short (less than 3 chars)
        if len(text_lower) < 3:
            return False
        return True

    context: dict = {
        'label': None,
        'organization_context': None,
    }

    # Strategy 1: Check if in a <li> element and get text before the link
    parent = link.getparent()
    if parent is not None and parent.tag == 'li':
        # Get all text in the li before the link.
        # lxml semantics: parent.text is text before the first child;
        # each child's .tail is the text following that child.
        li_text = parent.text or ''
        # Also check for text in child elements before the link
        for child in parent:
            if child == link:
                break
            child_text = child.text or ''
            child_tail = child.tail or ''
            li_text += child_text + child_tail

        li_text = li_text.strip()
        # Clean up: remove trailing colon, nbsp, etc.
        li_text = li_text.rstrip(':').rstrip('\xa0').strip()
        if is_valid_label(li_text):
            context['label'] = li_text
            # If this looks like an organization name (title case, multiple words)
            if any(c.isupper() for c in li_text) and len(li_text.split()) >= 2:
                context['organization_context'] = li_text

    # Strategy 2: Check immediate preceding sibling text
    if not context['label']:
        prev = link.getprevious()
        if prev is not None and prev.tail:
            # The tail of the previous sibling is the text directly
            # preceding the link node.
            tail_text = prev.tail.strip().rstrip(':').rstrip('\xa0').strip()
            if is_valid_label(tail_text):
                context['label'] = tail_text
        elif parent is not None and parent.text:
            # Text directly in parent before this element
            parent_text = parent.text.strip().rstrip(':').rstrip('\xa0').strip()
            if is_valid_label(parent_text):
                context['label'] = parent_text

    # Strategy 3: Check for label in a sibling <strong>, <b>, or <span>
    if not context['label'] and parent is not None:
        for sibling in parent:
            # Only consider siblings BEFORE the link.
            if sibling == link:
                break
            if sibling.tag in ('strong', 'b', 'span', 'label'):
                sib_text = (sibling.text or '').strip().rstrip(':').strip()
                if is_valid_label(sib_text):
                    context['label'] = sib_text
                    break

    return context
|
||
|
||
|
||
def get_broader_context(link, max_chars: int = 500) -> Tuple[str, str]:
    """
    Get broader text and HTML context around an element for NER extraction.

    Walks up the DOM tree to find meaningful context (paragraphs, list items,
    divs, sections) and extracts text content.

    Args:
        link: lxml element
        max_chars: Maximum characters to extract

    Returns:
        Tuple of (plain_text_context, html_context); both empty strings if
        the element has no parent at all.
    """
    # Find a meaningful parent container
    container_tags = {'p', 'li', 'div', 'td', 'section', 'article', 'aside', 'header', 'footer', 'address'}
    current = link.getparent()
    container = None

    # Walk up to find a good container (max 5 levels)
    for _ in range(5):
        if current is None:
            break
        if current.tag in container_tags:
            container = current
            # For small containers like <li>, try to get parent
            # (a short <li>/<td> rarely carries enough context for NER).
            text_len = len(etree.tostring(current, method='text', encoding='unicode') or '')
            if text_len < 100 and current.tag in {'li', 'td'}:
                parent = current.getparent()
                if parent is not None and parent.tag in container_tags:
                    container = parent
            break
        current = current.getparent()

    # Fallbacks: use the direct parent if no listed container was found.
    if container is None:
        container = link.getparent()

    if container is None:
        return "", ""

    # Get text content (whitespace-normalized, truncated to max_chars).
    text_content = etree.tostring(container, method='text', encoding='unicode') or ''
    text_content = ' '.join(text_content.split())[:max_chars]

    # Get HTML content (for additional signals like element names, classes)
    try:
        html_content = etree.tostring(container, encoding='unicode')[:max_chars]
    except Exception:
        html_content = ""

    return text_content, html_content
|
||
|
||
|
||
def extract_email_links(tree, html_file: str, source_url: str, retrieved_on: str,
                        institution_domain: Optional[str] = None,
                        enable_ner: bool = True) -> List[Dict]:
    """Extract email addresses from mailto: links with optional NER for person/role extraction.

    Args:
        tree: Parsed lxml HTML tree
        html_file: Relative path to HTML file
        source_url: URL where page was fetched from
        retrieved_on: ISO timestamp of archival
        institution_domain: Primary domain of the institution (e.g., 'dewolden.nl')
            Used to determine if emails are internal or external.
        enable_ner: Whether to run NER extraction for person names/roles (default: True)

    Returns:
        List of claims including:
        - email claims with context
        - person_name, job_title, department, role claims (if NER enabled and successful)

    NOTE(review): relies on module-level globals not visible here
    (NER_ENABLED, FAST_MODE, NER_MODEL, NER_CONVENTION_VERSION,
    extract_ner_from_context) — presumably configured near the top of
    this script; verify before reuse.
    """
    claims = []
    mailto_links = tree.xpath('//a[starts-with(@href, "mailto:")]')

    for link in mailto_links:
        href = link.get('href', '')
        if href.startswith('mailto:'):
            email = href[7:].split('?')[0]  # Remove query params
            if email and '@' in email:
                email_domain = email.split('@')[1].lower()
                external = is_email_external(email, institution_domain)
                # All derived person/role claims below reuse this XPath so
                # they can be traced back to the same anchor element.
                email_xpath = get_xpath_lxml(link)

                # Extract context around the email link
                context = extract_email_context(link)

                # Create base email claim
                claims.append(create_claim(
                    claim_type='email',
                    claim_value=email,
                    xpath=email_xpath,
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    extraction_method='mailto_link',
                    # Additional metadata for email classification
                    is_external=external,
                    email_domain=email_domain,
                    # Context information
                    context_label=context.get('label'),
                    context_organization=context.get('organization_context'),
                ))

                # Run NER/pattern extraction if enabled
                if enable_ner:
                    text_context, html_context = get_broader_context(link)
                    ner_result = None
                    extraction_method = None

                    # Try LLM-based NER first (if API available and not in fast mode)
                    if NER_ENABLED and text_context and not FAST_MODE:
                        ner_result = extract_ner_from_context(
                            context_text=text_context,
                            email=email,
                            html_context=html_context,
                        )
                        # Include model and convention in extraction_method
                        extraction_method = f'ner_zai_{NER_MODEL}_{NER_CONVENTION_VERSION.replace(" ", "_")}'

                    # Fallback to pattern-based extraction when the LLM found
                    # nothing (or was skipped entirely).
                    if not ner_result or not ner_result.get('persons'):
                        # Use context_label if available, otherwise use broader text context
                        pattern_context = context.get('label') or text_context or ''
                        pattern_result = extract_role_from_context_pattern(
                            context_text=pattern_context,
                            email=email,
                        )

                        if pattern_result:
                            # Convert pattern result to NER-like format so the
                            # downstream claim-building code handles both paths.
                            ner_result = {
                                'persons': [{
                                    'job_title': pattern_result.get('job_title'),
                                    'job_title_en': pattern_result.get('job_title_en'),
                                    'department': pattern_result.get('department'),
                                    'department_en': pattern_result.get('department_en'),
                                }],
                                'confidence': pattern_result.get('confidence', 0.8),
                            }
                            extraction_method = 'pattern_matching'

                    # Process NER results (from either source)
                    if ner_result and ner_result.get('persons'):
                        # Get model/convention from NER result if available
                        ner_model = ner_result.get('ner_model', NER_MODEL)
                        ner_convention = ner_result.get('ner_convention', NER_CONVENTION_VERSION)

                        # Ensure extraction_method has a default
                        if not extraction_method:
                            extraction_method = f'ner_zai_{NER_MODEL}_{NER_CONVENTION_VERSION.replace(" ", "_")}'

                        # Create claims for each extracted person. The shared
                        # confidence is stored as xpath_match_score on every
                        # derived claim.
                        for person in ner_result['persons']:
                            confidence = ner_result.get('confidence', 0.8)

                            # Person name claim (full name)
                            if person.get('full_name'):
                                claims.append(create_claim(
                                    claim_type='person_name',
                                    claim_value=person['full_name'],
                                    xpath=email_xpath,  # XPath of associated email
                                    html_file=html_file,
                                    source_url=source_url,
                                    retrieved_on=retrieved_on,
                                    extraction_method=extraction_method,
                                    xpath_match_score=confidence,
                                    # Associated email for linking
                                    associated_email=email,
                                    # Name components (PNV aligned)
                                    given_name=person.get('given_name'),
                                    surname_prefix=person.get('surname_prefix'),
                                    base_surname=person.get('base_surname'),
                                    # NER provenance
                                    ner_model=ner_model,
                                    ner_convention=ner_convention,
                                ))

                            # Job title claim
                            if person.get('job_title'):
                                claims.append(create_claim(
                                    claim_type='job_title',
                                    claim_value=person['job_title'],
                                    xpath=email_xpath,
                                    html_file=html_file,
                                    source_url=source_url,
                                    retrieved_on=retrieved_on,
                                    extraction_method=extraction_method,
                                    xpath_match_score=confidence,
                                    associated_email=email,
                                    associated_person=person.get('full_name'),
                                    # English translation if available
                                    job_title_en=person.get('job_title_en'),
                                    # NER provenance
                                    ner_model=ner_model,
                                    ner_convention=ner_convention,
                                ))

                            # Title/rank claim
                            if person.get('title_rank'):
                                claims.append(create_claim(
                                    claim_type='title_rank',
                                    claim_value=person['title_rank'],
                                    xpath=email_xpath,
                                    html_file=html_file,
                                    source_url=source_url,
                                    retrieved_on=retrieved_on,
                                    extraction_method=extraction_method,
                                    xpath_match_score=confidence,
                                    associated_email=email,
                                    associated_person=person.get('full_name'),
                                    # NER provenance
                                    ner_model=ner_model,
                                    ner_convention=ner_convention,
                                ))

                            # Department claim
                            if person.get('department'):
                                claims.append(create_claim(
                                    claim_type='department',
                                    claim_value=person['department'],
                                    xpath=email_xpath,
                                    html_file=html_file,
                                    source_url=source_url,
                                    retrieved_on=retrieved_on,
                                    extraction_method=extraction_method,
                                    xpath_match_score=confidence,
                                    associated_email=email,
                                    associated_person=person.get('full_name'),
                                    # English translation if available
                                    department_en=person.get('department_en'),
                                    # NER provenance
                                    ner_model=ner_model,
                                    ner_convention=ner_convention,
                                ))

                            # Role claim
                            if person.get('role'):
                                claims.append(create_claim(
                                    claim_type='role',
                                    claim_value=person['role'],
                                    xpath=email_xpath,
                                    html_file=html_file,
                                    source_url=source_url,
                                    retrieved_on=retrieved_on,
                                    extraction_method=extraction_method,
                                    xpath_match_score=confidence,
                                    associated_email=email,
                                    associated_person=person.get('full_name'),
                                    # NER provenance
                                    ner_model=ner_model,
                                    ner_convention=ner_convention,
                                ))

    return claims
|
||
|
||
|
||
def extract_phone_links(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract phone-number claims from tel: anchor links."""
    results = []
    for anchor in tree.xpath('//a[starts-with(@href, "tel:")]'):
        href = anchor.get('href', '')
        if not href.startswith('tel:'):
            continue
        number = href[4:]  # strip the 'tel:' scheme prefix
        if not number:
            continue
        results.append(create_claim(
            claim_type='phone',
            claim_value=number,
            xpath=get_xpath_lxml(anchor),
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            extraction_method='tel_link',
        ))
    return results
|
||
|
||
|
||
def classify_youtube_url(url: str) -> str:
    """
    Classify a YouTube URL as either a channel link or a video link.

    CRITICAL: This distinction prevents wrong YouTube channel assignment!

    A video link pointing to content ABOUT an institution is NOT the institution's
    official channel. For example, a news report about Fryske Akademy hosted on
    NOS Jeugdjournaal's channel should NOT be classified as Fryske Akademy's YouTube.

    Returns:
        'social_youtube_channel' - Official channel URLs (/@handle, /channel/, /user/, /c/)
        'social_youtube_video' - Individual video URLs (watch?v=, youtu.be/, /shorts/)
    """
    import re

    # URL shapes that identify an official channel.
    channel_regexes = (
        r'youtube\.com/channel/UC[^/?&]+',  # Channel ID: /channel/UCxxxxx
        r'youtube\.com/user/[^/?&]+',       # Legacy user: /user/username
        r'youtube\.com/c/[^/?&]+',          # Custom URL: /c/customname
        r'youtube\.com/@[^/?&]+',           # Handle: /@username (modern)
    )
    if any(re.search(rx, url) for rx in channel_regexes):
        return 'social_youtube_channel'

    # Everything else maps to the video bucket: watch?v=, youtu.be/,
    # /shorts/, /live/, embeds, playlists, and any unrecognized YouTube
    # URL (it is safer not to assume an official channel).
    return 'social_youtube_video'
|
||
|
||
|
||
def extract_social_links(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract social media links.

    IMPORTANT:
    - Filters out share/intent URLs which are NOT actual profiles.
    - Distinguishes YouTube channel links from video links to prevent
      incorrect channel attribution (e.g., news video ABOUT an institution
      being mistaken for the institution's official channel).
    """
    found: List[Dict] = []

    # Platform -> recognised profile domains.
    platform_domains = {
        'social_twitter': ['twitter.com', 'x.com'],
        'social_facebook': ['facebook.com'],
        'social_instagram': ['instagram.com'],
        'social_linkedin': ['linkedin.com'],
        # NOTE: YouTube handled separately with classify_youtube_url()
        'social_tiktok': ['tiktok.com'],
        'social_pinterest': ['pinterest.com', 'pinterest.nl'],
    }

    # Share URL markers to EXCLUDE (not actual profiles)
    share_markers = (
        '/sharer', '/share', '/intent/',
        'shareArticle', '/pin/create',
        '/submit', 'addthis.com', 'sharethis.com',
        # Pinterest pin URLs (not profile pages)
        '/pin/',
    )

    for anchor in tree.xpath('//a[@href]'):
        href = anchor.get('href', '')

        # Skip share/intent URLs
        if any(marker in href for marker in share_markers):
            continue

        # YouTube gets dedicated handling: channel vs video classification
        if 'youtube.com' in href or 'youtu.be' in href:
            # Drop links to blocked hosting-provider channels entirely
            blocked_id = next(
                (cid for cid in BLOCKED_YOUTUBE_CHANNELS if cid in href), None
            )
            if blocked_id is not None:
                print(f" ⚠️ BLOCKED YouTube channel: {blocked_id} in {href}")
                continue

            found.append(create_claim(
                claim_type=classify_youtube_url(href),
                claim_value=href,
                xpath=get_xpath_lxml(anchor),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='social_link',
            ))
            continue

        # Every other platform: at most one claim per platform per link
        for claim_type, domains in platform_domains.items():
            if any(domain in href for domain in domains):
                found.append(create_claim(
                    claim_type=claim_type,
                    claim_value=href,
                    xpath=get_xpath_lxml(anchor),
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    extraction_method='social_link',
                ))
    return found
|
||
|
||
|
||
def extract_h1_org_name(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract organization name from first h1.

    Only the FIRST <h1> on the page is considered a candidate. The text is
    rejected when it is generic UI/navigation text, matches a substring
    blocklist (month names, icon labels, ...), is a bare 4-digit year, or
    looks like an events-calendar / news-archive heading.

    IMPORTANT: Filters out generic UI text that is not an organization name.
    Updated 2025-12-02 based on Gado2 v1.5.0 convention analysis.

    Args:
        tree: lxml parsed HTML tree.
        html_file: Relative path to the archived HTML file (provenance).
        source_url: URL the page was fetched from.
        retrieved_on: ISO 8601 archival timestamp.

    Returns:
        List with at most one 'org_name' claim (empty when filtered out).
    """
    claims = []

    # Generic UI text that should NOT be org names
    # Based on analysis of 40,149 org_name claims across 1,630 entries
    # Following Gado2 v1.5.0 ORGANISATION exclusion rules (ORG_EXC001-006)
    INVALID_ORG_NAMES = {
        # Navigation (Dutch + English)
        'Home', 'home', 'HOME', 'Menu', 'menu', 'Contact', 'contact', 'Contact us',
        'Over ons', 'About us', 'Nieuws', 'News', 'Zoeken', 'Search', 'Terug', 'Back',
        'Volgende', 'Next', 'Vorige', 'Previous', 'Close', 'Sluiten',
        # Section headers (Dutch)
        'Welkom', 'Informatie', 'Homepage', 'Startpagina', 'Algemeen',
        'Collectie', 'Collection', 'Agenda', 'Activiteiten', 'Activities',
        'Vacatures', 'Organisatie', 'Nieuwsbrief', 'Newsletter', 'Bestuur',
        'Publicaties', 'Publications', 'Openingstijden', 'Opening hours',
        'Geschiedenis', 'History', 'Educatie', 'Education',
        'Vrijwilligers', 'Volunteers', 'Tentoonstellingen', 'Exhibitions',
        'Boeken', 'Books', 'Winkel', 'Shop', 'Werkgroepen', 'Genealogie',
        'Exposities', 'Archief', 'Archive', 'Collecties', 'Collections',
        'Jeugd', 'Youth', 'Onderwijs', 'Lidmaatschap', 'Membership',
        'Jaarverslagen', 'Annual reports', 'Historie', 'Arrangementen',
        'Rondleidingen', 'Tours', 'Partners', 'Actueel', 'Current',
        'Tickets', 'Projecten', 'Projects', 'Contactformulier',
        'Webshop', 'Vrienden', 'Friends', 'Pers', 'Press', 'Bezoek', 'Visit',
        'Contactgegevens', 'Contact details', 'Bezoekersinformatie',
        'Visitor information', 'Scholen', 'Schools', 'Medewerkers', 'Staff',
        'Lezingen', 'Lectures', 'Groepsbezoek', 'Group visits',
        'Expositie', 'Exhibition', 'Evenementen', 'Events',
        'Donateurs', 'Donors', 'Colofon', 'Imprint', 'Links',
        'Bibliotheek', 'Library', 'Museumwinkel', 'Museum shop',
        'Beeldbank', 'Image bank', 'Archieven', 'Archives',
        'Nieuwsbrieven', 'Newsletters', 'Sponsors', 'Sponsoren',
        'Museum', 'Archeologie', 'Archaeology', 'Artikelen', 'Articles',
        'Bereikbaarheid', 'Accessibility', 'Groepen', 'Groups',
        # CTA / Call-to-action
        'Lees meer', 'Meer lezen', 'Read more', 'Bekijk', 'View',
        'Download', 'Steun ons', 'Support us', 'Lid worden', 'Become member',
        'Word vrijwilliger', 'Become volunteer', 'Doneer', 'Donate',
        'Plan je bezoek', 'Plan your visit', 'Word Vriend', 'Become Friend',
        'Vrijwilliger worden', 'Schrijf je in', 'Sign up', 'Aanmelden',
        # UI elements / Social
        'Facebook', 'Twitter', 'Instagram', 'LinkedIn', 'YouTube', 'X',
        'Zoekresultaten', 'Search results', 'Winkelwagen', 'Shopping cart',
        'Sitemap', 'Login', 'Logout', 'Inloggen', 'Uitloggen',
        'Chevron left', 'Chevron right', 'Arrow left', 'Arrow right',
        'Eye', 'Share', 'Delen', 'Print', 'Oproep',
        'Opent in externe pagina', 'Opens in new window',
        'Loading...', 'Laden...', 'Wachtwoord kwijt', 'Forgot password',
        # Legal / Policy pages
        'ANBI', 'Privacyverklaring', 'Privacy statement', 'Disclaimer',
        'Privacybeleid', 'Privacy policy', 'Cookies', 'Cookie policy',
        'Algemene voorwaarden', 'Terms and conditions', 'Huisregels',
        'House rules', 'Toegankelijkheid', 'Privacy verklaring', 'Privacy',
        'Datenschutzerklärung', 'Impressum',
        # FAQ / Generic
        'Veelgestelde vragen', 'FAQ', 'Over het museum', 'About the museum',
        'Verplicht', 'Required', 'Uncategorized', 'Geen categorie',
        'admin', 'Het museum', 'The museum', 'Praktische informatie',
        'Practical information', 'Tarieven', 'Rates', 'Toegangsprijzen',
        'Admission', 'Werken bij', 'Work with us', 'Nu te zien', 'Now showing',
        'English', 'Nederlands', 'Deutsch', 'Français',
        # Content types / categories
        'Natuur & Dieren', 'Nature & Animals', 'Kunst & Cultuur', 'Art & Culture',
        'Sport', 'Koken & Eten', 'Food & Cooking', 'Biografie & Waargebeurd',
        'Economie & Management', 'Spiritualiteit & Filosofie', 'Romantiek',
        'Films', 'Podcasts', 'Video', 'Audio', 'Foto', "Foto's", 'Photos',
        'Duurzaamheid', 'Sustainability', 'Schenkingen', 'Donations',
        'Voortgezet onderwijs', 'Secondary education', 'Verhalen', 'Stories',
        'Catalogus', 'Catalogue', 'Vaste collectie', 'Permanent collection',
        'Kinderfeestje', "Children's party", 'Kinderactiviteiten',
        "Children's activities", 'Museumcafé', 'Museum café',
        'Online leren', 'Online learning', 'Gesproken boeken', 'Audiobooks',
        'Spanning', 'Thriller', 'Bidprentjes', 'Prayer cards',
        'Wetenschappelijke boeken lenen', 'Borrow scientific books',
        'BoekStart', 'Grootletterboeken', 'Large print books',
        'Engelse boeken', 'English books', 'Meer boeken', 'More books',
        'Boeken op onderwerp', 'Books by subject',
        'Informatiepunt Digitale Overheid', 'Hulp in de Bibliotheek',
        'Ontdekken & Onderzoeken', 'Gezin & Gezondheid', 'Family & Health',
        # Events
        'Open Monumentendag', 'Monument Day', 'Evenementen in',
        # Generic single words
        'Doel', 'Goal', 'Boek', 'Book', 'Kaart', 'Map', 'Film',
        'Wie zijn wij?', 'Who are we?',
        # Technical/placeholder
        'Gemeentearchief',  # Generic term, not specific org name
    }

    # Patterns that indicate invalid org names (regex-like)
    # These are matched as substrings of the lowercased heading text.
    INVALID_PATTERNS = [
        # Month patterns
        'januari', 'februari', 'maart', 'april', 'mei', 'juni',
        'juli', 'augustus', 'september', 'oktober', 'november', 'december',
        # UI/icon references
        '-svg', '-icoon', '-icon', 'icon-', 'svg-',
        # Scroll/navigation
        'scroll naar', 'scroll to',
        # Archive patterns
        'archieven',  # when not exact "Archieven"
        # Link labels
        'externe-link',
    ]

    h1s = tree.xpath('//h1')
    if h1s:
        # Only the first <h1> is trusted as an org-name candidate.
        h1 = h1s[0]
        text = ''.join(h1.itertext()).strip()

        # Filter out invalid org names; length window rejects empty/oversized headings
        if text and len(text) > 2 and len(text) < 150:
            # Check exact match (case-insensitive)
            if text in INVALID_ORG_NAMES or text.lower() in {v.lower() for v in INVALID_ORG_NAMES}:
                return claims

            # Check patterns
            text_lower = text.lower()
            if any(pattern in text_lower for pattern in INVALID_PATTERNS):
                return claims

            # Check if it's just a year (e.g., "2023")
            if text.isdigit() and len(text) == 4:
                return claims

            # Check if starts with "Evenementen in" (events calendar)
            if text_lower.startswith('evenementen in '):
                return claims

            # Check for "News Archives" patterns
            if 'nieuws archieven' in text_lower:
                return claims

            claims.append(create_claim(
                claim_type='org_name',
                claim_value=text,
                xpath=get_xpath_lxml(h1),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='h1_tag',
                xpath_match_score=0.9,  # Slightly lower confidence
            ))
    return claims
|
||
|
||
|
||
def extract_youtube_embeds(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract YouTube video embeds from iframes.

    Finds:
    - youtube.com/embed/VIDEO_ID
    - youtube-nocookie.com/embed/VIDEO_ID
    - youtu.be/VIDEO_ID (in data attributes)
    """
    claims: List[Dict] = []

    def _video_claim(video_id, node, raw, method):
        # Normalise every hit to the canonical watch URL.
        return create_claim(
            claim_type='video_youtube',
            claim_value=f'https://www.youtube.com/watch?v={video_id}',
            xpath=get_xpath_lxml(node),
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            raw_value=raw,
            extraction_method=method,
        )

    # Standard YouTube iframes
    iframe_xpath = (
        '//iframe[contains(@src, "youtube.com/embed/") '
        'or contains(@src, "youtube-nocookie.com/embed/")]'
    )
    for frame in tree.xpath(iframe_xpath):
        src = frame.get('src', '')
        # YouTube IDs are exactly 11 URL-safe characters
        matched = re.search(r'embed/([a-zA-Z0-9_-]{11})', src)
        if matched:
            claims.append(_video_claim(matched.group(1), frame, src, 'youtube_iframe'))

    # Lazy-loaded players keep the URL (or bare ID) in data attributes
    lazy_nodes = tree.xpath(
        '//*[@data-video-url[contains(., "youtube")] or @data-src[contains(., "youtube")]]'
    )
    for node in lazy_nodes:
        for attr in ('data-video-url', 'data-src', 'data-video-id'):
            value = node.get(attr, '')
            if 'youtube' in value.lower():
                matched = re.search(r'(?:embed/|v=|youtu\.be/)([a-zA-Z0-9_-]{11})', value)
                if matched:
                    claims.append(
                        _video_claim(matched.group(1), node, value, 'youtube_data_attr')
                    )
            elif attr == 'data-video-id' and value and len(value) == 11:
                # Bare 11-character video ID stored directly in the attribute
                claims.append(
                    _video_claim(value, node, value, 'youtube_video_id_attr')
                )

    return claims
|
||
|
||
|
||
def extract_vimeo_embeds(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract Vimeo video embeds from iframes.

    Finds:
    - player.vimeo.com/video/VIDEO_ID
    - vimeo.com/VIDEO_ID (in data attributes)
    """
    claims: List[Dict] = []
    # Numeric video ID, with or without the /video/ path segment.
    vimeo_id_re = re.compile(r'vimeo\.com/(?:video/)?(\d+)')

    def _video_claim(video_id, node, raw, method):
        # Normalise to the canonical public video URL.
        return create_claim(
            claim_type='video_vimeo',
            claim_value=f'https://vimeo.com/{video_id}',
            xpath=get_xpath_lxml(node),
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            raw_value=raw,
            extraction_method=method,
        )

    # Standard Vimeo iframes
    for frame in tree.xpath('//iframe[contains(@src, "vimeo.com")]'):
        src = frame.get('src', '')
        matched = vimeo_id_re.search(src)
        if matched:
            claims.append(_video_claim(matched.group(1), frame, src, 'vimeo_iframe'))

    # Lazy-loaded players keep the URL in data attributes
    lazy_nodes = tree.xpath(
        '//*[@data-video-url[contains(., "vimeo")] or @data-src[contains(., "vimeo")]]'
    )
    for node in lazy_nodes:
        for attr in ('data-video-url', 'data-src'):
            value = node.get(attr, '')
            if 'vimeo' in value.lower():
                matched = vimeo_id_re.search(value)
                if matched:
                    claims.append(
                        _video_claim(matched.group(1), node, value, 'vimeo_data_attr')
                    )

    return claims
|
||
|
||
|
||
def extract_gallery_patterns(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Detect gallery/slideshow patterns indicating collection displays.

    Finds common gallery plugins/patterns:
    - Lightbox galleries
    - WordPress gallery blocks
    - Carousel/slider components
    - Collection display patterns

    FIX: the image-count pass previously searched the class attribute for
    the reported *type label* (e.g. "nextgen_gallery") instead of the class
    substring that actually matched (e.g. "ngg-gallery"), so image counts
    could never be found for any renamed type. The matching class substring
    is now remembered per detected type and reused for counting.

    Args:
        tree: lxml parsed HTML tree.
        html_file: Relative path to the archived HTML file (provenance).
        source_url: URL the page was fetched from.
        retrieved_on: ISO 8601 archival timestamp.

    Returns:
        List of 'gallery_detected' claims (one per type) plus at most one
        'image_count' claim for the first type with 3+ contained images.
    """
    claims = []

    # (class substring to match, reported gallery type)
    # Derived from analysis of 115K+ archived files.
    gallery_patterns = [
        # Lightbox patterns
        ('lightbox', 'lightbox'),
        ('fancybox', 'fancybox'),
        ('simplelightbox', 'simplelightbox'),
        # Gallery patterns
        ('gallery', 'gallery'),
        ('ngg-gallery', 'nextgen_gallery'),
        ('spectra-image-gallery', 'spectra_gallery'),
        ('et_pb_gallery', 'divi_gallery'),
        ('kadence-blocks-gallery', 'kadence_gallery'),
        ('elementor-gallery', 'elementor_gallery'),
        ('woocommerce-product-gallery', 'woocommerce_gallery'),
        # Carousel/slider patterns
        ('carousel', 'carousel'),
        ('slider', 'slider'),
        ('swiper', 'swiper'),
        ('slick', 'slick'),
        # Collection page indicators
        ('collection', 'collection'),
        ('exhibit', 'exhibition'),
        ('artwork', 'artwork'),
    ]

    detected_galleries = {}

    for class_pattern, gallery_type in gallery_patterns:
        try:
            elements = tree.xpath(f'//*[contains(@class, "{class_pattern}")]')
            if elements and gallery_type not in detected_galleries:
                detected_galleries[gallery_type] = {
                    'count': len(elements),
                    'first_xpath': get_xpath_lxml(elements[0]),
                    # Remember which class substring matched so the image
                    # count below queries the same containers.
                    'class_pattern': class_pattern,
                }
        except Exception:
            continue

    # Create claims for detected galleries
    for gallery_type, info in detected_galleries.items():
        claims.append(create_claim(
            claim_type='gallery_detected',
            claim_value=gallery_type,
            xpath=info['first_xpath'],
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            raw_value=f"{gallery_type}: {info['count']} elements",
            extraction_method='gallery_pattern',
            xpath_match_score=0.85,  # Pattern detection has slightly lower confidence
        ))

    # Count images within gallery containers (first type with 3+ images wins)
    for gallery_type, info in detected_galleries.items():
        try:
            class_pattern = info['class_pattern']
            gallery_images = tree.xpath(f'//*[contains(@class, "{class_pattern}")]//img')
            if len(gallery_images) >= 3:  # Only report if 3+ images (likely a gallery)
                claims.append(create_claim(
                    claim_type='image_count',
                    claim_value=str(len(gallery_images)),
                    xpath=info['first_xpath'],
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    raw_value=f"{len(gallery_images)} images in {gallery_type} container",
                    extraction_method='gallery_image_count',
                    xpath_match_score=0.8,
                ))
                break  # Only count once
        except Exception:
            continue

    return claims
|
||
|
||
|
||
def extract_collection_page_indicators(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Detect collection/exhibition page patterns from URL and content.

    Heritage institutions typically have:
    - /collectie/ or /collection/ URLs
    - /tentoonstelling/ or /exhibition/ URLs
    - /object/ or /item/ pages
    """
    claims: List[Dict] = []

    # (URL substring, reported indicator type)
    url_indicators = [
        ('collectie', 'collection_nl'),
        ('collection', 'collection_en'),
        ('tentoonstelling', 'exhibition_nl'),
        ('exhibition', 'exhibition_en'),
        ('expositie', 'exhibition_nl'),
        ('/object/', 'object_page'),
        ('/item/', 'item_page'),
        ('/artwork/', 'artwork_page'),
        ('/archief/', 'archive_nl'),
        ('/archive/', 'archive_en'),
        ('/catalogus/', 'catalog_nl'),
        ('/catalog/', 'catalog_en'),
    ]

    # 1) Match the page URL itself (one claim per matching fragment)
    lowered_url = source_url.lower()
    for fragment, indicator in url_indicators:
        if fragment not in lowered_url:
            continue
        claims.append(create_claim(
            claim_type='collection_page',
            claim_value=indicator,
            xpath='/',  # URL-based detection has no element anchor
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            raw_value=f"URL contains '{fragment}'",
            extraction_method='url_pattern',
            xpath_match_score=0.9,
        ))

    # 2) Match the declared canonical URL (first fragment per href wins)
    for href in tree.xpath('//link[@rel="canonical"]/@href'):
        href = str(href)  # lxml smart string -> plain str
        lowered_href = href.lower()
        for fragment, indicator in url_indicators:
            if fragment in lowered_href:
                link_elem = tree.xpath('//link[@rel="canonical"]')[0]
                claims.append(create_claim(
                    claim_type='collection_page',
                    claim_value=f'{indicator}_canonical',
                    xpath=get_xpath_lxml(link_elem),
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    raw_value=href,
                    extraction_method='canonical_url_pattern',
                    xpath_match_score=0.95,
                ))
                break

    # 3) JSON-LD structured data hinting at a collection (first hit only)
    try:
        for script in tree.xpath('//script[@type="application/ld+json"]'):
            if not script.text:
                continue
            lowered_json = script.text.lower()
            collection_terms = ['collection', 'museum', 'exhibition', 'artwork', 'archivecomponent']
            if any(term in lowered_json for term in collection_terms):
                claims.append(create_claim(
                    claim_type='collection_page',
                    claim_value='structured_data_collection',
                    xpath=get_xpath_lxml(script),
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    raw_value='JSON-LD contains collection-related schema',
                    extraction_method='schema_org_collection',
                    xpath_match_score=0.85,
                ))
                break
    except Exception:
        pass

    return claims
|
||
|
||
|
||
def extract_boekwinkeltjes_links(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract links to boekwinkeltjes.nl (Dutch secondhand book marketplace).

    Heritage institutions often sell publications through boekwinkeltjes.nl.
    Links may point to:
    - Shop pages (/v/shopname/)
    - Search results (/s/?q=...)
    - General homepage references
    """
    claims: List[Dict] = []

    for anchor in tree.xpath('//a[contains(@href, "boekwinkeltjes.nl")]'):
        href = str(anchor.get('href', ''))
        if not href:
            continue

        # Classify the link target; shop pages also carry a shop name.
        shop_name = None
        if '/v/' in href:
            # Shop page: /v/shopname/
            link_type = 'shop_page'
            shop_match = re.search(r'/v/([^/]+)/?', href)
            if shop_match:
                shop_name = shop_match.group(1)
        elif '/s/' in href or '/su/' in href:
            link_type = 'search_results'
        else:
            link_type = 'homepage'

        details = f"link_type={link_type}"
        if shop_name:
            details += f", shop={shop_name}"

        claims.append(create_claim(
            claim_type='external_boekwinkeltjes',
            claim_value=href,
            xpath=get_xpath_lxml(anchor),
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            raw_value=details,
            extraction_method='boekwinkeltjes_link',
            xpath_match_score=1.0,
        ))

    return claims
|
||
|
||
|
||
def extract_page_title(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract the full page title.

    Unlike extract_title_claims which tries to parse org name,
    this extracts the complete <title> tag content.
    """
    claims: List[Dict] = []
    for title_elem in tree.xpath('//title'):
        # Empty or whitespace-only titles are skipped.
        text = (title_elem.text or '').strip()
        if not text:
            continue
        claims.append(create_claim(
            claim_type='page_title',
            claim_value=text,
            xpath=get_xpath_lxml(title_elem),
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            extraction_method='title_tag_full',
        ))
    return claims
|
||
|
||
|
||
def extract_favicon(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract favicon URLs from link tags.

    Looks for:
    - <link rel="icon" href="...">
    - <link rel="shortcut icon" href="...">
    - <link rel="apple-touch-icon" href="...">
    """
    claims: List[Dict] = []

    # Rel variants that mark an icon declaration
    favicon_selectors = (
        '//link[@rel="icon"]',
        '//link[@rel="shortcut icon"]',
        '//link[contains(@rel, "icon")]',
        '//link[@rel="apple-touch-icon"]',
        '//link[@rel="apple-touch-icon-precomposed"]',
    )

    seen_hrefs = set()

    for selector in favicon_selectors:
        for link in tree.xpath(selector):
            href = link.get('href', '')
            if not href or href in seen_hrefs:
                continue
            seen_hrefs.add(href)

            # Fold optional size/type metadata into the raw value
            descriptors = []
            sizes = link.get('sizes', '')
            if sizes:
                descriptors.append(f"sizes={sizes}")
            link_type = link.get('type', '')
            if link_type:
                descriptors.append(f"type={link_type}")
            raw_value = ", ".join(descriptors)

            claims.append(create_claim(
                claim_type='favicon',
                claim_value=str(href),
                xpath=get_xpath_lxml(link),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                raw_value=raw_value or None,
                extraction_method='favicon_link',
            ))

    return claims
|
||
|
||
|
||
def extract_logo(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract logo images from various patterns.

    Looks for:
    - Images with 'logo' in class, id, alt, or src
    - Images inside elements with 'logo' in class/id
    - Schema.org logo property
    - OpenGraph image (often the logo)

    FIX: og:image and itemprop="logo" values were previously paired with
    their elements by parallel list index; because the itemprop XPath is a
    union of @content | @src | @href, one element carrying two matching
    attributes desynchronized every subsequent claim's XPath. Values are
    now read per element, so each claim's XPath is always correct.

    Returns:
        List of 'logo' claims, deduplicated by image URL.
    """
    claims = []
    seen_srcs = set()

    def _img_claim(img, method):
        """Append a logo claim for an <img>, deduplicated on its src."""
        src = img.get('src', '')
        if src and src not in seen_srcs:
            seen_srcs.add(src)
            alt = img.get('alt', '')
            claims.append(create_claim(
                claim_type='logo',
                claim_value=str(src),
                xpath=get_xpath_lxml(img),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                raw_value=f"alt={alt}" if alt else None,
                extraction_method=method,
            ))

    # Pattern 1: Images with 'logo' in attributes (case-insensitive for alt/src)
    logo_images = tree.xpath(
        '//img[contains(@class, "logo") or contains(@id, "logo") or '
        'contains(translate(@alt, "LOGO", "logo"), "logo") or '
        'contains(translate(@src, "LOGO", "logo"), "logo")]'
    )
    for img in logo_images:
        _img_claim(img, 'logo_img_attr')

    # Pattern 2: Images inside logo containers
    logo_container_images = tree.xpath(
        '//*[contains(@class, "logo") or contains(@id, "logo")]//img'
    )
    for img in logo_container_images:
        _img_claim(img, 'logo_container_img')

    # Pattern 3: OpenGraph image — read content per element so the claim's
    # XPath always points at the meta tag that supplied the value.
    for meta in tree.xpath('//meta[@property="og:image"]'):
        content = meta.get('content', '')
        if content and str(content) not in seen_srcs:
            seen_srcs.add(str(content))
            claims.append(create_claim(
                claim_type='logo',
                claim_value=str(content),
                xpath=get_xpath_lxml(meta),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='og_image',
                xpath_match_score=0.7,  # Lower confidence - og:image might not be logo
            ))

    # Pattern 4: Schema.org itemprop="logo" — check content/src/href per element
    for elem in tree.xpath('//*[@itemprop="logo"]'):
        for attr in ('content', 'src', 'href'):
            content = elem.get(attr, '')
            if content and str(content) not in seen_srcs:
                seen_srcs.add(str(content))
                claims.append(create_claim(
                    claim_type='logo',
                    claim_value=str(content),
                    xpath=get_xpath_lxml(elem),
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    extraction_method='schema_logo',
                ))

    return claims
|
||
|
||
|
||
def extract_login_signup(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract login and signup buttons/links.

    Detects authentication UI elements indicating member portals,
    user accounts, or restricted content areas.
    """
    claims: List[Dict] = []

    # Login patterns (Dutch + English + German + French)
    login_patterns = [
        # Dutch
        'inloggen', 'log in', 'login', 'aanmelden', 'mijn account', 'mijn profiel',
        # English
        'sign in', 'signin', 'log on', 'logon', 'my account', 'member login',
        # German
        'anmelden', 'einloggen', 'mein konto',
        # French
        'connexion', 'se connecter', 'mon compte',
    ]

    # Signup patterns (Dutch + English + German + French)
    signup_patterns = [
        # Dutch
        'registreren', 'registreer', 'account aanmaken', 'word lid', 'lid worden',
        'nieuw account', 'schrijf in', 'inschrijven',
        # English
        'sign up', 'signup', 'register', 'create account', 'join', 'become a member',
        'new account', 'subscribe',
        # German
        'registrieren', 'konto erstellen', 'mitglied werden',
        # French
        'inscription', 's\'inscrire', 'créer un compte',
    ]

    def _scan(elem, searchable, fallbacks, patterns, claim_type, method):
        """Append at most one claim when any pattern occurs in the text."""
        for pattern in patterns:
            if pattern in searchable:
                # First non-empty candidate wins as the displayed label
                display_value = next((v for v in fallbacks if v), pattern)
                claims.append(create_claim(
                    claim_type=claim_type,
                    claim_value=elem.get('href', '') or display_value,
                    xpath=get_xpath_lxml(elem),
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    raw_value=f"text={display_value}, pattern={pattern}",
                    extraction_method=method,
                ))
                return  # Only one match per element

    # Search in links and buttons
    clickable = tree.xpath('//a | //button | //input[@type="submit"] | //input[@type="button"]')
    for elem in clickable:
        # Gather every piece of text that could reveal the element's purpose
        text_content = ''.join(elem.itertext()).strip().lower()
        href = str(elem.get('href', '')).lower()
        title = str(elem.get('title', '')).lower()
        aria_label = str(elem.get('aria-label', '')).lower()
        elem_class = str(elem.get('class', '')).lower()
        elem_id = str(elem.get('id', '')).lower()
        value = str(elem.get('value', '')).lower()

        searchable = f"{text_content} {href} {title} {aria_label} {elem_class} {elem_id} {value}"
        fallbacks = (text_content, value, title, aria_label)

        # An element may legitimately yield both a login and a signup claim
        _scan(elem, searchable, fallbacks, login_patterns, 'ui_login', 'login_button')
        _scan(elem, searchable, fallbacks, signup_patterns, 'ui_signup', 'signup_button')

    # Also check for login forms
    login_forms = tree.xpath(
        '//form[contains(@action, "login") or contains(@action, "signin") or '
        'contains(@action, "auth") or contains(@id, "login") or contains(@class, "login")]'
    )
    for form in login_forms:
        action = form.get('action', '')
        claims.append(create_claim(
            claim_type='ui_login',
            claim_value=str(action) if action else 'login_form_detected',
            xpath=get_xpath_lxml(form),
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            raw_value='login_form',
            extraction_method='login_form',
            xpath_match_score=0.9,
        ))

    return claims
|
||
|
||
|
||
def extract_financial_document_links(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract links to annual reports, financial statements, and policy documents.

    Targets Dutch heritage institution financial documents:
    - Jaarverslag (annual report / public annual report)
    - Jaarstukken (annual financial statements / accounts)
    - Jaarrekening (annual financial report)
    - Meerjarenbeleid (multi-year policy document)
    - Beleidsplan (policy plan)
    - ANBI publicatieplicht (Dutch charity tax publication requirement)

    Extracts PDF links from pages typically at /organisatie/jaarverslagen/ or similar.
    Each document link creates a URL claim, and an optional year claim if extractable.

    Args:
        tree: lxml parsed HTML tree
        html_file: Relative path to HTML file (for provenance)
        source_url: URL where page was fetched from
        retrieved_on: ISO timestamp of archival

    Returns:
        List of claim dictionaries with XPath provenance
    """
    claims = []

    # Document type classification patterns (Dutch and English)
    # Order matters - more specific patterns should come first
    doc_patterns = {
        'annual_report_url': [
            r'publieksjaarverslag',  # Public annual report
            r'jaarverslag',  # Annual report (general)
            r'annual[\s_-]?report',
        ],
        'financial_statement_url': [
            r'jaarstukken',  # Annual financial statements
            r'jaarrekening',  # Annual accounts
            r'financial[\s_-]?statement',
            r'annual[\s_-]?accounts',
        ],
        'anbi_publication_url': [
            r'anbi',
            r'publicatieplicht',
        ],
        'policy_document_url': [
            r'meerjarenbeleid',  # Multi-year policy
            r'meerjarenbeleidsplan',
            r'mjbp',  # Abbreviation
            r'beleidsplan',  # Policy plan
            r'strategisch[\s_-]?plan',
            r'multi[\s_-]?year[\s_-]?policy',
            r'strategic[\s_-]?plan',
        ],
    }

    # Year extraction pattern - captures 4-digit years 2000-2099
    year_pattern = re.compile(r'(20\d{2})')

    # Scan every anchor that carries an href. Filtering on the lowercased
    # href in Python (rather than a case-sensitive XPath contains()) also
    # catches ".PDF" / ".Pdf" links the previous approach missed.
    for link in tree.xpath('//a[@href]'):
        href = link.get('href', '')
        href_lower = href.lower()
        if '.pdf' not in href_lower:
            continue

        # Get link text for matching and display
        link_text = ''.join(link.itertext()).strip()
        link_text_lower = link_text.lower()

        # Determine document type by checking patterns against link text and URL
        claim_type = None
        matched_pattern = None

        for doc_type, patterns in doc_patterns.items():
            for pattern in patterns:
                # Check both link text and URL for pattern
                if re.search(pattern, link_text_lower, re.IGNORECASE) or \
                   re.search(pattern, href_lower, re.IGNORECASE):
                    claim_type = doc_type
                    matched_pattern = pattern
                    break
            if claim_type:
                break

        if not claim_type:
            continue  # Not a financial document we're interested in

        # Extract year from URL or link text
        # Try URL first (more reliable), then link text
        year_match = year_pattern.search(href)
        if not year_match:
            year_match = year_pattern.search(link_text)

        doc_year = year_match.group(1) if year_match else None

        # Get XPath for provenance
        xpath = get_xpath_lxml(link)

        # Create URL claim for the document
        claims.append(create_claim(
            claim_type=claim_type,
            claim_value=href,
            xpath=xpath,
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            extraction_method='financial_document_link',
            raw_value=f"text={link_text}, pattern={matched_pattern}",
        ))

        # Create year claim if year was extracted
        if doc_year:
            claims.append(create_claim(
                claim_type='financial_document_year',
                claim_value=doc_year,
                xpath=xpath,  # Same XPath - year comes from this element
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='financial_document_year',
                raw_value=f"year={doc_year}, from={claim_type}, url={href}",
            ))

    return claims
|
||
|
||
|
||
def extract_all_claims(html_content: str, html_file: str, source_url: str, retrieved_on: str,
                       institution_domain: Optional[str] = None) -> List[Dict]:
    """Extract all claims from HTML content.

    Args:
        html_content: Raw HTML string
        html_file: Relative path to HTML file
        source_url: URL where page was fetched from
        retrieved_on: ISO timestamp of archival
        institution_domain: Primary domain of the institution (for email classification)

    Returns:
        List of claim dictionaries, or empty list if default page detected
    """
    # ==========================================================================
    # BLOCKLIST CHECK: Skip extraction for hosting provider default pages
    # ==========================================================================
    # Default pages carry social links belonging to the hosting provider
    # (e.g., Plesk's YouTube) that must not be attributed to the institution.
    for indicator in DEFAULT_PAGE_INDICATORS:
        if indicator not in html_content:
            continue
        print(f" ⚠️ BLOCKED: Default page detected ('{indicator}')")
        print(f" Skipping extraction to avoid hosting provider attribution")
        return []  # No claims from default pages

    collected: List[Dict] = []

    try:
        # Parse with lxml for proper XPath support
        tree = etree.HTML(html_content)

        # Standard extractors all share the same 4-argument signature.
        standard_extractors = (
            extract_title_claims,
            extract_meta_description,
            extract_og_site_name,
            extract_schema_org,
            extract_phone_links,
            extract_social_links,
            extract_h1_org_name,
            # Video embeds
            extract_youtube_embeds,
            extract_vimeo_embeds,
            # Gallery and collection patterns
            extract_gallery_patterns,
            extract_collection_page_indicators,
            # External marketplace links
            extract_boekwinkeltjes_links,
            # Page metadata and branding
            extract_page_title,
            extract_favicon,
            extract_logo,
            # UI patterns (login/signup detection)
            extract_login_signup,
            # Financial documents (jaarverslagen, jaarstukken, ANBI, beleidsplannen)
            extract_financial_document_links,
        )

        # One failing extractor must not abort the others.
        for extractor in standard_extractors:
            try:
                collected.extend(extractor(tree, html_file, source_url, retrieved_on))
            except Exception as e:
                print(f" Warning: Extractor {extractor.__name__} failed: {e}")

        # The email extractor takes an extra institution_domain parameter.
        try:
            collected.extend(extract_email_links(tree, html_file, source_url, retrieved_on, institution_domain))
        except Exception as e:
            print(f" Warning: Extractor extract_email_links failed: {e}")

    except Exception as e:
        print(f" Error parsing HTML: {e}")

    return collected
|
||
|
||
|
||
def deduplicate_claims(claims: List[Dict]) -> List[Dict]:
    """Remove duplicate claims, keeping highest confidence.

    Claims are considered duplicates when they share (claim_type, claim_value);
    the one with the highest xpath_match_score wins, with ties going to the
    earliest occurrence.
    """
    best: Dict[tuple, Dict] = {}
    for candidate in claims:
        key = (candidate['claim_type'], candidate['claim_value'])
        current = best.get(key)
        if current is None or candidate['xpath_match_score'] > current['xpath_match_score']:
            best[key] = candidate
    return list(best.values())
|
||
|
||
|
||
def get_web_archive_path(entry_data: dict, entry_num: str) -> Optional[Path]:
    """Get the web archive directory path for an entry.

    Prefers the directory recorded under web_enrichment.web_archives[0];
    falls back to the first subdirectory of web/{entry_num}/.

    Args:
        entry_data: Parsed entry YAML data.
        entry_num: Entry number string (used for the fallback path).

    Returns:
        Path to an existing archive directory, or None if none found.
    """
    web_enrichment = entry_data.get('web_enrichment', {})
    web_archives = web_enrichment.get('web_archives', [])

    if web_archives:
        archive = web_archives[0]
        directory = archive.get('directory')
        if directory:
            # Directory is relative to data/custodian/ (e.g., "web/0000/example.nl")
            archive_path = BASE_DIR / 'custodian' / directory
            if archive_path.exists():
                return archive_path

    # Fallback: look for directory in web/{entry_num}/
    entry_web_dir = WEB_DIR / entry_num
    if entry_web_dir.exists():
        # Path.iterdir() order is filesystem-dependent; sort so that the
        # selected archive directory is deterministic across runs/machines.
        subdirs = sorted(d for d in entry_web_dir.iterdir() if d.is_dir())
        if subdirs:
            return subdirs[0]

    return None
|
||
|
||
|
||
def load_metadata(archive_path: Path) -> Optional[dict]:
    """Load metadata.yaml from archive directory.

    Returns the parsed mapping, or None when the file is absent or unreadable.
    """
    metadata_file = archive_path / 'metadata.yaml'
    if not metadata_file.exists():
        return None
    try:
        with open(metadata_file, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)
    except Exception as e:
        print(f" Warning: Failed to load {metadata_file}: {e}")
        return None
|
||
|
||
|
||
def find_html_files(archive_path: Path) -> List[Path]:
    """Locate archived HTML documents under *archive_path*.

    Searches, in order: pages/*.html (flat), mirror/**/*.html (recursive),
    and a top-level rendered.html snapshot.
    """
    collected: List[Path] = []

    # (directory, recursive?) scan plan for the two archive layouts.
    scan_plan = [
        (archive_path / 'pages', False),
        (archive_path / 'mirror', True),
    ]
    for directory, recursive in scan_plan:
        if directory.exists():
            matches = directory.rglob('*.html') if recursive else directory.glob('*.html')
            collected.extend(matches)

    # Single rendered snapshot at the archive root, if present.
    snapshot = archive_path / 'rendered.html'
    if snapshot.exists():
        collected.append(snapshot)

    return collected
|
||
|
||
|
||
def extract_entry_number(filename: str) -> str:
    """Return the leading digit run of *filename*; otherwise the name without '.yaml'."""
    leading_digits = re.match(r'^(\d+)', filename)
    if leading_digits:
        return leading_digits.group(1)
    return filename.replace('.yaml', '')
|
||
|
||
|
||
def process_entry(filepath: Path, dry_run: bool = False) -> tuple[int, List[str]]:
    """
    Process a single entry file to extract HTML claims.

    Loads the entry YAML, locates its web archive, extracts claims from the
    archived HTML pages (priority-ordered, capped), deduplicates, optionally
    runs LLM validation, and writes the result back under 'web_claims'.

    Args:
        filepath: Path to the entry YAML file.
        dry_run: When True, extract and count claims but do not write back.

    Returns: (claims_count, errors)
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    if not data:
        return 0, ["Empty file"]

    entry_num = extract_entry_number(filepath.name)
    errors = []
    all_claims = []

    # Get web archive path
    archive_path = get_web_archive_path(data, entry_num)
    if not archive_path or not archive_path.exists():
        return 0, [f"No web archive found for entry {entry_num}"]

    # Load metadata for timestamps
    metadata = load_metadata(archive_path)
    source_url = metadata.get('url', '') if metadata else ''
    retrieved_on = metadata.get('archive_timestamp', '') if metadata else ''

    if not source_url:
        # Fall back to the URL recorded in the entry itself.
        # BUGFIX: the previous `.get('web_archives', [{}])[0]` raised
        # IndexError when 'web_archives' was present but an empty list
        # (the .get default only applies when the key is missing entirely).
        archives = data.get('web_enrichment', {}).get('web_archives') or [{}]
        source_url = archives[0].get('url', '')
        if not source_url:
            source_url = data.get('original_entry', {}).get('webadres_organisatie', '')

    # Extract institution domain for email classification
    institution_domain = get_institution_domain(data)

    # Find and process HTML files
    html_files = find_html_files(archive_path)
    if not html_files:
        return 0, [f"No HTML files found in {archive_path}"]

    # Process HTML files with smart prioritization:
    # 1. Prioritize pages with known interesting content patterns
    # 2. Process all prioritized files + sample of others
    MAX_HTML_FILES = 100

    # Patterns that indicate interesting subpages
    priority_patterns = [
        'bibliotheek', 'collectie', 'collection', 'publicat', 'uitgave',
        'winkel', 'shop', 'boek', 'book', 'contact', 'over-ons', 'about',
        'social', 'link', 'partner', 'sponsor'
    ]

    # Separate priority files from others
    priority_files = []
    other_files = []

    for f in html_files:
        filename_lower = str(f).lower()
        if any(p in filename_lower for p in priority_patterns):
            priority_files.append(f)
        else:
            other_files.append(f)

    # Process all priority files + fill remaining slots with others
    files_to_process = priority_files[:MAX_HTML_FILES]
    remaining_slots = MAX_HTML_FILES - len(files_to_process)
    if remaining_slots > 0:
        files_to_process.extend(other_files[:remaining_slots])

    for html_file in files_to_process:
        try:
            # errors='replace' tolerates pages with mis-declared encodings
            with open(html_file, 'r', encoding='utf-8', errors='replace') as f:
                html_content = f.read()

            html_file_rel = str(html_file.relative_to(BASE_DIR))
            claims = extract_all_claims(html_content, html_file_rel, source_url, retrieved_on, institution_domain)
            all_claims.extend(claims)
        except Exception as e:
            errors.append(f"Failed to process {html_file}: {e}")

    # Deduplicate claims
    all_claims = deduplicate_claims(all_claims)

    # Validate claims using LLM (Gado2 v1.5.0 convention)
    invalid_claims = []
    if NER_ENABLED and all_claims and not SKIP_VALIDATION:
        # Filter claims using GLM-4.6 validation
        all_claims, invalid_claims = filter_claims_with_validation(all_claims, verbose=False)

    if not dry_run:
        # Store claims in entry data (even if empty, to clear old bad data)
        if 'web_claims' not in data:
            data['web_claims'] = {}

        web_claims_data = {
            'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            'source_archive': str(archive_path.relative_to(BASE_DIR)),
            'claims_count': len(all_claims),
            'claims': all_claims,
        }

        # Store invalid claims for audit (filtered by LLM validation)
        if invalid_claims:
            web_claims_data['removed_invalid_claims'] = invalid_claims
            web_claims_data['validation_metadata'] = {
                'model': NER_MODEL,
                'convention': NER_CONVENTION_VERSION,
                'validated_at': datetime.now(timezone.utc).isoformat(),
                'invalid_count': len(invalid_claims),
            }

        data['web_claims'] = web_claims_data

        # Write back
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    return len(all_claims), errors
|
||
|
||
|
||
def main():
    """CLI entry point: extract claims for selected entries and print a summary.

    Returns 0 on completion (individual entry failures are reported but do
    not change the exit status), or 1 when required dependencies are missing.
    """
    parser = argparse.ArgumentParser(description='Extract structured claims from archived HTML')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without writing')
    parser.add_argument('--force', action='store_true', help='Re-extract even if web_claims exists')
    parser.add_argument('--fast', action='store_true',
                        help='Fast mode: skip email NER (use patterns only), keep claim validation')
    parser.add_argument('--no-validation', action='store_true',
                        help='Skip LLM claim validation (fastest, but may include invalid claims)')
    args = parser.parse_args()

    # Set global flags for fast mode
    global FAST_MODE, SKIP_VALIDATION
    FAST_MODE = args.fast
    SKIP_VALIDATION = args.no_validation

    if not HAS_DEPS:
        print("Error: Required dependencies not installed.")
        print("Run: pip install beautifulsoup4 lxml")
        return 1

    # Find entry files
    if args.entry:
        # sorted() for a deterministic processing order, consistent with
        # the all-entries branch below (glob order is unspecified).
        files = sorted(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = sorted([f for f in ENTRIES_DIR.glob('*.yaml') if f.is_file() and not f.name.startswith('.')])

    if args.limit:
        files = files[:args.limit]

    total_claims = 0
    total_entries = 0
    total_skipped = 0
    total_failed = 0

    print(f"Processing {len(files)} entries...")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE'}")
    print()

    for filepath in files:
        if filepath.is_dir():
            continue

        # Skip if already has web_claims (unless --force)
        if not args.force:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if data and data.get('web_claims', {}).get('claims'):
                total_skipped += 1
                continue

        claims_count, errors = process_entry(filepath, dry_run=args.dry_run)

        if claims_count > 0:
            total_entries += 1
            total_claims += claims_count
            print(f" ✓ {filepath.name}: {claims_count} claims")
        elif errors:
            total_failed += 1
            for e in errors:
                print(f" ✗ {filepath.name}: {e}")
        else:
            total_failed += 1
            print(f" ✗ {filepath.name}: No claims extracted")

    print()
    print(f"{'DRY RUN - ' if args.dry_run else ''}Summary:")
    print(f" Entries with claims: {total_entries}")
    print(f" Total claims extracted: {total_claims}")
    print(f" Skipped (already have claims): {total_skipped}")
    print(f" Failed (no archive/claims): {total_failed}")

    return 0
|
||
|
||
|
||
# Script entry point: run main() and propagate its exit status to the shell.
if __name__ == '__main__':
    sys.exit(main())
|