# glam/scripts/enrich_custodian_logos_playwright.py
# (snapshot metadata: 2025-12-21 22:12:34 +01:00 — 652 lines, 23 KiB, Python)
#!/usr/bin/env python3
"""
Enrich custodian YAML files with logo images using Playwright.
This script extracts logo URLs from heritage institution websites with proper
provenance, following AGENTS.md Rule 6 (WebObservation Claims MUST Have XPath Provenance).
Logo extraction looks for:
1. <link rel="icon"> or <link rel="apple-touch-icon"> (favicon/icon)
2. <meta property="og:image"> (Open Graph image)
3. <img> elements with logo/brand in class/id/alt attributes
4. SVG elements with logo class/id
Output format follows WebClaim schema with:
- claim_type: logo_url, favicon_url, og_image_url
- claim_value: The extracted image URL
- source_url: Website where logo was found
- css_selector: CSS selector to the element (for verification)
- retrieved_on: ISO 8601 timestamp
Usage:
python scripts/enrich_custodian_logos_playwright.py [options]
Options:
--dry-run Show what would be enriched without modifying files
--limit N Process only first N files (for testing)
--file PATH Process a single specific file
--country CODE Filter by country code (e.g., NL, BE, DE)
--resume Resume from last checkpoint
Requirements:
pip install playwright pyyaml
playwright install chromium
"""
import argparse
import asyncio
import json
import logging
import sys
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urljoin
import yaml
try:
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeout
except ImportError:
print("Please install playwright: pip install playwright && playwright install chromium")
sys.exit(1)
# Logging
# Module-level logger; basicConfig's default handler writes to stderr.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Configuration
# Custodian YAML files live at <repo-root>/data/custodian, resolved relative
# to this script's location (scripts/ is assumed to sit beside data/).
CUSTODIAN_DIR: Path = Path(__file__).parent.parent / "data" / "custodian"
# Hidden checkpoint file consumed by --resume to skip already-processed files.
CHECKPOINT_FILE: Path = CUSTODIAN_DIR / ".logo_enrichment_playwright_checkpoint.json"
REQUEST_DELAY: float = 2.0  # seconds between requests (politeness delay toward target sites)
PAGE_TIMEOUT: int = 30000  # 30 seconds — milliseconds, as Playwright's goto() expects
# JavaScript to extract logo information from page.
# Evaluated in the browser via page.evaluate(); returns a plain object with
# favicons, ogImages, logos, svgLogos, and a primaryLogo candidate, each
# carrying a CSS selector for provenance. NOTE: this is a string literal —
# its content must not be altered casually, as it runs verbatim in the page.
LOGO_EXTRACTION_JS: str = """
() => {
const results = {
favicons: [],
ogImages: [],
logos: [],
svgLogos: [],
primaryLogo: null
};
// Helper to generate a CSS selector for an element
function getCssSelector(el) {
if (el.id) return '#' + el.id;
let path = [];
while (el && el.nodeType === Node.ELEMENT_NODE) {
let selector = el.nodeName.toLowerCase();
if (el.id) {
selector = '#' + el.id;
path.unshift(selector);
break;
} else if (el.className && typeof el.className === 'string') {
const classes = el.className.trim().split(/\\s+/).filter(c => c).slice(0, 2);
if (classes.length > 0) {
selector += '.' + classes.join('.');
}
}
// Add index if needed
let sibling = el;
let nth = 1;
while (sibling = sibling.previousElementSibling) {
if (sibling.nodeName.toLowerCase() === el.nodeName.toLowerCase()) nth++;
}
if (nth > 1) selector += ':nth-of-type(' + nth + ')';
path.unshift(selector);
el = el.parentNode;
}
return path.join(' > ');
}
// Get favicons from link elements
document.querySelectorAll('link[rel*="icon"]').forEach(el => {
if (el.href) {
results.favicons.push({
href: el.href,
rel: el.rel,
type: el.type || '',
sizes: el.sizes?.value || '',
selector: getCssSelector(el)
});
}
});
// Get apple-touch-icons
document.querySelectorAll('link[rel*="apple-touch"]').forEach(el => {
if (el.href) {
results.favicons.push({
href: el.href,
rel: el.rel,
type: el.type || '',
sizes: el.sizes?.value || '',
selector: getCssSelector(el)
});
}
});
// Get og:image
const ogImage = document.querySelector('meta[property="og:image"]');
if (ogImage && ogImage.content) {
results.ogImages.push({
content: ogImage.content,
selector: getCssSelector(ogImage)
});
}
// Also check twitter:image
const twitterImage = document.querySelector('meta[name="twitter:image"]');
if (twitterImage && twitterImage.content) {
results.ogImages.push({
content: twitterImage.content,
selector: getCssSelector(twitterImage)
});
}
// Logo detection patterns
const logoPatterns = /logo|brand|site-icon|masthead|emblem/i;
const excludePatterns = /sponsor|partner|social|facebook|twitter|instagram|linkedin|youtube|tiktok|footer-logo|cookie/i;
// Get images with logo indicators (prioritize header/nav)
const headerNav = document.querySelector('header, nav, [role="banner"]');
// First check header/nav for primary logo
if (headerNav) {
headerNav.querySelectorAll('img').forEach(el => {
const attrs = `${el.className || ''} ${el.id || ''} ${el.alt || ''} ${el.src || ''}`.toLowerCase();
if (logoPatterns.test(attrs) && !excludePatterns.test(attrs) && el.src) {
if (!results.primaryLogo) {
results.primaryLogo = {
src: el.src,
alt: el.alt || '',
class: el.className || '',
id: el.id || '',
selector: getCssSelector(el),
location: 'header'
};
}
}
});
// Check for SVG logos in header
headerNav.querySelectorAll('svg').forEach(el => {
const attrs = `${el.className?.baseVal || ''} ${el.id || ''}`.toLowerCase();
if (logoPatterns.test(attrs) && !excludePatterns.test(attrs)) {
if (!results.primaryLogo) {
results.primaryLogo = {
src: '[inline-svg]',
alt: el.getAttribute('aria-label') || '',
class: el.className?.baseVal || '',
id: el.id || '',
selector: getCssSelector(el),
location: 'header',
isInlineSvg: true
};
}
results.svgLogos.push({
class: el.className?.baseVal || '',
id: el.id || '',
selector: getCssSelector(el),
ariaLabel: el.getAttribute('aria-label') || ''
});
}
});
}
// Then check rest of page for additional logos
document.querySelectorAll('img').forEach(el => {
const attrs = `${el.className || ''} ${el.id || ''} ${el.alt || ''} ${el.src || ''}`.toLowerCase();
if (logoPatterns.test(attrs) && el.src && !excludePatterns.test(attrs)) {
results.logos.push({
src: el.src,
alt: el.alt || '',
class: el.className || '',
id: el.id || '',
selector: getCssSelector(el)
});
}
});
// Deduplicate logos by src
const seenSrcs = new Set();
results.logos = results.logos.filter(l => {
if (seenSrcs.has(l.src)) return false;
seenSrcs.add(l.src);
return true;
});
// Deduplicate favicons by href
const seenHrefs = new Set();
results.favicons = results.favicons.filter(f => {
if (seenHrefs.has(f.href)) return false;
seenHrefs.add(f.href);
return true;
});
return results;
}
"""
def get_website_url(entry: dict) -> str | None:
"""Extract website URL from custodian entry."""
# Priority 1: Original entry webadres
if entry.get('original_entry', {}).get('webadres_organisatie'):
url = entry['original_entry']['webadres_organisatie']
if url and url.strip() and url.strip().lower() not in ('null', 'none', ''):
return normalize_url(url.strip())
# Priority 2: Museum register website
if entry.get('museum_register_enrichment', {}).get('website_url'):
url = entry['museum_register_enrichment']['website_url']
if url and url.strip():
return normalize_url(url.strip())
# Priority 3: Wikidata official website
if entry.get('wikidata_enrichment', {}).get('wikidata_official_website'):
url = entry['wikidata_enrichment']['wikidata_official_website']
if url and url.strip():
return normalize_url(url.strip())
# Priority 4: Google Maps website
if entry.get('google_maps_enrichment', {}).get('website'):
url = entry['google_maps_enrichment']['website']
if url and url.strip():
return normalize_url(url.strip())
# Priority 5: Web enrichment source URL
if entry.get('web_enrichment', {}).get('source_url'):
url = entry['web_enrichment']['source_url']
if url and url.strip():
return normalize_url(url.strip())
return None
def normalize_url(url: str) -> str:
"""Normalize URL to ensure it has a scheme."""
if not url:
return url
url = url.strip()
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
url = url.rstrip('/')
return url
def get_custodian_name(entry: dict) -> str:
    """Return a human-readable display name for a custodian entry.

    Falls back through the emic name, the original organisation name, and
    the museum-register name; returns "Unknown" when none is present.
    """
    candidates = (
        ('custodian_name', 'emic_name'),
        ('original_entry', 'organisatie'),
        ('museum_register_enrichment', 'museum_name'),
    )
    for section, key in candidates:
        name = entry.get(section, {}).get(key)
        if name:
            return name
    return "Unknown"
def load_checkpoint() -> dict:
    """Read the progress checkpoint from disk; fresh state when absent."""
    if not CHECKPOINT_FILE.exists():
        # No previous run (or checkpoint was cleared): start from scratch.
        return {'processed_files': [], 'last_index': 0}
    return json.loads(CHECKPOINT_FILE.read_text())
def save_checkpoint(checkpoint: dict):
    """Persist the progress checkpoint so an interrupted run can resume."""
    CHECKPOINT_FILE.write_text(json.dumps(checkpoint, indent=2))
def select_best_favicon(favicons: list[dict]) -> dict | None:
"""Select the best favicon from available options."""
if not favicons:
return None
# Priority: SVG > largest PNG > ICO
svg_favicons = [f for f in favicons if f['href'].endswith('.svg') or f['type'] == 'image/svg+xml']
if svg_favicons:
return svg_favicons[0]
# Look for apple-touch-icon (high res)
apple_icons = [f for f in favicons if 'apple-touch' in f['rel']]
if apple_icons:
# Sort by size if available
sized = [f for f in apple_icons if f.get('sizes')]
if sized:
sized.sort(key=lambda x: int(x['sizes'].split('x')[0]) if 'x' in x['sizes'] else 0, reverse=True)
return sized[0]
return apple_icons[0]
# Look for standard icon
icons = [f for f in favicons if f['rel'] == 'icon']
if icons:
# Prefer PNG over ICO
png_icons = [i for i in icons if '.png' in i['href']]
if png_icons:
sized = [f for f in png_icons if f.get('sizes')]
if sized:
sized.sort(key=lambda x: int(x['sizes'].split('x')[0]) if 'x' in x['sizes'] else 0, reverse=True)
return sized[0]
return png_icons[0]
return icons[0]
# Return first available
return favicons[0] if favicons else None
def build_logo_claims(logo_data: dict, source_url: str, timestamp: str) -> list[dict]:
    """Build WebClaim-compatible claims from extracted logo data.

    Emits up to three claims: a primary logo (header <img> or inline SVG),
    the best available favicon, and the first Open Graph / Twitter image.
    Every claim carries css_selector + retrieved_on provenance fields.
    """
    claims: list[dict] = []

    def _claim(claim_type: str, claim_value: str, selector: str, **extra) -> dict:
        # Shared WebClaim scaffold; extras keep their keyword order.
        record = {
            'claim_type': claim_type,
            'claim_value': claim_value,
            'source_url': source_url,
            'css_selector': selector,
            'retrieved_on': timestamp,
        }
        record.update(extra)
        return record

    primary = logo_data.get('primaryLogo')
    if primary:
        if primary.get('isInlineSvg'):
            claims.append(_claim(
                'logo_url', '[inline-svg]', primary.get('selector', ''),
                extraction_method='playwright_svg_detection',
                detection_confidence='high',
                is_inline_svg=True,
                aria_label=primary.get('alt', ''),
            ))
        elif primary.get('src'):
            claims.append(_claim(
                'logo_url', primary['src'], primary.get('selector', ''),
                extraction_method='playwright_header_logo',
                detection_confidence='high',
                alt_text=primary.get('alt', ''),
            ))

    best_favicon = select_best_favicon(logo_data.get('favicons', []))
    if best_favicon:
        claims.append(_claim(
            'favicon_url', best_favicon['href'], best_favicon.get('selector', ''),
            extraction_method='playwright_link_rel',
            favicon_type=best_favicon.get('type', ''),
            favicon_sizes=best_favicon.get('sizes', ''),
        ))

    og_images = logo_data.get('ogImages')
    if og_images:
        first = og_images[0]
        claims.append(_claim(
            'og_image_url', first['content'], first.get('selector', ''),
            extraction_method='playwright_meta_og',
        ))
    return claims
async def extract_logos_from_url(page, url: str) -> dict | None:
    """Navigate *page* to *url* and return extracted logo data, or None.

    Attempts to dismiss common cookie-consent banners before running the
    extraction script, since consent overlays can hide the real header
    logo. Returns None on navigation failure, HTTP >= 400, or timeout.
    """
    try:
        response = await page.goto(url, wait_until='domcontentloaded', timeout=PAGE_TIMEOUT)
        if not response or response.status >= 400:
            logger.warning(f"Failed to load {url}: HTTP {response.status if response else 'no response'}")
            return None
        # Give client-side scripts a moment to inject icons/banners.
        await page.wait_for_timeout(1500)
        # Common accept-button patterns across EN/NL/DE/FR sites.
        consent_candidates = (
            'button:has-text("Accept")',
            'button:has-text("Accepteren")',
            'button:has-text("Akzeptieren")',
            'button:has-text("Accepter")',
            'button:has-text("OK")',
            '[id*="cookie"] button',
            '[class*="cookie"] button',
            '.consent-banner button',
        )
        for selector in consent_candidates:
            try:
                candidate = page.locator(selector).first
                if await candidate.is_visible(timeout=500):
                    await candidate.click(timeout=1000)
                    await page.wait_for_timeout(500)
                    break
            except Exception:
                # Banner probing is best-effort; absent selectors are expected.
                continue
        # Run the in-page extraction script and hand back its result object.
        return await page.evaluate(LOGO_EXTRACTION_JS)
    except PlaywrightTimeout:
        logger.warning(f"Timeout loading {url}")
        return None
    except Exception as e:
        logger.error(f"Error extracting logos from {url}: {e}")
        return None
async def enrich_custodian_with_logos(
    filepath: Path,
    page,
    dry_run: bool = False
) -> dict:
    """
    Enrich a single custodian file with logo data.

    Loads the YAML entry at *filepath*, resolves its website, scrapes logo
    candidates with the shared Playwright *page*, and (unless *dry_run*)
    writes a `logo_enrichment` section back to the same file.

    Returns dict with:
    - success: bool
    - logos_found: int
    - message: str
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as fh:
            record = yaml.safe_load(fh)
        if not record:
            return {'success': False, 'logos_found': 0, 'message': 'Empty file'}
        # Idempotency guard: never re-scrape files that already have claims.
        if record.get('logo_enrichment', {}).get('claims'):
            return {
                'success': True,
                'logos_found': len(record['logo_enrichment']['claims']),
                'message': 'Already enriched (skipped)'
            }
        website_url = get_website_url(record)
        if not website_url:
            return {'success': False, 'logos_found': 0, 'message': 'No website URL'}
        logger.info(f"Processing: {get_custodian_name(record)} ({website_url})")
        extracted = await extract_logos_from_url(page, website_url)
        if not extracted:
            return {'success': False, 'logos_found': 0, 'message': 'Failed to extract logos'}
        # One timestamp is shared by the enrichment header and every claim.
        now_iso = datetime.now(timezone.utc).isoformat()
        claims = build_logo_claims(extracted, website_url, now_iso)
        if not claims:
            return {'success': True, 'logos_found': 0, 'message': 'No logos found'}
        enrichment_block = {
            'enrichment_timestamp': now_iso,
            'source_url': website_url,
            'extraction_method': 'playwright_browser',
            'claims': claims,
            'summary': {
                'total_claims': len(claims),
                'has_primary_logo': extracted.get('primaryLogo') is not None,
                'has_favicon': any(c['claim_type'] == 'favicon_url' for c in claims),
                'has_og_image': any(c['claim_type'] == 'og_image_url' for c in claims),
                'favicon_count': len(extracted.get('favicons', [])),
            }
        }
        if dry_run:
            # Report what would change without touching the file.
            logger.info(f" [DRY RUN] Would add {len(claims)} logo claims")
            for claim in claims:
                logger.info(f" - {claim['claim_type']}: {claim['claim_value'][:80]}...")
            return {'success': True, 'logos_found': len(claims), 'message': 'Dry run'}
        record['logo_enrichment'] = enrichment_block
        # sort_keys=False preserves the original key order of the entry.
        with open(filepath, 'w', encoding='utf-8') as fh:
            yaml.dump(record, fh, allow_unicode=True, default_flow_style=False, sort_keys=False)
        return {
            'success': True,
            'logos_found': len(claims),
            'message': f'Added {len(claims)} logo claims'
        }
    except Exception as e:
        logger.error(f"Error processing {filepath}: {e}")
        return {'success': False, 'logos_found': 0, 'message': str(e)}
async def main():
    """CLI driver: parse arguments, launch Chromium, enrich custodian files."""
    parser = argparse.ArgumentParser(description='Enrich custodian files with logo data using Playwright')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done')
    parser.add_argument('--limit', type=int, default=0, help='Process only N files')
    parser.add_argument('--file', type=str, help='Process a single file')
    parser.add_argument('--country', type=str, help='Filter by country code')
    parser.add_argument('--resume', action='store_true', help='Resume from checkpoint')
    parser.add_argument('--headless', action='store_true', default=True, help='Run browser headless (default)')
    parser.add_argument('--visible', action='store_true', help='Show browser window')
    args = parser.parse_args()
    # NOTE(review): args.headless is never read — headless mode is driven
    # solely by --visible; the --headless flag is effectively a no-op.
    headless = not args.visible
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=headless)
        # Desktop-like viewport/UA so sites serve their full, logo-bearing layout.
        context = await browser.new_context(
            viewport={'width': 1280, 'height': 720},
            user_agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
        )
        page = await context.new_page()
        try:
            # Single file mode: process one file and exit (no checkpointing).
            if args.file:
                filepath = Path(args.file)
                if not filepath.exists():
                    logger.error(f"File not found: {filepath}")
                    sys.exit(1)
                result = await enrich_custodian_with_logos(filepath, page, args.dry_run)
                logger.info(f"Result: {result['message']} ({result['logos_found']} logos)")
                return
            # Batch mode
            checkpoint = load_checkpoint() if args.resume else {'processed_files': [], 'last_index': 0}
            # Get all custodian files (sorted for a deterministic order across runs)
            files = sorted(CUSTODIAN_DIR.glob('*.yaml'))
            # Apply country filter — filenames are assumed to start with "CC-"
            if args.country:
                files = [f for f in files if f.name.startswith(f"{args.country}-")]
            # Apply limit
            if args.limit > 0:
                files = files[:args.limit]
            # Skip already processed (checkpoint stores file names, not paths)
            if args.resume:
                files = [f for f in files if f.name not in checkpoint['processed_files']]
            logger.info(f"Processing {len(files)} custodian files...")
            stats = {
                'processed': 0,
                'success': 0,
                'failed': 0,
                'skipped': 0,
                'logos_found': 0,
            }
            for i, filepath in enumerate(files):
                try:
                    result = await enrich_custodian_with_logos(filepath, page, args.dry_run)
                    stats['processed'] += 1
                    if result['success']:
                        # "skipped" in the message marks already-enriched files
                        # (see enrich_custodian_with_logos' return messages).
                        if 'skipped' in result['message'].lower():
                            stats['skipped'] += 1
                        else:
                            stats['success'] += 1
                            stats['logos_found'] += result['logos_found']
                    else:
                        stats['failed'] += 1
                    # Update checkpoint (file is recorded even on failure,
                    # so --resume will not retry failed files).
                    checkpoint['processed_files'].append(filepath.name)
                    checkpoint['last_index'] = i
                    # Persist every 10 files so an abort loses little progress.
                    if (i + 1) % 10 == 0:
                        save_checkpoint(checkpoint)
                        logger.info(f"Progress: {i+1}/{len(files)} - {stats['logos_found']} logos found")
                    # Rate limiting between sites (politeness delay)
                    await asyncio.sleep(REQUEST_DELAY)
                except KeyboardInterrupt:
                    # Ctrl-C: persist progress before leaving the loop.
                    logger.info("Interrupted - saving checkpoint...")
                    save_checkpoint(checkpoint)
                    break
            # Final checkpoint
            save_checkpoint(checkpoint)
            # Summary
            logger.info("\n" + "="*60)
            logger.info("LOGO ENRICHMENT SUMMARY (Playwright)")
            logger.info("="*60)
            logger.info(f"Total processed: {stats['processed']}")
            logger.info(f"Successful: {stats['success']}")
            logger.info(f"Failed: {stats['failed']}")
            logger.info(f"Skipped (already enriched): {stats['skipped']}")
            logger.info(f"Total logos found: {stats['logos_found']}")
            logger.info("="*60)
        finally:
            # Always release the browser, even on sys.exit or errors above.
            await browser.close()
if __name__ == '__main__':
    # Script entry point: run the async driver to completion.
    asyncio.run(main())