# Benchmark-Kit-26 / src/clustering.py
# Last change: "Update src/clustering.py" by dwmk (commit a21bdec, verified)
# clustering.py
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import silhouette_score
def get_preprocessor(df_subset):
    """Create an unfitted ColumnTransformer for a frame of mixed dtypes.

    Columns with a numeric dtype are standardized with ``StandardScaler``;
    every other column is one-hot encoded into a dense array, with categories
    unseen during fitting ignored at transform time. A transformer is only
    registered when at least one column of its kind is present.

    Args:
        df_subset: DataFrame holding the feature columns to preprocess.

    Returns:
        An unfitted ``ColumnTransformer`` targeting the columns of
        ``df_subset``.
    """
    numeric_columns = df_subset.select_dtypes(include=np.number).columns
    other_columns = df_subset.select_dtypes(exclude=np.number).columns

    # Candidate steps in their final order; steps without columns are dropped.
    candidate_steps = (
        ('num', StandardScaler(), numeric_columns),
        (
            'cat',
            OneHotEncoder(handle_unknown='ignore', sparse_output=False),
            other_columns,
        ),
    )
    active_steps = [step for step in candidate_steps if len(step[2]) > 0]
    return ColumnTransformer(transformers=active_steps)
def _show_k_diagnostics(X_processed, k_range):
    """Plot inertia and silhouette score for every K in ``k_range``.

    Args:
        X_processed: 2-D array of preprocessed samples (rows) by features.
        k_range: ``(k_min, k_max)`` tuple, both ends inclusive.
    """
    n_samples = X_processed.shape[0]
    k_min, k_max = k_range

    # silhouette_score is only defined for 2 <= n_labels <= n_samples - 1,
    # so K can never usefully exceed n_samples - 1.
    k_cap = min(k_max, n_samples - 1)
    if k_cap < k_min:
        st.warning(
            f"⚠️ Not enough rows ({n_samples}) to evaluate K in the range "
            f"{k_min}-{k_max}."
        )
        return
    if k_cap < k_max:
        st.info(f"K range capped at {k_cap} because only {n_samples} rows are available.")

    k_values = list(range(k_min, k_cap + 1))
    inertias = []
    sil_scores = []
    for k in k_values:
        km = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = km.fit_predict(X_processed)
        inertias.append(km.inertia_)
        # Duplicate points can make K-Means return fewer distinct clusters
        # than requested; the silhouette is undefined for a single label.
        if len(np.unique(labels)) >= 2:
            sil_scores.append(silhouette_score(X_processed, labels))
        else:
            sil_scores.append(float("nan"))

    col1, col2 = st.columns(2)
    fig_elbow = px.line(
        x=k_values, y=inertias, markers=True,
        labels={'x': 'K', 'y': 'Inertia'}, title="Elbow Curve (Inertia)",
    )
    col1.plotly_chart(fig_elbow, use_container_width=True)
    fig_sil = px.line(
        x=k_values, y=sil_scores, markers=True,
        labels={'x': 'K', 'y': 'Silhouette Score'},
        title="Silhouette Score (Higher is better)",
    )
    col2.plotly_chart(fig_sil, use_container_width=True)


def _build_cluster_figure(X_processed, clusters, n_clusters):
    """Return a 2-D scatter figure of the samples coloured by cluster.

    Args:
        X_processed: 2-D array of preprocessed samples (rows) by features.
        clusters: 1-D array of cluster labels, one per row of ``X_processed``.
        n_clusters: Number of clusters requested, used in the figure title.
    """
    n_samples, n_features = X_processed.shape

    # PCA cannot extract more components than min(n_samples, n_features);
    # asking for 2 on a single-column matrix raises a ValueError.
    n_components = min(2, n_samples, n_features)
    coords = PCA(n_components=n_components).fit_transform(X_processed)
    x_vals = coords[:, 0]
    # With a single component there is no second axis: draw points on y = 0.
    y_vals = coords[:, 1] if n_components > 1 else np.zeros(n_samples)

    # A fresh, default-indexed Series keeps the legend title "Cluster" and
    # avoids any index alignment against the positional x/y arrays.
    cluster_series = pd.Series(clusters.astype(str), name="Cluster")

    if n_features > 2:
        return px.scatter(
            x=x_vals, y=y_vals,
            color=cluster_series,
            title=f"PCA Projection of Clusters (K={n_clusters})",
            labels={'x': 'PC1', 'y': 'PC2'},
            template="plotly_white",
        )
    return px.scatter(
        x=x_vals, y=y_vals,
        color=cluster_series,
        title=f"Cluster Visualization (K={n_clusters})",
        labels={'x': 'Dim 1', 'y': 'Dim 2'},
    )


def run_clustering():
    """Render the clustering tab: K diagnostics, K-Means fit and 2-D plot.

    Reads ``processed_df`` and ``feature_cols`` from ``st.session_state``.
    The selected features are preprocessed with ``get_preprocessor``; on
    request the elbow/silhouette curves are shown, and on button press a
    K-Means model is fitted, the labelled frame previewed and the clusters
    plotted in two dimensions.

    Rows with missing values in the selected features are ignored (with a
    warning), because K-Means cannot be fitted on NaN values. Invalid states
    (no data, unknown feature names, fewer rows than clusters) produce a
    message in the UI instead of an exception.

    Returns:
        None. All output is written to the Streamlit page.
    """
    st.header("🧊 Advanced Clustering Lab")
    df = st.session_state.get("processed_df")
    features = st.session_state.get("feature_cols")
    if not features:
        st.warning("⚠️ Please select features in the EDA tab first.")
        return
    if df is None:
        st.warning("⚠️ No processed dataset is available yet.")
        return

    # Feature names kept in session state may be stale if the data changed.
    missing = [col for col in features if col not in df.columns]
    if missing:
        st.error(f"Selected features not found in the dataset: {missing}")
        return

    # Prepare Data
    X_raw = df[features].copy()
    # Positional mask (not index labels) so the display frame stays aligned
    # with the cluster labels even when the index is non-unique.
    complete_rows = X_raw.notna().all(axis=1).to_numpy()
    n_dropped = int((~complete_rows).sum())
    if n_dropped:
        st.warning(
            f"⚠️ Ignoring {n_dropped} row(s) with missing values in the selected features."
        )
        X_raw = X_raw[complete_rows]

    n_samples = len(X_raw)
    if n_samples < 2:
        st.error("At least 2 complete rows are required for clustering.")
        return

    # ---------------- Configuration ----------------
    c1, c2 = st.columns(2)
    with c1:
        k_range = st.slider("Select K Range for Elbow Method", 2, 15, (2, 8))
    with c2:
        n_clusters = st.slider("Choose Final K", 2, 15, 3)

    # ---------------- Elbow Method ----------------
    if st.checkbox("Show Elbow Method & Silhouette Analysis"):
        with st.spinner("Calculating optimal K..."):
            X_processed = get_preprocessor(X_raw).fit_transform(X_raw)
            _show_k_diagnostics(X_processed, k_range)

    # ---------------- Final Clustering ----------------
    if st.button("Run K-Means Clustering"):
        # KMeans raises when asked for more clusters than there are samples.
        if n_clusters > n_samples:
            st.error(
                f"Cannot form {n_clusters} clusters from only {n_samples} rows. "
                "Choose a smaller K."
            )
            return

        with st.spinner("Clustering..."):
            # 1. Preprocess
            X_processed = get_preprocessor(X_raw).fit_transform(X_raw)

            # 2. Fit Model
            model = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
            clusters = model.fit_predict(X_processed)

            # 3. Add to a local copy for display; session data is untouched.
            df_display = df[complete_rows].copy()
            df_display["Cluster"] = clusters.astype(str)
            st.success("Clustering Complete!")
            st.dataframe(df_display.head())

            # 4. Visualization (PCA projection to at most 2 dimensions)
            st.subheader("Cluster Visualization")
            if X_processed.shape[1] > 2:
                st.info("Applying PCA to visualize high-dimensional data in 2D.")
            fig = _build_cluster_figure(X_processed, clusters, n_clusters)
            st.plotly_chart(fig, use_container_width=True)