""" Real-time WebRTC speech-to-speech demo with fastrtc Based on the original liquid-audio demo """ from queue import Queue from threading import Thread import os import gradio as gr import numpy as np import torch from fastrtc import AdditionalOutputs, ReplyOnPause, WebRTC from liquid_audio import ChatState, LFM2AudioModel, LFM2AudioProcessor, LFMModality # Configure WebRTC with STUN/TURN servers # This is CRITICAL for WebRTC connections to work through firewalls/NAT rtc_configuration = { "iceServers": [ { "urls": [ "stun:stun.l.google.com:19302", "stun:stun1.l.google.com:19302", ] } ] } # For production deployment on Hugging Face Spaces, you can use Cloudflare TURN: # Uncomment these lines and set TURN_KEY_ID and TURN_KEY_API_TOKEN as Secrets # from fastrtc import get_cloudflare_turn_credentials_async # if os.getenv("TURN_KEY_ID") and os.getenv("TURN_KEY_API_TOKEN"): # rtc_configuration = get_cloudflare_turn_credentials_async() # Load models HF_REPO = "LiquidAI/LFM2-Audio-1.5B" print("Loading processor...") processor = LFM2AudioProcessor.from_pretrained(HF_REPO).eval() print("Loading model...") model = LFM2AudioModel.from_pretrained(HF_REPO).eval() print("Loading audio codec...") mimi = processor.mimi.eval() # Move to CUDA if available device = "cuda" if torch.cuda.is_available() else "cpu" model = model.to(device) mimi = mimi.to(device) print(f"Models loaded on {device}") def chat_producer( q: Queue[torch.Tensor | None], chat: ChatState, temp: float | None, topk: int | None, ): """Producer thread that generates tokens""" print(f"Starting generation with state {chat}.") with torch.no_grad(), mimi.streaming(1): for t in model.generate_interleaved( **chat, max_new_tokens=1024, audio_temperature=temp, audio_top_k=topk, ): q.put(t) if t.numel() > 1: if (t == 2048).any(): continue wav_chunk = mimi.decode(t[None, :, None])[0] q.put(wav_chunk) q.put(None) def chat_response(audio: tuple[int, np.ndarray], _id: str, chat: ChatState, temp: float | None = 1.0, topk: int | None = 4): """Handle incoming audio and generate streaming response""" if temp == 0: temp = None if topk == 0: topk = None if temp is not None: temp = float(temp) if topk is not None: topk = int(topk) if len(chat.text) == 1: chat.new_turn("system") chat.add_text("Respond with interleaved text and audio.") chat.end_turn() chat.new_turn("user") rate, wav = audio # Convert to tensor with proper shape (channels, samples) wav_tensor = torch.tensor(wav / 32_768, dtype=torch.float) # Ensure correct shape if len(wav_tensor.shape) == 1: wav_tensor = wav_tensor.unsqueeze(0) elif len(wav_tensor.shape) > 1: # If stereo, convert to mono wav_tensor = wav_tensor.mean(dim=-1, keepdim=True).T chat.add_audio(wav_tensor, rate) chat.end_turn() chat.new_turn("assistant") q: Queue[torch.Tensor | None] = Queue() chat_thread = Thread(target=chat_producer, args=(q, chat, temp, topk)) chat_thread.start() out_text: list[torch.Tensor] = [] out_audio: list[torch.Tensor] = [] out_modality: list[LFMModality] = [] while True: t = q.get() if t is None: break elif t.numel() == 1: # text out_text.append(t) out_modality.append(LFMModality.TEXT) print(processor.text.decode(t), end="") cur_string = processor.text.decode(torch.cat(out_text)).removesuffix("<|text_end|>") yield AdditionalOutputs(cur_string) elif t.numel() == 8: out_audio.append(t) out_modality.append(LFMModality.AUDIO_OUT) elif t.numel() == 1920: np_chunk = (t.cpu().numpy() * 32_767).astype(np.int16) yield (24_000, np_chunk) else: raise RuntimeError(f"unexpected shape: {t.shape}") chat.append( 
    chat.append(
        text=torch.stack(out_text, 1),
        audio_out=torch.stack(out_audio, 1),
        modality_flag=torch.tensor(out_modality, device=device),
    )
    chat.end_turn()
    chat.new_turn("user")


def clear():
    """Clear chat history"""
    gr.Info("Cleared chat history", duration=3)
    return ChatState(processor), None


# Create Gradio interface
with gr.Blocks(title="LFM2-Audio Real-time Speech-to-Speech") as demo:
    gr.Markdown("""
    # LFM2-Audio Real-time Speech-to-Speech Chat

    **Real-time WebRTC streaming** powered by fastrtc - talk naturally and get instant responses!

    **How to use:**
    1. Click "Allow" when prompted for microphone access
    2. Start speaking - the model listens and responds in real-time
    3. The conversation flows naturally with minimal latency

    **Features:**
    - 🎙️ Real-time WebRTC streaming
    - ⚡ Low-latency responses
    - 💬 Interleaved text and audio output
    - 🔄 Multi-turn conversations
    """)

    chat_state = gr.State(ChatState(processor))

    with gr.Row():
        with gr.Column():
            webrtc = WebRTC(
                modality="audio",
                mode="send-receive",
                full_screen=False,
                rtc_configuration=rtc_configuration,
            )
            with gr.Row():
                temperature = gr.Slider(
                    minimum=0,
                    maximum=2.0,
                    value=1.0,
                    step=0.1,
                    label="Temperature (0 for greedy)",
                    info="Higher = more creative",
                )
                top_k = gr.Slider(
                    minimum=0,
                    maximum=100,
                    value=4,
                    step=1,
                    label="Top-k (0 for no filtering)",
                    info="Sampling diversity",
                )
            clear_btn = gr.Button("Reset Chat")
        with gr.Column():
            text_out = gr.Textbox(
                lines=10,
                label="Conversation Text",
                interactive=False,
            )

    gr.Markdown("""
    ### About this demo

    This demo uses **fastrtc** for WebRTC streaming, enabling real-time
    speech-to-speech interaction with minimal latency. The model processes your
    speech and generates both text and audio responses simultaneously.

    **Model**: LFM2-Audio-1.5B by Liquid AI
    **Mode**: Interleaved generation (optimized for real-time)
    **Audio Codec**: Mimi (24 kHz)

    [Liquid AI](https://www.liquid.ai/) | [GitHub](https://github.com/Liquid4All/liquid-audio/) | [Model Card](https://huggingface.co/LiquidAI/LFM2-Audio-1.5B)
    """)

    # Setup WebRTC streaming
    webrtc.stream(
        ReplyOnPause(
            chat_response,  # type: ignore[arg-type]
            input_sample_rate=24_000,
            output_sample_rate=24_000,
            can_interrupt=False,
        ),
        inputs=[webrtc, chat_state, temperature, top_k],
        outputs=[webrtc],
    )
    webrtc.on_additional_outputs(
        lambda s: s,
        outputs=[text_out],
    )
    clear_btn.click(clear, outputs=[chat_state, text_out])

if __name__ == "__main__":
    demo.launch()
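# Example invocation (assumed setup; package names are inferred from the
# imports above, and "app.py" stands for whatever this file is saved as):
#   pip install liquid-audio fastrtc gradio
#   python app.py
#
# On a headless server, explicit binding via gradio's standard launch options
# may be needed, e.g. demo.launch(server_name="0.0.0.0", server_port=7860).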