# SPG_ML / personalization / recommendation_engine.py
# meetmendapara's picture
# Initial commit for ML space
# df31aa1
import os
import pickle
from typing import Dict, List, Optional

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

from .feature_engineering import FeatureEngineer
class RecommendationEngine:
    """Hybrid task recommender.

    Each candidate item receives three component scores (collaborative,
    content-based and personality-based) which are blended with fixed
    weights into a single ranking score.

    Database access goes through ``self.feature_engineer.conn``; the SQL in
    this class uses ``%s`` placeholders and ``NOW() - INTERVAL`` syntax, so
    the connection is presumably a PostgreSQL DB-API connection
    (NOTE(review): confirm against ``FeatureEngineer``).
    """

    # Weights of the component scores in the final hybrid score.
    COLLAB_WEIGHT = 0.4
    CONTENT_WEIGHT = 0.3
    PERSONALITY_WEIGHT = 0.3

    # Location of the pickled collaborative model, relative to the process's
    # current working directory.
    MODEL_PATH = 'models/personalization/recommendation_model.pkl'

    # (feature key, default when missing) pairs, in the order they appear in
    # the vector used for user-to-user cosine similarity.
    _SIMILARITY_FEATURES = (
        ('openness', 0.5),
        ('conscientiousness', 0.5),
        ('extraversion', 0.5),
        ('agreeableness', 0.5),
        ('neuroticism', 0.5),
        ('recency_score', 0.0),
        ('frequency_score', 0.0),
        ('monetary_score', 0.0),
    )

    def __init__(self):
        """Create the feature engineer and try to load the trained model."""
        self.feature_engineer = FeatureEngineer()
        self.collaborative_model = None
        self.load_models()

    def load_models(self):
        """Load the pickled collaborative model if the file exists.

        Best effort: when the file is missing or cannot be unpickled,
        ``self.collaborative_model`` keeps its current value (``None`` after
        ``__init__``) and the failure is only printed.
        """
        model_path = self.MODEL_PATH
        if not os.path.exists(model_path):
            return
        try:
            with open(model_path, 'rb') as f:
                # SECURITY: pickle.load can execute arbitrary code. This is
                # only safe while the model file comes from a trusted source.
                self.collaborative_model = pickle.load(f)
            print("Loaded recommendation model")
        except Exception as e:
            # Deliberately broad: a broken model file must not prevent the
            # engine from starting; scoring falls back to neutral values.
            print(f"Failed to load recommendation model: {e}")

    def get_recommendations(self, user_id: str, limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` hybrid recommendations for a user.

        Args:
            user_id: Identifier of the user to recommend for.
            limit: Maximum number of items to return. Values ``<= 0`` yield
                an empty list without touching the database.

        Returns:
            Dicts sorted by descending ``score``, each with the keys
            ``item_id``, ``item_type``, ``score``, ``algorithm``,
            ``metadata`` and a 1-based ``rank``.
        """
        if limit <= 0:
            # A negative limit would otherwise slice off the *tail* of the
            # ranking, which is never what a caller means.
            return []

        user_features = self.feature_engineer.compute_user_features(user_id)
        candidates = self._get_candidate_items(user_id)
        if not candidates:
            return []

        scored_items = []
        for item in candidates:
            # Pass the already-computed features so they are not recomputed
            # once per candidate inside the collaborative scorer.
            collab_score = self._collaborative_score(
                user_id, item['id'], item['type'], user_features=user_features
            )
            content_score = self._content_based_score(user_features, item)
            personality_score = self._personality_score(user_features, item)

            final_score = float(
                self.COLLAB_WEIGHT * collab_score
                + self.CONTENT_WEIGHT * content_score
                + self.PERSONALITY_WEIGHT * personality_score
            )

            scored_items.append({
                'item_id': item['id'],
                'item_type': item['type'],
                'score': final_score,
                'algorithm': 'hybrid',
                'metadata': {
                    'reason': self._generate_reason(user_features, item),
                    # Confidence is the score clamped into [0, 1].
                    'confidence': max(0.0, min(final_score, 1.0)),
                    'collab_score': float(collab_score),
                    'content_score': float(content_score),
                    'personality_score': float(personality_score),
                },
            })

        scored_items.sort(key=lambda x: x['score'], reverse=True)
        top_items = scored_items[:limit]
        for rank, entry in enumerate(top_items, start=1):
            entry['rank'] = rank
        return top_items

    def _get_candidate_items(self, user_id: str) -> List[Dict]:
        """Fetch up to 50 candidate tasks for a user.

        Candidates are tasks whose status is not ``'COMPLETED'`` and that the
        user has not viewed or completed in the last 7 days. The query has no
        ``ORDER BY``, so which 50 rows come back is up to the database.

        Returns:
            Dicts with the keys ``id``, ``type``, ``title``, ``category`` and
            ``priority``.
        """
        conn = self.feature_engineer.conn
        query = """
            SELECT t.id, 'task' as type, t.title, t.category, t.priority
            FROM tasks t
            WHERE t.status != 'COMPLETED'
            AND t.id NOT IN (
                SELECT item_id FROM user_events
                WHERE user_id = %s
                AND item_type = 'task'
                AND event_type IN ('view', 'complete')
                AND timestamp > NOW() - INTERVAL '7 days'
            )
            LIMIT 50
        """
        cursor = conn.cursor()
        try:
            cursor.execute(query, (user_id,))
            rows = cursor.fetchall()
        finally:
            # Always release the cursor, even when the query fails.
            cursor.close()

        return [
            {
                'id': row[0],
                'type': row[1],
                'title': row[2],
                'category': row[3],
                'priority': row[4],
            }
            for row in rows
        ]

    def _collaborative_score(
        self,
        user_id: str,
        item_id: str,
        item_type: str,
        user_features: Optional[Dict] = None,
    ) -> float:
        """Score an item by similarity to users who interacted with it.

        The loaded model is only used as an availability flag here; the score
        itself is the mean cosine similarity between the current user and up
        to 10 (arbitrarily chosen, no ``ORDER BY``) other users with a
        ``view``/``click``/``complete`` event on the item.

        Args:
            user_id: The user being scored for.
            item_id: Identifier of the candidate item.
            item_type: Type of the candidate item (e.g. ``'task'``).
            user_features: Pre-computed features of ``user_id``. When omitted
                they are computed on demand.

        Returns:
            ``0.5`` when no model is loaded, ``0.3`` when there is nobody to
            compare against, otherwise the mean similarity.
        """
        if self.collaborative_model is None:
            return 0.5  # Neutral score if model not available

        conn = self.feature_engineer.conn
        query = """
            SELECT DISTINCT user_id
            FROM user_events
            WHERE item_id = %s
            AND item_type = %s
            AND event_type IN ('view', 'click', 'complete')
            LIMIT 10
        """
        cursor = conn.cursor()
        try:
            cursor.execute(query, (item_id, item_type))
            similar_users = [row[0] for row in cursor.fetchall()]
        finally:
            cursor.close()

        if not similar_users:
            return 0.3  # Low score for items with no interactions

        if user_features is None:
            user_features = self.feature_engineer.compute_user_features(user_id)

        similarities = []
        for other_user in similar_users:
            if other_user == user_id:
                continue
            try:
                other_features = self.feature_engineer.compute_user_features(other_user)
                similarities.append(
                    self._compute_user_similarity(user_features, other_features)
                )
            except Exception as e:
                # Best effort: one bad user must not sink the whole score, but
                # do not swallow KeyboardInterrupt/SystemExit or hide the cause.
                print(f"Skipping similarity for user {other_user}: {e}")

        return float(np.mean(similarities)) if similarities else 0.3

    def _content_based_score(self, user_features: Dict, item: Dict) -> float:
        """Score an item by category affinity and priority fit.

        The category part is the share of the user's ``category_preferences``
        counts that belong to the item's category. The priority part is 0.8
        for HIGH items when ``monetary_score`` > 0.7, 0.7 for LOW items when
        it is < 0.3, and 0.5 otherwise.

        Returns:
            ``0.6 * category_score + 0.4 * priority_score``.
        """
        item_category = item.get('category', 'other')
        # Treat a missing or None preference map as "no preferences".
        user_categories = user_features.get('category_preferences') or {}

        category_score = 0.0
        if item_category in user_categories:
            total_views = sum(user_categories.values())
            category_score = user_categories[item_category] / max(total_views, 1)

        # NOTE(review): 'monetary_score' is used as a completion rate here;
        # confirm that is what FeatureEngineer stores under that key.
        completion_rate = user_features.get('monetary_score', 0.5)
        item_priority = item.get('priority', 'MEDIUM')

        priority_score = 0.5
        if item_priority == 'HIGH' and completion_rate > 0.7:
            priority_score = 0.8
        elif item_priority == 'LOW' and completion_rate < 0.3:
            priority_score = 0.7

        return 0.6 * category_score + 0.4 * priority_score

    def _personality_score(self, user_features: Dict, item: Dict) -> float:
        """Score how well an item fits the user's personality traits.

        Starts at 0.5 and adds bonuses: conscientiousness * 0.3 for HIGH
        priority items, openness * 0.2 for 'creative'/'learning' categories,
        and neuroticism * 0.2 for LOW priority items. Missing traits default
        to 0.5. The result is capped at 1.0.
        """
        conscientiousness = user_features.get('conscientiousness', 0.5)
        openness = user_features.get('openness', 0.5)
        neuroticism = user_features.get('neuroticism', 0.5)

        priority = item.get('priority')
        score = 0.5
        if priority == 'HIGH':
            score += conscientiousness * 0.3
        if item.get('category') in ('creative', 'learning'):
            score += openness * 0.2
        if priority == 'LOW':
            score += neuroticism * 0.2

        return min(score, 1.0)

    def _feature_vector(self, features: Dict) -> np.ndarray:
        """Build the (1, 8) trait/behaviour row vector used for similarity."""
        return np.array(
            [features.get(key, default) for key, default in self._SIMILARITY_FEATURES]
        ).reshape(1, -1)

    def _compute_user_similarity(self, user1: Dict, user2: Dict) -> float:
        """Return the cosine similarity of two users' feature vectors.

        The vector holds the five personality traits (default 0.5) followed
        by the recency, frequency and monetary scores (default 0.0).
        """
        similarity = cosine_similarity(
            self._feature_vector(user1), self._feature_vector(user2)
        )[0][0]
        return float(similarity)

    def _generate_reason(self, user_features: Dict, item: Dict) -> str:
        """Return a human-readable explanation for recommending ``item``.

        Preference order: a strong category interest (count > 5), then high
        conscientiousness (> 0.7), then a generic fallback message.
        """
        category = item.get('category', 'general')
        # Treat a missing or None preference map as "no preferences".
        user_categories = user_features.get('category_preferences') or {}

        if category in user_categories and user_categories[category] > 5:
            return f"Based on your interest in {category} tasks"

        if user_features.get('conscientiousness', 0.5) > 0.7:
            return "Matches your organized work style"

        return "Popular among similar users"