"""YouTube transcript retrieval and audio download with provider fallbacks."""

import base64
import binascii
import html
import http.cookiejar
import inspect
import json
import logging
import os
import re
import tempfile
import time
from pathlib import Path
from typing import Callable, List, Tuple

import requests
from curl_cffi import requests as curl_requests

from src.utils.config import settings

logger = logging.getLogger(__name__)

# Substrings of error messages for which retrying the same request is pointless.
_FAST_FAIL_SSL_MARKERS = (
    "UNEXPECTED_EOF_WHILE_READING",
    "SSLEOFError",
    "EOF occurred in violation of protocol",
    "TLS",
    "TLS connect error",
    "invalid library",
)


def _is_fast_fail_ssl_error(exc: Exception) -> bool:
    """Return True when ``str(exc)`` contains one of the fast-fail SSL markers."""
    error_text = str(exc)
    return any(marker in error_text for marker in _FAST_FAIL_SSL_MARKERS)


class TranscriptProviderError(RuntimeError):
    """Raised when a transcript provider cannot return usable transcript text."""


class YouTubeDownloader:
    """Fetch YouTube transcripts and audio by trying several providers in order.

    Configuration (API keys, cookies, PO token, proxy, Invidious instances) is
    read from environment variables once, at construction time.
    """

    def __init__(self) -> None:
        """Read provider configuration from the environment and set up proxies."""
        self._assemblyai_key = os.environ.get("ASSEMBLYAI_API_KEY", "").strip()
        self._supadata_key = os.environ.get("SUPADATA_API_KEY", "").strip()
        self._rapidapi_key = os.environ.get("RAPIDAPI_KEY", "").strip()
        self._rapidapi_host = os.environ.get(
            "RAPIDAPI_HOST", "youtube-transcriptor.p.rapidapi.com"
        ).strip()
        self._youtube_cookies = os.environ.get("YOUTUBE_COOKIES", "").strip()
        self._youtube_cookies_b64 = os.environ.get("YOUTUBE_COOKIES_B64", "").strip()
        self._youtube_cookies_path = os.environ.get("YOUTUBE_COOKIES_PATH", "").strip()
        self._youtube_cookies_file = os.environ.get("YOUTUBE_COOKIES_FILE", "").strip()
        self._youtube_po_token = os.environ.get("YOUTUBE_PO_TOKEN", "").strip()
        self._youtube_po_token_client = os.environ.get(
            "YOUTUBE_PO_TOKEN_CLIENT", "web"
        ).strip()
        self._youtube_po_token_context = os.environ.get(
            "YOUTUBE_PO_TOKEN_CONTEXT", "gvs"
        ).strip()
        # PROXY_URL takes precedence over YOUTUBE_PROXY.
        self._proxy_url = (
            os.environ.get("PROXY_URL", "").strip()
            or os.environ.get("YOUTUBE_PROXY", "").strip()
        )
        self._invidious_instances = self._load_invidious_instances()
        self._strategy = settings.youtube_transcript_strategy
        self._configure_proxy_environment()
        if self._strategy == "cookies_required":
            logger.info("Transcript strategy 'cookies_required' enabled.")

    # ------------------------------------------------------------------
    # Transcript pipeline
    # ------------------------------------------------------------------

    def get_transcript(self, url: str) -> str:
        """Return transcript text for *url* from the first provider that succeeds.

        Providers are tried in the order given by ``_build_provider_plan``. A
        provider that raises or returns empty text counts as a failure.

        Raises:
            RuntimeError: if every provider fails; the message lists each
                provider's failure.
        """
        video_id = self._extract_video_id(url)
        logger.info(
            "Transcript pipeline for video ID %s using strategy=%s",
            video_id,
            self._strategy,
        )
        failures: List[str] = []
        providers = self._build_provider_plan()
        for index, (provider_name, provider) in enumerate(providers, start=1):
            try:
                logger.info("Trying transcript strategy: %s", provider_name)
                transcript = provider(video_id)
                if transcript:
                    return transcript
                raise TranscriptProviderError(
                    f"{provider_name} returned empty transcript text."
                )
            except Exception as exc:
                # Any provider failure is recorded and the next one is tried.
                failures.append(f"{provider_name}: {exc}")
                has_more_fallbacks = index < len(providers)
                if has_more_fallbacks:
                    logger.info(
                        "%s transcript provider unavailable, trying next fallback.",
                        provider_name,
                    )
                else:
                    logger.error("All transcript providers failed for %s.", video_id)
        raise RuntimeError(
            f"All transcript strategies exhausted for {video_id}. "
            f"Failures: {' | '.join(failures)}"
        )

    def _build_provider_plan(self) -> List[Tuple[str, Callable[[str], str]]]:
        """Return ``(name, callable)`` transcript providers in priority order."""
        return [
            ("RapidAPI", self._get_transcript_via_rapidapi),
            ("RapidAPI-v2", self._get_transcript_via_rapidapi_v2),
            ("Supadata", self._get_transcript_via_supadata),
            ("YouTube Transcript API", self._get_transcript_via_youtube),
            ("yt-dlp", self._get_transcript_via_ytdlp),
            ("pytubefix captions", self._get_transcript_via_pytubefix),
        ]

    def _get_transcript_via_youtube(self, video_id: str) -> str:
        """Fetch a transcript with ``youtube_transcript_api``, retrying up to 3 times.

        Supports three library API shapes, probed with ``hasattr``: the static
        ``get_transcript``, the instance ``fetch``, and ``list_transcripts``.

        Raises:
            TranscriptProviderError: if the library cannot be imported or all
                attempts fail.
        """
        # Import once, outside the retry loop: a missing package is permanent,
        # so retrying it with sleeps only delays the next provider.
        try:
            from youtube_transcript_api import YouTubeTranscriptApi
        except Exception as exc:
            raise TranscriptProviderError(
                f"YouTube Transcript API failed: {exc}"
            ) from exc

        max_attempts = 3
        last_error: Exception | None = None
        languages = ["en", "ar", "en-US"]
        cookie_file = self._resolve_cookie_file()
        proxies = self._requests_proxies()

        for attempt in range(max_attempts):
            try:
                if hasattr(YouTubeTranscriptApi, "get_transcript"):
                    kwargs = {"languages": languages}
                    if cookie_file:
                        kwargs["cookies"] = str(cookie_file)
                    if proxies:
                        kwargs["proxies"] = proxies
                    data = YouTubeTranscriptApi.get_transcript(video_id, **kwargs)
                    logger.info(
                        "YouTube Transcript API get_transcript succeeded on attempt %s",
                        attempt + 1,
                    )
                    return self._join_transcript_entries(data)

                session = self._build_requests_session(cookie_file)
                api = YouTubeTranscriptApi(http_client=session)
                if hasattr(api, "fetch"):
                    data = api.fetch(video_id, languages=languages)
                    logger.info(
                        "YouTube Transcript API fetch succeeded on attempt %s",
                        attempt + 1,
                    )
                    return self._join_transcript_entries(data)

                list_kwargs = {}
                if cookie_file:
                    list_kwargs["cookies"] = str(cookie_file)
                if proxies:
                    list_kwargs["proxies"] = proxies
                transcript_list = YouTubeTranscriptApi.list_transcripts(
                    video_id, **list_kwargs
                )
                # Prefer manual captions, then generated ones, then whatever
                # track the list yields first.
                try:
                    transcript = transcript_list.find_manually_created_transcript(
                        languages
                    )
                except Exception:
                    try:
                        transcript = transcript_list.find_generated_transcript(
                            languages
                        )
                    except Exception:
                        transcript = next(iter(transcript_list))
                entries = transcript.fetch()
                logger.info(
                    "YouTube Transcript API succeeded on attempt %s", attempt + 1
                )
                return self._join_transcript_entries(entries)
            except Exception as exc:
                last_error = exc
                if _is_fast_fail_ssl_error(exc):
                    break
                if attempt < max_attempts - 1:
                    # Linear backoff: 1.5s, then 3.0s.
                    time.sleep(1.5 * (attempt + 1))

        raise TranscriptProviderError(
            f"YouTube Transcript API failed: {last_error}"
        ) from last_error

    def _get_transcript_via_rapidapi(self, video_id: str) -> str:
        """Fetch a transcript from the configured RapidAPI host.

        Raises:
            TranscriptProviderError: if no key is configured, the request fails,
                or the response has no usable text.
            ValueError: if the response is a dict containing an ``"error"`` key.
        """
        if not self._rapidapi_key:
            raise TranscriptProviderError("RapidAPI key not configured.")

        import httpx

        url = f"https://{self._rapidapi_host}/transcript"
        headers = {
            "x-rapidapi-key": self._rapidapi_key,
            "x-rapidapi-host": self._rapidapi_host,
        }
        params = {
            "video_id": video_id,
            "lang": "en",
        }
        try:
            with httpx.Client(timeout=30) as client:
                response = client.get(url, headers=headers, params=params)
                response.raise_for_status()
                data = response.json()
        except Exception as exc:
            raise TranscriptProviderError(f"RapidAPI request failed: {exc}") from exc

        if isinstance(data, dict) and "error" in data:
            raise ValueError(f"RapidAPI: {data['error']}")

        if isinstance(data, list) and len(data) > 0:
            item = data[0]
            if isinstance(item, dict):
                # 1. First try data[0]["transcriptionAsText"]. Normalise to str
                #    so a non-string value cannot break the .strip() below.
                full_text = str(item.get("transcriptionAsText") or "").strip()
                if full_text:
                    logger.info(
                        "RapidAPI transcript (full text) fetched successfully (%d chars).",
                        len(full_text),
                    )
                    return full_text

                # 2. Fall back to joining data[0]["transcription"][n]["subtitle"].
                transcription_list = item.get("transcription", [])
                if isinstance(transcription_list, list) and len(transcription_list) > 0:
                    transcript = " ".join(
                        str(seg.get("subtitle", ""))
                        for seg in transcription_list
                        if isinstance(seg, dict)
                    ).strip()
                    if transcript:
                        logger.info(
                            "RapidAPI transcript (segments) fetched successfully (%d chars).",
                            len(transcript),
                        )
                        return transcript

        # 3. Neither shape matched: log the raw response and fail.
        logger.warning("RapidAPI raw response: %s", data)
        raise TranscriptProviderError(
            "RapidAPI response did not contain usable transcript content."
        )

    def _get_transcript_via_rapidapi_v2(self, video_id: str) -> str:
        """Fetch a transcript from the RapidAPI "transcribe" endpoint (POST).

        Raises:
            TranscriptProviderError: if no key is configured, the request fails,
                or the response has no usable text.
            ValueError: if the response is a dict containing an ``"error"`` key.
        """
        if not self._rapidapi_key:
            raise TranscriptProviderError("RapidAPI key not configured.")

        import httpx

        url = "https://youtube-transcripts-transcribe-youtube-video-to-text.p.rapidapi.com/transcribe"
        headers = {
            "x-rapidapi-key": self._rapidapi_key,
            "x-rapidapi-host": "youtube-transcripts-transcribe-youtube-video-to-text.p.rapidapi.com",
            "Content-Type": "application/json",
        }
        payload = {
            "url": f"https://www.youtube.com/watch?v={video_id}"
        }
        try:
            with httpx.Client(timeout=60) as client:
                response = client.post(url, headers=headers, json=payload)
                response.raise_for_status()
                data = response.json()
        except Exception as exc:
            raise TranscriptProviderError(
                f"RapidAPI-v2 request failed: {exc}"
            ) from exc

        # Log a preview of the raw response for debugging.
        logger.info(
            "[RapidAPI-v2 DEBUG] status=%s preview=%s",
            response.status_code,
            str(data)[:300],
        )

        if isinstance(data, dict) and "error" in data:
            raise ValueError(f"RapidAPI-v2: {data['error']}")

        # Accepted shapes: a plain-text field, or a list of {"text": ...} dicts.
        if isinstance(data, dict):
            for key in ("transcript", "content", "text", "result"):
                value = data.get(key)
                if isinstance(value, str) and value.strip():
                    return value.strip()
            for key in ("segments", "transcription", "words"):
                if isinstance(data.get(key), list):
                    joined = self._join_text_segments(data[key])
                    if joined:
                        return joined

        raise TranscriptProviderError(
            "RapidAPI-v2 response did not contain usable transcript content."
        )

    def _get_transcript_via_supadata(self, video_id: str) -> str:
        """Fetch a transcript from the Supadata API.

        Raises:
            TranscriptProviderError: if no key is configured, the request fails,
                or the response has no usable text.
        """
        if not self._supadata_key:
            raise TranscriptProviderError("Supadata API key not configured.")

        clean_url = f"https://www.youtube.com/watch?v={video_id}"
        headers = {
            "x-api-key": self._supadata_key,
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/120.0.0.0 Safari/537.36"
            ),
        }
        try:
            resp = curl_requests.get(
                f"https://api.supadata.ai/v1/youtube/transcript?url={clean_url}&text=true",
                headers=headers,
                impersonate="chrome124",
                timeout=30,
                proxies=self._requests_proxies() or None,
            )
            resp.raise_for_status()
            data = resp.json()
        except Exception as exc:
            raise TranscriptProviderError(f"Supadata request failed: {exc}") from exc

        # Guard against a non-object JSON body before calling .get on it.
        if not isinstance(data, dict):
            raise TranscriptProviderError(
                "Supadata response did not include usable transcript content."
            )

        # "content" may be plain text or a list of segment dicts.
        text = ""
        content_val = data.get("content")
        if isinstance(content_val, str) and content_val.strip():
            text = content_val.strip()
        elif isinstance(content_val, list):
            text = self._join_text_segments(content_val)

        # Fall back to the "segments" key if "content" produced nothing.
        if not text:
            segments = data.get("segments", [])
            if segments and isinstance(segments, list):
                text = self._join_text_segments(segments)

        if not text:
            raise TranscriptProviderError(
                "Supadata response did not include usable transcript content."
            )

        logger.info("Supadata transcript fetched successfully (%d chars).", len(text))
        return text

    @staticmethod
    def _join_text_segments(segments) -> str:
        """Join the ``"text"`` values of the dict items in *segments* with spaces.

        Non-dict items are skipped; a missing or ``None`` text becomes an empty
        string instead of breaking the join.
        """
        return " ".join(
            str(seg.get("text") or "") for seg in segments if isinstance(seg, dict)
        ).strip()

    def _get_transcript_via_pytubefix(self, video_id: str) -> str:
        """Fetch caption text for *video_id* using pytubefix.

        Raises:
            TranscriptProviderError: if pytubefix cannot be imported, no caption
                track is found, or the selected track is empty.
        """
        url = f"https://www.youtube.com/watch?v={video_id}"
        try:
            from pytubefix import YouTube
        except Exception as exc:
            raise TranscriptProviderError(f"pytubefix import failed: {exc}") from exc

        try:
            init_kwargs = self._pytubefix_init_kwargs(YouTube)
            yt = YouTube(url, **init_kwargs)
            captions = getattr(yt, "captions", None)
            if not captions:
                raise TranscriptProviderError("pytubefix returned no captions.")

            caption = self._select_pytubefix_caption(captions)
            if caption is None:
                raise TranscriptProviderError(
                    "pytubefix found no preferred caption track."
                )

            text = self._caption_to_text(caption)
            if not text:
                raise TranscriptProviderError("pytubefix caption track was empty.")

            logger.info(
                "pytubefix captions fetched successfully (%d chars).", len(text)
            )
            return text
        except TranscriptProviderError:
            raise
        except Exception as exc:
            raise TranscriptProviderError(
                f"pytubefix captions failed: {exc}"
            ) from exc

    def _get_transcript_via_ytdlp(self, video_id: str) -> str:
        """Fetch subtitle text via yt-dlp metadata extraction.

        Languages are tried in the order en, en-US, ar; for each language manual
        subtitles are tried before automatic captions. The first track that
        yields non-empty text wins.

        Raises:
            TranscriptProviderError: if extraction fails or no track yields text.
        """
        import yt_dlp

        url = f"https://www.youtube.com/watch?v={video_id}"

        # Quiet, metadata/subtitles only; nothing is downloaded by yt-dlp itself.
        ydl_opts = {
            "skip_download": True,
            "writesubtitles": True,
            "writeautomaticsubs": True,
            "subtitleslangs": ["en", "ar", "en-US"],
            "quiet": True,
            "no_warnings": True,
            "extract_flat": False,
        }
        self._apply_youtube_network_options(ydl_opts)
        temp_cookie_file = self._apply_cookie_options(ydl_opts)

        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False) or {}

            subtitles = info.get("subtitles") or {}
            auto_subs = info.get("automatic_captions") or {}
            proxies = self._requests_proxies() or None

            for lang in ("en", "en-US", "ar"):
                for source in (subtitles, auto_subs):
                    formats = source.get(lang)
                    if not formats:
                        continue
                    text = self._fetch_subtitle_text(formats, proxies)
                    if text:
                        return text

            raise TranscriptProviderError(
                "No usable subtitle formats found via yt-dlp."
            )
        except TranscriptProviderError:
            raise
        except Exception as exc:
            raise TranscriptProviderError(f"yt-dlp failed: {exc}") from exc
        finally:
            # Only the temp file created for this call is removed; cookie files
            # supplied by the user are never touched.
            self._remove_temp_cookie_file(temp_cookie_file)

    def _fetch_subtitle_text(self, formats, proxies) -> str:
        """Download one subtitle track and return its text (json3 first, then vtt).

        Returns an empty string when the track offers neither format or both
        yield no text.
        """

        def url_for(ext: str):
            return next(
                (
                    fmt.get("url")
                    for fmt in formats
                    if isinstance(fmt, dict)
                    and fmt.get("ext") == ext
                    and fmt.get("url")
                ),
                None,
            )

        json3_url = url_for("json3")
        if json3_url:
            resp = curl_requests.get(
                json3_url,
                impersonate="chrome124",
                proxies=proxies,
                timeout=30,
            )
            resp.raise_for_status()
            data = resp.json()
            text = " ".join(
                str(seg.get("utf8") or "")
                for event in data.get("events", [])
                for seg in event.get("segs") or []
            ).strip()
            if text:
                return text

        vtt_url = url_for("vtt")
        if vtt_url:
            resp = curl_requests.get(
                vtt_url,
                impersonate="chrome124",
                proxies=proxies,
                timeout=30,
            )
            resp.raise_for_status()
            return self._vtt_to_text(resp.text)

        return ""

    def _vtt_to_text(self, vtt_text: str) -> str:
        """Reduce WebVTT content to plain caption text.

        Drops the header block (the ``WEBVTT`` line and any metadata lines up to
        the first blank line or cue timing), cue timings, numeric cue
        identifiers and inline ``<...>`` tags.
        """
        raw_lines = vtt_text.splitlines()
        in_header = bool(raw_lines) and raw_lines[0].lstrip("\ufeff").startswith(
            "WEBVTT"
        )
        collected = []
        for line in raw_lines:
            stripped = line.strip()
            if in_header:
                # The header ends at the first blank line; a cue timing line
                # also ends it in case the blank separator is missing.
                if not stripped or "-->" in stripped:
                    in_header = False
                continue
            if not stripped or "-->" in stripped or stripped.isdigit():
                continue
            clean = re.sub(r"<[^>]+>", "", stripped).strip()
            if clean:
                collected.append(clean)
        return " ".join(collected).strip()

    # ------------------------------------------------------------------
    # Audio download
    # ------------------------------------------------------------------

    def download_audio(self, url: str, output_stem: str) -> Path:
        """Download the best available audio stream for Whisper deep-scan fallback.

        Providers from ``_build_audio_download_plan`` are tried in order until
        one succeeds. The result is ``<temp_dir>/<stem>.mp3`` when present,
        otherwise the first non-empty ``<stem>.*`` file in the temp directory.

        Raises:
            RuntimeError: if every provider fails or no audio file is produced.
        """
        # yt_dlp is deliberately not imported here: the yt-dlp provider imports
        # it itself, so a missing package becomes a provider failure and the
        # remaining fallbacks still run.
        settings.temp_dir.mkdir(parents=True, exist_ok=True)
        safe_stem = re.sub(r"[^A-Za-z0-9_-]+", "_", output_stem).strip("_") or "audio"
        output_template = str(settings.temp_dir / f"{safe_stem}.%(ext)s")
        expected_audio_path = settings.temp_dir / f"{safe_stem}.mp3"

        ydl_opts = {
            "format": "bestaudio/best",
            "outtmpl": output_template,
            "quiet": True,
            "no_warnings": True,
            "noplaylist": True,
            "postprocessors": [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": "mp3",
                    "preferredquality": "128",
                }
            ],
        }
        self._apply_youtube_network_options(ydl_opts)
        temp_cookie_file = self._apply_cookie_options(ydl_opts)

        failures: List[str] = []
        try:
            for provider_name, provider in self._build_audio_download_plan(ydl_opts):
                try:
                    provider(url, safe_stem)
                    break
                except Exception as exc:
                    failures.append(f"{provider_name}: {exc}")
                    logger.warning(
                        "%s audio extraction failed: %s", provider_name, exc
                    )
            else:
                # Reached only when no provider succeeded (loop never broke).
                auth_hint = ""
                if (
                    self._looks_like_youtube_auth_block(failures)
                    and not self._has_youtube_auth()
                ):
                    auth_hint = (
                        " YouTube authentication is required for this video/Space. "
                        "Set YOUTUBE_COOKIES_B64 (recommended) or YOUTUBE_COOKIES, "
                        "and optionally YOUTUBE_PO_TOKEN."
                    )
                raise RuntimeError(
                    f"Audio extraction failed.{auth_hint} {' | '.join(failures)}"
                )

            if expected_audio_path.exists() and expected_audio_path.stat().st_size > 0:
                logger.info("Audio extracted for deep scan: %s", expected_audio_path)
                return expected_audio_path

            # Fallback providers may write a different extension than .mp3.
            matches = sorted(settings.temp_dir.glob(f"{safe_stem}.*"))
            for candidate in matches:
                if candidate.is_file() and candidate.stat().st_size > 0:
                    logger.info("Audio extracted for deep scan: %s", candidate)
                    return candidate

            raise RuntimeError(
                "Audio extraction completed but no audio file was produced."
            )
        finally:
            # Only the temp file created for this call is removed; cookie files
            # supplied by the user are never touched.
            self._remove_temp_cookie_file(temp_cookie_file)

    def _build_audio_download_plan(
        self, ydl_opts: dict
    ) -> List[Tuple[str, Callable[[str, str], None]]]:
        """Return ``(name, callable(url, safe_stem))`` audio providers in order."""
        return [
            (
                "yt-dlp",
                lambda url, _safe_stem: self._download_audio_via_ytdlp(url, ydl_opts),
            ),
            ("Invidious proxy", self._download_audio_via_invidious),
            ("pytubefix", self._download_audio_via_pytubefix),
        ]

    def _download_audio_via_ytdlp(self, url: str, ydl_opts: dict) -> None:
        """Download *url* with yt-dlp using the prepared *ydl_opts*."""
        import yt_dlp

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)

    def _download_audio_via_pytubefix(self, url: str, safe_stem: str) -> None:
        """Download the highest-``abr`` audio-only stream with pytubefix.

        Raises:
            RuntimeError: if no audio stream exists or the download fails.
        """
        from pytubefix import YouTube

        try:
            yt = YouTube(url, **self._pytubefix_init_kwargs(YouTube))
            stream = yt.streams.filter(only_audio=True).order_by("abr").desc().first()
            if stream is None:
                raise RuntimeError("No audio stream returned by pytubefix.")
            stream.download(
                output_path=str(settings.temp_dir),
                filename=f"{safe_stem}.{stream.subtype or 'mp4'}",
            )
        except Exception as exc:
            raise RuntimeError(f"pytubefix failed: {exc}") from exc

    def _download_audio_via_invidious(self, url: str, safe_stem: str) -> None:
        """Download audio through the first Invidious instance that works.

        Raises:
            RuntimeError: if the video ID cannot be extracted or every
                configured instance fails.
        """
        video_id = self._extract_video_id(url)
        if not video_id or video_id == "unknown":
            raise RuntimeError("Could not extract video ID for Invidious fallback.")

        failures: List[str] = []
        for instance in self._invidious_instances:
            instance = instance.rstrip("/")
            try:
                api_url = f"{instance}/api/v1/videos/{video_id}"
                resp = requests.get(
                    api_url,
                    headers=self._browser_headers(),
                    proxies=self._requests_proxies() or None,
                    timeout=20,
                )
                resp.raise_for_status()
                data = resp.json()

                audio_formats = self._extract_invidious_audio_formats(data)
                if not audio_formats:
                    raise RuntimeError("No audio formats in Invidious response.")

                # Formats are sorted by bitrate, highest first.
                selected = audio_formats[0]
                itag = selected.get("itag")
                # With an itag, download through the instance's own
                # latest_version endpoint; otherwise use the format's URL.
                download_url = (
                    f"{instance}/latest_version?id={video_id}&itag={itag}&local=true"
                    if itag
                    else selected.get("url", "")
                )
                if not download_url:
                    raise RuntimeError(
                        "No downloadable audio URL in Invidious response."
                    )

                extension = self._extension_from_mime(
                    selected.get("type", "audio/webm")
                )
                output_path = settings.temp_dir / f"{safe_stem}.{extension}"
                self._stream_download(download_url, output_path)
                logger.info(
                    "Invidious audio extracted via %s: %s", instance, output_path
                )
                return
            except Exception as exc:
                failures.append(f"{instance}: {exc}")
                logger.warning(
                    "Invidious instance failed for audio extraction: %s", exc
                )

        raise RuntimeError("All Invidious instances failed. " + " | ".join(failures))

    def _extract_invidious_audio_formats(self, data: dict) -> List[dict]:
        """Return audio formats from an Invidious video payload, best bitrate first.

        Only dict entries whose ``type`` starts with ``audio/`` and that carry an
        ``itag`` or ``url`` are kept.
        """

        def bitrate_of(item: dict) -> int:
            # A missing or non-numeric bitrate sorts last instead of raising.
            try:
                return int(item.get("bitrate") or item.get("bitrateBps") or 0)
            except (TypeError, ValueError):
                return 0

        formats = data.get("adaptiveFormats") or data.get("formatStreams") or []
        audio_formats = [
            item
            for item in formats
            if isinstance(item, dict)
            and str(item.get("type", "")).startswith("audio/")
            and (item.get("itag") or item.get("url"))
        ]
        return sorted(audio_formats, key=bitrate_of, reverse=True)

    def _stream_download(self, url: str, output_path: Path) -> None:
        """Stream *url* into *output_path* in 1 MiB chunks.

        Raises:
            RuntimeError: if the response is HTML/JSON rather than media, or the
                written file is empty.
            requests.HTTPError: if the server returns an error status.
        """
        with requests.get(
            url,
            headers=self._browser_headers(),
            proxies=self._requests_proxies() or None,
            stream=True,
            timeout=60,
        ) as resp:
            resp.raise_for_status()
            content_type = resp.headers.get("Content-Type", "").lower()
            # An HTML or JSON body is treated as an error page, not audio.
            if "text/html" in content_type or "application/json" in content_type:
                raise RuntimeError(f"Unexpected audio response type: {content_type}")
            with output_path.open("wb") as audio_file:
                for chunk in resp.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        audio_file.write(chunk)

        if not output_path.exists() or output_path.stat().st_size == 0:
            raise RuntimeError("Downloaded audio file is empty.")

    def _extension_from_mime(self, mime_type: str) -> str:
        """Map an audio MIME type string to a file extension (default ``webm``)."""
        if "mp4" in mime_type or "m4a" in mime_type:
            return "m4a"
        if "mpeg" in mime_type or "mp3" in mime_type:
            return "mp3"
        if "ogg" in mime_type:
            return "ogg"
        return "webm"

    # ------------------------------------------------------------------
    # Caption / transcript text helpers
    # ------------------------------------------------------------------

    def _join_transcript_entries(self, entries) -> str:
        """Join the ``text`` of each entry (dict key or attribute) with spaces."""
        texts = []
        for entry in entries:
            if isinstance(entry, dict):
                text = entry.get("text", "")
            else:
                text = getattr(entry, "text", "")
            if text:
                texts.append(str(text))
        return " ".join(texts).strip()

    def _build_requests_session(
        self, cookie_file: Path | None = None
    ) -> requests.Session:
        """Build a ``requests`` session with browser headers, proxies and cookies.

        A cookie file that cannot be loaded is logged and ignored.
        """
        session = requests.Session()
        session.headers.update(self._browser_headers())
        proxies = self._requests_proxies()
        if proxies:
            session.proxies.update(proxies)
        if cookie_file:
            try:
                cookie_jar = http.cookiejar.MozillaCookieJar(str(cookie_file))
                cookie_jar.load(ignore_discard=True, ignore_expires=True)
                session.cookies.update(cookie_jar)
            except Exception as exc:
                logger.warning(
                    "Could not load YouTube cookies from %s: %s", cookie_file, exc
                )
        return session

    def _pytubefix_init_kwargs(self, youtube_cls) -> dict:
        """Return constructor kwargs that *youtube_cls* actually accepts.

        The signature is inspected so only supported parameters are passed.
        """
        kwargs = {}
        try:
            params = inspect.signature(youtube_cls).parameters
        except (TypeError, ValueError):
            params = {}

        if "use_oauth" in params:
            kwargs["use_oauth"] = False
        if "allow_oauth_cache" in params:
            kwargs["allow_oauth_cache"] = True
        if self._proxy_url:
            if "proxies" in params:
                kwargs["proxies"] = self._requests_proxies()
            elif "proxy" in params:
                kwargs["proxy"] = self._proxy_url
        return kwargs

    def _select_pytubefix_caption(self, captions):
        """Pick a caption track, preferring English then Arabic codes.

        Falls back to the first track the collection yields; returns ``None``
        when nothing can be selected.
        """
        preferred_codes = ["en", "a.en", "en-US", "a.en-US", "ar", "a.ar"]
        getter = getattr(captions, "get_by_language_code", None)
        for code in preferred_codes:
            # Lookup is best-effort: a missing code is expected, so failures
            # simply move on to the next candidate.
            try:
                return captions[code]
            except Exception:
                pass
            if callable(getter):
                try:
                    caption = getter(code)
                    if caption is not None:
                        return caption
                except Exception:
                    pass

        # No preferred code matched. Iteration may yield track objects or
        # string keys, so both are handled.
        try:
            for caption in captions:
                if not isinstance(caption, str):
                    return caption
                try:
                    return captions[caption]
                except Exception:
                    pass
        except Exception:
            return None
        return None

    def _caption_to_text(self, caption) -> str:
        """Convert a caption object to plain text.

        Sources are tried in order: ``generate_srt_captions()``, the XML
        attributes, ``json_captions``, and finally ``str(caption)``. A source
        that raises or yields no text falls through to the next one.
        """
        srt_method = getattr(caption, "generate_srt_captions", None)
        if callable(srt_method):
            try:
                text = self._strip_srt(srt_method())
            except Exception as exc:
                logger.debug("SRT caption generation failed: %s", exc)
            else:
                if text:
                    return text

        for attr_name in ("xml_captions", "xml_caption", "caption_xml"):
            value = getattr(caption, attr_name, None)
            if value:
                return self._strip_markup(str(value))

        json_value = getattr(caption, "json_captions", None)
        if json_value:
            try:
                data = (
                    json.loads(json_value)
                    if isinstance(json_value, str)
                    else json_value
                )
                return self._join_caption_json(data)
            except Exception as exc:
                logger.debug("JSON caption parsing failed: %s", exc)

        return self._strip_markup(str(caption))

    def _strip_srt(self, srt_text: str) -> str:
        """Drop blank lines, numeric counters and timing lines from SRT text."""
        lines = []
        for line in srt_text.splitlines():
            stripped = line.strip()
            if not stripped or stripped.isdigit() or "-->" in stripped:
                continue
            lines.append(stripped)
        return " ".join(lines).strip()

    def _strip_markup(self, value: str) -> str:
        """Remove ``<...>`` tags, unescape HTML entities and collapse whitespace."""
        no_tags = re.sub(r"<[^>]+>", " ", value)
        return re.sub(r"\s+", " ", html.unescape(no_tags)).strip()

    def _join_caption_json(self, data) -> str:
        """Join every non-empty ``events[].segs[].utf8`` value with spaces."""
        texts = []
        for event in data.get("events", []) if isinstance(data, dict) else []:
            for segment in event.get("segs", []) or []:
                text = segment.get("utf8", "")
                if text:
                    texts.append(text)
        return " ".join(texts).strip()

    # ------------------------------------------------------------------
    # yt-dlp / network configuration
    # ------------------------------------------------------------------

    def _apply_cookie_options(self, ydl_opts: dict) -> str | None:
        """Set ``ydl_opts["cookiefile"]`` from the configured cookie source.

        When ``YOUTUBE_COOKIES_B64`` is set, its decoded bytes are written to a
        new temporary file; otherwise ``_resolve_cookie_file`` is used.

        Returns:
            The path of the temporary file created by this call, which the
            caller must remove with ``_remove_temp_cookie_file``, or ``None``
            when no temporary file was created.
        """
        cookie_b64 = os.getenv("YOUTUBE_COOKIES_B64")
        if cookie_b64:
            try:
                cookie_bytes = base64.b64decode(cookie_b64)
                with tempfile.NamedTemporaryFile(
                    delete=False, suffix=".txt"
                ) as handle:
                    handle.write(cookie_bytes)
            except Exception as exc:
                logger.warning("Failed to decode YOUTUBE_COOKIES_B64: %s", exc)
                return None
            ydl_opts["cookiefile"] = handle.name
            return handle.name

        cookie_file = self._resolve_cookie_file()
        if cookie_file:
            ydl_opts["cookiefile"] = str(cookie_file)
        return None

    @staticmethod
    def _remove_temp_cookie_file(cookie_path: str | None) -> None:
        """Best-effort removal of a temp cookie file from ``_apply_cookie_options``."""
        if not cookie_path:
            return
        try:
            os.remove(cookie_path)
        except OSError:
            # Cleanup must never mask the result or error of the real work.
            pass

    def _apply_youtube_network_options(self, ydl_opts: dict) -> None:
        """Add network, header, extractor and proxy options to *ydl_opts* in place."""
        youtube_args = {
            "player_client": ["android", "web_safari", "tv"],
        }
        po_tokens = self._build_po_token_args()
        if po_tokens:
            youtube_args["po_token"] = po_tokens

        ydl_opts.update(
            {
                "source_address": "0.0.0.0",
                "socket_timeout": 30,
                "retries": 5,
                "fragment_retries": 5,
                "geo_bypass": True,
                "http_headers": self._browser_headers(),
                "extractor_args": {
                    "youtube": youtube_args,
                },
            }
        )
        if self._proxy_url:
            ydl_opts["proxy"] = self._proxy_url

    def _browser_headers(self) -> dict:
        """Return browser-like request headers (User-Agent and Accept-Language)."""
        return {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            "Accept-Language": "en-US,en;q=0.9",
        }

    def _requests_proxies(self) -> dict:
        """Return a ``requests``-style proxies mapping, empty if no proxy is set."""
        if not self._proxy_url:
            return {}
        return {
            "http": self._proxy_url,
            "https": self._proxy_url,
        }

    def _configure_proxy_environment(self) -> None:
        """Export the proxy to the standard env vars without overriding set ones."""
        if not self._proxy_url:
            return
        for key in ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy"):
            os.environ.setdefault(key, self._proxy_url)

    def _load_invidious_instances(self) -> List[str]:
        """Return Invidious base URLs from ``INVIDIOUS_INSTANCES`` or the defaults.

        The variable is a comma-separated list; trailing slashes are removed.
        """
        configured = os.environ.get("INVIDIOUS_INSTANCES", "").strip()
        if configured:
            return [
                item.strip().rstrip("/")
                for item in configured.split(",")
                if item.strip()
            ]
        return [
            "https://yewtu.be",
            "https://inv.nadeko.net",
            "https://invidious.privacyredirect.com",
            "https://vid.puffyan.us",
        ]

    def _build_po_token_args(self) -> List[str]:
        """Return PO token arguments built from ``YOUTUBE_PO_TOKEN``.

        Tokens are split on commas and newlines. A token containing ``+`` is
        passed through unchanged; any other token is prefixed with
        ``<client>.<context>+``.
        """
        if not self._youtube_po_token:
            return []

        raw_tokens = [
            token.strip()
            for token in re.split(r"[\n,]+", self._youtube_po_token)
            if token.strip()
        ]
        if not raw_tokens:
            return []

        po_tokens = []
        for token in raw_tokens:
            if "+" in token:
                po_tokens.append(token)
            else:
                client = self._youtube_po_token_client or "web"
                context = self._youtube_po_token_context or "gvs"
                po_tokens.append(f"{client}.{context}+{token}")
        return po_tokens

    def _has_youtube_auth(self) -> bool:
        """Return True if any cookie source, PO token or proxy is configured."""
        return bool(
            self._youtube_cookies
            or self._youtube_cookies_b64
            or self._youtube_cookies_path
            or self._youtube_cookies_file
            or self._youtube_po_token
            or self._proxy_url
        )

    def _looks_like_youtube_auth_block(self, failures: List[str]) -> bool:
        """Return True if the failure messages contain a bot/auth block marker."""
        combined = " ".join(failures).lower()
        return any(
            marker in combined
            for marker in (
                "sign in to confirm",
                "detected as a bot",
                "po_token",
                "bot",
                "forbidden",
                "403",
            )
        )

    def _resolve_cookie_file(self) -> Path | None:
        """Return a cookies file path, or ``None`` if no cookies are configured.

        Precedence: an existing ``YOUTUBE_COOKIES_PATH``, an existing
        ``YOUTUBE_COOKIES_FILE``, then inline ``YOUTUBE_COOKIES`` text, then
        decoded ``YOUTUBE_COOKIES_B64``. Inline or decoded text is written to
        ``<temp_dir>/youtube_cookies.txt`` on every call.
        """
        configured_paths = (
            ("YOUTUBE_COOKIES_PATH", self._youtube_cookies_path),
            ("YOUTUBE_COOKIES_FILE", self._youtube_cookies_file),
        )
        for env_name, raw_path in configured_paths:
            if not raw_path:
                continue
            cookie_path = Path(raw_path)
            if cookie_path.exists():
                return cookie_path
            logger.warning("%s is set but does not exist: %s", env_name, cookie_path)

        cookie_text = self._youtube_cookies
        if not cookie_text and self._youtube_cookies_b64:
            try:
                cookie_text = base64.b64decode(self._youtube_cookies_b64).decode(
                    "utf-8"
                )
            except (binascii.Error, UnicodeDecodeError) as exc:
                logger.warning("YOUTUBE_COOKIES_B64 could not be decoded: %s", exc)
                return None

        if not cookie_text:
            return None

        settings.temp_dir.mkdir(parents=True, exist_ok=True)
        cookie_path = settings.temp_dir / "youtube_cookies.txt"
        # Turn literal backslash-n sequences into real newlines.
        cookie_text = cookie_text.replace("\\n", "\n")
        if not cookie_text.endswith("\n"):
            cookie_text += "\n"
        cookie_path.write_text(cookie_text, encoding="utf-8")
        return cookie_path

    # ------------------------------------------------------------------
    # Misc
    # ------------------------------------------------------------------

    def cleanup(self, path=None):
        """Delete the temporary audio file at *path*; failures are only logged."""
        if path is None:
            return
        try:
            audio_path = Path(path)
            if audio_path.exists() and audio_path.is_file():
                audio_path.unlink()
        except Exception as exc:
            logger.warning(
                "Failed to clean up temporary audio file %s: %s", path, exc
            )

    def _extract_video_id(self, url: str) -> str:
        """Return the 11-character video ID found in *url*, or ``"unknown"``."""
        match = re.search(
            r"(?:v=|youtu\.be/|shorts/|embed/)([A-Za-z0-9_-]{11})", str(url)
        )
        return match.group(1) if match else "unknown"