#!/usr/bin/env python3
"""
Enrich custodian YAML files with logo images using Firecrawl.

This script extracts logo URLs from heritage institution websites with proper
xpath provenance, following AGENTS.md Rule 6 (WebObservation Claims MUST Have
XPath Provenance).

Logo extraction looks for:
1. <link rel="icon"> or <link rel="apple-touch-icon"> (favicon/icon)
2. <meta property="og:image"> (Open Graph image)
3. <img> elements with logo/brand in class/id/alt attributes
4. SVG elements in header/nav regions

Output format follows WebClaim schema with:
- claim_type: logo_url, favicon_url, og_image_url
- claim_value: The extracted image URL
- source_url: Website where logo was found
- xpath: XPath to the element (for verification)
- xpath_match_score: Always 1.0 for direct attribute extraction
- retrieved_on: ISO 8601 timestamp
- html_file: Path to archived HTML (if available)

Usage:
    python scripts/enrich_custodian_logos.py [options]

Options:
    --dry-run        Show what would be enriched without modifying files
    --limit N        Process only first N files (for testing)
    --file PATH      Process a single specific file
    --country CODE   Filter by country code (e.g., NL, BE, DE)
    --resume         Resume from last checkpoint

Environment Variables:
    FIRECRAWL_API_KEY - Required API key for Firecrawl
"""

import argparse
import json
import logging
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urljoin, urlparse

import httpx
import yaml
from dotenv import load_dotenv

# Load environment variables from a local .env file (if present) so
# FIRECRAWL_API_KEY can be supplied without exporting it in the shell.
load_dotenv()

# Logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
FIRECRAWL_API_BASE = "https://api.firecrawl.dev/v2"
# Custodian YAML files live under <repo>/data/custodian relative to this script.
CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"
CHECKPOINT_FILE = CUSTODIAN_DIR / ".logo_enrichment_checkpoint.json"
REQUEST_DELAY = 3.5  # seconds between requests (client-side rate limiting)

# API Key (validated at startup in main(); empty string means "not set")
FIRECRAWL_API_KEY = os.getenv("FIRECRAWL_API_KEY", "")

# Logo detection patterns - prioritized by specificity.
# Matched (case-insensitively) against the pooled alt/class/id/src attributes
# of <img> tags and class/id of inline <svg> elements.
LOGO_PATTERNS = {
    # High confidence patterns (explicit logo indicators)
    'high': [
        r'logo',
        r'brand',
        r'site-icon',
        r'site-logo',
        r'header-logo',
        r'nav-logo',
        r'navbar-brand',
        r'company-logo',
        r'organization-logo',
    ],
    # Medium confidence (common logo locations)
    'medium': [
        r'emblem',
        r'symbol',
        r'masthead',
        r'identity',
    ],
}
|
|
|
|
|
|
class FirecrawlClient:
    """Firecrawl API client for logo extraction."""

    def __init__(self, api_key: str):
        """Create a persistent, bearer-authenticated HTTP client."""
        self.api_key = api_key
        self.base_url = FIRECRAWL_API_BASE
        self.client = httpx.Client(
            timeout=60.0,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            }
        )

    def scrape_for_logos(self, url: str) -> dict | None:
        """
        Scrape a URL and extract logo-related elements.

        Returns dict with:
        - html: Raw HTML content
        - metadata: Extracted metadata (og:image, icons, etc.)
        - links: All links found on page

        Returns None when the scrape fails (after retrying 429 responses).
        """
        payload = {
            "url": url,
            "formats": ["html", "links"],
            "onlyMainContent": False,  # Need full page for header/footer logos
            "maxAge": 172800000,  # 2 days cache
            "blockAds": True,
            "skipTlsVerification": True,
        }

        max_retries = 3
        attempt = 0
        while attempt < max_retries:
            try:
                response = self.client.post(f"{self.base_url}/scrape", json=payload)
                response.raise_for_status()
                result = response.json()
            except httpx.HTTPStatusError as e:
                # Back off and retry only on rate limiting; any other HTTP
                # status is treated as a permanent failure for this URL.
                if e.response.status_code == 429 and attempt < max_retries - 1:
                    wait_time = (attempt + 1) * 10
                    logger.warning(f"Rate limited, waiting {wait_time}s...")
                    time.sleep(wait_time)
                    attempt += 1
                    continue
                logger.error(f"HTTP error scraping {url}: {e.response.status_code}")
                return None
            except Exception as e:
                logger.error(f"Error scraping {url}: {e}")
                return None

            # HTTP layer succeeded; honor the API-level success flag.
            if result.get("success"):
                return result.get("data")
            logger.warning(f"Scrape failed for {url}: {result}")
            return None

        return None

    def close(self):
        """Release the underlying HTTP connection pool."""
        self.client.close()
|
|
|
|
|
|
def extract_logos_from_html(html: str, base_url: str) -> list[dict]:
    """
    Extract logo URLs from HTML content with xpath provenance.

    Scans for <link rel="...icon...">, <meta property="og:image">, <img>
    elements whose attributes match LOGO_PATTERNS, and inline <svg> logos.

    Args:
        html: Raw HTML of the page.
        base_url: URL the HTML was fetched from; used to resolve relative
            references and recorded as source_url on every claim.

    Returns:
        List of WebClaim-compatible dicts (claim_type, claim_value,
        source_url, xpath, xpath_match_score, retrieved_on, ...).
        Returns [] if the HTML cannot be parsed.
    """
    from html.parser import HTMLParser

    timestamp = datetime.now(timezone.utc).isoformat()

    # HTML void elements never receive an end tag. They must be popped from
    # the xpath stack as soon as they are processed. (Bug fix: previously
    # they stayed on the stack forever, so every xpath generated after the
    # first <link>/<meta>/<img> was wrongly nested under it.)
    void_elements = {
        'area', 'base', 'br', 'col', 'embed', 'hr', 'img', 'input',
        'link', 'meta', 'param', 'source', 'track', 'wbr',
    }

    class LogoExtractor(HTMLParser):
        def __init__(self):
            super().__init__()
            self.path = []           # stack of (tag, sibling_index) pairs
            self.index_stack = [{}]  # per-level count of each tag name seen
            self.results = []        # collected WebClaim dicts

        def get_xpath(self):
            """Generate an indexed XPath (e.g. /html[1]/head[1]/link[2])."""
            if not self.path:
                return "/"
            return "/" + "/".join(f"{tag}[{idx}]" for tag, idx in self.path)

        def handle_starttag(self, tag, attrs):
            # Track the 1-based index of this tag among same-named siblings.
            parent_indices = self.index_stack[-1]
            parent_indices[tag] = parent_indices.get(tag, 0) + 1

            self.path.append((tag, parent_indices[tag]))
            self.index_stack.append({})

            attrs_dict = dict(attrs)

            # Check for favicon/icon links
            if tag == 'link':
                rel = (attrs_dict.get('rel') or '').lower()
                href = attrs_dict.get('href') or ''

                if 'icon' in rel and href:
                    icon_url = urljoin(base_url, href)
                    # Determine claim type
                    if 'apple-touch' in rel:
                        claim_type = 'logo_url'  # Apple touch icons are typically high-res logos
                    else:
                        claim_type = 'favicon_url'

                    self.results.append({
                        'claim_type': claim_type,
                        'claim_value': icon_url,
                        'source_url': base_url,
                        'xpath': self.get_xpath() + "/@href",
                        'xpath_match_score': 1.0,
                        'retrieved_on': timestamp,
                        'extraction_method': 'link_rel_icon',
                    })

            # Check for og:image meta tag
            elif tag == 'meta':
                prop = (attrs_dict.get('property') or '').lower()
                name = (attrs_dict.get('name') or '').lower()
                content = attrs_dict.get('content') or ''

                if (prop == 'og:image' or name == 'og:image') and content:
                    og_url = urljoin(base_url, content)
                    self.results.append({
                        'claim_type': 'og_image_url',
                        'claim_value': og_url,
                        'source_url': base_url,
                        'xpath': self.get_xpath() + "/@content",
                        'xpath_match_score': 1.0,
                        'retrieved_on': timestamp,
                        'extraction_method': 'meta_og_image',
                    })

            # Check for img elements with logo indicators
            elif tag == 'img':
                src = attrs_dict.get('src') or ''
                alt = (attrs_dict.get('alt') or '').lower()
                cls = (attrs_dict.get('class') or '').lower()
                id_attr = (attrs_dict.get('id') or '').lower()

                # Pool every signal attribute into one searchable string.
                all_attrs = f"{alt} {cls} {id_attr} {src.lower()}"
                is_logo = False
                confidence = 'low'

                for pattern in LOGO_PATTERNS['high']:
                    if re.search(pattern, all_attrs, re.IGNORECASE):
                        is_logo = True
                        confidence = 'high'
                        break

                if not is_logo:
                    for pattern in LOGO_PATTERNS['medium']:
                        if re.search(pattern, all_attrs, re.IGNORECASE):
                            is_logo = True
                            confidence = 'medium'
                            break

                if is_logo and src:
                    img_url = urljoin(base_url, src)
                    # Skip data URLs and tiny tracking pixels
                    if not img_url.startswith('data:') and '1x1' not in img_url:
                        self.results.append({
                            'claim_type': 'logo_url',
                            'claim_value': img_url,
                            'source_url': base_url,
                            'xpath': self.get_xpath() + "/@src",
                            'xpath_match_score': 1.0,
                            'retrieved_on': timestamp,
                            'extraction_method': f'img_logo_detection_{confidence}',
                            'detection_confidence': confidence,
                        })

            # Check for SVG logos (inline, so the claim carries the xpath
            # rather than a fetchable URL)
            elif tag == 'svg':
                cls = (attrs_dict.get('class') or '').lower()
                id_attr = (attrs_dict.get('id') or '').lower()

                all_attrs = f"{cls} {id_attr}"
                for pattern in LOGO_PATTERNS['high']:
                    if re.search(pattern, all_attrs, re.IGNORECASE):
                        self.results.append({
                            'claim_type': 'logo_url',
                            'claim_value': f"[SVG inline at {self.get_xpath()}]",
                            'source_url': base_url,
                            'xpath': self.get_xpath(),
                            'xpath_match_score': 1.0,
                            'retrieved_on': timestamp,
                            'extraction_method': 'svg_logo_detection',
                            'is_inline_svg': True,
                        })
                        break

            # Void elements have no end tag: pop them now so the xpath stack
            # stays aligned with the real document structure.
            if tag in void_elements:
                self.path.pop()
                self.index_stack.pop()

        def handle_endtag(self, tag):
            # Void elements were already popped in handle_starttag; a stray
            # </link> etc. must not pop their parent. (XHTML-style <link/>
            # triggers handle_startendtag, which calls both handlers.)
            if tag in void_elements:
                return
            if self.path and self.path[-1][0] == tag:
                self.path.pop()
                self.index_stack.pop()

    try:
        parser = LogoExtractor()
        parser.feed(html)
        return parser.results
    except Exception as e:
        logger.error(f"Error parsing HTML: {e}")
        return []
|
|
|
|
|
|
def deduplicate_logos(logos: list[dict]) -> list[dict]:
    """
    Deduplicate logos, keeping highest confidence for each URL.

    Priority order:
    1. logo_url (explicit logos) over favicon_url
    2. High confidence over medium/low
    3. First occurrence wins for ties
    """
    # Ranking tables: claim type dominates (x10), confidence breaks ties.
    type_rank = {'logo_url': 3, 'og_image_url': 2, 'favicon_url': 1}
    conf_rank = {'high': 3, 'medium': 2, 'low': 1}

    def score(claim: dict) -> int:
        """Combined priority score for one claim."""
        return (type_rank.get(claim['claim_type'], 0) * 10
                + conf_rank.get(claim.get('detection_confidence', 'low'), 1))

    best: dict = {}
    for claim in logos:
        value = claim['claim_value']
        # Inline SVGs have no URL, so key them by xpath to keep each one.
        key = claim['xpath'] if value.startswith('[SVG') else value

        incumbent = best.get(key)
        if incumbent is None or score(claim) > score(incumbent):
            best[key] = claim

    return list(best.values())
|
|
|
|
|
|
def get_website_url(entry: dict) -> str | None:
    """Extract website URL from custodian entry.

    Sources are tried in priority order:
    1. original_entry.webadres_organisatie
    2. museum_register_enrichment.website_url
    3. wikidata_enrichment.wikidata_official_website
    4. google_maps_enrichment.website

    Returns the first usable URL (normalized), or None if no source
    provides one. The literal string 'null' is treated as missing.
    """
    # (section, field) pairs in priority order.
    sources = [
        ('original_entry', 'webadres_organisatie'),
        ('museum_register_enrichment', 'website_url'),
        ('wikidata_enrichment', 'wikidata_official_website'),
        ('google_maps_enrichment', 'website'),
    ]
    for section, field in sources:
        # `or {}` guards against YAML explicit-null sections, which load as
        # None and would crash entry.get(section, {}).get(field).
        url = (entry.get(section) or {}).get(field)
        if url and url.strip() and url.strip().lower() != 'null':
            return normalize_url(url.strip())

    return None
|
|
|
|
|
|
def normalize_url(url: str) -> str:
    """Ensure *url* carries an http(s) scheme and no trailing slash."""
    if not url:
        return url

    cleaned = url.strip()

    # Default to https when no scheme is present.
    if not cleaned.startswith(('http://', 'https://')):
        cleaned = f"https://{cleaned}"

    return cleaned.rstrip('/')
|
|
|
|
|
|
def get_custodian_name(entry: dict) -> str:
    """Get display name for a custodian entry.

    Tries, in order: the emic name, the original register organisation
    name, then the museum-register name. Returns "Unknown" when none is
    present.
    """
    sources = [
        ('custodian_name', 'emic_name'),
        ('original_entry', 'organisatie'),
        ('museum_register_enrichment', 'museum_name'),
    ]
    for section, field in sources:
        # `or {}` guards against YAML explicit-null sections, which load as
        # None and would crash entry.get(section, {}).get(field).
        name = (entry.get(section) or {}).get(field)
        if name:
            return name
    return "Unknown"
|
|
|
|
|
|
def load_checkpoint() -> dict:
    """Return the saved progress checkpoint, or a fresh one if none exists."""
    if not CHECKPOINT_FILE.exists():
        return {'processed_files': [], 'last_index': 0}
    return json.loads(CHECKPOINT_FILE.read_text())
|
|
|
|
|
|
def save_checkpoint(checkpoint: dict):
    """Persist the progress checkpoint as pretty-printed JSON."""
    CHECKPOINT_FILE.write_text(json.dumps(checkpoint, indent=2))
|
|
|
|
|
|
def enrich_custodian_with_logos(
    filepath: Path,
    client: FirecrawlClient,
    dry_run: bool = False
) -> dict:
    """
    Enrich a single custodian file with logo data.

    Loads the YAML entry, resolves its website URL, scrapes the page via
    Firecrawl, extracts and deduplicates logo claims, and (unless dry_run)
    writes the enriched entry back to filepath.

    Args:
        filepath: Path to the custodian YAML file.
        client: Authenticated FirecrawlClient used for scraping.
        dry_run: When True, report what would change without writing.

    Returns dict with:
    - success: bool
    - logos_found: int
    - message: str
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            entry = yaml.safe_load(f)

        if not entry:
            return {'success': False, 'logos_found': 0, 'message': 'Empty file'}

        # Check if already has logo enrichment.
        # `or {}` guards against a YAML explicit-null section, which loads
        # as None and would crash entry.get(..., {}).get('claims').
        if (entry.get('logo_enrichment') or {}).get('claims'):
            return {
                'success': True,
                'logos_found': len(entry['logo_enrichment']['claims']),
                'message': 'Already enriched (skipped)'
            }

        # Get website URL
        website_url = get_website_url(entry)
        if not website_url:
            return {'success': False, 'logos_found': 0, 'message': 'No website URL'}

        custodian_name = get_custodian_name(entry)
        logger.info(f"Processing: {custodian_name} ({website_url})")

        # Scrape website
        scrape_result = client.scrape_for_logos(website_url)
        if not scrape_result:
            return {'success': False, 'logos_found': 0, 'message': 'Scrape failed'}

        # Extract logos from HTML
        html = scrape_result.get('html', '')
        if not html:
            return {'success': False, 'logos_found': 0, 'message': 'No HTML content'}

        logos = extract_logos_from_html(html, website_url)
        logos = deduplicate_logos(logos)

        if not logos:
            return {'success': True, 'logos_found': 0, 'message': 'No logos found'}

        # Prepare enrichment data
        timestamp = datetime.now(timezone.utc).isoformat()

        logo_enrichment = {
            'enrichment_timestamp': timestamp,
            'source_url': website_url,
            'extraction_method': 'firecrawl_html_parsing',
            'claims': logos,
            'summary': {
                'total_logos_found': len(logos),
                'logo_urls': [c['claim_value'] for c in logos if c['claim_type'] == 'logo_url' and not c.get('is_inline_svg')],
                'favicon_urls': [c['claim_value'] for c in logos if c['claim_type'] == 'favicon_url'],
                'og_image_urls': [c['claim_value'] for c in logos if c['claim_type'] == 'og_image_url'],
                'has_inline_svg': any(c.get('is_inline_svg') for c in logos),
            }
        }

        if dry_run:
            logger.info(f" [DRY RUN] Would add {len(logos)} logo claims")
            return {'success': True, 'logos_found': len(logos), 'message': 'Dry run'}

        # Update entry
        entry['logo_enrichment'] = logo_enrichment

        # Add to provenance notes; tolerate null/malformed sections, which
        # the original `'x' not in entry` checks would have crashed on.
        if not isinstance(entry.get('provenance'), dict):
            entry['provenance'] = {}
        if not isinstance(entry['provenance'].get('notes'), list):
            entry['provenance']['notes'] = []

        entry['provenance']['notes'].append(
            f"Logo enrichment added on {timestamp} - {len(logos)} claims extracted"
        )

        # Save updated entry
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(entry, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

        return {
            'success': True,
            'logos_found': len(logos),
            'message': f'Added {len(logos)} logo claims'
        }

    except Exception as e:
        logger.error(f"Error processing {filepath}: {e}")
        return {'success': False, 'logos_found': 0, 'message': str(e)}
|
|
|
|
|
|
def main():
    """CLI entry point: enrich one file or batch-process the custodian dir."""
    parser = argparse.ArgumentParser(description='Enrich custodian files with logo data')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done')
    parser.add_argument('--limit', type=int, default=0, help='Process only N files')
    parser.add_argument('--file', type=str, help='Process a single file')
    parser.add_argument('--country', type=str, help='Filter by country code')
    parser.add_argument('--resume', action='store_true', help='Resume from checkpoint')
    args = parser.parse_args()

    # Fail fast if the API key is missing — every scrape would fail anyway.
    if not FIRECRAWL_API_KEY:
        logger.error("FIRECRAWL_API_KEY environment variable not set")
        sys.exit(1)

    client = FirecrawlClient(FIRECRAWL_API_KEY)

    try:
        # Single file mode: no checkpointing, no rate-limit delay.
        if args.file:
            filepath = Path(args.file)
            if not filepath.exists():
                logger.error(f"File not found: {filepath}")
                sys.exit(1)

            result = enrich_custodian_with_logos(filepath, client, args.dry_run)
            logger.info(f"Result: {result['message']} ({result['logos_found']} logos)")
            return

        # Batch mode
        checkpoint = load_checkpoint() if args.resume else {'processed_files': [], 'last_index': 0}

        # Get all custodian files
        files = sorted(CUSTODIAN_DIR.glob('*.yaml'))

        # Apply country filter (files are named "<CODE>-...")
        if args.country:
            files = [f for f in files if f.name.startswith(f"{args.country}-")]

        # Apply limit
        # NOTE(review): the limit is applied BEFORE the resume filter, so with
        # --resume --limit N fewer than N new files may be processed — confirm
        # this is the intended semantics.
        if args.limit > 0:
            files = files[:args.limit]

        # Skip already processed
        if args.resume:
            files = [f for f in files if f.name not in checkpoint['processed_files']]

        logger.info(f"Processing {len(files)} custodian files...")

        stats = {
            'processed': 0,
            'success': 0,
            'failed': 0,
            'skipped': 0,
            'logos_found': 0,
        }

        for i, filepath in enumerate(files):
            try:
                result = enrich_custodian_with_logos(filepath, client, args.dry_run)

                stats['processed'] += 1
                if result['success']:
                    # "Already enriched (skipped)" counts separately from
                    # genuine successes.
                    if 'skipped' in result['message'].lower():
                        stats['skipped'] += 1
                    else:
                        stats['success'] += 1
                        stats['logos_found'] += result['logos_found']
                else:
                    stats['failed'] += 1

                # Update checkpoint (persisted every 10 files and at exit).
                checkpoint['processed_files'].append(filepath.name)
                checkpoint['last_index'] = i

                if (i + 1) % 10 == 0:
                    save_checkpoint(checkpoint)
                    logger.info(f"Progress: {i+1}/{len(files)} - {stats['logos_found']} logos found")

                # Rate limiting between API calls.
                time.sleep(REQUEST_DELAY)

            except KeyboardInterrupt:
                # Save progress so --resume can pick up where we stopped.
                logger.info("Interrupted - saving checkpoint...")
                save_checkpoint(checkpoint)
                break

        # Final checkpoint
        save_checkpoint(checkpoint)

        # Summary
        logger.info("\n" + "="*60)
        logger.info("LOGO ENRICHMENT SUMMARY")
        logger.info("="*60)
        logger.info(f"Total processed: {stats['processed']}")
        logger.info(f"Successful: {stats['success']}")
        logger.info(f"Failed: {stats['failed']}")
        logger.info(f"Skipped (already enriched): {stats['skipped']}")
        logger.info(f"Total logos found: {stats['logos_found']}")
        logger.info("="*60)

    finally:
        # Always release the HTTP connection pool, even on early exit.
        client.close()
|
|
|
|
|
|
# Script entry point.
if __name__ == '__main__':
    main()
|