""" Feature Engineering Module for Cognexa ML Service This module handles: - Feature extraction from tasks and personality data - Feature normalization and scaling - Derived feature computation """ from typing import Dict, List from datetime import datetime import numpy as np class FeatureExtractor: """Extracts and engineers features from raw task and personality data""" def __init__(self): self.category_features = { "ACADEMIC": {"cognitive_load": 0.85, "time_sensitivity": 0.8, "social_component": 0.3}, "WORK": {"cognitive_load": 0.7, "time_sensitivity": 0.9, "social_component": 0.6}, "PERSONAL": {"cognitive_load": 0.4, "time_sensitivity": 0.5, "social_component": 0.5}, "FITNESS": {"cognitive_load": 0.3, "time_sensitivity": 0.6, "social_component": 0.4}, "SOCIAL": {"cognitive_load": 0.3, "time_sensitivity": 0.4, "social_component": 0.9}, "HEALTH": {"cognitive_load": 0.5, "time_sensitivity": 0.7, "social_component": 0.2}, "CREATIVE": {"cognitive_load": 0.7, "time_sensitivity": 0.3, "social_component": 0.4}, "LEARNING": {"cognitive_load": 0.8, "time_sensitivity": 0.5, "social_component": 0.3}, } self.priority_features = { "LOW": {"urgency_score": 0.2, "attention_demand": 0.3, "stress_contribution": 0.2}, "MEDIUM": {"urgency_score": 0.5, "attention_demand": 0.5, "stress_contribution": 0.5}, "HIGH": {"urgency_score": 0.8, "attention_demand": 0.8, "stress_contribution": 0.7}, "URGENT": {"urgency_score": 1.0, "attention_demand": 1.0, "stress_contribution": 0.9}, } def extract_task_features(self, task_data: Dict) -> Dict[str, float]: """Extract all features from task data""" features = {} # Category-based features category = task_data.get("category", "PERSONAL").upper() cat_features = self.category_features.get(category, self.category_features["PERSONAL"]) features.update({f"cat_{k}": v for k, v in cat_features.items()}) # Priority-based features priority = task_data.get("priority", "MEDIUM").upper() pri_features = self.priority_features.get(priority, self.priority_features["MEDIUM"]) features.update({f"pri_{k}": v for k, v in pri_features.items()}) # Time-based features features.update(self._extract_time_features(task_data)) # Complexity features features.update(self._extract_complexity_features(task_data)) # Personality-task interaction features if task_data.get("personality"): features.update(self._extract_personality_interaction_features(task_data)) return features def _extract_time_features(self, task_data: Dict) -> Dict[str, float]: """Extract time-related features""" features = {} due_date = task_data.get("due_date") if due_date: try: if isinstance(due_date, str): due = datetime.fromisoformat(due_date.replace('Z', '+00:00')) else: due = due_date days_until = (due.replace(tzinfo=None) - datetime.now()).days features["days_until_due"] = max(0, days_until) features["time_pressure"] = 1.0 / max(1, days_until) if days_until >= 0 else 1.0 features["is_overdue"] = 1.0 if days_until < 0 else 0.0 # Weekend deadline check features["due_weekend"] = 1.0 if due.weekday() >= 5 else 0.0 except (ValueError, AttributeError): features.update(self._default_time_features()) else: features.update(self._default_time_features()) # Estimated duration features duration = task_data.get("estimated_duration") or 60 features["duration_minutes"] = duration features["duration_normalized"] = min(1.0, duration / 480) # Normalize to 8-hour day features["is_short_task"] = 1.0 if duration <= 30 else 0.0 features["is_long_task"] = 1.0 if duration >= 180 else 0.0 return features def _default_time_features(self) -> Dict[str, float]: """Default time features when due date is not available""" return { "days_until_due": 7.0, "time_pressure": 0.14, "is_overdue": 0.0, "due_weekend": 0.0 } def _extract_complexity_features(self, task_data: Dict) -> Dict[str, float]: """Extract complexity-related features""" complexity = task_data.get("complexity") or 3 description = task_data.get("description") or "" description_complexity = self._calculate_description_complexity(description) estimated_complexity = min( 1.0, (complexity / 5.0) * 0.7 + description_complexity * 0.3 ) return { "complexity_raw": complexity, "complexity_normalized": complexity / 5.0, "is_simple": 1.0 if complexity <= 2 else 0.0, "is_complex": 1.0 if complexity >= 4 else 0.0, "description_complexity": description_complexity, "estimated_complexity": estimated_complexity, } def _calculate_description_complexity(self, description: str) -> float: """Estimate complexity from task description length and structure""" if not description: return 0.5 words = description.split() word_count = len(words) sentence_count = max(1, description.count(".") + description.count("!") + description.count("?")) word_score = min(1.0, word_count / 120) sentence_score = min(1.0, sentence_count / 8) return min(1.0, word_score * 0.7 + sentence_score * 0.3) def _extract_personality_interaction_features(self, task_data: Dict) -> Dict[str, float]: """Extract interaction features between personality and task""" personality = task_data.get("personality") or {} category = task_data.get("category", "PERSONAL").upper() features = {} # Conscientiousness x Task structure interaction conscientiousness = (personality.get("conscientiousness") or 50) / 100 features["conscient_task_fit"] = conscientiousness if category in ["ACADEMIC", "WORK"] else conscientiousness * 0.8 # Extraversion x Social task interaction extraversion = (personality.get("extraversion") or 50) / 100 cat_social = self.category_features.get(category, {}).get("social_component", 0.5) features["extravert_social_fit"] = 1 - abs(extraversion - cat_social) # Neuroticism x Stress interaction neuroticism = (personality.get("neuroticism") or 50) / 100 priority_stress = self.priority_features.get( task_data.get("priority", "MEDIUM").upper(), {} ).get("stress_contribution", 0.5) features["neurot_stress_vulnerability"] = neuroticism * priority_stress # Openness x Creative task interaction openness = (personality.get("openness") or 50) / 100 features["open_creative_fit"] = openness if category in ["CREATIVE", "LEARNING"] else openness * 0.6 return features def extract_personality_features(self, personality_data: Dict) -> Dict[str, float]: """Extract features from personality data""" features = {} # Raw traits normalized for trait in ["openness", "conscientiousness", "extraversion", "agreeableness", "neuroticism"]: raw_value = personality_data.get(trait) or 50 features[f"trait_{trait}"] = raw_value / 100 features[f"trait_{trait}_high"] = 1.0 if raw_value >= 70 else 0.0 features[f"trait_{trait}_low"] = 1.0 if raw_value <= 30 else 0.0 # Derived composite features features["emotional_stability"] = 1 - ((personality_data.get("neuroticism") or 50) / 100) features["productivity_potential"] = ( (personality_data.get("conscientiousness") or 50) + (100 - (personality_data.get("neuroticism") or 50)) ) / 200 features["social_energy"] = ( (personality_data.get("extraversion") or 50) + (personality_data.get("agreeableness") or 50) ) / 200 features["innovation_index"] = (personality_data.get("openness") or 50) / 100 return features def normalize_features(self, features: Dict[str, float]) -> Dict[str, float]: """Normalize features to consistent range""" normalized = {} for key, value in features.items(): if isinstance(value, (int, float)): # Ensure not NaN or Inf if np.isfinite(value): normalized[key] = float(np.clip(value, 0, 1)) else: normalized[key] = 0.5 # Default for invalid numbers elif isinstance(value, bool): # Convert bool to float normalized[key] = float(value) elif value is not None: # Skip non-numeric values that aren't None continue else: # None becomes 0.5 (neutral default) normalized[key] = 0.5 return normalized class HistoricalFeatureAggregator: """Aggregates historical data into features for prediction""" def __init__(self): self.lookback_days = 30 def aggregate_task_history(self, historical_tasks: List[Dict]) -> Dict[str, float]: """Aggregate features from historical task data""" if not historical_tasks: return self._default_historical_features() features = {} # Completion metrics completed = [t for t in historical_tasks if t.get("status") == "COMPLETED"] features["completion_rate"] = len(completed) / len(historical_tasks) # On-time completion on_time = [t for t in completed if self._was_on_time(t)] features["on_time_rate"] = len(on_time) / len(completed) if completed else 0.5 # Category performance category_stats = self._calculate_category_stats(historical_tasks) features.update(category_stats) # Productivity patterns productivity_patterns = self._calculate_productivity_patterns(historical_tasks) features.update(productivity_patterns) # Overdue patterns overdue_count = sum(1 for t in historical_tasks if t.get("is_overdue", False)) features["overdue_tendency"] = overdue_count / len(historical_tasks) return features def _was_on_time(self, task: Dict) -> bool: """Check if task was completed on time""" due_date = task.get("due_date") completed_date = task.get("completed_at") if not due_date or not completed_date: return True # Assume on-time if dates unknown try: due = datetime.fromisoformat(str(due_date).replace('Z', '+00:00')) completed = datetime.fromisoformat(str(completed_date).replace('Z', '+00:00')) return completed <= due except (ValueError, TypeError, AttributeError): return True def _calculate_category_stats(self, tasks: List[Dict]) -> Dict[str, float]: """Calculate performance statistics per category""" category_totals = {} category_completed = {} for task in tasks: cat = task.get("category", "PERSONAL").upper() category_totals[cat] = category_totals.get(cat, 0) + 1 if task.get("status") == "COMPLETED": category_completed[cat] = category_completed.get(cat, 0) + 1 features = {} for cat, total in category_totals.items(): completed = category_completed.get(cat, 0) features[f"cat_{cat.lower()}_completion_rate"] = completed / total if total > 0 else 0.5 return features def _calculate_productivity_patterns(self, tasks: List[Dict]) -> Dict[str, float]: """Calculate productivity patterns from historical data""" day_counts = {i: 0 for i in range(7)} # Monday = 0 day_completed = {i: 0 for i in range(7)} for task in tasks: created = task.get("created_at") if created: try: date = datetime.fromisoformat(str(created).replace('Z', '+00:00')) day = date.weekday() day_counts[day] += 1 if task.get("status") == "COMPLETED": day_completed[day] += 1 except (ValueError, TypeError, AttributeError): continue features = {} day_names = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"] for i, name in enumerate(day_names): if day_counts[i] > 0: features[f"prod_{name}"] = day_completed[i] / day_counts[i] else: features[f"prod_{name}"] = 0.5 # Best and worst days best_day = max(features.items(), key=lambda x: x[1], default=("prod_wednesday", 0.5)) worst_day = min(features.items(), key=lambda x: x[1], default=("prod_monday", 0.5)) features["best_day_productivity"] = best_day[1] features["worst_day_productivity"] = worst_day[1] return features def _default_historical_features(self) -> Dict[str, float]: """Return default features when no history available""" return { "completion_rate": 0.7, "on_time_rate": 0.75, "overdue_tendency": 0.1, "best_day_productivity": 0.8, "worst_day_productivity": 0.6 } class FeatureScaler: """Handles feature scaling and normalization""" def __init__(self): self.feature_ranges = { "days_until_due": (0, 30), "duration_minutes": (0, 480), "complexity_raw": (1, 5), } self.means = {} self.stds = {} def fit(self, feature_sets: List[Dict[str, float]]): """Fit scaler on historical feature sets""" if not feature_sets: return # Calculate means and standard deviations all_features = {} for fs in feature_sets: for key, value in fs.items(): if isinstance(value, (int, float)): if key not in all_features: all_features[key] = [] all_features[key].append(value) for key, values in all_features.items(): self.means[key] = np.mean(values) self.stds[key] = np.std(values) if len(values) > 1 else 1.0 def transform(self, features: Dict[str, float]) -> Dict[str, float]: """Transform features using fitted parameters""" transformed = {} for key, value in features.items(): if isinstance(value, (int, float)): # Ensure not NaN or Inf if not np.isfinite(value): transformed[key] = 0.5 # Use min-max scaling for known ranges elif key in self.feature_ranges: min_val, max_val = self.feature_ranges[key] transformed[key] = (value - min_val) / (max_val - min_val) # Use z-score normalization for others elif key in self.means: std = self.stds.get(key, 1.0) if std > 0: transformed[key] = (value - self.means[key]) / std else: transformed[key] = 0.0 else: transformed[key] = value elif isinstance(value, bool): transformed[key] = float(value) else: # Skip or default non-numeric values pass return transformed def fit_transform(self, feature_sets: List[Dict[str, float]]) -> List[Dict[str, float]]: """Fit and transform in one step""" self.fit(feature_sets) return [self.transform(fs) for fs in feature_sets]