"""
LLM Manager - Gerencia requisições LLM com suporte a batch e paralelo
"""
import os
import logging
import asyncio
from typing import Dict, Any, List, Optional
from enum import Enum
from pathlib import Path
from llm.clients.groq_client import GroqClient
from llm.clients.openai_client import OpenAIClient
from llm.clients.anthropic_client import AnthropicClient
from llm.batch_processor import BatchProcessor, BatchRequest
logger = logging.getLogger(__name__)
class LLMProvider(Enum):
GROQ = "groq"
OPENAI = "openai"
ANTHROPIC = "anthropic"
class RequestMode(Enum):
    IMMEDIATE = "immediate"  # Immediate execution
    BATCH = "batch"          # Batch processing
    PARALLEL = "parallel"    # Parallel execution with workers
class LLMManager:
"""Gerenciador central de LLMs com suporte a batch e paralelo"""
def __init__(self):
self.clients: Dict[LLMProvider, Any] = {}
self.batch_processor: Optional[BatchProcessor] = None
self.pending_requests: List[Dict[str, Any]] = []
        # Initialize the available clients
        self._init_clients()
        # Initialize the batch processor if Groq is available
if LLMProvider.GROQ in self.clients:
groq_key = os.getenv("GROQ_API_KEY")
work_dir = Path(os.getenv("WORK_DIR", "./work"))
self.batch_processor = BatchProcessor(
api_key=groq_key,
work_dir=work_dir
)
def _init_clients(self):
"""Inicializa clients LLM disponíveis"""
# Groq
groq_key = os.getenv("GROQ_API_KEY")
if groq_key:
try:
self.clients[LLMProvider.GROQ] = GroqClient(api_key=groq_key)
                logger.info("✅ GroqClient initialized")
            except Exception as e:
                logger.warning(f"⚠️ GroqClient unavailable: {e}")
# OpenAI
openai_key = os.getenv("OPENAI_API_KEY")
if openai_key:
try:
self.clients[LLMProvider.OPENAI] = OpenAIClient(api_key=openai_key)
                logger.info("✅ OpenAIClient initialized")
            except Exception as e:
                logger.warning(f"⚠️ OpenAIClient unavailable: {e}")
# Anthropic
anthropic_key = os.getenv("ANTHROPIC_API_KEY")
if anthropic_key:
try:
self.clients[LLMProvider.ANTHROPIC] = AnthropicClient(api_key=anthropic_key)
                logger.info("✅ AnthropicClient initialized")
            except Exception as e:
                logger.warning(f"⚠️ AnthropicClient unavailable: {e}")
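
    # Example environment configuration (a sketch; the key values below are
    # placeholders, not real credentials):
    #
    #   export GROQ_API_KEY="gsk_..."
    #   export OPENAI_API_KEY="sk-..."
    #   export ANTHROPIC_API_KEY="sk-ant-..."
    #   export WORK_DIR="./work"   # working directory for the Groq batch processor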
async def generate(
self,
provider: str,
model: str,
system_prompt: str,
user_prompt: str,
temperature: float = 0.3,
max_tokens: int = 4000,
mode: RequestMode = RequestMode.IMMEDIATE
) -> Dict[str, Any]:
"""
Gera resposta LLM (imediata, batch ou paralela)
Args:
provider: Provider LLM (groq, openai, anthropic)
model: Nome do modelo
system_prompt: Prompt do sistema
user_prompt: Prompt do usuário
temperature: Temperatura
max_tokens: Máximo de tokens
mode: Modo de execução
Returns:
Dict com resultado
"""
        # Map the provider string to the enum
        try:
            provider_enum = LLMProvider(provider.lower())
        except ValueError:
            raise ValueError(f"Unknown provider: {provider}") from None
        if provider_enum not in self.clients:
            raise ValueError(f"Provider {provider} is not available (no API key configured?)")
client = self.clients[provider_enum]
        # Dispatch on the execution mode
        if mode == RequestMode.IMMEDIATE:
            return await self._execute_immediate(
                client, model, system_prompt, user_prompt,
                temperature, max_tokens
            )
        elif mode == RequestMode.BATCH:
            # Add to the batch queue
            return await self._queue_for_batch(
                provider, model, system_prompt, user_prompt,
                temperature, max_tokens
            )
        elif mode == RequestMode.PARALLEL:
            # Run via the worker-pool path
            return await self._execute_parallel(
                client, model, system_prompt, user_prompt,
                temperature, max_tokens
            )
        else:
            raise ValueError(f"Unsupported request mode: {mode}")
async def _execute_immediate(
self,
client: Any,
model: str,
system_prompt: str,
user_prompt: str,
temperature: float,
max_tokens: int
) -> Dict[str, Any]:
"""Execução imediata"""
try:
            # Call the client-specific generation method
            if hasattr(client, 'chat_completion'):
                response = await client.chat_completion(
                    model=model,
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": user_prompt}
                    ],
                    temperature=temperature,
                    max_tokens=max_tokens,
                    response_format={"type": "json_object"}
                )
            elif hasattr(client, 'generate'):
                # Must be an f-string so the prompts are actually interpolated
                response = await client.generate(
                    prompt=f"{system_prompt}\n\n{user_prompt}",
                    temperature=temperature,
                    max_tokens=max_tokens
                )
            else:
                raise ValueError(f"Client {type(client).__name__} does not support generation")
return {
'status': 'success',
'result': response,
'mode': 'immediate'
}
except Exception as e:
            logger.error(f"❌ Immediate execution failed: {e}", exc_info=True)
return {
'status': 'error',
'error': str(e),
'mode': 'immediate'
}
async def _queue_for_batch(
self,
provider: str,
model: str,
system_prompt: str,
user_prompt: str,
temperature: float,
max_tokens: int
) -> Dict[str, Any]:
"""Adiciona requisição à fila de batch"""
request_id = f"req_{len(self.pending_requests)}"
self.pending_requests.append({
'id': request_id,
'provider': provider,
'model': model,
'system_prompt': system_prompt,
'user_prompt': user_prompt,
'temperature': temperature,
'max_tokens': max_tokens
})
        logger.info(f"📥 Request queued: {request_id}")
return {
'status': 'queued',
'request_id': request_id,
'mode': 'batch'
}
async def execute_batch(self, batch_id: str) -> Dict[str, Any]:
"""
Executa batch de requisições pendentes
Returns:
Dict com resultados
"""
        if not self.batch_processor:
            raise ValueError("Batch processor is not available")
        if not self.pending_requests:
            logger.warning("⚠️ No pending requests to batch")
            return {}
        logger.info(f"🚀 Executing batch: {len(self.pending_requests)} requests")
        # Convert the queued dicts into BatchRequest objects
batch_requests = []
for req in self.pending_requests:
batch_req = BatchRequest(
custom_id=req['id'],
                specialist_id=0,  # Filled in by the processor
system_prompt=req['system_prompt'],
user_prompt=req['user_prompt'],
model=req['model'],
temperature=req['temperature'],
max_tokens=req['max_tokens']
)
batch_requests.append(batch_req)
        # Run the batch
results = await self.batch_processor.process_batch(
requests=batch_requests,
batch_id=batch_id
)
        # Clear the queue
self.pending_requests.clear()
return results
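
    # Typical batch flow (an illustrative sketch; "batch-001" is an arbitrary id):
    #
    #   await manager.generate(..., mode=RequestMode.BATCH)  # -> {'status': 'queued', ...}
    #   await manager.generate(..., mode=RequestMode.BATCH)
    #   results = await manager.execute_batch(batch_id="batch-001")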
async def _execute_parallel(
self,
client: Any,
model: str,
system_prompt: str,
user_prompt: str,
temperature: float,
max_tokens: int
) -> Dict[str, Any]:
"""Execução paralela com worker pool"""
# Similar a immediate mas em um worker pool gerenciado
return await self._execute_immediate(
client, model, system_prompt, user_prompt,
temperature, max_tokens
)
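

# Minimal usage sketch (assumes GROQ_API_KEY is set in the environment; the
# model name below is an illustrative placeholder, not a recommendation):
if __name__ == "__main__":
    async def _demo():
        manager = LLMManager()
        result = await manager.generate(
            provider="groq",
            model="llama-3.1-8b-instant",  # placeholder model name
            system_prompt="You are a helpful assistant. Answer in JSON.",
            user_prompt='Return {"greeting": "hello"}',
            mode=RequestMode.IMMEDIATE,
        )
        print(result["status"], result.get("result") or result.get("error"))

    asyncio.run(_demo())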