Spaces:
Runtime error
Runtime error
| """ | |
| ProcessorManager - Orquestrador de 9 Especialistas | |
| Coordena execução sequencial/paralela de todos os processadores | |
| """ | |
| import asyncio | |
| import logging | |
| from typing import Dict, List, Any, Optional | |
| from datetime import datetime | |
| from concurrent.futures import ThreadPoolExecutor | |
| import json | |
| from processors.processor_metadados import ProcessorMetadados | |
| from processors.processor_relatorio import ProcessorRelatorio | |
| from processors.processor_fundamentacao import ProcessorFundamentacao | |
| from processors.processor_decisao import ProcessorDecisao | |
| from processors.processor_auditoria import ProcessorAuditoria | |
| from processors.processor_arquivo_relacional import ProcessorArquivoRelacional | |
| from processors.processor_segmentacao_sintatica import ProcessorSegmentacaoSintatica | |
| from processors.processor_analise_contextual import ProcessorAnaliseContextual | |
| from processors.processor_transcricao_3partite import ProcessorTranscricao3Partite | |
| logger = logging.getLogger(__name__) | |
| class ProcessorManager: | |
| """ | |
| Orquestrador central que gerencia os 9 especialistas | |
| Funcionalidades: | |
| - Execução sequencial ou paralela | |
| - Orquestração de dependências entre especialistas | |
| - Validação de resultados | |
| - Consolidação de outputs | |
| - Auditoria de processamento | |
| """ | |
| def __init__(self, llm_model=None, max_workers: int = 4): | |
| """ | |
| Inicializa manager | |
| Args: | |
| llm_model: Modelo LLM a usar | |
| max_workers: Número máximo de workers paralelos | |
| """ | |
| self.llm_model = llm_model | |
| self.max_workers = max_workers | |
| self.start_time = None | |
| self.execution_log = [] | |
| # Inicializar 9 especialistas | |
| self.specialists = { | |
| 1: ProcessorMetadados(llm_model=llm_model), | |
| 2: ProcessorRelatorio(llm_model=llm_model), | |
| 3: ProcessorFundamentacao(llm_model=llm_model), | |
| 4: ProcessorDecisao(llm_model=llm_model), | |
| 5: ProcessorAuditoria(llm_model=llm_model), | |
| 6: ProcessorArquivoRelacional(llm_model=llm_model), | |
| 7: ProcessorSegmentacaoSintatica(llm_model=llm_model), | |
| 8: ProcessorAnaliseContextual(llm_model=llm_model), | |
| 9: ProcessorTranscricao3Partite(llm_model=llm_model), | |
| } | |
| logger.info("✅ ProcessorManager inicializado com 9 especialistas") | |
| async def process_acordao_sequential( | |
| self, | |
| acordao_data: Dict[str, Any], | |
| specialist_ids: Optional[List[int]] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Processa acórdão sequencialmente (um especialista após o outro) | |
| Args: | |
| acordao_data: Dados estruturados do acórdão | |
| specialist_ids: IDs dos especialistas a executar (default: todos) | |
| Returns: | |
| Dicionário com resultados consolidados | |
| """ | |
| self.start_time = datetime.now() | |
| if specialist_ids is None: | |
| specialist_ids = list(range(1, 10)) | |
| logger.info(f"🔄 Iniciando processamento sequencial com especialistas: {specialist_ids}") | |
| results = {} | |
| errors = {} | |
| for spec_id in specialist_ids: | |
| if spec_id not in self.specialists: | |
| error_msg = f"Especialista {spec_id} não existe" | |
| logger.error(error_msg) | |
| errors[spec_id] = error_msg | |
| continue | |
| try: | |
| specialist = self.specialists[spec_id] | |
| logger.info(f"⏳ Executando {specialist.specialist_name}...") | |
| # Executar em executor para não bloquear | |
| loop = asyncio.get_event_loop() | |
| result = await loop.run_in_executor( | |
| None, | |
| specialist.process, | |
| acordao_data | |
| ) | |
| # Validar resultado | |
| if specialist.validate(result): | |
| results[spec_id] = specialist.post_process(result) | |
| logger.info(f"✅ {specialist.specialist_name} completado") | |
| else: | |
| error_msg = f"Validação falhou para {specialist.specialist_name}" | |
| errors[spec_id] = error_msg | |
| logger.error(error_msg) | |
| except Exception as e: | |
| error_msg = f"Erro ao executar {self.specialists[spec_id].specialist_name}: {str(e)}" | |
| errors[spec_id] = error_msg | |
| logger.error(error_msg, exc_info=True) | |
| elapsed_time = (datetime.now() - self.start_time).total_seconds() | |
| return { | |
| "status": "completed" if not errors else "completed_with_errors", | |
| "total_specialists": len(specialist_ids), | |
| "successful": len(results), | |
| "failed": len(errors), | |
| "execution_time": elapsed_time, | |
| "results": results, | |
| "errors": errors, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| async def process_acordao_parallel( | |
| self, | |
| acordao_data: Dict[str, Any], | |
| specialist_ids: Optional[List[int]] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Processa acórdão em paralelo (múltiplos especialistas simultaneamente) | |
| Args: | |
| acordao_data: Dados estruturados do acórdão | |
| specialist_ids: IDs dos especialistas a executar (default: todos) | |
| Returns: | |
| Dicionário com resultados consolidados | |
| """ | |
| self.start_time = datetime.now() | |
| if specialist_ids is None: | |
| specialist_ids = list(range(1, 10)) | |
| logger.info(f"⚡ Iniciando processamento paralelo com especialistas: {specialist_ids}") | |
| tasks = [] | |
| valid_specialists = [] | |
| for spec_id in specialist_ids: | |
| if spec_id not in self.specialists: | |
| logger.error(f"Especialista {spec_id} não existe") | |
| continue | |
| specialist = self.specialists[spec_id] | |
| valid_specialists.append(spec_id) | |
| # Criar task para cada especialista | |
| loop = asyncio.get_event_loop() | |
| task = loop.run_in_executor( | |
| None, | |
| self._execute_specialist, | |
| spec_id, | |
| acordao_data | |
| ) | |
| tasks.append(task) | |
| # Executar todas as tasks em paralelo | |
| results_list = await asyncio.gather(*tasks, return_exceptions=True) | |
| results = {} | |
| errors = {} | |
| for spec_id, result in zip(valid_specialists, results_list): | |
| if isinstance(result, Exception): | |
| errors[spec_id] = str(result) | |
| elif result is not None: | |
| results[spec_id] = result | |
| else: | |
| errors[spec_id] = "Resultado nulo" | |
| elapsed_time = (datetime.now() - self.start_time).total_seconds() | |
| return { | |
| "status": "completed" if not errors else "completed_with_errors", | |
| "total_specialists": len(valid_specialists), | |
| "successful": len(results), | |
| "failed": len(errors), | |
| "execution_time": elapsed_time, | |
| "results": results, | |
| "errors": errors, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| def _execute_specialist( | |
| self, | |
| spec_id: int, | |
| acordao_data: Dict[str, Any] | |
| ) -> Optional[Dict[str, Any]]: | |
| """Executa especialista individual (para paralelização)""" | |
| try: | |
| specialist = self.specialists[spec_id] | |
| logger.info(f"⏳ [{spec_id}] Executando {specialist.specialist_name}...") | |
| result = specialist.process(acordao_data) | |
| if specialist.validate(result): | |
| processed = specialist.post_process(result) | |
| logger.info(f"✅ [{spec_id}] {specialist.specialist_name} completado") | |
| return processed | |
| else: | |
| logger.error(f"❌ [{spec_id}] Validação falhou para {specialist.specialist_name}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"❌ [{spec_id}] Erro: {str(e)}", exc_info=True) | |
| return None | |
| def process_batch( | |
| self, | |
| acordaos: List[Dict[str, Any]], | |
| parallel: bool = False | |
| ) -> Dict[str, Any]: | |
| """ | |
| Processa múltiplos acórdãos em batch | |
| Args: | |
| acordaos: Lista de acórdãos a processar | |
| parallel: Se True, processa em paralelo | |
| Returns: | |
| Resultados consolidados | |
| """ | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| try: | |
| results = [] | |
| for idx, acordao in enumerate(acordaos): | |
| logger.info(f"📋 Processando acórdão {idx+1}/{len(acordaos)}") | |
| if parallel: | |
| result = loop.run_until_complete( | |
| self.process_acordao_parallel(acordao) | |
| ) | |
| else: | |
| result = loop.run_until_complete( | |
| self.process_acordao_sequential(acordao) | |
| ) | |
| results.append(result) | |
| return { | |
| "batch_size": len(acordaos), | |
| "processed": len(results), | |
| "results": results, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| finally: | |
| loop.close() | |
| def get_specialists_info(self) -> Dict[int, Dict[str, Any]]: | |
| """Retorna informações sobre todos os especialistas""" | |
| return { | |
| spec_id: specialist.info | |
| for spec_id, specialist in self.specialists.items() | |
| } | |
| def get_specialist(self, spec_id: int): | |
| """Retorna um especialista específico""" | |
| return self.specialists.get(spec_id) | |
| def export_results(self, results: Dict[str, Any], format: str = "json") -> str: | |
| """ | |
| Exporta resultados em diferentes formatos | |
| Args: | |
| results: Resultados a exportar | |
| format: Formato desejado (json, jsonl) | |
| Returns: | |
| String com dados formatados | |
| """ | |
| if format == "json": | |
| return json.dumps(results, ensure_ascii=False, indent=2) | |
| elif format == "jsonl": | |
| return json.dumps(results, ensure_ascii=False) + "\n" | |
| else: | |
| raise ValueError(f"Formato {format} não suportado") | |