File size: 8,012 Bytes
89293f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
"""
Nó para processamento de consultas SQL
"""
import time
import logging
import pandas as pd
from typing import Dict, Any, TypedDict

from agents.tools import is_greeting, detect_query_type, prepare_sql_context
from agents.sql_agent import SQLAgentManager
from utils.object_manager import get_object_manager

class QueryState(TypedDict):
    """Estado para processamento de consultas"""
    user_input: str
    selected_model: str
    response: str
    execution_time: float
    error: str
    intermediate_steps: list
    llama_instruction: str
    sql_result: dict

async def process_user_query_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Nó principal para processar consulta do usuário
    
    Args:
        state: Estado atual com entrada do usuário
        
    Returns:
        Estado atualizado com resposta processada
    """
    start_time = time.time()
    user_input = state["user_input"]
    selected_model = state["selected_model"]
    
    logging.info(f"[QUERY] Processando: {user_input[:50]}...")
    
    try:
        # Verifica se é saudação
        if is_greeting(user_input):
            greeting_response = "Olá! Estou aqui para ajudar com suas consultas. Pergunte algo relacionado aos dados carregados no agente!"
            state.update({
                "response": greeting_response,
                "execution_time": time.time() - start_time,
                "error": None
            })
            return state
        
        # Recupera objetos necessários
        obj_manager = get_object_manager()
        
        # Recupera cache manager
        cache_id = state.get("cache_id")
        cache_manager = obj_manager.get_cache_manager(cache_id) if cache_id else None
        
        # Verifica cache se disponível
        if cache_manager:
            cached_response = cache_manager.get_cached_response(user_input)
            if cached_response:
                logging.info(f"[CACHE] Retornando resposta do cache")
                state.update({
                    "response": cached_response,
                    "execution_time": time.time() - start_time,
                    "error": None
                })
                return state
        
        # Converte amostra do banco para DataFrame
        db_sample_dict = state.get("db_sample_dict", {})
        if not db_sample_dict:
            raise ValueError("Amostra do banco não disponível")
        
        # Reconstrói DataFrame da amostra
        db_sample = pd.DataFrame(db_sample_dict.get("data", []))
        if db_sample.empty:
            raise ValueError("Dados de amostra vazios")
        
        # Detecta tipo de query e prepara contexto
        query_type = detect_query_type(user_input)
        state["query_type"] = query_type

        if query_type in ['sql_query', 'sql_query_graphic']:
            # Prepara contexto para envio direto ao agentSQL
            sql_context = prepare_sql_context(user_input, db_sample)
            state["sql_context"] = sql_context

            logging.info(f"[DEBUG] Tipo de query detectado: {query_type}")
            logging.info(f"[DEBUG] Contexto preparado para agentSQL:\n{sql_context}\n")
        else:
            # Para tipos futuros (prediction)
            error_msg = f"Tipo de query '{query_type}' ainda não implementado."
            state.update({
                "error": error_msg,
                "response": error_msg,
                "execution_time": time.time() - start_time
            })
            return state
        
        # Recupera agente SQL
        agent_id = state.get("agent_id")
        if not agent_id:
            raise ValueError("ID do agente SQL não encontrado")
        
        sql_agent = obj_manager.get_sql_agent(agent_id)
        if not sql_agent:
            raise ValueError("Agente SQL não encontrado")
        
        # Executa query no agente SQL com contexto direto
        sql_result = await sql_agent.execute_query(state["sql_context"])
        
        if not sql_result["success"]:
            state.update({
                "error": sql_result["output"],
                "response": sql_result["output"],
                "sql_result": sql_result
            })
        else:
            # Captura query SQL do resultado do agente
            sql_query_captured = sql_result.get("sql_query")

            state.update({
                "response": sql_result["output"],
                "intermediate_steps": sql_result["intermediate_steps"],
                "sql_result": sql_result,
                "sql_query_extracted": sql_query_captured,  # ← Query SQL capturada
                "error": None
            })

            # Log apenas se não foi capturada (caso de erro)
            if not sql_query_captured:
                logging.warning("[QUERY] ⚠️ Nenhuma query SQL foi capturada pelo handler")
        
        # Armazena no cache se disponível
        if cache_manager and sql_result["success"]:
            cache_manager.cache_response(user_input, state["response"])
        
        state["execution_time"] = time.time() - start_time
        logging.info(f"[QUERY] Concluído em {state['execution_time']:.2f}s")
        
    except Exception as e:
        error_msg = f"Erro ao processar query: {e}"
        logging.error(f"[QUERY] {error_msg}")
        state.update({
            "error": error_msg,
            "response": error_msg,
            "execution_time": time.time() - start_time
        })
    
    return state

async def validate_query_input_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Nó para validar entrada da consulta
    
    Args:
        state: Estado com entrada do usuário
        
    Returns:
        Estado atualizado com validação
    """
    user_input = state.get("user_input", "").strip()
    
    if not user_input:
        state.update({
            "error": "Entrada vazia",
            "response": "Por favor, digite uma pergunta.",
            "execution_time": 0.0
        })
        return state
    
    if len(user_input) > 1000:
        state.update({
            "error": "Entrada muito longa",
            "response": "Pergunta muito longa. Por favor, seja mais conciso.",
            "execution_time": 0.0
        })
        return state
    
    # Validação passou
    state["error"] = None
    logging.info(f"[VALIDATION] Entrada validada: {len(user_input)} caracteres")
    
    return state

async def prepare_query_context_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Nó para preparar contexto da consulta
    
    Args:
        state: Estado atual
        
    Returns:
        Estado com contexto preparado
    """
    try:
        # Verifica se todos os componentes necessários estão disponíveis
        required_ids = ["agent_id", "engine_id", "cache_id"]
        missing_ids = [id_name for id_name in required_ids if not state.get(id_name)]
        
        if missing_ids:
            raise ValueError(f"IDs necessários não encontrados: {missing_ids}")
        
        obj_manager = get_object_manager()
        
        # Verifica se objetos existem
        for id_name in required_ids:
            obj_id = state[id_name]
            if id_name == "agent_id":
                obj = obj_manager.get_sql_agent(obj_id)
            elif id_name == "engine_id":
                obj = obj_manager.get_engine(obj_id)
            elif id_name == "cache_id":
                obj = obj_manager.get_cache_manager(obj_id)
            
            if obj is None:
                raise ValueError(f"Objeto não encontrado para {id_name}: {obj_id}")
        
        # Contexto preparado com sucesso
        state["context_ready"] = True
        logging.info("[CONTEXT] Contexto da consulta preparado")
        
    except Exception as e:
        error_msg = f"Erro ao preparar contexto: {e}"
        logging.error(f"[CONTEXT] {error_msg}")
        state.update({
            "error": error_msg,
            "context_ready": False
        })
    
    return state