Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import pickle | |
| import logging | |
| from pathlib import Path | |
| from scipy.sparse import csr_matrix, save_npz | |
# Configure the root logger at import time: INFO and above, each record
# prefixed with its timestamp.
logging.basicConfig(
    format='%(asctime)s — %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by the pipeline functions in this file.
log = logging.getLogger(__name__)
def load_raw_data(raw_dir: Path, filename: str = "all_beauty_reviews.parquet") -> pd.DataFrame:
    """Load the raw review table from a parquet file inside ``raw_dir``.

    Args:
        raw_dir: Directory that holds the raw parquet file. A plain string is
            accepted too and is converted to a ``Path``.
        filename: Name of the parquet file inside ``raw_dir``. The default is
            the file name this function previously hard-coded.

    Returns:
        The parquet contents as a DataFrame, unmodified.

    Raises:
        FileNotFoundError: If the resolved path does not exist.
    """
    # Path() is a no-op for Path inputs and makes str inputs work with "/".
    path = Path(raw_dir) / filename
    if not path.exists():
        raise FileNotFoundError(f"Raw data not found at {path}")

    log.info(f"Loading raw data from {path}")
    df = pd.read_parquet(path)
    log.info(f"Loaded {len(df):,} rows")
    return df
def clean_and_select(df: pd.DataFrame) -> pd.DataFrame:
    """Reduce the raw table to the four core columns with numeric values.

    ``parent_asin`` is renamed to ``item_id``; only ``user_id``, ``item_id``,
    ``rating`` and ``timestamp`` are kept. Rows with a missing value in any
    of them, or with a rating/timestamp that cannot be parsed as a number,
    are dropped. When the median timestamp exceeds 1e12 the column is
    divided by 1000 before being cast to integers.

    Args:
        df: Raw review table.

    Returns:
        A new DataFrame with a fresh 0..n-1 index, ``rating`` as float and
        ``timestamp`` as int.
    """
    wanted = ['user_id', 'item_id', 'rating', 'timestamp']
    numeric_cols = ['rating', 'timestamp']

    frame = df.rename(columns={'parent_asin': 'item_id'})[wanted].copy()
    frame = frame.dropna(subset=wanted)

    # Unparseable values become NaN here and are removed right after.
    for col in numeric_cols:
        frame[col] = pd.to_numeric(frame[col], errors='coerce')
    frame = frame.dropna(subset=numeric_cols)

    # A median above 1e12 is treated as "values need dividing by 1000".
    if frame['timestamp'].median() > 1e12:
        frame['timestamp'] = (frame['timestamp'] / 1000).astype(int)

    frame = frame.astype({'rating': float, 'timestamp': int})
    log.info(f"Clean dataset: {len(frame):,} rows")
    return frame.reset_index(drop=True)
def filter_kcore(df: pd.DataFrame, min_interactions: int = 5) -> pd.DataFrame:
    """Repeatedly prune users and items that have too few interactions.

    Each pass first drops the rows of users with fewer than
    ``min_interactions`` rows, then drops the rows of items with fewer than
    ``min_interactions`` rows among what is left. Passes repeat until one of
    them removes nothing.

    Args:
        df: Interaction table with ``user_id`` and ``item_id`` columns.
        min_interactions: Minimum row count a user and an item must have.

    Returns:
        The surviving rows with a fresh 0..n-1 index.
    """
    pass_no = 0
    shrunk = True
    while shrunk:
        pass_no += 1
        rows_in = len(df)

        # Users are pruned before items, so the item counts only see rows
        # that survived the user filter of the same pass.
        for column in ('user_id', 'item_id'):
            counts = df[column].value_counts()
            frequent = counts.index[(counts >= min_interactions).to_numpy()]
            df = df[df[column].isin(frequent)]

        rows_out = len(df)
        log.info(f"k-core iter {pass_no}: {rows_in:,} → {rows_out:,} rows")
        shrunk = rows_out != rows_in

    user_total = df['user_id'].nunique()
    item_total = df['item_id'].nunique()
    log.info(f"Final: {user_total:,} users, "
             f"{item_total:,} items, "
             f"{len(df):,} interactions")
    return df.reset_index(drop=True)
def encode_ids(df: pd.DataFrame):
    """Assign contiguous integer indices to user and item identifiers.

    Indices follow the sorted order of the distinct ids, starting at 0.

    Args:
        df: Interaction table with ``user_id`` and ``item_id`` columns.

    Returns:
        A tuple ``(encoded_df, user2idx, item2idx, idx2user, idx2item)``.
        ``encoded_df`` is a copy of ``df`` with ``user_idx`` and ``item_idx``
        columns added; the four dicts map ids to indices and back.
    """
    ordered_users = sorted(df['user_id'].unique())
    ordered_items = sorted(df['item_id'].unique())

    # Position in the sorted list is the index; the forward maps are the
    # inverses of these.
    idx2user = dict(enumerate(ordered_users))
    idx2item = dict(enumerate(ordered_items))
    user2idx = {raw_id: pos for pos, raw_id in idx2user.items()}
    item2idx = {raw_id: pos for pos, raw_id in idx2item.items()}

    encoded = df.assign(
        user_idx=df['user_id'].map(user2idx),
        item_idx=df['item_id'].map(item2idx),
    )
    log.info(f"Encoded {len(user2idx):,} users and {len(item2idx):,} items")
    return encoded, user2idx, item2idx, idx2user, idx2item
def temporal_split(df: pd.DataFrame):
    """Hold out each user's latest interaction as the test set.

    Rows are sorted by ``user_id`` then ``timestamp``. For every user the
    row with the maximum timestamp (the first such row in sorted order when
    there are ties) goes to the test set; all other rows form the training
    set.

    Args:
        df: Interaction table with ``user_id`` and ``timestamp`` columns.
            The incoming index may be arbitrary, including non-unique.

    Returns:
        A tuple ``(train_df, test_df)``, each with a fresh 0..n-1 index.
        A user with a single row appears only in ``test_df``.
    """
    # idxmax() returns index *labels*. Resetting the index after sorting
    # makes every label unique, so .loc / .drop below touch exactly one row
    # per user even if the caller's index contained duplicates.
    ordered = df.sort_values(['user_id', 'timestamp']).reset_index(drop=True)

    test_idx = ordered.groupby('user_id')['timestamp'].idxmax()
    test_df = ordered.loc[test_idx].copy()
    train_df = ordered.drop(index=test_idx).copy()

    log.info(f"Train: {len(train_df):,} | Test: {len(test_df):,}")
    return train_df.reset_index(drop=True), test_df.reset_index(drop=True)
def build_sparse_matrix(df: pd.DataFrame, n_users: "int | None" = None,
                        n_items: "int | None" = None) -> csr_matrix:
    """Build a user x item CSR rating matrix from an encoded interaction table.

    Args:
        df: Table with integer ``user_idx`` / ``item_idx`` columns and a
            ``rating`` column.
        n_users: Number of matrix rows. When omitted it is inferred as
            ``max(user_idx) + 1`` (0 for an empty table). Pass the size of
            the full id mapping when ``df`` is only a subset of the encoded
            data, so that ids missing from ``df`` still get a row.
        n_items: Number of matrix columns; same rules as ``n_users``.

    Returns:
        A ``csr_matrix`` of shape ``(n_users, n_items)`` holding one rating
        per (user, item) pair.

    Raises:
        ValueError: Raised by scipy when an index does not fit into the
            requested shape.
    """
    # The (data, (row, col)) constructor SUMS entries that share a
    # coordinate, which would turn two reviews of the same item into a
    # rating outside the scale. Keep one row per pair instead; the last
    # occurrence in row order wins.
    pairs = df.drop_duplicates(subset=['user_idx', 'item_idx'], keep='last')
    rows = pairs['user_idx'].to_numpy()
    cols = pairs['item_idx'].to_numpy()

    # max() of an empty column is NaN, so guard the inferred dimensions.
    if n_users is None:
        n_users = int(rows.max()) + 1 if len(rows) else 0
    if n_items is None:
        n_items = int(cols.max()) + 1 if len(cols) else 0

    matrix = csr_matrix(
        (pairs['rating'].to_numpy(dtype=float), (rows, cols)),
        shape=(n_users, n_items),
    )

    cells = n_users * n_items
    density = matrix.nnz / cells if cells else 0.0
    log.info(f"Sparse matrix: {matrix.shape}, density: {density:.4%}")
    return matrix
def run_pipeline(raw_dir: Path, processed_dir: Path, min_interactions: int = 5) -> dict:
    """Run the full preprocessing pipeline and persist its artifacts.

    Steps: load the raw table, clean it, apply the k-core filter, encode
    ids, split off each user's latest interaction as test data and build
    the sparse training matrix.

    Files written to ``processed_dir`` (created if missing):
    ``train.parquet``, ``test.parquet``, ``interaction_matrix.npz`` and
    ``mappings.pkl`` (a pickled dict of the four id mappings).

    Args:
        raw_dir: Directory containing the raw parquet file.
        processed_dir: Output directory for the artifacts.
        min_interactions: Threshold forwarded to ``filter_kcore``.

    Returns:
        A dict with the keys ``df_raw``, ``train_df``, ``test_df``,
        ``matrix``, ``user2idx``, ``item2idx``, ``idx2user`` and
        ``idx2item``.
    """
    # Accept plain strings as well as Path objects.
    raw_dir = Path(raw_dir)
    processed_dir = Path(processed_dir)
    processed_dir.mkdir(parents=True, exist_ok=True)

    df_raw = load_raw_data(raw_dir)
    df_clean = clean_and_select(df_raw)
    df_filtered = filter_kcore(df_clean, min_interactions)
    df_encoded, user2idx, item2idx, idx2user, idx2item = encode_ids(df_filtered)
    train_df, test_df = temporal_split(df_encoded)
    matrix = build_sparse_matrix(train_df)

    # The matrix is built from the training rows only, so an id that occurs
    # solely in the held-out rows and has the highest index would be missing
    # from its shape. Grow it (in place, adding empty rows/columns) so every
    # index in the saved mappings is valid for the saved matrix.
    full_shape = (len(user2idx), len(item2idx))
    if matrix.shape != full_shape:
        log.info(f"Resizing matrix from {matrix.shape} to {full_shape} to cover all encoded ids")
        matrix.resize(full_shape)

    train_df.to_parquet(processed_dir / 'train.parquet', index=False)
    test_df.to_parquet(processed_dir / 'test.parquet', index=False)
    save_npz(str(processed_dir / 'interaction_matrix.npz'), matrix)

    mappings = {
        'user2idx': user2idx, 'item2idx': item2idx,
        'idx2user': idx2user, 'idx2item': idx2item,
    }
    with open(processed_dir / 'mappings.pkl', 'wb') as f:
        pickle.dump(mappings, f)
    log.info("All artifacts saved.")

    return {
        'df_raw': df_raw,
        'train_df': train_df,
        'test_df': test_df,
        'matrix': matrix,
        **mappings,
    }