caarleexx commited on
Commit
e2a60cf
verified
1 Parent(s): fc7b50e

Upload processing_service.py

Browse files
Files changed (1) hide show
  1. api/services/processing_service.py +60 -87
api/services/processing_service.py CHANGED
@@ -1,94 +1,67 @@
1
- """Serviço de processamento de acórdãos"""
2
- from typing import Dict, Any
3
- import json
4
- import tarfile
5
- import hashlib
6
- from pathlib import Path
7
  from datetime import datetime
 
 
 
 
 
8
 
9
  class ProcessingService:
10
- """Serviço para processar arquivos JSONL de acórdãos."""
11
-
12
- async def process_jsonl_file(
13
- self,
14
- file_path: str,
15
- task_id: str,
16
- llm_provider: str = "groq",
17
- model_type: str = "balanced",
18
- enable_parallel: bool = True,
19
- max_workers: int = 3
20
- ) -> Dict[str, Any]:
21
- """
22
- Processa arquivo JSONL com acórdãos.
23
-
24
- Returns:
25
- Dict com resultados do processamento
26
- """
27
- from api.config import get_settings
28
-
29
- settings = get_settings()
30
- processed = 0
31
- failed = 0
32
- results = []
33
-
34
- # Ler arquivo JSONL
35
- with open(file_path, 'r', encoding='utf-8') as f:
36
- for line in f:
37
- if not line.strip():
38
- continue
39
-
40
- try:
41
- record = json.loads(line)
42
- # Aqui seria o processamento real com os 9 especialistas
43
- # Por enquanto, retornar mock
44
- results.append({
45
- "acordao_id": record.get("acordao_id"),
46
- "status": "processed",
47
- "timestamp": datetime.now().isoformat()
48
- })
49
- processed += 1
50
- except Exception as e:
51
- failed += 1
52
- results.append({
53
- "acordao_id": record.get("acordao_id", "unknown"),
54
- "status": "error",
55
- "error": str(e)
56
- })
57
-
58
- # Criar arquivo TAR.GZ com resultados
59
- output_dir = Path(settings.OUTPUT_PATH) / "archives"
60
- output_dir.mkdir(parents=True, exist_ok=True)
61
-
62
- archive_path = output_dir / f"{task_id}.tar.gz"
63
-
64
- # Criar JSON com resultados
65
- result_json = {
66
- "task_id": task_id,
67
- "processed": processed,
68
- "failed": failed,
69
  "results": results,
70
- "completed_at": datetime.now().isoformat()
71
  }
72
-
73
- temp_json = output_dir / f"{task_id}_results.json"
74
- with open(temp_json, 'w', encoding='utf-8') as f:
75
- json.dump(result_json, f, ensure_ascii=False, indent=2)
76
-
77
- # Criar TAR.GZ
78
- with tarfile.open(archive_path, 'w:gz') as tar:
79
- tar.add(temp_json, arcname=f"{task_id}_results.json")
80
-
81
- # Remover JSON temporário
82
- temp_json.unlink()
83
-
84
- # Calcular hash
85
- with open(archive_path, 'rb') as f:
86
- file_hash = hashlib.sha256(f.read()).hexdigest()
87
-
88
  return {
89
- "processed": processed,
90
- "failed": failed,
91
- "archive_path": str(archive_path),
92
- "hash": file_hash,
93
- "elapsed_seconds": 1.5 # mock
94
  }
 
1
+ """
2
+ ProcessingService REAL - Integra LLM
3
+ """
4
+ import logging
5
+ from typing import Dict, Any, List, Optional
 
6
  from datetime import datetime
7
+ from api.llm.factory import LLMFactory
8
+ from api.processors.processor_metadados_llm import ProcessorMetadadosLLM
9
+
10
+ logger = logging.getLogger(__name__)
11
+
12
 
13
  class ProcessingService:
14
+ """Serviço de processamento com LLM REAL."""
15
+
16
+ def __init__(self, llm_provider: str = "groq", api_key: Optional[str] = None):
17
+ """Initialize."""
18
+ self.llm_provider = llm_provider
19
+ self.api_key = api_key
20
+ self.llm_client = self._create_llm_client()
21
+ self.processors = {1: ProcessorMetadadosLLM(llm_client=self.llm_client)}
22
+ logger.info(f"ProcessingService init (provider={llm_provider})")
23
+
24
+ def _create_llm_client(self):
25
+ """Cria LLM client."""
26
+ try:
27
+ if not self.api_key:
28
+ logger.warning("API key não fornecida")
29
+ return None
30
+ factory = LLMFactory(provider=self.llm_provider, api_key=self.api_key)
31
+ return factory.create_client(model_tier="balanced")
32
+ except Exception as e:
33
+ logger.error(f"Erro ao criar LLM client: {e}")
34
+ return None
35
+
36
+ def process_acordao(self, acordao_data: Dict[str, Any]) -> Dict[str, Any]:
37
+ """Processa 1 acórdão."""
38
+ start_time = datetime.now()
39
+ results = {}
40
+
41
+ for proc_id, processor in self.processors.items():
42
+ try:
43
+ logger.info(f"Executando {processor.specialist_name}...")
44
+ result = processor.process(acordao_data)
45
+ results[proc_id] = processor.postprocess(result)
46
+ except Exception as e:
47
+ logger.error(f"Erro processor {proc_id}: {e}")
48
+
49
+ return {
50
+ "acordao_id": acordao_data.get("acordao_id", "unknown"),
51
+ "status": "completed",
52
+ "execution_time": (datetime.now() - start_time).total_seconds(),
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53
  "results": results,
54
+ "timestamp": datetime.now().isoformat()
55
  }
56
+
57
+ def process_batch(self, acordaos: List[Dict[str, Any]]) -> Dict[str, Any]:
58
+ """Processa batch."""
59
+ results = []
60
+ for idx, acordao in enumerate(acordaos, 1):
61
+ logger.info(f"Processando {idx}/{len(acordaos)}...")
62
+ results.append(self.process_acordao(acordao))
 
 
 
 
 
 
 
 
 
63
  return {
64
+ "batch_size": len(acordaos),
65
+ "processed": len(results),
66
+ "results": results
 
 
67
  }