# (removed hosting-page scrape artifacts: "Spaces:" / "Runtime error" lines — not part of this module)
"""
Batch Processor - batch processing via the Groq Batch API.
Based on the batch_client provided by the user.
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests
| logger = logging.getLogger(__name__) | |
@dataclass
class BatchRequest:
    """One request destined for the Groq Batch API.

    Each instance maps to a single ``/v1/chat/completions`` call inside a
    batch; ``custom_id`` is echoed back in the batch output so results can
    be matched to their originating request.
    """

    custom_id: str       # unique id echoed back in the batch results
    specialist_id: int   # caller-side routing id (not sent to the API)
    system_prompt: str
    user_prompt: str
    model: str
    temperature: float
    max_tokens: int

    def to_jsonl_line(self) -> str:
        """Serialize this request as one JSONL line in Groq Batch API input format.

        Returns:
            A single-line JSON string (no trailing newline), with non-ASCII
            characters kept as-is (``ensure_ascii=False``).
        """
        batch_line = {
            "custom_id": self.custom_id,
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": self.model,
                "messages": [
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": self.user_prompt},
                ],
                "temperature": self.temperature,
                "max_tokens": self.max_tokens,
                # JSON mode: the model is required to emit a valid JSON object.
                "response_format": {"type": "json_object"},
            },
        }
        return json.dumps(batch_line, ensure_ascii=False)
class BatchProcessor:
    """Drives the full Groq Batch API lifecycle.

    Writes the JSONL input file, uploads it via the Files API, opens a
    batch job, polls until a terminal state, then downloads and parses
    the per-request results.
    """

    def __init__(
        self,
        api_key: str,
        work_dir: Path,
        completion_window: str = "24h"
    ):
        """
        Args:
            api_key: Groq API key.
            work_dir: Working directory for temporary batch files.
            completion_window: Batch completion window (default "24h").
        """
        self.api_key = api_key
        self.work_dir = Path(work_dir)
        self.completion_window = completion_window
        self.base_url = "https://api.groq.com/openai/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        # Make sure the scratch directory for batch files exists up front.
        self.batches_dir = self.work_dir / "batches"
        self.batches_dir.mkdir(parents=True, exist_ok=True)

    def create_batch_file(
        self,
        requests: List[BatchRequest],
        batch_id: str
    ) -> Optional[Path]:
        """
        Write the batch input JSONL file (one line per request).

        Args:
            requests: Requests to serialize.
            batch_id: Identifier embedded in the output filename.

        Returns:
            Path of the created file, or None on failure.
        """
        log = logging.getLogger(__name__)
        try:
            target = self.batches_dir / f"batch_input_{batch_id}.jsonl"
            log.info(f"📝 Criando batch file: {target.name}")
            payload = "".join(item.to_jsonl_line() + '\n' for item in requests)
            with open(target, 'w', encoding='utf-8') as handle:
                handle.write(payload)
            log.info(f"✅ Batch file criado: {len(requests)} requisições")
            return target
        except Exception as e:
            log.error(f"❌ Erro ao criar batch file: {e}", exc_info=True)
            return None

    def upload_batch_file(self, file_path: Path) -> Optional[str]:
        """
        Upload the JSONL file to the Groq Files API.

        Returns:
            The uploaded file id, or None on failure.
        """
        log = logging.getLogger(__name__)
        log.info(f"📤 Uploading batch file: {file_path.name}")
        try:
            with open(file_path, 'rb') as handle:
                multipart = {'file': (file_path.name, handle, 'application/jsonl')}
                form = {'purpose': 'batch'}
                # Multipart upload: only the auth header is passed so that
                # `requests` can set the multipart Content-Type itself.
                reply = requests.post(
                    f"{self.base_url}/files",
                    headers={"Authorization": f"Bearer {self.api_key}"},
                    files=multipart,
                    data=form,
                    timeout=120
                )
            reply.raise_for_status()
            uploaded_id = reply.json()['id']
            log.info(f"✅ File uploaded: {uploaded_id}")
            return uploaded_id
        except Exception as e:
            log.error(f"❌ Erro no upload: {e}", exc_info=True)
            return None

    def create_batch_job(self, input_file_id: str) -> Optional[str]:
        """
        Open a batch job for a previously uploaded input file.

        Returns:
            The Groq-side batch id, or None on failure.
        """
        log = logging.getLogger(__name__)
        log.info(f"🚀 Criando batch job (window: {self.completion_window})")
        try:
            reply = requests.post(
                f"{self.base_url}/batches",
                headers=self.headers,
                json={
                    "input_file_id": input_file_id,
                    "endpoint": "/v1/chat/completions",
                    "completion_window": self.completion_window
                },
                timeout=30
            )
            reply.raise_for_status()
            job_id = reply.json()['id']
            log.info(f"✅ Batch job criado: {job_id}")
            return job_id
        except Exception as e:
            log.error(f"❌ Erro ao criar batch: {e}", exc_info=True)
            return None

    def check_batch_status(self, batch_id: str) -> Optional[Dict]:
        """Fetch the current batch status; returns the info dict or None on error."""
        log = logging.getLogger(__name__)
        try:
            reply = requests.get(
                f"{self.base_url}/batches/{batch_id}",
                headers=self.headers,
                timeout=30
            )
            reply.raise_for_status()
            return reply.json()
        except Exception as e:
            log.error(f"❌ Erro ao checar status: {e}", exc_info=True)
            return None

    async def wait_for_completion(
        self,
        batch_id: str,
        poll_interval: int = 30,
        max_wait_time: int = 86400  # 24h
    ) -> Optional[Dict]:
        """
        Poll the batch until it reaches a terminal state.

        Args:
            batch_id: Batch to watch.
            poll_interval: Seconds between polls.
            max_wait_time: Give up after this many seconds.

        Returns:
            The final batch info dict, or None on timeout.
        """
        log = logging.getLogger(__name__)
        log.info(f"⏳ Aguardando conclusão do batch {batch_id}...")

        labels = {
            "validating": "🔍 Validando",
            "in_progress": "⚙️ Processando",
            "finalizing": "🏁 Finalizando",
            "completed": "✅ Completo",
            "failed": "❌ Falhou",
            "expired": "⏰ Expirado",
            "cancelled": "🚫 Cancelado"
        }

        started = time.time()
        previous = None
        while True:
            # Give up once the overall budget is exhausted.
            elapsed = time.time() - started
            if elapsed > max_wait_time:
                log.error(f"⏰ Timeout: batch excedeu {max_wait_time}s")
                return None

            info = self.check_batch_status(batch_id)
            if not info:
                # Transient status-check failure: wait and retry.
                log.warning("⚠️ Erro ao checar status, tentando novamente...")
                await asyncio.sleep(poll_interval)
                continue

            status = info['status']
            counts = info.get('request_counts', {})

            # Only announce the status when it actually changes.
            if status != previous:
                log.info(f"{labels.get(status, '❓')} Status: {status}")
                previous = status

            total = counts.get('total', 0)
            completed = counts.get('completed', 0)
            failed = counts.get('failed', 0)
            if total > 0:
                progress = (completed + failed) / total * 100
                log.info(
                    f"📊 Progresso: {completed}/{total} OK, {failed} erros "
                    f"({progress:.1f}%) - {elapsed:.0f}s"
                )

            # Terminal states: hand the final info dict back to the caller.
            if status == "completed":
                log.info(f"✅ Batch completo! {completed} sucessos, {failed} falhas")
                return info
            if status in ("failed", "expired", "cancelled"):
                log.error(f"❌ Batch terminou com status: {status}")
                return info

            await asyncio.sleep(poll_interval)

    def download_results(
        self,
        output_file_id: str,
        batch_id: str
    ) -> Optional[Path]:
        """
        Download the batch results file.

        Returns:
            Path of the saved file, or None on failure.
        """
        log = logging.getLogger(__name__)
        log.info(f"⬇️ Baixando resultados: {output_file_id}")
        try:
            reply = requests.get(
                f"{self.base_url}/files/{output_file_id}/content",
                headers=self.headers,
                timeout=120
            )
            reply.raise_for_status()
            out_path = self.batches_dir / f"batch_output_{batch_id}.jsonl"
            out_path.write_bytes(reply.content)
            log.info(f"✅ Resultados salvos: {out_path}")
            return out_path
        except Exception as e:
            log.error(f"❌ Erro ao baixar resultados: {e}", exc_info=True)
            return None

    def parse_results(self, output_file: Path) -> Dict[str, Dict[str, Any]]:
        """
        Parse the JSONL results file.

        Returns:
            Mapping of custom_id -> {'status', 'result'|'error', 'tokens'}.
        """
        log = logging.getLogger(__name__)
        parsed: Dict[str, Dict[str, Any]] = {}
        try:
            with open(output_file, 'r', encoding='utf-8') as handle:
                for raw in handle:
                    raw = raw.strip()
                    if not raw:
                        continue
                    try:
                        item = json.loads(raw)
                    except json.JSONDecodeError as e:
                        # Skip malformed lines rather than abort the whole file.
                        log.warning(f"⚠️ Erro ao parsear linha: {e}")
                        continue
                    custom_id = item.get('custom_id')
                    reply = item.get('response', {})
                    if reply.get('status_code') == 200:
                        body = reply.get('body', {})
                        content = body.get('choices', [{}])[0].get('message', {}).get('content', '{}')
                        try:
                            payload = json.loads(content)
                        except json.JSONDecodeError:
                            # Model returned non-JSON despite JSON mode; keep the raw text.
                            payload = {"raw_content": content}
                        parsed[custom_id] = {
                            'status': 'success',
                            'result': payload,
                            'tokens': body.get('usage', {}).get('total_tokens', 0)
                        }
                    else:
                        parsed[custom_id] = {
                            'status': 'error',
                            'error': reply.get('body', {}).get('error', {}).get('message', 'Unknown error'),
                            'tokens': 0
                        }
            log.info(f"✅ Parseados {len(parsed)} resultados")
            return parsed
        except Exception as e:
            log.error(f"❌ Erro ao parsear resultados: {e}", exc_info=True)
            return {}

    async def process_batch(
        self,
        requests: List[BatchRequest],
        batch_id: str,
        poll_interval: int = 30
    ) -> Optional[Dict[str, Dict[str, Any]]]:
        """
        Run the whole pipeline: write, upload, submit, wait, download, parse.

        Args:
            requests: Requests to process.
            batch_id: Local batch identifier (used in filenames/logs).
            poll_interval: Seconds between status polls.

        Returns:
            Results keyed by custom_id, or None if any stage failed.
        """
        log = logging.getLogger(__name__)
        banner = '=' * 80
        log.info(f"\n{banner}")
        log.info(f"🔄 BATCH PROCESSING - {batch_id}")
        log.info(f" {len(requests)} requisições")
        log.info(f"{banner}")

        # 1. Serialize requests to a JSONL input file.
        jsonl_path = self.create_batch_file(requests, batch_id)
        if not jsonl_path:
            return None

        # 2. Upload the file.
        uploaded_id = self.upload_batch_file(jsonl_path)
        if not uploaded_id:
            return None

        # 3. Open the batch job.
        job_id = self.create_batch_job(uploaded_id)
        if not job_id:
            return None

        # 4. Block (async) until the job reaches a terminal state.
        final_info = await self.wait_for_completion(job_id, poll_interval)
        if not final_info:
            return None

        # 5. On success, fetch and parse the results file.
        if final_info['status'] == 'completed':
            output_file_id = final_info.get('output_file_id')
            if output_file_id:
                results_path = self.download_results(output_file_id, batch_id)
                if results_path:
                    return self.parse_results(results_path)
        return None