| |
|
| |
|
| | import streamlit as st
|
| | import os
|
| | import shutil
|
| | import time
|
| | from datetime import datetime, timedelta
|
| | from sqlalchemy import text
|
| |
|
| | from banco import get_engine, SessionLocal, db_info
|
| | from utils_permissoes import verificar_permissao
|
| | from utils_auditoria import registrar_log
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | try:
|
| | from db_router import current_db_choice, bank_label
|
| | _HAS_ROUTER = True
|
| | except Exception:
|
| | _HAS_ROUTER = False
|
| | def current_db_choice() -> str:
|
| | return "prod"
|
| | def bank_label(choice: str) -> str:
|
| | return {"prod": "Banco 1 (Produção)", "test": "Banco 2 (Teste)", "treinamento": "Banco 3 (Treinamento)"}\
|
| | .get(choice, choice)
|
| |
|
| |
|
| |
|
| |
|
| | def _engine():
|
| | """Retorna o engine do banco ATIVO (de acordo com a escolha no login)."""
|
| | return get_engine()
|
| |
|
| | def _dialeto():
|
| | try:
|
| | return _engine().url.get_backend_name()
|
| | except Exception:
|
| | return "desconhecido"
|
| |
|
| | def _sqlite_version():
|
| | if _dialeto() != "sqlite":
|
| | return None
|
| | try:
|
| | with _engine().begin() as conn:
|
| | return conn.execute(text("select sqlite_version()")).scalar()
|
| | except Exception:
|
| | return None
|
| |
|
| |
|
| |
|
| |
|
| | def _db_file_path():
|
| |
|
| | try:
|
| | eng = _engine()
|
| | return eng.url.database if eng.url.get_backend_name() == "sqlite" else None
|
| | except Exception:
|
| | return None
|
| |
|
| | def _sqlite_stats():
|
| |
|
| | db_path = _db_file_path()
|
| | if not db_path or not os.path.exists(db_path):
|
| | return None
|
| |
|
| | size_bytes = os.path.getsize(db_path)
|
| | dir_path = os.path.dirname(os.path.abspath(db_path)) or "."
|
| | total, used, free = shutil.disk_usage(dir_path)
|
| |
|
| | with _engine().begin() as conn:
|
| | page_count = conn.execute(text("PRAGMA page_count")).scalar()
|
| | page_size = conn.execute(text("PRAGMA page_size")).scalar()
|
| |
|
| | return {
|
| | "db_path": db_path,
|
| | "size_bytes": size_bytes,
|
| | "page_count": page_count,
|
| | "page_size": page_size,
|
| | "calc_bytes": (page_count or 0) * (page_size or 0),
|
| | "disk_total": total,
|
| | "disk_free": free,
|
| | "disk_used": used,
|
| | "sqlite_version": _sqlite_version(),
|
| | }
|
| |
|
| |
|
| |
|
| |
|
| | def _ensure_dir(path: str):
|
| | os.makedirs(path, exist_ok=True)
|
| |
|
| | def _fmt_bytes(b: int) -> str:
|
| |
|
| | for unit in ["B","KB","MB","GB","TB"]:
|
| | if b < 1024.0:
|
| | return f"{b:,.2f} {unit}".replace(",", ".")
|
| | b /= 1024.0
|
| | return f"{b:,.2f} PB".replace(",", ".")
|
| |
|
| | def _listar_backups(backup_dir: str, base_name: str):
|
| | """Lista backups para o banco atual. Formato: base_name-YYYYMMDD-HHMMSS.db (ou .zip futuramente)"""
|
| | if not os.path.isdir(backup_dir):
|
| | return []
|
| | files = []
|
| | for f in os.listdir(backup_dir):
|
| | if f.startswith(base_name + "-") and f.endswith(".db"):
|
| | full = os.path.join(backup_dir, f)
|
| | files.append((f, full, os.path.getmtime(full)))
|
| | return sorted(files, key=lambda x: x[2], reverse=True)
|
| |
|
| | def _executar_backup(backup_dir: str):
|
| | """Copia o .db para backups/ com timestamp. Registra auditoria."""
|
| | db_path = _db_file_path()
|
| | if not db_path or not os.path.exists(db_path):
|
| | st.error("Arquivo de banco SQLite não encontrado.")
|
| | return False
|
| |
|
| | _ensure_dir(backup_dir)
|
| | base_name = os.path.splitext(os.path.basename(db_path))[0]
|
| | stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
|
| | dest = os.path.join(backup_dir, f"{base_name}-{stamp}.db")
|
| |
|
| | try:
|
| | shutil.copyfile(db_path, dest)
|
| | registrar_log(st.session_state.get("usuario"), f"BACKUP criado: {os.path.basename(dest)}", "schema", None)
|
| | st.success(f"✅ Backup criado: {dest}")
|
| | return True
|
| | except Exception as e:
|
| | st.error(f"Erro ao criar backup: {e}")
|
| | return False
|
| |
|
| | def _limpar_antigos(backup_dir: str, base_name: str, manter: int):
|
| | """Remove backups antigos, mantendo N mais recentes. Registra auditoria."""
|
| | lst = _listar_backups(backup_dir, base_name)
|
| | if len(lst) <= manter:
|
| | st.info("Nada para remover: já dentro da retenção.")
|
| | return 0
|
| | remover = lst[manter:]
|
| | count = 0
|
| | for _, full, _ in remover:
|
| | try:
|
| | os.remove(full)
|
| | count += 1
|
| | except Exception as e:
|
| | st.error(f"Erro ao remover {full}: {e}")
|
| | if count > 0:
|
| | registrar_log(st.session_state.get("usuario"), f"CLEAN backups antigos: {count} removidos (retain={manter})", "schema", None)
|
| | st.success(f"🧹 {count} backup(s) antigo(s) removido(s).")
|
| | return count
|
| |
|
| |
|
| |
|
| |
|
| | def main():
|
| | st.title("🗄️ Monitor e Backup do Banco")
|
| |
|
| |
|
| | if st.session_state.get("perfil") != "admin":
|
| | st.error("⛔ Acesso restrito ao administrador.")
|
| | return
|
| |
|
| |
|
| | try:
|
| | amb = current_db_choice()
|
| | st.caption(f"🧭 Ambiente: {bank_label(amb)}")
|
| | except Exception:
|
| | pass
|
| | try:
|
| | info = db_info()
|
| | st.caption(f"🔗 Engine URL: {info.get('url')}")
|
| | except Exception:
|
| | pass
|
| |
|
| | dial = _dialeto()
|
| | st.caption(f"Dialeto do banco: **{dial}**")
|
| |
|
| |
|
| | stats = _sqlite_stats() if dial == "sqlite" else None
|
| |
|
| |
|
| | if dial != "sqlite":
|
| | st.info("Este monitor está otimizado para SQLite. Para PostgreSQL/MySQL, configure backup via ferramenta da plataforma (pg_dump/mysqldump) e agendamento externo.")
|
| | st.stop()
|
| |
|
| | if not stats:
|
| | st.error("Banco SQLite não encontrado ou inacessível. Verifique o arquivo do banco ativo.")
|
| | return
|
| |
|
| |
|
| | st.subheader("📊 Estatísticas")
|
| | colA, colB, colC = st.columns(3)
|
| | with colA:
|
| | st.metric("Arquivo", os.path.basename(stats["db_path"]))
|
| | st.metric("Tamanho do banco (arquivo)", _fmt_bytes(stats["size_bytes"]))
|
| | with colB:
|
| | st.metric("Páginas (PRAGMA)", f'{stats["page_count"]} × {stats["page_size"]} B')
|
| | st.metric("Cálculo (page_count×page_size)", _fmt_bytes(stats["calc_bytes"]))
|
| | with colC:
|
| | st.metric("Espaço livre no disco", _fmt_bytes(stats["disk_free"]))
|
| | st.metric("SQLite version", stats["sqlite_version"] or "—")
|
| |
|
| | st.divider()
|
| |
|
| |
|
| | st.subheader("🎯 Capacidade & Ocupação")
|
| | capacidade_gb = st.number_input("Capacidade alvo (GB) — alerta quando ultrapassar", min_value=0.1, value=1.0, step=0.1)
|
| | ocupacao_perc = min(100.0, (stats["size_bytes"] / (capacidade_gb * 1024**3)) * 100.0) if capacidade_gb > 0 else 0.0
|
| |
|
| | st.progress(min(1.0, ocupacao_perc / 100.0))
|
| | st.caption(f"Ocupação estimada: **{ocupacao_perc:,.2f}%** de {capacidade_gb} GB")
|
| |
|
| | if ocupacao_perc >= 80.0:
|
| | st.warning("⚠️ Ocupação acima de 80%. Considere backup/arquivamento.")
|
| |
|
| | st.divider()
|
| |
|
| |
|
| | st.subheader("🗓️ Planejamento de Backup")
|
| | backup_dir = st.text_input("Pasta de backups", value="backups")
|
| | _ensure_dir(backup_dir)
|
| | base_name = os.path.splitext(os.path.basename(stats["db_path"]))[0]
|
| | backups = _listar_backups(backup_dir, base_name)
|
| |
|
| |
|
| | ultimo_backup_dt = datetime.fromtimestamp(backups[0][2]) if backups else None
|
| | freq_dias = st.number_input("Frequência (dias)", min_value=1, value=7)
|
| | retencao = st.number_input("Retenção máx. de backups (arquivos)", min_value=1, value=10)
|
| | proximo_backup_dt = (ultimo_backup_dt + timedelta(days=freq_dias)) if ultimo_backup_dt else (datetime.now() + timedelta(days=freq_dias))
|
| |
|
| | col1, col2, col3 = st.columns(3)
|
| | with col1:
|
| | st.metric("Último backup", ultimo_backup_dt.strftime("%d/%m/%Y %H:%M:%S") if ultimo_backup_dt else "—")
|
| | with col2:
|
| | st.metric("Próximo previsto", proximo_backup_dt.strftime("%d/%m/%Y %H:%M:%S"))
|
| | with col3:
|
| | st.metric("Backups atuais", len(backups))
|
| |
|
| |
|
| | if ultimo_backup_dt and datetime.now() >= proximo_backup_dt:
|
| | st.warning("⏰ Backup previsto já venceu. Execute agora para manter o plano.")
|
| |
|
| |
|
| | st.subheader("⚙️ Ações")
|
| | colX, colY, colZ = st.columns(3)
|
| | with colX:
|
| | if st.button("💾 Backup agora"):
|
| | if _executar_backup(backup_dir):
|
| | st.rerun()
|
| | with colY:
|
| | if st.button("🧹 Limpar antigos (manter retenção)"):
|
| | _limpar_antigos(backup_dir, base_name, int(retencao))
|
| | st.rerun()
|
| | with colZ:
|
| |
|
| | if backups:
|
| | st.write("Últimos backups:")
|
| | for f, full, mtime in backups[:5]:
|
| | dt = datetime.fromtimestamp(mtime).strftime("%d/%m/%Y %H:%M:%S")
|
| | st.caption(f"• {f} ({dt})")
|
| |
|
| |
|
| | registrar_log(st.session_state.get("usuario"), "MONITOR DB", "schema", None)
|
| |
|
| |
|