# File: spark/cbh/api/security/views.py
# Page residue from the web file viewer: "brestok's picture"; last commit "fix" (e2a8a40).
"""
Security API views module.
"""
from datetime import timedelta
from fastapi import Depends, HTTPException
from cbh.api.account.dto import AccountType
from cbh.api.account.models import AccountModel, AccountShorten
from cbh.api.common.db_requests import get_obj_by_id
from cbh.api.common.schemas import EmailRequest
from cbh.api.security import security_router
from cbh.api.security.db_requests import (
complete_account_registration,
save_account,
authenticate_account, get_account_by_email, reset_password_obj, create_code_obj, verify_code_obj,
)
from cbh.api.security.dto import AccessToken
from cbh.api.security.schemas import (
RegisterAccountRequest,
LoginAccountRequest,
LoginAccountResponse,
RegisterCompleteRequest, ResetPasswordConfirmRequest,
)
from cbh.api.security.utils import send_password_reset_email
from cbh.core.security import PermissionDependency, create_access_token
from cbh.core.wrappers import CbhResponseWrapper
@security_router.post("/register")
async def register_user(
    data: RegisterAccountRequest,
) -> CbhResponseWrapper[AccountModel]:
    """
    Create an account from the registration payload.

    The payload is passed unchanged to ``save_account``; whatever account
    that call returns is sent back inside the standard response wrapper.
    """
    return CbhResponseWrapper(data=await save_account(data))
@security_router.post("/login")
async def login(data: LoginAccountRequest) -> CbhResponseWrapper[LoginAccountResponse]:
    """
    Authenticate an account and issue an access token for it.

    The credentials are checked by ``authenticate_account``. The token is
    built from the account's email, its id (as a string) and its account
    type, and is returned together with the account itself.
    """
    user = await authenticate_account(data)
    token_value = create_access_token(user.email, str(user.id), user.accountType)
    return CbhResponseWrapper(
        data=LoginAccountResponse(
            accessToken=AccessToken(value=token_value),
            account=user,
        )
    )
@security_router.post("/verify")
async def verify(
    account: AccountModel = Depends(
        PermissionDependency([AccountType.ADMIN, AccountType.USER])
    ),
) -> CbhResponseWrapper[AccountModel]:
    """
    Return the account resolved for the current request.

    The account is produced by ``PermissionDependency`` configured with the
    ADMIN and USER account types; the handler performs no checks of its own
    and simply echoes that account back to the caller.
    """
    verified = CbhResponseWrapper(data=account)
    return verified
@security_router.post("/login/as/user")
async def login_as_user(
    accountId: str,
    _: AccountModel = Depends(PermissionDependency([AccountType.ADMIN])),
) -> CbhResponseWrapper[LoginAccountResponse]:
    """
    Issue an access token for another account (impersonation).

    This endpoint mints a valid token for an arbitrary account id without
    that account's credentials, so it is guarded by the ADMIN permission
    dependency. The resolved admin account is not used by the handler body,
    hence the ``_`` parameter name.

    Args:
        accountId: Identifier of the account to log in as.

    Returns:
        The target account together with a freshly created access token.

    Raises:
        HTTPException: 404 when no account exists for ``accountId``.
    """
    account = await get_obj_by_id(AccountModel, accountId)
    if account is None:
        raise HTTPException(status_code=404, detail="User not found")

    token = create_access_token(account.email, str(account.id), account.accountType)
    response = LoginAccountResponse(
        accessToken=AccessToken(value=token),
        account=account,
    )
    return CbhResponseWrapper(data=response)
@security_router.post("/register/{accountId}/complete")
async def register_complete(
    accountId: str,
    request: RegisterCompleteRequest,
) -> CbhResponseWrapper[AccountModel]:
    """
    Finish the sign-up flow for the account identified in the path.

    The account id and the completion payload are forwarded to
    ``complete_account_registration``; the account it returns is wrapped
    and sent back.
    """
    return CbhResponseWrapper(
        data=await complete_account_registration(accountId, request)
    )
@security_router.post("/password/reset/request")
async def request_password_reset_code(
    data: EmailRequest,
    account: AccountModel = Depends(PermissionDependency(required=False)),
) -> CbhResponseWrapper:
    """
    Create a password-reset code and email it to the account owner.

    The account is looked up by the email in the request. If none is found,
    an empty response is returned and no code or email is produced.
    Otherwise a code object is created with a five-minute ``timedelta`` and
    its id is passed to ``send_password_reset_email``.

    The ``account`` dependency is resolved with ``required=False`` and is
    not read by the handler body.
    """
    found = await get_account_by_email(data.email)
    if not found:
        return CbhResponseWrapper(data=None)

    # Code creation and mail sending both take the shortened account view.
    recipient = AccountShorten(**found.model_dump())
    code_lifetime = timedelta(minutes=5)
    reset_code = await create_code_obj(recipient, code_lifetime)
    await send_password_reset_email(reset_code.id, recipient)
    return CbhResponseWrapper()
@security_router.post("/password/reset/confirm")
async def reset_password_confirm(
    data: ResetPasswordConfirmRequest,
) -> CbhResponseWrapper[AccountModel]:
    """
    Apply a new password using a previously issued reset code.

    The submitted code is checked with ``verify_code_obj``. The account
    attached to the verified code then has its password replaced through
    ``reset_password_obj``, and the resulting account is returned.
    """
    verified_code = await verify_code_obj(data.code)
    updated = await reset_password_obj(verified_code.account, data.password)
    return CbhResponseWrapper(data=updated)