# AIdea-Server / src / api / downloader.py
# Ahmed Mostafa
# fix RapidAPI v1.2
# 3d4b8b5
import json
import base64
import binascii
import html
import http.cookiejar
import inspect
import logging
import os
import re
import time
import requests
from curl_cffi import requests as curl_requests
from pathlib import Path
from typing import Callable, List, Tuple
from src.utils.config import settings
logger = logging.getLogger(__name__)
_FAST_FAIL_SSL_MARKERS = (
"UNEXPECTED_EOF_WHILE_READING",
"SSLEOFError",
"EOF occurred in violation of protocol",
"TLS",
"TLS connect error",
"invalid library",
)
def _is_fast_fail_ssl_error(exc: Exception) -> bool:
error_text = str(exc)
return any(marker in error_text for marker in _FAST_FAIL_SSL_MARKERS)
class TranscriptProviderError(RuntimeError):
    """Signal that one transcript provider failed to produce usable text.

    Being a ``RuntimeError`` subclass, it is also caught by handlers written
    for ``RuntimeError``.
    """
class YouTubeDownloader:
def __init__(self):
self._assemblyai_key = os.environ.get("ASSEMBLYAI_API_KEY", "").strip()
self._supadata_key = os.environ.get("SUPADATA_API_KEY", "").strip()
self._rapidapi_key = os.environ.get("RAPIDAPI_KEY", "").strip()
self._rapidapi_host = os.environ.get("RAPIDAPI_HOST", "youtube-transcriptor.p.rapidapi.com").strip()
self._youtube_cookies = os.environ.get("YOUTUBE_COOKIES", "").strip()
self._youtube_cookies_b64 = os.environ.get("YOUTUBE_COOKIES_B64", "").strip()
self._youtube_cookies_path = os.environ.get("YOUTUBE_COOKIES_PATH", "").strip()
self._youtube_cookies_file = os.environ.get("YOUTUBE_COOKIES_FILE", "").strip()
self._youtube_po_token = os.environ.get("YOUTUBE_PO_TOKEN", "").strip()
self._youtube_po_token_client = os.environ.get("YOUTUBE_PO_TOKEN_CLIENT", "web").strip()
self._youtube_po_token_context = os.environ.get("YOUTUBE_PO_TOKEN_CONTEXT", "gvs").strip()
self._proxy_url = (
os.environ.get("PROXY_URL", "").strip()
or os.environ.get("YOUTUBE_PROXY", "").strip()
)
self._invidious_instances = self._load_invidious_instances()
self._strategy = settings.youtube_transcript_strategy
self._configure_proxy_environment()
if self._strategy == "cookies_required":
logger.info("Transcript strategy 'cookies_required' enabled.")
def get_transcript(self, url: str) -> str:
video_id = self._extract_video_id(url)
logger.info("Transcript pipeline for video ID %s using strategy=%s", video_id, self._strategy)
failures: List[str] = []
providers = self._build_provider_plan()
for index, (provider_name, provider) in enumerate(providers, start=1):
try:
logger.info("Trying transcript strategy: %s", provider_name)
transcript = provider(video_id)
if transcript:
return transcript
raise TranscriptProviderError(f"{provider_name} returned empty transcript text.")
except Exception as exc:
failures.append(f"{provider_name}: {exc}")
has_more_fallbacks = index < len(providers)
if has_more_fallbacks:
logger.info("%s transcript provider unavailable, trying next fallback.", provider_name)
else:
logger.error("All transcript providers failed for %s.", video_id)
raise RuntimeError(
f"All transcript strategies exhausted for {video_id}. "
f"Failures: {' | '.join(failures)}"
)
def _build_provider_plan(self) -> List[Tuple[str, Callable[[str], str]]]:
return [
("RapidAPI", self._get_transcript_via_rapidapi),
("RapidAPI-v2", self._get_transcript_via_rapidapi_v2),
("Supadata", self._get_transcript_via_supadata),
("YouTube Transcript API", self._get_transcript_via_youtube),
("yt-dlp", self._get_transcript_via_ytdlp),
("pytubefix captions", self._get_transcript_via_pytubefix),
]
def _get_transcript_via_youtube(self, video_id: str) -> str:
last_error: Exception | None = None
languages = ["en", "ar", "en-US"]
cookie_file = self._resolve_cookie_file()
proxies = self._requests_proxies()
for attempt in range(3):
try:
from youtube_transcript_api import YouTubeTranscriptApi
if hasattr(YouTubeTranscriptApi, "get_transcript"):
kwargs = {"languages": languages}
if cookie_file:
kwargs["cookies"] = str(cookie_file)
if proxies:
kwargs["proxies"] = proxies
data = YouTubeTranscriptApi.get_transcript(video_id, **kwargs)
logger.info("YouTube Transcript API get_transcript succeeded on attempt %s", attempt + 1)
return self._join_transcript_entries(data)
session = self._build_requests_session(cookie_file)
api = YouTubeTranscriptApi(http_client=session)
if hasattr(api, "fetch"):
data = api.fetch(video_id, languages=languages)
logger.info("YouTube Transcript API fetch succeeded on attempt %s", attempt + 1)
return self._join_transcript_entries(data)
list_kwargs = {}
if cookie_file:
list_kwargs["cookies"] = str(cookie_file)
if proxies:
list_kwargs["proxies"] = proxies
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id, **list_kwargs)
try:
transcript = transcript_list.find_manually_created_transcript(["en", "ar", "en-US"])
except Exception:
try:
transcript = transcript_list.find_generated_transcript(["en", "ar", "en-US"])
except Exception:
transcript = next(iter(transcript_list))
entries = transcript.fetch()
logger.info("YouTube Transcript API succeeded on attempt %s", attempt + 1)
return self._join_transcript_entries(entries)
except Exception as exc:
last_error = exc
if _is_fast_fail_ssl_error(exc):
break
if attempt < 2:
time.sleep(1.5 * (attempt + 1))
raise TranscriptProviderError(f"YouTube Transcript API failed: {last_error}") from last_error
def _get_transcript_via_rapidapi(self, video_id: str) -> str:
import httpx
if not self._rapidapi_key:
raise TranscriptProviderError("RapidAPI key not configured.")
url = f"https://{self._rapidapi_host}/transcript"
headers = {
"x-rapidapi-key": self._rapidapi_key,
"x-rapidapi-host": self._rapidapi_host,
}
params = {
"video_id": video_id,
"lang": "en",
}
try:
with httpx.Client(timeout=30) as client:
response = client.get(url, headers=headers, params=params)
response.raise_for_status()
data = response.json()
except Exception as exc:
raise TranscriptProviderError(f"RapidAPI request failed: {exc}") from exc
if isinstance(data, dict) and "error" in data:
raise ValueError(f"RapidAPI: {data['error']}")
if isinstance(data, list) and len(data) > 0:
item = data[0]
if isinstance(item, dict):
# 1. FIRST try data[0]["transcriptionAsText"]
full_text = item.get("transcriptionAsText", "")
if full_text and str(full_text).strip():
logger.info("RapidAPI transcript (full text) fetched successfully (%d chars).", len(full_text.strip()))
return full_text.strip()
# 2. FALLBACK to joining data[0]["transcription"][n]["subtitle"]
transcription_list = item.get("transcription", [])
if isinstance(transcription_list, list) and len(transcription_list) > 0:
transcript = " ".join(
str(seg.get("subtitle", "")) for seg in transcription_list if isinstance(seg, dict)
)
if transcript.strip():
logger.info("RapidAPI transcript (segments) fetched successfully (%d chars).", len(transcript.strip()))
return transcript.strip()
# 3. If neither works, log the full raw response at WARNING level and raise
logger.warning("RapidAPI raw response: %s", data)
raise TranscriptProviderError("RapidAPI response did not contain usable transcript content.")
def _get_transcript_via_rapidapi_v2(self, video_id: str) -> str:
import httpx
if not self._rapidapi_key:
raise TranscriptProviderError("RapidAPI key not configured.")
url = "https://youtube-transcripts-transcribe-youtube-video-to-text.p.rapidapi.com/transcribe"
headers = {
"x-rapidapi-key": self._rapidapi_key,
"x-rapidapi-host": "youtube-transcripts-transcribe-youtube-video-to-text.p.rapidapi.com",
"Content-Type": "application/json",
}
payload = {
"url": f"https://www.youtube.com/watch?v={video_id}"
}
try:
with httpx.Client(timeout=60) as client:
response = client.post(url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
except Exception as exc:
raise TranscriptProviderError(f"RapidAPI-v2 request failed: {exc}") from exc
# Log raw response for debugging
logger.info(
f"[RapidAPI-v2 DEBUG] status={response.status_code} "
f"preview={str(data)[:300]}"
)
# Handle error responses
if isinstance(data, dict) and "error" in data:
raise ValueError(f"RapidAPI-v2: {data['error']}")
# Response shape is typically:
# {"transcript": "full text..."}
# OR {"segments": [{"text": "...", "start": 0.0}, ...]}
# OR {"content": "full text..."}
if isinstance(data, dict):
for key in ("transcript", "content", "text", "result"):
if data.get(key) and isinstance(data[key], str) and data[key].strip():
return data[key].strip()
# Fallback: join segments array if present
for key in ("segments", "transcription", "words"):
if isinstance(data.get(key), list):
joined = " ".join(
seg.get("text", "") for seg in data[key]
if isinstance(seg, dict)
).strip()
if joined:
return joined
raise TranscriptProviderError("RapidAPI-v2 response did not contain usable transcript content.")
def _get_transcript_via_supadata(self, video_id: str) -> str:
if not self._supadata_key:
raise TranscriptProviderError("Supadata API key not configured.")
clean_url = f"https://www.youtube.com/watch?v={video_id}"
headers = {
"x-api-key": self._supadata_key,
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
),
}
try:
resp = curl_requests.get(
f"https://api.supadata.ai/v1/youtube/transcript?url={clean_url}&text=true",
headers=headers,
impersonate="chrome124",
timeout=30,
proxies=self._requests_proxies() or None,
)
resp.raise_for_status()
data = resp.json()
except Exception as exc:
raise TranscriptProviderError(f"Supadata request failed: {exc}") from exc
# Handle both "content" (plain text) and "segments" (structured list)
text = ""
content_val = data.get("content")
if isinstance(content_val, str) and content_val.strip():
text = content_val.strip()
elif isinstance(content_val, list):
# If content is returned as a list of segments instead of text
text = " ".join(
s.get("text", "") for s in content_val if isinstance(s, dict)
).strip()
# Fallback to "segments" key if content is empty
if not text:
segments = data.get("segments", [])
if segments and isinstance(segments, list):
text = " ".join(
s.get("text", "") for s in segments if isinstance(s, dict)
).strip()
if not text:
raise TranscriptProviderError("Supadata response did not include usable transcript content.")
logger.info("Supadata transcript fetched successfully (%d chars).", len(text))
return text
def _get_transcript_via_pytubefix(self, video_id: str) -> str:
url = f"https://www.youtube.com/watch?v={video_id}"
try:
from pytubefix import YouTube
except Exception as exc:
raise TranscriptProviderError(f"pytubefix import failed: {exc}") from exc
try:
init_kwargs = self._pytubefix_init_kwargs(YouTube)
yt = YouTube(url, **init_kwargs)
captions = getattr(yt, "captions", None)
if not captions:
raise TranscriptProviderError("pytubefix returned no captions.")
caption = self._select_pytubefix_caption(captions)
if caption is None:
raise TranscriptProviderError("pytubefix found no preferred caption track.")
text = self._caption_to_text(caption)
if not text:
raise TranscriptProviderError("pytubefix caption track was empty.")
logger.info("pytubefix captions fetched successfully (%d chars).", len(text))
return text
except TranscriptProviderError:
raise
except Exception as exc:
raise TranscriptProviderError(f"pytubefix captions failed: {exc}") from exc
def _get_transcript_via_ytdlp(self, video_id: str) -> str:
"""
Final fallback: uses yt-dlp which is most robust and supports POT tokens.
"""
import yt_dlp
url = f"https://www.youtube.com/watch?v={video_id}"
# Configure yt-dlp to be quiet and only fetch metadata/subs
ydl_opts = {
'skip_download': True,
'writesubtitles': True,
'writeautomaticsubs': True,
'subtitleslangs': ['en', 'ar', 'en-US'],
'quiet': True,
'no_warnings': True,
'extract_flat': False,
}
self._apply_youtube_network_options(ydl_opts)
self._apply_cookie_options(ydl_opts)
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
subtitles = info.get('subtitles') or {}
auto_subs = info.get('automatic_captions') or {}
# Preferred languages in order
for lang in ['en', 'en-US', 'ar']:
# Try manual subs first, then auto
for source in [subtitles, auto_subs]:
if lang in source:
# Find a format we can parse (json3 is easiest, then vtt)
formats = source[lang]
# Try to find json3
json3_url = next((f['url'] for f in formats if f.get('ext') == 'json3'), None)
if json3_url:
resp = curl_requests.get(
json3_url,
impersonate="chrome124",
proxies=self._requests_proxies() or None,
)
data = resp.json()
return " ".join(
seg.get('utf8', '')
for event in data.get('events', [])
for seg in event.get('segs', [])
).strip()
# Fallback to vtt
vtt_url = next((f['url'] for f in formats if f.get('ext') == 'vtt'), None)
if vtt_url:
resp = curl_requests.get(
vtt_url,
impersonate="chrome124",
proxies=self._requests_proxies() or None,
)
# Simple VTT parsing (strip tags and timestamps)
vtt_text = resp.text
lines = []
for line in vtt_text.splitlines():
if '-->' not in line and line.strip() and not line.strip().isdigit() and line != 'WEBVTT':
clean = re.sub(r'<[^>]+>', '', line).strip()
if clean: lines.append(clean)
return " ".join(lines).strip()
raise TranscriptProviderError("No usable subtitle formats found via yt-dlp.")
except Exception as exc:
raise TranscriptProviderError(f"yt-dlp failed: {exc}") from exc
finally:
cookiefile = ydl_opts.get("cookiefile")
if cookiefile and os.path.exists(cookiefile) and ("tmp" in str(cookiefile).lower() or "temp" in str(cookiefile).lower()):
try:
os.remove(cookiefile)
except OSError:
pass
def download_audio(self, url: str, output_stem: str) -> Path:
"""
Download the best available audio stream for Whisper deep-scan fallback.
"""
import yt_dlp
settings.temp_dir.mkdir(parents=True, exist_ok=True)
safe_stem = re.sub(r"[^A-Za-z0-9_-]+", "_", output_stem).strip("_") or "audio"
output_template = str(settings.temp_dir / f"{safe_stem}.%(ext)s")
expected_audio_path = settings.temp_dir / f"{safe_stem}.mp3"
ydl_opts = {
"format": "bestaudio/best",
"outtmpl": output_template,
"quiet": True,
"no_warnings": True,
"noplaylist": True,
"postprocessors": [
{
"key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
"preferredquality": "128",
}
],
}
self._apply_youtube_network_options(ydl_opts)
self._apply_cookie_options(ydl_opts)
failures: List[str] = []
try:
for provider_name, provider in self._build_audio_download_plan(ydl_opts):
try:
provider(url, safe_stem)
break
except Exception as exc:
failures.append(f"{provider_name}: {exc}")
logger.warning("%s audio extraction failed: %s", provider_name, exc)
else:
auth_hint = ""
if self._looks_like_youtube_auth_block(failures) and not self._has_youtube_auth():
auth_hint = (
" YouTube authentication is required for this video/Space. "
"Set YOUTUBE_COOKIES_B64 (recommended) or YOUTUBE_COOKIES, "
"and optionally YOUTUBE_PO_TOKEN."
)
raise RuntimeError(f"Audio extraction failed.{auth_hint} {' | '.join(failures)}")
if expected_audio_path.exists() and expected_audio_path.stat().st_size > 0:
logger.info("Audio extracted for deep scan: %s", expected_audio_path)
return expected_audio_path
matches = sorted(settings.temp_dir.glob(f"{safe_stem}.*"))
for candidate in matches:
if candidate.is_file() and candidate.stat().st_size > 0:
logger.info("Audio extracted for deep scan: %s", candidate)
return candidate
raise RuntimeError("Audio extraction completed but no audio file was produced.")
finally:
cookiefile = ydl_opts.get("cookiefile")
if cookiefile and os.path.exists(cookiefile) and ("tmp" in str(cookiefile).lower() or "temp" in str(cookiefile).lower()):
try:
os.remove(cookiefile)
except OSError:
pass
def _build_audio_download_plan(self, ydl_opts: dict) -> List[Tuple[str, Callable[[str, str], None]]]:
return [
("yt-dlp", lambda url, _safe_stem: self._download_audio_via_ytdlp(url, ydl_opts)),
("Invidious proxy", self._download_audio_via_invidious),
("pytubefix", self._download_audio_via_pytubefix),
]
def _download_audio_via_ytdlp(self, url: str, ydl_opts: dict) -> None:
import yt_dlp
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.extract_info(url, download=True)
def _download_audio_via_pytubefix(self, url: str, safe_stem: str) -> None:
from pytubefix import YouTube
try:
yt = YouTube(url, **self._pytubefix_init_kwargs(YouTube))
stream = yt.streams.filter(only_audio=True).order_by("abr").desc().first()
if stream is None:
raise RuntimeError("No audio stream returned by pytubefix.")
stream.download(
output_path=str(settings.temp_dir),
filename=f"{safe_stem}.{stream.subtype or 'mp4'}",
)
except Exception as exc:
raise RuntimeError(f"pytubefix failed: {exc}") from exc
def _download_audio_via_invidious(self, url: str, safe_stem: str) -> None:
video_id = self._extract_video_id(url)
if not video_id or video_id == "unknown":
raise RuntimeError("Could not extract video ID for Invidious fallback.")
failures: List[str] = []
for instance in self._invidious_instances:
instance = instance.rstrip("/")
try:
api_url = f"{instance}/api/v1/videos/{video_id}"
resp = requests.get(
api_url,
headers=self._browser_headers(),
proxies=self._requests_proxies() or None,
timeout=20,
)
resp.raise_for_status()
data = resp.json()
audio_formats = self._extract_invidious_audio_formats(data)
if not audio_formats:
raise RuntimeError("No audio formats in Invidious response.")
selected = audio_formats[0]
itag = selected.get("itag")
download_url = (
f"{instance}/latest_version?id={video_id}&itag={itag}&local=true"
if itag
else selected.get("url", "")
)
if not download_url:
raise RuntimeError("No downloadable audio URL in Invidious response.")
extension = self._extension_from_mime(selected.get("type", "audio/webm"))
output_path = settings.temp_dir / f"{safe_stem}.{extension}"
self._stream_download(download_url, output_path)
logger.info("Invidious audio extracted via %s: %s", instance, output_path)
return
except Exception as exc:
failures.append(f"{instance}: {exc}")
logger.warning("Invidious instance failed for audio extraction: %s", exc)
raise RuntimeError("All Invidious instances failed. " + " | ".join(failures))
def _extract_invidious_audio_formats(self, data: dict) -> List[dict]:
formats = data.get("adaptiveFormats") or data.get("formatStreams") or []
audio_formats = [
item
for item in formats
if isinstance(item, dict)
and str(item.get("type", "")).startswith("audio/")
and (item.get("itag") or item.get("url"))
]
return sorted(
audio_formats,
key=lambda item: int(item.get("bitrate") or item.get("bitrateBps") or 0),
reverse=True,
)
def _stream_download(self, url: str, output_path: Path) -> None:
with requests.get(
url,
headers=self._browser_headers(),
proxies=self._requests_proxies() or None,
stream=True,
timeout=60,
) as resp:
resp.raise_for_status()
content_type = resp.headers.get("Content-Type", "").lower()
if "text/html" in content_type or "application/json" in content_type:
raise RuntimeError(f"Unexpected audio response type: {content_type}")
with output_path.open("wb") as audio_file:
for chunk in resp.iter_content(chunk_size=1024 * 1024):
if chunk:
audio_file.write(chunk)
if not output_path.exists() or output_path.stat().st_size == 0:
raise RuntimeError("Downloaded audio file is empty.")
def _extension_from_mime(self, mime_type: str) -> str:
if "mp4" in mime_type or "m4a" in mime_type:
return "m4a"
if "mpeg" in mime_type or "mp3" in mime_type:
return "mp3"
if "ogg" in mime_type:
return "ogg"
return "webm"
def _join_transcript_entries(self, entries) -> str:
texts = []
for entry in entries:
if isinstance(entry, dict):
text = entry.get("text", "")
else:
text = getattr(entry, "text", "")
if text:
texts.append(str(text))
return " ".join(texts).strip()
def _build_requests_session(self, cookie_file: Path | None = None) -> requests.Session:
session = requests.Session()
session.headers.update(self._browser_headers())
proxies = self._requests_proxies()
if proxies:
session.proxies.update(proxies)
if cookie_file:
try:
cookie_jar = http.cookiejar.MozillaCookieJar(str(cookie_file))
cookie_jar.load(ignore_discard=True, ignore_expires=True)
session.cookies.update(cookie_jar)
except Exception as exc:
logger.warning("Could not load YouTube cookies from %s: %s", cookie_file, exc)
return session
def _pytubefix_init_kwargs(self, youtube_cls) -> dict:
kwargs = {}
try:
params = inspect.signature(youtube_cls).parameters
except (TypeError, ValueError):
params = {}
if "use_oauth" in params:
kwargs["use_oauth"] = False
if "allow_oauth_cache" in params:
kwargs["allow_oauth_cache"] = True
if self._proxy_url:
if "proxies" in params:
kwargs["proxies"] = self._requests_proxies()
elif "proxy" in params:
kwargs["proxy"] = self._proxy_url
return kwargs
def _select_pytubefix_caption(self, captions):
preferred_codes = ["en", "a.en", "en-US", "a.en-US", "ar", "a.ar"]
for code in preferred_codes:
try:
return captions[code]
except Exception:
pass
getter = getattr(captions, "get_by_language_code", None)
if callable(getter):
try:
caption = getter(code)
if caption is not None:
return caption
except Exception:
pass
try:
for caption in captions:
if not isinstance(caption, str):
return caption
try:
return captions[caption]
except Exception:
pass
except Exception:
return None
return None
def _caption_to_text(self, caption) -> str:
srt_method = getattr(caption, "generate_srt_captions", None)
if callable(srt_method):
return self._strip_srt(srt_method())
for attr_name in ("xml_captions", "xml_caption", "caption_xml"):
value = getattr(caption, attr_name, None)
if value:
return self._strip_markup(str(value))
json_value = getattr(caption, "json_captions", None)
if json_value:
try:
data = json.loads(json_value) if isinstance(json_value, str) else json_value
return self._join_caption_json(data)
except Exception:
pass
return self._strip_markup(str(caption))
def _strip_srt(self, srt_text: str) -> str:
lines = []
for line in srt_text.splitlines():
stripped = line.strip()
if not stripped or stripped.isdigit() or "-->" in stripped:
continue
lines.append(stripped)
return " ".join(lines).strip()
def _strip_markup(self, value: str) -> str:
no_tags = re.sub(r"<[^>]+>", " ", value)
return re.sub(r"\s+", " ", html.unescape(no_tags)).strip()
def _join_caption_json(self, data) -> str:
texts = []
for event in data.get("events", []) if isinstance(data, dict) else []:
for segment in event.get("segs", []) or []:
text = segment.get("utf8", "")
if text:
texts.append(text)
return " ".join(texts).strip()
def _apply_cookie_options(self, ydl_opts: dict) -> None:
cookie_b64 = os.getenv("YOUTUBE_COOKIES_B64")
if cookie_b64:
import tempfile, base64
try:
cookie_bytes = base64.b64decode(cookie_b64)
with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as f:
f.write(cookie_bytes)
cookie_path = f.name
ydl_opts["cookiefile"] = cookie_path
except Exception as exc:
logger.warning("Failed to decode YOUTUBE_COOKIES_B64: %s", exc)
else:
cookie_file = self._resolve_cookie_file()
if cookie_file:
ydl_opts["cookiefile"] = str(cookie_file)
def _apply_youtube_network_options(self, ydl_opts: dict) -> None:
youtube_args = {
"player_client": ["android", "web_safari", "tv"],
}
po_tokens = self._build_po_token_args()
if po_tokens:
youtube_args["po_token"] = po_tokens
ydl_opts.update(
{
"source_address": "0.0.0.0",
"socket_timeout": 30,
"retries": 5,
"fragment_retries": 5,
"geo_bypass": True,
"http_headers": self._browser_headers(),
"extractor_args": {
"youtube": youtube_args,
},
}
)
if self._proxy_url:
ydl_opts["proxy"] = self._proxy_url
def _browser_headers(self) -> dict:
return {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
),
"Accept-Language": "en-US,en;q=0.9",
}
def _requests_proxies(self) -> dict:
if not self._proxy_url:
return {}
return {
"http": self._proxy_url,
"https": self._proxy_url,
}
def _configure_proxy_environment(self) -> None:
if not self._proxy_url:
return
for key in ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy"):
os.environ.setdefault(key, self._proxy_url)
def _load_invidious_instances(self) -> List[str]:
configured = os.environ.get("INVIDIOUS_INSTANCES", "").strip()
if configured:
return [
item.strip().rstrip("/")
for item in configured.split(",")
if item.strip()
]
return [
"https://yewtu.be",
"https://inv.nadeko.net",
"https://invidious.privacyredirect.com",
"https://vid.puffyan.us",
]
def _build_po_token_args(self) -> List[str]:
if not self._youtube_po_token:
return []
raw_tokens = [
token.strip()
for token in re.split(r"[\n,]+", self._youtube_po_token)
if token.strip()
]
if not raw_tokens:
return []
po_tokens = []
for token in raw_tokens:
if "+" in token:
po_tokens.append(token)
else:
client = self._youtube_po_token_client or "web"
context = self._youtube_po_token_context or "gvs"
po_tokens.append(f"{client}.{context}+{token}")
return po_tokens
def _has_youtube_auth(self) -> bool:
return bool(
self._youtube_cookies
or self._youtube_cookies_b64
or self._youtube_cookies_path
or self._youtube_cookies_file
or self._youtube_po_token
or self._proxy_url
)
def _looks_like_youtube_auth_block(self, failures: List[str]) -> bool:
combined = " ".join(failures).lower()
return any(
marker in combined
for marker in (
"sign in to confirm",
"detected as a bot",
"po_token",
"bot",
"forbidden",
"403",
)
)
def _resolve_cookie_file(self) -> Path | None:
if self._youtube_cookies_path:
cookie_path = Path(self._youtube_cookies_path)
if cookie_path.exists():
return cookie_path
logger.warning("YOUTUBE_COOKIES_PATH is set but does not exist: %s", cookie_path)
if self._youtube_cookies_file:
cookie_path = Path(self._youtube_cookies_file)
if cookie_path.exists():
return cookie_path
logger.warning("YOUTUBE_COOKIES_FILE is set but does not exist: %s", cookie_path)
cookie_text = self._youtube_cookies
if not cookie_text and self._youtube_cookies_b64:
try:
cookie_text = base64.b64decode(self._youtube_cookies_b64).decode("utf-8")
except (binascii.Error, UnicodeDecodeError) as exc:
logger.warning("YOUTUBE_COOKIES_B64 could not be decoded: %s", exc)
return None
if not cookie_text:
return None
settings.temp_dir.mkdir(parents=True, exist_ok=True)
cookie_path = settings.temp_dir / "youtube_cookies.txt"
cookie_text = cookie_text.replace("\\n", "\n")
if not cookie_text.endswith("\n"):
cookie_text += "\n"
cookie_path.write_text(cookie_text, encoding="utf-8")
return cookie_path
def cleanup(self, path=None):
if path is None:
return
try:
audio_path = Path(path)
if audio_path.exists() and audio_path.is_file():
audio_path.unlink()
except Exception as exc:
logger.warning("Failed to clean up temporary audio file %s: %s", path, exc)
def _extract_video_id(self, url: str) -> str:
match = re.search(r"(?:v=|youtu\.be/|shorts/|embed/)([A-Za-z0-9_-]{11})", str(url))
return match.group(1) if match else "unknown"