# AIDA / app/core/mimo_client.py
# Author: destinyebuka
# Last commit: bc570ae — "fixation"
# ============================================================
# app/core/mimo_client.py - Multi-Model AI Client
#
# Tool calling + classify/search/alerts/booking → DeepSeek V4-Flash (primary)
# Agentic tasks + AI listing + specialists → DeepSeek V4-Pro (primary)
# Gemini Pro / Flash → dormant fallback (inactive until GEMINI_API_KEY is set)
# Vision (image/video) + Audio (STT) → MiMo-V2-Omni
#
# System prompt prefix caching is automatic on DeepSeek — identical system
# prompts across requests are served from cache at ~1/50th the input cost.
# Conversation messages and answers are always generated fresh.
#
# All consuming code calls get_mimo_client() unchanged.
# Routing is transparent inside this module.
# ============================================================
import asyncio
import base64
import logging
import os
import time
import tempfile
from typing import Optional, List, Dict, Any, Tuple, Union
import httpx
from openai import AsyncOpenAI
from app.core.llm_resilience import (
AllProvidersDown,
CircuitBreaker,
RetryPolicy,
call_with_chain,
call_with_resilience,
)
logger = logging.getLogger(__name__)

# Single retry policy for all brain calls — overridable per-instance via
# ``MiMoClient._retry_policy``.
#
# max_attempts=1 → no retries on the same provider. When the current link of
# the chain fails (e.g. DeepSeek returns a 429), the chain falls through to the
# next configured provider immediately instead of spending seconds on backoff.
# This trades a little per-provider resilience for much lower perceived latency.
#
# NOTE(review): base_delay_seconds / max_delay_seconds presumably only take
# effect if max_attempts is raised above 1 — confirm in llm_resilience.
_DEFAULT_RETRY_POLICY = RetryPolicy(
    max_attempts=1,
    base_delay_seconds=0.5,
    max_delay_seconds=8.0,
)
# ============================================================
# Configuration
# ============================================================

# ── MiMo: multimodal provider (vision + audio) ──
MIMO_API_BASE = "https://api.xiaomimimo.com/v1"
MIMO_MODEL = "mimo-v2.5"

# ── DeepSeek V4: text brain; both models share one base URL and API key ──
DEEPSEEK_API_BASE = "https://api.deepseek.com/v1"
DEEPSEEK_FLASH_MODEL = "deepseek-v4-flash"  # tool calling, classify, search, alerts, booking
DEEPSEEK_PRO_MODEL = "deepseek-v4-pro"  # agentic tasks, AI listing creation, specialists

# ── Gemini via Google's OpenAI-compatible endpoint (dormant fallback) ──
GEMINI_API_BASE = "https://generativelanguage.googleapis.com/v1beta/openai"
GEMINI_PRO_MODEL = "gemini-2.5-pro"
GEMINI_FLASH_MODEL = "gemini-2.5-flash"

# ── OpenRouter route to the same Gemini models ──
# OpenRouter namespaces model ids by vendor, hence the "google/" prefix.
# NOTE(review): MiMoClient below does not reference these constants.
OPENROUTER_API_BASE = "https://openrouter.ai/api/v1"
OPENROUTER_GEMINI_PRO_MODEL = f"google/{GEMINI_PRO_MODEL}"
OPENROUTER_GEMINI_FLASH_MODEL = f"google/{GEMINI_FLASH_MODEL}"
class MiMoClient:
    """
    Multi-model client with a unified interface.

    - Tool calling / classify / search / alerts / booking → DeepSeek V4-Flash
    - Agentic tasks / AI listing / specialist agents → DeepSeek V4-Pro
    - Gemini Pro / Flash → dormant fallback (only when GEMINI_API_KEY is set)
    - Vision (image/video) + Audio (STT) → MiMo

    Both DeepSeek V4 models share the same API key and base URL.
    System prompt prefix caching is handled automatically by DeepSeek —
    no explicit cache_control headers are needed.

    MiMo is never part of the tool-calling ``chat()`` chain. It serves the
    multimodal methods and is the last-resort client for ``chat_stream()``.

    All existing callers use the same API (chat, chat_text, chat_with_image, etc.).
    Routing is handled internally — no changes needed in consuming code.
    """

    # MIME subtypes whose name differs from the short audio format name we
    # send in ``input_audio.format``. Unlisted subtypes are used verbatim.
    _AUDIO_MIME_ALIASES: Dict[str, str] = {
        "mpeg": "mp3",
        "x-wav": "wav",
        "wave": "wav",
        "vnd.wave": "wav",
    }

    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        model: Optional[str] = None,
    ):
        """Configure every provider from the arguments and environment.

        Args:
            api_key: MiMo API key; defaults to ``$MIMO_API_KEY``.
            base_url: MiMo base URL; defaults to ``$MIMO_BASE_URL`` or
                ``MIMO_API_BASE``.
            model: MiMo model id; defaults to ``$MIMO_MODEL`` or ``MIMO_MODEL``.

        DeepSeek and Gemini are configured from environment variables only.
        A provider whose key is missing gets a ``None`` client and is skipped
        by the routing logic.
        """
        # ── MiMo (Vision + Audio) ──
        self._mimo_api_key = api_key or os.getenv("MIMO_API_KEY", "")
        self._mimo_base_url = base_url or os.getenv("MIMO_BASE_URL", MIMO_API_BASE)
        self._mimo_model = model or os.getenv("MIMO_MODEL", MIMO_MODEL)
        if not self._mimo_api_key:
            logger.warning("⚠️ MIMO_API_KEY not set - MiMo vision/audio unavailable")
            self._mimo_client = None
        else:
            self._mimo_client = AsyncOpenAI(
                api_key=self._mimo_api_key,
                base_url=self._mimo_base_url,
                timeout=120,
                # The key is also sent as an "api-key" header — presumably
                # required by MiMo's gateway alongside the SDK's own auth.
                default_headers={"api-key": self._mimo_api_key},
            )
            logger.info(
                f"✅ MiMo-V2-Omni client initialized for vision/audio "
                f"(model={self._mimo_model})"
            )

        # ── DeepSeek V4 (Brain: Tool Calling + Agentic) ──
        # Single client — both Flash and Pro share the same API key and base URL.
        # Flash handles fast tool-calling tasks; Pro handles deep agentic work.
        self._ds_api_key = os.getenv("DEEPSEEK_API_KEY", "")
        self._ds_base_url = os.getenv("DEEPSEEK_BASE_URL", DEEPSEEK_API_BASE)
        self._ds_flash_model = os.getenv("DEEPSEEK_FLASH_MODEL", DEEPSEEK_FLASH_MODEL)
        self._ds_pro_model = os.getenv("DEEPSEEK_PRO_MODEL", DEEPSEEK_PRO_MODEL)
        if not self._ds_api_key:
            # chat() never routes to MiMo; only chat_stream() falls back to it.
            logger.warning(
                "⚠️ DEEPSEEK_API_KEY not set - tool-calling chat will use "
                "Gemini only (if configured); chat_stream falls back to MiMo"
            )
            self._ds_client = None
        else:
            self._ds_client = AsyncOpenAI(
                api_key=self._ds_api_key,
                base_url=self._ds_base_url,
                timeout=120,
            )
            logger.info(
                f"✅ DeepSeek V4 initialized — "
                f"flash={self._ds_flash_model} (tool calling / classify / search), "
                f"pro={self._ds_pro_model} (agentic / listing / specialists)"
            )

        # ── Gemini (dormant fallback — Pro for premium tier, Flash for both) ──
        # Only a direct Google API key activates Gemini. (The OPENROUTER_*
        # module constants are not wired into a client here.) Until the key is
        # set, both Gemini clients stay None and are skipped by the chain.
        self._gemini_pro_client = None
        self._gemini_flash_client = None
        self._gemini_pro_model = GEMINI_PRO_MODEL
        self._gemini_flash_model = GEMINI_FLASH_MODEL
        gemini_direct_key = os.getenv("GEMINI_API_KEY", "")
        if gemini_direct_key:
            gemini_base = os.getenv("GEMINI_BASE_URL", GEMINI_API_BASE)
            shared = AsyncOpenAI(
                api_key=gemini_direct_key,
                base_url=gemini_base,
                timeout=120,
            )
            self._gemini_pro_client = shared
            self._gemini_flash_client = shared
            self._gemini_pro_model = os.getenv("GEMINI_PRO_MODEL", GEMINI_PRO_MODEL)
            self._gemini_flash_model = os.getenv("GEMINI_FLASH_MODEL", GEMINI_FLASH_MODEL)
            logger.info(
                f"✅ Gemini configured (direct Google API): "
                f"pro={self._gemini_pro_model}, flash={self._gemini_flash_model}"
            )
        else:
            logger.info(
                "ℹ️ Gemini unavailable (set GEMINI_API_KEY). "
                "Brain will use DeepSeek only."
            )

        # ── Resilience: per-provider circuit breakers ──
        # Thresholds come from CircuitBreaker's defaults (see llm_resilience).
        # Separate breakers for Flash and Pro so a Pro outage doesn't
        # block Flash calls, and vice-versa.
        self._ds_flash_breaker = CircuitBreaker(name="deepseek-v4-flash")
        self._ds_pro_breaker = CircuitBreaker(name="deepseek-v4-pro")
        self._gemini_pro_breaker = CircuitBreaker(name="gemini-pro")
        self._gemini_flash_breaker = CircuitBreaker(name="gemini-flash")
        self._mimo_breaker = CircuitBreaker(name="mimo")
        self._retry_policy = _DEFAULT_RETRY_POLICY

    # ── Backward-compat accessors — older code reads these directly ──
    @property
    def _client(self):
        """Legacy: the DeepSeek client if configured, else the MiMo client."""
        return self._ds_client or self._mimo_client

    @property
    def _model(self):
        """Legacy: V4-Flash when DeepSeek is configured, else the MiMo model."""
        return self._ds_flash_model if self._ds_client else self._mimo_model

    @property
    def _ds_model(self):
        """Legacy alias for the DeepSeek V4-Flash model id."""
        return self._ds_flash_model

    @property
    def is_available(self) -> bool:
        """True when at least one provider client (DeepSeek, Gemini, MiMo) exists.

        NOTE: a MiMo-only configuration reports True, yet ``chat()`` would
        still raise because MiMo is excluded from the tool-calling chain.
        """
        return (
            self._ds_client is not None
            or self._gemini_flash_client is not None
            or self._mimo_client is not None
        )

    # ============================================================
    # Core Chat — Text + Tool Calling via DeepSeek V4 (Gemini fallback)
    # ============================================================
    async def chat(
        self,
        messages: List[Dict[str, Any]],
        temperature: float = 0.7,
        max_tokens: int = 4096,
        tools: Optional[List[Dict]] = None,
        tool_choice: Optional[str] = None,
        tier: str = "fast",
    ) -> Dict[str, Any]:
        """Text chat + tool calling with tier-based routing and failover.

        Tiers:
        - ``"premium"``: agentic tasks, AI listing creation, specialist agents.
          Chain: V4-Pro → V4-Flash → Gemini Pro → Gemini Flash.
        - ``"fast"`` (default; any other value is treated the same way):
          tool calling, classification, search, alerts, booking.
          Chain: V4-Flash → Gemini Flash.

        Providers without a configured client are left out of the chain, so
        the Gemini slots stay dormant until GEMINI_API_KEY is set. MiMo is
        never part of this chain (see ``_build_chat_chain``).

        Each link is executed by ``call_with_chain`` under
        ``self._retry_policy`` and that provider's own circuit breaker.

        Returns:
            Dict with ``content``, ``tool_calls``, ``usage``, ``model`` and
            ``finish_reason`` (see ``_raw_chat``).

        Raises:
            RuntimeError: neither DeepSeek nor Gemini is configured.
            Whatever ``call_with_chain`` raises when every provider fails
            (presumably ``AllProvidersDown`` — confirm in llm_resilience).
        """
        chain = self._build_chat_chain(
            tier, messages, temperature, max_tokens, tools, tool_choice,
        )
        if not chain:
            raise RuntimeError(
                "No LLM client available for chat - check DEEPSEEK_API_KEY "
                "or GEMINI_API_KEY (MiMo is not used for tool-calling chat)"
            )
        result, provider = await call_with_chain(chain, policy=self._retry_policy)
        logger.info(
            f"✅ {provider} responded ({len(result.get('content') or '')} chars, "
            f"tier={tier}, tokens={(result.get('usage') or {}).get('total_tokens', '?')})"
        )
        return result

    def _build_chat_chain(
        self,
        tier: str,
        messages: List[Dict[str, Any]],
        temperature: float,
        max_tokens: int,
        tools: Optional[List[Dict]],
        tool_choice: Optional[str],
    ) -> list:
        """Build the ``(name, factory, breaker)`` chain for a given tier.

        Each factory is a zero-arg lambda that, when invoked, makes one
        attempt against its provider via ``_raw_chat``. The resilience layer
        applies retry per-factory and skips factories whose breakers are open.
        An empty list means no text provider is configured.
        """
        def _factory(client, model):
            return lambda: self._raw_chat(
                client, model, messages, temperature, max_tokens, tools, tool_choice,
            )

        chain: list = []
        # ── Primary: DeepSeek V4 ─────────────────────────────────────────────
        # Premium tier: V4-Pro leads (best agentic / specialist reasoning),
        # V4-Flash follows as its immediate fallback.
        # Fast tier: V4-Flash only (optimal for tool calling + classify).
        if self._ds_client is not None:
            if tier == "premium":
                chain.append((
                    f"DeepSeek-V4-Pro({self._ds_pro_model})",
                    _factory(self._ds_client, self._ds_pro_model),
                    self._ds_pro_breaker,
                ))
            chain.append((
                f"DeepSeek-V4-Flash({self._ds_flash_model})",
                _factory(self._ds_client, self._ds_flash_model),
                self._ds_flash_breaker,
            ))
        # ── Dormant fallback: Gemini ─────────────────────────────────────────
        # These slots are skipped while GEMINI_API_KEY is unset (clients = None).
        # Setting the key activates them — no code changes needed.
        if tier == "premium" and self._gemini_pro_client is not None:
            chain.append((
                f"Gemini-Pro({self._gemini_pro_model})",
                _factory(self._gemini_pro_client, self._gemini_pro_model),
                self._gemini_pro_breaker,
            ))
        if self._gemini_flash_client is not None:
            chain.append((
                f"Gemini-Flash({self._gemini_flash_model})",
                _factory(self._gemini_flash_client, self._gemini_flash_model),
                self._gemini_flash_breaker,
            ))
        # MiMo is intentionally NOT in this chain.
        # It is vision/audio only (_chat_mimo, understand_audio, transcribe_audio).
        # Adding it here causes it to receive tool schemas it can't handle and
        # return raw JSON blobs to users when DeepSeek/Gemini are unavailable.
        return chain

    # ============================================================
    # Streaming text chat — token-level deltas
    # ============================================================
    async def chat_stream(
        self,
        messages: List[Dict[str, Any]],
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ):
        """Yield text deltas from the brain LLM as they arrive.

        Routes to DeepSeek V4-Flash when configured, MiMo otherwise. Tool
        calling is not surfaced in the stream — use ``chat()`` for tool use.
        This method exists for v2 brain integration where the final response
        text is streamed token-by-token.

        Note: retry/circuit-breaker is intentionally NOT applied to the
        streaming path. Streams are stateful and partial output makes
        retry semantics ambiguous — the caller should handle stream errors.

        Raises:
            RuntimeError: neither DeepSeek nor MiMo is configured.
        """
        # Streaming uses V4-Flash — fast, low latency, ideal for token-level delivery
        client = self._ds_client or self._mimo_client
        model = self._ds_flash_model if self._ds_client else self._mimo_model
        if not client:
            raise RuntimeError(
                "No LLM client available - check DEEPSEEK_API_KEY or MIMO_API_KEY"
            )
        kwargs: Dict[str, Any] = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_completion_tokens": max_tokens,
            "stream": True,
        }
        stream = await client.chat.completions.create(**kwargs)
        async for chunk in stream:
            # Some chunks (e.g. a trailing usage-only chunk) can carry an empty
            # ``choices`` list; indexing [0] on those would raise IndexError.
            if not chunk.choices:
                continue
            content = getattr(chunk.choices[0].delta, "content", None)
            if content:
                yield content

    async def _raw_chat(
        self,
        client: AsyncOpenAI,
        model: str,
        messages: List[Dict[str, Any]],
        temperature: float,
        max_tokens: int,
        tools: Optional[List[Dict]],
        tool_choice: Optional[str],
    ) -> Dict[str, Any]:
        """Make a single completion call — no retry, no fallback.

        Invoked through the factories built by ``_build_chat_chain``.

        Returns:
            Dict with ``content`` (``""`` when the model returned none),
            ``tool_calls`` (SDK objects or None), ``usage`` (token counts plus
            ``duration_ms``; empty when the provider omits usage), ``model``
            and ``finish_reason``.
        """
        start_time = time.time()
        # NOTE(review): confirm every provider honours ``max_completion_tokens``
        # — some OpenAI-compatible APIs only document ``max_tokens``.
        kwargs: Dict[str, Any] = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_completion_tokens": max_tokens,
        }
        if tools:
            kwargs["tools"] = tools
            if tool_choice:
                kwargs["tool_choice"] = tool_choice
        response = await client.chat.completions.create(**kwargs)
        duration = time.time() - start_time
        choice = response.choices[0]
        content = choice.message.content or ""
        tool_calls = choice.message.tool_calls
        usage: Dict[str, Any] = {}
        if response.usage:
            usage = {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
                "duration_ms": int(duration * 1000),
            }
        return {
            "content": content,
            "tool_calls": tool_calls,
            "usage": usage,
            "model": model,
            "finish_reason": choice.finish_reason,
        }

    # ============================================================
    # Text-only convenience
    # ============================================================
    async def chat_text(
        self,
        messages: List[Dict[str, Any]],
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> str:
        """Run ``chat()`` on the fast tier and return just the response text."""
        result = await self.chat(messages, temperature, max_tokens)
        return result["content"]

    # ============================================================
    # Internal: MiMo-only chat (for vision/audio methods)
    # ============================================================
    async def _chat_mimo(
        self,
        messages: List[Dict[str, Any]],
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> Dict[str, Any]:
        """
        Send a (multimodal) chat request that ALWAYS goes to MiMo.

        Used by the vision and audio methods only; no retry or fallback.

        Returns:
            Dict with ``content``, ``tool_calls`` (always None), ``usage`` and
            ``finish_reason``.

        Raises:
            RuntimeError: MiMo is not configured.
        """
        if not self._mimo_client:
            raise RuntimeError(
                "MiMo client not initialized - check MIMO_API_KEY "
                "(required for vision/audio)"
            )
        start_time = time.time()
        kwargs = {
            "model": self._mimo_model,
            "messages": messages,
            "temperature": temperature,
            "max_completion_tokens": max_tokens,
        }
        response = await self._mimo_client.chat.completions.create(**kwargs)
        duration = time.time() - start_time
        choice = response.choices[0]
        content = choice.message.content or ""
        usage = {}
        if response.usage:
            usage = {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
                "duration_ms": int(duration * 1000),
            }
        logger.info(
            f"✅ MiMo vision/audio responded ({len(content)} chars, "
            f"{duration:.1f}s, tokens={usage.get('total_tokens', '?')})"
        )
        return {
            "content": content,
            "tool_calls": None,
            "usage": usage,
            "finish_reason": choice.finish_reason,
        }

    # ============================================================
    # Vision: Image Analysis (via MiMo)
    # ============================================================
    async def chat_with_image(
        self,
        text: str,
        image_url: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> str:
        """Analyze one image (URL or data URI) with a text prompt via MiMo."""
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({
            "role": "user",
            "content": [
                {"type": "text", "text": text},
                {
                    "type": "image_url",
                    "image_url": {"url": image_url},
                },
            ],
        })
        result = await self._chat_mimo(messages, temperature, max_tokens)
        return result["content"]

    async def chat_with_image_bytes(
        self,
        text: str,
        image_bytes: bytes,
        mime_type: str = "image/jpeg",
        system_prompt: Optional[str] = None,
    ) -> str:
        """Analyze an image from raw bytes (sent as a base64 data URI) via MiMo."""
        b64 = base64.b64encode(image_bytes).decode("utf-8")
        data_uri = f"data:{mime_type};base64,{b64}"
        return await self.chat_with_image(text, data_uri, system_prompt)

    async def chat_with_multiple_images(
        self,
        text: str,
        images: List[Tuple[bytes, str]],
        system_prompt: Optional[str] = None,
    ) -> str:
        """Analyze several ``(bytes, mime_type)`` images in one MiMo request."""
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        content_parts = [{"type": "text", "text": text}]
        for image_bytes, mime_type in images:
            b64 = base64.b64encode(image_bytes).decode("utf-8")
            data_uri = f"data:{mime_type};base64,{b64}"
            content_parts.append({
                "type": "image_url",
                "image_url": {"url": data_uri},
            })
        messages.append({"role": "user", "content": content_parts})
        result = await self._chat_mimo(messages)
        return result["content"]

    # ============================================================
    # Vision: Video Analysis (via MiMo)
    # ============================================================
    async def chat_with_video_url(
        self,
        text: str,
        video_url: str,
        system_prompt: Optional[str] = None,
    ) -> str:
        """Analyze a video (URL or data URI) with a text prompt via MiMo."""
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({
            "role": "user",
            "content": [
                {"type": "text", "text": text},
                {
                    "type": "video_url",
                    "video_url": {"url": video_url},
                },
            ],
        })
        result = await self._chat_mimo(messages, max_tokens=4096)
        return result["content"]

    async def chat_with_video_bytes(
        self,
        text: str,
        video_bytes: bytes,
        mime_type: str = "video/mp4",
        system_prompt: Optional[str] = None,
    ) -> str:
        """Analyze a video from raw bytes (sent as a base64 data URI) via MiMo."""
        b64 = base64.b64encode(video_bytes).decode("utf-8")
        data_uri = f"data:{mime_type};base64,{b64}"
        return await self.chat_with_video_url(text, data_uri, system_prompt)

    # ============================================================
    # Audio Understanding (via MiMo — replaces Whisper STT)
    # ============================================================
    @staticmethod
    def _try_convert_audio_to_wav(audio_bytes: bytes) -> Optional[bytes]:
        """Transcode audio to 16 kHz mono WAV with ffmpeg.

        Returns the WAV bytes, or ``None`` when ffmpeg is missing, exits
        non-zero, or exceeds its 30 s timeout. This is blocking: call it via
        ``asyncio.to_thread`` from async code.
        """
        import subprocess

        with tempfile.NamedTemporaryFile(suffix=".input", delete=False) as inp, \
                tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as out:
            inp.write(audio_bytes)
            inp.flush()
            inp_path, out_path = inp.name, out.name
        try:
            result = subprocess.run(
                ["ffmpeg", "-y", "-i", inp_path, "-ar", "16000", "-ac", "1", "-f", "wav", out_path],
                capture_output=True,
                timeout=30,
            )
            if result.returncode != 0:
                # ffmpeg prints its version banner first; the real error is at the end.
                stderr_tail = result.stderr[-200:].decode("utf-8", errors="replace")
                logger.warning(f"ffmpeg conversion failed: {stderr_tail}")
                return None
            with open(out_path, "rb") as f:
                return f.read()
        except FileNotFoundError:
            logger.warning("ffmpeg not found, sending original audio bytes")
            return None
        except subprocess.TimeoutExpired:
            logger.warning("ffmpeg conversion timed out, sending original audio bytes")
            return None
        finally:
            # Both temp files were created with delete=False; always clean up.
            for path in (inp_path, out_path):
                try:
                    os.unlink(path)
                except OSError:
                    pass

    @staticmethod
    def _convert_audio_to_wav(audio_bytes: bytes) -> bytes:
        """Convert any audio format to 16 kHz mono WAV using ffmpeg.

        Kept for backward compatibility: returns the ORIGINAL bytes unchanged
        when conversion is impossible. Use ``_try_convert_audio_to_wav`` when
        the caller must know whether conversion actually happened.
        """
        converted = MiMoClient._try_convert_audio_to_wav(audio_bytes)
        return audio_bytes if converted is None else converted

    @staticmethod
    def _audio_format_from_mime(mime: str) -> Optional[str]:
        """Map a MIME type such as ``audio/mpeg; codecs=x`` to a short format name.

        Returns None when *mime* has no ``type/subtype`` shape.
        """
        base = mime.split(";", 1)[0].strip().lower()
        _, sep, subtype = base.partition("/")
        if not sep or not subtype:
            return None
        return MiMoClient._AUDIO_MIME_ALIASES.get(subtype, subtype)

    async def _fetch_audio_as_base64(self, audio_url: str) -> Tuple[str, str]:
        """
        Load audio from a data URI or URL and return ``(base64_data, format)``.

        WAV data URIs are passed through unchanged. Other data URIs and all
        downloaded audio are transcoded to 16 kHz mono WAV in a worker thread
        (ffmpeg is a blocking subprocess). If transcoding is impossible, the
        original bytes are returned labelled with their ORIGINAL format (from
        the data-URI MIME type or an ``audio/*`` Content-Type), falling back
        to "wav" only when that format is unknown.
        """
        # Already a data URI
        if audio_url.startswith("data:"):
            header, b64_data = audio_url.split(",", 1)
            fmt = self._audio_format_from_mime(header[len("data:"):])
            if fmt == "wav":
                return b64_data, fmt
            raw_bytes = base64.b64decode(b64_data)
            wav_bytes = await asyncio.to_thread(self._try_convert_audio_to_wav, raw_bytes)
            if wav_bytes is None:
                return b64_data, fmt or "wav"
            return base64.b64encode(wav_bytes).decode("utf-8"), "wav"

        # Download from URL
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.get(audio_url)
            resp.raise_for_status()
            raw_bytes = resp.content
            content_type = resp.headers.get("content-type", "")
        logger.info(f"🔊 Fetched audio: {len(raw_bytes)} bytes")

        # Convert to WAV for maximum MiMo compatibility
        wav_bytes = await asyncio.to_thread(self._try_convert_audio_to_wav, raw_bytes)
        if wav_bytes is None:
            fmt = None
            if content_type.strip().lower().startswith("audio/"):
                fmt = self._audio_format_from_mime(content_type)
            return base64.b64encode(raw_bytes).decode("utf-8"), fmt or "wav"
        b64_data = base64.b64encode(wav_bytes).decode("utf-8")
        logger.info(f"🔊 Converted to WAV: {len(wav_bytes)} bytes")
        return b64_data, "wav"

    async def understand_audio(
        self,
        audio_url: str,
        prompt: str = "Listen to this audio and respond appropriately. If the user is speaking, understand their intent and respond.",
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> str:
        """
        Understand raw audio directly via MiMo — no transcription step needed.

        *audio_url* may be an http(s) URL or a data URI; see
        ``_fetch_audio_as_base64`` for how it is loaded and encoded.
        """
        b64_data, audio_format = await self._fetch_audio_as_base64(audio_url)
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {
                    "type": "input_audio",
                    "input_audio": {
                        "data": b64_data,
                        "format": audio_format,
                    },
                },
            ],
        })
        result = await self._chat_mimo(messages, temperature, max_tokens)
        return result["content"]

    async def understand_audio_bytes(
        self,
        audio_bytes: bytes,
        mime_type: str = "audio/wav",
        prompt: str = "Listen to this audio and respond appropriately.",
        system_prompt: Optional[str] = None,
    ) -> str:
        """Understand audio from raw bytes (wrapped in a data URI) via MiMo."""
        b64 = base64.b64encode(audio_bytes).decode("utf-8")
        data_uri = f"data:{mime_type};base64,{b64}"
        return await self.understand_audio(data_uri, prompt, system_prompt)

    async def transcribe_audio(
        self,
        audio_url: str,
    ) -> Tuple[str, str]:
        """
        Transcribe audio and detect its language via MiMo (Whisper replacement).

        The model is asked for a JSON object; if that cannot be parsed, the
        whole stripped response is returned as the transcript with "en".

        Returns:
            Tuple of (transcript, detected_language_code)
        """
        prompt = """Listen to this audio carefully and respond with ONLY a JSON object:
{
"transcript": "exact words spoken by the user",
"language": "two-letter language code (en, fr, es, pt, ar, etc.)"
}
Rules:
- Transcribe EXACTLY what was said, word for word
- Detect the language accurately
- Return ONLY the JSON, no other text"""
        response = await self.understand_audio(
            audio_url=audio_url,
            prompt=prompt,
            temperature=0.3,
        )
        # Parse JSON response (string-aware, brace-balanced). Best-effort:
        # any parse failure falls through to the raw-response fallback below.
        try:
            from app.ai.agent.json_utils import extract_json_object
            data = extract_json_object(response)
        except Exception:
            logger.debug("Transcription JSON extraction failed", exc_info=True)
            data = None
        if isinstance(data, dict) and data:
            transcript = data.get("transcript", response)
            language = data.get("language") or "en"
            return transcript, language
        # Fallback: treat entire response as transcript
        logger.warning("Could not parse transcription JSON, using raw response")
        return response.strip(), "en"

    # ============================================================
    # Multimodal: Audio + Image combined (via MiMo)
    # ============================================================
    async def chat_multimodal(
        self,
        text: str,
        images: Optional[List[str]] = None,
        audio_url: Optional[str] = None,
        video_url: Optional[str] = None,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
    ) -> str:
        """Send one MiMo request combining text with any images, audio and video.

        Parts are added in the order text, images, audio, video.
        """
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        content_parts = [{"type": "text", "text": text}]
        if images:
            for img_url in images:
                content_parts.append({
                    "type": "image_url",
                    "image_url": {"url": img_url},
                })
        if audio_url:
            b64_data, audio_format = await self._fetch_audio_as_base64(audio_url)
            content_parts.append({
                "type": "input_audio",
                "input_audio": {
                    "data": b64_data,
                    "format": audio_format,
                },
            })
        if video_url:
            content_parts.append({
                "type": "video_url",
                "video_url": {"url": video_url},
            })
        messages.append({"role": "user", "content": content_parts})
        result = await self._chat_mimo(messages, temperature, max_tokens)
        return result["content"]
# ============================================================
# Global Singleton
# ============================================================
_mimo_client: Optional[MiMoClient] = None


def get_mimo_client() -> MiMoClient:
    """Return the process-wide MiMoClient, constructing it on first use.

    The instance routes text to DeepSeek/Gemini and multimodal work to MiMo;
    every caller shares the same clients and circuit breakers.
    """
    global _mimo_client
    instance = _mimo_client
    if instance is None:
        instance = _mimo_client = MiMoClient()
    return instance