#!/usr/bin/env python3
"""
Extract structured claims from archived website HTML with XPath provenance.

This script extracts verifiable data from archived HTML files following
the WebObservation provenance rules defined in AGENTS.md Rule 6.

EVERY claim MUST have:
- claim_type: Type of claim (org_name, description, email, phone, address, etc.)
- claim_value: The extracted value
- source_url: URL the claim was extracted from
- retrieved_on: ISO 8601 timestamp when page was archived
- xpath: XPath to the element containing this value
- html_file: Relative path to archived HTML file
- xpath_match_score: 1.0 for exact match, <1.0 for fuzzy match

Claims WITHOUT XPath provenance are FABRICATED and must NOT be stored.

Usage:
    python scripts/extract_html_claims.py [--limit N] [--entry ENTRY_NUM] [--dry-run]
"""

import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any, Iterator
from urllib.parse import unquote, urlparse

# Type hints for optional dependencies
etree: Any = None
BeautifulSoup: Any = None
yaml: Any = None

try:
    import yaml as _yaml
    yaml = _yaml
    HAS_YAML = True
except ImportError:
    HAS_YAML = False
    print("Warning: Missing dependency: pyyaml")
    print("Install with: pip install pyyaml")

try:
    from lxml import etree as _etree
    etree = _etree
    HAS_LXML = True
except ImportError:
    HAS_LXML = False
    print("Warning: Missing dependency: lxml")
    print("Install with: pip install lxml")

try:
    from bs4 import BeautifulSoup as _BeautifulSoup
    BeautifulSoup = _BeautifulSoup
    HAS_BS4 = True
except ImportError:
    HAS_BS4 = False
    print("Warning: Missing dependency: beautifulsoup4")
    print("Install with: pip install beautifulsoup4")

# lxml (parsing/XPath) and PyYAML (entry files) are required; bs4 is optional.
HAS_DEPS = HAS_LXML and HAS_YAML

# Directories
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')
WEB_DIR = ENTRIES_DIR / 'web'

# Claim types to extract
CLAIM_TYPES = {
    'org_name': 'Organization/institution official name',
    'org_name_alt': 'Alternative organization name',
    'tagline': 'Organization tagline or slogan',
    'description': 'Organization description',
    'description_short': 'Short description (meta description)',
    'email': 'Email address',
    'phone': 'Phone number',
    'address': 'Physical address',
    'postal_code': 'Postal code',
    'city': 'City name',
    'opening_hours_text': 'Opening hours as text',
    'social_twitter': 'Twitter/X URL',
    'social_facebook': 'Facebook URL',
    'social_instagram': 'Instagram URL',
    'social_linkedin': 'LinkedIn URL',
    'social_youtube': 'YouTube URL',
}

# Registered domains recognised for each social-media claim type.
SOCIAL_DOMAINS = {
    'social_twitter': ('twitter.com', 'x.com'),
    'social_facebook': ('facebook.com',),
    'social_instagram': ('instagram.com',),
    'social_linkedin': ('linkedin.com',),
    'social_youtube': ('youtube.com',),
}

# schema.org @type fragments treated as organizations (matched as substrings,
# so e.g. "http://schema.org/Museum" and "ArchiveOrganization" both qualify).
_SCHEMA_ORG_TYPES = frozenset({
    'Organization', 'LocalBusiness', 'Museum', 'Library', 'Archive',
    'EducationalOrganization', 'GovernmentOrganization', 'NGO', 'Corporation',
    'Place', 'CivicStructure', 'LandmarksOrHistoricalBuildings',
    'PerformingArtsTheater', 'MovieTheater', 'Zoo', 'Aquarium',
})

# schema.org @type fragments treated as events; their names are NOT org names.
_SCHEMA_EVENT_TYPES = frozenset({
    'Event', 'BusinessEvent', 'ChildrensEvent', 'ComedyEvent', 'CourseInstance',
    'DanceEvent', 'DeliveryEvent', 'EducationEvent', 'EventSeries',
    'ExhibitionEvent', 'Festival', 'FoodEvent', 'Hackathon', 'LiteraryEvent',
    'MusicEvent', 'PublicationEvent', 'SaleEvent', 'ScreeningEvent',
    'SocialEvent', 'SportsEvent', 'TheaterEvent', 'VisualArtsEvent',
})

# (PostalAddress key, claim type, extraction method) for structured addresses.
_ADDRESS_FIELDS = (
    ('streetAddress', 'address', 'schema_org_streetAddress'),
    ('postalCode', 'postal_code', 'schema_org_postalCode'),
    ('addressLocality', 'city', 'schema_org_addressLocality'),
)

# Name BeautifulSoup gives its document root object; it is not a real element.
_BS4_ROOT_NAME = '[document]'


def _as_text(value: Any) -> Optional[str]:
    """Return *value* as text if it is a non-blank string or a number, else None."""
    if isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, str) and value.strip():
        return value
    return None


def _text_values(value: Any) -> List[str]:
    """Return the usable text values of a scalar-or-list JSON-LD property."""
    candidates = value if isinstance(value, list) else [value]
    texts = (_as_text(candidate) for candidate in candidates)
    return [text for text in texts if text is not None]


def _social_claim_type(url: Any) -> Optional[str]:
    """Return the social claim type for *url* based on its hostname, or None.

    Matching is done on the parsed hostname (exact domain or a subdomain of
    it), not on substrings of the URL: a substring test for "x.com" would
    also hit dropbox.com, netflix.com or a "mailto:info@x.com" link.
    """
    if not isinstance(url, str):
        return None
    candidate = url.strip()
    try:
        parsed = urlparse(candidate)
        host = parsed.hostname
        if not host and not parsed.scheme:
            # Scheme-less links such as "twitter.com/museum".
            host = urlparse('//' + candidate).hostname
    except ValueError:
        return None
    if not host:
        return None
    host = host.lower()
    for claim_type, domains in SOCIAL_DOMAINS.items():
        if any(host == domain or host.endswith('.' + domain) for domain in domains):
            return claim_type
    return None


def get_xpath_lxml(element) -> str:
    """Generate absolute XPath for an lxml element."""
    tree = element.getroottree()
    return tree.getpath(element)


def get_xpath_bs4(element) -> str:
    """Generate XPath for a BeautifulSoup element.

    Walks up through the parents, numbering each step by its position among
    same-named preceding siblings. The BeautifulSoup document object itself
    (named "[document]") is not an element and is left out of the path.
    """
    parts = []
    current = element
    while current is not None and current.name and current.name != _BS4_ROOT_NAME:
        index = len(current.find_previous_siblings(current.name)) + 1
        parts.append(f"{current.name}[{index}]")
        current = current.parent
    parts.reverse()
    return '/' + '/'.join(parts) if parts else '/'


def create_claim(
    claim_type: str,
    claim_value: str,
    xpath: str,
    html_file: str,
    source_url: str,
    retrieved_on: str,
    raw_value: Optional[str] = None,
    extraction_method: str = 'html_parser',
    xpath_match_score: float = 1.0,
) -> Dict[str, Any]:
    """Create a properly structured claim with full provenance.

    Args:
        claim_type: One of the keys of CLAIM_TYPES.
        claim_value: Extracted value; stored stripped. Non-string values
            (e.g. a numeric JSON-LD telephone) are converted with str().
        xpath: XPath of the element the value was taken from.
        html_file: Archived HTML file path, relative to ENTRIES_DIR.
        source_url: URL the archived page was retrieved from.
        retrieved_on: Timestamp at which the page was archived.
        raw_value: Unprocessed value; defaults to claim_value.
        extraction_method: Name of the extractor that produced the claim.
        xpath_match_score: 1.0 for an exact match, lower for fuzzy ones.

    Returns:
        The claim dict, including an 'extraction_timestamp' in UTC.
    """
    if not claim_value:
        cleaned = ''
    elif isinstance(claim_value, str):
        cleaned = claim_value.strip()
    else:
        cleaned = str(claim_value).strip()
    return {
        'claim_type': claim_type,
        'claim_value': cleaned,
        'raw_value': raw_value or claim_value,
        'source_url': source_url,
        'retrieved_on': retrieved_on,
        'xpath': xpath,
        'html_file': html_file,
        'xpath_match_score': xpath_match_score,
        'extraction_method': extraction_method,
        'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
    }


# === Extractors for specific claim types ===

def extract_title_claims(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract organization name from <title> tag."""
    claims = []
    # Prefer the document title; a bare '//title' would also pick up the
    # <title> children of inline SVG icons.
    titles = tree.xpath('//head/title') or tree.xpath('//title[not(ancestor::svg)]')
    separators = [' - ', ' | ', ' – ', ' — ', ': ']

    for title in titles:
        raw_text = (title.text or '').strip()
        if not raw_text:
            continue

        # Try to extract clean org name (before separator)
        clean_name = raw_text
        for sep in separators:
            if sep in raw_text:
                # Usually the org name is first or last; the first part is used.
                clean_name = raw_text.split(sep)[0].strip()
                break
        if not clean_name:
            clean_name = raw_text

        claims.append(create_claim(
            claim_type='org_name',
            claim_value=clean_name,
            xpath=get_xpath_lxml(title),
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            raw_value=raw_text,
            extraction_method='title_tag',
        ))
    return claims


def _extract_meta_content(
    tree,
    xpath_expr: str,
    claim_type: str,
    extraction_method: str,
    html_file: str,
    source_url: str,
    retrieved_on: str,
) -> List[Dict]:
    """Build one claim per <meta> element matching *xpath_expr* with non-blank content.

    The value and the XPath are read from the same element, so the recorded
    provenance always points at the element that supplied the value.
    """
    claims = []
    for element in tree.xpath(xpath_expr):
        content = (element.get('content') or '').strip()
        if content:
            claims.append(create_claim(
                claim_type=claim_type,
                claim_value=content,
                xpath=get_xpath_lxml(element),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method=extraction_method,
            ))
    return claims


def extract_meta_description(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract description from meta tags."""
    # Standard meta description
    claims = _extract_meta_content(
        tree, '//meta[@name="description"]', 'description_short',
        'meta_description', html_file, source_url, retrieved_on,
    )
    # OpenGraph description
    claims.extend(_extract_meta_content(
        tree, '//meta[@property="og:description"]', 'description_short',
        'og_description', html_file, source_url, retrieved_on,
    ))
    return claims


def extract_og_site_name(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract organization name from og:site_name."""
    return _extract_meta_content(
        tree, '//meta[@property="og:site_name"]', 'org_name',
        'og_site_name', html_file, source_url, retrieved_on,
    )


def _iter_schema_nodes(data: Any) -> Iterator[dict]:
    """Yield every JSON-LD object in *data*, descending into lists and @graph."""
    if isinstance(data, list):
        for entry in data:
            yield from _iter_schema_nodes(entry)
    elif isinstance(data, dict):
        yield data
        graph = data.get('@graph')
        if isinstance(graph, (list, dict)):
            yield from _iter_schema_nodes(graph)


def extract_schema_org(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract data from schema.org JSON-LD.

    Every <script type="application/ld+json"> is parsed independently; a
    script with invalid JSON is reported and skipped. All claims from one
    script share that script element's XPath.
    """
    claims = []
    for script in tree.xpath('//script[@type="application/ld+json"]'):
        if not script.text or not script.text.strip():
            continue
        try:
            data = json.loads(script.text)
        except json.JSONDecodeError as e:
            print(f" Warning: Invalid JSON-LD in {html_file}: {e}")
            continue
        script_xpath = get_xpath_lxml(script)
        for node in _iter_schema_nodes(data):
            claims.extend(_extract_schema_item(node, script_xpath, html_file, source_url, retrieved_on))
    return claims


def _extract_schema_item(item: dict, xpath: str, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract claims from a schema.org item.

    Name and description are only taken from items whose @type looks like an
    organization (or is missing) and not like an event. Address, telephone,
    email and sameAs are taken from any item. Values that are not text are
    skipped instead of raising.
    """
    if not isinstance(item, dict):
        return []

    claims = []

    def add(claim_type: str, value: str, method: str) -> None:
        claims.append(create_claim(
            claim_type=claim_type,
            claim_value=value,
            xpath=xpath,
            html_file=html_file,
            source_url=source_url,
            retrieved_on=retrieved_on,
            extraction_method=method,
        ))

    # Get the @type(s) to distinguish organizations from events
    raw_type = item.get('@type', '')
    type_names = [
        name for name in (raw_type if isinstance(raw_type, list) else [raw_type])
        if isinstance(name, str) and name
    ]
    is_org = not type_names or any(
        fragment in name for name in type_names for fragment in _SCHEMA_ORG_TYPES
    )
    is_event = any(
        fragment in name for name in type_names for fragment in _SCHEMA_EVENT_TYPES
    )

    if is_org and not is_event:
        # Organization name - skip if it looks like HTML/code
        name_value = _as_text(item.get('name'))
        if name_value and '<' not in name_value and len(name_value) < 200:
            add('org_name', name_value, 'schema_org_name')

        # Description - skip HTML and page-builder shortcodes
        desc_value = _as_text(item.get('description'))
        if desc_value and '<' not in desc_value and 'vc_row' not in desc_value:
            add('description', desc_value, 'schema_org_description')

    # Address: plain string or PostalAddress object (or a list of either)
    addresses = item.get('address')
    for addr in (addresses if isinstance(addresses, list) else [addresses]):
        if isinstance(addr, dict):
            for key, claim_type, method in _ADDRESS_FIELDS:
                field_value = _as_text(addr.get(key))
                if field_value:
                    add(claim_type, field_value, method)
        else:
            addr_text = _as_text(addr)
            if addr_text:
                add('address', addr_text, 'schema_org_address')

    # Phone
    for phone in _text_values(item.get('telephone')):
        add('phone', phone, 'schema_org_telephone')

    # Email
    for email in _text_values(item.get('email')):
        add('email', email, 'schema_org_email')

    # Social media
    for url in _text_values(item.get('sameAs')):
        social_type = _social_claim_type(url)
        if social_type:
            add(social_type, url, 'schema_org_sameAs')

    return claims


def extract_email_links(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract email addresses from mailto: links."""
    claims = []
    for link in tree.xpath('//a[starts-with(@href, "mailto:")]'):
        href = link.get('href', '')
        if not href.startswith('mailto:'):
            continue
        # Drop query params (?subject=...), then decode %40 and friends.
        recipients = unquote(href[7:].split('?')[0])
        for email in recipients.split(','):
            email = email.strip()
            if email and '@' in email:
                claims.append(create_claim(
                    claim_type='email',
                    claim_value=email,
                    xpath=get_xpath_lxml(link),
                    html_file=html_file,
                    source_url=source_url,
                    retrieved_on=retrieved_on,
                    extraction_method='mailto_link',
                ))
    return claims


def extract_phone_links(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract phone numbers from tel: links."""
    claims = []
    for link in tree.xpath('//a[starts-with(@href, "tel:")]'):
        href = link.get('href', '')
        if not href.startswith('tel:'):
            continue
        phone = unquote(href[4:]).strip()
        if phone:
            claims.append(create_claim(
                claim_type='phone',
                claim_value=phone,
                xpath=get_xpath_lxml(link),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='tel_link',
            ))
    return claims


def extract_social_links(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract social media links.

    A link is classified by the hostname of its href; see SOCIAL_DOMAINS.
    """
    claims = []
    for link in tree.xpath('//a[@href]'):
        href = link.get('href', '')
        claim_type = _social_claim_type(href)
        if claim_type:
            claims.append(create_claim(
                claim_type=claim_type,
                claim_value=href,
                xpath=get_xpath_lxml(link),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                extraction_method='social_link',
            ))
    return claims


def extract_h1_org_name(tree, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract organization name from first h1."""
    claims = []
    h1s = tree.xpath('//h1')
    if h1s:
        h1 = h1s[0]
        raw_text = ''.join(h1.itertext()).strip()
        # Collapse the newlines/indentation nested markup leaves inside headings.
        text = ' '.join(raw_text.split())
        if 2 < len(text) < 150:
            claims.append(create_claim(
                claim_type='org_name',
                claim_value=text,
                xpath=get_xpath_lxml(h1),
                html_file=html_file,
                source_url=source_url,
                retrieved_on=retrieved_on,
                raw_value=raw_text,
                extraction_method='h1_tag',
                xpath_match_score=0.9,  # Slightly lower confidence
            ))
    return claims


def _parse_html(html_content: str):
    """Parse *html_content* with lxml and return the root element (or None)."""
    try:
        return etree.HTML(html_content)
    except ValueError:
        # lxml refuses str input that carries an XML encoding declaration
        # (typical for XHTML); hand it bytes with an explicit encoding instead.
        parser = etree.HTMLParser(encoding='utf-8')
        return etree.HTML(html_content.encode('utf-8'), parser=parser)


def extract_all_claims(html_content: str, html_file: str, source_url: str, retrieved_on: str) -> List[Dict]:
    """Extract all claims from HTML content.

    Runs every extractor against the parsed document. A failing extractor is
    reported and skipped so the others still contribute. Returns an empty
    list when the document cannot be parsed or is empty.
    """
    claims = []

    try:
        # Parse with lxml for proper XPath support
        tree = _parse_html(html_content)
    except Exception as e:
        print(f" Error parsing HTML: {e}")
        return claims

    if tree is None:
        # lxml returns None for empty/unparseable input.
        return claims

    # Run all extractors
    extractors = [
        extract_title_claims,
        extract_meta_description,
        extract_og_site_name,
        extract_schema_org,
        extract_email_links,
        extract_phone_links,
        extract_social_links,
        extract_h1_org_name,
    ]

    for extractor in extractors:
        try:
            claims.extend(extractor(tree, html_file, source_url, retrieved_on))
        except Exception as e:
            print(f" Warning: Extractor {extractor.__name__} failed: {e}")

    return claims


def deduplicate_claims(claims: List[Dict]) -> List[Dict]:
    """Remove duplicate claims, keeping highest confidence.

    Claims are duplicates when claim_type and claim_value are equal. Among
    duplicates the one with the highest xpath_match_score wins (the earliest
    on ties); output order follows first appearance of each key.
    """
    best: Dict[tuple, Dict] = {}
    for claim in claims:
        key = (claim['claim_type'], claim['claim_value'])
        current = best.get(key)
        if current is None or claim['xpath_match_score'] > current['xpath_match_score']:
            best[key] = claim
    return list(best.values())


def _first_web_archive(entry_data: dict) -> dict:
    """Return the first web_enrichment.web_archives record, or {} if absent."""
    web_enrichment = entry_data.get('web_enrichment')
    if not isinstance(web_enrichment, dict):
        return {}
    web_archives = web_enrichment.get('web_archives')
    if isinstance(web_archives, list) and web_archives and isinstance(web_archives[0], dict):
        return web_archives[0]
    return {}


def get_web_archive_path(entry_data: dict, entry_num: str) -> Optional[Path]:
    """Get the web archive directory path for an entry.

    Uses the 'directory' of the first recorded web archive when present,
    otherwise the first (by name) subdirectory of web/{entry_num}/.
    """
    directory = _first_web_archive(entry_data).get('directory')
    if directory:
        return ENTRIES_DIR / directory

    # Fallback: look for directory in web/{entry_num}/
    entry_web_dir = WEB_DIR / entry_num
    if entry_web_dir.is_dir():
        # Sorted so the choice does not depend on filesystem listing order.
        subdirs = sorted(d for d in entry_web_dir.iterdir() if d.is_dir())
        if subdirs:
            return subdirs[0]

    return None


def load_metadata(archive_path: Path) -> Optional[dict]:
    """Load metadata.yaml from archive directory.

    Returns None when the file is missing, unreadable, or not a mapping.
    """
    metadata_file = archive_path / 'metadata.yaml'
    if metadata_file.exists():
        try:
            with open(metadata_file, 'r', encoding='utf-8') as f:
                loaded = yaml.safe_load(f)
        except Exception as e:
            print(f" Warning: Failed to load {metadata_file}: {e}")
            return None
        if isinstance(loaded, dict):
            return loaded
    return None


def find_html_files(archive_path: Path) -> List[Path]:
    """Find all HTML files in archive directory.

    Looks in pages/ (top level only), mirror/ (recursively) and for
    rendered.html / index.html in the archive root. Each group is sorted.
    """
    html_files: List[Path] = []

    # Check pages/ directory first
    pages_dir = archive_path / 'pages'
    if pages_dir.exists():
        html_files.extend(sorted(pages_dir.glob('*.html')))

    # Check mirror/ directory
    mirror_dir = archive_path / 'mirror'
    if mirror_dir.exists():
        html_files.extend(sorted(mirror_dir.rglob('*.html')))

    # Check root for rendered.html and index.html (both are candidates for
    # the main page in process_entry).
    for root_name in ('rendered.html', 'index.html'):
        root_file = archive_path / root_name
        if root_file.exists():
            html_files.append(root_file)

    return html_files


def extract_entry_number(filename: str) -> str:
    """Extract entry number from filename."""
    match = re.match(r'^(\d+)', filename)
    return match.group(1) if match else filename.replace('.yaml', '')


def _select_main_html(html_files: List[Path], archive_path: Path) -> Path:
    """Pick the page to extract from, in priority order.

    1. rendered.html (Playwright-rendered, most complete)
    2. index.html from root (fallback)
    3. index.html from mirror/ (warc mirror), shallowest first
    4. First available HTML file
    """
    for root_name in ('rendered.html', 'index.html'):
        for candidate in html_files:
            if candidate.name == root_name and candidate.parent == archive_path:
                return candidate

    mirror_dir = archive_path / 'mirror'
    mirror_index = [
        f for f in html_files
        if f.name == 'index.html' and mirror_dir in f.parents
    ]
    if mirror_index:
        # The shallowest index.html is the mirrored site's landing page.
        return min(mirror_index, key=lambda p: (len(p.parts), str(p)))

    return html_files[0]


def process_entry(filepath: Path, dry_run: bool = False) -> tuple[int, List[str]]:
    """
    Process a single entry file to extract HTML claims.

    Loads the entry YAML, locates its web archive, extracts claims from the
    archive's main HTML page and, unless dry_run is set, writes them back
    to the entry under 'web_claims'.

    Returns: (claims_count, errors)
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    if not data:
        return 0, ["Empty file"]
    if not isinstance(data, dict):
        return 0, ["Entry is not a YAML mapping"]

    entry_num = extract_entry_number(filepath.name)
    errors: List[str] = []
    all_claims: List[Dict] = []

    # Get web archive path
    archive_path = get_web_archive_path(data, entry_num)
    if not archive_path or not archive_path.exists():
        return 0, [f"No web archive found for entry {entry_num}"]

    # Load metadata for timestamps
    metadata = load_metadata(archive_path) or {}
    source_url = metadata.get('url', '') or ''
    retrieved_on = metadata.get('archive_timestamp', '') or ''

    if not source_url:
        # Try to get URL from entry data
        source_url = _first_web_archive(data).get('url', '') or ''
    if not source_url:
        original_entry = data.get('original_entry')
        if isinstance(original_entry, dict):
            source_url = original_entry.get('webadres_organisatie', '') or ''

    # Find and process HTML files
    html_files = find_html_files(archive_path)
    if not html_files:
        return 0, [f"No HTML files found in {archive_path}"]

    main_html = _select_main_html(html_files, archive_path)

    try:
        with open(main_html, 'r', encoding='utf-8', errors='replace') as f:
            html_content = f.read()

        html_file_rel = str(main_html.relative_to(ENTRIES_DIR))
        all_claims.extend(extract_all_claims(html_content, html_file_rel, source_url, retrieved_on))
    except Exception as e:
        errors.append(f"Failed to process {main_html}: {e}")

    # Deduplicate claims
    all_claims = deduplicate_claims(all_claims)

    if not dry_run and all_claims:
        # Store claims in entry data
        data['web_claims'] = {
            'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            'source_archive': str(archive_path.relative_to(ENTRIES_DIR)),
            'claims_count': len(all_claims),
            'claims': all_claims,
        }

        # Write back
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    return len(all_claims), errors


def _has_web_claims(filepath: Path) -> bool:
    """Return True if the entry file already stores a non-empty claims list."""
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    if not isinstance(data, dict):
        return False
    web_claims = data.get('web_claims')
    return isinstance(web_claims, dict) and bool(web_claims.get('claims'))


def main():
    """Command-line entry point; returns the process exit code."""
    parser = argparse.ArgumentParser(description='Extract structured claims from archived HTML')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without writing')
    parser.add_argument('--force', action='store_true', help='Re-extract even if web_claims exists')
    args = parser.parse_args()

    if not HAS_DEPS:
        print("Error: Required dependencies not installed.")
        print("Run: pip install beautifulsoup4 lxml pyyaml")
        return 1

    # Find entry files
    if args.entry:
        files = sorted(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = sorted([f for f in ENTRIES_DIR.glob('*.yaml') if f.is_file() and not f.name.startswith('.')])

    if args.limit is not None:
        # "--limit 0" means zero entries, not "no limit".
        files = files[:max(args.limit, 0)]

    total_claims = 0
    total_entries = 0
    total_skipped = 0
    total_failed = 0

    print(f"Processing {len(files)} entries...")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE'}")
    print()

    for filepath in files:
        if filepath.is_dir():
            continue

        try:
            # Skip if already has web_claims (unless --force)
            if not args.force and _has_web_claims(filepath):
                total_skipped += 1
                continue

            claims_count, errors = process_entry(filepath, dry_run=args.dry_run)
        except (OSError, yaml.YAMLError) as e:
            # One unreadable/invalid entry must not abort the whole batch.
            total_failed += 1
            print(f" ✗ {filepath.name}: {e}")
            continue

        if claims_count > 0:
            total_entries += 1
            total_claims += claims_count
            print(f" ✓ {filepath.name}: {claims_count} claims")
        elif errors:
            total_failed += 1
            for e in errors:
                print(f" ✗ {filepath.name}: {e}")
        else:
            total_failed += 1
            print(f" ✗ {filepath.name}: No claims extracted")

    print()
    print(f"{'DRY RUN - ' if args.dry_run else ''}Summary:")
    print(f" Entries with claims: {total_entries}")
    print(f" Total claims extracted: {total_claims}")
    print(f" Skipped (already have claims): {total_skipped}")
    print(f" Failed (no archive/claims): {total_failed}")

    return 0


if __name__ == '__main__':
    sys.exit(main())