# IOI-RUN / app.py
# Roudrigus's picture
# Update app.py
# cc39037 verified
# raw
# history blame
# 36.5 kB
# -*- coding: utf-8 -*-
import os
import sys
import platform
import importlib
from uuid import uuid4
from datetime import date, datetime, time
import streamlit as st
from dotenv import load_dotenv
from sqlalchemy import text, func
# ✅ Usa toda a largura da página (chamar antes de qualquer outro st.*)
st.set_page_config(layout="wide")
# Carrega variáveis de ambiente
load_dotenv()
# --------------------------------------------------------------------------------------
# Utilitários de robustez / compatibilidade
# --------------------------------------------------------------------------------------
def _module_stub(name: str, hint: str = ""):
    """Build a placeholder object that stands in for an unavailable module.

    The returned object exposes ``main()`` and ``pagina()``; both only show a
    Streamlit warning naming the module and the hint, so page routing can call
    them without crashing the app.

    Args:
        name: Module name shown in the warning.
        hint: Extra guidance for the user; a generic sentence is used when empty.

    Returns:
        An instance of a local ``_Stub`` class.
    """
    fallback_hint = "Módulo indisponível nesta hospedagem."

    class _Stub:
        """Minimal module look-alike used when the real import failed."""

        def __init__(self, module_name, module_hint):
            self.__name = module_name
            self.__hint = module_hint if module_hint else fallback_hint

        def main(self, *args, **kwargs):
            # Arguments are accepted and ignored so any call signature works.
            message = f"🔒 Módulo **{self.__name}** indisponível.\n\n{self.__hint}"
            st.warning(message)

        def pagina(self, *args, **kwargs):
            # Some pages are routed through .pagina(); delegate to main().
            self.main(*args, **kwargs)

    stub = _Stub(name, hint)
    return stub
def _try_import(module_name: str, on_fail_hint: str = ""):
    """Import ``module_name`` without letting a failure take the app down.

    Args:
        module_name: Name handed to ``importlib.import_module``.
        on_fail_hint: Text placed before the technical detail in the stub's
            warning when the import fails.

    Returns:
        The imported module, or a stub from ``_module_stub`` if any
        ``Exception`` was raised during the import.
    """
    try:
        module = importlib.import_module(module_name)
    except Exception as exc:
        detail = f"{on_fail_hint}\n\n**Detalhe técnico:** {exc}"
        return _module_stub(module_name, detail)
    return module
def _ensure_db_case_alias(expected_names=("load.db", "Load.db", "LOAD.DB")):
    """Make sure a lower-case ``load.db`` exists in the working directory.

    On case-sensitive file systems a database stored as e.g. ``Load.db`` is not
    found under the name ``load.db``. If ``load.db`` is missing, the first
    existing file among ``expected_names`` is COPIED to ``load.db``.

    The copy is written to a staging file and moved into place with
    ``os.replace`` so an interrupted copy can never leave a truncated
    ``load.db`` that a later run would accept as valid.

    Args:
        expected_names: File names (relative to the current working directory)
            to look for, in priority order.

    Returns:
        Absolute path of ``load.db`` (whether or not it exists afterwards).
    """
    import shutil

    base = os.path.abspath(os.getcwd())
    target = os.path.join(base, "load.db")
    if os.path.exists(target):
        return target

    staging = target + ".tmp"
    for name in expected_names:
        source = os.path.join(base, name)
        if not os.path.isfile(source):
            continue
        try:
            shutil.copy(source, staging)
            os.replace(staging, target)
            return target
        except OSError:
            # Best effort: drop the partial copy and try the next candidate.
            try:
                os.remove(staging)
            except OSError:
                pass
    return target
# Preventively normalise the database file-name case at import time
# (in case internal modules open 'load.db' by that exact name).
_DB_ALIAS = _ensure_db_case_alias()
# --------------------------------------------------------------------------------------
# Imports internos essenciais
# --------------------------------------------------------------------------------------
from utils_operacao import obter_grupos_disponiveis, obter_modulos_para_grupo
from utils_info import INFO_CONTEUDO, INFO_MODULOS, INFO_MAP_PAGINA_ID
from utils_permissoes import verificar_permissao
from modules_map import MODULES
from banco import engine, Base, SessionLocal
from models import QuizPontuacao, IOIRunSugestao, AvisoGlobal
# ----- login / logo: import e “safe wrappers” -----------------------------------------
# login()
# Both names are bound up front so later code can reference them whether or
# not the import succeeded (previously `_login_import_err` existed only after
# a failed import).
_login_orig = None
_login_import_err = None
try:
    from login import login as _login_orig
except Exception as _e_login_import:
    # Keep the error object: the `as` name is deleted when the except block ends.
    _login_import_err = _e_login_import
def login_safe():
    """Render the login step with two environment-controlled escape hatches.

    Order of precedence:

    1. ``DISABLE_AUTH=1`` — debug autologin using ``DEMO_USER`` /
       ``DEMO_PERFIL`` / ``DEMO_EMAIL``. A rerun is requested right after the
       session is populated; without it the caller returns and nothing on the
       page would ever trigger the next script run.
    2. The regular ``login()`` imported from ``login.py``; an exception raised
       while rendering it is shown and the flow falls through.
    3. ``ALLOW_EMERGENCY_LOGIN=1`` — emergency form. The typed password must
       match the ``EMERGENCY_LOGIN_PASSWORD`` environment variable; the form
       previously granted the ``admin`` profile to any non-empty user name.
    4. Otherwise an informational message is shown.
    """
    import hmac

    # Debug autologin.
    if os.getenv("DISABLE_AUTH", "0") == "1":
        was_logged_in = bool(st.session_state.get("logado"))
        st.session_state.logado = True
        st.session_state.usuario = os.getenv("DEMO_USER", "demo")
        st.session_state.perfil = os.getenv("DEMO_PERFIL", "admin")
        st.session_state.email = os.getenv("DEMO_EMAIL", "demo@example.com")
        st.info("🔓 Autologin ativado (DISABLE_AUTH=1). **Não use em produção.**")
        if not was_logged_in:
            # Guarded so a caller that is already logged in cannot loop.
            st.rerun()
        return

    # Regular flow.
    if _login_orig:
        try:
            _login_orig()
            return
        except Exception as e:
            st.error(f"Falha ao carregar tela de login (login.py): {e}")

    # Optional emergency flow.
    if os.getenv("ALLOW_EMERGENCY_LOGIN", "0") == "1":
        expected_password = os.getenv("EMERGENCY_LOGIN_PASSWORD", "")
        with st.form("emergency_login"):
            u = st.text_input("Usuário")
            p = st.text_input("Senha", type="password")
            ok = st.form_submit_button("Entrar (emergencial)")
        if ok and u:
            if not expected_password:
                st.error(
                    "Login emergencial sem senha configurada. "
                    "Defina **EMERGENCY_LOGIN_PASSWORD** em *Settings → Secrets*."
                )
            elif hmac.compare_digest((p or "").encode("utf-8"), expected_password.encode("utf-8")):
                st.session_state.logado = True
                st.session_state.usuario = u
                st.session_state.perfil = "admin"
                st.session_state.email = f"{u}@local"
                st.warning("⚠️ Login emergencial ativo. Desative após os testes (ALLOW_EMERGENCY_LOGIN=0).")
                st.rerun()
            else:
                st.error("Usuário ou senha inválidos.")
    else:
        st.info(
            "Login temporariamente indisponível. "
            "Para testar no Spaces, defina **DISABLE_AUTH=1** em *Settings → Secrets*."
        )
# exibir_logo()
# Both names are bound up front so later code can reference them whether or
# not the import succeeded (previously `_exibir_logo_import_err` existed only
# after a failed import).
_exibir_logo_orig = None
_exibir_logo_import_err = None
try:
    from utils_layout import exibir_logo as _exibir_logo_orig
except Exception as _e_layout_import:
    # Keep the error object: the `as` name is deleted when the except block ends.
    _exibir_logo_import_err = _e_layout_import
def _resolve_logo_path() -> str | None:
"""Procura a logo localmente; aceita LOGO_PATH absoluto ou relativo."""
cand = []
env_path = os.getenv("LOGO_PATH")
if env_path:
cand.append(env_path)
cand += ["logo.png", "assets/logo.png", "images/logo.png", "static/logo.png"]
base = os.path.abspath(os.getcwd())
for p in cand:
if not p:
continue
full = p if os.path.isabs(p) else os.path.join(base, p)
if os.path.exists(full):
return full
return None
def exibir_logo_safe(top: bool = False, sidebar: bool = False):
    """Show the logo, degrading gracefully when the project helper is unusable.

    The imported ``exibir_logo`` is used when available. If it is missing, or
    raises, a local image found by ``_resolve_logo_path`` is shown; when no
    image is found a text heading is rendered instead.

    Args:
        top: Render in the main page area.
        sidebar: Render in the sidebar.

    Returns:
        Whatever the project helper returns when it succeeds, else ``None``.
    """
    if _exibir_logo_orig:
        try:
            return _exibir_logo_orig(top=top, sidebar=sidebar)
        except Exception as exc:
            st.sidebar.warning(f"Logo padrão indisponível ({exc}). Usando fallback.")

    logo_file = _resolve_logo_path()
    if not logo_file:
        # No image anywhere: fall back to a plain text title.
        if top:
            st.markdown("### IOI‑RUN")
        if sidebar:
            st.sidebar.markdown("### IOI‑RUN")
        return

    if top:
        st.image(logo_file, use_column_width=False)
    if sidebar:
        st.sidebar.image(logo_file, use_column_width=True)
# --------------------------------------------------------------------------------------
# Imports de páginas/módulos com fallback
# --------------------------------------------------------------------------------------
# Every page module is loaded through _try_import, so a module that cannot be
# imported becomes a stub that shows a warning instead of stopping the app.
formulario = _try_import("formulario")
consulta = _try_import("consulta")
relatorio = _try_import("relatorio")
administracao = _try_import("administracao")
quiz = _try_import("quiz")
ranking = _try_import("ranking")
quiz_admin = _try_import("quiz_admin")
usuarios_admin = _try_import("usuarios_admin", "Adicione **bcrypt** ao requirements.txt.")
videos = _try_import("videos")
auditoria = _try_import("auditoria")
importar_excel = _try_import("importar_excel")
calendario = _try_import("calendario", "Instale **streamlit-calendar** no requirements.txt.")
auditoria_cleanup = _try_import("auditoria_cleanup")
jogos = _try_import("jogos")
db_tools = _try_import("db_tools")
db_admin = _try_import("db_admin")
db_monitor = _try_import("db_monitor")
operacao = _try_import("operacao")
db_export_import = _try_import("db_export_import")
resposta = _try_import("resposta")  # 📬 Admin
repositorio_load = _try_import("repositorio_load")
# NOTE: this module name is capitalised, unlike the others.
produtividade_especialista = _try_import("Produtividade_Especialista")
rnc = _try_import("rnc")
rnc_listagem = _try_import("rnc_listagem")
rnc_relatorio = _try_import("rnc_relatorio")
sugestoes_usuario = _try_import("sugestoes_usuario")
repo_rnc = _try_import("repo_rnc")
recebimento = _try_import("recebimento")
# Outlook/COM (Windows only). On Linux/Spaces, or when DISABLE_OUTLOOK=1,
# the module is replaced by a stub without attempting the import.
_DISABLE_OUTLOOK = os.getenv("DISABLE_OUTLOOK", "0") == "1"
if platform.system().lower() != "windows" or _DISABLE_OUTLOOK:
    outlook_relatorio = _module_stub(
        "outlook_relatorio",
        "Este módulo usa automação COM do Outlook (pywin32), disponível **apenas no Windows**.\n"
        "No servidor Linux, ele foi desativado. Para manter essa função em nuvem, use **Microsoft Graph API**."
    )
else:
    outlook_relatorio = _try_import("outlook_relatorio")
# --------------------------------------------------------------------------------------
# Roteamento condicional de banco
# --------------------------------------------------------------------------------------
# Use the project's database router when it can be imported; otherwise define
# local stand-ins that always report the production database.
try:
    from db_router import current_db_choice, bank_label
except Exception:
    _HAS_ROUTER = False

    def current_db_choice() -> str:
        """Fallback: the active database is always reported as "prod"."""
        return "prod"

    def bank_label(choice: str) -> str:
        """Fallback: map a database choice to its sidebar label."""
        if choice == "prod":
            return "🟢 Produção"
        return "🔴 Teste"
else:
    _HAS_ROUTER = True
# --------------------------------------------------------------------------------------
# RERUN por querystring (?rr=1)
# --------------------------------------------------------------------------------------
def _get_query_params():
    """Return the URL query parameters as a plain ``dict``.

    ``st.query_params`` is tried first, then the legacy
    ``st.experimental_get_query_params()``; an empty dict is returned when
    both raise.
    """
    readers = (
        lambda: st.query_params,  # newer Streamlit API
        lambda: st.experimental_get_query_params(),  # legacy API
    )
    for read in readers:
        try:
            return dict(read())
        except Exception:
            continue
    return {}
def _set_query_params(new_params: dict):
    """Replace the URL query parameters with ``new_params`` (best effort).

    The parameters are written THROUGH the ``st.query_params`` object
    (``from_dict`` when available, otherwise ``clear`` + ``update``).
    Assigning ``st.query_params = {...}`` only rebinds the attribute on the
    ``streamlit`` module: the URL is left untouched and, because the assignment
    never raises, the legacy fallback could never run.

    If ``st.query_params`` is missing or fails, the legacy
    ``st.experimental_set_query_params`` is tried; any error there is ignored.

    Args:
        new_params: Mapping of parameter name to value(s).
    """
    try:
        query_params = st.query_params
        if hasattr(query_params, "from_dict"):
            query_params.from_dict(new_params)
        else:
            query_params.clear()
            query_params.update(new_params)
        return
    except Exception:
        pass

    try:
        st.experimental_set_query_params(**new_params)
    except Exception:
        # Deliberately silent: failing to tidy the URL must not break the page.
        pass
def _check_rerun_qs(pagina_atual: str = ""):
    """Honour a one-shot ``?rr=1`` (or ``rr=true``) rerun request in the URL.

    Nothing happens if the request was already consumed in this session or if
    ``pagina_atual`` is one of the pages excluded below. Otherwise ``rr`` is
    removed from the query string, the request is marked as consumed and a
    rerun is triggered. Any ``Exception`` raised along the way is swallowed.

    Args:
        pagina_atual: Id of the page currently selected.
    """
    excluded_pages = ("resposta", "outlook_relatorio", "formulario")
    try:
        already_consumed = st.session_state.get("__qs_rr_consumed__", False)
        if already_consumed or pagina_atual in excluded_pages:
            return

        params = _get_query_params()
        raw = params.get("rr", ["0"])
        # The legacy API yields lists of values, the newer one plain strings.
        if isinstance(raw, (list, tuple)):
            flag = raw[0]
        else:
            flag = str(raw)
        if str(flag).lower() not in ("1", "true"):
            return

        remaining = {}
        for key, value in params.items():
            if key != "rr":
                remaining[key] = value
        _set_query_params(remaining)
        st.session_state["__qs_rr_consumed__"] = True
        st.rerun()
    except Exception:
        pass
# --------------------------------------------------------------------------------------
# Sessões / DB helpers
# --------------------------------------------------------------------------------------
def _get_db_session():
    """Open a database session, preferring the router when it is usable.

    Attempts, in order:
      1. ``db_router.get_session_for_current_db()``;
      2. a session bound to ``db_router.get_engine_for_current_db()``;
      3. a session bound to the module-level ``engine``.

    A failure in step 1 or 2 silently moves on to the next step.
    """
    # 1) Router hands out a ready-made session.
    try:
        from db_router import get_session_for_current_db
        routed_session = get_session_for_current_db()
        return routed_session
    except Exception:
        pass

    # 2) Router only exposes the engine: build the session here.
    try:
        from db_router import get_engine_for_current_db
        from sqlalchemy.orm import sessionmaker
        routed_engine = get_engine_for_current_db()
        routed_factory = sessionmaker(bind=routed_engine)
        return routed_factory()
    except Exception:
        pass

    # 3) Default engine.
    from sqlalchemy.orm import sessionmaker
    default_factory = sessionmaker(bind=engine)
    return default_factory()
# Issue CREATE TABLE for every model registered on Base against the default
# engine; runs at import time.
Base.metadata.create_all(bind=engine)
def quiz_respondido_hoje(usuario: str) -> bool:
    """Tell whether ``usuario`` has a quiz score dated today.

    Args:
        usuario: User name matched against ``QuizPontuacao.usuario``.

    Returns:
        ``True`` if at least one ``QuizPontuacao`` row for the user has
        ``data`` at or after today's midnight (local ``date.today()``).
    """
    session = _get_db_session()
    try:
        midnight = datetime.combine(date.today(), time.min)
        todays_scores = session.query(QuizPontuacao).filter(
            QuizPontuacao.usuario == usuario,
            QuizPontuacao.data >= midnight,
        )
        first_hit = todays_scores.first()
        return first_hit is not None
    finally:
        # Closing is best effort; a failure here must not mask the result.
        try:
            session.close()
        except Exception:
            pass
_SESS_TTL_MIN = 5  # window, in minutes, within which a session counts as "online"
def _get_session_id() -> str:
    """Return this Streamlit session's id, creating a UUID4 on first use.

    The id is stored under ``"_sid"`` in ``st.session_state``.
    """
    state = st.session_state
    if "_sid" in state:
        return state["_sid"]
    state["_sid"] = str(uuid4())
    return state["_sid"]
def _ensure_sessao_table(db) -> None:
    """Create the ``sessao_web`` table if it does not exist, then commit.

    The DDL is chosen from ``db.bind.dialect.name``: SQLite and PostgreSQL
    have their own statements; every other dialect gets the MySQL/MariaDB one.

    Args:
        db: Open session whose bind identifies the SQL dialect.
    """
    sqlite_ddl = """
CREATE TABLE IF NOT EXISTS sessao_web (
id INTEGER PRIMARY KEY AUTOINCREMENT,
usuario TEXT NOT NULL,
session_id TEXT NOT NULL UNIQUE,
last_seen TIMESTAMP NOT NULL,
ativo INTEGER NOT NULL DEFAULT 1
)
"""
    postgres_ddl = """
CREATE TABLE IF NOT EXISTS sessao_web (
id SERIAL PRIMARY KEY,
usuario TEXT NOT NULL,
session_id TEXT NOT NULL UNIQUE,
last_seen TIMESTAMPTZ NOT NULL,
ativo BOOLEAN NOT NULL DEFAULT TRUE
)
"""
    # mysql / mariadb — also the default for any unrecognised dialect
    mysql_ddl = """
CREATE TABLE IF NOT EXISTS sessao_web (
id INT AUTO_INCREMENT PRIMARY KEY,
usuario VARCHAR(255) NOT NULL,
session_id VARCHAR(255) NOT NULL UNIQUE,
last_seen TIMESTAMP NOT NULL,
ativo TINYINT(1) NOT NULL DEFAULT 1
)
"""
    ddl_by_dialect = {
        "sqlite": sqlite_ddl,
        "postgresql": postgres_ddl,
        "postgres": postgres_ddl,
    }
    dialect_name = db.bind.dialect.name
    statement = ddl_by_dialect.get(dialect_name, mysql_ddl)
    db.execute(text(statement))
    db.commit()
def _session_heartbeat(usuario: str) -> None:
    """Record that ``usuario`` is active in this session and expire stale rows.

    Steps (all inside one transaction, rolled back on any error):
      1. make sure ``sessao_web`` exists;
      2. refresh the row for this session id, inserting it if absent;
      3. flag as inactive every row not seen for ``2 * _SESS_TTL_MIN`` minutes.

    The ``ativo`` flag is sent as a bound Python boolean rather than the
    literal ``1``: the PostgreSQL column is BOOLEAN and rejects integer
    literals, which made every heartbeat fail there. The UPDATE also rewrites
    ``usuario`` because the session id survives logout, so a different user
    logging in on the same session would otherwise keep the previous name.

    Args:
        usuario: Logged-in user name; nothing is done when it is empty.
    """
    if not usuario:
        return
    db = _get_db_session()
    try:
        _ensure_sessao_table(db)
        params = {"usuario": usuario, "sid": _get_session_id(), "ativo": True}
        upd = db.execute(
            text(
                "UPDATE sessao_web SET usuario = :usuario, last_seen = CURRENT_TIMESTAMP, "
                "ativo = :ativo WHERE session_id = :sid"
            ),
            params,
        )
        if upd.rowcount == 0:
            # No row for this session yet: create it.
            db.execute(
                text(
                    "INSERT INTO sessao_web (usuario, session_id, last_seen, ativo) "
                    "VALUES (:usuario, :sid, CURRENT_TIMESTAMP, :ativo)"
                ),
                params,
            )

        # Date arithmetic differs per dialect; only module constants are
        # interpolated here, never user input.
        stale_minutes = _SESS_TTL_MIN * 2
        dialect = db.bind.dialect.name
        if dialect in ("postgresql", "postgres"):
            cutoff = f"(NOW() - INTERVAL '{stale_minutes} minutes')"
        elif dialect == "sqlite":
            cutoff = f"datetime(CURRENT_TIMESTAMP, '-{stale_minutes} minutes')"
        else:
            cutoff = f"DATE_SUB(CURRENT_TIMESTAMP, INTERVAL {stale_minutes} MINUTE)"
        db.execute(
            text(f"UPDATE sessao_web SET ativo = :inativo WHERE last_seen < {cutoff}"),
            {"inativo": False},
        )
        db.commit()
    except Exception:
        # Presence tracking is best effort and must never break the page.
        db.rollback()
    finally:
        try:
            db.close()
        except Exception:
            pass
def _get_active_users_count() -> int:
    """Count distinct users with an active session seen within ``_SESS_TTL_MIN`` minutes.

    The ``ativo`` filter uses a bound Python boolean instead of the literal
    ``1``; comparing the PostgreSQL BOOLEAN column with an integer raises, so
    the count was always reported as 0 on that dialect.

    Returns:
        The number of distinct ``usuario`` values, or 0 if the query fails.
    """
    db = _get_db_session()
    try:
        _ensure_sessao_table(db)
        dialect = db.bind.dialect.name
        # Only the module constant is interpolated into the SQL text.
        if dialect in ("postgresql", "postgres"):
            threshold = f"(NOW() - INTERVAL '{_SESS_TTL_MIN} minutes')"
        elif dialect == "sqlite":
            threshold = f"datetime(CURRENT_TIMESTAMP, '-{_SESS_TTL_MIN} minutes')"
        else:
            threshold = f"DATE_SUB(CURRENT_TIMESTAMP, INTERVAL {_SESS_TTL_MIN} MINUTE)"
        res = db.execute(
            text(
                "SELECT COUNT(DISTINCT usuario) AS c FROM sessao_web "
                f"WHERE ativo = :ativo AND last_seen >= {threshold}"
            ),
            {"ativo": True},
        ).fetchone()
        return int(res[0] if res and res[0] is not None else 0)
    except Exception:
        return 0
    finally:
        try:
            db.close()
        except Exception:
            pass
def _mark_session_inactive() -> None:
    """Flag this session's ``sessao_web`` row as inactive (best effort).

    Nothing is done when no session id has been generated yet. The flag is
    sent as a bound Python boolean instead of the literal ``0`` so the
    statement is also valid for the PostgreSQL BOOLEAN column. Errors roll the
    transaction back and are otherwise ignored.
    """
    sid = st.session_state.get("_sid")
    if not sid:
        return
    db = _get_db_session()
    try:
        _ensure_sessao_table(db)
        db.execute(
            text("UPDATE sessao_web SET ativo = :ativo WHERE session_id = :sid"),
            {"ativo": False, "sid": sid},
        )
        db.commit()
    except Exception:
        db.rollback()
    finally:
        try:
            db.close()
        except Exception:
            pass
# --------------------------------------------------------------------------------------
# Aviso Global (banner superior)
# --------------------------------------------------------------------------------------
def _sanitize_largura(largura_raw: str) -> str:
    """Normalise a configured banner width into a safe CSS length.

    The result is interpolated into a ``<style>`` block, so only a plain
    number with an optional ``px`` or ``%`` unit is accepted. Checking just
    the suffix let arbitrary CSS through as long as it ended in ``%``/``px``.

    Args:
        largura_raw: Raw width; ``None``, numbers and surrounding whitespace
            are tolerated.

    Returns:
        ``"<n>px"`` or ``"<n>%"`` for valid input (a bare number is treated as
        pixels); ``"100%"`` for empty or invalid input.
    """
    import re

    val = "" if largura_raw is None else str(largura_raw).strip()
    if not val:
        return "100%"
    match = re.fullmatch(r"[0-9]+(?:\.[0-9]+)?(px|%)?", val)
    if match is None:
        return "100%"
    if match.group(1):
        return val
    return f"{val}px"
def obter_aviso_ativo(db):
    """Fetch the active global notice that was touched most recently.

    Args:
        db: Open database session.

    Returns:
        The first ``AvisoGlobal`` row with ``ativo`` true, ordered by
        ``updated_at`` then ``created_at`` (both descending); ``None`` when
        there is none or the query raises.
    """
    try:
        active_only = db.query(AvisoGlobal).filter(AvisoGlobal.ativo == True)  # noqa: E712
        newest_first = active_only.order_by(
            AvisoGlobal.updated_at.desc(),
            AvisoGlobal.created_at.desc(),
        )
        return newest_first.first()
    except Exception:
        return None
def _render_aviso_global_topbar():
    """Render the active global notice as a fixed bar at the top of the page.

    Opens a session, loads the notice through ``obter_aviso_ativo`` and, if
    there is one, injects a ``<style>`` block plus the bar markup via
    ``st.markdown(..., unsafe_allow_html=True)``. Every failure (no session,
    query error, render error) is reported as a sidebar warning and the banner
    is simply skipped.

    NOTE(review): ``mensagem``, ``bg_color`` and ``text_color`` are inserted
    into the HTML/CSS without escaping — confirm these fields can only be
    written by trusted users.
    """
    try:
        db = _get_db_session()
    except Exception as e:
        st.sidebar.warning(f"Aviso desativado: sessão indisponível ({e})")
        return
    aviso = None
    try:
        aviso = obter_aviso_ativo(db)
    except Exception as e:
        st.sidebar.warning(f"Aviso desativado: falha ao consultar ({e})")
        aviso = None
    finally:
        # Attributes are read after the session is closed.
        try: db.close()
        except Exception: pass
    if not aviso:
        return
    try:
        # Presentation settings, each with a default for missing/empty values.
        largura = _sanitize_largura(getattr(aviso, "largura", "100%"))
        bg = getattr(aviso, "bg_color", "#FFF3CD") or "#FFF3CD"
        fg = getattr(aviso, "text_color", "#664D03") or "#664D03"
        efeito = (getattr(aviso, "efeito", "marquee") or "marquee").lower()
        velocidade = int(getattr(aviso, "velocidade", 20) or 20)
        try:
            # Font size is clamped to the 10..48 px range.
            font_size = max(10, min(int(getattr(aviso, "font_size", 14)), 48))
        except Exception:
            font_size = 14
        altura = 52  # bar height in px
        st.markdown(
            f"""
<style>
.stApp::before,
header[data-testid="stHeader"],
[data-testid="stToolbar"],
[data-testid="stDecoration"],
[data-testid="collapsedControl"],
.stApp [class*="stDialog"] {{
z-index: 1 !important;
}}
[data-testid="stAppViewContainer"] {{
padding-top: {altura + 8}px !important;
}}
.ag-topbar-wrap {{
position: fixed;
top: 0;
left: 0;
width: {largura};
z-index: 2147483647 !important;
background: {bg};
color: {fg};
border-bottom: 1px solid rgba(0,0,0,.12);
box-shadow: 0 2px 8px rgba(0,0,0,.15);
border-radius: 0 0 10px 10px;
pointer-events: none;
}}
.ag-topbar-inner {{
display: flex;
align-items: center;
height: {altura}px;
padding: 0 14px;
overflow: hidden;
font-weight: 700;
font-size: {font_size}px;
letter-spacing: .2px;
white-space: nowrap;
}}
.ag-topbar-marquee > span {{
display: inline-block;
padding-left: 100%;
animation: ag-marquee {velocidade}s linear infinite;
}}
@keyframes ag-marquee {{
0% {{ transform: translateX(0); }}
100% {{ transform: translateX(-100%); }}
}}
@media (prefers-reduced-motion: reduce) {{
.ag-topbar-marquee > span {{ animation: none !important; padding-left: 0; }}
}}
@media (max-width: 500px) {{
.ag-topbar-inner {{
font-size: {max(10, font_size-3)}px;
padding: 0 8px;
height: 44px;
}}
[data-testid="stAppViewContainer"] {{ padding-top: 52px !important; }}
}}
</style>
<div class="ag-topbar-wrap">
<div class="ag-topbar-inner {'ag-topbar-marquee' if efeito=='marquee' else ''}">
<span>{getattr(aviso, 'mensagem', '')}</span>
</div>
</div>
""",
            unsafe_allow_html=True
        )
    except Exception as e:
        st.sidebar.warning(f"Aviso desativado: erro de render ({e})")
        return
# --------------------------------------------------------------------------------------
# Logout
# --------------------------------------------------------------------------------------
def logout():
    """End the current login: mark the session inactive, reset state, rerun."""
    _mark_session_inactive()
    reset_values = {
        "logado": False,
        "usuario": None,
        "perfil": None,
        "nome": None,
        "email": None,
        "quiz_verificado": False,
    }
    for key, value in reset_values.items():
        st.session_state[key] = value
    st.rerun()
# --------------------------------------------------------------------------------------
# 🎂 Banner/efeito de aniversário
# --------------------------------------------------------------------------------------
def _show_birthday_banner_if_needed():
    """Show the one-off birthday celebration when the session asks for it.

    Runs only when ``st.session_state["__show_birthday__"]`` is truthy and
    clears the flag first, so the effect is shown once. It renders a CSS
    confetti overlay, Streamlit balloons, a greeting with the user's name and
    a closing sentence.

    The name is HTML-escaped before being placed in the greeting, because the
    markup is rendered with ``unsafe_allow_html=True`` and the value comes from
    session data rather than from this module.
    """
    import html

    if not st.session_state.get("__show_birthday__"):
        return
    st.session_state["__show_birthday__"] = False

    # Confetti overlay: twelve emoji falling once, positioned via :nth-child.
    st.markdown(
        """
<style>
.confetti-wrapper { position: relative; width: 100%; height: 0; }
.confetti-area { position: fixed; top: 0; left: 0; width: 100%; height: 100%;
pointer-events: none; z-index: 9999; }
.confetti { position: absolute; top: -5%; font-size: 24px; animation-name: confetti-fall;
animation-timing-function: linear; animation-iteration-count: 1; }
@keyframes confetti-fall {
0% { transform: translateY(-5vh) rotate(0deg); opacity: 1; }
100% { transform: translateY(105vh) rotate(360deg); opacity: 0; }
}
.confetti:nth-child(1) { left: 5%; animation-duration: 3.5s; }
.confetti:nth-child(2) { left: 12%; animation-duration: 4.0s; }
.confetti:nth-child(3) { left: 20%; animation-duration: 3.2s; }
.confetti:nth-child(4) { left: 28%; animation-duration: 4.3s; }
.confetti:nth-child(5) { left: 36%; animation-duration: 3.8s; }
.confetti:nth-child(6) { left: 44%; animation-duration: 4.1s; }
.confetti:nth-child(7) { left: 52%; animation-duration: 3.4s; }
.confetti:nth-child(8) { left: 60%; animation-duration: 4.4s; }
.confetti:nth-child(9) { left: 68%; animation-duration: 3.9s; }
.confetti:nth-child(10) { left: 76%; animation-duration: 4.2s; }
.confetti:nth-child(11) { left: 84%; animation-duration: 3.6s; }
.confetti:nth-child(12) { left: 92%; animation-duration: 4.0s; }
</style>
<div class="confetti-wrapper">
<div class="confetti-area">
<div class="confetti">🎊</div><div class="confetti">🎉</div>
<div class="confetti">🎊</div><div class="confetti">🎉</div>
<div class="confetti">🎊</div><div class="confetti">🎉</div>
<div class="confetti">🎊</div><div class="confetti">🎉</div>
<div class="confetti">🎊</div><div class="confetti">🎉</div>
<div class="confetti">🎊</div><div class="confetti">🎉</div>
</div>
</div>
""",
        unsafe_allow_html=True
    )
    st.balloons()

    # Prefer the display name, then the login, then a generic word.
    nome = st.session_state.get("nome") or st.session_state.get("usuario") or "Usuário"
    nome_html = html.escape(str(nome))
    st.markdown(
        f"""
<div style="display:flex;justify-content:center;align-items:center;text-align:center;width:100%;margin:40px 0;">
<div style="font-size: 36px; font-weight: 800; color:#A020F0;
background:linear-gradient(90deg,#FFF0F6,#F0E6FF);
padding:20px 30px; border-radius:16px; box-shadow:0 4px 10px rgba(0,0,0,.08);">
🎉 Feliz Aniversário, {nome_html}! 🎉
</div>
</div>
""",
        unsafe_allow_html=True
    )

    COR_FRASE = "#0d6efd"
    st.markdown(
        f"""
<div style="display:flex;justify-content:center;align-items:center;text-align:center;width:100%;margin-top:10px;">
<div style="font-size: 20px; font-weight: 500; color:{COR_FRASE}; padding:10px;">
Desejamos a você muitas conquistas e bons embarques ao longo do ano! 💜
</div>
</div>
""",
        unsafe_allow_html=True
    )
# --------------------------------------------------------------------------------------
# MAIN
# --------------------------------------------------------------------------------------
def main():
    """Render the application for one Streamlit script run.

    Flow, top to bottom: initialise session defaults, gate on login, record
    the session heartbeat (plus the admin "online" badge), draw the reload /
    auto-refresh controls, gate on the daily quiz, then build the sidebar
    menu, dispatch to the selected page module and finally show the help
    panel. The function returns early whenever a gate is not satisfied or no
    menu entry is available.
    """
    # Initial session state
    st.session_state.setdefault("logado", False)
    st.session_state.setdefault("usuario", None)
    st.session_state.setdefault("quiz_verificado", False)
    st.session_state.setdefault("user_responses_viewed", False)
    st.session_state.setdefault("nav_target", None)
    st.session_state.setdefault("__auto_refresh_interval_sec__", 60)
    # LOGIN gate: nothing else is rendered until the user is logged in
    if not st.session_state.logado:
        st.session_state.quiz_verificado = False
        exibir_logo_safe(top=True, sidebar=False)
        login_safe()
        return
    # Heartbeat + admin-only badge with the number of users online
    _session_heartbeat(st.session_state.usuario)
    if (st.session_state.get("perfil") or "").strip().lower() == "admin":
        try:
            online_now = _get_active_users_count()
        except Exception:
            online_now = 0
        st.sidebar.markdown(
            f"""
<div style="padding:8px 10px;margin-top:6px;margin-bottom:6px;border-radius:8px;
background:#1e293b; color:#e2e8f0; border:1px solid #334155;">
<span style="font-size:13px;">🟢 Online (últimos {_SESS_TTL_MIN} min)</span><br>
<span style="font-size:22px;font-weight:800;">{online_now}</span>
</div>
""",
            unsafe_allow_html=True
        )
    # 🔄 Reload + auto-refresh controls
    st.sidebar.markdown("---")
    col_reload, col_interval = st.sidebar.columns([1, 1])
    if col_reload.button("🔄 Recarregar (sem sair)", key="__btn_reload_now__"):
        st.rerun()
    # Use a popover when this Streamlit version has one, otherwise an expander;
    # both branches render the same controls with the same widget keys.
    if hasattr(st, "popover"):
        with col_interval.popover("⏱️ Autoatualização"):
            new_val = st.number_input(
                "Intervalo (segundos) — 0 desativa",
                min_value=0, max_value=3600,
                value=int(st.session_state["__auto_refresh_interval_sec__"]),
                step=5, key="__auto_refresh_input__"
            )
            if st.button("Aplicar intervalo", key="__btn_apply_auto_refresh__"):
                st.session_state["__auto_refresh_interval_sec__"] = int(new_val)
                try:
                    if int(new_val) > 0:
                        st.toast(f"Autoatualização ajustada para {int(new_val)}s.", icon="⏱️")
                    else:
                        st.toast("Autoatualização desativada.", icon="⛔")
                except Exception:
                    pass
                st.rerun()
    else:
        with st.sidebar.expander("⏱️ Autoatualização", expanded=False):
            new_val = st.number_input(
                "Intervalo (segundos) — 0 desativa",
                min_value=0, max_value=3600,
                value=int(st.session_state["__auto_refresh_interval_sec__"]),
                step=5, key="__auto_refresh_input__"
            )
            if st.button("Aplicar intervalo", key="__btn_apply_auto_refresh__"):
                st.session_state["__auto_refresh_interval_sec__"] = int(new_val)
                try:
                    if int(new_val) > 0:
                        st.toast(f"Autoatualização ajustada para {int(new_val)}s.", icon="⏱️")
                    else:
                        st.toast("Autoatualização desativada.", icon="⛔")
                except Exception:
                    pass
                st.rerun()
    usuario = st.session_state.usuario
    # NOTE(review): this normalised `perfil` is not read again below; later
    # checks use st.session_state["perfil"] directly.
    perfil = (st.session_state.get("perfil") or "usuario").strip().lower()
    # QUIZ gate: the user must have answered today's quiz once per session
    if not st.session_state.quiz_verificado:
        if not quiz_respondido_hoje(usuario):
            exibir_logo_safe(top=True, sidebar=False)
            quiz.main()
            return
        else:
            st.session_state.quiz_verificado = True
            st.rerun()
    # SYSTEM UNLOCKED
    exibir_logo_safe(top=True, sidebar=True)
    _render_aviso_global_topbar()
    _show_birthday_banner_if_needed()
    st.sidebar.markdown("### Menu | 🎉 Carnaval 🎭 2026!")
    # Active database label
    try:
        banco_lbl = bank_label(current_db_choice()) if _HAS_ROUTER else (
            "🟢 Produção" if current_db_choice() == "prod" else "🔴 Teste"
        )
        st.sidebar.caption(f"🗄️ Banco ativo: {banco_lbl}")
    except Exception:
        pass
    # ========================= Menu =========================
    termo_busca = st.sidebar.text_input("Pesquisar módulo:").strip().lower()
    try:
        ambiente_atual = current_db_choice() if _HAS_ROUTER else "prod"
    except Exception:
        ambiente_atual = "prod"
    grupos_disponiveis = obter_grupos_disponiveis(
        MODULES,
        perfil=st.session_state.get("perfil", "usuario"),
        usuario=st.session_state.get("usuario"),
        ambiente=ambiente_atual,
        verificar_permissao=verificar_permissao
    )
    if not grupos_disponiveis:
        st.sidebar.selectbox("Selecione a operação:", ["Em desenvolvimento"], index=0)
        st.warning("Nenhuma operação disponível para seu perfil/ambiente neste momento.")
        return
    grupo_escolhido = st.sidebar.selectbox("Selecione a operação:", grupos_disponiveis)
    opcoes = obter_modulos_para_grupo(
        MODULES, grupo_escolhido, termo_busca,
        perfil=st.session_state.get("perfil", "usuario"),
        usuario=st.session_state.get("usuario"),
        ambiente=ambiente_atual,
        verificar_permissao=verificar_permissao
    )
    # Debug panel showing the inputs and the result of the menu filtering
    with st.sidebar.expander("🔧 Diagnóstico do menu", expanded=False):
        st.caption(
            f"Perfil: **{st.session_state.get('perfil', '—')}** | "
            f"Grupo: **{grupo_escolhido}** | "
            f"Busca: **{termo_busca or '∅'}** | "
            f"Ambiente: **{ambiente_atual}**"
        )
        try:
            # `opcoes` is consumed as a sequence of (module id, label) pairs
            mods_dbg = [{"id": mid, "label": lbl} for mid, lbl in (opcoes or [])]
        except Exception:
            mods_dbg = []
        st.write("Módulos visíveis (após regras):", mods_dbg if mods_dbg else "—")
    # Failsafe: add the Outlook module when it belongs to the chosen group,
    # is permitted, matches the search term and is not already listed
    try:
        mod_outlook = MODULES.get("outlook_relatorio")
        if mod_outlook:
            mesmo_grupo = (mod_outlook.get("grupo") == grupo_escolhido)
            perfil_ok = verificar_permissao(
                perfil=st.session_state.get("perfil", "usuario"),
                modulo_key="outlook_relatorio",
                usuario=st.session_state.get("usuario"),
                ambiente=ambiente_atual
            )
            ja_nas_opcoes = any(mid == "outlook_relatorio" for mid, _ in (opcoes or []))
            passa_busca = (not termo_busca) or (termo_busca in mod_outlook.get("label", "").strip().lower())
            if mesmo_grupo and perfil_ok and not ja_nas_opcoes and passa_busca:
                opcoes = (opcoes or []) + [("outlook_relatorio", mod_outlook.get("label", "Relatorio portaria"))]
    except Exception:
        pass
    # Failsafe: same rule for the Load repository module
    try:
        mod_repo = MODULES.get("repositorio_load")
        if mod_repo:
            mesmo_grupo_r = (mod_repo.get("grupo") == grupo_escolhido)
            perfil_ok_r = verificar_permissao(
                perfil=st.session_state.get("perfil", "usuario"),
                modulo_key="repositorio_load",
                usuario=st.session_state.get("usuario"),
                ambiente=ambiente_atual
            )
            ja_nas_opcoes_r = any(mid == "repositorio_load" for mid, _ in (opcoes or []))
            passa_busca_r = (not termo_busca) or (termo_busca in mod_repo.get("label", "").strip().lower())
            if mesmo_grupo_r and perfil_ok_r and not ja_nas_opcoes_r and passa_busca_r:
                opcoes = (opcoes or []) + [("repositorio_load", mod_repo.get("label", "Repositório Load"))]
    except Exception:
        pass
    if not opcoes:
        st.sidebar.selectbox("Selecione o módulo:", ["Em desenvolvimento"], index=0)
        st.warning(f"A operação '{grupo_escolhido}' está em desenvolvimento.")
        return
    # ------------------------- selection -------------------------
    labels = [label for _, label in opcoes]
    # A pending programmatic navigation (nav_target) pre-selects its label
    if st.session_state.get("nav_target"):
        target = st.session_state["nav_target"]
        try:
            target_label = next(lbl for mid, lbl in opcoes if mid == target)
            st.session_state["mod_select_label"] = target_label
        except StopIteration:
            pass
    # Fall back to the first label when the stored one is missing or stale
    if "mod_select_label" not in st.session_state or st.session_state["mod_select_label"] not in labels:
        st.session_state["mod_select_label"] = labels[0]
    # NOTE(review): the widget receives both `index=` and a `key` whose value
    # was set through session state above — confirm this is intended.
    escolha_label = st.sidebar.selectbox(
        "Selecione o módulo:",
        labels,
        index=labels.index(st.session_state["mod_select_label"]),
        key="mod_select_label"
    )
    pagina_id = next(mod_id for mod_id, label in opcoes if label == escolha_label)
    # nav_target overrides the selectbox for this run; the lock is released
    # at the end of the function
    if st.session_state.get("nav_target"):
        pagina_id = st.session_state.nav_target
        st.session_state["__nav_lock__"] = True
    else:
        st.session_state["__nav_lock__"] = False
    _check_rerun_qs(pagina_atual=pagina_id)
    # Light auto-refresh — skipped on the pages listed in `sensiveis`; the
    # whole step is skipped if streamlit_autorefresh cannot be imported
    try:
        from streamlit_autorefresh import st_autorefresh
        sensiveis = {"resposta", "outlook_relatorio", "formulario", "recebimento"}
        interval_sec = int(st.session_state.get("__auto_refresh_interval_sec__", 60))
        if (interval_sec > 0) and (pagina_id not in sensiveis):
            st_autorefresh(interval=interval_sec * 1000, key=f"sidebar_autorefresh_{interval_sec}s")
    except Exception:
        pass
    # Logout
    st.sidebar.markdown("---")
    if st.session_state.get("logado") and st.sidebar.button("🚪 Sair (Logout)"):
        logout()
    st.divider()
    # ------------------------- Routing -------------------------
    # An id with no branch below renders nothing.
    if pagina_id == "formulario":
        formulario.main()
    elif pagina_id == "consulta":
        consulta.main()
    elif pagina_id == "relatorio":
        relatorio.main()
    elif pagina_id == "ranking":
        ranking.main()
    elif pagina_id == "quiz":
        # The quiz page is followed by the ranking on the same screen
        quiz.main()
        ranking.main()
    elif pagina_id == "quiz_admin":
        quiz_admin.main()
    elif pagina_id == "usuarios":
        usuarios_admin.main()
    elif pagina_id == "administracao":
        administracao.main()
    elif pagina_id == "videos":
        videos.main()
    elif pagina_id == "auditoria":
        auditoria.main()
    elif pagina_id == "auditoria_cleanup":
        auditoria_cleanup.main()
    elif pagina_id == "importacao":
        importar_excel.main()
    elif pagina_id == "calendario":
        calendario.main()
    elif pagina_id == "jogos":
        # Session keys are initialised here before the games page is called
        st.session_state.setdefault("pontuacao", 0)
        st.session_state.setdefault("rodadas", 0)
        st.session_state.setdefault("ultimo_resultado", None)
        jogos.main()
    elif pagina_id == "temporario":
        db_tools.main()
    elif pagina_id == "db_admin":
        db_admin.main()
    elif pagina_id == "db_monitor":
        db_monitor.main()
    elif pagina_id == "operacao":
        operacao.main()
    elif pagina_id == "resposta":
        resposta.main()
    elif pagina_id == "db_export_import":
        db_export_import.main()
    elif pagina_id == "produtividade_especialista":
        produtividade_especialista.main()
    elif pagina_id == "outlook_relatorio":
        outlook_relatorio.main()
    elif pagina_id == "sugestoes_ioirun":
        # Admins are pointed to the inbox instead of the user suggestion page
        if st.session_state.get("perfil") == "admin":
            st.info("Use a **📬 Caixa de Entrada (Admin)** para responder sugestões.")
        else:
            sugestoes_usuario.main()
    elif pagina_id == "repositorio_load":
        repositorio_load.main()
    elif pagina_id == "rnc":
        rnc.pagina()
    elif pagina_id == "rnc_listagem":
        rnc_listagem.pagina()
    elif pagina_id == "rnc_relatorio":
        rnc_relatorio.pagina()
    elif pagina_id == "repo_rnc":
        repo_rnc.pagina()
    elif pagina_id == "recebimento":
        recebimento.main()
    # ------------------------- Help panel in the sidebar -------------------------
    info_mod_default = INFO_MAP_PAGINA_ID.get(pagina_id, "Geral")
    with st.sidebar.expander("ℹ️ Info • Como usar o sistema", expanded=False):
        st.markdown("""
**Bem-vindo!**
Este painel reúne instruções rápidas para utilizar cada módulo e seus campos.
Selecione o módulo abaixo ou navegue pelo menu — o conteúdo ajusta automaticamente.
""")
        try:
            sel_index = INFO_MODULOS.index(info_mod_default) if info_mod_default in INFO_MODULOS else 0
        except Exception:
            sel_index = 0
        mod_info_sel = st.selectbox("Escolha o módulo para ver instruções:", INFO_MODULOS, index=sel_index, key="info_mod_sel")
        st.markdown(INFO_CONTEUDO.get(mod_info_sel, "_Conteúdo não disponível para este módulo._"))
    # Release nav_target after the first render of the destination page
    if st.session_state.get("__nav_lock__"):
        st.session_state["nav_target"] = None
        st.session_state["__nav_lock__"] = False
# --------------------------------------------------------------------------------------
# Script entry point: render the application.
if __name__ == "__main__":
    main()
# ------------------------- Sidebar footer -------------------------
# Reached only when main() returns normally.
# The e-mail line is shown only for a logged-in session that has an e-mail.
if st.session_state.get("logado") and st.session_state.get("email"):
    st.sidebar.markdown(f"**👤 {st.session_state.email}**")
st.sidebar.divider()
st.sidebar.markdown("Versão: **1.0.0** • Desenvolvedor: **Rodrigo Silva - Ideiasystem | 2026**")