##PARA.AI/core/orchestrator.py """ Pipeline Orchestrator - Orquestrador principal V13.6 Substitui processor_manager.py com arquitetura em fases e dependências explícitas """ import logging import asyncio from typing import Dict, Any, List, Optional from datetime import datetime import json from core.context_builder import ContextBuilder from core.validator import SchemaValidator from api.utils.logger import setup_logger logger = setup_logger(__name__) class PipelineOrchestrator: """ Orquestrador de pipeline em fases com dependências explícitas MUDANÇAS DO V13.1: - Fases sequenciais com dependências explícitas (DAG) - Context injection: especialista N recebe output de N-1 - Validação jsonschema após cada fase - Logs detalhados por fase/especialista - Suporte a execução paralela controlada (FASE 4) """ def __init__(self, config: Dict[str, Any], llm_manager, specialists: Dict[int, Any]): """ Args: config: Configuração da pipeline (de pipeline_config.yaml) llm_manager: Instância do LLMManager specialists: Dicionário {id: instância do especialista} """ self.config = config self.llm_manager = llm_manager self.specialists = specialists self.context_builder = ContextBuilder() self.validator = SchemaValidator('schemas/protocolo_v13_6_schema.json') # Organizar fases self.phases = self._organize_phases() logger.info( f"✅ PipelineOrchestrator inicializado: " f"{len(self.phases)} fases, {len(self.specialists)} especialistas" ) def _organize_phases(self) -> List[Dict[str, Any]]: """Organiza fases em ordem de execução respeitando dependências""" phases = self.config['pipeline']['phases'] return sorted(phases, key=lambda p: p['id']) async def process_acordao( self, acordao_bruto: Dict[str, Any], fase_inicial: int = 1, fase_final: Optional[int] = None ) -> Dict[str, Any]: """ Processa acórdão através de todas as fases da pipeline Args: acordao_bruto: Dados brutos do acórdão (ementa, inteiro_teor, etc) fase_inicial: Fase inicial (default: 1) fase_final: Fase final (default: None = todas) Returns: JSON completo conforme Protocolo V13.6 """ start_time = datetime.now() # Inicializar resultado acumulado resultado = { "protocolo_versao": "v13.6", "id_manifestacao": acordao_bruto.get('id', 0), "metadados_processamento": { "protocolo_origem": "v13.6", "data_processamento": start_time.isoformat(), "versao_preprocessador": "v13.6.0", "campos_enriquecidos": [], "tempo_processamento_segundos": None }, "campos_futuros": { "relatorio_transcript_exato": None, "fundamentacao_transcript_exato": None, "dispositivo_transcript_exato": None, "embeddings_metadata": None, "tags_embedding_baldes": None } } logger.info(f"🚀 Iniciando pipeline para acórdão ID {resultado['id_manifestacao']}") # Executar fases fase_final = fase_final or len(self.phases) fases_para_executar = [p for p in self.phases if fase_inicial <= p['id'] <= fase_final] for phase in fases_para_executar: phase_id = phase['id'] phase_name = phase['name'] is_parallel = phase.get('parallel', False) logger.info(f"📍 FASE {phase_id}: {phase_name} (parallel={is_parallel})") try: if is_parallel and len(phase.get('specialists', [])) > 1: resultado = await self._run_phase_parallel( phase=phase, current_result=resultado, input_data=acordao_bruto ) else: resultado = await self._run_phase_sequential( phase=phase, current_result=resultado, input_data=acordao_bruto ) logger.info(f"✅ FASE {phase_id} concluída") except Exception as e: logger.error(f"❌ Erro na FASE {phase_id} ({phase_name}): {e}") resultado['metadados_processamento']['alertas_qualidade'] = \ resultado['metadados_processamento'].get('alertas_qualidade', []) + \ [f"Erro na fase {phase_id}: {str(e)}"] # Validação final (FASE 6) if fase_final >= 6: is_valid, errors = self.validator.validate(resultado) if not is_valid: logger.warning(f"⚠️ Validação final: {len(errors)} erros encontrados") resultado['metadados_processamento']['alertas_validacao'] = errors[:5] # Calcular tempo total end_time = datetime.now() resultado['metadados_processamento']['tempo_processamento_segundos'] = \ (end_time - start_time).total_seconds() logger.info( f"✅ Pipeline completa: {resultado['metadados_processamento']['tempo_processamento_segundos']:.2f}s" ) return resultado async def _run_phase_sequential( self, phase: Dict[str, Any], current_result: Dict[str, Any], input_data: Dict[str, Any] ) -> Dict[str, Any]: """Executa fase sequencialmente""" specialist_ids = phase.get('specialists', []) for spec_id in specialist_ids: specialist = self.specialists.get(spec_id) if not specialist: logger.warning(f"⚠️ Especialista {spec_id} não encontrado") continue # Context injection context = self.context_builder.build_context( current_result=current_result, specialist_id=spec_id ) logger.info(f" 🤖 Executando Especialista {spec_id}: {specialist.__class__.__name__}") try: partial_result = await specialist.process( input_data=input_data, context=context ) current_result = self._merge_results(current_result, partial_result) campos_novos = list(partial_result.keys()) current_result['metadados_processamento']['campos_enriquecidos'].extend(campos_novos) logger.info(f" ✅ Especialista {spec_id} completou: {len(campos_novos)} campos") except Exception as e: logger.error(f" ❌ Erro no Especialista {spec_id}: {e}") raise return current_result async def _run_phase_parallel( self, phase: Dict[str, Any], current_result: Dict[str, Any], input_data: Dict[str, Any] ) -> Dict[str, Any]: """Executa fase em paralelo""" specialist_ids = phase.get('specialists', []) tasks = [] for spec_id in specialist_ids: specialist = self.specialists.get(spec_id) if not specialist: continue context = self.context_builder.build_context( current_result=current_result, specialist_id=spec_id ) logger.info(f" 🤖 Agendando Especialista {spec_id} (paralelo)") task = specialist.process(input_data=input_data, context=context) tasks.append((spec_id, task)) results = await asyncio.gather(*[t[1] for t in tasks], return_exceptions=True) for (spec_id, _), result in zip(tasks, results): if isinstance(result, Exception): logger.error(f" ❌ Erro no Especialista {spec_id}: {result}") continue current_result = self._merge_results(current_result, result) campos_novos = list(result.keys()) current_result['metadados_processamento']['campos_enriquecidos'].extend(campos_novos) logger.info(f" ✅ Especialista {spec_id} completou: {len(campos_novos)} campos") return current_result def _merge_results(self, current: Dict[str, Any], partial: Dict[str, Any]) -> Dict[str, Any]: """Mescla resultado parcial no resultado acumulado""" for key, value in partial.items(): if key in ['metadados_processamento', 'campos_futuros']: continue if isinstance(value, dict) and key in current and isinstance(current[key], dict): current[key].update(value) else: current[key] = value return current def get_phase_info(self, phase_id: int) -> Optional[Dict[str, Any]]: """Retorna informações sobre uma fase específica""" for phase in self.phases: if phase['id'] == phase_id: return phase return None def get_pipeline_status(self) -> Dict[str, Any]: """Retorna status atual da pipeline""" return { 'total_phases': len(self.phases), 'total_specialists': len(self.specialists), 'phases': [ { 'id': p['id'], 'name': p['name'], 'parallel': p.get('parallel', False), 'specialists_count': len(p.get('specialists', [])) } for p in self.phases ] }