""" Multi-Model LLM Client (Python) — All FREE on NVIDIA NIM 3 models, 1 provider, 1 API key, $0 cost: 1. MiniMax M2.7 → Best reasoning, 4M context, built-in CoT 2. LLaMA 3.3 70B → Reliable fallback 3. LLaMA 3.1 8B → Fast, simple tasks 4. Deterministic → Zero LLM fallback """ import time import json import hashlib import logging from typing import Optional from openai import AsyncOpenAI from config import settings logger = logging.getLogger(__name__) # ─── Model configs (ALL on NVIDIA NIM) ─────────────────────── MODEL_CONFIGS = [ { "name": "MiniMax M2.7", "model": "minimaxai/minimax-m2.7", "max_context": 4_000_000, "best_for": "profiling, scoring, complex reasoning", }, { "name": "LLaMA 3.3 70B", "model": "meta/llama-3.3-70b-instruct", "max_context": 128_000, "best_for": "general tasks, reliable fallback", }, { "name": "LLaMA 3.1 8B", "model": "meta/llama-3.1-8b-instruct", "max_context": 128_000, "best_for": "email classification, simple checks", }, ] # ─── Shared client (single provider) ───────────────────────── _client: Optional[AsyncOpenAI] = None def get_client() -> AsyncOpenAI: global _client if _client is None: _client = AsyncOpenAI( base_url=settings.NVIDIA_NIM_BASE_URL, api_key=settings.NVIDIA_API_KEY, ) return _client # ─── Main LLM call ─────────────────────────────────────────── async def call_llm( operation: str, system_prompt: str, user_prompt: str, model_index: int = 0, temperature: float = 0.2, max_tokens: int = 1024, json_mode: bool = True, trace_id: str = "", company_id: str = None, ) -> dict: """Call LLM with fallback: MiniMax → LLaMA 70B → LLaMA 8B → Deterministic""" if model_index >= len(MODEL_CONFIGS): logger.error(f"ALL models failed for {operation} — deterministic fallback") return _deterministic_fallback() config = MODEL_CONFIGS[model_index] client = get_client() start = time.time() try: kwargs = { "model": config["model"], "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], 
"temperature": temperature, "max_tokens": max_tokens, "top_p": 0.9, } if json_mode: kwargs["response_format"] = {"type": "json_object"} response = await client.chat.completions.create(**kwargs) message = response.choices[0].message content = message.content or "" reasoning = getattr(message, "reasoning_content", None) usage = response.usage latency_ms = int((time.time() - start) * 1000) parsed = _safe_parse_json(content) if json_mode else None if json_mode and parsed is None: logger.warning(f"JSON parse failed on {config['name']} — next model") return await call_llm(operation, system_prompt, user_prompt, model_index + 1, temperature, max_tokens, json_mode, trace_id, company_id) result = { "content": content, "reasoning": reasoning, "parsed": parsed, "model": config["name"], "provider": "nvidia", "tokens": { "prompt": usage.prompt_tokens if usage else 0, "completion": usage.completion_tokens if usage else 0, "total": usage.total_tokens if usage else 0, }, "latency_ms": latency_ms, "fallback_used": False, } if reasoning: logger.debug(f"MiniMax reasoning: {reasoning[:150]}...") await _log_trace(trace_id, operation, config["name"], result, True, company_id) return result except Exception as e: error_msg = str(e) if "429" in error_msg: logger.warning(f"Rate limited on {config['name']} — waiting 10s") await _async_sleep(10) return await call_llm(operation, system_prompt, user_prompt, model_index, temperature, max_tokens, json_mode, trace_id, company_id) logger.warning(f"{config['name']} failed ({error_msg[:80]}) — next model") return await call_llm(operation, system_prompt, user_prompt, model_index + 1, temperature, max_tokens, json_mode, trace_id, company_id) def _deterministic_fallback() -> dict: return { "content": "", "reasoning": None, "parsed": None, "model": "deterministic_fallback", "provider": "none", "tokens": {"prompt": 0, "completion": 0, "total": 0}, "latency_ms": 0, "fallback_used": True, } # ─── Self-consistency check ────────────────────────────────── 
async def call_with_consistency(
    operation: str,
    system_prompt: str,
    user_prompt: str,
    trace_id: str = "",
    company_id: Optional[str] = None,
) -> dict:
    """Call the LLM and attach a self-consistency estimate to its result.

    Only ``"profile"`` and ``"score"`` operations may get a second call, at a
    higher temperature. The two parsed outputs are then compared with
    ``_compare_outputs``. The second call is skipped when the primary result
    is the deterministic fallback, or when MiniMax answered with reasoning.

    Returns:
        The primary ``call_llm`` result plus ``is_consistent`` (bool) and
        ``consistency_score`` (float between 0 and 1).
    """
    primary = await call_llm(operation, system_prompt, user_prompt, temperature=0.1,
                             trace_id=trace_id, company_id=company_id)

    # Only scoring-style operations are worth a second opinion.
    if operation not in ("profile", "score"):
        return {**primary, "is_consistent": True, "consistency_score": 1.0}

    # Every model failed: nothing meaningful to compare.
    if primary.get("fallback_used"):
        return {**primary, "is_consistent": True, "consistency_score": 0.5}

    # MiniMax with reasoning = inherently more consistent; skip the second call.
    if primary.get("model") == "MiniMax M2.7" and primary.get("reasoning"):
        return {**primary, "is_consistent": True, "consistency_score": 0.95}

    secondary = await call_llm(operation, system_prompt, user_prompt, temperature=0.4,
                               trace_id=trace_id, company_id=company_id)
    score = _compare_outputs(primary.get("parsed"), secondary.get("parsed"))
    return {**primary, "is_consistent": score >= 0.75, "consistency_score": score}


# Fields that must match exactly for two outputs to agree.
_CATEGORICAL_KEYS = ("ai_readiness", "tier", "service_match")
# Numeric fields that agree when within _NUMERIC_TOLERANCE of each other.
_NUMERIC_KEYS = ("total_score", "company_fit")
_NUMERIC_TOLERANCE = 10


def _compare_outputs(a: Optional[dict], b: Optional[dict]) -> float:
    """Return the fraction of comparable fields on which *a* and *b* agree.

    Returns 0.5 (neutral) when either side is missing, empty or not a dict.
    Returns 1.0 when the two share no comparable field.
    """
    # Guards against parsed JSON that is not an object (list/number), which
    # would otherwise crash on ``key in a`` / ``a.get``.
    if not isinstance(a, dict) or not isinstance(b, dict) or not a or not b:
        return 0.5

    matches = 0
    total = 0
    for key in _CATEGORICAL_KEYS:
        if key in a and key in b:
            total += 1
            if a[key] == b[key]:
                matches += 1
    for key in _NUMERIC_KEYS:
        av = a.get(key)
        bv = b.get(key)
        if isinstance(av, (int, float)) and isinstance(bv, (int, float)):
            total += 1
            if abs(av - bv) <= _NUMERIC_TOLERANCE:
                matches += 1
    return matches / total if total > 0 else 1.0


# ─── Helpers ─────────────────────────────────────────────────
def _loads_object(candidate: str) -> Optional[dict]:
    """Parse *candidate* as JSON and return it only if it is a JSON object."""
    try:
        value = json.loads(candidate)
    except json.JSONDecodeError:
        return None
    return value if isinstance(value, dict) else None


def _safe_parse_json(text: str) -> Optional[dict]:
    """Best-effort extraction of a JSON object from raw model output.

    Tries, in order:
      1. the whole (stripped) text;
      2. the contents of the first Markdown code fence, if any;
      3. the widest ``{...}`` span in what remains.

    Returns:
        The parsed dict, or None if no JSON object could be recovered.
        JSON arrays and scalars are rejected.
    """
    import re

    if not text:
        return None
    content = text.strip()

    # Plain JSON first, so "```" appearing inside a string value is harmless.
    parsed = _loads_object(content)
    if parsed is not None:
        return parsed

    if "```json" in content:
        content = content.split("```json")[1].split("```")[0].strip()
    elif "```" in content:
        content = content.split("```")[1].split("```")[0].strip()

    parsed = _loads_object(content)
    if parsed is not None:
        return parsed

    # Greedy: from the first "{" to the last "}" (handles prose around JSON).
    match = re.search(r'\{[\s\S]*\}', content)
    return _loads_object(match.group()) if match else None


async def _log_trace(
    trace_id: str,
    operation: str,
    model: str,
    result: Optional[dict],
    success: bool,
    company_id: Optional[str],
) -> None:
    """Best-effort insert of one row into the Supabase ``llm_traces`` table.

    Never raises. Any failure, including a non-2xx HTTP response, is only
    logged at DEBUG level.
    """
    try:
        import httpx

        url = f"{settings.SUPABASE_URL}/rest/v1/llm_traces"
        headers = {
            "apikey": settings.SUPABASE_SERVICE_ROLE_KEY,
            "Authorization": f"Bearer {settings.SUPABASE_SERVICE_ROLE_KEY}",
            "Content-Type": "application/json",
            "Prefer": "return=minimal",
        }
        payload = {
            "trace_id": trace_id,
            "operation": operation,
            "model": model,
            "provider": "nvidia",
            "prompt_tokens": result["tokens"]["prompt"] if result else 0,
            "completion_tokens": result["tokens"]["completion"] if result else 0,
            "total_tokens": result["tokens"]["total"] if result else 0,
            "latency_ms": result.get("latency_ms", 0) if result else 0,
            "success": success,
            "fallback_used": result.get("fallback_used", True) if result else True,
            "company_id": company_id,
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(url, json=payload, headers=headers)
            # Surface rejected inserts (4xx/5xx) instead of silently passing.
            response.raise_for_status()
    except Exception as e:
        logger.debug(f"Trace log failed (non-critical): {e}")


async def _async_sleep(seconds: float) -> None:
    """Asynchronously sleep for *seconds* (thin wrapper over ``asyncio.sleep``)."""
    import asyncio
    await asyncio.sleep(seconds)