Spaces:
Runtime error
Runtime error
| """ | |
| ProcessingService - Usa ProcessorManager REAL com LLM | |
| Integração completa com os 9 especialistas refatorados | |
| """ | |
| import os | |
| import logging | |
| import json | |
| import tarfile | |
| import hashlib | |
| from typing import Dict, Any, List, Optional | |
| from datetime import datetime | |
| from pathlib import Path | |
| from llm.llm_manager import LLMManager | |
| from processors.processor_manager import ProcessorManager | |
| logger = logging.getLogger(__name__) | |
| class ProcessingService: | |
| """ | |
| Serviço que coordena processamento via ProcessorManager REAL | |
| MUDANÇAS: | |
| - Todos os especialistas usam LLM real | |
| - Sem simulações | |
| - Configuração via YAML | |
| - Suporte a batch e paralelo | |
| """ | |
| def __init__( | |
| self, | |
| llm_provider: str = "groq", | |
| api_key: Optional[str] = None, | |
| max_workers: int = 3 | |
| ): | |
| """ | |
| Args: | |
| llm_provider: Provider LLM (groq, openai, anthropic) | |
| api_key: API key (opcional, usa env var se não fornecido) | |
| max_workers: Workers paralelos | |
| """ | |
| self.llm_provider = llm_provider | |
| self.api_key = api_key | |
| self.max_workers = max_workers | |
| # Configurar API key no ambiente se fornecida | |
| if self.api_key: | |
| env_key = f"{llm_provider.upper()}_API_KEY" | |
| os.environ[env_key] = self.api_key | |
| logger.info(f"✅ API key configurada para {env_key}") | |
| # Criar LLMManager | |
| self.llm_manager = LLMManager() | |
| # Criar ProcessorManager com LLM Manager | |
| self.processor_manager = ProcessorManager( | |
| llm_manager=self.llm_manager, | |
| max_workers=max_workers | |
| ) | |
| logger.info( | |
| f"✅ ProcessingService inicializado " | |
| f"(provider={llm_provider}, 9 especialistas prontos)" | |
| ) | |
| async def process_acordao( | |
| self, | |
| acordao_data: Dict[str, Any], | |
| specialist_ids: Optional[List[int]] = None, | |
| enable_parallel: bool = False | |
| ) -> Dict[str, Any]: | |
| """ | |
| Processa 1 acórdão usando ProcessorManager | |
| Args: | |
| acordao_data: Dados do acórdão | |
| specialist_ids: IDs dos especialistas (default: todos) | |
| enable_parallel: Executar em paralelo | |
| Returns: | |
| Resultado consolidado dos 9 especialistas | |
| """ | |
| try: | |
| logger.info( | |
| f"🚀 Processando acórdão {acordao_data.get('acordao_id', 'unknown')} " | |
| f"com ProcessorManager (parallel={enable_parallel})" | |
| ) | |
| # Usar ProcessorManager REAL para processar | |
| if enable_parallel: | |
| result = await self.processor_manager.process_acordao_parallel( | |
| acordao_data=acordao_data, | |
| specialist_ids=specialist_ids | |
| ) | |
| else: | |
| result = await self.processor_manager.process_acordao_sequential( | |
| acordao_data=acordao_data, | |
| specialist_ids=specialist_ids | |
| ) | |
| logger.info( | |
| f"✅ Acórdão processado em {result.get('execution_time', 0):.2f}s" | |
| ) | |
| return result | |
| except Exception as e: | |
| logger.error(f"❌ Erro ao processar acórdão: {e}", exc_info=True) | |
| return { | |
| "acordao_id": acordao_data.get("acordao_id", "unknown"), | |
| "status": "error", | |
| "error": str(e), | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| async def process_batch( | |
| self, | |
| acordaos: List[Dict[str, Any]], | |
| specialist_ids: Optional[List[int]] = None, | |
| enable_parallel: bool = False | |
| ) -> Dict[str, Any]: | |
| """ | |
| Processa lote de acórdãos | |
| Args: | |
| acordaos: Lista de acórdãos | |
| specialist_ids: IDs dos especialistas | |
| enable_parallel: Processar cada acórdão em paralelo | |
| Returns: | |
| Resultados consolidados | |
| """ | |
| results = [] | |
| start_time = datetime.now() | |
| logger.info(f"📚 Processando batch: {len(acordaos)} acórdãos") | |
| for idx, acordao in enumerate(acordaos, 1): | |
| logger.info(f"📄 Processando acórdão {idx}/{len(acordaos)}...") | |
| result = await self.process_acordao( | |
| acordao_data=acordao, | |
| specialist_ids=specialist_ids, | |
| enable_parallel=enable_parallel | |
| ) | |
| results.append(result) | |
| elapsed = (datetime.now() - start_time).total_seconds() | |
| successful = len([r for r in results if r.get("status") != "error"]) | |
| failed = len(results) - successful | |
| logger.info( | |
| f"✅ Batch concluído: {successful} sucessos, {failed} falhas " | |
| f"em {elapsed:.2f}s" | |
| ) | |
| return { | |
| "batch_size": len(acordaos), | |
| "processed": len(results), | |
| "successful": successful, | |
| "failed": failed, | |
| "total_execution_time": elapsed, | |
| "avg_time_per_acordao": elapsed / len(acordaos) if acordaos else 0, | |
| "results": results, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| async def process_jsonl_file( | |
| self, | |
| file_path: str, | |
| task_id: str, | |
| llm_provider: str = "groq", | |
| model_type: str = "balanced", | |
| enable_parallel: bool = True, | |
| max_workers: int = 3 | |
| ) -> Dict[str, Any]: | |
| """ | |
| Processa arquivo JSONL completo e gera TAR.GZ com resultados | |
| Args: | |
| file_path: Caminho do arquivo JSONL | |
| task_id: ID da task | |
| llm_provider: Provider LLM | |
| model_type: Tipo do modelo | |
| enable_parallel: Processar em paralelo | |
| max_workers: Workers paralelos | |
| Returns: | |
| Metadados do processamento com caminho do arquivo | |
| """ | |
| from api.config import get_settings | |
| settings = get_settings() | |
| start_time = datetime.now() | |
| try: | |
| # Ler acórdãos do JSONL | |
| logger.info(f"📖 Lendo arquivo JSONL: {file_path}") | |
| acordaos = [] | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| for line_num, line in enumerate(f, 1): | |
| line = line.strip() | |
| if line: | |
| try: | |
| acordao = json.loads(line) | |
| acordaos.append(acordao) | |
| except json.JSONDecodeError as e: | |
| logger.warning(f"⚠️ Linha {line_num} inválida: {e}") | |
| logger.info(f"📚 {len(acordaos)} acórdãos carregados") | |
| if not acordaos: | |
| raise ValueError("Nenhum acórdão válido encontrado no arquivo") | |
| # Processar batch | |
| batch_result = await self.process_batch( | |
| acordaos=acordaos, | |
| specialist_ids=None, # Todos os especialistas | |
| enable_parallel=enable_parallel | |
| ) | |
| # Criar estrutura de output | |
| output_dir = Path(settings.OUTPUT_PATH) / task_id | |
| output_dir.mkdir(parents=True, exist_ok=True) | |
| # Salvar resultados individuais | |
| results_dir = output_dir / "results" | |
| results_dir.mkdir(exist_ok=True) | |
| for idx, result in enumerate(batch_result['results']): | |
| acordao_id = result.get('acordao_id', f'acordao_{idx:04d}') | |
| result_file = results_dir / f"{acordao_id}.json" | |
| with open(result_file, 'w', encoding='utf-8') as f: | |
| json.dump(result, f, ensure_ascii=False, indent=2) | |
| # Salvar sumário | |
| summary = { | |
| 'task_id': task_id, | |
| 'timestamp': datetime.now().isoformat(), | |
| 'batch_size': batch_result['batch_size'], | |
| 'processed': batch_result['processed'], | |
| 'successful': batch_result['successful'], | |
| 'failed': batch_result['failed'], | |
| 'total_execution_time': batch_result['total_execution_time'], | |
| 'avg_time_per_acordao': batch_result['avg_time_per_acordao'], | |
| 'llm_provider': llm_provider, | |
| 'model_type': model_type, | |
| 'enable_parallel': enable_parallel, | |
| 'max_workers': max_workers | |
| } | |
| summary_file = output_dir / "summary.json" | |
| with open(summary_file, 'w', encoding='utf-8') as f: | |
| json.dump(summary, f, ensure_ascii=False, indent=2) | |
| # Criar TAR.GZ | |
| archive_dir = Path(settings.OUTPUT_PATH) / "archives" | |
| archive_dir.mkdir(parents=True, exist_ok=True) | |
| archive_path = archive_dir / f"{task_id}.tar.gz" | |
| logger.info(f"📦 Criando arquivo: {archive_path}") | |
| with tarfile.open(archive_path, "w:gz") as tar: | |
| tar.add(output_dir, arcname=task_id) | |
| # Calcular hash | |
| sha256_hash = hashlib.sha256() | |
| with open(archive_path, "rb") as f: | |
| for byte_block in iter(lambda: f.read(4096), b""): | |
| sha256_hash.update(byte_block) | |
| file_hash = sha256_hash.hexdigest() | |
| elapsed = (datetime.now() - start_time).total_seconds() | |
| logger.info( | |
| f"✅ Processamento concluído: {batch_result['successful']} sucessos, " | |
| f"{batch_result['failed']} falhas em {elapsed:.2f}s" | |
| ) | |
| return { | |
| 'task_id': task_id, | |
| 'archive_path': str(archive_path), | |
| 'hash': file_hash, | |
| 'processed': batch_result['processed'], | |
| 'successful': batch_result['successful'], | |
| 'failed': batch_result['failed'], | |
| 'elapsed_seconds': elapsed, | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Erro ao processar arquivo JSONL: {e}", exc_info=True) | |
| raise | |