Spaces:
Sleeping
Sleeping
| """ | |
| Clustering Analysis Module | |
| ========================= | |
| This module implements various clustering algorithms for customer segmentation. | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.cluster import KMeans, DBSCAN | |
| from sklearn.metrics import silhouette_score, calinski_harabasz_score | |
| import streamlit as st | |
| class ClusteringAnalyzer: | |
| """ | |
| Handles clustering analysis for customer segmentation. | |
| """ | |
| def __init__(self): | |
| self.kmeans_model = None | |
| self.dbscan_model = None | |
| self.optimal_clusters = None | |
| self.cluster_labels = {} | |
| def find_optimal_clusters(self, scaled_data, max_clusters=10): | |
| """Find optimal number of clusters using multiple methods.""" | |
| if scaled_data is None: | |
| st.error("No scaled data available. Please preprocess data first.") | |
| return None | |
| cluster_range = range(2, max_clusters + 1) | |
| inertias = [] | |
| silhouette_scores = [] | |
| calinski_scores = [] | |
| progress_bar = st.progress(0) | |
| status_text = st.empty() | |
| for i, k in enumerate(cluster_range): | |
| status_text.text(f'Evaluating {k} clusters...') | |
| kmeans = KMeans(n_clusters=k, random_state=42, n_init=10) | |
| cluster_labels = kmeans.fit_predict(scaled_data) | |
| inertias.append(kmeans.inertia_) | |
| silhouette_scores.append(silhouette_score(scaled_data, cluster_labels)) | |
| calinski_scores.append(calinski_harabasz_score(scaled_data, cluster_labels)) | |
| progress_bar.progress((i + 1) / len(cluster_range)) | |
| status_text.text('Optimization complete!') | |
| # Find optimal clusters based on silhouette score | |
| optimal_silhouette = cluster_range[np.argmax(silhouette_scores)] | |
| optimal_calinski = cluster_range[np.argmax(calinski_scores)] | |
| # Store results | |
| self.optimization_results = { | |
| 'cluster_range': list(cluster_range), | |
| 'inertias': inertias, | |
| 'silhouette_scores': silhouette_scores, | |
| 'calinski_scores': calinski_scores, | |
| 'optimal_silhouette': optimal_silhouette, | |
| 'optimal_calinski': optimal_calinski | |
| } | |
| self.optimal_clusters = optimal_silhouette | |
| st.success(f"✅ Optimal clusters found: {self.optimal_clusters} (based on Silhouette Score)") | |
| return self.optimization_results | |
| def apply_kmeans(self, scaled_data, n_clusters=None): | |
| """Apply K-Means clustering.""" | |
| if scaled_data is None: | |
| st.error("No scaled data available. Please preprocess data first.") | |
| return None | |
| if n_clusters is None: | |
| n_clusters = self.optimal_clusters or 5 | |
| with st.spinner(f'Applying K-Means clustering with {n_clusters} clusters...'): | |
| self.kmeans_model = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) | |
| kmeans_labels = self.kmeans_model.fit_predict(scaled_data) | |
| # Calculate metrics | |
| silhouette_avg = silhouette_score(scaled_data, kmeans_labels) | |
| calinski_score = calinski_harabasz_score(scaled_data, kmeans_labels) | |
| self.cluster_labels['kmeans'] = kmeans_labels | |
| results = { | |
| 'labels': kmeans_labels, | |
| 'n_clusters': n_clusters, | |
| 'silhouette_score': silhouette_avg, | |
| 'calinski_score': calinski_score, | |
| 'inertia': self.kmeans_model.inertia_, | |
| 'centers': self.kmeans_model.cluster_centers_ | |
| } | |
| st.success(f"✅ K-Means clustering completed!") | |
| st.info(f"Silhouette Score: {silhouette_avg:.3f} | Calinski-Harabasz Score: {calinski_score:.3f}") | |
| return results | |
| def apply_dbscan(self, scaled_data, eps=0.5, min_samples=5): | |
| """Apply DBSCAN clustering.""" | |
| if scaled_data is None: | |
| st.error("No scaled data available. Please preprocess data first.") | |
| return None | |
| with st.spinner(f'Applying DBSCAN clustering (eps={eps}, min_samples={min_samples})...'): | |
| self.dbscan_model = DBSCAN(eps=eps, min_samples=min_samples) | |
| dbscan_labels = self.dbscan_model.fit_predict(scaled_data) | |
| # Calculate metrics | |
| n_clusters = len(set(dbscan_labels)) - (1 if -1 in dbscan_labels else 0) | |
| n_noise = list(dbscan_labels).count(-1) | |
| self.cluster_labels['dbscan'] = dbscan_labels | |
| results = { | |
| 'labels': dbscan_labels, | |
| 'n_clusters': n_clusters, | |
| 'n_noise': n_noise, | |
| 'eps': eps, | |
| 'min_samples': min_samples | |
| } | |
| # Calculate silhouette score only if we have more than 1 cluster and non-noise points | |
| if n_clusters > 1: | |
| non_noise_mask = dbscan_labels != -1 | |
| if np.sum(non_noise_mask) > 1: | |
| silhouette_avg = silhouette_score(scaled_data[non_noise_mask], | |
| dbscan_labels[non_noise_mask]) | |
| results['silhouette_score'] = silhouette_avg | |
| st.success(f"✅ DBSCAN clustering completed!") | |
| st.info(f"Clusters: {n_clusters} | Noise points: {n_noise}") | |
| return results | |
| def analyze_clusters(self, data, algorithm='kmeans'): | |
| """Analyze cluster characteristics.""" | |
| # Normalize algorithm name | |
| algo_key = algorithm.lower().replace('-', '').replace(' ', '') | |
| if algo_key not in self.cluster_labels: | |
| st.error(f"No {algorithm} clustering results found. Please run clustering first.") | |
| return None | |
| cluster_labels = self.cluster_labels[algo_key] | |
| # Create consistent column name (use the format that actually gets created) | |
| if algo_key == 'kmeans': | |
| cluster_col = 'Kmeans_Cluster' # Match what we see in the error | |
| elif algo_key == 'dbscan': | |
| cluster_col = 'DBSCAN_Cluster' | |
| else: | |
| cluster_col = f'{algorithm}_Cluster' | |
| # Add cluster labels to data | |
| analysis_data = data.copy() | |
| analysis_data[cluster_col] = cluster_labels | |
| # Calculate cluster statistics | |
| numeric_cols = analysis_data.select_dtypes(include=[np.number]).columns | |
| numeric_cols = [col for col in numeric_cols if not col.endswith('_Cluster')] | |
| cluster_stats = analysis_data.groupby(cluster_col)[numeric_cols].agg(['mean', 'std', 'count']) | |
| # Calculate spending analysis if available | |
| spending_analysis = None | |
| if 'Spending Score (1-100)' in analysis_data.columns: | |
| spending_analysis = analysis_data.groupby(cluster_col)['Spending Score (1-100)'].agg(['mean', 'std', 'min', 'max', 'count']) | |
| results = { | |
| 'data_with_clusters': analysis_data, | |
| 'cluster_stats': cluster_stats, | |
| 'spending_analysis': spending_analysis, | |
| 'cluster_distribution': analysis_data[cluster_col].value_counts().sort_index() | |
| } | |
| return results | |
| def get_cluster_profiles(self, data, algorithm='kmeans'): | |
| """Generate customer profiles for each cluster.""" | |
| # Normalize algorithm name | |
| algo_key = algorithm.lower().replace('-', '').replace(' ', '') | |
| if algo_key not in self.cluster_labels: | |
| return None | |
| cluster_labels = self.cluster_labels[algo_key] | |
| # Create consistent column name (use the format that actually gets created) | |
| if algo_key == 'kmeans': | |
| cluster_col = 'Kmeans_Cluster' # Match what we see in the error | |
| elif algo_key == 'dbscan': | |
| cluster_col = 'DBSCAN_Cluster' | |
| else: | |
| cluster_col = f'{algorithm}_Cluster' | |
| analysis_data = data.copy() | |
| analysis_data[cluster_col] = cluster_labels | |
| profiles = [] | |
| for cluster in sorted(analysis_data[cluster_col].unique()): | |
| if cluster == -1: # Skip noise points in DBSCAN | |
| continue | |
| cluster_data = analysis_data[analysis_data[cluster_col] == cluster] | |
| profile = { | |
| 'cluster': cluster, | |
| 'size': len(cluster_data), | |
| 'percentage': len(cluster_data) / len(analysis_data) * 100 | |
| } | |
| # Add feature statistics | |
| if 'Age' in cluster_data.columns: | |
| profile['avg_age'] = cluster_data['Age'].mean() | |
| profile['age_std'] = cluster_data['Age'].std() | |
| if 'Annual Income (k$)' in cluster_data.columns: | |
| profile['avg_income'] = cluster_data['Annual Income (k$)'].mean() | |
| profile['income_std'] = cluster_data['Annual Income (k$)'].std() | |
| if 'Spending Score (1-100)' in cluster_data.columns: | |
| profile['avg_spending'] = cluster_data['Spending Score (1-100)'].mean() | |
| profile['spending_std'] = cluster_data['Spending Score (1-100)'].std() | |
| if 'Gender' in cluster_data.columns: | |
| profile['gender_dist'] = cluster_data['Gender'].value_counts().to_dict() | |
| # Generate profile characterization | |
| if 'avg_income' in profile and 'avg_spending' in profile: | |
| avg_income = profile['avg_income'] | |
| avg_spending = profile['avg_spending'] | |
| if avg_income > 70 and avg_spending > 70: | |
| profile['type'] = "💎 HIGH VALUE" | |
| profile['description'] = "High income, high spending - Premium customers" | |
| elif avg_income > 70 and avg_spending < 40: | |
| profile['type'] = "💼 CONSERVATIVE" | |
| profile['description'] = "High income, low spending - Potential for upselling" | |
| elif avg_income < 40 and avg_spending > 70: | |
| profile['type'] = "🎯 BUDGET SPENDERS" | |
| profile['description'] = "Low income, high spending - Price-sensitive loyal customers" | |
| elif avg_income < 40 and avg_spending < 40: | |
| profile['type'] = "📉 LOW ENGAGEMENT" | |
| profile['description'] = "Low income, low spending - Need retention strategies" | |
| else: | |
| profile['type'] = "⚖️ BALANCED" | |
| profile['description'] = "Moderate income and spending - Core customer base" | |
| profiles.append(profile) | |
| return profiles | |