|
|
"""
|
|
|
Custom Chess Tokenizer for the Chess Challenge.
|
|
|
|
|
|
This tokenizer uses a STRUCTURED approach to tokenize chess moves, breaking down
|
|
|
each move into its components to help the model learn legal chess patterns.
|
|
|
|
|
|
The dataset format uses extended UCI notation:
|
|
|
- W/B prefix for White/Black
|
|
|
- Piece letter: P=Pawn, N=Knight, B=Bishop, R=Rook, Q=Queen, K=King
|
|
|
- Source and destination squares (e.g., e2e4)
|
|
|
- Special suffixes: (x)=capture, (+)=check, (+*)=checkmate, (o)/(O)=castling
|
|
|
|
|
|
Instead of treating each move as a single token (which creates thousands of tokens),
|
|
|
we tokenize the COMPONENTS:
|
|
|
- Color tokens: W, B
|
|
|
- Piece tokens: P, N, B, R, Q, K
|
|
|
- Square tokens: a1, a2, ..., h8 (64 squares)
|
|
|
- Suffix tokens: (x), (+), (+*), (o), (O), =Q, =R, =B, =N
|
|
|
|
|
|
This gives ~80 tokens total, helping the model learn:
|
|
|
1. Valid squares on the board
|
|
|
2. Which pieces can make which types of moves
|
|
|
3. The structure of legal chess moves
|
|
|
"""
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
import json
|
|
|
import os
|
|
|
import re
|
|
|
from pathlib import Path
|
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
|
|
from transformers import PreTrainedTokenizer
|
|
|
|
|
|
|
|
|
class ChessTokenizer(PreTrainedTokenizer):
|
|
|
"""
|
|
|
A structured tokenizer for chess moves using component-based tokenization.
|
|
|
|
|
|
Instead of treating each move as a single token, this tokenizer breaks moves
|
|
|
into their structural components (color, piece, from-square, to-square, suffix).
|
|
|
This smaller vocabulary helps the model learn valid chess patterns.
|
|
|
|
|
|
Vocabulary (~80 tokens):
|
|
|
- Special: [PAD], [BOS], [EOS], [UNK]
|
|
|
- Colors: W, B
|
|
|
- Pieces: P, N, B, R, Q, K
|
|
|
- Squares: a1-h8 (64 tokens)
|
|
|
- Suffixes: (x), (+), (+*), (o), (O), =Q, =R, =B, =N
|
|
|
|
|
|
Example:
|
|
|
>>> tokenizer = ChessTokenizer()
|
|
|
>>> tokens = tokenizer.tokenize("WPe2e4 BPe7e5")
|
|
|
>>> print(tokens)
|
|
|
['W', 'P', 'e2', 'e4', 'B', 'P', 'e7', 'e5']
|
|
|
"""
|
|
|
|
|
|
model_input_names = ["input_ids", "attention_mask"]
|
|
|
vocab_files_names = {"vocab_file": "vocab.json"}
|
|
|
|
|
|
|
|
|
PAD_TOKEN = "[PAD]"
|
|
|
BOS_TOKEN = "[BOS]"
|
|
|
EOS_TOKEN = "[EOS]"
|
|
|
UNK_TOKEN = "[UNK]"
|
|
|
|
|
|
|
|
|
COLORS = ["W", "B"]
|
|
|
PIECES = ["P", "N", "B", "R", "Q", "K"]
|
|
|
FILES = ["a", "b", "c", "d", "e", "f", "g", "h"]
|
|
|
RANKS = ["1", "2", "3", "4", "5", "6", "7", "8"]
|
|
|
SQUARES = [f + r for f in ["a", "b", "c", "d", "e", "f", "g", "h"]
|
|
|
for r in ["1", "2", "3", "4", "5", "6", "7", "8"]]
|
|
|
SUFFIXES = ["(x)", "(+)", "(+*)", "(o)", "(O)", "=Q", "=R", "=B", "=N"]
|
|
|
|
|
|
|
|
|
|
|
|
MOVE_PATTERN = re.compile(
|
|
|
r'^([WB])([PNBRQK])([a-h][1-8])([a-h][1-8])(=[QRBN])?(\([xo+*O]+\))?$'
|
|
|
)
|
|
|
|
|
|
def __init__(
|
|
|
self,
|
|
|
vocab_file: Optional[str] = None,
|
|
|
vocab: Optional[Dict[str, int]] = None,
|
|
|
**kwargs,
|
|
|
):
|
|
|
"""
|
|
|
Initialize the chess tokenizer.
|
|
|
|
|
|
Args:
|
|
|
vocab_file: Path to a JSON file containing the vocabulary mapping.
|
|
|
vocab: Dictionary mapping tokens to IDs (alternative to vocab_file).
|
|
|
**kwargs: Additional arguments passed to PreTrainedTokenizer.
|
|
|
"""
|
|
|
|
|
|
self._pad_token = self.PAD_TOKEN
|
|
|
self._bos_token = self.BOS_TOKEN
|
|
|
self._eos_token = self.EOS_TOKEN
|
|
|
self._unk_token = self.UNK_TOKEN
|
|
|
|
|
|
|
|
|
|
|
|
kwargs.pop("pad_token", None)
|
|
|
kwargs.pop("bos_token", None)
|
|
|
kwargs.pop("eos_token", None)
|
|
|
kwargs.pop("unk_token", None)
|
|
|
|
|
|
|
|
|
if vocab is not None:
|
|
|
self._vocab = vocab
|
|
|
elif vocab_file is not None and os.path.exists(vocab_file):
|
|
|
with open(vocab_file, "r", encoding="utf-8") as f:
|
|
|
self._vocab = json.load(f)
|
|
|
else:
|
|
|
|
|
|
self._vocab = self._create_structured_vocab()
|
|
|
|
|
|
|
|
|
self._ids_to_tokens = {v: k for k, v in self._vocab.items()}
|
|
|
|
|
|
|
|
|
super().__init__(
|
|
|
pad_token=self._pad_token,
|
|
|
bos_token=self._bos_token,
|
|
|
eos_token=self._eos_token,
|
|
|
unk_token=self._unk_token,
|
|
|
**kwargs,
|
|
|
)
|
|
|
|
|
|
def _create_structured_vocab(self) -> Dict[str, int]:
|
|
|
"""
|
|
|
Create the structured vocabulary with all chess components.
|
|
|
|
|
|
This creates a fixed vocabulary of ~85 tokens covering all possible
|
|
|
chess move components.
|
|
|
"""
|
|
|
tokens = []
|
|
|
|
|
|
|
|
|
tokens.extend([self.PAD_TOKEN, self.BOS_TOKEN, self.EOS_TOKEN, self.UNK_TOKEN])
|
|
|
|
|
|
|
|
|
tokens.extend(self.COLORS)
|
|
|
|
|
|
|
|
|
tokens.extend(self.PIECES)
|
|
|
|
|
|
|
|
|
tokens.extend(self.SQUARES)
|
|
|
|
|
|
|
|
|
tokens.extend(self.SUFFIXES)
|
|
|
|
|
|
|
|
|
vocab = {token: idx for idx, token in enumerate(tokens)}
|
|
|
return vocab
|
|
|
|
|
|
def _create_default_vocab(self) -> Dict[str, int]:
|
|
|
"""Alias for _create_structured_vocab for compatibility."""
|
|
|
return self._create_structured_vocab()
|
|
|
|
|
|
def _parse_move(self, move: str) -> List[str]:
|
|
|
"""
|
|
|
Parse a single move into its component tokens.
|
|
|
|
|
|
Args:
|
|
|
move: A move in extended UCI format (e.g., "WPe2e4", "BNg8f6(x)").
|
|
|
|
|
|
Returns:
|
|
|
List of component tokens.
|
|
|
"""
|
|
|
move = move.strip()
|
|
|
if not move:
|
|
|
return []
|
|
|
|
|
|
|
|
|
if move in [self.PAD_TOKEN, self.BOS_TOKEN, self.EOS_TOKEN, self.UNK_TOKEN]:
|
|
|
return [move]
|
|
|
|
|
|
|
|
|
match = self.MOVE_PATTERN.match(move)
|
|
|
if match:
|
|
|
color, piece, from_sq, to_sq, promotion, suffix = match.groups()
|
|
|
tokens = [color, piece, from_sq, to_sq]
|
|
|
if promotion:
|
|
|
tokens.append(promotion)
|
|
|
if suffix:
|
|
|
tokens.append(suffix)
|
|
|
return tokens
|
|
|
|
|
|
|
|
|
|
|
|
tokens = []
|
|
|
i = 0
|
|
|
|
|
|
|
|
|
if i < len(move) and move[i] in self.COLORS:
|
|
|
tokens.append(move[i])
|
|
|
i += 1
|
|
|
|
|
|
|
|
|
if i < len(move) and move[i] in self.PIECES:
|
|
|
tokens.append(move[i])
|
|
|
i += 1
|
|
|
|
|
|
|
|
|
if i + 1 < len(move) and move[i:i+2] in self.SQUARES:
|
|
|
tokens.append(move[i:i+2])
|
|
|
i += 2
|
|
|
|
|
|
|
|
|
if i + 1 < len(move) and move[i:i+2] in self.SQUARES:
|
|
|
tokens.append(move[i:i+2])
|
|
|
i += 2
|
|
|
|
|
|
|
|
|
if i + 1 < len(move) and move[i:i+2] in self.SUFFIXES:
|
|
|
tokens.append(move[i:i+2])
|
|
|
i += 2
|
|
|
|
|
|
|
|
|
remaining = move[i:]
|
|
|
if remaining in self.SUFFIXES:
|
|
|
tokens.append(remaining)
|
|
|
elif remaining:
|
|
|
|
|
|
for suffix in self.SUFFIXES:
|
|
|
if remaining.startswith(suffix):
|
|
|
tokens.append(suffix)
|
|
|
break
|
|
|
|
|
|
|
|
|
if not tokens:
|
|
|
return [self.UNK_TOKEN]
|
|
|
|
|
|
return tokens
|
|
|
|
|
|
@classmethod
|
|
|
def build_vocab_from_iterator(
|
|
|
cls,
|
|
|
iterator,
|
|
|
min_frequency: int = 1,
|
|
|
) -> "ChessTokenizer":
|
|
|
"""
|
|
|
Build a tokenizer (for compatibility - vocab is fixed).
|
|
|
|
|
|
The structured tokenizer has a fixed vocabulary, so this method
|
|
|
simply returns a new tokenizer instance.
|
|
|
|
|
|
Args:
|
|
|
iterator: An iterator yielding game strings (ignored for structured vocab).
|
|
|
min_frequency: Minimum frequency (ignored for structured vocab).
|
|
|
|
|
|
Returns:
|
|
|
A ChessTokenizer with the structured vocabulary.
|
|
|
"""
|
|
|
return cls()
|
|
|
|
|
|
@classmethod
|
|
|
def build_vocab_from_dataset(
|
|
|
cls,
|
|
|
dataset_name: str = "dlouapre/lichess_2025-01_1M",
|
|
|
split: str = "train",
|
|
|
column: str = "text",
|
|
|
min_frequency: int = 500,
|
|
|
max_samples: Optional[int] = 100000,
|
|
|
) -> "ChessTokenizer":
|
|
|
"""
|
|
|
Build a tokenizer (for compatibility - vocab is fixed).
|
|
|
|
|
|
The structured tokenizer has a fixed vocabulary covering all valid
|
|
|
chess move components, so no dataset scanning is needed.
|
|
|
|
|
|
Args:
|
|
|
dataset_name: Name of the dataset (ignored).
|
|
|
split: Dataset split (ignored).
|
|
|
column: Column name (ignored).
|
|
|
min_frequency: Minimum frequency (ignored).
|
|
|
max_samples: Maximum samples (ignored).
|
|
|
|
|
|
Returns:
|
|
|
A ChessTokenizer with the structured vocabulary.
|
|
|
"""
|
|
|
return cls()
|
|
|
|
|
|
@property
|
|
|
def vocab_size(self) -> int:
|
|
|
"""Return the size of the vocabulary."""
|
|
|
return len(self._vocab)
|
|
|
|
|
|
def get_vocab(self) -> Dict[str, int]:
|
|
|
"""Return the vocabulary as a dictionary."""
|
|
|
return dict(self._vocab)
|
|
|
|
|
|
def _tokenize(self, text: str) -> List[str]:
|
|
|
"""
|
|
|
Tokenize a string of moves into component tokens.
|
|
|
|
|
|
Args:
|
|
|
text: A string of space-separated moves.
|
|
|
|
|
|
Returns:
|
|
|
List of component tokens.
|
|
|
"""
|
|
|
tokens = []
|
|
|
moves = text.strip().split()
|
|
|
|
|
|
for move in moves:
|
|
|
move_tokens = self._parse_move(move)
|
|
|
tokens.extend(move_tokens)
|
|
|
|
|
|
return tokens
|
|
|
|
|
|
def _convert_token_to_id(self, token: str) -> int:
|
|
|
"""Convert a token to its ID."""
|
|
|
return self._vocab.get(token, self._vocab.get(self.UNK_TOKEN, 0))
|
|
|
|
|
|
def _convert_id_to_token(self, index: int) -> str:
|
|
|
"""Convert an ID to its token."""
|
|
|
return self._ids_to_tokens.get(index, self.UNK_TOKEN)
|
|
|
|
|
|
def convert_tokens_to_string(self, tokens: List[str]) -> str:
|
|
|
"""
|
|
|
Convert a list of tokens back to a move string.
|
|
|
|
|
|
Reconstructs moves from component tokens by grouping them appropriately.
|
|
|
"""
|
|
|
|
|
|
special = {self.PAD_TOKEN, self.BOS_TOKEN, self.EOS_TOKEN, self.UNK_TOKEN}
|
|
|
tokens = [t for t in tokens if t not in special]
|
|
|
|
|
|
if not tokens:
|
|
|
return ""
|
|
|
|
|
|
|
|
|
result = []
|
|
|
current_move = []
|
|
|
|
|
|
for token in tokens:
|
|
|
|
|
|
if token in self.COLORS:
|
|
|
if current_move:
|
|
|
result.append("".join(current_move))
|
|
|
current_move = [token]
|
|
|
else:
|
|
|
current_move.append(token)
|
|
|
|
|
|
|
|
|
if current_move:
|
|
|
result.append("".join(current_move))
|
|
|
|
|
|
return " ".join(result)
|
|
|
|
|
|
def save_vocabulary(
|
|
|
self,
|
|
|
save_directory: str,
|
|
|
filename_prefix: Optional[str] = None,
|
|
|
) -> tuple:
|
|
|
"""
|
|
|
Save the vocabulary to a JSON file.
|
|
|
|
|
|
Args:
|
|
|
save_directory: Directory to save the vocabulary.
|
|
|
filename_prefix: Optional prefix for the filename.
|
|
|
|
|
|
Returns:
|
|
|
Tuple containing the path to the saved vocabulary file.
|
|
|
"""
|
|
|
if not os.path.isdir(save_directory):
|
|
|
os.makedirs(save_directory, exist_ok=True)
|
|
|
|
|
|
vocab_file = os.path.join(
|
|
|
save_directory,
|
|
|
(filename_prefix + "-" if filename_prefix else "") + "vocab.json",
|
|
|
)
|
|
|
|
|
|
with open(vocab_file, "w", encoding="utf-8") as f:
|
|
|
json.dump(self._vocab, f, ensure_ascii=False, indent=2)
|
|
|
|
|
|
return (vocab_file,)
|
|
|
|
|
|
|
|
|
def count_vocab_from_dataset(
|
|
|
dataset_name: str = "dlouapre/lichess_2025-01_1M",
|
|
|
split: str = "train",
|
|
|
column: str = "text",
|
|
|
max_samples: Optional[int] = 10000,
|
|
|
) -> Dict[str, int]:
|
|
|
"""
|
|
|
Count token frequencies in a dataset (useful for vocabulary analysis).
|
|
|
|
|
|
With the structured tokenizer, this counts component frequencies.
|
|
|
|
|
|
Args:
|
|
|
dataset_name: Name of the dataset on Hugging Face Hub.
|
|
|
split: Dataset split to use.
|
|
|
column: Column containing the game strings.
|
|
|
max_samples: Maximum number of samples to process.
|
|
|
|
|
|
Returns:
|
|
|
Dictionary mapping tokens to their frequencies.
|
|
|
"""
|
|
|
from collections import Counter
|
|
|
from datasets import load_dataset
|
|
|
|
|
|
tokenizer = ChessTokenizer()
|
|
|
|
|
|
dataset = load_dataset(dataset_name, split=split)
|
|
|
|
|
|
if max_samples is not None:
|
|
|
dataset = dataset.select(range(min(max_samples, len(dataset))))
|
|
|
|
|
|
token_counts = Counter()
|
|
|
|
|
|
for example in dataset:
|
|
|
tokens = tokenizer._tokenize(example[column])
|
|
|
token_counts.update(tokens)
|
|
|
|
|
|
return dict(token_counts)
|
|
|
|