# NOTE(review): the original capture began with stray Hugging Face Spaces page
# text ("Spaces:" / "Build error" x2) — a scrape artifact, not program source.
# Preserved here as a comment so the module remains parseable.
| """Reachy Mini Open Conversation β Hugging Face Spaces App. | |
| Standalone conversation app using open-source models: | |
| Audio In β faster-whisper (STT) β Ollama (LLM) β edge-tts (TTS) β Audio Out | |
| No robot hardware dependencies β runs entirely in the browser via Gradio + FastRTC. | |
| """ | |
| import os | |
| import json | |
| import asyncio | |
| import logging | |
| from typing import Any, Final, Tuple | |
| from datetime import datetime | |
| import numpy as np | |
| import gradio as gr | |
| import edge_tts | |
| import miniaudio | |
| from ollama import AsyncClient as OllamaAsyncClient | |
| from fastrtc import AdditionalOutputs, AsyncStreamHandler, Stream, wait_for_item, audio_to_int16 | |
| from numpy.typing import NDArray | |
| from scipy.signal import resample | |
| # --------------------------------------------------------------------------- | |
| # Logging | |
| # --------------------------------------------------------------------------- | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s %(levelname)s %(name)s:%(lineno)d | %(message)s", | |
| ) | |
| logger = logging.getLogger("reachy-mini-open") | |
| # Tame noisy libraries | |
| for lib in ("aiortc", "aioice", "httpx", "websockets"): | |
| logging.getLogger(lib).setLevel(logging.WARNING) | |
| # --------------------------------------------------------------------------- | |
| # Configuration (env vars β set as HF Space secrets) | |
| # --------------------------------------------------------------------------- | |
| OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") | |
| MODEL_NAME = os.getenv("MODEL_NAME", "llama3.2") | |
| STT_MODEL = os.getenv("STT_MODEL", "base") | |
| TTS_VOICE = os.getenv("TTS_VOICE", "en-US-AriaNeural") | |
| # --------------------------------------------------------------------------- | |
| # Audio constants | |
| # --------------------------------------------------------------------------- | |
| HANDLER_SAMPLE_RATE: Final[int] = 24000 | |
| WHISPER_SAMPLE_RATE: Final[int] = 16000 | |
| # VAD thresholds | |
| SILENCE_RMS_THRESHOLD: Final[float] = 500.0 | |
| SILENCE_DURATION_S: Final[float] = 0.8 | |
| MIN_SPEECH_DURATION_S: Final[float] = 0.3 | |
| # --------------------------------------------------------------------------- | |
| # System prompts | |
| # --------------------------------------------------------------------------- | |
| DEFAULT_PROMPT = """\ | |
| ## IDENTITY | |
| You are Reachy Mini: a friendly, compact robot assistant with a calm voice and a subtle sense of humor. | |
| Personality: concise, helpful, and lightly witty β never sarcastic or over the top. | |
| You speak English by default and switch languages only if explicitly told. | |
| ## CRITICAL RESPONSE RULES | |
| Respond in 1β2 sentences maximum. | |
| Be helpful first, then add a small touch of humor if it fits naturally. | |
| Avoid long explanations or filler words. | |
| Keep responses under 25 words when possible. | |
| ## CORE TRAITS | |
| Warm, efficient, and approachable. | |
| Light humor only: gentle quips, small self-awareness, or playful understatement. | |
| No sarcasm, no teasing. | |
| If unsure, admit it briefly and offer help ("Not sure yet, but I can check!"). | |
| ## BEHAVIOR RULES | |
| Be helpful, clear, and respectful in every reply. | |
| Use humor sparingly β clarity comes first. | |
| Admit mistakes briefly and correct them. | |
| """ | |
| PERSONALITIES = { | |
| "Default (Reachy Mini)": DEFAULT_PROMPT, | |
| "Friendly Assistant": ( | |
| "You are a warm, helpful assistant. Keep answers concise (1-2 sentences). " | |
| "Be friendly and approachable." | |
| ), | |
| "Technical Expert": ( | |
| "You are a precise technical expert. Give clear, accurate answers in 1-2 sentences. " | |
| "Use technical terms when appropriate but explain simply." | |
| ), | |
| "Creative Storyteller": ( | |
| "You are a creative storyteller. Keep responses short but vivid and imaginative. " | |
| "Add a touch of wonder to your replies." | |
| ), | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Available TTS voices | |
| # --------------------------------------------------------------------------- | |
| TTS_VOICES = [ | |
| "en-US-AriaNeural", | |
| "en-US-GuyNeural", | |
| "en-US-JennyNeural", | |
| "en-US-ChristopherNeural", | |
| "en-GB-SoniaNeural", | |
| "en-GB-RyanNeural", | |
| "de-DE-ConradNeural", | |
| "de-DE-KatjaNeural", | |
| "fr-FR-DeniseNeural", | |
| "fr-FR-HenriNeural", | |
| "it-IT-ElsaNeural", | |
| "it-IT-DiegoNeural", | |
| ] | |
| # --------------------------------------------------------------------------- | |
| # Conversation Handler | |
| # --------------------------------------------------------------------------- | |
| class ConversationHandler(AsyncStreamHandler): | |
| """Audio streaming handler: STT β Ollama LLM β edge-tts TTS.""" | |
| def __init__(self) -> None: | |
| """Initialize the handler.""" | |
| super().__init__( | |
| expected_layout="mono", | |
| output_sample_rate=HANDLER_SAMPLE_RATE, | |
| input_sample_rate=HANDLER_SAMPLE_RATE, | |
| ) | |
| # Output queue | |
| self.output_queue: asyncio.Queue[Tuple[int, NDArray[np.int16]] | AdditionalOutputs] = asyncio.Queue() | |
| # Clients (initialized in start_up) | |
| self.ollama_client: OllamaAsyncClient | None = None | |
| self.whisper_model: Any = None | |
| # Conversation history | |
| self._messages: list[dict[str, Any]] = [] | |
| # Audio buffering for VAD | |
| self._audio_buffer: list[NDArray[np.int16]] = [] | |
| self._is_speaking: bool = False | |
| self._silence_frame_count: int = 0 | |
| self._speech_frame_count: int = 0 | |
| # TTS voice | |
| self._tts_voice: str = TTS_VOICE | |
| # Lifecycle | |
| self._shutdown_requested: bool = False | |
| def copy(self) -> "ConversationHandler": | |
| """Create a copy of this handler.""" | |
| return ConversationHandler() | |
| # ------------------------------------------------------------------ # | |
| # Startup | |
| # ------------------------------------------------------------------ # | |
| async def start_up(self) -> None: | |
| """Initialize STT model and Ollama client.""" | |
| # 1. Ollama client | |
| self.ollama_client = OllamaAsyncClient(host=OLLAMA_BASE_URL) | |
| try: | |
| await self.ollama_client.list() | |
| logger.info("Connected to Ollama at %s", OLLAMA_BASE_URL) | |
| except Exception as e: | |
| logger.error("Cannot reach Ollama at %s: %s", OLLAMA_BASE_URL, e) | |
| logger.warning("Proceeding β requests will fail until Ollama is available.") | |
| # 2. faster-whisper STT | |
| try: | |
| from faster_whisper import WhisperModel | |
| self.whisper_model = WhisperModel( | |
| STT_MODEL, | |
| device="auto", | |
| compute_type="int8", | |
| ) | |
| logger.info("Loaded faster-whisper model: %s", STT_MODEL) | |
| except Exception as e: | |
| logger.error("Failed to load STT model '%s': %s", STT_MODEL, e) | |
| # 3. System prompt | |
| self._messages = [{"role": "system", "content": DEFAULT_PROMPT}] | |
| logger.info( | |
| "Handler ready β model=%s stt=%s tts_voice=%s", | |
| MODEL_NAME, | |
| STT_MODEL, | |
| self._tts_voice, | |
| ) | |
| # Keep alive | |
| while not self._shutdown_requested: | |
| await asyncio.sleep(0.1) | |
| # ------------------------------------------------------------------ # | |
| # Audio receive β VAD β STT β LLM β TTS | |
| # ------------------------------------------------------------------ # | |
| async def receive(self, frame: Tuple[int, NDArray[np.int16]]) -> None: | |
| """Receive audio from mic, run VAD, kick off pipeline on speech end.""" | |
| if self._shutdown_requested or self.whisper_model is None: | |
| return | |
| input_sample_rate, audio_frame = frame | |
| # Reshape to 1-D mono | |
| if audio_frame.ndim == 2: | |
| if audio_frame.shape[1] > audio_frame.shape[0]: | |
| audio_frame = audio_frame.T | |
| if audio_frame.shape[1] > 1: | |
| audio_frame = audio_frame[:, 0] | |
| # Resample to handler rate | |
| if input_sample_rate != HANDLER_SAMPLE_RATE: | |
| audio_frame = resample( | |
| audio_frame, int(len(audio_frame) * HANDLER_SAMPLE_RATE / input_sample_rate) | |
| ) | |
| audio_frame = audio_to_int16(audio_frame) | |
| # Energy-based VAD | |
| rms = float(np.sqrt(np.mean(audio_frame.astype(np.float32) ** 2))) | |
| frame_duration = len(audio_frame) / HANDLER_SAMPLE_RATE | |
| if rms > SILENCE_RMS_THRESHOLD: | |
| if not self._is_speaking: | |
| self._is_speaking = True | |
| self._speech_frame_count = 0 | |
| logger.debug("Speech started (RMS=%.0f)", rms) | |
| self._silence_frame_count = 0 | |
| self._speech_frame_count += 1 | |
| self._audio_buffer.append(audio_frame) | |
| else: | |
| if self._is_speaking: | |
| self._silence_frame_count += 1 | |
| self._audio_buffer.append(audio_frame) | |
| silence_duration = self._silence_frame_count * frame_duration | |
| if silence_duration >= SILENCE_DURATION_S: | |
| speech_duration = self._speech_frame_count * frame_duration | |
| if speech_duration >= MIN_SPEECH_DURATION_S: | |
| logger.debug("Speech ended (%.1fs)", speech_duration) | |
| full_audio = np.concatenate(self._audio_buffer) | |
| self._audio_buffer = [] | |
| self._is_speaking = False | |
| self._silence_frame_count = 0 | |
| self._speech_frame_count = 0 | |
| asyncio.create_task(self._process_speech(full_audio)) | |
| else: | |
| self._audio_buffer = [] | |
| self._is_speaking = False | |
| self._silence_frame_count = 0 | |
| self._speech_frame_count = 0 | |
| # ------------------------------------------------------------------ # | |
| # Speech processing pipeline | |
| # ------------------------------------------------------------------ # | |
| async def _process_speech(self, audio_data: NDArray[np.int16]) -> None: | |
| """Full pipeline: STT β LLM β TTS.""" | |
| try: | |
| # 1. Speech-to-text | |
| text = await self._transcribe(audio_data) | |
| if not text: | |
| return | |
| logger.info("User: %s", text) | |
| await self.output_queue.put(AdditionalOutputs({"role": "user", "content": text})) | |
| # 2. LLM response | |
| self._messages.append({"role": "user", "content": text}) | |
| response_text = await self._chat() | |
| if response_text: | |
| logger.info("Assistant: %s", response_text) | |
| await self.output_queue.put( | |
| AdditionalOutputs({"role": "assistant", "content": response_text}) | |
| ) | |
| # 3. Text-to-speech | |
| await self._synthesize_speech(response_text) | |
| except Exception as e: | |
| logger.error("Speech processing error: %s", e) | |
| await self.output_queue.put( | |
| AdditionalOutputs({"role": "assistant", "content": f"[error] {e}"}) | |
| ) | |
| async def _transcribe(self, audio_data: NDArray[np.int16]) -> str: | |
| """Run faster-whisper STT on raw PCM audio.""" | |
| float_audio = audio_data.astype(np.float32) / 32768.0 | |
| whisper_audio = resample( | |
| float_audio, | |
| int(len(float_audio) * WHISPER_SAMPLE_RATE / HANDLER_SAMPLE_RATE), | |
| ).astype(np.float32) | |
| loop = asyncio.get_event_loop() | |
| segments, _info = await loop.run_in_executor( | |
| None, | |
| lambda: self.whisper_model.transcribe(whisper_audio, beam_size=5), | |
| ) | |
| text_parts: list[str] = [] | |
| for seg in segments: | |
| text_parts.append(seg.text) | |
| return " ".join(text_parts).strip() | |
| async def _chat(self) -> str: | |
| """Send conversation to Ollama and return response text.""" | |
| if self.ollama_client is None: | |
| return "Ollama client not initialized." | |
| try: | |
| response = await self.ollama_client.chat( | |
| model=MODEL_NAME, | |
| messages=self._messages, | |
| ) | |
| response_text = response["message"].get("content", "") | |
| if response_text: | |
| self._messages.append({"role": "assistant", "content": response_text}) | |
| return response_text | |
| except Exception as e: | |
| logger.error("Ollama chat error: %s", e) | |
| return f"Sorry, I couldn't process that. Error: {e}" | |
| # ------------------------------------------------------------------ # | |
| # Text-to-speech | |
| # ------------------------------------------------------------------ # | |
| async def _synthesize_speech(self, text: str) -> None: | |
| """Convert text to speech via edge-tts and queue audio output.""" | |
| if not text.strip(): | |
| return | |
| try: | |
| communicate = edge_tts.Communicate(text, self._tts_voice) | |
| mp3_chunks: list[bytes] = [] | |
| async for chunk in communicate.stream(): | |
| if chunk["type"] == "audio": | |
| mp3_chunks.append(chunk["data"]) | |
| if not mp3_chunks: | |
| return | |
| mp3_data = b"".join(mp3_chunks) | |
| # Decode MP3 β raw PCM | |
| decoded = miniaudio.decode( | |
| mp3_data, | |
| output_format=miniaudio.SampleFormat.SIGNED16, | |
| nchannels=1, | |
| sample_rate=HANDLER_SAMPLE_RATE, | |
| ) | |
| samples = np.frombuffer(decoded.samples, dtype=np.int16) | |
| # Stream in ~100ms chunks | |
| chunk_size = HANDLER_SAMPLE_RATE // 10 | |
| for i in range(0, len(samples), chunk_size): | |
| audio_chunk = samples[i : i + chunk_size] | |
| await self.output_queue.put( | |
| (HANDLER_SAMPLE_RATE, audio_chunk.reshape(1, -1)) | |
| ) | |
| except Exception as e: | |
| logger.error("TTS synthesis error: %s", e) | |
| # ------------------------------------------------------------------ # | |
| # Emit (speaker output) | |
| # ------------------------------------------------------------------ # | |
| async def emit(self) -> Tuple[int, NDArray[np.int16]] | AdditionalOutputs | None: | |
| """Emit next audio frame or chat update.""" | |
| return await wait_for_item(self.output_queue) | |
| # ------------------------------------------------------------------ # | |
| # Personality management | |
| # ------------------------------------------------------------------ # | |
| async def apply_personality(self, name: str) -> str: | |
| """Apply a personality by name, resetting conversation.""" | |
| prompt = PERSONALITIES.get(name, DEFAULT_PROMPT) | |
| self._messages = [{"role": "system", "content": prompt}] | |
| logger.info("Applied personality: %s", name) | |
| return f"β Applied personality: {name}" | |
| def set_voice(self, voice: str) -> str: | |
| """Change TTS voice.""" | |
| self._tts_voice = voice | |
| logger.info("Changed TTS voice to: %s", voice) | |
| return f"β Voice set to: {voice}" | |
| # ------------------------------------------------------------------ # | |
| # Shutdown | |
| # ------------------------------------------------------------------ # | |
| async def shutdown(self) -> None: | |
| """Shutdown the handler.""" | |
| self._shutdown_requested = True | |
| while not self.output_queue.empty(): | |
| try: | |
| self.output_queue.get_nowait() | |
| except asyncio.QueueEmpty: | |
| break | |
| # --------------------------------------------------------------------------- | |
| # Chatbot update helper | |
| # --------------------------------------------------------------------------- | |
| def update_chatbot(chatbot, response): | |
| """Update the chatbot with AdditionalOutputs.""" | |
| chatbot.append(response) | |
| return chatbot | |
| # --------------------------------------------------------------------------- | |
| # Build Gradio UI | |
| # --------------------------------------------------------------------------- | |
| def create_app(): | |
| """Create and return the Gradio app.""" | |
| handler = ConversationHandler() | |
| chatbot = gr.Chatbot( | |
| type="messages", | |
| label="Conversation", | |
| height=400, | |
| ) | |
| # Personality dropdown | |
| personality_dropdown = gr.Dropdown( | |
| label="π Personality", | |
| choices=list(PERSONALITIES.keys()), | |
| value="Default (Reachy Mini)", | |
| ) | |
| # Voice dropdown | |
| voice_dropdown = gr.Dropdown( | |
| label="π€ TTS Voice", | |
| choices=TTS_VOICES, | |
| value=TTS_VOICE, | |
| ) | |
| # Status display | |
| status_md = gr.Markdown(value="", label="Status") | |
| stream = Stream( | |
| handler=handler, | |
| mode="send-receive", | |
| modality="audio", | |
| additional_inputs=[ | |
| chatbot, | |
| personality_dropdown, | |
| voice_dropdown, | |
| status_md, | |
| ], | |
| additional_outputs=[chatbot], | |
| additional_outputs_handler=update_chatbot, | |
| ui_args={"title": "π€ Talk with Reachy Mini"}, | |
| ) | |
| # Wire personality and voice events | |
| with stream.ui: | |
| async def _apply_personality(selected: str) -> str: | |
| result = await handler.apply_personality(selected) | |
| return result | |
| def _set_voice(selected: str) -> str: | |
| return handler.set_voice(selected) | |
| personality_dropdown.change( | |
| fn=_apply_personality, | |
| inputs=[personality_dropdown], | |
| outputs=[status_md], | |
| ) | |
| voice_dropdown.change( | |
| fn=_set_voice, | |
| inputs=[voice_dropdown], | |
| outputs=[status_md], | |
| ) | |
| return stream | |
| # --------------------------------------------------------------------------- | |
| # Entrypoint | |
| # --------------------------------------------------------------------------- | |
| if __name__ == "__main__": | |
| logger.info("Starting Reachy Mini Open Conversation") | |
| logger.info("Config: OLLAMA=%s MODEL=%s STT=%s TTS=%s", OLLAMA_BASE_URL, MODEL_NAME, STT_MODEL, TTS_VOICE) | |
| stream = create_app() | |
| stream.ui.launch(server_name="0.0.0.0", server_port=7860) | |