PARA.AI / api /services /processing_service.py
Carlex22's picture
Revert "ParaAIV3.1"
1f24745
"""
ProcessingService - Usa ProcessorManager REAL com LLM
Integração completa com os 9 especialistas refatorados
"""
import os
import logging
import json
import tarfile
import hashlib
from typing import Dict, Any, List, Optional
from datetime import datetime
from pathlib import Path
from llm.llm_manager import LLMManager
from processors.processor_manager import ProcessorManager
logger = logging.getLogger(__name__)
class ProcessingService:
"""
Serviço que coordena processamento via ProcessorManager REAL
MUDANÇAS:
- Todos os especialistas usam LLM real
- Sem simulações
- Configuração via YAML
- Suporte a batch e paralelo
"""
def __init__(
self,
llm_provider: str = "groq",
api_key: Optional[str] = None,
max_workers: int = 3
):
"""
Args:
llm_provider: Provider LLM (groq, openai, anthropic)
api_key: API key (opcional, usa env var se não fornecido)
max_workers: Workers paralelos
"""
self.llm_provider = llm_provider
self.api_key = api_key
self.max_workers = max_workers
# Configurar API key no ambiente se fornecida
if self.api_key:
env_key = f"{llm_provider.upper()}_API_KEY"
os.environ[env_key] = self.api_key
logger.info(f"✅ API key configurada para {env_key}")
# Criar LLMManager
self.llm_manager = LLMManager()
# Criar ProcessorManager com LLM Manager
self.processor_manager = ProcessorManager(
llm_manager=self.llm_manager,
max_workers=max_workers
)
logger.info(
f"✅ ProcessingService inicializado "
f"(provider={llm_provider}, 9 especialistas prontos)"
)
async def process_acordao(
self,
acordao_data: Dict[str, Any],
specialist_ids: Optional[List[int]] = None,
enable_parallel: bool = False
) -> Dict[str, Any]:
"""
Processa 1 acórdão usando ProcessorManager
Args:
acordao_data: Dados do acórdão
specialist_ids: IDs dos especialistas (default: todos)
enable_parallel: Executar em paralelo
Returns:
Resultado consolidado dos 9 especialistas
"""
try:
logger.info(
f"🚀 Processando acórdão {acordao_data.get('acordao_id', 'unknown')} "
f"com ProcessorManager (parallel={enable_parallel})"
)
# Usar ProcessorManager REAL para processar
if enable_parallel:
result = await self.processor_manager.process_acordao_parallel(
acordao_data=acordao_data,
specialist_ids=specialist_ids
)
else:
result = await self.processor_manager.process_acordao_sequential(
acordao_data=acordao_data,
specialist_ids=specialist_ids
)
logger.info(
f"✅ Acórdão processado em {result.get('execution_time', 0):.2f}s"
)
return result
except Exception as e:
logger.error(f"❌ Erro ao processar acórdão: {e}", exc_info=True)
return {
"acordao_id": acordao_data.get("acordao_id", "unknown"),
"status": "error",
"error": str(e),
"timestamp": datetime.now().isoformat()
}
async def process_batch(
self,
acordaos: List[Dict[str, Any]],
specialist_ids: Optional[List[int]] = None,
enable_parallel: bool = False
) -> Dict[str, Any]:
"""
Processa lote de acórdãos
Args:
acordaos: Lista de acórdãos
specialist_ids: IDs dos especialistas
enable_parallel: Processar cada acórdão em paralelo
Returns:
Resultados consolidados
"""
results = []
start_time = datetime.now()
logger.info(f"📚 Processando batch: {len(acordaos)} acórdãos")
for idx, acordao in enumerate(acordaos, 1):
logger.info(f"📄 Processando acórdão {idx}/{len(acordaos)}...")
result = await self.process_acordao(
acordao_data=acordao,
specialist_ids=specialist_ids,
enable_parallel=enable_parallel
)
results.append(result)
elapsed = (datetime.now() - start_time).total_seconds()
successful = len([r for r in results if r.get("status") != "error"])
failed = len(results) - successful
logger.info(
f"✅ Batch concluído: {successful} sucessos, {failed} falhas "
f"em {elapsed:.2f}s"
)
return {
"batch_size": len(acordaos),
"processed": len(results),
"successful": successful,
"failed": failed,
"total_execution_time": elapsed,
"avg_time_per_acordao": elapsed / len(acordaos) if acordaos else 0,
"results": results,
"timestamp": datetime.now().isoformat()
}
async def process_jsonl_file(
self,
file_path: str,
task_id: str,
llm_provider: str = "groq",
model_type: str = "balanced",
enable_parallel: bool = True,
max_workers: int = 3
) -> Dict[str, Any]:
"""
Processa arquivo JSONL completo e gera TAR.GZ com resultados
Args:
file_path: Caminho do arquivo JSONL
task_id: ID da task
llm_provider: Provider LLM
model_type: Tipo do modelo
enable_parallel: Processar em paralelo
max_workers: Workers paralelos
Returns:
Metadados do processamento com caminho do arquivo
"""
from api.config import get_settings
settings = get_settings()
start_time = datetime.now()
try:
# Ler acórdãos do JSONL
logger.info(f"📖 Lendo arquivo JSONL: {file_path}")
acordaos = []
with open(file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if line:
try:
acordao = json.loads(line)
acordaos.append(acordao)
except json.JSONDecodeError as e:
logger.warning(f"⚠️ Linha {line_num} inválida: {e}")
logger.info(f"📚 {len(acordaos)} acórdãos carregados")
if not acordaos:
raise ValueError("Nenhum acórdão válido encontrado no arquivo")
# Processar batch
batch_result = await self.process_batch(
acordaos=acordaos,
specialist_ids=None, # Todos os especialistas
enable_parallel=enable_parallel
)
# Criar estrutura de output
output_dir = Path(settings.OUTPUT_PATH) / task_id
output_dir.mkdir(parents=True, exist_ok=True)
# Salvar resultados individuais
results_dir = output_dir / "results"
results_dir.mkdir(exist_ok=True)
for idx, result in enumerate(batch_result['results']):
acordao_id = result.get('acordao_id', f'acordao_{idx:04d}')
result_file = results_dir / f"{acordao_id}.json"
with open(result_file, 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
# Salvar sumário
summary = {
'task_id': task_id,
'timestamp': datetime.now().isoformat(),
'batch_size': batch_result['batch_size'],
'processed': batch_result['processed'],
'successful': batch_result['successful'],
'failed': batch_result['failed'],
'total_execution_time': batch_result['total_execution_time'],
'avg_time_per_acordao': batch_result['avg_time_per_acordao'],
'llm_provider': llm_provider,
'model_type': model_type,
'enable_parallel': enable_parallel,
'max_workers': max_workers
}
summary_file = output_dir / "summary.json"
with open(summary_file, 'w', encoding='utf-8') as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
# Criar TAR.GZ
archive_dir = Path(settings.OUTPUT_PATH) / "archives"
archive_dir.mkdir(parents=True, exist_ok=True)
archive_path = archive_dir / f"{task_id}.tar.gz"
logger.info(f"📦 Criando arquivo: {archive_path}")
with tarfile.open(archive_path, "w:gz") as tar:
tar.add(output_dir, arcname=task_id)
# Calcular hash
sha256_hash = hashlib.sha256()
with open(archive_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
file_hash = sha256_hash.hexdigest()
elapsed = (datetime.now() - start_time).total_seconds()
logger.info(
f"✅ Processamento concluído: {batch_result['successful']} sucessos, "
f"{batch_result['failed']} falhas em {elapsed:.2f}s"
)
return {
'task_id': task_id,
'archive_path': str(archive_path),
'hash': file_hash,
'processed': batch_result['processed'],
'successful': batch_result['successful'],
'failed': batch_result['failed'],
'elapsed_seconds': elapsed,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
logger.error(f"❌ Erro ao processar arquivo JSONL: {e}", exc_info=True)
raise