PARA.AI / processors /processor_manager.py
Carlex22's picture
ParaAIV3.6
2b9d72c
##PARA.AI/processors/processor_manager.py
"""
Processor Manager - REFATORADO para V13.6
Wrapper para manter compatibilidade com código existente
"""
import logging
import yaml
from typing import Dict, Any, List, Optional
from core.orchestrator import PipelineOrchestrator
from core.validator import SchemaValidator
# Importar especialistas V13.1 (migração gradual)
from processors.processor_metadados import ProcessorMetadados
from processors.processor_segmentacao import ProcessorSegmentacao
from processors.processor_relatorio import ProcessorRelatorio
from processors.processor_fundamentacao import ProcessorFundamentacao
from processors.processor_decisao import ProcessorDecisao
from processors.processor_arquivo import ProcessorArquivo
from processors.processor_contexto import ProcessorContexto
logger = logging.getLogger(__name__)
class ProcessorManager:
"""Wrapper para PipelineOrchestrator V13.6"""
def __init__(self, llm_manager, max_workers: int = 3):
self.llm_manager = llm_manager
self.max_workers = max_workers
# Carregar config
with open('config/pipeline_config.yaml', 'r', encoding='utf-8') as f:
self.pipeline_config = yaml.safe_load(f)
# Inicializar especialistas (mapeamento V13.1)
self.specialists = {
1: ProcessorSegmentacao(llm_manager),
2: ProcessorMetadados(llm_manager),
3: ProcessorContexto(llm_manager),
4: ProcessorRelatorio(llm_manager),
5: ProcessorFundamentacao(llm_manager),
6: ProcessorDecisao(llm_manager),
7: ProcessorArquivo(llm_manager),
}
# Inicializar orquestrador V13.6
self.orchestrator = PipelineOrchestrator(
config=self.pipeline_config,
llm_manager=llm_manager,
specialists=self.specialists
)
logger.info(f"✅ ProcessorManager V13.6: {len(self.specialists)} especialistas")
async def process_acordao_sequential(
self,
acordao_data: Dict[str, Any],
specialist_ids: Optional[List[int]] = None
) -> Dict[str, Any]:
"""Compatibilidade V13.1"""
if specialist_ids:
fase_inicial = min(specialist_ids)
fase_final = max(specialist_ids)
else:
fase_inicial = 1
fase_final = 6
return await self.orchestrator.process_acordao(
acordao_bruto=acordao_data,
fase_inicial=fase_inicial,
fase_final=fase_final
)
async def process_acordao_parallel(
self,
acordao_data: Dict[str, Any],
specialist_ids: Optional[List[int]] = None
) -> Dict[str, Any]:
"""Compatibilidade V13.1"""
return await self.orchestrator.process_acordao(
acordao_bruto=acordao_data,
fase_inicial=1,
fase_final=6
)
def get_processor(self, specialist_id: int):
return self.specialists.get(specialist_id)
def get_all_processors(self) -> Dict[int, Any]:
return self.specialists
def get_processors_info(self) -> Dict[str, Any]:
return self.orchestrator.get_pipeline_status()