import spaces
import torch
import gradio as gr
import yt_dlp as youtube_dl
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
import tempfile
import os
import time
import numpy as np
import google.generativeai as genai
from dotenv import load_dotenv
load_dotenv()
MODEL_NAME = "openai/whisper-large-v3-turbo"
BATCH_SIZE = 8
FILE_LIMIT_MB = 5000 # 5GB
YT_LENGTH_LIMIT_S = 7200 # 2 hours
device = 0 if torch.cuda.is_available() else "cpu"
pipe = pipeline(
task="automatic-speech-recognition",
model=MODEL_NAME,
device=device,
ignore_warning=True,
model_kwargs={"torch_dtype": torch.float16} if torch.cuda.is_available() else {},
chunk_length_s=20, # small chunks to fit ZeroGPU
)
def _concat_text(chunks):
return " ".join([c.strip() for c in chunks if c and c.strip()])
def _format_transcript(text: str, target_chars: int = 280) -> str:
"""Format raw transcript into readable paragraphs.
- Splits into sentences on punctuation boundaries.
- Groups sentences into paragraphs targeting ~target_chars.
- Normalizes whitespace; ensures blank lines between paragraphs.
"""
import re
if not text:
return text
# Normalize spaces
t = re.sub(r"\s+", " ", text).strip()
# Split on sentence boundaries while keeping delimiters
parts = re.split(r"(?<=[\.!?])\s+", t)
paras, cur, cur_len = [], [], 0
for s in parts:
if not s:
continue
cur.append(s)
cur_len += len(s) + 1
if cur_len >= target_chars:
paras.append(" ".join(cur))
cur, cur_len = [], 0
if cur:
paras.append(" ".join(cur))
return "\n\n".join(paras)
def _clean_summary(text: str) -> str:
"""Remove boilerplate like "Here's a summary...", "Summary:", "TL;DR:" from the top of summaries."""
import re
if not text:
return text
lines = text.strip().splitlines()
pat = re.compile(r"^(here\s*(?:is|'s)\s+(?:a|the)\s+summary.*|summary\s*:|tl;dr\s*:|overall\s*summary\s*:|in\s+summary\s*:|to\s+summarize\s*:)$", re.IGNORECASE)
while lines and pat.match(lines[0].strip()):
lines.pop(0)
while lines and not lines[0].strip():
lines.pop(0)
return "\n".join(lines).strip()
def _transcribe_chunk(chunk: np.ndarray, sr: int, task: str, max_retries: int = 3) -> str:
"""Transcribe a single chunk with retries and simple backoff."""
delay = 2.0
for attempt in range(max_retries):
try:
out = pipe({"array": chunk, "sampling_rate": sr}, batch_size=1, generate_kwargs={"task": task})
return out["text"]
except Exception:
if attempt == max_retries - 1:
raise
time.sleep(delay)
delay *= 1.8
def _robust_transcribe_array(audio_array: np.ndarray, sr: int, task: str) -> str:
"""Transcribe long/large audio by chunking sequentially to minimize GPU memory.
Uses conservative chunking (20s) with 2s overlap, batch_size=1.
"""
if audio_array.ndim > 1:
audio_array = np.mean(audio_array, axis=1)
chunk_s = 20
overlap_s = 2
step = int((chunk_s - overlap_s) * sr)
win = int(chunk_s * sr)
texts = []
if len(audio_array) <= win:
return _format_transcript(_transcribe_chunk(audio_array, sr, task))
start = 0
while start < len(audio_array):
end = min(start + win, len(audio_array))
chunk = audio_array[start:end]
txt = _transcribe_chunk(chunk, sr, task)
texts.append(txt)
if end == len(audio_array):
break
start += step
return _format_transcript(_concat_text(texts))
def _robust_transcribe_array_stream(audio_array: np.ndarray, sr: int, task: str):
"""Generator: yields cumulative transcription after each chunk."""
if audio_array.ndim > 1:
audio_array = np.mean(audio_array, axis=1)
chunk_s = 20
overlap_s = 2
step = int((chunk_s - overlap_s) * sr)
win = int(chunk_s * sr)
texts = []
if len(audio_array) <= win:
texts.append(_transcribe_chunk(audio_array, sr, task))
yield _format_transcript(_concat_text(texts))
return
start = 0
while start < len(audio_array):
end = min(start + win, len(audio_array))
chunk = audio_array[start:end]
txt = _transcribe_chunk(chunk, sr, task)
texts.append(txt)
yield _format_transcript(_concat_text(texts))
if end == len(audio_array):
break
start += step
def _robust_transcribe_path(path: str, task: str) -> str:
sr = pipe.feature_extractor.sampling_rate
# ffmpeg_read expects raw bytes, not a file path
with open(path, "rb") as _f:
payload = _f.read()
audio = ffmpeg_read(payload, sr)
try:
return _robust_transcribe_array(audio, sr, task)
except Exception as e:
# last-chance: shrink chunk and retry small windows
try:
small_chunk = 10
step = int(8 * sr)
win = int(small_chunk * sr)
texts = []
pos = 0
while pos < len(audio):
sub = audio[pos:pos+win]
out = pipe({"array": sub, "sampling_rate": sr}, batch_size=1, generate_kwargs={"task": task})
texts.append(out["text"])
if pos + win >= len(audio):
break
pos += step
return _concat_text(texts)
except Exception as e2:
raise gr.Error(f"Transcription failed after retries: {e2}")
@spaces.GPU(duration=120)
def transcribe(inputs, task, summarize=False):
if inputs is None:
raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")
try:
if isinstance(inputs, str):
text = _robust_transcribe_path(inputs, task)
elif isinstance(inputs, dict) and "array" in inputs:
text = _robust_transcribe_array(inputs["array"], inputs.get("sampling_rate", pipe.feature_extractor.sampling_rate), task)
else:
text = pipe(inputs, batch_size=1, generate_kwargs={"task": task})["text"]
text = _format_transcript(text)
except Exception as e:
raise gr.Error(f"Transcription failed: {e}")
summary = ""
if summarize:
try:
summary = summarize_with_gemini(text)
except Exception as e:
summary = f"Summary error: {e}"
tf = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False)
tf.write(text)
tf.close()
sf_path = None
if summary:
sf = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False)
sf.write(summary)
sf.close()
sf_path = sf.name
return text, summary, tf.name, sf_path
def _return_yt_html_embed(yt_url):
video_id = yt_url.split("?v=")[-1]
HTML_str = (
f'
'
" "
)
return HTML_str
def download_yt_audio(yt_url, filename, cookies_txt: str | None = None):
info_loader = youtube_dl.YoutubeDL()
try:
info = info_loader.extract_info(yt_url, download=False)
except youtube_dl.utils.DownloadError as err:
raise gr.Error(str(err))
file_length = info["duration_string"]
file_h_m_s = file_length.split(":")
file_h_m_s = [int(sub_length) for sub_length in file_h_m_s]
if len(file_h_m_s) == 1:
file_h_m_s.insert(0, 0)
if len(file_h_m_s) == 2:
file_h_m_s.insert(0, 0)
file_length_s = file_h_m_s[0] * 3600 + file_h_m_s[1] * 60 + file_h_m_s[2]
if file_length_s > YT_LENGTH_LIMIT_S:
yt_length_limit_hms = time.strftime("%HH:%MM:%SS", time.gmtime(YT_LENGTH_LIMIT_S))
file_length_hms = time.strftime("%HH:%MM:%SS", time.gmtime(file_length_s))
raise gr.Error(f"Maximum YouTube length is {yt_length_limit_hms}, got {file_length_hms} YouTube video.")
ydl_opts = {
"outtmpl": filename,
"format": "bestaudio/best",
"quiet": True,
"noplaylist": True,
"retries": 3,
}
cookie_path = None
if cookies_txt and cookies_txt.strip():
tf = tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False)
tf.write(cookies_txt)
tf.close()
cookie_path = tf.name
ydl_opts["cookiefile"] = cookie_path
try:
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
try:
ydl.download([yt_url])
except youtube_dl.utils.ExtractorError as err:
raise gr.Error(str(err))
finally:
if cookie_path and os.path.exists(cookie_path):
os.unlink(cookie_path)
@spaces.GPU(duration=120)
def yt_transcribe(yt_url, task, summarize=False, cookies_txt=None, max_filesize=75.0):
html_embed_str = _return_yt_html_embed(yt_url)
with tempfile.TemporaryDirectory() as tmpdirname:
filepath = os.path.join(tmpdirname, "video.mp4")
try:
download_yt_audio(yt_url, filepath, cookies_txt=cookies_txt)
except gr.Error as e:
raise gr.Error(str(e) + "\n\nTip: Provide exported YouTube cookies (Netscape format) in the optional cookies box if the video requires sign-in or captcha.")
with open(filepath, "rb") as f:
inputs = f.read()
inputs = ffmpeg_read(inputs, pipe.feature_extractor.sampling_rate)
inputs = {"array": inputs, "sampling_rate": pipe.feature_extractor.sampling_rate}
try:
text = _robust_transcribe_array(inputs["array"], inputs["sampling_rate"], task)
except Exception as e:
raise gr.Error(f"Transcription failed: {e}")
summary = ""
if summarize:
try:
summary = summarize_with_gemini(text)
except Exception as e:
summary = f"Summary error: {e}"
# Create download files
tf = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False)
tf.write(_format_transcript(text))
tf.close()
sf_path = None
if summary:
sf = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False)
sf.write(summary)
sf.close()
sf_path = sf.name
return html_embed_str, text, summary, tf.name, sf_path
demo = gr.Blocks()
mf_transcribe = gr.Interface(
fn=transcribe,
inputs=[
gr.Audio(sources="microphone", type="filepath"),
gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
gr.Checkbox(label="Summarize with Gemini", value=False),
],
outputs=[
gr.Textbox(label="Transcription"),
gr.Textbox(label="Summary"),
gr.File(label="Download transcription (transcribe.txt)"),
gr.File(label="Download summary (summarise.txt)")
],
title="Whisper Large V3: Microphone",
description=(
"Transcribe long-form microphone or audio inputs."
),
flagging_mode="never",
)
file_transcribe = gr.Interface(
fn=transcribe,
inputs=[
gr.Audio(sources="upload", type="filepath", label="Audio file"),
gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
gr.Checkbox(label="Summarize with Gemini", value=False),
],
outputs=[
gr.Textbox(label="Transcription"),
gr.Textbox(label="Summary"),
gr.File(label="Download transcription (transcribe.txt)"),
gr.File(label="Download summary (summarise.txt)")
],
title="Whisper Large V3: Audio file",
description=(
"Transcribe long-form microphone or audio inputs."
),
flagging_mode="never",
)
yt_transcribe = gr.Interface(
fn=yt_transcribe,
inputs=[
gr.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"),
gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
gr.Checkbox(label="Summarize with Gemini", value=False),
gr.Textbox(lines=4, placeholder="Optional: paste exported YouTube cookies in Netscape format here if the video requires sign-in.", label="YouTube cookies (optional)"),
],
outputs=[
"html",
gr.Textbox(label="Transcription"),
gr.Textbox(label="Summary"),
gr.File(label="Download transcription (transcribe.txt)"),
gr.File(label="Download summary (summarise.txt)")
],
title="Whisper Large V3: Transcribe YouTube",
description=(
"Transcribe long-form YouTube videos."
),
flagging_mode="never",
)
with demo:
gr.TabbedInterface([mf_transcribe, file_transcribe, yt_transcribe], ["Microphone", "Audio file", "YouTube"])
# ---------------- Gemini setup (flash-lite only) -----------------
GEMINI_API_KEYS = [
os.getenv("GEMINI_API_1"),
os.getenv("GEMINI_API_2"),
os.getenv("GEMINI_API_3"),
os.getenv("GEMINI_API_4"),
os.getenv("GEMINI_API_5"),
]
GEMINI_API_KEYS = [k for k in GEMINI_API_KEYS if k]
_gem_idx = 0
def _gem_model():
global _gem_idx
if not GEMINI_API_KEYS:
return None
api_key = GEMINI_API_KEYS[_gem_idx]
_gem_idx = (_gem_idx + 1) % len(GEMINI_API_KEYS)
genai.configure(api_key=api_key)
try:
return genai.GenerativeModel("gemini-2.5-flash-lite")
except Exception:
return genai.GenerativeModel("gemini-2.5-flash")
def _count_tokens_safe(text: str) -> int:
try:
return genai.count_tokens(text).total_tokens # type: ignore[attr-defined]
except Exception:
return max(1, len(text) // 4)
def summarize_with_gemini(text: str) -> str:
if not text or not text.strip():
return ""
model = _gem_model()
if model is None:
return ""
max_chunk_tokens = 6000
if _count_tokens_safe(text) <= max_chunk_tokens:
prompt = (
"You are an expert content summarizer. Preserve key information and decisions, "
"remove filler and smalltalk. Produce a clear, well-structured summary.\n\n" + text
)
resp = model.generate_content(prompt)
raw = getattr(resp, "text", "") or ""
return _clean_summary(raw)
import re
segs = re.split(r"(\n\n+|\.\s+)", text)
chunks, cur, cur_tok = [], [], 0
for s in segs:
t = _count_tokens_safe(s)
if cur_tok + t > max_chunk_tokens and cur:
chunks.append("".join(cur))
cur, cur_tok = [s], t
else:
cur.append(s)
cur_tok += t
if cur:
chunks.append("".join(cur))
parts = []
for ch in chunks:
prompt = (
"You are an expert content summarizer. Preserve key information and decisions, "
"remove filler and smalltalk. Produce a clear, well-structured summary.\n\n" + ch
)
m = _gem_model()
if m is None:
continue
r = m.generate_content(prompt)
parts.append(_clean_summary(getattr(r, "text", "") or ""))
combined = "\n\n".join([p for p in parts if p])
if _count_tokens_safe(combined) > max_chunk_tokens:
m2 = _gem_model()
if m2 is not None:
r2 = m2.generate_content(
"Tighten the following combined summaries without losing key points:\n\n" + combined
)
combined = _clean_summary(getattr(r2, "text", "") or combined)
return combined
demo.queue().launch()