# File: two_tower_recsys/scripts/visualize_embeddings.py
# Last commit by minhajHP (3ac748e):
#   Reorganize codebase structure and fix category boosted recommendations
#!/usr/bin/env python3
"""
User and Item Embeddings Visualization
This script creates 2D visualizations of user and item embeddings from the
two-tower recommendation system to understand:
1. User clustering by demographics and preferences
2. Item clustering by categories and characteristics
3. User-item similarity patterns in embedding space
4. Quality of the learned representations
"""
import sys
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple, Optional
import json
from datetime import datetime
# Make the project's `src` directory importable before touching project modules.
# NOTE(review): the path is resolved relative to this script's own directory;
# confirm a `src` folder really lives next to it.
sys.path.append(
    os.path.join(os.path.dirname(__file__), 'src')
)

# The recommendation engine is a hard requirement: report the problem and
# terminate with a non-zero status if it cannot be imported for any reason.
try:
    from inference.recommendation_engine import RecommendationEngine
    print("βœ… Successfully imported RecommendationEngine")
except Exception as import_error:
    print(f"❌ Failed to import RecommendationEngine: {import_error}")
    sys.exit(1)

# --- Optional dependencies -------------------------------------------------
# Each probe records its outcome in a module-level HAS_* flag that the rest
# of the script consults before using the corresponding library.

# scikit-learn: t-SNE and PCA reducers
try:
    from sklearn.manifold import TSNE
    from sklearn.decomposition import PCA
    HAS_SKLEARN = True
    print("βœ… scikit-learn available for t-SNE/PCA")
except ImportError:
    HAS_SKLEARN = False
    print("⚠️ scikit-learn not available - using PCA approximation")

# umap-learn: UMAP reducer
try:
    import umap
    HAS_UMAP = True
    print("βœ… UMAP available for advanced dimensionality reduction")
except ImportError:
    HAS_UMAP = False
    print("⚠️ UMAP not available - using t-SNE/PCA only")

# Plotly: only its availability is recorded here
try:
    import plotly.express as px
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
    HAS_PLOTLY = True
    print("βœ… Plotly available for interactive visualizations")
except ImportError:
    HAS_PLOTLY = False
    print("⚠️ Plotly not available - using matplotlib only")
class EmbeddingVisualizer:
    """Visualize user and item embeddings from the two-tower system.

    The visualizer wraps a ``RecommendationEngine``. It asks the engine for
    embeddings of a fixed set of synthetic user profiles and of a sample of
    catalogue items, projects them to 2D (PCA, t-SNE or UMAP, depending on
    which optional libraries were importable) and renders matplotlib scatter
    plots plus a small numeric quality report.
    """

    def __init__(self):
        """Load the recommendation engine and configure plot styling.

        Raises:
            Exception: Any error raised by ``RecommendationEngine()`` is
                reported on stdout and re-raised unchanged.
        """
        print("πŸ”§ Initializing Embedding Visualizer...")
        try:
            self.engine = RecommendationEngine()
            print("βœ… Recommendation engine loaded successfully!")
        except Exception as e:
            print(f"❌ Failed to load recommendation engine: {e}")
            raise
        # Set up plotting style
        plt.style.use('default')
        sns.set_palette("husl")

    def create_diverse_test_users(self) -> List[Dict]:
        """Create diverse test users for embedding visualization.

        Returns:
            A list of user profile dicts. Each dict holds the demographic
            fields and ``interaction_history`` that are forwarded to the
            engine, plus ``name`` and ``group`` (used for plot labels) and
            ``color`` (not read anywhere in this class).
        """
        return [
            # Tech professionals
            {
                'name': 'YoungTechMale', 'age': 25, 'gender': 'male', 'income': 85000,
                'profession': 'Technology', 'location': 'Urban', 'education_level': "Bachelor's",
                'marital_status': 'Single', 'interaction_history': [1000978, 1001588, 1001618, 1002000],
                'group': 'Tech_Professional', 'color': 'red'
            },
            {
                'name': 'YoungTechFemale', 'age': 27, 'gender': 'female', 'income': 78000,
                'profession': 'Technology', 'location': 'Urban', 'education_level': "Master's",
                'marital_status': 'Single', 'interaction_history': [1000980, 1001590, 1001620, 1002010],
                'group': 'Tech_Professional', 'color': 'red'
            },
            # Healthcare professionals
            {
                'name': 'HealthcareFemale1', 'age': 35, 'gender': 'female', 'income': 68000,
                'profession': 'Healthcare', 'location': 'Suburban', 'education_level': "Master's",
                'marital_status': 'Married', 'interaction_history': [1003000, 1003100, 1003200, 1003300],
                'group': 'Healthcare_Professional', 'color': 'blue'
            },
            {
                'name': 'HealthcareMale', 'age': 42, 'gender': 'male', 'income': 72000,
                'profession': 'Healthcare', 'location': 'Urban', 'education_level': "Master's",
                'marital_status': 'Married', 'interaction_history': [1003010, 1003110, 1003210, 1003310],
                'group': 'Healthcare_Professional', 'color': 'blue'
            },
            # Finance professionals
            {
                'name': 'FinanceSenior', 'age': 45, 'gender': 'female', 'income': 120000,
                'profession': 'Finance', 'location': 'Urban', 'education_level': "Master's",
                'marital_status': 'Married', 'interaction_history': [1004000, 1004100, 1004200],
                'group': 'Finance_Professional', 'color': 'green'
            },
            # Students/Low income
            {
                'name': 'YoungStudent', 'age': 20, 'gender': 'male', 'income': 15000,
                'profession': 'Other', 'location': 'Urban', 'education_level': "Some College",
                'marital_status': 'Single', 'interaction_history': [1005000, 1005100, 1005200],
                'group': 'Student', 'color': 'orange'
            },
            {
                'name': 'YoungStudentFemale', 'age': 21, 'gender': 'female', 'income': 12000,
                'profession': 'Other', 'location': 'Urban', 'education_level': "Some College",
                'marital_status': 'Single', 'interaction_history': [1005010, 1005110, 1005210],
                'group': 'Student', 'color': 'orange'
            },
            # Seniors/Retirees
            {
                'name': 'SeniorRetiree', 'age': 67, 'gender': 'female', 'income': 35000,
                'profession': 'Other', 'location': 'Rural', 'education_level': "High School",
                'marital_status': 'Widowed', 'interaction_history': [1006000, 1006100],
                'group': 'Senior', 'color': 'purple'
            },
            # Zero interaction users (cold start)
            {
                'name': 'ZeroTech', 'age': 30, 'gender': 'male', 'income': 75000,
                'profession': 'Technology', 'location': 'Urban', 'education_level': "Bachelor's",
                'marital_status': 'Single', 'interaction_history': [],
                'group': 'Cold_Start', 'color': 'gray'
            },
            {
                'name': 'ZeroHealthcare', 'age': 35, 'gender': 'female', 'income': 65000,
                'profession': 'Healthcare', 'location': 'Suburban', 'education_level': "Master's",
                'marital_status': 'Married', 'interaction_history': [],
                'group': 'Cold_Start', 'color': 'gray'
            },
            {
                'name': 'ZeroSenior', 'age': 60, 'gender': 'male', 'income': 40000,
                'profession': 'Other', 'location': 'Rural', 'education_level': "High School",
                'marital_status': 'Married', 'interaction_history': [],
                'group': 'Cold_Start', 'color': 'gray'
            }
        ]

    def extract_user_embeddings(self, test_users: List[Dict]) -> Tuple[np.ndarray, List[str], List[str]]:
        """Extract user embeddings using the UserTower.

        Each profile is passed to ``engine.get_user_embedding_enhanced``.
        Profiles for which the engine returns ``None`` or raises are reported
        and skipped (best effort), so the outputs may be shorter than
        ``test_users``.

        Args:
            test_users: Profile dicts as produced by
                ``create_diverse_test_users``.

        Returns:
            ``(embeddings, names, groups)`` with one entry per successfully
            embedded user. ``embeddings`` is ``np.array`` of the collected
            vectors, or an empty ``list`` when nothing could be extracted
            (callers should test it with ``len``).
        """
        print(f"\nπŸ“Š Extracting user embeddings...")
        user_embeddings = []
        user_names = []
        user_groups = []
        for user in test_users:
            try:
                # Get user embedding via UserTower
                embedding = self.engine.get_user_embedding_enhanced(
                    age=user['age'],
                    gender=user['gender'],
                    income=user['income'],
                    profession=user['profession'],
                    location=user['location'],
                    education_level=user['education_level'],
                    marital_status=user['marital_status'],
                    interaction_history=user['interaction_history']
                )
                if embedding is not None:
                    user_embeddings.append(embedding)
                    user_names.append(user['name'])
                    user_groups.append(user['group'])
                    print(f" βœ… {user['name']}: {embedding.shape} embedding")
                else:
                    print(f" ❌ {user['name']}: Failed to get embedding")
            except Exception as e:
                # Deliberately broad: one bad profile must not abort the run.
                print(f" ❌ {user['name']}: Error - {e}")
        if user_embeddings:
            user_embeddings = np.array(user_embeddings)
            print(f"πŸ“ˆ Extracted {len(user_embeddings)} user embeddings: {user_embeddings.shape}")
        else:
            print(f"❌ No user embeddings extracted!")
        return user_embeddings, user_names, user_groups

    def extract_item_embeddings(self, max_items: int = 1000) -> Tuple[np.ndarray, List[int], List[str]]:
        """Extract sample of item embeddings from FAISS index.

        Items from ``engine.items_df`` are grouped by the first dot-separated
        segment of ``category_code`` and sampled per group (fixed seed) so
        the result covers many categories. Each sampled ``product_id`` is
        looked up with ``engine.faiss_index.get_item_embedding``; items for
        which that returns ``None`` are skipped.

        Args:
            max_items: Upper bound on the number of embeddings returned. At
                most 50 items are drawn per category, so fewer than
                ``max_items`` may come back.

        Returns:
            ``(embeddings, item_ids, categories)`` aligned by position.
            ``embeddings`` is an ``np.ndarray``, or an empty ``list`` when
            nothing could be extracted.
        """
        print(f"\nπŸ“Š Extracting item embeddings (max {max_items})...")
        # Work on a copy so the engine's frame does not gain a helper column.
        items_df = self.engine.items_df.copy()
        item_embeddings = []
        item_ids = []
        item_categories = []
        # Group by top-level category and sample
        items_df['top_category'] = items_df['category_code'].str.split('.').str[0]
        category_groups = items_df.groupby('top_category')
        n_categories = len(category_groups)
        # Guard against an empty catalogue (division by zero) and against a
        # zero quota when there are more categories than requested items;
        # the max_items checks below still enforce the overall cap.
        if n_categories:
            items_per_category = min(50, max(1, max_items // n_categories))
        else:
            items_per_category = 0
        for category, group in category_groups:
            if len(item_embeddings) >= max_items:
                break
            sample_size = min(items_per_category, len(group))
            sample_items = group.sample(n=sample_size, random_state=42)
            for _, item in sample_items.iterrows():
                item_id = item['product_id']
                # Get embedding from FAISS index
                embedding = self.engine.faiss_index.get_item_embedding(item_id)
                if embedding is not None:
                    item_embeddings.append(embedding)
                    item_ids.append(item_id)
                    item_categories.append(category)
                if len(item_embeddings) >= max_items:
                    break
        if item_embeddings:
            item_embeddings = np.array(item_embeddings)
            print(f"πŸ“ˆ Extracted {len(item_embeddings)} item embeddings: {item_embeddings.shape}")
            # Show category distribution
            category_counts = pd.Series(item_categories).value_counts()
            print(f"πŸ“Š Category distribution: {dict(category_counts.head())}")
        else:
            print(f"❌ No item embeddings extracted!")
        return item_embeddings, item_ids, item_categories

    def simple_pca_2d(self, embeddings: np.ndarray) -> np.ndarray:
        """Simple PCA implementation for 2D reduction when sklearn not available.

        Projects the centred data onto the two eigenvectors of its covariance
        matrix with the largest eigenvalues. The sign of each component is
        arbitrary.

        Args:
            embeddings: 2-D array with one row per sample.

        Returns:
            Array of shape ``(n_samples, 2)``. With fewer than two samples
            there is no variance to analyse and zeros are returned; when the
            input has a single feature the second column is zero.
        """
        embeddings = np.asarray(embeddings)
        n_samples = embeddings.shape[0]
        if n_samples < 2:
            # np.cov is undefined for a single observation (NaN + warning).
            return np.zeros((n_samples, 2))
        # Center the data
        centered = embeddings - np.mean(embeddings, axis=0)
        # Covariance of the features; atleast_2d covers the 1-feature case
        # where np.cov collapses to a 0-d array.
        cov_matrix = np.atleast_2d(np.cov(centered, rowvar=False))
        # eigh returns eigenvalues in ascending order; keep the top two.
        eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
        top = np.argsort(eigenvalues)[::-1][:2]
        reduced = centered @ eigenvectors[:, top]
        if reduced.shape[1] < 2:
            # Pad so callers can always index columns 0 and 1.
            padding = np.zeros((n_samples, 2 - reduced.shape[1]))
            reduced = np.hstack([reduced, padding])
        return reduced

    def reduce_dimensions(self, embeddings: np.ndarray, method: str = 'tsne') -> np.ndarray:
        """Reduce embeddings to 2D for visualization.

        Args:
            embeddings: 2-D array with one row per sample.
            method: ``'pca'``, ``'tsne'`` or ``'umap'``. Unknown methods, and
                methods whose library is missing, fall back to
                ``simple_pca_2d``.

        Returns:
            Array of shape ``(n_samples, 2)``.
        """
        print(f"πŸ”„ Reducing dimensions using {method.upper()}...")
        n_samples, n_features = embeddings.shape
        if method != 'pca' and n_samples < 3:
            # t-SNE requires perplexity < n_samples and UMAP needs at least
            # two neighbours, so neither can run on one or two points.
            print(f" ⚠️ Too few samples ({n_samples}) for {method.upper()}, falling back to PCA")
            return self.simple_pca_2d(embeddings)
        if method == 'pca':
            # sklearn's PCA needs n_components <= min(n_samples, n_features).
            if HAS_SKLEARN and min(n_samples, n_features) >= 2:
                from sklearn.decomposition import PCA
                reducer = PCA(n_components=2, random_state=42)
                reduced = reducer.fit_transform(embeddings)
                print(f" βœ… PCA explained variance: {reducer.explained_variance_ratio_.sum():.3f}")
            else:
                reduced = self.simple_pca_2d(embeddings)
                print(f" βœ… Simple PCA reduction completed")
        elif method == 'tsne' and HAS_SKLEARN:
            # Use PCA first for speed if high dimensional
            if n_features > 50:
                from sklearn.decomposition import PCA
                n_components = min(50, n_samples - 1, n_features)
                pca = PCA(n_components=n_components, random_state=42)
                embeddings = pca.fit_transform(embeddings)
                print(f" πŸ“‰ Pre-reduced to {n_components}D with PCA")
            # Perplexity must stay strictly below the sample count; for six
            # or more samples this equals the previous min(30, max(5, n-1)).
            perplexity = min(30, n_samples - 1)
            reducer = TSNE(n_components=2, random_state=42, perplexity=perplexity)
            reduced = reducer.fit_transform(embeddings)
            print(f" βœ… t-SNE reduction completed (perplexity={perplexity})")
        elif method == 'umap' and HAS_UMAP:
            reducer = umap.UMAP(n_components=2, random_state=42, n_neighbors=min(15, n_samples - 1))
            reduced = reducer.fit_transform(embeddings)
            print(f" βœ… UMAP reduction completed")
        else:
            print(f" ⚠️ {method.upper()} not available, falling back to PCA")
            reduced = self.simple_pca_2d(embeddings)
        return reduced

    @staticmethod
    def _color_map(labels: List[str], cmap) -> Dict:
        """Map each distinct label, in sorted order, to an evenly spaced colour of ``cmap``."""
        # Sorting makes colour and legend assignment identical across runs
        # and across plots (set iteration order is not stable for strings).
        unique_labels = sorted(set(labels))
        colors = cmap(np.linspace(0, 1, len(unique_labels)))
        return dict(zip(unique_labels, colors))

    def plot_user_embeddings(self, user_embeddings: np.ndarray, user_names: List[str],
                             user_groups: List[str], method: str = 'tsne') -> "plt.Figure":
        """Create 2D plot of user embeddings.

        Args:
            user_embeddings: One row per user.
            user_names: Label drawn next to each point.
            user_groups: Group of each user; determines colour and legend.
            method: Reduction method passed to ``reduce_dimensions``.

        Returns:
            The matplotlib figure (not shown or saved here).
        """
        print(f"\nπŸ“ˆ Creating user embeddings plot...")
        # Reduce dimensions
        reduced_embeddings = self.reduce_dimensions(user_embeddings, method)
        # Create plot
        fig, ax = plt.subplots(figsize=(12, 8))
        group_array = np.array(user_groups)
        name_array = np.array(user_names)
        # Plot points by group
        for group, color in self._color_map(user_groups, plt.cm.Set1).items():
            mask = group_array == group
            x = reduced_embeddings[mask, 0]
            y = reduced_embeddings[mask, 1]
            ax.scatter(x, y, c=[color], label=group, alpha=0.7, s=100)
            # Add labels
            for name, x_i, y_i in zip(name_array[mask], x, y):
                ax.annotate(name, (x_i, y_i), xytext=(5, 5),
                            textcoords='offset points', fontsize=8, alpha=0.8)
        ax.set_title(f'User Embeddings Visualization ({method.upper()})', fontsize=14, fontweight='bold')
        ax.set_xlabel(f'{method.upper()} Component 1')
        ax.set_ylabel(f'{method.upper()} Component 2')
        ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        ax.grid(True, alpha=0.3)
        plt.tight_layout()
        return fig

    def plot_item_embeddings(self, item_embeddings: np.ndarray, item_categories: List[str],
                             method: str = 'tsne') -> "plt.Figure":
        """Create 2D plot of item embeddings.

        Args:
            item_embeddings: One row per item.
            item_categories: Category of each item; determines colour and
                legend.
            method: Reduction method passed to ``reduce_dimensions``.

        Returns:
            The matplotlib figure (not shown or saved here).
        """
        print(f"\nπŸ“ˆ Creating item embeddings plot...")
        # Reduce dimensions
        reduced_embeddings = self.reduce_dimensions(item_embeddings, method)
        # Create plot
        fig, ax = plt.subplots(figsize=(12, 8))
        category_array = np.array(item_categories)
        # Plot points by category
        for category, color in self._color_map(item_categories, plt.cm.tab20).items():
            mask = category_array == category
            ax.scatter(reduced_embeddings[mask, 0], reduced_embeddings[mask, 1],
                       c=[color], label=category, alpha=0.6, s=30)
        ax.set_title(f'Item Embeddings Visualization ({method.upper()})', fontsize=14, fontweight='bold')
        ax.set_xlabel(f'{method.upper()} Component 1')
        ax.set_ylabel(f'{method.upper()} Component 2')
        ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left', fontsize=8)
        ax.grid(True, alpha=0.3)
        plt.tight_layout()
        return fig

    def plot_combined_embedding_space(self, user_embeddings: np.ndarray, item_embeddings: np.ndarray,
                                      user_names: List[str], user_groups: List[str],
                                      item_categories: List[str], method: str = 'tsne') -> "plt.Figure":
        """Create combined plot showing users and items in same embedding space.

        Users and items are stacked and reduced together so both share one
        2D projection; items are drawn first as a faint background and users
        on top as labelled stars.

        Args:
            user_embeddings: One row per user.
            item_embeddings: One row per item, same width as the user rows.
            user_names: Label drawn next to each user.
            user_groups: Group of each user (colour/legend).
            item_categories: Category of each item (colour/legend).
            method: Reduction method passed to ``reduce_dimensions``.

        Returns:
            The matplotlib figure (not shown or saved here).
        """
        print(f"\nπŸ“ˆ Creating combined embedding space plot...")
        # Combine embeddings
        all_embeddings = np.vstack([user_embeddings, item_embeddings])
        # Reduce dimensions
        reduced_embeddings = self.reduce_dimensions(all_embeddings, method)
        # Split back
        n_users = len(user_embeddings)
        user_reduced = reduced_embeddings[:n_users]
        item_reduced = reduced_embeddings[n_users:]
        # Create plot
        fig, ax = plt.subplots(figsize=(14, 10))
        # Plot items first (as background)
        category_array = np.array(item_categories)
        for category, color in self._color_map(item_categories, plt.cm.tab20).items():
            mask = category_array == category
            ax.scatter(item_reduced[mask, 0], item_reduced[mask, 1], c=[color],
                       label=f'Items: {category}', alpha=0.3, s=20, marker='.')
        # Plot users on top
        group_array = np.array(user_groups)
        name_array = np.array(user_names)
        for group, color in self._color_map(user_groups, plt.cm.Set1).items():
            mask = group_array == group
            x = user_reduced[mask, 0]
            y = user_reduced[mask, 1]
            ax.scatter(x, y, c=[color], label=f'Users: {group}',
                       alpha=0.8, s=150, marker='*', edgecolors='black', linewidths=0.5)
            # Add user labels
            for name, x_i, y_i in zip(name_array[mask], x, y):
                ax.annotate(name, (x_i, y_i), xytext=(5, 5),
                            textcoords='offset points', fontsize=8, fontweight='bold')
        ax.set_title(f'Combined User-Item Embedding Space ({method.upper()})', fontsize=14, fontweight='bold')
        ax.set_xlabel(f'{method.upper()} Component 1')
        ax.set_ylabel(f'{method.upper()} Component 2')
        ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left', fontsize=8)
        ax.grid(True, alpha=0.3)
        plt.tight_layout()
        return fig

    def analyze_embedding_quality(self, user_embeddings: np.ndarray, user_groups: List[str],
                                  item_embeddings: np.ndarray, item_categories: List[str]) -> Dict:
        """Analyze the quality of learned embeddings.

        Args:
            user_embeddings: One row per user.
            user_groups: Group label of each user row.
            item_embeddings: One row per item.
            item_categories: Accepted for interface symmetry; not used.

        Returns:
            Dict with ``user_stats`` and ``item_stats`` (count, dimensions,
            mean/std of the row norms) and, when there is more than one
            user, ``user_clustering`` (mean within-group and between-group
            similarity and their difference).

        Note:
            Similarity is the raw dot product of the rows; it equals cosine
            similarity only if the embeddings are unit-normalised.
            NOTE(review): confirm the engine returns normalised vectors.
        """
        print(f"\nπŸ” Analyzing embedding quality...")
        analysis = {}
        # User embedding analysis
        print(f"πŸ‘₯ User Embedding Analysis:")
        user_norms = np.linalg.norm(user_embeddings, axis=1)
        analysis['user_stats'] = {
            'count': len(user_embeddings),
            'dimensions': user_embeddings.shape[1],
            'mean_norm': np.mean(user_norms),
            'std_norm': np.std(user_norms)
        }
        # Calculate within-group vs between-group similarities for users
        if len(user_embeddings) > 1:
            user_similarities = np.dot(user_embeddings, user_embeddings.T)
            group_array = np.array(user_groups)
            # Every unordered pair (i < j) exactly once, in row-major order.
            rows, cols = np.triu_indices(len(group_array), k=1)
            pair_sims = user_similarities[rows, cols]
            same_group = group_array[rows] == group_array[cols]
            within_group_sims = pair_sims[same_group]
            between_group_sims = pair_sims[~same_group]
            has_within = within_group_sims.size > 0
            has_between = between_group_sims.size > 0
            within_mean = np.mean(within_group_sims) if has_within else 0
            between_mean = np.mean(between_group_sims) if has_between else 0
            analysis['user_clustering'] = {
                'within_group_similarity': within_mean,
                'between_group_similarity': between_mean,
                # Only meaningful when both kinds of pairs exist.
                'separation_score': (within_mean - between_mean) if has_within and has_between else 0
            }
            print(f" Within-group similarity: {analysis['user_clustering']['within_group_similarity']:.3f}")
            print(f" Between-group similarity: {analysis['user_clustering']['between_group_similarity']:.3f}")
            print(f" Separation score: {analysis['user_clustering']['separation_score']:.3f}")
        # Item embedding analysis
        print(f"πŸ›οΈ Item Embedding Analysis:")
        item_norms = np.linalg.norm(item_embeddings, axis=1)
        analysis['item_stats'] = {
            'count': len(item_embeddings),
            'dimensions': item_embeddings.shape[1],
            'mean_norm': np.mean(item_norms),
            'std_norm': np.std(item_norms)
        }
        print(f" πŸ“Š Stats: {analysis['user_stats']['count']} users, {analysis['item_stats']['count']} items")
        print(f" πŸ“ Dimensions: {analysis['user_stats']['dimensions']}")
        print(f" πŸ“ User norm: {analysis['user_stats']['mean_norm']:.3f} Β± {analysis['user_stats']['std_norm']:.3f}")
        print(f" πŸ“ Item norm: {analysis['item_stats']['mean_norm']:.3f} Β± {analysis['item_stats']['std_norm']:.3f}")
        return analysis

    @staticmethod
    def _to_jsonable(value):
        """Recursively replace NumPy scalars/arrays with plain Python objects."""
        if isinstance(value, dict):
            return {k: EmbeddingVisualizer._to_jsonable(v) for k, v in value.items()}
        if isinstance(value, (list, tuple)):
            return [EmbeddingVisualizer._to_jsonable(v) for v in value]
        if isinstance(value, np.generic):
            # Covers every NumPy scalar type (floats, ints, bools).
            return value.item()
        if isinstance(value, np.ndarray):
            return value.tolist()
        return value

    def save_results(self, figures: List["plt.Figure"], analysis: Dict, timestamp: str):
        """Save visualization results.

        Writes each figure as ``embedding_visualization_<n>_<timestamp>.png``
        and the analysis as ``embedding_analysis_<timestamp>.json`` into the
        current working directory.

        Args:
            figures: Figures to save, numbered from 1 in list order.
            analysis: Report dict; NumPy values are converted for JSON.
            timestamp: Suffix used in all file names.
        """
        print(f"\nπŸ’Ύ Saving visualization results...")
        # Save figures
        for i, fig in enumerate(figures):
            filename = f"embedding_visualization_{i+1}_{timestamp}.png"
            fig.savefig(filename, dpi=300, bbox_inches='tight')
            print(f" πŸ“Š Saved figure: {filename}")
        # Save analysis. Convert before opening the file so a conversion
        # error cannot leave an empty file behind.
        analysis_file = f"embedding_analysis_{timestamp}.json"
        json_analysis = self._to_jsonable(analysis)
        with open(analysis_file, 'w') as f:
            json.dump(json_analysis, f, indent=2)
        print(f" πŸ“„ Saved analysis: {analysis_file}")

    def run_visualization(self, max_items: int = 500,
                          methods: Optional[List[str]] = None) -> Optional[Tuple[List["plt.Figure"], Dict]]:
        """Run complete embedding visualization pipeline.

        Args:
            max_items: Passed to ``extract_item_embeddings``.
            methods: Reduction methods to plot with; defaults to
                ``['tsne']``. Three figures (users, items, combined) are
                produced per method.

        Returns:
            ``(figures, analysis)`` on success, or ``None`` when no user or
            no item embeddings could be extracted.
        """
        if methods is None:
            # Avoid a shared mutable default argument.
            methods = ['tsne']
        print("πŸš€ Starting Embedding Visualization Pipeline")
        print("="*60)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Create test users
        test_users = self.create_diverse_test_users()
        print(f"πŸ‘₯ Created {len(test_users)} diverse test users")
        # Extract embeddings
        user_embeddings, user_names, user_groups = self.extract_user_embeddings(test_users)
        item_embeddings, item_ids, item_categories = self.extract_item_embeddings(max_items)
        if len(user_embeddings) == 0 or len(item_embeddings) == 0:
            print("❌ Failed to extract embeddings - cannot proceed")
            return None
        # Analyze embedding quality
        analysis = self.analyze_embedding_quality(user_embeddings, user_groups,
                                                  item_embeddings, item_categories)
        # Draw the item samples once, with a fixed seed, so runs are
        # reproducible and every method plots the same items.
        rng = np.random.default_rng(42)
        n_items = len(item_embeddings)
        # Item embeddings plot (sample for visibility)
        item_idx = rng.choice(n_items, min(300, n_items), replace=False)
        item_sample_emb = item_embeddings[item_idx]
        item_sample_cat = [item_categories[i] for i in item_idx]
        # Combined plot (smaller sample for clarity)
        if n_items > 200:
            combined_idx = rng.choice(n_items, 200, replace=False)
            combined_item_emb = item_embeddings[combined_idx]
            combined_item_cat = [item_categories[i] for i in combined_idx]
        else:
            combined_item_emb = item_embeddings
            combined_item_cat = item_categories
        # Create visualizations
        figures = []
        for method in methods:
            print(f"\n🎨 Creating visualizations with {method.upper()}...")
            figures.append(
                self.plot_user_embeddings(user_embeddings, user_names, user_groups, method)
            )
            figures.append(
                self.plot_item_embeddings(item_sample_emb, item_sample_cat, method)
            )
            figures.append(
                self.plot_combined_embedding_space(
                    user_embeddings, combined_item_emb, user_names, user_groups,
                    combined_item_cat, method
                )
            )
        # Save results
        self.save_results(figures, analysis, timestamp)
        # Show plots
        print(f"\nπŸŽ‰ Visualization completed!")
        print(f"πŸ“Š Generated {len(figures)} visualizations")
        print(f"πŸ” Embedding quality analysis completed")
        if HAS_PLOTLY:
            print(f"πŸ’‘ Interactive Plotly visualizations could be added for better exploration")
        plt.show()
        return figures, analysis
def main():
    """Run the embedding visualization.

    Picks up to two reduction methods, preferring UMAP, then t-SNE, then
    PCA (which needs no optional library), and runs the full pipeline with
    up to 800 items. Any exception is reported with a traceback instead of
    propagating.
    """
    try:
        visualizer = EmbeddingVisualizer()
        # Configure visualization
        methods = []
        if HAS_UMAP:
            methods.append('umap')
        if HAS_SKLEARN:
            methods.append('tsne')
        methods.append('pca')  # Always available
        # Run visualization
        result = visualizer.run_visualization(
            max_items=800,
            methods=methods[:2]  # Use top 2 methods to avoid too many plots
        )
        if result is None:
            # run_visualization returns None (after printing the reason)
            # when no embeddings could be extracted; unpacking it would
            # raise a misleading TypeError.
            print("❌ Visualization failed: no embeddings could be extracted")
            return
        print(f"\nβœ… Embedding visualization completed successfully!")
    except Exception as e:
        print(f"❌ Visualization failed: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()