# SPG_ML/personalization/feature_engineering.py
# Last commit: "Initial commit for ML space" (df31aa1, meetmendapara)
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import os
import json
# psycopg2 is an optional dependency: when it cannot be imported the module
# still loads, with ``psycopg2`` bound to None and the availability flag False.
try:
    import psycopg2
except ImportError:
    psycopg2 = None  # type: ignore
    _PSYCOPG2_AVAILABLE = False
else:
    _PSYCOPG2_AVAILABLE = True
class FeatureEngineer:
    """Build per-user behavioural features and persist them.

    Events from the last 90 days are read from ``user_events``, summarised
    into counts and scores, merged with the user's OCEAN personality traits
    from ``personality_profiles`` and upserted into ``user_features``.

    When psycopg2 is not importable or the connection attempt fails,
    ``self.conn`` is ``None``. In that state the read paths
    (``compute_user_features``, ``_get_personality_traits``) return default
    values, while ``batch_compute_user_features`` and ``_save_user_features``
    raise ``RuntimeError`` because they cannot do anything useful.
    """

    # Column order of the personality query; also the keys of the trait dict.
    _TRAIT_NAMES = (
        'openness',
        'conscientiousness',
        'extraversion',
        'agreeableness',
        'neuroticism',
    )
    # Value used for a trait that is missing (no profile row, or NULL column).
    _NEUTRAL_TRAIT = 0.5

    def __init__(self, database_url: Optional[str] = None):
        """Open the database connection, falling back to ``conn = None``.

        Args:
            database_url: Connection string. When omitted, the
                ``DATABASE_URL`` environment variable is used, and failing
                that a local development default.
        """
        if database_url is None:
            database_url = os.getenv(
                'DATABASE_URL',
                'postgresql://postgres:postgres@localhost:5432/cognexa',
            )
        self.conn = None
        if _PSYCOPG2_AVAILABLE:
            try:
                self.conn = psycopg2.connect(database_url)
            except Exception as exc:
                # Best effort by design: keep the object usable without a
                # database, but say so instead of failing silently. Only the
                # exception type is printed so the DSN never reaches stdout.
                print(
                    f"Database connection failed ({type(exc).__name__}); "
                    "continuing without a database"
                )

    def compute_user_features(self, user_id: str) -> Dict:
        """Compute features for a single user.

        Args:
            user_id: Identifier matched against ``user_events.user_id``.

        Returns:
            A dict of counts, scores, preference maps, ``last_activity_at``
            (ISO string or None) and the five OCEAN traits. Default features
            are returned when the user has no events in the last 90 days or
            when no database connection is available.
        """
        if self.conn is None:
            return self._default_features(user_id)

        # Get user events from last 90 days
        query = """
            SELECT event_type, item_id, item_type, metadata, timestamp
            FROM user_events
            WHERE user_id = %s AND timestamp > NOW() - INTERVAL '90 days'
            ORDER BY timestamp DESC
        """
        df = pd.read_sql(query, self.conn, params=(user_id,))
        if df.empty:
            return self._default_features(user_id)
        return self._build_features(user_id, df)

    def _build_features(self, user_id: str, df: pd.DataFrame) -> Dict:
        """Assemble the feature dict for ``user_id`` from a non-empty event frame.

        ``df`` must provide the columns ``event_type``, ``item_type``,
        ``metadata`` and ``timestamp``. It is not modified.
        """
        event_types = df['event_type']
        last_activity = pd.to_datetime(df['timestamp']).max()
        features = {
            'user_id': user_id,
            # int() keeps the counts as plain Python ints for json.dumps.
            'total_views': int((event_types == 'view').sum()),
            'total_clicks': int((event_types == 'click').sum()),
            'total_completions': int((event_types == 'complete').sum()),
            'avg_time_on_site': self._compute_avg_time(df),
            'recency_score': self._compute_recency(df),
            'frequency_score': self._compute_frequency(df),
            # Stored under "monetary_score", but the value is the task
            # completion rate.
            'monetary_score': self._compute_completion_rate(df),
            'category_preferences': self._compute_category_preferences(df),
            'time_of_day_preferences': self._compute_time_preferences(df),
            'last_activity_at': (
                None if pd.isna(last_activity) else last_activity.isoformat()
            ),
        }
        # Get personality traits
        features.update(self._get_personality_traits(user_id))
        return features

    def _compute_avg_time(self, df: pd.DataFrame) -> float:
        """Mean ``duration_seconds`` over ``time_spent`` events.

        Only events whose metadata is a dict are considered. A missing or
        null duration counts as 0; a value that cannot be converted to float
        is skipped. Returns 0.0 when nothing usable is found.
        """
        time_events = df[df['event_type'] == 'time_spent']
        if time_events.empty:
            return 0.0
        durations = []
        for metadata in time_events['metadata']:
            if not isinstance(metadata, dict):
                continue
            raw = metadata.get('duration_seconds')
            if raw is None:
                raw = 0
            try:
                durations.append(float(raw))
            except (TypeError, ValueError):
                # A malformed duration must not break the whole feature row.
                continue
        return float(np.mean(durations)) if durations else 0.0

    def _compute_recency(self, df: pd.DataFrame) -> float:
        """Recency score ``1 / (1 + whole days since the latest event)``.

        Returns 0.0 for an empty frame or when no timestamp can be parsed.
        A latest event in the future is treated as "today" (score 1.0).
        """
        if df.empty:
            return 0.0
        last_activity = pd.to_datetime(df['timestamp']).max()
        if pd.isna(last_activity):
            return 0.0
        # Take "now" in the same timezone-awareness as the data; subtracting
        # a naive timestamp from an aware one raises TypeError.
        now = pd.Timestamp.now(tz=last_activity.tz)
        # Clamp at zero: a negative day count would give a negative score, or
        # a division by zero at exactly -1.
        days_ago = max((now - last_activity).days, 0)
        return 1.0 / (1.0 + days_ago)  # Higher score = more recent

    def _compute_frequency(self, df: pd.DataFrame) -> float:
        """Events per day over the span between the first and last event.

        The span is counted in whole days plus one, so a single day of
        activity divides by 1. Returns 0.0 for an empty frame or when no
        timestamp can be parsed. ``df`` is not modified.
        """
        if df.empty:
            return 0.0
        timestamps = pd.to_datetime(df['timestamp']).dropna()
        if timestamps.empty:
            return 0.0
        days_active = (timestamps.max() - timestamps.min()).days + 1
        return len(df) / max(days_active, 1)

    def _compute_completion_rate(self, df: pd.DataFrame) -> float:
        """Task completions divided by task views (at least 1).

        Only rows with ``item_type == 'task'`` are considered. The result can
        exceed 1.0 when there are more ``complete`` than ``view`` events.
        """
        task_events = df[df['item_type'] == 'task']
        if task_events.empty:
            return 0.0
        completions = int((task_events['event_type'] == 'complete').sum())
        views = int((task_events['event_type'] == 'view').sum())
        return completions / max(views, 1)

    def _compute_category_preferences(self, df: pd.DataFrame) -> Dict:
        """Count events per ``metadata['category']``.

        Every event in ``df`` is counted, whatever its event type. Events
        whose metadata is not a dict, or whose category is missing or null,
        are counted under ``'other'``.
        """
        categories = []
        for metadata in df['metadata']:
            category = metadata.get('category') if isinstance(metadata, dict) else None
            categories.append('other' if category is None else category)
        if not categories:
            return {}
        counts = pd.Series(categories).value_counts()
        # Plain ints so the result is always JSON-serialisable.
        return {category: int(count) for category, count in counts.items()}

    def _compute_time_preferences(self, df: pd.DataFrame) -> Dict:
        """Count events per time-of-day bucket, based on the timestamp hour.

        Buckets: morning 06-11, afternoon 12-17, evening 18-23, night 00-05.
        ``df`` is not modified.
        """
        if df.empty:
            return {'morning': 0, 'afternoon': 0, 'evening': 0, 'night': 0}
        hours = pd.to_datetime(df['timestamp']).dt.hour
        return {
            'morning': int(((hours >= 6) & (hours < 12)).sum()),
            'afternoon': int(((hours >= 12) & (hours < 18)).sum()),
            'evening': int(((hours >= 18) & (hours < 24)).sum()),
            'night': int(((hours >= 0) & (hours < 6)).sum()),
        }

    @classmethod
    def _traits_from_row(cls, row) -> Dict:
        """Map a personality query row to a trait dict.

        Args:
            row: Sequence of five values in ``_TRAIT_NAMES`` order, or a
                falsy value when the user has no profile row.

        Returns:
            Dict keyed by trait name. A None value becomes the neutral 0.5;
            a stored 0.0 is kept as 0.0.
        """
        values = row if row else (None,) * len(cls._TRAIT_NAMES)
        return {
            name: cls._NEUTRAL_TRAIT if value is None else float(value)
            for name, value in zip(cls._TRAIT_NAMES, values)
        }

    def _get_personality_traits(self, user_id: str) -> Dict:
        """Get OCEAN personality traits for ``user_id``.

        Returns neutral values (0.5) when there is no database connection or
        no profile row for the user.
        """
        if self.conn is None:
            return self._traits_from_row(None)
        query = """
            SELECT openness, conscientiousness, extraversion, agreeableness, neuroticism
            FROM personality_profiles
            WHERE user_id = %s
        """
        cursor = self.conn.cursor()
        try:
            cursor.execute(query, (user_id,))
            row = cursor.fetchone()
        finally:
            cursor.close()
        return self._traits_from_row(row)

    def _default_features(self, user_id: str) -> Dict:
        """Default features for new users: zeroed activity plus their traits."""
        personality = self._get_personality_traits(user_id)
        return {
            'user_id': user_id,
            'total_views': 0,
            'total_clicks': 0,
            'total_completions': 0,
            'avg_time_on_site': 0.0,
            'recency_score': 0.0,
            'frequency_score': 0.0,
            'monetary_score': 0.0,
            'category_preferences': {},
            'time_of_day_preferences': {'morning': 0, 'afternoon': 0, 'evening': 0, 'night': 0},
            'last_activity_at': None,
            **personality,
        }

    def batch_compute_user_features(self) -> None:
        """Compute and save features for all active users (run daily).

        A failure for one user is printed and the run continues with the
        next user.

        Raises:
            RuntimeError: If no database connection is available.
        """
        if self.conn is None:
            raise RuntimeError("Cannot compute user features: no database connection")
        query = """
            SELECT DISTINCT user_id
            FROM user_events
            WHERE timestamp > NOW() - INTERVAL '90 days'
        """
        cursor = self.conn.cursor()
        try:
            cursor.execute(query)
            user_ids = [row[0] for row in cursor.fetchall()]
        finally:
            cursor.close()
        print(f"Computing features for {len(user_ids)} users...")
        for user_id in user_ids:
            try:
                features = self.compute_user_features(user_id)
                self._save_user_features(user_id, features)
            except Exception as e:
                # Reset the transaction: after a failed statement the
                # connection rejects further commands until rolled back, which
                # would make every remaining user fail as well.
                self.conn.rollback()
                print(f"Error computing features for user {user_id}: {e}")
        print("Feature computation completed")

    def _save_user_features(self, user_id: str, features: Dict) -> None:
        """Upsert ``features`` for ``user_id`` into ``user_features``.

        The scalar columns are taken from ``features``; the whole dict is
        also stored as JSON in ``feature_vector``. ``most_viewed_category``
        is the key with the highest count in ``category_preferences``, or
        ``'other'`` when that map is empty.

        Raises:
            RuntimeError: If no database connection is available.
            Exception: Any database error, re-raised after a rollback.
        """
        if self.conn is None:
            raise RuntimeError("Cannot save user features: no database connection")
        query = """
            INSERT INTO user_features (
                user_id, total_views, total_clicks, total_completions,
                avg_time_on_site, recency_score, frequency_score, monetary_score,
                most_viewed_category, last_activity_at, feature_vector, computed_at
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
            ON CONFLICT (user_id) DO UPDATE SET
                total_views = EXCLUDED.total_views,
                total_clicks = EXCLUDED.total_clicks,
                total_completions = EXCLUDED.total_completions,
                avg_time_on_site = EXCLUDED.avg_time_on_site,
                recency_score = EXCLUDED.recency_score,
                frequency_score = EXCLUDED.frequency_score,
                monetary_score = EXCLUDED.monetary_score,
                most_viewed_category = EXCLUDED.most_viewed_category,
                last_activity_at = EXCLUDED.last_activity_at,
                feature_vector = EXCLUDED.feature_vector,
                computed_at = NOW()
        """
        category_prefs = features.get('category_preferences') or {}
        most_viewed = max(category_prefs, key=category_prefs.get) if category_prefs else 'other'
        params = (
            user_id,
            features['total_views'],
            features['total_clicks'],
            features['total_completions'],
            features['avg_time_on_site'],
            features['recency_score'],
            features['frequency_score'],
            features['monetary_score'],
            most_viewed,
            features.get('last_activity_at'),
            json.dumps(features),
        )
        cursor = self.conn.cursor()
        try:
            cursor.execute(query, params)
            self.conn.commit()
        except Exception:
            # Leave the connection usable for the next caller.
            self.conn.rollback()
            raise
        finally:
            cursor.close()