File size: 7,997 Bytes
9bbba62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
from __future__ import annotations

import numpy as np
import pytest
import torch

from sentence_transformers import CrossEncoder

# These tests fail if optimum.intel.openvino is imported, because openvinotoolkit/nncf
# patches torch._C._nn.gelu in a way that breaks pickling. As a result, we may have issues
# when running both backend tests and multi-process tests in the same session.


@pytest.mark.slow
@pytest.mark.parametrize("convert_to_tensor", (False, True))
@pytest.mark.parametrize("apply_softmax", (False, True))
def test_predict_multi_process(
    reranker_bert_tiny_model: CrossEncoder, convert_to_tensor: bool, apply_softmax: bool
) -> None:
    model = reranker_bert_tiny_model
    pairs = [[f"This is sentence {i}", f"This is another sentence {i}"] for i in range(40)]

    # Start the multi-process pool on e.g. two CPU devices & compute the scores using the pool
    pool = model.start_multi_process_pool(["cpu", "cpu"])
    scores = model.predict(pairs, pool=pool, convert_to_tensor=convert_to_tensor, apply_softmax=apply_softmax)
    model.stop_multi_process_pool(pool)

    if convert_to_tensor:
        assert isinstance(scores, torch.Tensor)
        assert scores.shape[0] == len(pairs)
    else:
        assert isinstance(scores, np.ndarray)
        assert scores.shape[0] == len(pairs)

    # Make sure the scores aren't just all 0
    assert scores.sum() != 0.0

    # Compare against normal predictions
    scores_normal = model.predict(pairs, convert_to_tensor=convert_to_tensor, apply_softmax=apply_softmax)
    if convert_to_tensor:
        diff = torch.max(torch.abs(scores - scores_normal))
        assert diff < 1e-3
    else:
        diff = np.max(np.abs(scores - scores_normal))
        assert diff < 1e-3


@pytest.mark.slow
def test_multi_process_predict_same_as_standard_predict(reranker_bert_tiny_model: CrossEncoder):
    model = reranker_bert_tiny_model
    # Test that multi-process prediction gives the same result as standard prediction
    pairs = [
        ["First sentence.", "Second sentence."],
        ["Second sentence.", "Third sentence."],
        ["Third sentence.", "Fourth sentence."],
    ] * 5

    # Standard predict
    scores_standard = model.predict(pairs)

    # Multi-process predict with device=["cpu"] * 2
    scores_multi = model.predict(pairs, device=["cpu"] * 2)

    # Should produce the same scores
    assert np.allclose(scores_standard, scores_multi, atol=1e-6)


@pytest.mark.slow
def test_multi_process_pool(reranker_bert_tiny_model: CrossEncoder):
    # Test the start_multi_process_pool and stop_multi_process_pool functions
    model = reranker_bert_tiny_model
    pairs = [
        ["First sentence.", "Second sentence."],
        ["Second sentence.", "Third sentence."],
        ["Third sentence.", "Fourth sentence."],
    ] * 5

    # Standard predict
    scores_standard = model.predict(pairs)

    pool = model.start_multi_process_pool(["cpu"] * 2)
    try:
        # Predict using the pool
        scores_multi = model.predict(pairs, pool=pool)

    finally:
        model.stop_multi_process_pool(pool)

    # Should be numpy array with correct shape and the same scores
    assert isinstance(scores_multi, np.ndarray)
    assert scores_multi.shape == scores_standard.shape
    assert np.allclose(scores_standard, scores_multi, atol=1e-6)


@pytest.mark.slow
def test_multi_process_chunk_size(reranker_bert_tiny_model: CrossEncoder):
    # Test explicit chunk_size parameter for predict
    model = reranker_bert_tiny_model
    pairs = [
        ["First sentence.", "Second sentence."],
        ["Second sentence.", "Third sentence."],
        ["Third sentence.", "Fourth sentence."],
    ] * 10

    # Test with explicit chunk size
    scores = model.predict(pairs, device=["cpu"] * 2, chunk_size=5)

    # Should produce correct scores
    assert isinstance(scores, np.ndarray)
    assert scores.shape[0] == len(pairs)


@pytest.mark.slow
@pytest.mark.parametrize("convert_to_tensor", [True, False])
@pytest.mark.parametrize("convert_to_numpy", [True, False])
def test_multi_process_with_empty_pairs(
    reranker_bert_tiny_model: CrossEncoder, convert_to_tensor: bool, convert_to_numpy: bool
):
    # Test predicting with empty pairs
    model = reranker_bert_tiny_model
    pairs: list[list[str]] = []

    # Predict with empty pairs
    scores_standard = model.predict(pairs, convert_to_tensor=convert_to_tensor, convert_to_numpy=convert_to_numpy)
    scores_multi = model.predict(
        pairs,
        device=["cpu"] * 2,
        convert_to_tensor=convert_to_tensor,
        convert_to_numpy=convert_to_numpy,
    )

    # Should return empty arrays, identical types as without multi-processing
    assert type(scores_standard) is type(scores_multi)
    if convert_to_tensor:
        assert isinstance(scores_standard, torch.Tensor)
        assert scores_standard.numel() == 0
    elif convert_to_numpy:
        assert isinstance(scores_standard, np.ndarray)
        assert scores_standard.size == 0
    else:
        assert isinstance(scores_standard, list)
        assert len(scores_standard) == 0


@pytest.mark.slow
@pytest.mark.parametrize("convert_to_tensor", [True, False])
@pytest.mark.parametrize("convert_to_numpy", [True, False])
def test_multi_process_with_single_pair(
    reranker_bert_tiny_model: CrossEncoder, convert_to_tensor: bool, convert_to_numpy: bool
):
    # Test predicting with a single pair
    model = reranker_bert_tiny_model
    pair = ["This is a single sentence.", "This is another sentence."]

    # Predict with single pair
    scores_standard = model.predict(pair, convert_to_tensor=convert_to_tensor, convert_to_numpy=convert_to_numpy)
    scores_multi = model.predict(
        pair,
        device=["cpu"] * 2,
        convert_to_tensor=convert_to_tensor,
        convert_to_numpy=convert_to_numpy,
    )

    # Assert that the scores are the same type and shape
    assert type(scores_standard) is type(scores_multi)
    if isinstance(scores_standard, (np.ndarray, torch.Tensor)):
        assert scores_standard.shape == scores_multi.shape
    else:
        # Scalar outputs for num_labels=1
        assert np.allclose(scores_standard, scores_multi, atol=1e-6)


@pytest.mark.slow
def test_multi_process_more_workers_than_pairs(reranker_bert_tiny_model: CrossEncoder):
    # Test with more workers than pairs
    model = reranker_bert_tiny_model
    pairs = [["First sentence.", "Second sentence."], ["Second sentence.", "Third sentence."]]

    scores = model.predict(pairs, device=["cpu"] * 3)

    # Should be numpy array with correct shape
    assert isinstance(scores, np.ndarray)
    assert scores.shape[0] == len(pairs)


@pytest.mark.slow
def test_multi_process_with_large_chunk_size(reranker_bert_tiny_model: CrossEncoder):
    # Test with a large chunk size
    model = reranker_bert_tiny_model
    pairs = [["First sentence.", "Second sentence."]] * 20  # 20 pairs

    # Use a large chunk size
    scores = model.predict(pairs, device=["cpu"] * 2, chunk_size=30)

    # Should produce correct scores
    assert isinstance(scores, np.ndarray)
    assert scores.shape[0] == len(pairs)


@pytest.mark.slow
@pytest.mark.skipif(
    not torch.cuda.is_available(), reason="CUDA must be available to experiment with 2 separate devices"
)
def test_multi_process_output_tensors_two_devices(reranker_bert_tiny_model: CrossEncoder):
    # Test with two separate devices
    model = reranker_bert_tiny_model
    pairs = [["First sentence.", "Second sentence."], ["Second sentence.", "Third sentence."]]

    # Ensure that scores are moved to CPU so they can be concatenated
    scores = model.predict(pairs, device=["cpu", "cuda"], convert_to_tensor=True)
    assert isinstance(scores, torch.Tensor)
    assert scores.device.type == "cpu"
    assert scores.shape[0] == len(pairs)

    # But the default is still just numpy
    scores = model.predict(pairs, device=["cpu", "cuda"])
    assert isinstance(scores, np.ndarray)
    assert scores.shape[0] == len(pairs)