agent-seara / nodes /postgresql_connection_node.py
rwayz's picture
m
e982206
"""
Nó para conexão com PostgreSQL
"""
import logging
import time
from typing import Dict, Any, Optional
from sqlalchemy import create_engine, text
from langchain_community.utilities import SQLDatabase
from utils.database import create_sql_database
from utils.object_manager import get_object_manager
from utils.validation import (
validate_postgresql_config,
sanitize_postgresql_config,
get_connection_error_message
)
async def postgresql_connection_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para estabelecer conexão com PostgreSQL
Args:
state: Estado atual do agente
Returns:
Estado atualizado com conexão PostgreSQL estabelecida
"""
try:
logging.info("[POSTGRESQL_CONNECTION] Iniciando conexão com PostgreSQL")
# Recupera e valida configuração PostgreSQL
postgresql_config = state.get("postgresql_config", {})
if not postgresql_config:
error_msg = "Configuração PostgreSQL não encontrada"
logging.error(f"[POSTGRESQL_CONNECTION] {error_msg}")
state.update({
"success": False,
"message": f"❌ {error_msg}",
"connection_error": error_msg,
"connection_success": False
})
return state
# Sanitiza e valida configuração
postgresql_config = sanitize_postgresql_config(postgresql_config)
is_valid, validation_error = validate_postgresql_config(postgresql_config)
if not is_valid:
error_msg = f"Configuração PostgreSQL inválida: {validation_error}"
logging.error(f"[POSTGRESQL_CONNECTION] {error_msg}")
state.update({
"success": False,
"message": f"❌ {validation_error}",
"connection_error": error_msg,
"connection_success": False
})
return state
# Extrai credenciais
host = postgresql_config.get("host")
port = postgresql_config.get("port", 5432)
database = postgresql_config.get("database")
username = postgresql_config.get("username")
password = postgresql_config.get("password")
# Constrói URI de conexão
connection_uri = f"postgresql+psycopg2://{username}:{password}@{host}:{port}/{database}"
logging.info(f"[POSTGRESQL_CONNECTION] Conectando a: {host}:{port}/{database}")
# Tenta estabelecer conexão
start_time = time.time()
try:
# Cria engine SQLAlchemy
engine = create_engine(
connection_uri,
pool_timeout=30,
pool_recycle=3600,
echo=False # Não mostrar SQL queries no log
)
# Testa conexão
with engine.connect() as conn:
result = conn.execute(text("SELECT 1"))
result.fetchone()
connection_time = time.time() - start_time
logging.info(f"[POSTGRESQL_CONNECTION] Conexão estabelecida em {connection_time:.2f}s")
except Exception as conn_error:
error_msg = f"Falha na conexão PostgreSQL: {str(conn_error)}"
logging.error(f"[POSTGRESQL_CONNECTION] {error_msg}")
# Usa função de tratamento de erro amigável
user_error = get_connection_error_message(conn_error)
state.update({
"success": False,
"message": user_error,
"connection_error": error_msg,
"connection_success": False
})
return state
# Cria objeto SQLDatabase do LangChain (sempre com todas as tabelas para amostra)
try:
db = SQLDatabase.from_uri(connection_uri)
logging.info("[POSTGRESQL_CONNECTION] SQLDatabase criado com sucesso")
# Obtém informações do banco
table_names = db.get_usable_table_names()
logging.info(f"[POSTGRESQL_CONNECTION] Tabelas encontradas: {table_names}")
if not table_names:
warning_msg = "⚠️ Nenhuma tabela encontrada no banco de dados"
logging.warning(f"[POSTGRESQL_CONNECTION] {warning_msg}")
# Não é um erro fatal, mas avisa o usuário
except Exception as db_error:
error_msg = f"Erro ao criar SQLDatabase: {str(db_error)}"
logging.error(f"[POSTGRESQL_CONNECTION] {error_msg}")
state.update({
"success": False,
"message": f"❌ {error_msg}",
"connection_error": error_msg,
"connection_success": False
})
return state
# Armazena objetos no ObjectManager
obj_manager = get_object_manager()
engine_id = obj_manager.store_engine(engine)
db_id = obj_manager.store_database(db)
# Informações da conexão
connection_info = {
"type": "postgresql",
"host": host,
"port": port,
"database": database,
"username": username,
"table_count": len(table_names),
"tables": table_names[:10], # Primeiras 10 tabelas
"connection_time": connection_time,
"engine_id": engine_id,
"db_id": db_id
}
# Atualiza estado com sucesso
state.update({
"success": True,
"message": f"✅ Conectado ao PostgreSQL: {len(table_names)} tabelas encontradas",
"connection_info": connection_info,
"connection_error": None,
"connection_success": True,
"engine_id": engine_id,
"db_id": db_id
})
logging.info(f"[POSTGRESQL_CONNECTION] Conexão PostgreSQL estabelecida com sucesso")
logging.info(f"[POSTGRESQL_CONNECTION] Informações: {connection_info}")
return state
except Exception as e:
error_msg = f"Erro inesperado na conexão PostgreSQL: {e}"
logging.error(f"[POSTGRESQL_CONNECTION] {error_msg}")
state.update({
"success": False,
"message": f"❌ {error_msg}",
"connection_error": error_msg,
"connection_success": False
})
return state
def validate_postgresql_credentials(postgresql_config: Dict[str, Any]) -> tuple[bool, Optional[str]]:
"""
Valida credenciais PostgreSQL sem estabelecer conexão completa
Args:
postgresql_config: Configuração PostgreSQL
Returns:
Tupla (válido, mensagem_erro)
"""
try:
required_fields = ["host", "port", "database", "username", "password"]
for field in required_fields:
if not postgresql_config.get(field):
return False, f"Campo obrigatório ausente: {field}"
# Validações básicas
port = postgresql_config.get("port")
try:
port_int = int(port)
if port_int < 1 or port_int > 65535:
return False, "Porta deve estar entre 1 e 65535"
except (ValueError, TypeError):
return False, "Porta deve ser um número válido"
host = postgresql_config.get("host", "").strip()
if not host:
return False, "Host não pode estar vazio"
database = postgresql_config.get("database", "").strip()
if not database:
return False, "Nome do banco não pode estar vazio"
username = postgresql_config.get("username", "").strip()
if not username:
return False, "Nome de usuário não pode estar vazio"
return True, None
except Exception as e:
return False, f"Erro na validação: {e}"
async def test_postgresql_connection_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para testar conexão PostgreSQL sem armazenar
Args:
state: Estado atual do agente
Returns:
Estado atualizado com resultado do teste
"""
try:
logging.info("[POSTGRESQL_TEST] Testando conexão PostgreSQL")
postgresql_config = state.get("postgresql_config", {})
# Valida credenciais
is_valid, error_msg = validate_postgresql_credentials(postgresql_config)
if not is_valid:
state.update({
"test_success": False,
"test_message": f"❌ {error_msg}",
"test_error": error_msg
})
return state
# Testa conexão rápida
host = postgresql_config.get("host")
port = postgresql_config.get("port", 5432)
database = postgresql_config.get("database")
username = postgresql_config.get("username")
password = postgresql_config.get("password")
connection_uri = f"postgresql+psycopg2://{username}:{password}@{host}:{port}/{database}"
try:
engine = create_engine(connection_uri, pool_timeout=10)
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
state.update({
"test_success": True,
"test_message": "✅ Conexão PostgreSQL testada com sucesso",
"test_error": None
})
except Exception as e:
error_msg = f"Falha no teste de conexão: {str(e)}"
state.update({
"test_success": False,
"test_message": f"❌ {error_msg}",
"test_error": error_msg
})
return state
except Exception as e:
error_msg = f"Erro no teste de conexão: {e}"
logging.error(f"[POSTGRESQL_TEST] {error_msg}")
state.update({
"test_success": False,
"test_message": f"❌ {error_msg}",
"test_error": error_msg
})
return state