recsys-ecommerce / src /data_loader.py
dscsdvdfsvs's picture
fix: upload src folder with model classes
80843b0 verified
import pandas as pd
import numpy as np
import pickle
import logging
from pathlib import Path
from scipy.sparse import csr_matrix, save_npz
logging.basicConfig(level=logging.INFO, format='%(asctime)s — %(message)s')
log = logging.getLogger(__name__)
def load_raw_data(raw_dir: Path) -> pd.DataFrame:
path = raw_dir / "all_beauty_reviews.parquet"
if not path.exists():
raise FileNotFoundError(f"Raw data not found at {path}")
log.info(f"Loading raw data from {path}")
df = pd.read_parquet(path)
log.info(f"Loaded {len(df):,} rows")
return df
def clean_and_select(df: pd.DataFrame) -> pd.DataFrame:
df = df.rename(columns={'parent_asin': 'item_id'})
core = ['user_id', 'item_id', 'rating', 'timestamp']
df = df[core].copy()
df = df.dropna(subset=core)
df['rating'] = pd.to_numeric(df['rating'], errors='coerce')
df['timestamp'] = pd.to_numeric(df['timestamp'], errors='coerce')
df = df.dropna(subset=['rating', 'timestamp'])
if df['timestamp'].median() > 1e12:
df['timestamp'] = (df['timestamp'] / 1000).astype(int)
df['rating'] = df['rating'].astype(float)
df['timestamp'] = df['timestamp'].astype(int)
log.info(f"Clean dataset: {len(df):,} rows")
return df.reset_index(drop=True)
def filter_kcore(df: pd.DataFrame, min_interactions: int = 5) -> pd.DataFrame:
iteration = 0
while True:
iteration += 1
before = len(df)
user_counts = df['user_id'].value_counts()
df = df[df['user_id'].isin(
user_counts[user_counts >= min_interactions].index
)]
item_counts = df['item_id'].value_counts()
df = df[df['item_id'].isin(
item_counts[item_counts >= min_interactions].index
)]
after = len(df)
log.info(f"k-core iter {iteration}: {before:,}{after:,} rows")
if before == after:
break
log.info(f"Final: {df['user_id'].nunique():,} users, "
f"{df['item_id'].nunique():,} items, "
f"{len(df):,} interactions")
return df.reset_index(drop=True)
def encode_ids(df: pd.DataFrame):
user2idx = {u: i for i, u in enumerate(sorted(df['user_id'].unique()))}
item2idx = {it: i for i, it in enumerate(sorted(df['item_id'].unique()))}
idx2user = {v: k for k, v in user2idx.items()}
idx2item = {v: k for k, v in item2idx.items()}
df = df.copy()
df['user_idx'] = df['user_id'].map(user2idx)
df['item_idx'] = df['item_id'].map(item2idx)
log.info(f"Encoded {len(user2idx):,} users and {len(item2idx):,} items")
return df, user2idx, item2idx, idx2user, idx2item
def temporal_split(df: pd.DataFrame):
df = df.sort_values(['user_id', 'timestamp'])
test_idx = df.groupby('user_id')['timestamp'].idxmax()
test_df = df.loc[test_idx].copy()
train_df = df.drop(index=test_idx).copy()
log.info(f"Train: {len(train_df):,} | Test: {len(test_df):,}")
return train_df.reset_index(drop=True), test_df.reset_index(drop=True)
def build_sparse_matrix(df: pd.DataFrame) -> csr_matrix:
n_users = df['user_idx'].max() + 1
n_items = df['item_idx'].max() + 1
matrix = csr_matrix(
(
df['rating'].values.astype(float),
(df['user_idx'].values, df['item_idx'].values)
),
shape=(n_users, n_items)
)
density = matrix.nnz / (n_users * n_items)
log.info(f"Sparse matrix: {matrix.shape}, density: {density:.4%}")
return matrix
def run_pipeline(raw_dir: Path, processed_dir: Path, min_interactions: int = 5) -> dict:
processed_dir.mkdir(parents=True, exist_ok=True)
df_raw = load_raw_data(raw_dir)
df_clean = clean_and_select(df_raw)
df_filtered = filter_kcore(df_clean, min_interactions)
df_encoded, user2idx, item2idx, idx2user, idx2item = encode_ids(df_filtered)
train_df, test_df = temporal_split(df_encoded)
matrix = build_sparse_matrix(train_df)
train_df.to_parquet(processed_dir / 'train.parquet', index=False)
test_df.to_parquet(processed_dir / 'test.parquet', index=False)
save_npz(str(processed_dir / 'interaction_matrix.npz'), matrix)
with open(processed_dir / 'mappings.pkl', 'wb') as f:
pickle.dump({
'user2idx': user2idx, 'item2idx': item2idx,
'idx2user': idx2user, 'idx2item': idx2item,
}, f)
log.info("All artifacts saved.")
return {
'df_raw': df_raw,
'train_df': train_df,
'test_df': test_df,
'matrix': matrix,
'user2idx': user2idx,
'item2idx': item2idx,
'idx2user': idx2user,
'idx2item': idx2item,
}