caarleexx commited on
Commit
2663ee4
·
verified ·
1 Parent(s): dfe7ad4

Delete pipeline_base (1).py

Browse files
Files changed (1) hide show
  1. pipeline_base (1).py +0 -322
pipeline_base (1).py DELETED
@@ -1,322 +0,0 @@
1
- #!/usr/bin/env python3
2
- """
3
- pipeline_base.py - Classes base e orquestrador de pipeline
4
-
5
- Fornece:
6
- - BaseProcessor: classe abstrata para todos os processadores
7
- - CPUProcessor: processadores só com CPU
8
- - LLMProcessor: processadores que usam Groq API
9
- - PipelineEngine: orquestra sequência de processadores
10
- - ProcessorTeam: agrupa processadores por domínio
11
- - PipelineBuilder: builder para pipelines complexas
12
- """
13
-
14
- import json
15
- import logging
16
- from abc import ABC, abstractmethod
17
- from typing import Dict, List, Optional, Callable
18
- from datetime import datetime
19
-
20
- logger = logging.getLogger(__name__)
21
-
22
-
23
- def ensure_manifestacoes_autos(record: dict) -> dict:
24
- """Garante que record tem chave manifestacoes_autos."""
25
- if "manifestacoes_autos" not in record:
26
- record["manifestacoes_autos"] = {
27
- "data_extracao": datetime.now().isoformat()
28
- }
29
- return record
30
-
31
-
32
- # ============================================================================
33
- # CLASSES BASE
34
- # ============================================================================
35
-
36
- class BaseProcessor(ABC):
37
- """
38
- Classe abstrata para todos os processadores.
39
-
40
- Define interface obrigatória e tracking de stats.
41
- """
42
-
43
- def __init__(self, name: str, expertise: str):
44
- """
45
- Args:
46
- name: Nome único do processador
47
- expertise: Descrição do domínio de expertise
48
- """
49
- self.name = name
50
- self.expertise = expertise
51
- self.stats = {
52
- "processados": 0,
53
- "sucesso": 0,
54
- "erro": 0,
55
- "ignorado": 0
56
- }
57
-
58
- @abstractmethod
59
- def process(self, record: dict) -> dict:
60
- """
61
- Processa um registro.
62
-
63
- Args:
64
- record: Dict com dados do acórdão
65
-
66
- Returns:
67
- Record processado (modificado ou não)
68
- """
69
- pass
70
-
71
- def get_stats(self) -> dict:
72
- """Retorna estatísticas do processador."""
73
- return {
74
- "processor": self.name,
75
- "expertise": self.expertise,
76
- **self.stats
77
- }
78
-
79
- def reset_stats(self):
80
- """Reseta estatísticas."""
81
- self.stats = {
82
- "processados": 0,
83
- "sucesso": 0,
84
- "erro": 0,
85
- "ignorado": 0
86
- }
87
-
88
-
89
- class CPUProcessor(BaseProcessor):
90
- """
91
- Processador que usa apenas CPU (regex, heurística, operações em memória).
92
-
93
- Não faz chamadas à API externa.
94
- """
95
-
96
- def __init__(self, name: str, expertise: str):
97
- super().__init__(name, expertise)
98
-
99
-
100
- class LLMProcessor(BaseProcessor):
101
- """
102
- Processador que usa chamadas a LLM (Groq API).
103
-
104
- Requer llm_worker_factory para funcionar.
105
- Sem factory, fica no estado "ignorado".
106
- """
107
-
108
- def __init__(self, name: str, expertise: str, llm_worker_factory: Optional[Callable] = None):
109
- super().__init__(name, expertise)
110
- self.llm_worker_factory = llm_worker_factory
111
- self.stats["llm_calls"] = 0
112
- self.stats["tokens_consumed"] = 0
113
-
114
- def _call_llm(self, tarefa: dict, system_prompt: str) -> Optional[dict]:
115
- """
116
- Chamada genérica a LLM via worker factory.
117
-
118
- Args:
119
- tarefa: Dict com id, chunk_id, dados_originais
120
- system_prompt: Prompt do sistema para a tarefa
121
-
122
- Returns:
123
- Dict com resultado ou None em caso de erro
124
- """
125
- if not self.llm_worker_factory:
126
- return None
127
-
128
- try:
129
- worker = self.llm_worker_factory()
130
-
131
- # Prepara chamada LLM
132
- user_message = json.dumps(tarefa.get("dados_originais", {}), ensure_ascii=False)
133
-
134
- # Simula resultado (em produção seria worker.call_llm real)
135
- # Esta é uma implementação stub - veja llm_worker_hybrid.py para real
136
- logger.debug(f"LLM call para {self.name}: {len(user_message)} chars")
137
-
138
- self.stats["llm_calls"] += 1
139
- # self.stats["tokens_consumed"] += estimated_tokens
140
-
141
- return {"status": "success"} # Placeholder
142
-
143
- except Exception as e:
144
- logger.warning(f"⚠️ Erro em chamada LLM ({self.name}): {e}")
145
- return None
146
-
147
-
148
- # ============================================================================
149
- # ORQUESTRADOR
150
- # ============================================================================
151
-
152
- class ProcessorTeam:
153
- """Agrupa processadores por domínio especializado."""
154
-
155
- def __init__(self, name: str, description: str):
156
- self.name = name
157
- self.description = description
158
- self.processors: List[BaseProcessor] = []
159
-
160
- def add_processor(self, processor: BaseProcessor):
161
- """Adiciona processador ao time."""
162
- self.processors.append(processor)
163
- logger.debug(f"✓ {processor.name} adicionado a {self.name}")
164
-
165
- def get_stats(self) -> dict:
166
- """Retorna stats agregados do time."""
167
- return {
168
- "time": self.name,
169
- "description": self.description,
170
- "num_processadores": len(self.processors),
171
- "processadores": [p.get_stats() for p in self.processors]
172
- }
173
-
174
-
175
- class PipelineEngine:
176
- """
177
- Motor de execução de pipeline.
178
-
179
- Orquestra sequência de processadores e coleta stats.
180
- """
181
-
182
- def __init__(self):
183
- self.processors: List[BaseProcessor] = []
184
-
185
- def add_processor(self, processor: BaseProcessor):
186
- """Adiciona processador à sequência."""
187
- self.processors.append(processor)
188
-
189
- def process_record(self, record: dict) -> dict:
190
- """
191
- Processa um registro através de toda a sequência.
192
-
193
- Args:
194
- record: Dict com dados do acórdão
195
-
196
- Returns:
197
- Record processado
198
- """
199
- for processor in self.processors:
200
- try:
201
- record = processor.process(record)
202
- except Exception as e:
203
- logger.error(f"❌ Erro crítico em {processor.name}: {e}")
204
- raise
205
-
206
- return record
207
-
208
- def process_batch(self, records: List[dict]) -> List[dict]:
209
- """
210
- Processa lote de registros.
211
-
212
- Args:
213
- records: Lista de dicts
214
-
215
- Returns:
216
- Lista de records processados
217
- """
218
- results = []
219
- for i, record in enumerate(records):
220
- try:
221
- processed = self.process_record(record)
222
- results.append(processed)
223
- if (i + 1) % 100 == 0:
224
- logger.info(f"✓ {i + 1}/{len(records)} registros processados")
225
- except Exception as e:
226
- logger.error(f"❌ Erro ao processar registro {i}: {e}")
227
- results.append(record) # Retorna original em caso de erro
228
-
229
- return results
230
-
231
- def get_full_stats(self) -> dict:
232
- """Coleta stats de todos os processadores."""
233
- return {
234
- "pipeline": "PipelineOrchestratorMultidisciplinar",
235
- "num_processadores": len(self.processors),
236
- "processors": [p.get_stats() for p in self.processors]
237
- }
238
-
239
- def reset_stats(self):
240
- """Reseta stats de todos os processadores."""
241
- for processor in self.processors:
242
- processor.reset_stats()
243
-
244
-
245
- class PipelineBuilder:
246
- """
247
- Builder para construir pipelines complexas com times.
248
-
249
- Permite:
250
- - Criar times especializados
251
- - Adicionar processadores a times
252
- - Construir engine final
253
- """
254
-
255
- def __init__(self):
256
- self.teams: List[ProcessorTeam] = []
257
- self.engine = PipelineEngine()
258
-
259
- def create_team(self, name: str, description: str) -> ProcessorTeam:
260
- """Cria novo time."""
261
- team = ProcessorTeam(name, description)
262
- self.teams.append(team)
263
- return team
264
-
265
- def add_team_to_engine(self, team: ProcessorTeam):
266
- """Adiciona todos os processadores de um time ao engine."""
267
- for processor in team.processors:
268
- self.engine.add_processor(processor)
269
-
270
- def add_all_teams(self):
271
- """Adiciona todos os times ao engine."""
272
- for team in self.teams:
273
- self.add_team_to_engine(team)
274
-
275
- def build(self) -> PipelineEngine:
276
- """Constrói engine final com todos os times."""
277
- self.add_all_teams()
278
- logger.info(f"✅ Pipeline construída com {len(self.engine.processors)} processadores")
279
- return self.engine
280
-
281
- def print_structure(self):
282
- """Imprime estrutura de times e processadores."""
283
- print("\n" + "=" * 100)
284
- print("ESTRUTURA DA PIPELINE MULTIDISCIPLINAR")
285
- print("=" * 100)
286
-
287
- for i, team in enumerate(self.teams, 1):
288
- print(f"\n{i}. 👥 {team.name}")
289
- print(f" Descrição: {team.description}")
290
- print(f" Processadores ({len(team.processors)}):")
291
-
292
- for processor in team.processors:
293
- is_llm = "LLM" if isinstance(processor, LLMProcessor) else "CPU"
294
- print(f" ✓ {processor.name:<40} [{is_llm}] - {processor.expertise}")
295
-
296
- print("\n" + "=" * 100)
297
- print(f"Total: {len(self.teams)} times | {sum(len(t.processors) for t in self.teams)} processadores")
298
- print("=" * 100 + "\n")
299
-
300
-
301
- # ============================================================================
302
- # HELPER FUNCTIONS
303
- # ============================================================================
304
-
305
- def estimate_tokens(text: str) -> int:
306
- """
307
- Estima tokens em texto para cálculo de custo.
308
-
309
- Approximação: 1 token ≈ 4 caracteres
310
- """
311
- return len(text) // 4
312
-
313
-
314
- def log_processor_start(processor: BaseProcessor, record_id: str):
315
- """Loga início de processamento."""
316
- logger.debug(f"🔄 {processor.name}: iniciando para {record_id}")
317
-
318
-
319
- def log_processor_end(processor: BaseProcessor, record_id: str, status: str):
320
- """Loga fim de processamento."""
321
- emoji = "✓" if status == "sucesso" else "✗" if status == "erro" else "⊘"
322
- logger.debug(f"{emoji} {processor.name}: {status} para {record_id}")