File size: 10,157 Bytes
e2a60cf
8c81e75
 
e2a60cf
d19c70e
e2a60cf
c70dd89
 
 
e2a60cf
ae2014a
c70dd89
e78ccf9
8c81e75
77f0bb6
e2a60cf
 
 
124d299
ae2014a
e78ccf9
8c81e75
c70dd89
8c81e75
 
 
 
 
e78ccf9
c70dd89
e78ccf9
 
 
d19c70e
 
e78ccf9
 
 
 
d19c70e
 
e78ccf9
e2a60cf
 
d19c70e
c70dd89
d19c70e
 
 
 
 
c70dd89
8c81e75
 
c70dd89
8c81e75
d19c70e
8c81e75
d19c70e
 
c70dd89
e78ccf9
d19c70e
 
e78ccf9
c70dd89
d19c70e
e78ccf9
 
d19c70e
 
e78ccf9
 
8c81e75
c70dd89
e78ccf9
d19c70e
 
 
c70dd89
e78ccf9
d19c70e
e78ccf9
d19c70e
 
 
8c81e75
d19c70e
c70dd89
d19c70e
 
 
 
 
 
 
 
 
 
 
c70dd89
d19c70e
 
 
c70dd89
d19c70e
c70dd89
d19c70e
 
 
 
 
 
 
 
c70dd89
d19c70e
e78ccf9
 
d19c70e
 
e78ccf9
 
8c81e75
c70dd89
e78ccf9
 
d19c70e
 
c70dd89
e78ccf9
 
 
e2a60cf
d19c70e
c70dd89
8c81e75
 
e2a60cf
d19c70e
c70dd89
d19c70e
 
 
 
 
e78ccf9
c70dd89
d19c70e
c70dd89
d19c70e
 
c70dd89
8c81e75
 
 
 
 
ae2014a
e2a60cf
 
d19c70e
 
 
 
e78ccf9
 
c70dd89
 
 
 
 
 
 
 
 
 
 
 
8c81e75
c70dd89
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
"""
ProcessingService - Usa ProcessorManager REAL com LLM
Integração completa com os 9 especialistas refatorados
"""
import os
import logging
import json
import tarfile
import hashlib
from typing import Dict, Any, List, Optional
from datetime import datetime
from pathlib import Path

from llm.llm_manager import LLMManager
from processors.processor_manager import ProcessorManager

logger = logging.getLogger(__name__)


class ProcessingService:
    """
    Serviço que coordena processamento via ProcessorManager REAL

    MUDANÇAS:
    - Todos os especialistas usam LLM real
    - Sem simulações
    - Configuração via YAML
    - Suporte a batch e paralelo
    """

    def __init__(
        self,
        llm_provider: str = "groq",
        api_key: Optional[str] = None,
        max_workers: int = 3
    ):
        """
        Args:
            llm_provider: Provider LLM (groq, openai, anthropic)
            api_key: API key (opcional, usa env var se não fornecido)
            max_workers: Workers paralelos
        """
        self.llm_provider = llm_provider
        self.api_key = api_key
        self.max_workers = max_workers

        # Configurar API key no ambiente se fornecida
        if self.api_key:
            env_key = f"{llm_provider.upper()}_API_KEY"
            os.environ[env_key] = self.api_key
            logger.info(f"✅ API key configurada para {env_key}")

        # Criar LLMManager
        self.llm_manager = LLMManager()

        # Criar ProcessorManager com LLM Manager
        self.processor_manager = ProcessorManager(
            llm_manager=self.llm_manager,
            max_workers=max_workers
        )

        logger.info(
            f"✅ ProcessingService inicializado "
            f"(provider={llm_provider}, 9 especialistas prontos)"
        )

    async def process_acordao(
        self,
        acordao_data: Dict[str, Any],
        specialist_ids: Optional[List[int]] = None,
        enable_parallel: bool = False
    ) -> Dict[str, Any]:
        """
        Processa 1 acórdão usando ProcessorManager

        Args:
            acordao_data: Dados do acórdão
            specialist_ids: IDs dos especialistas (default: todos)
            enable_parallel: Executar em paralelo

        Returns:
            Resultado consolidado dos 9 especialistas
        """
        try:
            logger.info(
                f"🚀 Processando acórdão {acordao_data.get('acordao_id', 'unknown')} "
                f"com ProcessorManager (parallel={enable_parallel})"
            )

            # Usar ProcessorManager REAL para processar
            if enable_parallel:
                result = await self.processor_manager.process_acordao_parallel(
                    acordao_data=acordao_data,
                    specialist_ids=specialist_ids
                )
            else:
                result = await self.processor_manager.process_acordao_sequential(
                    acordao_data=acordao_data,
                    specialist_ids=specialist_ids
                )

            logger.info(
                f"✅ Acórdão processado em {result.get('execution_time', 0):.2f}s"
            )

            return result

        except Exception as e:
            logger.error(f"❌ Erro ao processar acórdão: {e}", exc_info=True)
            return {
                "acordao_id": acordao_data.get("acordao_id", "unknown"),
                "status": "error",
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    async def process_batch(
        self,
        acordaos: List[Dict[str, Any]],
        specialist_ids: Optional[List[int]] = None,
        enable_parallel: bool = False
    ) -> Dict[str, Any]:
        """
        Processa lote de acórdãos

        Args:
            acordaos: Lista de acórdãos
            specialist_ids: IDs dos especialistas
            enable_parallel: Processar cada acórdão em paralelo

        Returns:
            Resultados consolidados
        """
        results = []
        start_time = datetime.now()

        logger.info(f"📚 Processando batch: {len(acordaos)} acórdãos")

        for idx, acordao in enumerate(acordaos, 1):
            logger.info(f"📄 Processando acórdão {idx}/{len(acordaos)}...")

            result = await self.process_acordao(
                acordao_data=acordao,
                specialist_ids=specialist_ids,
                enable_parallel=enable_parallel
            )
            results.append(result)

        elapsed = (datetime.now() - start_time).total_seconds()

        successful = len([r for r in results if r.get("status") != "error"])
        failed = len(results) - successful

        logger.info(
            f"✅ Batch concluído: {successful} sucessos, {failed} falhas "
            f"em {elapsed:.2f}s"
        )

        return {
            "batch_size": len(acordaos),
            "processed": len(results),
            "successful": successful,
            "failed": failed,
            "total_execution_time": elapsed,
            "avg_time_per_acordao": elapsed / len(acordaos) if acordaos else 0,
            "results": results,
            "timestamp": datetime.now().isoformat()
        }

    async def process_jsonl_file(
        self,
        file_path: str,
        task_id: str,
        llm_provider: str = "groq",
        model_type: str = "balanced",
        enable_parallel: bool = True,
        max_workers: int = 3
    ) -> Dict[str, Any]:
        """
        Processa arquivo JSONL completo e gera TAR.GZ com resultados

        Args:
            file_path: Caminho do arquivo JSONL
            task_id: ID da task
            llm_provider: Provider LLM
            model_type: Tipo do modelo
            enable_parallel: Processar em paralelo
            max_workers: Workers paralelos

        Returns:
            Metadados do processamento com caminho do arquivo
        """
        from api.config import get_settings

        settings = get_settings()
        start_time = datetime.now()

        try:
            # Ler acórdãos do JSONL
            logger.info(f"📖 Lendo arquivo JSONL: {file_path}")
            acordaos = []

            with open(file_path, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    line = line.strip()
                    if line:
                        try:
                            acordao = json.loads(line)
                            acordaos.append(acordao)
                        except json.JSONDecodeError as e:
                            logger.warning(f"⚠️ Linha {line_num} inválida: {e}")

            logger.info(f"📚 {len(acordaos)} acórdãos carregados")

            if not acordaos:
                raise ValueError("Nenhum acórdão válido encontrado no arquivo")

            # Processar batch
            batch_result = await self.process_batch(
                acordaos=acordaos,
                specialist_ids=None,  # Todos os especialistas
                enable_parallel=enable_parallel
            )

            # Criar estrutura de output
            output_dir = Path(settings.OUTPUT_PATH) / task_id
            output_dir.mkdir(parents=True, exist_ok=True)

            # Salvar resultados individuais
            results_dir = output_dir / "results"
            results_dir.mkdir(exist_ok=True)

            for idx, result in enumerate(batch_result['results']):
                acordao_id = result.get('acordao_id', f'acordao_{idx:04d}')
                result_file = results_dir / f"{acordao_id}.json"

                with open(result_file, 'w', encoding='utf-8') as f:
                    json.dump(result, f, ensure_ascii=False, indent=2)

            # Salvar sumário
            summary = {
                'task_id': task_id,
                'timestamp': datetime.now().isoformat(),
                'batch_size': batch_result['batch_size'],
                'processed': batch_result['processed'],
                'successful': batch_result['successful'],
                'failed': batch_result['failed'],
                'total_execution_time': batch_result['total_execution_time'],
                'avg_time_per_acordao': batch_result['avg_time_per_acordao'],
                'llm_provider': llm_provider,
                'model_type': model_type,
                'enable_parallel': enable_parallel,
                'max_workers': max_workers
            }

            summary_file = output_dir / "summary.json"
            with open(summary_file, 'w', encoding='utf-8') as f:
                json.dump(summary, f, ensure_ascii=False, indent=2)

            # Criar TAR.GZ
            archive_dir = Path(settings.OUTPUT_PATH) / "archives"
            archive_dir.mkdir(parents=True, exist_ok=True)
            archive_path = archive_dir / f"{task_id}.tar.gz"

            logger.info(f"📦 Criando arquivo: {archive_path}")

            with tarfile.open(archive_path, "w:gz") as tar:
                tar.add(output_dir, arcname=task_id)

            # Calcular hash
            sha256_hash = hashlib.sha256()
            with open(archive_path, "rb") as f:
                for byte_block in iter(lambda: f.read(4096), b""):
                    sha256_hash.update(byte_block)

            file_hash = sha256_hash.hexdigest()
            elapsed = (datetime.now() - start_time).total_seconds()

            logger.info(
                f"✅ Processamento concluído: {batch_result['successful']} sucessos, "
                f"{batch_result['failed']} falhas em {elapsed:.2f}s"
            )

            return {
                'task_id': task_id,
                'archive_path': str(archive_path),
                'hash': file_hash,
                'processed': batch_result['processed'],
                'successful': batch_result['successful'],
                'failed': batch_result['failed'],
                'elapsed_seconds': elapsed,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Erro ao processar arquivo JSONL: {e}", exc_info=True)
            raise