import json
import os
import uuid
from datetime import datetime

from config import SESSION_MEMORY_LIMIT


class SessionManager:
    """Track one conversation session and persist its history as JSON.

    Each session is written to ``<sessions_dir>/<session_id>.json``. The
    in-memory history is a list of dicts with the keys ``timestamp``,
    ``user`` and ``nova``, capped at ``SESSION_MEMORY_LIMIT`` entries
    (oldest entries are dropped first).
    """

    def __init__(self):
        """Initialise empty state and make sure the sessions directory exists."""
        self.session_id = None
        self.conversation_history = []
        self.last_response_audio = None
        self.sessions_dir = "sessions"

        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(self.sessions_dir, exist_ok=True)

    def start_session(self, session_id=None):
        """Start a new session or resume an existing one.

        Args:
            session_id: ID of a session to resume. When falsy, a fresh
                UUID4 string is generated and the history starts empty.

        Returns:
            The active session ID.
        """
        if session_id:
            self.session_id = session_id
            self._load_session(session_id)
        else:
            self.session_id = str(uuid.uuid4())
            self.conversation_history = []

        print(f"Session active with ID: {self.session_id}")
        return self.session_id

    def add_interaction(self, user_text, nova_text):
        """Append one conversation turn, trim the history, and save to disk.

        Args:
            user_text: What the user said. Empty, whitespace-only or ``None``
                values are stored as the placeholder ``"..."`` (e.g. when
                Nova speaks first).
            nova_text: Nova's reply.
        """
        # Without an active session the file would be saved as "None.json";
        # start one lazily instead.
        if self.session_id is None:
            self.start_session()

        if not user_text or not user_text.strip():
            user_text = "..."

        interaction = {
            "timestamp": datetime.now().isoformat(),
            "user": user_text,
            "nova": nova_text,
        }

        # Print to console for visibility
        print(f"\nUser: {user_text}")
        print(f"Nova: {nova_text}\n")

        self.conversation_history.append(interaction)
        self._trim_history()

        # Save the session after each interaction
        self._save_session()

    def get_conversation_context(self):
        """Return the history formatted as alternating ``User:``/``Nova:`` lines.

        Returns:
            One ``"User: ...\\nNova: ...\\n"`` chunk per stored turn,
            concatenated in order; an empty string when there is no history.
        """
        return "".join(
            f"User: {interaction['user']}\nNova: {interaction['nova']}\n"
            for interaction in self.conversation_history
        )

    def set_last_response(self, audio_data):
        """Store the last audio response for replay."""
        self.last_response_audio = audio_data

    def get_last_response(self):
        """Get the last audio response for replay (``None`` if none was stored)."""
        return self.last_response_audio

    def _trim_history(self):
        """Drop the oldest turns so at most SESSION_MEMORY_LIMIT remain."""
        # Remove the whole excess at once: a resumed session may already be
        # more than one entry over the limit.
        excess = len(self.conversation_history) - SESSION_MEMORY_LIMIT
        if excess > 0:
            del self.conversation_history[:excess]

    def _session_path(self, session_id):
        """Return the JSON file path used for *session_id*."""
        # NOTE(review): session_id is used verbatim as a file name. If it can
        # come from untrusted input, validate it before it reaches this point.
        return os.path.join(self.sessions_dir, f"{session_id}.json")

    def _save_session(self):
        """Save the session ID and conversation history to disk as JSON."""
        session_file = self._session_path(self.session_id)
        payload = {
            "session_id": self.session_id,
            "conversation_history": self.conversation_history,
        }

        # The directory may have been removed since __init__ ran.
        os.makedirs(self.sessions_dir, exist_ok=True)

        # Write to a temporary file and swap it in, so an interrupted write
        # cannot leave a truncated session file behind.
        tmp_file = f"{session_file}.tmp"
        with open(tmp_file, "w", encoding="utf-8") as f:
            json.dump(payload, f)
        os.replace(tmp_file, session_file)

    def _load_session(self, session_id):
        """Load the history for *session_id* from disk.

        A missing file yields an empty history. An unreadable or malformed
        file is reported on the console and also yields an empty history
        instead of raising.
        """
        session_file = self._session_path(session_id)
        try:
            with open(session_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            self.conversation_history = []
            print("Creating new session history")
            return
        except (OSError, ValueError) as exc:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            self.conversation_history = []
            print(f"Could not read session file {session_file}: {exc}")
            return

        history = data.get("conversation_history", []) if isinstance(data, dict) else []
        self.conversation_history = history if isinstance(history, list) else []
        self._trim_history()
        print(f"Loaded existing session with {len(self.conversation_history)} interactions")