#!/usr/bin/env python3
"""
Batch re-crawl failed URLs using Firecrawl and transform to digital_platform_v2.

This script:
1. Reads the list of failed crawl URLs
2. Scrapes each URL individually through the Firecrawl ``/scrape`` endpoint
3. Transforms results to digital_platform_v2 format
4. Updates the custodian YAML files

Usage:
    python scripts/batch_firecrawl_recrawl.py --batch-size 50 --start 0

Firecrawl API reference: https://docs.firecrawl.dev/api-reference/endpoint/scrape
"""

import argparse
import json
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

import httpx
import yaml

# Configuration
CUSTODIAN_DIR = Path("/Users/kempersc/apps/glam/data/custodian")
FAILED_URLS_FILE = Path("/Users/kempersc/apps/glam/data/failed_crawl_urls.txt")
FIRECRAWL_API_KEY = os.environ.get("FIRECRAWL_API_KEY", "")
FIRECRAWL_BASE_URL = "https://api.firecrawl.dev/v1"

# Platform type detection patterns.
# Dict order is significant: the first platform type with a matching pattern wins.
PLATFORM_PATTERNS = {
    'DISCOVERY_PORTAL': [
        r'/collectie', r'/collection', r'/catalogus', r'/catalog',
        r'/zoeken', r'/search', r'/archief', r'/archive',
        r'/beeldbank', r'/images', r'/foto', r'/photo',
    ],
    'DIGITAL_ARCHIVE': [
        r'archieven\.nl', r'archief', r'archive',
        r'/inventaris', r'/inventory', r'/toegang',
    ],
    'EDUCATION': [
        r'/educatie', r'/education', r'/onderwijs', r'/leren',
        r'/scholen', r'/schools', r'/lesmateriaal',
    ],
    'INSTITUTIONAL_WEBSITE': [
        r'/over-ons', r'/about', r'/contact', r'/bezoek',
        r'/visit', r'/openingstijden', r'/hours',
    ],
}

# A link counts as a collection/catalog page when any of these matches it.
_COLLECTION_URL_RE = re.compile('|'.join([
    r'/collectie', r'/collection', r'/catalogus', r'/catalog',
    r'/zoeken', r'/search', r'/beeldbank', r'/inventaris',
    r'/archief(?!en\.)', r'/archiefstukken', r'/toegangen',
]))

# Known external platforms, keyed by registrable domain.
_EXTERNAL_PLATFORMS = {
    'archieven.nl': {'name': 'Archieven.nl', 'type': 'AGGREGATOR'},
    'europeana.eu': {'name': 'Europeana', 'type': 'AGGREGATOR'},
    'collectienederland.nl': {'name': 'Collectie Nederland', 'type': 'AGGREGATOR'},
    'erfgoedthesaurus.nl': {'name': 'Erfgoedthesaurus', 'type': 'THESAURUS'},
    'delpher.nl': {'name': 'Delpher', 'type': 'DIGITAL_ARCHIVE'},
    'geheugen.nl': {'name': 'Geheugen van Nederland', 'type': 'AGGREGATOR'},
    'archiefweb.eu': {'name': 'Archiefweb', 'type': 'DIGITAL_ARCHIVE'},
}

_GENERIC_TITLES = (
    'home', 'homepage', 'welkom', 'welcome', 'startpagina',
    'index', 'main', 'website', 'webpagina', 'web page',
)

_WEB_SCHEMES = ('', 'http', 'https')
_MAX_COLLECTION_URLS = 10
_MAX_AUXILIARY_PLATFORMS = 5
_MAX_NAVIGATION_LINKS = 20


def _link_host(link: Any) -> str | None:
    """Return the host of a web link without a leading ``www.``.

    Returns ``''`` for relative links (no host) and ``None`` when *link* is not
    a string, cannot be parsed, or uses a non-web scheme such as ``mailto:``.
    """
    if not isinstance(link, str):
        return None
    try:
        parsed = urlparse(link.strip())
        host = parsed.hostname or ''
    except ValueError:
        # urlparse rejects e.g. malformed IPv6 literals.
        return None
    if parsed.scheme not in _WEB_SCHEMES:
        return None
    return host.removeprefix('www.')


def _is_same_site(host: str, base_host: str) -> bool:
    """Return True when *host* equals *base_host* or one is a subdomain of the other.

    An empty *host* (relative link) is treated as belonging to the base site.
    Comparison is done on whole DNS labels so ``notexample.com`` does not
    match ``example.com``.
    """
    if not host:
        return True
    if not base_host:
        return False
    return (
        host == base_host
        or host.endswith('.' + base_host)
        or base_host.endswith('.' + host)
    )


def detect_platform_type(url: str, links: list[str] | None = None) -> str:
    """Detect the platform type based on URL patterns and extracted links.

    Every pattern in PLATFORM_PATTERNS is tried, in declaration order, against
    the page URL and all extracted links; the first platform type with a hit
    is returned. Falls back to 'INSTITUTIONAL_WEBSITE' when nothing matches.
    """
    candidates = [url.lower()]
    candidates.extend(link.lower() for link in links or [] if isinstance(link, str))

    for platform_type, patterns in PLATFORM_PATTERNS.items():
        for pattern in patterns:
            if any(re.search(pattern, candidate) for candidate in candidates):
                return platform_type

    return 'INSTITUTIONAL_WEBSITE'


def extract_collection_urls(links: list[str], base_url: str) -> list[str]:
    """Extract URLs that appear to be collection/catalog pages.

    Only links on the same site as *base_url* (same host, a subdomain, or a
    parent domain) are considered. The result preserves input order, contains
    no duplicates and is limited to 10 entries.
    """
    base_host = _link_host(base_url) or ''
    collection_urls: list[str] = []
    seen: set[str] = set()

    for link in links or []:
        host = _link_host(link)
        if host is None or link in seen:
            continue
        if not _is_same_site(host, base_host):
            continue
        if _COLLECTION_URL_RE.search(link.lower()):
            seen.add(link)
            collection_urls.append(link)
            if len(collection_urls) >= _MAX_COLLECTION_URLS:
                break

    return collection_urls


def extract_auxiliary_platforms(links: list[str], base_url: str) -> list[dict]:
    """Extract external platform links (aggregators, portals, etc.).

    A link qualifies when its host is (a subdomain of) one of the known
    external platforms and it is not on the same site as *base_url*. At most
    one entry per distinct host is returned, limited to 5 entries in total.
    """
    base_host = _link_host(base_url) or ''
    auxiliary: list[dict] = []
    seen_hosts: set[str] = set()

    for link in links or []:
        host = _link_host(link)
        # Skip unusable links, relative links, repeats and the site itself.
        if not host or host in seen_hosts or _is_same_site(host, base_host):
            continue

        for platform_domain, info in _EXTERNAL_PLATFORMS.items():
            if host == platform_domain or host.endswith('.' + platform_domain):
                seen_hosts.add(host)
                auxiliary.append({
                    'platform_name': info['name'],
                    'platform_url': link,
                    'platform_type': info['type'],
                    'integration_type': 'external_aggregator',
                })
                break

        if len(auxiliary) >= _MAX_AUXILIARY_PLATFORMS:
            break

    return auxiliary


def is_generic_title(title: str) -> bool:
    """Check if a title is too generic to use as platform name.

    Empty, non-string and very short (fewer than 3 characters after trimming)
    titles are generic, as are titles that consist of a stock word such as
    "home", optionally followed by " -" or starting with "<word> |".
    """
    if not title or not isinstance(title, str):
        return True

    title_lower = title.lower().strip()
    for pattern in _GENERIC_TITLES:
        if (
            title_lower == pattern
            or title_lower == f"{pattern} -"
            or title_lower.startswith(f"{pattern} |")
        ):
            return True

    return len(title_lower) < 3


def transform_to_platform_v2(scrape_result: dict, source_url: str, org_name: str) -> dict[str, Any]:
    """Transform Firecrawl scrape result to digital_platform_v2 format.

    Args:
        scrape_result: The ``data`` object of a Firecrawl scrape response; the
            ``metadata`` and ``links`` keys are read, both may be missing.
        source_url: The URL that was scraped.
        org_name: Organisation name used when the page offers no usable title.

    Returns:
        A dict with ``transformation_metadata`` and ``primary_platform`` and,
        when found, ``auxiliary_platforms`` and ``navigation_links``.
    """
    metadata = scrape_result.get('metadata') or {}
    # Defensive: keep only string links so the URL helpers never see odd values.
    links = [link for link in (scrape_result.get('links') or []) if isinstance(link, str)]

    raw_title = metadata.get('title')
    if not isinstance(raw_title, str):
        raw_title = ''

    # Extract title from metadata, checking for generic titles
    candidate_titles = [
        metadata.get('ogTitle'),
        raw_title.split(' - ')[0].strip(),
        raw_title.split(' | ')[0].strip(),
        metadata.get('og:title'),
        metadata.get('ogSiteName'),
        metadata.get('og:site_name'),
    ]

    source_netloc = urlparse(source_url).netloc
    source_host = _link_host(source_url) or ''

    # Default fallback: organisation name, or the site's host when that is blank.
    if isinstance(org_name, str) and org_name.strip():
        title = org_name
    else:
        title = source_host or source_netloc

    # Find first non-generic title
    for candidate in candidate_titles:
        if isinstance(candidate, str) and not is_generic_title(candidate):
            title = candidate
            break

    # Generate platform ID
    domain = source_netloc.removeprefix('www.').replace('.', '_')
    platform_id = f"primary_website_{domain}"

    platform_type = detect_platform_type(source_url, links)
    collection_urls = extract_collection_urls(links, source_url)
    auxiliary_platforms = extract_auxiliary_platforms(links, source_url)

    # Build digital_platform_v2 structure
    platform_v2: dict[str, Any] = {
        'transformation_metadata': {
            'transformed_from': 'firecrawl_scrape',
            'transformation_date': datetime.now(timezone.utc).isoformat(),
            'transformation_version': '2.0',
            'source_status_code': metadata.get('statusCode', 200),
        },
        'primary_platform': {
            'platform_id': platform_id,
            'platform_name': f"{title} Website" if 'website' not in title.lower() else title,
            'platform_url': source_url,
            'platform_type': platform_type,
            'description': metadata.get('description') or metadata.get('ogDescription', ''),
            'language': metadata.get('language', 'nl'),
            'og_image': metadata.get('ogImage') or metadata.get('og:image'),
            'favicon': metadata.get('favicon'),
        },
    }

    # Add collection URLs if found
    if collection_urls:
        platform_v2['primary_platform']['collection_urls'] = collection_urls

    # Add auxiliary platforms if found
    if auxiliary_platforms:
        platform_v2['auxiliary_platforms'] = auxiliary_platforms

    # Add internal navigation links (sample)
    internal_links: list[str] = []
    for link in links:
        host = _link_host(link)
        if host is not None and _is_same_site(host, source_host):
            internal_links.append(link)
            if len(internal_links) >= _MAX_NAVIGATION_LINKS:
                break
    if internal_links:
        platform_v2['navigation_links'] = internal_links

    return platform_v2


def scrape_single_url(url: str, client: httpx.Client, max_retries: int = 3) -> dict | None:
    """Scrape a single URL using Firecrawl API with retry on rate limit.

    Retries on HTTP 429 (waiting 15s, 30s, ... between attempts) and on
    transport/decoding errors (waiting 5s). Any other non-success response is
    reported and not retried.

    Returns:
        The ``data`` object of a successful response, otherwise ``None``.
    """
    for attempt in range(max_retries):
        is_last_attempt = attempt >= max_retries - 1
        try:
            response = client.post(
                f"{FIRECRAWL_BASE_URL}/scrape",
                json={
                    'url': url,
                    'formats': ['markdown', 'links'],
                    'onlyMainContent': True,
                },
                timeout=60.0,
            )
            if response.status_code == 200:
                data = response.json()
                if isinstance(data, dict) and data.get('success'):
                    return data.get('data', {})
        except (httpx.HTTPError, httpx.InvalidURL, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            print(f" Exception: {e}")
            if is_last_attempt:
                return None
            time.sleep(5)
            continue

        # Handle rate limiting (429)
        if response.status_code == 429:
            if is_last_attempt:
                # No point in waiting when there is no further attempt.
                break
            wait_time = 15 * (attempt + 1)  # 15s, 30s, 45s
            print(f" Rate limited, waiting {wait_time}s (attempt {attempt + 1}/{max_retries})")
            time.sleep(wait_time)
            continue

        print(f" Error {response.status_code}: {response.text[:200]}")
        return None

    print(" Max retries exceeded")
    return None


def update_custodian_file(filepath: Path, platform_v2: dict) -> bool:
    """Update a custodian YAML file with digital_platform_v2 data.

    Sets the ``digital_platform_v2`` key and, when a ``crawl4ai_enrichment``
    mapping exists, records that the entry was re-crawled with Firecrawl.

    Returns:
        True when the file was rewritten, False on any read/parse/write error.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)

        if data is None:
            data = {}
        if not isinstance(data, dict):
            print(f" Error updating {filepath}: top-level YAML value is not a mapping")
            return False

        # Add digital_platform_v2 section
        data['digital_platform_v2'] = platform_v2

        # Update crawl4ai_enrichment status
        enrichment = data.get('crawl4ai_enrichment')
        if isinstance(enrichment, dict):
            enrichment['recrawled_with'] = 'firecrawl'
            enrichment['recrawl_date'] = datetime.now(timezone.utc).isoformat()

        # Serialise before opening for writing so that a dump error cannot
        # leave a truncated file behind.
        serialized = yaml.dump(
            data, default_flow_style=False, allow_unicode=True, sort_keys=False
        )
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(serialized)

        return True
    except (OSError, ValueError, yaml.YAMLError) as e:
        print(f" Error updating {filepath}: {e}")
        return False


def load_failed_urls() -> list[tuple[str, str]]:
    """Load the list of failed URLs with their file paths.

    Reads FAILED_URLS_FILE, which holds one ``<filename>\\t<url>`` pair per
    line. Lines without a tab, or with an empty filename or URL, are ignored.
    """
    urls: list[tuple[str, str]] = []
    with open(FAILED_URLS_FILE, 'r', encoding='utf-8') as f:
        for line in f:
            filename, sep, url = line.strip().partition('\t')
            filename, url = filename.strip(), url.strip()
            if sep and filename and url:
                urls.append((filename, url))
    return urls


def get_org_name(filepath: Path) -> str:
    """Extract organization name from custodian file.

    Tries, in order: ``original_entry.organisatie``, ``custodian_name.emic_name``,
    ``custodian_name.preferred_name`` and ``name``. When none yields a
    non-empty value the last dash-separated part of the file name is used;
    when the file cannot be read or parsed the whole file stem is returned.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except (OSError, ValueError, yaml.YAMLError):
        return filepath.stem

    # Try different name fields
    if isinstance(data, dict):
        original_entry = data.get('original_entry')
        if isinstance(original_entry, dict) and original_entry.get('organisatie'):
            return str(original_entry['organisatie'])

        custodian_name = data.get('custodian_name')
        if isinstance(custodian_name, dict):
            name = custodian_name.get('emic_name') or custodian_name.get('preferred_name')
            if name:
                return str(name)

        if data.get('name'):
            return str(data['name'])

    # Fallback: extract from filename
    stem = filepath.stem
    return stem.split('-')[-1] or stem


def main():
    """Command-line entry point: re-crawl the failed URLs and update the YAML files."""
    parser = argparse.ArgumentParser(description='Batch re-crawl failed URLs with Firecrawl')
    parser.add_argument('--batch-size', type=int, default=50,
                        help='Number of URLs per batch (currently unused; URLs are scraped one at a time)')
    parser.add_argument('--start', type=int, default=0, help='Starting index')
    parser.add_argument('--limit', type=int, default=0, help='Maximum URLs to process (0=all)')
    parser.add_argument('--dry-run', action='store_true',
                        help='Print what would be done without making changes')
    parser.add_argument('--delay', type=float, default=6.0,
                        help='Delay between requests in seconds (default 6 for rate limits)')
    args = parser.parse_args()

    if args.start < 0:
        parser.error('--start must be >= 0')
    if args.delay < 0:
        parser.error('--delay must be >= 0')

    # Load URLs
    all_urls = load_failed_urls()
    print(f"Loaded {len(all_urls)} failed URLs")

    # Slice based on start and limit
    if args.limit > 0:
        urls_to_process = all_urls[args.start:args.start + args.limit]
    else:
        urls_to_process = all_urls[args.start:]

    total = len(urls_to_process)
    print(f"Processing {total} URLs (start={args.start}, limit={args.limit or 'all'})")

    if args.dry_run:
        print("\n[DRY RUN MODE - No changes will be made]")
        preview = urls_to_process[:10]
        for filename, url in preview:
            print(f" Would scrape: {filename} -> {url}")
        remaining = total - len(preview)
        if remaining > 0:
            print(f" ... and {remaining} more")
        return

    # The key is only needed for real requests, so dry runs work without it.
    if not FIRECRAWL_API_KEY:
        print("Error: FIRECRAWL_API_KEY environment variable not set")
        sys.exit(1)

    success_count = 0
    fail_count = 0
    skip_count = 0

    headers = {
        'Authorization': f'Bearer {FIRECRAWL_API_KEY}',
        'Content-Type': 'application/json',
    }
    with httpx.Client(headers=headers) as client:
        for i, (filename, url) in enumerate(urls_to_process):
            filepath = CUSTODIAN_DIR / filename
            print(f"\n[{i+1}/{total}] {filename}")
            print(f" URL: {url}")

            if not filepath.exists():
                print(" SKIP: File not found")
                skip_count += 1
                continue

            # Check if already has digital_platform_v2
            try:
                content = filepath.read_text(encoding='utf-8')
            except (OSError, ValueError) as e:
                print(f" FAILED: Could not read file: {e}")
                fail_count += 1
                continue
            if 'digital_platform_v2:' in content:
                print(" SKIP: Already has digital_platform_v2")
                skip_count += 1
                continue

            # Get org name for platform naming
            org_name = get_org_name(filepath)

            # Scrape URL
            result = scrape_single_url(url, client)

            if result:
                # Transform to platform_v2
                platform_v2 = transform_to_platform_v2(result, url, org_name)

                # Update file
                if update_custodian_file(filepath, platform_v2):
                    success_count += 1
                    print(f" SUCCESS: {platform_v2['primary_platform']['platform_name']}")
                else:
                    fail_count += 1
            else:
                fail_count += 1
                print(" FAILED: Could not scrape URL")

            # Rate limiting (nothing to wait for after the final URL)
            if i + 1 < total:
                time.sleep(args.delay)

            # Progress update every 50 URLs
            if (i + 1) % 50 == 0:
                print(f"\n=== Progress: {i+1}/{total} (success={success_count}, fail={fail_count}) ===\n")

    print("\n=== Final Results ===")
    print(f"Success: {success_count}")
    print(f"Failed: {fail_count}")
    print(f"Skipped: {skip_count}")
    print(f"Total: {total}")


if __name__ == '__main__':
    main()