Spaces:
Runtime error
Runtime error
| """ | |
| LLM Manager - Gerencia requisições LLM com suporte a batch e paralelo | |
| """ | |
| import os | |
| import logging | |
| import asyncio | |
| from typing import Dict, Any, List, Optional | |
| from enum import Enum | |
| from pathlib import Path | |
| from llm.clients.groq_client import GroqClient | |
| from llm.clients.openai_client import OpenAIClient | |
| from llm.clients.anthropic_client import AnthropicClient | |
| from llm.batch_processor import BatchProcessor, BatchRequest | |
| logger = logging.getLogger(__name__) | |
| class LLMProvider(Enum): | |
| GROQ = "groq" | |
| OPENAI = "openai" | |
| ANTHROPIC = "anthropic" | |
| class RequestMode(Enum): | |
| IMMEDIATE = "immediate" # Execução imediata | |
| BATCH = "batch" # Batch processing | |
| PARALLEL = "parallel" # Paralelo com workers | |
| class LLMManager: | |
| """Gerenciador central de LLMs com suporte a batch e paralelo""" | |
| def __init__(self): | |
| self.clients: Dict[LLMProvider, Any] = {} | |
| self.batch_processor: Optional[BatchProcessor] = None | |
| self.pending_requests: List[Dict[str, Any]] = [] | |
| # Inicializar clients disponíveis | |
| self._init_clients() | |
| # Inicializar batch processor se Groq disponível | |
| if LLMProvider.GROQ in self.clients: | |
| groq_key = os.getenv("GROQ_API_KEY") | |
| work_dir = Path(os.getenv("WORK_DIR", "./work")) | |
| self.batch_processor = BatchProcessor( | |
| api_key=groq_key, | |
| work_dir=work_dir | |
| ) | |
| def _init_clients(self): | |
| """Inicializa clients LLM disponíveis""" | |
| # Groq | |
| groq_key = os.getenv("GROQ_API_KEY") | |
| if groq_key: | |
| try: | |
| self.clients[LLMProvider.GROQ] = GroqClient(api_key=groq_key) | |
| logger.info("✅ GroqClient inicializado") | |
| except Exception as e: | |
| logger.warning(f"⚠️ GroqClient não disponível: {e}") | |
| # OpenAI | |
| openai_key = os.getenv("OPENAI_API_KEY") | |
| if openai_key: | |
| try: | |
| self.clients[LLMProvider.OPENAI] = OpenAIClient(api_key=openai_key) | |
| logger.info("✅ OpenAIClient inicializado") | |
| except Exception as e: | |
| logger.warning(f"⚠️ OpenAIClient não disponível: {e}") | |
| # Anthropic | |
| anthropic_key = os.getenv("ANTHROPIC_API_KEY") | |
| if anthropic_key: | |
| try: | |
| self.clients[LLMProvider.ANTHROPIC] = AnthropicClient(api_key=anthropic_key) | |
| logger.info("✅ AnthropicClient inicializado") | |
| except Exception as e: | |
| logger.warning(f"⚠️ AnthropicClient não disponível: {e}") | |
| async def generate( | |
| self, | |
| provider: str, | |
| model: str, | |
| system_prompt: str, | |
| user_prompt: str, | |
| temperature: float = 0.3, | |
| max_tokens: int = 4000, | |
| mode: RequestMode = RequestMode.IMMEDIATE | |
| ) -> Dict[str, Any]: | |
| """ | |
| Gera resposta LLM (imediata, batch ou paralela) | |
| Args: | |
| provider: Provider LLM (groq, openai, anthropic) | |
| model: Nome do modelo | |
| system_prompt: Prompt do sistema | |
| user_prompt: Prompt do usuário | |
| temperature: Temperatura | |
| max_tokens: Máximo de tokens | |
| mode: Modo de execução | |
| Returns: | |
| Dict com resultado | |
| """ | |
| # Mapear provider string para enum | |
| provider_enum = LLMProvider(provider.lower()) | |
| if provider_enum not in self.clients: | |
| raise ValueError(f"Provider {provider} não disponível") | |
| client = self.clients[provider_enum] | |
| # Executar baseado no modo | |
| if mode == RequestMode.IMMEDIATE: | |
| return await self._execute_immediate( | |
| client, model, system_prompt, user_prompt, | |
| temperature, max_tokens | |
| ) | |
| elif mode == RequestMode.BATCH: | |
| # Adicionar à fila de batch | |
| return await self._queue_for_batch( | |
| provider, model, system_prompt, user_prompt, | |
| temperature, max_tokens | |
| ) | |
| elif mode == RequestMode.PARALLEL: | |
| # Executar em worker pool | |
| return await self._execute_parallel( | |
| client, model, system_prompt, user_prompt, | |
| temperature, max_tokens | |
| ) | |
| async def _execute_immediate( | |
| self, | |
| client: Any, | |
| model: str, | |
| system_prompt: str, | |
| user_prompt: str, | |
| temperature: float, | |
| max_tokens: int | |
| ) -> Dict[str, Any]: | |
| """Execução imediata""" | |
| try: | |
| # Chamar método do client específico | |
| if hasattr(client, 'chat_completion'): | |
| response = await client.chat_completion( | |
| model=model, | |
| messages=[ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": user_prompt} | |
| ], | |
| temperature=temperature, | |
| max_tokens=max_tokens, | |
| response_format={"type": "json_object"} | |
| ) | |
| elif hasattr(client, 'generate'): | |
| response = await client.generate( | |
| prompt="{system_prompt}\n\n{user_prompt}", | |
| temperature=temperature, | |
| max_tokens=max_tokens | |
| ) | |
| else: | |
| raise ValueError(f"Client {type(client)} não suporta geração") | |
| return { | |
| 'status': 'success', | |
| 'result': response, | |
| 'mode': 'immediate' | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Erro na execução imediata: {e}", exc_info=True) | |
| return { | |
| 'status': 'error', | |
| 'error': str(e), | |
| 'mode': 'immediate' | |
| } | |
| async def _queue_for_batch( | |
| self, | |
| provider: str, | |
| model: str, | |
| system_prompt: str, | |
| user_prompt: str, | |
| temperature: float, | |
| max_tokens: int | |
| ) -> Dict[str, Any]: | |
| """Adiciona requisição à fila de batch""" | |
| request_id = f"req_{len(self.pending_requests)}" | |
| self.pending_requests.append({ | |
| 'id': request_id, | |
| 'provider': provider, | |
| 'model': model, | |
| 'system_prompt': system_prompt, | |
| 'user_prompt': user_prompt, | |
| 'temperature': temperature, | |
| 'max_tokens': max_tokens | |
| }) | |
| logger.info(f"📥 Requisição adicionada à fila: {request_id}") | |
| return { | |
| 'status': 'queued', | |
| 'request_id': request_id, | |
| 'mode': 'batch' | |
| } | |
| async def execute_batch(self, batch_id: str) -> Dict[str, Any]: | |
| """ | |
| Executa batch de requisições pendentes | |
| Returns: | |
| Dict com resultados | |
| """ | |
| if not self.batch_processor: | |
| raise ValueError("Batch processor não disponível") | |
| if not self.pending_requests: | |
| logger.warning("⚠️ Nenhuma requisição pendente para batch") | |
| return {} | |
| logger.info(f"🚀 Executando batch: {len(self.pending_requests)} requisições") | |
| # Converter para BatchRequest | |
| batch_requests = [] | |
| for req in self.pending_requests: | |
| batch_req = BatchRequest( | |
| custom_id=req['id'], | |
| specialist_id=0, # Será preenchido pelo processor | |
| system_prompt=req['system_prompt'], | |
| user_prompt=req['user_prompt'], | |
| model=req['model'], | |
| temperature=req['temperature'], | |
| max_tokens=req['max_tokens'] | |
| ) | |
| batch_requests.append(batch_req) | |
| # Executar batch | |
| results = await self.batch_processor.process_batch( | |
| requests=batch_requests, | |
| batch_id=batch_id | |
| ) | |
| # Limpar fila | |
| self.pending_requests.clear() | |
| return results | |
| async def _execute_parallel( | |
| self, | |
| client: Any, | |
| model: str, | |
| system_prompt: str, | |
| user_prompt: str, | |
| temperature: float, | |
| max_tokens: int | |
| ) -> Dict[str, Any]: | |
| """Execução paralela com worker pool""" | |
| # Similar a immediate mas em um worker pool gerenciado | |
| return await self._execute_immediate( | |
| client, model, system_prompt, user_prompt, | |
| temperature, max_tokens | |
| ) | |