Spaces:
Runtime error
Runtime error
import asyncio
import logging
import os
import re
import shutil
import tempfile
from concurrent.futures import ThreadPoolExecutor
from typing import Tuple, Optional

import gradio as gr
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound, CouldNotRetrieveTranscript
import yt_dlp
from openai import OpenAI
from langcodes import Language
import edge_tts
# API key used for every OpenAI call in this file (audio transcription and
# translation). An empty string means "no key configured".
# Surrounding whitespace is stripped because a secret pasted into an
# environment setting can carry a trailing newline, which would otherwise be
# sent as part of the key.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "").strip()
| # --------------------------- | |
| # Helpers | |
| # --------------------------- | |
def extract_video_id(url_or_id: str) -> Optional[str]:
    """Return the 11-character YouTube video id contained in *url_or_id*.

    Accepts either a bare id or a URL containing one of ``v=ID``, ``/v/ID``,
    ``youtu.be/ID``, ``/embed/ID``, ``/shorts/ID`` or ``/live/ID``.
    Leading and trailing whitespace (e.g. from a copy-paste) is ignored.

    Returns:
        The video id, or ``None`` when the input is empty or no id is found.
    """
    if not url_or_id:
        return None

    candidate = url_or_id.strip()
    if re.fullmatch(r"[A-Za-z0-9_-]{11}", candidate):
        return candidate

    # The negative lookahead rejects runs longer than 11 id characters, so a
    # malformed id is reported as invalid instead of being silently truncated.
    match = re.search(
        r"(?:v=|/v/|youtu\.be/|/embed/|/shorts/|/live/)"
        r"([A-Za-z0-9_-]{11})(?![A-Za-z0-9_-])",
        candidate,
    )
    return match.group(1) if match else None
def get_transcript_text(video_id: str) -> Tuple[Optional[str], Optional[str]]:
    """Fetch the caption text for *video_id* from YouTube, if any exists.

    A manually created transcript is preferred over an auto-generated one;
    within each group the first transcript listed is used.

    Returns:
        ``(text, language_code)`` on success, where *text* is the caption
        snippets joined with newlines and *language_code* is the transcript's
        language code (``""`` if it has none).
        ``(None, None)`` when no transcript exists or the lookup fails for
        any reason. This function never raises.
    """
    try:
        # NOTE(review): newer youtube-transcript-api releases appear to replace
        # the static ``list_transcripts`` with an instance-level ``list`` —
        # confirm against the pinned version. Both spellings are supported.
        if hasattr(YouTubeTranscriptApi, "list_transcripts"):
            listing = YouTubeTranscriptApi.list_transcripts(video_id)
        else:
            listing = YouTubeTranscriptApi().list(video_id)

        candidates = list(listing)
        if not candidates:
            return None, None

        # min() returns the first minimal element, so this picks the first
        # manually created transcript if there is one, else the first
        # generated one — without reaching into the listing's private dicts.
        preferred = min(
            candidates,
            key=lambda t: bool(getattr(t, "is_generated", False)),
        )

        lines = []
        for snippet in preferred.fetch():
            # Snippets are dicts in older library versions; attribute access
            # covers versions that return snippet objects instead.
            if isinstance(snippet, dict):
                raw = snippet.get("text")
            else:
                raw = getattr(snippet, "text", None)
            if raw:
                lines.append(raw.strip())

        return "\n".join(lines), preferred.language_code or ""
    except Exception as exc:
        # Deliberately best-effort: every failure is reported as "no captions".
        # Log it so a library/API mismatch is not completely invisible.
        logging.getLogger(__name__).warning(
            "Caption lookup failed for %s: %s", video_id, exc
        )
        return None, None
def download_audio(video_id: str) -> str:
    """Download the audio track of *video_id* as an MP3 and return its path.

    The file is written into a freshly created temporary directory. On
    success the caller owns both the file and that directory and is
    responsible for deleting them. On failure the directory is removed
    before the exception propagates.

    Raises:
        FileNotFoundError: yt-dlp finished but produced no MP3 file.
        Exception: whatever ``yt_dlp`` raises when the download or the
            audio-extraction post-processing step fails.
    """
    tmpdir = tempfile.mkdtemp()
    outfile = os.path.join(tmpdir, f"{video_id}.mp3")
    ydl_opts = {
        "format": "bestaudio/best",
        # yt-dlp expands %(id)s / %(ext)s itself; this is not an f-string.
        "outtmpl": os.path.join(tmpdir, "%(id)s.%(ext)s"),
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "192",
            }
        ],
        "quiet": True,
        "noprogress": True,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([f"https://www.youtube.com/watch?v={video_id}"])

        if not os.path.isfile(outfile):
            # The output name is built from the id yt-dlp reports, which may
            # not be spelled exactly like the id we were given; accept any
            # MP3 that landed in our private directory.
            produced = sorted(
                name for name in os.listdir(tmpdir) if name.endswith(".mp3")
            )
            if not produced:
                raise FileNotFoundError(
                    f"yt-dlp produced no MP3 file for video {video_id!r}"
                )
            outfile = os.path.join(tmpdir, produced[0])
    except Exception:
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    return outfile
def transcribe_with_openai(mp3_path: str) -> str:
    """Send the audio file at *mp3_path* to OpenAI and return the transcript.

    The file is opened in binary mode and uploaded with the module-level
    ``OPENAI_API_KEY``; the API response is converted with ``str()``.
    Errors from opening the file or from the API call propagate to the caller.
    """
    openai_client = OpenAI(api_key=OPENAI_API_KEY)
    request_options = {
        "model": "gpt-4o-transcribe",
        "response_format": "text",
    }
    with open(mp3_path, "rb") as audio_file:
        result = openai_client.audio.transcriptions.create(
            file=audio_file,
            **request_options,
        )
    return str(result)
def openai_translate(text: str, target_lang_code: str) -> str:
    """Translate *text* into the language identified by *target_lang_code*.

    The language code is expanded to an English display name for the prompt
    (falling back to the raw code if it cannot be resolved), and the
    transcript is sent to the chat-completions API with a system prompt that
    asks for the translation only.

    Returns:
        The translated text with surrounding whitespace removed. An empty or
        whitespace-only *text* yields ``""`` without calling the API, and a
        response with no message content also yields ``""``.

    Raises:
        Exception: any error raised by the OpenAI client propagates.
    """
    # Nothing to translate: skip the request rather than asking the model to
    # "translate" an empty transcript.
    if not text or not text.strip():
        return ""

    client = OpenAI(api_key=OPENAI_API_KEY)
    try:
        lang_name = Language.make(target_lang_code).display_name("en")
    except Exception:
        lang_name = target_lang_code

    sys_prompt = (
        "You are a professional media translator. Translate the user's transcript into the target language. "
        "Keep meaning, style, and names. Remove timestamps. Output ONLY the translation without commentary."
    )
    msg = [
        {"role": "system", "content": sys_prompt},
        {
            "role": "user",
            "content": f"Target language: {lang_name} ({target_lang_code}).\n\nTranscript to translate:\n{text}",
        },
    ]
    resp = client.chat.completions.create(model="gpt-4o-mini", messages=msg, temperature=0.2)

    # ``content`` can be None; guard so the caller gets "" instead of an
    # AttributeError from ``None.strip()``.
    content = resp.choices[0].message.content
    return (content or "").strip()
# Maps a primary language subtag (e.g. "en", "pt") to the edge-tts voice name
# used for speech synthesis in that language.
# "ha" shares the "en" voice — presumably because no dedicated voice was
# available for it; confirm before changing.
VOICE_MAP = dict(
    en="en-US-JennyNeural",
    es="es-ES-ElviraNeural",
    fr="fr-FR-DeniseNeural",
    de="de-DE-KatjaNeural",
    hi="hi-IN-SwaraNeural",
    ar="ar-EG-SalmaNeural",
    pt="pt-BR-FranciscaNeural",
    ru="ru-RU-SvetlanaNeural",
    ja="ja-JP-NanamiNeural",
    ko="ko-KR-SunHiNeural",
    zh="zh-CN-XiaoxiaoNeural",
    ha="en-US-JennyNeural",
)
async def synthesize_edge_tts(text: str, lang_code: str) -> str:
    """Synthesize *text* to an MP3 file with edge-tts and return the file path.

    The voice is looked up in ``VOICE_MAP`` by the primary subtag of
    *lang_code* (case-insensitive; ``-`` and ``_`` separators are both
    accepted). Unknown or missing codes fall back to the ``"en"`` voice.

    On success the caller owns the returned temporary file. If synthesis
    fails or is cancelled, the partially written file is deleted before the
    exception propagates.
    """
    primary = (lang_code or "en").replace("_", "-").split("-")[0].lower()
    voice = VOICE_MAP.get(primary, VOICE_MAP["en"])

    # Reserve a unique path, then close the handle so the file can be
    # reopened for writing below.
    tmp = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
    tmp.close()
    try:
        communicate = edge_tts.Communicate(text, voice=voice)
        with open(tmp.name, "wb") as f:
            async for chunk in communicate.stream():
                # The stream also carries non-audio chunks; keep audio only.
                if chunk["type"] == "audio":
                    f.write(chunk["data"])
    except BaseException:
        # BaseException so that task cancellation also cleans up.
        try:
            os.remove(tmp.name)
        except OSError:
            pass
        raise
    return tmp.name
def _run_coroutine_blocking(make_coro):
    """Run the coroutine returned by ``make_coro()`` to completion, synchronously.

    If no event loop is running in the current thread, ``asyncio.run`` is used
    directly. If one is running (where ``asyncio.run`` would raise), the
    coroutine is executed with ``asyncio.run`` in a short-lived worker thread
    instead. The coroutine is only created once it is certain to be awaited.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(make_coro())
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(make_coro())).result()


def process(url_or_id: str, target_lang: str, do_tts: bool):
    """Gradio handler: fetch a transcript, translate it and optionally speak it.

    Steps:
      1. Resolve the video id from *url_or_id*.
      2. Use YouTube captions; if they are missing or shorter than 20
         characters and an API key is configured, download the audio and
         transcribe it instead.
      3. Translate into *target_lang* (skipped, with a status note, when no
         API key is configured).
      4. If *do_tts* is set, synthesize the translation to an MP3.

    Returns:
        A 3-tuple ``(text_update, audio_update, status_message)`` matching the
        ``[out_text, out_audio, status]`` outputs. On failure of steps 1-3 the
        text is empty and the status holds the error. A speech-synthesis
        failure keeps the translated text and is reported in the status.
    """

    def fail(message: str):
        return (
            gr.update(value="", visible=True),
            gr.update(value=None, visible=do_tts),
            message,
        )

    # No language selected: fall back to the dropdown's default.
    target_lang = target_lang or "en"

    vid = extract_video_id(url_or_id)
    if not vid:
        return fail("Invalid YouTube ID/URL.")

    text, _src_lang = get_transcript_text(vid)
    if (not text or len(text) < 20) and OPENAI_API_KEY:
        mp3 = None
        try:
            mp3 = download_audio(vid)
            text = transcribe_with_openai(mp3)
        except Exception as e:
            return fail(f"Transcription failed: {e}")
        finally:
            # The audio is only needed for transcription. Remove the file and
            # then its directory; rmdir only succeeds on an empty directory,
            # so nothing unrelated can be deleted.
            if mp3:
                for remove, target in (
                    (os.remove, mp3),
                    (os.rmdir, os.path.dirname(mp3)),
                ):
                    try:
                        remove(target)
                    except OSError:
                        pass
    elif not text:
        return fail("No captions found and no OPENAI_API_KEY provided.")

    if not text or not text.strip():
        return fail("Transcription returned no text.")

    try:
        translated = openai_translate(text, target_lang) if OPENAI_API_KEY else text
    except Exception as e:
        return fail(f"Translation failed: {e}")
    warn = "" if OPENAI_API_KEY else "No translation (API key missing)."

    audio_path = None
    if do_tts and translated:
        try:
            audio_path = _run_coroutine_blocking(
                lambda: synthesize_edge_tts(translated, target_lang)
            )
        except Exception as e:
            # Keep the finished translation; only the audio is missing.
            warn = f"{warn} Speech synthesis failed: {e}".strip()

    return (
        gr.update(value=translated, visible=True),
        gr.update(value=audio_path, visible=do_tts),
        warn,
    )
def _blank_outputs():
    """Return empty values for the text, audio and status outputs."""
    return "", None, ""


# Two-column page: request inputs on the left, results on the right.
with gr.Blocks(theme=gr.themes.Soft(), title="YouTube Translator & Speaker") as demo:
    gr.Markdown("# YouTube Translator and Speaker")

    with gr.Row():
        # Left column: what to translate, into which language, and actions.
        with gr.Column(scale=1):
            video_in = gr.Textbox(label="YouTube URL or Video ID")
            lang = gr.Dropdown(
                label="Target Language",
                choices=["en", "es", "fr", "de", "hi", "ar", "pt", "ru", "ja", "ko", "zh", "ha"],
                value="en",
            )
            do_tts = gr.Checkbox(label="Generate Speech", value=True)
            submit = gr.Button("Submit", variant="primary")
            clear = gr.Button("Clear")

        # Right column: translated text, synthesized audio and status line.
        with gr.Column(scale=1):
            out_text = gr.Textbox(label="Translated Text", lines=14)
            out_audio = gr.Audio(label="Speech (MP3)", type="filepath")
            status = gr.Markdown(visible=True)

    result_components = [out_text, out_audio, status]
    submit.click(
        fn=process,
        inputs=[video_in, lang, do_tts],
        outputs=result_components,
    )
    # Clear resets the three result components; the inputs are left as they are.
    clear.click(fn=_blank_outputs, outputs=result_components)


if __name__ == "__main__":
    # Listen on all interfaces, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)