# data_prep.py - COMPLETE ARCHETYPE EDITION
# Integrates: 20-rating threshold, Recency Bias Penalty, and Specialist Centroid Logic.
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import json
import pickle
import os
import re
from tqdm import tqdm
import time
import litellm
from config import HF_TOKEN, LLM_MODEL_NAME
# --- CUSTOM JSON ENCODER ---
class NumpyEncoder(json.JSONEncoder):
    """Custom encoder for numpy data types to avoid JSON serializable errors.

    Handles numpy bools, integers, floats, and arrays; everything else is
    delegated to the base encoder (which raises TypeError as usual).
    """

    def default(self, obj):
        # np.bool_ is checked first: it is not an np.integer, and json cannot
        # serialize it natively.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)
# --- CONSTANTS & CONFIG ---
MOVIELENS_DIR = 'data/ml-latest-small'  # Raw MovieLens CSVs (ratings.csv, movies.csv)
PROCESSED_DIR = 'data/processed'  # Output directory for all pipeline artifacts
MIN_RATINGS_THRESHOLD = 20 # Popularity filter for items
MIN_USER_RATINGS = 20 # Minimum activity for specialist candidates
DISCOVERY_ERA_YEAR = 1980 # Threshold for Recency Bias logic
# Specialist Persona Targets (Used to find the top 5 specialists per category)
# Maps persona name -> list of genre labels matched against the movies.csv
# pipe-separated "genres" column.
PERSONA_TARGETS = {
    "Action Junkie": ["Action", "Adventure", "Sci-Fi"],
    "Romantic Dreamer": ["Romance"],
    "Cinephile Critic": ["Drama", "Crime", "Mystery", "Thriller"]
}
# Side effect at import time: ensure the output directory exists.
os.makedirs(PROCESSED_DIR, exist_ok=True)
# --- LLM HELPER ---
def call_llm_for_hook(prompt):
    """Call the LLM via LiteLLM to generate a snappy movie hook.

    Args:
        prompt: Fully-formed user prompt describing the desired hook.

    Returns:
        str: The generated hook with surrounding quotes stripped, or a
        generic fallback line if the LLM call fails for any reason.
    """
    try:
        print(f"DEBUG: Calling LLM for hook with model: {LLM_MODEL_NAME}")
        start_ts = time.time()
        response = litellm.completion(
            model=LLM_MODEL_NAME,
            messages=[{"role": "user", "content": prompt}],
            api_key=HF_TOKEN,
            max_tokens=40,
            temperature=0.8
        )
        print(f"DEBUG: Full LLM Response Object: {response}")
        print(f"DEBUG: LLM call completed in {time.time() - start_ts:.4f}s")
        return response.choices[0].message.content.strip().replace('"', '')
    except Exception as e:
        # FIX: the original swallowed the exception silently ('e' was unused),
        # making API failures (auth, rate limits, model errors) invisible.
        # Log it, then return the best-effort fallback so the pipeline keeps
        # running — hook text is cosmetic, not critical.
        print(f"WARNING: LLM hook generation failed ({e}); using fallback text.")
        return "A standout pick for your collection."
def extract_year(title):
    """Return the first 4-digit year enclosed in parentheses, or 0 if absent.

    MovieLens titles conventionally end with "(YYYY)".
    """
    found = re.search(r'\((\d{4})\)', title)
    if found is None:
        return 0
    return int(found.group(1))
# --- CORE PREPARATION ---
def prepare_data():
    """Load MovieLens, build metadata, persona archetypes, and item similarities.

    Pipeline:
      1. Load raw ratings/movies CSVs from MOVIELENS_DIR.
      2. Build metadata for movies with >= MIN_RATINGS_THRESHOLD ratings.
      3. Normalize ratings (subtract each user's mean), then find the top-5
         "specialist" users per persona; their averaged normalized ratings
         form the persona centroid.
      4. Pre-compute top-K item-item cosine similarities, with K chosen
         adaptively from a genre-coverage analysis, and persist all
         artifacts to PROCESSED_DIR.

    Returns:
        tuple: (persona_archetypes dict,
                movie_meta dict keyed by str(movieId),
                user x movie pivot DataFrame of normalized ratings)
    """
    print("Step 1: Loading raw MovieLens data...")
    ratings = pd.read_csv(f'{MOVIELENS_DIR}/ratings.csv')
    movies = pd.read_csv(f'{MOVIELENS_DIR}/movies.csv')

    # 2 Process Metadata
    print("Step 2: Processing metadata and applying thresholds...")
    movies['year'] = movies['title'].apply(extract_year)  # (YYYY) from titles; 0 when absent

    # Popularity filter: keep only movies with >= MIN_RATINGS_THRESHOLD ratings.
    rating_counts = ratings.groupby('movieId').size()
    popular_ids = rating_counts[rating_counts >= MIN_RATINGS_THRESHOLD].index
    filtered_movies = movies[movies['movieId'].isin(popular_ids)].copy()

    movie_meta = {}
    for _, row in filtered_movies.iterrows():
        movie_meta[str(row['movieId'])] = {
            'movie_title': row['title'],
            'genres': row['genres'].split('|'),
            # FIX: coerce to a native int — iterrows() can yield np.int64,
            # which json.dump below cannot serialize without an encoder.
            'year': int(row['year'])
        }

    # === RATING NORMALIZATION (User Bias Correction) ===
    # Different users rate on different scales (generous vs. strict), so we
    # subtract each user's mean rating to get comparable deviation scores:
    #   normalized_rating = rating - user_avg_rating
    print(" - Normalizing ratings to remove user bias...")
    user_avg_ratings = ratings.groupby('userId')['rating'].mean().to_dict()
    # FIX: vectorized map instead of a row-wise .apply(axis=1) — identical
    # values, dramatically faster on large rating tables.
    ratings['rating_normalized'] = ratings['rating'] - ratings['userId'].map(user_avg_ratings)
    # Use normalized ratings for all collaborative filtering steps.
    ratings_normalized = ratings[['userId', 'movieId', 'rating_normalized']].copy()
    ratings_normalized.columns = ['userId', 'movieId', 'rating']
    print(" - Normalization complete. All subsequent steps use deviation-adjusted ratings.\n")

    # 3 Identify Specialist Persona Archetypes (The Centroid Logic)
    print("Step 3: Identifying Specialist Centroids (Top 5 users per persona)...")
    step_start = time.time()
    persona_archetypes = {}

    # FIX: hoisted out of the persona loop — these are persona-independent.
    user_total_counts = ratings_normalized.groupby('userId').size()
    active_users = user_total_counts[user_total_counts >= MIN_USER_RATINGS].index

    for persona, target_genres in PERSONA_TARGETS.items():
        # Movies whose pipe-separated genre string matches any target genre.
        genre_movies = movies[movies['genres'].str.contains('|'.join(target_genres))]['movieId']

        # Specialization(u) = Genre Density x Genre Passion, where
        #   Genre Density = (# target-genre ratings by u) / (total ratings by u)
        #   Genre Passion = mean NORMALIZED rating u gave target-genre movies
        # FIX: computed with a single groupby instead of a per-user Python
        # loop — identical scores; users with zero target-genre ratings drop
        # out of the groupby, matching the original's skip.
        genre_ratings = ratings_normalized[
            ratings_normalized['movieId'].isin(genre_movies)
            & ratings_normalized['userId'].isin(active_users)
        ]
        genre_stats = genre_ratings.groupby('userId')['rating'].agg(count='size', passion='mean')
        genre_density = genre_stats['count'] / user_total_counts.loc[genre_stats.index]
        specialization_scores = (genre_density * genre_stats['passion']).to_dict()

        # Select top 5 specialists by specialization score.
        top_5_specialists = sorted(specialization_scores.items(), key=lambda x: x[1], reverse=True)[:5]
        top_5_specialists = [user_id for user_id, score in top_5_specialists]
        print(f" - Found specialists for {persona}: {top_5_specialists}")

        # The centroid: average NORMALIZED rating per movie across the 5 specialists.
        centroid_ratings = ratings_normalized[ratings_normalized['userId'].isin(top_5_specialists)]
        aggregated_history = centroid_ratings.groupby('movieId')['rating'].mean().to_dict()

        persona_archetypes[persona] = {
            "specialist_ids": top_5_specialists,
            "target_genres": target_genres,
            "consolidated_history": aggregated_history
        }
    print(f">>> Step 3 (Specialist Centroids) complete in {time.time() - step_start:.2f}s")

    # Step 4: Pre-compute Item-Item Similarities for app.py
    print("Step 4: Pre-computing Item-Item Similarities (O(1) Lookups)...")
    step_start = time.time()
    pivot = ratings_normalized.pivot(index='userId', columns='movieId', values='rating').fillna(0)
    item_sim_matrix = cosine_similarity(pivot.T)

    # Movie-id ordering of the similarity matrix, plus an O(1) id -> index map.
    m_ids = pivot.columns.tolist()
    m_id_to_index = {m_id: idx for idx, m_id in enumerate(m_ids)}

    # FIX: argsort each movie's similarity row ONCE and reuse it below — the
    # original re-sorted the same row for every (genre, K) pair and again in
    # the final top-K pass. Rows are ascending, so the last entries are the
    # most similar items; index -1 is normally the movie itself (self-sim 1.0).
    sorted_sim_indices = np.argsort(item_sim_matrix, axis=1)

    # === STEP 4A: Genre Coverage Analysis for Adaptive K Selection ===
    # How many similar items (K) should we pre-compute for genre filtering?
    #  - Too low (K=20): genre filters may starve for candidates.
    #  - Too high (K=100): wasted storage/compute for marginal benefit.
    # Empirically pick the smallest K where every probed genre averages at
    # least TARGET_MIN_COVERAGE similar items per movie, so genre-filtered
    # recommendations in app.py are never starved for choices.
    print("Analyzing genre coverage for different K values...")
    genre_coverage_analysis = {}
    K_CANDIDATES = [20, 30, 50, 100]
    optimal_k = 50  # Fallback if no candidate K reaches the coverage target
    for genre in ["Action", "Drama", "Romance", "Comedy", "Sci-Fi", "Thriller"]:
        genre_movie_ids = set(movies[movies['genres'].str.contains(genre)]['movieId'])
        genre_coverage_analysis[genre] = {}
        for k in K_CANDIDATES:
            coverage_count = 0
            for m_id in m_ids:
                m_idx = m_id_to_index[m_id]  # O(1) lookup
                # Top-k neighbours, excluding the self-similarity slot at -1.
                top_k_indices = sorted_sim_indices[m_idx, -(k + 1):-1]
                coverage_count += sum(1 for idx in top_k_indices if m_ids[idx] in genre_movie_ids)
            genre_coverage_analysis[genre][k] = coverage_count / len(m_ids)

    print("Avg similar items per movie in each genre:")
    for genre, coverage_dict in genre_coverage_analysis.items():
        print(f" {genre}: {coverage_dict}")

    # Adaptively select K: smallest K whose WORST genre still meets the target.
    TARGET_MIN_COVERAGE = 1.0  # At least 1 similar item per genre on average
    for k in sorted(K_CANDIDATES):
        min_coverage = min(genre_coverage_analysis[g][k] for g in genre_coverage_analysis.keys())
        if min_coverage >= TARGET_MIN_COVERAGE:
            optimal_k = k
            print(f"\n✅ Optimal K selected: {optimal_k} (min genre coverage: {min_coverage:.2f})")
            break
    print(f"Using K={optimal_k} for top similar items\n")

    # Pre-compute the top-K similar items per (popular) movie.
    top_sim_dict = {}
    for i, m_id in enumerate(tqdm(m_ids, desc="Similarities")):
        if str(m_id) not in movie_meta:
            continue
        sim_scores = item_sim_matrix[i]
        top_indices = sorted_sim_indices[i, -(optimal_k + 1):-1]
        top_sim_dict[str(m_id)] = {str(m_ids[idx]): float(sim_scores[idx]) for idx in top_indices}
    print(f">>> Step 4 (Item-Item Similarities with K={optimal_k}) complete in {time.time() - step_start:.2f}s\n")

    # Persist artifacts for app.py.
    with open(f'{PROCESSED_DIR}/movie_metadata.json', 'w') as f:
        json.dump(movie_meta, f, indent=4)
    with open(f'{PROCESSED_DIR}/persona_archetypes.json', 'w') as f:
        json.dump(persona_archetypes, f, cls=NumpyEncoder, indent=4)
    with open(f'{PROCESSED_DIR}/user_avg_ratings.json', 'w') as f:
        # FIX: coerce to native Python types — some pandas/json combinations
        # leave np.int64 keys in to_dict() output, which json.dump rejects.
        json.dump({int(u): float(avg) for u, avg in user_avg_ratings.items()}, f, indent=4)
    with open(f'{PROCESSED_DIR}/top_similar_items.pkl', 'wb') as f:
        pickle.dump(top_sim_dict, f)
    return persona_archetypes, movie_meta, pivot
# --- RECOMMENDATION ENGINE ---
def compute_home_recommendations(persona_archetypes, movie_meta, pivot):
    """Layer 3+4: neighborhood CF prediction re-ranked by genre and recency.

    Args:
        persona_archetypes: persona -> {specialist_ids, target_genres,
            consolidated_history} (from prepare_data; history keys must be
            movieIds in pivot's column space).
        movie_meta: str(movieId) -> {movie_title, genres, year}.
        pivot: user x movie DataFrame of normalized ratings.

    Returns:
        dict: persona -> top-6 list of {movie_id, movie_title, genres, score}.
        Side effect: writes home_recommendations.json to PROCESSED_DIR.
    """
    print("Step 5: Computing Layer 4 Home Recs with Recency Bias...")
    home_recs = {}
    for persona, data in persona_archetypes.items():
        history = data['consolidated_history']
        target_genres = data['target_genres']
        print(f" - Building centroid and finding neighborhood for {persona}...")

        # Construct the centroid vector in pivot's movie space. Movies the
        # specialists never rated stay at 0.0 (the pivot's fill value).
        centroid_vec = pd.Series(0.0, index=pivot.columns, dtype=float)  # float init avoids dtype warning
        for m_id, rating in history.items():
            if m_id in centroid_vec:
                centroid_vec[m_id] = rating

        # Neighborhood search: the 50 users most similar to the centroid.
        user_sims = cosine_similarity([centroid_vec], pivot)[0]
        neighbor_idx = np.argsort(user_sims)[-50:]
        neighbor_ratings = pivot.iloc[neighbor_idx]

        # Layer 3: collaborative prediction
        #   R_{p,m} = Σ(Similarity × Rating) / Σ|Similarity|
        # FIX: the denominator now uses |similarity| as the formula states.
        # The original summed raw similarities, which skews (or sign-flips)
        # scores when any neighbor similarity is negative; identical result
        # when all 50 neighbors are positively correlated.
        numerator = neighbor_ratings.multiply(user_sims[neighbor_idx], axis=0).sum()
        denominator = np.abs(user_sims[neighbor_idx]).sum()
        candidates = numerator / denominator if denominator > 0 else numerator

        print(f" - Applying Layer 4 re-ranking for {persona}...")
        final_list = []
        for m_id, raw_score in candidates.items():
            m_id_str = str(m_id)
            if m_id_str not in movie_meta:
                continue  # Filtered out by the popularity threshold
            if m_id in history:
                continue  # Don't recommend what they've seen

            meta = movie_meta[m_id_str]

            # Layer 4a: Genre Affinity Boost
            genre_match = any(g in target_genres for g in meta['genres'])
            genre_multiplier = 2.5 if genre_match else 0.4

            # Layer 4b: Recency Bias — post-1980 bonus, pre-1980 age penalty.
            # NOTE: movies with no parseable year (year == 0) land at the
            # maximum penalty of 0.5.
            if meta['year'] < DISCOVERY_ERA_YEAR:
                # Penalty scales from 0.9 (1979) down to 0.5 (1930s and earlier)
                age_factor = max(0.5, 1.0 - (DISCOVERY_ERA_YEAR - meta['year']) / 120)
            else:
                # Bonus scales from 1.0 (1980) up to 1.3 (modern releases)
                age_factor = min(1.3, 1.0 + (meta['year'] - DISCOVERY_ERA_YEAR) / 100)

            final_list.append({
                'movie_id': m_id,
                'movie_title': meta['movie_title'],
                'genres': meta['genres'],
                'score': raw_score * genre_multiplier * age_factor
            })

        # Keep the top 6 for the home screen.
        home_recs[persona] = sorted(final_list, key=lambda x: x['score'], reverse=True)[:6]
        top_movies_str = ", ".join([f"'{r['movie_title']}'" for r in home_recs[persona][:2]])
        print(f" - Top recs for {persona}: {top_movies_str}...")

    with open(f'{PROCESSED_DIR}/home_recommendations.json', 'w') as f:
        json.dump(home_recs, f, indent=4)
    return home_recs
def generate_hooks(home_recs):
    """Generate and cache one short LLM-written hook per recommended movie.

    Args:
        home_recs: persona -> list of recommendation dicts (from
            compute_home_recommendations).

    Side effect: writes cached_hooks.json (persona -> movie_id -> hook)
    to PROCESSED_DIR, throttling LLM calls to avoid backend rate limits.
    """
    print("Step 6: Generating LLM Hooks for Discovery Feed...")
    cached_hooks = {}
    for persona, recs in home_recs.items():
        persona_hooks = {}
        for rec in recs:
            movie_title = rec['movie_title']
            prompt = f"Generate a 5-10 word snappy, atmospheric hook for the movie: {movie_title}."
            persona_hooks[str(rec['movie_id'])] = call_llm_for_hook(prompt)
            time.sleep(0.5)  # Increased sleep to avoid HF Backend Error 40001
        cached_hooks[persona] = persona_hooks
    with open(f'{PROCESSED_DIR}/cached_hooks.json', 'w') as f:
        json.dump(cached_hooks, f, indent=4)
if __name__ == "__main__":
    # Run the full offline pipeline: data prep -> home recs -> LLM hooks.
    pipeline_start = time.time()

    # Phase 1: load, clean, and precompute core artifacts.
    phase_start = time.time()
    archetypes, meta, p_matrix = prepare_data()
    print(f">>> Phase 1 (Data Prep) complete in {time.time() - phase_start:.2f}s\n")

    # Phase 2: centroid-based home-screen recommendations.
    phase_start = time.time()
    recs = compute_home_recommendations(archetypes, meta, p_matrix)
    print(f">>> Phase 2 (Home Recs) complete in {time.time() - phase_start:.2f}s\n")

    # Phase 3: LLM-generated hooks for the UI.
    phase_start = time.time()
    generate_hooks(recs)
    print(f">>> Phase 3 (LLM Hooks) complete in {time.time() - phase_start:.2f}s\n")

    print(f"✅ SUCCESS: Full data pipeline complete in {time.time() - pipeline_start:.2f} seconds.")