Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import tensorflow as tf | |
| from typing import Dict, List, Tuple | |
| from datetime import datetime | |
| import pickle | |
| import os | |
| from functools import lru_cache | |
| from src.preprocessing.data_loader import DataProcessor | |
class UserDatasetCreator:
    """Creates user training dataset with demographics and aggregated item embeddings.

    User demographics are encoded as small integer categories. Each user's
    interaction history becomes a fixed-size matrix of item embeddings. Both
    are then aligned with item features and ratings for every (user, item)
    training pair produced by ``DataProcessor``.
    """

    # Category lookup tables, built once at class level instead of on every
    # categorize_* call.
    _PROFESSION_MAP = {
        "Technology": 0,
        "Healthcare": 1,
        "Education": 2,
        "Finance": 3,
        "Retail": 4,
        "Manufacturing": 5,
        "Services": 6,
        "Other": 7,
    }
    _LOCATION_MAP = {"Urban": 0, "Suburban": 1, "Rural": 2}
    _EDUCATION_MAP = {
        "High School": 0,
        "Some College": 1,
        "Bachelor's": 2,
        "Master's": 3,
        "PhD+": 4,
    }
    _MARITAL_MAP = {"Single": 0, "Married": 1, "Divorced": 2, "Widowed": 3}

    # Per-user arrays gathered into the training set, in output-key order.
    _USER_FEATURE_KEYS = ('age', 'gender', 'income', 'profession', 'location',
                          'education_level', 'marital_status', 'item_history_embeddings')
    # Per-item arrays gathered into the training set, in output-key order.
    _ITEM_FEATURE_KEYS = ('product_id', 'category_id', 'brand_id', 'price')

    def __init__(self, max_history_length: int = 50):
        """Args:
            max_history_length: rows in every user's history-embedding matrix;
                shorter histories are zero-padded, longer ones truncated.
        """
        self.max_history_length = max_history_length
        # Supplies the user/item vocabularies and the pair/feature builders.
        self.data_processor = DataProcessor()

    def categorize_age(self, age: float) -> int:
        """Categorize age into 6 demographic groups.

        0 Teen (<18), 1 Young Adult (<26), 2 Adult (<36), 3 Middle Age (<51),
        4 Mature (<66), 5 Senior (everything else, including NaN, because every
        comparison with NaN is False).
        """
        if age < 18:
            return 0  # Teen
        elif age < 26:
            return 1  # Young Adult
        elif age < 36:
            return 2  # Adult
        elif age < 51:
            return 3  # Middle Age
        elif age < 66:
            return 4  # Mature
        else:
            return 5  # Senior

    def categorize_income(self, income_series: pd.Series) -> np.ndarray:
        """Categorize income into 5 percentile-based groups (0 = lowest, 4 = highest).

        Cut points are the 20th/40th/60th/80th percentiles of the non-missing
        values in ``income_series``, so categories are relative to the population
        passed in. Missing (NaN) incomes get the middle group (2). Previously,
        NaN values made every threshold NaN and ``np.digitize`` raised.

        Returns:
            int32 array with one category per element of ``income_series``
            (empty for an empty series).
        """
        values = np.asarray(income_series, dtype=np.float64)
        categories = np.full(values.shape, 2, dtype=np.int32)
        known = ~np.isnan(values)
        if not known.any():
            # Empty input or only missing values: there is nothing to fit
            # thresholds on.
            return categories
        thresholds = np.percentile(values[known], [20, 40, 60, 80])
        # digitize against 4 inner cut points already yields 0..4.
        categories[known] = np.digitize(values[known], thresholds)
        return categories

    def categorize_profession(self, profession: str) -> int:
        """Map a profession name to 0-7; unknown values map to 7 ("Other")."""
        return self._PROFESSION_MAP.get(profession, 7)

    def categorize_location(self, location: str) -> int:
        """Map a location type to 0-2; unknown values map to 0 ("Urban")."""
        return self._LOCATION_MAP.get(location, 0)

    def categorize_education_level(self, education: str) -> int:
        """Map an education level to 0-4; unknown values map to 0 ("High School")."""
        return self._EDUCATION_MAP.get(education, 0)

    def categorize_marital_status(self, marital_status: str) -> int:
        """Map a marital status to 0-3; unknown values map to 0 ("Single")."""
        return self._MARITAL_MAP.get(marital_status, 0)

    def load_item_embeddings(self,
                             embeddings_path: str = "src/artifacts/item_embeddings.npy",
                             embedding_dim: int = 128) -> Dict[int, np.ndarray]:
        """Load pre-trained item embeddings, or fall back to random placeholders.

        The file is read with ``np.load(..., allow_pickle=True).item()``, so it
        must hold a pickled ``{item_id: vector}`` dict.
        NOTE(security): unpickling can execute code from the file. Only load
        artifacts this project produced.

        If the file does not exist, uniform random float32 vectors of length
        ``embedding_dim`` are created for every unique ``product_id`` returned
        by ``DataProcessor().load_data()``. This fallback is for demo purposes only.
        """
        try:
            embeddings = np.load(embeddings_path, allow_pickle=True).item()
        except FileNotFoundError:
            print(f"Warning: {embeddings_path} not found. Creating dummy embeddings...")
        else:
            print(f"Loaded {len(embeddings)} item embeddings from cache")
            return embeddings

        items_df, _, _ = DataProcessor().load_data()
        item_ids = items_df['product_id'].unique()
        embedding_matrix = np.random.rand(len(item_ids), embedding_dim).astype(np.float32)
        dummy_embeddings = dict(zip(item_ids, embedding_matrix))
        print(f"Created dummy embeddings for {len(dummy_embeddings)} items")
        return dummy_embeddings

    def aggregate_user_history_embeddings(self,
                                          user_histories: Dict[int, List[int]],
                                          item_embeddings: Dict[int, np.ndarray],
                                          embedding_dim: int = 128) -> Dict[int, np.ndarray]:
        """Turn each history into a (max_history_length, embedding_dim) matrix.

        Lookup works in two steps:
        - ``user_histories`` values are item vocabulary indices. They are mapped
          back to item ids through the inverse of ``self.data_processor.item_vocab``
          (item_id -> index).
        - The item ids are then used to look up ``item_embeddings``.

        Indices with no known item id, or no embedding, become all-zero rows.

        Histories longer than ``max_history_length`` keep only their last
        entries. This presumes histories are ordered oldest to newest, which
        should be confirmed against DataProcessor. Shorter histories are
        zero-padded at the end, so real interactions occupy the first rows.
        """
        # Invert the vocabulary once for all users (it used to be rebuilt for
        # every user inside the loop).
        vocab_to_item_id = {vocab_idx: item_id
                            for item_id, vocab_idx in self.data_processor.item_vocab.items()}
        zero_row = np.zeros(embedding_dim)

        user_aggregated_embeddings = {}
        for user_id, item_history in user_histories.items():
            if item_history is None or len(item_history) == 0:
                user_aggregated_embeddings[user_id] = np.zeros((self.max_history_length, embedding_dim))
                continue

            # Only the most recent max_history_length entries survive, so skip
            # the lookups for anything older.
            start = max(len(item_history) - self.max_history_length, 0)
            rows = []
            for vocab_idx in list(item_history)[start:]:
                item_id = vocab_to_item_id.get(vocab_idx)
                # `is not None` rather than truthiness, so item id 0 is not
                # mistaken for "unknown".
                if item_id is not None and item_id in item_embeddings:
                    rows.append(item_embeddings[item_id])
                else:
                    rows.append(zero_row)
            history_embeddings = np.array(rows)

            shortfall = self.max_history_length - len(history_embeddings)
            if shortfall > 0:
                history_embeddings = np.vstack(
                    [history_embeddings, np.zeros((shortfall, embedding_dim))])
            user_aggregated_embeddings[user_id] = history_embeddings
        return user_aggregated_embeddings

    def create_temporal_split(self,
                              interactions_df: pd.DataFrame,
                              split_date: str = "2019-11-15") -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Split interactions at ``split_date`` into (train, validation).

        ``event_time`` and ``split_date`` are both parsed with ``utc=True``, so
        naive and tz-aware timestamps compare consistently. Rows strictly
        before the cutoff go to training, and the rest go to validation. Rows
        whose ``event_time`` is missing (NaT) fail both comparisons and land in
        neither split. The input frame is not modified.
        """
        interactions_df = interactions_df.copy()
        interactions_df['event_time'] = pd.to_datetime(interactions_df['event_time'], utc=True)
        split_timestamp = pd.to_datetime(split_date, utc=True)

        train_interactions = interactions_df[interactions_df['event_time'] < split_timestamp]
        val_interactions = interactions_df[interactions_df['event_time'] >= split_timestamp]

        print("Temporal split:")
        print(f"  Training interactions: {len(train_interactions)} (before {split_date})")
        print(f"  Validation interactions: {len(val_interactions)} (after {split_date})")
        return train_interactions, val_interactions

    def prepare_user_features(self,
                              users_df: pd.DataFrame,
                              user_aggregated_embeddings: Dict[int, np.ndarray]) -> Dict[str, np.ndarray]:
        """Encode demographics for users that also have a history matrix.

        Only users present in both ``users_df`` and ``user_aggregated_embeddings``
        are kept, sorted by ``user_id``. The result contains:
        - parallel per-user arrays: 'user_ids', the categorical demographics
          and 'item_history_embeddings' (float32);
        - 'user_id_to_index', a user_id -> row-position dict used to gather
          rows later.
        """
        valid_users = sorted(set(users_df['user_id']) & set(user_aggregated_embeddings.keys()))

        user_demographics = users_df[users_df['user_id'].isin(valid_users)].copy()
        user_demographics = user_demographics.sort_values('user_id')

        # Gender: 1 = male, 0 otherwise. The comparison is case-insensitive,
        # like the module-level prepare_user_features helper, so 'Male' also
        # counts.
        user_demographics['gender_numeric'] = (
            user_demographics['gender'].astype(str).str.lower() == 'male'
        ).astype(int)
        user_demographics['age_category'] = user_demographics['age'].apply(self.categorize_age)
        user_demographics['income_category'] = self.categorize_income(user_demographics['income'])
        user_demographics['profession_category'] = user_demographics['profession'].apply(self.categorize_profession)
        user_demographics['location_category'] = user_demographics['location'].apply(self.categorize_location)
        user_demographics['education_category'] = user_demographics['education_level'].apply(self.categorize_education_level)
        user_demographics['marital_category'] = user_demographics['marital_status'].apply(self.categorize_marital_status)

        user_id_to_index = {uid: idx for idx, uid in enumerate(user_demographics['user_id'])}

        user_features = {
            'user_id_to_index': user_id_to_index,
            'user_ids': user_demographics['user_id'].values,
            'age': user_demographics['age_category'].values.astype(np.int32),
            'gender': user_demographics['gender_numeric'].values.astype(np.int32),
            'income': user_demographics['income_category'].values.astype(np.int32),
            'profession': user_demographics['profession_category'].values.astype(np.int32),
            'location': user_demographics['location_category'].values.astype(np.int32),
            'education_level': user_demographics['education_category'].values.astype(np.int32),
            'marital_status': user_demographics['marital_category'].values.astype(np.int32),
            'item_history_embeddings': np.array([
                user_aggregated_embeddings[uid] for uid in user_demographics['user_id']
            ]).astype(np.float32),
        }

        print(f"Prepared user features for {len(valid_users)} users")
        print(f"Age categories: {np.unique(user_features['age'], return_counts=True)}")
        print(f"Income categories: {np.unique(user_features['income'], return_counts=True)}")
        print(f"Profession categories: {np.unique(user_features['profession'], return_counts=True)}")
        print(f"Location categories: {np.unique(user_features['location'], return_counts=True)}")
        print(f"Education categories: {np.unique(user_features['education_level'], return_counts=True)}")
        print(f"Marital status categories: {np.unique(user_features['marital_status'], return_counts=True)}")
        print(f"History embeddings shape: {user_features['item_history_embeddings'].shape}")
        return user_features

    def _filter_trainable_pairs(self,
                                training_pairs: pd.DataFrame,
                                user_id_to_index: Dict[int, int]) -> pd.DataFrame:
        """Keep pairs whose user and item are in the vocabularies and whose user has features."""
        if len(training_pairs) == 0:
            return pd.DataFrame(columns=['user_id', 'product_id', 'rating'])
        user_vocab = self.data_processor.user_vocab
        item_vocab = self.data_processor.item_vocab
        # Plain dict membership preserves the original lookup semantics. It
        # also avoids iterrows(), which is slow and upcasts int ids to float
        # on mixed-dtype rows.
        keep = np.fromiter(
            (u in user_vocab and i in item_vocab and u in user_id_to_index
             for u, i in zip(training_pairs['user_id'], training_pairs['product_id'])),
            dtype=bool, count=len(training_pairs))
        return training_pairs.loc[keep, ['user_id', 'product_id', 'rating']]

    def create_training_dataset(self,
                                interactions_df: pd.DataFrame,
                                items_df: pd.DataFrame,
                                users_df: pd.DataFrame,
                                item_embeddings: Dict[int, np.ndarray],
                                negative_samples_per_positive: int = 4) -> Dict[str, np.ndarray]:
        """Create complete training dataset.

        Steps:
        1. Build the vocabularies.
        2. Build user histories and aggregate them into embedding matrices.
        3. Generate positive/negative pairs.
        4. Gather the user and item feature arrays for every pair that has
           both vocabulary entries and user features.

        Returns a dict of per-sample arrays (the user feature keys, the item
        feature keys, and 'rating' as float32). Returns ``{}`` when no valid
        pair remains.

        NOTE(review): vocabularies are rebuilt from the arguments on every call.
        Confirm that separate train and validation calls produce compatible
        index spaces.
        """
        self.data_processor.build_vocabularies(items_df, users_df, interactions_df)

        print("Creating user interaction histories...")
        user_histories = self.data_processor.create_user_interaction_history(
            interactions_df, items_df, self.max_history_length
        )

        print("Aggregating user history embeddings...")
        user_aggregated_embeddings = self.aggregate_user_history_embeddings(
            user_histories, item_embeddings
        )

        print("Creating positive/negative pairs...")
        training_pairs = self.data_processor.create_positive_negative_pairs(
            interactions_df, items_df, negative_samples_per_positive
        )

        user_features = self.prepare_user_features(users_df, user_aggregated_embeddings)
        item_features = self.data_processor.prepare_item_features(items_df)

        print("Creating aligned training dataset...")
        valid_pairs_df = self._filter_trainable_pairs(
            training_pairs, user_features['user_id_to_index'])
        if len(valid_pairs_df) == 0:
            print("Warning: No valid user-item pairs found!")
            return {}

        user_indices = np.array(
            [user_features['user_id_to_index'][uid] for uid in valid_pairs_df['user_id']],
            dtype=np.int64)
        item_indices = np.array(
            [self.data_processor.item_vocab[iid] for iid in valid_pairs_df['product_id']],
            dtype=np.int64)

        training_features = {key: user_features[key][user_indices] for key in self._USER_FEATURE_KEYS}
        for key in self._ITEM_FEATURE_KEYS:
            training_features[key] = item_features[key][item_indices]
        training_features['rating'] = valid_pairs_df['rating'].to_numpy(dtype=np.float32)

        # Report the number of samples actually kept (after user filtering).
        print(f"Created training dataset with {len(valid_pairs_df)} samples")
        return training_features

    def save_dataset(self,
                     training_features: Dict[str, np.ndarray],
                     save_path: str = "src/artifacts/"):
        """Save the prepared training dataset.

        Writes two files into ``save_path`` (created if missing):
        - ``training_features.pkl``, the pickled features;
        - ``dataset_stats.txt``, a summary.

        An empty dataset (``{}``) is still pickled, but its statistics are
        skipped instead of raising ``KeyError``.
        """
        os.makedirs(save_path, exist_ok=True)
        with open(os.path.join(save_path, "training_features.pkl"), 'wb') as f:
            pickle.dump(training_features, f)

        if 'rating' not in training_features:
            print(f"Warning: empty training dataset saved to {save_path}; skipping statistics")
            return

        ratings = training_features['rating']
        history_shape = training_features['item_history_embeddings'].shape
        stats = {
            'num_samples': len(ratings),
            'num_positive': int(np.sum(ratings > 0.5)),
            'num_negative': int(np.sum(ratings <= 0.5)),
            'history_length': history_shape[1],
            'embedding_dim': history_shape[2],
        }
        with open(os.path.join(save_path, "dataset_stats.txt"), 'w') as f:
            for key, value in stats.items():
                f.write(f"{key}: {value}\n")

        print(f"Training dataset saved to {save_path}")
        print(f"Dataset statistics: {stats}")

    def load_dataset(self, load_path: str = "src/artifacts/training_features.pkl") -> Dict[str, np.ndarray]:
        """Load a dataset written by ``save_dataset``.

        NOTE(security): this unpickles the file; only load trusted artifacts.
        """
        with open(load_path, 'rb') as f:
            training_features = pickle.load(f)
        print(f"Loaded training dataset with {len(training_features.get('rating', []))} samples")
        return training_features
def main():
    """Main function for user dataset creation.

    Pipeline:
    1. Load the raw data and the item embeddings.
    2. Split interactions temporally (default cutoff of ``create_temporal_split``).
    3. Build and save the training set from the earlier interactions.
    4. Build the validation set from a sample of the later interactions and
       pickle it to ``src/artifacts/validation_features.pkl``.
    """
    dataset_creator = UserDatasetCreator(max_history_length=50)

    print("Loading data...")
    data_processor = DataProcessor()
    items_df, users_df, interactions_df = data_processor.load_data()

    print("Loading item embeddings...")
    item_embeddings = dataset_creator.load_item_embeddings()

    print("Using full dataset for training...")
    sample_users = users_df  # Use all users
    user_ids = set(sample_users['user_id'])
    sample_interactions = interactions_df[interactions_df['user_id'].isin(user_ids)]
    # Restrict items to those that actually appear in the interactions.
    item_ids = set(sample_interactions['product_id'])
    sample_items = items_df[items_df['product_id'].isin(item_ids)]
    print(f"Full dataset: {len(sample_items)} items, {len(sample_users)} users, {len(sample_interactions)} interactions")

    print("Creating temporal split...")
    train_interactions, val_interactions = dataset_creator.create_temporal_split(sample_interactions)

    print("Creating training dataset...")
    training_features = dataset_creator.create_training_dataset(
        train_interactions, sample_items, sample_users, item_embeddings,
        negative_samples_per_positive=2  # Reduce for faster processing
    )

    print("Saving training dataset...")
    if training_features:
        dataset_creator.save_dataset(training_features)
    else:
        # create_training_dataset returns {} when no valid pairs exist; there
        # are no statistics to compute from it.
        print("Warning: training dataset is empty; nothing saved")

    print("Creating validation dataset...")
    # Use 10% of the validation interactions, capped at 5000. The old expression
    # max(n // 10, n) always evaluated to n, so the 10% rule never applied.
    # With fewer than 10 interactions the sample size is 0 and all are used.
    n_val = len(val_interactions)
    val_sample_size = min(5000, n_val // 10)
    if 0 < val_sample_size < n_val:
        # Fixed seed so the validation set is reproducible across runs.
        val_sample = val_interactions.sample(val_sample_size, random_state=42)
    else:
        val_sample = val_interactions

    val_training_features = dataset_creator.create_training_dataset(
        val_sample, sample_items, sample_users, item_embeddings,
        negative_samples_per_positive=1  # Smaller ratio for validation
    )

    # The directory may not exist yet if the training set was empty.
    os.makedirs("src/artifacts", exist_ok=True)
    with open("src/artifacts/validation_features.pkl", 'wb') as f:
        pickle.dump(val_training_features, f)
    print("User dataset creation completed!")
def prepare_user_features(users_df: pd.DataFrame,
                          user_histories: Dict[int, List[int]],
                          item_features: Dict[str, np.ndarray],
                          max_history_length: int = 50,
                          embedding_dim: int = 128) -> Dict[int, Dict]:
    """Standalone function to prepare user features with categorical demographics.

    Item embeddings here are random placeholders: standard-normal float32
    vectors of length ``embedding_dim``, meant to be replaced by real ones.

    Args:
        users_df: user table with ``user_id``, ``age``, ``gender`` and ``income``.
            ``profession``, ``location``, ``education_level`` and
            ``marital_status`` are optional and default per category.
        user_histories: user_id -> sequence of item vocabulary indices.
        item_features: unused; kept for signature compatibility.
        max_history_length: rows in each user's history-embedding matrix.
        embedding_dim: width of each placeholder item embedding.

    Returns:
        Dict keyed by user_id. Each value is a dict with 'age', 'gender',
        'income', 'profession', 'location', 'education_level',
        'marital_status' and 'item_history_embeddings'. Users missing from
        ``user_histories`` are skipped.
    """
    creator = UserDatasetCreator(max_history_length=max_history_length)

    unique_items = set()
    for history in user_histories.values():
        unique_items.update(history)

    # aggregate_user_history_embeddings maps vocab indices back to item ids via
    # creator.data_processor.item_vocab and looks embeddings up BY ITEM ID.
    # The placeholders must therefore be keyed by item id. Keying them by vocab
    # index (as before) made lookups miss or hit an unrelated item's vector.
    # NOTE(review): if a freshly constructed DataProcessor has not built its
    # vocabularies, item_vocab is empty and every history row is zero — confirm.
    item_vocab = getattr(creator.data_processor, 'item_vocab', None) or {}
    vocab_to_item_id = {vocab_idx: item_id for item_id, vocab_idx in item_vocab.items()}
    item_embeddings = {}
    for item_vocab_idx in unique_items:
        if item_vocab_idx in vocab_to_item_id:
            item_embeddings[vocab_to_item_id[item_vocab_idx]] = (
                np.random.randn(embedding_dim).astype(np.float32))

    user_aggregated_embeddings = creator.aggregate_user_history_embeddings(
        user_histories, item_embeddings, embedding_dim
    )

    # Income groups are relative to the whole users_df. Compute them once
    # (they were recomputed for every user) and index by row POSITION. The old
    # lookup used the DataFrame index label, which is wrong whenever users_df
    # does not have a default 0..n-1 index.
    if len(users_df):
        income_categories = creator.categorize_income(users_df['income'])
    else:
        income_categories = np.empty(0, dtype=np.int32)

    user_feature_dict = {}
    for position, (_, user_row) in enumerate(users_df.iterrows()):
        user_id = user_row['user_id']
        if user_id not in user_aggregated_embeddings:
            continue

        gender = user_row['gender']
        # 1 = male (case-insensitive), 0 otherwise. Non-string values such as
        # NaN count as 0 instead of raising on .lower().
        gender_cat = 1 if isinstance(gender, str) and gender.lower() == 'male' else 0

        user_feature_dict[user_id] = {
            'age': creator.categorize_age(user_row['age']),
            'gender': gender_cat,
            'income': income_categories[position],
            'profession': creator.categorize_profession(user_row.get('profession', 'Other')),
            'location': creator.categorize_location(user_row.get('location', 'Urban')),
            'education_level': creator.categorize_education_level(
                user_row.get('education_level', 'High School')),
            'marital_status': creator.categorize_marital_status(
                user_row.get('marital_status', 'Single')),
            'item_history_embeddings': user_aggregated_embeddings[user_id],
        }

    print(f"Prepared features for {len(user_feature_dict)} users with {embedding_dim}D embeddings")
    return user_feature_dict
if __name__ == "__main__":
    # Build the datasets only when run as a script, not when imported.
    main()