| | """ |
| | Criação e configuração do agente SQL |
| | """ |
| | import logging |
| | import time |
| | import asyncio |
| | from typing import Optional, Dict, Any, List |
| | from langchain_openai import ChatOpenAI |
| | from langchain_anthropic import ChatAnthropic |
| | from langchain_community.agent_toolkits import create_sql_agent |
| | from langchain_community.utilities import SQLDatabase |
| | from langchain.callbacks.base import BaseCallbackHandler |
| | from langchain.schema import AgentAction, AgentFinish |
| |
|
| |
|
| | from utils.config import ( |
| | MAX_ITERATIONS, |
| | TEMPERATURE, |
| | AVAILABLE_MODELS, |
| | OPENAI_MODELS, |
| | ANTHROPIC_MODELS |
| | ) |
| |
|
| | class SQLQueryCaptureHandler(BaseCallbackHandler): |
| | """ |
| | Handler para capturar queries SQL executadas pelo agente |
| | """ |
| |
|
| | def __init__(self): |
| | super().__init__() |
| | self.sql_queries: List[str] = [] |
| | self.agent_actions: List[Dict[str, Any]] = [] |
| | self.step_count = 0 |
| |
|
| | def on_agent_action(self, action: AgentAction, **kwargs) -> None: |
| | """ |
| | Captura ações do agente, especialmente queries SQL |
| | |
| | Args: |
| | action: Ação do agente |
| | """ |
| | try: |
| | self.step_count += 1 |
| | tool_name = action.tool |
| | tool_input = action.tool_input |
| |
|
| | |
| | if tool_name == 'sql_db_query' and isinstance(tool_input, dict): |
| | sql_query = tool_input.get('query', '') |
| | if sql_query and sql_query.strip(): |
| | clean_query = sql_query.strip() |
| | self.sql_queries.append(clean_query) |
| |
|
| | |
| | logging.info(f"[SQL_HANDLER] 🔍 Query SQL capturada:\n{clean_query}") |
| |
|
| | |
| | self.agent_actions.append({ |
| | "step": self.step_count, |
| | "tool": tool_name, |
| | "input": tool_input, |
| | "timestamp": time.time() |
| | }) |
| |
|
| | except Exception as e: |
| | logging.error(f"[SQL_HANDLER] Erro ao capturar ação: {e}") |
| |
|
| | def get_last_sql_query(self) -> Optional[str]: |
| | """ |
| | Retorna a última query SQL capturada |
| | |
| | Returns: |
| | Última query SQL ou None se não houver |
| | """ |
| | return self.sql_queries[-1] if self.sql_queries else None |
| |
|
| | def get_all_sql_queries(self) -> List[str]: |
| | """ |
| | Retorna todas as queries SQL capturadas |
| | |
| | Returns: |
| | Lista de queries SQL |
| | """ |
| | return self.sql_queries.copy() |
| |
|
| | def reset(self): |
| | """Reseta o handler para nova execução""" |
| | self.sql_queries.clear() |
| | self.agent_actions.clear() |
| | self.step_count = 0 |
| |
|
| | async def retry_with_backoff(func, max_retries=3, base_delay=1.0): |
| | """ |
| | Executa função com retry e backoff exponencial para lidar com rate limiting |
| | |
| | Args: |
| | func: Função a ser executada |
| | max_retries: Número máximo de tentativas |
| | base_delay: Delay base em segundos |
| | |
| | Returns: |
| | Resultado da função ou levanta exceção após esgotar tentativas |
| | """ |
| | for attempt in range(max_retries + 1): |
| | try: |
| | return func() |
| | except Exception as e: |
| | error_str = str(e) |
| |
|
| | |
| | if any(keyword in error_str.lower() for keyword in ['overloaded', 'rate_limit', 'too_many_requests', 'quota']): |
| | if attempt < max_retries: |
| | delay = base_delay * (2 ** attempt) |
| | logging.warning(f"API sobrecarregada (tentativa {attempt + 1}/{max_retries + 1}). Aguardando {delay}s...") |
| | await asyncio.sleep(delay) |
| | continue |
| | else: |
| | logging.error(f"API continua sobrecarregada após {max_retries + 1} tentativas") |
| | raise Exception(f"API da Anthropic sobrecarregada. Tente novamente em alguns minutos. Erro original: {e}") |
| | else: |
| | |
| | raise e |
| |
|
| | |
| | raise Exception("Número máximo de tentativas excedido") |
| |
|
| |
|
| |
|
| | def create_sql_agent_executor(db: SQLDatabase, model_name: str = "gpt-4o-mini"): |
| | """ |
| | Cria um agente SQL usando LangChain com suporte a diferentes provedores |
| | |
| | Args: |
| | db: Objeto SQLDatabase do LangChain |
| | model_name: Nome do modelo a usar (OpenAI, Anthropic) |
| | |
| | Returns: |
| | Agente SQL configurado |
| | """ |
| | try: |
| | |
| | model_id = AVAILABLE_MODELS.get(model_name, model_name) |
| |
|
| | |
| | if model_id in OPENAI_MODELS: |
| | |
| | if model_id == "o3-mini": |
| | |
| | llm = ChatOpenAI(model=model_id) |
| | else: |
| | |
| | llm = ChatOpenAI(model=model_id, temperature=TEMPERATURE) |
| |
|
| | agent_type = "openai-tools" |
| |
|
| | elif model_id in ANTHROPIC_MODELS: |
| | |
| | llm = ChatAnthropic( |
| | model=model_id, |
| | temperature=TEMPERATURE, |
| | max_tokens=4096, |
| | max_retries=2, |
| | timeout=60.0 |
| | ) |
| | agent_type = "tool-calling" |
| |
|
| | else: |
| | |
| | llm = ChatOpenAI( |
| | model="gpt-4o-mini", |
| | temperature=TEMPERATURE |
| | ) |
| | agent_type = "openai-tools" |
| | logging.warning(f"Modelo {model_name} não reconhecido, usando gpt-4o-mini como fallback") |
| |
|
| | |
| | sql_agent = create_sql_agent( |
| | llm=llm, |
| | db=db, |
| | agent_type=agent_type, |
| | verbose=True, |
| | max_iterations=MAX_ITERATIONS, |
| | return_intermediate_steps=True, |
| | top_k=10 |
| | ) |
| |
|
| | logging.info(f"Agente SQL criado com sucesso usando modelo {model_name} ({model_id}) com agent_type={agent_type}") |
| | return sql_agent |
| |
|
| | except Exception as e: |
| | logging.error(f"Erro ao criar agente SQL: {e}") |
| | raise |
| |
|
| | class SQLAgentManager: |
| | """ |
| | Gerenciador do agente SQL com funcionalidades avançadas |
| | """ |
| | |
| | def __init__(self, db: SQLDatabase, model_name: str = "gpt-4o-mini"): |
| | self.db = db |
| | self.model_name = model_name |
| | self.agent = None |
| | self._initialize_agent() |
| | |
| | def _initialize_agent(self): |
| | """Inicializa o agente SQL""" |
| | self.agent = create_sql_agent_executor(self.db, self.model_name) |
| | |
| | def recreate_agent(self, new_db: SQLDatabase = None, new_model: str = None): |
| | """ |
| | Recria o agente com novos parâmetros |
| | |
| | Args: |
| | new_db: Novo banco de dados (opcional) |
| | new_model: Novo modelo (opcional) |
| | """ |
| | if new_db: |
| | self.db = new_db |
| | if new_model: |
| | self.model_name = new_model |
| | |
| | self._initialize_agent() |
| | logging.info("Agente SQL recriado com sucesso") |
| | |
| | def _extract_text_from_claude_response(self, output) -> str: |
| | """ |
| | Extrai texto limpo da resposta do Claude que pode vir em formato complexo |
| | |
| | Args: |
| | output: Resposta do agente (pode ser string, lista ou dict) |
| | |
| | Returns: |
| | String limpa com o texto da resposta |
| | """ |
| | try: |
| | |
| | if isinstance(output, str): |
| | return output |
| |
|
| | |
| | if isinstance(output, list): |
| | text_parts = [] |
| | for item in output: |
| | if isinstance(item, dict) and 'text' in item: |
| | text_parts.append(item['text']) |
| | elif isinstance(item, str): |
| | text_parts.append(item) |
| |
|
| | if text_parts: |
| | return '\n'.join(text_parts) |
| |
|
| | |
| | if isinstance(output, dict): |
| | if 'text' in output: |
| | return output['text'] |
| | elif 'content' in output: |
| | return str(output['content']) |
| |
|
| | |
| | return str(output) |
| |
|
| | except Exception as e: |
| | logging.warning(f"Erro ao extrair texto da resposta: {e}") |
| | return str(output) |
| |
|
| | async def execute_query(self, instruction: str) -> dict: |
| | """ |
| | Executa uma query através do agente SQL com retry para rate limiting |
| | |
| | Args: |
| | instruction: Instrução para o agente |
| | |
| | Returns: |
| | Resultado da execução |
| | """ |
| | try: |
| | logging.info("------- Agent SQL: Executando query -------") |
| |
|
| | |
| | sql_handler = SQLQueryCaptureHandler() |
| |
|
| | |
| | model_id = getattr(self, 'model_name', '') |
| | is_claude = any(claude_model in model_id for claude_model in ANTHROPIC_MODELS) |
| |
|
| | if is_claude: |
| | |
| | response = await retry_with_backoff( |
| | lambda: self.agent.invoke( |
| | {"input": instruction}, |
| | {"callbacks": [sql_handler]} |
| | ), |
| | max_retries=3, |
| | base_delay=2.0 |
| | ) |
| | else: |
| | |
| | response = self.agent.invoke( |
| | {"input": instruction}, |
| | {"callbacks": [sql_handler]} |
| | ) |
| |
|
| | |
| | raw_output = response.get("output", "Erro ao obter a resposta do agente.") |
| | clean_output = self._extract_text_from_claude_response(raw_output) |
| |
|
| | |
| | sql_query = sql_handler.get_last_sql_query() |
| |
|
| | result = { |
| | "output": clean_output, |
| | "intermediate_steps": response.get("intermediate_steps", []), |
| | "success": True, |
| | "sql_query": sql_query, |
| | "all_sql_queries": sql_handler.get_all_sql_queries() |
| | } |
| |
|
| | logging.info(f"Query executada com sucesso: {result['output'][:100]}...") |
| | return result |
| |
|
| | except Exception as e: |
| | error_str = str(e) |
| |
|
| | |
| | if any(keyword in error_str.lower() for keyword in ['overloaded', 'rate_limit', 'too_many_requests', 'quota']): |
| | error_msg = ( |
| | "🚫 **API da Anthropic temporariamente sobrecarregada**\n\n" |
| | "A API do Claude está com muitas solicitações no momento. " |
| | "Por favor, aguarde alguns minutos e tente novamente.\n\n" |
| | "**Sugestões:**\n" |
| | "- Aguarde 2-3 minutos antes de tentar novamente\n" |
| | "- Considere usar um modelo OpenAI temporariamente\n" |
| | "- Tente novamente em horários de menor movimento\n\n" |
| | f"*Erro técnico: {e}*" |
| | ) |
| | else: |
| | error_msg = f"Erro ao consultar o agente SQL: {e}" |
| |
|
| | logging.error(error_msg) |
| | return { |
| | "output": error_msg, |
| | "intermediate_steps": [], |
| | "success": False |
| | } |
| | |
| | def get_agent_info(self) -> dict: |
| | """ |
| | Retorna informações sobre o agente atual |
| | |
| | Returns: |
| | Dicionário com informações do agente |
| | """ |
| | return { |
| | "model_name": self.model_name, |
| | "max_iterations": MAX_ITERATIONS, |
| | "temperature": TEMPERATURE, |
| | "database_tables": self.db.get_usable_table_names() if self.db else [], |
| | "agent_type": "openai-tools" |
| | } |
| | |
| | def validate_agent(self) -> bool: |
| | """ |
| | Valida se o agente está funcionando corretamente |
| | |
| | Returns: |
| | True se válido, False caso contrário |
| | """ |
| | try: |
| | |
| | test_result = self.agent.invoke({ |
| | "input": "Quantas linhas existem na tabela?" |
| | }) |
| | |
| | success = "output" in test_result and test_result["output"] |
| | logging.info(f"Validação do agente: {'Sucesso' if success else 'Falha'}") |
| | return success |
| | |
| | except Exception as e: |
| | logging.error(f"Erro na validação do agente: {e}") |
| | return False |
| |
|
| | def get_default_sql_agent(db: SQLDatabase) -> SQLAgentManager: |
| | """ |
| | Cria um agente SQL com configurações padrão |
| | |
| | Args: |
| | db: Objeto SQLDatabase |
| | |
| | Returns: |
| | SQLAgentManager configurado |
| | """ |
| | return SQLAgentManager(db) |
| |
|