Rafs-an09002's picture
Create engine/search.py
da6ae37 verified
raw
history blame
9.09 kB
"""
Nexus-Core Search Engine
Efficient alpha-beta with essential optimizations
- Basic transposition table
- Simple move ordering (MVV-LVA)
- Quiescence search
"""
import onnxruntime as ort
import numpy as np
import chess
import time
import logging
from pathlib import Path
from typing import Optional, Dict, Tuple, List
logger = logging.getLogger(__name__)
class NexusCoreEngine:
"""
Lightweight chess engine for Nexus-Core
Optimized for speed over strength
"""
PIECE_VALUES = {
chess.PAWN: 100,
chess.KNIGHT: 320,
chess.BISHOP: 330,
chess.ROOK: 500,
chess.QUEEN: 900,
chess.KING: 0
}
def __init__(self, model_path: str, num_threads: int = 2):
"""Initialize engine"""
self.model_path = Path(model_path)
if not self.model_path.exists():
raise FileNotFoundError(f"Model not found: {model_path}")
# Load ONNX model
sess_options = ort.SessionOptions()
sess_options.intra_op_num_threads = num_threads
sess_options.inter_op_num_threads = num_threads
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
logger.info(f"Loading Nexus-Core from {model_path}...")
self.session = ort.InferenceSession(
str(self.model_path),
sess_options=sess_options,
providers=['CPUExecutionProvider']
)
self.input_name = self.session.get_inputs()[0].name
self.output_name = self.session.get_outputs()[0].name
# Simple transposition table (dict-based, 100K entries)
self.tt_cache = {}
self.max_tt_size = 100000
# Statistics
self.nodes_evaluated = 0
logger.info("✅ Nexus-Core engine ready")
def fen_to_tensor(self, fen: str) -> np.ndarray:
"""Convert FEN to 12-channel tensor"""
board = chess.Board(fen)
tensor = np.zeros((1, 12, 8, 8), dtype=np.float32)
piece_to_channel = {
chess.PAWN: 0, chess.KNIGHT: 1, chess.BISHOP: 2,
chess.ROOK: 3, chess.QUEEN: 4, chess.KING: 5
}
for square, piece in board.piece_map().items():
rank, file = divmod(square, 8)
channel = piece_to_channel[piece.piece_type]
if piece.color == chess.BLACK:
channel += 6
tensor[0, channel, rank, file] = 1.0
return tensor
def evaluate(self, board: chess.Board) -> float:
"""Neural network evaluation"""
self.nodes_evaluated += 1
# Check cache (simple FEN-based)
fen_key = board.fen().split(' ')[0]
if fen_key in self.tt_cache:
return self.tt_cache[fen_key]
# Run inference
input_tensor = self.fen_to_tensor(board.fen())
output = self.session.run([self.output_name], {self.input_name: input_tensor})
# Value output (tanh normalized)
eval_score = float(output[0][0][0]) * 400.0 # Scale to centipawns
# Flip for black
if board.turn == chess.BLACK:
eval_score = -eval_score
# Cache result
if len(self.tt_cache) < self.max_tt_size:
self.tt_cache[fen_key] = eval_score
return eval_score
def order_moves(self, board: chess.Board, moves: List[chess.Move]) -> List[chess.Move]:
"""
Simple move ordering
1. Captures (MVV-LVA)
2. Checks
3. Other moves
"""
scored_moves = []
for move in moves:
score = 0
# Captures
if board.is_capture(move):
victim = board.piece_at(move.to_square)
attacker = board.piece_at(move.from_square)
if victim and attacker:
victim_val = self.PIECE_VALUES.get(victim.piece_type, 0)
attacker_val = self.PIECE_VALUES.get(attacker.piece_type, 1)
score = (victim_val * 10 - attacker_val) * 100
# Promotions
if move.promotion == chess.QUEEN:
score += 9000
# Checks
board.push(move)
if board.is_check():
score += 5000
board.pop()
scored_moves.append((score, move))
scored_moves.sort(key=lambda x: x[0], reverse=True)
return [move for _, move in scored_moves]
def quiescence(self, board: chess.Board, alpha: float, beta: float, depth: int = 2) -> float:
"""Quiescence search (captures only)"""
stand_pat = self.evaluate(board)
if stand_pat >= beta:
return beta
if alpha < stand_pat:
alpha = stand_pat
if depth == 0:
return stand_pat
# Only captures
captures = [m for m in board.legal_moves if board.is_capture(m)]
if not captures:
return stand_pat
captures = self.order_moves(board, captures)
for move in captures:
board.push(move)
score = -self.quiescence(board, -beta, -alpha, depth - 1)
board.pop()
if score >= beta:
return beta
if score > alpha:
alpha = score
return alpha
def alpha_beta(
self,
board: chess.Board,
depth: int,
alpha: float,
beta: float,
start_time: float,
time_limit: float
) -> Tuple[float, Optional[chess.Move]]:
"""Alpha-beta search"""
# Time check
if time.time() - start_time > time_limit:
return self.evaluate(board), None
# Terminal nodes
if board.is_game_over():
if board.is_checkmate():
return -10000, None
return 0, None
# Leaf nodes
if depth == 0:
return self.quiescence(board, alpha, beta), None
legal_moves = list(board.legal_moves)
if not legal_moves:
return 0, None
ordered_moves = self.order_moves(board, legal_moves)
best_move = ordered_moves[0]
best_score = float('-inf')
for move in ordered_moves:
board.push(move)
score, _ = self.alpha_beta(board, depth - 1, -beta, -alpha, start_time, time_limit)
score = -score
board.pop()
if score > best_score:
best_score = score
best_move = move
alpha = max(alpha, score)
if alpha >= beta:
break
return best_score, best_move
def get_best_move(self, fen: str, depth: int = 4, time_limit: int = 3000) -> Dict:
"""Main search entry"""
board = chess.Board(fen)
self.nodes_evaluated = 0
time_limit_sec = time_limit / 1000.0
start_time = time.time()
# Special cases
legal_moves = list(board.legal_moves)
if len(legal_moves) == 0:
return {'best_move': '0000', 'evaluation': 0.0, 'depth_searched': 0, 'nodes_evaluated': 0}
if len(legal_moves) == 1:
return {
'best_move': legal_moves[0].uci(),
'evaluation': round(self.evaluate(board) / 100.0, 2),
'depth_searched': 0,
'nodes_evaluated': 1,
'time_taken': 0
}
# Iterative deepening
best_move = legal_moves[0]
best_score = float('-inf')
for current_depth in range(1, depth + 1):
if time.time() - start_time > time_limit_sec * 0.9:
break
try:
score, move = self.alpha_beta(
board, current_depth,
float('-inf'), float('inf'),
start_time, time_limit_sec
)
if move:
best_move = move
best_score = score
except Exception as e:
logger.warning(f"Search error: {e}")
break
time_taken = int((time.time() - start_time) * 1000)
return {
'best_move': best_move.uci(),
'evaluation': round(best_score / 100.0, 2),
'depth_searched': current_depth,
'nodes_evaluated': self.nodes_evaluated,
'time_taken': time_taken
}
def validate_fen(self, fen: str) -> bool:
try:
chess.Board(fen)
return True
except:
return False
def get_model_size(self) -> float:
return self.model_path.stat().st_size / (1024 * 1024)