# glam/scripts/fetch_website_playwright.py
#!/usr/bin/env python3
"""
Fetch website content using Playwright and store HTML + markdown.
This script:
1. Uses Playwright to render pages (handles JavaScript)
2. Stores raw HTML, rendered HTML, and markdown
3. Enables XPath-based provenance tracking
4. Stores all content in data/nde/enriched/entries/web/{entry_number}/
Directory structure per entry:
web/{entry_number}/
{domain}/
index.html # Raw HTML as received
rendered.html # HTML after JS execution
content.md # Markdown conversion
metadata.yaml # Fetch metadata and XPath mappings
screenshot.png # Optional page screenshot
Usage:
python scripts/fetch_website_playwright.py [--limit N] [--entry ENTRY_NUM] [--screenshot]
"""
import argparse
import hashlib
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
import yaml
# Optional third-party dependencies. When any are missing the script still
# imports cleanly; main() checks HAS_DEPS and exits with instructions.
try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout
    from bs4 import BeautifulSoup
    from markdownify import markdownify as md
    HAS_DEPS = True
except ImportError as e:
    HAS_DEPS = False
    print(f"Warning: Missing dependency: {e}")
    # BUGFIX: the original install hint omitted beautifulsoup4, which
    # provides the 'bs4' import above.
    print("Install with: pip install playwright beautifulsoup4 markdownify")
    print("Then run: playwright install chromium")
# Directories
# NOTE(review): absolute, user-specific path — presumably this should come
# from configuration or an environment variable; confirm before running on
# another machine.
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')
WEB_DIR = ENTRIES_DIR / 'web'  # per-entry archives live under web/{entry_number}/{domain}/
def sanitize_dirname(url: str) -> str:
    """Create a filesystem-safe directory name from a URL's domain.

    Takes the netloc of the URL, drops a leading ``www.`` prefix, and
    replaces every character outside ``[A-Za-z0-9_.-]`` with ``_``.

    Args:
        url: The URL to derive a directory name from.

    Returns:
        A sanitized directory name based on the domain.
    """
    parsed = urlparse(url)
    # BUGFIX: str.replace('www.', '') removed 'www.' *anywhere* in the
    # domain (e.g. 'sub.www.example.com' -> 'sub.example.com'); only a
    # leading prefix should be stripped.
    name = parsed.netloc.removeprefix('www.')
    # Sanitize remaining characters for safe use as a directory name.
    return re.sub(r'[^\w\-.]', '_', name)
def clean_html_for_markdown(html: str) -> str:
    """Strip page chrome, trackers and hidden markup from HTML.

    Removes structural/noise tags, elements whose class names suggest
    ads/consent dialogs/popups, and inline-hidden elements, returning
    the cleaned HTML as a string ready for markdown conversion.
    """
    soup = BeautifulSoup(html, 'html.parser')

    # Tags that never contribute readable content.
    noise_tags = ['script', 'style', 'nav', 'footer', 'header',
                  'aside', 'form', 'iframe', 'noscript', 'svg',
                  'button', 'input', 'select', 'textarea', 'meta',
                  'link']
    for node in soup.find_all(noise_tags):
        node.decompose()

    # Class names hinting at ad/tracking/cookie/consent widgets.
    junk_terms = ['cookie', 'gdpr', 'consent', 'tracking', 'analytics',
                  'advertisement', 'popup', 'modal', 'banner', 'newsletter']

    def _junk_class(classes):
        return classes and any(term in str(classes).lower() for term in junk_terms)

    for node in soup.find_all(class_=_junk_class):
        node.decompose()

    # Inline-hidden elements; spaces are stripped so 'display: none' matches too.
    def _hidden_style(style):
        return style and 'display:none' in style.replace(' ', '')

    for node in soup.find_all(style=_hidden_style):
        node.decompose()

    return str(soup)
def extract_text_with_xpaths(soup: BeautifulSoup) -> list[dict]:
    """
    Extract text content with XPath locations for provenance.

    Walks headings, substantial paragraphs, list items and likely
    contact-info elements, recording each snippet with a positional
    XPath so its location in the document can be recovered later.

    Args:
        soup: Parsed document to extract from.

    Returns:
        List of {text, xpath, tag, classes} dicts; contact-info entries
        additionally carry 'type': 'contact_info'.
    """
    extractions = []

    def get_xpath(element) -> str:
        """Generate a positional XPath (e.g. /html[1]/body[1]/p[3]) for an element."""
        parts = []
        while element and element.name:
            # BUGFIX: stop at the BeautifulSoup document root — its
            # pseudo-name '[document]' is not a real tag and previously
            # leaked into the path as '/[document][1]/...'.
            if element.name == '[document]':
                break
            # 1-based index among preceding siblings of the same tag name.
            siblings = element.find_previous_siblings(element.name)
            parts.insert(0, f"{element.name}[{len(siblings) + 1}]")
            element = element.parent
        return '/' + '/'.join(parts) if parts else '/'

    def _collect(elem, tag, min_len, max_len=None, extra=None):
        """Append one extraction if the element's stripped text exceeds min_len."""
        text = elem.get_text(strip=True)
        if text and len(text) > min_len:
            item = {
                'text': text if max_len is None else text[:max_len],
                'xpath': get_xpath(elem),
                'tag': tag,
                'classes': list(elem.get('class', []))  # plain list for YAML
            }
            if extra:
                item.update(extra)
            extractions.append(item)

    # Headings carry the page structure.
    for tag in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
        for elem in soup.find_all(tag):
            _collect(elem, tag, min_len=2)
    # Paragraphs with substantial content (truncated to 500 chars).
    for elem in soup.find_all('p'):
        _collect(elem, 'p', min_len=20, max_len=500)
    # List items (truncated to 200 chars).
    for elem in soup.find_all('li'):
        _collect(elem, 'li', min_len=10, max_len=200)
    # Address/contact info, identified by common class-name hints.
    for elem in soup.find_all(['address', 'span', 'div'],
                              class_=lambda x: x and any(
                                  t in str(x).lower() for t in
                                  ['address', 'contact', 'phone', 'email', 'location']
                              )):
        _collect(elem, elem.name, min_len=5, max_len=300,
                 extra={'type': 'contact_info'})
    return extractions
def fetch_with_playwright(url: str, take_screenshot: bool = False, timeout: int = 30000) -> dict:
    """
    Fetch URL using Playwright.

    Launches a headless Chromium, navigates to *url* waiting for network
    idle, then captures two DOM snapshots plus derived markdown and
    XPath-tagged text extractions.

    Args:
        url: Page to fetch.
        take_screenshot: When True, capture a full-page PNG.
        timeout: Navigation timeout in milliseconds (default 30s).

    Returns dict with:
    - raw_html: Original HTML
    - rendered_html: HTML after JS execution
    - markdown: Cleaned markdown
    - extractions: Text with XPaths
    - screenshot: PNG bytes (if requested)
    - error: Error message if failed
    """
    result = {
        'url': url,
        'fetch_timestamp': datetime.now(timezone.utc).isoformat(),
        'raw_html': None,
        'rendered_html': None,
        'markdown': None,
        'extractions': [],
        'screenshot': None,
        'error': None
    }
    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            # Desktop Chrome UA + common viewport to avoid trivial bot
            # blocks and mobile layouts.
            context = browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                viewport={'width': 1920, 'height': 1080}
            )
            page = context.new_page()
            # Navigate and wait until the network has been idle.
            response = page.goto(url, wait_until='networkidle', timeout=timeout)
            # Treat any 4xx/5xx (or a missing response) as a failed fetch.
            if not response or response.status >= 400:
                result['error'] = f"HTTP {response.status if response else 'No response'}"
                browser.close()
                return result
            # "Raw" HTML snapshot. NOTE(review): goto() above already waited
            # for 'networkidle', so this is captured *after* initial JS has
            # settled — it is not the pre-JS server response.
            raw_html = page.content()
            result['raw_html'] = raw_html
            # Give late-running scripts a further 2s to mutate the DOM.
            page.wait_for_timeout(2000)
            # Final rendered HTML (after JS execution).
            rendered_html = page.content()
            result['rendered_html'] = rendered_html
            # Take screenshot if requested
            if take_screenshot:
                result['screenshot'] = page.screenshot(full_page=True)
            # Parse the rendered DOM for provenance extractions.
            soup = BeautifulSoup(rendered_html, 'html.parser')
            result['extractions'] = extract_text_with_xpaths(soup)
            # Convert cleaned HTML to markdown ('#' headings, '-' bullets).
            cleaned = clean_html_for_markdown(rendered_html)
            markdown = md(
                cleaned,
                heading_style='atx',
                bullets='-'
            )
            # Collapse runs of 3+ newlines left by removed elements.
            markdown = re.sub(r'\n{3,}', '\n\n', markdown)
            result['markdown'] = markdown.strip()
            browser.close()
    except PlaywrightTimeout:
        result['error'] = f"Timeout loading {url}"
    except Exception as e:
        # Broad catch is deliberate: any fetch failure is recorded on the
        # result so a batch run can continue with the next URL.
        result['error'] = f"Error: {str(e)}"
    return result
def get_urls_from_entry(data: dict) -> list[str]:
    """Collect every distinct http(s) source URL referenced by an entry.

    Looks at web_enrichment.source_url, web_enrichment.raw_sources[*].url
    and original_entry.webadres_organisatie; only URLs starting with
    'http' are returned.
    """
    found = set()
    # URLs recorded during web enrichment.
    if 'web_enrichment' in data:
        enrichment = data['web_enrichment']
        if enrichment.get('source_url'):
            found.add(enrichment['source_url'])
        found.update(
            source['url']
            for source in enrichment.get('raw_sources', [])
            if source.get('url')
        )
    # Organisation website from the original entry.
    if 'original_entry' in data:
        original = data['original_entry']
        if original.get('webadres_organisatie'):
            found.add(original['webadres_organisatie'])
    # Drop anything that is not an http(s) URL.
    return [candidate for candidate in found if candidate.startswith('http')]
def extract_entry_number(filename: str) -> str:
    """Return the leading digit run of *filename*; fall back to the name
    with any '.yaml' removed when it does not start with digits."""
    leading_digits = re.match(r'^(\d+)', filename)
    if leading_digits:
        return leading_digits.group(1)
    return filename.replace('.yaml', '')
def process_entry(filepath: Path, take_screenshot: bool = False) -> tuple[int, int, list[str]]:
    """Process a single entry file.

    Fetches every source URL found in the entry's YAML, archives each
    result under WEB_DIR/{entry_number}/{domain}/, and writes the archive
    locations back into the entry YAML (web_enrichment.web_archives).

    Args:
        filepath: Path to the entry YAML file.
        take_screenshot: Passed through to fetch_with_playwright().

    Returns:
        (fetched, failed, errors): counts of newly fetched and failed
        URLs plus the collected error messages.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    if not data:
        return 0, 0, ["Empty file"]
    urls = get_urls_from_entry(data)
    if not urls:
        return 0, 0, []
    entry_num = extract_entry_number(filepath.name)
    fetched = 0
    failed = 0
    errors = []
    web_files = []  # archive records to write back into the entry YAML
    for url in urls:
        dirname = sanitize_dirname(url)
        url_dir = WEB_DIR / entry_num / dirname
        # Already fetched: record the existing archive and move on.
        if (url_dir / 'rendered.html').exists():
            web_files.append({
                'url': url,
                'directory': str(url_dir.relative_to(ENTRIES_DIR))
            })
            continue
        print(f" Fetching: {url}")
        result = fetch_with_playwright(url, take_screenshot=take_screenshot)
        if result['error']:
            errors.append(result['error'])
            failed += 1
            continue
        if not result['rendered_html']:
            errors.append(f"No content from {url}")
            failed += 1
            continue
        # Create directory and save files
        url_dir.mkdir(parents=True, exist_ok=True)
        # Save raw HTML
        with open(url_dir / 'index.html', 'w', encoding='utf-8') as f:
            f.write(result['raw_html'])
        # Save rendered HTML
        with open(url_dir / 'rendered.html', 'w', encoding='utf-8') as f:
            f.write(result['rendered_html'])
        # Save markdown with a YAML front-matter header recording provenance.
        md_header = f"""---
source_url: {url}
fetch_timestamp: {result['fetch_timestamp']}
entry_file: {filepath.name}
---
"""
        with open(url_dir / 'content.md', 'w', encoding='utf-8') as f:
            f.write(md_header + (result['markdown'] or ''))
        # Save metadata with XPath extractions
        metadata = {
            'url': url,
            'fetch_timestamp': result['fetch_timestamp'],
            'entry_file': filepath.name,
            'files': {
                'raw_html': 'index.html',
                'rendered_html': 'rendered.html',
                'markdown': 'content.md'
            },
            'extractions': result['extractions'][:100]  # Limit to first 100
        }
        # Save screenshot if taken
        if result['screenshot']:
            with open(url_dir / 'screenshot.png', 'wb') as f:
                f.write(result['screenshot'])
            metadata['files']['screenshot'] = 'screenshot.png'
        with open(url_dir / 'metadata.yaml', 'w', encoding='utf-8') as f:
            yaml.dump(metadata, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
        web_files.append({
            'url': url,
            'directory': str(url_dir.relative_to(ENTRIES_DIR))
        })
        fetched += 1
        # Rate limiting: pause between fetches to be polite to remote servers.
        time.sleep(2)
    # Write archive references back into the entry YAML so later runs
    # (and other tools) can discover them.
    if web_files:
        if 'web_enrichment' not in data:
            data['web_enrichment'] = {}
        data['web_enrichment']['web_archives'] = web_files
        data['web_enrichment']['web_archive_timestamp'] = datetime.now(timezone.utc).isoformat()
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
    return fetched, failed, errors
def main() -> int:
    """CLI entry point: archive website content for NDE entry files.

    Returns:
        0 when every attempted fetch succeeded; 1 when any fetch failed
        or required dependencies are missing.
    """
    parser = argparse.ArgumentParser(description='Fetch website content using Playwright')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--screenshot', action='store_true', help='Take screenshots')
    # BUGFIX: the original used action='store_true' with default=True, so the
    # flag was permanently on and could never be disabled. BooleanOptionalAction
    # keeps the same default but also generates --no-skip-existing.
    parser.add_argument('--skip-existing', action=argparse.BooleanOptionalAction, default=True,
                        help='Skip entries that already have web archives')
    args = parser.parse_args()
    if not HAS_DEPS:
        print("Error: Required dependencies not installed.")
        # beautifulsoup4 provides the 'bs4' import used by this script.
        print("Run: pip install playwright beautifulsoup4 markdownify && playwright install chromium")
        return 1
    # Find entry files
    if args.entry:
        files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = sorted([f for f in ENTRIES_DIR.glob('*.yaml') if f.name != 'web'])
    if args.limit:
        files = files[:args.limit]
    total_fetched = 0
    total_failed = 0
    total_skipped = 0
    entries_processed = 0
    for filepath in files:
        # Defensive: skip directories ('*.yaml' should not match any).
        if filepath.is_dir():
            continue
        # Skip entries that already record web archives in their YAML.
        if args.skip_existing:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if data and data.get('web_enrichment', {}).get('web_archives'):
                total_skipped += 1
                continue
        print(f"Processing: {filepath.name}")
        fetched, failed, errors = process_entry(filepath, take_screenshot=args.screenshot)
        if fetched or failed:
            entries_processed += 1
        total_fetched += fetched
        total_failed += failed
        for e in errors:
            print(f" Error: {e}")
    print(f"\nSummary:")
    print(f" Entries processed: {entries_processed}")
    print(f" Entries skipped (already archived): {total_skipped}")
    print(f" URLs fetched: {total_fetched}")
    print(f" URLs failed: {total_failed}")
    return 0 if total_failed == 0 else 1


if __name__ == '__main__':
    sys.exit(main())