"""Tests for multi-process encoding with SparseEncoder."""

from __future__ import annotations

import numpy as np
import pytest
import torch

from sentence_transformers import SparseEncoder

from .utils import sparse_allclose
|
|
@pytest.mark.slow
def test_multi_process_encode_same_as_standard_encode(splade_bert_tiny_model: SparseEncoder):
    """Multi-process encoding should produce the same embeddings as single-process encoding."""
    model = splade_bert_tiny_model
    texts = ["First sentence.", "Second sentence.", "Third sentence."] * 5

    # Single-process reference embeddings, moved to CPU for comparison.
    embeddings_standard = model.encode(texts).cpu()

    # Passing a list of devices makes encode() use multi-process encoding across those devices.
    embeddings_multi = model.encode(texts, device=["cpu"] * 2)

    assert sparse_allclose(embeddings_standard, embeddings_multi, atol=1e-5)
|
|
@pytest.mark.slow
def test_multi_process_pool(splade_bert_tiny_model: SparseEncoder):
    """An explicitly started multi-process pool should match single-process encoding."""
    model = splade_bert_tiny_model
    texts = ["First sentence.", "Second sentence.", "Third sentence."] * 5

    embeddings_standard = model.encode(texts).cpu()

    # Reuse an explicitly managed pool instead of letting encode() create one per call.
    pool = model.start_multi_process_pool(["cpu"] * 2)
    try:
        embeddings_multi = model.encode(texts, pool=pool)
    finally:
        model.stop_multi_process_pool(pool)

    assert isinstance(embeddings_multi, torch.Tensor)
    assert embeddings_multi.is_sparse
    assert embeddings_multi.shape == (len(texts), model.get_sentence_embedding_dimension())
    assert sparse_allclose(embeddings_standard, embeddings_multi, atol=1e-5)
|
|
@pytest.mark.slow
def test_multi_process_with_args(splade_bert_tiny_model: SparseEncoder):
    """Keyword arguments such as max_active_dims and convert_to_sparse_tensor should be forwarded to the pool workers."""
    model = splade_bert_tiny_model
    texts = ["First sentence.", "Second sentence."]

    pool = model.start_multi_process_pool(["cpu"] * 2)
    try:
        embeddings_maxed = model.encode(texts, pool=pool, max_active_dims=16)

        assert isinstance(embeddings_maxed, torch.Tensor)
        assert embeddings_maxed.is_sparse
        # With max_active_dims=16, each of the two rows should have exactly 16 non-zero dimensions.
        assert torch.equal(embeddings_maxed.to_dense().nonzero(as_tuple=True)[0], torch.tensor([0] * 16 + [1] * 16))

        embeddings_non_sparse = model.encode(texts, pool=pool, convert_to_sparse_tensor=False)
        assert isinstance(embeddings_non_sparse, torch.Tensor)
        assert not embeddings_non_sparse.is_sparse
        assert embeddings_non_sparse.shape == (len(texts), model.get_sentence_embedding_dimension())
    finally:
        model.stop_multi_process_pool(pool)
|
|
@pytest.mark.slow
def test_multi_process_chunk_size(splade_bert_tiny_model: SparseEncoder):
    """Encoding should work when texts are split into explicit chunks across workers."""
    model = splade_bert_tiny_model
    texts = ["First sentence.", "Second sentence.", "Third sentence."] * 10

    # chunk_size controls how many texts are sent to each worker process at a time.
    embeddings = model.encode(texts, device=["cpu"] * 2, chunk_size=5)

    assert isinstance(embeddings, torch.Tensor)
    assert embeddings.is_sparse
    assert embeddings.shape == (len(texts), model.get_sentence_embedding_dimension())
|
|
@pytest.mark.slow
def test_multi_process_with_prompt(splade_bert_tiny_model: SparseEncoder):
    """Prompts selected via prompt_name should be applied identically in single- and multi-process encoding."""
    model = splade_bert_tiny_model
    model.prompts = {"retrieval": "Represent this sentence for searching relevant passages: "}
    texts = ["First sentence.", "Second sentence."] * 5

    standard_embeddings = model.encode(texts, prompt_name="retrieval").cpu()

    assert isinstance(standard_embeddings, torch.Tensor)
    assert standard_embeddings.is_sparse
    assert standard_embeddings.shape == (len(texts), model.get_sentence_embedding_dimension())

    pool = model.start_multi_process_pool(["cpu"] * 2)
    try:
        multi_embeddings = model.encode(texts, pool=pool, prompt_name="retrieval")
    finally:
        model.stop_multi_process_pool(pool)

    assert isinstance(multi_embeddings, torch.Tensor)
    assert multi_embeddings.is_sparse
    assert multi_embeddings.shape == (len(texts), model.get_sentence_embedding_dimension())
    assert sparse_allclose(standard_embeddings, multi_embeddings, atol=1e-5)
|
|
@pytest.mark.slow
@pytest.mark.parametrize("convert_to_tensor", [True, False])
@pytest.mark.parametrize("convert_to_sparse_tensor", [True, False])
def test_multi_process_with_empty_texts(
    splade_bert_tiny_model: SparseEncoder,
    convert_to_tensor: bool,
    convert_to_sparse_tensor: bool,
):
    """An empty input list should yield an empty result of the same type in both code paths."""
    model = splade_bert_tiny_model
    texts = []

    standard_embeddings = model.encode(
        texts, convert_to_tensor=convert_to_tensor, convert_to_sparse_tensor=convert_to_sparse_tensor
    )
    multi_embeddings = model.encode(
        texts,
        device=["cpu"] * 2,
        convert_to_tensor=convert_to_tensor,
        convert_to_sparse_tensor=convert_to_sparse_tensor,
    )

    assert type(standard_embeddings) is type(multi_embeddings)
    assert len(standard_embeddings) == 0
    assert len(multi_embeddings) == 0
|
|
@pytest.mark.slow
@pytest.mark.parametrize("convert_to_tensor", [True, False])
@pytest.mark.parametrize("convert_to_sparse_tensor", [True, False])
def test_multi_process_with_single_string(
    splade_bert_tiny_model: SparseEncoder,
    convert_to_tensor: bool,
    convert_to_sparse_tensor: bool,
):
    """A single input string should produce equivalent outputs in single- and multi-process encoding."""
    model = splade_bert_tiny_model
    texts = "This is a single sentence."

    standard_embeddings = model.encode(
        texts, convert_to_tensor=convert_to_tensor, convert_to_sparse_tensor=convert_to_sparse_tensor
    )
    multi_embeddings = model.encode(
        texts,
        device=["cpu"] * 2,
        convert_to_tensor=convert_to_tensor,
        convert_to_sparse_tensor=convert_to_sparse_tensor,
    )

    assert type(standard_embeddings) is type(multi_embeddings)
    if isinstance(standard_embeddings, (np.ndarray, torch.Tensor)):
        assert standard_embeddings.shape == multi_embeddings.shape
    else:
        assert len(standard_embeddings) == len(multi_embeddings)

    # Depending on the output options, the result may be a dict of tensors/arrays or a list of dicts.
    if isinstance(standard_embeddings, dict):
        assert standard_embeddings.keys() == multi_embeddings.keys()
        for key in standard_embeddings:
            if isinstance(standard_embeddings[key], torch.Tensor):
                assert torch.allclose(standard_embeddings[key].cpu(), multi_embeddings[key], atol=1e-5)
            elif isinstance(standard_embeddings[key], np.ndarray):
                assert np.allclose(standard_embeddings[key], multi_embeddings[key], atol=1e-5)
            else:
                assert standard_embeddings[key] == multi_embeddings[key]
    elif isinstance(standard_embeddings, list) and len(standard_embeddings) > 0:
        for std_item, multi_item in zip(standard_embeddings, multi_embeddings):
            assert set(std_item.keys()) == set(multi_item.keys())
            for key in std_item:
                if isinstance(std_item[key], torch.Tensor):
                    assert torch.allclose(std_item[key].cpu(), multi_item[key], atol=1e-5)
                elif isinstance(std_item[key], np.ndarray):
                    assert np.allclose(std_item[key], multi_item[key], atol=1e-5)
                else:
                    assert std_item[key] == multi_item[key]
|
|
@pytest.mark.slow
def test_multi_process_more_workers_than_texts(splade_bert_tiny_model: SparseEncoder):
    """Encoding should not fail when there are more workers than input texts."""
    model = splade_bert_tiny_model
    texts = ["First sentence.", "Second sentence."]

    embeddings = model.encode(texts, device=["cpu"] * 3)

    assert isinstance(embeddings, torch.Tensor)
    assert embeddings.shape == (len(texts), model.get_sentence_embedding_dimension())
|
|
@pytest.mark.slow
def test_multi_process_with_large_chunk_size(splade_bert_tiny_model: SparseEncoder):
    """A chunk_size larger than each worker's share of the texts should still produce a complete result."""
    model = splade_bert_tiny_model
    texts = ["First sentence.", "Second sentence."] * 10

    embeddings = model.encode(texts, device=["cpu"] * 2, chunk_size=30)

    assert isinstance(embeddings, torch.Tensor)
    assert embeddings.shape == (len(texts), model.get_sentence_embedding_dimension())