Spaces:
Running
Running
| """ | |
| User Session Manager | |
| ==================== | |
| ์ฌ์ฉ์ ์ธ์ฆ ์ธ์ ๊ด๋ฆฌ. | |
| ์ฃผ์: ์ด ๋ชจ๋์ MCP ํ๋กํ ์ฝ ์ธ์ (server_streamable.py์ SessionManager)๊ณผ ๋ณ๊ฐ์ ๋๋ค. | |
| - MCP SessionManager: MCP ํ๋กํ ์ฝ ํต์ ์ฉ ์ธ์ ID ๊ด๋ฆฌ | |
| - UserSessionManager: ์ฌ์ฉ์ ์ธ์ฆ ์ํ ๋ฐ JWT ํ ํฐ ๊ด๋ฆฌ | |
| """ | |
| import secrets | |
| import time | |
| import logging | |
| from typing import Dict, Any, Optional | |
| from dataclasses import dataclass, field | |
| logger = logging.getLogger("eodi.auth.session") | |
| class UserSession: | |
| """์ฌ์ฉ์ ์ธ์ฆ ์ธ์ ์ ๋ณด""" | |
| user_id: str | |
| email: str | |
| access_token: str # Supabase access_token | |
| refresh_token: str # Supabase refresh_token | |
| expires_at: float # Unix timestamp | |
| created_at: float = field(default_factory=time.time) | |
| def is_expired(self) -> bool: | |
| """ํ ํฐ ๋ง๋ฃ ์ฌ๋ถ (30์ด ๋ฒํผ)""" | |
| return time.time() > (self.expires_at - 30) | |
| def email_masked(self) -> str: | |
| """์ด๋ฉ์ผ ๋ง์คํน (u***@example.com)""" | |
| if not self.email or "@" not in self.email: | |
| return self.email or "unknown" | |
| local, domain = self.email.split("@", 1) | |
| if len(local) <= 2: | |
| masked_local = local[0] + "*" | |
| else: | |
| masked_local = local[0] + "*" * (len(local) - 2) + local[-1] | |
| return f"{masked_local}@{domain}" | |
| def to_dict(self) -> Dict[str, Any]: | |
| """API ์๋ต์ฉ ๋์ ๋๋ฆฌ (๋ฏผ๊ฐ์ ๋ณด ์ ์ธ)""" | |
| return { | |
| "user_id": self.user_id, | |
| "email_masked": self.email_masked, | |
| "is_expired": self.is_expired, | |
| "created_at": self.created_at, | |
| } | |
| class UserSessionManager: | |
| """ | |
| ์ฌ์ฉ์ ์ธ์ฆ ์ธ์ ๊ด๋ฆฌ์. | |
| MCP Tool ํธ์ถ์์ ์ฌ์ฉ์๋ฅผ ์๋ณํ๊ธฐ ์ํ ์ธ์ ํ ํฐ์ ๊ด๋ฆฌํฉ๋๋ค. | |
| ํ๋ฆ: | |
| 1. ์ฌ์ฉ์๊ฐ OAuth/Magic Link๋ก ์ธ์ฆ | |
| 2. Supabase ํ ํฐ์ ๋ฐ์ ์ธ์ ์์ฑ | |
| 3. ์ธ์ ํ ํฐ์ LLM์๊ฒ ๋ฐํ | |
| 4. ์ดํ Tool ํธ์ถ ์ ์ธ์ ํ ํฐ์ ํ๋ผ๋ฏธํฐ๋ก ์ ๋ฌ | |
| 5. ์ธ์ ํ ํฐ์ผ๋ก user_id ์กฐํ โ DB ์ ๊ทผ | |
| ์ฃผ์: ๋ฉ๋ชจ๋ฆฌ ๊ธฐ๋ฐ ์ ์ฅ์ด๋ฏ๋ก ์๋ฒ ์ฌ์์ ์ ์ธ์ ์์ค๋ฉ๋๋ค. | |
| """ | |
| SESSION_TTL_SECONDS = 3600 # 1์๊ฐ | |
| MAX_SESSIONS = 1000 # ๋ฉ๋ชจ๋ฆฌ ๋ณดํธ | |
| def __init__(self): | |
| self._sessions: Dict[str, UserSession] = {} | |
| self._user_to_session: Dict[str, str] = {} # user_id -> session_token (์ต์ ) | |
| self._last_cleanup = time.time() | |
| def _cleanup_expired(self): | |
| """๋ง๋ฃ๋ ์ธ์ ์ ๋ฆฌ""" | |
| now = time.time() | |
| if now - self._last_cleanup < 300: # 5๋ถ๋ง๋ค | |
| return | |
| self._last_cleanup = now | |
| expired = [ | |
| token for token, session in self._sessions.items() | |
| if session.is_expired | |
| ] | |
| for token in expired: | |
| session = self._sessions.pop(token, None) | |
| if session: | |
| # user_to_session ๋งคํ๋ ์ ๋ฆฌ | |
| if self._user_to_session.get(session.user_id) == token: | |
| del self._user_to_session[session.user_id] | |
| if expired: | |
| logger.debug(f"๋ง๋ฃ๋ ์ธ์ {len(expired)}๊ฐ ์ ๋ฆฌ๋จ") | |
| def create_session( | |
| self, | |
| user_id: str, | |
| email: str, | |
| access_token: str, | |
| refresh_token: str, | |
| expires_in: int = 3600 | |
| ) -> str: | |
| """ | |
| ์ ์ฌ์ฉ์ ์ธ์ ์์ฑ. | |
| Args: | |
| user_id: Supabase Auth UUID | |
| email: ์ฌ์ฉ์ ์ด๋ฉ์ผ | |
| access_token: Supabase access_token | |
| refresh_token: Supabase refresh_token | |
| expires_in: ํ ํฐ ๋ง๋ฃ๊น์ง ์ด | |
| Returns: | |
| session_token (LLM์ด ํ์ ํธ์ถ์์ ์ฌ์ฉ) | |
| """ | |
| self._cleanup_expired() | |
| # ๋ฉ๋ชจ๋ฆฌ ๋ณดํธ | |
| if len(self._sessions) >= self.MAX_SESSIONS: | |
| # ๊ฐ์ฅ ์ค๋๋ ์ธ์ ์ญ์ | |
| oldest_token = min( | |
| self._sessions.keys(), | |
| key=lambda t: self._sessions[t].created_at | |
| ) | |
| old_session = self._sessions.pop(oldest_token) | |
| if self._user_to_session.get(old_session.user_id) == oldest_token: | |
| del self._user_to_session[old_session.user_id] | |
| logger.warning("์ธ์ ํ๋ ์ด๊ณผ, ๊ฐ์ฅ ์ค๋๋ ์ธ์ ์ญ์ ") | |
| # ๊ธฐ์กด ์ธ์ ์ด ์์ผ๋ฉด ๊ฐฑ์ | |
| existing_token = self._user_to_session.get(user_id) | |
| if existing_token and existing_token in self._sessions: | |
| session = self._sessions[existing_token] | |
| session.access_token = access_token | |
| session.refresh_token = refresh_token | |
| session.expires_at = time.time() + expires_in | |
| logger.info(f"์ธ์ ๊ฐฑ์ : {session.email_masked}") | |
| return existing_token | |
| # ์ ์ธ์ ์์ฑ | |
| session_token = secrets.token_urlsafe(32) | |
| session = UserSession( | |
| user_id=user_id, | |
| email=email, | |
| access_token=access_token, | |
| refresh_token=refresh_token, | |
| expires_at=time.time() + expires_in | |
| ) | |
| self._sessions[session_token] = session | |
| self._user_to_session[user_id] = session_token | |
| logger.info(f"์ ์ธ์ ์์ฑ: {session.email_masked}") | |
| return session_token | |
| def get_session(self, session_token: str) -> Optional[UserSession]: | |
| """ | |
| ์ธ์ ํ ํฐ์ผ๋ก ์ธ์ ์กฐํ. | |
| Args: | |
| session_token: ์ธ์ ํ ํฐ | |
| Returns: | |
| UserSession ๋๋ None (๋ง๋ฃ/๋ฏธ์กด์ฌ) | |
| """ | |
| self._cleanup_expired() | |
| session = self._sessions.get(session_token) | |
| if session and not session.is_expired: | |
| return session | |
| return None | |
| def get_user_id(self, session_token: str) -> Optional[str]: | |
| """ | |
| ์ธ์ ํ ํฐ์์ user_id ์ถ์ถ. | |
| Args: | |
| session_token: ์ธ์ ํ ํฐ | |
| Returns: | |
| user_id ๋๋ None | |
| """ | |
| session = self.get_session(session_token) | |
| return session.user_id if session else None | |
| def invalidate_session(self, session_token: str) -> bool: | |
| """ | |
| ์ธ์ ๋ฌดํจํ (๋ก๊ทธ์์). | |
| Args: | |
| session_token: ์ธ์ ํ ํฐ | |
| Returns: | |
| ์ฑ๊ณต ์ฌ๋ถ | |
| """ | |
| session = self._sessions.pop(session_token, None) | |
| if session: | |
| if self._user_to_session.get(session.user_id) == session_token: | |
| del self._user_to_session[session.user_id] | |
| logger.info(f"์ธ์ ๋ฌดํจํ: {session.email_masked}") | |
| return True | |
| return False | |
| def get_stats(self) -> Dict[str, Any]: | |
| """์ธ์ ํต๊ณ""" | |
| self._cleanup_expired() | |
| return { | |
| "total_sessions": len(self._sessions), | |
| "unique_users": len(self._user_to_session), | |
| } | |
| # ์ ์ญ ์ธ์คํด์ค (Lazy loading) | |
| _user_session_manager = None | |
| def get_user_session_manager() -> UserSessionManager: | |
| """UserSessionManager ์ฑ๊ธํค ์ธ์คํด์ค ๋ฐํ""" | |
| global _user_session_manager | |
| if _user_session_manager is None: | |
| _user_session_manager = UserSessionManager() | |
| return _user_session_manager | |