# db_admin.py import streamlit as st import os import shutil from sqlalchemy import text from banco import engine, SessionLocal from utils_permissoes import verificar_permissao from utils_auditoria import registrar_log # ===================================================== # MÓDULO GERAL DE ADMINISTRAÇÃO DE BANCO (SCHEMA) # ===================================================== # Objetivo: # - Permitir adicionar, renomear, excluir e alterar tipo de colunas via UI # - Funciona com SQLite, PostgreSQL e MySQL (com diferenças por dialeto) # - Em SQLite, oferece reconstrução assistida quando DDL não é suportado # # Segurança e boas práticas: # - Recomendado fazer backup antes de operações (botão disponível para SQLite) # - Operações DDL são críticas: exigir confirmação explícita # - Acesso restrito ao perfil "admin" # # Logs: # - registrar_log(...) é chamado em todas as operações # ------------------------- # Utilitário: Dialeto e versão # ------------------------- def _dialeto(): try: return engine.url.get_backend_name() except Exception: return "desconhecido" def _sqlite_version(): if _dialeto() != "sqlite": return None try: with engine.begin() as conn: rv = conn.execute(text("select sqlite_version()")).scalar() return rv except Exception: return None # ------------------------- # Utilitário: Listar tabelas e colunas # ------------------------- def _listar_tabelas(): d = _dialeto() with engine.begin() as conn: if d == "sqlite": rows = conn.execute(text("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")).fetchall() return [r[0] for r in rows] else: q = text(""" SELECT table_name FROM information_schema.tables WHERE table_schema NOT IN ('pg_catalog','information_schema') ORDER BY table_name """) rows = conn.execute(q).fetchall() return [r[0] for r in rows] def _listar_colunas(tabela: str): d = _dialeto() with engine.begin() as conn: if d == "sqlite": rows = conn.execute(text(f"PRAGMA table_info({tabela})")).fetchall() # PRAGMA: (cid, name, type, notnull, dflt_value, pk) return [{"name": r[1], "type": r[2], "notnull": bool(r[3]), "default": r[4], "pk": bool(r[5])} for r in rows] else: q = text(""" SELECT column_name, data_type, is_nullable, column_default FROM information_schema.columns WHERE table_name = :tbl ORDER BY ordinal_position """) rows = conn.execute(q, {"tbl": tabela}).fetchall() return [{"name": r[0], "type": r[1], "notnull": (str(r[2]).upper() == "NO"), "default": r[3], "pk": False} for r in rows] # ------------------------- # Backup rápido (SQLite) # ------------------------- def _sqlite_backup(): if _dialeto() != "sqlite": st.info("Backup automático só disponível para SQLite via cópia de arquivo.") return db_path = engine.url.database if not db_path or not os.path.exists(db_path): st.error("Arquivo de banco SQLite não encontrado.") return dest = db_path + ".bak" shutil.copyfile(db_path, dest) st.success(f"Backup criado: {dest}") # ------------------------- # DDL: Gerar comandos por dialeto # ------------------------- def _ddl_add_column_sql(tabela, col_nome, col_tipo, notnull=False, default=None): d = _dialeto() nn = "NOT NULL" if notnull else "NULL" def_clause = f" DEFAULT {default}" if (default is not None and str(default).strip() != "") else "" if d == "sqlite": # SQLite aceita tipo textual; notnull e default são respeitados no schema return f"ALTER TABLE {tabela} ADD COLUMN {col_nome} {col_tipo} {nn}{def_clause};" elif d in ("postgresql", "postgres"): base = f'ALTER TABLE "{tabela}" ADD COLUMN "{col_nome}" {col_tipo}' if default is not None and str(default).strip() != "": base += f" DEFAULT {default}" if notnull: base += " NOT NULL" return base + ";" elif d in ("mysql", "mariadb"): base = f"ALTER TABLE `{tabela}` ADD COLUMN `{col_nome}` {col_tipo}" if default is not None and str(default).strip() != "": base += f" DEFAULT {default}" base += " NOT NULL" if notnull else " NULL" return base + ";" return None def _ddl_rename_column_sql(tabela, old, new): d = _dialeto() if d == "sqlite": return f"ALTER TABLE {tabela} RENAME COLUMN {old} TO {new};" elif d in ("postgresql", "postgres"): return f'ALTER TABLE "{tabela}" RENAME COLUMN "{old}" TO "{new}";' elif d in ("mysql", "mariadb"): # MySQL requer tipo na renomeação; esta função não cobre tipo -> usar CHANGE COLUMN via UI de "Alterar tipo/renomear" return None return None def _ddl_drop_column_sql(tabela, col): d = _dialeto() if d == "sqlite": return f"ALTER TABLE {tabela} DROP COLUMN {col};" elif d in ("postgresql", "postgres"): return f'ALTER TABLE "{tabela}" DROP COLUMN "{col}";' elif d in ("mysql", "mariadb"): return f"ALTER TABLE `{tabela}` DROP COLUMN `{col}`;" return None def _ddl_alter_type_sql(tabela, col, new_type): d = _dialeto() if d == "sqlite": # SQLite não altera type declarado via ALTER TYPE. Necessário reconstruir tabela. return None elif d in ("postgresql", "postgres"): return f'ALTER TABLE "{tabela}" ALTER COLUMN "{col}" TYPE {new_type};' elif d in ("mysql", "mariadb"): return f"ALTER TABLE `{tabela}` MODIFY COLUMN `{col}` {new_type};" return None # ------------------------- # Reconstrução assistida (SQLite) # ------------------------- def _sqlite_reconstruir_tabela(tabela, novas_colunas): """ Reconstrói tabela SQLite com "novas_colunas" (lista de dicts): [{"name":..., "type":..., "notnull":bool, "default":..., "pk":bool}, ...] - Cria tabela __tmp_ com o novo schema - Copia dados das colunas compatíveis (mesmos nomes) - Drop da tabela original e rename da temporária """ cols_def = [] copy_cols = [] pk_cols = [c["name"] for c in novas_colunas if c.get("pk")] for c in novas_colunas: nn = "NOT NULL" if c.get("notnull") else "" default = c.get("default") def_clause = f" DEFAULT {default}" if (default is not None and str(default).strip() != "") else "" cols_def.append(f'{c["name"]} {c["type"]} {nn}{def_clause}'.strip()) copy_cols.append(c["name"]) pk_clause = f", PRIMARY KEY ({', '.join(pk_cols)})" if pk_cols else "" create_sql = f"CREATE TABLE __tmp_{tabela} ({', '.join(cols_def)}{pk_clause});" copy_sql = f"INSERT INTO __tmp_{tabela} ({', '.join(copy_cols)}) SELECT {', '.join(copy_cols)} FROM {tabela};" drop_sql = f"DROP TABLE {tabela};" rename_sql= f"ALTER TABLE __tmp_{tabela} RENAME TO {tabela};" with engine.begin() as conn: conn.execute(text(create_sql)) conn.execute(text(copy_sql)) conn.execute(text(drop_sql)) conn.execute(text(rename_sql)) # ------------------------- # UI principal (admin) # ------------------------- def main(): st.title("🛠️ Administração de Banco (Schema)") # 🔐 Proteção por perfil if not verificar_permissao("db_admin") and st.session_state.get("perfil") != "admin": st.error("⛔ Acesso não autorizado.") return # Info do banco dial = _dialeto() st.caption(f"Dialeto: **{dial}**") ver = _sqlite_version() if ver: st.caption(f"SQLite version: **{ver}**") # Backup (SQLite) if dial == "sqlite": if st.button("💾 Backup rápido (SQLite)"): _sqlite_backup() # Tabelas disponíveis tabelas = _listar_tabelas() if not tabelas: st.warning("Nenhuma tabela encontrada.") return tabela = st.selectbox("Tabela alvo:", tabelas, index=0) colunas = _listar_colunas(tabela) st.divider() st.subheader("📋 Colunas atuais") st.write(pd.DataFrame(colunas)) if 'pd' in globals() else st.write(colunas) # mostra estrutura atual st.divider() tabs = st.tabs(["➕ Adicionar coluna", "✏️ Renomear coluna", "🗑️ Excluir coluna", "♻️ Alterar tipo"]) # ----------------- Adicionar coluna ----------------- with tabs[0]: st.markdown("**Adicionar uma nova coluna à tabela selecionada**") novo_nome = st.text_input("Nome da nova coluna") novo_tipo = st.text_input("Tipo (ex.: TEXT, INTEGER, VARCHAR(255))") novo_notnull = st.checkbox("NOT NULL", value=False) novo_default = st.text_input("DEFAULT (opcional)") confirmar_add = st.checkbox("Confirmo a adição desta coluna (DDL).") if st.button("Executar ADD COLUMN", type="primary") and confirmar_add: sql = _ddl_add_column_sql(tabela, novo_nome, novo_tipo, notnull=novo_notnull, default=novo_default) if not sql: st.error("Dialeto não suportado para ADD COLUMN.") else: try: with engine.begin() as conn: conn.execute(text(sql)) registrar_log(st.session_state.get("usuario"), f"ADD COLUMN {novo_nome} {novo_tipo} em {tabela}", "schema", None) st.success("✅ Coluna adicionada com sucesso.") st.rerun() except Exception as e: st.error(f"Erro ao adicionar coluna: {e}") # ----------------- Renomear coluna ----------------- with tabs[1]: st.markdown("**Renomear uma coluna existente**") col_nomes = [c["name"] for c in colunas] antigo = st.selectbox("Coluna atual:", col_nomes) if col_nomes else "" novo = st.text_input("Novo nome da coluna") confirmar_ren = st.checkbox("Confirmo a renomeação desta coluna (DDL).") if st.button("Executar RENAME COLUMN") and confirmar_ren: d = _dialeto() if d == "sqlite": # Verifica suporte na versão ver = _sqlite_version() or "0.0.0" suportado = tuple(map(int, ver.split("."))) >= (3, 25, 0) if suportado: sql = _ddl_rename_column_sql(tabela, antigo, novo) try: with engine.begin() as conn: conn.execute(text(sql)) registrar_log(st.session_state.get("usuario"), f"RENAME COLUMN {antigo}→{novo} em {tabela}", "schema", None) st.success("✅ Coluna renomeada com sucesso.") st.rerun() except Exception as e: st.error(f"Erro ao renomear: {e}") else: st.warning("SQLite < 3.25 não suporta RENAME COLUMN. Oferecendo reconstrução assistida.") # Reconstrução: atualiza metadados e recria tabela novas = [] for c in colunas: nm = novo if c["name"] == antigo else c["name"] novas.append({"name": nm, "type": c["type"], "notnull": c["notnull"], "default": c["default"], "pk": c["pk"]}) try: _sqlite_reconstruir_tabela(tabela, novas) registrar_log(st.session_state.get("usuario"), f"RENAME (rebuild) {antigo}→{novo} em {tabela}", "schema", None) st.success("✅ Reconstrução concluída com sucesso.") st.rerun() except Exception as e: st.error(f"Erro na reconstrução: {e}") elif d in ("postgresql", "postgres"): sql = _ddl_rename_column_sql(tabela, antigo, novo) if not sql: st.error("Renomeação não suportada.") else: try: with engine.begin() as conn: conn.execute(text(sql)) registrar_log(st.session_state.get("usuario"), f"RENAME COLUMN {antigo}→{novo} em {tabela}", "schema", None) st.success("✅ Coluna renomeada com sucesso.") st.rerun() except Exception as e: st.error(f"Erro ao renomear: {e}") elif d in ("mysql", "mariadb"): st.info("MySQL/MariaDB exigem 'CHANGE COLUMN' informando o novo tipo; use a aba 'Alterar tipo' para renomear junto com tipo.") # ----------------- Excluir coluna ----------------- with tabs[2]: st.markdown("**Excluir uma coluna existente**") col_nomes = [c["name"] for c in colunas] col_drop = st.selectbox("Coluna a excluir:", col_nomes) if col_nomes else "" confirmar_drop = st.checkbox("Confirmo a exclusão desta coluna (DDL) e entendo que é irreversível.") if st.button("Executar DROP COLUMN", type="secondary") and confirmar_drop: d = _dialeto() if d == "sqlite": ver = _sqlite_version() or "0.0.0" suportado = tuple(map(int, ver.split("."))) >= (3, 35, 0) if suportado: sql = _ddl_drop_column_sql(tabela, col_drop) try: with engine.begin() as conn: conn.execute(text(sql)) registrar_log(st.session_state.get("usuario"), f"DROP COLUMN {col_drop} em {tabela}", "schema", None) st.success("✅ Coluna excluída com sucesso.") st.rerun() except Exception as e: st.error(f"Erro ao excluir: {e}") else: st.warning("SQLite < 3.35 não suporta DROP COLUMN. Oferecendo reconstrução assistida.") novas = [c for c in colunas if c["name"] != col_drop] try: _sqlite_reconstruir_tabela(tabela, novas) registrar_log(st.session_state.get("usuario"), f"DROP (rebuild) {col_drop} em {tabela}", "schema", None) st.success("✅ Reconstrução concluída e coluna removida.") st.rerun() except Exception as e: st.error(f"Erro na reconstrução: {e}") elif d in ("postgresql", "postgres", "mysql", "mariadb"): sql = _ddl_drop_column_sql(tabela, col_drop) try: with engine.begin() as conn: conn.execute(text(sql)) registrar_log(st.session_state.get("usuario"), f"DROP COLUMN {col_drop} em {tabela}", "schema", None) st.success("✅ Coluna excluída com sucesso.") st.rerun() except Exception as e: st.error(f"Erro ao excluir: {e}") # ----------------- Alterar tipo ----------------- with tabs[3]: st.markdown("**Alterar tipo declarado de uma coluna**") col_nomes = [c["name"] for c in colunas] alvo = st.selectbox("Coluna alvo:", col_nomes) if col_nomes else "" novo_tipo = st.text_input("Novo tipo (ex.: TEXT, INTEGER, VARCHAR(255))") confirmar_type = st.checkbox("Confirmo a alteração de tipo (DDL).") if st.button("Executar ALTER TYPE") and confirmar_type: d = _dialeto() if d == "sqlite": st.warning("SQLite não suporta ALTER TYPE direto; oferecemos reconstrução assistida.") novas = [] for c in colunas: typ = novo_tipo if c["name"] == alvo else c["type"] novas.append({"name": c["name"], "type": typ, "notnull": c["notnull"], "default": c["default"], "pk": c["pk"]}) try: _sqlite_reconstruir_tabela(tabela, novas) registrar_log(st.session_state.get("usuario"), f"ALTER TYPE (rebuild) {alvo}→{novo_tipo} em {tabela}", "schema", None) st.success("✅ Tipo alterado com sucesso via reconstrução.") st.rerun() except Exception as e: st.error(f"Erro na reconstrução: {e}") elif d in ("postgresql", "postgres", "mysql", "mariadb"): sql = _ddl_alter_type_sql(tabela, alvo, novo_tipo) if not sql: st.error("Dialeto não suportado para ALTER TYPE.") else: try: with engine.begin() as conn: conn.execute(text(sql)) registrar_log(st.session_state.get("usuario"), f"ALTER TYPE {alvo}→{novo_tipo} em {tabela}", "schema", None) st.success("✅ Tipo alterado com sucesso.") st.rerun() except Exception as e: st.error(f"Erro ao alterar tipo: {e}")