glam/scripts/enrich_youtube.py

889 lines
31 KiB
Python
Executable file

#!/usr/bin/env python3
"""
YouTube Enrichment Script for Heritage Custodian Entries
This script enriches heritage custodian YAML entries with YouTube channel/video data.
It finds YouTube channels from existing web_claims (social_youtube) and fetches:
- Channel info (subscribers, video count, description, etc.)
- Recent videos (title, description, views, likes, comments)
- Video transcripts (when available)
- Comments on videos
All data includes full provenance with URLs and timestamps.
Usage:
python scripts/enrich_youtube.py [--dry-run] [--limit N] [--entry ENTRY_FILE]
Environment Variables:
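YOUTUBE_API_KEY: At least one YouTube Data API key is required. Keys are read
from YOUTUBE_API_KEY, GOOGLE_YOUTUBE_TOKEN, GOOGLE_YOUTUBE_TOKEN_v2.._v5 and
YOUTUBE_API_KEY_1.._5; extra keys are used for rotation when a quota runs out.
Get keys from https://console.cloud.google.com/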
Author: GLAM Data Extraction Project
Date: December 2025
"""
import argparse
import json
import os
import re
import subprocess
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import httpx
import yaml
# Load environment variables from .env file
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass # dotenv not installed, rely on shell environment
# ============================================================================
# Configuration
# ============================================================================
# Base URL of the YouTube Data API v3; the endpoint paths used in this file
# (/channels, /videos, /search, /commentThreads) are appended to it.
YOUTUBE_API_BASE = "https://www.googleapis.com/youtube/v3"
# Sent as the User-Agent header on every API request made by this script.
USER_AGENT = "GLAMDataExtractor/1.0 (heritage-data@example.com) Python/httpx"
# Directory scanned for *.yaml entry files. The path is relative, so it is
# resolved against the current working directory at run time.
ENTRIES_DIR = Path("data/nde/enriched/entries")
# Rate limiting
REQUEST_DELAY = 0.3 # seconds slept in main() after each entry (not between the individual API calls of one entry)
# ============================================================================
# Multi-API Key Management
# ============================================================================
class APIKeyManager:
    """
    Pool of YouTube API keys read from the environment.

    Keys that hit their quota are remembered in ``exhausted_keys`` (by index
    into ``keys``) and skipped from then on. ``current_index`` is a position
    within the list of keys that are still usable, not within ``keys`` itself.
    """

    # Environment variables probed for keys, in lookup order.
    _ENV_VAR_NAMES = (
        "YOUTUBE_API_KEY",
        "GOOGLE_YOUTUBE_TOKEN",
        "GOOGLE_YOUTUBE_TOKEN_v2",
        "GOOGLE_YOUTUBE_TOKEN_v3",
        "GOOGLE_YOUTUBE_TOKEN_v4",
        "GOOGLE_YOUTUBE_TOKEN_v5",
        "YOUTUBE_API_KEY_1",
        "YOUTUBE_API_KEY_2",
        "YOUTUBE_API_KEY_3",
        "YOUTUBE_API_KEY_4",
        "YOUTUBE_API_KEY_5",
    )

    def __init__(self):
        self.keys = []
        self.current_index = 0
        self.exhausted_keys = set()
        self._load_keys()

    def _load_keys(self):
        """Collect every distinct, non-empty key value from the environment."""
        known_values = set()
        for env_name in self._ENV_VAR_NAMES:
            value = os.getenv(env_name)
            # Skip unset/empty variables and values already registered under
            # another variable name.
            if not value or value in known_values:
                continue
            known_values.add(value)
            self.keys.append({"key": value, "name": env_name})

        if self.keys:
            print(f"Loaded {len(self.keys)} API key(s): {[k['name'] for k in self.keys]}")
        else:
            print("WARNING: No YouTube API keys found in environment variables")

    def _usable_indices(self):
        """Return the indices into ``keys`` that are not marked exhausted."""
        return [idx for idx in range(len(self.keys)) if idx not in self.exhausted_keys]

    def _current_entry(self):
        """Return the ``{"key", "name"}`` dict currently selected, or None."""
        usable = self._usable_indices()
        if not usable:
            return None
        return self.keys[usable[self.current_index % len(usable)]]

    def get_current_key(self) -> Optional[str]:
        """Return the active API key, or None when every key is exhausted."""
        selected = self._current_entry()
        return None if selected is None else selected["key"]

    def get_current_key_name(self) -> str:
        """Return the env-var name of the active key, or "none" if there is none."""
        selected = self._current_entry()
        return "none" if selected is None else selected["name"]

    def mark_quota_exceeded(self):
        """Flag the active key as exhausted and restart at the first usable key."""
        if not self.keys:
            return
        usable = self._usable_indices()
        if usable:
            spent = usable[self.current_index % len(usable)]
            self.exhausted_keys.add(spent)
            key_name = self.keys[spent]["name"]
            print(f"\n⚠️ Quota exceeded for {key_name}, rotating to next key...")
            # Positions shift once a key is removed, so start over at 0.
            self.current_index = 0

    def rotate_key(self):
        """Advance to the next usable key (no-op with fewer than two usable keys)."""
        usable_count = len(self._usable_indices())
        if usable_count > 1:
            self.current_index = (self.current_index + 1) % usable_count

    def has_available_keys(self) -> bool:
        """Return True while at least one loaded key is not exhausted."""
        return len(self.exhausted_keys) < len(self.keys)

    def get_status(self) -> str:
        """Return a "<usable>/<total> keys available" summary string."""
        remaining = len(self.keys) - len(self.exhausted_keys)
        return f"{remaining}/{len(self.keys)} keys available"


# Global API key manager
api_key_manager = APIKeyManager()
# ============================================================================
# Helper Functions
# ============================================================================
def extract_channel_id_or_username(youtube_url: str) -> Tuple[Optional[str], str]:
    """
    Extract a YouTube channel ID, username, handle or video ID from a URL.

    Args:
        youtube_url: Any YouTube URL; surrounding whitespace is ignored.

    Returns:
        Tuple of (identifier, identifier_type) where type is:
        - 'channel_id': Direct channel ID (UCxxxxx)
        - 'username': Legacy /user/name format
        - 'handle': New /@name format
        - 'custom_url': /c/name or bare /name format
        - 'video_id': Video ID from a watch, youtu.be, shorts, embed or live
          URL (the caller resolves it to a channel)
        ``(None, "")`` is returned when nothing can be recognised.
    """
    if not youtube_url:
        return None, ""
    url = youtube_url.strip()

    video_id = r'([a-zA-Z0-9_-]{11})'
    # Tried in order; the first match wins. '#' is excluded from names so a
    # URL fragment never becomes part of the identifier.
    prefixed_patterns = (
        # Channel ID: UC + 22 chars = 24 total
        (r'youtube\.com/channel/(UC[0-9A-Za-z_-]{22})', "channel_id"),
        (r'youtube\.com/@([^/?&#]+)', "handle"),
        (r'youtube\.com/user/([^/?&#]+)', "username"),
        (r'youtube\.com/c/([^/?&#]+)', "custom_url"),
        # v= may be preceded by other query parameters; the lazy optional
        # group keeps the first v= when it is directly after the '?'.
        (r'youtube\.com/watch\?(?:[^#]*?&)??v=' + video_id, "video_id"),
        (r'youtu\.be/' + video_id, "video_id"),
        (r'youtube\.com/(?:shorts|embed|live)/' + video_id, "video_id"),
    )
    for pattern, id_type in prefixed_patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1), id_type

    # Direct custom URL format: youtube.com/customname (no prefix).
    # Must come after all other patterns to avoid false matches.
    match = re.search(r'youtube\.com/([a-zA-Z][a-zA-Z0-9_-]{2,})(?:[/?#]|$)', url)
    if match:
        name = match.group(1)
        # Known site paths that are not channel names.
        excluded = {
            'watch', 'playlist', 'channel', 'user', 'c', 'results', 'feed',
            'gaming', 'shorts', 'live', 'embed',
        }
        if name.lower() not in excluded:
            return name, "custom_url"
    return None, ""
def get_channel_id_from_video(video_id: str, api_key: str) -> Optional[str]:
    """
    Get the channel ID that owns a video, via the ``videos`` endpoint.

    Args:
        video_id: YouTube video ID.
        api_key: YouTube Data API key used for the request.

    Returns:
        The owning channel's ID, or None when the video is not found or the
        request fails for a reason other than HTTP 403.

    Raises:
        httpx.HTTPStatusError: On HTTP 403, so that the caller's quota
            handling can switch API keys instead of the failure being
            mistaken for "video not found".
    """
    params = {
        "part": "snippet",
        "id": video_id,
        "key": api_key,
    }
    try:
        response = httpx.get(
            f"{YOUTUBE_API_BASE}/videos",
            params=params,
            headers={"User-Agent": USER_AGENT},
            timeout=30.0,
        )
        response.raise_for_status()
        items = response.json().get("items")
        if items:
            return items[0]["snippet"]["channelId"]
    except httpx.HTTPStatusError as e:
        status = e.response.status_code
        if status == 403:
            raise
        # Report only the status: the exception text contains the request
        # URL, which carries the API key as a query parameter.
        print(f" Warning: Could not get channel from video '{video_id}': HTTP {status}")
    except Exception as e:
        print(f" Warning: Could not get channel from video '{video_id}': {e}")
    return None
def resolve_channel_id(identifier: str, id_type: str, api_key: str) -> Optional[str]:
    """
    Resolve a username, handle, custom URL, or video ID to a channel ID.

    Handles and legacy usernames are first looked up exactly through the
    ``channels`` endpoint (``forHandle`` / ``forUsername``). A keyword
    ``search`` is used as a fallback and for custom URLs, which have no exact
    lookup; its top hit is a best guess and may not be the intended channel.

    Args:
        identifier: Value extracted from the URL.
        id_type: One of 'channel_id', 'video_id', 'handle', 'username',
            'custom_url'.
        api_key: YouTube Data API key used for the requests.

    Returns:
        The channel ID, or None when it cannot be resolved.

    Raises:
        httpx.HTTPStatusError: On HTTP 403, so that the caller's quota
            handling can switch API keys.
    """
    if id_type == "channel_id":
        return identifier
    # For video IDs, get the channel from the video
    if id_type == "video_id":
        return get_channel_id_from_video(identifier, api_key)

    def first_item(endpoint: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """GET one API endpoint and return the first result item, if any."""
        response = httpx.get(
            f"{YOUTUBE_API_BASE}/{endpoint}",
            params={**params, "key": api_key},
            headers={"User-Agent": USER_AGENT},
            timeout=30.0,
        )
        response.raise_for_status()
        items = response.json().get("items") or []
        return items[0] if items else None

    exact_filter = None
    if id_type == "handle":
        exact_filter = {"forHandle": f"@{identifier}"}
    elif id_type == "username":
        exact_filter = {"forUsername": identifier}

    try:
        if exact_filter is not None:
            try:
                item = first_item("channels", {"part": "id", **exact_filter})
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 403:
                    raise
                item = None  # any other HTTP error: fall back to search
            if item:
                return item["id"]

        query = f"@{identifier}" if id_type == "handle" else identifier
        item = first_item(
            "search",
            {"part": "snippet", "type": "channel", "maxResults": 1, "q": query},
        )
        if item:
            return item["id"]["channelId"]
    except httpx.HTTPStatusError as e:
        status = e.response.status_code
        if status == 403:
            raise
        # Report only the status: the exception text contains the request
        # URL, which carries the API key as a query parameter.
        print(f" Warning: Could not resolve {id_type} '{identifier}': HTTP {status}")
    except Exception as e:
        print(f" Warning: Could not resolve {id_type} '{identifier}': {e}")
    return None
def get_channel_info(channel_id: str, api_key: str) -> Dict[str, Any]:
    """
    Fetch one channel's snippet, statistics, branding and content details.

    Returns a flat dict of channel fields, or ``{"error": ...}`` when the API
    returns no item for ``channel_id``. HTTP error statuses propagate as
    ``httpx.HTTPStatusError`` via ``raise_for_status``.
    """
    response = httpx.get(
        f"{YOUTUBE_API_BASE}/channels",
        params={
            "part": "snippet,statistics,brandingSettings,contentDetails",
            "id": channel_id,
            "key": api_key,
        },
        headers={"User-Agent": USER_AGENT},
        timeout=30.0,
    )
    response.raise_for_status()

    items = response.json().get("items")
    if not items:
        return {"error": f"Channel not found: {channel_id}"}

    def as_int(raw):
        # Counters arrive as strings; a missing or empty value becomes None.
        return int(raw) if raw else None

    channel = items[0]
    snippet = channel.get("snippet", {})
    statistics = channel.get("statistics", {})
    branding = channel.get("brandingSettings", {})
    related_playlists = channel.get("contentDetails", {}).get("relatedPlaylists", {})

    info = {
        "channel_id": channel_id,
        "channel_url": f"https://www.youtube.com/channel/{channel_id}",
    }
    # Fields copied verbatim from the snippet: output key -> API key.
    for out_key, api_name in (
        ("title", "title"),
        ("description", "description"),
        ("custom_url", "customUrl"),
        ("published_at", "publishedAt"),
        ("country", "country"),
        ("default_language", "defaultLanguage"),
    ):
        info[out_key] = snippet.get(api_name)
    info["thumbnail_url"] = snippet.get("thumbnails", {}).get("high", {}).get("url")
    info["banner_url"] = branding.get("image", {}).get("bannerExternalUrl")
    info["subscriber_count"] = as_int(statistics.get("subscriberCount"))
    info["video_count"] = as_int(statistics.get("videoCount"))
    info["view_count"] = as_int(statistics.get("viewCount"))
    info["subscriber_count_hidden"] = statistics.get("hiddenSubscriberCount", False)
    info["uploads_playlist_id"] = related_playlists.get("uploads")
    return info
def get_channel_videos(channel_id: str, api_key: str, max_results: int = 20) -> List[Dict[str, Any]]:
    """
    List a channel's most recent videos with details and statistics.

    Two requests are made: a date-ordered ``search`` for video IDs (at most 50
    are requested) and a ``videos`` call for their details. Returns one dict
    per video, or an empty list when the search yields nothing. HTTP error
    statuses propagate as ``httpx.HTTPStatusError``.
    """
    request_headers = {"User-Agent": USER_AGENT}

    # Step 1: newest video IDs for the channel.
    listing = httpx.get(
        f"{YOUTUBE_API_BASE}/search",
        params={
            "part": "snippet",
            "channelId": channel_id,
            "type": "video",
            "order": "date",
            "maxResults": min(max_results, 50),
            "key": api_key,
        },
        headers=request_headers,
        timeout=30.0,
    )
    listing.raise_for_status()
    video_ids = [hit["id"]["videoId"] for hit in listing.json().get("items", [])]
    if not video_ids:
        return []

    # Step 2: details for all of those IDs in a single call.
    details = httpx.get(
        f"{YOUTUBE_API_BASE}/videos",
        params={
            "part": "snippet,contentDetails,statistics",
            "id": ",".join(video_ids),
            "key": api_key,
        },
        headers=request_headers,
        timeout=30.0,
    )
    details.raise_for_status()

    def as_int(raw):
        # Counters arrive as strings; a missing or empty value becomes None.
        return int(raw) if raw else None

    def summarize(item: Dict[str, Any]) -> Dict[str, Any]:
        snippet = item.get("snippet", {})
        statistics = item.get("statistics", {})
        content = item.get("contentDetails", {})
        vid = item["id"]
        return {
            "video_id": vid,
            "video_url": f"https://www.youtube.com/watch?v={vid}",
            "title": snippet.get("title"),
            "description": snippet.get("description", "")[:500],  # cap long descriptions
            "published_at": snippet.get("publishedAt"),
            "duration": content.get("duration"),
            "definition": content.get("definition"),
            "caption_available": content.get("caption") == "true",
            "view_count": as_int(statistics.get("viewCount")),
            "like_count": as_int(statistics.get("likeCount")),
            "comment_count": as_int(statistics.get("commentCount")),
            "tags": snippet.get("tags", [])[:10],  # keep at most ten tags
            "thumbnail_url": snippet.get("thumbnails", {}).get("high", {}).get("url"),
            "default_language": snippet.get("defaultLanguage"),
            "default_audio_language": snippet.get("defaultAudioLanguage"),
        }

    return [summarize(item) for item in details.json().get("items", [])]
def get_video_comments(video_id: str, api_key: str, max_results: int = 50) -> List[Dict[str, Any]]:
    """
    Get top-level comments on a video, ordered by relevance.

    Args:
        video_id: YouTube video ID.
        api_key: YouTube Data API key used for the request.
        max_results: Number of comment threads to request (capped at 100).

    Returns:
        One dict per top-level comment. An empty list is returned when the
        API answers 403 for a reason other than quota (e.g. comments are
        disabled for the video).

    Raises:
        httpx.HTTPStatusError: For any other HTTP error status, and for 403
            responses whose error body names a quota/rate-limit reason, so
            that quota exhaustion is not silently recorded as "no comments".
    """
    quota_reasons = {
        "quotaExceeded",
        "dailyLimitExceeded",
        "rateLimitExceeded",
        "userRateLimitExceeded",
    }

    def is_quota_response(response) -> bool:
        """Return True when the error body lists a quota-related reason."""
        try:
            payload = response.json()
        except ValueError:
            return False
        error = payload.get("error") if isinstance(payload, dict) else None
        details = error.get("errors") if isinstance(error, dict) else None
        if not isinstance(details, list):
            return False
        return any(
            isinstance(detail, dict) and detail.get("reason") in quota_reasons
            for detail in details
        )

    params = {
        "part": "snippet",
        "videoId": video_id,
        "order": "relevance",
        "maxResults": min(max_results, 100),
        "textFormat": "plainText",
        "key": api_key,
    }
    try:
        response = httpx.get(
            f"{YOUTUBE_API_BASE}/commentThreads",
            params=params,
            headers={"User-Agent": USER_AGENT},
            timeout=30.0,
        )
        response.raise_for_status()
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 403 and not is_quota_response(e.response):
            # Comments disabled (or otherwise forbidden) for this video
            return []
        raise

    comments = []
    for item in response.json().get("items", []):
        thread = item.get("snippet", {})
        snippet = thread.get("topLevelComment", {}).get("snippet", {})
        comments.append({
            "comment_id": item["id"],
            "author_display_name": snippet.get("authorDisplayName"),
            "author_channel_url": snippet.get("authorChannelUrl"),
            "text": snippet.get("textDisplay", "")[:1000],  # Truncate
            "like_count": snippet.get("likeCount", 0),
            "published_at": snippet.get("publishedAt"),
            "updated_at": snippet.get("updatedAt"),
            "reply_count": thread.get("totalReplyCount", 0),
        })
    return comments
def get_video_transcript(video_id: str, language: str = "en") -> Optional[Dict[str, Any]]:
    """
    Get a video transcript by downloading its subtitles with yt-dlp.

    Subtitles are requested for ``language`` first, then "nl" and "en". When
    several subtitle files are written, the one matching the earliest of
    those languages is used.

    Args:
        video_id: YouTube video ID.
        language: Preferred subtitle language code.

    Returns:
        A dict with the transcript text (truncated to 10000 characters) and
        metadata, ``None`` when no subtitle file was produced, or
        ``{"error": ...}`` when yt-dlp is missing, times out or fails.
    """
    video_url = f"https://www.youtube.com/watch?v={video_id}"
    # Requested language first, then the fallbacks, without duplicates.
    preferred_langs = list(dict.fromkeys(
        code for code in (language, "nl", "en") if code
    ))

    def lang_of(path: Path) -> str:
        # Subtitle files are named "<id>.<lang>.vtt".
        parts = path.name.split(".")
        return parts[-2] if len(parts) >= 3 else "unknown"

    def preference_rank(path: Path) -> int:
        code = lang_of(path)
        for position, wanted in enumerate(preferred_langs):
            if code == wanted or code.startswith(f"{wanted}-"):
                return position
        return len(preferred_langs)

    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            # The exit status is deliberately not checked: success is judged
            # by whether a .vtt file was written.
            subprocess.run(
                [
                    "yt-dlp",
                    "--write-subs",
                    "--write-auto-subs",
                    "--sub-langs", ",".join(preferred_langs),
                    "--sub-format", "vtt",
                    "--skip-download",
                    "--output", f"{tmpdir}/%(id)s",
                    video_url,
                ],
                capture_output=True,
                text=True,
                timeout=60,
            )
            # Sorted so the choice is deterministic when ranks tie.
            vtt_paths = sorted(Path(tmpdir).glob("*.vtt"))
            if not vtt_paths:
                return None
            vtt_path = min(vtt_paths, key=preference_rank)
            vtt_content = vtt_path.read_text(encoding="utf-8")

        # Parse VTT to extract text: drop headers, cue timings, numeric cue
        # identifiers and inline markup tags.
        lines = []
        for raw_line in vtt_content.split('\n'):
            line = raw_line.strip()
            if not line or line.startswith(('WEBVTT', 'Kind:', 'Language:')):
                continue
            if '-->' in line or re.match(r'^\d+$', line):
                continue
            clean_line = re.sub(r'<[^>]+>', '', line)
            if clean_line:
                lines.append(clean_line)

        # Remove duplicate consecutive lines
        deduped = []
        for line in lines:
            if not deduped or line != deduped[-1]:
                deduped.append(line)
        transcript = ' '.join(deduped)

        return {
            "video_id": video_id,
            "language": lang_of(vtt_path),
            # NOTE(review): relies on ".auto." appearing in the file name for
            # automatic captions - confirm against yt-dlp's actual naming.
            "transcript_type": "auto" if ".auto." in vtt_path.name else "manual",
            "transcript_text": transcript[:10000],  # Truncate very long transcripts
            "transcript_length_chars": len(transcript),
            "extraction_method": "yt-dlp",
        }
    except FileNotFoundError:
        return {"error": "yt-dlp not installed"}
    except subprocess.TimeoutExpired:
        return {"error": "Transcript extraction timed out"}
    except Exception as e:
        return {"error": str(e)}
def find_youtube_url_in_entry(entry: Dict[str, Any]) -> Optional[str]:
    """
    Find a YouTube URL in an entry's web_claims or Wikidata enrichment.

    The first ``social_youtube`` web claim with a non-empty value wins.
    Otherwise the Wikidata P2397 channel ID, if present, is turned into a
    channel URL. Sections that are missing, ``None`` or not of the expected
    type are treated as empty instead of raising.

    Args:
        entry: Parsed entry mapping.

    Returns:
        The YouTube URL, or None when the entry has none.
    """
    def as_dict(value: Any) -> Dict[str, Any]:
        # An empty YAML section loads as None; treat it like a missing one.
        return value if isinstance(value, dict) else {}

    if not isinstance(entry, dict):
        return None

    # Check web_claims for social_youtube
    web_claims = as_dict(entry.get("web_claims")).get("claims")
    if isinstance(web_claims, list):
        for claim in web_claims:
            if isinstance(claim, dict) and claim.get("claim_type") == "social_youtube":
                url = claim.get("claim_value")
                # An empty claim must not hide later claims or Wikidata.
                if url:
                    return url

    # Check wikidata for YouTube channel ID (P2397)
    wikidata = as_dict(entry.get("wikidata_enrichment"))
    claims = as_dict(wikidata.get("wikidata_claims"))
    youtube_claim = claims.get("P2397_youtube_channel_id")
    # Normally a mapping with a "value" key; a bare string is accepted too.
    if isinstance(youtube_claim, dict):
        channel_id = youtube_claim.get("value")
    else:
        channel_id = youtube_claim
    if isinstance(channel_id, str) and channel_id:
        return f"https://www.youtube.com/channel/{channel_id}"
    return None
def create_youtube_enrichment(
    youtube_url: str,
    api_key: str,
    fetch_videos: int = 10,
    fetch_comments_per_video: int = 20,
    fetch_transcripts: bool = True
) -> Dict[str, Any]:
    """
    Create full YouTube enrichment data with provenance.

    Args:
        youtube_url: Channel or video URL to enrich from.
        api_key: YouTube Data API key used for all requests.
        fetch_videos: Number of recent videos to fetch (0 skips videos,
            comments and transcripts).
        fetch_comments_per_video: Comments to fetch for each of the first
            five videos (0 skips comments).
        fetch_transcripts: Whether to fetch transcripts for those of the
            first three videos that report captions.

    Returns:
        The enrichment dict. ``status`` is "SUCCESS" or "FAILED"; on failure
        ``error`` holds the reason. A channel the API cannot find is a
        failure, not a success with an error stored under ``channel``.
    """
    timestamp = datetime.now(timezone.utc).isoformat()
    enrichment = {
        "source_url": youtube_url,
        "fetch_timestamp": timestamp,
        "api_endpoint": YOUTUBE_API_BASE,
        "api_version": "v3",
    }

    # Extract channel identifier
    identifier, id_type = extract_channel_id_or_username(youtube_url)
    if not identifier:
        enrichment["error"] = f"Could not parse YouTube URL: {youtube_url}"
        enrichment["status"] = "FAILED"
        return enrichment
    enrichment["identifier_type"] = id_type
    enrichment["identifier_value"] = identifier

    # Resolve to channel ID
    channel_id = resolve_channel_id(identifier, id_type, api_key)
    if not channel_id:
        enrichment["error"] = f"Could not resolve channel ID for: {identifier}"
        enrichment["status"] = "FAILED"
        return enrichment

    try:
        # Get channel info
        print(f" Fetching channel info for {channel_id}...")
        channel_info = get_channel_info(channel_id, api_key)
        enrichment["channel"] = channel_info
        if channel_info.get("error"):
            # The lookup returned no channel; there is nothing to enrich.
            enrichment["error"] = channel_info["error"]
            enrichment["status"] = "FAILED"
            return enrichment

        # Get recent videos
        if fetch_videos > 0:
            print(f" Fetching {fetch_videos} recent videos...")
            videos = get_channel_videos(channel_id, api_key, fetch_videos)
            enrichment["videos"] = videos
            enrichment["videos_count"] = len(videos)

            # Get comments for top videos
            if fetch_comments_per_video > 0 and videos:
                print(f" Fetching comments for top videos...")
                for video in videos[:5]:  # Only first 5 videos
                    comments = get_video_comments(
                        video["video_id"], api_key, fetch_comments_per_video
                    )
                    video["comments"] = comments
                    video["comments_fetched"] = len(comments)

            # Get transcripts for videos with captions
            if fetch_transcripts and videos:
                print(f" Fetching transcripts for videos with captions...")
                for video in videos[:3]:  # Only first 3 videos
                    if not video.get("caption_available"):
                        continue
                    transcript = get_video_transcript(video["video_id"])
                    # Transcripts are best-effort: failures are skipped.
                    if transcript and not transcript.get("error"):
                        video["transcript"] = transcript

        enrichment["status"] = "SUCCESS"
    except httpx.HTTPStatusError as e:
        enrichment["error"] = f"YouTube API error: {e.response.status_code}"
        enrichment["status"] = "FAILED"
    except Exception as e:
        enrichment["error"] = str(e)
        enrichment["status"] = "FAILED"
    return enrichment
def update_provenance(entry: Dict[str, Any], enrichment: Dict[str, Any]) -> None:
    """
    Append a YouTube source record to ``entry["provenance"]["sources"]["youtube"]``.

    Missing ``provenance`` / ``sources`` / ``youtube`` containers are created.
    The record's ``claims_extracted`` names only data present in
    ``enrichment``. ``entry`` is modified in place.
    """
    provenance = entry.setdefault("provenance", {"sources": {}})
    youtube_sources = provenance.setdefault("sources", {}).setdefault("youtube", [])

    channel = enrichment.get("channel", {})
    extracted = []
    if channel and not channel.get("error"):
        extracted.append("channel_info")
        # A statistic counts as extracted when it is present, even if zero.
        extracted.extend(
            stat
            for stat in ("subscriber_count", "video_count", "view_count")
            if channel.get(stat) is not None
        )

    videos = enrichment.get("videos", [])
    if videos:
        extracted.append(f"recent_videos ({len(videos)} videos)")

        commented = len([v for v in videos if v.get("comments")])
        if commented > 0:
            total_comments = sum(len(v.get("comments", [])) for v in videos)
            extracted.append(
                f"video_comments ({total_comments} comments from {commented} videos)"
            )

        transcribed = len([v for v in videos if v.get("transcript")])
        if transcribed > 0:
            extracted.append(f"video_transcripts ({transcribed} videos)")

    youtube_sources.append({
        "source_type": "youtube_data_api",
        "fetch_timestamp": enrichment.get("fetch_timestamp"),
        "api_endpoint": enrichment.get("api_endpoint"),
        "channel_id": channel.get("channel_id") if channel else None,
        "claims_extracted": extracted,
    })
def process_entry(entry_path: Path, api_key: str, dry_run: bool = False) -> bool:
    """
    Process a single entry file and add YouTube enrichment.

    Args:
        entry_path: Path of the YAML entry file; rewritten in place unless
            ``dry_run`` is set or the entry is skipped.
        api_key: YouTube Data API key used for the enrichment.
        dry_run: When True, only report what would be done.

    Returns:
        True when the entry was enriched successfully (or, in a dry run,
        would be enriched); False when it was skipped or enrichment failed.

    Raises:
        Exception: With a message starting "quotaExceeded:" when the
            enrichment failed with an API 403 / quota error, so the caller
            can rotate API keys. The entry file is not written in that case.
    """
    print(f"\nProcessing: {entry_path.name}")

    # Load entry
    with open(entry_path, 'r', encoding='utf-8') as f:
        entry = yaml.safe_load(f)
    # An empty file loads as None; anything but a mapping cannot be enriched.
    if not isinstance(entry, dict):
        print(" Entry is empty or not a mapping, skipping...")
        return False

    # Check if already enriched
    if (entry.get("youtube_enrichment") or {}).get("status") == "SUCCESS":
        print(f" Already enriched, skipping...")
        return False

    # Find YouTube URL
    youtube_url = find_youtube_url_in_entry(entry)
    if not youtube_url:
        print(f" No YouTube URL found, skipping...")
        return False
    print(f" Found YouTube URL: {youtube_url}")

    if dry_run:
        print(f" [DRY RUN] Would enrich with YouTube data")
        return True

    # Create enrichment
    enrichment = create_youtube_enrichment(
        youtube_url=youtube_url,
        api_key=api_key,
        fetch_videos=10,
        fetch_comments_per_video=20,
        fetch_transcripts=True
    )

    # Check for quota exceeded error - raise exception to trigger key rotation.
    # Only the API-status message and an explicit quotaExceeded marker count:
    # other error messages embed URLs or identifiers, which may themselves
    # contain "403" or "quota".
    error_msg = enrichment.get("error", "") or ""
    if error_msg.startswith("YouTube API error: 403") or "quotaexceeded" in error_msg.lower():
        raise Exception(f"quotaExceeded: {error_msg}")

    # Add to entry
    entry["youtube_enrichment"] = enrichment

    # Update provenance
    if enrichment.get("status") == "SUCCESS":
        update_provenance(entry, enrichment)

    # Save entry
    with open(entry_path, 'w', encoding='utf-8') as f:
        yaml.dump(entry, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

    status = enrichment.get("status", "UNKNOWN")
    print(f" Status: {status}")
    if status == "SUCCESS":
        channel = enrichment.get("channel", {})
        videos = enrichment.get("videos", [])
        print(f" Channel: {channel.get('title')}")
        subscribers = channel.get('subscriber_count')
        # None means the count was not reported; 0 is a real value.
        if subscribers is not None:
            print(f" Subscribers: {subscribers:,}")
        else:
            print(" Subscribers: Hidden")
        print(f" Videos fetched: {len(videos)}")
    return status == "SUCCESS"
def main():
    """
    Command-line entry point: enrich entry files with YouTube data.

    Parses the arguments, collects the entry files, and processes them one by
    one. When an entry fails with a quota error the current API key is marked
    exhausted and the same entry is retried with the next key, for as long as
    keys remain. Exits with status 1 when no API key is configured or the
    requested entry does not exist.
    """
    import time  # not imported at module level; kept local to this entry point

    parser = argparse.ArgumentParser(
        description="Enrich heritage custodian entries with YouTube channel data"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without making changes"
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Limit number of entries to process"
    )
    parser.add_argument(
        "--entry",
        type=str,
        default=None,
        help="Process a specific entry file (filename or full path)"
    )
    # NOTE: accepted for command-line compatibility only. The value is always
    # True and is never read below; process_entry() skips entries that are
    # already enriched regardless of this flag.
    parser.add_argument(
        "--skip-existing",
        action="store_true",
        default=True,
        help="Skip entries that already have YouTube enrichment (default: True)"
    )
    args = parser.parse_args()

    # Check API keys
    if not api_key_manager.has_available_keys():
        print("ERROR: No YouTube API keys found in environment variables")
        print("\nTo get an API key:")
        print("1. Go to https://console.cloud.google.com/")
        print("2. Create a project and enable YouTube Data API v3")
        print("3. Create an API key under Credentials")
        print("4. Set one or more of:")
        print(" export YOUTUBE_API_KEY='your-key-here'")
        print(" export GOOGLE_YOUTUBE_TOKEN='your-key-here'")
        print(" export GOOGLE_YOUTUBE_TOKEN_v2='your-second-key'")
        print(" export GOOGLE_YOUTUBE_TOKEN_v3='your-third-key'")
        sys.exit(1)

    print("=" * 60)
    print("YouTube Enrichment Script for Heritage Custodians")
    print("=" * 60)
    print(f"API Keys: {api_key_manager.get_status()}")
    current_key = api_key_manager.get_current_key()
    if current_key:
        print(f"Current key: {api_key_manager.get_current_key_name()} ({current_key[:8]}...{current_key[-4:]})")
    print(f"Entries directory: {ENTRIES_DIR}")
    print(f"Dry run: {args.dry_run}")

    # Collect entries to process
    if args.entry:
        entry_path = Path(args.entry)
        if not entry_path.exists():
            entry_path = ENTRIES_DIR / args.entry
        if not entry_path.exists():
            print(f"ERROR: Entry not found: {args.entry}")
            sys.exit(1)
        entries = [entry_path]
    else:
        entries = sorted(ENTRIES_DIR.glob("*.yaml"))
    if args.limit:
        entries = entries[:args.limit]
    print(f"Entries to process: {len(entries)}")
    print("=" * 60)

    def is_quota_error(exc: Exception) -> bool:
        # Match only explicit quota signals. A plain "403" substring test
        # would also hit file names and YAML line numbers in error messages.
        if isinstance(exc, httpx.HTTPStatusError):
            return exc.response.status_code == 403
        return "quotaExceeded" in str(exc)

    # Process entries
    success_count = 0
    skip_count = 0
    error_count = 0
    attempted_count = 0
    for entry_path in entries:
        # Check if we still have available keys
        if not api_key_manager.has_available_keys():
            print("\n" + "=" * 60)
            print("⚠️ ALL API KEYS EXHAUSTED - Stopping enrichment")
            print("=" * 60)
            print("All API keys have exceeded their daily quota.")
            print("Please wait 24 hours for quota reset, or add more keys.")
            break

        attempted_count += 1
        out_of_keys = False
        # Retry the same entry with the next key until it is processed, fails
        # for another reason, or no key is left.
        while True:
            current_key = api_key_manager.get_current_key()
            try:
                result = process_entry(entry_path, current_key, args.dry_run)
            except Exception as e:
                if is_quota_error(e):
                    print(f"\n ⚠️ Quota exceeded detected, rotating key...")
                    api_key_manager.mark_quota_exceeded()
                    if api_key_manager.has_available_keys():
                        print(f" Retrying with {api_key_manager.get_current_key_name()}...")
                        continue
                    print(f" No more keys available, stopping.")
                    out_of_keys = True
                else:
                    print(f" ERROR: {e}")
                    error_count += 1
                break

            if result:
                success_count += 1
                # Rotate key after successful enrichment to distribute load
                api_key_manager.rotate_key()
            else:
                skip_count += 1
            break

        if out_of_keys:
            break
        # Rate limiting
        time.sleep(REQUEST_DELAY)

    # Summary
    print("\n" + "=" * 60)
    print("Summary")
    print("=" * 60)
    print(f"Entries processed: {attempted_count}")
    print(f"Successfully enriched: {success_count}")
    print(f"Skipped (no YouTube / already done): {skip_count}")
    print(f"Errors: {error_count}")


if __name__ == "__main__":
    main()