agent-seara / agents /sql_agent.py
rwayz's picture
m
e982206
"""
Criação e configuração do agente SQL
"""
import logging
import time
import asyncio
from typing import Optional, Dict, Any, List
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.agent_toolkits import create_sql_agent
from langchain_community.utilities import SQLDatabase
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import AgentAction, AgentFinish
from utils.config import (
MAX_ITERATIONS,
TEMPERATURE,
AVAILABLE_MODELS,
OPENAI_MODELS,
ANTHROPIC_MODELS,
GOOGLE_MODELS
)
class SQLQueryCaptureHandler(BaseCallbackHandler):
"""
Handler para capturar queries SQL executadas pelo agente
"""
def __init__(self):
super().__init__()
self.sql_queries: List[str] = []
self.agent_actions: List[Dict[str, Any]] = []
self.step_count = 0
def on_agent_action(self, action: AgentAction, **kwargs) -> None:
"""
Captura ações do agente, especialmente queries SQL
Args:
action: Ação do agente
"""
try:
self.step_count += 1
tool_name = action.tool
tool_input = action.tool_input
# Capturar SQL especificamente (sem log de cada passo)
if tool_name == 'sql_db_query' and isinstance(tool_input, dict):
sql_query = tool_input.get('query', '')
if sql_query and sql_query.strip():
clean_query = sql_query.strip()
self.sql_queries.append(clean_query)
# Log apenas uma vez com query completa
logging.info(f"[SQL_HANDLER] 🔍 Query SQL capturada:\n{clean_query}")
# Armazenar todas as ações para debug
self.agent_actions.append({
"step": self.step_count,
"tool": tool_name,
"input": tool_input,
"timestamp": time.time()
})
except Exception as e:
logging.error(f"[SQL_HANDLER] Erro ao capturar ação: {e}")
def get_last_sql_query(self) -> Optional[str]:
"""
Retorna a última query SQL capturada
Returns:
Última query SQL ou None se não houver
"""
return self.sql_queries[-1] if self.sql_queries else None
def get_all_sql_queries(self) -> List[str]:
"""
Retorna todas as queries SQL capturadas
Returns:
Lista de queries SQL
"""
return self.sql_queries.copy()
def reset(self):
"""Reseta o handler para nova execução"""
self.sql_queries.clear()
self.agent_actions.clear()
self.step_count = 0
async def retry_with_backoff(func, max_retries=3, base_delay=1.0):
"""
Executa função com retry e backoff exponencial para lidar com rate limiting
Args:
func: Função a ser executada
max_retries: Número máximo de tentativas
base_delay: Delay base em segundos
Returns:
Resultado da função ou levanta exceção após esgotar tentativas
"""
for attempt in range(max_retries + 1):
try:
return func()
except Exception as e:
error_str = str(e)
# Verifica se é erro de rate limiting ou overload
if any(keyword in error_str.lower() for keyword in ['overloaded', 'rate_limit', 'too_many_requests', 'quota']):
if attempt < max_retries:
delay = base_delay * (2 ** attempt) # Backoff exponencial
logging.warning(f"API sobrecarregada (tentativa {attempt + 1}/{max_retries + 1}). Aguardando {delay}s...")
await asyncio.sleep(delay)
continue
else:
logging.error(f"API continua sobrecarregada após {max_retries + 1} tentativas")
raise Exception(f"API da Anthropic sobrecarregada. Tente novamente em alguns minutos. Erro original: {e}")
else:
# Se não é erro de rate limiting, levanta imediatamente
raise e
# Não deveria chegar aqui, mas por segurança
raise Exception("Número máximo de tentativas excedido")
def create_sql_agent_executor(db: SQLDatabase, model_name: str = "gpt-4o-mini", single_table_mode: bool = False, selected_table: str = None, top_k: int = 10):
"""
Cria um agente SQL usando LangChain com suporte a diferentes provedores
Args:
db: Objeto SQLDatabase do LangChain
model_name: Nome do modelo a usar (OpenAI, Anthropic)
single_table_mode: Se deve restringir a uma única tabela
selected_table: Tabela específica para modo único
top_k: Número máximo de resultados (LIMIT) para queries SQL
Returns:
Agente SQL configurado
"""
try:
# Se modo tabela única, cria SQLDatabase restrito
if single_table_mode and selected_table:
# Cria uma nova instância do SQLDatabase restrita à tabela selecionada
restricted_db = SQLDatabase.from_uri(
db._engine.url,
include_tables=[selected_table]
)
logging.info(f"[SQL_AGENT] Criando agente em modo tabela única: {selected_table}")
db_to_use = restricted_db
else:
# Usa o SQLDatabase original (modo multi-tabela)
logging.info("[SQL_AGENT] Criando agente em modo multi-tabela")
db_to_use = db
# Obtém o ID real do modelo
model_id = AVAILABLE_MODELS.get(model_name, model_name)
# Cria o modelo LLM baseado no provedor
if model_id in OPENAI_MODELS:
# Configurações específicas para modelos OpenAI
if model_id == "o3-mini":
# o3-mini não suporta temperature
llm = ChatOpenAI(model=model_id)
else:
# GPT-4o e GPT-4o-mini suportam temperature
llm = ChatOpenAI(model=model_id, temperature=TEMPERATURE)
agent_type = "openai-tools"
elif model_id in ANTHROPIC_MODELS:
# Claude com tool-calling e configurações para rate limiting
llm = ChatAnthropic(
model=model_id,
temperature=TEMPERATURE,
max_tokens=4096,
max_retries=2, # Retry interno do cliente
timeout=60.0 # Timeout mais longo
)
agent_type = "tool-calling" # Claude usa tool-calling
elif model_id in GOOGLE_MODELS:
# Gemini com tool-calling e configurações otimizadas
llm = ChatGoogleGenerativeAI(
model=model_id,
temperature=TEMPERATURE,
max_tokens=4096,
max_retries=2,
timeout=60.0
)
agent_type = "tool-calling" # Gemini usa tool-calling
else:
# Fallback para OpenAI
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=TEMPERATURE
)
agent_type = "openai-tools"
logging.warning(f"Modelo {model_name} não reconhecido, usando gpt-4o-mini como fallback")
# Cria o agente SQL
sql_agent = create_sql_agent(
llm=llm,
db=db_to_use, # Usa o SQLDatabase apropriado (restrito ou completo)
agent_type=agent_type,
verbose=True,
max_iterations=MAX_ITERATIONS,
return_intermediate_steps=True,
top_k=top_k # Usa o valor dinâmico configurado pelo usuário
)
logging.info(f"Agente SQL criado com sucesso usando modelo {model_name} ({model_id}) com agent_type={agent_type}")
return sql_agent
except Exception as e:
logging.error(f"Erro ao criar agente SQL: {e}")
raise
class SQLAgentManager:
"""
Gerenciador do agente SQL com funcionalidades avançadas
"""
def __init__(self, db: SQLDatabase, model_name: str = "gpt-4o-mini", single_table_mode: bool = False, selected_table: str = None, top_k: int = 10):
self.db = db
self.model_name = model_name
self.single_table_mode = single_table_mode
self.selected_table = selected_table
self.top_k = top_k
self.agent = None
self._initialize_agent()
def _initialize_agent(self):
"""Inicializa o agente SQL"""
self.agent = create_sql_agent_executor(self.db, self.model_name, self.single_table_mode, self.selected_table, self.top_k)
def recreate_agent(self, new_db: SQLDatabase = None, new_model: str = None, single_table_mode: bool = None, selected_table: str = None, top_k: int = None):
"""
Recria o agente com novos parâmetros
Args:
new_db: Novo banco de dados (opcional)
new_model: Novo modelo (opcional)
single_table_mode: Novo modo de tabela (opcional)
selected_table: Nova tabela selecionada (opcional)
top_k: Novo valor de TOP_K para LIMIT (opcional)
"""
if new_db:
self.db = new_db
if new_model:
self.model_name = new_model
if single_table_mode is not None:
self.single_table_mode = single_table_mode
if selected_table is not None:
self.selected_table = selected_table
if top_k is not None:
self.top_k = top_k
self._initialize_agent()
mode_info = f"modo {'tabela única' if self.single_table_mode else 'multi-tabela'}"
logging.info(f"Agente SQL recriado com modelo {self.model_name} em {mode_info}, TOP_K={self.top_k}")
def _extract_text_from_claude_response(self, output) -> str:
"""
Extrai texto limpo da resposta do Claude que pode vir em formato complexo
Args:
output: Resposta do agente (pode ser string, lista ou dict)
Returns:
String limpa com o texto da resposta
"""
try:
# Se já é string, retorna diretamente
if isinstance(output, str):
return output
# Se é lista, procura por dicionários com 'text'
if isinstance(output, list):
text_parts = []
for item in output:
if isinstance(item, dict) and 'text' in item:
text_parts.append(item['text'])
elif isinstance(item, str):
text_parts.append(item)
if text_parts:
return '\n'.join(text_parts)
# Se é dict, procura por 'text' ou converte para string
if isinstance(output, dict):
if 'text' in output:
return output['text']
elif 'content' in output:
return str(output['content'])
# Fallback: converte para string
return str(output)
except Exception as e:
logging.warning(f"Erro ao extrair texto da resposta: {e}")
return str(output)
async def execute_query(self, instruction: str) -> dict:
"""
Executa uma query através do agente SQL com retry para rate limiting
Args:
instruction: Instrução para o agente
Returns:
Resultado da execução
"""
try:
logging.info("------- Agent SQL: Executando query -------")
# Criar handler para capturar SQL
sql_handler = SQLQueryCaptureHandler()
# Verifica se é agente Claude ou Gemini para aplicar retry
model_id = getattr(self, 'model_name', '')
is_claude = any(claude_model in model_id for claude_model in ANTHROPIC_MODELS)
is_gemini = any(gemini_model in model_id for gemini_model in GOOGLE_MODELS)
if is_claude or is_gemini:
# Usa retry com backoff para Claude e Gemini
response = await retry_with_backoff(
lambda: self.agent.invoke(
{"input": instruction},
{"callbacks": [sql_handler]}
),
max_retries=3,
base_delay=2.0
)
else:
# Execução normal para outros modelos
response = self.agent.invoke(
{"input": instruction},
{"callbacks": [sql_handler]}
)
# Extrai e limpa a resposta
raw_output = response.get("output", "Erro ao obter a resposta do agente.")
clean_output = self._extract_text_from_claude_response(raw_output)
# Captura a última query SQL executada
sql_query = sql_handler.get_last_sql_query()
result = {
"output": clean_output,
"intermediate_steps": response.get("intermediate_steps", []),
"success": True,
"sql_query": sql_query, # ← Query SQL capturada
"all_sql_queries": sql_handler.get_all_sql_queries()
}
logging.info(f"Query executada com sucesso: {result['output'][:100]}...")
return result
except Exception as e:
error_str = str(e)
# Mensagem mais amigável para problemas de rate limiting
if any(keyword in error_str.lower() for keyword in ['overloaded', 'rate_limit', 'too_many_requests', 'quota']):
error_msg = (
"🚫 **API da Anthropic temporariamente sobrecarregada**\n\n"
"A API do Claude está com muitas solicitações no momento. "
"Por favor, aguarde alguns minutos e tente novamente.\n\n"
"**Sugestões:**\n"
"- Aguarde 2-3 minutos antes de tentar novamente\n"
"- Considere usar um modelo OpenAI temporariamente\n"
"- Tente novamente em horários de menor movimento\n\n"
f"*Erro técnico: {e}*"
)
else:
error_msg = f"Erro ao consultar o agente SQL: {e}"
logging.error(error_msg)
return {
"output": error_msg,
"intermediate_steps": [],
"success": False
}
def get_agent_info(self) -> dict:
"""
Retorna informações sobre o agente atual
Returns:
Dicionário com informações do agente
"""
return {
"model_name": self.model_name,
"max_iterations": MAX_ITERATIONS,
"temperature": TEMPERATURE,
"database_tables": self.db.get_usable_table_names() if self.db else [],
"agent_type": "openai-tools"
}
def validate_agent(self) -> bool:
"""
Valida se o agente está funcionando corretamente
Returns:
True se válido, False caso contrário
"""
try:
# Testa com uma query simples
test_result = self.agent.invoke({
"input": "Quantas linhas existem na tabela?"
})
success = "output" in test_result and test_result["output"]
logging.info(f"Validação do agente: {'Sucesso' if success else 'Falha'}")
return success
except Exception as e:
logging.error(f"Erro na validação do agente: {e}")
return False
def get_default_sql_agent(db: SQLDatabase) -> SQLAgentManager:
"""
Cria um agente SQL com configurações padrão
Args:
db: Objeto SQLDatabase
Returns:
SQLAgentManager configurado
"""
return SQLAgentManager(db)