| """ |
| STJ Consulta Processual — HuggingFace Space |
| 1 browser + N abas para paralelismo eficiente. |
| |
| Endpoints: |
| POST /consultar — processa lote de numeros_registro |
| GET /health — status do browser |
| POST /reiniciar — reinicia browser (kill + start) |
| |
| Arquitetura: |
| - 1 instancia de Chrome (undetected-chromedriver) via Xvfb |
| - N abas abertas simultaneamente (window.open) |
| - Abas compartilham cookies (Cloudflare resolvido 1x) |
| - Fases: |
| 1. Abrir abas com navegacao paralela |
| 2. Aguardar carregamento coletivo |
| 3. Clicar "Fases" em cada aba |
| 4. Aguardar AJAX paralelo |
| 5. Extrair proclamacao de cada aba |
| """ |
|
|
| import hashlib |
| import logging |
| import os |
| import re |
| import time |
| import random |
| import threading |
| from datetime import datetime |
| from typing import Optional |
|
|
| import undetected_chromedriver as uc |
| from selenium.webdriver.common.by import By |
| from selenium.webdriver.support.wait import WebDriverWait |
| from selenium.webdriver.support import expected_conditions as EC |
| from selenium.common.exceptions import ( |
| TimeoutException, |
| WebDriverException, |
| NoSuchWindowException, |
| StaleElementReferenceException, |
| ) |
| from fastapi import FastAPI, HTTPException |
| from pydantic import BaseModel, Field, field_validator |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", |
| ) |
| logger = logging.getLogger("stj-consulta") |
|
|
| |
| REGEX_DISPOSITIVO = re.compile( |
| r"decidiu\s+(.*?)(?:\.\s*-|,\s*nos termos|\.\s*$)", |
| re.IGNORECASE | re.DOTALL, |
| ) |
| REGEX_FALLBACK = re.compile( |
| r"(?:por\s+(?:unanimidade|maioria)[\s,]+(?:de\s+votos[\s,]+)?)" |
| r"(.*?)(?:\.\s*-|,\s*nos termos|\.\s*$)", |
| re.IGNORECASE | re.DOTALL, |
| ) |
| REGEX_PARCIAL = re.compile( |
| r"Proclama[çc][aã]o\s+Parcial\s+de\s+Julgamento:\s*(.*?)(?:\.\s*-|\.\s*$)", |
| re.IGNORECASE | re.DOTALL, |
| ) |
| REGEX_VOTACAO = re.compile( |
| r"por\s+(unanimidade|maioria(?:\s+de\s+votos)?)", |
| re.IGNORECASE, |
| ) |
| MAPA_TIPO = [ |
| (re.compile(r"neg\w+\s+provimento", re.IGNORECASE), "NEGADO"), |
| (re.compile(r"dar\s+parcial\s+provimento", re.IGNORECASE), "PARCIAL"), |
| (re.compile(r"dar\s+provimento", re.IGNORECASE), "PROVIDO"), |
| (re.compile(r"conhec\w+\s+parcialmente.*neg\w+\s+provimento", re.IGNORECASE), "NEGADO"), |
| (re.compile(r"prejudicad", re.IGNORECASE), "PREJUDICADO"), |
| (re.compile(r"n[aã]o\s+conhec", re.IGNORECASE), "NAO_CONHECIDO"), |
| (re.compile(r"retirad[oa]\s+de\s+pauta", re.IGNORECASE), "RETIRADO"), |
| (re.compile(r"destaque", re.IGNORECASE), "DESTACADO"), |
| (re.compile(r"rejeit\w+", re.IGNORECASE), "REJEITADO"), |
| (re.compile(r"acolh\w+", re.IGNORECASE), "ACOLHIDO"), |
| (re.compile(r"homolog", re.IGNORECASE), "HOMOLOGADO"), |
| ] |
|
|
| URL_BASE = ( |
| "https://processo.stj.jus.br/processo/pesquisa/" |
| "?termo={nr}" |
| "&aplicacao=processos.ea" |
| "&tipoPesquisa=tipoPesquisaGenerica" |
| "&chkordem=DESC" |
| "&chkMorto=MORTO" |
| ) |
|
|
|
|
| |
|
|
| def classificar_tipo(texto: str) -> str: |
| for regex, tipo in MAPA_TIPO: |
| if regex.search(texto): |
| return tipo |
| return "OUTRO" |
|
|
|
|
| def extrair_dispositivo(texto: str) -> tuple[Optional[str], Optional[str], Optional[str]]: |
| |
| match = REGEX_PARCIAL.search(texto) |
| if match: |
| d = match.group(1).strip() |
| return d, classificar_tipo(d), "ALTA" |
|
|
| |
| match = REGEX_DISPOSITIVO.search(texto) |
| if match: |
| d = match.group(1).strip() |
| return d, classificar_tipo(d), "ALTA" |
|
|
| |
| match = REGEX_FALLBACK.search(texto) |
| if match: |
| d = match.group(1).strip() |
| return d, classificar_tipo(d), "ALTA" |
|
|
| |
| t = classificar_tipo(texto) |
| if t != "OUTRO": |
| return texto[:200], t, "BAIXA" |
| return None, None, None |
|
|
|
|
| |
|
|
| class GerenciadorBrowser: |
| """1 browser Chrome, N abas paralelas, lock para acesso exclusivo.""" |
|
|
| def __init__(self): |
| self._driver: Optional[uc.Chrome] = None |
| self._lock = threading.Lock() |
| self._cloudflare_ok = False |
| self._iniciado_em: Optional[datetime] = None |
| self._total_processados = 0 |
| self._total_encontrados = 0 |
| self._total_erros = 0 |
|
|
| def _iniciar_browser(self): |
| if self._driver is not None: |
| return |
| logger.info("Iniciando browser (undetected-chromedriver)...") |
| opcoes = uc.ChromeOptions() |
| opcoes.add_argument("--no-sandbox") |
| opcoes.add_argument("--disable-dev-shm-usage") |
| opcoes.add_argument("--disable-gpu") |
| opcoes.add_argument("--disable-extensions") |
| opcoes.add_argument("--disable-background-networking") |
| opcoes.add_argument("--disable-popup-blocking") |
| opcoes.add_argument("--window-size=1920,1080") |
| opcoes.add_argument("--disable-infobars") |
| |
| opcoes.add_experimental_option("prefs", { |
| "profile.managed_default_content_settings.images": 2, |
| }) |
| self._driver = uc.Chrome(options=opcoes, version_main=None) |
| self._iniciado_em = datetime.now() |
| logger.info("Browser iniciado com sucesso") |
|
|
| def _resolver_cloudflare(self): |
| if self._cloudflare_ok: |
| return |
| logger.info("Resolvendo Cloudflare (primeiro acesso)...") |
| self._driver.get("https://processo.stj.jus.br/processo/pesquisa/") |
| time.sleep(5) |
| titulo = self._driver.title.lower() |
| if "momento" in titulo or "verifica" in titulo: |
| logger.warning("Cloudflare pendente, aguardando +5s...") |
| time.sleep(5) |
| titulo = self._driver.title.lower() |
| if "momento" in titulo or "verifica" in titulo: |
| logger.warning("Cloudflare pendente, aguardando +10s...") |
| time.sleep(10) |
| titulo = self._driver.title.lower() |
| if "momento" in titulo or "verifica" in titulo: |
| raise RuntimeError( |
| f"Cloudflare nao resolvido apos 20s. Titulo: {self._driver.title}" |
| ) |
| self._cloudflare_ok = True |
| logger.info("Cloudflare resolvido, titulo=%s", self._driver.title[:60]) |
|
|
| def _fechar_browser(self): |
| if self._driver: |
| try: |
| self._driver.quit() |
| except Exception: |
| pass |
| self._driver = None |
| self._cloudflare_ok = False |
|
|
| def reiniciar(self): |
| with self._lock: |
| logger.info("Reiniciando browser...") |
| self._fechar_browser() |
| self._iniciar_browser() |
| self._resolver_cloudflare() |
| logger.info("Browser reiniciado com sucesso") |
|
|
| def processar_lote( |
| self, |
| numeros_registro: list[str], |
| timeout_aba: int = 15, |
| ) -> list[dict]: |
| with self._lock: |
| try: |
| self._iniciar_browser() |
| self._resolver_cloudflare() |
| return self._processar_lote_impl(numeros_registro, timeout_aba) |
| except WebDriverException as e: |
| logger.error("WebDriverException fatal: %s", str(e)[:200]) |
| self._fechar_browser() |
| raise HTTPException( |
| status_code=500, |
| detail=f"Browser crash, reinicie com POST /reiniciar: {str(e)[:200]}", |
| ) |
|
|
| def _processar_lote_impl( |
| self, |
| numeros_registro: list[str], |
| timeout_aba: int, |
| ) -> list[dict]: |
| driver = self._driver |
| aba_original = driver.current_window_handle |
| mapa_abas: dict[str, str] = {} |
| resultados: list[dict] = [] |
| inicio = time.time() |
|
|
| try: |
| |
| |
| logger.info("F1: abrindo %d abas...", len(numeros_registro)) |
| handles_antes = set(driver.window_handles) |
|
|
| for nr in numeros_registro: |
| url = URL_BASE.format(nr=nr) |
| driver.execute_script(f"window.open('{url}')") |
| time.sleep(0.2) |
|
|
| |
| handles_novos = [ |
| h for h in driver.window_handles if h not in handles_antes |
| ] |
| for i, nr in enumerate(numeros_registro): |
| if i < len(handles_novos): |
| mapa_abas[nr] = handles_novos[i] |
| else: |
| resultados.append(_resultado_erro(nr, "falha_abrir_aba")) |
|
|
| |
| |
| logger.info("F2: aguardando carregamento coletivo (%d abas)...", len(mapa_abas)) |
| time.sleep(4) |
|
|
| |
| |
| logger.info("F3: clicando 'Fases' em %d abas...", len(mapa_abas)) |
| abas_com_fases: dict[str, str] = {} |
|
|
| for nr, handle in list(mapa_abas.items()): |
| try: |
| driver.switch_to.window(handle) |
| |
| titulo = driver.title.lower() |
| url_atual = driver.current_url |
| logger.info( |
| "Aba %s: titulo='%s', url='%s'", |
| nr, titulo[:60], url_atual[:80], |
| ) |
| if "momento" in titulo or "verifica" in titulo: |
| logger.warning("Cloudflare pendente na aba %s, aguardando 5s...", nr) |
| time.sleep(5) |
| titulo = driver.title.lower() |
| logger.info("Apos espera, titulo='%s'", titulo[:60]) |
|
|
| fases_btn = WebDriverWait(driver, timeout_aba).until( |
| EC.element_to_be_clickable( |
| (By.XPATH, "//a[contains(text(), 'Fases')]") |
| ) |
| ) |
| fases_btn.click() |
| abas_com_fases[nr] = handle |
| except TimeoutException: |
| |
| try: |
| diag_titulo = driver.title |
| diag_url = driver.current_url |
| diag_body = driver.find_element(By.TAG_NAME, "body").text[:200] |
| except Exception: |
| diag_titulo = "?" |
| diag_url = "?" |
| diag_body = "?" |
| logger.error( |
| "Timeout Fases nr=%s titulo='%s' url='%s' body='%s'", |
| nr, diag_titulo[:60], diag_url[:80], diag_body[:100], |
| ) |
| resultados.append(_resultado_erro( |
| nr, |
| f"aba_fases_nao_encontrada|titulo={diag_titulo[:50]}|body={diag_body[:80]}", |
| )) |
| except Exception as e: |
| resultados.append( |
| _resultado_erro(nr, f"erro_clique_fases: {str(e)[:100]}") |
| ) |
|
|
| |
| logger.info("F4: aguardando AJAX (%d abas)...", len(abas_com_fases)) |
| time.sleep(2) |
|
|
| |
| logger.info("F5: extraindo proclamacoes de %d abas...", len(abas_com_fases)) |
| for nr, handle in abas_com_fases.items(): |
| resultado = self._extrair_de_aba(driver, handle, nr, timeout_aba) |
| resultados.append(resultado) |
|
|
| finally: |
| |
| self._fechar_abas_extras(driver, aba_original) |
|
|
| |
| tempo_total = round(time.time() - inicio, 2) |
| encontrados = sum(1 for r in resultados if r.get("proclamacao_encontrada")) |
| erros = sum(1 for r in resultados if r.get("erro")) |
| self._total_processados += len(numeros_registro) |
| self._total_encontrados += encontrados |
| self._total_erros += erros |
|
|
| logger.info( |
| "Lote concluido: %d processos, %d encontrados, %d erros, " |
| "%.1fs total (%.2fs/proc)", |
| len(numeros_registro), |
| encontrados, |
| erros, |
| tempo_total, |
| tempo_total / max(len(numeros_registro), 1), |
| ) |
| return resultados |
|
|
| def _extrair_de_aba( |
| self, |
| driver: uc.Chrome, |
| handle: str, |
| nr: str, |
| timeout_aba: int, |
| ) -> dict: |
| resultado = { |
| "numero_registro": nr, |
| "proclamacao_encontrada": False, |
| "data_proclamacao": None, |
| "texto_completo": None, |
| "dispositivo": None, |
| "tipo_dispositivo": None, |
| "votacao": None, |
| "confianca": None, |
| "tempo_segundos": 0.0, |
| "erro": None, |
| "hash_trecho": None, |
| } |
| inicio = time.time() |
|
|
| try: |
| driver.switch_to.window(handle) |
|
|
| |
| WebDriverWait(driver, timeout_aba).until( |
| lambda d: ( |
| d.find_element(By.ID, "idDivFases").text or "" |
| ).strip() != "" |
| ) |
|
|
| texto = driver.find_element(By.ID, "idDivFases").text |
| resultado["hash_trecho"] = hashlib.sha256( |
| texto.encode() |
| ).hexdigest()[:16] |
|
|
| |
| for linha in texto.split("\n"): |
| if "proclama" in linha.lower() and "julgamento" in linha.lower(): |
| resultado["proclamacao_encontrada"] = True |
| resultado["texto_completo"] = linha.strip() |
| data_m = re.search(r"(\d{2}/\d{2}/\d{4})", linha) |
| if data_m: |
| resultado["data_proclamacao"] = data_m.group(1) |
| d, t, c = extrair_dispositivo(linha) |
| resultado["dispositivo"] = d |
| resultado["tipo_dispositivo"] = t |
| resultado["confianca"] = c |
| vot_m = REGEX_VOTACAO.search(linha) |
| resultado["votacao"] = ( |
| vot_m.group(1).strip() if vot_m else None |
| ) |
| break |
|
|
| except TimeoutException: |
| resultado["erro"] = "timeout_div_fases" |
| except NoSuchWindowException: |
| resultado["erro"] = "aba_fechada_inesperadamente" |
| except StaleElementReferenceException: |
| resultado["erro"] = "elemento_stale" |
| except Exception as e: |
| resultado["erro"] = str(e)[:200] |
|
|
| resultado["tempo_segundos"] = round(time.time() - inicio, 2) |
| return resultado |
|
|
| def _fechar_abas_extras(self, driver, aba_original: str): |
| for handle in driver.window_handles: |
| if handle != aba_original: |
| try: |
| driver.switch_to.window(handle) |
| driver.close() |
| except Exception: |
| pass |
| try: |
| driver.switch_to.window(aba_original) |
| except Exception: |
| pass |
|
|
| @property |
| def status(self) -> dict: |
| return { |
| "browser_ativo": self._driver is not None, |
| "cloudflare_resolvido": self._cloudflare_ok, |
| "iniciado_em": ( |
| self._iniciado_em.isoformat() if self._iniciado_em else None |
| ), |
| "total_processados": self._total_processados, |
| "total_encontrados": self._total_encontrados, |
| "total_erros": self._total_erros, |
| } |
|
|
|
|
| def _resultado_erro(nr: str, erro: str) -> dict: |
| return { |
| "numero_registro": nr, |
| "proclamacao_encontrada": False, |
| "data_proclamacao": None, |
| "texto_completo": None, |
| "dispositivo": None, |
| "tipo_dispositivo": None, |
| "votacao": None, |
| "confianca": None, |
| "tempo_segundos": 0.0, |
| "erro": erro, |
| "hash_trecho": None, |
| } |
|
|
|
|
| |
|
|
| gerenciador = GerenciadorBrowser() |
| app = FastAPI( |
| title="STJ Consulta Processual", |
| description="1 browser + N abas para extracao paralela de proclamacoes", |
| version="1.0.0", |
| ) |
|
|
|
|
| class RequisicaoConsulta(BaseModel): |
| numeros_registro: list[str] = Field( |
| ..., |
| min_length=1, |
| max_length=20, |
| description="Lista de numeros de registro (12 digitos cada)", |
| ) |
| timeout_aba: int = Field( |
| default=15, |
| ge=5, |
| le=60, |
| description="Timeout em segundos para cada aba", |
| ) |
|
|
| @field_validator("numeros_registro") |
| @classmethod |
| def validar_numeros(cls, v: list[str]) -> list[str]: |
| for nr in v: |
| if not nr.isdigit() or len(nr) < 10 or len(nr) > 14: |
| raise ValueError( |
| f"numero_registro invalido: '{nr}' (esperado 10-14 digitos)" |
| ) |
| return v |
|
|
|
|
| class ResultadoProcesso(BaseModel): |
| numero_registro: str |
| proclamacao_encontrada: bool = False |
| data_proclamacao: Optional[str] = None |
| texto_completo: Optional[str] = None |
| dispositivo: Optional[str] = None |
| tipo_dispositivo: Optional[str] = None |
| votacao: Optional[str] = None |
| confianca: Optional[str] = None |
| tempo_segundos: float = 0.0 |
| erro: Optional[str] = None |
| hash_trecho: Optional[str] = None |
|
|
|
|
| class MetaLote(BaseModel): |
| total: int |
| com_proclamacao: int |
| com_erro: int |
| tempo_total_segundos: float |
| tempo_medio_por_processo: float |
|
|
|
|
| class RespostaConsulta(BaseModel): |
| resultados: list[ResultadoProcesso] |
| meta: MetaLote |
|
|
|
|
| @app.get("/health") |
| def health(): |
| return {"status": "ok", "browser": gerenciador.status} |
|
|
|
|
| @app.post("/consultar", response_model=RespostaConsulta) |
| def consultar(req: RequisicaoConsulta): |
| """Processa lote de numeros_registro em abas paralelas.""" |
| inicio = time.time() |
|
|
| resultados = gerenciador.processar_lote( |
| numeros_registro=req.numeros_registro, |
| timeout_aba=req.timeout_aba, |
| ) |
|
|
| tempo_total = round(time.time() - inicio, 2) |
| total = len(resultados) |
| com_proc = sum(1 for r in resultados if r.get("proclamacao_encontrada")) |
| com_erro = sum(1 for r in resultados if r.get("erro")) |
|
|
| return RespostaConsulta( |
| resultados=resultados, |
| meta=MetaLote( |
| total=total, |
| com_proclamacao=com_proc, |
| com_erro=com_erro, |
| tempo_total_segundos=tempo_total, |
| tempo_medio_por_processo=round( |
| tempo_total / max(total, 1), 2 |
| ), |
| ), |
| ) |
|
|
|
|
| @app.post("/reiniciar") |
| def reiniciar(): |
| """Mata o browser e reinicia (resolve Cloudflare de novo).""" |
| try: |
| gerenciador.reiniciar() |
| return {"status": "ok", "browser": gerenciador.status} |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)[:500]) |
|
|
|
|
| @app.get("/debug/{numero_registro}") |
| def debug(numero_registro: str): |
| """Abre 1 aba, navega, e retorna diagnostico completo.""" |
| with gerenciador._lock: |
| try: |
| gerenciador._iniciar_browser() |
| gerenciador._resolver_cloudflare() |
| except Exception as e: |
| return {"erro": f"browser/cloudflare: {str(e)[:200]}"} |
|
|
| driver = gerenciador._driver |
| original = driver.current_window_handle |
| resultado = {} |
|
|
| try: |
| |
| resultado["main_titulo"] = driver.title |
| resultado["main_url"] = driver.current_url |
| resultado["main_cookies"] = len(driver.get_cookies()) |
|
|
| |
| url = URL_BASE.format(nr=numero_registro) |
| resultado["url_alvo"] = url |
| driver.execute_script(f"window.open('{url}')") |
| time.sleep(1) |
|
|
| handles = [h for h in driver.window_handles if h != original] |
| if not handles: |
| resultado["erro"] = "aba nao abriu" |
| return resultado |
|
|
| driver.switch_to.window(handles[0]) |
|
|
| |
| for i in range(15): |
| time.sleep(1) |
| titulo = driver.title |
| url_atual = driver.current_url |
| resultado[f"t{i+1}_titulo"] = titulo |
| resultado[f"t{i+1}_url"] = url_atual[:100] |
| |
| if "momento" not in titulo.lower() and "verifica" not in titulo.lower(): |
| if titulo.strip(): |
| break |
|
|
| resultado["titulo_final"] = driver.title |
| resultado["url_final"] = driver.current_url |
| resultado["body_preview"] = driver.find_element( |
| By.TAG_NAME, "body" |
| ).text[:500] |
|
|
| |
| try: |
| fases = driver.find_element( |
| By.XPATH, "//a[contains(text(), 'Fases')]" |
| ) |
| resultado["fases_encontrada"] = True |
| resultado["fases_text"] = fases.text |
| except Exception: |
| resultado["fases_encontrada"] = False |
| |
| links = driver.find_elements(By.TAG_NAME, "a") |
| resultado["links_na_pagina"] = [ |
| a.text[:50] for a in links[:20] if a.text.strip() |
| ] |
|
|
| except Exception as e: |
| resultado["erro"] = str(e)[:300] |
| finally: |
| for h in driver.window_handles: |
| if h != original: |
| try: |
| driver.switch_to.window(h) |
| driver.close() |
| except Exception: |
| pass |
| driver.switch_to.window(original) |
|
|
| return resultado |