Spaces:
Runtime error
Runtime error
| ##PARA.AI/core/orchestrator.py | |
| """ | |
| Pipeline Orchestrator - Orquestrador principal V13.6 | |
| Substitui processor_manager.py com arquitetura em fases e dependências explícitas | |
| """ | |
| import logging | |
| import asyncio | |
| from typing import Dict, Any, List, Optional | |
| from datetime import datetime | |
| import json | |
| from core.context_builder import ContextBuilder | |
| from core.validator import SchemaValidator | |
| from api.utils.logger import setup_logger | |
| logger = setup_logger(__name__) | |
| class PipelineOrchestrator: | |
| """ | |
| Orquestrador de pipeline em fases com dependências explícitas | |
| MUDANÇAS DO V13.1: | |
| - Fases sequenciais com dependências explícitas (DAG) | |
| - Context injection: especialista N recebe output de N-1 | |
| - Validação jsonschema após cada fase | |
| - Logs detalhados por fase/especialista | |
| - Suporte a execução paralela controlada (FASE 4) | |
| """ | |
| def __init__(self, config: Dict[str, Any], llm_manager, specialists: Dict[int, Any]): | |
| """ | |
| Args: | |
| config: Configuração da pipeline (de pipeline_config.yaml) | |
| llm_manager: Instância do LLMManager | |
| specialists: Dicionário {id: instância do especialista} | |
| """ | |
| self.config = config | |
| self.llm_manager = llm_manager | |
| self.specialists = specialists | |
| self.context_builder = ContextBuilder() | |
| self.validator = SchemaValidator('schemas/protocolo_v13_6_schema.json') | |
| # Organizar fases | |
| self.phases = self._organize_phases() | |
| logger.info( | |
| f"✅ PipelineOrchestrator inicializado: " | |
| f"{len(self.phases)} fases, {len(self.specialists)} especialistas" | |
| ) | |
| def _organize_phases(self) -> List[Dict[str, Any]]: | |
| """Organiza fases em ordem de execução respeitando dependências""" | |
| phases = self.config['pipeline']['phases'] | |
| return sorted(phases, key=lambda p: p['id']) | |
| async def process_acordao( | |
| self, | |
| acordao_bruto: Dict[str, Any], | |
| fase_inicial: int = 1, | |
| fase_final: Optional[int] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Processa acórdão através de todas as fases da pipeline | |
| Args: | |
| acordao_bruto: Dados brutos do acórdão (ementa, inteiro_teor, etc) | |
| fase_inicial: Fase inicial (default: 1) | |
| fase_final: Fase final (default: None = todas) | |
| Returns: | |
| JSON completo conforme Protocolo V13.6 | |
| """ | |
| start_time = datetime.now() | |
| # Inicializar resultado acumulado | |
| resultado = { | |
| "protocolo_versao": "v13.6", | |
| "id_manifestacao": acordao_bruto.get('id', 0), | |
| "metadados_processamento": { | |
| "protocolo_origem": "v13.6", | |
| "data_processamento": start_time.isoformat(), | |
| "versao_preprocessador": "v13.6.0", | |
| "campos_enriquecidos": [], | |
| "tempo_processamento_segundos": None | |
| }, | |
| "campos_futuros": { | |
| "relatorio_transcript_exato": None, | |
| "fundamentacao_transcript_exato": None, | |
| "dispositivo_transcript_exato": None, | |
| "embeddings_metadata": None, | |
| "tags_embedding_baldes": None | |
| } | |
| } | |
| logger.info(f"🚀 Iniciando pipeline para acórdão ID {resultado['id_manifestacao']}") | |
| # Executar fases | |
| fase_final = fase_final or len(self.phases) | |
| fases_para_executar = [p for p in self.phases if fase_inicial <= p['id'] <= fase_final] | |
| for phase in fases_para_executar: | |
| phase_id = phase['id'] | |
| phase_name = phase['name'] | |
| is_parallel = phase.get('parallel', False) | |
| logger.info(f"📍 FASE {phase_id}: {phase_name} (parallel={is_parallel})") | |
| try: | |
| if is_parallel and len(phase.get('specialists', [])) > 1: | |
| resultado = await self._run_phase_parallel( | |
| phase=phase, | |
| current_result=resultado, | |
| input_data=acordao_bruto | |
| ) | |
| else: | |
| resultado = await self._run_phase_sequential( | |
| phase=phase, | |
| current_result=resultado, | |
| input_data=acordao_bruto | |
| ) | |
| logger.info(f"✅ FASE {phase_id} concluída") | |
| except Exception as e: | |
| logger.error(f"❌ Erro na FASE {phase_id} ({phase_name}): {e}") | |
| resultado['metadados_processamento']['alertas_qualidade'] = \ | |
| resultado['metadados_processamento'].get('alertas_qualidade', []) + \ | |
| [f"Erro na fase {phase_id}: {str(e)}"] | |
| # Validação final (FASE 6) | |
| if fase_final >= 6: | |
| is_valid, errors = self.validator.validate(resultado) | |
| if not is_valid: | |
| logger.warning(f"⚠️ Validação final: {len(errors)} erros encontrados") | |
| resultado['metadados_processamento']['alertas_validacao'] = errors[:5] | |
| # Calcular tempo total | |
| end_time = datetime.now() | |
| resultado['metadados_processamento']['tempo_processamento_segundos'] = \ | |
| (end_time - start_time).total_seconds() | |
| logger.info( | |
| f"✅ Pipeline completa: {resultado['metadados_processamento']['tempo_processamento_segundos']:.2f}s" | |
| ) | |
| return resultado | |
| async def _run_phase_sequential( | |
| self, | |
| phase: Dict[str, Any], | |
| current_result: Dict[str, Any], | |
| input_data: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """Executa fase sequencialmente""" | |
| specialist_ids = phase.get('specialists', []) | |
| for spec_id in specialist_ids: | |
| specialist = self.specialists.get(spec_id) | |
| if not specialist: | |
| logger.warning(f"⚠️ Especialista {spec_id} não encontrado") | |
| continue | |
| # Context injection | |
| context = self.context_builder.build_context( | |
| current_result=current_result, | |
| specialist_id=spec_id | |
| ) | |
| logger.info(f" 🤖 Executando Especialista {spec_id}: {specialist.__class__.__name__}") | |
| try: | |
| partial_result = await specialist.process( | |
| input_data=input_data, | |
| context=context | |
| ) | |
| current_result = self._merge_results(current_result, partial_result) | |
| campos_novos = list(partial_result.keys()) | |
| current_result['metadados_processamento']['campos_enriquecidos'].extend(campos_novos) | |
| logger.info(f" ✅ Especialista {spec_id} completou: {len(campos_novos)} campos") | |
| except Exception as e: | |
| logger.error(f" ❌ Erro no Especialista {spec_id}: {e}") | |
| raise | |
| return current_result | |
| async def _run_phase_parallel( | |
| self, | |
| phase: Dict[str, Any], | |
| current_result: Dict[str, Any], | |
| input_data: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """Executa fase em paralelo""" | |
| specialist_ids = phase.get('specialists', []) | |
| tasks = [] | |
| for spec_id in specialist_ids: | |
| specialist = self.specialists.get(spec_id) | |
| if not specialist: | |
| continue | |
| context = self.context_builder.build_context( | |
| current_result=current_result, | |
| specialist_id=spec_id | |
| ) | |
| logger.info(f" 🤖 Agendando Especialista {spec_id} (paralelo)") | |
| task = specialist.process(input_data=input_data, context=context) | |
| tasks.append((spec_id, task)) | |
| results = await asyncio.gather(*[t[1] for t in tasks], return_exceptions=True) | |
| for (spec_id, _), result in zip(tasks, results): | |
| if isinstance(result, Exception): | |
| logger.error(f" ❌ Erro no Especialista {spec_id}: {result}") | |
| continue | |
| current_result = self._merge_results(current_result, result) | |
| campos_novos = list(result.keys()) | |
| current_result['metadados_processamento']['campos_enriquecidos'].extend(campos_novos) | |
| logger.info(f" ✅ Especialista {spec_id} completou: {len(campos_novos)} campos") | |
| return current_result | |
| def _merge_results(self, current: Dict[str, Any], partial: Dict[str, Any]) -> Dict[str, Any]: | |
| """Mescla resultado parcial no resultado acumulado""" | |
| for key, value in partial.items(): | |
| if key in ['metadados_processamento', 'campos_futuros']: | |
| continue | |
| if isinstance(value, dict) and key in current and isinstance(current[key], dict): | |
| current[key].update(value) | |
| else: | |
| current[key] = value | |
| return current | |
| def get_phase_info(self, phase_id: int) -> Optional[Dict[str, Any]]: | |
| """Retorna informações sobre uma fase específica""" | |
| for phase in self.phases: | |
| if phase['id'] == phase_id: | |
| return phase | |
| return None | |
| def get_pipeline_status(self) -> Dict[str, Any]: | |
| """Retorna status atual da pipeline""" | |
| return { | |
| 'total_phases': len(self.phases), | |
| 'total_specialists': len(self.specialists), | |
| 'phases': [ | |
| { | |
| 'id': p['id'], | |
| 'name': p['name'], | |
| 'parallel': p.get('parallel', False), | |
| 'specialists_count': len(p.get('specialists', [])) | |
| } | |
| for p in self.phases | |
| ] | |
| } | |