test_ui / src /open_llm_vtuber /proxy_message_queue.py
britto224's picture
Upload 130 files
5669b22 verified
import asyncio
import inspect
from collections import deque
from typing import Dict, Optional, Deque, Any, Callable

from loguru import logger
class ProxyMessageQueue:
"""
Manages message queuing and consumption for the proxy handler.
Implements a producer-consumer pattern with conversation state awareness.
"""
def __init__(self):
"""Initialize the message queue manager"""
self.message_queue: Deque[Dict] = deque()
self._conversation_active = False
self.lock = asyncio.Lock()
self._consumer_task = None
self._forward_func = None
self._running = False
def initialize(self, forward_func: Callable[[Dict, Optional[str]], Any]):
"""
Initialize the queue with a message forwarding function.
Args:
forward_func: Function to call when forwarding messages to the server
"""
self._forward_func = forward_func
logger.debug("Message queue initialized with forward function")
def queue_message(self, message: Dict, sender_id: Optional[str] = None) -> None:
"""
Add a message to the queue.
Args:
message: The message to queue
sender_id: Optional ID of the client that sent the message
"""
# Store the message along with its sender ID
queue_item = {"message": message, "sender_id": sender_id}
logger.info(
f"Queuing message: {message.get('text', '')} (active conversation: {self._conversation_active})"
)
self.message_queue.append(queue_item)
# Start consumer if needed
self._ensure_consumer_running()
@property
def conversation_active(self) -> bool:
"""Get the conversation active state"""
return self._conversation_active
@conversation_active.setter
def conversation_active(self, active: bool) -> None:
"""
Set the conversation active state.
Args:
active: True if a conversation is active, False otherwise
"""
if self._conversation_active != active:
logger.debug(f"Setting conversation active state to: {active}")
self._conversation_active = active
# If conversation becomes inactive, make sure consumer is running to process any queued messages
if not active and self.has_pending_messages():
self._ensure_consumer_running()
def has_pending_messages(self) -> bool:
"""
Check if there are pending messages in the queue.
Returns:
bool: True if there are messages to process, False otherwise
"""
return len(self.message_queue) > 0
def _ensure_consumer_running(self):
"""Ensure the consumer task is running if needed"""
if not self._forward_func:
logger.warning("Cannot start consumer: no forward function provided")
return
if self._consumer_task is None or self._consumer_task.done():
self._running = True
self._consumer_task = asyncio.create_task(self._consume_loop())
logger.debug("Started message consumer task")
async def _consume_loop(self):
"""Background task that consumes messages based on conversation state"""
try:
while self._running:
# Wait a short time to prevent CPU hogging
await asyncio.sleep(0.1)
# Try to consume a message if appropriate
async with self.lock:
if not self._conversation_active and self.has_pending_messages():
# Get next message
queue_item = self.message_queue.popleft()
message = queue_item["message"]
sender_id = queue_item["sender_id"]
logger.info(
f"Consumer processing message: {message.get('text', '')}"
)
# Set active before forwarding to prevent race conditions
self._conversation_active = True
# Forward the message outside the lock to prevent deadlocks
asyncio.create_task(self._forward_message(message, sender_id))
# If queue is empty and we've been idle for a while, we can pause the consumer
if not self.has_pending_messages() and not self._conversation_active:
await asyncio.sleep(1) # Wait a bit longer before deciding to stop
if (
not self.has_pending_messages()
and not self._conversation_active
):
logger.debug(
"No more pending messages and no active conversation, pausing consumer"
)
break
except Exception as e:
logger.error(f"Error in message consumer loop: {e}")
finally:
self._running = False
logger.debug("Message consumer task ended")
async def _forward_message(self, message: Dict, sender_id: Optional[str] = None):
"""Forward a message using the provided forward function"""
try:
if self._forward_func:
# If this is a text input, send transcription first
if message.get("type") == "text-input":
# Create transcription message
transcription_message = message.copy()
transcription_message["type"] = "user-input-transcription"
# Forward transcription message
await self._forward_func(transcription_message, sender_id)
# Forward the original message
await self._forward_func(message, sender_id)
else:
logger.warning("No forward function available to process message")
except Exception as e:
logger.error(f"Error forwarding message: {e}")
# If forwarding fails, mark conversation as inactive to allow next message
self._conversation_active = False
def stop(self):
"""Stop the consumer task"""
self._running = False
if self._consumer_task and not self._consumer_task.done():
self._consumer_task.cancel()
def clear(self):
"""Clear all pending messages"""
self.message_queue.clear()
logger.info("Message queue cleared")