Spaces:
Sleeping
Sleeping
| """ | |
| Kariana Unified UI - Async Socket Client | |
| Handles communication with KarianaUMCP socket server | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| from typing import Any, Optional, Callable | |
| logger = logging.getLogger(__name__) | |
| class SocketClient: | |
| """Async socket client for communicating with KarianaUMCP""" | |
| def __init__(self, host: str = "localhost", port: int = 9877, timeout: float = 5.0): | |
| self.host = host | |
| self.port = port | |
| self.timeout = timeout | |
| self._reader: Optional[asyncio.StreamReader] = None | |
| self._writer: Optional[asyncio.StreamWriter] = None | |
| self._connected = False | |
| self._lock = asyncio.Lock() | |
| def connected(self) -> bool: | |
| return self._connected and self._writer is not None | |
| async def connect(self) -> bool: | |
| """Establish connection to socket server""" | |
| try: | |
| self._reader, self._writer = await asyncio.wait_for( | |
| asyncio.open_connection(self.host, self.port), | |
| timeout=self.timeout | |
| ) | |
| self._connected = True | |
| logger.info(f"Connected to {self.host}:{self.port}") | |
| return True | |
| except asyncio.TimeoutError: | |
| logger.warning(f"Connection timeout to {self.host}:{self.port}") | |
| return False | |
| except ConnectionRefusedError: | |
| logger.warning(f"Connection refused at {self.host}:{self.port}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Connection error: {e}") | |
| return False | |
| async def disconnect(self): | |
| """Close connection""" | |
| if self._writer: | |
| try: | |
| self._writer.close() | |
| await self._writer.wait_closed() | |
| except Exception as e: | |
| logger.error(f"Error closing connection: {e}") | |
| finally: | |
| self._writer = None | |
| self._reader = None | |
| self._connected = False | |
| async def send_command(self, command: dict) -> Optional[dict]: | |
| """Send a command and receive response""" | |
| async with self._lock: | |
| if not self.connected: | |
| if not await self.connect(): | |
| return None | |
| try: | |
| # Send command as JSON with newline delimiter | |
| message = json.dumps(command) + "\n" | |
| self._writer.write(message.encode()) | |
| await self._writer.drain() | |
| # Read response (newline-delimited JSON) | |
| response_data = await asyncio.wait_for( | |
| self._reader.readline(), | |
| timeout=self.timeout | |
| ) | |
| if not response_data: | |
| logger.warning("Empty response received") | |
| return None | |
| return json.loads(response_data.decode().strip()) | |
| except asyncio.TimeoutError: | |
| logger.warning("Response timeout") | |
| await self.disconnect() | |
| return None | |
| except json.JSONDecodeError as e: | |
| logger.error(f"Invalid JSON response: {e}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Command error: {e}") | |
| await self.disconnect() | |
| return None | |
| async def ping(self) -> bool: | |
| """Test connection with ping command""" | |
| response = await self.send_command({"type": "ping"}) | |
| return response is not None and response.get("status") == "ok" | |
| async def subscribe(self, event_type: str, callback: Callable[[dict], None]): | |
| """Subscribe to real-time events (logs, etc.)""" | |
| # Subscribe command | |
| response = await self.send_command({ | |
| "type": f"subscribe_{event_type}" | |
| }) | |
| if not response or not response.get("success"): | |
| return False | |
| # Start listening for events in background | |
| asyncio.create_task(self._event_listener(event_type, callback)) | |
| return True | |
| async def _event_listener(self, event_type: str, callback: Callable[[dict], None]): | |
| """Background listener for subscribed events""" | |
| try: | |
| while self.connected: | |
| try: | |
| data = await asyncio.wait_for( | |
| self._reader.readline(), | |
| timeout=1.0 | |
| ) | |
| if data: | |
| event = json.loads(data.decode().strip()) | |
| if event.get("event_type") == event_type: | |
| callback(event) | |
| except asyncio.TimeoutError: | |
| continue | |
| except json.JSONDecodeError: | |
| continue | |
| except Exception as e: | |
| logger.error(f"Event listener error: {e}") | |
| class SyncSocketClient: | |
| """Synchronous wrapper for SocketClient (for Gradio callbacks)""" | |
| def __init__(self, host: str = "localhost", port: int = 9877, timeout: float = 5.0): | |
| self.host = host | |
| self.port = port | |
| self.timeout = timeout | |
| self._client: Optional[SocketClient] = None | |
| self._loop: Optional[asyncio.AbstractEventLoop] = None | |
| def _get_loop(self) -> asyncio.AbstractEventLoop: | |
| """Get or create event loop""" | |
| try: | |
| return asyncio.get_running_loop() | |
| except RuntimeError: | |
| if self._loop is None or self._loop.is_closed(): | |
| self._loop = asyncio.new_event_loop() | |
| return self._loop | |
| def _run(self, coro): | |
| """Run coroutine synchronously""" | |
| loop = self._get_loop() | |
| try: | |
| return loop.run_until_complete(coro) | |
| except RuntimeError: | |
| # Already running in async context, use nest_asyncio pattern | |
| import concurrent.futures | |
| with concurrent.futures.ThreadPoolExecutor() as pool: | |
| future = pool.submit(asyncio.run, coro) | |
| return future.result(timeout=self.timeout + 1) | |
| def connect(self) -> bool: | |
| self._client = SocketClient(self.host, self.port, self.timeout) | |
| return self._run(self._client.connect()) | |
| def disconnect(self): | |
| if self._client: | |
| self._run(self._client.disconnect()) | |
| def send_command(self, command: dict) -> Optional[dict]: | |
| if not self._client: | |
| self._client = SocketClient(self.host, self.port, self.timeout) | |
| return self._run(self._client.send_command(command)) | |
| def ping(self) -> bool: | |
| if not self._client: | |
| self._client = SocketClient(self.host, self.port, self.timeout) | |
| return self._run(self._client.ping()) | |