Spaces:
Running
Running
| # db_admin.py | |
| import streamlit as st | |
| import os | |
| import shutil | |
| from sqlalchemy import text | |
| from banco import engine, SessionLocal | |
| from utils_permissoes import verificar_permissao | |
| from utils_auditoria import registrar_log | |
| # ===================================================== | |
| # MÓDULO GERAL DE ADMINISTRAÇÃO DE BANCO (SCHEMA) | |
| # ===================================================== | |
| # Objetivo: | |
| # - Permitir adicionar, renomear, excluir e alterar tipo de colunas via UI | |
| # - Funciona com SQLite, PostgreSQL e MySQL (com diferenças por dialeto) | |
| # - Em SQLite, oferece reconstrução assistida quando DDL não é suportado | |
| # | |
| # Segurança e boas práticas: | |
| # - Recomendado fazer backup antes de operações (botão disponível para SQLite) | |
| # - Operações DDL são críticas: exigir confirmação explícita | |
| # - Acesso restrito ao perfil "admin" | |
| # | |
| # Logs: | |
| # - registrar_log(...) é chamado em todas as operações | |
| # ------------------------- | |
| # Utilitário: Dialeto e versão | |
| # ------------------------- | |
| def _dialeto(): | |
| try: | |
| return engine.url.get_backend_name() | |
| except Exception: | |
| return "desconhecido" | |
| def _sqlite_version(): | |
| if _dialeto() != "sqlite": | |
| return None | |
| try: | |
| with engine.begin() as conn: | |
| rv = conn.execute(text("select sqlite_version()")).scalar() | |
| return rv | |
| except Exception: | |
| return None | |
| # ------------------------- | |
| # Utilitário: Listar tabelas e colunas | |
| # ------------------------- | |
| def _listar_tabelas(): | |
| d = _dialeto() | |
| with engine.begin() as conn: | |
| if d == "sqlite": | |
| rows = conn.execute(text("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")).fetchall() | |
| return [r[0] for r in rows] | |
| else: | |
| q = text(""" | |
| SELECT table_name | |
| FROM information_schema.tables | |
| WHERE table_schema NOT IN ('pg_catalog','information_schema') | |
| ORDER BY table_name | |
| """) | |
| rows = conn.execute(q).fetchall() | |
| return [r[0] for r in rows] | |
| def _listar_colunas(tabela: str): | |
| d = _dialeto() | |
| with engine.begin() as conn: | |
| if d == "sqlite": | |
| rows = conn.execute(text(f"PRAGMA table_info({tabela})")).fetchall() | |
| # PRAGMA: (cid, name, type, notnull, dflt_value, pk) | |
| return [{"name": r[1], "type": r[2], "notnull": bool(r[3]), "default": r[4], "pk": bool(r[5])} for r in rows] | |
| else: | |
| q = text(""" | |
| SELECT column_name, data_type, is_nullable, column_default | |
| FROM information_schema.columns | |
| WHERE table_name = :tbl | |
| ORDER BY ordinal_position | |
| """) | |
| rows = conn.execute(q, {"tbl": tabela}).fetchall() | |
| return [{"name": r[0], "type": r[1], "notnull": (str(r[2]).upper() == "NO"), "default": r[3], "pk": False} for r in rows] | |
| # ------------------------- | |
| # Backup rápido (SQLite) | |
| # ------------------------- | |
| def _sqlite_backup(): | |
| if _dialeto() != "sqlite": | |
| st.info("Backup automático só disponível para SQLite via cópia de arquivo.") | |
| return | |
| db_path = engine.url.database | |
| if not db_path or not os.path.exists(db_path): | |
| st.error("Arquivo de banco SQLite não encontrado.") | |
| return | |
| dest = db_path + ".bak" | |
| shutil.copyfile(db_path, dest) | |
| st.success(f"Backup criado: {dest}") | |
| # ------------------------- | |
| # DDL: Gerar comandos por dialeto | |
| # ------------------------- | |
| def _ddl_add_column_sql(tabela, col_nome, col_tipo, notnull=False, default=None): | |
| d = _dialeto() | |
| nn = "NOT NULL" if notnull else "NULL" | |
| def_clause = f" DEFAULT {default}" if (default is not None and str(default).strip() != "") else "" | |
| if d == "sqlite": | |
| # SQLite aceita tipo textual; notnull e default são respeitados no schema | |
| return f"ALTER TABLE {tabela} ADD COLUMN {col_nome} {col_tipo} {nn}{def_clause};" | |
| elif d in ("postgresql", "postgres"): | |
| base = f'ALTER TABLE "{tabela}" ADD COLUMN "{col_nome}" {col_tipo}' | |
| if default is not None and str(default).strip() != "": | |
| base += f" DEFAULT {default}" | |
| if notnull: | |
| base += " NOT NULL" | |
| return base + ";" | |
| elif d in ("mysql", "mariadb"): | |
| base = f"ALTER TABLE `{tabela}` ADD COLUMN `{col_nome}` {col_tipo}" | |
| if default is not None and str(default).strip() != "": | |
| base += f" DEFAULT {default}" | |
| base += " NOT NULL" if notnull else " NULL" | |
| return base + ";" | |
| return None | |
| def _ddl_rename_column_sql(tabela, old, new): | |
| d = _dialeto() | |
| if d == "sqlite": | |
| return f"ALTER TABLE {tabela} RENAME COLUMN {old} TO {new};" | |
| elif d in ("postgresql", "postgres"): | |
| return f'ALTER TABLE "{tabela}" RENAME COLUMN "{old}" TO "{new}";' | |
| elif d in ("mysql", "mariadb"): | |
| # MySQL requer tipo na renomeação; esta função não cobre tipo -> usar CHANGE COLUMN via UI de "Alterar tipo/renomear" | |
| return None | |
| return None | |
| def _ddl_drop_column_sql(tabela, col): | |
| d = _dialeto() | |
| if d == "sqlite": | |
| return f"ALTER TABLE {tabela} DROP COLUMN {col};" | |
| elif d in ("postgresql", "postgres"): | |
| return f'ALTER TABLE "{tabela}" DROP COLUMN "{col}";' | |
| elif d in ("mysql", "mariadb"): | |
| return f"ALTER TABLE `{tabela}` DROP COLUMN `{col}`;" | |
| return None | |
| def _ddl_alter_type_sql(tabela, col, new_type): | |
| d = _dialeto() | |
| if d == "sqlite": | |
| # SQLite não altera type declarado via ALTER TYPE. Necessário reconstruir tabela. | |
| return None | |
| elif d in ("postgresql", "postgres"): | |
| return f'ALTER TABLE "{tabela}" ALTER COLUMN "{col}" TYPE {new_type};' | |
| elif d in ("mysql", "mariadb"): | |
| return f"ALTER TABLE `{tabela}` MODIFY COLUMN `{col}` {new_type};" | |
| return None | |
| # ------------------------- | |
| # Reconstrução assistida (SQLite) | |
| # ------------------------- | |
| def _sqlite_reconstruir_tabela(tabela, novas_colunas): | |
| """ | |
| Reconstrói tabela SQLite com "novas_colunas" (lista de dicts): | |
| [{"name":..., "type":..., "notnull":bool, "default":..., "pk":bool}, ...] | |
| - Cria tabela __tmp_<tabela> com o novo schema | |
| - Copia dados das colunas compatíveis (mesmos nomes) | |
| - Drop da tabela original e rename da temporária | |
| """ | |
| cols_def = [] | |
| copy_cols = [] | |
| pk_cols = [c["name"] for c in novas_colunas if c.get("pk")] | |
| for c in novas_colunas: | |
| nn = "NOT NULL" if c.get("notnull") else "" | |
| default = c.get("default") | |
| def_clause = f" DEFAULT {default}" if (default is not None and str(default).strip() != "") else "" | |
| cols_def.append(f'{c["name"]} {c["type"]} {nn}{def_clause}'.strip()) | |
| copy_cols.append(c["name"]) | |
| pk_clause = f", PRIMARY KEY ({', '.join(pk_cols)})" if pk_cols else "" | |
| create_sql = f"CREATE TABLE __tmp_{tabela} ({', '.join(cols_def)}{pk_clause});" | |
| copy_sql = f"INSERT INTO __tmp_{tabela} ({', '.join(copy_cols)}) SELECT {', '.join(copy_cols)} FROM {tabela};" | |
| drop_sql = f"DROP TABLE {tabela};" | |
| rename_sql= f"ALTER TABLE __tmp_{tabela} RENAME TO {tabela};" | |
| with engine.begin() as conn: | |
| conn.execute(text(create_sql)) | |
| conn.execute(text(copy_sql)) | |
| conn.execute(text(drop_sql)) | |
| conn.execute(text(rename_sql)) | |
| # ------------------------- | |
| # UI principal (admin) | |
| # ------------------------- | |
| def main(): | |
| st.title("🛠️ Administração de Banco (Schema)") | |
| # 🔐 Proteção por perfil | |
| if not verificar_permissao("db_admin") and st.session_state.get("perfil") != "admin": | |
| st.error("⛔ Acesso não autorizado.") | |
| return | |
| # Info do banco | |
| dial = _dialeto() | |
| st.caption(f"Dialeto: **{dial}**") | |
| ver = _sqlite_version() | |
| if ver: | |
| st.caption(f"SQLite version: **{ver}**") | |
| # Backup (SQLite) | |
| if dial == "sqlite": | |
| if st.button("💾 Backup rápido (SQLite)"): | |
| _sqlite_backup() | |
| # Tabelas disponíveis | |
| tabelas = _listar_tabelas() | |
| if not tabelas: | |
| st.warning("Nenhuma tabela encontrada.") | |
| return | |
| tabela = st.selectbox("Tabela alvo:", tabelas, index=0) | |
| colunas = _listar_colunas(tabela) | |
| st.divider() | |
| st.subheader("📋 Colunas atuais") | |
| st.write(pd.DataFrame(colunas)) if 'pd' in globals() else st.write(colunas) # mostra estrutura atual | |
| st.divider() | |
| tabs = st.tabs(["➕ Adicionar coluna", "✏️ Renomear coluna", "🗑️ Excluir coluna", "♻️ Alterar tipo"]) | |
| # ----------------- Adicionar coluna ----------------- | |
| with tabs[0]: | |
| st.markdown("**Adicionar uma nova coluna à tabela selecionada**") | |
| novo_nome = st.text_input("Nome da nova coluna") | |
| novo_tipo = st.text_input("Tipo (ex.: TEXT, INTEGER, VARCHAR(255))") | |
| novo_notnull = st.checkbox("NOT NULL", value=False) | |
| novo_default = st.text_input("DEFAULT (opcional)") | |
| confirmar_add = st.checkbox("Confirmo a adição desta coluna (DDL).") | |
| if st.button("Executar ADD COLUMN", type="primary") and confirmar_add: | |
| sql = _ddl_add_column_sql(tabela, novo_nome, novo_tipo, notnull=novo_notnull, default=novo_default) | |
| if not sql: | |
| st.error("Dialeto não suportado para ADD COLUMN.") | |
| else: | |
| try: | |
| with engine.begin() as conn: | |
| conn.execute(text(sql)) | |
| registrar_log(st.session_state.get("usuario"), f"ADD COLUMN {novo_nome} {novo_tipo} em {tabela}", "schema", None) | |
| st.success("✅ Coluna adicionada com sucesso.") | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Erro ao adicionar coluna: {e}") | |
| # ----------------- Renomear coluna ----------------- | |
| with tabs[1]: | |
| st.markdown("**Renomear uma coluna existente**") | |
| col_nomes = [c["name"] for c in colunas] | |
| antigo = st.selectbox("Coluna atual:", col_nomes) if col_nomes else "" | |
| novo = st.text_input("Novo nome da coluna") | |
| confirmar_ren = st.checkbox("Confirmo a renomeação desta coluna (DDL).") | |
| if st.button("Executar RENAME COLUMN") and confirmar_ren: | |
| d = _dialeto() | |
| if d == "sqlite": | |
| # Verifica suporte na versão | |
| ver = _sqlite_version() or "0.0.0" | |
| suportado = tuple(map(int, ver.split("."))) >= (3, 25, 0) | |
| if suportado: | |
| sql = _ddl_rename_column_sql(tabela, antigo, novo) | |
| try: | |
| with engine.begin() as conn: | |
| conn.execute(text(sql)) | |
| registrar_log(st.session_state.get("usuario"), f"RENAME COLUMN {antigo}→{novo} em {tabela}", "schema", None) | |
| st.success("✅ Coluna renomeada com sucesso.") | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Erro ao renomear: {e}") | |
| else: | |
| st.warning("SQLite < 3.25 não suporta RENAME COLUMN. Oferecendo reconstrução assistida.") | |
| # Reconstrução: atualiza metadados e recria tabela | |
| novas = [] | |
| for c in colunas: | |
| nm = novo if c["name"] == antigo else c["name"] | |
| novas.append({"name": nm, "type": c["type"], "notnull": c["notnull"], "default": c["default"], "pk": c["pk"]}) | |
| try: | |
| _sqlite_reconstruir_tabela(tabela, novas) | |
| registrar_log(st.session_state.get("usuario"), f"RENAME (rebuild) {antigo}→{novo} em {tabela}", "schema", None) | |
| st.success("✅ Reconstrução concluída com sucesso.") | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Erro na reconstrução: {e}") | |
| elif d in ("postgresql", "postgres"): | |
| sql = _ddl_rename_column_sql(tabela, antigo, novo) | |
| if not sql: | |
| st.error("Renomeação não suportada.") | |
| else: | |
| try: | |
| with engine.begin() as conn: | |
| conn.execute(text(sql)) | |
| registrar_log(st.session_state.get("usuario"), f"RENAME COLUMN {antigo}→{novo} em {tabela}", "schema", None) | |
| st.success("✅ Coluna renomeada com sucesso.") | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Erro ao renomear: {e}") | |
| elif d in ("mysql", "mariadb"): | |
| st.info("MySQL/MariaDB exigem 'CHANGE COLUMN' informando o novo tipo; use a aba 'Alterar tipo' para renomear junto com tipo.") | |
| # ----------------- Excluir coluna ----------------- | |
| with tabs[2]: | |
| st.markdown("**Excluir uma coluna existente**") | |
| col_nomes = [c["name"] for c in colunas] | |
| col_drop = st.selectbox("Coluna a excluir:", col_nomes) if col_nomes else "" | |
| confirmar_drop = st.checkbox("Confirmo a exclusão desta coluna (DDL) e entendo que é irreversível.") | |
| if st.button("Executar DROP COLUMN", type="secondary") and confirmar_drop: | |
| d = _dialeto() | |
| if d == "sqlite": | |
| ver = _sqlite_version() or "0.0.0" | |
| suportado = tuple(map(int, ver.split("."))) >= (3, 35, 0) | |
| if suportado: | |
| sql = _ddl_drop_column_sql(tabela, col_drop) | |
| try: | |
| with engine.begin() as conn: | |
| conn.execute(text(sql)) | |
| registrar_log(st.session_state.get("usuario"), f"DROP COLUMN {col_drop} em {tabela}", "schema", None) | |
| st.success("✅ Coluna excluída com sucesso.") | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Erro ao excluir: {e}") | |
| else: | |
| st.warning("SQLite < 3.35 não suporta DROP COLUMN. Oferecendo reconstrução assistida.") | |
| novas = [c for c in colunas if c["name"] != col_drop] | |
| try: | |
| _sqlite_reconstruir_tabela(tabela, novas) | |
| registrar_log(st.session_state.get("usuario"), f"DROP (rebuild) {col_drop} em {tabela}", "schema", None) | |
| st.success("✅ Reconstrução concluída e coluna removida.") | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Erro na reconstrução: {e}") | |
| elif d in ("postgresql", "postgres", "mysql", "mariadb"): | |
| sql = _ddl_drop_column_sql(tabela, col_drop) | |
| try: | |
| with engine.begin() as conn: | |
| conn.execute(text(sql)) | |
| registrar_log(st.session_state.get("usuario"), f"DROP COLUMN {col_drop} em {tabela}", "schema", None) | |
| st.success("✅ Coluna excluída com sucesso.") | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Erro ao excluir: {e}") | |
| # ----------------- Alterar tipo ----------------- | |
| with tabs[3]: | |
| st.markdown("**Alterar tipo declarado de uma coluna**") | |
| col_nomes = [c["name"] for c in colunas] | |
| alvo = st.selectbox("Coluna alvo:", col_nomes) if col_nomes else "" | |
| novo_tipo = st.text_input("Novo tipo (ex.: TEXT, INTEGER, VARCHAR(255))") | |
| confirmar_type = st.checkbox("Confirmo a alteração de tipo (DDL).") | |
| if st.button("Executar ALTER TYPE") and confirmar_type: | |
| d = _dialeto() | |
| if d == "sqlite": | |
| st.warning("SQLite não suporta ALTER TYPE direto; oferecemos reconstrução assistida.") | |
| novas = [] | |
| for c in colunas: | |
| typ = novo_tipo if c["name"] == alvo else c["type"] | |
| novas.append({"name": c["name"], "type": typ, "notnull": c["notnull"], "default": c["default"], "pk": c["pk"]}) | |
| try: | |
| _sqlite_reconstruir_tabela(tabela, novas) | |
| registrar_log(st.session_state.get("usuario"), f"ALTER TYPE (rebuild) {alvo}→{novo_tipo} em {tabela}", "schema", None) | |
| st.success("✅ Tipo alterado com sucesso via reconstrução.") | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Erro na reconstrução: {e}") | |
| elif d in ("postgresql", "postgres", "mysql", "mariadb"): | |
| sql = _ddl_alter_type_sql(tabela, alvo, novo_tipo) | |
| if not sql: | |
| st.error("Dialeto não suportado para ALTER TYPE.") | |
| else: | |
| try: | |
| with engine.begin() as conn: | |
| conn.execute(text(sql)) | |
| registrar_log(st.session_state.get("usuario"), f"ALTER TYPE {alvo}→{novo_tipo} em {tabela}", "schema", None) | |
| st.success("✅ Tipo alterado com sucesso.") | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Erro ao alterar tipo: {e}") | |