IOI-RUN / db_export_import.py
Roudrigus's picture
Upload 82 files
0f0ef8d verified
# -*- coding: utf-8 -*-
"""
db_export_import.py — Backup & Restore (Export/Import) do banco ATIVO (Produção/Teste)
Recursos:
• Exibe banco ativo (prod/test) e URL do engine
• Exporta todas as tabelas para:
- ZIP (CSV por tabela + manifest.json)
- Excel (.xlsx) (1 aba por tabela + manifest sheet)
• Importa (upload) de:
- ZIP (CSV por tabela)
- Excel (.xlsx)
• Modos de import: APPEND ou REPLACE (cuidado com FK)
• Snapshot físico para SQLite: cópia do arquivo (.db) — backup/restore rápido
Dependências:
- pandas, openpyxl, sqlalchemy, zipfile, io, json, datetime
"""
import os
import io
import json
import zipfile
from datetime import datetime
import streamlit as st
import pandas as pd
from sqlalchemy import inspect, text
from banco import get_engine, db_info, SessionLocal
from utils_auditoria import registrar_log
# Ambiente (prod/test) — se db_router não existir, fallback para 'prod'
try:
from db_router import current_db_choice
_HAS_ROUTER = True
except Exception:
_HAS_ROUTER = False
def current_db_choice() -> str:
return "prod"
# =========================
# Helpers: tabelas e I/O
# =========================
def list_tables(engine) -> list[str]:
"""Retorna nomes de todas as tabelas via SQLAlchemy inspection."""
inspector = inspect(engine)
return inspector.get_table_names()
def _read_table_df(engine, table_name: str) -> pd.DataFrame:
"""Lê toda a tabela como DataFrame."""
try:
# pandas + SQLAlchemy: lê tabela diretamente
return pd.read_sql_table(table_name, con=engine)
except Exception:
# fallback: SELECT com aspas (útil para SQLite com nomes case-sensitive)
return pd.read_sql(f'SELECT * FROM "{table_name}"', con=engine)
def _write_table_df(engine, table_name: str, df: pd.DataFrame, mode: str = "append"):
"""
Escreve DataFrame em tabela.
mode: "append" (adiciona) ou "replace" (sobrescreve todos os dados).
Observação: nos fluxos de import, quando 'replace' foi selecionado,
todas as tabelas são truncadas previamente, e aqui usamos 'append'.
"""
if mode not in ("append", "replace"):
mode = "append"
df.to_sql(table_name, con=engine, if_exists=mode, index=False)
# =========================
# Export: ZIP (CSV) & Excel
# =========================
def export_zip(engine, ambiente: str) -> bytes:
"""
Exporta todas as tabelas para um ZIP:
- 1 CSV por tabela (UTF-8-BOM)
- manifest.json com metadados (ambiente, timestamp, url, tabelas)
"""
tables = list_tables(engine)
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as z:
for t in tables:
df = _read_table_df(engine, t)
csv_bytes = df.to_csv(index=False, encoding="utf-8-sig").encode("utf-8-sig")
z.writestr(f"{t}.csv", csv_bytes)
manifest = {
"ambiente": ambiente,
"timestamp": datetime.now().isoformat(),
"engine_url": str(engine.url),
"tables": tables,
"format": "zip/csv",
"version": "1.0",
}
z.writestr("manifest.json", json.dumps(manifest, ensure_ascii=False, indent=2))
buf.seek(0)
return buf.getvalue()
def export_excel(engine, ambiente: str) -> bytes:
"""
Exporta todas as tabelas para um Excel (.xlsx):
- 1 aba por tabela (limitada a 31 caracteres)
- "manifest" com metadados
"""
tables = list_tables(engine)
buf = io.BytesIO()
with pd.ExcelWriter(buf, engine="openpyxl") as writer:
# manifest
manifest = pd.DataFrame([{
"ambiente": ambiente,
"timestamp": datetime.now().isoformat(),
"engine_url": str(engine.url),
"tables": ", ".join(tables),
"format": "xlsx",
"version": "1.0",
}])
manifest.to_excel(writer, sheet_name="manifest", index=False)
# tabelas → 1 aba por tabela
for t in tables:
df = _read_table_df(engine, t)
sheet = t[:31] if len(t) > 31 else t
df.to_excel(writer, sheet_name=sheet, index=False)
buf.seek(0)
return buf.getvalue()
# =========================
# Import: ZIP (CSV) & Excel
# =========================
def import_zip(engine, file_bytes: bytes, mode: str = "append") -> dict:
"""
Importa dados de um ZIP (CSV por tabela).
mode: "append" ou "replace".
Retorna um relatório {table: {"rows": int, "mode": str}}.
"""
report = {}
zbuf = io.BytesIO(file_bytes)
with zipfile.ZipFile(zbuf, "r") as z:
# Se replace, limpar tabelas (cuidado com FK)
if mode == "replace":
_truncate_all(engine)
for name in z.namelist():
if not name.lower().endswith(".csv"):
continue
table = os.path.splitext(os.path.basename(name))[0]
csv_bytes = z.read(name)
df = pd.read_csv(io.BytesIO(csv_bytes), dtype=str) # dtype=str para evitar coercões agressivas
# Conversões leves de datetime (best-effort)
for col in df.columns:
if "data" in col.lower() or "date" in col.lower():
try:
df[col] = pd.to_datetime(df[col], errors="ignore")
except Exception:
pass
# replace já truncou; aqui fazemos append
_write_table_df(engine, table, df, mode="append")
report[table] = {"rows": int(len(df)), "mode": mode}
return report
def import_excel(engine, file_bytes: bytes, mode: str = "append") -> dict:
"""
Importa dados de um Excel (.xlsx) com múltiplas abas (1 por tabela).
mode: "append" ou "replace".
"""
report = {}
xbuf = io.BytesIO(file_bytes)
xls = pd.ExcelFile(xbuf, engine="openpyxl")
sheets = [s for s in xls.sheet_names if s.lower() != "manifest"]
if mode == "replace":
_truncate_all(engine)
for sheet in sheets:
df = xls.parse(sheet_name=sheet, dtype=str)
# best-effort para datas
for col in df.columns:
if "data" in col.lower() or "date" in col.lower():
try:
df[col] = pd.to_datetime(df[col], errors="ignore")
except Exception:
pass
table = sheet
_write_table_df(engine, table, df, mode="append")
report[table] = {"rows": int(len(df)), "mode": mode}
return report
# =========================
# Truncate (REPLACE mode)
# =========================
def _truncate_all(engine):
"""
Limpa todas as tabelas do banco ativo (cuidado!).
• Para SQLite: desabilita FK temporariamente, apaga, e reabilita.
• Para outros bancos: executa DELETE tabela; considere ordem por FK se necessário.
"""
insp = inspect(engine)
tables = insp.get_table_names()
with engine.begin() as conn:
url = str(engine.url)
is_sqlite = url.startswith("sqlite")
if is_sqlite:
conn.execute(text("PRAGMA foreign_keys=OFF"))
# Apaga conteúdo (sem considerar ordem de FK — OK para SQLite com FK OFF)
for t in tables:
conn.execute(text(f'DELETE FROM "{t}"'))
if is_sqlite:
conn.execute(text("PRAGMA foreign_keys=ON"))
# =========================
# Snapshot físico (SQLite)
# =========================
def snapshot_sqlite(engine, ambiente: str) -> bytes:
"""
Cria um snapshot (cópia física) do arquivo SQLite do banco ativo.
Retorna o conteúdo do arquivo para download.
"""
url = str(engine.url)
if not url.startswith("sqlite:///"):
raise RuntimeError("Snapshot físico disponível apenas para SQLite.")
db_path = url.replace("sqlite:///", "")
if not os.path.isfile(db_path):
raise FileNotFoundError(f"Arquivo SQLite não encontrado: {db_path}")
with open(db_path, "rb") as f:
data = f.read()
# auditoria
try:
registrar_log(usuario=st.session_state.get("usuario"),
acao=f"Snapshot SQLite ({ambiente})",
tabela="backup",
registro_id=None)
except Exception:
pass
return data
# =========================
# UI (Streamlit)
# =========================
def main():
st.title("🗄️ Backup & Restore | Export/Import de Banco")
# Banco ativo e info
ambiente = current_db_choice()
info = db_info()
st.caption(f"🧭 Ambiente: {'Produção' if ambiente == 'prod' else 'Teste'}")
st.caption(f"🔗 Engine URL: {info.get('url')}")
engine = get_engine()
st.divider()
st.subheader("⬇️ Exportar dados")
colA, colB, colC = st.columns(3)
with colA:
if st.button("Exportar ZIP (CSV por tabela)", type="primary"):
try:
zip_bytes = export_zip(engine, ambiente)
fname = f"backup_{ambiente}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
st.download_button("📥 Baixar ZIP", data=zip_bytes, file_name=fname, mime="application/zip")
registrar_log(usuario=st.session_state.get("usuario"),
acao=f"Export ZIP (ambiente={ambiente})",
tabela="backup", registro_id=None)
except Exception as e:
st.error(f"Falha ao exportar ZIP: {e}")
with colB:
if st.button("Exportar Excel (.xlsx)", type="primary"):
try:
xlsx_bytes = export_excel(engine, ambiente)
fname = f"backup_{ambiente}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
st.download_button("📥 Baixar Excel", data=xlsx_bytes, file_name=fname,
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
registrar_log(usuario=st.session_state.get("usuario"),
acao=f"Export XLSX (ambiente={ambiente})",
tabela="backup", registro_id=None)
except Exception as e:
st.error(f"Falha ao exportar Excel: {e}")
with colC:
# Snapshot físico apenas para SQLite
url = str(engine.url)
if url.startswith("sqlite:///"):
if st.button("Snapshot físico (SQLite)", type="secondary"):
try:
snap_bytes = snapshot_sqlite(engine, ambiente)
fname = f"snapshot_{ambiente}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db"
st.download_button("📥 Baixar Snapshot (.db)", data=snap_bytes, file_name=fname, mime="application/octet-stream")
except Exception as e:
st.error(f"Falha ao criar snapshot: {e}")
else:
st.caption("ℹ️ Snapshot físico disponível apenas para SQLite.")
st.divider()
st.subheader("⬆️ Importar dados")
mode = st.radio("Modo de importação:", ["APPEND (adicionar)", "REPLACE (substituir tudo)"], horizontal=True)
mode_val = "append" if "APPEND" in mode else "replace"
up_col1, up_col2 = st.columns(2)
with up_col1:
zip_file = st.file_uploader("Upload ZIP (CSV por tabela)", type=["zip"])
if zip_file is not None and st.button("Importar do ZIP", type="primary"):
try:
report = import_zip(engine, zip_file.read(), mode=mode_val)
st.success(f"Import ZIP concluído ({mode_val}).")
st.json(report)
registrar_log(usuario=st.session_state.get("usuario"),
acao=f"Import ZIP ({mode_val}, ambiente={ambiente})",
tabela="restore", registro_id=None)
except Exception as e:
st.error(f"Falha ao importar ZIP: {e}")
with up_col2:
xls_file = st.file_uploader("Upload Excel (.xlsx)", type=["xlsx"])
if xls_file is not None and st.button("Importar do Excel", type="primary"):
try:
report = import_excel(engine, xls_file.read(), mode=mode_val)
st.success(f"Import Excel concluído ({mode_val}).")
st.json(report)
registrar_log(usuario=st.session_state.get("usuario"),
acao=f"Import XLSX ({mode_val}, ambiente={ambiente})",
tabela="restore", registro_id=None)
except Exception as e:
st.error(f"Falha ao importar Excel: {e}")
st.divider()
st.info("⚠️ Recomendações:\n"
"• Para restore completo com integridade referencial, prefira snapshot físico no SQLite, ou migrações controladas em bancos como Postgres/SQL Server.\n"
"• O modo REPLACE desabilita FK temporariamente no SQLite para permitir limpeza; use com cautela.\n"
"• Em produção, considere gerar backups com versionamento e retenção (ex.: timestamp no nome do arquivo).")
def render():
# compatível com seu roteador/menu
main()
if __name__ == "__main__":
st.set_page_config(page_title="Backup & Restore | ARM", layout="wide")
main()