""" Security API views module. """ from datetime import timedelta from fastapi import Depends, HTTPException from cbh.api.account.dto import AccountType from cbh.api.account.models import AccountModel, AccountShorten from cbh.api.common.db_requests import get_obj_by_id from cbh.api.common.schemas import EmailRequest from cbh.api.security import security_router from cbh.api.security.db_requests import ( complete_account_registration, save_account, authenticate_account, get_account_by_email, reset_password_obj, create_code_obj, verify_code_obj, ) from cbh.api.security.dto import AccessToken from cbh.api.security.schemas import ( RegisterAccountRequest, LoginAccountRequest, LoginAccountResponse, RegisterCompleteRequest, ResetPasswordConfirmRequest, ) from cbh.api.security.utils import send_password_reset_email from cbh.core.security import PermissionDependency, create_access_token from cbh.core.wrappers import CbhResponseWrapper @security_router.post("/register") async def register_user( data: RegisterAccountRequest, ) -> CbhResponseWrapper[AccountModel]: """ Register a new user account. """ account = await save_account(data) return CbhResponseWrapper(data=account) @security_router.post("/login") async def login(data: LoginAccountRequest) -> CbhResponseWrapper[LoginAccountResponse]: """ Authenticate a user and generate an access token. """ account = await authenticate_account(data) access_token = create_access_token( account.email, str(account.id), account.accountType ) response = LoginAccountResponse( accessToken=AccessToken(value=access_token), account=account, ) return CbhResponseWrapper(data=response) @security_router.post("/verify") async def verify( account: AccountModel = Depends( PermissionDependency([AccountType.ADMIN, AccountType.USER]) ), ) -> CbhResponseWrapper[AccountModel]: """ Verify a user's authentication token. """ return CbhResponseWrapper(data=account) @security_router.post("/login/as/user") async def login_as_user( accountId: str, # _: AccountModel = Depends(PermissionDependency([AccountType.ADMIN])), ) -> CbhResponseWrapper[LoginAccountResponse]: """ Login as a user. """ account = await get_obj_by_id(AccountModel, accountId) if account is None: raise HTTPException(status_code=404, detail="User not found") token = create_access_token(account.email, str(account.id), account.accountType) response = LoginAccountResponse( accessToken=AccessToken(value=token), account=account, ) return CbhResponseWrapper(data=response) @security_router.post("/register/{accountId}/complete") async def register_complete( accountId: str, request: RegisterCompleteRequest, ) -> CbhResponseWrapper[AccountModel]: """ Signup complete. """ account = await complete_account_registration(accountId, request) return CbhResponseWrapper(data=account) @security_router.post("/password/reset/request") async def request_password_reset_code( data: EmailRequest, account: AccountModel = Depends(PermissionDependency(required=False)), ) -> CbhResponseWrapper: """ Reset a user's password. """ account_obj = await get_account_by_email(data.email) if not account_obj: return CbhResponseWrapper(data=None) account_obj = AccountShorten(**account_obj.model_dump()) code = await create_code_obj( account_obj, timedelta(minutes=5), ) await send_password_reset_email(code.id, account_obj) return CbhResponseWrapper() @security_router.post("/password/reset/confirm") async def reset_password_confirm( data: ResetPasswordConfirmRequest, ) -> CbhResponseWrapper[AccountModel]: """ Confirm a user's password reset. """ code = await verify_code_obj(data.code) account = await reset_password_obj(code.account, data.password) return CbhResponseWrapper(data=account)