| """Unix socket IPC server for AURA.""" |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Awaitable, Callable |
|
|
| from .platform import supports_unix_sockets |
|
|
| RequestHandler = Callable[[str], Awaitable[str] | str] |
|
|
|
|
| @dataclass(slots=True) |
| class IPCResult: |
| """Structured result from IPC operations.""" |
|
|
| ok: bool |
| message: str |
| details: dict[str, str] | None = None |
|
|
|
|
| class UnixSocketServer: |
| """Minimal async Unix socket server.""" |
|
|
| def __init__(self, socket_path: str | Path, handler: RequestHandler | None = None) -> None: |
| self.socket_path = Path(socket_path) |
| self.handler = handler or (lambda message: message) |
| self._server: asyncio.base_events.Server | None = None |
|
|
| async def start(self) -> IPCResult: |
| """Start the socket server.""" |
|
|
| if not supports_unix_sockets(): |
| return IPCResult(ok=False, message="unix-sockets-unsupported") |
| try: |
| self.socket_path.parent.mkdir(parents=True, exist_ok=True) |
| if self.socket_path.exists(): |
| self.socket_path.unlink() |
| self._server = await asyncio.start_unix_server(self._handle_client, path=str(self.socket_path)) |
| return IPCResult(ok=True, message="ipc-started", details={"path": str(self.socket_path)}) |
| except Exception as exc: |
| return IPCResult(ok=False, message=str(exc), details={"path": str(self.socket_path)}) |
|
|
| async def stop(self) -> IPCResult: |
| """Stop the socket server.""" |
|
|
| try: |
| if self._server is not None: |
| self._server.close() |
| await self._server.wait_closed() |
| self._server = None |
| if self.socket_path.exists(): |
| self.socket_path.unlink() |
| return IPCResult(ok=True, message="ipc-stopped", details={"path": str(self.socket_path)}) |
| except Exception as exc: |
| return IPCResult(ok=False, message=str(exc), details={"path": str(self.socket_path)}) |
|
|
| async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: |
| data = await reader.readline() |
| message = data.decode("utf-8").rstrip("\n") |
| response = self.handler(message) |
| if asyncio.iscoroutine(response): |
| response = await response |
| writer.write((str(response) + "\n").encode("utf-8")) |
| await writer.drain() |
| writer.close() |
| await writer.wait_closed() |
|
|