"""Data loading and preprocessing utilities for the two-tower model."""

# `mp` and `ThreadPoolExecutor` are no longer used in this module; the imports
# are retained so that any code importing these names from here keeps working.
import multiprocessing as mp
import os
import pickle
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import tensorflow as tf


class DataProcessor:
    """Handles data loading and preprocessing for the two-tower model."""

    def __init__(self, data_path: str = "datasets/"):
        """Remember the dataset directory and start with empty vocabularies.

        Args:
            data_path: Directory containing ``items.csv``, ``users.csv`` and
                ``interactions.csv``.
        """
        self.data_path = data_path
        # Each vocabulary maps a raw value to a contiguous integer index.
        self.item_vocab = {}
        self.category_vocab = {}
        self.brand_vocab = {}
        self.user_vocab = {}

    def load_data(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Load all datasets.

        Returns:
            ``(items_df, users_df, interactions_df)`` read from the CSV files
            under ``self.data_path``.
        """
        items_df = pd.read_csv(f"{self.data_path}/items.csv")
        users_df = pd.read_csv(f"{self.data_path}/users.csv")
        interactions_df = pd.read_csv(f"{self.data_path}/interactions.csv")
        return items_df, users_df, interactions_df

    def build_vocabularies(self, items_df: pd.DataFrame, users_df: pd.DataFrame,
                           interactions_df: pd.DataFrame) -> None:
        """Build vocabulary mappings for categorical features.

        Indices are assigned in order of first appearance, starting at 0.

        Args:
            items_df: Must contain ``product_id``, ``category_id`` and ``brand``.
            users_df: Must contain ``user_id``.
            interactions_df: Must contain ``product_id``; products that only
                appear in interactions are added to the item vocabulary too.
        """
        # Item vocabulary: catalogue items first, then interaction-only items.
        unique_items = pd.concat([
            items_df['product_id'],
            interactions_df['product_id'],
        ]).unique()
        self.item_vocab = {item: idx for idx, item in enumerate(unique_items)}

        # Category vocabulary
        unique_categories = items_df['category_id'].unique()
        self.category_vocab = {cat: idx for idx, cat in enumerate(unique_categories)}

        # Brand vocabulary (missing brands are folded into 'unknown')
        unique_brands = items_df['brand'].fillna('unknown').unique()
        self.brand_vocab = {brand: idx for idx, brand in enumerate(unique_brands)}

        # User vocabulary
        unique_users = users_df['user_id'].unique()
        self.user_vocab = {user: idx for idx, user in enumerate(unique_users)}

        print("Vocabularies built:")
        print(f"  Items: {len(self.item_vocab)}")
        print(f"  Categories: {len(self.category_vocab)}")
        print(f"  Brands: {len(self.brand_vocab)}")
        print(f"  Users: {len(self.user_vocab)}")

    def prepare_item_features(self, items_df: pd.DataFrame) -> Dict[str, np.ndarray]:
        """Prepare item features for training.

        NOTE: values missing from a vocabulary are mapped to index 0, which is
        also the index of the first entry of that vocabulary.

        Args:
            items_df: Must contain ``product_id``, ``category_id``, ``brand``
                and ``price``.

        Returns:
            Dict with integer index arrays ``product_id``, ``category_id`` and
            ``brand_id`` plus a float32 ``price`` array, one entry per row.
        """
        items_df = items_df.fillna({'brand': 'unknown'})

        item_features = {
            'product_id': np.array(
                [self.item_vocab.get(item, 0) for item in items_df['product_id']]
            ),
            'category_id': np.array(
                [self.category_vocab.get(cat, 0) for cat in items_df['category_id']]
            ),
            'brand_id': np.array(
                [self.brand_vocab.get(brand, 0) for brand in items_df['brand']]
            ),
            'price': items_df['price'].values.astype(np.float32),
        }
        return item_features

    def create_user_interaction_history(self, interactions_df: pd.DataFrame,
                                        items_df: pd.DataFrame,
                                        max_history_length: int = 50) -> Dict[int, List[int]]:
        """Create user interaction histories sorted by timestamp.

        Args:
            interactions_df: Must contain ``user_id``, ``product_id`` and
                ``event_time``. The frame is not modified.
            items_df: Unused; kept for interface compatibility.
            max_history_length: Keep only this many most recent interactions
                per user. Values ``<= 0`` yield empty histories.

        Returns:
            Mapping of raw ``user_id`` to the chronologically ordered list of
            item vocabulary indices (unknown products map to 0).
        """
        # Copy only the needed columns and parse timestamps as timezone-aware UTC.
        events = interactions_df[['user_id', 'product_id']].assign(
            event_time=pd.to_datetime(interactions_df['event_time'], utc=True)
        )
        events = events.sort_values(['user_id', 'event_time'])

        # Iterating the two columns directly avoids the per-row Series that
        # DataFrame.iterrows() builds (slow, and it may upcast values).
        lookup = self.item_vocab.get
        user_histories = defaultdict(list)
        for user_id, product_id in zip(events['user_id'], events['product_id']):
            user_histories[user_id].append(lookup(product_id, 0))

        if max_history_length <= 0:
            # history[-0:] would return the *whole* list, so handle it explicitly.
            return {user_id: [] for user_id in user_histories}

        return {
            user_id: history[-max_history_length:]
            for user_id, history in user_histories.items()
        }

    def create_positive_negative_pairs(self, interactions_df: pd.DataFrame,
                                       items_df: pd.DataFrame,
                                       negative_samples_per_positive: int = 4,
                                       random_state: Optional[int] = None) -> pd.DataFrame:
        """Create positive and negative user-item pairs for training.

        Every interaction whose user and product are both in the vocabularies
        becomes a positive pair (rating 1.0). For each user,
        ``negative_samples_per_positive`` x (number of distinct products the
        user interacted with) products the user never interacted with are drawn
        as negatives (rating 0.0). Sampling is without replacement when enough
        candidates exist, otherwise with replacement. Users with fewer than
        ``negative_samples_per_positive`` candidate products get no negatives.

        Sampling runs sequentially: the work is CPU-bound Python/NumPy, so
        threads give no speed-up and make the random draw order (and thus the
        result) non-deterministic.

        Args:
            interactions_df: Must contain ``user_id`` and ``product_id``.
            items_df: Unused; kept for interface compatibility.
            negative_samples_per_positive: Negatives per distinct positive item.
            random_state: Optional seed for reproducible sampling. ``None``
                uses NumPy's global random state.

        Returns:
            DataFrame with columns ``user_id``, ``product_id`` and ``rating``;
            positives first, then negatives.

        Raises:
            ValueError: If ``negative_samples_per_positive`` is negative.
        """
        if negative_samples_per_positive < 0:
            raise ValueError("negative_samples_per_positive must be non-negative")

        # Keep only interactions whose user and product are both known.
        known_user = interactions_df['user_id'].isin(list(self.user_vocab))
        known_item = interactions_df['product_id'].isin(list(self.item_vocab))
        valid_interactions = interactions_df[known_user & known_item]

        positive_pairs = valid_interactions[['user_id', 'product_id']].copy()
        positive_pairs['rating'] = 1.0

        if negative_samples_per_positive == 0:
            return positive_pairs.reset_index(drop=True)

        # Distinct products per user, used to exclude them from the negatives.
        user_items_dict = (
            valid_interactions.groupby('user_id')['product_id']
            .apply(set).to_dict()
        )

        # Vocabulary order gives a deterministic candidate order; iterating a
        # set of strings would vary between interpreter runs.
        all_items = np.array(list(self.item_vocab))
        rng = np.random if random_state is None else np.random.RandomState(random_state)

        negative_users = []
        negative_items = []
        for user_id, seen_items in user_items_dict.items():
            # np.isin needs a sequence, not a set.
            candidates = all_items[~np.isin(all_items, list(seen_items))]
            if len(candidates) < negative_samples_per_positive:
                continue

            n_samples = negative_samples_per_positive * len(seen_items)
            sampled = rng.choice(
                candidates,
                size=n_samples,
                replace=len(candidates) < n_samples,
            )
            negative_users.append(np.repeat(user_id, n_samples))
            negative_items.append(sampled)

        if not negative_items:
            return positive_pairs.reset_index(drop=True)

        # One frame from concatenated arrays instead of many tiny (and possibly
        # empty, object-dtype) frames keeps dtypes stable.
        negative_pairs = pd.DataFrame({
            'user_id': np.concatenate(negative_users),
            'product_id': np.concatenate(negative_items),
            'rating': 0.0,
        })

        return pd.concat([positive_pairs, negative_pairs], ignore_index=True)

    def save_vocabularies(self, save_path: str = "src/artifacts/"):
        """Save vocabularies for later use.

        Args:
            save_path: Directory (created if missing) that receives
                ``vocabularies.pkl``.
        """
        os.makedirs(save_path, exist_ok=True)

        vocab_data = {
            'item_vocab': self.item_vocab,
            'category_vocab': self.category_vocab,
            'brand_vocab': self.brand_vocab,
            'user_vocab': self.user_vocab,
        }

        with open(f"{save_path}/vocabularies.pkl", 'wb') as f:
            pickle.dump(vocab_data, f)

        print(f"Vocabularies saved to {save_path}/vocabularies.pkl")

    def load_vocabularies(self, load_path: str = "src/artifacts/vocabularies.pkl"):
        """Load vocabularies from file.

        Args:
            load_path: Path of a pickle file holding a dict with the keys
                ``item_vocab``, ``category_vocab``, ``brand_vocab`` and
                ``user_vocab``.

        Raises:
            KeyError: If one of the expected keys is missing from the file.
        """
        # SECURITY: pickle.load can execute arbitrary code. Only load files
        # produced by a trusted source.
        with open(load_path, 'rb') as f:
            vocab_data = pickle.load(f)

        self.item_vocab = vocab_data['item_vocab']
        self.category_vocab = vocab_data['category_vocab']
        self.brand_vocab = vocab_data['brand_vocab']
        self.user_vocab = vocab_data['user_vocab']

        print("Vocabularies loaded successfully")


def create_tf_dataset(features: Dict[str, np.ndarray],
                      batch_size: int = 256,
                      shuffle: bool = True) -> tf.data.Dataset:
    """Create a batched TensorFlow dataset from a dict of feature arrays.

    Args:
        features: Mapping of feature name to array, sliced along the first
            axis.
        batch_size: Number of examples per batch.
        shuffle: Whether to shuffle examples before batching.

    Returns:
        A dataset that is optionally shuffled, then batched, then prefetched.
    """
    dataset = tf.data.Dataset.from_tensor_slices(features)

    if shuffle:
        # Cap the shuffle buffer at 10000 examples for memory efficiency; the
        # example count is taken from the first feature.
        first_feature = next(iter(features.values()))
        dataset = dataset.shuffle(min(len(first_feature), 10000))

    # Fixed prefetch of 2 batches (instead of AUTOTUNE) for CPU training.
    return dataset.batch(batch_size).prefetch(2)