1253 lines
45 KiB
Python
Executable file
1253 lines
45 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
YouTube Enrichment Script for Heritage Custodian Entries
|
|
|
|
This script enriches heritage custodian YAML entries with YouTube channel/video data.
|
|
It finds YouTube channels from existing web_claims (social_youtube) and fetches:
|
|
- Channel info (subscribers, video count, description, etc.)
|
|
- Recent videos (title, description, views, likes, comments)
|
|
- Video transcripts (when available)
|
|
- Comments on videos
|
|
|
|
All data includes full provenance with URLs and timestamps.
|
|
|
|
Usage:
|
|
python scripts/enrich_youtube.py [--dry-run] [--limit N] [--entry ENTRY_FILE]
|
|
|
|
Environment Variables:
|
|
YOUTUBE_API_KEY: Required. Get from https://console.cloud.google.com/
|
|
|
|
Author: GLAM Data Extraction Project
|
|
Date: December 2025
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import httpx
|
|
import yaml
|
|
|
|
# Load environment variables from .env file
|
|
try:
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
except ImportError:
|
|
pass # dotenv not installed, rely on shell environment
|
|
|
|
# ============================================================================
# Configuration
# ============================================================================

# Base URL for the YouTube Data API v3; endpoint paths are appended to this.
YOUTUBE_API_BASE = "https://www.googleapis.com/youtube/v3"
# Identifying User-Agent sent on every request (API etiquette / abuse contact).
USER_AGENT = "GLAMDataExtractor/1.0 (heritage-data@example.com) Python/httpx"

# Directory holding the per-institution YAML entry files to enrich.
ENTRIES_DIR = Path("data/nde/enriched/entries")

# Rate limiting
REQUEST_DELAY = 0.3  # seconds between API calls (reduced for faster processing)
|
|
|
|
|
|
# ============================================================================
|
|
# Multi-API Key Management
|
|
# ============================================================================
|
|
|
|
class APIKeyManager:
    """
    Pool of YouTube Data API keys with automatic rotation.

    Keys are read from a fixed set of environment variables at construction
    time. When a key's daily quota runs out it is retired via
    mark_quota_exceeded() and lookups transparently fall through to the
    remaining keys; rotate_key() additionally load-balances across them.
    """

    # Environment variables probed for API keys, in priority order.
    _KEY_ENV_VARS = (
        "YOUTUBE_API_KEY",
        "GOOGLE_YOUTUBE_TOKEN",
        "GOOGLE_YOUTUBE_TOKEN_v2",
        "GOOGLE_YOUTUBE_TOKEN_v3",
        "GOOGLE_YOUTUBE_TOKEN_v4",
        "GOOGLE_YOUTUBE_TOKEN_v5",
        "YOUTUBE_API_KEY_1",
        "YOUTUBE_API_KEY_2",
        "YOUTUBE_API_KEY_3",
        "YOUTUBE_API_KEY_4",
        "YOUTUBE_API_KEY_5",
    )

    def __init__(self):
        self.keys = []               # [{"key": <value>, "name": <env var>}]
        self.current_index = 0       # index into the *available* (non-exhausted) keys
        self.exhausted_keys = set()  # indices into self.keys that hit quota
        self._load_keys()

    def _load_keys(self):
        """Read keys from the environment, de-duplicating identical values."""
        seen_values = set()
        for env_name in self._KEY_ENV_VARS:
            value = os.getenv(env_name)
            if value and value not in seen_values:
                seen_values.add(value)
                self.keys.append({"key": value, "name": env_name})

        if self.keys:
            print(f"Loaded {len(self.keys)} API key(s): {[k['name'] for k in self.keys]}")
        else:
            print("WARNING: No YouTube API keys found in environment variables")

    def _available(self):
        """Keys whose quota has not yet been exhausted, in load order."""
        return [k for i, k in enumerate(self.keys) if i not in self.exhausted_keys]

    def get_current_key(self) -> Optional[str]:
        """Return the active API key string, or None when all are exhausted."""
        usable = self._available()
        return usable[self.current_index % len(usable)]["key"] if usable else None

    def get_current_key_name(self) -> str:
        """Return the env-var name of the active key, or "none" when exhausted."""
        usable = self._available()
        return usable[self.current_index % len(usable)]["name"] if usable else "none"

    def mark_quota_exceeded(self):
        """Retire the current key and restart rotation from the first survivor."""
        if not self.keys:
            return

        usable_indices = [i for i in range(len(self.keys)) if i not in self.exhausted_keys]
        if usable_indices:
            retired = usable_indices[self.current_index % len(usable_indices)]
            self.exhausted_keys.add(retired)
            print(f"\n⚠️ Quota exceeded for {self.keys[retired]['name']}, rotating to next key...")

        # Move to next available key
        self.current_index = 0

    def rotate_key(self):
        """Advance to the next available key (simple load balancing)."""
        usable = self._available()
        if len(usable) > 1:
            self.current_index = (self.current_index + 1) % len(usable)

    def has_available_keys(self) -> bool:
        """True while at least one key still has quota."""
        return len(self.exhausted_keys) < len(self.keys)

    def get_status(self) -> str:
        """Human-readable 'available/total' summary for progress output."""
        remaining = len(self.keys) - len(self.exhausted_keys)
        return f"{remaining}/{len(self.keys)} keys available"
|
|
|
|
|
|
# Global API key manager.
# NOTE: instantiated at import time — it reads the environment and prints a
# key summary as a side effect; all fetch helpers in this script share it.
api_key_manager = APIKeyManager()
|
|
|
|
|
|
# ============================================================================
|
|
# Helper Functions
|
|
# ============================================================================
|
|
|
|
def unwrap_safelinks_url(url: str) -> str:
    """
    Unwrap Microsoft SafeLinks (Outlook email protection) URLs to get the real URL.

    SafeLinks URLs look like:
    https://eur05.safelinks.protection.outlook.com/?url=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3DqTmOgmTmrSw&data=...

    This function extracts the actual URL from the 'url' parameter.
    If the URL is not a SafeLinks URL, it returns the original URL unchanged.

    Args:
        url: The URL to unwrap (may or may not be a SafeLinks URL)

    Returns:
        The unwrapped URL, or the original URL if not a SafeLinks URL
    """
    if not url:
        return url

    # Check if this is a SafeLinks URL
    if 'safelinks.protection.outlook.com' not in url:
        return url

    try:
        from urllib.parse import urlparse, parse_qs

        parsed = urlparse(url)
        params = parse_qs(parsed.query)

        if 'url' in params and params['url']:
            # BUG FIX: parse_qs has already percent-decoded the value.
            # Running unquote() on it again double-decoded target URLs that
            # legitimately contain %-escapes (e.g. '%2520' -> ' ' instead of
            # the intended '%20'), so the value is returned as-is.
            return params['url'][0]
    except Exception:
        pass  # If parsing fails, return original URL

    return url
|
|
|
|
|
|
def extract_channel_id_or_username(youtube_url: str) -> Tuple[Optional[str], str]:
    """
    Extract YouTube channel ID, username, or video ID from various URL formats.

    Handles /channel/UC..., /@handle, /user/name, /c/name, watch URLs (with
    v= in any query position), youtu.be short links, /shorts/ links, and
    bare custom URLs (youtube.com/name).

    Args:
        youtube_url: Any YouTube URL (may be empty or None-like).

    Returns:
        Tuple of (identifier, identifier_type) where type is:
        - 'channel_id': Direct channel ID (UCxxxxx)
        - 'username': Legacy /user/name format
        - 'handle': New /@name format
        - 'custom_url': /c/name or bare /name format
        - 'video_id': Video ID from a watch/short/shorts URL (will resolve to channel)
        (None, "") when nothing could be parsed.
    """
    if not youtube_url:
        return None, ""

    # Channel ID format: /channel/UCxxxxx (UC + 22 chars = 24 total)
    match = re.search(r'youtube\.com/channel/(UC[0-9A-Za-z_-]{22})', youtube_url)
    if match:
        return match.group(1), "channel_id"

    # Handle format: /@username
    match = re.search(r'youtube\.com/@([^/?&]+)', youtube_url)
    if match:
        return match.group(1), "handle"

    # User format: /user/username
    match = re.search(r'youtube\.com/user/([^/?&]+)', youtube_url)
    if match:
        return match.group(1), "username"

    # Custom URL format: /c/customname
    match = re.search(r'youtube\.com/c/([^/?&]+)', youtube_url)
    if match:
        return match.group(1), "custom_url"

    # Video URL format: watch?...v=VIDEO_ID (we'll resolve channel from video).
    # BUG FIX: 'v' is not always the first query parameter (e.g.
    # watch?app=desktop&v=...), so allow an optional query prefix before it.
    match = re.search(r'youtube\.com/watch\?(?:[^#]*&)?v=([a-zA-Z0-9_-]{11})', youtube_url)
    if match:
        return match.group(1), "video_id"

    # Short video URL format: youtu.be/VIDEO_ID
    match = re.search(r'youtu\.be/([a-zA-Z0-9_-]{11})', youtube_url)
    if match:
        return match.group(1), "video_id"

    # Shorts format: youtube.com/shorts/VIDEO_ID
    match = re.search(r'youtube\.com/shorts/([a-zA-Z0-9_-]{11})', youtube_url)
    if match:
        return match.group(1), "video_id"

    # Direct custom URL format: youtube.com/customname (no prefix).
    # Must be after all other patterns to avoid false matches.
    match = re.search(r'youtube\.com/([a-zA-Z][a-zA-Z0-9_-]{2,})(?:[/?]|$)', youtube_url)
    if match:
        # Exclude known paths that aren't custom URLs
        name = match.group(1)
        excluded = {'watch', 'playlist', 'channel', 'user', 'c', 'results', 'feed', 'gaming', 'shorts', 'live'}
        if name.lower() not in excluded:
            return name, "custom_url"

    return None, ""
|
|
|
|
|
|
def get_channel_id_from_video(video_id: str, api_key: str) -> Optional[str]:
    """
    Look up the owning channel ID for a single video via the videos endpoint.

    Returns None (after printing a warning) on any failure — network errors,
    HTTP errors, or an unknown video ID.
    """
    query = {
        "part": "snippet",
        "id": video_id,
        "key": api_key
    }

    try:
        resp = httpx.get(
            f"{YOUTUBE_API_BASE}/videos",
            params=query,
            headers={"User-Agent": USER_AGENT},
            timeout=30.0
        )
        resp.raise_for_status()
        items = resp.json().get("items")
        if items:
            return items[0]["snippet"]["channelId"]
    except Exception as e:
        print(f" Warning: Could not get channel from video '{video_id}': {e}")

    return None
|
|
|
|
|
|
def resolve_channel_id(identifier: str, id_type: str, api_key: str) -> Optional[str]:
    """
    Resolve any extracted identifier to a canonical channel ID.

    - 'channel_id' identifiers are returned unchanged.
    - 'video_id' identifiers are resolved via the owning video's snippet.
    - handles / usernames / custom URLs go through a channel search
      (expensive: 100 quota units per call).

    Returns None when resolution fails. Raises Exception("quotaExceeded: ...")
    on quota exhaustion so the caller can rotate API keys.
    """
    if id_type == "channel_id":
        return identifier

    # For video IDs, get the channel from the video
    if id_type == "video_id":
        return get_channel_id_from_video(identifier, api_key)

    # Use search to find channel
    search_params = {
        "part": "snippet",
        "type": "channel",
        "maxResults": 1,
        "key": api_key,
        # Handles are searched with their '@' prefix for better precision.
        "q": f"@{identifier}" if id_type == "handle" else identifier,
    }

    try:
        response = httpx.get(
            f"{YOUTUBE_API_BASE}/search",
            params=search_params,
            headers={"User-Agent": USER_AGENT},
            timeout=30.0
        )
        response.raise_for_status()
        hits = response.json().get("items")
        if hits:
            return hits[0]["id"]["channelId"]
    except httpx.HTTPStatusError as e:
        error_str = str(e)
        if e.response.status_code == 403 or "quota" in error_str.lower():
            # Normalized message so the caller's rotation logic can catch it.
            raise Exception(f"quotaExceeded: {error_str}")
        print(f" Warning: Could not resolve {id_type} '{identifier}': {e}")
    except Exception as e:
        print(f" Warning: Could not resolve {id_type} '{identifier}': {e}")

    return None
|
|
|
|
|
|
def get_channel_info(channel_id: str, api_key: str) -> Dict[str, Any]:
    """
    Fetch channel metadata (snippet, statistics, branding, uploads playlist).

    Returns a flat dict of channel fields, or {"error": ...} when the channel
    does not exist. HTTP errors propagate to the caller (raise_for_status).
    """
    response = httpx.get(
        f"{YOUTUBE_API_BASE}/channels",
        params={
            "part": "snippet,statistics,brandingSettings,contentDetails",
            "id": channel_id,
            "key": api_key,
        },
        headers={"User-Agent": USER_AGENT},
        timeout=30.0
    )
    response.raise_for_status()
    payload = response.json()

    items = payload.get("items")
    if not items:
        return {"error": f"Channel not found: {channel_id}"}

    item = items[0]
    snippet = item.get("snippet", {})
    stats = item.get("statistics", {})
    branding = item.get("brandingSettings", {})

    def stat_as_int(field):
        # Counts arrive as strings; missing or empty values become None.
        raw = stats.get(field)
        return int(raw) if raw else None

    return {
        "channel_id": channel_id,
        "channel_url": f"https://www.youtube.com/channel/{channel_id}",
        "title": snippet.get("title"),
        "description": snippet.get("description"),
        "custom_url": snippet.get("customUrl"),
        "published_at": snippet.get("publishedAt"),
        "country": snippet.get("country"),
        "default_language": snippet.get("defaultLanguage"),
        "thumbnail_url": snippet.get("thumbnails", {}).get("high", {}).get("url"),
        "banner_url": branding.get("image", {}).get("bannerExternalUrl"),
        "subscriber_count": stat_as_int("subscriberCount"),
        "video_count": stat_as_int("videoCount"),
        "view_count": stat_as_int("viewCount"),
        "subscriber_count_hidden": stats.get("hiddenSubscriberCount", False),
        "uploads_playlist_id": item.get("contentDetails", {}).get("relatedPlaylists", {}).get("uploads"),
    }
|
|
|
|
|
|
def get_all_videos_from_playlist(playlist_id: str, api_key: str) -> List[str]:
    """
    Page through a playlist and return every video ID it contains.

    Uses the playlistItems endpoint, which costs 1 quota unit per page versus
    100 for search — every channel's "uploads" playlist (UU... mirroring the
    UC... channel ID) holds its complete video history, so this is the cheap
    way to enumerate a channel.

    Args:
        playlist_id: The uploads playlist ID (format: UU...).
        api_key: YouTube API key.

    Returns:
        List of all video IDs in the playlist (possibly partial if a
        non-quota error interrupts paging).

    Raises:
        Exception: with a "quotaExceeded:" prefix on quota exhaustion so the
            caller can rotate keys.
    """
    import time

    collected: List[str] = []
    page_token = None
    page_count = 0

    while True:
        request_params = {
            "part": "contentDetails",  # only video IDs are needed, not snippets
            "playlistId": playlist_id,
            "maxResults": 50,          # API maximum per page
            "key": api_key
        }
        if page_token:
            request_params["pageToken"] = page_token

        try:
            resp = httpx.get(
                f"{YOUTUBE_API_BASE}/playlistItems",
                params=request_params,
                headers={"User-Agent": USER_AGENT},
                timeout=30.0
            )
            resp.raise_for_status()
            payload = resp.json()

            # Collect this page's video IDs, skipping malformed items.
            collected.extend(
                vid
                for vid in (
                    item.get("contentDetails", {}).get("videoId")
                    for item in payload.get("items", [])
                )
                if vid
            )

            page_count += 1
            page_token = payload.get("nextPageToken")
            if not page_token:
                break

            # Brief delay to be polite to the API
            time.sleep(0.05)

        except Exception as e:
            error_str = str(e)
            if "403" in error_str or "quota" in error_str.lower():
                raise Exception(f"quotaExceeded: {error_str}")
            print(f" Warning: Error fetching playlist page {page_count}: {e}")
            break

    return collected
|
|
|
|
|
|
def get_channel_videos_by_year(channel_id: str, api_key: str, year: int) -> List[str]:
    """
    Get all video IDs a channel published in a given calendar year.

    Uses the search endpoint with publishedAfter/publishedBefore bounds.
    NOTE: search costs 100 quota units per page — prefer the uploads-playlist
    path (get_channel_videos) when the complete history is needed.

    Args:
        channel_id: YouTube channel ID
        api_key: YouTube API key
        year: Year to fetch videos for (UTC bounds)

    Returns:
        List of video IDs

    Raises:
        Exception: with a "quotaExceeded:" prefix on quota exhaustion, so the
            caller can rotate keys (consistent with the other fetchers);
            other HTTP errors propagate unchanged.
    """
    import time  # FIX: hoisted out of the loop (was re-imported every page)

    video_ids = []
    next_page_token = None

    # Date range for this year (UTC)
    published_after = f"{year}-01-01T00:00:00Z"
    published_before = f"{year}-12-31T23:59:59Z"

    while True:
        search_params = {
            "part": "snippet",
            "channelId": channel_id,
            "type": "video",
            "order": "date",
            "maxResults": 50,
            "publishedAfter": published_after,
            "publishedBefore": published_before,
            "key": api_key
        }

        if next_page_token:
            search_params["pageToken"] = next_page_token

        try:
            response = httpx.get(
                f"{YOUTUBE_API_BASE}/search",
                params=search_params,
                headers={"User-Agent": USER_AGENT},
                timeout=30.0
            )
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            error_str = str(e)
            # FIX: normalize quota errors like every sibling fetcher does,
            # so the caller's key-rotation logic can catch them here too.
            if e.response.status_code == 403 or "quota" in error_str.lower():
                raise Exception(f"quotaExceeded: {error_str}")
            raise

        search_data = response.json()

        video_ids.extend(item["id"]["videoId"] for item in search_data.get("items", []))

        next_page_token = search_data.get("nextPageToken")
        if not next_page_token:
            break

        # Brief delay between pagination requests
        time.sleep(0.1)

    return video_ids
|
|
|
|
|
|
def get_channel_videos(channel_id: str, api_key: str, max_results: int = None, uploads_playlist_id: str = None) -> List[Dict[str, Any]]:
    """
    Fetch every video of a channel via its uploads playlist (quota-efficient).

    The uploads playlist (ID "UU..." mirroring the channel's "UC..." ID)
    contains all of a channel's videos and is paged via playlistItems at
    1 quota unit per call — two orders of magnitude cheaper than search.

    Args:
        channel_id: YouTube channel ID ("UC...").
        api_key: YouTube API key.
        max_results: Optional cap on the number of videos (None = fetch ALL).
        uploads_playlist_id: Pre-resolved uploads playlist ID (UU...);
            derived from the channel ID when omitted.

    Returns:
        One dict of metadata per video (snippet + contentDetails + statistics).

    Raises:
        Exception: with a "quotaExceeded:" prefix on quota exhaustion so the
            caller can rotate keys.
    """
    import time

    # Every "UC..." channel has a matching "UU..." uploads playlist.
    if not uploads_playlist_id:
        if not channel_id.startswith("UC"):
            print(f" Warning: Cannot derive uploads playlist from channel ID {channel_id}")
            return []
        uploads_playlist_id = "UU" + channel_id[2:]

    print(f" Using uploads playlist: {uploads_playlist_id}")

    # Enumerate all video IDs cheaply via the uploads playlist.
    video_ids = get_all_videos_from_playlist(uploads_playlist_id, api_key)
    if not video_ids:
        return []

    if max_results:
        video_ids = video_ids[:max_results]

    print(f" Found {len(video_ids)} videos in uploads playlist")

    def build_record(item):
        """Flatten one API video resource into our metadata dict."""
        snippet = item.get("snippet", {})
        stats = item.get("statistics", {})
        content = item.get("contentDetails", {})
        return {
            "video_id": item["id"],
            "video_url": f"https://www.youtube.com/watch?v={item['id']}",
            "title": snippet.get("title"),
            "description": snippet.get("description", ""),  # full description
            "published_at": snippet.get("publishedAt"),
            "duration": content.get("duration"),
            "definition": content.get("definition"),
            "caption_available": content.get("caption") == "true",
            "view_count": int(stats.get("viewCount", 0)) if stats.get("viewCount") else None,
            "like_count": int(stats.get("likeCount", 0)) if stats.get("likeCount") else None,
            "comment_count": int(stats.get("commentCount", 0)) if stats.get("commentCount") else None,
            "tags": snippet.get("tags", []),  # all tags
            "thumbnail_url": snippet.get("thumbnails", {}).get("high", {}).get("url"),
            "default_language": snippet.get("defaultLanguage"),
            "default_audio_language": snippet.get("defaultAudioLanguage"),
            "category_id": snippet.get("categoryId"),
            "live_broadcast_content": snippet.get("liveBroadcastContent"),
        }

    videos: List[Dict[str, Any]] = []
    # videos.list accepts at most 50 IDs per request.
    for start in range(0, len(video_ids), 50):
        batch = video_ids[start:start + 50]

        try:
            response = httpx.get(
                f"{YOUTUBE_API_BASE}/videos",
                params={
                    "part": "snippet,contentDetails,statistics",
                    "id": ",".join(batch),
                    "key": api_key
                },
                headers={"User-Agent": USER_AGENT},
                timeout=30.0
            )
            response.raise_for_status()
            videos.extend(build_record(item) for item in response.json().get("items", []))

            # Brief delay between batches
            time.sleep(0.05)

        except Exception as e:
            error_str = str(e)
            if "403" in error_str or "quota" in error_str.lower():
                raise Exception(f"quotaExceeded: {error_str}")
            print(f" Warning: Error fetching video batch: {e}")
            continue

    return videos
|
|
|
|
|
|
def get_video_comments(video_id: str, api_key: str, max_results: int = 50) -> List[Dict[str, Any]]:
    """
    Get top-level comments on a video (relevance-ordered, plain text).

    Args:
        video_id: YouTube video ID.
        api_key: YouTube API key.
        max_results: Upper bound on comments (API caps a page at 100).

    Returns:
        List of comment dicts; empty list when comments are disabled.

    Raises:
        Exception: with a "quotaExceeded:" prefix when a 403 was caused by
            quota exhaustion (so the caller can rotate keys).
        httpx.HTTPStatusError: for other non-403 HTTP errors.
    """
    params = {
        "part": "snippet",
        "videoId": video_id,
        "order": "relevance",
        "maxResults": min(max_results, 100),
        "textFormat": "plainText",
        "key": api_key
    }

    try:
        response = httpx.get(
            f"{YOUTUBE_API_BASE}/commentThreads",
            params=params,
            headers={"User-Agent": USER_AGENT},
            timeout=30.0
        )
        response.raise_for_status()
        data = response.json()

        comments = []
        for item in data.get("items", []):
            snippet = item.get("snippet", {}).get("topLevelComment", {}).get("snippet", {})
            comments.append({
                "comment_id": item["id"],
                "author_display_name": snippet.get("authorDisplayName"),
                "author_channel_url": snippet.get("authorChannelUrl"),
                "text": snippet.get("textDisplay", "")[:1000],  # Truncate
                "like_count": snippet.get("likeCount", 0),
                "published_at": snippet.get("publishedAt"),
                "updated_at": snippet.get("updatedAt"),
                "reply_count": item.get("snippet", {}).get("totalReplyCount", 0),
            })

        return comments

    except httpx.HTTPStatusError as e:
        if e.response.status_code == 403:
            # BUG FIX: a 403 here is ambiguous — the API returns it both when
            # comments are disabled for the video AND when the key's quota is
            # exhausted. Previously both cases were swallowed as "no comments",
            # so quota exhaustion silently dropped comments for every
            # remaining video without triggering key rotation. Inspect the
            # error reasons to tell the two apart.
            try:
                reasons = [
                    err.get("reason", "")
                    for err in e.response.json().get("error", {}).get("errors", [])
                ]
            except Exception:
                reasons = []
            if any("quota" in reason.lower() for reason in reasons):
                raise Exception(f"quotaExceeded: {e}")
            # Comments genuinely disabled (or otherwise forbidden) for this video
            return []
        raise
|
|
|
|
|
|
def get_video_transcript(video_id: str, language: str = "en") -> Optional[Dict[str, Any]]:
    """
    Download and flatten a video's subtitles into plain text using yt-dlp.

    Requests both manual and auto-generated subtitles in the preferred
    language, with Dutch and English fallbacks. The downloaded VTT file is
    stripped of headers, cue timings and inline markup, and consecutive
    duplicate lines (typical of rolling auto-captions) are collapsed.

    Args:
        video_id: YouTube video ID.
        language: Preferred subtitle language code (default "en").

    Returns:
        Dict with transcript text (truncated to 10k chars) plus metadata,
        {"error": ...} on known failure modes (yt-dlp missing, timeout,
        unexpected exception), or None when no subtitles were produced.
    """
    video_url = f"https://www.youtube.com/watch?v={video_id}"

    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            # Exit status is intentionally ignored; success is detected by
            # whether any .vtt files appear in the temp directory below.
            result = subprocess.run(
                [
                    "yt-dlp",
                    "--write-subs",
                    "--write-auto-subs",
                    "--sub-langs", f"{language},nl,en",
                    "--sub-format", "vtt",
                    "--skip-download",
                    "--output", f"{tmpdir}/%(id)s",
                    video_url
                ],
                capture_output=True,
                text=True,
                timeout=60
            )

            import glob
            vtt_files = glob.glob(f"{tmpdir}/*.vtt")

            if vtt_files:
                # Only the first subtitle file found is used, even if several
                # languages were downloaded.
                with open(vtt_files[0], 'r', encoding='utf-8') as f:
                    vtt_content = f.read()

                # Parse VTT to extract text: keep only caption lines, dropping
                # the WEBVTT header, metadata lines, cue timings ("-->") and
                # bare cue numbers.
                lines = []
                for line in vtt_content.split('\n'):
                    line = line.strip()
                    if line and not line.startswith('WEBVTT') and not line.startswith('Kind:') \
                        and not line.startswith('Language:') and '-->' not in line \
                        and not re.match(r'^\d+$', line):
                        # Strip inline tags such as <c> styling or word timestamps.
                        clean_line = re.sub(r'<[^>]+>', '', line)
                        if clean_line:
                            lines.append(clean_line)

                # Remove duplicate consecutive lines (rolling auto-captions
                # repeat each line as the window scrolls).
                deduped = []
                for line in lines:
                    if not deduped or line != deduped[-1]:
                        deduped.append(line)

                transcript = ' '.join(deduped)

                # Determine language from filename (e.g. "<id>.nl.vtt").
                detected_lang = "unknown"
                if ".nl." in vtt_files[0]:
                    detected_lang = "nl"
                elif ".en." in vtt_files[0]:
                    detected_lang = "en"

                return {
                    "video_id": video_id,
                    "language": detected_lang,
                    # NOTE(review): assumes auto-sub filenames contain ".auto.";
                    # with the output template above yt-dlp writes
                    # "<id>.<lang>.vtt", so this may always report "manual" —
                    # confirm against actual yt-dlp output.
                    "transcript_type": "auto" if ".auto." in vtt_files[0] else "manual",
                    "transcript_text": transcript[:10000],  # Truncate very long transcripts
                    "transcript_length_chars": len(transcript),
                    "extraction_method": "yt-dlp",
                }

            return None

    except FileNotFoundError:
        # yt-dlp binary not on PATH.
        return {"error": "yt-dlp not installed"}
    except subprocess.TimeoutExpired:
        return {"error": "Transcript extraction timed out"}
    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
def find_youtube_url_in_entry(entry: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
    """
    Locate a usable YouTube *channel* URL for a heritage entry.

    Sources, in priority order:
      1. web_claims of type 'social_youtube_channel' (verified channel link)
      2. legacy 'social_youtube' claims, accepted only when the URL matches a
         known channel pattern (/@, /channel/UC, /user/, /c/)
      3. Wikidata property P2397 (YouTube channel ID) — always a channel

    Claims of type 'social_youtube_video' are deliberately never used: a news
    video ABOUT an institution is not the institution's official channel.
    SafeLinks-wrapped URLs are unwrapped before classification.

    Returns:
        (youtube_url, skip_reason):
        - the channel URL and None on success;
        - None plus a reason (e.g. 'video_link_not_channel: <url>') when only
          unusable links were found;
        - (None, None) when nothing was found at all.
    """
    video_url = None
    legacy_url = None

    for claim in entry.get("web_claims", {}).get("claims", []):
        raw_value = claim.get("claim_value")
        if not raw_value:
            continue

        # Unwrap SafeLinks if necessary
        url = unwrap_safelinks_url(raw_value)
        kind = claim.get("claim_type", "")

        if kind == "social_youtube_channel":
            return url, None            # verified channel link wins immediately
        if kind == "social_youtube_video":
            video_url = url             # remember, but never use as a channel
        elif kind == "social_youtube":
            legacy_url = url            # legacy claim: classified below

    # Only a video link was found — refuse to treat it as a channel.
    if video_url:
        return None, f"video_link_not_channel: {video_url}"

    # Legacy 'social_youtube' claims: accept only channel-shaped URLs.
    if legacy_url:
        looks_like_channel = any(
            marker in legacy_url for marker in ('/@', '/channel/UC', '/user/', '/c/')
        )
        if looks_like_channel:
            return legacy_url, None
        return None, f"legacy_video_link: {legacy_url}"

    # Fall back to Wikidata's YouTube channel ID (P2397) — always a channel.
    wikidata_claims = entry.get("wikidata_enrichment", {}).get("wikidata_claims", {})
    p2397 = wikidata_claims.get("P2397_youtube_channel_id")
    if p2397 and p2397.get("value"):
        return f"https://www.youtube.com/channel/{p2397['value']}", None

    return None, None
|
|
|
|
|
|
def create_youtube_enrichment(
    youtube_url: str,
    api_key: str,
    fetch_comments_per_video: int = 100,
    fetch_transcripts: bool = True
) -> Dict[str, Any]:
    """
    Create full YouTube enrichment data with provenance.
    Fetches ALL videos from the channel (no limit).

    Pipeline: parse the URL -> resolve to a channel ID -> fetch channel info,
    all videos (via the uploads playlist), per-video comments, and transcripts
    for captioned videos. Every result carries the source URL, fetch timestamp
    and API endpoint for provenance.

    Args:
        youtube_url: Channel (or resolvable) YouTube URL.
        api_key: YouTube Data API key.
        fetch_comments_per_video: Max comments per video (0 disables comments).
        fetch_transcripts: Whether to run yt-dlp transcript extraction.

    Returns:
        Enrichment dict with "status" set to "SUCCESS" or "FAILED" (the
        latter with an "error" message; failures never raise).
    """
    timestamp = datetime.now(timezone.utc).isoformat()

    # Provenance header — always present, even on failure.
    enrichment = {
        "source_url": youtube_url,
        "fetch_timestamp": timestamp,
        "api_endpoint": YOUTUBE_API_BASE,
        "api_version": "v3",
    }

    # Extract channel identifier
    identifier, id_type = extract_channel_id_or_username(youtube_url)

    if not identifier:
        enrichment["error"] = f"Could not parse YouTube URL: {youtube_url}"
        enrichment["status"] = "FAILED"
        return enrichment

    enrichment["identifier_type"] = id_type
    enrichment["identifier_value"] = identifier

    # Resolve to channel ID (may itself raise quotaExceeded -> caught below? no:
    # resolve_channel_id raises; any exception propagates to the caller here,
    # only the fetch phase below is wrapped in try/except).
    channel_id = resolve_channel_id(identifier, id_type, api_key)

    if not channel_id:
        enrichment["error"] = f"Could not resolve channel ID for: {identifier}"
        enrichment["status"] = "FAILED"
        return enrichment

    try:
        # Get channel info
        print(f" Fetching channel info for {channel_id}...")
        channel_info = get_channel_info(channel_id, api_key)
        enrichment["channel"] = channel_info

        # Get ALL videos from channel (via the uploads playlist, not
        # year-based search queries).
        print(f" Fetching ALL videos from channel...")
        videos = get_channel_videos(channel_id, api_key)  # No limit = fetch ALL
        enrichment["videos"] = videos
        enrichment["videos_count"] = len(videos)

        # Get comments for ALL videos
        if fetch_comments_per_video > 0 and videos:
            print(f" Fetching comments for all {len(videos)} videos...")
            for i, video in enumerate(videos):
                video_id = video["video_id"]
                comments = get_video_comments(video_id, api_key, fetch_comments_per_video)
                videos[i]["comments"] = comments
                videos[i]["comments_fetched"] = len(comments)

        # Get transcripts for ALL videos with captions
        if fetch_transcripts and videos:
            captions_count = sum(1 for v in videos if v.get("caption_available"))
            if captions_count > 0:
                print(f" Fetching transcripts for {captions_count} videos with captions...")
                for i, video in enumerate(videos):
                    if video.get("caption_available"):
                        video_id = video["video_id"]
                        transcript = get_video_transcript(video_id)
                        # Failed extractions are silently omitted rather than
                        # recorded, so a missing "transcript" key means either
                        # no captions or a failed fetch.
                        if transcript and not transcript.get("error"):
                            videos[i]["transcript"] = transcript

        enrichment["status"] = "SUCCESS"

    except httpx.HTTPStatusError as e:
        enrichment["error"] = f"YouTube API error: {e.response.status_code}"
        enrichment["status"] = "FAILED"
    except Exception as e:
        # Includes the normalized "quotaExceeded: ..." errors raised by the
        # fetch helpers; the caller inspects the error string for rotation.
        enrichment["error"] = str(e)
        enrichment["status"] = "FAILED"

    return enrichment
|
|
|
|
|
|
def update_provenance(entry: Dict[str, Any], enrichment: Dict[str, Any]) -> None:
    """
    Append a YouTube provenance record to the entry (mutates it in place).

    Records the fetch timestamp/endpoint, the channel ID, and the list of
    claim categories that were actually extracted (channel stats, videos,
    comments, transcripts) so downstream consumers can audit coverage.
    """
    # Ensure provenance -> sources -> youtube exists, preserving anything
    # already recorded there.
    youtube_sources = (
        entry.setdefault("provenance", {})
        .setdefault("sources", {})
        .setdefault("youtube", [])
    )

    claims_extracted = []

    channel = enrichment.get("channel", {})
    if channel and not channel.get("error"):
        claims_extracted.append("channel_info")
        # Stats can be hidden or absent; only record what was really present.
        for stat_key in ("subscriber_count", "video_count", "view_count"):
            if channel.get(stat_key) is not None:
                claims_extracted.append(stat_key)

    videos = enrichment.get("videos", [])
    if videos:
        claims_extracted.append(f"recent_videos ({len(videos)} videos)")

        # Check for comments
        videos_with_comments = sum(1 for v in videos if v.get("comments"))
        if videos_with_comments > 0:
            total_comments = sum(len(v.get("comments", [])) for v in videos)
            claims_extracted.append(
                f"video_comments ({total_comments} comments from {videos_with_comments} videos)"
            )

        # Check for transcripts
        videos_with_transcripts = sum(1 for v in videos if v.get("transcript"))
        if videos_with_transcripts > 0:
            claims_extracted.append(f"video_transcripts ({videos_with_transcripts} videos)")

    youtube_sources.append({
        "source_type": "youtube_data_api",
        "fetch_timestamp": enrichment.get("fetch_timestamp"),
        "api_endpoint": enrichment.get("api_endpoint"),
        "channel_id": channel.get("channel_id") if channel else None,
        "claims_extracted": claims_extracted,
    })
|
|
|
|
|
|
def process_entry(entry_path: Path, api_key: str, dry_run: bool = False, force: bool = False) -> bool:
    """
    Process a single entry file and add YouTube enrichment.

    Args:
        entry_path: Path to the YAML entry file
        api_key: YouTube API key
        dry_run: If True, don't make changes
        force: If True, re-process even if already enriched

    Returns:
        True when the enrichment finished with status SUCCESS (or, in dry-run
        mode, when the entry would have been enriched); False when the entry
        was skipped or the enrichment did not succeed.

    Raises:
        Exception: message starts with "quotaExceeded:" when the API reply
            contains a 403/quota error, so the caller can rotate API keys.
    """
    print(f"\nProcessing: {entry_path.name}")

    # Load entry (parsed YAML is expected to be a dict of entry fields).
    with open(entry_path, 'r', encoding='utf-8') as f:
        entry = yaml.safe_load(f)

    # Check if already FULLY enriched (skip unless force)
    # Partial enrichment should be re-processed:
    # - SUCCESS but no videos
    # - FAILED status
    # - SUCCESS but fetched videos < channel's total videos (incomplete fetch)
    # - SUCCESS but videos have no comments (old script didn't fetch comments properly)
    yt_enrichment = entry.get("youtube_enrichment", {})
    status = yt_enrichment.get("status")
    videos = yt_enrichment.get("videos", [])
    has_videos = len(videos) > 0

    # Check if we got ALL videos from the channel
    channel_info = yt_enrichment.get("channel", {})
    channel_video_count = channel_info.get("video_count", 0)
    fetched_video_count = len(videos)

    # Check if videos have comments (old script may not have fetched them)
    videos_with_comments = sum(1 for v in videos if v.get("comments"))

    # Determine enrichment completeness
    is_incomplete_videos = channel_video_count > 0 and fetched_video_count < channel_video_count
    is_missing_comments = has_videos and videos_with_comments == 0

    is_fully_enriched = (
        status == "SUCCESS" and
        has_videos and
        not is_incomplete_videos and
        not is_missing_comments
    )
    is_partial = status == "SUCCESS" and (not has_videos or is_incomplete_videos or is_missing_comments)
    is_failed = status == "FAILED"

    if not force and is_fully_enriched:
        # Show fetched count, and channel count only if different
        video_str = f"{fetched_video_count} videos" if fetched_video_count == channel_video_count else f"{fetched_video_count} videos (channel reports {channel_video_count})"
        print(f" Already fully enriched ({video_str}, {videos_with_comments} with comments), skipping...")
        return False

    if is_partial:
        # Explain exactly why this entry is considered partially enriched.
        reasons = []
        if not has_videos:
            reasons.append("no videos")
        if is_incomplete_videos:
            reasons.append(f"incomplete: {fetched_video_count}/{channel_video_count} videos")
        if is_missing_comments:
            reasons.append("missing comments")
        print(f" Partial enrichment detected ({', '.join(reasons)}), re-processing...")
    elif is_failed:
        print(f" Previous enrichment FAILED, re-processing...")
    elif force and is_fully_enriched:
        print(f" Re-processing (--force enabled)...")

    # Check if there's an existing channel_id from partial enrichment
    # Check multiple possible locations where channel_id might be stored
    existing_channel_id = None

    # Location 1: entry.youtube (old format)
    youtube_data = entry.get("youtube")
    if youtube_data:
        if isinstance(youtube_data, list) and len(youtube_data) > 0:
            existing_channel_id = youtube_data[0].get("channel_id")
        elif isinstance(youtube_data, dict):
            existing_channel_id = youtube_data.get("channel_id")

    # Location 2: provenance.sources.youtube (current format)
    if not existing_channel_id:
        prov_sources = entry.get("provenance", {}).get("sources", {})
        prov_youtube = prov_sources.get("youtube")
        if prov_youtube:
            if isinstance(prov_youtube, list) and len(prov_youtube) > 0:
                existing_channel_id = prov_youtube[0].get("channel_id")
            elif isinstance(prov_youtube, dict):
                existing_channel_id = prov_youtube.get("channel_id")

    # Location 3: youtube_enrichment (current script output)
    if not existing_channel_id:
        yt_enrichment = entry.get("youtube_enrichment", {})
        if isinstance(yt_enrichment, dict):
            existing_channel_id = yt_enrichment.get("channel_id")

    # Find YouTube URL (returns tuple of (url, skip_reason))
    youtube_url, skip_reason = find_youtube_url_in_entry(entry)

    # If we have a channel_id, construct a direct channel URL to avoid search API
    # ("UC" prefix marks a canonical YouTube channel ID).
    if existing_channel_id and existing_channel_id.startswith("UC"):
        youtube_url = f"https://www.youtube.com/channel/{existing_channel_id}"
        print(f" Using existing channel ID: {existing_channel_id}")
    elif not youtube_url:
        if skip_reason:
            # Explicitly skipped due to video link (not channel)
            print(f" Skipping: {skip_reason}")
        else:
            print(f" No YouTube URL found, skipping...")
        return False
    else:
        print(f" Found YouTube channel URL: {youtube_url}")

    if dry_run:
        # NOTE: dry-run returns True (counted as "would enrich") without
        # touching the file or the API.
        print(f" [DRY RUN] Would enrich with YouTube data")
        return True

    # Create enrichment (fetches ALL videos, comments, and transcripts)
    enrichment = create_youtube_enrichment(
        youtube_url=youtube_url,
        api_key=api_key,
        fetch_comments_per_video=100,  # Get more comments per video
        fetch_transcripts=True
    )

    # Check for quota exceeded error - raise exception to trigger key rotation.
    # The caller matches on "quotaExceeded"/"403" in the message text.
    error_msg = enrichment.get("error", "")
    if "403" in error_msg or "quota" in error_msg.lower():
        raise Exception(f"quotaExceeded: {error_msg}")

    # Add to entry (even on FAILED status, so the failure is recorded and
    # picked up by the re-processing logic above on the next run).
    entry["youtube_enrichment"] = enrichment

    # Update provenance
    if enrichment.get("status") == "SUCCESS":
        update_provenance(entry, enrichment)

    # Save entry (sort_keys=False preserves the original field order).
    with open(entry_path, 'w', encoding='utf-8') as f:
        yaml.dump(entry, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

    status = enrichment.get("status", "UNKNOWN")
    print(f" Status: {status}")

    if status == "SUCCESS":
        channel = enrichment.get("channel", {})
        videos = enrichment.get("videos", [])
        print(f" Channel: {channel.get('title')}")
        # Subscriber count may be hidden by the channel owner (None/0 here).
        print(f" Subscribers: {channel.get('subscriber_count'):,}" if channel.get('subscriber_count') else " Subscribers: Hidden")
        print(f" Videos fetched: {len(videos)}")

    return status == "SUCCESS"
|
|
|
|
def main():
    """
    CLI entry point: enrich heritage custodian entries with YouTube data.

    Parses command-line options, verifies that at least one YouTube API key
    is available via ``api_key_manager``, then processes each selected entry
    file. Keys are rotated after every successful enrichment to spread quota
    usage, and rotated again whenever a key's daily quota is exhausted; the
    run stops early once every key is exhausted.
    """
    # Hoisted out of the per-entry loop (was re-executed on every iteration).
    import time

    parser = argparse.ArgumentParser(
        description="Enrich heritage custodian entries with YouTube channel data"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without making changes"
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Limit number of entries to process"
    )
    parser.add_argument(
        "--entry",
        type=str,
        default=None,
        help="Process a specific entry file (filename or full path)"
    )
    # NOTE(review): this flag is effectively inert — store_true with
    # default=True can never be switched off, and args.skip_existing is never
    # read (skip logic lives in process_entry). Kept for CLI compatibility.
    parser.add_argument(
        "--skip-existing",
        action="store_true",
        default=True,
        help="Skip entries that already have YouTube enrichment (default: True)"
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Re-process entries even if already enriched (to fetch missing comments/transcripts)"
    )

    args = parser.parse_args()

    # Check API keys
    if not api_key_manager.has_available_keys():
        print("ERROR: No YouTube API keys found in environment variables")
        print("\nTo get an API key:")
        print("1. Go to https://console.cloud.google.com/")
        print("2. Create a project and enable YouTube Data API v3")
        print("3. Create an API key under Credentials")
        print("4. Set one or more of:")
        print(" export YOUTUBE_API_KEY='your-key-here'")
        print(" export GOOGLE_YOUTUBE_TOKEN='your-key-here'")
        print(" export GOOGLE_YOUTUBE_TOKEN_v2='your-second-key'")
        print(" export GOOGLE_YOUTUBE_TOKEN_v3='your-third-key'")
        sys.exit(1)

    print("=" * 60)
    print("YouTube Enrichment Script for Heritage Custodians")
    print("=" * 60)
    print(f"API Keys: {api_key_manager.get_status()}")
    current_key = api_key_manager.get_current_key()
    if current_key:
        # Show only a redacted key preview, never the full secret.
        print(f"Current key: {api_key_manager.get_current_key_name()} ({current_key[:8]}...{current_key[-4:]})")
    print(f"Entries directory: {ENTRIES_DIR}")
    print(f"Dry run: {args.dry_run}")

    # Collect entries to process: a single named entry, or every YAML file
    # in ENTRIES_DIR (sorted for a deterministic processing order).
    if args.entry:
        entry_path = Path(args.entry)
        if not entry_path.exists():
            # Fall back to interpreting the argument as a bare filename.
            entry_path = ENTRIES_DIR / args.entry
            if not entry_path.exists():
                print(f"ERROR: Entry not found: {args.entry}")
                sys.exit(1)
        entries = [entry_path]
    else:
        entries = sorted(ENTRIES_DIR.glob("*.yaml"))

    if args.limit:
        entries = entries[:args.limit]

    print(f"Entries to process: {len(entries)}")
    print("=" * 60)

    # Process entries
    success_count = 0
    skip_count = 0
    error_count = 0

    for entry_path in entries:
        # Check if we still have available keys
        if not api_key_manager.has_available_keys():
            print("\n" + "=" * 60)
            print("⚠️ ALL API KEYS EXHAUSTED - Stopping enrichment")
            print("=" * 60)
            print("All API keys have exceeded their daily quota.")
            print("Please wait 24 hours for quota reset, or add more keys.")
            break

        try:
            current_key = api_key_manager.get_current_key()
            result = process_entry(entry_path, current_key, args.dry_run, args.force)
            if result:
                success_count += 1
                # Rotate key after successful enrichment to distribute load
                api_key_manager.rotate_key()
            else:
                skip_count += 1
        except Exception as e:
            error_str = str(e)
            # Check for quota exceeded error (process_entry signals quota
            # problems with a "quotaExceeded:"/"403" exception message).
            if "quotaExceeded" in error_str or "403" in error_str:
                print(f"\n ⚠️ Quota exceeded detected, rotating key...")
                api_key_manager.mark_quota_exceeded()

                # Retry with remaining keys until one works or all exhausted
                retry_succeeded = False
                while api_key_manager.has_available_keys():
                    try:
                        current_key = api_key_manager.get_current_key()
                        print(f" Retrying with {api_key_manager.get_current_key_name()}...")
                        result = process_entry(entry_path, current_key, args.dry_run, args.force)
                        if result:
                            success_count += 1
                        else:
                            skip_count += 1
                        retry_succeeded = True
                        break  # Success, move to next entry
                    except Exception as retry_e:
                        retry_str = str(retry_e)
                        if "quotaExceeded" in retry_str or "403" in retry_str:
                            print(f" ⚠️ Key also exhausted, rotating...")
                            api_key_manager.mark_quota_exceeded()
                        else:
                            print(f" ERROR on retry: {retry_e}")
                            error_count += 1
                            # Non-quota error: count it and stop retrying
                            # this entry (flag only means "done retrying").
                            retry_succeeded = True
                            break

                if not api_key_manager.has_available_keys() and not retry_succeeded:
                    print(f" No more keys available, stopping.")
                    break
            else:
                print(f" ERROR: {e}")
                error_count += 1

        # Rate limiting between entries to stay polite to the API.
        time.sleep(REQUEST_DELAY)

    # Summary
    print("\n" + "=" * 60)
    print("Summary")
    print("=" * 60)
    print(f"Entries processed: {len(entries)}")
    print(f"Successfully enriched: {success_count}")
    print(f"Skipped (no YouTube / already done): {skip_count}")
    print(f"Errors: {error_count}")
|
|
|
# Script entry point: run the enrichment CLI when executed directly.
if __name__ == "__main__":
    main()