File size: 6,710 Bytes
5669b22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import asyncio
from typing import Dict, Optional, Deque, Any, Callable
from collections import deque
from loguru import logger


class ProxyMessageQueue:
    """

    Manages message queuing and consumption for the proxy handler.

    Implements a producer-consumer pattern with conversation state awareness.

    """

    def __init__(self):
        """Initialize the message queue manager"""
        self.message_queue: Deque[Dict] = deque()
        self._conversation_active = False
        self.lock = asyncio.Lock()
        self._consumer_task = None
        self._forward_func = None
        self._running = False

    def initialize(self, forward_func: Callable[[Dict, Optional[str]], Any]):
        """

        Initialize the queue with a message forwarding function.



        Args:

            forward_func: Function to call when forwarding messages to the server

        """
        self._forward_func = forward_func
        logger.debug("Message queue initialized with forward function")

    def queue_message(self, message: Dict, sender_id: Optional[str] = None) -> None:
        """

        Add a message to the queue.



        Args:

            message: The message to queue

            sender_id: Optional ID of the client that sent the message

        """
        # Store the message along with its sender ID
        queue_item = {"message": message, "sender_id": sender_id}
        logger.info(
            f"Queuing message: {message.get('text', '')} (active conversation: {self._conversation_active})"
        )
        self.message_queue.append(queue_item)

        # Start consumer if needed
        self._ensure_consumer_running()

    @property
    def conversation_active(self) -> bool:
        """Get the conversation active state"""
        return self._conversation_active

    @conversation_active.setter
    def conversation_active(self, active: bool) -> None:
        """

        Set the conversation active state.



        Args:

            active: True if a conversation is active, False otherwise

        """
        if self._conversation_active != active:
            logger.debug(f"Setting conversation active state to: {active}")
            self._conversation_active = active

            # If conversation becomes inactive, make sure consumer is running to process any queued messages
            if not active and self.has_pending_messages():
                self._ensure_consumer_running()

    def has_pending_messages(self) -> bool:
        """

        Check if there are pending messages in the queue.



        Returns:

            bool: True if there are messages to process, False otherwise

        """
        return len(self.message_queue) > 0

    def _ensure_consumer_running(self):
        """Ensure the consumer task is running if needed"""
        if not self._forward_func:
            logger.warning("Cannot start consumer: no forward function provided")
            return

        if self._consumer_task is None or self._consumer_task.done():
            self._running = True
            self._consumer_task = asyncio.create_task(self._consume_loop())
            logger.debug("Started message consumer task")

    async def _consume_loop(self):
        """Background task that consumes messages based on conversation state"""
        try:
            while self._running:
                # Wait a short time to prevent CPU hogging
                await asyncio.sleep(0.1)

                # Try to consume a message if appropriate
                async with self.lock:
                    if not self._conversation_active and self.has_pending_messages():
                        # Get next message
                        queue_item = self.message_queue.popleft()
                        message = queue_item["message"]
                        sender_id = queue_item["sender_id"]

                        logger.info(
                            f"Consumer processing message: {message.get('text', '')}"
                        )

                        # Set active before forwarding to prevent race conditions
                        self._conversation_active = True

                        # Forward the message outside the lock to prevent deadlocks
                        asyncio.create_task(self._forward_message(message, sender_id))

                # If queue is empty and we've been idle for a while, we can pause the consumer
                if not self.has_pending_messages() and not self._conversation_active:
                    await asyncio.sleep(1)  # Wait a bit longer before deciding to stop
                    if (
                        not self.has_pending_messages()
                        and not self._conversation_active
                    ):
                        logger.debug(
                            "No more pending messages and no active conversation, pausing consumer"
                        )
                        break

        except Exception as e:
            logger.error(f"Error in message consumer loop: {e}")
        finally:
            self._running = False
            logger.debug("Message consumer task ended")

    async def _forward_message(self, message: Dict, sender_id: Optional[str] = None):
        """Forward a message using the provided forward function"""
        try:
            if self._forward_func:
                # If this is a text input, send transcription first
                if message.get("type") == "text-input":
                    # Create transcription message
                    transcription_message = message.copy()
                    transcription_message["type"] = "user-input-transcription"
                    # Forward transcription message
                    await self._forward_func(transcription_message, sender_id)

                # Forward the original message
                await self._forward_func(message, sender_id)
            else:
                logger.warning("No forward function available to process message")
        except Exception as e:
            logger.error(f"Error forwarding message: {e}")
            # If forwarding fails, mark conversation as inactive to allow next message
            self._conversation_active = False

    def stop(self):
        """Stop the consumer task"""
        self._running = False
        if self._consumer_task and not self._consumer_task.done():
            self._consumer_task.cancel()

    def clear(self):
        """Clear all pending messages"""
        self.message_queue.clear()
        logger.info("Message queue cleared")