PARA.AI / api /services /processing_service.py
caarleexx's picture
Update api/services/processing_service.py
77f0bb6 verified
raw
history blame
6.77 kB
"""
ProcessingService - Usa ProcessorManager REAL
Integração correta com os 9 especialistas existentes
"""
import os
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime
from llm.llm_manager import LLMManager, LLMProvider
from processors.processor_manager import ProcessorManager
logger = logging.getLogger(__name__)
class ProcessingService:
"""
Serviço que coordena processamento via ProcessorManager REAL.
NÃO cria processors novos - usa os 9 especialistas existentes!
"""
def __init__(
self,
llm_provider: str = "groq",
api_key: Optional[str] = None,
max_workers: int = 3
):
"""
Args:
llm_provider: Provider LLM (groq, openai, anthropic)
api_key: API key (opcional, usa env var se não fornecido)
max_workers: Workers paralelos
"""
self.llm_provider = llm_provider
self.api_key = api_key
self.max_workers = max_workers
# Configurar API key no ambiente se fornecida
if self.api_key:
env_key = f"{llm_provider.upper()}_API_KEY"
os.environ[env_key] = self.api_key
logger.info(f"✅ API key configurada para {env_key}")
# Criar LLMManager (inicializa clients via env vars)
self.llm_manager = self._create_llm_manager()
# Obter client específico do provider
self.llm_client = self._get_provider_client()
# Criar ProcessorManager com LLM client
# ProcessorManager já inicializa os 9 especialistas!
self.processor_manager = ProcessorManager(
llm_model=self.llm_client,
max_workers=max_workers
)
logger.info(
f"✅ ProcessingService inicializado "
f"(provider={llm_provider}, 9 especialistas prontos)"
)
def _create_llm_manager(self) -> Optional[LLMManager]:
"""Cria LLMManager."""
try:
manager = LLMManager()
logger.info("✅ LLMManager inicializado")
return manager
except Exception as e:
logger.error(f"❌ Erro ao criar LLMManager: {e}", exc_info=True)
return None
def _get_provider_client(self):
"""Obtém client do provider selecionado."""
if not self.llm_manager:
logger.warning("⚠️ LLMManager não disponível")
return None
try:
# Mapear string para enum
provider_map = {
"groq": LLMProvider.GROQ,
"openai": LLMProvider.OPENAI,
"anthropic": LLMProvider.ANTHROPIC
}
provider_enum = provider_map.get(self.llm_provider.lower())
if provider_enum and provider_enum in self.llm_manager.clients:
client = self.llm_manager.clients[provider_enum]
logger.info(f"✅ Client obtido: {type(client).__name__}")
return client
else:
logger.warning(
f"⚠️ Client {self.llm_provider} não disponível. "
"Verifique API key no ambiente."
)
return None
except Exception as e:
logger.error(f"❌ Erro ao obter client: {e}", exc_info=True)
return None
async def process_acordao(
self,
acordao_data: Dict[str, Any],
specialist_ids: Optional[List[int]] = None,
enable_parallel: bool = False
) -> Dict[str, Any]:
"""
Processa 1 acórdão usando ProcessorManager.
Args:
acordao_data: Dados do acórdão
specialist_ids: IDs dos especialistas (default: todos)
enable_parallel: Executar em paralelo
Returns:
Resultado consolidado dos 9 especialistas
"""
try:
logger.info(
f"🚀 Processando acórdão {acordao_data.get('acordao_id', 'unknown')} "
f"com ProcessorManager"
)
# Usar ProcessorManager REAL para processar
if enable_parallel:
result = await self.processor_manager.process_acordao_parallel(
acordao_data=acordao_data,
specialist_ids=specialist_ids
)
else:
result = await self.processor_manager.process_acordao_sequential(
acordao_data=acordao_data,
specialist_ids=specialist_ids
)
logger.info(
f"✅ Acórdão processado em {result.get('execution_time', 0):.2f}s"
)
return result
except Exception as e:
logger.error(f"❌ Erro ao processar acórdão: {e}", exc_info=True)
return {
"acordao_id": acordao_data.get("acordao_id", "unknown"),
"status": "error",
"error": str(e),
"timestamp": datetime.now().isoformat()
}
async def process_batch(
self,
acordaos: List[Dict[str, Any]],
specialist_ids: Optional[List[int]] = None,
enable_parallel: bool = False
) -> Dict[str, Any]:
"""
Processa lote de acórdãos.
Args:
acordaos: Lista de acórdãos
specialist_ids: IDs dos especialistas
enable_parallel: Processar cada acórdão em paralelo
Returns:
Resultados consolidados
"""
results = []
start_time = datetime.now()
for idx, acordao in enumerate(acordaos, 1):
logger.info(f"📄 Processando acórdão {idx}/{len(acordaos)}...")
result = await self.process_acordao(
acordao_data=acordao,
specialist_ids=specialist_ids,
enable_parallel=enable_parallel
)
results.append(result)
elapsed = (datetime.now() - start_time).total_seconds()
successful = len([r for r in results if r.get("status") != "error"])
failed = len(results) - successful
return {
"batch_size": len(acordaos),
"processed": len(results),
"successful": successful,
"failed": failed,
"total_execution_time": elapsed,
"avg_time_per_acordao": elapsed / len(acordaos) if acordaos else 0,
"results": results,
"timestamp": datetime.now().isoformat()
}