spark / cbh /api /security /db_requests.py
brestok's picture
fix
9039c7f
"""
Database requests module for security functionality.
"""
import asyncio
from datetime import datetime, timedelta
from fastapi import HTTPException
from passlib.context import CryptContext
from pydantic import EmailStr
from cbh.api.account.dto import AccountStatus, AccountType, CoachOpportunity
from cbh.api.account.models import AccountModel, AccountShorten
from cbh.api.availability.models import AvailabilityModel
from cbh.api.security.models import VerificationCodeModel
from cbh.api.security.schemas import (
LoginAccountRequest,
RegisterAccountRequest,
RegisterCompleteRequest,
)
from cbh.api.common.db_requests import get_obj_by_id
from cbh.core.config import settings
from cbh.core.security import verify_password
from cbh.core.wrappers import background_task
async def check_unique_fields_existence(
name: str, new_value: EmailStr | str, current_value: str | None = None
) -> None:
"""
Check if a field value already exists in the database to ensure uniqueness.
"""
if new_value == current_value or not new_value:
return
account = await settings.DB_CLIENT.accounts.find_one(
{name: {"$regex": f"^{str(new_value)}$", "$options": "i"}}
)
if account:
detail = f'Account with {name} "{new_value}" already exists.'
raise HTTPException(status_code=400, detail=detail)
async def save_account(data: RegisterAccountRequest) -> AccountModel:
"""
Create a new user account in the database.
"""
await check_unique_fields_existence("email", data.email)
opportunity = None
status = AccountStatus.ACTIVE
if data.accountType == AccountType.COACH:
opportunity = CoachOpportunity.GENERAL
status = AccountStatus.INACTIVE
account = AccountModel(
name=f"{data.firstName} {data.lastName}",
email=data.email,
phone=data.phone,
password=data.password,
datetimeUpdated=datetime.now(),
accountType=data.accountType,
opportunity=opportunity,
status=status,
)
await settings.DB_CLIENT.accounts.insert_one(account.to_mongo())
if account.accountType == AccountType.COACH:
asyncio.create_task(create_availability_obj(account))
return account
async def create_availability_obj(account: AccountModel) -> AvailabilityModel:
"""
Create a new availability object.
"""
availability = AvailabilityModel(coach=AccountShorten(**account.to_mongo()))
await settings.DB_CLIENT.availabilities.insert_one(availability.to_mongo())
return availability
async def authenticate_account(data: LoginAccountRequest) -> AccountModel:
"""
Authenticate a user account using mail and password.
"""
account = await settings.DB_CLIENT.accounts.find_one(
{"email": {"$regex": f"^{data.email}$", "$options": "i"}}
)
if account is None:
raise HTTPException(status_code=404, detail="Invalid email or password.")
account = AccountModel.from_mongo(account)
if not verify_password(data.password, account.password):
raise HTTPException(status_code=401, detail="Invalid email or password")
return account
async def get_account_by_email(
email: EmailStr, raise_exception: bool = True
) -> AccountModel | None:
"""
Verify if an account exists.
"""
account = await settings.DB_CLIENT.accounts.find_one(
{"email": {"$regex": f"^{email}$", "$options": "i"}}
)
if account is None and raise_exception:
raise HTTPException(status_code=404, detail="Account not found")
elif account is None:
return None
return AccountModel.from_mongo(account)
async def complete_account_registration(
accountId: str,
request: RegisterCompleteRequest,
) -> AccountModel:
"""
Complete account registration.
"""
account = await get_obj_by_id(AccountModel, accountId)
if account is None:
raise HTTPException(status_code=404, detail="Account not found")
elif account.status != AccountStatus.PENDING:
raise HTTPException(status_code=400, detail="Account already registered")
account.name = f"{request.firstName} {request.lastName}"
account.password = request.password
account.status = AccountStatus.ACTIVE
account_shorten = AccountShorten(
id=account.id,
name=account.name,
email=account.email,
status=account.status,
accountType=account.accountType,
)
await asyncio.gather(
settings.DB_CLIENT.accounts.update_one(
{"id": account.id}, {"$set": account.to_mongo()}
),
settings.DB_CLIENT.calls.update_one(
{
"customer.email": account.email,
},
{"$set": {"customer": account_shorten.model_dump(mode='json')}},
),
)
return account
async def reset_password_obj(account: AccountShorten, password: str) -> AccountModel:
"""
Reset a password object.
"""
password = CryptContext(schemes=["bcrypt"], deprecated="auto").hash(password)
await settings.DB_CLIENT.accounts.update_one(
{"id": account.id},
{"$set": {"password": password}},
)
return account
async def verify_code_obj(
code_: str, exception: bool = True, is_delete: bool = True
) -> VerificationCodeModel:
"""
Verify a code object.
"""
code = (
await settings.DB_CLIENT.verificationcodes.find(
{"id": code_},
)
.sort("_id", -1)
.to_list(length=1)
)
code = VerificationCodeModel.from_mongo(code[0]) if code else None
error_msg = "Invalid or expired invitation link."
if not code and exception:
raise HTTPException(
status_code=404,
detail=error_msg,
)
if (
code.expiresAt < datetime.now()
and exception
):
raise HTTPException(status_code=410, detail=error_msg)
if is_delete:
asyncio.create_task(delete_code(code))
return code
async def create_code_obj(
account: AccountShorten, time_delta: timedelta
) -> VerificationCodeModel:
"""
Create a code object.
"""
prev_code = (
await settings.DB_CLIENT.verificationcodes.find(
{
"account.id": account.id,
}
)
.sort("_id", -1)
.to_list(length=1)
)
prev_code = VerificationCodeModel.from_mongo(prev_code[0]) if prev_code else None
if prev_code and prev_code.datetimeInserted > datetime.now() - timedelta(minutes=1):
raise HTTPException(status_code=429, detail="Too many requests")
code = VerificationCodeModel(
account=account,
expiresAt=datetime.now() + time_delta,
)
await settings.DB_CLIENT.verificationcodes.insert_one(code.to_mongo())
return code
@background_task()
async def delete_code(code: VerificationCodeModel | None):
"""
Delete a code object.
"""
if code:
await settings.DB_CLIENT.verificationcodes.delete_one({"id": code.id})