#!/usr/bin/env python3 """ Enrich Dutch custodian YAML files with web data using Crawl4AI (free, local). This script replaces the Firecrawl-based enrichment with Crawl4AI which: 1. Runs locally using Playwright (no API costs) 2. Extracts links, metadata, and content with XPath provenance 3. Detects APIs, catalogs, and metadata standards Usage: python scripts/enrich_dutch_custodians_crawl4ai.py [options] Options: --dry-run Show what would be enriched without modifying files --limit N Process only first N files (for testing) --start-index N Start from index N (for resuming) --resume Resume from last checkpoint --force Re-enrich even if already has crawl4ai_enrichment --file PATH Process a single specific file """ import argparse import asyncio import json import logging import os import re import sys from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Any, Optional from urllib.parse import urlparse, urlunparse import yaml from dotenv import load_dotenv from lxml import etree # Crawl4AI imports from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode # Load environment variables from .env file load_dotenv() # Set up logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Configuration CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian" CHECKPOINT_FILE = CUSTODIAN_DIR / ".crawl4ai_enrichment_checkpoint.json" # Rate limiting - be nice to websites even though we're local REQUEST_DELAY = 2.0 # seconds between requests # Digital platform detection patterns API_ENDPOINT_PATTERNS = [ r'/oai[-_]?pmh', r'/api/', r'/rest/', r'/sparql', r'/graphql', r'/iiif/', r'/sru', r'/z39\.50', r'/opensearch', ] CATALOG_PATTERNS = [ r'/catalogu[es]?(?:/|\?|$)', r'/collecti[eo]n?[s]?(?:/|\?|$)', r'/archie[fv](?:/|\?|$)', r'/beeldbank(?:/|\?|$)', r'/zoeken(?:/|\?|$)', r'/search(?:/|\?|$)', r'/discover(?:/|\?|$)', r'/browse(?:/|\?|$)', ] # Dutch-specific catalog type detection CATALOG_TYPE_PATTERNS = { 'beeldbank': { 'patterns': [r'/beeldbank', r'/beeld', r'/images', r'/foto'], 'label': 'Image Collection', 'description_nl': 'Beeldbank met gedigitaliseerde foto\'s, kaarten en afbeeldingen', }, 'genealogie': { 'patterns': [r'/genealogie', r'/stamboom', r'/persons', r'/akten'], 'label': 'Genealogy Records', 'description_nl': 'Genealogische bronnen en persoonsgegevens', }, 'archieven': { 'patterns': [r'/archie[fv]', r'/inventaris', r'/toegangen', r'/finding'], 'label': 'Archive Finding Aids', 'description_nl': 'Archiefinventarissen en toegangen', }, 'collectie': { 'patterns': [r'/collectie', r'/collection', r'/object'], 'label': 'Collection Portal', 'description_nl': 'Collectieportaal met objecten en kunstwerken', }, 'kranten': { 'patterns': [r'/kranten', r'/newspaper', r'/periodiek'], 'label': 'Newspaper Archive', 'description_nl': 'Gedigitaliseerde kranten en periodieken', }, 'kaarten': { 'patterns': [r'/kaart', r'/map', r'/cartogra'], 'label': 'Map Collection', 'description_nl': 'Historische kaarten en cartografisch materiaal', }, 'bibliotheek': { 'patterns': [r'/catalogu', r'/biblio', r'/library', r'/boek'], 'label': 'Library Catalog', 'description_nl': 'Bibliotheekcatalogus', }, 'zoeken': { 'patterns': [r'/zoeken', r'/search', r'/discover', r'/browse'], 'label': 'Search Interface', 'description_nl': 'Algemene zoekinterface', }, } CMS_INDICATORS = { 'atlantis': ['atlantis', 'picturae'], 'mais_flexis': ['mais-flexis', 'mais flexis', 'de ree'], 'adlib': ['adlib', 'axiell'], 'collective_access': ['collectiveaccess', 'collective access'], 'archivematica': ['archivematica'], 'archivesspace': ['archivesspace'], 'atom': ['accesstomemory', 'atom'], 'omeka': ['omeka'], 'contentdm': ['contentdm'], 'dspace': ['dspace'], 'islandora': ['islandora'], 'memorix': ['memorix'], } # Metadata standards detection patterns with regex word boundaries METADATA_STANDARDS_PATTERNS = [ (r'\bdublin\s+core\b', 'Dublin Core', True), (r'\bdc:', 'Dublin Core', True), (r'\bdcterms\b', 'Dublin Core', True), (r'\bmarc\s*21\b', 'MARC21', True), (r'\bmarc21\b', 'MARC21', True), (r'\bead\b', 'EAD', True), (r'encoded\s+archival\s+description', 'EAD', True), (r'\bead\s*2002\b', 'EAD', True), (r'\bead3\b', 'EAD', True), (r'\bmets\b', 'METS', True), (r'metadata\s+encoding\s+and\s+transmission', 'METS', True), (r'\bmods\b', 'MODS', True), (r'metadata\s+object\s+description', 'MODS', True), (r'\blido\b', 'LIDO', True), (r'lightweight\s+information\s+describing', 'LIDO', True), (r'\bcidoc[-\s]?crm\b', 'CIDOC-CRM', True), (r'\bschema\.org\b', 'Schema.org', True), (r'\bschema:', 'Schema.org', True), (r'\bric[-\s]?o\b', 'RiC-O', True), (r'records\s+in\s+contexts', 'RiC-O', True), (r'\bpremis\b', 'PREMIS', True), (r'preservation\s+metadata', 'PREMIS', True), (r'\bbibframe\b', 'BIBFRAME', True), (r'\biiif\b', 'IIIF', True), (r'image\s+interoperability\s+framework', 'IIIF', True), ] # Dutch archive platform domains to detect DUTCH_ARCHIVE_PLATFORMS = [ 'archieven.nl', 'memorix.nl', 'archiefweb.eu', 'atlantisdigitaal.nl', 'picturae.nl', 'mais-flexis.nl', 'delpher.nl', 'geheugen.nl', ] def get_xpath(element, tree) -> str: """Generate XPath for an lxml element.""" parts = [] while element is not None: parent = element.getparent() if parent is None: parts.append(element.tag) else: siblings = [c for c in parent if c.tag == element.tag] if len(siblings) == 1: parts.append(element.tag) else: index = siblings.index(element) + 1 parts.append(f'{element.tag}[{index}]') element = parent return '/' + '/'.join(reversed(parts)) def normalize_url(url: str) -> str: """Normalize URL by removing noise query parameters.""" if not url: return url parsed = urlparse(url) # Remove common tracking/session parameters noise_params = ['sort', 'order', 'view', 'mode', 'ss', 'page', 'offset', 'limit', 'random', 'session', 'sid', 'token', 'ref'] if parsed.query: params = dict(p.split('=', 1) if '=' in p else (p, '') for p in parsed.query.split('&')) filtered = {k: v for k, v in params.items() if not any(k.startswith(n) for n in noise_params + ['utm_', 'fbclid', 'gclid'])} new_query = '&'.join(f'{k}={v}' if v else k for k, v in sorted(filtered.items())) return urlunparse(parsed._replace(query=new_query)) return url def detect_catalog_type(url: str) -> dict | None: """Detect catalog type from URL pattern.""" url_lower = url.lower() for type_key, type_info in CATALOG_TYPE_PATTERNS.items(): for pattern in type_info['patterns']: if re.search(pattern, url_lower): return { 'type': type_key, 'label': type_info['label'], 'description_nl': type_info['description_nl'], } return None def detect_metadata_standards(content: str) -> list[str]: """Detect metadata standards mentioned in content using regex word boundaries.""" if not content: return [] content_lower = content.lower() standards_found = set() for pattern, standard_name, use_regex in METADATA_STANDARDS_PATTERNS: if use_regex: if re.search(pattern, content_lower, re.IGNORECASE): standards_found.add(standard_name) else: if pattern.lower() in content_lower: standards_found.add(standard_name) return sorted(list(standards_found)) def detect_cms(content: str) -> str | None: """Detect CMS/collection management system from content.""" if not content: return None content_lower = content.lower() for cms_name, indicators in CMS_INDICATORS.items(): for indicator in indicators: if indicator in content_lower: return cms_name return None def extract_website_url(entry: dict) -> str | None: """Extract website URL from custodian entry.""" # Check various possible locations for website if 'website' in entry: return entry['website'] # Check in enrichment data for enrichment_key in ['zcbs_enrichment', 'google_maps_enrichment', 'wikidata_enrichment']: if enrichment_key in entry: enrichment = entry[enrichment_key] if isinstance(enrichment, dict): if 'website' in enrichment: return enrichment['website'] if 'url' in enrichment: return enrichment['url'] # Check identifiers if 'identifiers' in entry: for identifier in entry.get('identifiers', []): if isinstance(identifier, dict): if identifier.get('identifier_scheme') == 'Website': return identifier.get('identifier_value') return None async def crawl_website(crawler: AsyncWebCrawler, url: str) -> dict: """ Crawl a website and extract structured data with XPath provenance. Returns a dict with: - success: bool - title: str - description: str - html: str (raw HTML for further processing) - markdown: str - links: list of dicts with href, text, xpath - metadata: dict of og/meta tags - error: str (if failed) """ config = CrawlerRunConfig( cache_mode=CacheMode.BYPASS, verbose=False, # Wait for page to fully load wait_until="networkidle", page_timeout=30000, ) try: result = await crawler.arun(url=url, config=config) if not result.success: return { 'success': False, 'error': f'Crawl failed with status {result.status_code}', 'status_code': result.status_code, } # Parse HTML with lxml to extract XPaths links_with_xpath = [] if result.html: try: tree = etree.HTML(result.html) link_elements = tree.xpath('//a[@href]') for link_el in link_elements: href = link_el.get('href', '') text = ''.join(link_el.itertext()).strip() xpath = get_xpath(link_el, tree) # Skip empty links and javascript if href and not href.startswith(('javascript:', '#', 'mailto:', 'tel:')): links_with_xpath.append({ 'href': href, 'text': text[:200] if text else '', # Truncate long text 'xpath': xpath, }) except Exception as e: logger.warning(f"Error parsing HTML for XPath extraction: {e}") # Also include crawl4ai's extracted links for completeness internal_links = result.links.get('internal', []) if result.links else [] external_links = result.links.get('external', []) if result.links else [] return { 'success': True, 'status_code': result.status_code, 'title': result.metadata.get('title', '') if result.metadata else '', 'description': result.metadata.get('description', '') if result.metadata else '', 'html': result.html, 'markdown': result.markdown.raw_markdown if result.markdown else '', 'links_with_xpath': links_with_xpath, 'internal_links': [l.get('href', '') for l in internal_links if isinstance(l, dict)], 'external_links': [l.get('href', '') for l in external_links if isinstance(l, dict)], 'metadata': result.metadata or {}, } except Exception as e: logger.error(f"Error crawling {url}: {e}") return { 'success': False, 'error': str(e), } def analyze_crawl_results(crawl_data: dict, base_url: str) -> dict: """ Analyze crawl results to extract APIs, catalogs, and metadata standards. Returns enrichment dict ready to add to YAML. """ enrichment = { 'retrieval_timestamp': datetime.now(timezone.utc).isoformat(), 'retrieval_agent': 'crawl4ai', 'source_url': base_url, 'status_code': crawl_data.get('status_code'), } if not crawl_data.get('success'): enrichment['error'] = crawl_data.get('error', 'Unknown error') return enrichment # Basic metadata enrichment['title'] = crawl_data.get('title', '') enrichment['description'] = crawl_data.get('description', '') enrichment['links_count'] = len(crawl_data.get('links_with_xpath', [])) # Collect all URLs for analysis all_urls = set() links_with_xpath = crawl_data.get('links_with_xpath', []) for link in links_with_xpath: href = link.get('href', '') if href: # Make absolute URL if relative if href.startswith('/'): parsed_base = urlparse(base_url) href = f"{parsed_base.scheme}://{parsed_base.netloc}{href}" all_urls.add(href) # Add internal/external links from crawl4ai for link in crawl_data.get('internal_links', []): if link: all_urls.add(link) for link in crawl_data.get('external_links', []): if link: all_urls.add(link) # Detect API endpoints detected_apis = [] for url in all_urls: url_lower = url.lower() for pattern in API_ENDPOINT_PATTERNS: if re.search(pattern, url_lower): detected_apis.append({ 'url': normalize_url(url), 'pattern_matched': pattern, }) break if detected_apis: enrichment['detected_api_endpoints'] = detected_apis # Detect catalog URLs with type classification detected_catalogs = [] for url in all_urls: url_lower = url.lower() for pattern in CATALOG_PATTERNS: if re.search(pattern, url_lower): catalog_entry = { 'url': normalize_url(url), } catalog_type = detect_catalog_type(url) if catalog_type: catalog_entry['type'] = catalog_type['type'] catalog_entry['label'] = catalog_type['label'] # Find XPath for this link for link in links_with_xpath: if link.get('href', '').rstrip('/') == url.rstrip('/') or \ (link.get('href', '').startswith('/') and url.endswith(link.get('href', ''))): catalog_entry['xpath'] = link.get('xpath') catalog_entry['link_text'] = link.get('text', '') break detected_catalogs.append(catalog_entry) break if detected_catalogs: enrichment['detected_catalog_urls'] = detected_catalogs # Detect external archive platforms external_platforms = [] for url in all_urls: url_lower = url.lower() for platform in DUTCH_ARCHIVE_PLATFORMS: if platform in url_lower: external_platforms.append({ 'url': normalize_url(url), 'platform': platform, }) break if external_platforms: enrichment['external_archive_platforms'] = external_platforms # Detect metadata standards from content # Handle None values explicitly to avoid string concatenation errors markdown = crawl_data.get('markdown') or '' title = crawl_data.get('title') or '' description = crawl_data.get('description') or '' content = f"{markdown} {title} {description}" standards = detect_metadata_standards(content) if standards: enrichment['detected_standards'] = standards # Detect CMS cms = detect_cms(content) if cms: enrichment['detected_cms'] = cms # Extract OG/meta tags of interest metadata = crawl_data.get('metadata', {}) og_data = {} for key in ['og:title', 'og:description', 'og:image', 'og:url', 'og:site_name']: if key in metadata: og_data[key.replace('og:', '')] = metadata[key] if og_data: enrichment['open_graph'] = og_data return enrichment def load_checkpoint() -> dict: """Load checkpoint from file.""" if CHECKPOINT_FILE.exists(): with open(CHECKPOINT_FILE, 'r') as f: return json.load(f) return {} def save_checkpoint(checkpoint: dict): """Save checkpoint to file.""" with open(CHECKPOINT_FILE, 'w') as f: json.dump(checkpoint, f, indent=2) async def process_single_file( crawler: AsyncWebCrawler, filepath: Path, dry_run: bool = False, force: bool = False, ) -> bool: """Process a single custodian YAML file.""" try: with open(filepath, 'r', encoding='utf-8') as f: entry = yaml.safe_load(f) if not entry: logger.warning(f"Empty file: {filepath}") return False # Check if already enriched if 'crawl4ai_enrichment' in entry and not force: logger.info(f"Skipping {filepath.name}: already has crawl4ai_enrichment") return True # Extract website URL website_url = extract_website_url(entry) if not website_url: logger.info(f"Skipping {filepath.name}: no website URL found") return False # Ensure URL has protocol if not website_url.startswith(('http://', 'https://')): website_url = 'https://' + website_url logger.info(f"Processing {filepath.name}: {website_url}") if dry_run: logger.info(f" -> DRY RUN: would crawl {website_url}") return True # Crawl the website crawl_data = await crawl_website(crawler, website_url) # Analyze results enrichment = analyze_crawl_results(crawl_data, website_url) # Add enrichment to entry entry['crawl4ai_enrichment'] = enrichment # Write back to file with open(filepath, 'w', encoding='utf-8') as f: yaml.dump(entry, f, default_flow_style=False, allow_unicode=True, sort_keys=False) # Log summary apis_count = len(enrichment.get('detected_api_endpoints', [])) catalogs_count = len(enrichment.get('detected_catalog_urls', [])) platforms_count = len(enrichment.get('external_archive_platforms', [])) logger.info(f" -> success: {apis_count} APIs, {catalogs_count} catalogs, {platforms_count} external platforms found") return True except Exception as e: logger.error(f"Error processing {filepath}: {e}") return False async def main(): parser = argparse.ArgumentParser(description='Enrich Dutch custodians with Crawl4AI') parser.add_argument('--dry-run', action='store_true', help='Show what would be enriched') parser.add_argument('--limit', type=int, help='Process only first N files') parser.add_argument('--start-index', type=int, default=0, help='Start from index N') parser.add_argument('--resume', action='store_true', help='Resume from last checkpoint') parser.add_argument('--force', action='store_true', help='Re-enrich even if already enriched') parser.add_argument('--file', type=str, help='Process a single specific file') args = parser.parse_args() # Create logs directory logs_dir = Path(__file__).parent.parent / "logs" logs_dir.mkdir(exist_ok=True) # Add file handler for logging log_file = logs_dir / f"crawl4ai_enrichment_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" file_handler = logging.FileHandler(log_file) file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(file_handler) # Single file mode if args.file: filepath = Path(args.file) if not filepath.exists(): logger.error(f"File not found: {filepath}") sys.exit(1) async with AsyncWebCrawler() as crawler: success = await process_single_file(crawler, filepath, args.dry_run, args.force) sys.exit(0 if success else 1) # Batch mode files = sorted(CUSTODIAN_DIR.glob("NL-*.yaml")) logger.info(f"Found {len(files)} Dutch custodian files") # Handle resume start_index = args.start_index if args.resume: checkpoint = load_checkpoint() if 'last_processed_index' in checkpoint: start_index = checkpoint['last_processed_index'] + 1 logger.info(f"Resuming from index {start_index}") # Apply limit end_index = len(files) if args.limit: end_index = min(start_index + args.limit, len(files)) logger.info(f"Processing files {start_index} to {end_index - 1}") # Process files success_count = 0 error_count = 0 async with AsyncWebCrawler() as crawler: for i, filepath in enumerate(files[start_index:end_index], start=start_index): logger.info(f"[{i + 1}/{len(files)}] Processing {filepath.name}") success = await process_single_file(crawler, filepath, args.dry_run, args.force) if success: success_count += 1 else: error_count += 1 # Save checkpoint if not args.dry_run: save_checkpoint({ 'last_processed_index': i, 'last_processed_file': str(filepath), 'last_processed_time': datetime.now(timezone.utc).isoformat(), 'success_count': success_count, 'error_count': error_count, }) # Rate limiting if i < end_index - 1: await asyncio.sleep(REQUEST_DELAY) # Summary logger.info(f"\n{'='*50}") logger.info(f"Enrichment complete!") logger.info(f" Success: {success_count}") logger.info(f" Errors: {error_count}") logger.info(f" Log file: {log_file}") if __name__ == '__main__': asyncio.run(main())