|
|
|
|
|
""" |
|
|
Simple KMeans demo with PCA for wine quality dataset |
|
|
""" |
|
|
|
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import json |
|
|
|
|
|
def simple_pca(X, n_components=2):
    """Project X onto its leading principal components.

    Args:
        X: (n_samples, n_features) data matrix.
        n_components: number of components to keep.

    Returns:
        (X_pca, components, explained_variance_ratio) where X_pca is the
        centered data projected onto the component basis, components is the
        (n_features, n_components) loading matrix, and the ratio entries are
        each kept eigenvalue divided by the total variance.
    """
    # Center once; both the covariance and the projection use this.
    centered = X - np.mean(X, axis=0)

    # eigh returns eigenvalues in ascending order for the symmetric
    # covariance matrix, so reorder to descending variance.
    evals, evecs = np.linalg.eigh(np.cov(centered.T))
    order = np.argsort(evals)[::-1]
    evals, evecs = evals[order], evecs[:, order]

    # Keep only the top-variance directions as the projection basis.
    basis = evecs[:, :n_components]

    return centered @ basis, basis, evals[:n_components] / np.sum(evals)
|
|
|
|
|
def simple_kmeans(X, k, max_iters=100, random_state=42):
    """Cluster the rows of X into k groups with Lloyd's algorithm.

    Args:
        X: (n_samples, n_features) data matrix.
        k: number of clusters (must be <= n_samples).
        max_iters: cap on assignment/update iterations.
        random_state: seed for the initial centroid draw.

    Returns:
        (labels, centroids, inertia): per-sample cluster indices, the
        (k, n_features) centroid matrix, and the within-cluster sum of
        squared distances.
    """
    # Legacy global seeding kept deliberately so existing runs reproduce.
    np.random.seed(random_state)

    n_samples, n_features = X.shape
    # Initialize centroids as k distinct randomly chosen samples.
    centroids = X[np.random.choice(n_samples, k, replace=False)]

    for _ in range(max_iters):
        # (k, n_samples) Euclidean distances from every centroid to every point.
        distances = np.sqrt(((X - centroids[:, np.newaxis])**2).sum(axis=2))
        labels = np.argmin(distances, axis=0)

        # Bug fix: an empty cluster used to yield a NaN centroid
        # (mean over zero rows); keep its previous centroid instead.
        new_centroids = np.array([
            X[labels == i].mean(axis=0) if np.any(labels == i) else centroids[i]
            for i in range(k)
        ])

        if np.allclose(centroids, new_centroids):
            break

        centroids = new_centroids
    else:
        # Bug fix: when max_iters is exhausted without convergence, the
        # centroids were updated one step past the last label assignment —
        # recompute labels so the returned triple is self-consistent.
        distances = np.sqrt(((X - centroids[:, np.newaxis])**2).sum(axis=2))
        labels = np.argmin(distances, axis=0)

    # Within-cluster sum of squared distances (same objective KMeans minimizes).
    inertia = sum(np.sum((X[labels == i] - centroids[i])**2) for i in range(k))

    return labels, centroids, inertia
|
|
|
|
|
def standardize_data(X):
    """Standardize data to have mean=0 and std=1 per column.

    Bug fix: a constant (zero-variance) column previously caused a
    divide-by-zero, filling that column with NaN/inf. Such columns are
    now centered but divided by 1, yielding all zeros.

    Args:
        X: (n_samples, n_features) numeric array.

    Returns:
        Array of the same shape with each column standardized.
    """
    std = np.std(X, axis=0)
    # Substitute 1 for zero std so constant columns map to 0, not NaN.
    std = np.where(std == 0, 1.0, std)
    return (X - np.mean(X, axis=0)) / std
|
|
|
|
|
def analyze_wine_data():
    """Load the merged wine-quality CSV, standardize it, run PCA and KMeans,
    write the results to a JSON file, and print a console summary.

    Returns:
        The dict that was serialized to 'data/kmeans_results.json',
        or None when the CSV could not be loaded.
    """
    # Load the dataset; bail out early (returning None) on any read failure.
    try:
        df = pd.read_csv('data/winequality-merged.csv')
        print(f"β Loaded dataset with {len(df)} samples")
    except Exception as e:
        print(f"β Error loading data: {e}")
        return None

    # Use every numeric column as a feature (includes 'quality' if numeric).
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    X = df[numeric_cols].values

    print(f"β Using {len(numeric_cols)} numeric features: {numeric_cols}")

    # Standardize so all features contribute equally to distances/variance.
    X_scaled = standardize_data(X)
    print("β Data standardized")

    # Reduce to two components for visualization; clustering below still
    # runs in the full standardized feature space.
    X_pca, components, explained_var = simple_pca(X_scaled, n_components=2)
    print(f"β PCA applied - Explained variance: {explained_var[0]:.3f}, {explained_var[1]:.3f}")
    print(f"β Total variance explained: {sum(explained_var):.1%}")

    # Run KMeans for a small sweep of cluster counts.
    results = {}
    for k in [2, 3, 4, 5]:
        labels, centroids, inertia = simple_kmeans(X_scaled, k, random_state=42)

        # Project centroids into the same PCA plane as the data, centering
        # with the same mean simple_pca subtracted.
        centroids_centered = centroids - np.mean(X_scaled, axis=0)
        centroids_pca = centroids_centered @ components

        results[k] = {
            'labels': labels.tolist(),
            'centroids_pca': centroids_pca.tolist(),
            'inertia': inertia
        }

        print(f"β KMeans with k={k} - Inertia: {inertia:.2f}")

    # Bundle everything downstream plotting/reporting needs into one dict.
    # 'wine_types' is None when the column is absent from the CSV.
    output_data = {
        'pca_data': X_pca.tolist(),
        'explained_variance': explained_var.tolist(),
        'feature_names': numeric_cols,
        'wine_types': df['wine_type'].tolist() if 'wine_type' in df.columns else None,
        'kmeans_results': results,
        'pca_components': components.tolist()
    }

    # Persist results next to the input data for later visualization.
    with open('data/kmeans_results.json', 'w') as f:
        json.dump(output_data, f, indent=2)

    print("β Results saved to 'data/kmeans_results.json'")

    # ---- console summary ----
    print("\n" + "="*50)
    print("π WINE QUALITY KMEANS ANALYSIS SUMMARY")
    print("="*50)
    print(f"Dataset: {len(df)} samples, {len(numeric_cols)} features")
    if 'wine_type' in df.columns:
        wine_counts = df['wine_type'].value_counts()
        print(f"Wine types: {dict(wine_counts)}")

    print(f"\nPCA Results:")
    print(f" β’ PC1 explains {explained_var[0]:.1%} of variance")
    print(f" β’ PC2 explains {explained_var[1]:.1%} of variance")
    print(f" β’ Total: {sum(explained_var):.1%} of variance captured")

    print(f"\nKMeans Results:")
    for k, result in results.items():
        print(f" β’ K={k}: Inertia = {result['inertia']:.2f}")

    # Rank features by absolute loading on PC1 (ties fall back to tuple
    # comparison on name, then signed value).
    print(f"\nTop 5 features for PC1:")
    feature_importance = [(abs(components[i, 0]), numeric_cols[i], components[i, 0])
                          for i in range(len(numeric_cols))]
    feature_importance.sort(reverse=True)

    for i, (abs_val, feature, val) in enumerate(feature_importance[:5]):
        print(f" {i+1}. {feature}: {val:.3f}")

    return output_data
|
|
|
|
|
if __name__ == "__main__": |
|
|
analyze_wine_data() |
|
|
|