""" Database requests module for security functionality. """ import asyncio from datetime import datetime, timedelta from fastapi import HTTPException from passlib.context import CryptContext from pydantic import EmailStr from cbh.api.account.dto import AccountStatus, AccountType, CoachOpportunity from cbh.api.account.models import AccountModel, AccountShorten from cbh.api.availability.models import AvailabilityModel from cbh.api.security.models import VerificationCodeModel from cbh.api.security.schemas import ( LoginAccountRequest, RegisterAccountRequest, RegisterCompleteRequest, ) from cbh.api.common.db_requests import get_obj_by_id from cbh.core.config import settings from cbh.core.security import verify_password from cbh.core.wrappers import background_task async def check_unique_fields_existence( name: str, new_value: EmailStr | str, current_value: str | None = None ) -> None: """ Check if a field value already exists in the database to ensure uniqueness. """ if new_value == current_value or not new_value: return account = await settings.DB_CLIENT.accounts.find_one( {name: {"$regex": f"^{str(new_value)}$", "$options": "i"}} ) if account: detail = f'Account with {name} "{new_value}" already exists.' raise HTTPException(status_code=400, detail=detail) async def save_account(data: RegisterAccountRequest) -> AccountModel: """ Create a new user account in the database. """ await check_unique_fields_existence("email", data.email) opportunity = None status = AccountStatus.ACTIVE if data.accountType == AccountType.COACH: opportunity = CoachOpportunity.GENERAL status = AccountStatus.INACTIVE account = AccountModel( name=f"{data.firstName} {data.lastName}", email=data.email, phone=data.phone, password=data.password, datetimeUpdated=datetime.now(), accountType=data.accountType, opportunity=opportunity, status=status, ) await settings.DB_CLIENT.accounts.insert_one(account.to_mongo()) if account.accountType == AccountType.COACH: asyncio.create_task(create_availability_obj(account)) return account async def create_availability_obj(account: AccountModel) -> AvailabilityModel: """ Create a new availability object. """ availability = AvailabilityModel(coach=AccountShorten(**account.to_mongo())) await settings.DB_CLIENT.availabilities.insert_one(availability.to_mongo()) return availability async def authenticate_account(data: LoginAccountRequest) -> AccountModel: """ Authenticate a user account using mail and password. """ account = await settings.DB_CLIENT.accounts.find_one( {"email": {"$regex": f"^{data.email}$", "$options": "i"}} ) if account is None: raise HTTPException(status_code=404, detail="Invalid email or password.") account = AccountModel.from_mongo(account) if not verify_password(data.password, account.password): raise HTTPException(status_code=401, detail="Invalid email or password") return account async def get_account_by_email( email: EmailStr, raise_exception: bool = True ) -> AccountModel | None: """ Verify if an account exists. """ account = await settings.DB_CLIENT.accounts.find_one( {"email": {"$regex": f"^{email}$", "$options": "i"}} ) if account is None and raise_exception: raise HTTPException(status_code=404, detail="Account not found") elif account is None: return None return AccountModel.from_mongo(account) async def complete_account_registration( accountId: str, request: RegisterCompleteRequest, ) -> AccountModel: """ Complete account registration. """ account = await get_obj_by_id(AccountModel, accountId) if account is None: raise HTTPException(status_code=404, detail="Account not found") elif account.status != AccountStatus.PENDING: raise HTTPException(status_code=400, detail="Account already registered") account.name = f"{request.firstName} {request.lastName}" account.password = request.password account.status = AccountStatus.ACTIVE account_shorten = AccountShorten( id=account.id, name=account.name, email=account.email, status=account.status, accountType=account.accountType, ) await asyncio.gather( settings.DB_CLIENT.accounts.update_one( {"id": account.id}, {"$set": account.to_mongo()} ), settings.DB_CLIENT.calls.update_one( { "customer.email": account.email, }, {"$set": {"customer": account_shorten.model_dump(mode='json')}}, ), ) return account async def reset_password_obj(account: AccountShorten, password: str) -> AccountModel: """ Reset a password object. """ password = CryptContext(schemes=["bcrypt"], deprecated="auto").hash(password) await settings.DB_CLIENT.accounts.update_one( {"id": account.id}, {"$set": {"password": password}}, ) return account async def verify_code_obj( code_: str, exception: bool = True, is_delete: bool = True ) -> VerificationCodeModel: """ Verify a code object. """ code = ( await settings.DB_CLIENT.verificationcodes.find( {"id": code_}, ) .sort("_id", -1) .to_list(length=1) ) code = VerificationCodeModel.from_mongo(code[0]) if code else None error_msg = "Invalid or expired invitation link." if not code and exception: raise HTTPException( status_code=404, detail=error_msg, ) if ( code.expiresAt < datetime.now() and exception ): raise HTTPException(status_code=410, detail=error_msg) if is_delete: asyncio.create_task(delete_code(code)) return code async def create_code_obj( account: AccountShorten, time_delta: timedelta ) -> VerificationCodeModel: """ Create a code object. """ prev_code = ( await settings.DB_CLIENT.verificationcodes.find( { "account.id": account.id, } ) .sort("_id", -1) .to_list(length=1) ) prev_code = VerificationCodeModel.from_mongo(prev_code[0]) if prev_code else None if prev_code and prev_code.datetimeInserted > datetime.now() - timedelta(minutes=1): raise HTTPException(status_code=429, detail="Too many requests") code = VerificationCodeModel( account=account, expiresAt=datetime.now() + time_delta, ) await settings.DB_CLIENT.verificationcodes.insert_one(code.to_mongo()) return code @background_task() async def delete_code(code: VerificationCodeModel | None): """ Delete a code object. """ if code: await settings.DB_CLIENT.verificationcodes.delete_one({"id": code.id})