|
|
import pandas as pd |
|
|
import numpy as np |
|
|
from sklearn.metrics import silhouette_score |
|
|
from sklearn.cluster import KMeans |
|
|
|
|
|
# Explicit public API: names exported by `from <module> import *`.
__all__ = [
    "choose_k",
    "compute_cluster_centroids_pca",
    "inverse_project_centroids",
    "compute_cluster_stats",
    "identify_top_drivers",
]
|
|
|
|
|
|
|
|
def choose_k(X_pca, max_k=11, random_state=42):
    """Pick the number of KMeans clusters by maximizing silhouette score.

    Fits KMeans for each candidate k in ``2..min(max_k, n_samples - 1)``
    and returns the k with the highest silhouette score on ``X_pca``.

    Parameters
    ----------
    X_pca : array-like of shape (n_samples, n_features)
        PCA-projected data to cluster.
    max_k : int, optional
        Largest candidate cluster count to evaluate. Defaults to 11,
        matching the previously hard-coded ``range(2, 12)`` bound.
    random_state : int, optional
        Seed forwarded to KMeans for reproducible centroids (default 42).

    Returns
    -------
    int
        The best-scoring k; falls back to 2 when no candidate can be
        scored (e.g. fewer than 3 samples leaves the loop empty).
    """
    best_k = 2
    best_score = -1

    n_samples = X_pca.shape[0]
    # silhouette_score requires 2 <= n_labels <= n_samples - 1; the
    # exclusive upper bound of range() below already guarantees
    # k <= n_samples - 1.
    max_k_for_silhouette = n_samples

    for k in range(2, min(max_k + 1, max_k_for_silhouette)):
        km = KMeans(n_clusters=k, random_state=random_state, n_init='auto')
        labels = km.fit_predict(X_pca)
        score = silhouette_score(X_pca, labels)

        if score > best_score:
            best_score = score
            best_k = k

    print(f"Executing choose_k()... Best Score: {best_score}")

    return best_k
|
|
|
|
|
|
|
|
def compute_cluster_centroids_pca(df_pca, labels):
    """Average the PCA coordinates of each cluster.

    Returns a DataFrame indexed by cluster label whose rows are the
    per-cluster means of every PCA component in ``df_pca``.
    """
    frame = pd.DataFrame(df_pca).assign(cluster=labels)
    return frame.groupby('cluster').mean()
|
|
|
|
|
|
|
|
def inverse_project_centroids(pca_centroids, pca_model, scaler_model, original_feature_names):
    """Map PCA-space centroids back into the original feature space.

    Undoes the PCA projection first and the scaling second, returning a
    DataFrame of centroids expressed in original (unscaled) units, with
    ``original_feature_names`` as columns and the same index as
    ``pca_centroids``.
    """
    # Reverse the preprocessing pipeline: PCA space -> scaled space
    # -> original units.
    in_scaled_space = pca_model.inverse_transform(pca_centroids.values)
    in_original_space = scaler_model.inverse_transform(in_scaled_space)

    return pd.DataFrame(
        in_original_space,
        index=pca_centroids.index,
        columns=original_feature_names,
    )
|
|
|
|
|
|
|
|
def compute_cluster_stats(df_pca, labels, feature_names):
    """Summarize each cluster with basic descriptive statistics.

    Returns a dict keyed by cluster label; each value carries the member
    count plus per-feature mean, median, std, min, max and range
    (max - min), each as a plain dict keyed by feature name.
    """
    frame = pd.DataFrame(df_pca, columns=feature_names)
    frame['cluster'] = labels

    summaries = {}

    for cid in sorted(frame['cluster'].unique()):
        # Select this cluster's rows, restricted to the feature columns.
        members = frame.loc[frame['cluster'] == cid, feature_names]
        lo = members.min()
        hi = members.max()

        summaries[cid] = {
            "count": len(members),
            "mean": members.mean().to_dict(),
            "median": members.median().to_dict(),
            "std": members.std().to_dict(),
            "min": lo.to_dict(),
            "max": hi.to_dict(),
            "range": (hi - lo).to_dict(),
        }

    return summaries
|
|
|
|
|
|
|
|
def identify_top_drivers(original_space_centroids, top_n):
    """Find the features that most distinguish each cluster centroid.

    For every centroid row, measures how far each feature sits from the
    unweighted column-wise mean of all centroids, and keeps the
    ``top_n`` features with the largest absolute deviation.

    Returns a dict mapping cluster id to
    ``{"deviations": {feature: signed deviation from the mean}}``.
    """
    overall_mean = original_space_centroids.mean()

    result = {}

    for cid, centroid in original_space_centroids.iterrows():
        offsets = centroid - overall_mean
        # Rank features by magnitude of deviation, keep the signed values.
        ranked_by_magnitude = offsets.abs().sort_values(ascending=False)
        leading = ranked_by_magnitude.index[:top_n].tolist()

        result[cid] = {
            "deviations": offsets[leading].to_dict(),
        }

    return result