SPG_ML / features.py
meetmendapara's picture
Initial commit for ML space
df31aa1
"""
Feature Engineering Module for Cognexa ML Service
This module handles:
- Feature extraction from tasks and personality data
- Feature normalization and scaling
- Derived feature computation
"""
from typing import Dict, List
from datetime import datetime
import numpy as np
class FeatureExtractor:
    """Extracts and engineers features from raw task and personality data.

    The extractor is a stateless mapper apart from two static lookup tables
    built in ``__init__``: per-category and per-priority feature profiles.
    Unknown or missing categories fall back to ``"PERSONAL"`` and unknown or
    missing priorities fall back to ``"MEDIUM"``.
    """

    # Traits read by ``extract_personality_features``.
    _TRAITS = ("openness", "conscientiousness", "extraversion", "agreeableness", "neuroticism")

    def __init__(self):
        self.category_features = {
            "ACADEMIC": {"cognitive_load": 0.85, "time_sensitivity": 0.8, "social_component": 0.3},
            "WORK": {"cognitive_load": 0.7, "time_sensitivity": 0.9, "social_component": 0.6},
            "PERSONAL": {"cognitive_load": 0.4, "time_sensitivity": 0.5, "social_component": 0.5},
            "FITNESS": {"cognitive_load": 0.3, "time_sensitivity": 0.6, "social_component": 0.4},
            "SOCIAL": {"cognitive_load": 0.3, "time_sensitivity": 0.4, "social_component": 0.9},
            "HEALTH": {"cognitive_load": 0.5, "time_sensitivity": 0.7, "social_component": 0.2},
            "CREATIVE": {"cognitive_load": 0.7, "time_sensitivity": 0.3, "social_component": 0.4},
            "LEARNING": {"cognitive_load": 0.8, "time_sensitivity": 0.5, "social_component": 0.3},
        }
        self.priority_features = {
            "LOW": {"urgency_score": 0.2, "attention_demand": 0.3, "stress_contribution": 0.2},
            "MEDIUM": {"urgency_score": 0.5, "attention_demand": 0.5, "stress_contribution": 0.5},
            "HIGH": {"urgency_score": 0.8, "attention_demand": 0.8, "stress_contribution": 0.7},
            "URGENT": {"urgency_score": 1.0, "attention_demand": 1.0, "stress_contribution": 0.9},
        }

    @staticmethod
    def _label(value, default: str) -> str:
        """Return ``value`` as an upper-case lookup key, or ``default`` if missing.

        ``dict.get(key, default)`` only covers an absent key; a key that is
        present with ``None`` would otherwise crash on ``.upper()``.
        """
        return str(value or default).upper()

    @staticmethod
    def _trait_score(personality: Dict, trait: str):
        """Return the raw score for ``trait``, defaulting to 50 only when absent.

        A score of 0 is kept as 0 (it is not treated as "missing").
        Presumably scores are on a 0-100 scale, since callers divide by 100
        and compare against 30/70 -- confirm against the data source.
        """
        score = personality.get(trait)
        return 50 if score is None else score

    def extract_task_features(self, task_data: Dict) -> Dict[str, float]:
        """Extract all features from task data.

        Args:
            task_data: Mapping read for the keys ``category``, ``priority``,
                ``due_date``, ``estimated_duration``, ``complexity``,
                ``description`` and ``personality``; all are optional.

        Returns:
            A flat dict combining ``cat_*`` and ``pri_*`` profile values,
            time features, complexity features and, when a non-empty
            ``personality`` mapping is present, interaction features.
        """
        features = {}

        # Category-based features
        category = self._label(task_data.get("category"), "PERSONAL")
        cat_features = self.category_features.get(category, self.category_features["PERSONAL"])
        features.update({f"cat_{k}": v for k, v in cat_features.items()})

        # Priority-based features
        priority = self._label(task_data.get("priority"), "MEDIUM")
        pri_features = self.priority_features.get(priority, self.priority_features["MEDIUM"])
        features.update({f"pri_{k}": v for k, v in pri_features.items()})

        # Time-based features
        features.update(self._extract_time_features(task_data))

        # Complexity features
        features.update(self._extract_complexity_features(task_data))

        # Personality-task interaction features
        if task_data.get("personality"):
            features.update(self._extract_personality_interaction_features(task_data))

        return features

    def _extract_time_features(self, task_data: Dict) -> Dict[str, float]:
        """Extract time-related features.

        ``due_date`` may be an ISO-8601 string (a trailing ``Z`` is accepted)
        or a ``datetime``. Anything that cannot be interpreted as a datetime
        yields the defaults from ``_default_time_features``.
        """
        features = {}
        due_date = task_data.get("due_date")
        if due_date:
            try:
                if isinstance(due_date, str):
                    due = datetime.fromisoformat(due_date.replace('Z', '+00:00'))
                else:
                    due = due_date
                # Compare like with like: an aware due date is measured against
                # "now" in the same timezone rather than having its offset
                # discarded and being compared with local wall-clock time.
                now = datetime.now(due.tzinfo) if due.tzinfo is not None else datetime.now()
                days_until = (due - now).days
                features["days_until_due"] = max(0, days_until)
                features["time_pressure"] = 1.0 / max(1, days_until) if days_until >= 0 else 1.0
                features["is_overdue"] = 1.0 if days_until < 0 else 0.0
                # Weekend deadline check (Saturday = 5, Sunday = 6)
                features["due_weekend"] = 1.0 if due.weekday() >= 5 else 0.0
            except (ValueError, AttributeError, TypeError):
                # Unparseable string or a non-datetime object: use defaults.
                features.update(self._default_time_features())
        else:
            features.update(self._default_time_features())

        # Estimated duration features; a missing or zero duration counts as 60.
        duration = task_data.get("estimated_duration") or 60
        features["duration_minutes"] = duration
        features["duration_normalized"] = min(1.0, duration / 480)  # Normalize to 8-hour day
        features["is_short_task"] = 1.0 if duration <= 30 else 0.0
        features["is_long_task"] = 1.0 if duration >= 180 else 0.0
        return features

    def _default_time_features(self) -> Dict[str, float]:
        """Default time features when due date is not available."""
        return {
            "days_until_due": 7.0,
            "time_pressure": 0.14,
            "is_overdue": 0.0,
            "due_weekend": 0.0
        }

    def _extract_complexity_features(self, task_data: Dict) -> Dict[str, float]:
        """Extract complexity-related features.

        A missing or zero ``complexity`` counts as 3. The blended
        ``estimated_complexity`` is 70% the stated complexity (divided by 5)
        and 30% the description-based estimate, capped at 1.0.
        """
        complexity = task_data.get("complexity") or 3
        description = task_data.get("description") or ""
        description_complexity = self._calculate_description_complexity(description)
        estimated_complexity = min(
            1.0,
            (complexity / 5.0) * 0.7 + description_complexity * 0.3
        )
        return {
            "complexity_raw": complexity,
            "complexity_normalized": complexity / 5.0,
            "is_simple": 1.0 if complexity <= 2 else 0.0,
            "is_complex": 1.0 if complexity >= 4 else 0.0,
            "description_complexity": description_complexity,
            "estimated_complexity": estimated_complexity,
        }

    def _calculate_description_complexity(self, description: str) -> float:
        """Estimate complexity from task description length and structure.

        Returns 0.5 for an empty description; otherwise a value in [0, 1]
        weighted 70% on word count (saturating at 120 words) and 30% on the
        number of ``.``/``!``/``?`` characters (saturating at 8).
        """
        if not description:
            return 0.5
        words = description.split()
        word_count = len(words)
        sentence_count = max(1, description.count(".") + description.count("!") + description.count("?"))
        word_score = min(1.0, word_count / 120)
        sentence_score = min(1.0, sentence_count / 8)
        return min(1.0, word_score * 0.7 + sentence_score * 0.3)

    def _extract_personality_interaction_features(self, task_data: Dict) -> Dict[str, float]:
        """Extract interaction features between personality and task."""
        personality = task_data.get("personality") or {}
        category = self._label(task_data.get("category"), "PERSONAL")
        priority = self._label(task_data.get("priority"), "MEDIUM")
        features = {}

        # Conscientiousness x Task structure interaction
        conscientiousness = self._trait_score(personality, "conscientiousness") / 100
        features["conscient_task_fit"] = (
            conscientiousness if category in ["ACADEMIC", "WORK"] else conscientiousness * 0.8
        )

        # Extraversion x Social task interaction
        extraversion = self._trait_score(personality, "extraversion") / 100
        cat_social = self.category_features.get(category, {}).get("social_component", 0.5)
        features["extravert_social_fit"] = 1 - abs(extraversion - cat_social)

        # Neuroticism x Stress interaction
        neuroticism = self._trait_score(personality, "neuroticism") / 100
        priority_stress = self.priority_features.get(priority, {}).get("stress_contribution", 0.5)
        features["neurot_stress_vulnerability"] = neuroticism * priority_stress

        # Openness x Creative task interaction
        openness = self._trait_score(personality, "openness") / 100
        features["open_creative_fit"] = (
            openness if category in ["CREATIVE", "LEARNING"] else openness * 0.6
        )
        return features

    def extract_personality_features(self, personality_data: Dict) -> Dict[str, float]:
        """Extract features from personality data.

        For each of the five traits this emits the score divided by 100 plus
        ``_high`` (score >= 70) and ``_low`` (score <= 30) flags, followed by
        four composite features. Absent or ``None`` traits count as 50.
        """
        features = {}

        # Raw traits normalized
        for trait in self._TRAITS:
            raw_value = self._trait_score(personality_data, trait)
            features[f"trait_{trait}"] = raw_value / 100
            features[f"trait_{trait}_high"] = 1.0 if raw_value >= 70 else 0.0
            features[f"trait_{trait}_low"] = 1.0 if raw_value <= 30 else 0.0

        # Derived composite features
        neuroticism = self._trait_score(personality_data, "neuroticism")
        conscientiousness = self._trait_score(personality_data, "conscientiousness")
        extraversion = self._trait_score(personality_data, "extraversion")
        agreeableness = self._trait_score(personality_data, "agreeableness")
        openness = self._trait_score(personality_data, "openness")

        features["emotional_stability"] = 1 - (neuroticism / 100)
        features["productivity_potential"] = (conscientiousness + (100 - neuroticism)) / 200
        features["social_energy"] = (extraversion + agreeableness) / 200
        features["innovation_index"] = openness / 100
        return features

    def normalize_features(self, features: Dict[str, float]) -> Dict[str, float]:
        """Normalize features to a consistent [0, 1] range.

        Numeric values (including ``bool``, which is a subclass of ``int``)
        are clipped to [0, 1]; NaN/Inf and ``None`` become the neutral 0.5;
        any other non-numeric value is dropped from the result.
        """
        normalized = {}
        for key, value in features.items():
            if isinstance(value, (int, float)):
                # Ensure not NaN or Inf
                if np.isfinite(value):
                    normalized[key] = float(np.clip(value, 0, 1))
                else:
                    normalized[key] = 0.5  # Default for invalid numbers
            elif value is None:
                # None becomes 0.5 (neutral default)
                normalized[key] = 0.5
            # Remaining non-numeric values are skipped.
        return normalized
class HistoricalFeatureAggregator:
    """Aggregates historical task records into features for prediction.

    Each task is a dict read for the keys ``status``, ``category``,
    ``due_date``, ``completed_at``, ``created_at`` and ``is_overdue``.
    """

    _DAY_NAMES = ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")

    def __init__(self):
        # NOTE(review): stored but not consulted by any method in this class.
        self.lookback_days = 30

    @staticmethod
    def _parse_iso(value) -> datetime:
        """Parse an ISO-8601 timestamp (string or datetime), accepting a trailing ``Z``.

        Raises:
            ValueError: if the text is not a valid ISO-8601 timestamp.
        """
        return datetime.fromisoformat(str(value).replace('Z', '+00:00'))

    def aggregate_task_history(self, historical_tasks: List[Dict]) -> Dict[str, float]:
        """Aggregate features from historical task data.

        Args:
            historical_tasks: Task dicts; an empty list yields the defaults
                from ``_default_historical_features``.

        Returns:
            A dict containing ``completion_rate``, ``on_time_rate``, one
            ``cat_<category>_completion_rate`` per category seen, one
            ``prod_<weekday>`` per weekday, ``best_day_productivity``,
            ``worst_day_productivity`` and ``overdue_tendency``.
        """
        if not historical_tasks:
            return self._default_historical_features()

        features = {}
        total = len(historical_tasks)

        # Completion metrics
        completed = [t for t in historical_tasks if t.get("status") == "COMPLETED"]
        features["completion_rate"] = len(completed) / total

        # On-time completion (neutral 0.5 when nothing was completed)
        on_time = [t for t in completed if self._was_on_time(t)]
        features["on_time_rate"] = len(on_time) / len(completed) if completed else 0.5

        # Category performance
        features.update(self._calculate_category_stats(historical_tasks))

        # Productivity patterns
        features.update(self._calculate_productivity_patterns(historical_tasks))

        # Overdue patterns
        overdue_count = sum(1 for t in historical_tasks if t.get("is_overdue", False))
        features["overdue_tendency"] = overdue_count / total
        return features

    def _was_on_time(self, task: Dict) -> bool:
        """Check if task was completed on time.

        Returns ``True`` when either date is missing or unparseable (benefit
        of the doubt). Timestamps without an offset are assumed to be UTC so
        that a naive value can be compared with an aware one; previously such
        a mix raised ``TypeError`` and was silently counted as on time.
        """
        from datetime import timezone  # local import: not in the module's import block

        due_date = task.get("due_date")
        completed_date = task.get("completed_at")
        if not due_date or not completed_date:
            return True  # Assume on-time if dates unknown
        try:
            due = self._parse_iso(due_date)
            completed = self._parse_iso(completed_date)
            if due.tzinfo is None:
                due = due.replace(tzinfo=timezone.utc)
            if completed.tzinfo is None:
                completed = completed.replace(tzinfo=timezone.utc)
            return completed <= due
        except (ValueError, TypeError, AttributeError):
            return True

    def _calculate_category_stats(self, tasks: List[Dict]) -> Dict[str, float]:
        """Calculate the completion rate per category.

        A missing or ``None`` category is counted as ``"PERSONAL"``.
        """
        category_totals = {}
        category_completed = {}
        for task in tasks:
            cat = str(task.get("category") or "PERSONAL").upper()
            category_totals[cat] = category_totals.get(cat, 0) + 1
            if task.get("status") == "COMPLETED":
                category_completed[cat] = category_completed.get(cat, 0) + 1

        features = {}
        for cat, total in category_totals.items():
            completed = category_completed.get(cat, 0)
            features[f"cat_{cat.lower()}_completion_rate"] = completed / total if total > 0 else 0.5
        return features

    def _calculate_productivity_patterns(self, tasks: List[Dict]) -> Dict[str, float]:
        """Calculate per-weekday completion rates from historical data.

        Tasks are bucketed by the weekday of ``created_at``; tasks whose
        ``created_at`` is missing or unparseable are ignored. Weekdays with
        no tasks get the neutral value 0.5.
        """
        day_counts = {i: 0 for i in range(7)}  # Monday = 0
        day_completed = {i: 0 for i in range(7)}
        for task in tasks:
            created = task.get("created_at")
            if not created:
                continue
            try:
                day = self._parse_iso(created).weekday()
            except (ValueError, TypeError, AttributeError):
                continue
            day_counts[day] += 1
            if task.get("status") == "COMPLETED":
                day_completed[day] += 1

        features = {}
        for i, name in enumerate(self._DAY_NAMES):
            if day_counts[i] > 0:
                features[f"prod_{name}"] = day_completed[i] / day_counts[i]
            else:
                features[f"prod_{name}"] = 0.5

        # Best and worst days (computed before the summary keys are added)
        day_rates = list(features.values())
        features["best_day_productivity"] = max(day_rates)
        features["worst_day_productivity"] = min(day_rates)
        return features

    def _default_historical_features(self) -> Dict[str, float]:
        """Return default features when no history is available."""
        return {
            "completion_rate": 0.7,
            "on_time_rate": 0.75,
            "overdue_tendency": 0.1,
            "best_day_productivity": 0.8,
            "worst_day_productivity": 0.6
        }
class FeatureScaler:
    """Handles feature scaling and normalization.

    Keys listed in ``feature_ranges`` are min-max scaled with fixed bounds;
    any other key seen during ``fit`` is z-score normalized; keys never seen
    are passed through unchanged.
    """

    def __init__(self):
        # Fixed (min, max) bounds used for min-max scaling.
        self.feature_ranges = {
            "days_until_due": (0, 30),
            "duration_minutes": (0, 480),
            "complexity_raw": (1, 5),
        }
        # Per-key statistics populated by ``fit``.
        self.means = {}
        self.stds = {}

    @staticmethod
    def _is_number(value) -> bool:
        """Return True for int/float values, excluding ``bool``.

        ``bool`` is a subclass of ``int``, so it must be ruled out explicitly
        for boolean flags to be handled separately from numeric features.
        """
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def fit(self, feature_sets: List[Dict[str, float]]):
        """Fit scaler on historical feature sets.

        Computes the mean and population standard deviation of every numeric
        key. Booleans and non-finite values (NaN/Inf) are ignored so that a
        single bad sample cannot poison a key's statistics. A key with only
        one usable sample gets a standard deviation of 1.0.

        Parameters from any previous fit are replaced; an empty
        ``feature_sets`` leaves the current parameters untouched.
        """
        if not feature_sets:
            return

        samples = {}
        for fs in feature_sets:
            for key, value in fs.items():
                if self._is_number(value) and np.isfinite(value):
                    samples.setdefault(key, []).append(value)

        # Store plain floats so transformed output does not leak numpy scalars.
        self.means = {key: float(np.mean(values)) for key, values in samples.items()}
        self.stds = {
            key: float(np.std(values)) if len(values) > 1 else 1.0
            for key, values in samples.items()
        }

    def transform(self, features: Dict[str, float]) -> Dict[str, float]:
        """Transform features using fitted parameters.

        Returns:
            A new dict in which booleans become 0.0/1.0, NaN/Inf become 0.5,
            keys in ``feature_ranges`` are min-max scaled (not clipped), keys
            with fitted statistics are z-scored (0.0 when the fitted standard
            deviation is zero), and other numeric values are unchanged.
            Non-numeric values are omitted.
        """
        transformed = {}
        for key, value in features.items():
            if isinstance(value, bool):
                # Checked first: bool would otherwise match the int branch.
                transformed[key] = float(value)
            elif isinstance(value, (int, float)):
                # Ensure not NaN or Inf
                if not np.isfinite(value):
                    transformed[key] = 0.5
                # Use min-max scaling for known ranges
                elif key in self.feature_ranges:
                    min_val, max_val = self.feature_ranges[key]
                    transformed[key] = (value - min_val) / (max_val - min_val)
                # Use z-score normalization for others
                elif key in self.means:
                    std = self.stds.get(key, 1.0)
                    if std > 0:
                        transformed[key] = (value - self.means[key]) / std
                    else:
                        transformed[key] = 0.0
                else:
                    transformed[key] = value
            # Non-numeric values are skipped.
        return transformed

    def fit_transform(self, feature_sets: List[Dict[str, float]]) -> List[Dict[str, float]]:
        """Fit and transform in one step."""
        self.fit(feature_sets)
        return [self.transform(fs) for fs in feature_sets]