IOI-RUN / db_admin.py
Roudrigus's picture
Upload 82 files
0f0ef8d verified
# db_admin.py
import streamlit as st
import os
import shutil
from sqlalchemy import text
from banco import engine, SessionLocal
from utils_permissoes import verificar_permissao
from utils_auditoria import registrar_log
# =====================================================
# MÓDULO GERAL DE ADMINISTRAÇÃO DE BANCO (SCHEMA)
# =====================================================
# Objetivo:
# - Permitir adicionar, renomear, excluir e alterar tipo de colunas via UI
# - Funciona com SQLite, PostgreSQL e MySQL (com diferenças por dialeto)
# - Em SQLite, oferece reconstrução assistida quando DDL não é suportado
#
# Segurança e boas práticas:
# - Recomendado fazer backup antes de operações (botão disponível para SQLite)
# - Operações DDL são críticas: exigir confirmação explícita
# - Acesso restrito ao perfil "admin"
#
# Logs:
# - registrar_log(...) é chamado em todas as operações
# -------------------------
# Utilitário: Dialeto e versão
# -------------------------
def _dialeto():
try:
return engine.url.get_backend_name()
except Exception:
return "desconhecido"
def _sqlite_version():
if _dialeto() != "sqlite":
return None
try:
with engine.begin() as conn:
rv = conn.execute(text("select sqlite_version()")).scalar()
return rv
except Exception:
return None
# -------------------------
# Utilitário: Listar tabelas e colunas
# -------------------------
def _listar_tabelas():
d = _dialeto()
with engine.begin() as conn:
if d == "sqlite":
rows = conn.execute(text("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")).fetchall()
return [r[0] for r in rows]
else:
q = text("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema NOT IN ('pg_catalog','information_schema')
ORDER BY table_name
""")
rows = conn.execute(q).fetchall()
return [r[0] for r in rows]
def _listar_colunas(tabela: str):
d = _dialeto()
with engine.begin() as conn:
if d == "sqlite":
rows = conn.execute(text(f"PRAGMA table_info({tabela})")).fetchall()
# PRAGMA: (cid, name, type, notnull, dflt_value, pk)
return [{"name": r[1], "type": r[2], "notnull": bool(r[3]), "default": r[4], "pk": bool(r[5])} for r in rows]
else:
q = text("""
SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_name = :tbl
ORDER BY ordinal_position
""")
rows = conn.execute(q, {"tbl": tabela}).fetchall()
return [{"name": r[0], "type": r[1], "notnull": (str(r[2]).upper() == "NO"), "default": r[3], "pk": False} for r in rows]
# -------------------------
# Backup rápido (SQLite)
# -------------------------
def _sqlite_backup():
if _dialeto() != "sqlite":
st.info("Backup automático só disponível para SQLite via cópia de arquivo.")
return
db_path = engine.url.database
if not db_path or not os.path.exists(db_path):
st.error("Arquivo de banco SQLite não encontrado.")
return
dest = db_path + ".bak"
shutil.copyfile(db_path, dest)
st.success(f"Backup criado: {dest}")
# -------------------------
# DDL: Gerar comandos por dialeto
# -------------------------
def _ddl_add_column_sql(tabela, col_nome, col_tipo, notnull=False, default=None):
d = _dialeto()
nn = "NOT NULL" if notnull else "NULL"
def_clause = f" DEFAULT {default}" if (default is not None and str(default).strip() != "") else ""
if d == "sqlite":
# SQLite aceita tipo textual; notnull e default são respeitados no schema
return f"ALTER TABLE {tabela} ADD COLUMN {col_nome} {col_tipo} {nn}{def_clause};"
elif d in ("postgresql", "postgres"):
base = f'ALTER TABLE "{tabela}" ADD COLUMN "{col_nome}" {col_tipo}'
if default is not None and str(default).strip() != "":
base += f" DEFAULT {default}"
if notnull:
base += " NOT NULL"
return base + ";"
elif d in ("mysql", "mariadb"):
base = f"ALTER TABLE `{tabela}` ADD COLUMN `{col_nome}` {col_tipo}"
if default is not None and str(default).strip() != "":
base += f" DEFAULT {default}"
base += " NOT NULL" if notnull else " NULL"
return base + ";"
return None
def _ddl_rename_column_sql(tabela, old, new):
d = _dialeto()
if d == "sqlite":
return f"ALTER TABLE {tabela} RENAME COLUMN {old} TO {new};"
elif d in ("postgresql", "postgres"):
return f'ALTER TABLE "{tabela}" RENAME COLUMN "{old}" TO "{new}";'
elif d in ("mysql", "mariadb"):
# MySQL requer tipo na renomeação; esta função não cobre tipo -> usar CHANGE COLUMN via UI de "Alterar tipo/renomear"
return None
return None
def _ddl_drop_column_sql(tabela, col):
d = _dialeto()
if d == "sqlite":
return f"ALTER TABLE {tabela} DROP COLUMN {col};"
elif d in ("postgresql", "postgres"):
return f'ALTER TABLE "{tabela}" DROP COLUMN "{col}";'
elif d in ("mysql", "mariadb"):
return f"ALTER TABLE `{tabela}` DROP COLUMN `{col}`;"
return None
def _ddl_alter_type_sql(tabela, col, new_type):
d = _dialeto()
if d == "sqlite":
# SQLite não altera type declarado via ALTER TYPE. Necessário reconstruir tabela.
return None
elif d in ("postgresql", "postgres"):
return f'ALTER TABLE "{tabela}" ALTER COLUMN "{col}" TYPE {new_type};'
elif d in ("mysql", "mariadb"):
return f"ALTER TABLE `{tabela}` MODIFY COLUMN `{col}` {new_type};"
return None
# -------------------------
# Reconstrução assistida (SQLite)
# -------------------------
def _sqlite_reconstruir_tabela(tabela, novas_colunas):
"""
Reconstrói tabela SQLite com "novas_colunas" (lista de dicts):
[{"name":..., "type":..., "notnull":bool, "default":..., "pk":bool}, ...]
- Cria tabela __tmp_<tabela> com o novo schema
- Copia dados das colunas compatíveis (mesmos nomes)
- Drop da tabela original e rename da temporária
"""
cols_def = []
copy_cols = []
pk_cols = [c["name"] for c in novas_colunas if c.get("pk")]
for c in novas_colunas:
nn = "NOT NULL" if c.get("notnull") else ""
default = c.get("default")
def_clause = f" DEFAULT {default}" if (default is not None and str(default).strip() != "") else ""
cols_def.append(f'{c["name"]} {c["type"]} {nn}{def_clause}'.strip())
copy_cols.append(c["name"])
pk_clause = f", PRIMARY KEY ({', '.join(pk_cols)})" if pk_cols else ""
create_sql = f"CREATE TABLE __tmp_{tabela} ({', '.join(cols_def)}{pk_clause});"
copy_sql = f"INSERT INTO __tmp_{tabela} ({', '.join(copy_cols)}) SELECT {', '.join(copy_cols)} FROM {tabela};"
drop_sql = f"DROP TABLE {tabela};"
rename_sql= f"ALTER TABLE __tmp_{tabela} RENAME TO {tabela};"
with engine.begin() as conn:
conn.execute(text(create_sql))
conn.execute(text(copy_sql))
conn.execute(text(drop_sql))
conn.execute(text(rename_sql))
# -------------------------
# UI principal (admin)
# -------------------------
def main():
st.title("🛠️ Administração de Banco (Schema)")
# 🔐 Proteção por perfil
if not verificar_permissao("db_admin") and st.session_state.get("perfil") != "admin":
st.error("⛔ Acesso não autorizado.")
return
# Info do banco
dial = _dialeto()
st.caption(f"Dialeto: **{dial}**")
ver = _sqlite_version()
if ver:
st.caption(f"SQLite version: **{ver}**")
# Backup (SQLite)
if dial == "sqlite":
if st.button("💾 Backup rápido (SQLite)"):
_sqlite_backup()
# Tabelas disponíveis
tabelas = _listar_tabelas()
if not tabelas:
st.warning("Nenhuma tabela encontrada.")
return
tabela = st.selectbox("Tabela alvo:", tabelas, index=0)
colunas = _listar_colunas(tabela)
st.divider()
st.subheader("📋 Colunas atuais")
st.write(pd.DataFrame(colunas)) if 'pd' in globals() else st.write(colunas) # mostra estrutura atual
st.divider()
tabs = st.tabs(["➕ Adicionar coluna", "✏️ Renomear coluna", "🗑️ Excluir coluna", "♻️ Alterar tipo"])
# ----------------- Adicionar coluna -----------------
with tabs[0]:
st.markdown("**Adicionar uma nova coluna à tabela selecionada**")
novo_nome = st.text_input("Nome da nova coluna")
novo_tipo = st.text_input("Tipo (ex.: TEXT, INTEGER, VARCHAR(255))")
novo_notnull = st.checkbox("NOT NULL", value=False)
novo_default = st.text_input("DEFAULT (opcional)")
confirmar_add = st.checkbox("Confirmo a adição desta coluna (DDL).")
if st.button("Executar ADD COLUMN", type="primary") and confirmar_add:
sql = _ddl_add_column_sql(tabela, novo_nome, novo_tipo, notnull=novo_notnull, default=novo_default)
if not sql:
st.error("Dialeto não suportado para ADD COLUMN.")
else:
try:
with engine.begin() as conn:
conn.execute(text(sql))
registrar_log(st.session_state.get("usuario"), f"ADD COLUMN {novo_nome} {novo_tipo} em {tabela}", "schema", None)
st.success("✅ Coluna adicionada com sucesso.")
st.rerun()
except Exception as e:
st.error(f"Erro ao adicionar coluna: {e}")
# ----------------- Renomear coluna -----------------
with tabs[1]:
st.markdown("**Renomear uma coluna existente**")
col_nomes = [c["name"] for c in colunas]
antigo = st.selectbox("Coluna atual:", col_nomes) if col_nomes else ""
novo = st.text_input("Novo nome da coluna")
confirmar_ren = st.checkbox("Confirmo a renomeação desta coluna (DDL).")
if st.button("Executar RENAME COLUMN") and confirmar_ren:
d = _dialeto()
if d == "sqlite":
# Verifica suporte na versão
ver = _sqlite_version() or "0.0.0"
suportado = tuple(map(int, ver.split("."))) >= (3, 25, 0)
if suportado:
sql = _ddl_rename_column_sql(tabela, antigo, novo)
try:
with engine.begin() as conn:
conn.execute(text(sql))
registrar_log(st.session_state.get("usuario"), f"RENAME COLUMN {antigo}{novo} em {tabela}", "schema", None)
st.success("✅ Coluna renomeada com sucesso.")
st.rerun()
except Exception as e:
st.error(f"Erro ao renomear: {e}")
else:
st.warning("SQLite < 3.25 não suporta RENAME COLUMN. Oferecendo reconstrução assistida.")
# Reconstrução: atualiza metadados e recria tabela
novas = []
for c in colunas:
nm = novo if c["name"] == antigo else c["name"]
novas.append({"name": nm, "type": c["type"], "notnull": c["notnull"], "default": c["default"], "pk": c["pk"]})
try:
_sqlite_reconstruir_tabela(tabela, novas)
registrar_log(st.session_state.get("usuario"), f"RENAME (rebuild) {antigo}{novo} em {tabela}", "schema", None)
st.success("✅ Reconstrução concluída com sucesso.")
st.rerun()
except Exception as e:
st.error(f"Erro na reconstrução: {e}")
elif d in ("postgresql", "postgres"):
sql = _ddl_rename_column_sql(tabela, antigo, novo)
if not sql:
st.error("Renomeação não suportada.")
else:
try:
with engine.begin() as conn:
conn.execute(text(sql))
registrar_log(st.session_state.get("usuario"), f"RENAME COLUMN {antigo}{novo} em {tabela}", "schema", None)
st.success("✅ Coluna renomeada com sucesso.")
st.rerun()
except Exception as e:
st.error(f"Erro ao renomear: {e}")
elif d in ("mysql", "mariadb"):
st.info("MySQL/MariaDB exigem 'CHANGE COLUMN' informando o novo tipo; use a aba 'Alterar tipo' para renomear junto com tipo.")
# ----------------- Excluir coluna -----------------
with tabs[2]:
st.markdown("**Excluir uma coluna existente**")
col_nomes = [c["name"] for c in colunas]
col_drop = st.selectbox("Coluna a excluir:", col_nomes) if col_nomes else ""
confirmar_drop = st.checkbox("Confirmo a exclusão desta coluna (DDL) e entendo que é irreversível.")
if st.button("Executar DROP COLUMN", type="secondary") and confirmar_drop:
d = _dialeto()
if d == "sqlite":
ver = _sqlite_version() or "0.0.0"
suportado = tuple(map(int, ver.split("."))) >= (3, 35, 0)
if suportado:
sql = _ddl_drop_column_sql(tabela, col_drop)
try:
with engine.begin() as conn:
conn.execute(text(sql))
registrar_log(st.session_state.get("usuario"), f"DROP COLUMN {col_drop} em {tabela}", "schema", None)
st.success("✅ Coluna excluída com sucesso.")
st.rerun()
except Exception as e:
st.error(f"Erro ao excluir: {e}")
else:
st.warning("SQLite < 3.35 não suporta DROP COLUMN. Oferecendo reconstrução assistida.")
novas = [c for c in colunas if c["name"] != col_drop]
try:
_sqlite_reconstruir_tabela(tabela, novas)
registrar_log(st.session_state.get("usuario"), f"DROP (rebuild) {col_drop} em {tabela}", "schema", None)
st.success("✅ Reconstrução concluída e coluna removida.")
st.rerun()
except Exception as e:
st.error(f"Erro na reconstrução: {e}")
elif d in ("postgresql", "postgres", "mysql", "mariadb"):
sql = _ddl_drop_column_sql(tabela, col_drop)
try:
with engine.begin() as conn:
conn.execute(text(sql))
registrar_log(st.session_state.get("usuario"), f"DROP COLUMN {col_drop} em {tabela}", "schema", None)
st.success("✅ Coluna excluída com sucesso.")
st.rerun()
except Exception as e:
st.error(f"Erro ao excluir: {e}")
# ----------------- Alterar tipo -----------------
with tabs[3]:
st.markdown("**Alterar tipo declarado de uma coluna**")
col_nomes = [c["name"] for c in colunas]
alvo = st.selectbox("Coluna alvo:", col_nomes) if col_nomes else ""
novo_tipo = st.text_input("Novo tipo (ex.: TEXT, INTEGER, VARCHAR(255))")
confirmar_type = st.checkbox("Confirmo a alteração de tipo (DDL).")
if st.button("Executar ALTER TYPE") and confirmar_type:
d = _dialeto()
if d == "sqlite":
st.warning("SQLite não suporta ALTER TYPE direto; oferecemos reconstrução assistida.")
novas = []
for c in colunas:
typ = novo_tipo if c["name"] == alvo else c["type"]
novas.append({"name": c["name"], "type": typ, "notnull": c["notnull"], "default": c["default"], "pk": c["pk"]})
try:
_sqlite_reconstruir_tabela(tabela, novas)
registrar_log(st.session_state.get("usuario"), f"ALTER TYPE (rebuild) {alvo}{novo_tipo} em {tabela}", "schema", None)
st.success("✅ Tipo alterado com sucesso via reconstrução.")
st.rerun()
except Exception as e:
st.error(f"Erro na reconstrução: {e}")
elif d in ("postgresql", "postgres", "mysql", "mariadb"):
sql = _ddl_alter_type_sql(tabela, alvo, novo_tipo)
if not sql:
st.error("Dialeto não suportado para ALTER TYPE.")
else:
try:
with engine.begin() as conn:
conn.execute(text(sql))
registrar_log(st.session_state.get("usuario"), f"ALTER TYPE {alvo}{novo_tipo} em {tabela}", "schema", None)
st.success("✅ Tipo alterado com sucesso.")
st.rerun()
except Exception as e:
st.error(f"Erro ao alterar tipo: {e}")