caarleexx commited on
Commit
39d6fdc
·
verified ·
1 Parent(s): ffc8ac9

Create worker.py

Browse files
Files changed (1) hide show
  1. worker.py +339 -0
worker.py ADDED
@@ -0,0 +1,339 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ TJ-PR - Extrator ULTRA SIMPLIFICADO (v7.0 - State-Driven Batch Worker)
4
+
5
+ Worker com arquitetura produtor-consumidor. Esta versão foi refatorada para:
6
+ 1. Usar arquivos de estado (CSVs) no repositório Git para rastrear o progresso.
7
+ 2. Acumular registros válidos e salvá-los em lotes.
8
+ 3. Garantir a continuidade do trabalho entre reinicializações lendo o estado.
9
+ """
10
+
11
+ import re
12
+ import json
13
+ import requests
14
+ import csv
15
+ from datetime import datetime
16
+ from pathlib import Path
17
+ import logging
18
+ import random
19
+ import uuid
20
+ import os
21
+ import time
22
+ import gzip
23
+ from concurrent.futures import ThreadPoolExecutor
24
+ from threading import Thread, Lock, current_thread
25
+ from queue import Queue, Empty
26
+ from git import Repo, GitCommandError, Git
27
+
28
+ # -----------------------------
29
+ # Configuração
30
+ # -----------------------------
31
+ GIT_TOKEN = os.getenv("GIT_TOKEN")
32
+ GITHUB_BRANCH = os.getenv("GITHUB_BRANCH", "main")
33
+ LOCAL_REPO_PATH = Path("./repo_clone_v7")
34
+ github_repo_env = os.getenv("GITHUB_REPO")
35
+ if github_repo_env: GITHUB_REPO = github_repo_env.replace("https://github.com/", "").replace(".git", "")
36
+ else: GITHUB_REPO = None
37
+
38
+ MAX_PAGINA = int(os.getenv("MAX_PAGINA") or "121792")
39
+ MIN_PAGINA = int(os.getenv("MIN_PAGINA") or "30001")
40
+
41
+ NUM_WORKERS_EXTRACao = int(os.getenv("NUM_WORKERS_EXTRACAO") or "10")
42
+ TAMANHO_LOTE_PAGINAS = int(os.getenv("TAMANHO_LOTE_PAGINAS") or "100")
43
+ TAMANHO_LOTE_REGISTROS = 500 # Gatilho para criar um novo arquivo de lote
44
+ MAX_TENTATIVAS_EXTRACAO = int(os.getenv("MAX_TENTATIVAS_EXTRACAO") or "5")
45
+
46
+ logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s", datefmt="%M:%S")
47
+ logger = logging.getLogger(__name__)
48
+
49
+ # ===================================================================================
50
+ # CLASSE DE EXTRAÇÃO (INALTERADA)
51
+ # ===================================================================================
52
+ class ExtratorUltraSimples:
53
+ def __init__(self):
54
+ self.base_url = "https://portal.tjpr.jus.br"
55
+ def limpar_texto(self, texto: str) -> str:
56
+ texto = re.sub(r"<br\s*/?>", " ", texto, flags=re.IGNORECASE); texto = re.sub(r"<[^>]+>", "", texto); texto = texto.replace("&nbsp;", " ").replace("&quot;", '"').replace("&amp;", "&"); texto = texto.replace("&lt;", "<").replace("&gt;", ">").replace("&#39;", "'"); texto = texto.replace("\r", "").replace("\n", "").replace("\t", ""); texto = re.sub(r"\s+", " ", texto); return texto.strip()
57
+ def extrair_chave_valor_da_linha(self, tr_html: str) -> tuple:
58
+ match_chave = re.search(r"<b>([^<]+)</b>", tr_html, re.IGNORECASE);
59
+ if not match_chave: return None, None
60
+ chave_raw = match_chave.group(1).strip(); chave = chave_raw.replace(":", "").strip(); chave = re.sub(r"[áàâãä]", "a", chave, flags=re.IGNORECASE); chave = re.sub(r"[éèêë]", "e", chave, flags=re.IGNORECASE); chave = re.sub(r"[íìîï]", "i", chave, flags=re.IGNORECASE); chave = re.sub(r"[óòôõö]", "o", chave, flags=re.IGNORECASE); chave = re.sub(r"[úùûü]", "u", chave, flags=re.IGNORECASE); chave = re.sub(r"[ç]", "c", chave, flags=re.IGNORECASE); chave = re.sub(r"\s+", "_", chave); chave = re.sub(r"[^\w]", "", chave); chave = chave.lower()
61
+ match_valor = re.search(r"</b>\s*(.*?)</td>", tr_html, re.DOTALL | re.IGNORECASE)
62
+ if not match_valor: return chave, None
63
+ valor_raw = match_valor.group(1); valor = self.limpar_texto(valor_raw); return chave, valor
64
+ def extrair_url_documento(self, tr_html: str) -> str:
65
+ match = re.search(r"visualizacao\.do\?tjpr\.url\.crypto=([a-f0-9]+)", tr_html)
66
+ if match: return f"{self.base_url}/jurisprudencia/publico/visualizacao.do?tjpr.url.crypto={match.group(1)}"
67
+ return None
68
+ def extrair_tabelas(self, html: str) -> list:
69
+ return re.findall(r'<table[^>]*class=["\']?[^"\']*resultTable[^"\']*["\']?[^>]*>(.*?)</table>', html, re.DOTALL | re.IGNORECASE)
70
+ def extrair_linhas_tr(self, tabela_html: str) -> list:
71
+ return re.findall(r"<tr[^>]*>(.*?)</tr>", tabela_html, re.DOTALL | re.IGNORECASE)
72
+ def extrair_acordao(self, tabela_html: str) -> dict:
73
+ acordao = {}; linhas = self.extrair_linhas_tr(tabela_html)
74
+ for linha_html in linhas:
75
+ if url := self.extrair_url_documento(linha_html): acordao["url_documento"] = url
76
+ if (chave := self.extrair_chave_valor_da_linha(linha_html)[0]) and (valor := self.extrair_chave_valor_da_linha(linha_html)[1]): acordao[chave] = valor
77
+ return acordao
78
+ def extrair_todos_acordaos(self, html: str) -> list:
79
+ tabelas = self.extrair_tabelas(html); return [acordao for tabela in tabelas if (acordao := self.extrair_acordao(tabela))]
80
+
81
+ # ===================================================================================
82
+ # CLASSE PRINCIPAL DO WORKER (REARQUITETADA v7.0)
83
+ # ===================================================================================
84
+ class AbelhaAtomica:
85
+ def __init__(self):
86
+ if not GIT_TOKEN or not GITHUB_REPO: raise ValueError("GIT_TOKEN e GITHUB_REPO s��o obrigatórios.")
87
+ self.worker_id = f"abelha-{uuid.uuid4().hex[:6]}"
88
+ self.extrator_html = ExtratorUltraSimples()
89
+ self.session = requests.Session()
90
+ self.session.headers.update({"User-Agent": f"Mozilla/5.0 ({self.worker_id})"})
91
+
92
+ self.fila_de_tarefas = Queue()
93
+ self.fila_de_resultados = Queue() # Irá conter registros individuais e marcadores de status de página
94
+
95
+ # Locks
96
+ self.lock_git = Lock()
97
+ self.lock_estado = Lock() # Protege o acesso aos dicionários de estado e acumulador
98
+
99
+ self.thread_de_push = None
100
+
101
+ # Gerenciamento de Estado
102
+ self.paginas_status = {} # {123: 'sucesso', 456: 'falha'}
103
+ self.processos_vistos = set() # { 'processo_25_char_1', ... }
104
+ self.registros_acumulados = [] # Acumula registros válidos para o próximo lote
105
+ self.num_lote_saida = 0
106
+
107
+ self._setup_git_repo()
108
+ logger.info(f"Abelha {self.worker_id} (v7.0) iniciando. Estado carregado: {len(self.paginas_status)} páginas, {len(self.processos_vistos)} processos.")
109
+
110
+ def _setup_git_repo(self):
111
+ remote_url = f"https://oauth2:{GIT_TOKEN}@github.com/{GITHUB_REPO}.git"
112
+ if LOCAL_REPO_PATH.exists():
113
+ self.repo = Repo(LOCAL_REPO_PATH)
114
+ else:
115
+ self.repo = Repo.clone_from(remote_url, LOCAL_REPO_PATH, branch=GITHUB_BRANCH)
116
+
117
+ with self.repo.config_writer() as config:
118
+ config.set_value("pull", "rebase", "true")
119
+
120
+ self._carregar_estado_do_repo()
121
+
122
+ def _carregar_estado_do_repo(self):
123
+ with self.lock_git:
124
+ logger.info("Sincronizando com o remoto para carregar estado...")
125
+ self.repo.git.reset("--hard")
126
+ self.repo.remotes.origin.pull()
127
+
128
+ # Carregar status das páginas
129
+ path_paginas = LOCAL_REPO_PATH / "paginas_status.csv"
130
+ if path_paginas.exists():
131
+ with open(path_paginas, 'r', encoding='utf-8') as f:
132
+ reader = csv.reader(f)
133
+ next(reader, None) # Pula cabeçalho
134
+ self.paginas_status = {int(row[0]): row[1] for row in reader}
135
+
136
+ # Carregar processos já vistos
137
+ path_processos = LOCAL_REPO_PATH / "processos_vistos.csv"
138
+ if path_processos.exists():
139
+ with open(path_processos, 'r', encoding='utf-8') as f:
140
+ reader = csv.reader(f)
141
+ next(reader, None) # Pula cabeçalho
142
+ self.processos_vistos = {row[1] for row in reader}
143
+
144
+ # Determinar o próximo número de lote
145
+ path_saida = LOCAL_REPO_PATH / "new_doce"
146
+ path_saida.mkdir(exist_ok=True)
147
+ arquivos_existentes = list(path_saida.glob("*.jsonl.gz"))
148
+ if arquivos_existentes:
149
+ numeros = [int(re.search(r'(\d+)', f.name).group(1)) for f in arquivos_existentes if re.search(r'(\d+)', f.name)]
150
+ self.num_lote_saida = max(numeros) + 1 if numeros else 1
151
+
152
+ def _selecionar_lote_de_paginas(self):
153
+ paginas_para_trabalhar = []
154
+ for pagina in range(MIN_PAGINA, MAX_PAGINA + 1):
155
+ status = self.paginas_status.get(pagina)
156
+ if status not in ['sucesso', 'falha_permanente', 'vazio']:
157
+ paginas_para_trabalhar.append(pagina)
158
+ if len(paginas_para_trabalhar) >= TAMANHO_LOTE_PAGINAS:
159
+ break
160
+
161
+ if not paginas_para_trabalhar:
162
+ logger.info("Nenhuma página nova para processar. Trabalho concluído ou aguardando.")
163
+ return False
164
+
165
+ for pagina in paginas_para_trabalhar:
166
+ self.fila_de_tarefas.put(pagina)
167
+ logger.info(f"Lote de {len(paginas_para_trabalhar)} páginas adicionado à fila de tarefas.")
168
+ return True
169
+
170
+ def extrair_processo_25(self, processo_str):
171
+ if not processo_str: return ""
172
+ return re.sub(r'[^a-zA-Z0-9\-.]', '', str(processo_str))[:25]
173
+
174
+ def validar_registro(self, reg):
175
+ processo_completo = reg.get("processo", "")
176
+ processo_25 = self.extrair_processo_25(processo_completo)
177
+
178
+ if not processo_25 or processo_25 in self.processos_vistos:
179
+ return False
180
+
181
+ url = reg.get("url_documento", "")
182
+ ementa = reg.get("ementa", "")
183
+ integra = reg.get("integra_do_acordao", "")
184
+
185
+ if not any([url, ementa, integra]): return False
186
+
187
+ ementa_upper = ementa.upper().strip()
188
+ integra_upper = integra.upper().strip()
189
+
190
+ if len(processo_completo.strip()) < 25 and processo_completo.upper() not in ementa_upper: return False
191
+ if len(processo_completo.strip()) == 25 and (processo_completo.upper() not in ementa_upper and processo_completo.upper() not in integra_upper): return False
192
+ if ementa and integra and ementa_upper[:60] not in integra_upper: reg["ementa"] = ""
193
+
194
+ return True
195
+
196
+ def _produtor_worker(self):
197
+ delay = random.uniform(0.5, 1.5)
198
+ time.sleep(delay)
199
+
200
+ while True:
201
+ try:
202
+ pagina = self.fila_de_tarefas.get(timeout=1)
203
+ self._processar_pagina_extracao(pagina)
204
+ self.fila_de_tarefas.task_done()
205
+ except Empty:
206
+ break
207
+ except Exception as e:
208
+ logger.error(f"[{current_thread().name}] Erro fatal no worker da página {pagina}: {e}")
209
+ self.fila_de_tarefas.task_done()
210
+
211
+ def extrair_dados_pagina(self, pagina: int) -> list:
212
+ form_data = { "actionType": "pesquisar", "criterioPesquisa": "", "idLocalPesquisa": "99", "pageSize": "50", "pageNumber": str(pagina), "sortColumn": "processos.dataJulgamento", "sortOrder": "asc", "segredoJustica": "pesquisar sem", "mostrarCompleto": "true" }
213
+ for _ in range(MAX_TENTATIVAS_EXTRACAO):
214
+ try:
215
+ response = self.session.post("https://portal.tjpr.jus.br/jurisprudencia/publico/pesquisa.do", data=form_data, timeout=45)
216
+ response.raise_for_status()
217
+ return self.extrator_html.extrair_todos_acordaos(response.text)
218
+ except requests.RequestException:
219
+ time.sleep(5)
220
+ raise ConnectionError(f"Não foi possível buscar a página {pagina} após {MAX_TENTATIVAS_EXTRACAO} tentativas.")
221
+
222
+ def _processar_pagina_extracao(self, pagina: int):
223
+ logger.info(f"[{current_thread().name}] 🐝 Iniciando extração da página {pagina}...")
224
+ try:
225
+ acordaos_brutos = self.extrair_dados_pagina(pagina)
226
+ num_validos = 0
227
+
228
+ if not acordaos_brutos:
229
+ self.fila_de_resultados.put({'_tipo': 'status_pagina', 'pagina': pagina, 'status': 'vazio'})
230
+ return
231
+
232
+ for acordao_raw in acordaos_brutos:
233
+ acordao_enriquecido = self._enriquecer_acordao(acordao_raw)
234
+ if self.validar_registro(acordao_enriquecido):
235
+ self.fila_de_resultados.put(acordao_enriquecido)
236
+ num_validos += 1
237
+
238
+ logger.info(f"[{current_thread().name}] ✅ Página {pagina}: {num_validos}/{len(acordaos_brutos)} registros válidos.")
239
+ self.fila_de_resultados.put({'_tipo': 'status_pagina', 'pagina': pagina, 'status': 'sucesso'})
240
+ except Exception as e:
241
+ logger.error(f"[{current_thread().name}] ❌ FALHA na extração da página {pagina}. Erro: {e}")
242
+ self.fila_de_resultados.put({'_tipo': 'status_pagina', 'pagina': pagina, 'status': 'falha'})
243
+
244
+ def _enriquecer_acordao(self, acordao: dict) -> dict:
245
+ if "processo" in acordao:
246
+ match = re.search(r"^(.*?)\s*\((.*?)\)", acordao["processo"])
247
+ if match: acordao["processo"], acordao["tipo_mov"] = match.group(1).strip(), match.group(2).strip()
248
+ return acordao
249
+
250
+ def _consumidor_push_para_git(self, lote_registros, paginas_atualizadas, processos_novos):
251
+ with self.lock_git:
252
+ logger.info(f"📤 [Push Thread] Iniciando PUSH de {len(lote_registros)} registros...")
253
+ try:
254
+ self.repo.git.reset("--hard"); self.repo.remotes.origin.pull()
255
+
256
+ # 1. Escrever o novo lote de dados
257
+ nome_arquivo = f"acordaos_{self.num_lote_saida:05d}.jsonl.gz"
258
+ caminho_saida = LOCAL_REPO_PATH / "new_doce" / nome_arquivo
259
+ with gzip.open(caminho_saida, 'wt', encoding='utf-8') as f:
260
+ for reg in lote_registros: f.write(json.dumps(reg, ensure_ascii=False) + "\n")
261
+
262
+ # 2. Atualizar arquivos de estado
263
+ path_paginas = LOCAL_REPO_PATH / "paginas_status.csv"
264
+ with open(path_paginas, 'w', newline='', encoding='utf-8') as f:
265
+ writer = csv.writer(f); writer.writerow(['pagina', 'status'])
266
+ for p, s in sorted(paginas_atualizadas.items()): writer.writerow([p, s])
267
+
268
+ path_processos = LOCAL_REPO_PATH / "processos_vistos.csv"
269
+ # Usamos 'a' (append) para adicionar apenas os novos processos
270
+ with open(path_processos, 'a', newline='', encoding='utf-8') as f:
271
+ writer = csv.writer(f)
272
+ if path_processos.stat().st_size == 0: writer.writerow(['id', 'processo_25'])
273
+ for idx, proc in enumerate(processos_novos, start=len(self.processos_vistos) - len(processos_novos)):
274
+ writer.writerow([idx, proc])
275
+
276
+ # 3. Commit e Push
277
+ self.repo.git.add(["new_doce/", "paginas_status.csv", "processos_vistos.csv"])
278
+ commit_msg = f"DATA: {self.worker_id} adiciona lote {self.num_lote_saida} com {len(lote_registros)} registros"
279
+ self.repo.index.commit(commit_msg)
280
+ self.repo.remotes.origin.push()
281
+
282
+ logger.info(f"✅ [Push Thread] Push do lote {self.num_lote_saida} concluído.")
283
+ self.num_lote_saida += 1
284
+
285
+ # Limpar acumulador local SOMENTE após o push bem-sucedido
286
+ with self.lock_estado: self.registros_acumulados.clear()
287
+ except Exception as e:
288
+ logger.critical(f"❌ [Push Thread] FALHA CRÍTICA no push para o Git: {e}. Os dados não foram limpos e serão reenviados.")
289
+ self.thread_de_push = None
290
+
291
+ def loop_infinito(self):
292
+ pool_produtores = ThreadPoolExecutor(max_workers=NUM_WORKERS_EXTRACao, thread_name_prefix='ProdutorWorker')
293
+
294
+ while True:
295
+ try:
296
+ # Alimenta a fila de tarefas se estiver vazia
297
+ if self.fila_de_tarefas.empty():
298
+ if not self._selecionar_lote_de_paginas():
299
+ time.sleep(60) # Dorme se não houver trabalho
300
+ continue
301
+ for _ in range(NUM_WORKERS_EXTRACao):
302
+ pool_produtores.submit(self._produtor_worker)
303
+
304
+ # Processa resultados da fila
305
+ processos_novos_neste_ciclo = set()
306
+ while not self.fila_de_resultados.empty():
307
+ item = self.fila_de_resultados.get_nowait()
308
+ with self.lock_estado:
309
+ if '_tipo' in item and item['_tipo'] == 'status_pagina':
310
+ self.paginas_status[item['pagina']] = item['status']
311
+ else: # É um registro de acórdão
312
+ processo_25 = self.extrair_processo_25(item.get("processo"))
313
+ if processo_25 not in self.processos_vistos:
314
+ self.registros_acumulados.append(item)
315
+ self.processos_vistos.add(processo_25)
316
+ processos_novos_neste_ciclo.add(processo_25)
317
+
318
+ # Dispara o push se o gatilho for atingido
319
+ if len(self.registros_acumulados) >= TAMANHO_LOTE_REGISTROS and self.thread_de_push is None:
320
+ lote_para_push = list(self.registros_acumulados) # Faz uma cópia
321
+ paginas_para_push = dict(self.paginas_status) # Faz uma cópia
322
+
323
+ self.thread_de_push = Thread(target=self._consumidor_push_para_git, args=(lote_para_push, paginas_para_push, processos_novos_neste_ciclo), name="PushThread")
324
+ self.thread_de_push.start()
325
+
326
+ print(f"[Orquestrador] Tarefas: {self.fila_de_tarefas.qsize()}, Acumulados: {len(self.registros_acumulados)}/{TAMANHO_LOTE_REGISTROS}, Push: {'Sim' if self.thread_de_push else 'Não'}", flush=True)
327
+ time.sleep(5)
328
+
329
+ except Exception as e:
330
+ logger.critical(f"ERRO CRÍTICO no loop principal: {e}. Reiniciando...")
331
+ time.sleep(15)
332
+
333
+ def main():
334
+ logger.info("Iniciando Abelha com Pipeline Assíncrono (v7.0 - State-Driven)")
335
+ abelha = AbelhaAtomica()
336
+ abelha.loop_infinito()
337
+
338
+ if __name__ == "__main__":
339
+ main()