Spaces:
Sleeping
Sleeping
| """ | |
| Qwen 3.5 Omni Engine — End-to-end speech-to-speech translation. | |
| Takes English audio in, returns translated audio + transcript out. | |
| No separate ASR/MT/TTS needed — Qwen handles everything in one call. | |
| """ | |
| import os | |
| import base64 | |
| import struct | |
| import subprocess | |
| import tempfile | |
| import time | |
| import shutil | |
| import logging | |
# Module-level logger; handlers and levels are left to the hosting application.
logger = logging.getLogger(__name__)

# Model identifier sent as ``model=`` on every chat-completion request.
QWEN_MODEL = "qwen3.5-omni-plus"

# Base URL handed to the OpenAI-compatible client.
QWEN_BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
def _get_client():
    """Create an OpenAI-compatible client for the Qwen Dashscope API.

    The API key is read from the ``DASHSCOPE_API_KEY`` environment variable
    and the client is pointed at ``QWEN_BASE_URL``.

    Returns:
        An ``openai.OpenAI`` client instance.

    Raises:
        RuntimeError: If ``DASHSCOPE_API_KEY`` is unset, empty, or contains
            only whitespace.
    """
    # Validate configuration first so a missing secret is reported as such,
    # even in an environment where importing ``openai`` would fail as well.
    # Surrounding whitespace (e.g. a pasted trailing newline) is dropped.
    api_key = os.environ.get("DASHSCOPE_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError(
            "DASHSCOPE_API_KEY not set. Add it as a Space secret."
        )

    # Imported lazily so this module can be imported without ``openai``.
    from openai import OpenAI

    return OpenAI(api_key=api_key, base_url=QWEN_BASE_URL)
def _wav_to_base64(wav_path):
    """Return the full contents of ``wav_path`` as a base64 text string."""
    with open(wav_path, mode="rb") as wav_file:
        raw_bytes = wav_file.read()
    encoded = base64.b64encode(raw_bytes)
    return str(encoded, "utf-8")
def _decode_base64_audio(b64_data):
    """Decode base64 data that may hold several padded segments back to back."""
    if isinstance(b64_data, str):
        data = b64_data.encode("ascii")
    else:
        data = bytes(b64_data)

    # ``base64.b64decode`` stops at the first completed ``=`` padding, so a
    # string built by joining individually padded chunks would be silently
    # truncated. Decode each padded segment on its own and join the bytes.
    decoded = []
    pos = 0
    total = len(data)
    while pos < total:
        pad = data.find(b"=", pos)
        if pad == -1:
            decoded.append(base64.b64decode(data[pos:]))
            break
        end = pad
        while end < total and data[end:end + 1] == b"=":
            end += 1
        decoded.append(base64.b64decode(data[pos:end]))
        pos = end
    return b"".join(decoded)


def _base64_to_wav(b64_data, output_path, sample_rate=24000, num_channels=1,
                   bits_per_sample=16):
    """Wrap base64-encoded raw PCM audio in a WAV container and write it.

    Args:
        b64_data: Base64 text (or bytes) holding headerless PCM samples. It
            may be one base64 stream or several padded streams concatenated.
        output_path: Destination path of the WAV file.
        sample_rate: Sample rate written to the header (default 24000).
        num_channels: Channel count written to the header (default 1).
        bits_per_sample: Sample width written to the header (default 16).

    Raises:
        binascii.Error: If the data is not valid base64.
    """
    audio_bytes = _decode_base64_audio(b64_data)
    block_align = num_channels * bits_per_sample // 8
    byte_rate = sample_rate * block_align
    data_size = len(audio_bytes)

    # Canonical 44-byte PCM header: RIFF chunk, 16-byte "fmt " chunk with
    # format tag 1 (uncompressed PCM), then the "data" chunk header.
    header = struct.pack(
        "<4sI4s4sIHHIIHH4sI",
        b"RIFF", 36 + data_size, b"WAVE",
        b"fmt ", 16, 1, num_channels, sample_rate,
        byte_rate, block_align, bits_per_sample,
        b"data", data_size,
    )
    with open(output_path, "wb") as f:
        f.write(header)
        f.write(audio_bytes)
def _extract_audio_chunk(video_path, output_wav, start_sec, duration_sec):
    """Cut one audio segment out of ``video_path`` into a 16 kHz mono WAV.

    Runs ffmpeg starting at ``start_sec`` for ``duration_sec`` seconds, drops
    the video stream and writes 16-bit PCM to ``output_wav`` (overwriting it).
    A non-zero ffmpeg exit raises ``subprocess.CalledProcessError``.
    """
    window = ["-ss", str(start_sec), "-t", str(duration_sec)]
    audio_format = ["-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1"]
    command = ["ffmpeg", "-y", *window, "-i", video_path, *audio_format, output_wav]
    subprocess.run(command, check=True, capture_output=True)
def _get_duration(filepath):
    """Return the duration of a media file in seconds, as reported by ffprobe.

    Args:
        filepath: Path to the media file to inspect.

    Returns:
        The container duration as a float.

    Raises:
        ValueError: If ffprobe prints no parseable duration (for example the
            file is missing or unreadable). The message includes ffprobe's
            own diagnostic when one is available.
    """
    # "-v error" (instead of "quiet") keeps stdout clean but lets ffprobe
    # explain failures on stderr, which is surfaced in the exception below.
    result = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1", filepath],
        capture_output=True, text=True,
    )
    reported = result.stdout.strip()
    try:
        return float(reported)
    except ValueError:
        detail = result.stderr.strip() or f"ffprobe reported {reported!r}"
        raise ValueError(
            f"Could not read duration of {filepath!r}: {detail}"
        ) from None
def _concatenate_wavs(wav_files, output_path):
    """Join WAV files end to end into ``output_path`` using ffmpeg.

    A single input is simply copied. Several inputs are stream-copied through
    ffmpeg's concat demuxer.

    Args:
        wav_files: Paths of the WAV files, in playback order.
        output_path: Destination WAV path (overwritten if present).

    Raises:
        ValueError: If ``wav_files`` is empty.
        subprocess.CalledProcessError: If ffmpeg exits with an error.
    """
    wav_files = list(wav_files)
    if not wav_files:
        raise ValueError("No WAV files to concatenate.")
    if len(wav_files) == 1:
        shutil.copy2(wav_files[0], output_path)
        return

    list_file = output_path + ".txt"
    try:
        with open(list_file, "w", encoding="utf-8") as f:
            for wav in wav_files:
                # The concat demuxer resolves relative entries against the
                # list file's directory, so write absolute paths. A single
                # quote inside a quoted entry must be spelled '\''.
                entry = os.path.abspath(wav).replace("'", "'\\''")
                f.write(f"file '{entry}'\n")
        subprocess.run(
            ["ffmpeg", "-y", "-f", "concat", "-safe", "0",
             "-i", list_file, "-c", "copy", output_path],
            capture_output=True, check=True,
        )
    finally:
        # Remove the temporary list even when ffmpeg fails.
        if os.path.exists(list_file):
            os.remove(list_file)
def _build_system_prompt(language_name):
    """Compose the system message telling Qwen to dub English into ``language_name``.

    The prompt is a fixed, numbered instruction list with the target language
    name substituted wherever it is mentioned.
    """
    template_lines = [
        "You are a professional video dubbing translator. You will receive audio in English.",
        "Your task:",
        "1. Listen carefully to the English speech.",
        "2. Translate it into natural, fluent {lang}.",
        "3. Respond ONLY with the {lang} translation spoken aloud — no English, no commentary,",
        " no meta-text, no transliteration. Speak entirely in {lang}.",
        "4. Match the tone, emotion, and pacing of the original speaker as closely as possible.",
        "5. If there are pauses or silence in the original audio, maintain similar pacing.",
        "6. Translate idioms and cultural references into their {lang} equivalents.",
        "7. Use clear, professional pronunciation suitable for a broad audience.",
    ]
    return "\n".join(template_lines).format(lang=language_name)
def _delta_audio_field(audio, key):
    """Read ``key`` from a streamed audio delta that is a dict or an object."""
    if isinstance(audio, dict):
        return audio.get(key)
    return getattr(audio, key, None)


def translate_chunk_qwen(wav_path, voice, language_name, chunk_index=0):
    """Translate a single audio chunk using Qwen Omni.

    Sends the WAV file plus a text instruction in one streaming
    chat-completion request and collects the streamed text and audio.

    Args:
        wav_path: Path to input WAV file (English audio).
        voice: Qwen voice name (e.g. "Ethan", "Cherry").
        language_name: Full language name for the prompts.
        chunk_index: Index used in the output file name and the log line.

    Returns:
        ``(output_wav_path, transcript)``, or ``(None, transcript)`` when the
        response carried no audio. The output file is written next to the
        input as ``<input stem>_qwen_<chunk_index>.wav``.

    Raises:
        RuntimeError: If the API key is not configured (from ``_get_client``).
    """
    client = _get_client()
    audio_b64 = _wav_to_base64(wav_path)

    # Derive the output name from the extension only. ``str.replace`` would
    # rewrite every ".wav" in the path, and would leave the path unchanged
    # (so the input gets overwritten) when it has no ".wav" at all.
    stem, _ext = os.path.splitext(wav_path)
    output_wav = f"{stem}_qwen_{chunk_index}.wav"

    system_prompt = _build_system_prompt(language_name)
    user_prompt = f"Translate this English speech into {language_name}. Respond only with the spoken {language_name} translation."

    t0 = time.monotonic()
    completion = client.chat.completions.create(
        model=QWEN_MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {
                "role": "user",
                "content": [
                    {
                        "type": "input_audio",
                        "input_audio": {
                            "data": f"data:audio/wav;base64,{audio_b64}",
                            "format": "wav",
                        },
                    },
                    {"type": "text", "text": user_prompt},
                ],
            },
        ],
        modalities=["text", "audio"],
        audio={"voice": voice, "format": "wav"},
        stream=True,
        stream_options={"include_usage": True},
    )

    audio_chunks = []
    content_parts = []
    audio_transcript_parts = []
    for event in completion:
        # Events without choices are skipped; with ``include_usage`` set the
        # stream can contain such bookkeeping events.
        if not event.choices:
            continue
        delta = event.choices[0].delta
        text = getattr(delta, "content", None)
        if text:
            content_parts.append(text)
        audio = getattr(delta, "audio", None)
        if audio:
            data = _delta_audio_field(audio, "data")
            if data:
                audio_chunks.append(data)
            # NOTE(review): some OpenAI-compatible audio responses carry the
            # spoken text on the audio delta rather than in ``content``;
            # confirm against the Dashscope stream format.
            spoken = _delta_audio_field(audio, "transcript")
            if spoken:
                audio_transcript_parts.append(spoken)

    # Prefer regular text content; use the audio transcript only when no
    # content text arrived, so the text is never duplicated.
    transcript = "".join(content_parts) or "".join(audio_transcript_parts)
    elapsed = time.monotonic() - t0
    logger.info(
        "Qwen chunk %s: %.1fs, transcript=%s",
        chunk_index, elapsed, transcript[:60],
    )

    if audio_chunks:
        full_audio_b64 = "".join(audio_chunks)
        _base64_to_wav(full_audio_b64, output_wav)
        return output_wav, transcript
    return None, transcript
def _dub_count_chunks(total_duration, chunk_seconds):
    """Return how many ``chunk_seconds`` pieces cover ``total_duration`` (min 1)."""
    whole, remainder = divmod(total_duration, chunk_seconds)
    return max(1, int(whole) + (1 if remainder > 0 else 0))


def _dub_write_silence(output_wav, duration_sec):
    """Write ``duration_sec`` seconds of 24 kHz mono 16-bit silence via ffmpeg."""
    subprocess.run(
        ["ffmpeg", "-y", "-f", "lavfi",
         "-i", "anullsrc=r=24000:cl=mono",
         "-t", str(duration_sec), "-acodec", "pcm_s16le", output_wav],
        capture_output=True, check=True,
    )


def _dub_error_text(exc):
    """Format an exception for the user, appending the tail of ffmpeg's stderr."""
    message = f"Error: {exc}"
    stderr = getattr(exc, "stderr", None)
    if isinstance(exc, subprocess.CalledProcessError) and stderr:
        if isinstance(stderr, bytes):
            stderr = stderr.decode("utf-8", errors="replace")
        tail = stderr.strip().splitlines()[-3:]
        if tail:
            message += "\n" + "\n".join(tail)
    return message


def dub_video_qwen(video_path, language_name, voice="Ethan", chunk_seconds=120, progress_fn=None):
    """Full video dubbing pipeline using Qwen Omni.

    Splits the video's audio into chunks, translates each chunk via the Qwen
    API, concatenates the results and muxes them back onto the video.

    Args:
        video_path: Path to input video.
        language_name: Full language name (e.g. "French", "Arabic").
        voice: Qwen voice name.
        chunk_seconds: Audio chunk duration for API calls; must be positive.
        progress_fn: Optional callback invoked as ``progress_fn(fraction, desc=...)``.

    Returns:
        ``(output_video_path, log_text)`` on success. On any failure the
        function does not raise; it returns ``(None, message)`` instead.
        The output video lives in a fresh temporary directory that is left
        in place for the caller.
    """
    def report(fraction, desc):
        if progress_fn:
            progress_fn(fraction, desc=desc)

    # Reject unusable chunk sizes up front: zero would divide by zero and a
    # negative value would produce nonsensical chunk boundaries.
    try:
        chunk_ok = chunk_seconds > 0
    except TypeError:
        chunk_ok = False
    if not chunk_ok:
        return None, "Error: chunk_seconds must be a positive number."

    tmp_dir = tempfile.mkdtemp(prefix="qwen_dub_")
    log = []
    try:
        # Duration
        report(0.05, "Analyzing video...")
        total_duration = _get_duration(video_path)
        log.extend([
            f"**Video:** {total_duration:.1f}s",
            "**Engine:** Qwen 3.5 Omni",
            f"**Voice:** {voice}",
            f"**Language:** {language_name}",
        ])
        if total_duration > 3600:
            # Nothing was produced, so do not leave the empty directory behind.
            shutil.rmtree(tmp_dir, ignore_errors=True)
            return None, "Video longer than 1 hour — please use a shorter clip."

        # Split into chunks
        report(0.1, "Extracting audio chunks...")
        num_chunks = _dub_count_chunks(total_duration, chunk_seconds)
        log.append(f"**Chunks:** {num_chunks} ({chunk_seconds}s each)")
        input_chunks = []
        for i in range(num_chunks):
            start = i * chunk_seconds
            duration = min(chunk_seconds, total_duration - start)
            chunk_path = os.path.join(tmp_dir, f"chunk_{i:03d}.wav")
            _extract_audio_chunk(video_path, chunk_path, start, duration)
            input_chunks.append(chunk_path)

        # Translate each chunk
        # NOTE(review): translated chunks are concatenated at whatever length
        # the API returned, so the dubbed track can drift against the video;
        # "-shortest" below trims the result to the shorter stream.
        output_chunks = []
        all_transcripts = []
        for i, chunk_path in enumerate(input_chunks):
            report(0.15 + 0.7 * (i / num_chunks),
                   f"Translating chunk {i+1}/{num_chunks}...")
            result_path, transcript = translate_chunk_qwen(
                chunk_path, voice, language_name, i
            )
            if transcript:
                all_transcripts.append(f"**[{i+1}]** {transcript}")
            if result_path:
                output_chunks.append(result_path)
            else:
                # No audio came back: substitute silence of the chunk's length
                # so later chunks keep their position.
                silence_path = os.path.join(tmp_dir, f"silence_{i:03d}.wav")
                _dub_write_silence(silence_path, _get_duration(chunk_path))
                output_chunks.append(silence_path)

        # Concatenate
        report(0.88, "Assembling audio...")
        full_audio = os.path.join(tmp_dir, "full_dubbed.wav")
        _concatenate_wavs(output_chunks, full_audio)

        # Mux onto video
        report(0.93, "Combining audio and video...")
        output_video = os.path.join(tmp_dir, "dubbed_output.mp4")
        subprocess.run(
            ["ffmpeg", "-y", "-i", video_path, "-i", full_audio,
             "-c:v", "copy", "-map", "0:v:0", "-map", "1:a:0",
             "-shortest", output_video],
            capture_output=True, check=True,
        )

        # Only the muxed video is returned; drop the intermediate WAVs so
        # they do not pile up on disk. Best effort: a failed delete must not
        # turn a successful dub into an error.
        for leftover in (*input_chunks, *output_chunks, full_audio):
            try:
                os.remove(leftover)
            except OSError:
                pass

        report(1.0, "Done!")
        log.append("\n**Transcript:**")
        log.extend(all_transcripts)
        return output_video, "\n".join(log)
    except Exception as e:
        # Top-level boundary: log the traceback, clean up, report as text.
        logger.exception("Qwen dubbing failed")
        shutil.rmtree(tmp_dir, ignore_errors=True)
        return None, _dub_error_text(e)