# spwebsite/routes/auth.py
# Last commit: "update" (133609a) by geqintan
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, HTTPException, Depends, status, Request
from pydantic import BaseModel # Import BaseModel for EmailRequest
from supabase import create_client, Client # Import create_client
from gotrue.errors import AuthApiError
from core.config import SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, SECRET_KEY, ALGORITHM, VERIFICATION_CODE_EXPIRE_MINUTES
from core.models import UserCredentials, ForgotPasswordRequest, Token, User, ChangePasswordRequest, ResetPasswordWithCodeRequest # Import new model
from core.utils import verify_password, get_password_hash, create_access_token, send_email, \
generate_verification_code, store_verification_code, verify_stored_code # Import verification code utilities
from core.dependencies import get_current_active_user, get_current_user_from_token
import jwt # Import jwt for change_password
# Request body for the endpoints in this module that only need an e-mail
# address (/send-verification-code and /send-reset-password-code).
class EmailRequest(BaseModel):
    email: str  # address the code is sent to
# Router that collects the authentication endpoints defined in this module.
router = APIRouter()
# Supabase client created with the service-role key; every endpoint below uses
# it to read and write the custom `sp_users` table directly.
service_supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
@router.post("/send-verification-code")
async def send_verification_code(email_request: EmailRequest):
    """
    Generate an e-mail verification code and send it to the given address.

    If the address already has a row in ``sp_users`` no code is generated;
    a message asking the user to log in or recover the password is returned
    instead of an error.

    Args:
        email_request: Request body carrying the target ``email``.

    Returns:
        A dict with a single ``message`` key.

    Raises:
        HTTPException: 500 when the e-mail cannot be sent or when any other
            unexpected error occurs.
    """
    try:
        # Check if user already exists
        res = service_supabase.table('sp_users').select('id').eq('email', email_request.email).execute()
        if res.data:
            # Already registered: answer with a different message rather than an error.
            return {"message": "此邮箱已被注册,请直接登录或尝试找回密码。"}

        code = generate_verification_code()
        store_verification_code(email_request.email, code)
        email_body = f"您好,\n\n您的验证码是:{code}\n\n此验证码将在 {VERIFICATION_CODE_EXPIRE_MINUTES} 分钟后失效。\n\n如果您没有请求此验证码,请忽略此邮件。\n\n此致,\nSuperProxy Support"

        print(f"Attempting to send verification email to {email_request.email}...")
        # Only the send itself is guarded. Keeping the "returned False" branch
        # outside this try prevents its HTTPException from being swallowed and
        # re-raised with a different detail message.
        try:
            email_sent = send_email(email_request.email, "您的验证码", email_body)
        except Exception as email_error:
            print(f"Error sending verification email to {email_request.email}: {email_error}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="发送验证码失败。请检查邮箱配置或联系管理员。"
            ) from email_error

        if not email_sent:
            print(f"Failed to send verification email to {email_request.email}. send_email returned False.")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="发送验证码失败。请联系管理员或检查邮箱配置。"
            )

        print(f"Verification email successfully sent to {email_request.email}.")
        return {"message": "验证码已发送,请检查您的邮箱。"}
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        print(f"Error in send_verification_code: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.post("/send-reset-password-code")
async def send_reset_password_code(email_request: EmailRequest):
    """
    Generate a password-reset verification code and e-mail it to the user.

    The code is stored under the ``reset_`` prefix so it is kept apart from
    the sign-up verification codes.

    Args:
        email_request: Request body carrying the account ``email``.

    Returns:
        A dict with a single ``message`` key.

    Raises:
        HTTPException: 404 when the address has no row in ``sp_users``;
            500 when the e-mail cannot be sent or on any unexpected error.
    """
    try:
        # Check if user exists
        res = service_supabase.table('sp_users').select('id').eq('email', email_request.email).execute()
        if not res.data:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="此邮箱未注册。"
            )

        code = generate_verification_code()
        store_verification_code(email_request.email, code, prefix="reset_")  # Use a different prefix for reset codes
        email_body = f"您好,\n\n您请求重置密码。您的验证码是:{code}\n\n此验证码将在 {VERIFICATION_CODE_EXPIRE_MINUTES} 分钟后失效。\n\n如果您没有请求此操作,请忽略此邮件。\n\n此致,\nSuperProxy Support"

        print(f"Attempting to send reset password verification email to {email_request.email}...")
        # Only the send itself is guarded. Keeping the "returned False" branch
        # outside this try prevents its HTTPException from being swallowed and
        # re-raised with a different detail message.
        try:
            email_sent = send_email(email_request.email, "您的重置密码验证码", email_body)
        except Exception as email_error:
            print(f"Error sending reset password verification email to {email_request.email}: {email_error}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="发送重置密码验证码失败。请检查邮箱配置或联系管理员。"
            ) from email_error

        if not email_sent:
            print(f"Failed to send reset password verification email to {email_request.email}. send_email returned False.")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="发送重置密码验证码失败。请联系管理员或检查邮箱配置。"
            )

        print(f"Reset password verification email successfully sent to {email_request.email}.")
        return {"message": "重置密码验证码已发送,请检查您的邮箱。"}
    except HTTPException:
        # Deliberate HTTP errors (404 / 500 above) pass through unchanged.
        raise
    except Exception as e:
        print(f"Error in send_reset_password_code: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.post("/signup")
async def signup(user_credentials: UserCredentials, request: Request):
    """
    Register a new user after checking the e-mailed verification code.

    Args:
        user_credentials: Body carrying ``email``, ``password`` and
            ``verification_code``.
        request: Incoming request; not used by this handler but kept in the
            signature for compatibility.

    Returns:
        A dict with a single ``message`` key on success.

    Raises:
        HTTPException: 400 when the e-mail is already registered or the
            verification code is wrong or expired; 500 when the row cannot
            be created or on any unexpected error.
    """
    try:
        # Check if user already exists in sp_users table
        existing = service_supabase.table('sp_users').select('id').eq('email', user_credentials.email).execute()
        if existing.data:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="此邮箱已被注册。"
            )

        # Verify the provided verification code
        if not verify_stored_code(user_credentials.email, user_credentials.verification_code):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="验证码不正确或已过期。"
            )

        hashed_password = get_password_hash(user_credentials.password)

        # Insert user into sp_users table after successful email verification.
        # The response object is kept whole: tuple-unpacking it yields
        # (field_name, value) pairs that are always truthy, which made the
        # failure branch below unreachable.
        insert_res = service_supabase.table('sp_users').insert({
            "email": user_credentials.email,
            "password_hash": hashed_password,
            "created_at": datetime.now().isoformat(),
            "email_verified": True,  # Mark email as verified
            "is_admin": False  # New users are not admins by default
        }).execute()

        if insert_res.data:
            return {"message": "注册成功!您现在可以登录。"}

        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="注册失败,无法创建用户。请联系管理员。"
        )
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        print(f"Error in signup: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.get("/verify-email")
async def verify_email(token: str):
    """
    Create the user described by a signed e-mail verification token.

    The token is decoded with ``SECRET_KEY`` / ``ALGORITHM`` and must carry
    ``sub`` (the e-mail), ``type == "email_verification"`` and
    ``password_hash``. If a row for that e-mail already exists the call
    succeeds without inserting again, so the link can be opened repeatedly.

    Args:
        token: The JWT taken from the ``token`` query parameter.

    Returns:
        A dict with a single ``message`` key.

    Raises:
        HTTPException: 400 when the token is invalid, expired or lacks the
            expected claims; 500 when the row cannot be created or on any
            unexpected error.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        email: str = payload.get("sub")
        token_type: str = payload.get("type")
        password_hash: str = payload.get("password_hash")

        if email is None or token_type != "email_verification" or password_hash is None:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired verification token.")

        # Check if user already exists (e.g., if verification link was clicked multiple times)
        existing = service_supabase.table('sp_users').select('id').eq('email', email).execute()
        if existing.data:
            return {"message": "邮箱已验证,您现在可以登录。"}

        # Insert user into sp_users table after successful email verification.
        # The response object is kept whole: tuple-unpacking it yields
        # (field_name, value) pairs that are always truthy, which made the
        # failure branch below unreachable.
        insert_res = service_supabase.table('sp_users').insert({
            "email": email,
            "password_hash": password_hash,
            "created_at": datetime.now().isoformat(),
            "email_verified": True,  # Mark email as verified
            "is_admin": False  # New users are not admins by default
        }).execute()

        if insert_res.data:
            return {"message": "邮箱验证成功!您现在可以登录。"}

        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="邮箱验证失败,无法创建用户。请联系管理员。"
        )
    except HTTPException:
        # Without this clause the generic handler below rewrote the 400
        # raised above into a 500.
        raise
    except jwt.PyJWTError:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired verification token.")
    except Exception as e:
        print(f"Error in verify_email: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.post("/login", response_model=Token)
async def login(user_credentials: UserCredentials):
    """
    Authenticate a user by e-mail and password and issue an access token.

    Args:
        user_credentials: Body carrying ``email`` and ``password``.

    Returns:
        A ``Token`` whose ``access_token`` has the user id as ``sub`` and
        expires after 30 minutes.

    Raises:
        HTTPException: 404 when no user has that e-mail; 401 when the
            password does not match; 500 on any unexpected error.
    """
    try:
        try:
            # Fetch a list instead of using `.single()`: for a table query an
            # empty result with `.single()` raises a PostgREST error rather
            # than AuthApiError, so the 404 branch was never reached and an
            # unknown e-mail produced a 500.
            res = service_supabase.table('sp_users').select('id, email, password_hash').eq('email', user_credentials.email).execute()
        except AuthApiError as e:  # Catch Supabase specific errors
            # Kept for backward compatibility with the previous handling.
            if "The result contains 0 rows" in e.message:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,  # 404 for user not found
                    detail="用户名不存在。"
                )
            print(f"AuthApiError fetching user in login: {e}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户名或密码不正确,请重新输入。"
            )
        except Exception as e:
            print(f"Unexpected error fetching user in login: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="邮箱或密码错误。请重新输入!"
            )

        if not res.data:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,  # 404 for user not found
                detail="用户名不存在。"
            )
        user_data = res.data[0]

        if not verify_password(user_credentials.password, user_data['password_hash']):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户名或密码不正确,请重新输入。"
            )

        access_token_expires = timedelta(minutes=30)  # ACCESS_TOKEN_EXPIRE_MINUTES from config
        access_token = create_access_token(
            data={"sub": user_data['id']}, expires_delta=access_token_expires
        )
        return Token(access_token=access_token)
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        print(f"Unexpected error in login: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="邮箱或密码错误。请重新输入!")
@router.post("/forgot-password")
async def forgot_password(request: Request, forgot_password_request: ForgotPasswordRequest):
    """
    Send a password reset e-mail containing a one-hour reset link.

    The link points at ``/reset-password?token=...`` on the scheme and host
    of the incoming request; the token has the user id as ``sub`` and
    ``type == "password_reset"``.

    Args:
        request: Incoming request, used to build the reset link.
        forgot_password_request: Body carrying the account ``email``.

    Returns:
        A dict with a single ``message`` key.

    Raises:
        HTTPException: 404 when no user has that e-mail; 500 when the
            e-mail cannot be sent or on any unexpected error.
    """
    try:
        try:
            # Fetch a list instead of using `.single()`: for a table query an
            # empty result with `.single()` raises a PostgREST error rather
            # than AuthApiError, so the 404 branch was never reached and an
            # unknown e-mail produced a 500.
            res = service_supabase.table('sp_users').select('id').eq('email', forgot_password_request.email).execute()
        except AuthApiError as e:
            # Kept for backward compatibility with the previous handling.
            if "The result contains 0 rows" in e.message:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="邮箱不存在!"
                )
            print(f"AuthApiError fetching user in forgot_password: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="发送密码重置邮件失败,请稍后再试。"
            )
        except Exception as e:
            print(f"Unexpected error fetching user in forgot_password: {e}")
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="邮箱不存在!")

        if not res.data:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="邮箱不存在!"
            )
        user_id = res.data[0]['id']

        # Generate a password reset token
        reset_token_expires = timedelta(hours=1)  # Token valid for 1 hour
        reset_token = create_access_token(
            data={"sub": user_id, "type": "password_reset"}, expires_delta=reset_token_expires
        )

        # Construct reset link
        # Assuming the frontend reset-password page is at /reset-password
        # And it expects a 'token' query parameter
        # NOTE(review): scheme and host come from the incoming request URL;
        # confirm the deployment only forwards trusted Host values, otherwise
        # the e-mailed link could point at an attacker-chosen host.
        reset_link = f"{request.url.scheme}://{request.url.netloc}/reset-password?token={reset_token}"

        email_body = f"您好,\n\n您请求重置密码。请点击以下链接重置您的密码:\n\n{reset_link}\n\n此链接将在1小时后失效。\n\n如果您没有请求此操作,请忽略此邮件。\n\n此致,\nSuperProxy Support"

        print(f"Attempting to send password reset email to {forgot_password_request.email}...")
        email_sent = send_email(forgot_password_request.email, "密码重置请求", email_body)

        if email_sent:
            print(f"Password reset email successfully sent to {forgot_password_request.email}.")
            return {"message": "密码重置邮件已发送,请检查您的邮箱。"}

        print(f"Failed to send password reset email to {forgot_password_request.email}. send_email returned False.")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="发送密码重置邮件失败,请检查服务器日志或邮箱配置。"
        )
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        print(f"Unexpected error in forgot_password: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="邮箱不存在!")
@router.post("/change-password")
async def change_password(
    password_request: ChangePasswordRequest,
    current_user: Optional[User] = Depends(get_current_user_from_token)  # Only allow token auth for this
):
    """
    Change a password for a logged-in user or for the holder of a reset token.

    When ``password_request.reset_token`` is set it takes precedence: the
    token is decoded and must have ``type == "password_reset"`` and a
    ``sub`` claim naming the user. Otherwise the authenticated
    ``current_user`` is updated.

    Args:
        password_request: Body carrying ``new_password`` and, optionally,
            ``reset_token``.
        current_user: User resolved from the access token, if any.

    Returns:
        A dict with a single ``message`` key.

    Raises:
        HTTPException: 400 for an invalid or expired reset token or when no
            user id can be determined; 401 when neither a reset token nor an
            authenticated user is present; 500 when the update fails or on
            any unexpected error.
    """
    try:
        user_id_to_update = None

        if password_request.reset_token:
            # Verify reset token
            try:
                payload = jwt.decode(password_request.reset_token, SECRET_KEY, algorithms=[ALGORITHM])
                token_type = payload.get("type")
                user_id_to_update = payload.get("sub")
                if token_type != "password_reset" or user_id_to_update is None:
                    raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid reset token type.")
            except jwt.PyJWTError:
                raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired reset token.")
        elif current_user:
            user_id_to_update = current_user.id
        else:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated or no reset token provided.")

        if not user_id_to_update:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not determine user to update.")

        hashed_password = get_password_hash(password_request.new_password)

        # The response object is kept whole: tuple-unpacking it yields
        # (field_name, value) pairs that are always truthy, which made the
        # failure branch below unreachable.
        update_res = service_supabase.table('sp_users').update({'password_hash': hashed_password}).eq('id', user_id_to_update).execute()

        if update_res.data:
            return {"message": "Password updated successfully. Please log in again with your new password."}

        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to update password."
        )
    except HTTPException:
        # Without this clause the generic handler below rewrote every
        # 400/401 raised above into a 500.
        raise
    except Exception as e:
        print(f"Error in change_password: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.post("/reset-password-with-code")
async def reset_password_with_code(reset_request: ResetPasswordWithCodeRequest):
    """
    Reset a user's password from an e-mail, a verification code and a new password.

    The code is checked against the one stored under the ``reset_`` prefix.

    Args:
        reset_request: Body carrying ``email``, ``verification_code`` and
            ``new_password``.

    Returns:
        A dict with a single ``message`` key.

    Raises:
        HTTPException: 404 when the e-mail is not registered; 400 when the
            code is wrong or expired; 500 when the update fails or on any
            unexpected error.
    """
    try:
        # 1. Check if user exists
        res = service_supabase.table('sp_users').select('id').eq('email', reset_request.email).execute()
        if not res.data:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="此邮箱未注册。"
            )
        user_id = res.data[0]['id']

        # 2. Verify the provided verification code
        if not verify_stored_code(reset_request.email, reset_request.verification_code, prefix="reset_"):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="验证码不正确或已过期。"
            )

        # 3. Hash the new password
        hashed_password = get_password_hash(reset_request.new_password)

        # 4. Update user's password in sp_users table.
        # The response object is kept whole: tuple-unpacking it yields
        # (field_name, value) pairs that are always truthy, which made the
        # failure branch below unreachable.
        update_res = service_supabase.table('sp_users').update({'password_hash': hashed_password}).eq('id', user_id).execute()

        if update_res.data:
            return {"message": "密码重置成功!请使用新密码登录。"}

        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="密码重置失败,无法更新密码。请联系管理员。"
        )
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        print(f"Error in reset_password_with_code: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))