Spaces:
Sleeping
Sleeping
| """ | |
| Nó para operações de banco de dados | |
| """ | |
| import os | |
| import logging | |
| import pandas as pd | |
| from typing import Dict, Any, TypedDict, Optional | |
| from sqlalchemy import create_engine | |
| from utils.config import SQL_DB_PATH | |
| from utils.database import create_sql_database, validate_database | |
| from utils.object_manager import get_object_manager | |
| class DatabaseState(TypedDict): | |
| """Estado para operações de banco de dados""" | |
| success: bool | |
| message: str | |
| database_info: dict | |
| engine_id: str | |
| db_id: str | |
| async def create_database_from_dataframe_node(state: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Nó para criar banco de dados a partir de DataFrame processado | |
| Args: | |
| state: Estado contendo informações do DataFrame processado | |
| Returns: | |
| Estado atualizado com informações do banco | |
| """ | |
| try: | |
| obj_manager = get_object_manager() | |
| # Recupera DataFrame processado | |
| df_id = state.get("dataframe_id") | |
| if not df_id: | |
| raise ValueError("ID do DataFrame não encontrado no estado") | |
| processed_df = obj_manager.get_object(df_id) | |
| if processed_df is None: | |
| raise ValueError("DataFrame processado não encontrado") | |
| # Recupera informações das colunas | |
| column_info = state.get("column_info", {}) | |
| sql_types = column_info.get("sql_types", {}) | |
| # Cria engine do banco | |
| engine = create_engine(f"sqlite:///{SQL_DB_PATH}") | |
| # Salva DataFrame no banco | |
| processed_df.to_sql( | |
| "tabela", | |
| engine, | |
| index=False, | |
| if_exists="replace", | |
| dtype=sql_types | |
| ) | |
| logging.info(f"[DATABASE] Banco criado com {len(processed_df)} registros") | |
| # Cria objeto SQLDatabase do LangChain | |
| db = create_sql_database(engine) | |
| # Valida banco | |
| is_valid = validate_database(engine) | |
| # Armazena objetos no gerenciador | |
| engine_id = obj_manager.store_engine(engine) | |
| db_id = obj_manager.store_database(db) | |
| # Informações do banco | |
| database_info = { | |
| "path": SQL_DB_PATH, | |
| "table_name": "tabela", | |
| "total_records": len(processed_df), | |
| "columns": list(processed_df.columns), | |
| "column_types": {col: str(dtype) for col, dtype in processed_df.dtypes.items()}, | |
| "is_valid": is_valid, | |
| "sql_types_used": {col: str(sql_type) for col, sql_type in sql_types.items()} | |
| } | |
| # Atualiza estado | |
| state.update({ | |
| "success": True, | |
| "message": f"✅ Banco de dados criado com sucesso! {len(processed_df)} registros salvos", | |
| "database_info": database_info, | |
| "engine_id": engine_id, | |
| "db_id": db_id | |
| }) | |
| logging.info(f"[DATABASE] Banco criado e validado: {database_info}") | |
| except Exception as e: | |
| error_msg = f"❌ Erro ao criar banco de dados: {e}" | |
| logging.error(f"[DATABASE] {error_msg}") | |
| state.update({ | |
| "success": False, | |
| "message": error_msg, | |
| "database_info": {}, | |
| "engine_id": "", | |
| "db_id": "" | |
| }) | |
| return state | |
| async def load_existing_database_node(state: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Nó para carregar banco de dados existente | |
| Args: | |
| state: Estado atual | |
| Returns: | |
| Estado atualizado com informações do banco existente | |
| """ | |
| try: | |
| if not os.path.exists(SQL_DB_PATH): | |
| raise ValueError("Banco de dados não encontrado") | |
| # Cria engine | |
| engine = create_engine(f"sqlite:///{SQL_DB_PATH}") | |
| # Cria objeto SQLDatabase | |
| db = create_sql_database(engine) | |
| # Valida banco | |
| is_valid = validate_database(engine) | |
| # Obtém informações do banco | |
| try: | |
| sample_df = pd.read_sql_query("SELECT * FROM tabela LIMIT 5", engine) | |
| total_records_df = pd.read_sql_query("SELECT COUNT(*) as count FROM tabela", engine) | |
| total_records = total_records_df.iloc[0]['count'] | |
| database_info = { | |
| "path": SQL_DB_PATH, | |
| "table_name": "tabela", | |
| "total_records": total_records, | |
| "columns": list(sample_df.columns), | |
| "column_types": {col: str(dtype) for col, dtype in sample_df.dtypes.items()}, | |
| "is_valid": is_valid, | |
| "sample_data": sample_df.head(3).to_dict() | |
| } | |
| except Exception as e: | |
| logging.warning(f"Erro ao obter informações detalhadas do banco: {e}") | |
| database_info = { | |
| "path": SQL_DB_PATH, | |
| "table_name": "tabela", | |
| "is_valid": is_valid, | |
| "error": str(e) | |
| } | |
| # Armazena objetos no gerenciador | |
| obj_manager = get_object_manager() | |
| engine_id = obj_manager.store_engine(engine) | |
| db_id = obj_manager.store_database(db) | |
| # Atualiza estado | |
| state.update({ | |
| "success": True, | |
| "message": "✅ Banco de dados existente carregado com sucesso", | |
| "database_info": database_info, | |
| "engine_id": engine_id, | |
| "db_id": db_id | |
| }) | |
| logging.info(f"[DATABASE] Banco existente carregado: {database_info}") | |
| except Exception as e: | |
| error_msg = f"❌ Erro ao carregar banco existente: {e}" | |
| logging.error(f"[DATABASE] {error_msg}") | |
| state.update({ | |
| "success": False, | |
| "message": error_msg, | |
| "database_info": {}, | |
| "engine_id": "", | |
| "db_id": "" | |
| }) | |
| return state | |
| async def get_database_sample_node(state: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Nó para obter amostra dos dados do banco | |
| Args: | |
| state: Estado contendo ID da engine | |
| Returns: | |
| Estado atualizado com amostra dos dados | |
| """ | |
| try: | |
| obj_manager = get_object_manager() | |
| # Recupera engine | |
| engine_id = state.get("engine_id") | |
| if not engine_id: | |
| raise ValueError("ID da engine não encontrado") | |
| engine = obj_manager.get_engine(engine_id) | |
| if not engine: | |
| raise ValueError("Engine não encontrada") | |
| # Determina qual tabela usar para amostra | |
| connection_type = state.get("connection_type", "csv") | |
| if connection_type == "postgresql": | |
| # Para PostgreSQL, detecta dinamicamente a primeira tabela disponível com dados | |
| import sqlalchemy as sa | |
| try: | |
| with engine.connect() as conn: | |
| # Obtém lista de tabelas disponíveis | |
| tables_result = conn.execute(sa.text(""" | |
| SELECT table_name | |
| FROM information_schema.tables | |
| WHERE table_schema = 'public' | |
| ORDER BY table_name | |
| """)) | |
| available_tables = [row[0] for row in tables_result.fetchall()] | |
| if not available_tables: | |
| raise ValueError("Nenhuma tabela encontrada no banco PostgreSQL") | |
| # Tenta encontrar uma tabela com dados | |
| table_name = None | |
| for table in available_tables: | |
| try: | |
| # Verifica se a tabela tem dados | |
| count_result = conn.execute(sa.text(f"SELECT COUNT(*) FROM {table} LIMIT 1")) | |
| count = count_result.scalar() | |
| if count > 0: | |
| table_name = table | |
| logging.info(f"[DATABASE] PostgreSQL - usando tabela '{table_name}' para amostra ({count} registros)") | |
| break | |
| except Exception as e: | |
| logging.warning(f"[DATABASE] Erro ao verificar tabela {table}: {e}") | |
| continue | |
| # Se nenhuma tabela tem dados, usa a primeira disponível | |
| if not table_name: | |
| table_name = available_tables[0] | |
| logging.info(f"[DATABASE] PostgreSQL - usando primeira tabela '{table_name}' (sem dados detectados)") | |
| except Exception as e: | |
| logging.error(f"[DATABASE] Erro ao detectar tabelas PostgreSQL: {e}") | |
| raise ValueError(f"Erro ao acessar tabelas PostgreSQL: {e}") | |
| else: | |
| table_name = "tabela" # Padrão para CSV | |
| logging.info(f"[DATABASE] CSV - usando tabela padrão: {table_name}") | |
| # Obtém amostra dos dados | |
| try: | |
| sample_df = pd.read_sql_query(f"SELECT * FROM {table_name} LIMIT 10", engine) | |
| logging.info(f"[DATABASE] Amostra obtida da tabela '{table_name}': {sample_df.shape[0]} registros") | |
| except Exception as e: | |
| logging.error(f"[DATABASE] Erro ao obter amostra da tabela '{table_name}': {e}") | |
| # Se falhar, cria DataFrame vazio para não quebrar o fluxo | |
| sample_df = pd.DataFrame() | |
| # Converte para formato serializável | |
| db_sample_dict = { | |
| "data": sample_df.to_dict('records'), | |
| "columns": list(sample_df.columns), | |
| "dtypes": sample_df.dtypes.astype(str).to_dict(), | |
| "shape": sample_df.shape | |
| } | |
| state["db_sample_dict"] = db_sample_dict | |
| logging.info(f"[DATABASE] Amostra obtida: {sample_df.shape[0]} registros") | |
| except Exception as e: | |
| error_msg = f"Erro ao obter amostra do banco: {e}" | |
| logging.error(f"[DATABASE] {error_msg}") | |
| state["db_sample_dict"] = {} | |
| state["error"] = error_msg | |
| return state | |