AlexWortega commited on
Commit
b08e9da
·
verified ·
1 Parent(s): 9f577b0

Upload data_physics.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. data_physics.py +112 -0
data_physics.py ADDED
@@ -0,0 +1,112 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Streaming packed-token loader for the tiny-vocab physics MoE.
2
+
3
+ Yields contiguous `input_ids` blocks of length `seq_len+1` (so the trainer can
4
+ split into input[:-1]/labels[1:]). Scenes are serialized with the reduced
5
+ sim-only serialization, encoded with the trained tokenizer (which adds
6
+ <bos>..<eos> via its post-processor), then concatenated and chopped into
7
+ fixed blocks (standard causal-LM packing).
8
+
9
+ Train = interleave across the 24 train/ shards (all scenario types mixed).
10
+ Val = a held-back slice from a few train-type val/ shards (same types,
11
+ unseen scenes) so eval loss reflects in-distribution generalization.
12
+ """
13
+ from __future__ import annotations
14
+ import json
15
+ from typing import Iterator, List
16
+
17
+ import numpy as np
18
+ import torch
19
+ from datasets import load_dataset, interleave_datasets
20
+ from tokenizers import Tokenizer
21
+
22
+ import physics_serialize as psz
23
+
24
+ REPO = "AlexWortega/physics-scenarios-packed"
25
+ TRAIN_TYPES = [
26
+ "avalanche","basketball","billiards","breakout","bridge","chain","conveyor",
27
+ "dominos","explosion","funnel","head_on","jenga","marble_run","orbit",
28
+ "pendulum","pinball","plinko","projectile","pyramid","seesaw","ski_jump",
29
+ "tower","wind","wrecking_ball",
30
+ ]
31
+
32
+
33
+ def _serialize_row(row) -> str | None:
34
+ lines = row["jsonl"].decode().splitlines()
35
+ if not lines:
36
+ return None
37
+ try:
38
+ header = json.loads(lines[0])
39
+ frames = [json.loads(x) for x in lines[1:] if x.startswith("{")]
40
+ except Exception:
41
+ return None
42
+ if not frames:
43
+ return None
44
+ return psz.serialize_scene(header, frames)
45
+
46
+
47
+ def _shard_mix(split: str, types: List[str], seed: int):
48
+ shards = [f"{split}/{t}.tar.gz" for t in types]
49
+ dss = [load_dataset(REPO, data_files={split: [s]}, split=split, streaming=True)
50
+ for s in shards]
51
+ if len(dss) == 1:
52
+ return dss[0]
53
+ return interleave_datasets(dss, seed=seed, stopping_strategy="all_exhausted")
54
+
55
+
56
+ def packed_token_stream(tokenizer_path: str, seq_len: int, split: str = "train",
57
+ seed: int = 0, types: List[str] | None = None,
58
+ shuffle_buffer: int = 0) -> Iterator[np.ndarray]:
59
+ """Yield np.int64 arrays of length seq_len+1 (packed)."""
60
+ tok = Tokenizer.from_file(tokenizer_path)
61
+ types = types or TRAIN_TYPES
62
+ mix = _shard_mix(split, types, seed)
63
+ if shuffle_buffer > 0:
64
+ mix = mix.shuffle(seed=seed, buffer_size=shuffle_buffer)
65
+ block = seq_len + 1
66
+ buf: List[int] = []
67
+ for row in mix:
68
+ s = _serialize_row(row)
69
+ if s is None:
70
+ continue
71
+ ids = tok.encode(s).ids # includes <bos>..<eos>
72
+ buf.extend(ids)
73
+ while len(buf) >= block:
74
+ yield np.asarray(buf[:block], dtype=np.int64)
75
+ buf = buf[block:]
76
+
77
+
78
+ def batch_iterator(tokenizer_path: str, seq_len: int, batch_size: int,
79
+ split: str = "train", seed: int = 0,
80
+ types: List[str] | None = None, shuffle_buffer: int = 0,
81
+ device: str = "cuda", infinite: bool = False):
82
+ """Yield (input_ids[B,S], labels[B,S]) int64 cuda tensors."""
83
+ while True:
84
+ gen = packed_token_stream(tokenizer_path, seq_len, split, seed,
85
+ types, shuffle_buffer)
86
+ chunk: List[np.ndarray] = []
87
+ for arr in gen:
88
+ chunk.append(arr)
89
+ if len(chunk) == batch_size:
90
+ m = np.stack(chunk, axis=0) # [B, S+1]
91
+ inp = torch.from_numpy(m[:, :-1]).to(device, non_blocking=True)
92
+ lbl = torch.from_numpy(m[:, 1:]).to(device, non_blocking=True)
93
+ yield inp, lbl
94
+ chunk = []
95
+ if not infinite:
96
+ return
97
+ seed += 1 # reshuffle the interleave on each epoch
98
+
99
+
100
+ if __name__ == "__main__":
101
+ import sys, time
102
+ tp = sys.argv[1] if len(sys.argv) > 1 else "tokenizer.json"
103
+ it = batch_iterator(tp, seq_len=1024, batch_size=4, split="train",
104
+ shuffle_buffer=0, device="cpu")
105
+ t0 = time.time(); n = 0
106
+ for inp, lbl in it:
107
+ n += 1
108
+ if n == 1:
109
+ print("first batch", inp.shape, "min", int(inp.min()), "max", int(inp.max()))
110
+ if n >= 20:
111
+ break
112
+ print(f"{n} batches in {time.time()-t0:.1f}s ({n*4*1024} tokens)")