#!/usr/bin/env python3
"""
Enrich custodian YAML files with logo images using Crawl4AI.

This script extracts logo URLs from heritage institution websites with
proper provenance, following AGENTS.md Rule 6 (WebObservation Claims
MUST Have XPath Provenance).

Crawl4AI advantages over Playwright:
- LLM-friendly structured output
- Built-in caching (avoids re-fetching)
- Magic mode for auto-handling cookies/popups
- Simpler API for extraction

Logo extraction looks for:
1. <link rel="icon"> or <link rel="apple-touch-icon"> (favicon/icon)
2. <meta property="og:image"> (Open Graph image)
3. <img> elements with logo/brand in class/id/alt attributes
4. SVG elements with logo class/id

Output format follows WebClaim schema with:
- claim_type: logo_url, favicon_url, og_image_url
- claim_value: The extracted image URL
- source_url: Website where logo was found
- css_selector: CSS selector to the element (for verification)
- retrieved_on: ISO 8601 timestamp

Usage:
    python scripts/enrich_custodian_logos_crawl4ai.py [options]

Options:
    --dry-run        Show what would be enriched without modifying files
    --limit N        Process only first N files (for testing)
    --file PATH      Process a single specific file
    --country CODE   Filter by country code (e.g., NL, BE, DE)
    --resume         Resume from last checkpoint
    --no-cache       Disable crawl4ai caching
    --verbose        Enable verbose (DEBUG) logging

Requirements:
    pip install crawl4ai pyyaml
"""

import argparse
import asyncio
import json
import logging
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urljoin, urlparse

import yaml

try:
    from crawl4ai import AsyncWebCrawler, BrowserConfig, CacheMode, CrawlerRunConfig
    from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
except ImportError:
    print("Please install crawl4ai: pip install crawl4ai")
    sys.exit(1)

from bs4 import BeautifulSoup

# Logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"
CHECKPOINT_FILE = CUSTODIAN_DIR / ".logo_enrichment_crawl4ai_checkpoint.json"
REQUEST_DELAY = 2.0  # seconds between requests


def get_website_url(entry: dict) -> str | None:
    """Extract the best-available website URL from a custodian entry.

    Sources are tried in fixed priority order (Dutch ISIL webadres,
    Czech ISIL/ARON identifiers, museum register, Wikidata, Google Maps,
    generic web enrichment). Returns a normalized URL or None.

    Uses ``or {}`` / ``or []`` guards because YAML ``null`` values make
    ``entry.get(key, {})`` return None, which would break chained .get().
    """
    original = entry.get('original_entry') or {}

    # Priority 1: Original entry webadres (Dutch ISIL format)
    if original.get('webadres_organisatie'):
        url = original['webadres_organisatie']
        if url and url.strip() and url.strip().lower() not in ('null', 'none', ''):
            return normalize_url(url.strip())

    # Priority 2: Website in identifiers array (Czech ISIL and ARON format)
    for ident in (original.get('identifiers') or []):
        if ident.get('identifier_scheme') == 'Website':
            url = ident.get('identifier_value') or ident.get('identifier_url')
            if url and url.strip():
                return normalize_url(url.strip())

    # Priority 3: Museum register website
    museum = entry.get('museum_register_enrichment') or {}
    if museum.get('website_url'):
        url = museum['website_url']
        if url and url.strip():
            return normalize_url(url.strip())

    # Priority 4: Wikidata official website
    wikidata = entry.get('wikidata_enrichment') or {}
    if wikidata.get('wikidata_official_website'):
        url = wikidata['wikidata_official_website']
        # Handle list of URLs (take first one)
        if isinstance(url, list):
            url = url[0] if url else None
        if url and isinstance(url, str) and url.strip():
            return normalize_url(url.strip())

    # Priority 5: Google Maps website
    gmaps = entry.get('google_maps_enrichment') or {}
    if gmaps.get('website'):
        url = gmaps['website']
        if url and url.strip():
            return normalize_url(url.strip())

    # Priority 6: Web enrichment source URL
    web = entry.get('web_enrichment') or {}
    if web.get('source_url'):
        url = web['source_url']
        if url and url.strip():
            return normalize_url(url.strip())

    return None


def normalize_url(url: str) -> str:
    """Normalize a URL: ensure an https:// scheme and strip trailing slashes."""
    if not url:
        return url
    url = url.strip()
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url
    url = url.rstrip('/')
    return url


def get_custodian_name(entry: dict) -> str:
    """Get a human-readable display name for a custodian entry."""
    name = (entry.get('custodian_name') or {}).get('emic_name')
    if name:
        return name
    name = (entry.get('original_entry') or {}).get('organisatie')
    if name:
        return name
    name = (entry.get('museum_register_enrichment') or {}).get('museum_name')
    if name:
        return name
    return "Unknown"


def load_checkpoint() -> dict:
    """Load the progress checkpoint, or a fresh one if none exists."""
    if CHECKPOINT_FILE.exists():
        with open(CHECKPOINT_FILE, 'r') as f:
            return json.load(f)
    return {'processed_files': [], 'last_index': 0}


def save_checkpoint(checkpoint: dict):
    """Persist the progress checkpoint as indented JSON."""
    with open(CHECKPOINT_FILE, 'w') as f:
        json.dump(checkpoint, f, indent=2)


def extract_logos_from_html(html: str, base_url: str) -> dict:
    """
    Extract logo information from HTML using BeautifulSoup.

    Returns a dict with keys: favicons, ogImages, logos, svgLogos,
    primaryLogo (the first header/nav logo found, or None).
    All URLs are resolved against *base_url*; each hit carries a
    CSS selector for provenance.
    """
    soup = BeautifulSoup(html, 'lxml')
    results = {
        'favicons': [],
        'ogImages': [],
        'logos': [],
        'svgLogos': [],
        'primaryLogo': None
    }

    # Logo detection patterns: include likely-logo markers, exclude
    # sponsor/social widgets that also match the word "logo".
    logo_pattern = re.compile(r'logo|brand|site-icon|masthead|emblem', re.I)
    exclude_pattern = re.compile(
        r'sponsor|partner|social|facebook|twitter|instagram|linkedin'
        r'|youtube|tiktok|footer-logo|cookie', re.I)

    def make_absolute(url: str) -> str:
        """Convert a relative URL to absolute; pass through data: URIs."""
        if not url:
            return url
        if url.startswith('data:'):
            return url
        return urljoin(base_url, url)

    def get_css_selector(el) -> str:
        """Generate a CSS selector path for an element (id short-circuits)."""
        parts = []
        while el and el.name:
            selector = el.name
            if el.get('id'):
                # An id is unique enough — anchor here and stop walking up.
                selector = f"#{el['id']}"
                parts.insert(0, selector)
                break
            elif el.get('class'):
                classes = el['class'][:2]  # Limit to first 2 classes
                if classes:
                    selector += '.' + '.'.join(classes)
            # Add nth-of-type if needed to disambiguate siblings
            siblings = el.find_previous_siblings(el.name) if el.parent else []
            if siblings:
                selector += f':nth-of-type({len(siblings) + 1})'
            parts.insert(0, selector)
            el = el.parent
        return ' > '.join(parts)

    # Extract favicons (any rel containing "icon" or "apple-touch")
    for link in soup.find_all(
            'link',
            rel=lambda x: x and ('icon' in x or 'apple-touch' in str(x))):
        href = link.get('href')
        if href:
            results['favicons'].append({
                'href': make_absolute(href),
                'rel': ' '.join(link.get('rel', [])),
                'type': link.get('type', ''),
                'sizes': link.get('sizes', ''),
                'selector': get_css_selector(link)
            })

    # Extract OG image
    og_image = soup.find('meta', property='og:image')
    if og_image and og_image.get('content'):
        results['ogImages'].append({
            'content': make_absolute(og_image['content']),
            'selector': get_css_selector(og_image)
        })

    # Twitter image
    twitter_image = soup.find('meta', attrs={'name': 'twitter:image'})
    if twitter_image and twitter_image.get('content'):
        results['ogImages'].append({
            'content': make_absolute(twitter_image['content']),
            'selector': get_css_selector(twitter_image)
        })

    # Find header/nav for primary logo
    header = soup.find(['header', 'nav']) or soup.find(role='banner')
    if header:
        # Look for images in header
        for img in header.find_all('img'):
            attrs_str = ' '.join([
                img.get('class', [''])[0] if img.get('class') else '',
                img.get('id', ''),
                img.get('alt', ''),
                img.get('src', '')
            ]).lower()
            if logo_pattern.search(attrs_str) and not exclude_pattern.search(attrs_str):
                src = img.get('src')
                if src:
                    if not results['primaryLogo']:
                        results['primaryLogo'] = {
                            'src': make_absolute(src),
                            'alt': img.get('alt', ''),
                            'class': ' '.join(img.get('class', [])),
                            'id': img.get('id', ''),
                            'selector': get_css_selector(img),
                            'location': 'header'
                        }

        # Look for SVG logos in header
        for svg in header.find_all('svg'):
            attrs_str = ' '.join([
                svg.get('class', [''])[0] if svg.get('class') else '',
                svg.get('id', '')
            ]).lower()
            if logo_pattern.search(attrs_str) and not exclude_pattern.search(attrs_str):
                if not results['primaryLogo']:
                    results['primaryLogo'] = {
                        'src': '[inline-svg]',
                        'alt': svg.get('aria-label', ''),
                        'class': ' '.join(svg.get('class', [])) if svg.get('class') else '',
                        'id': svg.get('id', ''),
                        'selector': get_css_selector(svg),
                        'location': 'header',
                        'isInlineSvg': True
                    }
                results['svgLogos'].append({
                    'class': ' '.join(svg.get('class', [])) if svg.get('class') else '',
                    'id': svg.get('id', ''),
                    'selector': get_css_selector(svg),
                    'ariaLabel': svg.get('aria-label', '')
                })

    # Find other logo images on page (deduplicated by raw src)
    seen_srcs = set()
    for img in soup.find_all('img'):
        attrs_str = ' '.join([
            img.get('class', [''])[0] if img.get('class') else '',
            img.get('id', ''),
            img.get('alt', ''),
            img.get('src', '')
        ]).lower()
        if logo_pattern.search(attrs_str) and not exclude_pattern.search(attrs_str):
            src = img.get('src')
            if src and src not in seen_srcs:
                seen_srcs.add(src)
                results['logos'].append({
                    'src': make_absolute(src),
                    'alt': img.get('alt', ''),
                    'class': ' '.join(img.get('class', [])) if img.get('class') else '',
                    'id': img.get('id', ''),
                    'selector': get_css_selector(img)
                })

    # Deduplicate favicons by href, preserving first-seen order
    seen_hrefs = set()
    unique_favicons = []
    for fav in results['favicons']:
        if fav['href'] not in seen_hrefs:
            seen_hrefs.add(fav['href'])
            unique_favicons.append(fav)
    results['favicons'] = unique_favicons

    return results


def select_best_favicon(favicons: list[dict]) -> dict | None:
    """Select the best favicon: SVG > largest sized apple-touch > largest PNG > ICO."""

    def icon_width(fav: dict) -> int:
        """Parse the width from a sizes attribute like '32x32'; 0 if unparseable."""
        sizes = fav.get('sizes') or ''
        if 'x' not in sizes:
            return 0  # e.g. sizes="any" or missing
        try:
            return int(sizes.split('x')[0])
        except ValueError:
            return 0

    if not favicons:
        return None

    # Priority: SVG > largest PNG > ICO
    svg_favicons = [f for f in favicons
                    if f['href'].endswith('.svg') or f['type'] == 'image/svg+xml']
    if svg_favicons:
        return svg_favicons[0]

    # Look for apple-touch-icon (high res)
    apple_icons = [f for f in favicons if 'apple-touch' in f['rel']]
    if apple_icons:
        sized = [f for f in apple_icons if f.get('sizes')]
        if sized:
            sized.sort(key=icon_width, reverse=True)
            return sized[0]
        return apple_icons[0]

    # Look for standard icon
    icons = [f for f in favicons if 'icon' in f['rel']]
    if icons:
        png_icons = [i for i in icons if '.png' in i['href']]
        if png_icons:
            sized = [f for f in png_icons if f.get('sizes')]
            if sized:
                sized.sort(key=icon_width, reverse=True)
                return sized[0]
            return png_icons[0]
        return icons[0]

    return favicons[0] if favicons else None


def build_logo_claims(logo_data: dict, source_url: str, timestamp: str) -> list[dict]:
    """Build WebClaim-compatible claims from extracted logo data.

    Emits at most one claim each for: primary logo (header <img> or
    inline SVG), best favicon, and first OG/Twitter image.
    """
    claims = []

    # Primary logo (highest priority)
    if logo_data.get('primaryLogo'):
        primary = logo_data['primaryLogo']
        if primary.get('isInlineSvg'):
            claims.append({
                'claim_type': 'logo_url',
                'claim_value': '[inline-svg]',
                'source_url': source_url,
                'css_selector': primary.get('selector', ''),
                'retrieved_on': timestamp,
                'extraction_method': 'crawl4ai_svg_detection',
                'detection_confidence': 'high',
                'is_inline_svg': True,
                'aria_label': primary.get('alt', ''),
            })
        elif primary.get('src'):
            claims.append({
                'claim_type': 'logo_url',
                'claim_value': primary['src'],
                'source_url': source_url,
                'css_selector': primary.get('selector', ''),
                'retrieved_on': timestamp,
                'extraction_method': 'crawl4ai_header_logo',
                'detection_confidence': 'high',
                'alt_text': primary.get('alt', ''),
            })

    # Best favicon
    best_favicon = select_best_favicon(logo_data.get('favicons', []))
    if best_favicon:
        claims.append({
            'claim_type': 'favicon_url',
            'claim_value': best_favicon['href'],
            'source_url': source_url,
            'css_selector': best_favicon.get('selector', ''),
            'retrieved_on': timestamp,
            'extraction_method': 'crawl4ai_link_rel',
            'favicon_type': best_favicon.get('type', ''),
            'favicon_sizes': best_favicon.get('sizes', ''),
        })

    # OG Image
    if logo_data.get('ogImages'):
        og = logo_data['ogImages'][0]
        claims.append({
            'claim_type': 'og_image_url',
            'claim_value': og['content'],
            'source_url': source_url,
            'css_selector': og.get('selector', ''),
            'retrieved_on': timestamp,
            'extraction_method': 'crawl4ai_meta_og',
        })

    return claims


async def extract_logos_from_url(
    crawler: AsyncWebCrawler,
    url: str,
    use_cache: bool = True,
) -> dict | None:
    """Crawl *url* and extract logo information using Crawl4AI.

    *use_cache* controls crawl4ai's cache (fix: the --no-cache CLI flag
    was previously parsed but never applied). Returns the dict from
    extract_logos_from_html, or None on failure.
    """
    try:
        # Configure crawl for this request
        config = CrawlerRunConfig(
            wait_until="domcontentloaded",
            page_timeout=30000,
            delay_before_return_html=1.5,   # Wait for JS
            magic=True,                     # Auto-handle popups/cookies
            remove_overlay_elements=True,   # Remove cookie banners
            cache_mode=CacheMode.ENABLED if use_cache else CacheMode.BYPASS,
        )

        result = await crawler.arun(url=url, config=config)

        if not result.success:
            logger.warning(f"Failed to crawl {url}: {result.error_message}")
            return None

        if not result.html:
            logger.warning(f"No HTML content from {url}")
            return None

        # Extract logos from HTML
        logo_data = extract_logos_from_html(result.html, url)
        return logo_data

    except Exception as e:
        logger.error(f"Error extracting logos from {url}: {e}")
        return None


async def enrich_custodian_with_logos(
    filepath: Path,
    crawler: AsyncWebCrawler,
    dry_run: bool = False,
    use_cache: bool = True,
) -> dict:
    """
    Enrich a single custodian file with logo data.

    Returns dict with:
    - success: bool
    - logos_found: int
    - message: str
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            entry = yaml.safe_load(f)

        if not entry:
            return {'success': False, 'logos_found': 0, 'message': 'Empty file'}

        # Check if already has logo enrichment
        existing = (entry.get('logo_enrichment') or {}).get('claims')
        if existing:
            return {
                'success': True,
                'logos_found': len(existing),
                'message': 'Already enriched (skipped)'
            }

        # Get website URL
        website_url = get_website_url(entry)
        if not website_url:
            return {'success': False, 'logos_found': 0, 'message': 'No website URL'}

        custodian_name = get_custodian_name(entry)
        logger.info(f"Processing: {custodian_name} ({website_url})")

        # Extract logos
        logo_data = await extract_logos_from_url(crawler, website_url, use_cache)
        if not logo_data:
            return {'success': False, 'logos_found': 0,
                    'message': 'Failed to extract logos'}

        # Build claims
        timestamp = datetime.now(timezone.utc).isoformat()
        claims = build_logo_claims(logo_data, website_url, timestamp)

        if not claims:
            return {'success': True, 'logos_found': 0, 'message': 'No logos found'}

        # Prepare enrichment data
        logo_enrichment = {
            'enrichment_timestamp': timestamp,
            'source_url': website_url,
            'extraction_method': 'crawl4ai',
            'claims': claims,
            'summary': {
                'total_claims': len(claims),
                'has_primary_logo': logo_data.get('primaryLogo') is not None,
                'has_favicon': any(c['claim_type'] == 'favicon_url' for c in claims),
                'has_og_image': any(c['claim_type'] == 'og_image_url' for c in claims),
                'favicon_count': len(logo_data.get('favicons', [])),
            }
        }

        if dry_run:
            logger.info(f"  [DRY RUN] Would add {len(claims)} logo claims")
            for claim in claims:
                value = claim['claim_value']
                if len(value) > 80:
                    value = value[:80] + "..."
                logger.info(f"    - {claim['claim_type']}: {value}")
            return {'success': True, 'logos_found': len(claims), 'message': 'Dry run'}

        # Update entry
        entry['logo_enrichment'] = logo_enrichment

        # Save updated entry
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(entry, f, allow_unicode=True, default_flow_style=False,
                      sort_keys=False)

        return {
            'success': True,
            'logos_found': len(claims),
            'message': f'Added {len(claims)} logo claims'
        }

    except Exception as e:
        logger.error(f"Error processing {filepath}: {e}")
        return {'success': False, 'logos_found': 0, 'message': str(e)}


async def main():
    parser = argparse.ArgumentParser(
        description='Enrich custodian files with logo data using Crawl4AI')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be done')
    parser.add_argument('--limit', type=int, default=0,
                        help='Process only N files')
    parser.add_argument('--file', type=str, help='Process a single file')
    parser.add_argument('--country', type=str, help='Filter by country code')
    parser.add_argument('--resume', action='store_true',
                        help='Resume from checkpoint')
    parser.add_argument('--no-cache', action='store_true',
                        help='Disable crawl4ai caching')
    parser.add_argument('--verbose', action='store_true',
                        help='Enable verbose logging')
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    use_cache = not args.no_cache  # fix: previously the flag was ignored

    # Configure browser
    browser_config = BrowserConfig(
        headless=True,
        viewport_width=1280,
        viewport_height=720,
        user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                   "AppleWebKit/537.36 (KHTML, like Gecko) "
                   "Chrome/120.0.0.0 Safari/537.36"
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        # Single file mode
        if args.file:
            filepath = Path(args.file)
            if not filepath.exists():
                logger.error(f"File not found: {filepath}")
                sys.exit(1)
            result = await enrich_custodian_with_logos(
                filepath, crawler, args.dry_run, use_cache)
            logger.info(
                f"Result: {result['message']} ({result['logos_found']} logos)")
            return

        # Batch mode
        checkpoint = (load_checkpoint() if args.resume
                      else {'processed_files': [], 'last_index': 0})

        # Get all custodian files
        files = sorted(CUSTODIAN_DIR.glob('*.yaml'))

        # Apply country filter
        if args.country:
            files = [f for f in files if f.name.startswith(f"{args.country}-")]

        # Skip already processed (BEFORE applying limit)
        if args.resume:
            processed_set = set(checkpoint['processed_files'])
            files = [f for f in files if f.name not in processed_set]
            logger.info(f"Resuming: {len(processed_set)} files already processed, "
                        f"{len(files)} remaining")

        # Apply limit (AFTER resume filter)
        if args.limit > 0:
            files = files[:args.limit]

        logger.info(f"Processing {len(files)} custodian files...")

        stats = {
            'processed': 0,
            'success': 0,
            'failed': 0,
            'skipped': 0,
            'logos_found': 0,
        }

        for i, filepath in enumerate(files):
            try:
                result = await enrich_custodian_with_logos(
                    filepath, crawler, args.dry_run, use_cache)
                stats['processed'] += 1

                if result['success']:
                    if 'skipped' in result['message'].lower():
                        stats['skipped'] += 1
                    else:
                        stats['success'] += 1
                        stats['logos_found'] += result['logos_found']
                else:
                    stats['failed'] += 1

                # Update checkpoint
                checkpoint['processed_files'].append(filepath.name)
                checkpoint['last_index'] = i

                if (i + 1) % 10 == 0:
                    save_checkpoint(checkpoint)
                    logger.info(f"Progress: {i+1}/{len(files)} - "
                                f"{stats['logos_found']} logos found")

                # Rate limiting
                await asyncio.sleep(REQUEST_DELAY)

            except KeyboardInterrupt:
                logger.info("Interrupted - saving checkpoint...")
                save_checkpoint(checkpoint)
                break

        # Final checkpoint
        save_checkpoint(checkpoint)

        # Summary
        logger.info("\n" + "=" * 60)
        logger.info("LOGO ENRICHMENT SUMMARY (Crawl4AI)")
        logger.info("=" * 60)
        logger.info(f"Total processed: {stats['processed']}")
        logger.info(f"Successful: {stats['success']}")
        logger.info(f"Failed: {stats['failed']}")
        logger.info(f"Skipped (already enriched): {stats['skipped']}")
        logger.info(f"Total logos found: {stats['logos_found']}")
        logger.info("=" * 60)


if __name__ == '__main__':
    asyncio.run(main())