# glam/scripts/enrich_youtube_videos.py
# Last updated: 2025-12-09
#!/usr/bin/env python3
"""
Enrich existing YouTube channel data with individual video metadata.
This script targets custodian files that have:
- youtube_status: SUCCESS
- youtube_enrichment with channel_id
- No videos array (or empty videos array)
It fetches video details from the YouTube Data API and adds them to the existing
youtube_enrichment section.
Usage:
python scripts/enrich_youtube_videos.py [--dry-run] [--limit N]
"""
import os
import sys
import argparse
import httpx
import yaml
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional
import logging
import time
# Setup logging: timestamped INFO-level messages via the root handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Load environment variables from a local .env file (if present) so the
# YouTube API key can be supplied without exporting it in the shell.
from dotenv import load_dotenv
load_dotenv()

# API key for the YouTube Data API v3; presence is validated in main().
GOOGLE_YOUTUBE_TOKEN = os.environ.get("GOOGLE_YOUTUBE_TOKEN")
YOUTUBE_API_BASE = "https://www.googleapis.com/youtube/v3"
# Custodian YAML files live under <repo-root>/data/custodian.
DATA_DIR = Path(__file__).parent.parent / "data" / "custodian"
REQUEST_DELAY = 0.5  # Seconds between API requests (crude client-side rate limiting)
def resolve_channel_id(channel_identifier: str, client: httpx.Client) -> Optional[str]:
    """Resolve a channel identifier to a canonical channel ID.

    - "UC..." values are already channel IDs and are returned unchanged.
    - "@handle" values are looked up via the channels endpoint (forHandle);
      returns None if the lookup fails or matches nothing.
    - Any other format is passed through untouched.
    """
    if channel_identifier.startswith("UC"):
        return channel_identifier
    if not channel_identifier.startswith("@"):
        # Unknown format: hand it back and let the caller try it as-is.
        return channel_identifier

    # Resolve the @handle through the channels API.
    query = {
        "part": "id",
        "forHandle": channel_identifier,
        "key": GOOGLE_YOUTUBE_TOKEN,
    }
    try:
        resp = client.get(f"{YOUTUBE_API_BASE}/channels", params=query)
        resp.raise_for_status()
        items = resp.json().get("items")
    except Exception as e:
        logger.warning(f"Could not resolve handle {channel_identifier}: {e}")
        return None
    if items:
        return items[0].get("id")
    return None
def get_uploads_playlist_id(channel_id: str, client: httpx.Client) -> Optional[str]:
    """Return the ID of the channel's uploads playlist, or None on any failure."""
    try:
        resp = client.get(
            f"{YOUTUBE_API_BASE}/channels",
            params={
                "part": "contentDetails",
                "id": channel_id,
                "key": GOOGLE_YOUTUBE_TOKEN,
            },
        )
        resp.raise_for_status()
        items = resp.json().get("items")
        if not items:
            return None
        # The uploads playlist ID lives under contentDetails.relatedPlaylists.
        related = items[0].get("contentDetails", {}).get("relatedPlaylists", {})
        return related.get("uploads")
    except Exception as e:
        logger.error(f"Error getting uploads playlist for {channel_id}: {e}")
        return None
def get_playlist_videos(playlist_id: str, client: httpx.Client, max_results: int = 50) -> List[str]:
    """Fetch up to ``max_results`` video IDs from a YouTube playlist.

    Follows ``nextPageToken`` pagination, so values of ``max_results`` above
    the API's 50-items-per-page cap are now honored (the previous
    implementation silently returned at most a single page).

    Returns an empty list on any API error (logged).
    """
    video_ids: List[str] = []
    page_token: Optional[str] = None
    try:
        while len(video_ids) < max_results:
            params = {
                "part": "contentDetails",
                "playlistId": playlist_id,
                # The API caps page size at 50; request only what we still need.
                "maxResults": min(max_results - len(video_ids), 50),
                "key": GOOGLE_YOUTUBE_TOKEN,
            }
            if page_token:
                params["pageToken"] = page_token
            response = client.get(f"{YOUTUBE_API_BASE}/playlistItems", params=params)
            response.raise_for_status()
            data = response.json()
            for item in data.get("items", []):
                video_id = item.get("contentDetails", {}).get("videoId")
                if video_id:
                    video_ids.append(video_id)
            page_token = data.get("nextPageToken")
            if not page_token:
                break  # last page reached
        return video_ids[:max_results]
    except Exception as e:
        logger.error(f"Error getting playlist videos for {playlist_id}: {e}")
        return []
def get_video_details(video_ids: List[str], client: httpx.Client) -> List[Dict[str, Any]]:
    """Fetch detailed metadata (snippet, duration, statistics) for video IDs.

    The videos endpoint accepts at most 50 IDs per request, so the input is
    processed in batches of 50 (the previous implementation silently dropped
    everything past the first 50). On an API error mid-way, the videos
    fetched so far are returned (the error is logged).
    """
    videos: List[Dict[str, Any]] = []
    for start in range(0, len(video_ids), 50):
        batch = video_ids[start:start + 50]
        params = {
            "part": "snippet,contentDetails,statistics",
            "id": ",".join(batch),
            "key": GOOGLE_YOUTUBE_TOKEN,
        }
        try:
            response = client.get(f"{YOUTUBE_API_BASE}/videos", params=params)
            response.raise_for_status()
            data = response.json()
        except Exception as e:
            logger.error(f"Error getting video details: {e}")
            return videos
        for item in data.get("items", []):
            video_id = item.get("id", "")
            snippet = item.get("snippet", {})
            content_details = item.get("contentDetails", {})
            stats = item.get("statistics", {})
            video_data = {
                "video_id": video_id,
                "video_url": f"https://www.youtube.com/watch?v={video_id}",
                "title": snippet.get("title", ""),
                "description": snippet.get("description", ""),
                "published_at": snippet.get("publishedAt", ""),
                "duration": content_details.get("duration", ""),
                # Statistics fields may be absent; coerce to int with 0 fallback.
                "view_count": int(stats.get("viewCount") or 0),
                "like_count": int(stats.get("likeCount") or 0),
                "comment_count": int(stats.get("commentCount") or 0),
                "comments": [],  # reserved for later enrichment; not populated here
            }
            # Record the highest-quality thumbnail available.
            thumbnails = snippet.get("thumbnails", {})
            for quality in ("maxres", "high", "medium", "default"):
                url = thumbnails.get(quality, {}).get("url")
                if url:
                    video_data["thumbnail_url"] = url
                    break
            videos.append(video_data)
    return videos
def fetch_channel_videos(channel_id: str, client: httpx.Client, max_videos: int = 50) -> List[Dict[str, Any]]:
    """Fetch metadata for up to ``max_videos`` videos from a YouTube channel.

    Pipeline: resolve the identifier (handles "@name" forms), look up the
    channel's uploads playlist, list video IDs from it, then fetch per-video
    details. Returns an empty list if any stage yields nothing.
    """
    # Stage 0: normalize handles to a canonical "UC..." channel ID.
    time.sleep(REQUEST_DELAY)
    resolved = resolve_channel_id(channel_id, client)
    if not resolved:
        logger.warning(f"Could not resolve channel identifier: {channel_id}")
        return []
    if resolved != channel_id:
        logger.info(f" Resolved {channel_id} -> {resolved}")

    # Stage 1: look up the channel's uploads playlist.
    time.sleep(REQUEST_DELAY)
    playlist = get_uploads_playlist_id(resolved, client)
    if not playlist:
        logger.warning(f"No uploads playlist found for channel {channel_id}")
        return []

    # Stage 2: collect video IDs from that playlist.
    time.sleep(REQUEST_DELAY)
    ids = get_playlist_videos(playlist, client, max_videos)
    if not ids:
        logger.info(f"No videos found in uploads playlist for {channel_id}")
        return []
    logger.info(f" Found {len(ids)} videos in uploads playlist")

    # Stage 3: hydrate the IDs into full metadata records.
    time.sleep(REQUEST_DELAY)
    details = get_video_details(ids, client)
    logger.info(f" Fetched details for {len(details)} videos")
    return details
def find_files_needing_videos() -> List[Path]:
    """Find custodian files that have YouTube channels but no videos.

    Uses ripgrep for fast candidate filtering when available, falling back
    to a pure-Python scan of the YAML files if rg is missing or fails
    (previously an rg failure made the whole run a silent no-op). Each
    candidate is then validated by actually parsing it.
    """
    import subprocess
    needle = "youtube_status: SUCCESS"
    candidate_files: List[Path] = []
    try:
        # Fast path: rg -l lists files containing the marker.
        result = subprocess.run(
            ["rg", "-l", needle, str(DATA_DIR)],
            capture_output=True,
            text=True,
            timeout=30
        )
        candidate_files = [Path(f.strip()) for f in result.stdout.strip().split('\n') if f.strip()]
    except Exception as e:
        logger.warning(f"ripgrep unavailable ({e}); falling back to Python scan")
        for path in DATA_DIR.rglob("*.yaml"):
            try:
                if needle in path.read_text(encoding='utf-8'):
                    candidate_files.append(path)
            except OSError:
                continue  # unreadable file: skip, same best-effort spirit as rg
    logger.info(f"Found {len(candidate_files)} files with youtube_status: SUCCESS")

    files_to_enrich = []
    for yaml_file in candidate_files:
        try:
            with open(yaml_file, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if not data:
                continue
            # "or {}" guards against an explicit "youtube_enrichment: null" entry.
            youtube_enrichment = data.get("youtube_enrichment") or {}
            channel_id = youtube_enrichment.get("channel_id")
            if not channel_id:
                continue
            # Skip files that already carry a non-empty videos list.
            if youtube_enrichment.get("videos"):
                continue
            files_to_enrich.append(yaml_file)
        except Exception as e:
            logger.warning(f"Error reading {yaml_file}: {e}")
    return files_to_enrich
def enrich_file_with_videos(yaml_file: Path, client: httpx.Client, dry_run: bool = False) -> bool:
    """Add video metadata to a single custodian file.

    Fetches up to 50 videos for the file's channel and writes them into the
    existing ``youtube_enrichment`` section along with a fetch timestamp.

    Returns True on success (including dry-run), False on any failure.
    """
    try:
        with open(yaml_file, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        # An empty/null YAML document previously surfaced as an opaque
        # AttributeError; report it explicitly instead.
        if not data:
            logger.warning(f"Empty YAML document in {yaml_file.name}")
            return False
        # "or {}" guards against an explicit "youtube_enrichment: null" entry.
        youtube_enrichment = data.get("youtube_enrichment") or {}
        channel_id = youtube_enrichment.get("channel_id")
        if not channel_id:
            logger.warning(f"No channel_id in {yaml_file.name}")
            return False
        logger.info(f"Fetching videos for {yaml_file.name} (channel: {channel_id})")
        # Fetch videos
        videos = fetch_channel_videos(channel_id, client, max_videos=50)
        if dry_run:
            logger.info(f" [DRY-RUN] Would add {len(videos)} videos to {yaml_file.name}")
            return True
        # Record the results (possibly an empty list) and when they were fetched.
        data["youtube_enrichment"]["videos"] = videos
        data["youtube_enrichment"]["videos_fetch_timestamp"] = datetime.now(timezone.utc).isoformat()
        # Write back
        with open(yaml_file, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
        logger.info(f" ✅ Added {len(videos)} videos to {yaml_file.name}")
        return True
    except Exception as e:
        logger.error(f"Error enriching {yaml_file}: {e}")
        return False
def main():
    """CLI entry point: find and enrich all qualifying custodian files."""
    parser = argparse.ArgumentParser(description="Enrich YouTube channels with video metadata")
    parser.add_argument("--dry-run", action="store_true", help="Don't write files, just show what would be done")
    parser.add_argument("--limit", type=int, default=None, help="Limit number of files to process")
    args = parser.parse_args()

    # An API key is mandatory for every request this script makes.
    if not GOOGLE_YOUTUBE_TOKEN:
        logger.error("GOOGLE_YOUTUBE_TOKEN environment variable not set")
        sys.exit(1)

    logger.info("Finding files needing video enrichment...")
    targets = find_files_needing_videos()
    if not targets:
        logger.info("No files need video enrichment!")
        return
    logger.info(f"Found {len(targets)} files needing video enrichment")
    if args.limit:
        targets = targets[:args.limit]
        logger.info(f"Processing first {args.limit} files")

    success_count = 0
    error_count = 0
    # One shared HTTP client for connection reuse across all files.
    with httpx.Client(timeout=30.0) as client:
        for i, yaml_file in enumerate(targets, 1):
            logger.info(f"\n[{i}/{len(targets)}] Processing {yaml_file.name}")
            if enrich_file_with_videos(yaml_file, client, args.dry_run):
                success_count += 1
            else:
                error_count += 1

    logger.info(f"\n{'='*60}")
    logger.info(f"SUMMARY: {success_count} success, {error_count} errors")
    if args.dry_run:
        logger.info("(DRY RUN - no files were modified)")


if __name__ == "__main__":
    main()