glam/scripts/batch_crawl4ai_recrawl.py
kempersc 0c36429257 feat(scripts): Add batch crawling and data quality scripts
- batch_crawl4ai_recrawl.py: Retry failed URL crawls
- batch_firecrawl_recrawl.py: FireCrawl batch processing
- batch_httpx_scrape.py: HTTPX-based scraping
- detect_name_mismatch.py: Find name mismatches in data
- enrich_dutch_custodians_crawl4ai.py: Dutch custodian enrichment
- fix_collision_victims.py: GHCID collision resolution
- fix_generic_platform_names*.py: Platform name cleanup
- fix_ghcid_type.py: GHCID type corrections
- fix_simon_kemper_contamination.py: Data cleanup
- scan_dutch_data_quality.py: Data quality scanning
- transform_crawl4ai_to_digital_platform.py: Data transformation
2025-12-15 01:47:46 +01:00

371 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Batch re-crawl failed URLs using crawl4ai (free, local) and transform to digital_platform_v2.
This script:
1. Reads the list of failed crawl URLs
2. Uses crawl4ai to fetch content (free, no API limits)
3. Transforms results to digital_platform_v2 format
4. Updates the custodian YAML files
Usage:
python scripts/batch_crawl4ai_recrawl.py --limit 100 --start 0
"""
import argparse
import asyncio
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
import yaml
from crawl4ai import AsyncWebCrawler
# Configuration
CUSTODIAN_DIR = Path("/Users/kempersc/apps/glam/data/custodian")
FAILED_URLS_FILE = Path("/Users/kempersc/apps/glam/data/failed_crawl_urls.txt")
# Platform type detection patterns
PLATFORM_PATTERNS = {
'DISCOVERY_PORTAL': [
r'/collectie', r'/collection', r'/catalogus', r'/catalog',
r'/zoeken', r'/search', r'/archief', r'/archive',
r'/beeldbank', r'/images', r'/foto', r'/photo',
],
'DIGITAL_ARCHIVE': [
r'archieven\.nl', r'archief', r'archive',
r'/inventaris', r'/inventory', r'/toegang',
],
'EDUCATION': [
r'/educatie', r'/education', r'/onderwijs', r'/leren',
r'/scholen', r'/schools', r'/lesmateriaal',
],
'INSTITUTIONAL_WEBSITE': [
r'/over-ons', r'/about', r'/contact', r'/bezoek',
r'/visit', r'/openingstijden', r'/hours',
],
}
def detect_platform_type(url: str, links: list[str] | None = None) -> str:
"""Detect the platform type based on URL patterns and extracted links."""
url_lower = url.lower()
all_urls = [url_lower] + [l.lower() for l in (links or [])]
for platform_type, patterns in PLATFORM_PATTERNS.items():
for pattern in patterns:
for check_url in all_urls:
if re.search(pattern, check_url):
return platform_type
return 'INSTITUTIONAL_WEBSITE'
def extract_collection_urls(links: list[str], base_url: str) -> list[str]:
"""Extract URLs that appear to be collection/catalog pages."""
collection_patterns = [
r'/collectie', r'/collection', r'/catalogus', r'/catalog',
r'/zoeken', r'/search', r'/beeldbank', r'/inventaris',
r'/archief(?!en\.)', r'/archiefstukken', r'/toegangen',
]
collection_urls = []
base_domain = urlparse(base_url).netloc
for link in links:
try:
parsed = urlparse(link)
if base_domain in parsed.netloc or parsed.netloc in base_domain:
for pattern in collection_patterns:
if re.search(pattern, link.lower()):
if link not in collection_urls:
collection_urls.append(link)
break
except Exception:
continue
return collection_urls[:10]
def extract_auxiliary_platforms(links: list[str], base_url: str) -> list[dict]:
"""Extract external platform links (aggregators, portals, etc.)."""
external_patterns = {
'archieven.nl': {'name': 'Archieven.nl', 'type': 'AGGREGATOR'},
'europeana.eu': {'name': 'Europeana', 'type': 'AGGREGATOR'},
'collectienederland.nl': {'name': 'Collectie Nederland', 'type': 'AGGREGATOR'},
'erfgoedthesaurus.nl': {'name': 'Erfgoedthesaurus', 'type': 'THESAURUS'},
'delpher.nl': {'name': 'Delpher', 'type': 'DIGITAL_ARCHIVE'},
'geheugen.nl': {'name': 'Geheugen van Nederland', 'type': 'AGGREGATOR'},
}
base_domain = urlparse(base_url).netloc
auxiliary = []
seen_domains = set()
for link in links:
try:
parsed = urlparse(link)
domain = parsed.netloc.replace('www.', '')
if base_domain in domain or domain in base_domain:
continue
for pattern, info in external_patterns.items():
if pattern in domain and domain not in seen_domains:
seen_domains.add(domain)
auxiliary.append({
'platform_name': info['name'],
'platform_url': link,
'platform_type': info['type'],
'integration_type': 'external_aggregator',
})
break
except Exception:
continue
return auxiliary[:5]
def is_generic_title(title: str) -> bool:
"""Check if a title is too generic to use as platform name."""
generic_patterns = [
'home', 'homepage', 'welkom', 'welcome', 'startpagina',
'index', 'main', 'website', 'webpagina', 'homepagina',
]
if not title:
return True
title_lower = title.lower().strip()
for pattern in generic_patterns:
if title_lower == pattern or title_lower.startswith(f"{pattern} -") or title_lower.startswith(f"{pattern} |"):
return True
return len(title) < 3
def transform_to_platform_v2(crawl_result, source_url: str, org_name: str) -> dict[str, Any]:
"""Transform crawl4ai result to digital_platform_v2 format."""
metadata = crawl_result.metadata or {}
# Get internal links
internal_links = []
if crawl_result.links:
internal_links = [l.get('href', '') for l in crawl_result.links.get('internal', []) if l.get('href')]
# Extract title, checking for generic titles
candidate_titles = [
metadata.get('og:title'),
metadata.get('title', '').split(' - ')[0].strip(),
metadata.get('title', '').split(' | ')[0].strip(),
metadata.get('og:site_name'),
]
title = org_name # Default fallback
for candidate in candidate_titles:
if candidate and not is_generic_title(candidate):
title = candidate
break
# Generate platform ID
domain = urlparse(source_url).netloc.replace('www.', '').replace('.', '_')
platform_id = f"primary_website_{domain}"
# Detect platform type
platform_type = detect_platform_type(source_url, internal_links)
# Extract collection URLs
collection_urls = extract_collection_urls(internal_links, source_url)
# Extract auxiliary platforms
auxiliary_platforms = extract_auxiliary_platforms(internal_links, source_url)
# Build digital_platform_v2 structure
platform_v2: dict[str, Any] = {
'transformation_metadata': {
'transformed_from': 'crawl4ai_recrawl',
'transformation_date': datetime.now(timezone.utc).isoformat(),
'transformation_version': '2.0',
'source_status_code': crawl_result.status_code,
},
'primary_platform': {
'platform_id': platform_id,
'platform_name': f"{title} Website" if 'website' not in title.lower() else title,
'platform_url': source_url,
'platform_type': platform_type,
'description': metadata.get('description') or metadata.get('og:description', ''),
'language': metadata.get('language', 'nl'),
'og_image': metadata.get('og:image'),
'favicon': metadata.get('favicon'),
},
}
if collection_urls:
platform_v2['primary_platform']['collection_urls'] = collection_urls
if auxiliary_platforms:
platform_v2['auxiliary_platforms'] = auxiliary_platforms
if internal_links:
platform_v2['navigation_links'] = internal_links[:20]
return platform_v2
def update_custodian_file(filepath: Path, platform_v2: dict) -> bool:
"""Update a custodian YAML file with digital_platform_v2 data."""
try:
with open(filepath, 'r') as f:
data = yaml.safe_load(f)
if data is None:
data = {}
data['digital_platform_v2'] = platform_v2
if 'crawl4ai_enrichment' in data:
data['crawl4ai_enrichment']['recrawled_with'] = 'crawl4ai_v2'
data['crawl4ai_enrichment']['recrawl_date'] = datetime.now(timezone.utc).isoformat()
with open(filepath, 'w') as f:
yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
return True
except Exception as e:
print(f" Error updating {filepath}: {e}")
return False
def load_failed_urls() -> list[tuple[str, str]]:
"""Load the list of failed URLs with their file paths."""
urls = []
with open(FAILED_URLS_FILE, 'r') as f:
for line in f:
line = line.strip()
if '\t' in line:
filename, url = line.split('\t', 1)
urls.append((filename, url))
return urls
def get_org_name(filepath: Path) -> str:
"""Extract organization name from custodian file."""
try:
with open(filepath, 'r') as f:
data = yaml.safe_load(f)
if data:
if 'original_entry' in data and data['original_entry'].get('organisatie'):
return data['original_entry']['organisatie']
if 'custodian_name' in data:
return data['custodian_name'].get('emic_name', '') or data['custodian_name'].get('preferred_name', '')
if 'name' in data:
return data['name']
stem = filepath.stem
parts = stem.split('-')
return parts[-1] if parts else stem
except Exception:
return filepath.stem
async def scrape_single_url(crawler: AsyncWebCrawler, url: str) -> Any:
"""Scrape a single URL using crawl4ai."""
try:
result = await crawler.arun(url, verbose=False)
if result.success:
return result
print(f" Crawl failed: {result.error_message}")
return None
except Exception as e:
print(f" Exception: {e}")
return None
async def main_async(args):
"""Async main function."""
all_urls = load_failed_urls()
print(f"Loaded {len(all_urls)} failed URLs")
if args.limit > 0:
urls_to_process = all_urls[args.start:args.start + args.limit]
else:
urls_to_process = all_urls[args.start:]
print(f"Processing {len(urls_to_process)} URLs (start={args.start}, limit={args.limit or 'all'})")
if args.dry_run:
print("\n[DRY RUN MODE - No changes will be made]")
for filename, url in urls_to_process[:10]:
print(f" Would scrape: {filename} -> {url}")
print(f" ... and {len(urls_to_process) - 10} more")
return
success_count = 0
fail_count = 0
skip_count = 0
async with AsyncWebCrawler(verbose=False) as crawler:
for i, (filename, url) in enumerate(urls_to_process):
filepath = CUSTODIAN_DIR / filename
print(f"\n[{i+1}/{len(urls_to_process)}] {filename}")
print(f" URL: {url}")
if not filepath.exists():
print(f" SKIP: File not found")
skip_count += 1
continue
# Check if already has digital_platform_v2
with open(filepath, 'r') as f:
content = f.read()
if 'digital_platform_v2:' in content:
print(f" SKIP: Already has digital_platform_v2")
skip_count += 1
continue
org_name = get_org_name(filepath)
result = await scrape_single_url(crawler, url)
if result:
platform_v2 = transform_to_platform_v2(result, url, org_name)
if update_custodian_file(filepath, platform_v2):
success_count += 1
print(f" SUCCESS: {platform_v2['primary_platform']['platform_name']}")
else:
fail_count += 1
else:
fail_count += 1
print(f" FAILED: Could not scrape URL")
# Small delay to be polite
await asyncio.sleep(args.delay)
if (i + 1) % 50 == 0:
print(f"\n=== Progress: {i+1}/{len(urls_to_process)} (success={success_count}, skip={skip_count}, fail={fail_count}) ===\n")
print(f"\n=== Final Results ===")
print(f"Success: {success_count}")
print(f"Skipped: {skip_count}")
print(f"Failed: {fail_count}")
print(f"Total: {len(urls_to_process)}")
def main():
parser = argparse.ArgumentParser(description='Batch re-crawl failed URLs with crawl4ai')
parser.add_argument('--start', type=int, default=0, help='Starting index')
parser.add_argument('--limit', type=int, default=0, help='Maximum URLs to process (0=all)')
parser.add_argument('--dry-run', action='store_true', help='Print what would be done without making changes')
parser.add_argument('--delay', type=float, default=0.5, help='Delay between requests in seconds')
args = parser.parse_args()
asyncio.run(main_async(args))
if __name__ == '__main__':
main()