import pandas as pd import numpy as np from datetime import datetime, timedelta from typing import Dict, List, Optional import os import json try: import psycopg2 _PSYCOPG2_AVAILABLE = True except ImportError: psycopg2 = None # type: ignore _PSYCOPG2_AVAILABLE = False class FeatureEngineer: def __init__(self): database_url = os.getenv('DATABASE_URL', 'postgresql://postgres:postgres@localhost:5432/cognexa') if _PSYCOPG2_AVAILABLE: try: self.conn = psycopg2.connect(database_url) except Exception: self.conn = None else: self.conn = None def compute_user_features(self, user_id: str) -> Dict: """Compute features for a single user""" # Get user events from last 90 days query = """ SELECT event_type, item_id, item_type, metadata, timestamp FROM user_events WHERE user_id = %s AND timestamp > NOW() - INTERVAL '90 days' ORDER BY timestamp DESC """ df = pd.read_sql(query, self.conn, params=(user_id,)) if df.empty: return self._default_features(user_id) features = { 'user_id': user_id, 'total_views': len(df[df['event_type'] == 'view']), 'total_clicks': len(df[df['event_type'] == 'click']), 'total_completions': len(df[df['event_type'] == 'complete']), 'avg_time_on_site': self._compute_avg_time(df), 'recency_score': self._compute_recency(df), 'frequency_score': self._compute_frequency(df), 'monetary_score': self._compute_completion_rate(df), 'category_preferences': self._compute_category_preferences(df), 'time_of_day_preferences': self._compute_time_preferences(df), 'last_activity_at': df['timestamp'].max().isoformat() if not df.empty else None } # Get personality traits personality = self._get_personality_traits(user_id) features.update(personality) return features def _compute_avg_time(self, df: pd.DataFrame) -> float: """Compute average time spent on site""" time_events = df[df['event_type'] == 'time_spent'] if time_events.empty: return 0.0 durations = [] for metadata in time_events['metadata']: if isinstance(metadata, dict): durations.append(metadata.get('duration_seconds', 0)) return float(np.mean(durations)) if durations else 0.0 def _compute_recency(self, df: pd.DataFrame) -> float: """Days since last activity (normalized)""" if df.empty: return 0.0 last_activity = pd.to_datetime(df['timestamp'].max()) days_ago = (datetime.now() - last_activity).days return 1.0 / (1.0 + days_ago) # Higher score = more recent def _compute_frequency(self, df: pd.DataFrame) -> float: """Events per day""" if df.empty: return 0.0 df['timestamp'] = pd.to_datetime(df['timestamp']) days_active = (df['timestamp'].max() - df['timestamp'].min()).days + 1 return len(df) / max(days_active, 1) def _compute_completion_rate(self, df: pd.DataFrame) -> float: """Task completion rate""" task_events = df[df['item_type'] == 'task'] if task_events.empty: return 0.0 completions = len(task_events[task_events['event_type'] == 'complete']) views = len(task_events[task_events['event_type'] == 'view']) return completions / max(views, 1) def _compute_category_preferences(self, df: pd.DataFrame) -> Dict: """Count views by category""" categories = [] for metadata in df['metadata']: if isinstance(metadata, dict): categories.append(metadata.get('category', 'other')) else: categories.append('other') if not categories: return {} category_series = pd.Series(categories) return category_series.value_counts().to_dict() def _compute_time_preferences(self, df: pd.DataFrame) -> Dict: """Preferred time of day for activity""" if df.empty: return {'morning': 0, 'afternoon': 0, 'evening': 0, 'night': 0} df['timestamp'] = pd.to_datetime(df['timestamp']) hours = df['timestamp'].dt.hour return { 'morning': len(hours[(hours >= 6) & (hours < 12)]), 'afternoon': len(hours[(hours >= 12) & (hours < 18)]), 'evening': len(hours[(hours >= 18) & (hours < 24)]), 'night': len(hours[(hours >= 0) & (hours < 6)]) } def _get_personality_traits(self, user_id: str) -> Dict: """Get OCEAN personality traits""" query = """ SELECT openness, conscientiousness, extraversion, agreeableness, neuroticism FROM personality_profiles WHERE user_id = %s """ cursor = self.conn.cursor() cursor.execute(query, (user_id,)) result = cursor.fetchone() if result: return { 'openness': float(result[0]) if result[0] else 0.5, 'conscientiousness': float(result[1]) if result[1] else 0.5, 'extraversion': float(result[2]) if result[2] else 0.5, 'agreeableness': float(result[3]) if result[3] else 0.5, 'neuroticism': float(result[4]) if result[4] else 0.5 } return { 'openness': 0.5, 'conscientiousness': 0.5, 'extraversion': 0.5, 'agreeableness': 0.5, 'neuroticism': 0.5 } def _default_features(self, user_id: str) -> Dict: """Default features for new users""" personality = self._get_personality_traits(user_id) return { 'user_id': user_id, 'total_views': 0, 'total_clicks': 0, 'total_completions': 0, 'avg_time_on_site': 0.0, 'recency_score': 0.0, 'frequency_score': 0.0, 'monetary_score': 0.0, 'category_preferences': {}, 'time_of_day_preferences': {'morning': 0, 'afternoon': 0, 'evening': 0, 'night': 0}, 'last_activity_at': None, **personality } def batch_compute_user_features(self) -> None: """Compute features for all active users (run daily)""" query = """ SELECT DISTINCT user_id FROM user_events WHERE timestamp > NOW() - INTERVAL '90 days' """ cursor = self.conn.cursor() cursor.execute(query) user_ids = [row[0] for row in cursor.fetchall()] print(f"Computing features for {len(user_ids)} users...") for user_id in user_ids: try: features = self.compute_user_features(user_id) self._save_user_features(user_id, features) except Exception as e: print(f"Error computing features for user {user_id}: {e}") print("Feature computation completed") def _save_user_features(self, user_id: str, features: Dict) -> None: """Save computed features to database""" query = """ INSERT INTO user_features ( user_id, total_views, total_clicks, total_completions, avg_time_on_site, recency_score, frequency_score, monetary_score, most_viewed_category, last_activity_at, feature_vector, computed_at ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()) ON CONFLICT (user_id) DO UPDATE SET total_views = EXCLUDED.total_views, total_clicks = EXCLUDED.total_clicks, total_completions = EXCLUDED.total_completions, avg_time_on_site = EXCLUDED.avg_time_on_site, recency_score = EXCLUDED.recency_score, frequency_score = EXCLUDED.frequency_score, monetary_score = EXCLUDED.monetary_score, most_viewed_category = EXCLUDED.most_viewed_category, last_activity_at = EXCLUDED.last_activity_at, feature_vector = EXCLUDED.feature_vector, computed_at = NOW() """ category_prefs = features.get('category_preferences', {}) most_viewed = max(category_prefs.items(), key=lambda x: x[1])[0] if category_prefs else 'other' cursor = self.conn.cursor() cursor.execute(query, ( user_id, features['total_views'], features['total_clicks'], features['total_completions'], features['avg_time_on_site'], features['recency_score'], features['frequency_score'], features['monetary_score'], most_viewed, features.get('last_activity_at'), json.dumps(features) )) self.conn.commit()