spark / cbh /api /calls /utils.py
brestok's picture
fix
fa2fd41
import asyncio
from datetime import datetime, timedelta, timezone
from fastapi import HTTPException
from jinja2 import Environment, FileSystemLoader, select_autoescape
from cbh.api.calls.models import CallModel
from cbh.api.calls.dto import CallStatus
from cbh.core.config import settings
def can_edit_call(call: CallModel) -> None:
"""
Check if the call can be edited.
"""
now = datetime.now(timezone.utc) if call.event.startDate.tzinfo else datetime.now()
min_edit_time = now + timedelta(hours=48)
if call.status != CallStatus.SCHEDULED or call.event.startDate < min_edit_time:
raise HTTPException(status_code=400, detail="Call cannot be edited.")
def calculate_sleep_time(call: CallModel) -> int:
"""
Calculate the sleep time for the reminder.
"""
now = datetime.now(timezone.utc) if call.event.startDate.tzinfo else datetime.now()
reminder_time = call.event.startDate - timedelta(hours=1)
sleep_time = (reminder_time - now).total_seconds()
return max(0, int(sleep_time))
async def send_customer_booking_email(call: CallModel) -> None:
"""
Send a booking email.
"""
if not call.event.isActive:
return
templates_path = settings.BASE_DIR / "cbh" / "templates" / "emails"
env = Environment(
loader=FileSystemLoader(templates_path),
autoescape=select_autoescape(["html", "xml"]),
)
template = env.get_template("bookingConfirmation.html")
template_content = template.render(
coach_name=call.coach.name,
user_name=call.customer.name,
start_date=call.event.startDate.strftime("%d %B %Y, %H:%M"),
join_link=f"{settings.Audience}/calls/{call.id}",
duration=(call.event.endDate - call.event.startDate).total_seconds() // 60,
)
await settings.EMAIL_CLIENT.send_email(
call.customer.email,
f"You're all set! Session with {call.coach.name} confirmed!",
template_content,
)
async def send_coach_booking_email(call: CallModel) -> None:
"""
Send a booking email.
"""
if not call.event.isActive:
return
templates_path = settings.BASE_DIR / "cbh" / "templates" / "emails"
env = Environment(
loader=FileSystemLoader(templates_path),
autoescape=select_autoescape(["html", "xml"]),
)
template = env.get_template("bookingConfirmationCoach.html")
template_content = template.render(
coach_name=call.coach.name,
user_name=call.customer.name,
start_date=call.event.startDate.strftime("%d %B %Y, %H:%M"),
join_link=f"{settings.Audience}/calls/{call.id}",
duration=(call.event.endDate - call.event.startDate).total_seconds() // 60,
)
await settings.EMAIL_CLIENT.send_email(
call.coach.email,
f"New session booked with {call.customer.name}!",
template_content,
)
async def send_cancel_customer_email(call: CallModel) -> None:
"""
Send a cancellation email.
"""
templates_path = settings.BASE_DIR / "cbh" / "templates" / "emails"
env = Environment(
loader=FileSystemLoader(templates_path),
autoescape=select_autoescape(["html", "xml"]),
)
template = env.get_template("sessionCancellation.html")
template_content = template.render(
coach_name=call.coach.name,
user_name=call.customer.name,
start_date=call.event.startDate.strftime("%d %B %Y, %H:%M"),
duration=(call.event.endDate - call.event.startDate).total_seconds() // 60,
)
await settings.EMAIL_CLIENT.send_email(
call.customer.email,
f"Your session with {call.coach.name} has been cancelled.",
template_content,
)
async def send_cancel_coach_email(call: CallModel) -> None:
"""
Send a cancellation email.
"""
templates_path = settings.BASE_DIR / "cbh" / "templates" / "emails"
env = Environment(
loader=FileSystemLoader(templates_path),
autoescape=select_autoescape(["html", "xml"]),
)
template = env.get_template("sessionCancellationCoach.html")
template_content = template.render(
coach_name=call.coach.name,
user_name=call.customer.name,
start_date=call.event.startDate.strftime("%d %B %Y, %H:%M"),
duration=(call.event.endDate - call.event.startDate).total_seconds() // 60,
)
await settings.EMAIL_CLIENT.send_email(
call.coach.email,
f"{call.customer.name} cancelled their session.",
template_content,
)
async def send_session_reminder_email(call: CallModel) -> None:
"""
Send a session reminder email.
"""
if not call.event.isActive:
return
sleep_time = calculate_sleep_time(call)
await asyncio.sleep(sleep_time)
templates_path = settings.BASE_DIR / "cbh" / "templates" / "emails"
env = Environment(
loader=FileSystemLoader(templates_path),
autoescape=select_autoescape(["html", "xml"]),
)
template = env.get_template("sessionReminder.html")
template_content = template.render(
coach_name=call.coach.name,
user_name=call.customer.name,
start_date=call.event.startDate.strftime("%d %B %Y, %H:%M"),
join_link=f"{settings.Audience}/calls/{call.id}",
duration=(call.event.endDate - call.event.startDate).total_seconds() // 60,
)
await settings.EMAIL_CLIENT.send_email(
call.customer.email,
f"Your session with {call.coach.name} is starting in 1 hour.",
template_content,
)
async def send_session_reminder_coach_email(call: CallModel) -> None:
"""
Send a session reminder email.
"""
if not call.event.isActive:
return
sleep_time = calculate_sleep_time(call)
await asyncio.sleep(sleep_time)
templates_path = settings.BASE_DIR / "cbh" / "templates" / "emails"
env = Environment(
loader=FileSystemLoader(templates_path),
autoescape=select_autoescape(["html", "xml"]),
)
template = env.get_template("sessionReminderCoach.html")
template_content = template.render(
coach_name=call.coach.name,
user_name=call.customer.name,
start_date=call.event.startDate.strftime("%d %B %Y, %H:%M"),
join_link=f"{settings.Audience}/calls/{call.id}",
duration=(call.event.endDate - call.event.startDate).total_seconds() // 60,
)
await settings.EMAIL_CLIENT.send_email(
call.coach.email,
f"Your session with {call.customer.name} is starting in 1 hour.",
template_content,
)
async def send_anonymous_booking_email(call: CallModel) -> None:
"""
Send a anonymous booking email.
"""
templates_path = settings.BASE_DIR / "cbh" / "templates" / "emails"
env = Environment(
loader=FileSystemLoader(templates_path),
autoescape=select_autoescape(["html", "xml"]),
)
template = env.get_template("anonymousBookingConfirmation.html")
template_content = template.render(
coach_name=call.coach.name,
start_date=call.event.startDate.strftime("%d %B %Y, %H:%M"),
join_link=f"{settings.Audience}/calls/{call.id}",
duration=(call.event.endDate - call.event.startDate).total_seconds() // 60,
registration_link=f"{settings.Audience}/signup/complete?accountId={call.customer.id}",
)
await settings.EMAIL_CLIENT.send_email(
call.customer.email,
f"You're all set! Session with {call.coach.name} confirmed!",
template_content,
)