Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import json | |
| import pandas as pd | |
| import numpy as np | |
| from sqlalchemy import create_engine | |
| from sqlalchemy.types import DateTime, Float, Integer, Boolean, String | |
| from langchain_openai import ChatOpenAI | |
| from langchain_community.agent_toolkits import create_sql_agent | |
| from langchain_community.utilities import SQLDatabase | |
| from huggingface_hub import InferenceClient | |
| import gradio as gr | |
| from dotenv import load_dotenv | |
| import logging | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| import io | |
| import re | |
| from PIL import Image | |
| import matplotlib.dates as mdates | |
| load_dotenv() | |
| CSV_FILE_PATH = "clinicas.csv" | |
| SQL_DB_PATH = "clinicas.db" | |
| HUGGINGFACE_API_KEY = os.getenv("HUGGINGFACE_API_KEY") | |
| OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
| LLAMA_MODEL = "meta-llama/Llama-3.3-70B-Instruct" | |
| hf_client = InferenceClient(provider="together", api_key=HUGGINGFACE_API_KEY) | |
| os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY | |
| query_cache = {} | |
| history_log = [] | |
| recent_history = [] | |
| show_history_flag = False | |
| advanced_mode_enabled = False | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| # Configurar matplotlib para suporte a caracteres especiais | |
| plt.rcParams['font.family'] = 'DejaVu Sans' | |
| def create_or_load_sql_database(csv_path, sql_db_path): | |
| if os.path.exists(sql_db_path): | |
| print("🟢 Banco de dados SQL já existe. Carregando...") | |
| return create_engine(f"sqlite:///{sql_db_path}") | |
| print("🔧 Banco de dados SQL não encontrado. Iniciando criação...") | |
| # --- ETAPA 1: DETECÇÃO DE ENCODING --- | |
| print("🔍 Detectando encoding...") | |
| possible_encodings = ['utf-8', 'latin1', 'cp1252'] | |
| encoding = None | |
| for enc in possible_encodings: | |
| try: | |
| with open(csv_path, 'r', encoding=enc) as f: | |
| f.read(1000) | |
| encoding = enc | |
| print(f"✅ Encoding detectado: {encoding}") | |
| break | |
| except UnicodeDecodeError: | |
| continue | |
| if encoding is None: | |
| encoding = 'utf-8' | |
| print("⚠️ Não foi possível detectar encoding. Usando UTF-8 como fallback.") | |
| # --- ETAPA 2: DETECÇÃO DE DELIMITADOR --- | |
| print("🔍 Detectando delimitador...") | |
| detected_delimiter = None | |
| with open(csv_path, 'r', encoding=encoding, errors="replace") as f: | |
| sample = f.read(5000) | |
| try: | |
| sniffer = csv.Sniffer() | |
| dialect = sniffer.sniff(sample, delimiters=";,|\t:") | |
| detected_delimiter = dialect.delimiter | |
| print(f"✅ Delimitador detectado: '{detected_delimiter}'") | |
| except Exception: | |
| print("⚠️ Falha na detecção. Usando ';' como fallback.") | |
| detected_delimiter = ";" | |
| # --- ETAPA 3: TRANSFORMAÇÃO DE DELIMITADOR (se necessário) --- | |
| if detected_delimiter != ";": | |
| print(f"🔄 Convertendo CSV de delimitador '{detected_delimiter}' para ';'") | |
| with open(csv_path, 'r', encoding=encoding, errors="replace") as infile: | |
| reader = csv.reader(infile, delimiter=detected_delimiter) | |
| temp_fd, temp_path = tempfile.mkstemp(suffix=".csv") | |
| os.close(temp_fd) | |
| with open(temp_path, 'w', encoding='utf-8', newline='') as outfile: | |
| writer = csv.writer(outfile, delimiter=";") | |
| for row in reader: | |
| writer.writerow(row) | |
| csv_path = temp_path | |
| encoding = 'utf-8' | |
| print(f"✅ Novo CSV salvo temporariamente: {csv_path}") | |
| else: | |
| print("✅ Delimitador já é ';', não é necessária conversão.") | |
| # --- ETAPA 4: LEITURA DO CSV --- | |
| print("📥 Lendo CSV com delimitador padronizado ';'...") | |
| df = pd.read_csv( | |
| csv_path, | |
| sep=";", | |
| encoding=encoding, | |
| on_bad_lines="skip", | |
| low_memory=False, | |
| dtype=str | |
| ) | |
| # --- ETAPA 5: LIMPEZA E INFERÊNCIA DE TIPOS --- | |
| df.columns = ( | |
| df.columns.astype(str) | |
| .str.strip() | |
| .str.replace(r"[^\w\s]", "_", regex=True) | |
| .str.replace(r"\s+", "_", regex=True) | |
| .str.replace(r"_+", "_", regex=True) | |
| .str.strip("_") | |
| ) | |
| print("🧹 Limpando e inferindo tipos das colunas...") | |
| inferred_sql_types = {} | |
| for col in df.columns: | |
| series = df[col].astype(str).str.strip() | |
| if any(keyword in col.lower() for keyword in ["data", "agendamento", "dt_", "horario"]): | |
| try: | |
| dt = pd.to_datetime(series, errors="coerce", dayfirst=True) | |
| if dt.notna().mean() > 0.6: | |
| df[col] = dt | |
| inferred_sql_types[col] = DateTime() | |
| print(f"🕒 {col}: detectado como DateTime") | |
| continue | |
| except: | |
| pass | |
| if series.str.lower().isin(["true", "false", "1", "0"]).mean() > 0.9: | |
| df[col] = series.str.lower().map({"true": True, "false": False, "1": True, "0": False}) | |
| inferred_sql_types[col] = Boolean() | |
| print(f"🔘 {col}: detectado como Boolean") | |
| continue | |
| numeric_series = pd.to_numeric(series.str.replace(",", "."), errors="coerce") | |
| numeric_ratio = numeric_series.notna().mean() | |
| if numeric_ratio > 0.9: | |
| if (numeric_series.dropna() == numeric_series.dropna().astype(int)).all(): | |
| df[col] = numeric_series.astype("Int64") | |
| inferred_sql_types[col] = Integer() | |
| print(f"🔢 {col}: detectado como Integer") | |
| else: | |
| df[col] = numeric_series.astype("float64") | |
| inferred_sql_types[col] = Float() | |
| print(f"📐 {col}: detectado como Float") | |
| else: | |
| df[col] = series.astype("string") | |
| inferred_sql_types[col] = String() | |
| print(f"🔤 {col}: definido como String") | |
| print("📦 Gerando SQLite...") | |
| engine = create_engine(f"sqlite:///{sql_db_path}") | |
| df.to_sql("base_dados_clinicas", engine, index=False, if_exists="replace", dtype=inferred_sql_types) | |
| print(f"✅ Banco de dados criado com sucesso em '{sql_db_path}' com {len(df)} linhas e {len(df.columns)} colunas.") | |
| return engine | |
| engine = create_or_load_sql_database(CSV_FILE_PATH, SQL_DB_PATH) | |
| db = SQLDatabase(engine=engine) | |
| llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.2) | |
| sql_agent = create_sql_agent(llm, db=db, agent_type="openai-tools", verbose=True, max_iterations=50, return_intermediate_steps=True) | |
| def generate_initial_context(db_sample): | |
| return ( | |
| f"Você é um assistente que gera queries SQL objetivas e eficientes. Sempre inclua LIMIT 20 nas queries. Aqui está o banco de dados:\n\n" | |
| f"Exemplos do banco de dados:\n{db_sample.head().to_string(index=False)}\n\n" | |
| "\n***IMPORTANTE***: Detecte automaticamente o idioma da pergunta do usuário e responda sempre no mesmo idioma." | |
| "\nEsta base contém dados de clínicas médicas e odontológicas, com informações sobre agendamentos, taxas de faltas e sugestões de overbooking.\n" | |
| "Cada linha representa um horário de agendamento em uma clínica para uma especialidade específica, incluindo recursos disponíveis, agendamentos confirmados e várias métricas relacionadas a faltas e overbooking.\n" | |
| "\nInformações importantes:\n" | |
| "- Use `LIKE '%<palavras-chave>%'` para buscas e comparações em colunas de texto.\n" | |
| "- Quando o usuário mencionar uma clínica, procure na coluna: `nome_estabelecimento`.\n" | |
| "- Quando o usuário mencionar uma especialidade, procure na coluna: `nome_especialidade`.\n" | |
| "- Para consultas relacionadas a datas, use a coluna `data_agendamento` no formato YYYY-MM-DD.\n" | |
| "- Para consultas relacionadas a horários, use a coluna `horario_agendamento` que contém valores inteiros (8, 9, 10, etc.).\n" | |
| "- As colunas com prefixo `taxa_media_falta` contêm percentuais de faltas em diferentes períodos.\n" | |
| "- As colunas com prefixo `sugestao_overbooking` contêm recomendações de overbooking em diferentes cenários.\n" | |
| "- Nunca altere ou abrevia a pergunta do usuário.\n" | |
| "- Se o usuário solicitar um gráfico, gere uma query SQL que retorne dados apropriados para visualização.\n" | |
| "- Você só pode incluir o gráfico se for solicitado explicitamente na pergunta do usuário. \n" | |
| "- Se não tiver a palavra gráfico na pergunta do usuário não é para gerar nenhum gráfico, jamais." | |
| "- Você está usando um banco de dados SQLite.\n" | |
| "- O nome da tabela é 'base_dados_clinicas', sempre inclua o nome correto nas Querys.\n" | |
| "\nRetorne apenas a pergunta e a query SQL mais eficiente para entregar ao agent SQL do LangChain para gerar uma resposta para a pergunta. O formato deve ser:\n" | |
| "\nPergunta: <pergunta do usuário>\n" | |
| "\nOpção de Query SQL:\n<query SQL>" | |
| "\nIdioma: <idioma>" | |
| "\nGráfico: <sim/não>" | |
| ) | |
| def generate_graph_type_context(user_query, sql_query, df_columns, df_sample): | |
| # Criar uma descrição dos dados para ajudar a LLM a entender melhor a estrutura | |
| data_description = "" | |
| if not df_sample.empty: | |
| # Verificar tipos de dados | |
| numeric_cols = df_sample.select_dtypes(include=['number']).columns.tolist() | |
| date_cols = [col for col in df_sample.columns if 'data' in col.lower() or df_sample[col].dtype == 'datetime64[ns]'] | |
| categorical_cols = df_sample.select_dtypes(include=['object']).columns.tolist() | |
| # Adicionar informações sobre os primeiros valores de cada coluna | |
| data_description = "\nAmostra dos dados:\n" | |
| data_description += df_sample.head(3).to_string() | |
| # Adicionar informações sobre os tipos de dados | |
| data_description += "\n\nTipos de colunas:" | |
| if numeric_cols: | |
| data_description += f"\n- Colunas numéricas: {', '.join(numeric_cols)}" | |
| if date_cols: | |
| data_description += f"\n- Colunas de data/tempo: {', '.join(date_cols)}" | |
| if categorical_cols: | |
| data_description += f"\n- Colunas categóricas: {', '.join(categorical_cols)}" | |
| return ( | |
| f"Você é um especialista em visualização de dados que escolhe o tipo de gráfico mais adequado para representar dados.\n\n" | |
| f"Pergunta do usuário: {user_query}\n\n" | |
| f"Query SQL gerada:\n{sql_query}\n\n" | |
| f"Colunas retornadas pela query: {', '.join(df_columns)}\n" | |
| f"{data_description}\n\n" | |
| "Escolha o tipo de gráfico mais adequado para visualizar esses dados. Considere os seguintes tipos de gráficos e suas aplicações:\n\n" | |
| "1. Linha Simples → Ideal para mostrar tendências ao longo do tempo ou sequências. Use quando tiver uma coluna de data/tempo/sequência e uma coluna numérica.\n" | |
| "2. Multilinhas → Ideal para comparar tendências de diferentes categorias ao longo do tempo. Use quando tiver uma coluna de data/tempo e múltiplas colunas numéricas, ou quando tiver uma coluna categórica que pode ser usada para agrupar os dados.\n" | |
| "3. Área → Similar ao gráfico de linha, mas com área preenchida abaixo da linha. Ideal para mostrar volume ao longo do tempo. Use quando tiver uma coluna de data/tempo e uma coluna numérica.\n" | |
| "4. Barras Verticais → Ideal para comparar valores entre diferentes categorias. Use quando tiver uma coluna categórica e uma coluna numérica.\n" | |
| "5. Barras Horizontais → Similar às barras verticais, mas melhor quando há muitas categorias ou nomes longos. Use quando tiver uma coluna categórica e uma coluna numérica.\n" | |
| "6. Barras Agrupadas → Ideal para comparar valores de múltiplas categorias. Use quando tiver uma coluna categórica e múltiplas colunas numéricas para comparação.\n" | |
| "7. Barras Empilhadas → Ideal para mostrar partes de um todo por categoria. Use quando tiver uma coluna categórica e múltiplas colunas numéricas que representam partes de um todo.\n" | |
| "8. Pizza Simples → Ideal para mostrar proporções de um todo. Use quando tiver uma coluna categórica e uma coluna numérica, com poucas categorias (máximo 7).\n" | |
| "9. Dona → Similar ao gráfico de pizza, mas com um espaço no centro. Melhor para visualizar proporções quando há muitas categorias.\n" | |
| "10. Pizzas Múltiplas → Ideal para comparar proporções entre diferentes grupos. Use quando tiver duas colunas categóricas e uma coluna numérica.\n\n" | |
| "Analise cuidadosamente a pergunta do usuário e os dados retornados. Escolha o tipo de gráfico que melhor representa a informação que o usuário está buscando.\n\n" | |
| "Responda apenas com o número do tipo de gráfico mais adequado (1-10). Não inclua explicações ou texto adicional." | |
| ) | |
| def is_greeting(user_query): | |
| greetings = ["olá", "oi", "bom dia", "boa tarde", "boa noite", "oi, tudo bem?"] | |
| return user_query.lower().strip() in greetings | |
| def query_with_llama(user_query, db_sample): | |
| initial_context = generate_initial_context(db_sample) | |
| formatted_history = "\n".join([ | |
| f"{msg['role'].capitalize()}: {msg['content']}" for msg in recent_history[-2:] | |
| ]) | |
| full_prompt = f"{initial_context}\n\nHistórico recente:\n{formatted_history}\n\nPergunta do usuário:\n{user_query}" | |
| logging.info(f"[DEBUG] Contexto enviado ao Llama:\n{full_prompt}\n") | |
| start_time = time.time() | |
| try: | |
| response = hf_client.chat.completions.create( | |
| model=LLAMA_MODEL, | |
| messages=[{"role": "system", "content": full_prompt}], | |
| max_tokens=900, | |
| stream=False | |
| ) | |
| llama_response = response["choices"][0]["message"]["content"] | |
| end_time = time.time() | |
| logging.info(f"[DEBUG] Resposta do Llama para o Agent SQL:\n{llama_response.strip()}\n[Tempo de execução: {end_time - start_time:.2f}s]\n") | |
| return llama_response.strip() | |
| except Exception as e: | |
| logging.error(f"[ERRO] Falha ao interagir com o Llama: {e}") | |
| return None | |
| def get_graph_type_with_llm(user_query, sql_query, df): | |
| """Consulta a LLM para determinar o tipo de gráfico mais adequado.""" | |
| # Obter uma amostra dos dados para análise | |
| df_sample = df.head(3) | |
| graph_type_context = generate_graph_type_context(user_query, sql_query, df.columns.tolist(), df_sample) | |
| logging.info(f"[DEBUG] Contexto enviado ao Llama para tipo de gráfico:\n{graph_type_context}\n") | |
| try: | |
| response = hf_client.chat.completions.create( | |
| model=LLAMA_MODEL, | |
| messages=[{"role": "system", "content": graph_type_context}], | |
| max_tokens=10, | |
| stream=False | |
| ) | |
| llama_response = response["choices"][0]["message"]["content"].strip() | |
| logging.info(f"[DEBUG] Resposta do Llama para tipo de gráfico: {llama_response}") | |
| # Mapear a resposta numérica para o tipo de gráfico | |
| graph_type_map = { | |
| "1": "line_simple", | |
| "2": "multiline", | |
| "3": "area", | |
| "4": "bar_vertical", | |
| "5": "bar_horizontal", | |
| "6": "bar_grouped", | |
| "7": "bar_stacked", | |
| "8": "pie", | |
| "9": "donut", | |
| "10": "pie_multiple" | |
| } | |
| # Extrair apenas o número da resposta | |
| match = re.search(r"\b([1-9]|10)\b", llama_response) | |
| if match: | |
| graph_number = match.group(0) | |
| graph_type = graph_type_map.get(graph_number, "bar_vertical") # Default para bar_vertical se não encontrar | |
| logging.info(f"[DEBUG] Tipo de gráfico escolhido pela LLM: {graph_type} (número {graph_number})") | |
| return graph_type | |
| else: | |
| logging.warning("[DEBUG] Não foi possível extrair um número válido da resposta da LLM") | |
| return "bar_vertical" # Default para bar_vertical | |
| except Exception as e: | |
| logging.error(f"[ERRO] Falha ao consultar LLM para tipo de gráfico: {e}") | |
| return "bar_vertical" # Default para bar_vertical em caso de erro | |
| def refine_response_with_llm(user_question, sql_response): | |
| prompt = ( | |
| f"Pergunta do usuário:\n{user_question}\n\n" | |
| f"Resposta gerada pelo agente SQL:\n{sql_response}\n\n" | |
| "Sua tarefa é refinar, complementar e melhorar a resposta." | |
| ) | |
| logging.info(f"[DEBUG] Prompt enviado ao modelo de refinamento:\n{prompt}\n") | |
| try: | |
| response = hf_client.chat.completions.create( | |
| model=LLAMA_MODEL, | |
| messages=[{"role": "system", "content": prompt}], | |
| max_tokens=1200, | |
| stream=False | |
| ) | |
| improved_response = response["choices"][0]["message"]["content"] | |
| logging.info(f"[DEBUG] Resposta do modelo de refinamento:\n{improved_response}\n") | |
| return improved_response | |
| except Exception as e: | |
| logging.error(f"[ERRO] Falha ao refinar resposta com LLM: {e}") | |
| return sql_response | |
| def extract_sql_query(llama_response): | |
| """Extrai e sanitiza a query SQL da resposta do Llama.""" | |
| if "Opção de Query SQL:" in llama_response: | |
| parts = llama_response.split("Opção de Query SQL:") | |
| if len(parts) > 1: | |
| query_part = parts[1].strip() | |
| # Remove qualquer ocorrência de blocos markdown ```sql ou ``` | |
| query_part = re.sub(r"```sql", "", query_part, flags=re.IGNORECASE) | |
| query_part = re.sub(r"```", "", query_part) | |
| query_part = query_part.strip() | |
| # Remove textos adicionais se existirem | |
| if "Idioma:" in query_part: | |
| query_part = query_part.split("Idioma:")[0].strip() | |
| if "Gráfico:" in query_part: | |
| query_part = query_part.split("Gráfico:")[0].strip() | |
| # Remove ponto e vírgula no final | |
| if query_part.endswith(";"): | |
| query_part = query_part[:-1].strip() | |
| return query_part | |
| return None | |
| def should_generate_graph(llama_response): | |
| """Verifica se a resposta do Llama indica que um gráfico deve ser gerado.""" | |
| logging.info(f"[DEBUG] Verificando se deve gerar gráfico na resposta: {llama_response}") | |
| # Usar expressão regular para encontrar "Gráfico: sim" independente de maiúsculas/minúsculas | |
| match = re.search(r"Gr[áa]fico\s*:\s*[Ss][Ii][Mm]", llama_response) | |
| if match: | |
| logging.info(f"[DEBUG] Encontrou indicação de gráfico: {match.group(0)}") | |
| return True | |
| logging.info("[DEBUG] Não encontrou indicação de gráfico") | |
| return False | |
| def analyze_dataframe(df): | |
| """Analisa o DataFrame para determinar características importantes para visualização.""" | |
| analysis = { | |
| "num_rows": len(df), | |
| "num_cols": len(df.columns), | |
| "numeric_cols": [], | |
| "date_cols": [], | |
| "categorical_cols": [], | |
| "time_series": False, | |
| "multi_numeric": False, | |
| "has_categories": False | |
| } | |
| # Identificar tipos de colunas | |
| for col in df.columns: | |
| # Verificar se é coluna numérica | |
| if pd.api.types.is_numeric_dtype(df[col]): | |
| analysis["numeric_cols"].append(col) | |
| # Verificar se é coluna de data | |
| elif pd.api.types.is_datetime64_any_dtype(df[col]) or 'data' in col.lower(): | |
| analysis["date_cols"].append(col) | |
| analysis["time_series"] = True | |
| # Caso contrário, considerar categórica | |
| else: | |
| analysis["categorical_cols"].append(col) | |
| analysis["has_categories"] = True | |
| # Verificar se há múltiplas colunas numéricas | |
| if len(analysis["numeric_cols"]) > 1: | |
| analysis["multi_numeric"] = True | |
| # Tentar converter colunas de data que não foram detectadas automaticamente | |
| for col in analysis["categorical_cols"]: | |
| if 'data' in col.lower() or 'date' in col.lower(): | |
| try: | |
| # Tentar converter para datetime | |
| df[col] = pd.to_datetime(df[col], errors='coerce') | |
| if not df[col].isna().all(): # Se pelo menos um valor foi convertido com sucesso | |
| analysis["date_cols"].append(col) | |
| analysis["categorical_cols"].remove(col) | |
| analysis["time_series"] = True | |
| except: | |
| pass | |
| return analysis | |
| def prepare_data_for_graph(df, graph_type, user_query): | |
| """Prepara os dados para o gráfico, adaptando-os conforme necessário.""" | |
| logging.info(f"[DEBUG] Preparando dados para gráfico tipo {graph_type}") | |
| # Verificar se o DataFrame está vazio | |
| if df.empty: | |
| logging.warning("[DEBUG] DataFrame vazio, não é possível preparar dados") | |
| return df | |
| # Fazer uma cópia para não modificar o original | |
| prepared_df = df.copy() | |
| # Analisar o DataFrame | |
| analysis = analyze_dataframe(prepared_df) | |
| logging.info(f"[DEBUG] Análise do DataFrame: {analysis}") | |
| # Converter colunas de data para datetime se existirem | |
| for col in prepared_df.columns: | |
| if col in analysis["date_cols"] and not pd.api.types.is_datetime64_any_dtype(prepared_df[col]): | |
| try: | |
| prepared_df[col] = pd.to_datetime(prepared_df[col]) | |
| logging.info(f"[DEBUG] Convertida coluna {col} para datetime") | |
| except: | |
| logging.warning(f"[DEBUG] Não foi possível converter coluna {col} para datetime") | |
| # Preparação específica para cada tipo de gráfico | |
| if graph_type == 'line_simple': | |
| # Para gráfico de linha simples, precisamos de uma coluna para o eixo x (preferencialmente data) e uma coluna numérica | |
| if analysis["time_series"] and len(analysis["numeric_cols"]) > 0: | |
| # Usar a primeira coluna de data como x e a primeira coluna numérica como y | |
| x_col = analysis["date_cols"][0] | |
| y_col = analysis["numeric_cols"][0] | |
| # Ordenar por data | |
| prepared_df = prepared_df.sort_values(by=x_col) | |
| logging.info(f"[DEBUG] Dados ordenados pela coluna de data {x_col}") | |
| # Selecionar apenas as colunas necessárias | |
| prepared_df = prepared_df[[x_col, y_col]] | |
| elif len(analysis["categorical_cols"]) > 0 and len(analysis["numeric_cols"]) > 0: | |
| # Se não tiver data, usar a primeira coluna categórica como x e a primeira numérica como y | |
| x_col = analysis["categorical_cols"][0] | |
| y_col = analysis["numeric_cols"][0] | |
| # Ordenar por valor numérico para melhor visualização | |
| prepared_df = prepared_df.sort_values(by=y_col) | |
| logging.info(f"[DEBUG] Dados ordenados pela coluna numérica {y_col}") | |
| # Selecionar apenas as colunas necessárias | |
| prepared_df = prepared_df[[x_col, y_col]] | |
| elif graph_type == 'multiline': | |
| # Para gráfico de multilinhas, precisamos de uma coluna para o eixo x e múltiplas colunas numéricas | |
| if analysis["time_series"] and analysis["multi_numeric"]: | |
| # Usar a primeira coluna de data como x e todas as colunas numéricas | |
| x_col = analysis["date_cols"][0] | |
| # Ordenar por data | |
| prepared_df = prepared_df.sort_values(by=x_col) | |
| logging.info(f"[DEBUG] Dados ordenados pela coluna de data {x_col}") | |
| # Selecionar apenas as colunas necessárias (data + todas numéricas) | |
| cols_to_keep = [x_col] + analysis["numeric_cols"] | |
| prepared_df = prepared_df[cols_to_keep] | |
| elif len(analysis["categorical_cols"]) > 0 and analysis["multi_numeric"]: | |
| # Se não tiver data, usar a primeira coluna categórica como x e todas as numéricas | |
| x_col = analysis["categorical_cols"][0] | |
| # Selecionar apenas as colunas necessárias (categórica + todas numéricas) | |
| cols_to_keep = [x_col] + analysis["numeric_cols"] | |
| prepared_df = prepared_df[cols_to_keep] | |
| elif len(analysis["categorical_cols"]) >= 2 and len(analysis["numeric_cols"]) > 0: | |
| # Se tiver duas colunas categóricas, podemos usar uma para agrupar | |
| cat1 = analysis["categorical_cols"][0] | |
| cat2 = analysis["categorical_cols"][1] | |
| val_col = analysis["numeric_cols"][0] | |
| # Criar um pivot para multilinhas | |
| try: | |
| pivot_df = prepared_df.pivot_table(index=cat1, columns=cat2, values=val_col, aggfunc='mean') | |
| prepared_df = pivot_df.reset_index() | |
| logging.info(f"[DEBUG] Criado pivot com índice={cat1}, colunas={cat2}, valores={val_col}") | |
| except Exception as e: | |
| logging.error(f"[ERRO] Falha ao criar pivot para multilinhas: {e}") | |
| elif graph_type == 'area': | |
| # Similar ao gráfico de linha simples | |
| if analysis["time_series"] and len(analysis["numeric_cols"]) > 0: | |
| x_col = analysis["date_cols"][0] | |
| y_col = analysis["numeric_cols"][0] | |
| # Ordenar por data | |
| prepared_df = prepared_df.sort_values(by=x_col) | |
| # Selecionar apenas as colunas necessárias | |
| prepared_df = prepared_df[[x_col, y_col]] | |
| elif len(analysis["categorical_cols"]) > 0 and len(analysis["numeric_cols"]) > 0: | |
| x_col = analysis["categorical_cols"][0] | |
| y_col = analysis["numeric_cols"][0] | |
| # Ordenar por valor numérico | |
| prepared_df = prepared_df.sort_values(by=y_col) | |
| # Selecionar apenas as colunas necessárias | |
| prepared_df = prepared_df[[x_col, y_col]] | |
| elif graph_type in ['bar_vertical', 'bar_horizontal']: | |
| # Para gráficos de barras, precisamos de uma coluna categórica e uma numérica | |
| if len(analysis["categorical_cols"]) > 0 and len(analysis["numeric_cols"]) > 0: | |
| x_col = analysis["categorical_cols"][0] | |
| y_col = analysis["numeric_cols"][0] | |
| # Ordenar por valor numérico em ordem decrescente | |
| prepared_df = prepared_df.sort_values(by=y_col, ascending=False) | |
| logging.info(f"[DEBUG] Dados ordenados pela coluna {y_col} em ordem decrescente") | |
| # Limitar o número de categorias se for muito grande | |
| if len(prepared_df) > 15 and graph_type == 'bar_vertical': | |
| logging.info(f"[DEBUG] Limitando dados para gráfico de barras verticais (de {len(prepared_df)} para 15 linhas)") | |
| prepared_df = prepared_df.head(15) | |
| # Selecionar apenas as colunas necessárias | |
| prepared_df = prepared_df[[x_col, y_col]] | |
| elif graph_type == 'bar_grouped': | |
| # Para barras agrupadas, precisamos de uma coluna categórica e múltiplas numéricas | |
| if len(analysis["categorical_cols"]) > 0 and analysis["multi_numeric"]: | |
| x_col = analysis["categorical_cols"][0] | |
| # Limitar o número de categorias | |
| if len(prepared_df) > 10: | |
| logging.info(f"[DEBUG] Limitando dados para gráfico de barras agrupadas (de {len(prepared_df)} para 10 linhas)") | |
| prepared_df = prepared_df.head(10) | |
| # Selecionar apenas as colunas necessárias (categórica + todas numéricas) | |
| cols_to_keep = [x_col] + analysis["numeric_cols"] | |
| prepared_df = prepared_df[cols_to_keep] | |
| elif len(analysis["categorical_cols"]) >= 2 and len(analysis["numeric_cols"]) > 0: | |
| # Se tiver duas colunas categóricas, podemos usar uma para agrupar | |
| cat1 = analysis["categorical_cols"][0] | |
| cat2 = analysis["categorical_cols"][1] | |
| val_col = analysis["numeric_cols"][0] | |
| # Limitar o número de categorias | |
| unique_cat1 = prepared_df[cat1].nunique() | |
| unique_cat2 = prepared_df[cat2].nunique() | |
| if unique_cat1 > 10 or unique_cat2 > 10: | |
| logging.info(f"[DEBUG] Muitas categorias para barras agrupadas, limitando dados") | |
| # Pegar as categorias mais frequentes | |
| top_cat1 = prepared_df[cat1].value_counts().head(10).index.tolist() | |
| top_cat2 = prepared_df[cat2].value_counts().head(10).index.tolist() | |
| prepared_df = prepared_df[ | |
| prepared_df[cat1].isin(top_cat1) & | |
| prepared_df[cat2].isin(top_cat2) | |
| ] | |
| # Criar um pivot para barras agrupadas | |
| try: | |
| pivot_df = prepared_df.pivot_table(index=cat1, columns=cat2, values=val_col, aggfunc='mean') | |
| prepared_df = pivot_df.reset_index() | |
| logging.info(f"[DEBUG] Criado pivot com índice={cat1}, colunas={cat2}, valores={val_col}") | |
| except Exception as e: | |
| logging.error(f"[ERRO] Falha ao criar pivot para barras agrupadas: {e}") | |
| elif graph_type == 'bar_stacked': | |
| # Similar ao bar_grouped, mas para mostrar partes de um todo | |
| if len(analysis["categorical_cols"]) >= 2 and len(analysis["numeric_cols"]) > 0: | |
| cat1 = analysis["categorical_cols"][0] | |
| cat2 = analysis["categorical_cols"][1] | |
| val_col = analysis["numeric_cols"][0] | |
| # Limitar o número de categorias | |
| unique_cat1 = prepared_df[cat1].nunique() | |
| unique_cat2 = prepared_df[cat2].nunique() | |
| if unique_cat1 > 10 or unique_cat2 > 10: | |
| logging.info(f"[DEBUG] Muitas categorias para barras empilhadas, limitando dados") | |
| # Pegar as categorias mais frequentes | |
| top_cat1 = prepared_df[cat1].value_counts().head(10).index.tolist() | |
| top_cat2 = prepared_df[cat2].value_counts().head(10).index.tolist() | |
| prepared_df = prepared_df[ | |
| prepared_df[cat1].isin(top_cat1) & | |
| prepared_df[cat2].isin(top_cat2) | |
| ] | |
| # Criar um pivot para barras empilhadas | |
| try: | |
| pivot_df = prepared_df.pivot_table(index=cat1, columns=cat2, values=val_col, aggfunc='mean') | |
| prepared_df = pivot_df.reset_index() | |
| logging.info(f"[DEBUG] Criado pivot com índice={cat1}, colunas={cat2}, valores={val_col}") | |
| except Exception as e: | |
| logging.error(f"[ERRO] Falha ao criar pivot para barras empilhadas: {e}") | |
| elif len(analysis["categorical_cols"]) > 0 and analysis["multi_numeric"]: | |
| # Se tiver uma coluna categórica e múltiplas numéricas | |
| x_col = analysis["categorical_cols"][0] | |
| # Limitar o número de categorias | |
| if len(prepared_df) > 10: | |
| logging.info(f"[DEBUG] Limitando dados para gráfico de barras empilhadas (de {len(prepared_df)} para 10 linhas)") | |
| prepared_df = prepared_df.head(10) | |
| # Selecionar apenas as colunas necessárias (categórica + todas numéricas) | |
| cols_to_keep = [x_col] + analysis["numeric_cols"] | |
| prepared_df = prepared_df[cols_to_keep] | |
| elif graph_type in ['pie', 'donut']: | |
| # Para gráficos de pizza/donut, precisamos de uma coluna categórica e uma numérica | |
| if len(analysis["categorical_cols"]) > 0 and len(analysis["numeric_cols"]) > 0: | |
| cat_col = analysis["categorical_cols"][0] | |
| val_col = analysis["numeric_cols"][0] | |
| # Limitar o número de categorias para no máximo 10 | |
| if prepared_df[cat_col].nunique() > 10: | |
| logging.info(f"[DEBUG] Limitando categorias para gráfico de pizza/donut") | |
| # Agrupar por categoria e somar valores | |
| grouped = prepared_df.groupby(cat_col)[val_col].sum().reset_index() | |
| # Ordenar por valor e pegar as 9 maiores categorias | |
| grouped = grouped.sort_values(by=val_col, ascending=False) | |
| top_9 = grouped.head(9) | |
| # Agrupar o resto como "Outros" | |
| if len(grouped) > 9: | |
| others_sum = grouped.iloc[9:][val_col].sum() | |
| others_row = pd.DataFrame({cat_col: ['Outros'], val_col: [others_sum]}) | |
| prepared_df = pd.concat([top_9, others_row], ignore_index=True) | |
| logging.info(f"[DEBUG] Criada categoria 'Outros' com valor {others_sum}") | |
| else: | |
| prepared_df = top_9 | |
| else: | |
| # Agrupar por categoria e somar valores | |
| prepared_df = prepared_df.groupby(cat_col)[val_col].sum().reset_index() | |
| # Ordenar por valor em ordem decrescente | |
| prepared_df = prepared_df.sort_values(by=val_col, ascending=False) | |
| # Selecionar apenas as colunas necessárias | |
| prepared_df = prepared_df[[cat_col, val_col]] | |
| elif graph_type == 'pie_multiple': | |
| # Para múltiplos gráficos de pizza, precisamos de duas colunas categóricas e uma numérica | |
| if len(analysis["categorical_cols"]) >= 2 and len(analysis["numeric_cols"]) > 0: | |
| cat1 = analysis["categorical_cols"][0] | |
| cat2 = analysis["categorical_cols"][1] | |
| val_col = analysis["numeric_cols"][0] | |
| # Limitar o número de categorias | |
| unique_cat1 = prepared_df[cat1].nunique() | |
| unique_cat2 = prepared_df[cat2].nunique() | |
| if unique_cat1 > 6 or unique_cat2 > 10: | |
| logging.info(f"[DEBUG] Muitas categorias para múltiplos gráficos de pizza, limitando dados") | |
| # Pegar as categorias mais frequentes | |
| top_cat1 = prepared_df[cat1].value_counts().head(6).index.tolist() | |
| top_cat2 = prepared_df[cat2].value_counts().head(10).index.tolist() | |
| prepared_df = prepared_df[ | |
| prepared_df[cat1].isin(top_cat1) & | |
| prepared_df[cat2].isin(top_cat2) | |
| ] | |
| # Agrupar e somar valores | |
| prepared_df = prepared_df.groupby([cat1, cat2])[val_col].sum().reset_index() | |
| # Verificar se há palavras-chave na pergunta do usuário que possam ajudar a identificar colunas importantes | |
| if user_query: | |
| user_query_lower = user_query.lower() | |
| # Procurar por colunas mencionadas na pergunta | |
| for col in df.columns: | |
| col_lower = col.lower() | |
| # Verificar se o nome da coluna (ou parte dele) está na pergunta | |
| if col_lower in user_query_lower or any(part in user_query_lower for part in col_lower.split('_')): | |
| logging.info(f"[DEBUG] Coluna {col} mencionada na pergunta do usuário") | |
| # Se for uma coluna numérica e o gráfico for de barras, linha ou área, priorizar esta coluna | |
| if col in analysis["numeric_cols"] and graph_type in ['bar_vertical', 'bar_horizontal', 'line_simple', 'area']: | |
| if len(analysis["categorical_cols"]) > 0: | |
| cat_col = analysis["categorical_cols"][0] | |
| prepared_df = df[[cat_col, col]].copy() | |
| prepared_df = prepared_df.sort_values(by=col, ascending=False) | |
| logging.info(f"[DEBUG] Priorizando coluna {col} mencionada na pergunta") | |
| break | |
| logging.info(f"[DEBUG] Dados preparados: {len(prepared_df)} linhas, colunas: {prepared_df.columns.tolist()}") | |
| return prepared_df | |
| def generate_graph(df, graph_type, title=None, user_query=None): | |
| """Gera um gráfico com base no DataFrame e tipo especificado.""" | |
| logging.info(f"[DEBUG] Iniciando geração de gráfico tipo {graph_type}. DataFrame vazio? {df.empty}") | |
| if df.empty: | |
| logging.info("[DEBUG] DataFrame vazio, não é possível gerar gráfico") | |
| return None | |
| # Preparar dados para o gráfico | |
| prepared_df = prepare_data_for_graph(df, graph_type, user_query) | |
| if prepared_df.empty: | |
| logging.info("[DEBUG] DataFrame preparado está vazio, não é possível gerar gráfico") | |
| return None | |
| try: | |
| # Analisar o DataFrame preparado | |
| analysis = analyze_dataframe(prepared_df) | |
| # Configurações gerais para todos os gráficos | |
| plt.figure(figsize=(12, 8)) | |
| plt.title(title or "Visualização de Dados", fontsize=14) | |
| # Definir cores atraentes | |
| colors = plt.cm.tab10.colors | |
| # Gerar gráfico com base no tipo | |
| if graph_type == 'line_simple': | |
| if len(prepared_df.columns) >= 2: | |
| x_col = prepared_df.columns[0] | |
| y_col = prepared_df.columns[1] | |
| logging.info(f"[DEBUG] Criando gráfico de linha simples com x={x_col}, y={y_col}") | |
| # Verificar se x é data | |
| is_date = pd.api.types.is_datetime64_any_dtype(prepared_df[x_col]) | |
| # Criar gráfico de linha | |
| plt.figure(figsize=(12, 6)) | |
| if is_date: | |
| # Formatar eixo x para datas | |
| plt.plot(prepared_df[x_col], prepared_df[y_col], marker='o', linewidth=2, color=colors[0]) | |
| plt.gcf().autofmt_xdate() | |
| plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%d/%m/%Y')) | |
| else: | |
| plt.plot(range(len(prepared_df)), prepared_df[y_col], marker='o', linewidth=2, color=colors[0]) | |
| plt.xticks(range(len(prepared_df)), prepared_df[x_col], rotation=45, ha='right') | |
| # Adicionar rótulos nos pontos | |
| for i, y in enumerate(prepared_df[y_col]): | |
| if isinstance(y, (int, float)): | |
| plt.annotate(f'{y:.2f}', (i, y), textcoords="offset points", | |
| xytext=(0, 5), ha='center', fontsize=9) | |
| plt.xlabel(x_col) | |
| plt.ylabel(y_col) | |
| plt.grid(True, linestyle='--', alpha=0.7) | |
| plt.tight_layout() | |
| else: | |
| logging.warning("[DEBUG] Dados insuficientes para gráfico de linha simples") | |
| return None | |
| elif graph_type == 'multiline': | |
| # Verificar se temos um DataFrame pivotado (primeira coluna categórica, resto numéricas) | |
| if len(prepared_df.columns) >= 2: | |
| x_col = prepared_df.columns[0] | |
| # Verificar se x é data | |
| is_date = pd.api.types.is_datetime64_any_dtype(prepared_df[x_col]) | |
| # Obter colunas numéricas (todas exceto a primeira) | |
| y_cols = [col for col in prepared_df.columns[1:] if pd.api.types.is_numeric_dtype(prepared_df[col])] | |
| if y_cols: | |
| logging.info(f"[DEBUG] Criando gráfico de multilinhas com x={x_col}, y={y_cols}") | |
| plt.figure(figsize=(12, 6)) | |
| # Plotar cada linha | |
| for i, y_col in enumerate(y_cols): | |
| if is_date: | |
| plt.plot(prepared_df[x_col], prepared_df[y_col], marker='o', linewidth=2, | |
| label=y_col, color=colors[i % len(colors)]) | |
| else: | |
| plt.plot(range(len(prepared_df)), prepared_df[y_col], marker='o', linewidth=2, | |
| label=y_col, color=colors[i % len(colors)]) | |
| if is_date: | |
| plt.gcf().autofmt_xdate() | |
| plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%d/%m/%Y')) | |
| else: | |
| plt.xticks(range(len(prepared_df)), prepared_df[x_col], rotation=45, ha='right') | |
| plt.xlabel(x_col) | |
| plt.ylabel("Valores") | |
| plt.legend(title="Séries", loc='best') | |
| plt.grid(True, linestyle='--', alpha=0.7) | |
| plt.tight_layout() | |
| else: | |
| logging.warning("[DEBUG] Não há colunas numéricas para gráfico de multilinhas") | |
| # Fallback para linha simples | |
| return generate_graph(df, 'line_simple', title, user_query) | |
| else: | |
| logging.warning("[DEBUG] Dados insuficientes para gráfico de multilinhas") | |
| # Fallback para linha simples | |
| return generate_graph(df, 'line_simple', title, user_query) | |
| elif graph_type == 'area': | |
| if len(prepared_df.columns) >= 2: | |
| x_col = prepared_df.columns[0] | |
| y_col = prepared_df.columns[1] | |
| logging.info(f"[DEBUG] Criando gráfico de área com x={x_col}, y={y_col}") | |
| # Verificar se x é data | |
| is_date = pd.api.types.is_datetime64_any_dtype(prepared_df[x_col]) | |
| plt.figure(figsize=(12, 6)) | |
| if is_date: | |
| # Formatar eixo x para datas | |
| plt.fill_between(prepared_df[x_col], prepared_df[y_col], alpha=0.5, color=colors[0]) | |
| plt.plot(prepared_df[x_col], prepared_df[y_col], color=colors[0], linewidth=2) | |
| plt.gcf().autofmt_xdate() | |
| plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%d/%m/%Y')) | |
| else: | |
| plt.fill_between(range(len(prepared_df)), prepared_df[y_col], alpha=0.5, color=colors[0]) | |
| plt.plot(range(len(prepared_df)), prepared_df[y_col], color=colors[0], linewidth=2) | |
| plt.xticks(range(len(prepared_df)), prepared_df[x_col], rotation=45, ha='right') | |
| plt.xlabel(x_col) | |
| plt.ylabel(y_col) | |
| plt.grid(True, linestyle='--', alpha=0.7) | |
| plt.tight_layout() | |
| else: | |
| logging.warning("[DEBUG] Dados insuficientes para gráfico de área") | |
| # Fallback para linha simples | |
| return generate_graph(df, 'line_simple', title, user_query) | |
| elif graph_type == 'bar_vertical': | |
| if len(prepared_df.columns) >= 2: | |
| x_col = prepared_df.columns[0] | |
| y_col = prepared_df.columns[1] | |
| logging.info(f"[DEBUG] Criando gráfico de barras verticais com x={x_col}, y={y_col}") | |
| plt.figure(figsize=(12, 8)) | |
| bars = plt.bar(range(len(prepared_df)), prepared_df[y_col], color=colors[0]) | |
| # Adicionar valores nas barras | |
| for i, bar in enumerate(bars): | |
| height = bar.get_height() | |
| if isinstance(height, (int, float)): | |
| plt.text(bar.get_x() + bar.get_width()/2., height + 0.02 * max(prepared_df[y_col]), | |
| f'{height:.2f}', ha='center', fontsize=9) | |
| plt.xlabel(x_col) | |
| plt.ylabel(y_col) | |
| plt.xticks(range(len(prepared_df)), prepared_df[x_col], rotation=45, ha='right') | |
| plt.grid(True, linestyle='--', alpha=0.7, axis='y') | |
| plt.tight_layout() | |
| else: | |
| logging.warning("[DEBUG] Dados insuficientes para gráfico de barras verticais") | |
| return None | |
| elif graph_type == 'bar_horizontal': | |
| if len(prepared_df.columns) >= 2: | |
| x_col = prepared_df.columns[0] | |
| y_col = prepared_df.columns[1] | |
| logging.info(f"[DEBUG] Criando gráfico de barras horizontais com x={x_col}, y={y_col}") | |
| plt.figure(figsize=(12, max(6, len(prepared_df) * 0.4))) # Ajustar altura com base no número de categorias | |
| bars = plt.barh(range(len(prepared_df)), prepared_df[y_col], color=colors[0]) | |
| # Adicionar valores nas barras | |
| for i, bar in enumerate(bars): | |
| width = bar.get_width() | |
| if isinstance(width, (int, float)): | |
| plt.text(width + 0.02 * max(prepared_df[y_col]), bar.get_y() + bar.get_height()/2., | |
| f'{width:.2f}', va='center', fontsize=9) | |
| plt.xlabel(y_col) | |
| plt.ylabel(x_col) | |
| plt.yticks(range(len(prepared_df)), prepared_df[x_col]) | |
| plt.grid(True, linestyle='--', alpha=0.7, axis='x') | |
| plt.tight_layout() | |
| else: | |
| logging.warning("[DEBUG] Dados insuficientes para gráfico de barras horizontais") | |
| return None | |
| elif graph_type == 'bar_grouped': | |
| # Verificar se temos um DataFrame pivotado (primeira coluna categórica, resto numéricas) | |
| if len(prepared_df.columns) >= 3: | |
| x_col = prepared_df.columns[0] | |
| # Obter colunas numéricas (todas exceto a primeira) | |
| y_cols = [col for col in prepared_df.columns[1:] if pd.api.types.is_numeric_dtype(prepared_df[col])] | |
| if y_cols: | |
| logging.info(f"[DEBUG] Criando gráfico de barras agrupadas com x={x_col}, valores={y_cols}") | |
| # Configurar largura e posições das barras | |
| x = np.arange(len(prepared_df)) | |
| width = 0.8 / len(y_cols) | |
| fig, ax = plt.subplots(figsize=(12, 8)) | |
| # Plotar cada grupo de barras | |
| for i, col in enumerate(y_cols): | |
| offset = width * i - width * (len(y_cols) - 1) / 2 | |
| bars = ax.bar(x + offset, prepared_df[col], width, label=col) | |
| # Adicionar valores nas barras | |
| for bar in bars: | |
| height = bar.get_height() | |
| if isinstance(height, (int, float)) and not np.isnan(height): | |
| ax.text(bar.get_x() + bar.get_width()/2., height + 0.02 * prepared_df[y_cols].max().max(), | |
| f'{height:.2f}', ha='center', fontsize=8) | |
| ax.set_xlabel(x_col) | |
| ax.set_ylabel('Valores') | |
| ax.set_xticks(x) | |
| ax.set_xticklabels(prepared_df[x_col], rotation=45, ha='right') | |
| ax.legend() | |
| ax.grid(True, linestyle='--', alpha=0.7, axis='y') | |
| plt.tight_layout() | |
| else: | |
| logging.warning("[DEBUG] Não há colunas numéricas suficientes para gráfico de barras agrupadas") | |
| # Fallback para barras verticais | |
| return generate_graph(df, 'bar_vertical', title, user_query) | |
| else: | |
| logging.warning("[DEBUG] Dados insuficientes para gráfico de barras agrupadas") | |
| # Fallback para barras verticais | |
| return generate_graph(df, 'bar_vertical', title, user_query) | |
| elif graph_type == 'bar_stacked': | |
| # Verificar se temos um DataFrame pivotado (primeira coluna categórica, resto numéricas) | |
| if len(prepared_df.columns) >= 3: | |
| x_col = prepared_df.columns[0] | |
| # Obter colunas numéricas (todas exceto a primeira) | |
| y_cols = [col for col in prepared_df.columns[1:] if pd.api.types.is_numeric_dtype(prepared_df[col])] | |
| if y_cols: | |
| logging.info(f"[DEBUG] Criando gráfico de barras empilhadas com x={x_col}, valores={y_cols}") | |
| fig, ax = plt.subplots(figsize=(12, 8)) | |
| # Criar barras empilhadas | |
| bottom = np.zeros(len(prepared_df)) | |
| for i, col in enumerate(y_cols): | |
| bars = ax.bar(range(len(prepared_df)), prepared_df[col], bottom=bottom, label=col) | |
| # Adicionar valores nas barras | |
| for j, bar in enumerate(bars): | |
| height = bar.get_height() | |
| if isinstance(height, (int, float)) and height > 0: | |
| ax.text(bar.get_x() + bar.get_width()/2., bottom[j] + height/2, | |
| f'{height:.2f}', ha='center', va='center', fontsize=8, color='white') | |
| bottom += prepared_df[col].fillna(0) | |
| ax.set_xlabel(x_col) | |
| ax.set_ylabel('Valores') | |
| ax.set_xticks(range(len(prepared_df))) | |
| ax.set_xticklabels(prepared_df[x_col], rotation=45, ha='right') | |
| ax.legend() | |
| plt.tight_layout() | |
| else: | |
| logging.warning("[DEBUG] Não há colunas numéricas suficientes para gráfico de barras empilhadas") | |
| # Fallback para barras verticais | |
| return generate_graph(df, 'bar_vertical', title, user_query) | |
| else: | |
| logging.warning("[DEBUG] Dados insuficientes para gráfico de barras empilhadas") | |
| # Fallback para barras verticais | |
| return generate_graph(df, 'bar_vertical', title, user_query) | |
| elif graph_type == 'pie': | |
| if len(prepared_df.columns) >= 2: | |
| label_col = prepared_df.columns[0] | |
| value_col = prepared_df.columns[1] | |
| logging.info(f"[DEBUG] Criando gráfico de pizza com labels={label_col}, valores={value_col}") | |
| # Verificar se os valores são numéricos | |
| if pd.api.types.is_numeric_dtype(prepared_df[value_col]): | |
| # Remover valores negativos ou zero | |
| prepared_df = prepared_df[prepared_df[value_col] > 0] | |
| if prepared_df.empty: | |
| logging.warning("[DEBUG] Não há valores positivos para gráfico de pizza") | |
| return None | |
| plt.figure(figsize=(10, 10)) | |
| # Calcular percentuais para os rótulos | |
| total = prepared_df[value_col].sum() | |
| labels = [f'{label} ({val:.2f}, {val/total:.1%})' for label, val in zip(prepared_df[label_col], prepared_df[value_col])] | |
| plt.pie(prepared_df[value_col], labels=labels, autopct='%1.1f%%', | |
| startangle=90, shadow=False, colors=colors[:len(prepared_df)]) | |
| plt.axis('equal') # Equal aspect ratio ensures that pie is drawn as a circle | |
| plt.title(title or f"Distribuição de {value_col} por {label_col}") | |
| plt.tight_layout() | |
| else: | |
| logging.warning(f"[DEBUG] Coluna {value_col} não é numérica para gráfico de pizza") | |
| # Fallback para barras verticais | |
| return generate_graph(df, 'bar_vertical', title, user_query) | |
| else: | |
| logging.warning("[DEBUG] Dados insuficientes para gráfico de pizza") | |
| # Fallback para barras verticais | |
| return generate_graph(df, 'bar_vertical', title, user_query) | |
| elif graph_type == 'donut': | |
| if len(prepared_df.columns) >= 2: | |
| label_col = prepared_df.columns[0] | |
| value_col = prepared_df.columns[1] | |
| logging.info(f"[DEBUG] Criando gráfico de donut com labels={label_col}, valores={value_col}") | |
| # Verificar se os valores são numéricos | |
| if pd.api.types.is_numeric_dtype(prepared_df[value_col]): | |
| # Remover valores negativos ou zero | |
| prepared_df = prepared_df[prepared_df[value_col] > 0] | |
| if prepared_df.empty: | |
| logging.warning("[DEBUG] Não há valores positivos para gráfico de donut") | |
| return None | |
| plt.figure(figsize=(10, 10)) | |
| # Calcular percentuais para os rótulos | |
| total = prepared_df[value_col].sum() | |
| labels = [f'{label} ({val:.2f}, {val/total:.1%})' for label, val in zip(prepared_df[label_col], prepared_df[value_col])] | |
| # Criar gráfico de donut (pizza com círculo central) | |
| plt.pie(prepared_df[value_col], labels=labels, autopct='%1.1f%%', | |
| startangle=90, shadow=False, colors=colors[:len(prepared_df)], | |
| wedgeprops=dict(width=0.5)) # Largura do anel | |
| plt.axis('equal') # Equal aspect ratio ensures that pie is drawn as a circle | |
| plt.title(title or f"Distribuição de {value_col} por {label_col}") | |
| plt.tight_layout() | |
| else: | |
| logging.warning(f"[DEBUG] Coluna {value_col} não é numérica para gráfico de donut") | |
| # Fallback para barras verticais | |
| return generate_graph(df, 'bar_vertical', title, user_query) | |
| else: | |
| logging.warning("[DEBUG] Dados insuficientes para gráfico de donut") | |
| # Fallback para barras verticais | |
| return generate_graph(df, 'bar_vertical', title, user_query) | |
| elif graph_type == 'pie_multiple': | |
| # Verificar se temos dados agrupados por duas categorias | |
| if len(prepared_df.columns) >= 3: | |
| cat1 = prepared_df.columns[0] | |
| cat2 = prepared_df.columns[1] | |
| val_col = prepared_df.columns[2] | |
| logging.info(f"[DEBUG] Criando múltiplos gráficos de pizza com grupo={cat1}, categorias={cat2}, valor={val_col}") | |
| # Verificar se o valor é numérico | |
| if pd.api.types.is_numeric_dtype(prepared_df[val_col]): | |
| # Agrupar dados | |
| grouped = prepared_df.groupby([cat1, cat2])[val_col].sum().unstack().fillna(0) | |
| # Determinar layout da grade | |
| n_groups = len(grouped) | |
| if n_groups == 0: | |
| logging.warning("[DEBUG] Não há grupos para múltiplos gráficos de pizza") | |
| return None | |
| cols = min(3, n_groups) # Máximo 3 colunas | |
| rows = (n_groups + cols - 1) // cols # Arredondar para cima | |
| # Criar subplots | |
| fig, axes = plt.subplots(rows, cols, figsize=(15, 5 * rows)) | |
| if rows == 1 and cols == 1: | |
| axes = np.array([axes]) # Garantir que axes seja um array | |
| axes = axes.flatten() | |
| # Plotar cada pizza | |
| for i, (group_name, group_data) in enumerate(grouped.iterrows()): | |
| if i < len(axes): | |
| # Remover valores zero | |
| data = group_data[group_data > 0] | |
| if not data.empty: | |
| # Calcular percentuais | |
| total = data.sum() | |
| # Criar rótulos com valores e percentuais | |
| labels = [f'{idx} ({val:.2f}, {val/total:.1%})' for idx, val in data.items()] | |
| # Plotar pizza | |
| axes[i].pie(data, labels=labels, autopct='%1.1f%%', | |
| startangle=90, colors=colors[:len(data)]) | |
| axes[i].set_title(f"{group_name}") | |
| axes[i].axis('equal') | |
| # Esconder eixos não utilizados | |
| for j in range(i + 1, len(axes)): | |
| axes[j].axis('off') | |
| plt.suptitle(title or f"Distribuição de {val_col} por {cat2} para cada {cat1}", fontsize=16) | |
| plt.tight_layout() | |
| plt.subplots_adjust(top=0.9) | |
| else: | |
| logging.warning(f"[DEBUG] Coluna {val_col} não é numérica para múltiplos gráficos de pizza") | |
| # Fallback para barras agrupadas | |
| return generate_graph(df, 'bar_grouped', title, user_query) | |
| else: | |
| logging.warning("[DEBUG] Dados insuficientes para múltiplos gráficos de pizza") | |
| # Fallback para pizza simples | |
| return generate_graph(df, 'pie', title, user_query) | |
| else: | |
| # Tipo de gráfico não reconhecido, usar barras verticais como padrão | |
| logging.warning(f"[DEBUG] Tipo de gráfico '{graph_type}' não reconhecido. Usando gráfico de barras verticais como padrão.") | |
| return generate_graph(df, 'bar_vertical', title, user_query) | |
| # Salvar o gráfico em um buffer | |
| buf = io.BytesIO() | |
| plt.savefig(buf, format='png', dpi=100, bbox_inches='tight') | |
| buf.seek(0) | |
| # Converter para imagem PIL | |
| img = Image.open(buf) | |
| logging.info(f"[DEBUG] Gráfico gerado com sucesso, tamanho: {img.size}") | |
| return img | |
| except Exception as e: | |
| logging.error(f"[ERRO] Falha ao gerar gráfico: {str(e)}") | |
| import traceback | |
| logging.error(traceback.format_exc()) | |
| # Tentar fallback para barras verticais em caso de erro | |
| try: | |
| logging.info("[DEBUG] Tentando fallback para gráfico de barras verticais") | |
| return generate_graph(df, 'bar_vertical', title, user_query) | |
| except: | |
| return None | |
| def query_sql_agent(user_query): | |
| try: | |
| # Verificar cache | |
| if user_query in query_cache: | |
| print(f"[CACHE] Retornando resposta do cache para a consulta: {user_query}") | |
| return query_cache[user_query] | |
| # Verificar se é uma saudação | |
| if is_greeting(user_query): | |
| greeting_response = "Olá! Estou aqui para ajudar com suas consultas sobre clínicas, agendamentos e taxas de faltas. Posso também gerar gráficos se você solicitar. Pergunte algo relacionado aos dados carregados!" | |
| query_cache[user_query] = greeting_response | |
| return greeting_response | |
| # Obter amostra do banco de dados | |
| column_data = pd.read_sql_query("SELECT * FROM base_dados_clinicas LIMIT 10", engine) | |
| # Obter instrução do Llama | |
| llama_instruction = query_with_llama(user_query, column_data) | |
| if not llama_instruction: | |
| return "Erro: O modelo Llama não conseguiu gerar uma instrução válida." | |
| # Extrair a query SQL da resposta do Llama | |
| sql_query = extract_sql_query(llama_instruction) | |
| if not sql_query: | |
| return "Erro: Não foi possível extrair uma query SQL válida da resposta do modelo." | |
| print("------- Agent SQL: Executando query -------") | |
| response = sql_agent.invoke({"input": llama_instruction}) | |
| sql_response = response.get("output", "Erro ao obter a resposta do agente.") | |
| # Verificar se deve gerar gráfico com base na resposta da LLM | |
| if should_generate_graph(llama_instruction): | |
| try: | |
| # Corrigir a query SQL se necessário (substituir "tabela" por "base_dados_clinicas") | |
| if "tabela" in sql_query: | |
| sql_query = sql_query.replace("tabela", "base_dados_clinicas") | |
| logging.info(f"[DEBUG] Query corrigida: {sql_query}") | |
| # Executar a query SQL para obter os dados | |
| logging.info(f"[DEBUG] Executando query para gráfico: {sql_query}") | |
| result_df = pd.read_sql_query(sql_query, engine) | |
| logging.info(f"[DEBUG] Resultado da query: {len(result_df)} linhas, colunas: {result_df.columns.tolist()}") | |
| if not result_df.empty: | |
| # Consultar a LLM para determinar o tipo de gráfico adequado | |
| graph_type = get_graph_type_with_llm(user_query, sql_query, result_df) | |
| # Gerar o gráfico | |
| graph_title = f"Visualização para: {user_query}" | |
| graph_img = generate_graph(result_df, graph_type, graph_title, user_query) | |
| if graph_img: | |
| # Salvar o gráfico temporariamente | |
| graph_path = "temp_graph.png" | |
| logging.info(f"[DEBUG] Salvando gráfico em: {graph_path}") | |
| graph_img.save(graph_path) | |
| # Verificar se o arquivo foi criado | |
| if os.path.exists(graph_path): | |
| logging.info(f"[DEBUG] Arquivo de gráfico criado com sucesso: {os.path.getsize(graph_path)} bytes") | |
| else: | |
| logging.error(f"[DEBUG] Falha ao criar arquivo de gráfico: {graph_path}") | |
| # Adicionar informação sobre o gráfico na resposta | |
| sql_response += f"\n\n[Gráfico gerado com base nos dados solicitados. O tipo de gráfico '{graph_type}' foi escolhido automaticamente com base na estrutura dos dados.]" | |
| # Adicionar o caminho do gráfico à resposta (será processado pelo chatbot_response) | |
| sql_response += f"\n\nGRAPH_PATH:{graph_path}" | |
| else: | |
| logging.error("[DEBUG] Falha ao gerar objeto de imagem do gráfico") | |
| sql_response += "\n\nNão foi possível gerar um gráfico com os dados obtidos." | |
| else: | |
| logging.warning("[DEBUG] DataFrame vazio retornado pela query") | |
| sql_response += "\n\nNão foi possível gerar um gráfico, pois a consulta não retornou dados." | |
| except Exception as e: | |
| logging.error(f"[ERRO] Falha ao gerar gráfico: {e}") | |
| sql_response += f"\n\nOcorreu um erro ao tentar gerar o gráfico: {str(e)}" | |
| # Refinar resposta se modo avançado estiver ativado | |
| if advanced_mode_enabled: | |
| sql_response = refine_response_with_llm(user_query, sql_response) | |
| # Armazenar no cache | |
| query_cache[user_query] = sql_response | |
| return sql_response | |
| except Exception as e: | |
| logging.error(f"[ERRO] Falha ao consultar o agente SQL: {e}") | |
| return f"Erro ao consultar o agente SQL: {e}" | |
| def chatbot_response(user_input): | |
| start_time = time.time() | |
| response = query_sql_agent(user_input) | |
| end_time = time.time() | |
| # Verificar se a resposta contém um caminho de gráfico | |
| graph_path = None | |
| if "GRAPH_PATH:" in response: | |
| parts = response.split("GRAPH_PATH:") | |
| response = parts[0].strip() | |
| graph_path = parts[1].strip() | |
| logging.info(f"[DEBUG] Caminho do gráfico extraído: {graph_path}") | |
| # Verificar se o arquivo existe | |
| if os.path.exists(graph_path): | |
| logging.info(f"[DEBUG] Arquivo de gráfico encontrado: {os.path.getsize(graph_path)} bytes") | |
| else: | |
| logging.error(f"[DEBUG] Arquivo de gráfico não encontrado: {graph_path}") | |
| graph_path = None | |
| history_log.append({ | |
| "Pergunta": user_input, | |
| "Resposta": response, | |
| "Tempo de Resposta (s)": round(end_time - start_time, 2), | |
| "Gráfico": bool(graph_path) | |
| }) | |
| recent_history.append({"role": "user", "content": user_input}) | |
| recent_history.append({"role": "assistant", "content": response}) | |
| if len(recent_history) > 4: | |
| recent_history.pop(0) | |
| recent_history.pop(0) | |
| return response, graph_path | |
| def toggle_history(): | |
| global show_history_flag | |
| show_history_flag = not show_history_flag | |
| return history_log if show_history_flag else {} | |
| def toggle_advanced_mode(state): | |
| global advanced_mode_enabled | |
| advanced_mode_enabled = state | |
| logging.info(f"[MODO AVANÇADO] {'Ativado' if state else 'Desativado'}") | |
| return "Modo avançado ativado." if state else "Modo avançado desativado." | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# Assistente de Clínicas") | |
| chatbot = gr.Chatbot(height=600, type="messages") | |
| msg = gr.Textbox(placeholder="Digite sua pergunta aqui...", label=" ", lines=1) | |
| with gr.Row(): | |
| btn = gr.Button("Enviar", variant="primary") | |
| history_btn = gr.Button("Histórico", variant="secondary") | |
| advanced_toggle = gr.Checkbox(label="MODO AVANÇADO", value=False) | |
| graph_output = gr.Image(label="Visualização de Dados", visible=False) | |
| def respond(message, chat_history): | |
| response, graph_path = chatbot_response(message) | |
| chat_history.append({"role": "user", "content": message}) | |
| chat_history.append({"role": "assistant", "content": response}) | |
| # Retornar o gráfico se existir | |
| if graph_path and os.path.exists(graph_path): | |
| logging.info(f"[DEBUG] Exibindo gráfico: {graph_path}") | |
| return "", chat_history, gr.update(value=graph_path, visible=True) | |
| else: | |
| logging.info("[DEBUG] Nenhum gráfico para exibir") | |
| return "", chat_history, gr.update(visible=False) | |
| msg.submit(respond, [msg, chatbot], [msg, chatbot, graph_output]) | |
| btn.click(respond, [msg, chatbot], [msg, chatbot, graph_output]) | |
| advanced_toggle.change(toggle_advanced_mode, inputs=[advanced_toggle], outputs=[]) | |
| history_output = gr.JSON() | |
| history_btn.click(toggle_history, inputs=[], outputs=history_output) | |
| if __name__ == "__main__": | |
| # Verificar permissões de diretório | |
| current_dir = os.getcwd() | |
| logging.info(f"[DEBUG] Diretório atual: {current_dir}") | |
| logging.info(f"[DEBUG] Permissões de escrita: {os.access(current_dir, os.W_OK)}") | |
| demo.launch(share=False) |