# -*- coding: utf-8 -*- """ core.py - Lógica de negócio para elaboração de modelos estatísticos Contém: carregamento de dados, estatísticas, transformações, modelo OLS, diagnósticos """ import os import pandas as pd # Desabilita StringDtype para compatibilidade entre versões do pandas pd.set_option('future.infer_string', False) import numpy as np import statsmodels.api as sm from statsmodels.stats.diagnostic import het_breuschpagan from statsmodels.stats.stattools import durbin_watson from statsmodels.stats.outliers_influence import OLSInfluence from scipy.stats import kstest from itertools import product from joblib import dump, load import io import plotly.io as pio # ============================================================ # CONSTANTES # ============================================================ TRANSFORMACOES = ["(x)", "1/(x)", "ln(x)", "exp(x)", "(x)^2", "raiz(x)", "1/raiz(x)"] # Nomes comuns para identificar colunas especiais NOMES_VUNIT = {"vunit", "vu", "valor_unitario", "vuloc", "vupriv", "vuaconst", "vuapriv", "valor unitário"} NOMES_LAT = {"lat", "latitude", "y", "siat_latitude"} NOMES_LON = {"lon", "longitude", "long", "x", "siat_longitude"} # Tabela Durbin-Watson DW_TABLE = { 0.05: { 1: {"n": [25, 50, 100, 200], "dL": [1.10, 1.32, 1.49, 1.57], "dU": [1.54, 1.62, 1.70, 1.75]}, 2: {"n": [25, 50, 100, 200], "dL": [1.06, 1.30, 1.47, 1.55], "dU": [1.50, 1.60, 1.69, 1.74]}, 3: {"n": [25, 50, 100, 200], "dL": [1.02, 1.28, 1.45, 1.54], "dU": [1.47, 1.58, 1.68, 1.73]}, 4: {"n": [25, 50, 100, 200], "dL": [0.98, 1.26, 1.43, 1.52], "dU": [1.44, 1.56, 1.66, 1.72]}, } } # ============================================================ # CARREGAMENTO DE DADOS # ============================================================ def detectar_abas_excel(arquivo): """ Detecta as abas (sheets) de um arquivo Excel. Args: arquivo: Pode ser um objeto de arquivo ou um caminho (string) Retorna: tuple: (lista_abas, mensagem, sucesso) """ if arquivo is None: return [], "Nenhum arquivo enviado.", False try: # Obtém o caminho/nome do arquivo caminho = arquivo.name if hasattr(arquivo, 'name') else str(arquivo) if caminho.endswith(('.xlsx', '.xls')): # Usa o caminho para abrir o arquivo (funciona com path ou file object) excel_file = pd.ExcelFile(caminho) abas = excel_file.sheet_names return abas, f"Arquivo com {len(abas)} aba(s) detectada(s)", True else: # CSV não tem abas return [], "Arquivo CSV (sem abas)", True except Exception as e: return [], f"Erro ao detectar abas: {str(e)}", False def carregar_arquivo(arquivo, nome_aba=None): """ Carrega arquivo Excel ou CSV e retorna DataFrame. Detecta automaticamente o separador do CSV. Args: arquivo: Arquivo a ser carregado (objeto de arquivo ou caminho string) nome_aba: Nome da aba a carregar (apenas para Excel). Se None, carrega a primeira. Retorna: tuple: (df, mensagem, sucesso) """ if arquivo is None: return None, "Nenhum arquivo enviado.", False try: # Obtém o caminho do arquivo caminho = arquivo.name if hasattr(arquivo, 'name') else str(arquivo) if caminho.endswith(('.xlsx', '.xls')): # Carrega aba específica ou a primeira (usa caminho para poder reabrir) df = pd.read_excel(caminho, sheet_name=nome_aba if nome_aba else 0) aba_info = f" (aba: {nome_aba})" if nome_aba else "" elif caminho.endswith('.csv'): # Tenta detectar separador with open(caminho, 'rb') as f: content = f.read() # Tenta diferentes separadores for sep in [',', ';', '\t']: try: df = pd.read_csv(io.BytesIO(content), sep=sep) if len(df.columns) > 1: break except: continue else: df = pd.read_csv(io.BytesIO(content)) aba_info = "" else: return None, "Formato de arquivo não suportado.", False # Reinicia índice começando em 1 df = df.reset_index(drop=True) df.index = df.index + 1 return df, f"Arquivo carregado: {os.path.basename(caminho)}{aba_info} ({len(df)} linhas, {len(df.columns)} colunas)", True except Exception as e: return None, f"Erro ao carregar arquivo: {str(e)}", False def carregar_dai(caminho): """ Carrega arquivo .dai e extrai DataFrame, variáveis e transformações. Retorna: tuple: (df, coluna_y, colunas_x, transformacao_y, transformacoes_x, mensagem, sucesso) """ try: pacote = load(caminho) # Extrai DataFrame df = pacote["Xy_preview_out_coords"].copy() df = df.reset_index(drop=True) df.index = df.index + 1 # Parseia transformações info_transf = pacote["formatted_top_transformation_info"] # Primeiro elemento: Y (ex: "VUNIT: ln(y)") nome_y, transf_y = info_transf[0].split(": ", 1) coluna_y = nome_y.strip() transformacao_y = transf_y.strip().replace("(y)", "(x)") # Demais: X (ex: "Area: ln(x)") colunas_x = [] transformacoes_x = {} for item in info_transf[1:]: nome_x, transf_x = item.split(": ", 1) nome_x = nome_x.strip() colunas_x.append(nome_x) transformacoes_x[nome_x] = transf_x.strip() msg = f"Modelo .dai carregado: {os.path.basename(caminho)} ({len(df)} dados, Y={coluna_y}, {len(colunas_x)} variáveis X)" return df, coluna_y, colunas_x, transformacao_y, transformacoes_x, msg, True except Exception as e: return None, None, None, None, None, f"Erro ao carregar .dai: {str(e)}", False def identificar_coluna_y_padrao(df): """ Identifica a coluna padrão para variável dependente (y). Prioriza VUNIT se existir. """ colunas_lower = {col.lower(): col for col in df.columns} for nome in NOMES_VUNIT: if nome in colunas_lower: return colunas_lower[nome] # Se não encontrar, retorna primeira coluna numérica numericas = df.select_dtypes(include=[np.number]).columns.tolist() return numericas[0] if numericas else None def identificar_colunas_coords(df): """ Identifica colunas de latitude e longitude. Retorna tuple (lat_col, lon_col) ou (None, None). """ colunas_lower = {col.lower(): col for col in df.columns} lat_col = None lon_col = None for nome in NOMES_LAT: if nome in colunas_lower: lat_col = colunas_lower[nome] break for nome in NOMES_LON: if nome in colunas_lower: lon_col = colunas_lower[nome] break return lat_col, lon_col def obter_colunas_numericas(df, excluir_coords=True): """ Retorna lista de colunas numéricas. Opcionalmente exclui lat/lon. """ numericas = df.select_dtypes(include=[np.number]).columns.tolist() if excluir_coords: lat_col, lon_col = identificar_colunas_coords(df) if lat_col and lat_col in numericas: numericas.remove(lat_col) if lon_col and lon_col in numericas: numericas.remove(lon_col) return numericas # ============================================================ # ESTATÍSTICAS # ============================================================ def calcular_estatisticas_variaveis(df, coluna_y=None, indices_usar=None, colunas=None): """ Calcula estatísticas para cada coluna numérica. Se coluna_y for especificada, inclui correlação com y. Se colunas for especificada, calcula apenas para essas colunas. Parâmetros: df: DataFrame com os dados coluna_y: nome da coluna da variável dependente (opcional) indices_usar: lista de índices a incluir (se None, usa todos) colunas: lista de colunas a calcular (se None, usa todas numéricas) Retorna DataFrame com estatísticas. """ # Filtra por índices se especificado df_calc = df.copy() if indices_usar is not None: df_calc = df_calc.loc[indices_usar] # Usa colunas especificadas ou todas numéricas if colunas is not None: numericas = [c for c in colunas if c in df_calc.columns and pd.api.types.is_numeric_dtype(df_calc[c])] else: numericas = obter_colunas_numericas(df_calc) estatisticas = [] for col in numericas: serie = df_calc[col].dropna() stats = { "Variável": col, "Contagem": len(serie), "Média": serie.mean(), "Mediana": serie.median(), "Mínimo": serie.min(), "Máximo": serie.max(), "Desvio Padrão": serie.std(), } # Coeficiente de variação if stats["Média"] != 0: stats["CV (%)"] = abs(stats["Desvio Padrão"] / stats["Média"]) * 100 else: stats["CV (%)"] = np.nan # Correlação com y if coluna_y and col != coluna_y: y_serie = df_calc[coluna_y].dropna() comum = serie.index.intersection(y_serie.index) if len(comum) > 2: stats["Correlação (y)"] = np.corrcoef(serie.loc[comum], y_serie.loc[comum])[0, 1] else: stats["Correlação (y)"] = np.nan else: stats["Correlação (y)"] = np.nan if col != coluna_y else 1.0 # Detecta se é dicotômica valores_unicos = set(serie.unique()) stats["Dicotômica"] = valores_unicos.issubset({0, 1, 0.0, 1.0}) estatisticas.append(stats) df_stats = pd.DataFrame(estatisticas) # Ordena por correlação absoluta (maiores primeiro) if coluna_y: df_stats["_abs_corr"] = df_stats["Correlação (y)"].abs() df_stats = df_stats.sort_values("_abs_corr", ascending=False) df_stats = df_stats.drop(columns=["_abs_corr"]) return df_stats.reset_index(drop=True) # ============================================================ # ANÁLISE DE OUTLIERS # ============================================================ def calcular_metricas_outliers(df, coluna_y, colunas_x, indices_participantes=None): """ Ajusta modelo simples (sem transformações) para calcular métricas de outliers. Parâmetros: df: DataFrame com os dados coluna_y: nome da coluna da variável dependente colunas_x: lista de nomes das colunas independentes indices_participantes: lista de índices a incluir (se None, usa todos) Retorna: DataFrame com métricas: Índice, Observado, Calculado, Resíduo, Resíduo Pad., Resíduo Stud., Cook """ if not colunas_x: return None # Filtra por participantes df_calc = df.copy() if indices_participantes is not None: df_calc = df_calc.loc[indices_participantes] # Prepara y (sem transformação) y = df_calc[coluna_y].values # Prepara X (sem transformação) X = df_calc[colunas_x].copy() # Remove linhas com NaN ou Inf mask = ~(np.isnan(y) | np.isinf(y)) for col in X.columns: mask &= ~(np.isnan(X[col]) | np.isinf(X[col])) y_final = y[mask] X_final = X.loc[mask.values if hasattr(mask, 'values') else mask] indices_final = df_calc.index[mask] if len(y_final) < len(colunas_x) + 2: return None try: # Ajusta modelo simples X_sm = sm.add_constant(X_final) modelo = sm.OLS(y_final, X_sm).fit() # Calcula métricas residuos = modelo.resid n = len(y_final) k = len(colunas_x) desvio_padrao = np.sqrt(np.sum(residuos**2) / (n - k - 1)) residuos_pad = residuos / desvio_padrao influence = OLSInfluence(modelo) residuos_stud = influence.resid_studentized cook = influence.cooks_distance[0] # Monta DataFrame df_metricas = pd.DataFrame({ "Índice": indices_final, "Observado": y_final, "Calculado": modelo.predict(), "Resíduo": residuos, "Resíduo Pad.": residuos_pad, "Resíduo Stud.": residuos_stud, "Cook": cook }) df_metricas = df_metricas.set_index("Índice") return df_metricas except Exception as e: print(f"Erro ao calcular métricas de outliers: {e}") return None def sugerir_outliers(df_metricas, coluna, valor): """ Aplica filtro simétrico e retorna lista de índices sugeridos como outliers. Filtra: coluna <= -valor OU coluna >= +valor Parâmetros: df_metricas: DataFrame com métricas de outliers coluna: nome da coluna para filtro (ex: "Resíduo Pad.") valor: valor absoluto do limite (ex: 2.0 significa <= -2 OU >= +2) Retorna: Lista de índices sugeridos como outliers """ if df_metricas is None or df_metricas.empty: return [] if coluna not in df_metricas.columns: return [] # Filtro simétrico: <= -valor OU >= +valor mask = (df_metricas[coluna] <= -valor) | (df_metricas[coluna] >= valor) indices = df_metricas[mask].index.tolist() return sorted(indices) # ============================================================ # TRANSFORMAÇÕES # ============================================================ def aplicar_transformacao(data, transformacao): """ Aplica uma transformação matemática aos dados. """ data = np.asarray(data, dtype=float) # Evita overflow em exp if transformacao == "exp(x)" and (data > 50).any(): return data if transformacao == "(x)": return data elif transformacao == "1/(x)": return 1 / (data + 0.001) elif transformacao == "ln(x)": return np.log(np.maximum(data, 0.001)) elif transformacao == "exp(x)": return np.exp(data) elif transformacao == "(x)^2": return data ** 2 elif transformacao == "raiz(x)": return np.sqrt(np.maximum(data, 0)) elif transformacao == "1/raiz(x)": return 1 / (np.sqrt(np.maximum(data, 0)) + 0.001) else: return data def inverter_transformacao_y(data, transformacao): """ Inverte a transformação aplicada em y para obter valores na escala original. """ data = np.asarray(data, dtype=float) if transformacao in ["(y)", "(x)", "direct"]: return data elif transformacao in ["ln(y)", "ln(x)"]: return np.exp(data) elif transformacao in ["1/(y)", "1/(x)"]: return 1 / (data + 0.001) elif transformacao in ["exp(y)", "exp(x)"]: return np.log(np.maximum(data, 0.001)) elif transformacao in ["(y)^2", "(x)^2"]: return np.sqrt(np.maximum(data, 0)) elif transformacao in ["raiz(y)", "raiz(x)"]: return data ** 2 elif transformacao in ["1/raiz(y)", "1/raiz(x)"]: return 1 / (data ** 2 + 0.001) else: return data def formatar_transformacao(transformacao, is_y=False): """ Formata a string da transformação para exibição. """ if transformacao == "(x)": return "(y)" if is_y else "(x)" elif transformacao == "1/(x)": return "1/(y)" if is_y else "1/(x)" elif transformacao == "ln(x)": return "ln(y)" if is_y else "ln(x)" elif transformacao == "exp(x)": return "exp(y)" if is_y else "exp(x)" elif transformacao == "(x)^2": return "(y)^2" if is_y else "(x)^2" elif transformacao == "raiz(x)": return "raiz(y)" if is_y else "raiz(x)" elif transformacao == "1/raiz(x)": return "1/raiz(y)" if is_y else "1/raiz(x)" else: return transformacao def detectar_dicotomicas(df, colunas): """ Detecta quais colunas são dicotômicas (apenas 0 e 1). Retorna lista de nomes de colunas dicotômicas. """ dicotomicas = [] for col in colunas: valores = set(df[col].dropna().unique()) if valores.issubset({0, 1, 0.0, 1.0}): dicotomicas.append(col) return dicotomicas # ============================================================ # MODELO OLS E DIAGNÓSTICOS # ============================================================ def ajustar_modelo(df, coluna_y, colunas_x, transformacao_y, transformacoes_x, indices_usar=None): """ Ajusta modelo OLS com as transformações especificadas. Parâmetros: df: DataFrame com os dados coluna_y: nome da coluna da variável dependente colunas_x: lista de nomes das colunas independentes transformacao_y: transformação para y transformacoes_x: dict {coluna: transformação} para X indices_usar: lista de índices a incluir (se None, usa todos) Retorna: dict com resultados do modelo ou None se falhar """ if not colunas_x: return None # Filtra por índices se especificado df_modelo = df.copy() if indices_usar is not None: df_modelo = df_modelo.loc[indices_usar] # Prepara y transformado y = df_modelo[coluna_y].values y_transf = aplicar_transformacao(y, transformacao_y) # Prepara X transformado X_transf = pd.DataFrame(index=df_modelo.index) for col in colunas_x: transf = transformacoes_x.get(col, "(x)") X_transf[col] = aplicar_transformacao(df_modelo[col].values, transf) # Remove linhas com NaN ou Inf mask = ~(np.isnan(y_transf) | np.isinf(y_transf)) for col in X_transf.columns: mask &= ~(np.isnan(X_transf[col]) | np.isinf(X_transf[col])) y_final = y_transf[mask] X_final = X_transf.loc[mask.values if hasattr(mask, 'values') else mask] indices_final = df_modelo.index[mask] if len(y_final) < len(colunas_x) + 2: return None try: # Ajusta modelo X_sm = sm.add_constant(X_final) modelo = sm.OLS(y_final, X_sm).fit() # Calcula diagnósticos diagnosticos = calcular_diagnosticos(modelo, X_final, y_final, coluna_y, transformacao_y, transformacoes_x) # Tabela de coeficientes tabela_coef = pd.DataFrame({ "Variável": modelo.params.index, "Coeficiente": modelo.params.values, "Erro Padrão": modelo.bse.values, "t-Student": modelo.tvalues.values, "p-valor": modelo.pvalues.values, "Significância": [classificar_significancia(p) for p in modelo.pvalues.values] }) # Tabela obs vs calc y_obs = y_final y_calc = modelo.predict() residuos = modelo.resid residuos_pad = residuos / diagnosticos["desvio_padrao_residuos"] influence = OLSInfluence(modelo) residuos_stud = influence.resid_studentized cook = influence.cooks_distance[0] tabela_obs_calc = pd.DataFrame({ "Índice": indices_final, "Observado": y_obs, "Calculado": y_calc, "Resíduo": residuos, "Resíduo Pad.": residuos_pad, "Resíduo Stud.": residuos_stud, "Cook": cook }) # Dados transformados X_esc_info = [f"{col}: {formatar_transformacao(transformacoes_x.get(col, '(x)'))}" for col in colunas_x] y_esc_info = f"{coluna_y}: {formatar_transformacao(transformacao_y, is_y=True)}" return { "modelo_sm": modelo, "diagnosticos": diagnosticos, "tabela_coef": tabela_coef, "tabela_obs_calc": tabela_obs_calc, "X_transformado": X_final, "y_transformado": pd.Series(y_final, index=indices_final, name=coluna_y), "transformacoes_x": transformacoes_x.copy(), "transformacao_y": transformacao_y, "colunas_x": colunas_x.copy(), "coluna_y": coluna_y, "X_esc_info": X_esc_info, "y_esc_info": y_esc_info, "indices_usados": indices_final.tolist() } except Exception as e: print(f"Erro ao ajustar modelo: {e}") return None def calcular_diagnosticos(modelo, X, y, coluna_y, transformacao_y, transformacoes_x): """ Calcula todos os diagnósticos do modelo. """ n = len(y) k = X.shape[1] residuos = modelo.resid desvio_padrao_residuos = np.sqrt(np.sum(residuos**2) / (n - k - 1)) residuos_pad = residuos / desvio_padrao_residuos # Métricas básicas mse = np.mean(residuos**2) r2 = modelo.rsquared r2_ajustado = modelo.rsquared_adj r_pearson = np.corrcoef(y, modelo.predict())[0, 1] Fc = modelo.fvalue p_valor_F = modelo.f_pvalue # Interpretação F if p_valor_F < 0.01: interp_F = "Fc > Ft: Significante ao nível de 1% (confiança de 99%) - GRAU III." elif p_valor_F < 0.025: interp_F = "Fc > Ft: Significante ao nível de 2,5% (confiança de 97,5%) - GRAU II." elif p_valor_F < 0.05: interp_F = "Fc > Ft: Significante ao nível de 5% (confiança de 95%) - GRAU I." else: interp_F = "Fc < Ft: NÃO é estatisticamente significante ao nível de 5%." # Teste KS (normalidade) ks_stat, ks_p = kstest(residuos, "norm", args=(np.mean(residuos), np.std(residuos, ddof=1))) interp_KS = "☑️Resíduos são normalmente distribuídos." if ks_p >= 0.05 else "❌Resíduos NÃO são normalmente distribuídos." # Percentuais da curva normal (formato compatível com .dai) intervalos = [(-1.00, 1.00), (-1.64, 1.64), (-1.96, 1.96)] percentuais = [] for min_int, max_int in intervalos: count = np.sum((residuos_pad >= min_int) & (residuos_pad <= max_int)) perc = round(count / len(residuos) * 100, 0) percentuais.append(f"{perc:.0f}%") perc_resid = ", ".join(percentuais) # Durbin-Watson dw = durbin_watson(residuos) k_dw = min(max(1, k), 4) tab = DW_TABLE[0.05][k_dw] dL = np.interp(n, tab["n"], tab["dL"]) dU = np.interp(n, tab["n"], tab["dU"]) if dw < dL: interp_DW = "❌Autocorrelação positiva." elif dw > 4 - dL: interp_DW = "❌Autocorrelação negativa." elif dU < dw < 4 - dU: interp_DW = "☑️Sem autocorrelação." else: interp_DW = "⚠️Região inconclusiva." # Breusch-Pagan bp_lm, bp_p, bp_f, bp_fp = het_breuschpagan(residuos, sm.add_constant(X)) interp_BP = "☑️Não há evidência de heterocedasticidade." if bp_p >= 0.05 else "❌Heterocedasticidade detectada." # Equação do modelo equacao = formatar_equacao(modelo, coluna_y, transformacao_y, transformacoes_x, list(X.columns)) # Gera formatted_output para compatibilidade com .dai formatted_output = f"""Número de observações: {n} Número de variáveis independentes: {k} Desvio padrão dos resíduos: {desvio_padrao_residuos:.4f} MSE: {mse:.4f} R²: {r2:.4f} R² ajustado: {r2_ajustado:.4f} Correlação Pearson: {r_pearson:.4f} Estatística F: {Fc:.4f} (p-valor: {p_valor_F:.4f}) -> {interp_F} ---------------------------------------------------------------------- Teste de Normalidade (Kolmogorov-Smirnov): Estatística KS: {ks_stat:.4f}, p-valor: {ks_p:.4f} -> {interp_KS} Teste de Normalidade (Comparação com a curva normal) – percentuais atingidos: {perc_resid} • Ideal 68% → aceitável entre 64% e 75% • Ideal 90% → aceitável entre 88% e 95% • Ideal 95% → aceitável entre 95% e 100% ---------------------------------------------------------------------- Teste de Autocorrelação (Durbin-Watson): DW: {dw:.4f} -> {interp_DW} ---------------------------------------------------------------------- Teste de Homocedasticidade (Breusch-Pagan): Estatística LM: {bp_lm:.4f}, p-valor: {bp_p:.4f} {interp_BP} ---------------------------------------------------------------------- Equação do Modelo: {equacao}""" return { "n": n, "k": k, "desvio_padrao_residuos": desvio_padrao_residuos, "mse": mse, "r2": r2, "r2_ajustado": r2_ajustado, "r_pearson": r_pearson, "Fc": Fc, "p_valor_F": p_valor_F, "interp_F": interp_F, "ks_stat": ks_stat, "ks_p": ks_p, "interp_KS": interp_KS, "perc_resid": perc_resid, "dw": dw, "interp_DW": interp_DW, "bp_lm": bp_lm, "bp_p": bp_p, "interp_BP": interp_BP, "equacao": equacao, "formatted_output": formatted_output } def classificar_significancia(p_valor): """Classifica significância segundo NBR 14.653-2.""" if p_valor <= 0.10: return "Grau III" elif p_valor <= 0.20: return "Grau II" elif p_valor <= 0.30: return "Grau I" else: return "Sem enquadramento" def formatar_equacao(modelo, coluna_y, transformacao_y, transformacoes_x, colunas_x): """Formata a equação do modelo.""" equacao = f"{modelo.params['const']:.6f}" for col in colunas_x: coef = modelo.params[col] transf = transformacoes_x.get(col, "(x)") if transf == "ln(x)": termo = f"ln({col})" elif transf == "1/(x)": termo = f"1/{col}" elif transf == "(x)^2": termo = f"({col})²" elif transf == "raiz(x)": termo = f"√({col})" elif transf == "1/raiz(x)": termo = f"1/√({col})" elif transf == "exp(x)": termo = f"exp({col})" else: termo = col sinal = "+" if coef >= 0 else "-" equacao += f" {sinal} {abs(coef):.6f} × {termo}" # Aplica inversão de y if transformacao_y == "ln(x)": equacao = f"exp({equacao})" elif transformacao_y == "(x)^2": equacao = f"√({equacao})" elif transformacao_y == "1/(x)": equacao = f"1/({equacao})" elif transformacao_y == "raiz(x)": equacao = f"({equacao})²" elif transformacao_y == "1/raiz(x)": equacao = f"1/({equacao})²" return f"{coluna_y} = {equacao}" # ============================================================ # BUSCA AUTOMÁTICA DE TRANSFORMAÇÕES (OTIMIZADA) # ============================================================ def _calcular_r2_numpy(X, y): """ Calcula R² usando operações matriciais diretas (mais rápido que statsmodels). """ n = len(y) if n == 0: return -np.inf # Adiciona constante X_const = np.column_stack([np.ones(n), X]) try: # OLS: beta = (X'X)^-1 X'y XtX = X_const.T @ X_const Xty = X_const.T @ y beta = np.linalg.solve(XtX, Xty) # R² y_pred = X_const @ beta ss_res = np.sum((y - y_pred) ** 2) ss_tot = np.sum((y - np.mean(y)) ** 2) if ss_tot == 0: return -np.inf return 1 - (ss_res / ss_tot) except: return -np.inf def _precomputar_transformacoes(df, colunas): """ Pré-computa todas as transformações válidas para as colunas. Retorna dict: {(coluna, transformacao): array} """ cache = {} for col in colunas: valores = df[col].values.astype(float) for transf in TRANSFORMACOES: dados_transf = aplicar_transformacao(valores, transf) # Só armazena se válido if not (np.isnan(dados_transf).any() or np.isinf(dados_transf).any()): cache[(col, transf)] = dados_transf return cache def _buscar_stepwise(colunas_x, colunas_livres, transformacoes_fixas, y_transf, x_cache, df_busca): """ Busca stepwise: otimiza uma variável por vez. Mais rápido para muitas variáveis (O(7*n) em vez de O(7^n)). """ # Inicializa com transformação (x) para todas transf_x_atuais = {col: "(x)" for col in colunas_livres} # Para cada variável X, encontrar melhor transformação for col in colunas_livres: melhor_transf = "(x)" melhor_r2 = -np.inf for transf in TRANSFORMACOES: if (col, transf) not in x_cache: continue # Testa com esta transformação transf_teste = transf_x_atuais.copy() transf_teste[col] = transf # Monta matriz X X_arrays = [] valido = True for c in colunas_x: if c in transformacoes_fixas: t = transformacoes_fixas[c] else: t = transf_teste.get(c, "(x)") if (c, t) in x_cache: X_arrays.append(x_cache[(c, t)]) else: valido = False break if not valido or not X_arrays: continue X = np.column_stack(X_arrays) r2 = _calcular_r2_numpy(X, y_transf) if r2 > melhor_r2: melhor_r2 = r2 melhor_transf = transf transf_x_atuais[col] = melhor_transf # Calcula R² final transf_x_final = {**transformacoes_fixas, **transf_x_atuais} X_arrays = [] for c in colunas_x: t = transf_x_final.get(c, "(x)") if (c, t) in x_cache: X_arrays.append(x_cache[(c, t)]) if X_arrays: X = np.column_stack(X_arrays) r2_final = _calcular_r2_numpy(X, y_transf) else: r2_final = -np.inf return transf_x_final, r2_final def _buscar_exaustivo_numpy(colunas_x, colunas_livres, transformacoes_fixas, y_transf, x_cache, top_n): """ Busca exaustiva usando numpy para cálculo rápido de R². Para uso quando o número de combinações é pequeno (<= 5000). """ n_livres = len(colunas_livres) resultados_transf = [] for combo in product(TRANSFORMACOES, repeat=n_livres): # Monta dict de transformações transf_x = transformacoes_fixas.copy() for i, col in enumerate(colunas_livres): transf_x[col] = combo[i] # Monta matriz X X_arrays = [] valido = True for col in colunas_x: t = transf_x.get(col, "(x)") if (col, t) in x_cache: X_arrays.append(x_cache[(col, t)]) else: valido = False break if not valido or not X_arrays: continue X = np.column_stack(X_arrays) r2 = _calcular_r2_numpy(X, y_transf) if r2 > -np.inf: resultados_transf.append((r2, transf_x.copy())) # Ordena e retorna top resultados_transf.sort(key=lambda x: x[0], reverse=True) return resultados_transf[:top_n] def buscar_melhores_transformacoes(df, coluna_y, colunas_x, transformacoes_fixas=None, transformacao_y_fixa=None, indices_usar=None, top_n=5): """ Busca otimizada das melhores combinações de transformações para maximizar R². Usa estratégia híbrida: - Para poucos combos (<=5000): busca exaustiva com numpy (rápida) - Para muitos combos (>5000): busca stepwise (O(7*n) em vez de O(7^n)) Parâmetros: df: DataFrame coluna_y: variável dependente colunas_x: lista de variáveis independentes transformacoes_fixas: dict {coluna: transformação} para variáveis com transformação fixa transformacao_y_fixa: transformação fixa para y (ou None para testar todas) indices_usar: lista de índices a incluir (se None, usa todos) top_n: número de melhores modelos a retornar Retorna: Lista de dicts com informações dos top N modelos """ if transformacoes_fixas is None: transformacoes_fixas = {} # Filtra por índices se especificado df_busca = df.copy() if indices_usar is not None: df_busca = df_busca.loc[indices_usar] # Detecta dicotômicas e fixa em (x) dicotomicas = detectar_dicotomicas(df_busca, colunas_x) for col in dicotomicas: if col not in transformacoes_fixas: transformacoes_fixas[col] = "(x)" # Identifica colunas livres (não fixas) colunas_livres = [col for col in colunas_x if col not in transformacoes_fixas] n_livres = len(colunas_livres) # Define transformações de Y a testar if transformacao_y_fixa: transformacoes_y = [transformacao_y_fixa] else: transformacoes_y = TRANSFORMACOES # Pré-computa todas as transformações de X (apenas colunas livres) x_cache = _precomputar_transformacoes(df_busca, colunas_livres) # Adiciona transformações fixas ao cache for col, transf in transformacoes_fixas.items(): if col in colunas_x: dados = aplicar_transformacao(df_busca[col].values.astype(float), transf) if not (np.isnan(dados).any() or np.isinf(dados).any()): x_cache[(col, transf)] = dados # Calcula número total de combinações total_combos = len(TRANSFORMACOES) ** n_livres if n_livres > 0 else 1 usar_stepwise = total_combos > 5000 resultados = [] for transf_y in transformacoes_y: # Pré-computa Y transformado y_transf = aplicar_transformacao(df_busca[coluna_y].values.astype(float), transf_y) # Se y_transf tem problemas, pula if np.isnan(y_transf).any() or np.isinf(y_transf).any(): continue if n_livres == 0: # Todas as variáveis são fixas, só calcula R² X_arrays = [x_cache[(col, transformacoes_fixas[col])] for col in colunas_x if (col, transformacoes_fixas.get(col, "(x)")) in x_cache] if X_arrays: X = np.column_stack(X_arrays) r2 = _calcular_r2_numpy(X, y_transf) if r2 > -np.inf: resultados.append({ "r2": r2, "transformacao_y": transf_y, "transformacoes_x": transformacoes_fixas.copy() }) elif usar_stepwise: # Busca stepwise para muitas variáveis transf_x, r2 = _buscar_stepwise( colunas_x, colunas_livres, transformacoes_fixas, y_transf, x_cache, df_busca ) if r2 > -np.inf: resultados.append({ "r2": r2, "transformacao_y": transf_y, "transformacoes_x": transf_x }) else: # Busca exaustiva para poucas variáveis top_combos = _buscar_exaustivo_numpy( colunas_x, colunas_livres, transformacoes_fixas, y_transf, x_cache, top_n ) for r2, transf_x in top_combos: resultados.append({ "r2": r2, "transformacao_y": transf_y, "transformacoes_x": transf_x }) # Ordena por R² e retorna top N resultados = sorted(resultados, key=lambda x: x["r2"], reverse=True)[:top_n] # Adiciona ranking for i, r in enumerate(resultados): r["rank"] = i + 1 return resultados # ============================================================ # MICRONUMEROSIDADE (NBR 14.653-2) # ============================================================ def testar_micronumerosidade(df, colunas_x): """ Testa critérios de micronumerosidade da NBR 14.653-2. """ # Filtra apenas colunas inteiras (dicotômicas/categóricas) colunas_int = [col for col in colunas_x if df[col].dtype in ['int64', 'int32']] n = len(df) k = len(colunas_x) # Condição geral: n >= 3*(k+1) condicao_geral = n >= 3 * (k + 1) resultados_por_coluna = {} for coluna in colunas_int: frequencia = df[coluna].value_counts() ni_valido = True mensagens = [] for categoria, ni in frequencia.items(): if n <= 30: if ni < 3: ni_valido = False mensagens.append(f"ni={ni} < 3 na categoria {categoria}") elif 30 < n <= 100: if ni < 0.1 * n: ni_valido = False mensagens.append(f"ni={ni} < 10% de n na categoria {categoria}") elif n > 100: if ni <= 10: ni_valido = False mensagens.append(f"ni={ni} <= 10 na categoria {categoria}") resultados_por_coluna[coluna] = { "valido": ni_valido, "mensagens": mensagens if mensagens else ["OK"] } return { "n": n, "k": k, "condicao_geral_ok": condicao_geral, "por_coluna": resultados_por_coluna } # ============================================================ # EXPORTAR BASE TRATADA (CSV) # ============================================================ def exportar_base_csv(df): """ Exporta DataFrame para arquivo CSV temporário. Parâmetros: df: DataFrame a exportar Retorna: str: Caminho do arquivo CSV gerado """ import tempfile import os if df is None or df.empty: return None # Cria arquivo temporário fd, caminho = tempfile.mkstemp(suffix=".csv", prefix="base_tratada_") os.close(fd) # Salva CSV df.to_csv(caminho, index=True, encoding="utf-8-sig", sep=";", decimal=",") return caminho # ============================================================ # EXPORTAR MODELO (.dai) # ============================================================ def exportar_modelo_dai(resultado_modelo, df_original, estatisticas, nome_arquivo, graficos=None): """ Exporta o modelo em formato .dai compatível com app MODELOS_DAI. Parâmetros: resultado_modelo: dict com resultado do ajuste df_original: DataFrame original estatisticas: DataFrame com estatísticas (mantido como DataFrame) nome_arquivo: nome do arquivo de saída graficos: dict opcional com gráficos Plotly (chaves: obs_calc, residuos, etc.) Retorna: tuple: (caminho_arquivo, mensagem) """ import tempfile if resultado_modelo is None: return None, "Nenhum modelo para exportar" try: # Função auxiliar para converter StringDtype para object e limpar DataFrame def limpar_df_para_export(df): """Converte StringDtype para object e remove colunas Unnamed.""" df_clean = df.copy() # Remove colunas 'Unnamed: X' cols_unnamed = [c for c in df_clean.columns if str(c).startswith('Unnamed')] if cols_unnamed: df_clean = df_clean.drop(columns=cols_unnamed) # Converte StringDtype para object (compatibilidade entre versões do pandas) for col in df_clean.columns: if pd.api.types.is_string_dtype(df_clean[col]): df_clean[col] = df_clean[col].astype(object) # Converte index se for string dtype (compatibilidade com .dai) if pd.api.types.is_string_dtype(df_clean.index): df_clean.index = df_clean.index.astype(object) return df_clean # Monta pacote lat_col, lon_col = identificar_colunas_coords(df_original) # DataFrame com coords para o mapa df_coords = limpar_df_para_export(df_original) if lat_col: df_coords = df_coords.rename(columns={lat_col: "lat"}) if lon_col: df_coords = df_coords.rename(columns={lon_col: "lon"}) # Filtra apenas índices usados no modelo indices_modelo = resultado_modelo["indices_usados"] df_modelo = df_coords.loc[indices_modelo] if indices_modelo else df_coords df_modelo.index.name = None # Remove nome do índice # Gera gráfico HTML para graf_model (se graficos fornecidos) graf_model_html = "" if graficos and graficos.get("obs_calc"): try: graf_model_html = pio.to_html(graficos["obs_calc"], full_html=True, include_plotlyjs='cdn') except: graf_model_html = "" # Prepara tabela_coef no formato compatível (índice = Variável, sem coluna "Variável") tabela_coef_export = limpar_df_para_export(resultado_modelo["tabela_coef"]) if "Variável" in tabela_coef_export.columns: tabela_coef_export = tabela_coef_export.set_index("Variável") tabela_coef_export.index.name = None # Remove nome do índice # Converte index para object após set_index (compatibilidade .dai) if pd.api.types.is_string_dtype(tabela_coef_export.index): tabela_coef_export.index = tabela_coef_export.index.astype(object) # Renomeia p-valor para p-value tabela_coef_export = tabela_coef_export.rename(columns={"p-valor": "p-value"}) # Prepara tabela_obs_calc no formato compatível tabela_obs_calc_export = limpar_df_para_export(resultado_modelo["tabela_obs_calc"]) # Move Índice para o index do DataFrame if "Índice" in tabela_obs_calc_export.columns: tabela_obs_calc_export = tabela_obs_calc_export.set_index("Índice") tabela_obs_calc_export.index.name = None # Remove nome do índice # Renomeia colunas para formato compatível tabela_obs_calc_export = tabela_obs_calc_export.rename(columns={ "Resíduo Pad.": "Resíduo Padronizado", "Resíduo Stud.": "Resíduo Studentizado", "Cook": "Distância de Cook" }) # Prepara modelos_resumos (diagnosticos) no formato compatível diag = resultado_modelo["diagnosticos"].copy() modelos_resumos = { "n": diag.get("n"), "k": diag.get("k"), "desvio_padrao_residuos": diag.get("desvio_padrao_residuos"), "mse": diag.get("mse"), "r2": diag.get("r2"), "r2_ajustado": diag.get("r2_ajustado"), "r_pearson": diag.get("r_pearson"), "Fc": diag.get("Fc"), "p_valor_F": diag.get("p_valor_F"), "Interpretacao_F": diag.get("interp_F"), "ks_stat": diag.get("ks_stat"), "ks_p": diag.get("ks_p"), "Interpretacao_KS": diag.get("interp_KS"), "perc_resid": diag.get("perc_resid"), "dw": diag.get("dw"), "Interpretacao_DW": diag.get("interp_DW"), "bp_lm": diag.get("bp_lm"), "bp_p": diag.get("bp_p"), "Interpretacao_BP": diag.get("interp_BP"), "equacao": diag.get("equacao"), "formatted_output": diag.get("formatted_output", "") } # Prepara estatisticas no formato compatível (índice = Variável) estatisticas_df = estatisticas if isinstance(estatisticas, pd.DataFrame) else pd.DataFrame(estatisticas) estatisticas_export = limpar_df_para_export(estatisticas_df) if "Variável" in estatisticas_export.columns: estatisticas_export = estatisticas_export.set_index("Variável") estatisticas_export.index.name = None # Remove nome do índice # Converte index para object após set_index (compatibilidade .dai) if pd.api.types.is_string_dtype(estatisticas_export.index): estatisticas_export.index = estatisticas_export.index.astype(object) # Adiciona coluna Tipo (dtype) se não existir if "Tipo" not in estatisticas_export.columns: # Obtém tipos das colunas do DataFrame original tipos = {} for var in estatisticas_export.index: if var in df_original.columns: tipos[var] = str(df_original[var].dtype) else: tipos[var] = "unknown" estatisticas_export.insert(0, "Tipo", pd.Series(tipos)) # Mantém apenas colunas compatíveis (na ordem correta) colunas_manter = ["Tipo", "Contagem", "Média", "Mediana", "Mínimo", "Máximo", "Desvio Padrão"] colunas_existentes = [c for c in colunas_manter if c in estatisticas_export.columns] estatisticas_export = estatisticas_export[colunas_existentes] # Converte dtypes para compatibilidade com formato .dai if "Contagem" in estatisticas_export.columns: estatisticas_export["Contagem"] = estatisticas_export["Contagem"].astype(str).astype(object) if "Tipo" in estatisticas_export.columns: estatisticas_export["Tipo"] = estatisticas_export["Tipo"].astype(object) # Limpa top_X_esc e top_y_esc top_X_esc_export = limpar_df_para_export(resultado_modelo["X_transformado"]) top_X_esc_export.index.name = None # Remove nome do índice top_y_esc_export = resultado_modelo["y_transformado"].copy() top_y_esc_export.index.name = None # Remove nome do índice pacote = { "Xy_preview_out_coords": df_modelo, "estatisticas": estatisticas_export, "formatted_top_transformation_info": [resultado_modelo["y_esc_info"]] + resultado_modelo["X_esc_info"], "top_X_esc": top_X_esc_export, "top_y_esc": top_y_esc_export, "modelos_resumos": modelos_resumos, "tabelas_coef": tabela_coef_export, "tabelas_obs_calc": tabela_obs_calc_export, "graf_model": graf_model_html, "modelos_sm": resultado_modelo["modelo_sm"] } # Função para converter recursivamente StringDtype para object (compatibilidade HF) def converter_para_serializacao(obj): """Converte recursivamente todos os StringDtype para object dtype.""" if isinstance(obj, pd.DataFrame): df = obj.copy() if pd.api.types.is_string_dtype(df.index): df.index = df.index.astype(object) for col in df.columns: if pd.api.types.is_string_dtype(df[col]): df[col] = df[col].astype(object) return df elif isinstance(obj, pd.Series): s = obj.copy() if pd.api.types.is_string_dtype(s): s = s.astype(object) if pd.api.types.is_string_dtype(s.index): s.index = s.index.astype(object) return s elif isinstance(obj, dict): return {k: converter_para_serializacao(v) for k, v in obj.items()} elif isinstance(obj, list): return [converter_para_serializacao(v) for v in obj] else: return obj # Converte todo o pacote para garantir compatibilidade entre versões do pandas pacote = converter_para_serializacao(pacote) # Cria arquivo temporário para download (compatível com Hugging Face) nome_base = nome_arquivo.replace(".dai", "") if nome_arquivo.endswith(".dai") else nome_arquivo fd, caminho = tempfile.mkstemp(suffix=".dai", prefix=f"{nome_base}_") os.close(fd) dump(pacote, caminho) return caminho, f"Modelo pronto para download" except Exception as e: return None, f"Erro ao exportar: {str(e)}"