Desk-Back2 / app /services /websocket.py
Fred808's picture
Upload 32 files
8dafdf7 verified
from typing import List, Dict, Any
from fastapi import WebSocket
from datetime import datetime
from ..db.database import db
from ..utils.cache import cache
from ..db.models import Notification
# Store active WebSocket connections
active_connections: List[WebSocket] = []
async def connect(websocket: WebSocket):
"""Accept a new WebSocket connection"""
await websocket.accept()
active_connections.append(websocket)
async def disconnect(websocket: WebSocket):
"""Remove a WebSocket connection"""
if websocket in active_connections:
active_connections.remove(websocket)
async def broadcast_message(message: dict):
"""Broadcast a message to all connected clients"""
disconnected = []
for connection in active_connections:
try:
await connection.send_json(message)
except:
disconnected.append(connection)
# Clean up disconnected clients
for connection in disconnected:
await disconnect(connection)
async def create_and_broadcast_notification(
user_id: str,
title: str,
message: str,
notification_type: str,
data: Dict[str, Any] = None
) -> Dict[str, Any]:
"""Create and broadcast a notification"""
async with db.session() as session:
# Create notification
notification = Notification(
user_id=user_id,
title=title,
message=message,
type=notification_type,
data=data,
created_at=datetime.utcnow(),
read=False
)
# Store in database
session.add(notification)
await session.commit()
await session.refresh(notification)
# Convert to dict for broadcasting
notification_dict = {
"id": str(notification.id),
"user_id": notification.user_id,
"title": notification.title,
"message": notification.message,
"type": notification.type,
"data": notification.data,
"created_at": notification.created_at.isoformat(),
"read": notification.read
}
# Broadcast to connected clients
await broadcast_message({
"type": "notification",
"data": notification_dict
})
# Clear user's notification cache
await cache.delete_cache(f"user_notifications:{user_id}")
return notification_dict