glam/scripts/extract_html_claims.py
2025-12-01 16:06:34 +01:00

731 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Extract structured claims from archived website HTML with XPath provenance.
This script extracts verifiable data from archived HTML files following
the WebObservation provenance rules defined in AGENTS.md Rule 6.
EVERY claim MUST have:
- claim_type: Type of claim (org_name, description, email, phone, address, etc.)
- claim_value: The extracted value
- source_url: URL the claim was extracted from
- retrieved_on: ISO 8601 timestamp when page was archived
- xpath: XPath to the element containing this value
- html_file: Relative path to archived HTML file
- xpath_match_score: 1.0 for exact match, <1.0 for fuzzy match
Claims WITHOUT XPath provenance are FABRICATED and must NOT be stored.
Usage:
python scripts/extract_html_claims.py [--limit N] [--entry ENTRY_NUM] [--dry-run]
"""
import argparse
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any
from urllib.parse import urlparse
import yaml
# Type hints for optional dependencies; rebound below if the import succeeds.
etree: Any = None
BeautifulSoup: Any = None
try:
    from lxml import etree as _etree
    etree = _etree
    HAS_LXML = True
except ImportError:
    HAS_LXML = False
    print("Warning: Missing dependency: lxml")
    print("Install with: pip install lxml")
try:
    from bs4 import BeautifulSoup as _BeautifulSoup
    BeautifulSoup = _BeautifulSoup
    HAS_BS4 = True
except ImportError:
    HAS_BS4 = False
    print("Warning: Missing dependency: beautifulsoup4")
    print("Install with: pip install beautifulsoup4")
# bs4 is optional: every extractor below parses via lxml.
HAS_DEPS = HAS_LXML  # Only lxml is required for this script
# Directories
# NOTE(review): absolute user-specific path — presumably should come from
# config or an environment variable; confirm before running elsewhere.
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')
WEB_DIR = ENTRIES_DIR / 'web'
# Claim types to extract
# Maps claim_type identifier -> human-readable description. This is the
# vocabulary for the `claim_type` field of every claim produced below.
CLAIM_TYPES = {
    'org_name': 'Organization/institution official name',
    'org_name_alt': 'Alternative organization name',
    'tagline': 'Organization tagline or slogan',
    'description': 'Organization description',
    'description_short': 'Short description (meta description)',
    'email': 'Email address',
    'phone': 'Phone number',
    'address': 'Physical address',
    'postal_code': 'Postal code',
    'city': 'City name',
    'opening_hours_text': 'Opening hours as text',
    'social_twitter': 'Twitter/X URL',
    'social_facebook': 'Facebook URL',
    'social_instagram': 'Instagram URL',
    'social_linkedin': 'LinkedIn URL',
    'social_youtube': 'YouTube URL',
}
def get_xpath_lxml(element) -> str:
    """Return the absolute XPath of *element* within its document tree."""
    return element.getroottree().getpath(element)
def get_xpath_bs4(element) -> str:
    """Generate an indexed XPath for a BeautifulSoup element.

    Walks from the element up to the document root, recording each tag
    name with its 1-based position among same-named preceding siblings.
    """
    segments = []
    node = element
    while node and node.name:
        # Position = number of earlier siblings with the same tag, plus one.
        position = len(node.find_previous_siblings(node.name)) + 1
        segments.append(f"{node.name}[{position}]")
        node = node.parent
    if not segments:
        return '/'
    segments.reverse()
    return '/' + '/'.join(segments)
def create_claim(
    claim_type: str,
    claim_value: str,
    xpath: str,
    html_file: str,
    source_url: str,
    retrieved_on: str,
    raw_value: Optional[str] = None,
    extraction_method: str = 'html_parser',
    xpath_match_score: float = 1.0,
) -> Dict[str, Any]:
    """Assemble a claim dict carrying full provenance metadata.

    The result always contains the claim itself plus the provenance
    fields required for verifiability: source URL, retrieval timestamp,
    XPath, archived HTML file, and the XPath match score.
    """
    cleaned = claim_value.strip() if claim_value else ''
    claim: Dict[str, Any] = {
        'claim_type': claim_type,
        'claim_value': cleaned,
        # Preserve the untrimmed original when no explicit raw value given.
        'raw_value': raw_value or claim_value,
        'source_url': source_url,
        'retrieved_on': retrieved_on,
        'xpath': xpath,
        'html_file': html_file,
        'xpath_match_score': xpath_match_score,
        'extraction_method': extraction_method,
        'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
    }
    return claim
# === Extractors for specific claim types ===
def extract_title_claims(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract organization-name claims from <title> tags.

    The raw title is preserved as ``raw_value``; the portion before the
    first recognized separator is used as the cleaned organization name.
    """
    claims = []
    # Common "Org Name - Page Title" separators. FIX: the previous list
    # contained a bare ' ' and an empty string '' (mangled en/em dashes);
    # ''.split raised ValueError and ' ' truncated every multi-word name
    # to its first word.
    separators = [' - ', ' | ', ' \u2013 ', ' \u2014 ', ': ']
    for title in tree.xpath('//title'):
        if not title.text:
            continue
        raw_text = title.text.strip()
        clean_name = raw_text
        for sep in separators:
            if sep in raw_text:
                # The org name is usually the first segment.
                clean_name = raw_text.split(sep)[0].strip()
                break
        claims.append(create_claim(
            claim_type='org_name',
            claim_value=clean_name,
            xpath=get_xpath_lxml(title),
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            raw_value=raw_text,
            extraction_method='title_tag',
        ))
    return claims
def extract_meta_description(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract short descriptions from standard and OpenGraph meta tags.

    FIX: iterate over the <meta> elements themselves and read @content
    from each. The previous index-based pairing of the `/@content` result
    list against the element list drifted whenever a matching tag had no
    content attribute, attributing the claim's XPath to the wrong element.
    """
    claims = []
    sources = (
        ('//meta[@name="description"]', 'meta_description'),
        ('//meta[@property="og:description"]', 'og_description'),
    )
    for query, method in sources:
        for element in tree.xpath(query):
            content = element.get('content')
            if content and content.strip():
                claims.append(create_claim(
                    claim_type='description_short',
                    claim_value=content.strip(),
                    xpath=get_xpath_lxml(element),
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    extraction_method=method,
                ))
    return claims
def extract_og_site_name(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract organization name from og:site_name meta tags.

    FIX: read @content directly from each element so the generated XPath
    always points at the element the value came from (the previous index
    pairing drifted when a tag lacked a content attribute).
    """
    claims = []
    for element in tree.xpath('//meta[@property="og:site_name"]'):
        content = element.get('content')
        if content and content.strip():
            claims.append(create_claim(
                claim_type='org_name',
                claim_value=content.strip(),
                xpath=get_xpath_lxml(element),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='og_site_name',
            ))
    return claims
def extract_schema_org(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract claims from schema.org JSON-LD <script> blocks."""
    import json
    claims = []
    for script in tree.xpath('//script[@type="application/ld+json"]'):
        if not script.text:
            continue
        try:
            parsed = json.loads(script.text)
        except json.JSONDecodeError:
            # Malformed JSON-LD is common in the wild; just skip the block.
            continue
        items = parsed if isinstance(parsed, list) else [parsed]
        script_xpath = get_xpath_lxml(script)
        for entry in items:
            claims.extend(_extract_schema_item(
                entry, script_xpath, html_file, source_url, retrieved_on))
    return claims
def _extract_schema_item(item: dict, xpath: str, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract claims from a single schema.org JSON-LD item.

    FIX: non-string values (nested dicts under name/telephone/sameAs,
    numeric postal codes, null) are now skipped instead of crashing:
    previously they raised TypeError/AttributeError inside create_claim,
    which aborted the whole schema.org extractor for the page.
    """
    claims: List[Dict] = []

    def add(claim_type: str, value: str, method: str) -> None:
        # Every claim from this item shares the same provenance fields.
        claims.append(create_claim(
            claim_type=claim_type,
            claim_value=value,
            xpath=xpath,
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            extraction_method=method,
        ))

    # Normalize @type (may be a list, missing, or non-string) to one string.
    item_type = item.get('@type', '')
    if isinstance(item_type, list):
        item_type = item_type[0] if item_type else ''
    if not isinstance(item_type, str):
        item_type = ''
    # Organization-like types that should yield org_name claims.
    org_types = {
        'Organization', 'LocalBusiness', 'Museum', 'Library', 'Archive',
        'EducationalOrganization', 'GovernmentOrganization', 'NGO',
        'Corporation', 'Place', 'CivicStructure', 'LandmarksOrHistoricalBuildings',
        'PerformingArtsTheater', 'MovieTheater', 'Zoo', 'Aquarium',
    }
    # Event types - their names must NOT be mistaken for organization names.
    event_types = {'Event', 'BusinessEvent', 'ChildrensEvent', 'ComedyEvent',
                   'CourseInstance', 'DanceEvent', 'DeliveryEvent', 'EducationEvent',
                   'EventSeries', 'ExhibitionEvent', 'Festival', 'FoodEvent',
                   'Hackathon', 'LiteraryEvent', 'MusicEvent', 'PublicationEvent',
                   'SaleEvent', 'ScreeningEvent', 'SocialEvent', 'SportsEvent',
                   'TheaterEvent', 'VisualArtsEvent'}
    # Substring match keeps prefixed types (e.g. "schema:Museum") working;
    # a missing @type is treated as organization-like, as before.
    is_org = any(t in item_type for t in org_types) or not item_type
    is_event = any(t in item_type for t in event_types)

    # Organization name - only for org types, never for events.
    name_value = item.get('name')
    if isinstance(name_value, str) and is_org and not is_event:
        # Skip values that look like embedded HTML/code.
        if name_value and '<' not in name_value and len(name_value) < 200:
            add('org_name', name_value, 'schema_org_name')

    # Description - skip HTML fragments and page-builder shortcodes.
    desc_value = item.get('description')
    if isinstance(desc_value, str) and is_org and not is_event:
        if desc_value and '<' not in desc_value and 'vc_row' not in desc_value:
            add('description', desc_value, 'schema_org_description')

    # Address: either a plain string or a PostalAddress object.
    addr = item.get('address')
    if isinstance(addr, str):
        add('address', addr, 'schema_org_address')
    elif isinstance(addr, dict):
        if isinstance(addr.get('streetAddress'), str):
            add('address', addr['streetAddress'], 'schema_org_streetAddress')
        if isinstance(addr.get('postalCode'), str):
            add('postal_code', addr['postalCode'], 'schema_org_postalCode')
        if isinstance(addr.get('addressLocality'), str):
            add('city', addr['addressLocality'], 'schema_org_addressLocality')

    # Contact details.
    if isinstance(item.get('telephone'), str):
        add('phone', item['telephone'], 'schema_org_telephone')
    if isinstance(item.get('email'), str):
        add('email', item['email'], 'schema_org_email')

    # sameAs links map to social media claim types (first match wins).
    same_as = item.get('sameAs', [])
    if not isinstance(same_as, list):
        same_as = [same_as]
    platforms = (
        (('twitter.com', 'x.com'), 'social_twitter'),
        (('facebook.com',), 'social_facebook'),
        (('instagram.com',), 'social_instagram'),
        (('linkedin.com',), 'social_linkedin'),
        (('youtube.com',), 'social_youtube'),
    )
    for url in same_as:
        if not isinstance(url, str):
            continue
        for domains, claim_type in platforms:
            if any(d in url for d in domains):
                add(claim_type, url, 'schema_org_sameAs')
                break
    return claims
def extract_email_links(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract email address claims from mailto: anchors."""
    claims = []
    for anchor in tree.xpath('//a[starts-with(@href, "mailto:")]'):
        href = anchor.get('href', '')
        if not href.startswith('mailto:'):
            continue
        # Drop the scheme and any ?subject=... query parameters.
        address = href[len('mailto:'):].partition('?')[0]
        if address and '@' in address:
            claims.append(create_claim(
                claim_type='email',
                claim_value=address,
                xpath=get_xpath_lxml(anchor),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='mailto_link',
            ))
    return claims
def extract_phone_links(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract phone number claims from tel: anchors."""
    claims = []
    for anchor in tree.xpath('//a[starts-with(@href, "tel:")]'):
        href = anchor.get('href', '')
        if not href.startswith('tel:'):
            continue
        number = href[len('tel:'):]
        if number:
            claims.append(create_claim(
                claim_type='phone',
                claim_value=number,
                xpath=get_xpath_lxml(anchor),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='tel_link',
            ))
    return claims
def extract_social_links(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract social media profile links from anchors."""
    claims = []
    social_patterns = {
        'social_twitter': ['twitter.com', 'x.com'],
        'social_facebook': ['facebook.com'],
        'social_instagram': ['instagram.com'],
        'social_linkedin': ['linkedin.com'],
        'social_youtube': ['youtube.com'],
    }
    for anchor in tree.xpath('//a[@href]'):
        href = anchor.get('href', '')
        # At most one claim per claim type per anchor; any() reproduces the
        # original first-matching-domain behavior within each type.
        for claim_type, domains in social_patterns.items():
            if any(domain in href for domain in domains):
                claims.append(create_claim(
                    claim_type=claim_type,
                    claim_value=href,
                    xpath=get_xpath_lxml(anchor),
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    extraction_method='social_link',
                ))
    return claims
def extract_h1_org_name(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract a lower-confidence organization name from the first <h1>."""
    headings = tree.xpath('//h1')
    if not headings:
        return []
    first = headings[0]
    text = ''.join(first.itertext()).strip()
    # Reject empty, too-short, or implausibly long headings.
    if not (2 < len(text) < 150):
        return []
    return [create_claim(
        claim_type='org_name',
        claim_value=text,
        xpath=get_xpath_lxml(first),
        html_file=html_file,
        source_url=source_url,
        retrieved_on=retrieved_on,
        extraction_method='h1_tag',
        xpath_match_score=0.9,  # Slightly lower confidence than metadata
    )]
def extract_all_claims(html_content: str, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Parse the HTML and run every extractor, collecting all claims.

    A failing extractor only costs its own claims; a parse failure costs
    the whole page. Both are reported on stdout rather than raised.
    """
    claims: List[Dict] = []
    extractors = (
        extract_title_claims,
        extract_meta_description,
        extract_og_site_name,
        extract_schema_org,
        extract_email_links,
        extract_phone_links,
        extract_social_links,
        extract_h1_org_name,
    )
    try:
        # lxml gives us proper XPath support for provenance.
        tree = etree.HTML(html_content)
        for extractor in extractors:
            try:
                claims.extend(extractor(tree, html_file, source_url, retrieved_on))
            except Exception as e:
                print(f" Warning: Extractor {extractor.__name__} failed: {e}")
    except Exception as e:
        print(f" Error parsing HTML: {e}")
    return claims
def deduplicate_claims(claims: List[Dict]) -> List[Dict]:
    """Collapse duplicate (claim_type, claim_value) pairs.

    On a tie the first-seen claim wins; a strictly higher
    xpath_match_score replaces an earlier duplicate.
    """
    best: Dict[tuple, Dict] = {}
    for candidate in claims:
        key = (candidate['claim_type'], candidate['claim_value'])
        current = best.get(key)
        if current is None or candidate['xpath_match_score'] > current['xpath_match_score']:
            best[key] = candidate
    return list(best.values())
def get_web_archive_path(entry_data: dict, entry_num: str) -> Optional[Path]:
    """Resolve the web archive directory for an entry, or None.

    Prefers the directory recorded in web_enrichment.web_archives;
    otherwise falls back to the first subdirectory of web/{entry_num}/.
    """
    archives = entry_data.get('web_enrichment', {}).get('web_archives', [])
    if archives:
        directory = archives[0].get('directory')
        if directory:
            return ENTRIES_DIR / directory
    # Fallback: look for directory in web/{entry_num}/
    candidate_root = WEB_DIR / entry_num
    if candidate_root.exists():
        for child in candidate_root.iterdir():
            if child.is_dir():
                return child
    return None
def load_metadata(archive_path: Path) -> Optional[dict]:
    """Load metadata.yaml from an archive directory, or None on failure."""
    metadata_file = archive_path / 'metadata.yaml'
    if not metadata_file.exists():
        return None
    try:
        with open(metadata_file, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)
    except Exception as e:
        # Best-effort: a broken metadata file should not stop processing.
        print(f" Warning: Failed to load {metadata_file}: {e}")
        return None
def find_html_files(archive_path: Path) -> List[Path]:
    """Collect archived HTML files from pages/, mirror/ and the root.

    pages/ is scanned flat, mirror/ recursively, and a root-level
    rendered.html is appended last if present.
    """
    found: List[Path] = []
    pages_dir = archive_path / 'pages'
    if pages_dir.exists():
        found.extend(pages_dir.glob('*.html'))
    mirror_dir = archive_path / 'mirror'
    if mirror_dir.exists():
        found.extend(mirror_dir.rglob('*.html'))
    rendered = archive_path / 'rendered.html'
    if rendered.exists():
        found.append(rendered)
    return found
def extract_entry_number(filename: str) -> str:
    """Return the leading digits of *filename*; else the name sans '.yaml'."""
    leading_digits = re.match(r'^(\d+)', filename)
    if leading_digits:
        return leading_digits.group(1)
    return filename.replace('.yaml', '')
def process_entry(filepath: Path, dry_run: bool = False) -> tuple[int, List[str]]:
    """Extract HTML claims for one entry file and optionally write them back.

    Args:
        filepath: Path to the entry's YAML file.
        dry_run: When True, extract but do not write web_claims back.

    Returns:
        (claims_count, errors)
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    if not data:
        return 0, ["Empty file"]
    entry_num = extract_entry_number(filepath.name)
    errors: List[str] = []
    all_claims: List[Dict] = []
    # Get web archive path
    archive_path = get_web_archive_path(data, entry_num)
    if not archive_path or not archive_path.exists():
        return 0, [f"No web archive found for entry {entry_num}"]
    # Load metadata for the source URL and archive timestamp.
    metadata = load_metadata(archive_path)
    source_url = metadata.get('url', '') if metadata else ''
    retrieved_on = metadata.get('archive_timestamp', '') if metadata else ''
    if not source_url:
        # FIX: `.get('web_archives', [{}])[0]` raised IndexError when the
        # key existed but held an empty list; `or [{}]` covers that case.
        archives = data.get('web_enrichment', {}).get('web_archives') or [{}]
        source_url = archives[0].get('url', '')
    if not source_url:
        source_url = data.get('original_entry', {}).get('webadres_organisatie', '')
    # Find and process HTML files
    html_files = find_html_files(archive_path)
    if not html_files:
        return 0, [f"No HTML files found in {archive_path}"]
    main_html = _select_main_html(html_files, archive_path)
    try:
        with open(main_html, 'r', encoding='utf-8', errors='replace') as f:
            html_content = f.read()
        html_file_rel = str(main_html.relative_to(ENTRIES_DIR))
        all_claims.extend(extract_all_claims(html_content, html_file_rel, source_url, retrieved_on))
    except Exception as e:
        errors.append(f"Failed to process {main_html}: {e}")
    all_claims = deduplicate_claims(all_claims)
    if not dry_run and all_claims:
        # Store claims (overwrites any previous web_claims section).
        data['web_claims'] = {
            'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            'source_archive': str(archive_path.relative_to(ENTRIES_DIR)),
            'claims_count': len(all_claims),
            'claims': all_claims,
        }
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
    return len(all_claims), errors


def _select_main_html(html_files: List[Path], archive_path: Path) -> Path:
    """Pick the most complete HTML file, in priority order:
    rendered.html (Playwright render) > root index.html >
    mirror/ index.html > first file found."""
    for candidate in html_files:
        if candidate.name == 'rendered.html' and candidate.parent == archive_path:
            return candidate
    for candidate in html_files:
        if candidate.name == 'index.html' and candidate.parent == archive_path:
            return candidate
    for candidate in html_files:
        if candidate.name == 'index.html' and 'mirror' in str(candidate):
            return candidate
    return html_files[0]
def main():
    """CLI entry point: extract claims for selected entries and summarize."""
    parser = argparse.ArgumentParser(description='Extract structured claims from archived HTML')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without writing')
    parser.add_argument('--force', action='store_true', help='Re-extract even if web_claims exists')
    args = parser.parse_args()
    if not HAS_DEPS:
        print("Error: Required dependencies not installed.")
        print("Run: pip install beautifulsoup4 lxml")
        return 1
    # Select entry files: a single numbered entry, or all visible YAML files.
    if args.entry:
        files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        candidates = (f for f in ENTRIES_DIR.glob('*.yaml')
                      if f.is_file() and not f.name.startswith('.'))
        files = sorted(candidates)
    if args.limit:
        files = files[:args.limit]
    total_claims = 0
    total_entries = 0
    total_skipped = 0
    total_failed = 0
    print(f"Processing {len(files)} entries...")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE'}")
    print()
    for filepath in files:
        if filepath.is_dir():
            continue
        # Skip entries that already carry claims, unless --force was given.
        if not args.force:
            with open(filepath, 'r', encoding='utf-8') as f:
                existing = yaml.safe_load(f)
            if existing and existing.get('web_claims', {}).get('claims'):
                total_skipped += 1
                continue
        claims_count, errors = process_entry(filepath, dry_run=args.dry_run)
        if claims_count > 0:
            total_entries += 1
            total_claims += claims_count
            print(f"{filepath.name}: {claims_count} claims")
        elif errors:
            total_failed += 1
            for e in errors:
                print(f"{filepath.name}: {e}")
        else:
            total_failed += 1
            print(f"{filepath.name}: No claims extracted")
    print()
    print(f"{'DRY RUN - ' if args.dry_run else ''}Summary:")
    print(f" Entries with claims: {total_entries}")
    print(f" Total claims extracted: {total_claims}")
    print(f" Skipped (already have claims): {total_skipped}")
    print(f" Failed (no archive/claims): {total_failed}")
    return 0


if __name__ == '__main__':
    sys.exit(main())