|
|
import logging |
|
|
import json |
|
|
import json5 |
|
|
import time |
|
|
from datetime import datetime |
|
|
from typing import List, Dict, Any, Optional |
|
|
from pydantic import BaseModel, Field |
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class PhaseTransitionResponse(BaseModel): |
|
|
goals_progress: Dict[str, float] |
|
|
should_transition: bool |
|
|
next_phase: str |
|
|
reasoning: str |
|
|
|
|
|
class SessionCharacteristics(BaseModel): |
|
|
alliance_strength: float = Field(ge=0.0, le=1.0) |
|
|
engagement_level: float = Field(ge=0.0, le=1.0) |
|
|
emotional_pattern: str |
|
|
cognitive_pattern: str |
|
|
coping_mechanisms: List[str] = Field(min_items=2) |
|
|
progress_quality: float = Field(ge=0.0, le=1.0) |
|
|
recommended_focus: str |
|
|
|
|
|
class ConversationPhase(BaseModel): |
|
|
name: str |
|
|
description: str |
|
|
goals: List[str] |
|
|
typical_duration: int |
|
|
started_at: Optional[str] = None |
|
|
ended_at: Optional[str] = None |
|
|
completion_metrics: Dict[str, float] = Field(default_factory=dict) |
|
|
|
|
|
class FlowManager: |
|
|
|
|
|
|
|
|
PHASES = { |
|
|
'introduction': { |
|
|
'description': 'Establishing rapport and identifying main concerns', |
|
|
'goals': [ |
|
|
'build therapeutic alliance', |
|
|
'identify primary concerns', |
|
|
'understand client expectations', |
|
|
'establish session structure' |
|
|
], |
|
|
'typical_duration': 5 |
|
|
}, |
|
|
'exploration': { |
|
|
'description': 'In-depth exploration of issues and their context', |
|
|
'goals': [ |
|
|
'examine emotional responses', |
|
|
'explore thought patterns', |
|
|
'identify behavioral patterns', |
|
|
'understand situational context', |
|
|
'recognize relationship dynamics' |
|
|
], |
|
|
'typical_duration': 15 |
|
|
}, |
|
|
'intervention': { |
|
|
'description': 'Providing strategies, insights, and therapeutic interventions', |
|
|
'goals': [ |
|
|
'introduce coping techniques', |
|
|
'reframe negative thinking', |
|
|
'provide emotional validation', |
|
|
'offer perspective shifts', |
|
|
'suggest behavioral modifications' |
|
|
], |
|
|
'typical_duration': 20 |
|
|
}, |
|
|
'conclusion': { |
|
|
'description': 'Summarizing insights and establishing next steps', |
|
|
'goals': [ |
|
|
'review key insights', |
|
|
'consolidate learning', |
|
|
'identify action items', |
|
|
'set intentions', |
|
|
'provide closure' |
|
|
], |
|
|
'typical_duration': 5 |
|
|
} |
|
|
} |
|
|
|
|
|
def __init__(self, llm, session_duration: int = 45): |
|
|
|
|
|
self.llm = llm |
|
|
self.session_duration = session_duration * 60 |
|
|
|
|
|
|
|
|
self.user_sessions = {} |
|
|
|
|
|
logger.info(f"Initialized FlowManager with {session_duration} minute sessions") |
|
|
|
|
|
def _ensure_user_session(self, user_id: str): |
|
|
|
|
|
if user_id not in self.user_sessions: |
|
|
self.initialize_session(user_id) |
|
|
|
|
|
def initialize_session(self, user_id: str): |
|
|
|
|
|
now = datetime.now().isoformat() |
|
|
|
|
|
|
|
|
initial_phase = ConversationPhase( |
|
|
name='introduction', |
|
|
description=self.PHASES['introduction']['description'], |
|
|
goals=self.PHASES['introduction']['goals'], |
|
|
typical_duration=self.PHASES['introduction']['typical_duration'], |
|
|
started_at=now |
|
|
) |
|
|
|
|
|
|
|
|
session_id = f"{user_id}_{datetime.now().strftime('%Y%m%d%H%M%S')}" |
|
|
|
|
|
|
|
|
self.user_sessions[user_id] = { |
|
|
'session_id': session_id, |
|
|
'user_id': user_id, |
|
|
'started_at': now, |
|
|
'updated_at': now, |
|
|
'current_phase': initial_phase, |
|
|
'phase_history': [initial_phase], |
|
|
'message_count': 0, |
|
|
'emotion_history': [], |
|
|
'emotion_progression': [], |
|
|
'flags': { |
|
|
'crisis_detected': False, |
|
|
'long_silences': False |
|
|
}, |
|
|
'llm_context': { |
|
|
'session_characteristics': {} |
|
|
} |
|
|
} |
|
|
|
|
|
logger.info(f"Initialized new session for user {user_id}") |
|
|
return self.user_sessions[user_id] |
|
|
|
|
|
def process_message(self, user_id: str, message: str, emotions: Dict[str, float]) -> Dict[str, Any]: |
|
|
|
|
|
self._ensure_user_session(user_id) |
|
|
session = self.user_sessions[user_id] |
|
|
|
|
|
|
|
|
now = datetime.now().isoformat() |
|
|
session['updated_at'] = now |
|
|
session['message_count'] += 1 |
|
|
|
|
|
|
|
|
emotion_entry = { |
|
|
'timestamp': now, |
|
|
'emotions': emotions, |
|
|
'message_idx': session['message_count'] |
|
|
} |
|
|
session['emotion_history'].append(emotion_entry) |
|
|
|
|
|
|
|
|
if not session.get('emotion_progression'): |
|
|
session['emotion_progression'] = [] |
|
|
|
|
|
|
|
|
primary_emotion = max(emotions.items(), key=lambda x: x[1])[0] |
|
|
session['emotion_progression'].append(primary_emotion) |
|
|
|
|
|
|
|
|
self._check_phase_transition(user_id, message, emotions) |
|
|
|
|
|
|
|
|
if session['message_count'] % 5 == 0: |
|
|
self._update_session_characteristics(user_id) |
|
|
|
|
|
|
|
|
flow_context = self._create_flow_context(user_id) |
|
|
|
|
|
return flow_context |
|
|
|
|
|
def _check_phase_transition(self, user_id: str, message: str, emotions: Dict[str, float]): |
|
|
|
|
|
session = self.user_sessions[user_id] |
|
|
current_phase = session['current_phase'] |
|
|
|
|
|
|
|
|
started_at = datetime.fromisoformat(session['started_at']) |
|
|
now = datetime.now() |
|
|
elapsed_seconds = (now - started_at).total_seconds() |
|
|
session_progress = elapsed_seconds / self.session_duration |
|
|
|
|
|
|
|
|
phase_context = { |
|
|
'current': current_phase.name, |
|
|
'description': current_phase.description, |
|
|
'goals': current_phase.goals, |
|
|
'time_in_phase': (now - datetime.fromisoformat(current_phase.started_at)).total_seconds() / 60, |
|
|
'session_progress': session_progress, |
|
|
'message_count': session['message_count'] |
|
|
} |
|
|
|
|
|
|
|
|
min_time_in_phase_minutes = max(2, current_phase.typical_duration * 0.5) |
|
|
if phase_context['time_in_phase'] < min_time_in_phase_minutes: |
|
|
return |
|
|
|
|
|
prompt = f""" |
|
|
Evaluate whether this therapeutic conversation should transition to the next phase. |
|
|
|
|
|
Current conversation state: |
|
|
- Current phase: {current_phase.name} ("{current_phase.description}") |
|
|
- Goals for this phase: {', '.join(current_phase.goals)} |
|
|
- Time spent in this phase: {phase_context['time_in_phase']:.1f} minutes |
|
|
- Session progress: {session_progress * 100:.1f}% complete |
|
|
- Message count: {session['message_count']} |
|
|
|
|
|
Latest message from user: "{message}" |
|
|
|
|
|
Current emotions: {', '.join([f"{e} ({score:.2f})" for e, score in |
|
|
sorted(emotions.items(), key=lambda x: x[1], reverse=True)[:3]])} |
|
|
|
|
|
Phases in a therapeutic conversation: |
|
|
1. introduction: {self.PHASES['introduction']['description']} |
|
|
2. exploration: {self.PHASES['exploration']['description']} |
|
|
3. intervention: {self.PHASES['intervention']['description']} |
|
|
4. conclusion: {self.PHASES['conclusion']['description']} |
|
|
|
|
|
Consider: |
|
|
1. Have the goals of the current phase been sufficiently addressed? |
|
|
2. Is the timing appropriate considering overall session progress? |
|
|
3. Is there a natural transition point in the conversation? |
|
|
4. Does the emotional content suggest readiness to move forward? |
|
|
|
|
|
First, provide your analysis of whether the key goals of the current phase have been met. |
|
|
Then decide if the conversation should transition to the next phase. |
|
|
|
|
|
Respond with a JSON object in this format: |
|
|
{{ |
|
|
"goals_progress": {{ |
|
|
"goal1": 0.5, |
|
|
"goal2": 0.7 |
|
|
}}, |
|
|
"should_transition": false, |
|
|
"next_phase": "exploration", |
|
|
"reasoning": "brief explanation" |
|
|
}} |
|
|
|
|
|
Output ONLY valid JSON without additional text. |
|
|
""" |
|
|
|
|
|
response = self.llm.invoke(prompt) |
|
|
|
|
|
try: |
|
|
|
|
|
evaluation = json.loads(response) |
|
|
|
|
|
phase_transition = PhaseTransitionResponse.parse_obj(evaluation) |
|
|
|
|
|
|
|
|
for goal, score in phase_transition.goals_progress.items(): |
|
|
if goal in current_phase.goals: |
|
|
current_phase.completion_metrics[goal] = score |
|
|
|
|
|
|
|
|
if phase_transition.should_transition: |
|
|
if phase_transition.next_phase in self.PHASES: |
|
|
self._transition_to_phase(user_id, phase_transition.next_phase, phase_transition.reasoning) |
|
|
except (json.JSONDecodeError, ValueError): |
|
|
self._check_time_based_transition(user_id) |
|
|
|
|
|
def _check_time_based_transition(self, user_id: str): |
|
|
|
|
|
session = self.user_sessions[user_id] |
|
|
current_phase = session['current_phase'] |
|
|
|
|
|
|
|
|
started_at = datetime.fromisoformat(session['started_at']) |
|
|
now = datetime.now() |
|
|
elapsed_minutes = (now - started_at).total_seconds() / 60 |
|
|
|
|
|
|
|
|
intro_threshold = self.PHASES['introduction']['typical_duration'] |
|
|
explore_threshold = intro_threshold + self.PHASES['exploration']['typical_duration'] |
|
|
intervention_threshold = explore_threshold + self.PHASES['intervention']['typical_duration'] |
|
|
|
|
|
|
|
|
next_phase = None |
|
|
if current_phase.name == 'introduction' and elapsed_minutes >= intro_threshold: |
|
|
next_phase = 'exploration' |
|
|
elif current_phase.name == 'exploration' and elapsed_minutes >= explore_threshold: |
|
|
next_phase = 'intervention' |
|
|
elif current_phase.name == 'intervention' and elapsed_minutes >= intervention_threshold: |
|
|
next_phase = 'conclusion' |
|
|
|
|
|
if next_phase: |
|
|
self._transition_to_phase(user_id, next_phase, "Time-based transition") |
|
|
|
|
|
def _transition_to_phase(self, user_id: str, next_phase_name: str, reason: str): |
|
|
|
|
|
session = self.user_sessions[user_id] |
|
|
current_phase = session['current_phase'] |
|
|
|
|
|
|
|
|
now = datetime.now().isoformat() |
|
|
current_phase.ended_at = now |
|
|
|
|
|
|
|
|
new_phase = ConversationPhase( |
|
|
name=next_phase_name, |
|
|
description=self.PHASES[next_phase_name]['description'], |
|
|
goals=self.PHASES[next_phase_name]['goals'], |
|
|
typical_duration=self.PHASES[next_phase_name]['typical_duration'], |
|
|
started_at=now |
|
|
) |
|
|
|
|
|
|
|
|
session['current_phase'] = new_phase |
|
|
session['phase_history'].append(new_phase) |
|
|
|
|
|
logger.info(f"User {user_id} transitioned from {current_phase.name} to {next_phase_name}: {reason}") |
|
|
|
|
|
def _update_session_characteristics(self, user_id: str): |
|
|
session = self.user_sessions[user_id] |
|
|
|
|
|
|
|
|
if session['message_count'] < 5: |
|
|
return |
|
|
|
|
|
|
|
|
message_sample = [] |
|
|
emotion_summary = {} |
|
|
|
|
|
|
|
|
for i, emotion_data in enumerate(session['emotion_history'][-10:]): |
|
|
msg_idx = emotion_data['message_idx'] |
|
|
if i % 2 == 0: |
|
|
message_sample.append(f"Message {msg_idx}: User emotions: {', '.join([f'{e}({s:.2f})' for e, s in sorted(emotion_data['emotions'].items(), key=lambda x: x[1], reverse=True)[:2]])}") |
|
|
|
|
|
|
|
|
for emotion, score in emotion_data['emotions'].items(): |
|
|
if score > 0.3: |
|
|
emotion_summary[emotion] = emotion_summary.get(emotion, 0) + score |
|
|
|
|
|
|
|
|
if emotion_summary: |
|
|
total = sum(emotion_summary.values()) |
|
|
emotion_summary = {e: s/total for e, s in emotion_summary.items()} |
|
|
|
|
|
|
|
|
prompt = f""" |
|
|
Analyze this therapy session and provide a JSON response with the following characteristics: |
|
|
|
|
|
Current session state: |
|
|
- Phase: {session['current_phase'].name} ({session['current_phase'].description}) |
|
|
- Message count: {session['message_count']} |
|
|
- Emotion summary: {', '.join([f'{e}({s:.2f})' for e, s in sorted(emotion_summary.items(), key=lambda x: x[1], reverse=True)])} |
|
|
|
|
|
Recent messages: |
|
|
{chr(10).join(message_sample)} |
|
|
|
|
|
Required JSON format: |
|
|
{{ |
|
|
"alliance_strength": 0.8, |
|
|
"engagement_level": 0.7, |
|
|
"emotional_pattern": "brief description of emotional pattern", |
|
|
"cognitive_pattern": "brief description of cognitive pattern", |
|
|
"coping_mechanisms": ["mechanism1", "mechanism2"], |
|
|
"progress_quality": 0.6, |
|
|
"recommended_focus": "brief therapeutic recommendation" |
|
|
}} |
|
|
|
|
|
Important: |
|
|
1. Respond with ONLY the JSON object |
|
|
2. Use numbers between 0.0 and 1.0 for alliance_strength, engagement_level, and progress_quality |
|
|
3. Keep descriptions brief and focused |
|
|
4. Include at least 2 coping mechanisms |
|
|
5. Provide a specific recommended focus |
|
|
|
|
|
JSON Response: |
|
|
""" |
|
|
|
|
|
response = self.llm.invoke(prompt) |
|
|
|
|
|
try: |
|
|
|
|
|
characteristics = json.loads(response) |
|
|
|
|
|
session_chars = SessionCharacteristics.parse_obj(characteristics) |
|
|
session['llm_context']['session_characteristics'] = session_chars.dict() |
|
|
logger.info(f"Updated session characteristics for user {user_id}") |
|
|
except (json.JSONDecodeError, ValueError) as e: |
|
|
logger.warning(f"Failed to parse session characteristics: {e}") |
|
|
|
|
|
def _create_flow_context(self, user_id: str) -> Dict[str, Any]: |
|
|
|
|
|
session = self.user_sessions[user_id] |
|
|
current_phase = session['current_phase'] |
|
|
|
|
|
|
|
|
started_at = datetime.fromisoformat(session['started_at']) |
|
|
now = datetime.now() |
|
|
elapsed_seconds = (now - started_at).total_seconds() |
|
|
remaining_seconds = max(0, self.session_duration - elapsed_seconds) |
|
|
|
|
|
|
|
|
emotions_summary = {} |
|
|
for emotion_data in session['emotion_history'][-3:]: |
|
|
for emotion, score in emotion_data['emotions'].items(): |
|
|
emotions_summary[emotion] = emotions_summary.get(emotion, 0) + score |
|
|
|
|
|
if emotions_summary: |
|
|
primary_emotions = sorted(emotions_summary.items(), key=lambda x: x[1], reverse=True)[:3] |
|
|
else: |
|
|
primary_emotions = [] |
|
|
|
|
|
|
|
|
phase_guidance = [] |
|
|
|
|
|
|
|
|
if current_phase.name == 'introduction': |
|
|
phase_guidance.append("Build rapport and identify main concerns") |
|
|
if session['message_count'] > 3: |
|
|
phase_guidance.append("Begin exploring emotional context") |
|
|
|
|
|
elif current_phase.name == 'exploration': |
|
|
phase_guidance.append("Deepen understanding of issues and contexts") |
|
|
phase_guidance.append("Connect emotional patterns to identify themes") |
|
|
|
|
|
elif current_phase.name == 'intervention': |
|
|
phase_guidance.append("Offer support strategies and therapeutic insights") |
|
|
if remaining_seconds < 600: |
|
|
phase_guidance.append("Begin consolidating key insights") |
|
|
|
|
|
elif current_phase.name == 'conclusion': |
|
|
phase_guidance.append("Summarize insights and establish next steps") |
|
|
phase_guidance.append("Provide closure while maintaining supportive presence") |
|
|
|
|
|
|
|
|
if 'session_characteristics' in session['llm_context']: |
|
|
char = session['llm_context']['session_characteristics'] |
|
|
|
|
|
|
|
|
if char.get('alliance_strength', 0.8) < 0.6: |
|
|
phase_guidance.append("Focus on strengthening therapeutic alliance") |
|
|
|
|
|
|
|
|
if char.get('engagement_level', 0.8) < 0.6: |
|
|
phase_guidance.append("Increase engagement with more personalized responses") |
|
|
|
|
|
|
|
|
if 'recommended_focus' in char: |
|
|
phase_guidance.append(char['recommended_focus']) |
|
|
|
|
|
|
|
|
flow_context = { |
|
|
'phase': { |
|
|
'name': current_phase.name, |
|
|
'description': current_phase.description, |
|
|
'goals': current_phase.goals |
|
|
}, |
|
|
'session': { |
|
|
'elapsed_minutes': elapsed_seconds / 60, |
|
|
'remaining_minutes': remaining_seconds / 60, |
|
|
'progress_percentage': (elapsed_seconds / self.session_duration) * 100, |
|
|
'message_count': session['message_count'] |
|
|
}, |
|
|
'emotions': [{'name': e, 'intensity': s} for e, s in primary_emotions], |
|
|
'guidance': phase_guidance |
|
|
} |
|
|
|
|
|
return flow_context |