File size: 7,997 Bytes
9bbba62 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 | from __future__ import annotations
import numpy as np
import pytest
import torch
from sentence_transformers import CrossEncoder
# These tests fail if optimum.intel.openvino is imported, because openvinotoolkit/nncf
# patches torch._C._nn.gelu in a way that breaks pickling. As a result, we may have issues
# when running both backend tests and multi-process tests in the same session.
@pytest.mark.slow
@pytest.mark.parametrize("convert_to_tensor", (False, True))
@pytest.mark.parametrize("apply_softmax", (False, True))
def test_predict_multi_process(
reranker_bert_tiny_model: CrossEncoder, convert_to_tensor: bool, apply_softmax: bool
) -> None:
model = reranker_bert_tiny_model
pairs = [[f"This is sentence {i}", f"This is another sentence {i}"] for i in range(40)]
# Start the multi-process pool on e.g. two CPU devices & compute the scores using the pool
pool = model.start_multi_process_pool(["cpu", "cpu"])
scores = model.predict(pairs, pool=pool, convert_to_tensor=convert_to_tensor, apply_softmax=apply_softmax)
model.stop_multi_process_pool(pool)
if convert_to_tensor:
assert isinstance(scores, torch.Tensor)
assert scores.shape[0] == len(pairs)
else:
assert isinstance(scores, np.ndarray)
assert scores.shape[0] == len(pairs)
# Make sure the scores aren't just all 0
assert scores.sum() != 0.0
# Compare against normal predictions
scores_normal = model.predict(pairs, convert_to_tensor=convert_to_tensor, apply_softmax=apply_softmax)
if convert_to_tensor:
diff = torch.max(torch.abs(scores - scores_normal))
assert diff < 1e-3
else:
diff = np.max(np.abs(scores - scores_normal))
assert diff < 1e-3
@pytest.mark.slow
def test_multi_process_predict_same_as_standard_predict(reranker_bert_tiny_model: CrossEncoder):
model = reranker_bert_tiny_model
# Test that multi-process prediction gives the same result as standard prediction
pairs = [
["First sentence.", "Second sentence."],
["Second sentence.", "Third sentence."],
["Third sentence.", "Fourth sentence."],
] * 5
# Standard predict
scores_standard = model.predict(pairs)
# Multi-process predict with device=["cpu"] * 2
scores_multi = model.predict(pairs, device=["cpu"] * 2)
# Should produce the same scores
assert np.allclose(scores_standard, scores_multi, atol=1e-6)
@pytest.mark.slow
def test_multi_process_pool(reranker_bert_tiny_model: CrossEncoder):
# Test the start_multi_process_pool and stop_multi_process_pool functions
model = reranker_bert_tiny_model
pairs = [
["First sentence.", "Second sentence."],
["Second sentence.", "Third sentence."],
["Third sentence.", "Fourth sentence."],
] * 5
# Standard predict
scores_standard = model.predict(pairs)
pool = model.start_multi_process_pool(["cpu"] * 2)
try:
# Predict using the pool
scores_multi = model.predict(pairs, pool=pool)
finally:
model.stop_multi_process_pool(pool)
# Should be numpy array with correct shape and the same scores
assert isinstance(scores_multi, np.ndarray)
assert scores_multi.shape == scores_standard.shape
assert np.allclose(scores_standard, scores_multi, atol=1e-6)
@pytest.mark.slow
def test_multi_process_chunk_size(reranker_bert_tiny_model: CrossEncoder):
# Test explicit chunk_size parameter for predict
model = reranker_bert_tiny_model
pairs = [
["First sentence.", "Second sentence."],
["Second sentence.", "Third sentence."],
["Third sentence.", "Fourth sentence."],
] * 10
# Test with explicit chunk size
scores = model.predict(pairs, device=["cpu"] * 2, chunk_size=5)
# Should produce correct scores
assert isinstance(scores, np.ndarray)
assert scores.shape[0] == len(pairs)
@pytest.mark.slow
@pytest.mark.parametrize("convert_to_tensor", [True, False])
@pytest.mark.parametrize("convert_to_numpy", [True, False])
def test_multi_process_with_empty_pairs(
reranker_bert_tiny_model: CrossEncoder, convert_to_tensor: bool, convert_to_numpy: bool
):
# Test predicting with empty pairs
model = reranker_bert_tiny_model
pairs: list[list[str]] = []
# Predict with empty pairs
scores_standard = model.predict(pairs, convert_to_tensor=convert_to_tensor, convert_to_numpy=convert_to_numpy)
scores_multi = model.predict(
pairs,
device=["cpu"] * 2,
convert_to_tensor=convert_to_tensor,
convert_to_numpy=convert_to_numpy,
)
# Should return empty arrays, identical types as without multi-processing
assert type(scores_standard) is type(scores_multi)
if convert_to_tensor:
assert isinstance(scores_standard, torch.Tensor)
assert scores_standard.numel() == 0
elif convert_to_numpy:
assert isinstance(scores_standard, np.ndarray)
assert scores_standard.size == 0
else:
assert isinstance(scores_standard, list)
assert len(scores_standard) == 0
@pytest.mark.slow
@pytest.mark.parametrize("convert_to_tensor", [True, False])
@pytest.mark.parametrize("convert_to_numpy", [True, False])
def test_multi_process_with_single_pair(
reranker_bert_tiny_model: CrossEncoder, convert_to_tensor: bool, convert_to_numpy: bool
):
# Test predicting with a single pair
model = reranker_bert_tiny_model
pair = ["This is a single sentence.", "This is another sentence."]
# Predict with single pair
scores_standard = model.predict(pair, convert_to_tensor=convert_to_tensor, convert_to_numpy=convert_to_numpy)
scores_multi = model.predict(
pair,
device=["cpu"] * 2,
convert_to_tensor=convert_to_tensor,
convert_to_numpy=convert_to_numpy,
)
# Assert that the scores are the same type and shape
assert type(scores_standard) is type(scores_multi)
if isinstance(scores_standard, (np.ndarray, torch.Tensor)):
assert scores_standard.shape == scores_multi.shape
else:
# Scalar outputs for num_labels=1
assert np.allclose(scores_standard, scores_multi, atol=1e-6)
@pytest.mark.slow
def test_multi_process_more_workers_than_pairs(reranker_bert_tiny_model: CrossEncoder):
# Test with more workers than pairs
model = reranker_bert_tiny_model
pairs = [["First sentence.", "Second sentence."], ["Second sentence.", "Third sentence."]]
scores = model.predict(pairs, device=["cpu"] * 3)
# Should be numpy array with correct shape
assert isinstance(scores, np.ndarray)
assert scores.shape[0] == len(pairs)
@pytest.mark.slow
def test_multi_process_with_large_chunk_size(reranker_bert_tiny_model: CrossEncoder):
# Test with a large chunk size
model = reranker_bert_tiny_model
pairs = [["First sentence.", "Second sentence."]] * 20 # 20 pairs
# Use a large chunk size
scores = model.predict(pairs, device=["cpu"] * 2, chunk_size=30)
# Should produce correct scores
assert isinstance(scores, np.ndarray)
assert scores.shape[0] == len(pairs)
@pytest.mark.slow
@pytest.mark.skipif(
not torch.cuda.is_available(), reason="CUDA must be available to experiment with 2 separate devices"
)
def test_multi_process_output_tensors_two_devices(reranker_bert_tiny_model: CrossEncoder):
# Test with two separate devices
model = reranker_bert_tiny_model
pairs = [["First sentence.", "Second sentence."], ["Second sentence.", "Third sentence."]]
# Ensure that scores are moved to CPU so they can be concatenated
scores = model.predict(pairs, device=["cpu", "cuda"], convert_to_tensor=True)
assert isinstance(scores, torch.Tensor)
assert scores.device.type == "cpu"
assert scores.shape[0] == len(pairs)
# But the default is still just numpy
scores = model.predict(pairs, device=["cpu", "cuda"])
assert isinstance(scores, np.ndarray)
assert scores.shape[0] == len(pairs)
|