import re
import argparse

import gradio as gr
import numpy as np
import torch
import torchaudio.functional as F

# Duration (seconds) of one streamed microphone chunk; also the stream interval.
_BASE_CHUNK_SECONDS = 0.96
# Sample rate (Hz) the audio is resampled to before being sent to the model.
_TARGET_SAMPLE_RATE = 16000
# A gap longer than this (seconds) between chunks is treated as a pause.
_PAUSE_GAP_SECONDS = 2.0
# Number of trailing non-empty lines shown in the translation box.
_DISPLAY_LINES = 5

# model owaski/Open-LiveTranslate-v0-En-Zh served locally with vllm
_API_BASE_URL = "https://jaida-avian-irmgard.ngrok-free.dev/v1"
_MODEL_PATH = "/data/user_data/siqiouya/ckpts/test_swift/Qwen3-Omni-30B-A3B-Instruct-lora/v1-20251104-033331-hf"


def prepare_speech(new_chunk):
    """Convert a ``(sample_rate, samples)`` chunk to mono float32 at 16 kHz.

    Args:
        new_chunk: Tuple ``(sr, y)`` where ``sr`` is the sample rate and ``y``
            is a numpy array of samples.

    Returns:
        A 1-D float32 numpy array resampled to 16 kHz.
    """
    sr, y = new_chunk

    # Convert to mono if stereo
    if y.ndim > 1:
        y = y.mean(axis=1)

    y = y.astype(np.float32)
    # NOTE(review): the 32768 divisor assumes 16-bit PCM input - confirm that
    # the audio component never delivers float samples.
    y /= 32768.0

    resampled_y = F.resample(torch.from_numpy(y), sr, _TARGET_SAMPLE_RATE)
    return resampled_y.numpy()


def wav_array_to_base64(wav_array, sample_rate):
    """Convert a numpy audio array to base64 encoded WAV."""
    import base64
    import io
    import soundfile as sf

    buffer = io.BytesIO()
    sf.write(buffer, wav_array, sample_rate, format='WAV')
    buffer.seek(0)
    return base64.b64encode(buffer.read()).decode('utf-8')


def prepare_inputs(messages, audio_base64):
    """Append a user audio message, seeding the system prompt if needed.

    Args:
        messages: Conversation history; ``None`` or an empty list starts a new
            conversation with the system prompt. A non-empty list is mutated
            in place.
        audio_base64: Base64-encoded WAV payload for the new user turn.

    Returns:
        The message list with the new user message appended.
    """
    if not messages:  # Check for None or empty list
        messages = [
            {
                "role": "system",
                "content": "You are a professional simultaneous interpreter. You will be given chunks of English audio and you need to translate the audio into Chinese text."
            },
        ]
    messages.append(
        {
            "role": "user",
            "content": [{"type": "audio_url", "audio_url": {"url": f"data:audio/wav;base64,{audio_base64}"}}]
        }
    )
    return messages


def _joined_translation(messages):
    """Concatenate the content of every assistant message in ``messages``."""
    if not messages:
        return ""
    return ''.join(
        message["content"] for message in messages if message["role"] == "assistant"
    )


def _tail_non_empty_lines(text, limit):
    """Return ``text`` cut down to start at its ``limit``-th last non-empty line.

    Blank lines after the cut point are preserved. If ``text`` has ``limit``
    or fewer non-empty lines it is returned unchanged.
    """
    lines = text.split('\n') if text else ['']
    non_empty_positions = [i for i, line in enumerate(lines) if line.strip()]
    if len(non_empty_positions) <= limit:
        return text
    return '\n'.join(lines[non_empty_positions[-limit]:])


def translate(messages, new_chunk, chunk_buffer, chunk_size_seconds, last_chunk_time):
    """
    Translate audio chunks with buffering.

    Incoming chunks are buffered until enough have accumulated to cover
    ``chunk_size_seconds``; the buffered audio is then sent to the model in a
    single request and the reply is appended to the conversation.

    Args:
        messages: Conversation history
        new_chunk: New audio chunk from microphone
        chunk_buffer: List of buffered audio arrays
        chunk_size_seconds: Target chunk size in seconds
        last_chunk_time: Timestamp of last received chunk (to detect pauses)

    Returns:
        messages, translation_text, updated_chunk_buffer, timestamp

        ``translation_text`` is the full translation while buffering, and the
        last few non-empty lines after a model call. ``timestamp`` is the
        time this call started, except when ``new_chunk`` is ``None``, in
        which case ``last_chunk_time`` is passed through unchanged.
    """
    from openai import OpenAI
    import time

    current_time = time.time()

    if new_chunk is None:
        return messages, _joined_translation(messages), chunk_buffer, last_chunk_time

    # Initialize messages if None
    if messages is None:
        messages = []

    # Initialize chunk_buffer if None
    if chunk_buffer is None:
        chunk_buffer = []

    # Check if there was a significant gap (> 2 seconds) - indicates pause/resume
    # Clear partial buffer to avoid concatenating audio from different time periods
    if last_chunk_time is not None and (current_time - last_chunk_time) > _PAUSE_GAP_SECONDS:
        if chunk_buffer:
            print(f"⚠️ Detected pause (gap: {current_time - last_chunk_time:.1f}s). Clearing {len(chunk_buffer)} partial chunks.")
            chunk_buffer = []

    # Prepare and buffer the new chunk
    chunk_buffer.append(prepare_speech(new_chunk))

    # Calculate how many 0.96s chunks we need to reach target size.
    # round() instead of int(): truncating a float quotient can land one chunk
    # short of an exact multiple. The floor of 1 keeps np.concatenate from
    # receiving an empty list when the target is below one base chunk.
    chunks_needed = max(1, round(chunk_size_seconds / _BASE_CHUNK_SECONDS))

    # If we haven't accumulated enough chunks yet, return without processing
    if len(chunk_buffer) < chunks_needed:
        return messages, _joined_translation(messages), chunk_buffer, current_time

    # We have enough chunks - concatenate and process
    concatenated_audio = np.concatenate(chunk_buffer[:chunks_needed])
    chunk_buffer = chunk_buffer[chunks_needed:]  # Keep any extra chunks for next iteration

    # Convert to base64
    audio_base64 = wav_array_to_base64(concatenated_audio, _TARGET_SAMPLE_RATE)

    # Prepare messages
    messages = prepare_inputs(messages, audio_base64)

    # Calculate context window size based on chunk size
    # Larger chunks = longer audio = can keep fewer messages in context
    # Base: 30 messages for 1.92s chunks, scale proportionally
    context_window = max(10, int(30 * (1.92 / chunk_size_seconds)))

    # Pin the first message (the system prompt) and take the window from the
    # rest of the history, so the prompt is never sent twice while the history
    # is still shorter than the window.
    request_messages = messages[:1] + messages[1:][-context_window:]

    # Call OpenAI API
    client = OpenAI(
        base_url=_API_BASE_URL,
        api_key="",
    )
    completion = client.chat.completions.create(
        model=_MODEL_PATH,
        messages=request_messages,
        top_p=0.95,
        temperature=0.6,
        extra_body={"top_k": 20}
    )
    print(f"completion: {completion}")

    # The API may return no content; store an empty string so the joins over
    # assistant messages keep working.
    translation = completion.choices[0].message.content or ""
    messages.append(
        {
            "role": "assistant",
            "content": translation
        }
    )

    # Get all translations and keep only the last 5 lines for display
    full_translation = _joined_translation(messages)
    display_translation = _tail_non_empty_lines(full_translation, _DISPLAY_LINES)

    return messages, display_translation, chunk_buffer, current_time


with gr.Blocks(css="""
    .large-font textarea {
        font-size: 20px !important;
        font-weight: 500;
        overflow-y: auto !important;
    }
    .large-font label {
        font-size: 20px !important;
        font-weight: bold;
    }
""") as demo:
    gr.Markdown("# Simultaneous Speech Translation Demo")
    gr.Markdown("**Instructions:** Select chunk size, then click the microphone to start recording. Refresh page to reset the history.")

    # State components
    messages_state = gr.State(value=[])
    chunk_buffer_state = gr.State(value=[])
    last_chunk_time_state = gr.State(value=None)

    with gr.Row():
        with gr.Column():
            # Chunk size selector (multiples of 0.96)
            chunk_size_selector = gr.Dropdown(
                choices=[0.96, 1.92, 2.88, 3.84, 4.80, 5.76, 6.72, 7.68, 8.64, 9.60],
                value=1.92,
                label="Chunk Size (seconds)",
                info="Larger chunks = more context but slower response. Must be multiple of 0.96s."
            )
            audio_input = gr.Audio(sources=["microphone"], streaming=True, label="Audio Input")

    with gr.Row():
        with gr.Column():
            translation_output = gr.Textbox(
                label="Translation",
                lines=3,
                max_lines=5,
                interactive=False,
                elem_classes=["large-font"],
                autoscroll=True,
                show_copy_button=True
            )

    # Streaming translation
    audio_input.stream(
        translate,
        inputs=[messages_state, audio_input, chunk_buffer_state, chunk_size_selector, last_chunk_time_state],
        outputs=[messages_state, translation_output, chunk_buffer_state, last_chunk_time_state],
        show_progress=False,
        stream_every=_BASE_CHUNK_SECONDS  # Base unit - buffering happens inside translate()
    )

demo.launch(share=True)