Initial GrandLine implementation: deterministic shard-first dataset preprocessing for LLM pretraining
ed59144 verified | """Test determinism: same input + same config → same output. | |
| Verifies that GrandLine pipelines produce identical results | |
| across multiple runs with identical inputs. | |
| """ | |
| import sys | |
| from pathlib import Path | |
| sys.path.insert(0, str(Path(__file__).parent.parent / "src")) | |
| from grandline.blocks.filters import MinMaxLengthFilter | |
| from grandline.blocks.normalize import NormalizeText | |
| from grandline.blocks.pack import PackBlock | |
| from grandline.pipeline import Pipeline | |
| from grandline.types import Document, TokenizedDocument | |
def make_docs(n: int = 10) -> list[Document]:
    """Build ``n`` reproducible documents for the determinism tests.

    Document ``i`` repeats a single sentence ``i + 5`` times, so the text
    length grows with the index. Every document is assigned to ``shard_0``
    and carries its index in both ``metadata`` and ``doc_id``.
    """
    documents = []
    for idx in range(n):
        sentence = f"This is test document number {idx}. "
        documents.append(
            Document(
                text=sentence * (idx + 5),
                metadata={"idx": idx},
                source="test",
                shard_id="shard_0",
                doc_id=f"shard_0:{idx}",
            )
        )
    return documents
def test_normalize_determinism():
    """Check that NormalizeText yields the same texts on repeated runs."""
    normalizer = NormalizeText()

    def normalized_texts():
        # Fresh documents for every pass: per the original author's note,
        # NormalizeText modifies documents in place.
        fresh_docs = make_docs()
        return [doc.text for doc in normalizer(iter(fresh_docs))]

    first_pass = normalized_texts()
    second_pass = normalized_texts()
    assert first_pass == second_pass, "Normalization is not deterministic"
def test_filter_determinism():
    """Check that length filtering keeps the same documents in the same order."""
    length_filter = MinMaxLengthFilter(min_chars=50, max_chars=1000)

    runs = []
    for _ in range(2):
        # Each run gets its own freshly built input documents.
        fresh_docs = make_docs(20)
        kept = [
            (doc.doc_id, doc.text)
            for doc in length_filter(iter(fresh_docs))
        ]
        runs.append(kept)

    first_run, second_run = runs
    assert first_run == second_run, "Filtering is not deterministic"
def test_pipeline_fingerprint_stability():
    """Check that two identically configured pipelines share a fingerprint."""
    # Build both pipelines before reading either fingerprint so the two
    # instances are alive at the same time when they are compared.
    pipelines = [
        Pipeline(
            blocks=[NormalizeText(), MinMaxLengthFilter(min_chars=100)],
            name="test",
            dataset_name="test_ds",
        )
        for _ in range(2)
    ]
    first, second = pipelines
    assert first.fingerprint == second.fingerprint, "Pipeline fingerprints differ"
def test_pack_determinism():
    """Check that packing the same token sequences twice gives equal output."""
    packer = PackBlock(max_seq_len=20, eos_id=2, pad_id=0)
    token_lists = ([1, 3, 5, 7], [2, 4, 6, 8, 10], [11, 12, 13])

    def pack_fresh():
        # New TokenizedDocument objects (and new token lists) for each pass,
        # so the two passes share no input objects.
        inputs = [
            TokenizedDocument(token_ids=list(ids), num_tokens=len(ids))
            for ids in token_lists
        ]
        return [(seq.input_ids, seq.seq_lens) for seq in packer(iter(inputs))]

    first_pass = pack_fresh()
    second_pass = pack_fresh()
    assert first_pass == second_pass, "Packing is not deterministic"
def test_pipeline_end_to_end_determinism():
    """Check that a normalize-then-filter pipeline is repeatable end to end."""
    stages = [NormalizeText(), MinMaxLengthFilter(min_chars=50, max_chars=5000)]
    pipeline = Pipeline(blocks=stages, name="test_e2e", dataset_name="test")

    def run_once():
        # Same pipeline instance, freshly built input documents.
        fresh_docs = make_docs(15)
        return [(d.doc_id, d.text) for d in pipeline.run(iter(fresh_docs))]

    first_run = run_once()
    second_run = run_once()
    assert first_run == second_run, "End-to-end pipeline is not deterministic"
    # Guard against a vacuous pass where both runs produce nothing.
    assert len(first_run) > 0, "Pipeline produced no output"
if __name__ == "__main__":
    # Allow running the checks directly as a script, without a test runner.
    # They execute in the order listed; the first failing assertion aborts.
    for check in (
        test_normalize_determinism,
        test_filter_determinism,
        test_pipeline_fingerprint_stability,
        test_pack_determinism,
        test_pipeline_end_to_end_determinism,
    ):
        check()
    print("All determinism tests passed!")