Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """ | |
| core.py - Lógica de negócio para elaboração de modelos estatísticos | |
| Contém: carregamento de dados, estatísticas, transformações, modelo OLS, diagnósticos | |
| """ | |
| import os | |
| import pandas as pd | |
| # Desabilita StringDtype para compatibilidade entre versões do pandas | |
| pd.set_option('future.infer_string', False) | |
| import numpy as np | |
| import statsmodels.api as sm | |
| from statsmodels.stats.diagnostic import het_breuschpagan | |
| from statsmodels.stats.stattools import durbin_watson | |
| from statsmodels.stats.outliers_influence import OLSInfluence | |
| from scipy.stats import kstest | |
| from itertools import product | |
| from joblib import dump, load | |
| import io | |
| import plotly.io as pio | |
| # ============================================================ | |
| # CONSTANTES | |
| # ============================================================ | |
| TRANSFORMACOES = ["(x)", "1/(x)", "ln(x)", "exp(x)", "(x)^2", "raiz(x)", "1/raiz(x)"] | |
| # Nomes comuns para identificar colunas especiais | |
| NOMES_VUNIT = {"vunit", "vu", "valor_unitario", "vuloc", "vupriv", "vuaconst", "vuapriv", "valor unitário"} | |
| NOMES_LAT = {"lat", "latitude", "y", "siat_latitude"} | |
| NOMES_LON = {"lon", "longitude", "long", "x", "siat_longitude"} | |
| # Tabela Durbin-Watson | |
| DW_TABLE = { | |
| 0.05: { | |
| 1: {"n": [25, 50, 100, 200], "dL": [1.10, 1.32, 1.49, 1.57], "dU": [1.54, 1.62, 1.70, 1.75]}, | |
| 2: {"n": [25, 50, 100, 200], "dL": [1.06, 1.30, 1.47, 1.55], "dU": [1.50, 1.60, 1.69, 1.74]}, | |
| 3: {"n": [25, 50, 100, 200], "dL": [1.02, 1.28, 1.45, 1.54], "dU": [1.47, 1.58, 1.68, 1.73]}, | |
| 4: {"n": [25, 50, 100, 200], "dL": [0.98, 1.26, 1.43, 1.52], "dU": [1.44, 1.56, 1.66, 1.72]}, | |
| } | |
| } | |
| # ============================================================ | |
| # CARREGAMENTO DE DADOS | |
| # ============================================================ | |
| def detectar_abas_excel(arquivo): | |
| """ | |
| Detecta as abas (sheets) de um arquivo Excel. | |
| Args: | |
| arquivo: Pode ser um objeto de arquivo ou um caminho (string) | |
| Retorna: | |
| tuple: (lista_abas, mensagem, sucesso) | |
| """ | |
| if arquivo is None: | |
| return [], "Nenhum arquivo enviado.", False | |
| try: | |
| # Obtém o caminho/nome do arquivo | |
| caminho = arquivo.name if hasattr(arquivo, 'name') else str(arquivo) | |
| if caminho.endswith(('.xlsx', '.xls')): | |
| # Usa o caminho para abrir o arquivo (funciona com path ou file object) | |
| excel_file = pd.ExcelFile(caminho) | |
| abas = excel_file.sheet_names | |
| return abas, f"Arquivo com {len(abas)} aba(s) detectada(s)", True | |
| else: | |
| # CSV não tem abas | |
| return [], "Arquivo CSV (sem abas)", True | |
| except Exception as e: | |
| return [], f"Erro ao detectar abas: {str(e)}", False | |
| def carregar_arquivo(arquivo, nome_aba=None): | |
| """ | |
| Carrega arquivo Excel ou CSV e retorna DataFrame. | |
| Detecta automaticamente o separador do CSV. | |
| Args: | |
| arquivo: Arquivo a ser carregado (objeto de arquivo ou caminho string) | |
| nome_aba: Nome da aba a carregar (apenas para Excel). Se None, carrega a primeira. | |
| Retorna: | |
| tuple: (df, mensagem, sucesso) | |
| """ | |
| if arquivo is None: | |
| return None, "Nenhum arquivo enviado.", False | |
| try: | |
| # Obtém o caminho do arquivo | |
| caminho = arquivo.name if hasattr(arquivo, 'name') else str(arquivo) | |
| if caminho.endswith(('.xlsx', '.xls')): | |
| # Carrega aba específica ou a primeira (usa caminho para poder reabrir) | |
| df = pd.read_excel(caminho, sheet_name=nome_aba if nome_aba else 0) | |
| aba_info = f" (aba: {nome_aba})" if nome_aba else "" | |
| elif caminho.endswith('.csv'): | |
| # Tenta detectar separador | |
| with open(caminho, 'rb') as f: | |
| content = f.read() | |
| # Tenta diferentes separadores | |
| for sep in [',', ';', '\t']: | |
| try: | |
| df = pd.read_csv(io.BytesIO(content), sep=sep) | |
| if len(df.columns) > 1: | |
| break | |
| except: | |
| continue | |
| else: | |
| df = pd.read_csv(io.BytesIO(content)) | |
| aba_info = "" | |
| else: | |
| return None, "Formato de arquivo não suportado.", False | |
| # Reinicia índice começando em 1 | |
| df = df.reset_index(drop=True) | |
| df.index = df.index + 1 | |
| return df, f"Arquivo carregado: {os.path.basename(caminho)}{aba_info} ({len(df)} linhas, {len(df.columns)} colunas)", True | |
| except Exception as e: | |
| return None, f"Erro ao carregar arquivo: {str(e)}", False | |
| def carregar_dai(caminho): | |
| """ | |
| Carrega arquivo .dai e extrai DataFrame, variáveis e transformações. | |
| Retorna: | |
| tuple: (df, coluna_y, colunas_x, transformacao_y, transformacoes_x, mensagem, sucesso) | |
| """ | |
| try: | |
| pacote = load(caminho) | |
| # Extrai DataFrame | |
| df = pacote["Xy_preview_out_coords"].copy() | |
| df = df.reset_index(drop=True) | |
| df.index = df.index + 1 | |
| # Parseia transformações | |
| info_transf = pacote["formatted_top_transformation_info"] | |
| # Primeiro elemento: Y (ex: "VUNIT: ln(y)") | |
| nome_y, transf_y = info_transf[0].split(": ", 1) | |
| coluna_y = nome_y.strip() | |
| transformacao_y = transf_y.strip().replace("(y)", "(x)") | |
| # Demais: X (ex: "Area: ln(x)") | |
| colunas_x = [] | |
| transformacoes_x = {} | |
| for item in info_transf[1:]: | |
| nome_x, transf_x = item.split(": ", 1) | |
| nome_x = nome_x.strip() | |
| colunas_x.append(nome_x) | |
| transformacoes_x[nome_x] = transf_x.strip() | |
| msg = f"Modelo .dai carregado: {os.path.basename(caminho)} ({len(df)} dados, Y={coluna_y}, {len(colunas_x)} variáveis X)" | |
| return df, coluna_y, colunas_x, transformacao_y, transformacoes_x, msg, True | |
| except Exception as e: | |
| return None, None, None, None, None, f"Erro ao carregar .dai: {str(e)}", False | |
| def identificar_coluna_y_padrao(df): | |
| """ | |
| Identifica a coluna padrão para variável dependente (y). | |
| Prioriza VUNIT se existir. | |
| """ | |
| colunas_lower = {col.lower(): col for col in df.columns} | |
| for nome in NOMES_VUNIT: | |
| if nome in colunas_lower: | |
| return colunas_lower[nome] | |
| # Se não encontrar, retorna primeira coluna numérica | |
| numericas = df.select_dtypes(include=[np.number]).columns.tolist() | |
| return numericas[0] if numericas else None | |
| def identificar_colunas_coords(df): | |
| """ | |
| Identifica colunas de latitude e longitude. | |
| Retorna tuple (lat_col, lon_col) ou (None, None). | |
| """ | |
| colunas_lower = {col.lower(): col for col in df.columns} | |
| lat_col = None | |
| lon_col = None | |
| for nome in NOMES_LAT: | |
| if nome in colunas_lower: | |
| lat_col = colunas_lower[nome] | |
| break | |
| for nome in NOMES_LON: | |
| if nome in colunas_lower: | |
| lon_col = colunas_lower[nome] | |
| break | |
| return lat_col, lon_col | |
| def obter_colunas_numericas(df, excluir_coords=True): | |
| """ | |
| Retorna lista de colunas numéricas. | |
| Opcionalmente exclui lat/lon. | |
| """ | |
| numericas = df.select_dtypes(include=[np.number]).columns.tolist() | |
| if excluir_coords: | |
| lat_col, lon_col = identificar_colunas_coords(df) | |
| if lat_col and lat_col in numericas: | |
| numericas.remove(lat_col) | |
| if lon_col and lon_col in numericas: | |
| numericas.remove(lon_col) | |
| return numericas | |
| # ============================================================ | |
| # ESTATÍSTICAS | |
| # ============================================================ | |
| def calcular_estatisticas_variaveis(df, coluna_y=None, indices_usar=None, colunas=None): | |
| """ | |
| Calcula estatísticas para cada coluna numérica. | |
| Se coluna_y for especificada, inclui correlação com y. | |
| Se colunas for especificada, calcula apenas para essas colunas. | |
| Parâmetros: | |
| df: DataFrame com os dados | |
| coluna_y: nome da coluna da variável dependente (opcional) | |
| indices_usar: lista de índices a incluir (se None, usa todos) | |
| colunas: lista de colunas a calcular (se None, usa todas numéricas) | |
| Retorna DataFrame com estatísticas. | |
| """ | |
| # Filtra por índices se especificado | |
| df_calc = df.copy() | |
| if indices_usar is not None: | |
| df_calc = df_calc.loc[indices_usar] | |
| # Usa colunas especificadas ou todas numéricas | |
| if colunas is not None: | |
| numericas = [c for c in colunas if c in df_calc.columns and pd.api.types.is_numeric_dtype(df_calc[c])] | |
| else: | |
| numericas = obter_colunas_numericas(df_calc) | |
| estatisticas = [] | |
| for col in numericas: | |
| serie = df_calc[col].dropna() | |
| stats = { | |
| "Variável": col, | |
| "Contagem": len(serie), | |
| "Média": serie.mean(), | |
| "Mediana": serie.median(), | |
| "Mínimo": serie.min(), | |
| "Máximo": serie.max(), | |
| "Desvio Padrão": serie.std(), | |
| } | |
| # Coeficiente de variação | |
| if stats["Média"] != 0: | |
| stats["CV (%)"] = abs(stats["Desvio Padrão"] / stats["Média"]) * 100 | |
| else: | |
| stats["CV (%)"] = np.nan | |
| # Correlação com y | |
| if coluna_y and col != coluna_y: | |
| y_serie = df_calc[coluna_y].dropna() | |
| comum = serie.index.intersection(y_serie.index) | |
| if len(comum) > 2: | |
| stats["Correlação (y)"] = np.corrcoef(serie.loc[comum], y_serie.loc[comum])[0, 1] | |
| else: | |
| stats["Correlação (y)"] = np.nan | |
| else: | |
| stats["Correlação (y)"] = np.nan if col != coluna_y else 1.0 | |
| # Detecta se é dicotômica | |
| valores_unicos = set(serie.unique()) | |
| stats["Dicotômica"] = valores_unicos.issubset({0, 1, 0.0, 1.0}) | |
| estatisticas.append(stats) | |
| df_stats = pd.DataFrame(estatisticas) | |
| # Ordena por correlação absoluta (maiores primeiro) | |
| if coluna_y: | |
| df_stats["_abs_corr"] = df_stats["Correlação (y)"].abs() | |
| df_stats = df_stats.sort_values("_abs_corr", ascending=False) | |
| df_stats = df_stats.drop(columns=["_abs_corr"]) | |
| return df_stats.reset_index(drop=True) | |
| # ============================================================ | |
| # ANÁLISE DE OUTLIERS | |
| # ============================================================ | |
| def calcular_metricas_outliers(df, coluna_y, colunas_x, indices_participantes=None): | |
| """ | |
| Ajusta modelo simples (sem transformações) para calcular métricas de outliers. | |
| Parâmetros: | |
| df: DataFrame com os dados | |
| coluna_y: nome da coluna da variável dependente | |
| colunas_x: lista de nomes das colunas independentes | |
| indices_participantes: lista de índices a incluir (se None, usa todos) | |
| Retorna: | |
| DataFrame com métricas: Índice, Observado, Calculado, Resíduo, | |
| Resíduo Pad., Resíduo Stud., Cook | |
| """ | |
| if not colunas_x: | |
| return None | |
| # Filtra por participantes | |
| df_calc = df.copy() | |
| if indices_participantes is not None: | |
| df_calc = df_calc.loc[indices_participantes] | |
| # Prepara y (sem transformação) | |
| y = df_calc[coluna_y].values | |
| # Prepara X (sem transformação) | |
| X = df_calc[colunas_x].copy() | |
| # Remove linhas com NaN ou Inf | |
| mask = ~(np.isnan(y) | np.isinf(y)) | |
| for col in X.columns: | |
| mask &= ~(np.isnan(X[col]) | np.isinf(X[col])) | |
| y_final = y[mask] | |
| X_final = X.loc[mask.values if hasattr(mask, 'values') else mask] | |
| indices_final = df_calc.index[mask] | |
| if len(y_final) < len(colunas_x) + 2: | |
| return None | |
| try: | |
| # Ajusta modelo simples | |
| X_sm = sm.add_constant(X_final) | |
| modelo = sm.OLS(y_final, X_sm).fit() | |
| # Calcula métricas | |
| residuos = modelo.resid | |
| n = len(y_final) | |
| k = len(colunas_x) | |
| desvio_padrao = np.sqrt(np.sum(residuos**2) / (n - k - 1)) | |
| residuos_pad = residuos / desvio_padrao | |
| influence = OLSInfluence(modelo) | |
| residuos_stud = influence.resid_studentized | |
| cook = influence.cooks_distance[0] | |
| # Monta DataFrame | |
| df_metricas = pd.DataFrame({ | |
| "Índice": indices_final, | |
| "Observado": y_final, | |
| "Calculado": modelo.predict(), | |
| "Resíduo": residuos, | |
| "Resíduo Pad.": residuos_pad, | |
| "Resíduo Stud.": residuos_stud, | |
| "Cook": cook | |
| }) | |
| df_metricas = df_metricas.set_index("Índice") | |
| return df_metricas | |
| except Exception as e: | |
| print(f"Erro ao calcular métricas de outliers: {e}") | |
| return None | |
| def sugerir_outliers(df_metricas, coluna, valor): | |
| """ | |
| Aplica filtro simétrico e retorna lista de índices sugeridos como outliers. | |
| Filtra: coluna <= -valor OU coluna >= +valor | |
| Parâmetros: | |
| df_metricas: DataFrame com métricas de outliers | |
| coluna: nome da coluna para filtro (ex: "Resíduo Pad.") | |
| valor: valor absoluto do limite (ex: 2.0 significa <= -2 OU >= +2) | |
| Retorna: | |
| Lista de índices sugeridos como outliers | |
| """ | |
| if df_metricas is None or df_metricas.empty: | |
| return [] | |
| if coluna not in df_metricas.columns: | |
| return [] | |
| # Filtro simétrico: <= -valor OU >= +valor | |
| mask = (df_metricas[coluna] <= -valor) | (df_metricas[coluna] >= valor) | |
| indices = df_metricas[mask].index.tolist() | |
| return sorted(indices) | |
| # ============================================================ | |
| # TRANSFORMAÇÕES | |
| # ============================================================ | |
| def aplicar_transformacao(data, transformacao): | |
| """ | |
| Aplica uma transformação matemática aos dados. | |
| """ | |
| data = np.asarray(data, dtype=float) | |
| # Evita overflow em exp | |
| if transformacao == "exp(x)" and (data > 50).any(): | |
| return data | |
| if transformacao == "(x)": | |
| return data | |
| elif transformacao == "1/(x)": | |
| return 1 / (data + 0.001) | |
| elif transformacao == "ln(x)": | |
| return np.log(np.maximum(data, 0.001)) | |
| elif transformacao == "exp(x)": | |
| return np.exp(data) | |
| elif transformacao == "(x)^2": | |
| return data ** 2 | |
| elif transformacao == "raiz(x)": | |
| return np.sqrt(np.maximum(data, 0)) | |
| elif transformacao == "1/raiz(x)": | |
| return 1 / (np.sqrt(np.maximum(data, 0)) + 0.001) | |
| else: | |
| return data | |
| def inverter_transformacao_y(data, transformacao): | |
| """ | |
| Inverte a transformação aplicada em y para obter valores na escala original. | |
| """ | |
| data = np.asarray(data, dtype=float) | |
| if transformacao in ["(y)", "(x)", "direct"]: | |
| return data | |
| elif transformacao in ["ln(y)", "ln(x)"]: | |
| return np.exp(data) | |
| elif transformacao in ["1/(y)", "1/(x)"]: | |
| return 1 / (data + 0.001) | |
| elif transformacao in ["exp(y)", "exp(x)"]: | |
| return np.log(np.maximum(data, 0.001)) | |
| elif transformacao in ["(y)^2", "(x)^2"]: | |
| return np.sqrt(np.maximum(data, 0)) | |
| elif transformacao in ["raiz(y)", "raiz(x)"]: | |
| return data ** 2 | |
| elif transformacao in ["1/raiz(y)", "1/raiz(x)"]: | |
| return 1 / (data ** 2 + 0.001) | |
| else: | |
| return data | |
| def formatar_transformacao(transformacao, is_y=False): | |
| """ | |
| Formata a string da transformação para exibição. | |
| """ | |
| if transformacao == "(x)": | |
| return "(y)" if is_y else "(x)" | |
| elif transformacao == "1/(x)": | |
| return "1/(y)" if is_y else "1/(x)" | |
| elif transformacao == "ln(x)": | |
| return "ln(y)" if is_y else "ln(x)" | |
| elif transformacao == "exp(x)": | |
| return "exp(y)" if is_y else "exp(x)" | |
| elif transformacao == "(x)^2": | |
| return "(y)^2" if is_y else "(x)^2" | |
| elif transformacao == "raiz(x)": | |
| return "raiz(y)" if is_y else "raiz(x)" | |
| elif transformacao == "1/raiz(x)": | |
| return "1/raiz(y)" if is_y else "1/raiz(x)" | |
| else: | |
| return transformacao | |
| def detectar_dicotomicas(df, colunas): | |
| """ | |
| Detecta quais colunas são dicotômicas (apenas 0 e 1). | |
| Retorna lista de nomes de colunas dicotômicas. | |
| """ | |
| dicotomicas = [] | |
| for col in colunas: | |
| valores = set(df[col].dropna().unique()) | |
| if valores.issubset({0, 1, 0.0, 1.0}): | |
| dicotomicas.append(col) | |
| return dicotomicas | |
| # ============================================================ | |
| # MODELO OLS E DIAGNÓSTICOS | |
| # ============================================================ | |
| def ajustar_modelo(df, coluna_y, colunas_x, transformacao_y, transformacoes_x, indices_usar=None): | |
| """ | |
| Ajusta modelo OLS com as transformações especificadas. | |
| Parâmetros: | |
| df: DataFrame com os dados | |
| coluna_y: nome da coluna da variável dependente | |
| colunas_x: lista de nomes das colunas independentes | |
| transformacao_y: transformação para y | |
| transformacoes_x: dict {coluna: transformação} para X | |
| indices_usar: lista de índices a incluir (se None, usa todos) | |
| Retorna: | |
| dict com resultados do modelo ou None se falhar | |
| """ | |
| if not colunas_x: | |
| return None | |
| # Filtra por índices se especificado | |
| df_modelo = df.copy() | |
| if indices_usar is not None: | |
| df_modelo = df_modelo.loc[indices_usar] | |
| # Prepara y transformado | |
| y = df_modelo[coluna_y].values | |
| y_transf = aplicar_transformacao(y, transformacao_y) | |
| # Prepara X transformado | |
| X_transf = pd.DataFrame(index=df_modelo.index) | |
| for col in colunas_x: | |
| transf = transformacoes_x.get(col, "(x)") | |
| X_transf[col] = aplicar_transformacao(df_modelo[col].values, transf) | |
| # Remove linhas com NaN ou Inf | |
| mask = ~(np.isnan(y_transf) | np.isinf(y_transf)) | |
| for col in X_transf.columns: | |
| mask &= ~(np.isnan(X_transf[col]) | np.isinf(X_transf[col])) | |
| y_final = y_transf[mask] | |
| X_final = X_transf.loc[mask.values if hasattr(mask, 'values') else mask] | |
| indices_final = df_modelo.index[mask] | |
| if len(y_final) < len(colunas_x) + 2: | |
| return None | |
| try: | |
| # Ajusta modelo | |
| X_sm = sm.add_constant(X_final) | |
| modelo = sm.OLS(y_final, X_sm).fit() | |
| # Calcula diagnósticos | |
| diagnosticos = calcular_diagnosticos(modelo, X_final, y_final, coluna_y, transformacao_y, transformacoes_x) | |
| # Tabela de coeficientes | |
| tabela_coef = pd.DataFrame({ | |
| "Variável": modelo.params.index, | |
| "Coeficiente": modelo.params.values, | |
| "Erro Padrão": modelo.bse.values, | |
| "t-Student": modelo.tvalues.values, | |
| "p-valor": modelo.pvalues.values, | |
| "Significância": [classificar_significancia(p) for p in modelo.pvalues.values] | |
| }) | |
| # Tabela obs vs calc | |
| y_obs = y_final | |
| y_calc = modelo.predict() | |
| residuos = modelo.resid | |
| residuos_pad = residuos / diagnosticos["desvio_padrao_residuos"] | |
| influence = OLSInfluence(modelo) | |
| residuos_stud = influence.resid_studentized | |
| cook = influence.cooks_distance[0] | |
| tabela_obs_calc = pd.DataFrame({ | |
| "Índice": indices_final, | |
| "Observado": y_obs, | |
| "Calculado": y_calc, | |
| "Resíduo": residuos, | |
| "Resíduo Pad.": residuos_pad, | |
| "Resíduo Stud.": residuos_stud, | |
| "Cook": cook | |
| }) | |
| # Dados transformados | |
| X_esc_info = [f"{col}: {formatar_transformacao(transformacoes_x.get(col, '(x)'))}" for col in colunas_x] | |
| y_esc_info = f"{coluna_y}: {formatar_transformacao(transformacao_y, is_y=True)}" | |
| return { | |
| "modelo_sm": modelo, | |
| "diagnosticos": diagnosticos, | |
| "tabela_coef": tabela_coef, | |
| "tabela_obs_calc": tabela_obs_calc, | |
| "X_transformado": X_final, | |
| "y_transformado": pd.Series(y_final, index=indices_final, name=coluna_y), | |
| "transformacoes_x": transformacoes_x.copy(), | |
| "transformacao_y": transformacao_y, | |
| "colunas_x": colunas_x.copy(), | |
| "coluna_y": coluna_y, | |
| "X_esc_info": X_esc_info, | |
| "y_esc_info": y_esc_info, | |
| "indices_usados": indices_final.tolist() | |
| } | |
| except Exception as e: | |
| print(f"Erro ao ajustar modelo: {e}") | |
| return None | |
| def calcular_diagnosticos(modelo, X, y, coluna_y, transformacao_y, transformacoes_x): | |
| """ | |
| Calcula todos os diagnósticos do modelo. | |
| """ | |
| n = len(y) | |
| k = X.shape[1] | |
| residuos = modelo.resid | |
| desvio_padrao_residuos = np.sqrt(np.sum(residuos**2) / (n - k - 1)) | |
| residuos_pad = residuos / desvio_padrao_residuos | |
| # Métricas básicas | |
| mse = np.mean(residuos**2) | |
| r2 = modelo.rsquared | |
| r2_ajustado = modelo.rsquared_adj | |
| r_pearson = np.corrcoef(y, modelo.predict())[0, 1] | |
| Fc = modelo.fvalue | |
| p_valor_F = modelo.f_pvalue | |
| # Interpretação F | |
| if p_valor_F < 0.01: | |
| interp_F = "Fc > Ft: Significante ao nível de 1% (confiança de 99%) - GRAU III." | |
| elif p_valor_F < 0.025: | |
| interp_F = "Fc > Ft: Significante ao nível de 2,5% (confiança de 97,5%) - GRAU II." | |
| elif p_valor_F < 0.05: | |
| interp_F = "Fc > Ft: Significante ao nível de 5% (confiança de 95%) - GRAU I." | |
| else: | |
| interp_F = "Fc < Ft: NÃO é estatisticamente significante ao nível de 5%." | |
| # Teste KS (normalidade) | |
| ks_stat, ks_p = kstest(residuos, "norm", args=(np.mean(residuos), np.std(residuos, ddof=1))) | |
| interp_KS = "☑️Resíduos são normalmente distribuídos." if ks_p >= 0.05 else "❌Resíduos NÃO são normalmente distribuídos." | |
| # Percentuais da curva normal (formato compatível com .dai) | |
| intervalos = [(-1.00, 1.00), (-1.64, 1.64), (-1.96, 1.96)] | |
| percentuais = [] | |
| for min_int, max_int in intervalos: | |
| count = np.sum((residuos_pad >= min_int) & (residuos_pad <= max_int)) | |
| perc = round(count / len(residuos) * 100, 0) | |
| percentuais.append(f"{perc:.0f}%") | |
| perc_resid = ", ".join(percentuais) | |
| # Durbin-Watson | |
| dw = durbin_watson(residuos) | |
| k_dw = min(max(1, k), 4) | |
| tab = DW_TABLE[0.05][k_dw] | |
| dL = np.interp(n, tab["n"], tab["dL"]) | |
| dU = np.interp(n, tab["n"], tab["dU"]) | |
| if dw < dL: | |
| interp_DW = "❌Autocorrelação positiva." | |
| elif dw > 4 - dL: | |
| interp_DW = "❌Autocorrelação negativa." | |
| elif dU < dw < 4 - dU: | |
| interp_DW = "☑️Sem autocorrelação." | |
| else: | |
| interp_DW = "⚠️Região inconclusiva." | |
| # Breusch-Pagan | |
| bp_lm, bp_p, bp_f, bp_fp = het_breuschpagan(residuos, sm.add_constant(X)) | |
| interp_BP = "☑️Não há evidência de heterocedasticidade." if bp_p >= 0.05 else "❌Heterocedasticidade detectada." | |
| # Equação do modelo | |
| equacao = formatar_equacao(modelo, coluna_y, transformacao_y, transformacoes_x, list(X.columns)) | |
| # Gera formatted_output para compatibilidade com .dai | |
| formatted_output = f"""Número de observações: {n} | |
| Número de variáveis independentes: {k} | |
| Desvio padrão dos resíduos: {desvio_padrao_residuos:.4f} | |
| MSE: {mse:.4f} | |
| R²: {r2:.4f} | |
| R² ajustado: {r2_ajustado:.4f} | |
| Correlação Pearson: {r_pearson:.4f} | |
| Estatística F: {Fc:.4f} (p-valor: {p_valor_F:.4f}) -> {interp_F} | |
| ---------------------------------------------------------------------- | |
| Teste de Normalidade (Kolmogorov-Smirnov): | |
| Estatística KS: {ks_stat:.4f}, p-valor: {ks_p:.4f} -> {interp_KS} | |
| Teste de Normalidade (Comparação com a curva normal) – percentuais atingidos: {perc_resid} | |
| • Ideal 68% → aceitável entre 64% e 75% | |
| • Ideal 90% → aceitável entre 88% e 95% | |
| • Ideal 95% → aceitável entre 95% e 100% | |
| ---------------------------------------------------------------------- | |
| Teste de Autocorrelação (Durbin-Watson): | |
| DW: {dw:.4f} -> {interp_DW} | |
| ---------------------------------------------------------------------- | |
| Teste de Homocedasticidade (Breusch-Pagan): | |
| Estatística LM: {bp_lm:.4f}, p-valor: {bp_p:.4f} | |
| {interp_BP} | |
| ---------------------------------------------------------------------- | |
| Equação do Modelo: | |
| {equacao}""" | |
| return { | |
| "n": n, | |
| "k": k, | |
| "desvio_padrao_residuos": desvio_padrao_residuos, | |
| "mse": mse, | |
| "r2": r2, | |
| "r2_ajustado": r2_ajustado, | |
| "r_pearson": r_pearson, | |
| "Fc": Fc, | |
| "p_valor_F": p_valor_F, | |
| "interp_F": interp_F, | |
| "ks_stat": ks_stat, | |
| "ks_p": ks_p, | |
| "interp_KS": interp_KS, | |
| "perc_resid": perc_resid, | |
| "dw": dw, | |
| "interp_DW": interp_DW, | |
| "bp_lm": bp_lm, | |
| "bp_p": bp_p, | |
| "interp_BP": interp_BP, | |
| "equacao": equacao, | |
| "formatted_output": formatted_output | |
| } | |
| def classificar_significancia(p_valor): | |
| """Classifica significância segundo NBR 14.653-2.""" | |
| if p_valor <= 0.10: | |
| return "Grau III" | |
| elif p_valor <= 0.20: | |
| return "Grau II" | |
| elif p_valor <= 0.30: | |
| return "Grau I" | |
| else: | |
| return "Sem enquadramento" | |
| def formatar_equacao(modelo, coluna_y, transformacao_y, transformacoes_x, colunas_x): | |
| """Formata a equação do modelo.""" | |
| equacao = f"{modelo.params['const']:.6f}" | |
| for col in colunas_x: | |
| coef = modelo.params[col] | |
| transf = transformacoes_x.get(col, "(x)") | |
| if transf == "ln(x)": | |
| termo = f"ln({col})" | |
| elif transf == "1/(x)": | |
| termo = f"1/{col}" | |
| elif transf == "(x)^2": | |
| termo = f"({col})²" | |
| elif transf == "raiz(x)": | |
| termo = f"√({col})" | |
| elif transf == "1/raiz(x)": | |
| termo = f"1/√({col})" | |
| elif transf == "exp(x)": | |
| termo = f"exp({col})" | |
| else: | |
| termo = col | |
| sinal = "+" if coef >= 0 else "-" | |
| equacao += f" {sinal} {abs(coef):.6f} × {termo}" | |
| # Aplica inversão de y | |
| if transformacao_y == "ln(x)": | |
| equacao = f"exp({equacao})" | |
| elif transformacao_y == "(x)^2": | |
| equacao = f"√({equacao})" | |
| elif transformacao_y == "1/(x)": | |
| equacao = f"1/({equacao})" | |
| elif transformacao_y == "raiz(x)": | |
| equacao = f"({equacao})²" | |
| elif transformacao_y == "1/raiz(x)": | |
| equacao = f"1/({equacao})²" | |
| return f"{coluna_y} = {equacao}" | |
| # ============================================================ | |
| # BUSCA AUTOMÁTICA DE TRANSFORMAÇÕES (OTIMIZADA) | |
| # ============================================================ | |
| def _calcular_r2_numpy(X, y): | |
| """ | |
| Calcula R² usando operações matriciais diretas (mais rápido que statsmodels). | |
| """ | |
| n = len(y) | |
| if n == 0: | |
| return -np.inf | |
| # Adiciona constante | |
| X_const = np.column_stack([np.ones(n), X]) | |
| try: | |
| # OLS: beta = (X'X)^-1 X'y | |
| XtX = X_const.T @ X_const | |
| Xty = X_const.T @ y | |
| beta = np.linalg.solve(XtX, Xty) | |
| # R² | |
| y_pred = X_const @ beta | |
| ss_res = np.sum((y - y_pred) ** 2) | |
| ss_tot = np.sum((y - np.mean(y)) ** 2) | |
| if ss_tot == 0: | |
| return -np.inf | |
| return 1 - (ss_res / ss_tot) | |
| except: | |
| return -np.inf | |
| def _precomputar_transformacoes(df, colunas): | |
| """ | |
| Pré-computa todas as transformações válidas para as colunas. | |
| Retorna dict: {(coluna, transformacao): array} | |
| """ | |
| cache = {} | |
| for col in colunas: | |
| valores = df[col].values.astype(float) | |
| for transf in TRANSFORMACOES: | |
| dados_transf = aplicar_transformacao(valores, transf) | |
| # Só armazena se válido | |
| if not (np.isnan(dados_transf).any() or np.isinf(dados_transf).any()): | |
| cache[(col, transf)] = dados_transf | |
| return cache | |
| def _buscar_stepwise(colunas_x, colunas_livres, transformacoes_fixas, y_transf, x_cache, df_busca): | |
| """ | |
| Busca stepwise: otimiza uma variável por vez. | |
| Mais rápido para muitas variáveis (O(7*n) em vez de O(7^n)). | |
| """ | |
| # Inicializa com transformação (x) para todas | |
| transf_x_atuais = {col: "(x)" for col in colunas_livres} | |
| # Para cada variável X, encontrar melhor transformação | |
| for col in colunas_livres: | |
| melhor_transf = "(x)" | |
| melhor_r2 = -np.inf | |
| for transf in TRANSFORMACOES: | |
| if (col, transf) not in x_cache: | |
| continue | |
| # Testa com esta transformação | |
| transf_teste = transf_x_atuais.copy() | |
| transf_teste[col] = transf | |
| # Monta matriz X | |
| X_arrays = [] | |
| valido = True | |
| for c in colunas_x: | |
| if c in transformacoes_fixas: | |
| t = transformacoes_fixas[c] | |
| else: | |
| t = transf_teste.get(c, "(x)") | |
| if (c, t) in x_cache: | |
| X_arrays.append(x_cache[(c, t)]) | |
| else: | |
| valido = False | |
| break | |
| if not valido or not X_arrays: | |
| continue | |
| X = np.column_stack(X_arrays) | |
| r2 = _calcular_r2_numpy(X, y_transf) | |
| if r2 > melhor_r2: | |
| melhor_r2 = r2 | |
| melhor_transf = transf | |
| transf_x_atuais[col] = melhor_transf | |
| # Calcula R² final | |
| transf_x_final = {**transformacoes_fixas, **transf_x_atuais} | |
| X_arrays = [] | |
| for c in colunas_x: | |
| t = transf_x_final.get(c, "(x)") | |
| if (c, t) in x_cache: | |
| X_arrays.append(x_cache[(c, t)]) | |
| if X_arrays: | |
| X = np.column_stack(X_arrays) | |
| r2_final = _calcular_r2_numpy(X, y_transf) | |
| else: | |
| r2_final = -np.inf | |
| return transf_x_final, r2_final | |
| def _buscar_exaustivo_numpy(colunas_x, colunas_livres, transformacoes_fixas, y_transf, x_cache, top_n): | |
| """ | |
| Busca exaustiva usando numpy para cálculo rápido de R². | |
| Para uso quando o número de combinações é pequeno (<= 5000). | |
| """ | |
| n_livres = len(colunas_livres) | |
| resultados_transf = [] | |
| for combo in product(TRANSFORMACOES, repeat=n_livres): | |
| # Monta dict de transformações | |
| transf_x = transformacoes_fixas.copy() | |
| for i, col in enumerate(colunas_livres): | |
| transf_x[col] = combo[i] | |
| # Monta matriz X | |
| X_arrays = [] | |
| valido = True | |
| for col in colunas_x: | |
| t = transf_x.get(col, "(x)") | |
| if (col, t) in x_cache: | |
| X_arrays.append(x_cache[(col, t)]) | |
| else: | |
| valido = False | |
| break | |
| if not valido or not X_arrays: | |
| continue | |
| X = np.column_stack(X_arrays) | |
| r2 = _calcular_r2_numpy(X, y_transf) | |
| if r2 > -np.inf: | |
| resultados_transf.append((r2, transf_x.copy())) | |
| # Ordena e retorna top | |
| resultados_transf.sort(key=lambda x: x[0], reverse=True) | |
| return resultados_transf[:top_n] | |
| def buscar_melhores_transformacoes(df, coluna_y, colunas_x, transformacoes_fixas=None, transformacao_y_fixa=None, | |
| indices_usar=None, top_n=5): | |
| """ | |
| Busca otimizada das melhores combinações de transformações para maximizar R². | |
| Usa estratégia híbrida: | |
| - Para poucos combos (<=5000): busca exaustiva com numpy (rápida) | |
| - Para muitos combos (>5000): busca stepwise (O(7*n) em vez de O(7^n)) | |
| Parâmetros: | |
| df: DataFrame | |
| coluna_y: variável dependente | |
| colunas_x: lista de variáveis independentes | |
| transformacoes_fixas: dict {coluna: transformação} para variáveis com transformação fixa | |
| transformacao_y_fixa: transformação fixa para y (ou None para testar todas) | |
| indices_usar: lista de índices a incluir (se None, usa todos) | |
| top_n: número de melhores modelos a retornar | |
| Retorna: | |
| Lista de dicts com informações dos top N modelos | |
| """ | |
| if transformacoes_fixas is None: | |
| transformacoes_fixas = {} | |
| # Filtra por índices se especificado | |
| df_busca = df.copy() | |
| if indices_usar is not None: | |
| df_busca = df_busca.loc[indices_usar] | |
| # Detecta dicotômicas e fixa em (x) | |
| dicotomicas = detectar_dicotomicas(df_busca, colunas_x) | |
| for col in dicotomicas: | |
| if col not in transformacoes_fixas: | |
| transformacoes_fixas[col] = "(x)" | |
| # Identifica colunas livres (não fixas) | |
| colunas_livres = [col for col in colunas_x if col not in transformacoes_fixas] | |
| n_livres = len(colunas_livres) | |
| # Define transformações de Y a testar | |
| if transformacao_y_fixa: | |
| transformacoes_y = [transformacao_y_fixa] | |
| else: | |
| transformacoes_y = TRANSFORMACOES | |
| # Pré-computa todas as transformações de X (apenas colunas livres) | |
| x_cache = _precomputar_transformacoes(df_busca, colunas_livres) | |
| # Adiciona transformações fixas ao cache | |
| for col, transf in transformacoes_fixas.items(): | |
| if col in colunas_x: | |
| dados = aplicar_transformacao(df_busca[col].values.astype(float), transf) | |
| if not (np.isnan(dados).any() or np.isinf(dados).any()): | |
| x_cache[(col, transf)] = dados | |
| # Calcula número total de combinações | |
| total_combos = len(TRANSFORMACOES) ** n_livres if n_livres > 0 else 1 | |
| usar_stepwise = total_combos > 5000 | |
| resultados = [] | |
| for transf_y in transformacoes_y: | |
| # Pré-computa Y transformado | |
| y_transf = aplicar_transformacao(df_busca[coluna_y].values.astype(float), transf_y) | |
| # Se y_transf tem problemas, pula | |
| if np.isnan(y_transf).any() or np.isinf(y_transf).any(): | |
| continue | |
| if n_livres == 0: | |
| # Todas as variáveis são fixas, só calcula R² | |
| X_arrays = [x_cache[(col, transformacoes_fixas[col])] | |
| for col in colunas_x if (col, transformacoes_fixas.get(col, "(x)")) in x_cache] | |
| if X_arrays: | |
| X = np.column_stack(X_arrays) | |
| r2 = _calcular_r2_numpy(X, y_transf) | |
| if r2 > -np.inf: | |
| resultados.append({ | |
| "r2": r2, | |
| "transformacao_y": transf_y, | |
| "transformacoes_x": transformacoes_fixas.copy() | |
| }) | |
| elif usar_stepwise: | |
| # Busca stepwise para muitas variáveis | |
| transf_x, r2 = _buscar_stepwise( | |
| colunas_x, colunas_livres, transformacoes_fixas, | |
| y_transf, x_cache, df_busca | |
| ) | |
| if r2 > -np.inf: | |
| resultados.append({ | |
| "r2": r2, | |
| "transformacao_y": transf_y, | |
| "transformacoes_x": transf_x | |
| }) | |
| else: | |
| # Busca exaustiva para poucas variáveis | |
| top_combos = _buscar_exaustivo_numpy( | |
| colunas_x, colunas_livres, transformacoes_fixas, | |
| y_transf, x_cache, top_n | |
| ) | |
| for r2, transf_x in top_combos: | |
| resultados.append({ | |
| "r2": r2, | |
| "transformacao_y": transf_y, | |
| "transformacoes_x": transf_x | |
| }) | |
| # Ordena por R² e retorna top N | |
| resultados = sorted(resultados, key=lambda x: x["r2"], reverse=True)[:top_n] | |
| # Adiciona ranking | |
| for i, r in enumerate(resultados): | |
| r["rank"] = i + 1 | |
| return resultados | |
| # ============================================================ | |
| # MICRONUMEROSIDADE (NBR 14.653-2) | |
| # ============================================================ | |
| def testar_micronumerosidade(df, colunas_x): | |
| """ | |
| Testa critérios de micronumerosidade da NBR 14.653-2. | |
| """ | |
| # Filtra apenas colunas inteiras (dicotômicas/categóricas) | |
| colunas_int = [col for col in colunas_x if df[col].dtype in ['int64', 'int32']] | |
| n = len(df) | |
| k = len(colunas_x) | |
| # Condição geral: n >= 3*(k+1) | |
| condicao_geral = n >= 3 * (k + 1) | |
| resultados_por_coluna = {} | |
| for coluna in colunas_int: | |
| frequencia = df[coluna].value_counts() | |
| ni_valido = True | |
| mensagens = [] | |
| for categoria, ni in frequencia.items(): | |
| if n <= 30: | |
| if ni < 3: | |
| ni_valido = False | |
| mensagens.append(f"ni={ni} < 3 na categoria {categoria}") | |
| elif 30 < n <= 100: | |
| if ni < 0.1 * n: | |
| ni_valido = False | |
| mensagens.append(f"ni={ni} < 10% de n na categoria {categoria}") | |
| elif n > 100: | |
| if ni <= 10: | |
| ni_valido = False | |
| mensagens.append(f"ni={ni} <= 10 na categoria {categoria}") | |
| resultados_por_coluna[coluna] = { | |
| "valido": ni_valido, | |
| "mensagens": mensagens if mensagens else ["OK"] | |
| } | |
| return { | |
| "n": n, | |
| "k": k, | |
| "condicao_geral_ok": condicao_geral, | |
| "por_coluna": resultados_por_coluna | |
| } | |
| # ============================================================ | |
| # EXPORTAR BASE TRATADA (CSV) | |
| # ============================================================ | |
| def exportar_base_csv(df): | |
| """ | |
| Exporta DataFrame para arquivo CSV temporário. | |
| Parâmetros: | |
| df: DataFrame a exportar | |
| Retorna: | |
| str: Caminho do arquivo CSV gerado | |
| """ | |
| import tempfile | |
| import os | |
| if df is None or df.empty: | |
| return None | |
| # Cria arquivo temporário | |
| fd, caminho = tempfile.mkstemp(suffix=".csv", prefix="base_tratada_") | |
| os.close(fd) | |
| # Salva CSV | |
| df.to_csv(caminho, index=True, encoding="utf-8-sig", sep=";", decimal=",") | |
| return caminho | |
| # ============================================================ | |
| # EXPORTAR MODELO (.dai) | |
| # ============================================================ | |
| def exportar_modelo_dai(resultado_modelo, df_original, estatisticas, nome_arquivo, graficos=None): | |
| """ | |
| Exporta o modelo em formato .dai compatível com app MODELOS_DAI. | |
| Parâmetros: | |
| resultado_modelo: dict com resultado do ajuste | |
| df_original: DataFrame original | |
| estatisticas: DataFrame com estatísticas (mantido como DataFrame) | |
| nome_arquivo: nome do arquivo de saída | |
| graficos: dict opcional com gráficos Plotly (chaves: obs_calc, residuos, etc.) | |
| Retorna: | |
| tuple: (caminho_arquivo, mensagem) | |
| """ | |
| import tempfile | |
| if resultado_modelo is None: | |
| return None, "Nenhum modelo para exportar" | |
| try: | |
| # Função auxiliar para converter StringDtype para object e limpar DataFrame | |
| def limpar_df_para_export(df): | |
| """Converte StringDtype para object e remove colunas Unnamed.""" | |
| df_clean = df.copy() | |
| # Remove colunas 'Unnamed: X' | |
| cols_unnamed = [c for c in df_clean.columns if str(c).startswith('Unnamed')] | |
| if cols_unnamed: | |
| df_clean = df_clean.drop(columns=cols_unnamed) | |
| # Converte StringDtype para object (compatibilidade entre versões do pandas) | |
| for col in df_clean.columns: | |
| if pd.api.types.is_string_dtype(df_clean[col]): | |
| df_clean[col] = df_clean[col].astype(object) | |
| # Converte index se for string dtype (compatibilidade com .dai) | |
| if pd.api.types.is_string_dtype(df_clean.index): | |
| df_clean.index = df_clean.index.astype(object) | |
| return df_clean | |
| # Monta pacote | |
| lat_col, lon_col = identificar_colunas_coords(df_original) | |
| # DataFrame com coords para o mapa | |
| df_coords = limpar_df_para_export(df_original) | |
| if lat_col: | |
| df_coords = df_coords.rename(columns={lat_col: "lat"}) | |
| if lon_col: | |
| df_coords = df_coords.rename(columns={lon_col: "lon"}) | |
| # Filtra apenas índices usados no modelo | |
| indices_modelo = resultado_modelo["indices_usados"] | |
| df_modelo = df_coords.loc[indices_modelo] if indices_modelo else df_coords | |
| df_modelo.index.name = None # Remove nome do índice | |
| # Gera gráfico HTML para graf_model (se graficos fornecidos) | |
| graf_model_html = "" | |
| if graficos and graficos.get("obs_calc"): | |
| try: | |
| graf_model_html = pio.to_html(graficos["obs_calc"], full_html=True, include_plotlyjs='cdn') | |
| except: | |
| graf_model_html = "" | |
| # Prepara tabela_coef no formato compatível (índice = Variável, sem coluna "Variável") | |
| tabela_coef_export = limpar_df_para_export(resultado_modelo["tabela_coef"]) | |
| if "Variável" in tabela_coef_export.columns: | |
| tabela_coef_export = tabela_coef_export.set_index("Variável") | |
| tabela_coef_export.index.name = None # Remove nome do índice | |
| # Converte index para object após set_index (compatibilidade .dai) | |
| if pd.api.types.is_string_dtype(tabela_coef_export.index): | |
| tabela_coef_export.index = tabela_coef_export.index.astype(object) | |
| # Renomeia p-valor para p-value | |
| tabela_coef_export = tabela_coef_export.rename(columns={"p-valor": "p-value"}) | |
| # Prepara tabela_obs_calc no formato compatível | |
| tabela_obs_calc_export = limpar_df_para_export(resultado_modelo["tabela_obs_calc"]) | |
| # Move Índice para o index do DataFrame | |
| if "Índice" in tabela_obs_calc_export.columns: | |
| tabela_obs_calc_export = tabela_obs_calc_export.set_index("Índice") | |
| tabela_obs_calc_export.index.name = None # Remove nome do índice | |
| # Renomeia colunas para formato compatível | |
| tabela_obs_calc_export = tabela_obs_calc_export.rename(columns={ | |
| "Resíduo Pad.": "Resíduo Padronizado", | |
| "Resíduo Stud.": "Resíduo Studentizado", | |
| "Cook": "Distância de Cook" | |
| }) | |
| # Prepara modelos_resumos (diagnosticos) no formato compatível | |
| diag = resultado_modelo["diagnosticos"].copy() | |
| modelos_resumos = { | |
| "n": diag.get("n"), | |
| "k": diag.get("k"), | |
| "desvio_padrao_residuos": diag.get("desvio_padrao_residuos"), | |
| "mse": diag.get("mse"), | |
| "r2": diag.get("r2"), | |
| "r2_ajustado": diag.get("r2_ajustado"), | |
| "r_pearson": diag.get("r_pearson"), | |
| "Fc": diag.get("Fc"), | |
| "p_valor_F": diag.get("p_valor_F"), | |
| "Interpretacao_F": diag.get("interp_F"), | |
| "ks_stat": diag.get("ks_stat"), | |
| "ks_p": diag.get("ks_p"), | |
| "Interpretacao_KS": diag.get("interp_KS"), | |
| "perc_resid": diag.get("perc_resid"), | |
| "dw": diag.get("dw"), | |
| "Interpretacao_DW": diag.get("interp_DW"), | |
| "bp_lm": diag.get("bp_lm"), | |
| "bp_p": diag.get("bp_p"), | |
| "Interpretacao_BP": diag.get("interp_BP"), | |
| "equacao": diag.get("equacao"), | |
| "formatted_output": diag.get("formatted_output", "") | |
| } | |
| # Prepara estatisticas no formato compatível (índice = Variável) | |
| estatisticas_df = estatisticas if isinstance(estatisticas, pd.DataFrame) else pd.DataFrame(estatisticas) | |
| estatisticas_export = limpar_df_para_export(estatisticas_df) | |
| if "Variável" in estatisticas_export.columns: | |
| estatisticas_export = estatisticas_export.set_index("Variável") | |
| estatisticas_export.index.name = None # Remove nome do índice | |
| # Converte index para object após set_index (compatibilidade .dai) | |
| if pd.api.types.is_string_dtype(estatisticas_export.index): | |
| estatisticas_export.index = estatisticas_export.index.astype(object) | |
| # Adiciona coluna Tipo (dtype) se não existir | |
| if "Tipo" not in estatisticas_export.columns: | |
| # Obtém tipos das colunas do DataFrame original | |
| tipos = {} | |
| for var in estatisticas_export.index: | |
| if var in df_original.columns: | |
| tipos[var] = str(df_original[var].dtype) | |
| else: | |
| tipos[var] = "unknown" | |
| estatisticas_export.insert(0, "Tipo", pd.Series(tipos)) | |
| # Mantém apenas colunas compatíveis (na ordem correta) | |
| colunas_manter = ["Tipo", "Contagem", "Média", "Mediana", "Mínimo", "Máximo", "Desvio Padrão"] | |
| colunas_existentes = [c for c in colunas_manter if c in estatisticas_export.columns] | |
| estatisticas_export = estatisticas_export[colunas_existentes] | |
| # Converte dtypes para compatibilidade com formato .dai | |
| if "Contagem" in estatisticas_export.columns: | |
| estatisticas_export["Contagem"] = estatisticas_export["Contagem"].astype(str).astype(object) | |
| if "Tipo" in estatisticas_export.columns: | |
| estatisticas_export["Tipo"] = estatisticas_export["Tipo"].astype(object) | |
| # Limpa top_X_esc e top_y_esc | |
| top_X_esc_export = limpar_df_para_export(resultado_modelo["X_transformado"]) | |
| top_X_esc_export.index.name = None # Remove nome do índice | |
| top_y_esc_export = resultado_modelo["y_transformado"].copy() | |
| top_y_esc_export.index.name = None # Remove nome do índice | |
| pacote = { | |
| "Xy_preview_out_coords": df_modelo, | |
| "estatisticas": estatisticas_export, | |
| "formatted_top_transformation_info": [resultado_modelo["y_esc_info"]] + resultado_modelo["X_esc_info"], | |
| "top_X_esc": top_X_esc_export, | |
| "top_y_esc": top_y_esc_export, | |
| "modelos_resumos": modelos_resumos, | |
| "tabelas_coef": tabela_coef_export, | |
| "tabelas_obs_calc": tabela_obs_calc_export, | |
| "graf_model": graf_model_html, | |
| "modelos_sm": resultado_modelo["modelo_sm"] | |
| } | |
| # Função para converter recursivamente StringDtype para object (compatibilidade HF) | |
| def converter_para_serializacao(obj): | |
| """Converte recursivamente todos os StringDtype para object dtype.""" | |
| if isinstance(obj, pd.DataFrame): | |
| df = obj.copy() | |
| if pd.api.types.is_string_dtype(df.index): | |
| df.index = df.index.astype(object) | |
| for col in df.columns: | |
| if pd.api.types.is_string_dtype(df[col]): | |
| df[col] = df[col].astype(object) | |
| return df | |
| elif isinstance(obj, pd.Series): | |
| s = obj.copy() | |
| if pd.api.types.is_string_dtype(s): | |
| s = s.astype(object) | |
| if pd.api.types.is_string_dtype(s.index): | |
| s.index = s.index.astype(object) | |
| return s | |
| elif isinstance(obj, dict): | |
| return {k: converter_para_serializacao(v) for k, v in obj.items()} | |
| elif isinstance(obj, list): | |
| return [converter_para_serializacao(v) for v in obj] | |
| else: | |
| return obj | |
| # Converte todo o pacote para garantir compatibilidade entre versões do pandas | |
| pacote = converter_para_serializacao(pacote) | |
| # Cria arquivo temporário para download (compatível com Hugging Face) | |
| nome_base = nome_arquivo.replace(".dai", "") if nome_arquivo.endswith(".dai") else nome_arquivo | |
| fd, caminho = tempfile.mkstemp(suffix=".dai", prefix=f"{nome_base}_") | |
| os.close(fd) | |
| dump(pacote, caminho) | |
| return caminho, f"Modelo pronto para download" | |
| except Exception as e: | |
| return None, f"Erro ao exportar: {str(e)}" | |