"""Reachy Mini Open Conversation — Hugging Face Spaces App. Standalone conversation app using open-source models: Audio In → faster-whisper (STT) → Ollama (LLM) → edge-tts (TTS) → Audio Out No robot hardware dependencies — runs entirely in the browser via Gradio + FastRTC. """ import os import json import asyncio import logging from typing import Any, Final, Tuple from datetime import datetime import numpy as np import gradio as gr import edge_tts import miniaudio from ollama import AsyncClient as OllamaAsyncClient from fastrtc import AdditionalOutputs, AsyncStreamHandler, Stream, wait_for_item, audio_to_int16 from numpy.typing import NDArray from scipy.signal import resample # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s:%(lineno)d | %(message)s", ) logger = logging.getLogger("reachy-mini-open") # Tame noisy libraries for lib in ("aiortc", "aioice", "httpx", "websockets"): logging.getLogger(lib).setLevel(logging.WARNING) # --------------------------------------------------------------------------- # Configuration (env vars — set as HF Space secrets) # --------------------------------------------------------------------------- OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") MODEL_NAME = os.getenv("MODEL_NAME", "llama3.2") STT_MODEL = os.getenv("STT_MODEL", "base") TTS_VOICE = os.getenv("TTS_VOICE", "en-US-AriaNeural") # --------------------------------------------------------------------------- # Audio constants # --------------------------------------------------------------------------- HANDLER_SAMPLE_RATE: Final[int] = 24000 WHISPER_SAMPLE_RATE: Final[int] = 16000 # VAD thresholds SILENCE_RMS_THRESHOLD: Final[float] = 500.0 SILENCE_DURATION_S: Final[float] = 0.8 MIN_SPEECH_DURATION_S: Final[float] = 0.3 # --------------------------------------------------------------------------- # System prompts # --------------------------------------------------------------------------- DEFAULT_PROMPT = """\ ## IDENTITY You are Reachy Mini: a friendly, compact robot assistant with a calm voice and a subtle sense of humor. Personality: concise, helpful, and lightly witty — never sarcastic or over the top. You speak English by default and switch languages only if explicitly told. ## CRITICAL RESPONSE RULES Respond in 1–2 sentences maximum. Be helpful first, then add a small touch of humor if it fits naturally. Avoid long explanations or filler words. Keep responses under 25 words when possible. ## CORE TRAITS Warm, efficient, and approachable. Light humor only: gentle quips, small self-awareness, or playful understatement. No sarcasm, no teasing. If unsure, admit it briefly and offer help ("Not sure yet, but I can check!"). ## BEHAVIOR RULES Be helpful, clear, and respectful in every reply. Use humor sparingly — clarity comes first. Admit mistakes briefly and correct them. """ PERSONALITIES = { "Default (Reachy Mini)": DEFAULT_PROMPT, "Friendly Assistant": ( "You are a warm, helpful assistant. Keep answers concise (1-2 sentences). " "Be friendly and approachable." ), "Technical Expert": ( "You are a precise technical expert. Give clear, accurate answers in 1-2 sentences. " "Use technical terms when appropriate but explain simply." ), "Creative Storyteller": ( "You are a creative storyteller. Keep responses short but vivid and imaginative. " "Add a touch of wonder to your replies." ), } # --------------------------------------------------------------------------- # Available TTS voices # --------------------------------------------------------------------------- TTS_VOICES = [ "en-US-AriaNeural", "en-US-GuyNeural", "en-US-JennyNeural", "en-US-ChristopherNeural", "en-GB-SoniaNeural", "en-GB-RyanNeural", "de-DE-ConradNeural", "de-DE-KatjaNeural", "fr-FR-DeniseNeural", "fr-FR-HenriNeural", "it-IT-ElsaNeural", "it-IT-DiegoNeural", ] # --------------------------------------------------------------------------- # Conversation Handler # --------------------------------------------------------------------------- class ConversationHandler(AsyncStreamHandler): """Audio streaming handler: STT → Ollama LLM → edge-tts TTS.""" def __init__(self) -> None: """Initialize the handler.""" super().__init__( expected_layout="mono", output_sample_rate=HANDLER_SAMPLE_RATE, input_sample_rate=HANDLER_SAMPLE_RATE, ) # Output queue self.output_queue: asyncio.Queue[Tuple[int, NDArray[np.int16]] | AdditionalOutputs] = asyncio.Queue() # Clients (initialized in start_up) self.ollama_client: OllamaAsyncClient | None = None self.whisper_model: Any = None # Conversation history self._messages: list[dict[str, Any]] = [] # Audio buffering for VAD self._audio_buffer: list[NDArray[np.int16]] = [] self._is_speaking: bool = False self._silence_frame_count: int = 0 self._speech_frame_count: int = 0 # TTS voice self._tts_voice: str = TTS_VOICE # Lifecycle self._shutdown_requested: bool = False def copy(self) -> "ConversationHandler": """Create a copy of this handler.""" return ConversationHandler() # ------------------------------------------------------------------ # # Startup # ------------------------------------------------------------------ # async def start_up(self) -> None: """Initialize STT model and Ollama client.""" # 1. Ollama client self.ollama_client = OllamaAsyncClient(host=OLLAMA_BASE_URL) try: await self.ollama_client.list() logger.info("Connected to Ollama at %s", OLLAMA_BASE_URL) except Exception as e: logger.error("Cannot reach Ollama at %s: %s", OLLAMA_BASE_URL, e) logger.warning("Proceeding — requests will fail until Ollama is available.") # 2. faster-whisper STT try: from faster_whisper import WhisperModel self.whisper_model = WhisperModel( STT_MODEL, device="auto", compute_type="int8", ) logger.info("Loaded faster-whisper model: %s", STT_MODEL) except Exception as e: logger.error("Failed to load STT model '%s': %s", STT_MODEL, e) # 3. System prompt self._messages = [{"role": "system", "content": DEFAULT_PROMPT}] logger.info( "Handler ready — model=%s stt=%s tts_voice=%s", MODEL_NAME, STT_MODEL, self._tts_voice, ) # Keep alive while not self._shutdown_requested: await asyncio.sleep(0.1) # ------------------------------------------------------------------ # # Audio receive → VAD → STT → LLM → TTS # ------------------------------------------------------------------ # async def receive(self, frame: Tuple[int, NDArray[np.int16]]) -> None: """Receive audio from mic, run VAD, kick off pipeline on speech end.""" if self._shutdown_requested or self.whisper_model is None: return input_sample_rate, audio_frame = frame # Reshape to 1-D mono if audio_frame.ndim == 2: if audio_frame.shape[1] > audio_frame.shape[0]: audio_frame = audio_frame.T if audio_frame.shape[1] > 1: audio_frame = audio_frame[:, 0] # Resample to handler rate if input_sample_rate != HANDLER_SAMPLE_RATE: audio_frame = resample( audio_frame, int(len(audio_frame) * HANDLER_SAMPLE_RATE / input_sample_rate) ) audio_frame = audio_to_int16(audio_frame) # Energy-based VAD rms = float(np.sqrt(np.mean(audio_frame.astype(np.float32) ** 2))) frame_duration = len(audio_frame) / HANDLER_SAMPLE_RATE if rms > SILENCE_RMS_THRESHOLD: if not self._is_speaking: self._is_speaking = True self._speech_frame_count = 0 logger.debug("Speech started (RMS=%.0f)", rms) self._silence_frame_count = 0 self._speech_frame_count += 1 self._audio_buffer.append(audio_frame) else: if self._is_speaking: self._silence_frame_count += 1 self._audio_buffer.append(audio_frame) silence_duration = self._silence_frame_count * frame_duration if silence_duration >= SILENCE_DURATION_S: speech_duration = self._speech_frame_count * frame_duration if speech_duration >= MIN_SPEECH_DURATION_S: logger.debug("Speech ended (%.1fs)", speech_duration) full_audio = np.concatenate(self._audio_buffer) self._audio_buffer = [] self._is_speaking = False self._silence_frame_count = 0 self._speech_frame_count = 0 asyncio.create_task(self._process_speech(full_audio)) else: self._audio_buffer = [] self._is_speaking = False self._silence_frame_count = 0 self._speech_frame_count = 0 # ------------------------------------------------------------------ # # Speech processing pipeline # ------------------------------------------------------------------ # async def _process_speech(self, audio_data: NDArray[np.int16]) -> None: """Full pipeline: STT → LLM → TTS.""" try: # 1. Speech-to-text text = await self._transcribe(audio_data) if not text: return logger.info("User: %s", text) await self.output_queue.put(AdditionalOutputs({"role": "user", "content": text})) # 2. LLM response self._messages.append({"role": "user", "content": text}) response_text = await self._chat() if response_text: logger.info("Assistant: %s", response_text) await self.output_queue.put( AdditionalOutputs({"role": "assistant", "content": response_text}) ) # 3. Text-to-speech await self._synthesize_speech(response_text) except Exception as e: logger.error("Speech processing error: %s", e) await self.output_queue.put( AdditionalOutputs({"role": "assistant", "content": f"[error] {e}"}) ) async def _transcribe(self, audio_data: NDArray[np.int16]) -> str: """Run faster-whisper STT on raw PCM audio.""" float_audio = audio_data.astype(np.float32) / 32768.0 whisper_audio = resample( float_audio, int(len(float_audio) * WHISPER_SAMPLE_RATE / HANDLER_SAMPLE_RATE), ).astype(np.float32) loop = asyncio.get_event_loop() segments, _info = await loop.run_in_executor( None, lambda: self.whisper_model.transcribe(whisper_audio, beam_size=5), ) text_parts: list[str] = [] for seg in segments: text_parts.append(seg.text) return " ".join(text_parts).strip() async def _chat(self) -> str: """Send conversation to Ollama and return response text.""" if self.ollama_client is None: return "Ollama client not initialized." try: response = await self.ollama_client.chat( model=MODEL_NAME, messages=self._messages, ) response_text = response["message"].get("content", "") if response_text: self._messages.append({"role": "assistant", "content": response_text}) return response_text except Exception as e: logger.error("Ollama chat error: %s", e) return f"Sorry, I couldn't process that. Error: {e}" # ------------------------------------------------------------------ # # Text-to-speech # ------------------------------------------------------------------ # async def _synthesize_speech(self, text: str) -> None: """Convert text to speech via edge-tts and queue audio output.""" if not text.strip(): return try: communicate = edge_tts.Communicate(text, self._tts_voice) mp3_chunks: list[bytes] = [] async for chunk in communicate.stream(): if chunk["type"] == "audio": mp3_chunks.append(chunk["data"]) if not mp3_chunks: return mp3_data = b"".join(mp3_chunks) # Decode MP3 → raw PCM decoded = miniaudio.decode( mp3_data, output_format=miniaudio.SampleFormat.SIGNED16, nchannels=1, sample_rate=HANDLER_SAMPLE_RATE, ) samples = np.frombuffer(decoded.samples, dtype=np.int16) # Stream in ~100ms chunks chunk_size = HANDLER_SAMPLE_RATE // 10 for i in range(0, len(samples), chunk_size): audio_chunk = samples[i : i + chunk_size] await self.output_queue.put( (HANDLER_SAMPLE_RATE, audio_chunk.reshape(1, -1)) ) except Exception as e: logger.error("TTS synthesis error: %s", e) # ------------------------------------------------------------------ # # Emit (speaker output) # ------------------------------------------------------------------ # async def emit(self) -> Tuple[int, NDArray[np.int16]] | AdditionalOutputs | None: """Emit next audio frame or chat update.""" return await wait_for_item(self.output_queue) # ------------------------------------------------------------------ # # Personality management # ------------------------------------------------------------------ # async def apply_personality(self, name: str) -> str: """Apply a personality by name, resetting conversation.""" prompt = PERSONALITIES.get(name, DEFAULT_PROMPT) self._messages = [{"role": "system", "content": prompt}] logger.info("Applied personality: %s", name) return f"✅ Applied personality: {name}" def set_voice(self, voice: str) -> str: """Change TTS voice.""" self._tts_voice = voice logger.info("Changed TTS voice to: %s", voice) return f"✅ Voice set to: {voice}" # ------------------------------------------------------------------ # # Shutdown # ------------------------------------------------------------------ # async def shutdown(self) -> None: """Shutdown the handler.""" self._shutdown_requested = True while not self.output_queue.empty(): try: self.output_queue.get_nowait() except asyncio.QueueEmpty: break # --------------------------------------------------------------------------- # Chatbot update helper # --------------------------------------------------------------------------- def update_chatbot(chatbot, response): """Update the chatbot with AdditionalOutputs.""" chatbot.append(response) return chatbot # --------------------------------------------------------------------------- # Build Gradio UI # --------------------------------------------------------------------------- def create_app(): """Create and return the Gradio app.""" handler = ConversationHandler() chatbot = gr.Chatbot( type="messages", label="Conversation", height=400, ) # Personality dropdown personality_dropdown = gr.Dropdown( label="🎭 Personality", choices=list(PERSONALITIES.keys()), value="Default (Reachy Mini)", ) # Voice dropdown voice_dropdown = gr.Dropdown( label="🎤 TTS Voice", choices=TTS_VOICES, value=TTS_VOICE, ) # Status display status_md = gr.Markdown(value="", label="Status") stream = Stream( handler=handler, mode="send-receive", modality="audio", additional_inputs=[ chatbot, personality_dropdown, voice_dropdown, status_md, ], additional_outputs=[chatbot], additional_outputs_handler=update_chatbot, ui_args={"title": "🤖 Talk with Reachy Mini"}, ) # Wire personality and voice events with stream.ui: async def _apply_personality(selected: str) -> str: result = await handler.apply_personality(selected) return result def _set_voice(selected: str) -> str: return handler.set_voice(selected) personality_dropdown.change( fn=_apply_personality, inputs=[personality_dropdown], outputs=[status_md], ) voice_dropdown.change( fn=_set_voice, inputs=[voice_dropdown], outputs=[status_md], ) return stream # --------------------------------------------------------------------------- # Entrypoint # --------------------------------------------------------------------------- if __name__ == "__main__": logger.info("Starting Reachy Mini Open Conversation") logger.info("Config: OLLAMA=%s MODEL=%s STT=%s TTS=%s", OLLAMA_BASE_URL, MODEL_NAME, STT_MODEL, TTS_VOICE) stream = create_app() stream.ui.launch(server_name="0.0.0.0", server_port=7860)