Spaces:
Sleeping
Sleeping
Commit ·
1a3a466
1
Parent(s): adb1c6b
updated get otp to support login and register, implemented protected apis and other improvements
Browse files- app/controllers/merchant_controller.py +13 -12
- app/controllers/merchant_login_controller.py +35 -15
- app/models/merchant.py +20 -2
- app/services/merchant_services.py +49 -7
- app/utils/auth_utils.py +12 -5
app/controllers/merchant_controller.py
CHANGED
|
@@ -2,7 +2,7 @@ import logging
|
|
| 2 |
from typing import Optional
|
| 3 |
from datetime import datetime, timedelta
|
| 4 |
from fastapi import APIRouter, HTTPException, Query, Depends, status, Body
|
| 5 |
-
from app.models.merchant import MerchantRegister
|
| 6 |
from app.services.merchant_services import generate_and_send_otp, get_otp_from_cache, verify_otp_and_save_merchant, validate_merchant
|
| 7 |
|
| 8 |
|
|
@@ -68,35 +68,36 @@ async def customer_otp_verification(user: MerchantRegister = Body(...)):
|
|
| 68 |
logger.error("Unexpected error while verifying and registering merchant: %s", e)
|
| 69 |
raise HTTPException(status_code=500, detail="Failed to verify and register merchant") from e
|
| 70 |
|
| 71 |
-
@router.
|
| 72 |
-
async def get_otps(
|
| 73 |
"""
|
| 74 |
API endpoint to retrieve the email or SMS OTP from Redis cache.
|
| 75 |
|
| 76 |
Args:
|
| 77 |
-
|
| 78 |
|
| 79 |
Returns:
|
| 80 |
dict: The OTP data if found, or an error message if not found.
|
| 81 |
"""
|
| 82 |
try:
|
| 83 |
-
logger.info("Retrieving OTP for
|
|
|
|
|
|
|
|
|
|
| 84 |
|
| 85 |
# Retrieve OTP from Redis
|
| 86 |
-
otp_data = await get_otp_from_cache(
|
| 87 |
if not otp_data:
|
| 88 |
raise HTTPException(status_code=404, detail="OTP not found or expired")
|
| 89 |
|
| 90 |
-
return {"
|
| 91 |
|
| 92 |
except HTTPException as e:
|
| 93 |
-
logger.error("Failed to retrieve OTP for
|
| 94 |
raise e
|
| 95 |
except Exception as e:
|
| 96 |
-
logger.error("Unexpected error while retrieving OTP for
|
| 97 |
-
raise HTTPException(status_code=500, detail="Failed to retrieve OTP") from e
|
| 98 |
-
|
| 99 |
-
from app.services.merchant_services import generate_and_send_otp
|
| 100 |
|
| 101 |
@router.post("/otp/regenerate")
|
| 102 |
async def regenerate_otp(user: MerchantRegister = Body(...)):
|
|
|
|
| 2 |
from typing import Optional
|
| 3 |
from datetime import datetime, timedelta
|
| 4 |
from fastapi import APIRouter, HTTPException, Query, Depends, status, Body
|
| 5 |
+
from app.models.merchant import MerchantRegister, GetOtpRequest
|
| 6 |
from app.services.merchant_services import generate_and_send_otp, get_otp_from_cache, verify_otp_and_save_merchant, validate_merchant
|
| 7 |
|
| 8 |
|
|
|
|
| 68 |
logger.error("Unexpected error while verifying and registering merchant: %s", e)
|
| 69 |
raise HTTPException(status_code=500, detail="Failed to verify and register merchant") from e
|
| 70 |
|
| 71 |
+
@router.post("/otp/get")
|
| 72 |
+
async def get_otps(payload: GetOtpRequest):
|
| 73 |
"""
|
| 74 |
API endpoint to retrieve the email or SMS OTP from Redis cache.
|
| 75 |
|
| 76 |
Args:
|
| 77 |
+
payload (GetOtpRequest): The request payload containing the identifier and type.
|
| 78 |
|
| 79 |
Returns:
|
| 80 |
dict: The OTP data if found, or an error message if not found.
|
| 81 |
"""
|
| 82 |
try:
|
| 83 |
+
logger.info("Retrieving OTP for identifier: %s, type: %s", payload.identifier, payload.type)
|
| 84 |
+
|
| 85 |
+
# Construct the Redis key based on the type
|
| 86 |
+
redis_key = f"otp:{payload.type}:{payload.identifier}"
|
| 87 |
|
| 88 |
# Retrieve OTP from Redis
|
| 89 |
+
otp_data = await get_otp_from_cache(redis_key)
|
| 90 |
if not otp_data:
|
| 91 |
raise HTTPException(status_code=404, detail="OTP not found or expired")
|
| 92 |
|
| 93 |
+
return {"identifier": payload.identifier, "type": payload.type, "otp_data": otp_data}
|
| 94 |
|
| 95 |
except HTTPException as e:
|
| 96 |
+
logger.error("Failed to retrieve OTP for identifier %s, type %s: %s", payload.identifier, payload.type, e.detail)
|
| 97 |
raise e
|
| 98 |
except Exception as e:
|
| 99 |
+
logger.error("Unexpected error while retrieving OTP for identifier %s, type %s: %s", payload.identifier, payload.type, e)
|
| 100 |
+
raise HTTPException(status_code=500, detail="Failed to retrieve OTP") from e
|
|
|
|
|
|
|
| 101 |
|
| 102 |
@router.post("/otp/regenerate")
|
| 103 |
async def regenerate_otp(user: MerchantRegister = Body(...)):
|
app/controllers/merchant_login_controller.py
CHANGED
|
@@ -3,9 +3,12 @@ from app.services.merchant_services import (
|
|
| 3 |
login_service,
|
| 4 |
refresh_token_service,
|
| 5 |
logout_service,
|
|
|
|
| 6 |
)
|
| 7 |
from fastapi.security import OAuth2PasswordRequestForm
|
| 8 |
from fastapi import APIRouter, HTTPException, Query, Depends, status, Body
|
|
|
|
|
|
|
| 9 |
import logging
|
| 10 |
|
| 11 |
# Initialize router and logger
|
|
@@ -13,18 +16,18 @@ router = APIRouter(prefix="/merchant")
|
|
| 13 |
logger = logging.getLogger(__name__)
|
| 14 |
|
| 15 |
@router.post("/login/otp")
|
| 16 |
-
async def generate_login_otp(
|
| 17 |
"""
|
| 18 |
Generate and send login OTP to the provided email or mobile number.
|
| 19 |
|
| 20 |
Args:
|
| 21 |
-
|
| 22 |
|
| 23 |
Returns:
|
| 24 |
dict: Confirmation message indicating OTP has been sent.
|
| 25 |
"""
|
| 26 |
try:
|
| 27 |
-
result = await generate_login_otp_service(identifier)
|
| 28 |
return result
|
| 29 |
except HTTPException as e:
|
| 30 |
logger.error("Failed to generate login OTP: %s", e.detail)
|
|
@@ -57,33 +60,29 @@ async def login(user: OAuth2PasswordRequestForm = Depends()):
|
|
| 57 |
|
| 58 |
|
| 59 |
@router.post("/token/refresh")
|
| 60 |
-
async def refresh_token(
|
| 61 |
-
identifier: str = Body(..., description="Email or Mobile number"),
|
| 62 |
-
refresh_token: str = Body(..., description="Refresh token")
|
| 63 |
-
):
|
| 64 |
"""
|
| 65 |
Refresh access and refresh tokens.
|
| 66 |
|
| 67 |
Args:
|
| 68 |
-
|
| 69 |
-
refresh_token (str): The refresh token.
|
| 70 |
|
| 71 |
Returns:
|
| 72 |
dict: New access and refresh tokens.
|
| 73 |
"""
|
| 74 |
try:
|
| 75 |
-
result = await refresh_token_service(identifier, refresh_token)
|
| 76 |
return result
|
| 77 |
except HTTPException as e:
|
| 78 |
-
logger.error("Failed to refresh token for identifier %s: %s", identifier, e.detail)
|
| 79 |
raise e
|
| 80 |
except Exception as e:
|
| 81 |
-
logger.error("Unexpected error while refreshing token for identifier %s: %s", identifier, e)
|
| 82 |
raise HTTPException(status_code=500, detail="Failed to refresh token") from e
|
| 83 |
|
| 84 |
|
| 85 |
@router.post("/logout")
|
| 86 |
-
async def logout(
|
| 87 |
"""
|
| 88 |
Logout the user by invalidating access and refresh tokens.
|
| 89 |
|
|
@@ -94,11 +93,32 @@ async def logout(identifier: str = Body(..., description="Email or Mobile number
|
|
| 94 |
dict: Confirmation message.
|
| 95 |
"""
|
| 96 |
try:
|
| 97 |
-
result = await logout_service(identifier)
|
| 98 |
return result
|
| 99 |
except HTTPException as e:
|
| 100 |
logger.error("Failed to logout: %s", e.detail)
|
| 101 |
raise e
|
| 102 |
except Exception as e:
|
| 103 |
logger.error("Unexpected error during logout: %s", e)
|
| 104 |
-
raise HTTPException(status_code=500, detail="Failed to logout") from e
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 3 |
login_service,
|
| 4 |
refresh_token_service,
|
| 5 |
logout_service,
|
| 6 |
+
get_user_details_service,
|
| 7 |
)
|
| 8 |
from fastapi.security import OAuth2PasswordRequestForm
|
| 9 |
from fastapi import APIRouter, HTTPException, Query, Depends, status, Body
|
| 10 |
+
from app.models.merchant import LoginOtpRequest, RefreshTokenRequest, MerchantDetailsResponse
|
| 11 |
+
from app.utils.auth_utils import verify_token
|
| 12 |
import logging
|
| 13 |
|
| 14 |
# Initialize router and logger
|
|
|
|
| 16 |
logger = logging.getLogger(__name__)
|
| 17 |
|
| 18 |
@router.post("/login/otp")
|
| 19 |
+
async def generate_login_otp(payload: LoginOtpRequest):
|
| 20 |
"""
|
| 21 |
Generate and send login OTP to the provided email or mobile number.
|
| 22 |
|
| 23 |
Args:
|
| 24 |
+
payload (GenerateLoginOtpRequest): The request payload containing the identifier.
|
| 25 |
|
| 26 |
Returns:
|
| 27 |
dict: Confirmation message indicating OTP has been sent.
|
| 28 |
"""
|
| 29 |
try:
|
| 30 |
+
result = await generate_login_otp_service(payload.identifier)
|
| 31 |
return result
|
| 32 |
except HTTPException as e:
|
| 33 |
logger.error("Failed to generate login OTP: %s", e.detail)
|
|
|
|
| 60 |
|
| 61 |
|
| 62 |
@router.post("/token/refresh")
|
| 63 |
+
async def refresh_token(payload: RefreshTokenRequest):
|
|
|
|
|
|
|
|
|
|
| 64 |
"""
|
| 65 |
Refresh access and refresh tokens.
|
| 66 |
|
| 67 |
Args:
|
| 68 |
+
payload (RefreshTokenRequest): The request payload containing the identifier and refresh token.
|
|
|
|
| 69 |
|
| 70 |
Returns:
|
| 71 |
dict: New access and refresh tokens.
|
| 72 |
"""
|
| 73 |
try:
|
| 74 |
+
result = await refresh_token_service(payload.identifier, payload.refresh_token)
|
| 75 |
return result
|
| 76 |
except HTTPException as e:
|
| 77 |
+
logger.error("Failed to refresh token for identifier %s: %s", payload.identifier, e.detail)
|
| 78 |
raise e
|
| 79 |
except Exception as e:
|
| 80 |
+
logger.error("Unexpected error while refreshing token for identifier %s: %s", payload.identifier, e)
|
| 81 |
raise HTTPException(status_code=500, detail="Failed to refresh token") from e
|
| 82 |
|
| 83 |
|
| 84 |
@router.post("/logout")
|
| 85 |
+
async def logout(payload: LoginOtpRequest):
|
| 86 |
"""
|
| 87 |
Logout the user by invalidating access and refresh tokens.
|
| 88 |
|
|
|
|
| 93 |
dict: Confirmation message.
|
| 94 |
"""
|
| 95 |
try:
|
| 96 |
+
result = await logout_service(payload.identifier)
|
| 97 |
return result
|
| 98 |
except HTTPException as e:
|
| 99 |
logger.error("Failed to logout: %s", e.detail)
|
| 100 |
raise e
|
| 101 |
except Exception as e:
|
| 102 |
logger.error("Unexpected error during logout: %s", e)
|
| 103 |
+
raise HTTPException(status_code=500, detail="Failed to logout") from e
|
| 104 |
+
|
| 105 |
+
@router.get("/user/details", response_model=MerchantDetailsResponse, dependencies=[Depends(verify_token)])
|
| 106 |
+
async def get_user_details(identifier: str = Query(..., description="Email or Mobile number")):
|
| 107 |
+
"""
|
| 108 |
+
Get user details using the identifier (email or mobile).
|
| 109 |
+
|
| 110 |
+
Args:
|
| 111 |
+
identifier (str): Email or mobile number.
|
| 112 |
+
|
| 113 |
+
Returns:
|
| 114 |
+
MerchantDetailsResponse: User details from the database.
|
| 115 |
+
"""
|
| 116 |
+
try:
|
| 117 |
+
result = await get_user_details_service(identifier)
|
| 118 |
+
return result
|
| 119 |
+
except HTTPException as e:
|
| 120 |
+
logger.error("Failed to retrieve user details for identifier %s: %s", identifier, e.detail)
|
| 121 |
+
raise e
|
| 122 |
+
except Exception as e:
|
| 123 |
+
logger.error("Unexpected error while retrieving user details for identifier %s: %s", identifier, e)
|
| 124 |
+
raise HTTPException(status_code=500, detail="Failed to retrieve user details") from e
|
app/models/merchant.py
CHANGED
|
@@ -1,5 +1,5 @@
|
|
| 1 |
from pydantic import BaseModel, EmailStr, Field
|
| 2 |
-
from typing import Optional
|
| 3 |
from datetime import datetime
|
| 4 |
|
| 5 |
class MerchantRegister(BaseModel):
|
|
@@ -16,4 +16,22 @@ class MerchantRegister(BaseModel):
|
|
| 16 |
status: Optional[str] = Field("pending-verification", description="The status of the customer account")
|
| 17 |
email_ver_code: Optional[str] = None #(..., description="The email verification code")
|
| 18 |
mobile_ver_code: Optional[str] = None #(..., description="The mobile verification code")
|
| 19 |
-
created_at: Optional[datetime] = Field(default_factory=datetime.now, description="The timestamp when the customer was created")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
from pydantic import BaseModel, EmailStr, Field
|
| 2 |
+
from typing import Optional, Literal
|
| 3 |
from datetime import datetime
|
| 4 |
|
| 5 |
class MerchantRegister(BaseModel):
|
|
|
|
| 16 |
status: Optional[str] = Field("pending-verification", description="The status of the customer account")
|
| 17 |
email_ver_code: Optional[str] = None #(..., description="The email verification code")
|
| 18 |
mobile_ver_code: Optional[str] = None #(..., description="The mobile verification code")
|
| 19 |
+
created_at: Optional[datetime] = Field(default_factory=datetime.now, description="The timestamp when the customer was created")
|
| 20 |
+
|
| 21 |
+
class LoginOtpRequest(BaseModel):
|
| 22 |
+
identifier: str = Field(..., description="Email or Mobile number")
|
| 23 |
+
|
| 24 |
+
class RefreshTokenRequest(BaseModel):
|
| 25 |
+
identifier: str = Field(..., description="Email or Mobile number")
|
| 26 |
+
refresh_token: str = Field(..., description="Refresh token")
|
| 27 |
+
|
| 28 |
+
class GetOtpRequest(BaseModel):
|
| 29 |
+
identifier: str = Field(..., description="The email or mobile number to retrieve the OTP for")
|
| 30 |
+
type: Literal["register", "login"] = Field(..., description="The type of OTP (register or login)")
|
| 31 |
+
|
| 32 |
+
class MerchantDetailsResponse(BaseModel):
|
| 33 |
+
merchant_id: str
|
| 34 |
+
merchant_name: str
|
| 35 |
+
mobile: str
|
| 36 |
+
email: EmailStr
|
| 37 |
+
country: str
|
app/services/merchant_services.py
CHANGED
|
@@ -1,7 +1,7 @@
|
|
| 1 |
import logging
|
| 2 |
import random
|
| 3 |
from fastapi import HTTPException
|
| 4 |
-
from app.models.merchant import MerchantRegister
|
| 5 |
from app.repositories.db_repository import save_merchant, get_merchant_by_name, get_merchant_by_email, get_merchant_by_mobile
|
| 6 |
from app.utils.merchant_utils import generate_tenant_id
|
| 7 |
from app.utils.auth_utils import validate_email, validate_mobile, generate_tokens
|
|
@@ -62,8 +62,8 @@ async def send_sms_email_verification(email: str, mobile: str):
|
|
| 62 |
# Generate a 6-digit OTP
|
| 63 |
otp = ''.join(random.choices("0123456789", k=6))
|
| 64 |
# Store OTP in cache with a 15-minute expiry
|
| 65 |
-
await set_otp(f"otp:
|
| 66 |
-
await set_otp(f"otp:
|
| 67 |
# Send the OTP to the email address
|
| 68 |
# (You would typically use an email service here)
|
| 69 |
# For demonstration, we'll just log the OTP
|
|
@@ -83,8 +83,8 @@ async def verify_otp_and_save_merchant(merchant: MerchantRegister) -> dict:
|
|
| 83 |
errors = []
|
| 84 |
|
| 85 |
# Retrieve OTPs from cache
|
| 86 |
-
email_otp_data = await get_otp(f"otp:
|
| 87 |
-
mobile_otp_data = await get_otp(f"otp:
|
| 88 |
|
| 89 |
# Verify email OTP
|
| 90 |
email_verified = email_otp_data and email_otp_data["otp"] == merchant.email_ver_code
|
|
@@ -143,8 +143,8 @@ async def generate_and_send_otp(email: str, mobile: str) -> None:
|
|
| 143 |
otp = ''.join(random.choices("0123456789", k=6))
|
| 144 |
|
| 145 |
# Store OTP in cache with a 15-minute expiry
|
| 146 |
-
await set_otp(f"otp:
|
| 147 |
-
await set_otp(f"otp:
|
| 148 |
|
| 149 |
# Log the OTP generation
|
| 150 |
logger.info("Generated OTP for %s & %s: %s", email, mobile, otp)
|
|
@@ -254,3 +254,45 @@ async def logout_service(identifier: str) -> dict:
|
|
| 254 |
await delete_cache(f"token:login:refresh:{identifier}")
|
| 255 |
|
| 256 |
return {"message": "Successfully logged out."}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
import logging
|
| 2 |
import random
|
| 3 |
from fastapi import HTTPException
|
| 4 |
+
from app.models.merchant import MerchantRegister, MerchantDetailsResponse
|
| 5 |
from app.repositories.db_repository import save_merchant, get_merchant_by_name, get_merchant_by_email, get_merchant_by_mobile
|
| 6 |
from app.utils.merchant_utils import generate_tenant_id
|
| 7 |
from app.utils.auth_utils import validate_email, validate_mobile, generate_tokens
|
|
|
|
| 62 |
# Generate a 6-digit OTP
|
| 63 |
otp = ''.join(random.choices("0123456789", k=6))
|
| 64 |
# Store OTP in cache with a 15-minute expiry
|
| 65 |
+
await set_otp(f"otp:register:{email}", {"otp": otp, "expiry_duration": 15 * 60}) # 15 minutes in seconds
|
| 66 |
+
await set_otp(f"otp:register:{mobile}", {"otp": otp, "expiry_duration": 15 * 60}) # 15 minutes in seconds
|
| 67 |
# Send the OTP to the email address
|
| 68 |
# (You would typically use an email service here)
|
| 69 |
# For demonstration, we'll just log the OTP
|
|
|
|
| 83 |
errors = []
|
| 84 |
|
| 85 |
# Retrieve OTPs from cache
|
| 86 |
+
email_otp_data = await get_otp(f"otp:register:{merchant.email}")
|
| 87 |
+
mobile_otp_data = await get_otp(f"otp:register:{merchant.mobile}")
|
| 88 |
|
| 89 |
# Verify email OTP
|
| 90 |
email_verified = email_otp_data and email_otp_data["otp"] == merchant.email_ver_code
|
|
|
|
| 143 |
otp = ''.join(random.choices("0123456789", k=6))
|
| 144 |
|
| 145 |
# Store OTP in cache with a 15-minute expiry
|
| 146 |
+
await set_otp(f"otp:register:{email}", {"otp": otp, "expiry_duration": 15 * 60}) # 15 minutes in seconds
|
| 147 |
+
await set_otp(f"otp:register:{mobile}", {"otp": otp, "expiry_duration": 15 * 60})
|
| 148 |
|
| 149 |
# Log the OTP generation
|
| 150 |
logger.info("Generated OTP for %s & %s: %s", email, mobile, otp)
|
|
|
|
| 254 |
await delete_cache(f"token:login:refresh:{identifier}")
|
| 255 |
|
| 256 |
return {"message": "Successfully logged out."}
|
| 257 |
+
|
| 258 |
+
async def get_user_details_service(identifier: str) -> MerchantDetailsResponse:
|
| 259 |
+
"""
|
| 260 |
+
Retrieve user details from the database using the identifier.
|
| 261 |
+
|
| 262 |
+
Args:
|
| 263 |
+
identifier (str): Email or mobile number.
|
| 264 |
+
|
| 265 |
+
Returns:
|
| 266 |
+
MerchantDetailsResponse: User details if found.
|
| 267 |
+
|
| 268 |
+
Raises:
|
| 269 |
+
HTTPException: If the user is not found.
|
| 270 |
+
"""
|
| 271 |
+
try:
|
| 272 |
+
if "@" in identifier:
|
| 273 |
+
# Fetch user by email
|
| 274 |
+
user = await get_merchant_by_email(identifier)
|
| 275 |
+
else:
|
| 276 |
+
# Fetch user by mobile
|
| 277 |
+
user = await get_merchant_by_mobile(identifier)
|
| 278 |
+
|
| 279 |
+
if not user:
|
| 280 |
+
raise HTTPException(status_code=404, detail="User not found")
|
| 281 |
+
|
| 282 |
+
# Convert ObjectId to string if present
|
| 283 |
+
if "_id" in user:
|
| 284 |
+
user["_id"] = str(user["_id"])
|
| 285 |
+
|
| 286 |
+
# Extract only the required fields
|
| 287 |
+
response_data = MerchantDetailsResponse(
|
| 288 |
+
merchant_id=user.get("merchant_id"),
|
| 289 |
+
merchant_name=user.get("merchant_name"),
|
| 290 |
+
mobile=user.get("mobile"),
|
| 291 |
+
email=user.get("email"),
|
| 292 |
+
country=user.get("country"),
|
| 293 |
+
)
|
| 294 |
+
|
| 295 |
+
return response_data
|
| 296 |
+
except Exception as e:
|
| 297 |
+
logger.error("Failed to retrieve user details for identifier %s: %s", identifier, e)
|
| 298 |
+
raise HTTPException(status_code=500, detail="Failed to retrieve user details") from e
|
app/utils/auth_utils.py
CHANGED
|
@@ -1,8 +1,11 @@
|
|
| 1 |
import re
|
| 2 |
import logging
|
| 3 |
-
from fastapi import HTTPException
|
| 4 |
from jose import JWTError, jwt
|
| 5 |
from datetime import datetime, timedelta
|
|
|
|
|
|
|
|
|
|
| 6 |
|
| 7 |
SECRET_KEY = "B@@kMy$er^!(e"
|
| 8 |
ALGORITHM = "HS256"
|
|
@@ -51,18 +54,22 @@ def generate_tokens(identifier: str) -> dict:
|
|
| 51 |
return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}
|
| 52 |
|
| 53 |
|
| 54 |
-
def verify_token(token: str) -> dict:
|
| 55 |
"""
|
| 56 |
-
Verify
|
| 57 |
|
| 58 |
Args:
|
| 59 |
-
token (str): The
|
| 60 |
|
| 61 |
Returns:
|
| 62 |
dict: Decoded token data.
|
|
|
|
|
|
|
|
|
|
| 63 |
"""
|
| 64 |
try:
|
| 65 |
-
|
|
|
|
| 66 |
except jwt.ExpiredSignatureError:
|
| 67 |
raise HTTPException(status_code=401, detail="Token has expired")
|
| 68 |
except jwt.InvalidTokenError:
|
|
|
|
| 1 |
import re
|
| 2 |
import logging
|
| 3 |
+
from fastapi import HTTPException, Security # Import Security here
|
| 4 |
from jose import JWTError, jwt
|
| 5 |
from datetime import datetime, timedelta
|
| 6 |
+
from fastapi.security import HTTPBearer
|
| 7 |
+
|
| 8 |
+
security = HTTPBearer()
|
| 9 |
|
| 10 |
SECRET_KEY = "B@@kMy$er^!(e"
|
| 11 |
ALGORITHM = "HS256"
|
|
|
|
| 54 |
return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}
|
| 55 |
|
| 56 |
|
| 57 |
+
def verify_token(token: str = Security(security)) -> dict:
|
| 58 |
"""
|
| 59 |
+
Verify the access token.
|
| 60 |
|
| 61 |
Args:
|
| 62 |
+
token (str): The access token.
|
| 63 |
|
| 64 |
Returns:
|
| 65 |
dict: Decoded token data.
|
| 66 |
+
|
| 67 |
+
Raises:
|
| 68 |
+
HTTPException: If the token is invalid or expired.
|
| 69 |
"""
|
| 70 |
try:
|
| 71 |
+
payload = jwt.decode(token.credentials, SECRET_KEY, algorithms=[ALGORITHM])
|
| 72 |
+
return payload
|
| 73 |
except jwt.ExpiredSignatureError:
|
| 74 |
raise HTTPException(status_code=401, detail="Token has expired")
|
| 75 |
except jwt.InvalidTokenError:
|