MCP_todo / services /auth.py
MAWB's picture
Update services/auth.py
425e1d7 verified
"""
JWT verification service for Better Auth integration.
Per @specs/001-auth-api-bridge/research.md
"""
from datetime import datetime, timedelta
from typing import Optional, Dict
from jose import JWTError, jwt
from config import settings
import uuid
SECRET_KEY = settings.better_auth_secret
ALGORITHM = "HS256"
# In-memory store for password reset tokens (in production, use Redis or database)
PASSWORD_RESET_TOKENS: Dict[str, dict] = {}
def create_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
"""
Create a JWT token for a user.
Args:
user_id: User UUID to embed in the token
expires_delta: Optional custom expiration time
Returns:
Encoded JWT token string
"""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(hours=24)
payload = {
"sub": user_id,
"iat": datetime.utcnow(),
"exp": expire
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
async def verify_token(token: str) -> Optional[str]:
"""
Verify JWT token and return user_id.
Args:
token: JWT token string
Returns:
User ID (UUID string) if token is valid, None otherwise
"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
return None
return user_id
except JWTError:
return None
def create_password_reset_token(email: str) -> str:
"""
Create a password reset token for an email.
Args:
email: User's email address
Returns:
Password reset token
"""
reset_token = str(uuid.uuid4())
expiry = datetime.utcnow() + timedelta(hours=1) # Token valid for 1 hour
PASSWORD_RESET_TOKENS[reset_token] = {
"email": email,
"expires": expiry
}
return reset_token
def verify_password_reset_token(token: str) -> Optional[str]:
"""
Verify password reset token and return email.
Args:
token: Password reset token
Returns:
Email if token is valid, None otherwise
"""
if token not in PASSWORD_RESET_TOKENS:
return None
token_data = PASSWORD_RESET_TOKENS[token]
# Check if token has expired
if datetime.utcnow() > token_data["expires"]:
del PASSWORD_RESET_TOKENS[token]
return None
return token_data["email"]
def consume_password_reset_token(token: str) -> bool:
"""
Consume (invalidate) a password reset token after use.
Args:
token: Password reset token to consume
Returns:
True if token was valid and consumed, False otherwise
"""
if token in PASSWORD_RESET_TOKENS:
del PASSWORD_RESET_TOKENS[token]
return True
return False