| """ |
| PlotWeaver — Live Commentary Translation Platform (Single File) |
| ================================================================ |
| Two engines: Qwen Omni | YourVoic API (with NLLB MT) |
| """ |
|
|
| import os, io, re, time, base64, struct, shutil, subprocess, tempfile, logging |
| import torch, numpy as np, requests, soundfile as sf, gradio as gr |
|
|
# Configure the root logger once at import time: INFO level, with a timestamp
# and the level name in front of every message.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# Module-level logger used by the synthesis and dubbing helpers in this file.
logger = logging.getLogger(__name__)
|
|
|
|
| |
| |
| |
|
|
| |
# Voice names offered for languages whose "tts_engine" is "qwen"
# (this exact list is what get_voices_for_language() returns for them).
QWEN_VOICES = [
    "Cherry", "Serena", "Ethan", "Chelsie", "Momo", "Vivian", "Moon", "Maia",
    "Kai", "Nofish", "Bella", "Jennifer", "Ryan", "Katerina", "Aiden",
    "Eldric Sage", "Mia", "Mochi", "Bellona", "Vincent", "Bunny", "Neil",
    "Elias", "Arthur", "Seren", "Bodega", "Sonrisa", "Alek", "Dolce",
    "Sohee", "Ono Anna", "Lenn", "Emilien", "Andre",
]
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
def _language_entry(nllb, yourvoic_lang, voices, engine, qwen_code=None, qwen_name=None):
    """Build one LANGUAGES record; every record gets its own voice list."""
    return {
        "nllb": nllb,
        "yourvoic_lang": yourvoic_lang,
        "yourvoic_voices": list(voices),
        "tts_engine": engine,
        "qwen_code": qwen_code,
        "qwen_name": qwen_name,
    }


_PETER_KYLIE = ("Peter", "Kylie")
_PETER_ONLY = ("Peter",)

# Languages synthesized by Qwen Omni.
# Columns: display name, NLLB code, YourVoic locale, YourVoic voices,
#          Qwen language code, language name used in the Qwen prompt.
_QWEN_LANGUAGE_ROWS = (
    ("English", "eng_Latn", "en-US", _PETER_KYLIE, "en", "English"),
    ("Chinese (Mandarin)", "zho_Hans", "zh-CN", _PETER_KYLIE, "zh", "Mandarin Chinese"),
    ("Japanese", "jpn_Jpan", "ja-JP", _PETER_KYLIE, "ja", "Japanese"),
    ("Korean", "kor_Hang", "ko-KR", _PETER_KYLIE, "ko", "Korean"),
    ("German", "deu_Latn", "de-DE", _PETER_KYLIE, "de", "German"),
    ("French", "fra_Latn", "fr-FR", _PETER_KYLIE, "fr", "French"),
    ("Russian", "rus_Cyrl", "ru-RU", _PETER_KYLIE, "ru", "Russian"),
    ("Portuguese", "por_Latn", "pt-BR", _PETER_KYLIE, "pt", "Portuguese"),
    ("Spanish", "spa_Latn", "es-ES", _PETER_KYLIE, "es", "Spanish"),
    ("Italian", "ita_Latn", "it-IT", _PETER_KYLIE, "it", "Italian"),
    ("Arabic", "arb_Arab", "ar-SA", _PETER_ONLY, "ar", "Modern Standard Arabic"),
)

# Languages synthesized by the YourVoic API (no Qwen code or prompt name).
# Columns: display name, NLLB code, YourVoic locale, YourVoic voices.
_YOURVOIC_LANGUAGE_ROWS = (
    # African
    ("Swahili", "swh_Latn", "sw-KE", _PETER_ONLY),
    ("Amharic", "amh_Ethi", "am-ET", _PETER_ONLY),
    ("Afrikaans", "afr_Latn", "af-ZA", _PETER_ONLY),
    # South Asian
    ("Hindi", "hin_Deva", "hi-IN", ("Rahul", "Deepika", "Aditya")),
    ("Bengali", "ben_Beng", "bn-IN", ("Sneha", "Aryan")),
    ("Tamil", "tam_Taml", "ta-IN", ("Priya", "Kumar")),
    ("Telugu", "tel_Telu", "te-IN", ("Arjun", "Lakshmi")),
    ("Marathi", "mar_Deva", "mr-IN", ("Anjali", "Rohan")),
    ("Urdu", "urd_Arab", "ur-PK", _PETER_ONLY),
    ("Nepali", "npi_Deva", "ne-NP", _PETER_ONLY),
    # Southeast Asian
    ("Indonesian", "ind_Latn", "id-ID", _PETER_ONLY),
    ("Vietnamese", "vie_Latn", "vi-VN", _PETER_ONLY),
    ("Thai", "tha_Thai", "th-TH", _PETER_ONLY),
    ("Malay", "zsm_Latn", "ms-MY", _PETER_ONLY),
    ("Filipino", "tgl_Latn", "fil-PH", _PETER_ONLY),
    # European
    ("Dutch", "nld_Latn", "nl-NL", _PETER_ONLY),
    ("Polish", "pol_Latn", "pl-PL", _PETER_ONLY),
    ("Turkish", "tur_Latn", "tr-TR", _PETER_ONLY),
    ("Swedish", "swe_Latn", "sv-SE", _PETER_ONLY),
    ("Romanian", "ron_Latn", "ro-RO", _PETER_ONLY),
    ("Greek", "ell_Grek", "el-GR", _PETER_ONLY),
    ("Ukrainian", "ukr_Cyrl", "uk-UA", _PETER_ONLY),
    ("Finnish", "fin_Latn", "fi-FI", _PETER_ONLY),
    ("Danish", "dan_Latn", "da-DK", _PETER_ONLY),
    ("Norwegian", "nob_Latn", "nb-NO", _PETER_ONLY),
    # Middle Eastern
    ("Persian", "pes_Arab", "fa-IR", _PETER_ONLY),
    ("Hebrew", "heb_Hebr", "he-IL", _PETER_ONLY),
)

# Per-language configuration keyed by display name. Each value carries:
#   nllb            - NLLB language code used as the translation target
#   yourvoic_lang   - locale string sent to the YourVoic API
#   yourvoic_voices - YourVoic voice names (first one is the default)
#   tts_engine      - "qwen" or "yourvoic"
#   qwen_code/name  - Qwen language code and prompt name, None for YourVoic-only
LANGUAGES = {}
for _name, _nllb, _locale, _voices, _code, _prompt_name in _QWEN_LANGUAGE_ROWS:
    LANGUAGES[_name] = _language_entry(_nllb, _locale, _voices, "qwen", _code, _prompt_name)
for _name, _nllb, _locale, _voices in _YOURVOIC_LANGUAGE_ROWS:
    LANGUAGES[_name] = _language_entry(_nllb, _locale, _voices, "yourvoic")
|
|
|
|
| |
# Display grouping of target languages. Every name listed here must be a key
# of LANGUAGES, otherwise the lookup in the pipelines reports the language as
# "not configured".
# NOTE(review): "English" is configured in LANGUAGES but appears in no group,
# presumably because English is the source language - confirm.
LANGUAGE_GROUPS = {
    "Global Languages": [
        # The LANGUAGES key is "Chinese (Mandarin)", not "Mandarin".
        "Spanish", "French", "German", "Chinese (Mandarin)", "Italian",
        "Japanese", "Portuguese", "Hindi", "Arabic", "Korean", "Russian",
    ],
    "African Languages": [
        "Swahili", "Amharic", "Afrikaans",
    ],
    "South Asian": [
        "Bengali", "Tamil", "Telugu", "Marathi", "Urdu", "Nepali",
    ],
    "Southeast Asian": [
        "Indonesian", "Vietnamese", "Thai", "Malay", "Filipino",
    ],
    "European": [
        "Dutch", "Polish", "Turkish", "Swedish", "Romanian",
        "Greek", "Ukrainian", "Finnish", "Danish", "Norwegian",
    ],
    "Middle Eastern": [
        "Persian", "Hebrew",
    ],
}
|
|
| |
# Every configured language name, alphabetically sorted.
ALL_LANGUAGE_NAMES = sorted(LANGUAGES)

# Languages whose speech is synthesized through the YourVoic API.
YOURVOIC_LANGUAGES = [name for name, cfg in LANGUAGES.items() if cfg["tts_engine"] == "yourvoic"]

# Languages whose speech is synthesized by Qwen Omni. This slot previously
# repeated the YOURVOIC_LANGUAGES assignment verbatim.
# NOTE(review): the Qwen counterpart is assumed to be what was intended - confirm.
QWEN_LANGUAGES = [name for name, cfg in LANGUAGES.items() if cfg["tts_engine"] == "qwen"]
|
|
|
|
| |
| |
| |
|
|
# Use the GPU in half precision when CUDA is available, otherwise the CPU in
# full precision.
if torch.cuda.is_available():
    DEVICE, TORCH_DTYPE = "cuda", torch.float16
else:
    DEVICE, TORCH_DTYPE = "cpu", torch.float32

# Model handles, filled in by load_models().
asr_pipe = mt_tokenizer = mt_model = None
|
|
|
|
|
|
def load_models():
    """Load the ASR pipeline and the MT model/tokenizer into module globals.

    Populates ``asr_pipe``, ``mt_tokenizer`` and ``mt_model`` on ``DEVICE``
    with ``TORCH_DTYPE`` and prints progress plus a short diagnostics report
    (device placement and whether the two API keys are set) to stdout.
    """
    global asr_pipe, mt_tokenizer, mt_model
    from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
    from transformers import pipeline as hf_pipeline

    asr_model_id = "PlotweaverAI/whisper-small-de-en"
    mt_model_id = "PlotweaverAI/nllb-200-distilled-600M-african-6lang"

    print(f"Device: {DEVICE} | Dtype: {TORCH_DTYPE}")
    print("Loading models...")

    # Speech recognition.
    print(f" Loading ASR: {asr_model_id}")
    asr_pipe = hf_pipeline(
        "automatic-speech-recognition",
        model=asr_model_id,
        device=DEVICE,
        torch_dtype=TORCH_DTYPE,
    )
    print(" ASR loaded")

    # Machine translation; the source side is always English.
    print(f" Loading MT: {mt_model_id}")
    mt_tokenizer = AutoTokenizer.from_pretrained(mt_model_id)
    mt_model = AutoModelForSeq2SeqLM.from_pretrained(mt_model_id, torch_dtype=TORCH_DTYPE)
    mt_model = mt_model.to(DEVICE)
    mt_tokenizer.src_lang = "eng_Latn"
    print(" MT loaded")

    # Diagnostics report.
    cuda_ok = torch.cuda.is_available()
    print("\n=== Device diagnostics ===")
    print(f"CUDA available: {cuda_ok}")
    if cuda_ok:
        print(f"CUDA device: {torch.cuda.get_device_name(0)}")
    print(f"ASR on: {next(asr_pipe.model.parameters()).device}")
    print(f"MT on: {next(mt_model.parameters()).device}")
    for label, env_var in (
        ("YourVoic API key", "YOURVOIC_API_KEY"),
        ("Dashscope key", "DASHSCOPE_API_KEY"),
    ):
        status = "set" if os.environ.get(env_var) else "NOT SET"
        print(f"{label}: {status}")
    print("==========================\n")
    print("All models loaded!")
|
|
|
|
| |
|
|
def split_into_sentences(text):
    """Split raw ASR text into individual sentences.

    The first letter of every '. '-separated piece is uppercased while the
    rest of the piece keeps its casing, so names and acronyms reach the
    translator intact.

    Args:
        text: Raw transcript; may lack punctuation entirely.

    Returns:
        A list of sentence strings. Punctuated text is split after ``.``,
        ``!`` or ``?``. Unpunctuated text is cut into chunks of at most 12
        words, each terminated with a period. Blank input gives ``[]``.
    """
    text = text.strip()
    if not text:
        return []

    # Uppercase only the first character. str.capitalize() would also
    # lowercase everything after it ("I met John" -> "I met john").
    pieces = (piece.strip() for piece in text.split('. '))
    text = '. '.join(piece[0].upper() + piece[1:] for piece in pieces if piece)

    if re.search(r'[.!?]', text):
        return [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]

    # No sentence punctuation at all: fall back to fixed-size word chunks.
    # No chunk can already end in punctuation here, so always add a period.
    max_words = 12
    words = text.split()
    sentences = []
    for start in range(0, len(words), max_words):
        chunk = ' '.join(words[start:start + max_words]) + '.'
        sentences.append(chunk[0].upper() + chunk[1:])
    return sentences
|
|
|
|
| |
|
|
def transcribe(audio_array, sample_rate=16000):
    """Transcribe English speech to text with the loaded ASR model.

    Args:
        audio_array: Audio samples (assumes a mono 1-D float array - TODO confirm).
        sample_rate: Sampling rate of ``audio_array``; anything other than
            16000 is resampled to 16 kHz first.

    Returns:
        The stripped transcript, or ``""`` when fewer than 1600 samples are given.
    """
    if len(audio_array) < 1600:
        return ""

    # Measured at the incoming rate, before any resampling.
    clip_seconds = len(audio_array) / sample_rate

    if sample_rate != 16000:
        import torchaudio.functional as F_audio
        waveform = torch.from_numpy(audio_array).float()
        audio_array = F_audio.resample(waveform, sample_rate, 16000).numpy()
        sample_rate = 16000

    # Clips of up to 28 s go straight through the pipeline.
    if clip_seconds <= 28:
        short_result = asr_pipe(
            {"raw": audio_array, "sampling_rate": sample_rate},
            return_timestamps=False,
        )
        return short_result["text"].strip()

    # Longer clips: extract untruncated features and call generate() directly.
    features = asr_pipe.feature_extractor(
        audio_array, sampling_rate=16000, return_tensors="pt",
        truncation=False, padding="longest", return_attention_mask=True,
    )
    gen_kwargs = dict(return_timestamps=True, language="en", task="transcribe")
    if "attention_mask" in features:
        gen_kwargs["attention_mask"] = features.attention_mask.to(DEVICE)
    input_features = features.input_features.to(DEVICE, dtype=TORCH_DTYPE)

    with torch.no_grad():
        token_ids = asr_pipe.model.generate(input_features, **gen_kwargs)

    decoded = asr_pipe.tokenizer.batch_decode(token_ids, skip_special_tokens=True)
    return decoded[0].strip()
|
|
|
|
| |
|
|
def translate_sentence(text, target_nllb_code, fast=True, max_length=256):
    """Translate one English sentence into the language named by an NLLB code.

    Args:
        text: Sentence to translate.
        target_nllb_code: NLLB language token, e.g. ``"fra_Latn"``.
        fast: Greedy decoding capped at 128 tokens when true; otherwise
            4-beam search with early stopping, capped at ``max_length``.
        max_length: Output cap used only when ``fast`` is false.

    Returns:
        The decoded translation without special tokens.
    """
    if fast:
        decoding = {"max_length": 128, "num_beams": 1, "do_sample": False}
    else:
        decoding = {"max_length": max_length, "num_beams": 4, "early_stopping": True}

    encoded = mt_tokenizer(text, return_tensors="pt", truncation=True).to(DEVICE)
    with torch.no_grad():
        generated = mt_model.generate(
            **encoded,
            # Forcing the first generated token selects the output language.
            forced_bos_token_id=mt_tokenizer.convert_tokens_to_ids(target_nllb_code),
            repetition_penalty=1.5,
            no_repeat_ngram_size=3,
            **decoding,
        )
    return mt_tokenizer.decode(generated[0], skip_special_tokens=True)
|
|
|
|
def translate_text(text, target_nllb_code, fast=True):
    """Translate a whole transcript one sentence at a time.

    Returns:
        ``(joined_translation, source_sentences, translated_sentences)``;
        ``("", [], [])`` when the text yields no sentences.
    """
    source_sentences = split_into_sentences(text)
    if not source_sentences:
        return "", [], []
    translated = [
        translate_sentence(sentence, target_nllb_code, fast=fast)
        for sentence in source_sentences
    ]
    return ' '.join(translated), source_sentences, translated
|
|
|
|
| |
|
|
def extract_audio_from_video(video_path, output_path, target_sr=16000):
    """Write the video's audio track to ``output_path`` as mono 16-bit PCM WAV.

    Args:
        video_path: Input media file.
        output_path: Destination WAV path (overwritten if present).
        target_sr: Output sampling rate in Hz.

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    audio_opts = ["-vn", "-acodec", "pcm_s16le", "-ar", str(target_sr), "-ac", "1"]
    proc = subprocess.run(
        ["ffmpeg", "-y", "-i", video_path, *audio_opts, output_path],
        capture_output=True,
        text=True,
    )
    if proc.returncode == 0:
        return output_path
    raise RuntimeError(f"ffmpeg extraction failed: {proc.stderr[:200]}")
|
|
|
|
def get_media_duration(path):
    """Return the container duration of a media file in seconds, via ffprobe.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status.
    """
    probe = subprocess.run(
        [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1", path,
        ],
        capture_output=True,
        text=True,
    )
    if probe.returncode:
        raise RuntimeError(f"ffprobe failed: {probe.stderr[:200]}")
    return float(probe.stdout.strip())
|
|
|
|
def stretch_audio_to_duration(input_path, output_path, target_duration_s):
    """Time-stretch or compress an audio file so it lasts ``target_duration_s``.

    Args:
        input_path: Source audio file.
        output_path: Destination file (overwritten if present).
        target_duration_s: Desired duration in seconds; must be positive.

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: If the target or source duration is not positive, or if
            ffprobe/ffmpeg fails.
    """
    # Validate before probing. A zero target would divide by zero and a
    # negative one would make the 'remaining < 0.5' loop below run forever.
    if target_duration_s is None or target_duration_s <= 0:
        raise RuntimeError(f"Invalid target duration: {target_duration_s!r}")

    current_duration = get_media_duration(input_path)
    if current_duration <= 0:
        raise RuntimeError("Invalid audio duration")

    # ratio > 1 speeds the audio up, ratio < 1 slows it down. The ratio is
    # spread over a chain so every single atempo factor stays in 0.5-2.0.
    remaining = current_duration / target_duration_s
    filters = []
    while remaining > 2.0:
        filters.append("atempo=2.0")
        remaining /= 2.0
    while remaining < 0.5:
        filters.append("atempo=0.5")
        remaining /= 0.5
    filters.append(f"atempo={remaining:.4f}")

    cmd = ["ffmpeg", "-y", "-i", input_path, "-filter:a", ",".join(filters), output_path]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg tempo failed: {result.stderr[:200]}")
    return output_path
|
|
|
|
def mux_video_audio(video_path, audio_path, output_path, extend_video=False, target_duration=None):
    """Replace a video's audio track with ``audio_path``.

    With ``extend_video`` and a truthy ``target_duration`` the video is
    re-encoded with its last frame cloned and the output is cut at
    ``target_duration`` seconds. Otherwise the video stream is copied and the
    output ends with the shorter of the two inputs.

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    command = ["ffmpeg", "-y", "-i", video_path, "-i", audio_path]
    if extend_video and target_duration:
        pad_filter = f"[0:v]tpad=stop_mode=clone:stop_duration={target_duration}[v]"
        command += [
            "-filter_complex", pad_filter,
            "-map", "[v]", "-map", "1:a:0",
            "-c:v", "libx264", "-preset", "fast", "-c:a", "aac",
            "-t", str(target_duration),
        ]
    else:
        command += [
            "-c:v", "copy", "-c:a", "aac",
            "-map", "0:v:0", "-map", "1:a:0", "-shortest",
        ]
    command.append(output_path)

    proc = subprocess.run(command, capture_output=True, text=True)
    if proc.returncode == 0:
        return output_path
    raise RuntimeError(f"ffmpeg mux failed: {proc.stderr[:200]}")
|
|
|
|
| |
| |
| |
|
|
# YourVoic API key, read once at import time. An empty string means "not
# configured" and makes synthesize_yourvoic() raise.
YOURVOIC_API_KEY = os.environ.get("YOURVOIC_API_KEY", "")
# Endpoint that synthesize_yourvoic() POSTs the synthesis request to.
YOURVOIC_STREAM_URL = "https://yourvoic.com/api/v1/tts/stream"
|
|
|
|
def _guess_yourvoic_extension(audio_data, content_type):
    """Choose a file extension from magic bytes first, then the content type."""
    head3 = audio_data[:3]
    if audio_data.startswith(b"RIFF"):
        return ".wav"
    if b"\xff\xfb" in head3 or b"\xff\xf3" in head3 or audio_data.startswith(b"ID3"):
        return ".mp3"
    if audio_data.startswith(b"OggS"):
        return ".ogg"
    if audio_data.startswith(b"fLaC"):
        return ".flac"

    # No recognizable signature: fall back to the declared content type.
    for needles, ext in (
        (("mp3", "mpeg"), ".mp3"),
        (("ogg",), ".ogg"),
        (("wav",), ".wav"),
        (("flac",), ".flac"),
        (("linear16", "pcm", "l16"), ".raw"),
    ):
        if any(needle in content_type for needle in needles):
            return ext

    logger.warning(f"Unknown YourVoic format (ct={content_type}), guessing mp3")
    return ".mp3"


def synthesize_yourvoic(text, language_code, voice="Peter", speed=1.0):
    """Synthesize ``text`` with the YourVoic streaming API and decode the audio.

    The response body is decoded with soundfile. If that fails, raw PCM is
    wrapped in a WAV header (assumed 24 kHz, mono, 16-bit), and as a last
    resort ffmpeg converts the payload to 24 kHz mono WAV.

    Args:
        text: Text to speak.
        language_code: Locale string sent as ``language`` (e.g. ``"sw-KE"``).
        voice: YourVoic voice name.
        speed: Speed value sent in the request payload.

    Returns:
        ``(audio_array, sample_rate)`` as read by ``soundfile.read`` with
        ``dtype="float32"``.

    Raises:
        RuntimeError: If the API key is missing, the API does not answer 200,
            the body is empty, or every decoding strategy fails.
    """
    if not YOURVOIC_API_KEY:
        raise RuntimeError("YOURVOIC_API_KEY not set.")

    headers = {"X-API-Key": YOURVOIC_API_KEY, "Content-Type": "application/json"}
    payload = {"text": text, "voice": voice, "language": language_code, "model": "aura-prime", "speed": speed}

    t0 = time.time()
    # The context manager releases the streamed connection on error paths too.
    with requests.post(
        YOURVOIC_STREAM_URL, headers=headers, json=payload, stream=True, timeout=60
    ) as response:
        if response.status_code != 200:
            raise RuntimeError(f"YourVoic error {response.status_code}: {response.text[:200]}")

        ct = response.headers.get("content-type", "").lower()
        logger.info(f"YourVoic content-type: {ct}")

        # Join once instead of growing a bytes object chunk by chunk.
        audio_data = b"".join(response.iter_content(chunk_size=8192))

    elapsed = time.time() - t0
    logger.info(f"YourVoic TTS: {len(text)} chars, {elapsed:.2f}s, {len(audio_data)} bytes")
    logger.info(f"YourVoic first bytes: {audio_data[:8]}")

    if not audio_data:
        raise RuntimeError("YourVoic returned an empty audio body")

    ext = _guess_yourvoic_extension(audio_data, ct)

    # All scratch files live in one directory that is removed on every exit
    # path. sf.read loads the samples into memory before the cleanup runs.
    with tempfile.TemporaryDirectory(prefix="yourvoic_") as work_dir:
        src_path = os.path.join(work_dir, f"audio{ext}")
        wav_path = src_path + ".wav"
        with open(src_path, "wb") as f:
            f.write(audio_data)

        # 1) Let soundfile read the payload as-is.
        try:
            return sf.read(src_path, dtype="float32")
        except Exception as e:
            logger.warning(f"soundfile can't read {ext}: {e}")

        # 2) Raw PCM: prepend a WAV header.
        if ext == ".raw":
            try:
                sr = 24000  # assumed rate of the raw stream
                with open(wav_path, "wb") as f:
                    f.write(b"RIFF")
                    f.write(struct.pack("<I", 36 + len(audio_data)))
                    f.write(b"WAVE")
                    f.write(b"fmt ")
                    f.write(struct.pack("<IHHIIHH", 16, 1, 1, sr, sr * 2, 2, 16))
                    f.write(b"data")
                    f.write(struct.pack("<I", len(audio_data)))
                    f.write(audio_data)
                return sf.read(wav_path, dtype="float32")
            except Exception as e:
                logger.warning(f"Raw PCM wrap failed: {e}")

        # 3) Last resort: have ffmpeg transcode to WAV.
        try:
            result = subprocess.run(
                ["ffmpeg", "-y", "-i", src_path, "-acodec", "pcm_s16le", "-ar", "24000", "-ac", "1", wav_path],
                capture_output=True, text=True,
            )
            if result.returncode != 0:
                raise RuntimeError(f"ffmpeg failed: {result.stderr[-300:]}")
            return sf.read(wav_path, dtype="float32")
        except Exception as e2:
            raise RuntimeError(f"YourVoic decode failed: {e2}") from e2
|
|
|
|
def synthesize_yourvoic_to_file(text, language_code, output_path, voice="Peter", speed=1.0):
    """Synthesize ``text`` through YourVoic and write the audio to ``output_path``.

    Returns:
        ``(output_path, sample_rate)``.
    """
    samples, rate = synthesize_yourvoic(text, language_code, voice, speed)
    sf.write(output_path, samples, rate)
    return output_path, rate
|
|
|
|
|
|
|
|
def synthesize_chunked(text, language_config, sentences_per_chunk=2):
    """Synthesize long text through YourVoic in groups of sentences.

    Each group is one API call. A failing group is logged and skipped, and
    successful segments are joined with 0.15 s of silence after each one.

    Args:
        text: Full text to synthesize.
        language_config: Entry from LANGUAGES (uses ``yourvoic_lang`` and the
            first of ``yourvoic_voices``, defaulting to "Peter").
        sentences_per_chunk: Number of sentences sent per API call.

    Returns:
        ``(audio_array, sample_rate)`` with mono float32 audio. Half a second
        of silence is returned when the text has no sentences (at 16 kHz) or
        when every chunk fails.
    """
    sentences = [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]
    if not sentences:
        return np.zeros(int(0.5 * 16000), dtype=np.float32), 16000

    # The voice does not change between chunks, so resolve it once.
    configured_voices = language_config.get("yourvoic_voices")
    voice = configured_voices[0] if configured_voices else "Peter"

    audio_segments = []
    output_sr = None

    for i in range(0, len(sentences), sentences_per_chunk):
        chunk_text = ' '.join(sentences[i:i + sentences_per_chunk])
        if not chunk_text:
            continue

        try:
            lang_code = language_config["yourvoic_lang"]
            audio_seg, seg_sr = synthesize_yourvoic(chunk_text, lang_code, voice)

            if output_sr is None:
                output_sr = seg_sr
            # soundfile returns (frames, channels) for multi-channel audio.
            # Downmix so the segment concatenates with the 1-D silence pad.
            if getattr(audio_seg, "ndim", 1) > 1:
                audio_seg = audio_seg.mean(axis=1)
            if len(audio_seg) > 0:
                audio_segments.append(audio_seg)
                audio_segments.append(np.zeros(int(0.15 * seg_sr), dtype=np.float32))

        except Exception as e:
            # Best effort: one failed chunk must not abort the whole synthesis.
            logger.error(f"TTS chunk failed: {e}")
            continue

    if not audio_segments:
        fallback_sr = output_sr or 16000
        logger.warning("All TTS chunks failed — returning silence")
        return np.zeros(int(0.5 * fallback_sr), dtype=np.float32), fallback_sr

    return np.concatenate(audio_segments), output_sr
|
|
|
|
| |
| |
| |
|
|
# Model name passed to chat.completions.create() in translate_chunk_qwen().
QWEN_MODEL = "qwen3.5-omni-plus"
# Base URL handed to the OpenAI-compatible client built in _get_client().
QWEN_BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
|
|
|
|
def _get_client():
    """Return an OpenAI-compatible client pointed at ``QWEN_BASE_URL``.

    Raises:
        RuntimeError: If ``DASHSCOPE_API_KEY`` is unset or empty.
    """
    from openai import OpenAI

    api_key = os.environ.get("DASHSCOPE_API_KEY", "")
    if api_key:
        return OpenAI(api_key=api_key, base_url=QWEN_BASE_URL)
    raise RuntimeError(
        "DASHSCOPE_API_KEY not set. Add it as a Space secret."
    )
|
|
|
|
def _wav_to_base64(wav_path):
    """Return the full contents of ``wav_path`` as a base64 text string."""
    with open(wav_path, "rb") as wav_file:
        raw_bytes = wav_file.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
|
|
|
|
def _base64_to_wav(b64_data, output_path):
    """Decode base64 raw PCM and write it as a 24 kHz, mono, 16-bit WAV file."""
    pcm = base64.b64decode(b64_data)

    sample_rate, channels, bits = 24000, 1, 16
    block_align = channels * bits // 8
    byte_rate = sample_rate * channels * bits // 8

    # 44-byte canonical header: RIFF chunk, 16-byte PCM "fmt " chunk, "data" chunk.
    header = b"".join((
        b"RIFF", struct.pack("<I", 36 + len(pcm)), b"WAVE",
        b"fmt ", struct.pack("<IHHIIHH", 16, 1, channels, sample_rate, byte_rate, block_align, bits),
        b"data", struct.pack("<I", len(pcm)),
    ))
    with open(output_path, "wb") as out:
        out.write(header)
        out.write(pcm)
|
|
|
|
def _extract_audio_chunk(video_path, output_wav, start_sec, duration_sec):
    """Cut ``duration_sec`` seconds of audio starting at ``start_sec`` into a 16 kHz mono WAV.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    window = ["-ss", str(start_sec), "-t", str(duration_sec)]
    encoding = ["-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1"]
    command = ["ffmpeg", "-y", *window, "-i", video_path, *encoding, output_wav]
    subprocess.run(command, capture_output=True, check=True)
|
|
|
|
def _get_duration(filepath):
    """Return the duration of a media file in seconds, via ffprobe.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status or prints
            something that is not a number (for example nothing at all).
    """
    result = subprocess.run(
        # "-v error" instead of "-v quiet" so a failure leaves a message in stderr.
        ["ffprobe", "-v", "error", "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1", filepath],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe failed: {result.stderr[:200]}")

    raw = result.stdout.strip()
    try:
        return float(raw)
    except ValueError as exc:
        # Without this the caller only sees "could not convert string to float".
        raise RuntimeError(
            f"ffprobe returned no usable duration for {filepath!r}: {raw!r}"
        ) from exc
|
|
|
|
def _concatenate_wavs(wav_files, output_path):
    """Concatenate WAV files into ``output_path`` with ffmpeg's concat demuxer.

    A single input is simply copied. The temporary list file
    (``output_path + ".txt"``) is removed even when ffmpeg fails.

    Raises:
        RuntimeError: If ``wav_files`` is empty.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    if not wav_files:
        raise RuntimeError("No WAV files to concatenate")

    if len(wav_files) == 1:
        shutil.copy2(wav_files[0], output_path)
        return

    list_file = output_path + ".txt"
    try:
        with open(list_file, "w") as f:
            for wav in wav_files:
                # Inside single quotes a literal quote is written as '\''.
                escaped = str(wav).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")
        subprocess.run(
            ["ffmpeg", "-y", "-f", "concat", "-safe", "0",
             "-i", list_file, "-c", "copy", output_path],
            capture_output=True, check=True,
        )
    finally:
        if os.path.exists(list_file):
            os.remove(list_file)
|
|
|
|
def _build_system_prompt(language_name):
    """Return the Qwen system prompt that asks for a spoken ``language_name`` translation."""
    instructions = [
        "You are a professional video dubbing translator. You will receive audio in English.",
        "Your task:",
        "1. Listen carefully to the English speech.",
        f"2. Translate it into natural, fluent {language_name}.",
        f"3. Respond ONLY with the {language_name} translation spoken aloud — no English, no commentary,",
        f" no meta-text, no transliteration. Speak entirely in {language_name}.",
        "4. Match the tone, emotion, and pacing of the original speaker as closely as possible.",
        "5. If there are pauses or silence in the original audio, maintain similar pacing.",
        f"6. Translate idioms and cultural references into their {language_name} equivalents.",
        "7. Use clear, professional pronunciation suitable for a broad audience.",
    ]
    return "\n".join(instructions)
|
|
|
|
def translate_chunk_qwen(wav_path, voice, language_name, chunk_index=0):
    """Translate one English audio chunk into spoken ``language_name`` with Qwen Omni.

    The chunk is sent base64-encoded in a streaming chat completion. Text
    deltas are collected into the transcript, and audio deltas are joined and
    written as a WAV file next to the input.

    Args:
        wav_path: Path to the input WAV file (English audio).
        voice: Qwen voice name (e.g. "Ethan", "Cherry").
        language_name: Full language name used in the prompts.
        chunk_index: Index used in the output file name and in the log line.

    Returns:
        ``(output_wav_path, transcript)``, or ``(None, transcript)`` when the
        stream carried no audio.
    """
    client = _get_client()
    audio_b64 = _wav_to_base64(wav_path)

    # Derive the output name from the extension only. str.replace(".wav", ...)
    # would rewrite every ".wav" in the path and, when nothing matched, return
    # the input path itself so the result overwrote the source chunk.
    stem, _ = os.path.splitext(wav_path)
    output_wav = f"{stem}_qwen_{chunk_index}.wav"

    system_prompt = _build_system_prompt(language_name)
    user_prompt = f"Translate this English speech into {language_name}. Respond only with the spoken {language_name} translation."

    t0 = time.time()
    completion = client.chat.completions.create(
        model=QWEN_MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {
                "role": "user",
                "content": [
                    {
                        "type": "input_audio",
                        "input_audio": {
                            "data": f"data:audio/wav;base64,{audio_b64}",
                            "format": "wav",
                        },
                    },
                    {"type": "text", "text": user_prompt},
                ],
            },
        ],
        modalities=["text", "audio"],
        audio={"voice": voice, "format": "wav"},
        stream=True,
        stream_options={"include_usage": True},
    )

    audio_chunks = []
    transcript_parts = []

    for event in completion:
        # Events without choices carry no delta to read.
        if not event.choices:
            continue
        delta = event.choices[0].delta

        text_piece = getattr(delta, "content", None)
        if text_piece:
            transcript_parts.append(text_piece)

        # The audio delta may be a plain dict or an object with a .data attribute.
        audio_piece = getattr(delta, "audio", None)
        if audio_piece:
            if isinstance(audio_piece, dict):
                if "data" in audio_piece:
                    audio_chunks.append(audio_piece["data"])
            elif getattr(audio_piece, "data", None):
                audio_chunks.append(audio_piece.data)

    transcript = "".join(transcript_parts)
    elapsed = time.time() - t0
    logger.info(f"Qwen chunk {chunk_index}: {elapsed:.1f}s, transcript={transcript[:60]}")

    if audio_chunks:
        # The base64 fragments are joined first and decoded as one stream.
        _base64_to_wav("".join(audio_chunks), output_wav)
        return output_wav, transcript

    return None, transcript
|
|
|
|
def dub_video_qwen(video_path, language_name, voice="Ethan", chunk_seconds=120, progress_fn=None):
    """Dub a video into ``language_name`` with Qwen Omni.

    The audio is cut into chunks of ``chunk_seconds``, each chunk is
    translated by ``translate_chunk_qwen``, the results are concatenated and
    the dubbed track is muxed onto the original video stream. A chunk that
    yields no audio is replaced by silence of the same duration.

    Args:
        video_path: Path to the input video.
        language_name: Full language name (e.g. "French", "Arabic").
        voice: Qwen voice name.
        chunk_seconds: Audio chunk duration for each API call.
        progress_fn: Optional callback invoked as ``progress_fn(fraction, desc=...)``.

    Returns:
        ``(output_video_path, log_text)`` on success, or ``(None, message)``
        when the video is longer than one hour or any step fails. On success
        the output stays inside a temporary directory that is not removed
        here. On every failure path that directory is deleted.
    """
    def report(fraction, desc):
        if progress_fn:
            progress_fn(fraction, desc=desc)

    tmp_dir = tempfile.mkdtemp(prefix="qwen_dub_")
    log = []

    try:
        # 1) Inspect the input.
        report(0.05, "Analyzing video...")
        total_duration = _get_duration(video_path)
        log.append(f"**Video:** {total_duration:.1f}s")
        log.append("**Engine:** Qwen 3.5 Omni")
        log.append(f"**Voice:** {voice}")
        log.append(f"**Language:** {language_name}")

        if total_duration > 3600:
            # Nothing was produced, so do not leave the empty work directory behind.
            shutil.rmtree(tmp_dir, ignore_errors=True)
            return None, "Video longer than 1 hour — please use a shorter clip."

        # 2) Cut the audio track into chunks.
        report(0.1, "Extracting audio chunks...")
        num_chunks = max(1, int(total_duration // chunk_seconds) + (1 if total_duration % chunk_seconds > 0 else 0))
        log.append(f"**Chunks:** {num_chunks} ({chunk_seconds}s each)")

        input_chunks = []
        for i in range(num_chunks):
            start = i * chunk_seconds
            duration = min(chunk_seconds, total_duration - start)
            chunk_path = os.path.join(tmp_dir, f"chunk_{i:03d}.wav")
            _extract_audio_chunk(video_path, chunk_path, start, duration)
            input_chunks.append(chunk_path)

        # 3) Translate every chunk.
        output_chunks = []
        all_transcripts = []

        for i, chunk_path in enumerate(input_chunks):
            report(0.15 + 0.7 * (i / num_chunks), f"Translating chunk {i+1}/{num_chunks}...")

            result_path, transcript = translate_chunk_qwen(
                chunk_path, voice, language_name, i
            )
            if transcript:
                all_transcripts.append(f"**[{i+1}]** {transcript}")

            if result_path:
                output_chunks.append(result_path)
            else:
                # No audio came back: substitute silence of the chunk's length,
                # in the same 24 kHz mono PCM format as the translated chunks.
                duration = _get_duration(chunk_path)
                silence_path = os.path.join(tmp_dir, f"silence_{i:03d}.wav")
                subprocess.run(
                    ["ffmpeg", "-y", "-f", "lavfi",
                     "-i", "anullsrc=r=24000:cl=mono",
                     "-t", str(duration), "-acodec", "pcm_s16le", silence_path],
                    capture_output=True, check=True,
                )
                output_chunks.append(silence_path)

        # 4) Join the translated chunks.
        report(0.88, "Assembling audio...")
        full_audio = os.path.join(tmp_dir, "full_dubbed.wav")
        _concatenate_wavs(output_chunks, full_audio)

        # 5) Put the dubbed track onto the original video stream.
        report(0.93, "Combining audio and video...")
        output_video = os.path.join(tmp_dir, "dubbed_output.mp4")
        subprocess.run(
            ["ffmpeg", "-y", "-i", video_path, "-i", full_audio,
             "-c:v", "copy", "-map", "0:v:0", "-map", "1:a:0",
             "-shortest", output_video],
            capture_output=True, check=True,
        )

        report(1.0, "Done!")

        log.append("\n**Transcript:**")
        log.extend(all_transcripts)

        return output_video, "\n".join(log)

    except Exception as e:
        logger.exception("Qwen dubbing failed")
        shutil.rmtree(tmp_dir, ignore_errors=True)
        return None, f"Error: {str(e)}"
|
|
|
|
| |
| |
| |
|
|
| |
# Runs at import time: fills asr_pipe, mt_tokenizer and mt_model before the
# pipeline functions defined below are used.
load_models()
|
|
| |
| |
| |
|
|
def get_voices_for_language(lang_name):
    """Return the voice names available for a language, based on its TTS engine.

    Qwen languages get the shared ``QWEN_VOICES`` list and YourVoic languages
    their configured voices. Anything else, including unknown languages,
    falls back to ``["Peter"]``.
    """
    config = LANGUAGES.get(lang_name, {})
    engine = config.get("tts_engine", "local")

    if engine == "qwen":
        return QWEN_VOICES

    yourvoic_voices = config.get("yourvoic_voices")
    if engine == "yourvoic" and yourvoic_voices:
        return yourvoic_voices

    # "local", an unknown engine, or a YourVoic entry without voices.
    return ["Peter"]
|
|
|
|
def full_pipeline_audio(audio_input, target_language):
    """Run the full pipeline: English audio -> target-language audio.

    Args:
        audio_input: ``(sample_rate, samples)`` tuple, or ``None`` when no
            audio was supplied. ``samples`` may be mono or multi-channel;
            multi-channel input is averaged down to mono.
        target_language: Key into ``LANGUAGES``.

    Returns:
        ``((sample_rate, samples), log_markdown)`` on success, otherwise
        ``(None, message)`` explaining which stage had nothing to work with
        (missing/empty audio, unknown language, empty ASR or translation).
    """
    if audio_input is None:
        return None, "Please upload or record audio."

    sample_rate, audio_array = audio_input
    audio_array = np.asarray(audio_array, dtype=np.float32)
    # A zero-length recording would make the peak computation below raise
    # ValueError (max() of an empty array), so treat it like missing audio.
    if audio_array.size == 0:
        return None, "Please upload or record audio."

    lang_config = LANGUAGES.get(target_language)
    if not lang_config:
        return None, f"Language '{target_language}' not configured."

    # Downmix to mono before measuring the peak.
    if audio_array.ndim > 1:
        audio_array = audio_array.mean(axis=1)

    # Samples outside [-1, 1] (e.g. integer PCM) are scaled by their peak so
    # the loudest sample lands at +/-1.0; in-range audio is left untouched.
    peak = np.abs(audio_array).max()
    if peak > 1.0:
        audio_array = audio_array / peak

    log = []
    total_start = time.time()

    # Stage 1: speech recognition.
    t0 = time.time()
    english = transcribe(audio_array, sample_rate)
    log.append(f"**ASR** ({time.time()-t0:.2f}s)\n{english}")
    if not english:
        return None, "ASR returned empty text."

    # Stage 2: machine translation, logged sentence by sentence.
    t0 = time.time()
    nllb_code = lang_config["nllb"]
    translated, en_sents, tgt_sents = translate_text(english, nllb_code, fast=False)
    log.append(f"\n**Translation** ({time.time()-t0:.2f}s)")
    for e, t in zip(en_sents, tgt_sents):
        log.append(f" EN: {e}\n {target_language.upper()}: {t}")
    if not translated:
        return None, "Translation returned empty."

    # Stage 3: speech synthesis.
    t0 = time.time()
    audio_out, sr_out = synthesize_chunked(
        translated, lang_config
    )
    log.append(f"\n**TTS** ({time.time()-t0:.2f}s) = {len(audio_out)/sr_out:.1f}s audio")

    total = time.time() - total_start
    log.append(f"\n**Total: {total:.2f}s**")

    return (sr_out, audio_out), "\n".join(log)
|
|
|
|
def full_pipeline_text(english_text, target_language, voice_name):
    """Translate English text and synthesize it in the target language.

    Args:
        english_text: Source text; ``None``, empty or whitespace-only input
            is rejected with a message.
        target_language: Key into ``LANGUAGES``.
        voice_name: Accepted so the function matches the UI wiring, but it is
            not read anywhere in this function.

    Returns:
        ``((sample_rate, samples), log_markdown)`` on success, otherwise
        ``(None, message)``.
    """
    source_text = (english_text or "").strip()
    if not source_text:
        return None, "Please enter English text."

    lang_config = LANGUAGES.get(target_language)
    if not lang_config:
        return None, f"Language '{target_language}' not configured."

    report = []
    pipeline_started = time.time()

    # Translation stage: log each source/target sentence pair.
    stage_started = time.time()
    translated, en_sents, tgt_sents = translate_text(
        source_text, lang_config["nllb"], fast=False
    )
    report.append(f"**Translation** ({time.time()-stage_started:.2f}s)")
    report.extend(
        f" EN: {src}\n {target_language.upper()}: {tgt}"
        for src, tgt in zip(en_sents, tgt_sents)
    )
    if not translated:
        return None, "Translation returned empty."

    # Synthesis stage.
    stage_started = time.time()
    audio_out, sr_out = synthesize_chunked(translated, lang_config)
    tts_elapsed = time.time() - stage_started
    report.append(f"\n**TTS** ({tts_elapsed:.2f}s) = {len(audio_out)/sr_out:.1f}s audio")

    elapsed_total = time.time() - pipeline_started
    report.append(f"\n**Total: {elapsed_total:.2f}s**")

    return (sr_out, audio_out), "\n".join(report)
|
|
|
|
def _dub_video_cascade(video_path, lang_name, lang_config, progress, results_log):
    """Dub one language with the ASR -> translation -> TTS cascade.

    Timing and status lines are appended to ``results_log`` in place.

    Returns:
        Path of the muxed output video, or ``None`` when the language was
        skipped because ASR or translation produced nothing.

    Raises:
        Whatever the underlying helpers raise; the caller logs the error.
    """
    work_dir = tempfile.mkdtemp(prefix=f"dub_{lang_name}_")
    keep_work_dir = False
    try:
        extracted_audio = os.path.join(work_dir, "audio.wav")
        tgt_audio_raw = os.path.join(work_dir, "tgt_raw.wav")
        tgt_audio_aligned = os.path.join(work_dir, "tgt_aligned.wav")
        output_video = os.path.join(work_dir, f"dubbed_{lang_name}.mp4")

        progress(0.05, desc=f"{lang_name}: extracting audio...")
        extract_audio_from_video(video_path, extracted_audio)
        video_duration = get_media_duration(video_path)
        results_log.append(f"Video: {video_duration:.1f}s")

        audio_array, sr = sf.read(extracted_audio, dtype="float32")
        if audio_array.ndim > 1:
            # Downmix multi-channel audio to mono before ASR.
            audio_array = audio_array.mean(axis=1)

        progress(0.15, desc=f"{lang_name}: transcribing...")
        t0 = time.time()
        english = transcribe(audio_array, sr)
        results_log.append(f"ASR: {time.time()-t0:.1f}s")
        if not english:
            results_log.append("ASR empty — skipped")
            return None

        progress(0.4, desc=f"{lang_name}: translating...")
        t0 = time.time()
        nllb_code = lang_config["nllb"]
        translated, _, _ = translate_text(english, nllb_code, fast=True)
        results_log.append(f"MT: {time.time()-t0:.1f}s")
        if not translated:
            results_log.append("Translation empty — skipped")
            return None

        progress(0.65, desc=f"{lang_name}: synthesizing...")
        t0 = time.time()
        tgt_audio, tgt_sr = synthesize_chunked(translated, lang_config)
        sf.write(tgt_audio_raw, tgt_audio, tgt_sr)
        tgt_duration = len(tgt_audio) / tgt_sr
        results_log.append(f"TTS: {time.time()-t0:.1f}s ({tgt_duration:.1f}s audio)")

        progress(0.85, desc=f"{lang_name}: aligning...")
        MAX_STRETCH = 1.2
        stretch_ratio = tgt_duration / video_duration

        if stretch_ratio <= MAX_STRETCH:
            # Dubbed audio is at most 20% longer than the video: fit it to the
            # video length, unless it is already within 2% of it.
            if abs(stretch_ratio - 1.0) > 0.02:
                stretch_audio_to_duration(tgt_audio_raw, tgt_audio_aligned, video_duration)
            else:
                shutil.copy(tgt_audio_raw, tgt_audio_aligned)
            extend_video = False
            final_duration = video_duration
        else:
            # Too long to fit: keep the audio as is and extend the video.
            # This uses the module-level ``shutil``. A function-scope
            # ``import shutil`` in the other branch used to make the name
            # local, so this branch raised UnboundLocalError.
            shutil.copy(tgt_audio_raw, tgt_audio_aligned)
            extend_video = True
            final_duration = tgt_duration
            results_log.append(f"Audio longer ({stretch_ratio:.1f}x) — extending video")

        progress(0.95, desc=f"{lang_name}: combining...")
        mux_video_audio(
            video_path, tgt_audio_aligned, output_video,
            extend_video=extend_video, target_duration=final_duration
        )
        keep_work_dir = True
        return output_video
    finally:
        # The finished video lives inside work_dir, so the directory is only
        # discarded when no video was produced (skip or exception).
        if not keep_work_dir:
            shutil.rmtree(work_dir, ignore_errors=True)


def dub_video(video_path, target_languages, dub_voice, chunk_seconds, progress=gr.Progress()):
    """Dub a video into one or more target languages.

    Languages whose ``tts_engine`` is "qwen" are handed to ``dub_video_qwen``;
    every other configured language goes through the ASR -> translation ->
    TTS cascade. A failure in one language is logged and does not stop the
    remaining languages.

    Args:
        video_path: Path of the uploaded video, or ``None``.
        target_languages: Iterable of keys into ``LANGUAGES``.
        dub_voice: Voice for Qwen languages; anything not in ``QWEN_VOICES``
            falls back to "Ethan".
        chunk_seconds: Chunk length forwarded to ``dub_video_qwen``.
        progress: Progress callback, called as ``progress(fraction, desc=...)``.

    Returns:
        ``(video_path_or_None, log_text)``. Only the first successfully
        dubbed video is returned, even when several languages succeed.
    """
    if video_path is None:
        return None, "Please upload a video."

    if not target_languages:
        return None, "Please select at least one target language."

    results_log = []
    output_videos = []

    for lang_name in target_languages:
        lang_config = LANGUAGES.get(lang_name)
        if not lang_config:
            results_log.append(f"**{lang_name}**: not configured, skipped")
            continue

        engine = lang_config.get("tts_engine", "local")
        results_log.append(f"\n{'='*50}")
        results_log.append(f"**Dubbing: {lang_name}** (engine: {engine})")
        results_log.append(f"{'='*50}")

        try:
            if engine == "qwen":
                qwen_lang_name = lang_config.get("qwen_name", lang_name)
                voice = dub_voice if dub_voice in QWEN_VOICES else "Ethan"
                out_video, log_text = dub_video_qwen(
                    video_path, qwen_lang_name, voice=voice,
                    chunk_seconds=chunk_seconds, progress_fn=progress,
                )
                results_log.append(log_text)
            else:
                out_video = _dub_video_cascade(
                    video_path, lang_name, lang_config, progress, results_log
                )

            if out_video:
                output_videos.append(out_video)

        except Exception as e:
            # Per-language boundary: record the failure and keep going.
            logger.exception("Dubbing %s failed", lang_name)
            results_log.append(f"Error: {str(e)}")

    progress(1.0, desc="Done!")
    final_video = output_videos[0] if output_videos else None
    return final_video, "\n".join(results_log)
|
|
|
|
def update_voices(language):
    """Build the dropdown update for a newly selected language.

    The returned ``gr.update`` replaces the dropdown choices with the voices
    for ``language`` and selects the first of them.
    """
    available = get_voices_for_language(language)
    first_voice = available[0]
    return gr.update(choices=available, value=first_voice)
|
|
|
|
| |
| |
| |
|
|
# Sample English football commentary lines, offered as clickable examples
# under the text box of the "Text → Audio" tab.
EXAMPLES = [
    "And it's a brilliant goal from the striker!",
    "The referee has shown a yellow card. Corner kick for the home team.",
    "What a save by the goalkeeper! The match is heading into injury time.",
    "He dribbles past two defenders and shoots! The ball hits the back of the net!",
]


# Custom stylesheet passed to gr.Blocks(css=...). It styles the centred page
# header (.main-header) and defines a .lang-group-label class.
CSS = """
.main-header { text-align: center; margin-bottom: 0.5rem; }
.main-header h1 { font-size: 1.8rem; font-weight: 700; margin: 0; }
.main-header p { color: #666; font-size: 0.95rem; }
.lang-group-label { font-weight: 600; font-size: 0.85rem; color: #888; text-transform: uppercase; letter-spacing: 0.05em; margin-top: 0.5rem; }
"""
|
|
# Top-level Gradio application. Components are created in display order, so
# the statement order and `with` nesting below define the page layout.
with gr.Blocks(
    title="PlotWeaver — Live Commentary Translation",
    theme=gr.themes.Soft(),
    css=CSS,
) as demo:

    # Page header (styled by the .main-header rules in CSS).
    gr.HTML("""
<div class="main-header">
<h1>PlotWeaver</h1>
<p>Live commentary translation platform — English to 40+ languages</p>
<p style="font-size:0.8rem; color:#999">Qwen Omni (11 languages) + YourVoic API + NLLB-200 (27 languages)</p>
</div>
""")

    with gr.Tabs():

        # ---- Tab: Event Management -------------------------------------
        # Form-only tab: the handlers below just format text; nothing is
        # persisted or scheduled by the code in this block.
        with gr.TabItem("Event Management"):
            gr.Markdown("### Create new event")
            gr.Markdown("Configure your live broadcast event with target languages and input source.")

            with gr.Row():
                with gr.Column(scale=2):
                    event_name = gr.Textbox(
                        label="Event name",
                        placeholder="e.g. Premier League: Arsenal vs. Chelsea",
                    )
                    with gr.Row():
                        start_time = gr.Textbox(label="Start time", placeholder="08:30 PM")
                        end_time = gr.Textbox(label="End time", placeholder="10:30 PM")
                        event_date = gr.Textbox(label="Date", placeholder="2026-06-06")

                    gr.Markdown("#### Input source")
                    # NOTE(review): input_method and event_date are not wired
                    # to any handler in this block.
                    input_method = gr.Radio(
                        choices=["RTMP Stream", "WebRTC (Browser)", "Direct Audio Feed"],
                        value="RTMP Stream",
                        label="Input method",
                    )

                    gr.Markdown("#### Target languages")
                    gr.Markdown("Select languages for simultaneous broadcast. Additional languages consume more stream minutes.")

                    target_langs = gr.CheckboxGroup(
                        choices=ALL_LANGUAGE_NAMES,
                        label="Languages",
                        value=["Spanish"],
                    )

                with gr.Column(scale=1):
                    gr.Markdown("#### Estimate summary")
                    estimate_display = gr.Markdown(
                        value="**Event:** Not configured\n\n**Languages:** 1 selected\n\n**Estimated duration:** --\n\n**Total estimate:** --"
                    )
                    create_event_btn = gr.Button("Create Event", variant="primary", size="lg")
                    event_status = gr.Markdown("")

            def update_estimate(name, langs, start, end):
                """Build the Markdown summary shown in the estimate panel.

                ``start`` and ``end`` are received from the UI but are not
                used in the summary text.
                """
                n_langs = len(langs) if langs else 0
                lang_list = ", ".join(langs) if langs else "None"
                return (
                    f"**Event:** {name or 'Not set'}\n\n"
                    f"**Languages:** {n_langs} selected\n\n"
                    f"{lang_list}\n\n"
                    f"**Input:** Configured\n\n"
                    f"**Rate:** 1x (Standard)"
                )

            # Refresh the summary whenever any of these fields changes.
            for inp in [event_name, target_langs, start_time, end_time]:
                inp.change(
                    fn=update_estimate,
                    inputs=[event_name, target_langs, start_time, end_time],
                    outputs=[estimate_display],
                )

            def create_event(name, langs):
                """Validate the form and return a status message (Markdown)."""
                if not name:
                    return "Please enter an event name."
                if not langs:
                    return "Please select at least one language."
                return f"Event **{name}** created with {len(langs)} languages: {', '.join(langs)}"

            create_event_btn.click(
                fn=create_event,
                inputs=[event_name, target_langs],
                outputs=[event_status],
            )

        # ---- Tab: Live Studio ------------------------------------------
        with gr.TabItem("Live Studio"):
            gr.Markdown("### Live streaming translation")
            gr.Markdown("Record or stream English commentary and hear it translated in real-time.")

            with gr.Row():
                studio_language = gr.Dropdown(
                    choices=ALL_LANGUAGE_NAMES,
                    value="Spanish",
                    label="Target language",
                )
                studio_voice = gr.Dropdown(
                    choices=get_voices_for_language("Spanish"),
                    value=get_voices_for_language("Spanish")[0],
                    label="Voice",
                )

            # Keep the voice list in sync with the selected language.
            studio_language.change(
                fn=update_voices,
                inputs=[studio_language],
                outputs=[studio_voice],
            )

            with gr.Row():
                with gr.Column():
                    studio_audio_in = gr.Audio(
                        label="English commentary (upload or record)",
                        type="numpy",
                        sources=["upload", "microphone"],
                    )
                    studio_translate_btn = gr.Button("Translate", variant="primary", size="lg")

                with gr.Column():
                    studio_audio_out = gr.Audio(label="Translated audio", type="numpy", autoplay=True)
                    studio_log = gr.Markdown(label="Pipeline log")

            # NOTE(review): studio_voice is not among the inputs, so the voice
            # selected in this tab is never passed to full_pipeline_audio.
            studio_translate_btn.click(
                fn=full_pipeline_audio,
                inputs=[studio_audio_in, studio_language],
                outputs=[studio_audio_out, studio_log],
            )

        # ---- Tab: Video Dubbing ----------------------------------------
        with gr.TabItem("Video Dubbing"):
            gr.Markdown("### Video dubbing (English → multi-language)")
            gr.Markdown(
                "Upload a video with English commentary and get back a dubbed version. "
                "**Global languages** (Arabic, French, Spanish, etc.) use Qwen Omni for best quality. "
                "**African/regional languages** use YourVoic API with NLLB translation."
            )

            with gr.Row():
                with gr.Column():
                    dub_video_in = gr.Video(label="Upload English video", sources=["upload"])
                    dub_languages = gr.CheckboxGroup(
                        choices=ALL_LANGUAGE_NAMES,
                        label="Target languages",
                        value=["Spanish"],
                    )
                    with gr.Row():
                        dub_voice = gr.Dropdown(
                            choices=QWEN_VOICES,
                            value="Ethan",
                            label="Voice (for Qwen languages)",
                            info="Applies to Arabic, French, Spanish, etc. Local languages use default voice.",
                        )
                        dub_chunk_slider = gr.Slider(
                            minimum=30, maximum=300, value=120, step=10,
                            label="Chunk duration (seconds)",
                            info="Shorter = more API calls but less timeout risk.",
                        )
                    dub_btn = gr.Button("Dub Video", variant="primary", size="lg")

                with gr.Column():
                    dub_video_out = gr.Video(label="Dubbed video (download from player)")
                    dub_log = gr.Markdown(
                        label="Processing log",
                        value="Upload a video and select languages to start."
                    )

            # Inputs map positionally onto dub_video's first four parameters.
            dub_btn.click(
                fn=dub_video,
                inputs=[dub_video_in, dub_languages, dub_voice, dub_chunk_slider],
                outputs=[dub_video_out, dub_log],
            )

        # ---- Tab: Text -> Audio ----------------------------------------
        with gr.TabItem("Text \u2192 Audio"):
            gr.Markdown("### Text to translated speech")
            gr.Markdown("Type English text, choose a language, and hear the translated audio.")

            with gr.Row():
                text_language = gr.Dropdown(
                    choices=ALL_LANGUAGE_NAMES,
                    value="Spanish",
                    label="Target language",
                )
                text_voice = gr.Dropdown(
                    choices=get_voices_for_language("Spanish"),
                    value=get_voices_for_language("Spanish")[0],
                    label="Voice",
                )

            # Keep the voice list in sync with the selected language.
            text_language.change(
                fn=update_voices,
                inputs=[text_language],
                outputs=[text_voice],
            )

            with gr.Row():
                with gr.Column():
                    text_input = gr.Textbox(
                        label="English text",
                        placeholder="Type English football commentary here...",
                        lines=4,
                    )
                    text_btn = gr.Button("Translate to speech", variant="primary", size="lg")
                    # Each example is wrapped in a one-element list because
                    # there is a single input component.
                    gr.Examples(
                        examples=[[e] for e in EXAMPLES],
                        inputs=[text_input],
                        label="Example commentary",
                    )

                with gr.Column():
                    text_audio_out = gr.Audio(label="Translated audio", type="numpy", autoplay=True)
                    text_log = gr.Markdown(label="Pipeline log")

            # text_voice is passed as full_pipeline_text's third argument.
            text_btn.click(
                fn=full_pipeline_text,
                inputs=[text_input, text_language, text_voice],
                outputs=[text_audio_out, text_log],
            )

        # ---- Tab: Recordings & Clips (placeholder text only) -----------
        with gr.TabItem("Recordings & Clips"):
            gr.Markdown("### Recordings management")
            gr.Markdown(
                "Past dubbed recordings will appear here. "
                "This feature is coming soon — for now, use Video Dubbing to create new recordings "
                "and download them from the player."
            )

        # ---- Tab: Voice Models -----------------------------------------
        with gr.TabItem("Voice Models"):
            gr.Markdown("### Voice model library")
            gr.Markdown("Browse available voices for each language.")

            voice_lang_select = gr.Dropdown(
                choices=ALL_LANGUAGE_NAMES,
                value="Spanish",
                label="Select language",
            )
            voice_info = gr.Markdown()

            def show_voice_info(lang):
                """Return a Markdown description of the engine and voices for ``lang``.

                The text depends on the language's ``tts_engine``: "qwen",
                "yourvoic", or anything else (including a language missing
                from ``LANGUAGES``, which is treated as engine "unknown").
                """
                config = LANGUAGES.get(lang, {})
                engine = config.get("tts_engine", "unknown")
                voices = config.get("yourvoic_voices", [])

                info = f"### {lang}\n\n"
                if engine == "qwen":
                    info += f"**Engine:** Qwen 3.5 Omni (end-to-end speech-to-speech)\n\n"
                    info += f"This is the highest quality option. Qwen handles ASR + translation + TTS in a single API call, "
                    info += f"preserving tone, emotion, and pacing from the original speaker.\n\n"
                    # Only the first ten voice names are spelled out; the
                    # remainder is summarised as a count.
                    info += f"**Available voices ({len(QWEN_VOICES)}):** {', '.join(QWEN_VOICES[:10])}... and {len(QWEN_VOICES)-10} more\n\n"
                    info += f"All voices support all Qwen languages."
                elif engine == "yourvoic":
                    info += f"**Engine:** YourVoic API (TTS) + NLLB-200 (translation)\n\n"
                    info += f"**YourVoic language:** `{config.get('yourvoic_lang', 'N/A')}`\n\n"
                    info += f"**Available voices:** {', '.join(voices) if voices else 'Peter (default)'}"
                else:
                    info += f"**Engine:** Not available\n\n"
                    info += f"**NLLB code:** `{config.get('nllb', 'N/A')}`\n\n"
                    info += "Uses locally fine-tuned models on GPU. Voice selection not available."

                return info

            voice_lang_select.change(fn=show_voice_info, inputs=[voice_lang_select], outputs=[voice_info])
            # Also fill the panel for the default selection when the page loads.
            demo.load(fn=show_voice_info, inputs=[voice_lang_select], outputs=[voice_info])

    # Footer with links to the models and services.
    gr.Markdown("""
---
**PlotWeaver** by PlotweaverAI | Models:
[ASR](https://huggingface.co/PlotweaverAI/whisper-small-de-en) |
[MT](https://huggingface.co/PlotweaverAI/nllb-200-distilled-600M-african-6lang) |
[TTS](https://yourvoic.com) |
[Qwen Omni](https://www.alibabacloud.com/help/en/model-studio/qwen-omni)
""")
|
|
|
|
# Start the Gradio server only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == "__main__":
    demo.launch()
|
|