| |
| |
|
|
| from numbers import Integral, Real |
|
|
| import numpy as np |
| from scipy import optimize |
|
|
| from ..base import BaseEstimator, RegressorMixin, _fit_context |
| from ..utils._mask import axis0_safe_slice |
| from ..utils._param_validation import Interval |
| from ..utils.extmath import safe_sparse_dot |
| from ..utils.optimize import _check_optimize_result |
| from ..utils.validation import _check_sample_weight, validate_data |
| from ._base import LinearModel |
|
|
|
|
def _huber_loss_and_gradient(w, X, y, epsilon, alpha, sample_weight=None):
    """Return the Huber loss and its gradient.

    Parameters
    ----------
    w : ndarray, shape (n_features + 1,) or (n_features + 2,)
        Parameter vector.
        w[:n_features] gives the coefficients,
        w[-1] gives the scale factor and, if the intercept is fit, w[-2]
        gives the intercept. Whether an intercept is fit is inferred from
        the length of `w`.

    X : ndarray or sparse matrix of shape (n_samples, n_features)
        Input data.

    y : ndarray of shape (n_samples,)
        Target vector.

    epsilon : float
        Robustness of the Huber estimator.

    alpha : float
        Regularization parameter.

    sample_weight : ndarray of shape (n_samples,), default=None
        Weight assigned to each sample. If None, every sample gets a
        weight of one.

    Returns
    -------
    loss : float
        Huber loss.

    gradient : ndarray, shape (len(w))
        Returns the derivative of the Huber loss with respect to each
        coefficient, intercept and the scale as a vector.
    """
    n_rows, n_features = X.shape

    # ``None`` is the documented default; without this guard the boolean-mask
    # indexing of ``sample_weight`` below raises a TypeError.
    if sample_weight is None:
        sample_weight = np.ones(n_rows)

    # The layout of ``w`` is only known through its length.
    fit_intercept = n_features + 2 == w.shape[0]
    if fit_intercept:
        intercept = w[-2]
    sigma = w[-1]
    w = w[:n_features]
    # Effective number of samples: the sum of the weights.
    n_samples = np.sum(sample_weight)

    # Residuals y - Xw (- c). Samples whose absolute residual exceeds
    # epsilon * sigma are treated as outliers.
    linear_loss = y - safe_sparse_dot(X, w)
    if fit_intercept:
        linear_loss -= intercept
    abs_linear_loss = np.abs(linear_loss)
    outliers_mask = abs_linear_loss > epsilon * sigma

    # Absolute residuals of the outliers and the (unweighted) row counts
    # used to slice X below.
    outliers = abs_linear_loss[outliers_mask]
    num_outliers = np.count_nonzero(outliers_mask)
    n_non_outliers = n_rows - num_outliers

    # Linear part of the loss, contributed by the outliers:
    # 2 * epsilon * sum(sw * |r|) - sigma * sum(sw) * epsilon ** 2
    outliers_sw = sample_weight[outliers_mask]
    n_sw_outliers = np.sum(outliers_sw)
    outlier_loss = (
        2.0 * epsilon * np.sum(outliers_sw * outliers)
        - sigma * n_sw_outliers * epsilon**2
    )

    # Quadratic part of the loss, contributed by the non-outliers:
    # sum(sw * r ** 2) / sigma
    non_outliers = linear_loss[~outliers_mask]
    weighted_non_outliers = sample_weight[~outliers_mask] * non_outliers
    weighted_loss = np.dot(weighted_non_outliers.T, non_outliers)
    squared_loss = weighted_loss / sigma

    # One gradient entry per coefficient, plus the scale and, when fitted,
    # the intercept.
    grad = np.zeros(n_features + (2 if fit_intercept else 1))

    # Gradient of the quadratic part with respect to the coefficients.
    X_non_outliers = -axis0_safe_slice(X, ~outliers_mask, n_non_outliers)
    grad[:n_features] = (
        2.0 / sigma * safe_sparse_dot(weighted_non_outliers, X_non_outliers)
    )

    # Gradient of the linear part with respect to the coefficients; the sign
    # of each outlier residual decides the direction.
    signed_outliers = np.ones_like(outliers)
    signed_outliers_mask = linear_loss[outliers_mask] < 0
    signed_outliers[signed_outliers_mask] = -1.0
    X_outliers = axis0_safe_slice(X, outliers_mask, num_outliers)
    sw_outliers = sample_weight[outliers_mask] * signed_outliers
    grad[:n_features] -= 2.0 * epsilon * (safe_sparse_dot(sw_outliers, X_outliers))

    # Gradient of the L2 penalty alpha * ||w||^2.
    grad[:n_features] += alpha * 2.0 * w

    # Gradient with respect to the scale sigma.
    grad[-1] = n_samples
    grad[-1] -= n_sw_outliers * epsilon**2
    grad[-1] -= squared_loss / sigma

    # Gradient with respect to the intercept.
    if fit_intercept:
        grad[-2] = -2.0 * np.sum(weighted_non_outliers) / sigma
        grad[-2] -= 2.0 * epsilon * np.sum(sw_outliers)

    loss = n_samples * sigma + squared_loss + outlier_loss
    loss += alpha * np.dot(w, w)
    return loss, grad
|
|
|
|
class HuberRegressor(LinearModel, RegressorMixin, BaseEstimator):
    """L2-regularized linear regression model that is robust to outliers.

    The Huber Regressor optimizes the squared loss for the samples where
    ``|(y - Xw - c) / sigma| < epsilon`` and the absolute loss for the samples
    where ``|(y - Xw - c) / sigma| > epsilon``, where the model coefficients
    ``w``, the intercept ``c`` and the scale ``sigma`` are parameters
    to be optimized. The parameter `sigma` makes sure that if `y` is scaled up
    or down by a certain factor, one does not need to rescale `epsilon` to
    achieve the same robustness. Note that this does not take into account
    the fact that the different features of `X` may be of different scales.

    The Huber loss function has the advantage of not being heavily influenced
    by the outliers while not completely ignoring their effect.

    Read more in the :ref:`User Guide <huber_regression>`

    .. versionadded:: 0.18

    Parameters
    ----------
    epsilon : float, default=1.35
        The parameter epsilon controls the number of samples that should be
        classified as outliers. The smaller the epsilon, the more robust it is
        to outliers. Epsilon must be in the range `[1, inf)`.

    max_iter : int, default=100
        Maximum number of iterations that
        ``scipy.optimize.minimize(method="L-BFGS-B")`` should run for.

    alpha : float, default=0.0001
        Strength of the squared L2 regularization. Note that the penalty is
        equal to ``alpha * ||w||^2``.
        Must be in the range `[0, inf)`.

    warm_start : bool, default=False
        This is useful if the stored attributes of a previously used model
        has to be reused. If set to False, then the coefficients will
        be rewritten for every call to fit.
        See :term:`the Glossary <warm_start>`.

    fit_intercept : bool, default=True
        Whether or not to fit the intercept. This can be set to False
        if the data is already centered around the origin.

    tol : float, default=1e-05
        The iteration will stop when
        ``max{|proj g_i | i = 1, ..., n}`` <= ``tol``
        where pg_i is the i-th component of the projected gradient.

    Attributes
    ----------
    coef_ : array, shape (n_features,)
        Features got by optimizing the L2-regularized Huber loss.

    intercept_ : float
        Bias.

    scale_ : float
        The value by which ``|y - Xw - c|`` is scaled down.

    n_features_in_ : int
        Number of features seen during :term:`fit`.

        .. versionadded:: 0.24

    feature_names_in_ : ndarray of shape (`n_features_in_`,)
        Names of features seen during :term:`fit`. Defined only when `X`
        has feature names that are all strings.

        .. versionadded:: 1.0

    n_iter_ : int
        Number of iterations that
        ``scipy.optimize.minimize(method="L-BFGS-B")`` has run for.

        .. versionchanged:: 0.20

            In SciPy <= 1.0.0 the number of lbfgs iterations may exceed
            ``max_iter``. ``n_iter_`` will now report at most ``max_iter``.

    outliers_ : array, shape (n_samples,)
        A boolean mask which is set to True where the samples are identified
        as outliers.

    See Also
    --------
    RANSACRegressor : RANSAC (RANdom SAmple Consensus) algorithm.
    TheilSenRegressor : Theil-Sen Estimator robust multivariate regression model.
    SGDRegressor : Fitted by minimizing a regularized empirical loss with SGD.

    References
    ----------
    .. [1] Peter J. Huber, Elvezio M. Ronchetti, Robust Statistics
           Concomitant scale estimates, p. 172
    .. [2] Art B. Owen (2006), `A robust hybrid of lasso and ridge regression.
           <https://artowen.su.domains/reports/hhu.pdf>`_

    Examples
    --------
    >>> import numpy as np
    >>> from sklearn.linear_model import HuberRegressor, LinearRegression
    >>> from sklearn.datasets import make_regression
    >>> rng = np.random.RandomState(0)
    >>> X, y, coef = make_regression(
    ...     n_samples=200, n_features=2, noise=4.0, coef=True, random_state=0)
    >>> X[:4] = rng.uniform(10, 20, (4, 2))
    >>> y[:4] = rng.uniform(10, 20, 4)
    >>> huber = HuberRegressor().fit(X, y)
    >>> huber.score(X, y)
    -7.284...
    >>> huber.predict(X[:1,])
    array([806.7200...])
    >>> linear = LinearRegression().fit(X, y)
    >>> print("True coefficients:", coef)
    True coefficients: [20.4923...  34.1698...]
    >>> print("Huber coefficients:", huber.coef_)
    Huber coefficients: [17.7906... 31.0106...]
    >>> print("Linear Regression coefficients:", linear.coef_)
    Linear Regression coefficients: [-1.9221...  7.0226...]
    """

    # Declarative constraints checked on the constructor parameters.
    _parameter_constraints: dict = {
        "epsilon": [Interval(Real, 1.0, None, closed="left")],
        "max_iter": [Interval(Integral, 0, None, closed="left")],
        "alpha": [Interval(Real, 0, None, closed="left")],
        "warm_start": ["boolean"],
        "fit_intercept": ["boolean"],
        "tol": [Interval(Real, 0.0, None, closed="left")],
    }

    def __init__(
        self,
        *,
        epsilon=1.35,
        max_iter=100,
        alpha=0.0001,
        warm_start=False,
        fit_intercept=True,
        tol=1e-05,
    ):
        self.epsilon = epsilon
        self.max_iter = max_iter
        self.alpha = alpha
        self.warm_start = warm_start
        self.fit_intercept = fit_intercept
        self.tol = tol

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(self, X, y, sample_weight=None):
        """Fit the model according to the given training data.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training vector, where `n_samples` is the number of samples and
            `n_features` is the number of features.

        y : array-like, shape (n_samples,)
            Target vector relative to X.

        sample_weight : array-like, shape (n_samples,), default=None
            Weight given to each sample.

        Returns
        -------
        self : object
            Fitted `HuberRegressor` estimator.

        Raises
        ------
        ValueError
            If the L-BFGS-B solver reports status 2 (abnormal termination).
        """
        X, y = validate_data(
            self,
            X,
            y,
            copy=False,
            accept_sparse=["csr"],
            y_numeric=True,
            dtype=[np.float64, np.float32],
        )

        sample_weight = _check_sample_weight(sample_weight, X)

        n_features = X.shape[1]
        if self.warm_start and hasattr(self, "coef_"):
            # Resume from the previous solution. The loss function infers
            # whether an intercept is fitted from the length of the parameter
            # vector (n_features + 2 means "with intercept"), so the intercept
            # slot must only be present when ``fit_intercept`` is True.
            # Otherwise a warm-started refit with ``fit_intercept=False``
            # would silently optimize an intercept that is discarded below.
            if self.fit_intercept:
                trailing = [self.intercept_, self.scale_]
            else:
                trailing = [self.scale_]
            parameters = np.concatenate((self.coef_, trailing))
        else:
            if self.fit_intercept:
                parameters = np.zeros(n_features + 2)
            else:
                parameters = np.zeros(n_features + 1)
            # Initialize the scale parameter to a strictly positive value;
            # the loss divides by it.
            parameters[-1] = 1

        # Every parameter is unbounded except the scale (last entry), whose
        # lower bound is a small positive number so that it stays strictly
        # positive during the optimization.
        bounds = np.tile([-np.inf, np.inf], (parameters.shape[0], 1))
        bounds[-1][0] = np.finfo(np.float64).eps * 10

        opt_res = optimize.minimize(
            _huber_loss_and_gradient,
            parameters,
            method="L-BFGS-B",
            jac=True,
            args=(X, y, self.epsilon, self.alpha, sample_weight),
            options={"maxiter": self.max_iter, "gtol": self.tol, "iprint": -1},
            bounds=bounds,
        )

        parameters = opt_res.x

        if opt_res.status == 2:
            raise ValueError(
                "HuberRegressor convergence failed: l-BFGS-b solver terminated with %s"
                % opt_res.message
            )
        self.n_iter_ = _check_optimize_result("lbfgs", opt_res, self.max_iter)
        self.scale_ = parameters[-1]
        if self.fit_intercept:
            self.intercept_ = parameters[-2]
        else:
            self.intercept_ = 0.0
        self.coef_ = parameters[:n_features]

        # Flag the training samples whose absolute residual exceeds
        # scale_ * epsilon under the fitted model.
        residual = np.abs(y - safe_sparse_dot(X, self.coef_) - self.intercept_)
        self.outliers_ = residual > self.scale_ * self.epsilon
        return self

    def __sklearn_tags__(self):
        # Extend the inherited tags: this estimator accepts sparse input.
        tags = super().__sklearn_tags__()
        tags.input_tags.sparse = True
        return tags
|
|