ClinicaChain / app.py
rwayz's picture
Update app.py
851f35e verified
import os
import time
import json
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from sqlalchemy.types import DateTime, Float, Integer, Boolean, String
from langchain_openai import ChatOpenAI
from langchain_community.agent_toolkits import create_sql_agent
from langchain_community.utilities import SQLDatabase
from huggingface_hub import InferenceClient
import gradio as gr
from dotenv import load_dotenv
import logging
import matplotlib.pyplot as plt
import seaborn as sns
import io
import re
from PIL import Image
import matplotlib.dates as mdates
load_dotenv()
CSV_FILE_PATH = "clinicas.csv"
SQL_DB_PATH = "clinicas.db"
HUGGINGFACE_API_KEY = os.getenv("HUGGINGFACE_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
LLAMA_MODEL = "meta-llama/Llama-3.3-70B-Instruct"
hf_client = InferenceClient(provider="together", api_key=HUGGINGFACE_API_KEY)
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
query_cache = {}
history_log = []
recent_history = []
show_history_flag = False
advanced_mode_enabled = False
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Configurar matplotlib para suporte a caracteres especiais
plt.rcParams['font.family'] = 'DejaVu Sans'
def create_or_load_sql_database(csv_path, sql_db_path):
if os.path.exists(sql_db_path):
print("🟢 Banco de dados SQL já existe. Carregando...")
return create_engine(f"sqlite:///{sql_db_path}")
print("🔧 Banco de dados SQL não encontrado. Iniciando criação...")
# --- ETAPA 1: DETECÇÃO DE ENCODING ---
print("🔍 Detectando encoding...")
possible_encodings = ['utf-8', 'latin1', 'cp1252']
encoding = None
for enc in possible_encodings:
try:
with open(csv_path, 'r', encoding=enc) as f:
f.read(1000)
encoding = enc
print(f"✅ Encoding detectado: {encoding}")
break
except UnicodeDecodeError:
continue
if encoding is None:
encoding = 'utf-8'
print("⚠️ Não foi possível detectar encoding. Usando UTF-8 como fallback.")
# --- ETAPA 2: DETECÇÃO DE DELIMITADOR ---
print("🔍 Detectando delimitador...")
detected_delimiter = None
with open(csv_path, 'r', encoding=encoding, errors="replace") as f:
sample = f.read(5000)
try:
sniffer = csv.Sniffer()
dialect = sniffer.sniff(sample, delimiters=";,|\t:")
detected_delimiter = dialect.delimiter
print(f"✅ Delimitador detectado: '{detected_delimiter}'")
except Exception:
print("⚠️ Falha na detecção. Usando ';' como fallback.")
detected_delimiter = ";"
# --- ETAPA 3: TRANSFORMAÇÃO DE DELIMITADOR (se necessário) ---
if detected_delimiter != ";":
print(f"🔄 Convertendo CSV de delimitador '{detected_delimiter}' para ';'")
with open(csv_path, 'r', encoding=encoding, errors="replace") as infile:
reader = csv.reader(infile, delimiter=detected_delimiter)
temp_fd, temp_path = tempfile.mkstemp(suffix=".csv")
os.close(temp_fd)
with open(temp_path, 'w', encoding='utf-8', newline='') as outfile:
writer = csv.writer(outfile, delimiter=";")
for row in reader:
writer.writerow(row)
csv_path = temp_path
encoding = 'utf-8'
print(f"✅ Novo CSV salvo temporariamente: {csv_path}")
else:
print("✅ Delimitador já é ';', não é necessária conversão.")
# --- ETAPA 4: LEITURA DO CSV ---
print("📥 Lendo CSV com delimitador padronizado ';'...")
df = pd.read_csv(
csv_path,
sep=";",
encoding=encoding,
on_bad_lines="skip",
low_memory=False,
dtype=str
)
# --- ETAPA 5: LIMPEZA E INFERÊNCIA DE TIPOS ---
df.columns = (
df.columns.astype(str)
.str.strip()
.str.replace(r"[^\w\s]", "_", regex=True)
.str.replace(r"\s+", "_", regex=True)
.str.replace(r"_+", "_", regex=True)
.str.strip("_")
)
print("🧹 Limpando e inferindo tipos das colunas...")
inferred_sql_types = {}
for col in df.columns:
series = df[col].astype(str).str.strip()
if any(keyword in col.lower() for keyword in ["data", "agendamento", "dt_", "horario"]):
try:
dt = pd.to_datetime(series, errors="coerce", dayfirst=True)
if dt.notna().mean() > 0.6:
df[col] = dt
inferred_sql_types[col] = DateTime()
print(f"🕒 {col}: detectado como DateTime")
continue
except:
pass
if series.str.lower().isin(["true", "false", "1", "0"]).mean() > 0.9:
df[col] = series.str.lower().map({"true": True, "false": False, "1": True, "0": False})
inferred_sql_types[col] = Boolean()
print(f"🔘 {col}: detectado como Boolean")
continue
numeric_series = pd.to_numeric(series.str.replace(",", "."), errors="coerce")
numeric_ratio = numeric_series.notna().mean()
if numeric_ratio > 0.9:
if (numeric_series.dropna() == numeric_series.dropna().astype(int)).all():
df[col] = numeric_series.astype("Int64")
inferred_sql_types[col] = Integer()
print(f"🔢 {col}: detectado como Integer")
else:
df[col] = numeric_series.astype("float64")
inferred_sql_types[col] = Float()
print(f"📐 {col}: detectado como Float")
else:
df[col] = series.astype("string")
inferred_sql_types[col] = String()
print(f"🔤 {col}: definido como String")
print("📦 Gerando SQLite...")
engine = create_engine(f"sqlite:///{sql_db_path}")
df.to_sql("base_dados_clinicas", engine, index=False, if_exists="replace", dtype=inferred_sql_types)
print(f"✅ Banco de dados criado com sucesso em '{sql_db_path}' com {len(df)} linhas e {len(df.columns)} colunas.")
return engine
engine = create_or_load_sql_database(CSV_FILE_PATH, SQL_DB_PATH)
db = SQLDatabase(engine=engine)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.2)
sql_agent = create_sql_agent(llm, db=db, agent_type="openai-tools", verbose=True, max_iterations=50, return_intermediate_steps=True)
def generate_initial_context(db_sample):
return (
f"Você é um assistente que gera queries SQL objetivas e eficientes. Sempre inclua LIMIT 20 nas queries. Aqui está o banco de dados:\n\n"
f"Exemplos do banco de dados:\n{db_sample.head().to_string(index=False)}\n\n"
"\n***IMPORTANTE***: Detecte automaticamente o idioma da pergunta do usuário e responda sempre no mesmo idioma."
"\nEsta base contém dados de clínicas médicas e odontológicas, com informações sobre agendamentos, taxas de faltas e sugestões de overbooking.\n"
"Cada linha representa um horário de agendamento em uma clínica para uma especialidade específica, incluindo recursos disponíveis, agendamentos confirmados e várias métricas relacionadas a faltas e overbooking.\n"
"\nInformações importantes:\n"
"- Use `LIKE '%<palavras-chave>%'` para buscas e comparações em colunas de texto.\n"
"- Quando o usuário mencionar uma clínica, procure na coluna: `nome_estabelecimento`.\n"
"- Quando o usuário mencionar uma especialidade, procure na coluna: `nome_especialidade`.\n"
"- Para consultas relacionadas a datas, use a coluna `data_agendamento` no formato YYYY-MM-DD.\n"
"- Para consultas relacionadas a horários, use a coluna `horario_agendamento` que contém valores inteiros (8, 9, 10, etc.).\n"
"- As colunas com prefixo `taxa_media_falta` contêm percentuais de faltas em diferentes períodos.\n"
"- As colunas com prefixo `sugestao_overbooking` contêm recomendações de overbooking em diferentes cenários.\n"
"- Nunca altere ou abrevia a pergunta do usuário.\n"
"- Se o usuário solicitar um gráfico, gere uma query SQL que retorne dados apropriados para visualização.\n"
"- Você só pode incluir o gráfico se for solicitado explicitamente na pergunta do usuário. \n"
"- Se não tiver a palavra gráfico na pergunta do usuário não é para gerar nenhum gráfico, jamais."
"- Você está usando um banco de dados SQLite.\n"
"- O nome da tabela é 'base_dados_clinicas', sempre inclua o nome correto nas Querys.\n"
"\nRetorne apenas a pergunta e a query SQL mais eficiente para entregar ao agent SQL do LangChain para gerar uma resposta para a pergunta. O formato deve ser:\n"
"\nPergunta: <pergunta do usuário>\n"
"\nOpção de Query SQL:\n<query SQL>"
"\nIdioma: <idioma>"
"\nGráfico: <sim/não>"
)
def generate_graph_type_context(user_query, sql_query, df_columns, df_sample):
# Criar uma descrição dos dados para ajudar a LLM a entender melhor a estrutura
data_description = ""
if not df_sample.empty:
# Verificar tipos de dados
numeric_cols = df_sample.select_dtypes(include=['number']).columns.tolist()
date_cols = [col for col in df_sample.columns if 'data' in col.lower() or df_sample[col].dtype == 'datetime64[ns]']
categorical_cols = df_sample.select_dtypes(include=['object']).columns.tolist()
# Adicionar informações sobre os primeiros valores de cada coluna
data_description = "\nAmostra dos dados:\n"
data_description += df_sample.head(3).to_string()
# Adicionar informações sobre os tipos de dados
data_description += "\n\nTipos de colunas:"
if numeric_cols:
data_description += f"\n- Colunas numéricas: {', '.join(numeric_cols)}"
if date_cols:
data_description += f"\n- Colunas de data/tempo: {', '.join(date_cols)}"
if categorical_cols:
data_description += f"\n- Colunas categóricas: {', '.join(categorical_cols)}"
return (
f"Você é um especialista em visualização de dados que escolhe o tipo de gráfico mais adequado para representar dados.\n\n"
f"Pergunta do usuário: {user_query}\n\n"
f"Query SQL gerada:\n{sql_query}\n\n"
f"Colunas retornadas pela query: {', '.join(df_columns)}\n"
f"{data_description}\n\n"
"Escolha o tipo de gráfico mais adequado para visualizar esses dados. Considere os seguintes tipos de gráficos e suas aplicações:\n\n"
"1. Linha Simples → Ideal para mostrar tendências ao longo do tempo ou sequências. Use quando tiver uma coluna de data/tempo/sequência e uma coluna numérica.\n"
"2. Multilinhas → Ideal para comparar tendências de diferentes categorias ao longo do tempo. Use quando tiver uma coluna de data/tempo e múltiplas colunas numéricas, ou quando tiver uma coluna categórica que pode ser usada para agrupar os dados.\n"
"3. Área → Similar ao gráfico de linha, mas com área preenchida abaixo da linha. Ideal para mostrar volume ao longo do tempo. Use quando tiver uma coluna de data/tempo e uma coluna numérica.\n"
"4. Barras Verticais → Ideal para comparar valores entre diferentes categorias. Use quando tiver uma coluna categórica e uma coluna numérica.\n"
"5. Barras Horizontais → Similar às barras verticais, mas melhor quando há muitas categorias ou nomes longos. Use quando tiver uma coluna categórica e uma coluna numérica.\n"
"6. Barras Agrupadas → Ideal para comparar valores de múltiplas categorias. Use quando tiver uma coluna categórica e múltiplas colunas numéricas para comparação.\n"
"7. Barras Empilhadas → Ideal para mostrar partes de um todo por categoria. Use quando tiver uma coluna categórica e múltiplas colunas numéricas que representam partes de um todo.\n"
"8. Pizza Simples → Ideal para mostrar proporções de um todo. Use quando tiver uma coluna categórica e uma coluna numérica, com poucas categorias (máximo 7).\n"
"9. Dona → Similar ao gráfico de pizza, mas com um espaço no centro. Melhor para visualizar proporções quando há muitas categorias.\n"
"10. Pizzas Múltiplas → Ideal para comparar proporções entre diferentes grupos. Use quando tiver duas colunas categóricas e uma coluna numérica.\n\n"
"Analise cuidadosamente a pergunta do usuário e os dados retornados. Escolha o tipo de gráfico que melhor representa a informação que o usuário está buscando.\n\n"
"Responda apenas com o número do tipo de gráfico mais adequado (1-10). Não inclua explicações ou texto adicional."
)
def is_greeting(user_query):
greetings = ["olá", "oi", "bom dia", "boa tarde", "boa noite", "oi, tudo bem?"]
return user_query.lower().strip() in greetings
def query_with_llama(user_query, db_sample):
initial_context = generate_initial_context(db_sample)
formatted_history = "\n".join([
f"{msg['role'].capitalize()}: {msg['content']}" for msg in recent_history[-2:]
])
full_prompt = f"{initial_context}\n\nHistórico recente:\n{formatted_history}\n\nPergunta do usuário:\n{user_query}"
logging.info(f"[DEBUG] Contexto enviado ao Llama:\n{full_prompt}\n")
start_time = time.time()
try:
response = hf_client.chat.completions.create(
model=LLAMA_MODEL,
messages=[{"role": "system", "content": full_prompt}],
max_tokens=900,
stream=False
)
llama_response = response["choices"][0]["message"]["content"]
end_time = time.time()
logging.info(f"[DEBUG] Resposta do Llama para o Agent SQL:\n{llama_response.strip()}\n[Tempo de execução: {end_time - start_time:.2f}s]\n")
return llama_response.strip()
except Exception as e:
logging.error(f"[ERRO] Falha ao interagir com o Llama: {e}")
return None
def get_graph_type_with_llm(user_query, sql_query, df):
"""Consulta a LLM para determinar o tipo de gráfico mais adequado."""
# Obter uma amostra dos dados para análise
df_sample = df.head(3)
graph_type_context = generate_graph_type_context(user_query, sql_query, df.columns.tolist(), df_sample)
logging.info(f"[DEBUG] Contexto enviado ao Llama para tipo de gráfico:\n{graph_type_context}\n")
try:
response = hf_client.chat.completions.create(
model=LLAMA_MODEL,
messages=[{"role": "system", "content": graph_type_context}],
max_tokens=10,
stream=False
)
llama_response = response["choices"][0]["message"]["content"].strip()
logging.info(f"[DEBUG] Resposta do Llama para tipo de gráfico: {llama_response}")
# Mapear a resposta numérica para o tipo de gráfico
graph_type_map = {
"1": "line_simple",
"2": "multiline",
"3": "area",
"4": "bar_vertical",
"5": "bar_horizontal",
"6": "bar_grouped",
"7": "bar_stacked",
"8": "pie",
"9": "donut",
"10": "pie_multiple"
}
# Extrair apenas o número da resposta
match = re.search(r"\b([1-9]|10)\b", llama_response)
if match:
graph_number = match.group(0)
graph_type = graph_type_map.get(graph_number, "bar_vertical") # Default para bar_vertical se não encontrar
logging.info(f"[DEBUG] Tipo de gráfico escolhido pela LLM: {graph_type} (número {graph_number})")
return graph_type
else:
logging.warning("[DEBUG] Não foi possível extrair um número válido da resposta da LLM")
return "bar_vertical" # Default para bar_vertical
except Exception as e:
logging.error(f"[ERRO] Falha ao consultar LLM para tipo de gráfico: {e}")
return "bar_vertical" # Default para bar_vertical em caso de erro
def refine_response_with_llm(user_question, sql_response):
prompt = (
f"Pergunta do usuário:\n{user_question}\n\n"
f"Resposta gerada pelo agente SQL:\n{sql_response}\n\n"
"Sua tarefa é refinar, complementar e melhorar a resposta."
)
logging.info(f"[DEBUG] Prompt enviado ao modelo de refinamento:\n{prompt}\n")
try:
response = hf_client.chat.completions.create(
model=LLAMA_MODEL,
messages=[{"role": "system", "content": prompt}],
max_tokens=1200,
stream=False
)
improved_response = response["choices"][0]["message"]["content"]
logging.info(f"[DEBUG] Resposta do modelo de refinamento:\n{improved_response}\n")
return improved_response
except Exception as e:
logging.error(f"[ERRO] Falha ao refinar resposta com LLM: {e}")
return sql_response
def extract_sql_query(llama_response):
"""Extrai e sanitiza a query SQL da resposta do Llama."""
if "Opção de Query SQL:" in llama_response:
parts = llama_response.split("Opção de Query SQL:")
if len(parts) > 1:
query_part = parts[1].strip()
# Remove qualquer ocorrência de blocos markdown ```sql ou ```
query_part = re.sub(r"```sql", "", query_part, flags=re.IGNORECASE)
query_part = re.sub(r"```", "", query_part)
query_part = query_part.strip()
# Remove textos adicionais se existirem
if "Idioma:" in query_part:
query_part = query_part.split("Idioma:")[0].strip()
if "Gráfico:" in query_part:
query_part = query_part.split("Gráfico:")[0].strip()
# Remove ponto e vírgula no final
if query_part.endswith(";"):
query_part = query_part[:-1].strip()
return query_part
return None
def should_generate_graph(llama_response):
"""Verifica se a resposta do Llama indica que um gráfico deve ser gerado."""
logging.info(f"[DEBUG] Verificando se deve gerar gráfico na resposta: {llama_response}")
# Usar expressão regular para encontrar "Gráfico: sim" independente de maiúsculas/minúsculas
match = re.search(r"Gr[áa]fico\s*:\s*[Ss][Ii][Mm]", llama_response)
if match:
logging.info(f"[DEBUG] Encontrou indicação de gráfico: {match.group(0)}")
return True
logging.info("[DEBUG] Não encontrou indicação de gráfico")
return False
def analyze_dataframe(df):
"""Analisa o DataFrame para determinar características importantes para visualização."""
analysis = {
"num_rows": len(df),
"num_cols": len(df.columns),
"numeric_cols": [],
"date_cols": [],
"categorical_cols": [],
"time_series": False,
"multi_numeric": False,
"has_categories": False
}
# Identificar tipos de colunas
for col in df.columns:
# Verificar se é coluna numérica
if pd.api.types.is_numeric_dtype(df[col]):
analysis["numeric_cols"].append(col)
# Verificar se é coluna de data
elif pd.api.types.is_datetime64_any_dtype(df[col]) or 'data' in col.lower():
analysis["date_cols"].append(col)
analysis["time_series"] = True
# Caso contrário, considerar categórica
else:
analysis["categorical_cols"].append(col)
analysis["has_categories"] = True
# Verificar se há múltiplas colunas numéricas
if len(analysis["numeric_cols"]) > 1:
analysis["multi_numeric"] = True
# Tentar converter colunas de data que não foram detectadas automaticamente
for col in analysis["categorical_cols"]:
if 'data' in col.lower() or 'date' in col.lower():
try:
# Tentar converter para datetime
df[col] = pd.to_datetime(df[col], errors='coerce')
if not df[col].isna().all(): # Se pelo menos um valor foi convertido com sucesso
analysis["date_cols"].append(col)
analysis["categorical_cols"].remove(col)
analysis["time_series"] = True
except:
pass
return analysis
def prepare_data_for_graph(df, graph_type, user_query):
"""Prepara os dados para o gráfico, adaptando-os conforme necessário."""
logging.info(f"[DEBUG] Preparando dados para gráfico tipo {graph_type}")
# Verificar se o DataFrame está vazio
if df.empty:
logging.warning("[DEBUG] DataFrame vazio, não é possível preparar dados")
return df
# Fazer uma cópia para não modificar o original
prepared_df = df.copy()
# Analisar o DataFrame
analysis = analyze_dataframe(prepared_df)
logging.info(f"[DEBUG] Análise do DataFrame: {analysis}")
# Converter colunas de data para datetime se existirem
for col in prepared_df.columns:
if col in analysis["date_cols"] and not pd.api.types.is_datetime64_any_dtype(prepared_df[col]):
try:
prepared_df[col] = pd.to_datetime(prepared_df[col])
logging.info(f"[DEBUG] Convertida coluna {col} para datetime")
except:
logging.warning(f"[DEBUG] Não foi possível converter coluna {col} para datetime")
# Preparação específica para cada tipo de gráfico
if graph_type == 'line_simple':
# Para gráfico de linha simples, precisamos de uma coluna para o eixo x (preferencialmente data) e uma coluna numérica
if analysis["time_series"] and len(analysis["numeric_cols"]) > 0:
# Usar a primeira coluna de data como x e a primeira coluna numérica como y
x_col = analysis["date_cols"][0]
y_col = analysis["numeric_cols"][0]
# Ordenar por data
prepared_df = prepared_df.sort_values(by=x_col)
logging.info(f"[DEBUG] Dados ordenados pela coluna de data {x_col}")
# Selecionar apenas as colunas necessárias
prepared_df = prepared_df[[x_col, y_col]]
elif len(analysis["categorical_cols"]) > 0 and len(analysis["numeric_cols"]) > 0:
# Se não tiver data, usar a primeira coluna categórica como x e a primeira numérica como y
x_col = analysis["categorical_cols"][0]
y_col = analysis["numeric_cols"][0]
# Ordenar por valor numérico para melhor visualização
prepared_df = prepared_df.sort_values(by=y_col)
logging.info(f"[DEBUG] Dados ordenados pela coluna numérica {y_col}")
# Selecionar apenas as colunas necessárias
prepared_df = prepared_df[[x_col, y_col]]
elif graph_type == 'multiline':
# Para gráfico de multilinhas, precisamos de uma coluna para o eixo x e múltiplas colunas numéricas
if analysis["time_series"] and analysis["multi_numeric"]:
# Usar a primeira coluna de data como x e todas as colunas numéricas
x_col = analysis["date_cols"][0]
# Ordenar por data
prepared_df = prepared_df.sort_values(by=x_col)
logging.info(f"[DEBUG] Dados ordenados pela coluna de data {x_col}")
# Selecionar apenas as colunas necessárias (data + todas numéricas)
cols_to_keep = [x_col] + analysis["numeric_cols"]
prepared_df = prepared_df[cols_to_keep]
elif len(analysis["categorical_cols"]) > 0 and analysis["multi_numeric"]:
# Se não tiver data, usar a primeira coluna categórica como x e todas as numéricas
x_col = analysis["categorical_cols"][0]
# Selecionar apenas as colunas necessárias (categórica + todas numéricas)
cols_to_keep = [x_col] + analysis["numeric_cols"]
prepared_df = prepared_df[cols_to_keep]
elif len(analysis["categorical_cols"]) >= 2 and len(analysis["numeric_cols"]) > 0:
# Se tiver duas colunas categóricas, podemos usar uma para agrupar
cat1 = analysis["categorical_cols"][0]
cat2 = analysis["categorical_cols"][1]
val_col = analysis["numeric_cols"][0]
# Criar um pivot para multilinhas
try:
pivot_df = prepared_df.pivot_table(index=cat1, columns=cat2, values=val_col, aggfunc='mean')
prepared_df = pivot_df.reset_index()
logging.info(f"[DEBUG] Criado pivot com índice={cat1}, colunas={cat2}, valores={val_col}")
except Exception as e:
logging.error(f"[ERRO] Falha ao criar pivot para multilinhas: {e}")
elif graph_type == 'area':
# Similar ao gráfico de linha simples
if analysis["time_series"] and len(analysis["numeric_cols"]) > 0:
x_col = analysis["date_cols"][0]
y_col = analysis["numeric_cols"][0]
# Ordenar por data
prepared_df = prepared_df.sort_values(by=x_col)
# Selecionar apenas as colunas necessárias
prepared_df = prepared_df[[x_col, y_col]]
elif len(analysis["categorical_cols"]) > 0 and len(analysis["numeric_cols"]) > 0:
x_col = analysis["categorical_cols"][0]
y_col = analysis["numeric_cols"][0]
# Ordenar por valor numérico
prepared_df = prepared_df.sort_values(by=y_col)
# Selecionar apenas as colunas necessárias
prepared_df = prepared_df[[x_col, y_col]]
elif graph_type in ['bar_vertical', 'bar_horizontal']:
# Para gráficos de barras, precisamos de uma coluna categórica e uma numérica
if len(analysis["categorical_cols"]) > 0 and len(analysis["numeric_cols"]) > 0:
x_col = analysis["categorical_cols"][0]
y_col = analysis["numeric_cols"][0]
# Ordenar por valor numérico em ordem decrescente
prepared_df = prepared_df.sort_values(by=y_col, ascending=False)
logging.info(f"[DEBUG] Dados ordenados pela coluna {y_col} em ordem decrescente")
# Limitar o número de categorias se for muito grande
if len(prepared_df) > 15 and graph_type == 'bar_vertical':
logging.info(f"[DEBUG] Limitando dados para gráfico de barras verticais (de {len(prepared_df)} para 15 linhas)")
prepared_df = prepared_df.head(15)
# Selecionar apenas as colunas necessárias
prepared_df = prepared_df[[x_col, y_col]]
elif graph_type == 'bar_grouped':
# Para barras agrupadas, precisamos de uma coluna categórica e múltiplas numéricas
if len(analysis["categorical_cols"]) > 0 and analysis["multi_numeric"]:
x_col = analysis["categorical_cols"][0]
# Limitar o número de categorias
if len(prepared_df) > 10:
logging.info(f"[DEBUG] Limitando dados para gráfico de barras agrupadas (de {len(prepared_df)} para 10 linhas)")
prepared_df = prepared_df.head(10)
# Selecionar apenas as colunas necessárias (categórica + todas numéricas)
cols_to_keep = [x_col] + analysis["numeric_cols"]
prepared_df = prepared_df[cols_to_keep]
elif len(analysis["categorical_cols"]) >= 2 and len(analysis["numeric_cols"]) > 0:
# Se tiver duas colunas categóricas, podemos usar uma para agrupar
cat1 = analysis["categorical_cols"][0]
cat2 = analysis["categorical_cols"][1]
val_col = analysis["numeric_cols"][0]
# Limitar o número de categorias
unique_cat1 = prepared_df[cat1].nunique()
unique_cat2 = prepared_df[cat2].nunique()
if unique_cat1 > 10 or unique_cat2 > 10:
logging.info(f"[DEBUG] Muitas categorias para barras agrupadas, limitando dados")
# Pegar as categorias mais frequentes
top_cat1 = prepared_df[cat1].value_counts().head(10).index.tolist()
top_cat2 = prepared_df[cat2].value_counts().head(10).index.tolist()
prepared_df = prepared_df[
prepared_df[cat1].isin(top_cat1) &
prepared_df[cat2].isin(top_cat2)
]
# Criar um pivot para barras agrupadas
try:
pivot_df = prepared_df.pivot_table(index=cat1, columns=cat2, values=val_col, aggfunc='mean')
prepared_df = pivot_df.reset_index()
logging.info(f"[DEBUG] Criado pivot com índice={cat1}, colunas={cat2}, valores={val_col}")
except Exception as e:
logging.error(f"[ERRO] Falha ao criar pivot para barras agrupadas: {e}")
elif graph_type == 'bar_stacked':
# Similar ao bar_grouped, mas para mostrar partes de um todo
if len(analysis["categorical_cols"]) >= 2 and len(analysis["numeric_cols"]) > 0:
cat1 = analysis["categorical_cols"][0]
cat2 = analysis["categorical_cols"][1]
val_col = analysis["numeric_cols"][0]
# Limitar o número de categorias
unique_cat1 = prepared_df[cat1].nunique()
unique_cat2 = prepared_df[cat2].nunique()
if unique_cat1 > 10 or unique_cat2 > 10:
logging.info(f"[DEBUG] Muitas categorias para barras empilhadas, limitando dados")
# Pegar as categorias mais frequentes
top_cat1 = prepared_df[cat1].value_counts().head(10).index.tolist()
top_cat2 = prepared_df[cat2].value_counts().head(10).index.tolist()
prepared_df = prepared_df[
prepared_df[cat1].isin(top_cat1) &
prepared_df[cat2].isin(top_cat2)
]
# Criar um pivot para barras empilhadas
try:
pivot_df = prepared_df.pivot_table(index=cat1, columns=cat2, values=val_col, aggfunc='mean')
prepared_df = pivot_df.reset_index()
logging.info(f"[DEBUG] Criado pivot com índice={cat1}, colunas={cat2}, valores={val_col}")
except Exception as e:
logging.error(f"[ERRO] Falha ao criar pivot para barras empilhadas: {e}")
elif len(analysis["categorical_cols"]) > 0 and analysis["multi_numeric"]:
# Se tiver uma coluna categórica e múltiplas numéricas
x_col = analysis["categorical_cols"][0]
# Limitar o número de categorias
if len(prepared_df) > 10:
logging.info(f"[DEBUG] Limitando dados para gráfico de barras empilhadas (de {len(prepared_df)} para 10 linhas)")
prepared_df = prepared_df.head(10)
# Selecionar apenas as colunas necessárias (categórica + todas numéricas)
cols_to_keep = [x_col] + analysis["numeric_cols"]
prepared_df = prepared_df[cols_to_keep]
elif graph_type in ['pie', 'donut']:
# Para gráficos de pizza/donut, precisamos de uma coluna categórica e uma numérica
if len(analysis["categorical_cols"]) > 0 and len(analysis["numeric_cols"]) > 0:
cat_col = analysis["categorical_cols"][0]
val_col = analysis["numeric_cols"][0]
# Limitar o número de categorias para no máximo 10
if prepared_df[cat_col].nunique() > 10:
logging.info(f"[DEBUG] Limitando categorias para gráfico de pizza/donut")
# Agrupar por categoria e somar valores
grouped = prepared_df.groupby(cat_col)[val_col].sum().reset_index()
# Ordenar por valor e pegar as 9 maiores categorias
grouped = grouped.sort_values(by=val_col, ascending=False)
top_9 = grouped.head(9)
# Agrupar o resto como "Outros"
if len(grouped) > 9:
others_sum = grouped.iloc[9:][val_col].sum()
others_row = pd.DataFrame({cat_col: ['Outros'], val_col: [others_sum]})
prepared_df = pd.concat([top_9, others_row], ignore_index=True)
logging.info(f"[DEBUG] Criada categoria 'Outros' com valor {others_sum}")
else:
prepared_df = top_9
else:
# Agrupar por categoria e somar valores
prepared_df = prepared_df.groupby(cat_col)[val_col].sum().reset_index()
# Ordenar por valor em ordem decrescente
prepared_df = prepared_df.sort_values(by=val_col, ascending=False)
# Selecionar apenas as colunas necessárias
prepared_df = prepared_df[[cat_col, val_col]]
elif graph_type == 'pie_multiple':
# Para múltiplos gráficos de pizza, precisamos de duas colunas categóricas e uma numérica
if len(analysis["categorical_cols"]) >= 2 and len(analysis["numeric_cols"]) > 0:
cat1 = analysis["categorical_cols"][0]
cat2 = analysis["categorical_cols"][1]
val_col = analysis["numeric_cols"][0]
# Limitar o número de categorias
unique_cat1 = prepared_df[cat1].nunique()
unique_cat2 = prepared_df[cat2].nunique()
if unique_cat1 > 6 or unique_cat2 > 10:
logging.info(f"[DEBUG] Muitas categorias para múltiplos gráficos de pizza, limitando dados")
# Pegar as categorias mais frequentes
top_cat1 = prepared_df[cat1].value_counts().head(6).index.tolist()
top_cat2 = prepared_df[cat2].value_counts().head(10).index.tolist()
prepared_df = prepared_df[
prepared_df[cat1].isin(top_cat1) &
prepared_df[cat2].isin(top_cat2)
]
# Agrupar e somar valores
prepared_df = prepared_df.groupby([cat1, cat2])[val_col].sum().reset_index()
# Verificar se há palavras-chave na pergunta do usuário que possam ajudar a identificar colunas importantes
if user_query:
user_query_lower = user_query.lower()
# Procurar por colunas mencionadas na pergunta
for col in df.columns:
col_lower = col.lower()
# Verificar se o nome da coluna (ou parte dele) está na pergunta
if col_lower in user_query_lower or any(part in user_query_lower for part in col_lower.split('_')):
logging.info(f"[DEBUG] Coluna {col} mencionada na pergunta do usuário")
# Se for uma coluna numérica e o gráfico for de barras, linha ou área, priorizar esta coluna
if col in analysis["numeric_cols"] and graph_type in ['bar_vertical', 'bar_horizontal', 'line_simple', 'area']:
if len(analysis["categorical_cols"]) > 0:
cat_col = analysis["categorical_cols"][0]
prepared_df = df[[cat_col, col]].copy()
prepared_df = prepared_df.sort_values(by=col, ascending=False)
logging.info(f"[DEBUG] Priorizando coluna {col} mencionada na pergunta")
break
logging.info(f"[DEBUG] Dados preparados: {len(prepared_df)} linhas, colunas: {prepared_df.columns.tolist()}")
return prepared_df
def generate_graph(df, graph_type, title=None, user_query=None):
"""Gera um gráfico com base no DataFrame e tipo especificado."""
logging.info(f"[DEBUG] Iniciando geração de gráfico tipo {graph_type}. DataFrame vazio? {df.empty}")
if df.empty:
logging.info("[DEBUG] DataFrame vazio, não é possível gerar gráfico")
return None
# Preparar dados para o gráfico
prepared_df = prepare_data_for_graph(df, graph_type, user_query)
if prepared_df.empty:
logging.info("[DEBUG] DataFrame preparado está vazio, não é possível gerar gráfico")
return None
try:
# Analisar o DataFrame preparado
analysis = analyze_dataframe(prepared_df)
# Configurações gerais para todos os gráficos
plt.figure(figsize=(12, 8))
plt.title(title or "Visualização de Dados", fontsize=14)
# Definir cores atraentes
colors = plt.cm.tab10.colors
# Gerar gráfico com base no tipo
if graph_type == 'line_simple':
if len(prepared_df.columns) >= 2:
x_col = prepared_df.columns[0]
y_col = prepared_df.columns[1]
logging.info(f"[DEBUG] Criando gráfico de linha simples com x={x_col}, y={y_col}")
# Verificar se x é data
is_date = pd.api.types.is_datetime64_any_dtype(prepared_df[x_col])
# Criar gráfico de linha
plt.figure(figsize=(12, 6))
if is_date:
# Formatar eixo x para datas
plt.plot(prepared_df[x_col], prepared_df[y_col], marker='o', linewidth=2, color=colors[0])
plt.gcf().autofmt_xdate()
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%d/%m/%Y'))
else:
plt.plot(range(len(prepared_df)), prepared_df[y_col], marker='o', linewidth=2, color=colors[0])
plt.xticks(range(len(prepared_df)), prepared_df[x_col], rotation=45, ha='right')
# Adicionar rótulos nos pontos
for i, y in enumerate(prepared_df[y_col]):
if isinstance(y, (int, float)):
plt.annotate(f'{y:.2f}', (i, y), textcoords="offset points",
xytext=(0, 5), ha='center', fontsize=9)
plt.xlabel(x_col)
plt.ylabel(y_col)
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
else:
logging.warning("[DEBUG] Dados insuficientes para gráfico de linha simples")
return None
elif graph_type == 'multiline':
# Verificar se temos um DataFrame pivotado (primeira coluna categórica, resto numéricas)
if len(prepared_df.columns) >= 2:
x_col = prepared_df.columns[0]
# Verificar se x é data
is_date = pd.api.types.is_datetime64_any_dtype(prepared_df[x_col])
# Obter colunas numéricas (todas exceto a primeira)
y_cols = [col for col in prepared_df.columns[1:] if pd.api.types.is_numeric_dtype(prepared_df[col])]
if y_cols:
logging.info(f"[DEBUG] Criando gráfico de multilinhas com x={x_col}, y={y_cols}")
plt.figure(figsize=(12, 6))
# Plotar cada linha
for i, y_col in enumerate(y_cols):
if is_date:
plt.plot(prepared_df[x_col], prepared_df[y_col], marker='o', linewidth=2,
label=y_col, color=colors[i % len(colors)])
else:
plt.plot(range(len(prepared_df)), prepared_df[y_col], marker='o', linewidth=2,
label=y_col, color=colors[i % len(colors)])
if is_date:
plt.gcf().autofmt_xdate()
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%d/%m/%Y'))
else:
plt.xticks(range(len(prepared_df)), prepared_df[x_col], rotation=45, ha='right')
plt.xlabel(x_col)
plt.ylabel("Valores")
plt.legend(title="Séries", loc='best')
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
else:
logging.warning("[DEBUG] Não há colunas numéricas para gráfico de multilinhas")
# Fallback para linha simples
return generate_graph(df, 'line_simple', title, user_query)
else:
logging.warning("[DEBUG] Dados insuficientes para gráfico de multilinhas")
# Fallback para linha simples
return generate_graph(df, 'line_simple', title, user_query)
elif graph_type == 'area':
if len(prepared_df.columns) >= 2:
x_col = prepared_df.columns[0]
y_col = prepared_df.columns[1]
logging.info(f"[DEBUG] Criando gráfico de área com x={x_col}, y={y_col}")
# Verificar se x é data
is_date = pd.api.types.is_datetime64_any_dtype(prepared_df[x_col])
plt.figure(figsize=(12, 6))
if is_date:
# Formatar eixo x para datas
plt.fill_between(prepared_df[x_col], prepared_df[y_col], alpha=0.5, color=colors[0])
plt.plot(prepared_df[x_col], prepared_df[y_col], color=colors[0], linewidth=2)
plt.gcf().autofmt_xdate()
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%d/%m/%Y'))
else:
plt.fill_between(range(len(prepared_df)), prepared_df[y_col], alpha=0.5, color=colors[0])
plt.plot(range(len(prepared_df)), prepared_df[y_col], color=colors[0], linewidth=2)
plt.xticks(range(len(prepared_df)), prepared_df[x_col], rotation=45, ha='right')
plt.xlabel(x_col)
plt.ylabel(y_col)
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
else:
logging.warning("[DEBUG] Dados insuficientes para gráfico de área")
# Fallback para linha simples
return generate_graph(df, 'line_simple', title, user_query)
elif graph_type == 'bar_vertical':
if len(prepared_df.columns) >= 2:
x_col = prepared_df.columns[0]
y_col = prepared_df.columns[1]
logging.info(f"[DEBUG] Criando gráfico de barras verticais com x={x_col}, y={y_col}")
plt.figure(figsize=(12, 8))
bars = plt.bar(range(len(prepared_df)), prepared_df[y_col], color=colors[0])
# Adicionar valores nas barras
for i, bar in enumerate(bars):
height = bar.get_height()
if isinstance(height, (int, float)):
plt.text(bar.get_x() + bar.get_width()/2., height + 0.02 * max(prepared_df[y_col]),
f'{height:.2f}', ha='center', fontsize=9)
plt.xlabel(x_col)
plt.ylabel(y_col)
plt.xticks(range(len(prepared_df)), prepared_df[x_col], rotation=45, ha='right')
plt.grid(True, linestyle='--', alpha=0.7, axis='y')
plt.tight_layout()
else:
logging.warning("[DEBUG] Dados insuficientes para gráfico de barras verticais")
return None
elif graph_type == 'bar_horizontal':
if len(prepared_df.columns) >= 2:
x_col = prepared_df.columns[0]
y_col = prepared_df.columns[1]
logging.info(f"[DEBUG] Criando gráfico de barras horizontais com x={x_col}, y={y_col}")
plt.figure(figsize=(12, max(6, len(prepared_df) * 0.4))) # Ajustar altura com base no número de categorias
bars = plt.barh(range(len(prepared_df)), prepared_df[y_col], color=colors[0])
# Adicionar valores nas barras
for i, bar in enumerate(bars):
width = bar.get_width()
if isinstance(width, (int, float)):
plt.text(width + 0.02 * max(prepared_df[y_col]), bar.get_y() + bar.get_height()/2.,
f'{width:.2f}', va='center', fontsize=9)
plt.xlabel(y_col)
plt.ylabel(x_col)
plt.yticks(range(len(prepared_df)), prepared_df[x_col])
plt.grid(True, linestyle='--', alpha=0.7, axis='x')
plt.tight_layout()
else:
logging.warning("[DEBUG] Dados insuficientes para gráfico de barras horizontais")
return None
elif graph_type == 'bar_grouped':
# Verificar se temos um DataFrame pivotado (primeira coluna categórica, resto numéricas)
if len(prepared_df.columns) >= 3:
x_col = prepared_df.columns[0]
# Obter colunas numéricas (todas exceto a primeira)
y_cols = [col for col in prepared_df.columns[1:] if pd.api.types.is_numeric_dtype(prepared_df[col])]
if y_cols:
logging.info(f"[DEBUG] Criando gráfico de barras agrupadas com x={x_col}, valores={y_cols}")
# Configurar largura e posições das barras
x = np.arange(len(prepared_df))
width = 0.8 / len(y_cols)
fig, ax = plt.subplots(figsize=(12, 8))
# Plotar cada grupo de barras
for i, col in enumerate(y_cols):
offset = width * i - width * (len(y_cols) - 1) / 2
bars = ax.bar(x + offset, prepared_df[col], width, label=col)
# Adicionar valores nas barras
for bar in bars:
height = bar.get_height()
if isinstance(height, (int, float)) and not np.isnan(height):
ax.text(bar.get_x() + bar.get_width()/2., height + 0.02 * prepared_df[y_cols].max().max(),
f'{height:.2f}', ha='center', fontsize=8)
ax.set_xlabel(x_col)
ax.set_ylabel('Valores')
ax.set_xticks(x)
ax.set_xticklabels(prepared_df[x_col], rotation=45, ha='right')
ax.legend()
ax.grid(True, linestyle='--', alpha=0.7, axis='y')
plt.tight_layout()
else:
logging.warning("[DEBUG] Não há colunas numéricas suficientes para gráfico de barras agrupadas")
# Fallback para barras verticais
return generate_graph(df, 'bar_vertical', title, user_query)
else:
logging.warning("[DEBUG] Dados insuficientes para gráfico de barras agrupadas")
# Fallback para barras verticais
return generate_graph(df, 'bar_vertical', title, user_query)
elif graph_type == 'bar_stacked':
# Verificar se temos um DataFrame pivotado (primeira coluna categórica, resto numéricas)
if len(prepared_df.columns) >= 3:
x_col = prepared_df.columns[0]
# Obter colunas numéricas (todas exceto a primeira)
y_cols = [col for col in prepared_df.columns[1:] if pd.api.types.is_numeric_dtype(prepared_df[col])]
if y_cols:
logging.info(f"[DEBUG] Criando gráfico de barras empilhadas com x={x_col}, valores={y_cols}")
fig, ax = plt.subplots(figsize=(12, 8))
# Criar barras empilhadas
bottom = np.zeros(len(prepared_df))
for i, col in enumerate(y_cols):
bars = ax.bar(range(len(prepared_df)), prepared_df[col], bottom=bottom, label=col)
# Adicionar valores nas barras
for j, bar in enumerate(bars):
height = bar.get_height()
if isinstance(height, (int, float)) and height > 0:
ax.text(bar.get_x() + bar.get_width()/2., bottom[j] + height/2,
f'{height:.2f}', ha='center', va='center', fontsize=8, color='white')
bottom += prepared_df[col].fillna(0)
ax.set_xlabel(x_col)
ax.set_ylabel('Valores')
ax.set_xticks(range(len(prepared_df)))
ax.set_xticklabels(prepared_df[x_col], rotation=45, ha='right')
ax.legend()
plt.tight_layout()
else:
logging.warning("[DEBUG] Não há colunas numéricas suficientes para gráfico de barras empilhadas")
# Fallback para barras verticais
return generate_graph(df, 'bar_vertical', title, user_query)
else:
logging.warning("[DEBUG] Dados insuficientes para gráfico de barras empilhadas")
# Fallback para barras verticais
return generate_graph(df, 'bar_vertical', title, user_query)
elif graph_type == 'pie':
if len(prepared_df.columns) >= 2:
label_col = prepared_df.columns[0]
value_col = prepared_df.columns[1]
logging.info(f"[DEBUG] Criando gráfico de pizza com labels={label_col}, valores={value_col}")
# Verificar se os valores são numéricos
if pd.api.types.is_numeric_dtype(prepared_df[value_col]):
# Remover valores negativos ou zero
prepared_df = prepared_df[prepared_df[value_col] > 0]
if prepared_df.empty:
logging.warning("[DEBUG] Não há valores positivos para gráfico de pizza")
return None
plt.figure(figsize=(10, 10))
# Calcular percentuais para os rótulos
total = prepared_df[value_col].sum()
labels = [f'{label} ({val:.2f}, {val/total:.1%})' for label, val in zip(prepared_df[label_col], prepared_df[value_col])]
plt.pie(prepared_df[value_col], labels=labels, autopct='%1.1f%%',
startangle=90, shadow=False, colors=colors[:len(prepared_df)])
plt.axis('equal') # Equal aspect ratio ensures that pie is drawn as a circle
plt.title(title or f"Distribuição de {value_col} por {label_col}")
plt.tight_layout()
else:
logging.warning(f"[DEBUG] Coluna {value_col} não é numérica para gráfico de pizza")
# Fallback para barras verticais
return generate_graph(df, 'bar_vertical', title, user_query)
else:
logging.warning("[DEBUG] Dados insuficientes para gráfico de pizza")
# Fallback para barras verticais
return generate_graph(df, 'bar_vertical', title, user_query)
elif graph_type == 'donut':
if len(prepared_df.columns) >= 2:
label_col = prepared_df.columns[0]
value_col = prepared_df.columns[1]
logging.info(f"[DEBUG] Criando gráfico de donut com labels={label_col}, valores={value_col}")
# Verificar se os valores são numéricos
if pd.api.types.is_numeric_dtype(prepared_df[value_col]):
# Remover valores negativos ou zero
prepared_df = prepared_df[prepared_df[value_col] > 0]
if prepared_df.empty:
logging.warning("[DEBUG] Não há valores positivos para gráfico de donut")
return None
plt.figure(figsize=(10, 10))
# Calcular percentuais para os rótulos
total = prepared_df[value_col].sum()
labels = [f'{label} ({val:.2f}, {val/total:.1%})' for label, val in zip(prepared_df[label_col], prepared_df[value_col])]
# Criar gráfico de donut (pizza com círculo central)
plt.pie(prepared_df[value_col], labels=labels, autopct='%1.1f%%',
startangle=90, shadow=False, colors=colors[:len(prepared_df)],
wedgeprops=dict(width=0.5)) # Largura do anel
plt.axis('equal') # Equal aspect ratio ensures that pie is drawn as a circle
plt.title(title or f"Distribuição de {value_col} por {label_col}")
plt.tight_layout()
else:
logging.warning(f"[DEBUG] Coluna {value_col} não é numérica para gráfico de donut")
# Fallback para barras verticais
return generate_graph(df, 'bar_vertical', title, user_query)
else:
logging.warning("[DEBUG] Dados insuficientes para gráfico de donut")
# Fallback para barras verticais
return generate_graph(df, 'bar_vertical', title, user_query)
elif graph_type == 'pie_multiple':
# Verificar se temos dados agrupados por duas categorias
if len(prepared_df.columns) >= 3:
cat1 = prepared_df.columns[0]
cat2 = prepared_df.columns[1]
val_col = prepared_df.columns[2]
logging.info(f"[DEBUG] Criando múltiplos gráficos de pizza com grupo={cat1}, categorias={cat2}, valor={val_col}")
# Verificar se o valor é numérico
if pd.api.types.is_numeric_dtype(prepared_df[val_col]):
# Agrupar dados
grouped = prepared_df.groupby([cat1, cat2])[val_col].sum().unstack().fillna(0)
# Determinar layout da grade
n_groups = len(grouped)
if n_groups == 0:
logging.warning("[DEBUG] Não há grupos para múltiplos gráficos de pizza")
return None
cols = min(3, n_groups) # Máximo 3 colunas
rows = (n_groups + cols - 1) // cols # Arredondar para cima
# Criar subplots
fig, axes = plt.subplots(rows, cols, figsize=(15, 5 * rows))
if rows == 1 and cols == 1:
axes = np.array([axes]) # Garantir que axes seja um array
axes = axes.flatten()
# Plotar cada pizza
for i, (group_name, group_data) in enumerate(grouped.iterrows()):
if i < len(axes):
# Remover valores zero
data = group_data[group_data > 0]
if not data.empty:
# Calcular percentuais
total = data.sum()
# Criar rótulos com valores e percentuais
labels = [f'{idx} ({val:.2f}, {val/total:.1%})' for idx, val in data.items()]
# Plotar pizza
axes[i].pie(data, labels=labels, autopct='%1.1f%%',
startangle=90, colors=colors[:len(data)])
axes[i].set_title(f"{group_name}")
axes[i].axis('equal')
# Esconder eixos não utilizados
for j in range(i + 1, len(axes)):
axes[j].axis('off')
plt.suptitle(title or f"Distribuição de {val_col} por {cat2} para cada {cat1}", fontsize=16)
plt.tight_layout()
plt.subplots_adjust(top=0.9)
else:
logging.warning(f"[DEBUG] Coluna {val_col} não é numérica para múltiplos gráficos de pizza")
# Fallback para barras agrupadas
return generate_graph(df, 'bar_grouped', title, user_query)
else:
logging.warning("[DEBUG] Dados insuficientes para múltiplos gráficos de pizza")
# Fallback para pizza simples
return generate_graph(df, 'pie', title, user_query)
else:
# Tipo de gráfico não reconhecido, usar barras verticais como padrão
logging.warning(f"[DEBUG] Tipo de gráfico '{graph_type}' não reconhecido. Usando gráfico de barras verticais como padrão.")
return generate_graph(df, 'bar_vertical', title, user_query)
# Salvar o gráfico em um buffer
buf = io.BytesIO()
plt.savefig(buf, format='png', dpi=100, bbox_inches='tight')
buf.seek(0)
# Converter para imagem PIL
img = Image.open(buf)
logging.info(f"[DEBUG] Gráfico gerado com sucesso, tamanho: {img.size}")
return img
except Exception as e:
logging.error(f"[ERRO] Falha ao gerar gráfico: {str(e)}")
import traceback
logging.error(traceback.format_exc())
# Tentar fallback para barras verticais em caso de erro
try:
logging.info("[DEBUG] Tentando fallback para gráfico de barras verticais")
return generate_graph(df, 'bar_vertical', title, user_query)
except:
return None
def query_sql_agent(user_query):
try:
# Verificar cache
if user_query in query_cache:
print(f"[CACHE] Retornando resposta do cache para a consulta: {user_query}")
return query_cache[user_query]
# Verificar se é uma saudação
if is_greeting(user_query):
greeting_response = "Olá! Estou aqui para ajudar com suas consultas sobre clínicas, agendamentos e taxas de faltas. Posso também gerar gráficos se você solicitar. Pergunte algo relacionado aos dados carregados!"
query_cache[user_query] = greeting_response
return greeting_response
# Obter amostra do banco de dados
column_data = pd.read_sql_query("SELECT * FROM base_dados_clinicas LIMIT 10", engine)
# Obter instrução do Llama
llama_instruction = query_with_llama(user_query, column_data)
if not llama_instruction:
return "Erro: O modelo Llama não conseguiu gerar uma instrução válida."
# Extrair a query SQL da resposta do Llama
sql_query = extract_sql_query(llama_instruction)
if not sql_query:
return "Erro: Não foi possível extrair uma query SQL válida da resposta do modelo."
print("------- Agent SQL: Executando query -------")
response = sql_agent.invoke({"input": llama_instruction})
sql_response = response.get("output", "Erro ao obter a resposta do agente.")
# Verificar se deve gerar gráfico com base na resposta da LLM
if should_generate_graph(llama_instruction):
try:
# Corrigir a query SQL se necessário (substituir "tabela" por "base_dados_clinicas")
if "tabela" in sql_query:
sql_query = sql_query.replace("tabela", "base_dados_clinicas")
logging.info(f"[DEBUG] Query corrigida: {sql_query}")
# Executar a query SQL para obter os dados
logging.info(f"[DEBUG] Executando query para gráfico: {sql_query}")
result_df = pd.read_sql_query(sql_query, engine)
logging.info(f"[DEBUG] Resultado da query: {len(result_df)} linhas, colunas: {result_df.columns.tolist()}")
if not result_df.empty:
# Consultar a LLM para determinar o tipo de gráfico adequado
graph_type = get_graph_type_with_llm(user_query, sql_query, result_df)
# Gerar o gráfico
graph_title = f"Visualização para: {user_query}"
graph_img = generate_graph(result_df, graph_type, graph_title, user_query)
if graph_img:
# Salvar o gráfico temporariamente
graph_path = "temp_graph.png"
logging.info(f"[DEBUG] Salvando gráfico em: {graph_path}")
graph_img.save(graph_path)
# Verificar se o arquivo foi criado
if os.path.exists(graph_path):
logging.info(f"[DEBUG] Arquivo de gráfico criado com sucesso: {os.path.getsize(graph_path)} bytes")
else:
logging.error(f"[DEBUG] Falha ao criar arquivo de gráfico: {graph_path}")
# Adicionar informação sobre o gráfico na resposta
sql_response += f"\n\n[Gráfico gerado com base nos dados solicitados. O tipo de gráfico '{graph_type}' foi escolhido automaticamente com base na estrutura dos dados.]"
# Adicionar o caminho do gráfico à resposta (será processado pelo chatbot_response)
sql_response += f"\n\nGRAPH_PATH:{graph_path}"
else:
logging.error("[DEBUG] Falha ao gerar objeto de imagem do gráfico")
sql_response += "\n\nNão foi possível gerar um gráfico com os dados obtidos."
else:
logging.warning("[DEBUG] DataFrame vazio retornado pela query")
sql_response += "\n\nNão foi possível gerar um gráfico, pois a consulta não retornou dados."
except Exception as e:
logging.error(f"[ERRO] Falha ao gerar gráfico: {e}")
sql_response += f"\n\nOcorreu um erro ao tentar gerar o gráfico: {str(e)}"
# Refinar resposta se modo avançado estiver ativado
if advanced_mode_enabled:
sql_response = refine_response_with_llm(user_query, sql_response)
# Armazenar no cache
query_cache[user_query] = sql_response
return sql_response
except Exception as e:
logging.error(f"[ERRO] Falha ao consultar o agente SQL: {e}")
return f"Erro ao consultar o agente SQL: {e}"
def chatbot_response(user_input):
start_time = time.time()
response = query_sql_agent(user_input)
end_time = time.time()
# Verificar se a resposta contém um caminho de gráfico
graph_path = None
if "GRAPH_PATH:" in response:
parts = response.split("GRAPH_PATH:")
response = parts[0].strip()
graph_path = parts[1].strip()
logging.info(f"[DEBUG] Caminho do gráfico extraído: {graph_path}")
# Verificar se o arquivo existe
if os.path.exists(graph_path):
logging.info(f"[DEBUG] Arquivo de gráfico encontrado: {os.path.getsize(graph_path)} bytes")
else:
logging.error(f"[DEBUG] Arquivo de gráfico não encontrado: {graph_path}")
graph_path = None
history_log.append({
"Pergunta": user_input,
"Resposta": response,
"Tempo de Resposta (s)": round(end_time - start_time, 2),
"Gráfico": bool(graph_path)
})
recent_history.append({"role": "user", "content": user_input})
recent_history.append({"role": "assistant", "content": response})
if len(recent_history) > 4:
recent_history.pop(0)
recent_history.pop(0)
return response, graph_path
def toggle_history():
global show_history_flag
show_history_flag = not show_history_flag
return history_log if show_history_flag else {}
def toggle_advanced_mode(state):
global advanced_mode_enabled
advanced_mode_enabled = state
logging.info(f"[MODO AVANÇADO] {'Ativado' if state else 'Desativado'}")
return "Modo avançado ativado." if state else "Modo avançado desativado."
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# Assistente de Clínicas")
chatbot = gr.Chatbot(height=600, type="messages")
msg = gr.Textbox(placeholder="Digite sua pergunta aqui...", label=" ", lines=1)
with gr.Row():
btn = gr.Button("Enviar", variant="primary")
history_btn = gr.Button("Histórico", variant="secondary")
advanced_toggle = gr.Checkbox(label="MODO AVANÇADO", value=False)
graph_output = gr.Image(label="Visualização de Dados", visible=False)
def respond(message, chat_history):
response, graph_path = chatbot_response(message)
chat_history.append({"role": "user", "content": message})
chat_history.append({"role": "assistant", "content": response})
# Retornar o gráfico se existir
if graph_path and os.path.exists(graph_path):
logging.info(f"[DEBUG] Exibindo gráfico: {graph_path}")
return "", chat_history, gr.update(value=graph_path, visible=True)
else:
logging.info("[DEBUG] Nenhum gráfico para exibir")
return "", chat_history, gr.update(visible=False)
msg.submit(respond, [msg, chatbot], [msg, chatbot, graph_output])
btn.click(respond, [msg, chatbot], [msg, chatbot, graph_output])
advanced_toggle.change(toggle_advanced_mode, inputs=[advanced_toggle], outputs=[])
history_output = gr.JSON()
history_btn.click(toggle_history, inputs=[], outputs=history_output)
if __name__ == "__main__":
# Verificar permissões de diretório
current_dir = os.getcwd()
logging.info(f"[DEBUG] Diretório atual: {current_dir}")
logging.info(f"[DEBUG] Permissões de escrita: {os.access(current_dir, os.W_OK)}")
demo.launch(share=False)