| """ |
| Database requests module for security functionality. |
| """ |
|
|
import asyncio
import re
from datetime import datetime, timedelta

from fastapi import HTTPException
from passlib.context import CryptContext
from pydantic import EmailStr

from cbh.api.account.dto import AccountStatus, AccountType, CoachOpportunity
from cbh.api.account.models import AccountModel, AccountShorten
from cbh.api.availability.models import AvailabilityModel
from cbh.api.common.db_requests import get_obj_by_id
from cbh.api.security.models import VerificationCodeModel
from cbh.api.security.schemas import (
    LoginAccountRequest,
    RegisterAccountRequest,
    RegisterCompleteRequest,
)
from cbh.core.config import settings
from cbh.core.security import verify_password
from cbh.core.wrappers import background_task
|
|
|
|
async def check_unique_fields_existence(
    name: str, new_value: EmailStr | str, current_value: str | None = None
) -> None:
    """
    Ensure that no account already stores ``new_value`` in the field ``name``.

    The lookup is case-insensitive. Nothing is checked when ``new_value`` is
    empty or equal to ``current_value``.

    Args:
        name: Name of the account field to check (for example ``"email"``).
        new_value: Value that is about to be stored.
        current_value: Value currently stored, if any.

    Raises:
        HTTPException: 400 if an account with the same value already exists.
    """
    if new_value == current_value or not new_value:
        return

    # Escape the value before embedding it in the pattern: characters such as
    # "." and "+" are common in e-mail addresses and would otherwise be
    # interpreted as regex metacharacters, producing false matches or misses.
    pattern = f"^{re.escape(str(new_value))}$"
    account = await settings.DB_CLIENT.accounts.find_one(
        {name: {"$regex": pattern, "$options": "i"}}
    )
    if account:
        detail = f'Account with {name} "{new_value}" already exists.'
        raise HTTPException(status_code=400, detail=detail)
|
|
|
|
# Strong references to in-flight availability-creation tasks. The event loop
# keeps only weak references to tasks, so a task whose result is discarded may
# be garbage-collected before it finishes.
_AVAILABILITY_TASKS: set[asyncio.Task] = set()


async def save_account(data: RegisterAccountRequest) -> AccountModel:
    """
    Create a new user account in the database.

    Coach accounts are created with the ``INACTIVE`` status and the
    ``GENERAL`` opportunity; every other account type is created ``ACTIVE``
    with no opportunity. For coach accounts an availability object is created
    in a background task after the account has been inserted.

    Args:
        data: Registration payload.

    Returns:
        The account model that was inserted.

    Raises:
        HTTPException: 400 if an account with the same e-mail already exists.
    """
    await check_unique_fields_existence("email", data.email)

    opportunity = None
    status = AccountStatus.ACTIVE
    if data.accountType == AccountType.COACH:
        opportunity = CoachOpportunity.GENERAL
        status = AccountStatus.INACTIVE

    account = AccountModel(
        name=f"{data.firstName} {data.lastName}",
        email=data.email,
        phone=data.phone,
        password=data.password,
        datetimeUpdated=datetime.now(),
        accountType=data.accountType,
        opportunity=opportunity,
        status=status,
    )
    await settings.DB_CLIENT.accounts.insert_one(account.to_mongo())

    if account.accountType == AccountType.COACH:
        # Keep the task referenced until it completes, then drop it.
        task = asyncio.create_task(create_availability_obj(account))
        _AVAILABILITY_TASKS.add(task)
        task.add_done_callback(_AVAILABILITY_TASKS.discard)
    return account
|
|
|
|
async def create_availability_obj(account: AccountModel) -> AvailabilityModel:
    """
    Build an availability record for the given account and insert it.

    Args:
        account: Account used to populate the ``coach`` field of the record.

    Returns:
        The availability model that was inserted.
    """
    coach = AccountShorten(**account.to_mongo())
    record = AvailabilityModel(coach=coach)
    await settings.DB_CLIENT.availabilities.insert_one(record.to_mongo())
    return record
|
|
|
|
async def authenticate_account(data: LoginAccountRequest) -> AccountModel:
    """
    Authenticate a user account using e-mail and password.

    The e-mail lookup is case-insensitive.

    Args:
        data: Login payload carrying ``email`` and ``password``.

    Returns:
        The matching account.

    Raises:
        HTTPException: 404 if no account matches the e-mail, 401 if the
            password does not verify.
    """
    # The e-mail comes straight from the login request. Escape it so that it
    # is matched literally; otherwise an input such as ".*" would match an
    # arbitrary account and "." / "+" in real addresses would be misread.
    pattern = f"^{re.escape(str(data.email))}$"
    account = await settings.DB_CLIENT.accounts.find_one(
        {"email": {"$regex": pattern, "$options": "i"}}
    )
    if account is None:
        # NOTE(review): the 404 here vs. 401 below lets a client tell whether
        # an e-mail is registered even though the messages are alike; kept
        # as-is because API clients may depend on the status codes.
        raise HTTPException(status_code=404, detail="Invalid email or password.")

    account = AccountModel.from_mongo(account)

    if not verify_password(data.password, account.password):
        raise HTTPException(status_code=401, detail="Invalid email or password")

    return account
|
|
|
|
async def get_account_by_email(
    email: EmailStr, raise_exception: bool = True
) -> AccountModel | None:
    """
    Fetch an account by its e-mail address (case-insensitive).

    Args:
        email: E-mail address to look up.
        raise_exception: When true, a missing account raises instead of
            returning ``None``.

    Returns:
        The matching account, or ``None`` if there is none and
        ``raise_exception`` is false.

    Raises:
        HTTPException: 404 if no account matches and ``raise_exception`` is
            true.
    """
    # Escape the address so "." and "+" are matched literally rather than as
    # regex metacharacters.
    pattern = f"^{re.escape(str(email))}$"
    account = await settings.DB_CLIENT.accounts.find_one(
        {"email": {"$regex": pattern, "$options": "i"}}
    )
    if account is None:
        if raise_exception:
            raise HTTPException(status_code=404, detail="Account not found")
        return None
    return AccountModel.from_mongo(account)
|
|
|
|
async def complete_account_registration(
    accountId: str,
    request: RegisterCompleteRequest,
) -> AccountModel:
    """
    Finish the registration of a pending account.

    Sets the account's name and password from the request, marks it
    ``ACTIVE``, persists it, and refreshes the embedded ``customer`` data of a
    call whose ``customer.email`` equals the account e-mail.

    Args:
        accountId: Identifier of the account to complete.
        request: Payload with ``firstName``, ``lastName`` and ``password``.

    Returns:
        The updated account.

    Raises:
        HTTPException: 404 if the account does not exist, 400 if its status is
            not ``PENDING``.
    """
    account = await get_obj_by_id(AccountModel, accountId)
    if account is None:
        raise HTTPException(status_code=404, detail="Account not found")
    if account.status != AccountStatus.PENDING:
        raise HTTPException(status_code=400, detail="Account already registered")

    full_name = f"{request.firstName} {request.lastName}"
    account.name = full_name
    # NOTE(review): presumably the model hashes the password when it is
    # assigned or serialised - confirm against AccountModel.
    account.password = request.password
    account.status = AccountStatus.ACTIVE

    customer = AccountShorten(
        id=account.id,
        name=account.name,
        email=account.email,
        status=account.status,
        accountType=account.accountType,
    )

    # Both writes are started before either is awaited, then awaited together.
    account_write = settings.DB_CLIENT.accounts.update_one(
        {"id": account.id},
        {"$set": account.to_mongo()},
    )
    # NOTE(review): update_one touches a single call document; verify that a
    # customer cannot have several calls that need refreshing.
    call_write = settings.DB_CLIENT.calls.update_one(
        {"customer.email": account.email},
        {"$set": {"customer": customer.model_dump(mode="json")}},
    )
    await asyncio.gather(account_write, call_write)
    return account
|
|
|
|
async def reset_password_obj(account: AccountShorten, password: str) -> AccountShorten:
    """
    Store a new bcrypt-hashed password for the given account.

    Args:
        account: Account whose password is replaced; matched by ``id``.
        password: New password in plain text.

    Returns:
        The ``account`` argument, unchanged.
    """
    # Hash into a separate name instead of rebinding the plain-text parameter.
    hashed_password = CryptContext(schemes=["bcrypt"], deprecated="auto").hash(
        password
    )
    await settings.DB_CLIENT.accounts.update_one(
        {"id": account.id},
        {"$set": {"password": hashed_password}},
    )
    return account
|
|
|
|
async def verify_code_obj(
    code_: str, exception: bool = True, is_delete: bool = True
) -> VerificationCodeModel | None:
    """
    Look up a verification code by id and validate it.

    If several documents share the id, the first one when sorted by ``_id``
    descending is used.

    Args:
        code_: Identifier of the verification code.
        exception: When true, a missing or expired code raises. When false, a
            missing code yields ``None`` and an expired code is returned
            as-is.
        is_delete: When true, deletion of the found code is scheduled before
            returning.

    Returns:
        The verification code, or ``None`` if it does not exist and
        ``exception`` is false.

    Raises:
        HTTPException: 404 if the code does not exist, 410 if it has expired
            (both only when ``exception`` is true).
    """
    documents = (
        await settings.DB_CLIENT.verificationcodes.find(
            {"id": code_},
        )
        .sort("_id", -1)
        .to_list(length=1)
    )
    code = VerificationCodeModel.from_mongo(documents[0]) if documents else None

    error_msg = "Invalid or expired invitation link."
    if code is None:
        if exception:
            raise HTTPException(
                status_code=404,
                detail=error_msg,
            )
        # Without this early return the expiry check below would dereference
        # None and fail with AttributeError.
        return None

    if exception and code.expiresAt < datetime.now():
        raise HTTPException(status_code=410, detail=error_msg)

    if is_delete:
        asyncio.create_task(delete_code(code))
    return code
|
|
|
|
async def create_code_obj(
    account: AccountShorten, time_delta: timedelta
) -> VerificationCodeModel:
    """
    Issue a new verification code for an account.

    A request is rejected when the latest code stored for the account (first
    document by ``_id`` descending) was inserted less than one minute ago.

    Args:
        account: Account the code is issued for.
        time_delta: Lifetime of the code, added to the current time to obtain
            ``expiresAt``.

    Returns:
        The verification code that was inserted.

    Raises:
        HTTPException: 429 if the previous code is less than one minute old.
    """
    cooldown = timedelta(minutes=1)

    cursor = settings.DB_CLIENT.verificationcodes.find({"account.id": account.id})
    latest_docs = await cursor.sort("_id", -1).to_list(length=1)

    latest = None
    if latest_docs:
        latest = VerificationCodeModel.from_mongo(latest_docs[0])

    if latest and latest.datetimeInserted > datetime.now() - cooldown:
        raise HTTPException(status_code=429, detail="Too many requests")

    expires_at = datetime.now() + time_delta
    new_code = VerificationCodeModel(account=account, expiresAt=expires_at)
    await settings.DB_CLIENT.verificationcodes.insert_one(new_code.to_mongo())
    return new_code
|
|
|
|
@background_task()
async def delete_code(code: VerificationCodeModel | None):
    """
    Remove a verification code from the database.

    Does nothing when ``code`` is falsy (for example ``None``); otherwise the
    document whose ``id`` equals ``code.id`` is deleted.
    """
    if not code:
        return
    id_filter = {"id": code.id}
    await settings.DB_CLIENT.verificationcodes.delete_one(id_filter)
|
|
|
|