|
|
from __future__ import annotations |
|
|
|
|
|
import json |
|
|
import re |
|
|
from dataclasses import dataclass, field |
|
|
from datetime import datetime |
|
|
from enum import Enum |
|
|
from typing import Any, Dict, List, Optional, Tuple |
|
|
from pathlib import Path |
|
|
|
|
|
from .enums import RecoveryState |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class UserState: |
|
|
""" |
|
|
Comprehensive user state container for BLUX-cA. |
|
|
|
|
|
Designed to be serializable and user-controlled. |
|
|
Contains both recovery state and interaction metadata. |
|
|
""" |
|
|
recovery_state: RecoveryState = RecoveryState.CRISIS |
|
|
history_hint: int = 0 |
|
|
state_confidence: float = 0.7 |
|
|
last_state_change: Optional[str] = None |
|
|
state_duration_minutes: int = 0 |
|
|
interaction_count: int = 0 |
|
|
clarity_scores: Dict[str, float] = field(default_factory=lambda: { |
|
|
"logical": 0.0, |
|
|
"emotional": 0.0, |
|
|
"shadow": 0.0 |
|
|
}) |
|
|
safety_flags: List[str] = field(default_factory=list) |
|
|
session_markers: Dict[str, Any] = field(default_factory=dict) |
|
|
metadata: Dict[str, Any] = field(default_factory=dict) |
|
|
|
|
|
def to_dict(self) -> Dict[str, Any]: |
|
|
"""Convert to serializable dictionary.""" |
|
|
return { |
|
|
"recovery_state": self.recovery_state.value, |
|
|
"history_hint": self.history_hint, |
|
|
"state_confidence": self.state_confidence, |
|
|
"last_state_change": self.last_state_change, |
|
|
"state_duration_minutes": self.state_duration_minutes, |
|
|
"interaction_count": self.interaction_count, |
|
|
"clarity_scores": self.clarity_scores, |
|
|
"safety_flags": self.safety_flags, |
|
|
"session_markers": self.session_markers, |
|
|
"metadata": self.metadata |
|
|
} |
|
|
|
|
|
@classmethod |
|
|
def from_dict(cls, data: Dict[str, Any]) -> UserState: |
|
|
"""Create UserState from dictionary.""" |
|
|
return cls( |
|
|
recovery_state=RecoveryState(data.get("recovery_state", "CRISIS")), |
|
|
history_hint=data.get("history_hint", 0), |
|
|
state_confidence=data.get("state_confidence", 0.7), |
|
|
last_state_change=data.get("last_state_change"), |
|
|
state_duration_minutes=data.get("state_duration_minutes", 0), |
|
|
interaction_count=data.get("interaction_count", 0), |
|
|
clarity_scores=data.get("clarity_scores", {}), |
|
|
safety_flags=data.get("safety_flags", []), |
|
|
session_markers=data.get("session_markers", {}), |
|
|
metadata=data.get("metadata", {}) |
|
|
) |
|
|
|
|
|
|
|
|
class StateTransitionReason(str, Enum): |
|
|
"""Reasons for state transitions.""" |
|
|
CRISIS_MARKER = "crisis_marker" |
|
|
PROGRESSION = "progression" |
|
|
REGRESSION = "regression" |
|
|
PURPOSE_MARKER = "purpose_marker" |
|
|
HONESTY_MARKER = "honesty_marker" |
|
|
AWARENESS_MARKER = "awareness_marker" |
|
|
RECONSTRUCTION_MARKER = "reconstruction_marker" |
|
|
INTEGRATION_MARKER = "integration_marker" |
|
|
TIME_BASED = "time_based" |
|
|
CLARITY_SCORE = "clarity_score" |
|
|
SAFETY_CONCERN = "safety_concern" |
|
|
USER_DIRECTED = "user_directed" |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class StateTransition: |
|
|
"""Record of a state transition.""" |
|
|
from_state: RecoveryState |
|
|
to_state: RecoveryState |
|
|
reason: StateTransitionReason |
|
|
confidence: float |
|
|
timestamp: str |
|
|
input_text: Optional[str] = None |
|
|
metadata: Dict[str, Any] = field(default_factory=dict) |
|
|
|
|
|
|
|
|
class RecoveryStateMachine: |
|
|
""" |
|
|
Sophisticated adaptive state machine for BLUX-cA. |
|
|
|
|
|
Manages user's recovery state with intelligent transitions, |
|
|
safety checks, and persistence capabilities. |
|
|
""" |
|
|
|
|
|
|
|
|
STATE_MARKERS = { |
|
|
RecoveryState.CRISIS: { |
|
|
"patterns": [ |
|
|
(r"\b(i can'?t|can'?t go on|overwhelmed|emergency|panic|suicidal)\b", 0.9), |
|
|
(r"\b(dying|end it|no hope|helpless|trapped)\b", 0.8), |
|
|
(r"\b(urgent|immediate|right now|need help now)\b", 0.7), |
|
|
(r"\!{2,}", 0.6), |
|
|
(r"\b(crisis|emergency|urgent)\b", 0.8), |
|
|
], |
|
|
"min_confidence": 0.6 |
|
|
}, |
|
|
RecoveryState.AWARENESS: { |
|
|
"patterns": [ |
|
|
(r"\b(notice|aware|realize|see|observe|pattern)\b", 0.7), |
|
|
(r"\b(something is|feels like|seems to be|maybe)\b", 0.6), |
|
|
(r"\b(curious|wonder|question|think about)\b", 0.7), |
|
|
(r"\?", 0.5), |
|
|
], |
|
|
"min_confidence": 0.5 |
|
|
}, |
|
|
RecoveryState.HONESTY: { |
|
|
"patterns": [ |
|
|
(r"\b(i was wrong|my fault|i lied|i hurt|i own|admit)\b", 0.8), |
|
|
(r"\b(truth|honest|real|actual|fact)\b", 0.7), |
|
|
(r"\b(confess|acknowledge|recognize|accept)\b", 0.7), |
|
|
(r"\b(vulnerable|exposed|bare|raw)\b", 0.6), |
|
|
], |
|
|
"min_confidence": 0.6 |
|
|
}, |
|
|
RecoveryState.RECONSTRUCTION: { |
|
|
"patterns": [ |
|
|
(r"\b(rebuild|reconstruct|build|create|make)\b", 0.8), |
|
|
(r"\b(plan|routine|schedule|habit|system)\b", 0.7), |
|
|
(r"\b(next step|action|do|implement|execute)\b", 0.7), |
|
|
(r"\b(structure|framework|approach|method)\b", 0.6), |
|
|
], |
|
|
"min_confidence": 0.5 |
|
|
}, |
|
|
RecoveryState.INTEGRATION: { |
|
|
"patterns": [ |
|
|
(r"\b(integrate|combine|bring together|merge)\b", 0.8), |
|
|
(r"\b(understand|comprehend|make sense|see whole)\b", 0.7), |
|
|
(r"\b(part of|belong|fit|connect)\b", 0.6), |
|
|
(r"\b(whole picture|bigger picture|overview)\b", 0.7), |
|
|
], |
|
|
"min_confidence": 0.5 |
|
|
}, |
|
|
RecoveryState.PURPOSE: { |
|
|
"patterns": [ |
|
|
(r"\b(purpose|meaning|calling|mission|why)\b", 0.9), |
|
|
(r"\b(help others|give back|contribute|serve)\b", 0.8), |
|
|
(r"\b(make difference|impact|change|transform)\b", 0.7), |
|
|
(r"\b(direction|path|journey|destiny)\b", 0.6), |
|
|
], |
|
|
"min_confidence": 0.6 |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
ALLOWED_TRANSITIONS = { |
|
|
RecoveryState.CRISIS: [RecoveryState.AWARENESS, RecoveryState.HONESTY], |
|
|
RecoveryState.AWARENESS: [RecoveryState.HONESTY, RecoveryState.CRISIS, RecoveryState.RECONSTRUCTION], |
|
|
RecoveryState.HONESTY: [RecoveryState.RECONSTRUCTION, RecoveryState.AWARENESS, RecoveryState.INTEGRATION], |
|
|
RecoveryState.RECONSTRUCTION: [RecoveryState.INTEGRATION, RecoveryState.HONESTY, RecoveryState.PURPOSE], |
|
|
RecoveryState.INTEGRATION: [RecoveryState.PURPOSE, RecoveryState.RECONSTRUCTION], |
|
|
RecoveryState.PURPOSE: [RecoveryState.INTEGRATION] |
|
|
} |
|
|
|
|
|
def __init__(self, state: Optional[UserState] = None, persistence_path: Optional[str] = None) -> None: |
|
|
self.state = state or UserState() |
|
|
self.persistence_path = Path(persistence_path) if persistence_path else None |
|
|
self.transition_history: List[StateTransition] = [] |
|
|
self._initialize_state() |
|
|
|
|
|
def _initialize_state(self) -> None: |
|
|
"""Initialize or validate state on creation.""" |
|
|
if not self.state.last_state_change: |
|
|
self.state.last_state_change = datetime.now().isoformat() |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
_ = RecoveryState(self.state.recovery_state.value) |
|
|
except ValueError: |
|
|
self.state.recovery_state = RecoveryState.CRISIS |
|
|
|
|
|
|
|
|
if self.persistence_path and self.persistence_path.exists(): |
|
|
self._load_from_persistence() |
|
|
|
|
|
def update_from_input(self, text: str, clarity_scores: Optional[Dict[str, float]] = None) -> None: |
|
|
""" |
|
|
Update state based on user input and clarity scores. |
|
|
|
|
|
Args: |
|
|
text: User input text |
|
|
clarity_scores: Optional clarity scores from dimensions |
|
|
""" |
|
|
self.state.interaction_count += 1 |
|
|
|
|
|
|
|
|
if clarity_scores: |
|
|
self.state.clarity_scores.update(clarity_scores) |
|
|
|
|
|
|
|
|
detected_state, confidence, reason = self._detect_state_markers(text) |
|
|
|
|
|
|
|
|
should_change, new_state = self._evaluate_state_change( |
|
|
detected_state, confidence, reason, text |
|
|
) |
|
|
|
|
|
if should_change and new_state != self.state.recovery_state: |
|
|
self._perform_state_transition(new_state, reason, confidence, text) |
|
|
else: |
|
|
|
|
|
self._update_state_metrics() |
|
|
|
|
|
def _detect_state_markers(self, text: str) -> Tuple[Optional[RecoveryState], float, Optional[StateTransitionReason]]: |
|
|
"""Detect state markers in text and return suggested state with confidence.""" |
|
|
text_lower = text.lower() |
|
|
|
|
|
|
|
|
crisis_score = self._score_text_for_state(text_lower, RecoveryState.CRISIS) |
|
|
if crisis_score >= self.STATE_MARKERS[RecoveryState.CRISIS]["min_confidence"]: |
|
|
return RecoveryState.CRISIS, crisis_score, StateTransitionReason.CRISIS_MARKER |
|
|
|
|
|
|
|
|
best_state = None |
|
|
best_score = 0.0 |
|
|
best_reason = None |
|
|
|
|
|
for state, config in self.STATE_MARKERS.items(): |
|
|
if state == RecoveryState.CRISIS: |
|
|
continue |
|
|
|
|
|
score = self._score_text_for_state(text_lower, state) |
|
|
if score > best_score and score >= config["min_confidence"]: |
|
|
best_score = score |
|
|
best_state = state |
|
|
|
|
|
|
|
|
if best_state: |
|
|
reason_map = { |
|
|
RecoveryState.AWARENESS: StateTransitionReason.AWARENESS_MARKER, |
|
|
RecoveryState.HONESTY: StateTransitionReason.HONESTY_MARKER, |
|
|
RecoveryState.RECONSTRUCTION: StateTransitionReason.RECONSTRUCTION_MARKER, |
|
|
RecoveryState.INTEGRATION: StateTransitionReason.INTEGRATION_MARKER, |
|
|
RecoveryState.PURPOSE: StateTransitionReason.PURPOSE_MARKER |
|
|
} |
|
|
reason = reason_map.get(best_state, StateTransitionReason.PROGRESSION) |
|
|
return best_state, best_score, reason |
|
|
|
|
|
return None, 0.0, None |
|
|
|
|
|
def _score_text_for_state(self, text_lower: str, state: RecoveryState) -> float: |
|
|
"""Score text for likelihood of belonging to a state.""" |
|
|
if state not in self.STATE_MARKERS: |
|
|
return 0.0 |
|
|
|
|
|
patterns = self.STATE_MARKERS[state]["patterns"] |
|
|
total_score = 0.0 |
|
|
match_count = 0 |
|
|
|
|
|
for pattern, weight in patterns: |
|
|
matches = re.findall(pattern, text_lower, re.IGNORECASE) |
|
|
if matches: |
|
|
total_score += weight * len(matches) |
|
|
match_count += 1 |
|
|
|
|
|
|
|
|
if match_count > 0: |
|
|
score = total_score / match_count |
|
|
|
|
|
if match_count > 1: |
|
|
score *= min(1.0 + (match_count * 0.1), 1.3) |
|
|
return min(score, 1.0) |
|
|
|
|
|
return 0.0 |
|
|
|
|
|
def _evaluate_state_change( |
|
|
self, |
|
|
detected_state: Optional[RecoveryState], |
|
|
confidence: float, |
|
|
reason: Optional[StateTransitionReason], |
|
|
text: str |
|
|
) -> Tuple[bool, Optional[RecoveryState]]: |
|
|
"""Evaluate whether state should change and to what.""" |
|
|
current_state = self.state.recovery_state |
|
|
|
|
|
|
|
|
if detected_state == RecoveryState.CRISIS and confidence > 0.7: |
|
|
return True, RecoveryState.CRISIS |
|
|
|
|
|
|
|
|
if not detected_state or confidence < 0.5: |
|
|
|
|
|
if self.state.state_duration_minutes > 10: |
|
|
next_state = current_state.next_state() |
|
|
if next_state and self._is_transition_allowed(current_state, next_state): |
|
|
return True, next_state |
|
|
return False, None |
|
|
|
|
|
|
|
|
if not self._is_transition_allowed(current_state, detected_state): |
|
|
return False, None |
|
|
|
|
|
|
|
|
change_threshold = 0.6 - (self.state.state_confidence * 0.2) |
|
|
if confidence > change_threshold: |
|
|
|
|
|
if self._is_regression(current_state, detected_state): |
|
|
|
|
|
if confidence > 0.75: |
|
|
return True, detected_state |
|
|
else: |
|
|
return False, None |
|
|
return True, detected_state |
|
|
|
|
|
return False, None |
|
|
|
|
|
def _is_transition_allowed(self, from_state: RecoveryState, to_state: RecoveryState) -> bool: |
|
|
"""Check if transition is allowed.""" |
|
|
if from_state == to_state: |
|
|
return True |
|
|
|
|
|
allowed = self.ALLOWED_TRANSITIONS.get(from_state, []) |
|
|
return to_state in allowed |
|
|
|
|
|
def _is_regression(self, from_state: RecoveryState, to_state: RecoveryState) -> bool: |
|
|
"""Check if transition is a regression.""" |
|
|
progression_order = RecoveryState.progression_order() |
|
|
try: |
|
|
from_index = progression_order.index(from_state) |
|
|
to_index = progression_order.index(to_state) |
|
|
return to_index < from_index |
|
|
except ValueError: |
|
|
return False |
|
|
|
|
|
def _perform_state_transition( |
|
|
self, |
|
|
new_state: RecoveryState, |
|
|
reason: StateTransitionReason, |
|
|
confidence: float, |
|
|
input_text: Optional[str] = None |
|
|
) -> None: |
|
|
"""Perform state transition and record it.""" |
|
|
old_state = self.state.recovery_state |
|
|
|
|
|
|
|
|
self.state.recovery_state = new_state |
|
|
self.state.state_confidence = confidence |
|
|
self.state.last_state_change = datetime.now().isoformat() |
|
|
self.state.state_duration_minutes = 0 |
|
|
|
|
|
|
|
|
transition = StateTransition( |
|
|
from_state=old_state, |
|
|
to_state=new_state, |
|
|
reason=reason, |
|
|
confidence=confidence, |
|
|
timestamp=datetime.now().isoformat(), |
|
|
input_text=input_text, |
|
|
metadata={ |
|
|
"interaction_count": self.state.interaction_count, |
|
|
"clarity_scores": self.state.clarity_scores.copy() |
|
|
} |
|
|
) |
|
|
self.transition_history.append(transition) |
|
|
|
|
|
|
|
|
if len(self.transition_history) > 100: |
|
|
self.transition_history = self.transition_history[-100:] |
|
|
|
|
|
|
|
|
self._save_to_persistence() |
|
|
|
|
|
logger = self._get_logger() |
|
|
logger.info(f"State transition: {old_state.value} -> {new_state.value} " |
|
|
f"(reason: {reason.value}, confidence: {confidence:.2f})") |
|
|
|
|
|
def _update_state_metrics(self) -> None: |
|
|
"""Update state duration and confidence metrics.""" |
|
|
if self.state.last_state_change: |
|
|
try: |
|
|
last_change = datetime.fromisoformat(self.state.last_state_change) |
|
|
duration = (datetime.now() - last_change).total_seconds() / 60 |
|
|
self.state.state_duration_minutes = int(duration) |
|
|
except (ValueError, TypeError): |
|
|
pass |
|
|
|
|
|
|
|
|
self.state.state_confidence *= 0.995 |
|
|
|
|
|
def _save_to_persistence(self) -> None: |
|
|
"""Save state to persistence path.""" |
|
|
if not self.persistence_path: |
|
|
return |
|
|
|
|
|
try: |
|
|
data = { |
|
|
"state": self.state.to_dict(), |
|
|
"transition_history": [ |
|
|
{ |
|
|
"from_state": t.from_state.value, |
|
|
"to_state": t.to_state.value, |
|
|
"reason": t.reason.value, |
|
|
"confidence": t.confidence, |
|
|
"timestamp": t.timestamp, |
|
|
"input_text": t.input_text, |
|
|
"metadata": t.metadata |
|
|
} |
|
|
for t in self.transition_history[-50:] |
|
|
] |
|
|
} |
|
|
|
|
|
with open(self.persistence_path, 'w') as f: |
|
|
json.dump(data, f, indent=2) |
|
|
|
|
|
except Exception as e: |
|
|
logger = self._get_logger() |
|
|
logger.error(f"Failed to save state to persistence: {e}") |
|
|
|
|
|
def _load_from_persistence(self) -> None: |
|
|
"""Load state from persistence path.""" |
|
|
if not self.persistence_path or not self.persistence_path.exists(): |
|
|
return |
|
|
|
|
|
try: |
|
|
with open(self.persistence_path, 'r') as f: |
|
|
data = json.load(f) |
|
|
|
|
|
|
|
|
if "state" in data: |
|
|
self.state = UserState.from_dict(data["state"]) |
|
|
|
|
|
|
|
|
if "transition_history" in data: |
|
|
self.transition_history = [] |
|
|
for t_data in data["transition_history"]: |
|
|
transition = StateTransition( |
|
|
from_state=RecoveryState(t_data["from_state"]), |
|
|
to_state=RecoveryState(t_data["to_state"]), |
|
|
reason=StateTransitionReason(t_data["reason"]), |
|
|
confidence=t_data["confidence"], |
|
|
timestamp=t_data["timestamp"], |
|
|
input_text=t_data.get("input_text"), |
|
|
metadata=t_data.get("metadata", {}) |
|
|
) |
|
|
self.transition_history.append(transition) |
|
|
|
|
|
logger = self._get_logger() |
|
|
logger.info(f"Loaded state from persistence: {self.state.recovery_state.value}") |
|
|
|
|
|
except Exception as e: |
|
|
logger = self._get_logger() |
|
|
logger.error(f"Failed to load state from persistence: {e}") |
|
|
|
|
|
def to_token(self) -> Dict[str, Any]: |
|
|
"""Convert current state to serializable token.""" |
|
|
return self.state.to_dict() |
|
|
|
|
|
@classmethod |
|
|
def from_token(cls, token: Optional[Dict[str, Any]], **kwargs) -> RecoveryStateMachine: |
|
|
"""Create RecoveryStateMachine from token.""" |
|
|
if not token: |
|
|
return cls(**kwargs) |
|
|
|
|
|
state = UserState.from_dict(token) |
|
|
rsm = cls(state=state, **kwargs) |
|
|
return rsm |
|
|
|
|
|
def get_state_summary(self) -> Dict[str, Any]: |
|
|
"""Get comprehensive state summary.""" |
|
|
return { |
|
|
"current_state": self.state.recovery_state.value, |
|
|
"state_confidence": self.state.state_confidence, |
|
|
"state_duration_minutes": self.state.state_duration_minutes, |
|
|
"interaction_count": self.state.interaction_count, |
|
|
"clarity_scores": self.state.clarity_scores, |
|
|
"safety_flags": self.state.safety_flags, |
|
|
"transition_count": len(self.transition_history), |
|
|
"recent_transitions": [ |
|
|
{ |
|
|
"from": t.from_state.value, |
|
|
"to": t.to_state.value, |
|
|
"reason": t.reason.value, |
|
|
"time": t.timestamp |
|
|
} |
|
|
for t in self.transition_history[-5:] |
|
|
] if self.transition_history else [] |
|
|
} |
|
|
|
|
|
def add_safety_flag(self, flag: str) -> None: |
|
|
"""Add a safety flag to state.""" |
|
|
if flag not in self.state.safety_flags: |
|
|
self.state.safety_flags.append(flag) |
|
|
|
|
|
def clear_safety_flags(self) -> None: |
|
|
"""Clear all safety flags.""" |
|
|
self.state.safety_flags.clear() |
|
|
|
|
|
def _get_logger(self): |
|
|
"""Get logger instance.""" |
|
|
import logging |
|
|
return logging.getLogger(__name__) |