| import asyncio |
| from datetime import datetime |
|
|
| from fastapi import Depends, Query, Request |
|
|
| from cbh.api.account.dto import AccountStatus, AccountType |
| from cbh.api.account.models import AccountModel, AccountShorten |
| from cbh.api.calls import calls_router |
| from cbh.api.calls.db_requests import ( |
| calculate_call_availabilities, |
| cancel_call_obj, |
| check_call_payment, |
| create_anonymous_account_obj, |
| create_call_obj, |
| filter_calls_objs, |
| ) |
| from cbh.api.calls.models import CallModel |
| from cbh.api.calls.schemas import ( |
| CallAvailabilityResponse, |
| CallsFilter, |
| DailyInitializeResponse, |
| PurchaseAnonymousCallRequest, |
| PurchaseCallRequest, |
| StripeCheckResponse, |
| StripeSessionResponse, |
| ) |
| from cbh.api.calls.services import ( |
| create_stripe_session, |
| manage_stripe_event, |
| refund_stripe_payment, |
| verify_stripe_webhook, |
| verify_discount_code, |
| ) |
| from cbh.api.calls.services.daily import create_daily_token |
| from cbh.api.calls.utils import ( |
| can_edit_call, |
| send_anonymous_booking_email, |
| send_coach_booking_email, |
| send_customer_booking_email, |
| send_cancel_customer_email, |
| send_cancel_coach_email, |
| send_session_reminder_email, |
| send_session_reminder_coach_email, |
| ) |
| from cbh.api.common.db_requests import get_obj_by_id |
| from cbh.api.common.dto import Paging |
| from cbh.api.common.schemas import AllObjectsResponse, FilterRequest |
| from cbh.core.security import PermissionDependency |
| from cbh.core.wrappers import CbhResponseWrapper |
|
|
|
|
@calls_router.post("/filter")
async def filter_calls(
    request: FilterRequest[CallsFilter],
    account: AccountModel = Depends(PermissionDependency()),
) -> CbhResponseWrapper[AllObjectsResponse[CallModel]]:
    """
    Return one page of calls matching the filter request.

    The lookup is delegated to ``filter_calls_objs``, which receives both the
    request and the authenticated account. The paging section echoes the
    requested page size and index alongside the count reported by that lookup.
    """
    matched_calls, total = await filter_calls_objs(request, account)
    page_info = Paging(
        pageSize=request.pageSize,
        pageIndex=request.pageIndex,
        totalCount=total,
    )
    payload = AllObjectsResponse(data=matched_calls, paging=page_info)
    return CbhResponseWrapper(data=payload)
|
|
|
|
@calls_router.get("/availabilities")
async def get_availabilities(
    startDate: datetime = Query(...),
    endDate: datetime = Query(...),
    discountCode: str | None = Query(None),
    _: AccountModel = Depends(PermissionDependency([AccountType.USER], required=False)),
) -> CbhResponseWrapper[CallAvailabilityResponse]:
    """
    Compute call availabilities between ``startDate`` and ``endDate``.

    The optional ``discountCode`` is first passed through
    ``verify_discount_code``; whatever that returns is forwarded to
    ``calculate_call_availabilities`` together with the date range.
    """
    verified_code = await verify_discount_code(discountCode)
    availabilities = await calculate_call_availabilities(
        startDate, endDate, verified_code
    )
    return CbhResponseWrapper(data=availabilities)
|
|
|
|
@calls_router.get("/{callId}")
async def get_call(
    callId: str,
    _: AccountModel = Depends(PermissionDependency()),
) -> CbhResponseWrapper[CallModel]:
    """
    Fetch a single call by its identifier.

    The argument name must match the ``{callId}`` placeholder in the route:
    FastAPI binds path parameters by name, and an argument with a different
    name is treated as a required query parameter instead.
    """
    # NOTE(review): unlike cancel_call / initialize_daily, no ownership filter
    # is applied here, so any authenticated account can read any call. Confirm
    # this is intended.
    call = await get_obj_by_id(CallModel, callId)
    return CbhResponseWrapper(data=call)
|
|
|
|
@calls_router.post("/{callId}/cancel")
async def cancel_call(
    callId: str,
    account: AccountModel = Depends(PermissionDependency([AccountType.USER])),
) -> CbhResponseWrapper[CallModel]:
    """
    Cancel a call booked by the authenticated user and refund it if paid.

    The call is looked up with an extra filter on ``customer.id`` so that only
    the booking customer can cancel it. When the call carries a
    ``paymentIntentId`` the Stripe payment is refunded before the call is
    cancelled. Cancellation emails to customer and coach are scheduled as
    background tasks and are not awaited.
    """
    owner_filter = {"customer.id": account.id}
    call = await get_obj_by_id(CallModel, callId, additional_filter=owner_filter)

    # Presumably raises when the call may no longer be modified; confirm in utils.
    can_edit_call(call)

    if call.paymentIntentId:
        await refund_stripe_payment(call.paymentIntentId)

    call = await cancel_call_obj(call)

    # Fire-and-forget notifications: customer first, then coach.
    for notify in (send_cancel_customer_email, send_cancel_coach_email):
        asyncio.create_task(notify(call))

    return CbhResponseWrapper(data=call)
|
|
|
|
@calls_router.post("/purchase")
async def purchase_call(
    request: PurchaseCallRequest,
    account: AccountModel = Depends(PermissionDependency([AccountType.USER])),
) -> CbhResponseWrapper[StripeSessionResponse]:
    """
    Start the purchase of a call for an authenticated user.

    The coach lookup and the discount-code verification run concurrently.
    A call object is then created for the coach and the current account, and
    a Stripe session is opened for it. The response carries the session URL.
    """
    coach_lookup = get_obj_by_id(
        AccountShorten, request.coachId, projection=AccountShorten.to_mongo_fields()
    )
    discount_lookup = verify_discount_code(request.discountCode)
    coach, promotion_code = await asyncio.gather(coach_lookup, discount_lookup)

    new_call = await create_call_obj(request, coach, account)
    checkout_url = await create_stripe_session(new_call, promotion_code)
    session = StripeSessionResponse(sessionUrl=checkout_url)
    return CbhResponseWrapper(data=session)
|
|
|
|
@calls_router.post("/purchase/anonymous")
async def purchase_call_anonymous(
    request: PurchaseAnonymousCallRequest,
) -> CbhResponseWrapper[StripeSessionResponse]:
    """
    Start the purchase of a call without an authenticated account.

    The coach lookup, the discount-code verification and the creation of an
    anonymous account for ``request.email`` run concurrently. The call is then
    created for that account and a Stripe session is opened with
    ``is_anonymous=True``. The response carries the session URL.
    """
    coach_lookup = get_obj_by_id(
        AccountShorten, request.coachId, projection=AccountShorten.to_mongo_fields()
    )
    discount_lookup = verify_discount_code(request.discountCode)
    account_creation = create_anonymous_account_obj(request.email)
    coach, promotion_code, account = await asyncio.gather(
        coach_lookup, discount_lookup, account_creation
    )

    new_call = await create_call_obj(request, coach, account)
    checkout_url = await create_stripe_session(
        new_call, promotion_code, is_anonymous=True
    )
    session = StripeSessionResponse(sessionUrl=checkout_url)
    return CbhResponseWrapper(data=session)
|
|
|
|
@calls_router.post("/stripe/callback")
async def stripe_callback(request: Request) -> CbhResponseWrapper:
    """
    Handle an incoming Stripe webhook.

    The raw request body is verified by ``verify_stripe_webhook`` and the
    resulting event is handed to ``manage_stripe_event``, which yields the
    affected call. Booking and reminder emails are then scheduled as
    background tasks (not awaited) and an empty response is returned.
    """
    raw_body = await request.body()
    stripe_event = verify_stripe_webhook(request, raw_body)
    call = await manage_stripe_event(stripe_event)

    # Customers whose account is still pending get the anonymous booking mail.
    customer_is_pending = call.customer.status == AccountStatus.PENDING
    notify_customer = (
        send_anonymous_booking_email
        if customer_is_pending
        else send_customer_booking_email
    )
    asyncio.create_task(notify_customer(call))

    # Remaining notifications, scheduled in a fixed order.
    for notify in (
        send_coach_booking_email,
        send_session_reminder_email,
        send_session_reminder_coach_email,
    ):
        asyncio.create_task(notify(call))

    return CbhResponseWrapper(data=None)
|
|
|
|
@calls_router.get("/stripe/check/{callId}")
async def check_stripe_payment(
    callId: str,
    _: AccountModel = Depends(PermissionDependency(required=False)),
) -> CbhResponseWrapper[StripeCheckResponse]:
    """
    Report the payment state of a call.

    Authentication is optional for this route. The check itself is delegated
    to ``check_call_payment`` and its result is returned as-is in the wrapper.
    """
    payment_state = await check_call_payment(callId)
    return CbhResponseWrapper(data=payment_state)
|
|
|
|
@calls_router.get("/daily/{callId}/initialize")
async def initialize_daily(
    callId: str,
    account: AccountModel = Depends(PermissionDependency()),
) -> CbhResponseWrapper[DailyInitializeResponse]:
    """
    Issue a Daily token for a call the current account takes part in.

    The call is only found when the authenticated account is either its
    customer or its coach. The response carries the token created for this
    account and the call's room URL.
    """
    # Only participants of the call (customer or coach) may join.
    participant_filter = {
        "$or": [{"customer.id": account.id}, {"coach.id": account.id}]
    }
    call = await get_obj_by_id(
        CallModel, callId, additional_filter=participant_filter
    )
    daily_token = await create_daily_token(call, account.id)
    session = DailyInitializeResponse(token=daily_token, roomUrl=call.roomUrl)
    return CbhResponseWrapper(data=session)
|
|