# 9cbd0dd (Mahault): Add EFE-driven action selection, learning feedback loop,
# and Docker deployment
"""
Factored POMDP Model for MindSphere Coaching.
State space is factored into independent factors (mean-field approximation)
to avoid combinatorial explosion. Each factor has its own A/B/C/D matrices.
Adapted from NEXT-prototype's onboarding_model_v0.py patterns.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import numpy as np
from .utils import normalize
# =============================================================================
# STATE SPACE DEFINITIONS
# =============================================================================
# The eight coachable skill dimensions. Each becomes an independent POMDP
# factor with its own A/B/C/D matrices (see SphereModel below).
SKILL_FACTORS = [
    "focus",
    "follow_through",
    "social_courage",
    "emotional_reg",
    "systems_thinking",
    "self_trust",
    "task_clarity",
    "consistency",
]
# Discrete levels per skill factor, plus the 0-100 midpoint value assigned
# to each level when converting a belief vector into a score.
SKILL_LEVELS = ["very_low", "low", "medium", "high", "very_high"]
SKILL_LEVEL_VALUES = np.array([10.0, 30.0, 50.0, 70.0, 90.0])  # 0-100 scale
# User preference factors; their B matrices are identity (treated as stable
# within a session — see _build_preference_matrices).
PREFERENCE_FACTORS = {
    "coaching_style": ["gentle", "balanced", "challenging"],
    "feedback_mode": ["visual", "narrative", "action_oriented"],
}
# Friction factors, inferred from user choices (see _build_friction_matrices).
FRICTION_FACTORS = {
    "overwhelm_sensitivity": ["low", "medium", "high"],
    "autonomy_need": ["low", "medium", "high"],
}
# Actions the coaching agent can take. Index order matters: the B matrices
# in SphereModel discriminate on these indices (e.g. 3, 4, 5, 7).
ACTION_NAMES = [
    "ask_mc_question",       # 0
    "ask_free_text",         # 1
    "show_sphere",           # 2
    "propose_intervention",  # 3
    "reframe",               # 4
    "adjust_difficulty",     # 5
    "show_counterfactual",   # 6
    "safety_check",          # 7
    "end_session",           # 8
]
# Observation modalities: multiple-choice answers, engagement, task choice.
OBS_MC_LEVELS = ["answer_0", "answer_1", "answer_2", "answer_3"]
OBS_ENGAGEMENT_LEVELS = ["low", "medium", "high"]
OBS_CHOICE_LEVELS = ["accept", "too_hard", "not_relevant"]
# =============================================================================
# MODEL SPECIFICATION
# =============================================================================
@dataclass
class SphereModelSpec:
    """Specification for the factored POMDP model.

    All fields default to the module-level constants, so a bare
    ``SphereModelSpec()`` describes the standard MindSphere model.
    """
    skill_factors: List[str] = field(default_factory=lambda: list(SKILL_FACTORS))
    # Number of discrete levels per skill factor.
    n_skill_levels: int = 5
    preference_factors: Dict[str, List[str]] = field(
        default_factory=lambda: dict(PREFERENCE_FACTORS)
    )
    friction_factors: Dict[str, List[str]] = field(
        default_factory=lambda: dict(FRICTION_FACTORS)
    )
    action_names: List[str] = field(default_factory=lambda: list(ACTION_NAMES))
    # None means "derive from len(action_names)"; an explicit int overrides.
    # (Previously hard-coded to 9, which could silently drift out of sync
    # with action_names.)
    n_actions: Optional[int] = None
    # Number of multiple-choice answer options per question.
    n_mc_options: int = 4

    def __post_init__(self) -> None:
        # Keep n_actions consistent with action_names unless the caller
        # explicitly overrides it.
        if self.n_actions is None:
            self.n_actions = len(self.action_names)
# =============================================================================
# POMDP MODEL
# =============================================================================
class SphereModel:
    """
    Factored POMDP model for MindSphere coaching.

    Each factor has independent:
    - A matrix: p(observation | state)        [n_obs x n_states]
    - B matrix: p(state' | state, action)     [n_actions x n_states x n_states]
    - C vector: preference over observations  [n_obs]
    - D vector: initial prior over states     [n_states]

    Treating factors as independent (mean-field approximation) avoids the
    combinatorial explosion of a full joint state space.
    """

    def __init__(self, spec: Optional[SphereModelSpec] = None):
        """Create the model and populate all matrices.

        Args:
            spec: Model specification; a default SphereModelSpec is used
                when omitted.
        """
        self.spec = spec or SphereModelSpec()
        self.A: Dict[str, np.ndarray] = {}  # observation models, keyed by factor
        self.B: Dict[str, np.ndarray] = {}  # transition models, keyed by factor
        self.C: Dict[str, np.ndarray] = {}  # observation preferences, keyed by factor
        self.D: Dict[str, np.ndarray] = {}  # initial state priors, keyed by factor
        self._build()

    def _build(self) -> None:
        """Populate all A/B/C/D matrices."""
        self._build_skill_matrices()
        self._build_preference_matrices()
        self._build_friction_matrices()

    # -------------------------------------------------------------------------
    # SKILL FACTOR MATRICES
    # -------------------------------------------------------------------------
    @staticmethod
    def _build_improvement_transition(n_s: int, strength: float) -> np.ndarray:
        """
        Build a transition matrix with an upward shift (skill improvement).

        Creates a near-identity matrix where each state has `strength`
        probability of moving one level up. The highest state has no room
        to improve, so it keeps all of its mass.

        Args:
            n_s: Number of state levels.
            strength: Probability of moving up one level (e.g., 0.08).

        Returns:
            Transition matrix [n_s x n_s], columns sum to 1.
        """
        B = np.eye(n_s, dtype=np.float64) * (1.0 - strength)
        # Upward shift: p(s+1 | s) = strength for every state but the top.
        for s in range(n_s - 1):
            B[s + 1, s] += strength
        # Top state stays put with certainty (the shift mass has no target).
        B[-1, -1] = 1.0
        # Mix in a small uniform noise floor so no transition has zero mass.
        noise = 0.005
        B = B * (1.0 - noise) + np.ones((n_s, n_s)) * noise / n_s
        # Re-normalize columns to absorb floating-point drift.
        for col in range(n_s):
            B[:, col] = normalize(B[:, col])
        return B

    def _build_skill_matrices(self) -> None:
        """Build A/B/C/D for every skill factor in the spec."""
        n_s = self.spec.n_skill_levels  # 5 levels per skill
        # Action indices for reference:
        #   0: ask_mc, 1: ask_free_text, 2: show_sphere
        #   3: propose_intervention, 4: reframe, 5: adjust_difficulty
        #   6: show_counterfactual, 7: safety_check, 8: end_session
        # Near-identity transition used by actions that don't move skill.
        identity_noisy = (
            np.eye(n_s, dtype=np.float64) * 0.98
            + np.ones((n_s, n_s), dtype=np.float64) * 0.02 / n_s
        )
        for col in range(n_s):
            identity_noisy[:, col] = normalize(identity_noisy[:, col])
        # Improvement transitions, shared across all skills.
        B_intervention = self._build_improvement_transition(n_s, strength=0.08)
        B_reframe = self._build_improvement_transition(n_s, strength=0.04)
        # A template: p(answer | skill_level) [n_obs x n_states].
        # Higher skill -> higher-numbered answer is more likely. Built once
        # (it is identical for every skill) and copied per factor.
        # NOTE(review): hard-coded 4x5 — assumes n_mc_options == 4 and
        # n_skill_levels == 5; other spec values would break this method.
        A_template = np.array([
            [0.55, 0.30, 0.10, 0.04, 0.01],  # answer_0 (lowest)
            [0.30, 0.40, 0.25, 0.10, 0.05],  # answer_1
            [0.10, 0.20, 0.40, 0.36, 0.14],  # answer_2
            [0.05, 0.10, 0.25, 0.50, 0.80],  # answer_3 (highest)
        ], dtype=np.float64)
        for col in range(n_s):
            A_template[:, col] = normalize(A_template[:, col])
        for skill in self.spec.skill_factors:
            # Each factor gets its own copy so later mutation can't alias.
            self.A[skill] = A_template.copy()
            # B: p(s' | s, action) [n_actions x n_states x n_states],
            # action-discriminative.
            B = np.zeros((self.spec.n_actions, n_s, n_s), dtype=np.float64)
            for a in range(self.spec.n_actions):
                if a == 3:    # propose_intervention: ~8% improvement chance
                    B[a] = B_intervention.copy()
                elif a == 4:  # reframe: ~4% improvement chance
                    B[a] = B_reframe.copy()
                else:         # all other actions: near-identity (no change)
                    B[a] = identity_noisy.copy()
            self.B[skill] = B
            # C: prefer observations that signal high skill.
            self.C[skill] = np.array([-2.0, -0.5, 0.5, 2.0], dtype=np.float64)
            # D: mildly pessimistic prior over the five levels.
            self.D[skill] = normalize(
                np.array([0.12, 0.23, 0.32, 0.22, 0.11], dtype=np.float64)
            )

    # -------------------------------------------------------------------------
    # PREFERENCE FACTOR MATRICES
    # -------------------------------------------------------------------------
    def _build_preference_matrices(self) -> None:
        """Build A/B/C/D for preference factors (stable, weakly observed)."""
        for factor_name, levels in self.spec.preference_factors.items():
            n_levels = len(levels)
            # A: uniform observation model — preferences are inferred from
            # free text elsewhere, so direct observations carry no signal.
            A = np.ones((n_levels, n_levels), dtype=np.float64)
            for col in range(n_levels):
                A[:, col] = normalize(A[:, col])
            self.A[factor_name] = A
            # B: identity under every action (preferences don't change
            # during a session).
            B = np.zeros(
                (self.spec.n_actions, n_levels, n_levels), dtype=np.float64
            )
            for a in range(self.spec.n_actions):
                B[a] = np.eye(n_levels, dtype=np.float64)
            self.B[factor_name] = B
            # C: no preference over observations.
            self.C[factor_name] = np.zeros(n_levels, dtype=np.float64)
            # D: uniform prior.
            self.D[factor_name] = np.ones(n_levels, dtype=np.float64) / n_levels

    # -------------------------------------------------------------------------
    # FRICTION FACTOR MATRICES
    # -------------------------------------------------------------------------
    @staticmethod
    def _build_reduction_transition(
        n_s: int, stay: float, shift: float, noise: float
    ) -> np.ndarray:
        """
        Build a transition matrix with a downward shift (friction reduction).

        Each state has `shift` probability of moving one level down; the
        lowest state keeps the shift mass. A uniform `noise` mass is added
        before column normalization.

        Args:
            n_s: Number of state levels.
            stay: Probability mass kept on the diagonal.
            shift: Probability of moving down one level.
            noise: Total uniform noise mass added to each column.

        Returns:
            Transition matrix [n_s x n_s], columns sum to 1.
        """
        B = np.eye(n_s, dtype=np.float64) * stay
        for s in range(1, n_s):
            B[s - 1, s] += shift  # shift one level down
        B[0, 0] += shift  # lowest state has nowhere to go: stays
        B += np.ones((n_s, n_s)) * noise / n_s
        for col in range(n_s):
            B[:, col] = normalize(B[:, col])
        return B

    def _build_friction_matrices(self) -> None:
        """Build A/B/C/D for friction factors."""
        for factor_name, levels in self.spec.friction_factors.items():
            n_levels = len(levels)
            # A: inferred from user choices; rows follow OBS_CHOICE_LEVELS
            # ("accept", "too_hard", "not_relevant").
            if factor_name == "overwhelm_sensitivity":
                # "too_hard" responses more likely when sensitivity is high.
                A = np.array([
                    [0.60, 0.30, 0.10],  # "accept" - more likely if low sensitivity
                    [0.25, 0.40, 0.55],  # "too_hard" - more likely if high
                    [0.15, 0.30, 0.35],  # "not_relevant"
                ], dtype=np.float64)
            elif factor_name == "autonomy_need":
                A = np.array([
                    [0.50, 0.35, 0.15],  # "accept" structured task
                    [0.20, 0.30, 0.35],  # "too_hard"
                    [0.30, 0.35, 0.50],  # "not_relevant" - high autonomy rejects
                ], dtype=np.float64)
            else:
                # Unknown friction factor: uninformative observation model.
                A = np.ones((n_levels, n_levels), dtype=np.float64)
            for col in range(n_levels):
                A[:, col] = normalize(A[:, col])
            self.A[factor_name] = A
            # B: friction can shift under coaching actions. adjust_difficulty
            # (5) and safety_check (7) push overwhelm_sensitivity lower.
            B = np.zeros(
                (self.spec.n_actions, n_levels, n_levels), dtype=np.float64
            )
            identity_f = (
                np.eye(n_levels, dtype=np.float64) * 0.9
                + np.ones((n_levels, n_levels)) * 0.1 / n_levels
            )
            for col in range(n_levels):
                identity_f[:, col] = normalize(identity_f[:, col])
            if factor_name == "overwhelm_sensitivity":
                # Built once per factor instead of inside the action loop.
                B_adjust = self._build_reduction_transition(
                    n_levels, stay=0.85, shift=0.10, noise=0.05
                )
                B_safety = self._build_reduction_transition(
                    n_levels, stay=0.88, shift=0.05, noise=0.07
                )
            for a in range(self.spec.n_actions):
                if factor_name == "overwhelm_sensitivity" and a == 5:
                    # adjust_difficulty: 10% shift toward lower overwhelm
                    B[a] = B_adjust
                elif factor_name == "overwhelm_sensitivity" and a == 7:
                    # safety_check: smaller 5% shift toward lower overwhelm
                    B[a] = B_safety
                else:
                    B[a] = identity_f.copy()
            self.B[factor_name] = B
            # C: prefer "accept" / dislike "not_relevant" for overwhelm;
            # neutral for other friction factors.
            if factor_name == "overwhelm_sensitivity":
                self.C[factor_name] = np.array([1.0, 0.0, -1.5], dtype=np.float64)
            else:
                self.C[factor_name] = np.zeros(n_levels, dtype=np.float64)
            # D: most people start at the medium level.
            self.D[factor_name] = normalize(
                np.array([0.25, 0.50, 0.25], dtype=np.float64)
            )

    # -------------------------------------------------------------------------
    # ACCESSORS
    # -------------------------------------------------------------------------
    def get_all_factor_names(self) -> List[str]:
        """Get all factor names (skills + preferences + friction)."""
        factors = list(self.spec.skill_factors)
        factors.extend(self.spec.preference_factors.keys())
        factors.extend(self.spec.friction_factors.keys())
        return factors

    def get_initial_beliefs(self) -> Dict[str, np.ndarray]:
        """Get fresh copies of the initial belief (D) for every factor."""
        return {name: self.D[name].copy() for name in self.get_all_factor_names()}

    def get_skill_score(self, belief: np.ndarray) -> float:
        """Convert a skill-level belief into a 0-100 score.

        Args:
            belief: Probability vector over skill levels; must align with
                SKILL_LEVEL_VALUES (length 5).

        Returns:
            Expected skill value on the 0-100 scale.
        """
        return float(np.dot(belief, SKILL_LEVEL_VALUES))

    def get_all_skill_scores(
        self, beliefs: Dict[str, np.ndarray]
    ) -> Dict[str, float]:
        """Get 0-100 scores for every skill factor in the spec."""
        return {
            skill: self.get_skill_score(beliefs[skill])
            for skill in self.spec.skill_factors
        }