Spaces:
Sleeping
Sleeping
import os
import pickle
from typing import Dict, List, Optional

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

from .feature_engineering import FeatureEngineer
class RecommendationEngine:
    """Hybrid task recommender.

    Each candidate item gets three component scores (collaborative,
    content-based and personality-based) which are blended with fixed
    weights into one final score.

    The engine relies on a ``FeatureEngineer`` instance for two things:
    ``compute_user_features(user_id)`` (a dict of user features) and ``conn``
    (a DB connection exposing ``cursor()``; the queries use ``%s``
    placeholders and ``NOW() - INTERVAL`` syntax, so presumably a PostgreSQL
    DB-API driver -- confirm against ``FeatureEngineer``).
    """

    # Location of the pickled collaborative model, relative to the working dir.
    MODEL_PATH = 'models/personalization/recommendation_model.pkl'

    # Weights of the component scores in the hybrid score (they sum to 1.0).
    COLLAB_WEIGHT = 0.4
    CONTENT_WEIGHT = 0.3
    PERSONALITY_WEIGHT = 0.3

    # (feature key, default) pairs that make up the user-similarity vector.
    # Order matters only in that both users must use the same order.
    SIMILARITY_FEATURES = (
        ('openness', 0.5),
        ('conscientiousness', 0.5),
        ('extraversion', 0.5),
        ('agreeableness', 0.5),
        ('neuroticism', 0.5),
        ('recency_score', 0.0),
        ('frequency_score', 0.0),
        ('monetary_score', 0.0),
    )

    def __init__(self):
        """Create the feature engineer and try to load the trained model."""
        self.feature_engineer = FeatureEngineer()
        self.collaborative_model = None
        self.load_models()

    def load_models(self):
        """Load the pickled collaborative model from ``MODEL_PATH`` if present.

        A missing file is not an error: ``collaborative_model`` stays ``None``
        and collaborative scoring falls back to a neutral value. Load failures
        are reported on stdout and otherwise ignored.
        """
        model_path = self.MODEL_PATH
        if not os.path.exists(model_path):
            return
        try:
            with open(model_path, 'rb') as f:
                # SECURITY: unpickling executes arbitrary code embedded in the
                # file. Only load model files from a trusted, write-protected
                # location.
                self.collaborative_model = pickle.load(f)
            print("Loaded recommendation model")
        except Exception as e:
            # Best effort: the engine still works without the model.
            print(f"Failed to load recommendation model: {e}")

    def get_recommendations(self, user_id: str, limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` hybrid recommendations for ``user_id``.

        Args:
            user_id: Identifier of the user to recommend for.
            limit: Maximum number of items to return. Values ``<= 0`` yield an
                empty list without touching the database.

        Returns:
            A list of dicts sorted by descending ``score``. Each dict has the
            keys ``item_id``, ``item_type``, ``score``, ``algorithm``,
            ``metadata`` and a 1-based ``rank``.
        """
        if limit <= 0:
            # A negative limit would otherwise slice "all but the last N".
            return []

        user_features = self.feature_engineer.compute_user_features(user_id)

        candidates = self._get_candidate_items(user_id)
        if not candidates:
            return []

        # Features of other users are reused across candidates within this
        # call instead of being recomputed for every item.
        feature_cache = {}

        scored_items = []
        for item in candidates:
            collab_score = self._collaborative_score(
                user_id,
                item['id'],
                item['type'],
                user_features=user_features,
                feature_cache=feature_cache,
            )
            content_score = self._content_based_score(user_features, item)
            personality_score = self._personality_score(user_features, item)

            # Hybrid score (weighted combination)
            final_score = (
                self.COLLAB_WEIGHT * collab_score +
                self.CONTENT_WEIGHT * content_score +
                self.PERSONALITY_WEIGHT * personality_score
            )

            scored_items.append({
                'item_id': item['id'],
                'item_type': item['type'],
                'score': float(final_score),
                'algorithm': 'hybrid',
                'metadata': {
                    'reason': self._generate_reason(user_features, item),
                    # Keep confidence inside [0, 1] on both ends.
                    'confidence': max(0.0, min(float(final_score), 1.0)),
                    'collab_score': float(collab_score),
                    'content_score': float(content_score),
                    'personality_score': float(personality_score),
                },
            })

        # Sort by score and keep the top N
        scored_items.sort(key=lambda entry: entry['score'], reverse=True)
        top_items = scored_items[:limit]

        for rank, entry in enumerate(top_items, start=1):
            entry['rank'] = rank

        return top_items

    def _get_candidate_items(self, user_id: str) -> List[Dict]:
        """Fetch up to 50 non-completed tasks the user has not seen recently.

        Tasks the user viewed or completed in the last 7 days are excluded.

        Returns:
            A list of dicts with the keys ``id``, ``type``, ``title``,
            ``category`` and ``priority``.
        """
        conn = self.feature_engineer.conn

        # NOTE(review): there is no ORDER BY, so which 50 rows come back is up
        # to the database. Also, NOT IN matches nothing if the subquery ever
        # yields a NULL item_id -- confirm the column is NOT NULL.
        query = """
            SELECT t.id, 'task' as type, t.title, t.category, t.priority
            FROM tasks t
            WHERE t.status != 'COMPLETED'
            AND t.id NOT IN (
                SELECT item_id FROM user_events
                WHERE user_id = %s
                AND item_type = 'task'
                AND event_type IN ('view', 'complete')
                AND timestamp > NOW() - INTERVAL '7 days'
            )
            LIMIT 50
        """

        cursor = conn.cursor()
        try:
            cursor.execute(query, (user_id,))
            rows = cursor.fetchall()
        finally:
            # Release the cursor even if the query fails.
            cursor.close()

        columns = ('id', 'type', 'title', 'category', 'priority')
        return [dict(zip(columns, row)) for row in rows]

    def _collaborative_score(
        self,
        user_id: str,
        item_id: str,
        item_type: str,
        user_features: Optional[Dict] = None,
        feature_cache: Optional[Dict] = None,
    ) -> float:
        """Score an item by how similar the user is to users who touched it.

        Args:
            user_id: The user being scored.
            item_id: Identifier of the candidate item.
            item_type: Type of the candidate item (e.g. ``'task'``).
            user_features: Pre-computed features of ``user_id``. Computed on
                demand when omitted.
            feature_cache: Optional dict mapping other user ids to their
                features; filled in by this method so repeated calls can
                share the work.

        Returns:
            ``0.5`` when no model is loaded, ``0.3`` when nobody (else)
            interacted with the item or no similarity could be computed,
            otherwise the mean similarity to up to 10 interacting users.
        """
        # Explicit None check: truthiness of an arbitrary model object is
        # unreliable (array-likes raise, empty containers read as "missing").
        if self.collaborative_model is None:
            return 0.5  # Neutral score if model not available

        conn = self.feature_engineer.conn
        query = """
            SELECT DISTINCT user_id
            FROM user_events
            WHERE item_id = %s
            AND item_type = %s
            AND event_type IN ('view', 'click', 'complete')
            LIMIT 10
        """

        cursor = conn.cursor()
        try:
            cursor.execute(query, (item_id, item_type))
            similar_users = [row[0] for row in cursor.fetchall()]
        finally:
            cursor.close()

        if not similar_users:
            return 0.3  # Low score for items with no interactions

        if user_features is None:
            user_features = self.feature_engineer.compute_user_features(user_id)
        if feature_cache is None:
            feature_cache = {}

        similarities = []
        for other_user in similar_users:
            # Compare as strings so a non-str id coming back from the driver
            # still matches the caller's str id.
            if str(other_user) == str(user_id):
                continue
            try:
                if other_user not in feature_cache:
                    feature_cache[other_user] = (
                        self.feature_engineer.compute_user_features(other_user)
                    )
                sim = self._compute_user_similarity(
                    user_features, feature_cache[other_user]
                )
            except Exception:
                # Best effort: one user whose features cannot be computed
                # must not sink the whole score. Unlike a bare ``except``,
                # this lets KeyboardInterrupt/SystemExit propagate.
                continue
            similarities.append(sim)

        return float(np.mean(similarities)) if similarities else 0.3

    def _content_based_score(self, user_features: Dict, item: Dict) -> float:
        """Score an item by category affinity and priority fit.

        The category part is the share of the user's ``category_preferences``
        counts that fall on the item's category. The priority part compares
        the item's priority with the user's ``monetary_score`` (used here as
        the completion rate).

        Returns:
            ``0.6 * category_score + 0.4 * priority_score``.
        """
        # Match category preferences
        item_category = item.get('category', 'other')
        # ``or {}`` also covers a stored None value.
        user_categories = user_features.get('category_preferences') or {}

        category_score = 0.0
        if item_category in user_categories:
            total_views = sum(user_categories.values())
            category_score = user_categories[item_category] / max(total_views, 1)

        # Match priority/difficulty with user's completion rate
        completion_rate = user_features.get('monetary_score', 0.5)
        item_priority = item.get('priority', 'MEDIUM')

        if item_priority == 'HIGH' and completion_rate > 0.7:
            priority_score = 0.8
        elif item_priority == 'LOW' and completion_rate < 0.3:
            priority_score = 0.7
        else:
            priority_score = 0.5

        return 0.6 * category_score + 0.4 * priority_score

    def _personality_score(self, user_features: Dict, item: Dict) -> float:
        """Score an item by personality-task fit, capped at 1.0.

        Starts at 0.5 and adds bonuses: conscientiousness for HIGH priority
        items, openness for 'creative'/'learning' categories and neuroticism
        for LOW priority items. Missing traits default to 0.5.
        """
        conscientiousness = user_features.get('conscientiousness', 0.5)
        openness = user_features.get('openness', 0.5)
        neuroticism = user_features.get('neuroticism', 0.5)

        priority = item.get('priority')
        score = 0.5

        # High conscientiousness -> prefer structured, high-priority tasks
        if priority == 'HIGH':
            score += conscientiousness * 0.3

        # High openness -> prefer creative, diverse tasks
        if item.get('category') in ('creative', 'learning'):
            score += openness * 0.2

        # High neuroticism -> prefer low-stress, clear tasks
        if priority == 'LOW':
            score += neuroticism * 0.2

        return min(score, 1.0)

    def _similarity_vector(self, user: Dict) -> np.ndarray:
        """Build the 1 x N feature row used for user-to-user similarity."""
        values = [user.get(key, default) for key, default in self.SIMILARITY_FEATURES]
        return np.array(values).reshape(1, -1)

    def _compute_user_similarity(self, user1: Dict, user2: Dict) -> float:
        """Return the cosine similarity of two users' feature vectors.

        The vector holds the five personality traits (default 0.5) followed
        by the recency, frequency and monetary scores (default 0.0).
        """
        traits1 = self._similarity_vector(user1)
        traits2 = self._similarity_vector(user2)

        similarity = cosine_similarity(traits1, traits2)[0][0]
        return float(similarity)

    def _generate_reason(self, user_features: Dict, item: Dict) -> str:
        """Return a short human-readable reason for recommending ``item``.

        Preference order: strong interest in the item's category (more than
        5 recorded preferences), then high conscientiousness (> 0.7), then a
        generic fallback.
        """
        category = item.get('category', 'general')
        # ``or {}`` also covers a stored None value.
        user_categories = user_features.get('category_preferences') or {}

        if user_categories.get(category, 0) > 5:
            return f"Based on your interest in {category} tasks"

        if user_features.get('conscientiousness', 0.5) > 0.7:
            return "Matches your organized work style"

        return "Popular among similar users"