|
|
""" |
|
|
Data loading utilities for the Chess Challenge. |
|
|
|
|
|
This module provides functions to load and process chess game data |
|
|
from the Lichess dataset on Hugging Face. |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
from typing import Dict, Iterator, List, Optional |
|
|
|
|
|
import torch |
|
|
from torch.utils.data import Dataset |
|
|
|
|
|
|
|
|
class ChessDataset(Dataset): |
|
|
""" |
|
|
PyTorch Dataset for chess games. |
|
|
|
|
|
This dataset loads games from a Hugging Face dataset and prepares |
|
|
them for language modeling training. |
|
|
|
|
|
Each game is tokenized and truncated/padded to max_length. |
|
|
The labels are shifted by one position for next-token prediction. |
|
|
|
|
|
Example: |
|
|
>>> from tokenizer import ChessTokenizer |
|
|
>>> tokenizer = ChessTokenizer.build_vocab_from_dataset() |
|
|
>>> dataset = ChessDataset(tokenizer, max_length=256) |
|
|
>>> sample = dataset[0] |
|
|
>>> print(sample["input_ids"].shape) # (256,) |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
tokenizer, |
|
|
dataset_name: str = "dlouapre/lichess_2025-01_1M", |
|
|
split: str = "train", |
|
|
column: str = "text", |
|
|
max_length: int = 256, |
|
|
max_samples: Optional[int] = None, |
|
|
): |
|
|
""" |
|
|
Initialize the chess dataset. |
|
|
|
|
|
Args: |
|
|
tokenizer: The chess tokenizer to use. |
|
|
dataset_name: Name of the dataset on Hugging Face Hub. |
|
|
split: Dataset split to use. |
|
|
column: Column containing the game strings. |
|
|
max_length: Maximum sequence length. |
|
|
max_samples: Maximum number of samples to load. |
|
|
""" |
|
|
from datasets import load_dataset |
|
|
|
|
|
self.tokenizer = tokenizer |
|
|
self.max_length = max_length |
|
|
self.column = column |
|
|
|
|
|
|
|
|
dataset = load_dataset(dataset_name, split=split) |
|
|
|
|
|
if max_samples is not None: |
|
|
dataset = dataset.select(range(min(max_samples, len(dataset)))) |
|
|
|
|
|
self.data = dataset |
|
|
|
|
|
def __len__(self) -> int: |
|
|
return len(self.data) |
|
|
|
|
|
def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]: |
|
|
game = self.data[idx][self.column] |
|
|
|
|
|
|
|
|
game_with_bos = self.tokenizer.bos_token + " " + game |
|
|
|
|
|
|
|
|
encoding = self.tokenizer( |
|
|
game_with_bos, |
|
|
truncation=True, |
|
|
max_length=self.max_length, |
|
|
padding="max_length", |
|
|
return_tensors="pt", |
|
|
) |
|
|
|
|
|
|
|
|
input_ids = encoding["input_ids"].squeeze(0) |
|
|
attention_mask = encoding["attention_mask"].squeeze(0) |
|
|
|
|
|
|
|
|
labels = input_ids.clone() |
|
|
|
|
|
|
|
|
labels[attention_mask == 0] = -100 |
|
|
|
|
|
return { |
|
|
"input_ids": input_ids, |
|
|
"attention_mask": attention_mask, |
|
|
"labels": labels, |
|
|
} |
|
|
|
|
|
|
|
|
class ChessDataCollator: |
|
|
""" |
|
|
Data collator for chess games. |
|
|
|
|
|
This collator pads sequences to the same length within a batch |
|
|
and creates the appropriate attention masks. |
|
|
""" |
|
|
|
|
|
def __init__(self, tokenizer, max_length: int = 256): |
|
|
self.tokenizer = tokenizer |
|
|
self.max_length = max_length |
|
|
|
|
|
def __call__(self, features: List[Dict]) -> Dict[str, torch.Tensor]: |
|
|
|
|
|
input_ids = torch.stack([f["input_ids"] for f in features]) |
|
|
attention_mask = torch.stack([f["attention_mask"] for f in features]) |
|
|
labels = torch.stack([f["labels"] for f in features]) |
|
|
|
|
|
return { |
|
|
"input_ids": input_ids, |
|
|
"attention_mask": attention_mask, |
|
|
"labels": labels, |
|
|
} |
|
|
|
|
|
|
|
|
def create_train_val_datasets( |
|
|
tokenizer, |
|
|
dataset_name: str = "dlouapre/lichess_2025-01_1M", |
|
|
max_length: int = 256, |
|
|
train_samples: Optional[int] = None, |
|
|
val_samples: int = 5000, |
|
|
val_ratio: float = 0.05, |
|
|
): |
|
|
""" |
|
|
Create training and validation datasets. |
|
|
|
|
|
Args: |
|
|
tokenizer: The chess tokenizer. |
|
|
dataset_name: Name of the dataset. |
|
|
max_length: Maximum sequence length. |
|
|
train_samples: Maximum training samples (None for all). |
|
|
val_samples: Number of validation samples. |
|
|
val_ratio: Ratio of validation samples (used if train_samples is None). |
|
|
|
|
|
Returns: |
|
|
Tuple of (train_dataset, val_dataset). |
|
|
""" |
|
|
from datasets import load_dataset |
|
|
|
|
|
|
|
|
full_dataset = load_dataset(dataset_name, split="train") |
|
|
|
|
|
|
|
|
total = len(full_dataset) |
|
|
|
|
|
if train_samples is not None: |
|
|
n_train = min(train_samples, total - val_samples) |
|
|
else: |
|
|
n_train = int(total * (1 - val_ratio)) |
|
|
|
|
|
n_val = min(val_samples, total - n_train) |
|
|
|
|
|
|
|
|
train_data = full_dataset.select(range(n_train)) |
|
|
val_data = full_dataset.select(range(n_train, n_train + n_val)) |
|
|
|
|
|
|
|
|
train_dataset = ChessDataset( |
|
|
tokenizer=tokenizer, |
|
|
dataset_name=dataset_name, |
|
|
max_length=max_length, |
|
|
) |
|
|
train_dataset.data = train_data |
|
|
|
|
|
val_dataset = ChessDataset( |
|
|
tokenizer=tokenizer, |
|
|
dataset_name=dataset_name, |
|
|
max_length=max_length, |
|
|
) |
|
|
val_dataset.data = val_data |
|
|
|
|
|
return train_dataset, val_dataset |
|
|
|
|
|
|
|
|
def stream_games( |
|
|
dataset_name: str = "dlouapre/lichess_2025-01_1M", |
|
|
split: str = "train", |
|
|
column: str = "text", |
|
|
) -> Iterator[str]: |
|
|
""" |
|
|
Stream games from the dataset for memory-efficient processing. |
|
|
|
|
|
Args: |
|
|
dataset_name: Name of the dataset on Hugging Face Hub. |
|
|
split: Dataset split to use. |
|
|
column: Column containing the game strings. |
|
|
|
|
|
Yields: |
|
|
Game strings one at a time. |
|
|
""" |
|
|
from datasets import load_dataset |
|
|
|
|
|
dataset = load_dataset(dataset_name, split=split, streaming=True) |
|
|
|
|
|
for example in dataset: |
|
|
yield example[column] |
|
|
|
|
|
|
|
|
def analyze_dataset_statistics( |
|
|
dataset_name: str = "dlouapre/lichess_2025-01_1M", |
|
|
max_samples: int = 10000, |
|
|
) -> Dict: |
|
|
""" |
|
|
Analyze statistics of the chess dataset. |
|
|
|
|
|
Args: |
|
|
dataset_name: Name of the dataset. |
|
|
max_samples: Maximum number of samples to analyze. |
|
|
|
|
|
Returns: |
|
|
Dictionary containing dataset statistics. |
|
|
""" |
|
|
from collections import Counter |
|
|
from datasets import load_dataset |
|
|
|
|
|
dataset = load_dataset(dataset_name, split="train") |
|
|
dataset = dataset.select(range(min(max_samples, len(dataset)))) |
|
|
|
|
|
game_lengths = [] |
|
|
move_counts = Counter() |
|
|
opening_moves = Counter() |
|
|
|
|
|
for example in dataset: |
|
|
moves = example["text"].strip().split() |
|
|
game_lengths.append(len(moves)) |
|
|
move_counts.update(moves) |
|
|
|
|
|
|
|
|
if len(moves) >= 4: |
|
|
opening = " ".join(moves[:4]) |
|
|
opening_moves[opening] += 1 |
|
|
|
|
|
return { |
|
|
"total_games": len(dataset), |
|
|
"avg_game_length": sum(game_lengths) / len(game_lengths), |
|
|
"min_game_length": min(game_lengths), |
|
|
"max_game_length": max(game_lengths), |
|
|
"unique_moves": len(move_counts), |
|
|
"most_common_moves": move_counts.most_common(20), |
|
|
"most_common_openings": opening_moves.most_common(10), |
|
|
} |
|
|
|