KarianaUMCP / utils /socket_client.py
barlowski's picture
Upload folder using huggingface_hub
22d587c verified
"""
Kariana Unified UI - Async Socket Client
Handles communication with KarianaUMCP socket server
"""
import asyncio
import json
import logging
from typing import Any, Optional, Callable
logger = logging.getLogger(__name__)
class SocketClient:
"""Async socket client for communicating with KarianaUMCP"""
def __init__(self, host: str = "localhost", port: int = 9877, timeout: float = 5.0):
self.host = host
self.port = port
self.timeout = timeout
self._reader: Optional[asyncio.StreamReader] = None
self._writer: Optional[asyncio.StreamWriter] = None
self._connected = False
self._lock = asyncio.Lock()
@property
def connected(self) -> bool:
return self._connected and self._writer is not None
async def connect(self) -> bool:
"""Establish connection to socket server"""
try:
self._reader, self._writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port),
timeout=self.timeout
)
self._connected = True
logger.info(f"Connected to {self.host}:{self.port}")
return True
except asyncio.TimeoutError:
logger.warning(f"Connection timeout to {self.host}:{self.port}")
return False
except ConnectionRefusedError:
logger.warning(f"Connection refused at {self.host}:{self.port}")
return False
except Exception as e:
logger.error(f"Connection error: {e}")
return False
async def disconnect(self):
"""Close connection"""
if self._writer:
try:
self._writer.close()
await self._writer.wait_closed()
except Exception as e:
logger.error(f"Error closing connection: {e}")
finally:
self._writer = None
self._reader = None
self._connected = False
async def send_command(self, command: dict) -> Optional[dict]:
"""Send a command and receive response"""
async with self._lock:
if not self.connected:
if not await self.connect():
return None
try:
# Send command as JSON with newline delimiter
message = json.dumps(command) + "\n"
self._writer.write(message.encode())
await self._writer.drain()
# Read response (newline-delimited JSON)
response_data = await asyncio.wait_for(
self._reader.readline(),
timeout=self.timeout
)
if not response_data:
logger.warning("Empty response received")
return None
return json.loads(response_data.decode().strip())
except asyncio.TimeoutError:
logger.warning("Response timeout")
await self.disconnect()
return None
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON response: {e}")
return None
except Exception as e:
logger.error(f"Command error: {e}")
await self.disconnect()
return None
async def ping(self) -> bool:
"""Test connection with ping command"""
response = await self.send_command({"type": "ping"})
return response is not None and response.get("status") == "ok"
async def subscribe(self, event_type: str, callback: Callable[[dict], None]):
"""Subscribe to real-time events (logs, etc.)"""
# Subscribe command
response = await self.send_command({
"type": f"subscribe_{event_type}"
})
if not response or not response.get("success"):
return False
# Start listening for events in background
asyncio.create_task(self._event_listener(event_type, callback))
return True
async def _event_listener(self, event_type: str, callback: Callable[[dict], None]):
"""Background listener for subscribed events"""
try:
while self.connected:
try:
data = await asyncio.wait_for(
self._reader.readline(),
timeout=1.0
)
if data:
event = json.loads(data.decode().strip())
if event.get("event_type") == event_type:
callback(event)
except asyncio.TimeoutError:
continue
except json.JSONDecodeError:
continue
except Exception as e:
logger.error(f"Event listener error: {e}")
class SyncSocketClient:
"""Synchronous wrapper for SocketClient (for Gradio callbacks)"""
def __init__(self, host: str = "localhost", port: int = 9877, timeout: float = 5.0):
self.host = host
self.port = port
self.timeout = timeout
self._client: Optional[SocketClient] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
def _get_loop(self) -> asyncio.AbstractEventLoop:
"""Get or create event loop"""
try:
return asyncio.get_running_loop()
except RuntimeError:
if self._loop is None or self._loop.is_closed():
self._loop = asyncio.new_event_loop()
return self._loop
def _run(self, coro):
"""Run coroutine synchronously"""
loop = self._get_loop()
try:
return loop.run_until_complete(coro)
except RuntimeError:
# Already running in async context, use nest_asyncio pattern
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as pool:
future = pool.submit(asyncio.run, coro)
return future.result(timeout=self.timeout + 1)
def connect(self) -> bool:
self._client = SocketClient(self.host, self.port, self.timeout)
return self._run(self._client.connect())
def disconnect(self):
if self._client:
self._run(self._client.disconnect())
def send_command(self, command: dict) -> Optional[dict]:
if not self._client:
self._client = SocketClient(self.host, self.port, self.timeout)
return self._run(self._client.send_command(command))
def ping(self) -> bool:
if not self._client:
self._client = SocketClient(self.host, self.port, self.timeout)
return self._run(self._client.ping())