| """ |
| Synthetic MovieLens-style dataset with realistic user-item interactions. |
| Generates data using a latent factor model when ratings.csv or items.csv is missing from the data directory. |
| """ |
| import os |
| import numpy as np |
| import pandas as pd |
|
|
|
|
class MovieLensDataset:
    """Synthetic MovieLens-style ratings with per-user interaction sequences.

    On construction the ratings and item tables are read from
    ``<data_dir>/ratings.csv`` and ``<data_dir>/items.csv`` when both files
    exist; otherwise they are generated from a latent factor model and
    written to those two paths.

    Attributes set by the constructor:
        ratings_df: columns ``user_id``, ``item_id``, ``rating``, ``timestamp``.
        items_df: columns ``item_id``, ``title``, ``genres`` ('|'-joined).
        user_sequences: ``{user_id: [(item_id, reward), ...]}`` ordered by
            timestamp, where ``reward`` is 1 for ratings >= 4 and 0 otherwise.
        train_df / test_df: per-user chronological 80/20 split with columns
            ``user_id``, ``item_id``, ``reward``.
        n_users_actual / n_items_actual: distinct ids present in the ratings.
    """

    GENRES = [
        'Action', 'Comedy', 'Drama', 'Horror', 'Sci-Fi',
        'Romance', 'Thriller', 'Animation', 'Documentary', 'Fantasy',
    ]

    def __init__(self, data_dir='data', n_users=500, n_items=200):
        """Load the cached CSV files or generate and cache a new dataset.

        Args:
            data_dir: Directory holding ``ratings.csv`` and ``items.csv``;
                created if it does not exist.
            n_users: Number of users to synthesise. Only used when the data
                has to be generated; cached files are loaded as they are.
            n_items: Number of items to synthesise (same caveat as above).
        """
        self.data_dir = data_dir
        self.n_users = n_users
        self.n_items = n_items
        os.makedirs(data_dir, exist_ok=True)

        ratings_path = os.path.join(data_dir, 'ratings.csv')
        items_path = os.path.join(data_dir, 'items.csv')

        # Both files are required; a partial cache is regenerated.
        if os.path.exists(ratings_path) and os.path.exists(items_path):
            self.ratings_df = pd.read_csv(ratings_path)
            self.items_df = pd.read_csv(items_path)
        else:
            self._generate_synthetic()

        self._preprocess()

    def _generate_synthetic(self):
        """Generate ratings and items, store them on ``self`` and write CSVs.

        Each user rates between 20 and 79 items (capped at ``n_items``),
        sampled without replacement with probability given by a softmax over
        the user-item latent scores. The score is mapped linearly so that
        [-2, 2] covers the rating range [1, 5], then rounded and clipped.
        """
        # A private legacy RandomState produces the same stream as
        # ``np.random.seed(42)`` followed by ``np.random.*`` calls, but does
        # not reseed the process-wide generator used by other code.
        rng = np.random.RandomState(42)
        n_factors = 20
        user_factors = rng.randn(self.n_users, n_factors)
        item_factors = rng.randn(self.n_items, n_factors)

        interactions = []
        for user_id in range(self.n_users):
            n_ratings = rng.randint(20, 80)
            scores = user_factors[user_id] @ item_factors.T
            # Subtract the max before exponentiating for numerical stability.
            probs = np.exp(scores - scores.max())
            probs /= probs.sum()
            n_sample = min(n_ratings, self.n_items)
            rated_items = rng.choice(
                self.n_items, size=n_sample, replace=False, p=probs
            )
            for item_id in rated_items:
                # Reuse the score already computed for the sampling step.
                score = scores[item_id]
                rating = int(np.clip(round((score + 2) / 4 * 4 + 1), 1, 5))
                timestamp = rng.randint(1_000_000, 2_000_000)
                interactions.append((user_id, int(item_id), rating, timestamp))

        self.ratings_df = pd.DataFrame(
            interactions, columns=['user_id', 'item_id', 'rating', 'timestamp']
        )
        # Timestamps are random and can collide; a stable sort keeps the
        # order of tied rows deterministic.
        self.ratings_df = self.ratings_df.sort_values(
            'timestamp', kind='mergesort'
        ).reset_index(drop=True)

        items = []
        for item_id in range(self.n_items):
            n_genres = rng.randint(1, 4)
            item_genres = rng.choice(self.GENRES, size=n_genres, replace=False)
            items.append({
                'item_id': item_id,
                'title': f'Movie {item_id + 1}',
                'genres': '|'.join(item_genres),
            })
        self.items_df = pd.DataFrame(items, columns=['item_id', 'title', 'genres'])

        self.ratings_df.to_csv(os.path.join(self.data_dir, 'ratings.csv'), index=False)
        self.items_df.to_csv(os.path.join(self.data_dir, 'items.csv'), index=False)

    def _preprocess(self):
        """Derive binary rewards, per-user sequences and the train/test split.

        Reads ``self.ratings_df`` and sets ``user_sequences``, ``train_df``,
        ``test_df``, ``n_users_actual`` and ``n_items_actual``.
        """
        df = self.ratings_df.copy()
        df['reward'] = (df['rating'] >= 4).astype(int)

        # One stable sort up front; groupby keeps row order within each
        # group, so every user's rows come out in timestamp order with ties
        # left in their original relative order.
        ordered = df.sort_values('timestamp', kind='mergesort')

        # Maps user_id -> list of (item_id, reward) in chronological order.
        self.user_sequences: dict = {}
        for user_id, group in ordered.groupby('user_id'):
            self.user_sequences[int(user_id)] = list(zip(
                group['item_id'].values.tolist(),
                group['reward'].values.tolist(),
            ))

        train_rows, test_rows = [], []
        for user_id, seq in self.user_sequences.items():
            # First 80% of each history goes to train, always at least one row.
            split = max(1, int(len(seq) * 0.8))
            for i, (item_id, reward) in enumerate(seq):
                row = {'user_id': user_id, 'item_id': item_id, 'reward': reward}
                (train_rows if i < split else test_rows).append(row)

        # Explicit columns keep the frames well-formed even when a split is
        # empty (e.g. every user has a single rating).
        split_columns = ['user_id', 'item_id', 'reward']
        self.train_df = pd.DataFrame(train_rows, columns=split_columns)
        self.test_df = pd.DataFrame(test_rows, columns=split_columns)
        self.n_users_actual = int(df['user_id'].nunique())
        self.n_items_actual = int(df['item_id'].nunique())

    def get_user_history(self, user_id: int, max_len: int = 20):
        """Return the most recent ``max_len`` (item_id, reward) pairs of a user.

        Args:
            user_id: User whose history is requested.
            max_len: Maximum number of trailing interactions to return.

        Returns:
            A new list in chronological order; empty when the user is unknown
            or ``max_len`` is not positive.
        """
        # ``seq[-0:]`` would return the whole sequence, so guard explicitly.
        if max_len <= 0:
            return []
        return self.user_sequences.get(user_id, [])[-max_len:]

    def get_item_info(self, item_id: int) -> dict:
        """Return ``item_id``, ``title`` and ``genres`` for an item.

        Items missing from ``items_df`` get a placeholder title and the
        genre ``'Unknown'``. ``item_id`` is returned as a plain ``int`` in
        both cases.
        """
        matches = self.items_df[self.items_df['item_id'] == item_id]
        if matches.empty:
            return {
                'item_id': int(item_id),
                'title': f'Movie {item_id + 1}',
                'genres': 'Unknown',
            }
        first = matches.iloc[0]
        return {
            'item_id': int(item_id),
            'title': str(first['title']),
            'genres': str(first['genres']),
        }
|
|