Spaces:
Runtime error
Runtime error
Silicon Valley - Admin
Enhance session management and Dockerfile for improved performance and security
362d092
| import asyncio | |
| import uuid | |
| import time | |
| from typing import AsyncGenerator, Dict, Tuple, Any | |
| from dataclasses import dataclass | |
class SessionDoesNotExist(Exception):
    """Raised when a request targets a session id that is not subscribed."""
class SessionAlreadyExists(Exception):
    """Raised when subscribing with a session id that is already in use."""
class ClientError(Exception):
    """Failure reported by a client in place of a successful response.

    Attributes:
        message: The error payload the exception was created with.
    """

    def __init__(self, message):
        # Keep the payload reachable as an attribute as well as via ``args``.
        self.message = message
        super().__init__(message)
@dataclass
class ClientRequest:
    """A request queued for delivery to a subscribed client.

    Declared as a dataclass so it can be built with
    ``ClientRequest(request_id=..., data=...)``; without the decorator the
    class only carries annotations and rejects constructor arguments.
    """

    # Identifier used to pair the client's response with this request.
    request_id: str
    # Arbitrary payload forwarded to the client.
    data: Any
@dataclass
class ClientResponse:
    """A client's reply to a previously delivered request.

    Declared as a dataclass so it can be built from its three fields;
    without the decorator the class only carries annotations and rejects
    constructor arguments.
    """

    # Identifier of the request this response answers.
    request_id: str
    # True when the client reports a failure; ``data`` is then the error payload.
    error: bool
    # Result payload on success, error payload on failure.
    data: Any
class SessionBroker:
    """Relay requests to subscribed client sessions and pair up their replies.

    A client opens a session with :meth:`subscribe`, which yields the
    requests queued for it.  Other code calls :meth:`send_request` to queue
    a request and wait for the matching reply, which is delivered through
    :meth:`receive_response`.  Sessions with no activity for
    ``session_timeout`` seconds are closed by a background sweep.

    Attributes:
        sessions: Request queue of every subscribed session, by session id.
        pending_responses: Futures awaiting a reply, keyed by
            ``(session_id, request_id)``.
        session_last_active: ``time.time()`` of each session's last activity.
        session_timeout: Seconds of inactivity after which a session expires.
    """

    # Seconds between two sweeps for inactive sessions.
    _CLEANUP_INTERVAL = 60
    # Sentinel queued to a subscriber to tell it that its session was closed.
    _SESSION_CLOSED = object()

    def __init__(self, session_timeout: int = 3600):  # 1 hour by default
        """Create an empty broker.

        The broker may be constructed without a running event loop (for
        example at module import time); the inactivity sweep is then started
        by the first call to one of the async methods.

        Args:
            session_timeout: Seconds of inactivity before a session expires.
        """
        self.sessions: Dict[str, asyncio.Queue] = {}
        self.pending_responses: Dict[Tuple[str, str], asyncio.Future] = {}
        self.session_last_active: Dict[str, float] = {}
        self.session_timeout = session_timeout
        # Strong reference to the sweep task: the event loop only keeps weak
        # references, so an unreferenced task may be garbage collected.
        self._cleanup_task = None
        self._ensure_cleanup_task()

    def _ensure_cleanup_task(self) -> None:
        """Start the inactivity sweep if a loop is running and none is active."""
        task = self._cleanup_task
        if task is not None and not task.done():
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop yet; a later async call will retry.
            return
        self._cleanup_task = loop.create_task(self._cleanup_inactive_sessions())

    def _drop_session(self, session_id: str) -> None:
        """Remove every trace of a session and release whoever waits on it."""
        queue = self.sessions.pop(session_id, None)
        self.session_last_active.pop(session_id, None)
        if queue is not None:
            # Wake a subscriber blocked on ``queue.get()`` so its stream ends
            # instead of hanging on a queue nobody can reach any more.
            queue.put_nowait(self._SESSION_CLOSED)
        orphaned = [key for key in self.pending_responses if key[0] == session_id]
        for key in orphaned:
            future = self.pending_responses.pop(key)
            if not future.done():
                # Fail fast: no reply can arrive for a session that is gone.
                future.set_exception(SessionDoesNotExist())

    async def _cleanup_inactive_sessions(self):
        """Periodically close sessions idle for longer than ``session_timeout``."""
        while True:
            now = time.time()
            expired = [
                session_id
                for session_id, last_active in self.session_last_active.items()
                if now - last_active > self.session_timeout
            ]
            for session_id in expired:
                self._drop_session(session_id)
            await asyncio.sleep(self._CLEANUP_INTERVAL)  # Check every minute

    async def send_request(self, session_id: str, data: Any, timeout: int = 60) -> Any:
        """Queue a request for a session and wait for the client's reply.

        Args:
            session_id: Id of a currently subscribed session.
            data: Payload to deliver to the client.
            timeout: Seconds to wait for the reply.

        Returns:
            The ``data`` of the client's successful response.

        Raises:
            SessionDoesNotExist: If the session is not subscribed, or is
                closed while the reply is still outstanding.
            asyncio.TimeoutError: If no reply arrives within ``timeout``.
            ClientError: If the client answers with an error response.
        """
        self._ensure_cleanup_task()
        queue = self.sessions.get(session_id)
        if queue is None:
            raise SessionDoesNotExist()
        self.session_last_active[session_id] = time.time()

        request_id = str(uuid.uuid4())
        key = (session_id, request_id)
        future = asyncio.get_running_loop().create_future()
        self.pending_responses[key] = future
        try:
            await queue.put(ClientRequest(request_id=request_id, data=data))
            return await asyncio.wait_for(future, timeout)
        finally:
            # Always forget the request, whether it succeeded, failed or timed out.
            self.pending_responses.pop(key, None)

    async def receive_response(self, session_id: str, response: "ClientResponse") -> None:
        """Deliver a client's response to the caller waiting for it.

        Responses that match no pending request (unknown, already answered
        or timed out) are ignored.

        Args:
            session_id: Id of the session the response belongs to.
            response: The client's reply; ``error`` selects whether ``data``
                is returned to the waiter or raised as ``ClientError``.
        """
        self._ensure_cleanup_task()
        if session_id in self.sessions:
            # Only live sessions record activity; unknown ids must not leave
            # stray entries behind.
            self.session_last_active[session_id] = time.time()
        future = self.pending_responses.pop((session_id, response.request_id), None)
        if future is None or future.done():
            return
        if response.error:
            future.set_exception(ClientError(message=response.data))
        else:
            future.set_result(response.data)

    async def subscribe(self, session_id: str) -> "AsyncGenerator[ClientRequest, None]":
        """Open a session and yield the requests queued for it.

        The stream ends when the session is closed by the inactivity sweep.
        When the generator finishes or is closed, the session is removed
        unless another subscriber has taken over the same id in the meantime.

        Args:
            session_id: Id to register the session under.

        Yields:
            Each request queued through :meth:`send_request`.

        Raises:
            SessionAlreadyExists: If the id is already subscribed.
        """
        self._ensure_cleanup_task()
        if session_id in self.sessions:
            raise SessionAlreadyExists()
        queue = asyncio.Queue()
        self.sessions[session_id] = queue
        self.session_last_active[session_id] = time.time()
        try:
            while True:
                request = await queue.get()
                if request is self._SESSION_CLOSED:
                    break
                yield request
                if self.sessions.get(session_id) is queue:
                    self.session_last_active[session_id] = time.time()
        finally:
            # Tear down only our own session: after an expiry the id may
            # already belong to a newer subscriber.
            if self.sessions.get(session_id) is queue:
                self._drop_session(session_id)