File size: 7,618 Bytes
461adca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
"""
Session manager for handling user sessions.
"""
import asyncio
import uuid
from typing import Dict, Optional
from config import get_settings
from src.session.state import UserSessionState, generate_session_id
from src.session.storage import create_storage, BaseStorage


class SessionManager:
    """
    Manages user sessions with isolation.

    Every session is kept in the configured storage backend under its own
    session ID, so each user has their own isolated session state. This is
    critical for multi-user environments like Hugging Face Spaces.

    All storage access made through this class, including the periodic
    background cleanup, is serialized through a single asyncio lock.
    """

    def __init__(self, storage_type: str = "memory", **storage_kwargs):
        """
        Initialize the session manager.

        Args:
            storage_type: Type of storage ("memory" or "redis")
            **storage_kwargs: Additional arguments for storage initialization
        """
        self.storage: BaseStorage = create_storage(storage_type, **storage_kwargs)
        self._cleanup_task: Optional[asyncio.Task] = None
        self._lock = asyncio.Lock()

        # Don't start the cleanup task here — no event loop may be running yet.
        # It is started lazily on the first get_session() call.

    def _start_cleanup_task(self) -> None:
        """Start the background cleanup task (only if inside a running event loop)."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running event loop — skip, will retry on next get_session()
            return
        # Also restarts the task if a previous one has already finished.
        if self._cleanup_task is None or self._cleanup_task.done():
            self._cleanup_task = loop.create_task(self._cleanup_loop())

    async def _purge_expired(self, timeout_minutes) -> int:
        """
        Remove expired sessions from storage while holding the manager lock.

        Shared by the background loop and the manual trigger so that a
        cleanup pass can never interleave with the get/set sequence of
        get_session() or with update_session()/delete_session().

        Args:
            timeout_minutes: Session timeout forwarded to storage.cleanup_expired()

        Returns:
            Whatever storage.cleanup_expired() reports (the cleaned-up count)
        """
        async with self._lock:
            return await self.storage.cleanup_expired(timeout_minutes)

    async def _cleanup_loop(self) -> None:
        """
        Background task to clean up expired sessions.

        Sleeps for the configured cleanup interval, purges expired sessions,
        and repeats until cancelled. A failing pass is reported and retried
        after a one-minute pause instead of ending the task.
        """
        try:
            settings = get_settings()
            cleanup_interval = settings.session.cleanup_interval_minutes

            while True:
                try:
                    await asyncio.sleep(cleanup_interval * 60)  # Convert to seconds

                    timeout_minutes = settings.session.timeout_minutes
                    cleaned_count = await self._purge_expired(timeout_minutes)

                    if cleaned_count > 0:
                        print(f"Cleaned up {cleaned_count} expired sessions")

                except asyncio.CancelledError:
                    # On Python 3.7 CancelledError subclasses Exception; re-raise
                    # so the generic handler below cannot swallow cancellation.
                    raise
                except Exception as e:
                    print(f"Error in session cleanup: {e}")
                    await asyncio.sleep(60)  # Wait a minute before retrying

        except asyncio.CancelledError:
            print("Session cleanup task cancelled")
            raise

    async def get_session(self, session_id: Optional[str] = None) -> UserSessionState:
        """
        Get or create a user session.

        When session_id is None a fresh ID is generated. When the given ID is
        not found in storage, a new session is created and stored under that
        same ID. An existing session has its activity timestamp refreshed and
        is written back to storage.

        Args:
            session_id: Existing session ID, or None to create new

        Returns:
            UserSessionState instance
        """
        # Lazily start cleanup task now that we're inside a running event loop
        self._start_cleanup_task()

        async with self._lock:
            if session_id is None:
                session_id = generate_session_id()

            # Try to get existing session
            session = await self.storage.get(session_id)

            if session is None:
                # Create new session
                session = UserSessionState(session_id=session_id)
                await self.storage.set(session)
                print(f"Created new session: {session_id}")
            else:
                # Update activity timestamp
                session.update_activity()
                await self.storage.set(session)

            return session

    async def update_session(self, session: UserSessionState) -> None:
        """
        Update session data in storage.

        The session's activity timestamp is refreshed before it is saved.

        Args:
            session: Session to update
        """
        async with self._lock:
            session.update_activity()
            await self.storage.set(session)

    async def delete_session(self, session_id: str) -> None:
        """
        Delete a session.

        Args:
            session_id: ID of session to delete
        """
        async with self._lock:
            await self.storage.delete(session_id)
            print(f"Deleted session: {session_id}")

    async def cleanup_expired_sessions(self) -> int:
        """
        Manually trigger cleanup of expired sessions.

        Uses the session timeout from the application settings.

        Returns:
            Number of sessions cleaned up
        """
        timeout_minutes = get_settings().session.timeout_minutes
        cleaned_count = await self._purge_expired(timeout_minutes)

        if cleaned_count > 0:
            print(f"Manually cleaned up {cleaned_count} expired sessions")

        return cleaned_count

    async def get_all_sessions(self) -> Dict[str, UserSessionState]:
        """
        Get all active sessions (for monitoring/debugging).

        Returns:
            Dictionary of session_id -> UserSessionState
        """
        async with self._lock:
            return await self.storage.get_all_sessions()

    async def get_session_count(self) -> int:
        """
        Get the total number of active sessions.

        Returns:
            Number of active sessions
        """
        sessions = await self.get_all_sessions()
        return len(sessions)

    async def shutdown(self) -> None:
        """
        Shutdown the session manager and cleanup resources.

        Cancels the background cleanup task, if one is running, and waits
        for it to finish before returning.
        """
        if self._cleanup_task and not self._cleanup_task.done():
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                # Expected: the task re-raises its own cancellation.
                pass

        print("Session manager shutdown complete")

    def __str__(self) -> str:
        """String representation for debugging."""
        return f"SessionManager(storage_type={type(self.storage).__name__})"

    def __repr__(self) -> str:
        """Detailed string representation."""
        return self.__str__()


# Global session manager instance, shared by the whole module.
# Stays None until get_session_manager() builds it on first use.
_session_manager: Optional[SessionManager] = None


def get_session_manager() -> SessionManager:
    """
    Return the global session manager, building it on first use.

    The storage backend comes from settings.session.storage_type. The Redis
    connection parameters (host, port, db, password) are forwarded to the
    manager only when that backend is "redis".
    """
    global _session_manager

    # Fast path: the instance was already built by an earlier call.
    if _session_manager is not None:
        return _session_manager

    cfg = get_settings()
    backend = cfg.session.storage_type

    if backend == "redis":
        redis_cfg = cfg.redis
        extra = dict(
            host=redis_cfg.host,
            port=redis_cfg.port,
            db=redis_cfg.db,
            password=redis_cfg.password,
        )
    else:
        # Other backends take no extra connection arguments.
        extra = {}

    _session_manager = SessionManager(storage_type=backend, **extra)
    return _session_manager


async def create_user_session() -> UserSessionState:
    """
    Create a brand-new user session through the global session manager.

    No session ID is supplied, so the manager generates one.

    Returns:
        New UserSessionState instance
    """
    return await get_session_manager().get_session()


async def get_user_session(session_id: str) -> UserSessionState:
    """
    Look up a user session by ID through the global session manager.

    The lookup is delegated to SessionManager.get_session(), which creates
    and stores a new session under this ID when none is found.

    Args:
        session_id: Session ID

    Returns:
        UserSessionState instance
    """
    session_manager = get_session_manager()
    user_session = await session_manager.get_session(session_id)
    return user_session