""" STJ Consulta Processual — HuggingFace Space 1 browser + N abas para paralelismo eficiente. Endpoints: POST /consultar — processa lote de numeros_registro GET /health — status do browser POST /reiniciar — reinicia browser (kill + start) Arquitetura: - 1 instancia de Chrome (undetected-chromedriver) via Xvfb - N abas abertas simultaneamente (window.open) - Abas compartilham cookies (Cloudflare resolvido 1x) - Fases: 1. Abrir abas com navegacao paralela 2. Aguardar carregamento coletivo 3. Clicar "Fases" em cada aba 4. Aguardar AJAX paralelo 5. Extrair proclamacao de cada aba """ import hashlib import logging import os import re import time import random import threading from datetime import datetime from typing import Optional import undetected_chromedriver as uc from selenium.webdriver.common.by import By from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import ( TimeoutException, WebDriverException, NoSuchWindowException, StaleElementReferenceException, ) from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field, field_validator # === Logging === logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) logger = logging.getLogger("stj-consulta") # === Regex (poc_v2 calibrado, 10 tipos, 100% ALTA em 50 processos) === REGEX_DISPOSITIVO = re.compile( r"decidiu\s+(.*?)(?:\.\s*-|,\s*nos termos|\.\s*$)", re.IGNORECASE | re.DOTALL, ) REGEX_FALLBACK = re.compile( r"(?:por\s+(?:unanimidade|maioria)[\s,]+(?:de\s+votos[\s,]+)?)" r"(.*?)(?:\.\s*-|,\s*nos termos|\.\s*$)", re.IGNORECASE | re.DOTALL, ) REGEX_PARCIAL = re.compile( r"Proclama[çc][aã]o\s+Parcial\s+de\s+Julgamento:\s*(.*?)(?:\.\s*-|\.\s*$)", re.IGNORECASE | re.DOTALL, ) REGEX_VOTACAO = re.compile( r"por\s+(unanimidade|maioria(?:\s+de\s+votos)?)", re.IGNORECASE, ) MAPA_TIPO = [ (re.compile(r"neg\w+\s+provimento", re.IGNORECASE), "NEGADO"), (re.compile(r"dar\s+parcial\s+provimento", re.IGNORECASE), "PARCIAL"), (re.compile(r"dar\s+provimento", re.IGNORECASE), "PROVIDO"), (re.compile(r"conhec\w+\s+parcialmente.*neg\w+\s+provimento", re.IGNORECASE), "NEGADO"), (re.compile(r"prejudicad", re.IGNORECASE), "PREJUDICADO"), (re.compile(r"n[aã]o\s+conhec", re.IGNORECASE), "NAO_CONHECIDO"), (re.compile(r"retirad[oa]\s+de\s+pauta", re.IGNORECASE), "RETIRADO"), (re.compile(r"destaque", re.IGNORECASE), "DESTACADO"), (re.compile(r"rejeit\w+", re.IGNORECASE), "REJEITADO"), (re.compile(r"acolh\w+", re.IGNORECASE), "ACOLHIDO"), (re.compile(r"homolog", re.IGNORECASE), "HOMOLOGADO"), ] URL_BASE = ( "https://processo.stj.jus.br/processo/pesquisa/" "?termo={nr}" "&aplicacao=processos.ea" "&tipoPesquisa=tipoPesquisaGenerica" "&chkordem=DESC" "&chkMorto=MORTO" ) # === Funcoes de extracao (identicas a poc_v2) === def classificar_tipo(texto: str) -> str: for regex, tipo in MAPA_TIPO: if regex.search(texto): return tipo return "OUTRO" def extrair_dispositivo(texto: str) -> tuple[Optional[str], Optional[str], Optional[str]]: # 1. Proclamacao Parcial (retirado, destaque, etc.) match = REGEX_PARCIAL.search(texto) if match: d = match.group(1).strip() return d, classificar_tipo(d), "ALTA" # 2. Primario (apos "decidiu") match = REGEX_DISPOSITIVO.search(texto) if match: d = match.group(1).strip() return d, classificar_tipo(d), "ALTA" # 3. Fallback (apos "unanimidade/maioria") match = REGEX_FALLBACK.search(texto) if match: d = match.group(1).strip() return d, classificar_tipo(d), "ALTA" # 4. Heuristica t = classificar_tipo(texto) if t != "OUTRO": return texto[:200], t, "BAIXA" return None, None, None # === Gerenciador de Browser (singleton thread-safe) === class GerenciadorBrowser: """1 browser Chrome, N abas paralelas, lock para acesso exclusivo.""" def __init__(self): self._driver: Optional[uc.Chrome] = None self._lock = threading.Lock() self._cloudflare_ok = False self._iniciado_em: Optional[datetime] = None self._total_processados = 0 self._total_encontrados = 0 self._total_erros = 0 def _iniciar_browser(self): if self._driver is not None: return logger.info("Iniciando browser (undetected-chromedriver)...") opcoes = uc.ChromeOptions() opcoes.add_argument("--no-sandbox") opcoes.add_argument("--disable-dev-shm-usage") opcoes.add_argument("--disable-gpu") opcoes.add_argument("--disable-extensions") opcoes.add_argument("--disable-background-networking") opcoes.add_argument("--disable-popup-blocking") opcoes.add_argument("--window-size=1920,1080") opcoes.add_argument("--disable-infobars") # Bloquear imagens para economizar banda/CPU opcoes.add_experimental_option("prefs", { "profile.managed_default_content_settings.images": 2, }) self._driver = uc.Chrome(options=opcoes, version_main=None) self._iniciado_em = datetime.now() logger.info("Browser iniciado com sucesso") def _resolver_cloudflare(self): if self._cloudflare_ok: return logger.info("Resolvendo Cloudflare (primeiro acesso)...") self._driver.get("https://processo.stj.jus.br/processo/pesquisa/") time.sleep(5) titulo = self._driver.title.lower() if "momento" in titulo or "verifica" in titulo: logger.warning("Cloudflare pendente, aguardando +5s...") time.sleep(5) titulo = self._driver.title.lower() if "momento" in titulo or "verifica" in titulo: logger.warning("Cloudflare pendente, aguardando +10s...") time.sleep(10) titulo = self._driver.title.lower() if "momento" in titulo or "verifica" in titulo: raise RuntimeError( f"Cloudflare nao resolvido apos 20s. Titulo: {self._driver.title}" ) self._cloudflare_ok = True logger.info("Cloudflare resolvido, titulo=%s", self._driver.title[:60]) def _fechar_browser(self): if self._driver: try: self._driver.quit() except Exception: pass self._driver = None self._cloudflare_ok = False def reiniciar(self): with self._lock: logger.info("Reiniciando browser...") self._fechar_browser() self._iniciar_browser() self._resolver_cloudflare() logger.info("Browser reiniciado com sucesso") def processar_lote( self, numeros_registro: list[str], timeout_aba: int = 15, ) -> list[dict]: with self._lock: try: self._iniciar_browser() self._resolver_cloudflare() return self._processar_lote_impl(numeros_registro, timeout_aba) except WebDriverException as e: logger.error("WebDriverException fatal: %s", str(e)[:200]) self._fechar_browser() raise HTTPException( status_code=500, detail=f"Browser crash, reinicie com POST /reiniciar: {str(e)[:200]}", ) def _processar_lote_impl( self, numeros_registro: list[str], timeout_aba: int, ) -> list[dict]: driver = self._driver aba_original = driver.current_window_handle mapa_abas: dict[str, str] = {} # nr -> window_handle resultados: list[dict] = [] inicio = time.time() try: # === FASE 1: Abrir abas com navegacao paralela === # window.open(url) e non-blocking: todas as abas carregam simultaneamente logger.info("F1: abrindo %d abas...", len(numeros_registro)) handles_antes = set(driver.window_handles) for nr in numeros_registro: url = URL_BASE.format(nr=nr) driver.execute_script(f"window.open('{url}')") time.sleep(0.2) # Pequeno delay para Chrome registrar a aba # Mapear handles novos aos numeros na ordem de criacao handles_novos = [ h for h in driver.window_handles if h not in handles_antes ] for i, nr in enumerate(numeros_registro): if i < len(handles_novos): mapa_abas[nr] = handles_novos[i] else: resultados.append(_resultado_erro(nr, "falha_abrir_aba")) # === FASE 2: Aguardar carregamento coletivo === # Todas as abas estao carregando em paralelo (1 processo Chrome, N tabs) logger.info("F2: aguardando carregamento coletivo (%d abas)...", len(mapa_abas)) time.sleep(4) # === FASE 3: Clicar "Fases" em cada aba === # Apos clicar, o AJAX de cada aba carrega em background (paralelo) logger.info("F3: clicando 'Fases' em %d abas...", len(mapa_abas)) abas_com_fases: dict[str, str] = {} for nr, handle in list(mapa_abas.items()): try: driver.switch_to.window(handle) # Verificar se Cloudflare travou nesta aba titulo = driver.title.lower() url_atual = driver.current_url logger.info( "Aba %s: titulo='%s', url='%s'", nr, titulo[:60], url_atual[:80], ) if "momento" in titulo or "verifica" in titulo: logger.warning("Cloudflare pendente na aba %s, aguardando 5s...", nr) time.sleep(5) titulo = driver.title.lower() logger.info("Apos espera, titulo='%s'", titulo[:60]) fases_btn = WebDriverWait(driver, timeout_aba).until( EC.element_to_be_clickable( (By.XPATH, "//a[contains(text(), 'Fases')]") ) ) fases_btn.click() abas_com_fases[nr] = handle except TimeoutException: # Capturar diagnostico da aba que falhou try: diag_titulo = driver.title diag_url = driver.current_url diag_body = driver.find_element(By.TAG_NAME, "body").text[:200] except Exception: diag_titulo = "?" diag_url = "?" diag_body = "?" logger.error( "Timeout Fases nr=%s titulo='%s' url='%s' body='%s'", nr, diag_titulo[:60], diag_url[:80], diag_body[:100], ) resultados.append(_resultado_erro( nr, f"aba_fases_nao_encontrada|titulo={diag_titulo[:50]}|body={diag_body[:80]}", )) except Exception as e: resultados.append( _resultado_erro(nr, f"erro_clique_fases: {str(e)[:100]}") ) # === FASE 4: Aguardar AJAX das Fases (paralelo em background) === logger.info("F4: aguardando AJAX (%d abas)...", len(abas_com_fases)) time.sleep(2) # === FASE 5: Extrair proclamacao de cada aba === logger.info("F5: extraindo proclamacoes de %d abas...", len(abas_com_fases)) for nr, handle in abas_com_fases.items(): resultado = self._extrair_de_aba(driver, handle, nr, timeout_aba) resultados.append(resultado) finally: # Fechar todas as abas extras (manter apenas a original) self._fechar_abas_extras(driver, aba_original) # Metricas tempo_total = round(time.time() - inicio, 2) encontrados = sum(1 for r in resultados if r.get("proclamacao_encontrada")) erros = sum(1 for r in resultados if r.get("erro")) self._total_processados += len(numeros_registro) self._total_encontrados += encontrados self._total_erros += erros logger.info( "Lote concluido: %d processos, %d encontrados, %d erros, " "%.1fs total (%.2fs/proc)", len(numeros_registro), encontrados, erros, tempo_total, tempo_total / max(len(numeros_registro), 1), ) return resultados def _extrair_de_aba( self, driver: uc.Chrome, handle: str, nr: str, timeout_aba: int, ) -> dict: resultado = { "numero_registro": nr, "proclamacao_encontrada": False, "data_proclamacao": None, "texto_completo": None, "dispositivo": None, "tipo_dispositivo": None, "votacao": None, "confianca": None, "tempo_segundos": 0.0, "erro": None, "hash_trecho": None, } inicio = time.time() try: driver.switch_to.window(handle) # Esperar #idDivFases ter conteudo (AJAX pode ainda estar carregando) WebDriverWait(driver, timeout_aba).until( lambda d: ( d.find_element(By.ID, "idDivFases").text or "" ).strip() != "" ) texto = driver.find_element(By.ID, "idDivFases").text resultado["hash_trecho"] = hashlib.sha256( texto.encode() ).hexdigest()[:16] # Buscar linha com "Proclamacao ... Julgamento" for linha in texto.split("\n"): if "proclama" in linha.lower() and "julgamento" in linha.lower(): resultado["proclamacao_encontrada"] = True resultado["texto_completo"] = linha.strip() data_m = re.search(r"(\d{2}/\d{2}/\d{4})", linha) if data_m: resultado["data_proclamacao"] = data_m.group(1) d, t, c = extrair_dispositivo(linha) resultado["dispositivo"] = d resultado["tipo_dispositivo"] = t resultado["confianca"] = c vot_m = REGEX_VOTACAO.search(linha) resultado["votacao"] = ( vot_m.group(1).strip() if vot_m else None ) break except TimeoutException: resultado["erro"] = "timeout_div_fases" except NoSuchWindowException: resultado["erro"] = "aba_fechada_inesperadamente" except StaleElementReferenceException: resultado["erro"] = "elemento_stale" except Exception as e: resultado["erro"] = str(e)[:200] resultado["tempo_segundos"] = round(time.time() - inicio, 2) return resultado def _fechar_abas_extras(self, driver, aba_original: str): for handle in driver.window_handles: if handle != aba_original: try: driver.switch_to.window(handle) driver.close() except Exception: pass try: driver.switch_to.window(aba_original) except Exception: pass @property def status(self) -> dict: return { "browser_ativo": self._driver is not None, "cloudflare_resolvido": self._cloudflare_ok, "iniciado_em": ( self._iniciado_em.isoformat() if self._iniciado_em else None ), "total_processados": self._total_processados, "total_encontrados": self._total_encontrados, "total_erros": self._total_erros, } def _resultado_erro(nr: str, erro: str) -> dict: return { "numero_registro": nr, "proclamacao_encontrada": False, "data_proclamacao": None, "texto_completo": None, "dispositivo": None, "tipo_dispositivo": None, "votacao": None, "confianca": None, "tempo_segundos": 0.0, "erro": erro, "hash_trecho": None, } # === FastAPI === gerenciador = GerenciadorBrowser() app = FastAPI( title="STJ Consulta Processual", description="1 browser + N abas para extracao paralela de proclamacoes", version="1.0.0", ) class RequisicaoConsulta(BaseModel): numeros_registro: list[str] = Field( ..., min_length=1, max_length=20, description="Lista de numeros de registro (12 digitos cada)", ) timeout_aba: int = Field( default=15, ge=5, le=60, description="Timeout em segundos para cada aba", ) @field_validator("numeros_registro") @classmethod def validar_numeros(cls, v: list[str]) -> list[str]: for nr in v: if not nr.isdigit() or len(nr) < 10 or len(nr) > 14: raise ValueError( f"numero_registro invalido: '{nr}' (esperado 10-14 digitos)" ) return v class ResultadoProcesso(BaseModel): numero_registro: str proclamacao_encontrada: bool = False data_proclamacao: Optional[str] = None texto_completo: Optional[str] = None dispositivo: Optional[str] = None tipo_dispositivo: Optional[str] = None votacao: Optional[str] = None confianca: Optional[str] = None tempo_segundos: float = 0.0 erro: Optional[str] = None hash_trecho: Optional[str] = None class MetaLote(BaseModel): total: int com_proclamacao: int com_erro: int tempo_total_segundos: float tempo_medio_por_processo: float class RespostaConsulta(BaseModel): resultados: list[ResultadoProcesso] meta: MetaLote @app.get("/health") def health(): return {"status": "ok", "browser": gerenciador.status} @app.post("/consultar", response_model=RespostaConsulta) def consultar(req: RequisicaoConsulta): """Processa lote de numeros_registro em abas paralelas.""" inicio = time.time() resultados = gerenciador.processar_lote( numeros_registro=req.numeros_registro, timeout_aba=req.timeout_aba, ) tempo_total = round(time.time() - inicio, 2) total = len(resultados) com_proc = sum(1 for r in resultados if r.get("proclamacao_encontrada")) com_erro = sum(1 for r in resultados if r.get("erro")) return RespostaConsulta( resultados=resultados, meta=MetaLote( total=total, com_proclamacao=com_proc, com_erro=com_erro, tempo_total_segundos=tempo_total, tempo_medio_por_processo=round( tempo_total / max(total, 1), 2 ), ), ) @app.post("/reiniciar") def reiniciar(): """Mata o browser e reinicia (resolve Cloudflare de novo).""" try: gerenciador.reiniciar() return {"status": "ok", "browser": gerenciador.status} except Exception as e: raise HTTPException(status_code=500, detail=str(e)[:500]) @app.get("/debug/{numero_registro}") def debug(numero_registro: str): """Abre 1 aba, navega, e retorna diagnostico completo.""" with gerenciador._lock: try: gerenciador._iniciar_browser() gerenciador._resolver_cloudflare() except Exception as e: return {"erro": f"browser/cloudflare: {str(e)[:200]}"} driver = gerenciador._driver original = driver.current_window_handle resultado = {} try: # Info da aba principal (pos-Cloudflare) resultado["main_titulo"] = driver.title resultado["main_url"] = driver.current_url resultado["main_cookies"] = len(driver.get_cookies()) # Abrir aba com URL do processo url = URL_BASE.format(nr=numero_registro) resultado["url_alvo"] = url driver.execute_script(f"window.open('{url}')") time.sleep(1) handles = [h for h in driver.window_handles if h != original] if not handles: resultado["erro"] = "aba nao abriu" return resultado driver.switch_to.window(handles[0]) # Esperar ate 15s pelo carregamento for i in range(15): time.sleep(1) titulo = driver.title url_atual = driver.current_url resultado[f"t{i+1}_titulo"] = titulo resultado[f"t{i+1}_url"] = url_atual[:100] # Se nao tem "momento" no titulo, pagina carregou if "momento" not in titulo.lower() and "verifica" not in titulo.lower(): if titulo.strip(): break resultado["titulo_final"] = driver.title resultado["url_final"] = driver.current_url resultado["body_preview"] = driver.find_element( By.TAG_NAME, "body" ).text[:500] # Tentar encontrar link Fases try: fases = driver.find_element( By.XPATH, "//a[contains(text(), 'Fases')]" ) resultado["fases_encontrada"] = True resultado["fases_text"] = fases.text except Exception: resultado["fases_encontrada"] = False # Listar todos os links da pagina links = driver.find_elements(By.TAG_NAME, "a") resultado["links_na_pagina"] = [ a.text[:50] for a in links[:20] if a.text.strip() ] except Exception as e: resultado["erro"] = str(e)[:300] finally: for h in driver.window_handles: if h != original: try: driver.switch_to.window(h) driver.close() except Exception: pass driver.switch_to.window(original) return resultado