Spaces:
Running
Running
| """ | |
| Magic Link Authentication | |
| ========================= | |
| Claude Desktop λ± OAuthλ₯Ό μ§μνμ§ μλ MCP ν΄λΌμ΄μΈνΈμ©. | |
| νλ¦: | |
| 1. μ¬μ©μκ° μ΄λ©μΌ μ λ ₯ β user_request_auth Tool νΈμΆ | |
| 2. Supabase Magic Link λ°μ‘ | |
| 3. μ¬μ©μκ° μ΄λ©μΌμμ λ§ν¬ ν΄λ¦ β /auth/callback νμ΄μ§ | |
| 4. νμ΄μ§μμ 6μ리 μ½λ νμ | |
| 5. μ¬μ©μκ° GPT/Claudeλ‘ λμμμ μ½λ μ λ ₯ β user_verify_code Tool νΈμΆ | |
| 6. μ½λ κ²μ¦ β μΈμ ν ν° λ°ν | |
| """ | |
| import secrets | |
| import time | |
| import logging | |
| from typing import Dict, Any, Optional, Tuple | |
| from collections import defaultdict | |
| import httpx | |
| from .config import ( | |
| SUPABASE_URL, | |
| SUPABASE_KEY, | |
| MAGIC_LINK_REDIRECT_URL, | |
| AUTH_CODE_TTL_SECONDS, | |
| AUTH_CODE_LENGTH, | |
| AUTH_CODE_MAX_ATTEMPTS, | |
| ) | |
| logger = logging.getLogger("eodi.auth.magic_link") | |
| class AuthCodeStore: | |
| """ | |
| μΈμ¦ μ½λ μμ μ μ₯μ. | |
| Magic Link ν΄λ¦ ν νμλλ 6μ리 μ½λ κ΄λ¦¬. | |
| Rate limiting λ° brute force λ°©μ§ ν¬ν¨. | |
| """ | |
| def __init__(self, ttl_seconds: int = AUTH_CODE_TTL_SECONDS): | |
| self.ttl = ttl_seconds | |
| self._codes: Dict[str, Dict[str, Any]] = {} | |
| self._attempts: Dict[str, int] = defaultdict(int) # IPλ³ μλ νμ | |
| self._last_cleanup = time.time() | |
| def _cleanup(self): | |
| """λ§λ£λ μ½λ λ° μλ νμ μ 리""" | |
| now = time.time() | |
| if now - self._last_cleanup < 60: | |
| return | |
| self._last_cleanup = now | |
| # λ§λ£λ μ½λ μμ | |
| expired = [ | |
| c for c, data in self._codes.items() | |
| if now - data["created_at"] > self.ttl | |
| ] | |
| for c in expired: | |
| del self._codes[c] | |
| # μ€λλ μλ νμ 리μ | |
| self._attempts.clear() | |
| def generate_code( | |
| self, | |
| user_id: str, | |
| email: str, | |
| access_token: str, | |
| refresh_token: str | |
| ) -> str: | |
| """ | |
| 6μ리 μΈμ¦ μ½λ μμ±. | |
| Args: | |
| user_id: Supabase Auth UUID | |
| email: μ¬μ©μ μ΄λ©μΌ | |
| access_token: Supabase access_token | |
| refresh_token: Supabase refresh_token | |
| Returns: | |
| 6μ리 μ«μ μ½λ | |
| """ | |
| self._cleanup() | |
| # κ°μ user_idμ κΈ°μ‘΄ μ½λ μμ (μ€λ³΅ λ°©μ§) | |
| existing = [ | |
| c for c, data in self._codes.items() | |
| if data["user_id"] == user_id | |
| ] | |
| for c in existing: | |
| del self._codes[c] | |
| # 6μ리 μ«μ μ½λ μμ± | |
| code = "".join([str(secrets.randbelow(10)) for _ in range(AUTH_CODE_LENGTH)]) | |
| self._codes[code] = { | |
| "user_id": user_id, | |
| "email": email, | |
| "access_token": access_token, | |
| "refresh_token": refresh_token, | |
| "created_at": time.time(), | |
| } | |
| return code | |
| def verify_code( | |
| self, | |
| code: str, | |
| client_ip: str = "unknown" | |
| ) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]: | |
| """ | |
| μ½λ κ²μ¦ λ° μλΉ. | |
| Args: | |
| code: 6μ리 μΈμ¦ μ½λ | |
| client_ip: ν΄λΌμ΄μΈνΈ IP (rate limitingμ©) | |
| Returns: | |
| (μ ν¨μ¬λΆ, code_data, error_message) | |
| """ | |
| self._cleanup() | |
| # Rate limiting μ²΄ν¬ | |
| if self._attempts[client_ip] >= AUTH_CODE_MAX_ATTEMPTS: | |
| logger.warning(f"Rate limit exceeded for IP: {client_ip[:20]}...") | |
| return False, None, f"λ무 λ§μ μλμ λλ€. {self.ttl}μ΄ ν λ€μ μλν΄μ£ΌμΈμ." | |
| self._attempts[client_ip] += 1 | |
| # μ½λ μ κ·ν (곡백 μ κ±° λ±) | |
| code = code.strip().replace(" ", "").replace("-", "") | |
| if code not in self._codes: | |
| return False, None, "μ½λκ° μΌμΉνμ§ μμ΅λλ€. λ€μ νμΈν΄μ£ΌμΈμ." | |
| code_data = self._codes.pop(code) | |
| if time.time() - code_data["created_at"] > self.ttl: | |
| return False, None, "μ½λκ° λ§λ£λμμ΅λλ€. μ μ½λλ₯Ό μμ²ν΄μ£ΌμΈμ." | |
| # μ±κ³΅ μ μλ νμ 리μ | |
| self._attempts[client_ip] = 0 | |
| return True, code_data, None | |
| class MagicLinkAuth: | |
| """ | |
| Magic Link μΈμ¦ νΈλ€λ¬. | |
| """ | |
| def __init__(self): | |
| self.code_store = AuthCodeStore() | |
| self._http_client = None | |
| async def _get_client(self) -> httpx.AsyncClient: | |
| """HTTP ν΄λΌμ΄μΈνΈ (μ¬μ¬μ©)""" | |
| if self._http_client is None: | |
| self._http_client = httpx.AsyncClient(timeout=30) | |
| return self._http_client | |
| async def send_magic_link(self, email: str) -> Tuple[bool, Optional[str]]: | |
| """ | |
| Magic Link μ΄λ©μΌ λ°μ‘. | |
| Args: | |
| email: μ¬μ©μ μ΄λ©μΌ | |
| Returns: | |
| (μ±κ³΅μ¬λΆ, error_message) | |
| """ | |
| try: | |
| client = await self._get_client() | |
| response = await client.post( | |
| f"{SUPABASE_URL}/auth/v1/magiclink", | |
| json={ | |
| "email": email, | |
| "options": { | |
| "redirectTo": MAGIC_LINK_REDIRECT_URL | |
| } | |
| }, | |
| headers={ | |
| "apikey": SUPABASE_KEY, | |
| "Content-Type": "application/json" | |
| } | |
| ) | |
| if response.status_code == 200: | |
| logger.info(f"Magic Link λ°μ‘ μ±κ³΅: {email[:3]}***") | |
| return True, None | |
| else: | |
| error_text = response.text[:200] | |
| logger.error(f"Magic Link λ°μ‘ μ€ν¨: {response.status_code} - {error_text}") | |
| return False, f"μ΄λ©μΌ λ°μ‘μ μ€ν¨νμ΅λλ€: {error_text}" | |
| except Exception as e: | |
| logger.error(f"Magic Link λ°μ‘ μ€λ₯: {e}") | |
| return False, f"μ΄λ©μΌ λ°μ‘ μ€ μ€λ₯κ° λ°μνμ΅λλ€: {e}" | |
| def generate_auth_code( | |
| self, | |
| user_id: str, | |
| email: str, | |
| access_token: str, | |
| refresh_token: str | |
| ) -> str: | |
| """ | |
| μΈμ¦ μ±κ³΅ ν 6μ리 μ½λ μμ±. | |
| Magic Link ν΄λ¦ β /auth/callbackμμ νΈμΆ. | |
| """ | |
| return self.code_store.generate_code(user_id, email, access_token, refresh_token) | |
| def verify_auth_code( | |
| self, | |
| code: str, | |
| client_ip: str = "unknown" | |
| ) -> Tuple[bool, Optional[str], Optional[str], Optional[str]]: | |
| """ | |
| μΈμ¦ μ½λ κ²μ¦. | |
| Args: | |
| code: 6μ리 μΈμ¦ μ½λ | |
| client_ip: ν΄λΌμ΄μΈνΈ IP | |
| Returns: | |
| (μ ν¨μ¬λΆ, session_token, email_masked, error_message) | |
| """ | |
| is_valid, code_data, error = self.code_store.verify_code(code, client_ip) | |
| if not is_valid: | |
| return False, None, None, error | |
| # μΈμ μμ± | |
| from .session_manager import get_user_session_manager | |
| session_manager = get_user_session_manager() | |
| session_token = session_manager.create_session( | |
| user_id=code_data["user_id"], | |
| email=code_data["email"], | |
| access_token=code_data["access_token"], | |
| refresh_token=code_data["refresh_token"], | |
| expires_in=3600 | |
| ) | |
| # μ΄λ©μΌ λ§μ€νΉ | |
| email = code_data["email"] | |
| if "@" in email: | |
| local, domain = email.split("@", 1) | |
| if len(local) <= 2: | |
| masked = local[0] + "*" | |
| else: | |
| masked = local[0] + "*" * (len(local) - 2) + local[-1] | |
| email_masked = f"{masked}@{domain}" | |
| else: | |
| email_masked = email | |
| return True, session_token, email_masked, None | |
| # μ μ μΈμ€ν΄μ€ (Lazy loading) | |
| _magic_link_auth = None | |
| def get_magic_link_auth() -> MagicLinkAuth: | |
| """MagicLinkAuth μ±κΈν€ μΈμ€ν΄μ€ λ°ν""" | |
| global _magic_link_auth | |
| if _magic_link_auth is None: | |
| _magic_link_auth = MagicLinkAuth() | |
| return _magic_link_auth | |