| """ | |
| CoachingAgent: main orchestrator for MindSphere coaching sessions. | |
| Manages the full lifecycle: | |
| calibration → visualization → planning → coaching → complete | |
| Integrates the POMDP model, ToM particle filter, empathy planner, | |
| and LLM layers. Post-calibration responses are routed through Mistral | |
| LLM for natural, companion-style conversation — with belief context, | |
| ToM predictions, and cognitive load inference injected dynamically. | |
| Falls back to template-based responses when no API key is available. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import numpy as np | |
| from .model import SphereModel, SKILL_FACTORS, SKILL_LEVEL_VALUES, ACTION_NAMES | |
| from .inference import ( | |
| update_belief, | |
| update_all_beliefs, | |
| compute_efe_all_factors, | |
| select_action, | |
| compute_information_gain, | |
| ) | |
| from .action_dispatcher import select_coaching_action | |
| from .dependency_graph import DependencyGraph | |
| from .utils import normalize, softmax | |
| from ..tom.particle_filter import UserTypeFilter | |
| from ..tom.empathy_planner import EmpathyPlanner | |
| from ..content.question_bank import ( | |
| QUESTION_BANK, | |
| CalibrationQuestion, | |
| get_adaptive_question_order, | |
| ) | |
| from ..content.interventions import ( | |
| get_gentle_push_pair, | |
| get_interventions_for_skill, | |
| Intervention, | |
| ) | |
| from ..content.templates import ( | |
| WELCOME_MESSAGE, SPHERE_INTRO, PLAN_INTRO, | |
| COACHING_PROBES, COACHING_EXERCISES, | |
| ) | |
| from .emotional_state import ( | |
| EmotionEngine, | |
| EmotionalPrediction, | |
| EmotionalObservation, | |
| PredictionError, | |
| compute_belief_entropy, | |
| ) | |
| from .learning import ModelLearner | |
| from .user_profile import UserProfile | |
| logger = logging.getLogger(__name__) | |
| # Phase constants | |
| PHASE_CALIBRATION = "calibration" | |
| PHASE_VISUALIZATION = "visualization" | |
| PHASE_PLANNING = "planning" | |
| PHASE_UPDATE = "update" | |
| PHASE_COACHING = "coaching" | |
| PHASE_COMPLETE = "complete" | |
| # Human-readable skill names | |
| SKILL_LABELS = { | |
| "focus": "Focus", | |
| "follow_through": "Follow-through", | |
| "social_courage": "Social Courage", | |
| "emotional_reg": "Emotional Regulation", | |
| "systems_thinking": "Systems Thinking", | |
| "self_trust": "Self-Trust", | |
| "task_clarity": "Task Clarity", | |
| "consistency": "Consistency", | |
| } | |
| class CoachingAgent: | |
| """ | |
| Main orchestrator for a MindSphere coaching session. | |
| When a Mistral API key is available, all post-calibration responses | |
| are generated via LLM with dynamic belief/ToM context injection. | |
| Otherwise, falls back to template-based responses. | |
| Usage: | |
| agent = CoachingAgent() | |
| result = agent.start_session() # returns welcome + first question | |
| # Phase 1: Calibration loop | |
| result = agent.step(user_answer_dict) | |
| # ... repeat for ~10 questions ... | |
| # Phase 2+: All responses routed through LLM | |
| result = agent.step(user_choice_dict) | |
| """ | |
| def __init__( | |
| self, | |
| lambda_empathy: float = 0.5, | |
| n_particles: int = 50, | |
| beta: float = 4.0, | |
| ): | |
| self.model = SphereModel() | |
| self.dep_graph = DependencyGraph() | |
| self.tom = UserTypeFilter(n_particles=n_particles) | |
| self.empathy = EmpathyPlanner(lambda_empathy=lambda_empathy, beta=beta) | |
| self.beliefs: Dict[str, np.ndarray] = {} | |
| self.phase: str = PHASE_CALIBRATION | |
| self.timestep: int = 0 | |
| self.asked_question_ids: List[str] = [] | |
| self.history: List[Dict[str, Any]] = [] | |
| self.current_question: Optional[CalibrationQuestion] = None | |
| self.current_intervention: Optional[Intervention] = None | |
| self.target_skill: Optional[str] = None | |
| # Conversation history for LLM context (role + content pairs) | |
| self.conversation_history: List[Dict[str, str]] = [] | |
| # Tracks turns within each post-calibration phase | |
| self._viz_turns: int = 0 | |
| self._planning_turns: int = 0 | |
| self._coaching_turns: int = 0 | |
| self._probes_asked: List[str] = [] # track which probing questions we've used | |
| self._exercises_given: List[str] = [] # track which exercises we've suggested | |
| self._accepted_interventions: List[Dict[str, Any]] = [] | |
| # Cognitive load tracking | |
| self._recent_sentiments: List[str] = [] # last N sentiment signals | |
| self._max_calibration_questions = 10 | |
| # POMDP parameter learning (Dirichlet concentration parameters) | |
| self.learner = ModelLearner(self.model) | |
| # Circumplex emotion engine (Pattisapu & Albarracin 2024) | |
| self.emotion = EmotionEngine() | |
| self._last_prediction: Optional[EmotionalPrediction] = None | |
| self._last_observation: Optional[EmotionalObservation] = None | |
| self._last_error: Optional[PredictionError] = None | |
| # Semantic user profile with Bayesian network causal model | |
| self.profile = UserProfile() | |
| # LLM generator + classifier — initialized lazily | |
| self._generator = None | |
| self._generator_initialized = False | |
| self._classifier = None | |
| self._classifier_initialized = False | |
    @property
    def generator(self):
        """Lazy-initialize the LLM generator (accessed as a property throughout)."""
        if not self._generator_initialized:
            self._generator_initialized = True
            try:
                from ..llm.client import MistralClient
                from ..llm.generator import CoachGenerator
                client = MistralClient()
                self._generator = CoachGenerator(client=client)
                if self._generator.is_available:
                    logger.info(f"LLM generator available — using {client.base_url} ({client.model})")
                else:
                    logger.warning("LLM generator created but not available (no API key?) — using template responses")
                    self._generator = None
            except Exception as e:
                logger.warning(f"LLM generator init failed: {e} — using template responses")
                self._generator = None
        return self._generator

    @property
    def classifier(self):
        """Lazy-initialize the LLM classifier (accessed as a property throughout)."""
        if not self._classifier_initialized:
            self._classifier_initialized = True
            try:
                from ..llm.client import MistralClient
                from ..llm.classifier import SphereClassifier
                client = MistralClient()
                if client.is_available:
                    self._classifier = SphereClassifier(client=client)
                    logger.info("LLM classifier available for emotion observations")
                else:
                    logger.warning("LLM classifier not available (no API key?)")
            except Exception as e:
                logger.warning(f"LLM classifier init failed: {e}")
                self._classifier = None
        return self._classifier
    def _llm_generate(self, user_message: str, cognitive_load: Optional[Dict] = None) -> str:
        """
        Try to generate a response via LLM. Returns empty string if unavailable.
        The caller should fall back to template responses when this returns "".

        Args:
            user_message: The user's text
            cognitive_load: Pre-computed cognitive load dict (avoids double-calling
                _assess_cognitive_load, which duplicates sentiment tracking)
        """
        gen = self.generator
        if gen is None:
            logger.info(f"[LLM] Generator is None — using template fallback (phase={self.phase})")
            return ""
        # In stream mode, capture context for later streaming instead of calling the LLM
        if getattr(self, '_stream_mode', False):
            ctx = self._prepare_llm_context(user_message, cognitive_load)
            self._stream_capture = ctx
            logger.info(f"[LLM] Stream mode — context captured (phase={self.phase})")
            return ""
        ctx = self._prepare_llm_context(user_message, cognitive_load)
        logger.info(f"[LLM] Generating response (phase={self.phase}, msg_len={len(user_message)}, history_len={len(self.conversation_history)}, profile_facts={len(self.profile.facts)})")
        result = gen.generate(
            user_message=ctx["user_message"],
            conversation_history=ctx["conversation_history"],
            phase=ctx["phase"],
            belief_summary=ctx["belief_summary"],
            tom_summary=ctx["tom_summary"],
            cognitive_load=ctx["cognitive_load"],
            target_skill=ctx["target_skill"],
            current_intervention=ctx["current_intervention"],
            accepted_interventions=ctx["accepted_interventions"],
            profile_section=ctx["profile_section"],
        )
        if result:
            logger.info(f"[LLM] Got response ({len(result)} chars)")
        else:
            logger.warning(f"[LLM] Empty response — falling back to template (phase={self.phase})")
        return result

    def _prepare_llm_context(self, user_message: str, cognitive_load: Optional[Dict] = None) -> Dict[str, Any]:
        """Prepare the shared context dict for LLM generation (used by both sync and stream paths)."""
        intervention_dict = None
        if self.current_intervention:
            intervention_dict = {
                "description": self.current_intervention.description,
                "target_skill": self.current_intervention.target_skill,
                "duration_minutes": self.current_intervention.duration_minutes,
                "difficulty": self.current_intervention.difficulty,
            }
        if cognitive_load is None:
            cognitive_load = self._assess_cognitive_load(user_message)
        user_state = self.get_inferred_user_state()
        cognitive_load["inferred_emotions"] = user_state.get("recent_emotions", [])
        cognitive_load["inferred_topics"] = user_state.get("recent_topics", [])
        cognitive_load["engagement_level"] = user_state.get("engagement_level", "moderate")
        current_emotion = self.emotion.get_current_emotion()
        if current_emotion:
            cognitive_load["circumplex_emotion"] = current_emotion.emotion_label()
            cognitive_load["circumplex_valence"] = round(current_emotion.valence, 2)
            cognitive_load["circumplex_arousal"] = round(current_emotion.arousal, 2)
        if self._last_error:
            cognitive_load["emotion_prediction_error"] = round(self._last_error.magnitude, 3)
        if self._last_prediction and self._last_observation:
            cognitive_load["predicted_emotion"] = self._last_prediction.predicted_emotion
            cognitive_load["observed_emotion"] = self._last_observation.observed_emotion
        return {
            "user_message": user_message,
            "conversation_history": self.conversation_history,
            "phase": self.phase,
            "belief_summary": self.get_belief_summary(),
            "tom_summary": self.tom.get_user_type_summary(),
            "cognitive_load": cognitive_load,
            "target_skill": self.target_skill,
            "current_intervention": intervention_dict,
            "accepted_interventions": self._accepted_interventions,
            "profile_section": self.profile.format_for_prompt(),
        }

    def _llm_generate_stream(self, user_message: str, cognitive_load: Optional[Dict] = None):
        """
        Stream the LLM response, yielding text chunks.
        Returns a generator of string chunks. If the LLM is unavailable, yields nothing.
        """
        gen = self.generator
        if gen is None:
            return
        ctx = self._prepare_llm_context(user_message, cognitive_load)
        logger.info(f"[LLM] Streaming response (phase={self.phase}, msg_len={len(user_message)}, profile_facts={len(self.profile.facts)})")
        yield from gen.generate_stream(
            user_message=ctx["user_message"],
            conversation_history=ctx["conversation_history"],
            phase=ctx["phase"],
            belief_summary=ctx["belief_summary"],
            tom_summary=ctx["tom_summary"],
            cognitive_load=ctx["cognitive_load"],
            target_skill=ctx["target_skill"],
            current_intervention=ctx["current_intervention"],
            accepted_interventions=ctx["accepted_interventions"],
            profile_section=ctx["profile_section"],
        )

    def _assess_cognitive_load(self, user_message: str = "", emotional_data: Optional[Dict] = None) -> Dict[str, Any]:
        """
        Infer cognitive load from ToM dimensions + conversation signals + emotional state.

        Uses:
        - ToM overwhelm_threshold (low = easily overwhelmed)
        - Recent sentiment signals from the conversation
        - Message length and engagement patterns
        - Emotional valence beliefs and prediction error from the Circumplex POMDP
        """
        # ToM-based assessment
        tom_type = self.tom.get_user_type_summary()
        overwhelm_threshold = tom_type.get("overwhelm_threshold", 0.5)
        autonomy = tom_type.get("autonomy_sensitivity", 0.5)
        # Conversation-based signals
        lower = user_message.lower()
        signals = []
        # Detect disengagement
        disengaged_words = [
            "bored", "boring", "meh", "whatever", "don't care",
            "not interested", "idk", "dunno",
        ]
        if any(w in lower for w in disengaged_words):
            signals.append("disengaged")
        # Detect overwhelm
        overwhelm_words = [
            "overwhelmed", "too much", "can't handle", "stressed",
            "anxious", "burned out", "exhausted", "tired",
        ]
        if any(w in lower for w in overwhelm_words):
            signals.append("overwhelmed")
        # Detect positive engagement
        engaged_words = [
            "interesting", "tell me more", "curious", "that makes sense",
            "let's do it", "i want to", "excited", "ready",
        ]
        if any(w in lower for w in engaged_words):
            signals.append("engaged")
        # Detect off-topic intent
        offtopic_words = [
            "by the way", "random question", "off topic", "unrelated",
            "can you", "do you know", "what about", "have you heard",
            "what's going on", "what do you think about",
        ]
        if any(w in lower for w in offtopic_words):
            signals.append("off_topic")
        # Detect deflection / topic change
        deflection_words = [
            "don't want to talk about", "not want to talk",
            "change the subject", "change topic", "something else",
            "let's talk about", "can we talk about", "i want to talk about",
            "anyway", "never mind", "forget it", "moving on",
            "i'd rather", "not right now", "not now",
            "but what about", "what about", "how about",
        ]
        if any(w in lower for w in deflection_words):
            signals.append("deflection")
        # Short messages may indicate low engagement
        if len(user_message.strip()) < 10 and user_message.strip():
            signals.append("low_effort")

        # === Emotional state integration (from the Circumplex POMDP) ===
        if emotional_data:
            beliefs = emotional_data.get("emotional_beliefs", {})
            valence_belief = beliefs.get("valence", {}).get("belief", [])
            # Valence belief concentrated on negative states → emotional distress
            if len(valence_belief) >= 5:
                negative_mass = valence_belief[0] + valence_belief[1]
                if negative_mass > 0.6:
                    signals.append("emotional_distress")
            # High prediction error → our emotional model was wrong
            error_data = emotional_data.get("error", {})
            if error_data.get("magnitude", 0) > 0.5:
                signals.append("emotional_surprise")
            # Current valence strongly negative
            current = emotional_data.get("current_emotion") or {}
            if current.get("valence", 0) < -0.3:
                signals.append("low_valence")
            # Also check the observation directly (in case current_emotion is empty)
            obs = emotional_data.get("observation") or {}
            if obs.get("observed_valence", 0) < -0.3:
                if "low_valence" not in signals:
                    signals.append("low_valence")

        # Track recent sentiments
        self._recent_sentiments.append(
            "disengaged" if "disengaged" in signals
            else "overwhelmed" if "overwhelmed" in signals
            else "engaged" if "engaged" in signals
            else "neutral"
        )
        # Keep the last 5
        self._recent_sentiments = self._recent_sentiments[-5:]
        # Determine the overall load level
        recent_disengaged = sum(1 for s in self._recent_sentiments if s == "disengaged")
        recent_overwhelmed = sum(1 for s in self._recent_sentiments if s == "overwhelmed")
        if recent_overwhelmed >= 2 or (overwhelm_threshold < 0.3 and "overwhelmed" in signals):
            load_level = "high"
            coaching_readiness = "not_ready"
        elif recent_disengaged >= 2 or "disengaged" in signals:
            load_level = "low_engagement"
            coaching_readiness = "not_ready"
        elif "off_topic" in signals or "deflection" in signals:
            load_level = "redirected"
            coaching_readiness = "not_ready"
        elif "emotional_distress" in signals or "low_valence" in signals:
            load_level = "emotionally_vulnerable"
            coaching_readiness = "not_ready"
        elif "engaged" in signals:
            load_level = "optimal"
            coaching_readiness = "ready"
        else:
            load_level = "moderate"
            coaching_readiness = "open"
        return {
            "level": load_level,
            "coaching_readiness": coaching_readiness,
            "signals": signals,
            "overwhelm_threshold": overwhelm_threshold,
            "autonomy_sensitivity": autonomy,
        }
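
    # Illustrative trace (not on the runtime path): for a message like
    # "honestly I'm overwhelmed, this is too much", two overwhelm keywords fire,
    # so "overwhelmed" lands in signals; if the ToM overwhelm_threshold is low
    # (say 0.25 < 0.3), the result is roughly:
    #
    #     {"level": "high", "coaching_readiness": "not_ready",
    #      "signals": ["overwhelmed"], "overwhelm_threshold": 0.25,
    #      "autonomy_sensitivity": 0.5}
    #
    # The threshold values here are invented for illustration.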
    def _track_conversation(self, role: str, content: str) -> None:
        """Add a message to the conversation history for LLM context."""
        if not content:
            return
        # Warn about and skip consecutive same-role duplicate messages
        if (self.conversation_history
                and self.conversation_history[-1]["role"] == role
                and self.conversation_history[-1]["content"] == content):
            logger.warning(f"[Track] Skipping duplicate {role} message: '{content[:60]}...'")
            return
        self.conversation_history.append({"role": role, "content": content})

    # -------------------------------------------------------------------------
    # PREDICT-OBSERVE-UPDATE — Circumplex Emotional Inference
    # -------------------------------------------------------------------------

    def _predict_emotion(self) -> EmotionalPrediction:
        """
        PREDICT the user's emotional state before observing their text.

        Uses:
        - Belief entropy → arousal (how uncertain is the model about the user?)
        - ToM felt cost + acceptance probability → valence
        - Reliability → confidence gating

        This follows Pattisapu & Albarracin (2024):
        - Arousal = H[Q(s|o)] = posterior entropy
        - Valence = U - EU = reward prediction error
        """
        # Compute belief entropies across skill factors
        belief_entropies = {}
        for skill in SKILL_FACTORS:
            if skill in self.beliefs:
                belief_entropies[skill] = compute_belief_entropy(self.beliefs[skill])
        # Get ToM predictions for felt cost
        if self.current_intervention:
            pred = self.tom.predict_response_gated(self.current_intervention.to_dict())
            tom_felt_cost = pred.get("predicted_felt_cost", 0.3)
            tom_p_accept = pred.get("p_accept", 0.5)
        else:
            tom_felt_cost = 0.3
            tom_p_accept = 0.5
        reliability = self.tom.reliability
        prediction = self.emotion.predict(
            belief_entropies=belief_entropies,
            tom_felt_cost=tom_felt_cost,
            tom_p_accept=tom_p_accept,
            reliability=reliability,
        )
        self._last_prediction = prediction
        return prediction
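
    # Worked example of the arousal term (a sketch; assumes compute_belief_entropy
    # is the Shannon entropy of the categorical belief, in nats):
    #
    #     import numpy as np
    #     b_uniform = np.ones(5) / 5                  # nothing known yet
    #     h = -np.sum(b_uniform * np.log(b_uniform))  # ln(5) ≈ 1.609, max arousal drive
    #     b_peaked = np.array([0.9, 0.025, 0.025, 0.025, 0.025])
    #     # entropy ≈ 0.46: as calibration sharpens beliefs, predicted arousal falls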
    def _observe_emotion(self, user_text: str) -> EmotionalObservation:
        """
        OBSERVE the user's emotional state by classifying their text.
        Uses LLM classification when available, falls back to a heuristic.
        The classification becomes a formal POMDP observation.
        """
        classifier = self.classifier
        if classifier is not None:
            # Use the LLM for an accurate emotional observation
            context = f"Phase: {self.phase}"
            if self.target_skill:
                context += f", Target skill: {self.target_skill}"
            if self.current_intervention:
                context += f", Current suggestion: {self.current_intervention.description}"
            try:
                result = classifier.classify_emotion(user_text, context=context)
            except Exception:
                result = classifier.classify_emotion_heuristic(user_text)
        else:
            # Heuristic fallback — no LLM available
            result = self._classify_emotion_heuristic(user_text)
        observation = self.emotion.observe(
            valence_idx=result.get("valence_idx", 2),
            arousal_idx=result.get("arousal_idx", 2),
            raw_classification=result,
        )
        self._last_observation = observation
        return observation

    def _update_from_emotion_error(
        self,
        prediction: EmotionalPrediction,
        observation: EmotionalObservation,
    ) -> PredictionError:
        """
        UPDATE beliefs using the prediction error between the predicted
        and observed emotional state.

        This is the core of the predict-observe-update loop:
        1. Bayesian update of emotional beliefs (A-matrix in EmotionEngine)
        2. Prediction error used to soft-update ToM particles
           - Large errors → ToM was wrong → adjust user type particles
           - Small errors → ToM is accurate → increase confidence

        From the empathy project:
        - Prediction error is the accuracy gate signal
        - When ToM predicts "calm" but the user is "angry" → ToM needs calibration
        """
        error = self.emotion.update(prediction, observation)
        self._last_error = error
        # Use the prediction error to soft-update ToM particles.
        # Large valence error → our model of what makes them feel good/bad is wrong.
        # Large arousal error → our model of their uncertainty/engagement is wrong.
        self._update_tom_from_emotion_error(error, observation)
        return error

    def _update_tom_from_emotion_error(
        self,
        error: PredictionError,
        observation: EmotionalObservation,
    ) -> None:
        """
        Use the emotional prediction error to update the ToM particle filter.

        When our emotional prediction is wrong, it tells us something
        about the user's type:
        - We predicted calm but they're stressed → overwhelm_threshold is lower than thought
        - We predicted positive but they're frustrated → autonomy sensitivity is higher than thought
        - We predicted engaged but they're bored → novelty_seeking is higher than thought

        The update strength scales with the magnitude of the error.
        """
        if error.magnitude < 0.1:
            # Small error — the prediction was close, no update needed
            return
        # Scale the update strength by the error magnitude (capped at 0.2)
        strength = min(error.magnitude * 0.15, 0.2)
        # Determine which ToM dimensions to adjust based on the emotional observation
        observed_emotion = observation.observed_emotion
        valence_error = error.valence_error  # positive = user happier than predicted
        arousal_error = error.arousal_error  # positive = user more aroused than predicted
        # Dimension indices in particle space:
        #   0: avoids_evaluation, 1: hates_long_tasks, 2: novelty_seeking
        #   3: structure_preference, 4: external_validation, 5: autonomy_sensitivity
        #   6: overwhelm_threshold
        biases = []
        # Negative valence surprise: the user is more negative than predicted
        if valence_error < -0.2:
            biases.append((6, -1, strength))        # lower overwhelm threshold
            biases.append((0, 1, strength * 0.5))   # higher evaluation avoidance
        # Positive valence surprise: the user is happier than predicted
        if valence_error > 0.2:
            biases.append((6, 1, strength * 0.5))   # higher overwhelm threshold
        # High arousal surprise: the user is more activated than predicted
        if arousal_error > 0.2:
            if observation.observed_valence < 0:
                # Negative + high arousal = angry/anxious → autonomy sensitivity
                biases.append((5, 1, strength))
            else:
                # Positive + high arousal = excited → novelty seeking
                biases.append((2, 1, strength * 0.5))
        # Low arousal surprise: the user is less activated than predicted
        if arousal_error < -0.2:
            if observation.observed_valence < 0:
                # Negative + low arousal = bored/depressed → novelty seeking
                biases.append((2, 1, strength))
                biases.append((1, 1, strength * 0.5))  # hates long tasks
            else:
                # Positive + low arousal = calm/relaxed → structure preference
                biases.append((3, 1, strength * 0.3))
        # Apply the biases to the particles
        for dim_idx, direction, bias_strength in biases:
            for j in range(self.tom.n_particles):
                particle_val = self.tom.particle_params[j, dim_idx]
                if direction > 0:
                    likelihood = 0.5 + bias_strength * particle_val
                else:
                    likelihood = 0.5 + bias_strength * (1.0 - particle_val)
                self.tom.particle_weights[j] *= likelihood
        # Renormalize
        total = np.sum(self.tom.particle_weights)
        if total > 0:
            self.tom.particle_weights /= total
        else:
            self.tom.particle_weights = np.ones(self.tom.n_particles) / self.tom.n_particles
        # Invalidate caches
        self.tom._reliability_cache = None
        self.tom._confidence_cache = None
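
    # Numeric sketch of the soft update above (hypothetical particle values):
    # with bias_strength = 0.15 and direction > 0 on overwhelm_threshold (dim 6),
    # a particle at 0.9 gets likelihood 0.5 + 0.15 * 0.9 = 0.635 while a particle
    # at 0.1 gets 0.515, so weight drifts toward high-threshold particles.
    # With direction < 0 the factor is 0.5 + 0.15 * (1 - value), favoring the
    # low-threshold particle instead. The 0.5 baseline keeps any single turn
    # from dominating the filter.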
    def _classify_emotion_heuristic(self, user_text: str) -> Dict[str, Any]:
        """
        Heuristic emotion classification — fallback when the LLM is unavailable.
        Uses keyword matching to estimate valence and arousal.
        """
        lower = user_text.lower()
        neg_words = [
            "stressed", "anxious", "worried", "frustrated", "stuck",
            "hopeless", "lost", "confused", "scared", "tired",
            "burned out", "overwhelmed", "sad", "angry", "annoyed",
            "hate", "terrible", "awful", "bad", "depressed",
            "bored", "boring", "meh", "ugh", "down", "horrible",
            "pointless", "miserable", "give up", "sucks", "worst",
            "painful", "dread", "fail", "broken", "nothing works",
        ]
        pos_words = [
            "good", "great", "happy", "excited", "curious",
            "interesting", "love", "amazing", "better", "hopeful",
            "motivated", "ready", "clear", "peaceful", "calm",
            "relaxed", "grateful", "proud", "confident", "wonderful",
            "fantastic", "awesome", "brilliant", "beautiful",
            "progress", "getting it", "starting to",
        ]
        high_arousal = [
            "!", "stressed", "anxious", "excited", "angry",
            "can't believe", "urgent", "help", "panic", "scared",
            "amazing", "overwhelmed", "furious", "thrilled",
            "panicking", "intense", "?!", "!!",
        ]
        low_arousal = [
            "bored", "tired", "calm", "peaceful", "relaxed",
            "meh", "whatever", "ok", "fine", "sleepy",
            "hopeless", "pointless", "numb",
        ]
        neg_c = sum(1 for w in neg_words if w in lower)
        pos_c = sum(1 for w in pos_words if w in lower)
        hi_c = sum(1 for w in high_arousal if w in lower)
        lo_c = sum(1 for w in low_arousal if w in lower)
        if len(user_text.strip()) < 10:
            lo_c += 1
        # Valence
        if neg_c > pos_c + 1:
            v, vi = "very_negative", 0
        elif neg_c > pos_c:
            v, vi = "negative", 1
        elif pos_c > neg_c + 1:
            v, vi = "very_positive", 4
        elif pos_c > neg_c:
            v, vi = "positive", 3
        else:
            v, vi = "neutral", 2
        # Arousal
        if hi_c > lo_c + 1:
            a, ai = "very_high", 4
        elif hi_c > lo_c:
            a, ai = "high", 3
        elif lo_c > hi_c + 1:
            a, ai = "very_low", 0
        elif lo_c > hi_c:
            a, ai = "low", 1
        else:
            a, ai = "moderate", 2
        # Emotion label
        if vi >= 3 and ai >= 3:
            emotion = "excited"
        elif vi >= 3 and ai <= 1:
            emotion = "relaxed"
        elif vi <= 1 and ai >= 3:
            emotion = "angry"
        elif vi <= 1 and ai <= 1:
            emotion = "depressed"
        elif vi >= 3:
            emotion = "happy"
        elif vi <= 1:
            emotion = "sad"
        elif ai >= 3:
            emotion = "alert"
        else:
            emotion = "calm"
        return {
            "valence": v, "arousal": a,
            "valence_idx": vi, "arousal_idx": ai,
            "primary_emotion": emotion,
            "confidence": "medium", "emotional_cues": [],
        }
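
    # Hand-traced sanity checks for the heuristic above (illustrative, not an
    # executable test suite):
    #
    #     agent._classify_emotion_heuristic("ugh, I'm so stressed!!")
    #     # → vi=0 (two negative keywords), ai=4 ("!", "stressed", "!!")
    #     # → primary_emotion "angry"
    #
    #     agent._classify_emotion_heuristic("feeling calm and peaceful")
    #     # → vi=4 (two positive keywords), ai=0 (two low-arousal keywords)
    #     # → primary_emotion "relaxed"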
    def _run_emotional_inference(self, user_text: str) -> Dict[str, Any]:
        """
        Run the full predict-observe-update loop for emotional inference.
        Returns a dict with the prediction, observation, error, and current state.
        This is called each turn during post-calibration phases.
        """
        # 1. PREDICT — before seeing the text
        prediction = self._predict_emotion()
        # 2. OBSERVE — classify the text
        observation = self._observe_emotion(user_text)
        # 3. UPDATE — compare and learn
        error = self._update_from_emotion_error(prediction, observation)
        return {
            "prediction": prediction.to_dict(),
            "observation": observation.to_dict(),
            "error": error.to_dict(),
            "current_emotion": self.emotion.get_current_emotion().to_dict()
            if self.emotion.get_current_emotion() else None,
            "emotional_beliefs": self.emotion.get_belief_state(),
        }

    def _run_batched_inference(self, user_text: str):
        """
        Run emotion classification + profile extraction in a single LLM call.

        Returns (emotional_data, profile_data) where profile_data is the raw
        dict for store_extracted_data(), or None if batching wasn't possible.
        Falls back to the original separate emotion-only call if needed.
        """
        classifier = self.classifier
        profile_data = None
        if classifier is not None:
            # Build context for the batched call
            context = f"Phase: {self.phase}"
            if self.target_skill:
                context += f", Target skill: {self.target_skill}"
            if self.current_intervention:
                context += f", Current suggestion: {self.current_intervention.description}"
            existing_summary = ""
            if self.profile.facts:
                existing_summary = "; ".join(
                    f.content for f in self.profile.facts[-5:]
                )
            try:
                result = classifier.classify_emotion_and_extract_profile(
                    user_text, context=context, existing_summary=existing_summary,
                )
                emotion_result = result["emotion"]
                profile_data = result["profile"]
                # Process the emotion through the predict-observe-update loop
                prediction = self._predict_emotion()
                observation = self.emotion.observe(
                    valence_idx=emotion_result.get("valence_idx", 2),
                    arousal_idx=emotion_result.get("arousal_idx", 2),
                    raw_classification=emotion_result,
                )
                self._last_observation = observation
                error = self._update_from_emotion_error(prediction, observation)
                emotional_data = {
                    "prediction": prediction.to_dict(),
                    "observation": observation.to_dict(),
                    "error": error.to_dict(),
                    "current_emotion": self.emotion.get_current_emotion().to_dict()
                    if self.emotion.get_current_emotion() else None,
                    "emotional_beliefs": self.emotion.get_belief_state(),
                }
                logger.info(
                    "[Batched] emotion + profile in 1 LLM call "
                    f"(emotion={emotion_result.get('primary_emotion')}, "
                    f"profile_facts={len(profile_data.get('facts', []))})"
                )
                return emotional_data, profile_data
            except Exception as e:
                logger.warning(f"[Batched] Failed ({e}), falling back to separate calls")
        # Fallback: original separate emotion inference
        emotional_data = self._run_emotional_inference(user_text)
        return emotional_data, None
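
    # Shape sketch of the batched classifier result consumed above (field names
    # inferred from the access patterns; the full schema lives in SphereClassifier):
    #
    #     {
    #         "emotion": {"valence_idx": 1, "arousal_idx": 3,
    #                     "primary_emotion": "anxious", ...},
    #         "profile": {"facts": [{"content": "...", ...}], ...},
    #     }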
    def start_session(self) -> Dict[str, Any]:
        """Initialize the session and return the welcome message + first question."""
        self.beliefs = self.model.get_initial_beliefs()
        self.phase = PHASE_CALIBRATION
        self.timestep = 0
        self.asked_question_ids = []
        self.history = []
        self.conversation_history = []
        self.tom.reset()
        self.emotion.reset()
        self._viz_turns = 0
        self._planning_turns = 0
        self._recent_sentiments = []
        self._last_prediction = None
        self._last_observation = None
        self._last_error = None
        # Get the first question (adaptive ordering)
        next_q = self._get_next_question()
        self.current_question = next_q
        # Track in conversation history
        self._track_conversation("assistant", WELCOME_MESSAGE)
        return {
            "phase": self.phase,
            "message": WELCOME_MESSAGE,
            "question": self._format_question(next_q) if next_q else None,
            "is_complete": False,
        }

    def step(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process one step of the coaching session.

        Args:
            user_input: Dict with keys depending on the phase:
                - calibration: {"answer": str, "answer_index": int (optional)}
                - planning/update: {"choice": "accept"|"too_hard"|"not_relevant"}
                - any phase: {"answer": str} for free-text chat

        Returns:
            Result dict with message, phase, question/intervention data, etc.
        """
        self.timestep += 1
        if self.phase == PHASE_CALIBRATION:
            return self._step_calibration(user_input)
        elif self.phase == PHASE_VISUALIZATION:
            return self._step_visualization(user_input)
        elif self.phase == PHASE_PLANNING:
            return self._step_planning(user_input)
        elif self.phase == PHASE_UPDATE:
            return self._step_update(user_input)
        elif self.phase == PHASE_COACHING:
            return self._step_coaching(user_input)
        else:
            return {"phase": PHASE_COMPLETE, "message": "Session complete.", "is_complete": True}
    def step_stream(self, user_input: Dict[str, Any]):
        """
        Streaming version of step(). Yields SSE event dicts.

        Events:
            {"event": "metadata", "data": {phase, sphere_data, question, ...}}
            {"event": "token", "data": {"text": "chunk"}}   (repeated)
            {"event": "done", "data": {}}
        """
        self.timestep += 1
        # Calibration: no LLM call, yield the result directly
        if self.phase == PHASE_CALIBRATION:
            result = self._step_calibration(user_input)
            yield {"event": "metadata", "data": result}
            yield {"event": "done", "data": {}}
            return
        # Enable stream capture: _llm_generate will save context instead of calling the LLM
        self._stream_capture = None
        self._stream_mode = True
        try:
            if self.phase == PHASE_VISUALIZATION:
                result = self._step_visualization(user_input)
            elif self.phase == PHASE_PLANNING:
                result = self._step_planning(user_input)
            elif self.phase == PHASE_UPDATE:
                result = self._step_update(user_input)
            elif self.phase == PHASE_COACHING:
                result = self._step_coaching(user_input)
            else:
                result = {"phase": PHASE_COMPLETE, "message": "Session complete.", "is_complete": True}
        finally:
            self._stream_mode = False
        captured = self._stream_capture
        self._stream_capture = None
        # Yield metadata (everything except the message)
        template_msg = result.get("message", "")
        metadata = {k: v for k, v in result.items() if k != "message"}
        yield {"event": "metadata", "data": metadata}
        # Stream the LLM response if context was captured
        if captured and self.generator:
            collected = []
            for chunk in self.generator.generate_stream(
                user_message=captured["user_message"],
                conversation_history=captured["conversation_history"],
                phase=captured["phase"],
                belief_summary=captured["belief_summary"],
                tom_summary=captured["tom_summary"],
                cognitive_load=captured["cognitive_load"],
                target_skill=captured["target_skill"],
                current_intervention=captured["current_intervention"],
                accepted_interventions=captured["accepted_interventions"],
                profile_section=captured["profile_section"],
            ):
                collected.append(chunk)
                yield {"event": "token", "data": {"text": chunk}}
            full_text = "".join(collected)
            if full_text:
                # Replace the template fallback in conversation history with the real LLM response
                for i in range(len(self.conversation_history) - 1, -1, -1):
                    if self.conversation_history[i]["role"] == "assistant":
                        self.conversation_history[i]["content"] = full_text
                        break
            else:
                # LLM returned nothing — use the template message
                if template_msg:
                    yield {"event": "token", "data": {"text": template_msg}}
        else:
            # No LLM available — yield the template message at once
            if template_msg:
                yield {"event": "token", "data": {"text": template_msg}}
        yield {"event": "done", "data": {}}
    # -------------------------------------------------------------------------
    # CALIBRATION PHASE
    # -------------------------------------------------------------------------

    # Acknowledgment messages cycled during calibration
    _CALIBRATION_ACKS = [
        "Got it.",
        "Thanks for sharing that.",
        "Understood.",
        "Okay, noted.",
        "That's helpful to know.",
        "Interesting — thanks.",
        "Appreciate your honesty.",
        "Good to know.",
    ]

    def _step_calibration(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
        """Process a calibration answer and return the next question or a transition."""
        if self.current_question is None:
            return self._transition_to_visualization()
        q = self.current_question
        message_type = user_input.get("message_type", "text")

        # --- Handle MC questions ---
        if q.question_type == "mc":
            answer_idx = user_input.get("answer_index")
            if answer_idx is None:
                # Try to match the text to an option
                answer_text = user_input.get("answer", "")
                answer_idx = self._match_mc_answer(answer_text, q.options)
            # If we still have no valid answer, treat it as conversational text
            if answer_idx is None:
                return {
                    "phase": PHASE_CALIBRATION,
                    "message": "No worries — just pick whichever option fits best.",
                    "question": self._format_question(q),
                    "progress": len(self.asked_question_ids) / self._max_calibration_questions,
                    "is_complete": False,
                }
        if q.a_matrix is not None:
            # Update the belief for this skill factor
            self.beliefs[q.category] = update_belief(
                self.beliefs[q.category],
                answer_idx,
                q.a_matrix,
            )
            # Learn: refine the A-matrix from this observation
            self.learner.learn_from_observation(
                q.category, answer_idx, self.beliefs[q.category]
            )
        else:
            # Free text — update multiple factors via the classification result
            classified = user_input.get("classified", {})
            skill_signals = classified.get("skill_signals", {})
            for skill, signal in skill_signals.items():
                if skill in self.beliefs:
                    signal_map = {"very_low": 0, "low": 1, "medium": 2, "high": 3, "very_high": 4}
                    obs_idx = signal_map.get(signal, 2)
                    if skill in self.model.A:
                        self.beliefs[skill] = update_belief(
                            self.beliefs[skill], obs_idx, self.model.A[skill]
                        )
                        # Learn: refine the A-matrix
                        self.learner.learn_from_observation(
                            skill, obs_idx, self.beliefs[skill]
                        )
            # Update friction beliefs from the classification
            overwhelm = classified.get("overwhelm_signal", "medium")
            overwhelm_map = {"low": 0, "medium": 1, "high": 2}
            if "overwhelm_sensitivity" in self.model.A:
                obs_idx = overwhelm_map.get(overwhelm, 1)
                self.beliefs["overwhelm_sensitivity"] = update_belief(
                    self.beliefs["overwhelm_sensitivity"],
                    obs_idx,
                    self.model.A["overwhelm_sensitivity"],
                )
                # Learn: refine the friction observation model
                self.learner.learn_from_observation(
                    "overwhelm_sensitivity", obs_idx, self.beliefs["overwhelm_sensitivity"]
                )

        self.asked_question_ids.append(q.id)
        # Record in history
        self.history.append({
            "timestep": self.timestep,
            "phase": PHASE_CALIBRATION,
            "question_id": q.id,
            "user_input": user_input,
        })
        # Extract profile facts from calibration answers (especially free-text)
        cal_text = user_input.get("answer", "")
        if cal_text and len(cal_text) > 10:  # only extract from substantive answers
            self.profile.extract_and_store(
                user_text=cal_text,
                turn=self.timestep,
                classifier=self.classifier,
                context=f"Answering calibration Q: {q.question_text[:80]}",
            )
        # Check whether calibration is complete
        if len(self.asked_question_ids) >= self._max_calibration_questions:
            # Don't generate a separate ack — the sphere commentary will
            # acknowledge the last answer as part of its single LLM call.
            # Generating a separate ack causes two LLM responses to be
            # concatenated, producing the "double response" problem.
            user_text = user_input.get("answer", "")
            self._track_conversation("user", user_text)
            return self._transition_to_visualization()
        # Get the next question
        next_q = self._get_next_question()
        if next_q is None:
            user_text = user_input.get("answer", "")
            self._track_conversation("user", user_text)
            return self._transition_to_visualization()
        self.current_question = next_q
        # Generate an acknowledgment — LLM first, template fallback
        user_text = user_input.get("answer", "")
        ack = self._generate_calibration_ack(user_text, q)
        # Track in conversation history so the LLM has context
        self._track_conversation("user", user_text)
        self._track_conversation("assistant", ack)
        return {
            "phase": PHASE_CALIBRATION,
            "message": ack,
            "question": self._format_question(next_q),
            "progress": len(self.asked_question_ids) / self._max_calibration_questions,
            "is_complete": False,
        }

    def _generate_calibration_ack(self, user_text: str, question: CalibrationQuestion) -> str:
        """
        Generate a natural acknowledgment of the user's calibration answer.
        Uses the LLM when available for context-aware responses (e.g., empathetic
        when the user shares something emotional). Falls back to template cycling.
        """
        gen = self.generator
        if gen is not None:
            logger.info(f"[LLM] Generating calibration ack for Q{len(self.asked_question_ids)} (user said: '{user_text[:50]}')")
            try:
                # Import here to avoid circular imports at module level
                from ..llm.generator import build_system_prompt
                system_prompt = build_system_prompt(
                    phase="calibration",
                    belief_summary=self.get_belief_summary(),
                )
                # Give the LLM context about the question that was asked
                context_msg = f'[The user was asked: "{question.question_text}"]'
                if question.question_type == "mc" and question.options:
                    context_msg += f'\n[Options were: {", ".join(question.options)}]'
                context_msg += f'\n[This is question {len(self.asked_question_ids)} of {self._max_calibration_questions}]'
                messages = [{"role": "system", "content": system_prompt}]
                # Add recent conversation for continuity
                for msg in self.conversation_history[-4:]:
                    messages.append({"role": msg["role"], "content": msg["content"]})
                messages.append({"role": "assistant", "content": context_msg})
                messages.append({"role": "user", "content": user_text})
                response = gen.client.chat_completion(
                    messages=messages,
                    temperature=0.7,
                    max_tokens=80,  # keep acks short
                    model_override=None,  # uses the LLM_MODEL default
                )
                if response and response.strip():
                    logger.info(f"[LLM] Calibration ack: '{response.strip()[:60]}...'")
                    return response.strip()
                logger.warning("[LLM] Empty calibration ack from LLM")
            except Exception as e:
                logger.warning(f"[LLM] Calibration ack failed: {e}")
        else:
            logger.info("[LLM] No generator — using template calibration ack")
        # Template fallback
        ack_idx = (len(self.asked_question_ids) - 1) % len(self._CALIBRATION_ACKS)
        logger.info(f"[LLM] Template fallback ack: '{self._CALIBRATION_ACKS[ack_idx]}'")
        return self._CALIBRATION_ACKS[ack_idx]
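
    # Template-cycling note: with eight entries in _CALIBRATION_ACKS, ack_idx
    # walks 0..7 and wraps after the eighth answer; (9 - 1) % 8 == 0 returns
    # the cycle to "Got it."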
    # -------------------------------------------------------------------------
    # VISUALIZATION PHASE
    # -------------------------------------------------------------------------

    def _transition_to_visualization(self) -> Dict[str, Any]:
        """Transition from calibration to visualization with personalized commentary."""
        self.phase = PHASE_VISUALIZATION
        self._viz_turns = 0
        sphere_data = self.get_sphere_data()
        # A single LLM call generates both the acknowledgment of the last answer
        # and the sphere commentary together — avoids the double-response problem
        commentary = self._generate_sphere_commentary()
        self._track_conversation("assistant", commentary)
        return {
            "phase": PHASE_VISUALIZATION,
            "message": commentary,
            "sphere_data": sphere_data,
            "belief_summary": self.get_belief_summary(),
            "is_complete": False,
        }

    def _step_visualization(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
        """Handle user messages during the sphere discussion; transition when ready."""
        self._viz_turns += 1
        user_text = user_input.get("answer", "").strip()
        # Run emotional inference
        emotional_data = self._run_emotional_inference(user_text)
        # Assess cognitive load / intent (with the emotional data)
        cog_load = self._assess_cognitive_load(user_text, emotional_data=emotional_data)
        # Update the cognitive model
        self._update_user_model_from_text(user_text)
        # Record history
        self.history.append({
            "timestep": self.timestep,
            "phase": PHASE_VISUALIZATION,
            "user_input": user_input,
            "emotional_inference": emotional_data,
        })
        # === Companion gate: be present when the user needs it ===
        if cog_load["coaching_readiness"] == "not_ready":
            logger.info(f"[Viz] Companion mode (signals={cog_load['signals']})")
            llm_response = self._llm_generate(user_text, cognitive_load=cog_load)
            self._track_conversation("user", user_text)
            response = llm_response or self._respond_to_sphere_reaction(user_text)
            self._track_conversation("assistant", response)
            return {
                "phase": PHASE_VISUALIZATION,
                "message": response,
                "sphere_data": self.get_sphere_data(),
                "belief_summary": self.get_belief_summary(),
                "efe_info": {"selected_action": "companion_chat", "override": "not_coaching_ready"},
                "is_complete": False,
            }
        # === Decide the action FIRST, then make one LLM call ===
        lower = user_text.lower()
        wants_coaching = any(w in lower for w in [
            "suggest", "what should", "help me", "what can i do",
            "let's work", "what next", "show me", "i want to improve",
            "what do you recommend", "first step",
        ])
        # Determine whether we're transitioning to planning
        should_transition = False
        efe_info = None
        if wants_coaching:
            should_transition = True
            efe_info = {"selected_action": "propose_intervention", "override": "explicit_request"}
        elif self._viz_turns >= 2:
            emotion_error_mag = emotional_data.get("error", {}).get("magnitude", 0.0)
            valence_belief = self.emotion.belief_valence
            action_idx, action_name, efe_info = select_coaching_action(
                beliefs=self.beliefs,
                model=self.model,
                phase="visualization",
                timestep=self.timestep,
                tom_reliability=self.tom.reliability,
                empathy_planner=self.empathy,
                tom_filter=self.tom,
                target_skill=self.target_skill,
                current_intervention=self.current_intervention,
                emotion_prediction_error=emotion_error_mag,
                emotion_valence_belief=valence_belief,
            )
            propose_prob = efe_info["action_probabilities"].get("propose_intervention", 0)
            if action_name == "propose_intervention" and propose_prob > 0.45:
                should_transition = True
        if should_transition:
            # Transition to planning — _generate_plan_message makes the single LLM call
            self._track_conversation("user", user_text)
            result = self._transition_to_planning()
            result["efe_info"] = efe_info or {"selected_action": "propose_intervention"}
            self._track_conversation("assistant", result["message"])
            return result
        # Stay in visualization — one LLM call for a conversational response
        llm_response = self._llm_generate(user_text, cognitive_load=cog_load)
        self._track_conversation("user", user_text)
        response = llm_response or self._respond_to_sphere_reaction(user_text)
        self._track_conversation("assistant", response)
        if efe_info is None:
            efe_info = {"selected_action": "ask_free_text", "phase": "visualization"}
        return {
            "phase": PHASE_VISUALIZATION,
            "message": response,
            "sphere_data": self.get_sphere_data(),
            "belief_summary": self.get_belief_summary(),
            "efe_info": efe_info,
            "is_complete": False,
        }
    # -------------------------------------------------------------------------
    # PLANNING PHASE
    # -------------------------------------------------------------------------

    def _transition_to_planning(self) -> Dict[str, Any]:
        """Transition to planning: propose the first intervention, using EFE to select the skill."""
        self.phase = PHASE_PLANNING
        self._planning_turns = 0
        # Use EFE to select the best skill to target from the top candidates
        impact_ranking = self.dep_graph.compute_impact_ranking(
            {k: v for k, v in self.beliefs.items() if k in SKILL_FACTORS}
        )
        top_skills = [s for s, _ in impact_ranking[:3]] if impact_ranking else SKILL_FACTORS[:1]
        # Evaluate the EFE of propose_intervention targeting each candidate skill
        best_skill = top_skills[0]
        best_G = float("inf")
        for skill in top_skills:
            G = compute_efe_all_factors(
                self.beliefs, self.model, 3,  # action = propose_intervention
                relevant_factors=[skill],
                lambda_epist=0.5,
            )
            # Blend with empathy: predict the felt cost of a gentle intervention
            gentle_list = get_interventions_for_skill(skill, "gentle")
            if gentle_list:
                pred = self.tom.predict_response_gated(gentle_list[0].to_dict())
                G_social = self.empathy.compute_blended_efe(
                    G, pred["predicted_felt_cost"], self.tom.reliability
                )
            else:
                G_social = G
            if G_social < best_G:
                best_G = G_social
                best_skill = skill
        self.target_skill = best_skill
        # Get the gentle/push pair for the counterfactual
        gentle, push = get_gentle_push_pair(self.target_skill)
        # Compute the counterfactual
        counterfactual = self.empathy.compute_counterfactual(
            gentle.to_dict(), push.to_dict(), self.tom
        )
        # Select the recommended intervention
        gentle_pred = self.tom.predict_response_gated(gentle.to_dict())
        push_pred = self.tom.predict_response_gated(push.to_dict())
        if gentle_pred["p_accept"] >= push_pred["p_accept"]:
            self.current_intervention = gentle
        else:
            self.current_intervention = push
        # Generate a personalized planning message
        plan_message = self._generate_plan_message()
        self._track_conversation("assistant", plan_message)
        return {
            "phase": PHASE_PLANNING,
            "message": plan_message,
            "intervention": {
                "description": self.current_intervention.description,
                "target_skill": self.current_intervention.target_skill,
                "duration_minutes": self.current_intervention.duration_minutes,
                "difficulty": self.current_intervention.difficulty,
            },
            "counterfactual": counterfactual,
            "target_skill": self.target_skill,
            "sphere_data": self.get_sphere_data(),
            "belief_summary": self.get_belief_summary(),
            "is_complete": False,
        }
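
    # Note: compute_blended_efe is defined in EmpathyPlanner and not shown here.
    # A plausible form (an assumption, not necessarily this codebase's exact
    # formula) is
    #
    #     G_social = G + lambda_empathy * reliability * predicted_felt_cost
    #
    # so the felt cost penalizes a candidate only to the degree the ToM filter
    # is trusted; the selection loops above only require that lower G_social wins.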
    def _step_planning(self, user_input: Dict[str, Any]) -> Dict[str, Any]:
        """The user responds to the proposed intervention — either a choice or free text."""
        self._planning_turns += 1
        choice = user_input.get("choice")
        user_text = user_input.get("answer", "").strip()
        # If they clicked a choice button, handle it
        if choice:
            self._track_conversation("user", choice)
            result = self._process_choice(choice)
            self._track_conversation("assistant", result.get("message", ""))
            return result
        # === EMOTIONAL INFERENCE (runs every turn) ===
        emotional_data = self._run_emotional_inference(user_text)
        cog_load = self._assess_cognitive_load(user_text, emotional_data=emotional_data)
        # Update the cognitive model
        self._update_user_model_from_text(user_text)
        # === Companion gate: be present when the user needs it ===
        if cog_load["coaching_readiness"] == "not_ready":
            logger.info(f"[Planning] Companion mode (signals={cog_load['signals']})")
            llm_response = self._llm_generate(user_text, cognitive_load=cog_load)
            self._track_conversation("user", user_text)
            response = llm_response or self._generate_companion_response(user_text)
            self._track_conversation("assistant", response)
            return {
                "phase": PHASE_PLANNING,
                "message": response,
                "sphere_data": self.get_sphere_data(),
                "belief_summary": self.get_belief_summary(),
                "emotional_state": emotional_data,
                "efe_info": {"selected_action": "companion_chat", "override": "not_coaching_ready"},
                "is_complete": False,
            }
        # Otherwise it's free text — have a conversation.
        # NOTE: _respond_to_planning_chat may call _llm_generate,
        # so track the user message AFTER to avoid duplicate user messages.
        result = self._respond_to_planning_chat(user_text)
        result["emotional_state"] = emotional_data
        self._track_conversation("user", user_text)
        self._track_conversation("assistant", result.get("message", ""))
        return result
| # ------------------------------------------------------------------------- | |
| # UPDATE PHASE | |
| # ------------------------------------------------------------------------- | |
| def _step_update(self, user_input: Dict[str, Any]) -> Dict[str, Any]: | |
| """Process subsequent user choices or free-text in the update loop.""" | |
| choice = user_input.get("choice") | |
| user_text = user_input.get("answer", "").strip() | |
| if choice: | |
| self._track_conversation("user", choice) | |
| result = self._process_choice(choice) | |
| self._track_conversation("assistant", result.get("message", "")) | |
| return result | |
| # === EMOTIONAL INFERENCE (runs every turn) === | |
| emotional_data = self._run_emotional_inference(user_text) | |
| cog_load = self._assess_cognitive_load(user_text, emotional_data=emotional_data) | |
| # === Companion gate: be present when user needs it === | |
| if cog_load["coaching_readiness"] == "not_ready": | |
| logger.info(f"[Update] Companion mode (signals={cog_load['signals']})") | |
| llm_response = self._llm_generate(user_text, cognitive_load=cog_load) | |
| self._track_conversation("user", user_text) | |
| response = llm_response or self._generate_companion_response(user_text) | |
| self._track_conversation("assistant", response) | |
| return { | |
| "phase": PHASE_UPDATE, | |
| "message": response, | |
| "sphere_data": self.get_sphere_data(), | |
| "belief_summary": self.get_belief_summary(), | |
| "emotional_state": emotional_data, | |
| "efe_info": {"selected_action": "companion_chat", "override": "not_coaching_ready"}, | |
| "is_complete": False, | |
| } | |
| # Free text during update phase | |
| # NOTE: _respond_to_update_chat calls _llm_generate, | |
| # so track user message AFTER to avoid duplicate user messages | |
| result = self._respond_to_update_chat(user_text) | |
| result["emotional_state"] = emotional_data | |
| self._track_conversation("user", user_text) | |
| self._track_conversation("assistant", result.get("message", "")) | |
| return result | |
| def _process_choice(self, choice: str) -> Dict[str, Any]: | |
| """Process a user choice and update beliefs.""" | |
| self.phase = PHASE_UPDATE | |
| # Map choice to observation index | |
| choice_map = {"accept": 0, "too_hard": 1, "not_relevant": 2} | |
| choice_idx = choice_map.get(choice, 0) | |
| # Update ToM particle filter | |
| intervention_dict = ( | |
| self.current_intervention.to_dict() | |
| if self.current_intervention | |
| else {"difficulty": 0.3, "duration_minutes": 5} | |
| ) | |
| tom_stats = self.tom.update_weights(choice_idx, intervention_dict) | |
| # Update friction beliefs | |
| if "overwhelm_sensitivity" in self.model.A: | |
| belief_before = self.beliefs["overwhelm_sensitivity"].copy() | |
| self.beliefs["overwhelm_sensitivity"] = update_belief( | |
| self.beliefs["overwhelm_sensitivity"], | |
| choice_idx, | |
| self.model.A["overwhelm_sensitivity"], | |
| ) | |
| # Learn: refine friction observation model + transition model | |
| self.learner.learn_from_observation( | |
| "overwhelm_sensitivity", choice_idx, self.beliefs["overwhelm_sensitivity"] | |
| ) | |
| self.learner.learn_from_transition( | |
| "overwhelm_sensitivity", choice_idx, belief_before, self.beliefs["overwhelm_sensitivity"] | |
| ) | |
| # Record history | |
| self.history.append({ | |
| "timestep": self.timestep, | |
| "phase": PHASE_UPDATE, | |
| "choice": choice, | |
| "tom_stats": tom_stats, | |
| }) | |
| if choice == "accept": | |
| # Track the accepted intervention | |
| if self.current_intervention: | |
| self._accepted_interventions.append(self.current_intervention.to_dict()) | |
| # Generate personalized encouragement and transition to coaching | |
| message = self._generate_acceptance_message() | |
| return self._transition_to_coaching(message, tom_stats) | |
| else: | |
| # User rejected — adapt and propose alternative | |
| return self._propose_alternative(choice, tom_stats) | |
| def _propose_alternative( | |
| self, rejection_reason: str, tom_stats: Dict | |
| ) -> Dict[str, Any]: | |
| """Propose an adjusted intervention after rejection, using EFE+ToM to select.""" | |
| if self.target_skill is None: | |
| self.target_skill = SKILL_FACTORS[0] | |
| # Use EFE to find the best alternative across candidate (skill, intervention) pairs | |
| impact_ranking = self.dep_graph.compute_impact_ranking( | |
| {k: v for k, v in self.beliefs.items() if k in SKILL_FACTORS} | |
| ) | |
| candidates = [] | |
| for skill, _ in (impact_ranking[:4] if impact_ranking else [(SKILL_FACTORS[0], 0)]): | |
| # For "not_relevant", skip the rejected skill entirely; every other | |
| # case (including "too_hard") falls back to gentle interventions. | |
| if rejection_reason == "not_relevant" and skill == self.target_skill: | |
| continue # skip the rejected skill | |
| ivs = get_interventions_for_skill(skill, "gentle") | |
| # The EFE depends only on the skill, not the intervention, so | |
| # compute it once per skill instead of once per candidate. | |
| G = compute_efe_all_factors( | |
| self.beliefs, self.model, 3, | |
| relevant_factors=[skill], lambda_epist=0.5, | |
| ) | |
| for iv in ivs[:2]: # consider up to 2 interventions per skill | |
| pred = self.tom.predict_response_gated(iv.to_dict()) | |
| G_social = self.empathy.compute_blended_efe( | |
| G, pred["predicted_felt_cost"], self.tom.reliability | |
| ) | |
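| # compute_blended_efe is assumed to mix the task EFE with the | |
| # predicted felt cost, gated by ToM reliability, roughly | |
| #   G_social ≈ G + lambda_empathy * reliability * felt_cost, | |
| # so an unreliable ToM estimate barely perturbs the ranking. | |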
| candidates.append((skill, iv, G_social)) | |
| if candidates: | |
| candidates.sort(key=lambda x: x[2]) | |
| best_skill, best_iv, _ = candidates[0] | |
| self.target_skill = best_skill | |
| self.current_intervention = best_iv | |
| else: | |
| # Fallback: original hardcoded logic | |
| if rejection_reason == "too_hard": | |
| ivs = get_interventions_for_skill(self.target_skill, "gentle") | |
| if ivs: | |
| self.current_intervention = ivs[0] | |
| # Compute new counterfactual | |
| gentle, push = get_gentle_push_pair(self.target_skill) | |
| counterfactual = self.empathy.compute_counterfactual( | |
| gentle.to_dict(), push.to_dict(), self.tom | |
| ) | |
| # Generate personalized adaptation message | |
| adaptation_message = self._generate_adaptation_message(rejection_reason) | |
| return { | |
| "phase": PHASE_UPDATE, | |
| "message": adaptation_message, | |
| "intervention": { | |
| "description": self.current_intervention.description if self.current_intervention else "", | |
| "target_skill": self.target_skill, | |
| "duration_minutes": self.current_intervention.duration_minutes if self.current_intervention else 2, | |
| "difficulty": self.current_intervention.difficulty if self.current_intervention else 0.1, | |
| }, | |
| "counterfactual": counterfactual, | |
| "tom_stats": tom_stats, | |
| "user_type_summary": self.tom.get_user_type_summary(), | |
| "sphere_data": self.get_sphere_data(), | |
| "belief_summary": self.get_belief_summary(), | |
| "is_complete": False, | |
| } | |
| # ------------------------------------------------------------------------- | |
| # COACHING PHASE | |
| # ------------------------------------------------------------------------- | |
| def _transition_to_coaching(self, acceptance_message: str, tom_stats: Optional[Dict] = None) -> Dict[str, Any]: | |
| """Transition to ongoing coaching after intervention acceptance.""" | |
| self.phase = PHASE_COACHING | |
| self._coaching_turns = 0 | |
| # Generate a coaching follow-up — a probing question about the target skill | |
| probe = self._get_next_probe() | |
| if probe: | |
| coaching_message = acceptance_message + "\n\n" + probe | |
| else: | |
| coaching_message = acceptance_message + ( | |
| "\n\nNow that you have a concrete step, let's dig a bit deeper. " | |
| "Tell me more about what's going on for you right now." | |
| ) | |
| result = { | |
| "phase": PHASE_COACHING, | |
| "message": coaching_message, | |
| "sphere_data": self.get_sphere_data(), | |
| "belief_summary": self.get_belief_summary(), | |
| "is_complete": False, | |
| } | |
| if tom_stats: | |
| result["tom_stats"] = tom_stats | |
| return result | |
| def _step_coaching(self, user_input: Dict[str, Any]) -> Dict[str, Any]: | |
| """Handle the ongoing coaching conversation using EFE-driven action selection.""" | |
| self._coaching_turns += 1 | |
| user_text = user_input.get("answer", "").strip() | |
| # === BATCHED INFERENCE: emotion + profile in one LLM call === | |
| emotional_data, profile_extracted = self._run_batched_inference(user_text) | |
| # Update the cognitive model from this message | |
| self._update_user_model_from_text(user_text, profile_data=profile_extracted) | |
| # Record history with emotional data | |
| self.history.append({ | |
| "timestep": self.timestep, | |
| "phase": PHASE_COACHING, | |
| "user_input": user_input, | |
| "emotional_inference": emotional_data, | |
| }) | |
| # === Keyword overrides: explicit stop always honored === | |
| lower = user_text.lower() | |
| # "done" must match as a whole word; a bare substring test would | |
| # also fire on e.g. "abandoned". | |
| words = {w.strip(".,!?") for w in lower.split()} | |
| wants_to_stop = "done" in words or any(w in lower for w in [ | |
| "that's enough", "let's stop", "end session", | |
| "i'm good", "goodbye", "wrap up", "finish", | |
| ]) | |
| if wants_to_stop: | |
| self._track_conversation("user", user_text) | |
| result = self._end_session() | |
| result["efe_info"] = {"selected_action": "end_session", "override": "explicit_request"} | |
| self._track_conversation("assistant", result.get("message", "")) | |
| return result | |
| # === Override: explicit request for more action === | |
| wants_more_action = any(w in lower for w in [ | |
| "another step", "next step", "what else", | |
| "give me something", "another exercise", "what now", | |
| ]) | |
| cog_load = self._assess_cognitive_load(user_text, emotional_data=emotional_data) | |
| if wants_more_action and cog_load["coaching_readiness"] != "not_ready": | |
| self._track_conversation("user", user_text) | |
| result = self._propose_next_coaching_step(cognitive_load=cog_load) | |
| result["efe_info"] = {"selected_action": "propose_intervention", "override": "explicit_request"} | |
| result["emotional_state"] = emotional_data | |
| self._track_conversation("assistant", result.get("message", "")) | |
| return result | |
| # === Companion mode: respect off-topic and deflection === | |
| if cog_load["coaching_readiness"] == "not_ready": | |
| # User is off-topic, deflecting, or disengaged — be a companion | |
| logger.info(f"[Coaching] Companion mode triggered (signals={cog_load['signals']}, readiness={cog_load['coaching_readiness']})") | |
| # NOTE: _llm_generate MUST be called BEFORE _track_conversation | |
| # because the generator also appends user_message to the messages list. | |
| # Tracking first would cause the user message to appear twice. | |
| llm_response = self._llm_generate(user_text, cognitive_load=cog_load) | |
| self._track_conversation("user", user_text) | |
| response = llm_response or self._generate_companion_response(user_text) | |
| logger.info(f"[Coaching] Companion response (llm={'yes' if llm_response else 'template'}, len={len(response)})") | |
| self._track_conversation("assistant", response) | |
| return { | |
| "phase": PHASE_COACHING, | |
| "message": response, | |
| "sphere_data": self.get_sphere_data(), | |
| "belief_summary": self.get_belief_summary(), | |
| "emotional_state": emotional_data, | |
| "efe_info": {"selected_action": "companion_chat", "override": "not_coaching_ready"}, | |
| "is_complete": False, | |
| } | |
| # === EFE-driven action selection === | |
| emotion_error_mag = emotional_data.get("error", {}).get("magnitude", 0.0) | |
| valence_belief = self.emotion.belief_valence | |
| action_idx, action_name, efe_info = select_coaching_action( | |
| beliefs=self.beliefs, | |
| model=self.model, | |
| phase="coaching", | |
| timestep=self.timestep, | |
| tom_reliability=self.tom.reliability, | |
| empathy_planner=self.empathy, | |
| tom_filter=self.tom, | |
| target_skill=self.target_skill, | |
| current_intervention=self.current_intervention, | |
| emotion_prediction_error=emotion_error_mag, | |
| emotion_valence_belief=valence_belief, | |
| ) | |
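| # efe_info is a diagnostics dict; the dispatch below keys off | |
| # action_name, and the log line reads efe_info["action_probabilities"] | |
| # (assumed to hold the selector's distribution over actions). | |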
| # Dispatch based on EFE-selected action | |
| logger.info(f"[Coaching] EFE selected: {action_name} (probs={efe_info.get('action_probabilities', {})})") | |
| if action_name == "propose_intervention": | |
| self._track_conversation("user", user_text) | |
| result = self._propose_next_coaching_step(cognitive_load=cog_load) | |
| result["efe_info"] = efe_info | |
| result["emotional_state"] = emotional_data | |
| self._track_conversation("assistant", result.get("message", "")) | |
| return result | |
| elif action_name == "end_session": | |
| self._track_conversation("user", user_text) | |
| result = self._end_session() | |
| result["efe_info"] = efe_info | |
| result["emotional_state"] = emotional_data | |
| self._track_conversation("assistant", result.get("message", "")) | |
| return result | |
| elif action_name == "safety_check": | |
| self._track_conversation("user", user_text) | |
| response = ( | |
| "I want to check in — how are you feeling about all this? " | |
| "Sometimes coaching conversations bring up a lot, and I want to make sure " | |
| "we're going at the right pace for you." | |
| ) | |
| self._track_conversation("assistant", response) | |
| return { | |
| "phase": PHASE_COACHING, | |
| "message": response, | |
| "sphere_data": self.get_sphere_data(), | |
| "belief_summary": self.get_belief_summary(), | |
| "emotional_state": emotional_data, | |
| "efe_info": efe_info, | |
| "is_complete": False, | |
| } | |
| elif action_name == "show_counterfactual": | |
| self._track_conversation("user", user_text) | |
| # Show counterfactual comparison for the current target skill | |
| gentle, push = get_gentle_push_pair(self.target_skill or SKILL_FACTORS[0]) | |
| cf = self.empathy.compute_counterfactual( | |
| gentle.to_dict(), push.to_dict(), self.tom | |
| ) | |
| cf_text = self.empathy.format_counterfactual_text(cf) | |
| llm_response = self._llm_generate( | |
| f"[SYSTEM: Present this counterfactual naturally: {cf_text}]", | |
| cognitive_load=cog_load, | |
| ) | |
| response = llm_response or ( | |
| f"Here's what my model predicts for two approaches:\n\n{cf_text}\n\n" | |
| "What feels more realistic for you?" | |
| ) | |
| self._track_conversation("assistant", response) | |
| return { | |
| "phase": PHASE_COACHING, | |
| "message": response, | |
| "counterfactual": cf, | |
| "sphere_data": self.get_sphere_data(), | |
| "belief_summary": self.get_belief_summary(), | |
| "emotional_state": emotional_data, | |
| "efe_info": efe_info, | |
| "is_complete": False, | |
| } | |
| elif action_name == "reframe": | |
| # Reframing: use LLM with reframe context | |
| llm_response = self._llm_generate(user_text, cognitive_load=cog_load) | |
| self._track_conversation("user", user_text) | |
| if not llm_response: | |
| # Template reframe | |
| probe = self._get_next_probe() | |
| llm_response = ( | |
| "Let me offer a different way to look at this. " | |
| "What you're describing isn't a flaw — it's information about " | |
| "where the friction is. " | |
| ) | |
| if probe: | |
| llm_response += probe | |
| self._track_conversation("assistant", llm_response) | |
| return { | |
| "phase": PHASE_COACHING, | |
| "message": llm_response, | |
| "sphere_data": self.get_sphere_data(), | |
| "belief_summary": self.get_belief_summary(), | |
| "emotional_state": emotional_data, | |
| "efe_info": efe_info, | |
| "is_complete": False, | |
| } | |
| else: | |
| # Default: ask_free_text / adjust_difficulty — conversational response | |
| llm_response = self._llm_generate(user_text, cognitive_load=cog_load) | |
| self._track_conversation("user", user_text) | |
| response = llm_response or self._generate_coaching_response(user_text) | |
| self._track_conversation("assistant", response) | |
| return { | |
| "phase": PHASE_COACHING, | |
| "message": response, | |
| "sphere_data": self.get_sphere_data(), | |
| "belief_summary": self.get_belief_summary(), | |
| "emotional_state": emotional_data, | |
| "efe_info": efe_info, | |
| "is_complete": False, | |
| } | |
| def _get_next_probe(self) -> Optional[str]: | |
| """Get a probing question for the target skill that hasn't been asked yet.""" | |
| skill = self.target_skill or "focus" | |
| probes = COACHING_PROBES.get(skill, []) | |
| for probe in probes: | |
| if probe not in self._probes_asked: | |
| self._probes_asked.append(probe) | |
| return probe | |
| # Fall back to a different skill's probe | |
| sorted_skills = self._get_skill_scores_sorted() | |
| for skill_name, _ in sorted_skills: | |
| probes = COACHING_PROBES.get(skill_name, []) | |
| for probe in probes: | |
| if probe not in self._probes_asked: | |
| self._probes_asked.append(probe) | |
| return probe | |
| return None | |
| def _get_next_exercise(self, skill: Optional[str] = None) -> Optional[str]: | |
| """Get an unused coaching exercise, preferring the given skill | |
| (defaults to the target skill, then falls back across other skills).""" | |
| skill = skill or self.target_skill or "focus" | |
| exercises = COACHING_EXERCISES.get(skill, []) | |
| for ex in exercises: | |
| if ex not in self._exercises_given: | |
| self._exercises_given.append(ex) | |
| return ex | |
| # Fall back to a different skill | |
| sorted_skills = self._get_skill_scores_sorted() | |
| for skill_name, _ in sorted_skills: | |
| exercises = COACHING_EXERCISES.get(skill_name, []) | |
| for ex in exercises: | |
| if ex not in self._exercises_given: | |
| self._exercises_given.append(ex) | |
| return ex | |
| return None | |
| def _propose_next_coaching_step(self, cognitive_load: Optional[Dict] = None) -> Dict[str, Any]: | |
| """Propose the next coaching step via LLM (EFE already decided to propose).""" | |
| # Pick the weakest skill that isn't already the target, then draw | |
| # the exercise for that same skill so the label matches the content. | |
| sorted_skills = self._get_skill_scores_sorted() | |
| next_skill = sorted_skills[0][0] | |
| for skill_name, _ in sorted_skills: | |
| if skill_name != self.target_skill: | |
| next_skill = skill_name | |
| break | |
| exercise = self._get_next_exercise(next_skill) | |
| if exercise: | |
| skill_label = self._label(next_skill) | |
| # Use LLM to present the exercise naturally | |
| system_hint = ( | |
| f"[SYSTEM: Propose this intervention naturally: " | |
| f"Target skill: {skill_label} ({self.model.get_skill_score(self.beliefs.get(next_skill, np.ones(5)/5)):.0f}/100). " | |
| f"Suggestion: \"{exercise}\". " | |
| f"Weave it into the conversation — don't just announce it. " | |
| f"Keep it to 2-3 sentences.]" | |
| ) | |
| llm_response = self._llm_generate(system_hint, cognitive_load=cognitive_load) | |
| message = llm_response or ( | |
| f"Here's something to try — this one targets your {skill_label}: " | |
| f"{exercise} How does that land?" | |
| ) | |
| else: | |
| # Exhausted exercises — probe instead | |
| probe = self._get_next_probe() | |
| message = probe or ( | |
| "We've covered a lot of ground. What feels like the most important thing " | |
| "you're taking away from this conversation?" | |
| ) | |
| return { | |
| "phase": PHASE_COACHING, | |
| "message": message, | |
| "sphere_data": self.get_sphere_data(), | |
| "belief_summary": self.get_belief_summary(), | |
| "is_complete": False, | |
| } | |
| def _generate_coaching_response(self, user_text: str) -> str: | |
| """Generate a coaching response that probes deeper or offers insight.""" | |
| lower = user_text.lower() | |
| # Detect what the user is talking about | |
| is_emotional = any(w in lower for w in [ | |
| "stressed", "anxious", "worried", "frustrated", "stuck", | |
| "hopeless", "lost", "confused", "scared", "tired", | |
| "burned out", "overwhelmed", "depressed", "sad", | |
| ]) | |
| is_reflective = any(w in lower for w in [ | |
| "i think", "i realize", "i notice", "interesting", | |
| "never thought", "makes sense", "you're right", | |
| ]) | |
| is_asking = any(w in lower for w in [ | |
| "why", "how", "what should", "what do", "can you", | |
| "tell me", "explain", | |
| ]) | |
| mentions_work = any(w in lower for w in [ | |
| "work", "job", "career", "boss", "colleague", "deadline", | |
| "project", "meeting", | |
| ]) | |
| mentions_personal = any(w in lower for w in [ | |
| "family", "relationship", "friend", "partner", "home", | |
| "health", "sleep", "exercise", | |
| ]) | |
| if is_emotional: | |
| # Empathize first, then probe gently | |
| probe = self._get_next_probe() | |
| response = ( | |
| "I hear that, and I don't want to brush past it. What you're feeling " | |
| "is information — it tells us something about where the friction is. " | |
| ) | |
| if probe: | |
| response += f"Let me ask you this: {probe}" | |
| else: | |
| response += "What do you think is the root of that feeling?" | |
| return response | |
| elif is_reflective: | |
| # Reinforce the insight and build on it | |
| response = ( | |
| "That's a really useful observation. The ability to see your own patterns " | |
| "is exactly what makes change possible. " | |
| ) | |
| exercise = self._get_next_exercise() | |
| if exercise: | |
| response += ( | |
| f"Building on that insight, here's something you could try: {exercise}" | |
| ) | |
| else: | |
| probe = self._get_next_probe() | |
| if probe: | |
| response += f"Let's go deeper. {probe}" | |
| return response | |
| elif is_asking: | |
| # Answer based on the sphere data and ToM | |
| sorted_skills = self._get_skill_scores_sorted() | |
| weakest = sorted_skills[0] | |
| strongest = sorted_skills[-1] | |
| return ( | |
| f"Based on your patterns, your biggest opportunity is in " | |
| f"{self._label(weakest[0])} ({round(weakest[1])}/100). But your " | |
| f"{self._label(strongest[0])} ({round(strongest[1])}/100) shows you already " | |
| f"have real capacity. The question isn't whether you can change — you can. " | |
| f"It's about finding the right entry point and making it small enough to actually stick. " | |
| f"What specifically would you like to know more about?" | |
| ) | |
| elif mentions_work: | |
| # Probe into the work context | |
| probe = self._get_next_probe() | |
| response = ( | |
| "Work is often where these patterns show up most clearly. " | |
| "The stakes are higher, the structure is external, and the pressure is real. " | |
| ) | |
| if probe: | |
| response += probe | |
| else: | |
| response += ( | |
| "Can you tell me about a specific situation at work where you felt stuck or frustrated?" | |
| ) | |
| return response | |
| elif mentions_personal: | |
| # Acknowledge and explore | |
| response = ( | |
| "The personal side matters a lot — these patterns don't stay at work. " | |
| "They show up in relationships, health, everything. " | |
| ) | |
| probe = self._get_next_probe() | |
| if probe: | |
| response += probe | |
| else: | |
| response += "What's the connection you're seeing between this and your personal life?" | |
| return response | |
| else: | |
| # General coaching response — probe or exercise | |
| if self._coaching_turns % 3 == 0: | |
| # Every third turn, offer an exercise | |
| exercise = self._get_next_exercise() | |
| if exercise: | |
| return ( | |
| f"Thanks for sharing that. Here's something concrete you can do with that: " | |
| f"{exercise}\n\nBut I also want to understand you better. " | |
| f"What would change in your life if you got this right?" | |
| ) | |
| # Default: ask a probing question | |
| probe = self._get_next_probe() | |
| if probe: | |
| return ( | |
| f"That helps me understand where you're coming from. " | |
| f"Let me dig a bit deeper: {probe}" | |
| ) | |
| return ( | |
| "I appreciate you sharing that. Let's keep exploring — " | |
| "what feels like the most important thing right now? Is there something " | |
| "specific you'd like to work on, or should I suggest another step?" | |
| ) | |
| def _generate_companion_response(self, user_text: str) -> str: | |
| """Template fallback when user is off-topic and LLM is unavailable. | |
| Instead of forcing coaching, acknowledges what the user said and | |
| follows their lead — companion first, coach second. | |
| """ | |
| lower = user_text.lower() | |
| # Detect explicit deflection from coaching | |
| deflecting = any(w in lower for w in [ | |
| "don't want to talk about", "not want to talk", | |
| "something else", "never mind", "forget it", | |
| "not right now", "not now", "moving on", | |
| ]) | |
| if deflecting: | |
| return ( | |
| "No pressure at all — we can talk about whatever you want. " | |
| "What's on your mind?" | |
| ) | |
| # Detect overwhelm / disengagement | |
| overwhelmed = any(w in lower for w in [ | |
| "overwhelmed", "too much", "can't handle", "exhausted", | |
| ]) | |
| if overwhelmed: | |
| return ( | |
| "Hey, let's slow down. We don't have to push through anything right now. " | |
| "What would feel good to talk about instead?" | |
| ) | |
| # User brought up a non-coaching topic — engage with it | |
| return ( | |
| "I'm happy to chat about that! " | |
| "We can always come back to the other stuff whenever — or not. " | |
| "Tell me more." | |
| ) | |
| def _end_session(self) -> Dict[str, Any]: | |
| """End the coaching session with a summary.""" | |
| sorted_skills = self._get_skill_scores_sorted() | |
| weakest = self._label(sorted_skills[0][0]) | |
| strongest = self._label(sorted_skills[-1][0]) | |
| # Try LLM for a natural wrap-up | |
| llm_context = ( | |
| f"[SYSTEM: The user wants to end the session. " | |
| f"Strongest area: {strongest}. Growth area: {weakest}. " | |
| ) | |
| if self._accepted_interventions: | |
| last_step = self._accepted_interventions[-1].get("description", "") | |
| pred = self.tom.predict_response_gated(self._accepted_interventions[-1]) | |
| p_complete = pred.get("p_accept", 0.5) | |
| llm_context += ( | |
| f"They committed to: \"{last_step}\" " | |
| f"(predicted {round(p_complete * 100)}% follow-through). " | |
| ) | |
| llm_context += "Wrap up warmly and briefly. Reference things from the conversation.]" | |
| llm_response = self._llm_generate(llm_context) | |
| if not llm_response: | |
| # Template fallback | |
| parts = ["Let's wrap up. Here's what I'm taking away from our conversation:"] | |
| parts.append( | |
| f"Your biggest strength is {strongest} — that's your foundation. " | |
| f"Your biggest growth area is {weakest}." | |
| ) | |
| if self._accepted_interventions: | |
| last_step = self._accepted_interventions[-1].get("description", "") | |
| pred = self.tom.predict_response_gated(self._accepted_interventions[-1]) | |
| p_complete = pred.get("p_accept", 0.5) | |
| parts.append( | |
| f"Your committed step: \"{last_step}\" — " | |
| f"I predict a {round(p_complete * 100)}% chance you'll follow through." | |
| ) | |
| parts.append( | |
| "The fact that you showed up and went through this process tells me something important " | |
| "about you. You're not just thinking about change — you're doing something about it. " | |
| "Come back anytime to check in." | |
| ) | |
| llm_response = " ".join(parts) | |
| return { | |
| "phase": PHASE_COMPLETE, | |
| "message": llm_response, | |
| "sphere_data": self.get_sphere_data(), | |
| "belief_summary": self.get_belief_summary(), | |
| "user_type_summary": self.tom.get_user_type_summary(), | |
| "is_complete": True, | |
| } | |
| # ------------------------------------------------------------------------- | |
| # CONVERSATIONAL RESPONSE GENERATION | |
| # ------------------------------------------------------------------------- | |
| def _get_skill_scores_sorted(self) -> List[Tuple[str, float]]: | |
| """Get skill scores sorted low→high.""" | |
| scores = self.model.get_all_skill_scores(self.beliefs) | |
| return sorted(scores.items(), key=lambda x: x[1]) | |
| def _label(self, skill: str) -> str: | |
| """Human-readable skill name.""" | |
| return SKILL_LABELS.get(skill, skill.replace("_", " ").title()) | |
| def _generate_sphere_commentary(self) -> str: | |
| """Generate personalized commentary about the user's sphere.""" | |
| sorted_skills = self._get_skill_scores_sorted() | |
| weakest = sorted_skills[:2] | |
| strongest = sorted_skills[-2:] | |
| bottlenecks = self.dep_graph.find_bottlenecks( | |
| {k: v for k, v in self.beliefs.items() if k in SKILL_FACTORS} | |
| ) | |
| # Try LLM first — give it the full sphere data to work with | |
| gen = self.generator | |
| logger.info(f"[LLM] Generating sphere commentary (generator={'available' if gen else 'None'})") | |
| if gen is not None: | |
| try: | |
| from ..llm.generator import build_system_prompt | |
| # Build a rich data summary for the LLM | |
| skill_lines = [] | |
| for skill_name, score in sorted_skills: | |
| skill_lines.append(f" - {self._label(skill_name)}: {round(score)}/100") | |
| sphere_context = "Here are the user's skill scores:\n" + "\n".join(skill_lines) | |
| if bottlenecks: | |
| bn = bottlenecks[0] | |
| blocker = self._label(bn["blocker"]) | |
| blocked_names = [self._label(b) for b in bn["blocked"][:2]] | |
| sphere_context += ( | |
| f"\n\nKey insight: {blocker} is a bottleneck — " | |
| f"it's holding back {' and '.join(blocked_names)}." | |
| ) | |
| system_prompt = build_system_prompt( | |
| phase="sphere_commentary", | |
| belief_summary=self.get_belief_summary(), | |
| tom_summary=self.tom.get_user_type_summary(), | |
| ) | |
| messages = [{"role": "system", "content": system_prompt}] | |
| # Include conversation history from calibration | |
| for msg in self.conversation_history[-6:]: | |
| messages.append({"role": msg["role"], "content": msg["content"]}) | |
| messages.append({ | |
| "role": "user", | |
| "content": ( | |
| f"[SYSTEM: First briefly acknowledge their last answer (1 sentence), then share " | |
| f"what the sphere shows — like a perceptive friend noticing patterns, not a report. " | |
| f"This should be ONE cohesive response, not two separate parts. " | |
| f"Data for reference (don't recite it all): {sphere_context}]" | |
| ), | |
| }) | |
| response = gen.client.chat_completion( | |
| messages=messages, | |
| temperature=0.7, | |
| max_tokens=300, | |
| model_override=None, # uses LLM_MODEL default | |
| ) | |
| if response and response.strip(): | |
| logger.info(f"[LLM] Sphere commentary generated ({len(response)} chars)") | |
| return response.strip() | |
| logger.warning("[LLM] Empty sphere commentary from LLM") | |
| except Exception as e: | |
| logger.warning(f"[LLM] Sphere commentary failed: {e}") | |
| logger.info("[LLM] Using template sphere commentary") | |
| # Template fallback | |
| parts = [ | |
| "Here's your MindSphere — a snapshot of your patterns across eight areas." | |
| ] | |
| s1_name, s1_score = strongest[-1] | |
| s2_name, s2_score = strongest[-2] | |
| parts.append( | |
| f"Your strongest areas are {self._label(s1_name)} ({round(s1_score)}/100) " | |
| f"and {self._label(s2_name)} ({round(s2_score)}/100) — that's real foundation to build on." | |
| ) | |
| w1_name, w1_score = weakest[0] | |
| w2_name, w2_score = weakest[1] | |
| parts.append( | |
| f"The biggest dents are in {self._label(w1_name)} ({round(w1_score)}/100) " | |
| f"and {self._label(w2_name)} ({round(w2_score)}/100)." | |
| ) | |
| if bottlenecks: | |
| bn = bottlenecks[0] | |
| blocker = self._label(bn["blocker"]) | |
| blocked_names = [self._label(b) for b in bn["blocked"][:2]] | |
| parts.append( | |
| f"Interestingly, {blocker} is acting as a bottleneck — " | |
| f"it's holding back your {' and '.join(blocked_names)}. " | |
| f"That means improving it would have a ripple effect." | |
| ) | |
| parts.append( | |
| "Does this match how you see yourself? I'm curious what stands out to you." | |
| ) | |
| return " ".join(parts) | |
| def _respond_to_sphere_reaction(self, user_text: str) -> str: | |
| """Respond to the user's reaction to their sphere.""" | |
| lower = user_text.lower() | |
| # Detect sentiment/intent | |
| # Match single tokens as whole words; substring tests would fire on | |
| # "disagree" for "agree", "incorrect" for "correct", "know" for "no". | |
| words = {w.strip(".,!?") for w in lower.split()} | |
| agrees = bool(words & { | |
| "yes", "yeah", "accurate", "right", "true", "correct", | |
| "agree", "agreed", | |
| }) or any(w in lower for w in [ | |
| "makes sense", "spot on", "that's me", "sounds right", | |
| ]) | |
| disagrees = bool(words & { | |
| "no", "off", "wrong", "disagree", "inaccurate", "surprised", | |
| }) or any(w in lower for w in [ | |
| "don't think", "not really", "doesn't seem", | |
| ]) | |
| asks_why = any(w in lower for w in [ | |
| "why", "how come", "explain", "what does", "what do you mean", | |
| ]) | |
| mentions_stress = any(w in lower for w in [ | |
| "stress", "anxious", "overwhelm", "burned", "tired", | |
| "exhausted", "struggling", | |
| ]) | |
| mentions_focus = any(w in lower for w in [ | |
| "focus", "distract", "attention", "concentrate", | |
| ]) | |
| sorted_skills = self._get_skill_scores_sorted() | |
| weakest_name = self._label(sorted_skills[0][0]) | |
| weakest_score = round(sorted_skills[0][1]) | |
| if agrees: | |
| return ( | |
| f"Good — that self-awareness is genuinely useful. The fact that you can see " | |
| f"these patterns clearly means you're already ahead of where most people start. " | |
| f"Let me show you what I think would make the biggest difference right now." | |
| ) | |
| elif disagrees: | |
| # Acknowledge and show openness to being wrong | |
| return ( | |
| f"That's important feedback — my model is built from your answers, but you know " | |
| f"yourself better than ten questions can capture. Which part feels off? " | |
| f"I can adjust my understanding as we talk." | |
| ) | |
| elif asks_why: | |
| bottlenecks = self.dep_graph.find_bottlenecks( | |
| {k: v for k, v in self.beliefs.items() if k in SKILL_FACTORS} | |
| ) | |
| if bottlenecks: | |
| bn = bottlenecks[0] | |
| blocker = self._label(bn["blocker"]) | |
| blocked = [self._label(b) for b in bn["blocked"][:2]] | |
| return ( | |
| f"The scores come from how you answered the calibration questions — " | |
| f"each answer shifts my estimate of where you sit on each skill. " | |
| f"The dependency analysis shows that {blocker} is a leverage point because " | |
| f"it feeds into {' and '.join(blocked)}. " | |
| f"Think of it like a supply chain — a bottleneck upstream affects everything downstream." | |
| ) | |
| return ( | |
| f"The scores come from your calibration answers — each one shifts my estimate " | |
| f"of where you sit across these eight dimensions. The dents show where " | |
| f"there's the most room for movement." | |
| ) | |
| elif mentions_stress: | |
| return ( | |
| f"Stress is real, and it touches a lot of these dimensions — especially " | |
| f"Emotional Regulation and Focus. The good news is that working on even one " | |
| f"of these can take pressure off the others. You mentioned being stressed before, " | |
| f"and I factored that into my model of you. Let me suggest something small " | |
| f"that might help." | |
| ) | |
| elif mentions_focus: | |
| focus_score = round(self.model.get_skill_score(self.beliefs.get("focus", self.model.D["focus"]))) | |
| return ( | |
| f"Focus came in at {focus_score}/100 — and from what you told me, that tracks. " | |
| f"The interesting thing is that focus isn't just about willpower. It often connects " | |
| f"to other patterns — like how clear your tasks are, or how you handle interruptions. " | |
| f"Let me show you what I think would help most." | |
| ) | |
| else: | |
| # Generic thoughtful response | |
| return ( | |
| f"Thanks for sharing that. Based on everything you've told me, " | |
| f"I think the most impactful place to start is {weakest_name} — " | |
| f"it's currently at {weakest_score}/100, and improving it would " | |
| f"unlock progress in other areas too. Want me to suggest a concrete first step?" | |
| ) | |
| def _generate_plan_message(self) -> str: | |
| """Generate personalized planning message for the first intervention.""" | |
| if not self.current_intervention or not self.target_skill: | |
| return PLAN_INTRO | |
| skill_label = self._label(self.target_skill) | |
| score = round(self.model.get_skill_score( | |
| self.beliefs.get(self.target_skill, self.model.D.get(self.target_skill, np.array([0.2]*5))) | |
| )) | |
| # Check ToM predictions to calibrate tone | |
| pred = self.tom.predict_response_gated(self.current_intervention.to_dict()) | |
| p_accept = pred.get("p_accept", 0.5) | |
| # Check if there's a dependency explanation | |
| bottlenecks = self.dep_graph.find_bottlenecks( | |
| {k: v for k, v in self.beliefs.items() if k in SKILL_FACTORS} | |
| ) | |
| # Try LLM first | |
| llm_response = self._llm_generate( | |
| f"[SYSTEM: Propose this intervention naturally: " | |
| f"Target skill: {skill_label} ({score}/100). " | |
| f"Suggestion: \"{self.current_intervention.description}\" " | |
| f"({self.current_intervention.duration_minutes} min, " | |
| f"difficulty {self.current_intervention.difficulty}). " | |
| f"Predicted acceptance: {round(p_accept * 100)}%. " | |
| f"Weave it into conversation naturally.]" | |
| ) | |
| if llm_response: | |
| return llm_response | |
| # Template fallback | |
| parts = [] | |
| if bottlenecks and bottlenecks[0]["blocker"] == self.target_skill: | |
| blocked = [self._label(b) for b in bottlenecks[0]["blocked"][:2]] | |
| parts.append( | |
| f"I'm starting with {skill_label} because it's your biggest leverage point right now — " | |
| f"at {score}/100, it's holding back your {' and '.join(blocked)}." | |
| ) | |
| else: | |
| parts.append( | |
| f"I'm starting with {skill_label} ({score}/100) because " | |
| f"I think it's where a small change would make the biggest difference." | |
| ) | |
| parts.append( | |
| f'Here\'s what I have in mind: "{self.current_intervention.description}"' | |
| ) | |
| if p_accept < 0.4: | |
| parts.append( | |
| "I know this might feel like a stretch — and that's okay. " | |
| "It's designed to be small enough that you can try it without committing to anything big." | |
| ) | |
| elif p_accept > 0.7: | |
| parts.append( | |
| "I think this is right in your sweet spot — challenging enough to matter, " | |
| "small enough to actually happen." | |
| ) | |
| parts.append( | |
| "What do you think? You can also tell me if it feels too much or not relevant, " | |
| "and I'll adjust." | |
| ) | |
| return " ".join(parts) | |
| def _respond_to_planning_chat(self, user_text: str) -> Dict[str, Any]: | |
| """Respond to free-text during the planning phase.""" | |
| lower = user_text.lower() | |
| # Detect implicit acceptance or rejection | |
| # Match short tokens as whole words; substring tests would fire on | |
| # "yesterday" for "yes", "broke" for "ok", etc. | |
| words = {w.strip(".,!?") for w in lower.split()} | |
| positive = bool(words & {"sure", "ok", "okay", "yes", "yeah"}) or any( | |
| w in lower for w in [ | |
| "sounds good", "let's do it", "i'll try", "let's go", | |
| "i can do that", "worth a shot", | |
| ] | |
| ) | |
| negative = any(w in lower for w in [ | |
| "too hard", "too much", "can't", "won't work", "not for me", | |
| "impossible", "no way", "overwhelming", | |
| ]) | |
| irrelevant = any(w in lower for w in [ | |
| "not relevant", "doesn't apply", "not my issue", "wrong area", | |
| "not the problem", | |
| ]) | |
| if positive: | |
| return self._process_choice("accept") | |
| elif negative: | |
| return self._process_choice("too_hard") | |
| elif irrelevant: | |
| return self._process_choice("not_relevant") | |
| # For everything else (questions, off-topic, general chat) — use LLM | |
| llm_response = self._llm_generate(user_text) | |
| if llm_response: | |
| return { | |
| "phase": PHASE_PLANNING, | |
| "message": llm_response, | |
| "sphere_data": self.get_sphere_data(), | |
| "belief_summary": self.get_belief_summary(), | |
| "is_complete": False, | |
| } | |
| # Template fallback | |
| asks_question = any(w in lower for w in [ | |
| "why", "how", "what if", "explain", "tell me more", | |
| ]) | |
| if asks_question: | |
| response = self._explain_intervention() | |
| else: | |
| response = self._respond_general_chat(user_text, PHASE_PLANNING) | |
| return { | |
| "phase": PHASE_PLANNING, | |
| "message": response, | |
| "sphere_data": self.get_sphere_data(), | |
| "belief_summary": self.get_belief_summary(), | |
| "is_complete": False, | |
| } | |
| def _respond_to_update_chat(self, user_text: str) -> Dict[str, Any]: | |
| """Respond to free-text during the update phase.""" | |
| lower = user_text.lower() | |
| # Update cognitive model | |
| self._update_user_model_from_text(user_text) | |
| # Match short tokens as whole words; a bare substring test for "no" | |
| # would also fire on "know", "now", "not", etc. | |
| words = {w.strip(".,!?") for w in lower.split()} | |
| positive = bool(words & {"sure", "ok", "okay", "yes", "yeah"}) or any( | |
| w in lower for w in ["sounds good", "i'll try"] | |
| ) | |
| negative = "no" in words or any( | |
| w in lower for w in ["too hard", "too much", "can't"] | |
| ) | |
| irrelevant = any(w in lower for w in [ | |
| "not relevant", "doesn't apply", "wrong area", | |
| ]) | |
| if positive: | |
| return self._process_choice("accept") | |
| elif negative: | |
| return self._process_choice("too_hard") | |
| elif irrelevant: | |
| return self._process_choice("not_relevant") | |
| # Try LLM for natural conversation | |
| llm_response = self._llm_generate(user_text) | |
| response = llm_response or self._respond_general_chat(user_text, PHASE_UPDATE) | |
| return { | |
| "phase": PHASE_UPDATE, | |
| "message": response, | |
| "sphere_data": self.get_sphere_data(), | |
| "belief_summary": self.get_belief_summary(), | |
| "is_complete": False, | |
| } | |
| def _explain_intervention(self) -> str: | |
| """Explain why the current intervention was chosen.""" | |
| if not self.current_intervention or not self.target_skill: | |
| return "I chose this based on where I think a small change would make the biggest impact." | |
| skill_label = self._label(self.target_skill) | |
| score = round(self.model.get_skill_score( | |
| self.beliefs.get(self.target_skill, self.model.D.get(self.target_skill, np.array([0.2]*5))) | |
| )) | |
| # Check dependency | |
| bottlenecks = self.dep_graph.find_bottlenecks( | |
| {k: v for k, v in self.beliefs.items() if k in SKILL_FACTORS} | |
| ) | |
| parts = [ | |
| f"{skill_label} is at {score}/100 in your sphere." | |
| ] | |
| if bottlenecks and bottlenecks[0]["blocker"] == self.target_skill: | |
| blocked = [self._label(b) for b in bottlenecks[0]["blocked"][:2]] | |
| parts.append( | |
| f"It's also a bottleneck — it's limiting your {' and '.join(blocked)}. " | |
| f"So improving it has a multiplier effect." | |
| ) | |
| # ToM insight | |
| pred = self.tom.predict_response_gated(self.current_intervention.to_dict()) | |
| p_accept = pred.get("p_accept", 0.5) | |
| parts.append( | |
| f"I picked this specific step because my model predicts about a " | |
| f"{round(p_accept * 100)}% chance you'll actually follow through on it — " | |
| f"and that matters more than ambition." | |
| ) | |
| parts.append("Does that make sense?") | |
| return " ".join(parts) | |
| def _generate_acceptance_message(self) -> str: | |
| """Generate encouragement when user accepts an intervention.""" | |
| if not self.current_intervention: | |
| return "Great choice. Let's see how it goes." | |
| skill_label = self._label(self.current_intervention.target_skill) | |
| duration = self.current_intervention.duration_minutes | |
| pred = self.tom.predict_response_gated(self.current_intervention.to_dict()) | |
| p_complete = pred.get("p_accept", 0.6) | |
| # Try LLM first | |
| llm_response = self._llm_generate( | |
| f"[SYSTEM: The user just accepted this step: " | |
| f"\"{self.current_intervention.description}\" " | |
| f"({round(duration)} min). " | |
| f"Predicted follow-through: {round(p_complete * 100)}%. " | |
| f"Encourage them naturally and transition into coaching conversation.]" | |
| ) | |
| if llm_response: | |
| return llm_response | |
| # Template fallback | |
| parts = [f"Good. {self.current_intervention.description}"] | |
| if duration <= 5: | |
| parts.append( | |
| f"It's only {round(duration)} minutes — the point isn't to change everything, " | |
| f"it's to prove to yourself that you can shift the pattern." | |
| ) | |
| else: | |
| parts.append( | |
| f"Block out {round(duration)} minutes for this. " | |
| f"You don't need to do it perfectly, just do it." | |
| ) | |
| parts.append( | |
| f"Based on what I know about you, I predict a {round(p_complete * 100)}% chance " | |
| f"you'll follow through. Let's check in next time and see how it went." | |
| ) | |
| return " ".join(parts) | |
| def _generate_adaptation_message(self, rejection_reason: str) -> str: | |
| """Generate personalized message when adapting after rejection.""" | |
| # Try LLM first | |
| if self.current_intervention: | |
| llm_response = self._llm_generate( | |
| f"[SYSTEM: The user rejected the previous suggestion as '{rejection_reason}'. " | |
| f"New suggestion: \"{self.current_intervention.description}\" " | |
| f"({round(self.current_intervention.duration_minutes)} min). " | |
| f"Acknowledge their feedback warmly and present the alternative naturally.]" | |
| ) | |
| if llm_response: | |
| return llm_response | |
| # Template fallback | |
| if rejection_reason == "too_hard": | |
| if self.current_intervention: | |
| return ( | |
| f"I hear you — that was too much. That's useful information for me. " | |
| f"Let me try something smaller: \"{self.current_intervention.description}\" " | |
| f"This should take about {round(self.current_intervention.duration_minutes)} minutes. " | |
| f"How does that feel?" | |
| ) | |
| return ( | |
| "I hear you — that was too much. Let me find something smaller " | |
| "that still moves the needle." | |
| ) | |
| elif rejection_reason == "not_relevant": | |
| skill_label = self._label(self.target_skill) if self.target_skill else "a different area" | |
| if self.current_intervention: | |
| return ( | |
| f"Fair enough — let me look at {skill_label} instead. " | |
| f"Here's what I have in mind: \"{self.current_intervention.description}\" " | |
| f"Does this connect better to what you're actually dealing with?" | |
| ) | |
| return ( | |
| f"Fair enough. Let me find something that connects better to what " | |
| f"actually matters to you." | |
| ) | |
| return "Let me try a different approach." | |
| # ------------------------------------------------------------------------- | |
| # COGNITIVE MODELING — continuous user model updates from conversation | |
| # ------------------------------------------------------------------------- | |
| def _update_user_model_from_text(self, user_text: str, profile_data=None) -> None: | |
| """ | |
| Update the cognitive model of the user from conversational signals. | |
| This goes beyond explicit choices — it infers emotional state, interests, | |
| engagement patterns, and updates the ToM particle filter accordingly. | |
| The agent is always building a richer picture of who this person is. | |
| Also extracts structured profile facts and updates the Bayesian network. | |
| Args: | |
| profile_data: Pre-extracted profile dict from batched LLM call. | |
| If provided, skips the separate LLM extraction call. | |
| """ | |
| # Extract profile facts, causal links, and progress signals | |
| if profile_data is not None and profile_data.get("facts"): | |
| # Use pre-extracted data from batched call (no extra LLM call) | |
| extraction = self.profile.store_extracted_data( | |
| data=profile_data, | |
| turn=self.timestep, | |
| ) | |
| else: | |
| # Fallback: separate LLM call for profile extraction | |
| extraction = self.profile.extract_and_store( | |
| user_text=user_text, | |
| turn=self.timestep, | |
| classifier=self.classifier, | |
| context=self._get_recent_context(), | |
| ) | |
| # Apply profile signals and progress to POMDP skill beliefs | |
| self._apply_skill_signals(extraction) | |
| lower = user_text.lower() | |
| # --- Infer emotional state --- | |
| emotional_signals = { | |
| "stressed": ["stressed", "stress", "anxious", "anxiety", "worried"], | |
| "frustrated": ["frustrated", "annoying", "annoyed", "ugh", "hate"], | |
| "sad": ["sad", "depressed", "hopeless", "lonely", "lost"], | |
| "excited": ["excited", "pumped", "can't wait", "stoked"], | |
| "calm": ["calm", "peaceful", "relaxed", "good"], | |
| "bored": ["bored", "boring", "meh", "whatever"], | |
| "overwhelmed": ["overwhelmed", "too much", "drowning", "buried"], | |
| "curious": ["curious", "wonder", "interesting", "tell me"], | |
| } | |
| detected_emotions = [] | |
| for emotion, keywords in emotional_signals.items(): | |
| if any(w in lower for w in keywords): | |
| detected_emotions.append(emotion) | |
| # --- Infer topics of interest --- | |
| topic_signals = { | |
| "work": ["work", "job", "career", "boss", "colleague", "office", "meeting", "deadline", "project"], | |
| "relationships": ["partner", "friend", "family", "relationship", "dating", "marriage"], | |
| "health": ["health", "exercise", "gym", "diet", "sleep", "energy", "body"], | |
| "creativity": ["creative", "art", "music", "writing", "design", "ideas"], | |
| "learning": ["learn", "study", "book", "course", "skill", "education"], | |
| "finances": ["money", "budget", "savings", "debt", "financial", "income"], | |
| "identity": ["who am i", "purpose", "meaning", "values", "identity", "authentic"], | |
| } | |
| detected_topics = [] | |
| for topic, keywords in topic_signals.items(): | |
| if any(w in lower for w in keywords): | |
| detected_topics.append(topic) | |
| # --- Update particle filter from conversational signals --- | |
| # The particle filter normally only updates from explicit choices. | |
| # Here we do soft updates based on inferred signals to expand the model. | |
| if detected_emotions: | |
| self._soft_update_tom_from_emotions(detected_emotions) | |
| # --- Store inferred state in history --- | |
| if detected_emotions or detected_topics: | |
| self.history.append({ | |
| "timestep": self.timestep, | |
| "type": "cognitive_model_update", | |
| "emotions": detected_emotions, | |
| "topics": detected_topics, | |
| "user_text_snippet": user_text[:100], | |
| }) | |
| def _apply_skill_signals(self, extraction: Dict[str, Any]) -> None: | |
| """ | |
| Apply profile and progress signals to update POMDP skill beliefs. | |
| This closes the feedback loop: facts extracted from conversation | |
| (breakup, progress reports, challenges) actually shift the skill | |
| beliefs, so the model evolves throughout the session. | |
| Two sources of signals: | |
| 1. Profile Bayesian network — inferred states affect skill beliefs | |
| (e.g., breakup → emotional_stress → emotional_reg impaired) | |
| 2. Progress signals — user reports improvement or regression | |
| (e.g., "I've been focusing better" → focus belief shifts up) | |
| """ | |
| # --- 1. Apply Bayesian network skill impacts --- | |
| bn_impacts = self.profile.bayes_net.get_skill_impacts() | |
| for skill, impact in bn_impacts.items(): | |
| if skill not in self.beliefs or skill not in SKILL_FACTORS: | |
| continue | |
| if abs(impact) < 0.03: | |
| continue # Too small to matter | |
| belief = self.beliefs[skill] | |
| n_levels = len(belief) | |
| # Shift belief: positive impact → shift toward higher levels, | |
| # negative impact → shift toward lower levels | |
| shift_strength = min(abs(impact), 0.3) # Cap the shift | |
| if impact > 0: | |
| # Shift probability mass toward higher skill levels | |
| target = np.zeros(n_levels) | |
| target[-1] = 0.5 # Weight toward high | |
| target[-2] = 0.3 | |
| target[n_levels // 2] = 0.2 | |
| else: | |
| # Shift probability mass toward lower skill levels | |
| target = np.zeros(n_levels) | |
| target[0] = 0.5 # Weight toward low | |
| target[1] = 0.3 | |
| target[n_levels // 2] = 0.2 | |
| target = normalize(target) | |
| # Soft blend: belief = (1 - alpha) * belief + alpha * target | |
| alpha = shift_strength * 0.15 # Gentle: max ~4.5% blend per message | |
| self.beliefs[skill] = normalize( | |
| (1.0 - alpha) * belief + alpha * target | |
| ) | |
| # --- 2. Apply progress signals (user-reported improvement/regression) --- | |
| progress_signals = extraction.get("progress_signals", []) | |
| for signal in progress_signals: | |
| skill = signal.get("skill") | |
| direction = signal.get("direction") | |
| magnitude = signal.get("magnitude", 0.2) | |
| if skill not in self.beliefs or skill not in SKILL_FACTORS: | |
| continue | |
| belief = self.beliefs[skill] | |
| n_levels = len(belief) | |
| # Progress signals are stronger than passive BN impacts — | |
| # user is explicitly reporting behavioral change | |
| shift_strength = min(magnitude, 0.5) | |
| if direction == "improvement": | |
| # Shift toward higher levels | |
| target = np.zeros(n_levels) | |
| for i in range(n_levels): | |
| target[i] = (i + 1) / n_levels # Linear ramp up | |
| else: | |
| # Shift toward lower levels | |
| target = np.zeros(n_levels) | |
| for i in range(n_levels): | |
| target[i] = (n_levels - i) / n_levels # Linear ramp down | |
| target = normalize(target) | |
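| # e.g. n_levels=5, "improvement": raw ramp [.2,.4,.6,.8,1.0] | |
| # normalizes to ≈ [.067,.133,.200,.267,.333], mass tilted upward. | |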
| # Stronger blend for explicit progress reports | |
| alpha = shift_strength * 0.25 # Up to 12.5% blend | |
| self.beliefs[skill] = normalize( | |
| (1.0 - alpha) * belief + alpha * target | |
| ) | |
| logger.info( | |
| f"[Learning] Skill '{skill}' belief shifted ({direction}, " | |
| f"magnitude={magnitude:.2f}): {signal.get('evidence', '')}" | |
| ) | |
| def _soft_update_tom_from_emotions(self, emotions: List[str]) -> None: | |
| """ | |
| Soft-update the ToM particle filter based on emotional signals. | |
| Instead of a hard Bayesian update (which requires explicit choice observations), | |
| this applies a gentle bias to the particle weights based on what emotions | |
| tell us about the user's type dimensions. | |
| For example: "overwhelmed" suggests low overwhelm_threshold (dimension 6). | |
| "bored" with structured tasks suggests high novelty_seeking (dimension 2). | |
| """ | |
| # Map emotions to particle dimension biases | |
| # Each entry: (dimension_index, direction, strength) | |
| # direction > 0 means "this emotion suggests HIGH value on that dimension" | |
| # direction < 0 means "this emotion suggests LOW value" | |
| emotion_dim_map = { | |
| "overwhelmed": [(6, -1, 0.15)], # low overwhelm_threshold | |
| "stressed": [(6, -1, 0.10)], # low overwhelm_threshold | |
| "bored": [(2, 1, 0.10)], # high novelty_seeking | |
| "frustrated": [(5, 1, 0.08)], # high autonomy_sensitivity | |
| "excited": [(2, 1, 0.05), (6, 1, 0.05)], # novelty + high threshold | |
| "curious": [(2, 1, 0.05)], # novelty_seeking | |
| } | |
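| # Worked example: "overwhelmed" maps to (dim 6, direction -1, | |
| # strength 0.15); a particle with overwhelm_threshold 0.1 gets | |
| # likelihood 0.5 + 0.15*(1-0.1) = 0.635, one at 0.9 gets 0.515, | |
| # so low-threshold particles gain relative weight on renormalization. | |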
| for emotion in emotions: | |
| biases = emotion_dim_map.get(emotion, []) | |
| for dim_idx, direction, strength in biases: | |
| # Apply soft weight update: particles that match the signal get | |
| # upweighted. Vectorized over all particles at once. | |
| vals = self.tom.particle_params[:, dim_idx] | |
| if direction > 0: | |
| # Upweight particles with high value on this dimension | |
| likelihood = 0.5 + strength * vals | |
| else: | |
| # Upweight particles with low value on this dimension | |
| likelihood = 0.5 + strength * (1.0 - vals) | |
| self.tom.particle_weights *= likelihood | |
| # Renormalize | |
| total = np.sum(self.tom.particle_weights) | |
| if total > 0: | |
| self.tom.particle_weights /= total | |
| else: | |
| self.tom.particle_weights = np.ones(self.tom.n_particles) / self.tom.n_particles | |
| # Invalidate reliability cache | |
| self.tom._reliability_cache = None | |
| self.tom._confidence_cache = None | |
| def _get_recent_context(self) -> str: | |
| """Get recent conversation context for profile extraction.""" | |
| if not self.conversation_history: | |
| return "" | |
| recent = self.conversation_history[-4:] | |
| return " | ".join(f"{m['role']}: {m['content'][:80]}" for m in recent) | |
| def get_inferred_user_state(self) -> Dict[str, Any]: | |
| """ | |
| Get the current inferred cognitive/emotional state of the user. | |
| Aggregates recent conversation signals into a summary the LLM can use. | |
| """ | |
| # Get recent emotion/topic history | |
| recent_updates = [ | |
| h for h in self.history[-10:] | |
| if h.get("type") == "cognitive_model_update" | |
| ] | |
| recent_emotions = [] | |
| recent_topics = [] | |
| for update in recent_updates: | |
| recent_emotions.extend(update.get("emotions", [])) | |
| recent_topics.extend(update.get("topics", [])) | |
| # Deduplicate and get most recent | |
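| # e.g. ["sad","calm","sad","bored"] reversed is | |
| # ["bored","sad","calm","sad"]; dict.fromkeys keeps first | |
| # occurrences, giving ["bored","sad","calm"] (most recent first). | |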
| unique_emotions = list(dict.fromkeys(reversed(recent_emotions)))[:3] | |
| unique_topics = list(dict.fromkeys(reversed(recent_topics)))[:3] | |
| # Assess overall engagement from sentiment tracking | |
| engagement = "moderate" | |
| if self._recent_sentiments: | |
| recent = self._recent_sentiments[-3:] | |
| if all(s == "engaged" for s in recent): | |
| engagement = "high" | |
| elif any(s == "disengaged" for s in recent): | |
| engagement = "low" | |
| elif any(s == "overwhelmed" for s in recent): | |
| engagement = "strained" | |
| return { | |
| "recent_emotions": unique_emotions, | |
| "recent_topics": unique_topics, | |
| "engagement_level": engagement, | |
| "turns_in_phase": self._coaching_turns if self.phase == PHASE_COACHING else 0, | |
| "tom_type": self.tom.get_user_type_summary(), | |
| "tom_reliability": self.tom.reliability, | |
| } | |
| def _respond_general_chat(self, user_text: str, phase: str) -> str: | |
| """Generate a response to general chat that relates back to coaching.""" | |
| lower = user_text.lower() | |
| # Detect emotional content | |
| is_emotional = any(w in lower for w in [ | |
| "stressed", "anxious", "worried", "frustrated", "stuck", | |
| "hopeless", "lost", "confused", "scared", "tired", | |
| "burned out", "overwhelmed", | |
| ]) | |
| is_positive = any(w in lower for w in [ | |
| "better", "good", "happy", "excited", "motivated", | |
| "hopeful", "ready", "clear", | |
| ]) | |
| if is_emotional: | |
| return ( | |
| f"I hear that. And I want you to know — what you're feeling makes sense " | |
| f"given what I'm seeing in your patterns. This isn't about fixing something " | |
| f"broken. It's about finding the smallest adjustment that creates the most relief. " | |
| f"That's what I'm trying to do here." | |
| ) | |
| elif is_positive: | |
| return ( | |
| f"That energy is worth channeling. Based on your sphere, the thing that would " | |
| f"compound most right now is working on your {self._label(self.target_skill or 'focus')}. " | |
| f"Want to commit to the step I suggested?" | |
| ) | |
| else: | |
| # Acknowledge and gently steer back | |
| if phase == PHASE_PLANNING: | |
| return ( | |
| f"Thanks for sharing that — it helps me understand you better. " | |
| f"Coming back to the step I suggested: does it feel doable, " | |
| f"or would you like me to adjust it?" | |
| ) | |
| return ( | |
| f"I appreciate you telling me that. It gives me more context for " | |
| f"how to help. What would be most useful for you right now?" | |
| ) | |
| # ------------------------------------------------------------------------- | |
| # DATA ACCESSORS | |
| # ------------------------------------------------------------------------- | |
| def get_sphere_data(self) -> Dict[str, Any]: | |
| """Get radar chart data: skill scores + bottlenecks + edges.""" | |
| skill_beliefs = { | |
| k: v for k, v in self.beliefs.items() if k in SKILL_FACTORS | |
| } | |
| scores = self.model.get_all_skill_scores(self.beliefs) | |
| bottlenecks = self.dep_graph.find_bottlenecks(skill_beliefs) | |
| return { | |
| "categories": scores, | |
| "bottlenecks": bottlenecks, | |
| "dependency_edges": self.dep_graph.get_all_edges(), | |
| } | |
| def get_belief_summary(self) -> Dict[str, Any]: | |
| """Get human-readable summary of all beliefs.""" | |
| summary = {} | |
| # Skill scores | |
| for skill in SKILL_FACTORS: | |
| if skill in self.beliefs: | |
| score = self.model.get_skill_score(self.beliefs[skill]) | |
| # Std of the skill-level distribution under the belief; note that | |
| # np.std(belief * values) would measure the spread of the five | |
| # elementwise products, not the distribution's uncertainty. | |
| mean_level = float(np.sum(self.beliefs[skill] * SKILL_LEVEL_VALUES)) | |
| uncertainty = float(np.sqrt(np.sum(self.beliefs[skill] * (SKILL_LEVEL_VALUES - mean_level) ** 2))) | |
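| # e.g. with illustrative levels [0, 25, 50, 75, 100] (the real | |
| # SKILL_LEVEL_VALUES live in .model), belief [.1,.2,.4,.2,.1] | |
| # gives mean 50 and std ≈ 27.4. | |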
| summary[skill] = {"score": round(score, 1), "uncertainty": round(uncertainty, 1)} | |
| # Preferences | |
| for factor, levels in self.model.spec.preference_factors.items(): | |
| if factor in self.beliefs: | |
| best_idx = int(np.argmax(self.beliefs[factor])) | |
| summary[factor] = { | |
| "inferred": levels[best_idx], | |
| "confidence": float(np.max(self.beliefs[factor])), | |
| } | |
| # Friction | |
| for factor, levels in self.model.spec.friction_factors.items(): | |
| if factor in self.beliefs: | |
| best_idx = int(np.argmax(self.beliefs[factor])) | |
| summary[factor] = { | |
| "inferred": levels[best_idx], | |
| "confidence": float(np.max(self.beliefs[factor])), | |
| } | |
| # ToM reliability | |
| summary["tom_reliability"] = self.tom.reliability | |
| summary["user_type"] = self.tom.get_user_type_summary() | |
| # Learning progress | |
| summary["learning"] = self.learner.get_learning_summary() | |
| # Semantic profile + Bayesian network | |
| summary["profile"] = self.profile.get_summary() | |
| return summary | |
| def get_profile_data(self) -> Dict[str, Any]: | |
| """Get detailed profile data for visualization panel.""" | |
| # Raw skill belief distributions (5-level probabilities) | |
| skill_beliefs = {} | |
| skill_scores = {} | |
| for skill in SKILL_FACTORS: | |
| if skill in self.beliefs: | |
| skill_beliefs[skill] = self.beliefs[skill].tolist() | |
| skill_scores[skill] = round( | |
| self.model.get_skill_score(self.beliefs[skill]), 1 | |
| ) | |
| # Emotional state from Circumplex POMDP | |
| emotional_state = { | |
| "valence_belief": self.emotion.belief_valence.tolist(), | |
| "arousal_belief": self.emotion.belief_arousal.tolist(), | |
| } | |
| trajectory = self.emotion.get_emotional_trajectory() | |
| if trajectory.get("current"): | |
| emotional_state["current"] = trajectory["current"] | |
| else: | |
| current = self.emotion.get_current_emotion() | |
| emotional_state["current"] = current.to_dict() if current else None | |
| emotional_state["trajectory"] = trajectory.get("states", []) | |
| emotional_state["avg_prediction_error"] = trajectory.get( | |
| "avg_prediction_error", 0.0 | |
| ) | |
| # ToM user type dimensions + reliability | |
| tom_profile = { | |
| "dimensions": self.tom.get_user_type_summary(), | |
| "reliability": round(self.tom.reliability, 3), | |
| } | |
| # Dependency graph: nodes (skills + scores + bottleneck flag) and edges | |
| skill_belief_dict = { | |
| k: v for k, v in self.beliefs.items() if k in SKILL_FACTORS | |
| } | |
| bottlenecks = self.dep_graph.find_bottlenecks(skill_belief_dict) | |
| bottleneck_ids = {b["blocker"] for b in bottlenecks} | |
| dependency_graph = { | |
| "nodes": [ | |
| { | |
| "id": skill, | |
| "score": skill_scores.get(skill, 50), | |
| "is_bottleneck": skill in bottleneck_ids, | |
| } | |
| for skill in SKILL_FACTORS | |
| ], | |
| "edges": self.dep_graph.get_all_edges(), | |
| } | |
| # Profile facts from Bayesian network | |
| profile_summary = self.profile.get_summary() | |
| facts = profile_summary.get("facts", []) | |
| bayes_net = profile_summary.get("bayes_net", {}) | |
| # Bayes net nodes dict {id: node_dict} → list with id included | |
| bn_nodes_dict = bayes_net.get("nodes", {}) | |
| bn_nodes_list = [{**ndata, "id": nid} for nid, ndata in bn_nodes_dict.items()] | |
| profile_facts = { | |
| "facts": facts, | |
| "bayes_net": { | |
| "nodes": bn_nodes_list, | |
| "edges": bayes_net.get("edges", []), | |
| "skill_impacts": bayes_net.get("skill_impacts", {}), | |
| }, | |
| } | |
| # Score deltas: compare current vs previous snapshot | |
| score_deltas = {} | |
| if hasattr(self, "_prev_skill_scores") and self._prev_skill_scores: | |
| for skill, score in skill_scores.items(): | |
| prev = self._prev_skill_scores.get(skill, score) | |
| delta = round(score - prev, 1) | |
| score_deltas[skill] = delta | |
| # Store current scores for next comparison | |
| self._prev_skill_scores = dict(skill_scores) | |
| return { | |
| "skill_beliefs": skill_beliefs, | |
| "skill_scores": skill_scores, | |
| "score_deltas": score_deltas, | |
| "emotional_state": emotional_state, | |
| "tom_profile": tom_profile, | |
| "dependency_graph": dependency_graph, | |
| "profile_facts": profile_facts, | |
| } | |
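| # Usage sketch for the deltas (frontend-side, illustrative): | |
| # profile = agent.get_profile_data() | |
| # for skill, delta in profile["score_deltas"].items(): | |
| #     marker = "+" if delta > 0 else ("-" if delta < 0 else "=") | |
| #     ...  # render the trend marker next to the score | |
| # Note: score_deltas is empty on the first call, before any snapshot exists. | |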
| # ------------------------------------------------------------------------- | |
| # HELPERS | |
| # ------------------------------------------------------------------------- | |
| def _get_next_question(self) -> Optional[CalibrationQuestion]: | |
| """Get the next question using adaptive ordering.""" | |
| ordered = get_adaptive_question_order(self.beliefs, self.asked_question_ids) | |
| return ordered[0] if ordered else None | |
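| # e.g. q = self._get_next_question(); a None result means the question | |
| # bank is exhausted, which presumably ends the calibration phase. | |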
| def _format_question(self, q: CalibrationQuestion) -> Dict[str, Any]: | |
| """Format a question for the frontend.""" | |
| result = { | |
| "id": q.id, | |
| "category": q.category, | |
| "question_text": q.question_text, | |
| "question_type": q.question_type, | |
| } | |
| if q.question_type == "mc": | |
| result["options"] = q.options | |
| return result | |
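| # Example payload for a multiple-choice question (field values are | |
| # illustrative; only the keys are fixed by the code above): | |
| # { | |
| #   "id": "q_focus_01", | |
| #   "category": "focus", | |
| #   "question_text": "...", | |
| #   "question_type": "mc", | |
| #   "options": ["Rarely", "Sometimes", "Often", "Always"], | |
| # } | |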
| def _match_mc_answer(self, answer_text: str, options: List[str]) -> Optional[int]: | |
| """Simple text matching for MC answers. Returns None if no match.""" | |
| answer_lower = answer_text.strip().lower() | |
| if not answer_lower: | |
| return None | |
| # Check for exact index (e.g., "0", "1", "2", "3") | |
| if answer_lower.isdigit(): | |
| idx = int(answer_lower) | |
| if 0 <= idx < len(options): | |
| return idx | |
| # Check for a single letter ("a" -> 0, "b" -> 1, ...) | |
| if len(answer_lower) == 1 and answer_lower.isalpha(): | |
| idx = ord(answer_lower) - ord("a") | |
| if 0 <= idx < len(options): | |
| return idx | |
| # Fall back to matching the answer against the option text itself | |
| for i, opt in enumerate(options): | |
| opt_lower = opt.lower() | |
| if opt_lower == answer_lower: | |
| return i | |
| # Substring match only when the full option text appears inside a | |
| # reasonably long answer, to avoid spurious hits on short replies | |
| if len(answer_lower) > 10 and opt_lower in answer_lower: | |
| return i | |
| return None # No confident match — treat as conversational | |
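| # Matching examples, given options = ["Rarely", "Sometimes", "Often", "Always"]: | |
| #   "2"         -> 2     (bare index) | |
| #   "b"         -> 1     (letter shorthand) | |
| #   "sometimes" -> 1     (exact, case-insensitive) | |
| #   "honestly it's often but not always" -> 2  (option text inside a long answer) | |
| #   "tell me more about this" -> None  (treated as conversational) | |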
| def set_empathy_dial(self, lambda_value: float) -> None: | |
| """Adjust the empathy dial (0=challenging, 1=gentle).""" | |
| self.empathy.lambda_empathy = max(0.0, min(1.0, lambda_value)) | |
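| # Usage sketch: input is clamped to [0, 1], so out-of-range values are safe: | |
| # agent.set_empathy_dial(0.8)   # lean gentle | |
| # agent.set_empathy_dial(-3.0)  # clamped to 0.0, i.e. fully challenging | |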