Spaces:
Sleeping
Sleeping
| """ | |
| Nexus-Core Search Engine | |
| Efficient alpha-beta with essential optimizations | |
| - Basic transposition table | |
| - Simple move ordering (MVV-LVA) | |
| - Quiescence search | |
| """ | |
| import onnxruntime as ort | |
| import numpy as np | |
| import chess | |
| import time | |
| import logging | |
| from pathlib import Path | |
| from typing import Optional, Dict, Tuple, List | |
| logger = logging.getLogger(__name__) | |
| class NexusCoreEngine: | |
| """ | |
| Lightweight chess engine for Nexus-Core | |
| Optimized for speed over strength | |
| """ | |
| PIECE_VALUES = { | |
| chess.PAWN: 100, | |
| chess.KNIGHT: 320, | |
| chess.BISHOP: 330, | |
| chess.ROOK: 500, | |
| chess.QUEEN: 900, | |
| chess.KING: 0 | |
| } | |
| def __init__(self, model_path: str, num_threads: int = 2): | |
| """Initialize engine""" | |
| self.model_path = Path(model_path) | |
| if not self.model_path.exists(): | |
| raise FileNotFoundError(f"Model not found: {model_path}") | |
| # Load ONNX model | |
| sess_options = ort.SessionOptions() | |
| sess_options.intra_op_num_threads = num_threads | |
| sess_options.inter_op_num_threads = num_threads | |
| sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL | |
| logger.info(f"Loading Nexus-Core from {model_path}...") | |
| self.session = ort.InferenceSession( | |
| str(self.model_path), | |
| sess_options=sess_options, | |
| providers=['CPUExecutionProvider'] | |
| ) | |
| self.input_name = self.session.get_inputs()[0].name | |
| self.output_name = self.session.get_outputs()[0].name | |
| # Simple transposition table (dict-based, 100K entries) | |
| self.tt_cache = {} | |
| self.max_tt_size = 100000 | |
| # Statistics | |
| self.nodes_evaluated = 0 | |
| logger.info("✅ Nexus-Core engine ready") | |
| def fen_to_tensor(self, fen: str) -> np.ndarray: | |
| """Convert FEN to 12-channel tensor""" | |
| board = chess.Board(fen) | |
| tensor = np.zeros((1, 12, 8, 8), dtype=np.float32) | |
| piece_to_channel = { | |
| chess.PAWN: 0, chess.KNIGHT: 1, chess.BISHOP: 2, | |
| chess.ROOK: 3, chess.QUEEN: 4, chess.KING: 5 | |
| } | |
| for square, piece in board.piece_map().items(): | |
| rank, file = divmod(square, 8) | |
| channel = piece_to_channel[piece.piece_type] | |
| if piece.color == chess.BLACK: | |
| channel += 6 | |
| tensor[0, channel, rank, file] = 1.0 | |
| return tensor | |
| def evaluate(self, board: chess.Board) -> float: | |
| """Neural network evaluation""" | |
| self.nodes_evaluated += 1 | |
| # Check cache (simple FEN-based) | |
| fen_key = board.fen().split(' ')[0] | |
| if fen_key in self.tt_cache: | |
| return self.tt_cache[fen_key] | |
| # Run inference | |
| input_tensor = self.fen_to_tensor(board.fen()) | |
| output = self.session.run([self.output_name], {self.input_name: input_tensor}) | |
| # Value output (tanh normalized) | |
| eval_score = float(output[0][0][0]) * 400.0 # Scale to centipawns | |
| # Flip for black | |
| if board.turn == chess.BLACK: | |
| eval_score = -eval_score | |
| # Cache result | |
| if len(self.tt_cache) < self.max_tt_size: | |
| self.tt_cache[fen_key] = eval_score | |
| return eval_score | |
| def order_moves(self, board: chess.Board, moves: List[chess.Move]) -> List[chess.Move]: | |
| """ | |
| Simple move ordering | |
| 1. Captures (MVV-LVA) | |
| 2. Checks | |
| 3. Other moves | |
| """ | |
| scored_moves = [] | |
| for move in moves: | |
| score = 0 | |
| # Captures | |
| if board.is_capture(move): | |
| victim = board.piece_at(move.to_square) | |
| attacker = board.piece_at(move.from_square) | |
| if victim and attacker: | |
| victim_val = self.PIECE_VALUES.get(victim.piece_type, 0) | |
| attacker_val = self.PIECE_VALUES.get(attacker.piece_type, 1) | |
| score = (victim_val * 10 - attacker_val) * 100 | |
| # Promotions | |
| if move.promotion == chess.QUEEN: | |
| score += 9000 | |
| # Checks | |
| board.push(move) | |
| if board.is_check(): | |
| score += 5000 | |
| board.pop() | |
| scored_moves.append((score, move)) | |
| scored_moves.sort(key=lambda x: x[0], reverse=True) | |
| return [move for _, move in scored_moves] | |
| def quiescence(self, board: chess.Board, alpha: float, beta: float, depth: int = 2) -> float: | |
| """Quiescence search (captures only)""" | |
| stand_pat = self.evaluate(board) | |
| if stand_pat >= beta: | |
| return beta | |
| if alpha < stand_pat: | |
| alpha = stand_pat | |
| if depth == 0: | |
| return stand_pat | |
| # Only captures | |
| captures = [m for m in board.legal_moves if board.is_capture(m)] | |
| if not captures: | |
| return stand_pat | |
| captures = self.order_moves(board, captures) | |
| for move in captures: | |
| board.push(move) | |
| score = -self.quiescence(board, -beta, -alpha, depth - 1) | |
| board.pop() | |
| if score >= beta: | |
| return beta | |
| if score > alpha: | |
| alpha = score | |
| return alpha | |
| def alpha_beta( | |
| self, | |
| board: chess.Board, | |
| depth: int, | |
| alpha: float, | |
| beta: float, | |
| start_time: float, | |
| time_limit: float | |
| ) -> Tuple[float, Optional[chess.Move]]: | |
| """Alpha-beta search""" | |
| # Time check | |
| if time.time() - start_time > time_limit: | |
| return self.evaluate(board), None | |
| # Terminal nodes | |
| if board.is_game_over(): | |
| if board.is_checkmate(): | |
| return -10000, None | |
| return 0, None | |
| # Leaf nodes | |
| if depth == 0: | |
| return self.quiescence(board, alpha, beta), None | |
| legal_moves = list(board.legal_moves) | |
| if not legal_moves: | |
| return 0, None | |
| ordered_moves = self.order_moves(board, legal_moves) | |
| best_move = ordered_moves[0] | |
| best_score = float('-inf') | |
| for move in ordered_moves: | |
| board.push(move) | |
| score, _ = self.alpha_beta(board, depth - 1, -beta, -alpha, start_time, time_limit) | |
| score = -score | |
| board.pop() | |
| if score > best_score: | |
| best_score = score | |
| best_move = move | |
| alpha = max(alpha, score) | |
| if alpha >= beta: | |
| break | |
| return best_score, best_move | |
| def get_best_move(self, fen: str, depth: int = 4, time_limit: int = 3000) -> Dict: | |
| """Main search entry""" | |
| board = chess.Board(fen) | |
| self.nodes_evaluated = 0 | |
| time_limit_sec = time_limit / 1000.0 | |
| start_time = time.time() | |
| # Special cases | |
| legal_moves = list(board.legal_moves) | |
| if len(legal_moves) == 0: | |
| return {'best_move': '0000', 'evaluation': 0.0, 'depth_searched': 0, 'nodes_evaluated': 0} | |
| if len(legal_moves) == 1: | |
| return { | |
| 'best_move': legal_moves[0].uci(), | |
| 'evaluation': round(self.evaluate(board) / 100.0, 2), | |
| 'depth_searched': 0, | |
| 'nodes_evaluated': 1, | |
| 'time_taken': 0 | |
| } | |
| # Iterative deepening | |
| best_move = legal_moves[0] | |
| best_score = float('-inf') | |
| for current_depth in range(1, depth + 1): | |
| if time.time() - start_time > time_limit_sec * 0.9: | |
| break | |
| try: | |
| score, move = self.alpha_beta( | |
| board, current_depth, | |
| float('-inf'), float('inf'), | |
| start_time, time_limit_sec | |
| ) | |
| if move: | |
| best_move = move | |
| best_score = score | |
| except Exception as e: | |
| logger.warning(f"Search error: {e}") | |
| break | |
| time_taken = int((time.time() - start_time) * 1000) | |
| return { | |
| 'best_move': best_move.uci(), | |
| 'evaluation': round(best_score / 100.0, 2), | |
| 'depth_searched': current_depth, | |
| 'nodes_evaluated': self.nodes_evaluated, | |
| 'time_taken': time_taken | |
| } | |
| def validate_fen(self, fen: str) -> bool: | |
| try: | |
| chess.Board(fen) | |
| return True | |
| except: | |
| return False | |
| def get_model_size(self) -> float: | |
| return self.model_path.stat().st_size / (1024 * 1024) |