abpt / src /data /openwebmath_bpe.py
Search
auto: sync run_testformer_wikitext_combo_remote.py
f37be5a
from __future__ import annotations
import json
from pathlib import Path
import torch
from src.data.the_stack import _normalize_repo_name
from src.data.the_stack_bpe import BPETokenDataset, _train_tokenizer
def _load_openwebmath_text(
data_dir: str,
repo_id: str,
split: str,
target_bytes: int,
) -> str:
Path(data_dir).mkdir(parents=True, exist_ok=True)
cache_name = f"{_normalize_repo_name(repo_id)}_{split}_{target_bytes}.txt"
cache_path = Path(data_dir) / cache_name
if cache_path.exists():
return cache_path.read_text(encoding="utf-8")
from datasets import load_dataset
ds = load_dataset(repo_id, split=split, streaming=True)
chunks: list[str] = []
total = 0
for sample in ds:
text = sample.get("text") or ""
if not isinstance(text, str) or not text.strip():
continue
block = text + "\n\n"
chunks.append(block)
total += len(block.encode("utf-8"))
if total >= target_bytes:
break
if total == 0:
raise RuntimeError(f"No usable text collected from {repo_id}:{split}.")
merged = "".join(chunks)
cache_path.write_text(merged, encoding="utf-8")
return merged
def load_openwebmath_bpe(
seq_len: int = 256,
device: str = "cpu",
data_dir: str = "data_cache",
repo_id: str = "open-web-math/open-web-math",
target_bytes: int = 200_000,
vocab_size: int = 256,
) -> tuple[BPETokenDataset, BPETokenDataset]:
Path(data_dir).mkdir(parents=True, exist_ok=True)
prefix = f"{_normalize_repo_name(repo_id)}_{target_bytes}_bpe{vocab_size}"
tokenizer_path = Path(data_dir) / f"{prefix}_tokenizer.json"
ids_path = Path(data_dir) / f"{prefix}_ids.pt"
meta_path = Path(data_dir) / f"{prefix}_meta.json"
if tokenizer_path.exists() and ids_path.exists() and meta_path.exists():
token_ids = torch.load(ids_path, map_location="cpu")
meta = json.loads(meta_path.read_text(encoding="utf-8"))
actual_vocab_size = int(meta["vocab_size"])
else:
text = _load_openwebmath_text(
data_dir=data_dir,
repo_id=repo_id,
split="train",
target_bytes=target_bytes,
)
tokenizer = _train_tokenizer(text=text, vocab_size=vocab_size)
token_ids = torch.tensor(tokenizer.encode(text).ids, dtype=torch.long)
actual_vocab_size = tokenizer.get_vocab_size()
tokenizer.save(str(tokenizer_path))
torch.save(token_ids, ids_path)
meta_path.write_text(
json.dumps(
{
"repo_id": repo_id,
"target_bytes": target_bytes,
"vocab_size": actual_vocab_size,
"token_count": int(token_ids.numel()),
},
indent=2,
),
encoding="utf-8",
)
train = BPETokenDataset(
token_ids=token_ids,
vocab_size=actual_vocab_size,
split="train",
seq_len=seq_len,
device=device,
)
val = BPETokenDataset(
token_ids=token_ids,
vocab_size=actual_vocab_size,
split="val",
seq_len=seq_len,
device=device,
)
return train, val