Spaces:
Sleeping
Sleeping
| """ | |
| Nó para geração de gráficos | |
| """ | |
| import io | |
| import logging | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import matplotlib.dates as mdates | |
| from PIL import Image | |
| from typing import Dict, Any, Optional | |
| from utils.object_manager import get_object_manager | |
| async def graph_generation_node(state: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Nó para geração de gráficos baseado no tipo selecionado | |
| Args: | |
| state: Estado atual do agente | |
| Returns: | |
| Estado atualizado com gráfico gerado | |
| """ | |
| try: | |
| logging.info("[GRAPH_GENERATION] Iniciando geração de gráfico") | |
| # Verifica se há tipo de gráfico selecionado | |
| graph_type = state.get("graph_type") | |
| if not graph_type: | |
| logging.info("[GRAPH_GENERATION] Nenhum tipo de gráfico selecionado, pulando geração") | |
| return state | |
| # Verifica se há erro anterior | |
| if state.get("graph_error"): | |
| logging.info("[GRAPH_GENERATION] Erro anterior detectado, pulando geração") | |
| return state | |
| # Recupera dados do gráfico | |
| graph_data = state.get("graph_data", {}) | |
| data_id = graph_data.get("data_id") | |
| if not data_id: | |
| error_msg = "ID dos dados do gráfico não encontrado" | |
| logging.error(f"[GRAPH_GENERATION] {error_msg}") | |
| state.update({ | |
| "graph_error": error_msg, | |
| "graph_generated": False | |
| }) | |
| return state | |
| # Recupera DataFrame dos dados | |
| obj_manager = get_object_manager() | |
| df = obj_manager.get_object(data_id) | |
| if df is None or df.empty: | |
| error_msg = "Dados do gráfico não encontrados ou vazios" | |
| logging.error(f"[GRAPH_GENERATION] {error_msg}") | |
| state.update({ | |
| "graph_error": error_msg, | |
| "graph_generated": False | |
| }) | |
| return state | |
| # Gera título do gráfico baseado na pergunta do usuário | |
| user_query = state.get("user_input", "") | |
| title = f"Visualização: {user_query[:50]}..." if len(user_query) > 50 else f"Visualização: {user_query}" | |
| # Gera o gráfico | |
| graph_image = await generate_graph(df, graph_type, title, user_query) | |
| if graph_image is None: | |
| error_msg = f"Falha ao gerar gráfico do tipo {graph_type}" | |
| logging.error(f"[GRAPH_GENERATION] {error_msg}") | |
| state.update({ | |
| "graph_error": error_msg, | |
| "graph_generated": False | |
| }) | |
| return state | |
| # Armazena imagem do gráfico no ObjectManager | |
| graph_image_id = obj_manager.store_object(graph_image, "graph_image") | |
| # Atualiza estado | |
| state.update({ | |
| "graph_image_id": graph_image_id, | |
| "graph_generated": True, | |
| "graph_error": None | |
| }) | |
| logging.info(f"[GRAPH_GENERATION] Gráfico gerado com sucesso: {graph_type}") | |
| except Exception as e: | |
| error_msg = f"Erro na geração de gráfico: {e}" | |
| logging.error(f"[GRAPH_GENERATION] {error_msg}") | |
| state.update({ | |
| "graph_error": error_msg, | |
| "graph_generated": False | |
| }) | |
| return state | |
| async def generate_graph(df: pd.DataFrame, graph_type: str, title: str = None, user_query: str = None) -> Optional[Image.Image]: | |
| """ | |
| Gera um gráfico com base no DataFrame e tipo especificado | |
| Args: | |
| df: DataFrame com os dados | |
| graph_type: Tipo de gráfico a ser gerado | |
| title: Título do gráfico | |
| user_query: Pergunta original do usuário | |
| Returns: | |
| Imagem PIL do gráfico ou None se falhar | |
| """ | |
| logging.info(f"[GRAPH_GENERATION] Gerando gráfico tipo {graph_type}. DataFrame: {len(df)} linhas") | |
| if df.empty: | |
| logging.warning("[GRAPH_GENERATION] DataFrame vazio") | |
| return None | |
| try: | |
| # Preparar dados usando lógica UNIFICADA | |
| prepared_df = prepare_data_for_graph_unified(df, graph_type, user_query) | |
| if prepared_df.empty: | |
| logging.warning("[GRAPH_GENERATION] DataFrame preparado está vazio") | |
| return None | |
| # Configurações gerais | |
| plt.style.use('default') | |
| colors = plt.cm.tab10.colors | |
| # Gerar gráfico baseado no tipo | |
| if graph_type == 'line_simple': | |
| return await generate_line_simple(prepared_df, title, colors) | |
| elif graph_type == 'multiline': | |
| return await generate_multiline(prepared_df, title, colors) | |
| elif graph_type == 'area': | |
| return await generate_area(prepared_df, title, colors) | |
| elif graph_type == 'bar_vertical': | |
| return await generate_bar_vertical(prepared_df, title, colors) | |
| elif graph_type == 'bar_horizontal': | |
| return await generate_bar_horizontal(prepared_df, title, colors) | |
| elif graph_type == 'bar_grouped': | |
| return await generate_bar_grouped(prepared_df, title, colors) | |
| elif graph_type == 'bar_stacked': | |
| return await generate_bar_stacked(prepared_df, title, colors) | |
| elif graph_type == 'pie': | |
| return await generate_pie(prepared_df, title, colors) | |
| elif graph_type == 'donut': | |
| return await generate_donut(prepared_df, title, colors) | |
| elif graph_type == 'pie_multiple': | |
| return await generate_pie_multiple(prepared_df, title, colors) | |
| else: | |
| logging.warning(f"[GRAPH_GENERATION] Tipo '{graph_type}' não reconhecido, usando bar_vertical") | |
| return await generate_bar_vertical(prepared_df, title, colors) | |
| except Exception as e: | |
| logging.error(f"[GRAPH_GENERATION] Erro ao gerar gráfico: {e}") | |
| return None | |
| def analyze_dataframe_structure(df: pd.DataFrame) -> Dict[str, Any]: | |
| """ | |
| Analisa a estrutura do DataFrame e retorna informações detalhadas | |
| Args: | |
| df: DataFrame a ser analisado | |
| Returns: | |
| Dicionário com informações sobre tipos de colunas e estrutura | |
| """ | |
| if df.empty: | |
| return { | |
| 'numeric_cols': [], | |
| 'date_cols': [], | |
| 'categorical_cols': [], | |
| 'total_cols': 0, | |
| 'has_multiple_numerics': False, | |
| 'has_multiple_categoricals': False, | |
| 'is_suitable_for_grouping': False | |
| } | |
| # Analisar tipos de colunas de forma mais robusta | |
| numeric_cols = [] | |
| date_cols = [] | |
| categorical_cols = [] | |
| for col in df.columns: | |
| col_data = df[col] | |
| # Verificar se é numérico (incluindo strings que representam números) | |
| if pd.api.types.is_numeric_dtype(col_data): | |
| numeric_cols.append(col) | |
| elif col_data.dtype == 'object': | |
| # Tentar converter para numérico | |
| try: | |
| test_numeric = pd.to_numeric(col_data.astype(str).str.replace(',', '.'), errors='coerce') | |
| if test_numeric.notna().sum() > len(col_data) * 0.8: # 80% são números válidos | |
| numeric_cols.append(col) | |
| else: | |
| # Verificar se é data | |
| if any(date_indicator in col.lower() for date_indicator in ['data', 'date', 'time', 'dia', 'mes', 'ano']): | |
| try: | |
| pd.to_datetime(col_data.head(3), errors='raise') | |
| date_cols.append(col) | |
| except: | |
| categorical_cols.append(col) | |
| else: | |
| categorical_cols.append(col) | |
| except: | |
| categorical_cols.append(col) | |
| elif pd.api.types.is_datetime64_any_dtype(col_data): | |
| date_cols.append(col) | |
| else: | |
| categorical_cols.append(col) | |
| return { | |
| 'numeric_cols': numeric_cols, | |
| 'date_cols': date_cols, | |
| 'categorical_cols': categorical_cols, | |
| 'total_cols': len(df.columns), | |
| 'has_multiple_numerics': len(numeric_cols) >= 2, | |
| 'has_multiple_categoricals': len(categorical_cols) >= 2, | |
| 'is_suitable_for_grouping': len(categorical_cols) >= 2 or (len(categorical_cols) >= 1 and len(numeric_cols) >= 2) | |
| } | |
| def prepare_data_for_graph_unified(df: pd.DataFrame, graph_type: str, user_query: str = None) -> pd.DataFrame: | |
| """ | |
| FUNÇÃO UNIFICADA para preparação de dados - substitui lógica duplicada | |
| Args: | |
| df: DataFrame original | |
| graph_type: Tipo de gráfico | |
| user_query: Pergunta do usuário | |
| Returns: | |
| DataFrame preparado com colunas adequadas para o tipo de gráfico | |
| """ | |
| logging.info(f"[GRAPH_GENERATION] 🔧 Preparação UNIFICADA para {graph_type}") | |
| if df.empty: | |
| logging.warning("[GRAPH_GENERATION] DataFrame vazio") | |
| return df | |
| # Fazer cópia para não modificar original | |
| prepared_df = df.copy() | |
| # Analisar estrutura do DataFrame | |
| structure = analyze_dataframe_structure(prepared_df) | |
| numeric_cols = structure['numeric_cols'] | |
| date_cols = structure['date_cols'] | |
| categorical_cols = structure['categorical_cols'] | |
| logging.info(f"[GRAPH_GENERATION] 📊 Estrutura: {len(numeric_cols)} numéricas, {len(date_cols)} datas, {len(categorical_cols)} categóricas") | |
| # Preparação específica por tipo de gráfico | |
| if graph_type in ['line_simple', 'area']: | |
| return _prepare_for_temporal_graphs(prepared_df, date_cols, numeric_cols, categorical_cols) | |
| elif graph_type in ['bar_vertical', 'bar_horizontal']: | |
| return _prepare_for_simple_bar_graphs(prepared_df, categorical_cols, numeric_cols, graph_type) | |
| elif graph_type in ['bar_grouped', 'bar_stacked']: | |
| return _prepare_for_grouped_graphs(prepared_df, structure, graph_type) | |
| elif graph_type in ['pie', 'donut', 'pie_multiple']: | |
| return _prepare_for_pie_graphs(prepared_df, categorical_cols, numeric_cols, graph_type) | |
| elif graph_type == 'multiline': | |
| return _prepare_for_multiline_graphs(prepared_df, structure) | |
| else: | |
| logging.warning(f"[GRAPH_GENERATION] Tipo {graph_type} não reconhecido, usando preparação básica") | |
| return _prepare_basic_fallback(prepared_df, categorical_cols, numeric_cols) | |
| def _prepare_for_temporal_graphs(df: pd.DataFrame, date_cols: list, numeric_cols: list, categorical_cols: list) -> pd.DataFrame: | |
| """Prepara dados para gráficos temporais (linha, área)""" | |
| if date_cols and numeric_cols: | |
| # Usar primeira coluna de data e primeira numérica | |
| x_col, y_col = date_cols[0], numeric_cols[0] | |
| result_df = df[[x_col, y_col]].sort_values(by=x_col) | |
| logging.info(f"[GRAPH_GENERATION] 📅 Temporal: {x_col} (data) + {y_col} (numérica)") | |
| return result_df | |
| elif categorical_cols and numeric_cols: | |
| # Usar primeira categórica e primeira numérica | |
| x_col, y_col = categorical_cols[0], numeric_cols[0] | |
| result_df = df[[x_col, y_col]].sort_values(by=y_col) | |
| logging.info(f"[GRAPH_GENERATION] 📊 Categórico: {x_col} + {y_col}") | |
| return result_df | |
| else: | |
| logging.warning("[GRAPH_GENERATION] Dados insuficientes para gráfico temporal") | |
| return df | |
| def _prepare_for_simple_bar_graphs(df: pd.DataFrame, categorical_cols: list, numeric_cols: list, graph_type: str) -> pd.DataFrame: | |
| """Prepara dados para gráficos de barras simples""" | |
| if categorical_cols and numeric_cols: | |
| x_col, y_col = categorical_cols[0], numeric_cols[0] | |
| result_df = df[[x_col, y_col]].sort_values(by=y_col, ascending=False) | |
| # Limitar categorias para barras verticais | |
| if graph_type == 'bar_vertical' and len(result_df) > 15: | |
| result_df = result_df.head(15) | |
| logging.info(f"[GRAPH_GENERATION] 📊 Limitado a 15 categorias para {graph_type}") | |
| logging.info(f"[GRAPH_GENERATION] 📊 Barras simples: {x_col} + {y_col}") | |
| return result_df | |
| else: | |
| logging.warning("[GRAPH_GENERATION] Dados insuficientes para gráfico de barras") | |
| return df | |
| def _prepare_for_grouped_graphs(df: pd.DataFrame, structure: dict, graph_type: str) -> pd.DataFrame: | |
| """ | |
| FUNÇÃO CRÍTICA: Prepara dados para gráficos agrupados com lógica inteligente | |
| """ | |
| numeric_cols = structure['numeric_cols'] | |
| categorical_cols = structure['categorical_cols'] | |
| has_multiple_numerics = structure['has_multiple_numerics'] | |
| has_multiple_categoricals = structure['has_multiple_categoricals'] | |
| logging.info(f"[GRAPH_GENERATION] 🎯 Preparando agrupado: {len(numeric_cols)} num, {len(categorical_cols)} cat") | |
| if has_multiple_numerics: | |
| # CENÁRIO 1: Múltiplas numéricas - usar primeira categórica + todas numéricas | |
| cols_to_keep = [categorical_cols[0]] + numeric_cols | |
| result_df = df[cols_to_keep] | |
| logging.info(f"[GRAPH_GENERATION] ✅ Múltiplas numéricas: {cols_to_keep}") | |
| return result_df | |
| elif len(numeric_cols) == 1 and has_multiple_categoricals: | |
| # CENÁRIO 2: 1 numérica + múltiplas categóricas - AGRUPAMENTO POR COR | |
| # Usar TODAS as categóricas + a numérica | |
| cols_to_keep = categorical_cols + numeric_cols | |
| result_df = df[cols_to_keep] | |
| logging.info(f"[GRAPH_GENERATION] ✅ Agrupamento por cor: {cols_to_keep}") | |
| return result_df | |
| elif len(numeric_cols) == 1 and len(categorical_cols) == 1: | |
| # CENÁRIO 3: 1 numérica + 1 categórica - gráfico simples | |
| cols_to_keep = categorical_cols + numeric_cols | |
| result_df = df[cols_to_keep] | |
| logging.info(f"[GRAPH_GENERATION] ⚠️ Dados simples para agrupado: {cols_to_keep}") | |
| return result_df | |
| else: | |
| # CENÁRIO 4: Dados inadequados | |
| logging.warning("[GRAPH_GENERATION] ❌ Dados inadequados para gráfico agrupado") | |
| return df | |
| def _prepare_for_pie_graphs(df: pd.DataFrame, categorical_cols: list, numeric_cols: list, graph_type: str) -> pd.DataFrame: | |
| """Prepara dados para gráficos de pizza""" | |
| if categorical_cols and numeric_cols: | |
| cat_col, val_col = categorical_cols[0], numeric_cols[0] | |
| if graph_type == 'pie_multiple' and len(categorical_cols) >= 2: | |
| # Para pizzas múltiplas, manter 2 categóricas + 1 numérica | |
| result_df = df[[categorical_cols[0], categorical_cols[1], val_col]] | |
| logging.info(f"[GRAPH_GENERATION] 🥧 Pizzas múltiplas: {result_df.columns.tolist()}") | |
| else: | |
| # Agrupar e somar valores para pizza simples/donut | |
| result_df = df.groupby(cat_col)[val_col].sum().reset_index() | |
| result_df = result_df.sort_values(by=val_col, ascending=False) | |
| # Limitar a 10 categorias | |
| if len(result_df) > 10: | |
| top_9 = result_df.head(9) | |
| others_sum = result_df.iloc[9:][val_col].sum() | |
| if others_sum > 0: | |
| others_row = pd.DataFrame({cat_col: ['Outros'], val_col: [others_sum]}) | |
| result_df = pd.concat([top_9, others_row], ignore_index=True) | |
| else: | |
| result_df = top_9 | |
| logging.info(f"[GRAPH_GENERATION] 🥧 Pizza: {cat_col} + {val_col} ({len(result_df)} categorias)") | |
| return result_df | |
| else: | |
| logging.warning("[GRAPH_GENERATION] Dados insuficientes para gráfico de pizza") | |
| return df | |
| def _prepare_for_multiline_graphs(df: pd.DataFrame, structure: dict) -> pd.DataFrame: | |
| """Prepara dados para gráficos de múltiplas linhas""" | |
| date_cols = structure['date_cols'] | |
| numeric_cols = structure['numeric_cols'] | |
| categorical_cols = structure['categorical_cols'] | |
| if date_cols and len(numeric_cols) >= 2: | |
| # Data + múltiplas numéricas | |
| cols_to_keep = [date_cols[0]] + numeric_cols | |
| result_df = df[cols_to_keep].sort_values(by=date_cols[0]) | |
| logging.info(f"[GRAPH_GENERATION] 📈 Multilinhas temporais: {cols_to_keep}") | |
| return result_df | |
| elif categorical_cols and len(numeric_cols) >= 2: | |
| # Categórica + múltiplas numéricas | |
| cols_to_keep = [categorical_cols[0]] + numeric_cols | |
| result_df = df[cols_to_keep] | |
| logging.info(f"[GRAPH_GENERATION] 📈 Multilinhas categóricas: {cols_to_keep}") | |
| return result_df | |
| else: | |
| logging.warning("[GRAPH_GENERATION] Dados insuficientes para multilinhas") | |
| return df | |
| def _prepare_basic_fallback(df: pd.DataFrame, categorical_cols: list, numeric_cols: list) -> pd.DataFrame: | |
| """Preparação básica de fallback""" | |
| if categorical_cols and numeric_cols: | |
| result_df = df[[categorical_cols[0], numeric_cols[0]]] | |
| logging.info(f"[GRAPH_GENERATION] 🔄 Fallback básico: {result_df.columns.tolist()}") | |
| return result_df | |
| else: | |
| logging.warning("[GRAPH_GENERATION] Dados inadequados para qualquer gráfico") | |
| return df | |
| def save_plot_to_image() -> Image.Image: | |
| """ | |
| Salva o plot atual do matplotlib como imagem PIL | |
| Returns: | |
| Imagem PIL | |
| """ | |
| buf = io.BytesIO() | |
| plt.savefig(buf, format='png', dpi=100, bbox_inches='tight', facecolor='white') | |
| buf.seek(0) | |
| img = Image.open(buf) | |
| plt.close() # Importante: fechar o plot para liberar memória | |
| return img | |
| # ==================== FUNÇÕES DE GERAÇÃO ESPECÍFICAS ==================== | |
| async def generate_line_simple(df: pd.DataFrame, title: str, colors) -> Optional[Image.Image]: | |
| """Gera gráfico de linha simples""" | |
| if len(df.columns) < 2: | |
| return None | |
| x_col, y_col = df.columns[0], df.columns[1] | |
| is_date = pd.api.types.is_datetime64_any_dtype(df[x_col]) | |
| plt.figure(figsize=(12, 6)) | |
| if is_date: | |
| plt.plot(df[x_col], df[y_col], marker='o', linewidth=2, color=colors[0]) | |
| plt.gcf().autofmt_xdate() | |
| plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%d/%m/%Y')) | |
| else: | |
| plt.plot(range(len(df)), df[y_col], marker='o', linewidth=2, color=colors[0]) | |
| plt.xticks(range(len(df)), df[x_col], rotation=45, ha='right') | |
| plt.xlabel(x_col) | |
| plt.ylabel(y_col) | |
| plt.title(title or f"{y_col} por {x_col}") | |
| plt.grid(True, linestyle='--', alpha=0.7) | |
| plt.tight_layout() | |
| return save_plot_to_image() | |
| async def generate_multiline(df: pd.DataFrame, title: str, colors) -> Optional[Image.Image]: | |
| """Gera gráfico de múltiplas linhas""" | |
| if len(df.columns) < 2: | |
| return None | |
| x_col = df.columns[0] | |
| y_cols = [col for col in df.columns[1:] if pd.api.types.is_numeric_dtype(df[col])] | |
| if not y_cols: | |
| return await generate_line_simple(df, title, colors) | |
| is_date = pd.api.types.is_datetime64_any_dtype(df[x_col]) | |
| plt.figure(figsize=(12, 6)) | |
| for i, y_col in enumerate(y_cols): | |
| if is_date: | |
| plt.plot(df[x_col], df[y_col], marker='o', linewidth=2, | |
| label=y_col, color=colors[i % len(colors)]) | |
| else: | |
| plt.plot(range(len(df)), df[y_col], marker='o', linewidth=2, | |
| label=y_col, color=colors[i % len(colors)]) | |
| if is_date: | |
| plt.gcf().autofmt_xdate() | |
| plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%d/%m/%Y')) | |
| else: | |
| plt.xticks(range(len(df)), df[x_col], rotation=45, ha='right') | |
| plt.xlabel(x_col) | |
| plt.ylabel("Valores") | |
| plt.title(title or f"Comparação por {x_col}") | |
| plt.legend(title="Séries", loc='best') | |
| plt.grid(True, linestyle='--', alpha=0.7) | |
| plt.tight_layout() | |
| return save_plot_to_image() | |
| async def generate_area(df: pd.DataFrame, title: str, colors) -> Optional[Image.Image]: | |
| """Gera gráfico de área""" | |
| if len(df.columns) < 2: | |
| return None | |
| x_col, y_col = df.columns[0], df.columns[1] | |
| is_date = pd.api.types.is_datetime64_any_dtype(df[x_col]) | |
| plt.figure(figsize=(12, 6)) | |
| if is_date: | |
| plt.fill_between(df[x_col], df[y_col], alpha=0.5, color=colors[0]) | |
| plt.plot(df[x_col], df[y_col], color=colors[0], linewidth=2) | |
| plt.gcf().autofmt_xdate() | |
| plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%d/%m/%Y')) | |
| else: | |
| plt.fill_between(range(len(df)), df[y_col], alpha=0.5, color=colors[0]) | |
| plt.plot(range(len(df)), df[y_col], color=colors[0], linewidth=2) | |
| plt.xticks(range(len(df)), df[x_col], rotation=45, ha='right') | |
| plt.xlabel(x_col) | |
| plt.ylabel(y_col) | |
| plt.title(title or f"{y_col} por {x_col}") | |
| plt.grid(True, linestyle='--', alpha=0.7) | |
| plt.tight_layout() | |
| return save_plot_to_image() | |
| async def generate_bar_vertical(df: pd.DataFrame, title: str, colors) -> Optional[Image.Image]: | |
| """Gera gráfico de barras verticais""" | |
| if len(df.columns) < 2: | |
| return None | |
| x_col, y_col = df.columns[0], df.columns[1] | |
| # Preparar dados numéricos - converter strings com vírgula para float | |
| df_plot = df.copy() | |
| try: | |
| if df_plot[y_col].dtype == 'object': | |
| # Converte strings para números, tratando vírgulas como separador decimal | |
| df_plot[y_col] = pd.to_numeric(df_plot[y_col].astype(str).str.replace(',', '.'), errors='coerce') | |
| # Remove linhas com valores não numéricos | |
| df_plot = df_plot.dropna(subset=[y_col]) | |
| if df_plot.empty: | |
| logging.error(f"[GRAPH_GENERATION] Nenhum valor numérico válido encontrado na coluna {y_col}") | |
| return None | |
| except Exception as e: | |
| logging.error(f"[GRAPH_GENERATION] Erro ao converter dados para numérico: {e}") | |
| return None | |
| plt.figure(figsize=(12, 8)) | |
| bars = plt.bar(range(len(df_plot)), df_plot[y_col], color=colors[0]) | |
| # Adicionar valores nas barras | |
| try: | |
| max_value = df_plot[y_col].max() | |
| for i, bar in enumerate(bars): | |
| height = bar.get_height() | |
| if isinstance(height, (int, float)) and not pd.isna(height): | |
| plt.text(bar.get_x() + bar.get_width()/2., height + 0.02 * max_value, | |
| f'{height:,.0f}', ha='center', fontsize=9) | |
| except Exception as e: | |
| logging.warning(f"[GRAPH_GENERATION] Erro ao adicionar valores nas barras: {e}") | |
| plt.xlabel(x_col) | |
| plt.ylabel(y_col) | |
| plt.title(title or f"{y_col} por {x_col}") | |
| plt.xticks(range(len(df_plot)), df_plot[x_col], rotation=45, ha='right') | |
| plt.grid(True, linestyle='--', alpha=0.7, axis='y') | |
| plt.tight_layout() | |
| return save_plot_to_image() | |
| async def generate_bar_horizontal(df: pd.DataFrame, title: str, colors) -> Optional[Image.Image]: | |
| """Gera gráfico de barras horizontais""" | |
| if len(df.columns) < 2: | |
| return None | |
| x_col, y_col = df.columns[0], df.columns[1] | |
| # Preparar dados numéricos - converter strings com vírgula para float | |
| df_plot = df.copy() | |
| try: | |
| if df_plot[y_col].dtype == 'object': | |
| # Converte strings para números, tratando vírgulas como separador decimal | |
| df_plot[y_col] = pd.to_numeric(df_plot[y_col].astype(str).str.replace(',', '.'), errors='coerce') | |
| # Remove linhas com valores não numéricos | |
| df_plot = df_plot.dropna(subset=[y_col]) | |
| if df_plot.empty: | |
| logging.error(f"[GRAPH_GENERATION] Nenhum valor numérico válido encontrado na coluna {y_col}") | |
| return None | |
| except Exception as e: | |
| logging.error(f"[GRAPH_GENERATION] Erro ao converter dados para numérico: {e}") | |
| return None | |
| plt.figure(figsize=(12, max(6, len(df_plot) * 0.4))) | |
| bars = plt.barh(range(len(df_plot)), df_plot[y_col], color=colors[0]) | |
| # Adicionar valores nas barras | |
| try: | |
| max_value = df_plot[y_col].max() | |
| for i, bar in enumerate(bars): | |
| width = bar.get_width() | |
| if isinstance(width, (int, float)) and not pd.isna(width): | |
| plt.text(width + 0.02 * max_value, bar.get_y() + bar.get_height()/2., | |
| f'{width:,.0f}', va='center', fontsize=9) | |
| except Exception as e: | |
| logging.warning(f"[GRAPH_GENERATION] Erro ao adicionar valores nas barras: {e}") | |
| plt.xlabel(y_col) | |
| plt.ylabel(x_col) | |
| plt.title(title or f"{y_col} por {x_col}") | |
| plt.yticks(range(len(df_plot)), df_plot[x_col]) | |
| plt.grid(True, linestyle='--', alpha=0.7, axis='x') | |
| plt.tight_layout() | |
| return save_plot_to_image() | |
| async def generate_bar_grouped(df: pd.DataFrame, title: str, colors) -> Optional[Image.Image]: | |
| """ | |
| FUNÇÃO REFATORADA: Gera gráfico de barras agrupadas com fallbacks inteligentes | |
| """ | |
| logging.info(f"[GRAPH_GENERATION] 🎯 Gerando barras agrupadas REFATORADO. Colunas: {df.columns.tolist()}") | |
| if len(df.columns) < 2: | |
| logging.warning("[GRAPH_GENERATION] ❌ Dados insuficientes para gráfico agrupado") | |
| return None | |
| # Analisar estrutura dos dados | |
| structure = analyze_dataframe_structure(df) | |
| numeric_cols = structure['numeric_cols'] | |
| categorical_cols = structure['categorical_cols'] | |
| logging.info(f"[GRAPH_GENERATION] 📊 Estrutura: {len(numeric_cols)} numéricas, {len(categorical_cols)} categóricas") | |
| if not numeric_cols: | |
| logging.warning("[GRAPH_GENERATION] ❌ Nenhuma coluna numérica encontrada") | |
| return await generate_bar_vertical(df, title, colors) | |
| # DECISÃO INTELIGENTE baseada na estrutura dos dados | |
| if len(numeric_cols) >= 2: | |
| # CENÁRIO 1: Múltiplas numéricas - gráfico agrupado tradicional | |
| return await _generate_multi_numeric_grouped(df, title, colors, categorical_cols[0], numeric_cols) | |
| elif len(numeric_cols) == 1 and len(categorical_cols) >= 2: | |
| # CENÁRIO 2: 1 numérica + múltiplas categóricas - agrupamento por cor | |
| return await _generate_color_grouped_bars(df, title, colors, categorical_cols, numeric_cols[0]) | |
| elif len(numeric_cols) == 1 and len(categorical_cols) == 1: | |
| # CENÁRIO 3: Dados simples - fallback inteligente para barras verticais | |
| logging.info("[GRAPH_GENERATION] ⚠️ Dados simples, usando barras verticais") | |
| return await generate_bar_vertical(df, title, colors) | |
| else: | |
| # CENÁRIO 4: Estrutura inadequada | |
| logging.warning("[GRAPH_GENERATION] ❌ Estrutura de dados inadequada para agrupamento") | |
| return await generate_bar_vertical(df, title, colors) | |
| async def _generate_multi_numeric_grouped(df: pd.DataFrame, title: str, colors, x_col: str, y_cols: list) -> Optional[Image.Image]: | |
| """ | |
| Gera gráfico agrupado com múltiplas colunas numéricas (cenário tradicional) | |
| """ | |
| logging.info(f"[GRAPH_GENERATION] 📊 Múltiplas numéricas: {x_col} + {y_cols}") | |
| # Converter colunas numéricas se necessário | |
| for col in y_cols: | |
| if df[col].dtype == 'object': | |
| df[col] = pd.to_numeric(df[col].astype(str).str.replace(',', '.'), errors='coerce') | |
| # Remover linhas com valores NaN | |
| df_clean = df.dropna(subset=y_cols) | |
| if df_clean.empty: | |
| logging.error("[GRAPH_GENERATION] ❌ Todos os valores são NaN após conversão") | |
| return None | |
| # Verificar diferença de escala entre colunas | |
| col_ranges = {col: df_clean[col].max() - df_clean[col].min() for col in y_cols} | |
| max_range = max(col_ranges.values()) | |
| min_range = min(col_ranges.values()) | |
| if max_range > 0 and min_range > 0 and (max_range / min_range) > 100: | |
| # Escalas muito diferentes - usar eixos duplos | |
| logging.info("[GRAPH_GENERATION] 📊 Escalas diferentes, usando eixos duplos") | |
| return await _generate_dual_axis_chart(df_clean, title, colors, x_col, y_cols[0], y_cols[1]) | |
| # Gráfico agrupado normal | |
| x_pos = np.arange(len(df_clean)) | |
| width = 0.8 / len(y_cols) | |
| fig, ax = plt.subplots(figsize=(14, 8)) | |
| for i, col in enumerate(y_cols): | |
| offset = width * i - width * (len(y_cols) - 1) / 2 | |
| bars = ax.bar(x_pos + offset, df_clean[col], width, label=col, | |
| color=colors[i % len(colors)], alpha=0.8) | |
| # Adicionar valores nas barras | |
| for bar in bars: | |
| height = bar.get_height() | |
| if height > 0: | |
| ax.text(bar.get_x() + bar.get_width()/2., height + height * 0.02, | |
| f'{height:.0f}', ha='center', fontsize=8) | |
| ax.set_xlabel(x_col) | |
| ax.set_ylabel('Valores') | |
| ax.set_title(title or f"Comparação de {', '.join(y_cols)} por {x_col}") | |
| ax.set_xticks(x_pos) | |
| ax.set_xticklabels(df_clean[x_col], rotation=45, ha='right') | |
| ax.legend() | |
| ax.grid(True, linestyle='--', alpha=0.7, axis='y') | |
| plt.tight_layout() | |
| logging.info(f"[GRAPH_GENERATION] ✅ Gráfico agrupado tradicional criado: {len(y_cols)} métricas") | |
| return save_plot_to_image() | |
| async def _generate_color_grouped_bars(df: pd.DataFrame, title: str, colors, categorical_cols: list, y_col: str) -> Optional[Image.Image]: | |
| """ | |
| Gera gráfico agrupado por cor usando múltiplas categóricas (CENÁRIO CRÍTICO) | |
| """ | |
| x_col = categorical_cols[0] | |
| group_col = categorical_cols[1] if len(categorical_cols) > 1 else None | |
| logging.info(f"[GRAPH_GENERATION] 🎨 Agrupamento por cor: {x_col} (X) + {y_col} (Y) + {group_col} (cor)") | |
| if not group_col: | |
| logging.warning("[GRAPH_GENERATION] ⚠️ Sem coluna para agrupamento, usando gráfico simples") | |
| return await generate_bar_vertical(df[[x_col, y_col]], title, colors) | |
| # Converter coluna numérica se necessário | |
| if df[y_col].dtype == 'object': | |
| df[y_col] = pd.to_numeric(df[y_col].astype(str).str.replace(',', '.'), errors='coerce') | |
| # Remover linhas com valores NaN | |
| df_clean = df.dropna(subset=[y_col]) | |
| if df_clean.empty: | |
| logging.error("[GRAPH_GENERATION] ❌ Todos os valores são NaN após conversão") | |
| return None | |
| # Obter categorias únicas | |
| unique_groups = df_clean[group_col].unique() | |
| unique_x = df_clean[x_col].unique() | |
| logging.info(f"[GRAPH_GENERATION] 🎯 Grupos: {unique_groups} | X: {len(unique_x)} categorias") | |
| # Configurar gráfico | |
| x_pos = np.arange(len(unique_x)) | |
| width = 0.8 / len(unique_groups) | |
| fig, ax = plt.subplots(figsize=(14, 8)) | |
| # Criar barras para cada grupo | |
| for i, group in enumerate(unique_groups): | |
| group_data = df_clean[df_clean[group_col] == group] | |
| # Criar array de valores para cada posição X | |
| values = [] | |
| for x_val in unique_x: | |
| matching_rows = group_data[group_data[x_col] == x_val] | |
| if not matching_rows.empty: | |
| values.append(matching_rows[y_col].iloc[0]) | |
| else: | |
| values.append(0) | |
| # Calcular posição das barras | |
| offset = width * i - width * (len(unique_groups) - 1) / 2 | |
| bars = ax.bar(x_pos + offset, values, width, label=f"{group_col}: {group}", | |
| color=colors[i % len(colors)], alpha=0.8) | |
| # Adicionar valores nas barras | |
| for bar, value in zip(bars, values): | |
| if value > 0: | |
| ax.text(bar.get_x() + bar.get_width()/2., value + value * 0.02, | |
| f'{value:.0f}', ha='center', fontsize=8) | |
| # Configurações do gráfico | |
| ax.set_xlabel(x_col) | |
| ax.set_ylabel(y_col) | |
| ax.set_title(title or f"{y_col} por {x_col} (agrupado por {group_col})") | |
| ax.set_xticks(x_pos) | |
| ax.set_xticklabels(unique_x, rotation=45, ha='right') | |
| ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left') | |
| ax.grid(True, linestyle='--', alpha=0.7, axis='y') | |
| plt.tight_layout() | |
| logging.info(f"[GRAPH_GENERATION] ✅ Gráfico agrupado por cor criado: {len(unique_groups)} grupos") | |
| return save_plot_to_image() | |
| async def _generate_dual_axis_chart(df: pd.DataFrame, title: str, colors, x_col: str, y1_col: str, y2_col: str) -> Optional[Image.Image]: | |
| """ | |
| Gera gráfico com eixos duplos para métricas com escalas diferentes | |
| """ | |
| logging.info(f"[GRAPH_GENERATION] 📊 Eixos duplos: {y1_col} (esq) + {y2_col} (dir)") | |
| fig, ax1 = plt.subplots(figsize=(14, 8)) | |
| # Primeiro eixo Y (esquerda) | |
| x_pos = np.arange(len(df)) | |
| width = 0.35 | |
| bars1 = ax1.bar(x_pos - width/2, df[y1_col], width, label=y1_col, | |
| color=colors[0], alpha=0.8) | |
| ax1.set_xlabel(x_col) | |
| ax1.set_ylabel(y1_col, color=colors[0]) | |
| ax1.tick_params(axis='y', labelcolor=colors[0]) | |
| # Segundo eixo Y (direita) | |
| ax2 = ax1.twinx() | |
| bars2 = ax2.bar(x_pos + width/2, df[y2_col], width, label=y2_col, | |
| color=colors[1], alpha=0.8) | |
| ax2.set_ylabel(y2_col, color=colors[1]) | |
| ax2.tick_params(axis='y', labelcolor=colors[1]) | |
| # Configurações comuns | |
| ax1.set_xticks(x_pos) | |
| ax1.set_xticklabels(df[x_col], rotation=45, ha='right') | |
| ax1.grid(True, linestyle='--', alpha=0.7, axis='y') | |
| # Adicionar valores nas barras | |
| for bar in bars1: | |
| height = bar.get_height() | |
| if height > 0: | |
| ax1.text(bar.get_x() + bar.get_width()/2., height + height * 0.02, | |
| f'{height:.0f}', ha='center', fontsize=8) | |
| for bar in bars2: | |
| height = bar.get_height() | |
| if height > 0: | |
| ax2.text(bar.get_x() + bar.get_width()/2., height + height * 0.02, | |
| f'{height:.0f}', ha='center', fontsize=8) | |
| plt.title(title or f"{y1_col} e {y2_col} por {x_col}") | |
| plt.tight_layout() | |
| logging.info(f"[GRAPH_GENERATION] ✅ Gráfico com eixos duplos criado: {y1_col} + {y2_col}") | |
| return save_plot_to_image() | |
| # Função removida - substituída pela nova lógica unificada | |
| # Função removida - substituída pela nova lógica unificada em _generate_color_grouped_bars() | |
| async def generate_bar_stacked(df: pd.DataFrame, title: str, colors) -> Optional[Image.Image]: | |
| """Gera gráfico de barras empilhadas""" | |
| if len(df.columns) < 3: | |
| return await generate_bar_vertical(df, title, colors) | |
| x_col = df.columns[0] | |
| y_cols = [col for col in df.columns[1:] if pd.api.types.is_numeric_dtype(df[col])] | |
| if not y_cols: | |
| return await generate_bar_vertical(df, title, colors) | |
| fig, ax = plt.subplots(figsize=(12, 8)) | |
| bottom = np.zeros(len(df)) | |
| for i, col in enumerate(y_cols): | |
| bars = ax.bar(range(len(df)), df[col], bottom=bottom, label=col, color=colors[i % len(colors)]) | |
| # Adicionar valores nas barras | |
| for j, bar in enumerate(bars): | |
| height = bar.get_height() | |
| if isinstance(height, (int, float)) and height > 0: | |
| ax.text(bar.get_x() + bar.get_width()/2., bottom[j] + height/2, | |
| f'{height:.2f}', ha='center', va='center', fontsize=8, color='white') | |
| bottom += df[col].fillna(0) | |
| ax.set_xlabel(x_col) | |
| ax.set_ylabel('Valores') | |
| ax.set_title(title or f"Distribuição por {x_col}") | |
| ax.set_xticks(range(len(df))) | |
| ax.set_xticklabels(df[x_col], rotation=45, ha='right') | |
| ax.legend() | |
| plt.tight_layout() | |
| return save_plot_to_image() | |
| async def generate_pie(df: pd.DataFrame, title: str, colors) -> Optional[Image.Image]: | |
| """Gera gráfico de pizza""" | |
| if len(df.columns) < 2: | |
| return None | |
| label_col, value_col = df.columns[0], df.columns[1] | |
| # Preparar dados numéricos - converter strings com vírgula para float | |
| df_plot = df.copy() | |
| try: | |
| if df_plot[value_col].dtype == 'object': | |
| # Converte strings para números, tratando vírgulas como separador decimal | |
| df_plot[value_col] = pd.to_numeric(df_plot[value_col].astype(str).str.replace(',', '.'), errors='coerce') | |
| # Remove linhas com valores não numéricos, negativos ou zero | |
| df_plot = df_plot.dropna(subset=[value_col]) | |
| df_plot = df_plot[df_plot[value_col] > 0] | |
| if df_plot.empty: | |
| logging.error(f"[GRAPH_GENERATION] Nenhum valor numérico positivo encontrado na coluna {value_col}") | |
| return await generate_bar_vertical(df, title, colors) | |
| except Exception as e: | |
| logging.error(f"[GRAPH_GENERATION] Erro ao converter dados para numérico: {e}") | |
| return await generate_bar_vertical(df, title, colors) | |
| plt.figure(figsize=(10, 10)) | |
| # Calcular percentuais para os rótulos | |
| total = df_plot[value_col].sum() | |
| labels = [f'{label} ({val:,.0f}, {val/total:.1%})' for label, val in zip(df_plot[label_col], df_plot[value_col])] | |
| plt.pie(df_plot[value_col], labels=labels, autopct='%1.1f%%', | |
| startangle=90, shadow=False, colors=colors[:len(df_plot)]) | |
| plt.axis('equal') | |
| plt.title(title or f"Distribuição de {value_col} por {label_col}") | |
| plt.tight_layout() | |
| return save_plot_to_image() | |
| async def generate_donut(df: pd.DataFrame, title: str, colors) -> Optional[Image.Image]: | |
| """Gera gráfico de donut""" | |
| if len(df.columns) < 2: | |
| return None | |
| label_col, value_col = df.columns[0], df.columns[1] | |
| # Preparar dados numéricos - converter strings com vírgula para float | |
| df_plot = df.copy() | |
| try: | |
| if df_plot[value_col].dtype == 'object': | |
| # Converte strings para números, tratando vírgulas como separador decimal | |
| df_plot[value_col] = pd.to_numeric(df_plot[value_col].astype(str).str.replace(',', '.'), errors='coerce') | |
| # Remove linhas com valores não numéricos, negativos ou zero | |
| df_plot = df_plot.dropna(subset=[value_col]) | |
| df_plot = df_plot[df_plot[value_col] > 0] | |
| if df_plot.empty: | |
| logging.error(f"[GRAPH_GENERATION] Nenhum valor numérico positivo encontrado na coluna {value_col}") | |
| return await generate_bar_vertical(df, title, colors) | |
| except Exception as e: | |
| logging.error(f"[GRAPH_GENERATION] Erro ao converter dados para numérico: {e}") | |
| return await generate_bar_vertical(df, title, colors) | |
| plt.figure(figsize=(10, 10)) | |
| # Calcular percentuais para os rótulos | |
| total = df_plot[value_col].sum() | |
| labels = [f'{label} ({val:,.0f}, {val/total:.1%})' for label, val in zip(df_plot[label_col], df_plot[value_col])] | |
| # Criar gráfico de donut (pizza com círculo central) | |
| plt.pie(df_plot[value_col], labels=labels, autopct='%1.1f%%', | |
| startangle=90, shadow=False, colors=colors[:len(df_plot)], | |
| wedgeprops=dict(width=0.5)) # Largura do anel | |
| plt.axis('equal') | |
| plt.title(title or f"Distribuição de {value_col} por {label_col}") | |
| plt.tight_layout() | |
| return save_plot_to_image() | |
| async def generate_pie_multiple(df: pd.DataFrame, title: str, colors) -> Optional[Image.Image]: | |
| """Gera múltiplos gráficos de pizza""" | |
| if len(df.columns) < 3: | |
| return await generate_pie(df, title, colors) | |
| cat1, cat2, val_col = df.columns[0], df.columns[1], df.columns[2] | |
| # Verificar se o valor é numérico | |
| if not pd.api.types.is_numeric_dtype(df[val_col]): | |
| return await generate_bar_grouped(df, title, colors) | |
| # Agrupar dados | |
| grouped = df.groupby([cat1, cat2])[val_col].sum().unstack().fillna(0) | |
| # Determinar layout da grade | |
| n_groups = len(grouped) | |
| if n_groups == 0: | |
| return None | |
| cols = min(3, n_groups) # Máximo 3 colunas | |
| rows = (n_groups + cols - 1) // cols # Arredondar para cima | |
| # Criar subplots | |
| fig, axes = plt.subplots(rows, cols, figsize=(15, 5 * rows)) | |
| if rows == 1 and cols == 1: | |
| axes = np.array([axes]) # Garantir que axes seja um array | |
| axes = axes.flatten() | |
| # Plotar cada pizza | |
| for i, (group_name, group_data) in enumerate(grouped.iterrows()): | |
| if i < len(axes): | |
| # Remover valores zero | |
| data = group_data[group_data > 0] | |
| if not data.empty: | |
| # Calcular percentuais | |
| total = data.sum() | |
| # Criar rótulos com valores e percentuais | |
| labels = [f'{idx} ({val:.2f}, {val/total:.1%})' for idx, val in data.items()] | |
| # Plotar pizza | |
| axes[i].pie(data, labels=labels, autopct='%1.1f%%', | |
| startangle=90, colors=colors[:len(data)]) | |
| axes[i].set_title(f"{group_name}") | |
| axes[i].axis('equal') | |
| # Esconder eixos não utilizados | |
| for j in range(i + 1, len(axes)): | |
| axes[j].axis('off') | |
| plt.suptitle(title or f"Distribuição de {val_col} por {cat2} para cada {cat1}", fontsize=16) | |
| plt.tight_layout() | |
| plt.subplots_adjust(top=0.9) | |
| return save_plot_to_image() | |