Spaces:
Paused
Paused
File size: 3,059 Bytes
b70ff07 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
from typing import List, Dict, Any, Optional
from ..core.dependencies import get_current_active_user
from ..db.database import get_db
from ..db.models import Notification, User
from ..db.schemas import NotificationCreate, NotificationInDB
router = APIRouter()
@router.get("/", response_model=List[NotificationInDB])
async def get_notifications(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=100),
unread_only: bool = False,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
) -> List[NotificationInDB]:
"""Get user's notifications"""
query = select(Notification).where(Notification.user_id == current_user.id)
if unread_only:
query = query.where(Notification.read == False)
query = query.order_by(Notification.created_at.desc()).offset(skip).limit(limit)
result = await db.execute(query)
return result.scalars().all()
@router.post("/mark-read/{notification_id}", response_model=NotificationInDB)
async def mark_notification_read(
notification_id: int,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
) -> NotificationInDB:
"""Mark a notification as read"""
stmt = select(Notification).where(
Notification.id == notification_id,
Notification.user_id == current_user.id
)
result = await db.execute(stmt)
notification = result.scalar_one_or_none()
if not notification:
raise HTTPException(status_code=404, detail="Notification not found")
notification.read = True
await db.commit()
await db.refresh(notification)
return notification
@router.post("/mark-all-read")
async def mark_all_notifications_read(
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
) -> Dict[str, int]:
"""Mark all notifications as read"""
stmt = update(Notification).where(
Notification.user_id == current_user.id,
Notification.read == False
).values(read=True)
result = await db.execute(stmt)
await db.commit()
return {"marked_count": result.rowcount}
@router.delete("/{notification_id}")
async def delete_notification(
notification_id: int,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db)
) -> Dict[str, bool]:
"""Delete a notification"""
stmt = select(Notification).where(
Notification.id == notification_id,
Notification.user_id == current_user.id
)
result = await db.execute(stmt)
notification = result.scalar_one_or_none()
if not notification:
raise HTTPException(status_code=404, detail="Notification not found")
await db.delete(notification)
await db.commit()
return {"success": True} |