grandline / tests /test_dedup.py
dignity045's picture
Initial GrandLine implementation: deterministic shard-first dataset preprocessing for LLM pretraining
ed59144 verified
"""Test deduplication: exact dedup with DuckDB-backed store."""
import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from grandline.blocks.dedup import ExactDedup
from grandline.dedup_store import DedupStore
from grandline.hashing import hash_text
from grandline.types import Document
def test_dedup_store_basic():
    """DedupStore reports a hash as seen only after it has been added."""
    with tempfile.TemporaryDirectory() as tmp:
        store = DedupStore(Path(tmp) / "test.duckdb")
        # Close the store even when an assertion fails, so it is released
        # before the temporary directory is removed.
        try:
            h1 = hash_text("hello world")
            h2 = hash_text("different text")

            assert not store.seen(h1)
            store.add(h1)
            assert store.seen(h1)
            # Adding h1 must not make an unrelated hash appear as seen.
            assert not store.seen(h2)
        finally:
            store.close()
def test_dedup_store_batch():
    """add_batch marks exactly the added hashes as seen and updates count."""
    with tempfile.TemporaryDirectory() as tmp:
        store = DedupStore(Path(tmp) / "test.duckdb")
        # Close the store even when an assertion fails, so it is released
        # before the temporary directory is removed.
        try:
            hashes = [hash_text(f"doc {i}") for i in range(10)]
            store.add_batch(hashes[:5])

            # The first 5 were added, so they must be seen.
            for h in hashes[:5]:
                assert store.seen(h)
            # The last 5 were never added, so they must not be seen.
            for h in hashes[5:]:
                assert not store.seen(h)
            assert store.count == 5
        finally:
            store.close()
def test_dedup_store_filter_unseen():
    """filter_unseen returns only the hashes that were not added before."""
    with tempfile.TemporaryDirectory() as tmp:
        store = DedupStore(Path(tmp) / "test.duckdb")
        # Close the store even when an assertion fails, so it is released
        # before the temporary directory is removed.
        try:
            hashes = [hash_text(f"doc {i}") for i in range(10)]
            store.add_batch(hashes[:5])

            unseen = store.filter_unseen(hashes)
            # Check the length as well as the set, so duplicates in the
            # result would be caught.
            assert len(unseen) == 5
            assert set(unseen) == set(hashes[5:])
        finally:
            store.close()
def test_dedup_store_idempotent():
    """Adding the same hash repeatedly leaves a single entry in the store."""
    with tempfile.TemporaryDirectory() as tmp:
        store = DedupStore(Path(tmp) / "test.duckdb")
        # Close the store even when an assertion fails, so it is released
        # before the temporary directory is removed.
        try:
            h = hash_text("hello")
            store.add(h)
            store.add(h)  # Repeated single add should not error
            store.add_batch([h, h, h])  # Duplicates within a batch should not error
            assert store.count == 1
        finally:
            store.close()
def test_dedup_store_persistence():
    """A hash added before close() is still present after reopening the store."""
    with tempfile.TemporaryDirectory() as tmp:
        db_path = Path(tmp) / "test.duckdb"
        h = hash_text("persistent")

        # Write phase: add one hash, then close the store.
        store = DedupStore(db_path)
        try:
            store.add(h)
        finally:
            store.close()

        # Read phase: reopen the same path and check the hash survived.
        # The finally ensures the reopened store is closed even when an
        # assertion fails, before the temporary directory is removed.
        store = DedupStore(db_path)
        try:
            assert store.seen(h)
            assert store.count == 1
        finally:
            store.close()
def test_exact_dedup_block():
    """ExactDedup keeps the first occurrence of each text and drops repeats."""
    with tempfile.TemporaryDirectory() as tmp:
        store = DedupStore(Path(tmp) / "test.duckdb")
        # Close the store even when an assertion fails, so it is released
        # before the temporary directory is removed.
        try:
            block = ExactDedup(store=store, batch_size=3)
            docs = [
                Document(text="unique doc 1", doc_id="0"),
                Document(text="unique doc 2", doc_id="1"),
                Document(text="unique doc 1", doc_id="2"),  # duplicate of 0
                Document(text="unique doc 3", doc_id="3"),
                Document(text="unique doc 2", doc_id="4"),  # duplicate of 1
            ]
            results = list(block(iter(docs)))

            # Only the first document of each distinct text should remain,
            # in input order.
            assert len(results) == 3
            result_ids = [d.doc_id for d in results]
            assert result_ids == ["0", "1", "3"]

            # Every surviving document should have a "blake3" metadata entry.
            for doc in results:
                assert "blake3" in doc.metadata
        finally:
            store.close()
def test_exact_dedup_cross_batch():
    """A duplicate is dropped even when it arrives in a later batch."""
    with tempfile.TemporaryDirectory() as tmp:
        store = DedupStore(Path(tmp) / "test.duckdb")
        # Close the store even when an assertion fails, so it is released
        # before the temporary directory is removed.
        try:
            block = ExactDedup(store=store, batch_size=2)
            docs = [
                Document(text="text A", doc_id="0"),
                Document(text="text B", doc_id="1"),
                # With batch_size=2 the documents below are expected to
                # land in a second batch.
                Document(text="text A", doc_id="2"),  # dup across batch
                Document(text="text C", doc_id="3"),
            ]
            results = list(block(iter(docs)))

            assert len(results) == 3
            result_ids = [d.doc_id for d in results]
            assert result_ids == ["0", "1", "3"]
        finally:
            store.close()
if __name__ == "__main__":
    # Script mode: run each test in definition order. The first failing
    # assertion aborts the run before the success message is printed.
    for run_test in (
        test_dedup_store_basic,
        test_dedup_store_batch,
        test_dedup_store_filter_unseen,
        test_dedup_store_idempotent,
        test_dedup_store_persistence,
        test_exact_dedup_block,
        test_exact_dedup_cross_batch,
    ):
        run_test()
    print("All dedup tests passed!")