Chat-App / backend /app /system /queue.py
openhands
feat: Add FastAPI backend + React frontend
2299bb4
"""
Message queue - buffer system for async message processing.
"""
from typing import Optional, List
from collections import deque
import asyncio
import json
from datetime import datetime
class MessageQueue:
"""
In-memory message queue with async support.
In production, use Redis for distributed processing.
"""
def __init__(self, max_size: int = 10000):
self._queue = deque()
self._max_size = max_size
self._lock = asyncio.Lock()
async def enqueue(self, message) -> bool:
"""
Add a message to the queue.
Args:
message: Message object to queue
Returns:
True if enqueued successfully
"""
async with self._lock:
if len(self._queue) >= self._max_size:
return False
# Serialize message for storage
msg_data = {
"id": message.id,
"sender_id": message.sender_id,
"receiver_id": message.receiver_id,
"group_id": message.group_id,
"content": message.content,
"type": message.type,
"created_at": message.created_at.isoformat() if message.created_at else None
}
self._queue.append(msg_data)
return True
async def dequeue(self) -> Optional[dict]:
"""
Remove and return the oldest message from queue.
Returns:
Message data dict or None if empty
"""
async with self._lock:
if not self._queue:
return None
return self._queue.popleft()
async def peek(self) -> Optional[dict]:
"""View the oldest message without removing it."""
async with self._lock:
if not self._queue:
return None
return self._queue[0]
async def get_batch(self, size: int = 100) -> List[dict]:
"""Get multiple messages at once for batch processing."""
async with self._lock:
batch = []
for _ in range(min(size, len(self._queue))):
if self._queue:
batch.append(self._queue.popleft())
return batch
async def clear(self):
"""Clear all messages from queue."""
async with self._lock:
self._queue.clear()
def size(self) -> int:
"""Get current queue size (non-async)."""
return len(self._queue)
async def is_empty(self) -> bool:
"""Check if queue is empty."""
async with self._lock:
return len(self._queue) == 0
# Global queue instance
message_queue = MessageQueue()