#!/usr/bin/env python3
"""
Enrich custodian YAML files with logo images using Firecrawl.

This script extracts logo URLs from heritage institution websites with
proper xpath provenance, following AGENTS.md Rule 6 (WebObservation
Claims MUST Have XPath Provenance).

Logo extraction looks for:
1. <link rel="icon"> / <link rel="apple-touch-icon"> (favicon/icon)
2. <meta property="og:image"> (Open Graph image)
3. <img> elements with logo/brand in class/id/alt attributes
4. SVG elements in header/nav regions

Output format follows WebClaim schema with:
- claim_type: logo_url, favicon_url, og_image_url
- claim_value: The extracted image URL
- source_url: Website where logo was found
- xpath: XPath to the element (for verification)
- xpath_match_score: Always 1.0 for direct attribute extraction
- retrieved_on: ISO 8601 timestamp
- html_file: Path to archived HTML (if available)

Usage:
    python scripts/enrich_custodian_logos.py [options]

Options:
    --dry-run       Show what would be enriched without modifying files
    --limit N       Process only first N files (for testing)
    --file PATH     Process a single specific file
    --country CODE  Filter by country code (e.g., NL, BE, DE)
    --resume        Resume from last checkpoint

Environment Variables:
    FIRECRAWL_API_KEY - Required API key for Firecrawl
"""

import argparse
import json
import logging
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urljoin

import httpx
import yaml
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
FIRECRAWL_API_BASE = "https://api.firecrawl.dev/v2"
CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"
CHECKPOINT_FILE = CUSTODIAN_DIR / ".logo_enrichment_checkpoint.json"
REQUEST_DELAY = 3.5  # seconds between requests (polite crawling / rate limits)

# API Key
FIRECRAWL_API_KEY = os.getenv("FIRECRAWL_API_KEY", "")

# Logo detection patterns - prioritized by specificity
LOGO_PATTERNS = {
    # High confidence patterns (explicit logo indicators)
    'high': [
        r'logo',
        r'brand',
        r'site-icon',
        r'site-logo',
        r'header-logo',
        r'nav-logo',
        r'navbar-brand',
        r'company-logo',
        r'organization-logo',
    ],
    # Medium confidence (common logo locations)
    'medium': [
        r'emblem',
        r'symbol',
        r'masthead',
        r'identity',
    ],
}


class FirecrawlClient:
    """Firecrawl API client for logo extraction."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = FIRECRAWL_API_BASE
        self.client = httpx.Client(
            timeout=60.0,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            }
        )

    def scrape_for_logos(self, url: str) -> dict | None:
        """
        Scrape a URL and extract logo-related elements.

        Returns dict with:
        - html: Raw HTML content
        - metadata: Extracted metadata (og:image, icons, etc.)
        - links: All links found on page

        Returns None on any failure (after retrying 429 responses).
        """
        payload = {
            "url": url,
            "formats": ["html", "links"],
            "onlyMainContent": False,  # Need full page for header/footer logos
            "maxAge": 172800000,  # 2 days cache
            "blockAds": True,
            # NOTE(review): disables TLS verification on the scraper side;
            # kept for parity with heritage sites with broken certs.
            "skipTlsVerification": True,
        }

        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = self.client.post(f"{self.base_url}/scrape", json=payload)
                response.raise_for_status()
                result = response.json()
                if result.get("success"):
                    return result.get("data")
                logger.warning(f"Scrape failed for {url}: {result}")
                return None
            except httpx.HTTPStatusError as e:
                # Back off and retry only on rate limiting (429).
                if e.response.status_code == 429 and attempt < max_retries - 1:
                    wait_time = (attempt + 1) * 10
                    logger.warning(f"Rate limited, waiting {wait_time}s...")
                    time.sleep(wait_time)
                    continue
                logger.error(f"HTTP error scraping {url}: {e.response.status_code}")
                return None
            except Exception as e:
                logger.error(f"Error scraping {url}: {e}")
                return None
        return None

    def close(self):
        """Release the underlying HTTP connection pool."""
        self.client.close()


def extract_logos_from_html(html: str, base_url: str) -> list[dict]:
    """
    Extract logo URLs from HTML content with xpath provenance.

    Returns a list of WebClaim-compatible dicts (claim_type, claim_value,
    source_url, xpath, xpath_match_score, retrieved_on, ...).
    """
    from html.parser import HTMLParser

    timestamp = datetime.now(timezone.utc).isoformat()

    # Parse HTML to find logo elements
    class LogoExtractor(HTMLParser):
        # HTML void elements never produce an end tag, so they must be
        # popped from the path immediately after processing. The original
        # implementation left them on the stack, which corrupted every
        # XPath emitted after the first <link>/<meta>/<img> on the page.
        VOID_TAGS = frozenset({
            'area', 'base', 'br', 'col', 'embed', 'hr', 'img', 'input',
            'link', 'meta', 'param', 'source', 'track', 'wbr',
        })

        def __init__(self):
            super().__init__()
            self.path = []  # stack of (tag, 1-based sibling index)
            self.index_stack = [{}]  # per-level counters of tag occurrences
            self.results = []

        def get_xpath(self):
            """Generate an absolute XPath expression for the current element."""
            if not self.path:
                return "/"
            return "/" + "/".join(f"{tag}[{idx}]" for tag, idx in self.path)

        def handle_starttag(self, tag, attrs):
            # Track element index at this level so XPaths are positional.
            parent_indices = self.index_stack[-1]
            parent_indices[tag] = parent_indices.get(tag, 0) + 1
            self.path.append((tag, parent_indices[tag]))
            self.index_stack.append({})

            attrs_dict = dict(attrs)

            # Check for favicon/icon links
            if tag == 'link':
                rel = (attrs_dict.get('rel') or '').lower()
                href = attrs_dict.get('href') or ''
                if 'icon' in rel and href:
                    icon_url = urljoin(base_url, href)
                    # Apple touch icons are typically high-res logos.
                    if 'apple-touch' in rel:
                        claim_type = 'logo_url'
                    else:
                        claim_type = 'favicon_url'
                    self.results.append({
                        'claim_type': claim_type,
                        'claim_value': icon_url,
                        'source_url': base_url,
                        'xpath': self.get_xpath() + "/@href",
                        'xpath_match_score': 1.0,
                        'retrieved_on': timestamp,
                        'extraction_method': 'link_rel_icon',
                    })

            # Check for og:image meta tag
            elif tag == 'meta':
                prop = (attrs_dict.get('property') or '').lower()
                name = (attrs_dict.get('name') or '').lower()
                content = attrs_dict.get('content') or ''
                if (prop == 'og:image' or name == 'og:image') and content:
                    og_url = urljoin(base_url, content)
                    self.results.append({
                        'claim_type': 'og_image_url',
                        'claim_value': og_url,
                        'source_url': base_url,
                        'xpath': self.get_xpath() + "/@content",
                        'xpath_match_score': 1.0,
                        'retrieved_on': timestamp,
                        'extraction_method': 'meta_og_image',
                    })

            # Check for img elements with logo indicators
            elif tag == 'img':
                src = attrs_dict.get('src') or ''
                alt = (attrs_dict.get('alt') or '').lower()
                cls = (attrs_dict.get('class') or '').lower()
                id_attr = (attrs_dict.get('id') or '').lower()

                # Check if this looks like a logo
                all_attrs = f"{alt} {cls} {id_attr} {src.lower()}"
                is_logo = False
                confidence = 'low'
                for pattern in LOGO_PATTERNS['high']:
                    if re.search(pattern, all_attrs, re.IGNORECASE):
                        is_logo = True
                        confidence = 'high'
                        break
                if not is_logo:
                    for pattern in LOGO_PATTERNS['medium']:
                        if re.search(pattern, all_attrs, re.IGNORECASE):
                            is_logo = True
                            confidence = 'medium'
                            break

                if is_logo and src:
                    img_url = urljoin(base_url, src)
                    # Skip data URLs and tiny tracking pixels
                    if not img_url.startswith('data:') and '1x1' not in img_url:
                        self.results.append({
                            'claim_type': 'logo_url',
                            'claim_value': img_url,
                            'source_url': base_url,
                            'xpath': self.get_xpath() + "/@src",
                            'xpath_match_score': 1.0,
                            'retrieved_on': timestamp,
                            'extraction_method': f'img_logo_detection_{confidence}',
                            'detection_confidence': confidence,
                        })

            # Check for SVG logos (inline; no URL, xpath only)
            elif tag == 'svg':
                cls = (attrs_dict.get('class') or '').lower()
                id_attr = (attrs_dict.get('id') or '').lower()
                all_attrs = f"{cls} {id_attr}"
                for pattern in LOGO_PATTERNS['high']:
                    if re.search(pattern, all_attrs, re.IGNORECASE):
                        self.results.append({
                            'claim_type': 'logo_url',
                            'claim_value': f"[SVG inline at {self.get_xpath()}]",
                            'source_url': base_url,
                            'xpath': self.get_xpath(),
                            'xpath_match_score': 1.0,
                            'retrieved_on': timestamp,
                            'extraction_method': 'svg_logo_detection',
                            'is_inline_svg': True,
                        })
                        break

            # Void elements have no end tag: pop them now so that the
            # XPaths of subsequent elements stay correct. Self-closing
            # syntax (<img/>) triggers handle_endtag too, but the tag
            # guard there prevents a double pop.
            if tag in self.VOID_TAGS:
                self.path.pop()
                self.index_stack.pop()

        def handle_endtag(self, tag):
            # Only pop on a matching tag; stray end tags in malformed
            # HTML would otherwise desynchronize the stack.
            if self.path and self.path[-1][0] == tag:
                self.path.pop()
                self.index_stack.pop()

    try:
        parser = LogoExtractor()
        parser.feed(html)
        return parser.results
    except Exception as e:
        logger.error(f"Error parsing HTML: {e}")
        return []


def deduplicate_logos(logos: list[dict]) -> list[dict]:
    """
    Deduplicate logos, keeping highest confidence for each URL.

    Priority order:
    1. logo_url (explicit logos) over favicon_url
    2. High confidence over medium/low
    3. First occurrence wins for ties
    """
    seen_urls = {}

    # Priority scoring
    type_priority = {'logo_url': 3, 'og_image_url': 2, 'favicon_url': 1}
    confidence_priority = {'high': 3, 'medium': 2, 'low': 1}

    def score(claim: dict) -> int:
        """Combined ranking: claim type dominates, confidence breaks ties."""
        return (
            type_priority.get(claim['claim_type'], 0) * 10
            + confidence_priority.get(claim.get('detection_confidence', 'low'), 1)
        )

    for logo in logos:
        url = logo['claim_value']
        if url.startswith('[SVG'):
            # Always keep inline SVGs (they're unique per xpath)
            key = logo['xpath']
        else:
            key = url

        if key not in seen_urls or score(logo) > score(seen_urls[key]):
            seen_urls[key] = logo

    return list(seen_urls.values())


def get_website_url(entry: dict) -> str | None:
    """
    Extract the best website URL from a custodian entry.

    Sources are tried in priority order: original entry webadres,
    museum register, Wikidata official website, Google Maps.
    Returns a normalized URL or None if no usable URL is found.
    """
    # (section, field) pairs in priority order. `or {}` guards against
    # sections that are present but YAML-null.
    sources = [
        ('original_entry', 'webadres_organisatie'),
        ('museum_register_enrichment', 'website_url'),
        ('wikidata_enrichment', 'wikidata_official_website'),
        ('google_maps_enrichment', 'website'),
    ]
    for section, field in sources:
        url = (entry.get(section) or {}).get(field)
        if url and url.strip() and url.strip().lower() != 'null':
            return normalize_url(url.strip())
    return None


def normalize_url(url: str) -> str:
    """Normalize URL: ensure an https scheme and strip trailing slashes."""
    if not url:
        return url
    url = url.strip()
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url
    url = url.rstrip('/')
    return url


def get_custodian_name(entry: dict) -> str:
    """Get a human-readable display name for a custodian entry."""
    if (entry.get('custodian_name') or {}).get('emic_name'):
        return entry['custodian_name']['emic_name']
    if (entry.get('original_entry') or {}).get('organisatie'):
        return entry['original_entry']['organisatie']
    if (entry.get('museum_register_enrichment') or {}).get('museum_name'):
        return entry['museum_register_enrichment']['museum_name']
    return "Unknown"


def load_checkpoint() -> dict:
    """Load the batch-progress checkpoint, or a fresh one if absent."""
    if CHECKPOINT_FILE.exists():
        with open(CHECKPOINT_FILE, 'r') as f:
            return json.load(f)
    return {'processed_files': [], 'last_index': 0}


def save_checkpoint(checkpoint: dict):
    """Persist the batch-progress checkpoint to disk."""
    with open(CHECKPOINT_FILE, 'w') as f:
        json.dump(checkpoint, f, indent=2)


def enrich_custodian_with_logos(
    filepath: Path,
    client: FirecrawlClient,
    dry_run: bool = False
) -> dict:
    """
    Enrich a single custodian YAML file with logo data.

    Scrapes the custodian's website, extracts and deduplicates logo
    claims, and writes a `logo_enrichment` section back to the file
    (unless dry_run).

    Returns dict with:
    - success: bool
    - logos_found: int
    - message: str
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            entry = yaml.safe_load(f)

        if not entry:
            return {'success': False, 'logos_found': 0, 'message': 'Empty file'}

        # Skip files that already carry logo claims (idempotent re-runs).
        if (entry.get('logo_enrichment') or {}).get('claims'):
            return {
                'success': True,
                'logos_found': len(entry['logo_enrichment']['claims']),
                'message': 'Already enriched (skipped)'
            }

        # Get website URL
        website_url = get_website_url(entry)
        if not website_url:
            return {'success': False, 'logos_found': 0, 'message': 'No website URL'}

        custodian_name = get_custodian_name(entry)
        logger.info(f"Processing: {custodian_name} ({website_url})")

        # Scrape website
        scrape_result = client.scrape_for_logos(website_url)
        if not scrape_result:
            return {'success': False, 'logos_found': 0, 'message': 'Scrape failed'}

        # Extract logos from HTML
        html = scrape_result.get('html', '')
        if not html:
            return {'success': False, 'logos_found': 0, 'message': 'No HTML content'}

        logos = extract_logos_from_html(html, website_url)
        logos = deduplicate_logos(logos)

        if not logos:
            return {'success': True, 'logos_found': 0, 'message': 'No logos found'}

        # Prepare enrichment data
        timestamp = datetime.now(timezone.utc).isoformat()
        logo_enrichment = {
            'enrichment_timestamp': timestamp,
            'source_url': website_url,
            'extraction_method': 'firecrawl_html_parsing',
            'claims': logos,
            'summary': {
                'total_logos_found': len(logos),
                'logo_urls': [
                    c['claim_value'] for c in logos
                    if c['claim_type'] == 'logo_url' and not c.get('is_inline_svg')
                ],
                'favicon_urls': [
                    c['claim_value'] for c in logos
                    if c['claim_type'] == 'favicon_url'
                ],
                'og_image_urls': [
                    c['claim_value'] for c in logos
                    if c['claim_type'] == 'og_image_url'
                ],
                'has_inline_svg': any(c.get('is_inline_svg') for c in logos),
            }
        }

        if dry_run:
            logger.info(f"  [DRY RUN] Would add {len(logos)} logo claims")
            return {'success': True, 'logos_found': len(logos), 'message': 'Dry run'}

        # Update entry
        entry['logo_enrichment'] = logo_enrichment

        # Add to provenance notes
        if 'provenance' not in entry:
            entry['provenance'] = {}
        if 'notes' not in entry['provenance']:
            entry['provenance']['notes'] = []
        entry['provenance']['notes'].append(
            f"Logo enrichment added on {timestamp} - {len(logos)} claims extracted"
        )

        # Save updated entry
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(entry, f, allow_unicode=True, default_flow_style=False,
                      sort_keys=False)

        return {
            'success': True,
            'logos_found': len(logos),
            'message': f'Added {len(logos)} logo claims'
        }

    except Exception as e:
        logger.error(f"Error processing {filepath}: {e}")
        return {'success': False, 'logos_found': 0, 'message': str(e)}


def main():
    parser = argparse.ArgumentParser(description='Enrich custodian files with logo data')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done')
    parser.add_argument('--limit', type=int, default=0, help='Process only N files')
    parser.add_argument('--file', type=str, help='Process a single file')
    parser.add_argument('--country', type=str, help='Filter by country code')
    parser.add_argument('--resume', action='store_true', help='Resume from checkpoint')
    args = parser.parse_args()

    if not FIRECRAWL_API_KEY:
        logger.error("FIRECRAWL_API_KEY environment variable not set")
        sys.exit(1)

    client = FirecrawlClient(FIRECRAWL_API_KEY)

    try:
        # Single file mode
        if args.file:
            filepath = Path(args.file)
            if not filepath.exists():
                logger.error(f"File not found: {filepath}")
                sys.exit(1)
            result = enrich_custodian_with_logos(filepath, client, args.dry_run)
            logger.info(f"Result: {result['message']} ({result['logos_found']} logos)")
            return

        # Batch mode
        checkpoint = (load_checkpoint() if args.resume
                      else {'processed_files': [], 'last_index': 0})

        # Get all custodian files
        files = sorted(CUSTODIAN_DIR.glob('*.yaml'))

        # Apply country filter
        if args.country:
            files = [f for f in files if f.name.startswith(f"{args.country}-")]

        # Apply limit
        if args.limit > 0:
            files = files[:args.limit]

        # Skip already processed
        if args.resume:
            files = [f for f in files if f.name not in checkpoint['processed_files']]

        logger.info(f"Processing {len(files)} custodian files...")

        stats = {
            'processed': 0,
            'success': 0,
            'failed': 0,
            'skipped': 0,
            'logos_found': 0,
        }

        for i, filepath in enumerate(files):
            try:
                result = enrich_custodian_with_logos(filepath, client, args.dry_run)
                stats['processed'] += 1
                if result['success']:
                    if 'skipped' in result['message'].lower():
                        stats['skipped'] += 1
                    else:
                        stats['success'] += 1
                        stats['logos_found'] += result['logos_found']
                else:
                    stats['failed'] += 1

                # Update checkpoint (failed files are recorded too, so a
                # resumed run does not retry them)
                checkpoint['processed_files'].append(filepath.name)
                checkpoint['last_index'] = i

                if (i + 1) % 10 == 0:
                    save_checkpoint(checkpoint)
                    logger.info(
                        f"Progress: {i+1}/{len(files)} - "
                        f"{stats['logos_found']} logos found"
                    )

                # Rate limiting
                time.sleep(REQUEST_DELAY)

            except KeyboardInterrupt:
                logger.info("Interrupted - saving checkpoint...")
                save_checkpoint(checkpoint)
                break

        # Final checkpoint
        save_checkpoint(checkpoint)

        # Summary
        logger.info("\n" + "=" * 60)
        logger.info("LOGO ENRICHMENT SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Total processed: {stats['processed']}")
        logger.info(f"Successful: {stats['success']}")
        logger.info(f"Failed: {stats['failed']}")
        logger.info(f"Skipped (already enriched): {stats['skipped']}")
        logger.info(f"Total logos found: {stats['logos_found']}")
        logger.info("=" * 60)

    finally:
        client.close()


if __name__ == '__main__':
    main()