| """ |
| Database requests module for security functionality. |
| """ |
|
|
| import asyncio |
| from datetime import datetime, timedelta |
|
|
| from fastapi import HTTPException |
| from passlib.context import CryptContext |
| from pydantic import EmailStr |
| from pymongo import ReturnDocument |
|
|
| from cbh.api.account.dto import AccountType, RegistrationType |
| from cbh.api.account.models import AccountModel, AccountShorten |
| from cbh.api.security.dto import VerificationCodeStatus, VerificationCodeType |
| from cbh.api.security.models import VerificationCodeModel |
| from cbh.api.security.schemas import ( |
| LoginAccountRequest, |
| RegisterAccountRequest, |
| ) |
| from cbh.core.config import settings |
| from cbh.core.security import verify_password |
| from cbh.core.wrappers import background_task |
|
|
|
|
| async def check_unique_email(email: EmailStr | str) -> AccountModel | None: |
| """ |
| Check if a field value already exists in the database to ensure uniqueness. |
| """ |
| account = await settings.DB_CLIENT.accounts.find_one( |
| {"email": {"$regex": f"^{str(email)}$", "$options": "i"}} |
| ) |
| account = AccountModel.from_mongo(account) if account else None |
|
|
| return account |
|
|
|
|
| async def authenticate_account(data: LoginAccountRequest) -> AccountModel: |
| """ |
| Authenticate a user account using mail and password. |
| """ |
| account = await settings.DB_CLIENT.accounts.find_one( |
| {"email": {"$regex": f"^{data.email}$", "$options": "i"}} |
| ) |
| if account is None: |
| raise HTTPException(status_code=404, detail="Invalid email or password.") |
|
|
| account = AccountModel.from_mongo(account) |
| if account.registrationType != RegistrationType.ORGANIC: |
| raise HTTPException(status_code=422, detail="Please sign in with social providers.") |
|
|
| if not verify_password(data.password, account.password): |
| raise HTTPException(status_code=400, detail="Invalid email or password.") |
|
|
| return account |
|
|
|
|
| async def get_account_by_email(email: str) -> AccountModel | None: |
| """ |
| Verify if an account exists. |
| """ |
| account = await settings.DB_CLIENT.accounts.find_one( |
| {"email": {"$regex": f"^{email}$", "$options": "i"}} |
| ) |
| return AccountModel.from_mongo(account) if account else None |
|
|
|
|
| async def create_code_obj( |
| account: AccountShorten, type_: VerificationCodeType, time_delta: timedelta |
| ) -> VerificationCodeModel: |
| """ |
| Create a code object. |
| """ |
| prev_code = ( |
| await settings.DB_CLIENT.verificationcodes.find( |
| { |
| "account.id": account.id, |
| "type": type_.value, |
| } |
| ) |
| .sort("_id", -1) |
| .to_list(length=1) |
| ) |
| prev_code = VerificationCodeModel.from_mongo(prev_code[0]) if prev_code else None |
|
|
| if prev_code and prev_code.datetimeInserted > datetime.now() - timedelta(minutes=1): |
| raise HTTPException(status_code=429, detail="Too many requests") |
|
|
| code = VerificationCodeModel( |
| account=account, |
| type=type_, |
| expiresAt=datetime.now() + time_delta, |
| ) |
| await settings.DB_CLIENT.verificationcodes.insert_one(code.to_mongo()) |
| return code |
|
|
|
|
| @background_task() |
| async def set_used_code(code: VerificationCodeModel | None): |
| """ |
| Set a code object as used. |
| """ |
| if code: |
| await settings.DB_CLIENT.verificationcodes.update_one( |
| {"id": code.id}, {"$set": {"status": VerificationCodeStatus.USED.value}} |
| ) |
|
|
|
|
| async def verify_code_obj( |
| code_: str, types: list[VerificationCodeType], exception: bool = True, set_used: bool = True |
| ) -> VerificationCodeModel: |
| """ |
| Verify a code object. |
| """ |
| code = ( |
| await settings.DB_CLIENT.verificationcodes.find( |
| {"id": code_, "type": {"$in": [t.value for t in types]}}, |
| ) |
| .sort("_id", -1) |
| .to_list(length=1) |
| ) |
| code = VerificationCodeModel.from_mongo(code[0]) if code else None |
|
|
| if not code and exception: |
| error_msg = "Invalid invitation link. Please ask your manager to resend the invite." |
| if VerificationCodeType.PASSWORD_RESET in types: |
| error_msg = "Invalid password reset link. Please request a new one." |
| raise HTTPException( |
| status_code=404, |
| detail=error_msg, |
| ) |
|
|
| if code and code.status == VerificationCodeStatus.USED and exception: |
| error_map = { |
| VerificationCodeType.ORG_INVITATION: "You already created an account. Please sign in.", |
| VerificationCodeType.TEAM_INVITATION: "You already accepted this invitation. Please sign in.", |
| VerificationCodeType.PASSWORD_RESET: "You already used this reset link. Please request a new one.", |
| VerificationCodeType.ORG_CREATION: "You already created an organization. Please sign in.", |
| } |
| raise HTTPException(status_code=400, detail=error_map[code.type]) |
|
|
| if code and code.expiresAt < datetime.now() and exception: |
| error_msg = "Expired invitation link. Please ask your manager to resend the invite." |
| if VerificationCodeType.PASSWORD_RESET in types: |
| error_msg = "Expired password reset link. Please request a new one." |
| raise HTTPException(status_code=410, detail=error_msg) |
|
|
| if code and set_used: |
| asyncio.create_task(set_used_code(code)) |
| return code |
|
|
|
|
| async def reset_password_obj(account: AccountShorten, password: str) -> AccountShorten: |
| """ |
| Reset a password object. |
| """ |
| password = CryptContext(schemes=["bcrypt"], deprecated="auto").hash(password) |
| await settings.DB_CLIENT.accounts.update_one( |
| {"id": account.id}, |
| {"$set": {"password": password}}, |
| ) |
| return account |
|
|
|
|
|
|
| async def create_google_account(user_info: dict) -> AccountModel: |
| account = AccountModel( |
| email=user_info["email"], |
| name=user_info.get("name"), |
| accountType=AccountType.USER, |
| registrationType=RegistrationType.GOOGLE, |
| ) |
| await settings.DB_CLIENT.accounts.insert_one(account.to_mongo()) |
| return account |
|
|
|
|
| async def create_account(data: RegisterAccountRequest) -> AccountModel: |
| account = AccountModel( |
| email=data.email, |
| password=data.password, |
| name=data.name, |
| accountType=AccountType.USER, |
| registrationType=RegistrationType.ORGANIC, |
| ) |
| await settings.DB_CLIENT.accounts.insert_one(account.to_mongo()) |
| return account |