Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| from sklearn.preprocessing import StandardScaler, MinMaxScaler | |
| from sklearn.decomposition import PCA | |
| from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN | |
| from sklearn.mixture import GaussianMixture | |
| from sklearn.metrics import silhouette_score, adjusted_rand_score | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| import io | |
# Function to load the dataset with st.spinner
# Cache the data to speed up subsequent runs
@st.cache_data
def load_data():
    """Read marketing_campaign.csv (tab-separated) into a DataFrame.

    Returns:
        pd.DataFrame: the raw campaign data.

    The original comment promised caching but no decorator was present;
    @st.cache_data makes Streamlit skip the disk read on reruns.
    """
    with st.spinner("Loading data..."):
        df = pd.read_csv("marketing_campaign.csv", delimiter='\t')
    return df
def handle_mixed_types(df):
    """Coerce any column holding more than one Python type to one type.

    Columns mixing only int/float values become float; any other mix
    (e.g. numbers alongside strings) is coerced to str. Columns whose
    values already share a single type are left untouched.

    Args:
        df: DataFrame to normalize (modified in place).

    Returns:
        pd.DataFrame: the same DataFrame, for chaining.
    """
    for column in df.columns:
        value_types = df[column].apply(type).unique()
        if len(value_types) <= 1:
            continue  # already homogeneous
        numeric_only = all(issubclass(t, (int, float)) for t in value_types)
        df[column] = df[column].astype(float if numeric_only else str)
    return df
def handle_nulls(df):
    """Fill missing values: mode for object columns, mean for numeric.

    Args:
        df: DataFrame to impute (modified in place).

    Returns:
        pd.DataFrame: the same DataFrame, for chaining.
    """
    for col in df.columns:
        if df[col].dtype == 'object':
            # BUG FIX: Series.mode() returns a Series; passing it to
            # fillna aligns by *index*, so only rows whose index happens
            # to match the mode Series (0, 1, ...) were filled. Use the
            # first (most frequent) mode value as a scalar instead.
            mode_values = df[col].mode()
            if not mode_values.empty:  # all-null column has no mode
                df[col] = df[col].fillna(mode_values.iloc[0])
        else:
            df[col] = df[col].fillna(df[col].mean())
    return df
# Function to check data type consistency
def check_data_types(df):
    """Parse the Dt_Customer column into datetimes (day-first format).

    Args:
        df: DataFrame containing a 'Dt_Customer' column of date strings.

    Returns:
        pd.DataFrame: the same DataFrame with Dt_Customer as datetime64.
    """
    parsed = pd.to_datetime(df['Dt_Customer'], dayfirst=True)
    df['Dt_Customer'] = parsed
    return df
# Function to visualize data distribution
def visualize_data(df):
    """Plot histograms of the three highest-variance numeric columns.

    Each figure is rendered to an in-memory PNG and shown via st.image.

    Args:
        df: DataFrame to visualize.
    """
    st.subheader("Data Visualization")
    # Select top 3 columns with highest variance (excluding date and object types)
    numerical_df = df.select_dtypes(exclude=['object', 'datetime'])
    top_3_cols = numerical_df.var().sort_values(ascending=False).head(3).index.tolist()
    # NOTE: every selected column is numeric by construction (drawn from
    # numerical_df), so the original's countplot branch for object dtypes
    # was unreachable and has been removed.
    for col in top_3_cols:
        fig = plt.figure(figsize=(10, 5))
        sns.histplot(x=col, data=df, kde=True)
        # Convert plot to image
        img = io.BytesIO()
        plt.savefig(img, format='png')
        img.seek(0)
        st.image(img)  # Display the image
        # Close the figure so matplotlib doesn't accumulate open figures
        # across Streamlit reruns (memory leak otherwise).
        plt.close(fig)
# Function to preprocess data with PCA
def preprocess_data_with_pca(df):
    """One-hot encode, MinMax-scale, and PCA-reduce the feature matrix.

    Args:
        df: cleaned DataFrame with a 'Response' target column and a
            datetime 'Dt_Customer' column.

    Returns:
        tuple: (X_pca, y) where X_pca keeps enough components for 95%
        of the variance and y is the untouched 'Response' column.
    """
    st.subheader("Preprocessed Data with PCA")
    # One-hot encode every object-dtype column.
    object_cols = df.select_dtypes(include=['object']).columns.tolist()
    encoded = pd.get_dummies(df, columns=object_cols)
    # The target is excluded from the feature matrix.
    features = encoded.drop(columns=['Response'])
    # Replace the raw datetime with numeric year/month components.
    features['Dt_Customer_Year'] = features['Dt_Customer'].dt.year
    features['Dt_Customer_Month'] = features['Dt_Customer'].dt.month
    features = features.drop(columns=['Dt_Customer'])
    # Bring all numeric columns onto a common [0, 1] scale.
    numeric_cols = features.select_dtypes(include=['number']).columns.tolist()
    features[numeric_cols] = MinMaxScaler().fit_transform(features[numeric_cols])
    # Retain 95% of the variance.
    X_pca = PCA(n_components=0.95).fit_transform(features)
    st.write(pd.DataFrame(X_pca).head())
    return X_pca, df['Response']
# Function to run K-Means clustering
def run_kmeans(X, y_true):
    """Cluster X with K-Means (k=5) and score the result.

    Args:
        X: 2-D feature array (e.g. PCA-reduced data).
        y_true: ground-truth labels for the Adjusted Rand Index.

    Returns:
        tuple: (n_clusters, silhouette, rand_index); the scores are
        floats, or "N/A..." strings when fewer than two clusters emerge.
    """
    kmeans = KMeans(n_clusters=5, random_state=42)  # Example: 5 clusters
    y_pred = kmeans.fit_predict(X)
    n_clusters = kmeans.n_clusters
    # BUG FIX: silhouette_score raises ValueError when all points fall in
    # a single cluster, so BOTH metrics must sit behind the label-count
    # guard (the original guarded only the Rand Index).
    if len(set(y_pred)) > 1:
        silhouette = silhouette_score(X, y_pred)
        rand_index = adjusted_rand_score(y_true, y_pred)
    else:
        silhouette = "N/A (Only one cluster found)"
        rand_index = "N/A (Only one cluster found)"
    return n_clusters, silhouette, rand_index
# Function to run Hierarchical clustering
def run_hierarchical(X, y_true):
    """Cluster X with agglomerative clustering (5 clusters) and score it.

    Args:
        X: 2-D feature array (e.g. PCA-reduced data).
        y_true: ground-truth labels for the Adjusted Rand Index.

    Returns:
        tuple: (n_clusters, silhouette, rand_index); the scores are
        floats, or "N/A..." strings when fewer than two clusters emerge.
    """
    hierarchical = AgglomerativeClustering(n_clusters=5)  # Example: 5 clusters
    y_pred = hierarchical.fit_predict(X)
    n_clusters = hierarchical.n_clusters
    # BUG FIX: silhouette_score raises ValueError with a single cluster,
    # so both metrics must sit behind the label-count guard (the original
    # guarded only the Rand Index).
    if len(set(y_pred)) > 1:
        silhouette = silhouette_score(X, y_pred)
        rand_index = adjusted_rand_score(y_true, y_pred)
    else:
        silhouette = "N/A (Only one cluster found)"
        rand_index = "N/A (Only one cluster found)"
    return n_clusters, silhouette, rand_index
# Function to run DBSCAN clustering
def run_dbscan(X, y_true):
    """Cluster X with DBSCAN (eps=0.5, min_samples=5) and score it.

    Args:
        X: 2-D feature array (e.g. PCA-reduced data).
        y_true: ground-truth labels for the Adjusted Rand Index.

    Returns:
        tuple: (n_clusters, silhouette, rand_index); the scores are
        floats, or "N/A..." strings when fewer than two clusters emerge.
    """
    labels = DBSCAN(eps=0.5, min_samples=5).fit_predict(X)  # Example parameters
    # Noise points are labelled -1 and do not count as a cluster.
    n_clusters = len(set(labels) - {-1})
    if n_clusters <= 1:
        # Both metrics are undefined with fewer than two clusters.
        note = "N/A (Only one cluster found)"
        return n_clusters, note, note
    return (
        n_clusters,
        silhouette_score(X, labels),
        adjusted_rand_score(y_true, labels),
    )
# Function to run Gaussian Mixture clustering
def run_gaussian_mixture(X, y_true):
    """Fit a 5-component Gaussian Mixture and score the hard assignments.

    Args:
        X: 2-D feature array (e.g. PCA-reduced data).
        y_true: ground-truth labels for the Adjusted Rand Index.

    Returns:
        tuple: (n_clusters, silhouette, rand_index); the scores are
        floats, or "N/A..." strings when fewer than two components are
        actually used.
    """
    gaussian_mixture = GaussianMixture(n_components=5, random_state=42)  # Example: 5 components
    y_pred = gaussian_mixture.fit_predict(X)
    n_clusters = gaussian_mixture.n_components
    # BUG FIX: silhouette_score raises ValueError when every point is
    # assigned to one component, so both metrics must sit behind the
    # label-count guard (the original guarded only the Rand Index).
    if len(set(y_pred)) > 1:
        silhouette = silhouette_score(X, y_pred)
        rand_index = adjusted_rand_score(y_true, y_pred)
    else:
        silhouette = "N/A (Only one cluster found)"
        rand_index = "N/A (Only one cluster found)"
    return n_clusters, silhouette, rand_index
# Main Streamlit app
def main():
    """Streamlit entry point: load, clean, visualize, and cluster the data."""
    st.title("Customer Segmentation App")
    # Load data
    df = load_data()
    # Data cleaning and validation
    df = handle_mixed_types(df)
    df = handle_nulls(df)
    df = check_data_types(df)
    # NOTE(review): the original called handle_mixed_types a second time
    # here; after the steps above every column already holds a single
    # type, so the redundant pass was removed.
    # Visualize data
    visualize_data(df)
    # Preprocess data
    X_pca, y_true = preprocess_data_with_pca(df)
    # One tab per clustering algorithm, all rendered the same way.
    tab1, tab2, tab3, tab4 = st.tabs(["K-Means", "Hierarchical", "DBSCAN", "Gaussian Mixture"])
    algorithms = [
        (tab1, run_kmeans),
        (tab2, run_hierarchical),
        (tab3, run_dbscan),
        (tab4, run_gaussian_mixture),
    ]
    for tab, runner in algorithms:
        with tab:
            n_clusters, silhouette, rand_index = runner(X_pca, y_true)
            st.write(f"Number of Clusters: {n_clusters}")
            # The runners may return "N/A..." strings instead of floats
            # (e.g. DBSCAN finding a single cluster); the original's
            # unconditional :.3f format would raise on those.
            if isinstance(silhouette, str):
                st.write(f"Silhouette Score: {silhouette}")
            else:
                st.write(f"Silhouette Score: {silhouette:.3f}")
            st.write(f"Rand Index: {rand_index}")
if __name__ == "__main__":
    main()