Spaces:
Sleeping
Sleeping
| """Chess environment for OpenEnv using moonfish.""" | |
| import random | |
| import uuid | |
| from typing import Any, Dict, Optional, Tuple | |
| import chess | |
| from moonfish.psqt import board_evaluation, MG_PIECE_VALUES, count_pieces, get_phase | |
| from moonfish.lib import search_move | |
| from ..models import ChessAction, ChessObservation, ChessState, RewardConfig | |
class ChessEnvironment:
    """
    Chess environment implementing the OpenEnv interface.

    Uses python-chess for game logic and moonfish for position evaluation.
    Designed for RL training where an agent plays as one color against
    an opponent (which can be random, moonfish engine, or self-play).

    All rewards returned by this class are expressed from the agent's
    perspective (the color stored in ``_agent_color``).
    """

    # Opponent identifiers accepted by the constructor (None = self-play).
    _SUPPORTED_OPPONENTS = (None, "moonfish", "random")

    def __init__(
        self,
        reward_config: Optional[RewardConfig] = None,
        max_moves: int = 500,
        agent_color: Optional[
            bool
        ] = None,  # None = alternate, True = White, False = Black
        opponent: Optional[
            str
        ] = None,  # None = self-play, "moonfish" = moonfish engine, "random" = random
        opponent_depth: int = 2,  # Search depth for moonfish opponent
    ):
        """
        Initialize the chess environment.

        Args:
            reward_config: Configuration for reward shaping
            max_moves: Maximum half-moves before draw (prevents infinite games)
            agent_color: Which color the RL agent plays (None = alternates each episode)
            opponent: Opponent type - None (self-play), "moonfish", or "random"
            opponent_depth: Search depth when using moonfish as opponent

        Raises:
            ValueError: If ``opponent`` is not one of the supported values.
        """
        # Fail fast: an unrecognized opponent used to degrade silently into
        # self-play because no opponent move was ever generated for it.
        if opponent not in self._SUPPORTED_OPPONENTS:
            raise ValueError(
                f"Unknown opponent {opponent!r}; "
                "expected None, 'moonfish' or 'random'"
            )

        self.reward_config = reward_config or RewardConfig()
        self.max_moves = max_moves
        self.agent_color_setting = agent_color
        self.opponent = opponent
        self.opponent_depth = opponent_depth

        # Will be set on reset
        self._board: Optional[chess.Board] = None
        self._state: Optional[ChessState] = None
        self._agent_color: bool = chess.WHITE

        # Private RNG so that reset(seed=...) makes the random opponent
        # reproducible without touching the global ``random`` state.
        self._rng = random.Random()
        # Number of completed reset() calls; drives color alternation when
        # no episode_id is supplied.
        self._episode_count = 0

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        fen: Optional[str] = None,
        **kwargs,
    ) -> ChessObservation:
        """
        Initialize a new chess game episode.

        Args:
            seed: Random seed; when given, reseeds the RNG used by the
                "random" opponent so its moves are reproducible
            episode_id: Unique identifier for this episode
            fen: Optional starting position in FEN notation

        Returns:
            Initial observation of the board state. If the starting position
            (after the opponent's opening move, when there is one) is already
            terminal, the observation is marked done and carries the reward.
        """
        if seed is not None:
            self._rng.seed(seed)

        # Create new board
        self._board = chess.Board(fen) if fen else chess.Board()

        # Determine agent color (True = White, False = Black)
        if self.agent_color_setting is not None:
            self._agent_color = self.agent_color_setting
        elif episode_id:
            # Built-in hash() of a str is salted per process, so it would
            # give a different color for the same episode_id across runs;
            # CRC32 is stable.
            import zlib

            self._agent_color = zlib.crc32(episode_id.encode("utf-8")) % 2 == 0
        else:
            # No id to derive a color from: alternate, starting with White.
            self._agent_color = self._episode_count % 2 == 0
        self._episode_count += 1

        # Initialize state
        self._state = ChessState(
            episode_id=episode_id or uuid.uuid4().hex,
            step_count=0,
            current_player="white" if self._board.turn else "black",
            fen=self._board.fen(),
            move_history=[],
        )

        # The opponent opens whenever it is not the agent's turn. Comparing
        # against the side to move (rather than "agent is Black") keeps this
        # correct for custom FENs where Black moves first.
        if self.opponent is not None and self._board.turn != self._agent_color:
            self._make_opponent_move()

        terminal = self._terminal_reward()
        if terminal is not None:
            return self._get_observation(done=True, reward=terminal)
        return self._get_observation()

    def step(
        self, action: ChessAction, timeout_s: Optional[float] = None, **kwargs
    ) -> Tuple[ChessObservation, float, bool]:
        """
        Execute a chess move and return the resulting state.

        Args:
            action: The move to make in UCI format (e.g., "e2e4")
            timeout_s: Unused timeout parameter

        Returns:
            Tuple of (observation, reward, done)

        Raises:
            RuntimeError: If reset() has not been called.
        """
        board, _ = self._require_initialized()

        # Parse the move
        try:
            move = chess.Move.from_uci(action.move)
        except ValueError:
            # Invalid move format
            return self._handle_illegal_move(f"Invalid move format: {action.move}")

        # Check if move is legal
        if move not in board.legal_moves:
            return self._handle_illegal_move(f"Illegal move: {action.move}")

        # Execute the move
        self._record_move(move)

        # Calculate reward and check for game end
        reward, done = self._calculate_reward_and_done()

        # If game not over and opponent is configured, make opponent move
        if not done and self.opponent is not None:
            self._make_opponent_move()
            terminal = self._terminal_reward()
            if terminal is not None:
                # The terminal reward is already from the agent's point of
                # view, so it is added as-is; negating it would turn a loss
                # inflicted by the opponent into a positive reward.
                reward += terminal
                done = True

        observation = self._get_observation(done=done, reward=reward if done else None)
        return observation, reward, done

    def state(self) -> ChessState:
        """Return the current episode state."""
        if self._state is None:
            raise RuntimeError("Environment not initialized. Call reset() first.")
        return self._state

    def close(self) -> None:
        """Clean up resources."""
        self._board = None
        self._state = None

    def get_metadata(self) -> Dict[str, Any]:
        """Return environment metadata."""
        config = self.reward_config
        return {
            "name": "chess",
            "version": "1.0.0",
            "max_moves": self.max_moves,
            "reward_config": {
                "win": config.win,
                "loss": config.loss,
                "draw": config.draw,
                "illegal_move": config.illegal_move,
                "use_evaluation": config.use_evaluation,
                "evaluation_scale": config.evaluation_scale,
            },
        }

    def _require_initialized(self) -> Tuple[chess.Board, ChessState]:
        """Return (board, state), raising RuntimeError before the first reset()."""
        # An explicit check instead of ``assert`` so it survives ``python -O``.
        if self._board is None or self._state is None:
            raise RuntimeError("Environment not initialized. Call reset() first.")
        return self._board, self._state

    def _record_move(self, move: chess.Move) -> None:
        """Push ``move`` on the board and mirror the new position into the state."""
        board, state = self._require_initialized()
        board.push(move)
        state.step_count += 1
        state.move_history.append(move.uci())
        state.current_player = "white" if board.turn else "black"
        state.fen = board.fen()

    def _get_observation(
        self,
        done: bool = False,
        reward: Optional[float] = None,
        result: Optional[str] = None,
        error: Optional[str] = None,
    ) -> ChessObservation:
        """Build observation from current board state."""
        board, _ = self._require_initialized()
        legal_moves = [move.uci() for move in board.legal_moves]
        metadata: Dict[str, Any] = {}

        # Add evaluation if configured
        if self.reward_config.use_evaluation:
            metadata["evaluation"] = board_evaluation(board)

        # Add material count
        metadata["material"] = self._get_material_count()
        # Add game phase (0 = opening, 256 = endgame)
        metadata["phase"] = get_phase(board)
        metadata["fullmove_number"] = board.fullmove_number
        metadata["halfmove_clock"] = board.halfmove_clock
        if error:
            metadata["error"] = error

        # Determine result string if game is over
        if done and result is None:
            result = self._get_result_string()

        return ChessObservation(
            fen=board.fen(),
            legal_moves=legal_moves,
            is_check=board.is_check(),
            done=done,
            reward=reward,
            result=result,
            metadata=metadata,
        )

    def _terminal_reward(self) -> Optional[float]:
        """
        Return the agent's final reward if the game is over, else None.

        The game is over on checkmate, stalemate, insufficient material,
        the fifty-move rule, threefold repetition, or when the half-move
        counter reaches ``max_moves``.
        """
        board, state = self._require_initialized()
        config = self.reward_config

        if board.is_checkmate():
            # The side to move is checkmated, so the previous mover won
            winner = not board.turn
            return config.win if winner == self._agent_color else config.loss

        if (
            board.is_stalemate()
            or board.is_insufficient_material()
            or board.is_fifty_moves()
            or board.is_repetition(3)
        ):
            return config.draw

        # Check move limit
        if state.step_count >= self.max_moves:
            return config.draw

        return None

    def _calculate_reward_and_done(self) -> Tuple[float, bool]:
        """
        Calculate reward and check if episode is done.

        Returns:
            (reward, done) from the agent's perspective. While the game
            continues the reward is 0.0, or the scaled board evaluation
            when ``reward_config.use_evaluation`` is set.
        """
        board, _ = self._require_initialized()

        terminal = self._terminal_reward()
        if terminal is not None:
            return terminal, True

        # Game continues
        if not self.reward_config.use_evaluation:
            return 0.0, False

        # Optional: evaluation-based intermediate reward
        eval_score = board_evaluation(board)
        # Normalize evaluation to agent's perspective
        if board.turn != self._agent_color:
            eval_score = -eval_score
        return eval_score * self.reward_config.evaluation_scale, False

    def _handle_illegal_move(
        self, error_msg: str
    ) -> Tuple[ChessObservation, float, bool]:
        """Handle an illegal move attempt without changing the board."""
        observation = self._get_observation(done=False, error=error_msg)
        return observation, self.reward_config.illegal_move, False

    def _get_result_string(self) -> str:
        """Get the game result as a string ("1-0", "0-1" or "1/2-1/2")."""
        board, _ = self._require_initialized()
        if board.is_checkmate():
            # The side to move has been mated: Black to move means White won.
            return "1-0" if not board.turn else "0-1"
        return "1/2-1/2"

    def _get_material_count(self) -> Dict[str, int]:
        """Count material for both sides using moonfish piece values."""
        board, _ = self._require_initialized()
        # count_pieces returns [wp, bp, wn, bn, wb, bb, wr, br, wq, bq]
        wp, bp, wn, bn, wb, bb, wr, br, wq, bq = count_pieces(board)
        piece_types = (
            chess.PAWN,
            chess.KNIGHT,
            chess.BISHOP,
            chess.ROOK,
            chess.QUEEN,
        )
        white = sum(
            count * MG_PIECE_VALUES[piece]
            for count, piece in zip((wp, wn, wb, wr, wq), piece_types)
        )
        black = sum(
            count * MG_PIECE_VALUES[piece]
            for count, piece in zip((bp, bn, bb, br, bq), piece_types)
        )
        return {"white": white, "black": black}

    def _make_opponent_move(self) -> None:
        """Make a move for the opponent using configured strategy."""
        board, _ = self._require_initialized()

        legal_moves = list(board.legal_moves)
        if not legal_moves:
            return  # No legal moves (game should be over)

        if self.opponent == "moonfish":
            # Use moonfish engine to find best move
            move = search_move(board, depth=self.opponent_depth)
        elif self.opponent == "random":
            # Pick a random legal move from the environment's own RNG so
            # that reset(seed=...) makes the choice reproducible.
            move = self._rng.choice(legal_moves)
        else:
            return  # No opponent configured

        # Execute opponent's move
        self._record_move(move)