Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import streamlit as st | |
| from scipy.sparse import csr_matrix, issparse | |
| from scipy.sparse.linalg import svds | |
| from sklearn.decomposition import NMF | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from sklearn.metrics import mean_squared_error, r2_score | |
| from implicit.als import AlternatingLeastSquares | |
def train_svd(_train_sparse, n_factors: int = 50):
    """Truncated SVD on the user-item matrix.

    Args:
        _train_sparse: 2-D user-item matrix (anything exposing ``shape`` and
            ``astype`` that ``scipy.sparse.linalg.svds`` accepts).
        n_factors: Requested number of latent factors. It is capped at
            ``min(_train_sparse.shape) - 1`` because ``svds`` requires the
            rank to be strictly smaller than both matrix dimensions.

    Returns:
        Tuple ``(predicted, U, sigma, Vt)`` where ``predicted`` is the dense
        low-rank reconstruction ``U @ diag(sigma) @ Vt`` and the remaining
        items are the raw factors returned by ``svds``.

    Raises:
        ValueError: If the capped rank is smaller than 1 (matrix with fewer
            than two rows or columns, or a non-positive ``n_factors``).
    """
    k = min(n_factors, min(_train_sparse.shape) - 1)
    if k < 1:
        raise ValueError(
            "train_svd needs n_factors >= 1 and a matrix with at least 2 rows "
            f"and 2 columns; got n_factors={n_factors}, "
            f"shape={tuple(_train_sparse.shape)}"
        )
    U, sigma, Vt = svds(_train_sparse.astype(float), k=k)
    # Scaling the columns of U by sigma is equivalent to U @ diag(sigma)
    # without materialising the k x k diagonal matrix.
    predicted = (U * sigma) @ Vt
    return predicted, U, sigma, Vt
def train_als(_train_sparse, n_factors: int = 50, iterations: int = 15, regularization: float = 0.1):
    """Fit an implicit-feedback ALS model and expose its factors.

    The model is an ``implicit.als.AlternatingLeastSquares`` instance built
    with ``alpha=1.0`` and ``random_state=42``. It is fitted on the
    transpose of ``_train_sparse``; the returned wrapper then swaps the two
    factor matrices so that ``user_factors`` / ``item_factors`` line up with
    the rows / columns of ``_train_sparse``.

    NOTE(review): whether ``fit`` expects a user-item or an item-user matrix
    depends on the installed ``implicit`` version. The transpose-then-swap
    used here is only correct if ``fit`` treats the rows of its input as
    users -- confirm against the pinned version.

    Per the original author's note, the confidence weighting is
    ``C = 1 + alpha * R``, and ``alpha=1.0`` was chosen as a more
    conservative setting than ``alpha=40``.

    Args:
        _train_sparse: User-item interaction matrix (must support ``.T``).
        n_factors: Number of latent factors.
        iterations: Number of ALS iterations.
        regularization: Regularization strength passed to the model.

    Returns:
        An ``ALSModel`` object with ``user_factors`` and ``item_factors``
        attributes.
    """

    class ALSModel:
        """Thin view over a fitted model with the factor roles swapped back."""

        def __init__(self, implicit_model):
            # The model was fitted on the transposed matrix, so what it calls
            # "items" are the rows (users) of the original matrix, and what
            # it calls "users" are the columns (items).
            self.user_factors = implicit_model.item_factors
            self.item_factors = implicit_model.user_factors

    als = AlternatingLeastSquares(
        factors=n_factors,
        iterations=iterations,
        regularization=regularization,
        alpha=1.0,  # confidence scaling factor
        random_state=42,
    )
    als.fit(_train_sparse.T)
    return ALSModel(als)
def train_sgd(
    _train_sparse,
    n_factors: int = 50,
    lr: float = 0.001,
    reg: float = 0.02,
    n_epochs: int = 100,
):
    """SGD-based matrix factorization with bias terms (Funk SVD).

    The model is ``predicted = mu + b_u + b_i + u . v`` and is trained on
    ``log1p``-transformed values to stabilize training on purchase-count
    data where raw values can be large and variable. Only the non-zero
    entries of the input are used as training observations.

    Args:
        _train_sparse: User-item matrix, either a SciPy sparse matrix or
            something ``np.asarray`` can turn into a 2-D float array.
            Values are assumed to be greater than -1 (``log1p`` domain).
        n_factors: Number of latent factors per user / item.
        lr: Learning rate.
        reg: L2 regularization strength applied to biases and factors.
        n_epochs: Number of passes over the observed entries.

    Returns:
        Tuple ``(predicted, U, V, b_u, b_i, mu)``. ``predicted`` is the dense
        score matrix mapped back to the original scale with ``expm1``; the
        other items are the learned parameters in log space.
    """
    if issparse(_train_sparse):
        n_users, n_items = _train_sparse.shape
        rows, cols = _train_sparse.nonzero()
        raw_vals = np.asarray(_train_sparse[rows, cols]).ravel().astype(np.float64)
    else:
        mat = np.asarray(_train_sparse, dtype=np.float64)
        n_users, n_items = mat.shape
        rows, cols = np.nonzero(mat)
        raw_vals = mat[rows, cols]
    vals = np.log1p(raw_vals)

    # With no observations the mean is undefined (NaN) and would poison every
    # prediction; fall back to a neutral global bias instead.
    mu = float(vals.mean()) if vals.size else 0.0

    rng = np.random.RandomState(42)
    scale = 0.01
    U = rng.normal(0, scale, (n_users, n_factors))
    V = rng.normal(0, scale, (n_items, n_factors))
    b_u = np.zeros(n_users)
    b_i = np.zeros(n_items)
    clip = 5.0  # bound on the per-sample error to keep updates stable

    for _ in range(n_epochs):
        # Visit the observations in a fresh random order every epoch.
        order = rng.permutation(len(vals))
        for idx in order:
            u, i, r = int(rows[idx]), int(cols[idx]), vals[idx]
            pred = mu + b_u[u] + b_i[i] + U[u] @ V[i]
            err = np.clip(r - pred, -clip, clip)
            b_u[u] += lr * (err - reg * b_u[u])
            b_i[i] += lr * (err - reg * b_i[i])
            # Keep the pre-update user vector so the item update uses the
            # same U[u] the error was computed with.
            U_old = U[u].copy()
            U[u] += lr * (err * V[i] - reg * U[u])
            V[i] += lr * (err * U_old - reg * V[i])

    predicted_log = mu + b_u[:, None] + b_i[None, :] + U @ V.T
    predicted = np.expm1(predicted_log)  # undo the log1p transform
    return predicted, U, V, b_u, b_i, mu
def train_nmf(_train_dense, n_factors: int = 50, max_iter: int = 200):
    """Non-negative Matrix Factorization.

    Args:
        _train_dense: Non-negative 2-D user-item matrix.
        n_factors: Requested number of components. It is capped at the
            smaller matrix dimension, because scikit-learn's ``nndsvda``
            initialisation does not accept more components than
            ``min(n_samples, n_features)``.
        max_iter: Maximum number of solver iterations.

    Returns:
        Tuple ``(predicted, W, H, model)`` where ``predicted = W @ H``,
        ``W`` is the output of ``fit_transform``, ``H`` is
        ``model.components_`` and ``model`` is the fitted ``NMF`` instance.
    """
    # Cap the rank so a small matrix with the default n_factors still trains.
    n_components = min(n_factors, min(np.shape(_train_dense)))
    model = NMF(
        n_components=n_components,
        init="nndsvda",
        random_state=42,
        max_iter=max_iter,
    )
    W = model.fit_transform(_train_dense)
    H = model.components_
    predicted = W @ H
    return predicted, W, H, model
def train_item_cf(_train_sparse, top_k_similar: int = 20):
    """Item-based collaborative filtering using cosine similarity.

    Builds the item-item cosine similarity matrix from the transposed
    input, zeroes the diagonal, and then, row by row, zeroes every
    similarity below that row's ``top_k_similar``-th largest value (ties at
    the cutoff are kept). When a row has ``top_k_similar`` entries or fewer
    the cutoff is 0, so only negative similarities are zeroed.

    Args:
        _train_sparse: User-item interaction matrix (sparse or array-like).
        top_k_similar: Number of strongest neighbours kept per item row.

    Returns:
        Tuple ``(predicted, item_sim)`` where ``predicted`` is the dense
        interaction matrix multiplied by the pruned similarity matrix and
        ``item_sim`` is that pruned (dense) similarity matrix.
    """
    similarity = cosine_similarity(_train_sparse.T)
    np.fill_diagonal(similarity, 0)  # an item is not its own neighbour

    row_length = similarity.shape[1]
    for item_row in similarity:
        # item_row is a view, so the in-place assignment prunes `similarity`.
        if row_length > top_k_similar:
            cutoff = np.partition(item_row, -top_k_similar)[-top_k_similar]
        else:
            cutoff = 0
        item_row[item_row < cutoff] = 0

    if hasattr(_train_sparse, 'toarray'):
        interactions = _train_sparse.toarray()
    else:
        interactions = np.array(_train_sparse)

    predicted = interactions @ similarity
    return predicted, similarity
def get_top_n_recommendations(predicted_scores, train_matrix, user_idx: int, n: int = 10):
    """Get top-N item indices for a user, excluding already-purchased items.

    Args:
        predicted_scores: 2-D array of scores indexed as ``[user, item]``.
            It is not modified; a copy of the user's row is used.
        train_matrix: Training interactions for the same users / items. May
            be a SciPy sparse matrix, a pandas DataFrame (row selected via
            ``iloc``) or an array-like. Entries greater than 0 mark items
            the user already purchased.
        user_idx: Row index of the user.
        n: Maximum number of recommendations to return.

    Returns:
        Tuple ``(top_items, top_scores)``: item indices ordered by
        decreasing score and their scores. Fewer than ``n`` items are
        returned when the user has fewer than ``n`` unpurchased items.
    """
    scores = np.array(predicted_scores[user_idx])  # np.array copies the row
    if not np.issubdtype(scores.dtype, np.floating):
        # -inf cannot be stored in an integer array.
        scores = scores.astype(float)

    if hasattr(train_matrix, 'toarray'):
        if hasattr(train_matrix, 'tocsr'):
            # Densify only the requested row instead of the whole matrix.
            purchased = train_matrix.tocsr()[user_idx].toarray().ravel()
        else:
            purchased = train_matrix.toarray()[user_idx]
    elif hasattr(train_matrix, 'iloc'):
        purchased = np.array(train_matrix.iloc[user_idx])
    else:
        purchased = np.array(train_matrix[user_idx])

    already_bought = purchased > 0
    scores[already_bought] = -np.inf

    ranked = np.argsort(scores)[::-1]
    # Drop purchased items explicitly so they never appear in the result,
    # even when n exceeds the number of unpurchased items.
    top_items = ranked[~already_bought[ranked]][:n]
    return top_items, scores[top_items]
def evaluate_recommendations(predicted_scores, train_matrix, test_df, user_index, item_columns, k: int = 10):
    """Compute ranking and prediction-accuracy metrics on held-out data.

    Metrics:
        - Precision@K, Recall@K, Hit Rate (ranking metrics)
        - RMSE, R² (prediction accuracy metrics)

    Args:
        predicted_scores: 2-D array of predicted scores, ``[user, item]``.
        train_matrix: Training interactions (sparse or array-like); used to
            exclude already-purchased items from the recommendations.
        test_df: DataFrame with ``CustomerID``, ``StockCode`` and ``score``
            columns holding the held-out interactions.
        user_index: Mapping from customer id to row index.
        item_columns: Iterable of item codes in column order.
        k: Number of recommendations evaluated per user.

    Returns:
        Dict with keys ``"Precision@K"``, ``"Recall@K"``, ``"Hit Rate"``,
        ``"RMSE"``, ``"R²"`` and ``"Users Evaluated"``. All metrics are 0
        when no test user can be evaluated.

    Raises:
        ValueError: If ``k`` is not positive.
    """
    if k <= 0:
        raise ValueError(f"k must be a positive integer, got {k}")

    # Densify once here so the per-user lookup below does not repeat it.
    if hasattr(train_matrix, 'toarray'):
        train_dense = train_matrix.toarray()
    else:
        train_dense = np.array(train_matrix)

    item_to_col = {item: i for i, item in enumerate(item_columns)}
    n_score_rows = predicted_scores.shape[0]

    precisions = []
    recalls = []
    hits = 0
    # Paired actual / predicted values for RMSE and R².
    y_true = []
    y_pred = []

    for cust_id, group in test_df.groupby("CustomerID"):
        if cust_id not in user_index:
            continue
        user_idx = user_index[cust_id]
        if user_idx >= n_score_rows:
            continue

        true_items = set()
        # Iterating the two columns directly avoids building a Series per
        # row, which is what iterrows() does.
        for stock_code, actual_quantity in zip(group["StockCode"], group["score"]):
            if stock_code not in item_to_col:
                continue
            item_idx = item_to_col[stock_code]
            true_items.add(item_idx)
            y_true.append(actual_quantity)
            y_pred.append(predicted_scores[user_idx, item_idx])

        if not true_items:
            continue

        top_items, _ = get_top_n_recommendations(predicted_scores, train_dense, user_idx, n=k)
        hit_count = len(set(top_items) & true_items)
        precisions.append(hit_count / k)
        recalls.append(hit_count / len(true_items))
        if hit_count > 0:
            hits += 1

    n_users = len(precisions)
    if n_users == 0:
        return {
            "Precision@K": 0.0,
            "Recall@K": 0.0,
            "Hit Rate": 0.0,
            "RMSE": 0.0,
            "R²": 0.0,
            "Users Evaluated": 0
        }

    rmse = 0.0
    r2 = 0.0
    if y_true:
        y_true_arr = np.array(y_true)
        y_pred_arr = np.array(y_pred)
        # Clip predictions to a reasonable range to avoid extreme errors.
        y_pred_arr = np.clip(y_pred_arr, 0, np.percentile(y_true_arr, 99))
        rmse = float(np.sqrt(mean_squared_error(y_true_arr, y_pred_arr)))
        # R² is not well-defined for fewer than two samples, so it stays
        # at 0.0 in that case. It can be negative when the model is worse
        # than the mean baseline; negative values indicate poor fit.
        if len(y_true_arr) >= 2:
            r2 = float(r2_score(y_true_arr, y_pred_arr))

    return {
        "Precision@K": float(np.mean(precisions)),
        "Recall@K": float(np.mean(recalls)),
        "Hit Rate": float(hits / n_users),
        "RMSE": rmse,
        "R²": r2,
        "Users Evaluated": int(n_users),
    }