""" Magic Link Authentication ========================= Claude Desktop 등 OAuth를 지원하지 않는 MCP 클라이언트용. 흐름: 1. 사용자가 이메일 입력 → user_request_auth Tool 호출 2. Supabase Magic Link 발송 3. 사용자가 이메일에서 링크 클릭 → /auth/callback 페이지 4. 페이지에서 6자리 코드 표시 5. 사용자가 GPT/Claude로 돌아와서 코드 입력 → user_verify_code Tool 호출 6. 코드 검증 → 세션 토큰 반환 """ import secrets import time import logging from typing import Dict, Any, Optional, Tuple from collections import defaultdict import httpx from .config import ( SUPABASE_URL, SUPABASE_KEY, MAGIC_LINK_REDIRECT_URL, AUTH_CODE_TTL_SECONDS, AUTH_CODE_LENGTH, AUTH_CODE_MAX_ATTEMPTS, ) logger = logging.getLogger("eodi.auth.magic_link") class AuthCodeStore: """ 인증 코드 임시 저장소. Magic Link 클릭 후 표시되는 6자리 코드 관리. Rate limiting 및 brute force 방지 포함. """ def __init__(self, ttl_seconds: int = AUTH_CODE_TTL_SECONDS): self.ttl = ttl_seconds self._codes: Dict[str, Dict[str, Any]] = {} self._attempts: Dict[str, int] = defaultdict(int) # IP별 시도 횟수 self._last_cleanup = time.time() def _cleanup(self): """만료된 코드 및 시도 횟수 정리""" now = time.time() if now - self._last_cleanup < 60: return self._last_cleanup = now # 만료된 코드 삭제 expired = [ c for c, data in self._codes.items() if now - data["created_at"] > self.ttl ] for c in expired: del self._codes[c] # 오래된 시도 횟수 리셋 self._attempts.clear() def generate_code( self, user_id: str, email: str, access_token: str, refresh_token: str ) -> str: """ 6자리 인증 코드 생성. Args: user_id: Supabase Auth UUID email: 사용자 이메일 access_token: Supabase access_token refresh_token: Supabase refresh_token Returns: 6자리 숫자 코드 """ self._cleanup() # 같은 user_id의 기존 코드 삭제 (중복 방지) existing = [ c for c, data in self._codes.items() if data["user_id"] == user_id ] for c in existing: del self._codes[c] # 6자리 숫자 코드 생성 code = "".join([str(secrets.randbelow(10)) for _ in range(AUTH_CODE_LENGTH)]) self._codes[code] = { "user_id": user_id, "email": email, "access_token": access_token, "refresh_token": refresh_token, "created_at": time.time(), } return code def verify_code( self, code: str, client_ip: str = "unknown" ) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]: """ 코드 검증 및 소비. Args: code: 6자리 인증 코드 client_ip: 클라이언트 IP (rate limiting용) Returns: (유효여부, code_data, error_message) """ self._cleanup() # Rate limiting 체크 if self._attempts[client_ip] >= AUTH_CODE_MAX_ATTEMPTS: logger.warning(f"Rate limit exceeded for IP: {client_ip[:20]}...") return False, None, f"너무 많은 시도입니다. {self.ttl}초 후 다시 시도해주세요." self._attempts[client_ip] += 1 # 코드 정규화 (공백 제거 등) code = code.strip().replace(" ", "").replace("-", "") if code not in self._codes: return False, None, "코드가 일치하지 않습니다. 다시 확인해주세요." code_data = self._codes.pop(code) if time.time() - code_data["created_at"] > self.ttl: return False, None, "코드가 만료되었습니다. 새 코드를 요청해주세요." # 성공 시 시도 횟수 리셋 self._attempts[client_ip] = 0 return True, code_data, None class MagicLinkAuth: """ Magic Link 인증 핸들러. """ def __init__(self): self.code_store = AuthCodeStore() self._http_client = None async def _get_client(self) -> httpx.AsyncClient: """HTTP 클라이언트 (재사용)""" if self._http_client is None: self._http_client = httpx.AsyncClient(timeout=30) return self._http_client async def send_magic_link(self, email: str) -> Tuple[bool, Optional[str]]: """ Magic Link 이메일 발송. Args: email: 사용자 이메일 Returns: (성공여부, error_message) """ try: client = await self._get_client() response = await client.post( f"{SUPABASE_URL}/auth/v1/magiclink", json={ "email": email, "options": { "redirectTo": MAGIC_LINK_REDIRECT_URL } }, headers={ "apikey": SUPABASE_KEY, "Content-Type": "application/json" } ) if response.status_code == 200: logger.info(f"Magic Link 발송 성공: {email[:3]}***") return True, None else: error_text = response.text[:200] logger.error(f"Magic Link 발송 실패: {response.status_code} - {error_text}") return False, f"이메일 발송에 실패했습니다: {error_text}" except Exception as e: logger.error(f"Magic Link 발송 오류: {e}") return False, f"이메일 발송 중 오류가 발생했습니다: {e}" def generate_auth_code( self, user_id: str, email: str, access_token: str, refresh_token: str ) -> str: """ 인증 성공 후 6자리 코드 생성. Magic Link 클릭 → /auth/callback에서 호출. """ return self.code_store.generate_code(user_id, email, access_token, refresh_token) def verify_auth_code( self, code: str, client_ip: str = "unknown" ) -> Tuple[bool, Optional[str], Optional[str], Optional[str]]: """ 인증 코드 검증. Args: code: 6자리 인증 코드 client_ip: 클라이언트 IP Returns: (유효여부, session_token, email_masked, error_message) """ is_valid, code_data, error = self.code_store.verify_code(code, client_ip) if not is_valid: return False, None, None, error # 세션 생성 from .session_manager import get_user_session_manager session_manager = get_user_session_manager() session_token = session_manager.create_session( user_id=code_data["user_id"], email=code_data["email"], access_token=code_data["access_token"], refresh_token=code_data["refresh_token"], expires_in=3600 ) # 이메일 마스킹 email = code_data["email"] if "@" in email: local, domain = email.split("@", 1) if len(local) <= 2: masked = local[0] + "*" else: masked = local[0] + "*" * (len(local) - 2) + local[-1] email_masked = f"{masked}@{domain}" else: email_masked = email return True, session_token, email_masked, None # 전역 인스턴스 (Lazy loading) _magic_link_auth = None def get_magic_link_auth() -> MagicLinkAuth: """MagicLinkAuth 싱글톤 인스턴스 반환""" global _magic_link_auth if _magic_link_auth is None: _magic_link_auth = MagicLinkAuth() return _magic_link_auth