# ============================================================
# app/core/mimo_client.py - Multi-Model AI Client
#
# Tool calling + classify/search/alerts/booking → DeepSeek V4-Flash (primary)
# Agentic tasks + AI listing + specialists     → DeepSeek V4-Pro (primary)
# Gemini Pro / Flash                            → dormant fallback (key pulled)
# Vision (image/video) + Audio (STT)            → MiMo-V2-Omni
#
# System prompt prefix caching is automatic on DeepSeek — identical system
# prompts across requests are served from cache at ~1/50th the input cost.
# Conversation messages and answers are always generated fresh.
#
# All consuming code calls get_mimo_client() unchanged.
# Routing is transparent inside this module.
# ============================================================

import asyncio
import base64
import logging
import os
import time
import tempfile
from typing import Optional, List, Dict, Any, Tuple, Union, AsyncIterator

import httpx
from openai import AsyncOpenAI

from app.core.llm_resilience import (
    AllProvidersDown,
    CircuitBreaker,
    RetryPolicy,
    call_with_chain,
    call_with_resilience,
)

logger = logging.getLogger(__name__)

# Single retry policy for all brain calls — overridable per-instance via
# ``MiMoClient._retry_policy``.
# max_attempts=1 → no retries on the same provider. When a provider fails
# (e.g. a 429), the chain moves straight on to the next provider instead of
# spending time on back-off delays, trading a little resilience for faster
# perceived latency on rate-limited turns.
_DEFAULT_RETRY_POLICY = RetryPolicy(
    max_attempts=1,
    base_delay_seconds=0.5,
    max_delay_seconds=8.0,
)

# ============================================================
# Configuration
# ============================================================

MIMO_API_BASE = "https://api.xiaomimimo.com/v1"
MIMO_MODEL = "mimo-v2.5"

DEEPSEEK_API_BASE = "https://api.deepseek.com/v1"
# V4-Flash: best for tool calling, classification, search, alerts, booking (fast + cheap)
DEEPSEEK_FLASH_MODEL = "deepseek-v4-flash"
# V4-Pro: best for agentic tasks, AI listing creation, specialist agents (1.6T MoE)
DEEPSEEK_PRO_MODEL = "deepseek-v4-pro"

# Google's OpenAI-compatible Gemini endpoint
GEMINI_API_BASE = "https://generativelanguage.googleapis.com/v1beta/openai"
GEMINI_PRO_MODEL = "gemini-2.5-pro"
GEMINI_FLASH_MODEL = "gemini-2.5-flash"

# OpenRouter as an alternative path to Gemini. These constants are kept for
# importers, but MiMoClient below does not wire up an OpenRouter client.
OPENROUTER_API_BASE = "https://openrouter.ai/api/v1"
OPENROUTER_GEMINI_PRO_MODEL = "google/gemini-2.5-pro"
OPENROUTER_GEMINI_FLASH_MODEL = "google/gemini-2.5-flash"


class MiMoClient:
    """
    Multi-model client with a unified interface.

    - Tool calling / classify / search / alerts / booking → DeepSeek V4-Flash
    - Agentic tasks / AI listing / specialist agents      → DeepSeek V4-Pro
    - Gemini Pro / Flash                                  → dormant fallback
    - Vision (image/video) + Audio (STT)                  → MiMo-V2-Omni

    Both DeepSeek V4 models share the same API key and base URL.
    System prompt prefix caching is handled automatically by DeepSeek —
    no explicit cache_control headers are needed.

    All existing callers use the same API (chat, chat_text, chat_with_image, etc.).
    Routing is handled internally — no changes needed in consuming code.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        model: Optional[str] = None,
    ):
        """Create the provider clients from arguments and environment variables.

        Args:
            api_key: MiMo API key; falls back to ``MIMO_API_KEY``.
            base_url: MiMo base URL; falls back to ``MIMO_BASE_URL`` and then
                to ``MIMO_API_BASE``.
            model: MiMo model name; falls back to ``MIMO_MODEL``.

        DeepSeek and Gemini are configured from environment variables only.
        A provider whose key is missing gets a ``None`` client and is skipped
        when chains are built.
        """
        # ── MiMo (Vision + Audio) ──
        self._mimo_api_key = api_key or os.getenv("MIMO_API_KEY", "")
        self._mimo_base_url = base_url or os.getenv("MIMO_BASE_URL", MIMO_API_BASE)
        self._mimo_model = model or os.getenv("MIMO_MODEL", MIMO_MODEL)

        if not self._mimo_api_key:
            logger.warning("⚠️ MIMO_API_KEY not set - MiMo vision/audio unavailable")
            self._mimo_client = None
        else:
            self._mimo_client = AsyncOpenAI(
                api_key=self._mimo_api_key,
                base_url=self._mimo_base_url,
                timeout=120,
                # The key is also sent as an ``api-key`` header in addition
                # to the SDK's own auth handling.
                default_headers={"api-key": self._mimo_api_key},
            )
            logger.info(
                f"✅ MiMo-V2-Omni client initialized for vision/audio "
                f"(model={self._mimo_model})"
            )

        # ── DeepSeek V4 (Brain: Tool Calling + Agentic) ──
        # Single client — both Flash and Pro share the same API key and base URL.
        # Flash handles fast tool-calling tasks; Pro handles deep agentic work.
        self._ds_api_key = os.getenv("DEEPSEEK_API_KEY", "")
        self._ds_base_url = os.getenv("DEEPSEEK_BASE_URL", DEEPSEEK_API_BASE)
        self._ds_flash_model = os.getenv("DEEPSEEK_FLASH_MODEL", DEEPSEEK_FLASH_MODEL)
        self._ds_pro_model = os.getenv("DEEPSEEK_PRO_MODEL", DEEPSEEK_PRO_MODEL)

        if not self._ds_api_key:
            logger.warning(
                "⚠️ DEEPSEEK_API_KEY not set - text brain unavailable, "
                "falling back to MiMo for text"
            )
            self._ds_client = None
        else:
            self._ds_client = AsyncOpenAI(
                api_key=self._ds_api_key,
                base_url=self._ds_base_url,
                timeout=120,
            )
            logger.info(
                f"✅ DeepSeek V4 initialized — "
                f"flash={self._ds_flash_model} (tool calling / classify / search), "
                f"pro={self._ds_pro_model} (agentic / listing / specialists)"
            )

        # ── Gemini (Brain — Pro for specialists, Flash for classify/general) ──
        # Only the direct Google API key is wired up here. Until it is set,
        # both Gemini clients stay None and the chat chain skips them.
        self._gemini_pro_client = None
        self._gemini_flash_client = None
        self._gemini_pro_model = GEMINI_PRO_MODEL
        self._gemini_flash_model = GEMINI_FLASH_MODEL

        gemini_direct_key = os.getenv("GEMINI_API_KEY", "")
        gemini_base = os.getenv("GEMINI_BASE_URL", GEMINI_API_BASE)

        if gemini_direct_key:
            # Pro and Flash share one client; only the model name differs.
            shared = AsyncOpenAI(
                api_key=gemini_direct_key,
                base_url=gemini_base,
                timeout=120,
            )
            self._gemini_pro_client = shared
            self._gemini_flash_client = shared
            self._gemini_pro_model = os.getenv("GEMINI_PRO_MODEL", GEMINI_PRO_MODEL)
            self._gemini_flash_model = os.getenv("GEMINI_FLASH_MODEL", GEMINI_FLASH_MODEL)
            logger.info(
                f"✅ Gemini configured (direct Google API): "
                f"pro={self._gemini_pro_model}, flash={self._gemini_flash_model}"
            )
        else:
            logger.info(
                "ℹ️ Gemini unavailable (set GEMINI_API_KEY). "
                "Brain will use DeepSeek → MiMo only."
            )

        # ── Resilience: per-provider circuit breakers ──
        # Separate breakers for Flash and Pro so a Pro outage doesn't
        # block Flash calls, and vice-versa.
        # NOTE(review): thresholds are the CircuitBreaker defaults; an earlier
        # comment stated "trip after 5 failures in 60s, stay open for 30s" —
        # confirm against app.core.llm_resilience.
        self._ds_flash_breaker = CircuitBreaker(name="deepseek-v4-flash")
        self._ds_pro_breaker = CircuitBreaker(name="deepseek-v4-pro")
        self._gemini_pro_breaker = CircuitBreaker(name="gemini-pro")
        self._gemini_flash_breaker = CircuitBreaker(name="gemini-flash")
        self._mimo_breaker = CircuitBreaker(name="mimo")
        self._retry_policy = _DEFAULT_RETRY_POLICY

    # Keep backward compat — old code checks this
    @property
    def _client(self):
        """Legacy accessor: the DeepSeek client if configured, else MiMo."""
        return self._ds_client or self._mimo_client

    @property
    def _model(self):
        """Legacy accessor: Flash model name when DeepSeek is configured, else MiMo's."""
        # Default to Flash for any legacy callers that read this property directly
        return self._ds_flash_model if self._ds_client else self._mimo_model

    # Legacy alias — some callers reference _ds_model directly
    @property
    def _ds_model(self):
        """Legacy alias for the DeepSeek Flash model name."""
        return self._ds_flash_model

    @property
    def is_available(self) -> bool:
        """True when a DeepSeek or MiMo client exists.

        Note that ``chat()`` only uses DeepSeek and Gemini, so a MiMo-only
        setup reports available here but cannot serve ``chat()``.
        """
        return self._ds_client is not None or self._mimo_client is not None

    # ============================================================
    # Shared helpers
    # ============================================================

    @staticmethod
    def _extract_usage(response: Any, duration_seconds: float) -> Dict[str, Any]:
        """Return token usage plus call duration, or ``{}`` if usage is absent."""
        if not response.usage:
            return {}
        return {
            "prompt_tokens": response.usage.prompt_tokens,
            "completion_tokens": response.usage.completion_tokens,
            "total_tokens": response.usage.total_tokens,
            "duration_ms": int(duration_seconds * 1000),
        }

    @staticmethod
    def _compose_messages(
        system_prompt: Optional[str],
        user_content: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Build ``[system?, user]`` messages around multimodal content parts."""
        messages: List[Dict[str, Any]] = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": user_content})
        return messages

    # ============================================================
    # Core Chat — Text + Tool Calling via DeepSeek V4
    # ============================================================

    async def chat(
        self,
        messages: List[Dict[str, Any]],
        temperature: float = 0.7,
        max_tokens: int = 4096,
        tools: Optional[List[Dict]] = None,
        tool_choice: Optional[str] = None,
        tier: str = "fast",
    ) -> Dict[str, Any]:
        """Text chat + tool calling with tier-based routing and failover.

        Tiers:
          - ``"premium"``: agentic tasks, AI listing creation, specialist agents.
            Chain: V4-Pro → V4-Flash → Gemini Pro → Gemini Flash.
          - ``"fast"``: tool calling, classification, search, alerts, booking (default).
            Chain: V4-Flash → Gemini Flash.
          Any other value is routed like ``"fast"``.

        MiMo is deliberately not part of this chain (vision/audio only).

        Gemini slots remain in the chain but are dormant while GEMINI_API_KEY
        is unset — clients initialise as None and are skipped automatically.
        Re-adding the key at any time activates them as fallbacks with zero
        code changes.

        Each provider has its own circuit breaker; retries on a provider
        follow ``self._retry_policy`` (default: a single attempt).

        Args:
            messages: OpenAI-style chat messages.
            temperature: Sampling temperature.
            max_tokens: Completion token limit, sent as ``max_completion_tokens``.
            tools: Optional tool schemas.
            tool_choice: Optional tool-choice directive.
            tier: ``"fast"`` or ``"premium"``.

        Returns:
            Dict with ``content``, ``tool_calls``, ``usage``, ``model`` and
            ``finish_reason``.

        Raises:
            RuntimeError: If neither DeepSeek nor Gemini is configured.
            Whatever ``call_with_chain`` raises when every provider fails —
            presumably ``AllProvidersDown``; confirm in llm_resilience.
        """
        chain = self._build_chat_chain(
            tier, messages, temperature, max_tokens, tools, tool_choice,
        )
        if not chain:
            # Only DeepSeek and Gemini can populate the chain, so only those
            # keys are worth pointing the operator at.
            raise RuntimeError(
                "No LLM client available - check DEEPSEEK_API_KEY or "
                "GEMINI_API_KEY (MiMo is vision/audio only and is not used "
                "for text chat)"
            )

        result, provider = await call_with_chain(chain, policy=self._retry_policy)
        logger.info(
            f"✅ {provider} responded ({len(result.get('content') or '')} chars, "
            f"tier={tier}, tokens={(result.get('usage') or {}).get('total_tokens', '?')})"
        )
        return result

    def _build_chat_chain(
        self,
        tier: str,
        messages: List[Dict[str, Any]],
        temperature: float,
        max_tokens: int,
        tools: Optional[List[Dict]],
        tool_choice: Optional[str],
    ) -> list:
        """Build the (name, factory, breaker) chain for a given tier.

        Each factory is a zero-arg callable that, when invoked, makes one
        attempt against its provider via ``_raw_chat``. The list is ordered
        by preference and only contains providers whose client is configured.
        """

        def _factory(client, model):
            # Bind client/model per call so each lambda targets its own provider.
            return lambda: self._raw_chat(
                client, model, messages, temperature, max_tokens, tools, tool_choice,
            )

        chain: list = []

        # ── Primary: DeepSeek V4 ─────────────────────────────────────────────
        # Premium tier: V4-Pro leads (best agentic / specialist reasoning),
        #               V4-Flash follows as its immediate fallback.
        # Fast tier:    V4-Flash only (optimal for tool calling + classify).
        if self._ds_client is not None:
            if tier == "premium":
                chain.append((
                    f"DeepSeek-V4-Pro({self._ds_pro_model})",
                    _factory(self._ds_client, self._ds_pro_model),
                    self._ds_pro_breaker,
                ))
            chain.append((
                f"DeepSeek-V4-Flash({self._ds_flash_model})",
                _factory(self._ds_client, self._ds_flash_model),
                self._ds_flash_breaker,
            ))

        # ── Dormant fallback: Gemini ─────────────────────────────────────────
        # These slots are skipped while GEMINI_API_KEY is unset (clients = None).
        # Re-add the key to activate them instantly — no code changes needed.
        if tier == "premium" and self._gemini_pro_client is not None:
            chain.append((
                f"Gemini-Pro({self._gemini_pro_model})",
                _factory(self._gemini_pro_client, self._gemini_pro_model),
                self._gemini_pro_breaker,
            ))
        if self._gemini_flash_client is not None:
            chain.append((
                f"Gemini-Flash({self._gemini_flash_model})",
                _factory(self._gemini_flash_client, self._gemini_flash_model),
                self._gemini_flash_breaker,
            ))

        # MiMo is intentionally NOT in this chain.
        # It is vision/audio only (_chat_mimo, transcribe_audio, analyze_image_url).
        # Adding it here causes it to receive tool schemas it can't handle and
        # return raw JSON blobs to users when DeepSeek/Gemini are unavailable.
        return chain

    # ============================================================
    # Streaming text chat — token-level deltas
    # ============================================================

    async def chat_stream(
        self,
        messages: List[Dict[str, Any]],
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> AsyncIterator[str]:
        """Yield text deltas from the brain LLM as they arrive.

        Routes to DeepSeek V4-Flash when configured, MiMo otherwise. Tool
        calling is not surfaced in the stream — use ``chat()`` for tool use.

        Note: retry/circuit-breaker is intentionally NOT applied to the
        streaming path. Streams are stateful and partial output makes
        retry semantics ambiguous — caller should handle stream errors.

        Raises:
            RuntimeError: If neither DeepSeek nor MiMo is configured.
        """
        # Streaming uses V4-Flash — fast, low latency, ideal for token-level delivery
        client = self._ds_client or self._mimo_client
        model = self._ds_flash_model if self._ds_client else self._mimo_model
        if not client:
            raise RuntimeError(
                "No LLM client available - check DEEPSEEK_API_KEY or MIMO_API_KEY"
            )

        kwargs: Dict[str, Any] = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_completion_tokens": max_tokens,
            "stream": True,
        }

        stream = await client.chat.completions.create(**kwargs)
        async for chunk in stream:
            # Some chunks (e.g. usage-only or keep-alive chunks) carry an
            # empty ``choices`` list; indexing them would raise IndexError.
            if not chunk.choices:
                continue
            content = getattr(chunk.choices[0].delta, "content", None)
            if content:
                yield content

    async def _raw_chat(
        self,
        client: AsyncOpenAI,
        model: str,
        messages: List[Dict[str, Any]],
        temperature: float,
        max_tokens: int,
        tools: Optional[List[Dict]],
        tool_choice: Optional[str],
    ) -> Dict[str, Any]:
        """Make a single chat-completions call without retry or fallback.

        Invoked through the factories built by ``_build_chat_chain``; any
        exception propagates to the resilience layer.

        Returns:
            Dict with ``content`` (never None), ``tool_calls``, ``usage``,
            ``model`` and ``finish_reason``.
        """
        started = time.perf_counter()

        kwargs: Dict[str, Any] = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_completion_tokens": max_tokens,
        }
        # Only send tool fields when set, so plain text calls stay minimal.
        if tools:
            kwargs["tools"] = tools
        if tool_choice:
            kwargs["tool_choice"] = tool_choice

        response = await client.chat.completions.create(**kwargs)
        duration = time.perf_counter() - started

        choice = response.choices[0]
        return {
            "content": choice.message.content or "",
            "tool_calls": choice.message.tool_calls,
            "usage": self._extract_usage(response, duration),
            "model": model,
            "finish_reason": choice.finish_reason,
        }

    # ============================================================
    # Text-only convenience
    # ============================================================

    async def chat_text(
        self,
        messages: List[Dict[str, Any]],
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> str:
        """Run ``chat()`` on the default (fast) tier and return only the text."""
        result = await self.chat(messages, temperature, max_tokens)
        return result["content"]

    # ============================================================
    # Internal: MiMo-only chat (for vision/audio methods)
    # ============================================================

    async def _chat_mimo(
        self,
        messages: List[Dict[str, Any]],
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> Dict[str, Any]:
        """
        Internal method that ALWAYS uses MiMo for multimodal content.
        Called by vision and audio methods only.

        Returns:
            Dict with ``content``, ``tool_calls`` (always None), ``usage``,
            ``model`` and ``finish_reason``.

        Raises:
            RuntimeError: If the MiMo client was not initialised.
        """
        if not self._mimo_client:
            raise RuntimeError(
                "MiMo client not initialized - check MIMO_API_KEY "
                "(required for vision/audio)"
            )

        started = time.perf_counter()

        kwargs = {
            "model": self._mimo_model,
            "messages": messages,
            "temperature": temperature,
            "max_completion_tokens": max_tokens,
        }

        response = await self._mimo_client.chat.completions.create(**kwargs)
        duration = time.perf_counter() - started

        choice = response.choices[0]
        content = choice.message.content or ""
        usage = self._extract_usage(response, duration)

        logger.info(
            f"✅ MiMo vision/audio responded ({len(content)} chars, "
            f"{duration:.1f}s, tokens={usage.get('total_tokens', '?')})"
        )

        return {
            "content": content,
            "tool_calls": None,
            "usage": usage,
            # Included so the result has the same keys as _raw_chat's.
            "model": self._mimo_model,
            "finish_reason": choice.finish_reason,
        }

    # ============================================================
    # Vision: Image Analysis (via MiMo)
    # ============================================================

    async def chat_with_image(
        self,
        text: str,
        image_url: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> str:
        """Analyze an image (URL or data URI) with a text prompt. Uses MiMo."""
        messages = self._compose_messages(system_prompt, [
            {"type": "text", "text": text},
            {"type": "image_url", "image_url": {"url": image_url}},
        ])
        result = await self._chat_mimo(messages, temperature, max_tokens)
        return result["content"]

    async def chat_with_image_bytes(
        self,
        text: str,
        image_bytes: bytes,
        mime_type: str = "image/jpeg",
        system_prompt: Optional[str] = None,
    ) -> str:
        """Analyze image from raw bytes (base64-encoded for API). Uses MiMo."""
        b64 = base64.b64encode(image_bytes).decode("utf-8")
        data_uri = f"data:{mime_type};base64,{b64}"
        return await self.chat_with_image(text, data_uri, system_prompt)

    async def chat_with_multiple_images(
        self,
        text: str,
        images: List[Tuple[bytes, str]],
        system_prompt: Optional[str] = None,
    ) -> str:
        """Analyze multiple images at once. Uses MiMo.

        Args:
            text: Prompt text placed before the images.
            images: ``(image_bytes, mime_type)`` pairs.
            system_prompt: Optional system message.
        """
        content_parts: List[Dict[str, Any]] = [{"type": "text", "text": text}]
        for image_bytes, mime_type in images:
            b64 = base64.b64encode(image_bytes).decode("utf-8")
            content_parts.append({
                "type": "image_url",
                "image_url": {"url": f"data:{mime_type};base64,{b64}"},
            })

        messages = self._compose_messages(system_prompt, content_parts)
        result = await self._chat_mimo(messages)
        return result["content"]

    # ============================================================
    # Vision: Video Analysis (via MiMo)
    # ============================================================

    async def chat_with_video_url(
        self,
        text: str,
        video_url: str,
        system_prompt: Optional[str] = None,
    ) -> str:
        """Analyze a video from a URL or data URI. Uses MiMo."""
        messages = self._compose_messages(system_prompt, [
            {"type": "text", "text": text},
            {"type": "video_url", "video_url": {"url": video_url}},
        ])
        result = await self._chat_mimo(messages, max_tokens=4096)
        return result["content"]

    async def chat_with_video_bytes(
        self,
        text: str,
        video_bytes: bytes,
        mime_type: str = "video/mp4",
        system_prompt: Optional[str] = None,
    ) -> str:
        """Analyze video from raw bytes. Uses MiMo."""
        b64 = base64.b64encode(video_bytes).decode("utf-8")
        data_uri = f"data:{mime_type};base64,{b64}"
        return await self.chat_with_video_url(text, data_uri, system_prompt)

    # ============================================================
    # Audio Understanding (via MiMo — replaces Whisper STT)
    # ============================================================

    @staticmethod
    def _convert_audio_to_wav(audio_bytes: bytes) -> bytes:
        """Convert any audio format to 16 kHz mono WAV using ffmpeg.

        This is a blocking call (subprocess); async callers should run it
        through ``asyncio.to_thread``.

        Returns:
            The WAV bytes, or the ORIGINAL ``audio_bytes`` unchanged when
            ffmpeg is missing, fails, or exceeds the 30 s timeout.
        """
        import subprocess

        # A temporary directory removes both files on every exit path,
        # including a failure while writing the input file.
        with tempfile.TemporaryDirectory(prefix="mimo_audio_") as tmp_dir:
            inp_path = os.path.join(tmp_dir, "audio.input")
            out_path = os.path.join(tmp_dir, "audio.wav")
            with open(inp_path, "wb") as src:
                src.write(audio_bytes)

            try:
                result = subprocess.run(
                    ["ffmpeg", "-y", "-i", inp_path, "-ar", "16000", "-ac", "1",
                     "-f", "wav", out_path],
                    capture_output=True,
                    timeout=30,
                )
            except FileNotFoundError:
                logger.warning("ffmpeg not found, sending original audio bytes")
                return audio_bytes
            except subprocess.TimeoutExpired:
                # Same best-effort contract as any other conversion failure.
                logger.warning("ffmpeg conversion timed out, sending original audio bytes")
                return audio_bytes

            if result.returncode != 0:
                stderr_head = result.stderr[:200].decode("utf-8", errors="replace")
                logger.warning(f"ffmpeg conversion failed: {stderr_head}")
                return audio_bytes  # fallback to original bytes

            with open(out_path, "rb") as converted:
                return converted.read()

    async def _fetch_audio_as_base64(self, audio_url: str) -> Tuple[str, str]:
        """
        Load audio from a URL or data URI, convert it to WAV for
        compatibility, and return ``(base64_data, format)``.

        Raises:
            ValueError: If a data URI has no ``,`` separating header and payload.
            httpx.HTTPStatusError: If the download returns an error status.
        """
        # NOTE(review): when _convert_audio_to_wav falls back to the original
        # bytes, the format is still reported as "wav" — confirm whether the
        # provider tolerates that or the real format should be passed through.

        # Already a data URI
        if audio_url.startswith("data:"):
            header, sep, b64_data = audio_url.partition(",")
            if not sep:
                raise ValueError("Malformed audio data URI: missing ',' separator")
            mime = header[len("data:"):].split(";", 1)[0]
            # An empty subtype (mime without "/") simply forces conversion.
            fmt = mime.partition("/")[2]
            if fmt != "wav":
                raw_bytes = base64.b64decode(b64_data)
                # ffmpeg is a blocking subprocess — keep it off the event loop.
                wav_bytes = await asyncio.to_thread(self._convert_audio_to_wav, raw_bytes)
                b64_data = base64.b64encode(wav_bytes).decode("utf-8")
                fmt = "wav"
            return b64_data, fmt

        # Download from URL
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.get(audio_url)
            resp.raise_for_status()
            raw_bytes = resp.content

        logger.info(f"🔊 Fetched audio: {len(raw_bytes)} bytes")

        # Convert to WAV for maximum MiMo compatibility
        wav_bytes = await asyncio.to_thread(self._convert_audio_to_wav, raw_bytes)
        b64_data = base64.b64encode(wav_bytes).decode("utf-8")
        logger.info(f"🔊 Converted to WAV: {len(wav_bytes)} bytes")
        return b64_data, "wav"

    async def understand_audio(
        self,
        audio_url: str,
        prompt: str = "Listen to this audio and respond appropriately. If the user is speaking, understand their intent and respond.",
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> str:
        """
        Understand raw audio directly via MiMo — no transcription step needed.

        Args:
            audio_url: HTTP(S) URL or ``data:`` URI of the audio.
            prompt: Instruction sent alongside the audio.
            system_prompt: Optional system message.
            temperature: Sampling temperature.
            max_tokens: Completion token limit.
        """
        b64_data, audio_format = await self._fetch_audio_as_base64(audio_url)

        messages = self._compose_messages(system_prompt, [
            {"type": "text", "text": prompt},
            {
                "type": "input_audio",
                "input_audio": {"data": b64_data, "format": audio_format},
            },
        ])
        result = await self._chat_mimo(messages, temperature, max_tokens)
        return result["content"]

    async def understand_audio_bytes(
        self,
        audio_bytes: bytes,
        mime_type: str = "audio/wav",
        prompt: str = "Listen to this audio and respond appropriately.",
        system_prompt: Optional[str] = None,
    ) -> str:
        """Understand audio from raw bytes. Uses MiMo."""
        b64 = base64.b64encode(audio_bytes).decode("utf-8")
        data_uri = f"data:{mime_type};base64,{b64}"
        return await self.understand_audio(data_uri, prompt, system_prompt)

    async def transcribe_audio(
        self,
        audio_url: str,
    ) -> Tuple[str, str]:
        """
        Transcribe audio and detect language via MiMo.
        Replacement for Whisper STT.

        Returns:
            Tuple of (transcript, detected_language_code). When the model's
            reply cannot be parsed as JSON, the stripped raw reply is
            returned with language ``"en"``.
        """
        prompt = (
            "Listen to this audio carefully and respond with ONLY a JSON object:\n"
            "{\n"
            '    "transcript": "exact words spoken by the user",\n'
            '    "language": "two-letter language code (en, fr, es, pt, ar, etc.)"\n'
            "}\n"
            "\n"
            "Rules:\n"
            "- Transcribe EXACTLY what was said, word for word\n"
            "- Detect the language accurately\n"
            "- Return ONLY the JSON, no other text"
        )

        response = await self.understand_audio(
            audio_url=audio_url,
            prompt=prompt,
            temperature=0.3,
        )

        # Parse JSON response (string-aware, brace-balanced). Parsing stays
        # best-effort, but failures are logged instead of silently dropped.
        data = None
        try:
            from app.ai.agent.json_utils import extract_json_object
            data = extract_json_object(response)
        except Exception as exc:
            logger.warning(f"Transcription JSON extraction failed: {exc}")

        if isinstance(data, dict) and data:
            return data.get("transcript", response), data.get("language", "en")

        # Fallback: treat entire response as transcript
        logger.warning("Could not parse transcription JSON, using raw response")
        return response.strip(), "en"

    # ============================================================
    # Multimodal: Audio + Image combined (via MiMo)
    # ============================================================

    async def chat_multimodal(
        self,
        text: str,
        images: Optional[List[str]] = None,
        audio_url: Optional[str] = None,
        video_url: Optional[str] = None,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> str:
        """Send a multimodal request combining any modalities. Uses MiMo.

        Content parts are ordered text → images → audio → video. Audio is
        fetched and converted to base64 first; image and video URLs are
        passed through as given.
        """
        content_parts: List[Dict[str, Any]] = [{"type": "text", "text": text}]

        for img_url in images or []:
            content_parts.append({
                "type": "image_url",
                "image_url": {"url": img_url},
            })

        if audio_url:
            b64_data, audio_format = await self._fetch_audio_as_base64(audio_url)
            content_parts.append({
                "type": "input_audio",
                "input_audio": {"data": b64_data, "format": audio_format},
            })

        if video_url:
            content_parts.append({
                "type": "video_url",
                "video_url": {"url": video_url},
            })

        messages = self._compose_messages(system_prompt, content_parts)
        result = await self._chat_mimo(messages, temperature, max_tokens)
        return result["content"]


# ============================================================
# Global Singleton
# ============================================================

_mimo_client: Optional[MiMoClient] = None


def get_mimo_client() -> MiMoClient:
    """Get or create the shared multi-model client (DeepSeek + Gemini + MiMo)."""
    global _mimo_client
    if _mimo_client is None:
        _mimo_client = MiMoClient()
    return _mimo_client