Spaces:
Running
Running
| import json | |
| import base64 | |
| import binascii | |
| import html | |
| import http.cookiejar | |
| import inspect | |
| import logging | |
| import os | |
| import re | |
| import time | |
| import requests | |
| from curl_cffi import requests as curl_requests | |
| from pathlib import Path | |
| from typing import Callable, List, Tuple | |
| from src.utils.config import settings | |
# Module-level logger used by every provider in this file.
logger = logging.getLogger(__name__)

# Substrings looked for in an exception message to recognise TLS/SSL failures
# that are not worth retrying.
# NOTE(review): the bare "TLS" entry already matches anything that
# "TLS connect error" matches; both are kept exactly as authored.
_FAST_FAIL_SSL_MARKERS = (
    "UNEXPECTED_EOF_WHILE_READING",
    "SSLEOFError",
    "EOF occurred in violation of protocol",
    "TLS",
    "TLS connect error",
    "invalid library",
)


def _is_fast_fail_ssl_error(exc: Exception) -> bool:
    """Return True when ``str(exc)`` contains any fast-fail SSL marker.

    The match is a plain, case-sensitive substring test against
    ``_FAST_FAIL_SSL_MARKERS``.
    """
    message = str(exc)
    for marker in _FAST_FAIL_SSL_MARKERS:
        if marker in message:
            return True
    return False
class TranscriptProviderError(RuntimeError):
    """Signal that a single transcript provider produced no usable text.

    Subclasses ``RuntimeError`` so callers that already handle
    ``RuntimeError`` also handle provider failures.
    """
| class YouTubeDownloader: | |
def __init__(self):
    """Read provider credentials, cookie sources and proxy settings from the environment."""

    def env(name: str, default: str = "") -> str:
        # Every setting is read the same way: raw value, whitespace-trimmed.
        return os.environ.get(name, default).strip()

    # API keys for hosted transcript/transcription services.
    self._assemblyai_key = env("ASSEMBLYAI_API_KEY")
    self._supadata_key = env("SUPADATA_API_KEY")
    self._rapidapi_key = env("RAPIDAPI_KEY")
    self._rapidapi_host = env("RAPIDAPI_HOST", "youtube-transcriptor.p.rapidapi.com")

    # Cookie material can arrive inline, base64-encoded, or as a file path.
    self._youtube_cookies = env("YOUTUBE_COOKIES")
    self._youtube_cookies_b64 = env("YOUTUBE_COOKIES_B64")
    self._youtube_cookies_path = env("YOUTUBE_COOKIES_PATH")
    self._youtube_cookies_file = env("YOUTUBE_COOKIES_FILE")

    # PO token plus the client/context prefix applied to bare tokens.
    self._youtube_po_token = env("YOUTUBE_PO_TOKEN")
    self._youtube_po_token_client = env("YOUTUBE_PO_TOKEN_CLIENT", "web")
    self._youtube_po_token_context = env("YOUTUBE_PO_TOKEN_CONTEXT", "gvs")

    # PROXY_URL wins; YOUTUBE_PROXY is only consulted when it is empty.
    self._proxy_url = env("PROXY_URL") or env("YOUTUBE_PROXY")

    self._invidious_instances = self._load_invidious_instances()
    self._strategy = settings.youtube_transcript_strategy
    self._configure_proxy_environment()

    if self._strategy == "cookies_required":
        logger.info("Transcript strategy 'cookies_required' enabled.")
def get_transcript(self, url: str) -> str:
    """Return transcript text for ``url`` by trying each provider in order.

    Args:
        url: Any YouTube URL form understood by ``_extract_video_id``.

    Returns:
        The first non-empty transcript produced by a provider.

    Raises:
        RuntimeError: When no video ID can be extracted from ``url``, or when
            every provider fails (the message lists each provider's error).
    """
    video_id = self._extract_video_id(url)
    if video_id == "unknown":
        # "unknown" is the extractor's sentinel; every provider would fail on
        # it, so avoid spending remote API calls on a lookup that cannot work.
        raise RuntimeError(f"Could not extract a YouTube video ID from URL: {url}")

    logger.info("Transcript pipeline for video ID %s using strategy=%s", video_id, self._strategy)
    failures: List[str] = []
    providers = self._build_provider_plan()
    for index, (provider_name, provider) in enumerate(providers, start=1):
        try:
            logger.info("Trying transcript strategy: %s", provider_name)
            transcript = provider(video_id)
            if transcript:
                return transcript
            raise TranscriptProviderError(f"{provider_name} returned empty transcript text.")
        except Exception as exc:
            # Any provider failure (including the empty-result error raised
            # just above) is recorded and the next provider is tried.
            failures.append(f"{provider_name}: {exc}")
            if index < len(providers):
                logger.info(
                    "%s transcript provider unavailable (%s), trying next fallback.",
                    provider_name,
                    exc,
                )
            else:
                logger.error("All transcript providers failed for %s.", video_id)
    raise RuntimeError(
        f"All transcript strategies exhausted for {video_id}. "
        f"Failures: {' | '.join(failures)}"
    )
def _build_provider_plan(self) -> List[Tuple[str, Callable[[str], str]]]:
    """Return the ordered ``(label, callable)`` pairs tried by ``get_transcript``.

    Each callable takes a video ID and returns transcript text.
    NOTE(review): the order is fixed here; ``self._strategy`` is not consulted.
    """
    # Hosted third-party APIs are attempted first.
    hosted_apis = [
        ("RapidAPI", self._get_transcript_via_rapidapi),
        ("RapidAPI-v2", self._get_transcript_via_rapidapi_v2),
        ("Supadata", self._get_transcript_via_supadata),
    ]
    # Then the libraries that talk to YouTube directly.
    direct_clients = [
        ("YouTube Transcript API", self._get_transcript_via_youtube),
        ("yt-dlp", self._get_transcript_via_ytdlp),
        ("pytubefix captions", self._get_transcript_via_pytubefix),
    ]
    return hosted_apis + direct_clients
def _get_transcript_via_youtube(self, video_id: str) -> str:
    """Fetch a transcript with ``youtube_transcript_api``, retrying up to 3 times.

    Three API shapes are probed in order: the static ``get_transcript``, the
    instance ``fetch`` (given a requests session carrying cookies/proxies),
    and finally ``list_transcripts`` with manual -> generated -> first-listed
    track selection.

    Args:
        video_id: The YouTube video ID.

    Returns:
        Transcript text joined by ``_join_transcript_entries``.

    Raises:
        TranscriptProviderError: When the library is not importable or every
            attempt fails.
    """
    try:
        # Imported once, outside the retry loop: a missing package cannot be
        # fixed by retrying and should not cost the back-off sleeps.
        from youtube_transcript_api import YouTubeTranscriptApi
    except ImportError as exc:
        raise TranscriptProviderError(f"YouTube Transcript API failed: {exc}") from exc

    languages = ["en", "ar", "en-US"]
    cookie_file = self._resolve_cookie_file()
    proxies = self._requests_proxies()
    max_attempts = 3
    last_error: Exception | None = None

    for attempt in range(max_attempts):
        try:
            if hasattr(YouTubeTranscriptApi, "get_transcript"):
                kwargs = {"languages": languages}
                if cookie_file:
                    kwargs["cookies"] = str(cookie_file)
                if proxies:
                    kwargs["proxies"] = proxies
                data = YouTubeTranscriptApi.get_transcript(video_id, **kwargs)
                logger.info("YouTube Transcript API get_transcript succeeded on attempt %s", attempt + 1)
                return self._join_transcript_entries(data)

            session = self._build_requests_session(cookie_file)
            try:
                api = YouTubeTranscriptApi(http_client=session)
                if hasattr(api, "fetch"):
                    data = api.fetch(video_id, languages=languages)
                    logger.info("YouTube Transcript API fetch succeeded on attempt %s", attempt + 1)
                    return self._join_transcript_entries(data)
            finally:
                # The session is only needed for this attempt; release its
                # connection pool instead of leaking one per retry.
                session.close()

            list_kwargs = {}
            if cookie_file:
                list_kwargs["cookies"] = str(cookie_file)
            if proxies:
                list_kwargs["proxies"] = proxies
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id, **list_kwargs)
            try:
                transcript = transcript_list.find_manually_created_transcript(languages)
            except Exception:
                try:
                    transcript = transcript_list.find_generated_transcript(languages)
                except Exception:
                    # No preferred language: take whatever track is listed first.
                    transcript = next(iter(transcript_list))
            entries = transcript.fetch()
            logger.info("YouTube Transcript API succeeded on attempt %s", attempt + 1)
            return self._join_transcript_entries(entries)
        except Exception as exc:
            last_error = exc
            if _is_fast_fail_ssl_error(exc):
                # TLS-level failures are treated as non-retryable.
                break
            if attempt < max_attempts - 1:
                # Linear back-off: 1.5s, then 3.0s.
                time.sleep(1.5 * (attempt + 1))

    raise TranscriptProviderError(f"YouTube Transcript API failed: {last_error}") from last_error
def _get_transcript_via_rapidapi(self, video_id: str) -> str:
    """Fetch a transcript from the configured RapidAPI transcript host.

    Sends ``GET https://<host>/transcript`` with ``video_id`` and ``lang=en``.
    The code handles a JSON list whose first item carries either
    ``transcriptionAsText`` or a ``transcription`` list of ``subtitle`` segments.

    Args:
        video_id: The YouTube video ID.

    Returns:
        The stripped transcript text.

    Raises:
        TranscriptProviderError: When the key is missing, the request fails,
            the API reports an error, or no usable text is present.
    """
    if not self._rapidapi_key:
        raise TranscriptProviderError("RapidAPI key not configured.")

    url = f"https://{self._rapidapi_host}/transcript"
    headers = {
        "x-rapidapi-key": self._rapidapi_key,
        "x-rapidapi-host": self._rapidapi_host,
    }
    params = {
        "video_id": video_id,
        "lang": "en",
    }
    try:
        import httpx

        with httpx.Client(timeout=30) as client:
            response = client.get(url, headers=headers, params=params)
            response.raise_for_status()
            data = response.json()
    except Exception as exc:
        raise TranscriptProviderError(f"RapidAPI request failed: {exc}") from exc

    if isinstance(data, dict) and "error" in data:
        # Raised as the provider error type, like every other failure here.
        raise TranscriptProviderError(f"RapidAPI: {data['error']}")

    first_item = data[0] if isinstance(data, list) and data else None
    if isinstance(first_item, dict):
        # 1. Prefer the pre-joined text when it is a non-blank string.
        full_text = first_item.get("transcriptionAsText")
        if isinstance(full_text, str) and full_text.strip():
            full_text = full_text.strip()
            logger.info("RapidAPI transcript (full text) fetched successfully (%d chars).", len(full_text))
            return full_text

        # 2. Otherwise join the per-segment subtitles, skipping missing or
        #    empty ones so a None value never becomes the literal "None".
        segments = first_item.get("transcription")
        if isinstance(segments, list):
            transcript = " ".join(
                str(seg["subtitle"])
                for seg in segments
                if isinstance(seg, dict) and seg.get("subtitle")
            ).strip()
            if transcript:
                logger.info("RapidAPI transcript (segments) fetched successfully (%d chars).", len(transcript))
                return transcript

    # 3. Nothing usable: keep the raw payload in the log for diagnosis.
    logger.warning("RapidAPI raw response: %s", data)
    raise TranscriptProviderError("RapidAPI response did not contain usable transcript content.")
def _get_transcript_via_rapidapi_v2(self, video_id: str) -> str:
    """Fetch a transcript from the RapidAPI "transcribe YouTube video to text" endpoint.

    Posts the watch URL as JSON and looks for text under the keys
    ``transcript``/``content``/``text``/``result``, then for a list of
    ``{"text": ...}`` items under ``segments``/``transcription``/``words``.

    Args:
        video_id: The YouTube video ID.

    Returns:
        The stripped transcript text.

    Raises:
        TranscriptProviderError: When the key is missing, the request fails,
            the API reports an error, or no usable text is present.
    """
    if not self._rapidapi_key:
        raise TranscriptProviderError("RapidAPI key not configured.")

    host = "youtube-transcripts-transcribe-youtube-video-to-text.p.rapidapi.com"
    url = f"https://{host}/transcribe"
    headers = {
        "x-rapidapi-key": self._rapidapi_key,
        "x-rapidapi-host": host,
        "Content-Type": "application/json",
    }
    payload = {
        "url": f"https://www.youtube.com/watch?v={video_id}"
    }
    try:
        import httpx

        with httpx.Client(timeout=60) as client:
            response = client.post(url, headers=headers, json=payload)
            response.raise_for_status()
            status_code = response.status_code
            data = response.json()
    except Exception as exc:
        raise TranscriptProviderError(f"RapidAPI-v2 request failed: {exc}") from exc

    # Truncated payload preview for debugging; formatted lazily by logging.
    logger.info("[RapidAPI-v2 DEBUG] status=%s preview=%s", status_code, str(data)[:300])

    if isinstance(data, dict) and "error" in data:
        # Raised as the provider error type, like every other failure here.
        raise TranscriptProviderError(f"RapidAPI-v2: {data['error']}")

    if isinstance(data, dict):
        # Plain-text keys first.
        for key in ("transcript", "content", "text", "result"):
            value = data.get(key)
            if isinstance(value, str) and value.strip():
                return value.strip()
        # Then segment lists; missing or None texts are skipped so the join
        # cannot fail on a non-string item.
        for key in ("segments", "transcription", "words"):
            segments = data.get(key)
            if isinstance(segments, list):
                joined = " ".join(
                    str(seg["text"])
                    for seg in segments
                    if isinstance(seg, dict) and seg.get("text")
                ).strip()
                if joined:
                    return joined

    raise TranscriptProviderError("RapidAPI-v2 response did not contain usable transcript content.")
def _get_transcript_via_supadata(self, video_id: str) -> str:
    """Fetch a transcript from the Supadata YouTube transcript endpoint.

    ``content`` may be a string or a list of ``{"text": ...}`` segments;
    ``segments`` is used as a fallback when ``content`` yields nothing.

    Args:
        video_id: The YouTube video ID.

    Returns:
        The stripped transcript text.

    Raises:
        TranscriptProviderError: When the key is missing, the request fails,
            or the response carries no usable text.
    """
    if not self._supadata_key:
        raise TranscriptProviderError("Supadata API key not configured.")

    clean_url = f"https://www.youtube.com/watch?v={video_id}"
    headers = {
        "x-api-key": self._supadata_key,
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0.0.0 Safari/537.36"
        ),
    }
    try:
        resp = curl_requests.get(
            "https://api.supadata.ai/v1/youtube/transcript",
            # Passed as params so the nested watch URL (which itself contains
            # "?" and "=") is percent-encoded instead of spliced in raw.
            params={"url": clean_url, "text": "true"},
            headers=headers,
            impersonate="chrome124",
            timeout=30,
            proxies=self._requests_proxies() or None,
        )
        resp.raise_for_status()
        data = resp.json()
    except Exception as exc:
        raise TranscriptProviderError(f"Supadata request failed: {exc}") from exc

    if not isinstance(data, dict):
        raise TranscriptProviderError("Supadata response did not include usable transcript content.")

    def join_segments(items) -> str:
        # Join the "text" of each dict segment, ignoring missing/None values.
        return " ".join(
            str(item["text"]) for item in items if isinstance(item, dict) and item.get("text")
        ).strip()

    text = ""
    content_val = data.get("content")
    if isinstance(content_val, str):
        text = content_val.strip()
    elif isinstance(content_val, list):
        text = join_segments(content_val)

    if not text:
        segments = data.get("segments")
        if isinstance(segments, list):
            text = join_segments(segments)

    if not text:
        raise TranscriptProviderError("Supadata response did not include usable transcript content.")

    logger.info("Supadata transcript fetched successfully (%d chars).", len(text))
    return text
def _get_transcript_via_pytubefix(self, video_id: str) -> str:
    """Read caption text for ``video_id`` through pytubefix.

    Raises:
        TranscriptProviderError: When pytubefix cannot be imported, exposes no
            captions, has no selectable track, yields empty text, or raises.
    """
    try:
        from pytubefix import YouTube
    except Exception as exc:
        raise TranscriptProviderError(f"pytubefix import failed: {exc}") from exc

    watch_url = f"https://www.youtube.com/watch?v={video_id}"
    try:
        video = YouTube(watch_url, **self._pytubefix_init_kwargs(YouTube))
        tracks = getattr(video, "captions", None)
        if not tracks:
            raise TranscriptProviderError("pytubefix returned no captions.")

        track = self._select_pytubefix_caption(tracks)
        if track is None:
            raise TranscriptProviderError("pytubefix found no preferred caption track.")

        transcript = self._caption_to_text(track)
        if not transcript:
            raise TranscriptProviderError("pytubefix caption track was empty.")

        logger.info("pytubefix captions fetched successfully (%d chars).", len(transcript))
        return transcript
    except TranscriptProviderError:
        # Already in provider-error form; do not wrap a second time.
        raise
    except Exception as exc:
        raise TranscriptProviderError(f"pytubefix captions failed: {exc}") from exc
def _get_transcript_via_ytdlp(self, video_id: str) -> str:
    """Fetch subtitles for ``video_id`` using yt-dlp metadata extraction.

    yt-dlp is only asked for metadata (no media download). For each language
    in ``en``, ``en-US``, ``ar`` the manual subtitles are tried before the
    automatic captions, preferring the ``json3`` format and falling back to
    ``vtt``. A track that yields no text is skipped.

    Args:
        video_id: The YouTube video ID.

    Returns:
        The subtitle text joined into a single string.

    Raises:
        TranscriptProviderError: When extraction fails or no track yields text.
    """
    url = f"https://www.youtube.com/watch?v={video_id}"
    # Quiet, metadata-only run that still lists subtitle tracks.
    ydl_opts = {
        'skip_download': True,
        'writesubtitles': True,
        'writeautomaticsubs': True,
        'subtitleslangs': ['en', 'ar', 'en-US'],
        'quiet': True,
        'no_warnings': True,
        'extract_flat': False,
    }
    self._apply_youtube_network_options(ydl_opts)
    self._apply_cookie_options(ydl_opts)
    proxies = self._requests_proxies() or None

    try:
        import yt_dlp

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False) or {}
        subtitles = info.get('subtitles') or {}
        auto_subs = info.get('automatic_captions') or {}

        for lang in ('en', 'en-US', 'ar'):
            # Manual subtitles first, then auto-generated ones.
            for source in (subtitles, auto_subs):
                formats = source.get(lang)
                if not formats:
                    continue

                json3_url = next((f['url'] for f in formats if f.get('ext') == 'json3'), None)
                if json3_url:
                    resp = curl_requests.get(
                        json3_url,
                        impersonate="chrome124",
                        proxies=proxies,
                        timeout=30,
                    )
                    # Without this an HTTP error page would be parsed as data.
                    resp.raise_for_status()
                    data = resp.json()
                    text = " ".join(
                        seg.get('utf8', '')
                        for event in data.get('events', [])
                        for seg in (event.get('segs') or [])
                    ).strip()
                    if text:
                        return text

                vtt_url = next((f['url'] for f in formats if f.get('ext') == 'vtt'), None)
                if vtt_url:
                    resp = curl_requests.get(
                        vtt_url,
                        impersonate="chrome124",
                        proxies=proxies,
                        timeout=30,
                    )
                    resp.raise_for_status()
                    lines: List[str] = []
                    for raw_line in resp.text.splitlines():
                        stripped = raw_line.strip()
                        # Drop blanks, cue timings, numeric cue ids and the
                        # WEBVTT / Kind: / Language: header lines.
                        if (
                            not stripped
                            or '-->' in stripped
                            or stripped.isdigit()
                            or stripped.startswith(('WEBVTT', 'Kind:', 'Language:'))
                        ):
                            continue
                        clean = re.sub(r'<[^>]+>', '', stripped).strip()
                        # Collapse a line that repeats the previous one
                        # verbatim (rolling captions re-emit the prior cue).
                        if clean and (not lines or lines[-1] != clean):
                            lines.append(clean)
                    text = " ".join(lines).strip()
                    if text:
                        return text

        raise TranscriptProviderError("No usable subtitle formats found via yt-dlp.")
    except TranscriptProviderError:
        # Already in provider-error form; do not wrap a second time.
        raise
    except Exception as exc:
        raise TranscriptProviderError(f"yt-dlp failed: {exc}") from exc
    finally:
        # Remove the cookie file only when it looks temporary AND is not a
        # path the operator configured explicitly; the substring heuristic
        # alone would also delete e.g. /tmp/my_cookies.txt.
        cookiefile = ydl_opts.get("cookiefile")
        user_supplied = {
            Path(p) for p in (self._youtube_cookies_path, self._youtube_cookies_file) if p
        }
        if (
            cookiefile
            and Path(cookiefile) not in user_supplied
            and os.path.exists(cookiefile)
            and ("tmp" in str(cookiefile).lower() or "temp" in str(cookiefile).lower())
        ):
            try:
                os.remove(cookiefile)
            except OSError:
                pass  # best-effort cleanup
def download_audio(self, url: str, output_stem: str) -> Path:
    """Download the best available audio stream for the Whisper deep-scan fallback.

    Providers from ``_build_audio_download_plan`` are tried in order until
    one succeeds. The result is ``<temp_dir>/<stem>.mp3`` when present,
    otherwise the first non-empty ``<stem>.*`` file that is not an
    in-progress download artifact.

    Args:
        url: The YouTube URL to download audio from.
        output_stem: Base file name; characters outside ``[A-Za-z0-9_-]`` are
            replaced with ``_``.

    Returns:
        Path of the downloaded audio file.

    Raises:
        RuntimeError: When every provider fails or no audio file is produced.
    """
    # yt_dlp is imported by ``_download_audio_via_ytdlp`` itself, so a missing
    # yt-dlp install only fails that provider instead of the whole method.
    settings.temp_dir.mkdir(parents=True, exist_ok=True)
    safe_stem = re.sub(r"[^A-Za-z0-9_-]+", "_", output_stem).strip("_") or "audio"
    output_template = str(settings.temp_dir / f"{safe_stem}.%(ext)s")
    expected_audio_path = settings.temp_dir / f"{safe_stem}.mp3"
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": output_template,
        "quiet": True,
        "no_warnings": True,
        "noplaylist": True,
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "128",
            }
        ],
    }
    self._apply_youtube_network_options(ydl_opts)
    self._apply_cookie_options(ydl_opts)
    failures: List[str] = []
    try:
        for provider_name, provider in self._build_audio_download_plan(ydl_opts):
            try:
                provider(url, safe_stem)
                break
            except Exception as exc:
                failures.append(f"{provider_name}: {exc}")
                logger.warning("%s audio extraction failed: %s", provider_name, exc)
        else:
            # Reached only when no provider hit ``break``, i.e. all failed.
            auth_hint = ""
            if self._looks_like_youtube_auth_block(failures) and not self._has_youtube_auth():
                auth_hint = (
                    " YouTube authentication is required for this video/Space. "
                    "Set YOUTUBE_COOKIES_B64 (recommended) or YOUTUBE_COOKIES, "
                    "and optionally YOUTUBE_PO_TOKEN."
                )
            raise RuntimeError(f"Audio extraction failed.{auth_hint} {' | '.join(failures)}")

        if expected_audio_path.exists() and expected_audio_path.stat().st_size > 0:
            logger.info("Audio extracted for deep scan: %s", expected_audio_path)
            return expected_audio_path

        # The fallback providers keep the container's own extension.
        for candidate in sorted(settings.temp_dir.glob(f"{safe_stem}.*")):
            if candidate.suffix.lower() in (".part", ".ytdl"):
                # yt-dlp in-progress artifacts are not playable audio.
                continue
            if candidate.is_file() and candidate.stat().st_size > 0:
                logger.info("Audio extracted for deep scan: %s", candidate)
                return candidate
        raise RuntimeError("Audio extraction completed but no audio file was produced.")
    finally:
        # Remove the cookie file only when it looks temporary AND is not a
        # path the operator configured explicitly; the substring heuristic
        # alone would also delete e.g. /tmp/my_cookies.txt.
        cookiefile = ydl_opts.get("cookiefile")
        user_supplied = {
            Path(p) for p in (self._youtube_cookies_path, self._youtube_cookies_file) if p
        }
        if (
            cookiefile
            and Path(cookiefile) not in user_supplied
            and os.path.exists(cookiefile)
            and ("tmp" in str(cookiefile).lower() or "temp" in str(cookiefile).lower())
        ):
            try:
                os.remove(cookiefile)
            except OSError:
                pass  # best-effort cleanup
def _build_audio_download_plan(self, ydl_opts: dict) -> List[Tuple[str, Callable[[str, str], None]]]:
    """Return the ordered ``(label, callable)`` audio providers.

    Every callable takes ``(url, safe_stem)``. The yt-dlp entry ignores the
    stem and is bound to ``ydl_opts``.
    """

    def via_ytdlp(url, _safe_stem):
        # Adapter: yt-dlp takes its output naming from ydl_opts, not the stem.
        return self._download_audio_via_ytdlp(url, ydl_opts)

    plan = [("yt-dlp", via_ytdlp)]
    plan.append(("Invidious proxy", self._download_audio_via_invidious))
    plan.append(("pytubefix", self._download_audio_via_pytubefix))
    return plan
def _download_audio_via_ytdlp(self, url: str, ydl_opts: dict) -> None:
    """Run a yt-dlp download of ``url`` with the supplied options.

    Any exception raised by yt-dlp propagates to the caller.
    """
    from yt_dlp import YoutubeDL

    with YoutubeDL(ydl_opts) as downloader:
        downloader.extract_info(url, download=True)
def _download_audio_via_pytubefix(self, url: str, safe_stem: str) -> None:
    """Download the highest-``abr`` audio-only stream via pytubefix.

    The file is written to ``settings.temp_dir`` as
    ``<safe_stem>.<stream subtype>`` (``mp4`` when the subtype is empty).

    Raises:
        RuntimeError: When no audio stream exists or pytubefix raises.
    """
    from pytubefix import YouTube

    try:
        video = YouTube(url, **self._pytubefix_init_kwargs(YouTube))
        audio_only = video.streams.filter(only_audio=True)
        best_stream = audio_only.order_by("abr").desc().first()
        if best_stream is None:
            raise RuntimeError("No audio stream returned by pytubefix.")
        extension = best_stream.subtype or 'mp4'
        best_stream.download(
            output_path=str(settings.temp_dir),
            filename=f"{safe_stem}.{extension}",
        )
    except Exception as exc:
        raise RuntimeError(f"pytubefix failed: {exc}") from exc
def _download_audio_via_invidious(self, url: str, safe_stem: str) -> None:
    """Download audio through the first Invidious instance that works.

    For each configured instance the video metadata is fetched, the
    highest-ranked audio format is chosen, and the stream is saved to
    ``settings.temp_dir`` as ``<safe_stem>.<ext>``.

    Raises:
        RuntimeError: When the video ID cannot be extracted or every
            instance fails.
    """
    video_id = self._extract_video_id(url)
    if video_id == "unknown" or not video_id:
        raise RuntimeError("Could not extract video ID for Invidious fallback.")

    errors: List[str] = []
    for raw_instance in self._invidious_instances:
        instance = raw_instance.rstrip("/")
        try:
            metadata_response = requests.get(
                f"{instance}/api/v1/videos/{video_id}",
                headers=self._browser_headers(),
                proxies=self._requests_proxies() or None,
                timeout=20,
            )
            metadata_response.raise_for_status()
            candidates = self._extract_invidious_audio_formats(metadata_response.json())
            if not candidates:
                raise RuntimeError("No audio formats in Invidious response.")

            best = candidates[0]
            itag = best.get("itag")
            if itag:
                # With an itag, fetch through the instance itself (local=true).
                download_url = f"{instance}/latest_version?id={video_id}&itag={itag}&local=true"
            else:
                download_url = best.get("url", "")
            if not download_url:
                raise RuntimeError("No downloadable audio URL in Invidious response.")

            extension = self._extension_from_mime(best.get("type", "audio/webm"))
            target = settings.temp_dir / f"{safe_stem}.{extension}"
            self._stream_download(download_url, target)
            logger.info("Invidious audio extracted via %s: %s", instance, target)
            return
        except Exception as exc:
            # Record the failure and move on to the next instance.
            errors.append(f"{instance}: {exc}")
            logger.warning("Invidious instance failed for audio extraction: %s", exc)

    raise RuntimeError("All Invidious instances failed. " + " | ".join(errors))
def _extract_invidious_audio_formats(self, data: dict) -> List[dict]:
    """Return the audio formats of an Invidious video payload, best bitrate first.

    Formats are read from ``adaptiveFormats`` (or ``formatStreams`` when that
    is empty). Only dict entries whose ``type`` starts with ``audio/`` and
    that carry an ``itag`` or ``url`` are kept.

    Args:
        data: Decoded JSON of the Invidious ``/api/v1/videos/<id>`` response.

    Returns:
        Matching format dicts sorted by descending bitrate. Entries whose
        bitrate is missing or not numeric sort last instead of raising.
    """
    if not isinstance(data, dict):
        return []

    def bitrate_of(item: dict) -> int:
        raw = item.get("bitrate") or item.get("bitrateBps") or 0
        try:
            return int(raw)
        except (TypeError, ValueError):
            pass
        try:
            # Accept decimal strings such as "128000.0".
            return int(float(raw))
        except (TypeError, ValueError, OverflowError):
            return 0

    formats = data.get("adaptiveFormats") or data.get("formatStreams") or []
    audio_formats = [
        item
        for item in formats
        if isinstance(item, dict)
        and str(item.get("type", "")).startswith("audio/")
        and (item.get("itag") or item.get("url"))
    ]
    return sorted(audio_formats, key=bitrate_of, reverse=True)
def _stream_download(self, url: str, output_path: Path) -> None:
    """Stream ``url`` to ``output_path`` in 1 MiB chunks.

    Args:
        url: Direct media URL to download.
        output_path: Destination file; overwritten if it exists.

    Raises:
        RuntimeError: When the response is HTML/JSON instead of media, or the
            written file is empty.
        requests.RequestException: On HTTP or transport errors.

    A file this call started writing is removed again on any failure, so a
    truncated download cannot later be mistaken for valid audio.
    """
    started_writing = False
    try:
        with requests.get(
            url,
            headers=self._browser_headers(),
            proxies=self._requests_proxies() or None,
            stream=True,
            timeout=60,
        ) as resp:
            resp.raise_for_status()
            content_type = resp.headers.get("Content-Type", "").lower()
            if "text/html" in content_type or "application/json" in content_type:
                # An error page or API payload, not an audio stream.
                raise RuntimeError(f"Unexpected audio response type: {content_type}")
            started_writing = True
            with output_path.open("wb") as audio_file:
                for chunk in resp.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        audio_file.write(chunk)
        if not output_path.exists() or output_path.stat().st_size == 0:
            raise RuntimeError("Downloaded audio file is empty.")
    except Exception:
        # Only delete what this call created; a failure before the file was
        # opened must not remove a pre-existing file at the same path.
        if started_writing:
            try:
                output_path.unlink()
            except OSError:
                pass
        raise
def _extension_from_mime(self, mime_type: str) -> str:
    """Map an audio MIME string to a file extension.

    Matching is by case-sensitive substring, checked in the order
    mp4/m4a -> mpeg/mp3 -> ogg; anything else yields ``"webm"``.
    """
    rules = (
        (("mp4", "m4a"), "m4a"),
        (("mpeg", "mp3"), "mp3"),
        (("ogg",), "ogg"),
    )
    for needles, extension in rules:
        if any(needle in mime_type for needle in needles):
            return extension
    return "webm"
def _join_transcript_entries(self, entries) -> str:
    """Join the ``text`` of each transcript entry with single spaces.

    Entries may be dicts (``entry["text"]``) or objects (``entry.text``);
    entries whose text is missing or falsy are skipped.
    """

    def text_of(entry):
        if isinstance(entry, dict):
            return entry.get("text", "")
        return getattr(entry, "text", "")

    return " ".join(str(text) for text in map(text_of, entries) if text).strip()
def _build_requests_session(self, cookie_file: Path | None = None) -> requests.Session:
    """Create a ``requests.Session`` with browser headers, proxies and cookies.

    Args:
        cookie_file: Optional Mozilla/Netscape-format cookie file. A file that
            cannot be loaded is logged and skipped rather than raising.

    Returns:
        The configured session.
    """
    session = requests.Session()
    session.headers.update(self._browser_headers())

    proxy_map = self._requests_proxies()
    if proxy_map:
        session.proxies.update(proxy_map)

    if not cookie_file:
        return session

    try:
        jar = http.cookiejar.MozillaCookieJar(str(cookie_file))
        # Keep session-only and expired cookies too.
        jar.load(ignore_discard=True, ignore_expires=True)
        session.cookies.update(jar)
    except Exception as exc:
        logger.warning("Could not load YouTube cookies from %s: %s", cookie_file, exc)
    return session
def _pytubefix_init_kwargs(self, youtube_cls) -> dict:
    """Build constructor kwargs that ``youtube_cls`` actually accepts.

    The callable's signature is inspected so only supported parameter names
    are passed: OAuth is switched off, the OAuth cache is allowed, and the
    proxy is passed as ``proxies`` (mapping) or ``proxy`` (URL) when set.
    """
    try:
        accepted = inspect.signature(youtube_cls).parameters
    except (TypeError, ValueError):
        # Not introspectable: pass nothing.
        accepted = {}

    kwargs = {}
    for name, value in (("use_oauth", False), ("allow_oauth_cache", True)):
        if name in accepted:
            kwargs[name] = value

    if not self._proxy_url:
        return kwargs

    if "proxies" in accepted:
        kwargs["proxies"] = self._requests_proxies()
    elif "proxy" in accepted:
        kwargs["proxy"] = self._proxy_url
    return kwargs
def _select_pytubefix_caption(self, captions):
    """Pick a caption track from a pytubefix captions collection.

    Preferred codes are tried in order, first by subscript and then through
    ``get_by_language_code`` when the collection has it. If none match, the
    first entry obtained by iterating the collection is returned.

    Returns:
        The selected caption, or ``None`` when nothing can be selected.
    """

    def by_key(key):
        # (found, value) so a stored None is still treated as a hit.
        try:
            return True, captions[key]
        except Exception:
            return False, None

    for code in ("en", "a.en", "en-US", "a.en-US", "ar", "a.ar"):
        found, track = by_key(code)
        if found:
            return track
        getter = getattr(captions, "get_by_language_code", None)
        if not callable(getter):
            continue
        try:
            track = getter(code)
        except Exception:
            continue
        if track is not None:
            return track

    try:
        for item in captions:
            # Iteration may yield caption objects or string keys.
            if not isinstance(item, str):
                return item
            found, track = by_key(item)
            if found:
                return track
    except Exception:
        return None
    return None
def _caption_to_text(self, caption) -> str:
    """Convert a caption object to plain text using whatever it exposes.

    Tried in order: ``generate_srt_captions()``, the first non-empty XML
    attribute, ``json_captions``, and finally ``str(caption)`` with markup
    stripped.
    """
    make_srt = getattr(caption, "generate_srt_captions", None)
    if callable(make_srt):
        return self._strip_srt(make_srt())

    xml_values = (
        getattr(caption, name, None)
        for name in ("xml_captions", "xml_caption", "caption_xml")
    )
    xml_text = next((value for value in xml_values if value), None)
    if xml_text:
        return self._strip_markup(str(xml_text))

    raw_json = getattr(caption, "json_captions", None)
    if raw_json:
        try:
            parsed = json.loads(raw_json) if isinstance(raw_json, str) else raw_json
            return self._join_caption_json(parsed)
        except Exception:
            # Unparseable JSON: fall through to the string form below.
            pass
    return self._strip_markup(str(caption))
def _strip_srt(self, srt_text: str) -> str:
    """Reduce SRT text to its spoken lines joined by single spaces.

    Blank lines, all-digit cue numbers and ``-->`` timing lines are dropped.
    """
    trimmed = (line.strip() for line in srt_text.splitlines())
    spoken = [
        line
        for line in trimmed
        if line and not line.isdigit() and "-->" not in line
    ]
    return " ".join(spoken).strip()
def _strip_markup(self, value: str) -> str:
    """Replace tags with spaces, decode HTML entities and collapse whitespace."""
    without_tags = re.sub(r"<[^>]+>", " ", value)
    decoded = html.unescape(without_tags)
    collapsed = re.sub(r"\s+", " ", decoded)
    return collapsed.strip()
def _join_caption_json(self, data) -> str:
    """Join every non-empty ``utf8`` segment under ``data["events"][*]["segs"]``.

    Anything that is not a dict yields an empty string.
    """
    if not isinstance(data, dict):
        return ""
    pieces = []
    for event in data.get("events", []):
        # "segs" may be absent or explicitly null on an event.
        segments = event.get("segs", []) or []
        pieces.extend(segment.get("utf8", "") for segment in segments)
    return " ".join(piece for piece in pieces if piece).strip()
def _apply_cookie_options(self, ydl_opts: dict) -> None:
    """Set ``ydl_opts["cookiefile"]`` from the configured cookie sources.

    ``YOUTUBE_COOKIES_B64`` (read from the environment at call time) takes
    priority and is decoded into a fresh temporary file. When it is unset,
    blank, or cannot be decoded, the file from ``_resolve_cookie_file`` is
    used instead. ``ydl_opts`` is left untouched when no source yields a file.

    Args:
        ydl_opts: yt-dlp options dict, mutated in place.
    """
    cookie_b64 = (os.getenv("YOUTUBE_COOKIES_B64") or "").strip()
    if cookie_b64:
        import base64
        import tempfile

        try:
            cookie_bytes = base64.b64decode(cookie_b64)
            if not cookie_bytes:
                raise ValueError("decoded cookie payload is empty")
            # delete=False: the file must outlive this method; the callers
            # remove it once yt-dlp has finished.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as handle:
                handle.write(cookie_bytes)
            ydl_opts["cookiefile"] = handle.name
            return
        except Exception as exc:
            # Fall through to the other cookie sources instead of silently
            # running without any cookies.
            logger.warning("Failed to decode YOUTUBE_COOKIES_B64: %s", exc)

    cookie_file = self._resolve_cookie_file()
    if cookie_file:
        ydl_opts["cookiefile"] = str(cookie_file)
def _apply_youtube_network_options(self, ydl_opts: dict) -> None:
    """Add network, retry, header and extractor settings to ``ydl_opts`` in place.

    Sets the player clients (plus PO tokens when configured), binds the
    source address to ``0.0.0.0``, sets timeout and retry counts, and adds
    the proxy when one is configured.
    """
    youtube_extractor = {"player_client": ["android", "web_safari", "tv"]}
    tokens = self._build_po_token_args()
    if tokens:
        youtube_extractor["po_token"] = tokens

    ydl_opts["source_address"] = "0.0.0.0"
    ydl_opts["socket_timeout"] = 30
    ydl_opts["retries"] = 5
    ydl_opts["fragment_retries"] = 5
    ydl_opts["geo_bypass"] = True
    ydl_opts["http_headers"] = self._browser_headers()
    ydl_opts["extractor_args"] = {"youtube": youtube_extractor}

    proxy = self._proxy_url
    if proxy:
        ydl_opts["proxy"] = proxy
def _browser_headers(self) -> dict:
    """Return the desktop-Chrome style headers sent on outgoing requests."""
    user_agent = " ".join(
        (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "AppleWebKit/537.36 (KHTML, like Gecko)",
            "Chrome/124.0.0.0 Safari/537.36",
        )
    )
    headers = {"User-Agent": user_agent}
    headers["Accept-Language"] = "en-US,en;q=0.9"
    return headers
def _requests_proxies(self) -> dict:
    """Return a requests-style proxy mapping, or ``{}`` when no proxy is set.

    The same proxy URL is used for both ``http`` and ``https``.
    """
    proxy = self._proxy_url
    if proxy:
        return dict.fromkeys(("http", "https"), proxy)
    return {}
def _configure_proxy_environment(self) -> None:
    """Export the proxy URL through the standard proxy environment variables.

    Variables that are already set are left unchanged; nothing happens when
    no proxy is configured.
    """
    proxy = self._proxy_url
    if proxy:
        for name in ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy"):
            if name not in os.environ:
                os.environ[name] = proxy
def _load_invidious_instances(self) -> List[str]:
    """Return Invidious base URLs from ``INVIDIOUS_INSTANCES`` or the built-in list.

    The variable is comma-separated; entries are trimmed, blank entries are
    dropped and trailing slashes are removed.
    """
    raw = os.environ.get("INVIDIOUS_INSTANCES", "").strip()
    if not raw:
        return [
            "https://yewtu.be",
            "https://inv.nadeko.net",
            "https://invidious.privacyredirect.com",
            "https://vid.puffyan.us",
        ]
    trimmed = (part.strip() for part in raw.split(","))
    return [entry.rstrip("/") for entry in trimmed if entry]
def _build_po_token_args(self) -> List[str]:
    """Turn the configured PO token setting into a list of token arguments.

    The setting is split on commas and newlines. A token that already
    contains ``+`` is passed through unchanged; a bare token is prefixed
    with ``<client>.<context>+`` (defaults ``web`` and ``gvs``).
    """
    if not self._youtube_po_token:
        return []

    pieces = (piece.strip() for piece in re.split(r"[\n,]+", self._youtube_po_token))
    tokens = [piece for piece in pieces if piece]

    formatted = []
    for token in tokens:
        if "+" not in token:
            client = self._youtube_po_token_client or "web"
            context = self._youtube_po_token_context or "gvs"
            token = f"{client}.{context}+{token}"
        formatted.append(token)
    return formatted
def _has_youtube_auth(self) -> bool:
    """Return True when any cookie source, PO token or proxy is configured.

    NOTE(review): a configured proxy counts as "auth" here, which also
    suppresses the authentication hint in ``download_audio`` - confirm
    that this is intended.
    """
    credentials = (
        self._youtube_cookies,
        self._youtube_cookies_b64,
        self._youtube_cookies_path,
        self._youtube_cookies_file,
        self._youtube_po_token,
        self._proxy_url,
    )
    return any(credentials)
def _looks_like_youtube_auth_block(self, failures: List[str]) -> bool:
    """Return True when the failure messages suggest a YouTube bot/auth block.

    The messages are joined, lower-cased and searched for known phrases.
    NOTE(review): the short "bot" marker also matches inside unrelated words
    such as "both"; it is kept as authored.
    """
    haystack = " ".join(failures).lower()
    markers = (
        "sign in to confirm",
        "detected as a bot",
        "po_token",
        "bot",
        "forbidden",
        "403",
    )
    for marker in markers:
        if marker in haystack:
            return True
    return False
| def _resolve_cookie_file(self) -> Path | None: | |
| if self._youtube_cookies_path: | |
| cookie_path = Path(self._youtube_cookies_path) | |
| if cookie_path.exists(): | |
| return cookie_path | |
| logger.warning("YOUTUBE_COOKIES_PATH is set but does not exist: %s", cookie_path) | |
| if self._youtube_cookies_file: | |
| cookie_path = Path(self._youtube_cookies_file) | |
| if cookie_path.exists(): | |
| return cookie_path | |
| logger.warning("YOUTUBE_COOKIES_FILE is set but does not exist: %s", cookie_path) | |
| cookie_text = self._youtube_cookies | |
| if not cookie_text and self._youtube_cookies_b64: | |
| try: | |
| cookie_text = base64.b64decode(self._youtube_cookies_b64).decode("utf-8") | |
| except (binascii.Error, UnicodeDecodeError) as exc: | |
| logger.warning("YOUTUBE_COOKIES_B64 could not be decoded: %s", exc) | |
| return None | |
| if not cookie_text: | |
| return None | |
| settings.temp_dir.mkdir(parents=True, exist_ok=True) | |
| cookie_path = settings.temp_dir / "youtube_cookies.txt" | |
| cookie_text = cookie_text.replace("\\n", "\n") | |
| if not cookie_text.endswith("\n"): | |
| cookie_text += "\n" | |
| cookie_path.write_text(cookie_text, encoding="utf-8") | |
| return cookie_path | |
def cleanup(self, path=None):
    """Delete the temporary audio file at ``path`` if it exists.

    ``None``, missing paths and non-file paths are ignored; any error is
    logged as a warning instead of being raised.
    """
    if path is None:
        return
    try:
        target = Path(path)
        if not (target.exists() and target.is_file()):
            return
        target.unlink()
    except Exception as exc:
        logger.warning("Failed to clean up temporary audio file %s: %s", path, exc)
def _extract_video_id(self, url: str) -> str:
    """Extract the 11-character YouTube video ID from ``url``.

    Recognised forms: ``watch?v=<id>``, ``youtu.be/<id>``, ``shorts/<id>``,
    ``embed/<id>`` and ``live/<id>``.

    Args:
        url: The URL (any value is converted with ``str``).

    Returns:
        The video ID, or the sentinel string ``"unknown"`` when none is found.
    """
    match = re.search(
        r"(?:v=|youtu\.be/|shorts/|embed/|live/)([A-Za-z0-9_-]{11})",
        str(url),
    )
    return match.group(1) if match else "unknown"