File size: 10,148 Bytes
e982206
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
"""
Nó para operações de banco de dados
"""
import os
import logging
import pandas as pd
from typing import Dict, Any, TypedDict, Optional
from sqlalchemy import create_engine

from utils.config import SQL_DB_PATH
from utils.database import create_sql_database, validate_database
from utils.object_manager import get_object_manager

class DatabaseState(TypedDict):
    """Estado para operações de banco de dados"""
    success: bool
    message: str
    database_info: dict
    engine_id: str
    db_id: str

async def create_database_from_dataframe_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Nó para criar banco de dados a partir de DataFrame processado
    
    Args:
        state: Estado contendo informações do DataFrame processado
        
    Returns:
        Estado atualizado com informações do banco
    """
    try:
        obj_manager = get_object_manager()
        
        # Recupera DataFrame processado
        df_id = state.get("dataframe_id")
        if not df_id:
            raise ValueError("ID do DataFrame não encontrado no estado")
        
        processed_df = obj_manager.get_object(df_id)
        if processed_df is None:
            raise ValueError("DataFrame processado não encontrado")
        
        # Recupera informações das colunas
        column_info = state.get("column_info", {})
        sql_types = column_info.get("sql_types", {})
        
        # Cria engine do banco
        engine = create_engine(f"sqlite:///{SQL_DB_PATH}")
        
        # Salva DataFrame no banco
        processed_df.to_sql(
            "tabela", 
            engine, 
            index=False, 
            if_exists="replace", 
            dtype=sql_types
        )
        
        logging.info(f"[DATABASE] Banco criado com {len(processed_df)} registros")
        
        # Cria objeto SQLDatabase do LangChain
        db = create_sql_database(engine)
        
        # Valida banco
        is_valid = validate_database(engine)
        
        # Armazena objetos no gerenciador
        engine_id = obj_manager.store_engine(engine)
        db_id = obj_manager.store_database(db)
        
        # Informações do banco
        database_info = {
            "path": SQL_DB_PATH,
            "table_name": "tabela",
            "total_records": len(processed_df),
            "columns": list(processed_df.columns),
            "column_types": {col: str(dtype) for col, dtype in processed_df.dtypes.items()},
            "is_valid": is_valid,
            "sql_types_used": {col: str(sql_type) for col, sql_type in sql_types.items()}
        }
        
        # Atualiza estado
        state.update({
            "success": True,
            "message": f"✅ Banco de dados criado com sucesso! {len(processed_df)} registros salvos",
            "database_info": database_info,
            "engine_id": engine_id,
            "db_id": db_id
        })
        
        logging.info(f"[DATABASE] Banco criado e validado: {database_info}")
        
    except Exception as e:
        error_msg = f"❌ Erro ao criar banco de dados: {e}"
        logging.error(f"[DATABASE] {error_msg}")
        state.update({
            "success": False,
            "message": error_msg,
            "database_info": {},
            "engine_id": "",
            "db_id": ""
        })
    
    return state

async def load_existing_database_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Nó para carregar banco de dados existente
    
    Args:
        state: Estado atual
        
    Returns:
        Estado atualizado com informações do banco existente
    """
    try:
        if not os.path.exists(SQL_DB_PATH):
            raise ValueError("Banco de dados não encontrado")
        
        # Cria engine
        engine = create_engine(f"sqlite:///{SQL_DB_PATH}")
        
        # Cria objeto SQLDatabase
        db = create_sql_database(engine)
        
        # Valida banco
        is_valid = validate_database(engine)
        
        # Obtém informações do banco
        try:
            sample_df = pd.read_sql_query("SELECT * FROM tabela LIMIT 5", engine)
            total_records_df = pd.read_sql_query("SELECT COUNT(*) as count FROM tabela", engine)
            total_records = total_records_df.iloc[0]['count']
            
            database_info = {
                "path": SQL_DB_PATH,
                "table_name": "tabela",
                "total_records": total_records,
                "columns": list(sample_df.columns),
                "column_types": {col: str(dtype) for col, dtype in sample_df.dtypes.items()},
                "is_valid": is_valid,
                "sample_data": sample_df.head(3).to_dict()
            }
        except Exception as e:
            logging.warning(f"Erro ao obter informações detalhadas do banco: {e}")
            database_info = {
                "path": SQL_DB_PATH,
                "table_name": "tabela",
                "is_valid": is_valid,
                "error": str(e)
            }
        
        # Armazena objetos no gerenciador
        obj_manager = get_object_manager()
        engine_id = obj_manager.store_engine(engine)
        db_id = obj_manager.store_database(db)
        
        # Atualiza estado
        state.update({
            "success": True,
            "message": "✅ Banco de dados existente carregado com sucesso",
            "database_info": database_info,
            "engine_id": engine_id,
            "db_id": db_id
        })
        
        logging.info(f"[DATABASE] Banco existente carregado: {database_info}")
        
    except Exception as e:
        error_msg = f"❌ Erro ao carregar banco existente: {e}"
        logging.error(f"[DATABASE] {error_msg}")
        state.update({
            "success": False,
            "message": error_msg,
            "database_info": {},
            "engine_id": "",
            "db_id": ""
        })
    
    return state

async def get_database_sample_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Nó para obter amostra dos dados do banco
    
    Args:
        state: Estado contendo ID da engine
        
    Returns:
        Estado atualizado com amostra dos dados
    """
    try:
        obj_manager = get_object_manager()
        
        # Recupera engine
        engine_id = state.get("engine_id")
        if not engine_id:
            raise ValueError("ID da engine não encontrado")
        
        engine = obj_manager.get_engine(engine_id)
        if not engine:
            raise ValueError("Engine não encontrada")
        
        # Determina qual tabela usar para amostra
        connection_type = state.get("connection_type", "csv")

        if connection_type == "postgresql":
            # Para PostgreSQL, detecta dinamicamente a primeira tabela disponível com dados
            import sqlalchemy as sa

            try:
                with engine.connect() as conn:
                    # Obtém lista de tabelas disponíveis
                    tables_result = conn.execute(sa.text("""
                        SELECT table_name
                        FROM information_schema.tables
                        WHERE table_schema = 'public'
                        ORDER BY table_name
                    """))
                    available_tables = [row[0] for row in tables_result.fetchall()]

                    if not available_tables:
                        raise ValueError("Nenhuma tabela encontrada no banco PostgreSQL")

                    # Tenta encontrar uma tabela com dados
                    table_name = None
                    for table in available_tables:
                        try:
                            # Verifica se a tabela tem dados
                            count_result = conn.execute(sa.text(f"SELECT COUNT(*) FROM {table} LIMIT 1"))
                            count = count_result.scalar()
                            if count > 0:
                                table_name = table
                                logging.info(f"[DATABASE] PostgreSQL - usando tabela '{table_name}' para amostra ({count} registros)")
                                break
                        except Exception as e:
                            logging.warning(f"[DATABASE] Erro ao verificar tabela {table}: {e}")
                            continue

                    # Se nenhuma tabela tem dados, usa a primeira disponível
                    if not table_name:
                        table_name = available_tables[0]
                        logging.info(f"[DATABASE] PostgreSQL - usando primeira tabela '{table_name}' (sem dados detectados)")

            except Exception as e:
                logging.error(f"[DATABASE] Erro ao detectar tabelas PostgreSQL: {e}")
                raise ValueError(f"Erro ao acessar tabelas PostgreSQL: {e}")

        else:
            table_name = "tabela"  # Padrão para CSV
            logging.info(f"[DATABASE] CSV - usando tabela padrão: {table_name}")

        # Obtém amostra dos dados
        try:
            sample_df = pd.read_sql_query(f"SELECT * FROM {table_name} LIMIT 10", engine)
            logging.info(f"[DATABASE] Amostra obtida da tabela '{table_name}': {sample_df.shape[0]} registros")
        except Exception as e:
            logging.error(f"[DATABASE] Erro ao obter amostra da tabela '{table_name}': {e}")
            # Se falhar, cria DataFrame vazio para não quebrar o fluxo
            sample_df = pd.DataFrame()
        
        # Converte para formato serializável
        db_sample_dict = {
            "data": sample_df.to_dict('records'),
            "columns": list(sample_df.columns),
            "dtypes": sample_df.dtypes.astype(str).to_dict(),
            "shape": sample_df.shape
        }
        
        state["db_sample_dict"] = db_sample_dict
        
        logging.info(f"[DATABASE] Amostra obtida: {sample_df.shape[0]} registros")
        
    except Exception as e:
        error_msg = f"Erro ao obter amostra do banco: {e}"
        logging.error(f"[DATABASE] {error_msg}")
        state["db_sample_dict"] = {}
        state["error"] = error_msg
    
    return state