Obese_Kitty / app.py
DARKWICK's picture
Upload 4 files
8f3321a verified
import os
import re
import asyncio
import tempfile
from typing import Tuple, Optional
import gradio as gr
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound, CouldNotRetrieveTranscript
import yt_dlp
from openai import OpenAI
from langcodes import Language
import edge_tts
# OpenAI API key, read once from the environment at import time.
# Falls back to an empty string when the variable is not set.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
# ---------------------------
# Helpers
# ---------------------------
def extract_video_id(url_or_id: str) -> Optional[str]:
    """Return the 11-character YouTube video id found in *url_or_id*.

    Accepts either a bare id or a URL containing the id after ``v=``,
    ``/v/``, ``youtu.be/``, ``/embed/``, ``/shorts/`` or ``/live/``.
    Leading and trailing whitespace is ignored so ids pasted with a stray
    space or newline are still recognized.

    Returns ``None`` when the input is empty or no id can be found.
    """
    candidate = (url_or_id or "").strip()
    if not candidate:
        return None

    # A bare id: exactly 11 URL-safe base64 characters.
    if re.fullmatch(r"[A-Za-z0-9_-]{11}", candidate):
        return candidate

    match = re.search(
        r"(?:v=|/v/|youtu\.be/|/embed/|/shorts/|/live/)([A-Za-z0-9_-]{11})",
        candidate,
    )
    return match.group(1) if match else None
def get_transcript_text(video_id: str) -> Tuple[Optional[str], Optional[str]]:
    """Fetch a video's captions from YouTube, if any are available.

    A manually created transcript is preferred; otherwise the first
    available (auto-generated) transcript is used.

    Returns:
        ``(transcript_text, source_lang)`` on success. The text is the
        caption snippets stripped and joined with newlines; ``source_lang``
        is the chosen transcript's language code (``""`` if it has none).
        ``(None, None)`` when the video has no transcripts or retrieval
        fails for any reason.
    """
    try:
        # Older youtube-transcript-api releases expose a class-level
        # ``list_transcripts``; releases without it list transcripts through
        # an instance instead.
        if hasattr(YouTubeTranscriptApi, "list_transcripts"):
            listing = YouTubeTranscriptApi.list_transcripts(video_id)
        else:
            listing = YouTubeTranscriptApi().list(video_id)

        available = list(listing)
        if not available:
            return None, None

        # ``min`` returns the first minimal element, so this picks the first
        # manually created transcript when one exists and otherwise the first
        # transcript overall, using only the public ``is_generated`` flag.
        chosen = min(available, key=lambda t: bool(getattr(t, "is_generated", False)))

        fetched = chosen.fetch()
        # Releases that return a transcript object rather than a list of
        # dicts provide ``to_raw_data()`` to get the list-of-dicts shape.
        snippets = fetched.to_raw_data() if hasattr(fetched, "to_raw_data") else fetched

        text = "\n".join(s["text"].strip() for s in snippets if s.get("text"))
        return text, chosen.language_code or ""
    except Exception:
        # Deliberately best-effort: any failure (captions disabled, none
        # found, network/parsing errors) is reported as "no transcript" so
        # the caller can fall back to audio transcription.
        return None, None
def download_audio(video_id: str) -> str:
    """Download a YouTube video's audio track as MP3 into a new temp directory.

    The best available audio stream is fetched with yt-dlp and converted to
    a 192 kbps MP3 by its FFmpeg post-processor.

    Returns:
        Path of the MP3 file, ``<tmpdir>/<video_id>.mp3``. The caller owns
        the file and its containing temporary directory.

    Raises:
        FileNotFoundError: if the download finished but the expected MP3
            file does not exist.
        Exception: whatever yt-dlp raises on download/conversion failure.
    """
    import shutil

    tmpdir = tempfile.mkdtemp()
    outfile = os.path.join(tmpdir, f"{video_id}.mp3")
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": os.path.join(tmpdir, "%(id)s.%(ext)s"),
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "192",
            }
        ],
        "quiet": True,
        "noprogress": True,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([f"https://www.youtube.com/watch?v={video_id}"])
        if not os.path.isfile(outfile):
            raise FileNotFoundError(f"Expected audio file was not created: {outfile}")
    except BaseException:
        # Nothing usable was produced; don't leave the temp directory behind.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    return outfile
def transcribe_with_openai(mp3_path: str) -> str:
    """Transcribe the audio file at *mp3_path* with OpenAI and return the text.

    The file is sent to the ``gpt-4o-transcribe`` model with a plain-text
    response format, using a client built from the module-level
    ``OPENAI_API_KEY``. The response is converted with ``str`` before
    being returned.
    """
    api = OpenAI(api_key=OPENAI_API_KEY)
    request_options = {"model": "gpt-4o-transcribe", "response_format": "text"}
    with open(mp3_path, "rb") as audio_file:
        result = api.audio.transcriptions.create(file=audio_file, **request_options)
        return str(result)
def openai_translate(text: str, target_lang_code: str) -> str:
    """Translate *text* into the language identified by *target_lang_code*.

    The language code is expanded to an English display name for the prompt
    (falling back to the raw code if it cannot be resolved), and the
    transcript is translated with the ``gpt-4o-mini`` chat model.

    Returns:
        The translation with surrounding whitespace removed. An empty string
        is returned when *text* is empty/blank (no API call is made) or when
        the model returns no content.

    Raises:
        Exception: whatever the OpenAI client raises on request failure.
    """
    if not text or not text.strip():
        return ""

    client = OpenAI(api_key=OPENAI_API_KEY)
    try:
        # ``Language.get`` parses a full language tag (e.g. "zh-CN"), whereas
        # ``Language.make`` would treat the whole string as a language subtag.
        lang_name = Language.get(target_lang_code).display_name("en")
    except Exception:
        # The display name only makes the prompt friendlier; the code is
        # still included, so fall back to it rather than failing.
        lang_name = target_lang_code

    sys_prompt = (
        "You are a professional media translator. Translate the user's transcript into the target language. "
        "Keep meaning, style, and names. Remove timestamps. Output ONLY the translation without commentary."
    )
    msg = [
        {"role": "system", "content": sys_prompt},
        {
            "role": "user",
            "content": f"Target language: {lang_name} ({target_lang_code}).\n\nTranscript to translate:\n{text}",
        },
    ]
    resp = client.chat.completions.create(model="gpt-4o-mini", messages=msg, temperature=0.2)
    # ``content`` may be None (e.g. a refusal); avoid AttributeError on strip.
    content = resp.choices[0].message.content
    return (content or "").strip()
# Edge TTS voice used for each supported base language code.
# NOTE(review): "ha" is mapped to the English voice — presumably a
# placeholder until a dedicated voice is chosen; confirm.
VOICE_MAP = dict(
    en="en-US-JennyNeural",
    es="es-ES-ElviraNeural",
    fr="fr-FR-DeniseNeural",
    de="de-DE-KatjaNeural",
    hi="hi-IN-SwaraNeural",
    ar="ar-EG-SalmaNeural",
    pt="pt-BR-FranciscaNeural",
    ru="ru-RU-SvetlanaNeural",
    ja="ja-JP-NanamiNeural",
    ko="ko-KR-SunHiNeural",
    zh="zh-CN-XiaoxiaoNeural",
    ha="en-US-JennyNeural",
)
async def synthesize_edge_tts(text: str, lang_code: str) -> str:
    """Synthesize *text* to an MP3 file with Edge TTS and return its path.

    The voice is looked up in ``VOICE_MAP`` by the base language of
    *lang_code* (the part before the first ``-``, case-insensitive); the
    English voice is used when the language has no entry.

    Returns:
        Path of a newly created temporary ``.mp3`` file. The caller owns
        the file.

    Raises:
        Exception: whatever edge_tts raises on failure. The temporary file
            is removed before the exception propagates.
    """
    base_lang = (lang_code or "").split("-")[0].lower()
    voice = VOICE_MAP.get(base_lang, VOICE_MAP["en"])

    out_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as out:
            out_path = out.name
            communicate = edge_tts.Communicate(text, voice=voice)
            async for chunk in communicate.stream():
                # The stream also yields non-audio chunks; keep audio only.
                if chunk["type"] == "audio":
                    out.write(chunk["data"])
    except BaseException:
        # Includes task cancellation: never leave a partial file behind.
        if out_path is not None:
            try:
                os.remove(out_path)
            except OSError:
                pass
        raise
    return out_path
def _process_outputs(do_tts: bool, status: str, text: str = "", audio_path: Optional[str] = None):
    """Build the (text update, audio update, status) triple returned by ``process``."""
    return (
        gr.update(value=text, visible=True),
        gr.update(value=audio_path, visible=do_tts),
        status,
    )


def _run_coroutine_blocking(make_coro):
    """Run the coroutine created by *make_coro* to completion and return its result."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread, so asyncio.run is safe.
        return asyncio.run(make_coro())

    # A loop is already running in this thread; asyncio.run (and
    # run_until_complete on that loop) would raise. Run the coroutine on a
    # worker thread with its own event loop instead.
    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(make_coro())).result()


def process(url_or_id: str, target_lang: str, do_tts: bool):
    """Translate (and optionally speak) a YouTube video's transcript.

    Steps:
      1. Resolve the video id from *url_or_id*.
      2. Get the transcript from YouTube captions; when captions are missing
         or shorter than 20 characters and an OpenAI key is configured,
         download the audio and transcribe it instead.
      3. Translate the transcript into *target_lang* (skipped, with a
         warning, when no OpenAI key is configured).
      4. When *do_tts* is true, synthesize the translation to an MP3.

    Returns:
        A 3-tuple ``(text_update, audio_update, status_message)`` for the
        text box, audio player and status components. Failures in any step
        are reported through the status message rather than raised; a
        speech-synthesis failure still returns the translated text.
    """
    vid = extract_video_id(url_or_id)
    if not vid:
        return _process_outputs(do_tts, "Invalid YouTube ID/URL.")

    text, _ = get_transcript_text(vid)
    if (not text or len(text) < 20) and OPENAI_API_KEY:
        mp3 = None
        try:
            mp3 = download_audio(vid)
            text = transcribe_with_openai(mp3)
        except Exception as e:
            return _process_outputs(do_tts, f"Transcription failed: {e}")
        finally:
            # The audio is only needed for transcription. Remove the file,
            # then its directory; rmdir only succeeds on an empty directory,
            # so nothing else can be deleted by accident.
            if mp3:
                for remove, path in ((os.remove, mp3), (os.rmdir, os.path.dirname(mp3))):
                    try:
                        remove(path)
                    except OSError:
                        pass
    elif not text:
        return _process_outputs(do_tts, "No captions found and no OPENAI_API_KEY provided.")

    warn = ""
    if OPENAI_API_KEY:
        try:
            translated = openai_translate(text, target_lang)
        except Exception as e:
            return _process_outputs(do_tts, f"Translation failed: {e}")
    else:
        translated = text
        warn = "No translation (API key missing)."

    audio_path = None
    if do_tts and translated:
        try:
            audio_path = _run_coroutine_blocking(
                lambda: synthesize_edge_tts(translated, target_lang)
            )
        except Exception as e:
            # Speech is optional: keep the translation and report the error.
            warn = f"{warn} Speech synthesis failed: {e}".strip()

    return _process_outputs(do_tts, warn, text=translated, audio_path=audio_path)
# Gradio UI: inputs on the left, results on the right.
with gr.Blocks(theme=gr.themes.Soft(), title="YouTube Translator & Speaker") as demo:
    gr.Markdown("# YouTube Translator and Speaker")

    with gr.Row():
        # Input column: video reference, target language, TTS toggle, actions.
        with gr.Column(scale=1):
            video_in = gr.Textbox(label="YouTube URL or Video ID")
            lang = gr.Dropdown(
                label="Target Language",
                choices=["en", "es", "fr", "de", "hi", "ar", "pt", "ru", "ja", "ko", "zh", "ha"],
                value="en",
            )
            do_tts = gr.Checkbox(label="Generate Speech", value=True)
            submit = gr.Button("Submit", variant="primary")
            clear = gr.Button("Clear")

        # Output column: translated text, synthesized audio, status line.
        # NOTE(review): the original indentation was lost; ``status`` is
        # assumed to belong to this column — confirm against the layout.
        with gr.Column(scale=1):
            out_text = gr.Textbox(label="Translated Text", lines=14)
            out_audio = gr.Audio(label="Speech (MP3)", type="filepath")
            status = gr.Markdown(visible=True)

    # Submit runs the full pipeline; Clear resets the three outputs.
    submit.click(
        fn=process,
        inputs=[video_in, lang, do_tts],
        outputs=[out_text, out_audio, status],
    )
    clear.click(
        fn=lambda: ("", None, ""),
        inputs=None,
        outputs=[out_text, out_audio, status],
    )

if __name__ == "__main__":
    # Listen on all interfaces on port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)