# PlotweaverModel's picture
# Upload app.py
# a4b9343 verified
"""
PlotWeaver — Live Commentary Translation Platform (Single File)
================================================================
Two engines: Qwen Omni | YourVoic API (with NLLB MT)
"""
import os, io, re, time, base64, struct, shutil, subprocess, tempfile, logging
import torch, numpy as np, requests, soundfile as sf, gradio as gr
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
# =============================================================================
# LANGUAGES
# =============================================================================
# Voice names offered for the Qwen Omni engine; the same list is used for
# every Qwen-supported language.
QWEN_VOICES = [
    "Cherry", "Serena", "Ethan", "Chelsie", "Momo", "Vivian",
    "Moon", "Maia", "Kai", "Nofish", "Bella", "Jennifer",
    "Ryan", "Katerina", "Aiden", "Eldric Sage", "Mia", "Mochi",
    "Bellona", "Vincent", "Bunny", "Neil", "Elias", "Arthur",
    "Seren", "Bodega", "Sonrisa", "Alek", "Dolce", "Sohee",
    "Ono Anna", "Lenn", "Emilien", "Andre",
]
# Every LANGUAGES value is a dict with these keys (in this order):
#   "nllb"            NLLB-200 language code (for local/yourvoic pipeline translation)
#   "yourvoic_lang"   YourVoic language code (or None)
#   "yourvoic_voices" list of YourVoic voice names
#   "tts_engine"      "qwen" | "yourvoic" | "local"
#   "qwen_code"       short language code for Qwen prompts (or None)
#   "qwen_name"       full language name for Qwen system prompt (or None)
def _language_entry(nllb, yourvoic_lang, yourvoic_voices,
                    tts_engine="yourvoic", qwen_code=None, qwen_name=None):
    """Build one LANGUAGES value; the voice list is copied so entries never share one."""
    return {
        "nllb": nllb,
        "yourvoic_lang": yourvoic_lang,
        "yourvoic_voices": list(yourvoic_voices),
        "tts_engine": tts_engine,
        "qwen_code": qwen_code,
        "qwen_name": qwen_name,
    }


LANGUAGES = {
    # ---- Qwen Omni Languages (end-to-end speech-to-speech, 11 languages) ----
    "English": _language_entry("eng_Latn", "en-US", ["Peter", "Kylie"], "qwen", "en", "English"),
    "Chinese (Mandarin)": _language_entry("zho_Hans", "zh-CN", ["Peter", "Kylie"], "qwen", "zh", "Mandarin Chinese"),
    "Japanese": _language_entry("jpn_Jpan", "ja-JP", ["Peter", "Kylie"], "qwen", "ja", "Japanese"),
    "Korean": _language_entry("kor_Hang", "ko-KR", ["Peter", "Kylie"], "qwen", "ko", "Korean"),
    "German": _language_entry("deu_Latn", "de-DE", ["Peter", "Kylie"], "qwen", "de", "German"),
    "French": _language_entry("fra_Latn", "fr-FR", ["Peter", "Kylie"], "qwen", "fr", "French"),
    "Russian": _language_entry("rus_Cyrl", "ru-RU", ["Peter", "Kylie"], "qwen", "ru", "Russian"),
    "Portuguese": _language_entry("por_Latn", "pt-BR", ["Peter", "Kylie"], "qwen", "pt", "Portuguese"),
    "Spanish": _language_entry("spa_Latn", "es-ES", ["Peter", "Kylie"], "qwen", "es", "Spanish"),
    "Italian": _language_entry("ita_Latn", "it-IT", ["Peter", "Kylie"], "qwen", "it", "Italian"),
    "Arabic": _language_entry("arb_Arab", "ar-SA", ["Peter"], "qwen", "ar", "Modern Standard Arabic"),
    # ---- African Languages (YourVoic API) ----
    "Swahili": _language_entry("swh_Latn", "sw-KE", ["Peter"]),
    "Amharic": _language_entry("amh_Ethi", "am-ET", ["Peter"]),
    "Afrikaans": _language_entry("afr_Latn", "af-ZA", ["Peter"]),
    # ---- South Asian (YourVoic TTS + NLLB MT) ----
    "Hindi": _language_entry("hin_Deva", "hi-IN", ["Rahul", "Deepika", "Aditya"]),
    "Bengali": _language_entry("ben_Beng", "bn-IN", ["Sneha", "Aryan"]),
    "Tamil": _language_entry("tam_Taml", "ta-IN", ["Priya", "Kumar"]),
    "Telugu": _language_entry("tel_Telu", "te-IN", ["Arjun", "Lakshmi"]),
    "Marathi": _language_entry("mar_Deva", "mr-IN", ["Anjali", "Rohan"]),
    "Urdu": _language_entry("urd_Arab", "ur-PK", ["Peter"]),
    "Nepali": _language_entry("npi_Deva", "ne-NP", ["Peter"]),
    # ---- Southeast Asian (YourVoic) ----
    "Indonesian": _language_entry("ind_Latn", "id-ID", ["Peter"]),
    "Vietnamese": _language_entry("vie_Latn", "vi-VN", ["Peter"]),
    "Thai": _language_entry("tha_Thai", "th-TH", ["Peter"]),
    "Malay": _language_entry("zsm_Latn", "ms-MY", ["Peter"]),
    "Filipino": _language_entry("tgl_Latn", "fil-PH", ["Peter"]),
    # ---- European (YourVoic) ----
    "Dutch": _language_entry("nld_Latn", "nl-NL", ["Peter"]),
    "Polish": _language_entry("pol_Latn", "pl-PL", ["Peter"]),
    "Turkish": _language_entry("tur_Latn", "tr-TR", ["Peter"]),
    "Swedish": _language_entry("swe_Latn", "sv-SE", ["Peter"]),
    "Romanian": _language_entry("ron_Latn", "ro-RO", ["Peter"]),
    "Greek": _language_entry("ell_Grek", "el-GR", ["Peter"]),
    "Ukrainian": _language_entry("ukr_Cyrl", "uk-UA", ["Peter"]),
    "Finnish": _language_entry("fin_Latn", "fi-FI", ["Peter"]),
    "Danish": _language_entry("dan_Latn", "da-DK", ["Peter"]),
    "Norwegian": _language_entry("nob_Latn", "nb-NO", ["Peter"]),
    # ---- Middle Eastern (YourVoic) ----
    "Persian": _language_entry("pes_Arab", "fa-IR", ["Peter"]),
    "Hebrew": _language_entry("heb_Hebr", "he-IL", ["Peter"]),
}
# Group languages by category for the UI.
# Every name listed here must be a key of LANGUAGES so that a name picked from
# a group can be looked up directly (Mandarin is keyed "Chinese (Mandarin)").
LANGUAGE_GROUPS = {
    "Global Languages": [
        "Spanish", "French", "German", "Chinese (Mandarin)", "Italian",
        "Japanese", "Portuguese", "Hindi", "Arabic", "Korean", "Russian",
    ],
    "African Languages": [
        "Swahili", "Amharic", "Afrikaans",
    ],
    "South Asian": [
        "Bengali", "Tamil", "Telugu", "Marathi", "Urdu", "Nepali",
    ],
    "Southeast Asian": [
        "Indonesian", "Vietnamese", "Thai", "Malay", "Filipino",
    ],
    "European": [
        "Dutch", "Polish", "Turkish", "Swedish", "Romanian",
        "Greek", "Ukrainian", "Finnish", "Danish", "Norwegian",
    ],
    "Middle Eastern": [
        "Persian", "Hebrew",
    ],
}
# All language display names (for dropdowns)
ALL_LANGUAGE_NAMES = sorted(LANGUAGES)
# Languages that use YourVoic API (defined once; the verbatim duplicate was removed)
YOURVOIC_LANGUAGES = [
    name for name, config in LANGUAGES.items() if config["tts_engine"] == "yourvoic"
]
# =============================================================================
# PIPELINE: ASR + MT + Video helpers
# =============================================================================
# Run on the GPU in half precision when CUDA is available, otherwise on the
# CPU in full precision.
if torch.cuda.is_available():
    DEVICE, TORCH_DTYPE = "cuda", torch.float16
else:
    DEVICE, TORCH_DTYPE = "cpu", torch.float32
# Model handles; they stay None until load_models() assigns them.
asr_pipe = mt_tokenizer = mt_model = None
def load_models():
    """Load the ASR pipeline and the MT tokenizer/model into module globals.

    Assigns ``asr_pipe``, ``mt_tokenizer`` and ``mt_model``. Both models are
    placed on ``DEVICE`` with ``TORCH_DTYPE`` and the MT tokenizer's source
    language is fixed to ``eng_Latn``. Progress and a short device / API-key
    summary are printed to stdout. ``transformers`` is imported inside this
    function rather than at module import.
    """
    global asr_pipe, mt_tokenizer, mt_model
    from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
    from transformers import pipeline as hf_pipeline

    def _key_status(env_name):
        # Report only whether the variable is set, never its value.
        return "set" if os.environ.get(env_name) else "NOT SET"

    print(f"Device: {DEVICE} | Dtype: {TORCH_DTYPE}")
    print("Loading models...")

    # ASR
    asr_checkpoint = "PlotweaverAI/whisper-small-de-en"
    print(f" Loading ASR: {asr_checkpoint}")
    asr_pipe = hf_pipeline(
        "automatic-speech-recognition",
        model=asr_checkpoint,
        device=DEVICE,
        torch_dtype=TORCH_DTYPE,
    )
    print(" ASR loaded")

    # MT
    mt_checkpoint = "PlotweaverAI/nllb-200-distilled-600M-african-6lang"
    print(f" Loading MT: {mt_checkpoint}")
    mt_tokenizer = AutoTokenizer.from_pretrained(mt_checkpoint)
    loaded_mt = AutoModelForSeq2SeqLM.from_pretrained(mt_checkpoint, torch_dtype=TORCH_DTYPE)
    mt_model = loaded_mt.to(DEVICE)
    mt_tokenizer.src_lang = "eng_Latn"
    print(" MT loaded")

    # Diagnostics
    cuda_ready = torch.cuda.is_available()
    print("\n=== Device diagnostics ===")
    print(f"CUDA available: {cuda_ready}")
    if cuda_ready:
        print(f"CUDA device: {torch.cuda.get_device_name(0)}")
    print(f"ASR on: {next(asr_pipe.model.parameters()).device}")
    print(f"MT on: {next(mt_model.parameters()).device}")
    print(f"YourVoic API key: {_key_status('YOURVOIC_API_KEY')}")
    print(f"Dashscope key: {_key_status('DASHSCOPE_API_KEY')}")
    print("==========================\n")
    print("All models loaded!")
# ---- Text Processing ----
def split_into_sentences(text, max_words=12):
    """Split raw ASR text into individual sentences.

    The first letter of every ``". "``-separated fragment is upper-cased; the
    rest of the fragment keeps its original casing, so names and acronyms
    survive (``str.capitalize`` would lower-case them).

    Args:
        text: Raw transcript.
        max_words: Chunk size (>= 1) used when the text contains no
            ``.``, ``!`` or ``?`` at all.

    Returns:
        A list of sentence strings. Empty or whitespace-only input gives
        ``[]``. Punctuated text is split after each ``.``/``!``/``?``;
        unpunctuated text is cut into ``max_words``-word chunks, each
        terminated with a period.
    """
    def _upper_first(fragment):
        # Upper-case only the leading character; leave the remainder untouched.
        return fragment[:1].upper() + fragment[1:]

    text = text.strip()
    if not text:
        return []

    fragments = (part.strip() for part in text.split(". "))
    text = ". ".join(_upper_first(part) for part in fragments if part)

    if re.search(r"[.!?]", text):
        pieces = re.split(r"(?<=[.!?])\s+", text)
        return [piece.strip() for piece in pieces if piece.strip()]

    # No sentence punctuation anywhere: fall back to fixed-size word chunks.
    words = text.split()
    return [
        _upper_first(" ".join(words[start:start + max_words])) + "."
        for start in range(0, len(words), max_words)
    ]
# ---- ASR ----
def transcribe(audio_array, sample_rate=16000):
    """Transcribe English audio to text with the loaded ASR pipeline.

    Args:
        audio_array: 1-D sample array (a NumPy array is required whenever
            resampling is needed, because it is passed to ``torch.from_numpy``).
        sample_rate: Sampling rate of ``audio_array`` in Hz.

    Returns:
        The stripped transcript, or ``""`` when fewer than 1600 samples are given.
    """
    if len(audio_array) < 1600:
        return ""

    # Duration is computed from the caller's rate, before any resampling.
    duration_s = len(audio_array) / sample_rate

    if sample_rate != 16000:
        import torchaudio.functional as F_audio
        waveform = torch.from_numpy(audio_array).float()
        audio_array = F_audio.resample(waveform, sample_rate, 16000).numpy()
        sample_rate = 16000

    # Clips of at most 28 s go through the pipeline call in a single pass.
    if duration_s <= 28:
        short_result = asr_pipe(
            {"raw": audio_array, "sampling_rate": sample_rate},
            return_timestamps=False,
        )
        return short_result["text"].strip()

    # Long-form: native Whisper generate
    features = asr_pipe.feature_extractor(
        audio_array,
        sampling_rate=16000,
        return_tensors="pt",
        truncation=False,
        padding="longest",
        return_attention_mask=True,
    )
    input_features = features.input_features.to(DEVICE, dtype=TORCH_DTYPE)
    generate_kwargs = {"return_timestamps": True, "language": "en", "task": "transcribe"}
    if "attention_mask" in features:
        generate_kwargs["attention_mask"] = features.attention_mask.to(DEVICE)

    with torch.no_grad():
        predicted_ids = asr_pipe.model.generate(input_features, **generate_kwargs)
    decoded = asr_pipe.tokenizer.batch_decode(predicted_ids, skip_special_tokens=True)
    return decoded[0].strip()
# ---- MT ----
def translate_sentence(text, target_nllb_code, fast=True, max_length=256):
    """Translate one English sentence into the language given by an NLLB code.

    Args:
        text: Source sentence.
        target_nllb_code: NLLB-200 code of the target language; it is
            converted to a token id and forced as the first generated token.
        fast: When true, decode with a single beam and ``max_length`` 128;
            otherwise use 4 beams with early stopping and ``max_length``.
        max_length: Generation length limit, used only when ``fast`` is false.

    Returns:
        The decoded translation with special tokens removed.
    """
    encoded = mt_tokenizer(text, return_tensors="pt", truncation=True).to(DEVICE)

    if fast:
        decoding = {"max_length": 128, "num_beams": 1, "do_sample": False}
    else:
        decoding = {"max_length": max_length, "num_beams": 4, "early_stopping": True}

    generate_kwargs = {
        "forced_bos_token_id": mt_tokenizer.convert_tokens_to_ids(target_nllb_code),
        "repetition_penalty": 1.5,
        "no_repeat_ngram_size": 3,
        **decoding,
    }

    with torch.no_grad():
        generated = mt_model.generate(**encoded, **generate_kwargs)
    return mt_tokenizer.decode(generated[0], skip_special_tokens=True)
def translate_text(text, target_nllb_code, fast=True):
    """Translate text one sentence at a time.

    Returns:
        ``(joined_translation, source_sentences, translated_sentences)``, or
        ``("", [], [])`` when the text yields no sentences.
    """
    source_sentences = split_into_sentences(text)
    if not source_sentences:
        return "", [], []

    translated_sentences = [
        translate_sentence(sentence, target_nllb_code, fast=fast)
        for sentence in source_sentences
    ]
    return " ".join(translated_sentences), source_sentences, translated_sentences
# ---- Video Processing ----
def extract_audio_from_video(video_path, output_path, target_sr=16000):
    """Write the video's audio track to ``output_path`` as mono 16-bit PCM WAV.

    Args:
        video_path: Input media file.
        output_path: Destination WAV path (overwritten if present).
        target_sr: Output sampling rate in Hz.

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    ffmpeg_args = ["ffmpeg", "-y", "-i", video_path]
    ffmpeg_args += ["-vn", "-acodec", "pcm_s16le", "-ar", str(target_sr), "-ac", "1"]
    ffmpeg_args.append(output_path)

    completed = subprocess.run(ffmpeg_args, capture_output=True, text=True)
    if completed.returncode == 0:
        return output_path
    raise RuntimeError(f"ffmpeg extraction failed: {completed.stderr[:200]}")
def get_media_duration(path):
    """Return the container duration of ``path`` in seconds, as reported by ffprobe.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status.
    """
    probe_args = ["ffprobe", "-v", "error"]
    probe_args += ["-show_entries", "format=duration"]
    probe_args += ["-of", "default=noprint_wrappers=1:nokey=1", path]

    completed = subprocess.run(probe_args, capture_output=True, text=True)
    if completed.returncode == 0:
        return float(completed.stdout.strip())
    raise RuntimeError(f"ffprobe failed: {completed.stderr[:200]}")
def stretch_audio_to_duration(input_path, output_path, target_duration_s):
    """Time-stretch or compress audio so that it lasts ``target_duration_s``.

    The tempo ratio is applied through a chain of ffmpeg ``atempo`` filters,
    each kept within the 0.5-2.0 range.

    Args:
        input_path: Source audio file.
        output_path: Destination file (overwritten if present).
        target_duration_s: Desired duration in seconds; must be positive.

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: If the target or the measured duration is not positive,
            or if ffmpeg fails.
    """
    # Validate before probing: a zero target would divide by zero, and a
    # negative one yields a negative ratio that never leaves the "< 0.5" loop.
    if not target_duration_s or target_duration_s <= 0:
        raise RuntimeError("Invalid target duration")

    current_duration = get_media_duration(input_path)
    if current_duration <= 0:
        raise RuntimeError("Invalid audio duration")

    remaining = current_duration / target_duration_s
    filters = []
    while remaining > 2.0:
        filters.append("atempo=2.0")
        remaining /= 2.0
    while remaining < 0.5:
        filters.append("atempo=0.5")
        remaining /= 0.5
    filters.append(f"atempo={remaining:.4f}")

    cmd = ["ffmpeg", "-y", "-i", input_path, "-filter:a", ",".join(filters), output_path]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg tempo failed: {result.stderr[:200]}")
    return output_path
def mux_video_audio(video_path, audio_path, output_path, extend_video=False, target_duration=None):
    """Replace a video's audio track with ``audio_path`` and write ``output_path``.

    With ``extend_video`` and a truthy ``target_duration`` the video is
    re-encoded with libx264, padded by cloning its last frame, and the output
    is cut at ``target_duration`` seconds. Otherwise the video stream is
    copied and the output ends with the shorter of the two inputs.

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    inputs = ["ffmpeg", "-y", "-i", video_path, "-i", audio_path]

    if extend_video and target_duration:
        pad_filter = f"[0:v]tpad=stop_mode=clone:stop_duration={target_duration}[v]"
        options = [
            "-filter_complex", pad_filter,
            "-map", "[v]", "-map", "1:a:0",
            "-c:v", "libx264", "-preset", "fast", "-c:a", "aac",
            "-t", str(target_duration),
        ]
    else:
        options = [
            "-c:v", "copy", "-c:a", "aac",
            "-map", "0:v:0", "-map", "1:a:0", "-shortest",
        ]

    completed = subprocess.run(inputs + options + [output_path], capture_output=True, text=True)
    if completed.returncode == 0:
        return output_path
    raise RuntimeError(f"ffmpeg mux failed: {completed.stderr[:200]}")
# =============================================================================
# TTS ENGINE: YourVoic API
# =============================================================================
# API key is read from the environment once, at import; empty string when unset.
YOURVOIC_API_KEY = os.getenv("YOURVOIC_API_KEY", "")
# Endpoint that synthesize_yourvoic() posts TTS requests to.
YOURVOIC_STREAM_URL = "https://yourvoic.com/api/v1/tts/stream"
def _guess_audio_extension(audio_data, content_type):
    """Pick a file extension for YourVoic audio: magic bytes first, then content-type."""
    head = audio_data[:4]
    if b"RIFF" in head:
        return ".wav"
    if b"\xff\xfb" in audio_data[:3] or b"\xff\xf3" in audio_data[:3] or b"ID3" in audio_data[:3]:
        return ".mp3"
    if b"OggS" in head:
        return ".ogg"
    if b"fLaC" in head:
        return ".flac"
    if "mp3" in content_type or "mpeg" in content_type:
        return ".mp3"
    if "ogg" in content_type:
        return ".ogg"
    if "wav" in content_type:
        return ".wav"
    if "flac" in content_type:
        return ".flac"
    if "linear16" in content_type or "pcm" in content_type or "l16" in content_type:
        return ".raw"
    # Most common API default
    logger.warning(f"Unknown YourVoic format (ct={content_type}), guessing mp3")
    return ".mp3"


def synthesize_yourvoic(text, language_code, voice="Peter", speed=1.0):
    """Synthesize text using the YourVoic API and decode the returned audio.

    Decoding is attempted in order: soundfile on the downloaded file; for raw
    PCM, a 24 kHz mono 16-bit WAV header is prepended and soundfile is tried
    again; finally ffmpeg converts the file to 24 kHz mono WAV.

    Args:
        text: Text to speak.
        language_code: YourVoic language code (e.g. ``"sw-KE"``).
        voice: YourVoic voice name.
        speed: Speed value forwarded to the API.

    Returns:
        ``(audio_array, sample_rate)`` with ``audio_array`` read as float32.

    Raises:
        RuntimeError: If the API key is missing, the API does not answer with
            HTTP 200, or none of the decoding strategies succeeds.
    """
    if not YOURVOIC_API_KEY:
        raise RuntimeError("YOURVOIC_API_KEY not set.")

    headers = {"X-API-Key": YOURVOIC_API_KEY, "Content-Type": "application/json"}
    payload = {"text": text, "voice": voice, "language": language_code, "model": "aura-prime", "speed": speed}

    t0 = time.time()
    response = requests.post(YOURVOIC_STREAM_URL, headers=headers, json=payload, stream=True, timeout=60)
    try:
        if response.status_code != 200:
            raise RuntimeError(f"YourVoic error {response.status_code}: {response.text[:200]}")
        # Detect format from content-type header
        ct = response.headers.get("content-type", "").lower()
        logger.info(f"YourVoic content-type: {ct}")
        # Join once instead of growing a bytes object chunk by chunk.
        audio_data = b"".join(response.iter_content(chunk_size=8192))
    finally:
        # A streamed response holds its connection until it is closed.
        response.close()

    elapsed = time.time() - t0
    logger.info(f"YourVoic TTS: {len(text)} chars, {elapsed:.2f}s, {len(audio_data)} bytes")
    # Log first bytes for format detection
    logger.info(f"YourVoic first bytes: {audio_data[:8]}")

    ext = _guess_audio_extension(audio_data, ct)

    # Save with the detected extension; the handle is closed before decoding.
    with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp_file:
        tmp_file.write(audio_data)
        tmp_path = tmp_file.name
    wav_path = tmp_path + ".wav"

    try:
        # Try reading directly with soundfile
        try:
            return sf.read(tmp_path, dtype="float32")
        except Exception as e:
            logger.warning(f"soundfile can't read {ext}: {e}")

        # Handle raw PCM (linear16): wrap in WAV header
        if ext == ".raw":
            try:
                sr = 24000
                wav_header = (
                    b"RIFF" + struct.pack("<I", 36 + len(audio_data)) + b"WAVE"
                    + b"fmt " + struct.pack("<IHHIIHH", 16, 1, 1, sr, sr * 2, 2, 16)
                    + b"data" + struct.pack("<I", len(audio_data))
                )
                with open(wav_path, "wb") as f:
                    f.write(wav_header)
                    f.write(audio_data)
                return sf.read(wav_path, dtype="float32")
            except Exception as e:
                logger.warning(f"Raw PCM wrap failed: {e}")

        # Fallback: convert with ffmpeg
        try:
            result = subprocess.run(
                ["ffmpeg", "-y", "-i", tmp_path, "-acodec", "pcm_s16le", "-ar", "24000", "-ac", "1", wav_path],
                capture_output=True, text=True,
            )
            if result.returncode != 0:
                raise RuntimeError(f"ffmpeg failed: {result.stderr[-300:]}")
            return sf.read(wav_path, dtype="float32")
        except Exception as e2:
            raise RuntimeError(f"YourVoic decode failed: {e2}") from e2
    finally:
        # Single cleanup point for every success and failure path.
        for leftover in (tmp_path, wav_path):
            if os.path.exists(leftover):
                os.unlink(leftover)
def synthesize_yourvoic_to_file(text, language_code, output_path, voice="Peter", speed=1.0):
    """Synthesize ``text`` with YourVoic and write the audio to ``output_path``.

    Returns:
        ``(output_path, sample_rate)``.
    """
    waveform, sample_rate = synthesize_yourvoic(text, language_code, voice, speed)
    sf.write(output_path, waveform, sample_rate)
    return output_path, sample_rate
def synthesize_chunked(text, language_config, sentences_per_chunk=2):
    """Synthesize long text through YourVoic in groups of sentences.

    Each successfully synthesized, non-empty segment is followed by 0.15 s of
    silence. Chunks whose synthesis raises are logged and skipped.

    Args:
        text: Full text to synthesize.
        language_config: Dict from LANGUAGES (reads ``yourvoic_lang`` and the
            first entry of ``yourvoic_voices``, defaulting to "Peter").
        sentences_per_chunk: How many sentences to send per API call.

    Returns:
        ``(audio_array, sample_rate)``. When the text has no sentences, or
        every chunk fails, 0.5 s of float32 silence is returned instead.
    """
    sentences = [part.strip() for part in re.split(r'(?<=[.!?])\s+', text) if part.strip()]
    if not sentences:
        return np.zeros(int(0.5 * 16000), dtype=np.float32), 16000

    audio_segments = []
    output_sr = None  # sample rate of the first chunk that synthesized
    for start in range(0, len(sentences), sentences_per_chunk):
        chunk_text = ' '.join(sentences[start:start + sentences_per_chunk])
        if not chunk_text:
            continue
        try:
            configured_voices = language_config.get("yourvoic_voices")
            voice = language_config["yourvoic_voices"][0] if configured_voices else "Peter"
            lang_code = language_config["yourvoic_lang"]
            segment, segment_sr = synthesize_yourvoic(chunk_text, lang_code, voice)
            if output_sr is None:
                output_sr = segment_sr
            if len(segment) > 0:
                audio_segments.append(segment)
                audio_segments.append(np.zeros(int(0.15 * segment_sr), dtype=np.float32))
        except Exception as e:
            logger.error(f"TTS chunk failed: {e}")
            continue

    if audio_segments:
        return np.concatenate(audio_segments), output_sr

    fallback_sr = output_sr or 16000
    logger.warning("All TTS chunks failed — returning silence")
    return np.zeros(int(0.5 * fallback_sr), dtype=np.float32), fallback_sr
# =============================================================================
# QWEN OMNI ENGINE
# =============================================================================
# Model identifier sent as `model` in the Qwen chat-completion request.
QWEN_MODEL = "qwen3.5-omni-plus"
# Base URL passed to the OpenAI-compatible client created by _get_client().
QWEN_BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
def _get_client():
    """Return an OpenAI-compatible client pointed at the Qwen Dashscope endpoint.

    Raises:
        RuntimeError: If ``DASHSCOPE_API_KEY`` is missing or empty.
    """
    from openai import OpenAI

    api_key = os.environ.get("DASHSCOPE_API_KEY", "")
    if api_key:
        return OpenAI(api_key=api_key, base_url=QWEN_BASE_URL)
    raise RuntimeError(
        "DASHSCOPE_API_KEY not set. Add it as a Space secret."
    )
def _wav_to_base64(wav_path):
    """Return the whole file at ``wav_path`` as a base64 text string."""
    with open(wav_path, "rb") as wav_file:
        raw_bytes = wav_file.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
def _base64_to_wav(b64_data, output_path):
    """Decode base64 raw PCM and save it as a WAV file (24kHz, mono, 16-bit).

    The decoded bytes are written unchanged after a 44-byte RIFF/WAVE header.
    """
    pcm_bytes = base64.b64decode(b64_data)

    sample_rate, num_channels, bits_per_sample = 24000, 1, 16
    bytes_per_frame = num_channels * bits_per_sample // 8

    # Little-endian header fields, in file order.
    header = struct.pack(
        "<4sI4s4sIHHIIHH4sI",
        b"RIFF", 36 + len(pcm_bytes), b"WAVE",
        b"fmt ", 16,                      # size of the fmt chunk
        1,                                # audio format 1 = PCM
        num_channels,
        sample_rate,
        sample_rate * bytes_per_frame,    # byte rate
        bytes_per_frame,                  # block align
        bits_per_sample,
        b"data", len(pcm_bytes),
    )

    with open(output_path, "wb") as wav_file:
        wav_file.write(header)
        wav_file.write(pcm_bytes)
def _extract_audio_chunk(video_path, output_wav, start_sec, duration_sec):
    """Cut ``duration_sec`` seconds of audio starting at ``start_sec`` into a 16kHz mono WAV.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    seek_args = ["-ss", str(start_sec), "-t", str(duration_sec)]
    encode_args = ["-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1"]
    ffmpeg_args = ["ffmpeg", "-y", *seek_args, "-i", video_path, *encode_args, output_wav]
    subprocess.run(ffmpeg_args, capture_output=True, check=True)
def _get_duration(filepath):
    """Get media file duration in seconds, as reported by ffprobe.

    Mirrors ``get_media_duration``: ffprobe's exit status is checked so that a
    probe failure is reported as such instead of as ``float('')`` failing.

    Raises:
        RuntimeError: If ffprobe fails or prints something that is not a number.
    """
    result = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1", filepath],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe failed: {result.stderr[:200]}")

    reported = result.stdout.strip()
    try:
        return float(reported)
    except ValueError as err:
        raise RuntimeError(
            f"ffprobe returned no usable duration for {filepath!r}: {reported!r}"
        ) from err
def _concatenate_wavs(wav_files, output_path):
    """Concatenate WAV files into ``output_path`` using ffmpeg's concat demuxer.

    A single input is simply copied. For several inputs a temporary list file
    (``output_path + ".txt"``) is written and removed again, even when ffmpeg
    fails.

    Args:
        wav_files: Paths of the WAV files, in playback order.
        output_path: Destination WAV path.

    Raises:
        ValueError: If ``wav_files`` is empty.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    if not wav_files:
        raise ValueError("No WAV files to concatenate")
    if len(wav_files) == 1:
        shutil.copy2(wav_files[0], output_path)
        return

    list_file = output_path + ".txt"
    try:
        with open(list_file, "w", encoding="utf-8") as f:
            for wav in wav_files:
                # Inside a single-quoted concat entry a literal quote is written as '\''.
                escaped = str(wav).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")
        subprocess.run(
            ["ffmpeg", "-y", "-f", "concat", "-safe", "0",
             "-i", list_file, "-c", "copy", output_path],
            capture_output=True, check=True,
        )
    finally:
        if os.path.exists(list_file):
            os.remove(list_file)
def _build_system_prompt(language_name):
    """Return the Qwen system prompt for dubbing English audio into ``language_name``."""
    prompt_lines = [
        "You are a professional video dubbing translator. You will receive audio in English.",
        "Your task:",
        "1. Listen carefully to the English speech.",
        f"2. Translate it into natural, fluent {language_name}.",
        f"3. Respond ONLY with the {language_name} translation spoken aloud — no English, no commentary,",
        f" no meta-text, no transliteration. Speak entirely in {language_name}.",
        "4. Match the tone, emotion, and pacing of the original speaker as closely as possible.",
        "5. If there are pauses or silence in the original audio, maintain similar pacing.",
        f"6. Translate idioms and cultural references into their {language_name} equivalents.",
        "7. Use clear, professional pronunciation suitable for a broad audience.",
    ]
    return "\n".join(prompt_lines)
def translate_chunk_qwen(wav_path, voice, language_name, chunk_index=0):
    """
    Translate a single audio chunk using Qwen Omni.

    The chunk is sent base64-encoded in a streaming chat completion that asks
    for both text and audio. Streamed text deltas are joined into the
    transcript; streamed audio deltas are joined, decoded and saved as a WAV
    file next to the input.

    Args:
        wav_path: Path to input WAV file (English audio)
        voice: Qwen voice name (e.g. "Ethan", "Cherry")
        language_name: Full language name for the system prompt
        chunk_index: Used in the output file name and for logging

    Returns:
        (output_wav_path, transcript) or (None, transcript) if no audio
    """
    client = _get_client()
    audio_b64 = _wav_to_base64(wav_path)

    # Derive the output name from the extension only. str.replace(".wav", ...)
    # would rewrite every ".wav" in the path and, for a path without ".wav",
    # leave the output path equal to the input path.
    stem, _ = os.path.splitext(wav_path)
    output_wav = f"{stem}_qwen_{chunk_index}.wav"

    system_prompt = _build_system_prompt(language_name)
    user_prompt = f"Translate this English speech into {language_name}. Respond only with the spoken {language_name} translation."

    t0 = time.time()
    completion = client.chat.completions.create(
        model=QWEN_MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {
                "role": "user",
                "content": [
                    {
                        "type": "input_audio",
                        "input_audio": {
                            "data": f"data:audio/wav;base64,{audio_b64}",
                            "format": "wav",
                        },
                    },
                    {"type": "text", "text": user_prompt},
                ],
            },
        ],
        modalities=["text", "audio"],
        audio={"voice": voice, "format": "wav"},
        stream=True,
        stream_options={"include_usage": True},
    )

    audio_chunks = []
    transcript_parts = []
    for event in completion:
        # Events without choices are skipped.
        if not event.choices:
            continue
        delta = event.choices[0].delta
        if hasattr(delta, "content") and delta.content:
            transcript_parts.append(delta.content)
        if hasattr(delta, "audio") and delta.audio:
            # The audio delta is handled both as a dict and as an object with .data.
            if isinstance(delta.audio, dict):
                if "data" in delta.audio:
                    audio_chunks.append(delta.audio["data"])
            elif hasattr(delta.audio, "data") and delta.audio.data:
                audio_chunks.append(delta.audio.data)

    transcript = "".join(transcript_parts)
    elapsed = time.time() - t0
    logger.info(f"Qwen chunk {chunk_index}: {elapsed:.1f}s, transcript={transcript[:60]}")

    if audio_chunks:
        _base64_to_wav("".join(audio_chunks), output_wav)
        return output_wav, transcript
    return None, transcript
def dub_video_qwen(video_path, language_name, voice="Ethan", chunk_seconds=120, progress_fn=None):
    """
    Full video dubbing pipeline using Qwen Omni.

    Splits the video's audio into chunks, translates each chunk via the Qwen
    API, concatenates the results, and muxes them back onto the video. A chunk
    for which Qwen returns no audio is replaced by silence of the same length.

    Args:
        video_path: Path to input video
        language_name: Full language name (e.g. "French", "Arabic")
        voice: Qwen voice name
        chunk_seconds: Audio chunk duration for API calls
        progress_fn: Optional gradio progress callback

    Returns:
        (output_video_path, log_text) on success, or (None, message) when the
        video is longer than one hour or any step raises. On success the
        working directory is kept because the output video lives inside it;
        on every failure path it is removed.
    """
    def report(fraction, desc):
        # Forward progress only when a callback was supplied.
        if progress_fn:
            progress_fn(fraction, desc=desc)

    tmp_dir = tempfile.mkdtemp(prefix="qwen_dub_")
    log = []
    try:
        # Duration
        report(0.05, "Analyzing video...")
        total_duration = _get_duration(video_path)
        log.append(f"**Video:** {total_duration:.1f}s")
        log.append("**Engine:** Qwen 3.5 Omni")
        log.append(f"**Voice:** {voice}")
        log.append(f"**Language:** {language_name}")
        if total_duration > 3600:
            # Nothing is produced on this path, so do not leave the directory behind.
            shutil.rmtree(tmp_dir, ignore_errors=True)
            return None, "Video longer than 1 hour — please use a shorter clip."

        # Split into chunks
        report(0.1, "Extracting audio chunks...")
        num_chunks = max(1, int(total_duration // chunk_seconds) + (1 if total_duration % chunk_seconds > 0 else 0))
        log.append(f"**Chunks:** {num_chunks} ({chunk_seconds}s each)")
        input_chunks = []
        for i in range(num_chunks):
            start = i * chunk_seconds
            duration = min(chunk_seconds, total_duration - start)
            chunk_path = os.path.join(tmp_dir, f"chunk_{i:03d}.wav")
            _extract_audio_chunk(video_path, chunk_path, start, duration)
            input_chunks.append(chunk_path)

        # Translate each chunk
        output_chunks = []
        all_transcripts = []
        for i, chunk_path in enumerate(input_chunks):
            report(0.15 + 0.7 * (i / num_chunks), f"Translating chunk {i+1}/{num_chunks}...")
            result_path, transcript = translate_chunk_qwen(
                chunk_path, voice, language_name, i
            )
            if transcript:
                all_transcripts.append(f"**[{i+1}]** {transcript}")
            if result_path:
                output_chunks.append(result_path)
            else:
                # Silence fallback, generated at 24 kHz mono.
                duration = _get_duration(chunk_path)
                silence_path = os.path.join(tmp_dir, f"silence_{i:03d}.wav")
                subprocess.run(
                    ["ffmpeg", "-y", "-f", "lavfi",
                     "-i", "anullsrc=r=24000:cl=mono",
                     "-t", str(duration), "-acodec", "pcm_s16le", silence_path],
                    capture_output=True, check=True,
                )
                output_chunks.append(silence_path)

        # Concatenate
        report(0.88, "Assembling audio...")
        full_audio = os.path.join(tmp_dir, "full_dubbed.wav")
        _concatenate_wavs(output_chunks, full_audio)

        # Mux onto video
        report(0.93, "Combining audio and video...")
        output_video = os.path.join(tmp_dir, "dubbed_output.mp4")
        subprocess.run(
            ["ffmpeg", "-y", "-i", video_path, "-i", full_audio,
             "-c:v", "copy", "-map", "0:v:0", "-map", "1:a:0",
             "-shortest", output_video],
            capture_output=True, check=True,
        )
        report(1.0, "Done!")
        log.append("\n**Transcript:**")
        log.extend(all_transcripts)
        return output_video, "\n".join(log)
    except Exception as e:
        logger.exception("Qwen dubbing failed")
        shutil.rmtree(tmp_dir, ignore_errors=True)
        return None, f"Error: {str(e)}"
# =============================================================================
# GRADIO APP
# =============================================================================
# Load models at startup: this module-level call runs on import and assigns
# asr_pipe, mt_tokenizer and mt_model, which the functions below read.
load_models()
# =============================================================================
# Helper functions
# =============================================================================
def get_voices_for_language(lang_name):
    """Return the voice names to offer for a language, based on its TTS engine.

    Qwen languages get the shared ``QWEN_VOICES`` list, YourVoic languages get
    their configured voices, and everything else (unknown language, "local"
    engine, or no configured voices) gets ``["Peter"]``.
    """
    config = LANGUAGES.get(lang_name, {})
    engine = config.get("tts_engine", "local")

    if engine == "qwen":
        return QWEN_VOICES
    if engine == "yourvoic":
        configured = config.get("yourvoic_voices")
        if configured:
            return config["yourvoic_voices"]
    return ["Peter"]
def full_pipeline_audio(audio_input, target_language):
    """Full pipeline: English audio → target language audio.

    Runs ASR, sentence-wise MT and chunked YourVoic TTS, timing each stage.

    Args:
        audio_input: ``(sample_rate, samples)`` tuple, or None.
        target_language: Display name; must be a key of LANGUAGES.

    Returns:
        ``((sample_rate, audio), log_text)`` on success, otherwise
        ``(None, message)`` describing what was missing or empty.
    """
    if audio_input is None:
        return None, "Please upload or record audio."

    sample_rate, audio_array = audio_input
    audio_array = np.asarray(audio_array).astype(np.float32)
    # An empty recording has no peak to normalise by; .max() on a zero-size
    # array raises, so reject it before going any further.
    if audio_array.size == 0:
        return None, "Please upload or record audio."

    lang_config = LANGUAGES.get(target_language)
    if not lang_config:
        return None, f"Language '{target_language}' not configured."

    # Down-mix multi-channel input by averaging across axis 1.
    if audio_array.ndim > 1:
        audio_array = audio_array.mean(axis=1)
    # Peak-normalise only when samples fall outside [-1, 1].
    peak = np.abs(audio_array).max()
    if peak > 1.0:
        audio_array = audio_array / peak

    log = []
    total_start = time.time()

    # ASR
    t0 = time.time()
    english = transcribe(audio_array, sample_rate)
    log.append(f"**ASR** ({time.time()-t0:.2f}s)\n{english}")
    if not english:
        return None, "ASR returned empty text."

    # MT
    t0 = time.time()
    nllb_code = lang_config["nllb"]
    translated, en_sents, tgt_sents = translate_text(english, nllb_code, fast=False)
    log.append(f"\n**Translation** ({time.time()-t0:.2f}s)")
    for e, t in zip(en_sents, tgt_sents):
        log.append(f" EN: {e}\n {target_language.upper()}: {t}")
    if not translated:
        return None, "Translation returned empty."

    # TTS
    t0 = time.time()
    audio_out, sr_out = synthesize_chunked(
        translated, lang_config
    )
    log.append(f"\n**TTS** ({time.time()-t0:.2f}s) = {len(audio_out)/sr_out:.1f}s audio")

    total = time.time() - total_start
    log.append(f"\n**Total: {total:.2f}s**")
    return (sr_out, audio_out), "\n".join(log)
def full_pipeline_text(english_text, target_language, voice_name):
    """Translate English text and synthesize it in the target language.

    Args:
        english_text: Source text; blank or ``None`` input is rejected.
        target_language: Display name used as the key into ``LANGUAGES``.
        voice_name: Accepted from the UI but not forwarded to synthesis by
            this function. NOTE(review): confirm whether
            ``synthesize_chunked`` should receive it.

    Returns:
        ``((sample_rate, samples), log_markdown)`` on success, otherwise
        ``(None, message)`` explaining what was missing.
    """
    source_text = english_text.strip() if english_text else ""
    if not source_text:
        return None, "Please enter English text."

    lang_entry = LANGUAGES.get(target_language)
    if not lang_entry:
        return None, f"Language '{target_language}' not configured."

    report = []
    pipeline_started = time.time()

    # MT
    mt_started = time.time()
    translated, en_sents, tgt_sents = translate_text(
        source_text, lang_entry["nllb"], fast=False
    )
    mt_elapsed = time.time() - mt_started
    report.append(f"**Translation** ({mt_elapsed:.2f}s)")
    report.extend(
        f" EN: {src}\n {target_language.upper()}: {dst}"
        for src, dst in zip(en_sents, tgt_sents)
    )
    if not translated:
        return None, "Translation returned empty."

    # TTS
    tts_started = time.time()
    audio_out, sr_out = synthesize_chunked(translated, lang_entry)
    tts_elapsed = time.time() - tts_started
    audio_seconds = len(audio_out) / sr_out
    report.append(f"\n**TTS** ({tts_elapsed:.2f}s) = {audio_seconds:.1f}s audio")

    pipeline_elapsed = time.time() - pipeline_started
    report.append(f"\n**Total: {pipeline_elapsed:.2f}s**")
    return (sr_out, audio_out), "\n".join(report)
# Dubbed audio up to this many times the video length is time-stretched to
# fit; anything longer extends the video instead.
_MAX_AUDIO_STRETCH = 1.2
# Length mismatches at or below this fraction are left as-is (no stretch).
_STRETCH_TOLERANCE = 0.02


def _dub_video_local(video_path, lang_name, lang_config, results_log, progress):
    """Dub one language with the cascaded ASR -> NLLB -> TTS pipeline.

    Appends per-stage lines to ``results_log`` and reports progress through
    ``progress``. Returns the path of the muxed output video, or ``None``
    when a stage produced nothing usable (the reason is logged). The
    temporary work directory is removed whenever no output is produced.
    """
    # Display names can contain spaces or parentheses (e.g. a name like
    # "Chinese (Mandarin)"); keep only path-friendly characters.
    safe_name = re.sub(r"[^\w-]+", "_", lang_name).strip("_") or "lang"
    work_dir = tempfile.mkdtemp(prefix=f"dub_{safe_name}_")
    extracted_audio = os.path.join(work_dir, "audio.wav")
    tgt_audio_raw = os.path.join(work_dir, "tgt_raw.wav")
    tgt_audio_aligned = os.path.join(work_dir, "tgt_aligned.wav")
    output_video = os.path.join(work_dir, f"dubbed_{safe_name}.mp4")

    succeeded = False
    try:
        progress(0.05, desc=f"{lang_name}: extracting audio...")
        extract_audio_from_video(video_path, extracted_audio)
        video_duration = get_media_duration(video_path)
        # The duration is the divisor of the stretch ratio below.
        if not video_duration or video_duration <= 0:
            results_log.append("Could not determine video duration — skipped")
            return None
        results_log.append(f"Video: {video_duration:.1f}s")

        audio_array, sr = sf.read(extracted_audio, dtype="float32")
        if audio_array.ndim > 1:
            audio_array = audio_array.mean(axis=1)

        progress(0.15, desc=f"{lang_name}: transcribing...")
        t0 = time.time()
        english = transcribe(audio_array, sr)
        results_log.append(f"ASR: {time.time()-t0:.1f}s")
        if not english:
            results_log.append("ASR empty — skipped")
            return None

        progress(0.4, desc=f"{lang_name}: translating...")
        t0 = time.time()
        translated, _, _ = translate_text(english, lang_config["nllb"], fast=True)
        results_log.append(f"MT: {time.time()-t0:.1f}s")
        if not translated:
            results_log.append("Translation empty — skipped")
            return None

        progress(0.65, desc=f"{lang_name}: synthesizing...")
        t0 = time.time()
        tgt_audio, tgt_sr = synthesize_chunked(translated, lang_config)
        if tgt_audio is None or len(tgt_audio) == 0:
            results_log.append("TTS empty — skipped")
            return None
        sf.write(tgt_audio_raw, tgt_audio, tgt_sr)
        tgt_duration = len(tgt_audio) / tgt_sr
        results_log.append(f"TTS: {time.time()-t0:.1f}s ({tgt_duration:.1f}s audio)")

        progress(0.85, desc=f"{lang_name}: aligning...")
        stretch_ratio = tgt_duration / video_duration
        extend_video = stretch_ratio > _MAX_AUDIO_STRETCH
        if extend_video:
            # Too long to stretch: keep the audio and lengthen the video.
            shutil.copy(tgt_audio_raw, tgt_audio_aligned)
            final_duration = tgt_duration
            results_log.append(f"Audio longer ({stretch_ratio:.1f}x) — extending video")
        else:
            if abs(stretch_ratio - 1.0) > _STRETCH_TOLERANCE:
                stretch_audio_to_duration(tgt_audio_raw, tgt_audio_aligned, video_duration)
            else:
                shutil.copy(tgt_audio_raw, tgt_audio_aligned)
            final_duration = video_duration

        progress(0.95, desc=f"{lang_name}: combining...")
        mux_video_audio(
            video_path, tgt_audio_aligned, output_video,
            extend_video=extend_video, target_duration=final_duration,
        )
        succeeded = True
        return output_video
    finally:
        if not succeeded:
            # Nothing usable was produced; do not leave the temp dir behind.
            shutil.rmtree(work_dir, ignore_errors=True)


def dub_video(video_path, target_languages, dub_voice, chunk_seconds, progress=gr.Progress()):
    """
    Dub a video into one or more target languages.

    Languages whose ``tts_engine`` is "qwen" are routed to ``dub_video_qwen``;
    every other engine goes through the cascaded ASR -> NLLB -> TTS pipeline.
    A failure in one language is logged and does not stop the others.

    Args:
        video_path: Path of the uploaded video, or ``None``.
        target_languages: Display names to dub into (keys of ``LANGUAGES``).
        dub_voice: Requested Qwen voice; falls back to "Ethan" when it is
            not one of ``QWEN_VOICES``.
        chunk_seconds: Chunk duration forwarded to ``dub_video_qwen``.
        progress: Gradio progress callback.

    Returns:
        ``(video_path_or_None, log_markdown)``. Only the first successfully
        dubbed video is returned; paths of any further outputs are listed at
        the end of the log.
    """
    if video_path is None:
        return None, "Please upload a video."
    if not target_languages:
        return None, "Please select at least one target language."

    results_log = []
    output_videos = []
    for lang_name in target_languages:
        lang_config = LANGUAGES.get(lang_name)
        if not lang_config:
            results_log.append(f"**{lang_name}**: not configured, skipped")
            continue
        engine = lang_config.get("tts_engine", "local")
        results_log.append(f"\n{'='*50}")
        results_log.append(f"**Dubbing: {lang_name}** (engine: {engine})")
        results_log.append(f"{'='*50}")
        try:
            if engine == "qwen":
                # Qwen Omni: end-to-end speech-to-speech.
                qwen_lang_name = lang_config.get("qwen_name") or lang_name
                voice = dub_voice if dub_voice in QWEN_VOICES else "Ethan"
                out_video, log_text = dub_video_qwen(
                    video_path, qwen_lang_name, voice=voice,
                    chunk_seconds=chunk_seconds, progress_fn=progress,
                )
                results_log.append(log_text)
            else:
                out_video = _dub_video_local(
                    video_path, lang_name, lang_config, results_log, progress
                )
            if out_video:
                output_videos.append(out_video)
        except Exception as e:
            # Per-language boundary: record the failure and keep going.
            logger.exception("Dubbing %s failed", lang_name)
            results_log.append(f"Error: {str(e)}")

    progress(1.0, desc="Done!")
    final_video = output_videos[0] if output_videos else None
    if len(output_videos) > 1:
        results_log.append("\nAdditional dubbed videos (only the first is returned):")
        results_log.extend(f"- {path}" for path in output_videos[1:])
    return final_video, "\n".join(results_log)
def update_voices(language):
    """Refresh a voice dropdown after the target language changes.

    Returns a ``gr.update`` whose choices are the voices available for
    ``language`` and whose selected value is the first of them.
    """
    available = get_voices_for_language(language)
    first_voice = available[0]
    return gr.update(choices=available, value=first_voice)
# =============================================================================
# Gradio UI
# =============================================================================
# Sample English football-commentary lines, offered as one-click examples
# for the text input of the text-to-audio tab.
EXAMPLES = [
"And it's a brilliant goal from the striker!",
"The referee has shown a yellow card. Corner kick for the home team.",
"What a save by the goalkeeper! The match is heading into injury time.",
"He dribbles past two defenders and shoots! The ball hits the back of the net!",
]
# Custom stylesheet handed to gr.Blocks(css=...): rules for the page header
# (.main-header) and for a .lang-group-label class.
CSS = """
.main-header { text-align: center; margin-bottom: 0.5rem; }
.main-header h1 { font-size: 1.8rem; font-weight: 700; margin: 0; }
.main-header p { color: #666; font-size: 0.95rem; }
.lang-group-label { font-weight: 600; font-size: 0.85rem; color: #888; text-transform: uppercase; letter-spacing: 0.05em; margin-top: 0.5rem; }
"""
# Build the Gradio app; the Blocks object is bound to `demo`, which is
# launched at the bottom of the file.
with gr.Blocks(
title="PlotWeaver — Live Commentary Translation",
theme=gr.themes.Soft(),
css=CSS,
) as demo:
# Static page header; uses the .main-header class defined in CSS.
gr.HTML("""
<div class="main-header">
<h1>PlotWeaver</h1>
<p>Live commentary translation platform &mdash; English to 40+ languages</p>
<p style="font-size:0.8rem; color:#999">Qwen Omni (11 languages) + YourVoic API + NLLB-200 (27 languages)</p>
</div>
""")
with gr.Tabs():
# ====== TAB 1: EVENT MANAGEMENT ======
with gr.TabItem("Event Management"):
gr.Markdown("### Create new event")
gr.Markdown("Configure your live broadcast event with target languages and input source.")
with gr.Row():
with gr.Column(scale=2):
# Free-text event details; times and date are plain textboxes.
event_name = gr.Textbox(
label="Event name",
placeholder="e.g. Premier League: Arsenal vs. Chelsea",
)
with gr.Row():
start_time = gr.Textbox(label="Start time", placeholder="08:30 PM")
end_time = gr.Textbox(label="End time", placeholder="10:30 PM")
event_date = gr.Textbox(label="Date", placeholder="2026-06-06")
gr.Markdown("#### Input source")
input_method = gr.Radio(
choices=["RTMP Stream", "WebRTC (Browser)", "Direct Audio Feed"],
value="RTMP Stream",
label="Input method",
)
gr.Markdown("#### Target languages")
gr.Markdown("Select languages for simultaneous broadcast. Additional languages consume more stream minutes.")
# Language checkboxes: a single CheckboxGroup over ALL_LANGUAGE_NAMES,
# with "Spanish" pre-selected.
target_langs = gr.CheckboxGroup(
choices=ALL_LANGUAGE_NAMES,
label="Languages",
value=["Spanish"],
)
with gr.Column(scale=1):
gr.Markdown("#### Estimate summary")
# Initial placeholder summary; replaced by update_estimate() output
# once any of the wired form fields changes.
estimate_display = gr.Markdown(
value="**Event:** Not configured\n\n**Languages:** 1 selected\n\n**Estimated duration:** --\n\n**Total estimate:** --"
)
create_event_btn = gr.Button("Create Event", variant="primary", size="lg")
# Shows the status message returned by create_event().
event_status = gr.Markdown("")
def update_estimate(name, langs, start, end):
    """Build the Markdown estimate summary for the event form.

    Args:
        name: Event name; empty or ``None`` is shown as "Not set".
        langs: Selected language names; empty or ``None`` is shown as "None".
        start: Start time text, 12-hour ("08:30 PM") or 24-hour ("20:30").
        end: End time text in either of the same formats.

    Returns:
        Markdown with the event name, language count and list, the
        estimated duration ("--" when either time is missing or cannot be
        parsed), and the fixed input/rate lines.
    """
    from datetime import datetime, timedelta

    def _parse_clock(text):
        """Parse a clock time; return ``None`` when it matches no format."""
        cleaned = (text or "").strip().upper()
        for fmt in ("%I:%M %p", "%H:%M"):
            try:
                return datetime.strptime(cleaned, fmt)
            except ValueError:
                continue
        return None

    n_langs = len(langs) if langs else 0
    lang_list = ", ".join(langs) if langs else "None"

    start_at = _parse_clock(start)
    end_at = _parse_clock(end)
    if start_at is None or end_at is None:
        duration_text = "--"
    else:
        # An end time earlier than the start means the event crosses midnight.
        if end_at < start_at:
            end_at += timedelta(days=1)
        minutes = int((end_at - start_at).total_seconds() // 60)
        duration_text = f"{minutes // 60}h {minutes % 60:02d}m"

    return (
        f"**Event:** {name or 'Not set'}\n\n"
        f"**Languages:** {n_langs} selected\n\n"
        f"{lang_list}\n\n"
        f"**Estimated duration:** {duration_text}\n\n"
        f"**Input:** Configured\n\n"
        f"**Rate:** 1x (Standard)"
    )
# Recompute the estimate summary whenever any of these form fields changes;
# every handler receives all four field values.
for field in (event_name, target_langs, start_time, end_time):
    field.change(
        fn=update_estimate,
        inputs=[event_name, target_langs, start_time, end_time],
        outputs=[estimate_display],
    )
def create_event(name, langs):
    """Validate the event form and return a Markdown status message.

    Args:
        name: Event name; ``None``, empty, or whitespace-only is rejected.
        langs: Selected language names; at least one is required.

    Returns:
        A validation prompt when input is missing, otherwise a confirmation
        naming the event and listing its languages.
    """
    event_title = (name or "").strip()
    if not event_title:
        return "Please enter an event name."
    if not langs:
        return "Please select at least one language."
    noun = "language" if len(langs) == 1 else "languages"
    return f"Event **{event_title}** created with {len(langs)} {noun}: {', '.join(langs)}"
# "Create Event" validates the name and language selection via create_event()
# and writes the returned message into event_status.
create_event_btn.click(
fn=create_event,
inputs=[event_name, target_langs],
outputs=[event_status],
)
# ====== TAB 2: LIVE STUDIO ======
with gr.TabItem("Live Studio"):
gr.Markdown("### Live streaming translation")
gr.Markdown("Record or stream English commentary and hear it translated in real-time.")
with gr.Row():
studio_language = gr.Dropdown(
choices=ALL_LANGUAGE_NAMES,
value="Spanish",
label="Target language",
)
# Voice choices start out as the voices for the default language ("Spanish").
studio_voice = gr.Dropdown(
choices=get_voices_for_language("Spanish"),
value=get_voices_for_language("Spanish")[0],
label="Voice",
)
# Keep the voice dropdown in sync with the selected language.
studio_language.change(
fn=update_voices,
inputs=[studio_language],
outputs=[studio_voice],
)
with gr.Row():
with gr.Column():
studio_audio_in = gr.Audio(
label="English commentary (upload or record)",
type="numpy",
sources=["upload", "microphone"],
)
studio_translate_btn = gr.Button("Translate", variant="primary", size="lg")
with gr.Column():
studio_audio_out = gr.Audio(label="Translated audio", type="numpy", autoplay=True)
studio_log = gr.Markdown(label="Pipeline log")
# Run the audio pipeline on click; outputs are the translated audio and the log.
# NOTE(review): studio_voice is not among the inputs, so the selected voice
# is not passed to full_pipeline_audio — confirm whether that is intended.
studio_translate_btn.click(
fn=full_pipeline_audio,
inputs=[studio_audio_in, studio_language],
outputs=[studio_audio_out, studio_log],
)
# ====== TAB 3: VIDEO DUBBING ======
with gr.TabItem("Video Dubbing"):
gr.Markdown("### Video dubbing (English → multi-language)")
gr.Markdown(
"Upload a video with English commentary and get back a dubbed version. "
"**Global languages** (Arabic, French, Spanish, etc.) use Qwen Omni for best quality. "
"**African/regional languages** use YourVoic API with NLLB translation."
)
with gr.Row():
with gr.Column():
# Upload-only video input.
dub_video_in = gr.Video(label="Upload English video", sources=["upload"])
dub_languages = gr.CheckboxGroup(
choices=ALL_LANGUAGE_NAMES,
label="Target languages",
value=["Spanish"],
)
with gr.Row():
# Voice picker offering the QWEN_VOICES list, defaulting to "Ethan".
dub_voice = gr.Dropdown(
choices=QWEN_VOICES,
value="Ethan",
label="Voice (for Qwen languages)",
info="Applies to Arabic, French, Spanish, etc. Local languages use default voice.",
)
# Chunk length in seconds (30-300, step 10, default 120); passed to
# dub_video as its chunk_seconds argument.
dub_chunk_slider = gr.Slider(
minimum=30, maximum=300, value=120, step=10,
label="Chunk duration (seconds)",
info="Shorter = more API calls but less timeout risk.",
)
dub_btn = gr.Button("Dub Video", variant="primary", size="lg")
with gr.Column():
dub_video_out = gr.Video(label="Dubbed video (download from player)")
dub_log = gr.Markdown(
label="Processing log",
value="Upload a video and select languages to start."
)
# Run dub_video on click; it returns one video plus a Markdown log.
dub_btn.click(
fn=dub_video,
inputs=[dub_video_in, dub_languages, dub_voice, dub_chunk_slider],
outputs=[dub_video_out, dub_log],
)
# ====== TAB 4: TEXT TRANSLATION ======
with gr.TabItem("Text \u2192 Audio"):
gr.Markdown("### Text to translated speech")
gr.Markdown("Type English text, choose a language, and hear the translated audio.")
with gr.Row():
text_language = gr.Dropdown(
choices=ALL_LANGUAGE_NAMES,
value="Spanish",
label="Target language",
)
# Voice choices start out as the voices for the default language ("Spanish").
text_voice = gr.Dropdown(
choices=get_voices_for_language("Spanish"),
value=get_voices_for_language("Spanish")[0],
label="Voice",
)
# Keep the voice dropdown in sync with the selected language.
text_language.change(
fn=update_voices,
inputs=[text_language],
outputs=[text_voice],
)
with gr.Row():
with gr.Column():
text_input = gr.Textbox(
label="English text",
placeholder="Type English football commentary here...",
lines=4,
)
text_btn = gr.Button("Translate to speech", variant="primary", size="lg")
# One-click sample inputs; each EXAMPLES entry is wrapped in a list because
# there is a single input component.
gr.Examples(
examples=[[e] for e in EXAMPLES],
inputs=[text_input],
label="Example commentary",
)
with gr.Column():
text_audio_out = gr.Audio(label="Translated audio", type="numpy", autoplay=True)
text_log = gr.Markdown(label="Pipeline log")
# Run the text pipeline on click; text, language and voice are passed in order.
text_btn.click(
fn=full_pipeline_text,
inputs=[text_input, text_language, text_voice],
outputs=[text_audio_out, text_log],
)
# ====== TAB 5: RECORDINGS ======
# Placeholder tab: static text only, no interactive components.
with gr.TabItem("Recordings & Clips"):
gr.Markdown("### Recordings management")
gr.Markdown(
"Past dubbed recordings will appear here. "
"This feature is coming soon — for now, use Video Dubbing to create new recordings "
"and download them from the player."
)
# ====== TAB 6: VOICE MODELS ======
with gr.TabItem("Voice Models"):
gr.Markdown("### Voice model library")
gr.Markdown("Browse available voices for each language.")
voice_lang_select = gr.Dropdown(
choices=ALL_LANGUAGE_NAMES,
value="Spanish",
label="Select language",
)
# Filled with the Markdown produced by show_voice_info().
voice_info = gr.Markdown()
def show_voice_info(lang):
    """Return a Markdown description of the engine and voices for ``lang``.

    The text depends on the language's ``tts_engine``: "qwen" lists the first
    ten ``QWEN_VOICES`` plus a count of the remainder, "yourvoic" shows the
    YourVoic language code and configured voices, and anything else shows
    the NLLB code with a note that voice selection is not available.
    """
    entry = LANGUAGES.get(lang, {})
    tts_engine = entry.get("tts_engine", "unknown")
    yourvoic_voices = entry.get("yourvoic_voices", [])

    sections = [f"### {lang}\n\n"]
    if tts_engine == "qwen":
        voice_preview = ', '.join(QWEN_VOICES[:10])
        sections += [
            "**Engine:** Qwen 3.5 Omni (end-to-end speech-to-speech)\n\n",
            "This is the highest quality option. Qwen handles ASR + translation + TTS in a single API call, ",
            "preserving tone, emotion, and pacing from the original speaker.\n\n",
            f"**Available voices ({len(QWEN_VOICES)}):** {voice_preview}... and {len(QWEN_VOICES)-10} more\n\n",
            "All voices support all Qwen languages.",
        ]
    elif tts_engine == "yourvoic":
        if yourvoic_voices:
            voice_text = ', '.join(yourvoic_voices)
        else:
            voice_text = 'Peter (default)'
        sections += [
            "**Engine:** YourVoic API (TTS) + NLLB-200 (translation)\n\n",
            f"**YourVoic language:** `{entry.get('yourvoic_lang', 'N/A')}`\n\n",
            f"**Available voices:** {voice_text}",
        ]
    else:
        sections += [
            "**Engine:** Not available\n\n",
            f"**NLLB code:** `{entry.get('nllb', 'N/A')}`\n\n",
            "Uses locally fine-tuned models on GPU. Voice selection not available.",
        ]
    return "".join(sections)
# Re-render the voice details whenever a different language is selected.
voice_lang_select.change(fn=show_voice_info, inputs=[voice_lang_select], outputs=[voice_info])
# Also render once when the page loads, using the dropdown's current value.
demo.load(fn=show_voice_info, inputs=[voice_lang_select], outputs=[voice_info])
# Page footer with links to the model pages and services.
gr.Markdown("""
---
**PlotWeaver** by PlotweaverAI | Models:
[ASR](https://huggingface.co/PlotweaverAI/whisper-small-de-en) |
[MT](https://huggingface.co/PlotweaverAI/nllb-200-distilled-600M-african-6lang) |
[TTS](https://yourvoic.com) |
[Qwen Omni](https://www.alibabacloud.com/help/en/model-studio/qwen-omni)
""")
# Launch the app only when this file is executed as a script.
if __name__ == "__main__":
demo.launch()