# -*- coding: utf-8 -*-
import os
import io
import streamlit as st
from dotenv import load_dotenv
from datetime import date, datetime, time
# ⬇️ Import correto das utils de operação
from utils_operacao import obter_grupos_disponiveis, obter_modulos_para_grupo
# ✅ Usa toda a largura da página (chamar antes de qualquer outro st.*)
st.set_page_config(layout="wide")
# Carrega variáveis de ambiente
load_dotenv()
# ===============================
# IMPORTAÇÃO DOS MÓDULOS
# ===============================
import formulario
import consulta
import relatorio
import administracao
import quiz
import ranking
import quiz_admin
import usuarios_admin
import videos
import auditoria
import importar_excel
import calendario
import auditoria_cleanup
import jogos
import db_tools
import db_admin
import db_monitor
import operacao
import db_export_import
import resposta # 📬 Admin: Caixa de Entrada IOI‑RUN (módulo interno)
import outlook_relatorio
import repositorio_load
import Produtividade_Especialista as produtividade_especialista
import rnc
import rnc_listagem
import rnc_relatorio
import sugestoes_usuario # 💡 Usuário: Sugestões IOI‑RUN (módulo separado)
import repo_rnc
import recebimento
# (Opcional) Calendário mensal — só se existir no projeto
try:
import calendario_mensal
_HAS_CAL_MENSAL = True
except Exception:
_HAS_CAL_MENSAL = False
from utils_info import INFO_CONTEUDO, INFO_MODULOS, INFO_MAP_PAGINA_ID
from login import login
from utils_permissoes import verificar_permissao
from utils_layout import exibir_logo
from modules_map import MODULES
from banco import engine, Base, SessionLocal, db_info
from models import QuizPontuacao
from models import IOIRunSugestao
from models import AvisoGlobal
# Extras p/ sessões ativas
from uuid import uuid4
from sqlalchemy import text, func, or_
# 🗄️ Banco ativo (Produção/Teste/Treinamento)
# Tentativa de importar utilitários do db_router, com fallback seguro.
try:
from db_router import current_db_choice, bank_label
_HAS_ROUTER = True
except Exception:
_HAS_ROUTER = False
def current_db_choice() -> str:
return st.session_state.get("__db_choice_override__", "prod")
def bank_label(choice: str) -> str:
return {
"prod": "🟢 Produção",
"test": "🔴 Teste",
"treinamento": "🟡 Treinamento",
"train": "🟡 Treinamento",
}.get(choice, "🟢 Produção")
# 🔁 Tentativas de funções adicionais do router (se existirem)
_get_available_choices = None
_set_db_choice_func = None
if _HAS_ROUTER:
try:
from db_router import get_available_choices as _get_available_choices
except Exception:
_get_available_choices = None
# tenta 2 nomes comuns de setter
try:
from db_router import set_current_db_choice as _set_db_choice_func # tipo preferido
except Exception:
try:
from db_router import set_db_choice as _set_db_choice_func # alias
except Exception:
_set_db_choice_func = None
# =========================================
# 🔧 STARTUP HOOKS (1x por container / process)
# =========================================
# - CLEAR_CACHE_ON_START=1 → limpa caches do Streamlit
# - INIT_DB_ON_START=1 → tenta rodar init_db.run() e grava ~/.ioirun/.init_done
_MARKER_DIR = os.path.join(os.path.expanduser("~"), ".ioirun")
os.makedirs(_MARKER_DIR, exist_ok=True)
_INIT_MARK = os.path.join(_MARKER_DIR, ".init_done")
# Limpa cache no start (não depende de marker; roda a cada start quando flag=1)
if os.getenv("CLEAR_CACHE_ON_START", "0") == "1":
try:
st.cache_data.clear()
except Exception:
pass
try:
st.cache_resource.clear()
except Exception:
pass
# Rodar init_db.run() uma única vez por start de container (⚠️ roda no banco padrão do ambiente)
try:
import init_db as _init_db
_HAS_INIT_DB = hasattr(_init_db, "run")
except Exception:
_HAS_INIT_DB = False
if os.getenv("INIT_DB_ON_START", "0") == "1" and _HAS_INIT_DB:
# Só roda se marker não existir OU se variável forçada estiver setada
force = os.getenv("FORCE_INIT_DB_EVERY_START", "0") == "1"
should_run = (not os.path.exists(_INIT_MARK)) or force
if should_run:
try:
_init_db.run()
with open(_INIT_MARK, "w", encoding="utf-8") as f:
f.write(f"init at {datetime.now().isoformat()}\n")
except Exception as e:
# no start, se falhar, apenas registra visualmente
try:
st.sidebar.warning(f"INIT_DB_ON_START falhou: {e}")
except Exception:
pass
# ===============================
# RERUN por querystring (atalho ?rr=1) e aplicar ?db=teste|treinamento
# ===============================
def _get_query_params():
"""Compat: retorna query params como dict (Streamlit novo/antigo)."""
try:
return dict(st.query_params) # Streamlit >= 1.32
except Exception:
try:
return dict(st.experimental_get_query_params()) # Antigo
except Exception:
return {}
def _set_query_params(new_params: dict):
"""Compat: define query params (Streamlit novo/antigo)."""
try:
st.query_params = new_params # Streamlit >= 1.32
except Exception:
try:
st.experimental_set_query_params(**new_params)
except Exception:
pass
def _apply_db_choice_from_qs():
"""Permite selecionar o banco via URL (?db=test|treinamento)."""
try:
params = _get_query_params()
db = params.get("db")
if not db:
return
db = db[0] if isinstance(db, (list, tuple)) else db
sel = str(db).strip().lower()
if _set_db_choice_func:
try:
_set_db_choice_func(sel)
except Exception:
os.environ["DB_CHOICE"] = sel
st.session_state["__db_choice_override__"] = sel
else:
os.environ["DB_CHOICE"] = sel
st.session_state["__db_choice_override__"] = sel
except Exception:
pass
# 🔒 ANTI-TREMOR PATCH: não consumir rr=1 quando login/quiz ainda não concluídos
def _check_rerun_qs(pagina_atual: str = ""):
"""
Se a URL contiver rr=1 (ou true), força um rerun e limpa o parâmetro para evitar loop.
✅ Bloqueios para módulos sensíveis e login/quiz.
"""
try:
if st.session_state.get("__qs_rr_consumed__", False):
return
# Evita rr=1 quando a tela está em login/quiz (logo mais evidente)
if (not st.session_state.get("logado")) or (not st.session_state.get("quiz_verificado")):
return
# Evita rr=1 em módulos sensíveis a rerun/refresh
if pagina_atual in ("resposta", "outlook_relatorio", "formulario"):
return
params = _get_query_params()
rr_raw = params.get("rr", ["0"])
rr = rr_raw[0] if isinstance(rr_raw, (list, tuple)) else str(rr_raw)
if str(rr).lower() in ("1", "true"):
new_params = {k: v for k, v in params.items() if k != "rr"}
_set_query_params(new_params)
st.session_state["__qs_rr_consumed__"] = True
st.rerun()
except Exception:
pass
# =========================================
# DB helper — sessão ciente do ambiente
# =========================================
def _get_db_session():
"""Retorna uma sessão de banco consistente com o ambiente atual."""
# 1) Se o router expuser get_session_for_current_db(), use
try:
from db_router import get_session_factory
return get_session_factory()()
except Exception:
pass
# 2) Se houver engine por ambiente
try:
from db_router import get_engine
from sqlalchemy.orm import sessionmaker
Eng = get_engine()
return sessionmaker(bind=Eng)()
except Exception:
pass
# 3) Fallback
return SessionLocal()
def quiz_respondido_hoje(usuario: str) -> bool:
# ✅ Usar sessão ciente do ambiente
db = _get_db_session()
try:
inicio_dia = datetime.combine(date.today(), time.min)
return (
db.query(QuizPontuacao)
.filter(
QuizPontuacao.usuario == usuario,
QuizPontuacao.data >= inicio_dia
)
.first()
is not None
)
finally:
try:
db.close()
except Exception:
pass
# ===============================
# Sessões ativas (usuários logados agora)
# ===============================
_SESS_TTL_MIN = 5 # janela para considerar "online"
def _get_session_id() -> str:
if "_sid" not in st.session_state:
st.session_state["_sid"] = f"{uuid4()}"
return st.session_state["_sid"]
def _ensure_sessao_table(db) -> None:
"""Cria a tabela sessao_web caso não exista (SQLite/Postgres/MySQL)."""
dialect = db.bind.dialect.name
if dialect == "sqlite":
db.execute(text("""
CREATE TABLE IF NOT EXISTS sessao_web (
id INTEGER PRIMARY KEY AUTOINCREMENT,
usuario TEXT NOT NULL,
session_id TEXT NOT NULL UNIQUE,
last_seen TIMESTAMP NOT NULL,
ativo INTEGER NOT NULL DEFAULT 1
)
"""))
elif dialect in ("postgresql", "postgres"):
db.execute(text("""
CREATE TABLE IF NOT EXISTS sessao_web (
id SERIAL PRIMARY KEY,
usuario TEXT NOT NULL,
session_id TEXT NOT NULL UNIQUE,
last_seen TIMESTAMPTZ NOT NULL,
ativo BOOLEAN NOT NULL DEFAULT TRUE
)
"""))
else: # mysql / mariadb
db.execute(text("""
CREATE TABLE IF NOT EXISTS sessao_web (
id INT AUTO_INCREMENT PRIMARY KEY,
usuario VARCHAR(255) NOT NULL,
session_id VARCHAR(255) NOT NULL UNIQUE,
last_seen TIMESTAMP NOT NULL,
ativo TINYINT(1) NOT NULL DEFAULT 1
)
"""))
db.commit()
def _session_heartbeat(usuario: str) -> None:
"""Atualiza/insere a sessão ativa do usuário com last_seen = now() e faz limpeza básica."""
if not usuario:
return
db = _get_db_session()
try:
_ensure_sessao_table(db)
sid = _get_session_id()
now_sql = "CURRENT_TIMESTAMP"
upd = db.execute(
text(f"UPDATE sessao_web SET last_seen = {now_sql}, ativo = 1 WHERE session_id = :sid"),
{"sid": sid}
)
if upd.rowcount == 0:
db.execute(
text(f"INSERT INTO sessao_web (usuario, session_id, last_seen, ativo) "
f"VALUES (:usuario, :sid, {now_sql}, 1)"),
{"usuario": usuario, "sid": sid}
)
dialect = db.bind.dialect.name
if dialect in ("postgresql", "postgres"):
cleanup_sql = f"UPDATE sessao_web SET ativo = FALSE WHERE last_seen < (NOW() - INTERVAL '{_SESS_TTL_MIN * 2} minutes')"
elif dialect == "sqlite":
cleanup_sql = f"UPDATE sessao_web SET ativo = 0 WHERE last_seen < datetime(CURRENT_TIMESTAMP, '-{_SESS_TTL_MIN * 2} minutes')"
else:
cleanup_sql = f"UPDATE sessao_web SET ativo = 0 WHERE last_seen < DATE_SUB(CURRENT_TIMESTAMP, INTERVAL {_SESS_TTL_MIN * 2} MINUTE)"
db.execute(text(cleanup_sql))
db.commit()
except Exception:
db.rollback()
finally:
try:
db.close()
except Exception:
pass
def _get_active_users_count() -> int:
"""Conta usuários distintos com last_seen dentro da janela (_SESS_TTL_MIN) e ativo=1."""
db = _get_db_session()
try:
_ensure_sessao_table(db)
dialect = db.bind.dialect.name
if dialect in ("postgresql", "postgres"):
threshold = f"(NOW() - INTERVAL '{_SESS_TTL_MIN} minutes')"
elif dialect == "sqlite":
threshold = f"datetime(CURRENT_TIMESTAMP, '-{_SESS_TTL_MIN} minutes')"
else:
threshold = f"DATE_SUB(CURRENT_TIMESTAMP, INTERVAL {_SESS_TTL_MIN} MINUTE)"
res = db.execute(
text(f"SELECT COUNT(DISTINCT usuario) AS c FROM sessao_web WHERE ativo = 1 AND last_seen >= {threshold}")
).fetchone()
return int(res[0] if res and res[0] is not None else 0)
except Exception:
return 0
finally:
try:
db.close()
except Exception:
pass
def _mark_session_inactive() -> None:
"""Marca a sessão atual como inativa (chamar no logout)."""
sid = st.session_state.get("_sid")
if not sid:
return
db = _get_db_session()
try:
_ensure_sessao_table(db)
db.execute(text("UPDATE sessao_web SET ativo = 0 WHERE session_id = :sid"), {"sid": sid})
db.commit()
except Exception:
db.rollback()
finally:
try:
db.close()
except Exception:
pass
# ===============================
# Aviso Global — Util (leitura e sanitização)
# ===============================
def _sanitize_largura(largura_raw: str) -> str:
val = (largura_raw or "").strip()
if not val:
return "100%"
if val.endswith("%") or val.endswith("px"):
return val
if val.isdigit():
return f"{val}px"
return "100%"
def obter_aviso_ativo(db):
try:
aviso = (
db.query(AvisoGlobal)
.filter(AvisoGlobal.ativo == True)
.order_by(AvisoGlobal.updated_at.desc(), AvisoGlobal.created_at.desc())
.first()
)
return aviso
except Exception:
return None
# ===============================
# Aviso Global — Render do banner superior (robusto)
# ===============================
def _render_aviso_global_topbar():
try:
db = _get_db_session()
except Exception as e:
st.sidebar.warning(f"Aviso desativado: sessão indisponível ({e})")
return
aviso = None
try:
aviso = obter_aviso_ativo(db)
except Exception as e:
st.sidebar.warning(f"Aviso desativado: falha ao consultar ({e})")
aviso = None
finally:
try:
db.close()
except Exception:
pass
if not aviso:
return
try:
largura = _sanitize_largura(aviso.largura)
bg = aviso.bg_color or "#FFF3CD"
fg = aviso.text_color or "#664D03"
efeito = (aviso.efeito or "marquee").lower()
velocidade = int(aviso.velocidade or 20)
try:
font_size = max(10, min(int(getattr(aviso, "font_size", 14)), 48))
except Exception:
font_size = 14
altura = 52 # px
# 🔒 ANTI-TREMOR PATCH: padding-top constante e levemente maior que a topbar
padding_top = max(60, altura + 8)
st.markdown(
f"""
""",
unsafe_allow_html=True
)
except Exception as e:
st.sidebar.warning(f"Aviso desativado: erro de render ({e})")
return
# ===============================
# Logout (utilitário)
# ===============================
def logout():
"""Finaliza a sessão do usuário, limpa estados e recarrega a aplicação."""
_mark_session_inactive() # marca esta sessão como inativa
st.session_state.logado = False
st.session_state.usuario = None
st.session_state.perfil = None
st.session_state.nome = None
st.session_state.email = None
st.session_state.quiz_verificado = False
st.rerun()
# ===============================
# 🎂 Banner/efeito de aniversário
# ===============================
def _show_birthday_banner_if_needed():
if st.session_state.get("__show_birthday__"):
st.session_state["__show_birthday__"] = False
st.markdown(
"""
""",
unsafe_allow_html=True
)
st.balloons()
nome = st.session_state.get("nome") or st.session_state.get("usuario") or "Usuário"
st.markdown(
f"""
🎉 Feliz Aniversário, {nome}! 🎉
""",
unsafe_allow_html=True
)
COR_FRASE = "#0d6efd"
st.markdown(
f"""
Desejamos a você muitas conquistas e bons embarques ao longo do ano! 💜
""",
unsafe_allow_html=True
)
# ===============================
# 🔎 Painel de Diagnóstico de Autenticação (Admin)
# ===============================
def _render_auth_diag_panel():
"""Renderiza informações de diagnóstico de autenticação (apenas Admin)."""
try:
info = db_info()
except Exception:
info = {"url": "(indisponível)", "using_router": _HAS_ROUTER}
st.sidebar.markdown("### 🔎 Diagnóstico de Autenticação")
st.sidebar.caption(f"DISABLE_AUTH = {os.getenv('DISABLE_AUTH')}")
st.sidebar.caption(f"ALLOW_EMERGENCY_LOGIN = {os.getenv('ALLOW_EMERGENCY_LOGIN')}")
st.sidebar.caption(f"EMERG_USER set? = {bool(os.getenv('EMERG_USER'))}")
st.sidebar.caption(f"EMERG_PASS_BCRYPT set? = {bool(os.getenv('EMERG_PASS_BCRYPT'))}")
st.sidebar.caption(f"DEMO_USER = {os.getenv('DEMO_USER') or '∅'}")
st.sidebar.caption(f"DEMO_PERFIL = {os.getenv('DEMO_PERFIL') or '∅'}")
st.sidebar.caption(f"DEMO_EMAIL = {os.getenv('DEMO_EMAIL') or '∅'}")
st.sidebar.caption(f"Router habilitado = {_HAS_ROUTER}")
try:
_b_label = bank_label(current_db_choice()) if _HAS_ROUTER else (
"🟢 Produção" if current_db_choice() == "prod" else "🔴 Teste"
)
st.sidebar.caption(f"Banco ativo (label) = {_b_label}")
except Exception as e:
st.sidebar.caption(f"Banco ativo (erro) = {e}")
st.sidebar.caption(f"DB URL = {info.get('url')}")
st.sidebar.caption(f"SessionState.logado = {st.session_state.get('logado')}")
st.sidebar.caption(f"SessionState.usuario = {st.session_state.get('usuario')}")
st.sidebar.caption(f"SessionState.perfil = {st.session_state.get('perfil')}")
try:
from login import login as _login_test # noqa: F401
st.sidebar.success("login.py importado ✅")
except Exception as e:
st.sidebar.error(f"Falha ao importar login.py: {e}")
# ===============================
# 🔒 ANTI-TREMOR PATCH: wrapper idempotente para a LOGO
# ===============================
def exibir_logo_once(top: bool = False, sidebar: bool = False):
"""
Garante que a logo só seja inserida uma vez por posição (top e/ou sidebar) por sessão.
Se for chamada novamente, não reinjeta o HTML nem causa reflows.
"""
state = st.session_state.setdefault("__logo_once__", {"top": False, "sidebar": False})
need_top = bool(top and not state["top"])
need_sidebar = bool(sidebar and not state["sidebar"])
if need_top or need_sidebar:
try:
exibir_logo(top=need_top, sidebar=need_sidebar)
finally:
if need_top:
state["top"] = True
if need_sidebar:
state["sidebar"] = True
# ===============================
# 🔧 Escolha do Banco (UI + setters)
# ===============================
def _db_choice_ui():
"""
Renderiza na sidebar a seleção de banco de dados (Produção/Teste/Treinamento).
Usa db_router quando disponível; caso não, mantém fallback visual.
"""
with st.sidebar.expander("🗄️ Banco de dados", expanded=True):
# Obtém lista de choices
if _get_available_choices:
try:
avail = list(_get_available_choices()) # ex.: ["prod","test","treinamento"]
if not avail:
avail = ["prod", "test", "treinamento"]
except Exception:
avail = ["prod", "test", "treinamento"]
else:
avail = ["prod", "test", "treinamento"]
# Label amigável para cada opção
def _lbl(c):
try:
return bank_label(c)
except Exception:
return {"prod": "🟢 Produção", "test": "🔴 Teste", "treinamento": "🟡 Treinamento"}.get(c, c)
current = current_db_choice() if callable(current_db_choice) else st.session_state.get("__db_choice_override__", "prod")
labels = [_lbl(c) for c in avail]
# Mapeia índice atual
try:
idx = avail.index(current)
except Exception:
idx = 0
sel_label = st.selectbox(
"Escolha o banco:",
options=labels,
index=idx,
key="__db_choice_select__",
)
# Resolve escolha → código
sel_idx = labels.index(sel_label)
sel_code = avail[sel_idx]
# Aplica se mudou
if sel_code != current:
# Tenta aplicar no router (se houver)
applied = False
if _set_db_choice_func:
try:
_set_db_choice_func(sel_code)
applied = True
except Exception:
applied = False
# Fallback: guarda override local e exporta env var (se algum loader usar)
if not applied:
st.session_state["__db_choice_override__"] = sel_code
os.environ["DB_CHOICE"] = sel_code # se o router/engine ler do env
st.toast(f"Banco alterado para: {_lbl(sel_code)}", icon="🗄️")
st.rerun()
# 🔎 DEBUG da escolha/URL (ajuda a diagnosticar se o router está aplicando)
try:
from db_router import current_db_choice as _cur, bank_label as _lbl2, get_engine as _eng
ch = _cur()
eng = _eng()
st.sidebar.caption(f"⚙️ DEBUG • Banco atual: {_lbl2(ch)} ({ch})")
st.sidebar.caption(f"⚙️ DEBUG • URL: {eng.url}")
except Exception as e:
st.sidebar.caption(f"⚙️ DEBUG router fail: {e}")
# ===============================
# 🔴 Zerar banco (somente Admin, e apenas test/treinamento)
# ===============================
def _wipe_current_database_safely():
"""
Zera totalmente o banco ATUAL selecionado no router (apenas test/treinamento).
- SQLite: remove o arquivo físico e recria o schema.
- Postgres/MySQL: drop_all + recreate.
"""
try:
from db_router import current_db_choice
except Exception:
st.error("Router indisponível. Não é possível determinar o banco atual.")
return
from sqlalchemy.engine.url import make_url
from banco import get_engine, init_schema, Base
choice = (current_db_choice() or "").strip().lower()
if choice not in ("test", "treinamento", "train"):
st.error("Operação bloqueada: só é permitido zerar **Teste** ou **Treinamento**.")
return
eng = get_engine()
url = str(eng.url)
url_obj = make_url(url)
try:
if url_obj.drivername.startswith("sqlite"):
# Caminho do arquivo
path = url.replace("sqlite:///", "", 1)
if path.startswith("//"):
path = path[1:]
file_path = os.path.abspath(path)
# Fecha conexões e apaga arquivo
try:
eng.dispose()
except Exception:
pass
try:
if os.path.exists(file_path):
os.remove(file_path)
else:
# Pode estar no ~/.ioirun/
home_alt = os.path.join(os.path.expanduser("~"), ".ioirun", os.path.basename(file_path))
if os.path.exists(home_alt):
os.remove(home_alt)
except Exception as e_rm:
st.error(f"Falha ao remover arquivo SQLite: {e_rm}")
return
# Recria o schema do zero no arquivo recém criado
try:
init_schema()
except Exception as e_schema:
st.error(f"Falha ao recriar schema após remoção do arquivo: {e_schema}")
return
else:
# Postgres/MySQL: drop_all + create_all
try:
Base.metadata.drop_all(bind=eng)
except Exception as e_drop:
st.error(f"Falha no drop de tabelas: {e_drop}")
return
try:
init_schema()
except Exception as e_create:
st.error(f"Falha ao recriar schema: {e_create}")
return
st.success("✅ Banco zerado e schema recriado com sucesso.")
try:
st.toast("Banco zerado com sucesso.", icon="🗑️")
except Exception:
pass
# Rerun para refletir o novo estado
st.rerun()
except Exception as e:
st.error(f"Erro ao zerar o banco: {e}")
# ===============================
# MAIN
# ===============================
def main():
# Aplicar escolha via URL (ex.: ?db=test)
_apply_db_choice_from_qs()
# Estados iniciais
if "logado" not in st.session_state:
st.session_state.logado = False
if "usuario" not in st.session_state:
st.session_state.usuario = None
if "quiz_verificado" not in st.session_state:
st.session_state.quiz_verificado = False
if "user_responses_viewed" not in st.session_state:
st.session_state.user_responses_viewed = False
if "nav_target" not in st.session_state:
st.session_state.nav_target = None
st.session_state.setdefault("__auto_refresh_interval_sec__", 60)
st.session_state.setdefault("__auth_diag__", False) # 🔧 estado do painel de diagnóstico (Admin)
# ===== Seleção de banco (sempre visível na sidebar) =====
_db_choice_ui()
# 🔧 Manutenção do banco atual: Schema + init_db.run() + Zerar banco (Admin)
try:
from banco import init_schema
with st.sidebar.expander("⚙️ Manutenção do banco atual", expanded=False):
if st.button("Criar/atualizar schema neste banco", key="__btn_init_schema__", type="secondary"):
try:
init_schema()
st.sidebar.success("Schema criado/atualizado no banco selecionado com sucesso.")
except Exception as e:
st.sidebar.error(f"Falha ao criar/atualizar schema: {e}")
# Botão para rodar init_db.run() no banco atual
if _HAS_INIT_DB:
c1, c2 = st.columns(2)
sure1 = c1.checkbox("Confirmo", key="__init_confirm1__")
sure2 = c2.checkbox("Estou ciente", key="__init_confirm2__")
if st.button("Rodar init_db.run()", key="__btn_run_initdb__", help="Cria usuários padrão e garante schema", type="secondary"):
if not (sure1 and sure2):
st.warning("Marque as duas confirmações para executar.")
else:
try:
_init_db.run()
st.success("init_db.run() executado com sucesso no banco selecionado.")
except Exception as e:
st.error(f"Falha ao rodar init_db.run(): {e}")
# 🔴 Zerar banco (somente Admin, e nunca em Produção)
try:
perfil_atual = (st.session_state.get("perfil") or "").strip().lower()
ch = current_db_choice() if _HAS_ROUTER else "prod"
except Exception:
perfil_atual = (st.session_state.get("perfil") or "").strip().lower()
ch = "prod"
if perfil_atual == "admin":
st.markdown("---")
st.markdown("### 🧨 Zerar banco (somente Admin)")
# Exibir apenas se não for produção
if ch in ("test", "treinamento", "train"):
st.info("Esta ação **apaga todo o conteúdo** do banco **selecionado** (Test/Treinamento) e recria o schema. **Irreversível.**")
colz1, colz2 = st.columns(2)
z_conf1 = colz1.checkbox("Confirmo que entendo os riscos", key="__wipe_c1__")
z_conf2 = colz2.checkbox("Estou ciente que é irreversível", key="__wipe_c2__")
safe_phrase = st.text_input("Digite **ZERAR** para habilitar", key="__wipe_phrase__")
# Seleção explícita do banco-alvo (precisa coincidir com o atual)
alvo = st.selectbox(
"Banco a zerar (deve ser o banco atual):",
["test", "treinamento"],
index=(0 if ch == "test" else 1 if ch in ("treinamento", "train") else 0),
key="__wipe_target__"
)
btn_wipe = st.button("🗑️ ZERAR banco **ATUAL**", key="__btn_wipe_db__", type="secondary")
if btn_wipe:
if alvo not in ("test", "treinamento"):
st.error("Selecione um alvo válido (test/treinamento).")
elif alvo != ("treinamento" if ch in ("treinamento", "train") else ch):
st.error("O alvo selecionado **não coincide** com o banco atual. Ajuste a seleção.")
elif not (z_conf1 and z_conf2):
st.error("Marque as duas confirmações para continuar.")
elif safe_phrase.strip().upper() != "ZERAR":
st.error("Frase de segurança incorreta. Digite exatamente **ZERAR**.")
else:
_wipe_current_database_safely()
else:
st.warning("Zerar banco indisponível: **Produção** está selecionado. Troque para **Teste** ou **Treinamento** para habilitar.")
except Exception:
pass
# ⚙️ Criação automática do schema (leve) após seleção do banco (rodará no banco correto)
try:
Base.metadata.create_all(bind=engine)
except Exception as e:
st.sidebar.warning(f"Schema não foi criado automaticamente: {e}")
# LOGIN
if not st.session_state.logado:
st.session_state.quiz_verificado = False
# 🔒 ANTI-TREMOR: renderiza logo top apenas 1x
exibir_logo_once(top=True, sidebar=False)
login()
# 🔕 REMOVIDO: mensagem de dica de autenticação no Spaces
return
# 👥 Heartbeat + Badge de usuários logados (APENAS ADMIN)
_session_heartbeat(st.session_state.usuario)
if (st.session_state.get("perfil") or "").strip().lower() == "admin":
try:
online_now = _get_active_users_count()
except Exception:
online_now = 0
st.sidebar.markdown(
f"""
🟢 Online (últimos {_SESS_TTL_MIN} min)
{online_now}
""",
unsafe_allow_html=True
)
# 🔘 Botão Admin: habilitar/desabilitar painel de diagnóstico
st.sidebar.markdown("---")
if st.session_state.get("__auth_diag__"):
if st.sidebar.button("🧪 Desativar diagnóstico de login (Admin)", key="__btn_auth_diag_off__", type="secondary"):
st.session_state["__auth_diag__"] = False
st.rerun()
else:
if st.sidebar.button("🧪 Ativar diagnóstico de login (Admin)", key="__btn_auth_diag_on__", type="secondary"):
st.session_state["__auth_diag__"] = True
st.rerun()
# Render do painel (se ativo)
if st.session_state.get("__auth_diag__"):
_render_auth_diag_panel()
# ============================
# 🔬 Diagnóstico profundo do banco (somente Admin + painel ativo)
# ============================
from sqlalchemy import inspect # import local para evitar custo desnecessário
with st.sidebar.expander("🧪 Diagnóstico Profundo do Banco", expanded=False):
dbx = _get_db_session()
try:
eng = dbx.bind
st.caption(f"Engine URL: {eng.url}")
# 1) SELECT 1
try:
dbx.execute(text("SELECT 1"))
st.success("SELECT 1 OK")
except Exception as e:
st.error(f"SELECT 1 falhou: {e}")
# 2) Tabelas registradas
insp = inspect(eng)
tables = insp.get_table_names()
st.write("Tabelas:", tables)
# 3) Detectar tabela de usuários
user_tables_candidates = ["usuarios", "usuario", "Usuario", "users", "auth_user"]
target_table = next((t for t in user_tables_candidates if t in tables), None)
if not target_table:
st.warning("Tabela de usuários não encontrada. Verifique nomes e migrations.")
else:
st.write(f"Usando tabela: **{target_table}**")
# 4) Colunas
cols = [c["name"] for c in insp.get_columns(target_table)]
st.write("Colunas:", cols)
# 5) Contagem
try:
cnt = dbx.execute(text(f"SELECT COUNT(*) FROM {target_table}")).fetchone()[0]
st.write("Quantidade de registros:", cnt)
except Exception as e:
st.error(f"COUNT(*) falhou: {e}")
# 6) Amostra de usuários (sem senhas)
try:
sel_cols = [c for c in ["usuario", "email", "perfil", "nome"] if c in cols]
sel_expr = ", ".join(sel_cols) if sel_cols else "*"
amostra = dbx.execute(text(f"SELECT {sel_expr} FROM {target_table} LIMIT 5"))
rows = [dict(r._mapping) for r in amostra]
st.write("Amostra:", rows)
except Exception as e:
st.error(f"Amostra falhou: {e}")
# 7) Teste de bcrypt (não grava nada)
import bcrypt
st.markdown("---")
st.caption("Teste de verificação bcrypt (não grava nada)")
test_user = st.text_input("Usuário para testar hash", "", key="__bcrypt_user__")
test_pass = st.text_input("Senha para testar hash", "", type="password", key="__bcrypt_pass__")
if st.button("Testar bcrypt com este usuário", key="__btn_bcrypt_test__", type="secondary"):
try:
pass_cols = [c for c in ["senha_hash", "password_hash", "senha", "hash"] if c in cols]
user_cols = [c for c in ["usuario", "username", "login"] if c in cols]
if not pass_cols or not user_cols:
st.error("Não encontrei colunas de senha/usuário na tabela.")
else:
c_pass = pass_cols[0]
c_user = user_cols[0]
res = dbx.execute(
text(f"SELECT {c_user} AS u, {c_pass} AS h FROM {target_table} WHERE {c_user} = :u LIMIT 1"),
{"u": test_user.strip()}
).fetchone()
if not res:
st.error("Usuário não encontrado na tabela.")
else:
senha_hash = res._mapping.get("h")
if not senha_hash:
st.error(f"Coluna '{c_pass}' vazia/nula — login normal não vai entrar.")
else:
try:
ok = bcrypt.checkpw(test_pass.encode("utf-8"), str(senha_hash).encode("utf-8"))
st.success(f"bcrypt.checkpw = {ok}")
if not ok:
st.info("Se a senha no banco não for bcrypt, gere um hash e atualize o registro.")
except Exception as e:
st.error(f"Falha no bcrypt: {e}")
st.info("Um hash válido geralmente começa com '$2a$' ou '$2b$' e tem 60 caracteres.")
except Exception as e:
st.error(f"Erro no teste: {e}")
finally:
try:
dbx.close()
except Exception:
pass
# ============================
# 🧰 Ações de manutenção (somente Admin)
# ============================
with st.sidebar.expander("🧰 Manutenção (Admin)", expanded=False):
col_m1, col_m2 = st.columns(2)
if col_m1.button("🧹 Limpar cache agora", key="__btn_clear_cache__", help="Limpa cache de dados/recursos", type="secondary"):
try:
st.cache_data.clear()
except Exception:
pass
try:
st.cache_resource.clear()
except Exception:
pass
st.sidebar.success("Caches limpos.")
st.rerun()
# Rodar init_db.run() com dupla confirmação (Admin)
if _HAS_INIT_DB:
c1, c2 = st.columns(2)
sure1 = c1.checkbox("Confirmo", key="__btn_run_initdb_admin_c1__")
sure2 = c2.checkbox("Estou ciente", key="__btn_run_initdb_admin_c2__")
if st.button("🧬 Rodar init_db.run()", key="__btn_run_initdb_admin__", type="secondary"):
if not (sure1 and sure2):
st.warning("Marque as duas confirmações para executar.")
else:
try:
_init_db.run()
with open(_INIT_MARK, "w", encoding="utf-8") as f:
f.write(f"init manual at {datetime.now().isoformat()}\n")
st.success("init_db.run() executado com sucesso.")
except Exception as e:
st.error(f"Falha no init_db.run(): {e}")
# 🔄 Botão de Recarregar (mantém a sessão ativa) + ⏱️ Controle do intervalo
st.sidebar.markdown("---")
col_reload, col_interval = st.sidebar.columns([1, 1])
if col_reload.button("🔄 Recarregar (sem sair)", key="__btn_reload_now__", type="secondary"):
st.rerun()
if hasattr(st, "popover"):
with col_interval.popover("⏱️ Autoatualização"):
new_val = st.number_input(
"Intervalo (segundos) — 0 desativa",
min_value=0, max_value=3600,
value=int(st.session_state["__auto_refresh_interval_sec__"]),
step=5, key="__auto_refresh_input__"
)
if st.button("Aplicar intervalo", key="__btn_apply_auto_refresh__", type="secondary"):
st.session_state["__auto_refresh_interval_sec__"] = int(new_val)
try:
if int(new_val) > 0:
st.toast(f"Autoatualização ajustada para {int(new_val)}s.", icon="⏱️")
else:
st.toast("Autoatualização desativada.", icon="⛔")
except Exception:
pass
st.rerun()
else:
with st.sidebar.expander("⏱️ Autoatualização", expanded=False):
new_val = st.number_input(
"Intervalo (segundos) — 0 desativa",
min_value=0, max_value=3600,
value=int(st.session_state["__auto_refresh_interval_sec__"]),
step=5, key="__auto_refresh_input__"
)
if st.button("Aplicar intervalo", key="__btn_apply_auto_refresh__", type="secondary"):
st.session_state["__auto_refresh_interval_sec__"] = int(new_val)
try:
if int(new_val) > 0:
st.toast(f"Autoatualização ajustada para {int(new_val)}s.", icon="⏱️")
else:
st.toast("Autoatualização desativada.", icon="⛔")
except Exception:
pass
st.rerun()
usuario = st.session_state.usuario
perfil = (st.session_state.get("perfil") or "usuario").strip().lower()
# QUIZ
if not st.session_state.quiz_verificado:
if not quiz_respondido_hoje(usuario):
# 🔒 ANTI-TREMOR: render 1x
exibir_logo_once(top=True, sidebar=False)
quiz.main()
return
else:
st.session_state.quiz_verificado = True
st.rerun()
# SISTEMA LIBERADO
# 🔒 ANTI-TREMOR: render 1x (topo + sidebar)
exibir_logo_once(top=True, sidebar=True)
_render_aviso_global_topbar()
_show_birthday_banner_if_needed()
st.sidebar.markdown("### Menu | 🎉 Carnaval 🎭 2026!")
# Banco ativo na sidebar (badge rápido)
try:
_b_label = bank_label(current_db_choice()) if _HAS_ROUTER else bank_label(current_db_choice())
st.sidebar.caption(f"🗄️ Banco ativo: {_b_label}")
except Exception:
pass
# =========================
# Notificações no sidebar
# =========================
# --- Admin: pendentes ---
if perfil == "admin":
try:
db = _get_db_session()
pendentes = db.query(IOIRunSugestao).filter(func.lower(IOIRunSugestao.status) == "pendente").count()
except Exception:
pendentes = 0
finally:
try:
db.close()
except Exception:
pass
if pendentes > 0:
st.sidebar.markdown(
"""
🔔 {pendentes} sugestão(ões) pendente(s)
Acesse a caixa de entrada para responder.
""".format(pendentes=pendentes),
unsafe_allow_html=True
)
# 👉 Direciona para o MESMO módulo do menu (resposta.main())
if st.sidebar.button("📬 Abrir Caixa de Entrada (Admin)", key="__btn_open_inbox__", type="secondary"):
st.session_state.nav_target = "resposta"
st.rerun()
# --- Usuário: respostas novas (após último 'visto') ---
if perfil != "admin":
last_seen_dt = st.session_state.get("__user_last_answer_seen__")
try:
db = _get_db_session()
last_answer_dt_row = (
db.query(IOIRunSugestao.data_resposta)
.filter(
IOIRunSugestao.usuario == usuario,
func.lower(IOIRunSugestao.status) == "respondida",
IOIRunSugestao.data_resposta != None
)
.order_by(IOIRunSugestao.data_resposta.desc())
.first()
)
last_answer_dt = last_answer_dt_row[0] if last_answer_dt_row else None
if last_answer_dt and (not last_seen_dt or last_answer_dt > last_seen_dt):
st.session_state.user_responses_viewed = False
novas_respostas = (
db.query(IOIRunSugestao)
.filter(
IOIRunSugestao.usuario == usuario,
func.lower(IOIRunSugestao.status) == "respondida",
(IOIRunSugestao.data_resposta > last_seen_dt) if last_seen_dt else (IOIRunSugestao.data_resposta != None)
)
.count()
)
except Exception:
novas_respostas = 0
finally:
try:
db.close()
except Exception:
pass
if novas_respostas > 0 and not st.session_state.get("user_responses_viewed", False):
st.sidebar.markdown(
"""
🔔 {resps} resposta(s) nova(s) para suas sugestões
Clique para ver suas respostas.
""".format(resps=novas_respostas),
unsafe_allow_html=True
)
if not st.session_state.get("__user_toast_shown__"):
try:
st.toast("Você tem novas respostas do IOI‑RUN. Clique em '📥 Ver respostas'.", icon="💬")
except Exception:
pass
st.session_state["__user_toast_shown__"] = True
if st.sidebar.button("📥 Ver respostas", key="__btn_view_answers__", type="secondary"):
st.session_state.nav_target = "sugestoes_ioirun"
st.session_state.user_responses_viewed = True
st.rerun()
else:
st.session_state["__user_toast_shown__"] = False
# ------------------------- Menu lateral -------------------------
termo_busca = st.sidebar.text_input("Pesquisar módulo:").strip().lower()
try:
ambiente_atual = current_db_choice() if _HAS_ROUTER else current_db_choice()
except Exception:
ambiente_atual = current_db_choice()
grupos_disponiveis = obter_grupos_disponiveis(
MODULES,
perfil=st.session_state.get("perfil", "usuario"),
usuario=st.session_state.get("usuario"),
ambiente=ambiente_atual,
verificar_permissao=verificar_permissao
)
if not grupos_disponiveis:
st.sidebar.selectbox("Selecione a operação:", ["Em desenvolvimento"], index=0)
st.warning("Nenhuma operação disponível para seu perfil/ambiente neste momento.")
return
grupo_escolhido = st.sidebar.selectbox("Selecione a operação:", grupos_disponiveis)
opcoes = obter_modulos_para_grupo(
MODULES, grupo_escolhido, termo_busca,
perfil=st.session_state.get("perfil", "usuario"),
usuario=st.session_state.get("usuario"),
ambiente=ambiente_atual,
verificar_permissao=verificar_permissao
)
with st.sidebar.expander("🔧 Diagnóstico do menu", expanded=False):
st.caption(f"Perfil: **{st.session_state.get('perfil', '—')}** | Grupo: **{grupo_escolhido}** | Busca: **{termo_busca or '∅'}** | Ambiente: **{ambiente_atual}**")
try:
mods_dbg = [{"id": mid, "label": lbl} for mid, lbl in (opcoes or [])]
except Exception:
mods_dbg = []
st.write("Módulos visíveis (após regras):", mods_dbg if mods_dbg else "—")
# Failsafe outlook_relatorio
try:
mod_outlook = MODULES.get("outlook_relatorio")
if mod_outlook:
mesmo_grupo = (mod_outlook.get("grupo") == grupo_escolhido)
perfil_ok = verificar_permissao(
perfil=st.session_state.get("perfil", "usuario"),
modulo_key="outlook_relatorio",
usuario=st.session_state.get("usuario"),
ambiente=ambiente_atual
)
ja_nas_opcoes = any(mid == "outlook_relatorio" for mid, _ in (opcoes or []))
passa_busca = (not termo_busca) or (termo_busca in mod_outlook.get("label", "").strip().lower())
if mesmo_grupo and perfil_ok and not ja_nas_opcoes and passa_busca:
opcoes = (opcoes or []) + [("outlook_relatorio", mod_outlook.get("label", "Relatório portaria"))]
except Exception:
pass
# Failsafe repositorio_load
try:
mod_repo = MODULES.get("repositorio_load")
if mod_repo:
mesmo_grupo_r = (mod_repo.get("grupo") == grupo_escolhido)
perfil_ok_r = verificar_permissao(
perfil=st.session_state.get("perfil", "usuario"),
modulo_key="repositorio_load",
usuario=st.session_state.get("usuario"),
ambiente=ambiente_atual
)
ja_nas_opcoes_r = any(mid == "repositorio_load" for mid, _ in (opcoes or []))
passa_busca_r = (not termo_busca) or (termo_busca in mod_repo.get("label", "").strip().lower())
if mesmo_grupo_r and perfil_ok_r and not ja_nas_opcoes_r and passa_busca_r:
opcoes = (opcoes or []) + [("repositorio_load", mod_repo.get("label", "Repositório Load"))]
except Exception:
pass
if not opcoes:
st.sidebar.selectbox("Selecione o módulo:", ["Em desenvolvimento"], index=0)
st.warning(f"A operação '{grupo_escolhido}' está em desenvolvimento.")
return
# ============================================================
# 🔒 Fix: selectbox com 'key' + seleção forçada para 'resposta'
# quando vier de nav_target (sidebar) ou quando já estivermos na página.
# ============================================================
labels = [label for _, label in opcoes]
# Se foi solicitado nav_target, injeta a label alvo antes do selectbox
if st.session_state.get("nav_target"):
target = st.session_state["nav_target"]
try:
target_label = next(lbl for mid, lbl in opcoes if mid == target)
st.session_state["mod_select_label"] = target_label
except StopIteration:
pass
# Inicializa/persiste seleção
if "mod_select_label" not in st.session_state or st.session_state["mod_select_label"] not in labels:
st.session_state["mod_select_label"] = labels[0]
escolha_label = st.sidebar.selectbox(
"Selecione o módulo:",
labels,
index=labels.index(st.session_state["mod_select_label"]),
key="mod_select_label"
)
pagina_id = next(mod_id for mod_id, label in opcoes if label == escolha_label)
# ✅ Navegação com lock (evita disputa com outros reruns)
if st.session_state.get("nav_target"):
pagina_id = st.session_state.nav_target
st.session_state["__nav_lock__"] = True
else:
st.session_state["__nav_lock__"] = False
# 🔎 Agora que sabemos a página atual, tratamos rr=1 com segurança
_check_rerun_qs(pagina_atual=pagina_id)
# ⏱️ Auto-refresh leve do sidebar — NÃO quando em Inbox/Admin/Outlook/Formulário/Recebimento
try:
from streamlit_autorefresh import st_autorefresh
is_inbox_admin = (pagina_id == "resposta")
is_outlook_rel = (pagina_id == "outlook_relatorio")
is_formulario = (pagina_id == "formulario")
is_recebimento = (pagina_id == "recebimento")
# 🔒 ANTI-TREMOR: incluir telas com logo topo (login/quiz/primeira tela)
is_login_or_quiz = (not st.session_state.get("logado")) or (not st.session_state.get("quiz_verificado"))
interval_sec = int(st.session_state.get("__auto_refresh_interval_sec__", 60))
if (interval_sec > 0) and not (is_inbox_admin or is_outlook_rel or is_formulario or is_recebimento or is_login_or_quiz):
st_autorefresh(interval=interval_sec * 1000, key=f"sidebar_autorefresh_{interval_sec}s")
except Exception:
pass
# Logout
st.sidebar.markdown("---")
if st.session_state.get("logado"):
if st.sidebar.button("🚪 Sair (Logout)", key="__btn_logout__", type="primary"):
logout()
st.divider()
# ------------------------- Roteamento -------------------------
if pagina_id == "formulario":
formulario.main()
elif pagina_id == "consulta":
consulta.main()
elif pagina_id == "relatorio":
relatorio.main()
elif pagina_id == "ranking":
ranking.main()
elif pagina_id == "quiz":
quiz.main()
ranking.main()
elif pagina_id == "quiz_admin":
quiz_admin.main()
elif pagina_id == "usuarios":
usuarios_admin.main()
elif pagina_id == "administracao":
administracao.main()
elif pagina_id == "videos":
videos.main()
elif pagina_id == "auditoria":
auditoria.main()
elif pagina_id == "auditoria_cleanup":
auditoria_cleanup.main()
elif pagina_id == "importacao":
importar_excel.main()
elif pagina_id == "calendario":
calendario.main()
elif pagina_id == "calendario_mensal" and _HAS_CAL_MENSAL:
calendario_mensal.main()
elif pagina_id == "jogos":
st.session_state.setdefault("pontuacao", 0)
st.session_state.setdefault("rodadas", 0)
st.session_state.setdefault("ultimo_resultado", None)
jogos.main()
elif pagina_id == "temporario":
db_tools.main()
elif pagina_id == "db_admin":
db_admin.main()
elif pagina_id == "db_monitor":
db_monitor.main()
elif pagina_id == "operacao":
operacao.main()
elif pagina_id == "resposta": # 📬 Admin
resposta.main()
elif pagina_id == "db_export_import":
db_export_import.main()
elif pagina_id == "produtividade_especialista":
produtividade_especialista.main()
elif pagina_id == "outlook_relatorio":
outlook_relatorio.main()
elif pagina_id == "sugestoes_ioirun": # 💡 Usuário
if st.session_state.get("perfil") == "admin":
st.info("Use a **📬 Caixa de Entrada (Admin)** para responder sugestões.")
else:
sugestoes_usuario.main()
elif pagina_id == "repositorio_load":
repositorio_load.main()
elif pagina_id == "rnc":
rnc.pagina()
elif pagina_id == "rnc_listagem":
rnc_listagem.pagina()
elif pagina_id == "rnc_relatorio":
rnc_relatorio.pagina()
elif pagina_id == "repo_rnc":
repo_rnc.pagina()
elif pagina_id == "recebimento":
recebimento.main()
# ------------------------------------------------------
# ℹ️ INFO — Guia passo a passo de uso (no sidebar)
# ------------------------------------------------------
info_mod_default = INFO_MAP_PAGINA_ID.get(pagina_id, "Geral")
with st.sidebar.expander("ℹ️ Info • Como usar o sistema", expanded=False):
st.markdown("""
**Bem-vindo!**
Este painel reúne instruções rápidas para utilizar cada módulo e seus campos.
Selecione o módulo abaixo ou navegue pelo menu — o conteúdo ajusta automaticamente.
""")
mod_info_sel = st.selectbox(
"Escolha o módulo para ver instruções:",
INFO_MODULOS,
index=INFO_MODULOS.index(info_mod_default) if info_mod_default in INFO_MODULOS else 0,
key="info_mod_sel"
)
st.markdown(INFO_CONTEUDO.get(mod_info_sel, "_Conteúdo não disponível para este módulo._"))
# ✅ Libera o nav_target após a 1ª render da página de destino
if st.session_state.get("__nav_lock__"):
st.session_state["nav_target"] = None
st.session_state["__nav_lock__"] = False
if __name__ == "__main__":
main()
# -------------------------
# Desenvolvedor e versão
# -------------------------
if st.session_state.get("logado") and st.session_state.get("email"):
st.sidebar.markdown(
f"""
👤
{st.session_state.email}
""",
unsafe_allow_html=True
)
st.sidebar.markdown(
"""
Versão: 1.0.0 • Desenvolvedor: Rodrigo Silva - Ideiasystem | 2026
""",
unsafe_allow_html=True
)