import asyncio
from abc import ABC, abstractmethod
from typing import Callable, Dict, Any
|
|
|
|
|
class LivePlatformInterface(ABC):
    """
    Abstract interface for live streaming platforms.

    This interface defines the methods that any live platform implementation
    must provide. It handles connecting to the VTuber server via proxy,
    sending messages from the live platform, and receiving responses.

    Every member below is abstract: a subclass can only be instantiated once
    it overrides all of them.
    """

    @abstractmethod
    async def connect(self, proxy_url: str) -> bool:
        """
        Connect to the VTuber server via proxy WebSocket.

        Args:
            proxy_url: WebSocket URL for the proxy

        Returns:
            bool: True if connection successful, False otherwise
        """

    @abstractmethod
    async def disconnect(self) -> None:
        """
        Disconnect from the proxy server.
        """

    @abstractmethod
    async def send_message(self, text: str) -> bool:
        """
        Send a message from the live platform to the VTuber.

        Args:
            text: Message text content

        Returns:
            bool: True if message was sent successfully, False otherwise
        """

    @abstractmethod
    async def register_message_handler(
        self, handler: Callable[[Dict[str, Any]], None]
    ) -> None:
        """
        Register a callback function to handle response messages from the VTuber.

        Args:
            handler: Callback function that takes a message dict as parameter
        """

    @abstractmethod
    async def start_receiving(self) -> None:
        """
        Start receiving messages from the proxy server.

        This method should typically be run in a separate task.
        """

    @abstractmethod
    async def run(self) -> None:
        """
        Main entry point to run the live platform client.

        This should handle the complete lifecycle including connection,
        message receiving, and clean disconnection.
        """

    # ``@property`` must be the outermost decorator so the abstract marker set
    # on the getter is picked up by ABCMeta through the property object.
    @property
    @abstractmethod
    def is_connected(self) -> bool:
        """
        Check if the client is currently connected to the proxy.

        Returns:
            bool: True if connected, False otherwise
        """

    @abstractmethod
    async def handle_incoming_messages(self, message: Dict[str, Any]) -> None:
        """
        Process incoming messages from the VTuber server.

        Args:
            message: The message received from the VTuber
        """
|
|
|
|
|
class MessageQueue:
    """
    A simple message queue for storing and retrieving messages.

    Thin wrapper around ``asyncio.Queue``: messages come out in the order
    they were put in.
    """

    def __init__(self, maxsize: int = 0) -> None:
        """
        Initialize an empty message queue.

        Args:
            maxsize: Upper bound on the number of queued messages. The
                default of 0 means unbounded; a positive value makes
                ``put`` wait while the queue is full.
        """
        self._queue = asyncio.Queue(maxsize)

    async def put(self, message: str) -> None:
        """
        Add a message to the queue.

        Waits for a free slot if the queue was created with a positive
        ``maxsize`` and is currently full.

        Args:
            message: Message to queue
        """
        await self._queue.put(message)

    async def get(self) -> str:
        """
        Get the next message from the queue.

        Waits until a message is available if the queue is empty.

        Returns:
            str: The next message
        """
        return await self._queue.get()

    def empty(self) -> bool:
        """
        Check if the queue is empty.

        Returns:
            bool: True if empty, False otherwise
        """
        return self._queue.empty()

    def qsize(self) -> int:
        """
        Get the current queue size.

        Returns:
            int: Number of messages in queue
        """
        return self._queue.qsize()
|
|
|