Spaces:
Sleeping
Sleeping
| """ | |
| Conversation Session Management - Track live conversations | |
| """ | |
| import json | |
| import uuid | |
| from typing import Dict, List, Optional | |
| from datetime import datetime | |
| class ConversationTurn: | |
| """Represents a single turn in a conversation""" | |
| def __init__(self, role: str, content: str, timestamp: str = None, | |
| node_id: str = None, summary: str = None): | |
| self.role = role # "ai" or "user" | |
| self.content = content | |
| self.timestamp = timestamp or datetime.now().isoformat() | |
| self.node_id = node_id # Which node in the flow this relates to | |
| self.summary = summary # AI's summary of this turn | |
| def to_dict(self) -> Dict: | |
| """Convert turn to dictionary""" | |
| return { | |
| "role": self.role, | |
| "content": self.content, | |
| "timestamp": self.timestamp, | |
| "node_id": self.node_id, | |
| "summary": self.summary | |
| } | |
| def from_dict(cls, data: Dict) -> 'ConversationTurn': | |
| """Create turn from dictionary""" | |
| return cls( | |
| role=data.get("role"), | |
| content=data.get("content"), | |
| timestamp=data.get("timestamp"), | |
| node_id=data.get("node_id"), | |
| summary=data.get("summary") | |
| ) | |
| class ConversationSession: | |
| """Manages a live conversation session""" | |
| def __init__(self, session_id: str = None, flow_id: str = None, | |
| respondent_id: str = None, flow_name: str = ""): | |
| self.id = session_id or str(uuid.uuid4()) | |
| self.flow_id = flow_id | |
| self.flow_name = flow_name | |
| self.respondent_id = respondent_id or f"respondent_{uuid.uuid4().hex[:8]}" | |
| self.conversation_history: List[ConversationTurn] = [] | |
| self.current_node_id: Optional[str] = None | |
| self.started_at = datetime.now().isoformat() | |
| self.ended_at: Optional[str] = None | |
| self.status = "active" # "active", "completed", "abandoned" | |
| self.metadata = {} | |
| def add_turn(self, role: str, content: str, node_id: str = None, summary: str = None): | |
| """Add a turn to the conversation""" | |
| turn = ConversationTurn( | |
| role=role, | |
| content=content, | |
| node_id=node_id, | |
| summary=summary | |
| ) | |
| self.conversation_history.append(turn) | |
| def get_conversation_for_llm(self) -> List[Dict[str, str]]: | |
| """Get conversation history in format suitable for LLM""" | |
| messages = [] | |
| for turn in self.conversation_history: | |
| messages.append({ | |
| "role": "assistant" if turn.role == "ai" else "user", | |
| "content": turn.content | |
| }) | |
| return messages | |
| def get_last_user_message(self) -> Optional[str]: | |
| """Get the most recent user message""" | |
| for turn in reversed(self.conversation_history): | |
| if turn.role == "user": | |
| return turn.content | |
| return None | |
| def get_turn_count(self) -> int: | |
| """Get total number of turns""" | |
| return len(self.conversation_history) | |
| def end_session(self): | |
| """Mark session as completed""" | |
| self.status = "completed" | |
| self.ended_at = datetime.now().isoformat() | |
| def abandon_session(self): | |
| """Mark session as abandoned""" | |
| self.status = "abandoned" | |
| self.ended_at = datetime.now().isoformat() | |
| def to_dict(self) -> Dict: | |
| """Convert session to dictionary""" | |
| return { | |
| "id": self.id, | |
| "flow_id": self.flow_id, | |
| "flow_name": self.flow_name, | |
| "respondent_id": self.respondent_id, | |
| "conversation_history": [turn.to_dict() for turn in self.conversation_history], | |
| "current_node_id": self.current_node_id, | |
| "started_at": self.started_at, | |
| "ended_at": self.ended_at, | |
| "status": self.status, | |
| "metadata": self.metadata | |
| } | |
| def from_dict(cls, data: Dict) -> 'ConversationSession': | |
| """Create session from dictionary""" | |
| session = cls( | |
| session_id=data.get("id"), | |
| flow_id=data.get("flow_id"), | |
| respondent_id=data.get("respondent_id"), | |
| flow_name=data.get("flow_name", "") | |
| ) | |
| session.conversation_history = [ | |
| ConversationTurn.from_dict(t) for t in data.get("conversation_history", []) | |
| ] | |
| session.current_node_id = data.get("current_node_id") | |
| session.started_at = data.get("started_at", datetime.now().isoformat()) | |
| session.ended_at = data.get("ended_at") | |
| session.status = data.get("status", "active") | |
| session.metadata = data.get("metadata", {}) | |
| return session | |
| def save_to_file(self, filepath: str): | |
| """Save session to JSON file""" | |
| with open(filepath, 'w') as f: | |
| json.dump(self.to_dict(), f, indent=2) | |
| def load_from_file(cls, filepath: str) -> 'ConversationSession': | |
| """Load session from JSON file""" | |
| with open(filepath, 'r') as f: | |
| data = json.load(f) | |
| return cls.from_dict(data) | |
| def get_transcript(self) -> str: | |
| """Get conversation as readable transcript""" | |
| lines = [] | |
| lines.append(f"Conversation Session: {self.id}") | |
| lines.append(f"Flow: {self.flow_name}") | |
| lines.append(f"Respondent: {self.respondent_id}") | |
| lines.append(f"Started: {self.started_at}") | |
| if self.ended_at: | |
| lines.append(f"Ended: {self.ended_at}") | |
| lines.append(f"Status: {self.status}") | |
| lines.append("\n" + "="*60 + "\n") | |
| for i, turn in enumerate(self.conversation_history, 1): | |
| speaker = "AI Moderator" if turn.role == "ai" else "Respondent" | |
| lines.append(f"[{i}] {speaker} ({turn.timestamp}):") | |
| lines.append(f"{turn.content}\n") | |
| if turn.summary: | |
| lines.append(f" Summary: {turn.summary}\n") | |
| return "\n".join(lines) | |
| def get_summary_stats(self) -> Dict: | |
| """Get summary statistics about the session""" | |
| user_turns = [t for t in self.conversation_history if t.role == "user"] | |
| ai_turns = [t for t in self.conversation_history if t.role == "ai"] | |
| return { | |
| "total_turns": len(self.conversation_history), | |
| "user_turns": len(user_turns), | |
| "ai_turns": len(ai_turns), | |
| "avg_user_response_length": sum(len(t.content) for t in user_turns) / max(len(user_turns), 1), | |
| "duration_minutes": self._calculate_duration_minutes(), | |
| "status": self.status | |
| } | |
| def _calculate_duration_minutes(self) -> float: | |
| """Calculate session duration in minutes""" | |
| if not self.ended_at: | |
| end_time = datetime.now() | |
| else: | |
| end_time = datetime.fromisoformat(self.ended_at) | |
| start_time = datetime.fromisoformat(self.started_at) | |
| duration = (end_time - start_time).total_seconds() / 60 | |
| return round(duration, 2) | |
| class SessionManager: | |
| """Manages multiple conversation sessions""" | |
| def __init__(self): | |
| self.sessions: Dict[str, ConversationSession] = {} | |
| def create_session(self, flow_id: str, flow_name: str = "", respondent_id: str = None) -> ConversationSession: | |
| """Create a new session""" | |
| session = ConversationSession( | |
| flow_id=flow_id, | |
| flow_name=flow_name, | |
| respondent_id=respondent_id | |
| ) | |
| self.sessions[session.id] = session | |
| return session | |
| def get_session(self, session_id: str) -> Optional[ConversationSession]: | |
| """Get a session by ID""" | |
| return self.sessions.get(session_id) | |
| def get_active_sessions(self) -> List[ConversationSession]: | |
| """Get all active sessions""" | |
| return [s for s in self.sessions.values() if s.status == "active"] | |
| def get_all_sessions(self) -> List[ConversationSession]: | |
| """Get all sessions""" | |
| return list(self.sessions.values()) | |
| def end_session(self, session_id: str): | |
| """End a session""" | |
| session = self.sessions.get(session_id) | |
| if session: | |
| session.end_session() | |