import pandas as pd import numpy as np import math import torch import torch.nn as nn import pytorch_lightning as pl from scipy.sparse import csr_matrix from sklearn.metrics.pairwise import cosine_similarity import implicit from utils import prepare_ground_truth, calculate_metrics def recommend_popular_items_and_evaluate(train_df, test_df, k=10, prepare_ground_truth=None, calculate_metrics=None): """ Trains a non-personalized Popularity model and evaluates its performance. This model recommends the top-k most frequently transacted items from the training set to every user. It serves as a simple but strong baseline. Args: train_df (pd.DataFrame): The training dataset. test_df (pd.DataFrame): The test dataset for evaluation. k (int): The number of items to recommend. prepare_ground_truth (function): A function to process the test_df into a ground truth dict. calculate_metrics (function): A function to compute ranking metrics. Returns: dict: A dictionary containing the calculated evaluation metrics (e.g., precision, recall). """ print(f"\n--- Evaluating Popularity Model (Top {k} items) ---") # 1. "Train" the model by finding the most popular items based on transactions purchase_counts = train_df[train_df['event'] == 'transaction']['itemid'].value_counts() popular_items = purchase_counts.head(k).index.tolist() print(f"Top {k} popular items identified from training data.") # 2. Evaluate the model ground_truth = prepare_ground_truth(test_df) # Every user receives the same list of popular items recommendations = {user_id: popular_items for user_id in ground_truth.keys()} metrics = calculate_metrics(recommendations, ground_truth, k) print("Evaluation complete.") return metrics def recommend_item_item_and_evaluate(train_df, test_df, k=10, min_item_interactions=5, min_user_interactions=5, prepare_ground_truth=None, calculate_metrics=None): """ Trains an Item-Item Collaborative Filtering model and evaluates its performance. This model recommends items that are similar to items a user has interacted with in the past, based on co-occurrence patterns in the training data. Args: train_df (pd.DataFrame): The training dataset. test_df (pd.DataFrame): The test dataset for evaluation. k (int): The number of items to recommend. min_item_interactions (int): Minimum number of interactions for an item to be kept. min_user_interactions (int): Minimum number of interactions for a user to be kept. prepare_ground_truth (function): A function to process the test_df into a ground truth dict. calculate_metrics (function): A function to compute ranking metrics. Returns: dict: A dictionary containing the calculated evaluation metrics. """ print(f"\n--- Evaluating Item-Item CF Model (Top {k} items) ---") # 1. Filter out infrequent users and items to reduce noise and computation item_counts = train_df['itemid'].value_counts() user_counts = train_df['visitorid'].value_counts() items_to_keep = item_counts[item_counts >= min_item_interactions].index users_to_keep = user_counts[user_counts >= min_user_interactions].index filtered_df = train_df[(train_df['itemid'].isin(items_to_keep)) & (train_df['visitorid'].isin(users_to_keep))].copy() print(f"Filtered training data from {len(train_df)} to {len(filtered_df)} records.") # 2. Create user-item interaction matrix and vocabulary mappings user_map = {uid: i for i, uid in enumerate(filtered_df['visitorid'].unique())} item_map = {iid: i for i, iid in enumerate(filtered_df['itemid'].unique())} inverse_item_map = {i: iid for iid, i in item_map.items()} user_indices = filtered_df['visitorid'].map(user_map) item_indices = filtered_df['itemid'].map(item_map) user_item_matrix = csr_matrix((np.ones(len(filtered_df)), (user_indices, item_indices))) # 3. Calculate the cosine similarity matrix between all items print("Calculating item similarity matrix...") item_similarity_matrix = cosine_similarity(user_item_matrix.T, dense_output=False) print("Similarity matrix calculated.") # 4. Generate recommendations and evaluate ground_truth = prepare_ground_truth(test_df) recommendations = {} print("Generating recommendations for users in test set...") test_users = [u for u in ground_truth.keys() if u in user_map] for user_id in test_users: user_index = user_map[user_id] user_interactions_indices = user_item_matrix[user_index].indices if len(user_interactions_indices) > 0: # Aggregate scores from items the user has interacted with all_scores = np.asarray(item_similarity_matrix[user_interactions_indices].sum(axis=0)).flatten() # Remove already interacted items from recommendations all_scores[user_interactions_indices] = -1 top_indices = np.argsort(all_scores)[::-1][:k] recs = [inverse_item_map[idx] for idx in top_indices if idx in inverse_item_map] recommendations[user_id] = recs metrics = calculate_metrics(recommendations, ground_truth, k) print("Evaluation complete.") return metrics def recommend_als_and_evaluate(train_df, test_df, k=10, min_item_interactions=5, min_user_interactions=5, factors=25, regularization=0.02, iterations=48, prepare_ground_truth=None, calculate_metrics=None): """ Trains an Alternating Least Squares (ALS) model and evaluates its performance. This model uses matrix factorization to learn latent embeddings for users and items from implicit feedback data. Default hyperparameters are set from a previous Optuna tuning process. Args: train_df (pd.DataFrame): The training dataset. test_df (pd.DataFrame): The test dataset for evaluation. k (int): The number of items to recommend. min_item_interactions (int): Minimum number of interactions for an item to be kept. min_user_interactions (int): Minimum number of interactions for a user to be kept. factors (int): The number of latent factors to compute. regularization (float): The regularization factor. iterations (int): The number of ALS iterations to run. prepare_ground_truth (function): A function to process the test_df into a ground truth dict. calculate_metrics (function): A function to compute ranking metrics. Returns: dict: A dictionary containing the calculated evaluation metrics. """ print(f"\n--- Evaluating ALS Model (Top {k} items) ---") # 1. Filter data item_counts = train_df['itemid'].value_counts() user_counts = train_df['visitorid'].value_counts() items_to_keep = item_counts[item_counts >= min_item_interactions].index users_to_keep = user_counts[user_counts >= min_user_interactions].index filtered_df = train_df[(train_df['itemid'].isin(items_to_keep)) & (train_df['visitorid'].isin(users_to_keep))].copy() print(f"Filtered training data from {len(train_df)} to {len(filtered_df)} records.") # 2. Create mappings and confidence matrix user_map = {uid: i for i, uid in enumerate(filtered_df['visitorid'].unique())} item_map = {iid: i for i, iid in enumerate(filtered_df['itemid'].unique())} inverse_item_map = {i: iid for iid, i in item_map.items()} user_indices = filtered_df['visitorid'].map(user_map).astype(np.int32) item_indices = filtered_df['itemid'].map(item_map).astype(np.int32) event_weights = {'view': 1, 'addtocart': 3, 'transaction': 5} confidence = filtered_df['event'].map(event_weights).astype(np.float32) user_item_matrix = csr_matrix((confidence, (user_indices, item_indices))) # 3. Train the ALS model print("Training ALS model...") als_model = implicit.als.AlternatingLeastSquares(factors=factors, regularization=regularization, iterations=iterations) als_model.fit(user_item_matrix) print("ALS model trained.") # 4. Generate recommendations and evaluate ground_truth = prepare_ground_truth(test_df) recommendations = {} print("Generating recommendations for users in test set...") test_users_indices = [user_map[u] for u in ground_truth.keys() if u in user_map] if test_users_indices: user_item_matrix_for_recs = user_item_matrix[test_users_indices] ids, _ = als_model.recommend(test_users_indices, user_item_matrix_for_recs, N=k) for i, user_index in enumerate(test_users_indices): original_user_id = list(user_map.keys())[list(user_map.values()).index(user_index)] recs = [inverse_item_map[item_idx] for item_idx in ids[i] if item_idx in inverse_item_map] recommendations[original_user_id] = recs metrics = calculate_metrics(recommendations, ground_truth, k) print("Evaluation complete.") return metrics class SASRec(pl.LightningModule): """ A PyTorch Lightning implementation of the SASRec model for sequential recommendation. SASRec (Self-Attentive Sequential Recommendation) uses a Transformer-based architecture to capture the sequential patterns in a user's interaction history to predict the next item they are likely to interact with. Attributes: save_hyperparameters: Automatically saves all constructor arguments as hyperparameters. item_embedding (nn.Embedding): Embedding layer for item IDs. positional_embedding (nn.Embedding): Embedding layer to encode the position of items in a sequence. transformer_encoder (nn.TransformerEncoder): The core self-attention module. fc (nn.Linear): Final fully connected layer to produce logits over the item vocabulary. loss_fn (nn.CrossEntropyLoss): The loss function used for training. """ def __init__(self, vocab_size, max_len, hidden_dim, num_heads, num_layers, dropout=0.2, learning_rate=1e-3, weight_decay=1e-6, warmup_steps=2000, max_steps=100000): """ Initializes the SASRec model layers and hyperparameters. Args: vocab_size (int): The total number of unique items in the dataset (+1 for padding). max_len (int): The maximum length of the input sequences. hidden_dim (int): The dimensionality of the item and positional embeddings. num_heads (int): The number of attention heads in the Transformer encoder. num_layers (int): The number of layers in the Transformer encoder. dropout (float): The dropout rate to be applied. learning_rate (float): The learning rate for the optimizer. weight_decay (float): The weight decay (L2 penalty) for the optimizer. warmup_steps (int): The number of linear warmup steps for the learning rate scheduler. max_steps (int): The total number of training steps for the learning rate scheduler's decay phase. """ super().__init__() # This saves all hyperparameters to self.hparams, making them accessible later self.save_hyperparameters() # Embedding layers self.item_embedding = nn.Embedding(vocab_size, hidden_dim, padding_idx=0) self.positional_embedding = nn.Embedding(max_len, hidden_dim) self.dropout = nn.Dropout(dropout) # Transformer Encoder encoder_layer = nn.TransformerEncoderLayer( d_model=hidden_dim, nhead=num_heads, dim_feedforward=hidden_dim * 4, dropout=dropout, batch_first=True, activation='gelu' ) self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers) # Output layer self.fc = nn.Linear(hidden_dim, vocab_size) # Loss function, ignoring the padding token self.loss_fn = nn.CrossEntropyLoss(ignore_index=0) # Lists to store outputs from validation and test steps self.validation_step_outputs = [] self.test_step_outputs = [] def forward(self, x): """ Defines the forward pass of the model. Args: x (torch.Tensor): A batch of input sequences of shape (batch_size, seq_len). Returns: torch.Tensor: The output logits of shape (batch_size, seq_len, vocab_size). """ seq_len = x.size(1) # Create positional indices (0, 1, 2, ..., seq_len-1) positions = torch.arange(seq_len, device=self.device).unsqueeze(0) # Create a causal mask to ensure the model doesn't look ahead in the sequence causal_mask = nn.Transformer.generate_square_subsequent_mask(seq_len, device=self.device) # Combine item and positional embeddings x = self.item_embedding(x) + self.positional_embedding(positions) x = self.dropout(x) # Pass through the Transformer encoder x = self.transformer_encoder(x, mask=causal_mask) # Get final logits logits = self.fc(x) return logits def training_step(self, batch, batch_idx): """ Performs a single training step. Args: batch (tuple): A tuple containing input sequences and target items. batch_idx (int): The index of the current batch. Returns: torch.Tensor: The calculated loss for the batch. """ inputs, targets = batch logits = self.forward(inputs) # We only care about the prediction for the very last item in the input sequence last_logits = logits[:, -1, :] # Calculate loss against the single target item loss = self.loss_fn(last_logits, targets.squeeze()) self.log('train_loss', loss, prog_bar=True, on_step=True, on_epoch=True) return loss def validation_step(self, batch, batch_idx): """ Performs a single validation step. Calculates loss and stores predictions for metric computation at the end of the epoch. """ inputs, targets = batch logits = self.forward(inputs) last_item_logits = logits[:, -1, :] loss = self.loss_fn(last_item_logits, targets.squeeze()) self.log('val_loss', loss, prog_bar=True, on_epoch=True) # Get top-10 predictions for metric calculation top_k_preds = torch.topk(last_item_logits, 10, dim=-1).indices self.validation_step_outputs.append({'preds': top_k_preds, 'targets': targets}) return loss def on_validation_epoch_end(self): """ Calculates and logs ranking metrics at the end of the validation epoch. """ if not self.validation_step_outputs: return # Concatenate all predictions and targets from the epoch preds = torch.cat([x['preds'] for x in self.validation_step_outputs], dim=0) targets = torch.cat([x['targets'] for x in self.validation_step_outputs], dim=0) k = preds.size(1) # Check if the target is in the top-k predictions for each example hits_tensor = (preds == targets).any(dim=1) num_hits = hits_tensor.sum().item() num_targets = len(targets) if num_targets > 0: hit_rate = num_hits / num_targets recall = hit_rate # For next-item prediction, recall@k is the same as hit_rate@k precision = num_hits / (k * num_targets) else: hit_rate, recall, precision = 0.0, 0.0, 0.0 self.log('val_hitrate@10', hit_rate, prog_bar=True) self.log('val_precision@10', precision, prog_bar=True) self.log('val_recall@10', recall, prog_bar=True) self.validation_step_outputs.clear() # Free up memory def test_step(self, batch, batch_idx): """ Performs a single test step. Mirrors the logic of the validation_step. """ inputs, targets = batch logits = self.forward(inputs) last_item_logits = logits[:, -1, :] loss = self.loss_fn(last_item_logits, targets.squeeze()) self.log('test_loss', loss, prog_bar=True) top_k_preds = torch.topk(last_item_logits, 10, dim=-1).indices self.test_step_outputs.append({'preds': top_k_preds, 'targets': targets}) return loss def on_test_epoch_end(self): """ Calculates and logs ranking metrics at the end of the test epoch. """ if not self.test_step_outputs: return preds = torch.cat([x['preds'] for x in self.test_step_outputs], dim=0) targets = torch.cat([x['targets'] for x in self.test_step_outputs], dim=0) k = preds.size(1) hits_tensor = (preds == targets).any(dim=1) num_hits = hits_tensor.sum().item() num_targets = len(targets) if num_targets > 0: hit_rate = num_hits / num_targets recall = hit_rate precision = num_hits / (k * num_targets) else: hit_rate, recall, precision = 0.0, 0.0, 0.0 self.log('test_hitrate@10', hit_rate, prog_bar=True) self.log('test_precision@10', precision, prog_bar=True) self.log('test_recall@10', recall, prog_bar=True) self.test_step_outputs.clear() # Free up memory def configure_optimizers(self): """ Configures the optimizer and learning rate scheduler. Uses AdamW optimizer and a linear warmup followed by a cosine decay schedule, which is a standard practice for training Transformer models. """ optimizer = torch.optim.AdamW( self.parameters(), lr=self.hparams.learning_rate, weight_decay=self.hparams.weight_decay ) # Learning rate scheduler: linear warmup and cosine decay def lr_lambda(current_step: int): warmup_steps = self.hparams.warmup_steps max_steps = self.hparams.max_steps if current_step < warmup_steps: return float(current_step) / float(max(1, warmup_steps)) progress = float(current_step - warmup_steps) / float(max(1, max_steps - warmup_steps)) return max(0.0, 0.5 * (1.0 + math.cos(math.pi * progress))) scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda) return { "optimizer": optimizer, "lr_scheduler": { "scheduler": scheduler, "interval": "step", # Update the scheduler at every training step "frequency": 1 } }