""" LLM Manager - Gerencia requisições LLM com suporte a batch e paralelo """ import os import logging import asyncio from typing import Dict, Any, List, Optional from enum import Enum from pathlib import Path from llm.clients.groq_client import GroqClient from llm.clients.openai_client import OpenAIClient from llm.clients.anthropic_client import AnthropicClient from llm.batch_processor import BatchProcessor, BatchRequest logger = logging.getLogger(__name__) class LLMProvider(Enum): GROQ = "groq" OPENAI = "openai" ANTHROPIC = "anthropic" class RequestMode(Enum): IMMEDIATE = "immediate" # Execução imediata BATCH = "batch" # Batch processing PARALLEL = "parallel" # Paralelo com workers class LLMManager: """Gerenciador central de LLMs com suporte a batch e paralelo""" def __init__(self): self.clients: Dict[LLMProvider, Any] = {} self.batch_processor: Optional[BatchProcessor] = None self.pending_requests: List[Dict[str, Any]] = [] # Inicializar clients disponíveis self._init_clients() # Inicializar batch processor se Groq disponível if LLMProvider.GROQ in self.clients: groq_key = os.getenv("GROQ_API_KEY") work_dir = Path(os.getenv("WORK_DIR", "./work")) self.batch_processor = BatchProcessor( api_key=groq_key, work_dir=work_dir ) def _init_clients(self): """Inicializa clients LLM disponíveis""" # Groq groq_key = os.getenv("GROQ_API_KEY") if groq_key: try: self.clients[LLMProvider.GROQ] = GroqClient(api_key=groq_key) logger.info("✅ GroqClient inicializado") except Exception as e: logger.warning(f"⚠️ GroqClient não disponível: {e}") # OpenAI openai_key = os.getenv("OPENAI_API_KEY") if openai_key: try: self.clients[LLMProvider.OPENAI] = OpenAIClient(api_key=openai_key) logger.info("✅ OpenAIClient inicializado") except Exception as e: logger.warning(f"⚠️ OpenAIClient não disponível: {e}") # Anthropic anthropic_key = os.getenv("ANTHROPIC_API_KEY") if anthropic_key: try: self.clients[LLMProvider.ANTHROPIC] = AnthropicClient(api_key=anthropic_key) logger.info("✅ AnthropicClient inicializado") except Exception as e: logger.warning(f"⚠️ AnthropicClient não disponível: {e}") async def generate( self, provider: str, model: str, system_prompt: str, user_prompt: str, temperature: float = 0.3, max_tokens: int = 4000, mode: RequestMode = RequestMode.IMMEDIATE ) -> Dict[str, Any]: """ Gera resposta LLM (imediata, batch ou paralela) Args: provider: Provider LLM (groq, openai, anthropic) model: Nome do modelo system_prompt: Prompt do sistema user_prompt: Prompt do usuário temperature: Temperatura max_tokens: Máximo de tokens mode: Modo de execução Returns: Dict com resultado """ # Mapear provider string para enum provider_enum = LLMProvider(provider.lower()) if provider_enum not in self.clients: raise ValueError(f"Provider {provider} não disponível") client = self.clients[provider_enum] # Executar baseado no modo if mode == RequestMode.IMMEDIATE: return await self._execute_immediate( client, model, system_prompt, user_prompt, temperature, max_tokens ) elif mode == RequestMode.BATCH: # Adicionar à fila de batch return await self._queue_for_batch( provider, model, system_prompt, user_prompt, temperature, max_tokens ) elif mode == RequestMode.PARALLEL: # Executar em worker pool return await self._execute_parallel( client, model, system_prompt, user_prompt, temperature, max_tokens ) async def _execute_immediate( self, client: Any, model: str, system_prompt: str, user_prompt: str, temperature: float, max_tokens: int ) -> Dict[str, Any]: """Execução imediata""" try: # Chamar método do client específico if 
hasattr(client, 'chat_completion'): response = await client.chat_completion( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=temperature, max_tokens=max_tokens, response_format={"type": "json_object"} ) elif hasattr(client, 'generate'): response = await client.generate( prompt="{system_prompt}\n\n{user_prompt}", temperature=temperature, max_tokens=max_tokens ) else: raise ValueError(f"Client {type(client)} não suporta geração") return { 'status': 'success', 'result': response, 'mode': 'immediate' } except Exception as e: logger.error(f"❌ Erro na execução imediata: {e}", exc_info=True) return { 'status': 'error', 'error': str(e), 'mode': 'immediate' } async def _queue_for_batch( self, provider: str, model: str, system_prompt: str, user_prompt: str, temperature: float, max_tokens: int ) -> Dict[str, Any]: """Adiciona requisição à fila de batch""" request_id = f"req_{len(self.pending_requests)}" self.pending_requests.append({ 'id': request_id, 'provider': provider, 'model': model, 'system_prompt': system_prompt, 'user_prompt': user_prompt, 'temperature': temperature, 'max_tokens': max_tokens }) logger.info(f"📥 Requisição adicionada à fila: {request_id}") return { 'status': 'queued', 'request_id': request_id, 'mode': 'batch' } async def execute_batch(self, batch_id: str) -> Dict[str, Any]: """ Executa batch de requisições pendentes Returns: Dict com resultados """ if not self.batch_processor: raise ValueError("Batch processor não disponível") if not self.pending_requests: logger.warning("⚠️ Nenhuma requisição pendente para batch") return {} logger.info(f"🚀 Executando batch: {len(self.pending_requests)} requisições") # Converter para BatchRequest batch_requests = [] for req in self.pending_requests: batch_req = BatchRequest( custom_id=req['id'], specialist_id=0, # Será preenchido pelo processor system_prompt=req['system_prompt'], user_prompt=req['user_prompt'], model=req['model'], temperature=req['temperature'], max_tokens=req['max_tokens'] ) batch_requests.append(batch_req) # Executar batch results = await self.batch_processor.process_batch( requests=batch_requests, batch_id=batch_id ) # Limpar fila self.pending_requests.clear() return results async def _execute_parallel( self, client: Any, model: str, system_prompt: str, user_prompt: str, temperature: float, max_tokens: int ) -> Dict[str, Any]: """Execução paralela com worker pool""" # Similar a immediate mas em um worker pool gerenciado return await self._execute_immediate( client, model, system_prompt, user_prompt, temperature, max_tokens )
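

# Minimal usage sketch (not part of the original module). It assumes GROQ_API_KEY is set
# in the environment and that "llama-3.1-8b-instant" is a model accepted by the configured
# Groq client; both are assumptions, adjust them to your setup.
if __name__ == "__main__":
    async def _demo():
        manager = LLMManager()

        # Immediate mode: the request is executed right away
        result = await manager.generate(
            provider="groq",
            model="llama-3.1-8b-instant",  # hypothetical model name
            system_prompt="You are a helpful assistant that replies in JSON.",
            user_prompt="Return {\"ok\": true}.",
            mode=RequestMode.IMMEDIATE
        )
        print(result["status"])

        # Batch mode: the request is only queued; execute_batch() runs the queue later
        queued = await manager.generate(
            provider="groq",
            model="llama-3.1-8b-instant",  # hypothetical model name
            system_prompt="You are a helpful assistant that replies in JSON.",
            user_prompt="Return {\"ok\": true}.",
            mode=RequestMode.BATCH
        )
        print(queued["request_id"])

        results = await manager.execute_batch(batch_id="demo_batch")
        print(results)

    asyncio.run(_demo())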