""" PlotWeaver — Live Commentary Translation Platform (Single File) ================================================================ Two engines: Qwen Omni | YourVoic API (with NLLB MT) """ import os, io, re, time, base64, struct, shutil, subprocess, tempfile, logging import torch, numpy as np, requests, soundfile as sf, gradio as gr logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") logger = logging.getLogger(__name__) # ============================================================================= # LANGUAGES # ============================================================================= # Qwen Omni voices (work across all Qwen-supported languages) QWEN_VOICES = [ "Cherry", "Serena", "Ethan", "Chelsie", "Momo", "Vivian", "Moon", "Maia", "Kai", "Nofish", "Bella", "Jennifer", "Ryan", "Katerina", "Aiden", "Eldric Sage", "Mia", "Mochi", "Bellona", "Vincent", "Bunny", "Neil", "Elias", "Arthur", "Seren", "Bodega", "Sonrisa", "Alek", "Dolce", "Sohee", "Ono Anna", "Lenn", "Emilien", "Andre", ] # Each language entry: # "Display Name": { # "nllb": NLLB-200 language code (for local/yourvoic pipeline translation), # "yourvoic_lang": YourVoic language code (or None), # "yourvoic_voices": list of YourVoic voice names, # "tts_engine": "qwen" | "yourvoic" | "local", # "qwen_code": short language code for Qwen prompts (or None), # "qwen_name": full language name for Qwen system prompt (or None), # } LANGUAGES = { # ---- Qwen Omni Languages (end-to-end speech-to-speech, 11 languages) ---- "English": { "nllb": "eng_Latn", "yourvoic_lang": "en-US", "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", "qwen_code": "en", "qwen_name": "English", }, "Chinese (Mandarin)": { "nllb": "zho_Hans", "yourvoic_lang": "zh-CN", "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", "qwen_code": "zh", "qwen_name": "Mandarin Chinese", }, "Japanese": { "nllb": "jpn_Jpan", "yourvoic_lang": "ja-JP", "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", "qwen_code": "ja", "qwen_name": "Japanese", }, "Korean": { "nllb": "kor_Hang", "yourvoic_lang": "ko-KR", "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", "qwen_code": "ko", "qwen_name": "Korean", }, "German": { "nllb": "deu_Latn", "yourvoic_lang": "de-DE", "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", "qwen_code": "de", "qwen_name": "German", }, "French": { "nllb": "fra_Latn", "yourvoic_lang": "fr-FR", "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", "qwen_code": "fr", "qwen_name": "French", }, "Russian": { "nllb": "rus_Cyrl", "yourvoic_lang": "ru-RU", "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", "qwen_code": "ru", "qwen_name": "Russian", }, "Portuguese": { "nllb": "por_Latn", "yourvoic_lang": "pt-BR", "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", "qwen_code": "pt", "qwen_name": "Portuguese", }, "Spanish": { "nllb": "spa_Latn", "yourvoic_lang": "es-ES", "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", "qwen_code": "es", "qwen_name": "Spanish", }, "Italian": { "nllb": "ita_Latn", "yourvoic_lang": "it-IT", "yourvoic_voices": ["Peter", "Kylie"], "tts_engine": "qwen", "qwen_code": "it", "qwen_name": "Italian", }, "Arabic": { "nllb": "arb_Arab", "yourvoic_lang": "ar-SA", "yourvoic_voices": ["Peter"], "tts_engine": "qwen", "qwen_code": "ar", "qwen_name": "Modern Standard Arabic", }, # ---- African Languages (YourVoic API) ---- "Swahili": { "nllb": "swh_Latn", "yourvoic_lang": "sw-KE", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Amharic": { "nllb": "amh_Ethi", "yourvoic_lang": "am-ET", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Afrikaans": { "nllb": "afr_Latn", "yourvoic_lang": "af-ZA", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, # ---- South Asian (YourVoic TTS + NLLB MT) ---- "Hindi": { "nllb": "hin_Deva", "yourvoic_lang": "hi-IN", "yourvoic_voices": ["Rahul", "Deepika", "Aditya"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Bengali": { "nllb": "ben_Beng", "yourvoic_lang": "bn-IN", "yourvoic_voices": ["Sneha", "Aryan"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Tamil": { "nllb": "tam_Taml", "yourvoic_lang": "ta-IN", "yourvoic_voices": ["Priya", "Kumar"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Telugu": { "nllb": "tel_Telu", "yourvoic_lang": "te-IN", "yourvoic_voices": ["Arjun", "Lakshmi"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Marathi": { "nllb": "mar_Deva", "yourvoic_lang": "mr-IN", "yourvoic_voices": ["Anjali", "Rohan"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Urdu": { "nllb": "urd_Arab", "yourvoic_lang": "ur-PK", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Nepali": { "nllb": "npi_Deva", "yourvoic_lang": "ne-NP", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, # ---- Southeast Asian (YourVoic) ---- "Indonesian": { "nllb": "ind_Latn", "yourvoic_lang": "id-ID", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Vietnamese": { "nllb": "vie_Latn", "yourvoic_lang": "vi-VN", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Thai": { "nllb": "tha_Thai", "yourvoic_lang": "th-TH", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Malay": { "nllb": "zsm_Latn", "yourvoic_lang": "ms-MY", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Filipino": { "nllb": "tgl_Latn", "yourvoic_lang": "fil-PH", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, # ---- European (YourVoic) ---- "Dutch": { "nllb": "nld_Latn", "yourvoic_lang": "nl-NL", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Polish": { "nllb": "pol_Latn", "yourvoic_lang": "pl-PL", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Turkish": { "nllb": "tur_Latn", "yourvoic_lang": "tr-TR", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Swedish": { "nllb": "swe_Latn", "yourvoic_lang": "sv-SE", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Romanian": { "nllb": "ron_Latn", "yourvoic_lang": "ro-RO", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Greek": { "nllb": "ell_Grek", "yourvoic_lang": "el-GR", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Ukrainian": { "nllb": "ukr_Cyrl", "yourvoic_lang": "uk-UA", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Finnish": { "nllb": "fin_Latn", "yourvoic_lang": "fi-FI", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Danish": { "nllb": "dan_Latn", "yourvoic_lang": "da-DK", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Norwegian": { "nllb": "nob_Latn", "yourvoic_lang": "nb-NO", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, # ---- Middle Eastern (YourVoic) ---- "Persian": { "nllb": "pes_Arab", "yourvoic_lang": "fa-IR", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, "Hebrew": { "nllb": "heb_Hebr", "yourvoic_lang": "he-IL", "yourvoic_voices": ["Peter"], "tts_engine": "yourvoic", "qwen_code": None, "qwen_name": None, }, } # Group languages by category for the UI LANGUAGE_GROUPS = { "Global Languages": [ "Spanish", "French", "German", "Mandarin", "Italian", "Japanese", "Portuguese", "Hindi", "Arabic", "Korean", "Russian", ], "African Languages": [ "Swahili", "Amharic", "Afrikaans", ], "South Asian": [ "Bengali", "Tamil", "Telugu", "Marathi", "Urdu", "Nepali", ], "Southeast Asian": [ "Indonesian", "Vietnamese", "Thai", "Malay", "Filipino", ], "European": [ "Dutch", "Polish", "Turkish", "Swedish", "Romanian", "Greek", "Ukrainian", "Finnish", "Danish", "Norwegian", ], "Middle Eastern": [ "Persian", "Hebrew", ], } # All language display names (for dropdowns) ALL_LANGUAGE_NAMES = sorted(LANGUAGES.keys()) # Languages that use YourVoic API YOURVOIC_LANGUAGES = [k for k, v in LANGUAGES.items() if v["tts_engine"] == "yourvoic"] # Languages that use YourVoic API YOURVOIC_LANGUAGES = [k for k, v in LANGUAGES.items() if v["tts_engine"] == "yourvoic"] # ============================================================================= # PIPELINE: ASR + MT + Video helpers # ============================================================================= DEVICE = "cuda" if torch.cuda.is_available() else "cpu" TORCH_DTYPE = torch.float16 if torch.cuda.is_available() else torch.float32 # Models (loaded once at startup) asr_pipe = None mt_tokenizer = None mt_model = None def load_models(): """Load all models at startup.""" global asr_pipe, mt_tokenizer, mt_model from transformers import ( pipeline as hf_pipeline, AutoTokenizer, AutoModelForSeq2SeqLM, ) print(f"Device: {DEVICE} | Dtype: {TORCH_DTYPE}") print("Loading models...") # ASR ASR_MODEL_ID = "PlotweaverAI/whisper-small-de-en" print(f" Loading ASR: {ASR_MODEL_ID}") asr_pipe = hf_pipeline( "automatic-speech-recognition", model=ASR_MODEL_ID, device=DEVICE, torch_dtype=TORCH_DTYPE, ) print(" ASR loaded") # MT MT_MODEL_ID = "PlotweaverAI/nllb-200-distilled-600M-african-6lang" print(f" Loading MT: {MT_MODEL_ID}") mt_tokenizer = AutoTokenizer.from_pretrained(MT_MODEL_ID) mt_model = AutoModelForSeq2SeqLM.from_pretrained( MT_MODEL_ID, torch_dtype=TORCH_DTYPE ).to(DEVICE) mt_tokenizer.src_lang = "eng_Latn" print(" MT loaded") # Diagnostics print(f"\n=== Device diagnostics ===") print(f"CUDA available: {torch.cuda.is_available()}") if torch.cuda.is_available(): print(f"CUDA device: {torch.cuda.get_device_name(0)}") print(f"ASR on: {next(asr_pipe.model.parameters()).device}") print(f"MT on: {next(mt_model.parameters()).device}") print(f"YourVoic API key: {'set' if os.environ.get('YOURVOIC_API_KEY') else 'NOT SET'}") print(f"Dashscope key: {'set' if os.environ.get('DASHSCOPE_API_KEY') else 'NOT SET'}") print(f"==========================\n") print("All models loaded!") # ---- Text Processing ---- def split_into_sentences(text): """Split raw ASR text into individual sentences.""" text = text.strip() if not text: return [] text = '. '.join(s.strip().capitalize() for s in text.split('. ') if s.strip()) if re.search(r'[.!?]', text): sentences = re.split(r'(?<=[.!?])\s+', text) return [s.strip() for s in sentences if s.strip()] words = text.split() MAX_WORDS = 12 sentences = [] for i in range(0, len(words), MAX_WORDS): chunk = ' '.join(words[i:i + MAX_WORDS]) if not chunk.endswith(('.', '!', '?')): chunk += '.' chunk = chunk[0].upper() + chunk[1:] if len(chunk) > 1 else chunk.upper() sentences.append(chunk) return sentences # ---- ASR ---- def transcribe(audio_array, sample_rate=16000): """ASR: English audio to text. Handles both short and long audio.""" if len(audio_array) < 1600: return "" duration_s = len(audio_array) / sample_rate if sample_rate != 16000: import torchaudio.functional as F_audio audio_tensor = torch.from_numpy(audio_array).float() audio_tensor = F_audio.resample(audio_tensor, sample_rate, 16000) audio_array = audio_tensor.numpy() sample_rate = 16000 if duration_s <= 28: result = asr_pipe( {"raw": audio_array, "sampling_rate": sample_rate}, return_timestamps=False, ) return result["text"].strip() # Long-form: native Whisper generate model = asr_pipe.model processor = asr_pipe.feature_extractor tokenizer = asr_pipe.tokenizer inputs = processor( audio_array, sampling_rate=16000, return_tensors="pt", truncation=False, padding="longest", return_attention_mask=True, ) input_features = inputs.input_features.to(DEVICE, dtype=TORCH_DTYPE) attention_mask = inputs.attention_mask.to(DEVICE) if "attention_mask" in inputs else None generate_kwargs = {"return_timestamps": True, "language": "en", "task": "transcribe"} if attention_mask is not None: generate_kwargs["attention_mask"] = attention_mask with torch.no_grad(): predicted_ids = model.generate(input_features, **generate_kwargs) transcription = tokenizer.batch_decode(predicted_ids, skip_special_tokens=True)[0] return transcription.strip() # ---- MT ---- def translate_sentence(text, target_nllb_code, fast=True, max_length=256): """Translate a single sentence from English to target language.""" inputs = mt_tokenizer(text, return_tensors="pt", truncation=True).to(DEVICE) tgt_lang_id = mt_tokenizer.convert_tokens_to_ids(target_nllb_code) generate_kwargs = { "forced_bos_token_id": tgt_lang_id, "repetition_penalty": 1.5, "no_repeat_ngram_size": 3, } if fast: generate_kwargs.update({"max_length": 128, "num_beams": 1, "do_sample": False}) else: generate_kwargs.update({"max_length": max_length, "num_beams": 4, "early_stopping": True}) with torch.no_grad(): output_ids = mt_model.generate(**inputs, **generate_kwargs) return mt_tokenizer.decode(output_ids[0], skip_special_tokens=True) def translate_text(text, target_nllb_code, fast=True): """Split and translate full text sentence-by-sentence.""" sentences = split_into_sentences(text) if not sentences: return "", [], [] translations = [] for s in sentences: yo = translate_sentence(s, target_nllb_code, fast=fast) translations.append(yo) return ' '.join(translations), sentences, translations # ---- Video Processing ---- def extract_audio_from_video(video_path, output_path, target_sr=16000): """Extract audio track from video as 16kHz mono WAV.""" cmd = [ "ffmpeg", "-y", "-i", video_path, "-vn", "-acodec", "pcm_s16le", "-ar", str(target_sr), "-ac", "1", output_path, ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise RuntimeError(f"ffmpeg extraction failed: {result.stderr[:200]}") return output_path def get_media_duration(path): """Get duration in seconds.""" cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", path, ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise RuntimeError(f"ffprobe failed: {result.stderr[:200]}") return float(result.stdout.strip()) def stretch_audio_to_duration(input_path, output_path, target_duration_s): """Stretch/compress audio to match target duration.""" current_duration = get_media_duration(input_path) if current_duration <= 0: raise RuntimeError("Invalid audio duration") ratio = current_duration / target_duration_s filters = [] remaining = ratio while remaining > 2.0: filters.append("atempo=2.0") remaining /= 2.0 while remaining < 0.5: filters.append("atempo=0.5") remaining /= 0.5 filters.append(f"atempo={remaining:.4f}") cmd = ["ffmpeg", "-y", "-i", input_path, "-filter:a", ",".join(filters), output_path] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise RuntimeError(f"ffmpeg tempo failed: {result.stderr[:200]}") return output_path def mux_video_audio(video_path, audio_path, output_path, extend_video=False, target_duration=None): """Combine video with new audio. Optionally extend video by freezing last frame.""" if extend_video and target_duration: cmd = [ "ffmpeg", "-y", "-i", video_path, "-i", audio_path, "-filter_complex", f"[0:v]tpad=stop_mode=clone:stop_duration={target_duration}[v]", "-map", "[v]", "-map", "1:a:0", "-c:v", "libx264", "-preset", "fast", "-c:a", "aac", "-t", str(target_duration), output_path, ] else: cmd = [ "ffmpeg", "-y", "-i", video_path, "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-map", "0:v:0", "-map", "1:a:0", "-shortest", output_path, ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise RuntimeError(f"ffmpeg mux failed: {result.stderr[:200]}") return output_path # ============================================================================= # TTS ENGINE: YourVoic API # ============================================================================= YOURVOIC_API_KEY = os.environ.get("YOURVOIC_API_KEY", "") YOURVOIC_STREAM_URL = "https://yourvoic.com/api/v1/tts/stream" def synthesize_yourvoic(text, language_code, voice="Peter", speed=1.0): """Synthesize text using YourVoic API.""" if not YOURVOIC_API_KEY: raise RuntimeError("YOURVOIC_API_KEY not set.") headers = {"X-API-Key": YOURVOIC_API_KEY, "Content-Type": "application/json"} payload = {"text": text, "voice": voice, "language": language_code, "model": "aura-prime", "speed": speed} t0 = time.time() response = requests.post(YOURVOIC_STREAM_URL, headers=headers, json=payload, stream=True, timeout=60) if response.status_code != 200: raise RuntimeError(f"YourVoic error {response.status_code}: {response.text[:200]}") # Detect format from content-type header ct = response.headers.get("content-type", "").lower() logger.info(f"YourVoic content-type: {ct}") # Collect audio bytes audio_data = b"" for chunk in response.iter_content(chunk_size=8192): audio_data += chunk elapsed = time.time() - t0 logger.info(f"YourVoic TTS: {len(text)} chars, {elapsed:.2f}s, {len(audio_data)} bytes") # Log first bytes for format detection magic = audio_data[:16] if len(audio_data) > 16 else audio_data logger.info(f"YourVoic first bytes: {magic[:8]}") # Determine file extension from content-type or magic bytes if b"RIFF" in audio_data[:4]: ext = ".wav" elif b"\xff\xfb" in audio_data[:3] or b"\xff\xf3" in audio_data[:3] or b"ID3" in audio_data[:3]: ext = ".mp3" elif b"OggS" in audio_data[:4]: ext = ".ogg" elif b"fLaC" in audio_data[:4]: ext = ".flac" elif "mp3" in ct or "mpeg" in ct: ext = ".mp3" elif "ogg" in ct: ext = ".ogg" elif "wav" in ct: ext = ".wav" elif "flac" in ct: ext = ".flac" elif "linear16" in ct or "pcm" in ct or "l16" in ct: ext = ".raw" else: ext = ".mp3" # Most common API default logger.warning(f"Unknown YourVoic format (ct={ct}), guessing mp3") # Save with correct extension tmp_path = tempfile.NamedTemporaryFile(suffix=ext, delete=False).name with open(tmp_path, "wb") as f: f.write(audio_data) # Try reading directly with soundfile try: audio_array, sample_rate = sf.read(tmp_path, dtype="float32") os.unlink(tmp_path) return audio_array, sample_rate except Exception as e: logger.warning(f"soundfile can't read {ext}: {e}") # Handle raw PCM (linear16): wrap in WAV header if ext == ".raw": try: sr = 24000 raw_data = audio_data wav_path = tmp_path + ".wav" with open(wav_path, "wb") as f: f.write(b"RIFF") f.write(struct.pack(" 0: audio_segments.append(audio_seg) silence = np.zeros(int(0.15 * seg_sr), dtype=np.float32) audio_segments.append(silence) except Exception as e: logger.error(f"TTS chunk failed: {e}") continue if not audio_segments: fallback_sr = output_sr or 16000 logger.warning("All TTS chunks failed — returning silence") return np.zeros(int(0.5 * fallback_sr), dtype=np.float32), fallback_sr return np.concatenate(audio_segments), output_sr # ============================================================================= # QWEN OMNI ENGINE # ============================================================================= QWEN_MODEL = "qwen3.5-omni-plus" QWEN_BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1" def _get_client(): """Create OpenAI-compatible client for Qwen Dashscope API.""" from openai import OpenAI api_key = os.environ.get("DASHSCOPE_API_KEY", "") if not api_key: raise RuntimeError( "DASHSCOPE_API_KEY not set. Add it as a Space secret." ) return OpenAI(api_key=api_key, base_url=QWEN_BASE_URL) def _wav_to_base64(wav_path): """Read WAV file and return base64 string.""" with open(wav_path, "rb") as f: return base64.b64encode(f.read()).decode("utf-8") def _base64_to_wav(b64_data, output_path): """Convert raw PCM base64 audio to WAV file (24kHz, mono, 16-bit).""" audio_bytes = base64.b64decode(b64_data) sample_rate = 24000 num_channels = 1 bits_per_sample = 16 byte_rate = sample_rate * num_channels * bits_per_sample // 8 block_align = num_channels * bits_per_sample // 8 data_size = len(audio_bytes) with open(output_path, "wb") as f: f.write(b"RIFF") f.write(struct.pack(" 3600: return None, "Video longer than 1 hour — please use a shorter clip." # Split into chunks if progress_fn: progress_fn(0.1, desc="Extracting audio chunks...") num_chunks = max(1, int(total_duration // chunk_seconds) + (1 if total_duration % chunk_seconds > 0 else 0)) log.append(f"**Chunks:** {num_chunks} ({chunk_seconds}s each)") input_chunks = [] for i in range(num_chunks): start = i * chunk_seconds duration = min(chunk_seconds, total_duration - start) chunk_path = os.path.join(tmp_dir, f"chunk_{i:03d}.wav") _extract_audio_chunk(video_path, chunk_path, start, duration) input_chunks.append(chunk_path) # Translate each chunk output_chunks = [] all_transcripts = [] for i, chunk_path in enumerate(input_chunks): if progress_fn: frac = 0.15 + 0.7 * (i / num_chunks) progress_fn(frac, desc=f"Translating chunk {i+1}/{num_chunks}...") result_path, transcript = translate_chunk_qwen( chunk_path, voice, language_name, i ) if transcript: all_transcripts.append(f"**[{i+1}]** {transcript}") if result_path: output_chunks.append(result_path) else: # Silence fallback duration = _get_duration(chunk_path) silence_path = os.path.join(tmp_dir, f"silence_{i:03d}.wav") subprocess.run( ["ffmpeg", "-y", "-f", "lavfi", "-i", "anullsrc=r=24000:cl=mono", "-t", str(duration), "-acodec", "pcm_s16le", silence_path], capture_output=True, check=True, ) output_chunks.append(silence_path) # Concatenate if progress_fn: progress_fn(0.88, desc="Assembling audio...") full_audio = os.path.join(tmp_dir, "full_dubbed.wav") _concatenate_wavs(output_chunks, full_audio) # Mux onto video if progress_fn: progress_fn(0.93, desc="Combining audio and video...") output_video = os.path.join(tmp_dir, "dubbed_output.mp4") subprocess.run( ["ffmpeg", "-y", "-i", video_path, "-i", full_audio, "-c:v", "copy", "-map", "0:v:0", "-map", "1:a:0", "-shortest", output_video], capture_output=True, check=True, ) if progress_fn: progress_fn(1.0, desc="Done!") log.append(f"\n**Transcript:**") log.extend(all_transcripts) return output_video, "\n".join(log) except Exception as e: logger.exception("Qwen dubbing failed") shutil.rmtree(tmp_dir, ignore_errors=True) return None, f"Error: {str(e)}" # ============================================================================= # GRADIO APP # ============================================================================= # Load models at startup load_models() # ============================================================================= # Helper functions # ============================================================================= def get_voices_for_language(lang_name): """Get available voices for a language based on its engine.""" config = LANGUAGES.get(lang_name, {}) engine = config.get("tts_engine", "local") if engine == "qwen": return QWEN_VOICES elif engine == "yourvoic" and config.get("yourvoic_voices"): return config["yourvoic_voices"] elif engine == "local": return ["Peter"] return ["Peter"] def full_pipeline_audio(audio_input, target_language): """Full pipeline: English audio → target language audio.""" if audio_input is None: return None, "Please upload or record audio." lang_config = LANGUAGES.get(target_language) if not lang_config: return None, f"Language '{target_language}' not configured." sample_rate, audio_array = audio_input audio_array = audio_array.astype(np.float32) if audio_array.ndim > 1: audio_array = audio_array.mean(axis=1) if audio_array.max() > 1.0 or audio_array.min() < -1.0: max_val = max(abs(audio_array.max()), abs(audio_array.min())) if max_val > 0: audio_array = audio_array / max_val log = [] total_start = time.time() # ASR t0 = time.time() english = transcribe(audio_array, sample_rate) log.append(f"**ASR** ({time.time()-t0:.2f}s)\n{english}") if not english: return None, "ASR returned empty text." # MT t0 = time.time() nllb_code = lang_config["nllb"] translated, en_sents, tgt_sents = translate_text(english, nllb_code, fast=False) log.append(f"\n**Translation** ({time.time()-t0:.2f}s)") for e, t in zip(en_sents, tgt_sents): log.append(f" EN: {e}\n {target_language.upper()}: {t}") if not translated: return None, "Translation returned empty." # TTS t0 = time.time() audio_out, sr_out = synthesize_chunked( translated, lang_config ) log.append(f"\n**TTS** ({time.time()-t0:.2f}s) = {len(audio_out)/sr_out:.1f}s audio") total = time.time() - total_start log.append(f"\n**Total: {total:.2f}s**") return (sr_out, audio_out), "\n".join(log) def full_pipeline_text(english_text, target_language, voice_name): """Text-only pipeline: English text → target language audio.""" if not english_text or not english_text.strip(): return None, "Please enter English text." lang_config = LANGUAGES.get(target_language) if not lang_config: return None, f"Language '{target_language}' not configured." log = [] total_start = time.time() # MT t0 = time.time() nllb_code = lang_config["nllb"] translated, en_sents, tgt_sents = translate_text(english_text.strip(), nllb_code, fast=False) log.append(f"**Translation** ({time.time()-t0:.2f}s)") for e, t in zip(en_sents, tgt_sents): log.append(f" EN: {e}\n {target_language.upper()}: {t}") if not translated: return None, "Translation returned empty." # TTS t0 = time.time() audio_out, sr_out = synthesize_chunked( translated, lang_config ) log.append(f"\n**TTS** ({time.time()-t0:.2f}s) = {len(audio_out)/sr_out:.1f}s audio") total = time.time() - total_start log.append(f"\n**Total: {total:.2f}s**") return (sr_out, audio_out), "\n".join(log) def dub_video(video_path, target_languages, dub_voice, chunk_seconds, progress=gr.Progress()): """ Dub a video into one or more target languages. Routes to Qwen Omni for global languages, YourVoic for others. """ if video_path is None: return None, "Please upload a video." if not target_languages: return None, "Please select at least one target language." results_log = [] output_videos = [] for lang_name in target_languages: lang_config = LANGUAGES.get(lang_name) if not lang_config: results_log.append(f"**{lang_name}**: not configured, skipped") continue engine = lang_config.get("tts_engine", "local") results_log.append(f"\n{'='*50}") results_log.append(f"**Dubbing: {lang_name}** (engine: {engine})") results_log.append(f"{'='*50}") try: if engine == "qwen": # Qwen Omni: end-to-end speech-to-speech (best for global languages) qwen_lang_name = lang_config.get("qwen_name", lang_name) voice = dub_voice if dub_voice in QWEN_VOICES else "Ethan" out_video, log_text = dub_video_qwen( video_path, qwen_lang_name, voice=voice, chunk_seconds=chunk_seconds, progress_fn=progress, ) results_log.append(log_text) if out_video: output_videos.append(out_video) else: # Local/YourVoic pipeline: ASR → NLLB → TTS work_dir = tempfile.mkdtemp(prefix=f"dub_{lang_name}_") extracted_audio = os.path.join(work_dir, "audio.wav") tgt_audio_raw = os.path.join(work_dir, "tgt_raw.wav") tgt_audio_aligned = os.path.join(work_dir, "tgt_aligned.wav") output_video = os.path.join(work_dir, f"dubbed_{lang_name}.mp4") progress(0.05, desc=f"{lang_name}: extracting audio...") extract_audio_from_video(video_path, extracted_audio) video_duration = get_media_duration(video_path) results_log.append(f"Video: {video_duration:.1f}s") audio_array, sr = sf.read(extracted_audio, dtype="float32") if audio_array.ndim > 1: audio_array = audio_array.mean(axis=1) progress(0.15, desc=f"{lang_name}: transcribing...") t0 = time.time() english = transcribe(audio_array, sr) results_log.append(f"ASR: {time.time()-t0:.1f}s") if not english: results_log.append("ASR empty — skipped") continue progress(0.4, desc=f"{lang_name}: translating...") t0 = time.time() nllb_code = lang_config["nllb"] translated, _, _ = translate_text(english, nllb_code, fast=True) results_log.append(f"MT: {time.time()-t0:.1f}s") if not translated: results_log.append("Translation empty — skipped") continue progress(0.65, desc=f"{lang_name}: synthesizing...") t0 = time.time() tgt_audio, tgt_sr = synthesize_chunked( translated, lang_config ) sf.write(tgt_audio_raw, tgt_audio, tgt_sr) tgt_duration = len(tgt_audio) / tgt_sr results_log.append(f"TTS: {time.time()-t0:.1f}s ({tgt_duration:.1f}s audio)") progress(0.85, desc=f"{lang_name}: aligning...") MAX_STRETCH = 1.2 stretch_ratio = tgt_duration / video_duration if stretch_ratio <= MAX_STRETCH: if abs(stretch_ratio - 1.0) > 0.02: stretch_audio_to_duration(tgt_audio_raw, tgt_audio_aligned, video_duration) else: import shutil shutil.copy(tgt_audio_raw, tgt_audio_aligned) extend_video = False final_duration = video_duration else: shutil.copy(tgt_audio_raw, tgt_audio_aligned) extend_video = True final_duration = tgt_duration results_log.append(f"Audio longer ({stretch_ratio:.1f}x) — extending video") progress(0.95, desc=f"{lang_name}: combining...") mux_video_audio( video_path, tgt_audio_aligned, output_video, extend_video=extend_video, target_duration=final_duration ) output_videos.append(output_video) except Exception as e: logger.exception(f"Dubbing {lang_name} failed") results_log.append(f"Error: {str(e)}") progress(1.0, desc="Done!") final_video = output_videos[0] if output_videos else None return final_video, "\n".join(results_log) def update_voices(language): """Update voice dropdown when language changes.""" voices = get_voices_for_language(language) return gr.update(choices=voices, value=voices[0]) # ============================================================================= # Gradio UI # ============================================================================= EXAMPLES = [ "And it's a brilliant goal from the striker!", "The referee has shown a yellow card. Corner kick for the home team.", "What a save by the goalkeeper! The match is heading into injury time.", "He dribbles past two defenders and shoots! The ball hits the back of the net!", ] CSS = """ .main-header { text-align: center; margin-bottom: 0.5rem; } .main-header h1 { font-size: 1.8rem; font-weight: 700; margin: 0; } .main-header p { color: #666; font-size: 0.95rem; } .lang-group-label { font-weight: 600; font-size: 0.85rem; color: #888; text-transform: uppercase; letter-spacing: 0.05em; margin-top: 0.5rem; } """ with gr.Blocks( title="PlotWeaver — Live Commentary Translation", theme=gr.themes.Soft(), css=CSS, ) as demo: gr.HTML("""

PlotWeaver

Live commentary translation platform — English to 40+ languages

Qwen Omni (11 languages) + YourVoic API + NLLB-200 (27 languages)

""") with gr.Tabs(): # ====== TAB 1: EVENT MANAGEMENT ====== with gr.TabItem("Event Management"): gr.Markdown("### Create new event") gr.Markdown("Configure your live broadcast event with target languages and input source.") with gr.Row(): with gr.Column(scale=2): event_name = gr.Textbox( label="Event name", placeholder="e.g. Premier League: Arsenal vs. Chelsea", ) with gr.Row(): start_time = gr.Textbox(label="Start time", placeholder="08:30 PM") end_time = gr.Textbox(label="End time", placeholder="10:30 PM") event_date = gr.Textbox(label="Date", placeholder="2026-06-06") gr.Markdown("#### Input source") input_method = gr.Radio( choices=["RTMP Stream", "WebRTC (Browser)", "Direct Audio Feed"], value="RTMP Stream", label="Input method", ) gr.Markdown("#### Target languages") gr.Markdown("Select languages for simultaneous broadcast. Additional languages consume more stream minutes.") # Language checkboxes grouped by category target_langs = gr.CheckboxGroup( choices=ALL_LANGUAGE_NAMES, label="Languages", value=["Spanish"], ) with gr.Column(scale=1): gr.Markdown("#### Estimate summary") estimate_display = gr.Markdown( value="**Event:** Not configured\n\n**Languages:** 1 selected\n\n**Estimated duration:** --\n\n**Total estimate:** --" ) create_event_btn = gr.Button("Create Event", variant="primary", size="lg") event_status = gr.Markdown("") def update_estimate(name, langs, start, end): n_langs = len(langs) if langs else 0 lang_list = ", ".join(langs) if langs else "None" return ( f"**Event:** {name or 'Not set'}\n\n" f"**Languages:** {n_langs} selected\n\n" f"{lang_list}\n\n" f"**Input:** Configured\n\n" f"**Rate:** 1x (Standard)" ) for inp in [event_name, target_langs, start_time, end_time]: inp.change( fn=update_estimate, inputs=[event_name, target_langs, start_time, end_time], outputs=[estimate_display], ) def create_event(name, langs): if not name: return "Please enter an event name." if not langs: return "Please select at least one language." return f"Event **{name}** created with {len(langs)} languages: {', '.join(langs)}" create_event_btn.click( fn=create_event, inputs=[event_name, target_langs], outputs=[event_status], ) # ====== TAB 2: LIVE STUDIO ====== with gr.TabItem("Live Studio"): gr.Markdown("### Live streaming translation") gr.Markdown("Record or stream English commentary and hear it translated in real-time.") with gr.Row(): studio_language = gr.Dropdown( choices=ALL_LANGUAGE_NAMES, value="Spanish", label="Target language", ) studio_voice = gr.Dropdown( choices=get_voices_for_language("Spanish"), value=get_voices_for_language("Spanish")[0], label="Voice", ) studio_language.change( fn=update_voices, inputs=[studio_language], outputs=[studio_voice], ) with gr.Row(): with gr.Column(): studio_audio_in = gr.Audio( label="English commentary (upload or record)", type="numpy", sources=["upload", "microphone"], ) studio_translate_btn = gr.Button("Translate", variant="primary", size="lg") with gr.Column(): studio_audio_out = gr.Audio(label="Translated audio", type="numpy", autoplay=True) studio_log = gr.Markdown(label="Pipeline log") studio_translate_btn.click( fn=full_pipeline_audio, inputs=[studio_audio_in, studio_language], outputs=[studio_audio_out, studio_log], ) # ====== TAB 3: VIDEO DUBBING ====== with gr.TabItem("Video Dubbing"): gr.Markdown("### Video dubbing (English → multi-language)") gr.Markdown( "Upload a video with English commentary and get back a dubbed version. " "**Global languages** (Arabic, French, Spanish, etc.) use Qwen Omni for best quality. " "**African/regional languages** use YourVoic API with NLLB translation." ) with gr.Row(): with gr.Column(): dub_video_in = gr.Video(label="Upload English video", sources=["upload"]) dub_languages = gr.CheckboxGroup( choices=ALL_LANGUAGE_NAMES, label="Target languages", value=["Spanish"], ) with gr.Row(): dub_voice = gr.Dropdown( choices=QWEN_VOICES, value="Ethan", label="Voice (for Qwen languages)", info="Applies to Arabic, French, Spanish, etc. Local languages use default voice.", ) dub_chunk_slider = gr.Slider( minimum=30, maximum=300, value=120, step=10, label="Chunk duration (seconds)", info="Shorter = more API calls but less timeout risk.", ) dub_btn = gr.Button("Dub Video", variant="primary", size="lg") with gr.Column(): dub_video_out = gr.Video(label="Dubbed video (download from player)") dub_log = gr.Markdown( label="Processing log", value="Upload a video and select languages to start." ) dub_btn.click( fn=dub_video, inputs=[dub_video_in, dub_languages, dub_voice, dub_chunk_slider], outputs=[dub_video_out, dub_log], ) # ====== TAB 4: TEXT TRANSLATION ====== with gr.TabItem("Text \u2192 Audio"): gr.Markdown("### Text to translated speech") gr.Markdown("Type English text, choose a language, and hear the translated audio.") with gr.Row(): text_language = gr.Dropdown( choices=ALL_LANGUAGE_NAMES, value="Spanish", label="Target language", ) text_voice = gr.Dropdown( choices=get_voices_for_language("Spanish"), value=get_voices_for_language("Spanish")[0], label="Voice", ) text_language.change( fn=update_voices, inputs=[text_language], outputs=[text_voice], ) with gr.Row(): with gr.Column(): text_input = gr.Textbox( label="English text", placeholder="Type English football commentary here...", lines=4, ) text_btn = gr.Button("Translate to speech", variant="primary", size="lg") gr.Examples( examples=[[e] for e in EXAMPLES], inputs=[text_input], label="Example commentary", ) with gr.Column(): text_audio_out = gr.Audio(label="Translated audio", type="numpy", autoplay=True) text_log = gr.Markdown(label="Pipeline log") text_btn.click( fn=full_pipeline_text, inputs=[text_input, text_language, text_voice], outputs=[text_audio_out, text_log], ) # ====== TAB 5: RECORDINGS ====== with gr.TabItem("Recordings & Clips"): gr.Markdown("### Recordings management") gr.Markdown( "Past dubbed recordings will appear here. " "This feature is coming soon — for now, use Video Dubbing to create new recordings " "and download them from the player." ) # ====== TAB 6: VOICE MODELS ====== with gr.TabItem("Voice Models"): gr.Markdown("### Voice model library") gr.Markdown("Browse available voices for each language.") voice_lang_select = gr.Dropdown( choices=ALL_LANGUAGE_NAMES, value="Spanish", label="Select language", ) voice_info = gr.Markdown() def show_voice_info(lang): config = LANGUAGES.get(lang, {}) engine = config.get("tts_engine", "unknown") voices = config.get("yourvoic_voices", []) info = f"### {lang}\n\n" if engine == "qwen": info += f"**Engine:** Qwen 3.5 Omni (end-to-end speech-to-speech)\n\n" info += f"This is the highest quality option. Qwen handles ASR + translation + TTS in a single API call, " info += f"preserving tone, emotion, and pacing from the original speaker.\n\n" info += f"**Available voices ({len(QWEN_VOICES)}):** {', '.join(QWEN_VOICES[:10])}... and {len(QWEN_VOICES)-10} more\n\n" info += f"All voices support all Qwen languages." elif engine == "yourvoic": info += f"**Engine:** YourVoic API (TTS) + NLLB-200 (translation)\n\n" info += f"**YourVoic language:** `{config.get('yourvoic_lang', 'N/A')}`\n\n" info += f"**Available voices:** {', '.join(voices) if voices else 'Peter (default)'}" else: info += f"**Engine:** Not available\n\n" info += f"**NLLB code:** `{config.get('nllb', 'N/A')}`\n\n" info += "Uses locally fine-tuned models on GPU. Voice selection not available." return info voice_lang_select.change(fn=show_voice_info, inputs=[voice_lang_select], outputs=[voice_info]) demo.load(fn=show_voice_info, inputs=[voice_lang_select], outputs=[voice_info]) gr.Markdown(""" --- **PlotWeaver** by PlotweaverAI | Models: [ASR](https://huggingface.co/PlotweaverAI/whisper-small-de-en) | [MT](https://huggingface.co/PlotweaverAI/nllb-200-distilled-600M-african-6lang) | [TTS](https://yourvoic.com) | [Qwen Omni](https://www.alibabacloud.com/help/en/model-studio/qwen-omni) """) if __name__ == "__main__": demo.launch()