import logging
import pickle
from pathlib import Path
from typing import Dict, Optional, Tuple

import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix, save_npz

# NOTE: this configures the root logger as a side effect of importing the module.
logging.basicConfig(level=logging.INFO, format='%(asctime)s — %(message)s')
log = logging.getLogger(__name__)


def load_raw_data(raw_dir: Path) -> pd.DataFrame:
    """Load the raw review table from ``raw_dir / "all_beauty_reviews.parquet"``.

    Args:
        raw_dir: Directory containing the raw parquet file (str or Path).

    Returns:
        The raw DataFrame exactly as read from parquet.

    Raises:
        FileNotFoundError: If the parquet file does not exist.
    """
    path = Path(raw_dir) / "all_beauty_reviews.parquet"
    if not path.exists():
        raise FileNotFoundError(f"Raw data not found at {path}")
    log.info(f"Loading raw data from {path}")
    df = pd.read_parquet(path)
    log.info(f"Loaded {len(df):,} rows")
    return df


def clean_and_select(df: pd.DataFrame) -> pd.DataFrame:
    """Keep the core interaction columns, coerce their types and drop bad rows.

    Steps:
      * rename ``parent_asin`` -> ``item_id`` and keep
        ``user_id, item_id, rating, timestamp``;
      * drop rows with missing or non-numeric rating/timestamp;
      * if the median timestamp exceeds 1e12 the column is assumed to be in
        milliseconds and is converted to seconds;
      * collapse repeated (user_id, item_id) pairs to the most recent one.
        Otherwise the sparse matrix would sum their ratings, and the k-core
        and temporal split would count the same item more than once.

    Returns:
        A new DataFrame with float ``rating`` and int64 ``timestamp``, in the
        original row order, with a fresh RangeIndex.
    """
    df = df.rename(columns={'parent_asin': 'item_id'})
    core = ['user_id', 'item_id', 'rating', 'timestamp']
    df = df[core].copy()
    df = df.dropna(subset=core)

    df['rating'] = pd.to_numeric(df['rating'], errors='coerce')
    df['timestamp'] = pd.to_numeric(df['timestamp'], errors='coerce')
    df = df.dropna(subset=['rating', 'timestamp'])

    # Detect units on the median rather than per row: early (pre-2001)
    # millisecond timestamps are below 1e12 and would be misclassified per row.
    if df['timestamp'].median() > 1e12:
        df['timestamp'] = df['timestamp'] / 1000

    df['rating'] = df['rating'].astype(float)
    # Explicit int64: plain `int` is 32-bit on some platforms (e.g. Windows, NumPy < 2).
    df['timestamp'] = df['timestamp'].astype('int64')

    # Keep only the latest interaction per (user, item). The stable sort means
    # that among equal timestamps the later row wins. sort_index restores the
    # original row order.
    n_before = len(df)
    df = (
        df.sort_values('timestamp', kind='stable')
        .drop_duplicates(subset=['user_id', 'item_id'], keep='last')
        .sort_index()
    )
    n_dupes = n_before - len(df)
    if n_dupes:
        log.info(f"Dropped {n_dupes:,} repeated (user, item) interactions")

    log.info(f"Clean dataset: {len(df):,} rows")
    return df.reset_index(drop=True)


def filter_kcore(df: pd.DataFrame, min_interactions: int = 5) -> pd.DataFrame:
    """Iteratively drop users and items with fewer than ``min_interactions`` rows.

    Removing sparse users can push items below the threshold and vice versa,
    so the user and item filters repeat until a pass removes no rows.

    Returns:
        The filtered DataFrame with a fresh RangeIndex (possibly empty).
    """
    iteration = 0
    while True:
        iteration += 1
        before = len(df)

        user_counts = df['user_id'].value_counts()
        df = df[df['user_id'].isin(user_counts[user_counts >= min_interactions].index)]

        item_counts = df['item_id'].value_counts()
        df = df[df['item_id'].isin(item_counts[item_counts >= min_interactions].index)]

        after = len(df)
        log.info(f"k-core iter {iteration}: {before:,} → {after:,} rows")
        if before == after:
            break

    log.info(
        f"Final: {df['user_id'].nunique():,} users, "
        f"{df['item_id'].nunique():,} items, "
        f"{len(df):,} interactions"
    )
    return df.reset_index(drop=True)


def encode_ids(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict, Dict, Dict, Dict]:
    """Map user and item IDs to contiguous integer indices.

    Indices are assigned in sorted order of the original IDs, so the encoding
    is deterministic for a given set of IDs.

    Returns:
        ``(df_with_user_idx_item_idx, user2idx, item2idx, idx2user, idx2item)``.
        The input frame is not modified.
    """
    user2idx = {u: i for i, u in enumerate(sorted(df['user_id'].unique()))}
    item2idx = {it: i for i, it in enumerate(sorted(df['item_id'].unique()))}
    idx2user = {v: k for k, v in user2idx.items()}
    idx2item = {v: k for k, v in item2idx.items()}

    df = df.copy()
    df['user_idx'] = df['user_id'].map(user2idx)
    df['item_idx'] = df['item_id'].map(item2idx)
    log.info(f"Encoded {len(user2idx):,} users and {len(item2idx):,} items")
    return df, user2idx, item2idx, idx2user, idx2item


def temporal_split(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Leave-last-out split: each user's latest interaction goes to test.

    If several interactions of a user share the maximum timestamp, only the
    first one in sorted order is moved to test. A user with a single
    interaction ends up only in the test set.

    Returns:
        ``(train_df, test_df)``, both with a fresh RangeIndex.
    """
    df = df.sort_values(['user_id', 'timestamp'])
    test_idx = df.groupby('user_id')['timestamp'].idxmax()
    test_df = df.loc[test_idx].copy()
    train_df = df.drop(index=test_idx).copy()
    log.info(f"Train: {len(train_df):,} | Test: {len(test_df):,}")
    return train_df.reset_index(drop=True), test_df.reset_index(drop=True)


def build_sparse_matrix(
    df: pd.DataFrame,
    n_users: Optional[int] = None,
    n_items: Optional[int] = None,
) -> csr_matrix:
    """Build a CSR user × item rating matrix from ``user_idx``/``item_idx``/``rating``.

    Args:
        df: Interactions with integer ``user_idx``/``item_idx`` columns.
        n_users: Number of rows. Defaults to ``max(user_idx) + 1``. Pass the
            size of the full ID mapping so that users or items appearing only
            in another split still fit inside the matrix.
        n_items: Number of columns, with the same defaulting rule.

    Duplicate (user_idx, item_idx) pairs keep the last row's rating. scipy
    would otherwise sum them.
    """
    # Drop duplicate pairs defensively: csr_matrix sums duplicate coordinates.
    df = df.drop_duplicates(subset=['user_idx', 'item_idx'], keep='last')

    def _dim(explicit: Optional[int], col: str) -> int:
        # Use the explicit size if given, else infer it from the largest index.
        if explicit is not None:
            return int(explicit)
        return int(df[col].max()) + 1 if len(df) else 0

    n_rows = _dim(n_users, 'user_idx')
    n_cols = _dim(n_items, 'item_idx')

    matrix = csr_matrix(
        (
            df['rating'].values.astype(float),
            (df['user_idx'].values, df['item_idx'].values),
        ),
        shape=(n_rows, n_cols),
    )
    cells = n_rows * n_cols
    density = matrix.nnz / cells if cells else 0.0
    log.info(f"Sparse matrix: {matrix.shape}, density: {density:.4%}")
    return matrix


def run_pipeline(raw_dir: Path, processed_dir: Path, min_interactions: int = 5) -> dict:
    """Run load → clean → k-core → encode → split → matrix and save the results.

    Writes ``train.parquet``, ``test.parquet``, ``interaction_matrix.npz`` and
    ``mappings.pkl`` into ``processed_dir``, creating it if needed.

    Returns:
        A dict with keys ``df_raw, train_df, test_df, matrix, user2idx,
        item2idx, idx2user, idx2item``.
    """
    raw_dir = Path(raw_dir)
    processed_dir = Path(processed_dir)
    processed_dir.mkdir(parents=True, exist_ok=True)

    df_raw = load_raw_data(raw_dir)
    df_clean = clean_and_select(df_raw)
    df_filtered = filter_kcore(df_clean, min_interactions)
    df_encoded, user2idx, item2idx, idx2user, idx2item = encode_ids(df_filtered)
    train_df, test_df = temporal_split(df_encoded)
    # Size the matrix by the full ID space. An entity that appears only in
    # test would otherwise get no row or column, and its index would be out of bounds.
    matrix = build_sparse_matrix(train_df, n_users=len(user2idx), n_items=len(item2idx))

    train_df.to_parquet(processed_dir / 'train.parquet', index=False)
    test_df.to_parquet(processed_dir / 'test.parquet', index=False)
    save_npz(str(processed_dir / 'interaction_matrix.npz'), matrix)
    # NOTE: pickle can execute arbitrary code on load; only load this file from trusted locations.
    with open(processed_dir / 'mappings.pkl', 'wb') as f:
        pickle.dump({
            'user2idx': user2idx,
            'item2idx': item2idx,
            'idx2user': idx2user,
            'idx2item': idx2item,
        }, f)

    log.info("All artifacts saved.")
    return {
        'df_raw': df_raw,
        'train_df': train_df,
        'test_df': test_df,
        'matrix': matrix,
        'user2idx': user2idx,
        'item2idx': item2idx,
        'idx2user': idx2user,
        'idx2item': idx2item,
    }