# Spaces: Sleeping
# File size: 6,733 Bytes
# 22d587c
"""
Kariana Unified UI - Async Socket Client
Handles communication with KarianaUMCP socket server
"""
import asyncio
import json
import logging
from typing import Any, Optional, Callable
logger = logging.getLogger(__name__)


class SocketClient:
    """Async socket client for communicating with KarianaUMCP.

    Commands are written as one JSON object per line and each reply is read
    as one newline-terminated JSON line. Every read of the shared stream is
    serialized through a single lock, because an ``asyncio.StreamReader``
    raises ``RuntimeError`` when two coroutines wait on it at once.
    """

    def __init__(self, host: str = "localhost", port: int = 9877, timeout: float = 5.0):
        """Store connection settings; no I/O happens until ``connect()``.

        Args:
            host: Server host name or address.
            port: Server TCP port.
            timeout: Seconds to wait when connecting and for each reply.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self._reader: Optional[asyncio.StreamReader] = None
        self._writer: Optional[asyncio.StreamWriter] = None
        self._connected = False
        # Serializes every read of self._reader (command replies and events).
        self._lock = asyncio.Lock()
        # event_type -> callbacks registered by the running listeners.
        self._subscribers: dict = {}
        # Strong references to listener tasks: the event loop itself only
        # keeps weak references, so an unreferenced task may be collected.
        self._listener_tasks: set = set()

    @property
    def connected(self) -> bool:
        """True while a connection is open and has not been torn down."""
        return self._connected and self._writer is not None

    async def connect(self) -> bool:
        """Establish a connection to the socket server.

        An existing connection is closed first so its transport is not
        leaked when ``connect()`` is called again.

        Returns:
            True on success; False on timeout, refusal or any other error
            (the failure is logged, never raised).
        """
        if self._writer is not None:
            await self.disconnect()
        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(self.host, self.port),
                timeout=self.timeout
            )
        except asyncio.TimeoutError:
            logger.warning(f"Connection timeout to {self.host}:{self.port}")
            return False
        except ConnectionRefusedError:
            logger.warning(f"Connection refused at {self.host}:{self.port}")
            return False
        except Exception as e:
            logger.error(f"Connection error: {e}")
            return False
        self._reader, self._writer = reader, writer
        self._connected = True
        logger.info(f"Connected to {self.host}:{self.port}")
        return True

    async def disconnect(self):
        """Close the connection; safe to call when already disconnected."""
        writer = self._writer
        # Reset state before awaiting so concurrent callers immediately see
        # the client as disconnected and do not close the writer twice.
        self._writer = None
        self._reader = None
        self._connected = False
        if writer is None:
            return
        try:
            writer.close()
            await writer.wait_closed()
        except Exception as e:
            logger.error(f"Error closing connection: {e}")

    async def send_command(self, command: dict) -> Optional[dict]:
        """Send a command and return the decoded reply.

        Connects on demand. The connection is dropped on timeout, on EOF
        and on unexpected I/O errors so the next call reconnects instead
        of reusing a dead socket.

        Args:
            command: JSON-serializable mapping to send.

        Returns:
            The decoded JSON reply, or None on any failure (logged).
        """
        try:
            # Serialize up front: a bad argument is a caller error and must
            # not tear down a healthy connection.
            payload = (json.dumps(command) + "\n").encode()
        except (TypeError, ValueError) as e:
            logger.error(f"Command error: {e}")
            return None

        async with self._lock:
            if not self.connected and not await self.connect():
                return None
            # Local references stay valid even if disconnect() runs while
            # this coroutine is suspended.
            reader, writer = self._reader, self._writer
            try:
                writer.write(payload)
                await writer.drain()
                # Read response (newline-delimited JSON)
                response_data = await asyncio.wait_for(
                    reader.readline(),
                    timeout=self.timeout
                )
                if not response_data:
                    # readline() returns b"" only at EOF: the peer closed
                    # the connection, so it must not be reused.
                    logger.warning("Empty response received")
                    await self.disconnect()
                    return None
                return json.loads(response_data.decode().strip())
            except asyncio.TimeoutError:
                logger.warning("Response timeout")
                await self.disconnect()
                return None
            except json.JSONDecodeError as e:
                logger.error(f"Invalid JSON response: {e}")
                return None
            except Exception as e:
                logger.error(f"Command error: {e}")
                await self.disconnect()
                return None

    async def ping(self) -> bool:
        """Return True if the server answers a ping with ``status == "ok"``."""
        response = await self.send_command({"type": "ping"})
        return response is not None and response.get("status") == "ok"

    async def subscribe(self, event_type: str, callback: Callable[[dict], None]) -> bool:
        """Subscribe to real-time events (logs, etc.).

        Sends ``subscribe_<event_type>`` and, if the reply has a truthy
        ``success`` field, starts a background listener task.

        Args:
            event_type: Event name; also matched against each incoming
                event's ``event_type`` field.
            callback: Called synchronously with every matching event dict.

        Returns:
            True if the server accepted the subscription, else False.
        """
        response = await self.send_command({
            "type": f"subscribe_{event_type}"
        })
        if not response or not response.get("success"):
            return False
        task = asyncio.create_task(self._event_listener(event_type, callback))
        # Keep the task referenced until it finishes.
        self._listener_tasks.add(task)
        task.add_done_callback(self._listener_tasks.discard)
        return True

    async def _event_listener(self, event_type: str, callback: Callable[[dict], None]):
        """Background listener for subscribed events.

        Polls the stream in one-second slices while holding the read lock,
        so a concurrent ``send_command`` may wait up to one second for its
        turn. Each event is delivered to every callback registered for its
        ``event_type``, whichever listener task happened to read the line.
        Stops when the client disconnects or the server closes the stream.

        NOTE(review): a line that arrives while ``send_command`` is waiting
        for its reply is returned as that reply; separating events from
        replies there needs knowledge of the server protocol - confirm.
        """
        handlers = self._subscribers.setdefault(event_type, [])
        handlers.append(callback)
        try:
            while self.connected:
                async with self._lock:
                    reader = self._reader
                    if reader is None:
                        break
                    try:
                        data = await asyncio.wait_for(reader.readline(), timeout=1.0)
                    except asyncio.TimeoutError:
                        continue
                if not data:
                    # EOF: stop instead of spinning on an exhausted stream.
                    if self._reader is reader:
                        logger.warning("Connection closed by server")
                        await self.disconnect()
                    break
                try:
                    event = json.loads(data.decode().strip())
                except ValueError:
                    # Covers malformed JSON and undecodable bytes.
                    continue
                if not isinstance(event, dict):
                    continue
                kind = event.get("event_type")
                if not isinstance(kind, str):
                    continue
                for handler in list(self._subscribers.get(kind, ())):
                    try:
                        handler(event)
                    except Exception as e:
                        # One failing callback must not stop event delivery.
                        logger.error(f"Event callback error: {e}")
        except Exception as e:
            logger.error(f"Event listener error: {e}")
        finally:
            handlers.remove(callback)
class SyncSocketClient:
    """Synchronous wrapper for SocketClient (for Gradio callbacks).

    Every coroutine runs on one private event loop owned by this object, so
    the streams opened by ``connect()`` stay bound to a live loop for later
    commands. Calls are serialized with a thread lock because an event loop
    cannot be run from two threads at once.
    """

    def __init__(self, host: str = "localhost", port: int = 9877, timeout: float = 5.0):
        """Store connection settings; the client and loop are created lazily.

        Args:
            host: Server host name or address.
            port: Server TCP port.
            timeout: Timeout in seconds passed on to ``SocketClient``.
        """
        import threading

        self.host = host
        self.port = port
        self.timeout = timeout
        self._client: Optional[SocketClient] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        # Guards the private loop against concurrent run_until_complete().
        self._run_lock = threading.Lock()

    def _get_loop(self) -> asyncio.AbstractEventLoop:
        """Return the private event loop, creating it if missing or closed."""
        if self._loop is None or self._loop.is_closed():
            self._loop = asyncio.new_event_loop()
        return self._loop

    def _run_blocking(self, coro):
        """Run ``coro`` to completion on the private loop in this thread."""
        with self._run_lock:
            return self._get_loop().run_until_complete(coro)

    def _run(self, coro):
        """Run a coroutine synchronously and return its result.

        When the calling thread already runs an event loop, the private
        loop cannot be started there, so the coroutine is run on it from a
        short-lived worker thread. Exceptions raised by the coroutine
        propagate unchanged.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: run directly.
            return self._run_blocking(coro)
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            # No extra timeout here: the coroutines enforce their own, and
            # leaving the ``with`` block waits for the worker regardless.
            return pool.submit(self._run_blocking, coro).result()

    def _ensure_client(self) -> "SocketClient":
        """Return the wrapped async client, creating it on first use."""
        if not self._client:
            self._client = SocketClient(self.host, self.port, self.timeout)
        return self._client

    def connect(self) -> bool:
        """Open a fresh connection; returns True on success.

        Any previous client is disconnected first so its socket is closed
        rather than leaked.
        """
        if self._client is not None:
            self._run(self._client.disconnect())
        self._client = SocketClient(self.host, self.port, self.timeout)
        return self._run(self._client.connect())

    def disconnect(self):
        """Close the connection if a client exists."""
        if self._client:
            self._run(self._client.disconnect())

    def send_command(self, command: dict) -> Optional[dict]:
        """Send ``command`` and return the decoded reply, or None on failure."""
        return self._run(self._ensure_client().send_command(command))

    def ping(self) -> bool:
        """Return True if the server answers a ping successfully."""
        return self._run(self._ensure_client().ping())
|