| # """ | |
| # Custom Chess Tokenizer for the Chess Challenge. | |
| # This tokenizer treats each move as a single token using the extended UCI notation | |
| # from the Lichess dataset (e.g., WPe2e4, BNg8f6). | |
| # The dataset format uses: | |
| # - W/B prefix for White/Black | |
| # - Piece letter: P=Pawn, N=Knight, B=Bishop, R=Rook, Q=Queen, K=King | |
| # - Source and destination squares (e.g., e2e4) | |
| # - Special suffixes: (x)=capture, (+)=check, (+*)=checkmate, (o)/(O)=castling | |
| # """ | |
| # | |
| # from __future__ import annotations | |
| # import json | |
| # import os | |
| # from pathlib import Path | |
| # from typing import Dict, List, Optional | |
| # from transformers import PreTrainedTokenizer | |
| # class ChessTokenizer(PreTrainedTokenizer): | |
| # """ | |
| # A custom tokenizer for chess moves using extended UCI notation. | |
| # This tokenizer maps each possible chess move to a unique token ID. | |
| # The vocabulary is built from the training dataset to ensure all moves | |
| # encountered during training have a corresponding token. | |
| # Example: | |
| # >>> tokenizer = ChessTokenizer() | |
| # >>> tokenizer.encode("WPe2e4 BPe7e5") | |
| # [1, 42, 87, 2] # [BOS, e2e4, e7e5, EOS] | |
| # """ | |
| # model_input_names = ["input_ids", "attention_mask"] | |
| # vocab_files_names = {"vocab_file": "vocab.json"} | |
| # # Special tokens | |
| # PAD_TOKEN = "[PAD]" | |
| # BOS_TOKEN = "[BOS]" | |
| # EOS_TOKEN = "[EOS]" | |
| # UNK_TOKEN = "[UNK]" | |
| # def __init__( | |
| # self, | |
| # vocab_file: Optional[str] = None, | |
| # vocab: Optional[Dict[str, int]] = None, | |
| # **kwargs, | |
| # ): | |
| # """ | |
| # Initialize the chess tokenizer. | |
| # Args: | |
| # vocab_file: Path to a JSON file containing the vocabulary mapping. | |
| # vocab: Dictionary mapping tokens to IDs (alternative to vocab_file). | |
| # **kwargs: Additional arguments passed to PreTrainedTokenizer. | |
| # """ | |
| # # Initialize special tokens | |
| # self._pad_token = self.PAD_TOKEN | |
| # self._bos_token = self.BOS_TOKEN | |
| # self._eos_token = self.EOS_TOKEN | |
| # self._unk_token = self.UNK_TOKEN | |
| # # Remove any duplicate special-token entries passed through kwargs | |
| # # to avoid "multiple values for keyword" errors when loading from disk. | |
| # kwargs.pop("pad_token", None) | |
| # kwargs.pop("bos_token", None) | |
| # kwargs.pop("eos_token", None) | |
| # kwargs.pop("unk_token", None) | |
| # # Load or create vocabulary | |
| # if vocab is not None: | |
| # self._vocab = vocab | |
| # elif vocab_file is not None and os.path.exists(vocab_file): | |
| # with open(vocab_file, "r", encoding="utf-8") as f: | |
| # self._vocab = json.load(f) | |
| # else: | |
| # # Create a minimal vocabulary with just special tokens | |
| # # The full vocabulary should be built from the dataset | |
| # self._vocab = self._create_default_vocab() | |
| # # Create reverse mapping | |
| # self._ids_to_tokens = {v: k for k, v in self._vocab.items()} | |
| # # Call parent init AFTER setting up vocab | |
| # super().__init__( | |
| # pad_token=self._pad_token, | |
| # bos_token=self._bos_token, | |
| # eos_token=self._eos_token, | |
| # unk_token=self._unk_token, | |
| # **kwargs, | |
| # ) | |
| # def _create_default_vocab(self) -> Dict[str, int]: | |
| # """ | |
| # Create a minimal default vocabulary with just special tokens. | |
| # For the full vocabulary, use `build_vocab_from_dataset()`. | |
| # This minimal vocab is just a placeholder - you should build from data. | |
| # """ | |
| # special_tokens = [self.PAD_TOKEN, self.BOS_TOKEN, self.EOS_TOKEN, self.UNK_TOKEN] | |
| # vocab = {token: idx for idx, token in enumerate(special_tokens)} | |
| # return vocab | |
| # # @classmethod | |
| # # def build_vocab_from_iterator( | |
| # # cls, | |
| # # iterator, | |
| # # min_frequency: int = 1, | |
| # # ) -> "ChessTokenizer": | |
| # # """ | |
| # # Build a tokenizer vocabulary from an iterator of game strings. | |
| # # Args: | |
| # # iterator: An iterator yielding game strings (space-separated moves). | |
| # # min_frequency: Minimum frequency for a token to be included. | |
| # # Returns: | |
| # # A ChessTokenizer with the built vocabulary. | |
| # # """ | |
| # # from collections import Counter | |
| # # token_counts = Counter() | |
| # # for game in iterator: | |
| # # moves = game.strip().split() | |
| # # token_counts.update(moves) | |
| # # # Filter by frequency | |
| # # tokens = [ | |
| # # token for token, count in token_counts.items() | |
| # # if count >= min_frequency | |
| # # ] | |
| # # # Sort for reproducibility | |
| # # tokens = sorted(tokens) | |
| # # # Build vocabulary | |
| # # special_tokens = [cls.PAD_TOKEN, cls.BOS_TOKEN, cls.EOS_TOKEN, cls.UNK_TOKEN] | |
| # # vocab = {token: idx for idx, token in enumerate(special_tokens + tokens)} | |
| # # return cls(vocab=vocab) | |
| # @classmethod | |
| # def build_vocab_from_iterator( | |
| # cls, | |
| # iterator, | |
| # vocab_size: int = 1200, | |
| # min_frequency: int = 1, | |
| # ) -> "ChessTokenizer": | |
| # """ | |
| # Build a tokenizer vocabulary from an iterator of game strings. | |
| # - Controls final vocab size explicitly via vocab_size. | |
| # - Keeps the most frequent move tokens (best coverage). | |
| # - Uses min_frequency as a floor, but vocab_size is the main control. | |
| # """ | |
| # from collections import Counter | |
| # token_counts = Counter() | |
| # for game in iterator: | |
| # moves = game.strip().split() | |
| # token_counts.update(moves) | |
| # # Filter by min_frequency first | |
| # items = [(tok, cnt) for tok, cnt in token_counts.items() if cnt >= min_frequency] | |
| # # Sort by frequency desc, then token for determinism | |
| # items.sort(key=lambda x: (-x[1], x[0])) | |
| # special_tokens = [cls.PAD_TOKEN, cls.BOS_TOKEN, cls.EOS_TOKEN, cls.UNK_TOKEN] | |
| # max_move_tokens = max(0, vocab_size - len(special_tokens)) | |
| # move_tokens = [tok for tok, _ in items[:max_move_tokens]] | |
| # vocab = {token: idx for idx, token in enumerate(special_tokens + move_tokens)} | |
| # return cls(vocab=vocab) | |
| # # @classmethod | |
| # # def build_vocab_from_dataset( | |
| # # cls, | |
| # # dataset_name: str = "dlouapre/lichess_2025-01_1M", | |
| # # split: str = "train", | |
| # # column: str = "text", | |
| # # min_frequency: int = 500, | |
| # # max_samples: Optional[int] = 100000, | |
| # # ) -> "ChessTokenizer": | |
| # # """ | |
| # # Build a tokenizer vocabulary from a Hugging Face dataset. | |
| # # Args: | |
| # # dataset_name: Name of the dataset on Hugging Face Hub. | |
| # # split: Dataset split to use. | |
| # # column: Column containing the game strings. | |
| # # min_frequency: Minimum frequency for a token to be included (default: 500). | |
| # # max_samples: Maximum number of samples to process (default: 100k). | |
| # # Returns: | |
| # # A ChessTokenizer with the built vocabulary. | |
| # # """ | |
| # # from datasets import load_dataset | |
| # # dataset = load_dataset(dataset_name, split=split) | |
| # # if max_samples is not None: | |
| # # dataset = dataset.select(range(min(max_samples, len(dataset)))) | |
| # # def game_iterator(): | |
| # # for example in dataset: | |
| # # yield example[column] | |
| # # return cls.build_vocab_from_iterator(game_iterator(), min_frequency=min_frequency) | |
| # @classmethod | |
| # def build_vocab_from_dataset( | |
| # cls, | |
| # dataset_name: str = "dlouapre/lichess_2025-01_1M", | |
| # split: str = "train", | |
| # column: str = "text", | |
| # vocab_size: int = 1200, | |
| # min_frequency: int = 1, | |
| # max_samples: Optional[int] = 200000, | |
| # ) -> "ChessTokenizer": | |
| # """ | |
| # Build a tokenizer vocabulary from a Hugging Face dataset. | |
| # Args: | |
| # vocab_size: Final vocab size INCLUDING special tokens. | |
| # min_frequency: Minimum count to consider a move (usually 1 is fine). | |
| # max_samples: How many games to scan to build vocab. | |
| # """ | |
| # from datasets import load_dataset | |
| # dataset = load_dataset(dataset_name, split=split) | |
| # # if max_samples is not None: # v0&1 | |
| # # dataset = dataset.select(range(min(max_samples, len(dataset)))) | |
| # if max_samples is not None: # v2 | |
| # n = min(max_samples, len(dataset)) | |
| # dataset = dataset.shuffle(seed=42).select(range(n)) | |
| # def game_iterator(): | |
| # for example in dataset: | |
| # yield example[column] | |
| # return cls.build_vocab_from_iterator( | |
| # game_iterator(), | |
| # vocab_size=vocab_size, | |
| # min_frequency=min_frequency, | |
| # ) | |
| # @property | |
| # def vocab_size(self) -> int: | |
| # """Return the size of the vocabulary.""" | |
| # return len(self._vocab) | |
| # def get_vocab(self) -> Dict[str, int]: | |
| # """Return the vocabulary as a dictionary.""" | |
| # return dict(self._vocab) | |
| # def _tokenize(self, text: str) -> List[str]: | |
| # """ | |
| # Tokenize a string of moves into a list of tokens. | |
| # Args: | |
| # text: A string of space-separated moves. | |
| # Returns: | |
| # List of move tokens. | |
| # """ | |
| # return text.strip().split() | |
| # def _convert_token_to_id(self, token: str) -> int: | |
| # """Convert a token to its ID.""" | |
| # return self._vocab.get(token, self._vocab.get(self.UNK_TOKEN, 0)) | |
| # def _convert_id_to_token(self, index: int) -> str: | |
| # """Convert an ID to its token.""" | |
| # return self._ids_to_tokens.get(index, self.UNK_TOKEN) | |
| # def convert_tokens_to_string(self, tokens: List[str]) -> str: | |
| # """Convert a list of tokens back to a string.""" | |
| # # Filter out special tokens for cleaner output | |
| # special = {self.PAD_TOKEN, self.BOS_TOKEN, self.EOS_TOKEN, self.UNK_TOKEN} | |
| # return " ".join(t for t in tokens if t not in special) | |
| # def save_vocabulary( | |
| # self, | |
| # save_directory: str, | |
| # filename_prefix: Optional[str] = None, | |
| # ) -> tuple: | |
| # """ | |
| # Save the vocabulary to a JSON file. | |
| # Args: | |
| # save_directory: Directory to save the vocabulary. | |
| # filename_prefix: Optional prefix for the filename. | |
| # Returns: | |
| # Tuple containing the path to the saved vocabulary file. | |
| # """ | |
| # if not os.path.isdir(save_directory): | |
| # os.makedirs(save_directory, exist_ok=True) | |
| # vocab_file = os.path.join( | |
| # save_directory, | |
| # (filename_prefix + "-" if filename_prefix else "") + "vocab.json", | |
| # ) | |
| # with open(vocab_file, "w", encoding="utf-8") as f: | |
| # json.dump(self._vocab, f, ensure_ascii=False, indent=2) | |
| # return (vocab_file,) | |
| # # def build_inputs_with_special_tokens(self, token_ids_0, token_ids_1=None): | |
| # # if token_ids_1 is not None: | |
| # # # Not expected here, but handle gracefully | |
| # # token_ids = token_ids_0 + token_ids_1 | |
| # # else: | |
| # # token_ids = token_ids_0 | |
| # # return [self.bos_token_id] + token_ids + [self.eos_token_id] | |
| # # def get_special_tokens_mask(self, token_ids_0, token_ids_1=None, already_has_special_tokens=False): | |
| # # if already_has_special_tokens: | |
| # # return [1 if t in (self.pad_token_id, self.bos_token_id, self.eos_token_id, self.unk_token_id) else 0 for t in token_ids_0] | |
| # # if token_ids_1 is not None: | |
| # # token_ids = token_ids_0 + token_ids_1 | |
| # # else: | |
| # # token_ids = token_ids_0 | |
| # # return [1] + [0] * len(token_ids) + [1] | |
| # def count_vocab_from_dataset( | |
| # dataset_name: str = "dlouapre/lichess_2025-01_1M", | |
| # split: str = "train", | |
| # column: str = "text", | |
| # max_samples: Optional[int] = 10000, | |
| # ) -> Dict[str, int]: | |
| # """ | |
| # Count token frequencies in a dataset (useful for vocabulary analysis). | |
| # Args: | |
| # dataset_name: Name of the dataset on Hugging Face Hub. | |
| # split: Dataset split to use. | |
| # column: Column containing the game strings. | |
| # max_samples: Maximum number of samples to process. | |
| # Returns: | |
| # Dictionary mapping tokens to their frequencies. | |
| # """ | |
| # from collections import Counter | |
| # from datasets import load_dataset | |
| # dataset = load_dataset(dataset_name, split=split) | |
| # if max_samples is not None: | |
| # dataset = dataset.select(range(min(max_samples, len(dataset)))) | |
| # token_counts = Counter() | |
| # for example in dataset: | |
| # moves = example[column].strip().split() | |
| # token_counts.update(moves) | |
| # return dict(token_counts) | |
| """ | |
| Grammar-aware Chess Tokenizer for the Chess Challenge. | |
| Goal: maximize legal move extraction in evaluate.py which searches for | |
| two square patterns ([a-h][1-8]) in the generated text and takes the first two. | |
| Strategy: | |
| - Decompose each move into structured tokens: | |
| - CP_<color><piece> (e.g., CP_WP, CP_BN) | |
| - SQ_<square> (e.g., SQ_e2, SQ_e4) | |
| - EV_<event> (e.g., EV_NONE, EV_X, EV_PLUS, EV_MATE, EV_PROMO_Q, ...) | |
| - SEP (end-of-move marker, decoded as a space) | |
| - Deterministic vocab: no dataset-dependent OOV -> UNK for rare full moves disappears. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import re | |
| from typing import Dict, List, Optional | |
| from transformers import PreTrainedTokenizer | |
class ChessTokenizer(PreTrainedTokenizer):
    """
    Grammar-aware tokenizer for chess moves in extended UCI notation.

    Each move string (e.g. ``WPe2e4``) is decomposed into four structured
    tokens followed by an end-of-move marker::

        CP_<color><piece>  SQ_<from>  SQ_<to>  EV_<event>  [SEP]

    The default vocabulary is fixed (special tokens, 12 color/piece tokens,
    64 squares and 16 event tokens), so it does not depend on any dataset.
    """

    model_input_names = ["input_ids", "attention_mask"]
    vocab_files_names = {"vocab_file": "vocab.json"}

    # Special tokens
    PAD_TOKEN = "[PAD]"
    BOS_TOKEN = "[BOS]"
    EOS_TOKEN = "[EOS]"
    UNK_TOKEN = "[UNK]"
    SEP_TOKEN = "[SEP]"  # end-of-move marker (decoded as a space)

    # A board square: file in [a-h] followed by rank in [1-8].
    _SQUARE_RE = re.compile(r"^[a-h][1-8]$")
    # Promotion marker inside a move suffix, e.g. "=Q".
    _PROMO_RE = re.compile(r"=([NBRQ])")

    def __init__(
        self,
        vocab_file: Optional[str] = None,
        vocab: Optional[Dict[str, int]] = None,
        **kwargs,
    ):
        """
        Initialize the chess tokenizer.

        Args:
            vocab_file: Path to a JSON file holding the token -> id mapping.
                Only used when ``vocab`` is None and the file exists.
            vocab: Token -> id mapping; takes precedence over ``vocab_file``.
            **kwargs: Extra arguments forwarded to ``PreTrainedTokenizer``.
        """
        self._pad_token = self.PAD_TOKEN
        self._bos_token = self.BOS_TOKEN
        self._eos_token = self.EOS_TOKEN
        self._unk_token = self.UNK_TOKEN
        self._sep_token = self.SEP_TOKEN

        # These four are passed explicitly to the parent below; drop any copies
        # in kwargs so the parent does not receive the same keyword twice.
        for name in ("pad_token", "bos_token", "eos_token", "unk_token"):
            kwargs.pop(name, None)

        if vocab is not None:
            self._vocab = vocab
        elif vocab_file is not None and os.path.exists(vocab_file):
            with open(vocab_file, "r", encoding="utf-8") as f:
                self._vocab = json.load(f)
        else:
            self._vocab = self._create_default_vocab()

        # Reverse mapping used by _convert_id_to_token.
        self._ids_to_tokens = {idx: tok for tok, idx in self._vocab.items()}

        # The vocabulary is set up before the parent initializer is called.
        super().__init__(
            pad_token=self._pad_token,
            bos_token=self._bos_token,
            eos_token=self._eos_token,
            unk_token=self._unk_token,
            **kwargs,
        )

    #### Vocab

    def _create_default_vocab(self) -> Dict[str, int]:
        """
        Build the fixed default vocabulary.

        Returns:
            Mapping from token to id, with ids assigned in the order:
            special tokens, color/piece tokens, squares, events.
        """
        special = [
            self.PAD_TOKEN,
            self.BOS_TOKEN,
            self.EOS_TOKEN,
            self.UNK_TOKEN,
            self.SEP_TOKEN,
        ]
        # Color + piece (12 tokens)
        cp = [f"CP_{c}{p}" for c in "WB" for p in "PNBRQK"]
        # Squares (64 tokens)
        squares = [f"SQ_{f}{r}" for f in "abcdefgh" for r in "12345678"]
        # Events: a small canonical set.
        events = [
            "EV_NONE",
            "EV_X",
            "EV_PLUS",
            "EV_MATE",
            "EV_XPLUS",
            "EV_XMATE",
            "EV_O",  # emitted for the "(o)" suffix
            "EV_OO",  # emitted for the "(O)" suffix
            "EV_PROMO_N",
            "EV_PROMO_B",
            "EV_PROMO_R",
            "EV_PROMO_Q",
            "EV_XPROMO_N",
            "EV_XPROMO_B",
            "EV_XPROMO_R",
            "EV_XPROMO_Q",
        ]
        # 5 + 12 + 64 + 16 = 97 tokens in total.
        vocab_list = special + cp + squares + events
        return {tok: i for i, tok in enumerate(vocab_list)}

    @property
    def vocab_size(self) -> int:
        """Return the number of entries in the vocabulary."""
        # Exposed as a property (not a method) so that `tokenizer.vocab_size`
        # evaluates to an int, as the base tokenizer class does.
        return len(self._vocab)

    def get_vocab(self) -> Dict[str, int]:
        """Return a copy of the token -> id mapping."""
        return dict(self._vocab)

    #### Core tokenization

    def _tokenize(self, text: str) -> List[str]:
        """
        Split a game string into structured tokens.

        Args:
            text: Whitespace-separated moves in extended UCI,
                e.g. ``"WPe2e4 BPe7e5 ..."``.

        Returns:
            The tokens of every move, each move followed by ``[SEP]``:
            ``CP_WP SQ_e2 SQ_e4 EV_NONE [SEP] ...``
        """
        tokens: List[str] = []
        for mv in text.strip().split():
            tokens.extend(self._tokenize_one_move(mv))
            tokens.append(self.SEP_TOKEN)
        return tokens

    def _tokenize_one_move(self, mv: str) -> List[str]:
        """
        Tokenize a single move string.

        The move is read positionally: character 0 is the color, character 1
        the piece, characters 2-3 the source square, characters 4-5 the
        destination square, and anything after that is the suffix.

        Args:
            mv: One move, e.g. ``"WPe2e4"`` or ``"BNg8f6(x)"``.

        Returns:
            ``[CP_*, SQ_from, SQ_to, EV_*]``, or ``[[UNK]]`` (a single UNK
            token) when the move is shorter than six characters or any of the
            color/piece or square tokens is not in the vocabulary.
        """
        # Minimal sanity: the shortest valid move ("WPe2e4") has length 6.
        if len(mv) < 6:
            return [self.UNK_TOKEN]

        color = mv[0]  # W/B
        piece = mv[1]  # P/N/B/R/Q/K
        from_sq = mv[2:4]
        to_sq = mv[4:6]
        suffix = mv[6:]  # capture/check/mate/castle/promotion markers

        cp_tok = f"CP_{color}{piece}"
        from_tok = f"SQ_{from_sq}"
        to_tok = f"SQ_{to_sq}"
        if any(tok not in self._vocab for tok in (cp_tok, from_tok, to_tok)):
            return [self.UNK_TOKEN]

        ev_tok = self._event_token(piece, from_sq, to_sq, suffix)
        return [cp_tok, from_tok, to_tok, ev_tok]

    def _event_token(self, piece: str, from_sq: str, to_sq: str, suffix: str) -> str:
        """
        Canonicalize a move suffix into one of the ``EV_*`` tokens.

        Precedence: castling, then promotion, then capture+mate,
        capture+check, capture, mate, check, and finally ``EV_NONE``.

        Args:
            piece: Piece letter of the move (currently unused).
            from_sq: Source square (currently unused).
            to_sq: Destination square (currently unused).
            suffix: Everything after the destination square.

        Returns:
            The event token name.
        """
        # Castling. NOTE(review): "(o)" is treated as kingside and "(O)" as
        # queenside per the original author's notes; confirm against the data.
        if "(o)" in suffix:
            return "EV_O"
        if "(O)" in suffix:
            return "EV_OO"

        capture = "(x" in suffix  # covers (x), (x+), (x+*)
        mate = "+*" in suffix
        check = "(+)" in suffix or "(x+)" in suffix

        promo_match = self._PROMO_RE.search(suffix)
        if promo_match is not None:
            prefix = "EV_XPROMO_" if capture else "EV_PROMO_"
            base = prefix + promo_match.group(1)
            # A custom vocabulary may lack promotion tokens; fall back then.
            return base if base in self._vocab else "EV_NONE"

        if capture and mate:
            return "EV_XMATE"
        if capture and check:
            return "EV_XPLUS"
        if capture:
            return "EV_X"
        if mate:
            return "EV_MATE"
        if check:
            return "EV_PLUS"
        return "EV_NONE"

    #### Conversions

    def _convert_token_to_id(self, token: str) -> int:
        """Return the id of ``token``, or the ``[UNK]`` id if it is unknown."""
        # UNK is only looked up when actually needed, so a custom vocabulary
        # without "[UNK]" still resolves every token it does contain.
        try:
            return self._vocab[token]
        except KeyError:
            return self._vocab[self.UNK_TOKEN]

    def _convert_id_to_token(self, index: int) -> str:
        """Return the token for ``index``, or ``[UNK]`` if the id is unknown."""
        return self._ids_to_tokens.get(index, self.UNK_TOKEN)

    def convert_tokens_to_string(self, tokens: List[str]) -> str:
        """
        Decode tokens into a plain string with raw squares.

        ``[PAD]``, ``[BOS]``, ``[EOS]`` and ``[UNK]`` are dropped. ``[SEP]``
        becomes a single space. ``SQ_``/``CP_``/``EV_`` tokens lose their
        three-character prefix (``"SQ_e2"`` -> ``"e2"``) and are followed by a
        space; any other token is emitted unchanged followed by a space.
        Consecutive moves therefore end up separated by two spaces. The
        result is stripped of leading/trailing whitespace.

        Squares are emitted bare (``"e2 e4"``) so that a downstream search for
        ``[a-h][1-8]`` patterns can pick them up directly.
        """
        dropped = {self.PAD_TOKEN, self.BOS_TOKEN, self.EOS_TOKEN, self.UNK_TOKEN}
        out: List[str] = []
        for tok in tokens:
            if tok in dropped:
                continue
            if tok == self.SEP_TOKEN:
                out.append(" ")
            elif tok.startswith(("SQ_", "CP_", "EV_")):
                out.append(tok[3:] + " ")
            else:
                # Fallback: token without a known prefix is kept verbatim.
                out.append(tok + " ")
        return "".join(out).strip()

    def save_vocabulary(self, save_directory: str, filename_prefix: Optional[str] = None) -> tuple:
        """
        Save the vocabulary as JSON.

        Args:
            save_directory: Target directory; created if it does not exist.
            filename_prefix: Optional prefix; the file is named
                ``<prefix>-vocab.json`` when given, else ``vocab.json``.

        Returns:
            One-element tuple containing the path of the written file.
        """
        os.makedirs(save_directory, exist_ok=True)
        vocab_file = os.path.join(
            save_directory,
            (filename_prefix + "-" if filename_prefix else "") + "vocab.json",
        )
        with open(vocab_file, "w", encoding="utf-8") as f:
            json.dump(self._vocab, f, ensure_ascii=False, indent=2)
        return (vocab_file,)