bfiguei's picture
Update app.py
92812b5 verified
"""
Dashboard interativo - Prova Final SIEP
Análise de Risco de Crédito - CrediFast
App Streamlit para:
- Upload ou uso do dataset de risco de crédito;
- Treinamento de modelos de classificação (KNN, SVM, Random Forest, LightGBM);
- Visualização de métricas e curva ROC;
- Explicabilidade com SHAP (modelo LightGBM);
- Clusterização com KMeans e detecção de outliers com DBSCAN.
"""
import streamlit as st
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
roc_auc_score,
roc_curve,
confusion_matrix,
precision_score,
recall_score,
f1_score,
accuracy_score,
)
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.cluster import KMeans, DBSCAN
from sklearn.decomposition import PCA
from imblearn.over_sampling import SMOTE
import lightgbm as lgb
import shap
import matplotlib.pyplot as plt
import seaborn as sns
# --------------------------------------------------------------------
# Funções auxiliares
# --------------------------------------------------------------------
@st.cache_data
def load_default_data() -> pd.DataFrame:
"""Carrega o dataset padrão do arquivo local."""
df = pd.read_csv("credit_risk_dataset.csv")
return df
def preprocess_data(df: pd.DataFrame, target_col: str = "loan_status"):
"""
Pré-processa o dataset:
- get_dummies para variáveis categóricas;
- separa X e y;
- preenche NaN com mediana;
- train/test split;
- padroniza;
- aplica SMOTE no treino.
"""
df_enc = pd.get_dummies(df, drop_first=True)
y = df_enc[target_col]
X = df_enc.drop(columns=[target_col])
X = X.fillna(X.median())
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42, stratify=y
)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
smote = SMOTE(random_state=42)
X_train_bal, y_train_bal = smote.fit_resample(X_train_scaled, y_train)
return X, X_train_bal, X_test_scaled, y_train_bal, y_test
def treinar_modelo(nome_modelo, X_train, y_train, params: dict):
"""Cria e treina um modelo de acordo com o nome e hiperparâmetros escolhidos."""
if nome_modelo == "KNN":
modelo = KNeighborsClassifier(
n_neighbors=params.get("n_neighbors", 5),
weights=params.get("weights", "uniform"),
)
elif nome_modelo == "SVM (RBF)":
modelo = SVC(
kernel="rbf",
C=params.get("C", 1.0),
gamma=params.get("gamma", "scale"),
probability=True,
random_state=42,
)
elif nome_modelo == "Random Forest":
modelo = RandomForestClassifier(
n_estimators=params.get("n_estimators", 200),
max_depth=params.get("max_depth", None),
random_state=42,
n_jobs=-1,
)
elif nome_modelo == "LightGBM":
modelo = lgb.LGBMClassifier(
n_estimators=params.get("n_estimators", 300),
learning_rate=params.get("learning_rate", 0.05),
max_depth=params.get("max_depth", -1),
random_state=42,
)
else:
raise ValueError("Modelo não suportado.")
modelo.fit(X_train, y_train)
return modelo
def calcular_metricas(modelo, X_test, y_test):
"""Calcula AUC, acurácia, precisão, recall, F1 e matriz de confusão."""
y_prob = modelo.predict_proba(X_test)[:, 1]
y_pred = modelo.predict(X_test)
auc = roc_auc_score(y_test, y_prob)
acc = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, pos_label=1)
recall = recall_score(y_test, y_pred, pos_label=1)
f1 = f1_score(y_test, y_pred, pos_label=1)
cm = confusion_matrix(y_test, y_pred)
return {
"AUC": auc,
"Acurácia": acc,
"Precisão (bad)": precision,
"Recall (bad)": recall,
"F1 (bad)": f1,
"Matriz de Confusão": cm,
"y_prob": y_prob,
"y_pred": y_pred,
}
# --------------------------------------------------------------------
# Layout do app
# --------------------------------------------------------------------
def main():
st.set_page_config(
page_title="Prova Final SIEP - Risco de Crédito CrediFast",
layout="wide",
)
st.title("📊 Prova Final SIEP – Análise de Risco de Crédito (CrediFast)")
st.markdown(
"""
Este dashboard interativo resume as principais etapas da Prova Final:
- Diagnóstico e modelagem de risco de crédito;
- Comparação de modelos de classificação;
- Explicabilidade com **SHAP**;
- Clusterização e detecção de outliers.
Use o menu lateral para configurar o modelo e, se quiser, enviar um CSV próprio.
"""
)
# ------------------------------------------------------------
# Sidebar - upload e escolha do modelo
# ------------------------------------------------------------
st.sidebar.header("Configurações")
uploaded_file = st.sidebar.file_uploader(
"Upload de um CSV (opcional)", type=["csv"]
)
if uploaded_file is not None:
df = pd.read_csv(uploaded_file)
st.sidebar.success("Dataset carregado do upload.")
else:
df = load_default_data()
st.sidebar.info("Usando dataset padrão: credit_risk_dataset.csv")
st.sidebar.markdown("---")
modelo_escolhido = st.sidebar.selectbox(
"Escolha o modelo de classificação:",
["KNN", "SVM (RBF)", "Random Forest", "LightGBM"],
)
# Hiperparâmetros básicos
params = {}
if modelo_escolhido == "KNN":
params["n_neighbors"] = st.sidebar.slider(
"Número de vizinhos (k)", 3, 25, 7, 2
)
params["weights"] = st.sidebar.selectbox(
"Peso dos vizinhos", ["uniform", "distance"]
)
elif modelo_escolhido == "SVM (RBF)":
params["C"] = st.sidebar.slider("C (regularização)", 0.1, 10.0, 1.0, 0.1)
params["gamma"] = st.sidebar.selectbox(
"Gamma", ["scale", "auto"]
)
elif modelo_escolhido in ["Random Forest", "LightGBM"]:
params["n_estimators"] = st.sidebar.slider(
"Número de árvores", 100, 500, 300, 50
)
max_depth = st.sidebar.slider(
"Profundidade máxima (0 = sem limite)", 0, 20, 0, 1
)
params["max_depth"] = None if max_depth == 0 else max_depth
if modelo_escolhido == "LightGBM":
params["learning_rate"] = st.sidebar.slider(
"Learning rate", 0.01, 0.3, 0.05, 0.01
)
# ------------------------------------------------------------
# Aba de visualização geral
# ------------------------------------------------------------
st.subheader("1️⃣ Visão geral do dataset")
col1, col2 = st.columns([2, 1])
with col1:
st.write("Dimensão do dataset:", df.shape)
st.dataframe(df.head())
with col2:
st.write("Distribuição da variável-alvo (loan_status)")
if "loan_status" in df.columns:
counts = df["loan_status"].value_counts().sort_index()
perc = df["loan_status"].value_counts(normalize=True).sort_index() * 100
dist_df = pd.DataFrame(
{
"classe": ["good (0)", "bad (1)"],
"quantidade": counts.values,
"percentual (%)": perc.values.round(2),
}
)
st.dataframe(dist_df)
fig, ax = plt.subplots(figsize=(4, 3))
sns.barplot(x=["good (0)", "bad (1)"], y=counts.values, ax=ax)
ax.set_ylabel("Quantidade")
ax.set_title("Distribuição loan_status")
st.pyplot(fig)
else:
st.error("Coluna 'loan_status' não encontrada no dataset.")
# ------------------------------------------------------------
# Pré-processamento e treino
# ------------------------------------------------------------
if "loan_status" not in df.columns:
st.stop()
X_original, X_train_bal, X_test_scaled, y_train_bal, y_test = preprocess_data(df)
st.markdown("---")
st.subheader("2️⃣ Treinamento e desempenho do modelo selecionado")
with st.spinner("Treinando modelo..."):
modelo = treinar_modelo(modelo_escolhido, X_train_bal, y_train_bal, params)
resultados = calcular_metricas(modelo, X_test_scaled, y_test)
# Métricas
mcol1, mcol2, mcol3, mcol4 = st.columns(4)
mcol1.metric("AUC", f"{resultados['AUC']:.3f}")
mcol2.metric("Acurácia", f"{resultados['Acurácia']:.3f}")
mcol3.metric("Precisão (bad)", f"{resultados['Precisão (bad)']:.3f}")
mcol4.metric("Recall (bad)", f"{resultados['Recall (bad)']:.3f}")
st.write("**F1-score (classe bad):**", f"{resultados['F1 (bad)']:.3f}")
# Matriz de confusão
cm = resultados["Matriz de Confusão"]
fig_cm, ax_cm = plt.subplots(figsize=(4, 3))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False, ax=ax_cm)
ax_cm.set_xlabel("Predito")
ax_cm.set_ylabel("Real")
st.pyplot(fig_cm)
# Curva ROC
st.write("### Curva ROC")
fpr, tpr, _ = roc_curve(y_test, resultados["y_prob"])
fig_roc, ax_roc = plt.subplots(figsize=(5, 4))
ax_roc.plot(fpr, tpr, label=f"{modelo_escolhido} (AUC={resultados['AUC']:.3f})")
ax_roc.plot([0, 1], [0, 1], "k--")
ax_roc.set_xlabel("Falso Positivo (FPR)")
ax_roc.set_ylabel("Verdadeiro Positivo (TPR)")
ax_roc.legend()
ax_roc.grid(True)
st.pyplot(fig_roc)
st.markdown(
"""
**Interpretação rápida:**
- AUC alta indica boa separação entre `good` e `bad`;
- Recall alto para a classe `bad` significa que o modelo está capturando boa parte dos inadimplentes;
- Precisão alta na classe `bad` indica que, quando o modelo diz que o cliente é de alto risco, ele normalmente está certo.
"""
)
# ------------------------------------------------------------
# SHAP - só para LightGBM (modelo vencedor da prova)
# ------------------------------------------------------------
st.markdown("---")
st.subheader("3️⃣ Explicabilidade com SHAP (modelo LightGBM)")
st.info(
"Os gráficos abaixo sempre utilizam um modelo LightGBM treinado no mesmo "
"conjunto pré-processado, pois ele foi o modelo de melhor desempenho na análise."
)
# Treinar LightGBM padrão para SHAP (independente do modelo escolhido)
modelo_shap = treinar_modelo(
"LightGBM",
X_train_bal,
y_train_bal,
{"n_estimators": 300, "learning_rate": 0.05, "max_depth": -1},
)
# Amostra para acelerar
amostra_idx = np.random.choice(len(X_train_bal), size=min(2000, len(X_train_bal)), replace=False)
X_amostra = X_train_bal[amostra_idx, :]
X_amostra_df = pd.DataFrame(X_amostra, columns=X_original.columns)
explainer = shap.TreeExplainer(modelo_shap)
shap_values = explainer.shap_values(X_amostra_df)
if isinstance(shap_values, list):
shap_vals_plot = shap_values[1]
else:
shap_vals_plot = shap_values
st.write("### SHAP Summary Plot – Importância global das variáveis")
fig_shap_sum, _ = plt.subplots(figsize=(7, 8))
shap.summary_plot(shap_vals_plot, X_amostra_df, plot_type="dot", show=False)
st.pyplot(fig_shap_sum)
st.write(
"No gráfico acima, cada ponto é um cliente. A cor indica se o valor da variável é alto (vermelho) ou baixo (azul) "
"e a posição mostra se isso aumenta ou reduz o risco estimado de inadimplência."
)
# Explicação local (pick one good, one bad)
st.write("### Explicações locais (um cliente good e um cliente bad)")
X_test_df = pd.DataFrame(X_test_scaled, columns=X_original.columns)
y_test_reset = y_test.reset_index(drop=True)
idx_good = y_test_reset[y_test_reset == 0].index[0]
idx_bad = y_test_reset[y_test_reset == 1].index[0]
instancia_good = X_test_df.iloc[idx_good : idx_good + 1]
instancia_bad = X_test_df.iloc[idx_bad : idx_bad + 1]
shap_vals_good = explainer.shap_values(instancia_good)
shap_vals_bad = explainer.shap_values(instancia_bad)
if isinstance(shap_vals_good, list):
sv_good = shap_vals_good[1][0]
sv_bad = shap_vals_bad[1][0]
expected = explainer.expected_value[1]
else:
sv_good = shap_vals_good[0]
sv_bad = shap_vals_bad[0]
expected = explainer.expected_value
st.write("#### Cliente exemplo – classificado como good (0)")
fig_wf_good, _ = plt.subplots(figsize=(6, 4))
shap.plots._waterfall.waterfall_legacy(
expected, sv_good, feature_names=X_original.columns, max_display=10, show=False
)
st.pyplot(fig_wf_good)
st.write("#### Cliente exemplo – classificado como bad (1)")
fig_wf_bad, _ = plt.subplots(figsize=(6, 4))
shap.plots._waterfall.waterfall_legacy(
expected, sv_bad, feature_names=X_original.columns, max_display=10, show=False
)
st.pyplot(fig_wf_bad)
# ------------------------------------------------------------
# Clusterização e outliers
# ------------------------------------------------------------
st.markdown("---")
st.subheader("4️⃣ Clusterização (KMeans) e Outliers (DBSCAN)")
# KMeans
n_clusters = st.slider("Número de clusters (KMeans)", 2, 6, 4, 1)
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
clusters_kmeans = kmeans.fit_predict(X_train_bal)
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_train_bal)
df_clusters = pd.DataFrame(
{
"PC1": X_pca[:, 0],
"PC2": X_pca[:, 1],
"cluster": clusters_kmeans,
"loan_status": y_train_bal,
}
)
fig_k, ax_k = plt.subplots(figsize=(6, 4))
sns.scatterplot(
data=df_clusters,
x="PC1",
y="PC2",
hue="cluster",
style="loan_status",
palette="tab10",
ax=ax_k,
)
ax_k.set_title("Clusters de clientes (KMeans) no espaço PCA")
st.pyplot(fig_k)
taxa_inad_cluster = (
df_clusters.groupby("cluster")["loan_status"].mean().rename("Taxa bad")
)
st.write("**Taxa de inadimplência por cluster (treino balanceado):**")
st.dataframe((taxa_inad_cluster * 100).round(2).to_frame().style.format("{:.2f}%"))
# DBSCAN
st.write("### Outliers com DBSCAN")
eps_val = st.slider("eps (DBSCAN)", 0.3, 2.0, 0.8, 0.1)
min_samples_val = st.slider("min_samples (DBSCAN)", 5, 50, 15, 1)
dbscan = DBSCAN(eps=eps_val, min_samples=min_samples_val)
labels_db = dbscan.fit_predict(X_train_bal)
df_db = pd.DataFrame(
{
"PC1": X_pca[:, 0],
"PC2": X_pca[:, 1],
"cluster_dbscan": labels_db,
"loan_status": y_train_bal,
}
)
fig_db, ax_db = plt.subplots(figsize=(6, 4))
sns.scatterplot(
data=df_db,
x="PC1",
y="PC2",
hue="cluster_dbscan",
palette="Set1",
ax=ax_db,
)
ax_db.set_title("DBSCAN – outliers no espaço PCA (label = -1)")
st.pyplot(fig_db)
outliers_mask = labels_db == -1
taxa_bad_out = df_db[outliers_mask]["loan_status"].mean()
taxa_bad_norm = df_db[~outliers_mask]["loan_status"].mean()
st.write(
f"**Taxa de inadimplência entre outliers (label = -1):** {taxa_bad_out:.3f}"
)
st.write(
f"**Taxa de inadimplência entre não-outliers:** {taxa_bad_norm:.3f}"
)
st.markdown(
"""
Outliers representam clientes com perfis incomuns na base. Comparar a taxa de inadimplência
entre outliers e não-outliers ajuda a entender se esses perfis raros são mais ou menos arriscados.
"""
)
st.markdown("---")
st.caption(
"Prova Final SIEP – Sistemas de Informação em Engenharia de Produção · Bianca Figueiredo"
)
if __name__ == "__main__":
main()