"""Test determinism: same input + same config → same output. Verifies that GrandLine pipelines produce identical results across multiple runs with identical inputs. """ import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent / "src")) from grandline.blocks.filters import MinMaxLengthFilter from grandline.blocks.normalize import NormalizeText from grandline.blocks.pack import PackBlock from grandline.pipeline import Pipeline from grandline.types import Document, TokenizedDocument def make_docs(n: int = 10) -> list[Document]: """Create a deterministic set of test documents.""" return [ Document( text=f"This is test document number {i}. " * (i + 5), metadata={"idx": i}, source="test", shard_id="shard_0", doc_id=f"shard_0:{i}", ) for i in range(n) ] def test_normalize_determinism(): """Normalization produces identical output across runs.""" block = NormalizeText() docs = make_docs() result_1 = [doc.text for doc in block(iter(docs))] # Reset docs (NormalizeText modifies in place) docs = make_docs() result_2 = [doc.text for doc in block(iter(docs))] assert result_1 == result_2, "Normalization is not deterministic" def test_filter_determinism(): """Filtering preserves order and produces same results.""" block = MinMaxLengthFilter(min_chars=50, max_chars=1000) docs = make_docs(20) result_1 = [(doc.doc_id, doc.text) for doc in block(iter(docs))] docs = make_docs(20) result_2 = [(doc.doc_id, doc.text) for doc in block(iter(docs))] assert result_1 == result_2, "Filtering is not deterministic" def test_pipeline_fingerprint_stability(): """Pipeline fingerprint is stable across instantiations.""" def make_pipeline(): return Pipeline( blocks=[NormalizeText(), MinMaxLengthFilter(min_chars=100)], name="test", dataset_name="test_ds", ) p1 = make_pipeline() p2 = make_pipeline() assert p1.fingerprint == p2.fingerprint, "Pipeline fingerprints differ" def test_pack_determinism(): """Packing produces identical output for identical input.""" pack = PackBlock(max_seq_len=20, eos_id=2, pad_id=0) def make_tokenized(): return [ TokenizedDocument(token_ids=[1, 3, 5, 7], num_tokens=4), TokenizedDocument(token_ids=[2, 4, 6, 8, 10], num_tokens=5), TokenizedDocument(token_ids=[11, 12, 13], num_tokens=3), ] result_1 = [(s.input_ids, s.seq_lens) for s in pack(iter(make_tokenized()))] result_2 = [(s.input_ids, s.seq_lens) for s in pack(iter(make_tokenized()))] assert result_1 == result_2, "Packing is not deterministic" def test_pipeline_end_to_end_determinism(): """Full pipeline (normalize + filter) produces same results.""" pipeline = Pipeline( blocks=[NormalizeText(), MinMaxLengthFilter(min_chars=50, max_chars=5000)], name="test_e2e", dataset_name="test", ) docs = make_docs(15) result_1 = [(d.doc_id, d.text) for d in pipeline.run(iter(docs))] docs = make_docs(15) result_2 = [(d.doc_id, d.text) for d in pipeline.run(iter(docs))] assert result_1 == result_2, "End-to-end pipeline is not deterministic" assert len(result_1) > 0, "Pipeline produced no output" if __name__ == "__main__": test_normalize_determinism() test_filter_determinism() test_pipeline_fingerprint_stability() test_pack_determinism() test_pipeline_end_to_end_determinism() print("All determinism tests passed!")