- batch_crawl4ai_recrawl.py: Retry failed URL crawls - batch_firecrawl_recrawl.py: FireCrawl batch processing - batch_httpx_scrape.py: HTTPX-based scraping - detect_name_mismatch.py: Find name mismatches in data - enrich_dutch_custodians_crawl4ai.py: Dutch custodian enrichment - fix_collision_victims.py: GHCID collision resolution - fix_generic_platform_names*.py: Platform name cleanup - fix_ghcid_type.py: GHCID type corrections - fix_simon_kemper_contamination.py: Data cleanup - scan_dutch_data_quality.py: Data quality scanning - transform_crawl4ai_to_digital_platform.py: Data transformation
488 lines · 18 KiB · Python
#!/usr/bin/env python3
|
|
"""
|
|
Batch web scraper using httpx + BeautifulSoup for digital_platform_v2 enrichment.
|
|
|
|
This script:
|
|
1. Reads the list of failed crawl URLs
|
|
2. Uses httpx to fetch HTML content directly (no browser, no external API)
|
|
3. Uses BeautifulSoup to parse and extract metadata
|
|
4. Transforms results to digital_platform_v2 format
|
|
5. Updates the custodian YAML files
|
|
|
|
Usage:
|
|
python scripts/batch_httpx_scrape.py --limit 10
|
|
python scripts/batch_httpx_scrape.py --start 100 --limit 50
|
|
python scripts/batch_httpx_scrape.py --dry-run
|
|
|
|
No API keys or external services required!
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import re
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
import httpx
|
|
import yaml
|
|
from bs4 import BeautifulSoup
|
|
|
|
# Configuration
CUSTODIAN_DIR = Path("/Users/kempersc/apps/glam/data/custodian")  # directory of custodian YAML files
FAILED_URLS_FILE = Path("/Users/kempersc/apps/glam/data/failed_crawl_urls.txt")  # tab-separated "<filename>\t<url>" lines

# User agent to mimic a real browser (some sites reject default HTTP-client agents)
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)

# Platform type detection patterns.
# detect_platform_type() returns the FIRST type whose regexes match the page
# URL or any extracted link, so dict insertion order matters: more specific
# categories come before the INSTITUTIONAL_WEBSITE catch-all.
PLATFORM_PATTERNS: dict[str, list[str]] = {
    'DISCOVERY_PORTAL': [
        r'/collectie', r'/collection', r'/catalogus', r'/catalog',
        r'/zoeken', r'/search', r'/archief', r'/archive',
        r'/beeldbank', r'/images', r'/foto', r'/photo',
    ],
    'DIGITAL_ARCHIVE': [
        r'archieven\.nl', r'archief', r'archive',
        r'/inventaris', r'/inventory', r'/toegang',
    ],
    'EDUCATION': [
        r'/educatie', r'/education', r'/onderwijs', r'/leren',
        r'/scholen', r'/schools', r'/lesmateriaal',
    ],
    'INSTITUTIONAL_WEBSITE': [
        r'/over-ons', r'/about', r'/contact', r'/bezoek',
        r'/visit', r'/openingstijden', r'/hours',
    ],
}
|
|
|
|
|
|
def detect_platform_type(url: str, links: list[str] | None = None) -> str:
    """Classify the platform by matching URL regexes against the page URL and links.

    Iterates PLATFORM_PATTERNS in insertion order and returns the first
    platform type with any matching pattern; defaults to the generic
    'INSTITUTIONAL_WEBSITE' category when nothing matches.
    """
    candidates = [url.lower()]
    if links:
        candidates.extend(link.lower() for link in links)

    for ptype, patterns in PLATFORM_PATTERNS.items():
        if any(re.search(pat, cand) for pat in patterns for cand in candidates):
            return ptype

    # Nothing matched: assume a plain institutional website.
    return 'INSTITUTIONAL_WEBSITE'
|
|
|
|
|
|
def extract_collection_urls(links: list[str], base_url: str) -> list[str]:
    """Return up to 10 same-domain links that look like collection/catalog pages."""
    collection_patterns = [
        r'/collectie', r'/collection', r'/catalogus', r'/catalog',
        r'/zoeken', r'/search', r'/beeldbank', r'/inventaris',
        r'/archief(?!en\.)', r'/archiefstukken', r'/toegangen',
    ]

    base_domain = urlparse(base_url).netloc
    found: list[str] = []

    for candidate in links:
        try:
            netloc = urlparse(candidate).netloc
            # Bidirectional substring test tolerates www./subdomain variants.
            if base_domain not in netloc and netloc not in base_domain:
                continue
            lowered = candidate.lower()
            if any(re.search(pat, lowered) for pat in collection_patterns):
                if candidate not in found:  # preserve order, drop duplicates
                    found.append(candidate)
        except Exception:
            # Malformed URLs are simply skipped.
            continue

    return found[:10]
|
|
|
|
|
|
def extract_auxiliary_platforms(links: list[str], base_url: str) -> list[dict[str, str]]:
    """Return up to 5 entries for links pointing at known external heritage platforms."""
    known_platforms: dict[str, dict[str, str]] = {
        'archieven.nl': {'name': 'Archieven.nl', 'type': 'AGGREGATOR'},
        'europeana.eu': {'name': 'Europeana', 'type': 'AGGREGATOR'},
        'collectienederland.nl': {'name': 'Collectie Nederland', 'type': 'AGGREGATOR'},
        'erfgoedthesaurus.nl': {'name': 'Erfgoedthesaurus', 'type': 'THESAURUS'},
        'delpher.nl': {'name': 'Delpher', 'type': 'DIGITAL_ARCHIVE'},
        'geheugen.nl': {'name': 'Geheugen van Nederland', 'type': 'AGGREGATOR'},
        'archiefweb.eu': {'name': 'Archiefweb', 'type': 'DIGITAL_ARCHIVE'},
    }

    base_domain = urlparse(base_url).netloc
    results: list[dict[str, str]] = []
    seen_domains: set[str] = set()

    for candidate in links:
        try:
            domain = urlparse(candidate).netloc.replace('www.', '')

            # Skip anything on (or overlapping) the custodian's own domain.
            if base_domain in domain or domain in base_domain:
                continue

            # First known platform whose key appears in the domain, if any.
            info = next(
                (v for needle, v in known_platforms.items() if needle in domain),
                None,
            )
            if info is not None and domain not in seen_domains:
                seen_domains.add(domain)
                results.append({
                    'platform_name': info['name'],
                    'platform_url': candidate,
                    'platform_type': info['type'],
                    'integration_type': 'external_aggregator',
                })
        except Exception:
            # Malformed URLs are simply skipped.
            continue

    return results[:5]
|
|
|
|
|
|
def is_generic_title(title: str | None) -> bool:
    """Return True when *title* is missing, very short, or a generic placeholder."""
    if not title:
        return True

    generic_words = (
        'home', 'homepage', 'welkom', 'welcome', 'startpagina',
        'index', 'main', 'website', 'webpagina', 'web page',
    )
    normalized = title.lower().strip()
    for word in generic_words:
        # Exact match, a dangling "- " separator, or a "<word> | ..." prefix
        # all indicate a placeholder rather than a real platform name.
        if normalized in (word, f"{word} -") or normalized.startswith(f"{word} |"):
            return True

    # Titles shorter than 3 characters are too terse to be meaningful.
    return len(title) < 3
|
|
|
|
|
|
def scrape_with_httpx(url: str, client: httpx.Client, timeout: float = 30.0) -> dict[str, Any] | None:
    """Scrape a URL using httpx and return parsed metadata.

    Returns a dict containing either page metadata (title, description,
    language, favicon, og:* fields, up to 100 links, final_url) or — on any
    failure — an 'error' message plus 'status_code' (None for transport-level
    errors). Never raises.
    """
    try:
        response = client.get(url, timeout=timeout, follow_redirects=True)

        # Redirects are already followed, so anything but a plain 200 is a failure.
        if response.status_code != 200:
            return {'error': f'HTTP {response.status_code}', 'status_code': response.status_code}

        # Parse HTML
        soup = BeautifulSoup(response.text, 'html.parser')

        # Extract metadata; final_url reflects any redirects that were followed.
        metadata: dict[str, Any] = {
            'status_code': response.status_code,
            'final_url': str(response.url),
        }

        # Title
        title_tag = soup.find('title')
        metadata['title'] = title_tag.get_text(strip=True) if title_tag else None

        # Meta tags
        for meta in soup.find_all('meta'):
            name = str(meta.get('name', '')).lower()
            prop = str(meta.get('property', '')).lower()
            content = str(meta.get('content', ''))

            if name == 'description' or prop == 'og:description':
                # og:description overrides a previously-seen plain description.
                if 'description' not in metadata or prop == 'og:description':
                    metadata['description'] = content
            elif prop == 'og:title':
                metadata['og_title'] = content
            elif prop == 'og:image':
                # Resolve relative image URLs against the requested page.
                metadata['og_image'] = urljoin(url, content) if content else None
            elif prop == 'og:site_name':
                metadata['og_site_name'] = content
            elif name == 'language' or str(meta.get('http-equiv', '')).lower() == 'content-language':
                # Keep only the primary subtag, e.g. "nl-NL,en" -> "nl".
                metadata['language'] = content.split(',')[0].split('-')[0]

        # Detect language from html tag (runs after, so it overrides meta tags)
        html_tag = soup.find('html')
        if html_tag:
            lang_attr = html_tag.get('lang')
            if lang_attr:
                # bs4 can return a list for multi-valued attributes.
                lang_str = str(lang_attr) if not isinstance(lang_attr, list) else str(lang_attr[0])
                metadata['language'] = lang_str.split('-')[0]

        # Favicon: first <link> whose rel attribute mentions "icon"
        for link in soup.find_all('link'):
            rel = link.get('rel')
            if rel is None:
                rel = []
            # rel may be a list (bs4 multi-valued attribute) or a plain string.
            if isinstance(rel, list):
                rel_str = ' '.join(str(r) for r in rel)
            else:
                rel_str = str(rel)
            if 'icon' in rel_str.lower():
                href = link.get('href')
                if href:
                    metadata['favicon'] = urljoin(url, str(href))
                    break

        # Extract links: absolute or root-relative hrefs only, resolved and
        # de-duplicated in document order.
        links: list[str] = []
        for a in soup.find_all('a', href=True):
            href = str(a['href'])
            if href.startswith('http') or href.startswith('/'):
                full_url = urljoin(url, href)
                if full_url not in links:
                    links.append(full_url)

        metadata['links'] = links[:100]  # Limit to 100 links

        return metadata

    except httpx.TimeoutException:
        # Must precede httpx.HTTPError: TimeoutException is a subclass of it.
        return {'error': 'Timeout', 'status_code': None}
    except httpx.ConnectError as e:
        return {'error': f'Connection error: {e}', 'status_code': None}
    except httpx.HTTPError as e:
        return {'error': f'HTTP error: {e}', 'status_code': None}
    except Exception as e:
        # Deliberate catch-all: batch processing must survive any parse error.
        return {'error': f'Exception: {e}', 'status_code': None}
|
|
|
|
|
|
def transform_to_platform_v2(scrape_result: dict[str, Any], source_url: str, org_name: str) -> dict[str, Any]:
    """Transform scrape result to digital_platform_v2 format.

    Args:
        scrape_result: Metadata dict as produced by scrape_with_httpx
            (assumed to have no 'error' key — caller checks first).
        source_url: The URL that was scraped; used for the platform ID and
            same-domain link filtering.
        org_name: Organization name, used as the fallback platform title.

    Returns:
        A dict ready to be stored under the 'digital_platform_v2' key of a
        custodian YAML file.
    """
    links: list[str] = scrape_result.get('links', [])

    # Choose a display title: prefer og:title, then og:site_name, then the
    # page <title> with a trailing " - ..." or " | ..." suffix stripped.
    raw_title = scrape_result.get('title', '') or ''
    candidate_titles: list[str | None] = [
        scrape_result.get('og_title'),
        scrape_result.get('og_site_name'),
        raw_title.split(' - ')[0].strip() if raw_title else None,
        raw_title.split(' | ')[0].strip() if raw_title else None,
    ]

    title = org_name  # Default fallback
    for candidate in candidate_titles:
        # Skip empty or placeholder-like candidates (see is_generic_title).
        if candidate and not is_generic_title(candidate):
            title = candidate
            break

    # Generate a stable platform ID from the domain,
    # e.g. "https://www.example.nl" -> "primary_website_example_nl"
    domain = urlparse(source_url).netloc.replace('www.', '').replace('.', '_')
    platform_id = f"primary_website_{domain}"

    # Detect platform type from URL/link patterns
    platform_type = detect_platform_type(source_url, links)

    # Extract same-domain collection/catalog URLs
    collection_urls = extract_collection_urls(links, source_url)

    # Extract known external aggregator/portal links
    auxiliary_platforms = extract_auxiliary_platforms(links, source_url)

    # Build digital_platform_v2 structure
    platform_v2: dict[str, Any] = {
        'transformation_metadata': {
            'transformed_from': 'httpx_beautifulsoup',
            'transformation_date': datetime.now(timezone.utc).isoformat(),
            'transformation_version': '2.1',
            'source_status_code': scrape_result.get('status_code', 200),
        },
        'primary_platform': {
            'platform_id': platform_id,
            # Avoid doubling the word "Website" when the title already has it.
            'platform_name': f"{title} Website" if 'website' not in title.lower() else title,
            'platform_url': scrape_result.get('final_url', source_url),
            'platform_type': platform_type,
            'description': scrape_result.get('description', ''),
            'language': scrape_result.get('language', 'nl'),
            'og_image': scrape_result.get('og_image'),
            'favicon': scrape_result.get('favicon'),
        },
    }

    # Add collection URLs if found
    if collection_urls:
        platform_v2['primary_platform']['collection_urls'] = collection_urls

    # Add auxiliary platforms if found
    if auxiliary_platforms:
        platform_v2['auxiliary_platforms'] = auxiliary_platforms

    # Add internal navigation links (sample of up to 20 same-domain links)
    base_domain = urlparse(source_url).netloc
    internal_links = [link for link in links if base_domain in urlparse(link).netloc][:20]
    if internal_links:
        platform_v2['navigation_links'] = internal_links

    return platform_v2
|
|
|
|
|
|
def update_custodian_file(filepath: Path, platform_v2: dict[str, Any]) -> bool:
    """Write *platform_v2* into a custodian YAML file under 'digital_platform_v2'.

    Reads the existing YAML, replaces (or creates) the digital_platform_v2
    section, and writes the file back.

    Returns:
        True on success; False on any error (which is printed, not raised).
    """
    try:
        # BUGFIX: open explicitly as UTF-8. The dump below uses
        # allow_unicode=True, so Dutch characters are written as raw UTF-8;
        # relying on the locale default encoding could corrupt round-trips.
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)

        if data is None:  # an empty file parses to None
            data = {}

        # Add digital_platform_v2 section (overwrites any existing one)
        data['digital_platform_v2'] = platform_v2

        # sort_keys=False preserves the original section order of the file.
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

        return True

    except Exception as e:
        # Best-effort batch processing: report the failure and carry on.
        print(f" Error updating {filepath}: {e}")
        return False
|
|
|
|
|
|
def load_failed_urls(path: Path | str | None = None) -> list[tuple[str, str]]:
    """Load (custodian filename, url) pairs from the failed-URLs file.

    Each line is expected to be "<filename>\\t<url>"; lines without a tab
    are silently skipped.

    Args:
        path: Optional override of the input file (useful for testing);
            defaults to FAILED_URLS_FILE.

    Returns:
        List of (filename, url) tuples in file order.
    """
    source = FAILED_URLS_FILE if path is None else Path(path)
    pairs: list[tuple[str, str]] = []
    # Explicit UTF-8 so Dutch filenames/URLs decode the same on every platform.
    with open(source, 'r', encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if '\t' in line:
                # Split on the first tab only: URLs never contain tabs,
                # but be defensive anyway.
                filename, url = line.split('\t', 1)
                pairs.append((filename, url))
    return pairs
|
|
|
|
|
|
def get_org_name(filepath: Path) -> str:
    """Extract organization name from custodian file.

    Tries, in order: original_entry.organisatie, custodian_name
    (emic_name, falling back to preferred_name), then a top-level 'name'
    key; finally falls back to a name derived from the filename.
    Never raises.
    """
    try:
        with open(filepath, 'r') as f:
            data = yaml.safe_load(f)

        if data:
            # NOTE: if 'original_entry' exists but is None, the .get() below
            # raises AttributeError, which the except clause converts into
            # the filename-stem fallback — this behavior is relied upon.
            if 'original_entry' in data and data['original_entry'].get('organisatie'):
                return str(data['original_entry']['organisatie'])
            if 'custodian_name' in data:
                cn = data['custodian_name']
                return str(cn.get('emic_name', '') or cn.get('preferred_name', ''))
            if 'name' in data:
                return str(data['name'])

        # Fallback: extract from filename (last hyphen-separated token of the stem)
        stem = filepath.stem
        parts = stem.split('-')
        return parts[-1] if parts else stem

    except Exception:
        # Any read/parse failure: fall back to the raw filename stem.
        return filepath.stem
|
|
|
|
|
|
def main() -> None:
    """CLI entry point: scrape failed URLs and enrich custodian YAML files."""
    parser = argparse.ArgumentParser(description='Batch web scraper using httpx + BeautifulSoup')
    parser.add_argument('--start', type=int, default=0, help='Starting index')
    parser.add_argument('--limit', type=int, default=0, help='Maximum URLs to process (0=all)')
    parser.add_argument('--dry-run', action='store_true', help='Print what would be done without making changes')
    parser.add_argument('--delay', type=float, default=1.0, help='Delay between requests in seconds (default 1)')
    parser.add_argument('--timeout', type=float, default=30.0, help='Request timeout in seconds (default 30)')
    # BUGFIX: the original used action='store_true' with default=True, so the
    # skip behavior could never be turned off. Keep skipping as the default
    # but provide an explicit opt-out flag.
    parser.add_argument('--skip-existing', dest='skip_existing', action='store_true', default=True,
                        help='Skip files that already have digital_platform_v2 (default)')
    parser.add_argument('--no-skip-existing', dest='skip_existing', action='store_false',
                        help='Re-scrape files even if they already have digital_platform_v2')
    args = parser.parse_args()

    # Check for BeautifulSoup. (Normally the module-level import fails first;
    # this keeps a friendly install hint in case imports are deferred/frozen.)
    try:
        from bs4 import BeautifulSoup as _  # noqa: F401
    except ImportError:
        print("Error: BeautifulSoup not installed. Run: pip install beautifulsoup4")
        sys.exit(1)

    # Load URLs
    all_urls = load_failed_urls()
    print(f"Loaded {len(all_urls)} failed URLs from {FAILED_URLS_FILE}")

    # Slice based on start and limit
    if args.limit > 0:
        urls_to_process = all_urls[args.start:args.start + args.limit]
    else:
        urls_to_process = all_urls[args.start:]

    print(f"Processing {len(urls_to_process)} URLs (start={args.start}, limit={args.limit or 'all'})")

    if args.dry_run:
        print("\n[DRY RUN MODE - No changes will be made]")
        for filename, url in urls_to_process[:10]:
            # BUGFIX: previously printed a hard-coded "(unknown)" placeholder
            # while the filename loop variable went unused.
            print(f" Would scrape: {filename} -> {url}")
        if len(urls_to_process) > 10:
            print(f" ... and {len(urls_to_process) - 10} more")
        return

    # Create HTTP client with browser-like headers (some sites block bots)
    client = httpx.Client(
        headers={
            'User-Agent': USER_AGENT,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'nl,en-US;q=0.9,en;q=0.8',
        },
        follow_redirects=True,
        timeout=args.timeout,
    )

    success_count = 0
    skip_count = 0
    fail_count = 0

    try:
        for i, (filename, url) in enumerate(urls_to_process):
            filepath = CUSTODIAN_DIR / filename

            # BUGFIX: header line previously printed "(unknown)" instead of
            # the custodian filename being processed.
            print(f"\n[{i+1}/{len(urls_to_process)}] {filename}")
            print(f" URL: {url}")

            if not filepath.exists():
                print(" SKIP: File not found")
                skip_count += 1
                continue

            # Check if already has digital_platform_v2. A cheap substring test
            # avoids a full YAML parse just to detect the section.
            if args.skip_existing:
                with open(filepath, 'r') as f:
                    content = f.read()
                if 'digital_platform_v2:' in content:
                    print(" SKIP: Already has digital_platform_v2")
                    skip_count += 1
                    continue

            # Get org name for platform naming
            org_name = get_org_name(filepath)

            # Scrape URL
            result = scrape_with_httpx(url, client, timeout=args.timeout)

            if result and 'error' not in result:
                # Transform to platform_v2 and persist
                platform_v2 = transform_to_platform_v2(result, url, org_name)
                if update_custodian_file(filepath, platform_v2):
                    success_count += 1
                    platform_name = platform_v2['primary_platform']['platform_name']
                    print(f" SUCCESS: {platform_name}")
                else:
                    fail_count += 1
            else:
                fail_count += 1
                error_msg = result.get('error', 'Unknown error') if result else 'No result'
                print(f" FAILED: {error_msg}")

            # Rate limiting: be polite to target servers
            if args.delay > 0:
                time.sleep(args.delay)

            # Progress update every 50 URLs
            if (i + 1) % 50 == 0:
                print(f"\n=== Progress: {i+1}/{len(urls_to_process)} (success={success_count}, skip={skip_count}, fail={fail_count}) ===\n")

    finally:
        # Always release the HTTP client's connection pool.
        client.close()

    print("\n=== Final Results ===")
    print(f"Success: {success_count}")
    print(f"Skipped: {skip_count}")
    print(f"Failed: {fail_count}")
    print(f"Total: {len(urls_to_process)}")


if __name__ == '__main__':
    main()
|