|
|
|
|
|
|
|
|
import streamlit as st
|
|
|
import os
|
|
|
import shutil
|
|
|
from sqlalchemy import text
|
|
|
from banco import engine, SessionLocal
|
|
|
from utils_permissoes import verificar_permissao
|
|
|
from utils_auditoria import registrar_log
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _dialeto():
|
|
|
try:
|
|
|
return engine.url.get_backend_name()
|
|
|
except Exception:
|
|
|
return "desconhecido"
|
|
|
|
|
|
def _sqlite_version():
|
|
|
if _dialeto() != "sqlite":
|
|
|
return None
|
|
|
try:
|
|
|
with engine.begin() as conn:
|
|
|
rv = conn.execute(text("select sqlite_version()")).scalar()
|
|
|
return rv
|
|
|
except Exception:
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _listar_tabelas():
|
|
|
d = _dialeto()
|
|
|
with engine.begin() as conn:
|
|
|
if d == "sqlite":
|
|
|
rows = conn.execute(text("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")).fetchall()
|
|
|
return [r[0] for r in rows]
|
|
|
else:
|
|
|
q = text("""
|
|
|
SELECT table_name
|
|
|
FROM information_schema.tables
|
|
|
WHERE table_schema NOT IN ('pg_catalog','information_schema')
|
|
|
ORDER BY table_name
|
|
|
""")
|
|
|
rows = conn.execute(q).fetchall()
|
|
|
return [r[0] for r in rows]
|
|
|
|
|
|
def _listar_colunas(tabela: str):
|
|
|
d = _dialeto()
|
|
|
with engine.begin() as conn:
|
|
|
if d == "sqlite":
|
|
|
rows = conn.execute(text(f"PRAGMA table_info({tabela})")).fetchall()
|
|
|
|
|
|
return [{"name": r[1], "type": r[2], "notnull": bool(r[3]), "default": r[4], "pk": bool(r[5])} for r in rows]
|
|
|
else:
|
|
|
q = text("""
|
|
|
SELECT column_name, data_type, is_nullable, column_default
|
|
|
FROM information_schema.columns
|
|
|
WHERE table_name = :tbl
|
|
|
ORDER BY ordinal_position
|
|
|
""")
|
|
|
rows = conn.execute(q, {"tbl": tabela}).fetchall()
|
|
|
return [{"name": r[0], "type": r[1], "notnull": (str(r[2]).upper() == "NO"), "default": r[3], "pk": False} for r in rows]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _sqlite_backup():
|
|
|
if _dialeto() != "sqlite":
|
|
|
st.info("Backup automático só disponível para SQLite via cópia de arquivo.")
|
|
|
return
|
|
|
db_path = engine.url.database
|
|
|
if not db_path or not os.path.exists(db_path):
|
|
|
st.error("Arquivo de banco SQLite não encontrado.")
|
|
|
return
|
|
|
dest = db_path + ".bak"
|
|
|
shutil.copyfile(db_path, dest)
|
|
|
st.success(f"Backup criado: {dest}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _ddl_add_column_sql(tabela, col_nome, col_tipo, notnull=False, default=None):
|
|
|
d = _dialeto()
|
|
|
nn = "NOT NULL" if notnull else "NULL"
|
|
|
def_clause = f" DEFAULT {default}" if (default is not None and str(default).strip() != "") else ""
|
|
|
if d == "sqlite":
|
|
|
|
|
|
return f"ALTER TABLE {tabela} ADD COLUMN {col_nome} {col_tipo} {nn}{def_clause};"
|
|
|
elif d in ("postgresql", "postgres"):
|
|
|
base = f'ALTER TABLE "{tabela}" ADD COLUMN "{col_nome}" {col_tipo}'
|
|
|
if default is not None and str(default).strip() != "":
|
|
|
base += f" DEFAULT {default}"
|
|
|
if notnull:
|
|
|
base += " NOT NULL"
|
|
|
return base + ";"
|
|
|
elif d in ("mysql", "mariadb"):
|
|
|
base = f"ALTER TABLE `{tabela}` ADD COLUMN `{col_nome}` {col_tipo}"
|
|
|
if default is not None and str(default).strip() != "":
|
|
|
base += f" DEFAULT {default}"
|
|
|
base += " NOT NULL" if notnull else " NULL"
|
|
|
return base + ";"
|
|
|
return None
|
|
|
|
|
|
def _ddl_rename_column_sql(tabela, old, new):
|
|
|
d = _dialeto()
|
|
|
if d == "sqlite":
|
|
|
return f"ALTER TABLE {tabela} RENAME COLUMN {old} TO {new};"
|
|
|
elif d in ("postgresql", "postgres"):
|
|
|
return f'ALTER TABLE "{tabela}" RENAME COLUMN "{old}" TO "{new}";'
|
|
|
elif d in ("mysql", "mariadb"):
|
|
|
|
|
|
return None
|
|
|
return None
|
|
|
|
|
|
def _ddl_drop_column_sql(tabela, col):
|
|
|
d = _dialeto()
|
|
|
if d == "sqlite":
|
|
|
return f"ALTER TABLE {tabela} DROP COLUMN {col};"
|
|
|
elif d in ("postgresql", "postgres"):
|
|
|
return f'ALTER TABLE "{tabela}" DROP COLUMN "{col}";'
|
|
|
elif d in ("mysql", "mariadb"):
|
|
|
return f"ALTER TABLE `{tabela}` DROP COLUMN `{col}`;"
|
|
|
return None
|
|
|
|
|
|
def _ddl_alter_type_sql(tabela, col, new_type):
|
|
|
d = _dialeto()
|
|
|
if d == "sqlite":
|
|
|
|
|
|
return None
|
|
|
elif d in ("postgresql", "postgres"):
|
|
|
return f'ALTER TABLE "{tabela}" ALTER COLUMN "{col}" TYPE {new_type};'
|
|
|
elif d in ("mysql", "mariadb"):
|
|
|
return f"ALTER TABLE `{tabela}` MODIFY COLUMN `{col}` {new_type};"
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _sqlite_reconstruir_tabela(tabela, novas_colunas):
|
|
|
"""
|
|
|
Reconstrói tabela SQLite com "novas_colunas" (lista de dicts):
|
|
|
[{"name":..., "type":..., "notnull":bool, "default":..., "pk":bool}, ...]
|
|
|
- Cria tabela __tmp_<tabela> com o novo schema
|
|
|
- Copia dados das colunas compatíveis (mesmos nomes)
|
|
|
- Drop da tabela original e rename da temporária
|
|
|
"""
|
|
|
cols_def = []
|
|
|
copy_cols = []
|
|
|
pk_cols = [c["name"] for c in novas_colunas if c.get("pk")]
|
|
|
for c in novas_colunas:
|
|
|
nn = "NOT NULL" if c.get("notnull") else ""
|
|
|
default = c.get("default")
|
|
|
def_clause = f" DEFAULT {default}" if (default is not None and str(default).strip() != "") else ""
|
|
|
cols_def.append(f'{c["name"]} {c["type"]} {nn}{def_clause}'.strip())
|
|
|
copy_cols.append(c["name"])
|
|
|
pk_clause = f", PRIMARY KEY ({', '.join(pk_cols)})" if pk_cols else ""
|
|
|
|
|
|
create_sql = f"CREATE TABLE __tmp_{tabela} ({', '.join(cols_def)}{pk_clause});"
|
|
|
copy_sql = f"INSERT INTO __tmp_{tabela} ({', '.join(copy_cols)}) SELECT {', '.join(copy_cols)} FROM {tabela};"
|
|
|
drop_sql = f"DROP TABLE {tabela};"
|
|
|
rename_sql= f"ALTER TABLE __tmp_{tabela} RENAME TO {tabela};"
|
|
|
|
|
|
with engine.begin() as conn:
|
|
|
conn.execute(text(create_sql))
|
|
|
conn.execute(text(copy_sql))
|
|
|
conn.execute(text(drop_sql))
|
|
|
conn.execute(text(rename_sql))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
st.title("🛠️ Administração de Banco (Schema)")
|
|
|
|
|
|
|
|
|
if not verificar_permissao("db_admin") and st.session_state.get("perfil") != "admin":
|
|
|
st.error("⛔ Acesso não autorizado.")
|
|
|
return
|
|
|
|
|
|
|
|
|
dial = _dialeto()
|
|
|
st.caption(f"Dialeto: **{dial}**")
|
|
|
ver = _sqlite_version()
|
|
|
if ver:
|
|
|
st.caption(f"SQLite version: **{ver}**")
|
|
|
|
|
|
|
|
|
if dial == "sqlite":
|
|
|
if st.button("💾 Backup rápido (SQLite)"):
|
|
|
_sqlite_backup()
|
|
|
|
|
|
|
|
|
tabelas = _listar_tabelas()
|
|
|
if not tabelas:
|
|
|
st.warning("Nenhuma tabela encontrada.")
|
|
|
return
|
|
|
|
|
|
tabela = st.selectbox("Tabela alvo:", tabelas, index=0)
|
|
|
colunas = _listar_colunas(tabela)
|
|
|
|
|
|
st.divider()
|
|
|
st.subheader("📋 Colunas atuais")
|
|
|
st.write(pd.DataFrame(colunas)) if 'pd' in globals() else st.write(colunas)
|
|
|
|
|
|
st.divider()
|
|
|
tabs = st.tabs(["➕ Adicionar coluna", "✏️ Renomear coluna", "🗑️ Excluir coluna", "♻️ Alterar tipo"])
|
|
|
|
|
|
|
|
|
with tabs[0]:
|
|
|
st.markdown("**Adicionar uma nova coluna à tabela selecionada**")
|
|
|
novo_nome = st.text_input("Nome da nova coluna")
|
|
|
novo_tipo = st.text_input("Tipo (ex.: TEXT, INTEGER, VARCHAR(255))")
|
|
|
novo_notnull = st.checkbox("NOT NULL", value=False)
|
|
|
novo_default = st.text_input("DEFAULT (opcional)")
|
|
|
|
|
|
confirmar_add = st.checkbox("Confirmo a adição desta coluna (DDL).")
|
|
|
if st.button("Executar ADD COLUMN", type="primary") and confirmar_add:
|
|
|
sql = _ddl_add_column_sql(tabela, novo_nome, novo_tipo, notnull=novo_notnull, default=novo_default)
|
|
|
if not sql:
|
|
|
st.error("Dialeto não suportado para ADD COLUMN.")
|
|
|
else:
|
|
|
try:
|
|
|
with engine.begin() as conn:
|
|
|
conn.execute(text(sql))
|
|
|
registrar_log(st.session_state.get("usuario"), f"ADD COLUMN {novo_nome} {novo_tipo} em {tabela}", "schema", None)
|
|
|
st.success("✅ Coluna adicionada com sucesso.")
|
|
|
st.rerun()
|
|
|
except Exception as e:
|
|
|
st.error(f"Erro ao adicionar coluna: {e}")
|
|
|
|
|
|
|
|
|
with tabs[1]:
|
|
|
st.markdown("**Renomear uma coluna existente**")
|
|
|
col_nomes = [c["name"] for c in colunas]
|
|
|
antigo = st.selectbox("Coluna atual:", col_nomes) if col_nomes else ""
|
|
|
novo = st.text_input("Novo nome da coluna")
|
|
|
|
|
|
confirmar_ren = st.checkbox("Confirmo a renomeação desta coluna (DDL).")
|
|
|
if st.button("Executar RENAME COLUMN") and confirmar_ren:
|
|
|
d = _dialeto()
|
|
|
if d == "sqlite":
|
|
|
|
|
|
ver = _sqlite_version() or "0.0.0"
|
|
|
suportado = tuple(map(int, ver.split("."))) >= (3, 25, 0)
|
|
|
if suportado:
|
|
|
sql = _ddl_rename_column_sql(tabela, antigo, novo)
|
|
|
try:
|
|
|
with engine.begin() as conn:
|
|
|
conn.execute(text(sql))
|
|
|
registrar_log(st.session_state.get("usuario"), f"RENAME COLUMN {antigo}→{novo} em {tabela}", "schema", None)
|
|
|
st.success("✅ Coluna renomeada com sucesso.")
|
|
|
st.rerun()
|
|
|
except Exception as e:
|
|
|
st.error(f"Erro ao renomear: {e}")
|
|
|
else:
|
|
|
st.warning("SQLite < 3.25 não suporta RENAME COLUMN. Oferecendo reconstrução assistida.")
|
|
|
|
|
|
novas = []
|
|
|
for c in colunas:
|
|
|
nm = novo if c["name"] == antigo else c["name"]
|
|
|
novas.append({"name": nm, "type": c["type"], "notnull": c["notnull"], "default": c["default"], "pk": c["pk"]})
|
|
|
try:
|
|
|
_sqlite_reconstruir_tabela(tabela, novas)
|
|
|
registrar_log(st.session_state.get("usuario"), f"RENAME (rebuild) {antigo}→{novo} em {tabela}", "schema", None)
|
|
|
st.success("✅ Reconstrução concluída com sucesso.")
|
|
|
st.rerun()
|
|
|
except Exception as e:
|
|
|
st.error(f"Erro na reconstrução: {e}")
|
|
|
elif d in ("postgresql", "postgres"):
|
|
|
sql = _ddl_rename_column_sql(tabela, antigo, novo)
|
|
|
if not sql:
|
|
|
st.error("Renomeação não suportada.")
|
|
|
else:
|
|
|
try:
|
|
|
with engine.begin() as conn:
|
|
|
conn.execute(text(sql))
|
|
|
registrar_log(st.session_state.get("usuario"), f"RENAME COLUMN {antigo}→{novo} em {tabela}", "schema", None)
|
|
|
st.success("✅ Coluna renomeada com sucesso.")
|
|
|
st.rerun()
|
|
|
except Exception as e:
|
|
|
st.error(f"Erro ao renomear: {e}")
|
|
|
elif d in ("mysql", "mariadb"):
|
|
|
st.info("MySQL/MariaDB exigem 'CHANGE COLUMN' informando o novo tipo; use a aba 'Alterar tipo' para renomear junto com tipo.")
|
|
|
|
|
|
|
|
|
with tabs[2]:
|
|
|
st.markdown("**Excluir uma coluna existente**")
|
|
|
col_nomes = [c["name"] for c in colunas]
|
|
|
col_drop = st.selectbox("Coluna a excluir:", col_nomes) if col_nomes else ""
|
|
|
|
|
|
confirmar_drop = st.checkbox("Confirmo a exclusão desta coluna (DDL) e entendo que é irreversível.")
|
|
|
if st.button("Executar DROP COLUMN", type="secondary") and confirmar_drop:
|
|
|
d = _dialeto()
|
|
|
if d == "sqlite":
|
|
|
ver = _sqlite_version() or "0.0.0"
|
|
|
suportado = tuple(map(int, ver.split("."))) >= (3, 35, 0)
|
|
|
if suportado:
|
|
|
sql = _ddl_drop_column_sql(tabela, col_drop)
|
|
|
try:
|
|
|
with engine.begin() as conn:
|
|
|
conn.execute(text(sql))
|
|
|
registrar_log(st.session_state.get("usuario"), f"DROP COLUMN {col_drop} em {tabela}", "schema", None)
|
|
|
st.success("✅ Coluna excluída com sucesso.")
|
|
|
st.rerun()
|
|
|
except Exception as e:
|
|
|
st.error(f"Erro ao excluir: {e}")
|
|
|
else:
|
|
|
st.warning("SQLite < 3.35 não suporta DROP COLUMN. Oferecendo reconstrução assistida.")
|
|
|
novas = [c for c in colunas if c["name"] != col_drop]
|
|
|
try:
|
|
|
_sqlite_reconstruir_tabela(tabela, novas)
|
|
|
registrar_log(st.session_state.get("usuario"), f"DROP (rebuild) {col_drop} em {tabela}", "schema", None)
|
|
|
st.success("✅ Reconstrução concluída e coluna removida.")
|
|
|
st.rerun()
|
|
|
except Exception as e:
|
|
|
st.error(f"Erro na reconstrução: {e}")
|
|
|
elif d in ("postgresql", "postgres", "mysql", "mariadb"):
|
|
|
sql = _ddl_drop_column_sql(tabela, col_drop)
|
|
|
try:
|
|
|
with engine.begin() as conn:
|
|
|
conn.execute(text(sql))
|
|
|
registrar_log(st.session_state.get("usuario"), f"DROP COLUMN {col_drop} em {tabela}", "schema", None)
|
|
|
st.success("✅ Coluna excluída com sucesso.")
|
|
|
st.rerun()
|
|
|
except Exception as e:
|
|
|
st.error(f"Erro ao excluir: {e}")
|
|
|
|
|
|
|
|
|
with tabs[3]:
|
|
|
st.markdown("**Alterar tipo declarado de uma coluna**")
|
|
|
col_nomes = [c["name"] for c in colunas]
|
|
|
alvo = st.selectbox("Coluna alvo:", col_nomes) if col_nomes else ""
|
|
|
novo_tipo = st.text_input("Novo tipo (ex.: TEXT, INTEGER, VARCHAR(255))")
|
|
|
|
|
|
confirmar_type = st.checkbox("Confirmo a alteração de tipo (DDL).")
|
|
|
if st.button("Executar ALTER TYPE") and confirmar_type:
|
|
|
d = _dialeto()
|
|
|
if d == "sqlite":
|
|
|
st.warning("SQLite não suporta ALTER TYPE direto; oferecemos reconstrução assistida.")
|
|
|
novas = []
|
|
|
for c in colunas:
|
|
|
typ = novo_tipo if c["name"] == alvo else c["type"]
|
|
|
novas.append({"name": c["name"], "type": typ, "notnull": c["notnull"], "default": c["default"], "pk": c["pk"]})
|
|
|
try:
|
|
|
_sqlite_reconstruir_tabela(tabela, novas)
|
|
|
registrar_log(st.session_state.get("usuario"), f"ALTER TYPE (rebuild) {alvo}→{novo_tipo} em {tabela}", "schema", None)
|
|
|
st.success("✅ Tipo alterado com sucesso via reconstrução.")
|
|
|
st.rerun()
|
|
|
except Exception as e:
|
|
|
st.error(f"Erro na reconstrução: {e}")
|
|
|
elif d in ("postgresql", "postgres", "mysql", "mariadb"):
|
|
|
sql = _ddl_alter_type_sql(tabela, alvo, novo_tipo)
|
|
|
if not sql:
|
|
|
st.error("Dialeto não suportado para ALTER TYPE.")
|
|
|
else:
|
|
|
try:
|
|
|
with engine.begin() as conn:
|
|
|
conn.execute(text(sql))
|
|
|
registrar_log(st.session_state.get("usuario"), f"ALTER TYPE {alvo}→{novo_tipo} em {tabela}", "schema", None)
|
|
|
st.success("✅ Tipo alterado com sucesso.")
|
|
|
st.rerun()
|
|
|
except Exception as e:
|
|
|
st.error(f"Erro ao alterar tipo: {e}")
|
|
|
|