# NOTE: removed non-code residue ("Spaces: Sleeping") pasted above the code
# by a web-page export; it was not valid Python.
import datetime

import numpy as np
import pandas as pd
import torch
def calculate_metrics(recommendations_dict, ground_truth_dict, k):
    """
    Compute mean Precision@k, Recall@k, and HitRate@k over all users.

    Parameters
    ----------
    recommendations_dict : dict
        {user_id: [recommended_item_ids]}, ordered by relevance.
    ground_truth_dict : dict
        {user_id: set of ground-truth item_ids}.
    k : int
        Cutoff applied to each user's recommendation list.

    Returns
    -------
    dict
        Keys "mean_precision@k", "mean_recall@k", "mean_hitrate@k".
        All zeros if no user has non-empty ground truth.
    """
    per_user = []  # (precision, recall, hit_rate) per scorable user
    for user_id, relevant in ground_truth_dict.items():
        if not relevant:
            # Users with no ground-truth items cannot be scored.
            continue
        top_k = recommendations_dict.get(user_id, [])[:k]
        n_hits = len(relevant.intersection(top_k))
        per_user.append((
            n_hits / k if k > 0 else 0,
            n_hits / len(relevant),
            float(n_hits > 0),
        ))
    if not per_user:
        return {"mean_precision@k": 0, "mean_recall@k": 0, "mean_hitrate@k": 0}
    precisions, recalls, hit_rates = zip(*per_user)
    return {
        "mean_precision@k": np.mean(precisions),
        "mean_recall@k": np.mean(recalls),
        "mean_hitrate@k": np.mean(hit_rates),
    }
def prepare_ground_truth(df, mode="purchase", event_weights=None):
    """
    Prepares ground truth dictionaries for evaluation.

    Parameters
    ----------
    df : pd.DataFrame
        Test dataframe containing at least ['visitorid', 'itemid', 'event'].
    mode : str, default="purchase"
        - "purchase" : Only use transactions as ground truth.
        - "all" : Use all events, optionally filtered by event_weights.
    event_weights : dict, optional
        Example: {"view": 1, "addtocart": 3, "transaction": 5}.
        Used only if mode == "all". Events missing from the dict default to
        weight 1. Because the result is a set (no multiplicity), a weight's
        exact positive value has no effect; items whose weight is <= 0 are
        excluded, matching the original replicate-then-deduplicate logic.

    Returns
    -------
    dict : {user_id: set of item_ids}

    Raises
    ------
    ValueError
        If mode is not 'purchase' or 'all'.
    """
    if mode == "purchase":
        transactions = df[df["event"] == "transaction"]
        return transactions.groupby("visitorid")["itemid"].apply(set).to_dict()
    if mode != "all":
        raise ValueError("mode must be 'purchase' or 'all'")
    if event_weights is None:
        # Default: treat all events equally.
        return df.groupby("visitorid")["itemid"].apply(set).to_dict()
    # Weighted ground truth. The previous implementation extended a list with
    # `weight` copies of each item via iterrows() and then deduplicated into a
    # set — O(sum of weights) work whose only observable effect was dropping
    # items with weight <= 0. Keep exactly that filter semantics, cheaply.
    ground_truth = {}
    for uid, user_df in df.groupby("visitorid"):
        keep = user_df["event"].map(lambda e: event_weights.get(e, 1)) > 0
        ground_truth[uid] = set(user_df.loc[keep, "itemid"])
    return ground_truth
def load_item_properties(data_folder='data/'):
    """
    Loads item properties and creates a mapping from item ID to its category ID.

    Handles both a single properties file or two split parts: the two-part
    layout (item_properties_part1.csv + item_properties_part2.csv) is tried
    first, falling back to a single item_properties.csv.

    Args:
        data_folder (str): The path to the folder containing item property
            files (expects a trailing separator, e.g. 'data/').

    Returns:
        dict: A dictionary mapping {itemid: categoryid}. Empty if no
        properties file is found.
    """
    print("Loading item properties...")
    try:
        # First, try to load the two separate parts and combine them.
        props_df_part1 = pd.read_csv(data_folder + 'item_properties_part1.csv')
        props_df_part2 = pd.read_csv(data_folder + 'item_properties_part2.csv')
        props_df = pd.concat([props_df_part1, props_df_part2], ignore_index=True)
        print("Successfully loaded and combined item_properties_part1.csv and item_properties_part2.csv.")
    except FileNotFoundError:
        try:
            # If the parts are not found, try to load a single combined file.
            props_df = pd.read_csv(data_folder + 'item_properties.csv')
            print("Successfully loaded a single item_properties.csv.")
        except FileNotFoundError:
            # Fixed: this was a pointless f-string with no placeholders.
            print("Warning: No item properties files found. Cannot display category information.")
            return {}
    # Keep only 'categoryid' rows; coerce values to nullable ints and drop any
    # that fail to parse (previously those became <NA> entries in the map,
    # defeating callers' .get(item, 'N/A') fallback).
    category_df = props_df[props_df['property'] == 'categoryid'].copy()
    category_df['value'] = pd.to_numeric(category_df['value'], errors='coerce').astype('Int64')
    category_df = category_df.dropna(subset=['value'])
    item_to_category_map = category_df.set_index('itemid')['value'].to_dict()
    print("Item to category mapping created successfully.")
    return item_to_category_map
def load_category_tree(data_folder='data/'):
    """
    Load the category tree mapping each category to its parent category.

    Args:
        data_folder (str): The path to the folder containing category_tree.csv.

    Returns:
        dict: A dictionary mapping {categoryid: parentid}; empty if the
        file is missing.
    """
    print("Loading category tree...")
    try:
        tree_df = pd.read_csv(data_folder + 'category_tree.csv')
    except FileNotFoundError:
        print("Warning: 'category_tree.csv' not found. Cannot display parent category information.")
        return {}
    # Duplicate category ids (if any) resolve to the last row, as before.
    parent_by_category = dict(zip(tree_df['categoryid'], tree_df['parentid']))
    print("Category tree loaded successfully.")
    return parent_by_category
def get_popular_items(train_df, k=10):
    """
    Return the k most-purchased item IDs, ordered by transaction count.

    Only rows whose event is 'transaction' contribute to popularity.
    """
    purchased = train_df.loc[train_df['event'] == 'transaction', 'itemid']
    return purchased.value_counts().index[:k].tolist()
def show_user_recommendations(visitor_id, model, datamodule, popular_items, item_category_map, category_parent_map, k=10):
    """
    Print top-k recommendations for a visitor, annotated with category info.

    Falls back to the supplied popular items when the visitor has no training
    history. Historical items missing from the model's vocabulary are skipped
    when building the model input sequence.
    """
    print(f"\n--- Recommendations for Visitor ID: {visitor_id} ---")
    model.eval()

    def describe(item_id):
        # Resolve the item's category and, from it, the parent category.
        category_id = item_category_map.get(item_id, 'N/A')
        parent_id = category_parent_map.get(category_id, 'N/A') if category_id != 'N/A' else 'N/A'
        return f"Item: {item_id} (Category: {category_id}, Parent: {parent_id})"

    user_history_ids = datamodule.user_history.get(visitor_id)
    if user_history_ids is None:
        # Cold-start: no training history, so fall back to popular items.
        print(f"User {visitor_id} not found in training history. Providing popularity-based recommendations.")
        print(f"\nTop {k} Popular Items (Fallback):")
        print([describe(item) for item in popular_items])
        print("-------------------------------------------------")
        return

    print(f"User's Historical Interactions:")
    print([describe(item) for item in user_history_ids])

    history_indices = [datamodule.item_map[item] for item in user_history_ids if item in datamodule.item_map]
    if not history_indices:
        print("None of the user's historical items are in the model's vocabulary.")
        return

    # Keep the most recent max_len interactions, left-padded with zeros.
    max_len = datamodule.max_len
    recent = history_indices[-max_len:]
    padded = np.zeros(max_len, dtype=np.int64)
    padded[-len(recent):] = recent
    input_tensor = torch.as_tensor(padded, dtype=torch.long).unsqueeze(0).to(model.device)

    with torch.no_grad():
        logits = model(input_tensor)
        # Score candidates from the final position in the sequence.
        scores = logits[0, -1, :]
        top_indices = torch.topk(scores, k).indices.tolist()

    recommended = [datamodule.inverse_item_map[idx] for idx in top_indices if idx in datamodule.inverse_item_map]
    print(f"\nTop {k} Recommended Items:")
    print([describe(item) for item in recommended])
    print("-------------------------------------------------")