staffily / cbh /api /security /db_requests.py
brestok's picture
init
fa152ae
"""
Database requests module for security functionality.
"""
import asyncio
from datetime import datetime, timedelta
from fastapi import HTTPException
from passlib.context import CryptContext
from pydantic import EmailStr
from pymongo import ReturnDocument
from cbh.api.account.dto import AccountType, RegistrationType
from cbh.api.account.models import AccountModel, AccountShorten
from cbh.api.security.dto import VerificationCodeStatus, VerificationCodeType
from cbh.api.security.models import VerificationCodeModel
from cbh.api.security.schemas import (
LoginAccountRequest,
RegisterAccountRequest,
)
from cbh.core.config import settings
from cbh.core.security import verify_password
from cbh.core.wrappers import background_task
async def check_unique_email(email: EmailStr | str) -> AccountModel | None:
"""
Check if a field value already exists in the database to ensure uniqueness.
"""
account = await settings.DB_CLIENT.accounts.find_one(
{"email": {"$regex": f"^{str(email)}$", "$options": "i"}}
)
account = AccountModel.from_mongo(account) if account else None
return account
async def authenticate_account(data: LoginAccountRequest) -> AccountModel:
"""
Authenticate a user account using mail and password.
"""
account = await settings.DB_CLIENT.accounts.find_one(
{"email": {"$regex": f"^{data.email}$", "$options": "i"}}
)
if account is None:
raise HTTPException(status_code=404, detail="Invalid email or password.")
account = AccountModel.from_mongo(account)
if account.registrationType != RegistrationType.ORGANIC:
raise HTTPException(status_code=422, detail="Please sign in with social providers.")
if not verify_password(data.password, account.password):
raise HTTPException(status_code=400, detail="Invalid email or password.")
return account
async def get_account_by_email(email: str) -> AccountModel | None:
"""
Verify if an account exists.
"""
account = await settings.DB_CLIENT.accounts.find_one(
{"email": {"$regex": f"^{email}$", "$options": "i"}}
)
return AccountModel.from_mongo(account) if account else None
async def create_code_obj(
account: AccountShorten, type_: VerificationCodeType, time_delta: timedelta
) -> VerificationCodeModel:
"""
Create a code object.
"""
prev_code = (
await settings.DB_CLIENT.verificationcodes.find(
{
"account.id": account.id,
"type": type_.value,
}
)
.sort("_id", -1)
.to_list(length=1)
)
prev_code = VerificationCodeModel.from_mongo(prev_code[0]) if prev_code else None
if prev_code and prev_code.datetimeInserted > datetime.now() - timedelta(minutes=1):
raise HTTPException(status_code=429, detail="Too many requests")
code = VerificationCodeModel(
account=account,
type=type_,
expiresAt=datetime.now() + time_delta,
)
await settings.DB_CLIENT.verificationcodes.insert_one(code.to_mongo())
return code
@background_task()
async def set_used_code(code: VerificationCodeModel | None):
"""
Set a code object as used.
"""
if code:
await settings.DB_CLIENT.verificationcodes.update_one(
{"id": code.id}, {"$set": {"status": VerificationCodeStatus.USED.value}}
)
async def verify_code_obj(
code_: str, types: list[VerificationCodeType], exception: bool = True, set_used: bool = True
) -> VerificationCodeModel:
"""
Verify a code object.
"""
code = (
await settings.DB_CLIENT.verificationcodes.find(
{"id": code_, "type": {"$in": [t.value for t in types]}},
)
.sort("_id", -1)
.to_list(length=1)
)
code = VerificationCodeModel.from_mongo(code[0]) if code else None
if not code and exception:
error_msg = "Invalid invitation link. Please ask your manager to resend the invite."
if VerificationCodeType.PASSWORD_RESET in types:
error_msg = "Invalid password reset link. Please request a new one."
raise HTTPException(
status_code=404,
detail=error_msg,
)
if code and code.status == VerificationCodeStatus.USED and exception:
error_map = {
VerificationCodeType.ORG_INVITATION: "You already created an account. Please sign in.",
VerificationCodeType.TEAM_INVITATION: "You already accepted this invitation. Please sign in.",
VerificationCodeType.PASSWORD_RESET: "You already used this reset link. Please request a new one.",
VerificationCodeType.ORG_CREATION: "You already created an organization. Please sign in.",
}
raise HTTPException(status_code=400, detail=error_map[code.type])
if code and code.expiresAt < datetime.now() and exception:
error_msg = "Expired invitation link. Please ask your manager to resend the invite."
if VerificationCodeType.PASSWORD_RESET in types:
error_msg = "Expired password reset link. Please request a new one."
raise HTTPException(status_code=410, detail=error_msg)
if code and set_used:
asyncio.create_task(set_used_code(code))
return code
async def reset_password_obj(account: AccountShorten, password: str) -> AccountShorten:
"""
Reset a password object.
"""
password = CryptContext(schemes=["bcrypt"], deprecated="auto").hash(password)
await settings.DB_CLIENT.accounts.update_one(
{"id": account.id},
{"$set": {"password": password}},
)
return account
async def create_google_account(user_info: dict) -> AccountModel:
account = AccountModel(
email=user_info["email"],
name=user_info.get("name"),
accountType=AccountType.USER,
registrationType=RegistrationType.GOOGLE,
)
await settings.DB_CLIENT.accounts.insert_one(account.to_mongo())
return account
async def create_account(data: RegisterAccountRequest) -> AccountModel:
account = AccountModel(
email=data.email,
password=data.password,
name=data.name,
accountType=AccountType.USER,
registrationType=RegistrationType.ORGANIC,
)
await settings.DB_CLIENT.accounts.insert_one(account.to_mongo())
return account