648 lines
23 KiB
Python
648 lines
23 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Enrich custodian YAML files with logo images using Crawl4AI.
|
|
|
|
This script extracts logo URLs from heritage institution websites with proper
|
|
provenance, following AGENTS.md Rule 6 (WebObservation Claims MUST Have XPath Provenance).
|
|
|
|
Crawl4AI advantages over Playwright:
|
|
- LLM-friendly structured output
|
|
- Built-in caching (avoids re-fetching)
|
|
- Magic mode for auto-handling cookies/popups
|
|
- Simpler API for extraction
|
|
|
|
Logo extraction looks for:
|
|
1. <link rel="icon"> or <link rel="apple-touch-icon"> (favicon/icon)
|
|
2. <meta property="og:image"> (Open Graph image)
|
|
3. <img> elements with logo/brand in class/id/alt attributes
|
|
4. SVG elements with logo class/id
|
|
|
|
Output format follows WebClaim schema with:
|
|
- claim_type: logo_url, favicon_url, og_image_url
|
|
- claim_value: The extracted image URL
|
|
- source_url: Website where logo was found
|
|
- css_selector: CSS selector to the element (for verification)
|
|
- retrieved_on: ISO 8601 timestamp
|
|
|
|
Usage:
|
|
python scripts/enrich_custodian_logos_crawl4ai.py [options]
|
|
|
|
Options:
|
|
--dry-run Show what would be enriched without modifying files
|
|
--limit N Process only first N files (for testing)
|
|
--file PATH Process a single specific file
|
|
--country CODE Filter by country code (e.g., NL, BE, DE)
|
|
--resume Resume from last checkpoint
|
|
--no-cache Disable crawl4ai caching
|
|
|
|
Requirements:
|
|
pip install crawl4ai pyyaml
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
import yaml
|
|
|
|
try:
|
|
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
|
|
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
|
|
except ImportError:
|
|
print("Please install crawl4ai: pip install crawl4ai")
|
|
sys.exit(1)
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
# Logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger shared by every helper below.
logger = logging.getLogger(__name__)

# Configuration
# Custodian YAML files live in <repo-root>/data/custodian (script sits in scripts/).
CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"
# Checkpoint for --resume; hidden file kept alongside the data it tracks.
CHECKPOINT_FILE = CUSTODIAN_DIR / ".logo_enrichment_crawl4ai_checkpoint.json"
REQUEST_DELAY = 2.0  # seconds between requests (politeness rate limit)
|
|
|
|
|
|
def get_website_url(entry: dict) -> str | None:
    """Extract website URL from a custodian entry.

    Sources are tried in priority order:
    1. original_entry.webadres_organisatie (Dutch ISIL format)
    2. original_entry.identifiers with scheme 'Website' (Czech ISIL / ARON)
    3. museum_register_enrichment.website_url
    4. wikidata_enrichment.wikidata_official_website (may be a list)
    5. google_maps_enrichment.website
    6. web_enrichment.source_url

    Returns the first usable URL, normalized via normalize_url(), or None.
    Blank strings and textual null markers ('null', 'none') are rejected for
    EVERY source (previously only priority 1 checked this), and a rejected
    higher-priority source falls through to the next one.
    """

    def _usable(url) -> str | None:
        """Validate one candidate value; return a normalized URL or None."""
        # Wikidata may store a list of URLs — take the first one.
        if isinstance(url, list):
            url = url[0] if url else None
        if not isinstance(url, str):
            return None
        url = url.strip()
        # Reject empties and stringified nulls that occur in the source data.
        if not url or url.lower() in ('null', 'none'):
            return None
        return normalize_url(url)

    # `or {}` guards against an explicit YAML null for the whole section,
    # which entry.get('original_entry', {}) would not catch.
    original = entry.get('original_entry') or {}

    # Priority 1: original entry webadres (Dutch ISIL format)
    url = _usable(original.get('webadres_organisatie'))
    if url:
        return url

    # Priority 2: website in identifiers array (Czech ISIL and ARON format)
    for ident in original.get('identifiers') or []:
        if ident.get('identifier_scheme') == 'Website':
            url = _usable(ident.get('identifier_value') or ident.get('identifier_url'))
            if url:
                return url

    # Priorities 3-6: enrichment sections, in descending trust order.
    for section, key in (
        ('museum_register_enrichment', 'website_url'),
        ('wikidata_enrichment', 'wikidata_official_website'),
        ('google_maps_enrichment', 'website'),
        ('web_enrichment', 'source_url'),
    ):
        url = _usable((entry.get(section) or {}).get(key))
        if url:
            return url

    return None
|
|
|
|
|
|
def normalize_url(url: str) -> str:
    """Return *url* trimmed, with an https:// scheme and no trailing slash."""
    if not url:
        return url

    normalized = url.strip()

    # Default to HTTPS when no scheme is present.
    has_scheme = normalized.startswith(('http://', 'https://'))
    if not has_scheme:
        normalized = f'https://{normalized}'

    return normalized.rstrip('/')
|
|
|
|
|
|
def get_custodian_name(entry: dict) -> str:
    """Pick a human-readable display name for a custodian entry.

    Tries emic name, then original-entry organisation, then museum-register
    name; falls back to "Unknown".
    """
    name_sources = (
        ('custodian_name', 'emic_name'),
        ('original_entry', 'organisatie'),
        ('museum_register_enrichment', 'museum_name'),
    )
    for section, key in name_sources:
        name = entry.get(section, {}).get(key)
        if name:
            return name
    return "Unknown"
|
|
|
|
|
|
def load_checkpoint() -> dict:
    """Return the saved progress checkpoint, or a fresh one if none exists."""
    if not CHECKPOINT_FILE.exists():
        return {'processed_files': [], 'last_index': 0}
    with open(CHECKPOINT_FILE, 'r') as f:
        return json.load(f)
|
|
|
|
|
|
def save_checkpoint(checkpoint: dict):
    """Persist *checkpoint* to disk as indented JSON."""
    CHECKPOINT_FILE.write_text(json.dumps(checkpoint, indent=2))
|
|
|
|
|
|
def extract_logos_from_html(html: str, base_url: str) -> dict:
    """
    Extract logo information from HTML using BeautifulSoup.

    Returns structured data with favicons, og images, and logo images:
    - favicons:    <link rel="...icon..."> entries (href/rel/type/sizes/selector),
                   deduplicated by href.
    - ogImages:    og:image and twitter:image meta contents with selectors.
    - logos:       all page-wide <img> elements matching the logo patterns,
                   deduplicated by src.
    - svgLogos:    inline <svg> elements in the header matching the patterns.
    - primaryLogo: first logo-like <img> or <svg> found in <header>/<nav>
                   (highest-confidence candidate), or None.

    All URLs are absolutized against *base_url*; data: URIs pass through.
    """
    # NOTE(review): 'lxml' requires the lxml package to be installed;
    # BeautifulSoup raises FeatureNotFound otherwise — confirm it is a
    # declared dependency.
    soup = BeautifulSoup(html, 'lxml')

    results = {
        'favicons': [],
        'ogImages': [],
        'logos': [],
        'svgLogos': [],
        'primaryLogo': None
    }

    # Logo detection patterns: attribute text suggesting a site logo...
    logo_pattern = re.compile(r'logo|brand|site-icon|masthead|emblem', re.I)
    # ...and text marking common false positives (social icons, sponsors, cookies).
    exclude_pattern = re.compile(r'sponsor|partner|social|facebook|twitter|instagram|linkedin|youtube|tiktok|footer-logo|cookie', re.I)

    def make_absolute(url: str) -> str:
        """Convert relative URL to absolute (data: URIs pass through)."""
        if not url:
            return url
        if url.startswith('data:'):
            return url
        return urljoin(base_url, url)

    def get_css_selector(el) -> str:
        """Generate a CSS selector path for an element, for later verification."""
        parts = []
        while el and el.name:
            selector = el.name
            if el.get('id'):
                # An id is unique enough — anchor the path here and stop.
                selector = f"#{el['id']}"
                parts.insert(0, selector)
                break
            elif el.get('class'):
                classes = el['class'][:2]  # Limit to first 2 classes
                if classes:
                    selector += '.' + '.'.join(classes)

            # Add nth-of-type if needed (disambiguates same-tag siblings)
            siblings = el.find_previous_siblings(el.name) if el.parent else []
            if siblings:
                selector += f':nth-of-type({len(siblings) + 1})'

            parts.insert(0, selector)
            el = el.parent

        return ' > '.join(parts)

    # Extract favicons: any <link rel> containing "icon" or "apple-touch"
    # (rel is multi-valued in bs4, hence the str() coercion in the lambda).
    for link in soup.find_all('link', rel=lambda x: x and ('icon' in x or 'apple-touch' in str(x))):
        href = link.get('href')
        if href:
            results['favicons'].append({
                'href': make_absolute(href),
                'rel': ' '.join(link.get('rel', [])),
                'type': link.get('type', ''),
                'sizes': link.get('sizes', ''),
                'selector': get_css_selector(link)
            })

    # Extract OG image
    og_image = soup.find('meta', property='og:image')
    if og_image and og_image.get('content'):
        results['ogImages'].append({
            'content': make_absolute(og_image['content']),
            'selector': get_css_selector(og_image)
        })

    # Twitter image (grouped into the same ogImages bucket)
    twitter_image = soup.find('meta', attrs={'name': 'twitter:image'})
    if twitter_image and twitter_image.get('content'):
        results['ogImages'].append({
            'content': make_absolute(twitter_image['content']),
            'selector': get_css_selector(twitter_image)
        })

    # Find header/nav for primary logo — site logos conventionally live there
    header = soup.find(['header', 'nav']) or soup.find(role='banner')

    if header:
        # Look for images in header
        for img in header.find_all('img'):
            # Fold class/id/alt/src into one lowercase string for matching.
            attrs_str = ' '.join([
                img.get('class', [''])[0] if img.get('class') else '',
                img.get('id', ''),
                img.get('alt', ''),
                img.get('src', '')
            ]).lower()

            if logo_pattern.search(attrs_str) and not exclude_pattern.search(attrs_str):
                src = img.get('src')
                if src:
                    # First matching header image wins as the primary logo.
                    if not results['primaryLogo']:
                        results['primaryLogo'] = {
                            'src': make_absolute(src),
                            'alt': img.get('alt', ''),
                            'class': ' '.join(img.get('class', [])),
                            'id': img.get('id', ''),
                            'selector': get_css_selector(img),
                            'location': 'header'
                        }

        # Look for SVG logos in header (inline <svg> has no src to record)
        for svg in header.find_all('svg'):
            attrs_str = ' '.join([
                svg.get('class', [''])[0] if svg.get('class') else '',
                svg.get('id', '')
            ]).lower()

            if logo_pattern.search(attrs_str) and not exclude_pattern.search(attrs_str):
                # Only promote to primary if no <img> logo was found first.
                if not results['primaryLogo']:
                    results['primaryLogo'] = {
                        'src': '[inline-svg]',
                        'alt': svg.get('aria-label', ''),
                        'class': ' '.join(svg.get('class', [])) if svg.get('class') else '',
                        'id': svg.get('id', ''),
                        'selector': get_css_selector(svg),
                        'location': 'header',
                        'isInlineSvg': True
                    }
                results['svgLogos'].append({
                    'class': ' '.join(svg.get('class', [])) if svg.get('class') else '',
                    'id': svg.get('id', ''),
                    'selector': get_css_selector(svg),
                    'ariaLabel': svg.get('aria-label', '')
                })

    # Find other logo images on page (page-wide scan, deduped by src)
    seen_srcs = set()
    for img in soup.find_all('img'):
        attrs_str = ' '.join([
            img.get('class', [''])[0] if img.get('class') else '',
            img.get('id', ''),
            img.get('alt', ''),
            img.get('src', '')
        ]).lower()

        if logo_pattern.search(attrs_str) and not exclude_pattern.search(attrs_str):
            src = img.get('src')
            if src and src not in seen_srcs:
                seen_srcs.add(src)
                results['logos'].append({
                    'src': make_absolute(src),
                    'alt': img.get('alt', ''),
                    'class': ' '.join(img.get('class', [])) if img.get('class') else '',
                    'id': img.get('id', ''),
                    'selector': get_css_selector(img)
                })

    # Deduplicate favicons by href. set.add() returns None, so the second
    # clause is always truthy and merely records the href as seen.
    seen_hrefs = set()
    results['favicons'] = [
        f for f in results['favicons']
        if f['href'] not in seen_hrefs and not seen_hrefs.add(f['href'])
    ]

    return results
|
|
|
|
|
|
def select_best_favicon(favicons: list[dict]) -> dict | None:
|
|
"""Select the best favicon from available options."""
|
|
if not favicons:
|
|
return None
|
|
|
|
# Priority: SVG > largest PNG > ICO
|
|
svg_favicons = [f for f in favicons if f['href'].endswith('.svg') or f['type'] == 'image/svg+xml']
|
|
if svg_favicons:
|
|
return svg_favicons[0]
|
|
|
|
# Look for apple-touch-icon (high res)
|
|
apple_icons = [f for f in favicons if 'apple-touch' in f['rel']]
|
|
if apple_icons:
|
|
sized = [f for f in apple_icons if f.get('sizes')]
|
|
if sized:
|
|
sized.sort(key=lambda x: int(x['sizes'].split('x')[0]) if 'x' in x['sizes'] else 0, reverse=True)
|
|
return sized[0]
|
|
return apple_icons[0]
|
|
|
|
# Look for standard icon
|
|
icons = [f for f in favicons if 'icon' in f['rel']]
|
|
if icons:
|
|
png_icons = [i for i in icons if '.png' in i['href']]
|
|
if png_icons:
|
|
sized = [f for f in png_icons if f.get('sizes')]
|
|
if sized:
|
|
sized.sort(key=lambda x: int(x['sizes'].split('x')[0]) if 'x' in x['sizes'] else 0, reverse=True)
|
|
return sized[0]
|
|
return png_icons[0]
|
|
return icons[0]
|
|
|
|
return favicons[0] if favicons else None
|
|
|
|
|
|
def build_logo_claims(logo_data: dict, source_url: str, timestamp: str) -> list[dict]:
    """Turn extracted logo data into WebClaim-compatible dicts.

    Emits at most one claim per category, in priority order: logo_url
    (primary header logo), favicon_url (best available), og_image_url
    (first og/twitter image). Every claim carries source_url, css_selector
    and retrieved_on provenance fields.
    """
    claims = []

    def _claim(claim_type, value, selector, method, **extra):
        """Assemble one claim dict with the shared provenance fields first."""
        record = {
            'claim_type': claim_type,
            'claim_value': value,
            'source_url': source_url,
            'css_selector': selector,
            'retrieved_on': timestamp,
            'extraction_method': method,
        }
        record.update(extra)
        return record

    # Primary logo (highest priority)
    primary = logo_data.get('primaryLogo')
    if primary:
        if primary.get('isInlineSvg'):
            claims.append(_claim(
                'logo_url', '[inline-svg]', primary.get('selector', ''),
                'crawl4ai_svg_detection',
                detection_confidence='high',
                is_inline_svg=True,
                aria_label=primary.get('alt', ''),
            ))
        elif primary.get('src'):
            claims.append(_claim(
                'logo_url', primary['src'], primary.get('selector', ''),
                'crawl4ai_header_logo',
                detection_confidence='high',
                alt_text=primary.get('alt', ''),
            ))

    # Best favicon
    best_favicon = select_best_favicon(logo_data.get('favicons', []))
    if best_favicon:
        claims.append(_claim(
            'favicon_url', best_favicon['href'], best_favicon.get('selector', ''),
            'crawl4ai_link_rel',
            favicon_type=best_favicon.get('type', ''),
            favicon_sizes=best_favicon.get('sizes', ''),
        ))

    # OG Image (first entry wins; twitter:image shares the same bucket)
    og_images = logo_data.get('ogImages')
    if og_images:
        first_og = og_images[0]
        claims.append(_claim(
            'og_image_url', first_og['content'], first_og.get('selector', ''),
            'crawl4ai_meta_og',
        ))

    return claims
|
|
|
|
|
|
async def extract_logos_from_url(crawler: AsyncWebCrawler, url: str) -> dict | None:
    """Crawl *url* with Crawl4AI and return extracted logo data, or None on failure."""
    try:
        # Per-request crawl configuration.
        run_config = CrawlerRunConfig(
            wait_until="domcontentloaded",
            page_timeout=30000,
            delay_before_return_html=1.5,  # give client-side JS time to render
            magic=True,                    # auto-dismiss popups / cookie prompts
            remove_overlay_elements=True,  # strip cookie banners from the DOM
        )

        result = await crawler.arun(url=url, config=run_config)

        # Guard clauses: unsuccessful crawl or empty payload -> no data.
        if not result.success:
            logger.warning(f"Failed to crawl {url}: {result.error_message}")
            return None
        if not result.html:
            logger.warning(f"No HTML content from {url}")
            return None

        # Parse the rendered HTML for logo candidates.
        return extract_logos_from_html(result.html, url)

    except Exception as e:
        logger.error(f"Error extracting logos from {url}: {e}")
        return None
|
|
|
|
|
|
async def enrich_custodian_with_logos(
    filepath: Path,
    crawler: AsyncWebCrawler,
    dry_run: bool = False
) -> dict:
    """
    Enrich a single custodian file with logo data.

    Loads the YAML entry at *filepath*, resolves its website URL, crawls the
    site for logo/favicon/og-image claims and (unless *dry_run*) writes the
    result back into the file under the ``logo_enrichment`` key. Entries
    that already contain ``logo_enrichment.claims`` are skipped, which makes
    re-runs idempotent.

    Returns dict with:
    - success: bool
    - logos_found: int
    - message: str
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            entry = yaml.safe_load(f)

        if not entry:
            return {'success': False, 'logos_found': 0, 'message': 'Empty file'}

        # Check if already has logo enrichment
        if entry.get('logo_enrichment', {}).get('claims'):
            return {
                'success': True,
                'logos_found': len(entry['logo_enrichment']['claims']),
                'message': 'Already enriched (skipped)'
            }

        # Get website URL (tries several enrichment sources in priority order)
        website_url = get_website_url(entry)
        if not website_url:
            return {'success': False, 'logos_found': 0, 'message': 'No website URL'}

        custodian_name = get_custodian_name(entry)
        logger.info(f"Processing: {custodian_name} ({website_url})")

        # Extract logos
        logo_data = await extract_logos_from_url(crawler, website_url)
        if not logo_data:
            return {'success': False, 'logos_found': 0, 'message': 'Failed to extract logos'}

        # Build claims with provenance (selector + timestamp + source URL)
        timestamp = datetime.now(timezone.utc).isoformat()
        claims = build_logo_claims(logo_data, website_url, timestamp)

        if not claims:
            # Crawl succeeded but nothing matched — still a success.
            return {'success': True, 'logos_found': 0, 'message': 'No logos found'}

        # Prepare enrichment data
        logo_enrichment = {
            'enrichment_timestamp': timestamp,
            'source_url': website_url,
            'extraction_method': 'crawl4ai',
            'claims': claims,
            'summary': {
                'total_claims': len(claims),
                'has_primary_logo': logo_data.get('primaryLogo') is not None,
                'has_favicon': any(c['claim_type'] == 'favicon_url' for c in claims),
                'has_og_image': any(c['claim_type'] == 'og_image_url' for c in claims),
                'favicon_count': len(logo_data.get('favicons', [])),
            }
        }

        if dry_run:
            logger.info(f" [DRY RUN] Would add {len(claims)} logo claims")
            for claim in claims:
                value = claim['claim_value']
                # Truncate long URLs for readable log output.
                if len(value) > 80:
                    value = value[:80] + "..."
                logger.info(f" - {claim['claim_type']}: {value}")
            return {'success': True, 'logos_found': len(claims), 'message': 'Dry run'}

        # Update entry
        entry['logo_enrichment'] = logo_enrichment

        # Save updated entry (keep original key order and non-ASCII intact)
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(entry, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

        return {
            'success': True,
            'logos_found': len(claims),
            'message': f'Added {len(claims)} logo claims'
        }

    except Exception as e:
        # Broad catch by design: one bad file must not abort the batch run.
        logger.error(f"Error processing {filepath}: {e}")
        return {'success': False, 'logos_found': 0, 'message': str(e)}
|
|
|
|
|
|
async def main():
    """CLI entry point: parse arguments, then enrich a single file or
    batch-process all custodian YAML files with logo claims, maintaining a
    resumable checkpoint."""
    parser = argparse.ArgumentParser(description='Enrich custodian files with logo data using Crawl4AI')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done')
    parser.add_argument('--limit', type=int, default=0, help='Process only N files')
    parser.add_argument('--file', type=str, help='Process a single file')
    parser.add_argument('--country', type=str, help='Filter by country code')
    parser.add_argument('--resume', action='store_true', help='Resume from checkpoint')
    # NOTE(review): --no-cache is parsed but never applied to the crawler
    # configuration below — confirm whether it should set a CacheMode.
    parser.add_argument('--no-cache', action='store_true', help='Disable crawl4ai caching')
    parser.add_argument('--verbose', action='store_true', help='Enable verbose logging')
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Configure browser (headless, desktop-like viewport and UA string)
    browser_config = BrowserConfig(
        headless=True,
        viewport_width=1280,
        viewport_height=720,
        user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        # Single file mode
        if args.file:
            filepath = Path(args.file)
            if not filepath.exists():
                logger.error(f"File not found: {filepath}")
                sys.exit(1)

            result = await enrich_custodian_with_logos(filepath, crawler, args.dry_run)
            logger.info(f"Result: {result['message']} ({result['logos_found']} logos)")
            return

        # Batch mode
        checkpoint = load_checkpoint() if args.resume else {'processed_files': [], 'last_index': 0}

        # Get all custodian files
        files = sorted(CUSTODIAN_DIR.glob('*.yaml'))

        # Apply country filter (filenames are prefixed with the country code)
        if args.country:
            files = [f for f in files if f.name.startswith(f"{args.country}-")]

        # Skip already processed (BEFORE applying limit)
        if args.resume:
            processed_set = set(checkpoint['processed_files'])
            files = [f for f in files if f.name not in processed_set]
            logger.info(f"Resuming: {len(processed_set)} files already processed, {len(files)} remaining")

        # Apply limit (AFTER resume filter)
        if args.limit > 0:
            files = files[:args.limit]

        logger.info(f"Processing {len(files)} custodian files...")

        stats = {
            'processed': 0,
            'success': 0,
            'failed': 0,
            'skipped': 0,
            'logos_found': 0,
        }

        for i, filepath in enumerate(files):
            try:
                result = await enrich_custodian_with_logos(filepath, crawler, args.dry_run)

                stats['processed'] += 1
                if result['success']:
                    # "skipped" in the message marks already-enriched files.
                    if 'skipped' in result['message'].lower():
                        stats['skipped'] += 1
                    else:
                        stats['success'] += 1
                        stats['logos_found'] += result['logos_found']
                else:
                    stats['failed'] += 1

                # Update checkpoint (persisted every 10 files below)
                checkpoint['processed_files'].append(filepath.name)
                checkpoint['last_index'] = i

                if (i + 1) % 10 == 0:
                    save_checkpoint(checkpoint)
                    logger.info(f"Progress: {i+1}/{len(files)} - {stats['logos_found']} logos found")

                # Rate limiting (politeness delay between sites)
                await asyncio.sleep(REQUEST_DELAY)

            except KeyboardInterrupt:
                # Ctrl-C: persist progress so --resume can pick up here.
                logger.info("Interrupted - saving checkpoint...")
                save_checkpoint(checkpoint)
                break

        # Final checkpoint
        save_checkpoint(checkpoint)

        # Summary
        logger.info("\n" + "="*60)
        logger.info("LOGO ENRICHMENT SUMMARY (Crawl4AI)")
        logger.info("="*60)
        logger.info(f"Total processed: {stats['processed']}")
        logger.info(f"Successful: {stats['success']}")
        logger.info(f"Failed: {stats['failed']}")
        logger.info(f"Skipped (already enriched): {stats['skipped']}")
        logger.info(f"Total logos found: {stats['logos_found']}")
        logger.info("="*60)
|
|
|
|
if __name__ == '__main__':
    # Run the async CLI entry point.
    asyncio.run(main())
|