Amogh1221 commited on
Commit
44da706
Β·
verified Β·
1 Parent(s): 5f5fb8a

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +373 -240
main.py CHANGED
@@ -1,46 +1,90 @@
1
  from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
2
  from fastapi.middleware.cors import CORSMiddleware
3
  from pydantic import BaseModel
4
- from typing import Optional, List, Dict, Tuple
5
  import os
6
  import math
7
  import chess
8
  import chess.engine
9
  import asyncio
10
  import json
11
- from contextlib import asynccontextmanager
12
 
13
- # ─── 1. SERVER CONFIGURATION ───────────────────────────────────────────────────────────
14
- # These variables define the paths for the engine and the neural network weights.
15
- ENGINE_PATH = os.environ.get("ENGINE_PATH", "/app/engine/deepcastle")
16
- NNUE_PATH = os.environ.get("NNUE_PATH", "/app/engine/output.nnue")
17
 
18
- # ─── 2. OPENINGS DATABASE ─────────────────────────────────────────────────────────────
19
- # This loads a pre-populated opening book so the bot can respond instantly during theory.
20
- openings_db = {}
21
- openings_path = os.path.join(os.path.dirname(__file__), "openings.json")
22
- if os.path.exists(openings_path):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
  try:
24
- with open(openings_path, "r", encoding="utf-8") as f:
25
- openings_db = json.load(f)
 
 
 
 
 
 
26
  except Exception:
27
- pass
 
 
28
 
29
- # ─── 3. PYDANTIC MODELS (Data Structures) ───────────────────────────────────────────
30
- # Used for input validation and defining the exact format of the JSON responses.
 
 
 
 
 
 
 
 
 
31
 
32
  class MoveRequest(BaseModel):
33
  fen: str
34
- time: float = 1.0 # Seconds the engine will search
35
  depth: Optional[int] = None
36
 
37
  class MoveResponse(BaseModel):
38
  bestmove: str
39
- score: float # Pawns (+1.0 for White, -1.0 for Black)
40
  depth: int
41
  nodes: int
42
  nps: int
43
- pv: str # Principal Variation (best sequence of moves)
44
  mate_in: Optional[int] = None
45
  opening: Optional[str] = None
46
 
@@ -54,77 +98,156 @@ class MoveAnalysis(BaseModel):
54
  move_num: int
55
  san: str
56
  best_move: str
57
- classification: str # "Brilliant", "Great", "Best", etc.
58
  opening: Optional[str] = None
59
- cpl: float # Centipawn Loss (0 = perfect)
60
  score_before: float
61
  score_after: float
62
 
63
  class AnalyzeResponse(BaseModel):
64
- accuracy: float # Lichess-style (0-100)
65
- estimated_elo: int # Calibrated Performance Rating
66
  moves: List[MoveAnalysis]
67
- counts: Dict[str, int] # Count of each classification (2 Blunders, etc.)
68
 
69
- # ─── 4. CORE MATH & LOGIC HELPERS ──────────────────────────────────────────────────
70
- # These functions calculate win percentages, accuracy, and calibrated Elo scores.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71
 
72
  def get_normalized_score(info) -> tuple[float, Optional[int]]:
73
- """Normalizes UCI score to centipawns from White's perspective."""
74
  if "score" not in info:
75
  return 0.0, None
76
  raw = info["score"].white()
77
  if raw.is_mate():
78
  m = raw.mate() or 0
79
  return (10000.0 if m > 0 else -10000.0), m
80
- return float(raw.score() or 0.0), None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
 
82
  def get_win_percentage_from_cp(cp: int) -> float:
83
- """AS-UCI win chance formula (0.0 to 100.0)."""
84
  cp_ceiled = max(-1000, min(1000, cp))
85
  MULTIPLIER = -0.00368208
86
  win_chances = 2.0 / (1.0 + math.exp(MULTIPLIER * cp_ceiled)) - 1.0
87
  return 50.0 + 50.0 * win_chances
88
 
89
- def get_move_accuracy(win_pct_before: float, win_pct_after: float, is_white_move: bool) -> float:
90
- """Lichess-style per-move accuracy calculation using exponential decay."""
91
- if is_white_move:
92
- diff = win_pct_before - win_pct_after
93
- else:
94
- diff = (100.0 - win_pct_before) - (100.0 - win_pct_after)
95
-
96
- # Formula: 103.16 * exp(-0.04 * diff) - 3.16
97
- accuracy = 103.1668 * math.exp(-0.04354 * max(0.0, diff)) - 3.1669
98
- return max(0.0, min(100.0, accuracy))
99
-
100
  def get_win_percentage(info: dict) -> float:
101
- """Extracts win percentage from engine info dictionary."""
102
  score = info.get("score")
103
- if not score: return 50.0
 
104
  white_score = score.white()
105
  if white_score.is_mate():
106
  mate_val = white_score.mate()
107
  return 100.0 if mate_val > 0 else 0.0
108
- return get_win_percentage_from_cp(white_score.score() or 0)
109
-
110
- # ─── 5. MOVE CLASSIFICATION HELPERS ───────────────────────────────────────────────
111
- # These identify Brilliant moves, Sacrifices, Blunders, and Book moves.
112
 
113
- def is_losing_or_alt_winning(pos_win_pct, alt_win_pct, is_white_move) -> bool:
114
  is_losing = pos_win_pct < 50.0 if is_white_move else pos_win_pct > 50.0
115
  is_alt_winning = alt_win_pct > 97.0 if is_white_move else alt_win_pct < 3.0
116
  return is_losing or is_alt_winning
117
 
118
- def get_has_changed_outcome(last_win_pct, pos_win_pct, is_white_move) -> bool:
119
  diff = (pos_win_pct - last_win_pct) * (1 if is_white_move else -1)
120
  return diff > 10.0 and ((last_win_pct < 50.0 and pos_win_pct > 50.0) or (last_win_pct > 50.0 and pos_win_pct < 50.0))
121
 
122
- def get_is_only_good_move(pos_win_pct, alt_win_pct, is_white_move) -> bool:
123
  diff = (pos_win_pct - alt_win_pct) * (1 if is_white_move else -1)
124
  return diff > 10.0
125
 
126
- def is_simple_recapture(fen_two_moves_ago, previous_move, played_move) -> bool:
127
- if previous_move.to_square != played_move.to_square: return False
 
128
  b = chess.Board(fen_two_moves_ago)
129
  return b.piece_at(previous_move.to_square) is not None
130
 
@@ -134,232 +257,242 @@ def get_material_difference(board: chess.Board) -> int:
134
  b = sum(values.get(p.piece_type, 0) for p in board.piece_map().values() if p.color == chess.BLACK)
135
  return w - b
136
 
137
- def get_is_piece_sacrifice(board: chess.Board, played_move, best_pv: list) -> bool:
138
- """Checks if a move is a genuine piece sacrifice by looking at the resulting PV material."""
139
- if not best_pv: return False
140
  start_diff = get_material_difference(board)
141
  white_to_play = board.turn == chess.WHITE
 
142
  sim_board = board.copy()
143
  moves = [played_move] + best_pv
144
- if len(moves) % 2 == 1: moves = moves[:-1]
145
- captured_w, captured_b = [], []
 
 
 
 
 
146
  for m in moves:
147
  if m in sim_board.legal_moves:
148
  captured_piece = sim_board.piece_at(m.to_square)
149
- if sim_board.is_en_passant(m): captured_piece = chess.Piece(chess.PAWN, not sim_board.turn)
 
 
150
  if captured_piece:
151
- if sim_board.turn == chess.WHITE: captured_b.append(captured_piece.piece_type)
152
- else: captured_w.append(captured_piece.piece_type)
 
 
 
 
 
 
 
153
  sim_board.push(m)
154
- else: break
 
 
155
  for p in captured_w[:]:
156
  if p in captured_b:
157
- captured_w.remove(p); captured_b.remove(p)
 
 
158
  if abs(len(captured_w) - len(captured_b)) <= 1 and all(p == chess.PAWN for p in captured_w + captured_b):
159
  return False
 
160
  end_diff = get_material_difference(sim_board)
161
  mat_diff = end_diff - start_diff
162
  player_rel = mat_diff if white_to_play else -mat_diff
 
163
  return player_rel < 0
164
 
165
- def get_move_classification(last_win_pct, pos_win_pct, is_white_move, played_move, best_move_before, alt_win_pct, fen_two_moves_ago, uci_next_two_moves, board_before_move, best_pv_after) -> str:
166
- """Classifies a move based on win% change and engine principal variation."""
 
 
 
 
 
 
 
 
 
 
167
  diff = (pos_win_pct - last_win_pct) * (1 if is_white_move else -1)
 
168
  if alt_win_pct is not None and diff >= -2.0:
169
  if get_is_piece_sacrifice(board_before_move, played_move, best_pv_after):
170
- if not is_losing_or_alt_winning(pos_win_pct, alt_win_pct, is_white_move): return "Brilliant"
 
 
 
171
  is_recapture = False
172
  if fen_two_moves_ago and uci_next_two_moves:
173
  is_recapture = is_simple_recapture(fen_two_moves_ago, uci_next_two_moves[0], uci_next_two_moves[1])
 
174
  if not is_recapture and not is_losing_or_alt_winning(pos_win_pct, alt_win_pct, is_white_move):
175
- if get_has_changed_outcome(last_win_pct, pos_win_pct, is_white_move) or get_is_only_good_move(pos_win_pct, alt_win_pct, is_white_move): return "Great"
176
- if best_move_before and played_move == best_move_before: return "Best"
 
 
 
 
177
  if diff < -20.0: return "Blunder"
178
  if diff < -10.0: return "Mistake"
179
  if diff < -5.0: return "Inaccuracy"
180
  if diff < -2.0: return "Good"
181
  return "Excellent"
182
 
183
- # ─── 6. MULTIPLAYER CONNECTION MANAGER ───────────────────────────────────────────────
184
- # Handles websocket routing for playing against friends in real-time rooms.
185
- class ConnectionManager:
186
- def __init__(self):
187
- self.active_connections: Dict[str, List[WebSocket]] = {}
188
- async def connect(self, websocket: WebSocket, match_id: str):
189
- await websocket.accept()
190
- if match_id not in self.active_connections: self.active_connections[match_id] = []
191
- self.active_connections[match_id].append(websocket)
192
- def disconnect(self, websocket: WebSocket, match_id: str):
193
- if match_id in self.active_connections:
194
- if websocket in self.active_connections[match_id]: self.active_connections[match_id].remove(websocket)
195
- if not self.active_connections[match_id]: del self.active_connections[match_id]
196
- async def broadcast(self, message: str, match_id: str, exclude: WebSocket = None):
197
- if match_id in self.active_connections:
198
- for connection in self.active_connections[match_id]:
199
- if connection != exclude:
200
- try: await connection.send_text(message)
201
- except Exception: pass
202
-
203
- manager = ConnectionManager()
204
-
205
- # ─── 7. ENGINE POOL (Orchestrator) ──────────────────────────────────────────────────
206
- # Maintains 4 persistent engine processes for high-concurrency instant response.
207
- class EnginePool:
208
- def __init__(self, size=4):
209
- self.size = size
210
- self.engines = asyncio.Queue()
211
- self.all_engines = []
212
- async def start(self):
213
- print(f"Initializing bulletproof engine pool with {self.size} processes...")
214
- for i in range(self.size):
215
- try:
216
- engine = await self._create_engine()
217
- await self.engines.put(engine)
218
- self.all_engines.append(engine)
219
- print(f" [+] Engine {i+1}/{self.size} ready.")
220
- await asyncio.sleep(0.5) # Prevent CPU thrashing on boot
221
- except Exception as e:
222
- print(f" [!] Failed to start engine {i+1}: {e}")
223
- async def _create_engine(self):
224
- if not os.path.exists(ENGINE_PATH): raise Exception("Engine binary not found")
225
- transport, engine = await chess.engine.popen_uci(ENGINE_PATH)
226
- if os.path.exists(NNUE_PATH):
227
- try: await engine.configure({"EvalFile": NNUE_PATH, "Hash": 512, "Threads": 1})
228
- except Exception: pass
229
- return engine
230
- @asynccontextmanager
231
- async def acquire(self):
232
- """Yields an engine from the pool and ensures it's returned or restarted."""
233
- engine = await self.engines.get()
234
- try: yield engine
235
- finally:
236
- try: await self.engines.put(engine)
237
- except Exception: await self.engines.put(await self._create_engine())
238
- async def stop(self):
239
- for engine in self.all_engines:
240
- try: await engine.quit()
241
- except: pass
242
-
243
- pool = EnginePool(size=4)
244
-
245
- # ─── 8. FASTAPI LIFESPAN CONTROLLER ──────────────────────────────────────────────────
246
- # Controls what happens when the server starts and stops on Hugging Face.
247
- @asynccontextmanager
248
- async def lifespan(app: FastAPI):
249
- # Initialize the engine pool first
250
- await pool.start()
251
- yield
252
- # Cleanup on shutdown
253
- await pool.stop()
254
-
255
- app = FastAPI(title="Deepcastle Engine API", lifespan=lifespan)
256
- app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
257
-
258
- # ─── 9. API ROUTES ──────────────────────────────────────────────────────────────────
259
-
260
- @app.websocket("/ws/{match_id}")
261
- async def websocket_endpoint(websocket: WebSocket, match_id: str):
262
- """Handles real-time friend play relay."""
263
- await manager.connect(websocket, match_id)
264
- await manager.broadcast(json.dumps({"type": "join"}), match_id, exclude=websocket)
265
- try:
266
- while True:
267
- data = await websocket.receive_text()
268
- await manager.broadcast(data, match_id, exclude=websocket)
269
- except WebSocketDisconnect:
270
- manager.disconnect(websocket, match_id)
271
- await manager.broadcast(json.dumps({"type": "opponent_disconnected"}), match_id)
272
- except Exception:
273
- manager.disconnect(websocket, match_id)
274
-
275
- @app.get("/")
276
- def home():
277
- return {"status": "online", "engine": "Deepcastle Pro", "platform": "Hugging Face"}
278
-
279
- @app.post("/move", response_model=MoveResponse)
280
- async def get_move(request: MoveRequest):
281
- """Fetches the best engine move for a given FEN position."""
282
- try:
283
- async with pool.acquire() as engine:
284
- board = chess.Board(request.fen)
285
- limit = chess.engine.Limit(time=request.time, depth=request.depth)
286
- result = await engine.play(board, limit)
287
- info = await engine.analyse(board, limit)
288
- score_cp, mate_in = get_normalized_score(info)
289
- pv_board = board.copy()
290
- pv_parts = []
291
- for m in info.get("pv", [])[:5]:
292
- if m in pv_board.legal_moves:
293
- try:
294
- pv_parts.append(pv_board.san(m)); pv_board.push(m)
295
- except Exception: break
296
- else: break
297
- score_pawns = score_cp / 100.0 if abs(score_cp) < 9900 else (100.0 if score_cp > 0 else -100.0)
298
- return MoveResponse(bestmove=result.move.uci(), score=score_pawns, depth=info.get("depth", 0), nodes=info.get("nodes", 0), nps=info.get("nps", 0), pv=" ".join(pv_parts), mate_in=mate_in, opening=openings_db.get(board.fen().split(" ")[0]))
299
- except Exception as e:
300
- raise HTTPException(status_code=500, detail=str(e))
301
-
302
  @app.post("/analyze-game", response_model=AnalyzeResponse)
303
  async def analyze_game(request: AnalyzeRequest):
304
- """Performs deep game analysis, classifies moves, and estimates performance Elo."""
305
  try:
306
- async with pool.acquire() as engine:
307
- board = chess.Board(request.start_fen) if request.start_fen else chess.Board()
308
- limit = chess.engine.Limit(time=request.time_per_move)
309
- analysis_results = []
310
- infos_before = await engine.analyse(board, limit, multipv=2)
311
- infos_before = infos_before if isinstance(infos_before, list) else [infos_before]
312
- counts = {"Book": 0, "Brilliant": 0, "Great": 0, "Best": 0, "Excellent": 0, "Good": 0, "Inaccuracy": 0, "Mistake": 0, "Blunder": 0}
313
- player_is_white = (request.player_color.lower() == "white")
314
- fen_history, move_history = [board.fen()], []
315
- player_move_accuracies, player_cpls = [], []
316
- current_score, _ = get_normalized_score(infos_before[0])
317
- for i, san_move in enumerate(request.moves):
318
- is_white_turn = board.turn == chess.WHITE
319
- is_player_turn = is_white_turn if player_is_white else not is_white_turn
320
- score_before = current_score
321
- try: move = board.parse_san(san_move)
322
- except Exception: break
323
- info_dict = infos_before[0]
324
- best_move_before = info_dict.get("pv", [None])[0]
325
- win_pct_before = get_win_percentage(info_dict)
326
- alt_win_pct_before = None
327
- if len(infos_before) > 1:
328
- for line in infos_before:
329
- if line.get("pv") and line.get("pv")[0] != move:
330
- alt_win_pct_before = get_win_percentage(line); break
331
- board_before_move = board.copy()
332
- board.push(move); move_history.append(move); fen_history.append(board.fen())
333
- infos_after_raw = await engine.analyse(board, limit, multipv=2)
334
- infos_after = infos_after_raw if isinstance(infos_after_raw, list) else [infos_after_raw]
335
- info_after_dict = infos_after[0]
336
- win_pct_after, (score_after, _) = get_win_percentage(info_after_dict), get_normalized_score(info_after_dict)
337
- current_score = score_after
338
- fen_two_moves_ago = fen_history[-3] if len(move_history) >= 2 else None
339
- uci_next_two_moves = (move_history[-2], move_history[-1]) if len(move_history) >= 2 else None
340
- board_fen_only = board.fen().split(" ")[0]
341
- if board_fen_only in openings_db:
342
- cls, opening_name = "Book", openings_db[board_fen_only]
343
- else:
344
- cls, opening_name = get_move_classification(win_pct_before, win_pct_after, is_white_turn, move, best_move_before, alt_win_pct_before, fen_two_moves_ago, uci_next_two_moves, board_before_move, info_after_dict.get("pv", [])), None
345
- move_gain = score_after - score_before if is_white_turn else score_before - score_after
346
- cpl = max(0.0, min(1000.0, -move_gain))
347
- move_acc = get_move_accuracy(win_pct_before, win_pct_after, is_white_turn)
348
- if is_player_turn:
349
- player_move_accuracies.append(move_acc); player_cpls.append(cpl)
350
- counts[cls] = counts.get(cls, 0) + 1
351
- analysis_results.append(MoveAnalysis(move_num=i+1, san=san_move, best_move=best_move_before.uci() if best_move_before else "", classification=cls, opening=opening_name, cpl=float(cpl), score_before=float(score_before / 100.0), score_after=float(score_after / 100.0)))
352
- infos_before = infos_after
353
- if player_move_accuracies:
354
- accuracy = ( (sum(player_move_accuracies) / len(player_move_accuracies)) + (len(player_move_accuracies) / sum(1.0 / max(a, 0.1) for a in player_move_accuracies)) ) / 2.0
355
- else: accuracy = 0.0
356
- avg_cpl = sum(player_cpls) / max(1, len(player_cpls))
357
- estimated_elo = int(max(400, min(3600, round(3600 * math.exp(-0.015 * avg_cpl)))))
358
- return AnalyzeResponse(accuracy=round(accuracy, 1), estimated_elo=estimated_elo, moves=analysis_results, counts=counts)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
359
  except Exception as e:
 
360
  raise HTTPException(status_code=500, detail=str(e))
 
 
 
 
 
 
 
361
 
362
- # ─── 10. MAIN ENTRY POINT ────────────────────────────────────────────────────────────
363
  if __name__ == "__main__":
364
  import uvicorn
 
365
  uvicorn.run(app, host="0.0.0.0", port=7860)
 
1
  from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
2
  from fastapi.middleware.cors import CORSMiddleware
3
  from pydantic import BaseModel
4
+ from typing import Optional, List, Dict
5
  import os
6
  import math
7
  import chess
8
  import chess.engine
9
  import asyncio
10
  import json
 
11
 
12
+ app = FastAPI(title="Deepcastle Engine API")
 
 
 
13
 
14
+ # ─── Multiplaying / Challenge Manager ──────────────────────────────────────────
15
+ class ConnectionManager:
16
+ def __init__(self):
17
+ # match_id -> list of websockets
18
+ self.active_connections: Dict[str, List[WebSocket]] = {}
19
+
20
+ async def connect(self, websocket: WebSocket, match_id: str):
21
+ await websocket.accept()
22
+ if match_id not in self.active_connections:
23
+ self.active_connections[match_id] = []
24
+ self.active_connections[match_id].append(websocket)
25
+
26
+ def disconnect(self, websocket: WebSocket, match_id: str):
27
+ if match_id in self.active_connections:
28
+ if websocket in self.active_connections[match_id]:
29
+ self.active_connections[match_id].remove(websocket)
30
+ if not self.active_connections[match_id]:
31
+ del self.active_connections[match_id]
32
+
33
+ async def broadcast(self, message: str, match_id: str, exclude: WebSocket = None):
34
+ if match_id in self.active_connections:
35
+ for connection in self.active_connections[match_id]:
36
+ if connection != exclude:
37
+ try:
38
+ await connection.send_text(message)
39
+ except Exception:
40
+ pass
41
+
42
+ manager = ConnectionManager()
43
+
44
+ @app.websocket("/ws/{match_id}")
45
+ async def websocket_endpoint(websocket: WebSocket, match_id: str):
46
+ await manager.connect(websocket, match_id)
47
+ room = manager.active_connections.get(match_id, [])
48
+ # Notify others that someone joined
49
+ await manager.broadcast(json.dumps({"type": "join"}), match_id, exclude=websocket)
50
  try:
51
+ while True:
52
+ data = await websocket.receive_text()
53
+ # Relay the message (move, etc.) to others in the same room
54
+ await manager.broadcast(data, match_id, exclude=websocket)
55
+ except WebSocketDisconnect:
56
+ manager.disconnect(websocket, match_id)
57
+ # Notify remaining players that opponent disconnected β†’ they win
58
+ await manager.broadcast(json.dumps({"type": "opponent_disconnected"}), match_id)
59
  except Exception:
60
+ manager.disconnect(websocket, match_id)
61
+ await manager.broadcast(json.dumps({"type": "opponent_disconnected"}), match_id)
62
+
63
 
64
+ # Allow ALL for easy testing (we can restrict this later if needed)
65
+ app.add_middleware(
66
+ CORSMiddleware,
67
+ allow_origins=["*"],
68
+ allow_methods=["*"],
69
+ allow_headers=["*"],
70
+ )
71
+
72
+ # Paths relative to the Docker container
73
+ ENGINE_PATH = os.environ.get("ENGINE_PATH", "/app/engine/deepcastle")
74
+ NNUE_PATH = os.environ.get("NNUE_PATH", "/app/engine/output.nnue")
75
 
76
  class MoveRequest(BaseModel):
77
  fen: str
78
+ time: float = 1.0 # seconds
79
  depth: Optional[int] = None
80
 
81
  class MoveResponse(BaseModel):
82
  bestmove: str
83
+ score: float
84
  depth: int
85
  nodes: int
86
  nps: int
87
+ pv: str
88
  mate_in: Optional[int] = None
89
  opening: Optional[str] = None
90
 
 
98
  move_num: int
99
  san: str
100
  best_move: str
101
+ classification: str
102
  opening: Optional[str] = None
103
+ cpl: float
104
  score_before: float
105
  score_after: float
106
 
107
  class AnalyzeResponse(BaseModel):
108
+ accuracy: float
109
+ estimated_elo: int
110
  moves: List[MoveAnalysis]
111
+ counts: Dict[str, int]
112
 
113
+ @app.get("/")
114
+ def home():
115
+ return {"status": "online", "engine": "Deepcastle Hybrid Neural", "platform": "Hugging Face Spaces"}
116
+
117
+ @app.get("/health")
118
+ def health():
119
+ if not os.path.exists(ENGINE_PATH):
120
+ return {"status": "error", "message": "Engine binary not found"}
121
+ return {"status": "ok", "engine": "Deepcastle"}
122
+
123
+ async def get_engine():
124
+ if not os.path.exists(ENGINE_PATH):
125
+ raise HTTPException(status_code=500, detail="Engine binary not found")
126
+ transport, engine = await chess.engine.popen_uci(ENGINE_PATH)
127
+ if os.path.exists(NNUE_PATH):
128
+ try:
129
+ await engine.configure({"EvalFile": NNUE_PATH})
130
+ await engine.configure({"Hash": 512, "Threads": 2})
131
+ except Exception:
132
+ pass
133
+ return engine
134
 
135
  def get_normalized_score(info) -> tuple[float, Optional[int]]:
136
+ """Returns the score from White's perspective in centipawns."""
137
  if "score" not in info:
138
  return 0.0, None
139
  raw = info["score"].white()
140
  if raw.is_mate():
141
  m = raw.mate() or 0
142
  return (10000.0 if m > 0 else -10000.0), m
143
+ return raw.score() or 0.0, None
144
+
145
+ # ─── Engine Inference Route ────────────────────────────────────────────────────
146
+ @app.post("/move", response_model=MoveResponse)
147
+ async def get_move(request: MoveRequest):
148
+ engine = None
149
+ try:
150
+ engine = await get_engine()
151
+ board = chess.Board(request.fen)
152
+ limit = chess.engine.Limit(time=request.time, depth=request.depth)
153
+
154
+ result = await engine.play(board, limit)
155
+ info = await engine.analyse(board, limit)
156
+
157
+ # From White's perspective in CP -> converted to Pawns for UI
158
+ score_cp, mate_in = get_normalized_score(info)
159
+
160
+ depth = info.get("depth", 0)
161
+ nodes = info.get("nodes", 0)
162
+ nps = info.get("nps", 0)
163
+
164
+ pv_board = board.copy()
165
+ pv_parts = []
166
+ for m in info.get("pv", [])[:5]:
167
+ if m in pv_board.legal_moves:
168
+ try:
169
+ pv_parts.append(pv_board.san(m))
170
+ pv_board.push(m)
171
+ except Exception:
172
+ break
173
+ else:
174
+ break
175
+ pv = " ".join(pv_parts)
176
+
177
+ # Map mate score to pawns representation to not break old UI
178
+ score_pawns = score_cp / 100.0 if abs(score_cp) < 9900 else (100.0 if score_cp > 0 else -100.0)
179
+
180
+ # Check for opening name
181
+ board_fen_only = board.fen().split(" ")[0]
182
+ opening_name = openings_db.get(board_fen_only)
183
+
184
+ return MoveResponse(
185
+ bestmove=result.move.uci(),
186
+ score=score_pawns,
187
+ depth=depth,
188
+ nodes=nodes,
189
+ nps=nps,
190
+ pv=pv,
191
+ mate_in=mate_in,
192
+ opening=opening_name
193
+ )
194
+ except Exception as e:
195
+ print(f"Error: {e}")
196
+ raise HTTPException(status_code=500, detail=str(e))
197
+ finally:
198
+ if engine:
199
+ try:
200
+ await engine.quit()
201
+ except Exception:
202
+ pass
203
+
204
+
205
+ import math
206
+ import json
207
+ import os
208
+ from typing import Optional, List, Tuple
209
+
210
+ openings_db = {}
211
+ openings_path = os.path.join(os.path.dirname(__file__), "openings.json")
212
+ if os.path.exists(openings_path):
213
+ try:
214
+ with open(openings_path, "r", encoding="utf-8") as f:
215
+ openings_db = json.load(f)
216
+ except Exception as e:
217
+ pass
218
 
219
  def get_win_percentage_from_cp(cp: int) -> float:
 
220
  cp_ceiled = max(-1000, min(1000, cp))
221
  MULTIPLIER = -0.00368208
222
  win_chances = 2.0 / (1.0 + math.exp(MULTIPLIER * cp_ceiled)) - 1.0
223
  return 50.0 + 50.0 * win_chances
224
 
 
 
 
 
 
 
 
 
 
 
 
225
  def get_win_percentage(info: dict) -> float:
 
226
  score = info.get("score")
227
+ if not score:
228
+ return 50.0
229
  white_score = score.white()
230
  if white_score.is_mate():
231
  mate_val = white_score.mate()
232
  return 100.0 if mate_val > 0 else 0.0
233
+ return get_win_percentage_from_cp(white_score.score())
 
 
 
234
 
235
+ def is_losing_or_alt_winning(pos_win_pct: float, alt_win_pct: float, is_white_move: bool) -> bool:
236
  is_losing = pos_win_pct < 50.0 if is_white_move else pos_win_pct > 50.0
237
  is_alt_winning = alt_win_pct > 97.0 if is_white_move else alt_win_pct < 3.0
238
  return is_losing or is_alt_winning
239
 
240
+ def get_has_changed_outcome(last_win_pct: float, pos_win_pct: float, is_white_move: bool) -> bool:
241
  diff = (pos_win_pct - last_win_pct) * (1 if is_white_move else -1)
242
  return diff > 10.0 and ((last_win_pct < 50.0 and pos_win_pct > 50.0) or (last_win_pct > 50.0 and pos_win_pct < 50.0))
243
 
244
+ def get_is_only_good_move(pos_win_pct: float, alt_win_pct: float, is_white_move: bool) -> bool:
245
  diff = (pos_win_pct - alt_win_pct) * (1 if is_white_move else -1)
246
  return diff > 10.0
247
 
248
+ def is_simple_recapture(fen_two_moves_ago: str, previous_move: chess.Move, played_move: chess.Move) -> bool:
249
+ if previous_move.to_square != played_move.to_square:
250
+ return False
251
  b = chess.Board(fen_two_moves_ago)
252
  return b.piece_at(previous_move.to_square) is not None
253
 
 
257
  b = sum(values.get(p.piece_type, 0) for p in board.piece_map().values() if p.color == chess.BLACK)
258
  return w - b
259
 
260
+ def get_is_piece_sacrifice(board: chess.Board, played_move: chess.Move, best_pv: list) -> bool:
261
+ if not best_pv:
262
+ return False
263
  start_diff = get_material_difference(board)
264
  white_to_play = board.turn == chess.WHITE
265
+
266
  sim_board = board.copy()
267
  moves = [played_move] + best_pv
268
+ if len(moves) % 2 == 1:
269
+ moves = moves[:-1]
270
+
271
+ captured_w = []
272
+ captured_b = []
273
+ non_capturing = 1
274
+
275
  for m in moves:
276
  if m in sim_board.legal_moves:
277
  captured_piece = sim_board.piece_at(m.to_square)
278
+ if sim_board.is_en_passant(m):
279
+ captured_piece = chess.Piece(chess.PAWN, not sim_board.turn)
280
+
281
  if captured_piece:
282
+ if sim_board.turn == chess.WHITE:
283
+ captured_b.append(captured_piece.piece_type)
284
+ else:
285
+ captured_w.append(captured_piece.piece_type)
286
+ non_capturing = 1
287
+ else:
288
+ non_capturing -= 1
289
+ if non_capturing < 0:
290
+ break
291
  sim_board.push(m)
292
+ else:
293
+ break
294
+
295
  for p in captured_w[:]:
296
  if p in captured_b:
297
+ captured_w.remove(p)
298
+ captured_b.remove(p)
299
+
300
  if abs(len(captured_w) - len(captured_b)) <= 1 and all(p == chess.PAWN for p in captured_w + captured_b):
301
  return False
302
+
303
  end_diff = get_material_difference(sim_board)
304
  mat_diff = end_diff - start_diff
305
  player_rel = mat_diff if white_to_play else -mat_diff
306
+
307
  return player_rel < 0
308
 
309
+ def get_move_classification(
310
+ last_win_pct: float,
311
+ pos_win_pct: float,
312
+ is_white_move: bool,
313
+ played_move: chess.Move,
314
+ best_move_before: chess.Move,
315
+ alt_win_pct: Optional[float],
316
+ fen_two_moves_ago: Optional[str],
317
+ uci_next_two_moves: Optional[Tuple[chess.Move, chess.Move]],
318
+ board_before_move: chess.Board,
319
+ best_pv_after: list
320
+ ) -> str:
321
  diff = (pos_win_pct - last_win_pct) * (1 if is_white_move else -1)
322
+
323
  if alt_win_pct is not None and diff >= -2.0:
324
  if get_is_piece_sacrifice(board_before_move, played_move, best_pv_after):
325
+ if not is_losing_or_alt_winning(pos_win_pct, alt_win_pct, is_white_move):
326
+ return "Brilliant"
327
+
328
+ if alt_win_pct is not None and diff >= -2.0:
329
  is_recapture = False
330
  if fen_two_moves_ago and uci_next_two_moves:
331
  is_recapture = is_simple_recapture(fen_two_moves_ago, uci_next_two_moves[0], uci_next_two_moves[1])
332
+
333
  if not is_recapture and not is_losing_or_alt_winning(pos_win_pct, alt_win_pct, is_white_move):
334
+ if get_has_changed_outcome(last_win_pct, pos_win_pct, is_white_move) or get_is_only_good_move(pos_win_pct, alt_win_pct, is_white_move):
335
+ return "Great"
336
+
337
+ if best_move_before and played_move == best_move_before:
338
+ return "Best"
339
+
340
  if diff < -20.0: return "Blunder"
341
  if diff < -10.0: return "Mistake"
342
  if diff < -5.0: return "Inaccuracy"
343
  if diff < -2.0: return "Good"
344
  return "Excellent"
345
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
346
  @app.post("/analyze-game", response_model=AnalyzeResponse)
347
  async def analyze_game(request: AnalyzeRequest):
348
+ engine = None
349
  try:
350
+ engine = await get_engine()
351
+ board = chess.Board(request.start_fen) if request.start_fen else chess.Board()
352
+ limit = chess.engine.Limit(time=request.time_per_move)
353
+
354
+ analysis_results = []
355
+
356
+ infos_before = await engine.analyse(board, limit, multipv=2)
357
+ infos_before = infos_before if isinstance(infos_before, list) else [infos_before]
358
+
359
+ counts = {
360
+ "Book": 0, "Brilliant": 0, "Great": 0, "Best": 0,
361
+ "Excellent": 0, "Good": 0, "Inaccuracy": 0,
362
+ "Mistake": 0, "Blunder": 0
363
+ }
364
+
365
+ player_is_white = (request.player_color.lower() == "white")
366
+
367
+ fen_history = [board.fen()]
368
+ move_history = []
369
+ total_cpl = 0.0
370
+ player_moves_count = 0
371
+ current_score, _ = get_normalized_score(infos_before[0])
372
+
373
+ for i, san_move in enumerate(request.moves):
374
+ is_white_turn = board.turn == chess.WHITE
375
+ is_player_turn = is_white_turn if player_is_white else not is_white_turn
376
+
377
+ score_before = current_score
378
+
379
+ try:
380
+ move = board.parse_san(san_move)
381
+ except Exception:
382
+ break # Invalid move
383
+
384
+ info_dict = infos_before[0]
385
+ pv_list = info_dict.get("pv", [])
386
+ best_move_before = pv_list[0] if pv_list else None
387
+
388
+ score_before, _ = get_normalized_score(info_dict)
389
+ win_pct_before = get_win_percentage(info_dict)
390
+ alt_win_pct_before: Optional[float] = None
391
+ if len(infos_before) > 1:
392
+ # Find the first alternative move that is not the played move
393
+ for line in infos_before:
394
+ if line.get("pv") and line.get("pv")[0] != move:
395
+ alt_win_pct_before = get_win_percentage(line)
396
+ break
397
+
398
+ board_before_move = board.copy()
399
+ board.push(move)
400
+
401
+ move_history.append(move)
402
+ fen_history.append(board.fen())
403
+
404
+ infos_after_raw = await engine.analyse(board, limit, multipv=2)
405
+ infos_after: List[dict] = infos_after_raw if isinstance(infos_after_raw, list) else [infos_after_raw]
406
+
407
+ info_after_dict: dict = infos_after[0]
408
+
409
+ win_pct_after = get_win_percentage(info_after_dict)
410
+ score_after, _ = get_normalized_score(info_after_dict)
411
+ current_score = score_after
412
+
413
+ best_pv_after = info_after_dict.get("pv", [])
414
+
415
+ fen_two_moves_ago = None
416
+ uci_next_two_moves = None
417
+ if len(move_history) >= 2:
418
+ fen_two_moves_ago = fen_history[-3]
419
+ uci_next_two_moves = (move_history[-2], move_history[-1])
420
+
421
+ cls = "Book"
422
+ opening_name = None
423
+ board_fen_only = board.fen().split(" ")[0]
424
+ if board_fen_only in openings_db:
425
+ cls = "Book"
426
+ opening_name = openings_db[board_fen_only]
427
+ else:
428
+ cls = get_move_classification(
429
+ last_win_pct=win_pct_before,
430
+ pos_win_pct=win_pct_after,
431
+ is_white_move=is_white_turn,
432
+ played_move=move,
433
+ best_move_before=best_move_before,
434
+ alt_win_pct=alt_win_pct_before,
435
+ fen_two_moves_ago=fen_two_moves_ago,
436
+ uci_next_two_moves=uci_next_two_moves,
437
+ board_before_move=board_before_move,
438
+ best_pv_after=best_pv_after
439
+ )
440
+
441
+ move_gain = score_after - score_before if is_white_turn else score_before - score_after
442
+ cpl = max(0, -move_gain)
443
+ cpl = min(cpl, 1000.0)
444
+
445
+ if is_player_turn:
446
+ total_cpl += cpl
447
+ player_moves_count += 1
448
+ counts[cls] = counts.get(cls, 0) + 1
449
+
450
+ analysis_results.append(MoveAnalysis(
451
+ move_num=i+1,
452
+ san=san_move,
453
+ fen=board.fen(),
454
+ classification=cls,
455
+ cpl=float(cpl),
456
+ score_before=float(score_before / 100.0),
457
+ score_after=float(score_after / 100.0),
458
+ best_move=best_move_before.uci() if best_move_before else "",
459
+ opening=opening_name
460
+ ))
461
+
462
+ infos_before = infos_after
463
+
464
+ # Win probability matching accuracy formula
465
+ # Accuracy = 100 * exp(-0.02 * avg_cpl) smoothed
466
+ avg_cpl = total_cpl / max(1, player_moves_count)
467
+
468
+ # Simple heuristic mapping for Accuracy & Elo
469
+ # 0 avg loss -> 100%
470
+ # ~100 avg loss -> ~60%
471
+ accuracy = max(10.0, min(100.0, 100.0 * math.exp(-0.005 * avg_cpl)))
472
+
473
+ # Estimate Elo based slightly on accuracy
474
+ # This is a fun heuristic metric
475
+ estimated_elo = int(max(400, min(3600, 3600 - (avg_cpl * 20))))
476
+
477
+ return AnalyzeResponse(
478
+ accuracy=round(accuracy, 1),
479
+ estimated_elo=estimated_elo,
480
+ moves=analysis_results,
481
+ counts=counts
482
+ )
483
+
484
  except Exception as e:
485
+ print(f"Analysis Error: {e}")
486
  raise HTTPException(status_code=500, detail=str(e))
487
+ finally:
488
+ if engine:
489
+ try:
490
+ await engine.quit()
491
+ except Exception:
492
+ pass
493
+
494
 
 
495
  if __name__ == "__main__":
496
  import uvicorn
497
+ # Hugging Face Spaces port is 7860
498
  uvicorn.run(app, host="0.0.0.0", port=7860)