caarleexx commited on
Commit
b5f9fbc
·
verified ·
1 Parent(s): 5cc126f

Update worker.py

Browse files
Files changed (1) hide show
  1. worker.py +354 -234
worker.py CHANGED
@@ -1,11 +1,12 @@
1
  #!/usr/bin/env python3
2
  """
3
- TJ-PR - Extrator ULTRA SIMPLIFICADO (v8.4 - SINGLE WORKER OPTIMIZED)
4
 
5
- Otimização:
6
- 1. STARTUP: Carrega estado do Git para memória (Fundamental).
7
- 2. RUNTIME: Confia na memória RAM (não relê disco a cada push).
8
- 3. PUSH: Apenas Append das novidades nos CSVs.
 
9
  """
10
 
11
  import re
@@ -23,19 +24,19 @@ import time
23
  import gzip
24
  import tarfile
25
  import io
 
26
  from concurrent.futures import ThreadPoolExecutor
27
  from threading import Thread, Lock
28
  from queue import Queue, Empty
29
  from git import Repo, GitCommandError
30
 
31
  # -----------------------------
32
- # Configuração & Caminhos
33
  # -----------------------------
34
- BASE_DIR = Path(__file__).parent.resolve()
35
  GIT_TOKEN = os.getenv("GIT_TOKEN")
36
  GITHUB_BRANCH = os.getenv("GITHUB_BRANCH", "main")
37
- LOCAL_REPO_PATH = BASE_DIR / "repo_clone_v8"
38
- STATS_FILE = BASE_DIR / "monitor_stats.json"
39
 
40
  github_repo_env = os.getenv("GITHUB_REPO")
41
  if github_repo_env:
@@ -45,279 +46,398 @@ else:
45
 
46
  MAX_PAGINA = int(os.getenv("MAX_PAGINA") or "121792")
47
  MIN_PAGINA = int(os.getenv("MIN_PAGINA") or "1")
 
48
  NUM_WORKERS_EXTRACao = int(os.getenv("NUM_WORKERS_EXTRACAO") or "10")
49
  TAMANHO_LOTE_PAGINAS = int(os.getenv("TAMANHO_LOTE_PAGINAS") or "100")
50
  TAMANHO_LOTE_REGISTROS = 500
51
  MAX_TENTATIVAS_EXTRACAO = int(os.getenv("MAX_TENTATIVAS_EXTRACAO") or "5")
52
 
53
- logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S")
 
 
 
 
54
  logger = logging.getLogger(__name__)
55
 
56
  # ===================================================================================
57
- # CLASSES UTILITÁRIAS
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58
  # ===================================================================================
59
  class DestaqueExtractor:
60
  def __init__(self):
61
- self.MIN = 3
62
- self.STOP = {'OCULTARACORDAO', 'ATENCAO', 'O TEXTO ABAIXO REPRESENTA A TRANSCRICAO DE ACORDAO', 'EVENTUAIS IMAGENS SERAO SUPRIMIDAS', 'TRANSCRICAO DE ACORDAO', 'ESTADO DO PARANA', 'PODER JUDICIARIO', 'TRIBUNAL DE JUSTICA', 'RELATOR DESEMBARGADOR', 'VISTOS'}
63
- def _norm(self, t): return re.sub(r'[^a-zA-Z0-9.\s]', '.', "".join([c for c in unicodedata.normalize('NFKD', t or "") if not unicodedata.combining(c)]))
64
- def extrair_destaques(self, reg, pid):
65
- txt = self._norm(f"{reg.get('ementa', '')} {reg.get('integra_do_acordao', '')}")
66
- chunks = set()
67
- for c in txt.split('.'):
68
- c = re.sub(r'\s+', ' ', c).strip()
69
- if len(c) >= self.MIN and c.isupper() and not any(x.islower() for x in c) and c not in self.STOP and len(c.split())<=12: chunks.add(c)
70
- return [{"id": f"{pid}-{i}", "id_processo": pid, "texto": t} for i, t in enumerate(chunks, 1)]
 
 
 
 
 
 
 
 
 
 
 
 
 
71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72
  class ExtratorUltraSimples:
73
- def __init__(self): self.base = "https://portal.tjpr.jus.br"
74
- def _limpa(self, t): return re.sub(r"\s+", " ", re.sub(r"<[^>]+>", "", t.replace("&nbsp;", " ").replace("&quot;", '"').replace("&#39;", "'"))).strip()
75
- def extrair_acordaos(self, html):
76
- res = []
77
- for tbl in re.findall(r'<table[^>]*class=["\']?[^"\']*resultTable[^"\']*["\']?[^>]*>(.*?)</table>', html, re.DOTALL | re.IGNORECASE):
78
- ac = {}
79
- for tr in re.findall(r"<tr[^>]*>(.*?)</tr>", tbl, re.DOTALL | re.IGNORECASE):
80
- if m := re.search(r"tjpr\.url\.crypto=([a-f0-9]+)", tr): ac["url_documento"] = f"{self.base}/jurisprudencia/publico/visualizacao.do?tjpr.url.crypto={m.group(1)}"
81
- if m := re.search(r"<b>([^<]+)</b>", tr, re.IGNORECASE):
82
- k = re.sub(r"[^\w]", "", unicodedata.normalize('NFKD', m.group(1).strip()).encode('ascii','ignore').decode('ascii').lower())
83
- if v := re.search(r"</b>\s*(.*?)</td>", tr, re.DOTALL | re.IGNORECASE): ac[k] = self._limpa(v.group(1))
84
- if ac: res.append(ac)
85
- return res
 
 
 
 
 
 
86
 
87
  # ===================================================================================
88
- # WORKER v8.4 (SINGLE WORKER OPTIMIZED)
89
  # ===================================================================================
90
  class AbelhaAtomica:
91
  def __init__(self):
92
- if not GIT_TOKEN or not GITHUB_REPO: raise ValueError("GIT_TOKEN/REPO required")
93
  self.worker_id = f"abelha-{uuid.uuid4().hex[:6]}"
94
- self.extrator = ExtratorUltraSimples()
95
- self.destaques = DestaqueExtractor()
 
96
  self.session = requests.Session(); self.session.headers.update({"User-Agent": f"Mozilla/5.0 ({self.worker_id})"})
97
 
98
- self.fila, self.res = Queue(), Queue()
99
- self.lk_git, self.lk_state, self.lk_stats = Lock(), Lock(), Lock()
100
- self.th_push = None
101
 
102
  self.paginas_status, self.processos_vistos = {}, set()
103
- self.buf_regs, self.buf_tags = [], []
104
- # Buf de páginas novas para append no CSV
105
- self.buf_paginas_novas = {}
106
-
107
- self.lote_n, self.prox_id = 1, 1
108
- self.st_pg, self.st_proc, self.tot_pg = [], [], 0
109
-
110
- logger.info(f"--- WORKER {self.worker_id} (v8.4 - Single Worker) ---")
111
- self._init_repo()
112
 
113
- def _init_repo(self):
114
- url = f"https://oauth2:{GIT_TOKEN}@github.com/{GITHUB_REPO}.git"
115
- if not LOCAL_REPO_PATH.exists(): Repo.clone_from(url, LOCAL_REPO_PATH, branch=GITHUB_BRANCH)
 
116
  else:
117
  self.repo = Repo(LOCAL_REPO_PATH)
118
- try: os.remove(LOCAL_REPO_PATH/".git"/"index.lock")
119
- except: pass
120
- self.repo.git.remote("set-url", "origin", url)
121
 
122
- with self.repo.config_writer() as c: c.set_value("user", "name", self.worker_id); c.set_value("user", "email", "bot@local")
123
-
124
- # CARGA INICIAL (FUNDAMENTAL)
125
- with self.lk_git:
126
- logger.info("Sincronizando estado inicial...")
127
- self.repo.git.fetch("origin")
128
- self.repo.git.reset("--hard", f"origin/{GITHUB_BRANCH}")
129
-
130
- # Lê CSVs para memória
131
- f_pg = LOCAL_REPO_PATH / "paginas_status.csv"
132
- if not f_pg.exists(): open(f_pg, 'w').write("pagina,status\n")
133
- with open(f_pg, 'r') as f:
134
- r = csv.reader(f); next(r, None)
135
- self.paginas_status = {int(row[0]): row[1] for row in r if row}
 
 
 
 
 
136
 
137
- f_pr = LOCAL_REPO_PATH / "processos_vistos.csv"
138
- if not f_pr.exists(): open(f_pr, 'w').write("id,processo_25\n")
139
- with open(f_pr, 'r') as f:
140
- r = csv.reader(f); next(r, None)
141
- ids = []
142
- for row in r:
143
- if len(row)>1:
144
- if row[0].isdigit(): ids.append(int(row[0]))
145
- self.processos_vistos.add(row[1])
146
- if ids: self.prox_id = max(ids)+1
147
-
148
- # Lote
149
- new_doce = LOCAL_REPO_PATH / "new_doce"; new_doce.mkdir(exist_ok=True)
150
- nums = [int(re.search(r'(\d+)', f.name).group(1)) for f in new_doce.glob("lote_*.tar.gz") if re.search(r'(\d+)', f.name)]
151
- if nums: self.lote_n = max(nums) + 1
152
 
153
- logger.info(f"Memória carregada: {len(self.paginas_status)} pgs, {len(self.processos_vistos)} procs.")
 
 
 
 
154
 
155
- # --- Stats ---
156
- def _metric(self, t, n=1):
157
- now = time.time()
158
- with self.lk_stats:
159
- if t=='pg': self.st_pg.append(now); self.tot_pg += 1
160
- else: self.st_proc.extend([now]*n)
 
161
 
162
- def _upd_stats(self):
163
- now = time.time()
164
- with self.lk_stats:
165
- self.st_pg = [x for x in self.st_pg if x > now-7500]
166
- self.st_proc = [x for x in self.st_proc if x > now-7500]
167
- def avg(l, m): return round(sum(1 for x in l if x >= now - m*60)/m, 2)
168
- s = {
169
- "worker": self.worker_id, "last": time.strftime("%H:%M:%S"),
170
- "sessao_pg": self.tot_pg, "db_procs": len(self.processos_vistos),
171
- "fila": self.fila.qsize(), "buf": len(self.buf_regs),
172
- "rate_pg": {k: avg(self.st_pg, v) for k,v in [("5m",5),("30m",30),("2h",120)]},
173
- "rate_proc": {k: avg(self.st_proc, v) for k,v in [("5m",5),("30m",30),("2h",120)]}
174
- }
175
- try:
176
- tmp = STATS_FILE.with_suffix(".tmp"); json.dump(s, open(tmp,'w')); tmp.replace(STATS_FILE)
177
- except: pass
178
- return s
179
-
180
- # --- Flow ---
181
- def _fill(self):
182
- p = [x for x in range(MIN_PAGINA, MAX_PAGINA+1) if x not in self.paginas_status]
183
- if not p: return False
184
- for x in p[:TAMANHO_LOTE_PAGINAS]: self.fila.put(x)
185
- logger.info(f"Fila +{len(p[:TAMANHO_LOTE_PAGINAS])}"); return True
186
-
187
- def _worker(self):
188
- time.sleep(random.random())
189
  while True:
190
  try:
191
- pg = self.fila.get(timeout=3)
192
- if pg in self.paginas_status:
193
- self.res.put(('ign', pg)) # Já processado (duplicado na fila ou memória)
194
- else:
195
- try:
196
- data = {"actionType": "pesquisar", "idLocalPesquisa": "99", "pageSize": "50", "pageNumber": str(pg), "sortColumn": "processos.dataJulgamento", "sortOrder": "asc", "mostrarCompleto": "true"}
197
- # Retry loop simplificado
198
- acs = None
199
- for _ in range(MAX_TENTATIVAS_EXTRACAO):
200
- try:
201
- r = self.session.post("https://portal.tjpr.jus.br/jurisprudencia/publico/pesquisa.do", data=data, timeout=45)
202
- if r.status_code == 200: acs = self.extrator.extrair_acordaos(r.text); break
203
- except: time.sleep(5)
204
-
205
- if acs is None: raise Exception("Max retries")
206
-
207
- if not acs: self.res.put(('vazio', pg))
208
- else:
209
- for a in acs: self.res.put(('ok', pg, a))
210
- self.res.put(('fim_pg', pg))
211
- self._metric('pg')
212
- except: self.res.put(('err', pg))
213
- self.fila.task_done()
214
  except Empty: break
215
- except: self.fila.task_done()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
216
 
217
- def _norm(self, ac, pg, rid):
218
- p25 = re.sub(r'[^a-zA-Z0-9\-.]', '', str(ac.get("processo","")))[:25]
219
- h = lambda x: hashlib.sha256(x.encode('utf-8')).hexdigest() if x else None
220
- princ = {"Id": rid, "pagina": pg, "processo": p25, "url": ac.get("url_documento"), "rel": ac.get("relatora"), "org": ac.get("orgao_julgador"), "dt": ac.get("data_do_julgamento"), "hash_int": h(ac.get("integra_do_acordao")), "hash_em": h(ac.get("ementa"))}
221
- return {"principal": princ, "ementa": {"id_proc": rid, "txt": ac.get("ementa")} if princ["hash_em"] else None, "integra": {"id_proc": rid, "txt": ac.get("integra_do_acordao")} if princ["hash_int"] else None}
222
 
223
- def _push(self, lote, tags, pgs_novas, procs_novos):
224
- with self.lk_git:
225
- logger.info(f"🚀 Push Lote {self.lote_n} ({len(lote)} docs)...")
 
226
  try:
227
- bn = f"{self.lote_n:05d}"
228
- with tarfile.open(LOCAL_REPO_PATH / "new_doce" / f"lote_{bn}.tar.gz", "w:gz") as tar:
229
- def w(n, d): b = io.BytesIO(json.dumps(d, ensure_ascii=False).encode('utf8')); ti = tarfile.TarInfo(n); ti.size = len(b.getvalue()); tar.addfile(ti, b)
230
- # Escreve um JSON por linha em cada arquivo dentro do tar (formato JSONL)
231
- buf_a, buf_e, buf_i, buf_t = io.StringIO(), io.StringIO(), io.StringIO(), io.StringIO()
232
- for x in lote:
233
- buf_a.write(json.dumps(x['principal'], ensure_ascii=False)+'\n')
234
- if x['ementa']: buf_e.write(json.dumps(x['ementa'], ensure_ascii=False)+'\n')
235
- if x['integra']: buf_i.write(json.dumps(x['integra'], ensure_ascii=False)+'\n')
236
- for t in tags: buf_t.write(json.dumps(t, ensure_ascii=False)+'\n')
 
 
 
 
 
 
237
 
238
- def add_buf(nome, buf): b = buf.getvalue().encode('utf8'); ti = tarfile.TarInfo(nome); ti.size = len(b); tar.addfile(ti, io.BytesIO(b))
239
- add_buf(f"acordaos_{bn}.jsonl", buf_a); add_buf(f"ementa_{bn}.jsonl", buf_e)
240
- add_buf(f"integra_{bn}.jsonl", buf_i); add_buf(f"tags_{bn}.jsonl", buf_t)
 
 
 
 
 
 
 
 
 
 
241
 
242
- with self.lk_state:
243
- self.buf_regs = []; self.buf_tags = []; self.buf_paginas_novas = {}
244
- self.lote_n += 1
 
 
 
245
 
246
- # GIT OPERATIONS
247
- for i in range(3):
 
 
 
 
 
 
 
248
  try:
249
- try: os.remove(LOCAL_REPO_PATH/".git"/"index.lock")
250
- except: pass
251
- self.repo.git.fetch("origin")
252
- # Reset hard para garantir base limpa
253
- self.repo.git.reset("--hard", f"origin/{GITHUB_BRANCH}")
254
-
255
- # APPEND (Apenas novidades)
256
- with open(LOCAL_REPO_PATH/"paginas_status.csv", 'a', newline='', encoding='utf-8') as f:
257
- csv.writer(f).writerows(sorted(pgs_novas.items()))
258
-
259
- with open(LOCAL_REPO_PATH/"processos_vistos.csv", 'a', newline='', encoding='utf-8') as f:
260
- csv.writer(f).writerows(sorted(procs_novos.items(), key=lambda x: x[1]))
261
-
262
- self.repo.git.add(["new_doce/", "paginas_status.csv", "processos_vistos.csv"])
263
- self.repo.index.commit(f"Lote {bn}")
264
  self.repo.remotes.origin.push()
 
 
 
 
 
 
 
 
 
265
 
266
- s = self._upd_stats()
267
- logger.info(f" Push OK. Rate: {s['rate_pg']['5m']} pg/m")
268
- break
269
- except Exception as e:
270
- logger.warning(f"Retry push {i}: {e}"); time.sleep(10)
271
- except Exception as e: logger.critical(f"Push Fatal: {e}")
272
- self.th_push = None
273
-
274
- def run(self):
275
- pool = ThreadPoolExecutor(NUM_WORKERS_EXTRACao)
276
- last_s = time.time()
 
 
277
  while True:
278
  try:
279
- if self.fila.empty(): self._fill()
280
- while self.fila.qsize() and len(pool._threads) < NUM_WORKERS_EXTRACao: pool.submit(self._worker)
281
 
282
- # Consumo de resultados e atualização de memória
283
- novos_procs = {}
284
- # Novas páginas deste ciclo para append no CSV
285
- # Nota: paginas_status (memoria) acumula tudo. pgs_novas é só para o CSV.
286
 
287
- while not self.res.empty():
288
- t, *d = self.res.get_nowait()
289
- with self.lk_state:
290
- if t == 'ign': pass
291
- elif t in ['vazio', 'fim_pg', 'err']:
292
- pg = d[0]
293
- st = 'vazio' if t=='vazio' else ('sucesso' if t=='fim_pg' else 'falha')
294
- self.paginas_status[pg] = st
295
- self.buf_paginas_novas[pg] = st # Guarda para o CSV
296
- elif t == 'ok':
297
- pg, ac = d
298
- p25 = re.sub(r'[^a-zA-Z0-9\-.]', '', str(ac.get("processo","")))[:25]
299
- if p25 and p25 not in self.processos_vistos:
300
- rid = self.prox_id
301
- self.buf_regs.append(self._norm(ac, pg, rid))
302
- self.buf_tags.extend(self.destaques.extrair_destaques(ac, rid))
303
- self.processos_vistos.add(p25)
304
- novos_procs[p25] = rid # Guarda para CSV
305
- self.prox_id += 1
306
- self._metric('proc')
307
-
308
- # Trigger Push
309
- if len(self.buf_regs) >= TAMANHO_LOTE_REGISTROS and not self.th_push:
310
- l = list(self.buf_regs); tg = list(self.buf_tags)
311
- pn = dict(self.buf_paginas_novas) # Cópia das novas páginas
312
- # Limpa o buffer de paginas novas após capturar para envio
313
- # (Se o push falhar, perdemos o registro no CSV mas a memória segura.
314
- # Idealmente o buffer só limparia no sucesso, mas para simplificar v8.4 está ok)
315
 
316
- self.th_push = Thread(target=self._push, args=(l, tg, pn, novos_procs))
317
- self.th_push.start()
 
 
 
 
318
 
319
- if time.time()-last_s > 10: self._upd_stats(); last_s = time.time()
320
  time.sleep(5)
321
- except Exception as e: logger.critical(f"Loop Err: {e}"); time.sleep(10)
 
 
 
 
322
 
323
- if __name__ == "__main__": AbelhaAtomica().run()
 
 
1
  #!/usr/bin/env python3
2
  """
3
+ TJ-PR - Extrator ULTRA SIMPLIFICADO (v7.8 - Git Resiliente & Stats)
4
 
5
+ Worker com arquitetura produtor-consumidor.
6
+ v7.8 MODIFICAÇÕES:
7
+ 1. Git Resiliente: Commit local antes de Pull/Push. Estratégia Rebase.
8
+ 2. Logs Limpos: Apenas eventos de lote e push.
9
+ 3. Estatísticas: Monitoramento de médias (5m, 30m, 120m) salvo em JSON.
10
  """
11
 
12
  import re
 
24
  import gzip
25
  import tarfile
26
  import io
27
+ from collections import deque
28
  from concurrent.futures import ThreadPoolExecutor
29
  from threading import Thread, Lock
30
  from queue import Queue, Empty
31
  from git import Repo, GitCommandError
32
 
33
  # -----------------------------
34
+ # Configuração
35
  # -----------------------------
 
36
  GIT_TOKEN = os.getenv("GIT_TOKEN")
37
  GITHUB_BRANCH = os.getenv("GITHUB_BRANCH", "main")
38
+ LOCAL_REPO_PATH = Path("./repo_clone_v7")
39
+ STATS_FILE_PATH = LOCAL_REPO_PATH / "stats.json"
40
 
41
  github_repo_env = os.getenv("GITHUB_REPO")
42
  if github_repo_env:
 
46
 
47
  MAX_PAGINA = int(os.getenv("MAX_PAGINA") or "121792")
48
  MIN_PAGINA = int(os.getenv("MIN_PAGINA") or "1")
49
+
50
  NUM_WORKERS_EXTRACao = int(os.getenv("NUM_WORKERS_EXTRACAO") or "10")
51
  TAMANHO_LOTE_PAGINAS = int(os.getenv("TAMANHO_LOTE_PAGINAS") or "100")
52
  TAMANHO_LOTE_REGISTROS = 500
53
  MAX_TENTATIVAS_EXTRACAO = int(os.getenv("MAX_TENTATIVAS_EXTRACAO") or "5")
54
 
55
+ logging.basicConfig(
56
+ level=logging.INFO,
57
+ format="%(asctime)s [%(levelname)s] %(message)s",
58
+ datefmt="%H:%M:%S"
59
+ )
60
  logger = logging.getLogger(__name__)
61
 
62
  # ===================================================================================
63
+ # GERENCIADOR DE ESTATÍSTICAS
64
+ # ===================================================================================
65
+ class EstatisticaManager:
66
+ def __init__(self):
67
+ self.lock = Lock()
68
+ self.total_paginas = 0
69
+ self.total_processos = 0
70
+ # Armazena timestamps dos eventos para cálculo de média móvel
71
+ self.history_paginas = deque()
72
+ self.history_processos = deque()
73
+
74
+ def registrar(self, qtd_paginas, qtd_processos):
75
+ now = time.time()
76
+ with self.lock:
77
+ self.total_paginas += qtd_paginas
78
+ self.total_processos += qtd_processos
79
+ for _ in range(qtd_paginas): self.history_paginas.append(now)
80
+ for _ in range(qtd_processos): self.history_processos.append(now)
81
+ self._limpar_antigos(now)
82
+
83
+ def _limpar_antigos(self, now):
84
+ # Remove eventos mais velhos que 120 minutos (7200 segundos)
85
+ limit = now - 7200
86
+ while self.history_paginas and self.history_paginas[0] < limit: self.history_paginas.popleft()
87
+ while self.history_processos and self.history_processos[0] < limit: self.history_processos.popleft()
88
+
89
+ def calcular_medias(self):
90
+ now = time.time()
91
+ with self.lock:
92
+ self._limpar_antigos(now)
93
+ stats = {
94
+ "total_paginas": self.total_paginas,
95
+ "total_processos": self.total_processos,
96
+ "medias_paginas": self._calc_rate(self.history_paginas, now),
97
+ "medias_processos": self._calc_rate(self.history_processos, now),
98
+ "timestamp_atualizacao": time.strftime("%Y-%m-%d %H:%M:%S")
99
+ }
100
+ return stats
101
+
102
+ def _calc_rate(self, history, now):
103
+ # Retorna contagem absoluta nos intervalos
104
+ def count_in_window(seconds):
105
+ limit = now - seconds
106
+ return sum(1 for t in history if t >= limit)
107
+
108
+ # Opcional: Para virar "por minuto", dividiria pelo tempo.
109
+ # Aqui retornaremos o TOTAL processado na janela de tempo.
110
+ return {
111
+ "5min": count_in_window(300),
112
+ "30min": count_in_window(1800),
113
+ "120min": count_in_window(7200)
114
+ }
115
+
116
+ def salvar_arquivo(self):
117
+ dados = self.calcular_medias()
118
+ try:
119
+ with open(STATS_FILE_PATH, 'w', encoding='utf-8') as f:
120
+ json.dump(dados, f, indent=2)
121
+ except Exception:
122
+ pass # Falha não crítica
123
+ return dados
124
+
125
+ # ===================================================================================
126
+ # CLASSE DE EXTRAÇÃO DE DESTAQUES (Integrada)
127
  # ===================================================================================
128
  class DestaqueExtractor:
129
  def __init__(self):
130
+ self.MIN_CARACTERES_TAG = 3
131
+ self.STOP_PHRASES = {
132
+ 'OCULTARACORDAO', 'ATENCAO', 'O TEXTO ABAIXO REPRESENTA A TRANSCRICAO DE ACORDAO',
133
+ 'EVENTUAIS IMAGENS SERAO SUPRIMIDAS', 'TRANSCRICAO DE ACORDAO', 'ESTADO DO PARANA',
134
+ 'PODER JUDICIARIO', 'TRIBUNAL DE JUSTICA', 'RELATOR DESEMBARGADOR', 'VISTOS'
135
+ }
136
+
137
+ def _normalizar(self, texto: str) -> str:
138
+ if not texto: return ""
139
+ nfkd_form = unicodedata.normalize('NFKD', texto)
140
+ texto_limpo = "".join([c for c in nfkd_form if not unicodedata.combining(c)])
141
+ return re.sub(r'[^a-zA-Z0-9.\s]', '.', texto_limpo)
142
+
143
+ def _extrair_chunks(self, texto_processado: str) -> list[str]:
144
+ chunks = texto_processado.split('.')
145
+ etiquetas = set()
146
+ for chunk in chunks:
147
+ chunk = re.sub(r'\s+', ' ', chunk).strip()
148
+ if len(chunk) >= self.MIN_CARACTERES_TAG:
149
+ if chunk.isupper() and not any(c.islower() for c in chunk):
150
+ if chunk not in self.STOP_PHRASES and len(chunk.split()) <= 12:
151
+ etiquetas.add(chunk)
152
+ return list(etiquetas)
153
 
154
+ def extrair_destaques(self, registro: dict, id_processo: int) -> list[dict]:
155
+ conteudo = f"{registro.get('ementa', '')} {registro.get('integra_do_acordao', '')}"
156
+ texto_normalizado = self._normalizar(conteudo)
157
+ destaques_encontrados = self._extrair_chunks(texto_normalizado)
158
+
159
+ tags_para_saida = []
160
+ for i, texto_destaque in enumerate(destaques_encontrados, 1):
161
+ tag_id = f"{id_processo}-{i}"
162
+ tags_para_saida.append({ "id": tag_id, "id_processo": id_processo, "texto": texto_destaque })
163
+ return tags_para_saida
164
+
165
+ # ===================================================================================
166
+ # CLASSE DE EXTRAÇÃO WEB
167
+ # ===================================================================================
168
  class ExtratorUltraSimples:
169
+ def __init__(self): self.base_url = "https://portal.tjpr.jus.br"
170
+ def limpar_texto(self, texto: str) -> str: texto = re.sub(r"<br\s*/?>", " ", texto, flags=re.IGNORECASE); texto = re.sub(r"<[^>]+>", "", texto); texto = texto.replace("&nbsp;", " ").replace("&quot;", '"').replace("&amp;", "&"); texto = texto.replace("&lt;", "<").replace("&gt;", ">").replace("&#39;", "'"); texto = texto.replace("\r", "").replace("\n", "").replace("\t", ""); texto = re.sub(r"\s+", " ", texto); return texto.strip()
171
+ def extrair_chave_valor_da_linha(self, tr_html: str) -> tuple:
172
+ match_chave = re.search(r"<b>([^<]+)</b>", tr_html, re.IGNORECASE);
173
+ if not match_chave: return None, None
174
+ chave_raw = match_chave.group(1).strip(); chave = chave_raw.replace(":", "").strip(); chave = re.sub(r"[áàâãä]", "a", chave, flags=re.IGNORECASE); chave = re.sub(r"[éèêë]", "e", chave, flags=re.IGNORECASE); chave = re.sub(r"[íìîï]", "i", chave, flags=re.IGNORECASE); chave = re.sub(r"[óòôõö]", "o", chave, flags=re.IGNORECASE); chave = re.sub(r"[úùûü]", "u", chave, flags=re.IGNORECASE); chave = re.sub(r"[ç]", "c", chave, flags=re.IGNORECASE); chave = re.sub(r"\s+", "_", chave); chave = re.sub(r"[^\w]", "", chave); chave = chave.lower();
175
+ match_valor = re.search(r"</b>\s*(.*?)</td>", tr_html, re.DOTALL | re.IGNORECASE)
176
+ if not match_valor: return chave, None
177
+ valor_raw = match_valor.group(1); valor = self.limpar_texto(valor_raw); return chave, valor
178
+ def extrair_url_documento(self, tr_html: str) -> str: match = re.search(r"visualizacao\.do\?tjpr\.url\.crypto=([a-f0-9]+)", tr_html); return f"{self.base_url}/jurisprudencia/publico/visualizacao.do?tjpr.url.crypto={match.group(1)}" if match else None
179
+ def extrair_tabelas(self, html: str) -> list: return re.findall(r'<table[^>]*class=["\']?[^"\']*resultTable[^"\']*["\']?[^>]*>(.*?)</table>', html, re.DOTALL | re.IGNORECASE)
180
+ def extrair_linhas_tr(self, tabela_html: str) -> list: return re.findall(r"<tr[^>]*>(.*?)</tr>", tabela_html, re.DOTALL | re.IGNORECASE)
181
+ def extrair_acordao(self, tabela_html: str) -> dict:
182
+ acordao = {};
183
+ for linha_html in self.extrair_linhas_tr(tabela_html):
184
+ if url := self.extrair_url_documento(linha_html): acordao["url_documento"] = url
185
+ if (chave := self.extrair_chave_valor_da_linha(linha_html)[0]) and (valor := self.extrair_chave_valor_da_linha(linha_html)[1]): acordao[chave] = valor
186
+ return acordao
187
+ def extrair_todos_acordaos(self, html: str) -> list: return [acordao for tabela in self.extrair_tabelas(html) if (acordao := self.extrair_acordao(tabela))]
188
 
189
  # ===================================================================================
190
+ # CLASSE PRINCIPAL DO WORKER
191
  # ===================================================================================
192
  class AbelhaAtomica:
193
  def __init__(self):
194
+ if not GIT_TOKEN or not GITHUB_REPO: raise ValueError("GIT_TOKEN e GITHUB_REPO são obrigatórios.")
195
  self.worker_id = f"abelha-{uuid.uuid4().hex[:6]}"
196
+ self.extrator_html = ExtratorUltraSimples()
197
+ self.destaque_extractor = DestaqueExtractor()
198
+ self.stats = EstatisticaManager()
199
  self.session = requests.Session(); self.session.headers.update({"User-Agent": f"Mozilla/5.0 ({self.worker_id})"})
200
 
201
+ self.fila_de_tarefas, self.fila_de_resultados = Queue(), Queue()
202
+ self.lock_git, self.lock_estado = Lock(), Lock()
203
+ self.thread_de_push = None
204
 
205
  self.paginas_status, self.processos_vistos = {}, set()
206
+ self.registros_acumulados, self.tags_destaques_acumuladas, self.html_bruto_acumulado = [], [], {}
207
+ self.num_lote_saida, self.proximo_id_registro = 1, 1
208
+
209
+ logger.info(f"Inicializando Abelha {self.worker_id} (v7.8 - Resiliente)...")
210
+ self._setup_git_repo()
211
+ logger.info(f"Abelha pronta. Estado: {len(self.paginas_status)} págs, {len(self.processos_vistos)} processos.")
 
 
 
212
 
213
+ def _setup_git_repo(self):
214
+ remote_url = f"https://oauth2:{GIT_TOKEN}@github.com/{GITHUB_REPO}.git"
215
+ if not LOCAL_REPO_PATH.exists():
216
+ self.repo = Repo.clone_from(remote_url, LOCAL_REPO_PATH, branch=GITHUB_BRANCH)
217
  else:
218
  self.repo = Repo(LOCAL_REPO_PATH)
219
+ self.repo.remotes.origin.set_url(remote_url)
 
 
220
 
221
+ with self.repo.config_writer() as config:
222
+ config.set_value("pull", "rebase", "true")
223
+ config.set_value("user", "email", "worker@tjpr.bot")
224
+ config.set_value("user", "name", "TJPR Worker")
225
+
226
+ self._carregar_estado_do_repo()
227
+
228
+ def _carregar_estado_do_repo(self):
229
+ with self.lock_git:
230
+ logger.info("Sincronizando (Pull inicial)...")
231
+ try:
232
+ self.repo.remotes.origin.pull()
233
+ except Exception as e:
234
+ logger.warning(f"Pull inicial falhou (continuando com estado local se existir): {e}")
235
+
236
+ path_paginas = LOCAL_REPO_PATH / "paginas_status.csv"
237
+ if not path_paginas.exists():
238
+ with open(path_paginas, 'w', newline='', encoding='utf-8') as f: csv.writer(f).writerow(['pagina', 'status'])
239
+ with open(path_paginas, 'r', encoding='utf-8') as f: reader = csv.reader(f); next(reader, None); self.paginas_status = {int(row[0]): row[1] for row in reader if row}
240
 
241
+ path_processos = LOCAL_REPO_PATH / "processos_vistos.csv"
242
+ if not path_processos.exists():
243
+ with open(path_processos, 'w', newline='', encoding='utf-8') as f: csv.writer(f).writerow(['id', 'processo_25'])
244
+ with open(path_processos, 'r', encoding='utf-8') as f:
245
+ reader = csv.reader(f); next(reader, None); ids = [int(row[0]) for row in reader if len(row) > 1 and row[0].isdigit() and self.processos_vistos.add(row[1]) is None]
246
+ if ids: self.proximo_id_registro = max(ids) + 1
 
 
 
 
 
 
 
 
 
247
 
248
+ path_saida = LOCAL_REPO_PATH / "new_doce"; path_saida.mkdir(exist_ok=True)
249
+ arquivos = list(path_saida.glob("lote_*.tar.gz"))
250
+ if arquivos:
251
+ numeros = [int(re.search(r'(\d+)', f.name).group(1)) for f in arquivos if re.search(r'(\d+)', f.name)]
252
+ if numeros: self.num_lote_saida = max(numeros) + 1
253
 
254
+ def _selecionar_lote_de_paginas(self):
255
+ paginas_para_trabalhar = [p for p in range(MIN_PAGINA, MAX_PAGINA + 1) if p not in self.paginas_status]
256
+ if not paginas_para_trabalhar: return False
257
+ lote = paginas_para_trabalhar[:TAMANHO_LOTE_PAGINAS]
258
+ for pagina in lote: self.fila_de_tarefas.put(pagina)
259
+ logger.info(f"➕ Lote de {len(lote)} páginas adicionado à fila.")
260
+ return True
261
 
262
+ def _gerar_hash(self, texto: str) -> str: return hashlib.sha256(texto.encode('utf-8')).hexdigest() if texto else None
263
+ def _extrair_crypto_url(self, url: str) -> str: match = re.search(r"tjpr\.url\.crypto=([a-f0-9]+)", url); return match.group(1) if match else None
264
+ def extrair_processo_25(self, processo_str: str) -> str: return re.sub(r'[^a-zA-Z0-9\-.]', '', str(processo_str))[:25]
265
+
266
+ def _transformar_e_normalizar(self, acordao: dict, pagina: int, registro_id: int) -> dict:
267
+ processo_completo, ementa, integra = acordao.get("processo", ""), acordao.get("ementa", ""), acordao.get("integra_do_acordao", "")
268
+ pacote_principal = { "Id": registro_id, "pagina": pagina, "processo": self.extrair_processo_25(processo_completo), "url_documento": self._extrair_crypto_url(acordao.get("url_documento")), "relatora": acordao.get("relatora", acordao.get("relator(a)", "")), "orgao_julgador": acordao.get("orgao_julgador", ""), "comarca": acordao.get("comarca", acordao.get("comarca_de_origem", "")), "data_do_julgamento": acordao.get("data_do_julgamento", ""), "tipo_mov": acordao.get("tipo_mov", ""), "hash_interior_teor": self._gerar_hash(integra), "hash_ementa": self._gerar_hash(ementa) }
269
+ pacote_ementa = {"hash_texto": pacote_principal["hash_ementa"], "texto": ementa, "id_processo": registro_id} if ementa else None
270
+ pacote_integra = {"hash_texto": pacote_principal["hash_interior_teor"], "texto": integra, "id_processo": registro_id} if integra else None
271
+ return {"principal": pacote_principal, "ementa": pacote_ementa, "integra": pacote_integra}
272
+
273
+ def validar_registro(self, reg):
274
+ processo_completo, processo_25 = reg.get("processo", ""), self.extrair_processo_25(reg.get("processo", ""))
275
+ if not processo_25 or processo_25 in self.processos_vistos: return False
276
+ url, ementa, integra = reg.get("url_documento", ""), reg.get("ementa", ""), reg.get("integra_do_acordao", "")
277
+ if not any([url, ementa, integra]): return False
278
+ ementa_upper, integra_upper = ementa.upper().strip(), integra.upper().strip()
279
+ if len(processo_completo.strip()) < 25 and processo_completo.upper() not in ementa_upper: return False
280
+ if len(processo_completo.strip()) == 25 and (processo_completo.upper() not in ementa_upper and processo_completo.upper() not in integra_upper): return False
281
+ if ementa and integra and ementa_upper[:60] not in integra_upper: reg["ementa"] = ""
282
+ return True
283
+
284
+ def _produtor_worker(self):
285
+ time.sleep(random.uniform(0.5, 2.0))
 
 
 
286
  while True:
287
  try:
288
+ pagina = self.fila_de_tarefas.get(timeout=3); self._processar_pagina_extracao(pagina); self.fila_de_tarefas.task_done()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
289
  except Empty: break
290
+ except Exception as e: logger.error(f"Erro worker: {e}"); self.fila_de_tarefas.task_done()
291
+
292
+ def extrair_dados_pagina(self, pagina: int) -> tuple[list, str]:
293
+ form_data = {"actionType": "pesquisar", "criterioPesquisa": "", "idLocalPesquisa": "99", "pageSize": "50", "pageNumber": str(pagina), "sortColumn": "processos.dataJulgamento", "sortOrder": "asc", "segredoJustica": "pesquisar sem", "mostrarCompleto": "true"}
294
+ for tentativa in range(MAX_TENTATIVAS_EXTRACAO):
295
+ try:
296
+ response = self.session.post("https://portal.tjpr.jus.br/jurisprudencia/publico/pesquisa.do", data=form_data, timeout=45)
297
+ response.raise_for_status(); return self.extrator_html.extrair_todos_acordaos(response.text), response.text
298
+ except requests.RequestException: time.sleep(5)
299
+ raise ConnectionError(f"Excedido o número de tentativas para a página {pagina}.")
300
+
301
+ def _processar_pagina_extracao(self, pagina: int):
302
+ # Removido logger.info por página para limpar output
303
+ try:
304
+ acordaos_brutos, html_conteudo = self.extrair_dados_pagina(pagina)
305
+ self.fila_de_resultados.put({'tipo': 'html_bruto', 'pagina': pagina, 'conteudo': html_conteudo})
306
+ if not acordaos_brutos:
307
+ self.fila_de_resultados.put({'tipo': 'status_pagina', 'pagina': pagina, 'status': 'vazio'}); return
308
+ num_validos = sum(1 for ac in acordaos_brutos if self.validar_registro(self._enriquecer_acordao(ac)) and self.fila_de_resultados.put({'tipo': 'acordao', 'dados': ac, 'pagina': pagina}) is None)
309
+ self.fila_de_resultados.put({'tipo': 'status_pagina', 'pagina': pagina, 'status': 'sucesso'})
310
+ except Exception:
311
+ self.fila_de_resultados.put({'tipo': 'status_pagina', 'pagina': pagina, 'status': 'falha'})
312
 
313
+ def _enriquecer_acordao(self, acordao: dict) -> dict:
314
+ if "processo" in acordao:
315
+ match = re.search(r"^(.*?)\s*\((.*?)\)", acordao["processo"])
316
+ if match: acordao["processo"], acordao["tipo_mov"] = match.group(1).strip(), match.group(2).strip()
317
+ return acordao
318
 
319
+ def _consumidor_push_para_git(self, lote_pacotes, lote_tags, lote_html, paginas_atualizadas, processos_novos):
320
+ with self.lock_git:
321
+ logger.info(f"📤 [PUSH] Preparando Lote {self.num_lote_saida}: {len(lote_pacotes)} registros de {len(paginas_atualizadas)} páginas...")
322
+
323
  try:
324
+ # 1. Preparar Arquivo .tar.gz
325
+ path_saida = LOCAL_REPO_PATH / "new_doce"; path_saida.mkdir(exist_ok=True)
326
+ base_name = f"{self.num_lote_saida:05d}"; archive_path = path_saida / f"lote_{base_name}.tar.gz"
327
+
328
+ with tarfile.open(archive_path, "w:gz") as tar:
329
+ buffers = {
330
+ f"acordaos_{base_name}.jsonl": io.StringIO(),
331
+ f"ementa_{base_name}.jsonl": io.StringIO(),
332
+ f"integra_do_acordao_{base_name}.jsonl": io.StringIO(),
333
+ f"tags_destaques_{base_name}.jsonl": io.StringIO()
334
+ }
335
+ for pacote in lote_pacotes:
336
+ buffers[f"acordaos_{base_name}.jsonl"].write(json.dumps(pacote['principal'], ensure_ascii=False) + '\n')
337
+ if pacote['ementa']: buffers[f"ementa_{base_name}.jsonl"].write(json.dumps(pacote['ementa'], ensure_ascii=False) + '\n')
338
+ if pacote['integra']: buffers[f"integra_do_acordao_{base_name}.jsonl"].write(json.dumps(pacote['integra'], ensure_ascii=False) + '\n')
339
+ for tag in lote_tags: buffers[f"tags_destaques_{base_name}.jsonl"].write(json.dumps(tag, ensure_ascii=False) + '\n')
340
 
341
+ for filename, buffer in buffers.items():
342
+ data = buffer.getvalue().encode('utf-8')
343
+ tarinfo = tarfile.TarInfo(name=filename); tarinfo.size = len(data)
344
+ tar.addfile(tarinfo, io.BytesIO(data))
345
+
346
+ # 2. Atualizar CSVs (Modo Append/Rewrite local)
347
+ with open(LOCAL_REPO_PATH / "paginas_status.csv", 'w', newline='', encoding='utf-8') as f:
348
+ writer = csv.writer(f); writer.writerow(['pagina', 'status'])
349
+ writer.writerows(sorted(self.paginas_status.items())) # Escreve todo o estado conhecido
350
+
351
+ with open(LOCAL_REPO_PATH / "processos_vistos.csv", 'a', newline='', encoding='utf-8') as f:
352
+ writer = csv.writer(f)
353
+ writer.writerows(sorted(processos_novos.items(), key=lambda item: item[1]))
354
 
355
+ # 3. Estatísticas e Logs
356
+ self.stats.registrar(len(paginas_atualizadas), len(processos_novos))
357
+ dados_stats = self.stats.salvar_arquivo()
358
+
359
+ # 4. Git Flow Resiliente
360
+ self.repo.git.add(["new_doce/", "paginas_status.csv", "processos_vistos.csv", "stats.json"])
361
 
362
+ if self.repo.is_dirty():
363
+ self.repo.index.commit(f"DATA: Lote {self.num_lote_saida} ({len(lote_pacotes)} regs)")
364
+
365
+ try:
366
+ self.repo.remotes.origin.pull(rebase=True)
367
+ logger.info("✅ Pull (Rebase) OK.")
368
+ except GitCommandError as e:
369
+ logger.warning(f"⚠️ Pull falhou (conflito ou rede), mas commit local salvo: {e}")
370
+
371
  try:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
372
  self.repo.remotes.origin.push()
373
+ logger.info(f"🚀 PUSH do lote {self.num_lote_saida} concluído.")
374
+
375
+ # Log Estatístico
376
+ p = dados_stats['medias_paginas']
377
+ pr = dados_stats['medias_processos']
378
+ logger.info(f"📊 ESTATÍSTICAS: Total: {dados_stats['total_paginas']} págs | "
379
+ f"5m: {p['5min']}p/{pr['5min']}proc | "
380
+ f"30m: {p['30min']}p/{pr['30min']}proc | "
381
+ f"120m: {p['120min']}p/{pr['120min']}proc")
382
 
383
+ except GitCommandError as e:
384
+ logger.error(f" Push falhou (será enviado na próxima rodada): {e}")
385
+
386
+ self.num_lote_saida += 1
387
+ with self.lock_estado: self.registros_acumulados.clear(); self.tags_destaques_acumuladas.clear(); self.html_bruto_acumulado.clear()
388
+
389
+ except Exception as e:
390
+ logger.critical(f"❌ FALHA CRÍTICA GERAL no push: {e}", exc_info=True)
391
+
392
+ self.thread_de_push = None
393
+
394
+ def loop_infinito(self):
395
+ pool_produtores = ThreadPoolExecutor(max_workers=NUM_WORKERS_EXTRACao, thread_name_prefix='ProdutorWorker')
396
  while True:
397
  try:
398
+ if self.fila_de_tarefas.empty() and self._selecionar_lote_de_paginas():
399
+ for _ in range(NUM_WORKERS_EXTRACao): pool_produtores.submit(self._produtor_worker)
400
 
401
+ processos_novos_neste_ciclo = {}
402
+ paginas_atualizadas_ciclo = {}
 
 
403
 
404
+ # Consome fila de resultados
405
+ while not self.fila_de_resultados.empty():
406
+ item = self.fila_de_resultados.get_nowait()
407
+ with self.lock_estado:
408
+ if item['tipo'] == 'status_pagina':
409
+ self.paginas_status[item['pagina']] = item['status']
410
+ paginas_atualizadas_ciclo[item['pagina']] = item['status']
411
+ elif item['tipo'] == 'html_bruto':
412
+ self.html_bruto_acumulado[item['pagina']] = item['conteudo']
413
+ elif item['tipo'] == 'acordao':
414
+ acordao = item['dados']; processo_25 = self.extrair_processo_25(acordao.get("processo"))
415
+ if processo_25 and processo_25 not in self.processos_vistos:
416
+ reg_id = self.proximo_id_registro
417
+ self.registros_acumulados.append(self._transformar_e_normalizar(acordao, item['pagina'], reg_id))
418
+ self.tags_destaques_acumuladas.extend(self.destaque_extractor.extrair_destaques(acordao, reg_id))
419
+ self.processos_vistos.add(processo_25); processos_novos_neste_ciclo[processo_25] = reg_id; self.proximo_id_registro += 1
420
+
421
+ # Dispara thread de persistência
422
+ if len(self.registros_acumulados) >= TAMANHO_LOTE_REGISTROS and not self.thread_de_push:
423
+ lote = list(self.registros_acumulados)
424
+ tags = list(self.tags_destaques_acumuladas)
425
+ html = dict(self.html_bruto_acumulado)
426
+ paginas = dict(self.paginas_status) # Cópia do estado total
 
 
 
 
 
427
 
428
+ self.thread_de_push = Thread(
429
+ target=self._consumidor_push_para_git,
430
+ args=(lote, tags, html, paginas, processos_novos_neste_ciclo),
431
+ name="PushThread"
432
+ )
433
+ self.thread_de_push.start()
434
 
 
435
  time.sleep(5)
436
+ except Exception as e: logger.critical(f"ERRO CRÍTICO no loop principal: {e}", exc_info=True); time.sleep(15)
437
+
438
+ def main():
439
+ try: AbelhaAtomica().loop_infinito()
440
+ except Exception as e: logger.critical(f"Falha fatal na inicialização: {e}", exc_info=True); exit(1)
441
 
442
+ if __name__ == "__main__":
443
+ main()