# Uploaded by lsmpp via the upload-large-folder tool (commit bd33eac, verified).
from __future__ import annotations
import csv
import gzip
import os
from collections.abc import Generator
import pytest
from datasets import load_dataset
from sentence_transformers import SparseEncoder, SparseEncoderTrainer, SparseEncoderTrainingArguments, util
from sentence_transformers.readers import InputExample
from sentence_transformers.sparse_encoder import losses
from sentence_transformers.sparse_encoder.evaluation import SparseEmbeddingSimilarityEvaluator
from sentence_transformers.util import is_training_available
# Skip the whole module when Sentence Transformers was installed without the
# optional training dependencies — every test below needs the trainer stack.
if not is_training_available():
    pytest.skip(
        reason='Sentence Transformers was not installed with the `["train"]` extra.',
        allow_module_level=True,
    )
@pytest.fixture()
def sts_resource() -> Generator[tuple[list[InputExample], list[InputExample]], None, None]:
    """Yield ``(train_samples, test_samples)`` parsed from the STS benchmark TSV.

    The gzipped dataset is downloaded on first use and cached on disk.
    Gold scores are normalized from the original 0-5 range to 0-1.
    Rows from other splits (e.g. "dev") are ignored.
    """
    dataset_path = "datasets/stsbenchmark.tsv.gz"
    if not os.path.exists(dataset_path):
        util.http_get("https://sbert.net/datasets/stsbenchmark.tsv.gz", dataset_path)

    train_examples: list[InputExample] = []
    test_examples: list[InputExample] = []
    with gzip.open(dataset_path, "rt", encoding="utf8") as handle:
        for record in csv.DictReader(handle, delimiter="\t", quoting=csv.QUOTE_NONE):
            example = InputExample(
                texts=[record["sentence1"], record["sentence2"]],
                label=float(record["score"]) / 5.0,
            )
            if record["split"] == "train":
                train_examples.append(example)
            elif record["split"] == "test":
                test_examples.append(example)
    yield train_examples, test_examples
@pytest.fixture()
def dummy_sparse_encoder_model() -> SparseEncoder:
    """Provide a tiny SPLADE model so training/evaluation tests stay fast."""
    checkpoint = "sparse-encoder-testing/splade-bert-tiny-nq"
    return SparseEncoder(checkpoint)
def evaluate_stsb_test(
    model: SparseEncoder,
    expected_score: float,
    test_samples: list[InputExample],
    num_test_samples: int = -1,
) -> None:
    """Run the STS-b similarity evaluator on ``model`` and assert a minimum score.

    Args:
        model: The sparse encoder to evaluate.
        expected_score: Expected Spearman correlation on a 0-100 scale.
        test_samples: STS-b test examples with labels normalized to 0-1.
        num_test_samples: How many samples to evaluate; any negative value
            (the default) means "use all samples".

    Raises:
        AssertionError: If the evaluator exposes no primary metric, or the
            achieved score is below ``expected_score`` beyond the tolerance.
    """
    # Fix: the previous code sliced with `[:num_test_samples]` directly, so the
    # default of -1 produced `test_samples[:-1]` and silently DROPPED the last
    # sample. Translate negative counts to "no limit" before slicing.
    limit = None if num_test_samples < 0 else num_test_samples
    subset = test_samples[:limit]
    evaluator = SparseEmbeddingSimilarityEvaluator(
        sentences1=[sample.texts[0] for sample in subset],
        sentences2=[sample.texts[1] for sample in subset],
        scores=[sample.label for sample in subset],
        max_active_dims=64,
    )
    scores_dict = evaluator(model)
    assert evaluator.primary_metric, "Could not find spearman cosine correlation metric in evaluator output"
    score = scores_dict[evaluator.primary_metric] * 100
    print(f"STS-Test Performance: {score:.2f} vs. exp: {expected_score:.2f}")
    assert score > expected_score or abs(score - expected_score) < 0.5  # Looser tolerance for sparse models initially
@pytest.mark.slow
def test_train_stsb_slow(
    dummy_sparse_encoder_model: SparseEncoder, sts_resource: tuple[list[InputExample], list[InputExample]], tmp_path
) -> None:
    """Short end-to-end SPLADE training run on STS-b, followed by a lenient evaluation."""
    model = dummy_sparse_encoder_model
    sts_train_samples, sts_test_samples = sts_resource

    def _normalize(batch):
        # Scale the 0-5 gold scores down to the 0-1 range the loss expects.
        return {
            "sentence1": batch["sentence1"],
            "sentence2": batch["sentence2"],
            "score": [s / 5.0 for s in batch["score"]],
        }

    raw_dataset = load_dataset("sentence-transformers/stsb", split="train")
    train_dataset = raw_dataset.map(_normalize, batched=True).select(range(len(sts_train_samples)))

    loss = losses.SpladeLoss(
        model=model,
        loss=losses.SparseMultipleNegativesRankingLoss(model=model),
        document_regularizer_weight=3e-5,
        query_regularizer_weight=5e-5,
    )
    training_args = SparseEncoderTrainingArguments(
        output_dir=tmp_path,
        num_train_epochs=1,
        per_device_train_batch_size=16,  # Smaller batch for faster test
        warmup_ratio=0.1,
        logging_steps=10,
        eval_strategy="no",
        save_strategy="no",
        learning_rate=2e-5,
        remove_unused_columns=False,  # Important when using custom datasets
    )
    trainer = SparseEncoderTrainer(model=model, args=training_args, train_dataset=train_dataset, loss=loss)
    trainer.train()
    evaluate_stsb_test(model, 10, sts_test_samples)  # Lower expected score for a short training
def test_train_stsb(
    dummy_sparse_encoder_model: SparseEncoder,
    sts_resource: tuple[list[InputExample], list[InputExample]],
    tmp_path,
) -> None:
    """Quick SPLADE training smoke test on a 100-sample STS-b subset.

    Trains for one epoch with a tiny batch size and then checks the model
    against a very low expected evaluation score.
    """
    from datasets import Dataset

    model = dummy_sparse_encoder_model
    sts_train_samples, sts_test_samples = sts_resource

    # Idiom fix: build the columns with comprehensions instead of a manual
    # append loop over a pre-seeded dict.
    train_samples_subset = sts_train_samples[:100]
    train_dataset = Dataset.from_dict(
        {
            "sentence1": [example.texts[0] for example in train_samples_subset],
            "sentence2": [example.texts[1] for example in train_samples_subset],
            "score": [example.label for example in train_samples_subset],
        }
    )
    loss = losses.SpladeLoss(
        model=model,
        loss=losses.SparseMultipleNegativesRankingLoss(model=model),
        document_regularizer_weight=3e-5,
        query_regularizer_weight=5e-5,
    )
    # Fix: write training output to pytest's tmp_path (consistent with
    # test_train_stsb_slow) instead of leaving artifacts behind in a
    # persistent "runs/sparse_stsb_test_output" directory.
    training_args = SparseEncoderTrainingArguments(
        output_dir=tmp_path,
        num_train_epochs=1,
        per_device_train_batch_size=8,  # Even smaller batch
        warmup_ratio=0.1,
        logging_steps=5,
        save_strategy="no",  # No saving for this quick test
        learning_rate=2e-5,
        remove_unused_columns=False,
    )
    trainer = SparseEncoderTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        loss=loss,
    )
    trainer.train()
    evaluate_stsb_test(model, 5, sts_test_samples, num_test_samples=50)  # Very low expectation