# Spaces: Sleeping | File size: 10,835 Bytes | 5e2aaa0
"""
Clustering Analysis Module
==========================
This module implements various clustering algorithms for customer segmentation.
"""
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans, DBSCAN
from sklearn.metrics import silhouette_score, calinski_harabasz_score
import streamlit as st
class ClusteringAnalyzer:
    """Run K-Means / DBSCAN clustering and summarise the resulting segments.

    Progress and outcome messages are reported through Streamlit (``st``).
    The most recent labels of each algorithm are kept in ``cluster_labels``
    so that ``analyze_clusters`` and ``get_cluster_profiles`` can reuse them.
    """

    # Cut-offs used to characterise a cluster from its mean income / spending.
    _HIGH_THRESHOLD = 70
    _LOW_THRESHOLD = 40

    def __init__(self):
        # Fitted estimators from the latest apply_* call (None until run).
        self.kmeans_model = None
        self.dbscan_model = None
        # Cluster count picked by find_optimal_clusters (best silhouette).
        self.optimal_clusters = None
        # Per-k metrics stored by find_optimal_clusters (None until run).
        self.optimization_results = None
        # Latest label array per algorithm, keyed by 'kmeans' / 'dbscan'.
        self.cluster_labels = {}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _algorithm_key(algorithm):
        """Normalise a display name such as 'K-Means' to the key 'kmeans'."""
        return algorithm.lower().replace('-', '').replace(' ', '')

    @staticmethod
    def _cluster_column(algorithm, algo_key):
        """Return the name of the label column added for *algorithm*."""
        if algo_key == 'kmeans':
            return 'Kmeans_Cluster'
        if algo_key == 'dbscan':
            return 'DBSCAN_Cluster'
        return f'{algorithm}_Cluster'

    @staticmethod
    def _can_score(labels):
        """Tell whether silhouette / Calinski-Harabasz scores are defined.

        Both metrics need at least 2 distinct labels and fewer distinct
        labels than samples; scikit-learn raises ``ValueError`` otherwise.
        """
        n_labels = len(np.unique(labels))
        return 2 <= n_labels <= len(labels) - 1

    def _label_data(self, data, algorithm):
        """Return ``(copy of data with label column, column name)`` or None.

        None is returned when no labels are stored for *algorithm*.
        """
        algo_key = self._algorithm_key(algorithm)
        if algo_key not in self.cluster_labels:
            return None
        cluster_col = self._cluster_column(algorithm, algo_key)
        labelled = data.copy()
        labelled[cluster_col] = self.cluster_labels[algo_key]
        return labelled, cluster_col

    @classmethod
    def _characterize(cls, avg_income, avg_spending):
        """Map mean income / spending of a cluster to ``(type, description)``."""
        # NOTE(review): the emoji below were restored from mis-decoded bytes;
        # the HIGH VALUE and LOW ENGAGEMENT ones are best guesses - confirm.
        high, low = cls._HIGH_THRESHOLD, cls._LOW_THRESHOLD
        if avg_income > high and avg_spending > high:
            return ("💎 HIGH VALUE",
                    "High income, high spending - Premium customers")
        if avg_income > high and avg_spending < low:
            return ("💼 CONSERVATIVE",
                    "High income, low spending - Potential for upselling")
        if avg_income < low and avg_spending > high:
            return ("🎯 BUDGET SPENDERS",
                    "Low income, high spending - Price-sensitive loyal customers")
        if avg_income < low and avg_spending < low:
            return ("📉 LOW ENGAGEMENT",
                    "Low income, low spending - Need retention strategies")
        return ("⚖️ BALANCED",
                "Moderate income and spending - Core customer base")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def find_optimal_clusters(self, scaled_data, max_clusters=10):
        """Evaluate K-Means for k = 2..max_clusters and pick the best k.

        Args:
            scaled_data: Feature matrix to cluster (one row per sample).
            max_clusters: Largest k to try. It is capped at
                ``len(scaled_data) - 1`` because the silhouette score is
                undefined when every sample is its own cluster.

        Returns:
            A dict with the evaluated ``cluster_range``, the per-k
            ``inertias``, ``silhouette_scores`` and ``calinski_scores``, and
            the best k by each score (``optimal_silhouette``,
            ``optimal_calinski``); or None when there is nothing to evaluate.
            The dict is also stored in ``self.optimization_results`` and
            ``self.optimal_clusters`` is set to the silhouette optimum.
        """
        if scaled_data is None:
            st.error("No scaled data available. Please preprocess data first.")
            return None

        upper = min(max_clusters, len(scaled_data) - 1)
        if upper < 2:
            # An empty range would make np.argmax fail further down.
            st.error("Cannot evaluate clusters: need max_clusters >= 2 and at least 3 samples.")
            return None
        if upper < max_clusters:
            st.warning(f"max_clusters reduced to {upper} to fit the number of samples.")

        cluster_range = range(2, upper + 1)
        inertias = []
        silhouette_scores = []
        calinski_scores = []

        progress_bar = st.progress(0)
        status_text = st.empty()

        for i, k in enumerate(cluster_range):
            status_text.text(f'Evaluating {k} clusters...')
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            labels = kmeans.fit_predict(scaled_data)
            inertias.append(kmeans.inertia_)
            silhouette_scores.append(silhouette_score(scaled_data, labels))
            calinski_scores.append(calinski_harabasz_score(scaled_data, labels))
            progress_bar.progress((i + 1) / len(cluster_range))

        status_text.text('Optimization complete!')

        # Best k according to each score (higher is better for both).
        optimal_silhouette = cluster_range[int(np.argmax(silhouette_scores))]
        optimal_calinski = cluster_range[int(np.argmax(calinski_scores))]

        self.optimization_results = {
            'cluster_range': list(cluster_range),
            'inertias': inertias,
            'silhouette_scores': silhouette_scores,
            'calinski_scores': calinski_scores,
            'optimal_silhouette': optimal_silhouette,
            'optimal_calinski': optimal_calinski,
        }
        self.optimal_clusters = optimal_silhouette

        st.success(f"✅ Optimal clusters found: {self.optimal_clusters} (based on Silhouette Score)")
        return self.optimization_results

    def apply_kmeans(self, scaled_data, n_clusters=None):
        """Fit K-Means and report quality metrics.

        Args:
            scaled_data: Feature matrix to cluster.
            n_clusters: Number of clusters. When None, falls back to
                ``self.optimal_clusters`` and then to 5.

        Returns:
            A dict with ``labels``, ``n_clusters``, ``silhouette_score``,
            ``calinski_score``, ``inertia`` and ``centers``; or None when
            *scaled_data* is None. Both scores are None when they are
            undefined for the resulting labels (a single cluster, or one
            cluster per sample).
        """
        if scaled_data is None:
            st.error("No scaled data available. Please preprocess data first.")
            return None

        if n_clusters is None:
            n_clusters = self.optimal_clusters or 5

        with st.spinner(f'Applying K-Means clustering with {n_clusters} clusters...'):
            self.kmeans_model = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
            kmeans_labels = self.kmeans_model.fit_predict(scaled_data)

            silhouette_avg = None
            calinski_score = None
            if self._can_score(kmeans_labels):
                silhouette_avg = silhouette_score(scaled_data, kmeans_labels)
                calinski_score = calinski_harabasz_score(scaled_data, kmeans_labels)

        self.cluster_labels['kmeans'] = kmeans_labels
        results = {
            'labels': kmeans_labels,
            'n_clusters': n_clusters,
            'silhouette_score': silhouette_avg,
            'calinski_score': calinski_score,
            'inertia': self.kmeans_model.inertia_,
            'centers': self.kmeans_model.cluster_centers_,
        }

        st.success("✅ K-Means clustering completed!")
        if silhouette_avg is not None:
            st.info(f"Silhouette Score: {silhouette_avg:.3f} | Calinski-Harabasz Score: {calinski_score:.3f}")
        else:
            st.warning("Quality scores are undefined for this clustering (need 2 to n_samples - 1 clusters).")
        return results

    def apply_dbscan(self, scaled_data, eps=0.5, min_samples=5):
        """Fit DBSCAN and report cluster / noise counts.

        Args:
            scaled_data: Feature matrix to cluster.
            eps: Neighbourhood radius passed to ``DBSCAN``.
            min_samples: Minimum neighbourhood size passed to ``DBSCAN``.

        Returns:
            A dict with ``labels``, ``n_clusters`` (noise label -1 excluded),
            ``n_noise``, ``eps`` and ``min_samples``; ``silhouette_score`` is
            added only when it is defined for the non-noise points. Returns
            None when *scaled_data* is None.
        """
        if scaled_data is None:
            st.error("No scaled data available. Please preprocess data first.")
            return None

        with st.spinner(f'Applying DBSCAN clustering (eps={eps}, min_samples={min_samples})...'):
            self.dbscan_model = DBSCAN(eps=eps, min_samples=min_samples)
            dbscan_labels = self.dbscan_model.fit_predict(scaled_data)

        noise_mask = dbscan_labels == -1
        n_noise = int(np.count_nonzero(noise_mask))
        # -1 marks noise and is not a cluster of its own.
        n_clusters = len(set(dbscan_labels)) - (1 if n_noise else 0)

        self.cluster_labels['dbscan'] = dbscan_labels
        results = {
            'labels': dbscan_labels,
            'n_clusters': n_clusters,
            'n_noise': n_noise,
            'eps': eps,
            'min_samples': min_samples,
        }

        # Silhouette is computed on non-noise points only, and only when it
        # is defined for them (2 <= clusters < number of non-noise points).
        non_noise_labels = dbscan_labels[~noise_mask]
        if n_clusters > 1 and self._can_score(non_noise_labels):
            results['silhouette_score'] = silhouette_score(
                np.asarray(scaled_data)[~noise_mask], non_noise_labels
            )

        st.success("✅ DBSCAN clustering completed!")
        st.info(f"Clusters: {n_clusters} | Noise points: {n_noise}")
        return results

    def analyze_clusters(self, data, algorithm='kmeans'):
        """Compute per-cluster statistics for previously stored labels.

        Args:
            data: DataFrame with one row per clustered sample.
            algorithm: Algorithm name; case, hyphens and spaces are ignored
                (e.g. 'K-Means' and 'kmeans' are equivalent).

        Returns:
            A dict with ``data_with_clusters`` (copy of *data* plus the label
            column), ``cluster_stats`` (mean/std/count of numeric columns),
            ``spending_analysis`` (None when the 'Spending Score (1-100)'
            column is absent) and ``cluster_distribution``; or None when no
            labels are stored for *algorithm*.
        """
        labelled = self._label_data(data, algorithm)
        if labelled is None:
            st.error(f"No {algorithm} clustering results found. Please run clustering first.")
            return None
        analysis_data, cluster_col = labelled

        # Label columns are identifiers, not features: keep them out of stats.
        numeric_cols = [
            col
            for col in analysis_data.select_dtypes(include=[np.number]).columns
            if not str(col).endswith('_Cluster')
        ]
        grouped = analysis_data.groupby(cluster_col)
        cluster_stats = grouped[numeric_cols].agg(['mean', 'std', 'count'])

        spending_analysis = None
        if 'Spending Score (1-100)' in analysis_data.columns:
            spending_analysis = grouped['Spending Score (1-100)'].agg(
                ['mean', 'std', 'min', 'max', 'count']
            )

        return {
            'data_with_clusters': analysis_data,
            'cluster_stats': cluster_stats,
            'spending_analysis': spending_analysis,
            'cluster_distribution': analysis_data[cluster_col].value_counts().sort_index(),
        }

    def get_cluster_profiles(self, data, algorithm='kmeans'):
        """Build a customer profile dict for every cluster.

        Args:
            data: DataFrame with one row per clustered sample.
            algorithm: Algorithm name; case, hyphens and spaces are ignored.

        Returns:
            A list of dicts ordered by cluster label (the DBSCAN noise label
            -1 is skipped). Each dict has ``cluster``, ``size`` and
            ``percentage``, plus mean/std entries for the 'Age',
            'Annual Income (k$)' and 'Spending Score (1-100)' columns and a
            ``gender_dist`` for 'Gender' when those columns exist. ``type``
            and ``description`` are added when both income and spending are
            available. Returns None when no labels are stored for
            *algorithm*.
        """
        labelled = self._label_data(data, algorithm)
        if labelled is None:
            return None
        analysis_data, cluster_col = labelled

        # (source column, mean key, std key) for the numeric profile features.
        feature_keys = (
            ('Age', 'avg_age', 'age_std'),
            ('Annual Income (k$)', 'avg_income', 'income_std'),
            ('Spending Score (1-100)', 'avg_spending', 'spending_std'),
        )

        profiles = []
        total = len(analysis_data)
        for cluster in sorted(analysis_data[cluster_col].unique()):
            if cluster == -1:  # DBSCAN noise is not a customer segment.
                continue

            cluster_data = analysis_data[analysis_data[cluster_col] == cluster]
            profile = {
                'cluster': cluster,
                'size': len(cluster_data),
                'percentage': len(cluster_data) / total * 100,
            }

            for column, mean_key, std_key in feature_keys:
                if column in cluster_data.columns:
                    profile[mean_key] = cluster_data[column].mean()
                    profile[std_key] = cluster_data[column].std()

            if 'Gender' in cluster_data.columns:
                profile['gender_dist'] = cluster_data['Gender'].value_counts().to_dict()

            if 'avg_income' in profile and 'avg_spending' in profile:
                profile['type'], profile['description'] = self._characterize(
                    profile['avg_income'], profile['avg_spending']
                )

            profiles.append(profile)

        return profiles
|