File size: 6,406 Bytes
ae279de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""Session manager with automatic cleanup for RAG chains"""

import time
import threading
from typing import Optional, Dict, Any
import logging

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


class SessionManager:
    """Manages RAG chains with automatic cleanup and TTL.

    Sessions are kept in an in-memory dict guarded by a single
    non-reentrant ``threading.Lock``. A daemon thread periodically removes
    sessions whose age (time since creation, not since last access)
    exceeds the configured TTL.
    """

    def __init__(self, ttl_seconds: int = 86400, cleanup_interval_seconds: float = 300):
        """
        Initialize SessionManager and start the background cleanup thread

        Args:
            ttl_seconds: Time-to-live for sessions in seconds (default: 24 hours)
            cleanup_interval_seconds: Pause between two cleanup passes in
                seconds (default: 5 minutes)

        Raises:
            ValueError: If cleanup_interval_seconds is not positive
        """
        # A non-positive interval would make the cleanup loop spin or fail
        # on every iteration, so reject it up front.
        if cleanup_interval_seconds <= 0:
            raise ValueError("cleanup_interval_seconds must be positive")

        self.sessions: Dict[str, dict] = {}
        self.ttl = ttl_seconds
        self.cleanup_interval = cleanup_interval_seconds
        # Plain (non-reentrant) lock: a method holding it must never call
        # another method that acquires it again.
        self.lock = threading.Lock()

        # Start cleanup thread (daemon, so it never blocks interpreter exit)
        self.cleanup_thread = threading.Thread(
            target=self._cleanup_loop,
            daemon=True
        )
        self.cleanup_thread.start()
        logger.info(f"SessionManager initialized with TTL={ttl_seconds}s")

    def create_session(self, session_id: str, rag_chain: Any, metadata: Optional[dict] = None) -> str:
        """
        Create a new session with automatic expiry

        An existing session stored under the same id is overwritten.

        Args:
            session_id: Unique session identifier
            rag_chain: RAG chain object to store
            metadata: Optional metadata dictionary

        Returns:
            session_id for verification
        """
        # One timestamp for both fields so a fresh session never reports
        # a last access later than its creation.
        now = time.time()
        with self.lock:
            self.sessions[session_id] = {
                'chain': rag_chain,
                'created_at': now,
                'last_accessed': now,
                'metadata': metadata or {},
                'access_count': 0
            }
            logger.info(f"Session created: {session_id}. Total: {len(self.sessions)}")
        return session_id

    def get_session(self, session_id: str) -> Optional[Any]:
        """
        Retrieve a session and update access time

        Updating the access time does not extend the session's lifetime;
        expiry is measured from creation.

        Args:
            session_id: Session to retrieve

        Returns:
            RAG chain object or None if not found
        """
        with self.lock:
            session_data = self.sessions.get(session_id)
            if session_data is None:
                return None

            session_data['last_accessed'] = time.time()
            session_data['access_count'] += 1

            return session_data['chain']

    def delete_session(self, session_id: str) -> bool:
        """
        Manually delete a session

        Args:
            session_id: Session to delete

        Returns:
            True if deleted, False if not found
        """
        with self.lock:
            if session_id in self.sessions:
                del self.sessions[session_id]
                logger.info(f"Session deleted: {session_id}. Remaining: {len(self.sessions)}")
                return True
        return False

    def get_session_count(self) -> int:
        """Get number of active sessions"""
        with self.lock:
            return len(self.sessions)

    def _build_session_info(self, session_id: str, current_time: float) -> Optional[dict]:
        """
        Build the info dictionary for one session

        The caller must already hold ``self.lock``; this helper never
        acquires it, so it is safe to call from other locked sections.

        Args:
            session_id: Session to describe
            current_time: Timestamp used to compute age and idle time

        Returns:
            Dictionary with session metadata or None if not found
        """
        session_data = self.sessions.get(session_id)
        if session_data is None:
            return None

        return {
            'session_id': session_id,
            'created_at': session_data['created_at'],
            'last_accessed': session_data['last_accessed'],
            'access_count': session_data['access_count'],
            'age_seconds': current_time - session_data['created_at'],
            'idle_seconds': current_time - session_data['last_accessed'],
            'ttl_seconds': self.ttl,
            # Expiry counts from creation, matching _cleanup_expired_sessions
            'expires_at': session_data['created_at'] + self.ttl,
        }

    def get_session_info(self, session_id: str) -> Optional[dict]:
        """
        Get detailed session information

        Args:
            session_id: Session to query

        Returns:
            Dictionary with session metadata or None
        """
        with self.lock:
            return self._build_session_info(session_id, time.time())

    def get_all_sessions_info(self) -> list:
        """
        Get information about all active sessions

        Returns:
            List with one info dictionary per stored session
        """
        with self.lock:
            # Use the lock-free helper here: calling get_session_info()
            # would try to re-acquire the non-reentrant lock and deadlock.
            current_time = time.time()
            return [
                self._build_session_info(sid, current_time)
                for sid in self.sessions
            ]

    def _cleanup_loop(self):
        """Background cleanup thread that runs periodically"""
        while True:
            try:
                time.sleep(self.cleanup_interval)
                self._cleanup_expired_sessions()
            except Exception as e:
                # Keep the thread alive; a failed pass is retried next cycle
                logger.error(f"Cleanup loop error: {e}")

    def _cleanup_expired_sessions(self):
        """Remove sessions that have exceeded their TTL (measured from creation)"""
        current_time = time.time()

        with self.lock:
            expired = [
                session_id
                for session_id, session_data in self.sessions.items()
                if current_time - session_data['created_at'] > self.ttl
            ]

            # Remove expired sessions
            for session_id in expired:
                del self.sessions[session_id]
                logger.info(f"Expired session: {session_id}")

            # Capture the count while still holding the lock
            remaining = len(self.sessions)

        if expired:
            logger.info(f"Cleaned {len(expired)} expired sessions. Active: {remaining}")

    def cleanup_all(self):
        """Clear all sessions (typically called on shutdown)"""
        with self.lock:
            count = len(self.sessions)
            self.sessions.clear()
            logger.info(f"Cleaned up all {count} sessions")

    def get_memory_stats(self) -> dict:
        """
        Get aggregate session statistics

        Returns:
            Dictionary with the session count, average age and idle time
            in seconds (0 when there are no sessions), the total access
            count and the configured TTL
        """
        with self.lock:
            total_age = 0
            total_idle = 0
            total_accesses = 0
            current_time = time.time()

            for session_data in self.sessions.values():
                total_age += current_time - session_data['created_at']
                total_idle += current_time - session_data['last_accessed']
                total_accesses += session_data['access_count']

            count = len(self.sessions)

            return {
                'total_sessions': count,
                'avg_age_seconds': total_age / count if count > 0 else 0,
                'avg_idle_seconds': total_idle / count if count > 0 else 0,
                'total_accesses': total_accesses,
                'ttl_seconds': self.ttl,
            }