grandline / tests /test_determinism.py
dignity045's picture
Initial GrandLine implementation: deterministic shard-first dataset preprocessing for LLM pretraining
ed59144 verified
"""Test determinism: same input + same config → same output.
Verifies that GrandLine pipelines produce identical results
across multiple runs with identical inputs.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from grandline.blocks.filters import MinMaxLengthFilter
from grandline.blocks.normalize import NormalizeText
from grandline.blocks.pack import PackBlock
from grandline.pipeline import Pipeline
from grandline.types import Document, TokenizedDocument
def make_docs(n: int = 10) -> list[Document]:
"""Create a deterministic set of test documents."""
return [
Document(
text=f"This is test document number {i}. " * (i + 5),
metadata={"idx": i},
source="test",
shard_id="shard_0",
doc_id=f"shard_0:{i}",
)
for i in range(n)
]
def test_normalize_determinism():
"""Normalization produces identical output across runs."""
block = NormalizeText()
docs = make_docs()
result_1 = [doc.text for doc in block(iter(docs))]
# Reset docs (NormalizeText modifies in place)
docs = make_docs()
result_2 = [doc.text for doc in block(iter(docs))]
assert result_1 == result_2, "Normalization is not deterministic"
def test_filter_determinism():
"""Filtering preserves order and produces same results."""
block = MinMaxLengthFilter(min_chars=50, max_chars=1000)
docs = make_docs(20)
result_1 = [(doc.doc_id, doc.text) for doc in block(iter(docs))]
docs = make_docs(20)
result_2 = [(doc.doc_id, doc.text) for doc in block(iter(docs))]
assert result_1 == result_2, "Filtering is not deterministic"
def test_pipeline_fingerprint_stability():
"""Pipeline fingerprint is stable across instantiations."""
def make_pipeline():
return Pipeline(
blocks=[NormalizeText(), MinMaxLengthFilter(min_chars=100)],
name="test",
dataset_name="test_ds",
)
p1 = make_pipeline()
p2 = make_pipeline()
assert p1.fingerprint == p2.fingerprint, "Pipeline fingerprints differ"
def test_pack_determinism():
"""Packing produces identical output for identical input."""
pack = PackBlock(max_seq_len=20, eos_id=2, pad_id=0)
def make_tokenized():
return [
TokenizedDocument(token_ids=[1, 3, 5, 7], num_tokens=4),
TokenizedDocument(token_ids=[2, 4, 6, 8, 10], num_tokens=5),
TokenizedDocument(token_ids=[11, 12, 13], num_tokens=3),
]
result_1 = [(s.input_ids, s.seq_lens) for s in pack(iter(make_tokenized()))]
result_2 = [(s.input_ids, s.seq_lens) for s in pack(iter(make_tokenized()))]
assert result_1 == result_2, "Packing is not deterministic"
def test_pipeline_end_to_end_determinism():
"""Full pipeline (normalize + filter) produces same results."""
pipeline = Pipeline(
blocks=[NormalizeText(), MinMaxLengthFilter(min_chars=50, max_chars=5000)],
name="test_e2e",
dataset_name="test",
)
docs = make_docs(15)
result_1 = [(d.doc_id, d.text) for d in pipeline.run(iter(docs))]
docs = make_docs(15)
result_2 = [(d.doc_id, d.text) for d in pipeline.run(iter(docs))]
assert result_1 == result_2, "End-to-end pipeline is not deterministic"
assert len(result_1) > 0, "Pipeline produced no output"
if __name__ == "__main__":
test_normalize_determinism()
test_filter_determinism()
test_pipeline_fingerprint_stability()
test_pack_determinism()
test_pipeline_end_to_end_determinism()
print("All determinism tests passed!")