# SPG_ML / explainability.py
# Provenance: commit 5059de5 ("Added Personalization Models"), author: meetmendapara
"""
Explainability Module for Cognexa ML Service
This module provides:
- Real SHAP feature attribution (when trained model available)
- Approximated feature attribution (fallback)
- Counterfactual explanations
- Natural language explanations
- Visualization data generation
"""
from typing import Dict, List, Tuple, Optional, Any
from pathlib import Path
import logging
import pickle
import numpy as np
logger = logging.getLogger(__name__)
TRAINED_MODELS_DIR = Path(__file__).parent / "trained_models"
# Try to import real SHAP library
try:
import shap
SHAP_AVAILABLE = True
except ImportError:
SHAP_AVAILABLE = False
logger.info("SHAP library not installed; using approximation-based explainability.")
try:
import dice_ml # type: ignore
DICE_AVAILABLE = True
except Exception:
DICE_AVAILABLE = False
class SHAPExplainer:
    """SHAP explainer for task predictions.

    When a trained model and the `shap` library are available, uses
    TreeExplainer or KernelExplainer for real Shapley values.
    Otherwise falls back to weighted feature attribution.

    Based on Lundberg & Lee (2017): "A Unified Approach to Interpreting
    Model Predictions".

    SHAP values satisfy three desirable properties:
    1. Local accuracy: explanation matches model prediction
    2. Missingness: missing features have zero impact
    3. Consistency: if model changes to rely more on a feature, SHAP value increases
    """

    def __init__(self):
        # Feature importance weights (used as fallback when no trained model).
        # Weights based on meta-analysis of personality-performance research.
        # NOTE(review): these are relative importances, not a probability
        # distribution (they do not sum to 1; one weight is negative).
        self.feature_weights = {
            "completion_rate": 0.22,  # Historical behavior (strongest predictor)
            "trait_conscientiousness": 0.15,  # Barrick & Mount (1991): r=0.27
            "time_pressure": 0.18,  # Deadline proximity impact
            "complexity_normalized": 0.15,  # Task difficulty
            "pri_attention_demand": 0.10,  # Priority/urgency
            "cat_cognitive_load": 0.08,  # Category-based mental effort
            "duration_normalized": 0.07,  # Time investment required
            "trait_neuroticism": 0.05,  # Stress sensitivity (negative)
            "trait_openness": 0.04,  # Creativity/adaptability
            "trait_extraversion": 0.03,  # Social energy
            "on_time_rate": 0.08,  # Punctuality history
            "overdue_tendency": -0.06,  # Delay patterns (negative)
        }
        # Feature value baselines (population means for normalization);
        # a feature's contribution is driven by its deviation from these.
        self.baselines = {
            "completion_rate": 0.7,
            "trait_conscientiousness": 0.5,
            "time_pressure": 0.15,
            "complexity_normalized": 0.6,
            "pri_attention_demand": 0.5,
            "cat_cognitive_load": 0.5,
            "duration_normalized": 0.25,
            "trait_neuroticism": 0.5,
            "trait_openness": 0.5,
            "trait_extraversion": 0.5,
            "on_time_rate": 0.75,
            "overdue_tendency": 0.1,
        }
        # Lazily loaded trained model for real SHAP
        self._model_bundle: Optional[Dict] = None
        self._shap_explainer = None
        self._model_loaded = False  # guard so loading is attempted only once
        # Cache for SHAP computations.
        # NOTE(review): never populated or read anywhere in this class —
        # either wire it into _compute_real_shap or remove it.
        self._shap_cache = {}

    def _ensure_model_loaded(self) -> None:
        """Load the trained ensemble model for real SHAP computation.

        Idempotent: runs at most once per instance (guarded by
        self._model_loaded, which is set even on failure). All errors are
        logged and swallowed so the explainer degrades gracefully to the
        weighted-approximation fallback.
        """
        if self._model_loaded:
            return
        self._model_loaded = True
        try:
            path = TRAINED_MODELS_DIR / "task_completion_ensemble.pkl"
            if path.exists():
                # NOTE(review): pickle.load of a project-local artifact;
                # acceptable only because the path is never user-supplied.
                with open(path, "rb") as fh:
                    self._model_bundle = pickle.load(fh)
                logger.info("SHAPExplainer: loaded trained model for real SHAP")
            if SHAP_AVAILABLE and self._model_bundle:
                # Bundle is assumed to carry the model under "model" or
                # "best_model_obj", plus "feature_columns" and the two
                # encoders — TODO confirm against the training pipeline.
                model = self._model_bundle.get("model") or self._model_bundle.get("best_model_obj")
                if model is not None:
                    try:
                        # Fast, exact path for tree-based models
                        self._shap_explainer = shap.TreeExplainer(model)
                        logger.info("SHAPExplainer: using TreeExplainer")
                    except Exception:
                        try:
                            # Model-agnostic fallback; background dataset is a
                            # single all-zeros row of the training width.
                            self._shap_explainer = shap.KernelExplainer(
                                model.predict_proba if hasattr(model, 'predict_proba') else model.predict,
                                np.zeros((1, len(self._model_bundle.get("feature_columns", []))))
                            )
                            logger.info("SHAPExplainer: using KernelExplainer")
                        except Exception as e:
                            logger.warning("Could not create SHAP explainer: %s", e)
        except Exception as exc:
            logger.warning("SHAPExplainer: failed to load model: %s", exc)

    def _compute_real_shap(self, features: Dict[str, float], task_data: Optional[Dict] = None) -> Optional[Dict[str, float]]:
        """Compute real SHAP values using the trained model.

        Returns a mapping of display feature name -> SHAP contribution for
        the positive class, or None when no explainer is available, the
        bundle lacks feature columns, or computation fails.
        """
        self._ensure_model_loaded()
        if self._shap_explainer is None or self._model_bundle is None:
            return None
        try:
            feature_columns = self._model_bundle.get("feature_columns", [])
            if not feature_columns:
                return None
            # Build feature vector matching training column order
            # (helper lives in the sibling `models` module).
            from models import _build_feature_vector
            vec = _build_feature_vector(
                feature_columns, features, task_data or {},
                self._model_bundle.get("category_encoder"),
                self._model_bundle.get("priority_encoder"),
            )
            shap_values = self._shap_explainer.shap_values(vec)
            # Handle multi-output (binary classification returns list of arrays)
            if isinstance(shap_values, list):
                shap_values = shap_values[1]  # positive class
            shap_values = np.array(shap_values).flatten()
            # Map training column names back to our display feature names;
            # unmapped columns keep their raw name.
            result = {}
            feature_name_map = {
                "completion_rate": "completion_rate",
                "conscientiousness": "trait_conscientiousness",
                "time_pressure": "time_pressure",
                "complexity": "complexity_normalized",
                "cognitive_load": "cat_cognitive_load",
                "duration_normalized": "duration_normalized",
                "neuroticism": "trait_neuroticism",
                "days_until_due": "time_pressure",
                "estimated_duration": "duration_normalized",
            }
            for i, col in enumerate(feature_columns):
                if i < len(shap_values):
                    display_name = feature_name_map.get(col, col)
                    # Several training columns can map to one display name
                    # (e.g. days_until_due + time_pressure): sum their impact.
                    if display_name in result:
                        result[display_name] += float(shap_values[i])
                    else:
                        result[display_name] = float(shap_values[i])
            return result
        except Exception as e:
            logger.warning("Real SHAP computation failed: %s", e)
            return None

    def explain(self, features: Dict[str, float], prediction: float, task_data: Optional[Dict] = None) -> Dict:
        """Generate SHAP explanation for prediction.

        Tries real SHAP values first (only when task_data is provided);
        falls back to weighted feature attribution otherwise.
        Returns a dict with raw values, rankings, plain-English text and
        waterfall visualization data.
        """
        base_value = 0.5  # Base prediction without features
        # Try real SHAP values from trained model
        real_shap = self._compute_real_shap(features, task_data) if task_data else None
        using_real_shap = real_shap is not None
        # NOTE(review): if real SHAP returns an empty dict, the fallback
        # below is used but "method" still reports "tree_shap".
        if real_shap:
            shap_values = real_shap
        else:
            # Fallback: approximate SHAP using weighted feature attribution
            shap_values = {}
            for feature, weight in self.feature_weights.items():
                if feature in features:
                    actual_value = features[feature]
                    baseline = self.baselines.get(feature, 0.5)
                    contribution = self._calculate_contribution(
                        feature, actual_value, baseline, weight, prediction
                    )
                    shap_values[feature] = contribution
        # Sort by absolute impact
        sorted_features = sorted(
            shap_values.items(),
            key=lambda x: abs(x[1]),
            reverse=True
        )
        # Generate explanation components
        return {
            "base_value": base_value,
            "prediction": prediction,
            "shap_values": shap_values,
            "method": "tree_shap" if using_real_shap else "weighted_approximation",
            "feature_ranking": [
                {
                    "feature": f,
                    "impact": round(v, 4),
                    "direction": "positive" if v > 0 else "negative",
                    "plain_english": self._to_plain_english(
                        f, v, features.get(f, self.baselines.get(f, 0.5))
                    )
                }
                for f, v in sorted_features
            ],
            "top_3_factors_plain_english": [
                self._to_plain_english(f, v, features.get(f, self.baselines.get(f, 0.5)))
                for f, v in sorted_features[:3]
            ],
            "top_positive_features": self._get_top_features(shap_values, positive=True),
            "top_negative_features": self._get_top_features(shap_values, positive=False),
            "explanation_text": self._generate_text_explanation(sorted_features, prediction),
            "waterfall_data": self._create_waterfall_data(sorted_features, base_value, prediction)
        }

    def _calculate_contribution(self, feature: str, actual: float,
                                baseline: float, weight: float,
                                prediction: float) -> float:
        """Calculate a feature's approximate contribution to the prediction.

        Contribution is the deviation from baseline times the feature
        weight, sign-flipped for features that hurt completion, then
        scaled so the magnitudes roughly track how far the prediction
        sits from the 0.5 base value.
        """
        # Direction depends on feature type
        if feature in ["complexity_normalized", "time_pressure", "cat_cognitive_load",
                       "duration_normalized", "trait_neuroticism"]:
            # These features negatively impact completion probability
            contribution = -(actual - baseline) * weight
        else:
            # These features positively impact completion probability
            contribution = (actual - baseline) * weight
        # Scale to match prediction deviation from base; when prediction
        # is exactly 0.5 the scale is left at 1 (no rescaling).
        scale_factor = (prediction - 0.5) / (sum(self.feature_weights.values()) * 0.5)
        contribution *= abs(scale_factor) if scale_factor != 0 else 1
        return round(contribution, 4)

    def _get_top_features(self, shap_values: Dict[str, float],
                          positive: bool, n: int = 3) -> List[Dict]:
        """Get top N positive or negative features, largest impact first."""
        filtered = {k: v for k, v in shap_values.items()
                    if (v > 0 if positive else v < 0)}
        sorted_features = sorted(
            filtered.items(),
            key=lambda x: x[1] if positive else -x[1],
            reverse=True
        )[:n]
        return [
            {
                "feature": self._format_feature_name(f),
                "impact": round(abs(v), 4),
                "raw_feature": f,
                # The third argument reconstructs an approximate "actual"
                # value by inverting contribution = (actual - baseline) * weight.
                # NOTE(review): this inversion ignores the scale factor applied
                # in _calculate_contribution, so it is a rough estimate only.
                "plain_english": self._to_plain_english(f, v,
                    self.baselines.get(f, 0.5) + (v / self.feature_weights.get(f, 0.1) if self.feature_weights.get(f, 0) else 0))
            }
            for f, v in sorted_features
        ]

    def _format_feature_name(self, feature: str) -> str:
        """Format feature name for display (falls back to Title Case)."""
        name_mapping = {
            "completion_rate": "Historical Completion Rate",
            "trait_conscientiousness": "Conscientiousness",
            "time_pressure": "Time Pressure",
            "complexity_normalized": "Task Complexity",
            "pri_attention_demand": "Priority Level",
            "cat_cognitive_load": "Category Difficulty",
            "duration_normalized": "Task Duration",
            "trait_neuroticism": "Stress Sensitivity"
        }
        return name_mapping.get(feature, feature.replace("_", " ").title())

    def _to_plain_english(self, feature: str, value: float, actual: float) -> str:
        """Translate a SHAP feature contribution to plain English.

        Returns a human-readable sentence explaining WHY this feature
        matters for the prediction, personalized to the actual value.
        Note: a contribution of exactly 0 is phrased as "negative".
        """
        explanations = {
            "completion_rate": {
                "positive": f"You've completed {actual:.0%} of past tasks on time - this strong track record boosts your predicted success.",
                "negative": f"Your recent completion rate ({actual:.0%}) is lower than average, suggesting you may struggle to finish on time."
            },
            "trait_conscientiousness": {
                "positive": "Your high conscientiousness means you tend to be disciplined and organized, which helps task completion.",
                "negative": "Lower conscientiousness can mean less structured work habits - try setting external reminders."
            },
            "time_pressure": {
                "positive": "You have comfortable time before the deadline, reducing stress and delay risk.",
                "negative": "The deadline is approaching fast, which increases the chance of delay."
            },
            "complexity_normalized": {
                "positive": "This is a straightforward task with low complexity - you should be able to handle it well.",
                "negative": "This task is quite complex, which makes it harder to complete on time without careful planning."
            },
            "pri_attention_demand": {
                "positive": "This task has high priority, so you're likely to give it focused attention.",
                "negative": "Lower priority may mean this task gets pushed aside in favor of urgent items."
            },
            "cat_cognitive_load": {
                "positive": "The nature of this task doesn't require heavy mental effort, making it easier to complete.",
                "negative": "This type of task demands significant cognitive effort, which can slow you down."
            },
            "duration_normalized": {
                "positive": "This is a relatively short task - easier to complete in one sitting.",
                "negative": "This is a long task that may be interrupted, increasing delay risk."
            },
            "trait_neuroticism": {
                "positive": "Your emotional stability helps you stay calm under pressure, supporting on-time delivery.",
                "negative": "Higher stress sensitivity may amplify worry about this task, consider mindfulness breaks."
            }
        }
        direction = "positive" if value > 0 else "negative"
        feature_explanations = explanations.get(feature, {})
        if feature_explanations:
            return feature_explanations.get(direction, f"{'Helps' if value > 0 else 'Hinders'} your chances of completing on time.")
        # Generic fallback for features without a curated explanation
        formatted = self._format_feature_name(feature)
        if value > 0:
            return f"Your {formatted.lower()} is working in your favor for this task."
        else:
            return f"Your {formatted.lower()} is a concern that may cause delay."

    def _generate_text_explanation(self, sorted_features: List[Tuple],
                                   prediction: float) -> str:
        """Generate a short human-readable explanation string."""
        # Determine overall outcome
        if prediction >= 0.7:
            outcome = "likely to be completed on time"
        elif prediction >= 0.5:
            outcome = "moderately likely to be completed on time"
        else:
            outcome = "at risk of not being completed on time"
        explanation = f"This task is {outcome} ({prediction:.0%} probability). "
        # Describe key factors (only impacts beyond +/-0.05 are mentioned)
        factors = []
        for feature, value in sorted_features[:3]:
            formatted = self._format_feature_name(feature)
            if value > 0.05:
                factors.append(f"{formatted} increases likelihood")
            elif value < -0.05:
                factors.append(f"{formatted} decreases likelihood")
        if factors:
            explanation += "Key factors: " + "; ".join(factors) + "."
        return explanation

    def _create_waterfall_data(self, sorted_features: List[Tuple],
                               base: float, final: float) -> List[Dict]:
        """Create data for waterfall visualization.

        NOTE(review): the running cumulative is base + sum of contributions
        and is not guaranteed to land exactly on `final` (the last entry
        simply restates the final prediction).
        """
        waterfall = [
            {"name": "Base Probability", "value": base, "cumulative": base, "type": "base"}
        ]
        cumulative = base
        for feature, value in sorted_features:
            cumulative += value
            waterfall.append({
                "name": self._format_feature_name(feature),
                "value": round(value, 3),
                "cumulative": round(cumulative, 3),
                "type": "positive" if value > 0 else "negative"
            })
        waterfall.append({
            "name": "Final Prediction",
            "value": round(final, 3),
            "cumulative": round(final, 3),
            "type": "total"
        })
        return waterfall
class CounterfactualExplainer:
    """Generates counterfactual explanations.

    For each actionable feature, proposes a concrete change that would
    raise the predicted completion probability to a target, together with
    a feasibility assessment for the change.
    """

    def __init__(self):
        # Actionable features and their change impacts.
        # impact_per_unit: estimated probability gain per unit of change.
        self.actionable_features = {
            "complexity_normalized": {
                "action": "Break task into smaller subtasks",
                "change_direction": "decrease",
                "impact_per_unit": 0.15
            },
            "time_pressure": {
                "action": "Extend deadline if possible",
                "change_direction": "decrease",
                "impact_per_unit": 0.12
            },
            "duration_normalized": {
                "action": "Reduce task scope",
                "change_direction": "decrease",
                "impact_per_unit": 0.08
            },
            "pri_attention_demand": {
                "action": "Prioritize this task higher",
                "change_direction": "increase",
                "impact_per_unit": 0.05
            }
        }

    def generate_counterfactuals(
        self,
        features: Dict[str, float],
        current_prediction: float,
        target_prediction: float = 0.7,
        use_dice: bool = False,
    ) -> List[Dict]:
        """Generate counterfactual explanations to reach target.

        Args:
            features: normalized feature values (expected in [0, 1]).
            current_prediction: current completion probability.
            target_prediction: probability to aim for.
            use_dice: request DiCE counterfactuals; falls back to the
                heuristic (with an info log) when dice_ml is unavailable.

        Returns:
            Up to 5 counterfactual dicts sorted by feasibility then
            expected probability, or a single message dict when the
            target is already met.
        """
        if use_dice and not DICE_AVAILABLE:
            logger.info("DiCE requested but not available; using heuristic counterfactuals")
        if current_prediction >= target_prediction:
            return [{"message": "Task already meets target probability"}]
        gap = target_prediction - current_prediction
        counterfactuals = []
        for feature, config in self.actionable_features.items():
            if feature not in features:
                continue
            current_value = features[feature]
            impact = config["impact_per_unit"]
            direction = 1 if config["change_direction"] == "increase" else -1
            # How much the feature must move to close the probability gap
            needed_change = gap / impact * direction
            new_value = current_value + needed_change
            # Skip changes that would leave the normalized 0-1 range
            if not 0 <= new_value <= 1:
                continue
            expected_prob = current_prediction + impact * abs(needed_change)
            counterfactuals.append({
                "feature": feature,
                "current_value": round(current_value, 3),
                "suggested_value": round(new_value, 3),
                "change_amount": round(needed_change, 3),
                "action": config["action"],
                "expected_probability": round(min(0.95, expected_prob), 2),
                "feasibility": self._assess_feasibility(feature, current_value, new_value)
            })
        # Sort by feasibility (high > medium > low), then by expected
        # probability. BUGFIX: the previous key only tested == "high",
        # so "low" suggestions could outrank "medium" ones.
        feasibility_rank = {"high": 2, "medium": 1, "low": 0}
        counterfactuals.sort(
            key=lambda x: (feasibility_rank.get(x["feasibility"], 0),
                           x["expected_probability"]),
            reverse=True
        )
        return counterfactuals[:5]  # Return top 5 counterfactuals

    def _assess_feasibility(self, feature: str, current: float, suggested: float) -> str:
        """Assess how feasible a change is: "high", "medium" or "low".

        Some features are intrinsically easy (priority) or hard
        (deadlines) to change; otherwise feasibility is based purely on
        the magnitude of the required change.
        """
        change_magnitude = abs(suggested - current)
        easy_features = ["pri_attention_demand"]
        hard_features = ["time_pressure"]  # Deadlines often fixed
        if feature in easy_features:
            return "high"
        if feature in hard_features:
            return "low" if change_magnitude > 0.3 else "medium"
        if change_magnitude < 0.2:
            return "high"
        if change_magnitude < 0.4:
            return "medium"
        return "low"
class RecommendationGenerator:
    """Generates actionable recommendations based on prediction analysis."""

    def __init__(self):
        # Static recommendation templates, keyed by trigger condition.
        # Entries must never be mutated: generate_recommendations copies
        # them before annotating.
        self.recommendation_templates = {
            "high_complexity": [
                {"title": "Break Down Task", "description": "Split into smaller, manageable subtasks", "priority": "high"},
                {"title": "Identify Key Milestones", "description": "Set clear checkpoint goals", "priority": "medium"}
            ],
            "time_pressure": [
                {"title": "Start Early", "description": "Begin work today to reduce deadline pressure", "priority": "high"},
                {"title": "Time Block", "description": "Reserve dedicated time slots for this task", "priority": "medium"}
            ],
            "high_stress": [
                {"title": "Take Breaks", "description": "Schedule regular 5-10 minute breaks", "priority": "medium"},
                {"title": "Mindfulness", "description": "Try a quick breathing exercise before starting", "priority": "low"}
            ],
            "low_conscientiousness": [
                {"title": "Set Reminders", "description": "Create progress check-in reminders", "priority": "high"},
                {"title": "External Accountability", "description": "Share your goal with someone", "priority": "medium"}
            ],
            "high_neuroticism": [
                {"title": "Buffer Time", "description": "Add extra time to deadline in your planning", "priority": "medium"},
                {"title": "Worst-Case Planning", "description": "Identify backup plans if issues arise", "priority": "low"}
            ],
            "introversion_social_task": [
                {"title": "Prepare Talking Points", "description": "Plan what you need to communicate", "priority": "medium"},
                {"title": "Schedule Recovery", "description": "Plan quiet time after social interactions", "priority": "low"}
            ]
        }

    def generate_recommendations(self, features: Dict[str, float],
                                 prediction: float,
                                 stress_level: float,
                                 difficulty: str) -> List[Dict]:
        """Generate personalized recommendations.

        Returns up to 6 unique recommendation dicts sorted by priority,
        each annotated with the risk category it addresses.
        """
        recommendations: List[Dict] = []

        def _add(key: str) -> None:
            # BUGFIX: copy each template entry. The old code extended with
            # references and then wrote "risk_addressed" into the shared
            # template dicts, leaking state across calls and instances.
            recommendations.extend(dict(rec) for rec in self.recommendation_templates[key])

        # Difficulty-based recommendations
        if difficulty == "HARD" or features.get("complexity_normalized", 0) > 0.7:
            _add("high_complexity")
        # Time-based recommendations
        if features.get("time_pressure", 0) > 0.3:
            _add("time_pressure")
        # Stress-based recommendations
        if stress_level >= 7:
            _add("high_stress")
        # Personality-based recommendations
        if features.get("trait_conscientiousness", 1) < 0.4:
            _add("low_conscientiousness")
        if features.get("trait_neuroticism", 0) > 0.6:
            _add("high_neuroticism")
        # Social task + introversion
        social_component = features.get("cat_social_component", 0)
        extraversion = features.get("trait_extraversion", 0.5)
        if social_component > 0.6 and extraversion < 0.4:
            _add("introversion_social_task")

        # Annotate each recommendation with the risk it addresses
        for rec in recommendations:
            rec["risk_addressed"] = self._determine_risk_addressed(rec["title"], prediction)

        # Remove duplicates (keep first occurrence) and sort by priority
        unique_recs = []
        seen_titles = set()
        for rec in recommendations:
            if rec["title"] not in seen_titles:
                seen_titles.add(rec["title"])
                unique_recs.append(rec)
        priority_order = {"high": 0, "medium": 1, "low": 2}
        unique_recs.sort(key=lambda x: priority_order.get(x["priority"], 1))
        return unique_recs[:6]  # Return top 6 recommendations

    def _determine_risk_addressed(self, title: str, prediction: float) -> str:
        """Map a recommendation title to the risk category it mitigates.

        NOTE: `prediction` is currently unused; it is kept in the
        signature for interface stability.
        """
        risk_mapping = {
            "Break Down Task": "completion_risk",
            "Identify Key Milestones": "tracking_risk",
            "Start Early": "time_risk",
            "Time Block": "focus_risk",
            "Take Breaks": "burnout_risk",
            "Mindfulness": "stress_risk",
            "Set Reminders": "forgotten_risk",
            "External Accountability": "motivation_risk",
            "Buffer Time": "deadline_risk",
            "Worst-Case Planning": "failure_risk",
            "Prepare Talking Points": "social_risk",
            "Schedule Recovery": "energy_risk"
        }
        return risk_mapping.get(title, "general_risk")
class ExplanationAggregator:
    """Aggregates all explanation components into a comprehensive response."""

    def __init__(self):
        self.shap_explainer = SHAPExplainer()
        self.counterfactual_explainer = CounterfactualExplainer()
        self.recommendation_generator = RecommendationGenerator()

    def generate_full_explanation(self, features: Dict[str, float],
                                  prediction: Dict,
                                  task_data: Dict) -> Dict:
        """Generate a comprehensive explanation.

        Combines SHAP feature attribution, counterfactual scenarios,
        recommendations, confidence assessment and a natural-language
        summary into a single response dict.
        """
        completion_prob = prediction.get("completion_probability", 0.5)
        stress_level = prediction.get("stress_level", 5)
        difficulty = prediction.get("difficulty_level", "MODERATE")
        # Get SHAP explanation.
        # BUGFIX: forward task_data so SHAPExplainer can attempt real SHAP
        # values from the trained model; previously task_data was dropped
        # here, forcing the weighted-approximation fallback every time.
        shap_explanation = self.shap_explainer.explain(features, completion_prob, task_data)
        # Get counterfactuals (returns a message when target already met)
        counterfactuals = self.counterfactual_explainer.generate_counterfactuals(
            features, completion_prob
        )
        # Get recommendations
        recommendations = self.recommendation_generator.generate_recommendations(
            features, completion_prob, stress_level, difficulty
        )
        # Combine into comprehensive explanation
        return {
            "prediction_summary": {
                "completion_probability": completion_prob,
                "stress_level": stress_level,
                "difficulty": difficulty,
                "outcome_assessment": self._assess_outcome(completion_prob)
            },
            "feature_attribution": shap_explanation,
            "counterfactual_scenarios": counterfactuals,
            "recommendations": recommendations,
            "confidence_assessment": {
                "data_quality": self._assess_data_quality(features),
                "prediction_confidence": prediction.get("confidence_level", 0.7),
                "explanation_confidence": self._calculate_explanation_confidence(features)
            },
            "natural_language_summary": self._generate_summary(
                shap_explanation, counterfactuals, recommendations, prediction
            )
        }

    def _assess_outcome(self, probability: float) -> str:
        """Assess the likely outcome as a short phrase."""
        if probability >= 0.8:
            return "Very likely to succeed"
        elif probability >= 0.6:
            return "Likely to succeed with some attention"
        elif probability >= 0.4:
            return "Uncertain - needs proactive management"
        else:
            return "At risk - consider restructuring"

    def _assess_data_quality(self, features: Dict) -> str:
        """Assess quality of input data by presence of key features."""
        key_features = ["completion_rate", "trait_conscientiousness", "complexity_normalized", "time_pressure"]
        present = sum(1 for f in key_features if f in features)
        if present == len(key_features):
            return "high"
        elif present >= len(key_features) * 0.5:
            return "medium"
        else:
            return "low"

    def _calculate_explanation_confidence(self, features: Dict) -> float:
        """Calculate confidence in the explanation, clamped to [0.5, 0.9].

        More features present means a more confident explanation.
        """
        feature_coverage = len(features) / 10  # Assuming 10 key features
        return min(0.9, max(0.5, feature_coverage))

    def _generate_summary(self, shap: Dict, counterfactuals: List,
                          recommendations: List, prediction: Dict) -> str:
        """Generate a natural-language summary of the explanation.

        NOTE: `counterfactuals` is accepted but not currently woven into
        the summary text; `shap` here is the explanation dict from
        SHAPExplainer.explain (the parameter shadows the shap module
        locally, which is unused in this method).
        """
        prob = prediction.get("completion_probability", 0.5)
        stress = prediction.get("stress_level", 5)
        # Opening line keyed to probability band
        if prob >= 0.7:
            summary = f"Good news! This task has a {prob:.0%} completion probability. "
        elif prob >= 0.5:
            summary = f"This task has a moderate {prob:.0%} completion probability. "
        else:
            summary = f"Attention needed: This task has only a {prob:.0%} completion probability. "
        # Key factors
        if shap.get("top_positive_features"):
            top_pos = shap["top_positive_features"][0]["feature"]
            summary += f"Your {top_pos} is working in your favor. "
        if shap.get("top_negative_features"):
            top_neg = shap["top_negative_features"][0]["feature"]
            summary += f"However, {top_neg} is a concern. "
        # Stress note
        if stress >= 7:
            summary += "This is a high-stress task - consider stress management techniques. "
        # Top recommendation
        if recommendations:
            top_rec = recommendations[0]
            summary += f"Top recommendation: {top_rec['description']}."
        return summary