Admin-Desk2 / app /utils /email.py
Fred808's picture
Upload 86 files
b70ff07 verified
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig
from pydantic import EmailStr
from typing import List, Dict, Any
from ..core.config import settings
from pathlib import Path
import aiofiles
import jinja2
class EmailService:
def __init__(self):
self.conf = ConnectionConfig(
MAIL_USERNAME=settings.MAIL_USERNAME,
MAIL_PASSWORD=settings.MAIL_PASSWORD,
MAIL_FROM=settings.MAIL_FROM,
MAIL_PORT=settings.MAIL_PORT,
MAIL_SERVER=settings.MAIL_SERVER,
MAIL_TLS=True,
MAIL_SSL=False,
TEMPLATE_FOLDER=Path(__file__).parent.parent / 'templates' / 'email'
)
self.fast_mail = FastMail(self.conf)
# Create templates directory if it doesn't exist
template_dir = Path(__file__).parent.parent / 'templates' / 'email'
template_dir.mkdir(parents=True, exist_ok=True)
async def send_email(
self,
email_to: List[EmailStr],
subject: str,
template_name: str,
template_data: Dict[str, Any]
):
"""Send an email using a template"""
try:
message = MessageSchema(
subject=subject,
recipients=email_to,
template_body=template_data,
subtype="html"
)
await self.fast_mail.send_message(
message,
template_name=template_name
)
return True
except Exception as e:
print(f"Failed to send email: {str(e)}")
return False
async def send_order_confirmation(self, email: EmailStr, order_data: Dict[str, Any]):
"""Send order confirmation email"""
return await self.send_email(
email_to=[email],
subject="Order Confirmation",
template_name="order_confirmation.html",
template_data={
"order_id": str(order_data["_id"]),
"total_amount": order_data["total_amount"],
"products": order_data["products"],
"status": order_data["status"]
}
)
async def send_password_reset(self, email: EmailStr, reset_token: str):
"""Send password reset email"""
return await self.send_email(
email_to=[email],
subject="Password Reset Request",
template_name="password_reset.html",
template_data={
"reset_token": reset_token,
"reset_url": f"{settings.FRONTEND_URL}/reset-password?token={reset_token}"
}
)
async def send_welcome_email(self, email: EmailStr, username: str):
"""Send welcome email to new users"""
return await self.send_email(
email_to=[email],
subject="Welcome to Admin Dashboard",
template_name="welcome.html",
template_data={
"username": username,
"login_url": f"{settings.FRONTEND_URL}/login"
}
)
async def send_low_stock_alert(self, email: EmailStr, product_data: Dict[str, Any]):
"""Send low stock alert to admins"""
return await self.send_email(
email_to=[email],
subject="Low Stock Alert",
template_name="low_stock_alert.html",
template_data={
"product_name": product_data["name"],
"current_stock": product_data["inventory_count"],
"product_id": str(product_data["_id"])
}
)
email_service = EmailService()