Nuclear-Intelligence / core /llm_engine.py
QalamHipHop
Fix: Update AIMLAPI key and optimize thresholds for HF Spaces
c12d3ca
Raw
History Blame Contribute Delete
44.1 kB
"""
Nuclear Intelligence v4.0 - Ultra-LLM Engine ⚡
═══════════════════════════════════════════════════════════════════
Multi-Provider LLM with FREE providers chain:
DeepSeek → Groq → Gemini → Together → Fireworks → Cerebras → OpenRouter → Cloudflare → HuggingFace
Features:
- Intelligent routing with automatic fallback
- Response caching (LRU)
- Per-provider rate limiting
- Health monitoring & automatic degradation
- HuggingFace Inference API + Hub support
- Structured JSON completion
- Token usage tracking
═══════════════════════════════════════════════════════════════════
"""
import os
import json
import time
import hashlib
import asyncio
import threading
from typing import List, Dict, Any, Optional, Tuple, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from loguru import logger
from collections import defaultdict
import re
# ─── Provider Definitions ───────────────────────────────────────
@dataclass
class LLMProvider:
name: str
api_key_env: str
api_base: str
default_model: str
priority: int
enabled: bool = True
rate_limit_rpm: int = 60
supports_structured: bool = True
max_tokens: int = 4096
context_window: int = 8192
description: str = ""
requires_account_id: bool = False
is_free_tier: bool = True
PROVIDERS: Dict[str, LLMProvider] = {
# ─── Tier 1: Best Free Models ⭐⭐⭐ ─────────────────────────
"deepseek": LLMProvider(
"DeepSeek V3", "DEEPSEEK_API_KEY",
"https://api.deepseek.com/v1",
"deepseek-chat", 1,
rate_limit_rpm=60, max_tokens=64000, context_window=128000,
description="🟢 DeepSeek V3 - Best free model, 128K context"
),
"groq": LLMProvider(
"Groq LPU", "GROQ_API_KEY",
"https://api.groq.com/openai/v1",
"llama-3.3-70b-versatile", 2,
rate_limit_rpm=60, max_tokens=8192, context_window=128000,
description="⚡ Groq - Fastest inference, Llama 3.3 70B"
),
"cerebras": LLMProvider(
"Cerebras", "CEREBRAS_API_KEY",
"https://api.cerebras.ai/v1",
"llama-3.3-70b", 3,
rate_limit_rpm=60, max_tokens=4096, context_window=8192,
description="🔵 Cerebras - Ultra-fast inference, free tier"
),
# ─── Tier 2: Good Free Models ⭐⭐ ─────────────────────────
"gemini": LLMProvider(
"Google Gemini", "GEMINI_API_KEY",
"https://generativelanguage.googleapis.com/v1beta",
"gemini-2.0-flash", 4,
rate_limit_rpm=60, max_tokens=8192, context_window=1000000,
description="🔵 Gemini 2.0 Flash - Google's best free model"
),
"fireworks": LLMProvider(
"Fireworks AI", "FIREWORKS_API_KEY",
"https://api.fireworks.ai/inference/v1",
"accounts/fireworks/models/deepseek-v3-0324", 5,
rate_limit_rpm=30, max_tokens=32000, context_window=128000,
description="🟣 Fireworks AI - DeepSeek V3, fast inference"
),
"together": LLMProvider(
"Together AI", "TOGETHER_API_KEY",
"https://api.together.xyz/v1",
"meta-llama/Llama-3.3-70B-Instruct-Turbo", 6,
rate_limit_rpm=30, max_tokens=4096, context_window=128000,
description="🟡 Together - Llama 3.3 70B, good quality"
),
"novita": LLMProvider(
"Novita AI", "NOVITA_API_KEY",
"https://api.novita.ai/v3",
"deepseek-ai/DeepSeek-V3", 7,
rate_limit_rpm=30, max_tokens=64000, context_window=128000,
description="🟣 Novita - DeepSeek V3, 128K context"
),
# ─── Tier 3: Fallback Models ⭐ ─────────────────────────────
"openrouter": LLMProvider(
"OpenRouter", "OPENROUTER_API_KEY",
"https://openrouter.ai/api/v1",
"deepseek/deepseek-chat-v3:free", 8,
rate_limit_rpm=40, max_tokens=4096, context_window=64000,
description="🟠 OpenRouter - Multiple free models"
),
"cloudflare": LLMProvider(
"Cloudflare AI", "CLOUDFLARE_API_KEY",
"https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/run",
"@cf/meta/llama-3.1-8b-instruct", 9,
rate_limit_rpm=1000, max_tokens=4096, context_window=8192,
description="⚪ Cloudflare - Workers AI, free daily",
requires_account_id=True
),
# ─── Tier 4: HuggingFace Inference ⭐ ───────────────────────
"huggingface": LLMProvider(
"HuggingFace", "HF_TOKEN",
"https://api-inference.huggingface.co/models",
"Qwen/Qwen2.5-72B-Instruct", 10,
rate_limit_rpm=30, max_tokens=2048, context_window=32768,
description="🟤 HuggingFace Inference API - Your token active!"
),
"aimlapi": LLMProvider(
"AIMLAPI", "AIMLAPI_API_KEY",
"https://api.aimlapi.com/v1",
"gpt-4o", 0,
rate_limit_rpm=100, max_tokens=4096, context_window=128000,
description="🔵 AIMLAPI - GPT-4o Power"
),
}
# ─── LRU Cache ──────────────────────────────────────────────────
class LRUCache:
"""Thread-safe LRU cache for LLM responses"""
def __init__(self, max_size: int = 200):
self.cache: Dict[str, Tuple[Any, datetime]] = {}
self.max_size = max_size
self.hits = 0
self.misses = 0
self._lock = threading.Lock()
def _make_key(self, prompt: str, model: str, temperature: float) -> str:
# Normalize prompt to first 500 chars for cache key
normalized = prompt[:500].strip().lower()
return hashlib.sha256(f"{normalized}:{model}:{temperature:.2f}".encode()).hexdigest()
def get(self, prompt: str, model: str, temperature: float) -> Optional[Any]:
key = self._make_key(prompt, model, temperature)
with self._lock:
if key in self.cache:
value, timestamp = self.cache[key]
if datetime.now() - timestamp < timedelta(hours=2):
self.hits += 1
return value
del self.cache[key]
self.misses += 1
return None
def set(self, prompt: str, model: str, temperature: float, value: Any):
key = self._make_key(prompt, model, temperature)
with self._lock:
if len(self.cache) >= self.max_size:
oldest_key = min(self.cache.keys(), key=lambda k: self.cache[k][1])
del self.cache[oldest_key]
self.cache[key] = (value, datetime.now())
def stats(self) -> Dict:
total = self.hits + self.misses
return {
"hits": self.hits,
"misses": self.misses,
"hit_rate": f"{(self.hits / max(total, 1) * 100):.1f}%",
"size": len(self.cache),
"max_size": self.max_size,
}
def clear(self):
with self._lock:
self.cache.clear()
self.hits = 0
self.misses = 0
# ─── Rate Limiter ────────────────────────────────────────────────
class RateLimiter:
"""Per-provider rate limiting with sliding window"""
def __init__(self):
self.counters: Dict[str, List[datetime]] = defaultdict(list)
self._lock = threading.Lock()
def can_request(self, provider: str, rpm: int) -> bool:
now = datetime.now()
with self._lock:
self.counters[provider] = [
t for t in self.counters[provider] if now - t < timedelta(minutes=1)
]
return len(self.counters[provider]) < rpm
def record(self, provider: str):
now = datetime.now()
with self._lock:
self.counters[provider].append(now)
def get_status(self, provider: str, rpm: int) -> Dict:
now = datetime.now()
with self._lock:
recent = [t for t in self.counters[provider] if now - t < timedelta(minutes=1)]
return {
"requests_in_window": len(recent),
"limit": rpm,
"available": rpm - len(recent),
"percentage": f"{(len(recent) / max(rpm, 1) * 100):.0f}%",
}
def clear(self, provider: str = None):
with self._lock:
if provider:
self.counters[provider] = []
else:
self.counters.clear()
# ─── Main LLM Engine ─────────────────────────────────────────────
class LLMEngine:
"""Ultra-advanced multi-provider LLM engine with caching, rate limiting, and intelligent routing"""
def __init__(
self,
provider_chain: Optional[List[str]] = None,
default_temperature: float = 0.7,
enable_caching: bool = True,
enable_rate_limiting: bool = True,
):
# Priority order: Best free models first
self.provider_chain = provider_chain or [
"aimlapi", "deepseek", "groq", "cerebras", "gemini", "fireworks",
"together", "novita", "openrouter", "cloudflare", "huggingface"
]
self._current_provider_name = "aimlapi" if "aimlapi" in self.provider_chain else None
self.default_temperature = default_temperature
self.enable_caching = enable_caching
self.enable_rate_limiting = enable_rate_limiting
self._available_providers: List[str] = []
self._stats: Dict[str, Any] = {
"requests": 0, "successes": 0, "failures": 0,
"by_provider": defaultdict(int),
"total_tokens_used": 0,
"cache_stats": {},
}
self._current_provider_name: Optional[str] = None
self._last_error: Optional[str] = None
self._provider_health: Dict[str, Dict] = {}
self.cache = LRUCache(max_size=200)
self.rate_limiter = RateLimiter()
self._init_providers()
def _init_providers(self):
"""Check which providers have valid API keys"""
# Explicitly set AIMLAPI key if not in environment
if not os.getenv("AIMLAPI_API_KEY"):
os.environ["AIMLAPI_API_KEY"] = "bd510ec538561ec582dc003b6070cf6d"
logger.info("🔍 Checking LLM providers...")
available = []
for name in self.provider_chain:
if name not in PROVIDERS:
continue
provider = PROVIDERS[name]
api_key = os.getenv(provider.api_key_env, "").strip()
# Skip placeholder/invalid keys
if not api_key or api_key in ("", "placeholder", "your_key_here", "hf_placeholder"):
logger.debug(f"⏭️ {provider.name} - No API key")
continue
# Skip if requires account_id but it's missing
if provider.requires_account_id:
account_id = os.getenv("CLOUDFLARE_ACCOUNT_ID", "").strip()
if not account_id:
logger.debug(f"⏭️ {provider.name} - No account ID")
continue
# Validate key format
if self._validate_key(name, api_key):
available.append(name)
self._provider_health[name] = {
"status": "available",
"last_used": None,
"failures": 0,
"avg_latency": 0,
"total_calls": 0,
}
logger.info(f"✅ {provider.description}")
else:
logger.warning(f"⚠️ {provider.name} - Invalid API key format")
self._available_providers = available
# If no providers, try HF token as fallback
if not self._available_providers:
hf_key = os.getenv("HF_TOKEN", "").strip()
if hf_key and (hf_key.startswith("hf_") or len(hf_key) > 20):
self._available_providers = ["huggingface"]
logger.info("🔄 Falling back to HuggingFace inference API")
else:
logger.error("❌ No LLM providers available! Add at least one API key to .env")
else:
logger.info(f"🎯 {len(self._available_providers)} providers ready: {', '.join(self._available_providers)}")
def _validate_key(self, provider: str, api_key: str) -> bool:
"""Provider-specific key format validation"""
if len(api_key) < 10:
return False
if provider == "groq" and not api_key.startswith("gsk_"):
return False
if provider == "deepseek" and not (api_key.startswith("sk-") or api_key.startswith("ghp_")):
return False
if provider == "huggingface" and not api_key.startswith("hf_"):
return False
if provider == "aimlapi" and len(api_key) < 20:
return False
return True
@property
def _current_provider(self):
return self._current_provider_name
def _get_best_provider(self) -> Optional[str]:
"""Get the best available provider based on priority and health"""
for name in self._available_providers:
health = self._provider_health.get(name, {})
if health.get("failures", 0) < 3:
return name
return self._available_providers[0] if self._available_providers else None
# ─── Provider-specific API Calls ───────────────────────────
def _call_deepseek(self, messages: List[Dict], model: str, temperature: float, max_tokens: int) -> Optional[Dict]:
"""DeepSeek API - Best free model with 128K context"""
try:
import requests
provider = PROVIDERS["deepseek"]
api_key = os.getenv(provider.api_key_env)
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
payload = {
"model": model or provider.default_model,
"messages": messages,
"temperature": temperature,
"max_tokens": min(max_tokens, provider.max_tokens),
"stream": False
}
resp = requests.post(
f"{provider.api_base}/chat/completions",
headers=headers, json=payload, timeout=180
)
if resp.status_code == 200:
data = resp.json()
usage = data.get("usage", {})
return {
"content": data["choices"][0]["message"]["content"],
"provider": "deepseek",
"model": data.get("model"),
"usage": {
"prompt_tokens": usage.get("prompt_tokens", 0),
"completion_tokens": usage.get("completion_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
},
"finish_reason": data["choices"][0].get("finish_reason"),
}
elif resp.status_code == 429:
self._provider_health["deepseek"]["failures"] = self._provider_health.get("deepseek", {}).get("failures", 0) + 1
logger.warning("⏳ DeepSeek rate limited")
else:
logger.warning(f"⚠️ DeepSeek error: {resp.status_code} - {resp.text[:100]}")
except Exception as e:
logger.warning(f"⚠️ DeepSeek call failed: {e}")
return None
def _call_groq(self, messages: List[Dict], model: str, temperature: float, max_tokens: int) -> Optional[Dict]:
"""Groq API - Fastest inference"""
try:
from openai import OpenAI
provider = PROVIDERS["groq"]
api_key = os.getenv(provider.api_key_env)
client = OpenAI(api_key=api_key, base_url=provider.api_base)
response = client.chat.completions.create(
model=model or provider.default_model,
messages=messages,
temperature=temperature,
max_tokens=min(max_tokens, provider.max_tokens),
)
usage = response.usage or {}
return {
"content": response.choices[0].message.content,
"provider": "groq",
"model": response.model,
"usage": {
"prompt_tokens": usage.prompt_tokens or 0,
"completion_tokens": usage.completion_tokens or 0,
"total_tokens": usage.total_tokens or 0,
},
"finish_reason": response.choices[0].finish_reason,
}
except Exception as e:
logger.warning(f"⚠️ Groq failed: {e}")
return None
def _call_cerebras(self, messages: List[Dict], model: str, temperature: float, max_tokens: int) -> Optional[Dict]:
"""Cerebras API - Ultra-fast inference"""
try:
from openai import OpenAI
provider = PROVIDERS["cerebras"]
api_key = os.getenv(provider.api_key_env)
client = OpenAI(api_key=api_key, base_url=provider.api_base)
response = client.chat.completions.create(
model=model or provider.default_model,
messages=messages,
temperature=temperature,
max_tokens=min(max_tokens, provider.max_tokens),
)
usage = response.usage or {}
return {
"content": response.choices[0].message.content,
"provider": "cerebras",
"model": response.model,
"usage": {
"prompt_tokens": usage.prompt_tokens or 0,
"completion_tokens": usage.completion_tokens or 0,
"total_tokens": usage.total_tokens or 0,
},
"finish_reason": response.choices[0].finish_reason,
}
except Exception as e:
logger.warning(f"⚠️ Cerebras failed: {e}")
return None
def _call_gemini(self, messages: List[Dict], model: str, temperature: float, max_tokens: int) -> Optional[Dict]:
"""Google Gemini API"""
try:
import requests
provider = PROVIDERS["gemini"]
api_key = os.getenv(provider.api_key_env)
actual_model = model or provider.default_model
contents = []
for msg in messages:
role = "model" if msg["role"] == "assistant" else "user"
contents.append({"role": role, "parts": [{"text": msg["content"]}]})
payload = {
"contents": contents,
"generationConfig": {
"temperature": temperature,
"maxOutputTokens": min(max_tokens, 8192),
}
}
resp = requests.post(
f"{provider.api_base}/models/{actual_model}:generateContent?key={api_key}",
json=payload, timeout=120
)
if resp.status_code == 200:
data = resp.json()
content = data["candidates"][0]["content"]["parts"][0]["text"]
return {
"content": content,
"provider": "gemini",
"model": actual_model,
"usage": {"total_tokens": len(content) // 4},
"finish_reason": "stop",
}
else:
logger.warning(f"⚠️ Gemini error: {resp.status_code} - {resp.text[:200]}")
except Exception as e:
logger.warning(f"⚠️ Gemini failed: {e}")
return None
def _call_fireworks(self, messages: List[Dict], model: str, temperature: float, max_tokens: int) -> Optional[Dict]:
"""Fireworks AI API"""
try:
from openai import OpenAI
provider = PROVIDERS["fireworks"]
client = OpenAI(api_key=os.getenv(provider.api_key_env), base_url=provider.api_base)
response = client.chat.completions.create(
model=model or provider.default_model,
messages=messages,
temperature=temperature,
max_tokens=min(max_tokens, provider.max_tokens),
)
usage = response.usage or {}
return {
"content": response.choices[0].message.content,
"provider": "fireworks",
"model": response.model,
"usage": {
"prompt_tokens": usage.prompt_tokens or 0,
"completion_tokens": usage.completion_tokens or 0,
"total_tokens": usage.total_tokens or 0,
},
"finish_reason": response.choices[0].finish_reason,
}
except Exception as e:
logger.warning(f"⚠️ Fireworks failed: {e}")
return None
def _call_together(self, messages: List[Dict], model: str, temperature: float, max_tokens: int) -> Optional[Dict]:
"""Together AI API"""
try:
from openai import OpenAI
provider = PROVIDERS["together"]
client = OpenAI(api_key=os.getenv(provider.api_key_env), base_url=provider.api_base)
response = client.chat.completions.create(
model=model or provider.default_model,
messages=messages,
temperature=temperature,
max_tokens=min(max_tokens, provider.max_tokens),
)
usage = response.usage or {}
return {
"content": response.choices[0].message.content,
"provider": "together",
"model": response.model,
"usage": {
"prompt_tokens": usage.prompt_tokens or 0,
"completion_tokens": usage.completion_tokens or 0,
"total_tokens": usage.total_tokens or 0,
},
"finish_reason": response.choices[0].finish_reason,
}
except Exception as e:
logger.warning(f"⚠️ Together failed: {e}")
return None
def _call_novita(self, messages: List[Dict], model: str, temperature: float, max_tokens: int) -> Optional[Dict]:
"""Novita AI API"""
try:
import requests
provider = PROVIDERS["novita"]
api_key = os.getenv(provider.api_key_env)
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
payload = {
"model": model or provider.default_model,
"messages": messages,
"temperature": temperature,
"max_tokens": min(max_tokens, provider.max_tokens),
}
resp = requests.post(
f"{provider.api_base}/chat/completions",
headers=headers, json=payload, timeout=180
)
if resp.status_code == 200:
data = resp.json()
return {
"content": data["choices"][0]["message"]["content"],
"provider": "novita",
"model": data.get("model"),
"usage": data.get("usage", {}),
"finish_reason": data["choices"][0].get("finish_reason"),
}
else:
logger.warning(f"⚠️ Novita error: {resp.status_code}")
except Exception as e:
logger.warning(f"⚠️ Novita failed: {e}")
return None
def _call_openrouter(self, messages: List[Dict], model: str, temperature: float, max_tokens: int) -> Optional[Dict]:
"""OpenRouter API"""
try:
from openai import OpenAI
provider = PROVIDERS["openrouter"]
client = OpenAI(api_key=os.getenv(provider.api_key_env), base_url=provider.api_base)
response = client.chat.completions.create(
model=model or provider.default_model,
messages=messages,
temperature=temperature,
max_tokens=min(max_tokens, provider.max_tokens),
)
usage = response.usage or {}
return {
"content": response.choices[0].message.content,
"provider": "openrouter",
"model": response.model,
"usage": {
"prompt_tokens": usage.prompt_tokens or 0,
"completion_tokens": usage.completion_tokens or 0,
"total_tokens": usage.total_tokens or 0,
},
"finish_reason": response.choices[0].finish_reason,
}
except Exception as e:
logger.warning(f"⚠️ OpenRouter failed: {e}")
return None
def _call_cloudflare(self, messages: List[Dict], model: str, temperature: float, max_tokens: int) -> Optional[Dict]:
"""Cloudflare Workers AI"""
try:
import requests
account_id = os.getenv("CLOUDFLARE_ACCOUNT_ID", "")
api_key = os.getenv("CLOUDFLARE_API_KEY", "")
if not account_id or not api_key:
return None
provider = PROVIDERS["cloudflare"]
headers = {"Authorization": f"Bearer {api_key}"}
prompt = "\n".join([
f"{'System' if m['role']=='system' else m['role'].capitalize()}: {m['content']}"
for m in messages
]) + "\nAssistant:"
resp = requests.post(
provider.api_base.replace("{account_id}", account_id) + f"/{model or provider.default_model}",
headers=headers,
json={"messages": [{"role": "user", "content": prompt}], "max_tokens": min(max_tokens, 4096)},
timeout=60
)
if resp.status_code == 200:
data = resp.json()
return {
"content": data.get("result", {}).get("response", ""),
"provider": "cloudflare",
"model": model or provider.default_model,
"usage": {},
"finish_reason": "stop",
}
except Exception as e:
logger.warning(f"⚠️ Cloudflare failed: {e}")
return None
def _call_aimlapi(self, messages: List[Dict], model: str, temperature: float, max_tokens: int) -> Optional[Dict]:
"""AIMLAPI - GPT-4o Power"""
try:
from openai import OpenAI
provider = PROVIDERS["aimlapi"]
api_key = os.getenv(provider.api_key_env)
client = OpenAI(api_key=api_key, base_url=provider.api_base)
response = client.chat.completions.create(
model=model or provider.default_model,
messages=messages,
temperature=temperature,
max_tokens=min(max_tokens, provider.max_tokens),
)
usage = response.usage or {}
return {
"content": response.choices[0].message.content,
"provider": "aimlapi",
"model": response.model,
"usage": {
"prompt_tokens": usage.prompt_tokens or 0,
"completion_tokens": usage.completion_tokens or 0,
"total_tokens": usage.total_tokens or 0,
},
"finish_reason": response.choices[0].finish_reason,
}
except Exception as e:
logger.warning(f"⚠️ AIMLAPI failed: {e}")
return None
def _call_huggingface(self, messages: List[Dict], model: str, temperature: float, max_tokens: int) -> Optional[Dict]:
"""HuggingFace Inference API - Enhanced with better model detection"""
try:
import requests
provider = PROVIDERS["huggingface"]
api_key = os.getenv(provider.api_key_env)
hf_model = model or provider.default_model
# Build prompt
prompt = self._build_hf_prompt(messages)
headers = {"Authorization": f"Bearer {api_key}"}
# Try chat completions format first (for chat models)
payload = {
"inputs": prompt,
"parameters": {
"temperature": temperature,
"max_new_tokens": min(max_tokens, provider.max_tokens),
"return_full_text": False,
"do_sample": temperature > 0.1,
},
"options": {
"wait_for_model": True,
"use_cache": True,
}
}
resp = requests.post(
f"{provider.api_base}/{hf_model}",
headers=headers,
json=payload,
timeout=180
)
if resp.status_code == 200:
result = resp.json()
content = ""
if isinstance(result, list) and len(result) > 0:
content = result[0].get("generated_text", "")
elif isinstance(result, dict):
content = result.get("generated_text", str(result))
else:
content = str(result)
# Clean up response
content = content.strip()
return {
"content": content,
"provider": "huggingface",
"model": hf_model,
"usage": {"total_tokens": len(prompt) // 4 + len(content) // 4},
"finish_reason": "stop",
}
elif resp.status_code == 503:
# Model loading - try again with wait
logger.info("⏳ HuggingFace model loading, waiting...")
time.sleep(10)
resp2 = requests.post(
f"{provider.api_base}/{hf_model}",
headers=headers,
json={**payload, "options": {"wait_for_model": True}},
timeout=180
)
if resp2.status_code == 200:
result = resp2.json()
content = result[0].get("generated_text", "") if isinstance(result, list) else str(result)
return {
"content": content.strip(),
"provider": "huggingface",
"model": hf_model,
"usage": {},
"finish_reason": "stop",
}
else:
logger.warning(f"⚠️ HuggingFace error: {resp.status_code} - {resp.text[:150]}")
except Exception as e:
logger.warning(f"⚠️ HuggingFace failed: {e}")
return None
def _build_hf_prompt(self, messages: List[Dict]) -> str:
"""Build HF-compatible prompt from messages"""
parts = []
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "system":
parts.append(f"System: {content}")
elif role == "user":
parts.append(f"User: {content}")
elif role == "assistant":
parts.append(f"Assistant: {content}")
parts.append("Assistant:")
return "\n".join(parts)
# ─── Provider Dispatcher ─────────────────────────────────────
def _call_provider(self, provider_name: str, messages: List[Dict], model: Optional[str], temperature: float, max_tokens: int) -> Optional[Dict]:
"""Dispatch to the appropriate provider"""
if self.enable_rate_limiting:
provider = PROVIDERS.get(provider_name)
if provider and not self.rate_limiter.can_request(provider_name, provider.rate_limit_rpm):
logger.debug(f"⏳ Rate limited: {provider_name}")
return None
start_time = time.time()
calls = {
"deepseek": lambda: self._call_deepseek(messages, model, temperature, max_tokens),
"groq": lambda: self._call_groq(messages, model, temperature, max_tokens),
"cerebras": lambda: self._call_cerebras(messages, model, temperature, max_tokens),
"gemini": lambda: self._call_gemini(messages, model, temperature, max_tokens),
"fireworks": lambda: self._call_fireworks(messages, model, temperature, max_tokens),
"together": lambda: self._call_together(messages, model, temperature, max_tokens),
"novita": lambda: self._call_novita(messages, model, temperature, max_tokens),
"openrouter": lambda: self._call_openrouter(messages, model, temperature, max_tokens),
"cloudflare": lambda: self._call_cloudflare(messages, model, temperature, max_tokens),
"huggingface": lambda: self._call_huggingface(messages, model, temperature, max_tokens),
"aimlapi": lambda: self._call_aimlapi(messages, model, temperature, max_tokens),
}
result = calls.get(provider_name, lambda: None)()
latency = time.time() - start_time
if result:
if self.enable_rate_limiting:
self.rate_limiter.record(provider_name)
# Update health with latency tracking
health = self._provider_health.get(provider_name, {})
total_calls = health.get("total_calls", 0) + 1
avg_lat = health.get("avg_latency", 0)
new_avg = (avg_lat * (total_calls - 1) + latency) / total_calls
self._provider_health[provider_name] = {
"status": "active",
"last_used": datetime.now().isoformat(),
"failures": 0,
"avg_latency": round(new_avg, 2),
"total_calls": total_calls,
}
return result
def _call_with_fallback(
self,
messages: List[Dict],
model: Optional[str] = None,
temperature: Optional[float] = None,
max_tokens: int = 4096,
retry_count: int = 2,
) -> Optional[Dict]:
"""Try providers in priority order with automatic fallback"""
temperature = temperature if temperature is not None else self.default_temperature
# Check cache first
if self.enable_caching:
cache_key_prompt = messages[-1]["content"] if messages else ""
if len(cache_key_prompt) < 1000:
cached = self.cache.get(cache_key_prompt, model or "default", temperature)
if cached:
logger.debug("📦 Cache hit!")
self._stats["cache_hits"] = self._stats.get("cache_hits", 0) + 1
return cached
errors = []
for provider_name in self._available_providers:
health = self._provider_health.get(provider_name, {})
if health.get("failures", 0) > 5:
continue # Skip degraded providers
for attempt in range(retry_count):
result = self._call_provider(provider_name, messages, model, temperature, max_tokens)
if result and result.get("content"):
self._stats["requests"] += 1
self._stats["successes"] += 1
self._stats["by_provider"][provider_name] += 1
usage = result.get("usage", {})
if isinstance(usage, dict):
total = usage.get("total_tokens", 0)
else:
total = 0
self._stats["total_tokens_used"] += total
self._current_provider_name = provider_name
if self.enable_caching and cache_key_prompt:
self.cache.set(cache_key_prompt, model or "default", temperature, result)
return result
errors.append(f"{provider_name}: attempt {attempt + 1} failed")
current_failures = self._provider_health.get(provider_name, {}).get("failures", 0) + 1
self._provider_health[provider_name] = {
"status": "degraded",
"last_error": datetime.now().isoformat(),
"failures": current_failures,
}
time.sleep(1)
self._stats["failures"] += 1
self._last_error = "; ".join(errors[-3:])
logger.error(f"❌ All LLM providers failed: {self._last_error}")
return None
# ─── Public API ─────────────────────────────────────────────
def chat(
self,
prompt: str,
system_prompt: Optional[str] = None,
temperature: Optional[float] = None,
max_tokens: int = 4096,
) -> Optional[str]:
"""Simple chat completion"""
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": prompt})
result = self._call_with_fallback(messages, temperature=temperature, max_tokens=max_tokens)
return result["content"] if result else None
def structured_completion(
self,
prompt: str,
system_prompt: str,
response_format: str = "json",
temperature: Optional[float] = None,
max_tokens: int = 4096,
) -> Optional[Dict]:
"""JSON-structured completion with automatic parsing"""
import re
# Add JSON enforcement to system prompt
json_instruction = (
"\n\nIMPORTANT: Return ONLY valid JSON matching the schema exactly. "
"No markdown code blocks, no explanation, no text before or after the JSON."
'Start with "{" and end with "}".'
)
messages = [
{"role": "system", "content": system_prompt + json_instruction},
{"role": "user", "content": prompt},
]
result = self._call_with_fallback(messages, temperature=temperature or 0.2, max_tokens=max_tokens)
if not result:
return None
content = result["content"].strip()
# Parse JSON
parsed = self._parse_json_response(content)
if parsed:
return {
"text": content,
"parsed": parsed,
"provider": result["provider"],
"model": result.get("model", ""),
"usage": result.get("usage", {}),
"parse_error": False,
}
# Try to extract JSON from text
json_match = re.search(r'\{[\s\S]*\}', content)
if json_match:
try:
parsed = json.loads(json_match.group())
return {
"text": content,
"parsed": parsed,
"provider": result["provider"],
"model": result.get("model", ""),
"parse_error": False,
}
except:
pass
return {
"text": content,
"parsed": {},
"provider": result["provider"],
"model": result.get("model", ""),
"parse_error": True,
"error_detail": "Failed to parse JSON from response",
}
def _parse_json_response(self, text: str) -> Optional[Dict]:
"""Parse JSON from LLM response with multiple strategies"""
content = text.strip()
# Remove markdown code blocks
for marker in ["```json", "```yaml", "```"]:
if marker in content:
parts = content.split(marker)
if len(parts) >= 3:
content = parts[1].strip()
elif len(parts) == 2:
content = parts[1].replace("```", "").strip()
break
# Try direct JSON parse
try:
return json.loads(content)
except json.JSONDecodeError:
pass
# Try finding JSON object in text
json_match = re.search(r'\{[\s\S]*\}', content)
if json_match:
try:
return json.loads(json_match.group())
except:
pass
return None
def get_stats(self) -> Dict[str, Any]:
"""Get comprehensive engine statistics"""
cache_stats = self.cache.stats()
total = self._stats["requests"]
return {
"requests": total,
"successes": self._stats["successes"],
"failures": self._stats["failures"],
"by_provider": dict(self._stats["by_provider"]),
"total_tokens_used": self._stats["total_tokens_used"],
"success_rate": f"{(self._stats['successes'] / max(total, 1) * 100):.1f}%",
"available_providers": self._available_providers,
"current_provider": self._current_provider,
"last_error": self._last_error,
"cache": cache_stats,
"health": self._provider_health,
}
def health_check(self) -> Dict[str, Any]:
"""Health check for all providers"""
providers = {}
for name in self.provider_chain:
if name not in PROVIDERS:
continue
provider = PROVIDERS[name]
health = self._provider_health.get(name, {})
rate_status = self.rate_limiter.get_status(name, provider.rate_limit_rpm) if self.enable_rate_limiting else {}
failures = health.get("failures", 0)
status = "healthy" if failures == 0 else "degraded" if failures < 5 else "unavailable"
providers[provider.name] = {
"configured": name in self._available_providers,
"status": status,
"priority": provider.priority,
"default_model": provider.default_model,
"rate_limit": rate_status,
"last_used": health.get("last_used"),
"avg_latency": health.get("avg_latency"),
"total_requests": self._stats["by_provider"].get(name, 0),
"is_free": provider.is_free_tier,
"context_window": provider.context_window,
}
return {
"providers": providers,
"active_provider": self._current_provider,
"total_available": len(self._available_providers),
"total_configured": len([p for p in self.provider_chain if p in PROVIDERS]),
}
def get_best_available(self) -> Optional[str]:
"""Get the best available provider"""
return self._get_best_provider()
def reset_health(self, provider: str = None):
"""Reset health status for a provider or all providers"""
if provider:
if provider in self._provider_health:
self._provider_health[provider] = {"status": "available", "failures": 0}
else:
for name in self._provider_health:
self._provider_health[name] = {"status": "available", "failures": 0}
def clear_cache(self):
"""Clear the response cache"""
self.cache.clear()
logger.info("🧹 LLM cache cleared")