lsmpp's picture
Add files using upload-large-folder tool
bd33eac verified
from __future__ import annotations
import numpy as np
import pytest
import torch
from sentence_transformers import SparseEncoder
from .utils import sparse_allclose
# These tests fail if optimum.intel.openvino is imported, because openvinotoolkit/nncf
# patches torch._C._nn.gelu in a way that breaks pickling. As a result, we may have issues
# when running both backend tests and multi-process tests in the same session.
@pytest.mark.slow
def test_multi_process_encode_same_as_standard_encode(splade_bert_tiny_model: SparseEncoder):
    """Encoding with a list of two CPU devices must match plain single-process encoding."""
    encoder = splade_bert_tiny_model
    sentences = ["First sentence.", "Second sentence.", "Third sentence."] * 5
    worker_devices = ["cpu", "cpu"]

    # Reference result from the regular encode path, moved to CPU for comparison.
    reference = encoder.encode(sentences).cpu()

    # Same texts, this time encoded with a list of devices (multi-process encode).
    distributed = encoder.encode(sentences, device=worker_devices)

    # Both paths have to agree within a small absolute tolerance.
    assert sparse_allclose(reference, distributed, atol=1e-5)
@pytest.mark.slow
def test_multi_process_pool(splade_bert_tiny_model: SparseEncoder):
    """Exercise start_multi_process_pool / stop_multi_process_pool around an encode call."""
    encoder = splade_bert_tiny_model
    sentences = ["First sentence.", "Second sentence.", "Third sentence."] * 5

    # Reference result from the regular encode path.
    reference = encoder.encode(sentences).cpu()

    worker_pool = encoder.start_multi_process_pool(["cpu", "cpu"])
    try:
        pooled = encoder.encode(sentences, pool=worker_pool)
    finally:
        # Stop the pool whether or not encoding succeeded.
        encoder.stop_multi_process_pool(worker_pool)

    # The pooled result must be a sparse torch tensor with one row per text
    # and the same values as the reference.
    assert isinstance(pooled, torch.Tensor)
    assert pooled.is_sparse
    expected_shape = (len(sentences), encoder.get_sentence_embedding_dimension())
    assert pooled.shape == expected_shape
    assert sparse_allclose(reference, pooled, atol=1e-5)
@pytest.mark.slow
def test_multi_process_with_args(splade_bert_tiny_model: SparseEncoder):
    """Check that encode() keyword arguments are honoured when encoding through a pool.

    Two arguments are exercised against the same two-worker CPU pool:

    * ``max_active_dims=16``: the result must be a sparse tensor with exactly
      16 non-zero entries for each of the two texts.
    * ``convert_to_sparse_tensor=False``: the result must be a dense tensor of
      shape ``(len(texts), embedding_dim)``.
    """
    model = splade_bert_tiny_model
    texts = ["First sentence.", "Second sentence."]

    # Create a pool
    pool = model.start_multi_process_pool(["cpu"] * 2)
    try:
        # Test with max_active_dims
        embeddings_maxed = model.encode(texts, pool=pool, max_active_dims=16)

        # Should be a sparse tensor with exactly 16 active dimensions per text
        assert isinstance(embeddings_maxed, torch.Tensor)
        assert embeddings_maxed.is_sparse
        # Row indices of all non-zero entries: 16 for text 0, then 16 for text 1.
        assert torch.equal(
            embeddings_maxed.to_dense().nonzero(as_tuple=True)[0],
            torch.tensor([0] * 16 + [1] * 16),
        )

        # Test with dense (non-sparse) output
        embeddings_non_sparse = model.encode(texts, pool=pool, convert_to_sparse_tensor=False)
        # Check the dense result itself (the original re-checked embeddings_maxed here).
        assert isinstance(embeddings_non_sparse, torch.Tensor)
        assert not embeddings_non_sparse.is_sparse
        assert embeddings_non_sparse.shape == (len(texts), model.get_sentence_embedding_dimension())
    finally:
        # Stop the pool even if one of the assertions above fails.
        model.stop_multi_process_pool(pool)
@pytest.mark.slow
def test_multi_process_chunk_size(splade_bert_tiny_model: SparseEncoder):
    """Multi-process encoding with an explicit chunk_size still yields a full sparse result."""
    encoder = splade_bert_tiny_model
    sentences = ["First sentence.", "Second sentence.", "Third sentence."] * 10

    # 30 texts encoded on two CPU devices with chunk_size=5.
    result = encoder.encode(sentences, device=["cpu", "cpu"], chunk_size=5)

    # One sparse row per input text, each of the model's embedding dimension.
    assert isinstance(result, torch.Tensor)
    assert result.is_sparse
    expected_shape = (len(sentences), encoder.get_sentence_embedding_dimension())
    assert result.shape == expected_shape
@pytest.mark.slow
def test_multi_process_with_prompt(splade_bert_tiny_model: SparseEncoder):
    """Encoding with a named prompt must give the same embeddings with and without a pool."""
    encoder = splade_bert_tiny_model
    encoder.prompts = {"retrieval": "Represent this sentence for searching relevant passages: "}
    sentences = ["First sentence.", "Second sentence."] * 5

    # Reference: regular encode with the prompt applied.
    reference = encoder.encode(sentences, prompt_name="retrieval").cpu()
    assert isinstance(reference, torch.Tensor)
    assert reference.is_sparse
    assert reference.shape == (len(sentences), encoder.get_sentence_embedding_dimension())

    # Same call routed through a two-worker CPU pool.
    worker_pool = encoder.start_multi_process_pool(["cpu", "cpu"])
    try:
        pooled = encoder.encode(sentences, pool=worker_pool, prompt_name="retrieval")
    finally:
        # Stop the pool whether or not encoding succeeded.
        encoder.stop_multi_process_pool(worker_pool)

    assert isinstance(pooled, torch.Tensor)
    assert pooled.is_sparse
    assert pooled.shape == (len(sentences), encoder.get_sentence_embedding_dimension())
    assert sparse_allclose(reference, pooled, atol=1e-5)
@pytest.mark.slow
@pytest.mark.parametrize("convert_to_tensor", [True, False])
@pytest.mark.parametrize("convert_to_sparse_tensor", [True, False])
def test_multi_process_with_empty_texts(
    splade_bert_tiny_model: SparseEncoder,
    convert_to_tensor: bool,
    convert_to_sparse_tensor: bool,
):
    """An empty input list must give an empty result of the same type on both encode paths."""
    encoder = splade_bert_tiny_model
    no_texts = []

    # Output-format flags shared by both encode calls.
    output_options = {
        "convert_to_tensor": convert_to_tensor,
        "convert_to_sparse_tensor": convert_to_sparse_tensor,
    }

    single_process = encoder.encode(no_texts, **output_options)
    multi_process = encoder.encode(no_texts, device=["cpu", "cpu"], **output_options)

    # Same container type as without multi-processing, and nothing in either.
    assert type(single_process) is type(multi_process)
    assert len(single_process) == 0
    assert len(multi_process) == 0
@pytest.mark.slow
@pytest.mark.parametrize("convert_to_tensor", [True, False])
@pytest.mark.parametrize("convert_to_sparse_tensor", [True, False])
def test_multi_process_with_single_string(
    splade_bert_tiny_model: SparseEncoder,
    convert_to_tensor: bool,
    convert_to_sparse_tensor: bool,
):
    """A bare string input must produce the same kind of output on both encode paths."""

    def assert_matching_values(expected, actual):
        """Compare two mappings key by key, using allclose for tensors and arrays."""
        for name, reference in expected.items():
            candidate = actual[name]
            if isinstance(reference, torch.Tensor):
                assert torch.allclose(reference.cpu(), candidate, atol=1e-5)
            elif isinstance(reference, np.ndarray):
                assert np.allclose(reference, candidate, atol=1e-5)
            else:
                assert reference == candidate

    encoder = splade_bert_tiny_model
    sentence = "This is a single sentence."

    # Output-format flags shared by both encode calls.
    output_options = {
        "convert_to_tensor": convert_to_tensor,
        "convert_to_sparse_tensor": convert_to_sparse_tensor,
    }
    single_process = encoder.encode(sentence, **output_options)
    multi_process = encoder.encode(sentence, device=["cpu", "cpu"], **output_options)

    # Both paths must return the very same container type.
    assert type(single_process) is type(multi_process)

    if isinstance(single_process, (np.ndarray, torch.Tensor)):
        # Array-like output: only the shapes are compared.
        assert single_process.shape == multi_process.shape
        return

    assert len(single_process) == len(multi_process)

    if isinstance(single_process, dict):
        # Dictionary output: same keys, matching values.
        assert single_process.keys() == multi_process.keys()
        assert_matching_values(single_process, multi_process)
    elif isinstance(single_process, list) and len(single_process) > 0:
        # List output: compare the paired items as dictionaries.
        for expected_item, actual_item in zip(single_process, multi_process):
            assert set(expected_item.keys()) == set(actual_item.keys())
            assert_matching_values(expected_item, actual_item)
@pytest.mark.slow
def test_multi_process_more_workers_than_texts(splade_bert_tiny_model: SparseEncoder):
    """Encoding two texts on three CPU devices must still return one row per text."""
    encoder = splade_bert_tiny_model
    sentences = ["First sentence.", "Second sentence."]

    # Three devices for only two texts.
    result = encoder.encode(sentences, device=["cpu", "cpu", "cpu"])

    # Should be a torch tensor with one row per text.
    assert isinstance(result, torch.Tensor)
    expected_shape = (len(sentences), encoder.get_sentence_embedding_dimension())
    assert result.shape == expected_shape
@pytest.mark.slow
def test_multi_process_with_large_chunk_size(splade_bert_tiny_model: SparseEncoder):
    """A chunk_size larger than the number of texts must not break multi-process encoding."""
    encoder = splade_bert_tiny_model
    sentences = ["First sentence.", "Second sentence."] * 10  # 20 sentences

    # chunk_size=30 exceeds the 20 texts being encoded.
    result = encoder.encode(sentences, device=["cpu", "cpu"], chunk_size=30)

    # Should be a torch tensor with one row per text.
    assert isinstance(result, torch.Tensor)
    expected_shape = (len(sentences), encoder.get_sentence_embedding_dimension())
    assert result.shape == expected_shape