File size: 2,754 Bytes
2299bb4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""
Message queue - buffer system for async message processing.
"""
from typing import Optional, List
from collections import deque
import asyncio
import json
from datetime import datetime


class MessageQueue:
    """
    In-memory FIFO message queue with async support.

    Each message is stored as a plain dict built from the message's
    attributes at enqueue time. All mutating operations are serialized
    through a single asyncio.Lock.
    In production, use Redis for distributed processing.
    """

    def __init__(self, max_size: int = 10000):
        """
        Create an empty queue.

        Args:
            max_size: Maximum number of messages held at once; enqueue()
                returns False once this many messages are queued.
        """
        self._queue = deque()
        self._max_size = max_size
        self._lock = asyncio.Lock()

    async def enqueue(self, message) -> bool:
        """
        Add a message to the back of the queue.

        Args:
            message: Object exposing the attributes id, sender_id,
                receiver_id, group_id, content, type and created_at
                (created_at must offer isoformat() when it is truthy).

        Returns:
            True if the message was stored, False if the queue is full.
        """
        async with self._lock:
            if len(self._queue) >= self._max_size:
                return False

            # Store a dict of the fields rather than the message object itself.
            created_at = message.created_at
            self._queue.append({
                "id": message.id,
                "sender_id": message.sender_id,
                "receiver_id": message.receiver_id,
                "group_id": message.group_id,
                "content": message.content,
                "type": message.type,
                "created_at": created_at.isoformat() if created_at else None
            })
            return True

    async def dequeue(self) -> Optional[dict]:
        """
        Remove and return the oldest message from the queue.

        Returns:
            Message data dict, or None if the queue is empty.
        """
        async with self._lock:
            if not self._queue:
                return None
            return self._queue.popleft()

    async def peek(self) -> Optional[dict]:
        """
        View the oldest message without removing it.

        Returns:
            A shallow copy of the oldest message dict, or None if the queue
            is empty. A copy is returned so that callers modifying the
            result cannot alter the entry that is still queued.
        """
        async with self._lock:
            if not self._queue:
                return None
            return dict(self._queue[0])

    async def get_batch(self, size: int = 100) -> List[dict]:
        """
        Remove and return up to `size` messages, oldest first.

        Args:
            size: Maximum number of messages to take. A value of zero or
                less yields an empty list.

        Returns:
            List of message data dicts (possibly empty).
        """
        async with self._lock:
            # The lock is held for the whole drain, so the length computed
            # here cannot change while we pop.
            count = min(size, len(self._queue))
            return [self._queue.popleft() for _ in range(count)]

    async def clear(self):
        """Remove all messages from the queue."""
        async with self._lock:
            self._queue.clear()

    def size(self) -> int:
        """Get current queue size (non-async; reads without taking the lock)."""
        return len(self._queue)

    async def is_empty(self) -> bool:
        """Check whether the queue currently holds no messages."""
        async with self._lock:
            return len(self._queue) == 0


# Global queue instance: created once when this module is imported, using
# MessageQueue's default max_size.
message_queue: MessageQueue = MessageQueue()