# Abduallah Abuhassan
# Add application file
# 3b627eb
"""Reachy Mini Open Conversation — Hugging Face Spaces App.

Standalone conversation app using open-source models:
Audio In → faster-whisper (STT) → Ollama (LLM) → edge-tts (TTS) → Audio Out

No robot hardware dependencies — runs entirely in the browser via Gradio + FastRTC.
"""
import os
import json
import asyncio
import logging
from typing import Any, Final, Tuple
from datetime import datetime
import numpy as np
import gradio as gr
import edge_tts
import miniaudio
from ollama import AsyncClient as OllamaAsyncClient
from fastrtc import AdditionalOutputs, AsyncStreamHandler, Stream, wait_for_item, audio_to_int16
from numpy.typing import NDArray
from scipy.signal import resample
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Root logging: INFO and above, each record stamped with time, level,
# logger name and source line.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s:%(lineno)d | %(message)s",
    level=logging.INFO,
)

# Application-wide logger used by everything in this module.
logger = logging.getLogger("reachy-mini-open")

# Third-party networking libraries are chatty at INFO; only let their
# warnings and errors through.
for lib in ("aiortc", "aioice", "httpx", "websockets"):
    noisy_logger = logging.getLogger(lib)
    noisy_logger.setLevel(logging.WARNING)
# ---------------------------------------------------------------------------
# Configuration (env vars — set as HF Space secrets)
# ---------------------------------------------------------------------------
# Every setting is read from the environment once at import time and falls
# back to the literal default shown when the variable is unset.
OLLAMA_BASE_URL = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434")  # Ollama server URL
MODEL_NAME = os.environ.get("MODEL_NAME", "llama3.2")  # chat model requested from Ollama
STT_MODEL = os.environ.get("STT_MODEL", "base")  # faster-whisper model name
TTS_VOICE = os.environ.get("TTS_VOICE", "en-US-AriaNeural")  # initial edge-tts voice
# ---------------------------------------------------------------------------
# Audio constants
# ---------------------------------------------------------------------------
# Sample rate (Hz) used for the handler's input and output audio.
HANDLER_SAMPLE_RATE: Final[int] = 24_000
# Sample rate (Hz) the utterance is resampled to before transcription.
WHISPER_SAMPLE_RATE: Final[int] = 16_000

# Energy-based VAD thresholds.
# A frame whose RMS (on int16 samples) exceeds this counts as speech.
SILENCE_RMS_THRESHOLD: Final[float] = 500.0
# Trailing silence (seconds) that ends an utterance.
SILENCE_DURATION_S: Final[float] = 0.8
# Utterances with less speech than this (seconds) are discarded.
MIN_SPEECH_DURATION_S: Final[float] = 0.3
# ---------------------------------------------------------------------------
# System prompts
# ---------------------------------------------------------------------------
# Default system prompt. The dash characters were previously mis-encoded
# (UTF-8 bytes rendered through a legacy code page), which put garbage
# characters into the text sent to the LLM; they are restored here.
DEFAULT_PROMPT = """\
## IDENTITY
You are Reachy Mini: a friendly, compact robot assistant with a calm voice and a subtle sense of humor.
Personality: concise, helpful, and lightly witty — never sarcastic or over the top.
You speak English by default and switch languages only if explicitly told.
## CRITICAL RESPONSE RULES
Respond in 1–2 sentences maximum.
Be helpful first, then add a small touch of humor if it fits naturally.
Avoid long explanations or filler words.
Keep responses under 25 words when possible.
## CORE TRAITS
Warm, efficient, and approachable.
Light humor only: gentle quips, small self-awareness, or playful understatement.
No sarcasm, no teasing.
If unsure, admit it briefly and offer help ("Not sure yet, but I can check!").
## BEHAVIOR RULES
Be helpful, clear, and respectful in every reply.
Use humor sparingly — clarity comes first.
Admit mistakes briefly and correct them.
"""

# Display name (shown in the personality dropdown) -> system prompt text.
PERSONALITIES = {
    "Default (Reachy Mini)": DEFAULT_PROMPT,
    "Friendly Assistant": (
        "You are a warm, helpful assistant. Keep answers concise (1-2 sentences). "
        "Be friendly and approachable."
    ),
    "Technical Expert": (
        "You are a precise technical expert. Give clear, accurate answers in 1-2 sentences. "
        "Use technical terms when appropriate but explain simply."
    ),
    "Creative Storyteller": (
        "You are a creative storyteller. Keep responses short but vivid and imaginative. "
        "Add a touch of wonder to your replies."
    ),
}
# ---------------------------------------------------------------------------
# Available TTS voices
# ---------------------------------------------------------------------------
# Voice identifiers offered in the UI's voice dropdown, grouped by locale.
TTS_VOICES = [
    # English (US)
    "en-US-AriaNeural", "en-US-GuyNeural", "en-US-JennyNeural", "en-US-ChristopherNeural",
    # English (GB)
    "en-GB-SoniaNeural", "en-GB-RyanNeural",
    # German
    "de-DE-ConradNeural", "de-DE-KatjaNeural",
    # French
    "fr-FR-DeniseNeural", "fr-FR-HenriNeural",
    # Italian
    "it-IT-ElsaNeural", "it-IT-DiegoNeural",
]
# ---------------------------------------------------------------------------
# Conversation Handler
# ---------------------------------------------------------------------------
class ConversationHandler(AsyncStreamHandler):
    """Audio streaming handler: STT → Ollama LLM → edge-tts TTS.

    Microphone frames arrive through :meth:`receive`, where a simple
    RMS-energy voice-activity detector groups them into utterances. Each
    finished utterance is transcribed with faster-whisper, answered by the
    Ollama chat model and voiced with edge-tts. Transcripts and synthesized
    audio are placed on :attr:`output_queue`, which :meth:`emit` drains.
    """

    def __init__(self) -> None:
        """Initialize the handler with empty state; heavy setup is in start_up."""
        super().__init__(
            expected_layout="mono",
            output_sample_rate=HANDLER_SAMPLE_RATE,
            input_sample_rate=HANDLER_SAMPLE_RATE,
        )
        # Output queue: (sample_rate, audio) tuples and chat updates.
        self.output_queue: asyncio.Queue[Tuple[int, NDArray[np.int16]] | AdditionalOutputs] = asyncio.Queue()
        # Clients (initialized in start_up)
        self.ollama_client: OllamaAsyncClient | None = None
        self.whisper_model: Any = None
        # System prompt used to seed / reset the conversation history.
        self._system_prompt: str = DEFAULT_PROMPT
        # Conversation history
        self._messages: list[dict[str, Any]] = []
        # Audio buffering for VAD
        self._audio_buffer: list[NDArray[np.int16]] = []
        self._is_speaking: bool = False
        self._silence_frame_count: int = 0
        self._speech_frame_count: int = 0
        # TTS voice
        self._tts_voice: str = TTS_VOICE
        # Strong references to in-flight pipeline tasks. The event loop only
        # keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._pipeline_tasks: set[asyncio.Task[None]] = set()
        # Lifecycle
        self._shutdown_requested: bool = False

    def copy(self) -> "ConversationHandler":
        """Create a fresh handler that inherits this handler's settings.

        FastRTC clones the handler it was given for each connection (see the
        FastRTC StreamHandler documentation), so the currently selected voice
        and personality prompt are carried over to the clone. Conversation
        history and audio buffers are not shared.
        """
        clone = ConversationHandler()
        clone._tts_voice = self._tts_voice
        clone._system_prompt = self._system_prompt
        return clone

    # ------------------------------------------------------------------ #
    # Startup
    # ------------------------------------------------------------------ #
    async def start_up(self) -> None:
        """Initialize STT model and Ollama client, then idle until shutdown.

        Failures to reach Ollama or to load the STT model are logged, not
        raised. While the STT model is missing, :meth:`receive` ignores
        incoming audio.
        """
        # 1. Ollama client
        self.ollama_client = OllamaAsyncClient(host=OLLAMA_BASE_URL)
        try:
            await self.ollama_client.list()
            logger.info("Connected to Ollama at %s", OLLAMA_BASE_URL)
        except Exception as e:
            logger.error("Cannot reach Ollama at %s: %s", OLLAMA_BASE_URL, e)
            logger.warning("Proceeding — requests will fail until Ollama is available.")
        # 2. faster-whisper STT (imported lazily so a missing package is
        #    reported through the same error path as a failed model load)
        try:
            from faster_whisper import WhisperModel

            self.whisper_model = WhisperModel(
                STT_MODEL,
                device="auto",
                compute_type="int8",
            )
            logger.info("Loaded faster-whisper model: %s", STT_MODEL)
        except Exception as e:
            logger.error("Failed to load STT model '%s': %s", STT_MODEL, e)
        # 3. System prompt (the personality selected before this connection)
        self._messages = [{"role": "system", "content": self._system_prompt}]
        logger.info(
            "Handler ready — model=%s stt=%s tts_voice=%s",
            MODEL_NAME,
            STT_MODEL,
            self._tts_voice,
        )
        # Keep alive
        while not self._shutdown_requested:
            await asyncio.sleep(0.1)

    # ------------------------------------------------------------------ #
    # Audio receive → VAD → STT → LLM → TTS
    # ------------------------------------------------------------------ #
    def _reset_vad_state(self) -> None:
        """Clear the buffered utterance and all VAD counters."""
        self._audio_buffer = []
        self._is_speaking = False
        self._silence_frame_count = 0
        self._speech_frame_count = 0

    def _spawn_pipeline(self, audio_data: NDArray[np.int16]) -> None:
        """Start processing an utterance in the background, tracking the task."""
        task = asyncio.create_task(self._process_speech(audio_data))
        self._pipeline_tasks.add(task)
        task.add_done_callback(self._pipeline_tasks.discard)

    async def receive(self, frame: Tuple[int, NDArray[np.int16]]) -> None:
        """Receive audio from mic, run VAD, kick off pipeline on speech end.

        Args:
            frame: ``(sample_rate, samples)`` as delivered by the stream.
                ``samples`` may be 1-D or 2-D; only the first channel is used.
        """
        if self._shutdown_requested or self.whisper_model is None:
            return
        input_sample_rate, audio_frame = frame
        # Reshape to 1-D mono. The longer axis is taken to be the sample axis;
        # channel 0 is always selected so that single-channel (N, 1) frames
        # are flattened too and the buffered utterance stays 1-D.
        if audio_frame.ndim == 2:
            if audio_frame.shape[1] > audio_frame.shape[0]:
                audio_frame = audio_frame.T
            audio_frame = audio_frame[:, 0]
        if audio_frame.size == 0:
            # Nothing to analyse; an empty frame would yield a NaN RMS.
            return
        # Resample to handler rate
        # NOTE(review): resample() returns floating-point samples; confirm
        # audio_to_int16 handles that dtype/range as intended.
        if input_sample_rate != HANDLER_SAMPLE_RATE:
            audio_frame = resample(
                audio_frame, int(len(audio_frame) * HANDLER_SAMPLE_RATE / input_sample_rate)
            )
        audio_frame = audio_to_int16(audio_frame)
        # Energy-based VAD
        rms = float(np.sqrt(np.mean(audio_frame.astype(np.float32) ** 2)))
        frame_duration = len(audio_frame) / HANDLER_SAMPLE_RATE

        if rms > SILENCE_RMS_THRESHOLD:
            if not self._is_speaking:
                self._is_speaking = True
                self._speech_frame_count = 0
                logger.debug("Speech started (RMS=%.0f)", rms)
            self._silence_frame_count = 0
            self._speech_frame_count += 1
            self._audio_buffer.append(audio_frame)
            return

        if not self._is_speaking:
            # Silence outside an utterance is dropped.
            return

        # Silence inside an utterance: keep it (natural pauses) and check
        # whether the pause is long enough to end the utterance. Durations
        # are estimated as frame count times the current frame's duration.
        self._silence_frame_count += 1
        self._audio_buffer.append(audio_frame)
        if self._silence_frame_count * frame_duration < SILENCE_DURATION_S:
            return

        speech_duration = self._speech_frame_count * frame_duration
        buffered_frames = self._audio_buffer
        self._reset_vad_state()
        if speech_duration < MIN_SPEECH_DURATION_S:
            # Too little speech (e.g. a click or cough): discard.
            return
        logger.debug("Speech ended (%.1fs)", speech_duration)
        self._spawn_pipeline(np.concatenate(buffered_frames))

    # ------------------------------------------------------------------ #
    # Speech processing pipeline
    # ------------------------------------------------------------------ #
    async def _process_speech(self, audio_data: NDArray[np.int16]) -> None:
        """Full pipeline: STT → LLM → TTS.

        Any exception is logged and reported to the chat as an ``[error]``
        assistant message instead of propagating out of the background task.
        """
        try:
            # 1. Speech-to-text
            text = await self._transcribe(audio_data)
            if not text:
                return
            logger.info("User: %s", text)
            await self.output_queue.put(AdditionalOutputs({"role": "user", "content": text}))
            # 2. LLM response
            self._messages.append({"role": "user", "content": text})
            response_text = await self._chat()
            if response_text:
                logger.info("Assistant: %s", response_text)
                await self.output_queue.put(
                    AdditionalOutputs({"role": "assistant", "content": response_text})
                )
                # 3. Text-to-speech
                await self._synthesize_speech(response_text)
        except Exception as e:
            logger.exception("Speech processing error: %s", e)
            await self.output_queue.put(
                AdditionalOutputs({"role": "assistant", "content": f"[error] {e}"})
            )

    async def _transcribe(self, audio_data: NDArray[np.int16]) -> str:
        """Run faster-whisper STT on raw PCM audio.

        Args:
            audio_data: int16 samples at ``HANDLER_SAMPLE_RATE``.

        Returns:
            The recognized text with segment texts joined by spaces and
            surrounding whitespace stripped (empty when nothing was heard).
        """
        float_audio = audio_data.astype(np.float32) / 32768.0
        whisper_audio = resample(
            float_audio,
            int(len(float_audio) * WHISPER_SAMPLE_RATE / HANDLER_SAMPLE_RATE),
        ).astype(np.float32)

        def _run_stt() -> str:
            # faster-whisper returns a lazy generator: decoding happens while
            # the segments are iterated, so the iteration must also run in the
            # worker thread or it would block the event loop.
            segments, _info = self.whisper_model.transcribe(whisper_audio, beam_size=5)
            return " ".join(seg.text for seg in segments).strip()

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _run_stt)

    async def _chat(self) -> str:
        """Send conversation to Ollama and return response text.

        A non-empty reply is appended to the history. On failure the error is
        logged and an apology string (not added to the history) is returned.
        """
        if self.ollama_client is None:
            return "Ollama client not initialized."
        try:
            response = await self.ollama_client.chat(
                model=MODEL_NAME,
                messages=self._messages,
            )
            response_text = response["message"].get("content", "")
            if response_text:
                self._messages.append({"role": "assistant", "content": response_text})
            return response_text
        except Exception as e:
            logger.error("Ollama chat error: %s", e)
            return f"Sorry, I couldn't process that. Error: {e}"

    # ------------------------------------------------------------------ #
    # Text-to-speech
    # ------------------------------------------------------------------ #
    async def _synthesize_speech(self, text: str) -> None:
        """Convert text to speech via edge-tts and queue audio output.

        The MP3 stream is collected in full, decoded to mono int16 PCM at
        ``HANDLER_SAMPLE_RATE`` and queued in roughly 100 ms chunks.
        Errors are logged and otherwise ignored.
        """
        if not text.strip():
            return
        try:
            communicate = edge_tts.Communicate(text, self._tts_voice)
            mp3_chunks: list[bytes] = []
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    mp3_chunks.append(chunk["data"])
            if not mp3_chunks:
                return
            mp3_data = b"".join(mp3_chunks)
            # Decode MP3 → raw PCM
            decoded = miniaudio.decode(
                mp3_data,
                output_format=miniaudio.SampleFormat.SIGNED16,
                nchannels=1,
                sample_rate=HANDLER_SAMPLE_RATE,
            )
            samples = np.frombuffer(decoded.samples, dtype=np.int16)
            # Stream in ~100ms chunks
            chunk_size = HANDLER_SAMPLE_RATE // 10
            for i in range(0, len(samples), chunk_size):
                audio_chunk = samples[i : i + chunk_size]
                await self.output_queue.put(
                    (HANDLER_SAMPLE_RATE, audio_chunk.reshape(1, -1))
                )
        except Exception as e:
            logger.error("TTS synthesis error: %s", e)

    # ------------------------------------------------------------------ #
    # Emit (speaker output)
    # ------------------------------------------------------------------ #
    async def emit(self) -> Tuple[int, NDArray[np.int16]] | AdditionalOutputs | None:
        """Emit next audio frame or chat update."""
        return await wait_for_item(self.output_queue)

    # ------------------------------------------------------------------ #
    # Personality management
    # ------------------------------------------------------------------ #
    async def apply_personality(self, name: str) -> str:
        """Apply a personality by name, resetting conversation.

        Unknown names fall back to ``DEFAULT_PROMPT``. The prompt is also
        remembered so that handlers created later via :meth:`copy` start
        with it.

        Returns:
            A status message for display in the UI.
        """
        prompt = PERSONALITIES.get(name, DEFAULT_PROMPT)
        self._system_prompt = prompt
        self._messages = [{"role": "system", "content": prompt}]
        logger.info("Applied personality: %s", name)
        return f"✅ Applied personality: {name}"

    def set_voice(self, voice: str) -> str:
        """Change TTS voice and return a status message for the UI."""
        self._tts_voice = voice
        logger.info("Changed TTS voice to: %s", voice)
        return f"✅ Voice set to: {voice}"

    # ------------------------------------------------------------------ #
    # Shutdown
    # ------------------------------------------------------------------ #
    async def shutdown(self) -> None:
        """Shutdown the handler: stop the pipeline and drop queued output."""
        self._shutdown_requested = True
        # Cancel utterances still being processed so they stop producing
        # output after the connection is gone.
        for task in list(self._pipeline_tasks):
            task.cancel()
        while not self.output_queue.empty():
            try:
                self.output_queue.get_nowait()
            except asyncio.QueueEmpty:
                break
# ---------------------------------------------------------------------------
# Chatbot update helper
# ---------------------------------------------------------------------------
def update_chatbot(chatbot, response):
    """Add one handler-emitted message to the chat history.

    The history is extended in place and the very same object is returned,
    so the caller can use either the argument or the return value.
    """
    chatbot.extend([response])
    return chatbot
# ---------------------------------------------------------------------------
# Build Gradio UI
# ---------------------------------------------------------------------------
def create_app():
    """Create and return the FastRTC stream whose ``.ui`` is the Gradio app.

    Builds the chat transcript, the personality and voice dropdowns and a
    status line, wires them into a send-receive audio ``Stream``, and attaches
    change handlers that forward dropdown selections to the handler.

    The emoji in the user-visible labels and title were previously
    mis-encoded (UTF-8 bytes rendered through a legacy code page) and are
    restored here.

    NOTE(review): the dropdown callbacks act on the handler instance created
    in this function. Whether an already-open connection sees the change
    depends on how ``Stream`` clones handlers per connection — confirm.
    """
    handler = ConversationHandler()

    chatbot = gr.Chatbot(
        type="messages",
        label="Conversation",
        height=400,
    )
    # Personality dropdown
    personality_dropdown = gr.Dropdown(
        label="🎭 Personality",
        choices=list(PERSONALITIES.keys()),
        value="Default (Reachy Mini)",
    )
    # Voice dropdown
    voice_dropdown = gr.Dropdown(
        label="🎤 TTS Voice",
        choices=TTS_VOICES,
        value=TTS_VOICE,
    )
    # Status display (receives the confirmation text from the callbacks)
    status_md = gr.Markdown(value="", label="Status")

    stream = Stream(
        handler=handler,
        mode="send-receive",
        modality="audio",
        additional_inputs=[
            chatbot,
            personality_dropdown,
            voice_dropdown,
            status_md,
        ],
        additional_outputs=[chatbot],
        additional_outputs_handler=update_chatbot,
        ui_args={"title": "🤖 Talk with Reachy Mini"},
    )

    # Wire personality and voice events inside the stream's Blocks context.
    with stream.ui:

        async def _apply_personality(selected: str) -> str:
            return await handler.apply_personality(selected)

        def _set_voice(selected: str) -> str:
            return handler.set_voice(selected)

        personality_dropdown.change(
            fn=_apply_personality,
            inputs=[personality_dropdown],
            outputs=[status_md],
        )
        voice_dropdown.change(
            fn=_set_voice,
            inputs=[voice_dropdown],
            outputs=[status_md],
        )
    return stream
# ---------------------------------------------------------------------------
# Entrypoint
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Announce startup and record the effective configuration.
    logger.info("Starting Reachy Mini Open Conversation")
    logger.info(
        "Config: OLLAMA=%s MODEL=%s STT=%s TTS=%s",
        OLLAMA_BASE_URL,
        MODEL_NAME,
        STT_MODEL,
        TTS_VOICE,
    )
    # Build the stream and serve its UI on all interfaces, port 7860.
    stream = create_app()
    stream.ui.launch(
        server_name="0.0.0.0",
        server_port=7860,
    )