Rafs-an09002 commited on
Commit
1561c06
·
verified ·
1 Parent(s): db00797

Create engine/search.py

Browse files
Files changed (1) hide show
  1. engine/search.py +305 -0
engine/search.py ADDED
@@ -0,0 +1,305 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Nexus-Nano Search Engine
3
+ Fast alpha-beta with minimal overhead
4
+
5
+ Focus: Speed > Depth
6
+ Target: Sub-second responses
7
+ """
8
+
9
+ import chess
10
+ import logging
11
+ from typing import Optional, Tuple, List, Dict
12
+
13
+ from .evaluate import NexusNanoEvaluator
14
+ from .transposition import TranspositionTable, NodeType
15
+ from .move_ordering import MoveOrderer
16
+ from .time_manager import TimeManager
17
+ from .endgame import EndgameDetector
18
+
19
+ logger = logging.getLogger(__name__)
20
+
21
+
22
class NexusNanoEngine:
    """Ultra-fast 2.8M parameter chess engine.

    A thin negamax alpha-beta searcher (with quiescence) on top of a
    hybrid evaluator, tuned for sub-second responses rather than depth.
    """

    MATE_SCORE = 100000  # base mate score; reduced by ply so faster mates score higher
    MAX_PLY = 100        # depth ceiling, used to convert remaining depth into ply

    def __init__(self, model_path: str, num_threads: int = 1):
        """Initialize with single-threaded config.

        Args:
            model_path: Path to the evaluation model weights.
            num_threads: Thread count passed to the evaluator backend.
        """
        self.evaluator = NexusNanoEvaluator(model_path, num_threads)
        self.tt = TranspositionTable(size_mb=64)  # 64MB TT
        self.move_orderer = MoveOrderer()
        self.time_manager = TimeManager()
        self.endgame_detector = EndgameDetector()

        # Per-search statistics; reset at the start of every get_best_move()
        self.nodes_evaluated = 0
        self.depth_reached = 0
        self.sel_depth = 0
        self.principal_variation = []

        logger.info("⚡ Nexus-Nano Engine initialized")
        logger.info(f" Model: {self.evaluator.get_model_size_mb():.2f} MB")
        logger.info(f" TT: 64 MB")

    def get_best_move(
        self,
        fen: str,
        depth: int = 4,
        time_limit: int = 2000
    ) -> Dict:
        """
        Fast move search via iterative deepening.

        Args:
            fen: Position as a FEN string.
            depth: Max depth (1-6 recommended).
            time_limit: Time budget in ms.

        Returns:
            Dict with the best move (UCI), evaluation in pawns, and
            search statistics (depth, nodes, nps, pv, TT/ordering stats).
        """
        board = chess.Board(fen)

        # Reset per-search statistics
        self.nodes_evaluated = 0
        self.depth_reached = 0
        self.sel_depth = 0
        self.principal_variation = []

        # Time setup (time manager works in seconds)
        time_limit_sec = time_limit / 1000.0
        self.time_manager.start_search(time_limit_sec, time_limit_sec)

        # Clear stale heuristics from the previous search
        self.move_orderer.clear()
        self.tt.increment_age()

        # Special cases: mate/stalemate, or a forced single reply
        legal_moves = list(board.legal_moves)

        if not legal_moves:
            return self._no_legal_moves()

        if len(legal_moves) == 1:
            return self._single_move(board, legal_moves[0])

        # Iterative deepening: each completed iteration refines the answer,
        # so a time-out always leaves us with the best move from the last
        # fully searched depth.
        best_move = legal_moves[0]
        best_score = float('-inf')

        for current_depth in range(1, depth + 1):
            if self.time_manager.should_stop(current_depth):
                break

            score, move, pv = self._search_root(
                board, current_depth, float('-inf'), float('inf')
            )

            if move:
                best_move = move
                best_score = score
                self.depth_reached = current_depth
                self.principal_variation = pv

        # BUGFIX: if time ran out before any iteration finished, best_score
        # is still -inf; fall back to a static evaluation so callers always
        # receive a finite score.
        if best_score == float('-inf'):
            best_score = self.evaluator.evaluate_hybrid(board)

        return {
            'best_move': best_move.uci(),
            'evaluation': round(best_score / 100.0, 2),
            'depth_searched': self.depth_reached,
            'seldepth': self.sel_depth,
            'nodes_evaluated': self.nodes_evaluated,
            'time_taken': int(self.time_manager.elapsed() * 1000),
            'pv': [m.uci() for m in self.principal_variation],
            'nps': int(self.nodes_evaluated / max(self.time_manager.elapsed(), 0.001)),
            'tt_stats': self.tt.get_stats(),
            'move_ordering_stats': self.move_orderer.get_stats()
        }

    def _search_root(
        self,
        board: chess.Board,
        depth: int,
        alpha: float,
        beta: float
    ) -> Tuple[float, Optional[chess.Move], List[chess.Move]]:
        """Search all root moves; returns (score, best move, principal variation)."""

        legal_moves = list(board.legal_moves)

        # TT probe — even on a miss we may get a best-move hint for ordering
        zobrist_key = self.tt.compute_zobrist_key(board)
        tt_result = self.tt.probe(zobrist_key, depth, alpha, beta)
        tt_move = tt_result[1] if tt_result else None

        # Order moves (TT move first, then heuristics)
        ordered_moves = self.move_orderer.order_moves(
            board, legal_moves, depth, tt_move
        )

        best_move = ordered_moves[0]
        best_score = float('-inf')
        best_pv = []

        for move in ordered_moves:
            board.push(move)
            # Negamax: child score is negated, window is mirrored
            score, pv = self._alpha_beta(board, depth - 1, -beta, -alpha)
            score = -score
            board.pop()

            if score > best_score:
                best_score = score
                best_move = move
                best_pv = [move] + pv

            if score > alpha:
                alpha = score

            # Abort mid-iteration on time-out; partial results are still
            # usable because moves are searched best-first.
            if self.time_manager.should_stop(depth):
                break

        self.tt.store(zobrist_key, depth, best_score, NodeType.EXACT, best_move)

        return best_score, best_move, best_pv

    def _alpha_beta(
        self,
        board: chess.Board,
        depth: int,
        alpha: float,
        beta: float
    ) -> Tuple[float, List[chess.Move]]:
        """Fast negamax alpha-beta; returns (score from side-to-move's view, PV)."""

        self.sel_depth = max(self.sel_depth, self.MAX_PLY - depth)

        # Draw check (twofold repetition / fifty-move rule)
        if board.is_repetition(2) or board.is_fifty_moves():
            return 0, []

        # TT probe — an exact/bound hit at sufficient depth cuts the node
        zobrist_key = self.tt.compute_zobrist_key(board)
        tt_result = self.tt.probe(zobrist_key, depth, alpha, beta)

        if tt_result and tt_result[0] is not None:
            return tt_result[0], []

        tt_move = tt_result[1] if tt_result else None

        # Quiescence at leaves (counts its own nodes)
        if depth <= 0:
            return self._quiescence(board, alpha, beta, 0), []

        # BUGFIX: interior nodes were previously never counted, so
        # nodes_evaluated and nps drastically understated the work done.
        self.nodes_evaluated += 1

        # Legal moves — none means mate or stalemate
        legal_moves = list(board.legal_moves)
        if not legal_moves:
            if board.is_check():
                # Checkmate: penalize by ply so faster mates score higher
                return -self.MATE_SCORE + (self.MAX_PLY - depth), []
            return 0, []  # stalemate

        ordered_moves = self.move_orderer.order_moves(
            board, legal_moves, depth, tt_move
        )

        # Search
        best_score = float('-inf')
        best_pv = []
        node_type = NodeType.UPPER_BOUND  # fail-low until alpha is raised

        for move in ordered_moves:
            board.push(move)
            score, pv = self._alpha_beta(board, depth - 1, -beta, -alpha)
            score = -score
            board.pop()

            if score > best_score:
                best_score = score
                best_pv = [move] + pv

            if score > alpha:
                alpha = score
                node_type = NodeType.EXACT

                # Quiet moves that raise alpha become killer candidates
                if not board.is_capture(move):
                    self.move_orderer.update_killer_move(move, depth)

            if score >= beta:
                node_type = NodeType.LOWER_BOUND  # fail-high cutoff
                break

        self.tt.store(zobrist_key, depth, best_score, node_type, best_pv[0] if best_pv else None)

        return best_score, best_pv

    def _quiescence(
        self,
        board: chess.Board,
        alpha: float,
        beta: float,
        qs_depth: int
    ) -> float:
        """Fast quiescence search (captures only, no check extensions)."""

        self.nodes_evaluated += 1

        # Stand-pat: static evaluation, adjusted for known endgames
        stand_pat = self.evaluator.evaluate_hybrid(board)
        stand_pat = self.endgame_detector.adjust_evaluation(board, stand_pat)

        if stand_pat >= beta:
            return beta
        if alpha < stand_pat:
            alpha = stand_pat

        # Hard depth limit keeps quiescence bounded
        if qs_depth >= 6:
            return stand_pat

        # Captures only (no checks for speed)
        captures = [m for m in board.legal_moves if board.is_capture(m)]

        if not captures:
            return stand_pat

        captures = self.move_orderer.order_moves(board, captures, 0)

        for move in captures:
            board.push(move)
            score = -self._quiescence(board, -beta, -alpha, qs_depth + 1)
            board.pop()

            if score >= beta:
                return beta
            if score > alpha:
                alpha = score

        return alpha

    def _no_legal_moves(self) -> Dict:
        """Result for a terminal position (checkmate or stalemate on arrival)."""
        return {
            'best_move': '0000',
            'evaluation': 0.0,
            'depth_searched': 0,
            'nodes_evaluated': 0,
            'time_taken': 0,
            # BUGFIX: include 'pv' so the response shape matches the other paths
            'pv': []
        }

    def _single_move(self, board: chess.Board, move: chess.Move) -> Dict:
        """Result for a forced move — no search needed, just a static eval."""
        eval_score = self.evaluator.evaluate_hybrid(board)

        return {
            'best_move': move.uci(),
            'evaluation': round(eval_score / 100.0, 2),
            'depth_searched': 0,
            'nodes_evaluated': 1,
            'time_taken': 0,
            'pv': [move.uci()]
        }

    def validate_fen(self, fen: str) -> bool:
        """Return True if *fen* parses as a valid position."""
        try:
            chess.Board(fen)
            return True
        # BUGFIX: was a bare `except:`, which also swallowed
        # KeyboardInterrupt/SystemExit; chess.Board raises ValueError
        # on a malformed FEN.
        except ValueError:
            return False

    def get_model_size(self) -> float:
        """Model size in megabytes, as reported by the evaluator."""
        return self.evaluator.get_model_size_mb()