Spaces:
Running
Running
File size: 2,972 Bytes
c03517d eee6092 c03517d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
import subprocess
import asyncio
from threading import Thread
from queue import Queue, Empty
def enqueue_output(out, queue):
    """Copy every line read from *out* into *queue* until end of stream.

    Intended to run in a background thread so that a consumer can poll the
    queue without blocking on the pipe.

    Args:
        out: A readable stream exposing ``readline()`` and ``close()``. It may
            be opened in text or binary mode.
        queue: Any object with a ``put(item)`` method (e.g. ``queue.Queue``).
    """
    try:
        while True:
            line = out.readline()
            # readline() returns an empty value at EOF: '' for text streams
            # and b'' for binary ones. Comparing against b'' only would never
            # match on a text-mode pipe and the loop would spin forever.
            if not line:
                break
            queue.put(line)
    finally:
        out.close()
class StockfishException(Exception):
    """Raised when the engine subprocess is found to have exited."""


class EngineChess:
    """Line-oriented wrapper around an engine subprocess.

    The engine is started with text-mode pipes. A daemon thread moves every
    line the engine prints into ``queueOutput`` so that reads never block on
    the pipe itself.
    """

    def __init__(self, path_engine):
        """Start the engine executable at *path_engine* and its reader thread."""
        self._stockfish = subprocess.Popen(
            path_engine,
            universal_newlines=True,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # engine errors arrive on the same stream
        )
        self.queueOutput = Queue()
        self.thread = Thread(
            target=enqueue_output,
            args=(self._stockfish.stdout, self.queueOutput),
        )
        self.thread.daemon = True  # thread dies with the program
        self.thread.start()
        self._has_quit_command_been_sent = False
        self._debug_view = False

    def _put(self, command):
        """Write *command* plus a newline to the engine's stdin.

        The command is silently dropped when the process has already exited
        or when ``quit`` was sent earlier.

        Raises:
            BrokenPipeError: If the process has no stdin pipe.
        """
        if not self._stockfish.stdin:
            raise BrokenPipeError()
        if self._stockfish.poll() is None and not self._has_quit_command_been_sent:
            if self._debug_view:
                print(f">>> {command}\n")
            self._stockfish.stdin.write(f"{command}\n")
            self._stockfish.stdin.flush()
            if command == "quit":
                self._has_quit_command_been_sent = True

    def _read_line(self, timeout=None) -> str:
        """Return the next engine output line, stripped, or ``""`` if none.

        Args:
            timeout: ``None`` (default) polls the queue without waiting.
                Otherwise waits up to this many seconds for a line.

        Raises:
            BrokenPipeError: If the process has no stdout pipe.
            StockfishException: If the engine process has exited.
        """
        if not self._stockfish.stdout:
            raise BrokenPipeError()
        if self._stockfish.poll() is not None:
            raise StockfishException("The Stockfish process has crashed")
        try:
            if timeout is None:
                line = self.queueOutput.get_nowait()
            else:
                line = self.queueOutput.get(timeout=timeout)
        except Empty:
            return ""
        if self._debug_view:
            print(line.strip())
        return line.strip()

    def _is_ready(self) -> None:
        """Send ``isready`` and consume output until ``readyok`` is seen."""
        self._put("isready")
        # Wait briefly on the queue each round instead of spinning on an
        # empty one; the process liveness check still runs every iteration.
        while self._read_line(timeout=0.1) != "readyok":
            pass

    def put(self, cmd):
        """Public alias of :meth:`_put`."""
        return self._put(cmd)

    def read_line(self) -> str:
        """Public, non-waiting alias of :meth:`_read_line`."""
        return self._read_line()
# Application object that the websocket route is registered on.
app = FastAPI()

# CORS policy: every origin, method and header is accepted, and credentials
# are allowed.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
@app.websocket("/stockfish-{version}")
async def websocket_endpoint(websocket: WebSocket, version: str):
    """Bridge a websocket client to a freshly started engine process.

    Text frames from the client are forwarded to the engine as commands, and
    every line the engine prints is sent back as a text frame.

    Args:
        websocket: The incoming websocket connection.
        version: Path segment selecting the executable
            ``engines/stockfish-{version}-uci``.
    """
    await websocket.accept()
    stockfish = EngineChess(f"engines/stockfish-{version}-uci")

    async def read_from_socket(websocket: WebSocket):
        # Forward each client message to the engine verbatim.
        async for data in websocket.iter_text():
            print(f"Client: {data}")
            stockfish.put(data)

    # Keep a reference to the task: the event loop only holds weak references,
    # so an unreferenced task may be garbage-collected before it finishes.
    reader = asyncio.create_task(read_from_socket(websocket))
    try:
        # The reader finishes once the client's message stream ends, which is
        # the signal to stop relaying engine output.
        while not reader.done():
            # Drain everything the engine has produced so far.
            while True:
                res = stockfish.read_line()
                if res == "":
                    break
                await websocket.send_text(res)
            await asyncio.sleep(0.1)
        # Re-raise any error hit while forwarding client commands instead of
        # leaving it unretrieved on the task.
        reader.result()
    finally:
        reader.cancel()
        # NOTE(review): the engine subprocess is left running after the client
        # goes away. Shutting it down here (e.g. sending "quit") requires the
        # stdout reader thread to stop cleanly at end of stream - confirm
        # before adding.