PARA.AI / core /orchestrator.py
Carlex22's picture
ParaAIV3.6
2b9d72c
##PARA.AI/core/orchestrator.py
"""
Pipeline Orchestrator - Orquestrador principal V13.6
Substitui processor_manager.py com arquitetura em fases e dependências explícitas
"""
import logging
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime
import json
from core.context_builder import ContextBuilder
from core.validator import SchemaValidator
from api.utils.logger import setup_logger
logger = setup_logger(__name__)
class PipelineOrchestrator:
"""
Orquestrador de pipeline em fases com dependências explícitas
MUDANÇAS DO V13.1:
- Fases sequenciais com dependências explícitas (DAG)
- Context injection: especialista N recebe output de N-1
- Validação jsonschema após cada fase
- Logs detalhados por fase/especialista
- Suporte a execução paralela controlada (FASE 4)
"""
def __init__(self, config: Dict[str, Any], llm_manager, specialists: Dict[int, Any]):
"""
Args:
config: Configuração da pipeline (de pipeline_config.yaml)
llm_manager: Instância do LLMManager
specialists: Dicionário {id: instância do especialista}
"""
self.config = config
self.llm_manager = llm_manager
self.specialists = specialists
self.context_builder = ContextBuilder()
self.validator = SchemaValidator('schemas/protocolo_v13_6_schema.json')
# Organizar fases
self.phases = self._organize_phases()
logger.info(
f"✅ PipelineOrchestrator inicializado: "
f"{len(self.phases)} fases, {len(self.specialists)} especialistas"
)
def _organize_phases(self) -> List[Dict[str, Any]]:
"""Organiza fases em ordem de execução respeitando dependências"""
phases = self.config['pipeline']['phases']
return sorted(phases, key=lambda p: p['id'])
async def process_acordao(
self,
acordao_bruto: Dict[str, Any],
fase_inicial: int = 1,
fase_final: Optional[int] = None
) -> Dict[str, Any]:
"""
Processa acórdão através de todas as fases da pipeline
Args:
acordao_bruto: Dados brutos do acórdão (ementa, inteiro_teor, etc)
fase_inicial: Fase inicial (default: 1)
fase_final: Fase final (default: None = todas)
Returns:
JSON completo conforme Protocolo V13.6
"""
start_time = datetime.now()
# Inicializar resultado acumulado
resultado = {
"protocolo_versao": "v13.6",
"id_manifestacao": acordao_bruto.get('id', 0),
"metadados_processamento": {
"protocolo_origem": "v13.6",
"data_processamento": start_time.isoformat(),
"versao_preprocessador": "v13.6.0",
"campos_enriquecidos": [],
"tempo_processamento_segundos": None
},
"campos_futuros": {
"relatorio_transcript_exato": None,
"fundamentacao_transcript_exato": None,
"dispositivo_transcript_exato": None,
"embeddings_metadata": None,
"tags_embedding_baldes": None
}
}
logger.info(f"🚀 Iniciando pipeline para acórdão ID {resultado['id_manifestacao']}")
# Executar fases
fase_final = fase_final or len(self.phases)
fases_para_executar = [p for p in self.phases if fase_inicial <= p['id'] <= fase_final]
for phase in fases_para_executar:
phase_id = phase['id']
phase_name = phase['name']
is_parallel = phase.get('parallel', False)
logger.info(f"📍 FASE {phase_id}: {phase_name} (parallel={is_parallel})")
try:
if is_parallel and len(phase.get('specialists', [])) > 1:
resultado = await self._run_phase_parallel(
phase=phase,
current_result=resultado,
input_data=acordao_bruto
)
else:
resultado = await self._run_phase_sequential(
phase=phase,
current_result=resultado,
input_data=acordao_bruto
)
logger.info(f"✅ FASE {phase_id} concluída")
except Exception as e:
logger.error(f"❌ Erro na FASE {phase_id} ({phase_name}): {e}")
resultado['metadados_processamento']['alertas_qualidade'] = \
resultado['metadados_processamento'].get('alertas_qualidade', []) + \
[f"Erro na fase {phase_id}: {str(e)}"]
# Validação final (FASE 6)
if fase_final >= 6:
is_valid, errors = self.validator.validate(resultado)
if not is_valid:
logger.warning(f"⚠️ Validação final: {len(errors)} erros encontrados")
resultado['metadados_processamento']['alertas_validacao'] = errors[:5]
# Calcular tempo total
end_time = datetime.now()
resultado['metadados_processamento']['tempo_processamento_segundos'] = \
(end_time - start_time).total_seconds()
logger.info(
f"✅ Pipeline completa: {resultado['metadados_processamento']['tempo_processamento_segundos']:.2f}s"
)
return resultado
async def _run_phase_sequential(
self,
phase: Dict[str, Any],
current_result: Dict[str, Any],
input_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Executa fase sequencialmente"""
specialist_ids = phase.get('specialists', [])
for spec_id in specialist_ids:
specialist = self.specialists.get(spec_id)
if not specialist:
logger.warning(f"⚠️ Especialista {spec_id} não encontrado")
continue
# Context injection
context = self.context_builder.build_context(
current_result=current_result,
specialist_id=spec_id
)
logger.info(f" 🤖 Executando Especialista {spec_id}: {specialist.__class__.__name__}")
try:
partial_result = await specialist.process(
input_data=input_data,
context=context
)
current_result = self._merge_results(current_result, partial_result)
campos_novos = list(partial_result.keys())
current_result['metadados_processamento']['campos_enriquecidos'].extend(campos_novos)
logger.info(f" ✅ Especialista {spec_id} completou: {len(campos_novos)} campos")
except Exception as e:
logger.error(f" ❌ Erro no Especialista {spec_id}: {e}")
raise
return current_result
async def _run_phase_parallel(
self,
phase: Dict[str, Any],
current_result: Dict[str, Any],
input_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Executa fase em paralelo"""
specialist_ids = phase.get('specialists', [])
tasks = []
for spec_id in specialist_ids:
specialist = self.specialists.get(spec_id)
if not specialist:
continue
context = self.context_builder.build_context(
current_result=current_result,
specialist_id=spec_id
)
logger.info(f" 🤖 Agendando Especialista {spec_id} (paralelo)")
task = specialist.process(input_data=input_data, context=context)
tasks.append((spec_id, task))
results = await asyncio.gather(*[t[1] for t in tasks], return_exceptions=True)
for (spec_id, _), result in zip(tasks, results):
if isinstance(result, Exception):
logger.error(f" ❌ Erro no Especialista {spec_id}: {result}")
continue
current_result = self._merge_results(current_result, result)
campos_novos = list(result.keys())
current_result['metadados_processamento']['campos_enriquecidos'].extend(campos_novos)
logger.info(f" ✅ Especialista {spec_id} completou: {len(campos_novos)} campos")
return current_result
def _merge_results(self, current: Dict[str, Any], partial: Dict[str, Any]) -> Dict[str, Any]:
"""Mescla resultado parcial no resultado acumulado"""
for key, value in partial.items():
if key in ['metadados_processamento', 'campos_futuros']:
continue
if isinstance(value, dict) and key in current and isinstance(current[key], dict):
current[key].update(value)
else:
current[key] = value
return current
def get_phase_info(self, phase_id: int) -> Optional[Dict[str, Any]]:
"""Retorna informações sobre uma fase específica"""
for phase in self.phases:
if phase['id'] == phase_id:
return phase
return None
def get_pipeline_status(self) -> Dict[str, Any]:
"""Retorna status atual da pipeline"""
return {
'total_phases': len(self.phases),
'total_specialists': len(self.specialists),
'phases': [
{
'id': p['id'],
'name': p['name'],
'parallel': p.get('parallel', False),
'specialists_count': len(p.get('specialists', []))
}
for p in self.phases
]
}