accujuris-api / app /services /notification_service.py
arnavam's picture
Initial HuggingFace deployment
cd3078d
from datetime import datetime, timezone
from typing import Optional
from bson import ObjectId
from app.database import get_db
async def create_notification(
user_id: str,
notif_type: str,
title: str,
message: str,
payload: Optional[dict] = None,
) -> str:
db = get_db()
doc = {
"user_id": user_id,
"type": notif_type,
"title": title,
"message": message,
"payload": payload or {},
"is_read": False,
"created_at": datetime.now(timezone.utc),
}
result = await db.notifications.insert_one(doc)
return str(result.inserted_id)
async def unread_count(user_id: str) -> int:
db = get_db()
return await db.notifications.count_documents({"user_id": user_id, "is_read": False})
async def list_notifications(user_id: str, unread_only: bool = False, limit: int = 30, skip: int = 0) -> list[dict]:
db = get_db()
query = {"user_id": user_id}
if unread_only:
query["is_read"] = False
cursor = db.notifications.find(query).sort("created_at", -1).skip(skip).limit(limit)
items = []
async for n in cursor:
items.append(
{
"id": str(n["_id"]),
"user_id": n["user_id"],
"type": n.get("type", "general"),
"title": n.get("title", ""),
"message": n.get("message", ""),
"payload": n.get("payload"),
"is_read": n.get("is_read", False),
"created_at": n.get("created_at"),
}
)
return items
async def mark_read(user_id: str, notification_ids: list[str]) -> int:
db = get_db()
object_ids = []
for item in notification_ids:
try:
object_ids.append(ObjectId(item))
except Exception:
continue
if not object_ids:
return 0
result = await db.notifications.update_many(
{"_id": {"$in": object_ids}, "user_id": user_id},
{"$set": {"is_read": True}},
)
return int(result.modified_count)