# eodi-mcp/src/auth/session_manager.py
# Repository web-page residue captured with the file (not part of the module):
#   "lovelymango's picture" / "Upload 12 files" / commit 2310db1 (verified)
"""
User Session Manager
====================

User authentication session management.

Note: this module is separate from the MCP protocol session
(the SessionManager in server_streamable.py).

- MCP SessionManager: manages session IDs for MCP protocol communication.
- UserSessionManager: manages user authentication state and JWT tokens.
"""
import secrets
import time
import logging
from typing import Dict, Any, Optional
from dataclasses import dataclass, field
# Module-level logger shared by the session classes and helpers in this file.
logger = logging.getLogger("eodi.auth.session")
@dataclass
class UserSession:
    """Authentication session data for a single user.

    Attributes:
        user_id: Identifier of the authenticated user.
        email: The user's e-mail address.
        access_token: Supabase access token (kept out of ``repr``).
        refresh_token: Supabase refresh token (kept out of ``repr``).
        expires_at: Expiry time of the tokens as a Unix timestamp.
        created_at: Creation time as a Unix timestamp (defaults to now).
    """

    # Unannotated on purpose so the dataclass machinery does not treat it
    # as a field: the session is reported as expired this many seconds early.
    EXPIRY_BUFFER_SECONDS = 30

    user_id: str
    email: str
    # repr=False keeps the secrets out of logs and tracebacks that print
    # the dataclass; to_dict() already leaves them out as well.
    access_token: str = field(repr=False)  # Supabase access_token
    refresh_token: str = field(repr=False)  # Supabase refresh_token
    expires_at: float = 0.0  # placeholder, replaced below
    created_at: float = field(default_factory=time.time)

    # NOTE: ``expires_at`` must stay a required positional field; the
    # annotation-only redeclaration below removes the placeholder default
    # is not possible in a dataclass, so the fields are declared explicitly
    # in __init__ order instead.
    def __init__(
        self,
        user_id: str,
        email: str,
        access_token: str,
        refresh_token: str,
        expires_at: float,
        created_at: Optional[float] = None,
    ) -> None:
        self.user_id = user_id
        self.email = email
        self.access_token = access_token
        self.refresh_token = refresh_token
        self.expires_at = expires_at  # Unix timestamp
        # Mirrors field(default_factory=time.time): evaluated per instance.
        self.created_at = time.time() if created_at is None else created_at

    @property
    def is_expired(self) -> bool:
        """Return True once the token is within the expiry buffer of ``expires_at``."""
        return time.time() > (self.expires_at - self.EXPIRY_BUFFER_SECONDS)

    @property
    def email_masked(self) -> str:
        """Return the e-mail with its local part masked.

        Local parts longer than two characters keep their first and last
        character (``alice@example.com`` -> ``a***e@example.com``); shorter
        ones keep only the first character followed by ``*``. A value that
        is empty yields ``"unknown"``; a value without ``@`` is returned
        unchanged.
        """
        if not self.email or "@" not in self.email:
            return self.email or "unknown"

        local, domain = self.email.split("@", 1)
        if len(local) <= 2:
            # Slice instead of index: an empty local part ("@example.com")
            # becomes "*" rather than raising IndexError.
            masked_local = local[:1] + "*"
        else:
            masked_local = local[0] + "*" * (len(local) - 2) + local[-1]
        return f"{masked_local}@{domain}"

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary for API responses, excluding the tokens and raw e-mail."""
        return {
            "user_id": self.user_id,
            "email_masked": self.email_masked,
            "is_expired": self.is_expired,
            "created_at": self.created_at,
        }
class UserSessionManager:
    """
    In-memory manager for user authentication sessions.

    Manages the session tokens used to identify a user across MCP tool calls.

    Flow:
        1. The user authenticates via OAuth / Magic Link.
        2. A session is created from the resulting Supabase tokens.
        3. The session token is returned to the LLM.
        4. Later tool calls pass the session token as a parameter.
        5. The session token is resolved to a user_id for DB access.

    Note: storage is memory-based, so sessions are lost when the server
    restarts.
    """

    SESSION_TTL_SECONDS = 3600  # 1 hour
    MAX_SESSIONS = 1000  # memory guard: cap on stored sessions
    CLEANUP_INTERVAL_SECONDS = 300  # sweep expired sessions at most every 5 minutes

    def __init__(self):
        """Create an empty session store."""
        self._sessions: Dict[str, "UserSession"] = {}
        self._user_to_session: Dict[str, str] = {}  # user_id -> session_token (latest)
        self._last_cleanup = time.time()

    def _remove_session(self, session_token: str) -> Optional["UserSession"]:
        """Drop a session and its reverse mapping; return the removed session or None."""
        session = self._sessions.pop(session_token, None)
        if session is None:
            return None
        # Only clear the reverse mapping when it still points at this token.
        if self._user_to_session.get(session.user_id) == session_token:
            del self._user_to_session[session.user_id]
        return session

    def _cleanup_expired(self):
        """Remove expired sessions, at most once per CLEANUP_INTERVAL_SECONDS."""
        now = time.time()
        if now - self._last_cleanup < self.CLEANUP_INTERVAL_SECONDS:
            return

        self._last_cleanup = now
        # Collect first: the dict must not change size while it is iterated.
        expired = [
            token for token, session in self._sessions.items()
            if session.is_expired
        ]
        for token in expired:
            self._remove_session(token)

        if expired:
            logger.debug(f"๋งŒ๋ฃŒ๋œ ์„ธ์…˜ {len(expired)}๊ฐœ ์ •๋ฆฌ๋จ")

    def create_session(
        self,
        user_id: str,
        email: str,
        access_token: str,
        refresh_token: str,
        expires_in: int = 3600
    ) -> str:
        """
        Create a session for a user, or refresh the one they already have.

        Args:
            user_id: Supabase Auth UUID
            email: The user's e-mail address
            access_token: Supabase access_token
            refresh_token: Supabase refresh_token
            expires_in: Seconds until the tokens expire

        Returns:
            session_token (used by the LLM in follow-up calls). When the
            user already has a stored session, its existing token is
            returned and only the tokens and expiry are updated.
        """
        self._cleanup_expired()

        # Refresh an existing session first. This path stores nothing new,
        # so it must never trigger the capacity eviction below (which could
        # otherwise drop an unrelated session, or this user's own one).
        existing_token = self._user_to_session.get(user_id)
        if existing_token and existing_token in self._sessions:
            session = self._sessions[existing_token]
            session.access_token = access_token
            session.refresh_token = refresh_token
            session.expires_at = time.time() + expires_in
            logger.info(f"์„ธ์…˜ ๊ฐฑ์‹ : {session.email_masked}")
            return existing_token

        # Memory guard: make room by evicting the oldest session.
        if len(self._sessions) >= self.MAX_SESSIONS:
            oldest_token = min(
                self._sessions,
                key=lambda t: self._sessions[t].created_at
            )
            self._remove_session(oldest_token)
            logger.warning("์„ธ์…˜ ํ•œ๋„ ์ดˆ๊ณผ, ๊ฐ€์žฅ ์˜ค๋ž˜๋œ ์„ธ์…˜ ์‚ญ์ œ")

        # Create a new session.
        session_token = secrets.token_urlsafe(32)
        session = UserSession(
            user_id=user_id,
            email=email,
            access_token=access_token,
            refresh_token=refresh_token,
            expires_at=time.time() + expires_in
        )
        self._sessions[session_token] = session
        self._user_to_session[user_id] = session_token

        logger.info(f"์ƒˆ ์„ธ์…˜ ์ƒ์„ฑ: {session.email_masked}")
        return session_token

    def get_session(self, session_token: str) -> Optional["UserSession"]:
        """
        Look up a session by its token.

        Args:
            session_token: The session token

        Returns:
            The UserSession, or None when it is expired or does not exist
        """
        self._cleanup_expired()

        session = self._sessions.get(session_token)
        if session and not session.is_expired:
            return session
        return None

    def get_user_id(self, session_token: str) -> Optional[str]:
        """
        Resolve a session token to its user_id.

        Args:
            session_token: The session token

        Returns:
            The user_id, or None when there is no valid session
        """
        session = self.get_session(session_token)
        return session.user_id if session else None

    def invalidate_session(self, session_token: str) -> bool:
        """
        Invalidate a session (logout).

        Args:
            session_token: The session token

        Returns:
            True when a session was removed, False when the token was unknown
        """
        session = self._remove_session(session_token)
        if session is None:
            return False
        logger.info(f"์„ธ์…˜ ๋ฌดํšจํ™”: {session.email_masked}")
        return True

    def get_stats(self) -> Dict[str, Any]:
        """Return session statistics: stored session count and mapped user count."""
        self._cleanup_expired()
        return {
            "total_sessions": len(self._sessions),
            "unique_users": len(self._user_to_session),
        }
# Global instance (lazy loading): stays None until the first accessor call.
_user_session_manager = None


def get_user_session_manager() -> UserSessionManager:
    """Return the UserSessionManager singleton, building it on first use."""
    global _user_session_manager
    manager = _user_session_manager
    if manager is not None:
        return manager
    # First call: construct the instance and remember it for later callers.
    manager = UserSessionManager()
    _user_session_manager = manager
    return manager