""" User Session Manager ==================== 사용자 인증 세션 관리. 주의: 이 모듈은 MCP 프로토콜 세션(server_streamable.py의 SessionManager)과 별개입니다. - MCP SessionManager: MCP 프로토콜 통신용 세션 ID 관리 - UserSessionManager: 사용자 인증 상태 및 JWT 토큰 관리 """ import secrets import time import logging from typing import Dict, Any, Optional from dataclasses import dataclass, field logger = logging.getLogger("eodi.auth.session") @dataclass class UserSession: """사용자 인증 세션 정보""" user_id: str email: str access_token: str # Supabase access_token refresh_token: str # Supabase refresh_token expires_at: float # Unix timestamp created_at: float = field(default_factory=time.time) @property def is_expired(self) -> bool: """토큰 만료 여부 (30초 버퍼)""" return time.time() > (self.expires_at - 30) @property def email_masked(self) -> str: """이메일 마스킹 (u***@example.com)""" if not self.email or "@" not in self.email: return self.email or "unknown" local, domain = self.email.split("@", 1) if len(local) <= 2: masked_local = local[0] + "*" else: masked_local = local[0] + "*" * (len(local) - 2) + local[-1] return f"{masked_local}@{domain}" def to_dict(self) -> Dict[str, Any]: """API 응답용 딕셔너리 (민감정보 제외)""" return { "user_id": self.user_id, "email_masked": self.email_masked, "is_expired": self.is_expired, "created_at": self.created_at, } class UserSessionManager: """ 사용자 인증 세션 관리자. MCP Tool 호출에서 사용자를 식별하기 위한 세션 토큰을 관리합니다. 흐름: 1. 사용자가 OAuth/Magic Link로 인증 2. Supabase 토큰을 받아 세션 생성 3. 세션 토큰을 LLM에게 반환 4. 이후 Tool 호출 시 세션 토큰을 파라미터로 전달 5. 세션 토큰으로 user_id 조회 → DB 접근 주의: 메모리 기반 저장이므로 서버 재시작 시 세션 소실됩니다. """ SESSION_TTL_SECONDS = 3600 # 1시간 MAX_SESSIONS = 1000 # 메모리 보호 def __init__(self): self._sessions: Dict[str, UserSession] = {} self._user_to_session: Dict[str, str] = {} # user_id -> session_token (최신) self._last_cleanup = time.time() def _cleanup_expired(self): """만료된 세션 정리""" now = time.time() if now - self._last_cleanup < 300: # 5분마다 return self._last_cleanup = now expired = [ token for token, session in self._sessions.items() if session.is_expired ] for token in expired: session = self._sessions.pop(token, None) if session: # user_to_session 매핑도 정리 if self._user_to_session.get(session.user_id) == token: del self._user_to_session[session.user_id] if expired: logger.debug(f"만료된 세션 {len(expired)}개 정리됨") def create_session( self, user_id: str, email: str, access_token: str, refresh_token: str, expires_in: int = 3600 ) -> str: """ 새 사용자 세션 생성. Args: user_id: Supabase Auth UUID email: 사용자 이메일 access_token: Supabase access_token refresh_token: Supabase refresh_token expires_in: 토큰 만료까지 초 Returns: session_token (LLM이 후속 호출에서 사용) """ self._cleanup_expired() # 메모리 보호 if len(self._sessions) >= self.MAX_SESSIONS: # 가장 오래된 세션 삭제 oldest_token = min( self._sessions.keys(), key=lambda t: self._sessions[t].created_at ) old_session = self._sessions.pop(oldest_token) if self._user_to_session.get(old_session.user_id) == oldest_token: del self._user_to_session[old_session.user_id] logger.warning("세션 한도 초과, 가장 오래된 세션 삭제") # 기존 세션이 있으면 갱신 existing_token = self._user_to_session.get(user_id) if existing_token and existing_token in self._sessions: session = self._sessions[existing_token] session.access_token = access_token session.refresh_token = refresh_token session.expires_at = time.time() + expires_in logger.info(f"세션 갱신: {session.email_masked}") return existing_token # 새 세션 생성 session_token = secrets.token_urlsafe(32) session = UserSession( user_id=user_id, email=email, access_token=access_token, refresh_token=refresh_token, expires_at=time.time() + expires_in ) self._sessions[session_token] = session self._user_to_session[user_id] = session_token logger.info(f"새 세션 생성: {session.email_masked}") return session_token def get_session(self, session_token: str) -> Optional[UserSession]: """ 세션 토큰으로 세션 조회. Args: session_token: 세션 토큰 Returns: UserSession 또는 None (만료/미존재) """ self._cleanup_expired() session = self._sessions.get(session_token) if session and not session.is_expired: return session return None def get_user_id(self, session_token: str) -> Optional[str]: """ 세션 토큰에서 user_id 추출. Args: session_token: 세션 토큰 Returns: user_id 또는 None """ session = self.get_session(session_token) return session.user_id if session else None def invalidate_session(self, session_token: str) -> bool: """ 세션 무효화 (로그아웃). Args: session_token: 세션 토큰 Returns: 성공 여부 """ session = self._sessions.pop(session_token, None) if session: if self._user_to_session.get(session.user_id) == session_token: del self._user_to_session[session.user_id] logger.info(f"세션 무효화: {session.email_masked}") return True return False def get_stats(self) -> Dict[str, Any]: """세션 통계""" self._cleanup_expired() return { "total_sessions": len(self._sessions), "unique_users": len(self._user_to_session), } # 전역 인스턴스 (Lazy loading) _user_session_manager = None def get_user_session_manager() -> UserSessionManager: """UserSessionManager 싱글톤 인스턴스 반환""" global _user_session_manager if _user_session_manager is None: _user_session_manager = UserSessionManager() return _user_session_manager