Spaces:
Sleeping
Sleeping
File size: 7,632 Bytes
38ae75d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
import datetime
import os

import numpy as np
import pandas as pd
import torch
def calculate_metrics(recommendations_dict, ground_truth_dict, k):
    """
    Compute mean Precision@k, Recall@k, and HitRate@k across users.

    Parameters
    ----------
    recommendations_dict : dict
        {user_id: [recommended_item_ids]}, ranked best-first.
    ground_truth_dict : dict
        {user_id: set of relevant item_ids}.
    k : int
        Cutoff rank for the recommendation list.

    Returns
    -------
    dict
        Mean precision, recall, and hit rate over users with non-empty
        ground truth; all zeros when no such user exists.
    """
    per_user = []  # one (precision, recall, hit) tuple per evaluated user
    for uid, relevant in ground_truth_dict.items():
        if not relevant:
            # Users with no ground-truth items are excluded from the averages.
            continue
        top_k = recommendations_dict.get(uid, [])[:k]
        n_hits = len(relevant.intersection(top_k))
        per_user.append((
            n_hits / k if k > 0 else 0,   # precision uses the fixed cutoff k
            n_hits / len(relevant),        # recall uses the user's truth size
            1.0 if n_hits else 0.0,        # hit rate: any overlap at all
        ))
    if not per_user:
        return {"mean_precision@k": 0, "mean_recall@k": 0, "mean_hitrate@k": 0}
    precisions, recalls, hits = zip(*per_user)
    return {
        "mean_precision@k": np.mean(precisions),
        "mean_recall@k": np.mean(recalls),
        "mean_hitrate@k": np.mean(hits),
    }
def prepare_ground_truth(df, mode="purchase", event_weights=None):
    """
    Prepares ground truth dictionaries for evaluation.

    Parameters
    ----------
    df : pd.DataFrame
        Test dataframe containing at least ['visitorid', 'itemid', 'event'].
    mode : str, default="purchase"
        - "purchase" : Only use transactions as ground truth.
        - "all" : Use all events. Optionally weight them.
    event_weights : dict, optional
        Example: {"view": 1, "addtocart": 3, "transaction": 5}.
        Used only if mode == "all". Because the result is a set, a weight
        only matters through its sign: items whose event weight is <= 0
        are excluded; unlisted events default to weight 1 (included).

    Returns
    -------
    dict : {user_id: set of item_ids}

    Raises
    ------
    ValueError
        If mode is neither "purchase" nor "all".
    """
    if mode == "purchase":
        purchases = df[df["event"] == "transaction"]
        return purchases.groupby("visitorid")["itemid"].apply(set).to_dict()
    if mode != "all":
        raise ValueError("mode must be 'purchase' or 'all'")
    if event_weights is None:
        # Default: treat all events equally
        return df.groupby("visitorid")["itemid"].apply(set).to_dict()
    # Vectorized replacement for the previous per-row iterrows loop.
    # Repeating an item `weight` times never changed set membership, so
    # weights effectively act only as a positive/non-positive filter.
    weights = df["event"].map(event_weights).fillna(1)
    kept = df[weights > 0]
    ground_truth = kept.groupby("visitorid")["itemid"].apply(set).to_dict()
    # Users whose every event was filtered out still get an (empty) entry,
    # matching the original per-user loop's behavior.
    for uid in df["visitorid"].unique():
        ground_truth.setdefault(uid, set())
    return ground_truth
def load_item_properties(data_folder='data/'):
    """
    Loads item properties and creates a mapping from item ID to its category ID.

    Handles both a single properties file or two split parts. Paths are
    built with os.path.join, so data_folder works with or without a
    trailing separator.

    Args:
        data_folder (str): The path to the folder containing item property files.

    Returns:
        dict: A dictionary mapping {itemid: categoryid}; empty dict when
        no properties file is found.
    """
    print("Loading item properties...")
    try:
        # First, try to load the two separate parts and combine them.
        props_df_part1 = pd.read_csv(os.path.join(data_folder, 'item_properties_part1.csv'))
        props_df_part2 = pd.read_csv(os.path.join(data_folder, 'item_properties_part2.csv'))
        props_df = pd.concat([props_df_part1, props_df_part2], ignore_index=True)
        print("Successfully loaded and combined item_properties_part1.csv and item_properties_part2.csv.")
    except FileNotFoundError:
        try:
            # If the parts are not found, try to load a single combined file.
            props_df = pd.read_csv(os.path.join(data_folder, 'item_properties.csv'))
            print("Successfully loaded a single item_properties.csv.")
        except FileNotFoundError:
            print("Warning: No item properties files found. Cannot display category information.")
            return {}
    # Keep only 'categoryid' rows; coerce unparsable values to <NA>
    # instead of raising, and use nullable Int64 so <NA> survives the cast.
    category_df = props_df[props_df['property'] == 'categoryid'].copy()
    category_df['value'] = pd.to_numeric(category_df['value'], errors='coerce').astype('Int64')
    item_to_category_map = category_df.set_index('itemid')['value'].to_dict()
    print("Item to category mapping created successfully.")
    return item_to_category_map
def load_category_tree(data_folder='data/'):
    """
    Loads the category tree to map categories to their parent categories.

    The path is built with os.path.join, so data_folder works with or
    without a trailing separator (consistent with load_item_properties).

    Args:
        data_folder (str): The path to the folder containing category_tree.csv.

    Returns:
        dict: A dictionary mapping {categoryid: parentid}; empty dict when
        the file is missing.
    """
    print("Loading category tree...")
    try:
        tree_df = pd.read_csv(os.path.join(data_folder, 'category_tree.csv'))
        category_parent_map = tree_df.set_index('categoryid')['parentid'].to_dict()
        print("Category tree loaded successfully.")
        return category_parent_map
    except FileNotFoundError:
        print("Warning: 'category_tree.csv' not found. Cannot display parent category information.")
        return {}
def get_popular_items(train_df, k=10):
    """
    Return the k most-purchased item ids, ordered by transaction count.

    Parameters
    ----------
    train_df : pd.DataFrame
        Must contain 'event' and 'itemid' columns.
    k : int, default=10
        Number of item ids to return.

    Returns
    -------
    list
        Item ids of the top-k items by 'transaction' event count.
    """
    purchased = train_df.loc[train_df['event'] == 'transaction', 'itemid']
    top_items = purchased.value_counts().head(k)
    return top_items.index.tolist()
def show_user_recommendations(visitor_id, model, datamodule, popular_items, item_category_map, category_parent_map, k=10):
    """
    Print top-k recommendations for one user, annotated with category info.

    Falls back to the global popularity list when the user has no training
    history, and aborts early when none of the user's historical items are
    in the model's vocabulary.

    Parameters
    ----------
    visitor_id : user id looked up in ``datamodule.user_history``.
    model : sequential recommender called as ``model(input_tensor)``;
        indexing ``logits[0, -1, :]`` implies it returns per-position
        logits of shape (batch, seq_len, num_items) — TODO confirm
        against the model class.
    datamodule : provides ``user_history`` (dict), ``item_map`` (item id ->
        model index), ``inverse_item_map`` (index -> item id) and ``max_len``.
    popular_items : list of item ids used as the cold-start fallback.
    item_category_map : {itemid: categoryid}, e.g. from load_item_properties.
    category_parent_map : {categoryid: parentid}, e.g. from load_category_tree.
    k : int, number of recommendations to display.

    Returns
    -------
    None. All output goes to stdout.
    """
    print(f"\n--- Recommendations for Visitor ID: {visitor_id} ---")
    model.eval()  # inference mode: disables dropout / batch-norm updates
    def format_item_with_category(item_id):
        # 'N/A' when either the category or its parent lookup misses.
        category_id = item_category_map.get(item_id, 'N/A')
        parent_id = category_parent_map.get(category_id, 'N/A') if category_id != 'N/A' else 'N/A'
        return f"Item: {item_id} (Category: {category_id}, Parent: {parent_id})"
    user_history_ids = datamodule.user_history.get(visitor_id)
    if user_history_ids is None:
        # Cold-start user: show the global popularity fallback instead.
        print(f"User {visitor_id} not found in training history. Providing popularity-based recommendations.")
        print(f"\nTop {k} Popular Items (Fallback):")
        recs_with_cats = [format_item_with_category(item_id) for item_id in popular_items]
        print(recs_with_cats)
        print("-------------------------------------------------")
        return
    history_with_cats = [format_item_with_category(item_id) for item_id in user_history_ids]
    print(f"User's Historical Interactions:")
    print(history_with_cats)
    # Map raw item ids to model vocabulary indices, silently dropping unknowns.
    history_indices = [datamodule.item_map[i] for i in user_history_ids if i in datamodule.item_map]
    if not history_indices:
        print("None of the user's historical items are in the model's vocabulary.")
        return
    # Left-pad the most recent max_len interactions with zeros
    # (presumably index 0 is the padding token — verify in the datamodule).
    max_len = datamodule.max_len
    input_seq = history_indices[-max_len:]
    padded_input = np.zeros(max_len, dtype=np.int64)
    padded_input[-len(input_seq):] = input_seq
    input_tensor = torch.LongTensor(np.array([padded_input]))
    input_tensor = input_tensor.to(model.device)
    with torch.no_grad():
        logits = model(input_tensor)
        # Next-item scores come from the last sequence position only.
        last_item_logits = logits[0, -1, :]
        top_indices = torch.topk(last_item_logits, k).indices.tolist()
    # Translate model indices back to raw item ids; unknown indices are skipped.
    recommended_item_ids = [datamodule.inverse_item_map[idx] for idx in top_indices if idx in datamodule.inverse_item_map]
    print(f"\nTop {k} Recommended Items:")
    recs_with_cats = [format_item_with_category(item_id) for item_id in recommended_item_ids]
    print(recs_with_cats)
    print("-------------------------------------------------")
|