# blux-ca/ca/core/states.py
# Justadudeinspace
# restructure and upgrade all ca python files
# 2c5ae19
from __future__ import annotations
import json
import re
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
from pathlib import Path
from .enums import RecoveryState
@dataclass
class UserState:
    """
    Serializable snapshot of a user's state for BLUX-cA.

    Holds the current recovery state together with interaction metadata.
    Instances round-trip through ``to_dict`` / ``from_dict``.
    """

    recovery_state: RecoveryState = RecoveryState.CRISIS
    history_hint: int = 0
    state_confidence: float = 0.7
    # ISO-format timestamp string of the last state change, or None if unset.
    last_state_change: Optional[str] = None
    state_duration_minutes: int = 0
    interaction_count: int = 0
    clarity_scores: Dict[str, float] = field(default_factory=lambda: {
        "logical": 0.0,
        "emotional": 0.0,
        "shadow": 0.0,
    })
    safety_flags: List[str] = field(default_factory=list)
    session_markers: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to a serializable dictionary.

        Returns:
            A dict keyed by field name. ``recovery_state`` is stored as the
            enum's ``value``. Container fields are shallow-copied so that
            mutating the returned dict does not mutate this instance.
        """
        return {
            "recovery_state": self.recovery_state.value,
            "history_hint": self.history_hint,
            "state_confidence": self.state_confidence,
            "last_state_change": self.last_state_change,
            "state_duration_minutes": self.state_duration_minutes,
            "interaction_count": self.interaction_count,
            "clarity_scores": dict(self.clarity_scores),
            "safety_flags": list(self.safety_flags),
            "session_markers": dict(self.session_markers),
            "metadata": dict(self.metadata),
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> UserState:
        """
        Create a UserState from a dictionary produced by ``to_dict``.

        Missing (or ``None``) entries fall back to the dataclass defaults.

        Args:
            data: Mapping of field names to values.

        Returns:
            A new UserState that does not share containers with ``data``.

        Raises:
            ValueError: If ``recovery_state`` is present but is not a valid
                RecoveryState value.
        """
        # Fall back to the enum member itself rather than a string literal,
        # so the default does not depend on how the enum values are spelled.
        raw_state = data.get("recovery_state")
        if raw_state is None:
            recovery_state = RecoveryState.CRISIS
        else:
            recovery_state = RecoveryState(raw_state)

        state = cls(
            recovery_state=recovery_state,
            history_hint=data.get("history_hint", 0),
            state_confidence=data.get("state_confidence", 0.7),
            last_state_change=data.get("last_state_change"),
            state_duration_minutes=data.get("state_duration_minutes", 0),
            interaction_count=data.get("interaction_count", 0),
        )

        # Container fields: keep the dataclass defaults when absent, and
        # copy when present so the new instance never aliases caller data.
        clarity_scores = data.get("clarity_scores")
        if clarity_scores is not None:
            state.clarity_scores = dict(clarity_scores)
        safety_flags = data.get("safety_flags")
        if safety_flags is not None:
            state.safety_flags = list(safety_flags)
        session_markers = data.get("session_markers")
        if session_markers is not None:
            state.session_markers = dict(session_markers)
        metadata = data.get("metadata")
        if metadata is not None:
            state.metadata = dict(metadata)
        return state
class StateTransitionReason(str, Enum):
    """
    Why a state transition happened.

    Members mix in ``str``, so each member is also a string equal to its
    value.
    """

    # General direction of movement
    CRISIS_MARKER = "crisis_marker"
    PROGRESSION = "progression"
    REGRESSION = "regression"

    # Marker-driven reasons
    PURPOSE_MARKER = "purpose_marker"
    HONESTY_MARKER = "honesty_marker"
    AWARENESS_MARKER = "awareness_marker"
    RECONSTRUCTION_MARKER = "reconstruction_marker"
    INTEGRATION_MARKER = "integration_marker"

    # Other triggers
    TIME_BASED = "time_based"
    CLARITY_SCORE = "clarity_score"
    SAFETY_CONCERN = "safety_concern"
    USER_DIRECTED = "user_directed"
@dataclass
class StateTransition:
    """
    One recorded move of the state machine from a state to another.

    The first five fields are required; ``input_text`` and ``metadata``
    are optional extras attached to the record.
    """

    # Where the machine was, and where it went.
    from_state: RecoveryState
    to_state: RecoveryState

    # Why it moved, and how confident the decision was.
    reason: StateTransitionReason
    confidence: float

    # When the transition was recorded, as a string.
    timestamp: str

    # Optional text associated with the transition.
    input_text: Optional[str] = None

    # Free-form extras; each record gets its own dict.
    metadata: Dict[str, Any] = field(default_factory=dict)
class RecoveryStateMachine:
    """
    Adaptive state machine tracking a user's recovery state for BLUX-cA.

    Text input is scored against per-state regex markers; the machine then
    decides whether to move to another state, records the transition, and
    optionally persists state and history to a JSON file.
    """

    # Per-state regex patterns with weights, plus the minimum score needed
    # for the state to be considered "detected".
    STATE_MARKERS = {
        RecoveryState.CRISIS: {
            "patterns": [
                (r"\b(i can'?t|can'?t go on|overwhelmed|emergency|panic|suicidal)\b", 0.9),
                (r"\b(dying|end it|no hope|helpless|trapped)\b", 0.8),
                (r"\b(urgent|immediate|right now|need help now)\b", 0.7),
                (r"\!{2,}", 0.6),  # Multiple exclamation marks
                (r"\b(crisis|emergency|urgent)\b", 0.8),
            ],
            "min_confidence": 0.6,
        },
        RecoveryState.AWARENESS: {
            "patterns": [
                (r"\b(notice|aware|realize|see|observe|pattern)\b", 0.7),
                (r"\b(something is|feels like|seems to be|maybe)\b", 0.6),
                (r"\b(curious|wonder|question|think about)\b", 0.7),
                (r"\?", 0.5),  # Question marks
            ],
            "min_confidence": 0.5,
        },
        RecoveryState.HONESTY: {
            "patterns": [
                (r"\b(i was wrong|my fault|i lied|i hurt|i own|admit)\b", 0.8),
                (r"\b(truth|honest|real|actual|fact)\b", 0.7),
                (r"\b(confess|acknowledge|recognize|accept)\b", 0.7),
                (r"\b(vulnerable|exposed|bare|raw)\b", 0.6),
            ],
            "min_confidence": 0.6,
        },
        RecoveryState.RECONSTRUCTION: {
            "patterns": [
                (r"\b(rebuild|reconstruct|build|create|make)\b", 0.8),
                (r"\b(plan|routine|schedule|habit|system)\b", 0.7),
                (r"\b(next step|action|do|implement|execute)\b", 0.7),
                (r"\b(structure|framework|approach|method)\b", 0.6),
            ],
            "min_confidence": 0.5,
        },
        RecoveryState.INTEGRATION: {
            "patterns": [
                (r"\b(integrate|combine|bring together|merge)\b", 0.8),
                (r"\b(understand|comprehend|make sense|see whole)\b", 0.7),
                (r"\b(part of|belong|fit|connect)\b", 0.6),
                (r"\b(whole picture|bigger picture|overview)\b", 0.7),
            ],
            "min_confidence": 0.5,
        },
        RecoveryState.PURPOSE: {
            "patterns": [
                (r"\b(purpose|meaning|calling|mission|why)\b", 0.9),
                (r"\b(help others|give back|contribute|serve)\b", 0.8),
                (r"\b(make difference|impact|change|transform)\b", 0.7),
                (r"\b(direction|path|journey|destiny)\b", 0.6),
            ],
            "min_confidence": 0.6,
        },
    }

    # Allowed state transitions (from -> to)
    ALLOWED_TRANSITIONS = {
        RecoveryState.CRISIS: [RecoveryState.AWARENESS, RecoveryState.HONESTY],
        RecoveryState.AWARENESS: [RecoveryState.HONESTY, RecoveryState.CRISIS, RecoveryState.RECONSTRUCTION],
        RecoveryState.HONESTY: [RecoveryState.RECONSTRUCTION, RecoveryState.AWARENESS, RecoveryState.INTEGRATION],
        RecoveryState.RECONSTRUCTION: [RecoveryState.INTEGRATION, RecoveryState.HONESTY, RecoveryState.PURPOSE],
        RecoveryState.INTEGRATION: [RecoveryState.PURPOSE, RecoveryState.RECONSTRUCTION],
        RecoveryState.PURPOSE: [RecoveryState.INTEGRATION],  # Can regress from purpose
    }

    # Transition reason reported for each marker-detected (non-crisis) state.
    _MARKER_REASONS = {
        RecoveryState.AWARENESS: StateTransitionReason.AWARENESS_MARKER,
        RecoveryState.HONESTY: StateTransitionReason.HONESTY_MARKER,
        RecoveryState.RECONSTRUCTION: StateTransitionReason.RECONSTRUCTION_MARKER,
        RecoveryState.INTEGRATION: StateTransitionReason.INTEGRATION_MARKER,
        RecoveryState.PURPOSE: StateTransitionReason.PURPOSE_MARKER,
    }

    # In-memory and on-disk history caps.
    MAX_HISTORY = 100
    MAX_PERSISTED_HISTORY = 50

    def __init__(self, state: Optional[UserState] = None, persistence_path: Optional[str] = None) -> None:
        """
        Create a state machine.

        Args:
            state: Initial user state; a default UserState is used if None.
            persistence_path: Optional path of a JSON file. If the file
                exists, the state and history stored there replace the
                supplied ``state``.
        """
        self.state = state if state is not None else UserState()
        self.persistence_path = Path(persistence_path) if persistence_path else None
        self.transition_history: List[StateTransition] = []
        self._initialize_state()

    def _initialize_state(self) -> None:
        """Load persisted state if available, then validate the state."""
        # Load first so that the validation below also covers loaded state.
        if self.persistence_path is not None and self.persistence_path.exists():
            self._load_from_persistence()

        # Coerce raw values (e.g. a plain string) to the enum; anything
        # that is not a valid RecoveryState value falls back to CRISIS.
        if not isinstance(self.state.recovery_state, RecoveryState):
            try:
                self.state.recovery_state = RecoveryState(self.state.recovery_state)
            except (ValueError, TypeError):
                self.state.recovery_state = RecoveryState.CRISIS

        if not self.state.last_state_change:
            self.state.last_state_change = datetime.now().isoformat()

    def update_from_input(self, text: str, clarity_scores: Optional[Dict[str, float]] = None) -> None:
        """
        Update state based on user input and clarity scores.

        Args:
            text: User input text. ``None`` or empty text is treated as
                input with no markers.
            clarity_scores: Optional clarity scores merged into the state's
                existing scores.
        """
        text = text or ""
        self.state.interaction_count += 1

        if clarity_scores:
            self.state.clarity_scores.update(clarity_scores)

        detected_state, confidence, reason = self._detect_state_markers(text)
        should_change, new_state = self._evaluate_state_change(
            detected_state, confidence, reason, text
        )

        if should_change and new_state is not None and new_state != self.state.recovery_state:
            if reason is None:
                # A change without a detected marker can only come from the
                # time-based branch of _evaluate_state_change. Without a
                # concrete reason the transition record cannot be logged
                # or serialized.
                reason = StateTransitionReason.TIME_BASED
            self._perform_state_transition(new_state, reason, confidence, text)
        else:
            # No transition: refresh duration and decay confidence.
            self._update_state_metrics()

    def _detect_state_markers(self, text: str) -> Tuple[Optional[RecoveryState], float, Optional[StateTransitionReason]]:
        """
        Detect state markers in text.

        Returns:
            ``(state, score, reason)`` for the best-scoring state whose
            score reaches its ``min_confidence``, or ``(None, 0.0, None)``
            when nothing qualifies. Crisis markers are checked first and
            win as soon as they reach their own threshold.
        """
        text_lower = text.lower()

        # Check for crisis markers first (highest priority)
        crisis_score = self._score_text_for_state(text_lower, RecoveryState.CRISIS)
        if crisis_score >= self.STATE_MARKERS[RecoveryState.CRISIS]["min_confidence"]:
            return RecoveryState.CRISIS, crisis_score, StateTransitionReason.CRISIS_MARKER

        best_state: Optional[RecoveryState] = None
        best_score = 0.0
        for state, config in self.STATE_MARKERS.items():
            if state == RecoveryState.CRISIS:
                continue  # Already checked
            score = self._score_text_for_state(text_lower, state)
            # Strict ">" keeps the first state in declaration order on ties.
            if score > best_score and score >= config["min_confidence"]:
                best_score = score
                best_state = state

        if best_state is None:
            return None, 0.0, None
        reason = self._MARKER_REASONS.get(best_state, StateTransitionReason.PROGRESSION)
        return best_state, best_score, reason

    def _score_text_for_state(self, text_lower: str, state: RecoveryState) -> float:
        """
        Score text for likelihood of belonging to a state.

        Returns:
            A value in ``[0.0, 1.0]``; 0.0 when no pattern matches or the
            state has no markers.
        """
        if state not in self.STATE_MARKERS:
            return 0.0

        total_score = 0.0
        match_count = 0  # number of distinct patterns that matched
        for pattern, weight in self.STATE_MARKERS[state]["patterns"]:
            matches = re.findall(pattern, text_lower, re.IGNORECASE)
            if matches:
                total_score += weight * len(matches)
                match_count += 1

        if match_count == 0:
            return 0.0

        # NOTE(review): the sum counts every occurrence but is divided by
        # the number of distinct patterns, so repeated hits of one pattern
        # raise the score (e.g. "???" saturates at 1.0) - confirm intended.
        score = total_score / match_count
        # Boost score for multiple distinct matches
        if match_count > 1:
            score *= min(1.0 + (match_count * 0.1), 1.3)
        return min(score, 1.0)

    def _evaluate_state_change(
        self,
        detected_state: Optional[RecoveryState],
        confidence: float,
        reason: Optional[StateTransitionReason],
        text: str
    ) -> Tuple[bool, Optional[RecoveryState]]:
        """
        Evaluate whether state should change and to what.

        Args:
            detected_state: State suggested by marker detection, if any.
            confidence: Score of the detected state.
            reason: Reason associated with the detection (unused here).
            text: Original input text (unused here).

        Returns:
            ``(True, new_state)`` if a change is recommended, otherwise
            ``(False, None)``.
        """
        current_state = self.state.recovery_state

        # Safety: Always transition to crisis if detected with high confidence
        if detected_state == RecoveryState.CRISIS and confidence > 0.7:
            return True, RecoveryState.CRISIS

        # No detected state or low confidence
        if detected_state is None or confidence < 0.5:
            # Consider time-based progression after a while in one state
            if self.state.state_duration_minutes > 10:
                next_state = current_state.next_state()
                if next_state is not None and self._is_transition_allowed(current_state, next_state):
                    return True, next_state
            return False, None

        if not self._is_transition_allowed(current_state, detected_state):
            return False, None

        # NOTE(review): a higher current state_confidence lowers this
        # threshold, i.e. makes a change easier - confirm this is intended.
        change_threshold = 0.6 - (self.state.state_confidence * 0.2)
        if confidence <= change_threshold:
            return False, None

        # Only allow regression with high confidence
        if self._is_regression(current_state, detected_state) and confidence <= 0.75:
            return False, None
        return True, detected_state

    def _is_transition_allowed(self, from_state: RecoveryState, to_state: RecoveryState) -> bool:
        """Return True if ``from_state -> to_state`` is permitted."""
        if from_state == to_state:
            return True  # Staying in same state is always allowed
        return to_state in self.ALLOWED_TRANSITIONS.get(from_state, [])

    def _is_regression(self, from_state: RecoveryState, to_state: RecoveryState) -> bool:
        """Return True if ``to_state`` precedes ``from_state`` in the progression order."""
        progression_order = RecoveryState.progression_order()
        try:
            return progression_order.index(to_state) < progression_order.index(from_state)
        except ValueError:
            # A state missing from the order is not treated as regression.
            return False

    def _perform_state_transition(
        self,
        new_state: RecoveryState,
        reason: StateTransitionReason,
        confidence: float,
        input_text: Optional[str] = None
    ) -> None:
        """
        Perform state transition, record it, persist and log.

        Args:
            new_state: State to move to.
            reason: Why the transition happens.
            confidence: Confidence stored as the new state confidence.
            input_text: Optional input text stored with the record.
        """
        old_state = self.state.recovery_state
        # One timestamp for both the state and its history record.
        now_iso = datetime.now().isoformat()

        self.state.recovery_state = new_state
        self.state.state_confidence = confidence
        self.state.last_state_change = now_iso
        self.state.state_duration_minutes = 0

        self.transition_history.append(
            StateTransition(
                from_state=old_state,
                to_state=new_state,
                reason=reason,
                confidence=confidence,
                timestamp=now_iso,
                input_text=input_text,
                metadata={
                    "interaction_count": self.state.interaction_count,
                    "clarity_scores": self.state.clarity_scores.copy(),
                },
            )
        )

        # Limit history size
        if len(self.transition_history) > self.MAX_HISTORY:
            self.transition_history = self.transition_history[-self.MAX_HISTORY:]

        self._save_to_persistence()

        logger = self._get_logger()
        logger.info(f"State transition: {old_state.value} -> {new_state.value} "
                    f"(reason: {reason.value}, confidence: {confidence:.2f})")

    def _update_state_metrics(self) -> None:
        """Refresh the state duration and apply a small confidence decay."""
        if self.state.last_state_change:
            try:
                last_change = datetime.fromisoformat(self.state.last_state_change)
                # Use the timestamp's own tzinfo (None for naive values) so
                # naive and aware datetimes are never subtracted.
                now = datetime.now(last_change.tzinfo)
                minutes = (now - last_change).total_seconds() / 60
                # Clamp: a timestamp in the future must not go negative.
                self.state.state_duration_minutes = max(0, int(minutes))
            except (ValueError, TypeError) as e:
                # Best effort: keep the previous duration, but leave a trace.
                self._get_logger().debug(
                    f"Could not parse last_state_change "
                    f"{self.state.last_state_change!r}: {e}"
                )

        # Decay confidence slightly over time if no reinforcement
        self.state.state_confidence *= 0.995

    def _save_to_persistence(self) -> None:
        """
        Save state and recent history to the persistence path.

        Best effort: failures are logged, never raised.
        """
        if self.persistence_path is None:
            return

        try:
            data = {
                "state": self.state.to_dict(),
                "transition_history": [
                    {
                        "from_state": t.from_state.value,
                        "to_state": t.to_state.value,
                        "reason": t.reason.value,
                        "confidence": t.confidence,
                        "timestamp": t.timestamp,
                        "input_text": t.input_text,
                        "metadata": t.metadata,
                    }
                    for t in self.transition_history[-self.MAX_PERSISTED_HISTORY:]
                ],
            }
            # Serialize before touching the file so a serialization error
            # cannot leave a truncated file behind.
            serialized = json.dumps(data, indent=2)
            self.persistence_path.parent.mkdir(parents=True, exist_ok=True)
            self.persistence_path.write_text(serialized, encoding="utf-8")
        except Exception as e:
            logger = self._get_logger()
            logger.error(f"Failed to save state to persistence: {e}")

    def _load_from_persistence(self) -> None:
        """
        Load state and history from the persistence path.

        Best effort: on any failure the error is logged and the current
        state and history are left untouched.
        """
        if self.persistence_path is None or not self.persistence_path.exists():
            return

        logger = self._get_logger()
        try:
            with open(self.persistence_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if not isinstance(data, dict):
                raise ValueError("persistence file does not contain a JSON object")

            # Parse everything into locals first; commit only on success.
            loaded_state = UserState.from_dict(data["state"]) if "state" in data else None
            loaded_history = None
            if "transition_history" in data:
                loaded_history = [
                    StateTransition(
                        from_state=RecoveryState(t_data["from_state"]),
                        to_state=RecoveryState(t_data["to_state"]),
                        reason=StateTransitionReason(t_data["reason"]),
                        confidence=t_data["confidence"],
                        timestamp=t_data["timestamp"],
                        input_text=t_data.get("input_text"),
                        metadata=t_data.get("metadata", {}),
                    )
                    for t_data in data["transition_history"]
                ]
        except Exception as e:
            logger.error(f"Failed to load state from persistence: {e}")
            return

        if loaded_state is not None:
            self.state = loaded_state
        if loaded_history is not None:
            self.transition_history = loaded_history
        logger.info(f"Loaded state from persistence: {self.state.recovery_state.value}")

    def to_token(self) -> Dict[str, Any]:
        """Convert current state to a serializable token (a plain dict)."""
        return self.state.to_dict()

    @classmethod
    def from_token(cls, token: Optional[Dict[str, Any]], **kwargs) -> RecoveryStateMachine:
        """
        Create a RecoveryStateMachine from a token.

        Args:
            token: Dict produced by ``to_token``; a falsy token yields a
                machine with default state.
            **kwargs: Forwarded to the constructor.
        """
        if not token:
            return cls(**kwargs)
        return cls(state=UserState.from_dict(token), **kwargs)

    def get_state_summary(self) -> Dict[str, Any]:
        """Return a summary of the current state and the last five transitions."""
        recent_transitions = [
            {
                "from": t.from_state.value,
                "to": t.to_state.value,
                "reason": t.reason.value,
                "time": t.timestamp,
            }
            for t in self.transition_history[-5:]
        ]
        return {
            "current_state": self.state.recovery_state.value,
            "state_confidence": self.state.state_confidence,
            "state_duration_minutes": self.state.state_duration_minutes,
            "interaction_count": self.state.interaction_count,
            "clarity_scores": self.state.clarity_scores,
            "safety_flags": self.state.safety_flags,
            "transition_count": len(self.transition_history),
            "recent_transitions": recent_transitions,
        }

    def add_safety_flag(self, flag: str) -> None:
        """Add a safety flag to state unless it is already present."""
        if flag not in self.state.safety_flags:
            self.state.safety_flags.append(flag)

    def clear_safety_flags(self) -> None:
        """Clear all safety flags."""
        self.state.safety_flags.clear()

    def _get_logger(self):
        """Return the logger named after this module."""
        import logging
        return logging.getLogger(__name__)