# glam/scripts/fetch_website_markdown.py
# (viewer metadata: 295 lines, 9.3 KiB, Python)
#!/usr/bin/env python3
"""
Fetch website content and store as markdown files.
This script:
1. Reads YAML entry files to find source URLs
2. Fetches each URL and converts to markdown
3. Stores markdown in data/nde/enriched/entries/web/{entry_number}/
4. Updates the YAML file with reference to stored markdown
Usage:
python scripts/fetch_website_markdown.py [--dry-run] [--limit N] [--entry ENTRY_NUM]
"""
import argparse
import hashlib
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse
import yaml
try:
    import httpx
    from markdownify import markdownify as md
    HAS_DEPS = True
except ImportError:
    # Keep the module importable (e.g. for --dry-run) even when the
    # optional fetch dependencies are missing; callers check HAS_DEPS.
    HAS_DEPS = False
    print("Warning: httpx and/or markdownify not installed. Install with:")
    print(" pip install httpx markdownify")
# Directories
# NOTE(review): hardcoded absolute path — only valid on the original author's
# machine; consider deriving from Path(__file__) or an environment variable.
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')
WEB_DIR = ENTRIES_DIR / 'web'  # fetched markdown lives under web/{entry_number}/
def sanitize_filename(url: str) -> str:
    """Derive a filesystem-safe name from *url* (domain + path, sanitized)."""
    parts = urlparse(url)
    raw = parts.netloc + parts.path
    # Anything outside word characters, '-' and '.' becomes '_'
    safe = re.sub(r'[^\w\-.]', '_', raw)
    if len(safe) <= 100:
        return safe
    # Too long: keep a readable prefix plus a short digest for uniqueness
    digest = hashlib.md5(safe.encode()).hexdigest()[:16]
    return f"{safe[:50]}_{digest}"
def fetch_and_convert(url: str, timeout: int = 30) -> tuple[str | None, str | None]:
    """
    Fetch URL and convert its HTML body to markdown.

    Args:
        url: Absolute http(s) URL to fetch.
        timeout: Per-request timeout in seconds.

    Returns:
        Tuple of (markdown_content, error_message); exactly one is None.
    """
    if not HAS_DEPS:
        return None, "Dependencies not installed (httpx, markdownify)"
    try:
        # Imported lazily so the module loads without bs4 installed.
        # BUG FIX: previously a missing bs4 surfaced as a misleading generic
        # "Error fetching ..." message via the broad except below; it is now
        # reported as a distinct dependency error (see except ImportError).
        from bs4 import BeautifulSoup
        headers = {
            'User-Agent': 'Mozilla/5.0 (compatible; GLAMBot/1.0; heritage-data-collection)'
        }
        with httpx.Client(follow_redirects=True, timeout=timeout) as client:
            response = client.get(url, headers=headers)
            response.raise_for_status()
            content_type = response.headers.get('content-type', '')
            if 'text/html' not in content_type.lower():
                return None, f"Not HTML content: {content_type}"
            html = response.text
        # Pre-process HTML to remove script content that might leak through
        soup = BeautifulSoup(html, 'html.parser')
        # Remove unwanted structural/boilerplate elements
        for element in soup.find_all(['script', 'style', 'nav', 'footer', 'header',
                                      'aside', 'form', 'iframe', 'noscript', 'svg',
                                      'button', 'input', 'select', 'textarea']):
            element.decompose()
        # Remove elements with common ad/tracking/consent class names
        for element in soup.find_all(class_=lambda x: x and any(
            term in str(x).lower()
            for term in ['cookie', 'gdpr', 'consent', 'tracking', 'analytics', 'advertisement']
        )):
            element.decompose()
        # Convert cleaned HTML to markdown
        markdown = md(
            str(soup),
            heading_style='atx',
            bullets='-',
            strip=[]  # unwanted tags already removed above
        )
        # Collapse runs of 3+ newlines left behind by removed elements
        markdown = re.sub(r'\n{3,}', '\n\n', markdown)
        return markdown.strip(), None
    except ImportError:
        return None, "Dependencies not installed (beautifulsoup4)"
    except httpx.TimeoutException:
        return None, f"Timeout fetching {url}"
    except httpx.HTTPStatusError as e:
        return None, f"HTTP {e.response.status_code}: {url}"
    except Exception as e:
        return None, f"Error fetching {url}: {str(e)}"
def get_urls_from_entry(data: dict) -> list[str]:
    """Collect the distinct http(s) source URLs referenced by an entry.

    Looks at web_enrichment.source_url, web_enrichment.raw_sources[].url
    and original_entry.webadres_organisatie; non-http values are dropped.
    """
    collected = set()
    if 'web_enrichment' in data:
        enrichment = data['web_enrichment']
        source_url = enrichment.get('source_url')
        if source_url:
            collected.add(source_url)
        # Every raw source may carry its own URL
        collected.update(
            src['url'] for src in enrichment.get('raw_sources', []) if src.get('url')
        )
    if 'original_entry' in data:
        original = data['original_entry']
        website = original.get('webadres_organisatie')
        if website:
            collected.add(website)
    # Filter out non-http URLs (mailto:, ftp:, bare domains, ...)
    return [u for u in collected if u.startswith('http')]
def extract_entry_number(filename: str) -> str:
    """Extract the leading entry number from a filename.

    E.g. '0034_rolder_historisch_gezelschap.yaml' -> '0034'. When the name
    does not start with digits, fall back to the name with a trailing
    '.yaml' extension stripped.
    """
    match = re.match(r'^(\d+)', filename)
    if match:
        return match.group(1)
    # BUG FIX: str.replace('.yaml', '') removed the substring anywhere in the
    # name; removesuffix only strips a trailing extension.
    return filename.removesuffix('.yaml')
def process_entry(filepath: Path, dry_run: bool = False) -> tuple[int, int, list[str]]:
    """Fetch markdown for every source URL in one entry YAML file.

    Saves each page under WEB_DIR/{entry_number}/ and records the stored
    file references back into the entry's web_enrichment section.

    Returns:
        Tuple of (urls_fetched, urls_failed, error_messages).
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    if not data:
        return 0, 0, ["Empty file"]

    urls = get_urls_from_entry(data)
    if not urls:
        return 0, 0, []

    entry_web_dir = WEB_DIR / extract_entry_number(filepath.name)
    fetched, failed = 0, 0
    errors: list[str] = []
    markdown_files: list[str] = []

    for url in urls:
        md_path = entry_web_dir / (sanitize_filename(url) + '.md')
        # Fetched on a previous run: just record the existing reference.
        if md_path.exists():
            markdown_files.append(str(md_path.relative_to(ENTRIES_DIR)))
            continue
        if dry_run:
            print(f" Would fetch: {url}")
            fetched += 1
            continue
        markdown, error = fetch_and_convert(url)
        if error:
            errors.append(error)
            failed += 1
            continue
        if not markdown or len(markdown) < 100:
            errors.append(f"Empty or too short content from {url}")
            failed += 1
            continue
        entry_web_dir.mkdir(parents=True, exist_ok=True)
        # Provenance front-matter prepended to the stored markdown
        header = f"""---
source_url: {url}
fetch_timestamp: {datetime.now(timezone.utc).isoformat()}
entry_file: {filepath.name}
---
"""
        with open(md_path, 'w', encoding='utf-8') as f:
            f.write(header + markdown)
        markdown_files.append(str(md_path.relative_to(ENTRIES_DIR)))
        fetched += 1
        # Rate limiting between successful fetches
        time.sleep(1)

    # Record stored markdown references back into the YAML entry
    if markdown_files and not dry_run:
        data.setdefault('web_enrichment', {})
        data['web_enrichment']['markdown_files'] = markdown_files
        data['web_enrichment']['markdown_fetch_timestamp'] = datetime.now(timezone.utc).isoformat()
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    return fetched, failed, errors
def main():
    """CLI entry point: fetch markdown for entry YAML files.

    Returns a process exit code: 0 when no URL failed, 1 otherwise
    (or when required dependencies are missing outside --dry-run).
    """
    parser = argparse.ArgumentParser(description='Fetch website content and store as markdown')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries to process')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    # BUG FIX: action='store_true' with default=True made --skip-existing a
    # no-op (always True, impossible to disable). BooleanOptionalAction keeps
    # the default and --skip-existing spelling, and adds --no-skip-existing.
    parser.add_argument('--skip-existing', action=argparse.BooleanOptionalAction, default=True,
                        help='Skip entries that already have markdown files')
    args = parser.parse_args()

    if not HAS_DEPS and not args.dry_run:
        print("Error: Required dependencies not installed. Use --dry-run or install deps.")
        return 1

    # Find entry files
    if args.entry:
        files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = sorted(ENTRIES_DIR.glob('*.yaml'))
    if args.limit:
        files = files[:args.limit]

    total_fetched = 0
    total_failed = 0
    total_skipped = 0
    entries_processed = 0
    for filepath in files:
        # Skip entries that already reference fetched markdown
        if args.skip_existing:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if data and data.get('web_enrichment', {}).get('markdown_files'):
                total_skipped += 1
                continue
        print(f"Processing: {filepath.name}")
        fetched, failed, errors = process_entry(filepath, dry_run=args.dry_run)
        if fetched or failed:
            entries_processed += 1
        total_fetched += fetched
        total_failed += failed
        for e in errors:
            print(f" Error: {e}")

    print(f"\n{'DRY RUN ' if args.dry_run else ''}Summary:")
    print(f" Entries processed: {entries_processed}")
    print(f" Entries skipped (already have markdown): {total_skipped}")
    print(f" URLs fetched: {total_fetched}")
    print(f" URLs failed: {total_failed}")
    return 0 if total_failed == 0 else 1


if __name__ == '__main__':
    sys.exit(main())