DataGraph / nodes /query_node.py
rwayz's picture
Deploy
89293f9
raw
history blame
8.01 kB
"""
Nó para processamento de consultas SQL
"""
import time
import logging
import pandas as pd
from typing import Dict, Any, TypedDict
from agents.tools import is_greeting, detect_query_type, prepare_sql_context
from agents.sql_agent import SQLAgentManager
from utils.object_manager import get_object_manager
class QueryState(TypedDict):
"""Estado para processamento de consultas"""
user_input: str
selected_model: str
response: str
execution_time: float
error: str
intermediate_steps: list
llama_instruction: str
sql_result: dict
async def process_user_query_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó principal para processar consulta do usuário
Args:
state: Estado atual com entrada do usuário
Returns:
Estado atualizado com resposta processada
"""
start_time = time.time()
user_input = state["user_input"]
selected_model = state["selected_model"]
logging.info(f"[QUERY] Processando: {user_input[:50]}...")
try:
# Verifica se é saudação
if is_greeting(user_input):
greeting_response = "Olá! Estou aqui para ajudar com suas consultas. Pergunte algo relacionado aos dados carregados no agente!"
state.update({
"response": greeting_response,
"execution_time": time.time() - start_time,
"error": None
})
return state
# Recupera objetos necessários
obj_manager = get_object_manager()
# Recupera cache manager
cache_id = state.get("cache_id")
cache_manager = obj_manager.get_cache_manager(cache_id) if cache_id else None
# Verifica cache se disponível
if cache_manager:
cached_response = cache_manager.get_cached_response(user_input)
if cached_response:
logging.info(f"[CACHE] Retornando resposta do cache")
state.update({
"response": cached_response,
"execution_time": time.time() - start_time,
"error": None
})
return state
# Converte amostra do banco para DataFrame
db_sample_dict = state.get("db_sample_dict", {})
if not db_sample_dict:
raise ValueError("Amostra do banco não disponível")
# Reconstrói DataFrame da amostra
db_sample = pd.DataFrame(db_sample_dict.get("data", []))
if db_sample.empty:
raise ValueError("Dados de amostra vazios")
# Detecta tipo de query e prepara contexto
query_type = detect_query_type(user_input)
state["query_type"] = query_type
if query_type in ['sql_query', 'sql_query_graphic']:
# Prepara contexto para envio direto ao agentSQL
sql_context = prepare_sql_context(user_input, db_sample)
state["sql_context"] = sql_context
logging.info(f"[DEBUG] Tipo de query detectado: {query_type}")
logging.info(f"[DEBUG] Contexto preparado para agentSQL:\n{sql_context}\n")
else:
# Para tipos futuros (prediction)
error_msg = f"Tipo de query '{query_type}' ainda não implementado."
state.update({
"error": error_msg,
"response": error_msg,
"execution_time": time.time() - start_time
})
return state
# Recupera agente SQL
agent_id = state.get("agent_id")
if not agent_id:
raise ValueError("ID do agente SQL não encontrado")
sql_agent = obj_manager.get_sql_agent(agent_id)
if not sql_agent:
raise ValueError("Agente SQL não encontrado")
# Executa query no agente SQL com contexto direto
sql_result = await sql_agent.execute_query(state["sql_context"])
if not sql_result["success"]:
state.update({
"error": sql_result["output"],
"response": sql_result["output"],
"sql_result": sql_result
})
else:
# Captura query SQL do resultado do agente
sql_query_captured = sql_result.get("sql_query")
state.update({
"response": sql_result["output"],
"intermediate_steps": sql_result["intermediate_steps"],
"sql_result": sql_result,
"sql_query_extracted": sql_query_captured, # ← Query SQL capturada
"error": None
})
# Log apenas se não foi capturada (caso de erro)
if not sql_query_captured:
logging.warning("[QUERY] ⚠️ Nenhuma query SQL foi capturada pelo handler")
# Armazena no cache se disponível
if cache_manager and sql_result["success"]:
cache_manager.cache_response(user_input, state["response"])
state["execution_time"] = time.time() - start_time
logging.info(f"[QUERY] Concluído em {state['execution_time']:.2f}s")
except Exception as e:
error_msg = f"Erro ao processar query: {e}"
logging.error(f"[QUERY] {error_msg}")
state.update({
"error": error_msg,
"response": error_msg,
"execution_time": time.time() - start_time
})
return state
async def validate_query_input_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para validar entrada da consulta
Args:
state: Estado com entrada do usuário
Returns:
Estado atualizado com validação
"""
user_input = state.get("user_input", "").strip()
if not user_input:
state.update({
"error": "Entrada vazia",
"response": "Por favor, digite uma pergunta.",
"execution_time": 0.0
})
return state
if len(user_input) > 1000:
state.update({
"error": "Entrada muito longa",
"response": "Pergunta muito longa. Por favor, seja mais conciso.",
"execution_time": 0.0
})
return state
# Validação passou
state["error"] = None
logging.info(f"[VALIDATION] Entrada validada: {len(user_input)} caracteres")
return state
async def prepare_query_context_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para preparar contexto da consulta
Args:
state: Estado atual
Returns:
Estado com contexto preparado
"""
try:
# Verifica se todos os componentes necessários estão disponíveis
required_ids = ["agent_id", "engine_id", "cache_id"]
missing_ids = [id_name for id_name in required_ids if not state.get(id_name)]
if missing_ids:
raise ValueError(f"IDs necessários não encontrados: {missing_ids}")
obj_manager = get_object_manager()
# Verifica se objetos existem
for id_name in required_ids:
obj_id = state[id_name]
if id_name == "agent_id":
obj = obj_manager.get_sql_agent(obj_id)
elif id_name == "engine_id":
obj = obj_manager.get_engine(obj_id)
elif id_name == "cache_id":
obj = obj_manager.get_cache_manager(obj_id)
if obj is None:
raise ValueError(f"Objeto não encontrado para {id_name}: {obj_id}")
# Contexto preparado com sucesso
state["context_ready"] = True
logging.info("[CONTEXT] Contexto da consulta preparado")
except Exception as e:
error_msg = f"Erro ao preparar contexto: {e}"
logging.error(f"[CONTEXT] {error_msg}")
state.update({
"error": error_msg,
"context_ready": False
})
return state