DataGraph / nodes /custom_nodes.py
rwayz's picture
ss
7094511
"""
Nós personalizados para funcionalidades específicas
"""
import os
import shutil
import logging
from typing import Dict, Any, TypedDict
from utils.database import create_sql_database
from utils.config import UPLOADED_CSV_PATH, SQL_DB_PATH, DEFAULT_CSV_PATH
from agents.sql_agent import SQLAgentManager
from nodes.csv_processing_node import csv_processing_node
from nodes.database_node import create_database_from_dataframe_node, load_existing_database_node
class FileUploadState(TypedDict):
"""Estado para upload de arquivos"""
file_path: str
success: bool
message: str
engine: Any
sql_agent: SQLAgentManager
cache_manager: Any
class ResetState(TypedDict):
"""Estado para reset do sistema"""
success: bool
message: str
engine: Any
sql_agent: SQLAgentManager
cache_manager: Any
async def handle_csv_upload_node(state: FileUploadState) -> FileUploadState:
"""
Nó para processar upload de CSV
Args:
state: Estado do upload
Returns:
Estado atualizado
"""
try:
file_path = state["file_path"]
# Etapa 1: Processa CSV usando nova arquitetura
csv_state = {
"file_path": file_path,
"success": False,
"message": "",
"csv_data_sample": {},
"column_info": {},
"processing_stats": {}
}
csv_result = await csv_processing_node(csv_state)
if not csv_result["success"]:
raise Exception(csv_result["message"])
# Etapa 2: Cria banco de dados
db_result = await create_database_from_dataframe_node(csv_result)
if not db_result["success"]:
raise Exception(db_result["message"])
# Recupera objetos criados
from utils.object_manager import get_object_manager
obj_manager = get_object_manager()
engine = obj_manager.get_engine(db_result["engine_id"])
db = obj_manager.get_object(db_result["db_id"])
logging.info("[UPLOAD] Novo banco carregado e DB atualizado usando nova arquitetura.")
# Recria agente SQL
sql_agent = SQLAgentManager(db)
# Limpa cache
state["cache_manager"].clear_cache()
# Atualiza estado
state["engine"] = engine
state["sql_agent"] = sql_agent
state["success"] = True
state["message"] = "✅ CSV carregado com sucesso!"
logging.info("[UPLOAD] Novo banco carregado e agente recriado. Cache limpo.")
except Exception as e:
error_msg = f"❌ Erro ao processar CSV: {e}"
logging.error(f"[ERRO] Falha ao processar novo CSV: {e}")
state["success"] = False
state["message"] = error_msg
return state
async def reset_system_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para resetar o sistema ao estado inicial
Args:
state: Estado do reset
Returns:
Estado atualizado
"""
try:
from utils.object_manager import get_object_manager
from agents.sql_agent import SQLAgentManager
obj_manager = get_object_manager()
# Remove CSV personalizado se existir
if os.path.exists(UPLOADED_CSV_PATH):
os.remove(UPLOADED_CSV_PATH)
logging.info("[RESET] CSV personalizado removido.")
# Recria banco com CSV padrão usando nova arquitetura
csv_state = {
"file_path": DEFAULT_CSV_PATH,
"success": False,
"message": "",
"csv_data_sample": {},
"column_info": {},
"processing_stats": {}
}
csv_result = await csv_processing_node(csv_state)
if not csv_result["success"]:
raise Exception(csv_result["message"])
# Cria banco de dados
db_result = await create_database_from_dataframe_node(csv_result)
if not db_result["success"]:
raise Exception(db_result["message"])
# Recupera objetos criados
engine = obj_manager.get_engine(db_result["engine_id"])
db = obj_manager.get_object(db_result["db_id"])
# Recria agente SQL (modo padrão multi-tabela)
sql_agent = SQLAgentManager(db, single_table_mode=False, selected_table=None)
# Atualiza objetos no gerenciador
engine_id = obj_manager.store_engine(engine)
agent_id = obj_manager.store_sql_agent(sql_agent)
# Limpa cache se disponível
cache_id = state.get("cache_id")
if cache_id:
cache_manager = obj_manager.get_cache_manager(cache_id)
if cache_manager:
cache_manager.clear_cache()
# Atualiza estado
state.update({
"engine_id": engine_id,
"agent_id": agent_id,
"success": True,
"message": "🔄 Sistema resetado para o estado inicial."
})
logging.info("[RESET] Sistema resetado com sucesso.")
except Exception as e:
error_msg = f"❌ Erro ao resetar: {e}"
logging.error(f"[ERRO] Falha ao resetar sistema: {e}")
state.update({
"success": False,
"message": error_msg
})
return state
async def validate_system_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para validar o estado do sistema
Args:
state: Estado atual do sistema
Returns:
Estado com informações de validação
"""
validation_results = {
"database_valid": False,
"agent_valid": False,
"cache_valid": False,
"overall_valid": False
}
try:
# Valida banco de dados
if state.get("engine"):
from utils.database import validate_database
validation_results["database_valid"] = validate_database(state["engine"])
# Valida agente SQL
if state.get("sql_agent"):
validation_results["agent_valid"] = state["sql_agent"].validate_agent()
# Valida cache
if state.get("cache_manager"):
validation_results["cache_valid"] = True # Cache sempre válido se existe
# Validação geral
validation_results["overall_valid"] = all([
validation_results["database_valid"],
validation_results["agent_valid"],
validation_results["cache_valid"]
])
state["validation"] = validation_results
logging.info(f"[VALIDATION] Sistema válido: {validation_results['overall_valid']}")
except Exception as e:
logging.error(f"[VALIDATION] Erro na validação: {e}")
state["validation"] = validation_results
return state
async def get_system_info_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para obter informações do sistema
Args:
state: Estado atual do sistema
Returns:
Estado com informações do sistema
"""
system_info = {
"csv_active": None,
"database_path": SQL_DB_PATH,
"agent_info": None,
"cache_stats": None
}
try:
# Informações do CSV ativo
from utils.config import get_active_csv_path
system_info["csv_active"] = get_active_csv_path()
# Informações do agente
if state.get("sql_agent"):
system_info["agent_info"] = state["sql_agent"].get_agent_info()
# Estatísticas do cache
if state.get("cache_manager"):
cache_manager = state["cache_manager"]
system_info["cache_stats"] = {
"cached_queries": len(cache_manager.query_cache),
"history_entries": len(cache_manager.history_log),
"recent_history_size": len(cache_manager.recent_history)
}
state["system_info"] = system_info
logging.info("[SYSTEM_INFO] Informações do sistema coletadas")
except Exception as e:
logging.error(f"[SYSTEM_INFO] Erro ao coletar informações: {e}")
state["system_info"] = system_info
return state
class CustomNodeManager:
"""
Gerenciador dos nós personalizados
"""
def __init__(self):
self.node_functions = {
"csv_upload": handle_csv_upload_node,
"system_reset": reset_system_node,
"system_validation": validate_system_node,
"system_info": get_system_info_node
}
def get_node_function(self, node_name: str):
"""Retorna função do nó pelo nome"""
return self.node_functions.get(node_name)
async def execute_node(self, node_name: str, state: Dict[str, Any]) -> Dict[str, Any]:
"""
Executa um nó específico
Args:
node_name: Nome do nó
state: Estado atual
Returns:
Estado atualizado
"""
node_function = self.get_node_function(node_name)
if node_function:
return await node_function(state)
else:
logging.error(f"Nó não encontrado: {node_name}")
return state