""" Criação e configuração do agente SQL """ import logging import time import asyncio from typing import Optional, Dict, Any, List from langchain_openai import ChatOpenAI from langchain_anthropic import ChatAnthropic from langchain_google_genai import ChatGoogleGenerativeAI from langchain_community.agent_toolkits import create_sql_agent from langchain_community.utilities import SQLDatabase from langchain.callbacks.base import BaseCallbackHandler from langchain.schema import AgentAction, AgentFinish from utils.config import ( MAX_ITERATIONS, TEMPERATURE, AVAILABLE_MODELS, OPENAI_MODELS, ANTHROPIC_MODELS, GOOGLE_MODELS ) class SQLQueryCaptureHandler(BaseCallbackHandler): """ Handler para capturar queries SQL executadas pelo agente """ def __init__(self): super().__init__() self.sql_queries: List[str] = [] self.agent_actions: List[Dict[str, Any]] = [] self.step_count = 0 def on_agent_action(self, action: AgentAction, **kwargs) -> None: """ Captura ações do agente, especialmente queries SQL Args: action: Ação do agente """ try: self.step_count += 1 tool_name = action.tool tool_input = action.tool_input # Capturar SQL especificamente (sem log de cada passo) if tool_name == 'sql_db_query' and isinstance(tool_input, dict): sql_query = tool_input.get('query', '') if sql_query and sql_query.strip(): clean_query = sql_query.strip() self.sql_queries.append(clean_query) # Log apenas uma vez com query completa logging.info(f"[SQL_HANDLER] 🔍 Query SQL capturada:\n{clean_query}") # Armazenar todas as ações para debug self.agent_actions.append({ "step": self.step_count, "tool": tool_name, "input": tool_input, "timestamp": time.time() }) except Exception as e: logging.error(f"[SQL_HANDLER] Erro ao capturar ação: {e}") def get_last_sql_query(self) -> Optional[str]: """ Retorna a última query SQL capturada Returns: Última query SQL ou None se não houver """ return self.sql_queries[-1] if self.sql_queries else None def get_all_sql_queries(self) -> List[str]: """ Retorna todas as queries SQL capturadas Returns: Lista de queries SQL """ return self.sql_queries.copy() def reset(self): """Reseta o handler para nova execução""" self.sql_queries.clear() self.agent_actions.clear() self.step_count = 0 async def retry_with_backoff(func, max_retries=3, base_delay=1.0): """ Executa função com retry e backoff exponencial para lidar com rate limiting Args: func: Função a ser executada max_retries: Número máximo de tentativas base_delay: Delay base em segundos Returns: Resultado da função ou levanta exceção após esgotar tentativas """ for attempt in range(max_retries + 1): try: return func() except Exception as e: error_str = str(e) # Verifica se é erro de rate limiting ou overload if any(keyword in error_str.lower() for keyword in ['overloaded', 'rate_limit', 'too_many_requests', 'quota']): if attempt < max_retries: delay = base_delay * (2 ** attempt) # Backoff exponencial logging.warning(f"API sobrecarregada (tentativa {attempt + 1}/{max_retries + 1}). Aguardando {delay}s...") await asyncio.sleep(delay) continue else: logging.error(f"API continua sobrecarregada após {max_retries + 1} tentativas") raise Exception(f"API da Anthropic sobrecarregada. Tente novamente em alguns minutos. Erro original: {e}") else: # Se não é erro de rate limiting, levanta imediatamente raise e # Não deveria chegar aqui, mas por segurança raise Exception("Número máximo de tentativas excedido") def create_sql_agent_executor(db: SQLDatabase, model_name: str = "gpt-4o-mini", single_table_mode: bool = False, selected_table: str = None, top_k: int = 10): """ Cria um agente SQL usando LangChain com suporte a diferentes provedores Args: db: Objeto SQLDatabase do LangChain model_name: Nome do modelo a usar (OpenAI, Anthropic) single_table_mode: Se deve restringir a uma única tabela selected_table: Tabela específica para modo único top_k: Número máximo de resultados (LIMIT) para queries SQL Returns: Agente SQL configurado """ try: # Se modo tabela única, cria SQLDatabase restrito if single_table_mode and selected_table: # Cria uma nova instância do SQLDatabase restrita à tabela selecionada restricted_db = SQLDatabase.from_uri( db._engine.url, include_tables=[selected_table] ) logging.info(f"[SQL_AGENT] Criando agente em modo tabela única: {selected_table}") db_to_use = restricted_db else: # Usa o SQLDatabase original (modo multi-tabela) logging.info("[SQL_AGENT] Criando agente em modo multi-tabela") db_to_use = db # Obtém o ID real do modelo model_id = AVAILABLE_MODELS.get(model_name, model_name) # Cria o modelo LLM baseado no provedor if model_id in OPENAI_MODELS: # Configurações específicas para modelos OpenAI if model_id == "o3-mini": # o3-mini não suporta temperature llm = ChatOpenAI(model=model_id) else: # GPT-4o e GPT-4o-mini suportam temperature llm = ChatOpenAI(model=model_id, temperature=TEMPERATURE) agent_type = "openai-tools" elif model_id in ANTHROPIC_MODELS: # Claude com tool-calling e configurações para rate limiting llm = ChatAnthropic( model=model_id, temperature=TEMPERATURE, max_tokens=4096, max_retries=2, # Retry interno do cliente timeout=60.0 # Timeout mais longo ) agent_type = "tool-calling" # Claude usa tool-calling elif model_id in GOOGLE_MODELS: # Gemini com tool-calling e configurações otimizadas llm = ChatGoogleGenerativeAI( model=model_id, temperature=TEMPERATURE, max_tokens=4096, max_retries=2, timeout=60.0 ) agent_type = "tool-calling" # Gemini usa tool-calling else: # Fallback para OpenAI llm = ChatOpenAI( model="gpt-4o-mini", temperature=TEMPERATURE ) agent_type = "openai-tools" logging.warning(f"Modelo {model_name} não reconhecido, usando gpt-4o-mini como fallback") # Cria o agente SQL sql_agent = create_sql_agent( llm=llm, db=db_to_use, # Usa o SQLDatabase apropriado (restrito ou completo) agent_type=agent_type, verbose=True, max_iterations=MAX_ITERATIONS, return_intermediate_steps=True, top_k=top_k # Usa o valor dinâmico configurado pelo usuário ) logging.info(f"Agente SQL criado com sucesso usando modelo {model_name} ({model_id}) com agent_type={agent_type}") return sql_agent except Exception as e: logging.error(f"Erro ao criar agente SQL: {e}") raise class SQLAgentManager: """ Gerenciador do agente SQL com funcionalidades avançadas """ def __init__(self, db: SQLDatabase, model_name: str = "gpt-4o-mini", single_table_mode: bool = False, selected_table: str = None, top_k: int = 10): self.db = db self.model_name = model_name self.single_table_mode = single_table_mode self.selected_table = selected_table self.top_k = top_k self.agent = None self._initialize_agent() def _initialize_agent(self): """Inicializa o agente SQL""" self.agent = create_sql_agent_executor(self.db, self.model_name, self.single_table_mode, self.selected_table, self.top_k) def recreate_agent(self, new_db: SQLDatabase = None, new_model: str = None, single_table_mode: bool = None, selected_table: str = None, top_k: int = None): """ Recria o agente com novos parâmetros Args: new_db: Novo banco de dados (opcional) new_model: Novo modelo (opcional) single_table_mode: Novo modo de tabela (opcional) selected_table: Nova tabela selecionada (opcional) top_k: Novo valor de TOP_K para LIMIT (opcional) """ if new_db: self.db = new_db if new_model: self.model_name = new_model if single_table_mode is not None: self.single_table_mode = single_table_mode if selected_table is not None: self.selected_table = selected_table if top_k is not None: self.top_k = top_k self._initialize_agent() mode_info = f"modo {'tabela única' if self.single_table_mode else 'multi-tabela'}" logging.info(f"Agente SQL recriado com modelo {self.model_name} em {mode_info}, TOP_K={self.top_k}") def _extract_text_from_claude_response(self, output) -> str: """ Extrai texto limpo da resposta do Claude que pode vir em formato complexo Args: output: Resposta do agente (pode ser string, lista ou dict) Returns: String limpa com o texto da resposta """ try: # Se já é string, retorna diretamente if isinstance(output, str): return output # Se é lista, procura por dicionários com 'text' if isinstance(output, list): text_parts = [] for item in output: if isinstance(item, dict) and 'text' in item: text_parts.append(item['text']) elif isinstance(item, str): text_parts.append(item) if text_parts: return '\n'.join(text_parts) # Se é dict, procura por 'text' ou converte para string if isinstance(output, dict): if 'text' in output: return output['text'] elif 'content' in output: return str(output['content']) # Fallback: converte para string return str(output) except Exception as e: logging.warning(f"Erro ao extrair texto da resposta: {e}") return str(output) async def execute_query(self, instruction: str) -> dict: """ Executa uma query através do agente SQL com retry para rate limiting Args: instruction: Instrução para o agente Returns: Resultado da execução """ try: logging.info("------- Agent SQL: Executando query -------") # Criar handler para capturar SQL sql_handler = SQLQueryCaptureHandler() # Verifica se é agente Claude ou Gemini para aplicar retry model_id = getattr(self, 'model_name', '') is_claude = any(claude_model in model_id for claude_model in ANTHROPIC_MODELS) is_gemini = any(gemini_model in model_id for gemini_model in GOOGLE_MODELS) if is_claude or is_gemini: # Usa retry com backoff para Claude e Gemini response = await retry_with_backoff( lambda: self.agent.invoke( {"input": instruction}, {"callbacks": [sql_handler]} ), max_retries=3, base_delay=2.0 ) else: # Execução normal para outros modelos response = self.agent.invoke( {"input": instruction}, {"callbacks": [sql_handler]} ) # Extrai e limpa a resposta raw_output = response.get("output", "Erro ao obter a resposta do agente.") clean_output = self._extract_text_from_claude_response(raw_output) # Captura a última query SQL executada sql_query = sql_handler.get_last_sql_query() result = { "output": clean_output, "intermediate_steps": response.get("intermediate_steps", []), "success": True, "sql_query": sql_query, # ← Query SQL capturada "all_sql_queries": sql_handler.get_all_sql_queries() } logging.info(f"Query executada com sucesso: {result['output'][:100]}...") return result except Exception as e: error_str = str(e) # Mensagem mais amigável para problemas de rate limiting if any(keyword in error_str.lower() for keyword in ['overloaded', 'rate_limit', 'too_many_requests', 'quota']): error_msg = ( "🚫 **API da Anthropic temporariamente sobrecarregada**\n\n" "A API do Claude está com muitas solicitações no momento. " "Por favor, aguarde alguns minutos e tente novamente.\n\n" "**Sugestões:**\n" "- Aguarde 2-3 minutos antes de tentar novamente\n" "- Considere usar um modelo OpenAI temporariamente\n" "- Tente novamente em horários de menor movimento\n\n" f"*Erro técnico: {e}*" ) else: error_msg = f"Erro ao consultar o agente SQL: {e}" logging.error(error_msg) return { "output": error_msg, "intermediate_steps": [], "success": False } def get_agent_info(self) -> dict: """ Retorna informações sobre o agente atual Returns: Dicionário com informações do agente """ return { "model_name": self.model_name, "max_iterations": MAX_ITERATIONS, "temperature": TEMPERATURE, "database_tables": self.db.get_usable_table_names() if self.db else [], "agent_type": "openai-tools" } def validate_agent(self) -> bool: """ Valida se o agente está funcionando corretamente Returns: True se válido, False caso contrário """ try: # Testa com uma query simples test_result = self.agent.invoke({ "input": "Quantas linhas existem na tabela?" }) success = "output" in test_result and test_result["output"] logging.info(f"Validação do agente: {'Sucesso' if success else 'Falha'}") return success except Exception as e: logging.error(f"Erro na validação do agente: {e}") return False def get_default_sql_agent(db: SQLDatabase) -> SQLAgentManager: """ Cria um agente SQL com configurações padrão Args: db: Objeto SQLDatabase Returns: SQLAgentManager configurado """ return SQLAgentManager(db)