# Spaces: Paused  (page-header residue from the hosting page; not part of this module)
import logging
from pathlib import Path
from typing import List, Dict, Any
from urllib.parse import quote

import aiofiles
import jinja2
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig
from pydantic import EmailStr

from ..core.config import settings
logger = logging.getLogger(__name__)


class EmailService:
    """Send templated transactional emails through fastapi-mail.

    Templates are looked up by file name in the ``templates/email`` directory
    two levels above this file. Each ``send_*`` coroutine resolves to ``True``
    when the message was handed to the mail client without error and to
    ``False`` when sending failed; send failures are logged rather than raised.
    """

    def __init__(self) -> None:
        """Create the template directory, connection config and mail client."""
        template_dir = Path(__file__).parent.parent / "templates" / "email"
        # The directory must exist *before* ConnectionConfig is built:
        # fastapi-mail validates TEMPLATE_FOLDER as an existing directory and
        # rejects the configuration otherwise.
        template_dir.mkdir(parents=True, exist_ok=True)

        # NOTE(review): fastapi-mail >= 1.1 renamed MAIL_TLS / MAIL_SSL to
        # MAIL_STARTTLS / MAIL_SSL_TLS -- confirm against the pinned version.
        self.conf = ConnectionConfig(
            MAIL_USERNAME=settings.MAIL_USERNAME,
            MAIL_PASSWORD=settings.MAIL_PASSWORD,
            MAIL_FROM=settings.MAIL_FROM,
            MAIL_PORT=settings.MAIL_PORT,
            MAIL_SERVER=settings.MAIL_SERVER,
            MAIL_TLS=True,
            MAIL_SSL=False,
            TEMPLATE_FOLDER=template_dir,
        )
        self.fast_mail = FastMail(self.conf)

    @staticmethod
    def _frontend_link(path: str) -> str:
        """Join *path* onto ``settings.FRONTEND_URL`` with exactly one slash."""
        base = str(settings.FRONTEND_URL).rstrip("/")
        return f"{base}/{path.lstrip('/')}"

    async def send_email(
        self,
        email_to: List[EmailStr],
        subject: str,
        template_name: str,
        template_data: Dict[str, Any],
    ) -> bool:
        """Render ``template_name`` with ``template_data`` and send it.

        Args:
            email_to: Recipient addresses.
            subject: Subject line of the message.
            template_name: File name of the template inside the template folder.
            template_data: Variables passed to the template as its body context.

        Returns:
            ``True`` if the message was sent without raising, ``False`` if the
            recipient list is empty or building/sending the message failed.
        """
        if not email_to:
            logger.warning("Not sending %r: recipient list is empty", subject)
            return False

        try:
            message = MessageSchema(
                subject=subject,
                recipients=email_to,
                template_body=template_data,
                subtype="html",
            )
            await self.fast_mail.send_message(message, template_name=template_name)
        except Exception:
            # Deliberate best-effort boundary: callers get a boolean and must
            # not crash on mail problems, but the traceback is kept in the log.
            logger.exception(
                "Failed to send email %r using template %s", subject, template_name
            )
            return False
        return True

    async def send_order_confirmation(
        self, email: EmailStr, order_data: Dict[str, Any]
    ) -> bool:
        """Send an order confirmation email.

        Args:
            email: Recipient address.
            order_data: Mapping that must contain ``_id``, ``total_amount``,
                ``products`` and ``status``.

        Returns:
            The result of :meth:`send_email`.

        Raises:
            KeyError: If ``order_data`` lacks one of the required keys.
        """
        return await self.send_email(
            email_to=[email],
            subject="Order Confirmation",
            template_name="order_confirmation.html",
            template_data={
                "order_id": str(order_data["_id"]),
                "total_amount": order_data["total_amount"],
                "products": order_data["products"],
                "status": order_data["status"],
            },
        )

    async def send_password_reset(self, email: EmailStr, reset_token: str) -> bool:
        """Send a password reset email containing a link to the frontend.

        Args:
            email: Recipient address.
            reset_token: Token passed to the template and embedded in the
                ``token`` query parameter of the reset link.

        Returns:
            The result of :meth:`send_email`.
        """
        # Percent-encode the token so characters such as '+', '&', '=' or '#'
        # cannot corrupt the query string; URL-safe tokens pass through as-is.
        encoded_token = quote(reset_token, safe="")
        return await self.send_email(
            email_to=[email],
            subject="Password Reset Request",
            template_name="password_reset.html",
            template_data={
                "reset_token": reset_token,
                "reset_url": self._frontend_link(
                    f"reset-password?token={encoded_token}"
                ),
            },
        )

    async def send_welcome_email(self, email: EmailStr, username: str) -> bool:
        """Send a welcome email with a link to the frontend login page.

        Args:
            email: Recipient address.
            username: Name passed to the template.

        Returns:
            The result of :meth:`send_email`.
        """
        return await self.send_email(
            email_to=[email],
            subject="Welcome to Admin Dashboard",
            template_name="welcome.html",
            template_data={
                "username": username,
                "login_url": self._frontend_link("login"),
            },
        )

    async def send_low_stock_alert(
        self, email: EmailStr, product_data: Dict[str, Any]
    ) -> bool:
        """Send a low stock alert email for one product.

        Args:
            email: Recipient address.
            product_data: Mapping that must contain ``name``,
                ``inventory_count`` and ``_id``.

        Returns:
            The result of :meth:`send_email`.

        Raises:
            KeyError: If ``product_data`` lacks one of the required keys.
        """
        return await self.send_email(
            email_to=[email],
            subject="Low Stock Alert",
            template_name="low_stock_alert.html",
            template_data={
                "product_name": product_data["name"],
                "current_stock": product_data["inventory_count"],
                "product_id": str(product_data["_id"]),
            },
        )
# Module-level EmailService instance, constructed once when this module is imported.
email_service: EmailService = EmailService()