""" Real-time WebRTC speech-to-speech demo with fastrtc Based on the original liquid-audio demo """ from queue import Queue from threading import Thread import gradio as gr import numpy as np import torch from fastrtc import AdditionalOutputs, ReplyOnPause, WebRTC from liquid_audio import ChatState, LFM2AudioModel, LFM2AudioProcessor, LFMModality # Load models HF_REPO = "LiquidAI/LFM2-Audio-1.5B" print("Loading processor...") processor = LFM2AudioProcessor.from_pretrained(HF_REPO).eval() print("Loading model...") model = LFM2AudioModel.from_pretrained(HF_REPO).eval() print("Loading audio codec...") mimi = processor.mimi.eval() # Move to CUDA if available device = "cuda" if torch.cuda.is_available() else "cpu" model = model.to(device) mimi = mimi.to(device) print(f"Models loaded on {device}") def chat_producer( q: Queue[torch.Tensor | None], chat: ChatState, temp: float | None, topk: int | None, ): """Producer thread that generates tokens""" print(f"Starting generation with state {chat}.") with torch.no_grad(), mimi.streaming(1): for t in model.generate_interleaved( **chat, max_new_tokens=1024, audio_temperature=temp, audio_top_k=topk, ): q.put(t) if t.numel() > 1: if (t == 2048).any(): continue wav_chunk = mimi.decode(t[None, :, None])[0] q.put(wav_chunk) q.put(None) def chat_response(audio: tuple[int, np.ndarray], _id: str, chat: ChatState, temp: float | None = 1.0, topk: int | None = 4): """Handle incoming audio and generate streaming response""" if temp == 0: temp = None if topk == 0: topk = None if temp is not None: temp = float(temp) if topk is not None: topk = int(topk) if len(chat.text) == 1: chat.new_turn("system") chat.add_text("Respond with interleaved text and audio.") chat.end_turn() chat.new_turn("user") rate, wav = audio # Convert to tensor with proper shape (channels, samples) wav_tensor = torch.tensor(wav / 32_768, dtype=torch.float) # Ensure correct shape if len(wav_tensor.shape) == 1: wav_tensor = wav_tensor.unsqueeze(0) elif len(wav_tensor.shape) > 1: # If stereo, convert to mono wav_tensor = wav_tensor.mean(dim=-1, keepdim=True).T chat.add_audio(wav_tensor, rate) chat.end_turn() chat.new_turn("assistant") q: Queue[torch.Tensor | None] = Queue() chat_thread = Thread(target=chat_producer, args=(q, chat, temp, topk)) chat_thread.start() out_text: list[torch.Tensor] = [] out_audio: list[torch.Tensor] = [] out_modality: list[LFMModality] = [] while True: t = q.get() if t is None: break elif t.numel() == 1: # text out_text.append(t) out_modality.append(LFMModality.TEXT) print(processor.text.decode(t), end="") cur_string = processor.text.decode(torch.cat(out_text)).removesuffix("<|text_end|>") yield AdditionalOutputs(cur_string) elif t.numel() == 8: out_audio.append(t) out_modality.append(LFMModality.AUDIO_OUT) elif t.numel() == 1920: np_chunk = (t.cpu().numpy() * 32_767).astype(np.int16) yield (24_000, np_chunk) else: raise RuntimeError(f"unexpected shape: {t.shape}") chat.append( text=torch.stack(out_text, 1), audio_out=torch.stack(out_audio, 1), modality_flag=torch.tensor(out_modality, device=device), ) chat.end_turn() chat.new_turn("user") def clear(): """Clear chat history""" gr.Info("Cleared chat history", duration=3) return ChatState(processor), None # Create Gradio interface with gr.Blocks(title="LFM2-Audio Real-time Speech-to-Speech") as demo: gr.Markdown(""" # LFM2-Audio Real-time Speech-to-Speech Chat **Real-time WebRTC streaming** powered by fastrtc - Talk naturally and get instant responses! **How to use:** 1. Click "Allow" when prompted for microphone access 2. 
Start speaking - the model listens and responds in real-time 3. The conversation flows naturally with minimal latency **Features:** - 🎙️ Real-time WebRTC streaming - ⚡ Low latency response - 💬 Interleaved text and audio output - 🔄 Multi-turn conversations """) chat_state = gr.State(ChatState(processor)) with gr.Row(): with gr.Column(): webrtc = WebRTC( modality="audio", mode="send-receive", full_screen=False, ) with gr.Row(): temperature = gr.Slider( minimum=0, maximum=2.0, value=1.0, step=0.1, label="Temperature (0 for greedy)", info="Higher = more creative" ) top_k = gr.Slider( minimum=0, maximum=100, value=4, step=1, label="Top-k (0 for no filtering)", info="Sampling diversity" ) clear_btn = gr.Button("Reset Chat") with gr.Column(): text_out = gr.Textbox( lines=10, label="Conversation Text", interactive=False ) gr.Markdown(""" ### About this demo This demo uses **fastrtc** for WebRTC streaming, enabling real-time speech-to-speech interaction with minimal latency. The model processes your speech and generates both text and audio responses simultaneously. **Model**: LFM2-Audio-1.5B by Liquid AI **Mode**: Interleaved generation (optimized for real-time) **Audio Codec**: Mimi (24kHz) [Liquid AI](https://www.liquid.ai/) | [GitHub](https://github.com/Liquid4All/liquid-audio/) | [Model Card](https://huggingface.co/LiquidAI/LFM2-Audio-1.5B) """) # Setup WebRTC streaming webrtc.stream( ReplyOnPause( chat_response, # type: ignore[arg-type] input_sample_rate=24_000, output_sample_rate=24_000, can_interrupt=False, ), inputs=[webrtc, chat_state, temperature, top_k], outputs=[webrtc], ) webrtc.on_additional_outputs( lambda s: s, outputs=[text_out], ) clear_btn.click(clear, outputs=[chat_state, text_out]) if __name__ == "__main__": demo.launch()
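

# --- Optional local smoke test (a sketch, not part of the original demo) ---
# Drives the chat_response generator once without the WebRTC leg, which can be
# handy for checking the model pipeline headlessly. Assumptions: `_smoke_test`
# and the "local-test" id string are hypothetical additions, and passing an
# arbitrary string for the handler's second argument is only meaningful
# outside fastrtc. Not called automatically; import the module and invoke it.
def _smoke_test() -> None:
    silence = np.zeros(24_000, dtype=np.int16)  # 1 s of silence at 24 kHz
    state = ChatState(processor)
    for item in chat_response((24_000, silence), "local-test", state):
        if isinstance(item, tuple):  # (sample_rate, int16 chunk) audio output
            rate, chunk = item
            print(f"audio chunk: {chunk.shape} at {rate} Hz")
        else:  # AdditionalOutputs carrying the running transcript
            print("transcript updated")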