""" Batch Processor - Processamento em batch via Groq Batch API Baseado no batch_client fornecido pelo usuário """ import json import time import logging import asyncio import requests from pathlib import Path from typing import List, Dict, Any, Optional from datetime import datetime logger = logging.getLogger(__name__) class BatchRequest: """Representa uma requisição no batch""" def __init__( self, custom_id: str, specialist_id: int, system_prompt: str, user_prompt: str, model: str, temperature: float, max_tokens: int ): self.custom_id = custom_id self.specialist_id = specialist_id self.system_prompt = system_prompt self.user_prompt = user_prompt self.model = model self.temperature = temperature self.max_tokens = max_tokens def to_jsonl_line(self) -> str: """Converte para linha JSONL do formato Groq Batch API""" batch_line = { "custom_id": self.custom_id, "method": "POST", "url": "/v1/chat/completions", "body": { "model": self.model, "messages": [ {"role": "system", "content": self.system_prompt}, {"role": "user", "content": self.user_prompt} ], "temperature": self.temperature, "max_tokens": self.max_tokens, "response_format": {"type": "json_object"} } } return json.dumps(batch_line, ensure_ascii=False) class BatchProcessor: """Processador de requisições em batch via Groq API""" def __init__( self, api_key: str, work_dir: Path, completion_window: str = "24h" ): """ Args: api_key: Groq API key work_dir: Diretório de trabalho para arquivos temporários completion_window: Janela de conclusão (24h padrão) """ self.api_key = api_key self.work_dir = Path(work_dir) self.completion_window = completion_window self.base_url = "https://api.groq.com/openai/v1" self.headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } # Criar diretórios self.batches_dir = self.work_dir / "batches" self.batches_dir.mkdir(parents=True, exist_ok=True) def create_batch_file( self, requests: List[BatchRequest], batch_id: str ) -> Optional[Path]: """ Cria arquivo JSONL para batch Args: requests: Lista de requisições batch_id: ID do batch Returns: Path do arquivo criado ou None """ try: batch_file = self.batches_dir / f"batch_input_{batch_id}.jsonl" logger.info(f"📝 Criando batch file: {batch_file.name}") with open(batch_file, 'w', encoding='utf-8') as f: for req in requests: f.write(req.to_jsonl_line() + '\n') logger.info(f"✅ Batch file criado: {len(requests)} requisições") return batch_file except Exception as e: logger.error(f"❌ Erro ao criar batch file: {e}", exc_info=True) return None def upload_batch_file(self, file_path: Path) -> Optional[str]: """ Upload do arquivo JSONL para Groq Files API Returns: file_id ou None """ logger.info(f"📤 Uploading batch file: {file_path.name}") try: with open(file_path, 'rb') as f: files = {'file': (file_path.name, f, 'application/jsonl')} data = {'purpose': 'batch'} response = requests.post( f"{self.base_url}/files", headers={"Authorization": f"Bearer {self.api_key}"}, files=files, data=data, timeout=120 ) response.raise_for_status() result = response.json() file_id = result['id'] logger.info(f"✅ File uploaded: {file_id}") return file_id except Exception as e: logger.error(f"❌ Erro no upload: {e}", exc_info=True) return None def create_batch_job(self, input_file_id: str) -> Optional[str]: """ Cria batch job Returns: batch_id ou None """ logger.info(f"🚀 Criando batch job (window: {self.completion_window})") try: payload = { "input_file_id": input_file_id, "endpoint": "/v1/chat/completions", "completion_window": self.completion_window } response = requests.post( f"{self.base_url}/batches", headers=self.headers, json=payload, timeout=30 ) response.raise_for_status() result = response.json() batch_id = result['id'] logger.info(f"✅ Batch job criado: {batch_id}") return batch_id except Exception as e: logger.error(f"❌ Erro ao criar batch: {e}", exc_info=True) return None def check_batch_status(self, batch_id: str) -> Optional[Dict]: """Checa status do batch""" try: response = requests.get( f"{self.base_url}/batches/{batch_id}", headers=self.headers, timeout=30 ) response.raise_for_status() return response.json() except Exception as e: logger.error(f"❌ Erro ao checar status: {e}", exc_info=True) return None async def wait_for_completion( self, batch_id: str, poll_interval: int = 30, max_wait_time: int = 86400 # 24h ) -> Optional[Dict]: """ Aguarda conclusão do batch com polling Args: batch_id: ID do batch poll_interval: Intervalo entre polls (segundos) max_wait_time: Tempo máximo de espera (segundos) Returns: Info do batch completo ou None """ logger.info(f"⏳ Aguardando conclusão do batch {batch_id}...") status_map = { "validating": "🔍 Validando", "in_progress": "⚙️ Processando", "finalizing": "🏁 Finalizando", "completed": "✅ Completo", "failed": "❌ Falhou", "expired": "⏰ Expirado", "cancelled": "🚫 Cancelado" } start_time = time.time() last_status = None while True: # Verificar timeout elapsed = time.time() - start_time if elapsed > max_wait_time: logger.error(f"⏰ Timeout: batch excedeu {max_wait_time}s") return None # Checar status batch_info = self.check_batch_status(batch_id) if not batch_info: logger.warning("⚠️ Erro ao checar status, tentando novamente...") await asyncio.sleep(poll_interval) continue status = batch_info['status'] request_counts = batch_info.get('request_counts', {}) # Log mudança de status if status != last_status: emoji = status_map.get(status, "❓") logger.info(f"{emoji} Status: {status}") last_status = status # Log progresso total = request_counts.get('total', 0) completed = request_counts.get('completed', 0) failed = request_counts.get('failed', 0) if total > 0: progress = (completed + failed) / total * 100 logger.info( f"📊 Progresso: {completed}/{total} OK, {failed} erros " f"({progress:.1f}%) - {elapsed:.0f}s" ) # Estados finais if status == "completed": logger.info(f"✅ Batch completo! {completed} sucessos, {failed} falhas") return batch_info elif status in ["failed", "expired", "cancelled"]: logger.error(f"❌ Batch terminou com status: {status}") return batch_info # Aguarda próximo poll await asyncio.sleep(poll_interval) def download_results( self, output_file_id: str, batch_id: str ) -> Optional[Path]: """ Baixa arquivo de resultados Returns: Path do arquivo ou None """ logger.info(f"⬇️ Baixando resultados: {output_file_id}") try: response = requests.get( f"{self.base_url}/files/{output_file_id}/content", headers=self.headers, timeout=120 ) response.raise_for_status() output_file = self.batches_dir / f"batch_output_{batch_id}.jsonl" output_file.write_bytes(response.content) logger.info(f"✅ Resultados salvos: {output_file}") return output_file except Exception as e: logger.error(f"❌ Erro ao baixar resultados: {e}", exc_info=True) return None def parse_results(self, output_file: Path) -> Dict[str, Dict[str, Any]]: """ Parseia arquivo JSONL de resultados Returns: Dict mapeando custom_id -> resultado """ results = {} try: with open(output_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if not line: continue try: item = json.loads(line) custom_id = item.get('custom_id') response = item.get('response', {}) if response.get('status_code') == 200: body = response.get('body', {}) content = body.get('choices', [{}])[0].get('message', {}).get('content', '{}') tokens = body.get('usage', {}).get('total_tokens', 0) try: parsed_content = json.loads(content) except json.JSONDecodeError: parsed_content = {"raw_content": content} results[custom_id] = { 'status': 'success', 'result': parsed_content, 'tokens': tokens } else: error_msg = response.get('body', {}).get('error', {}).get('message', 'Unknown error') results[custom_id] = { 'status': 'error', 'error': error_msg, 'tokens': 0 } except json.JSONDecodeError as e: logger.warning(f"⚠️ Erro ao parsear linha: {e}") continue logger.info(f"✅ Parseados {len(results)} resultados") return results except Exception as e: logger.error(f"❌ Erro ao parsear resultados: {e}", exc_info=True) return {} async def process_batch( self, requests: List[BatchRequest], batch_id: str, poll_interval: int = 30 ) -> Optional[Dict[str, Dict[str, Any]]]: """ Fluxo completo de batch processing Args: requests: Lista de requisições batch_id: ID do batch poll_interval: Intervalo de polling Returns: Dict com resultados mapeados por custom_id """ logger.info(f"\n{'='*80}") logger.info(f"🔄 BATCH PROCESSING - {batch_id}") logger.info(f" {len(requests)} requisições") logger.info(f"{'='*80}") # 1. Criar arquivo JSONL batch_file = self.create_batch_file(requests, batch_id) if not batch_file: return None # 2. Upload file_id = self.upload_batch_file(batch_file) if not file_id: return None # 3. Criar batch job groq_batch_id = self.create_batch_job(file_id) if not groq_batch_id: return None # 4. Aguardar conclusão batch_result = await self.wait_for_completion(groq_batch_id, poll_interval) if not batch_result: return None # 5. Baixar e parsear resultados if batch_result['status'] == 'completed': output_file_id = batch_result.get('output_file_id') if output_file_id: output_file = self.download_results(output_file_id, batch_id) if output_file: results = self.parse_results(output_file) return results return None