| from datetime import datetime, timedelta |
| from typing import Optional |
| from fastapi import APIRouter, HTTPException, Depends, status, Request |
| from pydantic import BaseModel |
| from supabase import create_client, Client |
| from gotrue.errors import AuthApiError |
|
|
| from core.config import SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, SECRET_KEY, ALGORITHM, VERIFICATION_CODE_EXPIRE_MINUTES |
| from core.models import UserCredentials, ForgotPasswordRequest, Token, User, ChangePasswordRequest, ResetPasswordWithCodeRequest |
| from core.utils import verify_password, get_password_hash, create_access_token, send_email, \ |
| generate_verification_code, store_verification_code, verify_stored_code |
| from core.dependencies import get_current_active_user, get_current_user_from_token |
| import jwt |
|
|
class EmailRequest(BaseModel):
    """Request body carrying a single email address.

    Used as the payload model by the endpoints in this module that send
    verification codes.
    """

    # Address the verification code is requested for (plain string, no format validation here).
    email: str
|
|
# Router object the endpoint decorators in this module register on.
router = APIRouter()

# Supabase client built with the service-role key; the handlers in this
# module use it for all `sp_users` table access.
service_supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
|
|
@router.post("/send-verification-code")
async def send_verification_code(email_request: EmailRequest):
    """Generate a sign-up verification code and email it to the given address.

    Args:
        email_request: Request body holding the target email address.

    Returns:
        A dict with a ``message`` key. If the email already exists in
        ``sp_users`` no code is generated and an "already registered" message
        is returned (with the default success status).

    Raises:
        HTTPException: 500 when ``send_email`` raises or returns a falsy
            value, or when any other unexpected error occurs.
    """
    try:
        # Already-registered addresses get a hint instead of a new code.
        existing = service_supabase.table('sp_users').select('id').eq('email', email_request.email).execute()
        if existing.data:
            return {"message": "此邮箱已被注册,请直接登录或尝试找回密码。"}

        code = generate_verification_code()
        store_verification_code(email_request.email, code)

        email_body = f"您好,\n\n您的验证码是:{code}\n\n此验证码将在 {VERIFICATION_CODE_EXPIRE_MINUTES} 分钟后失效。\n\n如果您没有请求此验证码,请忽略此邮件。\n\n此致,\nSuperProxy Support"

        # Keep the try body limited to the send itself, so the HTTPException
        # raised below for a falsy result is not caught and replaced by the
        # generic "exception while sending" response.
        try:
            print(f"Attempting to send verification email to {email_request.email}...")
            email_sent = send_email(email_request.email, "您的验证码", email_body)
        except Exception as email_error:
            print(f"Error sending verification email to {email_request.email}: {email_error}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="发送验证码失败。请检查邮箱配置或联系管理员。"
            ) from email_error

        if not email_sent:
            print(f"Failed to send verification email to {email_request.email}. send_email returned False.")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="发送验证码失败。请联系管理员或检查邮箱配置。"
            )

        print(f"Verification email successfully sent to {email_request.email}.")
        return {"message": "验证码已发送,请检查您的邮箱。"}
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        print(f"Error in send_verification_code: {e}")
        # NOTE(review): str(e) exposes internal error text to the client; consider a generic message.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
|
|
@router.post("/send-reset-password-code")
async def send_reset_password_code(email_request: EmailRequest):
    """Generate a password-reset verification code and email it to the address.

    The code is stored with the ``reset_`` prefix so it is kept separate from
    codes stored without a prefix.

    Args:
        email_request: Request body holding the target email address.

    Returns:
        A dict with a ``message`` key confirming that the code was sent.

    Raises:
        HTTPException: 404 when the email is not present in ``sp_users``;
            500 when ``send_email`` raises or returns a falsy value, or when
            any other unexpected error occurs.
    """
    try:
        # Only registered addresses may request a reset code.
        existing = service_supabase.table('sp_users').select('id').eq('email', email_request.email).execute()
        if not existing.data:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="此邮箱未注册。"
            )

        code = generate_verification_code()
        store_verification_code(email_request.email, code, prefix="reset_")

        email_body = f"您好,\n\n您请求重置密码。您的验证码是:{code}\n\n此验证码将在 {VERIFICATION_CODE_EXPIRE_MINUTES} 分钟后失效。\n\n如果您没有请求此操作,请忽略此邮件。\n\n此致,\nSuperProxy Support"

        # Keep the try body limited to the send itself, so the HTTPException
        # raised below for a falsy result is not caught and replaced by the
        # generic "exception while sending" response.
        try:
            print(f"Attempting to send reset password verification email to {email_request.email}...")
            email_sent = send_email(email_request.email, "您的重置密码验证码", email_body)
        except Exception as email_error:
            print(f"Error sending reset password verification email to {email_request.email}: {email_error}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="发送重置密码验证码失败。请检查邮箱配置或联系管理员。"
            ) from email_error

        if not email_sent:
            print(f"Failed to send reset password verification email to {email_request.email}. send_email returned False.")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="发送重置密码验证码失败。请联系管理员或检查邮箱配置。"
            )

        print(f"Reset password verification email successfully sent to {email_request.email}.")
        return {"message": "重置密码验证码已发送,请检查您的邮箱。"}
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        print(f"Error in send_reset_password_code: {e}")
        # NOTE(review): str(e) exposes internal error text to the client; consider a generic message.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
|
|
|
|
@router.post("/signup")
async def signup(user_credentials: UserCredentials, request: Request):
    """Create a new ``sp_users`` row after checking the emailed verification code.

    Args:
        user_credentials: Carries ``email``, ``password`` and
            ``verification_code`` as read by this handler.
        request: The incoming request (not used by the handler body).

    Returns:
        A dict with a ``message`` key on successful registration.

    Raises:
        HTTPException: 400 when the email is already registered or
            ``verify_stored_code`` rejects the code; 500 when the insert
            returns no rows or an unexpected error occurs.
    """
    try:
        existing = service_supabase.table('sp_users').select('id').eq('email', user_credentials.email).execute()
        if existing.data:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="此邮箱已被注册。"
            )

        if not verify_stored_code(user_credentials.email, user_credentials.verification_code):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="验证码不正确或已过期。"
            )

        hashed_password = get_password_hash(user_credentials.password)

        # Inspect `.data` on the response, as the select above does. Tuple-
        # unpacking the response yields (field_name, value) pairs that are
        # always truthy, which would make the failure branch unreachable.
        insert_res = service_supabase.table('sp_users').insert({
            "email": user_credentials.email,
            "password_hash": hashed_password,
            "created_at": datetime.now().isoformat(),
            "email_verified": True,
            "is_admin": False
        }).execute()

        if not insert_res.data:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="注册失败,无法创建用户。请联系管理员。"
            )
        return {"message": "注册成功!您现在可以登录。"}
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        print(f"Error in signup: {e}")
        # NOTE(review): str(e) exposes internal error text to the client; consider a generic message.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
|
|
@router.get("/verify-email")
async def verify_email(token: str):
    """Verify an email-verification JWT and create the user it describes.

    The token is decoded with ``SECRET_KEY``/``ALGORITHM`` and must carry a
    ``sub`` (email), ``type == "email_verification"`` and a ``password_hash``.
    If a row with that email already exists, nothing is inserted.

    Args:
        token: The JWT taken from the query string.

    Returns:
        A dict with a ``message`` key saying the email is (or already was)
        verified.

    Raises:
        HTTPException: 400 when the token cannot be decoded or lacks the
            required claims; 500 when the insert returns no rows or an
            unexpected error occurs.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        email: str = payload.get("sub")
        token_type: str = payload.get("type")
        password_hash: str = payload.get("password_hash")

        if email is None or token_type != "email_verification" or password_hash is None:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired verification token.")

        # A second click on the same link finds the existing row and succeeds.
        existing = service_supabase.table('sp_users').select('id').eq('email', email).execute()
        if existing.data:
            return {"message": "邮箱已验证,您现在可以登录。"}

        # Inspect `.data` on the response; tuple-unpacking it yields
        # (field_name, value) pairs that are always truthy.
        insert_res = service_supabase.table('sp_users').insert({
            "email": email,
            "password_hash": password_hash,
            "created_at": datetime.now().isoformat(),
            "email_verified": True,
            "is_admin": False
        }).execute()

        if not insert_res.data:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="邮箱验证失败,无法创建用户。请联系管理员。"
            )
        return {"message": "邮箱验证成功!您现在可以登录。"}
    except jwt.PyJWTError:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired verification token.")
    except HTTPException:
        # Without this clause the 400 raised above would be caught by the
        # generic handler below and turned into a 500.
        raise
    except Exception as e:
        print(f"Error in verify_email: {e}")
        # NOTE(review): str(e) exposes internal error text to the client; consider a generic message.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
|
|
@router.post("/login", response_model=Token)
async def login(user_credentials: UserCredentials):
    """Authenticate by email and password and issue a 30-minute access token.

    Args:
        user_credentials: Carries the ``email`` and ``password`` to check.

    Returns:
        A ``Token`` whose ``access_token`` has the user's id as ``sub``.

    Raises:
        HTTPException: 404 when no ``sp_users`` row matches the email; 401
            when the password does not verify; 500 on unexpected errors.
    """
    try:
        try:
            # Fetch as a list and test for emptiness, as the other endpoints
            # in this module do. With `.single()` a missing row raises, and
            # that error is not an AuthApiError, so the "user not found"
            # response could never be produced.
            res = service_supabase.table('sp_users').select('id, email, password_hash').eq('email', user_credentials.email).execute()
        except AuthApiError as e:
            # Retained from the original handler for backward compatibility.
            # NOTE(review): table queries are not expected to raise this auth error — confirm and remove.
            print(f"AuthApiError fetching user in login: {e}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户名或密码不正确,请重新输入。"
            )
        except Exception as e:
            print(f"Unexpected error fetching user in login: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="邮箱或密码错误。请重新输入!"
            )

        if not res.data:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="用户名不存在。"
            )
        user_data = res.data[0]

        if not verify_password(user_credentials.password, user_data['password_hash']):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="用户名或密码不正确,请重新输入。"
            )

        access_token_expires = timedelta(minutes=30)
        access_token = create_access_token(
            data={"sub": user_data['id']}, expires_delta=access_token_expires
        )
        return Token(access_token=access_token)
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        print(f"Unexpected error in login: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="邮箱或密码错误。请重新输入!")
|
|
@router.post("/forgot-password")
async def forgot_password(request: Request, forgot_password_request: ForgotPasswordRequest):
    """Email a password-reset link containing a one-hour reset token.

    Args:
        request: The incoming request; its URL scheme and netloc are used to
            build the reset link.
        forgot_password_request: Carries the ``email`` of the account.

    Returns:
        A dict with a ``message`` key confirming that the email was sent.

    Raises:
        HTTPException: 404 when no ``sp_users`` row matches the email; 500
            when ``send_email`` returns a falsy value or an unexpected error
            occurs.
    """
    try:
        try:
            # Fetch as a list and test for emptiness, as the other endpoints
            # in this module do. With `.single()` a missing row raises, and
            # that error is not an AuthApiError, so the "email not found"
            # response could never be produced.
            res = service_supabase.table('sp_users').select('id').eq('email', forgot_password_request.email).execute()
        except AuthApiError as e:
            # Retained from the original handler for backward compatibility.
            # NOTE(review): table queries are not expected to raise this auth error — confirm and remove.
            print(f"AuthApiError fetching user in forgot_password: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="发送密码重置邮件失败,请稍后再试。"
            )
        except Exception as e:
            print(f"Unexpected error fetching user in forgot_password: {e}")
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="邮箱不存在!")

        if not res.data:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="邮箱不存在!"
            )
        user_id = res.data[0]['id']

        # The token is tagged "password_reset"; change_password checks for that type.
        reset_token_expires = timedelta(hours=1)
        reset_token = create_access_token(
            data={"sub": user_id, "type": "password_reset"}, expires_delta=reset_token_expires
        )

        # NOTE(review): the link host comes from the incoming request, so a
        # forged Host header could point the emailed link at another site.
        # Consider building it from a configured base URL instead.
        reset_link = f"{request.url.scheme}://{request.url.netloc}/reset-password?token={reset_token}"

        email_body = f"您好,\n\n您请求重置密码。请点击以下链接重置您的密码:\n\n{reset_link}\n\n此链接将在1小时后失效。\n\n如果您没有请求此操作,请忽略此邮件。\n\n此致,\nSuperProxy Support"

        print(f"Attempting to send password reset email to {forgot_password_request.email}...")
        email_sent = send_email(forgot_password_request.email, "密码重置请求", email_body)
        if not email_sent:
            print(f"Failed to send password reset email to {forgot_password_request.email}. send_email returned False.")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="发送密码重置邮件失败,请检查服务器日志或邮箱配置。"
            )

        print(f"Password reset email successfully sent to {forgot_password_request.email}.")
        return {"message": "密码重置邮件已发送,请检查您的邮箱。"}
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        print(f"Unexpected error in forgot_password: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="邮箱不存在!")
|
|
@router.post("/change-password")
async def change_password(
    password_request: ChangePasswordRequest,
    current_user: Optional[User] = Depends(get_current_user_from_token)
):
    """Change a password for a logged-in user or for the holder of a reset token.

    When ``password_request.reset_token`` is set it takes precedence: the
    token is decoded and must have ``type == "password_reset"`` and a ``sub``
    naming the user id. Otherwise the authenticated ``current_user`` is used.

    Args:
        password_request: Carries ``new_password`` and an optional
            ``reset_token``.
        current_user: The user resolved by ``get_current_user_from_token``,
            or ``None``.

    Returns:
        A dict with a ``message`` key on success.

    Raises:
        HTTPException: 400 for an invalid, expired or wrongly-typed reset
            token, or when no user id can be determined; 401 when neither a
            reset token nor an authenticated user is present; 500 when the
            update returns no rows or an unexpected error occurs.
    """
    try:
        user_id_to_update = None
        if password_request.reset_token:
            try:
                payload = jwt.decode(password_request.reset_token, SECRET_KEY, algorithms=[ALGORITHM])
            except jwt.PyJWTError:
                raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired reset token.")
            user_id_to_update = payload.get("sub")
            if payload.get("type") != "password_reset" or user_id_to_update is None:
                raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid reset token type.")
        elif current_user:
            user_id_to_update = current_user.id
        else:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated or no reset token provided.")

        if not user_id_to_update:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not determine user to update.")

        hashed_password = get_password_hash(password_request.new_password)

        # Inspect `.data` on the response; tuple-unpacking it yields
        # (field_name, value) pairs that are always truthy, which would hide
        # an update that changed no rows.
        update_res = service_supabase.table('sp_users').update({'password_hash': hashed_password}).eq('id', user_id_to_update).execute()

        if not update_res.data:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to update password."
            )
        return {"message": "Password updated successfully. Please log in again with your new password."}
    except HTTPException:
        # Without this clause the 400/401 responses raised above would be
        # caught by the generic handler below and turned into a 500.
        raise
    except Exception as e:
        print(f"Error in change_password: {e}")
        # NOTE(review): str(e) exposes internal error text to the client; consider a generic message.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
|
|
@router.post("/reset-password-with-code")
async def reset_password_with_code(reset_request: ResetPasswordWithCodeRequest):
    """Reset a user's password given their email, an emailed code and a new password.

    Args:
        reset_request: Carries ``email``, ``verification_code`` and
            ``new_password`` as read by this handler.

    Returns:
        A dict with a ``message`` key on success.

    Raises:
        HTTPException: 404 when the email is not in ``sp_users``; 400 when
            ``verify_stored_code`` (called with the ``reset_`` prefix) rejects
            the code; 500 when the update returns no rows or an unexpected
            error occurs.
    """
    try:
        lookup = service_supabase.table('sp_users').select('id').eq('email', reset_request.email).execute()
        if not lookup.data:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="此邮箱未注册。"
            )
        user_id = lookup.data[0]['id']

        # Reset codes are checked under the "reset_" prefix, the same prefix
        # the reset-code endpoint stores them with.
        if not verify_stored_code(reset_request.email, reset_request.verification_code, prefix="reset_"):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="验证码不正确或已过期。"
            )

        hashed_password = get_password_hash(reset_request.new_password)

        # Inspect `.data` on the response; tuple-unpacking it yields
        # (field_name, value) pairs that are always truthy, which would make
        # the failure branch unreachable.
        update_res = service_supabase.table('sp_users').update({'password_hash': hashed_password}).eq('id', user_id).execute()

        if not update_res.data:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="密码重置失败,无法更新密码。请联系管理员。"
            )
        return {"message": "密码重置成功!请使用新密码登录。"}
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        print(f"Error in reset_password_with_code: {e}")
        # NOTE(review): str(e) exposes internal error text to the client; consider a generic message.
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
|
|