"""Gradio app: fetch a YouTube transcript, translate it, and optionally speak it.

Pipeline implemented by :func:`process`:

1. Resolve the video id from a URL or bare id.
2. Try to read captions through ``youtube_transcript_api``.
3. If captions are missing/too short and ``OPENAI_API_KEY`` is set, download
   the audio with ``yt_dlp`` and transcribe it with the OpenAI API.
4. Translate the text with the OpenAI chat API (skipped without an API key).
5. Optionally synthesize speech for the result with ``edge_tts``.
"""

import asyncio
import concurrent.futures
import os
import re
import shutil
import tempfile
from typing import Optional, Tuple

import edge_tts
import gradio as gr
import yt_dlp
from langcodes import Language
from openai import OpenAI
from youtube_transcript_api import (
    CouldNotRetrieveTranscript,
    NoTranscriptFound,
    TranscriptsDisabled,
    YouTubeTranscriptApi,
)

# Empty string means "no key": transcription fallback and translation are skipped.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")

# Compiled once at import time; both are used on every request.
_VIDEO_ID_RE = re.compile(r"[A-Za-z0-9_-]{11}")
_VIDEO_URL_RE = re.compile(
    r"(?:v=|/v/|youtu\.be/|/embed/|/shorts/|/live/)([A-Za-z0-9_-]{11})"
)


# ---------------------------
# Helpers
# ---------------------------
def extract_video_id(url_or_id: str) -> Optional[str]:
    """Accept a full YouTube URL or bare ID and return the 11-char video id.

    Surrounding whitespace is ignored. Recognised URL shapes are
    ``watch?v=``, ``/v/``, ``youtu.be/``, ``/embed/``, ``/shorts/`` and
    ``/live/``.

    Args:
        url_or_id: A YouTube URL or a bare 11-character video id.

    Returns:
        The video id, or ``None`` when the input is empty or no id is found.
    """
    if not url_or_id:
        return None
    candidate = url_or_id.strip()
    if _VIDEO_ID_RE.fullmatch(candidate):
        return candidate
    match = _VIDEO_URL_RE.search(candidate)
    return match.group(1) if match else None


def get_transcript_text(video_id: str) -> Tuple[Optional[str], Optional[str]]:
    """Return ``(transcript_text, source_lang)`` if available via YouTube captions.

    A manually created transcript is preferred; otherwise the first transcript
    the listing yields is used.

    Args:
        video_id: The 11-character YouTube video id.

    Returns:
        ``(text, language_code)`` on success, ``(None, None)`` when no
        transcript could be obtained for any reason. Failures are deliberately
        not raised so the caller can fall back to audio transcription.
    """
    try:
        transcripts = list(YouTubeTranscriptApi.list_transcripts(video_id))
        if not transcripts:
            return None, None
        # Prefer human-written captions; fall back to whatever comes first.
        preferred = next(
            (t for t in transcripts if not t.is_generated),
            transcripts[0],
        )
        parts = preferred.fetch()
        text = "\n".join(p["text"].strip() for p in parts if p.get("text"))
        src_lang = preferred.language_code or ""
        return text, src_lang
    except (TranscriptsDisabled, NoTranscriptFound, CouldNotRetrieveTranscript):
        # The library reported that captions are unavailable for this video.
        return None, None
    except Exception:
        # Best-effort: any other failure is also treated as "no captions".
        return None, None


def download_audio(video_id: str) -> str:
    """Download a video's audio track as MP3 into a fresh temporary directory.

    The caller owns the returned file and its parent directory and is
    responsible for deleting them. If the download itself fails, the
    temporary directory is removed before the exception propagates.

    Args:
        video_id: The 11-character YouTube video id.

    Returns:
        Path ``<tmpdir>/<video_id>.mp3``.
        NOTE(review): assumes yt-dlp's ``%(id)s`` equals ``video_id`` and that
        the FFmpegExtractAudio postprocessor produced the ``.mp3`` — confirm.

    Raises:
        Exception: Whatever ``yt_dlp`` raises when the download fails.
    """
    tmpdir = tempfile.mkdtemp()
    outfile = os.path.join(tmpdir, f"{video_id}.mp3")
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": os.path.join(tmpdir, "%(id)s.%(ext)s"),
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "192",
            }
        ],
        "quiet": True,
        "noprogress": True,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([f"https://www.youtube.com/watch?v={video_id}"])
    except Exception:
        # Do not leave a half-filled temp directory behind on failure.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    return outfile


def transcribe_with_openai(mp3_path: str) -> str:
    """Transcribe an audio file with the OpenAI transcription endpoint.

    Args:
        mp3_path: Path of the audio file to upload.

    Returns:
        The transcription, requested as plain text and coerced to ``str``.
    """
    client = OpenAI(api_key=OPENAI_API_KEY)
    with open(mp3_path, "rb") as f:
        transcript = client.audio.transcriptions.create(
            model="gpt-4o-transcribe",
            file=f,
            response_format="text",
        )
    return str(transcript)


def openai_translate(text: str, target_lang_code: str) -> str:
    """Translate ``text`` into the target language using the OpenAI chat API.

    Args:
        text: The transcript to translate.
        target_lang_code: Language code of the target language (e.g. ``"es"``).

    Returns:
        The stripped translation; an empty string if the model returned no
        content.
    """
    client = OpenAI(api_key=OPENAI_API_KEY)
    try:
        lang_name = Language.make(target_lang_code).display_name("en")
    except Exception:
        # Fall back to the raw code when no display name can be produced.
        lang_name = target_lang_code
    sys_prompt = (
        "You are a professional media translator. Translate the user's transcript into the target language. "
        "Keep meaning, style, and names. Remove timestamps. Output ONLY the translation without commentary."
    )
    msg = [
        {"role": "system", "content": sys_prompt},
        {
            "role": "user",
            "content": f"Target language: {lang_name} ({target_lang_code}).\n\nTranscript to translate:\n{text}",
        },
    ]
    resp = client.chat.completions.create(
        model="gpt-4o-mini", messages=msg, temperature=0.2
    )
    # ``content`` can be None; avoid AttributeError on ``.strip()``.
    return (resp.choices[0].message.content or "").strip()


# Base language code -> edge-tts voice name. "ha" reuses the English voice.
VOICE_MAP = {
    "en": "en-US-JennyNeural",
    "es": "es-ES-ElviraNeural",
    "fr": "fr-FR-DeniseNeural",
    "de": "de-DE-KatjaNeural",
    "hi": "hi-IN-SwaraNeural",
    "ar": "ar-EG-SalmaNeural",
    "pt": "pt-BR-FranciscaNeural",
    "ru": "ru-RU-SvetlanaNeural",
    "ja": "ja-JP-NanamiNeural",
    "ko": "ko-KR-SunHiNeural",
    "zh": "zh-CN-XiaoxiaoNeural",
    "ha": "en-US-JennyNeural",
}


async def synthesize_edge_tts(text: str, lang_code: str) -> str:
    """Synthesize ``text`` to an MP3 file with edge-tts.

    The voice is looked up by the part of ``lang_code`` before the first
    ``-``; unknown languages use the English voice.

    Args:
        text: Text to speak.
        lang_code: Target language code, e.g. ``"fr"`` or ``"pt-BR"``.

    Returns:
        Path of a temporary ``.mp3`` file. The file is not deleted
        automatically; the caller owns it.

    Raises:
        Exception: Whatever ``edge_tts`` raises; the partially written file is
            removed first.
    """
    voice = VOICE_MAP.get(lang_code.split("-")[0], VOICE_MAP["en"])
    communicate = edge_tts.Communicate(text, voice=voice)
    # Reserve a path, then close the handle so the file can be reopened below.
    tmp = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
    tmp.close()
    try:
        with open(tmp.name, "wb") as f:
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    f.write(chunk["data"])
    except Exception:
        # Do not leave an orphaned (possibly truncated) file behind.
        try:
            os.remove(tmp.name)
        except OSError:
            pass
        raise
    return tmp.name


def _failure(message: str, do_tts: bool):
    """Build the three handler outputs for an aborted run with ``message``."""
    return (
        gr.update(value="", visible=True),
        gr.update(value=None, visible=do_tts),
        message,
    )


def _transcribe_audio(video_id: str) -> str:
    """Download the audio, transcribe it, and always delete the download."""
    mp3 = download_audio(video_id)
    try:
        return transcribe_with_openai(mp3)
    finally:
        # download_audio created a dedicated temp directory for this file.
        shutil.rmtree(os.path.dirname(mp3), ignore_errors=True)


def _run_coroutine_blocking(coro):
    """Run ``coro`` to completion from synchronous code and return its result.

    ``asyncio.run`` cannot be used in a thread that already has a running
    event loop, so in that case the coroutine is run on a worker thread with
    its own loop.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread: the simple path works.
        return asyncio.run(coro)
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


def process(url_or_id: str, target_lang: str, do_tts: bool):
    """Gradio handler: transcript -> translation -> optional speech.

    Args:
        url_or_id: YouTube URL or bare video id entered by the user.
        target_lang: Target language code chosen in the dropdown.
        do_tts: Whether to synthesize speech for the translated text.

    Returns:
        A 3-tuple ``(text_update, audio_update, status)`` matching the
        ``out_text``, ``out_audio`` and ``status`` components. On failure the
        text is empty, the audio is ``None`` and ``status`` explains why. A
        speech-synthesis failure still returns the translated text and reports
        the error in ``status``.
    """
    vid = extract_video_id(url_or_id)
    if not vid:
        return _failure("Invalid YouTube ID/URL.", do_tts)

    text, _src_lang = get_transcript_text(vid)
    if (not text or len(text) < 20) and OPENAI_API_KEY:
        # Captions missing or suspiciously short: transcribe the audio instead.
        try:
            text = _transcribe_audio(vid)
        except Exception as e:
            return _failure(f"Transcription failed: {e}", do_tts)
    elif not text:
        return _failure("No captions found and no OPENAI_API_KEY provided.", do_tts)

    if OPENAI_API_KEY:
        try:
            translated = openai_translate(text, target_lang)
        except Exception as e:
            return _failure(f"Translation failed: {e}", do_tts)
        warn = ""
    else:
        # Without a key the original transcript is passed through untranslated.
        translated = text
        warn = "No translation (API key missing)."

    audio_path = None
    if do_tts and translated:
        try:
            audio_path = _run_coroutine_blocking(
                synthesize_edge_tts(translated, target_lang)
            )
        except Exception as e:
            # Keep the translation; only the audio is missing.
            warn = f"{warn} Speech synthesis failed: {e}".strip()

    return (
        gr.update(value=translated, visible=True),
        gr.update(value=audio_path, visible=do_tts),
        warn,
    )


with gr.Blocks(theme=gr.themes.Soft(), title="YouTube Translator & Speaker") as demo:
    gr.Markdown("# YouTube Translator and Speaker")
    with gr.Row():
        with gr.Column(scale=1):
            video_in = gr.Textbox(label="YouTube URL or Video ID")
            lang = gr.Dropdown(
                label="Target Language",
                choices=["en", "es", "fr", "de", "hi", "ar", "pt", "ru", "ja", "ko", "zh", "ha"],
                value="en",
            )
            do_tts = gr.Checkbox(label="Generate Speech", value=True)
            submit = gr.Button("Submit", variant="primary")
            clear = gr.Button("Clear")
        with gr.Column(scale=1):
            out_text = gr.Textbox(label="Translated Text", lines=14)
            out_audio = gr.Audio(label="Speech (MP3)", type="filepath")
            status = gr.Markdown(visible=True)

    submit.click(
        fn=process,
        inputs=[video_in, lang, do_tts],
        outputs=[out_text, out_audio, status],
    )
    # Resets the three outputs only; the input field is left as typed.
    clear.click(lambda: ("", None, ""), outputs=[out_text, out_audio, status])


if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)