"""Gradio app: Whisper speech-to-text (microphone, file upload, YouTube) with optional Gemini summaries."""

# `spaces` stays the first import, as in the original file.
# NOTE(review): ZeroGPU setups commonly want it imported before torch — confirm before reordering.
import spaces

import html
import os
import re
import tempfile
import time
from collections.abc import Iterable, Iterator
from urllib.parse import parse_qs, urlparse

import torch
import gradio as gr
import yt_dlp as youtube_dl
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
import numpy as np
import google.generativeai as genai
from dotenv import load_dotenv

load_dotenv()

MODEL_NAME = "openai/whisper-large-v3-turbo"
BATCH_SIZE = 8  # currently unused; every pipeline call below passes batch_size=1
FILE_LIMIT_MB = 5000  # 5GB (currently unused in this file)
YT_LENGTH_LIMIT_S = 7200  # 2 hours

# Sequential chunking parameters used by the robust transcription helpers.
_CHUNK_S = 20
_OVERLAP_S = 2
# Smaller window used as a last resort when the normal chunking fails.
_FALLBACK_CHUNK_S = 10
_FALLBACK_OVERLAP_S = 2

device = 0 if torch.cuda.is_available() else "cpu"

pipe = pipeline(
    task="automatic-speech-recognition",
    model=MODEL_NAME,
    device=device,
    ignore_warning=True,
    model_kwargs={"torch_dtype": torch.float16} if torch.cuda.is_available() else {},
    chunk_length_s=20,  # small chunks to fit ZeroGPU
)

_WHITESPACE_RE = re.compile(r"\s+")
_SENTENCE_BOUNDARY_RE = re.compile(r"(?<=[\.!?])\s+")
_SUMMARY_BOILERPLATE_RE = re.compile(
    r"^(here\s*(?:is|'s)\s+(?:a|the)\s+summary.*|summary\s*:|tl;dr\s*:|overall\s*summary\s*:|in\s+summary\s*:|to\s+summarize\s*:)$",
    re.IGNORECASE,
)


def _concat_text(chunks: Iterable[str]) -> str:
    """Join the non-blank chunks with single spaces, stripping each one."""
    return " ".join(c.strip() for c in chunks if c and c.strip())


def _format_transcript(text: str, target_chars: int = 280) -> str:
    """Format a raw transcript into readable paragraphs.

    - Collapses all whitespace runs to single spaces.
    - Splits into sentences on whitespace that follows ``.``, ``!`` or ``?``.
    - Groups sentences into paragraphs of roughly ``target_chars`` characters,
      separated by blank lines.

    Falsy input (``""`` or ``None``) is returned unchanged.
    """
    if not text:
        return text
    normalized = _WHITESPACE_RE.sub(" ", text).strip()
    paragraphs: list[str] = []
    current: list[str] = []
    current_len = 0
    for sentence in _SENTENCE_BOUNDARY_RE.split(normalized):
        if not sentence:
            continue
        current.append(sentence)
        current_len += len(sentence) + 1  # +1 for the joining space
        if current_len >= target_chars:
            paragraphs.append(" ".join(current))
            current, current_len = [], 0
    if current:
        paragraphs.append(" ".join(current))
    return "\n\n".join(paragraphs)


def _clean_summary(text: str) -> str:
    """Strip leading boilerplate lines ("Here's a summary...", "Summary:", "TL;DR:") from a summary."""
    if not text:
        return text
    lines = text.strip().splitlines()
    # Drop every leading boilerplate line, then any blank lines that follow.
    while lines and _SUMMARY_BOILERPLATE_RE.match(lines[0].strip()):
        lines.pop(0)
    while lines and not lines[0].strip():
        lines.pop(0)
    return "\n".join(lines).strip()


def _write_temp_txt(text: str) -> str:
    """Write ``text`` to a new UTF-8 ``.txt`` temp file and return its path.

    The file is created with ``delete=False`` so it outlives this call; the
    caller (or whoever consumes the path) is responsible for removing it.
    UTF-8 is forced so non-ASCII transcripts do not depend on the locale.
    """
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as handle:
        handle.write(text)
    return handle.name


def _transcribe_chunk(chunk: np.ndarray, sr: int, task: str, max_retries: int = 3) -> str:
    """Transcribe a single chunk, retrying with a growing delay on failure.

    Raises:
        ValueError: if ``max_retries`` is less than 1.
        Exception: whatever the pipeline raised on the final attempt.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    delay = 2.0
    for attempt in range(max_retries):
        try:
            out = pipe({"array": chunk, "sampling_rate": sr}, batch_size=1, generate_kwargs={"task": task})
            return out["text"]
        except Exception:
            if attempt == max_retries - 1:
                raise
            time.sleep(delay)
            delay *= 1.8
    raise AssertionError("unreachable")  # the loop always returns or raises


def _iter_windows(
    total: int, sr: int, chunk_s: float = _CHUNK_S, overlap_s: float = _OVERLAP_S
) -> Iterator[tuple[int, int]]:
    """Yield ``(start, end)`` sample indices of overlapping windows covering ``total`` samples.

    Windows are ``chunk_s`` seconds long and advance by ``chunk_s - overlap_s``
    seconds. Nothing is yielded when ``total`` is 0.
    """
    win = int(chunk_s * sr)
    step = int((chunk_s - overlap_s) * sr)
    start = 0
    while start < total:
        end = min(start + win, total)
        yield start, end
        if end == total:
            break
        start += step


def _transcribe_windows(
    audio_array: np.ndarray,
    sr: int,
    task: str,
    chunk_s: float = _CHUNK_S,
    overlap_s: float = _OVERLAP_S,
    max_retries: int = 3,
) -> Iterator[str]:
    """Yield the raw text of each window of ``audio_array``, transcribed sequentially.

    Multi-dimensional input is averaged over axis 1 first.
    NOTE(review): this assumes a (samples, channels) layout — confirm against callers.
    NOTE(review): adjacent windows overlap and their texts are simply joined,
    so words inside the overlap may appear twice in the result.
    """
    if audio_array.ndim > 1:
        audio_array = np.mean(audio_array, axis=1)
    for start, end in _iter_windows(len(audio_array), sr, chunk_s, overlap_s):
        yield _transcribe_chunk(audio_array[start:end], sr, task, max_retries=max_retries)


def _robust_transcribe_array(audio_array: np.ndarray, sr: int, task: str) -> str:
    """Transcribe long/large audio by chunking sequentially to minimize GPU memory.

    Uses conservative chunking (20s) with 2s overlap, batch_size=1.
    Returns the formatted transcript; empty audio yields an empty string.
    """
    return _format_transcript(_concat_text(_transcribe_windows(audio_array, sr, task)))


def _robust_transcribe_array_stream(audio_array: np.ndarray, sr: int, task: str):
    """Generator: yields the cumulative formatted transcription after each chunk."""
    texts: list[str] = []
    for txt in _transcribe_windows(audio_array, sr, task):
        texts.append(txt)
        yield _format_transcript(_concat_text(texts))
    if not texts:
        # Empty audio: still emit one (empty) result so consumers get a value.
        yield ""


def _robust_transcribe_path(path: str, task: str) -> str:
    """Decode the audio file at ``path`` and transcribe it.

    If the normal 20s chunking fails, retries once with 10s windows before
    giving up.

    Raises:
        gr.Error: if the fallback pass fails as well.
    """
    sr = pipe.feature_extractor.sampling_rate
    # ffmpeg_read expects raw bytes, not a file path
    with open(path, "rb") as audio_file:
        payload = audio_file.read()
    audio = ffmpeg_read(payload, sr)
    try:
        return _robust_transcribe_array(audio, sr, task)
    except Exception:
        # last-chance: shrink chunk and retry small windows (single attempt each)
        try:
            texts = _transcribe_windows(
                audio,
                sr,
                task,
                chunk_s=_FALLBACK_CHUNK_S,
                overlap_s=_FALLBACK_OVERLAP_S,
                max_retries=1,
            )
            # Format like the main path so callers get consistent output.
            return _format_transcript(_concat_text(texts))
        except Exception as fallback_err:
            raise gr.Error(f"Transcription failed after retries: {fallback_err}") from fallback_err


def _summarize_or_report(text: str) -> str:
    """Return the Gemini summary of ``text``, or a "Summary error: ..." string on failure."""
    try:
        return summarize_with_gemini(text)
    except Exception as e:
        return f"Summary error: {e}"


@spaces.GPU(duration=120)
def transcribe(inputs, task, summarize=False):
    """Transcribe an uploaded or recorded audio input.

    Args:
        inputs: a file path, a dict with an "array" key (and optional
            "sampling_rate"), or anything else the pipeline accepts directly.
        task: "transcribe" or "translate", passed to the model's generate kwargs.
        summarize: when true, also produce a Gemini summary.

    Returns:
        ``(text, summary, transcript_file_path, summary_file_path_or_None)``.

    Raises:
        gr.Error: if no input was given or transcription fails.
    """
    if inputs is None:
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")
    try:
        if isinstance(inputs, str):
            text = _robust_transcribe_path(inputs, task)
        elif isinstance(inputs, dict) and "array" in inputs:
            text = _robust_transcribe_array(
                inputs["array"],
                inputs.get("sampling_rate", pipe.feature_extractor.sampling_rate),
                task,
            )
        else:
            text = pipe(inputs, batch_size=1, generate_kwargs={"task": task})["text"]
            text = _format_transcript(text)
    except gr.Error:
        # Already user-facing; do not wrap the message a second time.
        raise
    except Exception as e:
        raise gr.Error(f"Transcription failed: {e}") from e

    summary = _summarize_or_report(text) if summarize else ""
    transcript_path = _write_temp_txt(text)
    summary_path = _write_temp_txt(summary) if summary else None
    return text, summary, transcript_path, summary_path


def _extract_video_id(yt_url: str) -> str:
    """Best-effort extraction of the video id from common YouTube URL shapes."""
    parsed = urlparse(yt_url.strip())
    from_query = parse_qs(parsed.query).get("v")
    if from_query:
        return from_query[0]
    segments = [seg for seg in parsed.path.split("/") if seg]
    if parsed.netloc.lower().endswith("youtu.be") and segments:
        return segments[0]
    if len(segments) >= 2 and segments[0] in {"embed", "shorts", "live", "v"}:
        return segments[1]
    # Fall back to the original heuristic for anything unrecognised.
    return yt_url.split("?v=")[-1]


def _return_yt_html_embed(yt_url):
    """Return an HTML snippet embedding the YouTube video referenced by ``yt_url``."""
    # The id comes from user input, so escape it before interpolating into HTML.
    video_id = html.escape(_extract_video_id(yt_url), quote=True)
    # NOTE(review): the markup inside these literals was lost in the reviewed copy
    # of the file (only newlines remained and video_id was unused); a standard
    # iframe embed is restored here — confirm size/markup against the deployed app.
    HTML_str = (
        f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe>'
        " </center>"
    )
    return HTML_str


def _video_length_seconds(info: dict) -> int:
    """Return the video length in seconds from a yt-dlp info dict.

    Prefers the numeric "duration" field and falls back to parsing
    "duration_string" ("SS", "MM:SS" or "HH:MM:SS").

    Raises:
        gr.Error: if neither field is available.
    """
    duration = info.get("duration")
    if duration is not None:
        return int(duration)
    duration_string = info.get("duration_string")
    if not duration_string:
        raise gr.Error("Could not determine the length of this YouTube video.")
    seconds = 0
    for part in duration_string.split(":"):
        seconds = seconds * 60 + int(part)
    return seconds


def download_yt_audio(yt_url, filename, cookies_txt: str | None = None):
    """Download the best audio stream of ``yt_url`` to ``filename``.

    Args:
        yt_url: URL of the video.
        filename: output path handed to yt-dlp as ``outtmpl``.
        cookies_txt: optional cookie-file contents; when given they are used for
            both the metadata lookup and the download, then deleted.

    Raises:
        gr.Error: on extraction/download failure or when the video is longer
            than ``YT_LENGTH_LIMIT_S``.
    """
    cookie_path = None
    if cookies_txt and cookies_txt.strip():
        cookie_path = _write_temp_txt(cookies_txt)
    try:
        # Apply cookies to the metadata request too; otherwise sign-in-gated
        # videos fail here before the cookies are ever used.
        info_opts = {"noplaylist": True}
        if cookie_path:
            info_opts["cookiefile"] = cookie_path
        try:
            with youtube_dl.YoutubeDL(info_opts) as info_loader:
                info = info_loader.extract_info(yt_url, download=False)
        except youtube_dl.utils.DownloadError as err:
            raise gr.Error(str(err)) from err

        file_length_s = _video_length_seconds(info)
        if file_length_s > YT_LENGTH_LIMIT_S:
            yt_length_limit_hms = time.strftime("%HH:%MM:%SS", time.gmtime(YT_LENGTH_LIMIT_S))
            file_length_hms = time.strftime("%HH:%MM:%SS", time.gmtime(file_length_s))
            raise gr.Error(f"Maximum YouTube length is {yt_length_limit_hms}, got {file_length_hms} YouTube video.")

        ydl_opts = {
            "outtmpl": filename,
            "format": "bestaudio/best",
            "quiet": True,
            "noplaylist": True,
            "retries": 3,
        }
        if cookie_path:
            ydl_opts["cookiefile"] = cookie_path
        with youtube_dl.YoutubeDL(ydl_opts) as ydl:
            try:
                ydl.download([yt_url])
            except (youtube_dl.utils.DownloadError, youtube_dl.utils.ExtractorError) as err:
                raise gr.Error(str(err)) from err
    finally:
        if cookie_path and os.path.exists(cookie_path):
            os.unlink(cookie_path)


@spaces.GPU(duration=120)
def yt_transcribe(yt_url, task, summarize=False, cookies_txt=None, max_filesize=75.0):
    """Download a YouTube video's audio and transcribe it.

    Args:
        yt_url: URL of the video.
        task: "transcribe" or "translate".
        summarize: when true, also produce a Gemini summary.
        cookies_txt: optional cookie-file contents for gated videos.
        max_filesize: accepted for backward compatibility; not used.

    Returns:
        ``(html_embed, text, summary, transcript_file_path, summary_file_path_or_None)``.

    Raises:
        gr.Error: if the URL is empty, or the download or transcription fails.
    """
    if not yt_url or not yt_url.strip():
        raise gr.Error("No YouTube URL submitted! Please paste a URL before submitting your request.")
    html_embed_str = _return_yt_html_embed(yt_url)

    with tempfile.TemporaryDirectory() as tmpdirname:
        filepath = os.path.join(tmpdirname, "video.mp4")
        try:
            download_yt_audio(yt_url, filepath, cookies_txt=cookies_txt)
        except gr.Error as e:
            raise gr.Error(
                str(e)
                + "\n\nTip: Provide exported YouTube cookies (Netscape format) in the optional cookies box if the video requires sign-in or captcha."
            ) from e
        with open(filepath, "rb") as f:
            payload = f.read()

    sampling_rate = pipe.feature_extractor.sampling_rate
    try:
        audio = ffmpeg_read(payload, sampling_rate)
        text = _robust_transcribe_array(audio, sampling_rate, task)
    except Exception as e:
        raise gr.Error(f"Transcription failed: {e}") from e

    summary = _summarize_or_report(text) if summarize else ""

    # Create download files
    transcript_path = _write_temp_txt(_format_transcript(text))
    summary_path = _write_temp_txt(summary) if summary else None
    return html_embed_str, text, summary, transcript_path, summary_path


def _task_controls():
    """Build fresh task/summarize input components (components must not be shared across interfaces)."""
    return [
        gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
        gr.Checkbox(label="Summarize with Gemini", value=False),
    ]


def _result_outputs():
    """Build fresh transcription/summary output components."""
    return [
        gr.Textbox(label="Transcription"),
        gr.Textbox(label="Summary"),
        gr.File(label="Download transcription (transcribe.txt)"),
        gr.File(label="Download summary (summarise.txt)"),
    ]


demo = gr.Blocks()

mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[gr.Audio(sources="microphone", type="filepath"), *_task_controls()],
    outputs=_result_outputs(),
    title="Whisper Large V3: Microphone",
    description=(
        "Transcribe long-form microphone or audio inputs."
    ),
    flagging_mode="never",
)

file_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[gr.Audio(sources="upload", type="filepath", label="Audio file"), *_task_controls()],
    outputs=_result_outputs(),
    title="Whisper Large V3: Audio file",
    description=(
        "Transcribe long-form microphone or audio inputs."
    ),
    flagging_mode="never",
)

# NOTE: this rebinds the module-level name `yt_transcribe` from the function to
# the Interface wrapping it; kept as in the original for compatibility.
yt_transcribe = gr.Interface(
    fn=yt_transcribe,
    inputs=[
        gr.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"),
        *_task_controls(),
        gr.Textbox(
            lines=4,
            placeholder="Optional: paste exported YouTube cookies in Netscape format here if the video requires sign-in.",
            label="YouTube cookies (optional)",
        ),
    ],
    outputs=["html", *_result_outputs()],
    title="Whisper Large V3: Transcribe YouTube",
    description=(
        "Transcribe long-form YouTube videos."
    ),
    flagging_mode="never",
)

with demo:
    gr.TabbedInterface([mf_transcribe, file_transcribe, yt_transcribe], ["Microphone", "Audio file", "YouTube"])


# ---------------- Gemini setup (flash-lite only) -----------------
GEMINI_API_KEYS = [
    os.getenv("GEMINI_API_1"),
    os.getenv("GEMINI_API_2"),
    os.getenv("GEMINI_API_3"),
    os.getenv("GEMINI_API_4"),
    os.getenv("GEMINI_API_5"),
]
GEMINI_API_KEYS = [k for k in GEMINI_API_KEYS if k]
_gem_idx = 0

_SUMMARY_PROMPT = (
    "You are an expert content summarizer. Preserve key information and decisions, "
    "remove filler and smalltalk. Produce a clear, well-structured summary.\n\n"
)
_TIGHTEN_PROMPT = "Tighten the following combined summaries without losing key points:\n\n"
_SUMMARY_SPLIT_RE = re.compile(r"(\n\n+|\.\s+)")


def _gem_model():
    """Configure the next API key (round-robin) and return a Gemini model, or None without keys.

    NOTE(review): ``genai.configure`` sets process-wide state, so concurrent
    requests may interleave keys — confirm whether that matters here.
    """
    global _gem_idx
    if not GEMINI_API_KEYS:
        return None
    api_key = GEMINI_API_KEYS[_gem_idx]
    _gem_idx = (_gem_idx + 1) % len(GEMINI_API_KEYS)
    genai.configure(api_key=api_key)
    try:
        return genai.GenerativeModel("gemini-2.5-flash-lite")
    except Exception:
        return genai.GenerativeModel("gemini-2.5-flash")


def _estimate_tokens(text: str) -> int:
    """Cheap local token estimate: about four characters per token, at least 1."""
    return max(1, len(text) // 4)


def _count_tokens_safe(text: str) -> int:
    """Count tokens via ``genai.count_tokens`` when possible, else fall back to a local estimate."""
    try:
        return genai.count_tokens(text).total_tokens  # type: ignore[attr-defined]
    except Exception:
        return _estimate_tokens(text)


def _gem_generate(prompt: str) -> str:
    """Run ``prompt`` through Gemini and return the cleaned response text.

    Each configured API key is tried once, in rotation, so one failing key
    does not fail the request. Returns "" when no key is configured;
    re-raises the last error when every key fails.
    """
    last_error = None
    for _ in range(len(GEMINI_API_KEYS)):
        model = _gem_model()
        if model is None:
            break
        try:
            response = model.generate_content(prompt)
            return _clean_summary(getattr(response, "text", "") or "")
        except Exception as err:
            last_error = err
    if last_error is not None:
        raise last_error
    return ""


def _split_for_summary(text: str, max_chunk_tokens: int) -> list[str]:
    """Split ``text`` at paragraph/sentence boundaries into chunks of about ``max_chunk_tokens``.

    Delimiters are kept, so joining the chunks reproduces ``text``. A single
    segment larger than the budget becomes its own oversized chunk.
    """
    chunks: list[str] = []
    current: list[str] = []
    current_tokens = 0
    for segment in _SUMMARY_SPLIT_RE.split(text):
        # Use the local estimate: splitting can produce thousands of segments,
        # and anything slower per segment would dominate the run time.
        seg_tokens = _estimate_tokens(segment)
        if current and current_tokens + seg_tokens > max_chunk_tokens:
            chunks.append("".join(current))
            current, current_tokens = [segment], seg_tokens
        else:
            current.append(segment)
            current_tokens += seg_tokens
    if current:
        chunks.append("".join(current))
    return chunks


def summarize_with_gemini(text: str) -> str:
    """Summarize ``text`` with Gemini.

    Short texts are summarized in one request. Longer ones are split into
    chunks, summarized per chunk, and the joined result is tightened with one
    more request if it is still over the token budget.

    Returns "" for blank input or when no API key is configured. Exceptions
    from the Gemini client propagate to the caller.
    """
    if not text or not text.strip():
        return ""
    if not GEMINI_API_KEYS:
        return ""

    max_chunk_tokens = 6000
    if _count_tokens_safe(text) <= max_chunk_tokens:
        return _gem_generate(_SUMMARY_PROMPT + text)

    partials = [_gem_generate(_SUMMARY_PROMPT + chunk) for chunk in _split_for_summary(text, max_chunk_tokens)]
    combined = "\n\n".join(p for p in partials if p)
    if _count_tokens_safe(combined) > max_chunk_tokens:
        # Keep the untightened version if the model returns nothing usable.
        combined = _gem_generate(_TIGHTEN_PROMPT + combined) or combined
    return combined


demo.queue().launch()