File size: 17,746 Bytes
0f0ef8d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388

# db_admin.py
import streamlit as st
import os
import shutil
from sqlalchemy import text
from banco import engine, SessionLocal
from utils_permissoes import verificar_permissao
from utils_auditoria import registrar_log

# =====================================================
# MÓDULO GERAL DE ADMINISTRAÇÃO DE BANCO (SCHEMA)
# =====================================================
# Objetivo:
# - Permitir adicionar, renomear, excluir e alterar tipo de colunas via UI
# - Funciona com SQLite, PostgreSQL e MySQL (com diferenças por dialeto)
# - Em SQLite, oferece reconstrução assistida quando DDL não é suportado
#
# Segurança e boas práticas:
# - Recomendado fazer backup antes de operações (botão disponível para SQLite)
# - Operações DDL são críticas: exigir confirmação explícita
# - Acesso restrito ao perfil "admin"
#
# Logs:
# - registrar_log(...) é chamado em todas as operações


# -------------------------
# Utilitário: Dialeto e versão
# -------------------------
def _dialeto():
    try:
        return engine.url.get_backend_name()
    except Exception:
        return "desconhecido"

def _sqlite_version():
    if _dialeto() != "sqlite":
        return None
    try:
        with engine.begin() as conn:
            rv = conn.execute(text("select sqlite_version()")).scalar()
            return rv
    except Exception:
        return None


# -------------------------
# Utilitário: Listar tabelas e colunas
# -------------------------
def _listar_tabelas():
    d = _dialeto()
    with engine.begin() as conn:
        if d == "sqlite":
            rows = conn.execute(text("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")).fetchall()
            return [r[0] for r in rows]
        else:
            q = text("""

                SELECT table_name

                FROM information_schema.tables

                WHERE table_schema NOT IN ('pg_catalog','information_schema')

                ORDER BY table_name

            """)
            rows = conn.execute(q).fetchall()
            return [r[0] for r in rows]

def _listar_colunas(tabela: str):
    d = _dialeto()
    with engine.begin() as conn:
        if d == "sqlite":
            rows = conn.execute(text(f"PRAGMA table_info({tabela})")).fetchall()
            # PRAGMA: (cid, name, type, notnull, dflt_value, pk)
            return [{"name": r[1], "type": r[2], "notnull": bool(r[3]), "default": r[4], "pk": bool(r[5])} for r in rows]
        else:
            q = text("""

                SELECT column_name, data_type, is_nullable, column_default

                FROM information_schema.columns

                WHERE table_name = :tbl

                ORDER BY ordinal_position

            """)
            rows = conn.execute(q, {"tbl": tabela}).fetchall()
            return [{"name": r[0], "type": r[1], "notnull": (str(r[2]).upper() == "NO"), "default": r[3], "pk": False} for r in rows]


# -------------------------
# Backup rápido (SQLite)
# -------------------------
def _sqlite_backup():
    if _dialeto() != "sqlite":
        st.info("Backup automático só disponível para SQLite via cópia de arquivo.")
        return
    db_path = engine.url.database
    if not db_path or not os.path.exists(db_path):
        st.error("Arquivo de banco SQLite não encontrado.")
        return
    dest = db_path + ".bak"
    shutil.copyfile(db_path, dest)
    st.success(f"Backup criado: {dest}")


# -------------------------
# DDL: Gerar comandos por dialeto
# -------------------------
def _ddl_add_column_sql(tabela, col_nome, col_tipo, notnull=False, default=None):
    d = _dialeto()
    nn = "NOT NULL" if notnull else "NULL"
    def_clause = f" DEFAULT {default}" if (default is not None and str(default).strip() != "") else ""
    if d == "sqlite":
        # SQLite aceita tipo textual; notnull e default são respeitados no schema
        return f"ALTER TABLE {tabela} ADD COLUMN {col_nome} {col_tipo} {nn}{def_clause};"
    elif d in ("postgresql", "postgres"):
        base = f'ALTER TABLE "{tabela}" ADD COLUMN "{col_nome}" {col_tipo}'
        if default is not None and str(default).strip() != "":
            base += f" DEFAULT {default}"
        if notnull:
            base += " NOT NULL"
        return base + ";"
    elif d in ("mysql", "mariadb"):
        base = f"ALTER TABLE `{tabela}` ADD COLUMN `{col_nome}` {col_tipo}"
        if default is not None and str(default).strip() != "":
            base += f" DEFAULT {default}"
        base += " NOT NULL" if notnull else " NULL"
        return base + ";"
    return None

def _ddl_rename_column_sql(tabela, old, new):
    d = _dialeto()
    if d == "sqlite":
        return f"ALTER TABLE {tabela} RENAME COLUMN {old} TO {new};"
    elif d in ("postgresql", "postgres"):
        return f'ALTER TABLE "{tabela}" RENAME COLUMN "{old}" TO "{new}";'
    elif d in ("mysql", "mariadb"):
        # MySQL requer tipo na renomeação; esta função não cobre tipo -> usar CHANGE COLUMN via UI de "Alterar tipo/renomear"
        return None
    return None

def _ddl_drop_column_sql(tabela, col):
    d = _dialeto()
    if d == "sqlite":
        return f"ALTER TABLE {tabela} DROP COLUMN {col};"
    elif d in ("postgresql", "postgres"):
        return f'ALTER TABLE "{tabela}" DROP COLUMN "{col}";'
    elif d in ("mysql", "mariadb"):
        return f"ALTER TABLE `{tabela}` DROP COLUMN `{col}`;"
    return None

def _ddl_alter_type_sql(tabela, col, new_type):
    d = _dialeto()
    if d == "sqlite":
        # SQLite não altera type declarado via ALTER TYPE. Necessário reconstruir tabela.
        return None
    elif d in ("postgresql", "postgres"):
        return f'ALTER TABLE "{tabela}" ALTER COLUMN "{col}" TYPE {new_type};'
    elif d in ("mysql", "mariadb"):
        return f"ALTER TABLE `{tabela}` MODIFY COLUMN `{col}` {new_type};"
    return None


# -------------------------
# Reconstrução assistida (SQLite)
# -------------------------
def _sqlite_reconstruir_tabela(tabela, novas_colunas):
    """

    Reconstrói tabela SQLite com "novas_colunas" (lista de dicts):

    [{"name":..., "type":..., "notnull":bool, "default":..., "pk":bool}, ...]

    - Cria tabela __tmp_<tabela> com o novo schema

    - Copia dados das colunas compatíveis (mesmos nomes)

    - Drop da tabela original e rename da temporária

    """
    cols_def = []
    copy_cols = []
    pk_cols = [c["name"] for c in novas_colunas if c.get("pk")]
    for c in novas_colunas:
        nn = "NOT NULL" if c.get("notnull") else ""
        default = c.get("default")
        def_clause = f" DEFAULT {default}" if (default is not None and str(default).strip() != "") else ""
        cols_def.append(f'{c["name"]} {c["type"]} {nn}{def_clause}'.strip())
        copy_cols.append(c["name"])
    pk_clause = f", PRIMARY KEY ({', '.join(pk_cols)})" if pk_cols else ""

    create_sql = f"CREATE TABLE __tmp_{tabela} ({', '.join(cols_def)}{pk_clause});"
    copy_sql = f"INSERT INTO __tmp_{tabela} ({', '.join(copy_cols)}) SELECT {', '.join(copy_cols)} FROM {tabela};"
    drop_sql  = f"DROP TABLE {tabela};"
    rename_sql= f"ALTER TABLE __tmp_{tabela} RENAME TO {tabela};"

    with engine.begin() as conn:
        conn.execute(text(create_sql))
        conn.execute(text(copy_sql))
        conn.execute(text(drop_sql))
        conn.execute(text(rename_sql))


# -------------------------
# UI principal (admin)
# -------------------------
def main():
    st.title("🛠️ Administração de Banco (Schema)")

    # 🔐 Proteção por perfil
    if not verificar_permissao("db_admin") and st.session_state.get("perfil") != "admin":
        st.error("⛔ Acesso não autorizado.")
        return

    # Info do banco
    dial = _dialeto()
    st.caption(f"Dialeto: **{dial}**")
    ver = _sqlite_version()
    if ver:
        st.caption(f"SQLite version: **{ver}**")

    # Backup (SQLite)
    if dial == "sqlite":
        if st.button("💾 Backup rápido (SQLite)"):
            _sqlite_backup()

    # Tabelas disponíveis
    tabelas = _listar_tabelas()
    if not tabelas:
        st.warning("Nenhuma tabela encontrada.")
        return

    tabela = st.selectbox("Tabela alvo:", tabelas, index=0)
    colunas = _listar_colunas(tabela)

    st.divider()
    st.subheader("📋 Colunas atuais")
    st.write(pd.DataFrame(colunas)) if 'pd' in globals() else st.write(colunas)  # mostra estrutura atual

    st.divider()
    tabs = st.tabs(["➕ Adicionar coluna", "✏️ Renomear coluna", "🗑️ Excluir coluna", "♻️ Alterar tipo"])

    # ----------------- Adicionar coluna -----------------
    with tabs[0]:
        st.markdown("**Adicionar uma nova coluna à tabela selecionada**")
        novo_nome = st.text_input("Nome da nova coluna")
        novo_tipo = st.text_input("Tipo (ex.: TEXT, INTEGER, VARCHAR(255))")
        novo_notnull = st.checkbox("NOT NULL", value=False)
        novo_default = st.text_input("DEFAULT (opcional)")

        confirmar_add = st.checkbox("Confirmo a adição desta coluna (DDL).")
        if st.button("Executar ADD COLUMN", type="primary") and confirmar_add:
            sql = _ddl_add_column_sql(tabela, novo_nome, novo_tipo, notnull=novo_notnull, default=novo_default)
            if not sql:
                st.error("Dialeto não suportado para ADD COLUMN.")
            else:
                try:
                    with engine.begin() as conn:
                        conn.execute(text(sql))
                    registrar_log(st.session_state.get("usuario"), f"ADD COLUMN {novo_nome} {novo_tipo} em {tabela}", "schema", None)
                    st.success("✅ Coluna adicionada com sucesso.")
                    st.rerun()
                except Exception as e:
                    st.error(f"Erro ao adicionar coluna: {e}")

    # ----------------- Renomear coluna -----------------
    with tabs[1]:
        st.markdown("**Renomear uma coluna existente**")
        col_nomes = [c["name"] for c in colunas]
        antigo = st.selectbox("Coluna atual:", col_nomes) if col_nomes else ""
        novo = st.text_input("Novo nome da coluna")

        confirmar_ren = st.checkbox("Confirmo a renomeação desta coluna (DDL).")
        if st.button("Executar RENAME COLUMN") and confirmar_ren:
            d = _dialeto()
            if d == "sqlite":
                # Verifica suporte na versão
                ver = _sqlite_version() or "0.0.0"
                suportado = tuple(map(int, ver.split("."))) >= (3, 25, 0)
                if suportado:
                    sql = _ddl_rename_column_sql(tabela, antigo, novo)
                    try:
                        with engine.begin() as conn:
                            conn.execute(text(sql))
                        registrar_log(st.session_state.get("usuario"), f"RENAME COLUMN {antigo}{novo} em {tabela}", "schema", None)
                        st.success("✅ Coluna renomeada com sucesso.")
                        st.rerun()
                    except Exception as e:
                        st.error(f"Erro ao renomear: {e}")
                else:
                    st.warning("SQLite < 3.25 não suporta RENAME COLUMN. Oferecendo reconstrução assistida.")
                    # Reconstrução: atualiza metadados e recria tabela
                    novas = []
                    for c in colunas:
                        nm = novo if c["name"] == antigo else c["name"]
                        novas.append({"name": nm, "type": c["type"], "notnull": c["notnull"], "default": c["default"], "pk": c["pk"]})
                    try:
                        _sqlite_reconstruir_tabela(tabela, novas)
                        registrar_log(st.session_state.get("usuario"), f"RENAME (rebuild) {antigo}{novo} em {tabela}", "schema", None)
                        st.success("✅ Reconstrução concluída com sucesso.")
                        st.rerun()
                    except Exception as e:
                        st.error(f"Erro na reconstrução: {e}")
            elif d in ("postgresql", "postgres"):
                sql = _ddl_rename_column_sql(tabela, antigo, novo)
                if not sql:
                    st.error("Renomeação não suportada.")
                else:
                    try:
                        with engine.begin() as conn:
                            conn.execute(text(sql))
                        registrar_log(st.session_state.get("usuario"), f"RENAME COLUMN {antigo}{novo} em {tabela}", "schema", None)
                        st.success("✅ Coluna renomeada com sucesso.")
                        st.rerun()
                    except Exception as e:
                        st.error(f"Erro ao renomear: {e}")
            elif d in ("mysql", "mariadb"):
                st.info("MySQL/MariaDB exigem 'CHANGE COLUMN' informando o novo tipo; use a aba 'Alterar tipo' para renomear junto com tipo.")

    # ----------------- Excluir coluna -----------------
    with tabs[2]:
        st.markdown("**Excluir uma coluna existente**")
        col_nomes = [c["name"] for c in colunas]
        col_drop = st.selectbox("Coluna a excluir:", col_nomes) if col_nomes else ""

        confirmar_drop = st.checkbox("Confirmo a exclusão desta coluna (DDL) e entendo que é irreversível.")
        if st.button("Executar DROP COLUMN", type="secondary") and confirmar_drop:
            d = _dialeto()
            if d == "sqlite":
                ver = _sqlite_version() or "0.0.0"
                suportado = tuple(map(int, ver.split("."))) >= (3, 35, 0)
                if suportado:
                    sql = _ddl_drop_column_sql(tabela, col_drop)
                    try:
                        with engine.begin() as conn:
                            conn.execute(text(sql))
                        registrar_log(st.session_state.get("usuario"), f"DROP COLUMN {col_drop} em {tabela}", "schema", None)
                        st.success("✅ Coluna excluída com sucesso.")
                        st.rerun()
                    except Exception as e:
                        st.error(f"Erro ao excluir: {e}")
                else:
                    st.warning("SQLite < 3.35 não suporta DROP COLUMN. Oferecendo reconstrução assistida.")
                    novas = [c for c in colunas if c["name"] != col_drop]
                    try:
                        _sqlite_reconstruir_tabela(tabela, novas)
                        registrar_log(st.session_state.get("usuario"), f"DROP (rebuild) {col_drop} em {tabela}", "schema", None)
                        st.success("✅ Reconstrução concluída e coluna removida.")
                        st.rerun()
                    except Exception as e:
                        st.error(f"Erro na reconstrução: {e}")
            elif d in ("postgresql", "postgres", "mysql", "mariadb"):
                sql = _ddl_drop_column_sql(tabela, col_drop)
                try:
                    with engine.begin() as conn:
                        conn.execute(text(sql))
                    registrar_log(st.session_state.get("usuario"), f"DROP COLUMN {col_drop} em {tabela}", "schema", None)
                    st.success("✅ Coluna excluída com sucesso.")
                    st.rerun()
                except Exception as e:
                    st.error(f"Erro ao excluir: {e}")

    # ----------------- Alterar tipo -----------------
    with tabs[3]:
        st.markdown("**Alterar tipo declarado de uma coluna**")
        col_nomes = [c["name"] for c in colunas]
        alvo = st.selectbox("Coluna alvo:", col_nomes) if col_nomes else ""
        novo_tipo = st.text_input("Novo tipo (ex.: TEXT, INTEGER, VARCHAR(255))")

        confirmar_type = st.checkbox("Confirmo a alteração de tipo (DDL).")
        if st.button("Executar ALTER TYPE") and confirmar_type:
            d = _dialeto()
            if d == "sqlite":
                st.warning("SQLite não suporta ALTER TYPE direto; oferecemos reconstrução assistida.")
                novas = []
                for c in colunas:
                    typ = novo_tipo if c["name"] == alvo else c["type"]
                    novas.append({"name": c["name"], "type": typ, "notnull": c["notnull"], "default": c["default"], "pk": c["pk"]})
                try:
                    _sqlite_reconstruir_tabela(tabela, novas)
                    registrar_log(st.session_state.get("usuario"), f"ALTER TYPE (rebuild) {alvo}{novo_tipo} em {tabela}", "schema", None)
                    st.success("✅ Tipo alterado com sucesso via reconstrução.")
                    st.rerun()
                except Exception as e:
                    st.error(f"Erro na reconstrução: {e}")
            elif d in ("postgresql", "postgres", "mysql", "mariadb"):
                sql = _ddl_alter_type_sql(tabela, alvo, novo_tipo)
                if not sql:
                    st.error("Dialeto não suportado para ALTER TYPE.")
                else:
                    try:
                        with engine.begin() as conn:
                            conn.execute(text(sql))
                        registrar_log(st.session_state.get("usuario"), f"ALTER TYPE {alvo}{novo_tipo} em {tabela}", "schema", None)
                        st.success("✅ Tipo alterado com sucesso.")
                        st.rerun()
                    except Exception as e:
                        st.error(f"Erro ao alterar tipo: {e}")