gui-sparim's picture
Upload 3 files
c9b6226 verified
# -*- coding: utf-8 -*-
"""
core.py - Lógica de negócio para elaboração de modelos estatísticos
Contém: carregamento de dados, estatísticas, transformações, modelo OLS, diagnósticos
"""
import os
import pandas as pd
# Desabilita StringDtype para compatibilidade entre versões do pandas
pd.set_option('future.infer_string', False)
import numpy as np
import statsmodels.api as sm
from statsmodels.stats.diagnostic import het_breuschpagan
from statsmodels.stats.stattools import durbin_watson
from statsmodels.stats.outliers_influence import OLSInfluence
from scipy.stats import kstest
from itertools import product
from joblib import dump, load
import io
import plotly.io as pio
# ============================================================
# CONSTANTES
# ============================================================
TRANSFORMACOES = ["(x)", "1/(x)", "ln(x)", "exp(x)", "(x)^2", "raiz(x)", "1/raiz(x)"]
# Nomes comuns para identificar colunas especiais
NOMES_VUNIT = {"vunit", "vu", "valor_unitario", "vuloc", "vupriv", "vuaconst", "vuapriv", "valor unitário"}
NOMES_LAT = {"lat", "latitude", "y", "siat_latitude"}
NOMES_LON = {"lon", "longitude", "long", "x", "siat_longitude"}
# Tabela Durbin-Watson
DW_TABLE = {
0.05: {
1: {"n": [25, 50, 100, 200], "dL": [1.10, 1.32, 1.49, 1.57], "dU": [1.54, 1.62, 1.70, 1.75]},
2: {"n": [25, 50, 100, 200], "dL": [1.06, 1.30, 1.47, 1.55], "dU": [1.50, 1.60, 1.69, 1.74]},
3: {"n": [25, 50, 100, 200], "dL": [1.02, 1.28, 1.45, 1.54], "dU": [1.47, 1.58, 1.68, 1.73]},
4: {"n": [25, 50, 100, 200], "dL": [0.98, 1.26, 1.43, 1.52], "dU": [1.44, 1.56, 1.66, 1.72]},
}
}
# ============================================================
# CARREGAMENTO DE DADOS
# ============================================================
def detectar_abas_excel(arquivo):
"""
Detecta as abas (sheets) de um arquivo Excel.
Args:
arquivo: Pode ser um objeto de arquivo ou um caminho (string)
Retorna:
tuple: (lista_abas, mensagem, sucesso)
"""
if arquivo is None:
return [], "Nenhum arquivo enviado.", False
try:
# Obtém o caminho/nome do arquivo
caminho = arquivo.name if hasattr(arquivo, 'name') else str(arquivo)
if caminho.endswith(('.xlsx', '.xls')):
# Usa o caminho para abrir o arquivo (funciona com path ou file object)
excel_file = pd.ExcelFile(caminho)
abas = excel_file.sheet_names
return abas, f"Arquivo com {len(abas)} aba(s) detectada(s)", True
else:
# CSV não tem abas
return [], "Arquivo CSV (sem abas)", True
except Exception as e:
return [], f"Erro ao detectar abas: {str(e)}", False
def carregar_arquivo(arquivo, nome_aba=None):
"""
Carrega arquivo Excel ou CSV e retorna DataFrame.
Detecta automaticamente o separador do CSV.
Args:
arquivo: Arquivo a ser carregado (objeto de arquivo ou caminho string)
nome_aba: Nome da aba a carregar (apenas para Excel). Se None, carrega a primeira.
Retorna:
tuple: (df, mensagem, sucesso)
"""
if arquivo is None:
return None, "Nenhum arquivo enviado.", False
try:
# Obtém o caminho do arquivo
caminho = arquivo.name if hasattr(arquivo, 'name') else str(arquivo)
if caminho.endswith(('.xlsx', '.xls')):
# Carrega aba específica ou a primeira (usa caminho para poder reabrir)
df = pd.read_excel(caminho, sheet_name=nome_aba if nome_aba else 0)
aba_info = f" (aba: {nome_aba})" if nome_aba else ""
elif caminho.endswith('.csv'):
# Tenta detectar separador
with open(caminho, 'rb') as f:
content = f.read()
# Tenta diferentes separadores
for sep in [',', ';', '\t']:
try:
df = pd.read_csv(io.BytesIO(content), sep=sep)
if len(df.columns) > 1:
break
except:
continue
else:
df = pd.read_csv(io.BytesIO(content))
aba_info = ""
else:
return None, "Formato de arquivo não suportado.", False
# Reinicia índice começando em 1
df = df.reset_index(drop=True)
df.index = df.index + 1
return df, f"Arquivo carregado: {os.path.basename(caminho)}{aba_info} ({len(df)} linhas, {len(df.columns)} colunas)", True
except Exception as e:
return None, f"Erro ao carregar arquivo: {str(e)}", False
def carregar_dai(caminho):
"""
Carrega arquivo .dai e extrai DataFrame, variáveis e transformações.
Retorna:
tuple: (df, coluna_y, colunas_x, transformacao_y, transformacoes_x, mensagem, sucesso)
"""
try:
pacote = load(caminho)
# Extrai DataFrame
df = pacote["Xy_preview_out_coords"].copy()
df = df.reset_index(drop=True)
df.index = df.index + 1
# Parseia transformações
info_transf = pacote["formatted_top_transformation_info"]
# Primeiro elemento: Y (ex: "VUNIT: ln(y)")
nome_y, transf_y = info_transf[0].split(": ", 1)
coluna_y = nome_y.strip()
transformacao_y = transf_y.strip().replace("(y)", "(x)")
# Demais: X (ex: "Area: ln(x)")
colunas_x = []
transformacoes_x = {}
for item in info_transf[1:]:
nome_x, transf_x = item.split(": ", 1)
nome_x = nome_x.strip()
colunas_x.append(nome_x)
transformacoes_x[nome_x] = transf_x.strip()
msg = f"Modelo .dai carregado: {os.path.basename(caminho)} ({len(df)} dados, Y={coluna_y}, {len(colunas_x)} variáveis X)"
return df, coluna_y, colunas_x, transformacao_y, transformacoes_x, msg, True
except Exception as e:
return None, None, None, None, None, f"Erro ao carregar .dai: {str(e)}", False
def identificar_coluna_y_padrao(df):
"""
Identifica a coluna padrão para variável dependente (y).
Prioriza VUNIT se existir.
"""
colunas_lower = {col.lower(): col for col in df.columns}
for nome in NOMES_VUNIT:
if nome in colunas_lower:
return colunas_lower[nome]
# Se não encontrar, retorna primeira coluna numérica
numericas = df.select_dtypes(include=[np.number]).columns.tolist()
return numericas[0] if numericas else None
def identificar_colunas_coords(df):
"""
Identifica colunas de latitude e longitude.
Retorna tuple (lat_col, lon_col) ou (None, None).
"""
colunas_lower = {col.lower(): col for col in df.columns}
lat_col = None
lon_col = None
for nome in NOMES_LAT:
if nome in colunas_lower:
lat_col = colunas_lower[nome]
break
for nome in NOMES_LON:
if nome in colunas_lower:
lon_col = colunas_lower[nome]
break
return lat_col, lon_col
def obter_colunas_numericas(df, excluir_coords=True):
"""
Retorna lista de colunas numéricas.
Opcionalmente exclui lat/lon.
"""
numericas = df.select_dtypes(include=[np.number]).columns.tolist()
if excluir_coords:
lat_col, lon_col = identificar_colunas_coords(df)
if lat_col and lat_col in numericas:
numericas.remove(lat_col)
if lon_col and lon_col in numericas:
numericas.remove(lon_col)
return numericas
# ============================================================
# ESTATÍSTICAS
# ============================================================
def calcular_estatisticas_variaveis(df, coluna_y=None, indices_usar=None, colunas=None):
"""
Calcula estatísticas para cada coluna numérica.
Se coluna_y for especificada, inclui correlação com y.
Se colunas for especificada, calcula apenas para essas colunas.
Parâmetros:
df: DataFrame com os dados
coluna_y: nome da coluna da variável dependente (opcional)
indices_usar: lista de índices a incluir (se None, usa todos)
colunas: lista de colunas a calcular (se None, usa todas numéricas)
Retorna DataFrame com estatísticas.
"""
# Filtra por índices se especificado
df_calc = df.copy()
if indices_usar is not None:
df_calc = df_calc.loc[indices_usar]
# Usa colunas especificadas ou todas numéricas
if colunas is not None:
numericas = [c for c in colunas if c in df_calc.columns and pd.api.types.is_numeric_dtype(df_calc[c])]
else:
numericas = obter_colunas_numericas(df_calc)
estatisticas = []
for col in numericas:
serie = df_calc[col].dropna()
stats = {
"Variável": col,
"Contagem": len(serie),
"Média": serie.mean(),
"Mediana": serie.median(),
"Mínimo": serie.min(),
"Máximo": serie.max(),
"Desvio Padrão": serie.std(),
}
# Coeficiente de variação
if stats["Média"] != 0:
stats["CV (%)"] = abs(stats["Desvio Padrão"] / stats["Média"]) * 100
else:
stats["CV (%)"] = np.nan
# Correlação com y
if coluna_y and col != coluna_y:
y_serie = df_calc[coluna_y].dropna()
comum = serie.index.intersection(y_serie.index)
if len(comum) > 2:
stats["Correlação (y)"] = np.corrcoef(serie.loc[comum], y_serie.loc[comum])[0, 1]
else:
stats["Correlação (y)"] = np.nan
else:
stats["Correlação (y)"] = np.nan if col != coluna_y else 1.0
# Detecta se é dicotômica
valores_unicos = set(serie.unique())
stats["Dicotômica"] = valores_unicos.issubset({0, 1, 0.0, 1.0})
estatisticas.append(stats)
df_stats = pd.DataFrame(estatisticas)
# Ordena por correlação absoluta (maiores primeiro)
if coluna_y:
df_stats["_abs_corr"] = df_stats["Correlação (y)"].abs()
df_stats = df_stats.sort_values("_abs_corr", ascending=False)
df_stats = df_stats.drop(columns=["_abs_corr"])
return df_stats.reset_index(drop=True)
# ============================================================
# ANÁLISE DE OUTLIERS
# ============================================================
def calcular_metricas_outliers(df, coluna_y, colunas_x, indices_participantes=None):
"""
Ajusta modelo simples (sem transformações) para calcular métricas de outliers.
Parâmetros:
df: DataFrame com os dados
coluna_y: nome da coluna da variável dependente
colunas_x: lista de nomes das colunas independentes
indices_participantes: lista de índices a incluir (se None, usa todos)
Retorna:
DataFrame com métricas: Índice, Observado, Calculado, Resíduo,
Resíduo Pad., Resíduo Stud., Cook
"""
if not colunas_x:
return None
# Filtra por participantes
df_calc = df.copy()
if indices_participantes is not None:
df_calc = df_calc.loc[indices_participantes]
# Prepara y (sem transformação)
y = df_calc[coluna_y].values
# Prepara X (sem transformação)
X = df_calc[colunas_x].copy()
# Remove linhas com NaN ou Inf
mask = ~(np.isnan(y) | np.isinf(y))
for col in X.columns:
mask &= ~(np.isnan(X[col]) | np.isinf(X[col]))
y_final = y[mask]
X_final = X.loc[mask.values if hasattr(mask, 'values') else mask]
indices_final = df_calc.index[mask]
if len(y_final) < len(colunas_x) + 2:
return None
try:
# Ajusta modelo simples
X_sm = sm.add_constant(X_final)
modelo = sm.OLS(y_final, X_sm).fit()
# Calcula métricas
residuos = modelo.resid
n = len(y_final)
k = len(colunas_x)
desvio_padrao = np.sqrt(np.sum(residuos**2) / (n - k - 1))
residuos_pad = residuos / desvio_padrao
influence = OLSInfluence(modelo)
residuos_stud = influence.resid_studentized
cook = influence.cooks_distance[0]
# Monta DataFrame
df_metricas = pd.DataFrame({
"Índice": indices_final,
"Observado": y_final,
"Calculado": modelo.predict(),
"Resíduo": residuos,
"Resíduo Pad.": residuos_pad,
"Resíduo Stud.": residuos_stud,
"Cook": cook
})
df_metricas = df_metricas.set_index("Índice")
return df_metricas
except Exception as e:
print(f"Erro ao calcular métricas de outliers: {e}")
return None
def sugerir_outliers(df_metricas, coluna, valor):
"""
Aplica filtro simétrico e retorna lista de índices sugeridos como outliers.
Filtra: coluna <= -valor OU coluna >= +valor
Parâmetros:
df_metricas: DataFrame com métricas de outliers
coluna: nome da coluna para filtro (ex: "Resíduo Pad.")
valor: valor absoluto do limite (ex: 2.0 significa <= -2 OU >= +2)
Retorna:
Lista de índices sugeridos como outliers
"""
if df_metricas is None or df_metricas.empty:
return []
if coluna not in df_metricas.columns:
return []
# Filtro simétrico: <= -valor OU >= +valor
mask = (df_metricas[coluna] <= -valor) | (df_metricas[coluna] >= valor)
indices = df_metricas[mask].index.tolist()
return sorted(indices)
# ============================================================
# TRANSFORMAÇÕES
# ============================================================
def aplicar_transformacao(data, transformacao):
"""
Aplica uma transformação matemática aos dados.
"""
data = np.asarray(data, dtype=float)
# Evita overflow em exp
if transformacao == "exp(x)" and (data > 50).any():
return data
if transformacao == "(x)":
return data
elif transformacao == "1/(x)":
return 1 / (data + 0.001)
elif transformacao == "ln(x)":
return np.log(np.maximum(data, 0.001))
elif transformacao == "exp(x)":
return np.exp(data)
elif transformacao == "(x)^2":
return data ** 2
elif transformacao == "raiz(x)":
return np.sqrt(np.maximum(data, 0))
elif transformacao == "1/raiz(x)":
return 1 / (np.sqrt(np.maximum(data, 0)) + 0.001)
else:
return data
def inverter_transformacao_y(data, transformacao):
"""
Inverte a transformação aplicada em y para obter valores na escala original.
"""
data = np.asarray(data, dtype=float)
if transformacao in ["(y)", "(x)", "direct"]:
return data
elif transformacao in ["ln(y)", "ln(x)"]:
return np.exp(data)
elif transformacao in ["1/(y)", "1/(x)"]:
return 1 / (data + 0.001)
elif transformacao in ["exp(y)", "exp(x)"]:
return np.log(np.maximum(data, 0.001))
elif transformacao in ["(y)^2", "(x)^2"]:
return np.sqrt(np.maximum(data, 0))
elif transformacao in ["raiz(y)", "raiz(x)"]:
return data ** 2
elif transformacao in ["1/raiz(y)", "1/raiz(x)"]:
return 1 / (data ** 2 + 0.001)
else:
return data
def formatar_transformacao(transformacao, is_y=False):
"""
Formata a string da transformação para exibição.
"""
if transformacao == "(x)":
return "(y)" if is_y else "(x)"
elif transformacao == "1/(x)":
return "1/(y)" if is_y else "1/(x)"
elif transformacao == "ln(x)":
return "ln(y)" if is_y else "ln(x)"
elif transformacao == "exp(x)":
return "exp(y)" if is_y else "exp(x)"
elif transformacao == "(x)^2":
return "(y)^2" if is_y else "(x)^2"
elif transformacao == "raiz(x)":
return "raiz(y)" if is_y else "raiz(x)"
elif transformacao == "1/raiz(x)":
return "1/raiz(y)" if is_y else "1/raiz(x)"
else:
return transformacao
def detectar_dicotomicas(df, colunas):
"""
Detecta quais colunas são dicotômicas (apenas 0 e 1).
Retorna lista de nomes de colunas dicotômicas.
"""
dicotomicas = []
for col in colunas:
valores = set(df[col].dropna().unique())
if valores.issubset({0, 1, 0.0, 1.0}):
dicotomicas.append(col)
return dicotomicas
# ============================================================
# MODELO OLS E DIAGNÓSTICOS
# ============================================================
def ajustar_modelo(df, coluna_y, colunas_x, transformacao_y, transformacoes_x, indices_usar=None):
"""
Ajusta modelo OLS com as transformações especificadas.
Parâmetros:
df: DataFrame com os dados
coluna_y: nome da coluna da variável dependente
colunas_x: lista de nomes das colunas independentes
transformacao_y: transformação para y
transformacoes_x: dict {coluna: transformação} para X
indices_usar: lista de índices a incluir (se None, usa todos)
Retorna:
dict com resultados do modelo ou None se falhar
"""
if not colunas_x:
return None
# Filtra por índices se especificado
df_modelo = df.copy()
if indices_usar is not None:
df_modelo = df_modelo.loc[indices_usar]
# Prepara y transformado
y = df_modelo[coluna_y].values
y_transf = aplicar_transformacao(y, transformacao_y)
# Prepara X transformado
X_transf = pd.DataFrame(index=df_modelo.index)
for col in colunas_x:
transf = transformacoes_x.get(col, "(x)")
X_transf[col] = aplicar_transformacao(df_modelo[col].values, transf)
# Remove linhas com NaN ou Inf
mask = ~(np.isnan(y_transf) | np.isinf(y_transf))
for col in X_transf.columns:
mask &= ~(np.isnan(X_transf[col]) | np.isinf(X_transf[col]))
y_final = y_transf[mask]
X_final = X_transf.loc[mask.values if hasattr(mask, 'values') else mask]
indices_final = df_modelo.index[mask]
if len(y_final) < len(colunas_x) + 2:
return None
try:
# Ajusta modelo
X_sm = sm.add_constant(X_final)
modelo = sm.OLS(y_final, X_sm).fit()
# Calcula diagnósticos
diagnosticos = calcular_diagnosticos(modelo, X_final, y_final, coluna_y, transformacao_y, transformacoes_x)
# Tabela de coeficientes
tabela_coef = pd.DataFrame({
"Variável": modelo.params.index,
"Coeficiente": modelo.params.values,
"Erro Padrão": modelo.bse.values,
"t-Student": modelo.tvalues.values,
"p-valor": modelo.pvalues.values,
"Significância": [classificar_significancia(p) for p in modelo.pvalues.values]
})
# Tabela obs vs calc
y_obs = y_final
y_calc = modelo.predict()
residuos = modelo.resid
residuos_pad = residuos / diagnosticos["desvio_padrao_residuos"]
influence = OLSInfluence(modelo)
residuos_stud = influence.resid_studentized
cook = influence.cooks_distance[0]
tabela_obs_calc = pd.DataFrame({
"Índice": indices_final,
"Observado": y_obs,
"Calculado": y_calc,
"Resíduo": residuos,
"Resíduo Pad.": residuos_pad,
"Resíduo Stud.": residuos_stud,
"Cook": cook
})
# Dados transformados
X_esc_info = [f"{col}: {formatar_transformacao(transformacoes_x.get(col, '(x)'))}" for col in colunas_x]
y_esc_info = f"{coluna_y}: {formatar_transformacao(transformacao_y, is_y=True)}"
return {
"modelo_sm": modelo,
"diagnosticos": diagnosticos,
"tabela_coef": tabela_coef,
"tabela_obs_calc": tabela_obs_calc,
"X_transformado": X_final,
"y_transformado": pd.Series(y_final, index=indices_final, name=coluna_y),
"transformacoes_x": transformacoes_x.copy(),
"transformacao_y": transformacao_y,
"colunas_x": colunas_x.copy(),
"coluna_y": coluna_y,
"X_esc_info": X_esc_info,
"y_esc_info": y_esc_info,
"indices_usados": indices_final.tolist()
}
except Exception as e:
print(f"Erro ao ajustar modelo: {e}")
return None
def calcular_diagnosticos(modelo, X, y, coluna_y, transformacao_y, transformacoes_x):
"""
Calcula todos os diagnósticos do modelo.
"""
n = len(y)
k = X.shape[1]
residuos = modelo.resid
desvio_padrao_residuos = np.sqrt(np.sum(residuos**2) / (n - k - 1))
residuos_pad = residuos / desvio_padrao_residuos
# Métricas básicas
mse = np.mean(residuos**2)
r2 = modelo.rsquared
r2_ajustado = modelo.rsquared_adj
r_pearson = np.corrcoef(y, modelo.predict())[0, 1]
Fc = modelo.fvalue
p_valor_F = modelo.f_pvalue
# Interpretação F
if p_valor_F < 0.01:
interp_F = "Fc > Ft: Significante ao nível de 1% (confiança de 99%) - GRAU III."
elif p_valor_F < 0.025:
interp_F = "Fc > Ft: Significante ao nível de 2,5% (confiança de 97,5%) - GRAU II."
elif p_valor_F < 0.05:
interp_F = "Fc > Ft: Significante ao nível de 5% (confiança de 95%) - GRAU I."
else:
interp_F = "Fc < Ft: NÃO é estatisticamente significante ao nível de 5%."
# Teste KS (normalidade)
ks_stat, ks_p = kstest(residuos, "norm", args=(np.mean(residuos), np.std(residuos, ddof=1)))
interp_KS = "☑️Resíduos são normalmente distribuídos." if ks_p >= 0.05 else "❌Resíduos NÃO são normalmente distribuídos."
# Percentuais da curva normal (formato compatível com .dai)
intervalos = [(-1.00, 1.00), (-1.64, 1.64), (-1.96, 1.96)]
percentuais = []
for min_int, max_int in intervalos:
count = np.sum((residuos_pad >= min_int) & (residuos_pad <= max_int))
perc = round(count / len(residuos) * 100, 0)
percentuais.append(f"{perc:.0f}%")
perc_resid = ", ".join(percentuais)
# Durbin-Watson
dw = durbin_watson(residuos)
k_dw = min(max(1, k), 4)
tab = DW_TABLE[0.05][k_dw]
dL = np.interp(n, tab["n"], tab["dL"])
dU = np.interp(n, tab["n"], tab["dU"])
if dw < dL:
interp_DW = "❌Autocorrelação positiva."
elif dw > 4 - dL:
interp_DW = "❌Autocorrelação negativa."
elif dU < dw < 4 - dU:
interp_DW = "☑️Sem autocorrelação."
else:
interp_DW = "⚠️Região inconclusiva."
# Breusch-Pagan
bp_lm, bp_p, bp_f, bp_fp = het_breuschpagan(residuos, sm.add_constant(X))
interp_BP = "☑️Não há evidência de heterocedasticidade." if bp_p >= 0.05 else "❌Heterocedasticidade detectada."
# Equação do modelo
equacao = formatar_equacao(modelo, coluna_y, transformacao_y, transformacoes_x, list(X.columns))
# Gera formatted_output para compatibilidade com .dai
formatted_output = f"""Número de observações: {n}
Número de variáveis independentes: {k}
Desvio padrão dos resíduos: {desvio_padrao_residuos:.4f}
MSE: {mse:.4f}
R²: {r2:.4f}
R² ajustado: {r2_ajustado:.4f}
Correlação Pearson: {r_pearson:.4f}
Estatística F: {Fc:.4f} (p-valor: {p_valor_F:.4f}) -> {interp_F}
----------------------------------------------------------------------
Teste de Normalidade (Kolmogorov-Smirnov):
Estatística KS: {ks_stat:.4f}, p-valor: {ks_p:.4f} -> {interp_KS}
Teste de Normalidade (Comparação com a curva normal) – percentuais atingidos: {perc_resid}
• Ideal 68% → aceitável entre 64% e 75%
• Ideal 90% → aceitável entre 88% e 95%
• Ideal 95% → aceitável entre 95% e 100%
----------------------------------------------------------------------
Teste de Autocorrelação (Durbin-Watson):
DW: {dw:.4f} -> {interp_DW}
----------------------------------------------------------------------
Teste de Homocedasticidade (Breusch-Pagan):
Estatística LM: {bp_lm:.4f}, p-valor: {bp_p:.4f}
{interp_BP}
----------------------------------------------------------------------
Equação do Modelo:
{equacao}"""
return {
"n": n,
"k": k,
"desvio_padrao_residuos": desvio_padrao_residuos,
"mse": mse,
"r2": r2,
"r2_ajustado": r2_ajustado,
"r_pearson": r_pearson,
"Fc": Fc,
"p_valor_F": p_valor_F,
"interp_F": interp_F,
"ks_stat": ks_stat,
"ks_p": ks_p,
"interp_KS": interp_KS,
"perc_resid": perc_resid,
"dw": dw,
"interp_DW": interp_DW,
"bp_lm": bp_lm,
"bp_p": bp_p,
"interp_BP": interp_BP,
"equacao": equacao,
"formatted_output": formatted_output
}
def classificar_significancia(p_valor):
"""Classifica significância segundo NBR 14.653-2."""
if p_valor <= 0.10:
return "Grau III"
elif p_valor <= 0.20:
return "Grau II"
elif p_valor <= 0.30:
return "Grau I"
else:
return "Sem enquadramento"
def formatar_equacao(modelo, coluna_y, transformacao_y, transformacoes_x, colunas_x):
"""Formata a equação do modelo."""
equacao = f"{modelo.params['const']:.6f}"
for col in colunas_x:
coef = modelo.params[col]
transf = transformacoes_x.get(col, "(x)")
if transf == "ln(x)":
termo = f"ln({col})"
elif transf == "1/(x)":
termo = f"1/{col}"
elif transf == "(x)^2":
termo = f"({col})²"
elif transf == "raiz(x)":
termo = f"√({col})"
elif transf == "1/raiz(x)":
termo = f"1/√({col})"
elif transf == "exp(x)":
termo = f"exp({col})"
else:
termo = col
sinal = "+" if coef >= 0 else "-"
equacao += f" {sinal} {abs(coef):.6f} × {termo}"
# Aplica inversão de y
if transformacao_y == "ln(x)":
equacao = f"exp({equacao})"
elif transformacao_y == "(x)^2":
equacao = f"√({equacao})"
elif transformacao_y == "1/(x)":
equacao = f"1/({equacao})"
elif transformacao_y == "raiz(x)":
equacao = f"({equacao})²"
elif transformacao_y == "1/raiz(x)":
equacao = f"1/({equacao})²"
return f"{coluna_y} = {equacao}"
# ============================================================
# BUSCA AUTOMÁTICA DE TRANSFORMAÇÕES (OTIMIZADA)
# ============================================================
def _calcular_r2_numpy(X, y):
"""
Calcula R² usando operações matriciais diretas (mais rápido que statsmodels).
"""
n = len(y)
if n == 0:
return -np.inf
# Adiciona constante
X_const = np.column_stack([np.ones(n), X])
try:
# OLS: beta = (X'X)^-1 X'y
XtX = X_const.T @ X_const
Xty = X_const.T @ y
beta = np.linalg.solve(XtX, Xty)
# R²
y_pred = X_const @ beta
ss_res = np.sum((y - y_pred) ** 2)
ss_tot = np.sum((y - np.mean(y)) ** 2)
if ss_tot == 0:
return -np.inf
return 1 - (ss_res / ss_tot)
except:
return -np.inf
def _precomputar_transformacoes(df, colunas):
"""
Pré-computa todas as transformações válidas para as colunas.
Retorna dict: {(coluna, transformacao): array}
"""
cache = {}
for col in colunas:
valores = df[col].values.astype(float)
for transf in TRANSFORMACOES:
dados_transf = aplicar_transformacao(valores, transf)
# Só armazena se válido
if not (np.isnan(dados_transf).any() or np.isinf(dados_transf).any()):
cache[(col, transf)] = dados_transf
return cache
def _buscar_stepwise(colunas_x, colunas_livres, transformacoes_fixas, y_transf, x_cache, df_busca):
"""
Busca stepwise: otimiza uma variável por vez.
Mais rápido para muitas variáveis (O(7*n) em vez de O(7^n)).
"""
# Inicializa com transformação (x) para todas
transf_x_atuais = {col: "(x)" for col in colunas_livres}
# Para cada variável X, encontrar melhor transformação
for col in colunas_livres:
melhor_transf = "(x)"
melhor_r2 = -np.inf
for transf in TRANSFORMACOES:
if (col, transf) not in x_cache:
continue
# Testa com esta transformação
transf_teste = transf_x_atuais.copy()
transf_teste[col] = transf
# Monta matriz X
X_arrays = []
valido = True
for c in colunas_x:
if c in transformacoes_fixas:
t = transformacoes_fixas[c]
else:
t = transf_teste.get(c, "(x)")
if (c, t) in x_cache:
X_arrays.append(x_cache[(c, t)])
else:
valido = False
break
if not valido or not X_arrays:
continue
X = np.column_stack(X_arrays)
r2 = _calcular_r2_numpy(X, y_transf)
if r2 > melhor_r2:
melhor_r2 = r2
melhor_transf = transf
transf_x_atuais[col] = melhor_transf
# Calcula R² final
transf_x_final = {**transformacoes_fixas, **transf_x_atuais}
X_arrays = []
for c in colunas_x:
t = transf_x_final.get(c, "(x)")
if (c, t) in x_cache:
X_arrays.append(x_cache[(c, t)])
if X_arrays:
X = np.column_stack(X_arrays)
r2_final = _calcular_r2_numpy(X, y_transf)
else:
r2_final = -np.inf
return transf_x_final, r2_final
def _buscar_exaustivo_numpy(colunas_x, colunas_livres, transformacoes_fixas, y_transf, x_cache, top_n):
"""
Busca exaustiva usando numpy para cálculo rápido de R².
Para uso quando o número de combinações é pequeno (<= 5000).
"""
n_livres = len(colunas_livres)
resultados_transf = []
for combo in product(TRANSFORMACOES, repeat=n_livres):
# Monta dict de transformações
transf_x = transformacoes_fixas.copy()
for i, col in enumerate(colunas_livres):
transf_x[col] = combo[i]
# Monta matriz X
X_arrays = []
valido = True
for col in colunas_x:
t = transf_x.get(col, "(x)")
if (col, t) in x_cache:
X_arrays.append(x_cache[(col, t)])
else:
valido = False
break
if not valido or not X_arrays:
continue
X = np.column_stack(X_arrays)
r2 = _calcular_r2_numpy(X, y_transf)
if r2 > -np.inf:
resultados_transf.append((r2, transf_x.copy()))
# Ordena e retorna top
resultados_transf.sort(key=lambda x: x[0], reverse=True)
return resultados_transf[:top_n]
def buscar_melhores_transformacoes(df, coluna_y, colunas_x, transformacoes_fixas=None, transformacao_y_fixa=None,
indices_usar=None, top_n=5):
"""
Busca otimizada das melhores combinações de transformações para maximizar R².
Usa estratégia híbrida:
- Para poucos combos (<=5000): busca exaustiva com numpy (rápida)
- Para muitos combos (>5000): busca stepwise (O(7*n) em vez de O(7^n))
Parâmetros:
df: DataFrame
coluna_y: variável dependente
colunas_x: lista de variáveis independentes
transformacoes_fixas: dict {coluna: transformação} para variáveis com transformação fixa
transformacao_y_fixa: transformação fixa para y (ou None para testar todas)
indices_usar: lista de índices a incluir (se None, usa todos)
top_n: número de melhores modelos a retornar
Retorna:
Lista de dicts com informações dos top N modelos
"""
if transformacoes_fixas is None:
transformacoes_fixas = {}
# Filtra por índices se especificado
df_busca = df.copy()
if indices_usar is not None:
df_busca = df_busca.loc[indices_usar]
# Detecta dicotômicas e fixa em (x)
dicotomicas = detectar_dicotomicas(df_busca, colunas_x)
for col in dicotomicas:
if col not in transformacoes_fixas:
transformacoes_fixas[col] = "(x)"
# Identifica colunas livres (não fixas)
colunas_livres = [col for col in colunas_x if col not in transformacoes_fixas]
n_livres = len(colunas_livres)
# Define transformações de Y a testar
if transformacao_y_fixa:
transformacoes_y = [transformacao_y_fixa]
else:
transformacoes_y = TRANSFORMACOES
# Pré-computa todas as transformações de X (apenas colunas livres)
x_cache = _precomputar_transformacoes(df_busca, colunas_livres)
# Adiciona transformações fixas ao cache
for col, transf in transformacoes_fixas.items():
if col in colunas_x:
dados = aplicar_transformacao(df_busca[col].values.astype(float), transf)
if not (np.isnan(dados).any() or np.isinf(dados).any()):
x_cache[(col, transf)] = dados
# Calcula número total de combinações
total_combos = len(TRANSFORMACOES) ** n_livres if n_livres > 0 else 1
usar_stepwise = total_combos > 5000
resultados = []
for transf_y in transformacoes_y:
# Pré-computa Y transformado
y_transf = aplicar_transformacao(df_busca[coluna_y].values.astype(float), transf_y)
# Se y_transf tem problemas, pula
if np.isnan(y_transf).any() or np.isinf(y_transf).any():
continue
if n_livres == 0:
# Todas as variáveis são fixas, só calcula R²
X_arrays = [x_cache[(col, transformacoes_fixas[col])]
for col in colunas_x if (col, transformacoes_fixas.get(col, "(x)")) in x_cache]
if X_arrays:
X = np.column_stack(X_arrays)
r2 = _calcular_r2_numpy(X, y_transf)
if r2 > -np.inf:
resultados.append({
"r2": r2,
"transformacao_y": transf_y,
"transformacoes_x": transformacoes_fixas.copy()
})
elif usar_stepwise:
# Busca stepwise para muitas variáveis
transf_x, r2 = _buscar_stepwise(
colunas_x, colunas_livres, transformacoes_fixas,
y_transf, x_cache, df_busca
)
if r2 > -np.inf:
resultados.append({
"r2": r2,
"transformacao_y": transf_y,
"transformacoes_x": transf_x
})
else:
# Busca exaustiva para poucas variáveis
top_combos = _buscar_exaustivo_numpy(
colunas_x, colunas_livres, transformacoes_fixas,
y_transf, x_cache, top_n
)
for r2, transf_x in top_combos:
resultados.append({
"r2": r2,
"transformacao_y": transf_y,
"transformacoes_x": transf_x
})
# Ordena por R² e retorna top N
resultados = sorted(resultados, key=lambda x: x["r2"], reverse=True)[:top_n]
# Adiciona ranking
for i, r in enumerate(resultados):
r["rank"] = i + 1
return resultados
# ============================================================
# MICRONUMEROSIDADE (NBR 14.653-2)
# ============================================================
def testar_micronumerosidade(df, colunas_x):
"""
Testa critérios de micronumerosidade da NBR 14.653-2.
"""
# Filtra apenas colunas inteiras (dicotômicas/categóricas)
colunas_int = [col for col in colunas_x if df[col].dtype in ['int64', 'int32']]
n = len(df)
k = len(colunas_x)
# Condição geral: n >= 3*(k+1)
condicao_geral = n >= 3 * (k + 1)
resultados_por_coluna = {}
for coluna in colunas_int:
frequencia = df[coluna].value_counts()
ni_valido = True
mensagens = []
for categoria, ni in frequencia.items():
if n <= 30:
if ni < 3:
ni_valido = False
mensagens.append(f"ni={ni} < 3 na categoria {categoria}")
elif 30 < n <= 100:
if ni < 0.1 * n:
ni_valido = False
mensagens.append(f"ni={ni} < 10% de n na categoria {categoria}")
elif n > 100:
if ni <= 10:
ni_valido = False
mensagens.append(f"ni={ni} <= 10 na categoria {categoria}")
resultados_por_coluna[coluna] = {
"valido": ni_valido,
"mensagens": mensagens if mensagens else ["OK"]
}
return {
"n": n,
"k": k,
"condicao_geral_ok": condicao_geral,
"por_coluna": resultados_por_coluna
}
# ============================================================
# EXPORTAR BASE TRATADA (CSV)
# ============================================================
def exportar_base_csv(df):
"""
Exporta DataFrame para arquivo CSV temporário.
Parâmetros:
df: DataFrame a exportar
Retorna:
str: Caminho do arquivo CSV gerado
"""
import tempfile
import os
if df is None or df.empty:
return None
# Cria arquivo temporário
fd, caminho = tempfile.mkstemp(suffix=".csv", prefix="base_tratada_")
os.close(fd)
# Salva CSV
df.to_csv(caminho, index=True, encoding="utf-8-sig", sep=";", decimal=",")
return caminho
# ============================================================
# EXPORTAR MODELO (.dai)
# ============================================================
def exportar_modelo_dai(resultado_modelo, df_original, estatisticas, nome_arquivo, graficos=None):
"""
Exporta o modelo em formato .dai compatível com app MODELOS_DAI.
Parâmetros:
resultado_modelo: dict com resultado do ajuste
df_original: DataFrame original
estatisticas: DataFrame com estatísticas (mantido como DataFrame)
nome_arquivo: nome do arquivo de saída
graficos: dict opcional com gráficos Plotly (chaves: obs_calc, residuos, etc.)
Retorna:
tuple: (caminho_arquivo, mensagem)
"""
import tempfile
if resultado_modelo is None:
return None, "Nenhum modelo para exportar"
try:
# Função auxiliar para converter StringDtype para object e limpar DataFrame
def limpar_df_para_export(df):
"""Converte StringDtype para object e remove colunas Unnamed."""
df_clean = df.copy()
# Remove colunas 'Unnamed: X'
cols_unnamed = [c for c in df_clean.columns if str(c).startswith('Unnamed')]
if cols_unnamed:
df_clean = df_clean.drop(columns=cols_unnamed)
# Converte StringDtype para object (compatibilidade entre versões do pandas)
for col in df_clean.columns:
if pd.api.types.is_string_dtype(df_clean[col]):
df_clean[col] = df_clean[col].astype(object)
# Converte index se for string dtype (compatibilidade com .dai)
if pd.api.types.is_string_dtype(df_clean.index):
df_clean.index = df_clean.index.astype(object)
return df_clean
# Monta pacote
lat_col, lon_col = identificar_colunas_coords(df_original)
# DataFrame com coords para o mapa
df_coords = limpar_df_para_export(df_original)
if lat_col:
df_coords = df_coords.rename(columns={lat_col: "lat"})
if lon_col:
df_coords = df_coords.rename(columns={lon_col: "lon"})
# Filtra apenas índices usados no modelo
indices_modelo = resultado_modelo["indices_usados"]
df_modelo = df_coords.loc[indices_modelo] if indices_modelo else df_coords
df_modelo.index.name = None # Remove nome do índice
# Gera gráfico HTML para graf_model (se graficos fornecidos)
graf_model_html = ""
if graficos and graficos.get("obs_calc"):
try:
graf_model_html = pio.to_html(graficos["obs_calc"], full_html=True, include_plotlyjs='cdn')
except:
graf_model_html = ""
# Prepara tabela_coef no formato compatível (índice = Variável, sem coluna "Variável")
tabela_coef_export = limpar_df_para_export(resultado_modelo["tabela_coef"])
if "Variável" in tabela_coef_export.columns:
tabela_coef_export = tabela_coef_export.set_index("Variável")
tabela_coef_export.index.name = None # Remove nome do índice
# Converte index para object após set_index (compatibilidade .dai)
if pd.api.types.is_string_dtype(tabela_coef_export.index):
tabela_coef_export.index = tabela_coef_export.index.astype(object)
# Renomeia p-valor para p-value
tabela_coef_export = tabela_coef_export.rename(columns={"p-valor": "p-value"})
# Prepara tabela_obs_calc no formato compatível
tabela_obs_calc_export = limpar_df_para_export(resultado_modelo["tabela_obs_calc"])
# Move Índice para o index do DataFrame
if "Índice" in tabela_obs_calc_export.columns:
tabela_obs_calc_export = tabela_obs_calc_export.set_index("Índice")
tabela_obs_calc_export.index.name = None # Remove nome do índice
# Renomeia colunas para formato compatível
tabela_obs_calc_export = tabela_obs_calc_export.rename(columns={
"Resíduo Pad.": "Resíduo Padronizado",
"Resíduo Stud.": "Resíduo Studentizado",
"Cook": "Distância de Cook"
})
# Prepara modelos_resumos (diagnosticos) no formato compatível
diag = resultado_modelo["diagnosticos"].copy()
modelos_resumos = {
"n": diag.get("n"),
"k": diag.get("k"),
"desvio_padrao_residuos": diag.get("desvio_padrao_residuos"),
"mse": diag.get("mse"),
"r2": diag.get("r2"),
"r2_ajustado": diag.get("r2_ajustado"),
"r_pearson": diag.get("r_pearson"),
"Fc": diag.get("Fc"),
"p_valor_F": diag.get("p_valor_F"),
"Interpretacao_F": diag.get("interp_F"),
"ks_stat": diag.get("ks_stat"),
"ks_p": diag.get("ks_p"),
"Interpretacao_KS": diag.get("interp_KS"),
"perc_resid": diag.get("perc_resid"),
"dw": diag.get("dw"),
"Interpretacao_DW": diag.get("interp_DW"),
"bp_lm": diag.get("bp_lm"),
"bp_p": diag.get("bp_p"),
"Interpretacao_BP": diag.get("interp_BP"),
"equacao": diag.get("equacao"),
"formatted_output": diag.get("formatted_output", "")
}
# Prepara estatisticas no formato compatível (índice = Variável)
estatisticas_df = estatisticas if isinstance(estatisticas, pd.DataFrame) else pd.DataFrame(estatisticas)
estatisticas_export = limpar_df_para_export(estatisticas_df)
if "Variável" in estatisticas_export.columns:
estatisticas_export = estatisticas_export.set_index("Variável")
estatisticas_export.index.name = None # Remove nome do índice
# Converte index para object após set_index (compatibilidade .dai)
if pd.api.types.is_string_dtype(estatisticas_export.index):
estatisticas_export.index = estatisticas_export.index.astype(object)
# Adiciona coluna Tipo (dtype) se não existir
if "Tipo" not in estatisticas_export.columns:
# Obtém tipos das colunas do DataFrame original
tipos = {}
for var in estatisticas_export.index:
if var in df_original.columns:
tipos[var] = str(df_original[var].dtype)
else:
tipos[var] = "unknown"
estatisticas_export.insert(0, "Tipo", pd.Series(tipos))
# Mantém apenas colunas compatíveis (na ordem correta)
colunas_manter = ["Tipo", "Contagem", "Média", "Mediana", "Mínimo", "Máximo", "Desvio Padrão"]
colunas_existentes = [c for c in colunas_manter if c in estatisticas_export.columns]
estatisticas_export = estatisticas_export[colunas_existentes]
# Converte dtypes para compatibilidade com formato .dai
if "Contagem" in estatisticas_export.columns:
estatisticas_export["Contagem"] = estatisticas_export["Contagem"].astype(str).astype(object)
if "Tipo" in estatisticas_export.columns:
estatisticas_export["Tipo"] = estatisticas_export["Tipo"].astype(object)
# Limpa top_X_esc e top_y_esc
top_X_esc_export = limpar_df_para_export(resultado_modelo["X_transformado"])
top_X_esc_export.index.name = None # Remove nome do índice
top_y_esc_export = resultado_modelo["y_transformado"].copy()
top_y_esc_export.index.name = None # Remove nome do índice
pacote = {
"Xy_preview_out_coords": df_modelo,
"estatisticas": estatisticas_export,
"formatted_top_transformation_info": [resultado_modelo["y_esc_info"]] + resultado_modelo["X_esc_info"],
"top_X_esc": top_X_esc_export,
"top_y_esc": top_y_esc_export,
"modelos_resumos": modelos_resumos,
"tabelas_coef": tabela_coef_export,
"tabelas_obs_calc": tabela_obs_calc_export,
"graf_model": graf_model_html,
"modelos_sm": resultado_modelo["modelo_sm"]
}
# Função para converter recursivamente StringDtype para object (compatibilidade HF)
def converter_para_serializacao(obj):
"""Converte recursivamente todos os StringDtype para object dtype."""
if isinstance(obj, pd.DataFrame):
df = obj.copy()
if pd.api.types.is_string_dtype(df.index):
df.index = df.index.astype(object)
for col in df.columns:
if pd.api.types.is_string_dtype(df[col]):
df[col] = df[col].astype(object)
return df
elif isinstance(obj, pd.Series):
s = obj.copy()
if pd.api.types.is_string_dtype(s):
s = s.astype(object)
if pd.api.types.is_string_dtype(s.index):
s.index = s.index.astype(object)
return s
elif isinstance(obj, dict):
return {k: converter_para_serializacao(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [converter_para_serializacao(v) for v in obj]
else:
return obj
# Converte todo o pacote para garantir compatibilidade entre versões do pandas
pacote = converter_para_serializacao(pacote)
# Cria arquivo temporário para download (compatível com Hugging Face)
nome_base = nome_arquivo.replace(".dai", "") if nome_arquivo.endswith(".dai") else nome_arquivo
fd, caminho = tempfile.mkstemp(suffix=".dai", prefix=f"{nome_base}_")
os.close(fd)
dump(pacote, caminho)
return caminho, f"Modelo pronto para download"
except Exception as e:
return None, f"Erro ao exportar: {str(e)}"