File size: 9,094 Bytes
da6ae37
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
"""
Nexus-Core Search Engine
Efficient alpha-beta with essential optimizations
- Basic transposition table
- Simple move ordering (MVV-LVA)
- Quiescence search
"""

import onnxruntime as ort
import numpy as np
import chess
import time
import logging
from pathlib import Path
from typing import Optional, Dict, Tuple, List

logger = logging.getLogger(__name__)


class NexusCoreEngine:
    """
    Lightweight chess engine for Nexus-Core
    Optimized for speed over strength
    """
    
    PIECE_VALUES = {
        chess.PAWN: 100,
        chess.KNIGHT: 320,
        chess.BISHOP: 330,
        chess.ROOK: 500,
        chess.QUEEN: 900,
        chess.KING: 0
    }
    
    def __init__(self, model_path: str, num_threads: int = 2):
        """
        Load the ONNX model and prepare the search state.

        Args:
            model_path: Path to the ONNX model file.
            num_threads: Value applied to both the intra-op and inter-op
                thread settings of the ONNX Runtime session.

        Raises:
            FileNotFoundError: If ``model_path`` does not exist.
        """
        resolved_path = Path(model_path)
        self.model_path = resolved_path
        if not resolved_path.exists():
            raise FileNotFoundError(f"Model not found: {model_path}")

        # Session configuration: CPU only, all graph optimizations enabled.
        options = ort.SessionOptions()
        options.intra_op_num_threads = num_threads
        options.inter_op_num_threads = num_threads
        options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        logger.info(f"Loading Nexus-Core from {model_path}...")
        session = ort.InferenceSession(
            str(resolved_path),
            sess_options=options,
            providers=['CPUExecutionProvider']
        )
        self.session = session

        # Only the first input and first output of the model are used.
        self.input_name = session.get_inputs()[0].name
        self.output_name = session.get_outputs()[0].name

        # Dict-based evaluation cache, capped at max_tt_size entries.
        self.max_tt_size = 100000
        self.tt_cache = {}

        # Number of evaluate() calls made during the current search.
        self.nodes_evaluated = 0

        logger.info("✅ Nexus-Core engine ready")
    
    def fen_to_tensor(self, fen: str) -> np.ndarray:
        """
        Encode the piece placement of a FEN position as one-hot planes.

        Only piece placement is encoded; side to move, castling rights and
        the en passant square do not appear in the tensor.

        Args:
            fen: Position in FEN notation.

        Returns:
            A float32 array of shape (1, 12, 8, 8) indexed as
            ``[0, plane, rank, file]``. Planes 0-5 hold the white pawn,
            knight, bishop, rook, queen and king; planes 6-11 hold the black
            pieces in the same order. Rank index 0 is the first rank, since
            python-chess numbers squares starting from a1.
        """
        position = chess.Board(fen)
        planes = np.zeros((1, 12, 8, 8), dtype=np.float32)

        type_order = (
            chess.PAWN, chess.KNIGHT, chess.BISHOP,
            chess.ROOK, chess.QUEEN, chess.KING,
        )
        plane_of_type = {piece_type: idx for idx, piece_type in enumerate(type_order)}

        for square, piece in position.piece_map().items():
            # Black pieces use the second group of six planes.
            colour_offset = 6 if piece.color == chess.BLACK else 0
            plane = plane_of_type[piece.piece_type] + colour_offset
            planes[0, plane, square // 8, square % 8] = 1.0

        return planes
    
    def evaluate(self, board: chess.Board) -> float:
        """
        Score ``board`` with the neural network from the side to move's view.

        The first value of the model output is multiplied by 400 to obtain
        centipawns and is treated as White-relative; it is negated when Black
        is to move so the result can be used directly by a negamax search.

        Results are memoised in ``self.tt_cache`` keyed by piece placement
        only. The value is stored White-relative and the side-to-move flip is
        applied after the lookup. Caching the already-flipped value under a
        placement-only key would return the wrong sign whenever the same
        placement is reached with the other side to move.

        Args:
            board: Position to evaluate.

        Returns:
            Score in centipawns, positive when the side to move is better.
        """
        self.nodes_evaluated += 1

        # Piece-placement field of the FEN; this is all the network sees.
        placement_key = board.board_fen()

        white_score = self.tt_cache.get(placement_key)
        if white_score is None:
            input_tensor = self.fen_to_tensor(board.fen())
            output = self.session.run([self.output_name], {self.input_name: input_tensor})

            # NOTE(review): the original comment says the value head is tanh
            # normalized; 400 is the scale to centipawns. Confirm against the
            # exported model.
            white_score = float(output[0][0][0]) * 400.0

            # The cache never evicts: once full, new positions are simply
            # not stored.
            if len(self.tt_cache) < self.max_tt_size:
                self.tt_cache[placement_key] = white_score

        if board.turn == chess.BLACK:
            return -white_score
        return white_score
    
    def order_moves(self, board: chess.Board, moves: List[chess.Move]) -> List[chess.Move]:
        """
        Sort moves so that the most promising ones are searched first.

        Each move receives a heuristic score built from:
        1. Captures, scored MVV-LVA: ``(victim * 10 - attacker) * 100``
        2. Queen promotions: +9000
        3. Moves that give check: +5000
        All other moves score 0. The sort is stable, so moves with equal
        scores keep their incoming order.

        Args:
            board: Position in which ``moves`` are to be played. It is left
                unchanged on return.
            moves: Moves to order; they must be at least pseudo-legal.

        Returns:
            A new list containing the same moves, best score first.
        """
        piece_values = self.PIECE_VALUES

        def move_score(move: chess.Move) -> int:
            score = 0

            if board.is_capture(move):
                attacker = board.piece_at(move.from_square)
                if board.is_en_passant(move):
                    # The captured pawn is not on the destination square, so
                    # looking it up there would find nothing.
                    victim_type = chess.PAWN
                else:
                    victim = board.piece_at(move.to_square)
                    victim_type = victim.piece_type if victim else None

                if victim_type is not None and attacker is not None:
                    victim_val = piece_values.get(victim_type, 0)
                    attacker_val = piece_values.get(attacker.piece_type, 1)
                    score = (victim_val * 10 - attacker_val) * 100

            if move.promotion == chess.QUEEN:
                score += 9000

            # gives_check() plays and retracts the move internally.
            if board.gives_check(move):
                score += 5000

            return score

        return sorted(moves, key=move_score, reverse=True)
    
    def quiescence(self, board: chess.Board, alpha: float, beta: float, depth: int = 2) -> float:
        """
        Capture-only search used at the leaves of the main search.

        The static evaluation serves as a "stand pat" lower bound; captures
        are then tried, in ``order_moves`` order, for at most ``depth``
        further plies.

        Args:
            board: Position to resolve. Restored to its original state on
                return, including when a nested call raises.
            alpha: Lower bound of the search window (side to move's view).
            beta: Upper bound of the search window (side to move's view).
            depth: Remaining capture plies; zero or less stops the recursion.

        Returns:
            Score in centipawns from the side to move's view. ``beta`` is
            returned on a cutoff.
        """
        # Positions that are already decided must not be scored by the
        # network: a capture that delivers mate would otherwise get an
        # ordinary evaluation. -10000 matches the mate score in alpha_beta.
        if board.is_checkmate():
            return -10000.0
        if board.is_stalemate():
            return 0.0

        stand_pat = self.evaluate(board)

        if stand_pat >= beta:
            return beta
        if alpha < stand_pat:
            alpha = stand_pat

        if depth <= 0:
            return stand_pat

        captures = [m for m in board.legal_moves if board.is_capture(m)]
        if not captures:
            return stand_pat

        for move in self.order_moves(board, captures):
            board.push(move)
            try:
                score = -self.quiescence(board, -beta, -alpha, depth - 1)
            finally:
                # Always undo the move so the caller's board stays intact.
                board.pop()

            if score >= beta:
                return beta
            if score > alpha:
                alpha = score

        return alpha
    
    def alpha_beta(
        self,
        board: chess.Board,
        depth: int,
        alpha: float,
        beta: float,
        start_time: float,
        time_limit: float
    ) -> Tuple[float, Optional[chess.Move]]:
        """
        Negamax alpha-beta search.

        Args:
            board: Position to search. Restored to its original state on
                return, including when a nested call raises.
            depth: Remaining plies; at zero (or below) the node is resolved
                with ``quiescence``.
            alpha: Lower bound of the search window (side to move's view).
            beta: Upper bound of the search window (side to move's view).
            start_time: ``time.time()`` value taken when the search began.
            time_limit: Search budget in seconds, measured from
                ``start_time``.

        Returns:
            ``(score, move)`` where ``score`` is in centipawns from the side
            to move's view and ``move`` is the best move found. ``move`` is
            ``None`` for leaf and terminal nodes and when the time budget has
            expired, in which case ``score`` is just the static evaluation.
        """
        # Out of time: fall back to the static evaluation. Scores built from
        # such nodes are not a real search result at the requested depth.
        if time.time() - start_time > time_limit:
            return self.evaluate(board), None

        # Terminal nodes: the side to move is mated, or the game is drawn.
        # NOTE(review): the mate score ignores distance to mate, so a longer
        # mate is not ranked below a shorter one.
        if board.is_game_over():
            if board.is_checkmate():
                return -10000, None
            return 0, None

        # Leaf nodes
        if depth <= 0:
            return self.quiescence(board, alpha, beta), None

        # is_game_over() was False, so at least one legal move exists here.
        ordered_moves = self.order_moves(board, list(board.legal_moves))

        best_move = ordered_moves[0]
        best_score = float('-inf')

        for move in ordered_moves:
            board.push(move)
            try:
                child_score, _ = self.alpha_beta(
                    board, depth - 1, -beta, -alpha, start_time, time_limit
                )
            finally:
                # Always undo the move so the caller's board stays intact.
                board.pop()
            score = -child_score

            if score > best_score:
                best_score = score
                best_move = move

            alpha = max(alpha, score)
            if alpha >= beta:
                break

        return best_score, best_move
    
    def get_best_move(self, fen: str, depth: int = 4, time_limit: int = 3000) -> Dict:
        """
        Pick a move for the given position using iterative deepening.

        Args:
            fen: Position in FEN notation.
            depth: Maximum search depth in plies. Values below 1 skip the
                search and fall back to the first legal move.
            time_limit: Search budget in milliseconds.

        Returns:
            A dict with the keys:
            ``best_move`` (UCI string, ``'0000'`` when there is no legal
            move), ``evaluation`` (pawns, from the side to move's view),
            ``depth_searched`` (deepest iteration that finished within the
            time budget), ``nodes_evaluated`` and ``time_taken``
            (milliseconds).

        Raises:
            ValueError: If ``fen`` cannot be parsed by python-chess.
        """
        board = chess.Board(fen)
        self.nodes_evaluated = 0

        time_limit_sec = time_limit / 1000.0
        start_time = time.time()

        # Special cases: nothing to search.
        legal_moves = list(board.legal_moves)
        if not legal_moves:
            return {
                'best_move': '0000',
                'evaluation': 0.0,
                'depth_searched': 0,
                'nodes_evaluated': 0,
                'time_taken': 0
            }

        if len(legal_moves) == 1:
            return {
                'best_move': legal_moves[0].uci(),
                'evaluation': round(self.evaluate(board) / 100.0, 2),
                'depth_searched': 0,
                'nodes_evaluated': 1,
                'time_taken': 0
            }

        # Iterative deepening
        best_move = legal_moves[0]
        best_score = None
        depth_completed = 0

        for current_depth in range(1, depth + 1):
            # Do not start an iteration that has almost no time left.
            if time.time() - start_time > time_limit_sec * 0.9:
                break

            try:
                score, move = self.alpha_beta(
                    board, current_depth,
                    float('-inf'), float('inf'),
                    start_time, time_limit_sec
                )
            except Exception as e:
                logger.warning(f"Search error: {e}")
                break

            timed_out = time.time() - start_time > time_limit_sec

            # Once the budget expires, alpha_beta answers with static
            # evaluations, so an interrupted iteration is unreliable. Keep
            # the previous iteration's result when there is one.
            if timed_out and depth_completed > 0:
                break

            if move is not None:
                best_move = move
            best_score = score

            if timed_out:
                break
            depth_completed = current_depth

        if best_score is None:
            # No iteration produced a score (depth < 1, no time, or a search
            # error): report the static evaluation rather than -inf.
            try:
                best_score = self.evaluate(board)
            except Exception as e:
                logger.warning(f"Static evaluation failed: {e}")
                best_score = 0.0

        time_taken = int((time.time() - start_time) * 1000)

        return {
            'best_move': best_move.uci(),
            'evaluation': round(best_score / 100.0, 2),
            'depth_searched': depth_completed,
            'nodes_evaluated': self.nodes_evaluated,
            'time_taken': time_taken
        }
    
    def validate_fen(self, fen: str) -> bool:
        """
        Report whether python-chess can parse ``fen``.

        This checks FEN syntax only; it does not verify that the resulting
        position is legal.

        Args:
            fen: Candidate FEN string.

        Returns:
            True if ``chess.Board(fen)`` succeeds, False otherwise.
        """
        try:
            chess.Board(fen)
        except (ValueError, TypeError, AttributeError):
            # ValueError: malformed FEN. TypeError / AttributeError: input
            # that is not a string. KeyboardInterrupt and SystemExit are
            # deliberately allowed to propagate.
            return False
        return True
    
    def get_model_size(self) -> float:
        """Return the size of the model file on disk in mebibytes (bytes / 1024**2)."""
        bytes_per_mib = 1024 * 1024
        size_in_bytes = self.model_path.stat().st_size
        return size_in_bytes / bytes_per_mib