# Admin-Desk / app/services/websocket.py
# Commit e472a99 (verified) by Fred808: "Update app/services/websocket.py"
from datetime import datetime
from typing import Any, Dict, List, Optional

from fastapi import WebSocket

from ..db.database import db
from ..utils.cache import cache
from ..db.models import Notification
# Module-level registry of accepted WebSocket connections: connect() appends to
# it, disconnect() removes from it, and broadcast_message() iterates over it.
# It is a plain in-process list, so it only tracks clients of this process.
active_connections: List[WebSocket] = []
async def connect(websocket: WebSocket):
    """Complete the WebSocket handshake and start tracking the client.

    The socket is appended to ``active_connections`` only after ``accept()``
    has returned, so a handshake that raises never leaves an entry behind.
    """
    await websocket.accept()
    # Registered last: from here on the client receives broadcasts.
    active_connections.append(websocket)
async def disconnect(websocket: WebSocket):
    """Stop tracking *websocket*; a no-op when it is not in the registry."""
    try:
        active_connections.remove(websocket)
    except ValueError:
        # Not (or no longer) registered -- nothing to remove.
        pass
async def broadcast_message(message: dict):
    """Send *message* as JSON to every currently connected client.

    Delivery is best-effort: any ``Exception`` raised by ``send_json`` for a
    client marks that client as gone and it is removed from
    ``active_connections``; the failure itself is not propagated.

    Args:
        message: Payload handed to each connection's ``send_json``.
    """
    stale = []
    # Iterate over a snapshot: every ``await`` below yields to the event loop,
    # where connect()/disconnect() may mutate the live list and would make a
    # direct iteration skip clients.
    for connection in list(active_connections):
        try:
            await connection.send_json(message)
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that task
            # cancellation (asyncio.CancelledError), KeyboardInterrupt and
            # SystemExit keep propagating instead of being mistaken for a
            # dead client.
            stale.append(connection)

    # Drop the clients whose send failed.
    for connection in stale:
        await disconnect(connection)
async def create_and_broadcast_notification(
    user_id: str,
    title: str,
    message: str,
    notification_type: str,
    data: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Persist a notification, invalidate the user's cache, and broadcast it.

    Args:
        user_id: Owner of the notification; also used to build the cache key.
        title: Notification title.
        message: Notification body text.
        notification_type: Stored in the model's ``type`` field.
        data: Optional extra payload stored with the notification.

    Returns:
        The stored notification as a plain dict (``id`` stringified,
        ``created_at`` in ISO-8601 form) -- the same dict that is broadcast.
    """
    async with db.session() as session:
        notification = Notification(
            user_id=user_id,
            title=title,
            message=message,
            type=notification_type,
            data=data,
            # NOTE(review): utcnow() yields a naive datetime and is deprecated
            # since Python 3.12. Left unchanged because switching to an aware
            # value could alter what is stored -- confirm the column type
            # before migrating to datetime.now(timezone.utc).
            created_at=datetime.utcnow(),
            read=False,
        )

        session.add(notification)
        await session.commit()
        # Reload the instance after the commit so the attributes read below
        # are populated.
        await session.refresh(notification)

        # Copy everything into a plain dict while the session is still open;
        # nothing after this block touches the ORM object.
        notification_dict = {
            "id": str(notification.id),
            "user_id": notification.user_id,
            "title": notification.title,
            "message": notification.message,
            "type": notification.type,
            "data": notification.data,
            "created_at": notification.created_at.isoformat(),
            "read": notification.read,
        }

    # Invalidate the cached list *before* pushing: a client that refetches its
    # notifications in response to the push must not be served the stale
    # cached copy.
    await cache.delete_cache(f"user_notifications:{user_id}")

    # NOTE(review): broadcast_message() sends to every connected client, not
    # only to ``user_id`` -- confirm that this is intended.
    await broadcast_message({
        "type": "notification",
        "data": notification_dict,
    })

    return notification_dict