"""cluster_ops/cluster_utils.py — clustering utility functions.

History: renamed from clustering/cluster_utils.py (commit 1a56e4b, author sdbrgo).
"""
import pandas as pd
import numpy as np
from sklearn.metrics import silhouette_score
from sklearn.cluster import KMeans
# Public API of this module: k selection, centroid computation/back-projection,
# per-cluster statistics, and per-cluster driver identification.
__all__ = [
"choose_k",
"compute_cluster_centroids_pca",
"inverse_project_centroids",
"compute_cluster_stats",
"identify_top_drivers"
]
#========== Before Final Clustering ==========
def choose_k(X_pca, k_max=12, random_state=42):
    """Choose the number of KMeans clusters by maximizing the silhouette score.

    Fits KMeans for each k in [2, min(k_max, n_samples) - 1] and returns the
    k with the highest silhouette score.

    Parameters
    ----------
    X_pca : array-like of shape (n_samples, n_features)
        Data (typically PCA-projected) to cluster.
    k_max : int, default 12
        Exclusive upper bound on the candidate k values (previously a
        hard-coded constant).
    random_state : int, default 42
        Seed passed to KMeans for reproducibility.

    Returns
    -------
    int
        Best k found; falls back to 2 when fewer than 3 samples are given
        (the candidate range is then empty).
    """
    best_k = 2
    best_score = -1.0  # silhouette scores lie in [-1, 1], so any real score beats this
    n_samples = X_pca.shape[0]
    # silhouette_score requires 2 <= k <= n_samples - 1; range() end is
    # exclusive, so min(k_max, n_samples) caps k at n_samples - 1.
    for k in range(2, min(k_max, n_samples)):
        # n_init='auto' silences the sklearn FutureWarning about the default
        km = KMeans(n_clusters=k, random_state=random_state, n_init='auto')
        labels = km.fit_predict(X_pca)
        score = silhouette_score(X_pca, labels)
        if score > best_score:
            best_score = score
            best_k = k
    print(f"Executing choose_k()... Best Score: {best_score}")
    return best_k
#========== During Cluster Analysis ==========
def compute_cluster_centroids_pca(df_pca, labels):
    """Return the per-cluster mean (centroid) of the PCA-space points.

    Parameters
    ----------
    df_pca : array-like of shape (n_samples, n_components)
        PCA-projected data.
    labels : array-like of shape (n_samples,)
        Cluster assignment for each row.

    Returns
    -------
    pandas.DataFrame
        One row per cluster id, columns are the PCA components.
    """
    frame = pd.DataFrame(df_pca)
    return frame.assign(cluster=labels).groupby('cluster').mean()
# maps PCA-space centroids back to original feature space
def inverse_project_centroids(pca_centroids, pca_model, scaler_model, original_feature_names):
    """Map PCA-space centroids back into the original feature space.

    Applies the PCA inverse transform (PCA space -> scaled feature space)
    followed by the scaler inverse transform (scaled -> original units).

    Parameters
    ----------
    pca_centroids : pandas.DataFrame
        Centroids in PCA space (e.g. output of compute_cluster_centroids_pca).
    pca_model : fitted object with an ``inverse_transform`` method (e.g. sklearn PCA).
    scaler_model : fitted object with an ``inverse_transform`` method (e.g. StandardScaler).
    original_feature_names : sequence of str
        Column labels for the returned frame.

    Returns
    -------
    pandas.DataFrame
        Centroids in original feature units, indexed like ``pca_centroids``.
    """
    # Undo PCA first, then undo the scaling — the reverse of the fit pipeline.
    unscaled = scaler_model.inverse_transform(
        pca_model.inverse_transform(pca_centroids.values)
    )
    return pd.DataFrame(
        unscaled,
        columns=original_feature_names,
        index=pca_centroids.index,
    )
# function to compute and save cluster stats
def compute_cluster_stats(df_pca, labels, feature_names):
    """Compute descriptive statistics for each cluster.

    Parameters
    ----------
    df_pca : array-like of shape (n_samples, n_features)
        Data rows (named columns taken from ``feature_names``).
    labels : array-like of shape (n_samples,)
        Cluster assignment per row.
    feature_names : sequence of str
        Column names for the data.

    Returns
    -------
    dict
        Maps each cluster id to a dict with keys ``count``, ``mean``,
        ``median``, ``std``, ``min``, ``max``, ``range``; all per-feature
        values are plain dicts keyed by feature name.
    """
    frame = pd.DataFrame(df_pca, columns=feature_names)
    frame['cluster'] = labels

    def _summarize(group):
        # Drop the grouping column so stats cover features only.
        feats = group.drop(columns=['cluster'])
        lo, hi = feats.min(), feats.max()
        return {
            "count": len(feats),
            "mean": feats.mean().to_dict(),
            "median": feats.median().to_dict(),
            "std": feats.std().to_dict(),
            "min": lo.to_dict(),
            "max": hi.to_dict(),
            "range": (hi - lo).to_dict(),
        }

    return {
        cid: _summarize(group)
        for cid, group in frame.groupby('cluster', sort=True)
    }
# ranks top drivers based on `top_n`
def identify_top_drivers(original_space_centroids, top_n):
    """Rank, per cluster, the features that deviate most from the global mean.

    Parameters
    ----------
    original_space_centroids : pandas.DataFrame
        Cluster centroids in original feature space (rows = clusters,
        columns = features).
    top_n : int
        Number of top-deviating features to report per cluster.

    Returns
    -------
    dict
        Maps each cluster id to ``{"deviations": {feature: signed deviation}}``
        for the ``top_n`` features with the largest absolute deviation.
    """
    overall_mean = original_space_centroids.mean()
    result = {}
    for cid in original_space_centroids.index:
        # Signed offset of this centroid from the across-cluster mean.
        offset = original_space_centroids.loc[cid] - overall_mean
        # Rank by magnitude, but report the signed values.
        leading = offset.abs().sort_values(ascending=False).head(top_n).index
        result[cid] = {"deviations": offset.loc[leading].to_dict()}
    return result