Amogh1221 commited on
Commit
6d758fe
Β·
verified Β·
1 Parent(s): 64bc90b

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +198 -360
main.py CHANGED
@@ -1,7 +1,7 @@
1
  from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
2
  from fastapi.middleware.cors import CORSMiddleware
3
  from pydantic import BaseModel
4
- from typing import Optional, List, Dict
5
  import os
6
  import math
7
  import chess
@@ -10,83 +10,37 @@ import asyncio
10
  import json
11
  from contextlib import asynccontextmanager
12
 
13
- @asynccontextmanager
14
- async def lifespan(app: FastAPI):
15
- # Startup: Initialize the engine pool
16
- await pool.start()
17
- yield
18
- # Shutdown: Clean up the engine pool
19
- await pool.stop()
20
-
21
- app = FastAPI(title="Deepcastle Engine API", lifespan=lifespan)
22
- class ConnectionManager:
23
- def __init__(self):
24
- self.active_connections: Dict[str, List[WebSocket]] = {}
25
-
26
- async def connect(self, websocket: WebSocket, match_id: str):
27
- await websocket.accept()
28
- if match_id not in self.active_connections:
29
- self.active_connections[match_id] = []
30
- self.active_connections[match_id].append(websocket)
31
-
32
- def disconnect(self, websocket: WebSocket, match_id: str):
33
- if match_id in self.active_connections:
34
- if websocket in self.active_connections[match_id]:
35
- self.active_connections[match_id].remove(websocket)
36
- if not self.active_connections[match_id]:
37
- del self.active_connections[match_id]
38
-
39
- async def broadcast(self, message: str, match_id: str, exclude: WebSocket = None):
40
- if match_id in self.active_connections:
41
- for connection in self.active_connections[match_id]:
42
- if connection != exclude:
43
- try:
44
- await connection.send_text(message)
45
- except Exception:
46
- pass
47
-
48
- manager = ConnectionManager()
49
 
50
- @app.websocket("/ws/{match_id}")
51
- async def websocket_endpoint(websocket: WebSocket, match_id: str):
52
- await manager.connect(websocket, match_id)
53
- room = manager.active_connections.get(match_id, [])
54
- # Notify others that someone joined
55
- await manager.broadcast(json.dumps({"type": "join"}), match_id, exclude=websocket)
56
  try:
57
- while True:
58
- data = await websocket.receive_text()
59
- await manager.broadcast(data, match_id, exclude=websocket)
60
- except WebSocketDisconnect:
61
- manager.disconnect(websocket, match_id)
62
- await manager.broadcast(json.dumps({"type": "opponent_disconnected"}), match_id)
63
  except Exception:
64
- manager.disconnect(websocket, match_id)
65
- await manager.broadcast(json.dumps({"type": "opponent_disconnected"}), match_id)
66
-
67
- app.add_middleware(
68
- CORSMiddleware,
69
- allow_origins=["*"],
70
- allow_methods=["*"],
71
- allow_headers=["*"],
72
- )
73
 
74
- # Paths relative to the Docker container
75
- ENGINE_PATH = os.environ.get("ENGINE_PATH", "/app/engine/deepcastle")
76
- NNUE_PATH = os.environ.get("NNUE_PATH", "/app/engine/output.nnue")
77
 
78
  class MoveRequest(BaseModel):
79
  fen: str
80
- time: float = 1.0 # seconds
81
  depth: Optional[int] = None
82
 
83
  class MoveResponse(BaseModel):
84
  bestmove: str
85
- score: float
86
  depth: int
87
  nodes: int
88
  nps: int
89
- pv: str
90
  mate_in: Optional[int] = None
91
  opening: Optional[str] = None
92
 
@@ -100,196 +54,77 @@ class MoveAnalysis(BaseModel):
100
  move_num: int
101
  san: str
102
  best_move: str
103
- classification: str
104
  opening: Optional[str] = None
105
- cpl: float
106
  score_before: float
107
  score_after: float
108
 
109
  class AnalyzeResponse(BaseModel):
110
- accuracy: float
111
- estimated_elo: int
112
  moves: List[MoveAnalysis]
113
- counts: Dict[str, int]
114
-
115
- @app.get("/")
116
- def home():
117
- return {"status": "online", "engine": "Deepcastle Hybrid Neural", "platform": "Hugging Face Spaces"}
118
 
119
- @app.get("/health")
120
- def health():
121
- if not os.path.exists(ENGINE_PATH):
122
- return {"status": "error", "message": "Engine binary not found"}
123
- return {"status": "ok", "engine": "Deepcastle"}
124
-
125
- class EnginePool:
126
- def __init__(self, size=4):
127
- self.size = size
128
- self.engines = asyncio.Queue()
129
- self.all_engines = []
130
-
131
- async def start(self):
132
- print(f"Initializing engine pool with {self.size} processes...")
133
- for i in range(self.size):
134
- try:
135
- engine = await self._create_engine()
136
- await self.engines.put(engine)
137
- self.all_engines.append(engine)
138
- print(f" [+] Engine {i+1}/{self.size} ready.")
139
- # Give the system some room to breathe between processes
140
- await asyncio.sleep(0.5)
141
- except Exception as e:
142
- print(f" [!] Failed to start engine {i+1}: {e}")
143
-
144
- async def _create_engine(self):
145
- if not os.path.exists(ENGINE_PATH):
146
- raise Exception("Engine binary not found")
147
- transport, engine = await chess.engine.popen_uci(ENGINE_PATH)
148
- if os.path.exists(NNUE_PATH):
149
- try:
150
- # Set Hash to 512 as requested, keep Threads to 1 to avoid CPU stalling
151
- await engine.configure({"EvalFile": NNUE_PATH, "Hash": 512, "Threads": 1})
152
- except Exception:
153
- pass
154
- return engine
155
-
156
- @asynccontextmanager
157
- async def acquire(self):
158
- engine = await self.engines.get()
159
- try:
160
- yield engine
161
- finally:
162
- # Check if engine is still alive, if not, restart it
163
- try:
164
- await self.engines.put(engine)
165
- except Exception:
166
- # If engine is dead, we could restart here, but for now just put back
167
- await self.engines.put(await self._create_engine())
168
-
169
- async def stop(self):
170
- print("Shutting down engine pool...")
171
- for engine in self.all_engines:
172
- try:
173
- await engine.quit()
174
- except:
175
- pass
176
-
177
- pool = EnginePool(size=4)
178
 
179
  def get_normalized_score(info) -> tuple[float, Optional[int]]:
180
- """Returns the score from White's perspective in centipawns."""
181
  if "score" not in info:
182
  return 0.0, None
183
  raw = info["score"].white()
184
  if raw.is_mate():
185
  m = raw.mate() or 0
186
  return (10000.0 if m > 0 else -10000.0), m
187
- return raw.score() or 0.0, None
188
-
189
- # ─── Engine Inference Route ────────────────────────────────────────────────────
190
- @app.post("/move", response_model=MoveResponse)
191
- async def get_move(request: MoveRequest):
192
- try:
193
- async with pool.acquire() as engine:
194
- board = chess.Board(request.fen)
195
- limit = chess.engine.Limit(time=request.time, depth=request.depth)
196
-
197
- result = await engine.play(board, limit)
198
- info = await engine.analyse(board, limit)
199
-
200
- # From White's perspective in CP -> converted to Pawns for UI
201
- score_cp, mate_in = get_normalized_score(info)
202
- depth = info.get("depth", 0)
203
- nodes = info.get("nodes", 0)
204
- nps = info.get("nps", 0)
205
-
206
- pv_board = board.copy()
207
- pv_parts = []
208
- for m in info.get("pv", [])[:5]:
209
- if m in pv_board.legal_moves:
210
- try:
211
- pv_parts.append(pv_board.san(m))
212
- pv_board.push(m)
213
- except Exception:
214
- break
215
- else: break
216
- pv = " ".join(pv_parts)
217
-
218
- score_pawns = score_cp / 100.0 if abs(score_cp) < 9900 else (100.0 if score_cp > 0 else -100.0)
219
- board_fen_only = board.fen().split(" ")[0]
220
- opening_name = openings_db.get(board_fen_only)
221
-
222
- return MoveResponse(
223
- bestmove=result.move.uci(),
224
- score=score_pawns,
225
- depth=depth,
226
- nodes=nodes,
227
- nps=nps,
228
- pv=pv,
229
- mate_in=mate_in,
230
- opening=opening_name
231
- )
232
- except Exception as e:
233
- print(f"Error: {e}")
234
- raise HTTPException(status_code=500, detail=str(e))
235
-
236
-
237
- import math
238
- import json
239
- import os
240
- from typing import Optional, List, Tuple
241
-
242
- openings_db = {}
243
- openings_path = os.path.join(os.path.dirname(__file__), "openings.json")
244
- if os.path.exists(openings_path):
245
- try:
246
- with open(openings_path, "r", encoding="utf-8") as f:
247
- openings_db = json.load(f)
248
- except Exception as e:
249
- pass
250
 
251
  def get_win_percentage_from_cp(cp: int) -> float:
 
252
  cp_ceiled = max(-1000, min(1000, cp))
253
  MULTIPLIER = -0.00368208
254
  win_chances = 2.0 / (1.0 + math.exp(MULTIPLIER * cp_ceiled)) - 1.0
255
  return 50.0 + 50.0 * win_chances
256
 
257
  def get_move_accuracy(win_pct_before: float, win_pct_after: float, is_white_move: bool) -> float:
258
- """Lichess-style win%-based per-move accuracy (0–100)."""
259
  if is_white_move:
260
  diff = win_pct_before - win_pct_after
261
  else:
262
  diff = (100.0 - win_pct_before) - (100.0 - win_pct_after)
263
 
 
264
  accuracy = 103.1668 * math.exp(-0.04354 * max(0.0, diff)) - 3.1669
265
  return max(0.0, min(100.0, accuracy))
266
 
267
  def get_win_percentage(info: dict) -> float:
 
268
  score = info.get("score")
269
- if not score:
270
- return 50.0
271
  white_score = score.white()
272
  if white_score.is_mate():
273
  mate_val = white_score.mate()
274
  return 100.0 if mate_val > 0 else 0.0
275
- return get_win_percentage_from_cp(white_score.score())
276
 
277
- def is_losing_or_alt_winning(pos_win_pct: float, alt_win_pct: float, is_white_move: bool) -> bool:
 
 
 
278
  is_losing = pos_win_pct < 50.0 if is_white_move else pos_win_pct > 50.0
279
  is_alt_winning = alt_win_pct > 97.0 if is_white_move else alt_win_pct < 3.0
280
  return is_losing or is_alt_winning
281
 
282
- def get_has_changed_outcome(last_win_pct: float, pos_win_pct: float, is_white_move: bool) -> bool:
283
  diff = (pos_win_pct - last_win_pct) * (1 if is_white_move else -1)
284
  return diff > 10.0 and ((last_win_pct < 50.0 and pos_win_pct > 50.0) or (last_win_pct > 50.0 and pos_win_pct < 50.0))
285
 
286
- def get_is_only_good_move(pos_win_pct: float, alt_win_pct: float, is_white_move: bool) -> bool:
287
  diff = (pos_win_pct - alt_win_pct) * (1 if is_white_move else -1)
288
  return diff > 10.0
289
 
290
- def is_simple_recapture(fen_two_moves_ago: str, previous_move: chess.Move, played_move: chess.Move) -> bool:
291
- if previous_move.to_square != played_move.to_square:
292
- return False
293
  b = chess.Board(fen_two_moves_ago)
294
  return b.piece_at(previous_move.to_square) is not None
295
 
@@ -299,229 +134,232 @@ def get_material_difference(board: chess.Board) -> int:
299
  b = sum(values.get(p.piece_type, 0) for p in board.piece_map().values() if p.color == chess.BLACK)
300
  return w - b
301
 
302
- def get_is_piece_sacrifice(board: chess.Board, played_move: chess.Move, best_pv: list) -> bool:
303
- if not best_pv:
304
- return False
305
  start_diff = get_material_difference(board)
306
  white_to_play = board.turn == chess.WHITE
307
-
308
  sim_board = board.copy()
309
  moves = [played_move] + best_pv
310
- if len(moves) % 2 == 1:
311
- moves = moves[:-1]
312
-
313
- captured_w = []
314
- captured_b = []
315
- non_capturing = 1
316
-
317
  for m in moves:
318
  if m in sim_board.legal_moves:
319
  captured_piece = sim_board.piece_at(m.to_square)
320
- if sim_board.is_en_passant(m):
321
- captured_piece = chess.Piece(chess.PAWN, not sim_board.turn)
322
-
323
  if captured_piece:
324
- if sim_board.turn == chess.WHITE:
325
- captured_b.append(captured_piece.piece_type)
326
- else:
327
- captured_w.append(captured_piece.piece_type)
328
- non_capturing = 1
329
- else:
330
- non_capturing -= 1
331
- if non_capturing < 0:
332
- break
333
  sim_board.push(m)
334
- else:
335
- break
336
-
337
  for p in captured_w[:]:
338
  if p in captured_b:
339
- captured_w.remove(p)
340
- captured_b.remove(p)
341
-
342
  if abs(len(captured_w) - len(captured_b)) <= 1 and all(p == chess.PAWN for p in captured_w + captured_b):
343
  return False
344
-
345
  end_diff = get_material_difference(sim_board)
346
  mat_diff = end_diff - start_diff
347
  player_rel = mat_diff if white_to_play else -mat_diff
348
-
349
  return player_rel < 0
350
 
351
- def get_move_classification(
352
- last_win_pct: float,
353
- pos_win_pct: float,
354
- is_white_move: bool,
355
- played_move: chess.Move,
356
- best_move_before: chess.Move,
357
- alt_win_pct: Optional[float],
358
- fen_two_moves_ago: Optional[str],
359
- uci_next_two_moves: Optional[Tuple[chess.Move, chess.Move]],
360
- board_before_move: chess.Board,
361
- best_pv_after: list
362
- ) -> str:
363
  diff = (pos_win_pct - last_win_pct) * (1 if is_white_move else -1)
364
-
365
  if alt_win_pct is not None and diff >= -2.0:
366
  if get_is_piece_sacrifice(board_before_move, played_move, best_pv_after):
367
- if not is_losing_or_alt_winning(pos_win_pct, alt_win_pct, is_white_move):
368
- return "Brilliant"
369
-
370
- if alt_win_pct is not None and diff >= -2.0:
371
  is_recapture = False
372
  if fen_two_moves_ago and uci_next_two_moves:
373
  is_recapture = is_simple_recapture(fen_two_moves_ago, uci_next_two_moves[0], uci_next_two_moves[1])
374
-
375
  if not is_recapture and not is_losing_or_alt_winning(pos_win_pct, alt_win_pct, is_white_move):
376
- if get_has_changed_outcome(last_win_pct, pos_win_pct, is_white_move) or get_is_only_good_move(pos_win_pct, alt_win_pct, is_white_move):
377
- return "Great"
378
-
379
- if best_move_before and played_move == best_move_before:
380
- return "Best"
381
-
382
  if diff < -20.0: return "Blunder"
383
  if diff < -10.0: return "Mistake"
384
  if diff < -5.0: return "Inaccuracy"
385
  if diff < -2.0: return "Good"
386
  return "Excellent"
387
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
388
  @app.post("/analyze-game", response_model=AnalyzeResponse)
389
  async def analyze_game(request: AnalyzeRequest):
 
390
  try:
391
  async with pool.acquire() as engine:
392
  board = chess.Board(request.start_fen) if request.start_fen else chess.Board()
393
  limit = chess.engine.Limit(time=request.time_per_move)
394
-
395
  analysis_results = []
396
  infos_before = await engine.analyse(board, limit, multipv=2)
397
  infos_before = infos_before if isinstance(infos_before, list) else [infos_before]
398
-
399
- counts = {
400
- "Book": 0, "Brilliant": 0, "Great": 0, "Best": 0,
401
- "Excellent": 0, "Good": 0, "Inaccuracy": 0,
402
- "Mistake": 0, "Blunder": 0
403
- }
404
-
405
  player_is_white = (request.player_color.lower() == "white")
406
- fen_history = [board.fen()]
407
- move_history = []
408
- player_move_accuracies: List[float] = []
409
- player_cpls: List[float] = [] # keep for estimated_elo
410
  current_score, _ = get_normalized_score(infos_before[0])
411
-
412
  for i, san_move in enumerate(request.moves):
413
  is_white_turn = board.turn == chess.WHITE
414
  is_player_turn = is_white_turn if player_is_white else not is_white_turn
415
-
416
  score_before = current_score
417
- try:
418
- move = board.parse_san(san_move)
419
- except Exception:
420
- break
421
-
422
  info_dict = infos_before[0]
423
- pv_list = info_dict.get("pv", [])
424
- best_move_before = pv_list[0] if pv_list else None
425
-
426
- score_before, _ = get_normalized_score(info_dict)
427
  win_pct_before = get_win_percentage(info_dict)
428
- alt_win_pct_before: Optional[float] = None
429
  if len(infos_before) > 1:
430
  for line in infos_before:
431
  if line.get("pv") and line.get("pv")[0] != move:
432
- alt_win_pct_before = get_win_percentage(line)
433
- break
434
-
435
  board_before_move = board.copy()
436
- board.push(move)
437
-
438
- move_history.append(move)
439
- fen_history.append(board.fen())
440
-
441
  infos_after_raw = await engine.analyse(board, limit, multipv=2)
442
- infos_after: List[dict] = infos_after_raw if isinstance(infos_after_raw, list) else [infos_after_raw]
443
-
444
- info_after_dict: dict = infos_after[0]
445
- win_pct_after = get_win_percentage(info_after_dict)
446
- score_after, _ = get_normalized_score(info_after_dict)
447
  current_score = score_after
448
- best_pv_after = info_after_dict.get("pv", [])
449
-
450
- fen_two_moves_ago = None
451
- uci_next_two_moves = None
452
- if len(move_history) >= 2:
453
- fen_two_moves_ago = fen_history[-3]
454
- uci_next_two_moves = (move_history[-2], move_history[-1])
455
-
456
- cls = "Book"
457
- opening_name = None
458
  board_fen_only = board.fen().split(" ")[0]
459
  if board_fen_only in openings_db:
460
- cls = "Book"
461
- opening_name = openings_db[board_fen_only]
462
  else:
463
- cls = get_move_classification(
464
- last_win_pct=win_pct_before,
465
- pos_win_pct=win_pct_after,
466
- is_white_move=is_white_turn,
467
- played_move=move,
468
- best_move_before=best_move_before,
469
- alt_win_pct=alt_win_pct_before,
470
- fen_two_moves_ago=fen_two_moves_ago,
471
- uci_next_two_moves=uci_next_two_moves,
472
- board_before_move=board_before_move,
473
- best_pv_after=best_pv_after
474
- )
475
-
476
  move_gain = score_after - score_before if is_white_turn else score_before - score_after
477
  cpl = max(0.0, min(1000.0, -move_gain))
478
-
479
- # Lichess-style per-move accuracy using win%
480
  move_acc = get_move_accuracy(win_pct_before, win_pct_after, is_white_turn)
481
-
482
  if is_player_turn:
483
- player_move_accuracies.append(move_acc)
484
- player_cpls.append(cpl)
485
  counts[cls] = counts.get(cls, 0) + 1
486
-
487
- analysis_results.append(MoveAnalysis(
488
- move_num=i+1,
489
- san=san_move,
490
- fen=board.fen(),
491
- classification=cls,
492
- cpl=float(cpl),
493
- score_before=float(score_before / 100.0),
494
- score_after=float(score_after / 100.0),
495
- best_move=best_move_before.uci() if best_move_before else "",
496
- opening=opening_name
497
- ))
498
  infos_before = infos_after
499
-
500
- # NEW β€” Lichess win%-based accuracy
501
  if player_move_accuracies:
502
- # Lichess uses harmonic mean blended with arithmetic mean
503
- arithmetic_mean = sum(player_move_accuracies) / len(player_move_accuracies)
504
- harmonic_mean = len(player_move_accuracies) / sum(1.0 / max(a, 0.1) for a in player_move_accuracies)
505
- accuracy = (arithmetic_mean + harmonic_mean) / 2.0
506
- else:
507
- accuracy = 0.0
508
-
509
- # Elo from avg CPL using exponential decay calibrated to your 3600 engine
510
- # Toughened constant from -0.01 to -0.015 for more realistic scoring
511
  avg_cpl = sum(player_cpls) / max(1, len(player_cpls))
512
  estimated_elo = int(max(400, min(3600, round(3600 * math.exp(-0.015 * avg_cpl)))))
513
-
514
- return AnalyzeResponse(
515
- accuracy=round(accuracy, 1),
516
- estimated_elo=estimated_elo,
517
- moves=analysis_results,
518
- counts=counts
519
- )
520
  except Exception as e:
521
- print(f"Analysis Error: {e}")
522
  raise HTTPException(status_code=500, detail=str(e))
523
 
524
-
525
  if __name__ == "__main__":
526
  import uvicorn
527
  uvicorn.run(app, host="0.0.0.0", port=7860)
 
1
  from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
2
  from fastapi.middleware.cors import CORSMiddleware
3
  from pydantic import BaseModel
4
+ from typing import Optional, List, Dict, Tuple
5
  import os
6
  import math
7
  import chess
 
10
  import json
11
  from contextlib import asynccontextmanager
12
 
13
+ # ─── 1. SERVER CONFIGURATION ───────────────────────────────────────────────────────────
14
+ # These variables define the paths for the engine and the neural network weights.
15
+ ENGINE_PATH = os.environ.get("ENGINE_PATH", "/app/engine/deepcastle")
16
+ NNUE_PATH = os.environ.get("NNUE_PATH", "/app/engine/output.nnue")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
17
 
18
+ # ─── 2. OPENINGS DATABASE ─────────────────────────────────────────────────────────────
19
+ # This loads a pre-populated opening book so the bot can respond instantly during theory.
20
+ openings_db = {}
21
+ openings_path = os.path.join(os.path.dirname(__file__), "openings.json")
22
+ if os.path.exists(openings_path):
 
23
  try:
24
+ with open(openings_path, "r", encoding="utf-8") as f:
25
+ openings_db = json.load(f)
 
 
 
 
26
  except Exception:
27
+ pass
 
 
 
 
 
 
 
 
28
 
29
+ # ─── 3. PYDANTIC MODELS (Data Structures) ───────────────────────────────────────────
30
+ # Used for input validation and defining the exact format of the JSON responses.
 
31
 
32
  class MoveRequest(BaseModel):
33
  fen: str
34
+ time: float = 1.0 # Seconds the engine will search
35
  depth: Optional[int] = None
36
 
37
  class MoveResponse(BaseModel):
38
  bestmove: str
39
+ score: float # Pawns (+1.0 for White, -1.0 for Black)
40
  depth: int
41
  nodes: int
42
  nps: int
43
+ pv: str # Principal Variation (best sequence of moves)
44
  mate_in: Optional[int] = None
45
  opening: Optional[str] = None
46
 
 
54
  move_num: int
55
  san: str
56
  best_move: str
57
+ classification: str # "Brilliant", "Great", "Best", etc.
58
  opening: Optional[str] = None
59
+ cpl: float # Centipawn Loss (0 = perfect)
60
  score_before: float
61
  score_after: float
62
 
63
  class AnalyzeResponse(BaseModel):
64
+ accuracy: float # Lichess-style (0-100)
65
+ estimated_elo: int # Calibrated Performance Rating
66
  moves: List[MoveAnalysis]
67
+ counts: Dict[str, int] # Count of each classification (2 Blunders, etc.)
 
 
 
 
68
 
69
+ # ─── 4. CORE MATH & LOGIC HELPERS ──────────────────────────────────────────────────
70
+ # These functions calculate win percentages, accuracy, and calibrated Elo scores.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71
 
72
  def get_normalized_score(info) -> tuple[float, Optional[int]]:
73
+ """Normalizes UCI score to centipawns from White's perspective."""
74
  if "score" not in info:
75
  return 0.0, None
76
  raw = info["score"].white()
77
  if raw.is_mate():
78
  m = raw.mate() or 0
79
  return (10000.0 if m > 0 else -10000.0), m
80
+ return float(raw.score() or 0.0), None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
 
82
  def get_win_percentage_from_cp(cp: int) -> float:
83
+ """AS-UCI win chance formula (0.0 to 100.0)."""
84
  cp_ceiled = max(-1000, min(1000, cp))
85
  MULTIPLIER = -0.00368208
86
  win_chances = 2.0 / (1.0 + math.exp(MULTIPLIER * cp_ceiled)) - 1.0
87
  return 50.0 + 50.0 * win_chances
88
 
89
  def get_move_accuracy(win_pct_before: float, win_pct_after: float, is_white_move: bool) -> float:
90
+ """Lichess-style per-move accuracy calculation using exponential decay."""
91
  if is_white_move:
92
  diff = win_pct_before - win_pct_after
93
  else:
94
  diff = (100.0 - win_pct_before) - (100.0 - win_pct_after)
95
 
96
+ # Formula: 103.16 * exp(-0.04 * diff) - 3.16
97
  accuracy = 103.1668 * math.exp(-0.04354 * max(0.0, diff)) - 3.1669
98
  return max(0.0, min(100.0, accuracy))
99
 
100
  def get_win_percentage(info: dict) -> float:
101
+ """Extracts win percentage from engine info dictionary."""
102
  score = info.get("score")
103
+ if not score: return 50.0
 
104
  white_score = score.white()
105
  if white_score.is_mate():
106
  mate_val = white_score.mate()
107
  return 100.0 if mate_val > 0 else 0.0
108
+ return get_win_percentage_from_cp(white_score.score() or 0)
109
 
110
+ # ─── 5. MOVE CLASSIFICATION HELPERS ───────────────────────────────────────────────
111
+ # These identify Brilliant moves, Sacrifices, Blunders, and Book moves.
112
+
113
+ def is_losing_or_alt_winning(pos_win_pct, alt_win_pct, is_white_move) -> bool:
114
  is_losing = pos_win_pct < 50.0 if is_white_move else pos_win_pct > 50.0
115
  is_alt_winning = alt_win_pct > 97.0 if is_white_move else alt_win_pct < 3.0
116
  return is_losing or is_alt_winning
117
 
118
+ def get_has_changed_outcome(last_win_pct, pos_win_pct, is_white_move) -> bool:
119
  diff = (pos_win_pct - last_win_pct) * (1 if is_white_move else -1)
120
  return diff > 10.0 and ((last_win_pct < 50.0 and pos_win_pct > 50.0) or (last_win_pct > 50.0 and pos_win_pct < 50.0))
121
 
122
+ def get_is_only_good_move(pos_win_pct, alt_win_pct, is_white_move) -> bool:
123
  diff = (pos_win_pct - alt_win_pct) * (1 if is_white_move else -1)
124
  return diff > 10.0
125
 
126
+ def is_simple_recapture(fen_two_moves_ago, previous_move, played_move) -> bool:
127
+ if previous_move.to_square != played_move.to_square: return False
 
128
  b = chess.Board(fen_two_moves_ago)
129
  return b.piece_at(previous_move.to_square) is not None
130
 
 
134
  b = sum(values.get(p.piece_type, 0) for p in board.piece_map().values() if p.color == chess.BLACK)
135
  return w - b
136
 
137
+ def get_is_piece_sacrifice(board: chess.Board, played_move, best_pv: list) -> bool:
138
+ """Checks if a move is a genuine piece sacrifice by looking at the resulting PV material."""
139
+ if not best_pv: return False
140
  start_diff = get_material_difference(board)
141
  white_to_play = board.turn == chess.WHITE
 
142
  sim_board = board.copy()
143
  moves = [played_move] + best_pv
144
+ if len(moves) % 2 == 1: moves = moves[:-1]
145
+ captured_w, captured_b = [], []
 
 
 
 
 
146
  for m in moves:
147
  if m in sim_board.legal_moves:
148
  captured_piece = sim_board.piece_at(m.to_square)
149
+ if sim_board.is_en_passant(m): captured_piece = chess.Piece(chess.PAWN, not sim_board.turn)
 
 
150
  if captured_piece:
151
+ if sim_board.turn == chess.WHITE: captured_b.append(captured_piece.piece_type)
152
+ else: captured_w.append(captured_piece.piece_type)
 
 
 
 
 
 
 
153
  sim_board.push(m)
154
+ else: break
 
 
155
  for p in captured_w[:]:
156
  if p in captured_b:
157
+ captured_w.remove(p); captured_b.remove(p)
 
 
158
  if abs(len(captured_w) - len(captured_b)) <= 1 and all(p == chess.PAWN for p in captured_w + captured_b):
159
  return False
 
160
  end_diff = get_material_difference(sim_board)
161
  mat_diff = end_diff - start_diff
162
  player_rel = mat_diff if white_to_play else -mat_diff
 
163
  return player_rel < 0
164
 
165
+ def get_move_classification(last_win_pct, pos_win_pct, is_white_move, played_move, best_move_before, alt_win_pct, fen_two_moves_ago, uci_next_two_moves, board_before_move, best_pv_after) -> str:
166
+ """Classifies a move based on win% change and engine principal variation."""
 
 
 
 
 
 
 
 
 
 
167
  diff = (pos_win_pct - last_win_pct) * (1 if is_white_move else -1)
 
168
  if alt_win_pct is not None and diff >= -2.0:
169
  if get_is_piece_sacrifice(board_before_move, played_move, best_pv_after):
170
+ if not is_losing_or_alt_winning(pos_win_pct, alt_win_pct, is_white_move): return "Brilliant"
 
 
 
171
  is_recapture = False
172
  if fen_two_moves_ago and uci_next_two_moves:
173
  is_recapture = is_simple_recapture(fen_two_moves_ago, uci_next_two_moves[0], uci_next_two_moves[1])
 
174
  if not is_recapture and not is_losing_or_alt_winning(pos_win_pct, alt_win_pct, is_white_move):
175
+ if get_has_changed_outcome(last_win_pct, pos_win_pct, is_white_move) or get_is_only_good_move(pos_win_pct, alt_win_pct, is_white_move): return "Great"
176
+ if best_move_before and played_move == best_move_before: return "Best"
 
 
 
 
177
  if diff < -20.0: return "Blunder"
178
  if diff < -10.0: return "Mistake"
179
  if diff < -5.0: return "Inaccuracy"
180
  if diff < -2.0: return "Good"
181
  return "Excellent"
182
 
183
+ # ─── 6. MULTIPLAYER CONNECTION MANAGER ───────────────────────────────────────────────
184
+ # Handles websocket routing for playing against friends in real-time rooms.
185
+ class ConnectionManager:
186
+ def __init__(self):
187
+ self.active_connections: Dict[str, List[WebSocket]] = {}
188
+ async def connect(self, websocket: WebSocket, match_id: str):
189
+ await websocket.accept()
190
+ if match_id not in self.active_connections: self.active_connections[match_id] = []
191
+ self.active_connections[match_id].append(websocket)
192
+ def disconnect(self, websocket: WebSocket, match_id: str):
193
+ if match_id in self.active_connections:
194
+ if websocket in self.active_connections[match_id]: self.active_connections[match_id].remove(websocket)
195
+ if not self.active_connections[match_id]: del self.active_connections[match_id]
196
+ async def broadcast(self, message: str, match_id: str, exclude: WebSocket = None):
197
+ if match_id in self.active_connections:
198
+ for connection in self.active_connections[match_id]:
199
+ if connection != exclude:
200
+ try: await connection.send_text(message)
201
+ except Exception: pass
202
+
203
+ manager = ConnectionManager()
204
+
205
+ # ─── 7. ENGINE POOL (Orchestrator) ──────────────────────────────────────────────────
206
+ # Maintains 4 persistent engine processes for high-concurrency instant response.
207
+ class EnginePool:
208
+ def __init__(self, size=4):
209
+ self.size = size
210
+ self.engines = asyncio.Queue()
211
+ self.all_engines = []
212
+ async def start(self):
213
+ print(f"Initializing bulletproof engine pool with {self.size} processes...")
214
+ for i in range(self.size):
215
+ try:
216
+ engine = await self._create_engine()
217
+ await self.engines.put(engine)
218
+ self.all_engines.append(engine)
219
+ print(f" [+] Engine {i+1}/{self.size} ready.")
220
+ await asyncio.sleep(0.5) # Prevent CPU thrashing on boot
221
+ except Exception as e:
222
+ print(f" [!] Failed to start engine {i+1}: {e}")
223
+ async def _create_engine(self):
224
+ if not os.path.exists(ENGINE_PATH): raise Exception("Engine binary not found")
225
+ transport, engine = await chess.engine.popen_uci(ENGINE_PATH)
226
+ if os.path.exists(NNUE_PATH):
227
+ try: await engine.configure({"EvalFile": NNUE_PATH, "Hash": 512, "Threads": 1})
228
+ except Exception: pass
229
+ return engine
230
+ @asynccontextmanager
231
+ async def acquire(self):
232
+ """Yields an engine from the pool and ensures it's returned or restarted."""
233
+ engine = await self.engines.get()
234
+ try: yield engine
235
+ finally:
236
+ try: await self.engines.put(engine)
237
+ except Exception: await self.engines.put(await self._create_engine())
238
+ async def stop(self):
239
+ for engine in self.all_engines:
240
+ try: await engine.quit()
241
+ except: pass
242
+
243
+ pool = EnginePool(size=4)
244
+
245
+ # ─── 8. FASTAPI LIFESPAN CONTROLLER ──────────────────────────────────────────────────
246
+ # Controls what happens when the server starts and stops on Hugging Face.
247
+ @asynccontextmanager
248
+ async def lifespan(app: FastAPI):
249
+ # Initialize the engine pool first
250
+ await pool.start()
251
+ yield
252
+ # Cleanup on shutdown
253
+ await pool.stop()
254
+
255
+ app = FastAPI(title="Deepcastle Engine API", lifespan=lifespan)
256
+ app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
257
+
258
+ # ─── 9. API ROUTES ──────────────────────────────────────────────────────────────────
259
+
260
+ @app.websocket("/ws/{match_id}")
261
+ async def websocket_endpoint(websocket: WebSocket, match_id: str):
262
+ """Handles real-time friend play relay."""
263
+ await manager.connect(websocket, match_id)
264
+ await manager.broadcast(json.dumps({"type": "join"}), match_id, exclude=websocket)
265
+ try:
266
+ while True:
267
+ data = await websocket.receive_text()
268
+ await manager.broadcast(data, match_id, exclude=websocket)
269
+ except WebSocketDisconnect:
270
+ manager.disconnect(websocket, match_id)
271
+ await manager.broadcast(json.dumps({"type": "opponent_disconnected"}), match_id)
272
+ except Exception:
273
+ manager.disconnect(websocket, match_id)
274
+
275
+ @app.get("/")
276
+ def home():
277
+ return {"status": "online", "engine": "Deepcastle Pro", "platform": "Hugging Face"}
278
+
279
+ @app.post("/move", response_model=MoveResponse)
280
+ async def get_move(request: MoveRequest):
281
+ """Fetches the best engine move for a given FEN position."""
282
+ try:
283
+ async with pool.acquire() as engine:
284
+ board = chess.Board(request.fen)
285
+ limit = chess.engine.Limit(time=request.time, depth=request.depth)
286
+ result = await engine.play(board, limit)
287
+ info = await engine.analyse(board, limit)
288
+ score_cp, mate_in = get_normalized_score(info)
289
+ pv_board = board.copy()
290
+ pv_parts = []
291
+ for m in info.get("pv", [])[:5]:
292
+ if m in pv_board.legal_moves:
293
+ try:
294
+ pv_parts.append(pv_board.san(m)); pv_board.push(m)
295
+ except Exception: break
296
+ else: break
297
+ score_pawns = score_cp / 100.0 if abs(score_cp) < 9900 else (100.0 if score_cp > 0 else -100.0)
298
+ return MoveResponse(bestmove=result.move.uci(), score=score_pawns, depth=info.get("depth", 0), nodes=info.get("nodes", 0), nps=info.get("nps", 0), pv=" ".join(pv_parts), mate_in=mate_in, opening=openings_db.get(board.fen().split(" ")[0]))
299
+ except Exception as e:
300
+ raise HTTPException(status_code=500, detail=str(e))
301
+
302
  @app.post("/analyze-game", response_model=AnalyzeResponse)
303
  async def analyze_game(request: AnalyzeRequest):
304
+ """Performs deep game analysis, classifies moves, and estimates performance Elo."""
305
  try:
306
  async with pool.acquire() as engine:
307
  board = chess.Board(request.start_fen) if request.start_fen else chess.Board()
308
  limit = chess.engine.Limit(time=request.time_per_move)
 
309
  analysis_results = []
310
  infos_before = await engine.analyse(board, limit, multipv=2)
311
  infos_before = infos_before if isinstance(infos_before, list) else [infos_before]
312
+ counts = {"Book": 0, "Brilliant": 0, "Great": 0, "Best": 0, "Excellent": 0, "Good": 0, "Inaccuracy": 0, "Mistake": 0, "Blunder": 0}
 
 
 
 
 
 
313
  player_is_white = (request.player_color.lower() == "white")
314
+ fen_history, move_history = [board.fen()], []
315
+ player_move_accuracies, player_cpls = [], []
 
 
316
  current_score, _ = get_normalized_score(infos_before[0])
 
317
  for i, san_move in enumerate(request.moves):
318
  is_white_turn = board.turn == chess.WHITE
319
  is_player_turn = is_white_turn if player_is_white else not is_white_turn
 
320
  score_before = current_score
321
+ try: move = board.parse_san(san_move)
322
+ except Exception: break
 
 
 
323
  info_dict = infos_before[0]
324
+ best_move_before = info_dict.get("pv", [None])[0]
 
 
 
325
  win_pct_before = get_win_percentage(info_dict)
326
+ alt_win_pct_before = None
327
  if len(infos_before) > 1:
328
  for line in infos_before:
329
  if line.get("pv") and line.get("pv")[0] != move:
330
+ alt_win_pct_before = get_win_percentage(line); break
 
 
331
  board_before_move = board.copy()
332
+ board.push(move); move_history.append(move); fen_history.append(board.fen())
 
 
 
 
333
  infos_after_raw = await engine.analyse(board, limit, multipv=2)
334
+ infos_after = infos_after_raw if isinstance(infos_after_raw, list) else [infos_after_raw]
335
+ info_after_dict = infos_after[0]
336
+ win_pct_after, (score_after, _) = get_win_percentage(info_after_dict), get_normalized_score(info_after_dict)
 
 
337
  current_score = score_after
338
+ fen_two_moves_ago = fen_history[-3] if len(move_history) >= 2 else None
339
+ uci_next_two_moves = (move_history[-2], move_history[-1]) if len(move_history) >= 2 else None
 
 
 
 
 
 
 
 
340
  board_fen_only = board.fen().split(" ")[0]
341
  if board_fen_only in openings_db:
342
+ cls, opening_name = "Book", openings_db[board_fen_only]
 
343
  else:
344
+ cls, opening_name = get_move_classification(win_pct_before, win_pct_after, is_white_turn, move, best_move_before, alt_win_pct_before, fen_two_moves_ago, uci_next_two_moves, board_before_move, info_after_dict.get("pv", [])), None
 
 
 
 
 
 
 
 
 
 
 
 
345
  move_gain = score_after - score_before if is_white_turn else score_before - score_after
346
  cpl = max(0.0, min(1000.0, -move_gain))
 
 
347
  move_acc = get_move_accuracy(win_pct_before, win_pct_after, is_white_turn)
 
348
  if is_player_turn:
349
+ player_move_accuracies.append(move_acc); player_cpls.append(cpl)
 
350
  counts[cls] = counts.get(cls, 0) + 1
351
+ analysis_results.append(MoveAnalysis(move_num=i+1, san=san_move, best_move=best_move_before.uci() if best_move_before else "", classification=cls, opening=opening_name, cpl=float(cpl), score_before=float(score_before / 100.0), score_after=float(score_after / 100.0)))
 
 
 
 
 
 
 
 
 
 
 
352
  infos_before = infos_after
 
 
353
  if player_move_accuracies:
354
+ accuracy = ( (sum(player_move_accuracies) / len(player_move_accuracies)) + (len(player_move_accuracies) / sum(1.0 / max(a, 0.1) for a in player_move_accuracies)) ) / 2.0
355
+ else: accuracy = 0.0
 
 
 
 
 
 
 
356
  avg_cpl = sum(player_cpls) / max(1, len(player_cpls))
357
  estimated_elo = int(max(400, min(3600, round(3600 * math.exp(-0.015 * avg_cpl)))))
358
+ return AnalyzeResponse(accuracy=round(accuracy, 1), estimated_elo=estimated_elo, moves=analysis_results, counts=counts)
 
 
 
 
 
 
359
  except Exception as e:
 
360
  raise HTTPException(status_code=500, detail=str(e))
361
 
362
+ # ─── 10. MAIN ENTRY POINT ────────────────────────────────────────────────────────────
363
  if __name__ == "__main__":
364
  import uvicorn
365
  uvicorn.run(app, host="0.0.0.0", port=7860)