moonfish_chess / server /chess_environment.py
luccabb's picture
Upload folder using huggingface_hub
b5e858e verified
"""Chess environment for OpenEnv using moonfish."""
import random
import uuid
from typing import Any, Dict, Optional, Tuple
import chess
from moonfish.psqt import board_evaluation, MG_PIECE_VALUES, count_pieces, get_phase
from moonfish.lib import search_move
from ..models import ChessAction, ChessObservation, ChessState, RewardConfig
class ChessEnvironment:
"""
Chess environment implementing the OpenEnv interface.
Uses python-chess for game logic and moonfish for position evaluation.
Designed for RL training where an agent plays as one color against
an opponent (which can be random, moonfish engine, or self-play).
"""
def __init__(
self,
reward_config: Optional[RewardConfig] = None,
max_moves: int = 500,
agent_color: Optional[
bool
] = None, # None = alternate, True = White, False = Black
opponent: Optional[
str
] = None, # None = self-play, "moonfish" = moonfish engine, "random" = random
opponent_depth: int = 2, # Search depth for moonfish opponent
):
"""
Initialize the chess environment.
Args:
reward_config: Configuration for reward shaping
max_moves: Maximum half-moves before draw (prevents infinite games)
agent_color: Which color the RL agent plays (None = alternates each episode)
opponent: Opponent type - None (self-play), "moonfish", or "random"
opponent_depth: Search depth when using moonfish as opponent
"""
self.reward_config = reward_config or RewardConfig()
self.max_moves = max_moves
self.agent_color_setting = agent_color
self.opponent = opponent
self.opponent_depth = opponent_depth
# Will be set on reset
self._board: Optional[chess.Board] = None
self._state: Optional[ChessState] = None
self._agent_color: bool = chess.WHITE
def reset(
self,
seed: Optional[int] = None,
episode_id: Optional[str] = None,
fen: Optional[str] = None,
**kwargs,
) -> ChessObservation:
"""
Initialize a new chess game episode.
Args:
seed: Random seed (unused for now, chess is deterministic)
episode_id: Unique identifier for this episode
fen: Optional starting position in FEN notation
Returns:
Initial observation of the board state
"""
# Create new board
if fen:
self._board = chess.Board(fen)
else:
self._board = chess.Board()
# Determine agent color
if self.agent_color_setting is None:
# Alternate each episode based on episode_id hash
if episode_id:
self._agent_color = hash(episode_id) % 2 == 0
else:
self._agent_color = chess.WHITE
else:
self._agent_color = self.agent_color_setting
# Initialize state
self._state = ChessState(
episode_id=episode_id or uuid.uuid4().hex,
step_count=0,
current_player="white" if self._board.turn else "black",
fen=self._board.fen(),
move_history=[],
)
# If agent plays Black and opponent is configured, opponent moves first
if self.opponent is not None and self._agent_color == chess.BLACK:
self._make_opponent_move()
return self._get_observation()
def step(
self, action: ChessAction, timeout_s: Optional[float] = None, **kwargs
) -> Tuple[ChessObservation, float, bool]:
"""
Execute a chess move and return the resulting state.
Args:
action: The move to make in UCI format (e.g., "e2e4")
timeout_s: Unused timeout parameter
Returns:
Tuple of (observation, reward, done)
"""
if self._board is None or self._state is None:
raise RuntimeError("Environment not initialized. Call reset() first.")
# Parse the move
try:
move = chess.Move.from_uci(action.move)
except ValueError:
# Invalid move format
return self._handle_illegal_move(f"Invalid move format: {action.move}")
# Check if move is legal
if move not in self._board.legal_moves:
return self._handle_illegal_move(f"Illegal move: {action.move}")
# Execute the move
self._board.push(move)
self._state.step_count += 1
self._state.move_history.append(action.move)
self._state.current_player = "white" if self._board.turn else "black"
self._state.fen = self._board.fen()
# Calculate reward and check for game end
reward, done = self._calculate_reward_and_done()
# If game not over and opponent is configured, make opponent move
if not done and self.opponent is not None:
self._make_opponent_move()
# Recalculate after opponent move
opp_reward, done = self._calculate_reward_and_done()
# Opponent's reward is negative of ours (zero-sum)
reward += -opp_reward if done else 0
observation = self._get_observation(done=done, reward=reward if done else None)
return observation, reward, done
@property
def state(self) -> ChessState:
"""Return the current episode state."""
if self._state is None:
raise RuntimeError("Environment not initialized. Call reset() first.")
return self._state
def close(self) -> None:
"""Clean up resources."""
self._board = None
self._state = None
def get_metadata(self) -> Dict[str, Any]:
"""Return environment metadata."""
return {
"name": "chess",
"version": "1.0.0",
"max_moves": self.max_moves,
"reward_config": {
"win": self.reward_config.win,
"loss": self.reward_config.loss,
"draw": self.reward_config.draw,
"illegal_move": self.reward_config.illegal_move,
"use_evaluation": self.reward_config.use_evaluation,
"evaluation_scale": self.reward_config.evaluation_scale,
},
}
def _get_observation(
self,
done: bool = False,
reward: Optional[float] = None,
result: Optional[str] = None,
error: Optional[str] = None,
) -> ChessObservation:
"""Build observation from current board state."""
assert self._board is not None
legal_moves = [move.uci() for move in self._board.legal_moves]
metadata: Dict[str, Any] = {}
# Add evaluation if configured
if self.reward_config.use_evaluation:
metadata["evaluation"] = board_evaluation(self._board)
# Add material count
metadata["material"] = self._get_material_count()
# Add game phase (0 = opening, 256 = endgame)
metadata["phase"] = get_phase(self._board)
metadata["fullmove_number"] = self._board.fullmove_number
metadata["halfmove_clock"] = self._board.halfmove_clock
if error:
metadata["error"] = error
# Determine result string if game is over
if done and result is None:
result = self._get_result_string()
return ChessObservation(
fen=self._board.fen(),
legal_moves=legal_moves,
is_check=self._board.is_check(),
done=done,
reward=reward,
result=result,
metadata=metadata,
)
def _calculate_reward_and_done(self) -> Tuple[float, bool]:
"""Calculate reward and check if episode is done."""
assert self._board is not None
# Check for game end
if self._board.is_checkmate():
# The side to move is checkmated, so the previous mover won
winner = not self._board.turn
if winner == self._agent_color:
return self.reward_config.win, True
else:
return self.reward_config.loss, True
if self._board.is_stalemate():
return self.reward_config.draw, True
if self._board.is_insufficient_material():
return self.reward_config.draw, True
if self._board.is_fifty_moves():
return self.reward_config.draw, True
if self._board.is_repetition(3):
return self.reward_config.draw, True
# Check move limit
if self._state and self._state.step_count >= self.max_moves:
return self.reward_config.draw, True
# Game continues
reward = 0.0
# Optional: Add evaluation-based intermediate rewards
if self.reward_config.use_evaluation:
eval_score = board_evaluation(self._board)
# Normalize evaluation to agent's perspective
if self._board.turn != self._agent_color:
eval_score = -eval_score
reward = eval_score * self.reward_config.evaluation_scale
return reward, False
def _handle_illegal_move(
self, error_msg: str
) -> Tuple[ChessObservation, float, bool]:
"""Handle an illegal move attempt."""
observation = self._get_observation(done=False, error=error_msg)
return observation, self.reward_config.illegal_move, False
def _get_result_string(self) -> str:
"""Get the game result as a string."""
assert self._board is not None
if self._board.is_checkmate():
return "1-0" if not self._board.turn else "0-1"
return "1/2-1/2"
def _get_material_count(self) -> Dict[str, int]:
"""Count material for both sides using moonfish piece values."""
assert self._board is not None
# count_pieces returns [wp, bp, wn, bn, wb, bb, wr, br, wq, bq]
pieces = count_pieces(self._board)
wp, bp, wn, bn, wb, bb, wr, br, wq, bq = pieces
white = (
wp * MG_PIECE_VALUES[chess.PAWN]
+ wn * MG_PIECE_VALUES[chess.KNIGHT]
+ wb * MG_PIECE_VALUES[chess.BISHOP]
+ wr * MG_PIECE_VALUES[chess.ROOK]
+ wq * MG_PIECE_VALUES[chess.QUEEN]
)
black = (
bp * MG_PIECE_VALUES[chess.PAWN]
+ bn * MG_PIECE_VALUES[chess.KNIGHT]
+ bb * MG_PIECE_VALUES[chess.BISHOP]
+ br * MG_PIECE_VALUES[chess.ROOK]
+ bq * MG_PIECE_VALUES[chess.QUEEN]
)
return {"white": white, "black": black}
def _make_opponent_move(self) -> None:
"""Make a move for the opponent using configured strategy."""
assert self._board is not None
assert self._state is not None
if not list(self._board.legal_moves):
return # No legal moves (game should be over)
if self.opponent == "moonfish":
# Use moonfish engine to find best move
move = search_move(self._board, depth=self.opponent_depth)
elif self.opponent == "random":
# Pick a random legal move
move = random.choice(list(self._board.legal_moves))
else:
return # No opponent configured
# Execute opponent's move
self._board.push(move)
self._state.step_count += 1
self._state.move_history.append(move.uci())
self._state.current_player = "white" if self._board.turn else "black"
self._state.fen = self._board.fen()