File size: 9,192 Bytes
bd33eac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
from __future__ import annotations

import numpy as np
import pytest
import torch

from sentence_transformers import SparseEncoder

from .utils import sparse_allclose

# These tests fail if optimum.intel.openvino is imported, because openvinotoolkit/nncf
# patches torch._C._nn.gelu in a way that breaks pickling. As a result, we may have issues
# when running both backend tests and multi-process tests in the same session.


@pytest.mark.slow
def test_multi_process_encode_same_as_standard_encode(splade_bert_tiny_model: SparseEncoder):
    model = splade_bert_tiny_model
    # Test that multi-process encoding gives the same result as standard encoding
    texts = ["First sentence.", "Second sentence.", "Third sentence."] * 5

    # Standard encode
    embeddings_standard = model.encode(texts).cpu()

    # Multi-process encode with device=["cpu"] * 2
    embeddings_multi = model.encode(texts, device=["cpu"] * 2)

    # Should produce the same embeddings
    assert sparse_allclose(embeddings_standard, embeddings_multi, atol=1e-5)


@pytest.mark.slow
def test_multi_process_pool(splade_bert_tiny_model: SparseEncoder):
    # Test the start_multi_process_pool and stop_multi_process_pool functions
    model = splade_bert_tiny_model
    texts = ["First sentence.", "Second sentence.", "Third sentence."] * 5

    # Standard encode
    embeddings_standard = model.encode(texts).cpu()

    pool = model.start_multi_process_pool(["cpu"] * 2)
    try:
        # Encode using the pool
        embeddings_multi = model.encode(texts, pool=pool)

    finally:
        model.stop_multi_process_pool(pool)

    # Should be numpy array with correct shape and the same embeddings
    assert isinstance(embeddings_multi, torch.Tensor)
    assert embeddings_multi.is_sparse
    assert embeddings_multi.shape == (len(texts), model.get_sentence_embedding_dimension())
    assert sparse_allclose(embeddings_standard, embeddings_multi, atol=1e-5)


@pytest.mark.slow
def test_multi_process_with_args(splade_bert_tiny_model: SparseEncoder):
    # Test multi-process encoding with various arguments
    model = splade_bert_tiny_model
    texts = ["First sentence.", "Second sentence."]

    # Create a pool
    pool = model.start_multi_process_pool(["cpu"] * 2)

    try:
        # Test with normalize_embeddings and convert_to_tensor
        embeddings_maxed = model.encode(texts, pool=pool, max_active_dims=16)

        # Should be a tensor with normalized vectors
        assert isinstance(embeddings_maxed, torch.Tensor)
        assert embeddings_maxed.is_sparse
        assert torch.equal(embeddings_maxed.to_dense().nonzero(as_tuple=True)[0], torch.tensor([0] * 16 + [1] * 16))

        # Test with precision options
        embeddings_non_sparse = model.encode(texts, pool=pool, convert_to_sparse_tensor=False)
        assert isinstance(embeddings_maxed, torch.Tensor)
        assert not embeddings_non_sparse.is_sparse
        assert embeddings_non_sparse.shape == (len(texts), model.get_sentence_embedding_dimension())
    finally:
        model.stop_multi_process_pool(pool)


@pytest.mark.slow
def test_multi_process_chunk_size(splade_bert_tiny_model: SparseEncoder):
    # Test explicit chunk_size parameter
    model = splade_bert_tiny_model
    texts = ["First sentence.", "Second sentence.", "Third sentence."] * 10

    # Test with explicit chunk size
    embeddings = model.encode(texts, device=["cpu"] * 2, chunk_size=5)

    # Should produce correct embeddings
    assert isinstance(embeddings, torch.Tensor)
    assert embeddings.is_sparse
    assert embeddings.shape == (len(texts), model.get_sentence_embedding_dimension())


@pytest.mark.slow
def test_multi_process_with_prompt(splade_bert_tiny_model: SparseEncoder):
    # Test multi-process encoding with prompts
    model = splade_bert_tiny_model
    model.prompts = {"retrieval": "Represent this sentence for searching relevant passages: "}
    texts = ["First sentence.", "Second sentence."] * 5

    standard_embeddings = model.encode(texts, prompt_name="retrieval").cpu()

    assert isinstance(standard_embeddings, torch.Tensor)
    assert standard_embeddings.is_sparse
    assert standard_embeddings.shape == (len(texts), model.get_sentence_embedding_dimension())

    # Create a pool
    pool = model.start_multi_process_pool(["cpu"] * 2)

    try:
        # Encode with prompt
        multi_embeddings = model.encode(texts, pool=pool, prompt_name="retrieval")
    finally:
        model.stop_multi_process_pool(pool)

    assert isinstance(multi_embeddings, torch.Tensor)
    assert multi_embeddings.is_sparse
    assert multi_embeddings.shape == (len(texts), model.get_sentence_embedding_dimension())

    assert sparse_allclose(standard_embeddings, multi_embeddings, atol=1e-5)


@pytest.mark.slow
@pytest.mark.parametrize("convert_to_tensor", [True, False])
@pytest.mark.parametrize("convert_to_sparse_tensor", [True, False])
def test_multi_process_with_empty_texts(
    splade_bert_tiny_model: SparseEncoder,
    convert_to_tensor: bool,
    convert_to_sparse_tensor: bool,
):
    # Test encoding with empty texts
    model = splade_bert_tiny_model
    texts = []

    # Encode with empty texts
    standard_embeddings = model.encode(
        texts, convert_to_tensor=convert_to_tensor, convert_to_sparse_tensor=convert_to_sparse_tensor
    )
    multi_embeddings = model.encode(
        texts,
        device=["cpu"] * 2,
        convert_to_tensor=convert_to_tensor,
        convert_to_sparse_tensor=convert_to_sparse_tensor,
    )

    # Should return empty arrays, identical types as without multi-processing
    assert type(standard_embeddings) is type(multi_embeddings)
    assert len(standard_embeddings) == 0
    assert len(multi_embeddings) == 0


@pytest.mark.slow
@pytest.mark.parametrize("convert_to_tensor", [True, False])
@pytest.mark.parametrize("convert_to_sparse_tensor", [True, False])
def test_multi_process_with_single_string(
    splade_bert_tiny_model: SparseEncoder,
    convert_to_tensor: bool,
    convert_to_sparse_tensor: bool,
):
    # Test encoding with a single text
    model = splade_bert_tiny_model
    texts = "This is a single sentence."

    # Encode with single text
    standard_embeddings = model.encode(
        texts, convert_to_tensor=convert_to_tensor, convert_to_sparse_tensor=convert_to_sparse_tensor
    )
    multi_embeddings = model.encode(
        texts,
        device=["cpu"] * 2,
        convert_to_tensor=convert_to_tensor,
        convert_to_sparse_tensor=convert_to_sparse_tensor,
    )

    # Assert that the embeddings are the same type and shape
    assert type(standard_embeddings) is type(multi_embeddings)
    if isinstance(standard_embeddings, (np.ndarray, torch.Tensor)):
        assert standard_embeddings.shape == multi_embeddings.shape
    else:
        assert len(standard_embeddings) == len(multi_embeddings)
        # Check that dictionary items are the same
        if isinstance(standard_embeddings, dict):
            assert standard_embeddings.keys() == multi_embeddings.keys()
            for key in standard_embeddings:
                if isinstance(standard_embeddings[key], torch.Tensor):
                    assert torch.allclose(standard_embeddings[key].cpu(), multi_embeddings[key], atol=1e-5)
                elif isinstance(standard_embeddings[key], np.ndarray):
                    assert np.allclose(standard_embeddings[key], multi_embeddings[key], atol=1e-5)
                else:
                    assert standard_embeddings[key] == multi_embeddings[key]
        elif isinstance(standard_embeddings, list) and len(standard_embeddings) > 0:
            for std_item, multi_item in zip(standard_embeddings, multi_embeddings):
                assert set(std_item.keys()) == set(multi_item.keys())
                for key in std_item:
                    if isinstance(std_item[key], torch.Tensor):
                        assert torch.allclose(std_item[key].cpu(), multi_item[key], atol=1e-5)
                    elif isinstance(std_item[key], np.ndarray):
                        assert np.allclose(std_item[key], multi_item[key], atol=1e-5)
                    else:
                        assert std_item[key] == multi_item[key]


@pytest.mark.slow
def test_multi_process_more_workers_than_texts(splade_bert_tiny_model: SparseEncoder):
    # Test with more workers than texts
    model = splade_bert_tiny_model
    texts = ["First sentence.", "Second sentence."]

    embeddings = model.encode(texts, device=["cpu"] * 3)

    # Should be numpy array with correct shape
    assert isinstance(embeddings, torch.Tensor)
    assert embeddings.shape == (len(texts), model.get_sentence_embedding_dimension())


@pytest.mark.slow
def test_multi_process_with_large_chunk_size(splade_bert_tiny_model: SparseEncoder):
    # Test with a large chunk size
    model = splade_bert_tiny_model
    texts = ["First sentence.", "Second sentence."] * 10  # 20 sentences

    # Use a large chunk size
    embeddings = model.encode(texts, device=["cpu"] * 2, chunk_size=30)

    # Should produce correct embeddings
    assert isinstance(embeddings, torch.Tensor)
    assert embeddings.shape == (len(texts), model.get_sentence_embedding_dimension())