# File size: 5,811 Bytes (5e2aaa0)
"""
Data Loading and Preprocessing Module
====================================
This module handles data loading, preprocessing, and validation for customer segmentation.
"""
import pandas as pd
import numpy as np
import os
from sklearn.preprocessing import StandardScaler
import streamlit as st
class DataLoader:
    """
    Handles data loading and preprocessing for customer segmentation analysis.

    Attributes:
        data: DataFrame most recently loaded or generated by ``load_data``
            (``None`` until then).
        scaled_data: Array returned by the last ``preprocess_data`` call
            (``None`` until then).
        scaler: ``StandardScaler`` that is refitted on every
            ``preprocess_data`` call.
        feature_names: Column names used by the last ``preprocess_data``
            call (``None`` until then).
    """

    def __init__(self):
        self.data = None
        self.scaled_data = None
        self.scaler = StandardScaler()
        self.feature_names = None

    @staticmethod
    def _min_max_normalize(values):
        """Rescale ``values`` to [0, 1]; degenerate input maps to the midpoint.

        An empty array is returned unchanged and a constant array becomes all
        0.5, so callers never divide by zero.
        """
        values = np.asarray(values, dtype=float)
        if values.size == 0:
            return values
        low = values.min()
        span = values.max() - low
        if span == 0:
            return np.full(values.shape, 0.5)
        return (values - low) / span

    def create_sample_dataset(self, n_customers=200):
        """Create a realistic sample Mall Customers dataset.

        Args:
            n_customers: Number of rows to generate. ``0`` yields an empty
                frame with the usual columns.

        Returns:
            DataFrame with columns ``CustomerID``, ``Gender``, ``Age``,
            ``Annual Income (k$)`` and ``Spending Score (1-100)``.

        Note:
            Reseeds NumPy's global random state with 42, so repeated calls
            with the same ``n_customers`` return identical data.
        """
        np.random.seed(42)
        customer_ids = range(1, n_customers + 1)

        # Gender distribution (approximately 56% Female, 44% Male)
        genders = np.random.choice(['Male', 'Female'], n_customers, p=[0.44, 0.56])

        # Age distribution (mean ~39, std ~14), limited to 18-70
        ages = np.random.normal(38.85, 13.97, n_customers).astype(int)
        ages = np.clip(ages, 18, 70)

        # Income distribution in k$ (mean ~61, std ~26), limited to 15-137
        annual_incomes = np.random.normal(60.56, 26.26, n_customers)
        annual_incomes = np.clip(annual_incomes, 15, 137)

        # Spending score: random base plus income and age adjustments
        base_spending = np.random.normal(50, 25, n_customers)

        # Income correlation: shift of -15..+15 before the 0.6 weighting below
        income_normalized = self._min_max_normalize(annual_incomes)
        income_effect = (income_normalized - 0.5) * 30

        # Age effect: youngest 30% of the range +10, oldest 30% -5
        age_normalized = self._min_max_normalize(ages)
        age_effect = np.where(age_normalized < 0.3, 10,
                              np.where(age_normalized > 0.7, -5, 0))

        spending_scores = (base_spending + income_effect * 0.6 + age_effect
                           + np.random.normal(0, 10, n_customers))
        spending_scores = np.clip(spending_scores, 1, 100)

        sample_data = pd.DataFrame({
            'CustomerID': customer_ids,
            'Gender': genders,
            'Age': ages,
            'Annual Income (k$)': annual_incomes.round().astype(int),
            'Spending Score (1-100)': spending_scores.round().astype(int)
        })
        return sample_data

    def _read_csv(self, path, success_message, error_prefix):
        """Read ``path`` into ``self.data`` and report the outcome via Streamlit.

        Returns the loaded DataFrame, or ``None`` (leaving ``self.data``
        untouched) when reading fails.
        """
        try:
            self.data = pd.read_csv(path)
        except Exception as e:
            # UI boundary: surface any read/parse failure instead of crashing.
            st.error(f"{error_prefix}: {e}")
            return None
        st.success(success_message)
        return self.data

    def load_data(self, file_path=None):
        """Load customer data from file or create sample data.

        Resolution order: ``file_path`` if it exists, then
        ``data/Mall_Customers.csv``, then a generated sample dataset (which
        is also saved to that default location on a best-effort basis).

        Args:
            file_path: Optional path to a CSV file.

        Returns:
            The loaded or generated DataFrame, or ``None`` if an existing
            file could not be read.
        """
        default_path = os.path.join("data", "Mall_Customers.csv")

        if file_path and os.path.exists(file_path):
            return self._read_csv(
                file_path,
                f"✅ Data loaded successfully from {file_path}",
                "Error loading data",
            )

        if file_path:
            # An explicit path was requested but is missing: say so before
            # falling back, rather than silently loading something else.
            st.warning(f"File not found: {file_path}. Falling back to default data.")

        if os.path.exists(default_path):
            return self._read_csv(
                default_path,
                f"✅ Mall Customers dataset loaded from {default_path}",
                "Error loading default dataset",
            )

        # Nothing on disk: generate sample data.
        self.data = self.create_sample_dataset()
        # NOTE(review): leading emoji reconstructed from mis-encoded text — confirm.
        st.info("📊 Using generated sample dataset (Mall Customer simulation)")

        # Save the sample data for future use; failure here is not fatal.
        try:
            os.makedirs(os.path.dirname(default_path), exist_ok=True)
            self.data.to_csv(default_path, index=False)
            st.info(f"💾 Sample dataset saved to {default_path}")
        except Exception as e:
            st.warning(f"Could not save sample dataset: {e}")
        return self.data

    def get_data_info(self):
        """Get comprehensive data information.

        Returns:
            ``None`` when no data is loaded; otherwise a dict with keys
            ``shape``, ``columns``, ``dtypes``, ``missing_values`` and
            ``statistics`` (the ``describe()`` output as a dict).
        """
        if self.data is None:
            return None
        info = {
            'shape': self.data.shape,
            'columns': list(self.data.columns),
            'dtypes': self.data.dtypes.to_dict(),
            'missing_values': self.data.isnull().sum().to_dict(),
            'statistics': self.data.describe().to_dict()
        }
        return info

    def preprocess_data(self, features=None):
        """Preprocess and scale data for clustering.

        Args:
            features: Column name or list of column names to use. Defaults to
                ``['Annual Income (k$)', 'Spending Score (1-100)']``. Names
                missing from the data are skipped with a warning.

        Returns:
            The scaled feature array (also stored in ``self.scaled_data``),
            or ``None`` when no data is loaded or no requested feature exists.
        """
        if self.data is None:
            st.error("No data loaded. Please load data first.")
            return None

        # Default features for clustering
        if features is None:
            features = ['Annual Income (k$)', 'Spending Score (1-100)']
        elif isinstance(features, str):
            # A bare string would otherwise be iterated character by character.
            features = [features]

        # Keep only the features that exist in the data
        available_features = [f for f in features if f in self.data.columns]
        if not available_features:
            st.error(f"None of the specified features {features} found in data.")
            return None

        missing_features = [f for f in features if f not in available_features]
        if missing_features:
            st.warning(f"Features not found in data and skipped: {missing_features}")

        # Work on a copy so the loaded data is never modified
        X = self.data[available_features].copy()

        # Handle missing values if any
        if X.isnull().values.any():
            X = X.fillna(X.mean())
            st.warning("Missing values filled with mean values.")

        # Scale the features (refits the scaler on every call)
        self.scaled_data = self.scaler.fit_transform(X)
        self.feature_names = available_features
        st.success(f"✅ Data preprocessed successfully using features: {available_features}")
        return self.scaled_data

    def get_feature_data(self):
        """Get the original (unscaled) feature data.

        Returns:
            The columns of ``self.data`` named in ``self.feature_names``, or
            ``None`` if data has not been loaded or preprocessed yet.
        """
        if self.data is None or self.feature_names is None:
            return None
        return self.data[self.feature_names]
|