"""Feature-engineering pipeline for user-item rating interactions.

Reads ``train.parquet``, ``test.parquet`` and ``mappings.pkl`` from a processed
data directory, derives per-user, per-item and per-interaction features plus a
recency-weighted sparse rating matrix from the training split, and writes the
resulting artifacts back into the same directory.

NOTE(review): the code assumes interaction frames carry ``user_idx``,
``item_idx``, ``rating`` and a numeric ``timestamp`` column expressed in
seconds (every elapsed-time computation divides by 86400) -- confirm against
the upstream preprocessing step.
"""

import logging
import pickle
from pathlib import Path
from typing import Any, Dict, Optional, Sequence, Union

import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix, save_npz

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
log = logging.getLogger(__name__)

# Divisor used to turn timestamp differences into days.
SECONDS_PER_DAY = 86400.0
# Ratings greater than or equal to this value count as "high".
HIGH_RATING_THRESHOLD = 4
# Number of pseudo-ratings at the global mean used by the popularity score.
DEFAULT_POPULARITY_MIN_COUNT = 10

# Name of the temporary indicator column used inside the groupby aggregations.
_HIGH_FLAG_COL = "_is_high_rating"


def compute_recency_weights(df: pd.DataFrame, halflife_days: float = 180) -> pd.Series:
    """Return an exponential-decay weight for every row of *df*.

    The newest row (largest ``timestamp``) gets weight 1.0; a row that is
    ``halflife_days`` older than the newest row gets weight 0.5.

    Args:
        df: Frame with a numeric ``timestamp`` column (treated as seconds).
        halflife_days: Age, in days, at which the weight halves. Must be > 0.

    Returns:
        A Series named ``"recency_weight"`` aligned with ``df.index``.

    Raises:
        ValueError: If ``halflife_days`` is not strictly positive.
    """
    # ``not (x > 0)`` also rejects NaN, which would otherwise yield NaN weights.
    if not halflife_days > 0:
        raise ValueError(f"halflife_days must be > 0, got {halflife_days!r}")

    now_ts = df["timestamp"].max()
    days_since = (now_ts - df["timestamp"]) / SECONDS_PER_DAY
    decay_rate = np.log(2) / halflife_days
    return np.exp(-decay_rate * days_since).rename("recency_weight")


def _with_aggregation_inputs(df: pd.DataFrame, high_rating_threshold: float) -> pd.DataFrame:
    """Return a copy of *df* with recency weights and a 0/1 high-rating flag."""
    out = df.copy()
    out["recency_weight"] = compute_recency_weights(out)
    # Precomputing the flag lets the groupby use the built-in "sum" instead of
    # calling a Python lambda once per group.
    out[_HIGH_FLAG_COL] = (out["rating"] >= high_rating_threshold).astype("int64")
    return out


def build_user_features(
    df: pd.DataFrame,
    high_rating_threshold: float = HIGH_RATING_THRESHOLD,
) -> pd.DataFrame:
    """Aggregate interactions into one feature row per ``user_idx``.

    Args:
        df: Interaction frame with ``user_idx``, ``item_idx``, ``rating`` and
            ``timestamp`` columns. It is not modified.
        high_rating_threshold: Ratings ``>=`` this value count as high.

    Returns:
        A frame with ``user_idx`` followed by count, rating statistics,
        recency-weight statistics, ``n_high_rating``, ``days_active``,
        ``days_since_last`` and ``pct_high_rating``.
    """
    work = _with_aggregation_inputs(df, high_rating_threshold)
    now_ts = work["timestamp"].max()

    uf = work.groupby("user_idx").agg(
        n_interactions=("item_idx", "count"),
        avg_rating=("rating", "mean"),
        rating_std=("rating", "std"),
        min_rating=("rating", "min"),
        max_rating=("rating", "max"),
        first_ts=("timestamp", "min"),
        last_ts=("timestamp", "max"),
        avg_recency_w=("recency_weight", "mean"),
        max_recency_w=("recency_weight", "max"),
        n_high_rating=(_HIGH_FLAG_COL, "sum"),
    ).reset_index()

    uf["days_active"] = ((uf["last_ts"] - uf["first_ts"]) / SECONDS_PER_DAY).clip(lower=0)
    uf["days_since_last"] = ((now_ts - uf["last_ts"]) / SECONDS_PER_DAY).clip(lower=0)
    uf["pct_high_rating"] = uf["n_high_rating"] / uf["n_interactions"]
    # The sample standard deviation is NaN for a single rating; report it as 0.
    uf["rating_std"] = uf["rating_std"].fillna(0)
    uf = uf.drop(columns=["first_ts", "last_ts"])

    log.info("User features: %s", uf.shape)
    return uf


def build_item_features(
    df: pd.DataFrame,
    high_rating_threshold: float = HIGH_RATING_THRESHOLD,
    min_count: float = DEFAULT_POPULARITY_MIN_COUNT,
) -> pd.DataFrame:
    """Aggregate interactions into one feature row per ``item_idx``.

    Args:
        df: Interaction frame with ``user_idx``, ``item_idx``, ``rating`` and
            ``timestamp`` columns. It is not modified.
        high_rating_threshold: Ratings ``>=`` this value count as high.
        min_count: Number of pseudo-ratings at the global mean rating blended
            into ``popularity_score``; ``0`` makes the score equal to the
            item's own mean rating.

    Returns:
        A frame with ``item_idx`` followed by counts, rating statistics,
        ``avg_recency_w``, ``n_high_rating``, ``days_on_platform``,
        ``days_since_last``, ``interaction_velocity``, ``pct_high_rating`` and
        ``popularity_score``.
    """
    work = _with_aggregation_inputs(df, high_rating_threshold)
    now_ts = work["timestamp"].max()

    itf = work.groupby("item_idx").agg(
        n_interactions=("user_idx", "count"),
        n_unique_users=("user_idx", "nunique"),
        avg_rating=("rating", "mean"),
        rating_std=("rating", "std"),
        min_rating=("rating", "min"),
        max_rating=("rating", "max"),
        first_ts=("timestamp", "min"),
        last_ts=("timestamp", "max"),
        avg_recency_w=("recency_weight", "mean"),
        n_high_rating=(_HIGH_FLAG_COL, "sum"),
    ).reset_index()

    # Floor at one day so the velocity denominator below is never zero.
    itf["days_on_platform"] = ((now_ts - itf["first_ts"]) / SECONDS_PER_DAY).clip(lower=1)
    itf["days_since_last"] = ((now_ts - itf["last_ts"]) / SECONDS_PER_DAY).clip(lower=0)
    itf["interaction_velocity"] = itf["n_interactions"] / itf["days_on_platform"]
    itf["pct_high_rating"] = itf["n_high_rating"] / itf["n_interactions"]
    # The sample standard deviation is NaN for a single rating; report it as 0.
    itf["rating_std"] = itf["rating_std"].fillna(0)

    # Shrink each item's mean rating toward the global mean, as if the item had
    # received ``min_count`` extra ratings equal to the global mean.
    global_avg = work["rating"].mean()
    itf["popularity_score"] = (
        (itf["n_interactions"] * itf["avg_rating"] + min_count * global_avg)
        / (itf["n_interactions"] + min_count)
    )

    itf = itf.drop(columns=["first_ts", "last_ts"])
    log.info("Item features: %s", itf.shape)
    return itf


def build_interaction_features(
    df: pd.DataFrame,
    user_feats: pd.DataFrame,
    item_feats: pd.DataFrame,
    high_rating_threshold: float = HIGH_RATING_THRESHOLD,
) -> pd.DataFrame:
    """Enrich every interaction row with user- and item-level context.

    Args:
        df: Interaction frame with ``user_idx``, ``item_idx``, ``rating`` and
            ``timestamp`` columns. It is not modified.
        user_feats: Frame with at least ``user_idx`` and ``avg_rating``.
        item_feats: Frame with at least ``item_idx``, ``avg_rating`` and
            ``n_interactions``.
        high_rating_threshold: Ratings ``>=`` this value set ``is_high_rating``.

    Returns:
        A copy of *df* with ``recency_weight``, ``user_avg_rating``,
        ``item_avg_rating``, ``item_popularity``, ``rating_deviation``,
        ``user_item_ratio``, ``is_high_rating`` and ``implicit_label`` added.
    """
    out = df.copy()
    out["recency_weight"] = compute_recency_weights(out)

    user_cols = user_feats[["user_idx", "avg_rating"]].rename(
        columns={"avg_rating": "user_avg_rating"}
    )
    item_cols = item_feats[["item_idx", "avg_rating", "n_interactions"]].rename(
        columns={"avg_rating": "item_avg_rating", "n_interactions": "item_popularity"}
    )
    out = out.merge(user_cols, on="user_idx", how="left")
    out = out.merge(item_cols, on="item_idx", how="left")

    out["rating_deviation"] = out["rating"] - out["item_avg_rating"]
    # The small epsilon keeps the ratio finite when an item's mean rating is 0.
    out["user_item_ratio"] = out["user_avg_rating"] / (out["item_avg_rating"] + 1e-8)
    out["is_high_rating"] = (out["rating"] >= high_rating_threshold).astype(int)
    # Every row is labelled 1.
    out["implicit_label"] = 1

    log.info("Interaction features: %s", out.shape)
    return out


def build_weighted_matrix(df: pd.DataFrame, n_users: int, n_items: int) -> csr_matrix:
    """Build a sparse user x item matrix of ``rating * recency_weight``.

    Args:
        df: Interaction frame with ``user_idx``, ``item_idx``, ``rating`` and
            ``timestamp`` columns. It is not modified.
        n_users: Number of rows of the matrix.
        n_items: Number of columns of the matrix.

    Returns:
        A ``csr_matrix`` of shape ``(n_users, n_items)``. Repeated
        ``(user_idx, item_idx)`` pairs are summed by the SciPy constructor.
    """
    weighted = df["rating"] * compute_recency_weights(df)
    matrix = csr_matrix(
        (
            weighted.values.astype(float),
            (df["user_idx"].values, df["item_idx"].values),
        ),
        shape=(n_users, n_items),
    )
    log.info("Weighted matrix: %s", matrix.shape)
    return matrix


def normalise_features(
    df: pd.DataFrame,
    exclude_cols: Optional[Sequence[str]] = None,
) -> pd.DataFrame:
    """Min-max scale every numeric column of *df* into ``[0, 1]``.

    Args:
        df: Frame to normalise. It is not modified.
        exclude_cols: Column names to leave untouched (e.g. id columns).

    Returns:
        A copy of *df* in which each non-excluded numeric column is rescaled
        to ``(x - min) / (max - min)``. A column whose maximum is not greater
        than its minimum (constant or all-NaN) is set to ``0.0``.
    """
    excluded = set(exclude_cols) if exclude_cols is not None else set()
    out = df.copy()

    numeric_cols = out.select_dtypes(include=[np.number]).columns.tolist()
    for col in numeric_cols:
        if col in excluded:
            continue
        col_min = out[col].min()
        col_max = out[col].max()
        if col_max > col_min:
            out[col] = (out[col] - col_min) / (col_max - col_min)
        else:
            # Avoid a zero division for columns with no spread.
            out[col] = 0.0
    return out


def run_feature_pipeline(processed_dir: Union[str, Path]) -> Dict[str, Any]:
    """Build all feature artifacts from *processed_dir* and save them there.

    Reads ``train.parquet``, ``test.parquet`` and ``mappings.pkl``; writes
    ``user_features.parquet``, ``item_features.parquet``, their ``*_norm``
    counterparts, ``interaction_features.parquet`` and ``weighted_matrix.npz``.
    All features are computed from the training split only.

    Args:
        processed_dir: Directory holding the input files; outputs are written
            to the same directory.

    Returns:
        A dict with the feature frames, the weighted matrix, ``n_users``,
        ``n_items``, the loaded train/test frames and the unpickled mappings.

    Raises:
        ValueError: If the training frame has no rows.
    """
    processed_dir = Path(processed_dir)

    train_df = pd.read_parquet(processed_dir / "train.parquet")
    test_df = pd.read_parquet(processed_dir / "test.parquet")
    # SECURITY: pickle.load executes arbitrary code embedded in the file. Only
    # run this pipeline on a mappings.pkl produced by a trusted process.
    with open(processed_dir / "mappings.pkl", "rb") as f:
        mappings = pickle.load(f)

    if train_df.empty:
        raise ValueError("train.parquet contains no rows; cannot build features")

    # Matrix dimensions are derived from the largest index in the training split.
    n_users = train_df["user_idx"].max() + 1
    n_items = train_df["item_idx"].max() + 1
    log.info(
        "Loaded train (%s) and test (%s)",
        format(len(train_df), ","),
        format(len(test_df), ","),
    )

    user_feats = build_user_features(train_df)
    item_feats = build_item_features(train_df)
    interaction_feats = build_interaction_features(train_df, user_feats, item_feats)
    weighted_matrix = build_weighted_matrix(train_df, n_users, n_items)

    user_feats_norm = normalise_features(user_feats, exclude_cols=["user_idx"])
    item_feats_norm = normalise_features(item_feats, exclude_cols=["item_idx"])

    user_feats.to_parquet(processed_dir / "user_features.parquet", index=False)
    item_feats.to_parquet(processed_dir / "item_features.parquet", index=False)
    user_feats_norm.to_parquet(processed_dir / "user_features_norm.parquet", index=False)
    item_feats_norm.to_parquet(processed_dir / "item_features_norm.parquet", index=False)
    interaction_feats.to_parquet(processed_dir / "interaction_features.parquet", index=False)
    save_npz(str(processed_dir / "weighted_matrix.npz"), weighted_matrix)
    log.info("All feature artifacts saved.")

    return {
        "user_feats": user_feats,
        "item_feats": item_feats,
        "user_feats_norm": user_feats_norm,
        "item_feats_norm": item_feats_norm,
        "interaction_feats": interaction_feats,
        "weighted_matrix": weighted_matrix,
        "n_users": n_users,
        "n_items": n_items,
        "train_df": train_df,
        "test_df": test_df,
        "mappings": mappings,
    }