Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional | |
| import os | |
| import json | |
| try: | |
| import psycopg2 | |
| _PSYCOPG2_AVAILABLE = True | |
| except ImportError: | |
| psycopg2 = None # type: ignore | |
| _PSYCOPG2_AVAILABLE = False | |
| class FeatureEngineer: | |
| def __init__(self): | |
| database_url = os.getenv('DATABASE_URL', 'postgresql://postgres:postgres@localhost:5432/cognexa') | |
| if _PSYCOPG2_AVAILABLE: | |
| try: | |
| self.conn = psycopg2.connect(database_url) | |
| except Exception: | |
| self.conn = None | |
| else: | |
| self.conn = None | |
| def compute_user_features(self, user_id: str) -> Dict: | |
| """Compute features for a single user""" | |
| # Get user events from last 90 days | |
| query = """ | |
| SELECT event_type, item_id, item_type, metadata, timestamp | |
| FROM user_events | |
| WHERE user_id = %s AND timestamp > NOW() - INTERVAL '90 days' | |
| ORDER BY timestamp DESC | |
| """ | |
| df = pd.read_sql(query, self.conn, params=(user_id,)) | |
| if df.empty: | |
| return self._default_features(user_id) | |
| features = { | |
| 'user_id': user_id, | |
| 'total_views': len(df[df['event_type'] == 'view']), | |
| 'total_clicks': len(df[df['event_type'] == 'click']), | |
| 'total_completions': len(df[df['event_type'] == 'complete']), | |
| 'avg_time_on_site': self._compute_avg_time(df), | |
| 'recency_score': self._compute_recency(df), | |
| 'frequency_score': self._compute_frequency(df), | |
| 'monetary_score': self._compute_completion_rate(df), | |
| 'category_preferences': self._compute_category_preferences(df), | |
| 'time_of_day_preferences': self._compute_time_preferences(df), | |
| 'last_activity_at': df['timestamp'].max().isoformat() if not df.empty else None | |
| } | |
| # Get personality traits | |
| personality = self._get_personality_traits(user_id) | |
| features.update(personality) | |
| return features | |
| def _compute_avg_time(self, df: pd.DataFrame) -> float: | |
| """Compute average time spent on site""" | |
| time_events = df[df['event_type'] == 'time_spent'] | |
| if time_events.empty: | |
| return 0.0 | |
| durations = [] | |
| for metadata in time_events['metadata']: | |
| if isinstance(metadata, dict): | |
| durations.append(metadata.get('duration_seconds', 0)) | |
| return float(np.mean(durations)) if durations else 0.0 | |
| def _compute_recency(self, df: pd.DataFrame) -> float: | |
| """Days since last activity (normalized)""" | |
| if df.empty: | |
| return 0.0 | |
| last_activity = pd.to_datetime(df['timestamp'].max()) | |
| days_ago = (datetime.now() - last_activity).days | |
| return 1.0 / (1.0 + days_ago) # Higher score = more recent | |
| def _compute_frequency(self, df: pd.DataFrame) -> float: | |
| """Events per day""" | |
| if df.empty: | |
| return 0.0 | |
| df['timestamp'] = pd.to_datetime(df['timestamp']) | |
| days_active = (df['timestamp'].max() - df['timestamp'].min()).days + 1 | |
| return len(df) / max(days_active, 1) | |
| def _compute_completion_rate(self, df: pd.DataFrame) -> float: | |
| """Task completion rate""" | |
| task_events = df[df['item_type'] == 'task'] | |
| if task_events.empty: | |
| return 0.0 | |
| completions = len(task_events[task_events['event_type'] == 'complete']) | |
| views = len(task_events[task_events['event_type'] == 'view']) | |
| return completions / max(views, 1) | |
| def _compute_category_preferences(self, df: pd.DataFrame) -> Dict: | |
| """Count views by category""" | |
| categories = [] | |
| for metadata in df['metadata']: | |
| if isinstance(metadata, dict): | |
| categories.append(metadata.get('category', 'other')) | |
| else: | |
| categories.append('other') | |
| if not categories: | |
| return {} | |
| category_series = pd.Series(categories) | |
| return category_series.value_counts().to_dict() | |
| def _compute_time_preferences(self, df: pd.DataFrame) -> Dict: | |
| """Preferred time of day for activity""" | |
| if df.empty: | |
| return {'morning': 0, 'afternoon': 0, 'evening': 0, 'night': 0} | |
| df['timestamp'] = pd.to_datetime(df['timestamp']) | |
| hours = df['timestamp'].dt.hour | |
| return { | |
| 'morning': len(hours[(hours >= 6) & (hours < 12)]), | |
| 'afternoon': len(hours[(hours >= 12) & (hours < 18)]), | |
| 'evening': len(hours[(hours >= 18) & (hours < 24)]), | |
| 'night': len(hours[(hours >= 0) & (hours < 6)]) | |
| } | |
| def _get_personality_traits(self, user_id: str) -> Dict: | |
| """Get OCEAN personality traits""" | |
| query = """ | |
| SELECT openness, conscientiousness, extraversion, agreeableness, neuroticism | |
| FROM personality_profiles | |
| WHERE user_id = %s | |
| """ | |
| cursor = self.conn.cursor() | |
| cursor.execute(query, (user_id,)) | |
| result = cursor.fetchone() | |
| if result: | |
| return { | |
| 'openness': float(result[0]) if result[0] else 0.5, | |
| 'conscientiousness': float(result[1]) if result[1] else 0.5, | |
| 'extraversion': float(result[2]) if result[2] else 0.5, | |
| 'agreeableness': float(result[3]) if result[3] else 0.5, | |
| 'neuroticism': float(result[4]) if result[4] else 0.5 | |
| } | |
| return { | |
| 'openness': 0.5, | |
| 'conscientiousness': 0.5, | |
| 'extraversion': 0.5, | |
| 'agreeableness': 0.5, | |
| 'neuroticism': 0.5 | |
| } | |
| def _default_features(self, user_id: str) -> Dict: | |
| """Default features for new users""" | |
| personality = self._get_personality_traits(user_id) | |
| return { | |
| 'user_id': user_id, | |
| 'total_views': 0, | |
| 'total_clicks': 0, | |
| 'total_completions': 0, | |
| 'avg_time_on_site': 0.0, | |
| 'recency_score': 0.0, | |
| 'frequency_score': 0.0, | |
| 'monetary_score': 0.0, | |
| 'category_preferences': {}, | |
| 'time_of_day_preferences': {'morning': 0, 'afternoon': 0, 'evening': 0, 'night': 0}, | |
| 'last_activity_at': None, | |
| **personality | |
| } | |
| def batch_compute_user_features(self) -> None: | |
| """Compute features for all active users (run daily)""" | |
| query = """ | |
| SELECT DISTINCT user_id | |
| FROM user_events | |
| WHERE timestamp > NOW() - INTERVAL '90 days' | |
| """ | |
| cursor = self.conn.cursor() | |
| cursor.execute(query) | |
| user_ids = [row[0] for row in cursor.fetchall()] | |
| print(f"Computing features for {len(user_ids)} users...") | |
| for user_id in user_ids: | |
| try: | |
| features = self.compute_user_features(user_id) | |
| self._save_user_features(user_id, features) | |
| except Exception as e: | |
| print(f"Error computing features for user {user_id}: {e}") | |
| print("Feature computation completed") | |
| def _save_user_features(self, user_id: str, features: Dict) -> None: | |
| """Save computed features to database""" | |
| query = """ | |
| INSERT INTO user_features ( | |
| user_id, total_views, total_clicks, total_completions, | |
| avg_time_on_site, recency_score, frequency_score, monetary_score, | |
| most_viewed_category, last_activity_at, feature_vector, computed_at | |
| ) | |
| VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()) | |
| ON CONFLICT (user_id) DO UPDATE SET | |
| total_views = EXCLUDED.total_views, | |
| total_clicks = EXCLUDED.total_clicks, | |
| total_completions = EXCLUDED.total_completions, | |
| avg_time_on_site = EXCLUDED.avg_time_on_site, | |
| recency_score = EXCLUDED.recency_score, | |
| frequency_score = EXCLUDED.frequency_score, | |
| monetary_score = EXCLUDED.monetary_score, | |
| most_viewed_category = EXCLUDED.most_viewed_category, | |
| last_activity_at = EXCLUDED.last_activity_at, | |
| feature_vector = EXCLUDED.feature_vector, | |
| computed_at = NOW() | |
| """ | |
| category_prefs = features.get('category_preferences', {}) | |
| most_viewed = max(category_prefs.items(), key=lambda x: x[1])[0] if category_prefs else 'other' | |
| cursor = self.conn.cursor() | |
| cursor.execute(query, ( | |
| user_id, | |
| features['total_views'], | |
| features['total_clicks'], | |
| features['total_completions'], | |
| features['avg_time_on_site'], | |
| features['recency_score'], | |
| features['frequency_score'], | |
| features['monetary_score'], | |
| most_viewed, | |
| features.get('last_activity_at'), | |
| json.dumps(features) | |
| )) | |
| self.conn.commit() | |