Spaces:
Paused
Paused
| from fastapi import APIRouter, Depends, HTTPException, Query | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select, update | |
| from typing import List, Dict, Any, Optional | |
| from ..core.dependencies import get_current_active_user | |
| from ..db.database import get_db | |
| from ..db.models import Notification, User | |
| from ..db.schemas import NotificationCreate, NotificationInDB | |
# Module-level APIRouter instance.
# NOTE(review): no `@router.<method>(...)` decorators are visible on the
# handlers in this view -- confirm how the handlers are registered.
| router = APIRouter() | |
async def get_notifications(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=100),
    unread_only: bool = False,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
) -> List[NotificationInDB]:
    """Return one page of the current user's notifications, newest first.

    Args:
        skip: Number of matching rows to skip (validated as >= 0).
        limit: Maximum number of rows to return (validated as 1..100,
            default 50).
        unread_only: When true, only rows whose ``read`` column is false
            are returned.
        current_user: User resolved by ``get_current_active_user``; only
            rows whose ``user_id`` equals this user's id are selected.
        db: Async session resolved by ``get_db``.

    Returns:
        The matching ``Notification`` rows ordered by ``created_at``
        descending, with ``id`` descending as a tie-breaker.
    """
    query = select(Notification).where(Notification.user_id == current_user.id)
    if unread_only:
        # This builds a SQL comparison, not a Python truth test, so the
        # `== False` spelling is intentional.
        query = query.where(Notification.read == False)  # noqa: E712

    # OFFSET/LIMIT paging is only stable over a total order. `created_at`
    # alone can tie, which lets rows repeat or vanish between pages, so `id`
    # is added as a secondary key.
    # NOTE(review): assumes Notification.id is unique (presumably the
    # primary key) -- confirm against the model.
    query = (
        query.order_by(Notification.created_at.desc(), Notification.id.desc())
        .offset(skip)
        .limit(limit)
    )

    result = await db.execute(query)
    return result.scalars().all()
async def mark_notification_read(
    notification_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
) -> NotificationInDB:
    """Flag a single notification of the current user as read.

    Args:
        notification_id: Id of the notification to update.
        current_user: User resolved by ``get_current_active_user``; the row
            must have this user's id as its ``user_id``.
        db: Async session resolved by ``get_db``.

    Returns:
        The notification, refreshed from the session after the commit.

    Raises:
        HTTPException: 404 when no row matches both the id and the user.
    """
    # Filtering on the owner as well as the id means a notification that
    # belongs to someone else is reported as "not found".
    owned_by_caller = (
        Notification.id == notification_id,
        Notification.user_id == current_user.id,
    )
    lookup = await db.execute(select(Notification).where(*owned_by_caller))
    target = lookup.scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="Notification not found")

    target.read = True
    await db.commit()
    # Reload the instance's attributes after the commit before returning it.
    await db.refresh(target)
    return target
async def mark_all_notifications_read(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
) -> Dict[str, int]:
    """Flag every unread notification of the current user as read.

    Args:
        current_user: User resolved by ``get_current_active_user``; only
            rows whose ``user_id`` equals this user's id are updated.
        db: Async session resolved by ``get_db``.

    Returns:
        A dict with a single key, ``"marked_count"``, holding the
        ``rowcount`` reported for the UPDATE statement.
    """
    # Restricting the UPDATE to unread rows keeps already-read rows out of
    # the statement (and out of the reported count).
    unread_for_caller = (
        Notification.user_id == current_user.id,
        Notification.read == False,  # noqa: E712 -- SQL expression, not a truth test
    )
    bulk_update = update(Notification).where(*unread_for_caller).values(read=True)

    outcome = await db.execute(bulk_update)
    await db.commit()
    return {"marked_count": outcome.rowcount}
async def delete_notification(
    notification_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
) -> Dict[str, bool]:
    """Remove a single notification belonging to the current user.

    Args:
        notification_id: Id of the notification to delete.
        current_user: User resolved by ``get_current_active_user``; the row
            must have this user's id as its ``user_id``.
        db: Async session resolved by ``get_db``.

    Returns:
        ``{"success": True}`` once the deletion has been committed.

    Raises:
        HTTPException: 404 when no row matches both the id and the user.
    """
    # The owner filter makes another user's notification indistinguishable
    # from a missing one.
    lookup_stmt = select(Notification)
    lookup_stmt = lookup_stmt.where(Notification.id == notification_id)
    lookup_stmt = lookup_stmt.where(Notification.user_id == current_user.id)

    doomed = (await db.execute(lookup_stmt)).scalar_one_or_none()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Notification not found")

    await db.delete(doomed)
    await db.commit()
    return {"success": True}