""" JWT Token Validator ================== Supabase Auth가 발급한 JWT 토큰 검증. """ import logging from typing import Dict, Any, Optional, Tuple from datetime import datetime, timezone try: import jwt except ImportError: jwt = None # PyJWT 미설치 시 graceful 처리 from .config import SUPABASE_JWT_SECRET logger = logging.getLogger("eodi.auth.token_validator") class TokenValidator: """ JWT 토큰 검증기. Supabase Auth가 발급한 access_token의 서명 및 만료 검증. 서버 측에서 토큰 유효성을 확인할 때 사용. """ def __init__(self, jwt_secret: str = None): """ Args: jwt_secret: Supabase JWT Secret (없으면 환경변수에서 로드) """ self.jwt_secret = jwt_secret or SUPABASE_JWT_SECRET self._enabled = bool(self.jwt_secret) and jwt is not None if jwt is None: logger.warning("PyJWT가 설치되지 않았습니다. pip install PyJWT") elif not self._enabled: logger.warning("JWT Secret 미설정 - 토큰 검증이 비활성화됩니다") @property def is_enabled(self) -> bool: """토큰 검증 활성화 여부""" return self._enabled def validate(self, token: str) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]: """ JWT 토큰 검증. Args: token: JWT access_token Returns: (유효여부, payload, error_message) """ if not self._enabled: # JWT Secret 미설정 시 검증 스킵 (개발 환경용) logger.warning("JWT 검증 스킵 (Secret 미설정 또는 PyJWT 미설치)") return True, {"sub": "unknown"}, None try: payload = jwt.decode( token, self.jwt_secret, algorithms=["HS256"], options={"verify_aud": False} # Supabase는 aud 클레임이 다양함 ) return True, payload, None except jwt.ExpiredSignatureError: return False, None, "토큰이 만료되었습니다" except jwt.InvalidTokenError as e: logger.warning(f"JWT 검증 실패: {e}") return False, None, f"유효하지 않은 토큰입니다: {e}" def extract_user_id(self, token: str) -> Optional[str]: """ 토큰에서 user_id(sub 클레임) 추출. Args: token: JWT access_token Returns: user_id 또는 None """ is_valid, payload, _ = self.validate(token) if is_valid and payload: return payload.get("sub") return None def is_expired(self, token: str) -> bool: """ 토큰 만료 여부 확인. Args: token: JWT access_token Returns: 만료 여부 """ is_valid, _, error = self.validate(token) return not is_valid and error and "만료" in error def get_expiry_time(self, token: str) -> Optional[datetime]: """ 토큰 만료 시간 추출. Args: token: JWT access_token Returns: 만료 시간 또는 None """ if jwt is None: return None try: # 검증 없이 페이로드만 디코딩 payload = jwt.decode( token, options={"verify_signature": False} ) exp = payload.get("exp") if exp: return datetime.fromtimestamp(exp, tz=timezone.utc) return None except Exception: return None # 전역 인스턴스 (Lazy loading) _token_validator = None def get_token_validator() -> TokenValidator: """TokenValidator 싱글톤 인스턴스 반환""" global _token_validator if _token_validator is None: _token_validator = TokenValidator() return _token_validator