#!/usr/bin/env python3
"""
Enrich Dutch custodian YAML files with web data using Firecrawl API.

This script:
1. Maps websites to discover all URLs (APIs, catalogs, portals)
2. Scrapes homepage for metadata and content
3. Extracts structured digital platform information

Uses Firecrawl MCP tools directly through the existing MCP infrastructure,
but this script provides a standalone batch processing approach using the
Firecrawl REST API directly.

Usage:
    python scripts/enrich_dutch_custodians_firecrawl.py [options]

Options:
    --dry-run        Show what would be enriched without modifying files
    --limit N        Process only first N files (for testing)
    --start-index N  Start from index N (for resuming)
    --resume         Resume from last checkpoint
    --force          Re-enrich even if already has firecrawl_enrichment
    --file PATH      Process a single specific file

Environment Variables:
    FIRECRAWL_API_KEY - Required API key for Firecrawl
"""

import argparse
import json
import logging
import os
import re
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse, urlunparse, parse_qs, urlencode

import httpx
import yaml
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
FIRECRAWL_API_BASE = "https://api.firecrawl.dev/v2"
CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"
CHECKPOINT_FILE = CUSTODIAN_DIR / ".firecrawl_enrichment_checkpoint.json"

# Rate limiting - Firecrawl has rate limits, be conservative
# Increased from 2.0 to 3.5 after hitting 429 errors in batch testing
REQUEST_DELAY = 3.5  # seconds between requests

# API Key
FIRECRAWL_API_KEY = os.getenv("FIRECRAWL_API_KEY", "")

# Digital platform detection patterns
API_ENDPOINT_PATTERNS = [
    r'/oai[-_]?pmh',
    r'/api/',
    r'/rest/',
    r'/sparql',
    r'/graphql',
    r'/iiif/',
    r'/sru',
    r'/z39\.50',
    r'/opensearch',
]

CATALOG_PATTERNS = [
    r'/catalogu[es]?(?:/|\?|$)',
    r'/collecti[eo]n?[s]?(?:/|\?|$)',
    r'/archie[fv](?:/|\?|$)',
    r'/beeldbank(?:/|\?|$)',
    r'/zoeken(?:/|\?|$)',
    r'/search(?:/|\?|$)',
    r'/discover(?:/|\?|$)',
    r'/browse(?:/|\?|$)',
]

# Dutch-specific catalog type detection
# Maps URL path patterns to human-readable catalog types
CATALOG_TYPE_PATTERNS = {
    'beeldbank': {
        'patterns': [r'/beeldbank', r'/beeld', r'/images', r'/foto'],
        'label': 'Image Collection',
        'description_nl': 'Beeldbank met gedigitaliseerde foto\'s, kaarten en afbeeldingen',
    },
    'genealogie': {
        'patterns': [r'/genealogie', r'/stamboom', r'/persons', r'/akten'],
        'label': 'Genealogy Records',
        'description_nl': 'Genealogische bronnen en persoonsgegevens',
    },
    'archieven': {
        'patterns': [r'/archie[fv]', r'/inventaris', r'/toegangen', r'/finding'],
        'label': 'Archive Finding Aids',
        'description_nl': 'Archiefinventarissen en toegangen',
    },
    'collectie': {
        'patterns': [r'/collectie', r'/collection', r'/object'],
        'label': 'Collection Portal',
        'description_nl': 'Collectieportaal met objecten en kunstwerken',
    },
    'kranten': {
        'patterns': [r'/kranten', r'/newspaper', r'/periodiek'],
        'label': 'Newspaper Archive',
        'description_nl': 'Gedigitaliseerde kranten en periodieken',
    },
    'kaarten': {
        'patterns': [r'/kaart', r'/map', r'/cartogra'],
        'label': 'Map Collection',
        'description_nl': 'Historische kaarten en cartografisch materiaal',
    },
    'bibliotheek': {
        'patterns': [r'/catalogu', r'/biblio', r'/library', r'/boek'],
        'label': 'Library Catalog',
        'description_nl': 'Bibliotheekcatalogus',
    },
    'zoeken': {
        'patterns': [r'/zoeken', r'/search', r'/discover', r'/browse'],
        'label': 'Search Interface',
        'description_nl': 'Algemene zoekinterface',
    },
}

# Query parameters that should be stripped for URL normalization
NOISE_QUERY_PARAMS = [
    'sort', 'order', 'view', 'mode', 'ss', 'page', 'offset', 'limit',
    'random', 'session', 'sid', 'token', 'ref',
    'utm_', 'fbclid', 'gclid',
]

CMS_INDICATORS = {
    'atlantis': ['atlantis', 'picturae'],
    'mais_flexis': ['mais-flexis', 'mais flexis', 'de ree'],
    'adlib': ['adlib', 'axiell'],
    'collective_access': ['collectiveaccess', 'collective access'],
    'archivematica': ['archivematica'],
    'archivesspace': ['archivesspace'],
    'atom': ['accesstomemory', 'atom'],
    'omeka': ['omeka'],
    'contentdm': ['contentdm'],
    'dspace': ['dspace'],
    'islandora': ['islandora'],
    'memorix': ['memorix'],
}

# CMS indicators this short are matched as whole words only; as plain
# substrings they hit unrelated words (e.g. 'atom' inside 'anatomie').
_SHORT_INDICATOR_LEN = 4

# Metadata standards detection patterns
# Each entry is (pattern, standard_name, use_regex)
# use_regex=True means use word boundary matching (for short acronyms that cause false positives)
# use_regex=False means use simple substring matching (for longer unambiguous phrases)
METADATA_STANDARDS_PATTERNS = [
    # Dublin Core
    (r'\bdublin\s+core\b', 'Dublin Core', True),
    (r'\bdc:', 'Dublin Core', True),  # dc: namespace prefix
    (r'\bdcterms\b', 'Dublin Core', True),
    # MARC21
    (r'\bmarc\s*21\b', 'MARC21', True),
    (r'\bmarc21\b', 'MARC21', True),
    # EAD - Encoded Archival Description (short acronym, needs word boundary!)
    (r'\bead\b', 'EAD', True),  # Must be whole word, not "leader", "already", etc.
    (r'encoded\s+archival\s+description', 'EAD', True),
    (r'\bead\s*2002\b', 'EAD', True),
    (r'\bead3\b', 'EAD', True),
    # METS - Metadata Encoding and Transmission Standard
    (r'\bmets\b', 'METS', True),
    (r'metadata\s+encoding\s+and\s+transmission', 'METS', True),
    # MODS - Metadata Object Description Schema
    (r'\bmods\b', 'MODS', True),
    (r'metadata\s+object\s+description', 'MODS', True),
    # LIDO - Lightweight Information Describing Objects
    (r'\blido\b', 'LIDO', True),
    (r'lightweight\s+information\s+describing', 'LIDO', True),
    # CIDOC-CRM
    (r'\bcidoc[-\s]?crm\b', 'CIDOC-CRM', True),
    # Schema.org
    (r'\bschema\.org\b', 'Schema.org', True),
    (r'\bschema:', 'Schema.org', True),  # schema: namespace prefix
    # RiC-O - Records in Contexts Ontology
    (r'\bric[-\s]?o\b', 'RiC-O', True),
    (r'records\s+in\s+contexts', 'RiC-O', True),
    # PREMIS - Preservation Metadata
    (r'\bpremis\b', 'PREMIS', True),
    (r'preservation\s+metadata', 'PREMIS', True),
    # BIBFRAME
    (r'\bbibframe\b', 'BIBFRAME', True),
    # IIIF - International Image Interoperability Framework
    (r'\biiif\b', 'IIIF', True),
    (r'image\s+interoperability\s+framework', 'IIIF', True),
]

# Custodian-entry locations of the website URL, in priority order:
# (section key, field key within that section).
_WEBSITE_SOURCES = (
    ('original_entry', 'webadres_organisatie'),
    ('wikidata_enrichment', 'wikidata_official_website'),
    ('google_maps_enrichment', 'website'),
)

# Maps a process_custodian() status to the checkpoint stats counter it bumps.
# Statuses not listed here (dry_run, skipped_empty, ...) only count towards
# 'total_processed'.
_STATUS_TO_STAT = {
    'success': 'successful',
    'partial': 'partial',
    'error': 'failed',
    'skipped_no_url': 'skipped_no_url',
    'skipped_already_enriched': 'skipped_already_enriched',
}


@dataclass
class FirecrawlClient:
    """Simple Firecrawl API client.

    Wraps an ``httpx.Client`` configured with the bearer token. Can be used
    as a context manager; otherwise call :meth:`close` when done.
    """

    api_key: str
    base_url: str = FIRECRAWL_API_BASE

    def __post_init__(self):
        self.client = httpx.Client(
            timeout=60.0,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            }
        )

    def __enter__(self) -> "FirecrawlClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def _post(self, endpoint: str, payload: dict, action: str, url: str) -> dict | None:
        """POST ``payload`` to ``endpoint`` and return the decoded JSON body.

        Retries on HTTP 429 with a linear back-off. Any other failure is
        logged and reported as ``None`` so batch processing can continue.

        Args:
            endpoint: Path below ``base_url`` (e.g. ``"scrape"``).
            payload: JSON body to send.
            action: Verb used in log messages (``"scraping"`` / ``"mapping"``).
            url: Target website URL, used in log messages only.
        """
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = self.client.post(f"{self.base_url}/{endpoint}", json=payload)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429 and attempt < max_retries - 1:
                    # Waits 10s, then 20s; the third attempt is the last one.
                    wait_time = (attempt + 1) * 10
                    logger.warning(f"Rate limited (429), waiting {wait_time}s before retry...")
                    time.sleep(wait_time)
                    continue
                logger.error(f"HTTP error {action} {url}: {e.response.status_code}")
                return None
            except Exception as e:
                # Deliberate best-effort boundary: network errors, timeouts and
                # undecodable bodies must not abort the whole batch.
                logger.error(f"Error {action} {url}: {e}")
                return None
        return None

    def scrape(self, url: str, formats: list[str] | None = None, only_main_content: bool = True) -> dict | None:
        """Scrape a single URL.

        Args:
            url: Page to scrape.
            formats: Firecrawl output formats; defaults to markdown + links.
            only_main_content: Passed through as ``onlyMainContent``.

        Returns:
            The ``data`` object of the API response, or ``None`` on failure.
        """
        if formats is None:
            formats = ["markdown", "links"]

        payload = {
            "url": url,
            "formats": formats,
            "onlyMainContent": only_main_content,
            "maxAge": 172800000,  # 2 days cache
            "blockAds": True,
            "skipTlsVerification": True,
            "location": {"country": "NL"},  # Dutch locale
        }

        result = self._post("scrape", payload, "scraping", url)
        if result is None:
            return None
        if result.get("success"):
            return result.get("data")
        logger.warning(f"Scrape failed for {url}: {result}")
        return None

    def map_site(self, url: str, limit: int = 100) -> list[dict] | None:
        """Map all URLs on a site.

        Args:
            url: Site root to map.
            limit: Maximum number of links requested from the API.

        Returns:
            The ``links`` list of the API response (possibly empty), or
            ``None`` on failure.
        """
        payload = {
            "url": url,
            "limit": limit,
            "sitemap": "include",
            "includeSubdomains": True,
            "ignoreQueryParameters": False,  # Keep query params for API endpoints
            "location": {"country": "NL"},
        }

        result = self._post("map", payload, "mapping", url)
        if result is None:
            return None
        if result.get("success"):
            return result.get("links", [])
        logger.warning(f"Map failed for {url}: {result}")
        return None

    def close(self):
        """Close the HTTP client."""
        self.client.close()


def _link_url(url_info: Any) -> str:
    """Return the URL string of a link entry, which may be a dict or a plain string."""
    if isinstance(url_info, dict):
        # `or ''` guards against an explicit null URL in the API response.
        return url_info.get('url') or ''
    return str(url_info)


def get_website_url(entry: dict) -> str | None:
    """Extract website URL from custodian entry, prioritizing different sources.

    Sources are tried in the order given by ``_WEBSITE_SOURCES``: the original
    entry, then Wikidata, then Google Maps. Sections that are missing, null or
    not a mapping, and values that are not non-blank strings, are skipped.

    Returns:
        The normalized URL of the first usable source, or ``None``.
    """
    for section, key in _WEBSITE_SOURCES:
        section_data = entry.get(section)
        if not isinstance(section_data, dict):
            continue
        url = section_data.get(key)
        if isinstance(url, str) and url.strip():
            return normalize_url(url.strip())
    return None


def normalize_url(url: str) -> str:
    """Normalize URL to ensure it has a scheme.

    Adds ``https://`` when no http(s) scheme is present (the check is
    case-insensitive), expands protocol-relative ``//host`` URLs, and removes
    trailing slashes. Empty or whitespace-only input yields an empty string
    (falsy input is returned unchanged).
    """
    if not url:
        return url

    url = url.strip()
    if not url:
        return url

    if url.startswith('//'):
        # Protocol-relative URL: only the scheme is missing.
        url = 'https:' + url
    elif not url.lower().startswith(('http://', 'https://')):
        url = 'https://' + url

    # Remove trailing slash for consistency
    return url.rstrip('/')


def detect_apis_from_urls(urls: list[dict]) -> list[dict]:
    """Detect API endpoints from a list of URLs.

    Args:
        urls: Link entries; each is a dict with a ``url`` key (and optionally
            ``title``) or a plain URL string.

    Returns:
        One ``{'url', 'type', 'title'}`` dict per entry whose URL matches any
        of ``API_ENDPOINT_PATTERNS``, in input order.
    """
    apis = []
    for url_info in urls:
        url = _link_url(url_info)
        url_lower = url.lower()
        if any(re.search(pattern, url_lower) for pattern in API_ENDPOINT_PATTERNS):
            apis.append({
                'url': url,
                'type': detect_api_type(url_lower),
                'title': url_info.get('title') if isinstance(url_info, dict) else None,
            })
    return apis


def detect_api_type(url: str) -> str:
    """Detect the type of API from URL.

    Uses case-insensitive substring checks, most specific protocol first;
    generic REST is the last resort before ``'Unknown'``.
    """
    url_lower = url.lower()

    if 'oai' in url_lower or 'pmh' in url_lower:
        return 'OAI-PMH'
    if 'sparql' in url_lower:
        return 'SPARQL'
    if 'iiif' in url_lower:
        return 'IIIF'
    if 'sru' in url_lower:
        return 'SRU'
    if 'opensearch' in url_lower:
        return 'OpenSearch'
    if 'graphql' in url_lower:
        return 'GraphQL'
    if 'z39.50' in url_lower:
        # API_ENDPOINT_PATTERNS matches Z39.50 URLs, so they need a label too.
        return 'Z39.50'
    if '/api/' in url_lower or '/rest/' in url_lower:
        return 'REST'
    return 'Unknown'


def _is_noise_param(key: str, values: list[str]) -> bool:
    """Return True if a query parameter looks session-, sort- or tracking-related."""
    key_lower = key.lower()

    # NOTE(review): this is substring matching, so short entries such as 'ss',
    # 'ref' or 'sid' also hit keys like 'class', 'href' or 'inside'. Confirm
    # whether exact/prefix matching is wanted before tightening.
    if any(noise in key_lower for noise in NOISE_QUERY_PARAMS):
        return True

    if values:
        value = values[0]
        # Values that look like random session IDs (long hex strings)
        if re.match(r'^[a-f0-9]{20,}$', value, re.I):
            return True
        # Values with encoded JSON containing random seeds
        if 'random' in value.lower():
            return True

    return False


def normalize_catalog_url(url: str, strip_all_params: bool = True) -> str:
    """Normalize catalog URL by removing query parameters.

    For catalog URLs, we want the canonical base URL without query params,
    as they often contain session-specific or record-specific parameters.

    Args:
        url: The URL to normalize
        strip_all_params: If True, remove ALL query params (default for catalogs)
                          If False, only remove known noisy params

    Returns:
        The normalized URL (trailing slash and fragment removed), or ``url``
        unchanged if it cannot be parsed.
    """
    try:
        parsed = urlparse(url)
        path = parsed.path.rstrip('/')

        # For catalog URLs, strip all query params to get canonical base URL
        if strip_all_params:
            return urlunparse((parsed.scheme, parsed.netloc, path, '', '', ''))

        query_params = parse_qs(parsed.query, keep_blank_values=False)
        cleaned_params = {
            key: values
            for key, values in query_params.items()
            if not _is_noise_param(key, values)
        }

        # Reconstruct URL without noisy params
        new_query = urlencode(cleaned_params, doseq=True) if cleaned_params else ''
        return urlunparse((
            parsed.scheme,
            parsed.netloc,
            path,
            parsed.params,
            new_query,
            '',  # Remove fragment
        ))
    except ValueError:
        # urlparse/parse_qs raise ValueError on malformed input; fall back to
        # the raw URL rather than losing the candidate.
        return url


def detect_catalog_type(url: str) -> tuple[str | None, str | None]:
    """Detect the catalog type from URL path.

    The first entry of ``CATALOG_TYPE_PATTERNS`` with a matching pattern wins.
    The patterns are searched in the whole lower-cased URL, not only its path.

    Returns tuple of (type_key, label) or (None, None) if not detected.
    """
    url_lower = url.lower()

    for type_key, config in CATALOG_TYPE_PATTERNS.items():
        if any(re.search(pattern, url_lower) for pattern in config['patterns']):
            return type_key, config['label']

    return None, None


def generate_catalog_description(url: str, catalog_type: str | None) -> str | None:
    """Generate a meaningful description based on URL path and catalog type.

    Known catalog types use their ``description_nl`` text. Otherwise the last
    path segment is title-cased into ``"Zoekinterface: <segment>"``, unless it
    is a generic word (zoeken/search/index/home), in which case ``None`` is
    returned.
    """
    if catalog_type and catalog_type in CATALOG_TYPE_PATTERNS:
        return CATALOG_TYPE_PATTERNS[catalog_type]['description_nl']

    # Fallback: extract description from URL path
    path_parts = [p for p in urlparse(url).path.split('/') if p]
    if path_parts:
        # Use last meaningful path segment
        last_segment = path_parts[-1].replace('-', ' ').replace('_', ' ').title()
        if last_segment.lower() not in ('zoeken', 'search', 'index', 'home'):
            return f"Zoekinterface: {last_segment}"

    return None


def detect_catalogs_from_urls(urls: list[dict]) -> list[dict]:
    """Detect catalog/collection portals from URLs.

    Improvements over basic detection:
    1. Normalizes URLs (all query parameters are stripped)
    2. Deduplicates by catalog section (keeps one URL per catalog type per domain)
    3. Categorizes by catalog type (beeldbank, genealogie, etc.)
    4. Generates meaningful descriptions

    Args:
        urls: Link entries; each is a dict with a ``url`` key (and optionally
            ``title`` / ``description``) or a plain URL string.

    Returns:
        Catalog entries ordered by catalog type (in ``CATALOG_TYPE_PATTERNS``
        order, untyped last) and then by URL.
    """
    # First pass: collect all matching URLs
    candidates = []
    for url_info in urls:
        url = _link_url(url_info)
        url_lower = url.lower()

        if not any(re.search(pattern, url_lower) for pattern in CATALOG_PATTERNS):
            continue

        normalized_url = normalize_catalog_url(url)
        try:
            parsed = urlparse(normalized_url)
            had_query = bool(urlparse(url).query)
        except ValueError:
            # Unparseable URL: it cannot be grouped by domain, so skip it.
            continue

        catalog_type, catalog_label = detect_catalog_type(url)
        candidates.append({
            'url': normalized_url,
            'url_info': url_info,
            'domain': parsed.netloc,
            'catalog_type': catalog_type,
            'catalog_label': catalog_label,
            # Shallower URLs are preferred as the canonical entry point.
            'path_depth': len([p for p in parsed.path.split('/') if p]),
            # Taken from the ORIGINAL URL: the normalized one never has a query.
            'had_query': had_query,
        })

    # Group by type, prefer shallow paths, and at equal depth prefer links
    # that were query-free to begin with (the sort is stable otherwise).
    candidates.sort(key=lambda c: (
        c['catalog_type'] or 'zzz',
        c['path_depth'],
        c['had_query'],
    ))

    # Second pass: keep only one URL per catalog type per domain
    catalogs = []
    seen_keys = set()
    for candidate in candidates:
        dedup_key = f"{candidate['domain']}:{candidate['catalog_type'] or 'generic'}"
        if dedup_key in seen_keys:
            continue
        seen_keys.add(dedup_key)

        url_info = candidate['url_info']
        is_mapping = isinstance(url_info, dict)
        title = url_info.get('title') if is_mapping else None
        description = url_info.get('description') if is_mapping else None
        if not description:
            description = generate_catalog_description(
                candidate['url'], candidate['catalog_type']
            )

        catalogs.append({
            'url': candidate['url'],
            'catalog_type': candidate['catalog_type'],
            'catalog_type_label': candidate['catalog_label'],
            'title': title,
            'description': description,
        })

    # Sort by catalog type for consistent ordering
    type_rank = {type_key: rank for rank, type_key in enumerate(CATALOG_TYPE_PATTERNS)}
    catalogs.sort(key=lambda c: (type_rank.get(c['catalog_type'], 999), c['url']))

    return catalogs


def _contains_indicator(content_lower: str, indicator: str) -> bool:
    """Return True if ``indicator`` occurs in the already lower-cased content."""
    if len(indicator) <= _SHORT_INDICATOR_LEN:
        # Short tokens are only trusted as whole words.
        return re.search(rf'\b{re.escape(indicator)}\b', content_lower) is not None
    return indicator in content_lower


def detect_cms_from_content(content: str) -> list[str]:
    """Detect CMS/platform indicators from page content.

    Returns:
        Names of the detected systems in ``CMS_INDICATORS`` order, so the
        output is stable between runs.
    """
    if not content:
        return []

    content_lower = content.lower()
    return [
        cms_name
        for cms_name, indicators in CMS_INDICATORS.items()
        if any(_contains_indicator(content_lower, indicator) for indicator in indicators)
    ]


def detect_metadata_standards(content: str) -> list[str]:
    """Detect metadata standards mentioned in content using regex word boundaries.

    Uses METADATA_STANDARDS_PATTERNS which includes regex patterns with word
    boundaries to avoid false positives like matching 'ead' in 'leader' or
    'already'.

    Returns:
        Standard names, each once, in the order their first matching pattern
        appears in METADATA_STANDARDS_PATTERNS (stable between runs).
    """
    if not content:
        return []

    content_lower = content.lower()
    # dict keys keep insertion order and deduplicate the standard names.
    detected: dict[str, None] = {}

    for pattern, standard_name, use_regex in METADATA_STANDARDS_PATTERNS:
        if standard_name in detected:
            continue
        if use_regex:
            matched = re.search(pattern, content_lower) is not None
        else:
            matched = pattern in content_lower
        if matched:
            detected[standard_name] = None

    return list(detected)


def create_firecrawl_enrichment(
    url: str,
    scrape_data: dict | None,
    map_data: list[dict] | None,
) -> dict:
    """Create the firecrawl_enrichment section for a custodian entry.

    Args:
        url: Website URL that was fetched.
        scrape_data: Result of the homepage scrape, or ``None`` if it failed.
        map_data: Result of the site map call, or ``None``/empty if it failed
            or found nothing.

    Returns:
        Dict with ``fetch_timestamp`` (UTC, ISO 8601), ``source_url`` and
        ``success`` (True when the scrape succeeded), plus whichever of
        ``page_metadata``, ``detected_cms``, ``detected_standards``,
        ``links_count``, ``sitemap_urls_count``, ``detected_api_endpoints``
        and ``detected_catalog_urls`` could be derived.
    """
    enrichment = {
        'fetch_timestamp': datetime.now(timezone.utc).isoformat(),
        'source_url': url,
        'success': scrape_data is not None,
    }

    if scrape_data:
        # `or` fallbacks: the API may send explicit nulls for these keys.
        metadata = scrape_data.get('metadata') or {}
        enrichment['page_metadata'] = {
            'title': metadata.get('title'),
            'description': metadata.get('description'),
            'language': metadata.get('language'),
            'status_code': metadata.get('statusCode'),
        }

        # Detect CMS and standards from the markdown content
        content = scrape_data.get('markdown') or ''
        enrichment['detected_cms'] = detect_cms_from_content(content)
        enrichment['detected_standards'] = detect_metadata_standards(content)

        links = scrape_data.get('links') or []
        if links:
            enrichment['links_count'] = len(links)

    if map_data:
        enrichment['sitemap_urls_count'] = len(map_data)

        # Detect APIs and catalogs from mapped URLs
        apis = detect_apis_from_urls(map_data)
        if apis:
            enrichment['detected_api_endpoints'] = apis

        catalogs = detect_catalogs_from_urls(map_data)
        if catalogs:
            enrichment['detected_catalog_urls'] = catalogs[:10]  # Limit to 10

    return enrichment


def update_provenance(entry: dict, firecrawl_data: dict) -> dict:
    """Update provenance section with Firecrawl source.

    Replaces ``entry['provenance']['sources']['firecrawl']`` with a single
    record describing this fetch; other sources are left untouched. ``entry``
    is modified in place and also returned.
    """
    # `or {}`: the keys may exist with an explicit null value in the YAML.
    provenance = entry.get('provenance') or {}
    sources = provenance.get('sources') or {}

    sources['firecrawl'] = [{
        'source_type': 'firecrawl_api',
        'fetch_timestamp': firecrawl_data.get('fetch_timestamp'),
        'api_version': 'v2',
        'source_url': firecrawl_data.get('source_url'),
        'claims_extracted': [
            'page_metadata',
            'detected_cms',
            'detected_standards',
            'detected_api_endpoints',
            'detected_catalog_urls',
        ],
    }]

    provenance['sources'] = sources
    entry['provenance'] = provenance

    return entry


def _new_checkpoint() -> dict:
    """Return an empty checkpoint structure."""
    return {
        'processed_files': [],
        'last_processed_index': -1,
        'stats': {
            'total_processed': 0,
            'successful': 0,
            'partial': 0,
            'failed': 0,
            'skipped_no_url': 0,
            'skipped_already_enriched': 0,
        }
    }


def load_checkpoint() -> dict:
    """Load checkpoint for resumable processing.

    Returns:
        The stored checkpoint with any missing keys filled in from the
        defaults, or a fresh checkpoint if the file is absent, unreadable or
        not a JSON object.
    """
    default = _new_checkpoint()
    if not CHECKPOINT_FILE.exists():
        return default

    try:
        with open(CHECKPOINT_FILE, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both JSON and text-decoding errors.
        logger.warning(f"Failed to load checkpoint: {e}")
        return default

    if not isinstance(loaded, dict):
        logger.warning(f"Failed to load checkpoint: unexpected content in {CHECKPOINT_FILE}")
        return default

    # Fill gaps so older or partial checkpoint files do not raise KeyError.
    stats = {**default['stats'], **(loaded.get('stats') or {})}
    return {**default, **loaded, 'stats': stats}


def save_checkpoint(checkpoint: dict):
    """Save checkpoint for resumable processing.

    Failures are logged, not raised: a lost checkpoint must not abort a run.
    """
    try:
        with open(CHECKPOINT_FILE, 'w', encoding='utf-8') as f:
            json.dump(checkpoint, f, indent=2)
    except (OSError, TypeError) as e:
        logger.error(f"Failed to save checkpoint: {e}")


def process_custodian(
    filepath: Path,
    client: FirecrawlClient,
    dry_run: bool = False,
    force: bool = False,
) -> dict:
    """Process a single custodian file.

    Loads the YAML entry, scrapes and maps its website, stores the result
    under ``firecrawl_enrichment`` plus a provenance record, and rewrites the
    file.

    Args:
        filepath: Custodian YAML file to enrich.
        client: Firecrawl client used for the API calls.
        dry_run: If True, stop after resolving the URL; no API calls, no write.
        force: If True, re-enrich entries that already have an enrichment.

    Returns:
        Dict with ``filepath``, ``status``, ``url`` and ``error``. ``status``
        is one of ``skipped_empty``, ``skipped_already_enriched``,
        ``skipped_no_url``, ``dry_run``, ``success``, ``partial`` (scrape
        failed) or ``error``. On success/partial it also carries
        ``apis_found`` and ``catalogs_found``.
    """
    result = {
        'filepath': str(filepath),
        'status': 'unknown',
        'url': None,
        'error': None,
    }

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            entry = yaml.safe_load(f)

        if not entry:
            result['status'] = 'skipped_empty'
            return result

        # NOTE(review): an earlier FAILED enrichment (success: False) is also
        # truthy here, so such entries are not retried without --force.
        if not force and entry.get('firecrawl_enrichment'):
            result['status'] = 'skipped_already_enriched'
            return result

        url = get_website_url(entry)
        result['url'] = url

        if not url:
            result['status'] = 'skipped_no_url'
            return result

        logger.info(f"Processing {filepath.name}: {url}")

        if dry_run:
            result['status'] = 'dry_run'
            return result

        # Rate limiting: pause before each of the two API calls.
        time.sleep(REQUEST_DELAY)
        scrape_data = client.scrape(url)

        time.sleep(REQUEST_DELAY)
        map_data = client.map_site(url, limit=200)

        firecrawl_enrichment = create_firecrawl_enrichment(url, scrape_data, map_data)

        entry['firecrawl_enrichment'] = firecrawl_enrichment
        entry = update_provenance(entry, firecrawl_enrichment)

        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(entry, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

        result['status'] = 'success' if firecrawl_enrichment.get('success') else 'partial'
        result['apis_found'] = len(firecrawl_enrichment.get('detected_api_endpoints', []))
        result['catalogs_found'] = len(firecrawl_enrichment.get('detected_catalog_urls', []))

    except Exception as e:
        # Per-file boundary: one bad file must not stop the batch.
        logger.error(f"Error processing {filepath}: {e}")
        result['status'] = 'error'
        result['error'] = str(e)

    return result


def main():
    """Parse command-line options and run single-file or batch enrichment."""
    parser = argparse.ArgumentParser(
        description='Enrich Dutch custodian files with Firecrawl web data'
    )
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be enriched without modifying files')
    parser.add_argument('--limit', type=int, default=None,
                        help='Process only first N files')
    parser.add_argument('--start-index', type=int, default=0,
                        help='Start from index N')
    parser.add_argument('--resume', action='store_true',
                        help='Resume from last checkpoint')
    parser.add_argument('--force', action='store_true',
                        help='Re-enrich even if already has firecrawl_enrichment')
    parser.add_argument('--file', type=str, default=None,
                        help='Process a single specific file')

    args = parser.parse_args()

    if not FIRECRAWL_API_KEY:
        logger.error("FIRECRAWL_API_KEY environment variable not set")
        sys.exit(1)

    # The context manager closes the HTTP client on every exit path,
    # including sys.exit().
    with FirecrawlClient(api_key=FIRECRAWL_API_KEY) as client:
        # Single file mode
        if args.file:
            filepath = Path(args.file)
            if not filepath.exists():
                logger.error(f"File not found: {filepath}")
                sys.exit(1)

            result = process_custodian(filepath, client, args.dry_run, args.force)
            logger.info(f"Result: {result}")
            return

        # Get list of Dutch custodian files
        files = sorted(CUSTODIAN_DIR.glob("NL-*.yaml"))
        total_files = len(files)
        logger.info(f"Found {total_files} Dutch custodian files")

        checkpoint = load_checkpoint() if args.resume else _new_checkpoint()
        stats = checkpoint['stats']

        # Determine start index
        start_index = args.start_index
        if args.resume and checkpoint['last_processed_index'] >= 0:
            start_index = checkpoint['last_processed_index'] + 1
            logger.info(f"Resuming from index {start_index}")

        # Determine end index; `is not None` so that --limit 0 means "no files"
        # rather than "no limit".
        end_index = total_files
        if args.limit is not None:
            end_index = min(start_index + args.limit, total_files)

        logger.info(f"Processing files {start_index} to {end_index - 1}")

        for i, filepath in enumerate(files[start_index:end_index], start=start_index):
            logger.info(f"[{i + 1}/{total_files}] Processing {filepath.name}")

            result = process_custodian(filepath, client, args.dry_run, args.force)

            stats['total_processed'] += 1
            stat_key = _STATUS_TO_STAT.get(result['status'])
            if stat_key is not None:
                # .get(): checkpoints written by older versions lack 'partial'.
                stats[stat_key] = stats.get(stat_key, 0) + 1

            checkpoint['processed_files'].append(str(filepath))
            checkpoint['last_processed_index'] = i

            # Save checkpoint every 10 files. A dry run must not modify any
            # file, and persisting its progress would make a later --resume
            # skip files that were never enriched.
            if not args.dry_run and (i + 1) % 10 == 0:
                save_checkpoint(checkpoint)
                logger.info(f"Checkpoint saved at index {i}")

            if result['status'] in ('success', 'partial'):
                logger.info(
                    f"  -> {result['status']}: "
                    f"{result.get('apis_found', 0)} APIs, "
                    f"{result.get('catalogs_found', 0)} catalogs found"
                )
            else:
                logger.info(f"  -> {result['status']}")

        # Final checkpoint save
        if not args.dry_run:
            save_checkpoint(checkpoint)

        # Print summary
        logger.info("\n=== Processing Summary ===")
        logger.info(f"Total processed: {stats['total_processed']}")
        logger.info(f"Successful: {stats['successful']}")
        logger.info(f"Partial: {stats.get('partial', 0)}")
        logger.info(f"Failed: {stats['failed']}")
        logger.info(f"Skipped (no URL): {stats['skipped_no_url']}")
        logger.info(f"Skipped (already enriched): {stats['skipped_already_enriched']}")


if __name__ == '__main__':
    main()