""" Database requests module for security functionality. """ import asyncio from datetime import datetime, timedelta from fastapi import HTTPException from passlib.context import CryptContext from pydantic import EmailStr from pymongo import ReturnDocument from cbh.api.account.dto import AccountType, RegistrationType from cbh.api.account.models import AccountModel, AccountShorten from cbh.api.security.dto import VerificationCodeStatus, VerificationCodeType from cbh.api.security.models import VerificationCodeModel from cbh.api.security.schemas import ( LoginAccountRequest, RegisterAccountRequest, ) from cbh.core.config import settings from cbh.core.security import verify_password from cbh.core.wrappers import background_task async def check_unique_email(email: EmailStr | str) -> AccountModel | None: """ Check if a field value already exists in the database to ensure uniqueness. """ account = await settings.DB_CLIENT.accounts.find_one( {"email": {"$regex": f"^{str(email)}$", "$options": "i"}} ) account = AccountModel.from_mongo(account) if account else None return account async def authenticate_account(data: LoginAccountRequest) -> AccountModel: """ Authenticate a user account using mail and password. """ account = await settings.DB_CLIENT.accounts.find_one( {"email": {"$regex": f"^{data.email}$", "$options": "i"}} ) if account is None: raise HTTPException(status_code=404, detail="Invalid email or password.") account = AccountModel.from_mongo(account) if account.registrationType != RegistrationType.ORGANIC: raise HTTPException(status_code=422, detail="Please sign in with social providers.") if not verify_password(data.password, account.password): raise HTTPException(status_code=400, detail="Invalid email or password.") return account async def get_account_by_email(email: str) -> AccountModel | None: """ Verify if an account exists. """ account = await settings.DB_CLIENT.accounts.find_one( {"email": {"$regex": f"^{email}$", "$options": "i"}} ) return AccountModel.from_mongo(account) if account else None async def create_code_obj( account: AccountShorten, type_: VerificationCodeType, time_delta: timedelta ) -> VerificationCodeModel: """ Create a code object. """ prev_code = ( await settings.DB_CLIENT.verificationcodes.find( { "account.id": account.id, "type": type_.value, } ) .sort("_id", -1) .to_list(length=1) ) prev_code = VerificationCodeModel.from_mongo(prev_code[0]) if prev_code else None if prev_code and prev_code.datetimeInserted > datetime.now() - timedelta(minutes=1): raise HTTPException(status_code=429, detail="Too many requests") code = VerificationCodeModel( account=account, type=type_, expiresAt=datetime.now() + time_delta, ) await settings.DB_CLIENT.verificationcodes.insert_one(code.to_mongo()) return code @background_task() async def set_used_code(code: VerificationCodeModel | None): """ Set a code object as used. """ if code: await settings.DB_CLIENT.verificationcodes.update_one( {"id": code.id}, {"$set": {"status": VerificationCodeStatus.USED.value}} ) async def verify_code_obj( code_: str, types: list[VerificationCodeType], exception: bool = True, set_used: bool = True ) -> VerificationCodeModel: """ Verify a code object. """ code = ( await settings.DB_CLIENT.verificationcodes.find( {"id": code_, "type": {"$in": [t.value for t in types]}}, ) .sort("_id", -1) .to_list(length=1) ) code = VerificationCodeModel.from_mongo(code[0]) if code else None if not code and exception: error_msg = "Invalid invitation link. Please ask your manager to resend the invite." if VerificationCodeType.PASSWORD_RESET in types: error_msg = "Invalid password reset link. Please request a new one." raise HTTPException( status_code=404, detail=error_msg, ) if code and code.status == VerificationCodeStatus.USED and exception: error_map = { VerificationCodeType.ORG_INVITATION: "You already created an account. Please sign in.", VerificationCodeType.TEAM_INVITATION: "You already accepted this invitation. Please sign in.", VerificationCodeType.PASSWORD_RESET: "You already used this reset link. Please request a new one.", VerificationCodeType.ORG_CREATION: "You already created an organization. Please sign in.", } raise HTTPException(status_code=400, detail=error_map[code.type]) if code and code.expiresAt < datetime.now() and exception: error_msg = "Expired invitation link. Please ask your manager to resend the invite." if VerificationCodeType.PASSWORD_RESET in types: error_msg = "Expired password reset link. Please request a new one." raise HTTPException(status_code=410, detail=error_msg) if code and set_used: asyncio.create_task(set_used_code(code)) return code async def reset_password_obj(account: AccountShorten, password: str) -> AccountShorten: """ Reset a password object. """ password = CryptContext(schemes=["bcrypt"], deprecated="auto").hash(password) await settings.DB_CLIENT.accounts.update_one( {"id": account.id}, {"$set": {"password": password}}, ) return account async def create_google_account(user_info: dict) -> AccountModel: account = AccountModel( email=user_info["email"], name=user_info.get("name"), accountType=AccountType.USER, registrationType=RegistrationType.GOOGLE, ) await settings.DB_CLIENT.accounts.insert_one(account.to_mongo()) return account async def create_account(data: RegisterAccountRequest) -> AccountModel: account = AccountModel( email=data.email, password=data.password, name=data.name, accountType=AccountType.USER, registrationType=RegistrationType.ORGANIC, ) await settings.DB_CLIENT.accounts.insert_one(account.to_mongo()) return account