import logging
import json
import urllib.request
import tempfile
import subprocess
from pathlib import Path

# Load .env file if exists. Existing environment variables win
# (setdefault), so real deployment settings are never overridden.
_env_path = Path(__file__).parent / ".env"
if _env_path.exists():
    for line in _env_path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            k, v = line.split("=", 1)
            import os as _os
            _os.environ.setdefault(k.strip(), v.strip())

from fastapi import FastAPI
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from youtube_transcript_api import YouTubeTranscriptApi
import re
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
import os
import urllib.parse
import requests as _requests_mod
import shutil

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="YouTube Transcript Extractor")

# Version: 3.3.0 - Network intercept + ffmpeg direct URL audio extraction

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

_executor = ThreadPoolExecutor(max_workers=5)
_fetch_semaphore = asyncio.Semaphore(3)  # max 3 concurrent YouTube fetches

# --- Groq Whisper API (Instagram STT) ---
_groq_api_key = os.environ.get("GROQ_API_KEY", "")
_groq_client = None
if _groq_api_key:
    from groq import Groq
    _groq_client = Groq(api_key=_groq_api_key)
    logger.info("Groq Whisper API initialized")
else:
    logger.info("GROQ_API_KEY not set, Instagram transcription disabled")

_ig_semaphore = asyncio.Semaphore(2)  # max 2 concurrent Instagram transcriptions

# Check if ffmpeg is available for audio extraction
_has_ffmpeg = shutil.which('ffmpeg') is not None
if _has_ffmpeg:
    logger.info("ffmpeg found - will extract audio directly from URL")
else:
    logger.info("ffmpeg not found - will download full video for Groq")

# --- Proxy support (optional PROXY_URL env var) ---
_proxy_url = os.environ.get("PROXY_URL", "").strip()
_proxy_config = None
if _proxy_url:
    from youtube_transcript_api.proxies import GenericProxyConfig
    _proxy_config = GenericProxyConfig(
        http_url=_proxy_url,
        https_url=_proxy_url,
    )
    # NOTE(review): the first 30 characters of a proxy URL can include
    # "user:password@" credentials; consider logging only the host.
    logger.info(f"Using proxy: {_proxy_url[:30]}...")

# --- Cloudflare Worker proxy support (WORKER_URL env var) ---
# NOTE: Disabled on HF Spaces — .workers.dev DNS not resolvable from HF infra
_worker_url_raw = os.environ.get("WORKER_URL", "").strip()
_worker_url = ""  # Force direct connection on HF Spaces
if _worker_url_raw:
    logger.info(f"WORKER_URL found but disabled for HF Spaces: {_worker_url_raw[:50]}")
else:
    logger.info("No WORKER_URL set, using direct YouTube connection")


class _WorkerProxySession(_requests_mod.Session):
    """Routes requests through a Cloudflare Worker to bypass YouTube IP blocks."""

    def __init__(self, worker_url):
        super().__init__()
        self._worker_url = worker_url.rstrip('/')

    def request(self, method, url, **kwargs):
        """Send the request, wrapping http(s) URLs as ``<worker>/?url=<quoted>``."""
        if url.startswith('http'):
            proxied = f"{self._worker_url}/?url={urllib.parse.quote(url, safe='')}"
            return super().request(method, proxied, **kwargs)
        return super().request(method, url, **kwargs)


# --- API instances: plain (no cookies) + with cookies (fallback) ---
if _worker_url:
    _worker_session = _WorkerProxySession(_worker_url)
    _yt_api = YouTubeTranscriptApi(http_client=_worker_session)
    logger.info(f"Using Cloudflare Worker proxy: {_worker_url}")
elif _proxy_config:
    _yt_api = YouTubeTranscriptApi(proxy_config=_proxy_config)
else:
    _yt_api = YouTubeTranscriptApi()

_yt_api_cookies = None
_cookie_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cookies.txt")

try:
    # No cookies.txt next to this file: try to materialize one from the
    # base64-encoded YOUTUBE_COOKIES_BASE64 environment variable.
    if not os.path.exists(_cookie_path):
        import base64
        cookies_b64 = os.environ.get("YOUTUBE_COOKIES_BASE64", "")
        if cookies_b64:
            _tmp_cookie = "/tmp/cookies.txt"
            with open(_tmp_cookie, "wb") as f:
                f.write(base64.b64decode(cookies_b64))
            _cookie_path = _tmp_cookie
            logger.info("Created cookies.txt from YOUTUBE_COOKIES_BASE64 env var")

    if os.path.exists(_cookie_path):
        import http.cookiejar
        import requests
        _cookie_jar = http.cookiejar.MozillaCookieJar(_cookie_path)
        _cookie_jar.load(ignore_discard=True, ignore_expires=True)
        if _worker_url:
            _session = _WorkerProxySession(_worker_url)
        else:
            _session = requests.Session()
        _session.cookies = _cookie_jar
        if _proxy_url and not _worker_url:
            _session.proxies = {"http": _proxy_url, "https": _proxy_url}
        _yt_api_cookies = YouTubeTranscriptApi(http_client=_session)
        logger.info(f"Cookies loaded from {_cookie_path} (used as fallback, worker={'yes' if _worker_url else 'no'})")
    else:
        logger.info("No cookies found, running without cookies")
except Exception as e:
    # Cookies are an optional fallback; failing to load them is non-fatal.
    logger.error(f"Failed to load cookies: {e}")


# Request body for transcript extraction: one or more URLs plus output options.
class TranscriptRequest(BaseModel):
    urls: list[str]
    language: str = "auto"
    denoise: bool = False
    format: str = "text"  # text, json, srt, vtt
    keep_newlines: bool = False
    timestamps: bool = False


# Request body carrying a single playlist URL.
class PlaylistRequest(BaseModel):
    url: str


# Request body for user feedback.
class FeedbackRequest(BaseModel):
    message: str
    type: str = "general"


def extract_video_id(url: str) -> str | None:
    """Return the 11-character YouTube video ID found in *url*, or None.

    Accepts watch, youtu.be, embed and shorts URLs as well as a bare ID.
    Known tracking parameters (si, feature, utm_*, fbclid, gclid) are
    removed before matching.
    """
    url = url.strip()
    if not url:
        return None
    # Strip tracking parameters but keep the "?" when the removed parameter
    # was the first one. Otherwise "watch?feature=share&v=ID" would collapse
    # to "watch&v=ID" and stop matching the watch pattern below.
    url = re.sub(
        r'([&?])(?:si|feature|utm_\w+|fbclid|gclid)=[^&]*',
        lambda m: "?" if m.group(1) == "?" else "",
        url,
    )
    # Tidy what the removal leaves behind: "?&v=" -> "?v=", trailing "?"/"&".
    url = url.replace("?&", "?").rstrip("?&")
    patterns = [
        r"(?:(?:m\.)?youtube\.com/watch\?.*v=)([a-zA-Z0-9_-]{11})",
        r"(?:youtu\.be/)([a-zA-Z0-9_-]{11})",
        r"(?:youtube\.com/embed/)([a-zA-Z0-9_-]{11})",
        r"(?:youtube\.com/shorts/)([a-zA-Z0-9_-]{11})",
        r"^([a-zA-Z0-9_-]{11})$",
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None


def detect_platform(url: str) -> tuple[str, str | None]:
    """Returns (platform, content_id) tuple.

    platform is "instagram", "youtube" or "unknown"; content_id is the
    Instagram shortcode or YouTube video ID (None when unknown).
    """
    url = url.strip()
    if not url:
        return ("unknown", None)
    ig_match = re.search(
        r"(?:instagram\.com|instagr\.am)/(?:reel|reels|p|tv)/([A-Za-z0-9_-]+)",
        url
    )
    if ig_match:
        return ("instagram", ig_match.group(1))
    yt_id = extract_video_id(url)
    if yt_id:
        return ("youtube", yt_id)
    return ("unknown", None)


def _fetch_title(video_id: str) -> str | None:
    """Fetch the video title from YouTube's oEmbed endpoint; None on any failure."""
    try:
        oembed_url = f"https://www.youtube.com/oembed?url=https://www.youtube.com/watch?v={video_id}&format=json"
        req = urllib.request.Request(oembed_url)
        with urllib.request.urlopen(req, timeout=5) as response:
            data = json.loads(response.read().decode("utf-8"))
            return data.get("title")
    except Exception:
        # The title is optional metadata; callers get None on any error.
        return None


# Standalone filler utterances dropped by denoising (matched as whole lines).
KOREAN_FILLERS = {
    "어", "음", "그", "아", "네", "예", "에", "으", "흠",
    "어어", "음음", "아아", "네네", "예예",
}

# A line that is entirely one bracketed annotation, e.g. "[Music]".
NOISE_PATTERN = re.compile(r"^\[.*\]$")


def denoise_text(text: str) -> str:
    """Drop blank, filler, bracket-only and consecutively repeated lines from *text*."""
    lines = text.split("\n")
    result = []
    prev = None
    for line in lines:
        stripped = line.strip()
        if not stripped:
            continue
        if stripped in KOREAN_FILLERS:
            continue
        if NOISE_PATTERN.match(stripped):
            continue
        if stripped == prev:
            continue
        result.append(stripped)
        prev = stripped
    return "\n".join(result)


def _format_ts_short(seconds: float) -> str:
    """Format *seconds* as ``M:SS`` (minutes are not wrapped into hours)."""
    m = int(seconds // 60)
    s = int(seconds % 60)
    return f"{m}:{s:02d}"


def _split_ts(seconds: float) -> tuple[int, int, int, int]:
    """Split *seconds* into (hours, minutes, seconds, milliseconds).

    Rounds to the nearest millisecond first, so float error such as
    2.3 % 1 == 0.29999... cannot turn 300 ms into 299 ms, and a value
    that rounds up to 1000 ms carries into the seconds field.
    """
    total_ms = int(round(seconds * 1000))
    total_s, ms = divmod(total_ms, 1000)
    total_m, s = divmod(total_s, 60)
    h, m = divmod(total_m, 60)
    return h, m, s, ms


def _format_ts_srt(seconds: float) -> str:
    """Format *seconds* as an SRT timestamp ``HH:MM:SS,mmm``."""
    h, m, s, ms = _split_ts(seconds)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"


def _format_ts_vtt(seconds: float) -> str:
    """Format *seconds* as a WebVTT timestamp ``HH:MM:SS.mmm``."""
    h, m, s, ms = _split_ts(seconds)
    return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}"


def _format_srt(entries: list[dict]) -> str:
    """Render entries (dicts with "text", "start", "duration") as SRT cues."""
    lines = []
    for i, e in enumerate(entries, 1):
        start = _format_ts_srt(e["start"])
        end = _format_ts_srt(e["start"] + e["duration"])
        lines.append(str(i))
        lines.append(f"{start} --> {end}")
        lines.append(e["text"])
        lines.append("")
    return "\n".join(lines)


def _format_vtt(entries: list[dict]) -> str:
    """Render entries (dicts with "text", "start", "duration") as a WebVTT document."""
    lines = ["WEBVTT", ""]
    for e in entries:
        start = _format_ts_vtt(e["start"])
        end = _format_ts_vtt(e["start"] + e["duration"])
        lines.append(f"{start} --> {end}")
        lines.append(e["text"])
        lines.append("")
    return "\n".join(lines)


def _format_error(error_msg: str) -> str:
    """Map a raw exception message to a short user-facing error string."""
    if "No transcripts" in error_msg or "Could not retrieve" in error_msg:
        return f"No subtitles found. ({error_msg[:200]})"
    elif "disabled" in error_msg.lower():
        return "Subtitles are disabled for this video."
    elif "unavailable" in error_msg.lower():
        return "Video not found."
    return error_msg


def _fetch_transcript(video_id: str, language: str, denoise: bool, fmt: str, keep_newlines: bool = False, timestamps: bool = False) -> dict:
    """Fetch and format the transcript for *video_id*.

    Strategy, in order:
      1. Up to 4 attempts with the requested languages, trying the plain
         API and then (if configured) the cookie-backed API, with
         exponential backoff (2s, 4s, 8s) between attempts.
      2. One fetch per API without a language filter.
      3. Listing the available transcripts and fetching a preferred
         language match, else the first one listed.

    Returns {"transcript": ..., "error": None} on success, where the
    transcript is a list of entries for fmt == "json" and a string
    otherwise, or {"transcript": None, "error": <message>} on failure.
    """
    if language == "auto":
        languages = ["en", "ko", "ja", "es", "pt"]
    else:
        languages = [language]

    apis_to_try = [("plain", _yt_api)]
    if _yt_api_cookies:
        apis_to_try.append(("cookies", _yt_api_cookies))

    def _process_result(data):
        """Convert fetched snippets into the requested output format."""
        entries = [
            {"text": e.text, "start": e.start, "duration": e.duration}
            for e in data
        ]

        if denoise:
            deduped = []
            prev_text = None
            for entry in entries:
                txt = entry["text"].strip()
                if txt in KOREAN_FILLERS or NOISE_PATTERN.match(txt):
                    continue
                if txt == prev_text:
                    continue
                if txt:
                    entry["text"] = txt
                    deduped.append(entry)
                    prev_text = txt
            entries = deduped

        if fmt == "json":
            return {"transcript": entries, "error": None}
        elif fmt == "srt":
            return {"transcript": _format_srt(entries), "error": None}
        elif fmt == "vtt":
            return {"transcript": _format_vtt(entries), "error": None}
        else:  # text
            if timestamps:
                lines = []
                for e in entries:
                    ts = _format_ts_short(e["start"])
                    lines.append("[" + ts + "] " + e["text"])
                text = "\n".join(lines)
            else:
                separator = "\n" if keep_newlines else " "
                text = separator.join(e["text"] for e in entries)
                if not keep_newlines:
                    # Collapse embedded newlines and whitespace runs.
                    text = " ".join(text.split())
            return {"transcript": text, "error": None}

    max_retries = 4
    for attempt in range(max_retries):
        last_error = None
        for api_name, api in apis_to_try:
            try:
                data = api.fetch(video_id, languages=languages)
                return _process_result(data)
            except Exception as e:
                last_error = str(e)
                logger.warning(f"[{api_name}] attempt {attempt+1} Failed for {video_id}: {last_error[:200]}")
                # Definitive failures: retrying cannot help, so bail out.
                if "No transcripts" in last_error or "disabled" in last_error.lower():
                    return {"transcript": None, "error": _format_error(last_error)}
        if attempt < max_retries - 1:
            delay = 2 ** (attempt + 1)
            logger.info(f"Retrying {video_id} after {delay}s delay (attempt {attempt+1})")
            time.sleep(delay)

    # Fallback 1: let the library pick its default language.
    for api_name, api in apis_to_try:
        try:
            logger.info(f"[{api_name}] Trying without language filter for {video_id}")
            data = api.fetch(video_id)
            return _process_result(data)
        except Exception as e:
            logger.warning(f"[{api_name}] No-lang fallback failed for {video_id}: {str(e)[:200]}")

    # Fallback 2: enumerate transcripts; prefer a requested language,
    # otherwise take the first transcript listed.
    for api_name, api in apis_to_try:
        try:
            logger.info(f"[{api_name}] Listing transcripts for {video_id}")
            transcript_list = api.list(video_id)
            for lang in languages:
                for t in transcript_list:
                    if t.language_code == lang:
                        data = t.fetch()
                        return _process_result(data)
            for t in transcript_list:
                data = t.fetch()
                return _process_result(data)
        except Exception as e:
            logger.warning(f"[{api_name}] List fallback failed for {video_id}: {str(e)[:200]}")

    return {"transcript": None, "error": _format_error(last_error or "Unknown error")}


# ---------------------------------------------------------------------------
# Instagram video URL extraction: 2-tier cascade
#   1. Playwright embed page (cookie-free) + network intercept
#   2. Playwright full page with cookies (fallback for private/restricted)
#
# Optimizations:
#   - Dedicated single-thread executor for Playwright (thread-safety)
#   - Persistent browser instance pre-warmed at startup
#   - Network intercept captures CDN URL before DOM renders (fastest)
#   - ffmpeg extracts audio directly from URL (skip full video download)
# ---------------------------------------------------------------------------

_pw_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix='playwright')
_ig_browser = None
_ig_pw = None


def _pw_init_browser():
    """Initialize persistent browser. Must run inside _pw_executor thread.

    Returns the existing browser if it is still connected; otherwise stops
    any previous Playwright driver and launches a fresh headless Chromium.
    """
    global _ig_browser, _ig_pw
    if _ig_browser and _ig_browser.is_connected():
        return _ig_browser
    if _ig_pw:
        try:
            _ig_pw.stop()
        except Exception:
            # Best-effort cleanup of the stale driver before relaunching.
            pass
    from playwright.sync_api import sync_playwright
    _ig_pw = sync_playwright().start()
    _ig_browser = _ig_pw.chromium.launch(
        headless=True,
        args=['--no-sandbox', '--disable-dev-shm-usage', '--disable-gpu'],
    )
    logger.info("[instagram] Launched persistent Chromium browser")
    return _ig_browser


# Pre-warm browser at import time (fire-and-forget; failure is non-fatal)
try:
    _pw_executor.submit(_pw_init_browser)
except Exception:
    logger.warning("[instagram] Failed to submit browser pre-warm task")


def _pw_extract_embed(shortcode):
    """Run inside _pw_executor thread. Extract video URL from embed page via DOM."""
    browser = _pw_init_browser()
    ctx = browser.new_context(
        user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
        viewport={'width': 1280, 'height': 720},
    )
    page = ctx.new_page()
    page.goto(
        f'https://www.instagram.com/p/{shortcode}/embed/',
        wait_until='domcontentloaded',
        timeout=15000,
    )
    # Wait for