|
|
|
|
|
|
|
|
|
|
|
from collections.abc import Iterable |
|
|
from typing import TYPE_CHECKING, Any, Optional, TypeVar |
|
|
|
|
|
import torch |
|
|
import torch.nn as nn |
|
|
|
|
|
from .interfaces_base import VllmModelForPooling, is_pooling_model |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from vllm.model_executor.layers.pooler import PoolingType |
|
|
|
|
|
# Type variable bound to nn.Module subclasses so the adapter factories
# below (as_embedding_model, etc.) return the same class type they receive.
_T = TypeVar("_T", bound=type[nn.Module])

# Class-name suffixes that mark a generative model; each is stripped from
# the original class name before the pooling-specific suffix is appended
# (see _get_pooling_model_name below).
_GENERATE_SUFFIXES = [
    "ForCausalLM",
    "ForConditionalGeneration",
    "ChatModel",
    "LMHeadModel",
]
|
|
|
|
|
|
|
|
def _get_pooling_model_name(orig_model_name: str, pooling_suffix: str) -> str:
    """Derive a pooling-model class name from a generative-model class name.

    Every known generation suffix (see ``_GENERATE_SUFFIXES``) is stripped
    from ``orig_model_name``, then ``pooling_suffix`` is appended.
    """
    base_name = orig_model_name

    # Strip each generation-style suffix, if present.
    for suffix in _GENERATE_SUFFIXES:
        if base_name.endswith(suffix):
            base_name = base_name[:len(base_name) - len(suffix)]

    return base_name + pooling_suffix
|
|
|
|
|
|
|
|
def _create_pooling_model_cls(
    orig_cls: _T,
    *,
    default_pooling_type: "PoolingType",
    default_normalize: bool,
    default_softmax: bool,
) -> _T:
    """Build a pooling-capable subclass of ``orig_cls``.

    The returned class mixes ``orig_cls`` with :class:`VllmModelForPooling`,
    drops generation-only submodules, and attaches a ``Pooler`` configured
    from the model's pooler config with the given defaults.

    Args:
        orig_cls: The original (typically generative) model class.
        default_pooling_type: Pooling type used when the config does not
            specify one.
        default_normalize: Default for output normalization.
        default_softmax: Default for applying softmax to the output.

    Returns:
        The new pooling model class (same metatype as ``orig_cls``).
    """
    # Lazy imports: avoid circular imports at module load time.
    from vllm.config import VllmConfig
    from vllm.model_executor.layers.pooler import Pooler, PoolerOutput
    from vllm.model_executor.pooling_metadata import PoolingMetadata

    from .utils import AutoWeightsLoader, WeightsMapper

    class ModelForPooling(orig_cls, VllmModelForPooling):

        def __init__(
            self,
            *,
            vllm_config: "VllmConfig",
            prefix: str = "",
            **kwargs: Any,
        ) -> None:
            super().__init__(vllm_config=vllm_config, prefix=prefix, **kwargs)

            # Generation-only submodules are not needed for pooling;
            # delete them (if present) so they are neither loaded nor kept
            # in memory.
            for attr in ("lm_head", "logits_processor"):
                if hasattr(self, attr):
                    delattr(self, attr)

            pooler_config = vllm_config.model_config.pooler_config
            assert pooler_config is not None

            # Only install the default pooler if the original class did not
            # already set one up (truthiness check: a falsy/absent _pooler
            # is replaced).
            if not getattr(self, "_pooler", None):
                self._pooler = Pooler.from_config_with_defaults(
                    pooler_config,
                    pooling_type=default_pooling_type,
                    normalize=default_normalize,
                    softmax=default_softmax,
                )

        def pooler(
            self,
            hidden_states: torch.Tensor,
            pooling_metadata: PoolingMetadata,
        ) -> PoolerOutput:
            # Delegate to the pooler installed in __init__ (or by orig_cls).
            return self._pooler(hidden_states, pooling_metadata)

        def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]):
            # `lm_head` was deleted in __init__, so filter out its weights.
            # Note: this is a lazy generator — weights are filtered as they
            # are consumed downstream.
            weights = ((name, data) for name, data in weights
                       if not name.startswith("lm_head."))

            # If the inner `self.model` defines its own `load_weights` and
            # no sibling submodule carries parameters, we can load
            # checkpoints saved either with or without the "model." prefix.
            if hasattr(self, "model") and hasattr(self.model, "load_weights"):
                # True iff `self.model` is the only child with parameters.
                model_is_only_param = all(
                    name == "model" or next(child.parameters(), None) is None
                    for name, child in self.named_children())

                if model_is_only_param:
                    # Strip the "model." prefix before delegating, then
                    # restore it on the reported parameter names so callers
                    # see fully-qualified names.
                    mapper = WeightsMapper(orig_to_new_prefix={"model.": ""})
                    weights = mapper.apply(weights)

                    loaded_params = self.model.load_weights(weights)
                    loaded_params = {f"model.{name}" for name in loaded_params}
                    return loaded_params

            # Otherwise prefer the original class's loader, if any...
            if hasattr(orig_cls, "load_weights"):
                return orig_cls.load_weights(self, weights)
            # ...and fall back to the generic automatic loader.
            else:
                loader = AutoWeightsLoader(self)
                return loader.load_weights(weights)

    return ModelForPooling
|
|
|
|
|
|
|
|
def as_embedding_model(cls: _T) -> _T:
    """
    Subclass an existing vLLM model to support embeddings.

    By default, the embeddings of the whole prompt are extracted from the
    normalized hidden state corresponding to the last token.

    Note:
        We assume that no extra layers are added to the original model;
        please implement your own model if this is not the case.
    """
    # Already pooling-capable: return the class unchanged.
    if is_pooling_model(cls):
        return cls

    # Lazy import: avoids a circular import at module load time.
    from vllm.model_executor.layers.pooler import PoolingType

    embedding_cls = _create_pooling_model_cls(
        cls,
        default_pooling_type=PoolingType.LAST,
        default_normalize=True,
        default_softmax=False,
    )
    embedding_cls.__name__ = _get_pooling_model_name(cls.__name__,
                                                     "ForEmbedding")

    return embedding_cls
|
|
|
|
|
|
|
|
def as_classification_model(cls: _T) -> _T:
    """
    Subclass an existing vLLM model to support classification.

    By default, the class probabilities are extracted from the softmaxed
    hidden state corresponding to the last token.

    Note:
        We assume that the classification head is a single linear layer
        stored as the attribute `score` of the top-level model;
        please implement your own model if this is not the case.
    """
    # Already pooling-capable: return the class unchanged.
    if is_pooling_model(cls):
        return cls

    # Lazy imports: avoid circular imports at module load time.
    from vllm.config import VllmConfig
    from vllm.model_executor.layers.linear import RowParallelLinear
    from vllm.model_executor.layers.pooler import PoolingType
    from vllm.sequence import IntermediateTensors

    from .utils import maybe_prefix

    pooled_cls = _create_pooling_model_cls(
        cls,
        default_pooling_type=PoolingType.LAST,
        default_normalize=False,
        default_softmax=True,
    )

    class ModelForClassification(pooled_cls):

        def __init__(
            self,
            *,
            vllm_config: "VllmConfig",
            prefix: str = "",
            **kwargs: Any,
        ) -> None:
            super().__init__(vllm_config=vllm_config, prefix=prefix, **kwargs)

            config = vllm_config.model_config.hf_config
            quant_config = vllm_config.quant_config

            # Single linear head mapping hidden states to per-class logits.
            self.score = RowParallelLinear(
                config.hidden_size,
                config.num_labels,
                quant_config=quant_config,
                input_is_parallel=False,
                bias=False,
                prefix=maybe_prefix(prefix, "score"),
            )

        def forward(
            self,
            input_ids: torch.Tensor,
            positions: torch.Tensor,
            intermediate_tensors: Optional[IntermediateTensors] = None,
            inputs_embeds: Optional[torch.Tensor] = None,
        ) -> torch.Tensor:
            hidden_states = super().forward(input_ids, positions,
                                            intermediate_tensors,
                                            inputs_embeds)
            logits, _ = self.score(hidden_states)
            return logits

    ModelForClassification.__name__ = _get_pooling_model_name(
        cls.__name__, "ForClassification")

    return ModelForClassification
|
|
|
|
|
|
|
|
def as_reward_model(cls: _T) -> _T:
    """
    Subclass an existing vLLM model to support reward modeling.

    By default, we return the hidden states of each token directly.

    Note:
        We assume that no extra layers are added to the original model;
        please implement your own model if this is not the case.
    """
    # Already pooling-capable: return the class unchanged.
    if is_pooling_model(cls):
        return cls

    # Lazy import: avoids a circular import at module load time.
    from vllm.model_executor.layers.pooler import PoolingType

    reward_cls = _create_pooling_model_cls(
        cls,
        default_pooling_type=PoolingType.ALL,
        default_normalize=False,
        default_softmax=False,
    )
    reward_cls.__name__ = _get_pooling_model_name(cls.__name__, "ForReward")

    return reward_cls
|
|
|