# data_prep.py - COMPLETE ARCHETYPE EDITION
# Integrates: 20-rating threshold, Recency Bias Penalty, and Specialist Centroid Logic.
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import json
import pickle
import os
import re
from tqdm import tqdm
import time
import litellm
from config import HF_TOKEN, LLM_MODEL_NAME


# --- CUSTOM JSON ENCODER ---
class NumpyEncoder(json.JSONEncoder):
    """Custom encoder for numpy data types to avoid JSON serialization errors."""
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)


# --- CONSTANTS & CONFIG ---
MOVIELENS_DIR = 'data/ml-latest-small'
PROCESSED_DIR = 'data/processed'
MIN_RATINGS_THRESHOLD = 20   # Popularity filter for items
MIN_USER_RATINGS = 20        # Minimum activity for specialist candidates
DISCOVERY_ERA_YEAR = 1980    # Threshold for Recency Bias logic

# Specialist Persona Targets (used to find the top 5 specialists per category)
PERSONA_TARGETS = {
    "Action Junkie": ["Action", "Adventure", "Sci-Fi"],
    "Romantic Dreamer": ["Romance"],
    "Cinephile Critic": ["Drama", "Crime", "Mystery", "Thriller"]
}

os.makedirs(PROCESSED_DIR, exist_ok=True)


# --- LLM HELPER ---
def call_llm_for_hook(prompt):
    """Calls the LLM via LiteLLM to generate snappy movie hooks."""
    try:
        print(f"DEBUG: Calling LLM for hook with model: {LLM_MODEL_NAME}")
        start_ts = time.time()
        response = litellm.completion(
            model=LLM_MODEL_NAME,
            messages=[{"role": "user", "content": prompt}],
            api_key=HF_TOKEN,
            max_tokens=40,
            temperature=0.8
        )
        print(f"DEBUG: Full LLM Response Object: {response}")
        print(f"DEBUG: LLM call completed in {time.time() - start_ts:.4f}s")
        return response.choices[0].message.content.strip().replace('"', '')
    except Exception as e:
        print(f"WARNING: LLM call failed ({e}); using fallback hook.")
        return "A standout pick for your collection."


def extract_year(title):
    """Regex to pull (YYYY) from movie titles."""
    match = re.search(r'\((\d{4})\)', title)
    return int(match.group(1)) if match else 0


# --- CORE PREPARATION ---
def prepare_data():
    print("Step 1: Loading raw MovieLens data...")
    ratings = pd.read_csv(f'{MOVIELENS_DIR}/ratings.csv')
    movies = pd.read_csv(f'{MOVIELENS_DIR}/movies.csv')

    # 2 Process Metadata
    print("Step 2: Processing metadata and applying thresholds...")
    movies['year'] = movies['title'].apply(extract_year)  # Extract (YYYY) from titles for recency bias

    # Filter movies by popularity: count ratings per movie, then keep only
    # those with >= MIN_RATINGS_THRESHOLD (20) ratings
    popular_ids = ratings.groupby('movieId').size()
    popular_ids = popular_ids[popular_ids >= MIN_RATINGS_THRESHOLD].index
    filtered_movies = movies[movies['movieId'].isin(popular_ids)].copy()

    movie_meta = {}
    for _, row in filtered_movies.iterrows():
        movie_meta[str(row['movieId'])] = {
            'movie_title': row['title'],
            'genres': row['genres'].split('|'),
            'year': row['year']
        }

    # === RATING NORMALIZATION (User Bias Correction) ===
    # Problem: different users rate on different scales (generous vs. strict raters).
    # Solution: subtract each user's average rating from their individual ratings.
    # This yields deviation scores that are comparable across users.
    # Formula: normalized_rating = rating - user_avg_rating
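    # Worked example (illustrative numbers, not from the dataset): a generous user
    # averaging 4.5 who rates a movie 5.0 gets a deviation of 5.0 - 4.5 = +0.5,
    # while a strict user averaging 2.5 who rates the same movie 4.0 gets
    # 4.0 - 2.5 = +1.5, a stronger positive signal despite the lower raw rating.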
    print(" - Normalizing ratings to remove user bias...")
    user_avg_ratings = ratings.groupby('userId')['rating'].mean().to_dict()
    # Vectorized equivalent of subtracting each user's mean (much faster than a per-row apply)
    ratings['rating_normalized'] = ratings['rating'] - ratings.groupby('userId')['rating'].transform('mean')

    # Use normalized ratings for all collaborative filtering steps
    ratings_normalized = ratings[['userId', 'movieId', 'rating_normalized']].copy()
    ratings_normalized.columns = ['userId', 'movieId', 'rating']
    print(" - Normalization complete. All subsequent steps use deviation-adjusted ratings.\n")

    # 3 Identify Specialist Persona Archetypes (The Centroid Logic)
    print("Step 3: Identifying Specialist Centroids (Top 5 users per persona)...")
    step_start = time.time()
    persona_archetypes = {}

    for persona, target_genres in PERSONA_TARGETS.items():
        # Find movies in the target genres
        genre_movies = movies[movies['genres'].str.contains('|'.join(target_genres))]['movieId']

        # Identify active users (>= MIN_USER_RATINGS total ratings)
        active_users = ratings_normalized.groupby('userId').size()
        active_users = active_users[active_users >= MIN_USER_RATINGS].index

        # Calculate a Specialization Score for each active user using NORMALIZED ratings.
        # Formula: Specialization(u) = Genre Density x Genre Passion
        #   Genre Density = (ratings in target genre) / (total ratings by user)
        #   Genre Passion = AvgRating(u, target genre), based on NORMALIZED ratings
        specialization_scores = {}
        for user_id in active_users:
            user_ratings = ratings_normalized[ratings_normalized['userId'] == user_id]
            total_user_ratings = len(user_ratings)

            # Get this user's ratings within the target genre
            genre_ratings = user_ratings[user_ratings['movieId'].isin(genre_movies)]
            genre_rating_count = len(genre_ratings)

            # Skip users who haven't rated any movies in the genre
            if genre_rating_count == 0:
                continue

            # Specialization components (computed on normalized ratings)
            genre_density = genre_rating_count / total_user_ratings  # Proportion of activity in genre
            genre_passion = genre_ratings['rating'].mean()           # Average DEVIATION for the genre

            # Combined specialization metric
            specialization_scores[user_id] = genre_density * genre_passion
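        # Worked example (illustrative numbers, not from the dataset): a user with
        # 100 ratings, 40 of them in the target genre at an average deviation of
        # +0.30, scores 0.40 * 0.30 = 0.12; a user with 20/100 genre ratings at
        # +0.50 scores 0.20 * 0.50 = 0.10, so the denser fan ranks higher even
        # with slightly lower passion.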
        # Select the top 5 specialists by specialization score
        top_5_specialists = sorted(specialization_scores.items(), key=lambda x: x[1], reverse=True)[:5]
        top_5_specialists = [user_id for user_id, score in top_5_specialists]
        print(f" - Found specialists for {persona}: {top_5_specialists}")

        # Aggregate their NORMALIZED ratings (The Centroid)
        centroid_ratings = ratings_normalized[ratings_normalized['userId'].isin(top_5_specialists)]

        # Centroid vector: average NORMALIZED rating per movie across the 5 specialists
        aggregated_history = centroid_ratings.groupby('movieId')['rating'].mean().to_dict()

        persona_archetypes[persona] = {
            "specialist_ids": top_5_specialists,
            "target_genres": target_genres,
            "consolidated_history": aggregated_history
        }
    print(f">>> Step 3 (Specialist Centroids) complete in {time.time() - step_start:.2f}s")

    # Step 4: Pre-compute Item-Item Similarities for app.py
    print("Step 4: Pre-computing Item-Item Similarities (O(1) Lookups)...")
    step_start = time.time()
    pivot = ratings_normalized.pivot(index='userId', columns='movieId', values='rating').fillna(0)
    item_sim_matrix = cosine_similarity(pivot.T)

    # Define m_ids early and create an O(1) lookup mapping
    # (fixes the performance issue of calling m_ids.index() inside hot loops)
    m_ids = pivot.columns.tolist()
    m_id_to_index = {m_id: idx for idx, m_id in enumerate(m_ids)}

    # === STEP 4A: Genre Coverage Analysis for Adaptive K Selection ===
    # Problem: how many similar items (K) should we pre-compute for genre filtering?
    #   - Too low (K=20): the genre filter may not find enough candidates for all genres.
    #   - Too high (K=100): wastes storage and compute for marginal benefit.
    # Solution: empirically test candidate K values and pick the smallest K that gives
    # every genre sufficient similar items (>= 1.0 per movie on average), so that
    # genre-filtered recommendations in app.py are never starved for choices.
    print("Analyzing genre coverage for different K values...")
    genre_coverage_analysis = {}
    K_CANDIDATES = [20, 30, 50, 100]
    optimal_k = 50  # Default fallback

    for genre in ["Action", "Drama", "Romance", "Comedy", "Sci-Fi", "Thriller"]:
        genre_movie_ids = set(movies[movies['genres'].str.contains(genre)]['movieId'])
        genre_coverage_analysis[genre] = {}

        # Sort each movie's similarity row once, then slice per K candidate
        # (argsort is the hot path, so it is hoisted out of the K loop)
        coverage_counts = {k: 0 for k in K_CANDIDATES}
        for m_id in m_ids:
            m_idx = m_id_to_index[m_id]  # O(1) lookup instead of m_ids.index()
            order = np.argsort(item_sim_matrix[m_idx])
            for k in K_CANDIDATES:
                top_k_indices = order[-(k + 1):-1]  # Top K similar, excluding self
                coverage_counts[k] += sum(1 for idx in top_k_indices if m_ids[idx] in genre_movie_ids)
        for k in K_CANDIDATES:
            genre_coverage_analysis[genre][k] = coverage_counts[k] / len(m_ids)

    print("Avg similar items per movie in each genre:")
    for genre, coverage_dict in genre_coverage_analysis.items():
        print(f" {genre}: {coverage_dict}")

    # Adaptively select K: the smallest K whose worst-case genre coverage is >= 1.0
    TARGET_MIN_COVERAGE = 1.0  # At least 1 similar item per genre on average
    for k in sorted(K_CANDIDATES):
        min_coverage = min(genre_coverage_analysis[g][k] for g in genre_coverage_analysis)
        if min_coverage >= TARGET_MIN_COVERAGE:
            optimal_k = k
            print(f"\n✅ Optimal K selected: {optimal_k} (min genre coverage: {min_coverage:.2f})")
            break
    print(f"Using K={optimal_k} for top similar items\n")

    # Pre-compute similar items with the adaptive K
    top_sim_dict = {}
    for i, m_id in enumerate(tqdm(m_ids, desc="Similarities")):
        if str(m_id) not in movie_meta:
            continue
        sim_scores = item_sim_matrix[i]
        # Get top K similar (excluding self)
        top_indices = np.argsort(sim_scores)[-(optimal_k + 1):-1]
        top_sim_dict[str(m_id)] = {str(m_ids[idx]): float(sim_scores[idx]) for idx in top_indices}
    print(f">>> Step 4 (Item-Item Similarities with K={optimal_k}) complete in {time.time() - step_start:.2f}s\n")

    # Save components. JSON object keys must be str: numpy int64 keys raise a
    # TypeError that NumpyEncoder cannot intercept (encoders only see values, not
    # keys), so dict keys are stringified explicitly before dumping.
    with open(f'{PROCESSED_DIR}/movie_metadata.json', 'w') as f:
        json.dump(movie_meta, f, indent=4)
    serializable_archetypes = {
        p: {**d, "consolidated_history": {str(k): v for k, v in d["consolidated_history"].items()}}
        for p, d in persona_archetypes.items()
    }
    with open(f'{PROCESSED_DIR}/persona_archetypes.json', 'w') as f:
        json.dump(serializable_archetypes, f, cls=NumpyEncoder, indent=4)
    with open(f'{PROCESSED_DIR}/user_avg_ratings.json', 'w') as f:
        json.dump({str(k): v for k, v in user_avg_ratings.items()}, f, indent=4)
    with open(f'{PROCESSED_DIR}/top_similar_items.pkl', 'wb') as f:
        pickle.dump(top_sim_dict, f)

    return persona_archetypes, movie_meta, pivot


# --- RECOMMENDATION ENGINE ---
def compute_home_recommendations(persona_archetypes, movie_meta, pivot):
    print("Step 5: Computing Layer 4 Home Recs with Recency Bias...")
    home_recs = {}

    for persona, data in persona_archetypes.items():
        history = data['consolidated_history']
        target_genres = data['target_genres']
        print(f" - Building centroid and finding neighborhood for {persona}...")

        # Construct the Centroid Vector (init with float dtype to avoid a pandas warning)
        centroid_vec = pd.Series(0.0, index=pivot.columns, dtype=float)
        for m_id, rating in history.items():
            if m_id in centroid_vec.index:
                centroid_vec[m_id] = rating

        # Neighborhood Search (find the 50 users most similar to the centroid)
        user_sims = cosine_similarity(centroid_vec.values.reshape(1, -1), pivot.values)[0]
        neighbor_idx = np.argsort(user_sims)[-50:]
        neighbor_ratings = pivot.iloc[neighbor_idx]

        # Weighted Candidate Scores (Layer 3: Collaborative Prediction)
        # Formula: R_{p,m} = Σ(similarity × rating) / Σ|similarity|
        # Normalizing by the total absolute similarity yields a weighted average score.
        numerator = neighbor_ratings.multiply(user_sims[neighbor_idx], axis=0).sum()
        denominator = np.abs(user_sims[neighbor_idx]).sum()  # Σ|similarity|, matching the formula above
        candidates = numerator / denominator if denominator > 0 else numerator
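        # Worked example (illustrative numbers, not from the dataset): two neighbors
        # with similarities 0.8 and 0.4 who rate a candidate at deviations +1.0 and
        # +0.5 give R = (0.8*1.0 + 0.4*0.5) / (|0.8| + |0.4|) = 1.0 / 1.2 ≈ +0.83.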
        print(f" - Applying Layer 4 re-ranking for {persona}...")
        final_list = []
        for m_id, raw_score in candidates.items():
            m_id_str = str(m_id)
            if m_id_str not in movie_meta:
                continue
            if m_id in history:
                continue  # Don't recommend what they've already seen

            meta = movie_meta[m_id_str]

            # Layer 4a: Genre Affinity Boost
            genre_match = any(g in target_genres for g in meta['genres'])
            genre_multiplier = 2.5 if genre_match else 0.4

            # Layer 4b: Recency Bias Logic (Penalty vs. Bonus)
            # Movies from 1980 on get a bonus; older movies get a penalty proportional to age
            if meta['year'] < DISCOVERY_ERA_YEAR:
                # Penalty scales from ~0.99 (1979) down to the 0.5 floor (1920 and earlier)
                age_factor = max(0.5, 1.0 - (DISCOVERY_ERA_YEAR - meta['year']) / 120)
            else:
                # Bonus scales from 1.0 (1980) up to the 1.3 cap (2010 and later)
                age_factor = min(1.3, 1.0 + (meta['year'] - DISCOVERY_ERA_YEAR) / 100)

            final_score = raw_score * genre_multiplier * age_factor
            final_list.append({
                'movie_id': int(m_id),  # Cast to plain int so json.dump accepts it
                'movie_title': meta['movie_title'],
                'genres': meta['genres'],
                'score': final_score
            })

        # Top 6 for the Home Screen
        home_recs[persona] = sorted(final_list, key=lambda x: x['score'], reverse=True)[:6]
        top_movies_str = ", ".join([f"'{r['movie_title']}'" for r in home_recs[persona][:2]])
        print(f" - Top recs for {persona}: {top_movies_str}...")

    with open(f'{PROCESSED_DIR}/home_recommendations.json', 'w') as f:
        json.dump(home_recs, f, indent=4)
    return home_recs


def generate_hooks(home_recs):
    print("Step 6: Generating LLM Hooks for Discovery Feed...")
    cached_hooks = {}
    for persona, recs in home_recs.items():
        cached_hooks[persona] = {}
        for r in recs:
            title = r['movie_title']
            prompt = f"Generate a 5-10 word snappy, atmospheric hook for the movie: {title}."
            hook = call_llm_for_hook(prompt)
            cached_hooks[persona][str(r['movie_id'])] = hook
            time.sleep(0.5)  # Increased sleep to avoid HF Backend Error 40001

    with open(f'{PROCESSED_DIR}/cached_hooks.json', 'w') as f:
        json.dump(cached_hooks, f, indent=4)


if __name__ == "__main__":
    total_start = time.time()

    # --- Phase 1: Data Loading, Cleaning, and Initial Computations ---
    step_start = time.time()
    archetypes, meta, p_matrix = prepare_data()
    print(f">>> Phase 1 (Data Prep) complete in {time.time() - step_start:.2f}s\n")

    # --- Phase 2: Compute Home Recommendations using Centroid Logic ---
    step_start = time.time()
    recs = compute_home_recommendations(archetypes, meta, p_matrix)
    print(f">>> Phase 2 (Home Recs) complete in {time.time() - step_start:.2f}s\n")

    # --- Phase 3: Generate LLM Hooks for the UI ---
    step_start = time.time()
    generate_hooks(recs)
    print(f">>> Phase 3 (LLM Hooks) complete in {time.time() - step_start:.2f}s\n")

    print(f"✅ SUCCESS: Full data pipeline complete in {time.time() - total_start:.2f} seconds.")
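# Usage note: run once via `python data_prep.py` before launching the app. The pipeline
# writes six artifacts to data/processed/, which app.py is assumed to load at startup
# (per the Step 4 comments): movie_metadata.json, persona_archetypes.json,
# user_avg_ratings.json, top_similar_items.pkl, home_recommendations.json, and
# cached_hooks.json. Requires data/ml-latest-small/ (the MovieLens "small" dataset)
# plus HF_TOKEN and LLM_MODEL_NAME defined in config.py.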