# ============================================================
# app/core/mimo_client.py - Multi-Model AI Client
#
# Tool calling + classify/search/alerts/booking → DeepSeek V4-Flash (primary)
# Agentic tasks + AI listing + specialists     → DeepSeek V4-Pro (primary)
# Gemini Pro / Flash                           → dormant fallback (key pulled)
# Vision (image/video) + Audio (STT)           → MiMo-V2-Omni
#
# System prompt prefix caching is automatic on DeepSeek — identical system
# prompts across requests are served from cache at ~1/50th the input cost.
# Conversation messages and answers are always generated fresh.
#
# All consuming code calls get_mimo_client() unchanged.
# Routing is transparent inside this module.
# ============================================================
import asyncio
import base64
import logging
import os
import subprocess
import tempfile
import time
from typing import Any, Dict, List, Optional, Tuple, Union

import httpx
from openai import AsyncOpenAI

from app.core.llm_resilience import (
    AllProvidersDown,
    CircuitBreaker,
    RetryPolicy,
    call_with_chain,
    call_with_resilience,
)
logger = logging.getLogger(__name__)

# Shared retry policy for every brain call; each MiMoClient instance copies the
# reference and may override it.
#
# max_attempts=1 means "no retries against the same provider": when a provider
# fails (e.g. a 429), the chain moves straight on to the next provider instead
# of sitting through backoff delays. This trades a little resilience for much
# faster perceived latency on rate-limited turns.
_DEFAULT_RETRY_POLICY = RetryPolicy(max_attempts=1, base_delay_seconds=0.5, max_delay_seconds=8.0)
# ============================================================
# Configuration
# ============================================================

# MiMo — vision (image/video) and audio understanding.
MIMO_API_BASE = "https://api.xiaomimimo.com/v1"
MIMO_MODEL = "mimo-v2.5"

# DeepSeek — text brain. Both models are served from the same base URL.
DEEPSEEK_API_BASE = "https://api.deepseek.com/v1"
# V4-Flash: best for tool calling, classification, search, alerts, booking (fast + cheap)
DEEPSEEK_FLASH_MODEL = "deepseek-v4-flash"
# V4-Pro: best for agentic tasks, AI listing creation, specialist agents (1.6T MoE)
DEEPSEEK_PRO_MODEL = "deepseek-v4-pro"

# Google's OpenAI-compatible Gemini endpoint.
GEMINI_API_BASE = "https://generativelanguage.googleapis.com/v1beta/openai"
GEMINI_PRO_MODEL = "gemini-2.5-pro"
GEMINI_FLASH_MODEL = "gemini-2.5-flash"

# OpenRouter coordinates for reaching Gemini without a direct Google key.
# NOTE(review): nothing in this module currently builds an OpenRouter client;
# these are kept for callers/importers that may reference them.
OPENROUTER_API_BASE = "https://openrouter.ai/api/v1"
OPENROUTER_GEMINI_PRO_MODEL = "google/gemini-2.5-pro"
OPENROUTER_GEMINI_FLASH_MODEL = "google/gemini-2.5-flash"
class MiMoClient:
    """Multi-model client with a unified interface.

    Routing:
    - Tool calling / classify / search / alerts / booking → DeepSeek V4-Flash
    - Agentic tasks / AI listing / specialist agents → DeepSeek V4-Pro
    - Gemini Pro / Flash → dormant fallback (only active when GEMINI_API_KEY is set)
    - Vision (image/video) + Audio (STT) → MiMo-V2-Omni

    Both DeepSeek V4 models share the same API key and base URL.
    System prompt prefix caching is handled automatically by DeepSeek —
    no explicit cache_control headers are needed.

    All existing callers use the same API (chat, chat_text, chat_with_image, etc.).
    Routing is handled internally — no changes needed in consuming code.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        model: Optional[str] = None,
    ):
        """Build the provider clients from arguments and environment variables.

        Args:
            api_key: MiMo API key; falls back to ``MIMO_API_KEY``.
            base_url: MiMo base URL; falls back to ``MIMO_BASE_URL``, then
                ``MIMO_API_BASE``.
            model: MiMo model name; falls back to ``MIMO_MODEL``, then the
                module default.

        DeepSeek and Gemini are configured from the environment only. A
        provider whose key is missing gets a ``None`` client and is skipped
        by the routing code.
        """
        # ── MiMo (Vision + Audio) ──
        self._mimo_api_key = api_key or os.getenv("MIMO_API_KEY", "")
        self._mimo_base_url = base_url or os.getenv("MIMO_BASE_URL", MIMO_API_BASE)
        self._mimo_model = model or os.getenv("MIMO_MODEL", MIMO_MODEL)
        if not self._mimo_api_key:
            logger.warning("⚠️ MIMO_API_KEY not set - MiMo vision/audio unavailable")
            self._mimo_client = None
        else:
            self._mimo_client = AsyncOpenAI(
                api_key=self._mimo_api_key,
                base_url=self._mimo_base_url,
                timeout=120,
                default_headers={"api-key": self._mimo_api_key},
            )
            logger.info(
                f"✅ MiMo-V2-Omni client initialized for vision/audio "
                f"(model={self._mimo_model})"
            )

        # ── DeepSeek V4 (Brain: Tool Calling + Agentic) ──
        # Single client — both Flash and Pro share the same API key and base URL.
        # Flash handles fast tool-calling tasks; Pro handles deep agentic work.
        self._ds_api_key = os.getenv("DEEPSEEK_API_KEY", "")
        self._ds_base_url = os.getenv("DEEPSEEK_BASE_URL", DEEPSEEK_API_BASE)
        self._ds_flash_model = os.getenv("DEEPSEEK_FLASH_MODEL", DEEPSEEK_FLASH_MODEL)
        self._ds_pro_model = os.getenv("DEEPSEEK_PRO_MODEL", DEEPSEEK_PRO_MODEL)
        if not self._ds_api_key:
            logger.warning(
                "⚠️ DEEPSEEK_API_KEY not set - text brain unavailable, "
                "falling back to MiMo for text"
            )
            self._ds_client = None
        else:
            self._ds_client = AsyncOpenAI(
                api_key=self._ds_api_key,
                base_url=self._ds_base_url,
                timeout=120,
            )
            logger.info(
                f"✅ DeepSeek V4 initialized — "
                f"flash={self._ds_flash_model} (tool calling / classify / search), "
                f"pro={self._ds_pro_model} (agentic / listing / specialists)"
            )

        # ── Gemini (Brain — Pro for specialists, Flash for classify/general) ──
        # Only the direct Google API key path is wired up here. Until
        # GEMINI_API_KEY is set, both Gemini clients stay None and the chat
        # chain simply skips them.
        self._gemini_pro_client = None
        self._gemini_flash_client = None
        self._gemini_pro_model = GEMINI_PRO_MODEL
        self._gemini_flash_model = GEMINI_FLASH_MODEL
        gemini_direct_key = os.getenv("GEMINI_API_KEY", "")
        gemini_base = os.getenv("GEMINI_BASE_URL", GEMINI_API_BASE)
        if gemini_direct_key:
            # Pro and Flash differ only by model name, so one client serves both.
            shared = AsyncOpenAI(
                api_key=gemini_direct_key,
                base_url=gemini_base,
                timeout=120,
            )
            self._gemini_pro_client = shared
            self._gemini_flash_client = shared
            self._gemini_pro_model = os.getenv("GEMINI_PRO_MODEL", GEMINI_PRO_MODEL)
            self._gemini_flash_model = os.getenv("GEMINI_FLASH_MODEL", GEMINI_FLASH_MODEL)
            logger.info(
                f"✅ Gemini configured (direct Google API): "
                f"pro={self._gemini_pro_model}, flash={self._gemini_flash_model}"
            )
        else:
            logger.info(
                "ℹ️ Gemini unavailable (set GEMINI_API_KEY). "
                "Brain will use DeepSeek → MiMo only."
            )

        # ── Resilience: per-provider circuit breakers ──
        # Separate breakers for Flash and Pro so a Pro outage doesn't
        # block Flash calls, and vice-versa. Thresholds are CircuitBreaker's
        # defaults (original note: trip after 5 failures in 60s, stay open
        # 30s, then probe — confirm against app.core.llm_resilience).
        self._ds_flash_breaker = CircuitBreaker(name="deepseek-v4-flash")
        self._ds_pro_breaker = CircuitBreaker(name="deepseek-v4-pro")
        self._gemini_pro_breaker = CircuitBreaker(name="gemini-pro")
        self._gemini_flash_breaker = CircuitBreaker(name="gemini-flash")
        self._mimo_breaker = CircuitBreaker(name="mimo")
        self._retry_policy = _DEFAULT_RETRY_POLICY

    # Keep backward compat — old code reads these as plain attributes, so they
    # must be properties (a bare method object would always be truthy).
    @property
    def _client(self):
        """Preferred text client: DeepSeek when configured, otherwise MiMo (may be None)."""
        return self._ds_client or self._mimo_client

    @property
    def _model(self):
        """Model name matching ``_client``; Flash is the default for legacy readers."""
        return self._ds_flash_model if self._ds_client else self._mimo_model

    # Legacy alias — some callers reference _ds_model directly
    @property
    def _ds_model(self):
        """Legacy alias for the DeepSeek Flash model name."""
        return self._ds_flash_model

    def is_available(self) -> bool:
        """Return True when at least one of the DeepSeek or MiMo clients exists."""
        return self._ds_client is not None or self._mimo_client is not None

    # ============================================================
    # Core Chat — Text + Tool Calling via DeepSeek V4
    # ============================================================
    async def chat(
        self,
        messages: List[Dict[str, Any]],
        temperature: float = 0.7,
        max_tokens: int = 4096,
        tools: Optional[List[Dict]] = None,
        tool_choice: Optional[str] = None,
        tier: str = "fast",
    ) -> Dict[str, Any]:
        """Text chat + tool calling with tier-based routing and resilient failover.

        Tiers:
        - ``"premium"``: agentic tasks, AI listing creation, specialist agents.
          Chain: V4-Pro → V4-Flash → Gemini Pro → Gemini Flash.
        - ``"fast"``: tool calling, classification, search, alerts, booking
          (default). Chain: V4-Flash → Gemini Flash.

        Any other tier value is routed like ``"fast"``. MiMo is deliberately
        not part of the text chain (it is vision/audio only).

        Gemini slots remain in the chain but are dormant while GEMINI_API_KEY
        is unset — clients initialise as None and are skipped automatically.

        Returns:
            The dict produced by ``_raw_chat``: ``content``, ``tool_calls``,
            ``usage``, ``model`` and ``finish_reason``.

        Raises:
            RuntimeError: if no text-capable provider is configured.
            Whatever ``call_with_chain`` raises when every provider fails
            (presumably ``AllProvidersDown`` — confirm in llm_resilience).
        """
        chain = self._build_chat_chain(
            tier, messages, temperature, max_tokens, tools, tool_choice,
        )
        if not chain:
            # Only DeepSeek and Gemini can populate the text chain, so those are
            # the only keys worth pointing the operator at.
            raise RuntimeError(
                "No LLM client available - check DEEPSEEK_API_KEY or GEMINI_API_KEY"
            )
        result, provider = await call_with_chain(chain, policy=self._retry_policy)
        logger.info(
            f"✅ {provider} responded ({len(result.get('content') or '')} chars, "
            f"tier={tier}, tokens={(result.get('usage') or {}).get('total_tokens', '?')})"
        )
        return result

    def _build_chat_chain(
        self,
        tier: str,
        messages: List[Dict[str, Any]],
        temperature: float,
        max_tokens: int,
        tools: Optional[List[Dict]],
        tool_choice: Optional[str],
    ) -> list:
        """Build the (name, factory, breaker) chain for a given tier.

        Each factory is a zero-arg callable that, when invoked, makes one
        attempt against its provider via ``_raw_chat``. Providers whose client
        is None are left out, so the result may be an empty list.
        """
        def _factory(client, model):
            # Bind client/model per entry; a bare lambda in a loop would late-bind.
            return lambda: self._raw_chat(
                client, model, messages, temperature, max_tokens, tools, tool_choice,
            )

        chain: list = []

        # ── Primary: DeepSeek V4 ─────────────────────────────────────────────
        # Premium tier: V4-Pro leads (best agentic / specialist reasoning),
        #   V4-Flash follows as its immediate fallback.
        # Fast tier: V4-Flash only (optimal for tool calling + classify).
        if self._ds_client is not None:
            if tier == "premium":
                chain.append((
                    f"DeepSeek-V4-Pro({self._ds_pro_model})",
                    _factory(self._ds_client, self._ds_pro_model),
                    self._ds_pro_breaker,
                ))
            chain.append((
                f"DeepSeek-V4-Flash({self._ds_flash_model})",
                _factory(self._ds_client, self._ds_flash_model),
                self._ds_flash_breaker,
            ))

        # ── Dormant fallback: Gemini ─────────────────────────────────────────
        # These slots are skipped while GEMINI_API_KEY is unset (clients = None).
        if tier == "premium" and self._gemini_pro_client is not None:
            chain.append((
                f"Gemini-Pro({self._gemini_pro_model})",
                _factory(self._gemini_pro_client, self._gemini_pro_model),
                self._gemini_pro_breaker,
            ))
        if self._gemini_flash_client is not None:
            chain.append((
                f"Gemini-Flash({self._gemini_flash_model})",
                _factory(self._gemini_flash_client, self._gemini_flash_model),
                self._gemini_flash_breaker,
            ))

        # MiMo is intentionally NOT in this chain.
        # It is vision/audio only (_chat_mimo, transcribe_audio, analyze_image_url).
        # Adding it here causes it to receive tool schemas it can't handle and
        # return raw JSON blobs to users when DeepSeek/Gemini are unavailable.
        return chain

    # ============================================================
    # Streaming text chat — token-level deltas
    # ============================================================
    async def chat_stream(
        self,
        messages: List[Dict[str, Any]],
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ):
        """Yield text deltas from the brain LLM as they arrive.

        Routes to DeepSeek V4-Flash when configured, MiMo otherwise. Tool
        calling is not surfaced in the stream — use ``chat()`` for tool use.

        Note: retry/circuit-breaker is intentionally NOT applied to the
        streaming path. Streams are stateful and partial output makes
        retry semantics ambiguous — caller should handle stream errors.

        Raises:
            RuntimeError: if neither DeepSeek nor MiMo is configured.
        """
        client = self._ds_client or self._mimo_client
        model = self._ds_flash_model if self._ds_client else self._mimo_model
        if not client:
            raise RuntimeError(
                "No LLM client available - check DEEPSEEK_API_KEY or MIMO_API_KEY"
            )
        kwargs: Dict[str, Any] = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_completion_tokens": max_tokens,
            "stream": True,
        }
        stream = await client.chat.completions.create(**kwargs)
        async for chunk in stream:
            # Some OpenAI-compatible servers emit chunks with no choices
            # (e.g. a trailing usage-only chunk); indexing [0] would raise.
            if not chunk.choices:
                continue
            content = getattr(chunk.choices[0].delta, "content", None)
            if content:
                yield content

    async def _raw_chat(
        self,
        client: "AsyncOpenAI",
        model: str,
        messages: List[Dict[str, Any]],
        temperature: float,
        max_tokens: int,
        tools: Optional[List[Dict]],
        tool_choice: Optional[str],
    ) -> Dict[str, Any]:
        """Make a single chat-completions call with no retry or fallback.

        Invoked through the factories built by ``_build_chat_chain``. ``tools``
        and ``tool_choice`` are only sent when truthy.

        Returns:
            Dict with ``content`` (never None), ``tool_calls``, ``usage``
            (empty when the response carries none), ``model`` and
            ``finish_reason``.
        """
        started = time.perf_counter()  # monotonic, unaffected by clock changes
        kwargs: Dict[str, Any] = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_completion_tokens": max_tokens,
        }
        if tools:
            kwargs["tools"] = tools
        if tool_choice:
            kwargs["tool_choice"] = tool_choice
        response = await client.chat.completions.create(**kwargs)
        duration = time.perf_counter() - started
        choice = response.choices[0]
        usage: Dict[str, Any] = {}
        if response.usage:
            usage = {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
                "duration_ms": int(duration * 1000),
            }
        return {
            "content": choice.message.content or "",
            "tool_calls": choice.message.tool_calls,
            "usage": usage,
            "model": model,
            "finish_reason": choice.finish_reason,
        }

    # ============================================================
    # Text-only convenience
    # ============================================================
    async def chat_text(
        self,
        messages: List[Dict[str, Any]],
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> str:
        """Run ``chat()`` on the default ("fast") tier and return only the text."""
        result = await self.chat(messages, temperature, max_tokens)
        return result["content"]

    # ============================================================
    # Internal: MiMo-only chat (for vision/audio methods)
    # ============================================================
    async def _chat_mimo(
        self,
        messages: List[Dict[str, Any]],
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> Dict[str, Any]:
        """Send ``messages`` to MiMo, bypassing the text routing chain.

        Called by the vision and audio methods only.

        Returns:
            Dict with ``content``, ``tool_calls`` (always None), ``usage`` and
            ``finish_reason``.

        Raises:
            RuntimeError: if the MiMo client was not configured.
        """
        if not self._mimo_client:
            raise RuntimeError(
                "MiMo client not initialized - check MIMO_API_KEY "
                "(required for vision/audio)"
            )
        started = time.perf_counter()
        kwargs = {
            "model": self._mimo_model,
            "messages": messages,
            "temperature": temperature,
            "max_completion_tokens": max_tokens,
        }
        response = await self._mimo_client.chat.completions.create(**kwargs)
        duration = time.perf_counter() - started
        choice = response.choices[0]
        content = choice.message.content or ""
        usage = {}
        if response.usage:
            usage = {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
                "duration_ms": int(duration * 1000),
            }
        logger.info(
            f"✅ MiMo vision/audio responded ({len(content)} chars, "
            f"{duration:.1f}s, tokens={usage.get('total_tokens', '?')})"
        )
        return {
            "content": content,
            "tool_calls": None,
            "usage": usage,
            "finish_reason": choice.finish_reason,
        }

    # ============================================================
    # Vision: Image Analysis (via MiMo)
    # ============================================================
    async def chat_with_image(
        self,
        text: str,
        image_url: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> str:
        """Analyze one image (URL or data URI) with a text prompt. Uses MiMo."""
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({
            "role": "user",
            "content": [
                {"type": "text", "text": text},
                {
                    "type": "image_url",
                    "image_url": {"url": image_url},
                },
            ],
        })
        result = await self._chat_mimo(messages, temperature, max_tokens)
        return result["content"]

    async def chat_with_image_bytes(
        self,
        text: str,
        image_bytes: bytes,
        mime_type: str = "image/jpeg",
        system_prompt: Optional[str] = None,
    ) -> str:
        """Analyze an image given as raw bytes (sent as a base64 data URI). Uses MiMo."""
        b64 = base64.b64encode(image_bytes).decode("utf-8")
        data_uri = f"data:{mime_type};base64,{b64}"
        return await self.chat_with_image(text, data_uri, system_prompt)

    async def chat_with_multiple_images(
        self,
        text: str,
        images: List[Tuple[bytes, str]],
        system_prompt: Optional[str] = None,
    ) -> str:
        """Analyze several images in one request. Uses MiMo.

        Args:
            images: ``(image_bytes, mime_type)`` pairs, sent in order.
        """
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        content_parts = [{"type": "text", "text": text}]
        for image_bytes, mime_type in images:
            b64 = base64.b64encode(image_bytes).decode("utf-8")
            data_uri = f"data:{mime_type};base64,{b64}"
            content_parts.append({
                "type": "image_url",
                "image_url": {"url": data_uri},
            })
        messages.append({"role": "user", "content": content_parts})
        result = await self._chat_mimo(messages)
        return result["content"]

    # ============================================================
    # Vision: Video Analysis (via MiMo)
    # ============================================================
    async def chat_with_video_url(
        self,
        text: str,
        video_url: str,
        system_prompt: Optional[str] = None,
    ) -> str:
        """Analyze a video (URL or data URI) with a text prompt. Uses MiMo."""
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({
            "role": "user",
            "content": [
                {"type": "text", "text": text},
                {
                    "type": "video_url",
                    "video_url": {"url": video_url},
                },
            ],
        })
        result = await self._chat_mimo(messages, max_tokens=4096)
        return result["content"]

    async def chat_with_video_bytes(
        self,
        text: str,
        video_bytes: bytes,
        mime_type: str = "video/mp4",
        system_prompt: Optional[str] = None,
    ) -> str:
        """Analyze a video given as raw bytes (sent as a base64 data URI). Uses MiMo."""
        b64 = base64.b64encode(video_bytes).decode("utf-8")
        data_uri = f"data:{mime_type};base64,{b64}"
        return await self.chat_with_video_url(text, data_uri, system_prompt)

    # ============================================================
    # Audio Understanding (via MiMo — replaces Whisper STT)
    # ============================================================
    @staticmethod
    def _convert_audio_to_wav(audio_bytes: bytes) -> bytes:
        """Convert audio bytes to 16 kHz mono WAV using ffmpeg.

        Best-effort: when ffmpeg is missing or exits non-zero, the original
        bytes are returned unchanged. Blocking — call it via
        ``asyncio.to_thread`` from async code.

        Raises:
            subprocess.TimeoutExpired: if ffmpeg runs longer than 30 seconds.
        """
        # delete=False so the files outlive the `with`; ffmpeg needs them
        # closed before it reads/writes them. They are removed in `finally`.
        with tempfile.NamedTemporaryFile(suffix=".input", delete=False) as inp, \
                tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as out:
            inp.write(audio_bytes)
            inp.flush()
            inp_path, out_path = inp.name, out.name
        try:
            result = subprocess.run(
                ["ffmpeg", "-y", "-i", inp_path, "-ar", "16000", "-ac", "1", "-f", "wav", out_path],
                capture_output=True, timeout=30,
            )
            if result.returncode != 0:
                # ffmpeg prints its banner first, so the tail of stderr is
                # where the actual error message is.
                detail = result.stderr[-200:].decode("utf-8", errors="replace")
                logger.warning(f"ffmpeg conversion failed: {detail}")
                return audio_bytes  # fallback to original bytes
            with open(out_path, "rb") as f:
                return f.read()
        except FileNotFoundError:
            logger.warning("ffmpeg not found, sending original audio bytes")
            return audio_bytes
        finally:
            for p in (inp_path, out_path):
                try:
                    os.unlink(p)
                except OSError:
                    pass

    async def _fetch_audio_as_base64(self, audio_url: str) -> Tuple[str, str]:
        """Resolve ``audio_url`` to ``(base64_data, format)``, converting to WAV.

        Accepts a ``data:`` URI or a downloadable URL. WAV data URIs are
        passed through untouched; everything else goes through ffmpeg.

        Raises:
            ValueError: if a data URI has no ``,`` separating header and payload.
            httpx.HTTPStatusError: if the download returns an error status.
        """
        # Already a data URI
        if audio_url.startswith("data:"):
            header, sep, b64_data = audio_url.partition(",")
            if not sep:
                raise ValueError("Malformed audio data URI: missing ',' separator")
            mime = header[len("data:"):].split(";", 1)[0]
            fmt = mime.rpartition("/")[2].lower()
            if fmt != "wav":
                raw_bytes = base64.b64decode(b64_data)
                # ffmpeg is blocking; keep it off the event loop.
                wav_bytes = await asyncio.to_thread(self._convert_audio_to_wav, raw_bytes)
                b64_data = base64.b64encode(wav_bytes).decode("utf-8")
                fmt = "wav"
            return b64_data, fmt

        # Download from URL
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.get(audio_url)
            resp.raise_for_status()
            raw_bytes = resp.content
        logger.info(f"🔊 Fetched audio: {len(raw_bytes)} bytes")
        # Convert to WAV for maximum MiMo compatibility
        wav_bytes = await asyncio.to_thread(self._convert_audio_to_wav, raw_bytes)
        b64_data = base64.b64encode(wav_bytes).decode("utf-8")
        logger.info(f"🔊 Converted to WAV: {len(wav_bytes)} bytes")
        return b64_data, "wav"

    async def understand_audio(
        self,
        audio_url: str,
        prompt: str = "Listen to this audio and respond appropriately. If the user is speaking, understand their intent and respond.",
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> str:
        """Understand raw audio directly via MiMo — no transcription step needed.

        Args:
            audio_url: Downloadable URL or ``data:`` URI of the audio.
            prompt: Instruction sent alongside the audio.
        """
        b64_data, audio_format = await self._fetch_audio_as_base64(audio_url)
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {
                    "type": "input_audio",
                    "input_audio": {
                        "data": b64_data,
                        "format": audio_format,
                    },
                },
            ],
        })
        result = await self._chat_mimo(messages, temperature, max_tokens)
        return result["content"]

    async def understand_audio_bytes(
        self,
        audio_bytes: bytes,
        mime_type: str = "audio/wav",
        prompt: str = "Listen to this audio and respond appropriately.",
        system_prompt: Optional[str] = None,
    ) -> str:
        """Understand audio given as raw bytes (wrapped in a data URI). Uses MiMo."""
        b64 = base64.b64encode(audio_bytes).decode("utf-8")
        data_uri = f"data:{mime_type};base64,{b64}"
        return await self.understand_audio(data_uri, prompt, system_prompt)

    async def transcribe_audio(
        self,
        audio_url: str,
    ) -> Tuple[str, str]:
        """Transcribe audio and detect language via MiMo.

        Replacement for Whisper STT.

        Returns:
            Tuple of (transcript, detected_language_code). When the model's
            reply cannot be parsed as JSON, the stripped raw reply is returned
            with language ``"en"``.
        """
        prompt = """Listen to this audio carefully and respond with ONLY a JSON object:
{
"transcript": "exact words spoken by the user",
"language": "two-letter language code (en, fr, es, pt, ar, etc.)"
}
Rules:
- Transcribe EXACTLY what was said, word for word
- Detect the language accurately
- Return ONLY the JSON, no other text"""
        response = await self.understand_audio(
            audio_url=audio_url,
            prompt=prompt,
            temperature=0.3,
        )
        # Parse JSON response (string-aware, brace-balanced)
        try:
            from app.ai.agent.json_utils import extract_json_object
            data = extract_json_object(response)
            if data:
                transcript = data.get("transcript", response)
                language = data.get("language", "en")
                return transcript, language
        except Exception:
            # Best-effort: keep the fallback below, but record why parsing failed
            # instead of discarding the error silently.
            logger.debug("Transcription JSON parsing raised", exc_info=True)
        # Fallback: treat entire response as transcript
        logger.warning("Could not parse transcription JSON, using raw response")
        return response.strip(), "en"

    # ============================================================
    # Multimodal: Audio + Image combined (via MiMo)
    # ============================================================
    async def chat_multimodal(
        self,
        text: str,
        images: Optional[List[str]] = None,
        audio_url: Optional[str] = None,
        video_url: Optional[str] = None,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> str:
        """Send one request combining text with any of images, audio and video. Uses MiMo.

        Parts are attached in a fixed order: text, images, audio, video.
        Audio is fetched and converted to WAV before being attached.
        """
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        content_parts = [{"type": "text", "text": text}]
        if images:
            for img_url in images:
                content_parts.append({
                    "type": "image_url",
                    "image_url": {"url": img_url},
                })
        if audio_url:
            b64_data, audio_format = await self._fetch_audio_as_base64(audio_url)
            content_parts.append({
                "type": "input_audio",
                "input_audio": {
                    "data": b64_data,
                    "format": audio_format,
                },
            })
        if video_url:
            content_parts.append({
                "type": "video_url",
                "video_url": {"url": video_url},
            })
        messages.append({"role": "user", "content": content_parts})
        result = await self._chat_mimo(messages, temperature, max_tokens)
        return result["content"]
# ============================================================
# Global Singleton
# ============================================================
_mimo_client: Optional[MiMoClient] = None


def get_mimo_client() -> MiMoClient:
    """Return the module-wide MiMoClient, constructing it on first use."""
    global _mimo_client
    if _mimo_client is not None:
        return _mimo_client
    # First call: build the client once and cache it for every later caller.
    _mimo_client = MiMoClient()
    return _mimo_client