Spaces:
Running
Running
| # -*- coding: utf-8 -*- | |
| """ | |
| db_export_import.py — Backup & Restore (Export/Import) do banco ATIVO (Produção/Teste) | |
| Recursos: | |
| • Exibe banco ativo (prod/test) e URL do engine | |
| • Exporta todas as tabelas para: | |
| - ZIP (CSV por tabela + manifest.json) | |
| - Excel (.xlsx) (1 aba por tabela + manifest sheet) | |
| • Importa (upload) de: | |
| - ZIP (CSV por tabela) | |
| - Excel (.xlsx) | |
| • Modos de import: APPEND ou REPLACE (cuidado com FK) | |
| • Snapshot físico para SQLite: cópia do arquivo (.db) — backup/restore rápido | |
| Dependências: | |
| - pandas, openpyxl, sqlalchemy, zipfile, io, json, datetime | |
| """ | |
| import os | |
| import io | |
| import json | |
| import zipfile | |
| from datetime import datetime | |
| import streamlit as st | |
| import pandas as pd | |
| from sqlalchemy import inspect, text | |
| from banco import get_engine, db_info, SessionLocal | |
| from utils_auditoria import registrar_log | |
| # Ambiente (prod/test) — se db_router não existir, fallback para 'prod' | |
| try: | |
| from db_router import current_db_choice | |
| _HAS_ROUTER = True | |
| except Exception: | |
| _HAS_ROUTER = False | |
| def current_db_choice() -> str: | |
| return "prod" | |
| # ========================= | |
| # Helpers: tabelas e I/O | |
| # ========================= | |
| def list_tables(engine) -> list[str]: | |
| """Retorna nomes de todas as tabelas via SQLAlchemy inspection.""" | |
| inspector = inspect(engine) | |
| return inspector.get_table_names() | |
| def _read_table_df(engine, table_name: str) -> pd.DataFrame: | |
| """Lê toda a tabela como DataFrame.""" | |
| try: | |
| # pandas + SQLAlchemy: lê tabela diretamente | |
| return pd.read_sql_table(table_name, con=engine) | |
| except Exception: | |
| # fallback: SELECT com aspas (útil para SQLite com nomes case-sensitive) | |
| return pd.read_sql(f'SELECT * FROM "{table_name}"', con=engine) | |
| def _write_table_df(engine, table_name: str, df: pd.DataFrame, mode: str = "append"): | |
| """ | |
| Escreve DataFrame em tabela. | |
| mode: "append" (adiciona) ou "replace" (sobrescreve todos os dados). | |
| Observação: nos fluxos de import, quando 'replace' foi selecionado, | |
| todas as tabelas são truncadas previamente, e aqui usamos 'append'. | |
| """ | |
| if mode not in ("append", "replace"): | |
| mode = "append" | |
| df.to_sql(table_name, con=engine, if_exists=mode, index=False) | |
| # ========================= | |
| # Export: ZIP (CSV) & Excel | |
| # ========================= | |
| def export_zip(engine, ambiente: str) -> bytes: | |
| """ | |
| Exporta todas as tabelas para um ZIP: | |
| - 1 CSV por tabela (UTF-8-BOM) | |
| - manifest.json com metadados (ambiente, timestamp, url, tabelas) | |
| """ | |
| tables = list_tables(engine) | |
| buf = io.BytesIO() | |
| with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as z: | |
| for t in tables: | |
| df = _read_table_df(engine, t) | |
| csv_bytes = df.to_csv(index=False, encoding="utf-8-sig").encode("utf-8-sig") | |
| z.writestr(f"{t}.csv", csv_bytes) | |
| manifest = { | |
| "ambiente": ambiente, | |
| "timestamp": datetime.now().isoformat(), | |
| "engine_url": str(engine.url), | |
| "tables": tables, | |
| "format": "zip/csv", | |
| "version": "1.0", | |
| } | |
| z.writestr("manifest.json", json.dumps(manifest, ensure_ascii=False, indent=2)) | |
| buf.seek(0) | |
| return buf.getvalue() | |
| def export_excel(engine, ambiente: str) -> bytes: | |
| """ | |
| Exporta todas as tabelas para um Excel (.xlsx): | |
| - 1 aba por tabela (limitada a 31 caracteres) | |
| - "manifest" com metadados | |
| """ | |
| tables = list_tables(engine) | |
| buf = io.BytesIO() | |
| with pd.ExcelWriter(buf, engine="openpyxl") as writer: | |
| # manifest | |
| manifest = pd.DataFrame([{ | |
| "ambiente": ambiente, | |
| "timestamp": datetime.now().isoformat(), | |
| "engine_url": str(engine.url), | |
| "tables": ", ".join(tables), | |
| "format": "xlsx", | |
| "version": "1.0", | |
| }]) | |
| manifest.to_excel(writer, sheet_name="manifest", index=False) | |
| # tabelas → 1 aba por tabela | |
| for t in tables: | |
| df = _read_table_df(engine, t) | |
| sheet = t[:31] if len(t) > 31 else t | |
| df.to_excel(writer, sheet_name=sheet, index=False) | |
| buf.seek(0) | |
| return buf.getvalue() | |
| # ========================= | |
| # Import: ZIP (CSV) & Excel | |
| # ========================= | |
| def import_zip(engine, file_bytes: bytes, mode: str = "append") -> dict: | |
| """ | |
| Importa dados de um ZIP (CSV por tabela). | |
| mode: "append" ou "replace". | |
| Retorna um relatório {table: {"rows": int, "mode": str}}. | |
| """ | |
| report = {} | |
| zbuf = io.BytesIO(file_bytes) | |
| with zipfile.ZipFile(zbuf, "r") as z: | |
| # Se replace, limpar tabelas (cuidado com FK) | |
| if mode == "replace": | |
| _truncate_all(engine) | |
| for name in z.namelist(): | |
| if not name.lower().endswith(".csv"): | |
| continue | |
| table = os.path.splitext(os.path.basename(name))[0] | |
| csv_bytes = z.read(name) | |
| df = pd.read_csv(io.BytesIO(csv_bytes), dtype=str) # dtype=str para evitar coercões agressivas | |
| # Conversões leves de datetime (best-effort) | |
| for col in df.columns: | |
| if "data" in col.lower() or "date" in col.lower(): | |
| try: | |
| df[col] = pd.to_datetime(df[col], errors="ignore") | |
| except Exception: | |
| pass | |
| # replace já truncou; aqui fazemos append | |
| _write_table_df(engine, table, df, mode="append") | |
| report[table] = {"rows": int(len(df)), "mode": mode} | |
| return report | |
| def import_excel(engine, file_bytes: bytes, mode: str = "append") -> dict: | |
| """ | |
| Importa dados de um Excel (.xlsx) com múltiplas abas (1 por tabela). | |
| mode: "append" ou "replace". | |
| """ | |
| report = {} | |
| xbuf = io.BytesIO(file_bytes) | |
| xls = pd.ExcelFile(xbuf, engine="openpyxl") | |
| sheets = [s for s in xls.sheet_names if s.lower() != "manifest"] | |
| if mode == "replace": | |
| _truncate_all(engine) | |
| for sheet in sheets: | |
| df = xls.parse(sheet_name=sheet, dtype=str) | |
| # best-effort para datas | |
| for col in df.columns: | |
| if "data" in col.lower() or "date" in col.lower(): | |
| try: | |
| df[col] = pd.to_datetime(df[col], errors="ignore") | |
| except Exception: | |
| pass | |
| table = sheet | |
| _write_table_df(engine, table, df, mode="append") | |
| report[table] = {"rows": int(len(df)), "mode": mode} | |
| return report | |
| # ========================= | |
| # Truncate (REPLACE mode) | |
| # ========================= | |
| def _truncate_all(engine): | |
| """ | |
| Limpa todas as tabelas do banco ativo (cuidado!). | |
| • Para SQLite: desabilita FK temporariamente, apaga, e reabilita. | |
| • Para outros bancos: executa DELETE tabela; considere ordem por FK se necessário. | |
| """ | |
| insp = inspect(engine) | |
| tables = insp.get_table_names() | |
| with engine.begin() as conn: | |
| url = str(engine.url) | |
| is_sqlite = url.startswith("sqlite") | |
| if is_sqlite: | |
| conn.execute(text("PRAGMA foreign_keys=OFF")) | |
| # Apaga conteúdo (sem considerar ordem de FK — OK para SQLite com FK OFF) | |
| for t in tables: | |
| conn.execute(text(f'DELETE FROM "{t}"')) | |
| if is_sqlite: | |
| conn.execute(text("PRAGMA foreign_keys=ON")) | |
| # ========================= | |
| # Snapshot físico (SQLite) | |
| # ========================= | |
| def snapshot_sqlite(engine, ambiente: str) -> bytes: | |
| """ | |
| Cria um snapshot (cópia física) do arquivo SQLite do banco ativo. | |
| Retorna o conteúdo do arquivo para download. | |
| """ | |
| url = str(engine.url) | |
| if not url.startswith("sqlite:///"): | |
| raise RuntimeError("Snapshot físico disponível apenas para SQLite.") | |
| db_path = url.replace("sqlite:///", "") | |
| if not os.path.isfile(db_path): | |
| raise FileNotFoundError(f"Arquivo SQLite não encontrado: {db_path}") | |
| with open(db_path, "rb") as f: | |
| data = f.read() | |
| # auditoria | |
| try: | |
| registrar_log(usuario=st.session_state.get("usuario"), | |
| acao=f"Snapshot SQLite ({ambiente})", | |
| tabela="backup", | |
| registro_id=None) | |
| except Exception: | |
| pass | |
| return data | |
| # ========================= | |
| # UI (Streamlit) | |
| # ========================= | |
| def main(): | |
| st.title("🗄️ Backup & Restore | Export/Import de Banco") | |
| # Banco ativo e info | |
| ambiente = current_db_choice() | |
| info = db_info() | |
| st.caption(f"🧭 Ambiente: {'Produção' if ambiente == 'prod' else 'Teste'}") | |
| st.caption(f"🔗 Engine URL: {info.get('url')}") | |
| engine = get_engine() | |
| st.divider() | |
| st.subheader("⬇️ Exportar dados") | |
| colA, colB, colC = st.columns(3) | |
| with colA: | |
| if st.button("Exportar ZIP (CSV por tabela)", type="primary"): | |
| try: | |
| zip_bytes = export_zip(engine, ambiente) | |
| fname = f"backup_{ambiente}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip" | |
| st.download_button("📥 Baixar ZIP", data=zip_bytes, file_name=fname, mime="application/zip") | |
| registrar_log(usuario=st.session_state.get("usuario"), | |
| acao=f"Export ZIP (ambiente={ambiente})", | |
| tabela="backup", registro_id=None) | |
| except Exception as e: | |
| st.error(f"Falha ao exportar ZIP: {e}") | |
| with colB: | |
| if st.button("Exportar Excel (.xlsx)", type="primary"): | |
| try: | |
| xlsx_bytes = export_excel(engine, ambiente) | |
| fname = f"backup_{ambiente}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" | |
| st.download_button("📥 Baixar Excel", data=xlsx_bytes, file_name=fname, | |
| mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet") | |
| registrar_log(usuario=st.session_state.get("usuario"), | |
| acao=f"Export XLSX (ambiente={ambiente})", | |
| tabela="backup", registro_id=None) | |
| except Exception as e: | |
| st.error(f"Falha ao exportar Excel: {e}") | |
| with colC: | |
| # Snapshot físico apenas para SQLite | |
| url = str(engine.url) | |
| if url.startswith("sqlite:///"): | |
| if st.button("Snapshot físico (SQLite)", type="secondary"): | |
| try: | |
| snap_bytes = snapshot_sqlite(engine, ambiente) | |
| fname = f"snapshot_{ambiente}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db" | |
| st.download_button("📥 Baixar Snapshot (.db)", data=snap_bytes, file_name=fname, mime="application/octet-stream") | |
| except Exception as e: | |
| st.error(f"Falha ao criar snapshot: {e}") | |
| else: | |
| st.caption("ℹ️ Snapshot físico disponível apenas para SQLite.") | |
| st.divider() | |
| st.subheader("⬆️ Importar dados") | |
| mode = st.radio("Modo de importação:", ["APPEND (adicionar)", "REPLACE (substituir tudo)"], horizontal=True) | |
| mode_val = "append" if "APPEND" in mode else "replace" | |
| up_col1, up_col2 = st.columns(2) | |
| with up_col1: | |
| zip_file = st.file_uploader("Upload ZIP (CSV por tabela)", type=["zip"]) | |
| if zip_file is not None and st.button("Importar do ZIP", type="primary"): | |
| try: | |
| report = import_zip(engine, zip_file.read(), mode=mode_val) | |
| st.success(f"Import ZIP concluído ({mode_val}).") | |
| st.json(report) | |
| registrar_log(usuario=st.session_state.get("usuario"), | |
| acao=f"Import ZIP ({mode_val}, ambiente={ambiente})", | |
| tabela="restore", registro_id=None) | |
| except Exception as e: | |
| st.error(f"Falha ao importar ZIP: {e}") | |
| with up_col2: | |
| xls_file = st.file_uploader("Upload Excel (.xlsx)", type=["xlsx"]) | |
| if xls_file is not None and st.button("Importar do Excel", type="primary"): | |
| try: | |
| report = import_excel(engine, xls_file.read(), mode=mode_val) | |
| st.success(f"Import Excel concluído ({mode_val}).") | |
| st.json(report) | |
| registrar_log(usuario=st.session_state.get("usuario"), | |
| acao=f"Import XLSX ({mode_val}, ambiente={ambiente})", | |
| tabela="restore", registro_id=None) | |
| except Exception as e: | |
| st.error(f"Falha ao importar Excel: {e}") | |
| st.divider() | |
| st.info("⚠️ Recomendações:\n" | |
| "• Para restore completo com integridade referencial, prefira snapshot físico no SQLite, ou migrações controladas em bancos como Postgres/SQL Server.\n" | |
| "• O modo REPLACE desabilita FK temporariamente no SQLite para permitir limpeza; use com cautela.\n" | |
| "• Em produção, considere gerar backups com versionamento e retenção (ex.: timestamp no nome do arquivo).") | |
| def render(): | |
| # compatível com seu roteador/menu | |
| main() | |
| if __name__ == "__main__": | |
| st.set_page_config(page_title="Backup & Restore | ARM", layout="wide") | |
| main() | |