# db_monitor.py import streamlit as st import os import shutil import time from datetime import datetime, timedelta from sqlalchemy import text # ✅ Use sempre o engine do BANCO ATIVO (em vez de um engine fixo) from banco import get_engine, SessionLocal, db_info from utils_permissoes import verificar_permissao from utils_auditoria import registrar_log # =============================== # MONITOR & BACKUP DO BANCO # =============================== # Objetivo: # - Mostrar estatísticas do banco (tamanho, páginas, espaço em disco) # - Definir limiar/capacidade alvo e exibir ocupação (%) # - Planejar backup (frequência em dias) e retenção (N arquivos) # - Executar backup e limpar antigos com confirmação # - Acesso restrito por perfil admin # # Observações: # - Em SQLite: usa PRAGMA page_count/page_size + arquivo .db # - Em outros dialetos: exibe dialeto e recomenda backup externo # - Pasta padrão de backup: ./backups (pode alterar na UI) # - Auditoria: registrar_log(usuario, acao="BACKUP/CLEAN/MONITOR", tabela="schema") # (Opcional) rótulo amigável do ambiente atual (Produção/Teste/Treinamento) try: from db_router import current_db_choice, bank_label _HAS_ROUTER = True except Exception: _HAS_ROUTER = False def current_db_choice() -> str: return "prod" def bank_label(choice: str) -> str: return {"prod": "Banco 1 (Produção)", "test": "Banco 2 (Teste)", "treinamento": "Banco 3 (Treinamento)"}\ .get(choice, choice) # ------------------------- # Auxiliares de dialeto # ------------------------- def _engine(): """Retorna o engine do banco ATIVO (de acordo com a escolha no login).""" return get_engine() def _dialeto(): try: return _engine().url.get_backend_name() except Exception: return "desconhecido" def _sqlite_version(): if _dialeto() != "sqlite": return None try: with _engine().begin() as conn: return conn.execute(text("select sqlite_version()")).scalar() except Exception: return None # ------------------------- # Info do banco # ------------------------- def _db_file_path(): # Para SQLite, engine.url.database aponta para o arquivo .db try: eng = _engine() return eng.url.database if eng.url.get_backend_name() == "sqlite" else None except Exception: return None def _sqlite_stats(): # Retorna dict com stats do SQLite db_path = _db_file_path() if not db_path or not os.path.exists(db_path): return None size_bytes = os.path.getsize(db_path) dir_path = os.path.dirname(os.path.abspath(db_path)) or "." total, used, free = shutil.disk_usage(dir_path) with _engine().begin() as conn: page_count = conn.execute(text("PRAGMA page_count")).scalar() page_size = conn.execute(text("PRAGMA page_size")).scalar() return { "db_path": db_path, "size_bytes": size_bytes, "page_count": page_count, "page_size": page_size, "calc_bytes": (page_count or 0) * (page_size or 0), "disk_total": total, "disk_free": free, "disk_used": used, "sqlite_version": _sqlite_version(), } # ------------------------- # Backup # ------------------------- def _ensure_dir(path: str): os.makedirs(path, exist_ok=True) def _fmt_bytes(b: int) -> str: # Formata bytes em unidades legíveis for unit in ["B","KB","MB","GB","TB"]: if b < 1024.0: return f"{b:,.2f} {unit}".replace(",", ".") b /= 1024.0 return f"{b:,.2f} PB".replace(",", ".") def _listar_backups(backup_dir: str, base_name: str): """Lista backups para o banco atual. Formato: base_name-YYYYMMDD-HHMMSS.db (ou .zip futuramente)""" if not os.path.isdir(backup_dir): return [] files = [] for f in os.listdir(backup_dir): if f.startswith(base_name + "-") and f.endswith(".db"): full = os.path.join(backup_dir, f) files.append((f, full, os.path.getmtime(full))) return sorted(files, key=lambda x: x[2], reverse=True) # ordem decrescente def _executar_backup(backup_dir: str): """Copia o .db para backups/ com timestamp. Registra auditoria.""" db_path = _db_file_path() if not db_path or not os.path.exists(db_path): st.error("Arquivo de banco SQLite não encontrado.") return False _ensure_dir(backup_dir) base_name = os.path.splitext(os.path.basename(db_path))[0] stamp = datetime.now().strftime("%Y%m%d-%H%M%S") dest = os.path.join(backup_dir, f"{base_name}-{stamp}.db") try: shutil.copyfile(db_path, dest) registrar_log(st.session_state.get("usuario"), f"BACKUP criado: {os.path.basename(dest)}", "schema", None) st.success(f"✅ Backup criado: {dest}") return True except Exception as e: st.error(f"Erro ao criar backup: {e}") return False def _limpar_antigos(backup_dir: str, base_name: str, manter: int): """Remove backups antigos, mantendo N mais recentes. Registra auditoria.""" lst = _listar_backups(backup_dir, base_name) if len(lst) <= manter: st.info("Nada para remover: já dentro da retenção.") return 0 remover = lst[manter:] count = 0 for _, full, _ in remover: try: os.remove(full) count += 1 except Exception as e: st.error(f"Erro ao remover {full}: {e}") if count > 0: registrar_log(st.session_state.get("usuario"), f"CLEAN backups antigos: {count} removidos (retain={manter})", "schema", None) st.success(f"🧹 {count} backup(s) antigo(s) removido(s).") return count # ------------------------- # UI principal # ------------------------- def main(): st.title("🗄️ Monitor e Backup do Banco") # 🔐 Proteção: apenas admin if st.session_state.get("perfil") != "admin": st.error("⛔ Acesso restrito ao administrador.") return # Badge/URL do banco ativo (opcional) try: amb = current_db_choice() st.caption(f"🧭 Ambiente: {bank_label(amb)}") except Exception: pass try: info = db_info() st.caption(f"🔗 Engine URL: {info.get('url')}") except Exception: pass dial = _dialeto() st.caption(f"Dialeto do banco: **{dial}**") # Estatísticas stats = _sqlite_stats() if dial == "sqlite" else None # Se não for SQLite, exibe recomendações if dial != "sqlite": st.info("Este monitor está otimizado para SQLite. Para PostgreSQL/MySQL, configure backup via ferramenta da plataforma (pg_dump/mysqldump) e agendamento externo.") st.stop() if not stats: st.error("Banco SQLite não encontrado ou inacessível. Verifique o arquivo do banco ativo.") return # Painel de estatísticas st.subheader("📊 Estatísticas") colA, colB, colC = st.columns(3) with colA: st.metric("Arquivo", os.path.basename(stats["db_path"])) st.metric("Tamanho do banco (arquivo)", _fmt_bytes(stats["size_bytes"])) with colB: st.metric("Páginas (PRAGMA)", f'{stats["page_count"]} × {stats["page_size"]} B') st.metric("Cálculo (page_count×page_size)", _fmt_bytes(stats["calc_bytes"])) with colC: st.metric("Espaço livre no disco", _fmt_bytes(stats["disk_free"])) st.metric("SQLite version", stats["sqlite_version"] or "—") st.divider() # Capacidade alvo e ocupação st.subheader("🎯 Capacidade & Ocupação") capacidade_gb = st.number_input("Capacidade alvo (GB) — alerta quando ultrapassar", min_value=0.1, value=1.0, step=0.1) ocupacao_perc = min(100.0, (stats["size_bytes"] / (capacidade_gb * 1024**3)) * 100.0) if capacidade_gb > 0 else 0.0 st.progress(min(1.0, ocupacao_perc / 100.0)) st.caption(f"Ocupação estimada: **{ocupacao_perc:,.2f}%** de {capacidade_gb} GB") if ocupacao_perc >= 80.0: st.warning("⚠️ Ocupação acima de 80%. Considere backup/arquivamento.") st.divider() # Planejamento de backup st.subheader("🗓️ Planejamento de Backup") backup_dir = st.text_input("Pasta de backups", value="backups") _ensure_dir(backup_dir) # garante a pasta base_name = os.path.splitext(os.path.basename(stats["db_path"]))[0] backups = _listar_backups(backup_dir, base_name) # Último e próximo ultimo_backup_dt = datetime.fromtimestamp(backups[0][2]) if backups else None freq_dias = st.number_input("Frequência (dias)", min_value=1, value=7) retencao = st.number_input("Retenção máx. de backups (arquivos)", min_value=1, value=10) proximo_backup_dt = (ultimo_backup_dt + timedelta(days=freq_dias)) if ultimo_backup_dt else (datetime.now() + timedelta(days=freq_dias)) col1, col2, col3 = st.columns(3) with col1: st.metric("Último backup", ultimo_backup_dt.strftime("%d/%m/%Y %H:%M:%S") if ultimo_backup_dt else "—") with col2: st.metric("Próximo previsto", proximo_backup_dt.strftime("%d/%m/%Y %H:%M:%S")) with col3: st.metric("Backups atuais", len(backups)) # Aviso se vencido if ultimo_backup_dt and datetime.now() >= proximo_backup_dt: st.warning("⏰ Backup previsto já venceu. Execute agora para manter o plano.") # Ações st.subheader("⚙️ Ações") colX, colY, colZ = st.columns(3) with colX: if st.button("💾 Backup agora"): if _executar_backup(backup_dir): st.rerun() with colY: if st.button("🧹 Limpar antigos (manter retenção)"): _limpar_antigos(backup_dir, base_name, int(retencao)) st.rerun() with colZ: # Apenas mostra lista dos últimos backups if backups: st.write("Últimos backups:") for f, full, mtime in backups[:5]: dt = datetime.fromtimestamp(mtime).strftime("%d/%m/%Y %H:%M:%S") st.caption(f"• {f} ({dt})") # Auditoria de visualização (opcional) registrar_log(st.session_state.get("usuario"), "MONITOR DB", "schema", None)