""" ProcessingService - Usa ProcessorManager REAL com LLM Integração completa com os 9 especialistas refatorados """ import os import logging import json import tarfile import hashlib from typing import Dict, Any, List, Optional from datetime import datetime from pathlib import Path from llm.llm_manager import LLMManager from processors.processor_manager import ProcessorManager logger = logging.getLogger(__name__) class ProcessingService: """ Serviço que coordena processamento via ProcessorManager REAL MUDANÇAS: - Todos os especialistas usam LLM real - Sem simulações - Configuração via YAML - Suporte a batch e paralelo """ def __init__( self, llm_provider: str = "groq", api_key: Optional[str] = None, max_workers: int = 3 ): """ Args: llm_provider: Provider LLM (groq, openai, anthropic) api_key: API key (opcional, usa env var se não fornecido) max_workers: Workers paralelos """ self.llm_provider = llm_provider self.api_key = api_key self.max_workers = max_workers # Configurar API key no ambiente se fornecida if self.api_key: env_key = f"{llm_provider.upper()}_API_KEY" os.environ[env_key] = self.api_key logger.info(f"✅ API key configurada para {env_key}") # Criar LLMManager self.llm_manager = LLMManager() # Criar ProcessorManager com LLM Manager self.processor_manager = ProcessorManager( llm_manager=self.llm_manager, max_workers=max_workers ) logger.info( f"✅ ProcessingService inicializado " f"(provider={llm_provider}, 9 especialistas prontos)" ) async def process_acordao( self, acordao_data: Dict[str, Any], specialist_ids: Optional[List[int]] = None, enable_parallel: bool = False ) -> Dict[str, Any]: """ Processa 1 acórdão usando ProcessorManager Args: acordao_data: Dados do acórdão specialist_ids: IDs dos especialistas (default: todos) enable_parallel: Executar em paralelo Returns: Resultado consolidado dos 9 especialistas """ try: logger.info( f"🚀 Processando acórdão {acordao_data.get('acordao_id', 'unknown')} " f"com ProcessorManager (parallel={enable_parallel})" ) # Usar ProcessorManager REAL para processar if enable_parallel: result = await self.processor_manager.process_acordao_parallel( acordao_data=acordao_data, specialist_ids=specialist_ids ) else: result = await self.processor_manager.process_acordao_sequential( acordao_data=acordao_data, specialist_ids=specialist_ids ) logger.info( f"✅ Acórdão processado em {result.get('execution_time', 0):.2f}s" ) return result except Exception as e: logger.error(f"❌ Erro ao processar acórdão: {e}", exc_info=True) return { "acordao_id": acordao_data.get("acordao_id", "unknown"), "status": "error", "error": str(e), "timestamp": datetime.now().isoformat() } async def process_batch( self, acordaos: List[Dict[str, Any]], specialist_ids: Optional[List[int]] = None, enable_parallel: bool = False ) -> Dict[str, Any]: """ Processa lote de acórdãos Args: acordaos: Lista de acórdãos specialist_ids: IDs dos especialistas enable_parallel: Processar cada acórdão em paralelo Returns: Resultados consolidados """ results = [] start_time = datetime.now() logger.info(f"📚 Processando batch: {len(acordaos)} acórdãos") for idx, acordao in enumerate(acordaos, 1): logger.info(f"📄 Processando acórdão {idx}/{len(acordaos)}...") result = await self.process_acordao( acordao_data=acordao, specialist_ids=specialist_ids, enable_parallel=enable_parallel ) results.append(result) elapsed = (datetime.now() - start_time).total_seconds() successful = len([r for r in results if r.get("status") != "error"]) failed = len(results) - successful logger.info( f"✅ Batch concluído: {successful} sucessos, {failed} falhas " f"em {elapsed:.2f}s" ) return { "batch_size": len(acordaos), "processed": len(results), "successful": successful, "failed": failed, "total_execution_time": elapsed, "avg_time_per_acordao": elapsed / len(acordaos) if acordaos else 0, "results": results, "timestamp": datetime.now().isoformat() } async def process_jsonl_file( self, file_path: str, task_id: str, llm_provider: str = "groq", model_type: str = "balanced", enable_parallel: bool = True, max_workers: int = 3 ) -> Dict[str, Any]: """ Processa arquivo JSONL completo e gera TAR.GZ com resultados Args: file_path: Caminho do arquivo JSONL task_id: ID da task llm_provider: Provider LLM model_type: Tipo do modelo enable_parallel: Processar em paralelo max_workers: Workers paralelos Returns: Metadados do processamento com caminho do arquivo """ from api.config import get_settings settings = get_settings() start_time = datetime.now() try: # Ler acórdãos do JSONL logger.info(f"📖 Lendo arquivo JSONL: {file_path}") acordaos = [] with open(file_path, 'r', encoding='utf-8') as f: for line_num, line in enumerate(f, 1): line = line.strip() if line: try: acordao = json.loads(line) acordaos.append(acordao) except json.JSONDecodeError as e: logger.warning(f"⚠️ Linha {line_num} inválida: {e}") logger.info(f"📚 {len(acordaos)} acórdãos carregados") if not acordaos: raise ValueError("Nenhum acórdão válido encontrado no arquivo") # Processar batch batch_result = await self.process_batch( acordaos=acordaos, specialist_ids=None, # Todos os especialistas enable_parallel=enable_parallel ) # Criar estrutura de output output_dir = Path(settings.OUTPUT_PATH) / task_id output_dir.mkdir(parents=True, exist_ok=True) # Salvar resultados individuais results_dir = output_dir / "results" results_dir.mkdir(exist_ok=True) for idx, result in enumerate(batch_result['results']): acordao_id = result.get('acordao_id', f'acordao_{idx:04d}') result_file = results_dir / f"{acordao_id}.json" with open(result_file, 'w', encoding='utf-8') as f: json.dump(result, f, ensure_ascii=False, indent=2) # Salvar sumário summary = { 'task_id': task_id, 'timestamp': datetime.now().isoformat(), 'batch_size': batch_result['batch_size'], 'processed': batch_result['processed'], 'successful': batch_result['successful'], 'failed': batch_result['failed'], 'total_execution_time': batch_result['total_execution_time'], 'avg_time_per_acordao': batch_result['avg_time_per_acordao'], 'llm_provider': llm_provider, 'model_type': model_type, 'enable_parallel': enable_parallel, 'max_workers': max_workers } summary_file = output_dir / "summary.json" with open(summary_file, 'w', encoding='utf-8') as f: json.dump(summary, f, ensure_ascii=False, indent=2) # Criar TAR.GZ archive_dir = Path(settings.OUTPUT_PATH) / "archives" archive_dir.mkdir(parents=True, exist_ok=True) archive_path = archive_dir / f"{task_id}.tar.gz" logger.info(f"📦 Criando arquivo: {archive_path}") with tarfile.open(archive_path, "w:gz") as tar: tar.add(output_dir, arcname=task_id) # Calcular hash sha256_hash = hashlib.sha256() with open(archive_path, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) file_hash = sha256_hash.hexdigest() elapsed = (datetime.now() - start_time).total_seconds() logger.info( f"✅ Processamento concluído: {batch_result['successful']} sucessos, " f"{batch_result['failed']} falhas em {elapsed:.2f}s" ) return { 'task_id': task_id, 'archive_path': str(archive_path), 'hash': file_hash, 'processed': batch_result['processed'], 'successful': batch_result['successful'], 'failed': batch_result['failed'], 'elapsed_seconds': elapsed, 'timestamp': datetime.now().isoformat() } except Exception as e: logger.error(f"❌ Erro ao processar arquivo JSONL: {e}", exc_info=True) raise