import numpy as np
import streamlit as st
from scipy.sparse import csr_matrix, issparse
from scipy.sparse.linalg import svds
from sklearn.decomposition import NMF
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import mean_squared_error, r2_score
from implicit.als import AlternatingLeastSquares


# NOTE: every cached trainer below names its data argument with a leading
# underscore. Streamlit does not hash such arguments, so the cache key is
# built from the remaining (hyper)parameters only.


@st.cache_resource
def train_svd(_train_sparse, n_factors: int = 50):
    """Truncated SVD on the user-item matrix.

    Args:
        _train_sparse: User-item matrix (rows are users, columns are items).
        n_factors: Requested number of singular triplets. It is capped at
            ``min(shape) - 1`` before being passed to ``svds``.

    Returns:
        Tuple ``(predicted, U, sigma, Vt)`` where ``predicted`` is the dense
        rank-k reconstruction ``U @ diag(sigma) @ Vt``.
    """
    k = min(n_factors, min(_train_sparse.shape) - 1)
    U, sigma, Vt = svds(_train_sparse.astype(float), k=k)
    # Scaling the columns of U by sigma equals U @ diag(sigma) without
    # materialising the k x k diagonal matrix.
    predicted = (U * sigma) @ Vt
    return predicted, U, sigma, Vt


@st.cache_resource
def train_als(_train_sparse, n_factors: int = 50, iterations: int = 15, regularization: float = 0.1):
    """
    Train ALS using the implicit library (industry-standard implementation).

    Uses Alternating Least Squares for implicit feedback collaborative filtering.
    The confidence weighting formula is: C = 1 + alpha * R
    We use alpha=1.0 (library default) which is more conservative than alpha=40.

    Args:
        _train_sparse: User-item matrix (rows are users, columns are items).
        n_factors: Number of latent factors.
        iterations: Number of ALS iterations.
        regularization: Regularization strength passed to the model.

    Returns:
        A small wrapper object exposing ``user_factors`` and ``item_factors``.
    """
    model = AlternatingLeastSquares(
        factors=n_factors,
        iterations=iterations,
        regularization=regularization,
        alpha=1.0,  # Confidence scaling factor (default, conservative)
        random_state=42
    )

    # The model is fitted on the transposed (item x user) matrix.
    # NOTE(review): the swap below is only correct if fit() treats the rows of
    # its input as "users" -- confirm against the pinned implicit version.
    model.fit(_train_sparse.T)

    class ALSModel:
        """Expose the fitted factors under the names the rest of the app expects."""

        def __init__(self, implicit_model):
            # Because the model was fitted on the transpose, its "item" side
            # corresponds to our users and its "user" side to our items.
            self.user_factors = implicit_model.item_factors
            self.item_factors = implicit_model.user_factors

    return ALSModel(model)


@st.cache_resource
def train_sgd(
    _train_sparse,
    n_factors: int = 50,
    lr: float = 0.001,
    reg: float = 0.02,
    n_epochs: int = 100,
):
    """SGD-based matrix factorization with bias terms (Funk SVD).

    predicted = mu + b_u + b_i + u . v

    Uses log1p-transformed values internally to stabilize training on
    purchase-count data where raw values can be large and variable.

    Args:
        _train_sparse: User-item matrix, either a SciPy sparse matrix or
            anything ``np.asarray`` can turn into a 2-D float array.
        n_factors: Number of latent factors.
        lr: Learning rate.
        reg: L2 regularization applied to biases and factors.
        n_epochs: Number of passes over the non-zero entries.

    Returns:
        Tuple ``(predicted, U, V, b_u, b_i, mu)``; ``predicted`` is mapped back
        from log space with ``expm1``.
    """
    if issparse(_train_sparse):
        n_users, n_items = _train_sparse.shape
        rows, cols = _train_sparse.nonzero()
        raw_vals = np.asarray(_train_sparse[rows, cols]).ravel().astype(np.float64)
    else:
        mat = np.asarray(_train_sparse, dtype=np.float64)
        n_users, n_items = mat.shape
        rows, cols = np.nonzero(mat)
        raw_vals = mat[rows, cols]

    vals = np.log1p(raw_vals)

    # With no observed entries the mean is undefined; fall back to 0 so the
    # result is finite instead of an all-NaN matrix.
    mu = float(vals.mean()) if vals.size else 0.0

    rng = np.random.RandomState(42)
    scale = 0.01
    U = rng.normal(0, scale, (n_users, n_factors))
    V = rng.normal(0, scale, (n_items, n_factors))
    b_u = np.zeros(n_users)
    b_i = np.zeros(n_items)

    clip = 5.0  # bound on the per-sample error used in the updates

    for _ in range(n_epochs):
        order = rng.permutation(len(vals))
        for idx in order:
            u, i, r = int(rows[idx]), int(cols[idx]), vals[idx]
            pred = mu + b_u[u] + b_i[i] + U[u] @ V[i]
            err = np.clip(r - pred, -clip, clip)

            b_u[u] += lr * (err - reg * b_u[u])
            b_i[i] += lr * (err - reg * b_i[i])

            # The item update must use the user vector from before this step.
            U_old = U[u].copy()
            U[u] += lr * (err * V[i] - reg * U[u])
            V[i] += lr * (err * U_old - reg * V[i])

    predicted_log = mu + b_u[:, None] + b_i[None, :] + U @ V.T
    predicted = np.expm1(predicted_log)
    return predicted, U, V, b_u, b_i, mu


@st.cache_resource
def train_nmf(_train_dense, n_factors: int = 50, max_iter: int = 200):
    """Non-negative Matrix Factorization.

    Args:
        _train_dense: User-item matrix passed to ``NMF.fit_transform``.
        n_factors: Number of components.
        max_iter: Maximum number of solver iterations.

    Returns:
        Tuple ``(predicted, W, H, model)`` with ``predicted = W @ H``.
    """
    model = NMF(n_components=n_factors, init="nndsvda", random_state=42, max_iter=max_iter)
    W = model.fit_transform(_train_dense)
    H = model.components_
    predicted = W @ H
    return predicted, W, H, model


@st.cache_resource
def train_item_cf(_train_sparse, top_k_similar: int = 20):
    """Item-based collaborative filtering using cosine similarity.

    Args:
        _train_sparse: User-item matrix (rows are users, columns are items).
        top_k_similar: Per similarity row, entries below the k-th largest
            value are zeroed (ties with the k-th value are kept).

    Returns:
        Tuple ``(predicted, item_sim)`` where ``item_sim`` is the pruned dense
        item-item similarity matrix and ``predicted = train @ item_sim``.
    """
    item_sim = cosine_similarity(_train_sparse.T)
    np.fill_diagonal(item_sim, 0)

    # The similarity matrix is square, so the "more items than k" check is the
    # same for every row and can be done once.
    if item_sim.shape[0] > top_k_similar:
        for row in item_sim:  # rows are views; edits happen in place
            threshold = np.partition(row, -top_k_similar)[-top_k_similar]
            row[row < threshold] = 0
    else:
        # Too few items to prune by rank: only negative similarities are dropped.
        item_sim[item_sim < 0] = 0

    if hasattr(_train_sparse, 'toarray'):
        train_dense = _train_sparse.toarray()
    else:
        train_dense = np.asarray(_train_sparse)
    predicted = train_dense @ item_sim
    return predicted, item_sim


def get_top_n_recommendations(predicted_scores, train_matrix, user_idx: int, n: int = 10):
    """Get top-N item indices for a user, excluding already-purchased items.

    Args:
        predicted_scores: 2-D score array indexed as ``[user, item]``.
        train_matrix: Training interactions; a SciPy sparse matrix, an object
            with ``.iloc`` (e.g. a DataFrame), or an array-like.
        user_idx: Row index of the user.
        n: Number of items to return.

    Returns:
        Tuple ``(top_items, top_scores)``: item indices sorted by descending
        score and their (masked) scores. Items with a positive training value
        are masked to ``-inf`` before ranking.
    """
    scores = predicted_scores[user_idx].copy()
    if not np.issubdtype(scores.dtype, np.floating):
        # Integer arrays cannot hold -inf; promote before masking.
        scores = scores.astype(float)

    if hasattr(train_matrix, 'toarray'):
        # Slice the single row first instead of densifying the whole matrix.
        purchased = np.asarray(train_matrix.tocsr()[user_idx].toarray()).ravel()
    elif hasattr(train_matrix, 'iloc'):
        purchased = np.array(train_matrix.iloc[user_idx])
    else:
        purchased = np.array(train_matrix[user_idx])

    scores[purchased > 0] = -np.inf
    top_items = np.argsort(scores)[::-1][:n]
    return top_items, scores[top_items]


def evaluate_recommendations(predicted_scores, train_matrix, test_df, user_index, item_columns, k: int = 10):
    """
    Compute recommendation metrics including:
    - Precision@K, Recall@K, Hit Rate (ranking metrics)
    - RMSE, R² (prediction accuracy metrics)

    Args:
        predicted_scores: 2-D score array indexed as ``[user, item]``.
        train_matrix: Training interactions (sparse matrix or array-like),
            used to exclude already-purchased items from the ranking.
        test_df: DataFrame with ``CustomerID``, ``StockCode`` and ``score``
            columns.
        user_index: Mapping from customer id to row index.
        item_columns: Item identifiers in column order.
        k: Cut-off for the ranking metrics.

    Returns:
        Dict with keys ``Precision@K``, ``Recall@K``, ``Hit Rate``, ``RMSE``,
        ``R²`` and ``Users Evaluated``. All metrics are 0 when no test user
        could be evaluated.
    """
    if hasattr(train_matrix, 'toarray'):
        train_dense = train_matrix.toarray()
    else:
        train_dense = np.array(train_matrix)

    item_to_col = {item: i for i, item in enumerate(item_columns)}

    precisions = []
    recalls = []
    hits = 0
    n_users = 0

    # For RMSE and R² calculation
    y_true = []
    y_pred = []

    for cust_id, group in test_df.groupby("CustomerID"):
        if cust_id not in user_index:
            continue
        user_idx = user_index[cust_id]
        if user_idx >= predicted_scores.shape[0]:
            continue

        true_items = set()
        # Walk the two needed columns directly rather than building a Series
        # per row with iterrows().
        for stock_code, actual_quantity in zip(group["StockCode"], group["score"]):
            if stock_code not in item_to_col:
                continue
            item_idx = item_to_col[stock_code]
            true_items.add(item_idx)

            # Collect actual vs predicted quantities for RMSE/R²
            y_true.append(actual_quantity)
            y_pred.append(predicted_scores[user_idx, item_idx])

        if not true_items:
            continue

        top_items, _ = get_top_n_recommendations(predicted_scores, train_dense, user_idx, n=k)
        hit_count = len(set(top_items) & true_items)

        precisions.append(hit_count / k)
        recalls.append(hit_count / len(true_items))  # true_items is non-empty here
        if hit_count > 0:
            hits += 1
        n_users += 1

    if n_users == 0:
        return {
            "Precision@K": 0.0,
            "Recall@K": 0.0,
            "Hit Rate": 0.0,
            "RMSE": 0.0,
            "R²": 0.0,
            "Users Evaluated": 0
        }

    # Calculate RMSE and R²
    rmse = 0.0
    r2 = 0.0
    if len(y_true) > 0:
        y_true_arr = np.array(y_true)
        y_pred_arr = np.array(y_pred)

        # Clip predictions to reasonable range to avoid extreme errors
        y_pred_arr = np.clip(y_pred_arr, 0, np.percentile(y_true_arr, 99))

        rmse = float(np.sqrt(mean_squared_error(y_true_arr, y_pred_arr)))

        # R² can be negative if model is worse than mean baseline
        # We calculate it but note that negative values indicate poor fit
        r2 = float(r2_score(y_true_arr, y_pred_arr))

    return {
        "Precision@K": float(np.mean(precisions)),
        "Recall@K": float(np.mean(recalls)),
        "Hit Rate": float(hits / n_users),
        "RMSE": rmse,
        "R²": r2,
        "Users Evaluated": int(n_users),
    }