xxnithicxx's picture
Add KMeans clustering analysis for wine quality dataset
ebcdddb
#!/usr/bin/env python3
"""
Simple KMeans demo with PCA for wine quality dataset
"""
import pandas as pd
import numpy as np
import json
def simple_pca(X, n_components=2):
    """Project ``X`` onto its leading principal components.

    The columns of ``X`` are mean-centred, the covariance matrix of the
    centred data is eigendecomposed, and the eigenvectors belonging to the
    ``n_components`` largest eigenvalues are used as the projection basis.

    Args:
        X: 2-D array of shape (n_samples, n_features).
        n_components: Number of leading components to keep.

    Returns:
        A tuple ``(X_pca, components, explained_variance_ratio)``:
        the projected data, the kept eigenvectors (one per column), and each
        kept eigenvalue divided by the sum of all eigenvalues.
    """
    centered = X - np.mean(X, axis=0)

    # eigh is used because a covariance matrix is symmetric.
    variances, directions = np.linalg.eigh(np.cov(centered.T))

    # Reorder so the direction with the largest variance comes first.
    order = np.argsort(variances)[::-1]
    ranked_variances = variances[order]
    ranked_directions = directions[:, order]

    basis = ranked_directions[:, :n_components]
    projected = centered @ basis
    ratio = ranked_variances[:n_components] / np.sum(ranked_variances)
    return projected, basis, ratio
def _assign_to_nearest(X, centroids):
    """Return, for every row of ``X``, the index of its closest centroid."""
    # Broadcast to shape (k, n_samples, n_features). Squared distances are
    # enough here because only their ordering matters for argmin.
    sq_dist = ((X[np.newaxis, :, :] - centroids[:, np.newaxis, :]) ** 2).sum(axis=2)
    return np.argmin(sq_dist, axis=0)


def simple_kmeans(X, k, max_iters=100, random_state=42):
    """Cluster the rows of ``X`` into ``k`` groups with Lloyd's algorithm.

    Initial centroids are ``k`` distinct rows of ``X`` drawn with a private
    random generator, so the process-wide NumPy random state is untouched.
    The same ``random_state`` still selects the same rows that seeding the
    global generator would have selected.

    Args:
        X: 2-D array-like of shape (n_samples, n_features).
        k: Number of clusters; must satisfy ``1 <= k <= n_samples``.
        max_iters: Maximum number of assign/update rounds.
        random_state: Seed for the centroid initialisation.

    Returns:
        A tuple ``(labels, centroids, inertia)``: the cluster index of each
        sample, the centroid coordinates with shape (k, n_features), and the
        sum of squared distances from each sample to its assigned centroid.

    Raises:
        ValueError: If ``X`` is not 2-D or ``k`` is outside
            ``1..n_samples``.
    """
    X = np.asarray(X, dtype=float)
    if X.ndim != 2:
        raise ValueError(f"X must be 2-dimensional, got {X.ndim} dimension(s)")
    n_samples = X.shape[0]
    if not 1 <= k <= n_samples:
        raise ValueError(
            f"k must be between 1 and the number of samples ({n_samples}), got {k}"
        )

    rng = np.random.RandomState(random_state)
    centroids = X[rng.choice(n_samples, k, replace=False)]

    for _ in range(max_iters):
        labels = _assign_to_nearest(X, centroids)

        # Start from the previous centroids so that a cluster which lost all
        # of its members keeps its position instead of becoming NaN.
        new_centroids = centroids.copy()
        for i in range(k):
            members = X[labels == i]
            if len(members):
                new_centroids[i] = members.mean(axis=0)

        if np.allclose(centroids, new_centroids):
            break
        centroids = new_centroids

    # Re-assign once more so the returned labels always match the returned
    # centroids, including when max_iters is 0 or the iteration cap was hit.
    labels = _assign_to_nearest(X, centroids)
    inertia = float(np.sum((X - centroids[labels]) ** 2))
    return labels, centroids, inertia
def standardize_data(X):
    """Scale each column of ``X`` to zero mean and unit standard deviation.

    Columns whose standard deviation is exactly zero (constant columns) are
    only centred, so they come out as all zeros rather than NaN.

    Args:
        X: Array of shape (n_samples, n_features).

    Returns:
        The standardized data, with the same shape as ``X``.
    """
    mean = np.mean(X, axis=0)
    std = np.std(X, axis=0)
    # Dividing by a zero std would turn the whole column into NaN (0 / 0);
    # substituting 1 leaves the already-centred zeros untouched.
    safe_std = np.where(std == 0, 1.0, std)
    return (X - mean) / safe_std
def _print_analysis_summary(df, numeric_cols, explained_var, components, results):
    """Print the human-readable report for a finished analysis run."""
    print("\n" + "=" * 50)
    print("📊 WINE QUALITY KMEANS ANALYSIS SUMMARY")
    print("=" * 50)
    print(f"Dataset: {len(df)} samples, {len(numeric_cols)} features")
    if 'wine_type' in df.columns:
        # to_dict() yields plain Python ints, so the printed dict does not
        # contain NumPy scalar reprs.
        print(f"Wine types: {df['wine_type'].value_counts().to_dict()}")

    print("\nPCA Results:")
    print(f" • PC1 explains {explained_var[0]:.1%} of variance")
    print(f" • PC2 explains {explained_var[1]:.1%} of variance")
    print(f" • Total: {sum(explained_var):.1%} of variance captured")

    print("\nKMeans Results:")
    for k, result in results.items():
        print(f" • K={k}: Inertia = {result['inertia']:.2f}")

    # Rank features by the magnitude of their loading on the first component.
    print("\nTop 5 features for PC1:")
    ranked = sorted(
        ((abs(weight), name, weight)
         for name, weight in zip(numeric_cols, components[:, 0])),
        reverse=True,
    )
    for rank, (_, feature, val) in enumerate(ranked[:5], start=1):
        print(f" {rank}. {feature}: {val:.3f}")


def analyze_wine_data(data_path='data/winequality-merged.csv',
                      output_path='data/kmeans_results.json',
                      k_values=(2, 3, 4, 5)):
    """Run the PCA + KMeans analysis and write the results as JSON.

    Loads the CSV at ``data_path``, standardizes every numeric column,
    projects the data onto two principal components, runs KMeans once per
    value in ``k_values`` on the standardized data, saves everything to
    ``output_path`` and prints a summary.

    Args:
        data_path: CSV file to analyse.
        output_path: Destination of the JSON results file.
        k_values: Cluster counts to evaluate.

    Returns:
        The dictionary that was written to ``output_path``, or ``None`` if
        the dataset could not be loaded.
    """
    # Load data; a missing or unparsable file is reported, not raised.
    try:
        df = pd.read_csv(data_path)
    except (OSError, ValueError) as e:
        print(f"❌ Error loading data: {e}")
        return None
    print(f"✓ Loaded dataset with {len(df)} samples")

    # Every numeric column is used as a feature. 'wine_type' is left out only
    # when it is stored as a non-numeric column.
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    X = df[numeric_cols].values
    print(f"✓ Using {len(numeric_cols)} numeric features: {numeric_cols}")

    X_scaled = standardize_data(X)
    print("✓ Data standardized")

    X_pca, components, explained_var = simple_pca(X_scaled, n_components=2)
    print(f"✓ PCA applied - Explained variance: {explained_var[0]:.3f}, {explained_var[1]:.3f}")
    print(f"✓ Total variance explained: {sum(explained_var):.1%}")

    # Centroids live in standardized feature space; to plot them next to
    # X_pca they need the same centring and projection that simple_pca
    # applied to the data. The mean does not depend on k, so compute it once.
    feature_mean = np.mean(X_scaled, axis=0)
    results = {}
    for k in k_values:
        labels, centroids, inertia = simple_kmeans(X_scaled, k, random_state=42)
        centroids_pca = (centroids - feature_mean) @ components
        results[k] = {
            'labels': labels.tolist(),
            'centroids_pca': centroids_pca.tolist(),
            'inertia': float(inertia),
        }
        print(f"✓ KMeans with k={k} - Inertia: {inertia:.2f}")

    output_data = {
        'pca_data': X_pca.tolist(),
        'explained_variance': explained_var.tolist(),
        'feature_names': numeric_cols,
        'wine_types': df['wine_type'].tolist() if 'wine_type' in df.columns else None,
        'kmeans_results': results,
        'pca_components': components.tolist(),
    }

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(output_data, f, indent=2)
    print(f"✓ Results saved to '{output_path}'")

    _print_analysis_summary(df, numeric_cols, explained_var, components, results)
    return output_data
if __name__ == "__main__":
    # analyze_wine_data() returns None when the dataset could not be loaded;
    # surface that as a non-zero exit status so shell callers can detect it.
    if analyze_wine_data() is None:
        raise SystemExit(1)