Spaces:
Build error
Build error
| """Bidirectional local audio stream with optional settings UI. | |
| In headless mode, there is no Gradio UI. The app connects directly to the | |
| Ollama server for LLM inference. If Ollama is not reachable, the settings | |
| page shows the connection status. | |
| The settings UI is served from this package's ``static/`` folder and offers | |
| personality management. Once configured, streaming starts automatically. | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import asyncio | |
| import logging | |
| from typing import List, Optional | |
| from pathlib import Path | |
| from fastrtc import AdditionalOutputs, audio_to_float32 | |
| from scipy.signal import resample | |
| from reachy_mini import ReachyMini | |
| from reachy_mini.media.media_manager import MediaBackend | |
| from reachy_mini_conversation_app.config import LOCKED_PROFILE, config | |
| from reachy_mini_conversation_app.ollama_handler import OllamaHandler | |
| from reachy_mini_conversation_app.headless_personality_ui import mount_personality_routes | |
| try: | |
| # FastAPI is provided by the Reachy Mini Apps runtime | |
| from fastapi import FastAPI, Response | |
| from pydantic import BaseModel | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from starlette.staticfiles import StaticFiles | |
| except Exception: # pragma: no cover - only loaded when settings_app is used | |
| FastAPI = object # type: ignore | |
| FileResponse = object # type: ignore | |
| JSONResponse = object # type: ignore | |
| StaticFiles = object # type: ignore | |
| BaseModel = object # type: ignore | |
| logger = logging.getLogger(__name__) | |
| class LocalStream: | |
| """LocalStream using Reachy Mini's recorder/player.""" | |
| def __init__( | |
| self, | |
| handler: OllamaHandler, | |
| robot: ReachyMini, | |
| *, | |
| settings_app: Optional[FastAPI] = None, | |
| instance_path: Optional[str] = None, | |
| ): | |
| """Initialize the stream with an Ollama handler and pipelines. | |
| - ``settings_app``: the Reachy Mini Apps FastAPI to attach settings endpoints. | |
| - ``instance_path``: directory where per-instance ``.env`` should be stored. | |
| """ | |
| self.handler = handler | |
| self._robot = robot | |
| self._stop_event = asyncio.Event() | |
| self._tasks: List[asyncio.Task[None]] = [] | |
| # Allow the handler to flush the player queue when appropriate. | |
| self.handler._clear_queue = self.clear_audio_queue | |
| self._settings_app: Optional[FastAPI] = settings_app | |
| self._instance_path: Optional[str] = instance_path | |
| self._settings_initialized = False | |
| self._asyncio_loop = None | |
| # ---- Personality persistence helpers ---- | |
| def _read_env_lines(self, env_path: Path) -> list[str]: | |
| """Load env file contents or a template as a list of lines.""" | |
| inst = env_path.parent | |
| try: | |
| if env_path.exists(): | |
| try: | |
| return env_path.read_text(encoding="utf-8").splitlines() | |
| except Exception: | |
| return [] | |
| template_text = None | |
| ex = inst / ".env.example" | |
| if ex.exists(): | |
| try: | |
| template_text = ex.read_text(encoding="utf-8") | |
| except Exception: | |
| template_text = None | |
| if template_text is None: | |
| try: | |
| cwd_example = Path.cwd() / ".env.example" | |
| if cwd_example.exists(): | |
| template_text = cwd_example.read_text(encoding="utf-8") | |
| except Exception: | |
| template_text = None | |
| if template_text is None: | |
| packaged = Path(__file__).parent / ".env.example" | |
| if packaged.exists(): | |
| try: | |
| template_text = packaged.read_text(encoding="utf-8") | |
| except Exception: | |
| template_text = None | |
| return template_text.splitlines() if template_text else [] | |
| except Exception: | |
| return [] | |
| def _persist_personality(self, profile: Optional[str]) -> None: | |
| """Persist the startup personality to the instance .env and config.""" | |
| if LOCKED_PROFILE is not None: | |
| return | |
| selection = (profile or "").strip() or None | |
| try: | |
| from reachy_mini_conversation_app.config import set_custom_profile | |
| set_custom_profile(selection) | |
| except Exception: | |
| pass | |
| if not self._instance_path: | |
| return | |
| try: | |
| env_path = Path(self._instance_path) / ".env" | |
| lines = self._read_env_lines(env_path) | |
| replaced = False | |
| for i, ln in enumerate(list(lines)): | |
| if ln.strip().startswith("REACHY_MINI_CUSTOM_PROFILE="): | |
| if selection: | |
| lines[i] = f"REACHY_MINI_CUSTOM_PROFILE={selection}" | |
| else: | |
| lines.pop(i) | |
| replaced = True | |
| break | |
| if selection and not replaced: | |
| lines.append(f"REACHY_MINI_CUSTOM_PROFILE={selection}") | |
| if selection is None and not env_path.exists(): | |
| return | |
| final_text = "\n".join(lines) + "\n" | |
| env_path.write_text(final_text, encoding="utf-8") | |
| logger.info("Persisted startup personality to %s", env_path) | |
| try: | |
| from dotenv import load_dotenv | |
| load_dotenv(dotenv_path=str(env_path), override=True) | |
| except Exception: | |
| pass | |
| except Exception as e: | |
| logger.warning("Failed to persist REACHY_MINI_CUSTOM_PROFILE: %s", e) | |
| def _read_persisted_personality(self) -> Optional[str]: | |
| """Read persisted startup personality from instance .env (if any).""" | |
| if not self._instance_path: | |
| return None | |
| env_path = Path(self._instance_path) / ".env" | |
| try: | |
| if env_path.exists(): | |
| for ln in env_path.read_text(encoding="utf-8").splitlines(): | |
| if ln.strip().startswith("REACHY_MINI_CUSTOM_PROFILE="): | |
| _, _, val = ln.partition("=") | |
| v = val.strip() | |
| return v or None | |
| except Exception: | |
| pass | |
| return None | |
| def _init_settings_ui_if_needed(self) -> None: | |
| """Attach minimal settings UI to the settings app. | |
| Mounts a status page and personality management when a settings_app | |
| is provided. | |
| """ | |
| if self._settings_initialized: | |
| return | |
| if self._settings_app is None: | |
| return | |
| static_dir = Path(__file__).parent / "static" | |
| index_file = static_dir / "index.html" | |
| if hasattr(self._settings_app, "mount"): | |
| try: | |
| # Serve /static/* assets | |
| self._settings_app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") | |
| except Exception: | |
| pass | |
| # GET / -> index.html | |
| def _root() -> FileResponse: | |
| return FileResponse(str(index_file)) | |
| # GET /favicon.ico -> avoid noisy 404s | |
| def _favicon() -> Response: | |
| return Response(status_code=204) | |
| # GET /status -> Ollama connectivity check | |
| async def _status() -> JSONResponse: | |
| ollama_ok = False | |
| try: | |
| import httpx | |
| async with httpx.AsyncClient(timeout=3.0) as client: | |
| resp = await client.get(f"{config.OLLAMA_BASE_URL}/api/tags") | |
| ollama_ok = resp.status_code == 200 | |
| except Exception: | |
| pass | |
| return JSONResponse({"ollama_connected": ollama_ok, "model": config.MODEL_NAME}) | |
| # GET /ready -> whether backend finished loading tools | |
| def _ready() -> JSONResponse: | |
| try: | |
| mod = sys.modules.get("reachy_mini_conversation_app.tools.core_tools") | |
| ready = bool(getattr(mod, "_TOOLS_INITIALIZED", False)) if mod else False | |
| except Exception: | |
| ready = False | |
| return JSONResponse({"ready": ready}) | |
| self._settings_initialized = True | |
| def launch(self) -> None: | |
| """Start the recorder/player and run the async processing loops.""" | |
| self._stop_event.clear() | |
| # Try to load an existing instance .env first (covers subsequent runs) | |
| if self._instance_path: | |
| try: | |
| from dotenv import load_dotenv | |
| from reachy_mini_conversation_app.config import set_custom_profile | |
| env_path = Path(self._instance_path) / ".env" | |
| if env_path.exists(): | |
| load_dotenv(dotenv_path=str(env_path), override=True) | |
| if LOCKED_PROFILE is None: | |
| new_profile = os.getenv("REACHY_MINI_CUSTOM_PROFILE") | |
| if new_profile is not None: | |
| try: | |
| set_custom_profile(new_profile.strip() or None) | |
| except Exception: | |
| pass # Best-effort profile update | |
| except Exception: | |
| pass # Instance .env loading is optional; continue with defaults | |
| # Always expose settings UI if a settings app is available | |
| self._init_settings_ui_if_needed() | |
| # Start media | |
| self._robot.media.start_recording() | |
| self._robot.media.start_playing() | |
| time.sleep(1) # give some time to the pipelines to start | |
| async def runner() -> None: | |
| # Capture loop for cross-thread personality actions | |
| loop = asyncio.get_running_loop() | |
| self._asyncio_loop = loop # type: ignore[assignment] | |
| # Mount personality routes now that loop and handler are available | |
| try: | |
| if self._settings_app is not None: | |
| mount_personality_routes( | |
| self._settings_app, | |
| self.handler, | |
| lambda: self._asyncio_loop, | |
| persist_personality=self._persist_personality, | |
| get_persisted_personality=self._read_persisted_personality, | |
| ) | |
| except Exception: | |
| pass | |
| self._tasks = [ | |
| asyncio.create_task(self.handler.start_up(), name="ollama-handler"), | |
| asyncio.create_task(self.record_loop(), name="stream-record-loop"), | |
| asyncio.create_task(self.play_loop(), name="stream-play-loop"), | |
| ] | |
| try: | |
| await asyncio.gather(*self._tasks) | |
| except asyncio.CancelledError: | |
| logger.info("Tasks cancelled during shutdown") | |
| finally: | |
| # Ensure handler connection is closed | |
| await self.handler.shutdown() | |
| asyncio.run(runner()) | |
| def close(self) -> None: | |
| """Stop the stream and underlying media pipelines. | |
| This method: | |
| - Stops audio recording and playback first | |
| - Sets the stop event to signal async loops to terminate | |
| - Cancels all pending async tasks | |
| """ | |
| logger.info("Stopping LocalStream...") | |
| # Stop media pipelines FIRST before cancelling async tasks | |
| try: | |
| self._robot.media.stop_recording() | |
| except Exception as e: | |
| logger.debug(f"Error stopping recording (may already be stopped): {e}") | |
| try: | |
| self._robot.media.stop_playing() | |
| except Exception as e: | |
| logger.debug(f"Error stopping playback (may already be stopped): {e}") | |
| # Now signal async loops to stop | |
| self._stop_event.set() | |
| # Cancel all running tasks | |
| for task in self._tasks: | |
| if not task.done(): | |
| task.cancel() | |
| def clear_audio_queue(self) -> None: | |
| """Flush the player's appsrc to drop any queued audio immediately.""" | |
| logger.info("User intervention: flushing player queue") | |
| if self._robot.media.backend == MediaBackend.GSTREAMER: | |
| self._robot.media.audio.clear_player() | |
| elif self._robot.media.backend == MediaBackend.DEFAULT or self._robot.media.backend == MediaBackend.DEFAULT_NO_VIDEO: | |
| self._robot.media.audio.clear_output_buffer() | |
| self.handler.output_queue = asyncio.Queue() | |
| async def record_loop(self) -> None: | |
| """Read mic frames from the recorder and forward them to the handler.""" | |
| input_sample_rate = self._robot.media.get_input_audio_samplerate() | |
| logger.debug(f"Audio recording started at {input_sample_rate} Hz") | |
| while not self._stop_event.is_set(): | |
| audio_frame = self._robot.media.get_audio_sample() | |
| if audio_frame is not None: | |
| await self.handler.receive((input_sample_rate, audio_frame)) | |
| await asyncio.sleep(0) # avoid busy loop | |
| async def play_loop(self) -> None: | |
| """Fetch outputs from the handler: log text and play audio frames.""" | |
| while not self._stop_event.is_set(): | |
| handler_output = await self.handler.emit() | |
| if isinstance(handler_output, AdditionalOutputs): | |
| for msg in handler_output.args: | |
| content = msg.get("content", "") | |
| if isinstance(content, str): | |
| logger.info( | |
| "role=%s content=%s", | |
| msg.get("role"), | |
| content if len(content) < 500 else content[:500] + "…", | |
| ) | |
| elif isinstance(handler_output, tuple): | |
| input_sample_rate, audio_data = handler_output | |
| output_sample_rate = self._robot.media.get_output_audio_samplerate() | |
| # Reshape if needed | |
| if audio_data.ndim == 2: | |
| if audio_data.shape[1] > audio_data.shape[0]: | |
| audio_data = audio_data.T | |
| if audio_data.shape[1] > 1: | |
| audio_data = audio_data[:, 0] | |
| # Cast if needed | |
| audio_frame = audio_to_float32(audio_data) | |
| # Resample if needed | |
| if input_sample_rate != output_sample_rate: | |
| audio_frame = resample( | |
| audio_frame, | |
| int(len(audio_frame) * output_sample_rate / input_sample_rate), | |
| ) | |
| self._robot.media.push_audio_sample(audio_frame) | |
| else: | |
| logger.debug("Ignoring output type=%s", type(handler_output).__name__) | |
| await asyncio.sleep(0) # yield to event loop | |