# MukeshKapoor25's picture
# feat: refactor email channel to use SMTP with aiosmtplib, update configuration keys
# 7a3f919
"""
Notification API endpoints.
POST /send — Send single notification
POST /send/batch — Send batch notifications
POST /list — List notifications (with projection_list support)
GET /{id} — Get notification by ID
POST /{id}/retry — Retry a failed notification
GET /channels/status — Channel configuration status (per-merchant)
"""
from fastapi import APIRouter, HTTPException, Query, status
from app.models.notification import (
SendNotificationRequest,
SendNotificationResponse,
SendBatchRequest,
SendBatchResponse,
ListNotificationsRequest,
ListNotificationsResponse,
)
from app.services.notification_service import NotificationService
from app.services.dispatcher import NotificationDispatcher
from app.core.logging import get_logger
# Module-level logger obtained from the project's logging helper, keyed by
# this module's name.
logger = get_logger(__name__)
# Router that every endpoint below registers on; all of its routes carry the
# "Notifications" tag.
router = APIRouter(tags=["Notifications"])
@router.post("/send", response_model=SendNotificationResponse)
async def send_notification(payload: SendNotificationRequest):
    """
    Queue a notification for delivery via one or more channels.

    merchant_id is required — credentials are fetched from merchant settings.

    Args:
        payload: Validated send request. Its ``merchant_id`` and
            ``template_name`` are included in the failure log entry.

    Returns:
        A ``SendNotificationResponse`` carrying the ``notification_id`` and
        ``channels`` of the document returned by
        ``NotificationService.create_and_queue``.

    Raises:
        HTTPException: Re-raised unchanged when one is raised while creating
            the notification; any other error is logged and reported as a
            500 whose detail is the stringified underlying exception.
    """
    try:
        doc = await NotificationService.create_and_queue(payload)
        return SendNotificationResponse(
            success=True,
            notification_id=doc["notification_id"],
            message="Notification queued for delivery",
            channels=doc["channels"],
        )
    except HTTPException:
        # An HTTPException is already a deliberate HTTP answer; letting the
        # generic handler below catch it would rewrite its status code to 500.
        raise
    except Exception as e:
        logger.error(
            "Send notification failed",
            exc_info=True,
            extra={
                "event": "notification_create_failure",
                "merchant_id": payload.merchant_id,
                "template_name": payload.template_name,
            },
        )
        # NOTE(review): the detail exposes the raw exception text to the
        # client; kept for backward compatibility — consider a generic message.
        raise HTTPException(
            status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
        ) from e
def _batch_item_response(outcome):
    """Convert one per-item batch result into a SendNotificationResponse.

    An ``"error"`` key in the result marks the item as failed and supplies
    the message; otherwise the item is reported as "Queued".
    """
    has_error = "error" in outcome
    return SendNotificationResponse(
        success=not has_error,
        notification_id=outcome.get("notification_id", ""),
        message=outcome.get("error", "Queued"),
        channels=outcome.get("channels", []),
    )


@router.post("/send/batch", response_model=SendBatchResponse)
async def send_batch(payload: SendBatchRequest):
    """
    Queue several notifications in a single request.

    The overall ``success`` flag is true only when no item failed; the
    response also lists one entry per result returned by the service.
    """
    notifications = payload.notifications
    queued, failed, results = await NotificationService.create_and_queue_batch(
        notifications
    )

    all_queued = failed == 0
    total = len(notifications)
    item_responses = [_batch_item_response(outcome) for outcome in results]

    return SendBatchResponse(
        success=all_queued,
        total=total,
        queued=queued,
        failed=failed,
        results=item_responses,
    )
@router.post("/list", response_model=ListNotificationsResponse)
async def list_notifications(payload: ListNotificationsRequest):
    """
    Return a page of notifications matching the request's filters.

    Implements the mandatory POST /list pattern: ``filters``, ``skip``,
    ``limit`` and ``projection_list`` (MongoDB projection) are forwarded to
    the service, and the paging values are echoed back next to the total.
    """
    skip, limit = payload.skip, payload.limit

    docs, total = await NotificationService.list_notifications(
        filters=payload.filters,
        skip=skip,
        limit=limit,
        projection_list=payload.projection_list,
    )

    return ListNotificationsResponse(
        success=True, total=total, skip=skip, limit=limit, data=docs
    )
@router.get("/{notification_id}")
async def get_notification(notification_id: str):
    """
    Fetch one notification by its ID.

    Responds with 404 when the service returns nothing (or an empty value)
    for the given ID.
    """
    found = await NotificationService.get_notification(notification_id)
    if found:
        return {"success": True, "data": found}

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Notification not found",
    )
@router.post("/{notification_id}/retry")
async def retry_notification(notification_id: str):
    """
    Put a failed notification back on the queue.

    Responds with 400 when the service returns nothing, which covers both an
    unknown ID and a notification that is not in the failed state.
    """
    requeued = await NotificationService.retry_notification(notification_id)
    if requeued:
        return {
            "success": True,
            "message": "Notification re-queued",
            "data": requeued,
        }

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Notification not found or not in failed state",
    )
@router.get("/channels/status")
async def channel_status(
    merchant_id: str = Query(..., description="Merchant ID to check channel availability"),
):
    """
    Report which notification channels are configured for one merchant.

    merchant_id is a required query parameter because credentials are stored
    per merchant.
    """
    return {
        "success": True,
        "merchant_id": merchant_id,
        "channels": await NotificationDispatcher.get_channel_status(merchant_id),
    }