""" ProcessorManager - Orquestrador de 9 Especialistas Coordena execução sequencial/paralela de todos os processadores """ import asyncio import logging from typing import Dict, List, Any, Optional from datetime import datetime from concurrent.futures import ThreadPoolExecutor import json from processors.processor_metadados import ProcessorMetadados from processors.processor_relatorio import ProcessorRelatorio from processors.processor_fundamentacao import ProcessorFundamentacao from processors.processor_decisao import ProcessorDecisao from processors.processor_auditoria import ProcessorAuditoria from processors.processor_arquivo_relacional import ProcessorArquivoRelacional from processors.processor_segmentacao_sintatica import ProcessorSegmentacaoSintatica from processors.processor_analise_contextual import ProcessorAnaliseContextual from processors.processor_transcricao_3partite import ProcessorTranscricao3Partite logger = logging.getLogger(__name__) class ProcessorManager: """ Orquestrador central que gerencia os 9 especialistas Funcionalidades: - Execução sequencial ou paralela - Orquestração de dependências entre especialistas - Validação de resultados - Consolidação de outputs - Auditoria de processamento """ def __init__(self, llm_model=None, max_workers: int = 4): """ Inicializa manager Args: llm_model: Modelo LLM a usar max_workers: Número máximo de workers paralelos """ self.llm_model = llm_model self.max_workers = max_workers self.start_time = None self.execution_log = [] # Inicializar 9 especialistas self.specialists = { 1: ProcessorMetadados(llm_model=llm_model), 2: ProcessorRelatorio(llm_model=llm_model), 3: ProcessorFundamentacao(llm_model=llm_model), 4: ProcessorDecisao(llm_model=llm_model), 5: ProcessorAuditoria(llm_model=llm_model), 6: ProcessorArquivoRelacional(llm_model=llm_model), 7: ProcessorSegmentacaoSintatica(llm_model=llm_model), 8: ProcessorAnaliseContextual(llm_model=llm_model), 9: ProcessorTranscricao3Partite(llm_model=llm_model), } logger.info("✅ ProcessorManager inicializado com 9 especialistas") async def process_acordao_sequential( self, acordao_data: Dict[str, Any], specialist_ids: Optional[List[int]] = None ) -> Dict[str, Any]: """ Processa acórdão sequencialmente (um especialista após o outro) Args: acordao_data: Dados estruturados do acórdão specialist_ids: IDs dos especialistas a executar (default: todos) Returns: Dicionário com resultados consolidados """ self.start_time = datetime.now() if specialist_ids is None: specialist_ids = list(range(1, 10)) logger.info(f"🔄 Iniciando processamento sequencial com especialistas: {specialist_ids}") results = {} errors = {} for spec_id in specialist_ids: if spec_id not in self.specialists: error_msg = f"Especialista {spec_id} não existe" logger.error(error_msg) errors[spec_id] = error_msg continue try: specialist = self.specialists[spec_id] logger.info(f"⏳ Executando {specialist.specialist_name}...") # Executar em executor para não bloquear loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, specialist.process, acordao_data ) # Validar resultado if specialist.validate(result): results[spec_id] = specialist.post_process(result) logger.info(f"✅ {specialist.specialist_name} completado") else: error_msg = f"Validação falhou para {specialist.specialist_name}" errors[spec_id] = error_msg logger.error(error_msg) except Exception as e: error_msg = f"Erro ao executar {self.specialists[spec_id].specialist_name}: {str(e)}" errors[spec_id] = error_msg logger.error(error_msg, exc_info=True) elapsed_time = (datetime.now() - self.start_time).total_seconds() return { "status": "completed" if not errors else "completed_with_errors", "total_specialists": len(specialist_ids), "successful": len(results), "failed": len(errors), "execution_time": elapsed_time, "results": results, "errors": errors, "timestamp": datetime.now().isoformat() } async def process_acordao_parallel( self, acordao_data: Dict[str, Any], specialist_ids: Optional[List[int]] = None ) -> Dict[str, Any]: """ Processa acórdão em paralelo (múltiplos especialistas simultaneamente) Args: acordao_data: Dados estruturados do acórdão specialist_ids: IDs dos especialistas a executar (default: todos) Returns: Dicionário com resultados consolidados """ self.start_time = datetime.now() if specialist_ids is None: specialist_ids = list(range(1, 10)) logger.info(f"⚡ Iniciando processamento paralelo com especialistas: {specialist_ids}") tasks = [] valid_specialists = [] for spec_id in specialist_ids: if spec_id not in self.specialists: logger.error(f"Especialista {spec_id} não existe") continue specialist = self.specialists[spec_id] valid_specialists.append(spec_id) # Criar task para cada especialista loop = asyncio.get_event_loop() task = loop.run_in_executor( None, self._execute_specialist, spec_id, acordao_data ) tasks.append(task) # Executar todas as tasks em paralelo results_list = await asyncio.gather(*tasks, return_exceptions=True) results = {} errors = {} for spec_id, result in zip(valid_specialists, results_list): if isinstance(result, Exception): errors[spec_id] = str(result) elif result is not None: results[spec_id] = result else: errors[spec_id] = "Resultado nulo" elapsed_time = (datetime.now() - self.start_time).total_seconds() return { "status": "completed" if not errors else "completed_with_errors", "total_specialists": len(valid_specialists), "successful": len(results), "failed": len(errors), "execution_time": elapsed_time, "results": results, "errors": errors, "timestamp": datetime.now().isoformat() } def _execute_specialist( self, spec_id: int, acordao_data: Dict[str, Any] ) -> Optional[Dict[str, Any]]: """Executa especialista individual (para paralelização)""" try: specialist = self.specialists[spec_id] logger.info(f"⏳ [{spec_id}] Executando {specialist.specialist_name}...") result = specialist.process(acordao_data) if specialist.validate(result): processed = specialist.post_process(result) logger.info(f"✅ [{spec_id}] {specialist.specialist_name} completado") return processed else: logger.error(f"❌ [{spec_id}] Validação falhou para {specialist.specialist_name}") return None except Exception as e: logger.error(f"❌ [{spec_id}] Erro: {str(e)}", exc_info=True) return None def process_batch( self, acordaos: List[Dict[str, Any]], parallel: bool = False ) -> Dict[str, Any]: """ Processa múltiplos acórdãos em batch Args: acordaos: Lista de acórdãos a processar parallel: Se True, processa em paralelo Returns: Resultados consolidados """ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: results = [] for idx, acordao in enumerate(acordaos): logger.info(f"📋 Processando acórdão {idx+1}/{len(acordaos)}") if parallel: result = loop.run_until_complete( self.process_acordao_parallel(acordao) ) else: result = loop.run_until_complete( self.process_acordao_sequential(acordao) ) results.append(result) return { "batch_size": len(acordaos), "processed": len(results), "results": results, "timestamp": datetime.now().isoformat() } finally: loop.close() def get_specialists_info(self) -> Dict[int, Dict[str, Any]]: """Retorna informações sobre todos os especialistas""" return { spec_id: specialist.info for spec_id, specialist in self.specialists.items() } def get_specialist(self, spec_id: int): """Retorna um especialista específico""" return self.specialists.get(spec_id) def export_results(self, results: Dict[str, Any], format: str = "json") -> str: """ Exporta resultados em diferentes formatos Args: results: Resultados a exportar format: Formato desejado (json, jsonl) Returns: String com dados formatados """ if format == "json": return json.dumps(results, ensure_ascii=False, indent=2) elif format == "jsonl": return json.dumps(results, ensure_ascii=False) + "\n" else: raise ValueError(f"Formato {format} não suportado")