saik0s's picture
Add files using upload-large-folder tool
8d5e143 verified
Raw
History Blame Contribute Delete
11.3 kB
import os
import json
import datetime
import re
from pathlib import Path
from typing import List, Dict, Optional, Tuple
import time
class ChatSessionManager:
"""
Manages chat sessions for the OpenRouter node.
Handles creating, loading, and updating chat conversations with automatic session management.
"""
def __init__(self, base_path: str = None):
"""
Initialize the chat session manager.
Args:
base_path: Base directory for storing chats. Defaults to node's directory + /chats
"""
if base_path is None:
base_path = os.path.join(os.path.dirname(__file__), "chats")
self.base_path = Path(base_path)
self.base_path.mkdir(exist_ok=True)
self.current_session_path = None
self.session_timeout_hours = 1 # Sessions expire after 1 hour of inactivity
def _sanitize_filename(self, text: str, max_length: int = 50) -> str:
"""
Sanitize text for use in filenames.
Args:
text: Text to sanitize
max_length: Maximum length of the sanitized text
Returns:
Sanitized text safe for filenames
"""
# Get first 5 words or max_length characters, whichever is shorter
words = text.split()[:5]
text = " ".join(words)
# Remove special characters
text = re.sub(r'[^\w\s-]', '', text)
text = re.sub(r'[-\s]+', '_', text)
# Truncate to max length
if len(text) > max_length:
text = text[:max_length].rstrip('_')
return text.lower()
def _get_timestamp(self) -> str:
"""Get current timestamp in format YYYYMMDD_HHMMSS"""
return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
def _get_iso_timestamp(self) -> str:
"""Get current timestamp in ISO format"""
return datetime.datetime.now().isoformat()
def _find_active_session(self) -> Optional[Path]:
"""
Find an active session within the timeout period.
Returns:
Path to active session directory or None
"""
if not self.base_path.exists():
return None
current_time = time.time()
# Get all session directories
sessions = [d for d in self.base_path.iterdir() if d.is_dir()]
# Sort by modification time (most recent first)
sessions.sort(key=lambda x: x.stat().st_mtime, reverse=True)
for session_dir in sessions:
conversation_file = session_dir / "conversation.json"
if conversation_file.exists():
# Check if session is within timeout period
last_modified = conversation_file.stat().st_mtime
hours_elapsed = (current_time - last_modified) / 3600
if hours_elapsed <= self.session_timeout_hours:
return session_dir
return None
def _create_new_session(self, first_message: str) -> Path:
"""
Create a new chat session directory.
Args:
first_message: The first user message to use in the session name
Returns:
Path to the new session directory
"""
timestamp = self._get_timestamp()
sanitized_message = self._sanitize_filename(first_message)
# Create session directory name
session_name = f"session_{timestamp}_{sanitized_message}"
session_path = self.base_path / session_name
# Handle potential naming conflicts
counter = 1
while session_path.exists():
session_path = self.base_path / f"{session_name}_{counter}"
counter += 1
session_path.mkdir(exist_ok=True)
return session_path
def get_or_create_session(self, user_message: str, system_prompt: str) -> Tuple[Path, List[Dict]]:
"""
Get an active session or create a new one.
Args:
user_message: The current user message
system_prompt: The system prompt to use
Returns:
Tuple of (session_path, message_history)
"""
# Try to find an active session
active_session = self._find_active_session()
if active_session:
self.current_session_path = active_session
messages = self.load_conversation(active_session)
return active_session, messages
else:
# Create new session
new_session = self._create_new_session(user_message)
self.current_session_path = new_session
# Initialize with system prompt
messages = [{"role": "system", "content": system_prompt}]
# Create initial conversation file
conversation_data = {
"session_id": new_session.name,
"created_at": self._get_iso_timestamp(),
"last_updated": self._get_iso_timestamp(),
"messages": messages
}
conversation_file = new_session / "conversation.json"
with open(conversation_file, 'w', encoding='utf-8') as f:
json.dump(conversation_data, f, indent=2, ensure_ascii=False)
return new_session, messages
def load_conversation(self, session_path: Path) -> List[Dict]:
"""
Load conversation history from a session.
Args:
session_path: Path to the session directory
Returns:
List of messages in the conversation
"""
conversation_file = session_path / "conversation.json"
if not conversation_file.exists():
return []
try:
with open(conversation_file, 'r', encoding='utf-8') as f:
data = json.load(f)
return data.get("messages", [])
except (json.JSONDecodeError, IOError) as e:
print(f"Error loading conversation from {conversation_file}: {e}")
return []
def save_conversation(self, session_path: Path, messages: List[Dict]):
"""
Save conversation history to a session.
Args:
session_path: Path to the session directory
messages: List of messages to save
"""
conversation_file = session_path / "conversation.json"
# Load existing data to preserve metadata
existing_data = {}
if conversation_file.exists():
try:
with open(conversation_file, 'r', encoding='utf-8') as f:
existing_data = json.load(f)
except (json.JSONDecodeError, IOError):
pass
# Update conversation data
conversation_data = {
"session_id": existing_data.get("session_id", session_path.name),
"created_at": existing_data.get("created_at", self._get_iso_timestamp()),
"last_updated": self._get_iso_timestamp(),
"messages": messages
}
# Save to file
try:
with open(conversation_file, 'w', encoding='utf-8') as f:
json.dump(conversation_data, f, indent=2, ensure_ascii=False)
except IOError as e:
print(f"Error saving conversation to {conversation_file}: {e}")
def append_message(self, role: str, content: str, session_path: Optional[Path] = None):
"""
Append a message to the current or specified session.
Args:
role: Message role ('user', 'assistant', or 'system')
content: Message content
session_path: Optional specific session path, uses current if not provided
"""
if session_path is None:
session_path = self.current_session_path
if session_path is None:
raise ValueError("No active session to append message to")
# Load existing messages
messages = self.load_conversation(session_path)
# Append new message
messages.append({
"role": role,
"content": content,
"timestamp": self._get_iso_timestamp()
})
# Save updated conversation
self.save_conversation(session_path, messages)
def get_recent_sessions(self, limit: int = 10) -> List[Dict]:
"""
Get a list of recent chat sessions.
Args:
limit: Maximum number of sessions to return
Returns:
List of session metadata dictionaries
"""
if not self.base_path.exists():
return []
sessions = []
session_dirs = [d for d in self.base_path.iterdir() if d.is_dir()]
# Sort by modification time (most recent first)
session_dirs.sort(key=lambda x: x.stat().st_mtime, reverse=True)
for session_dir in session_dirs[:limit]:
conversation_file = session_dir / "conversation.json"
if conversation_file.exists():
try:
with open(conversation_file, 'r', encoding='utf-8') as f:
data = json.load(f)
sessions.append({
"session_id": data.get("session_id", session_dir.name),
"path": str(session_dir),
"created_at": data.get("created_at"),
"last_updated": data.get("last_updated"),
"message_count": len(data.get("messages", [])),
"first_user_message": next(
(msg["content"] for msg in data.get("messages", [])
if msg["role"] == "user"),
"No user message"
)[:100] # First 100 chars of first user message
})
except (json.JSONDecodeError, IOError):
continue
return sessions
def clear_old_sessions(self, days: int = 30):
"""
Clear sessions older than specified days.
Args:
days: Number of days to keep sessions
"""
if not self.base_path.exists():
return
current_time = time.time()
cutoff_time = current_time - (days * 24 * 3600)
for session_dir in self.base_path.iterdir():
if session_dir.is_dir():
# Check modification time
if session_dir.stat().st_mtime < cutoff_time:
# Remove old session
try:
import shutil
shutil.rmtree(session_dir)
print(f"Removed old session: {session_dir.name}")
except Exception as e:
print(f"Error removing session {session_dir.name}: {e}")