engine / main.py
bettermint's picture
Update main.py
c2bf34e verified
raw
history blame
5.45 kB
import asyncio
import os
import subprocess
import time
from queue import Empty, Queue
from threading import Thread

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
# Prepend /engines/maia to the library search path seen by child processes
# (presumably so the lc0 binary can locate its shared libraries - confirm).
# The separator is only added when there is an inherited value: a trailing
# ':' would create an empty entry, which the dynamic loader treats as
# "search the current working directory".
_inherited_ld_path = os.environ.get('LD_LIBRARY_PATH', '')
os.environ['LD_LIBRARY_PATH'] = (
    f'/engines/maia:{_inherited_ld_path}' if _inherited_ld_path else '/engines/maia'
)
def enqueue_output(out, queue):
    """Copy every line read from ``out`` into ``queue`` until end of stream.

    Intended to run in a background thread so that a consumer can poll the
    queue without blocking on the stream itself.

    Args:
        out: A readable stream exposing ``readline()`` and ``close()``.
            Both text-mode and binary-mode streams are supported.
        queue: Any object with a ``put(item)`` method; receives each line
            exactly as returned by ``readline()`` (line ending included).

    The stream is closed when end of stream is reached or reading fails.
    """
    try:
        while True:
            line = out.readline()
            # At end of stream readline() returns '' (text mode) or b''
            # (binary mode). Testing truthiness handles both; comparing
            # against b'' alone never matches a text stream and would loop
            # forever enqueueing empty strings.
            if not line:
                break
            queue.put(line)
    finally:
        out.close()
class StockfishException(Exception):
    """Raised when the engine process has exited and can no longer be read."""


class EngineChess:
    """Wrapper around an engine running as a child process.

    The engine is started from ``path_engine`` with stderr merged into
    stdout. A daemon thread copies each output line into ``queueOutput`` so
    that ``read_line`` never blocks the caller.
    """

    def __init__(self, path_engine):
        """Start the engine process and its output-pumping thread.

        Args:
            path_engine: Argument list handed to ``subprocess.Popen``; the
                first element is also used as a label in debug output.

        Raises:
            OSError: If the process cannot be started (propagated from
                ``subprocess.Popen``).
        """
        self.path_engine = path_engine
        self._stockfish = subprocess.Popen(
            path_engine,
            universal_newlines=True,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        self.queueOutput = Queue()
        self.thread = Thread(
            target=enqueue_output,
            args=(self._stockfish.stdout, self.queueOutput),
        )
        self.thread.daemon = True  # thread dies with the program
        self.thread.start()
        self._has_quit_command_been_sent = False
        # When true, every line read from the engine is echoed to stdout.
        self._debug_view = True

    def _put(self, command):
        """Write ``command`` plus a newline to the engine's stdin.

        The command is silently dropped when the process has already exited
        or when ``quit`` was sent earlier.

        Raises:
            BrokenPipeError: If the process has no stdin pipe.
        """
        if not self._stockfish.stdin:
            raise BrokenPipeError()
        if self._stockfish.poll() is None and not self._has_quit_command_been_sent:
            self._stockfish.stdin.write(f"{command}\n")
            self._stockfish.stdin.flush()
            if command == "quit":
                self._has_quit_command_been_sent = True

    def _read_line(self) -> str:
        """Return the next queued output line, stripped, without blocking.

        Returns:
            The stripped line, or ``""`` when no output is queued.

        Raises:
            BrokenPipeError: If the process has no stdout pipe.
            StockfishException: If the engine process has exited.
        """
        if not self._stockfish.stdout:
            raise BrokenPipeError()
        if self._stockfish.poll() is not None:
            raise StockfishException("The Stockfish process has crashed")
        try:
            line = self.queueOutput.get_nowait()
        except Empty:
            return ""
        line = line.strip()
        if self._debug_view:
            print(f"Engine {self.path_engine[0]}:", line)
        return line

    def _is_ready(self) -> None:
        """Send ``isready`` and wait until the engine answers ``readyok``.

        Raises:
            StockfishException: If the engine exits while waiting.
        """
        self._put("isready")
        while True:
            line = self._read_line()
            if line == "readyok":
                return
            if not line:
                # Nothing queued yet: pause briefly instead of spinning a
                # CPU core while the engine is still working.
                time.sleep(0.01)

    def put(self, cmd):
        """Public alias of ``_put``."""
        return self._put(cmd)

    def read_line(self) -> str:
        """Public alias of ``_read_line``."""
        return self._read_line()
# Application object on which every websocket route in this file is registered.
app = FastAPI()

# CORS policy: every origin, method and header is accepted, and credentialed
# requests are allowed.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
@app.websocket("/stockfish-{version}")
async def websocket_endpoint(websocket: WebSocket, version: str):
    """Relay text between a websocket client and a Stockfish process.

    Every text frame from the client is written to the engine as one
    command; every line the engine prints is sent back as one text frame.
    The engine output queue is polled every 0.1 seconds.

    Args:
        websocket: The client connection.
        version: Selects the binary
            ``engines/stockfish/stockfish-<version>-uci``.
    """
    await websocket.accept()
    try:
        stockfish = EngineChess([f"engines/stockfish/stockfish-{version}-uci"])
    except OSError as exc:
        # Unknown version or unusable binary: close the socket explicitly
        # instead of failing with an unhandled exception.
        print(f"Stockfish Client: cannot start engine {version!r}: {exc}")
        await websocket.close(code=1011)
        return

    async def read_from_socket(websocket: WebSocket):
        async for data in websocket.iter_text():
            print(f"Stockfish Client: {data}")
            stockfish.put(data)

    # Keep a reference to the task: the event loop only holds weak
    # references, and the task is also the signal that the client has left.
    reader = asyncio.create_task(read_from_socket(websocket))
    try:
        # The reader finishes when the client disconnects; stop relaying
        # then instead of polling the engine forever.
        while not reader.done():
            while True:
                res = stockfish.read_line()
                if not res:
                    break
                await websocket.send_text(res)
            await asyncio.sleep(0.1)
    except (WebSocketDisconnect, RuntimeError):
        # The client went away in the middle of a send; Starlette reports
        # that as WebSocketDisconnect or, once the socket is closed, as
        # RuntimeError.
        pass
    finally:
        reader.cancel()
        (outcome,) = await asyncio.gather(reader, return_exceptions=True)
        if isinstance(outcome, Exception) and not isinstance(
            outcome, asyncio.CancelledError
        ):
            print(f"Stockfish Client: reader stopped: {outcome!r}")
        # Ask the engine to exit so the child process does not outlive the
        # connection.
        try:
            stockfish.put("quit")
        except OSError:
            pass
@app.websocket("/maia-{elo}")
async def websocket_endpoint(websocket: WebSocket, elo: str):
    """Relay text between a websocket client and an lc0 process using Maia weights.

    Every text frame from the client is written to the engine as one
    command; every line the engine prints is sent back as one text frame.
    The engine output queue is polled every 0.1 seconds.

    Args:
        websocket: The client connection.
        elo: Selects the weights file ``./engines/maia-<elo>.pb.gz``.
    """
    await websocket.accept()
    try:
        engine = EngineChess(
            [
                "./engines/maia/lc0",
                "--backend=trivial",
                f"--weights=./engines/maia-{elo}.pb.gz",
            ]
        )
    except OSError as exc:
        # Missing or unusable binary: close the socket explicitly instead of
        # failing with an unhandled exception.
        print(f"Maia Client: cannot start engine {elo!r}: {exc}")
        await websocket.close(code=1011)
        return

    async def read_from_socket(websocket: WebSocket):
        async for data in websocket.iter_text():
            print(f"Maia Client: {data}")
            engine.put(data)

    # Keep a reference to the task: the event loop only holds weak
    # references, and the task is also the signal that the client has left.
    reader = asyncio.create_task(read_from_socket(websocket))
    try:
        # The reader finishes when the client disconnects; stop relaying
        # then instead of polling the engine forever.
        while not reader.done():
            while True:
                res = engine.read_line()
                if not res:
                    break
                await websocket.send_text(res)
            await asyncio.sleep(0.1)
    except (WebSocketDisconnect, RuntimeError):
        # The client went away in the middle of a send; Starlette reports
        # that as WebSocketDisconnect or, once the socket is closed, as
        # RuntimeError.
        pass
    finally:
        reader.cancel()
        (outcome,) = await asyncio.gather(reader, return_exceptions=True)
        if isinstance(outcome, Exception) and not isinstance(
            outcome, asyncio.CancelledError
        ):
            print(f"Maia Client: reader stopped: {outcome!r}")
        # Ask the engine to exit so the child process does not outlive the
        # connection.
        try:
            engine.put("quit")
        except OSError:
            pass
########################
async def get_prompt():
    """Build the prompt string shown to shell clients.

    Returns:
        The server's current working directory followed by ``"$ "``. Paths
        longer than 30 characters are shortened to ``.../<last component>``.
    """
    location = os.getcwd()
    if len(location) <= 30:
        shown = location
    else:
        # Too long to display in full: keep only the final path component.
        shown = f".../{os.path.basename(location)}"
    return f"{shown}$ "
async def exec_command(command):
    """Run ``command`` through ``/bin/bash`` and return everything it printed.

    SECURITY: the command string is handed to a shell unmodified. Callers
    must only pass input from trusted, authenticated sources.

    Args:
        command: The shell command line to execute.

    Returns:
        The command's stdout followed by its stderr (the two streams are
        concatenated, not interleaved). If the command cannot be started or
        awaited, the text of the exception is returned instead.
    """
    try:
        process = await asyncio.create_subprocess_shell(
            command,
            # Do not let the child read from the server's own stdin; a
            # command waiting for input would otherwise hang the session.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable='/bin/bash',
        )
        # Wait for the command to finish and collect both streams.
        stdout, stderr = await process.communicate()
    except Exception as e:
        # Best effort: report the failure to the client as text.
        return str(e)
    # Commands may emit arbitrary bytes; substitute undecodable ones instead
    # of discarding the whole output with a decode error.
    return stdout.decode(errors="replace") + stderr.decode(errors="replace")
async def send_prompt(websocket):
    """Send the current shell prompt to the client as one text frame.

    Args:
        websocket: Connection object exposing an awaitable ``send_text``.
    """
    await websocket.send_text(await get_prompt())
@app.websocket("/shell")
async def websocket_endpoint(websocket: WebSocket):
    """Serve an interactive command loop over a websocket.

    After sending a prompt, each received text frame is executed as a shell
    command; its output and a fresh prompt are sent back. The frame ``exit``
    ends the session with ``Goodbye!``.

    SECURITY: this route executes arbitrary client-supplied commands and
    performs no authentication. Anyone who can reach it gains a shell with
    the server's privileges; restrict or remove it before exposing the
    service.

    Args:
        websocket: The client connection.
    """
    await websocket.accept()
    try:
        await send_prompt(websocket)
        while True:
            # Receive command from client
            command = await websocket.receive_text()
            if command == "exit":
                await websocket.send_text("Goodbye!")
                # Close explicitly instead of leaving it to the server when
                # the handler returns.
                await websocket.close()
                break
            # Execute command
            output = await exec_command(command)
            # Send output to client
            await websocket.send_text(output)
            # Send updated prompt to client
            await send_prompt(websocket)
    except WebSocketDisconnect:
        # The client left; nothing to clean up. Starlette signals this with
        # WebSocketDisconnect.
        pass