IOI-RUN / operacao.py
Roudrigus's picture
Upload 82 files
0f0ef8d verified
raw
history blame
67.8 kB
# -*- coding: utf-8 -*-
"""
operacao.py — Módulo Operação (Mayasuite)
Recursos principais:
- Modo rápido (Consulta leve): apenas 1 página + colunas essenciais na visualização
- Limite de itens por página (via x-filter-limit se suportado pela API)
- Cache de consultas (TTL configurável) + botões para limpar cache
- Botão de cancelar consulta
- Barra de progresso por página + status
- Debounce no submit
- Retentativas para 429/5xx (incl. 502) e timeouts com backoff
- Filtros avançados: Data operação (única), Type(API), Tipo SBM (relacionado ao Depositante),
Endereços/Notas/Categorias com sugestões, Destinatários (multisseleção) + CNPJs múltiplos,
Depositantes (multisseleção)
- Formatação PT-BR: datas DD/MM/AAAA, valores R$, SKU sem zeros à esquerda, oculta colunas vazias
- KPIs dinâmicos por consulta (cards) — Estoque, Endereços, NF Entrada/Saída, etc.
- Oculta navegação de outros módulos ao carregar
"""
import os
import re
import json
import time
import random
from io import BytesIO
from datetime import datetime
import base64
import streamlit as st
import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Se existirem, importe utilitários do seu projeto
try:
from utils_permissoes import verificar_permissao
except Exception:
def verificar_permissao(_):
return True
try:
from utils_auditoria import registrar_log
except Exception:
def registrar_log(**kwargs):
return None
# =====================================================
# CONFIG (.env)
# =====================================================
OP_API_BASE_URL = os.getenv("OP_API_BASE_URL", "https://api.mayasuite.com").rstrip("/")
OP_LOGIN_EMAIL = (os.getenv("OP_LOGIN_EMAIL", "") or "").strip()
OP_LOGIN_PASSWORD = (os.getenv("OP_LOGIN_PASSWORD", "") or "").strip()
# Timeouts
OP_CONNECT_TIMEOUT = float((os.getenv("OP_CONNECT_TIMEOUT", "10") or "10")) # s
OP_READ_TIMEOUT = float((os.getenv("OP_READ_TIMEOUT", "90") or "90")) # s
# Token direto (bypass do login)
OP_ACCESS_TOKEN = (os.getenv("OP_ACCESS_TOKEN", "") or "").strip()
# Diagnóstico / alternativas
OP_LOGIN_EMAIL_ALT = (os.getenv("OP_LOGIN_EMAIL_ALT", "") or "").strip()
OP_LOGIN_PASSWORD_ALT = (os.getenv("OP_LOGIN_PASSWORD_ALT", "") or "").strip()
OP_LOGIN_DEBUG = (os.getenv("OP_LOGIN_DEBUG", "false") or "").strip().lower() == "true"
# Headers compatíveis
OP_COMPAT_HEADERS = (os.getenv("OP_COMPAT_HEADERS", "true") or "").strip().lower() == "true"
# Proxy (opcional)
OP_PROXY_HTTP = (os.getenv("OP_PROXY_HTTP", "") or "").strip()
OP_PROXY_HTTPS = (os.getenv("OP_PROXY_HTTPS", "") or "").strip()
PROXIES = {"http": OP_PROXY_HTTP, "https": OP_PROXY_HTTPS} if (OP_PROXY_HTTP or OP_PROXY_HTTPS) else None
# Rate limit / timeouts
OP_RATE_DELAY_SEC = float((os.getenv("OP_RATE_DELAY_SEC", "0.8") or "0.8"))
OP_MAX_RETRIES_PER_PAGE = int((os.getenv("OP_MAX_RETRIES_PER_PAGE", "3") or "3"))
OP_MAX_PAGES = int((os.getenv("OP_MAX_PAGES", "0") or "0"))
OP_MAX_TIMEOUT_RETRIES = int((os.getenv("OP_MAX_TIMEOUT_RETRIES", "2") or "2"))
OP_TIMEOUT_BACKOFF_BASE = float((os.getenv("OP_TIMEOUT_BACKOFF_BASE", "5") or "5"))
# Retentativas específicas para 5xx (inclusive 502)
OP_MAX_RETRIES_5XX = int((os.getenv("OP_MAX_RETRIES_5XX", "4") or "4"))
OP_5XX_BACKOFF_BASE = float((os.getenv("OP_5XX_BACKOFF_BASE", "3") or "3"))
# Tempo de cache (minutos) — configura no .env: CACHE_TTL_MIN=5
CACHE_TTL_MIN = int((os.getenv("CACHE_TTL_MIN", "5") or "5"))
CACHE_TTL_SEC = CACHE_TTL_MIN * 60
# Bases
BASES_MAP = {
"Matriz": "5a926346-15ee-4af4-ba2d-1a71d62d9b51",
"CL": "b0099983-5b44-4650-821a-e352c5c1f10e",
"YARD": "2c506e56-641d-48e2-a330-93fd088526cf",
}
# Endpoints
ENDPOINTS = {
"Nota Fiscal de Entrada": "/wsreceipt/list",
"Nota Fiscal de Saída": "/wsdispatch/list",
"Endereços": "/address/list",
"Endereços bloqueados": "/address/blocking/list",
"Lista de Pedido": "/cargorelease/list",
"Monitor Sefaz": "/monitor/nfe/list",
"Estoque": "/stock/list",
"Operações": "/operation/list",
"Agendamento": "/yms/scheduling/list",
"Produto": "/product/list",
"Faturamento": "/financial/invoice/list",
}
# Tipos (API) e Tipos SBM
OPERATION_TYPES = ["ALLOCATION", "CHECK", "DISPATCH", "MOVEMENT", "RECEIVING", "PICK"]
SBM_TYPES = ["SBM - LOAD", "SBM - BACKLOAD", "OUTROS"]
# Cancelamento
CANCEL_TOKEN_KEY = "__op_cancel__"
# ✅ NOVO: OAuth2 Client Credentials (se a API suportar)
OAUTH_TOKEN_URL = (os.getenv("OAUTH_TOKEN_URL", "") or "").strip() # ex.: https://api.mayasuite.com/oauth/token
OAUTH_CLIENT_ID = (os.getenv("CLIENT_ID", "") or "").strip()
OAUTH_CLIENT_SECRET = (os.getenv("CLIENT_SECRET", "") or "").strip()
OAUTH_SCOPE = (os.getenv("OAUTH_SCOPE", "") or "").strip() # opcional
# =====================================================
# RENAME_MAP — renomeia colunas por endpoint (somente se existirem)
# =====================================================
RENAME_MAP = {
"/wsreceipt/list": {
"wsreceipt_code": "NF Entrada",
"create_date": "Data Emissão",
"customer_document": "CNPJ_Depositante",
"customer_name": "Depositante",
"product_code": "SKU",
"product_description": "Descrição",
"qty": "Qtde",
"unit_measure_code": "Unidade",
"receipt_unit_value": "Vr. Unitário",
"receipt_value": "Vr. Total",
"lot": "Lote",
"sublot": "SubLote",
"expiration_date": "Validade",
"manufacturing_date": "Fabricação",
"location_id_code": "Endereço",
"last_update_date": "Última Atualização",
"last_update_user": "Usuário Atualização",
"category_description": "Categoria",
"type": "Tipo",
},
"/wsdispatch/list": {
"wsdispatch_code": "NF Saída",
"issue_date": "Data Emissão",
"recipient_document": "CNPJ_Destinatário",
"recipient_description": "Destinatário",
"product_code": "SKU",
"product_description": "Descrição",
"qty": "Qtde",
"unit_measure_code": "Unidade",
"item_unit_value": "Vr. Unitário",
"item_total_value": "Vr. Total",
"lot": "Lote",
"sublot": "SubLote",
"location_code": "Endereço",
"category_description": "Categoria",
"group_description": "Grupo",
"type": "Tipo",
},
"/address/list": {
"location_code": "Endereço",
"location_id_code": "Endereço",
"description": "Descrição",
"status": "Status",
"fpso": "FPSO",
"last_update_date": "Última Atualização",
"last_update_user": "Usuário Atualização",
"type": "Tipo",
},
"/address/blocking/list": {
"location_code": "Endereço",
"location_id_code": "Endereço",
"block_reason": "Motivo Bloqueio",
"block_date": "Data Bloqueio",
"block_user": "Usuário Bloqueio",
"unblock_date": "Data Desbloqueio",
"unblock_user": "Usuário Desbloqueio",
"status": "Status",
"type": "Tipo",
},
"/cargorelease/list": {
"cargorelease_code": "Pedido",
"create_date": "Data Criação",
"customer_document": "CNPJ_Depositante",
"customer_name": "Depositante",
"product_code": "SKU",
"product_description": "Descrição",
"qty": "Qtde",
"unit_measure_code": "Unidade",
"status": "Status",
"expiration_date": "Validade",
"manufacturing_date": "Fabricação",
"category_description": "Categoria",
"type": "Tipo",
},
"/monitor/nfe/list": {
"nfe_key": "Chave NFe",
"nfe_number": "Número NFe",
"status": "Status SEFAZ",
"protocol": "Protocolo",
"issue_date": "Data Emissão",
"recipient_document": "CNPJ_Destinatário",
"customer_document": "CNPJ_Depositante",
"message": "Mensagem",
"last_update_date": "Última Atualização",
"type": "Tipo",
},
"/stock/list": {
"date": "Data Operação",
"wsreceipt_code": "Nota Fiscal",
"product_code": "SKU",
"product_description": "Descrição",
"unit_measure_code": "Unidade",
"qty": "Qtde",
"item_unit_value": "Vr. Unitário",
"item_total_value": "Vr. Total",
"customer_document": "CNPJ_Depositante",
"customer_description": "Depositante",
"lot": "Lote",
"location_code": "Endereço",
"expiration_date": "Validade",
"manufacturing_date": "Fabricação",
"category_description": "Categoria",
"group_description": "Grupo",
"recipient_document": "CNPJ_Destinatário",
"recipient_description": "Destinatário",
"qty_reservation": "Qtde Reservada",
"type": "Tipo",
},
"/operation/list": {
"cargorelease_code": "Pedido",
"create_date": "Data",
"create_user": "Usuário Criação",
"customer_document": "CNPJ_Depositante",
"customer_name": "Depositante",
"expiration_date": "Validade",
"last_update_date": "Última Atualização",
"last_update_user": "Usuário Atualização",
"location_id_code": "Endereço",
"lot": "Lote",
"manufacturing_date": "Fabricação",
"product_code": "SKU",
"product_description": "Descrição",
"qty": "Qtde",
"receipt_unit_value": "Vr. Unitário",
"receipt_value": "Vr. Total",
"sublot": "SubLote",
"type": "Tipo",
"wsreceipt_code": "NF Entrada",
"category_description": "Categoria",
},
"/yms/scheduling/list": {
"scheduling_id": "ID Agendamento",
"yard": "Pátio",
"dock": "Doca",
"truck_plate": "Placa",
"driver_name": "Motorista",
"scheduled_date": "Data Agendada",
"scheduled_time": "Hora Agendada",
"status": "Status",
"last_update_date": "Última Atualização",
"type": "Tipo",
},
"/product/list": {
"product_code": "SKU",
"product_description": "Descrição",
"category_description": "Categoria",
"group_description": "Grupo",
"unit_measure_code": "Unidade",
"status": "Status",
"last_update_date": "Última Atualização",
"type": "Tipo",
},
"/financial/invoice/list": {
"invoice_number": "Número Fatura",
"invoice_date": "Data Fatura",
"customer_document": "CNPJ_Depositante",
"customer_name": "Depositante",
"total_value": "Vr. Total",
"status": "Status",
"nfe_key": "Chave NFe",
"wsdispatch_code": "NF Saída",
"last_update_date": "Última Atualização",
"type": "Tipo",
},
}
# =====================================================
# SESSÃO HTTP (retry 5xx/429) + PROGRESSO + CANCELAMENTO
# =====================================================
def _build_retry_adapter() -> HTTPAdapter:
"""Adapter de retry: lida com 429/5xx transientes."""
retry = Retry(
total=3, connect=3, read=3, backoff_factor=1.0,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["POST"], raise_on_status=False,
)
return HTTPAdapter(max_retries=retry)
def _get_session() -> requests.Session:
sess = st.session_state.get("_op_session")
if sess is None:
sess = requests.Session()
adapter = _build_retry_adapter()
sess.mount("https://", adapter)
sess.mount("http://", adapter)
st.session_state["_op_session"] = sess
return sess
# =====================================================
# LOGIN / TOKEN
# =====================================================
class TokenManager:
"""
Gerencia token de acesso:
- Se OP_ACCESS_TOKEN estiver definido no .env: usa diretamente (bypass).
- Se variáveis OAuth2 estiverem configuradas: obtém/renova via client_credentials.
- Caso contrário: usa login atual (POST /login) com OP_LOGIN_EMAIL/OP_LOGIN_PASSWORD
(inclui conta alternativa e cooldown).
"""
def __init__(self):
self.access_token: str | None = OP_ACCESS_TOKEN if OP_ACCESS_TOKEN else None
self.expire_ts: float = 0.0
self._skew_sec: int = 30 # tolerância de relógio
def _fetch_oauth_token(self) -> tuple[str | None, float]:
"""Tenta obter via OAuth2 client_credentials. Retorna (token, expire_ts_epoch)."""
if not (OAUTH_TOKEN_URL and OAUTH_CLIENT_ID and OAUTH_CLIENT_SECRET):
return None, 0.0
try:
resp = requests.post(
OAUTH_TOKEN_URL,
data={"grant_type": "client_credentials", "scope": OAUTH_SCOPE},
auth=(OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET),
timeout=(OP_CONNECT_TIMEOUT, OP_READ_TIMEOUT),
proxies=PROXIES
)
if resp.status_code >= 400:
if OP_LOGIN_DEBUG:
st.warning(f"OAuth2 falhou ({resp.status_code}): {(resp.text or '')[:300]}")
return None, 0.0
data = resp.json()
token = data.get("access_token") or data.get("token")
expires_in = float(data.get("expires_in") or 3600.0)
return token, (time.time() + max(60.0, expires_in))
except requests.exceptions.RequestException as e:
if OP_LOGIN_DEBUG:
st.warning(f"Falha OAuth2: {e}")
return None, 0.0
def _fetch_login_token(self) -> tuple[str | None, float]:
"""
Fallback para o login atual (POST /login), mantendo:
- alternativa (OP_LOGIN_EMAIL_ALT/OP_LOGIN_PASSWORD_ALT)
- cooldown por banimento (api_login_lock_until)
- e toda lógica de parsing/erros
"""
lock_until = st.session_state.get("api_login_lock_until")
if lock_until and time.time() < lock_until:
raise RuntimeError("Login temporariamente bloqueado após 403 (cooldown ativo).")
url = f"{OP_API_BASE_URL}/login"
payload_str = json.dumps({"login": OP_LOGIN_EMAIL, "password": OP_LOGIN_PASSWORD}, ensure_ascii=False)
headers = {"Content-Type": "application/json"}
try:
resp = requests.post(url, headers=headers, data=payload_str,
timeout=(OP_CONNECT_TIMEOUT, OP_READ_TIMEOUT), proxies=PROXIES)
except requests.exceptions.RequestException as e:
raise RuntimeError(f"Falha na conexão ao login: {e}")
body_text = (resp.text or "")[:600]
if resp.status_code >= 400:
if "banned" in body_text.lower():
st.session_state["api_login_lock_until"] = time.time() + 3600
if OP_LOGIN_DEBUG:
st.warning(f"Servidor retornou banimento. Cooldown 1h. Corpo: {body_text}")
raise RuntimeError("Origem/host/IP banido pelo servidor (403). Solicite desbloqueio/allowlist.")
if OP_LOGIN_EMAIL_ALT and OP_LOGIN_PASSWORD_ALT:
payload_alt = json.dumps({"login": OP_LOGIN_EMAIL_ALT, "password": OP_LOGIN_PASSWORD_ALT}, ensure_ascii=False)
resp2 = requests.post(url, headers=headers, data=payload_alt,
timeout=(OP_CONNECT_TIMEOUT, OP_READ_TIMEOUT), proxies=PROXIES)
if resp2.status_code >= 400:
if OP_LOGIN_DEBUG:
st.warning(f"Alternativa falhou (HTTP {resp2.status_code}). Corpo: {(resp2.text or '')[:600]}")
raise RuntimeError(f"Login rejeitado ({resp.status_code}).")
try:
data2 = resp2.json()
except Exception:
raise RuntimeError("Resposta de login (alt) não é JSON.")
token2 = data2.get("access_token") or data2.get("token")
if not token2:
raise RuntimeError("Token (alt) não encontrado na resposta de login.")
return token2, (time.time() + 3600.0)
if OP_LOGIN_DEBUG:
st.warning(f"Login falhou (HTTP {resp.status_code}). Corpo: {body_text}")
raise RuntimeError(f"Login rejeitado ({resp.status_code}).")
try:
data = resp.json()
except Exception:
raise RuntimeError("Resposta de login não é JSON.")
token = data.get("access_token") or data.get("token") or (data if isinstance(data, str) else "")
if not token:
raise RuntimeError("Token 'access_token' não encontrado na resposta de login.")
return token, (time.time() + 3600.0)
def get_token(self) -> str:
"""Obtém token atual (renova se necessário)."""
# Bypass: token fixo do .env
if OP_ACCESS_TOKEN:
self.access_token = OP_ACCESS_TOKEN
self.expire_ts = time.time() + 365*24*3600 # validade simbólica longa
return self.access_token
now = time.time()
if (not self.access_token) or (now + self._skew_sec >= self.expire_ts):
# Tenta primeiro OAuth2; se indisponível, usa login /login
token, exp_ts = self._fetch_oauth_token()
if not token:
token, exp_ts = self._fetch_login_token()
self.access_token = token
self.expire_ts = exp_ts or (now + 3600.0)
return self.access_token
def force_refresh(self) -> str:
"""Força renovar o token (útil em 401) e retorna o novo."""
self.access_token = None
self.expire_ts = 0.0
return self.get_token()
# ✅ Instância global (mantém sua organização)
TM = TokenManager()
# 🛠️ Refatora função _get_token para delegar ao TokenManager
# (remove cache para não “congelar” o token; mantém assinatura/uso)
def _get_token(login_email: str, login_password: str) -> str:
"""
Obtém token automaticamente:
- OAuth2 client_credentials se configurado;
- Caso contrário, login /login com e-mail/senha (mantendo sua lógica).
"""
return TM.get_token()
# =====================================================
# HEADERS
# =====================================================
def _auth_headers(token: str, base_guid: str, page: int, limit: int, is_post: bool) -> dict:
h = {"Authorization": f"Bearer {token}", "Content-Type": "application/json", "x-user-enterprise-id": base_guid}
if OP_COMPAT_HEADERS:
h["Accept"] = "application/json"
h["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ArmLoadApp/1.0 PowerQueryCompat"
h["Connection"] = "keep-alive"
h["Origin"] = OP_API_BASE_URL
h["Referer"] = OP_API_BASE_URL + "/"
# Paginação do servidor (se suportado)
h["x-filter-page"] = str(page)
h["x-filter-limit"] = str(limit)
return h
# =====================================================
# RATE LIMIT: helpers
# =====================================================
def _parse_retry_after(resp) -> float:
ra_hdr = resp.headers.get("Retry-After")
if ra_hdr:
try:
return float(ra_hdr.strip())
except Exception:
pass
try:
data = resp.json()
ra_body = data.get("retry-after") or ""
if isinstance(ra_body, (int, float)):
return float(ra_body)
if isinstance(ra_body, str):
m = re.search(r"(\d+)", ra_body)
if m:
return float(m.group(1))
except Exception:
pass
return 60.0
# =====================================================
# LIST — POST + RAW JSON com rate-limit/timeout/5xx
# + Progresso de carregamento + Cancelamento
# =====================================================
def _call_list_paginated(
path: str,
base_guid: str,
token: str,
body_filter: dict,
limit: int = 1000,
max_pages_override: int | None = None
):
"""
- POST com RAW JSON (data=payload_str), como Excel.
- Respeita 429 (Retry-After); aplica atraso entre páginas.
- Retenta automaticamente Timeout (backoff exponencial).
- Retenta 5xx (incl. 502) com backoff exponencial + jitter.
- Suporta cancelamento via st.session_state[CANCEL_TOKEN_KEY].
- Exibe barra de progresso e status de carregamento por página.
"""
page = 1
all_rows = []
session = _get_session()
consecutive_429 = 0
max_pages = OP_MAX_PAGES if max_pages_override is None else max_pages_override
# ----- UI de progresso -----
progress_bar = st.session_state.get("__op_progress_bar__")
status_text = st.session_state.get("__op_status_text__")
if progress_bar is None:
progress_bar = st.progress(0)
st.session_state["__op_progress_bar__"] = progress_bar
if status_text is None:
status_text = st.empty()
st.session_state["__op_status_text__"] = status_text
denom = max_pages if (max_pages and max_pages > 0) else None
# reset progresso/cancelamento
st.session_state[CANCEL_TOKEN_KEY] = st.session_state.get(CANCEL_TOKEN_KEY, False)
st.session_state["op_pages_processed"] = 0
try:
while True:
# Cancelamento
if st.session_state.get(CANCEL_TOKEN_KEY):
raise RuntimeError("Consulta cancelada.")
if max_pages and page > max_pages:
break
url = f"{OP_API_BASE_URL}{path}"
headers = _auth_headers(token, base_guid, page, limit, is_post=True)
payload_str = json.dumps(body_filter or {}, ensure_ascii=False)
# ---- Retentativas por Timeout ----
timeout_attempt = 0
while True:
try:
resp = session.post(
url, headers=headers, data=payload_str,
timeout=(OP_CONNECT_TIMEOUT, OP_READ_TIMEOUT), proxies=PROXIES
)
break
except (requests.exceptions.ReadTimeout,
requests.exceptions.ConnectTimeout,
requests.exceptions.Timeout) as e:
timeout_attempt += 1
if timeout_attempt > OP_MAX_TIMEOUT_RETRIES:
raise RuntimeError(f"Timeout após {OP_MAX_TIMEOUT_RETRIES} tentativas na página {page}: {e}")
sleep_sec = OP_TIMEOUT_BACKOFF_BASE * timeout_attempt
if OP_LOGIN_DEBUG:
st.warning(f"Timeout na página {page}. Retentativa {timeout_attempt} em ~{sleep_sec}s...")
time.sleep(sleep_sec)
except requests.exceptions.RequestException as e:
raise RuntimeError(f"Falha na página {page}: {e}")
# ✅ 401 → renovar token 1x (OAuth2/client_credentials ou /login) e re-tentar
if resp.status_code == 401:
token = TM.force_refresh()
headers = _auth_headers(token, base_guid, page, limit, is_post=True)
resp = session.post(
url, headers=headers, data=payload_str,
timeout=(OP_CONNECT_TIMEOUT, OP_READ_TIMEOUT), proxies=PROXIES
)
if resp.status_code == 401:
raise RuntimeError(f"Token inválido/expirado após renovação (401) na página {page}: {resp.text[:500]}")
# 429 → Retry-After + repetir
if resp.status_code == 429:
wait_sec = _parse_retry_after(resp)
consecutive_429 += 1
if consecutive_429 > OP_MAX_RETRIES_PER_PAGE:
raise RuntimeError(
f"Limite de tentativas após 429 excedido na página {page}. Aguarde ~{wait_sec}s e tente novamente."
)
if OP_LOGIN_DEBUG:
st.warning(f"429 recebido. Aguardando ~{wait_sec}s antes de repetir página {page}...")
time.sleep(wait_sec)
resp = session.post(
url, headers=headers, data=payload_str,
timeout=(OP_CONNECT_TIMEOUT, OP_READ_TIMEOUT), proxies=PROXIES
)
# 5xx (inclui 502 Bad Gateway) → backoff + jitter
if resp.status_code in (500, 502, 503, 504):
for attempt in range(1, OP_MAX_RETRIES_5XX + 1):
wait = OP_5XX_BACKOFF_BASE * (2 ** (attempt - 1)) + random.uniform(0, OP_5XX_BACKOFF_BASE)
if OP_LOGIN_DEBUG:
st.warning(f"{resp.status_code} recebido. Retentativa {attempt}/{OP_MAX_RETRIES_5XX} em ~{wait:.1f}s...")
time.sleep(wait)
try:
resp = session.post(
url, headers=headers, data=payload_str,
timeout=(OP_CONNECT_TIMEOUT, OP_READ_TIMEOUT), proxies=PROXIES
)
except requests.exceptions.RequestException:
continue
if resp.status_code < 500:
break
if resp.status_code in (500, 502, 503, 504):
snippet = (resp.text or "")[:500]
raise RuntimeError(f"Erro {resp.status_code} na página {page}: {snippet}")
# Outros erros HTTP
if resp.status_code >= 400:
raise RuntimeError(f"Erro {resp.status_code} na página {page}: {resp.text[:500]}")
# RESET 429 quando OK
consecutive_429 = 0
# Parse JSON
try:
data = resp.json()
except Exception:
raise RuntimeError(f"JSON inválido na página {page}: {resp.text[:300]}")
# Normaliza linhas
if isinstance(data, list):
rows = data
elif isinstance(data, dict):
rows = data.get("data") or data.get("items") or []
if not isinstance(rows, list):
rows = []
else:
rows = []
# Fim se vazio
if not rows:
break
# Soma acumulado
all_rows.extend(rows)
st.session_state["op_pages_processed"] = page
# ----- Atualiza UI de progresso -----
status_text.info(f"🔄 Carregando página {page}…")
if denom:
frac = min(page / float(denom), 1.0)
progress_bar.progress(frac)
else:
progress_bar.progress((page % 10) / 10.0)
# Próxima página
page += 1
# Atraso entre páginas (rate limit)
if OP_RATE_DELAY_SEC > 0:
time.sleep(OP_RATE_DELAY_SEC)
# Concluído: barra 100%
progress_bar.progress(1.0)
time.sleep(0.1)
finally:
# Limpa componentes de UI sempre
try:
status_text.empty()
except Exception:
pass
try:
progress_bar.empty()
except Exception:
pass
st.session_state.pop("__op_progress_bar__", None)
st.session_state.pop("__op_status_text__", None)
return all_rows
# =====================================================
# EXPORTS
# =====================================================
def _export_excel(df: pd.DataFrame, report_key: str, filtros_aplicados: dict) -> bytes:
buffer = BytesIO()
with pd.ExcelWriter(buffer, engine="openpyxl") as writer:
df.to_excel(writer, index=False, sheet_name=report_key[:30] or "Relatorio")
meta_df = pd.DataFrame([filtros_aplicados])
meta_df.to_excel(writer, index=False, sheet_name="Filtros_Aplicados")
return buffer.getvalue()
def _export_csv(df: pd.DataFrame) -> bytes:
return df.to_csv(index=False, encoding="utf-8-sig").encode("utf-8-sig")
# =====================================================
# Helpers — JWT iat/exp
# =====================================================
def _jwt_payload(token: str):
try:
parts = token.split(".")
if len(parts) != 3:
return None
def b64url_to_b64(s): return s + "=" * (-len(s) % 4)
payload_b64 = b64url_to_b64(parts[1])
payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
return json.loads(payload_json)
except Exception:
return None
def _fmt_ts(ts):
try:
dt = datetime.utcfromtimestamp(int(ts))
return dt.strftime("%d/%m/%Y %H:%M:%S") + " UTC"
except Exception:
return "—"
# =====================================================
# Sugestões — cache (TTL configurável)
# =====================================================
@st.cache_data(ttl=CACHE_TTL_SEC, show_spinner=False)
def _load_suggestions_recipients(base_guid: str, token: str):
"""Sugestões de Destinatários (desc+cnpj) a partir de /stock/list (1 página)."""
session = _get_session()
url = f"{OP_API_BASE_URL}/stock/list"
headers = _auth_headers(token, base_guid, page=1, limit=1000, is_post=True)
try:
resp = session.post(url, headers=headers, data=json.dumps({}, ensure_ascii=False),
timeout=(OP_CONNECT_TIMEOUT, OP_READ_TIMEOUT), proxies=PROXIES)
if resp.status_code >= 400: return []
data = resp.json()
except Exception:
return []
rows = data if isinstance(data, list) else data.get("data") or data.get("items") or []
opts, seen = [], set()
for r in rows:
desc = str(r.get("recipient_description") or "").strip()
doc = str(r.get("recipient_document") or "").strip()
if not desc and not doc: continue
label = f"{desc} ({doc})" if doc else desc
key = (label, doc)
if key not in seen:
seen.add(key); opts.append((label, doc))
return opts
@st.cache_data(ttl=CACHE_TTL_SEC, show_spinner=False)
def _load_suggestions_notas(base_guid: str, token: str, path: str):
"""Sugestões de Nota Fiscal a partir do endpoint corrente (1 página)."""
session = _get_session()
url = f"{OP_API_BASE_URL}{path}"
headers = _auth_headers(token, base_guid, page=1, limit=1000, is_post=True)
try:
resp = session.post(url, headers=headers, data=json.dumps({}, ensure_ascii=False),
timeout=(OP_CONNECT_TIMEOUT, OP_READ_TIMEOUT), proxies=PROXIES)
if resp.status_code >= 400: return []
data = resp.json()
except Exception:
return []
rows = data if isinstance(data, list) else data.get("data") or data.get("items") or []
field = "wsreceipt_code" if path in ("/wsreceipt/list", "/stock/list", "/operation/list") else \
"wsdispatch_code" if path in ("/wsdispatch/list", "/financial/invoice/list") else None
opts, seen = [], set()
for r in rows:
nota = str(r.get(field) or "").strip() if field else ""
if nota and nota not in seen:
seen.add(nota); opts.append(nota)
return opts
@st.cache_data(ttl=CACHE_TTL_SEC, show_spinner=False)
def _load_suggestions_categories(base_guid: str, token: str, path: str):
"""Sugestões de Categoria (1 página)."""
session = _get_session()
url = f"{OP_API_BASE_URL}{path}"
headers = _auth_headers(token, base_guid, page=1, limit=1000, is_post=True)
try:
resp = session.post(url, headers=headers, data=json.dumps({}, ensure_ascii=False),
timeout=(OP_CONNECT_TIMEOUT, OP_READ_TIMEOUT), proxies=PROXIES)
if resp.status_code >= 400: return []
data = resp.json()
except Exception:
return []
rows = data if isinstance(data, list) else data.get("data") or data.get("items") or []
seen, opts = set(), []
for r in rows:
cat = str(r.get("category_description") or r.get("Categoria") or "").strip()
if cat and cat not in seen:
seen.add(cat); opts.append(cat)
return opts
@st.cache_data(ttl=CACHE_TTL_SEC, show_spinner=False)
def _load_suggestions_addresses(base_guid: str, token: str):
"""Sugestões de Endereço (1 página)."""
session = _get_session()
url = f"{OP_API_BASE_URL}/address/list"
headers = _auth_headers(token, base_guid, page=1, limit=1000, is_post=True)
try:
resp = session.post(url, headers=headers, data=json.dumps({}, ensure_ascii=False),
timeout=(OP_CONNECT_TIMEOUT, OP_READ_TIMEOUT), proxies=PROXIES)
if resp.status_code >= 400: return []
data = resp.json()
except Exception:
return []
rows = data if isinstance(data, list) else data.get("data") or data.get("items") or []
seen, opts = set(), []
for r in rows:
code = str(r.get("location_code") or r.get("location_id_code") or "").strip()
desc = str(r.get("description") or "").strip()
if not code: continue
label = f"{code} - {desc}" if desc else code
if code not in seen:
seen.add(code); opts.append((label, code))
return opts
@st.cache_data(ttl=CACHE_TTL_SEC, show_spinner=False)
def _load_suggestions_depositantes(base_guid: str, token: str):
"""Sugestões de Depositantes (nome+cnpj) a partir de /stock/list (1 página)."""
session = _get_session()
url = f"{OP_API_BASE_URL}/stock/list"
headers = _auth_headers(token, base_guid, page=1, limit=1000, is_post=True)
try:
resp = session.post(url, headers=headers, data=json.dumps({}, ensure_ascii=False),
timeout=(OP_CONNECT_TIMEOUT, OP_READ_TIMEOUT), proxies=PROXIES)
if resp.status_code >= 400: return []
data = resp.json()
except Exception:
return []
rows = data if isinstance(data, list) else data.get("data") or data.get("items") or []
opts, seen = [], set()
for r in rows:
name = str(r.get("customer_name") or r.get("customer_description") or r.get("Depositante") or "").strip()
doc = str(r.get("customer_document") or r.get("CNPJ_Depositante") or "").strip()
if not name and not doc: continue
label = f"{name} ({doc})" if doc else name
key = (label, doc)
if key not in seen:
seen.add(key); opts.append((label, doc))
return opts
# =====================================================
# Body mapping (filtros comuns) — server-side quando suportado
# =====================================================
def _map_common_filters_to_body(path: str, ui: dict) -> dict:
"""
Para múltiplas seleções, a maioria dos endpoints não aceita array.
Estratégia: enviar 1 valor (o primeiro) no body e aplicar os demais client-side.
"""
enderecos_sel = ui.get("enderecos_sel", [])
endereco_text = ui.get("endereco_text", "").strip()
nf_sel = ui.get("nota_fiscal_sel", "").strip()
sku_item = ui.get("sku_item", "").strip()
part_number = ui.get("part_number", "").strip()
destinatarios_docs = ui.get("destinatarios_docs", [])
cnpjs_livres = ui.get("cnpjs_livres", [])
data_op = ui.get("data_op") # date único
categorias_sel = ui.get("categorias_sel", [])
types_sel = ui.get("types_sel", [])
depositantes_docs = ui.get("depositantes_docs", []) # múltiplos CNPJs (sugestões)
mapped = {}
# Endereço
addr = (enderecos_sel[0] if enderecos_sel else "") or endereco_text
if addr:
mapped["location_code"] = addr
# Nota fiscal (apenas sugestões)
nf_val = nf_sel
if nf_val:
if path in ("/wsreceipt/list", "/stock/list", "/operation/list"):
mapped["wsreceipt_code"] = nf_val
if path in ("/wsdispatch/list", "/financial/invoice/list"):
mapped["wsdispatch_code"] = nf_val
# SKU / Part Number
if sku_item:
mapped["product_code"] = sku_item
if part_number:
mapped["part_number"] = part_number
# Categoria (1 server-side; demais client-side)
if len(categorias_sel) == 1:
mapped["category_description"] = categorias_sel[0]
# Destinatário CNPJ (1 server-side)
all_cnpjs_dest = [c for c in destinatarios_docs + cnpjs_livres if c]
if all_cnpjs_dest:
mapped["recipient_document"] = all_cnpjs_dest[0]
# Depositante CNPJ (1 server-side)
if depositantes_docs:
mapped["customer_document"] = depositantes_docs[0]
# Data operação — em /operation/list
if path == "/operation/list" and data_op:
mapped["date_ini"] = data_op.strftime("%Y-%m-%d")
mapped["date_fim"] = data_op.strftime("%Y-%m-%d")
# Type(API) — 1 server-side
if path == "/operation/list" and len(types_sel) == 1:
mapped["type"] = types_sel[0]
return mapped
# =====================================================
# Client-side filters (Data operação única, Categoria, múltiplos seleção)
# =====================================================
def _contains_ci(series: pd.Series, needle: str) -> pd.Series:
if series.dtype != "O":
series = series.astype("string")
return series.fillna("").str.contains(needle, case=False, na=False)
def _coalesce_datetime_ptbr(df: pd.DataFrame, candidates: list) -> pd.Series:
result = pd.Series([pd.NaT] * len(df), index=df.index)
for c in candidates:
if c in df.columns:
s = df[c]
s1 = pd.to_datetime(s, errors="coerce", dayfirst=True)
result = result.fillna(s1)
mask_nat = result.isna()
if mask_nat.any():
s2 = pd.to_datetime(s[mask_nat], errors="coerce", dayfirst=False)
result.loc[mask_nat] = s2
return result
def _classify_sbm(row: pd.Series) -> str:
"""
Classificação SBM baseada em DEPOSITANTE (não em Destinatário):
- Se Depositante contém 'SBM' e Tipo == RECEIVING -> SBM - LOAD
- Se Depositante contém 'SBM' e Tipo == DISPATCH -> SBM - BACKLOAD
- Caso contrário -> OUTROS
"""
depos = str(row.get("Depositante") or row.get("customer_name") or "").upper()
typ = str(row.get("Tipo") or row.get("type") or "").upper()
if "SBM" in depos:
if typ == "RECEIVING": return "SBM - LOAD"
if typ == "DISPATCH": return "SBM - BACKLOAD"
return "OUTROS"
def _apply_client_side_filters(df: pd.DataFrame, ui: dict, sbm_types_sel: list, types_sel: list) -> pd.DataFrame:
enderecos_sel = ui.get("enderecos_sel", [])
endereco_text = ui.get("endereco_text", "").strip()
nf_sel = ui.get("nota_fiscal_sel", "").strip()
sku_item = ui.get("sku_item", "").strip()
part_number = ui.get("part_number", "").strip()
destinatarios_docs = [c.strip() for c in ui.get("destinatarios_docs", []) if c.strip()]
cnpjs_livres = [c.strip() for c in ui.get("cnpjs_livres", []) if c.strip()]
data_op = ui.get("data_op") # único
categorias_sel = ui.get("categorias_sel", [])
depositantes_docs = [c.strip() for c in ui.get("depositantes_docs", []) if c.strip()]
depositantes_nomes = [s.strip() for s in ui.get("depositantes_nomes", []) if str(s).strip()]
# Endereços
addr_cols = [c for c in ["Endereço", "location_code", "location_id_code"] if c in df.columns]
if addr_cols:
if enderecos_sel:
mask = False
for c in addr_cols:
mask = df[c].isin(enderecos_sel) | mask
df = df[mask].copy()
if endereco_text:
mask = False
for c in addr_cols:
mask = _contains_ci(df[c], endereco_text) | mask
df = df[mask].copy()
# Nota Fiscal (apenas seleção)
if nf_sel:
cols = [c for c in ["Nota Fiscal", "NF Saída", "wsreceipt_code", "wsdispatch_code"] if c in df.columns]
mask = False
for c in cols:
mask = _contains_ci(df[c], nf_sel) | mask
df = df[mask].copy()
# SKU / item
if sku_item:
cols = [c for c in ["SKU", "product_code"] if c in df.columns]
mask = False
for c in cols:
mask = _contains_ci(df[c], sku_item) | mask
df = df[mask].copy()
# Part Number
if part_number and "part_number" in df.columns:
df = df[_contains_ci(df["part_number"], part_number)].copy()
# CNPJ Destinatários
all_cnpjs_dest = destinatarios_docs + cnpjs_livres
if all_cnpjs_dest:
cols = [c for c in ["CNPJ_Destinatário", "recipient_document"] if c in df.columns]
mask = False
for c in cols:
mask = df[c].isin(all_cnpjs_dest) | mask
df = df[mask].copy()
# Depositantes (por CNPJ e/ou por nome) — multisseleção
dep_doc_cols = [c for c in ["CNPJ_Depositante", "customer_document"] if c in df.columns]
dep_name_cols = [c for c in ["Depositante", "customer_name", "customer_description"] if c in df.columns]
if depositantes_docs and dep_doc_cols:
mask = False
for c in dep_doc_cols:
mask = df[c].isin(depositantes_docs) | mask
df = df[mask].copy()
if depositantes_nomes and dep_name_cols:
mask = False
for c in dep_name_cols:
mask_local = False
for name in depositantes_nomes:
mask_local = _contains_ci(df[c], name) | mask_local
mask = mask | mask_local
df = df[mask].copy()
# Data da operação (única)
if data_op:
op_date = _coalesce_datetime_ptbr(df, [
"Data Operação","Data","Data Emissão","Data Agendada",
"Validade","Fabricação",
"date","create_date","issue_date","scheduled_date","expiration_date","manufacturing_date","last_update_date"
])
data_ini = pd.to_datetime(data_op)
data_fim = data_ini + pd.Timedelta(days=1) - pd.Timedelta(milliseconds=1)
df = df[(op_date >= data_ini) & (op_date <= data_fim)].copy()
# Categoria
if categorias_sel:
cols = [c for c in ["Categoria", "category_description"] if c in df.columns]
mask = False
for c in cols:
mask = df[c].isin(categorias_sel) | mask
df = df[mask].copy()
# Tipo (API) multisseleção ('Tipo' ou 'type')
if types_sel and ("Tipo" in df.columns or "type" in df.columns):
col_t = "Tipo" if "Tipo" in df.columns else "type"
df = df[df[col_t].isin(types_sel)].copy()
# Tipo de operação (SBM) — baseado em DEPOSITANTE
if sbm_types_sel:
sbm_series = df.apply(_classify_sbm, axis=1)
df = df[sbm_series.isin(sbm_types_sel)].copy()
return df
# =====================================================
# Formatação (R$, SKU, datas PT-BR, remove colunas vazias)
# =====================================================
def _format_currency_brl(val):
try:
x = float(val)
except Exception:
return val
s = f"{x:,.2f}"
s = s.replace(",", "X").replace(".", ",").replace("X", ".")
return f"R$ {s}"
def _format_dates_ptbr(df: pd.DataFrame) -> pd.DataFrame:
candidate_cols = [
"Data Operação","Data","Data Emissão","Validade","Fabricação",
"Data Agendada","date","create_date","issue_date","expiration_date",
"manufacturing_date","scheduled_date","last_update_date"
]
for col in df.columns:
if col in candidate_cols:
try:
s = pd.to_datetime(df[col], errors="coerce", dayfirst=True)
nat_mask = s.isna()
if nat_mask.any():
s.loc[nat_mask] = pd.to_datetime(df[col][nat_mask], errors="coerce", dayfirst=False)
df[col] = s.dt.strftime("%d/%m/%Y").fillna(df[col])
except Exception:
pass
return df
def _format_dataframe(df: pd.DataFrame) -> pd.DataFrame:
if df.empty:
return df
# R$
money_candidates = ["Vr. Total", "Vr. Unitário", "Item_Value",
"item_total_value", "item_unit_value", "receipt_value", "total_value"]
for col in money_candidates:
if col in df.columns:
df[col] = df[col].apply(_format_currency_brl)
# SKU sem zeros à esquerda
for col in ["SKU", "product_code"]:
if col in df.columns:
def strip_zeros(v):
s = str(v or "").strip()
if not s:
return s
s2 = s.lstrip("0")
return s2 if s2 else "0"
df[col] = df[col].apply(strip_zeros)
# Datas PT-BR
df = _format_dates_ptbr(df)
# Remove colunas vazias
to_drop = []
for col in df.columns:
serie = df[col]
empties = serie.isna() | (serie.astype(str).str.strip() == "")
if empties.all():
to_drop.append(col)
if to_drop:
df = df.drop(columns=to_drop)
return df
# =====================================================
# Helpers de KPI (somas seguras, moeda BRL, contagens)
# =====================================================
def _to_num_brl(val) -> float:
"""Converte 'R$ 1.234,56' ou num/str para float (1234.56)."""
if val is None: return 0.0
if isinstance(val, (int, float)): return float(val)
s = str(val).strip()
if not s: return 0.0
s = s.replace("R$", "").replace(" ", "")
s = s.replace(".", "").replace(",", ".")
try:
return float(s)
except Exception:
return 0.0
def _safe_sum(series: pd.Series) -> float:
"""Soma segura de séries possivelmente formatadas em BRL (strings)."""
if series is None or series.empty: return 0.0
return float(series.apply(_to_num_brl).sum())
def _nunique(df: pd.DataFrame, colnames: list) -> int:
"""Conta distintos na primeira coluna existente em 'colnames'."""
for c in colnames:
if c in df.columns:
return int(df[c].nunique(dropna=True))
return 0
def _count_status(df: pd.DataFrame, col: str, positivos=("ATIVO","ACTIVE","BLOQUEADO","BLOCKED")) -> int:
"""Conta registros com 'Status' em valores positivos (case-insensitive)."""
if col not in df.columns: return 0
s = df[col].astype(str).str.upper()
return int(s.isin([p.upper() for p in positivos]).sum())
# =====================================================
# KPIs dinâmicos por endpoint (usa colunas pós-RENAME_MAP)
# =====================================================
def _build_kpis(path: str, df: pd.DataFrame) -> dict:
kpis = {}
if path == "/address/blocking/list":
kpis["Endereços bloqueados (distintos)"] = _nunique(df, ["Endereço","location_code","location_id_code"])
kpis["Bloqueios (linhas)"] = int(len(df))
kpis["Motivos de bloqueio (distintos)"] = _nunique(df, ["Motivo Bloqueio","block_reason"])
if "Status" in df.columns:
kpis["Bloqueios ativos"] = _count_status(df, "Status", positivos=("BLOQUEADO","BLOCKED"))
elif path == "/address/list":
kpis["Endereços (distintos)"] = _nunique(df, ["Endereço","location_code","location_id_code"])
if "Status" in df.columns:
kpis["Endereços ativos"] = _count_status(df, "Status", positivos=("ATIVO","ACTIVE"))
elif path == "/stock/list":
# IMPORTANT: Valor total em estoque deve vir DA COLUNA 'Vr. Total'
qtde_total = _safe_sum(df["Qtde"]) if "Qtde" in df.columns else 0.0
qtde_reserva = _safe_sum(df["Qtde Reservada"]) if "Qtde Reservada" in df.columns else 0.0
valor_total = _safe_sum(df["Vr. Total"]) if "Vr. Total" in df.columns else 0.0
kpis["SKUs (distintos)"] = _nunique(df, ["SKU","product_code"])
kpis["Itens em estoque (Qtde)"] = qtde_total
kpis["Itens reservados (Qtde)"] = qtde_reserva
kpis["Valor total em estoque (R$)"] = valor_total
kpis["Lotes (distintos)"] = _nunique(df, ["Lote","lot"])
kpis["Endereços (distintos)"] = _nunique(df, ["Endereço","location_code"])
kpis["Depositantes (distintos)"] = _nunique(df, ["Depositante","customer_description","customer_name"])
elif path == "/wsreceipt/list":
qtde_total = _safe_sum(df["Qtde"]) if "Qtde" in df.columns else 0.0
valor_total = _safe_sum(df["Vr. Total"]) if "Vr. Total" in df.columns else 0.0
kpis["NF de entrada (distintas)"] = _nunique(df, ["NF Entrada","wsreceipt_code"])
kpis["Linhas (itens)"] = int(len(df))
kpis["Qtde total (Entrada)"] = qtde_total
kpis["Valor total (Entrada) R$"] = valor_total
kpis["Depositantes (distintos)"] = _nunique(df, ["Depositante","customer_name"])
elif path == "/wsdispatch/list":
qtde_total = _safe_sum(df["Qtde"]) if "Qtde" in df.columns else 0.0
valor_total = _safe_sum(df["Vr. Total"]) if "Vr. Total" in df.columns else 0.0
kpis["NF de saída (distintas)"] = _nunique(df, ["NF Saída","wsdispatch_code"])
kpis["Linhas (itens)"] = int(len(df))
kpis["Qtde total (Saída)"] = qtde_total
kpis["Valor total (Saída) R$"] = valor_total
kpis["Destinatários (distintos)"] = _nunique(df, ["Destinatário","recipient_description"])
elif path == "/operation/list":
qtde_total = _safe_sum(df["Qtde"]) if "Qtde" in df.columns else 0.0
kpis["Operações (linhas)"] = int(len(df))
kpis["Tipos (distintos)"] = _nunique(df, ["Tipo","type"])
kpis["Qtde total (Operações)"] = qtde_total
elif path == "/product/list":
kpis["Produtos (linhas)"] = int(len(df))
kpis["Categorias (distintas)"] = _nunique(df, ["Categoria","category_description"])
kpis["Grupos (distintos)"] = _nunique(df, ["Grupo","group_description"])
kpis["Status (distintos)"] = _nunique(df, ["Status","status"])
elif path == "/financial/invoice/list":
valor_total = _safe_sum(df["Vr. Total"]) if "Vr. Total" in df.columns else 0.0
kpis["Faturas (distintas)"] = _nunique(df, ["Número Fatura","invoice_number"])
kpis["Valor total (R$)"] = valor_total
kpis["Status (distintos)"] = _nunique(df, ["Status","status"])
elif path == "/monitor/nfe/list":
kpis["NFes (distintas)"] = _nunique(df, ["Chave NFe","nfe_key"])
kpis["Status SEFAZ (distintos)"] = _nunique(df, ["Status SEFAZ","status"])
elif path == "/yms/scheduling/list":
kpis["Agendamentos (distintos)"] = _nunique(df, ["ID Agendamento","scheduling_id"])
kpis["Status (distintos)"] = _nunique(df, ["Status","status"])
kpis["Pátios (distintos)"] = _nunique(df, ["Pátio","yard"])
kpis["Docas (distintas)"] = _nunique(df, ["Doca","dock"])
elif path == "/cargorelease/list":
qtde_total = _safe_sum(df["Qtde"]) if "Qtde" in df.columns else 0.0
kpis["Pedidos (distintos)"] = _nunique(df, ["Pedido","cargorelease_code"])
kpis["Status (distintos)"] = _nunique(df, ["Status","status"])
kpis["Qtde total (Pedidos)"] = qtde_total
else:
kpis["Linhas"] = int(len(df))
return kpis
# =====================================================
# Cache de consulta (DF final) — TTL configurável
# =====================================================
@st.cache_data(ttl=CACHE_TTL_SEC, show_spinner=True)
def _run_query_cached(path: str, base_guid: str, token: str, body_filter: dict, limit_per_page: int, max_pages_override, ui_common: dict, sbm_types_sel: list):
rows = _call_list_paginated(
path, base_guid, token, body_filter,
limit=limit_per_page,
max_pages_override=max_pages_override
)
df = pd.json_normalize(rows)
rename = RENAME_MAP.get(path, {})
if rename:
df = df.rename(columns={k: v for k, v in rename.items() if k in df.columns})
df = _apply_client_side_filters(df, ui_common, sbm_types_sel, ui_common.get("types_sel", []))
df = _format_dataframe(df)
return df
# =====================================================
# UI — Type(API), Destinatários, Endereços; Data operação (única); Categoria; SBM; Depositantes
# + Modo rápido (consulta leve) e limite por página
# + Debounce e Cancelar
# =====================================================
def _render_form(report_label: str, token: str):
with st.form(f"form_{report_label.replace(' ', '_')}"):
base = st.selectbox("Base (x-user-enterprise-id):", list(BASES_MAP.keys()), index=0)
# Performance
col_perf1, col_perf2 = st.columns(2)
with col_perf1:
quick_mode = st.checkbox("Consulta leve (rápida)", value=True, help="Coleta apenas 1 página e mostra colunas essenciais.")
with col_perf2:
limit_per_page = st.number_input("Itens por página", min_value=50, max_value=2000, value=500, step=50, help="Se suportado pelo servidor.")
# Paginação manual (quando necessário)
colp1, colp2 = st.columns(2)
with colp1:
only_first = st.checkbox("Coletar apenas a primeira página (evitar limite/timeouts)", value=False)
with colp2:
max_pages_ui = st.number_input("Máximo de páginas (0=∞)", min_value=0, max_value=100, value=0, step=1)
path = ENDPOINTS[report_label]
# Type (API) multisseleção
types_sel = st.multiselect("Type (API)", OPERATION_TYPES, default=[])
# Data da operação (única)
data_op = st.date_input("Data da operação (opcional)", value=None)
# Categoria (sugestões) multisseleção
categorias_opts = _load_suggestions_categories(BASES_MAP[base], token, path)
categorias_sel = st.multiselect("Categoria (sugestões)", categorias_opts, default=[])
# Endereços (sugestões) multisseleção + texto
addr_opts = _load_suggestions_addresses(BASES_MAP[base], token) # [(label, code)]
addr_labels = [lbl for (lbl, code) in addr_opts]
addr_values = [code for (lbl, code) in addr_opts]
end_sel_labels = st.multiselect("Endereços (sugestões)", addr_labels, default=[])
enderecos_sel = []
for sel in end_sel_labels:
i = addr_labels.index(sel)
enderecos_sel.append(addr_values[i])
endereco_text = st.text_input("Endereço (texto)")
# Nota Fiscal (apenas sugestões)
nf_opts = _load_suggestions_notas(BASES_MAP[base], token, path)
nota_fiscal_sel = st.selectbox("Nota Fiscal (sugestões)", [""] + nf_opts, index=0)
# Item / SKU e Part Number
colA, colB = st.columns(2)
with colA:
sku_item = st.text_input("Item / SKU")
with colB:
part_number = st.text_input("Part Number")
# Destinatários (sugestões multisseleção) + CNPJ múltiplos
dest_opts = _load_suggestions_recipients(BASES_MAP[base], token) # [(label, cnpj)]
dest_labels = [lbl for (lbl, cnpj) in dest_opts]
dest_values = [cnpj for (lbl, cnpj) in dest_opts]
dest_sel_labels = st.multiselect("Destinatários (sugestões)", dest_labels, default=[])
destinatarios_docs = []
for sel in dest_sel_labels:
i = dest_labels.index(sel)
destinatarios_docs.append(dest_values[i])
cnpj_livre_text = st.text_input("CNPJ(s) do destinatário (múltiplos, separados por vírgula)")
cnpjs_livres = [c.strip() for c in cnpj_livre_text.split(",")] if cnpj_livre_text.strip() else []
# Depositantes (sugestões multisseleção)
dep_opts = _load_suggestions_depositantes(BASES_MAP[base], token) # [(label, cnpj)]
dep_labels = [lbl for (lbl, cnpj) in dep_opts]
dep_values = [cnpj for (lbl, cnpj) in dep_opts]
dep_sel_labels = st.multiselect("Depositantes (sugestões)", dep_labels, default=[])
depositantes_docs, depositantes_nomes = [], []
for sel in dep_sel_labels:
i = dep_labels.index(sel)
depositantes_docs.append(dep_values[i])
# Também guardamos o nome (parte antes do " (CNPJ)") para filtro por nome, se desejar
name = sel.split(" (")[0].strip() if " (" in sel else sel
depositantes_nomes.append(name)
# Tipo de operação (SBM) multisseleção — baseado em Depositante
sbm_types_sel = st.multiselect("Tipo de operação (SBM)", SBM_TYPES, default=[]) if report_label == "Estoque" else []
# Server-side (quando suportado)
body = {}
if report_label == "Operações":
st.markdown("**Filtros de Operações (server-side)**")
if len(types_sel) == 1:
body["type"] = types_sel[0]
if data_op:
body["date_ini"] = data_op.strftime("%Y-%m-%d")
body["date_fim"] = data_op.strftime("%Y-%m-%d")
elif report_label == "Agendamento":
st.markdown("**Filtros de Agendamento (YMS)**")
if data_op:
body["date_ini"] = data_op.strftime("%Y-%m-%d")
body["date_fim"] = data_op.strftime("%Y-%m-%d")
ui_common = {
"enderecos_sel": enderecos_sel,
"endereco_text": endereco_text,
"nota_fiscal_sel": nota_fiscal_sel, # sem campo texto
"sku_item": sku_item,
"part_number": part_number,
"destinatarios_docs": destinatarios_docs,
"cnpjs_livres": cnpjs_livres,
"data_op": data_op, # único
"categorias_sel": categorias_sel,
"types_sel": types_sel,
"depositantes_docs": depositantes_docs, # multisseleção
"depositantes_nomes": depositantes_nomes, # para match por nome, se desejar
}
mapped = _map_common_filters_to_body(path, ui_common)
body.update({k: v for k, v in mapped.items() if v})
submitted = st.form_submit_button("Consultar", type="primary", use_container_width=True)
# Debounce leve
if submitted:
last_submit = st.session_state.get("__last_submit_ts__", 0)
now = time.time()
if now - last_submit < 2.0:
st.info("Aguarde um instante e evite clicar repetidamente em 'Consultar'.")
submitted = False
st.session_state["__last_submit_ts__"] = now
max_pages_effective = 1 if only_first else int(max_pages_ui or 0)
return (
submitted,
BASES_MAP[base],
max_pages_effective,
body,
ui_common,
sbm_types_sel,
quick_mode,
limit_per_page,
)
# =====================================================
# TELA PRINCIPAL
# =====================================================
def main():
# Oculta nav lateral de outros módulos
st.markdown("""
<style>
[data-testid="stSidebarNav"] { display: none !important; }
</style>
""", unsafe_allow_html=True)
st.title("⚙️ Operação | Relatórios via API (Mayasuite)")
# Permissão
if not verificar_permissao("operacao") and st.session_state.get("perfil") != "admin":
st.error("⛔ Acesso não autorizado.")
return
if not (OP_LOGIN_EMAIL and OP_LOGIN_PASSWORD) and not OP_ACCESS_TOKEN and not (OAUTH_TOKEN_URL and OAUTH_CLIENT_ID and OAUTH_CLIENT_SECRET):
st.warning("⚠️ Configure OP_LOGIN_EMAIL/OP_LOGIN_PASSWORD, ou OP_ACCESS_TOKEN, ou OAuth2 (OAUTH_TOKEN_URL/CLIENT_ID/CLIENT_SECRET) no .env (nunca hardcode).")
st.info("Ex.: OP_LOGIN_EMAIL=api@armmatriz.com.br / OP_LOGIN_PASSWORD=*** OU OP_ACCESS_TOKEN=jwt... OU OAuth2 client_credentials.")
return
# Token antes do form (sugestões)
try:
token = _get_token(OP_LOGIN_EMAIL, OP_LOGIN_PASSWORD)
except Exception as e:
st.error(f"Login/API falhou: {e}")
return
# Sidebar: token info
if OP_ACCESS_TOKEN:
payload = _jwt_payload(OP_ACCESS_TOKEN)
if payload:
st.sidebar.caption(f"🔑 Token carregado\n• iat: {_fmt_ts(payload.get('iat'))}\n• exp: {_fmt_ts(payload.get('exp'))}")
st.sidebar.caption(f"🌐 Base API: {OP_API_BASE_URL}")
st.session_state["__active_module__"] = "operacao"
st.sidebar.markdown("### Relatórios")
report_label = st.sidebar.selectbox("Selecione:", list(ENDPOINTS.keys()), index=0)
path = ENDPOINTS[report_label]
# ⚡ Cache controls (TTL, limpar caches)
with st.sidebar.expander("⚡ Cache", expanded=False):
ttl_ui = st.slider("TTL do cache (minutos)", min_value=1, max_value=60, value=CACHE_TTL_MIN, step=1)
st.session_state["__cache_ttl_sec__"] = ttl_ui * 60
colc1, colc2 = st.columns(2)
with colc1:
if st.button("🧹 Limpar cache de dados"):
try:
_run_query_cached.clear()
except Exception:
pass
st.success("Cache de dados limpo.")
st.rerun()
with colc2:
if st.button("🧹 Limpar cache de sugestões"):
try:
_load_suggestions_recipients.clear()
_load_suggestions_notas.clear()
_load_suggestions_categories.clear()
_load_suggestions_addresses.clear()
_load_suggestions_depositantes.clear()
except Exception:
pass
st.success("Cache de sugestões limpo.")
st.rerun()
submitted, base_guid, max_pages_effective, body_filter, ui_common, sbm_types_sel, quick_mode, limit_per_page = _render_form(report_label, token)
if not submitted:
return
# Botão cancelar
cancel_col, _ = st.columns([1,3])
with cancel_col:
if st.button("⛔ Cancelar consulta"):
st.session_state[CANCEL_TOKEN_KEY] = True
st.warning("Consulta cancelada pelo usuário.")
return
st.session_state[CANCEL_TOKEN_KEY] = False
# Modo rápido → força apenas 1 página
effective_max_pages = 1 if quick_mode else (max_pages_effective if max_pages_effective > 0 else None)
# Executa consulta com cache
t0 = time.time()
try:
df = _run_query_cached(path, base_guid, token, body_filter, limit_per_page, effective_max_pages, ui_common, sbm_types_sel)
except Exception as e:
st.error(f"Falha na consulta: {e}")
# Botão para tentar novamente
if st.button("🔁 Tentar novamente agora"):
try:
_run_query_cached.clear()
df = _run_query_cached(path, base_guid, token, body_filter, limit_per_page, effective_max_pages, ui_common, sbm_types_sel)
except Exception as e2:
st.error(f"Falha na nova tentativa: {e2}")
return
else:
return
latency_ms = int((time.time() - t0) * 1000)
if df.empty:
st.info("Nenhum registro retornado.")
return
st.success(f"✅ Consulta concluída ({latency_ms} ms). Registros: {len(df)}")
pages_done = int(st.session_state.get("op_pages_processed", 0))
st.caption(f"📄 Páginas coletadas: {pages_done}")
# Visualização essencial em modo rápido
if quick_mode:
essential_cols = [c for c in [
"Data Operação","Data","Nota Fiscal","NF Saída","SKU","Descrição","Qtde",
"Unidade","Endereço","Categoria","Destinatário","CNPJ_Destinatário","Tipo","Depositante","CNPJ_Depositante"
] if c in df.columns]
df_view = df[essential_cols].copy() if essential_cols else df
else:
df_view = df
# KPIs básicos
k1, k2, k3 = st.columns(3)
with k1: st.metric("Registros", len(df_view))
with k2: st.metric("Latência (ms)", latency_ms)
with k3: st.metric("Colunas", df_view.shape[1])
# ======== NOVOS CARDS DINÂMICOS POR CONSULTA ========
kpis = _build_kpis(path, df) # usa o df já renomeado/filtrado
if kpis:
st.markdown("#### 📈 Indicadores da consulta")
cols = st.columns(min(4, max(1, len(kpis))))
for i, (label, value) in enumerate(kpis.items()):
# Formata moeda R$ quando aplicável
if isinstance(value, (int, float)) and ("R$" in label or "Valor" in label):
display_val = _format_currency_brl(value)
else:
display_val = value
cols[i % len(cols)].metric(label, display_val)
# Tabela
st.dataframe(df_view, use_container_width=True)
# Export
col_a, col_b = st.columns(2)
with col_a:
filtros_export = {"base": base_guid, "max_pages": effective_max_pages or 0, **body_filter, **{f"ui_{k}": v for k, v in ui_common.items()}, "sbm_types": sbm_types_sel}
excel_bytes = _export_excel(df, report_label, filtros_export)
st.download_button("📥 Exportar Excel", excel_bytes, file_name=f"operacao_{report_label.replace(' ','_')}.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
with col_b:
csv_bytes = _export_csv(df)
st.download_button("📥 Exportar CSV", csv_bytes, file_name=f"operacao_{report_label.replace(' ','_')}.csv", mime="text/csv")
# Auditoria
try:
registrar_log(usuario=st.session_state.get("usuario"),
acao=f"Operação/API: {report_label} (base={base_guid}, {latency_ms}ms, reg={len(df)})",
tabela="operacao_api", registro_id=None)
except Exception:
pass
# Se for executado diretamente (opcional para debug local)
if __name__ == "__main__":
st.set_page_config(page_title="Operação | ARM", layout="wide")
main()