eodi-mcp / src /auth /magic_link.py
lovelymango's picture
Upload 12 files
2310db1 verified
"""
Magic Link Authentication
=========================
Claude Desktop λ“± OAuthλ₯Ό μ§€μ›ν•˜μ§€ μ•ŠλŠ” MCP ν΄λΌμ΄μ–ΈνŠΈμš©.
흐름:
1. μ‚¬μš©μžκ°€ 이메일 μž…λ ₯ β†’ user_request_auth Tool 호좜
2. Supabase Magic Link λ°œμ†‘
3. μ‚¬μš©μžκ°€ μ΄λ©”μΌμ—μ„œ 링크 클릭 β†’ /auth/callback νŽ˜μ΄μ§€
4. νŽ˜μ΄μ§€μ—μ„œ 6자리 μ½”λ“œ ν‘œμ‹œ
5. μ‚¬μš©μžκ°€ GPT/Claude둜 λŒμ•„μ™€μ„œ μ½”λ“œ μž…λ ₯ β†’ user_verify_code Tool 호좜
6. μ½”λ“œ 검증 β†’ μ„Έμ…˜ 토큰 λ°˜ν™˜
"""
import secrets
import time
import logging
from typing import Dict, Any, Optional, Tuple
from collections import defaultdict
import httpx
from .config import (
SUPABASE_URL,
SUPABASE_KEY,
MAGIC_LINK_REDIRECT_URL,
AUTH_CODE_TTL_SECONDS,
AUTH_CODE_LENGTH,
AUTH_CODE_MAX_ATTEMPTS,
)
logger = logging.getLogger("eodi.auth.magic_link")
class AuthCodeStore:
"""
인증 μ½”λ“œ μž„μ‹œ μ €μž₯μ†Œ.
Magic Link 클릭 ν›„ ν‘œμ‹œλ˜λŠ” 6자리 μ½”λ“œ 관리.
Rate limiting 및 brute force λ°©μ§€ 포함.
"""
def __init__(self, ttl_seconds: int = AUTH_CODE_TTL_SECONDS):
self.ttl = ttl_seconds
self._codes: Dict[str, Dict[str, Any]] = {}
self._attempts: Dict[str, int] = defaultdict(int) # IP별 μ‹œλ„ 횟수
self._last_cleanup = time.time()
def _cleanup(self):
"""만료된 μ½”λ“œ 및 μ‹œλ„ 횟수 정리"""
now = time.time()
if now - self._last_cleanup < 60:
return
self._last_cleanup = now
# 만료된 μ½”λ“œ μ‚­μ œ
expired = [
c for c, data in self._codes.items()
if now - data["created_at"] > self.ttl
]
for c in expired:
del self._codes[c]
# 였래된 μ‹œλ„ 횟수 리셋
self._attempts.clear()
def generate_code(
self,
user_id: str,
email: str,
access_token: str,
refresh_token: str
) -> str:
"""
6자리 인증 μ½”λ“œ 생성.
Args:
user_id: Supabase Auth UUID
email: μ‚¬μš©μž 이메일
access_token: Supabase access_token
refresh_token: Supabase refresh_token
Returns:
6자리 숫자 μ½”λ“œ
"""
self._cleanup()
# 같은 user_id의 κΈ°μ‘΄ μ½”λ“œ μ‚­μ œ (쀑볡 λ°©μ§€)
existing = [
c for c, data in self._codes.items()
if data["user_id"] == user_id
]
for c in existing:
del self._codes[c]
# 6자리 숫자 μ½”λ“œ 생성
code = "".join([str(secrets.randbelow(10)) for _ in range(AUTH_CODE_LENGTH)])
self._codes[code] = {
"user_id": user_id,
"email": email,
"access_token": access_token,
"refresh_token": refresh_token,
"created_at": time.time(),
}
return code
def verify_code(
self,
code: str,
client_ip: str = "unknown"
) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]:
"""
μ½”λ“œ 검증 및 μ†ŒλΉ„.
Args:
code: 6자리 인증 μ½”λ“œ
client_ip: ν΄λΌμ΄μ–ΈνŠΈ IP (rate limiting용)
Returns:
(μœ νš¨μ—¬λΆ€, code_data, error_message)
"""
self._cleanup()
# Rate limiting 체크
if self._attempts[client_ip] >= AUTH_CODE_MAX_ATTEMPTS:
logger.warning(f"Rate limit exceeded for IP: {client_ip[:20]}...")
return False, None, f"λ„ˆλ¬΄ λ§Žμ€ μ‹œλ„μž…λ‹ˆλ‹€. {self.ttl}초 ν›„ λ‹€μ‹œ μ‹œλ„ν•΄μ£Όμ„Έμš”."
self._attempts[client_ip] += 1
# μ½”λ“œ μ •κ·œν™” (곡백 제거 λ“±)
code = code.strip().replace(" ", "").replace("-", "")
if code not in self._codes:
return False, None, "μ½”λ“œκ°€ μΌμΉ˜ν•˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€. λ‹€μ‹œ ν™•μΈν•΄μ£Όμ„Έμš”."
code_data = self._codes.pop(code)
if time.time() - code_data["created_at"] > self.ttl:
return False, None, "μ½”λ“œκ°€ λ§Œλ£Œλ˜μ—ˆμŠ΅λ‹ˆλ‹€. μƒˆ μ½”λ“œλ₯Ό μš”μ²­ν•΄μ£Όμ„Έμš”."
# 성곡 μ‹œ μ‹œλ„ 횟수 리셋
self._attempts[client_ip] = 0
return True, code_data, None
class MagicLinkAuth:
"""
Magic Link 인증 ν•Έλ“€λŸ¬.
"""
def __init__(self):
self.code_store = AuthCodeStore()
self._http_client = None
async def _get_client(self) -> httpx.AsyncClient:
"""HTTP ν΄λΌμ΄μ–ΈνŠΈ (μž¬μ‚¬μš©)"""
if self._http_client is None:
self._http_client = httpx.AsyncClient(timeout=30)
return self._http_client
async def send_magic_link(self, email: str) -> Tuple[bool, Optional[str]]:
"""
Magic Link 이메일 λ°œμ†‘.
Args:
email: μ‚¬μš©μž 이메일
Returns:
(성곡여뢀, error_message)
"""
try:
client = await self._get_client()
response = await client.post(
f"{SUPABASE_URL}/auth/v1/magiclink",
json={
"email": email,
"options": {
"redirectTo": MAGIC_LINK_REDIRECT_URL
}
},
headers={
"apikey": SUPABASE_KEY,
"Content-Type": "application/json"
}
)
if response.status_code == 200:
logger.info(f"Magic Link λ°œμ†‘ 성곡: {email[:3]}***")
return True, None
else:
error_text = response.text[:200]
logger.error(f"Magic Link λ°œμ†‘ μ‹€νŒ¨: {response.status_code} - {error_text}")
return False, f"이메일 λ°œμ†‘μ— μ‹€νŒ¨ν–ˆμŠ΅λ‹ˆλ‹€: {error_text}"
except Exception as e:
logger.error(f"Magic Link λ°œμ†‘ 였λ₯˜: {e}")
return False, f"이메일 λ°œμ†‘ 쀑 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€: {e}"
def generate_auth_code(
self,
user_id: str,
email: str,
access_token: str,
refresh_token: str
) -> str:
"""
인증 성곡 ν›„ 6자리 μ½”λ“œ 생성.
Magic Link 클릭 β†’ /auth/callbackμ—μ„œ 호좜.
"""
return self.code_store.generate_code(user_id, email, access_token, refresh_token)
def verify_auth_code(
self,
code: str,
client_ip: str = "unknown"
) -> Tuple[bool, Optional[str], Optional[str], Optional[str]]:
"""
인증 μ½”λ“œ 검증.
Args:
code: 6자리 인증 μ½”λ“œ
client_ip: ν΄λΌμ΄μ–ΈνŠΈ IP
Returns:
(μœ νš¨μ—¬λΆ€, session_token, email_masked, error_message)
"""
is_valid, code_data, error = self.code_store.verify_code(code, client_ip)
if not is_valid:
return False, None, None, error
# μ„Έμ…˜ 생성
from .session_manager import get_user_session_manager
session_manager = get_user_session_manager()
session_token = session_manager.create_session(
user_id=code_data["user_id"],
email=code_data["email"],
access_token=code_data["access_token"],
refresh_token=code_data["refresh_token"],
expires_in=3600
)
# 이메일 λ§ˆμŠ€ν‚Ή
email = code_data["email"]
if "@" in email:
local, domain = email.split("@", 1)
if len(local) <= 2:
masked = local[0] + "*"
else:
masked = local[0] + "*" * (len(local) - 2) + local[-1]
email_masked = f"{masked}@{domain}"
else:
email_masked = email
return True, session_token, email_masked, None
# μ „μ—­ μΈμŠ€ν„΄μŠ€ (Lazy loading)
_magic_link_auth = None
def get_magic_link_auth() -> MagicLinkAuth:
"""MagicLinkAuth 싱글톀 μΈμŠ€ν„΄μŠ€ λ°˜ν™˜"""
global _magic_link_auth
if _magic_link_auth is None:
_magic_link_auth = MagicLinkAuth()
return _magic_link_auth