Spaces:
Sleeping
Sleeping
| """ | |
| Feature Engineering Module for Cognexa ML Service | |
| This module handles: | |
| - Feature extraction from tasks and personality data | |
| - Feature normalization and scaling | |
| - Derived feature computation | |
| """ | |
| from typing import Dict, List | |
| from datetime import datetime | |
| import numpy as np | |
class FeatureExtractor:
    """Extracts and engineers features from raw task and personality data.

    Two lookup tables built in ``__init__`` map a task's category and priority
    label to fixed scores.  The remaining features are computed from the due
    date, estimated duration, complexity rating, description text and, when
    present, the personality trait scores attached to the task.
    """

    # Traits read by ``extract_personality_features``; order fixes the order
    # of the emitted ``trait_*`` keys.
    _TRAITS = ("openness", "conscientiousness", "extraversion", "agreeableness", "neuroticism")

    # Score substituted for a trait that is absent (or None).  Trait scores
    # are divided by 100 wherever they are used.
    _DEFAULT_TRAIT_SCORE = 50

    def __init__(self):
        # Per-category scores, emitted with a ``cat_`` prefix.
        self.category_features = {
            "ACADEMIC": {"cognitive_load": 0.85, "time_sensitivity": 0.8, "social_component": 0.3},
            "WORK": {"cognitive_load": 0.7, "time_sensitivity": 0.9, "social_component": 0.6},
            "PERSONAL": {"cognitive_load": 0.4, "time_sensitivity": 0.5, "social_component": 0.5},
            "FITNESS": {"cognitive_load": 0.3, "time_sensitivity": 0.6, "social_component": 0.4},
            "SOCIAL": {"cognitive_load": 0.3, "time_sensitivity": 0.4, "social_component": 0.9},
            "HEALTH": {"cognitive_load": 0.5, "time_sensitivity": 0.7, "social_component": 0.2},
            "CREATIVE": {"cognitive_load": 0.7, "time_sensitivity": 0.3, "social_component": 0.4},
            "LEARNING": {"cognitive_load": 0.8, "time_sensitivity": 0.5, "social_component": 0.3},
        }
        # Per-priority scores, emitted with a ``pri_`` prefix.
        self.priority_features = {
            "LOW": {"urgency_score": 0.2, "attention_demand": 0.3, "stress_contribution": 0.2},
            "MEDIUM": {"urgency_score": 0.5, "attention_demand": 0.5, "stress_contribution": 0.5},
            "HIGH": {"urgency_score": 0.8, "attention_demand": 0.8, "stress_contribution": 0.7},
            "URGENT": {"urgency_score": 1.0, "attention_demand": 1.0, "stress_contribution": 0.9},
        }

    @staticmethod
    def _label(task_data: Dict, key: str, default: str) -> str:
        """Return the upper-cased label stored under *key*.

        Falls back to *default* when the key is missing, ``None`` or empty, so
        an explicit ``{"category": None}`` no longer raises ``AttributeError``.
        """
        return str(task_data.get(key) or default).upper()

    @classmethod
    def _trait_score(cls, personality: Dict, trait: str):
        """Return the score stored for *trait*, or the default when absent.

        Only ``None``/missing triggers the default: a score of ``0`` is a
        valid value and must not be replaced.
        """
        score = personality.get(trait)
        return cls._DEFAULT_TRAIT_SCORE if score is None else score

    def extract_task_features(self, task_data: Dict) -> Dict[str, float]:
        """Extract all features from task data.

        Args:
            task_data: Task dictionary.  Keys read here or by the helpers:
                ``category``, ``priority``, ``due_date``,
                ``estimated_duration``, ``complexity``, ``description`` and
                ``personality``.

        Returns:
            Feature name -> value.  Unknown categories use the ``PERSONAL``
            scores and unknown priorities the ``MEDIUM`` scores.  Personality
            interaction features are included only when ``personality`` is
            truthy.
        """
        features = {}

        # Category-based features
        category = self._label(task_data, "category", "PERSONAL")
        cat_features = self.category_features.get(category, self.category_features["PERSONAL"])
        features.update({f"cat_{k}": v for k, v in cat_features.items()})

        # Priority-based features
        priority = self._label(task_data, "priority", "MEDIUM")
        pri_features = self.priority_features.get(priority, self.priority_features["MEDIUM"])
        features.update({f"pri_{k}": v for k, v in pri_features.items()})

        # Time-based features
        features.update(self._extract_time_features(task_data))

        # Complexity features
        features.update(self._extract_complexity_features(task_data))

        # Personality-task interaction features
        if task_data.get("personality"):
            features.update(self._extract_personality_interaction_features(task_data))

        return features

    def _extract_time_features(self, task_data: Dict) -> Dict[str, float]:
        """Extract deadline and duration features.

        ``due_date`` may be an ISO-8601 string (a trailing ``Z`` is accepted)
        or a ``datetime``.  A missing or unusable due date yields
        ``_default_time_features()``.
        """
        features = self._deadline_features(task_data.get("due_date"))

        # Estimated duration features (a missing or zero duration counts as 60)
        duration = task_data.get("estimated_duration") or 60
        features["duration_minutes"] = duration
        features["duration_normalized"] = min(1.0, duration / 480)  # Normalize to 8-hour day
        features["is_short_task"] = 1.0 if duration <= 30 else 0.0
        features["is_long_task"] = 1.0 if duration >= 180 else 0.0
        return features

    def _deadline_features(self, due_date) -> Dict[str, float]:
        """Compute the four deadline features, or the defaults on failure."""
        if not due_date:
            return self._default_time_features()
        try:
            if isinstance(due_date, str):
                due = datetime.fromisoformat(due_date.replace("Z", "+00:00"))
            else:
                due = due_date
            # Take "now" in the deadline's own timezone so the offset is
            # honoured.  ``datetime.now(None)`` is plain local time, which
            # keeps naive deadlines working as before.
            days_until = (due - datetime.now(due.tzinfo)).days
            return {
                "days_until_due": float(max(0, days_until)),
                "time_pressure": 1.0 / max(1, days_until) if days_until >= 0 else 1.0,
                "is_overdue": 1.0 if days_until < 0 else 0.0,
                # Weekend deadline check (Saturday = 5, Sunday = 6)
                "due_weekend": 1.0 if due.weekday() >= 5 else 0.0,
            }
        except (ValueError, TypeError, AttributeError):
            # Unparseable string or an object that is not datetime-like.
            return self._default_time_features()

    def _default_time_features(self) -> Dict[str, float]:
        """Default time features when due date is not available."""
        return {
            "days_until_due": 7.0,
            "time_pressure": 0.14,
            "is_overdue": 0.0,
            "due_weekend": 0.0,
        }

    def _extract_complexity_features(self, task_data: Dict) -> Dict[str, float]:
        """Extract complexity-related features.

        A missing or zero ``complexity`` counts as 3.  The rating is divided
        by 5 and blended 70/30 with the description-based score, capped at 1.
        """
        complexity = task_data.get("complexity") or 3
        description = task_data.get("description") or ""
        description_complexity = self._calculate_description_complexity(description)
        estimated_complexity = min(
            1.0,
            (complexity / 5.0) * 0.7 + description_complexity * 0.3,
        )
        return {
            "complexity_raw": complexity,
            "complexity_normalized": complexity / 5.0,
            "is_simple": 1.0 if complexity <= 2 else 0.0,
            "is_complex": 1.0 if complexity >= 4 else 0.0,
            "description_complexity": description_complexity,
            "estimated_complexity": estimated_complexity,
        }

    def _calculate_description_complexity(self, description: str) -> float:
        """Estimate complexity from task description length and structure.

        Returns 0.5 for an empty description.  Otherwise blends the word count
        (saturating at 120 words, weight 0.7) with the count of ``.``, ``!``
        and ``?`` characters (saturating at 8, weight 0.3).
        """
        if not description:
            return 0.5
        word_count = len(description.split())
        sentence_count = max(1, sum(description.count(mark) for mark in ".!?"))
        word_score = min(1.0, word_count / 120)
        sentence_score = min(1.0, sentence_count / 8)
        return min(1.0, word_score * 0.7 + sentence_score * 0.3)

    def _extract_personality_interaction_features(self, task_data: Dict) -> Dict[str, float]:
        """Extract interaction features between personality and task."""
        personality = task_data.get("personality") or {}
        category = self._label(task_data, "category", "PERSONAL")
        priority = self._label(task_data, "priority", "MEDIUM")
        features = {}

        # Conscientiousness x Task structure interaction
        conscientiousness = self._trait_score(personality, "conscientiousness") / 100
        features["conscient_task_fit"] = (
            conscientiousness if category in ("ACADEMIC", "WORK") else conscientiousness * 0.8
        )

        # Extraversion x Social task interaction
        extraversion = self._trait_score(personality, "extraversion") / 100
        cat_social = self.category_features.get(category, {}).get("social_component", 0.5)
        features["extravert_social_fit"] = 1 - abs(extraversion - cat_social)

        # Neuroticism x Stress interaction
        neuroticism = self._trait_score(personality, "neuroticism") / 100
        priority_stress = self.priority_features.get(priority, {}).get("stress_contribution", 0.5)
        features["neurot_stress_vulnerability"] = neuroticism * priority_stress

        # Openness x Creative task interaction
        openness = self._trait_score(personality, "openness") / 100
        features["open_creative_fit"] = (
            openness if category in ("CREATIVE", "LEARNING") else openness * 0.6
        )
        return features

    def extract_personality_features(self, personality_data: Dict) -> Dict[str, float]:
        """Extract features from personality data.

        Args:
            personality_data: Trait name -> score.  Missing or ``None`` traits
                count as 50; ``None`` for the whole mapping is treated as
                empty.

        Returns:
            For every trait the score divided by 100 plus high (>= 70) and
            low (<= 30) flags, followed by four derived composites.
        """
        personality_data = personality_data or {}
        scores = {trait: self._trait_score(personality_data, trait) for trait in self._TRAITS}

        features = {}
        # Raw traits normalized
        for trait, raw_value in scores.items():
            features[f"trait_{trait}"] = raw_value / 100
            features[f"trait_{trait}_high"] = 1.0 if raw_value >= 70 else 0.0
            features[f"trait_{trait}_low"] = 1.0 if raw_value <= 30 else 0.0

        # Derived composite features
        features["emotional_stability"] = 1 - (scores["neuroticism"] / 100)
        features["productivity_potential"] = (
            scores["conscientiousness"] + (100 - scores["neuroticism"])
        ) / 200
        features["social_energy"] = (scores["extraversion"] + scores["agreeableness"]) / 200
        features["innovation_index"] = scores["openness"] / 100
        return features

    def normalize_features(self, features: Dict[str, float]) -> Dict[str, float]:
        """Normalize features to consistent range.

        Finite numbers are clipped to ``[0, 1]``.  NaN, infinities and
        ``None`` become the neutral value 0.5.  Values of any other type are
        dropped from the result.
        """
        normalized = {}
        for key, value in features.items():
            if value is None:
                # None becomes 0.5 (neutral default)
                normalized[key] = 0.5
            elif isinstance(value, (int, float)):
                # bool is a subclass of int, so True/False land here too and
                # come out as 1.0/0.0.
                normalized[key] = float(np.clip(value, 0, 1)) if np.isfinite(value) else 0.5
            # Anything else is non-numeric and is skipped.
        return normalized
class HistoricalFeatureAggregator:
    """Aggregates historical data into features for prediction."""

    _DAY_NAMES = ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")

    def __init__(self):
        # NOTE(review): stored but not read by any method of this class.
        self.lookback_days = 30

    @staticmethod
    def _parse_timestamp(value) -> datetime:
        """Parse an ISO-8601 value (trailing ``Z`` accepted) into a datetime.

        The value is stringified first, so ``datetime`` objects are accepted
        as well.  Raises ``ValueError`` when the text is not ISO-8601.
        """
        return datetime.fromisoformat(str(value).replace("Z", "+00:00"))

    def aggregate_task_history(self, historical_tasks: List[Dict]) -> Dict[str, float]:
        """Aggregate features from historical task data.

        Args:
            historical_tasks: Task dictionaries.  Keys read: ``status``,
                ``category``, ``due_date``, ``completed_at``, ``created_at``
                and ``is_overdue``.

        Returns:
            ``_default_historical_features()`` for an empty history.
            Otherwise the overall completion and on-time rates, one
            completion rate per category seen, per-weekday productivity and
            the overdue tendency.
        """
        if not historical_tasks:
            return self._default_historical_features()

        total = len(historical_tasks)
        features = {}

        # Completion metrics
        completed = [t for t in historical_tasks if t.get("status") == "COMPLETED"]
        features["completion_rate"] = len(completed) / total

        # On-time completion (neutral 0.5 when nothing was completed)
        on_time_count = sum(1 for t in completed if self._was_on_time(t))
        features["on_time_rate"] = on_time_count / len(completed) if completed else 0.5

        # Category performance
        features.update(self._calculate_category_stats(historical_tasks))

        # Productivity patterns
        features.update(self._calculate_productivity_patterns(historical_tasks))

        # Overdue patterns
        overdue_count = sum(1 for t in historical_tasks if t.get("is_overdue", False))
        features["overdue_tendency"] = overdue_count / total
        return features

    def _was_on_time(self, task: Dict) -> bool:
        """Check if task was completed on time.

        Returns ``True`` when either date is missing or cannot be parsed.
        Comparing an offset-aware with a naive timestamp raises ``TypeError``,
        which is also treated as on time.
        """
        due_date = task.get("due_date")
        completed_date = task.get("completed_at")
        if not due_date or not completed_date:
            return True  # Assume on-time if dates unknown
        try:
            return self._parse_timestamp(completed_date) <= self._parse_timestamp(due_date)
        except (ValueError, TypeError, AttributeError):
            return True

    def _calculate_category_stats(self, tasks: List[Dict]) -> Dict[str, float]:
        """Calculate performance statistics per category.

        A missing, ``None`` or empty category is counted as ``PERSONAL``.
        Previously an explicit ``None`` raised ``AttributeError`` and aborted
        the whole aggregation.
        """
        category_totals = {}
        category_completed = {}
        for task in tasks:
            cat = str(task.get("category") or "PERSONAL").upper()
            category_totals[cat] = category_totals.get(cat, 0) + 1
            if task.get("status") == "COMPLETED":
                category_completed[cat] = category_completed.get(cat, 0) + 1

        # Every category in the totals has at least one task, so the division
        # is always defined.
        return {
            f"cat_{cat.lower()}_completion_rate": category_completed.get(cat, 0) / count
            for cat, count in category_totals.items()
        }

    def _calculate_productivity_patterns(self, tasks: List[Dict]) -> Dict[str, float]:
        """Calculate productivity patterns from historical data.

        Tasks are bucketed by the weekday of ``created_at`` (Monday = 0).
        Tasks without a parseable ``created_at`` are ignored.  A weekday with
        no tasks gets the neutral rate 0.5, and those neutral rates take part
        in the best/worst-day figures.
        """
        day_counts = [0] * 7
        day_completed = [0] * 7
        for task in tasks:
            created = task.get("created_at")
            if not created:
                continue
            try:
                day = self._parse_timestamp(created).weekday()
            except (ValueError, TypeError, AttributeError):
                continue
            day_counts[day] += 1
            if task.get("status") == "COMPLETED":
                day_completed[day] += 1

        features = {
            f"prod_{name}": day_completed[i] / day_counts[i] if day_counts[i] else 0.5
            for i, name in enumerate(self._DAY_NAMES)
        }

        # Best and worst days; there are always seven rates, so no fallback
        # is needed.
        rates = list(features.values())
        features["best_day_productivity"] = max(rates)
        features["worst_day_productivity"] = min(rates)
        return features

    def _default_historical_features(self) -> Dict[str, float]:
        """Return default features when no history available."""
        return {
            "completion_rate": 0.7,
            "on_time_rate": 0.75,
            "overdue_tendency": 0.1,
            "best_day_productivity": 0.8,
            "worst_day_productivity": 0.6,
        }
class FeatureScaler:
    """Handles feature scaling and normalization.

    Features listed in ``feature_ranges`` are min-max scaled against their
    fixed range.  Every other numeric feature is z-scored with the mean and
    standard deviation learned by ``fit``, or passed through unchanged when
    ``fit`` never saw it.
    """

    def __init__(self):
        # Fixed (min, max) ranges used for min-max scaling.
        self.feature_ranges = {
            "days_until_due": (0, 30),
            "duration_minutes": (0, 480),
            "complexity_raw": (1, 5),
        }
        # Per-feature statistics filled in by ``fit``.
        self.means = {}
        self.stds = {}

    def fit(self, feature_sets: List[Dict[str, float]]):
        """Fit scaler on historical feature sets.

        Learns the mean and population standard deviation of every numeric
        feature.  NaN and infinite samples are ignored; previously a single
        one turned the statistics, and every later ``transform`` of that key,
        into NaN.  A feature with only one usable sample gets a standard
        deviation of 1.0.

        An empty ``feature_sets`` leaves the scaler untouched.  Statistics for
        keys absent from ``feature_sets`` are kept from earlier calls.
        """
        if not feature_sets:
            return

        samples = {}
        for feature_set in feature_sets:
            for key, value in feature_set.items():
                if isinstance(value, (int, float)) and np.isfinite(value):
                    samples.setdefault(key, []).append(float(value))

        # Calculate means and standard deviations
        for key, values in samples.items():
            self.means[key] = float(np.mean(values))
            self.stds[key] = float(np.std(values)) if len(values) > 1 else 1.0

    def transform(self, features: Dict[str, float]) -> Dict[str, float]:
        """Transform features using fitted parameters.

        Returns:
            A new dict of plain ``float`` values.  Non-numeric values are
            dropped, and NaN/Inf become the neutral value 0.5.  Min-max
            results are not clipped, so inputs outside the fixed range map
            outside ``[0, 1]``.
        """
        transformed = {}
        for key, value in features.items():
            # bool is a subclass of int, so True/False are scaled as 1.0/0.0
            # like any other number and always come out as float.
            if not isinstance(value, (int, float)):
                continue  # Skip non-numeric values

            if not np.isfinite(value):
                # Ensure not NaN or Inf
                transformed[key] = 0.5
                continue

            number = float(value)
            if key in self.feature_ranges:
                # Use min-max scaling for known ranges
                min_val, max_val = self.feature_ranges[key]
                transformed[key] = (number - min_val) / (max_val - min_val)
            elif key in self.means:
                # Use z-score normalization for others; a constant feature
                # (std == 0) maps to 0.0 instead of dividing by zero.
                std = self.stds.get(key, 1.0)
                transformed[key] = float((number - self.means[key]) / std) if std > 0 else 0.0
            else:
                transformed[key] = number
        return transformed

    def fit_transform(self, feature_sets: List[Dict[str, float]]) -> List[Dict[str, float]]:
        """Fit and transform in one step."""
        self.fit(feature_sets)
        return [self.transform(feature_set) for feature_set in feature_sets]