glam/scripts/batch_firecrawl_recrawl.py
kempersc 0c36429257 feat(scripts): Add batch crawling and data quality scripts
- batch_crawl4ai_recrawl.py: Retry failed URL crawls
- batch_firecrawl_recrawl.py: FireCrawl batch processing
- batch_httpx_scrape.py: HTTPX-based scraping
- detect_name_mismatch.py: Find name mismatches in data
- enrich_dutch_custodians_crawl4ai.py: Dutch custodian enrichment
- fix_collision_victims.py: GHCID collision resolution
- fix_generic_platform_names*.py: Platform name cleanup
- fix_ghcid_type.py: GHCID type corrections
- fix_simon_kemper_contamination.py: Data cleanup
- scan_dutch_data_quality.py: Data quality scanning
- transform_crawl4ai_to_digital_platform.py: Data transformation
2025-12-15 01:47:46 +01:00

434 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Batch re-crawl failed URLs using Firecrawl and transform to digital_platform_v2.
This script:
1. Reads the list of failed crawl URLs
2. Uses Firecrawl batch_scrape or individual scrape to fetch content
3. Transforms results to digital_platform_v2 format
4. Updates the custodian YAML files
Usage:
python scripts/batch_firecrawl_recrawl.py --batch-size 50 --start 0
Firecrawl API reference: https://docs.firecrawl.dev/api-reference/endpoint/scrape
"""
import argparse
import json
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
import httpx
import yaml
# --- Configuration -----------------------------------------------------------
# NOTE(review): absolute, machine-specific paths — this assumes one exact
# checkout location; consider CLI flags or env vars for portability.
CUSTODIAN_DIR = Path("/Users/kempersc/apps/glam/data/custodian")
FAILED_URLS_FILE = Path("/Users/kempersc/apps/glam/data/failed_crawl_urls.txt")
# API key comes from the environment; main() exits early when it is empty.
FIRECRAWL_API_KEY = os.environ.get("FIRECRAWL_API_KEY", "")
FIRECRAWL_BASE_URL = "https://api.firecrawl.dev/v1"

# Platform type detection patterns.
# Regex fragments (Dutch + English URL path pieces) checked by
# detect_platform_type() against the page URL and its extracted links.
# Dict order matters: the FIRST type with any matching pattern wins, so the
# broader categories sit above the INSTITUTIONAL_WEBSITE fallback.
PLATFORM_PATTERNS = {
    'DISCOVERY_PORTAL': [
        r'/collectie', r'/collection', r'/catalogus', r'/catalog',
        r'/zoeken', r'/search', r'/archief', r'/archive',
        r'/beeldbank', r'/images', r'/foto', r'/photo',
    ],
    'DIGITAL_ARCHIVE': [
        r'archieven\.nl', r'archief', r'archive',
        r'/inventaris', r'/inventory', r'/toegang',
    ],
    'EDUCATION': [
        r'/educatie', r'/education', r'/onderwijs', r'/leren',
        r'/scholen', r'/schools', r'/lesmateriaal',
    ],
    'INSTITUTIONAL_WEBSITE': [
        r'/over-ons', r'/about', r'/contact', r'/bezoek',
        r'/visit', r'/openingstijden', r'/hours',
    ],
}
def detect_platform_type(url: str, links: list[str] | None = None) -> str:
"""Detect the platform type based on URL patterns and extracted links."""
url_lower = url.lower()
all_urls = [url_lower] + [l.lower() for l in (links or [])]
for platform_type, patterns in PLATFORM_PATTERNS.items():
for pattern in patterns:
for check_url in all_urls:
if re.search(pattern, check_url):
return platform_type
return 'INSTITUTIONAL_WEBSITE'
def _same_site(host_a: str, host_b: str) -> bool:
    """True when the hosts are equal or one is a subdomain of the other."""
    a = host_a.lower().lstrip('.')
    b = host_b.lower().lstrip('.')
    return a == b or a.endswith('.' + b) or b.endswith('.' + a)


def extract_collection_urls(links: list[str], base_url: str) -> list[str]:
    """Extract URLs that appear to be collection/catalog pages.

    Only same-site links are considered: links on *base_url*'s host or a
    sub/parent domain of it, plus relative links (empty netloc).  Returns at
    most 10 URLs, deduplicated, in encounter order.
    """
    collection_patterns = [
        r'/collectie', r'/collection', r'/catalogus', r'/catalog',
        r'/zoeken', r'/search', r'/beeldbank', r'/inventaris',
        r'/archief(?!en\.)', r'/archiefstukken', r'/toegangen',
    ]
    collection_urls: list[str] = []
    base_domain = urlparse(base_url).netloc
    for link in links:
        try:
            netloc = urlparse(link).netloc
            # BUGFIX: the previous substring test ("a in b or b in a")
            # wrongly accepted unrelated hosts such as "notexample.nl" for
            # base "example.nl".  Compare whole host labels instead.
            # Relative links (empty netloc) are still kept, as before.
            if netloc and not _same_site(netloc, base_domain):
                continue
            for pattern in collection_patterns:
                if re.search(pattern, link.lower()):
                    if link not in collection_urls:
                        collection_urls.append(link)
                    break
        except Exception:
            # Best-effort: malformed links are silently skipped.
            continue
    return collection_urls[:10]  # Limit to 10 collection URLs
def extract_auxiliary_platforms(links: list[str], base_url: str) -> list[dict]:
    """Extract external platform links (aggregators, portals, etc.).

    Scans *links* for known Dutch heritage aggregator/portal domains and
    returns up to 5 entries in auxiliary-platform dict form.  Relative links
    and links on the same site as *base_url* are skipped; each external
    domain is reported at most once (first link wins).
    """
    external_patterns = {
        'archieven.nl': {'name': 'Archieven.nl', 'type': 'AGGREGATOR'},
        'europeana.eu': {'name': 'Europeana', 'type': 'AGGREGATOR'},
        'collectienederland.nl': {'name': 'Collectie Nederland', 'type': 'AGGREGATOR'},
        'erfgoedthesaurus.nl': {'name': 'Erfgoedthesaurus', 'type': 'THESAURUS'},
        'delpher.nl': {'name': 'Delpher', 'type': 'DIGITAL_ARCHIVE'},
        'geheugen.nl': {'name': 'Geheugen van Nederland', 'type': 'AGGREGATOR'},
        'archiefweb.eu': {'name': 'Archiefweb', 'type': 'DIGITAL_ARCHIVE'},
    }
    # BUGFIX: str.replace('www.', '') stripped "www." anywhere in the host;
    # removeprefix only trims it from the front.
    base_domain = urlparse(base_url).netloc.lower().removeprefix('www.')
    auxiliary: list[dict] = []
    seen_domains: set[str] = set()
    for link in links:
        try:
            domain = urlparse(link).netloc.lower().removeprefix('www.')
            # Skip relative links and anything on the crawled site itself.
            # BUGFIX: the old substring test ("a in b or b in a") also
            # skipped unrelated hosts that merely contained the base domain;
            # require an exact or sub/parent-domain relationship.
            if (not domain or domain == base_domain
                    or domain.endswith('.' + base_domain)
                    or base_domain.endswith('.' + domain)):
                continue
            for pattern, info in external_patterns.items():
                if pattern in domain and domain not in seen_domains:
                    seen_domains.add(domain)
                    auxiliary.append({
                        'platform_name': info['name'],
                        'platform_url': link,
                        'platform_type': info['type'],
                        'integration_type': 'external_aggregator',
                    })
                    break
        except Exception:
            # Best-effort: malformed links are silently skipped.
            continue
    return auxiliary[:5]  # Limit to 5 auxiliary platforms
def is_generic_title(title: str) -> bool:
    """Return True when *title* is too generic to serve as a platform name.

    A title counts as generic when it is empty, shorter than 3 characters,
    or (case-insensitively, after stripping) equal to a known placeholder
    such as "home" or "welkom" — optionally trailed by " -" or " | ...".
    """
    if not title:
        return True
    generic_names = (
        'home', 'homepage', 'welkom', 'welcome', 'startpagina',
        'index', 'main', 'website', 'webpagina', 'web page',
    )
    normalized = title.lower().strip()
    for name in generic_names:
        if normalized in (name, f"{name} -") or normalized.startswith(f"{name} |"):
            return True
    # Very short titles are unusable even when not a known placeholder.
    return len(title) < 3
def transform_to_platform_v2(scrape_result: dict, source_url: str, org_name: str) -> dict[str, Any]:
    """Transform a Firecrawl scrape result into digital_platform_v2 format.

    Args:
        scrape_result: ``data`` payload from the Firecrawl scrape endpoint;
            the ``metadata`` and ``links`` keys are read (both optional).
        source_url: URL that was scraped; drives the platform id, platform
            type detection and same-site link filtering.
        org_name: Fallback platform title when no usable page title exists.

    Returns:
        Dict with ``transformation_metadata`` and ``primary_platform``
        sections, plus ``collection_urls`` / ``auxiliary_platforms`` /
        ``navigation_links`` when any were found.
    """
    metadata = scrape_result.get('metadata', {})
    links = scrape_result.get('links', [])
    # (The 'markdown' field was previously read here but never used; dropped.)

    # Candidate titles in preference order; the first non-generic one wins.
    candidate_titles = [
        metadata.get('ogTitle'),
        metadata.get('title', '').split(' - ')[0].strip(),
        metadata.get('title', '').split(' | ')[0].strip(),
        metadata.get('og:title'),
        metadata.get('ogSiteName'),
        metadata.get('og:site_name'),
    ]
    title = org_name  # Default fallback
    for candidate in candidate_titles:
        if candidate and not is_generic_title(candidate):
            title = candidate
            break

    # Platform id derived from the host, e.g. "primary_website_example_nl".
    domain = urlparse(source_url).netloc.replace('www.', '').replace('.', '_')
    platform_id = f"primary_website_{domain}"

    platform_type = detect_platform_type(source_url, links)
    collection_urls = extract_collection_urls(links, source_url)
    auxiliary_platforms = extract_auxiliary_platforms(links, source_url)

    platform_v2 = {
        'transformation_metadata': {
            'transformed_from': 'firecrawl_scrape',
            'transformation_date': datetime.now(timezone.utc).isoformat(),
            'transformation_version': '2.0',
            'source_status_code': metadata.get('statusCode', 200),
        },
        'primary_platform': {
            'platform_id': platform_id,
            # Append " Website" unless the title already mentions it.
            'platform_name': f"{title} Website" if 'website' not in title.lower() else title,
            'platform_url': source_url,
            'platform_type': platform_type,
            'description': metadata.get('description') or metadata.get('ogDescription', ''),
            'language': metadata.get('language', 'nl'),
            'og_image': metadata.get('ogImage') or metadata.get('og:image'),
            'favicon': metadata.get('favicon'),
        },
    }
    if collection_urls:
        platform_v2['primary_platform']['collection_urls'] = collection_urls
    if auxiliary_platforms:
        platform_v2['auxiliary_platforms'] = auxiliary_platforms

    # Sample of up to 20 same-site navigation links (relative links kept).
    base_netloc = urlparse(source_url).netloc  # hoisted: was re-parsed per link
    internal_links = []
    for link in links:
        netloc = urlparse(link).netloc
        # BUGFIX: the old substring test (netloc in base_netloc) matched
        # unrelated hosts whose name happened to be contained in the base
        # host; require an exact or sub/parent-domain relationship instead.
        if (not netloc or netloc == base_netloc
                or netloc.endswith('.' + base_netloc)
                or base_netloc.endswith('.' + netloc)):
            internal_links.append(link)
            if len(internal_links) == 20:
                break
    if internal_links:
        platform_v2['navigation_links'] = internal_links
    return platform_v2
def scrape_single_url(url: str, client: httpx.Client, max_retries: int = 3) -> dict | None:
    """Scrape one URL through the Firecrawl API.

    Retries up to *max_retries* times: HTTP 429 triggers a linear backoff
    (15s, 30s, 45s) and a transport/parse error a 5s pause.  Returns the
    scrape payload dict on success, otherwise None.
    """
    request_body = {
        'url': url,
        'formats': ['markdown', 'links'],
        'onlyMainContent': True,
    }
    attempt = 0
    while attempt < max_retries:
        try:
            response = client.post(
                f"{FIRECRAWL_BASE_URL}/scrape",
                json=request_body,
                timeout=60.0,
            )
            status = response.status_code
            if status == 200:
                payload = response.json()
                if payload.get('success'):
                    return payload.get('data', {})
            if status == 429:
                pause = 15 * (attempt + 1)  # 15s, 30s, 45s
                print(f" Rate limited, waiting {pause}s (attempt {attempt + 1}/{max_retries})")
                time.sleep(pause)
                attempt += 1
                continue
            # Any other status (including 200 without success) is terminal.
            print(f" Error {status}: {response.text[:200]}")
            return None
        except Exception as e:
            print(f" Exception: {e}")
            if attempt < max_retries - 1:
                time.sleep(5)
                attempt += 1
                continue
            return None
    print(" Max retries exceeded")
    return None
def update_custodian_file(filepath: Path, platform_v2: dict) -> bool:
    """Merge digital_platform_v2 data into a custodian YAML file.

    Loads the YAML document, attaches the ``digital_platform_v2`` section,
    stamps an existing ``crawl4ai_enrichment`` section with the recrawl tool
    and timestamp, then writes the file back.  Returns True on success;
    prints the error and returns False on any failure.
    """
    try:
        with open(filepath, 'r') as source:
            document = yaml.safe_load(source)
        if document is None:
            document = {}
        document['digital_platform_v2'] = platform_v2
        # Record that the earlier crawl4ai attempt was superseded.
        if 'crawl4ai_enrichment' in document:
            document['crawl4ai_enrichment']['recrawled_with'] = 'firecrawl'
            document['crawl4ai_enrichment']['recrawl_date'] = datetime.now(timezone.utc).isoformat()
        with open(filepath, 'w') as sink:
            yaml.dump(document, sink, default_flow_style=False, allow_unicode=True, sort_keys=False)
        return True
    except Exception as e:
        print(f" Error updating {filepath}: {e}")
        return False
def load_failed_urls() -> list[tuple[str, str]]:
"""Load the list of failed URLs with their file paths."""
urls = []
with open(FAILED_URLS_FILE, 'r') as f:
for line in f:
line = line.strip()
if '\t' in line:
filename, url = line.split('\t', 1)
urls.append((filename, url))
return urls
def get_org_name(filepath: Path) -> str:
    """Best-effort extraction of an organization name from a custodian file.

    Tries, in order: ``original_entry.organisatie``, ``custodian_name``
    (``emic_name`` then ``preferred_name``), then a top-level ``name`` field.
    When no field is usable, falls back to the last '-'-separated segment of
    the file stem; on any read/parse error the full stem is returned.
    """
    try:
        with open(filepath, 'r') as f:
            data = yaml.safe_load(f)
        if data:
            if 'original_entry' in data and data['original_entry'].get('organisatie'):
                return data['original_entry']['organisatie']
            if 'custodian_name' in data:
                names = data['custodian_name']
                return names.get('emic_name', '') or names.get('preferred_name', '')
            if 'name' in data:
                return data['name']
        # No usable name field: derive one from the filename.
        segments = filepath.stem.split('-')
        return segments[-1] if segments else filepath.stem
    except Exception:
        return filepath.stem
def main():
    """CLI entry point: re-crawl failed URLs and update custodian files.

    Reads (filename, url) pairs from FAILED_URLS_FILE, scrapes each URL via
    Firecrawl, transforms the result to digital_platform_v2 and writes it
    into the matching custodian YAML file.  Exits with status 1 when
    FIRECRAWL_API_KEY is not set.
    """
    parser = argparse.ArgumentParser(description='Batch re-crawl failed URLs with Firecrawl')
    # NOTE(review): --batch-size is parsed but never used below; kept for
    # CLI compatibility — consider wiring it up or removing it.
    parser.add_argument('--batch-size', type=int, default=50, help='Number of URLs per batch')
    parser.add_argument('--start', type=int, default=0, help='Starting index')
    parser.add_argument('--limit', type=int, default=0, help='Maximum URLs to process (0=all)')
    parser.add_argument('--dry-run', action='store_true', help='Print what would be done without making changes')
    parser.add_argument('--delay', type=float, default=6.0, help='Delay between requests in seconds (default 6 for rate limits)')
    args = parser.parse_args()

    if not FIRECRAWL_API_KEY:
        print("Error: FIRECRAWL_API_KEY environment variable not set")
        sys.exit(1)

    # Load URLs and select the slice to work on.
    all_urls = load_failed_urls()
    print(f"Loaded {len(all_urls)} failed URLs")
    if args.limit > 0:
        urls_to_process = all_urls[args.start:args.start + args.limit]
    else:
        urls_to_process = all_urls[args.start:]
    print(f"Processing {len(urls_to_process)} URLs (start={args.start}, limit={args.limit or 'all'})")

    if args.dry_run:
        print("\n[DRY RUN MODE - No changes will be made]")
        for filename, url in urls_to_process[:10]:
            # BUGFIX: previously printed a literal placeholder instead of
            # the filename the loop unpacked.
            print(f" Would scrape: {filename} -> {url}")
        # BUGFIX: only report a remainder when there actually is one
        # (previously printed negative counts for short lists).
        if len(urls_to_process) > 10:
            print(f" ... and {len(urls_to_process) - 10} more")
        return

    # One client for the whole run; auth header applies to every request.
    client = httpx.Client(
        headers={
            'Authorization': f'Bearer {FIRECRAWL_API_KEY}',
            'Content-Type': 'application/json',
        }
    )
    success_count = 0
    fail_count = 0
    try:
        for i, (filename, url) in enumerate(urls_to_process):
            filepath = CUSTODIAN_DIR / filename
            # BUGFIX: previously printed a literal placeholder instead of
            # the filename.
            print(f"\n[{i+1}/{len(urls_to_process)}] {filename}")
            print(f" URL: {url}")
            if not filepath.exists():
                print(" SKIP: File not found")
                continue
            # Skip files already enriched by an earlier run.
            with open(filepath, 'r') as f:
                content = f.read()
            if 'digital_platform_v2:' in content:
                print(" SKIP: Already has digital_platform_v2")
                continue
            org_name = get_org_name(filepath)
            result = scrape_single_url(url, client)
            if result:
                platform_v2 = transform_to_platform_v2(result, url, org_name)
                if update_custodian_file(filepath, platform_v2):
                    success_count += 1
                    print(f" SUCCESS: {platform_v2['primary_platform']['platform_name']}")
                else:
                    fail_count += 1
            else:
                fail_count += 1
                print(" FAILED: Could not scrape URL")
            # Fixed pause between requests to stay under API rate limits.
            time.sleep(args.delay)
            # Progress update every 50 URLs.
            if (i + 1) % 50 == 0:
                print(f"\n=== Progress: {i+1}/{len(urls_to_process)} (success={success_count}, fail={fail_count}) ===\n")
    finally:
        client.close()

    print("\n=== Final Results ===")
    print(f"Success: {success_count}")
    print(f"Failed: {fail_count}")
    print(f"Total: {len(urls_to_process)}")


if __name__ == '__main__':
    main()