Initial GrandLine implementation: deterministic shard-first dataset preprocessing for LLM pretraining
ed59144 verified | """Test deduplication: exact dedup with DuckDB-backed store.""" | |
| import sys | |
| import tempfile | |
| from pathlib import Path | |
| sys.path.insert(0, str(Path(__file__).parent.parent / "src")) | |
| from grandline.blocks.dedup import ExactDedup | |
| from grandline.dedup_store import DedupStore | |
| from grandline.hashing import hash_text | |
| from grandline.types import Document | |
def test_dedup_store_basic():
    """DedupStore correctly identifies seen/unseen hashes.

    A hash is reported as unseen until it has been added, and adding one
    hash does not cause a different hash to be reported as seen.
    """
    with tempfile.TemporaryDirectory() as tmp:
        store = DedupStore(Path(tmp) / "test.duckdb")
        # Always close the store, even if an assertion fails, so it is not
        # left open while the temporary directory is being cleaned up.
        try:
            h1 = hash_text("hello world")
            h2 = hash_text("different text")

            assert not store.seen(h1)
            store.add(h1)
            assert store.seen(h1)
            assert not store.seen(h2)
        finally:
            store.close()
def test_dedup_store_batch():
    """Batch operations work correctly.

    After ``add_batch`` on the first five of ten hashes, exactly those five
    are reported as seen and ``store.count`` equals 5.
    """
    with tempfile.TemporaryDirectory() as tmp:
        store = DedupStore(Path(tmp) / "test.duckdb")
        # Always close the store, even if an assertion fails, so it is not
        # left open while the temporary directory is being cleaned up.
        try:
            hashes = [hash_text(f"doc {i}") for i in range(10)]
            store.add_batch(hashes[:5])

            # First 5 should be seen
            for h in hashes[:5]:
                assert store.seen(h)

            # Last 5 should not be seen
            for h in hashes[5:]:
                assert not store.seen(h)

            assert store.count == 5
        finally:
            store.close()
def test_dedup_store_filter_unseen():
    """filter_unseen returns only new hashes.

    With the first five of ten hashes already added, ``filter_unseen`` on
    all ten must return exactly the remaining five.
    """
    with tempfile.TemporaryDirectory() as tmp:
        store = DedupStore(Path(tmp) / "test.duckdb")
        # Always close the store, even if an assertion fails, so it is not
        # left open while the temporary directory is being cleaned up.
        try:
            hashes = [hash_text(f"doc {i}") for i in range(10)]
            store.add_batch(hashes[:5])

            unseen = store.filter_unseen(hashes)
            assert len(unseen) == 5
            # Order of the returned hashes is not checked, only membership.
            assert set(unseen) == set(hashes[5:])
        finally:
            store.close()
def test_dedup_store_idempotent():
    """Adding the same hash twice is a no-op.

    Repeated ``add`` calls and an ``add_batch`` containing only copies of
    the same hash must not raise and must leave ``store.count`` at 1.
    """
    with tempfile.TemporaryDirectory() as tmp:
        store = DedupStore(Path(tmp) / "test.duckdb")
        # Always close the store, even if an assertion fails, so it is not
        # left open while the temporary directory is being cleaned up.
        try:
            h = hash_text("hello")
            store.add(h)
            store.add(h)  # Should not error
            store.add_batch([h, h, h])  # Should not error
            assert store.count == 1
        finally:
            store.close()
def test_dedup_store_persistence():
    """Store persists across close/reopen cycles.

    A hash added through one ``DedupStore`` instance must still be reported
    as seen, with ``count == 1``, by a second instance opened on the same
    path after the first one was closed.
    """
    with tempfile.TemporaryDirectory() as tmp:
        db_path = Path(tmp) / "test.duckdb"

        # Write
        store = DedupStore(db_path)
        # Close in ``finally`` so a failure in this phase does not leave the
        # store open while the temporary directory is being cleaned up.
        try:
            h = hash_text("persistent")
            store.add(h)
        finally:
            store.close()

        # Read
        store = DedupStore(db_path)
        try:
            assert store.seen(h)
            assert store.count == 1
        finally:
            store.close()
def test_exact_dedup_block():
    """ExactDedup block drops duplicate documents.

    Five documents with three distinct texts go through the block; only the
    first occurrence of each text is expected to come out, in input order,
    and every surviving document must carry a ``"blake3"`` metadata entry.
    """
    with tempfile.TemporaryDirectory() as tmp:
        store = DedupStore(Path(tmp) / "test.duckdb")
        # Always close the store, even if an assertion fails, so it is not
        # left open while the temporary directory is being cleaned up.
        try:
            block = ExactDedup(store=store, batch_size=3)

            docs = [
                Document(text="unique doc 1", doc_id="0"),
                Document(text="unique doc 2", doc_id="1"),
                Document(text="unique doc 1", doc_id="2"),  # duplicate of 0
                Document(text="unique doc 3", doc_id="3"),
                Document(text="unique doc 2", doc_id="4"),  # duplicate of 1
            ]

            results = list(block(iter(docs)))

            # Should keep only unique texts
            assert len(results) == 3
            result_ids = [d.doc_id for d in results]
            assert result_ids == ["0", "1", "3"]

            # Hashes should be in metadata
            for doc in results:
                assert "blake3" in doc.metadata
        finally:
            store.close()
def test_exact_dedup_cross_batch():
    """Dedup works across batch boundaries.

    With ``batch_size=2``, the third document repeats a text from the first
    batch; it must still be dropped, leaving documents "0", "1" and "3".
    """
    with tempfile.TemporaryDirectory() as tmp:
        store = DedupStore(Path(tmp) / "test.duckdb")
        # Always close the store, even if an assertion fails, so it is not
        # left open while the temporary directory is being cleaned up.
        try:
            block = ExactDedup(store=store, batch_size=2)

            docs = [
                Document(text="text A", doc_id="0"),
                Document(text="text B", doc_id="1"),
                # Batch boundary here (batch_size=2)
                Document(text="text A", doc_id="2"),  # dup across batch
                Document(text="text C", doc_id="3"),
            ]

            results = list(block(iter(docs)))

            assert len(results) == 3
            result_ids = [d.doc_id for d in results]
            assert result_ids == ["0", "1", "3"]
        finally:
            store.close()
if __name__ == "__main__":
    # Script entry point: run every dedup test in declaration order and
    # report success only if none of them raised.
    all_tests = (
        test_dedup_store_basic,
        test_dedup_store_batch,
        test_dedup_store_filter_unseen,
        test_dedup_store_idempotent,
        test_dedup_store_persistence,
        test_exact_dedup_block,
        test_exact_dedup_cross_batch,
    )
    for run_test in all_tests:
        run_test()
    print("All dedup tests passed!")