File size: 8,012 Bytes
89293f9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
"""
Nó para processamento de consultas SQL
"""
import time
import logging
import pandas as pd
from typing import Dict, Any, TypedDict
from agents.tools import is_greeting, detect_query_type, prepare_sql_context
from agents.sql_agent import SQLAgentManager
from utils.object_manager import get_object_manager
class QueryState(TypedDict):
"""Estado para processamento de consultas"""
user_input: str
selected_model: str
response: str
execution_time: float
error: str
intermediate_steps: list
llama_instruction: str
sql_result: dict
async def process_user_query_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó principal para processar consulta do usuário
Args:
state: Estado atual com entrada do usuário
Returns:
Estado atualizado com resposta processada
"""
start_time = time.time()
user_input = state["user_input"]
selected_model = state["selected_model"]
logging.info(f"[QUERY] Processando: {user_input[:50]}...")
try:
# Verifica se é saudação
if is_greeting(user_input):
greeting_response = "Olá! Estou aqui para ajudar com suas consultas. Pergunte algo relacionado aos dados carregados no agente!"
state.update({
"response": greeting_response,
"execution_time": time.time() - start_time,
"error": None
})
return state
# Recupera objetos necessários
obj_manager = get_object_manager()
# Recupera cache manager
cache_id = state.get("cache_id")
cache_manager = obj_manager.get_cache_manager(cache_id) if cache_id else None
# Verifica cache se disponível
if cache_manager:
cached_response = cache_manager.get_cached_response(user_input)
if cached_response:
logging.info(f"[CACHE] Retornando resposta do cache")
state.update({
"response": cached_response,
"execution_time": time.time() - start_time,
"error": None
})
return state
# Converte amostra do banco para DataFrame
db_sample_dict = state.get("db_sample_dict", {})
if not db_sample_dict:
raise ValueError("Amostra do banco não disponível")
# Reconstrói DataFrame da amostra
db_sample = pd.DataFrame(db_sample_dict.get("data", []))
if db_sample.empty:
raise ValueError("Dados de amostra vazios")
# Detecta tipo de query e prepara contexto
query_type = detect_query_type(user_input)
state["query_type"] = query_type
if query_type in ['sql_query', 'sql_query_graphic']:
# Prepara contexto para envio direto ao agentSQL
sql_context = prepare_sql_context(user_input, db_sample)
state["sql_context"] = sql_context
logging.info(f"[DEBUG] Tipo de query detectado: {query_type}")
logging.info(f"[DEBUG] Contexto preparado para agentSQL:\n{sql_context}\n")
else:
# Para tipos futuros (prediction)
error_msg = f"Tipo de query '{query_type}' ainda não implementado."
state.update({
"error": error_msg,
"response": error_msg,
"execution_time": time.time() - start_time
})
return state
# Recupera agente SQL
agent_id = state.get("agent_id")
if not agent_id:
raise ValueError("ID do agente SQL não encontrado")
sql_agent = obj_manager.get_sql_agent(agent_id)
if not sql_agent:
raise ValueError("Agente SQL não encontrado")
# Executa query no agente SQL com contexto direto
sql_result = await sql_agent.execute_query(state["sql_context"])
if not sql_result["success"]:
state.update({
"error": sql_result["output"],
"response": sql_result["output"],
"sql_result": sql_result
})
else:
# Captura query SQL do resultado do agente
sql_query_captured = sql_result.get("sql_query")
state.update({
"response": sql_result["output"],
"intermediate_steps": sql_result["intermediate_steps"],
"sql_result": sql_result,
"sql_query_extracted": sql_query_captured, # ← Query SQL capturada
"error": None
})
# Log apenas se não foi capturada (caso de erro)
if not sql_query_captured:
logging.warning("[QUERY] ⚠️ Nenhuma query SQL foi capturada pelo handler")
# Armazena no cache se disponível
if cache_manager and sql_result["success"]:
cache_manager.cache_response(user_input, state["response"])
state["execution_time"] = time.time() - start_time
logging.info(f"[QUERY] Concluído em {state['execution_time']:.2f}s")
except Exception as e:
error_msg = f"Erro ao processar query: {e}"
logging.error(f"[QUERY] {error_msg}")
state.update({
"error": error_msg,
"response": error_msg,
"execution_time": time.time() - start_time
})
return state
async def validate_query_input_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para validar entrada da consulta
Args:
state: Estado com entrada do usuário
Returns:
Estado atualizado com validação
"""
user_input = state.get("user_input", "").strip()
if not user_input:
state.update({
"error": "Entrada vazia",
"response": "Por favor, digite uma pergunta.",
"execution_time": 0.0
})
return state
if len(user_input) > 1000:
state.update({
"error": "Entrada muito longa",
"response": "Pergunta muito longa. Por favor, seja mais conciso.",
"execution_time": 0.0
})
return state
# Validação passou
state["error"] = None
logging.info(f"[VALIDATION] Entrada validada: {len(user_input)} caracteres")
return state
async def prepare_query_context_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Nó para preparar contexto da consulta
Args:
state: Estado atual
Returns:
Estado com contexto preparado
"""
try:
# Verifica se todos os componentes necessários estão disponíveis
required_ids = ["agent_id", "engine_id", "cache_id"]
missing_ids = [id_name for id_name in required_ids if not state.get(id_name)]
if missing_ids:
raise ValueError(f"IDs necessários não encontrados: {missing_ids}")
obj_manager = get_object_manager()
# Verifica se objetos existem
for id_name in required_ids:
obj_id = state[id_name]
if id_name == "agent_id":
obj = obj_manager.get_sql_agent(obj_id)
elif id_name == "engine_id":
obj = obj_manager.get_engine(obj_id)
elif id_name == "cache_id":
obj = obj_manager.get_cache_manager(obj_id)
if obj is None:
raise ValueError(f"Objeto não encontrado para {id_name}: {obj_id}")
# Contexto preparado com sucesso
state["context_ready"] = True
logging.info("[CONTEXT] Contexto da consulta preparado")
except Exception as e:
error_msg = f"Erro ao preparar contexto: {e}"
logging.error(f"[CONTEXT] {error_msg}")
state.update({
"error": error_msg,
"context_ready": False
})
return state
|