Spaces:
Running
Running
| # db_monitor.py | |
| import streamlit as st | |
| import os | |
| import shutil | |
| import time | |
| from datetime import datetime, timedelta | |
| from sqlalchemy import text | |
| # ✅ Use sempre o engine do BANCO ATIVO (em vez de um engine fixo) | |
| from banco import get_engine, SessionLocal, db_info | |
| from utils_permissoes import verificar_permissao | |
| from utils_auditoria import registrar_log | |
| # =============================== | |
| # MONITOR & BACKUP DO BANCO | |
| # =============================== | |
| # Objetivo: | |
| # - Mostrar estatísticas do banco (tamanho, páginas, espaço em disco) | |
| # - Definir limiar/capacidade alvo e exibir ocupação (%) | |
| # - Planejar backup (frequência em dias) e retenção (N arquivos) | |
| # - Executar backup e limpar antigos com confirmação | |
| # - Acesso restrito por perfil admin | |
| # | |
| # Observações: | |
| # - Em SQLite: usa PRAGMA page_count/page_size + arquivo .db | |
| # - Em outros dialetos: exibe dialeto e recomenda backup externo | |
| # - Pasta padrão de backup: ./backups (pode alterar na UI) | |
| # - Auditoria: registrar_log(usuario, acao="BACKUP/CLEAN/MONITOR", tabela="schema") | |
| # (Opcional) rótulo amigável do ambiente atual (Produção/Teste/Treinamento) | |
| try: | |
| from db_router import current_db_choice, bank_label | |
| _HAS_ROUTER = True | |
| except Exception: | |
| _HAS_ROUTER = False | |
| def current_db_choice() -> str: | |
| return "prod" | |
| def bank_label(choice: str) -> str: | |
| return {"prod": "Banco 1 (Produção)", "test": "Banco 2 (Teste)", "treinamento": "Banco 3 (Treinamento)"}\ | |
| .get(choice, choice) | |
| # ------------------------- | |
| # Auxiliares de dialeto | |
| # ------------------------- | |
| def _engine(): | |
| """Retorna o engine do banco ATIVO (de acordo com a escolha no login).""" | |
| return get_engine() | |
| def _dialeto(): | |
| try: | |
| return _engine().url.get_backend_name() | |
| except Exception: | |
| return "desconhecido" | |
| def _sqlite_version(): | |
| if _dialeto() != "sqlite": | |
| return None | |
| try: | |
| with _engine().begin() as conn: | |
| return conn.execute(text("select sqlite_version()")).scalar() | |
| except Exception: | |
| return None | |
| # ------------------------- | |
| # Info do banco | |
| # ------------------------- | |
| def _db_file_path(): | |
| # Para SQLite, engine.url.database aponta para o arquivo .db | |
| try: | |
| eng = _engine() | |
| return eng.url.database if eng.url.get_backend_name() == "sqlite" else None | |
| except Exception: | |
| return None | |
| def _sqlite_stats(): | |
| # Retorna dict com stats do SQLite | |
| db_path = _db_file_path() | |
| if not db_path or not os.path.exists(db_path): | |
| return None | |
| size_bytes = os.path.getsize(db_path) | |
| dir_path = os.path.dirname(os.path.abspath(db_path)) or "." | |
| total, used, free = shutil.disk_usage(dir_path) | |
| with _engine().begin() as conn: | |
| page_count = conn.execute(text("PRAGMA page_count")).scalar() | |
| page_size = conn.execute(text("PRAGMA page_size")).scalar() | |
| return { | |
| "db_path": db_path, | |
| "size_bytes": size_bytes, | |
| "page_count": page_count, | |
| "page_size": page_size, | |
| "calc_bytes": (page_count or 0) * (page_size or 0), | |
| "disk_total": total, | |
| "disk_free": free, | |
| "disk_used": used, | |
| "sqlite_version": _sqlite_version(), | |
| } | |
| # ------------------------- | |
| # Backup | |
| # ------------------------- | |
| def _ensure_dir(path: str): | |
| os.makedirs(path, exist_ok=True) | |
| def _fmt_bytes(b: int) -> str: | |
| # Formata bytes em unidades legíveis | |
| for unit in ["B","KB","MB","GB","TB"]: | |
| if b < 1024.0: | |
| return f"{b:,.2f} {unit}".replace(",", ".") | |
| b /= 1024.0 | |
| return f"{b:,.2f} PB".replace(",", ".") | |
| def _listar_backups(backup_dir: str, base_name: str): | |
| """Lista backups para o banco atual. Formato: base_name-YYYYMMDD-HHMMSS.db (ou .zip futuramente)""" | |
| if not os.path.isdir(backup_dir): | |
| return [] | |
| files = [] | |
| for f in os.listdir(backup_dir): | |
| if f.startswith(base_name + "-") and f.endswith(".db"): | |
| full = os.path.join(backup_dir, f) | |
| files.append((f, full, os.path.getmtime(full))) | |
| return sorted(files, key=lambda x: x[2], reverse=True) # ordem decrescente | |
| def _executar_backup(backup_dir: str): | |
| """Copia o .db para backups/ com timestamp. Registra auditoria.""" | |
| db_path = _db_file_path() | |
| if not db_path or not os.path.exists(db_path): | |
| st.error("Arquivo de banco SQLite não encontrado.") | |
| return False | |
| _ensure_dir(backup_dir) | |
| base_name = os.path.splitext(os.path.basename(db_path))[0] | |
| stamp = datetime.now().strftime("%Y%m%d-%H%M%S") | |
| dest = os.path.join(backup_dir, f"{base_name}-{stamp}.db") | |
| try: | |
| shutil.copyfile(db_path, dest) | |
| registrar_log(st.session_state.get("usuario"), f"BACKUP criado: {os.path.basename(dest)}", "schema", None) | |
| st.success(f"✅ Backup criado: {dest}") | |
| return True | |
| except Exception as e: | |
| st.error(f"Erro ao criar backup: {e}") | |
| return False | |
| def _limpar_antigos(backup_dir: str, base_name: str, manter: int): | |
| """Remove backups antigos, mantendo N mais recentes. Registra auditoria.""" | |
| lst = _listar_backups(backup_dir, base_name) | |
| if len(lst) <= manter: | |
| st.info("Nada para remover: já dentro da retenção.") | |
| return 0 | |
| remover = lst[manter:] | |
| count = 0 | |
| for _, full, _ in remover: | |
| try: | |
| os.remove(full) | |
| count += 1 | |
| except Exception as e: | |
| st.error(f"Erro ao remover {full}: {e}") | |
| if count > 0: | |
| registrar_log(st.session_state.get("usuario"), f"CLEAN backups antigos: {count} removidos (retain={manter})", "schema", None) | |
| st.success(f"🧹 {count} backup(s) antigo(s) removido(s).") | |
| return count | |
| # ------------------------- | |
| # UI principal | |
| # ------------------------- | |
| def main(): | |
| st.title("🗄️ Monitor e Backup do Banco") | |
| # 🔐 Proteção: apenas admin | |
| if st.session_state.get("perfil") != "admin": | |
| st.error("⛔ Acesso restrito ao administrador.") | |
| return | |
| # Badge/URL do banco ativo (opcional) | |
| try: | |
| amb = current_db_choice() | |
| st.caption(f"🧭 Ambiente: {bank_label(amb)}") | |
| except Exception: | |
| pass | |
| try: | |
| info = db_info() | |
| st.caption(f"🔗 Engine URL: {info.get('url')}") | |
| except Exception: | |
| pass | |
| dial = _dialeto() | |
| st.caption(f"Dialeto do banco: **{dial}**") | |
| # Estatísticas | |
| stats = _sqlite_stats() if dial == "sqlite" else None | |
| # Se não for SQLite, exibe recomendações | |
| if dial != "sqlite": | |
| st.info("Este monitor está otimizado para SQLite. Para PostgreSQL/MySQL, configure backup via ferramenta da plataforma (pg_dump/mysqldump) e agendamento externo.") | |
| st.stop() | |
| if not stats: | |
| st.error("Banco SQLite não encontrado ou inacessível. Verifique o arquivo do banco ativo.") | |
| return | |
| # Painel de estatísticas | |
| st.subheader("📊 Estatísticas") | |
| colA, colB, colC = st.columns(3) | |
| with colA: | |
| st.metric("Arquivo", os.path.basename(stats["db_path"])) | |
| st.metric("Tamanho do banco (arquivo)", _fmt_bytes(stats["size_bytes"])) | |
| with colB: | |
| st.metric("Páginas (PRAGMA)", f'{stats["page_count"]} × {stats["page_size"]} B') | |
| st.metric("Cálculo (page_count×page_size)", _fmt_bytes(stats["calc_bytes"])) | |
| with colC: | |
| st.metric("Espaço livre no disco", _fmt_bytes(stats["disk_free"])) | |
| st.metric("SQLite version", stats["sqlite_version"] or "—") | |
| st.divider() | |
| # Capacidade alvo e ocupação | |
| st.subheader("🎯 Capacidade & Ocupação") | |
| capacidade_gb = st.number_input("Capacidade alvo (GB) — alerta quando ultrapassar", min_value=0.1, value=1.0, step=0.1) | |
| ocupacao_perc = min(100.0, (stats["size_bytes"] / (capacidade_gb * 1024**3)) * 100.0) if capacidade_gb > 0 else 0.0 | |
| st.progress(min(1.0, ocupacao_perc / 100.0)) | |
| st.caption(f"Ocupação estimada: **{ocupacao_perc:,.2f}%** de {capacidade_gb} GB") | |
| if ocupacao_perc >= 80.0: | |
| st.warning("⚠️ Ocupação acima de 80%. Considere backup/arquivamento.") | |
| st.divider() | |
| # Planejamento de backup | |
| st.subheader("🗓️ Planejamento de Backup") | |
| backup_dir = st.text_input("Pasta de backups", value="backups") | |
| _ensure_dir(backup_dir) # garante a pasta | |
| base_name = os.path.splitext(os.path.basename(stats["db_path"]))[0] | |
| backups = _listar_backups(backup_dir, base_name) | |
| # Último e próximo | |
| ultimo_backup_dt = datetime.fromtimestamp(backups[0][2]) if backups else None | |
| freq_dias = st.number_input("Frequência (dias)", min_value=1, value=7) | |
| retencao = st.number_input("Retenção máx. de backups (arquivos)", min_value=1, value=10) | |
| proximo_backup_dt = (ultimo_backup_dt + timedelta(days=freq_dias)) if ultimo_backup_dt else (datetime.now() + timedelta(days=freq_dias)) | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| st.metric("Último backup", ultimo_backup_dt.strftime("%d/%m/%Y %H:%M:%S") if ultimo_backup_dt else "—") | |
| with col2: | |
| st.metric("Próximo previsto", proximo_backup_dt.strftime("%d/%m/%Y %H:%M:%S")) | |
| with col3: | |
| st.metric("Backups atuais", len(backups)) | |
| # Aviso se vencido | |
| if ultimo_backup_dt and datetime.now() >= proximo_backup_dt: | |
| st.warning("⏰ Backup previsto já venceu. Execute agora para manter o plano.") | |
| # Ações | |
| st.subheader("⚙️ Ações") | |
| colX, colY, colZ = st.columns(3) | |
| with colX: | |
| if st.button("💾 Backup agora"): | |
| if _executar_backup(backup_dir): | |
| st.rerun() | |
| with colY: | |
| if st.button("🧹 Limpar antigos (manter retenção)"): | |
| _limpar_antigos(backup_dir, base_name, int(retencao)) | |
| st.rerun() | |
| with colZ: | |
| # Apenas mostra lista dos últimos backups | |
| if backups: | |
| st.write("Últimos backups:") | |
| for f, full, mtime in backups[:5]: | |
| dt = datetime.fromtimestamp(mtime).strftime("%d/%m/%Y %H:%M:%S") | |
| st.caption(f"• {f} ({dt})") | |
| # Auditoria de visualização (opcional) | |
| registrar_log(st.session_state.get("usuario"), "MONITOR DB", "schema", None) | |