File size: 3,669 Bytes
5669b22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from abc import ABC, abstractmethod
import asyncio
from typing import Callable, Dict, Any


class LivePlatformInterface(ABC):
    """

    Abstract interface for live streaming platforms.

    This interface defines the methods that any live platform implementation must provide.

    It handles connecting to the VTuber server via proxy, sending messages from the live platform,

    and receiving responses.

    """

    @abstractmethod
    async def connect(self, proxy_url: str) -> bool:
        """

        Connect to the VTuber server via proxy WebSocket.



        Args:

            proxy_url: WebSocket URL for the proxy



        Returns:

            bool: True if connection successful, False otherwise

        """
        pass

    @abstractmethod
    async def disconnect(self) -> None:
        """

        Disconnect from the proxy server.

        """
        pass

    @abstractmethod
    async def send_message(self, text: str) -> bool:
        """

        Send a message from the live platform to the VTuber.



        Args:

            text: Message text content



        Returns:

            bool: True if message was sent successfully, False otherwise

        """
        pass

    @abstractmethod
    async def register_message_handler(

        self, handler: Callable[[Dict[str, Any]], None]

    ) -> None:
        """

        Register a callback function to handle response messages from the VTuber.



        Args:

            handler: Callback function that takes a message dict as parameter

        """
        pass

    @abstractmethod
    async def start_receiving(self) -> None:
        """

        Start receiving messages from the proxy server.

        This method should typically be run in a separate task.

        """
        pass

    @abstractmethod
    async def run(self) -> None:
        """

        Main entry point to run the live platform client.

        This should handle the complete lifecycle including connection,

        message receiving, and clean disconnection.

        """
        pass

    @property
    @abstractmethod
    def is_connected(self) -> bool:
        """

        Check if the client is currently connected to the proxy.



        Returns:

            bool: True if connected, False otherwise

        """
        pass

    @abstractmethod
    async def handle_incoming_messages(self, message: Dict[str, Any]) -> None:
        """

        Process incoming messages from the VTuber server.



        Args:

            message: The message received from the VTuber

        """
        pass


class MessageQueue:
    """

    A simple message queue for storing and retrieving messages.

    """

    def __init__(self):
        """Initialize an empty message queue."""
        self._queue = asyncio.Queue()

    async def put(self, message: str) -> None:
        """

        Add a message to the queue.



        Args:

            message: Message to queue

        """
        await self._queue.put(message)

    async def get(self) -> str:
        """

        Get the next message from the queue.



        Returns:

            str: The next message

        """
        return await self._queue.get()

    def empty(self) -> bool:
        """

        Check if the queue is empty.



        Returns:

            bool: True if empty, False otherwise

        """
        return self._queue.empty()

    def qsize(self) -> int:
        """

        Get the current queue size.



        Returns:

            int: Number of messages in queue

        """
        return self._queue.qsize()