agent-seara / nodes /database_node.py
rwayz's picture
m
e982206
"""
Nó para operações de banco de dados
"""
import os
import logging
import pandas as pd
from typing import Dict, Any, TypedDict, Optional
from sqlalchemy import create_engine
from utils.config import SQL_DB_PATH
from utils.database import create_sql_database, validate_database
from utils.object_manager import get_object_manager
class DatabaseState(TypedDict):
"""Estado para operações de banco de dados"""
success: bool
message: str
database_info: dict
engine_id: str
db_id: str
async def create_database_from_dataframe_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para criar banco de dados a partir de DataFrame processado
Args:
state: Estado contendo informações do DataFrame processado
Returns:
Estado atualizado com informações do banco
"""
try:
obj_manager = get_object_manager()
# Recupera DataFrame processado
df_id = state.get("dataframe_id")
if not df_id:
raise ValueError("ID do DataFrame não encontrado no estado")
processed_df = obj_manager.get_object(df_id)
if processed_df is None:
raise ValueError("DataFrame processado não encontrado")
# Recupera informações das colunas
column_info = state.get("column_info", {})
sql_types = column_info.get("sql_types", {})
# Cria engine do banco
engine = create_engine(f"sqlite:///{SQL_DB_PATH}")
# Salva DataFrame no banco
processed_df.to_sql(
"tabela",
engine,
index=False,
if_exists="replace",
dtype=sql_types
)
logging.info(f"[DATABASE] Banco criado com {len(processed_df)} registros")
# Cria objeto SQLDatabase do LangChain
db = create_sql_database(engine)
# Valida banco
is_valid = validate_database(engine)
# Armazena objetos no gerenciador
engine_id = obj_manager.store_engine(engine)
db_id = obj_manager.store_database(db)
# Informações do banco
database_info = {
"path": SQL_DB_PATH,
"table_name": "tabela",
"total_records": len(processed_df),
"columns": list(processed_df.columns),
"column_types": {col: str(dtype) for col, dtype in processed_df.dtypes.items()},
"is_valid": is_valid,
"sql_types_used": {col: str(sql_type) for col, sql_type in sql_types.items()}
}
# Atualiza estado
state.update({
"success": True,
"message": f"✅ Banco de dados criado com sucesso! {len(processed_df)} registros salvos",
"database_info": database_info,
"engine_id": engine_id,
"db_id": db_id
})
logging.info(f"[DATABASE] Banco criado e validado: {database_info}")
except Exception as e:
error_msg = f"❌ Erro ao criar banco de dados: {e}"
logging.error(f"[DATABASE] {error_msg}")
state.update({
"success": False,
"message": error_msg,
"database_info": {},
"engine_id": "",
"db_id": ""
})
return state
async def load_existing_database_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para carregar banco de dados existente
Args:
state: Estado atual
Returns:
Estado atualizado com informações do banco existente
"""
try:
if not os.path.exists(SQL_DB_PATH):
raise ValueError("Banco de dados não encontrado")
# Cria engine
engine = create_engine(f"sqlite:///{SQL_DB_PATH}")
# Cria objeto SQLDatabase
db = create_sql_database(engine)
# Valida banco
is_valid = validate_database(engine)
# Obtém informações do banco
try:
sample_df = pd.read_sql_query("SELECT * FROM tabela LIMIT 5", engine)
total_records_df = pd.read_sql_query("SELECT COUNT(*) as count FROM tabela", engine)
total_records = total_records_df.iloc[0]['count']
database_info = {
"path": SQL_DB_PATH,
"table_name": "tabela",
"total_records": total_records,
"columns": list(sample_df.columns),
"column_types": {col: str(dtype) for col, dtype in sample_df.dtypes.items()},
"is_valid": is_valid,
"sample_data": sample_df.head(3).to_dict()
}
except Exception as e:
logging.warning(f"Erro ao obter informações detalhadas do banco: {e}")
database_info = {
"path": SQL_DB_PATH,
"table_name": "tabela",
"is_valid": is_valid,
"error": str(e)
}
# Armazena objetos no gerenciador
obj_manager = get_object_manager()
engine_id = obj_manager.store_engine(engine)
db_id = obj_manager.store_database(db)
# Atualiza estado
state.update({
"success": True,
"message": "✅ Banco de dados existente carregado com sucesso",
"database_info": database_info,
"engine_id": engine_id,
"db_id": db_id
})
logging.info(f"[DATABASE] Banco existente carregado: {database_info}")
except Exception as e:
error_msg = f"❌ Erro ao carregar banco existente: {e}"
logging.error(f"[DATABASE] {error_msg}")
state.update({
"success": False,
"message": error_msg,
"database_info": {},
"engine_id": "",
"db_id": ""
})
return state
async def get_database_sample_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para obter amostra dos dados do banco
Args:
state: Estado contendo ID da engine
Returns:
Estado atualizado com amostra dos dados
"""
try:
obj_manager = get_object_manager()
# Recupera engine
engine_id = state.get("engine_id")
if not engine_id:
raise ValueError("ID da engine não encontrado")
engine = obj_manager.get_engine(engine_id)
if not engine:
raise ValueError("Engine não encontrada")
# Determina qual tabela usar para amostra
connection_type = state.get("connection_type", "csv")
if connection_type == "postgresql":
# Para PostgreSQL, detecta dinamicamente a primeira tabela disponível com dados
import sqlalchemy as sa
try:
with engine.connect() as conn:
# Obtém lista de tabelas disponíveis
tables_result = conn.execute(sa.text("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
ORDER BY table_name
"""))
available_tables = [row[0] for row in tables_result.fetchall()]
if not available_tables:
raise ValueError("Nenhuma tabela encontrada no banco PostgreSQL")
# Tenta encontrar uma tabela com dados
table_name = None
for table in available_tables:
try:
# Verifica se a tabela tem dados
count_result = conn.execute(sa.text(f"SELECT COUNT(*) FROM {table} LIMIT 1"))
count = count_result.scalar()
if count > 0:
table_name = table
logging.info(f"[DATABASE] PostgreSQL - usando tabela '{table_name}' para amostra ({count} registros)")
break
except Exception as e:
logging.warning(f"[DATABASE] Erro ao verificar tabela {table}: {e}")
continue
# Se nenhuma tabela tem dados, usa a primeira disponível
if not table_name:
table_name = available_tables[0]
logging.info(f"[DATABASE] PostgreSQL - usando primeira tabela '{table_name}' (sem dados detectados)")
except Exception as e:
logging.error(f"[DATABASE] Erro ao detectar tabelas PostgreSQL: {e}")
raise ValueError(f"Erro ao acessar tabelas PostgreSQL: {e}")
else:
table_name = "tabela" # Padrão para CSV
logging.info(f"[DATABASE] CSV - usando tabela padrão: {table_name}")
# Obtém amostra dos dados
try:
sample_df = pd.read_sql_query(f"SELECT * FROM {table_name} LIMIT 10", engine)
logging.info(f"[DATABASE] Amostra obtida da tabela '{table_name}': {sample_df.shape[0]} registros")
except Exception as e:
logging.error(f"[DATABASE] Erro ao obter amostra da tabela '{table_name}': {e}")
# Se falhar, cria DataFrame vazio para não quebrar o fluxo
sample_df = pd.DataFrame()
# Converte para formato serializável
db_sample_dict = {
"data": sample_df.to_dict('records'),
"columns": list(sample_df.columns),
"dtypes": sample_df.dtypes.astype(str).to_dict(),
"shape": sample_df.shape
}
state["db_sample_dict"] = db_sample_dict
logging.info(f"[DATABASE] Amostra obtida: {sample_df.shape[0]} registros")
except Exception as e:
error_msg = f"Erro ao obter amostra do banco: {e}"
logging.error(f"[DATABASE] {error_msg}")
state["db_sample_dict"] = {}
state["error"] = error_msg
return state