File size: 7,276 Bytes
8f3321a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import os
import re
import asyncio
import tempfile
from typing import Tuple, Optional

import gradio as gr
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound, CouldNotRetrieveTranscript
import yt_dlp
from openai import OpenAI
from langcodes import Language
import edge_tts

# OpenAI API key read from the environment; an empty string when the variable
# is unset. The code below treats an empty key as "OpenAI features disabled".
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")

# ---------------------------
# Helpers
# ---------------------------

def extract_video_id(url_or_id: str) -> Optional[str]:
    """Return the 11-character YouTube video ID found in *url_or_id*.

    Accepts either a bare video ID or a URL in one of the common YouTube
    forms (``watch?v=``, ``/v/``, ``youtu.be/``, ``/embed/``, ``/shorts/``,
    ``/live/``). Leading and trailing whitespace is ignored, so values pasted
    with a stray space or newline are still recognised.

    Args:
        url_or_id: A YouTube URL or a bare video ID.

    Returns:
        The video ID, or ``None`` when the input is empty or contains no ID.
    """
    if not url_or_id:
        return None
    candidate = url_or_id.strip()
    # A bare ID is exactly 11 characters from the URL-safe base64 alphabet.
    if re.fullmatch(r"[A-Za-z0-9_-]{11}", candidate):
        return candidate
    match = re.search(
        r"(?:v=|/v/|youtu\.be/|/embed/|/shorts/|/live/)([A-Za-z0-9_-]{11})",
        candidate,
    )
    return match.group(1) if match else None


def get_transcript_text(video_id: str) -> Tuple[Optional[str], Optional[str]]:
    """Fetch the YouTube captions for *video_id* as plain text.

    A manually created transcript is preferred; an auto-generated one is used
    only when no manual transcript is listed.

    Args:
        video_id: The YouTube video ID to look up.

    Returns:
        ``(text, language_code)`` on success, where *text* is the non-empty
        caption snippets joined with newlines and *language_code* is the
        transcript's language code (``""`` if it has none).
        ``(None, None)`` when no transcript is listed or anything goes wrong
        while retrieving it; this lookup is deliberately best-effort.
    """
    try:
        available = list(YouTubeTranscriptApi.list_transcripts(video_id))
        if not available:
            return None, None
        # min() returns the first minimal element, so this picks the first
        # manual transcript (is_generated is falsy) and otherwise the first
        # generated one, using only the public ``is_generated`` attribute
        # instead of the listing's private dictionaries.
        chosen = min(available, key=lambda t: bool(t.is_generated))

        lines = []
        for snippet in chosen.fetch():
            # NOTE(review): snippets are dicts in the API version this was
            # written for; newer youtube-transcript-api releases appear to
            # return objects with a ``text`` attribute. Accept both; confirm
            # against the pinned version.
            if isinstance(snippet, dict):
                raw = snippet.get("text")
            else:
                raw = getattr(snippet, "text", None)
            if raw:
                lines.append(raw.strip())
        return "\n".join(lines), chosen.language_code or ""
    except Exception:
        # Best-effort boundary: library errors (transcripts disabled, none
        # found, retrieval failure) and anything unexpected all mean
        # "no captions available" to the caller.
        return None, None


def download_audio(video_id: str) -> str:
    """Download the audio track of a YouTube video as an MP3 file.

    The audio is fetched with yt-dlp into a freshly created temporary
    directory and converted to MP3 by the ``FFmpegExtractAudio``
    post-processor.

    Args:
        video_id: YouTube video ID used to build the watch URL.

    Returns:
        The path ``<tmpdir>/<video_id>.mp3``. The caller owns the file and its
        directory and is responsible for deleting them.

    Raises:
        Exception: Whatever yt-dlp raises when the download or conversion
            fails. The temporary directory is removed before re-raising so a
            failed download leaves nothing behind.
    """
    import shutil  # local import: only used on the failure path

    tmpdir = tempfile.mkdtemp()
    outfile = os.path.join(tmpdir, f"{video_id}.mp3")
    ydl_opts = {
        "format": "bestaudio/best",
        # yt-dlp output template; the placeholders are filled in by yt-dlp.
        "outtmpl": os.path.join(tmpdir, "%(id)s.%(ext)s"),
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "192",
            }
        ],
        "quiet": True,
        "noprogress": True,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([f"https://www.youtube.com/watch?v={video_id}"])
    except Exception:
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    return outfile


def transcribe_with_openai(mp3_path: str) -> str:
    """Transcribe the audio file at *mp3_path* with OpenAI and return the text.

    The file is opened in binary mode and sent to the ``gpt-4o-transcribe``
    model with ``response_format="text"``; the response is converted to
    ``str`` before being returned.
    """
    api = OpenAI(api_key=OPENAI_API_KEY)
    request = {"model": "gpt-4o-transcribe", "response_format": "text"}
    with open(mp3_path, "rb") as audio_file:
        result = api.audio.transcriptions.create(file=audio_file, **request)
    return str(result)


def openai_translate(text: str, target_lang_code: str) -> str:
    """Translate *text* into the language identified by *target_lang_code*.

    The request goes to the ``gpt-4o-mini`` chat model with a system prompt
    that asks for the translation only, without commentary.

    Args:
        text: The transcript to translate.
        target_lang_code: Language code of the target language (e.g. ``"es"``).

    Returns:
        The translated text with surrounding whitespace removed, or ``""``
        when the model returns no text content.

    Raises:
        Exception: Errors raised by the OpenAI client propagate to the caller.
    """
    client = OpenAI(api_key=OPENAI_API_KEY)
    try:
        lang_name = Language.make(target_lang_code).display_name("en")
    except Exception:
        # Best-effort: the readable name only enriches the prompt, so fall
        # back to the raw code rather than failing the translation.
        lang_name = target_lang_code
    sys_prompt = (
        "You are a professional media translator. Translate the user's transcript into the target language. "
        "Keep meaning, style, and names. Remove timestamps. Output ONLY the translation without commentary."
    )
    msg = [
        {"role": "system", "content": sys_prompt},
        {
            "role": "user",
            "content": f"Target language: {lang_name} ({target_lang_code}).\n\nTranscript to translate:\n{text}",
        },
    ]
    resp = client.chat.completions.create(model="gpt-4o-mini", messages=msg, temperature=0.2)
    # The message content is optional in the API response; guard against None
    # so the caller gets an empty translation instead of an AttributeError.
    content = resp.choices[0].message.content
    return (content or "").strip()


# Edge TTS voice name for each supported base language code (the part of the
# language tag before any "-"). synthesize_edge_tts falls back to the "en"
# entry for codes that are not listed here.
VOICE_MAP = {
    "en": "en-US-JennyNeural",
    "es": "es-ES-ElviraNeural",
    "fr": "fr-FR-DeniseNeural",
    "de": "de-DE-KatjaNeural",
    "hi": "hi-IN-SwaraNeural",
    "ar": "ar-EG-SalmaNeural",
    "pt": "pt-BR-FranciscaNeural",
    "ru": "ru-RU-SvetlanaNeural",
    "ja": "ja-JP-NanamiNeural",
    "ko": "ko-KR-SunHiNeural",
    "zh": "zh-CN-XiaoxiaoNeural",
    # NOTE(review): "ha" reuses the en-US voice, presumably because no
    # dedicated voice was available; confirm this is intended.
    "ha": "en-US-JennyNeural",
}


async def synthesize_edge_tts(text: str, lang_code: str) -> str:
    """Synthesize *text* to an MP3 file with Edge TTS and return its path.

    The voice is looked up in ``VOICE_MAP`` by the base language of
    *lang_code* (case-insensitive; region suffixes such as ``-BR`` or ``_BR``
    are ignored), falling back to the ``"en"`` voice for unknown languages.

    Args:
        text: The text to speak.
        lang_code: Language code or tag selecting the voice.

    Returns:
        Path of a newly created temporary ``.mp3`` file containing the audio.
        The file is not deleted automatically; the caller owns it.

    Raises:
        Exception: Errors from edge-tts propagate to the caller. The temporary
            file is removed first so failed runs do not leave files behind.
    """
    base_lang = lang_code.replace("_", "-").split("-")[0].lower()
    voice = VOICE_MAP.get(base_lang, VOICE_MAP["en"])
    out = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
    try:
        with out:
            async for chunk in edge_tts.Communicate(text, voice=voice).stream():
                # The stream also carries non-audio chunks; keep audio only.
                if chunk["type"] == "audio":
                    out.write(chunk["data"])
    except BaseException:
        # BaseException so task cancellation is cleaned up as well. The file
        # is already closed here because the ``with`` block has exited.
        try:
            os.remove(out.name)
        except OSError:
            pass
        raise
    return out.name


def _failure(message: str, do_tts: bool):
    """Build the (text, audio, status) output triple for a run that failed."""
    return gr.update(value="", visible=True), gr.update(value=None, visible=do_tts), message


def _run_tts_blocking(text: str, lang_code: str) -> str:
    """Run synthesize_edge_tts to completion from synchronous code."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread, so asyncio.run is safe.
        return asyncio.run(synthesize_edge_tts(text, lang_code))
    # A loop is already running in this thread, where neither asyncio.run nor
    # run_until_complete can be used; run the coroutine on a worker thread.
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, synthesize_edge_tts(text, lang_code)).result()


def process(url_or_id: str, target_lang: str, do_tts: bool):
    """Run the full pipeline for one video: captions, translation, speech.

    Steps:
      1. Extract the video ID from *url_or_id*.
      2. Get the caption text; when it is missing or shorter than 20
         characters and an OpenAI key is configured, download the audio and
         transcribe it instead.
      3. Translate the text into *target_lang* (skipped without an API key,
         in which case the untranslated text is returned with a notice).
      4. When *do_tts* is set, synthesize the translated text to speech.

    Args:
        url_or_id: YouTube URL or bare video ID.
        target_lang: Language code to translate into and to speak.
        do_tts: Whether to generate speech; also controls whether the audio
            output component is visible.

    Returns:
        A triple ``(text_update, audio_update, status_message)`` for the
        text box, audio player and status area. Failures are reported through
        the status message rather than raised. A speech-synthesis failure
        still returns the translated text.
    """
    vid = extract_video_id(url_or_id)
    if not vid:
        return _failure("Invalid YouTube ID/URL.", do_tts)

    text, _src_lang = get_transcript_text(vid)
    if (not text or len(text) < 20) and OPENAI_API_KEY:
        mp3 = None
        try:
            mp3 = download_audio(vid)
            text = transcribe_with_openai(mp3)
        except Exception as e:
            return _failure(f"Transcription failed: {e}", do_tts)
        finally:
            if mp3:
                # Best-effort cleanup of the downloaded audio and its
                # directory; os.rmdir only succeeds on an empty directory, so
                # nothing else can be deleted by accident.
                for remove, path in ((os.remove, mp3), (os.rmdir, os.path.dirname(mp3))):
                    try:
                        remove(path)
                    except OSError:
                        pass
    elif not text:
        return _failure("No captions found and no OPENAI_API_KEY provided.", do_tts)

    if OPENAI_API_KEY:
        try:
            translated = openai_translate(text, target_lang)
        except Exception as e:
            return _failure(f"Translation failed: {e}", do_tts)
        status = ""
    else:
        translated = text
        status = "No translation (API key missing)."

    audio_path = None
    if do_tts and translated:
        try:
            audio_path = _run_tts_blocking(translated, target_lang)
        except Exception as e:
            # Keep the translation; only the audio is missing.
            status = f"{status} Speech synthesis failed: {e}".strip()
    return gr.update(value=translated, visible=True), gr.update(value=audio_path, visible=do_tts), status


# Gradio UI: one row with two columns, inputs and actions first, outputs second.
with gr.Blocks(theme=gr.themes.Soft(), title="YouTube Translator & Speaker") as demo:
    gr.Markdown("# YouTube Translator and Speaker")
    with gr.Row():
        # First column: video reference, target language, TTS toggle, buttons.
        with gr.Column(scale=1):
            video_in = gr.Textbox(label="YouTube URL or Video ID")
            lang = gr.Dropdown(label="Target Language", choices=["en","es","fr","de","hi","ar","pt","ru","ja","ko","zh","ha"], value="en")
            do_tts = gr.Checkbox(label="Generate Speech", value=True)
            submit = gr.Button("Submit", variant="primary")
            clear = gr.Button("Clear")
        # Second column: translated text, synthesized audio, status message.
        with gr.Column(scale=1):
            out_text = gr.Textbox(label="Translated Text", lines=14)
            out_audio = gr.Audio(label="Speech (MP3)", type="filepath")
            status = gr.Markdown(visible=True)

    # Submit runs the pipeline; its three return values map onto the outputs.
    submit.click(fn=process, inputs=[video_in, lang, do_tts], outputs=[out_text, out_audio, status])
    # Clear resets the three outputs only; the input fields are left as they are.
    clear.click(lambda: ("", None, ""), outputs=[out_text, out_audio, status])

if __name__ == "__main__":
    # Bind to all network interfaces on port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)