from fastapi import FastAPI, WebSocket from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse import subprocess import asyncio import os from threading import Thread from queue import Queue, Empty os.environ['LD_LIBRARY_PATH'] = '/engines/maia:' + os.environ.get('LD_LIBRARY_PATH', '') def enqueue_output(out, queue): for line in iter(out.readline, b''): queue.put(line) out.close() class EngineChess: def __init__(self, path_engine): self.path_engine = path_engine self._stockfish = subprocess.Popen( path_engine, universal_newlines=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) self.queueOutput = Queue() self.thread = Thread(target=enqueue_output, args=(self._stockfish.stdout, self.queueOutput)) self.thread.daemon = True # thread dies with the program self.thread.start() self._has_quit_command_been_sent = False self._debug_view = True def _put(self, command): if not self._stockfish.stdin: raise BrokenPipeError() if self._stockfish.poll() is None and not self._has_quit_command_been_sent: self._stockfish.stdin.write(f"{command}\n") self._stockfish.stdin.flush() if command == "quit": self._has_quit_command_been_sent = True def _read_line(self) -> str: if not self._stockfish.stdout: raise BrokenPipeError() if self._stockfish.poll() is not None: raise StockfishException("The Stockfish process has crashed") try: line = self.queueOutput.get_nowait() # or q.get(timeout=.1) except Empty: return "" if self._debug_view: print(f"Engine {self.path_engine[0]}:", line.strip()) return line.strip() def _is_ready(self) -> None: self._put("isready") while self._read_line() != "readyok": pass def put(self, cmd): return self._put(cmd) def read_line(self) -> str: return self._read_line() app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.websocket("/stockfish-{version}") async def websocket_endpoint(websocket: WebSocket, version: str): await websocket.accept() stockfish = EngineChess([f"engines/stockfish/stockfish-{version}-uci"]) async def read_from_socket(websocket: WebSocket): async for data in websocket.iter_text(): print(f"Stockfish Client: {data}") stockfish.put(data) asyncio.create_task(read_from_socket(websocket)) while True: while True: res = stockfish.read_line() if res: await websocket.send_text(f"{res}") else: break await asyncio.sleep(0.1) @app.websocket("/maia-{elo}") async def websocket_endpoint(websocket: WebSocket, elo: str): await websocket.accept() stockfish = EngineChess(["./engines/maia/lc0", "--backend=trivial", f"--weights=./engines/maia-{elo}.pb.gz"]) async def read_from_socket(websocket: WebSocket): async for data in websocket.iter_text(): print(f"Maia Client: {data}") stockfish.put(data) asyncio.create_task(read_from_socket(websocket)) while True: while True: res = stockfish.read_line() if res: await websocket.send_text(f"{res}") else: break await asyncio.sleep(0.1) ######################## async def get_prompt(): # Get current working directory cwd = os.getcwd() # Shorten path if it is too long if len(cwd) > 30: cwd = os.path.basename(cwd) cwd = f".../{cwd}" return f"{cwd}$ " async def exec_command(command): try: # Execute command using subprocess process = await asyncio.create_subprocess_shell( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, executable='/bin/bash' ) # Wait for command to complete stdout, stderr = await process.communicate() # Prepare output to send back output = stdout.decode() + stderr.decode() except Exception as e: output = str(e) return output async def send_prompt(websocket): # Send initial prompt to client prompt = await get_prompt() await websocket.send_text(prompt) @app.websocket("/shell") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() await send_prompt(websocket) try: while True: # Receive command from client command = await websocket.receive_text() if command == "exit": await websocket.send_text("Goodbye!") break # Execute command output = await exec_command(command) # Send output to client await websocket.send_text(output) # Send updated prompt to client await send_prompt(websocket) except websockets.exceptions.ConnectionClosedError: pass