FlareS / app.py
aanycmn's picture
Create app.py
259e6ad verified
"""
STJ Consulta Processual — HuggingFace Space
1 browser + N abas para paralelismo eficiente.
Endpoints:
POST /consultar — processa lote de numeros_registro
GET /health — status do browser
POST /reiniciar — reinicia browser (kill + start)
Arquitetura:
- 1 instancia de Chrome (undetected-chromedriver) via Xvfb
- N abas abertas simultaneamente (window.open)
- Abas compartilham cookies (Cloudflare resolvido 1x)
- Fases:
1. Abrir abas com navegacao paralela
2. Aguardar carregamento coletivo
3. Clicar "Fases" em cada aba
4. Aguardar AJAX paralelo
5. Extrair proclamacao de cada aba
"""
import hashlib
import logging
import os
import re
import time
import random
import threading
from datetime import datetime
from typing import Optional
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
TimeoutException,
WebDriverException,
NoSuchWindowException,
StaleElementReferenceException,
)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field, field_validator
# === Logging ===
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("stj-consulta")
# === Regex (poc_v2 calibrado, 10 tipos, 100% ALTA em 50 processos) ===
REGEX_DISPOSITIVO = re.compile(
r"decidiu\s+(.*?)(?:\.\s*-|,\s*nos termos|\.\s*$)",
re.IGNORECASE | re.DOTALL,
)
REGEX_FALLBACK = re.compile(
r"(?:por\s+(?:unanimidade|maioria)[\s,]+(?:de\s+votos[\s,]+)?)"
r"(.*?)(?:\.\s*-|,\s*nos termos|\.\s*$)",
re.IGNORECASE | re.DOTALL,
)
REGEX_PARCIAL = re.compile(
r"Proclama[çc][aã]o\s+Parcial\s+de\s+Julgamento:\s*(.*?)(?:\.\s*-|\.\s*$)",
re.IGNORECASE | re.DOTALL,
)
REGEX_VOTACAO = re.compile(
r"por\s+(unanimidade|maioria(?:\s+de\s+votos)?)",
re.IGNORECASE,
)
MAPA_TIPO = [
(re.compile(r"neg\w+\s+provimento", re.IGNORECASE), "NEGADO"),
(re.compile(r"dar\s+parcial\s+provimento", re.IGNORECASE), "PARCIAL"),
(re.compile(r"dar\s+provimento", re.IGNORECASE), "PROVIDO"),
(re.compile(r"conhec\w+\s+parcialmente.*neg\w+\s+provimento", re.IGNORECASE), "NEGADO"),
(re.compile(r"prejudicad", re.IGNORECASE), "PREJUDICADO"),
(re.compile(r"n[aã]o\s+conhec", re.IGNORECASE), "NAO_CONHECIDO"),
(re.compile(r"retirad[oa]\s+de\s+pauta", re.IGNORECASE), "RETIRADO"),
(re.compile(r"destaque", re.IGNORECASE), "DESTACADO"),
(re.compile(r"rejeit\w+", re.IGNORECASE), "REJEITADO"),
(re.compile(r"acolh\w+", re.IGNORECASE), "ACOLHIDO"),
(re.compile(r"homolog", re.IGNORECASE), "HOMOLOGADO"),
]
URL_BASE = (
"https://processo.stj.jus.br/processo/pesquisa/"
"?termo={nr}"
"&aplicacao=processos.ea"
"&tipoPesquisa=tipoPesquisaGenerica"
"&chkordem=DESC"
"&chkMorto=MORTO"
)
# === Funcoes de extracao (identicas a poc_v2) ===
def classificar_tipo(texto: str) -> str:
for regex, tipo in MAPA_TIPO:
if regex.search(texto):
return tipo
return "OUTRO"
def extrair_dispositivo(texto: str) -> tuple[Optional[str], Optional[str], Optional[str]]:
# 1. Proclamacao Parcial (retirado, destaque, etc.)
match = REGEX_PARCIAL.search(texto)
if match:
d = match.group(1).strip()
return d, classificar_tipo(d), "ALTA"
# 2. Primario (apos "decidiu")
match = REGEX_DISPOSITIVO.search(texto)
if match:
d = match.group(1).strip()
return d, classificar_tipo(d), "ALTA"
# 3. Fallback (apos "unanimidade/maioria")
match = REGEX_FALLBACK.search(texto)
if match:
d = match.group(1).strip()
return d, classificar_tipo(d), "ALTA"
# 4. Heuristica
t = classificar_tipo(texto)
if t != "OUTRO":
return texto[:200], t, "BAIXA"
return None, None, None
# === Gerenciador de Browser (singleton thread-safe) ===
class GerenciadorBrowser:
"""1 browser Chrome, N abas paralelas, lock para acesso exclusivo."""
def __init__(self):
self._driver: Optional[uc.Chrome] = None
self._lock = threading.Lock()
self._cloudflare_ok = False
self._iniciado_em: Optional[datetime] = None
self._total_processados = 0
self._total_encontrados = 0
self._total_erros = 0
def _iniciar_browser(self):
if self._driver is not None:
return
logger.info("Iniciando browser (undetected-chromedriver)...")
opcoes = uc.ChromeOptions()
opcoes.add_argument("--no-sandbox")
opcoes.add_argument("--disable-dev-shm-usage")
opcoes.add_argument("--disable-gpu")
opcoes.add_argument("--disable-extensions")
opcoes.add_argument("--disable-background-networking")
opcoes.add_argument("--disable-popup-blocking")
opcoes.add_argument("--window-size=1920,1080")
opcoes.add_argument("--disable-infobars")
# Bloquear imagens para economizar banda/CPU
opcoes.add_experimental_option("prefs", {
"profile.managed_default_content_settings.images": 2,
})
self._driver = uc.Chrome(options=opcoes, version_main=None)
self._iniciado_em = datetime.now()
logger.info("Browser iniciado com sucesso")
def _resolver_cloudflare(self):
if self._cloudflare_ok:
return
logger.info("Resolvendo Cloudflare (primeiro acesso)...")
self._driver.get("https://processo.stj.jus.br/processo/pesquisa/")
time.sleep(5)
titulo = self._driver.title.lower()
if "momento" in titulo or "verifica" in titulo:
logger.warning("Cloudflare pendente, aguardando +5s...")
time.sleep(5)
titulo = self._driver.title.lower()
if "momento" in titulo or "verifica" in titulo:
logger.warning("Cloudflare pendente, aguardando +10s...")
time.sleep(10)
titulo = self._driver.title.lower()
if "momento" in titulo or "verifica" in titulo:
raise RuntimeError(
f"Cloudflare nao resolvido apos 20s. Titulo: {self._driver.title}"
)
self._cloudflare_ok = True
logger.info("Cloudflare resolvido, titulo=%s", self._driver.title[:60])
def _fechar_browser(self):
if self._driver:
try:
self._driver.quit()
except Exception:
pass
self._driver = None
self._cloudflare_ok = False
def reiniciar(self):
with self._lock:
logger.info("Reiniciando browser...")
self._fechar_browser()
self._iniciar_browser()
self._resolver_cloudflare()
logger.info("Browser reiniciado com sucesso")
def processar_lote(
self,
numeros_registro: list[str],
timeout_aba: int = 15,
) -> list[dict]:
with self._lock:
try:
self._iniciar_browser()
self._resolver_cloudflare()
return self._processar_lote_impl(numeros_registro, timeout_aba)
except WebDriverException as e:
logger.error("WebDriverException fatal: %s", str(e)[:200])
self._fechar_browser()
raise HTTPException(
status_code=500,
detail=f"Browser crash, reinicie com POST /reiniciar: {str(e)[:200]}",
)
def _processar_lote_impl(
self,
numeros_registro: list[str],
timeout_aba: int,
) -> list[dict]:
driver = self._driver
aba_original = driver.current_window_handle
mapa_abas: dict[str, str] = {} # nr -> window_handle
resultados: list[dict] = []
inicio = time.time()
try:
# === FASE 1: Abrir abas com navegacao paralela ===
# window.open(url) e non-blocking: todas as abas carregam simultaneamente
logger.info("F1: abrindo %d abas...", len(numeros_registro))
handles_antes = set(driver.window_handles)
for nr in numeros_registro:
url = URL_BASE.format(nr=nr)
driver.execute_script(f"window.open('{url}')")
time.sleep(0.2) # Pequeno delay para Chrome registrar a aba
# Mapear handles novos aos numeros na ordem de criacao
handles_novos = [
h for h in driver.window_handles if h not in handles_antes
]
for i, nr in enumerate(numeros_registro):
if i < len(handles_novos):
mapa_abas[nr] = handles_novos[i]
else:
resultados.append(_resultado_erro(nr, "falha_abrir_aba"))
# === FASE 2: Aguardar carregamento coletivo ===
# Todas as abas estao carregando em paralelo (1 processo Chrome, N tabs)
logger.info("F2: aguardando carregamento coletivo (%d abas)...", len(mapa_abas))
time.sleep(4)
# === FASE 3: Clicar "Fases" em cada aba ===
# Apos clicar, o AJAX de cada aba carrega em background (paralelo)
logger.info("F3: clicando 'Fases' em %d abas...", len(mapa_abas))
abas_com_fases: dict[str, str] = {}
for nr, handle in list(mapa_abas.items()):
try:
driver.switch_to.window(handle)
# Verificar se Cloudflare travou nesta aba
titulo = driver.title.lower()
url_atual = driver.current_url
logger.info(
"Aba %s: titulo='%s', url='%s'",
nr, titulo[:60], url_atual[:80],
)
if "momento" in titulo or "verifica" in titulo:
logger.warning("Cloudflare pendente na aba %s, aguardando 5s...", nr)
time.sleep(5)
titulo = driver.title.lower()
logger.info("Apos espera, titulo='%s'", titulo[:60])
fases_btn = WebDriverWait(driver, timeout_aba).until(
EC.element_to_be_clickable(
(By.XPATH, "//a[contains(text(), 'Fases')]")
)
)
fases_btn.click()
abas_com_fases[nr] = handle
except TimeoutException:
# Capturar diagnostico da aba que falhou
try:
diag_titulo = driver.title
diag_url = driver.current_url
diag_body = driver.find_element(By.TAG_NAME, "body").text[:200]
except Exception:
diag_titulo = "?"
diag_url = "?"
diag_body = "?"
logger.error(
"Timeout Fases nr=%s titulo='%s' url='%s' body='%s'",
nr, diag_titulo[:60], diag_url[:80], diag_body[:100],
)
resultados.append(_resultado_erro(
nr,
f"aba_fases_nao_encontrada|titulo={diag_titulo[:50]}|body={diag_body[:80]}",
))
except Exception as e:
resultados.append(
_resultado_erro(nr, f"erro_clique_fases: {str(e)[:100]}")
)
# === FASE 4: Aguardar AJAX das Fases (paralelo em background) ===
logger.info("F4: aguardando AJAX (%d abas)...", len(abas_com_fases))
time.sleep(2)
# === FASE 5: Extrair proclamacao de cada aba ===
logger.info("F5: extraindo proclamacoes de %d abas...", len(abas_com_fases))
for nr, handle in abas_com_fases.items():
resultado = self._extrair_de_aba(driver, handle, nr, timeout_aba)
resultados.append(resultado)
finally:
# Fechar todas as abas extras (manter apenas a original)
self._fechar_abas_extras(driver, aba_original)
# Metricas
tempo_total = round(time.time() - inicio, 2)
encontrados = sum(1 for r in resultados if r.get("proclamacao_encontrada"))
erros = sum(1 for r in resultados if r.get("erro"))
self._total_processados += len(numeros_registro)
self._total_encontrados += encontrados
self._total_erros += erros
logger.info(
"Lote concluido: %d processos, %d encontrados, %d erros, "
"%.1fs total (%.2fs/proc)",
len(numeros_registro),
encontrados,
erros,
tempo_total,
tempo_total / max(len(numeros_registro), 1),
)
return resultados
def _extrair_de_aba(
self,
driver: uc.Chrome,
handle: str,
nr: str,
timeout_aba: int,
) -> dict:
resultado = {
"numero_registro": nr,
"proclamacao_encontrada": False,
"data_proclamacao": None,
"texto_completo": None,
"dispositivo": None,
"tipo_dispositivo": None,
"votacao": None,
"confianca": None,
"tempo_segundos": 0.0,
"erro": None,
"hash_trecho": None,
}
inicio = time.time()
try:
driver.switch_to.window(handle)
# Esperar #idDivFases ter conteudo (AJAX pode ainda estar carregando)
WebDriverWait(driver, timeout_aba).until(
lambda d: (
d.find_element(By.ID, "idDivFases").text or ""
).strip() != ""
)
texto = driver.find_element(By.ID, "idDivFases").text
resultado["hash_trecho"] = hashlib.sha256(
texto.encode()
).hexdigest()[:16]
# Buscar linha com "Proclamacao ... Julgamento"
for linha in texto.split("\n"):
if "proclama" in linha.lower() and "julgamento" in linha.lower():
resultado["proclamacao_encontrada"] = True
resultado["texto_completo"] = linha.strip()
data_m = re.search(r"(\d{2}/\d{2}/\d{4})", linha)
if data_m:
resultado["data_proclamacao"] = data_m.group(1)
d, t, c = extrair_dispositivo(linha)
resultado["dispositivo"] = d
resultado["tipo_dispositivo"] = t
resultado["confianca"] = c
vot_m = REGEX_VOTACAO.search(linha)
resultado["votacao"] = (
vot_m.group(1).strip() if vot_m else None
)
break
except TimeoutException:
resultado["erro"] = "timeout_div_fases"
except NoSuchWindowException:
resultado["erro"] = "aba_fechada_inesperadamente"
except StaleElementReferenceException:
resultado["erro"] = "elemento_stale"
except Exception as e:
resultado["erro"] = str(e)[:200]
resultado["tempo_segundos"] = round(time.time() - inicio, 2)
return resultado
def _fechar_abas_extras(self, driver, aba_original: str):
for handle in driver.window_handles:
if handle != aba_original:
try:
driver.switch_to.window(handle)
driver.close()
except Exception:
pass
try:
driver.switch_to.window(aba_original)
except Exception:
pass
@property
def status(self) -> dict:
return {
"browser_ativo": self._driver is not None,
"cloudflare_resolvido": self._cloudflare_ok,
"iniciado_em": (
self._iniciado_em.isoformat() if self._iniciado_em else None
),
"total_processados": self._total_processados,
"total_encontrados": self._total_encontrados,
"total_erros": self._total_erros,
}
def _resultado_erro(nr: str, erro: str) -> dict:
return {
"numero_registro": nr,
"proclamacao_encontrada": False,
"data_proclamacao": None,
"texto_completo": None,
"dispositivo": None,
"tipo_dispositivo": None,
"votacao": None,
"confianca": None,
"tempo_segundos": 0.0,
"erro": erro,
"hash_trecho": None,
}
# === FastAPI ===
gerenciador = GerenciadorBrowser()
app = FastAPI(
title="STJ Consulta Processual",
description="1 browser + N abas para extracao paralela de proclamacoes",
version="1.0.0",
)
class RequisicaoConsulta(BaseModel):
numeros_registro: list[str] = Field(
...,
min_length=1,
max_length=20,
description="Lista de numeros de registro (12 digitos cada)",
)
timeout_aba: int = Field(
default=15,
ge=5,
le=60,
description="Timeout em segundos para cada aba",
)
@field_validator("numeros_registro")
@classmethod
def validar_numeros(cls, v: list[str]) -> list[str]:
for nr in v:
if not nr.isdigit() or len(nr) < 10 or len(nr) > 14:
raise ValueError(
f"numero_registro invalido: '{nr}' (esperado 10-14 digitos)"
)
return v
class ResultadoProcesso(BaseModel):
numero_registro: str
proclamacao_encontrada: bool = False
data_proclamacao: Optional[str] = None
texto_completo: Optional[str] = None
dispositivo: Optional[str] = None
tipo_dispositivo: Optional[str] = None
votacao: Optional[str] = None
confianca: Optional[str] = None
tempo_segundos: float = 0.0
erro: Optional[str] = None
hash_trecho: Optional[str] = None
class MetaLote(BaseModel):
total: int
com_proclamacao: int
com_erro: int
tempo_total_segundos: float
tempo_medio_por_processo: float
class RespostaConsulta(BaseModel):
resultados: list[ResultadoProcesso]
meta: MetaLote
@app.get("/health")
def health():
return {"status": "ok", "browser": gerenciador.status}
@app.post("/consultar", response_model=RespostaConsulta)
def consultar(req: RequisicaoConsulta):
"""Processa lote de numeros_registro em abas paralelas."""
inicio = time.time()
resultados = gerenciador.processar_lote(
numeros_registro=req.numeros_registro,
timeout_aba=req.timeout_aba,
)
tempo_total = round(time.time() - inicio, 2)
total = len(resultados)
com_proc = sum(1 for r in resultados if r.get("proclamacao_encontrada"))
com_erro = sum(1 for r in resultados if r.get("erro"))
return RespostaConsulta(
resultados=resultados,
meta=MetaLote(
total=total,
com_proclamacao=com_proc,
com_erro=com_erro,
tempo_total_segundos=tempo_total,
tempo_medio_por_processo=round(
tempo_total / max(total, 1), 2
),
),
)
@app.post("/reiniciar")
def reiniciar():
"""Mata o browser e reinicia (resolve Cloudflare de novo)."""
try:
gerenciador.reiniciar()
return {"status": "ok", "browser": gerenciador.status}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)[:500])
@app.get("/debug/{numero_registro}")
def debug(numero_registro: str):
"""Abre 1 aba, navega, e retorna diagnostico completo."""
with gerenciador._lock:
try:
gerenciador._iniciar_browser()
gerenciador._resolver_cloudflare()
except Exception as e:
return {"erro": f"browser/cloudflare: {str(e)[:200]}"}
driver = gerenciador._driver
original = driver.current_window_handle
resultado = {}
try:
# Info da aba principal (pos-Cloudflare)
resultado["main_titulo"] = driver.title
resultado["main_url"] = driver.current_url
resultado["main_cookies"] = len(driver.get_cookies())
# Abrir aba com URL do processo
url = URL_BASE.format(nr=numero_registro)
resultado["url_alvo"] = url
driver.execute_script(f"window.open('{url}')")
time.sleep(1)
handles = [h for h in driver.window_handles if h != original]
if not handles:
resultado["erro"] = "aba nao abriu"
return resultado
driver.switch_to.window(handles[0])
# Esperar ate 15s pelo carregamento
for i in range(15):
time.sleep(1)
titulo = driver.title
url_atual = driver.current_url
resultado[f"t{i+1}_titulo"] = titulo
resultado[f"t{i+1}_url"] = url_atual[:100]
# Se nao tem "momento" no titulo, pagina carregou
if "momento" not in titulo.lower() and "verifica" not in titulo.lower():
if titulo.strip():
break
resultado["titulo_final"] = driver.title
resultado["url_final"] = driver.current_url
resultado["body_preview"] = driver.find_element(
By.TAG_NAME, "body"
).text[:500]
# Tentar encontrar link Fases
try:
fases = driver.find_element(
By.XPATH, "//a[contains(text(), 'Fases')]"
)
resultado["fases_encontrada"] = True
resultado["fases_text"] = fases.text
except Exception:
resultado["fases_encontrada"] = False
# Listar todos os links da pagina
links = driver.find_elements(By.TAG_NAME, "a")
resultado["links_na_pagina"] = [
a.text[:50] for a in links[:20] if a.text.strip()
]
except Exception as e:
resultado["erro"] = str(e)[:300]
finally:
for h in driver.window_handles:
if h != original:
try:
driver.switch_to.window(h)
driver.close()
except Exception:
pass
driver.switch_to.window(original)
return resultado