glam/scripts/enrich_custodian_logos.py
2025-12-21 22:12:34 +01:00

628 lines
22 KiB
Python

#!/usr/bin/env python3
"""
Enrich custodian YAML files with logo images using Firecrawl.
This script extracts logo URLs from heritage institution websites with proper
xpath provenance, following AGENTS.md Rule 6 (WebObservation Claims MUST Have XPath Provenance).
Logo extraction looks for:
1. <link rel="icon"> or <link rel="apple-touch-icon"> (favicon/icon)
2. <meta property="og:image"> (Open Graph image)
3. <img> elements with logo/brand in class/id/alt attributes
4. SVG elements in header/nav regions
Output format follows WebClaim schema with:
- claim_type: logo_url, favicon_url, og_image_url
- claim_value: The extracted image URL
- source_url: Website where logo was found
- xpath: XPath to the element (for verification)
- xpath_match_score: Always 1.0 for direct attribute extraction
- retrieved_on: ISO 8601 timestamp
- html_file: Path to archived HTML (if available)
Usage:
python scripts/enrich_custodian_logos.py [options]
Options:
--dry-run Show what would be enriched without modifying files
--limit N Process only first N files (for testing)
--file PATH Process a single specific file
--country CODE Filter by country code (e.g., NL, BE, DE)
--resume Resume from last checkpoint
Environment Variables:
FIRECRAWL_API_KEY - Required API key for Firecrawl
"""
import argparse
import json
import logging
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urljoin, urlparse
import httpx
import yaml
from dotenv import load_dotenv
# Load environment variables from a local .env file (supplies FIRECRAWL_API_KEY).
load_dotenv()

# Logging: timestamped INFO-level messages.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
FIRECRAWL_API_BASE = "https://api.firecrawl.dev/v2"  # Firecrawl v2 REST endpoint
CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"  # custodian YAML files live here
CHECKPOINT_FILE = CUSTODIAN_DIR / ".logo_enrichment_checkpoint.json"  # batch-resume state
REQUEST_DELAY = 3.5  # seconds between requests (politeness / rate limiting)

# API Key (required; presence is validated in main()).
FIRECRAWL_API_KEY = os.getenv("FIRECRAWL_API_KEY", "")

# Logo detection patterns - prioritized by specificity.
# Matched case-insensitively against an <img>/<svg> element's alt/class/id/src text.
LOGO_PATTERNS = {
    # High confidence patterns (explicit logo indicators)
    'high': [
        r'logo',
        r'brand',
        r'site-icon',
        r'site-logo',
        r'header-logo',
        r'nav-logo',
        r'navbar-brand',
        r'company-logo',
        r'organization-logo',
    ],
    # Medium confidence (common logo locations)
    'medium': [
        r'emblem',
        r'symbol',
        r'masthead',
        r'identity',
    ],
}
class FirecrawlClient:
    """Thin HTTP client around the Firecrawl scrape endpoint for logo extraction."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = FIRECRAWL_API_BASE
        self.client = httpx.Client(
            timeout=60.0,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
        )

    def scrape_for_logos(self, url: str) -> dict | None:
        """
        Scrape *url* via Firecrawl and return the raw scrape payload.

        On success the returned dict carries:
        - html: raw HTML content
        - metadata: extracted metadata (og:image, icons, etc.)
        - links: all links found on the page

        Returns None on any failure (after up to 3 attempts on HTTP 429).
        """
        request_body = {
            "url": url,
            "formats": ["html", "links"],
            "onlyMainContent": False,  # header/footer logos need the full page
            "maxAge": 172800000,  # accept cached results up to 2 days old
            "blockAds": True,
            "skipTlsVerification": True,
        }
        attempts = 3
        for attempt in range(attempts):
            try:
                response = self.client.post(f"{self.base_url}/scrape", json=request_body)
                response.raise_for_status()
                result = response.json()
                if result.get("success"):
                    return result.get("data")
                logger.warning(f"Scrape failed for {url}: {result}")
                return None
            except httpx.HTTPStatusError as e:
                # Back off and retry only on rate limiting; give up otherwise.
                if e.response.status_code == 429 and attempt < attempts - 1:
                    wait_time = (attempt + 1) * 10
                    logger.warning(f"Rate limited, waiting {wait_time}s...")
                    time.sleep(wait_time)
                    continue
                logger.error(f"HTTP error scraping {url}: {e.response.status_code}")
                return None
            except Exception as e:
                logger.error(f"Error scraping {url}: {e}")
                return None
        return None

    def close(self):
        """Release the underlying HTTP connection pool."""
        self.client.close()
def extract_logos_from_html(html: str, base_url: str) -> list[dict]:
    """
    Extract logo URLs from HTML content with xpath provenance.

    Scans for favicon/apple-touch <link> tags, og:image <meta> tags, <img>
    elements whose attributes match LOGO_PATTERNS, and inline <svg> logos.

    Args:
        html: Raw HTML of the page.
        base_url: URL the HTML came from; used to resolve relative URLs and
            recorded as each claim's source_url.

    Returns:
        List of WebClaim-compatible dicts (claim_type, claim_value,
        source_url, xpath, xpath_match_score, retrieved_on, ...).
        Returns [] if the HTML cannot be parsed.
    """
    from html.parser import HTMLParser
    timestamp = datetime.now(timezone.utc).isoformat()

    # HTML void elements never emit an end tag, so they must be popped from
    # the element path as soon as they are processed.  Without this, every
    # <link>/<meta>/<img> stayed on the path forever and all subsequent
    # XPaths were wrongly nested beneath them (the original bug).
    VOID_ELEMENTS = frozenset({
        'area', 'base', 'br', 'col', 'embed', 'hr', 'img', 'input',
        'link', 'meta', 'param', 'source', 'track', 'wbr',
    })

    class LogoExtractor(HTMLParser):
        def __init__(self):
            super().__init__()
            self.path = []           # [(tag, 1-based sibling index), ...]
            self.index_stack = [{}]  # per-nesting-level tag -> occurrence count
            self.results = []

        def get_xpath(self):
            """Generate an XPath expression for the current element path."""
            if not self.path:
                return "/"
            return "/" + "/".join(f"{tag}[{idx}]" for tag, idx in self.path)

        def handle_startendtag(self, tag, attrs):
            # Explicitly self-closed tags (e.g. <svg .../>): run start-tag
            # processing; void elements pop themselves there, so only emit
            # a synthetic end tag for non-void elements.
            self.handle_starttag(tag, attrs)
            if tag not in VOID_ELEMENTS:
                self.handle_endtag(tag)

        def handle_starttag(self, tag, attrs):
            # Track this element's 1-based index among same-tag siblings.
            siblings = self.index_stack[-1]
            siblings[tag] = siblings.get(tag, 0) + 1
            self.path.append((tag, siblings[tag]))
            self.index_stack.append({})
            try:
                self._inspect(tag, dict(attrs))
            finally:
                # BUGFIX: void elements get no closing tag, so unwind them
                # here to keep the path correct for subsequent elements.
                if tag in VOID_ELEMENTS:
                    self.path.pop()
                    self.index_stack.pop()

        def _inspect(self, tag, attrs_dict):
            """Record a claim if this element looks like a logo/icon/og:image."""
            # Favicon / apple-touch icon links.
            if tag == 'link':
                rel = (attrs_dict.get('rel') or '').lower()
                href = attrs_dict.get('href') or ''
                if 'icon' in rel and href:
                    icon_url = urljoin(base_url, href)
                    # Apple touch icons are typically high-res logos.
                    claim_type = 'logo_url' if 'apple-touch' in rel else 'favicon_url'
                    self.results.append({
                        'claim_type': claim_type,
                        'claim_value': icon_url,
                        'source_url': base_url,
                        'xpath': self.get_xpath() + "/@href",
                        'xpath_match_score': 1.0,
                        'retrieved_on': timestamp,
                        'extraction_method': 'link_rel_icon',
                    })
            # Open Graph image meta tag.
            elif tag == 'meta':
                prop = (attrs_dict.get('property') or '').lower()
                name = (attrs_dict.get('name') or '').lower()
                content = attrs_dict.get('content') or ''
                if (prop == 'og:image' or name == 'og:image') and content:
                    self.results.append({
                        'claim_type': 'og_image_url',
                        'claim_value': urljoin(base_url, content),
                        'source_url': base_url,
                        'xpath': self.get_xpath() + "/@content",
                        'xpath_match_score': 1.0,
                        'retrieved_on': timestamp,
                        'extraction_method': 'meta_og_image',
                    })
            # <img> elements whose attributes suggest a logo.
            elif tag == 'img':
                src = attrs_dict.get('src') or ''
                alt = (attrs_dict.get('alt') or '').lower()
                cls = (attrs_dict.get('class') or '').lower()
                id_attr = (attrs_dict.get('id') or '').lower()
                all_attrs = f"{alt} {cls} {id_attr} {src.lower()}"
                confidence = None
                for level in ('high', 'medium'):
                    if any(re.search(p, all_attrs, re.IGNORECASE)
                           for p in LOGO_PATTERNS[level]):
                        confidence = level
                        break
                if confidence and src:
                    img_url = urljoin(base_url, src)
                    # Skip data URLs and tiny tracking pixels.
                    if not img_url.startswith('data:') and '1x1' not in img_url:
                        self.results.append({
                            'claim_type': 'logo_url',
                            'claim_value': img_url,
                            'source_url': base_url,
                            'xpath': self.get_xpath() + "/@src",
                            'xpath_match_score': 1.0,
                            'retrieved_on': timestamp,
                            'extraction_method': f'img_logo_detection_{confidence}',
                            'detection_confidence': confidence,
                        })
            # Inline SVG logos (no fetchable URL; record a placeholder value).
            elif tag == 'svg':
                cls = (attrs_dict.get('class') or '').lower()
                id_attr = (attrs_dict.get('id') or '').lower()
                all_attrs = f"{cls} {id_attr}"
                if any(re.search(p, all_attrs, re.IGNORECASE)
                       for p in LOGO_PATTERNS['high']):
                    self.results.append({
                        'claim_type': 'logo_url',
                        'claim_value': f"[SVG inline at {self.get_xpath()}]",
                        'source_url': base_url,
                        'xpath': self.get_xpath(),
                        'xpath_match_score': 1.0,
                        'retrieved_on': timestamp,
                        'extraction_method': 'svg_logo_detection',
                        'is_inline_svg': True,
                    })

        def handle_endtag(self, tag):
            # Pop only on a matching close tag; tolerates stray end tags.
            if self.path and self.path[-1][0] == tag:
                self.path.pop()
                self.index_stack.pop()

    try:
        parser = LogoExtractor()
        parser.feed(html)
        return parser.results
    except Exception as e:
        logger.error(f"Error parsing HTML: {e}")
        return []
def deduplicate_logos(logos: list[dict]) -> list[dict]:
    """
    Deduplicate logo claims, keeping the best claim for each URL.

    Ranking:
    1. logo_url beats og_image_url beats favicon_url
    2. high confidence beats medium beats low
    3. On an exact tie, the earliest claim wins

    Inline SVG placeholders are keyed by xpath instead of value, so each
    distinct inline SVG is always kept.
    """
    type_rank = {'logo_url': 3, 'og_image_url': 2, 'favicon_url': 1}
    conf_rank = {'high': 3, 'medium': 2, 'low': 1}

    def rank(claim: dict) -> int:
        # Claim type dominates; confidence breaks ties within a type.
        return (type_rank.get(claim['claim_type'], 0) * 10
                + conf_rank.get(claim.get('detection_confidence', 'low'), 1))

    best: dict[str, dict] = {}
    for claim in logos:
        value = claim['claim_value']
        key = claim['xpath'] if value.startswith('[SVG') else value
        incumbent = best.get(key)
        # Strict '>' keeps the first claim on ties.
        if incumbent is None or rank(claim) > rank(incumbent):
            best[key] = claim
    return list(best.values())
def get_website_url(entry: dict) -> str | None:
"""Extract website URL from custodian entry."""
# Priority 1: Original entry webadres
if entry.get('original_entry', {}).get('webadres_organisatie'):
url = entry['original_entry']['webadres_organisatie']
if url and url.strip() and url.strip().lower() != 'null':
return normalize_url(url.strip())
# Priority 2: Museum register website
if entry.get('museum_register_enrichment', {}).get('website_url'):
url = entry['museum_register_enrichment']['website_url']
if url and url.strip():
return normalize_url(url.strip())
# Priority 3: Wikidata official website
if entry.get('wikidata_enrichment', {}).get('wikidata_official_website'):
url = entry['wikidata_enrichment']['wikidata_official_website']
if url and url.strip():
return normalize_url(url.strip())
# Priority 4: Google Maps website
if entry.get('google_maps_enrichment', {}).get('website'):
url = entry['google_maps_enrichment']['website']
if url and url.strip():
return normalize_url(url.strip())
return None
def normalize_url(url: str) -> str:
    """Trim *url*, default to an https:// scheme, and drop any trailing slash."""
    if not url:
        return url
    cleaned = url.strip()
    if not cleaned.startswith(('http://', 'https://')):
        cleaned = f'https://{cleaned}'
    return cleaned.rstrip('/')
def get_custodian_name(entry: dict) -> str:
    """Return the best available display name for a custodian entry."""
    # Preference order: emic name, original registry name, museum register name.
    for section, field in (
        ('custodian_name', 'emic_name'),
        ('original_entry', 'organisatie'),
        ('museum_register_enrichment', 'museum_name'),
    ):
        name = entry.get(section, {}).get(field)
        if name:
            return name
    return "Unknown"
def load_checkpoint() -> dict:
    """
    Load the batch-progress checkpoint, or a fresh one if none exists.

    Returns a dict with 'processed_files' (list of file names already
    handled) and 'last_index' (index of the last processed file).
    """
    if CHECKPOINT_FILE.exists():
        # encoding specified for consistency with the other file I/O in
        # this script (YAML reads/writes use encoding='utf-8' too).
        with open(CHECKPOINT_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    return {'processed_files': [], 'last_index': 0}
def save_checkpoint(checkpoint: dict):
    """Persist the batch-progress checkpoint as pretty-printed JSON."""
    # encoding specified for consistency with the other file I/O in this script.
    with open(CHECKPOINT_FILE, 'w', encoding='utf-8') as f:
        json.dump(checkpoint, f, indent=2)
def enrich_custodian_with_logos(
    filepath: Path,
    client: FirecrawlClient,
    dry_run: bool = False
) -> dict:
    """
    Enrich a single custodian YAML file with logo data.

    Loads the YAML, skips files already enriched, scrapes the custodian's
    website via Firecrawl, extracts and deduplicates logo claims, and
    (unless dry_run) writes the enriched YAML back in place.

    Returns dict with:
    - success: bool
    - logos_found: int
    - message: str
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            entry = yaml.safe_load(f)
        if not entry:
            return {'success': False, 'logos_found': 0, 'message': 'Empty file'}
        # Check if already has logo enrichment — makes reruns idempotent.
        if entry.get('logo_enrichment', {}).get('claims'):
            return {
                'success': True,
                'logos_found': len(entry['logo_enrichment']['claims']),
                'message': 'Already enriched (skipped)'
            }
        # Get website URL (consults several enrichment sources in priority order).
        website_url = get_website_url(entry)
        if not website_url:
            return {'success': False, 'logos_found': 0, 'message': 'No website URL'}
        custodian_name = get_custodian_name(entry)
        logger.info(f"Processing: {custodian_name} ({website_url})")
        # Scrape website
        scrape_result = client.scrape_for_logos(website_url)
        if not scrape_result:
            return {'success': False, 'logos_found': 0, 'message': 'Scrape failed'}
        # Extract logos from HTML
        html = scrape_result.get('html', '')
        if not html:
            return {'success': False, 'logos_found': 0, 'message': 'No HTML content'}
        logos = extract_logos_from_html(html, website_url)
        logos = deduplicate_logos(logos)
        if not logos:
            # Not an error: the page simply exposed no recognizable logo.
            return {'success': True, 'logos_found': 0, 'message': 'No logos found'}
        # Prepare enrichment data: the full claim list plus a quick-access summary.
        timestamp = datetime.now(timezone.utc).isoformat()
        logo_enrichment = {
            'enrichment_timestamp': timestamp,
            'source_url': website_url,
            'extraction_method': 'firecrawl_html_parsing',
            'claims': logos,
            'summary': {
                'total_logos_found': len(logos),
                # Inline SVG claims carry placeholder values, not fetchable URLs,
                # so they are excluded from logo_urls and flagged separately.
                'logo_urls': [l['claim_value'] for l in logos if l['claim_type'] == 'logo_url' and not l.get('is_inline_svg')],
                'favicon_urls': [l['claim_value'] for l in logos if l['claim_type'] == 'favicon_url'],
                'og_image_urls': [l['claim_value'] for l in logos if l['claim_type'] == 'og_image_url'],
                'has_inline_svg': any(l.get('is_inline_svg') for l in logos),
            }
        }
        if dry_run:
            logger.info(f" [DRY RUN] Would add {len(logos)} logo claims")
            return {'success': True, 'logos_found': len(logos), 'message': 'Dry run'}
        # Update entry
        entry['logo_enrichment'] = logo_enrichment
        # Add to provenance notes — audit trail of when and how many claims were added.
        if 'provenance' not in entry:
            entry['provenance'] = {}
        if 'notes' not in entry['provenance']:
            entry['provenance']['notes'] = []
        entry['provenance']['notes'].append(
            f"Logo enrichment added on {timestamp} - {len(logos)} claims extracted"
        )
        # Save updated entry (preserve key order and unicode as authored).
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(entry, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
        return {
            'success': True,
            'logos_found': len(logos),
            'message': f'Added {len(logos)} logo claims'
        }
    except Exception as e:
        # Broad catch keeps a single bad file from aborting a whole batch run.
        logger.error(f"Error processing {filepath}: {e}")
        return {'success': False, 'logos_found': 0, 'message': str(e)}
def main():
    """CLI entry point: enrich custodian YAML files with logo claims."""
    parser = argparse.ArgumentParser(description='Enrich custodian files with logo data')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done')
    parser.add_argument('--limit', type=int, default=0, help='Process only N files')
    parser.add_argument('--file', type=str, help='Process a single file')
    parser.add_argument('--country', type=str, help='Filter by country code')
    parser.add_argument('--resume', action='store_true', help='Resume from checkpoint')
    args = parser.parse_args()
    if not FIRECRAWL_API_KEY:
        logger.error("FIRECRAWL_API_KEY environment variable not set")
        sys.exit(1)
    client = FirecrawlClient(FIRECRAWL_API_KEY)
    try:
        # Single file mode
        if args.file:
            filepath = Path(args.file)
            if not filepath.exists():
                logger.error(f"File not found: {filepath}")
                sys.exit(1)
            result = enrich_custodian_with_logos(filepath, client, args.dry_run)
            logger.info(f"Result: {result['message']} ({result['logos_found']} logos)")
            return
        # Batch mode: optionally restore progress from the checkpoint file.
        checkpoint = load_checkpoint() if args.resume else {'processed_files': [], 'last_index': 0}
        # Get all custodian files
        files = sorted(CUSTODIAN_DIR.glob('*.yaml'))
        # Apply country filter (file names appear to be prefixed "<CC>-" — TODO confirm)
        if args.country:
            files = [f for f in files if f.name.startswith(f"{args.country}-")]
        # Apply limit
        if args.limit > 0:
            files = files[:args.limit]
        # Skip already processed
        if args.resume:
            files = [f for f in files if f.name not in checkpoint['processed_files']]
        logger.info(f"Processing {len(files)} custodian files...")
        stats = {
            'processed': 0,
            'success': 0,
            'failed': 0,
            'skipped': 0,
            'logos_found': 0,
        }
        for i, filepath in enumerate(files):
            try:
                result = enrich_custodian_with_logos(filepath, client, args.dry_run)
                stats['processed'] += 1
                if result['success']:
                    if 'skipped' in result['message'].lower():
                        stats['skipped'] += 1
                    else:
                        stats['success'] += 1
                        stats['logos_found'] += result['logos_found']
                else:
                    stats['failed'] += 1
                # Update checkpoint in memory; persist it every 10 files.
                checkpoint['processed_files'].append(filepath.name)
                checkpoint['last_index'] = i
                if (i + 1) % 10 == 0:
                    save_checkpoint(checkpoint)
                    logger.info(f"Progress: {i+1}/{len(files)} - {stats['logos_found']} logos found")
                # Rate limiting
                time.sleep(REQUEST_DELAY)
            except KeyboardInterrupt:
                # Ctrl-C: persist progress so a --resume run can pick up here.
                logger.info("Interrupted - saving checkpoint...")
                save_checkpoint(checkpoint)
                break
        # Final checkpoint
        save_checkpoint(checkpoint)
        # Summary
        logger.info("\n" + "="*60)
        logger.info("LOGO ENRICHMENT SUMMARY")
        logger.info("="*60)
        logger.info(f"Total processed: {stats['processed']}")
        logger.info(f"Successful: {stats['success']}")
        logger.info(f"Failed: {stats['failed']}")
        logger.info(f"Skipped (already enriched): {stats['skipped']}")
        logger.info(f"Total logos found: {stats['logos_found']}")
        logger.info("="*60)
    finally:
        # Always release the HTTP connection pool, even on error/interrupt.
        client.close()

if __name__ == '__main__':
    main()