Spaces:
Sleeping
Sleeping
| import spaces | |
| import torch | |
| import gradio as gr | |
| import yt_dlp as youtube_dl | |
| from transformers import pipeline | |
| from transformers.pipelines.audio_utils import ffmpeg_read | |
| import tempfile | |
| import os | |
| import time | |
| import numpy as np | |
| import google.generativeai as genai | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
# --- Model / runtime configuration -------------------------------------
MODEL_NAME = "openai/whisper-large-v3-turbo"  # checkpoint handed to `pipeline` below
BATCH_SIZE = 8
FILE_LIMIT_MB = 5000  # 5GB
YT_LENGTH_LIMIT_S = 7200  # 2 hours

# Pick CUDA device 0 when torch reports a GPU, otherwise run on the CPU.
_cuda_available = torch.cuda.is_available()
device = 0 if _cuda_available else "cpu"

# Half precision is only requested when running on a GPU.
_pipe_model_kwargs = {"torch_dtype": torch.float16} if _cuda_available else {}

# NOTE(review): `spaces` is imported at the top of the file, but no
# `@spaces.GPU` decorator is visible in this file — confirm whether the
# ZeroGPU runtime needs one on the transcription entry points.
pipe = pipeline(
    task="automatic-speech-recognition",
    model=MODEL_NAME,
    device=device,
    ignore_warning=True,
    model_kwargs=_pipe_model_kwargs,
    chunk_length_s=20,  # small chunks to fit ZeroGPU
)
def _concat_text(chunks):
    """Join the non-blank entries of *chunks* with single spaces.

    Falsy entries (``None``, ``""``) and whitespace-only strings are dropped;
    every remaining entry is stripped of surrounding whitespace first.
    """
    cleaned = []
    for piece in chunks:
        if not piece:
            continue
        trimmed = piece.strip()
        if trimmed:
            cleaned.append(trimmed)
    return " ".join(cleaned)
def _format_transcript(text: str, target_chars: int = 280) -> str:
    """Reflow a raw transcript into blank-line-separated paragraphs.

    Steps:
    - Collapse every whitespace run to one space and trim the ends.
    - Split after ``.``, ``!`` or ``?`` when followed by whitespace
      (the punctuation stays attached to its sentence).
    - Accumulate sentences until the running length (each sentence counted
      as ``len + 1``) reaches ``target_chars``, then start a new paragraph.

    Falsy input (``""`` or ``None``) is returned unchanged.
    """
    import re

    if not text:
        return text

    collapsed = re.sub(r"\s+", " ", text).strip()
    sentences = re.split(r"(?<=[\.!?])\s+", collapsed)

    paragraphs = []
    pending = []
    pending_len = 0
    for sentence in sentences:
        if not sentence:
            continue
        pending.append(sentence)
        pending_len += len(sentence) + 1
        if pending_len < target_chars:
            continue
        # Paragraph budget reached: flush and start over.
        paragraphs.append(" ".join(pending))
        pending, pending_len = [], 0

    if pending:
        paragraphs.append(" ".join(pending))
    return "\n\n".join(paragraphs)
def _clean_summary(text: str) -> str:
    """Strip leading boilerplate lines from a model-generated summary.

    A leading line is dropped when, taken on its own, it is a header such as
    "Here's a summary ...", "Summary:", "TL;DR:", "Overall summary:",
    "In summary:" or "To summarize:" (case-insensitive). Blank lines directly
    after a dropped header are dropped as well. Falsy input is returned as-is.
    """
    import re

    if not text:
        return text

    boilerplate = re.compile(
        r"^(here\s*(?:is|'s)\s+(?:a|the)\s+summary.*|summary\s*:|tl;dr\s*:|overall\s*summary\s*:|in\s+summary\s*:|to\s+summarize\s*:)$",
        re.IGNORECASE,
    )
    rows = text.strip().splitlines()
    total = len(rows)

    # Advance a cursor past header lines instead of popping from the list.
    cursor = 0
    while cursor < total and boilerplate.match(rows[cursor].strip()):
        cursor += 1
        while cursor < total and not rows[cursor].strip():
            cursor += 1

    return "\n".join(rows[cursor:]).strip()
def _transcribe_chunk(chunk: np.ndarray, sr: int, task: str, max_retries: int = 3) -> str:
    """Transcribe a single audio chunk, retrying with a growing delay on failure.

    Args:
        chunk: Audio samples passed to the pipeline as ``"array"``.
        sr: Sampling rate passed to the pipeline as ``"sampling_rate"``.
        task: Value forwarded as ``generate_kwargs={"task": task}``.
        max_retries: Maximum number of attempts. Values below 1 are treated
            as 1 so the function always tries once and never falls through
            to an implicit ``None`` return.

    Returns:
        The ``"text"`` field of the pipeline output.

    Raises:
        Exception: Whatever the final attempt raised, re-raised unchanged.
    """
    attempts = max(1, max_retries)
    delay = 2.0  # seconds before the first retry; multiplied by 1.8 each time
    for attempt in range(1, attempts + 1):
        try:
            out = pipe({"array": chunk, "sampling_rate": sr}, batch_size=1, generate_kwargs={"task": task})
            return out["text"]
        except Exception:
            if attempt == attempts:
                # Out of attempts: surface the real error to the caller.
                raise
            time.sleep(delay)
            delay *= 1.8
def _robust_transcribe_array(audio_array: np.ndarray, sr: int, task: str) -> str:
    """Transcribe an in-memory waveform one window at a time.

    The audio is cut into 20 s windows that advance by 18 s (2 s overlap) and
    each window is transcribed sequentially through ``_transcribe_chunk``.
    The per-window texts are joined with ``_concat_text`` and reflowed with
    ``_format_transcript``. Audio no longer than one window is transcribed in
    a single call.

    Arrays with more than one dimension are averaged along axis 1 first
    (presumably (samples, channels) input — confirm against callers).
    """
    samples = audio_array
    if samples.ndim > 1:
        samples = np.mean(samples, axis=1)

    window_s, overlap_s = 20, 2
    hop = int((window_s - overlap_s) * sr)
    window = int(window_s * sr)
    total = len(samples)

    # Short clip: one pass, no windowing.
    if total <= window:
        return _format_transcript(_transcribe_chunk(samples, sr, task))

    pieces = []
    offset = 0
    while offset < total:
        stop = min(offset + window, total)
        pieces.append(_transcribe_chunk(samples[offset:stop], sr, task))
        if stop == total:
            break
        offset += hop
    return _format_transcript(_concat_text(pieces))
def _robust_transcribe_array_stream(audio_array: np.ndarray, sr: int, task: str):
    """Generator variant of the windowed transcription.

    Uses the same 20 s window / 18 s hop as ``_robust_transcribe_array`` but
    yields the formatted transcript accumulated so far after every window,
    so a caller can display progress. Audio that fits in one window produces
    exactly one yield.

    Arrays with more than one dimension are averaged along axis 1 first.
    """
    samples = audio_array
    if samples.ndim > 1:
        samples = np.mean(samples, axis=1)

    window_s, overlap_s = 20, 2
    hop = int((window_s - overlap_s) * sr)
    window = int(window_s * sr)
    total = len(samples)
    pieces = []

    if total <= window:
        pieces.append(_transcribe_chunk(samples, sr, task))
        yield _format_transcript(_concat_text(pieces))
        return

    offset = 0
    while offset < total:
        stop = min(offset + window, total)
        pieces.append(_transcribe_chunk(samples[offset:stop], sr, task))
        # Emit the cumulative text after each window.
        yield _format_transcript(_concat_text(pieces))
        if stop == total:
            break
        offset += hop
def _robust_transcribe_path(path: str, task: str) -> str:
    """Decode an audio file from disk and transcribe it, with a smaller-window fallback.

    The file is read as bytes and decoded with ``ffmpeg_read`` at the
    pipeline's sampling rate, then handed to ``_robust_transcribe_array``.
    If that raises, the audio is re-transcribed with 10 s windows advancing
    by 8 s, calling the pipeline directly.

    Both paths return text passed through ``_format_transcript`` so callers
    get the same paragraph layout regardless of which path succeeded.

    Args:
        path: Path of the audio file to read.
        task: Value forwarded as ``generate_kwargs={"task": task}``.

    Returns:
        The formatted transcript.

    Raises:
        gr.Error: When the fallback pass also fails; the fallback exception
            is attached as the cause (and carries the first failure as its
            context).
    """
    sr = pipe.feature_extractor.sampling_rate
    # ffmpeg_read expects raw bytes, not a file path
    with open(path, "rb") as audio_file:
        payload = audio_file.read()
    audio = ffmpeg_read(payload, sr)

    try:
        return _robust_transcribe_array(audio, sr, task)
    except Exception:
        # last-chance: shrink chunk and retry small windows
        try:
            win = int(10 * sr)
            step = int(8 * sr)
            texts = []
            pos = 0
            while pos < len(audio):
                sub = audio[pos:pos + win]
                out = pipe({"array": sub, "sampling_rate": sr}, batch_size=1, generate_kwargs={"task": task})
                texts.append(out["text"])
                if pos + win >= len(audio):
                    break
                pos += step
            return _format_transcript(_concat_text(texts))
        except Exception as retry_err:
            raise gr.Error(f"Transcription failed after retries: {retry_err}") from retry_err
def transcribe(inputs, task, summarize=False):
    """Transcribe submitted audio and optionally summarize the result.

    Args:
        inputs: A file path (``str``), a dict containing an ``"array"`` key
            (with optional ``"sampling_rate"``), or any other value the
            pipeline accepts directly.
        task: Value forwarded as ``generate_kwargs={"task": task}``.
        summarize: When true, ``summarize_with_gemini`` is called on the
            transcript; a failure there is reported in the summary text
            instead of aborting the request.

    Returns:
        ``(text, summary, transcript_path, summary_path)``. The paths point
        to UTF-8 temp files that are not deleted here; ``summary_path`` is
        ``None`` when the summary is empty.

    Raises:
        gr.Error: When no input was submitted or transcription fails.
    """
    if inputs is None:
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")

    try:
        if isinstance(inputs, str):
            text = _robust_transcribe_path(inputs, task)
        elif isinstance(inputs, dict) and "array" in inputs:
            text = _robust_transcribe_array(inputs["array"], inputs.get("sampling_rate", pipe.feature_extractor.sampling_rate), task)
        else:
            text = pipe(inputs, batch_size=1, generate_kwargs={"task": task})["text"]
        text = _format_transcript(text)
    except gr.Error:
        # Already a user-facing error; do not wrap it in a second message.
        raise
    except Exception as e:
        raise gr.Error(f"Transcription failed: {e}") from e

    summary = ""
    if summarize:
        try:
            summary = summarize_with_gemini(text)
        except Exception as e:
            # Best effort: keep the transcript and report the failure inline.
            summary = f"Summary error: {e}"

    # Explicit UTF-8 so non-ASCII transcripts do not depend on the locale encoding.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as tf:
        tf.write(text)

    sf_path = None
    if summary:
        with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as sf:
            sf.write(summary)
        sf_path = sf.name

    return text, summary, tf.name, sf_path
def _return_yt_html_embed(yt_url):
    """Build the HTML snippet that embeds a YouTube video in an iframe.

    The video id is taken, in order of preference, from the ``v`` query
    parameter, from the path of a ``youtu.be`` link, or from the second path
    segment of ``/embed/``, ``/shorts/``, ``/live/`` or ``/v/`` URLs. If none
    of these apply, the text after the last ``?v=`` is used, which was the
    only rule before. The id is HTML-escaped before being placed in the
    ``src`` attribute because it comes from user input.

    Args:
        yt_url: The URL entered by the user; ``None`` is treated as ``""``.

    Returns:
        A ``<center><iframe ...></iframe></center>`` HTML string.
    """
    from html import escape
    from urllib.parse import parse_qs, urlparse

    url = (yt_url or "").strip()
    parsed = urlparse(url)
    host = (parsed.hostname or "").lower()
    path_parts = [part for part in parsed.path.split("/") if part]
    query_ids = parse_qs(parsed.query).get("v")

    if query_ids:
        # Standard watch URL; extra parameters such as &t= are ignored.
        video_id = query_ids[0]
    elif (host == "youtu.be" or host.endswith(".youtu.be")) and path_parts:
        video_id = path_parts[0]
    elif len(path_parts) >= 2 and path_parts[0] in ("embed", "shorts", "live", "v"):
        video_id = path_parts[1]
    else:
        # Legacy behaviour for anything unrecognised.
        video_id = url.split("?v=")[-1]

    safe_id = escape(video_id, quote=True)
    HTML_str = (
        f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{safe_id}"> </iframe>'
        " </center>"
    )
    return HTML_str
def download_yt_audio(yt_url, filename, cookies_txt: str | None = None):
    """Download the best available audio of a YouTube video to *filename*.

    The video metadata is fetched first so that videos longer than
    ``YT_LENGTH_LIMIT_S`` are rejected before anything is downloaded. When
    *cookies_txt* is provided it is written to a temporary file and used for
    BOTH the metadata lookup and the download, so videos that require
    sign-in do not fail at the lookup step. The temporary cookie file is
    always removed before returning.

    Args:
        yt_url: URL of the video.
        filename: Output path, passed to yt-dlp as ``outtmpl``.
        cookies_txt: Optional cookie file contents; blank values are ignored.

    Raises:
        gr.Error: If metadata extraction or the download fails, if the video
            length cannot be determined, or if it exceeds the limit.
    """
    cookie_path = None
    try:
        if cookies_txt and cookies_txt.strip():
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".txt", delete=False, encoding="utf-8"
            ) as cookie_file:
                # Record the path first so the finally-clause can clean up
                # even if the write itself fails.
                cookie_path = cookie_file.name
                cookie_file.write(cookies_txt)

        probe_opts = {"quiet": True, "noplaylist": True}
        if cookie_path:
            probe_opts["cookiefile"] = cookie_path
        try:
            with youtube_dl.YoutubeDL(probe_opts) as info_loader:
                info = info_loader.extract_info(yt_url, download=False)
        except youtube_dl.utils.DownloadError as err:
            raise gr.Error(str(err)) from err

        # Prefer the numeric duration; fall back to parsing "H:MM:SS"-style text.
        duration = info.get("duration")
        if duration is not None:
            file_length_s = int(duration)
        else:
            duration_string = info.get("duration_string")
            if not duration_string:
                raise gr.Error("Could not determine the length of this YouTube video.")
            file_length_s = 0
            for field in str(duration_string).split(":"):
                file_length_s = file_length_s * 60 + int(field)

        if file_length_s > YT_LENGTH_LIMIT_S:
            yt_length_limit_hms = time.strftime("%HH:%MM:%SS", time.gmtime(YT_LENGTH_LIMIT_S))
            file_length_hms = time.strftime("%HH:%MM:%SS", time.gmtime(file_length_s))
            raise gr.Error(f"Maximum YouTube length is {yt_length_limit_hms}, got {file_length_hms} YouTube video.")

        ydl_opts = {
            "outtmpl": filename,
            "format": "bestaudio/best",
            "quiet": True,
            "noplaylist": True,
            "retries": 3,
        }
        if cookie_path:
            ydl_opts["cookiefile"] = cookie_path

        try:
            with youtube_dl.YoutubeDL(ydl_opts) as ydl:
                ydl.download([yt_url])
        except (youtube_dl.utils.DownloadError, youtube_dl.utils.ExtractorError) as err:
            raise gr.Error(str(err)) from err
    finally:
        if cookie_path and os.path.exists(cookie_path):
            os.unlink(cookie_path)
def yt_transcribe(yt_url, task, summarize=False, cookies_txt=None, max_filesize=75.0):
    """Download a YouTube video's audio, transcribe it and optionally summarize it.

    Args:
        yt_url: URL of the video.
        task: Value forwarded as ``generate_kwargs={"task": task}``.
        summarize: When true, ``summarize_with_gemini`` is called on the
            transcript; a failure there is reported in the summary text.
        cookies_txt: Optional cookie file contents forwarded to
            ``download_yt_audio``.
        max_filesize: Accepted for interface compatibility; not used by this
            function.

    Returns:
        ``(html_embed, text, summary, transcript_path, summary_path)``. The
        paths point to UTF-8 temp files that are not deleted here;
        ``summary_path`` is ``None`` when the summary is empty.

    Raises:
        gr.Error: When the URL is blank, or the download or transcription fails.
    """
    if not yt_url or not yt_url.strip():
        raise gr.Error("No YouTube URL submitted! Please paste a video URL before submitting your request.")

    html_embed_str = _return_yt_html_embed(yt_url)

    with tempfile.TemporaryDirectory() as tmpdirname:
        filepath = os.path.join(tmpdirname, "video.mp4")
        try:
            download_yt_audio(yt_url, filepath, cookies_txt=cookies_txt)
        except gr.Error as e:
            # Prefer the raw message attribute when the error exposes one, so
            # the user-facing text is not a quoted repr of the message.
            detail = getattr(e, "message", None) or str(e)
            raise gr.Error(detail + "\n\nTip: Provide exported YouTube cookies (Netscape format) in the optional cookies box if the video requires sign-in or captcha.") from e

        # Must run while the temporary directory still exists. The helper
        # reads the file, decodes it with ffmpeg and transcribes in windows.
        try:
            text = _robust_transcribe_path(filepath, task)
        except gr.Error:
            raise
        except Exception as e:
            raise gr.Error(f"Transcription failed: {e}") from e

    summary = ""
    if summarize:
        try:
            summary = summarize_with_gemini(text)
        except Exception as e:
            # Best effort: keep the transcript and report the failure inline.
            summary = f"Summary error: {e}"

    # Create download files (explicit UTF-8 so non-ASCII text is always writable)
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as tf:
        tf.write(_format_transcript(text))

    sf_path = None
    if summary:
        with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as sf:
            sf.write(summary)
        sf_path = sf.name

    return html_embed_str, text, summary, tf.name, sf_path
# Top-level container; the three tabs are attached to it at the end of this section.
demo = gr.Blocks()

# ---- Tab: microphone recording ----------------------------------------
_mic_inputs = [
    gr.Audio(sources="microphone", type="filepath"),
    gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
    gr.Checkbox(label="Summarize with Gemini", value=False),
]
_mic_outputs = [
    gr.Textbox(label="Transcription"),
    gr.Textbox(label="Summary"),
    gr.File(label="Download transcription (transcribe.txt)"),
    gr.File(label="Download summary (summarise.txt)"),
]
mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=_mic_inputs,
    outputs=_mic_outputs,
    title="Whisper Large V3: Microphone",
    description="Transcribe long-form microphone or audio inputs.",
    flagging_mode="never",
)

# ---- Tab: uploaded audio file -----------------------------------------
_file_inputs = [
    gr.Audio(sources="upload", type="filepath", label="Audio file"),
    gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
    gr.Checkbox(label="Summarize with Gemini", value=False),
]
_file_outputs = [
    gr.Textbox(label="Transcription"),
    gr.Textbox(label="Summary"),
    gr.File(label="Download transcription (transcribe.txt)"),
    gr.File(label="Download summary (summarise.txt)"),
]
file_transcribe = gr.Interface(
    fn=transcribe,
    inputs=_file_inputs,
    outputs=_file_outputs,
    title="Whisper Large V3: Audio file",
    description="Transcribe long-form microphone or audio inputs.",
    flagging_mode="never",
)

# ---- Tab: YouTube URL -------------------------------------------------
_yt_inputs = [
    gr.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"),
    gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
    gr.Checkbox(label="Summarize with Gemini", value=False),
    gr.Textbox(lines=4, placeholder="Optional: paste exported YouTube cookies in Netscape format here if the video requires sign-in.", label="YouTube cookies (optional)"),
]
_yt_outputs = [
    "html",
    gr.Textbox(label="Transcription"),
    gr.Textbox(label="Summary"),
    gr.File(label="Download transcription (transcribe.txt)"),
    gr.File(label="Download summary (summarise.txt)"),
]
# NOTE: this rebinds the module-level name `yt_transcribe` from the handler
# function to the Interface. `fn=` is evaluated before the assignment, so the
# Interface still wraps the function.
yt_transcribe = gr.Interface(
    fn=yt_transcribe,
    inputs=_yt_inputs,
    outputs=_yt_outputs,
    title="Whisper Large V3: Transcribe YouTube",
    description="Transcribe long-form YouTube videos.",
    flagging_mode="never",
)

# Attach the three interfaces to the Blocks container as tabs.
with demo:
    gr.TabbedInterface([mf_transcribe, file_transcribe, yt_transcribe], ["Microphone", "Audio file", "YouTube"])
# ---------------- Gemini setup (flash-lite only) -----------------
# Collect whichever of GEMINI_API_1 .. GEMINI_API_5 are set to a non-empty
# value, preserving their numeric order.
GEMINI_API_KEYS = [
    key
    for key in (os.getenv(f"GEMINI_API_{slot}") for slot in range(1, 6))
    if key
]
# Position in GEMINI_API_KEYS of the key the next `_gem_model()` call will use.
_gem_idx = 0
def _gem_model():
    """Return a Gemini model handle configured with the next API key.

    Keys from ``GEMINI_API_KEYS`` are used in rotation: each call takes the
    key at ``_gem_idx``, advances the index (wrapping around), and passes the
    key to ``genai.configure``. Returns ``None`` when no keys are available.

    "gemini-2.5-flash-lite" is requested first; if constructing it raises,
    "gemini-2.5-flash" is constructed instead.
    """
    global _gem_idx

    if not GEMINI_API_KEYS:
        return None

    selected_key = GEMINI_API_KEYS[_gem_idx]
    _gem_idx = (_gem_idx + 1) % len(GEMINI_API_KEYS)
    # NOTE(review): configure() sets the key on the genai module as a whole —
    # confirm this is safe if requests are ever served concurrently.
    genai.configure(api_key=selected_key)

    try:
        model = genai.GenerativeModel("gemini-2.5-flash-lite")
    except Exception:
        model = genai.GenerativeModel("gemini-2.5-flash")
    return model
def _count_tokens_safe(text: str) -> int:
    """Return a token count for *text* that never fails on the counting call.

    Tries ``genai.count_tokens(text).total_tokens``; if that raises for any
    reason, falls back to a rough estimate of one token per four characters,
    with a minimum of 1.
    """
    try:
        token_total = genai.count_tokens(text).total_tokens  # type: ignore[attr-defined]
    except Exception:
        estimate = len(text) // 4
        token_total = estimate if estimate > 1 else 1
    return token_total
def summarize_with_gemini(text: str) -> str:
    """Summarize *text* with Gemini, splitting long inputs into chunks.

    Returns ``""`` for blank input or when ``_gem_model()`` yields no model.
    Text estimated at 6000 tokens or fewer is summarized in one request.
    Longer text is split on paragraph breaks and sentence ends, packed into
    chunks of at most ~6000 tokens, and each chunk is summarized with a
    freshly obtained model. The chunk summaries are joined with blank lines;
    if the joined text is itself over the limit, one more request asks the
    model to tighten it. Every model reply goes through ``_clean_summary``.
    """
    import re

    if not text or not text.strip():
        return ""

    model = _gem_model()
    if model is None:
        return ""

    max_chunk_tokens = 6000

    def build_prompt(body):
        # Same instruction header for the single-shot and per-chunk requests.
        return (
            "You are an expert content summarizer. Preserve key information and decisions, "
            "remove filler and smalltalk. Produce a clear, well-structured summary.\n\n" + body
        )

    # Short input: a single request is enough.
    if _count_tokens_safe(text) <= max_chunk_tokens:
        resp = model.generate_content(build_prompt(text))
        return _clean_summary(getattr(resp, "text", "") or "")

    # The capturing group keeps the separators so chunks rebuild the text exactly.
    segments = re.split(r"(\n\n+|\.\s+)", text)
    chunks = []
    bucket = []
    bucket_tokens = 0
    for segment in segments:
        cost = _count_tokens_safe(segment)
        if bucket and bucket_tokens + cost > max_chunk_tokens:
            chunks.append("".join(bucket))
            bucket = []
            bucket_tokens = 0
        bucket.append(segment)
        bucket_tokens += cost
    if bucket:
        chunks.append("".join(bucket))

    partials = []
    for piece in chunks:
        prompt = build_prompt(piece)
        chunk_model = _gem_model()
        if chunk_model is None:
            continue
        reply = chunk_model.generate_content(prompt)
        partials.append(_clean_summary(getattr(reply, "text", "") or ""))

    combined = "\n\n".join(p for p in partials if p)

    # Still too long: ask for one condensing pass over the joined summaries.
    if _count_tokens_safe(combined) > max_chunk_tokens:
        final_model = _gem_model()
        if final_model is not None:
            final_reply = final_model.generate_content(
                "Tighten the following combined summaries without losing key points:\n\n" + combined
            )
            combined = _clean_summary(getattr(final_reply, "text", "") or combined)
    return combined
# Enable Gradio's request queue on the Blocks app, then start serving it.
demo.queue().launch()