AI-agent / aura /core /ipc.py
AURA Sync Bot
chore: deploy to HuggingFace Space
999bb04
"""Unix socket IPC server for AURA."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from pathlib import Path
from typing import Awaitable, Callable
from .platform import supports_unix_sockets
RequestHandler = Callable[[str], Awaitable[str] | str]
@dataclass(slots=True)
class IPCResult:
"""Structured result from IPC operations."""
ok: bool
message: str
details: dict[str, str] | None = None
class UnixSocketServer:
"""Minimal async Unix socket server."""
def __init__(self, socket_path: str | Path, handler: RequestHandler | None = None) -> None:
self.socket_path = Path(socket_path)
self.handler = handler or (lambda message: message)
self._server: asyncio.base_events.Server | None = None
async def start(self) -> IPCResult:
"""Start the socket server."""
if not supports_unix_sockets():
return IPCResult(ok=False, message="unix-sockets-unsupported")
try:
self.socket_path.parent.mkdir(parents=True, exist_ok=True)
if self.socket_path.exists():
self.socket_path.unlink()
self._server = await asyncio.start_unix_server(self._handle_client, path=str(self.socket_path))
return IPCResult(ok=True, message="ipc-started", details={"path": str(self.socket_path)})
except Exception as exc:
return IPCResult(ok=False, message=str(exc), details={"path": str(self.socket_path)})
async def stop(self) -> IPCResult:
"""Stop the socket server."""
try:
if self._server is not None:
self._server.close()
await self._server.wait_closed()
self._server = None
if self.socket_path.exists():
self.socket_path.unlink()
return IPCResult(ok=True, message="ipc-stopped", details={"path": str(self.socket_path)})
except Exception as exc:
return IPCResult(ok=False, message=str(exc), details={"path": str(self.socket_path)})
async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
data = await reader.readline()
message = data.decode("utf-8").rstrip("\n")
response = self.handler(message)
if asyncio.iscoroutine(response):
response = await response
writer.write((str(response) + "\n").encode("utf-8"))
await writer.drain()
writer.close()
await writer.wait_closed()