# Page header captured together with this file (not part of the program):
# Spaces: Runtime error
# File size: 7,276 Bytes
import asyncio
import concurrent.futures
import logging
import os
import re
import shutil
import tempfile
from typing import Tuple, Optional

import edge_tts
import gradio as gr
import yt_dlp
from langcodes import Language
from openai import OpenAI
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound, CouldNotRetrieveTranscript
# OpenAI API key read from the environment once at import time; it is the
# empty string when the variable is unset, which the rest of the file treats
# as "no key available".
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
# ---------------------------
# Helpers
# ---------------------------
def extract_video_id(url_or_id: str) -> Optional[str]:
    """Return the 11-character YouTube video id found in *url_or_id*.

    Accepts either a bare video id or a URL. Recognised URL shapes are
    ``watch?v=<id>``, ``/v/<id>``, ``youtu.be/<id>``, ``/embed/<id>``,
    ``/shorts/<id>`` and ``/live/<id>``.

    Args:
        url_or_id: Text supplied by the user; surrounding whitespace is ignored.

    Returns:
        The video id, or ``None`` when the input is empty or no id is found.
    """
    if not url_or_id:
        return None

    # Pasted values frequently carry stray spaces or a trailing newline.
    candidate = url_or_id.strip()
    if re.fullmatch(r"[A-Za-z0-9_-]{11}", candidate):
        return candidate

    match = re.search(
        r"(?:v=|/v/|youtu\.be/|/embed/|/shorts/|/live/)([A-Za-z0-9_-]{11})",
        candidate,
    )
    return match.group(1) if match else None
def get_transcript_text(video_id: str) -> Tuple[Optional[str], Optional[str]]:
    """Return ``(transcript_text, source_lang)`` from YouTube captions.

    A manually created transcript is preferred; otherwise the first available
    transcript is used. Caption lines are stripped and joined with newlines.

    Args:
        video_id: The YouTube video id to look up.

    Returns:
        ``(text, language_code)`` on success, where ``language_code`` is ``""``
        when the transcript does not report one. ``(None, None)`` when no
        transcript exists or retrieval fails for any reason; this function
        never raises, so callers can fall back to another transcript source.
    """
    try:
        # Newer youtube-transcript-api releases expose an instance-level
        # ``list``; older ones only have the static ``list_transcripts``.
        if hasattr(YouTubeTranscriptApi, "list"):
            listing = YouTubeTranscriptApi().list(video_id)
        else:
            listing = YouTubeTranscriptApi.list_transcripts(video_id)

        transcripts = list(listing)
        if not transcripts:
            return None, None

        # Choose via the public ``is_generated`` flag instead of reaching into
        # the listing's private dictionaries.
        preferred = next(
            (t for t in transcripts if not t.is_generated),
            transcripts[0],
        )

        lines = []
        for snippet in preferred.fetch():
            # Snippets are plain dicts in older library versions and objects
            # with a ``text`` attribute in newer ones.
            if isinstance(snippet, dict):
                raw = snippet.get("text")
            else:
                raw = getattr(snippet, "text", None)
            if raw:
                lines.append(raw.strip())

        return "\n".join(lines), preferred.language_code or ""
    except (TranscriptsDisabled, NoTranscriptFound, CouldNotRetrieveTranscript):
        # Expected "no captions" outcomes: report absence quietly.
        return None, None
    except Exception:
        # Deliberate best-effort: keep the "never raises" contract, but leave
        # a trace so unexpected failures are not invisible.
        logging.getLogger(__name__).warning(
            "Unexpected error while fetching captions for %s", video_id, exc_info=True
        )
        return None, None
def download_audio(video_id: str) -> str:
    """Download a video's audio track as MP3 and return the file path.

    The audio is fetched with yt-dlp into a freshly created temporary
    directory and converted to MP3 by the ``FFmpegExtractAudio``
    post-processor.

    Args:
        video_id: The YouTube video id to download.

    Returns:
        Path of the MP3 file. The file lives in its own temporary directory,
        which the caller is responsible for removing when done.

    Raises:
        FileNotFoundError: If yt-dlp finished without producing the MP3.
        Exception: Any error raised by yt-dlp is propagated unchanged.
    """
    tmpdir = tempfile.mkdtemp()
    outfile = os.path.join(tmpdir, f"{video_id}.mp3")
    ydl_opts = {
        "format": "bestaudio/best",
        # yt-dlp expands %(id)s / %(ext)s itself; the post-processor then
        # rewrites the extension to .mp3, matching ``outfile``.
        "outtmpl": os.path.join(tmpdir, "%(id)s.%(ext)s"),
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "192",
            }
        ],
        "quiet": True,
        "noprogress": True,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([f"https://www.youtube.com/watch?v={video_id}"])
        if not os.path.isfile(outfile):
            raise FileNotFoundError(
                f"yt-dlp did not produce the expected audio file: {outfile}"
            )
    except BaseException:
        # Nothing usable was produced, so do not leave the directory behind.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    return outfile
def transcribe_with_openai(mp3_path: str) -> str:
    """Transcribe the audio file at *mp3_path* with OpenAI and return the text.

    The file is opened in binary mode and sent to the ``gpt-4o-transcribe``
    model with a plain-text response format; the response is converted to
    ``str`` before being returned.
    """
    request_options = {
        "model": "gpt-4o-transcribe",
        "response_format": "text",
    }
    openai_client = OpenAI(api_key=OPENAI_API_KEY)
    with open(mp3_path, "rb") as audio_file:
        result = openai_client.audio.transcriptions.create(
            file=audio_file,
            **request_options,
        )
    return str(result)
def openai_translate(text: str, target_lang_code: str) -> str:
    """Translate *text* into the language named by *target_lang_code*.

    The request goes to the ``gpt-4o-mini`` chat model with a system prompt
    asking for a translation only, without commentary.

    Args:
        text: Transcript to translate.
        target_lang_code: Language code of the target language (e.g. ``"es"``).
            Its English display name is added to the prompt when ``langcodes``
            can resolve it; otherwise the raw code is used.

    Returns:
        The stripped translation. An empty string is returned when *text* is
        empty or whitespace-only (no API call is made) or when the model
        returns no content.

    Raises:
        Exception: Errors from the OpenAI client are propagated to the caller.
    """
    # Nothing to translate: skip the API round-trip entirely.
    if not text or not text.strip():
        return ""

    client = OpenAI(api_key=OPENAI_API_KEY)
    try:
        lang_name = Language.make(target_lang_code).display_name("en")
    except Exception:
        # Best effort: an unresolvable code is still passed to the model as-is.
        lang_name = target_lang_code

    sys_prompt = (
        "You are a professional media translator. Translate the user's transcript into the target language. "
        "Keep meaning, style, and names. Remove timestamps. Output ONLY the translation without commentary."
    )
    msg = [
        {"role": "system", "content": sys_prompt},
        {
            "role": "user",
            "content": f"Target language: {lang_name} ({target_lang_code}).\n\nTranscript to translate:\n{text}",
        },
    ]
    resp = client.chat.completions.create(model="gpt-4o-mini", messages=msg, temperature=0.2)
    # ``content`` may be None (e.g. a refusal); treat that as an empty result
    # instead of failing on ``None.strip()``.
    content = resp.choices[0].message.content
    return (content or "").strip()
# Maps a base language code (the part of the code before any "-") to the
# voice name handed to edge-tts for speech synthesis. Codes missing from this
# table fall back to the "en" entry at lookup time.
VOICE_MAP = {
    "en": "en-US-JennyNeural",
    "es": "es-ES-ElviraNeural",
    "fr": "fr-FR-DeniseNeural",
    "de": "de-DE-KatjaNeural",
    "hi": "hi-IN-SwaraNeural",
    "ar": "ar-EG-SalmaNeural",
    "pt": "pt-BR-FranciscaNeural",
    "ru": "ru-RU-SvetlanaNeural",
    "ja": "ja-JP-NanamiNeural",
    "ko": "ko-KR-SunHiNeural",
    "zh": "zh-CN-XiaoxiaoNeural",
    # NOTE(review): "ha" reuses the en-US voice, presumably because no
    # dedicated voice is available for that language; confirm.
    "ha": "en-US-JennyNeural",
}
async def synthesize_edge_tts(text: str, lang_code: str) -> str:
    """Synthesize *text* to an MP3 file with edge-tts and return its path.

    The voice is looked up in ``VOICE_MAP`` by the base language of
    *lang_code* (case-insensitive, region suffix ignored), falling back to the
    ``"en"`` voice for unknown or missing codes.

    Args:
        text: Text to speak.
        lang_code: Language code such as ``"fr"``, ``"pt-BR"`` or ``"zh_CN"``.

    Returns:
        Path of a temporary ``.mp3`` file holding the audio. The caller owns
        the file and is responsible for deleting it.

    Raises:
        Exception: Errors from edge-tts are propagated; the temporary file is
            removed first so failed runs leave nothing behind.
    """
    base_lang = (lang_code or "").replace("_", "-").split("-")[0].lower()
    voice = VOICE_MAP.get(base_lang, VOICE_MAP["en"])

    tmp = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
    try:
        # ``with tmp`` closes the handle on exit, before any cleanup below.
        with tmp:
            communicate = edge_tts.Communicate(text, voice=voice)
            async for chunk in communicate.stream():
                # The stream also carries non-audio chunks; keep only audio.
                if chunk["type"] == "audio":
                    tmp.write(chunk["data"])
    except BaseException:
        try:
            os.remove(tmp.name)
        except OSError:
            pass
        raise
    return tmp.name
def _run_tts_blocking(text: str, lang_code: str) -> str:
    """Run ``synthesize_edge_tts`` to completion from synchronous code."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread, so asyncio.run is safe.
        return asyncio.run(synthesize_edge_tts(text, lang_code))
    # A loop is already running here: both asyncio.run and run_until_complete
    # would raise, so drive the coroutine on its own loop in a worker thread.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, synthesize_edge_tts(text, lang_code)).result()


def _discard_download(path: Optional[str]) -> None:
    """Best-effort removal of a downloaded file and its parent directory if empty."""
    if not path:
        return
    try:
        os.remove(path)
        # rmdir only succeeds on an empty directory, so nothing else is lost.
        os.rmdir(os.path.dirname(path))
    except OSError:
        pass


def process(url_or_id: str, target_lang: str, do_tts: bool):
    """Fetch, translate and optionally speak the transcript of a YouTube video.

    Steps: resolve the video id; read YouTube captions; when captions are
    missing or shorter than 20 characters and an OpenAI key is configured,
    download the audio and transcribe it instead; translate the text (skipped
    without a key); optionally synthesize speech for the result.

    Args:
        url_or_id: YouTube URL or bare video id entered by the user.
        target_lang: Language code to translate into and to pick a voice for.
        do_tts: Whether to synthesize speech for the translated text.

    Returns:
        A 3-tuple ``(text_update, audio_update, status)``: two ``gr.update``
        objects for the text and audio outputs (the audio output is visible
        only when *do_tts* is true) and a status message, empty on success.
        Failures at any stage are reported through the status message rather
        than raised.
    """

    def _reply(text, audio, message):
        # Single place that shapes the three outputs.
        return (
            gr.update(value=text, visible=True),
            gr.update(value=audio, visible=do_tts),
            message,
        )

    vid = extract_video_id(url_or_id)
    if not vid:
        return _reply("", None, "Invalid YouTube ID/URL.")

    text, _src_lang = get_transcript_text(vid)
    if (not text or len(text) < 20) and OPENAI_API_KEY:
        mp3 = None
        try:
            mp3 = download_audio(vid)
            text = transcribe_with_openai(mp3)
        except Exception as e:
            return _reply("", None, f"Transcription failed: {e}")
        finally:
            # The audio is only needed for transcription; do not let
            # downloads pile up on disk.
            _discard_download(mp3)
    elif not text:
        return _reply("", None, "No captions found and no OPENAI_API_KEY provided.")

    try:
        translated = openai_translate(text, target_lang) if OPENAI_API_KEY else text
        warn = "" if OPENAI_API_KEY else "No translation (API key missing)."
    except Exception as e:
        return _reply("", None, f"Translation failed: {e}")

    audio_path = None
    if do_tts and translated:
        try:
            audio_path = _run_tts_blocking(translated, target_lang)
        except Exception as e:
            # Still hand back the text; report the speech failure in the
            # status like the other stages do.
            return _reply(translated, None, f"{warn} Speech synthesis failed: {e}".strip())

    return _reply(translated, audio_path, warn)
# Language codes offered in the target-language dropdown.
_LANGUAGE_CHOICES = ["en", "es", "fr", "de", "hi", "ar", "pt", "ru", "ja", "ko", "zh", "ha"]


def _reset_outputs():
    """Return blank values for the text, audio and status outputs."""
    return "", None, ""


# Two-column layout: inputs and buttons on the left, results on the right.
with gr.Blocks(theme=gr.themes.Soft(), title="YouTube Translator & Speaker") as demo:
    gr.Markdown("# YouTube Translator and Speaker")

    with gr.Row():
        with gr.Column(scale=1):
            video_in = gr.Textbox(label="YouTube URL or Video ID")
            lang = gr.Dropdown(
                label="Target Language",
                choices=_LANGUAGE_CHOICES,
                value="en",
            )
            do_tts = gr.Checkbox(label="Generate Speech", value=True)
            submit = gr.Button("Submit", variant="primary")
            clear = gr.Button("Clear")

        with gr.Column(scale=1):
            out_text = gr.Textbox(label="Translated Text", lines=14)
            out_audio = gr.Audio(label="Speech (MP3)", type="filepath")
            status = gr.Markdown(visible=True)

    # Both buttons write to the same three output components.
    _result_components = [out_text, out_audio, status]
    submit.click(
        fn=process,
        inputs=[video_in, lang, do_tts],
        outputs=_result_components,
    )
    clear.click(_reset_outputs, outputs=_result_components)


if __name__ == "__main__":
    # Listen on all interfaces, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)
|