#!/usr/bin/env python3
"""
Fetch website content using Playwright and store HTML + markdown.

This script:
1. Uses Playwright to render pages (handles JavaScript)
2. Stores raw HTML, rendered HTML, and markdown
3. Enables XPath-based provenance tracking
4. Stores all content in data/nde/enriched/entries/web/{entry_number}/

Directory structure per entry:
    web/{entry_number}/
        {domain}/
            index.html      # Raw HTML as received
            rendered.html   # HTML after JS execution
            content.md      # Markdown conversion
            metadata.yaml   # Fetch metadata and XPath mappings
            screenshot.png  # Optional page screenshot

Usage:
    python scripts/fetch_website_playwright.py [--limit N] [--entry ENTRY_NUM] [--screenshot]
"""

import argparse
import hashlib
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse

import yaml

try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
    from bs4 import BeautifulSoup
    from markdownify import markdownify as md
    HAS_DEPS = True
except ImportError as e:
    HAS_DEPS = False
    print(f"Warning: Missing dependency: {e}")
    # The guarded imports above include bs4, so the hint must list it too.
    print("Install with: pip install playwright beautifulsoup4 markdownify")
    print("Then run: playwright install chromium")

# Directories
# NOTE(review): hard-coded absolute path — only works on this machine; consider
# deriving from an environment variable or a path relative to this script.
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')
WEB_DIR = ENTRIES_DIR / 'web'


def sanitize_dirname(url: str) -> str:
    """Create a safe directory name from a URL.

    Uses the URL's host (minus a leading 'www.') and replaces any character
    that is not a word character, hyphen, or dot with an underscore.
    """
    parsed = urlparse(url)
    # Use domain as directory name
    name = parsed.netloc.replace('www.', '')
    # Sanitize
    name = re.sub(r'[^\w\-.]', '_', name)
    return name


def clean_html_for_markdown(html: str) -> str:
    """Clean HTML before markdown conversion.

    Strips scripts, chrome (nav/header/footer/aside), form controls,
    ad/tracking/cookie-consent elements, and inline-hidden elements so the
    markdown output contains only the page's main textual content.
    """
    soup = BeautifulSoup(html, 'html.parser')

    # Remove unwanted elements
    for element in soup.find_all(['script', 'style', 'nav', 'footer', 'header',
                                  'aside', 'form', 'iframe', 'noscript', 'svg',
                                  'button', 'input', 'select', 'textarea',
                                  'meta', 'link']):
        element.decompose()

    # Remove elements with common ad/tracking/cookie classes
    for element in soup.find_all(class_=lambda x: x and any(
        term in str(x).lower() for term in
        ['cookie', 'gdpr', 'consent', 'tracking', 'analytics',
         'advertisement', 'popup', 'modal', 'banner', 'newsletter']
    )):
        element.decompose()

    # Remove hidden elements (inline style only; stylesheet-hidden elements
    # are not detected here)
    for element in soup.find_all(style=lambda x: x and 'display:none' in x.replace(' ', '')):
        element.decompose()

    return str(soup)


def extract_text_with_xpaths(soup: BeautifulSoup) -> list[dict]:
    """
    Extract text content with XPath locations for provenance.

    Returns list of {text, xpath, tag, classes} (contact-info hits also carry
    a 'type' key).
    """
    extractions = []

    def get_xpath(element) -> str:
        """Generate a positional XPath for an element.

        BUG FIX: the walk previously continued into BeautifulSoup's synthetic
        root (whose .name is '[document]'), producing invalid XPaths like
        '/[document][1]/html[1]/...'. Stop before that node so paths start at
        the real top-level element.
        """
        parts = []
        while element and element.name and element.name != '[document]':
            # 1-based position among same-tag siblings, XPath style.
            siblings = element.find_previous_siblings(element.name)
            index = len(siblings) + 1
            parts.insert(0, f"{element.name}[{index}]")
            element = element.parent
        return '/' + '/'.join(parts) if parts else '/'

    # Extract headings
    for tag in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
        for elem in soup.find_all(tag):
            text = elem.get_text(strip=True)
            if text and len(text) > 2:
                extractions.append({
                    'text': text,
                    'xpath': get_xpath(elem),
                    'tag': tag,
                    'classes': list(elem.get('class', []))  # Convert to plain list
                })

    # Extract paragraphs with substantial content
    for elem in soup.find_all('p'):
        text = elem.get_text(strip=True)
        if text and len(text) > 20:
            extractions.append({
                'text': text[:500],  # Limit length
                'xpath': get_xpath(elem),
                'tag': 'p',
                'classes': list(elem.get('class', []))
            })

    # Extract list items
    for elem in soup.find_all('li'):
        text = elem.get_text(strip=True)
        if text and len(text) > 10:
            extractions.append({
                'text': text[:200],
                'xpath': get_xpath(elem),
                'tag': 'li',
                'classes': list(elem.get('class', []))
            })

    # Extract address/contact info
    for elem in soup.find_all(['address', 'span', 'div'], class_=lambda x: x and any(
        t in str(x).lower() for t in ['address', 'contact', 'phone', 'email', 'location']
    )):
        text = elem.get_text(strip=True)
        if text and len(text) > 5:
            extractions.append({
                'text': text[:300],
                'xpath': get_xpath(elem),
                'tag': elem.name,
                'classes': list(elem.get('class', [])),
                'type': 'contact_info'
            })

    return extractions


def fetch_with_playwright(url: str, take_screenshot: bool = False, timeout: int = 30000) -> dict:
    """
    Fetch URL using Playwright.

    Returns dict with:
    - raw_html: Original HTML
    - rendered_html: HTML after JS execution
    - markdown: Cleaned markdown
    - extractions: Text with XPaths
    - screenshot: PNG bytes (if requested)
    - error: Error message if failed
    """
    result = {
        'url': url,
        'fetch_timestamp': datetime.now(timezone.utc).isoformat(),
        'raw_html': None,
        'rendered_html': None,
        'markdown': None,
        'extractions': [],
        'screenshot': None,
        'error': None
    }

    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            context = browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                viewport={'width': 1920, 'height': 1080}
            )
            page = context.new_page()

            # Navigate to page
            response = page.goto(url, wait_until='networkidle', timeout=timeout)

            if not response or response.status >= 400:
                result['error'] = f"HTTP {response.status if response else 'No response'}"
                browser.close()
                return result

            # Get raw HTML (before full JS execution)
            # Note: This is still after initial load, but before networkidle
            raw_html = page.content()
            result['raw_html'] = raw_html

            # Wait a bit more for dynamic content
            page.wait_for_timeout(2000)

            # Get rendered HTML (after JS execution)
            rendered_html = page.content()
            result['rendered_html'] = rendered_html

            # Take screenshot if requested
            if take_screenshot:
                result['screenshot'] = page.screenshot(full_page=True)

            # Parse for extractions
            soup = BeautifulSoup(rendered_html, 'html.parser')
            result['extractions'] = extract_text_with_xpaths(soup)

            # Convert to markdown
            cleaned = clean_html_for_markdown(rendered_html)
            markdown = md(
                cleaned,
                heading_style='atx',
                bullets='-'
            )

            # Clean up excessive whitespace
            markdown = re.sub(r'\n{3,}', '\n\n', markdown)
            result['markdown'] = markdown.strip()

            browser.close()

    except PlaywrightTimeout:
        result['error'] = f"Timeout loading {url}"
    except Exception as e:
        result['error'] = f"Error: {str(e)}"

    return result


def get_urls_from_entry(data: dict) -> list[str]:
    """Extract all source URLs from an entry.

    Looks in web_enrichment (source_url, raw_sources[].url) and in
    original_entry.webadres_organisatie; only http(s) URLs are returned.
    """
    urls = set()

    # Check web_enrichment
    if 'web_enrichment' in data:
        we = data['web_enrichment']
        if we.get('source_url'):
            urls.add(we['source_url'])
        for source in we.get('raw_sources', []):
            if source.get('url'):
                urls.add(source['url'])

    # Check original_entry for website
    if 'original_entry' in data:
        oe = data['original_entry']
        if oe.get('webadres_organisatie'):
            urls.add(oe['webadres_organisatie'])

    return [u for u in urls if u.startswith('http')]


def extract_entry_number(filename: str) -> str:
    """Extract the leading entry number from a filename.

    Falls back to the filename without its '.yaml' suffix when it does not
    start with digits.
    """
    match = re.match(r'^(\d+)', filename)
    return match.group(1) if match else filename.replace('.yaml', '')


def process_entry(filepath: Path, take_screenshot: bool = False) -> tuple[int, int, list[str]]:
    """Process a single entry file.

    Fetches every URL referenced by the entry, archives the results under
    WEB_DIR/{entry_number}/{domain}/, and writes the archive references back
    into the entry YAML. Returns (fetched, failed, error_messages).
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    if not data:
        return 0, 0, ["Empty file"]

    urls = get_urls_from_entry(data)
    if not urls:
        return 0, 0, []

    entry_num = extract_entry_number(filepath.name)
    fetched = 0
    failed = 0
    errors = []
    web_files = []

    for url in urls:
        dirname = sanitize_dirname(url)
        url_dir = WEB_DIR / entry_num / dirname

        # Check if already fetched
        if (url_dir / 'rendered.html').exists():
            web_files.append({
                'url': url,
                'directory': str(url_dir.relative_to(ENTRIES_DIR))
            })
            continue

        print(f" Fetching: {url}")
        result = fetch_with_playwright(url, take_screenshot=take_screenshot)

        if result['error']:
            errors.append(result['error'])
            failed += 1
            continue

        if not result['rendered_html']:
            errors.append(f"No content from {url}")
            failed += 1
            continue

        # Create directory and save files
        url_dir.mkdir(parents=True, exist_ok=True)

        # Save raw HTML
        with open(url_dir / 'index.html', 'w', encoding='utf-8') as f:
            f.write(result['raw_html'])

        # Save rendered HTML
        with open(url_dir / 'rendered.html', 'w', encoding='utf-8') as f:
            f.write(result['rendered_html'])

        # Save markdown with metadata header
        md_header = f"""---
source_url: {url}
fetch_timestamp: {result['fetch_timestamp']}
entry_file: {filepath.name}
---

"""
        with open(url_dir / 'content.md', 'w', encoding='utf-8') as f:
            f.write(md_header + (result['markdown'] or ''))

        # Save metadata with XPath extractions
        metadata = {
            'url': url,
            'fetch_timestamp': result['fetch_timestamp'],
            'entry_file': filepath.name,
            'files': {
                'raw_html': 'index.html',
                'rendered_html': 'rendered.html',
                'markdown': 'content.md'
            },
            'extractions': result['extractions'][:100]  # Limit to first 100
        }

        # Save screenshot if taken
        if result['screenshot']:
            with open(url_dir / 'screenshot.png', 'wb') as f:
                f.write(result['screenshot'])
            metadata['files']['screenshot'] = 'screenshot.png'

        with open(url_dir / 'metadata.yaml', 'w', encoding='utf-8') as f:
            yaml.dump(metadata, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

        web_files.append({
            'url': url,
            'directory': str(url_dir.relative_to(ENTRIES_DIR))
        })
        fetched += 1

        # Rate limiting
        time.sleep(2)

    # Update YAML with web file references
    if web_files:
        if 'web_enrichment' not in data:
            data['web_enrichment'] = {}
        data['web_enrichment']['web_archives'] = web_files
        data['web_enrichment']['web_archive_timestamp'] = datetime.now(timezone.utc).isoformat()

        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    return fetched, failed, errors


def main():
    parser = argparse.ArgumentParser(description='Fetch website content using Playwright')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--screenshot', action='store_true', help='Take screenshots')
    # BUG FIX: this was action='store_true' with default=True, which made the
    # flag a no-op (it could never be turned off). BooleanOptionalAction keeps
    # the existing '--skip-existing' spelling and default while also accepting
    # '--no-skip-existing' to disable skipping.
    parser.add_argument('--skip-existing', action=argparse.BooleanOptionalAction, default=True,
                        help='Skip entries that already have web archives')
    args = parser.parse_args()

    if not HAS_DEPS:
        print("Error: Required dependencies not installed.")
        print("Run: pip install playwright beautifulsoup4 markdownify && playwright install chromium")
        return 1

    # Find entry files
    if args.entry:
        files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = sorted([f for f in ENTRIES_DIR.glob('*.yaml') if f.name != 'web'])

    if args.limit:
        files = files[:args.limit]

    total_fetched = 0
    total_failed = 0
    total_skipped = 0
    entries_processed = 0

    for filepath in files:
        # Skip web directory
        if filepath.is_dir():
            continue

        # Skip if already has web archives
        if args.skip_existing:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if data and data.get('web_enrichment', {}).get('web_archives'):
                total_skipped += 1
                continue

        print(f"Processing: {filepath.name}")
        fetched, failed, errors = process_entry(filepath, take_screenshot=args.screenshot)

        if fetched or failed:
            entries_processed += 1
        total_fetched += fetched
        total_failed += failed

        for e in errors:
            print(f" Error: {e}")

    print(f"\nSummary:")
    print(f" Entries processed: {entries_processed}")
    print(f" Entries skipped (already archived): {total_skipped}")
    print(f" URLs fetched: {total_fetched}")
    print(f" URLs failed: {total_failed}")

    return 0 if total_failed == 0 else 1


if __name__ == '__main__':
    sys.exit(main())