| import numpy as np |
| import pytest |
|
|
| from sklearn.impute._base import _BaseImputer |
| from sklearn.impute._iterative import _assign_where |
| from sklearn.utils._mask import _get_mask |
| from sklearn.utils._testing import _convert_container, assert_allclose |
|
|
|
|
| @pytest.fixture |
| def data(): |
| X = np.random.randn(10, 2) |
| X[::2] = np.nan |
| return X |
|
|
|
|
| class NoFitIndicatorImputer(_BaseImputer): |
| def fit(self, X, y=None): |
| return self |
|
|
| def transform(self, X, y=None): |
| return self._concatenate_indicator(X, self._transform_indicator(X)) |
|
|
|
|
| class NoTransformIndicatorImputer(_BaseImputer): |
| def fit(self, X, y=None): |
| mask = _get_mask(X, value_to_mask=np.nan) |
| super()._fit_indicator(mask) |
| return self |
|
|
| def transform(self, X, y=None): |
| return self._concatenate_indicator(X, None) |
|
|
|
|
| class NoPrecomputedMaskFit(_BaseImputer): |
| def fit(self, X, y=None): |
| self._fit_indicator(X) |
| return self |
|
|
| def transform(self, X): |
| return self._concatenate_indicator(X, self._transform_indicator(X)) |
|
|
|
|
| class NoPrecomputedMaskTransform(_BaseImputer): |
| def fit(self, X, y=None): |
| mask = _get_mask(X, value_to_mask=np.nan) |
| self._fit_indicator(mask) |
| return self |
|
|
| def transform(self, X): |
| return self._concatenate_indicator(X, self._transform_indicator(X)) |
|
|
|
|
| def test_base_imputer_not_fit(data): |
| imputer = NoFitIndicatorImputer(add_indicator=True) |
| err_msg = "Make sure to call _fit_indicator before _transform_indicator" |
| with pytest.raises(ValueError, match=err_msg): |
| imputer.fit(data).transform(data) |
| with pytest.raises(ValueError, match=err_msg): |
| imputer.fit_transform(data) |
|
|
|
|
| def test_base_imputer_not_transform(data): |
| imputer = NoTransformIndicatorImputer(add_indicator=True) |
| err_msg = ( |
| "Call _fit_indicator and _transform_indicator in the imputer implementation" |
| ) |
| with pytest.raises(ValueError, match=err_msg): |
| imputer.fit(data).transform(data) |
| with pytest.raises(ValueError, match=err_msg): |
| imputer.fit_transform(data) |
|
|
|
|
| def test_base_no_precomputed_mask_fit(data): |
| imputer = NoPrecomputedMaskFit(add_indicator=True) |
| err_msg = "precomputed is True but the input data is not a mask" |
| with pytest.raises(ValueError, match=err_msg): |
| imputer.fit(data) |
| with pytest.raises(ValueError, match=err_msg): |
| imputer.fit_transform(data) |
|
|
|
|
| def test_base_no_precomputed_mask_transform(data): |
| imputer = NoPrecomputedMaskTransform(add_indicator=True) |
| err_msg = "precomputed is True but the input data is not a mask" |
| imputer.fit(data) |
| with pytest.raises(ValueError, match=err_msg): |
| imputer.transform(data) |
| with pytest.raises(ValueError, match=err_msg): |
| imputer.fit_transform(data) |
|
|
|
|
| @pytest.mark.parametrize("X1_type", ["array", "dataframe"]) |
| def test_assign_where(X1_type): |
| """Check the behaviour of the private helpers `_assign_where`.""" |
| rng = np.random.RandomState(0) |
|
|
| n_samples, n_features = 10, 5 |
| X1 = _convert_container(rng.randn(n_samples, n_features), constructor_name=X1_type) |
| X2 = rng.randn(n_samples, n_features) |
| mask = rng.randint(0, 2, size=(n_samples, n_features)).astype(bool) |
|
|
| _assign_where(X1, X2, mask) |
|
|
| if X1_type == "dataframe": |
| X1 = X1.to_numpy() |
| assert_allclose(X1[mask], X2[mask]) |
|
|