|
|
""" |
|
|
Criação e configuração do agente SQL |
|
|
""" |
|
|
import logging |
|
|
import time |
|
|
import asyncio |
|
|
from typing import Optional, Dict, Any, List |
|
|
from langchain_openai import ChatOpenAI |
|
|
from langchain_anthropic import ChatAnthropic |
|
|
from langchain_google_genai import ChatGoogleGenerativeAI |
|
|
from langchain_community.agent_toolkits import create_sql_agent |
|
|
from langchain_community.utilities import SQLDatabase |
|
|
from langchain.callbacks.base import BaseCallbackHandler |
|
|
from langchain.schema import AgentAction, AgentFinish |
|
|
|
|
|
|
|
|
from utils.config import ( |
|
|
MAX_ITERATIONS, |
|
|
TEMPERATURE, |
|
|
AVAILABLE_MODELS, |
|
|
OPENAI_MODELS, |
|
|
ANTHROPIC_MODELS, |
|
|
GOOGLE_MODELS |
|
|
) |
|
|
|
|
|
class SQLQueryCaptureHandler(BaseCallbackHandler): |
|
|
""" |
|
|
Handler para capturar queries SQL executadas pelo agente |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
super().__init__() |
|
|
self.sql_queries: List[str] = [] |
|
|
self.agent_actions: List[Dict[str, Any]] = [] |
|
|
self.step_count = 0 |
|
|
|
|
|
def on_agent_action(self, action: AgentAction, **kwargs) -> None: |
|
|
""" |
|
|
Captura ações do agente, especialmente queries SQL |
|
|
|
|
|
Args: |
|
|
action: Ação do agente |
|
|
""" |
|
|
try: |
|
|
self.step_count += 1 |
|
|
tool_name = action.tool |
|
|
tool_input = action.tool_input |
|
|
|
|
|
|
|
|
if tool_name == 'sql_db_query' and isinstance(tool_input, dict): |
|
|
sql_query = tool_input.get('query', '') |
|
|
if sql_query and sql_query.strip(): |
|
|
clean_query = sql_query.strip() |
|
|
self.sql_queries.append(clean_query) |
|
|
|
|
|
|
|
|
logging.info(f"[SQL_HANDLER] 🔍 Query SQL capturada:\n{clean_query}") |
|
|
|
|
|
|
|
|
self.agent_actions.append({ |
|
|
"step": self.step_count, |
|
|
"tool": tool_name, |
|
|
"input": tool_input, |
|
|
"timestamp": time.time() |
|
|
}) |
|
|
|
|
|
except Exception as e: |
|
|
logging.error(f"[SQL_HANDLER] Erro ao capturar ação: {e}") |
|
|
|
|
|
def get_last_sql_query(self) -> Optional[str]: |
|
|
""" |
|
|
Retorna a última query SQL capturada |
|
|
|
|
|
Returns: |
|
|
Última query SQL ou None se não houver |
|
|
""" |
|
|
return self.sql_queries[-1] if self.sql_queries else None |
|
|
|
|
|
def get_all_sql_queries(self) -> List[str]: |
|
|
""" |
|
|
Retorna todas as queries SQL capturadas |
|
|
|
|
|
Returns: |
|
|
Lista de queries SQL |
|
|
""" |
|
|
return self.sql_queries.copy() |
|
|
|
|
|
def reset(self): |
|
|
"""Reseta o handler para nova execução""" |
|
|
self.sql_queries.clear() |
|
|
self.agent_actions.clear() |
|
|
self.step_count = 0 |
|
|
|
|
|
async def retry_with_backoff(func, max_retries=3, base_delay=1.0): |
|
|
""" |
|
|
Executa função com retry e backoff exponencial para lidar com rate limiting |
|
|
|
|
|
Args: |
|
|
func: Função a ser executada |
|
|
max_retries: Número máximo de tentativas |
|
|
base_delay: Delay base em segundos |
|
|
|
|
|
Returns: |
|
|
Resultado da função ou levanta exceção após esgotar tentativas |
|
|
""" |
|
|
for attempt in range(max_retries + 1): |
|
|
try: |
|
|
return func() |
|
|
except Exception as e: |
|
|
error_str = str(e) |
|
|
|
|
|
|
|
|
if any(keyword in error_str.lower() for keyword in ['overloaded', 'rate_limit', 'too_many_requests', 'quota']): |
|
|
if attempt < max_retries: |
|
|
delay = base_delay * (2 ** attempt) |
|
|
logging.warning(f"API sobrecarregada (tentativa {attempt + 1}/{max_retries + 1}). Aguardando {delay}s...") |
|
|
await asyncio.sleep(delay) |
|
|
continue |
|
|
else: |
|
|
logging.error(f"API continua sobrecarregada após {max_retries + 1} tentativas") |
|
|
raise Exception(f"API da Anthropic sobrecarregada. Tente novamente em alguns minutos. Erro original: {e}") |
|
|
else: |
|
|
|
|
|
raise e |
|
|
|
|
|
|
|
|
raise Exception("Número máximo de tentativas excedido") |
|
|
|
|
|
|
|
|
|
|
|
def create_sql_agent_executor(db: SQLDatabase, model_name: str = "gpt-4o-mini", single_table_mode: bool = False, selected_table: str = None): |
|
|
""" |
|
|
Cria um agente SQL usando LangChain com suporte a diferentes provedores |
|
|
|
|
|
Args: |
|
|
db: Objeto SQLDatabase do LangChain |
|
|
model_name: Nome do modelo a usar (OpenAI, Anthropic) |
|
|
single_table_mode: Se deve restringir a uma única tabela |
|
|
selected_table: Tabela específica para modo único |
|
|
|
|
|
Returns: |
|
|
Agente SQL configurado |
|
|
""" |
|
|
try: |
|
|
|
|
|
if single_table_mode and selected_table: |
|
|
|
|
|
restricted_db = SQLDatabase.from_uri( |
|
|
db._engine.url, |
|
|
include_tables=[selected_table] |
|
|
) |
|
|
logging.info(f"[SQL_AGENT] Criando agente em modo tabela única: {selected_table}") |
|
|
db_to_use = restricted_db |
|
|
else: |
|
|
|
|
|
logging.info("[SQL_AGENT] Criando agente em modo multi-tabela") |
|
|
db_to_use = db |
|
|
|
|
|
|
|
|
model_id = AVAILABLE_MODELS.get(model_name, model_name) |
|
|
|
|
|
|
|
|
if model_id in OPENAI_MODELS: |
|
|
|
|
|
if model_id == "o3-mini": |
|
|
|
|
|
llm = ChatOpenAI(model=model_id) |
|
|
else: |
|
|
|
|
|
llm = ChatOpenAI(model=model_id, temperature=TEMPERATURE) |
|
|
|
|
|
agent_type = "openai-tools" |
|
|
|
|
|
elif model_id in ANTHROPIC_MODELS: |
|
|
|
|
|
llm = ChatAnthropic( |
|
|
model=model_id, |
|
|
temperature=TEMPERATURE, |
|
|
max_tokens=4096, |
|
|
max_retries=2, |
|
|
timeout=60.0 |
|
|
) |
|
|
agent_type = "tool-calling" |
|
|
|
|
|
elif model_id in GOOGLE_MODELS: |
|
|
|
|
|
llm = ChatGoogleGenerativeAI( |
|
|
model=model_id, |
|
|
temperature=TEMPERATURE, |
|
|
max_tokens=4096, |
|
|
max_retries=2, |
|
|
timeout=60.0 |
|
|
) |
|
|
agent_type = "tool-calling" |
|
|
|
|
|
else: |
|
|
|
|
|
llm = ChatOpenAI( |
|
|
model="gpt-4o-mini", |
|
|
temperature=TEMPERATURE |
|
|
) |
|
|
agent_type = "openai-tools" |
|
|
logging.warning(f"Modelo {model_name} não reconhecido, usando gpt-4o-mini como fallback") |
|
|
|
|
|
|
|
|
sql_agent = create_sql_agent( |
|
|
llm=llm, |
|
|
db=db_to_use, |
|
|
agent_type=agent_type, |
|
|
verbose=True, |
|
|
max_iterations=MAX_ITERATIONS, |
|
|
return_intermediate_steps=True, |
|
|
top_k=10 |
|
|
) |
|
|
|
|
|
logging.info(f"Agente SQL criado com sucesso usando modelo {model_name} ({model_id}) com agent_type={agent_type}") |
|
|
return sql_agent |
|
|
|
|
|
except Exception as e: |
|
|
logging.error(f"Erro ao criar agente SQL: {e}") |
|
|
raise |
|
|
|
|
|
class SQLAgentManager: |
|
|
""" |
|
|
Gerenciador do agente SQL com funcionalidades avançadas |
|
|
""" |
|
|
|
|
|
def __init__(self, db: SQLDatabase, model_name: str = "gpt-4o-mini", single_table_mode: bool = False, selected_table: str = None): |
|
|
self.db = db |
|
|
self.model_name = model_name |
|
|
self.single_table_mode = single_table_mode |
|
|
self.selected_table = selected_table |
|
|
self.agent = None |
|
|
self._initialize_agent() |
|
|
|
|
|
def _initialize_agent(self): |
|
|
"""Inicializa o agente SQL""" |
|
|
self.agent = create_sql_agent_executor(self.db, self.model_name, self.single_table_mode, self.selected_table) |
|
|
|
|
|
def recreate_agent(self, new_db: SQLDatabase = None, new_model: str = None, single_table_mode: bool = None, selected_table: str = None): |
|
|
""" |
|
|
Recria o agente com novos parâmetros |
|
|
|
|
|
Args: |
|
|
new_db: Novo banco de dados (opcional) |
|
|
new_model: Novo modelo (opcional) |
|
|
single_table_mode: Novo modo de tabela (opcional) |
|
|
selected_table: Nova tabela selecionada (opcional) |
|
|
""" |
|
|
if new_db: |
|
|
self.db = new_db |
|
|
if new_model: |
|
|
self.model_name = new_model |
|
|
if single_table_mode is not None: |
|
|
self.single_table_mode = single_table_mode |
|
|
if selected_table is not None: |
|
|
self.selected_table = selected_table |
|
|
|
|
|
self._initialize_agent() |
|
|
mode_info = f"modo {'tabela única' if self.single_table_mode else 'multi-tabela'}" |
|
|
logging.info(f"Agente SQL recriado com modelo {self.model_name} em {mode_info}") |
|
|
|
|
|
def _extract_text_from_claude_response(self, output) -> str: |
|
|
""" |
|
|
Extrai texto limpo da resposta do Claude que pode vir em formato complexo |
|
|
|
|
|
Args: |
|
|
output: Resposta do agente (pode ser string, lista ou dict) |
|
|
|
|
|
Returns: |
|
|
String limpa com o texto da resposta |
|
|
""" |
|
|
try: |
|
|
|
|
|
if isinstance(output, str): |
|
|
return output |
|
|
|
|
|
|
|
|
if isinstance(output, list): |
|
|
text_parts = [] |
|
|
for item in output: |
|
|
if isinstance(item, dict) and 'text' in item: |
|
|
text_parts.append(item['text']) |
|
|
elif isinstance(item, str): |
|
|
text_parts.append(item) |
|
|
|
|
|
if text_parts: |
|
|
return '\n'.join(text_parts) |
|
|
|
|
|
|
|
|
if isinstance(output, dict): |
|
|
if 'text' in output: |
|
|
return output['text'] |
|
|
elif 'content' in output: |
|
|
return str(output['content']) |
|
|
|
|
|
|
|
|
return str(output) |
|
|
|
|
|
except Exception as e: |
|
|
logging.warning(f"Erro ao extrair texto da resposta: {e}") |
|
|
return str(output) |
|
|
|
|
|
async def execute_query(self, instruction: str) -> dict: |
|
|
""" |
|
|
Executa uma query através do agente SQL com retry para rate limiting |
|
|
|
|
|
Args: |
|
|
instruction: Instrução para o agente |
|
|
|
|
|
Returns: |
|
|
Resultado da execução |
|
|
""" |
|
|
try: |
|
|
logging.info("------- Agent SQL: Executando query -------") |
|
|
|
|
|
|
|
|
sql_handler = SQLQueryCaptureHandler() |
|
|
|
|
|
|
|
|
model_id = getattr(self, 'model_name', '') |
|
|
is_claude = any(claude_model in model_id for claude_model in ANTHROPIC_MODELS) |
|
|
is_gemini = any(gemini_model in model_id for gemini_model in GOOGLE_MODELS) |
|
|
|
|
|
if is_claude or is_gemini: |
|
|
|
|
|
response = await retry_with_backoff( |
|
|
lambda: self.agent.invoke( |
|
|
{"input": instruction}, |
|
|
{"callbacks": [sql_handler]} |
|
|
), |
|
|
max_retries=3, |
|
|
base_delay=2.0 |
|
|
) |
|
|
else: |
|
|
|
|
|
response = self.agent.invoke( |
|
|
{"input": instruction}, |
|
|
{"callbacks": [sql_handler]} |
|
|
) |
|
|
|
|
|
|
|
|
raw_output = response.get("output", "Erro ao obter a resposta do agente.") |
|
|
clean_output = self._extract_text_from_claude_response(raw_output) |
|
|
|
|
|
|
|
|
sql_query = sql_handler.get_last_sql_query() |
|
|
|
|
|
result = { |
|
|
"output": clean_output, |
|
|
"intermediate_steps": response.get("intermediate_steps", []), |
|
|
"success": True, |
|
|
"sql_query": sql_query, |
|
|
"all_sql_queries": sql_handler.get_all_sql_queries() |
|
|
} |
|
|
|
|
|
logging.info(f"Query executada com sucesso: {result['output'][:100]}...") |
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
error_str = str(e) |
|
|
|
|
|
|
|
|
if any(keyword in error_str.lower() for keyword in ['overloaded', 'rate_limit', 'too_many_requests', 'quota']): |
|
|
error_msg = ( |
|
|
"🚫 **API da Anthropic temporariamente sobrecarregada**\n\n" |
|
|
"A API do Claude está com muitas solicitações no momento. " |
|
|
"Por favor, aguarde alguns minutos e tente novamente.\n\n" |
|
|
"**Sugestões:**\n" |
|
|
"- Aguarde 2-3 minutos antes de tentar novamente\n" |
|
|
"- Considere usar um modelo OpenAI temporariamente\n" |
|
|
"- Tente novamente em horários de menor movimento\n\n" |
|
|
f"*Erro técnico: {e}*" |
|
|
) |
|
|
else: |
|
|
error_msg = f"Erro ao consultar o agente SQL: {e}" |
|
|
|
|
|
logging.error(error_msg) |
|
|
return { |
|
|
"output": error_msg, |
|
|
"intermediate_steps": [], |
|
|
"success": False |
|
|
} |
|
|
|
|
|
def get_agent_info(self) -> dict: |
|
|
""" |
|
|
Retorna informações sobre o agente atual |
|
|
|
|
|
Returns: |
|
|
Dicionário com informações do agente |
|
|
""" |
|
|
return { |
|
|
"model_name": self.model_name, |
|
|
"max_iterations": MAX_ITERATIONS, |
|
|
"temperature": TEMPERATURE, |
|
|
"database_tables": self.db.get_usable_table_names() if self.db else [], |
|
|
"agent_type": "openai-tools" |
|
|
} |
|
|
|
|
|
def validate_agent(self) -> bool: |
|
|
""" |
|
|
Valida se o agente está funcionando corretamente |
|
|
|
|
|
Returns: |
|
|
True se válido, False caso contrário |
|
|
""" |
|
|
try: |
|
|
|
|
|
test_result = self.agent.invoke({ |
|
|
"input": "Quantas linhas existem na tabela?" |
|
|
}) |
|
|
|
|
|
success = "output" in test_result and test_result["output"] |
|
|
logging.info(f"Validação do agente: {'Sucesso' if success else 'Falha'}") |
|
|
return success |
|
|
|
|
|
except Exception as e: |
|
|
logging.error(f"Erro na validação do agente: {e}") |
|
|
return False |
|
|
|
|
|
def get_default_sql_agent(db: SQLDatabase) -> SQLAgentManager: |
|
|
""" |
|
|
Cria um agente SQL com configurações padrão |
|
|
|
|
|
Args: |
|
|
db: Objeto SQLDatabase |
|
|
|
|
|
Returns: |
|
|
SQLAgentManager configurado |
|
|
""" |
|
|
return SQLAgentManager(db) |
|
|
|