Desk-Back2 / app /services /notifications.py
Fred808's picture
Upload 32 files
8dafdf7 verified
from typing import Dict, Any
from datetime import datetime
from ..db.database import db
from ..core.config import settings
from ..utils.cache import cache
from ..services.websocket import create_and_broadcast_notification
class NotificationService:
async def create_notification(
self,
user_id: str,
title: str,
message: str,
notification_type: str,
data: Dict[str, Any] = None
):
"""Create and store a notification"""
return await create_and_broadcast_notification(
user_id=user_id,
title=title,
message=message,
notification_type=notification_type,
data=data
)
async def get_user_notifications(
self,
user_id: str,
skip: int = 0,
limit: int = 50,
unread_only: bool = False
):
"""Get notifications for a user"""
cache_key = f"user_notifications:{user_id}"
if not unread_only:
cached = await cache.get_cache(cache_key)
if cached:
return cached
query = {"user_id": user_id}
if unread_only:
query["read"] = False
cursor = db.db["notifications"].find(query)\
.sort("created_at", -1)\
.skip(skip)\
.limit(limit)
notifications = await cursor.to_list(length=limit)
if not unread_only:
await cache.set_cache(cache_key, notifications, expire=300) # Cache for 5 minutes
return notifications
async def mark_as_read(self, notification_id: str, user_id: str):
"""Mark a notification as read"""
result = await db.db["notifications"].update_one(
{"_id": notification_id, "user_id": user_id},
{"$set": {"read": True}}
)
if result.modified_count > 0:
await cache.delete_cache(f"user_notifications:{user_id}")
return True
return False
async def mark_all_as_read(self, user_id: str):
"""Mark all notifications as read for a user"""
result = await db.db["notifications"].update_many(
{"user_id": user_id, "read": False},
{"$set": {"read": True}}
)
await cache.delete_cache(f"user_notifications:{user_id}")
return result.modified_count
async def delete_notification(self, notification_id: str, user_id: str):
"""Delete a notification"""
result = await db.db["notifications"].delete_one(
{"_id": notification_id, "user_id": user_id}
)
if result.deleted_count > 0:
await cache.delete_cache(f"user_notifications:{user_id}")
return True
return False
notifications = NotificationService()