""" Decomposed Chess Tokenizer for the Chess Challenge. Each move becomes 3 or 4 tokens: WP e2_f e4_t BN g8_f f6_t Promotion adds an extra token: WP e7_f e8_t =q Why this helps: - Fixed small vocab (~150 tokens) - Near-zero OOV / UNK, so the evaluator can always parse squares - Compatible with the provided evaluate.py (it auto-detects 'decomposed') Special tokens behavior: - Adds BOS only (NO EOS) - If BOS already present, does not add it twice """ from __future__ import annotations import json import os from typing import Dict, List, Optional from transformers import PreTrainedTokenizer class ChessTokenizer(PreTrainedTokenizer): model_input_names = ["input_ids", "attention_mask"] vocab_files_names = {"vocab_file": "vocab.json"} PAD_TOKEN = "[PAD]" BOS_TOKEN = "[BOS]" EOS_TOKEN = "[EOS]" # kept for compatibility, not auto-added UNK_TOKEN = "[UNK]" def __init__( self, vocab_file: Optional[str] = None, vocab: Optional[Dict[str, int]] = None, **kwargs, ): self._pad_token = self.PAD_TOKEN self._bos_token = self.BOS_TOKEN self._eos_token = self.EOS_TOKEN self._unk_token = self.UNK_TOKEN # avoid duplicates from kwargs kwargs.pop("pad_token", None) kwargs.pop("bos_token", None) kwargs.pop("eos_token", None) kwargs.pop("unk_token", None) if vocab is not None: self._vocab = vocab elif vocab_file is not None and os.path.exists(vocab_file): with open(vocab_file, "r", encoding="utf-8") as f: self._vocab = json.load(f) else: self._vocab = self._build_fixed_vocab() self._ids_to_tokens = {v: k for k, v in self._vocab.items()} super().__init__( pad_token=self._pad_token, bos_token=self._bos_token, eos_token=self._eos_token, unk_token=self._unk_token, **kwargs, ) # -------------------------- # Fixed vocab: pieces + squares + promos # -------------------------- @staticmethod def _all_squares() -> List[str]: files = "abcdefgh" ranks = "12345678" return [f + r for r in ranks for f in files] # a1..h8 def _build_fixed_vocab(self) -> Dict[str, int]: special = [self.PAD_TOKEN, self.BOS_TOKEN, self.EOS_TOKEN, self.UNK_TOKEN] # piece tokens: WP..WK, BP..BK piece_tokens = [f"{c}{p}" for c in "WB" for p in "PNBRQK"] squares = self._all_squares() from_tokens = [f"{sq}_f" for sq in squares] to_tokens = [f"{sq}_t" for sq in squares] promo_tokens = ["=q", "=r", "=b", "=n"] tokens = special + piece_tokens + from_tokens + to_tokens + promo_tokens return {tok: i for i, tok in enumerate(tokens)} # -------------------------- # Special tokens handling (robust with evaluate.py) # -------------------------- def build_inputs_with_special_tokens( self, token_ids_0: List[int], token_ids_1: Optional[List[int]] = None ) -> List[int]: # BOS only, NO EOS if token_ids_1 is not None: token_ids_0 = token_ids_0 + token_ids_1 if token_ids_0 and token_ids_0[0] == self.bos_token_id: return token_ids_0 return [self.bos_token_id] + token_ids_0 def get_special_tokens_mask( self, token_ids_0: List[int], token_ids_1: Optional[List[int]] = None, already_has_special_tokens: bool = False, ) -> List[int]: if already_has_special_tokens: specials = {self.pad_token_id, self.bos_token_id, self.eos_token_id, self.unk_token_id} return [1 if t in specials else 0 for t in token_ids_0] if token_ids_1 is None: return [1] + [0] * len(token_ids_0) return [1] + [0] * (len(token_ids_0) + len(token_ids_1)) def create_token_type_ids_from_sequences( self, token_ids_0: List[int], token_ids_1: Optional[List[int]] = None ) -> List[int]: if token_ids_1 is None: return [0] * (len(token_ids_0) + 1) return [0] * (len(token_ids_0) + len(token_ids_1) + 1) # -------------------------- # Tokenization # -------------------------- def _tokenize(self, text: str) -> List[str]: if not text or not text.strip(): return [] parts = text.strip().split() out: List[str] = [] for tok in parts: # allow literal special tokens present in text if tok in {self.PAD_TOKEN, self.BOS_TOKEN, self.EOS_TOKEN, self.UNK_TOKEN}: out.append(tok) continue # already decomposed tokens if (len(tok) == 2 and tok[0] in "WB" and tok[1] in "PNBRQK") or tok.endswith("_f") or tok.endswith("_t") or tok in {"=q", "=r", "=b", "=n"}: out.append(tok) continue # parse extended UCI (dataset): WPe2e4, BNg8f6(x), WPe7e8=Q(+), ... if len(tok) < 6: out.append(self.UNK_TOKEN) continue color = tok[0] piece = tok[1] from_sq = tok[2:4] to_sq = tok[4:6] out.append(f"{color}{piece}") out.append(f"{from_sq}_f") out.append(f"{to_sq}_t") # promotion like "=Q" if "=" in tok: try: promo_part = tok.split("=", 1)[1] promo_letter = promo_part[0].lower() promo_tok = f"={promo_letter}" if promo_tok in self._vocab: out.append(promo_tok) except Exception: pass return out def _convert_token_to_id(self, token: str) -> int: return self._vocab.get(token, self._vocab[self.UNK_TOKEN]) def _convert_id_to_token(self, index: int) -> str: return self._ids_to_tokens.get(index, self.UNK_TOKEN) def convert_tokens_to_string(self, tokens: List[str]) -> str: return " ".join(tokens) # -------------------------- # Vocab I/O # -------------------------- @property def vocab_size(self) -> int: return len(self._vocab) def get_vocab(self) -> Dict[str, int]: return dict(self._vocab) def save_vocabulary(self, save_directory: str, filename_prefix: Optional[str] = None) -> tuple: os.makedirs(save_directory, exist_ok=True) vocab_file = os.path.join( save_directory, (filename_prefix + "-" if filename_prefix else "") + "vocab.json", ) with open(vocab_file, "w", encoding="utf-8") as f: json.dump(self._vocab, f, ensure_ascii=False, indent=2) return (vocab_file,)