#!/usr/bin/env python3
"""
Enrich existing YouTube channel data with individual video metadata.

This script targets custodian files that have:
- youtube_status: SUCCESS
- youtube_enrichment with channel_id
- No videos array (or empty videos array)

It fetches video details from the YouTube Data API and adds them to the
existing youtube_enrichment section.

Usage:
    python scripts/enrich_youtube_videos.py [--dry-run] [--limit N]
"""

import os
import sys
import argparse
import httpx
import yaml
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional
import logging
import time

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Load environment
from dotenv import load_dotenv
load_dotenv()

GOOGLE_YOUTUBE_TOKEN = os.environ.get("GOOGLE_YOUTUBE_TOKEN")
YOUTUBE_API_BASE = "https://www.googleapis.com/youtube/v3"
DATA_DIR = Path(__file__).parent.parent / "data" / "custodian"
REQUEST_DELAY = 0.5  # Seconds between API requests


def resolve_channel_id(channel_identifier: str, client: httpx.Client) -> Optional[str]:
    """Resolve a channel identifier to a proper channel ID.

    Handles:
    - UC... format: Already a channel ID, return as-is
    - @handle format: Resolve via channels API with forHandle parameter

    Returns the resolved channel ID, the identifier unchanged for unknown
    formats, or None when a handle cannot be resolved.
    """
    if channel_identifier.startswith("UC"):
        return channel_identifier

    if channel_identifier.startswith("@"):
        # Resolve handle to channel ID via channels.list?forHandle=...
        params = {
            "part": "id",
            "forHandle": channel_identifier,
            "key": GOOGLE_YOUTUBE_TOKEN,
        }
        try:
            response = client.get(f"{YOUTUBE_API_BASE}/channels", params=params)
            response.raise_for_status()
            data = response.json()
            if data.get("items"):
                return data["items"][0].get("id")
            return None
        except Exception as e:
            logger.warning(f"Could not resolve handle {channel_identifier}: {e}")
            return None

    # Unknown format: pass it through and let downstream lookups fail loudly.
    return channel_identifier


def get_uploads_playlist_id(channel_id: str, client: httpx.Client) -> Optional[str]:
    """Get uploads playlist ID from channel.

    Every channel exposes its uploads as a playlist under
    contentDetails.relatedPlaylists.uploads; returns None on any failure.
    """
    params = {
        "part": "contentDetails",
        "id": channel_id,
        "key": GOOGLE_YOUTUBE_TOKEN,
    }
    try:
        response = client.get(f"{YOUTUBE_API_BASE}/channels", params=params)
        response.raise_for_status()
        data = response.json()
        if data.get("items"):
            return data["items"][0].get("contentDetails", {}).get("relatedPlaylists", {}).get("uploads")
        return None
    except Exception as e:
        logger.error(f"Error getting uploads playlist for {channel_id}: {e}")
        return None


def get_playlist_videos(playlist_id: str, client: httpx.Client, max_results: int = 50) -> List[str]:
    """Fetch video IDs from a YouTube playlist.

    Fetches a single page only (the API caps maxResults at 50), so at most
    50 IDs are returned. Returns an empty list on any failure.
    """
    params = {
        "part": "contentDetails",
        "playlistId": playlist_id,
        "maxResults": min(max_results, 50),  # API hard limit per page
        "key": GOOGLE_YOUTUBE_TOKEN,
    }
    try:
        response = client.get(f"{YOUTUBE_API_BASE}/playlistItems", params=params)
        response.raise_for_status()
        data = response.json()
        video_ids = []
        for item in data.get("items", []):
            video_id = item.get("contentDetails", {}).get("videoId")
            if video_id:
                video_ids.append(video_id)
        return video_ids
    except Exception as e:
        logger.error(f"Error getting playlist videos for {playlist_id}: {e}")
        return []


def get_video_details(video_ids: List[str], client: httpx.Client) -> List[Dict[str, Any]]:
    """Fetch detailed metadata for multiple videos.

    Accepts up to 50 IDs (the videos.list batch limit; extras are dropped)
    and returns one dict per video with snippet, duration, statistics, a
    best-available thumbnail URL, and an empty `comments` placeholder.
    Returns an empty list on any failure.
    """
    if not video_ids:
        return []

    params = {
        "part": "snippet,contentDetails,statistics",
        "id": ",".join(video_ids[:50]),  # videos.list accepts at most 50 IDs
        "key": GOOGLE_YOUTUBE_TOKEN,
    }
    try:
        response = client.get(f"{YOUTUBE_API_BASE}/videos", params=params)
        response.raise_for_status()
        data = response.json()
        videos = []
        for item in data.get("items", []):
            video_id = item.get("id", "")
            snippet = item.get("snippet", {})
            content_details = item.get("contentDetails", {})
            stats = item.get("statistics", {})
            # Statistics may be absent (e.g. counts hidden), so default to 0.
            video_data = {
                "video_id": video_id,
                "video_url": f"https://www.youtube.com/watch?v={video_id}",
                "title": snippet.get("title", ""),
                "description": snippet.get("description", ""),
                "published_at": snippet.get("publishedAt", ""),
                "duration": content_details.get("duration", ""),
                "view_count": int(stats.get("viewCount", 0)) if stats.get("viewCount") else 0,
                "like_count": int(stats.get("likeCount", 0)) if stats.get("likeCount") else 0,
                "comment_count": int(stats.get("commentCount", 0)) if stats.get("commentCount") else 0,
                "comments": [],
            }
            # Get highest quality thumbnail
            thumbnails = snippet.get("thumbnails", {})
            for quality in ["maxres", "high", "medium", "default"]:
                if thumbnails.get(quality, {}).get("url"):
                    video_data["thumbnail_url"] = thumbnails[quality]["url"]
                    break
            videos.append(video_data)
        return videos
    except Exception as e:
        logger.error(f"Error getting video details: {e}")
        return []


def fetch_channel_videos(channel_id: str, client: httpx.Client, max_videos: int = 50) -> List[Dict[str, Any]]:
    """Fetch all videos from a YouTube channel.

    Pipeline: resolve handle -> uploads playlist -> video IDs -> details.
    Sleeps REQUEST_DELAY before each API step to stay under quota/rate
    limits. Returns an empty list when any step fails.
    """
    # Step 0: Resolve channel ID if it's a handle
    time.sleep(REQUEST_DELAY)
    resolved_id = resolve_channel_id(channel_id, client)
    if not resolved_id:
        logger.warning(f"Could not resolve channel identifier: {channel_id}")
        return []
    if resolved_id != channel_id:
        logger.info(f"  Resolved {channel_id} -> {resolved_id}")

    # Step 1: Get uploads playlist ID
    time.sleep(REQUEST_DELAY)
    uploads_playlist_id = get_uploads_playlist_id(resolved_id, client)
    if not uploads_playlist_id:
        logger.warning(f"No uploads playlist found for channel {channel_id}")
        return []

    # Step 2: Get video IDs from playlist
    time.sleep(REQUEST_DELAY)
    video_ids = get_playlist_videos(uploads_playlist_id, client, max_videos)
    if not video_ids:
        logger.info(f"No videos found in uploads playlist for {channel_id}")
        return []
    logger.info(f"  Found {len(video_ids)} videos in uploads playlist")

    # Step 3: Get detailed video metadata
    time.sleep(REQUEST_DELAY)
    videos = get_video_details(video_ids, client)
    logger.info(f"  Fetched details for {len(videos)} videos")
    return videos


def find_files_needing_videos() -> List[Path]:
    """Find custodian files that have YouTube channels but no videos.

    Uses ripgrep for fast filtering, then validates candidates. Falls back
    to a pure-Python scan when ripgrep is not installed (previously a
    missing `rg` binary made this silently return no candidates).
    """
    import subprocess

    # Use ripgrep to quickly find files with youtube_status: SUCCESS
    try:
        result = subprocess.run(
            ["rg", "-l", "youtube_status: SUCCESS", str(DATA_DIR)],
            capture_output=True, text=True, timeout=30
        )
        candidate_files = [Path(f.strip()) for f in result.stdout.strip().split('\n') if f.strip()]
    except FileNotFoundError:
        # ripgrep not installed: scan YAML files ourselves with the same
        # literal-substring filter ripgrep would have applied.
        logger.warning("ripgrep not found; falling back to Python directory scan")
        candidate_files = [
            p for p in sorted(DATA_DIR.rglob("*.yaml")) + sorted(DATA_DIR.rglob("*.yml"))
            if "youtube_status: SUCCESS" in p.read_text(encoding='utf-8', errors='ignore')
        ]
    except Exception as e:
        logger.error(f"Error running ripgrep: {e}")
        return []

    logger.info(f"Found {len(candidate_files)} files with youtube_status: SUCCESS")

    files_to_enrich = []
    for yaml_file in candidate_files:
        try:
            with open(yaml_file, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if not data:
                continue
            youtube_enrichment = data.get("youtube_enrichment", {})
            channel_id = youtube_enrichment.get("channel_id")
            if not channel_id:
                continue
            # Check if videos already exist
            videos = youtube_enrichment.get("videos")
            if videos and len(videos) > 0:
                continue  # Already has videos
            files_to_enrich.append(yaml_file)
        except Exception as e:
            logger.warning(f"Error reading {yaml_file}: {e}")

    return files_to_enrich


def enrich_file_with_videos(yaml_file: Path, client: httpx.Client, dry_run: bool = False) -> bool:
    """Add video metadata to a single file.

    Reads the file, fetches up to 50 videos for its channel, and writes the
    videos plus a UTC fetch timestamp back into youtube_enrichment.
    Returns True on success (or on a dry run), False on any failure.
    """
    try:
        with open(yaml_file, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)

        # Guard: safe_load returns None for an empty file; the old code let
        # this surface as an AttributeError inside the broad except below.
        if not data:
            logger.warning(f"No channel_id in {yaml_file.name}")
            return False

        youtube_enrichment = data.get("youtube_enrichment", {})
        channel_id = youtube_enrichment.get("channel_id")
        if not channel_id:
            logger.warning(f"No channel_id in {yaml_file.name}")
            return False

        logger.info(f"Fetching videos for {yaml_file.name} (channel: {channel_id})")

        # Fetch videos
        videos = fetch_channel_videos(channel_id, client, max_videos=50)

        if dry_run:
            logger.info(f"  [DRY-RUN] Would add {len(videos)} videos to {yaml_file.name}")
            return True

        # Update the data
        data["youtube_enrichment"]["videos"] = videos
        data["youtube_enrichment"]["videos_fetch_timestamp"] = datetime.now(timezone.utc).isoformat()

        # Write back
        with open(yaml_file, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

        logger.info(f"  ✅ Added {len(videos)} videos to {yaml_file.name}")
        return True
    except Exception as e:
        logger.error(f"Error enriching {yaml_file}: {e}")
        return False


def main():
    """CLI entry point: find candidate files and enrich each in turn."""
    parser = argparse.ArgumentParser(description="Enrich YouTube channels with video metadata")
    parser.add_argument("--dry-run", action="store_true",
                        help="Don't write files, just show what would be done")
    parser.add_argument("--limit", type=int, default=None,
                        help="Limit number of files to process")
    args = parser.parse_args()

    if not GOOGLE_YOUTUBE_TOKEN:
        logger.error("GOOGLE_YOUTUBE_TOKEN environment variable not set")
        sys.exit(1)

    logger.info("Finding files needing video enrichment...")
    files_to_enrich = find_files_needing_videos()

    if not files_to_enrich:
        logger.info("No files need video enrichment!")
        return

    logger.info(f"Found {len(files_to_enrich)} files needing video enrichment")

    # `is not None` so an explicit `--limit 0` is honored rather than
    # silently ignored (the old truthiness check skipped it).
    if args.limit is not None:
        files_to_enrich = files_to_enrich[:args.limit]
        logger.info(f"Processing first {args.limit} files")

    success_count = 0
    error_count = 0

    with httpx.Client(timeout=30.0) as client:
        for i, yaml_file in enumerate(files_to_enrich, 1):
            logger.info(f"\n[{i}/{len(files_to_enrich)}] Processing {yaml_file.name}")
            if enrich_file_with_videos(yaml_file, client, args.dry_run):
                success_count += 1
            else:
                error_count += 1

    logger.info(f"\n{'='*60}")
    logger.info(f"SUMMARY: {success_count} success, {error_count} errors")
    if args.dry_run:
        logger.info("(DRY RUN - no files were modified)")


if __name__ == "__main__":
    main()