Sarthak
chore: moved model2vec to an internal package
473c3a0
from __future__ import annotations
import re
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, TypeVar
import huggingface_hub
import numpy as np
import skops.io
from sklearn.metrics import classification_report
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import MultiLabelBinarizer
from distiller.model2vec.hf_utils import _create_model_card
from distiller.model2vec.model import PathLike, StaticModel
if TYPE_CHECKING:
from collections.abc import Sequence
from sklearn.pipeline import Pipeline
# Pattern matched (anchored at the start) against the type names reported for a serialized head.
# Names under the `sklearn.` namespace are accepted when the caller does not trust remote code.
_DEFAULT_TRUST_PATTERN = re.compile(r"sklearn\..+")
# File name under which the sklearn head is stored in a pipeline folder or hub repository.
_DEFAULT_MODEL_FILENAME = "pipeline.skops"
# Ground-truth label container: a flat list of labels (single-label) or a list of label lists (multi-label).
LabelType = TypeVar("LabelType", list[str], list[list[str]])
class StaticModelPipeline:
    """
    A text classification pipeline: a StaticModel encoder followed by a scikit-learn head.

    Input texts are embedded with `model`, and the resulting matrix is handed to `head` for prediction.
    """

    def __init__(self, model: StaticModel, head: Pipeline) -> None:
        """
        Create a pipeline with a StaticModel encoder.

        :param model: The encoder used to embed the input texts.
        :param head: The scikit-learn pipeline that classifies the embeddings. Its last step is inspected
            to decide whether the pipeline is multilabel.
        """
        self.model = model
        self.head = head
        classifier = self.head[-1]
        # Check if the classifier is a multilabel classifier.
        # NOTE: this doesn't look robust, but it is.
        # Different classifiers, such as OVR wrappers, support multilabel output natively, so we
        # can just use predict.
        self.multilabel = isinstance(classifier, MLPClassifier) and classifier.out_activation_ == "logistic"

    @property
    def classes_(self) -> np.ndarray:
        """The classes of the classifier."""
        return self.head.classes_

    @classmethod
    def from_pretrained(
        cls: type[StaticModelPipeline], path: PathLike, token: str | None = None, trust_remote_code: bool = False
    ) -> StaticModelPipeline:
        """
        Load a pipeline from a local path or huggingface hub path.

        NOTE: if you load a private model from the huggingface hub, you need to pass a token.

        :param path: The path to the folder containing the pipeline, or a repository on the Hugging Face Hub
        :param token: The token to use to download the pipeline from the hub.
        :param trust_remote_code: Whether to trust the remote code. If this is False, we will only load components coming from `sklearn`.
        :return: The loaded pipeline.
        """
        model, head = _load_pipeline(path, token, trust_remote_code)
        # Sanitize the embedding matrix so NaN values cannot propagate into the head.
        model.embedding = np.nan_to_num(model.embedding)
        return cls(model, head)

    def save_pretrained(self, path: str) -> None:
        """
        Save the model to a folder.

        :param path: The folder to save the pipeline to.
        """
        save_pipeline(self, path)

    def push_to_hub(
        self, repo_id: str, subfolder: str | None = None, token: str | None = None, private: bool = False
    ) -> None:
        """
        Save a model to a folder, and then push that folder to the hf hub.

        :param repo_id: The id of the repository to push to.
        :param subfolder: The subfolder to push to.
        :param token: The token to use to push to the hub.
        :param private: Whether the repository should be private.
        """
        from distiller.model2vec.hf_utils import push_folder_to_hub

        with TemporaryDirectory() as temp_dir:
            save_pipeline(self, temp_dir)
            # NOTE(review): save_pipeline already saves the encoder into this folder; this second
            # save looks redundant and may overwrite files written by save_pipeline - confirm.
            self.model.save_pretrained(temp_dir)
            push_folder_to_hub(Path(temp_dir), subfolder, repo_id, private, token)

    def _encode_and_coerce_to_2d(
        self,
        X: Sequence[str],
        show_progress_bar: bool,
        max_length: int | None,
        batch_size: int,
        use_multiprocessing: bool,
        multiprocessing_threshold: int,
    ) -> np.ndarray:
        """
        Encode the instances and coerce the output to a matrix.

        :param X: The input data to encode.
        :param show_progress_bar: Whether to display a progress bar during encoding.
        :param max_length: The maximum length of the input sequences.
        :param batch_size: The batch size for encoding.
        :param use_multiprocessing: Whether to use multiprocessing for encoding.
        :param multiprocessing_threshold: The threshold for the number of samples to use multiprocessing.
        :return: The encoded instances, always with two dimensions.
        """
        encoded = self.model.encode(
            X,
            show_progress_bar=show_progress_bar,
            max_length=max_length,
            batch_size=batch_size,
            use_multiprocessing=use_multiprocessing,
            multiprocessing_threshold=multiprocessing_threshold,
        )
        # A one-dimensional result is a single vector; add a leading axis so the head sees one row.
        if np.ndim(encoded) == 1:
            encoded = encoded[None, :]
        return encoded

    def predict(
        self,
        X: Sequence[str],
        show_progress_bar: bool = False,
        max_length: int | None = 512,
        batch_size: int = 1024,
        use_multiprocessing: bool = True,
        multiprocessing_threshold: int = 10_000,
        threshold: float = 0.5,
    ) -> np.ndarray:
        """
        Predict the labels of the input.

        :param X: The input data to predict. Can be a list of strings or a single string.
        :param show_progress_bar: Whether to display a progress bar during prediction. Defaults to False.
        :param max_length: The maximum length of the input sequences. Defaults to 512.
        :param batch_size: The batch size for prediction. Defaults to 1024.
        :param use_multiprocessing: Whether to use multiprocessing for encoding. Defaults to True.
        :param multiprocessing_threshold: The threshold for the number of samples to use multiprocessing. Defaults to 10,000.
        :param threshold: The threshold for multilabel classification. Defaults to 0.5. Ignored if not multilabel.
        :return: The predicted labels. For a multilabel pipeline this is a one-dimensional object array
            holding, per input, the array of classes whose probability exceeds `threshold`.
        """
        encoded = self._encode_and_coerce_to_2d(
            X,
            show_progress_bar=show_progress_bar,
            max_length=max_length,
            batch_size=batch_size,
            use_multiprocessing=use_multiprocessing,
            multiprocessing_threshold=multiprocessing_threshold,
        )

        if not self.multilabel:
            return self.head.predict(encoded)

        proba = self.head.predict_proba(encoded)
        classes = self.classes_
        # Fill a preallocated 1-D object array element by element. Converting a list of label
        # arrays in one go yields a 2-D array whenever all samples have the same number of
        # labels, which would make the output shape depend on the data.
        out_labels = np.empty(len(proba), dtype=object)
        for index, vector in enumerate(proba):
            out_labels[index] = classes[vector > threshold]
        return out_labels

    def predict_proba(
        self,
        X: Sequence[str],
        show_progress_bar: bool = False,
        max_length: int | None = 512,
        batch_size: int = 1024,
        use_multiprocessing: bool = True,
        multiprocessing_threshold: int = 10_000,
    ) -> np.ndarray:
        """
        Predict the label probabilities of the input.

        :param X: The input data to predict. Can be a list of strings or a single string.
        :param show_progress_bar: Whether to display a progress bar during prediction. Defaults to False.
        :param max_length: The maximum length of the input sequences. Defaults to 512.
        :param batch_size: The batch size for prediction. Defaults to 1024.
        :param use_multiprocessing: Whether to use multiprocessing for encoding. Defaults to True.
        :param multiprocessing_threshold: The threshold for the number of samples to use multiprocessing. Defaults to 10,000.
        :return: The output of the head's `predict_proba` on the encoded input.
        """
        encoded = self._encode_and_coerce_to_2d(
            X,
            show_progress_bar=show_progress_bar,
            max_length=max_length,
            batch_size=batch_size,
            use_multiprocessing=use_multiprocessing,
            multiprocessing_threshold=multiprocessing_threshold,
        )
        return self.head.predict_proba(encoded)

    def evaluate(
        self, X: Sequence[str], y: LabelType, batch_size: int = 1024, threshold: float = 0.5, output_dict: bool = False
    ) -> str | dict[str, dict[str, float]]:
        """
        Evaluate the classifier on a given dataset using scikit-learn's classification report.

        :param X: The texts to predict on.
        :param y: The ground truth labels.
        :param batch_size: The batch size.
        :param threshold: The threshold for multilabel classification.
        :param output_dict: Whether to output the classification report as a dictionary.
        :return: A classification report.
        """
        predictions = self.predict(X, show_progress_bar=True, batch_size=batch_size, threshold=threshold)
        return evaluate_single_or_multi_label(predictions=predictions, y=y, output_dict=output_dict)
def _load_pipeline(
    folder_or_repo_path: PathLike, token: str | None = None, trust_remote_code: bool = False
) -> tuple[StaticModel, Pipeline]:
    """
    Load an encoder model together with its sklearn head.

    The location is expected to contain the following files:
    - `pipeline.skops`: The head of the pipeline.
    - `config.json`: The configuration of the model.
    - `model.safetensors`: The weights of the model.
    - `tokenizer.json`: The tokenizer of the model.

    :param folder_or_repo_path: A local folder holding the pipeline. If the path does not exist locally,
        it is used as a Hugging Face Hub repository id instead.
    :param token: The token used when downloading the head from the hub. When None, only local folders,
        public repositories, or repositories reachable through an existing login can be loaded.
    :param trust_remote_code: When False, every type found in the serialized head must come from `sklearn`.
        When True, all types are loaded and the caller takes responsibility for the consequences.
    :return: The encoder model and the loaded head
    :raises FileNotFoundError: If the local folder exists but does not contain the pipeline file.
    :raises ValueError: If an untrusted type is found in the pipeline, and `trust_remote_code` is False.
    """
    folder_or_repo_path = Path(folder_or_repo_path)
    head_pipeline_path: str | Path

    if not folder_or_repo_path.exists():
        # Nothing on disk under this name: fetch the head file from the hub.
        head_pipeline_path = huggingface_hub.hf_hub_download(
            folder_or_repo_path.as_posix(), _DEFAULT_MODEL_FILENAME, token=token
        )
    else:
        head_pipeline_path = folder_or_repo_path / _DEFAULT_MODEL_FILENAME
        if not head_pipeline_path.exists():
            msg = f"Pipeline file does not exist in {folder_or_repo_path}"
            raise FileNotFoundError(msg)

    # NOTE(review): `token` is not forwarded here - confirm that private repositories still load.
    encoder = StaticModel.from_pretrained(folder_or_repo_path)

    reported_types = skops.io.get_untrusted_types(file=head_pipeline_path)
    if not trust_remote_code:
        # Without explicit trust, reject the first reported type that is not an sklearn one.
        rejected = next((name for name in reported_types if not _DEFAULT_TRUST_PATTERN.match(name)), None)
        if rejected is not None:
            msg = f"Untrusted type {rejected}."
            raise ValueError(msg)

    classifier_head = skops.io.load(head_pipeline_path, trusted=reported_types)
    return encoder, classifier_head
def save_pipeline(pipeline: StaticModelPipeline, folder_path: str | Path) -> None:
    """
    Write a pipeline to a folder, creating the folder if needed.

    The head is dumped to `pipeline.skops`, the encoder is saved next to it, and a model card is
    created from the classifier template.

    :param pipeline: The pipeline to save.
    :param folder_path: The path to the folder to save the pipeline to.
    """
    target_dir = Path(folder_path)
    target_dir.mkdir(parents=True, exist_ok=True)

    skops.io.dump(pipeline.head, target_dir / _DEFAULT_MODEL_FILENAME)
    pipeline.model.save_pretrained(target_dir)

    # The model card takes a single name: a string is used as is, a non-empty list contributes
    # its first entry, and anything else falls back to a placeholder.
    recorded_name = pipeline.model.base_model_name
    if isinstance(recorded_name, str):
        card_name = recorded_name
    elif isinstance(recorded_name, list) and recorded_name:
        card_name = recorded_name[0]
    else:
        card_name = "unknown"

    _create_model_card(
        target_dir,
        base_model_name=card_name,
        language=pipeline.model.language,
        template_path="modelcards/classifier_template.md",
    )
def _is_multi_label_shaped(y: LabelType) -> bool:
    """
    Check if the labels are in a multi-label shape.

    Only the first element is inspected: the labels count as multi-label when `y` is a non-empty
    list or tuple whose first item is itself a list, tuple or set.

    :param y: The labels to inspect.
    :return: True if the labels look multi-label, False otherwise.
    """
    if not isinstance(y, (list, tuple)):
        return False
    if len(y) == 0:
        return False
    first_item = y[0]
    return isinstance(first_item, (list, tuple, set))
def evaluate_single_or_multi_label(
    predictions: np.ndarray,
    y: LabelType,
    output_dict: bool = False,
) -> str | dict[str, dict[str, float]]:
    """
    Evaluate the classifier on a given dataset using scikit-learn's classification report.

    Multi-label ground truth (a list of label collections) is binarized together with the predictions
    before the report is computed. Any other input is passed to the report unchanged.

    :param predictions: The predictions.
    :param y: The ground truth labels.
    :param output_dict: Whether to output the classification report as a dictionary.
    :return: A classification report.
    """
    if _is_multi_label_shaped(y):
        # The indicator columns are defined by the labels present in the ground truth only.
        # NOTE(review): predicted labels that never occur in `y` are presumably ignored by the
        # binarizer's transform - confirm against the scikit-learn documentation.
        classes = sorted({label for labels in y for label in labels})
        mlb = MultiLabelBinarizer(classes=classes)
        y = mlb.fit_transform(y)
        predictions = mlb.transform(predictions)

    # Single-label input needs no preprocessing; the report derives the classes itself.
    return classification_report(
        y,
        predictions,
        output_dict=output_dict,
        zero_division=0,
    )