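# ml-demo/utils/rec_models.py
# Page residue from the repository viewer, kept as comments:
#   avatar: aliarafat-stack-ml's picture
#   last commit message: "fixed model training error"
#   commit: 42328a2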
import numpy as np
import streamlit as st
from scipy.sparse import csr_matrix, issparse
from scipy.sparse.linalg import svds
from sklearn.decomposition import NMF
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import mean_squared_error, r2_score
from implicit.als import AlternatingLeastSquares
@st.cache_resource
def train_svd(_train_sparse, n_factors: int = 50):
    """Fit a truncated SVD to the user-item matrix and rebuild it densely.

    The requested rank is capped at ``min(_train_sparse.shape) - 1`` before
    it is handed to ``scipy.sparse.linalg.svds``.

    Args:
        _train_sparse: User-item matrix; it is cast to float before the
            decomposition.
        n_factors: Upper bound on the number of singular triplets to compute.

    Returns:
        A 4-tuple ``(predicted, U, sigma, Vt)``. ``predicted`` is the dense
        product ``U @ diag(sigma) @ Vt``; the other three entries are the
        factors exactly as ``svds`` returned them.
    """
    smallest_dim = min(_train_sparse.shape)
    rank = min(n_factors, smallest_dim - 1)

    float_matrix = _train_sparse.astype(float)
    U, sigma, Vt = svds(float_matrix, k=rank)

    # Scale the left singular vectors by the singular values, then project
    # through the right singular vectors to obtain the reconstruction.
    predicted = U @ np.diag(sigma) @ Vt
    return predicted, U, sigma, Vt
@st.cache_resource
def train_als(_train_sparse, n_factors: int = 50, iterations: int = 15, regularization: float = 0.1):
    """
    Fit an implicit-feedback ALS model with the `implicit` library.

    Confidence weighting follows C = 1 + alpha * R, with alpha fixed at 1.0
    and the random seed fixed at 42.

    The transpose of `_train_sparse` is what gets fitted, so the two factor
    matrices exposed by the library come back exchanged relative to the
    rows/columns of `_train_sparse`. The returned `ALSModel` wrapper swaps
    them so that `user_factors` / `item_factors` line up with the original
    matrix.

    NOTE(review): which orientation `fit` expects differs between versions of
    `implicit`; the transpose-then-swap here presumably targets the
    user-item convention - confirm against the pinned version.
    """

    class ALSModel:
        # Thin holder exposing the two factor matrices under swapped names.
        def __init__(self, implicit_model):
            # The library's "item" side corresponds to our users, and its
            # "user" side to our items, because the input was transposed.
            self.user_factors = implicit_model.item_factors
            self.item_factors = implicit_model.user_factors

    als = AlternatingLeastSquares(
        factors=n_factors,
        iterations=iterations,
        regularization=regularization,
        alpha=1.0,  # Confidence scaling factor (conservative)
        random_state=42,
    )

    transposed = _train_sparse.T
    als.fit(transposed)

    return ALSModel(als)
@st.cache_resource
def train_sgd(
    _train_sparse,
    n_factors: int = 50,
    lr: float = 0.001,
    reg: float = 0.02,
    n_epochs: int = 100,
):
    """SGD-based matrix factorization with bias terms (Funk SVD).

    The model is ``predicted = mu + b_u + b_i + u . v``. It is fitted on
    ``log1p`` of the non-zero entries to stabilize training on count data
    whose raw values can be large and variable; predictions are mapped back
    with ``expm1``.

    Args:
        _train_sparse: User-item matrix, either a SciPy sparse matrix or
            anything ``np.asarray`` can turn into a 2-D float array. Only
            non-zero entries are used as training samples.
        n_factors: Number of latent dimensions.
        lr: Learning rate of the SGD updates.
        reg: L2 regularization strength applied to biases and factors.
        n_epochs: Number of passes over the observed entries.

    Returns:
        ``(predicted, U, V, b_u, b_i, mu)``: the dense matrix of predictions
        on the original scale, the user and item factor matrices, the user
        and item bias vectors, and the global mean of the log-transformed
        training values (``0.0`` when there are no training samples).
    """
    if issparse(_train_sparse):
        n_users, n_items = _train_sparse.shape
        rows, cols = _train_sparse.nonzero()
        if rows.size:
            raw_vals = np.asarray(_train_sparse[rows, cols]).ravel().astype(np.float64)
        else:
            raw_vals = np.zeros(0, dtype=np.float64)
    else:
        mat = np.asarray(_train_sparse, dtype=np.float64)
        # Read the shape from the converted array so plain nested lists work.
        n_users, n_items = mat.shape
        rows, cols = np.nonzero(mat)
        raw_vals = mat[rows, cols]

    vals = np.log1p(raw_vals)
    n_obs = len(vals)

    # With no observed entries the mean of an empty array would be NaN and
    # would poison every prediction; fall back to a neutral global mean.
    mu = float(vals.mean()) if n_obs else 0.0

    rng = np.random.RandomState(42)
    init_scale = 0.01
    U = rng.normal(0, init_scale, (n_users, n_factors))
    V = rng.normal(0, init_scale, (n_items, n_factors))
    b_u = np.zeros(n_users)
    b_i = np.zeros(n_items)

    # Bound on the per-sample error, so one outlier cannot blow up a step.
    max_err = 5.0

    for _ in range(n_epochs):
        for idx in rng.permutation(n_obs):
            u, i, r = rows[idx], cols[idx], vals[idx]
            pred = mu + b_u[u] + b_i[i] + U[u] @ V[i]
            # Scalar clamp; same result as np.clip without the array overhead.
            err = min(max(r - pred, -max_err), max_err)

            b_u[u] += lr * (err - reg * b_u[u])
            b_i[i] += lr * (err - reg * b_i[i])

            # Keep the pre-update user vector so the item update uses the
            # same U[u] the prediction was made with.
            U_old = U[u].copy()
            U[u] += lr * (err * V[i] - reg * U[u])
            V[i] += lr * (err * U_old - reg * V[i])

    predicted_log = mu + b_u[:, None] + b_i[None, :] + U @ V.T
    predicted = np.expm1(predicted_log)
    return predicted, U, V, b_u, b_i, mu
@st.cache_resource
def train_nmf(_train_dense, n_factors: int = 50, max_iter: int = 200):
    """Non-negative Matrix Factorization of the user-item matrix.

    The number of components is capped at the smaller matrix dimension,
    because the ``"nndsvda"`` initialization rejects
    ``n_components > min(n_samples, n_features)``. The cap parallels the
    rank capping in ``train_svd``, so the default ``n_factors`` also works on
    small matrices instead of raising.

    Args:
        _train_dense: Non-negative user-item matrix (rows are samples).
        n_factors: Requested number of latent components (upper bound).
        max_iter: Maximum number of solver iterations.

    Returns:
        ``(predicted, W, H, model)`` where ``predicted = W @ H``, ``W`` is
        the output of ``fit_transform``, ``H`` is ``model.components_`` and
        ``model`` is the fitted ``NMF`` estimator.
    """
    n_components = min(n_factors, min(np.shape(_train_dense)))
    model = NMF(n_components=n_components, init="nndsvda", random_state=42, max_iter=max_iter)
    W = model.fit_transform(_train_dense)
    H = model.components_
    predicted = W @ H
    return predicted, W, H, model
@st.cache_resource
def train_item_cf(_train_sparse, top_k_similar: int = 20):
    """Item-based collaborative filtering on cosine similarity.

    Builds the item-item cosine similarity from the columns of
    ``_train_sparse``, zeroes the diagonal, and in every row zeroes all
    entries below that row's ``top_k_similar``-th largest value (entries tied
    with the cut-off are kept). Scores are the interaction matrix multiplied
    by the pruned similarity matrix.

    Returns:
        ``(predicted, item_sim)``: the user-by-item score matrix and the
        pruned item-item similarity matrix.
    """
    similarity = cosine_similarity(_train_sparse.T)
    np.fill_diagonal(similarity, 0)

    for neighbours in similarity:
        # Each row is a view, so the in-place pruning below edits
        # `similarity` itself.
        if len(neighbours) > top_k_similar:
            cutoff = np.partition(neighbours, -top_k_similar)[-top_k_similar]
        else:
            # Not enough items to rank: only negative similarities are cut.
            cutoff = 0
        neighbours[neighbours < cutoff] = 0

    if hasattr(_train_sparse, 'toarray'):
        interactions = _train_sparse.toarray()
    else:
        interactions = np.array(_train_sparse)

    predicted = interactions @ similarity
    return predicted, similarity
def get_top_n_recommendations(predicted_scores, train_matrix, user_idx: int, n: int = 10):
    """Get top-N item indices for a user, excluding already-purchased items.

    Args:
        predicted_scores: 2-D score array indexed as ``[user, item]``. It is
            not modified.
        train_matrix: Training interactions as a SciPy sparse matrix, a
            pandas DataFrame, or an array-like; entries ``> 0`` mark
            purchased items.
        user_idx: Row index of the user in both matrices.
        n: Maximum number of items to return.

    Returns:
        ``(top_items, top_scores)``: item indices ordered by descending score
        and their scores. Fewer than ``n`` items are returned when the user
        has fewer than ``n`` unpurchased items; purchased items are never
        included.
    """
    scores = np.array(predicted_scores[user_idx])
    if not np.issubdtype(scores.dtype, np.floating):
        # -inf cannot be stored in an integer/bool array.
        scores = scores.astype(float)

    if hasattr(train_matrix, 'tocsr'):
        # Densify only the requested row rather than the whole matrix.
        purchased = train_matrix.tocsr()[user_idx].toarray().ravel()
    elif hasattr(train_matrix, 'toarray'):
        purchased = train_matrix.toarray()[user_idx]
    elif hasattr(train_matrix, 'iloc'):
        purchased = np.array(train_matrix.iloc[user_idx])
    else:
        purchased = np.array(train_matrix[user_idx])

    purchased_mask = np.asarray(purchased > 0)
    scores[purchased_mask] = -np.inf

    ranked = np.argsort(scores)[::-1]
    # Drop purchased items explicitly: when fewer than n candidates remain,
    # the -inf placeholders would otherwise be returned as recommendations.
    ranked = ranked[~purchased_mask[ranked]]
    top_items = ranked[:max(n, 0)]
    return top_items, scores[top_items]
def evaluate_recommendations(predicted_scores, train_matrix, test_df, user_index, item_columns, k: int = 10):
    """
    Score a prediction matrix against held-out interactions.

    Ranking metrics (Precision@K, Recall@K, Hit Rate) are averaged over the
    users in `test_df` that are present in `user_index`, fall inside
    `predicted_scores`, and have at least one test item listed in
    `item_columns`. Prediction-accuracy metrics (RMSE, R²) compare the
    `score` column of those test rows with the matching predicted entries,
    after clipping predictions to [0, 99th percentile of the actual values].

    `test_df` is read through its "CustomerID", "StockCode" and "score"
    columns. Returns a dict with the five metrics plus "Users Evaluated";
    every metric is 0.0 when no user could be evaluated.
    """
    if hasattr(train_matrix, 'toarray'):
        dense_train = train_matrix.toarray()
    else:
        dense_train = np.array(train_matrix)

    column_of = {item: position for position, item in enumerate(item_columns)}

    per_user_precision = []
    per_user_recall = []
    users_with_hit = 0
    users_evaluated = 0

    # Paired actual / predicted values feeding RMSE and R².
    actual_values = []
    predicted_values = []

    for customer, purchases in test_df.groupby("CustomerID"):
        if customer not in user_index:
            continue
        row_idx = user_index[customer]
        if row_idx >= predicted_scores.shape[0]:
            continue

        relevant = set()
        for _, record in purchases.iterrows():
            if record["StockCode"] not in column_of:
                continue
            col_idx = column_of[record["StockCode"]]
            relevant.add(col_idx)
            actual_values.append(record["score"])
            predicted_values.append(predicted_scores[row_idx, col_idx])

        if not relevant:
            continue

        recommended, _ = get_top_n_recommendations(predicted_scores, dense_train, row_idx, n=k)
        n_hits = len(set(recommended) & relevant)

        per_user_precision.append(n_hits / k)
        per_user_recall.append(n_hits / len(relevant))
        if n_hits > 0:
            users_with_hit += 1
        users_evaluated += 1

    if users_evaluated == 0:
        return {
            "Precision@K": 0.0,
            "Recall@K": 0.0,
            "Hit Rate": 0.0,
            "RMSE": 0.0,
            "R²": 0.0,
            "Users Evaluated": 0
        }

    rmse = 0.0
    r2 = 0.0
    if actual_values:
        truth = np.array(actual_values)
        upper_bound = np.percentile(truth, 99)
        # Clip predictions to a reasonable range to avoid extreme errors.
        estimates = np.clip(np.array(predicted_values), 0, upper_bound)
        rmse = float(np.sqrt(mean_squared_error(truth, estimates)))
        # R² may be negative when the model is worse than the mean baseline.
        r2 = float(r2_score(truth, estimates))

    return {
        "Precision@K": float(np.mean(per_user_precision)),
        "Recall@K": float(np.mean(per_user_recall)),
        "Hit Rate": float(users_with_hit / users_evaluated),
        "RMSE": rmse,
        "R²": r2,
        "Users Evaluated": int(users_evaluated),
    }