|
|
""" |
|
|
Nós personalizados para funcionalidades específicas |
|
|
""" |
|
|
import os |
|
|
import shutil |
|
|
import logging |
|
|
from typing import Dict, Any, TypedDict |
|
|
|
|
|
from utils.database import create_sql_database |
|
|
from utils.config import UPLOADED_CSV_PATH, SQL_DB_PATH, DEFAULT_CSV_PATH |
|
|
from agents.sql_agent import SQLAgentManager |
|
|
from nodes.csv_processing_node import csv_processing_node |
|
|
from nodes.database_node import create_database_from_dataframe_node, load_existing_database_node |
|
|
|
|
|
class FileUploadState(TypedDict): |
|
|
"""Estado para upload de arquivos""" |
|
|
file_path: str |
|
|
success: bool |
|
|
message: str |
|
|
engine: Any |
|
|
sql_agent: SQLAgentManager |
|
|
cache_manager: Any |
|
|
|
|
|
class ResetState(TypedDict): |
|
|
"""Estado para reset do sistema""" |
|
|
success: bool |
|
|
message: str |
|
|
engine: Any |
|
|
sql_agent: SQLAgentManager |
|
|
cache_manager: Any |
|
|
|
|
|
async def handle_csv_upload_node(state: FileUploadState) -> FileUploadState: |
|
|
""" |
|
|
Nó para processar upload de CSV |
|
|
|
|
|
Args: |
|
|
state: Estado do upload |
|
|
|
|
|
Returns: |
|
|
Estado atualizado |
|
|
""" |
|
|
try: |
|
|
file_path = state["file_path"] |
|
|
|
|
|
|
|
|
csv_state = { |
|
|
"file_path": file_path, |
|
|
"success": False, |
|
|
"message": "", |
|
|
"csv_data_sample": {}, |
|
|
"column_info": {}, |
|
|
"processing_stats": {} |
|
|
} |
|
|
|
|
|
csv_result = await csv_processing_node(csv_state) |
|
|
if not csv_result["success"]: |
|
|
raise Exception(csv_result["message"]) |
|
|
|
|
|
|
|
|
db_result = await create_database_from_dataframe_node(csv_result) |
|
|
if not db_result["success"]: |
|
|
raise Exception(db_result["message"]) |
|
|
|
|
|
|
|
|
from utils.object_manager import get_object_manager |
|
|
obj_manager = get_object_manager() |
|
|
|
|
|
engine = obj_manager.get_engine(db_result["engine_id"]) |
|
|
db = obj_manager.get_object(db_result["db_id"]) |
|
|
|
|
|
logging.info("[UPLOAD] Novo banco carregado e DB atualizado usando nova arquitetura.") |
|
|
|
|
|
|
|
|
sql_agent = SQLAgentManager(db) |
|
|
|
|
|
|
|
|
state["cache_manager"].clear_cache() |
|
|
|
|
|
|
|
|
state["engine"] = engine |
|
|
state["sql_agent"] = sql_agent |
|
|
state["success"] = True |
|
|
state["message"] = "✅ CSV carregado com sucesso!" |
|
|
|
|
|
logging.info("[UPLOAD] Novo banco carregado e agente recriado. Cache limpo.") |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"❌ Erro ao processar CSV: {e}" |
|
|
logging.error(f"[ERRO] Falha ao processar novo CSV: {e}") |
|
|
state["success"] = False |
|
|
state["message"] = error_msg |
|
|
|
|
|
return state |
|
|
|
|
|
async def reset_system_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó para resetar o sistema ao estado inicial |
|
|
|
|
|
Args: |
|
|
state: Estado do reset |
|
|
|
|
|
Returns: |
|
|
Estado atualizado |
|
|
""" |
|
|
try: |
|
|
from utils.object_manager import get_object_manager |
|
|
from agents.sql_agent import SQLAgentManager |
|
|
|
|
|
obj_manager = get_object_manager() |
|
|
|
|
|
|
|
|
if os.path.exists(UPLOADED_CSV_PATH): |
|
|
os.remove(UPLOADED_CSV_PATH) |
|
|
logging.info("[RESET] CSV personalizado removido.") |
|
|
|
|
|
|
|
|
csv_state = { |
|
|
"file_path": DEFAULT_CSV_PATH, |
|
|
"success": False, |
|
|
"message": "", |
|
|
"csv_data_sample": {}, |
|
|
"column_info": {}, |
|
|
"processing_stats": {} |
|
|
} |
|
|
|
|
|
csv_result = await csv_processing_node(csv_state) |
|
|
if not csv_result["success"]: |
|
|
raise Exception(csv_result["message"]) |
|
|
|
|
|
|
|
|
db_result = await create_database_from_dataframe_node(csv_result) |
|
|
if not db_result["success"]: |
|
|
raise Exception(db_result["message"]) |
|
|
|
|
|
|
|
|
engine = obj_manager.get_engine(db_result["engine_id"]) |
|
|
db = obj_manager.get_object(db_result["db_id"]) |
|
|
|
|
|
|
|
|
sql_agent = SQLAgentManager(db, single_table_mode=False, selected_table=None) |
|
|
|
|
|
|
|
|
engine_id = obj_manager.store_engine(engine) |
|
|
agent_id = obj_manager.store_sql_agent(sql_agent) |
|
|
|
|
|
|
|
|
cache_id = state.get("cache_id") |
|
|
if cache_id: |
|
|
cache_manager = obj_manager.get_cache_manager(cache_id) |
|
|
if cache_manager: |
|
|
cache_manager.clear_cache() |
|
|
|
|
|
|
|
|
state.update({ |
|
|
"engine_id": engine_id, |
|
|
"agent_id": agent_id, |
|
|
"success": True, |
|
|
"message": "🔄 Sistema resetado para o estado inicial." |
|
|
}) |
|
|
|
|
|
logging.info("[RESET] Sistema resetado com sucesso.") |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"❌ Erro ao resetar: {e}" |
|
|
logging.error(f"[ERRO] Falha ao resetar sistema: {e}") |
|
|
state.update({ |
|
|
"success": False, |
|
|
"message": error_msg |
|
|
}) |
|
|
|
|
|
return state |
|
|
|
|
|
async def validate_system_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó para validar o estado do sistema |
|
|
|
|
|
Args: |
|
|
state: Estado atual do sistema |
|
|
|
|
|
Returns: |
|
|
Estado com informações de validação |
|
|
""" |
|
|
validation_results = { |
|
|
"database_valid": False, |
|
|
"agent_valid": False, |
|
|
"cache_valid": False, |
|
|
"overall_valid": False |
|
|
} |
|
|
|
|
|
try: |
|
|
|
|
|
if state.get("engine"): |
|
|
from utils.database import validate_database |
|
|
validation_results["database_valid"] = validate_database(state["engine"]) |
|
|
|
|
|
|
|
|
if state.get("sql_agent"): |
|
|
validation_results["agent_valid"] = state["sql_agent"].validate_agent() |
|
|
|
|
|
|
|
|
if state.get("cache_manager"): |
|
|
validation_results["cache_valid"] = True |
|
|
|
|
|
|
|
|
validation_results["overall_valid"] = all([ |
|
|
validation_results["database_valid"], |
|
|
validation_results["agent_valid"], |
|
|
validation_results["cache_valid"] |
|
|
]) |
|
|
|
|
|
state["validation"] = validation_results |
|
|
logging.info(f"[VALIDATION] Sistema válido: {validation_results['overall_valid']}") |
|
|
|
|
|
except Exception as e: |
|
|
logging.error(f"[VALIDATION] Erro na validação: {e}") |
|
|
state["validation"] = validation_results |
|
|
|
|
|
return state |
|
|
|
|
|
async def get_system_info_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Nó para obter informações do sistema |
|
|
|
|
|
Args: |
|
|
state: Estado atual do sistema |
|
|
|
|
|
Returns: |
|
|
Estado com informações do sistema |
|
|
""" |
|
|
system_info = { |
|
|
"csv_active": None, |
|
|
"database_path": SQL_DB_PATH, |
|
|
"agent_info": None, |
|
|
"cache_stats": None |
|
|
} |
|
|
|
|
|
try: |
|
|
|
|
|
from utils.config import get_active_csv_path |
|
|
system_info["csv_active"] = get_active_csv_path() |
|
|
|
|
|
|
|
|
if state.get("sql_agent"): |
|
|
system_info["agent_info"] = state["sql_agent"].get_agent_info() |
|
|
|
|
|
|
|
|
if state.get("cache_manager"): |
|
|
cache_manager = state["cache_manager"] |
|
|
system_info["cache_stats"] = { |
|
|
"cached_queries": len(cache_manager.query_cache), |
|
|
"history_entries": len(cache_manager.history_log), |
|
|
"recent_history_size": len(cache_manager.recent_history) |
|
|
} |
|
|
|
|
|
state["system_info"] = system_info |
|
|
logging.info("[SYSTEM_INFO] Informações do sistema coletadas") |
|
|
|
|
|
except Exception as e: |
|
|
logging.error(f"[SYSTEM_INFO] Erro ao coletar informações: {e}") |
|
|
state["system_info"] = system_info |
|
|
|
|
|
return state |
|
|
|
|
|
class CustomNodeManager: |
|
|
""" |
|
|
Gerenciador dos nós personalizados |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
self.node_functions = { |
|
|
"csv_upload": handle_csv_upload_node, |
|
|
"system_reset": reset_system_node, |
|
|
"system_validation": validate_system_node, |
|
|
"system_info": get_system_info_node |
|
|
} |
|
|
|
|
|
def get_node_function(self, node_name: str): |
|
|
"""Retorna função do nó pelo nome""" |
|
|
return self.node_functions.get(node_name) |
|
|
|
|
|
async def execute_node(self, node_name: str, state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Executa um nó específico |
|
|
|
|
|
Args: |
|
|
node_name: Nome do nó |
|
|
state: Estado atual |
|
|
|
|
|
Returns: |
|
|
Estado atualizado |
|
|
""" |
|
|
node_function = self.get_node_function(node_name) |
|
|
if node_function: |
|
|
return await node_function(state) |
|
|
else: |
|
|
logging.error(f"Nó não encontrado: {node_name}") |
|
|
return state |
|
|
|