Spaces:
Running
Running
| """ | |
| JWT Token Validator | |
| ================== | |
| Validates JWT tokens issued by Supabase Auth. | |
| """ | |
| import logging | |
| from typing import Dict, Any, Optional, Tuple | |
| from datetime import datetime, timezone | |
| try: | |
| import jwt | |
| except ImportError: | |
| jwt = None # PyJWT ๋ฏธ์ค์น ์ graceful ์ฒ๋ฆฌ | |
| from .config import SUPABASE_JWT_SECRET | |
| logger = logging.getLogger("eodi.auth.token_validator") | |
class TokenValidator:
    """
    JWT token validator.

    Checks the signature and expiry of access tokens issued by Supabase
    Auth. Intended for server-side validation of incoming tokens.
    """

    # Message reported by validate() for an expired token. is_expired()
    # compares against this same constant, so the two can never drift apart.
    _EXPIRED_MESSAGE = "ํ ํฐ์ด ๋ง๋ฃ๋์์ต๋๋ค"

    def __init__(self, jwt_secret: Optional[str] = None):
        """
        Args:
            jwt_secret: Supabase JWT secret. When omitted (or empty), the
                module-level SUPABASE_JWT_SECRET imported from the package
                config is used instead.
        """
        self.jwt_secret = jwt_secret or SUPABASE_JWT_SECRET
        # Verification needs both a secret and an importable PyJWT.
        self._enabled = bool(self.jwt_secret) and jwt is not None
        if jwt is None:
            logger.warning("PyJWT๊ฐ ์ค์น๋์ง ์์์ต๋๋ค. pip install PyJWT")
        elif not self._enabled:
            logger.warning("JWT Secret ๋ฏธ์ค์ - ํ ํฐ ๊ฒ์ฆ์ด ๋นํ์ฑํ๋ฉ๋๋ค")

    def is_enabled(self) -> bool:
        """Return True when token verification is active."""
        return self._enabled

    def validate(self, token: str) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]:
        """
        Validate a JWT access token.

        Args:
            token: JWT access_token.

        Returns:
            A ``(is_valid, payload, error_message)`` tuple. On success the
            payload is the decoded claims dict and the message is None; on
            failure the payload is None and the message describes the cause.
        """
        if not self._enabled:
            # NOTE(review): fail-open. With no secret or no PyJWT, every
            # token is accepted with a placeholder subject. The original
            # comment marks this as a development-environment shortcut;
            # confirm it can never be reached in production.
            logger.warning("JWT ๊ฒ์ฆ ์คํต (Secret ๋ฏธ์ค์ ๋๋ PyJWT ๋ฏธ์ค์น)")
            return True, {"sub": "unknown"}, None

        try:
            payload = jwt.decode(
                token,
                self.jwt_secret,
                algorithms=["HS256"],
                # Audience is not checked; the original note says Supabase
                # aud claims vary.
                options={"verify_aud": False},
            )
        except jwt.ExpiredSignatureError:
            return False, None, self._EXPIRED_MESSAGE
        except jwt.InvalidTokenError as e:
            logger.warning("JWT ๊ฒ์ฆ ์คํจ: %s", e)
            return False, None, f"์ ํจํ์ง ์์ ํ ํฐ์ ๋๋ค: {e}"
        return True, payload, None

    def extract_user_id(self, token: str) -> Optional[str]:
        """
        Extract the user id (the ``sub`` claim) from a token.

        Args:
            token: JWT access_token.

        Returns:
            The ``sub`` claim when validate() accepts the token and returns
            a non-empty payload, otherwise None.
        """
        is_valid, payload, _ = self.validate(token)
        if is_valid and payload:
            return payload.get("sub")
        return None

    def is_expired(self, token: str) -> bool:
        """
        Report whether a token was rejected specifically because it expired.

        Args:
            token: JWT access_token.

        Returns:
            True only when validate() rejects the token with the expiry
            message. Always False when verification is disabled, because
            validate() then accepts every token.
        """
        is_valid, _, error = self.validate(token)
        # Equality against the shared constant always yields a strict bool
        # and does not depend on substring matching of a localized message.
        return (not is_valid) and error == self._EXPIRED_MESSAGE

    def get_expiry_time(self, token: str) -> Optional[datetime]:
        """
        Read the expiry time (``exp`` claim) from a token.

        The signature is NOT verified here; the result must not be used for
        any trust decision.

        Args:
            token: JWT access_token.

        Returns:
            The ``exp`` claim as a timezone-aware UTC datetime, or None when
            PyJWT is unavailable, the token cannot be decoded, the claim is
            absent, or the claim cannot be converted to a timestamp.
        """
        if jwt is None:
            return None
        try:
            # Decode the payload only, without signature verification.
            payload = jwt.decode(
                token,
                options={"verify_signature": False},
            )
            exp = payload.get("exp")
            # Compare with None explicitly: exp == 0 is a real timestamp.
            if exp is None:
                return None
            return datetime.fromtimestamp(exp, tz=timezone.utc)
        except Exception:
            # Deliberately best-effort: any decode or conversion problem
            # means "no usable expiry" rather than an error for the caller.
            logger.debug("Could not read exp claim from token", exc_info=True)
            return None
# Module-wide instance, created lazily on first request.
_token_validator = None


def get_token_validator() -> TokenValidator:
    """Return the shared TokenValidator, building it on the first call."""
    global _token_validator
    if _token_validator is not None:
        return _token_validator
    _token_validator = TokenValidator()
    return _token_validator