PARA.AI / processors /processor_menager.py
Carlex22's picture
ParaAIV3.0
f0322a6
raw
history blame
10.9 kB
"""
ProcessorManager - Orquestrador de 9 Especialistas
Coordena execução sequencial/paralela de todos os processadores
"""
import asyncio
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import json
from processors.processor_metadados import ProcessorMetadados
from processors.processor_relatorio import ProcessorRelatorio
from processors.processor_fundamentacao import ProcessorFundamentacao
from processors.processor_decisao import ProcessorDecisao
from processors.processor_auditoria import ProcessorAuditoria
from processors.processor_arquivo_relacional import ProcessorArquivoRelacional
from processors.processor_segmentacao_sintatica import ProcessorSegmentacaoSintatica
from processors.processor_analise_contextual import ProcessorAnaliseContextual
from processors.processor_transcricao_3partite import ProcessorTranscricao3Partite
logger = logging.getLogger(__name__)
class ProcessorManager:
"""
Orquestrador central que gerencia os 9 especialistas
Funcionalidades:
- Execução sequencial ou paralela
- Orquestração de dependências entre especialistas
- Validação de resultados
- Consolidação de outputs
- Auditoria de processamento
"""
def __init__(self, llm_model=None, max_workers: int = 4):
"""
Inicializa manager
Args:
llm_model: Modelo LLM a usar
max_workers: Número máximo de workers paralelos
"""
self.llm_model = llm_model
self.max_workers = max_workers
self.start_time = None
self.execution_log = []
# Inicializar 9 especialistas
self.specialists = {
1: ProcessorMetadados(llm_model=llm_model),
2: ProcessorRelatorio(llm_model=llm_model),
3: ProcessorFundamentacao(llm_model=llm_model),
4: ProcessorDecisao(llm_model=llm_model),
5: ProcessorAuditoria(llm_model=llm_model),
6: ProcessorArquivoRelacional(llm_model=llm_model),
7: ProcessorSegmentacaoSintatica(llm_model=llm_model),
8: ProcessorAnaliseContextual(llm_model=llm_model),
9: ProcessorTranscricao3Partite(llm_model=llm_model),
}
logger.info("✅ ProcessorManager inicializado com 9 especialistas")
async def process_acordao_sequential(
self,
acordao_data: Dict[str, Any],
specialist_ids: Optional[List[int]] = None
) -> Dict[str, Any]:
"""
Processa acórdão sequencialmente (um especialista após o outro)
Args:
acordao_data: Dados estruturados do acórdão
specialist_ids: IDs dos especialistas a executar (default: todos)
Returns:
Dicionário com resultados consolidados
"""
self.start_time = datetime.now()
if specialist_ids is None:
specialist_ids = list(range(1, 10))
logger.info(f"🔄 Iniciando processamento sequencial com especialistas: {specialist_ids}")
results = {}
errors = {}
for spec_id in specialist_ids:
if spec_id not in self.specialists:
error_msg = f"Especialista {spec_id} não existe"
logger.error(error_msg)
errors[spec_id] = error_msg
continue
try:
specialist = self.specialists[spec_id]
logger.info(f"⏳ Executando {specialist.specialist_name}...")
# Executar em executor para não bloquear
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None,
specialist.process,
acordao_data
)
# Validar resultado
if specialist.validate(result):
results[spec_id] = specialist.post_process(result)
logger.info(f"✅ {specialist.specialist_name} completado")
else:
error_msg = f"Validação falhou para {specialist.specialist_name}"
errors[spec_id] = error_msg
logger.error(error_msg)
except Exception as e:
error_msg = f"Erro ao executar {self.specialists[spec_id].specialist_name}: {str(e)}"
errors[spec_id] = error_msg
logger.error(error_msg, exc_info=True)
elapsed_time = (datetime.now() - self.start_time).total_seconds()
return {
"status": "completed" if not errors else "completed_with_errors",
"total_specialists": len(specialist_ids),
"successful": len(results),
"failed": len(errors),
"execution_time": elapsed_time,
"results": results,
"errors": errors,
"timestamp": datetime.now().isoformat()
}
async def process_acordao_parallel(
self,
acordao_data: Dict[str, Any],
specialist_ids: Optional[List[int]] = None
) -> Dict[str, Any]:
"""
Processa acórdão em paralelo (múltiplos especialistas simultaneamente)
Args:
acordao_data: Dados estruturados do acórdão
specialist_ids: IDs dos especialistas a executar (default: todos)
Returns:
Dicionário com resultados consolidados
"""
self.start_time = datetime.now()
if specialist_ids is None:
specialist_ids = list(range(1, 10))
logger.info(f"⚡ Iniciando processamento paralelo com especialistas: {specialist_ids}")
tasks = []
valid_specialists = []
for spec_id in specialist_ids:
if spec_id not in self.specialists:
logger.error(f"Especialista {spec_id} não existe")
continue
specialist = self.specialists[spec_id]
valid_specialists.append(spec_id)
# Criar task para cada especialista
loop = asyncio.get_event_loop()
task = loop.run_in_executor(
None,
self._execute_specialist,
spec_id,
acordao_data
)
tasks.append(task)
# Executar todas as tasks em paralelo
results_list = await asyncio.gather(*tasks, return_exceptions=True)
results = {}
errors = {}
for spec_id, result in zip(valid_specialists, results_list):
if isinstance(result, Exception):
errors[spec_id] = str(result)
elif result is not None:
results[spec_id] = result
else:
errors[spec_id] = "Resultado nulo"
elapsed_time = (datetime.now() - self.start_time).total_seconds()
return {
"status": "completed" if not errors else "completed_with_errors",
"total_specialists": len(valid_specialists),
"successful": len(results),
"failed": len(errors),
"execution_time": elapsed_time,
"results": results,
"errors": errors,
"timestamp": datetime.now().isoformat()
}
def _execute_specialist(
self,
spec_id: int,
acordao_data: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""Executa especialista individual (para paralelização)"""
try:
specialist = self.specialists[spec_id]
logger.info(f"⏳ [{spec_id}] Executando {specialist.specialist_name}...")
result = specialist.process(acordao_data)
if specialist.validate(result):
processed = specialist.post_process(result)
logger.info(f"✅ [{spec_id}] {specialist.specialist_name} completado")
return processed
else:
logger.error(f"❌ [{spec_id}] Validação falhou para {specialist.specialist_name}")
return None
except Exception as e:
logger.error(f"❌ [{spec_id}] Erro: {str(e)}", exc_info=True)
return None
def process_batch(
self,
acordaos: List[Dict[str, Any]],
parallel: bool = False
) -> Dict[str, Any]:
"""
Processa múltiplos acórdãos em batch
Args:
acordaos: Lista de acórdãos a processar
parallel: Se True, processa em paralelo
Returns:
Resultados consolidados
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
results = []
for idx, acordao in enumerate(acordaos):
logger.info(f"📋 Processando acórdão {idx+1}/{len(acordaos)}")
if parallel:
result = loop.run_until_complete(
self.process_acordao_parallel(acordao)
)
else:
result = loop.run_until_complete(
self.process_acordao_sequential(acordao)
)
results.append(result)
return {
"batch_size": len(acordaos),
"processed": len(results),
"results": results,
"timestamp": datetime.now().isoformat()
}
finally:
loop.close()
def get_specialists_info(self) -> Dict[int, Dict[str, Any]]:
"""Retorna informações sobre todos os especialistas"""
return {
spec_id: specialist.info
for spec_id, specialist in self.specialists.items()
}
def get_specialist(self, spec_id: int):
"""Retorna um especialista específico"""
return self.specialists.get(spec_id)
def export_results(self, results: Dict[str, Any], format: str = "json") -> str:
"""
Exporta resultados em diferentes formatos
Args:
results: Resultados a exportar
format: Formato desejado (json, jsonl)
Returns:
String com dados formatados
"""
if format == "json":
return json.dumps(results, ensure_ascii=False, indent=2)
elif format == "jsonl":
return json.dumps(results, ensure_ascii=False) + "\n"
else:
raise ValueError(f"Formato {format} não suportado")