Spaces:
Runtime error
Runtime error
| import io | |
| import json | |
| import threading | |
| from queue import Queue | |
| from subprocess import PIPE, Popen | |
| class RPC(object): | |
| def __init__(self, cmd): | |
| self.cmd = cmd | |
| self.start() | |
| super(RPC, self).__init__() | |
| def start(self): | |
| self._proc: Popen = Popen(self.cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) | |
| assert self._proc.stdin | |
| assert self._proc.stdout | |
| assert self._proc.stderr | |
| self.stdin = io.open(self._proc.stdin.fileno(), "wb") | |
| self.stdout = io.open(self._proc.stdout.fileno(), "rb") | |
| self.stderr = io.open(self._proc.stderr.fileno(), "rb") | |
| self._queue = Queue() | |
| self.watchdog_stdout = Watchdog(self, name="stdout") | |
| self.watchdog_stderr = Watchdog(self, name="stderr") | |
| def stop(self): | |
| if self.stdin and not self.stdin.closed: | |
| self.stdin.close() | |
| if self.stdout and not self.stdout.closed: | |
| self.stdout.close() | |
| if self.stderr and not self.stderr.closed: | |
| self.stderr.close() | |
| self.watchdog_stdout.stop() | |
| self.watchdog_stderr.stop() | |
| self._proc.terminate() | |
| self._proc.wait() | |
| self._proc.kill() | |
| def restart(self): | |
| self.stop() | |
| self.start() | |
| def request(self, cmd): | |
| self._write(cmd) | |
| return self._queue.get() | |
| def _handle_stdout(self, resp): | |
| try: | |
| self._queue.put((None, json.loads(resp))) | |
| except: | |
| self._queue.put((None, resp)) | |
| def _handle_stderr(self, resp): | |
| self._queue.put((resp, None)) | |
| def _write(self, s): | |
| req = json.dumps(s) | |
| req = req + "\0" | |
| try: | |
| self.stdin.write(bytearray(req, "utf-8")) | |
| self.stdin.flush() | |
| except: | |
| pass | |
| class Watchdog(threading.Thread): | |
| def __init__(self, rpc, name="watchdog", interval=0.1): | |
| super(Watchdog, self).__init__() | |
| if name == "stderr": | |
| self.stream = rpc.stderr | |
| self.handle = rpc._handle_stderr | |
| elif name == "stdout": | |
| self.stream = rpc.stdout | |
| self.handle = rpc._handle_stdout | |
| # store attributes | |
| self.rpc = rpc | |
| self.name = name | |
| self.interval = interval | |
| self.daemon = True | |
| # register a stop event | |
| self._stop = threading.Event() | |
| self.start() | |
| def start(self): | |
| super(Watchdog, self).start() | |
| def stop(self): | |
| self._stop.set() | |
| def run(self): | |
| # reset the stop event | |
| self._stop.clear() | |
| # stop here when stream is not set or closed | |
| if not self.stream or self.stream.closed: | |
| return | |
| # read new incoming lines | |
| while not self._stop.is_set(): | |
| resp = None | |
| # stop when stream is closed | |
| if self.stream.closed: | |
| break | |
| try: | |
| resp = "" | |
| while True: | |
| c = self.stream.read(1).decode("utf-8") | |
| if c == "\x00" and self.name == "stdout": | |
| break | |
| elif c == "\n" and self.name == "stderr": | |
| break | |
| elif not c: | |
| # EOF | |
| break | |
| else: | |
| resp = resp + c | |
| except IOError: | |
| # prevent residual race conditions occurring when stream is closed externally | |
| pass | |
| self.handle(resp) | |
| self._stop.wait(self.interval) | |