# -*- coding: utf-8 -*- import os import io import streamlit as st from dotenv import load_dotenv from datetime import date, datetime, time # ⬇️ Import correto das utils de operação from utils_operacao import obter_grupos_disponiveis, obter_modulos_para_grupo # ✅ Usa toda a largura da página (chamar antes de qualquer outro st.*) st.set_page_config(layout="wide") # Carrega variáveis de ambiente load_dotenv() # =============================== # IMPORTAÇÃO DOS MÓDULOS # =============================== import formulario import consulta import relatorio import administracao import quiz import ranking import quiz_admin import usuarios_admin import videos import auditoria import importar_excel import calendario import auditoria_cleanup import jogos import db_tools import db_admin import db_monitor import operacao import db_export_import import resposta # 📬 Admin: Caixa de Entrada IOI‑RUN (módulo interno) import outlook_relatorio import repositorio_load import Produtividade_Especialista as produtividade_especialista import rnc import rnc_listagem import rnc_relatorio import sugestoes_usuario # 💡 Usuário: Sugestões IOI‑RUN (módulo separado) import repo_rnc import recebimento # (Opcional) Calendário mensal — só se existir no projeto try: import calendario_mensal _HAS_CAL_MENSAL = True except Exception: _HAS_CAL_MENSAL = False from utils_info import INFO_CONTEUDO, INFO_MODULOS, INFO_MAP_PAGINA_ID from login import login from utils_permissoes import verificar_permissao from utils_layout import exibir_logo from modules_map import MODULES from banco import engine, Base, SessionLocal, db_info from models import QuizPontuacao from models import IOIRunSugestao from models import AvisoGlobal # Extras p/ sessões ativas from uuid import uuid4 from sqlalchemy import text, func, or_ # 🗄️ Banco ativo (Produção/Teste/Treinamento) # Tentativa de importar utilitários do db_router, com fallback seguro. try: from db_router import current_db_choice, bank_label _HAS_ROUTER = True except Exception: _HAS_ROUTER = False def current_db_choice() -> str: return st.session_state.get("__db_choice_override__", "prod") def bank_label(choice: str) -> str: return { "prod": "🟢 Produção", "test": "🔴 Teste", "treinamento": "🟡 Treinamento", "train": "🟡 Treinamento", }.get(choice, "🟢 Produção") # 🔁 Tentativas de funções adicionais do router (se existirem) _get_available_choices = None _set_db_choice_func = None if _HAS_ROUTER: try: from db_router import get_available_choices as _get_available_choices except Exception: _get_available_choices = None # tenta 2 nomes comuns de setter try: from db_router import set_current_db_choice as _set_db_choice_func # tipo preferido except Exception: try: from db_router import set_db_choice as _set_db_choice_func # alias except Exception: _set_db_choice_func = None # ========================================= # 🔧 STARTUP HOOKS (1x por container / process) # ========================================= # - CLEAR_CACHE_ON_START=1 → limpa caches do Streamlit # - INIT_DB_ON_START=1 → tenta rodar init_db.run() e grava ~/.ioirun/.init_done _MARKER_DIR = os.path.join(os.path.expanduser("~"), ".ioirun") os.makedirs(_MARKER_DIR, exist_ok=True) _INIT_MARK = os.path.join(_MARKER_DIR, ".init_done") # Limpa cache no start (não depende de marker; roda a cada start quando flag=1) if os.getenv("CLEAR_CACHE_ON_START", "0") == "1": try: st.cache_data.clear() except Exception: pass try: st.cache_resource.clear() except Exception: pass # Rodar init_db.run() uma única vez por start de container (⚠️ roda no banco padrão do ambiente) try: import init_db as _init_db _HAS_INIT_DB = hasattr(_init_db, "run") except Exception: _HAS_INIT_DB = False if os.getenv("INIT_DB_ON_START", "0") == "1" and _HAS_INIT_DB: # Só roda se marker não existir OU se variável forçada estiver setada force = os.getenv("FORCE_INIT_DB_EVERY_START", "0") == "1" should_run = (not os.path.exists(_INIT_MARK)) or force if should_run: try: _init_db.run() with open(_INIT_MARK, "w", encoding="utf-8") as f: f.write(f"init at {datetime.now().isoformat()}\n") except Exception as e: # no start, se falhar, apenas registra visualmente try: st.sidebar.warning(f"INIT_DB_ON_START falhou: {e}") except Exception: pass # =============================== # RERUN por querystring (atalho ?rr=1) e aplicar ?db=teste|treinamento # =============================== def _get_query_params(): """Compat: retorna query params como dict (Streamlit novo/antigo).""" try: return dict(st.query_params) # Streamlit >= 1.32 except Exception: try: return dict(st.experimental_get_query_params()) # Antigo except Exception: return {} def _set_query_params(new_params: dict): """Compat: define query params (Streamlit novo/antigo).""" try: st.query_params = new_params # Streamlit >= 1.32 except Exception: try: st.experimental_set_query_params(**new_params) except Exception: pass def _apply_db_choice_from_qs(): """Permite selecionar o banco via URL (?db=test|treinamento).""" try: params = _get_query_params() db = params.get("db") if not db: return db = db[0] if isinstance(db, (list, tuple)) else db sel = str(db).strip().lower() if _set_db_choice_func: try: _set_db_choice_func(sel) except Exception: os.environ["DB_CHOICE"] = sel st.session_state["__db_choice_override__"] = sel else: os.environ["DB_CHOICE"] = sel st.session_state["__db_choice_override__"] = sel except Exception: pass # 🔒 ANTI-TREMOR PATCH: não consumir rr=1 quando login/quiz ainda não concluídos def _check_rerun_qs(pagina_atual: str = ""): """ Se a URL contiver rr=1 (ou true), força um rerun e limpa o parâmetro para evitar loop. ✅ Bloqueios para módulos sensíveis e login/quiz. """ try: if st.session_state.get("__qs_rr_consumed__", False): return # Evita rr=1 quando a tela está em login/quiz (logo mais evidente) if (not st.session_state.get("logado")) or (not st.session_state.get("quiz_verificado")): return # Evita rr=1 em módulos sensíveis a rerun/refresh if pagina_atual in ("resposta", "outlook_relatorio", "formulario"): return params = _get_query_params() rr_raw = params.get("rr", ["0"]) rr = rr_raw[0] if isinstance(rr_raw, (list, tuple)) else str(rr_raw) if str(rr).lower() in ("1", "true"): new_params = {k: v for k, v in params.items() if k != "rr"} _set_query_params(new_params) st.session_state["__qs_rr_consumed__"] = True st.rerun() except Exception: pass # ========================================= # DB helper — sessão ciente do ambiente # ========================================= def _get_db_session(): """Retorna uma sessão de banco consistente com o ambiente atual.""" # 1) Se o router expuser get_session_for_current_db(), use try: from db_router import get_session_factory return get_session_factory()() except Exception: pass # 2) Se houver engine por ambiente try: from db_router import get_engine from sqlalchemy.orm import sessionmaker Eng = get_engine() return sessionmaker(bind=Eng)() except Exception: pass # 3) Fallback return SessionLocal() def quiz_respondido_hoje(usuario: str) -> bool: # ✅ Usar sessão ciente do ambiente db = _get_db_session() try: inicio_dia = datetime.combine(date.today(), time.min) return ( db.query(QuizPontuacao) .filter( QuizPontuacao.usuario == usuario, QuizPontuacao.data >= inicio_dia ) .first() is not None ) finally: try: db.close() except Exception: pass # =============================== # Sessões ativas (usuários logados agora) # =============================== _SESS_TTL_MIN = 5 # janela para considerar "online" def _get_session_id() -> str: if "_sid" not in st.session_state: st.session_state["_sid"] = f"{uuid4()}" return st.session_state["_sid"] def _ensure_sessao_table(db) -> None: """Cria a tabela sessao_web caso não exista (SQLite/Postgres/MySQL).""" dialect = db.bind.dialect.name if dialect == "sqlite": db.execute(text(""" CREATE TABLE IF NOT EXISTS sessao_web ( id INTEGER PRIMARY KEY AUTOINCREMENT, usuario TEXT NOT NULL, session_id TEXT NOT NULL UNIQUE, last_seen TIMESTAMP NOT NULL, ativo INTEGER NOT NULL DEFAULT 1 ) """)) elif dialect in ("postgresql", "postgres"): db.execute(text(""" CREATE TABLE IF NOT EXISTS sessao_web ( id SERIAL PRIMARY KEY, usuario TEXT NOT NULL, session_id TEXT NOT NULL UNIQUE, last_seen TIMESTAMPTZ NOT NULL, ativo BOOLEAN NOT NULL DEFAULT TRUE ) """)) else: # mysql / mariadb db.execute(text(""" CREATE TABLE IF NOT EXISTS sessao_web ( id INT AUTO_INCREMENT PRIMARY KEY, usuario VARCHAR(255) NOT NULL, session_id VARCHAR(255) NOT NULL UNIQUE, last_seen TIMESTAMP NOT NULL, ativo TINYINT(1) NOT NULL DEFAULT 1 ) """)) db.commit() def _session_heartbeat(usuario: str) -> None: """Atualiza/insere a sessão ativa do usuário com last_seen = now() e faz limpeza básica.""" if not usuario: return db = _get_db_session() try: _ensure_sessao_table(db) sid = _get_session_id() now_sql = "CURRENT_TIMESTAMP" upd = db.execute( text(f"UPDATE sessao_web SET last_seen = {now_sql}, ativo = 1 WHERE session_id = :sid"), {"sid": sid} ) if upd.rowcount == 0: db.execute( text(f"INSERT INTO sessao_web (usuario, session_id, last_seen, ativo) " f"VALUES (:usuario, :sid, {now_sql}, 1)"), {"usuario": usuario, "sid": sid} ) dialect = db.bind.dialect.name if dialect in ("postgresql", "postgres"): cleanup_sql = f"UPDATE sessao_web SET ativo = FALSE WHERE last_seen < (NOW() - INTERVAL '{_SESS_TTL_MIN * 2} minutes')" elif dialect == "sqlite": cleanup_sql = f"UPDATE sessao_web SET ativo = 0 WHERE last_seen < datetime(CURRENT_TIMESTAMP, '-{_SESS_TTL_MIN * 2} minutes')" else: cleanup_sql = f"UPDATE sessao_web SET ativo = 0 WHERE last_seen < DATE_SUB(CURRENT_TIMESTAMP, INTERVAL {_SESS_TTL_MIN * 2} MINUTE)" db.execute(text(cleanup_sql)) db.commit() except Exception: db.rollback() finally: try: db.close() except Exception: pass def _get_active_users_count() -> int: """Conta usuários distintos com last_seen dentro da janela (_SESS_TTL_MIN) e ativo=1.""" db = _get_db_session() try: _ensure_sessao_table(db) dialect = db.bind.dialect.name if dialect in ("postgresql", "postgres"): threshold = f"(NOW() - INTERVAL '{_SESS_TTL_MIN} minutes')" elif dialect == "sqlite": threshold = f"datetime(CURRENT_TIMESTAMP, '-{_SESS_TTL_MIN} minutes')" else: threshold = f"DATE_SUB(CURRENT_TIMESTAMP, INTERVAL {_SESS_TTL_MIN} MINUTE)" res = db.execute( text(f"SELECT COUNT(DISTINCT usuario) AS c FROM sessao_web WHERE ativo = 1 AND last_seen >= {threshold}") ).fetchone() return int(res[0] if res and res[0] is not None else 0) except Exception: return 0 finally: try: db.close() except Exception: pass def _mark_session_inactive() -> None: """Marca a sessão atual como inativa (chamar no logout).""" sid = st.session_state.get("_sid") if not sid: return db = _get_db_session() try: _ensure_sessao_table(db) db.execute(text("UPDATE sessao_web SET ativo = 0 WHERE session_id = :sid"), {"sid": sid}) db.commit() except Exception: db.rollback() finally: try: db.close() except Exception: pass # =============================== # Aviso Global — Util (leitura e sanitização) # =============================== def _sanitize_largura(largura_raw: str) -> str: val = (largura_raw or "").strip() if not val: return "100%" if val.endswith("%") or val.endswith("px"): return val if val.isdigit(): return f"{val}px" return "100%" def obter_aviso_ativo(db): try: aviso = ( db.query(AvisoGlobal) .filter(AvisoGlobal.ativo == True) .order_by(AvisoGlobal.updated_at.desc(), AvisoGlobal.created_at.desc()) .first() ) return aviso except Exception: return None # =============================== # Aviso Global — Render do banner superior (robusto) # =============================== def _render_aviso_global_topbar(): try: db = _get_db_session() except Exception as e: st.sidebar.warning(f"Aviso desativado: sessão indisponível ({e})") return aviso = None try: aviso = obter_aviso_ativo(db) except Exception as e: st.sidebar.warning(f"Aviso desativado: falha ao consultar ({e})") aviso = None finally: try: db.close() except Exception: pass if not aviso: return try: largura = _sanitize_largura(aviso.largura) bg = aviso.bg_color or "#FFF3CD" fg = aviso.text_color or "#664D03" efeito = (aviso.efeito or "marquee").lower() velocidade = int(aviso.velocidade or 20) try: font_size = max(10, min(int(getattr(aviso, "font_size", 14)), 48)) except Exception: font_size = 14 altura = 52 # px # 🔒 ANTI-TREMOR PATCH: padding-top constante e levemente maior que a topbar padding_top = max(60, altura + 8) st.markdown( f"""
{aviso.mensagem}
""", unsafe_allow_html=True ) except Exception as e: st.sidebar.warning(f"Aviso desativado: erro de render ({e})") return # =============================== # Logout (utilitário) # =============================== def logout(): """Finaliza a sessão do usuário, limpa estados e recarrega a aplicação.""" _mark_session_inactive() # marca esta sessão como inativa st.session_state.logado = False st.session_state.usuario = None st.session_state.perfil = None st.session_state.nome = None st.session_state.email = None st.session_state.quiz_verificado = False st.rerun() # =============================== # 🎂 Banner/efeito de aniversário # =============================== def _show_birthday_banner_if_needed(): if st.session_state.get("__show_birthday__"): st.session_state["__show_birthday__"] = False st.markdown( """
🎊
🎉
🎊
🎉
🎊
🎉
🎊
🎉
🎊
🎉
🎊
🎉
""", unsafe_allow_html=True ) st.balloons() nome = st.session_state.get("nome") or st.session_state.get("usuario") or "Usuário" st.markdown( f"""
🎉 Feliz Aniversário, {nome}! 🎉
""", unsafe_allow_html=True ) COR_FRASE = "#0d6efd" st.markdown( f"""
Desejamos a você muitas conquistas e bons embarques ao longo do ano! 💜
""", unsafe_allow_html=True ) # =============================== # 🔎 Painel de Diagnóstico de Autenticação (Admin) # =============================== def _render_auth_diag_panel(): """Renderiza informações de diagnóstico de autenticação (apenas Admin).""" try: info = db_info() except Exception: info = {"url": "(indisponível)", "using_router": _HAS_ROUTER} st.sidebar.markdown("### 🔎 Diagnóstico de Autenticação") st.sidebar.caption(f"DISABLE_AUTH = {os.getenv('DISABLE_AUTH')}") st.sidebar.caption(f"ALLOW_EMERGENCY_LOGIN = {os.getenv('ALLOW_EMERGENCY_LOGIN')}") st.sidebar.caption(f"EMERG_USER set? = {bool(os.getenv('EMERG_USER'))}") st.sidebar.caption(f"EMERG_PASS_BCRYPT set? = {bool(os.getenv('EMERG_PASS_BCRYPT'))}") st.sidebar.caption(f"DEMO_USER = {os.getenv('DEMO_USER') or '∅'}") st.sidebar.caption(f"DEMO_PERFIL = {os.getenv('DEMO_PERFIL') or '∅'}") st.sidebar.caption(f"DEMO_EMAIL = {os.getenv('DEMO_EMAIL') or '∅'}") st.sidebar.caption(f"Router habilitado = {_HAS_ROUTER}") try: _b_label = bank_label(current_db_choice()) if _HAS_ROUTER else ( "🟢 Produção" if current_db_choice() == "prod" else "🔴 Teste" ) st.sidebar.caption(f"Banco ativo (label) = {_b_label}") except Exception as e: st.sidebar.caption(f"Banco ativo (erro) = {e}") st.sidebar.caption(f"DB URL = {info.get('url')}") st.sidebar.caption(f"SessionState.logado = {st.session_state.get('logado')}") st.sidebar.caption(f"SessionState.usuario = {st.session_state.get('usuario')}") st.sidebar.caption(f"SessionState.perfil = {st.session_state.get('perfil')}") try: from login import login as _login_test # noqa: F401 st.sidebar.success("login.py importado ✅") except Exception as e: st.sidebar.error(f"Falha ao importar login.py: {e}") # =============================== # 🔒 ANTI-TREMOR PATCH: wrapper idempotente para a LOGO # =============================== def exibir_logo_once(top: bool = False, sidebar: bool = False): """ Garante que a logo só seja inserida uma vez por posição (top e/ou sidebar) por sessão. Se for chamada novamente, não reinjeta o HTML nem causa reflows. """ state = st.session_state.setdefault("__logo_once__", {"top": False, "sidebar": False}) need_top = bool(top and not state["top"]) need_sidebar = bool(sidebar and not state["sidebar"]) if need_top or need_sidebar: try: exibir_logo(top=need_top, sidebar=need_sidebar) finally: if need_top: state["top"] = True if need_sidebar: state["sidebar"] = True # =============================== # 🔧 Escolha do Banco (UI + setters) # =============================== def _db_choice_ui(): """ Renderiza na sidebar a seleção de banco de dados (Produção/Teste/Treinamento). Usa db_router quando disponível; caso não, mantém fallback visual. """ with st.sidebar.expander("🗄️ Banco de dados", expanded=True): # Obtém lista de choices if _get_available_choices: try: avail = list(_get_available_choices()) # ex.: ["prod","test","treinamento"] if not avail: avail = ["prod", "test", "treinamento"] except Exception: avail = ["prod", "test", "treinamento"] else: avail = ["prod", "test", "treinamento"] # Label amigável para cada opção def _lbl(c): try: return bank_label(c) except Exception: return {"prod": "🟢 Produção", "test": "🔴 Teste", "treinamento": "🟡 Treinamento"}.get(c, c) current = current_db_choice() if callable(current_db_choice) else st.session_state.get("__db_choice_override__", "prod") labels = [_lbl(c) for c in avail] # Mapeia índice atual try: idx = avail.index(current) except Exception: idx = 0 sel_label = st.selectbox( "Escolha o banco:", options=labels, index=idx, key="__db_choice_select__", ) # Resolve escolha → código sel_idx = labels.index(sel_label) sel_code = avail[sel_idx] # Aplica se mudou if sel_code != current: # Tenta aplicar no router (se houver) applied = False if _set_db_choice_func: try: _set_db_choice_func(sel_code) applied = True except Exception: applied = False # Fallback: guarda override local e exporta env var (se algum loader usar) if not applied: st.session_state["__db_choice_override__"] = sel_code os.environ["DB_CHOICE"] = sel_code # se o router/engine ler do env st.toast(f"Banco alterado para: {_lbl(sel_code)}", icon="🗄️") st.rerun() # 🔎 DEBUG da escolha/URL (ajuda a diagnosticar se o router está aplicando) try: from db_router import current_db_choice as _cur, bank_label as _lbl2, get_engine as _eng ch = _cur() eng = _eng() st.sidebar.caption(f"⚙️ DEBUG • Banco atual: {_lbl2(ch)} ({ch})") st.sidebar.caption(f"⚙️ DEBUG • URL: {eng.url}") except Exception as e: st.sidebar.caption(f"⚙️ DEBUG router fail: {e}") # =============================== # 🔴 Zerar banco (somente Admin, e apenas test/treinamento) # =============================== def _wipe_current_database_safely(): """ Zera totalmente o banco ATUAL selecionado no router (apenas test/treinamento). - SQLite: remove o arquivo físico e recria o schema. - Postgres/MySQL: drop_all + recreate. """ try: from db_router import current_db_choice except Exception: st.error("Router indisponível. Não é possível determinar o banco atual.") return from sqlalchemy.engine.url import make_url from banco import get_engine, init_schema, Base choice = (current_db_choice() or "").strip().lower() if choice not in ("test", "treinamento", "train"): st.error("Operação bloqueada: só é permitido zerar **Teste** ou **Treinamento**.") return eng = get_engine() url = str(eng.url) url_obj = make_url(url) try: if url_obj.drivername.startswith("sqlite"): # Caminho do arquivo path = url.replace("sqlite:///", "", 1) if path.startswith("//"): path = path[1:] file_path = os.path.abspath(path) # Fecha conexões e apaga arquivo try: eng.dispose() except Exception: pass try: if os.path.exists(file_path): os.remove(file_path) else: # Pode estar no ~/.ioirun/ home_alt = os.path.join(os.path.expanduser("~"), ".ioirun", os.path.basename(file_path)) if os.path.exists(home_alt): os.remove(home_alt) except Exception as e_rm: st.error(f"Falha ao remover arquivo SQLite: {e_rm}") return # Recria o schema do zero no arquivo recém criado try: init_schema() except Exception as e_schema: st.error(f"Falha ao recriar schema após remoção do arquivo: {e_schema}") return else: # Postgres/MySQL: drop_all + create_all try: Base.metadata.drop_all(bind=eng) except Exception as e_drop: st.error(f"Falha no drop de tabelas: {e_drop}") return try: init_schema() except Exception as e_create: st.error(f"Falha ao recriar schema: {e_create}") return st.success("✅ Banco zerado e schema recriado com sucesso.") try: st.toast("Banco zerado com sucesso.", icon="🗑️") except Exception: pass # Rerun para refletir o novo estado st.rerun() except Exception as e: st.error(f"Erro ao zerar o banco: {e}") # =============================== # MAIN # =============================== def main(): # Aplicar escolha via URL (ex.: ?db=test) _apply_db_choice_from_qs() # Estados iniciais if "logado" not in st.session_state: st.session_state.logado = False if "usuario" not in st.session_state: st.session_state.usuario = None if "quiz_verificado" not in st.session_state: st.session_state.quiz_verificado = False if "user_responses_viewed" not in st.session_state: st.session_state.user_responses_viewed = False if "nav_target" not in st.session_state: st.session_state.nav_target = None st.session_state.setdefault("__auto_refresh_interval_sec__", 60) st.session_state.setdefault("__auth_diag__", False) # 🔧 estado do painel de diagnóstico (Admin) # ===== Seleção de banco (sempre visível na sidebar) ===== _db_choice_ui() # 🔧 Manutenção do banco atual: Schema + init_db.run() + Zerar banco (Admin) try: from banco import init_schema with st.sidebar.expander("⚙️ Manutenção do banco atual", expanded=False): if st.button("Criar/atualizar schema neste banco", key="__btn_init_schema__", type="secondary"): try: init_schema() st.sidebar.success("Schema criado/atualizado no banco selecionado com sucesso.") except Exception as e: st.sidebar.error(f"Falha ao criar/atualizar schema: {e}") # Botão para rodar init_db.run() no banco atual if _HAS_INIT_DB: c1, c2 = st.columns(2) sure1 = c1.checkbox("Confirmo", key="__init_confirm1__") sure2 = c2.checkbox("Estou ciente", key="__init_confirm2__") if st.button("Rodar init_db.run()", key="__btn_run_initdb__", help="Cria usuários padrão e garante schema", type="secondary"): if not (sure1 and sure2): st.warning("Marque as duas confirmações para executar.") else: try: _init_db.run() st.success("init_db.run() executado com sucesso no banco selecionado.") except Exception as e: st.error(f"Falha ao rodar init_db.run(): {e}") # 🔴 Zerar banco (somente Admin, e nunca em Produção) try: perfil_atual = (st.session_state.get("perfil") or "").strip().lower() ch = current_db_choice() if _HAS_ROUTER else "prod" except Exception: perfil_atual = (st.session_state.get("perfil") or "").strip().lower() ch = "prod" if perfil_atual == "admin": st.markdown("---") st.markdown("### 🧨 Zerar banco (somente Admin)") # Exibir apenas se não for produção if ch in ("test", "treinamento", "train"): st.info("Esta ação **apaga todo o conteúdo** do banco **selecionado** (Test/Treinamento) e recria o schema. **Irreversível.**") colz1, colz2 = st.columns(2) z_conf1 = colz1.checkbox("Confirmo que entendo os riscos", key="__wipe_c1__") z_conf2 = colz2.checkbox("Estou ciente que é irreversível", key="__wipe_c2__") safe_phrase = st.text_input("Digite **ZERAR** para habilitar", key="__wipe_phrase__") # Seleção explícita do banco-alvo (precisa coincidir com o atual) alvo = st.selectbox( "Banco a zerar (deve ser o banco atual):", ["test", "treinamento"], index=(0 if ch == "test" else 1 if ch in ("treinamento", "train") else 0), key="__wipe_target__" ) btn_wipe = st.button("🗑️ ZERAR banco **ATUAL**", key="__btn_wipe_db__", type="secondary") if btn_wipe: if alvo not in ("test", "treinamento"): st.error("Selecione um alvo válido (test/treinamento).") elif alvo != ("treinamento" if ch in ("treinamento", "train") else ch): st.error("O alvo selecionado **não coincide** com o banco atual. Ajuste a seleção.") elif not (z_conf1 and z_conf2): st.error("Marque as duas confirmações para continuar.") elif safe_phrase.strip().upper() != "ZERAR": st.error("Frase de segurança incorreta. Digite exatamente **ZERAR**.") else: _wipe_current_database_safely() else: st.warning("Zerar banco indisponível: **Produção** está selecionado. Troque para **Teste** ou **Treinamento** para habilitar.") except Exception: pass # ⚙️ Criação automática do schema (leve) após seleção do banco (rodará no banco correto) try: Base.metadata.create_all(bind=engine) except Exception as e: st.sidebar.warning(f"Schema não foi criado automaticamente: {e}") # LOGIN if not st.session_state.logado: st.session_state.quiz_verificado = False # 🔒 ANTI-TREMOR: renderiza logo top apenas 1x exibir_logo_once(top=True, sidebar=False) login() # 🔕 REMOVIDO: mensagem de dica de autenticação no Spaces return # 👥 Heartbeat + Badge de usuários logados (APENAS ADMIN) _session_heartbeat(st.session_state.usuario) if (st.session_state.get("perfil") or "").strip().lower() == "admin": try: online_now = _get_active_users_count() except Exception: online_now = 0 st.sidebar.markdown( f"""
🟢 Online (últimos {_SESS_TTL_MIN} min)
{online_now}
""", unsafe_allow_html=True ) # 🔘 Botão Admin: habilitar/desabilitar painel de diagnóstico st.sidebar.markdown("---") if st.session_state.get("__auth_diag__"): if st.sidebar.button("🧪 Desativar diagnóstico de login (Admin)", key="__btn_auth_diag_off__", type="secondary"): st.session_state["__auth_diag__"] = False st.rerun() else: if st.sidebar.button("🧪 Ativar diagnóstico de login (Admin)", key="__btn_auth_diag_on__", type="secondary"): st.session_state["__auth_diag__"] = True st.rerun() # Render do painel (se ativo) if st.session_state.get("__auth_diag__"): _render_auth_diag_panel() # ============================ # 🔬 Diagnóstico profundo do banco (somente Admin + painel ativo) # ============================ from sqlalchemy import inspect # import local para evitar custo desnecessário with st.sidebar.expander("🧪 Diagnóstico Profundo do Banco", expanded=False): dbx = _get_db_session() try: eng = dbx.bind st.caption(f"Engine URL: {eng.url}") # 1) SELECT 1 try: dbx.execute(text("SELECT 1")) st.success("SELECT 1 OK") except Exception as e: st.error(f"SELECT 1 falhou: {e}") # 2) Tabelas registradas insp = inspect(eng) tables = insp.get_table_names() st.write("Tabelas:", tables) # 3) Detectar tabela de usuários user_tables_candidates = ["usuarios", "usuario", "Usuario", "users", "auth_user"] target_table = next((t for t in user_tables_candidates if t in tables), None) if not target_table: st.warning("Tabela de usuários não encontrada. Verifique nomes e migrations.") else: st.write(f"Usando tabela: **{target_table}**") # 4) Colunas cols = [c["name"] for c in insp.get_columns(target_table)] st.write("Colunas:", cols) # 5) Contagem try: cnt = dbx.execute(text(f"SELECT COUNT(*) FROM {target_table}")).fetchone()[0] st.write("Quantidade de registros:", cnt) except Exception as e: st.error(f"COUNT(*) falhou: {e}") # 6) Amostra de usuários (sem senhas) try: sel_cols = [c for c in ["usuario", "email", "perfil", "nome"] if c in cols] sel_expr = ", ".join(sel_cols) if sel_cols else "*" amostra = dbx.execute(text(f"SELECT {sel_expr} FROM {target_table} LIMIT 5")) rows = [dict(r._mapping) for r in amostra] st.write("Amostra:", rows) except Exception as e: st.error(f"Amostra falhou: {e}") # 7) Teste de bcrypt (não grava nada) import bcrypt st.markdown("---") st.caption("Teste de verificação bcrypt (não grava nada)") test_user = st.text_input("Usuário para testar hash", "", key="__bcrypt_user__") test_pass = st.text_input("Senha para testar hash", "", type="password", key="__bcrypt_pass__") if st.button("Testar bcrypt com este usuário", key="__btn_bcrypt_test__", type="secondary"): try: pass_cols = [c for c in ["senha_hash", "password_hash", "senha", "hash"] if c in cols] user_cols = [c for c in ["usuario", "username", "login"] if c in cols] if not pass_cols or not user_cols: st.error("Não encontrei colunas de senha/usuário na tabela.") else: c_pass = pass_cols[0] c_user = user_cols[0] res = dbx.execute( text(f"SELECT {c_user} AS u, {c_pass} AS h FROM {target_table} WHERE {c_user} = :u LIMIT 1"), {"u": test_user.strip()} ).fetchone() if not res: st.error("Usuário não encontrado na tabela.") else: senha_hash = res._mapping.get("h") if not senha_hash: st.error(f"Coluna '{c_pass}' vazia/nula — login normal não vai entrar.") else: try: ok = bcrypt.checkpw(test_pass.encode("utf-8"), str(senha_hash).encode("utf-8")) st.success(f"bcrypt.checkpw = {ok}") if not ok: st.info("Se a senha no banco não for bcrypt, gere um hash e atualize o registro.") except Exception as e: st.error(f"Falha no bcrypt: {e}") st.info("Um hash válido geralmente começa com '$2a$' ou '$2b$' e tem 60 caracteres.") except Exception as e: st.error(f"Erro no teste: {e}") finally: try: dbx.close() except Exception: pass # ============================ # 🧰 Ações de manutenção (somente Admin) # ============================ with st.sidebar.expander("🧰 Manutenção (Admin)", expanded=False): col_m1, col_m2 = st.columns(2) if col_m1.button("🧹 Limpar cache agora", key="__btn_clear_cache__", help="Limpa cache de dados/recursos", type="secondary"): try: st.cache_data.clear() except Exception: pass try: st.cache_resource.clear() except Exception: pass st.sidebar.success("Caches limpos.") st.rerun() # Rodar init_db.run() com dupla confirmação (Admin) if _HAS_INIT_DB: c1, c2 = st.columns(2) sure1 = c1.checkbox("Confirmo", key="__btn_run_initdb_admin_c1__") sure2 = c2.checkbox("Estou ciente", key="__btn_run_initdb_admin_c2__") if st.button("🧬 Rodar init_db.run()", key="__btn_run_initdb_admin__", type="secondary"): if not (sure1 and sure2): st.warning("Marque as duas confirmações para executar.") else: try: _init_db.run() with open(_INIT_MARK, "w", encoding="utf-8") as f: f.write(f"init manual at {datetime.now().isoformat()}\n") st.success("init_db.run() executado com sucesso.") except Exception as e: st.error(f"Falha no init_db.run(): {e}") # 🔄 Botão de Recarregar (mantém a sessão ativa) + ⏱️ Controle do intervalo st.sidebar.markdown("---") col_reload, col_interval = st.sidebar.columns([1, 1]) if col_reload.button("🔄 Recarregar (sem sair)", key="__btn_reload_now__", type="secondary"): st.rerun() if hasattr(st, "popover"): with col_interval.popover("⏱️ Autoatualização"): new_val = st.number_input( "Intervalo (segundos) — 0 desativa", min_value=0, max_value=3600, value=int(st.session_state["__auto_refresh_interval_sec__"]), step=5, key="__auto_refresh_input__" ) if st.button("Aplicar intervalo", key="__btn_apply_auto_refresh__", type="secondary"): st.session_state["__auto_refresh_interval_sec__"] = int(new_val) try: if int(new_val) > 0: st.toast(f"Autoatualização ajustada para {int(new_val)}s.", icon="⏱️") else: st.toast("Autoatualização desativada.", icon="⛔") except Exception: pass st.rerun() else: with st.sidebar.expander("⏱️ Autoatualização", expanded=False): new_val = st.number_input( "Intervalo (segundos) — 0 desativa", min_value=0, max_value=3600, value=int(st.session_state["__auto_refresh_interval_sec__"]), step=5, key="__auto_refresh_input__" ) if st.button("Aplicar intervalo", key="__btn_apply_auto_refresh__", type="secondary"): st.session_state["__auto_refresh_interval_sec__"] = int(new_val) try: if int(new_val) > 0: st.toast(f"Autoatualização ajustada para {int(new_val)}s.", icon="⏱️") else: st.toast("Autoatualização desativada.", icon="⛔") except Exception: pass st.rerun() usuario = st.session_state.usuario perfil = (st.session_state.get("perfil") or "usuario").strip().lower() # QUIZ if not st.session_state.quiz_verificado: if not quiz_respondido_hoje(usuario): # 🔒 ANTI-TREMOR: render 1x exibir_logo_once(top=True, sidebar=False) quiz.main() return else: st.session_state.quiz_verificado = True st.rerun() # SISTEMA LIBERADO # 🔒 ANTI-TREMOR: render 1x (topo + sidebar) exibir_logo_once(top=True, sidebar=True) _render_aviso_global_topbar() _show_birthday_banner_if_needed() st.sidebar.markdown("### Menu | 🎉 Carnaval 🎭 2026!") # Banco ativo na sidebar (badge rápido) try: _b_label = bank_label(current_db_choice()) if _HAS_ROUTER else bank_label(current_db_choice()) st.sidebar.caption(f"🗄️ Banco ativo: {_b_label}") except Exception: pass # ========================= # Notificações no sidebar # ========================= # --- Admin: pendentes --- if perfil == "admin": try: db = _get_db_session() pendentes = db.query(IOIRunSugestao).filter(func.lower(IOIRunSugestao.status) == "pendente").count() except Exception: pendentes = 0 finally: try: db.close() except Exception: pass if pendentes > 0: st.sidebar.markdown( """
🔔 {pendentes} sugestão(ões) pendente(s)
Acesse a caixa de entrada para responder.
""".format(pendentes=pendentes), unsafe_allow_html=True ) # 👉 Direciona para o MESMO módulo do menu (resposta.main()) if st.sidebar.button("📬 Abrir Caixa de Entrada (Admin)", key="__btn_open_inbox__", type="secondary"): st.session_state.nav_target = "resposta" st.rerun() # --- Usuário: respostas novas (após último 'visto') --- if perfil != "admin": last_seen_dt = st.session_state.get("__user_last_answer_seen__") try: db = _get_db_session() last_answer_dt_row = ( db.query(IOIRunSugestao.data_resposta) .filter( IOIRunSugestao.usuario == usuario, func.lower(IOIRunSugestao.status) == "respondida", IOIRunSugestao.data_resposta != None ) .order_by(IOIRunSugestao.data_resposta.desc()) .first() ) last_answer_dt = last_answer_dt_row[0] if last_answer_dt_row else None if last_answer_dt and (not last_seen_dt or last_answer_dt > last_seen_dt): st.session_state.user_responses_viewed = False novas_respostas = ( db.query(IOIRunSugestao) .filter( IOIRunSugestao.usuario == usuario, func.lower(IOIRunSugestao.status) == "respondida", (IOIRunSugestao.data_resposta > last_seen_dt) if last_seen_dt else (IOIRunSugestao.data_resposta != None) ) .count() ) except Exception: novas_respostas = 0 finally: try: db.close() except Exception: pass if novas_respostas > 0 and not st.session_state.get("user_responses_viewed", False): st.sidebar.markdown( """
🔔 {resps} resposta(s) nova(s) para suas sugestões
Clique para ver suas respostas.
""".format(resps=novas_respostas), unsafe_allow_html=True ) if not st.session_state.get("__user_toast_shown__"): try: st.toast("Você tem novas respostas do IOI‑RUN. Clique em '📥 Ver respostas'.", icon="💬") except Exception: pass st.session_state["__user_toast_shown__"] = True if st.sidebar.button("📥 Ver respostas", key="__btn_view_answers__", type="secondary"): st.session_state.nav_target = "sugestoes_ioirun" st.session_state.user_responses_viewed = True st.rerun() else: st.session_state["__user_toast_shown__"] = False # ------------------------- Menu lateral ------------------------- termo_busca = st.sidebar.text_input("Pesquisar módulo:").strip().lower() try: ambiente_atual = current_db_choice() if _HAS_ROUTER else current_db_choice() except Exception: ambiente_atual = current_db_choice() grupos_disponiveis = obter_grupos_disponiveis( MODULES, perfil=st.session_state.get("perfil", "usuario"), usuario=st.session_state.get("usuario"), ambiente=ambiente_atual, verificar_permissao=verificar_permissao ) if not grupos_disponiveis: st.sidebar.selectbox("Selecione a operação:", ["Em desenvolvimento"], index=0) st.warning("Nenhuma operação disponível para seu perfil/ambiente neste momento.") return grupo_escolhido = st.sidebar.selectbox("Selecione a operação:", grupos_disponiveis) opcoes = obter_modulos_para_grupo( MODULES, grupo_escolhido, termo_busca, perfil=st.session_state.get("perfil", "usuario"), usuario=st.session_state.get("usuario"), ambiente=ambiente_atual, verificar_permissao=verificar_permissao ) with st.sidebar.expander("🔧 Diagnóstico do menu", expanded=False): st.caption(f"Perfil: **{st.session_state.get('perfil', '—')}** | Grupo: **{grupo_escolhido}** | Busca: **{termo_busca or '∅'}** | Ambiente: **{ambiente_atual}**") try: mods_dbg = [{"id": mid, "label": lbl} for mid, lbl in (opcoes or [])] except Exception: mods_dbg = [] st.write("Módulos visíveis (após regras):", mods_dbg if mods_dbg else "—") # Failsafe outlook_relatorio try: mod_outlook = MODULES.get("outlook_relatorio") if mod_outlook: mesmo_grupo = (mod_outlook.get("grupo") == grupo_escolhido) perfil_ok = verificar_permissao( perfil=st.session_state.get("perfil", "usuario"), modulo_key="outlook_relatorio", usuario=st.session_state.get("usuario"), ambiente=ambiente_atual ) ja_nas_opcoes = any(mid == "outlook_relatorio" for mid, _ in (opcoes or [])) passa_busca = (not termo_busca) or (termo_busca in mod_outlook.get("label", "").strip().lower()) if mesmo_grupo and perfil_ok and not ja_nas_opcoes and passa_busca: opcoes = (opcoes or []) + [("outlook_relatorio", mod_outlook.get("label", "Relatório portaria"))] except Exception: pass # Failsafe repositorio_load try: mod_repo = MODULES.get("repositorio_load") if mod_repo: mesmo_grupo_r = (mod_repo.get("grupo") == grupo_escolhido) perfil_ok_r = verificar_permissao( perfil=st.session_state.get("perfil", "usuario"), modulo_key="repositorio_load", usuario=st.session_state.get("usuario"), ambiente=ambiente_atual ) ja_nas_opcoes_r = any(mid == "repositorio_load" for mid, _ in (opcoes or [])) passa_busca_r = (not termo_busca) or (termo_busca in mod_repo.get("label", "").strip().lower()) if mesmo_grupo_r and perfil_ok_r and not ja_nas_opcoes_r and passa_busca_r: opcoes = (opcoes or []) + [("repositorio_load", mod_repo.get("label", "Repositório Load"))] except Exception: pass if not opcoes: st.sidebar.selectbox("Selecione o módulo:", ["Em desenvolvimento"], index=0) st.warning(f"A operação '{grupo_escolhido}' está em desenvolvimento.") return # ============================================================ # 🔒 Fix: selectbox com 'key' + seleção forçada para 'resposta' # quando vier de nav_target (sidebar) ou quando já estivermos na página. # ============================================================ labels = [label for _, label in opcoes] # Se foi solicitado nav_target, injeta a label alvo antes do selectbox if st.session_state.get("nav_target"): target = st.session_state["nav_target"] try: target_label = next(lbl for mid, lbl in opcoes if mid == target) st.session_state["mod_select_label"] = target_label except StopIteration: pass # Inicializa/persiste seleção if "mod_select_label" not in st.session_state or st.session_state["mod_select_label"] not in labels: st.session_state["mod_select_label"] = labels[0] escolha_label = st.sidebar.selectbox( "Selecione o módulo:", labels, index=labels.index(st.session_state["mod_select_label"]), key="mod_select_label" ) pagina_id = next(mod_id for mod_id, label in opcoes if label == escolha_label) # ✅ Navegação com lock (evita disputa com outros reruns) if st.session_state.get("nav_target"): pagina_id = st.session_state.nav_target st.session_state["__nav_lock__"] = True else: st.session_state["__nav_lock__"] = False # 🔎 Agora que sabemos a página atual, tratamos rr=1 com segurança _check_rerun_qs(pagina_atual=pagina_id) # ⏱️ Auto-refresh leve do sidebar — NÃO quando em Inbox/Admin/Outlook/Formulário/Recebimento try: from streamlit_autorefresh import st_autorefresh is_inbox_admin = (pagina_id == "resposta") is_outlook_rel = (pagina_id == "outlook_relatorio") is_formulario = (pagina_id == "formulario") is_recebimento = (pagina_id == "recebimento") # 🔒 ANTI-TREMOR: incluir telas com logo topo (login/quiz/primeira tela) is_login_or_quiz = (not st.session_state.get("logado")) or (not st.session_state.get("quiz_verificado")) interval_sec = int(st.session_state.get("__auto_refresh_interval_sec__", 60)) if (interval_sec > 0) and not (is_inbox_admin or is_outlook_rel or is_formulario or is_recebimento or is_login_or_quiz): st_autorefresh(interval=interval_sec * 1000, key=f"sidebar_autorefresh_{interval_sec}s") except Exception: pass # Logout st.sidebar.markdown("---") if st.session_state.get("logado"): if st.sidebar.button("🚪 Sair (Logout)", key="__btn_logout__", type="primary"): logout() st.divider() # ------------------------- Roteamento ------------------------- if pagina_id == "formulario": formulario.main() elif pagina_id == "consulta": consulta.main() elif pagina_id == "relatorio": relatorio.main() elif pagina_id == "ranking": ranking.main() elif pagina_id == "quiz": quiz.main() ranking.main() elif pagina_id == "quiz_admin": quiz_admin.main() elif pagina_id == "usuarios": usuarios_admin.main() elif pagina_id == "administracao": administracao.main() elif pagina_id == "videos": videos.main() elif pagina_id == "auditoria": auditoria.main() elif pagina_id == "auditoria_cleanup": auditoria_cleanup.main() elif pagina_id == "importacao": importar_excel.main() elif pagina_id == "calendario": calendario.main() elif pagina_id == "calendario_mensal" and _HAS_CAL_MENSAL: calendario_mensal.main() elif pagina_id == "jogos": st.session_state.setdefault("pontuacao", 0) st.session_state.setdefault("rodadas", 0) st.session_state.setdefault("ultimo_resultado", None) jogos.main() elif pagina_id == "temporario": db_tools.main() elif pagina_id == "db_admin": db_admin.main() elif pagina_id == "db_monitor": db_monitor.main() elif pagina_id == "operacao": operacao.main() elif pagina_id == "resposta": # 📬 Admin resposta.main() elif pagina_id == "db_export_import": db_export_import.main() elif pagina_id == "produtividade_especialista": produtividade_especialista.main() elif pagina_id == "outlook_relatorio": outlook_relatorio.main() elif pagina_id == "sugestoes_ioirun": # 💡 Usuário if st.session_state.get("perfil") == "admin": st.info("Use a **📬 Caixa de Entrada (Admin)** para responder sugestões.") else: sugestoes_usuario.main() elif pagina_id == "repositorio_load": repositorio_load.main() elif pagina_id == "rnc": rnc.pagina() elif pagina_id == "rnc_listagem": rnc_listagem.pagina() elif pagina_id == "rnc_relatorio": rnc_relatorio.pagina() elif pagina_id == "repo_rnc": repo_rnc.pagina() elif pagina_id == "recebimento": recebimento.main() # ------------------------------------------------------ # ℹ️ INFO — Guia passo a passo de uso (no sidebar) # ------------------------------------------------------ info_mod_default = INFO_MAP_PAGINA_ID.get(pagina_id, "Geral") with st.sidebar.expander("ℹ️ Info • Como usar o sistema", expanded=False): st.markdown(""" **Bem-vindo!** Este painel reúne instruções rápidas para utilizar cada módulo e seus campos. Selecione o módulo abaixo ou navegue pelo menu — o conteúdo ajusta automaticamente. """) mod_info_sel = st.selectbox( "Escolha o módulo para ver instruções:", INFO_MODULOS, index=INFO_MODULOS.index(info_mod_default) if info_mod_default in INFO_MODULOS else 0, key="info_mod_sel" ) st.markdown(INFO_CONTEUDO.get(mod_info_sel, "_Conteúdo não disponível para este módulo._")) # ✅ Libera o nav_target após a 1ª render da página de destino if st.session_state.get("__nav_lock__"): st.session_state["nav_target"] = None st.session_state["__nav_lock__"] = False if __name__ == "__main__": main() # ------------------------- # Desenvolvedor e versão # ------------------------- if st.session_state.get("logado") and st.session_state.get("email"): st.sidebar.markdown( f"""
👤 {st.session_state.email}
""", unsafe_allow_html=True ) st.sidebar.markdown( """

Versão: 1.0.0 • Desenvolvedor: Rodrigo Silva - Ideiasystem | 2026

""", unsafe_allow_html=True )