Spaces:
Sleeping
Sleeping
File size: 16,019 Bytes
e982206 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 |
"""
Criação e configuração do agente SQL
"""
import logging
import time
import asyncio
from typing import Optional, Dict, Any, List
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.agent_toolkits import create_sql_agent
from langchain_community.utilities import SQLDatabase
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import AgentAction, AgentFinish
from utils.config import (
MAX_ITERATIONS,
TEMPERATURE,
AVAILABLE_MODELS,
OPENAI_MODELS,
ANTHROPIC_MODELS,
GOOGLE_MODELS
)
class SQLQueryCaptureHandler(BaseCallbackHandler):
"""
Handler para capturar queries SQL executadas pelo agente
"""
def __init__(self):
super().__init__()
self.sql_queries: List[str] = []
self.agent_actions: List[Dict[str, Any]] = []
self.step_count = 0
def on_agent_action(self, action: AgentAction, **kwargs) -> None:
"""
Captura ações do agente, especialmente queries SQL
Args:
action: Ação do agente
"""
try:
self.step_count += 1
tool_name = action.tool
tool_input = action.tool_input
# Capturar SQL especificamente (sem log de cada passo)
if tool_name == 'sql_db_query' and isinstance(tool_input, dict):
sql_query = tool_input.get('query', '')
if sql_query and sql_query.strip():
clean_query = sql_query.strip()
self.sql_queries.append(clean_query)
# Log apenas uma vez com query completa
logging.info(f"[SQL_HANDLER] 🔍 Query SQL capturada:\n{clean_query}")
# Armazenar todas as ações para debug
self.agent_actions.append({
"step": self.step_count,
"tool": tool_name,
"input": tool_input,
"timestamp": time.time()
})
except Exception as e:
logging.error(f"[SQL_HANDLER] Erro ao capturar ação: {e}")
def get_last_sql_query(self) -> Optional[str]:
"""
Retorna a última query SQL capturada
Returns:
Última query SQL ou None se não houver
"""
return self.sql_queries[-1] if self.sql_queries else None
def get_all_sql_queries(self) -> List[str]:
"""
Retorna todas as queries SQL capturadas
Returns:
Lista de queries SQL
"""
return self.sql_queries.copy()
def reset(self):
"""Reseta o handler para nova execução"""
self.sql_queries.clear()
self.agent_actions.clear()
self.step_count = 0
async def retry_with_backoff(func, max_retries=3, base_delay=1.0):
"""
Executa função com retry e backoff exponencial para lidar com rate limiting
Args:
func: Função a ser executada
max_retries: Número máximo de tentativas
base_delay: Delay base em segundos
Returns:
Resultado da função ou levanta exceção após esgotar tentativas
"""
for attempt in range(max_retries + 1):
try:
return func()
except Exception as e:
error_str = str(e)
# Verifica se é erro de rate limiting ou overload
if any(keyword in error_str.lower() for keyword in ['overloaded', 'rate_limit', 'too_many_requests', 'quota']):
if attempt < max_retries:
delay = base_delay * (2 ** attempt) # Backoff exponencial
logging.warning(f"API sobrecarregada (tentativa {attempt + 1}/{max_retries + 1}). Aguardando {delay}s...")
await asyncio.sleep(delay)
continue
else:
logging.error(f"API continua sobrecarregada após {max_retries + 1} tentativas")
raise Exception(f"API da Anthropic sobrecarregada. Tente novamente em alguns minutos. Erro original: {e}")
else:
# Se não é erro de rate limiting, levanta imediatamente
raise e
# Não deveria chegar aqui, mas por segurança
raise Exception("Número máximo de tentativas excedido")
def create_sql_agent_executor(db: SQLDatabase, model_name: str = "gpt-4o-mini", single_table_mode: bool = False, selected_table: str = None, top_k: int = 10):
"""
Cria um agente SQL usando LangChain com suporte a diferentes provedores
Args:
db: Objeto SQLDatabase do LangChain
model_name: Nome do modelo a usar (OpenAI, Anthropic)
single_table_mode: Se deve restringir a uma única tabela
selected_table: Tabela específica para modo único
top_k: Número máximo de resultados (LIMIT) para queries SQL
Returns:
Agente SQL configurado
"""
try:
# Se modo tabela única, cria SQLDatabase restrito
if single_table_mode and selected_table:
# Cria uma nova instância do SQLDatabase restrita à tabela selecionada
restricted_db = SQLDatabase.from_uri(
db._engine.url,
include_tables=[selected_table]
)
logging.info(f"[SQL_AGENT] Criando agente em modo tabela única: {selected_table}")
db_to_use = restricted_db
else:
# Usa o SQLDatabase original (modo multi-tabela)
logging.info("[SQL_AGENT] Criando agente em modo multi-tabela")
db_to_use = db
# Obtém o ID real do modelo
model_id = AVAILABLE_MODELS.get(model_name, model_name)
# Cria o modelo LLM baseado no provedor
if model_id in OPENAI_MODELS:
# Configurações específicas para modelos OpenAI
if model_id == "o3-mini":
# o3-mini não suporta temperature
llm = ChatOpenAI(model=model_id)
else:
# GPT-4o e GPT-4o-mini suportam temperature
llm = ChatOpenAI(model=model_id, temperature=TEMPERATURE)
agent_type = "openai-tools"
elif model_id in ANTHROPIC_MODELS:
# Claude com tool-calling e configurações para rate limiting
llm = ChatAnthropic(
model=model_id,
temperature=TEMPERATURE,
max_tokens=4096,
max_retries=2, # Retry interno do cliente
timeout=60.0 # Timeout mais longo
)
agent_type = "tool-calling" # Claude usa tool-calling
elif model_id in GOOGLE_MODELS:
# Gemini com tool-calling e configurações otimizadas
llm = ChatGoogleGenerativeAI(
model=model_id,
temperature=TEMPERATURE,
max_tokens=4096,
max_retries=2,
timeout=60.0
)
agent_type = "tool-calling" # Gemini usa tool-calling
else:
# Fallback para OpenAI
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=TEMPERATURE
)
agent_type = "openai-tools"
logging.warning(f"Modelo {model_name} não reconhecido, usando gpt-4o-mini como fallback")
# Cria o agente SQL
sql_agent = create_sql_agent(
llm=llm,
db=db_to_use, # Usa o SQLDatabase apropriado (restrito ou completo)
agent_type=agent_type,
verbose=True,
max_iterations=MAX_ITERATIONS,
return_intermediate_steps=True,
top_k=top_k # Usa o valor dinâmico configurado pelo usuário
)
logging.info(f"Agente SQL criado com sucesso usando modelo {model_name} ({model_id}) com agent_type={agent_type}")
return sql_agent
except Exception as e:
logging.error(f"Erro ao criar agente SQL: {e}")
raise
class SQLAgentManager:
"""
Gerenciador do agente SQL com funcionalidades avançadas
"""
def __init__(self, db: SQLDatabase, model_name: str = "gpt-4o-mini", single_table_mode: bool = False, selected_table: str = None, top_k: int = 10):
self.db = db
self.model_name = model_name
self.single_table_mode = single_table_mode
self.selected_table = selected_table
self.top_k = top_k
self.agent = None
self._initialize_agent()
def _initialize_agent(self):
"""Inicializa o agente SQL"""
self.agent = create_sql_agent_executor(self.db, self.model_name, self.single_table_mode, self.selected_table, self.top_k)
def recreate_agent(self, new_db: SQLDatabase = None, new_model: str = None, single_table_mode: bool = None, selected_table: str = None, top_k: int = None):
"""
Recria o agente com novos parâmetros
Args:
new_db: Novo banco de dados (opcional)
new_model: Novo modelo (opcional)
single_table_mode: Novo modo de tabela (opcional)
selected_table: Nova tabela selecionada (opcional)
top_k: Novo valor de TOP_K para LIMIT (opcional)
"""
if new_db:
self.db = new_db
if new_model:
self.model_name = new_model
if single_table_mode is not None:
self.single_table_mode = single_table_mode
if selected_table is not None:
self.selected_table = selected_table
if top_k is not None:
self.top_k = top_k
self._initialize_agent()
mode_info = f"modo {'tabela única' if self.single_table_mode else 'multi-tabela'}"
logging.info(f"Agente SQL recriado com modelo {self.model_name} em {mode_info}, TOP_K={self.top_k}")
def _extract_text_from_claude_response(self, output) -> str:
"""
Extrai texto limpo da resposta do Claude que pode vir em formato complexo
Args:
output: Resposta do agente (pode ser string, lista ou dict)
Returns:
String limpa com o texto da resposta
"""
try:
# Se já é string, retorna diretamente
if isinstance(output, str):
return output
# Se é lista, procura por dicionários com 'text'
if isinstance(output, list):
text_parts = []
for item in output:
if isinstance(item, dict) and 'text' in item:
text_parts.append(item['text'])
elif isinstance(item, str):
text_parts.append(item)
if text_parts:
return '\n'.join(text_parts)
# Se é dict, procura por 'text' ou converte para string
if isinstance(output, dict):
if 'text' in output:
return output['text']
elif 'content' in output:
return str(output['content'])
# Fallback: converte para string
return str(output)
except Exception as e:
logging.warning(f"Erro ao extrair texto da resposta: {e}")
return str(output)
async def execute_query(self, instruction: str) -> dict:
"""
Executa uma query através do agente SQL com retry para rate limiting
Args:
instruction: Instrução para o agente
Returns:
Resultado da execução
"""
try:
logging.info("------- Agent SQL: Executando query -------")
# Criar handler para capturar SQL
sql_handler = SQLQueryCaptureHandler()
# Verifica se é agente Claude ou Gemini para aplicar retry
model_id = getattr(self, 'model_name', '')
is_claude = any(claude_model in model_id for claude_model in ANTHROPIC_MODELS)
is_gemini = any(gemini_model in model_id for gemini_model in GOOGLE_MODELS)
if is_claude or is_gemini:
# Usa retry com backoff para Claude e Gemini
response = await retry_with_backoff(
lambda: self.agent.invoke(
{"input": instruction},
{"callbacks": [sql_handler]}
),
max_retries=3,
base_delay=2.0
)
else:
# Execução normal para outros modelos
response = self.agent.invoke(
{"input": instruction},
{"callbacks": [sql_handler]}
)
# Extrai e limpa a resposta
raw_output = response.get("output", "Erro ao obter a resposta do agente.")
clean_output = self._extract_text_from_claude_response(raw_output)
# Captura a última query SQL executada
sql_query = sql_handler.get_last_sql_query()
result = {
"output": clean_output,
"intermediate_steps": response.get("intermediate_steps", []),
"success": True,
"sql_query": sql_query, # ← Query SQL capturada
"all_sql_queries": sql_handler.get_all_sql_queries()
}
logging.info(f"Query executada com sucesso: {result['output'][:100]}...")
return result
except Exception as e:
error_str = str(e)
# Mensagem mais amigável para problemas de rate limiting
if any(keyword in error_str.lower() for keyword in ['overloaded', 'rate_limit', 'too_many_requests', 'quota']):
error_msg = (
"🚫 **API da Anthropic temporariamente sobrecarregada**\n\n"
"A API do Claude está com muitas solicitações no momento. "
"Por favor, aguarde alguns minutos e tente novamente.\n\n"
"**Sugestões:**\n"
"- Aguarde 2-3 minutos antes de tentar novamente\n"
"- Considere usar um modelo OpenAI temporariamente\n"
"- Tente novamente em horários de menor movimento\n\n"
f"*Erro técnico: {e}*"
)
else:
error_msg = f"Erro ao consultar o agente SQL: {e}"
logging.error(error_msg)
return {
"output": error_msg,
"intermediate_steps": [],
"success": False
}
def get_agent_info(self) -> dict:
"""
Retorna informações sobre o agente atual
Returns:
Dicionário com informações do agente
"""
return {
"model_name": self.model_name,
"max_iterations": MAX_ITERATIONS,
"temperature": TEMPERATURE,
"database_tables": self.db.get_usable_table_names() if self.db else [],
"agent_type": "openai-tools"
}
def validate_agent(self) -> bool:
"""
Valida se o agente está funcionando corretamente
Returns:
True se válido, False caso contrário
"""
try:
# Testa com uma query simples
test_result = self.agent.invoke({
"input": "Quantas linhas existem na tabela?"
})
success = "output" in test_result and test_result["output"]
logging.info(f"Validação do agente: {'Sucesso' if success else 'Falha'}")
return success
except Exception as e:
logging.error(f"Erro na validação do agente: {e}")
return False
def get_default_sql_agent(db: SQLDatabase) -> SQLAgentManager:
"""
Cria um agente SQL com configurações padrão
Args:
db: Objeto SQLDatabase
Returns:
SQLAgentManager configurado
"""
return SQLAgentManager(db)
|