""" Kariana Unified UI - Async Socket Client Handles communication with KarianaUMCP socket server """ import asyncio import json import logging from typing import Any, Optional, Callable logger = logging.getLogger(__name__) class SocketClient: """Async socket client for communicating with KarianaUMCP""" def __init__(self, host: str = "localhost", port: int = 9877, timeout: float = 5.0): self.host = host self.port = port self.timeout = timeout self._reader: Optional[asyncio.StreamReader] = None self._writer: Optional[asyncio.StreamWriter] = None self._connected = False self._lock = asyncio.Lock() @property def connected(self) -> bool: return self._connected and self._writer is not None async def connect(self) -> bool: """Establish connection to socket server""" try: self._reader, self._writer = await asyncio.wait_for( asyncio.open_connection(self.host, self.port), timeout=self.timeout ) self._connected = True logger.info(f"Connected to {self.host}:{self.port}") return True except asyncio.TimeoutError: logger.warning(f"Connection timeout to {self.host}:{self.port}") return False except ConnectionRefusedError: logger.warning(f"Connection refused at {self.host}:{self.port}") return False except Exception as e: logger.error(f"Connection error: {e}") return False async def disconnect(self): """Close connection""" if self._writer: try: self._writer.close() await self._writer.wait_closed() except Exception as e: logger.error(f"Error closing connection: {e}") finally: self._writer = None self._reader = None self._connected = False async def send_command(self, command: dict) -> Optional[dict]: """Send a command and receive response""" async with self._lock: if not self.connected: if not await self.connect(): return None try: # Send command as JSON with newline delimiter message = json.dumps(command) + "\n" self._writer.write(message.encode()) await self._writer.drain() # Read response (newline-delimited JSON) response_data = await asyncio.wait_for( self._reader.readline(), timeout=self.timeout ) if not response_data: logger.warning("Empty response received") return None return json.loads(response_data.decode().strip()) except asyncio.TimeoutError: logger.warning("Response timeout") await self.disconnect() return None except json.JSONDecodeError as e: logger.error(f"Invalid JSON response: {e}") return None except Exception as e: logger.error(f"Command error: {e}") await self.disconnect() return None async def ping(self) -> bool: """Test connection with ping command""" response = await self.send_command({"type": "ping"}) return response is not None and response.get("status") == "ok" async def subscribe(self, event_type: str, callback: Callable[[dict], None]): """Subscribe to real-time events (logs, etc.)""" # Subscribe command response = await self.send_command({ "type": f"subscribe_{event_type}" }) if not response or not response.get("success"): return False # Start listening for events in background asyncio.create_task(self._event_listener(event_type, callback)) return True async def _event_listener(self, event_type: str, callback: Callable[[dict], None]): """Background listener for subscribed events""" try: while self.connected: try: data = await asyncio.wait_for( self._reader.readline(), timeout=1.0 ) if data: event = json.loads(data.decode().strip()) if event.get("event_type") == event_type: callback(event) except asyncio.TimeoutError: continue except json.JSONDecodeError: continue except Exception as e: logger.error(f"Event listener error: {e}") class SyncSocketClient: """Synchronous wrapper for SocketClient (for Gradio callbacks)""" def __init__(self, host: str = "localhost", port: int = 9877, timeout: float = 5.0): self.host = host self.port = port self.timeout = timeout self._client: Optional[SocketClient] = None self._loop: Optional[asyncio.AbstractEventLoop] = None def _get_loop(self) -> asyncio.AbstractEventLoop: """Get or create event loop""" try: return asyncio.get_running_loop() except RuntimeError: if self._loop is None or self._loop.is_closed(): self._loop = asyncio.new_event_loop() return self._loop def _run(self, coro): """Run coroutine synchronously""" loop = self._get_loop() try: return loop.run_until_complete(coro) except RuntimeError: # Already running in async context, use nest_asyncio pattern import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as pool: future = pool.submit(asyncio.run, coro) return future.result(timeout=self.timeout + 1) def connect(self) -> bool: self._client = SocketClient(self.host, self.port, self.timeout) return self._run(self._client.connect()) def disconnect(self): if self._client: self._run(self._client.disconnect()) def send_command(self, command: dict) -> Optional[dict]: if not self._client: self._client = SocketClient(self.host, self.port, self.timeout) return self._run(self._client.send_command(command)) def ping(self) -> bool: if not self._client: self._client = SocketClient(self.host, self.port, self.timeout) return self._run(self._client.ping())