Spaces:
Sleeping
Sleeping
| from datetime import datetime, timezone | |
| from typing import Optional | |
| from bson import ObjectId | |
| from app.database import get_db | |
async def create_notification(
    user_id: str,
    notif_type: str,
    title: str,
    message: str,
    payload: Optional[dict] = None,
) -> str:
    """Insert a new unread notification and return its id as a string.

    The document is written to the ``notifications`` collection of the
    handle returned by ``get_db()``.

    Args:
        user_id: Stored under the ``user_id`` key.
        notif_type: Stored under the ``type`` key.
        title: Stored under the ``title`` key.
        message: Stored under the ``message`` key.
        payload: Optional extra data. Any falsy value (``None`` or an
            empty dict) is stored as ``{}``.

    Returns:
        ``str()`` of the ``inserted_id`` reported by the insert.
    """
    db = get_db()
    extra = payload if payload else {}
    record = {
        "user_id": user_id,
        "type": notif_type,
        "title": title,
        "message": message,
        "payload": extra,
        # Every notification starts out unread.
        "is_read": False,
        # Timezone-aware UTC timestamp taken at insert time.
        "created_at": datetime.now(timezone.utc),
    }
    outcome = await db.notifications.insert_one(record)
    new_id = outcome.inserted_id
    return str(new_id)
async def unread_count(user_id: str) -> int:
    """Return how many notifications of ``user_id`` have ``is_read`` set to False."""
    collection = get_db().notifications
    unread_filter = {"user_id": user_id, "is_read": False}
    return await collection.count_documents(unread_filter)
async def list_notifications(user_id: str, unread_only: bool = False, limit: int = 30, skip: int = 0) -> list[dict]:
    """Return one page of a user's notifications, newest first.

    Args:
        user_id: Only documents whose ``user_id`` equals this value are returned.
        unread_only: When true, additionally require ``is_read`` to be False.
        limit: Passed unchanged to the cursor's ``limit()``.
        skip: Passed unchanged to the cursor's ``skip()``.

    Returns:
        A list of plain dicts with the keys ``id``, ``user_id``, ``type``,
        ``title``, ``message``, ``payload``, ``is_read`` and ``created_at``.
        ``_id`` and ``user_id`` must be present on each stored document; the
        other fields fall back to defaults (``"general"``, ``""``, ``None``,
        ``False``) when missing.
    """
    db = get_db()

    criteria = {"user_id": user_id}
    if unread_only:
        criteria.update({"is_read": False})

    # Sort descending on creation time, then apply the paging window.
    newest_first = db.notifications.find(criteria).sort("created_at", -1)
    page = newest_first.skip(skip).limit(limit)

    return [
        {
            # The stored _id is stringified for the caller.
            "id": str(doc["_id"]),
            "user_id": doc["user_id"],
            "type": doc.get("type", "general"),
            "title": doc.get("title", ""),
            "message": doc.get("message", ""),
            "payload": doc.get("payload"),
            "is_read": doc.get("is_read", False),
            "created_at": doc.get("created_at"),
        }
        async for doc in page
    ]
async def mark_read(user_id: str, notification_ids: list[str]) -> int:
    """Set ``is_read`` to True on the given notifications owned by ``user_id``.

    Args:
        user_id: The update only touches documents whose ``user_id`` equals
            this value, so ids belonging to another user are left untouched.
        notification_ids: Notification ids as strings. Entries that are not
            valid ObjectId representations are ignored.

    Returns:
        The ``modified_count`` reported by the update, or ``0`` when no
        valid id was supplied (no update is issued in that case).
    """
    db = get_db()

    # Malformed ids are filtered with ObjectId.is_valid instead of a blanket
    # `except Exception`, so unrelated failures are no longer silently
    # swallowed while bad ids are still skipped on a best-effort basis.
    object_ids = [
        ObjectId(raw_id)
        for raw_id in notification_ids
        if ObjectId.is_valid(raw_id)
    ]
    if not object_ids:
        return 0

    result = await db.notifications.update_many(
        {"_id": {"$in": object_ids}, "user_id": user_id},
        {"$set": {"is_read": True}},
    )
    return int(result.modified_count)