File size: 6,733 Bytes
22d587c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""
Kariana Unified UI - Async Socket Client
Handles communication with KarianaUMCP socket server
"""
import asyncio
import json
import logging
import threading
from typing import Any, Optional, Callable

# Module-level logger, named after this module so applications can configure it by import path.
logger = logging.getLogger(__name__)


class SocketClient:
    """Async socket client for communicating with KarianaUMCP.

    The wire format is newline-delimited JSON: each command is written as one
    JSON document followed by ``"\\n"`` and each reply/event is read back as
    one line. The client connects lazily on the first command and reconnects
    on the next command after the connection has been dropped.
    """

    def __init__(self, host: str = "localhost", port: int = 9877, timeout: float = 5.0):
        """Store connection settings; no I/O happens until ``connect()``.

        Args:
            host: Server host name or address.
            port: Server TCP port.
            timeout: Seconds to wait for the connection to open and for each
                command response.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self._reader: Optional[asyncio.StreamReader] = None
        self._writer: Optional[asyncio.StreamWriter] = None
        self._connected = False
        # Serialises every read of the stream (command round-trips and event
        # polling): a StreamReader allows only one pending read at a time.
        self._lock = asyncio.Lock()
        # Strong references to listener tasks. The event loop only keeps weak
        # references, so an unreferenced task may be garbage-collected.
        self._listener_tasks: set = set()

    @property
    def connected(self) -> bool:
        """True while a connection is open and has a usable writer."""
        return self._connected and self._writer is not None

    async def connect(self) -> bool:
        """Establish connection to socket server.

        Any connection that is still open is closed first so its transport is
        not leaked.

        Returns:
            True on success; False on timeout, refusal or any other error
            (the failure is logged, never raised).
        """
        if self._writer is not None:
            await self.disconnect()
        try:
            self._reader, self._writer = await asyncio.wait_for(
                asyncio.open_connection(self.host, self.port),
                timeout=self.timeout
            )
            self._connected = True
            logger.info("Connected to %s:%s", self.host, self.port)
            return True
        except asyncio.TimeoutError:
            logger.warning("Connection timeout to %s:%s", self.host, self.port)
            return False
        except ConnectionRefusedError:
            logger.warning("Connection refused at %s:%s", self.host, self.port)
            return False
        except Exception as e:
            logger.error("Connection error: %s", e)
            return False

    async def disconnect(self):
        """Close connection and reset state; errors while closing are logged."""
        if self._writer:
            try:
                self._writer.close()
                await self._writer.wait_closed()
            except Exception as e:
                logger.error("Error closing connection: %s", e)
            finally:
                self._writer = None
                self._reader = None
                self._connected = False

    async def send_command(self, command: dict) -> Optional[dict]:
        """Send a command and receive response.

        Args:
            command: JSON-serialisable payload.

        Returns:
            The decoded JSON response, or None when the connection could not
            be opened, the server closed the stream, the response timed out,
            the response was not valid JSON, or any other error occurred.
        """
        async with self._lock:
            if not self.connected and not await self.connect():
                return None

            try:
                # Send command as JSON with newline delimiter
                message = json.dumps(command) + "\n"
                self._writer.write(message.encode())
                await self._writer.drain()

                # Read response (newline-delimited JSON)
                response_data = await asyncio.wait_for(
                    self._reader.readline(),
                    timeout=self.timeout
                )

                if not response_data:
                    # readline() returns b"" only at EOF: the peer closed the
                    # stream. Drop our side so the next command reconnects
                    # instead of writing into a dead connection.
                    logger.warning("Empty response received")
                    await self.disconnect()
                    return None

                return json.loads(response_data.decode().strip())

            except asyncio.TimeoutError:
                logger.warning("Response timeout")
                await self.disconnect()
                return None
            except json.JSONDecodeError as e:
                # The line was consumed in full, so the stream is still in
                # sync; keep the connection open.
                logger.error("Invalid JSON response: %s", e)
                return None
            except Exception as e:
                logger.error("Command error: %s", e)
                await self.disconnect()
                return None

    async def ping(self) -> bool:
        """Test connection with ping command.

        Returns:
            True only if the server answered with a JSON object whose
            ``"status"`` is ``"ok"``.
        """
        response = await self.send_command({"type": "ping"})
        return isinstance(response, dict) and response.get("status") == "ok"

    async def subscribe(self, event_type: str, callback: Callable[[dict], None]) -> bool:
        """Subscribe to real-time events (logs, etc.).

        Sends ``{"type": "subscribe_<event_type>"}`` and, if the server
        replies with a truthy ``"success"``, starts a background task that
        passes matching events to ``callback``.

        Args:
            event_type: Event name; used both in the subscribe command and to
                filter incoming events by their ``"event_type"`` field.
            callback: Synchronous callable invoked with each matching event.

        Returns:
            True if the subscription was accepted and the listener started.
        """
        response = await self.send_command({
            "type": f"subscribe_{event_type}"
        })

        if not isinstance(response, dict) or not response.get("success"):
            return False

        # Start listening for events in background. Keep a reference until the
        # task finishes so it cannot be garbage-collected while running.
        task = asyncio.create_task(self._event_listener(event_type, callback))
        self._listener_tasks.add(task)
        task.add_done_callback(self._listener_tasks.discard)
        return True

    async def _event_listener(self, event_type: str, callback: Callable[[dict], None]):
        """Background listener for subscribed events.

        Polls the stream in 1-second slices while the client is connected and
        calls ``callback`` for every JSON object whose ``"event_type"`` equals
        ``event_type``. Lines that are not valid JSON, are not JSON objects,
        or carry a different event type are discarded. The loop ends when the
        client disconnects or the server closes the stream.
        """
        try:
            while self.connected:
                try:
                    # Hold the lock only for the read itself so commands can
                    # interleave between polls and never overlap with it.
                    async with self._lock:
                        reader = self._reader
                        if reader is None:
                            # Disconnected while we were waiting for the lock.
                            break
                        data = await asyncio.wait_for(
                            reader.readline(),
                            timeout=1.0
                        )
                except asyncio.TimeoutError:
                    continue

                if not data:
                    # EOF: every further readline() would return b"" at once,
                    # so looping on would spin without ever yielding.
                    logger.info("Event stream closed by server")
                    break

                try:
                    event = json.loads(data.decode().strip())
                except json.JSONDecodeError:
                    continue

                if isinstance(event, dict) and event.get("event_type") == event_type:
                    callback(event)
        except Exception as e:
            logger.error("Event listener error: %s", e)


class SyncSocketClient:
    """Synchronous wrapper for SocketClient (for Gradio callbacks).

    All coroutines run on one private event loop that lives in a daemon
    thread for the lifetime of the wrapper. The wrapped ``SocketClient``'s
    streams are bound to the loop they were opened on, so every call must be
    executed on that same loop; a loop created per call would leave the
    streams attached to a closed loop. Calls may come from any thread,
    including one that is itself running an event loop (the caller blocks
    until the result is ready).
    """

    def __init__(self, host: str = "localhost", port: int = 9877, timeout: float = 5.0):
        """Store connection settings; the loop thread and client start lazily.

        Args:
            host: Server host name or address.
            port: Server TCP port.
            timeout: Timeout in seconds handed to the wrapped ``SocketClient``.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self._client: Optional[SocketClient] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None
        # Guards creation/teardown of the loop thread against concurrent callers.
        self._loop_guard = threading.Lock()

    @staticmethod
    def _serve(loop: asyncio.AbstractEventLoop) -> None:
        """Thread target: run ``loop`` until it is stopped."""
        asyncio.set_event_loop(loop)
        loop.run_forever()

    def _get_loop(self) -> asyncio.AbstractEventLoop:
        """Get or create event loop (started in its own daemon thread)."""
        with self._loop_guard:
            if self._loop is None or self._loop.is_closed():
                loop = asyncio.new_event_loop()
                thread = threading.Thread(
                    target=self._serve,
                    args=(loop,),
                    name="SyncSocketClient-loop",
                    daemon=True,
                )
                thread.start()
                self._loop, self._thread = loop, thread
            return self._loop

    def _run(self, coro):
        """Run coroutine synchronously on the private loop and return its result.

        Exceptions raised by the coroutine propagate to the caller unchanged.

        Raises:
            RuntimeError: If called from the private loop's own thread, where
                blocking on the result could never complete.
        """
        loop = self._get_loop()
        if threading.current_thread() is self._thread:
            coro.close()  # avoid a "coroutine was never awaited" warning
            raise RuntimeError(
                "SyncSocketClient cannot be called from its own event loop thread"
            )
        return asyncio.run_coroutine_threadsafe(coro, loop).result()

    async def _new_client(self) -> "SocketClient":
        """Build the async client while running on the private loop.

        NOTE(review): on Python < 3.10 asyncio primitives such as the client's
        Lock bind to the event loop current at construction time, which is why
        the client is created here rather than in the calling thread.
        """
        return SocketClient(self.host, self.port, self.timeout)

    def _ensure_client(self) -> "SocketClient":
        """Return the wrapped client, creating it on first use."""
        if self._client is None:
            self._client = self._run(self._new_client())
        return self._client

    def connect(self) -> bool:
        """Open a fresh connection; returns True on success.

        A new ``SocketClient`` is created from the current host/port/timeout.
        Any previous client is disconnected first so its socket is not leaked.
        """
        self.disconnect()
        self._client = self._run(self._new_client())
        return self._run(self._client.connect())

    def disconnect(self):
        """Close the connection if a client exists; the loop thread keeps running."""
        if self._client:
            self._run(self._client.disconnect())

    def send_command(self, command: dict) -> Optional[dict]:
        """Send ``command`` and return the decoded response, or None on failure."""
        return self._run(self._ensure_client().send_command(command))

    def ping(self) -> bool:
        """Return True if the server answers a ping with status ``"ok"``."""
        return self._run(self._ensure_client().ping())

    def close(self) -> None:
        """Disconnect and shut down the private loop thread.

        Safe to call more than once. The wrapper can still be used afterwards;
        the loop and client are recreated on demand.
        """
        if self._loop is None or self._loop.is_closed():
            self._client = None
            return
        self.disconnect()
        self._client = None
        with self._loop_guard:
            loop, thread = self._loop, self._thread
            self._loop = None
            self._thread = None
        if loop is None or loop.is_closed():
            return
        loop.call_soon_threadsafe(loop.stop)
        if thread is not None:
            thread.join(timeout=self.timeout + 1)
        # A loop that is still running cannot be closed; leave it to the
        # daemon thread in that case.
        if thread is None or not thread.is_alive():
            loop.close()