| """ |
| Clustering Analysis Module |
| ========================= |
| |
| This module implements various clustering algorithms for customer segmentation. |
| """ |
|
|
| import numpy as np |
| import pandas as pd |
| from sklearn.cluster import KMeans, DBSCAN |
| from sklearn.metrics import silhouette_score, calinski_harabasz_score |
| import streamlit as st |
|
|
| class ClusteringAnalyzer: |
| """ |
| Handles clustering analysis for customer segmentation. |
| """ |
| |
| def __init__(self): |
| self.kmeans_model = None |
| self.dbscan_model = None |
| self.optimal_clusters = None |
| self.cluster_labels = {} |
| |
| def find_optimal_clusters(self, scaled_data, max_clusters=10): |
| """Find optimal number of clusters using multiple methods.""" |
| if scaled_data is None: |
| st.error("No scaled data available. Please preprocess data first.") |
| return None |
| |
| cluster_range = range(2, max_clusters + 1) |
| inertias = [] |
| silhouette_scores = [] |
| calinski_scores = [] |
| |
| progress_bar = st.progress(0) |
| status_text = st.empty() |
| |
| for i, k in enumerate(cluster_range): |
| status_text.text(f'Evaluating {k} clusters...') |
| |
| kmeans = KMeans(n_clusters=k, random_state=42, n_init=10) |
| cluster_labels = kmeans.fit_predict(scaled_data) |
| |
| inertias.append(kmeans.inertia_) |
| silhouette_scores.append(silhouette_score(scaled_data, cluster_labels)) |
| calinski_scores.append(calinski_harabasz_score(scaled_data, cluster_labels)) |
| |
| progress_bar.progress((i + 1) / len(cluster_range)) |
| |
| status_text.text('Optimization complete!') |
| |
| |
| optimal_silhouette = cluster_range[np.argmax(silhouette_scores)] |
| optimal_calinski = cluster_range[np.argmax(calinski_scores)] |
| |
| |
| self.optimization_results = { |
| 'cluster_range': list(cluster_range), |
| 'inertias': inertias, |
| 'silhouette_scores': silhouette_scores, |
| 'calinski_scores': calinski_scores, |
| 'optimal_silhouette': optimal_silhouette, |
| 'optimal_calinski': optimal_calinski |
| } |
| |
| self.optimal_clusters = optimal_silhouette |
| |
| st.success(f"β
Optimal clusters found: {self.optimal_clusters} (based on Silhouette Score)") |
| |
| return self.optimization_results |
| |
| def apply_kmeans(self, scaled_data, n_clusters=None): |
| """Apply K-Means clustering.""" |
| if scaled_data is None: |
| st.error("No scaled data available. Please preprocess data first.") |
| return None |
| |
| if n_clusters is None: |
| n_clusters = self.optimal_clusters or 5 |
| |
| with st.spinner(f'Applying K-Means clustering with {n_clusters} clusters...'): |
| self.kmeans_model = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) |
| kmeans_labels = self.kmeans_model.fit_predict(scaled_data) |
| |
| |
| silhouette_avg = silhouette_score(scaled_data, kmeans_labels) |
| calinski_score = calinski_harabasz_score(scaled_data, kmeans_labels) |
| |
| self.cluster_labels['kmeans'] = kmeans_labels |
| |
| results = { |
| 'labels': kmeans_labels, |
| 'n_clusters': n_clusters, |
| 'silhouette_score': silhouette_avg, |
| 'calinski_score': calinski_score, |
| 'inertia': self.kmeans_model.inertia_, |
| 'centers': self.kmeans_model.cluster_centers_ |
| } |
| |
| st.success(f"β
K-Means clustering completed!") |
| st.info(f"Silhouette Score: {silhouette_avg:.3f} | Calinski-Harabasz Score: {calinski_score:.3f}") |
| |
| return results |
| |
| def apply_dbscan(self, scaled_data, eps=0.5, min_samples=5): |
| """Apply DBSCAN clustering.""" |
| if scaled_data is None: |
| st.error("No scaled data available. Please preprocess data first.") |
| return None |
| |
| with st.spinner(f'Applying DBSCAN clustering (eps={eps}, min_samples={min_samples})...'): |
| self.dbscan_model = DBSCAN(eps=eps, min_samples=min_samples) |
| dbscan_labels = self.dbscan_model.fit_predict(scaled_data) |
| |
| |
| n_clusters = len(set(dbscan_labels)) - (1 if -1 in dbscan_labels else 0) |
| n_noise = list(dbscan_labels).count(-1) |
| |
| self.cluster_labels['dbscan'] = dbscan_labels |
| |
| results = { |
| 'labels': dbscan_labels, |
| 'n_clusters': n_clusters, |
| 'n_noise': n_noise, |
| 'eps': eps, |
| 'min_samples': min_samples |
| } |
| |
| |
| if n_clusters > 1: |
| non_noise_mask = dbscan_labels != -1 |
| if np.sum(non_noise_mask) > 1: |
| silhouette_avg = silhouette_score(scaled_data[non_noise_mask], |
| dbscan_labels[non_noise_mask]) |
| results['silhouette_score'] = silhouette_avg |
| |
| st.success(f"β
DBSCAN clustering completed!") |
| st.info(f"Clusters: {n_clusters} | Noise points: {n_noise}") |
| |
| return results |
| |
| def analyze_clusters(self, data, algorithm='kmeans'): |
| """Analyze cluster characteristics.""" |
| |
| algo_key = algorithm.lower().replace('-', '').replace(' ', '') |
| |
| if algo_key not in self.cluster_labels: |
| st.error(f"No {algorithm} clustering results found. Please run clustering first.") |
| return None |
| |
| cluster_labels = self.cluster_labels[algo_key] |
| |
| |
| if algo_key == 'kmeans': |
| cluster_col = 'Kmeans_Cluster' |
| elif algo_key == 'dbscan': |
| cluster_col = 'DBSCAN_Cluster' |
| else: |
| cluster_col = f'{algorithm}_Cluster' |
| |
| |
| analysis_data = data.copy() |
| analysis_data[cluster_col] = cluster_labels |
| |
| |
| numeric_cols = analysis_data.select_dtypes(include=[np.number]).columns |
| numeric_cols = [col for col in numeric_cols if not col.endswith('_Cluster')] |
| |
| cluster_stats = analysis_data.groupby(cluster_col)[numeric_cols].agg(['mean', 'std', 'count']) |
| |
| |
| spending_analysis = None |
| if 'Spending Score (1-100)' in analysis_data.columns: |
| spending_analysis = analysis_data.groupby(cluster_col)['Spending Score (1-100)'].agg(['mean', 'std', 'min', 'max', 'count']) |
| |
| results = { |
| 'data_with_clusters': analysis_data, |
| 'cluster_stats': cluster_stats, |
| 'spending_analysis': spending_analysis, |
| 'cluster_distribution': analysis_data[cluster_col].value_counts().sort_index() |
| } |
| |
| return results |
| |
| def get_cluster_profiles(self, data, algorithm='kmeans'): |
| """Generate customer profiles for each cluster.""" |
| |
| algo_key = algorithm.lower().replace('-', '').replace(' ', '') |
| |
| if algo_key not in self.cluster_labels: |
| return None |
| |
| cluster_labels = self.cluster_labels[algo_key] |
| |
| |
| if algo_key == 'kmeans': |
| cluster_col = 'Kmeans_Cluster' |
| elif algo_key == 'dbscan': |
| cluster_col = 'DBSCAN_Cluster' |
| else: |
| cluster_col = f'{algorithm}_Cluster' |
| |
| analysis_data = data.copy() |
| analysis_data[cluster_col] = cluster_labels |
| |
| profiles = [] |
| |
| for cluster in sorted(analysis_data[cluster_col].unique()): |
| if cluster == -1: |
| continue |
| |
| cluster_data = analysis_data[analysis_data[cluster_col] == cluster] |
| |
| profile = { |
| 'cluster': cluster, |
| 'size': len(cluster_data), |
| 'percentage': len(cluster_data) / len(analysis_data) * 100 |
| } |
| |
| |
| if 'Age' in cluster_data.columns: |
| profile['avg_age'] = cluster_data['Age'].mean() |
| profile['age_std'] = cluster_data['Age'].std() |
| |
| if 'Annual Income (k$)' in cluster_data.columns: |
| profile['avg_income'] = cluster_data['Annual Income (k$)'].mean() |
| profile['income_std'] = cluster_data['Annual Income (k$)'].std() |
| |
| if 'Spending Score (1-100)' in cluster_data.columns: |
| profile['avg_spending'] = cluster_data['Spending Score (1-100)'].mean() |
| profile['spending_std'] = cluster_data['Spending Score (1-100)'].std() |
| |
| if 'Gender' in cluster_data.columns: |
| profile['gender_dist'] = cluster_data['Gender'].value_counts().to_dict() |
| |
| |
| if 'avg_income' in profile and 'avg_spending' in profile: |
| avg_income = profile['avg_income'] |
| avg_spending = profile['avg_spending'] |
| |
| if avg_income > 70 and avg_spending > 70: |
| profile['type'] = "π HIGH VALUE" |
| profile['description'] = "High income, high spending - Premium customers" |
| elif avg_income > 70 and avg_spending < 40: |
| profile['type'] = "πΌ CONSERVATIVE" |
| profile['description'] = "High income, low spending - Potential for upselling" |
| elif avg_income < 40 and avg_spending > 70: |
| profile['type'] = "π― BUDGET SPENDERS" |
| profile['description'] = "Low income, high spending - Price-sensitive loyal customers" |
| elif avg_income < 40 and avg_spending < 40: |
| profile['type'] = "π LOW ENGAGEMENT" |
| profile['description'] = "Low income, low spending - Need retention strategies" |
| else: |
| profile['type'] = "βοΈ BALANCED" |
| profile['description'] = "Moderate income and spending - Core customer base" |
| |
| profiles.append(profile) |
| |
| return profiles |
|
|