# BATUTO_X / core_llm.py
# Uploaded by BATUTO-ART (commit e56a56e, verified)
"""
CORE LLM MEJORADO - SAMBANOVA ENTERPRISE
- Timeouts configurados
- Retry automático
- Streaming optimizado
- Métricas de calidad
"""
import os
import time
import json
from typing import Dict, List, Optional, Generator, Any, Union
from dataclasses import dataclass
from enum import Enum
import asyncio
import aiohttp
from openai import OpenAI, AsyncOpenAI
from openai.types.chat import ChatCompletionMessageParam
# ===================== CONFIGURATION =====================
# Base URL of the SambaNova OpenAI-compatible API.
SAMBANOVA_BASE = "https://api.sambanova.ai/v1"
# Per-request timeout in seconds, passed to the OpenAI SDK clients.
DEFAULT_TIMEOUT = 60.0
# Retry budget: used both by the SDK clients and the manual retry loop
# in chat_sambanova_optimized below.
MAX_RETRIES = 3
# Base sleep (seconds) between manual retries; scaled linearly per attempt.
RETRY_DELAY = 1.0
# ===================== ENTERPRISE MODELS =====================
# Registry of SambaNova-hosted models, keyed by a friendly alias.
# Per entry: "id" is the API model name, "context_window" the token limit,
# "vision" whether image input is supported, "strengths" a rough capability
# summary, and "optimal_temperature" the sampling temperature this codebase
# prefers for the model.
ENTERPRISE_MODELS = {
    "Scout": {
        # NOTE(review): the "Scout" alias maps to a *Maverick* model id while
        # the "Fast" alias below maps to the *Scout* id — these look swapped;
        # confirm intent before relying on the aliases.
        "id": "Llama-4-Maverick-17B-128E-Instruct",
        "context_window": 131072,
        "vision": False,
        "strengths": ["general", "reasoning", "code"],
        "optimal_temperature": 0.1
    },
    "Heavy": {
        "id": "Meta-Llama-3.3-70B-Instruct",
        "context_window": 131072,
        "vision": False,
        "strengths": ["complex reasoning", "analysis", "strategy"],
        "optimal_temperature": 0.05
    },
    "Coder": {
        "id": "Qwen2.5-Coder-32B-Instruct",
        "context_window": 32768,
        "vision": False,
        "strengths": ["programming", "debugging", "architecture"],
        "optimal_temperature": 0.01
    },
    "Vision": {
        # Only entry with image-input support.
        "id": "Meta-Llama-3.2-11B-Vision-Instruct",
        "context_window": 131072,
        "vision": True,
        "strengths": ["image analysis", "visual reasoning"],
        "optimal_temperature": 0.1
    },
    "Reasoner": {
        "id": "DeepSeek-R1",
        "context_window": 128000,
        "vision": False,
        "strengths": ["step-by-step reasoning", "mathematics", "logic"],
        "optimal_temperature": 0.3
    },
    "Fast": {
        # NOTE(review): see the "Scout" entry — id looks swapped with it.
        "id": "Llama-4-Scout-17B-16E-Instruct",
        "context_window": 131072,
        "vision": False,
        "strengths": ["quick responses", "general purpose"],
        "optimal_temperature": 0.2
    }
}
# ===================== IMPROVED CLIENT =====================
class SambanovaClient:
    """Wrapper around the OpenAI SDK configured for the SambaNova API.

    Holds one synchronous and one asynchronous client plus simple rolling
    request/error/latency metrics. Metric updates are plain attribute
    writes and are not synchronized — use one instance per thread/task if
    exact counts matter.
    """

    # Keyword names already passed explicitly by chat_completion /
    # async_chat_completion. They are stripped from **kwargs so a caller
    # who also puts them in a kwargs dict does not trigger a
    # "got multiple values for keyword argument" TypeError.
    _RESERVED_KWARGS = frozenset(
        {"model", "messages", "temperature", "top_p", "max_tokens", "stream"}
    )

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.sync_client = OpenAI(
            base_url=SAMBANOVA_BASE,
            api_key=api_key,
            timeout=DEFAULT_TIMEOUT,
            max_retries=MAX_RETRIES,
        )
        self.async_client = AsyncOpenAI(
            base_url=SAMBANOVA_BASE,
            api_key=api_key,
            timeout=DEFAULT_TIMEOUT,
            max_retries=MAX_RETRIES,
        )
        # Rolling metrics; avg_latency only reflects successful requests.
        self.request_count = 0
        self.error_count = 0
        self.avg_latency = 0.0

    def get_model_info(self, model_key: str) -> Optional[Dict]:
        """Return the ENTERPRISE_MODELS entry for an alias or raw model id.

        Args:
            model_key: Registry alias (e.g. "Scout") or full API model id
                (e.g. "DeepSeek-R1").

        Returns:
            The model's metadata dict, or None when unknown.
        """
        if model_key in ENTERPRISE_MODELS:
            return ENTERPRISE_MODELS[model_key]
        # Fall back to matching by API model id.
        for info in ENTERPRISE_MODELS.values():
            if info["id"] == model_key:
                return info
        return None

    def _resolve_model_id(self, model: str) -> str:
        """Map a registry alias to its API id; pass unknown names through.

        Fix: the previous inline lookup did get_model_info(model)["id"],
        which raised TypeError (subscripting None) for unknown models.
        Unknown names are now forwarded unchanged so the API can return
        its own, clearer error.
        """
        info = self.get_model_info(model)
        return info["id"] if info else model

    @staticmethod
    def _effective_temperature(
        model_info: Optional[Dict], temperature: Optional[float]
    ) -> float:
        """Pick the temperature to send: explicit arg > model optimum > 0.1.

        Fix: previously an explicitly passed temperature was always
        overwritten by the model's optimal_temperature for known models.
        """
        if temperature is not None:
            return temperature
        if model_info:
            return model_info.get("optimal_temperature", 0.1)
        return 0.1

    def _filtered_kwargs(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """Drop kwargs that duplicate explicitly passed parameters."""
        return {k: v for k, v in kwargs.items() if k not in self._RESERVED_KWARGS}

    def _record_success(self, start_time: float) -> None:
        """Fold one successful request's latency into the running average."""
        latency = time.time() - start_time
        self.avg_latency = (
            self.avg_latency * (self.request_count - 1) + latency
        ) / self.request_count

    def chat_completion(
        self,
        model: str,
        messages: List[ChatCompletionMessageParam],
        temperature: Optional[float] = None,
        top_p: float = 0.9,
        max_tokens: int = 4096,
        stream: bool = True,
        **kwargs,
    ):
        """Synchronous chat completion with latency/error metrics.

        Args:
            model: Registry alias or full API model id.
            messages: Chat messages in OpenAI format.
            temperature: Sampling temperature; when None, the model's
                optimal_temperature (or 0.1) is used.
            top_p: Nucleus-sampling cutoff.
            max_tokens: Maximum tokens to generate.
            stream: Whether to request a streaming response.
            **kwargs: Extra parameters forwarded to the API.

        Returns:
            The OpenAI SDK response (a stream iterator when stream=True).

        Raises:
            Propagates any SDK/API exception after bumping error_count.
        """
        start_time = time.time()
        self.request_count += 1
        try:
            model_info = self.get_model_info(model)
            response = self.sync_client.chat.completions.create(
                model=self._resolve_model_id(model),
                messages=messages,
                temperature=self._effective_temperature(model_info, temperature),
                top_p=top_p,
                max_tokens=max_tokens,
                stream=stream,
                **self._filtered_kwargs(kwargs),
            )
        except Exception:
            self.error_count += 1
            raise
        self._record_success(start_time)
        return response

    async def async_chat_completion(
        self,
        model: str,
        messages: List[ChatCompletionMessageParam],
        temperature: Optional[float] = None,
        top_p: float = 0.9,
        max_tokens: int = 4096,
        stream: bool = True,
        **kwargs,
    ):
        """Asynchronous variant of chat_completion; same contract."""
        start_time = time.time()
        self.request_count += 1
        try:
            model_info = self.get_model_info(model)
            response = await self.async_client.chat.completions.create(
                model=self._resolve_model_id(model),
                messages=messages,
                temperature=self._effective_temperature(model_info, temperature),
                top_p=top_p,
                max_tokens=max_tokens,
                stream=stream,
                **self._filtered_kwargs(kwargs),
            )
        except Exception:
            self.error_count += 1
            raise
        self._record_success(start_time)
        return response

    def get_metrics(self) -> Dict[str, Any]:
        """Return a snapshot of client usage metrics.

        success_rate guards against division by zero when no requests
        have been made yet.
        """
        return {
            "request_count": self.request_count,
            "error_count": self.error_count,
            "success_rate": 1.0 - (self.error_count / max(self.request_count, 1)),
            "average_latency": self.avg_latency,
            "models_available": len(ENTERPRISE_MODELS),
        }
# ===================== IMPROVED MAIN ENTRY POINTS =====================
def create_sambanova_client(api_key: Optional[str] = None) -> SambanovaClient:
    """Build a SambanovaClient from an explicit key or the environment.

    Falls back to the SAMBANOVA_API_KEY_2 environment variable when no key
    is supplied.

    Raises:
        RuntimeError: when no API key can be resolved.
    """
    resolved_key = api_key if api_key else os.getenv("SAMBANOVA_API_KEY_2")
    if resolved_key:
        return SambanovaClient(resolved_key)
    raise RuntimeError(
        "❌ Falta SAMBANOVA_API_KEY_2. Configura la variable de entorno o pasa la clave directamente."
    )
def chat_sambanova_optimized(
    api_key: str,
    model: str,
    messages: List[ChatCompletionMessageParam],
    temperature: float = 0.1,
    top_p: float = 0.9,
    max_tokens: int = 4096,
    stream: bool = True,
    retry_on_failure: bool = True,
    **kwargs
):
    """Run a chat completion with linear-backoff retries.

    Makes up to MAX_RETRIES attempts (sleeping RETRY_DELAY * attempt_number
    between tries). With retry_on_failure=False a single attempt is made and
    any error propagates immediately. The last failure is always re-raised.
    """
    client = create_sambanova_client(api_key)
    attempts = MAX_RETRIES if retry_on_failure else 1
    last = attempts - 1
    for attempt in range(attempts):
        try:
            return client.chat_completion(
                model=model,
                messages=messages,
                temperature=temperature,
                top_p=top_p,
                max_tokens=max_tokens,
                stream=stream,
                **kwargs
            )
        except Exception:
            if attempt == last:
                raise
            # Linear backoff before the next attempt.
            time.sleep(RETRY_DELAY * (attempt + 1))
# ===================== OPTIMIZED STREAMING =====================
def stream_response_generator(
    api_key: str,
    model: str,
    messages: List[ChatCompletionMessageParam],
    **kwargs
) -> Generator[Union[str, Dict], None, None]:
    """Stream a chat completion, yielding text chunks then a metrics dict.

    Yields each text delta as a str. After the stream finishes — or when any
    exception occurs — one final dict is yielded describing the run:

    - success: {"success": True, "total_time", "response_length",
      "model_used", "final_response"}
    - failure: {"success": False, "error", "total_time",
      "response_length", "model_attempted"}

    Fix: the return annotation previously claimed the metrics dict was the
    generator's *return* value (Generator[str, None, Dict]); it is actually
    yielded, so the yield type is Union[str, Dict].
    """
    start_time = time.time()
    full_response = ""
    try:
        stream = chat_sambanova_optimized(
            api_key=api_key,
            model=model,
            messages=messages,
            stream=True,
            **kwargs
        )
        for chunk in stream:
            # Some chunks (e.g. role headers / finish events) carry no text.
            if chunk.choices and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                full_response += content
                yield content
        yield {
            "success": True,
            "total_time": time.time() - start_time,
            "response_length": len(full_response),
            "model_used": model,
            "final_response": full_response
        }
    except Exception as e:
        # Best-effort by design: surface the failure as a final metrics
        # payload instead of raising, so streaming consumers can render it.
        yield {
            "success": False,
            "error": str(e),
            "total_time": time.time() - start_time,
            "response_length": len(full_response),
            "model_attempted": model
        }