#!/usr/bin/env python3
"""
Simple KMeans demo with PCA for wine quality dataset
"""

import json

import numpy as np
import pandas as pd


def simple_pca(X, n_components=2):
    """Project data onto its leading principal components.

    Parameters
    ----------
    X : array-like, shape (n_samples, n_features)
        Input data. It is mean-centered internally; it is not rescaled.
    n_components : int, default 2
        Number of leading components to keep.

    Returns
    -------
    X_pca : ndarray, shape (n_samples, n_components)
        Centered data projected onto the selected components.
    components : ndarray, shape (n_features, n_components)
        Eigenvectors of the covariance matrix, one per column, ordered by
        decreasing eigenvalue.
    explained_variance_ratio : ndarray, shape (n_components,)
        Share of the total variance carried by each kept component
        (all zeros when the data has no variance at all).
    """
    X = np.asarray(X, dtype=float)

    # Center the data
    X_centered = X - X.mean(axis=0)

    # np.cov collapses to a 0-d array for a single feature; eigh needs 2-D.
    cov_matrix = np.atleast_2d(np.cov(X_centered.T))

    # eigh is appropriate because a covariance matrix is symmetric.
    eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)

    # eigh returns ascending eigenvalues; reorder to descending.
    order = np.argsort(eigenvalues)[::-1]
    eigenvalues = eigenvalues[order]
    eigenvectors = eigenvectors[:, order]

    # Select top n_components and project
    components = eigenvectors[:, :n_components]
    X_pca = X_centered @ components

    # Guard the ratio against a zero denominator (constant data).
    kept = eigenvalues[:n_components]
    total_variance = eigenvalues.sum()
    if total_variance > 0:
        explained_variance_ratio = kept / total_variance
    else:
        explained_variance_ratio = np.zeros_like(kept)

    return X_pca, components, explained_variance_ratio


def _assign_labels(X, centroids):
    """Return, for each row of X, the index of its nearest centroid."""
    # Broadcasting gives shape (k, n_samples, n_features); squared distances
    # are enough because argmin is unaffected by the square root.
    sq_distances = ((X - centroids[:, np.newaxis]) ** 2).sum(axis=2)
    return np.argmin(sq_distances, axis=0)


def simple_kmeans(X, k, max_iters=100, random_state=42):
    """Cluster the rows of X with Lloyd's KMeans algorithm.

    Parameters
    ----------
    X : array-like, shape (n_samples, n_features)
        Data to cluster.
    k : int
        Number of clusters. Must not exceed n_samples, because the initial
        centroids are drawn from the data without replacement.
    max_iters : int, default 100
        Upper bound on the number of assign/update iterations.
    random_state : int or None, default 42
        Seed for the centroid initialization.

    Returns
    -------
    labels : ndarray, shape (n_samples,)
        Index of the nearest returned centroid for every sample.
    centroids : ndarray, shape (k, n_features)
        Final cluster centers.
    inertia : float
        Sum of squared distances from each sample to its assigned centroid.

    Raises
    ------
    ValueError
        Raised by the random draw when k is larger than n_samples.
    """
    X = np.asarray(X, dtype=float)

    # A private RandomState yields the same draws as np.random.seed(...)
    # followed by np.random.choice, without touching the global RNG.
    rng = np.random.RandomState(random_state)

    # Initialize centroids from k distinct samples
    n_samples = X.shape[0]
    centroids = X[rng.choice(n_samples, k, replace=False)]

    for _ in range(max_iters):
        labels = _assign_labels(X, centroids)

        # A cluster that lost all its points keeps its previous centroid;
        # taking the mean of an empty slice would produce NaN.
        new_centroids = np.array([
            X[labels == i].mean(axis=0) if np.any(labels == i) else centroids[i]
            for i in range(k)
        ])

        # Check for convergence
        if np.allclose(centroids, new_centroids):
            break

        centroids = new_centroids

    # Assign against the centroids actually returned so labels, centroids and
    # inertia agree even when the iteration cap was hit (or max_iters is 0).
    labels = _assign_labels(X, centroids)

    inertia = float(
        sum(np.sum((X[labels == i] - centroids[i]) ** 2) for i in range(k))
    )

    return labels, centroids, inertia


def standardize_data(X):
    """Standardize each column of X to mean 0 and standard deviation 1.

    Columns with zero standard deviation (constant columns) are only
    centered, so they become all zeros instead of NaN/inf.
    """
    X = np.asarray(X, dtype=float)
    std = X.std(axis=0)
    safe_std = np.where(std == 0, 1.0, std)
    return (X - X.mean(axis=0)) / safe_std


def analyze_wine_data(input_path='data/winequality-merged.csv',
                      output_path='data/kmeans_results.json',
                      k_values=(2, 3, 4, 5)):
    """Run the PCA + KMeans analysis and write the results as JSON.

    Parameters
    ----------
    input_path : str, default 'data/winequality-merged.csv'
        CSV file to load.
    output_path : str, default 'data/kmeans_results.json'
        Destination of the JSON results file.
    k_values : iterable of int, default (2, 3, 4, 5)
        Cluster counts to run KMeans with.

    Returns
    -------
    dict or None
        The dictionary that was written to ``output_path``, or None when the
        input could not be loaded or has fewer than two numeric feature
        columns.
    """
    # Load data. This is the script's top-level boundary, so any load failure
    # is reported and turned into a None result rather than a traceback.
    try:
        df = pd.read_csv(input_path)
    except Exception as e:
        print(f"❌ Error loading data: {e}")
        return None
    print(f"✓ Loaded dataset with {len(df)} samples")

    # Prepare features (exclude wine_type, even if it is numerically encoded)
    numeric_cols = [
        col for col in df.select_dtypes(include=[np.number]).columns.tolist()
        if col != 'wine_type'
    ]
    if len(numeric_cols) < 2:
        # The report below reads two principal components.
        print("❌ Need at least 2 numeric feature columns for PCA")
        return None
    X = df[numeric_cols].values
    print(f"✓ Using {len(numeric_cols)} numeric features: {numeric_cols}")

    # Standardize data
    X_scaled = standardize_data(X)
    print("✓ Data standardized")

    # Apply PCA
    X_pca, components, explained_var = simple_pca(X_scaled, n_components=2)
    print(f"✓ PCA applied - Explained variance: {explained_var[0]:.3f}, {explained_var[1]:.3f}")
    print(f"✓ Total variance explained: {sum(explained_var):.1%}")

    # Apply KMeans with different K values
    feature_means = np.mean(X_scaled, axis=0)  # loop-invariant
    results = {}
    for k in k_values:
        labels, centroids, inertia = simple_kmeans(X_scaled, k, random_state=42)

        # Transform centroids to PCA space using the same centering as the PCA
        centroids_pca = (centroids - feature_means) @ components

        results[k] = {
            'labels': labels.tolist(),
            'centroids_pca': centroids_pca.tolist(),
            'inertia': inertia,
        }

        print(f"✓ KMeans with k={k} - Inertia: {inertia:.2f}")

    # Prepare output data
    has_wine_type = 'wine_type' in df.columns
    output_data = {
        'pca_data': X_pca.tolist(),
        'explained_variance': explained_var.tolist(),
        'feature_names': numeric_cols,
        'wine_types': df['wine_type'].tolist() if has_wine_type else None,
        'kmeans_results': results,
        'pca_components': components.tolist(),
    }

    # Save results (integer K keys are written as JSON strings)
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(output_data, f, indent=2)

    print(f"✓ Results saved to '{output_path}'")

    # Print summary
    print("\n" + "="*50)
    print("📊 WINE QUALITY KMEANS ANALYSIS SUMMARY")
    print("="*50)
    print(f"Dataset: {len(df)} samples, {len(numeric_cols)} features")

    if has_wine_type:
        # to_dict() yields plain Python ints instead of NumPy scalar reprs.
        wine_counts = df['wine_type'].value_counts()
        print(f"Wine types: {wine_counts.to_dict()}")

    print("\nPCA Results:")
    print(f" • PC1 explains {explained_var[0]:.1%} of variance")
    print(f" • PC2 explains {explained_var[1]:.1%} of variance")
    print(f" • Total: {sum(explained_var):.1%} of variance captured")

    print("\nKMeans Results:")
    for k, result in results.items():
        print(f" • K={k}: Inertia = {result['inertia']:.2f}")

    # Feature importance in PCA: rank features by absolute PC1 loading
    print("\nTop 5 features for PC1:")
    pc1_loadings = components[:, 0]
    feature_importance = sorted(
        zip(np.abs(pc1_loadings), numeric_cols, pc1_loadings),
        reverse=True,
    )
    for rank, (_, feature, val) in enumerate(feature_importance[:5], start=1):
        print(f" {rank}. {feature}: {val:.3f}")

    return output_data


if __name__ == "__main__":
    analyze_wine_data()