| """Dictionary learning.""" |
|
|
| |
| |
|
|
| import itertools |
| import sys |
| import time |
| from numbers import Integral, Real |
|
|
| import numpy as np |
| from joblib import effective_n_jobs |
| from scipy import linalg |
|
|
| from ..base import ( |
| BaseEstimator, |
| ClassNamePrefixFeaturesOutMixin, |
| TransformerMixin, |
| _fit_context, |
| ) |
| from ..linear_model import Lars, Lasso, LassoLars, orthogonal_mp_gram |
| from ..utils import check_array, check_random_state, gen_batches, gen_even_slices |
| from ..utils._param_validation import Interval, StrOptions, validate_params |
| from ..utils.extmath import randomized_svd, row_norms, svd_flip |
| from ..utils.parallel import Parallel, delayed |
| from ..utils.validation import check_is_fitted, validate_data |
|
|
|
|
| def _check_positive_coding(method, positive): |
| if positive and method in ["omp", "lars"]: |
| raise ValueError( |
| "Positive constraint not supported for '{}' coding method.".format(method) |
| ) |
|
|
|
|
| def _sparse_encode_precomputed( |
| X, |
| dictionary, |
| *, |
| gram=None, |
| cov=None, |
| algorithm="lasso_lars", |
| regularization=None, |
| copy_cov=True, |
| init=None, |
| max_iter=1000, |
| verbose=0, |
| positive=False, |
| ): |
| """Generic sparse coding with precomputed Gram and/or covariance matrices. |
| |
| Each row of the result is the solution to a Lasso problem. |
| |
| Parameters |
| ---------- |
| X : ndarray of shape (n_samples, n_features) |
| Data matrix. |
| |
| dictionary : ndarray of shape (n_components, n_features) |
| The dictionary matrix against which to solve the sparse coding of |
| the data. Some of the algorithms assume normalized rows. |
| |
| gram : ndarray of shape (n_components, n_components), default=None |
| Precomputed Gram matrix, `dictionary * dictionary'`. |
| `gram` can be `None` if `algorithm='threshold'`. |
| |
| cov : ndarray of shape (n_components, n_samples), default=None |
| Precomputed covariance, `dictionary * X'`. |
| |
| algorithm : {'lasso_lars', 'lasso_cd', 'lars', 'omp', 'threshold'}, \ |
| default='lasso_lars' |
| The algorithm used: |
| |
| * `'lars'`: uses the least angle regression method |
| (`linear_model.lars_path`); |
| * `'lasso_lars'`: uses Lars to compute the Lasso solution; |
| * `'lasso_cd'`: uses the coordinate descent method to compute the |
| Lasso solution (`linear_model.Lasso`). lasso_lars will be faster if |
| the estimated components are sparse; |
| * `'omp'`: uses orthogonal matching pursuit to estimate the sparse |
| solution; |
| * `'threshold'`: squashes to zero all coefficients less than |
| regularization from the projection `dictionary * data'`. |
| |
| regularization : int or float, default=None |
| The regularization parameter. It corresponds to alpha when |
| algorithm is `'lasso_lars'`, `'lasso_cd'` or `'threshold'`. |
| Otherwise it corresponds to `n_nonzero_coefs`. |
| |
| init : ndarray of shape (n_samples, n_components), default=None |
| Initialization value of the sparse code. Only used if |
| `algorithm='lasso_cd'`. |
| |
| max_iter : int, default=1000 |
| Maximum number of iterations to perform if `algorithm='lasso_cd'` or |
| `'lasso_lars'`. |
| |
| copy_cov : bool, default=True |
| Whether to copy the precomputed covariance matrix; if `False`, it may |
| be overwritten. |
| |
| verbose : int, default=0 |
| Controls the verbosity; the higher, the more messages. |
| |
| positive : bool, default=False |
| Whether to enforce a positivity constraint on the sparse code. |
| |
| .. versionadded:: 0.20 |
| |
| Returns |
| ------- |
| code : ndarray of shape (n_samples, n_components) |
| The sparse codes. |
| """ |
| n_samples, n_features = X.shape |
| n_components = dictionary.shape[0] |
|
|
| if algorithm == "lasso_lars": |
| alpha = float(regularization) / n_features |
| try: |
| err_mgt = np.seterr(all="ignore") |
|
|
| |
| |
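| # Not passing in verbose=max(0, verbose-1) because Lars.fit already |
| # corrects the verbosity level. |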
| lasso_lars = LassoLars( |
| alpha=alpha, |
| fit_intercept=False, |
| verbose=verbose, |
| precompute=gram, |
| fit_path=False, |
| positive=positive, |
| max_iter=max_iter, |
| ) |
| lasso_lars.fit(dictionary.T, X.T, Xy=cov) |
| new_code = lasso_lars.coef_ |
| finally: |
| np.seterr(**err_mgt) |
|
|
| elif algorithm == "lasso_cd": |
| alpha = float(regularization) / n_features |
|
|
| |
| |
| |
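| # TODO: Make verbosity argument for Lasso? |
| # sklearn.linear_model.coordinate_descent.enet_path has a verbosity |
| # argument that we could pass in from Lasso. |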
| clf = Lasso( |
| alpha=alpha, |
| fit_intercept=False, |
| precompute=gram, |
| max_iter=max_iter, |
| warm_start=True, |
| positive=positive, |
| ) |
|
|
| if init is not None: |
| |
| |
| |
| |
| |
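| # In some workflows using coordinate descent algorithms: |
| # - users might provide NumPy arrays with read-only buffers |
| # - `joblib` might memmap arrays, making their buffers read-only |
| # The in-place coefficient update needs a writeable buffer, so copy |
| # `init` in that case. |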
| if not init.flags["WRITEABLE"]: |
| init = np.array(init) |
| clf.coef_ = init |
|
|
| clf.fit(dictionary.T, X.T, check_input=False) |
| new_code = clf.coef_ |
|
|
| elif algorithm == "lars": |
| try: |
| err_mgt = np.seterr(all="ignore") |
|
|
| |
| |
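| # Not passing in verbose=max(0, verbose-1) because Lars.fit already |
| # corrects the verbosity level. |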
| lars = Lars( |
| fit_intercept=False, |
| verbose=verbose, |
| precompute=gram, |
| n_nonzero_coefs=int(regularization), |
| fit_path=False, |
| ) |
| lars.fit(dictionary.T, X.T, Xy=cov) |
| new_code = lars.coef_ |
| finally: |
| np.seterr(**err_mgt) |
|
|
| elif algorithm == "threshold": |
| new_code = (np.sign(cov) * np.maximum(np.abs(cov) - regularization, 0)).T |
| if positive: |
| np.clip(new_code, 0, None, out=new_code) |
|
|
| elif algorithm == "omp": |
| new_code = orthogonal_mp_gram( |
| Gram=gram, |
| Xy=cov, |
| n_nonzero_coefs=int(regularization), |
| tol=None, |
| norms_squared=row_norms(X, squared=True), |
| copy_Xy=copy_cov, |
| ).T |
|
|
| return new_code.reshape(n_samples, n_components) |
|
|
|
|
| @validate_params( |
| { |
| "X": ["array-like"], |
| "dictionary": ["array-like"], |
| "gram": ["array-like", None], |
| "cov": ["array-like", None], |
| "algorithm": [ |
| StrOptions({"lasso_lars", "lasso_cd", "lars", "omp", "threshold"}) |
| ], |
| "n_nonzero_coefs": [Interval(Integral, 1, None, closed="left"), None], |
| "alpha": [Interval(Real, 0, None, closed="left"), None], |
| "copy_cov": ["boolean"], |
| "init": ["array-like", None], |
| "max_iter": [Interval(Integral, 0, None, closed="left")], |
| "n_jobs": [Integral, None], |
| "check_input": ["boolean"], |
| "verbose": ["verbose"], |
| "positive": ["boolean"], |
| }, |
| prefer_skip_nested_validation=True, |
| ) |
| def sparse_encode( |
| X, |
| dictionary, |
| *, |
| gram=None, |
| cov=None, |
| algorithm="lasso_lars", |
| n_nonzero_coefs=None, |
| alpha=None, |
| copy_cov=True, |
| init=None, |
| max_iter=1000, |
| n_jobs=None, |
| check_input=True, |
| verbose=0, |
| positive=False, |
| ): |
| """Sparse coding. |
| |
| Each row of the result is the solution to a sparse coding problem. |
| The goal is to find a sparse array `code` such that:: |
| |
| X ~= code * dictionary |
| |
| Read more in the :ref:`User Guide <SparseCoder>`. |
| |
| Parameters |
| ---------- |
| X : array-like of shape (n_samples, n_features) |
| Data matrix. |
| |
| dictionary : array-like of shape (n_components, n_features) |
| The dictionary matrix against which to solve the sparse coding of |
| the data. Some of the algorithms assume normalized rows for meaningful |
| output. |
| |
| gram : array-like of shape (n_components, n_components), default=None |
| Precomputed Gram matrix, `dictionary * dictionary'`. |
| |
| cov : array-like of shape (n_components, n_samples), default=None |
| Precomputed covariance, `dictionary * X'`. |
| |
| algorithm : {'lasso_lars', 'lasso_cd', 'lars', 'omp', 'threshold'}, \ |
| default='lasso_lars' |
| The algorithm used: |
| |
| * `'lars'`: uses the least angle regression method |
| (`linear_model.lars_path`); |
| * `'lasso_lars'`: uses Lars to compute the Lasso solution; |
| * `'lasso_cd'`: uses the coordinate descent method to compute the |
| Lasso solution (`linear_model.Lasso`). lasso_lars will be faster if |
| the estimated components are sparse; |
| * `'omp'`: uses orthogonal matching pursuit to estimate the sparse |
| solution; |
| * `'threshold'`: squashes to zero all coefficients less than |
| regularization from the projection `dictionary * data'`. |
| |
| n_nonzero_coefs : int, default=None |
| Number of nonzero coefficients to target in each column of the |
| solution. This is only used by `algorithm='lars'` and `algorithm='omp'` |
| and is overridden by `alpha` in the `omp` case. If `None`, then |
| `n_nonzero_coefs=int(n_features / 10)`, clipped to the range |
| `[1, n_components]`. |
| |
| alpha : float, default=None |
| If `algorithm='lasso_lars'` or `algorithm='lasso_cd'`, `alpha` is the |
| penalty applied to the L1 norm. |
| If `algorithm='threshold'`, `alpha` is the absolute value of the |
| threshold below which coefficients will be squashed to zero. |
| If `algorithm='omp'`, `alpha` is the tolerance parameter: the value of |
| the reconstruction error targeted. In this case, it overrides |
| `n_nonzero_coefs`. |
| If `None`, default to 1. |
| |
| copy_cov : bool, default=True |
| Whether to copy the precomputed covariance matrix; if `False`, it may |
| be overwritten. |
| |
| init : ndarray of shape (n_samples, n_components), default=None |
| Initialization value of the sparse codes. Only used if |
| `algorithm='lasso_cd'`. |
| |
| max_iter : int, default=1000 |
| Maximum number of iterations to perform if `algorithm='lasso_cd'` or |
| `'lasso_lars'`. |
| |
| n_jobs : int, default=None |
| Number of parallel jobs to run. |
| ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. |
| ``-1`` means using all processors. See :term:`Glossary <n_jobs>` |
| for more details. |
| |
| check_input : bool, default=True |
| If `False`, the input arrays X and dictionary will not be checked. |
| |
| verbose : int, default=0 |
| Controls the verbosity; the higher, the more messages. |
| |
| positive : bool, default=False |
| Whether to enforce positivity when finding the encoding. |
| |
| .. versionadded:: 0.20 |
| |
| Returns |
| ------- |
| code : ndarray of shape (n_samples, n_components) |
| The sparse codes. |
| |
| See Also |
| -------- |
| sklearn.linear_model.lars_path : Compute Least Angle Regression or Lasso |
| path using LARS algorithm. |
| sklearn.linear_model.orthogonal_mp : Solves Orthogonal Matching Pursuit problems. |
| sklearn.linear_model.Lasso : Train Linear Model with L1 prior as regularizer. |
| SparseCoder : Find a sparse representation of data from a fixed precomputed |
| dictionary. |
| |
| Examples |
| -------- |
| >>> import numpy as np |
| >>> from sklearn.decomposition import sparse_encode |
| >>> X = np.array([[-1, -1, -1], [0, 0, 3]]) |
| >>> dictionary = np.array( |
| ... [[0, 1, 0], |
| ... [-1, -1, 2], |
| ... [1, 1, 1], |
| ... [0, 1, 1], |
| ... [0, 2, 1]], |
| ... dtype=np.float64 |
| ... ) |
| >>> sparse_encode(X, dictionary, alpha=1e-10) |
| array([[ 0., 0., -1., 0., 0.], |
| [ 0., 1., 1., 0., 0.]]) |
| """ |
| if check_input: |
| if algorithm == "lasso_cd": |
| dictionary = check_array( |
| dictionary, order="C", dtype=[np.float64, np.float32] |
| ) |
| X = check_array(X, order="C", dtype=[np.float64, np.float32]) |
| else: |
| dictionary = check_array(dictionary) |
| X = check_array(X) |
|
|
| if dictionary.shape[1] != X.shape[1]: |
| raise ValueError( |
| "Dictionary and X have different numbers of features:" |
| "dictionary.shape: {} X.shape{}".format(dictionary.shape, X.shape) |
| ) |
|
|
| _check_positive_coding(algorithm, positive) |
|
|
| return _sparse_encode( |
| X, |
| dictionary, |
| gram=gram, |
| cov=cov, |
| algorithm=algorithm, |
| n_nonzero_coefs=n_nonzero_coefs, |
| alpha=alpha, |
| copy_cov=copy_cov, |
| init=init, |
| max_iter=max_iter, |
| n_jobs=n_jobs, |
| verbose=verbose, |
| positive=positive, |
| ) |
|
|
|
|
| def _sparse_encode( |
| X, |
| dictionary, |
| *, |
| gram=None, |
| cov=None, |
| algorithm="lasso_lars", |
| n_nonzero_coefs=None, |
| alpha=None, |
| copy_cov=True, |
| init=None, |
| max_iter=1000, |
| n_jobs=None, |
| verbose=0, |
| positive=False, |
| ): |
| """Sparse coding without input/parameter validation.""" |
|
|
| n_samples, n_features = X.shape |
| n_components = dictionary.shape[0] |
|
|
| if algorithm in ("lars", "omp"): |
| regularization = n_nonzero_coefs |
| if regularization is None: |
| regularization = min(max(n_features / 10, 1), n_components) |
| else: |
| regularization = alpha |
| if regularization is None: |
| regularization = 1.0 |
|
|
| if gram is None and algorithm != "threshold": |
| gram = np.dot(dictionary, dictionary.T) |
|
|
| if cov is None and algorithm != "lasso_cd": |
| copy_cov = False |
| cov = np.dot(dictionary, X.T) |
|
|
| if effective_n_jobs(n_jobs) == 1 or algorithm == "threshold": |
| code = _sparse_encode_precomputed( |
| X, |
| dictionary, |
| gram=gram, |
| cov=cov, |
| algorithm=algorithm, |
| regularization=regularization, |
| copy_cov=copy_cov, |
| init=init, |
| max_iter=max_iter, |
| verbose=verbose, |
| positive=positive, |
| ) |
| return code |
|
|
| |
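| # Enter the parallel code block: split the samples into even slices, |
| # encode each slice in a separate job, then stitch the codes back. |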
| n_samples = X.shape[0] |
| n_components = dictionary.shape[0] |
| code = np.empty((n_samples, n_components)) |
| slices = list(gen_even_slices(n_samples, effective_n_jobs(n_jobs))) |
|
|
| code_views = Parallel(n_jobs=n_jobs, verbose=verbose)( |
| delayed(_sparse_encode_precomputed)( |
| X[this_slice], |
| dictionary, |
| gram=gram, |
| cov=cov[:, this_slice] if cov is not None else None, |
| algorithm=algorithm, |
| regularization=regularization, |
| copy_cov=copy_cov, |
| init=init[this_slice] if init is not None else None, |
| max_iter=max_iter, |
| verbose=verbose, |
| positive=positive, |
| ) |
| for this_slice in slices |
| ) |
| for this_slice, this_view in zip(slices, code_views): |
| code[this_slice] = this_view |
| return code |
|
|
|
|
| def _update_dict( |
| dictionary, |
| Y, |
| code, |
| A=None, |
| B=None, |
| verbose=False, |
| random_state=None, |
| positive=False, |
| ): |
| """Update the dense dictionary factor in place. |
| |
| Parameters |
| ---------- |
| dictionary : ndarray of shape (n_components, n_features) |
| Value of the dictionary at the previous iteration. |
| |
| Y : ndarray of shape (n_samples, n_features) |
| Data matrix. |
| |
| code : ndarray of shape (n_samples, n_components) |
| Sparse coding of the data against which to optimize the dictionary. |
| |
| A : ndarray of shape (n_components, n_components), default=None |
| Together with `B`, sufficient stats of the online model to update the |
| dictionary. |
| |
| B : ndarray of shape (n_features, n_components), default=None |
| Together with `A`, sufficient stats of the online model to update the |
| dictionary. |
| |
| verbose : bool, default=False |
| Degree of output the procedure will print. |
| |
| random_state : int, RandomState instance or None, default=None |
| Used for randomly initializing the dictionary. Pass an int for |
| reproducible results across multiple function calls. |
| See :term:`Glossary <random_state>`. |
| |
| positive : bool, default=False |
| Whether to enforce positivity when finding the dictionary. |
| |
| .. versionadded:: 0.20 |
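|
| Examples |
| -------- |
| A minimal sketch of a single in-place update on random data; the |
| values are illustrative and only the shapes matter: |
|
| >>> import numpy as np |
| >>> rng = np.random.RandomState(0) |
| >>> Y = rng.randn(6, 4) |
| >>> code = rng.randn(6, 3) |
| >>> dictionary = rng.randn(3, 4) |
| >>> _update_dict(dictionary, Y, code, random_state=0) |
| >>> # each atom is projected onto the constraint set ||V_k||_2 <= 1 |
| >>> bool(np.all(np.linalg.norm(dictionary, axis=1) <= 1 + 1e-12)) |
| True |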
| """ |
| n_samples, n_components = code.shape |
| random_state = check_random_state(random_state) |
|
|
| if A is None: |
| A = code.T @ code |
| if B is None: |
| B = Y.T @ code |
|
|
| n_unused = 0 |
|
|
| for k in range(n_components): |
| if A[k, k] > 1e-6: |
| |
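| # 1e-6 is arbitrary but consistent with the spams implementation |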
| dictionary[k] += (B[:, k] - A[k] @ dictionary) / A[k, k] |
| else: |
| |
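| # kth atom is almost never used -> sample a new one from the data |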
| newd = Y[random_state.choice(n_samples)] |
|
|
| |
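| # add small noise to avoid making the sparse coding ill conditioned; |
| # `newd.std() or 1` guards against a zero standard deviation |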
| noise_level = 0.01 * (newd.std() or 1) |
| noise = random_state.normal(0, noise_level, size=len(newd)) |
|
|
| dictionary[k] = newd + noise |
| code[:, k] = 0 |
| n_unused += 1 |
|
|
| if positive: |
| np.clip(dictionary[k], 0, None, out=dictionary[k]) |
|
|
| |
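| # Projection on the constraint set ||V_k|| <= 1 |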
| dictionary[k] /= max(linalg.norm(dictionary[k]), 1) |
|
|
| if verbose and n_unused > 0: |
| print(f"{n_unused} unused atoms resampled.") |
|
|
|
|
| def _dict_learning( |
| X, |
| n_components, |
| *, |
| alpha, |
| max_iter, |
| tol, |
| method, |
| n_jobs, |
| dict_init, |
| code_init, |
| callback, |
| verbose, |
| random_state, |
| return_n_iter, |
| positive_dict, |
| positive_code, |
| method_max_iter, |
| ): |
| """Main dictionary learning algorithm""" |
| t0 = time.time() |
| |
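| # Init the code and the dictionary with an SVD of X |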
| if code_init is not None and dict_init is not None: |
| code = np.array(code_init, order="F") |
| |
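| # Don't copy dict_init here; np.asfortranarray below copies if needed |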
| dictionary = dict_init |
| else: |
| code, S, dictionary = linalg.svd(X, full_matrices=False) |
| |
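| # flip the initial code's sign to enforce deterministic output |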
| code, dictionary = svd_flip(code, dictionary) |
| dictionary = S[:, np.newaxis] * dictionary |
| r = len(dictionary) |
| if n_components <= r: |
| code = code[:, :n_components] |
| dictionary = dictionary[:n_components, :] |
| else: |
| code = np.c_[code, np.zeros((len(code), n_components - r))] |
| dictionary = np.r_[ |
| dictionary, np.zeros((n_components - r, dictionary.shape[1])) |
| ] |
|
|
| |
| |
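| # Fortran-order dict better suited for the sparse coding which is the |
| # bottleneck of this algorithm. |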
| dictionary = np.asfortranarray(dictionary) |
|
|
| errors = [] |
| current_cost = np.nan |
|
|
| if verbose == 1: |
| print("[dict_learning]", end=" ") |
|
|
| |
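| # If max_iter is 0, the number of iterations returned will be zero. |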
| ii = -1 |
|
|
| for ii in range(max_iter): |
| dt = time.time() - t0 |
| if verbose == 1: |
| sys.stdout.write(".") |
| sys.stdout.flush() |
| elif verbose: |
| print( |
| "Iteration % 3i (elapsed time: % 3is, % 4.1fmn, current cost % 7.3f)" |
| % (ii, dt, dt / 60, current_cost) |
| ) |
|
|
| |
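| # Update code |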
| code = sparse_encode( |
| X, |
| dictionary, |
| algorithm=method, |
| alpha=alpha, |
| init=code, |
| n_jobs=n_jobs, |
| positive=positive_code, |
| max_iter=method_max_iter, |
| verbose=verbose, |
| ) |
|
|
| |
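| # Update dictionary in place |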
| _update_dict( |
| dictionary, |
| X, |
| code, |
| verbose=verbose, |
| random_state=random_state, |
| positive=positive_dict, |
| ) |
|
|
| |
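| # Update the cost function |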
| current_cost = 0.5 * np.sum((X - code @ dictionary) ** 2) + alpha * np.sum( |
| np.abs(code) |
| ) |
| errors.append(current_cost) |
|
|
| if ii > 0: |
| dE = errors[-2] - errors[-1] |
| |
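| # converged when the cost decrease falls below tol, relative to the |
| # current cost |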
| if dE < tol * errors[-1]: |
| if verbose == 1: |
| |
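| # A line return |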
| print("") |
| elif verbose: |
| print("--- Convergence reached after %d iterations" % ii) |
| break |
| if ii % 5 == 0 and callback is not None: |
| callback(locals()) |
|
|
| if return_n_iter: |
| return code, dictionary, errors, ii + 1 |
| else: |
| return code, dictionary, errors |
|
|
|
|
| @validate_params( |
| { |
| "X": ["array-like"], |
| "return_code": ["boolean"], |
| "method": [StrOptions({"cd", "lars"})], |
| "method_max_iter": [Interval(Integral, 0, None, closed="left")], |
| }, |
| prefer_skip_nested_validation=False, |
| ) |
| def dict_learning_online( |
| X, |
| n_components=2, |
| *, |
| alpha=1, |
| max_iter=100, |
| return_code=True, |
| dict_init=None, |
| callback=None, |
| batch_size=256, |
| verbose=False, |
| shuffle=True, |
| n_jobs=None, |
| method="lars", |
| random_state=None, |
| positive_dict=False, |
| positive_code=False, |
| method_max_iter=1000, |
| tol=1e-3, |
| max_no_improvement=10, |
| ): |
| """Solve a dictionary learning matrix factorization problem online. |
| |
| Finds the best dictionary and the corresponding sparse code for |
| approximating the data matrix X by solving:: |
| |
| (U^*, V^*) = argmin 0.5 || X - U V ||_Fro^2 + alpha * || U ||_1,1 |
| (U,V) |
| with || V_k ||_2 <= 1 for all 0 <= k < n_components |
| |
| where V is the dictionary and U is the sparse code. ||.||_Fro stands for |
| the Frobenius norm and ||.||_1,1 stands for the entry-wise matrix norm |
| which is the sum of the absolute values of all the entries in the matrix. |
| This is accomplished by repeatedly iterating over mini-batches sliced |
| from the input data. |
| |
| Read more in the :ref:`User Guide <DictionaryLearning>`. |
| |
| Parameters |
| ---------- |
| X : array-like of shape (n_samples, n_features) |
| Data matrix. |
| |
| n_components : int or None, default=2 |
| Number of dictionary atoms to extract. If None, then ``n_components`` |
| is set to ``n_features``. |
| |
| alpha : float, default=1 |
| Sparsity controlling parameter. |
| |
| max_iter : int, default=100 |
| Maximum number of iterations over the complete dataset before |
| stopping independently of any early stopping criterion heuristics. |
| |
| .. versionadded:: 1.1 |
| |
| return_code : bool, default=True |
| Whether to also return the code `U` or just the dictionary `V`. |
| |
| dict_init : ndarray of shape (n_components, n_features), default=None |
| Initial values for the dictionary for warm restart scenarios. |
| If `None`, the initial values for the dictionary are created |
| with an SVD decomposition of the data via |
| :func:`~sklearn.utils.extmath.randomized_svd`. |
| |
| callback : callable, default=None |
| A callable that gets invoked at the end of each iteration. |
| |
| batch_size : int, default=256 |
| The number of samples to take in each batch. |
| |
| .. versionchanged:: 1.3 |
| The default value of `batch_size` changed from 3 to 256 in version 1.3. |
| |
| verbose : bool, default=False |
| To control the verbosity of the procedure. |
| |
| shuffle : bool, default=True |
| Whether to shuffle the data before splitting it in batches. |
| |
| n_jobs : int, default=None |
| Number of parallel jobs to run. |
| ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. |
| ``-1`` means using all processors. See :term:`Glossary <n_jobs>` |
| for more details. |
| |
| method : {'lars', 'cd'}, default='lars' |
| * `'lars'`: uses the least angle regression method to solve the lasso |
| problem (`linear_model.lars_path`); |
| * `'cd'`: uses the coordinate descent method to compute the |
| Lasso solution (`linear_model.Lasso`). Lars will be faster if |
| the estimated components are sparse. |
| |
| random_state : int, RandomState instance or None, default=None |
| Used for initializing the dictionary when ``dict_init`` is not |
| specified, randomly shuffling the data when ``shuffle`` is set to |
| ``True``, and updating the dictionary. Pass an int for reproducible |
| results across multiple function calls. |
| See :term:`Glossary <random_state>`. |
| |
| positive_dict : bool, default=False |
| Whether to enforce positivity when finding the dictionary. |
| |
| .. versionadded:: 0.20 |
| |
| positive_code : bool, default=False |
| Whether to enforce positivity when finding the code. |
| |
| .. versionadded:: 0.20 |
| |
| method_max_iter : int, default=1000 |
| Maximum number of iterations to perform when solving the lasso problem. |
| |
| .. versionadded:: 0.22 |
| |
| tol : float, default=1e-3 |
| Control early stopping based on the norm of the differences in the |
| dictionary between 2 steps. |
| |
| To disable early stopping based on changes in the dictionary, set |
| `tol` to 0.0. |
| |
| .. versionadded:: 1.1 |
| |
| max_no_improvement : int, default=10 |
| Control early stopping based on the number of consecutive mini-batches |
| that do not yield an improvement on the smoothed cost function. |
| |
| To disable convergence detection based on cost function, set |
| `max_no_improvement` to None. |
| |
| .. versionadded:: 1.1 |
| |
| Returns |
| ------- |
| code : ndarray of shape (n_samples, n_components) |
| The sparse code (only returned if `return_code=True`). |
|
| dictionary : ndarray of shape (n_components, n_features) |
| The solutions to the dictionary learning problem. |
| |
| See Also |
| -------- |
| dict_learning : Solve a dictionary learning matrix factorization problem. |
| DictionaryLearning : Find a dictionary that sparsely encodes data. |
| MiniBatchDictionaryLearning : A faster, less accurate, version of the dictionary |
| learning algorithm. |
| SparsePCA : Sparse Principal Components Analysis. |
| MiniBatchSparsePCA : Mini-batch Sparse Principal Components Analysis. |
| |
| Examples |
| -------- |
| >>> import numpy as np |
| >>> from sklearn.datasets import make_sparse_coded_signal |
| >>> from sklearn.decomposition import dict_learning_online |
| >>> X, _, _ = make_sparse_coded_signal( |
| ... n_samples=30, n_components=15, n_features=20, n_nonzero_coefs=10, |
| ... random_state=42, |
| ... ) |
| >>> U, V = dict_learning_online( |
| ... X, n_components=15, alpha=0.2, max_iter=20, batch_size=3, random_state=42 |
| ... ) |
| |
| We can check the level of sparsity of `U`: |
| |
| >>> np.mean(U == 0) |
| np.float64(0.53...) |
| |
| We can compare the average squared euclidean norm of the reconstruction |
| error of the sparse coded signal relative to the squared euclidean norm of |
| the original signal: |
| |
| >>> X_hat = U @ V |
| >>> np.mean(np.sum((X_hat - X) ** 2, axis=1) / np.sum(X ** 2, axis=1)) |
| np.float64(0.05...) |
| """ |
| transform_algorithm = "lasso_" + method |
|
|
| est = MiniBatchDictionaryLearning( |
| n_components=n_components, |
| alpha=alpha, |
| max_iter=max_iter, |
| n_jobs=n_jobs, |
| fit_algorithm=method, |
| batch_size=batch_size, |
| shuffle=shuffle, |
| dict_init=dict_init, |
| random_state=random_state, |
| transform_algorithm=transform_algorithm, |
| transform_alpha=alpha, |
| positive_code=positive_code, |
| positive_dict=positive_dict, |
| transform_max_iter=method_max_iter, |
| verbose=verbose, |
| callback=callback, |
| tol=tol, |
| max_no_improvement=max_no_improvement, |
| ).fit(X) |
|
|
| if not return_code: |
| return est.components_ |
| else: |
| code = est.transform(X) |
| return code, est.components_ |
|
|
|
|
| @validate_params( |
| { |
| "X": ["array-like"], |
| "method": [StrOptions({"lars", "cd"})], |
| "return_n_iter": ["boolean"], |
| "method_max_iter": [Interval(Integral, 0, None, closed="left")], |
| }, |
| prefer_skip_nested_validation=False, |
| ) |
| def dict_learning( |
| X, |
| n_components, |
| *, |
| alpha, |
| max_iter=100, |
| tol=1e-8, |
| method="lars", |
| n_jobs=None, |
| dict_init=None, |
| code_init=None, |
| callback=None, |
| verbose=False, |
| random_state=None, |
| return_n_iter=False, |
| positive_dict=False, |
| positive_code=False, |
| method_max_iter=1000, |
| ): |
| """Solve a dictionary learning matrix factorization problem. |
| |
| Finds the best dictionary and the corresponding sparse code for |
| approximating the data matrix X by solving:: |
| |
| (U^*, V^*) = argmin 0.5 || X - U V ||_Fro^2 + alpha * || U ||_1,1 |
| (U,V) |
| with || V_k ||_2 <= 1 for all 0 <= k < n_components |
| |
| where V is the dictionary and U is the sparse code. ||.||_Fro stands for |
| the Frobenius norm and ||.||_1,1 stands for the entry-wise matrix norm |
| which is the sum of the absolute values of all the entries in the matrix. |
| |
| Read more in the :ref:`User Guide <DictionaryLearning>`. |
| |
| Parameters |
| ---------- |
| X : array-like of shape (n_samples, n_features) |
| Data matrix. |
| |
| n_components : int |
| Number of dictionary atoms to extract. |
| |
| alpha : int or float |
| Sparsity controlling parameter. |
| |
| max_iter : int, default=100 |
| Maximum number of iterations to perform. |
| |
| tol : float, default=1e-8 |
| Tolerance for the stopping condition. |
| |
| method : {'lars', 'cd'}, default='lars' |
| The method used: |
| |
| * `'lars'`: uses the least angle regression method to solve the lasso |
| problem (`linear_model.lars_path`); |
| * `'cd'`: uses the coordinate descent method to compute the |
| Lasso solution (`linear_model.Lasso`). Lars will be faster if |
| the estimated components are sparse. |
| |
| n_jobs : int, default=None |
| Number of parallel jobs to run. |
| ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. |
| ``-1`` means using all processors. See :term:`Glossary <n_jobs>` |
| for more details. |
| |
| dict_init : ndarray of shape (n_components, n_features), default=None |
| Initial value for the dictionary for warm restart scenarios. Only used |
| if `code_init` and `dict_init` are not None. |
| |
| code_init : ndarray of shape (n_samples, n_components), default=None |
| Initial value for the sparse code for warm restart scenarios. Only used |
| if `code_init` and `dict_init` are not None. |
| |
| callback : callable, default=None |
| Callable that gets invoked every five iterations. |
| |
| verbose : bool, default=False |
| To control the verbosity of the procedure. |
| |
| random_state : int, RandomState instance or None, default=None |
| Used for randomly initializing the dictionary. Pass an int for |
| reproducible results across multiple function calls. |
| See :term:`Glossary <random_state>`. |
| |
| return_n_iter : bool, default=False |
| Whether or not to return the number of iterations. |
| |
| positive_dict : bool, default=False |
| Whether to enforce positivity when finding the dictionary. |
| |
| .. versionadded:: 0.20 |
| |
| positive_code : bool, default=False |
| Whether to enforce positivity when finding the code. |
| |
| .. versionadded:: 0.20 |
| |
| method_max_iter : int, default=1000 |
| Maximum number of iterations to perform when solving the lasso problem. |
| |
| .. versionadded:: 0.22 |
| |
| Returns |
| ------- |
| code : ndarray of shape (n_samples, n_components) |
| The sparse code factor in the matrix factorization. |
| |
| dictionary : ndarray of shape (n_components, n_features) |
| The dictionary factor in the matrix factorization. |
| |
| errors : array |
| Vector of errors at each iteration. |
| |
| n_iter : int |
| Number of iterations run. Returned only if `return_n_iter` is |
| set to True. |
| |
| See Also |
| -------- |
| dict_learning_online : Solve a dictionary learning matrix factorization |
| problem online. |
| DictionaryLearning : Find a dictionary that sparsely encodes data. |
| MiniBatchDictionaryLearning : A faster, less accurate version |
| of the dictionary learning algorithm. |
| SparsePCA : Sparse Principal Components Analysis. |
| MiniBatchSparsePCA : Mini-batch Sparse Principal Components Analysis. |
| |
| Examples |
| -------- |
| >>> import numpy as np |
| >>> from sklearn.datasets import make_sparse_coded_signal |
| >>> from sklearn.decomposition import dict_learning |
| >>> X, _, _ = make_sparse_coded_signal( |
| ... n_samples=30, n_components=15, n_features=20, n_nonzero_coefs=10, |
| ... random_state=42, |
| ... ) |
| >>> U, V, errors = dict_learning(X, n_components=15, alpha=0.1, random_state=42) |
| |
| We can check the level of sparsity of `U`: |
| |
| >>> np.mean(U == 0) |
| np.float64(0.6...) |
| |
| We can compare the average squared euclidean norm of the reconstruction |
| error of the sparse coded signal relative to the squared euclidean norm of |
| the original signal: |
| |
| >>> X_hat = U @ V |
| >>> np.mean(np.sum((X_hat - X) ** 2, axis=1) / np.sum(X ** 2, axis=1)) |
| np.float64(0.01...) |
| """ |
| estimator = DictionaryLearning( |
| n_components=n_components, |
| alpha=alpha, |
| max_iter=max_iter, |
| tol=tol, |
| fit_algorithm=method, |
| n_jobs=n_jobs, |
| dict_init=dict_init, |
| callback=callback, |
| code_init=code_init, |
| verbose=verbose, |
| random_state=random_state, |
| positive_code=positive_code, |
| positive_dict=positive_dict, |
| transform_max_iter=method_max_iter, |
| ).set_output(transform="default") |
| code = estimator.fit_transform(X) |
| if return_n_iter: |
| return ( |
| code, |
| estimator.components_, |
| estimator.error_, |
| estimator.n_iter_, |
| ) |
| return code, estimator.components_, estimator.error_ |
|
|
|
|
| class _BaseSparseCoding(ClassNamePrefixFeaturesOutMixin, TransformerMixin): |
| """Base class from SparseCoder and DictionaryLearning algorithms.""" |
|
|
| def __init__( |
| self, |
| transform_algorithm, |
| transform_n_nonzero_coefs, |
| transform_alpha, |
| split_sign, |
| n_jobs, |
| positive_code, |
| transform_max_iter, |
| ): |
| self.transform_algorithm = transform_algorithm |
| self.transform_n_nonzero_coefs = transform_n_nonzero_coefs |
| self.transform_alpha = transform_alpha |
| self.transform_max_iter = transform_max_iter |
| self.split_sign = split_sign |
| self.n_jobs = n_jobs |
| self.positive_code = positive_code |
|
|
| def _transform(self, X, dictionary): |
| """Private method allowing to accommodate both DictionaryLearning and |
| SparseCoder.""" |
| X = validate_data(self, X, reset=False) |
|
|
| if hasattr(self, "alpha") and self.transform_alpha is None: |
| transform_alpha = self.alpha |
| else: |
| transform_alpha = self.transform_alpha |
|
|
| code = sparse_encode( |
| X, |
| dictionary, |
| algorithm=self.transform_algorithm, |
| n_nonzero_coefs=self.transform_n_nonzero_coefs, |
| alpha=transform_alpha, |
| max_iter=self.transform_max_iter, |
| n_jobs=self.n_jobs, |
| positive=self.positive_code, |
| ) |
|
|
| if self.split_sign: |
| |
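| # feature vector is split into a positive and a negative side |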
| n_samples, n_features = code.shape |
| split_code = np.empty((n_samples, 2 * n_features)) |
| split_code[:, :n_features] = np.maximum(code, 0) |
| split_code[:, n_features:] = -np.minimum(code, 0) |
| code = split_code |
|
|
| return code |
|
|
| def transform(self, X): |
| """Encode the data as a sparse combination of the dictionary atoms. |
| |
| Coding method is determined by the object parameter |
| `transform_algorithm`. |
| |
| Parameters |
| ---------- |
| X : ndarray of shape (n_samples, n_features) |
| Test data to be transformed; it must have the same number of |
| features as the data used to train the model. |
| |
| Returns |
| ------- |
| X_new : ndarray of shape (n_samples, n_components) |
| Transformed data. |
| """ |
| check_is_fitted(self) |
| return self._transform(X, self.components_) |
|
|
|
|
| class SparseCoder(_BaseSparseCoding, BaseEstimator): |
| """Sparse coding. |
| |
| Finds a sparse representation of data against a fixed, precomputed |
| dictionary. |
| |
| Each row of the result is the solution to a sparse coding problem. |
| The goal is to find a sparse array `code` such that:: |
| |
| X ~= code * dictionary |
| |
| Read more in the :ref:`User Guide <SparseCoder>`. |
| |
| Parameters |
| ---------- |
| dictionary : ndarray of shape (n_components, n_features) |
| The dictionary atoms used for sparse coding. Rows are assumed to be |
| normalized to unit norm. |
| |
| transform_algorithm : {'lasso_lars', 'lasso_cd', 'lars', 'omp', \ |
| 'threshold'}, default='omp' |
| Algorithm used to transform the data: |
| |
| - `'lars'`: uses the least angle regression method |
| (`linear_model.lars_path`); |
| - `'lasso_lars'`: uses Lars to compute the Lasso solution; |
| - `'lasso_cd'`: uses the coordinate descent method to compute the |
| Lasso solution (linear_model.Lasso). `'lasso_lars'` will be faster if |
| the estimated components are sparse; |
| - `'omp'`: uses orthogonal matching pursuit to estimate the sparse |
| solution; |
| - `'threshold'`: squashes to zero all coefficients less than alpha from |
| the projection ``dictionary * X'``. |
| |
| transform_n_nonzero_coefs : int, default=None |
| Number of nonzero coefficients to target in each column of the |
| solution. This is only used by `algorithm='lars'` and `algorithm='omp'` |
| and is overridden by `alpha` in the `omp` case. If `None`, then |
| `transform_n_nonzero_coefs=int(n_features / 10)`, clipped to the |
| range `[1, n_components]`. |
| |
| transform_alpha : float, default=None |
| If `algorithm='lasso_lars'` or `algorithm='lasso_cd'`, `alpha` is the |
| penalty applied to the L1 norm. |
| If `algorithm='threshold'`, `alpha` is the absolute value of the |
| threshold below which coefficients will be squashed to zero. |
| If `algorithm='omp'`, `alpha` is the tolerance parameter: the value of |
| the reconstruction error targeted. In this case, it overrides |
| `n_nonzero_coefs`. |
| If `None`, default to 1. |
| |
| split_sign : bool, default=False |
| Whether to split the sparse feature vector into the concatenation of |
| its negative part and its positive part. This can improve the |
| performance of downstream classifiers. |
| |
| n_jobs : int, default=None |
| Number of parallel jobs to run. |
| ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. |
| ``-1`` means using all processors. See :term:`Glossary <n_jobs>` |
| for more details. |
| |
| positive_code : bool, default=False |
| Whether to enforce positivity when finding the code. |
| |
| .. versionadded:: 0.20 |
| |
| transform_max_iter : int, default=1000 |
| Maximum number of iterations to perform if `algorithm='lasso_cd'` or |
| `'lasso_lars'`. |
| |
| .. versionadded:: 0.22 |
| |
| Attributes |
| ---------- |
| n_components_ : int |
| Number of atoms. |
| |
| n_features_in_ : int |
| Number of features seen during :term:`fit`. |
| |
| .. versionadded:: 0.24 |
| |
| feature_names_in_ : ndarray of shape (`n_features_in_`,) |
| Names of features seen during :term:`fit`. Defined only when `X` |
| has feature names that are all strings. |
| |
| .. versionadded:: 1.0 |
| |
| See Also |
| -------- |
| DictionaryLearning : Find a dictionary that sparsely encodes data. |
| MiniBatchDictionaryLearning : A faster, less accurate, version of the |
| dictionary learning algorithm. |
| MiniBatchSparsePCA : Mini-batch Sparse Principal Components Analysis. |
| SparsePCA : Sparse Principal Components Analysis. |
| sparse_encode : Sparse coding where each row of the result is the solution |
| to a sparse coding problem. |
| |
| Examples |
| -------- |
| >>> import numpy as np |
| >>> from sklearn.decomposition import SparseCoder |
| >>> X = np.array([[-1, -1, -1], [0, 0, 3]]) |
| >>> dictionary = np.array( |
| ... [[0, 1, 0], |
| ... [-1, -1, 2], |
| ... [1, 1, 1], |
| ... [0, 1, 1], |
| ... [0, 2, 1]], |
| ... dtype=np.float64 |
| ... ) |
| >>> coder = SparseCoder( |
| ... dictionary=dictionary, transform_algorithm='lasso_lars', |
| ... transform_alpha=1e-10, |
| ... ) |
| >>> coder.transform(X) |
| array([[ 0., 0., -1., 0., 0.], |
| [ 0., 1., 1., 0., 0.]]) |
| """ |
|
|
| def __init__( |
| self, |
| dictionary, |
| *, |
| transform_algorithm="omp", |
| transform_n_nonzero_coefs=None, |
| transform_alpha=None, |
| split_sign=False, |
| n_jobs=None, |
| positive_code=False, |
| transform_max_iter=1000, |
| ): |
| super().__init__( |
| transform_algorithm, |
| transform_n_nonzero_coefs, |
| transform_alpha, |
| split_sign, |
| n_jobs, |
| positive_code, |
| transform_max_iter, |
| ) |
| self.dictionary = dictionary |
|
|
| def fit(self, X, y=None): |
| """Do nothing and return the estimator unchanged. |
| |
| This method is just there to implement the usual API and hence |
| work in pipelines. |
| |
| Parameters |
| ---------- |
| X : Ignored |
| Not used, present for API consistency by convention. |
| |
| y : Ignored |
| Not used, present for API consistency by convention. |
| |
| Returns |
| ------- |
| self : object |
| Returns the instance itself. |
| """ |
| return self |
|
|
| def transform(self, X, y=None): |
| """Encode the data as a sparse combination of the dictionary atoms. |
| |
| Coding method is determined by the object parameter |
| `transform_algorithm`. |
| |
| Parameters |
| ---------- |
| X : ndarray of shape (n_samples, n_features) |
| Training vector, where `n_samples` is the number of samples |
| and `n_features` is the number of features. |
| |
| y : Ignored |
| Not used, present for API consistency by convention. |
| |
| Returns |
| ------- |
| X_new : ndarray of shape (n_samples, n_components) |
| Transformed data. |
| """ |
| return super()._transform(X, self.dictionary) |
|
|
| def __sklearn_tags__(self): |
| tags = super().__sklearn_tags__() |
| tags.requires_fit = False |
| tags.transformer_tags.preserves_dtype = ["float64", "float32"] |
| return tags |
|
|
| @property |
| def n_components_(self): |
| """Number of atoms.""" |
| return self.dictionary.shape[0] |
|
|
| @property |
| def n_features_in_(self): |
| """Number of features seen during `fit`.""" |
| return self.dictionary.shape[1] |
|
|
| @property |
| def _n_features_out(self): |
| """Number of transformed output features.""" |
| return self.n_components_ |
|
|
|
|
| class DictionaryLearning(_BaseSparseCoding, BaseEstimator): |
| """Dictionary learning. |
| |
| Finds a dictionary (a set of atoms) that performs well at sparsely |
| encoding the fitted data. |
| |
| Solves the optimization problem:: |
| |
| (U^*,V^*) = argmin 0.5 || X - U V ||_Fro^2 + alpha * || U ||_1,1 |
| (U,V) |
| with || V_k ||_2 <= 1 for all 0 <= k < n_components |
| |
| ||.||_Fro stands for the Frobenius norm and ||.||_1,1 stands for |
| the entry-wise matrix norm which is the sum of the absolute values |
| of all the entries in the matrix. |
| |
| Read more in the :ref:`User Guide <DictionaryLearning>`. |
| |
| Parameters |
| ---------- |
| n_components : int, default=None |
| Number of dictionary elements to extract. If None, then ``n_components`` |
| is set to ``n_features``. |
| |
| alpha : float, default=1.0 |
| Sparsity controlling parameter. |
| |
| max_iter : int, default=1000 |
| Maximum number of iterations to perform. |
| |
| tol : float, default=1e-8 |
| Tolerance for numerical error. |
| |
| fit_algorithm : {'lars', 'cd'}, default='lars' |
| * `'lars'`: uses the least angle regression method to solve the lasso |
| problem (:func:`~sklearn.linear_model.lars_path`); |
| * `'cd'`: uses the coordinate descent method to compute the |
| Lasso solution (:class:`~sklearn.linear_model.Lasso`). Lars will be |
| faster if the estimated components are sparse. |
| |
| .. versionadded:: 0.17 |
| *cd* coordinate descent method to improve speed. |
| |
| transform_algorithm : {'lasso_lars', 'lasso_cd', 'lars', 'omp', \ |
| 'threshold'}, default='omp' |
| Algorithm used to transform the data: |
| |
| - `'lars'`: uses the least angle regression method |
| (:func:`~sklearn.linear_model.lars_path`); |
| - `'lasso_lars'`: uses Lars to compute the Lasso solution. |
| - `'lasso_cd'`: uses the coordinate descent method to compute the |
| Lasso solution (:class:`~sklearn.linear_model.Lasso`). `'lasso_lars'` |
| will be faster if the estimated components are sparse. |
| - `'omp'`: uses orthogonal matching pursuit to estimate the sparse |
| solution. |
| - `'threshold'`: squashes to zero all coefficients less than alpha from |
| the projection ``dictionary * X'``. |
| |
| .. versionadded:: 0.17 |
| *lasso_cd* coordinate descent method to improve speed. |
| |
| transform_n_nonzero_coefs : int, default=None |
| Number of nonzero coefficients to target in each column of the |
| solution. This is only used by `algorithm='lars'` and |
| `algorithm='omp'`. If `None`, then |
| `transform_n_nonzero_coefs=int(n_features / 10)`, clipped to the |
| range `[1, n_components]`. |
| |
| transform_alpha : float, default=None |
| If `algorithm='lasso_lars'` or `algorithm='lasso_cd'`, `alpha` is the |
| penalty applied to the L1 norm. |
| If `algorithm='threshold'`, `alpha` is the absolute value of the |
| threshold below which coefficients will be squashed to zero. |
| If `None`, defaults to `alpha`. |
| |
| .. versionchanged:: 1.2 |
| When None, default value changed from 1.0 to `alpha`. |
| |
| n_jobs : int or None, default=None |
| Number of parallel jobs to run. |
| ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. |
| ``-1`` means using all processors. See :term:`Glossary <n_jobs>` |
| for more details. |
| |
| code_init : ndarray of shape (n_samples, n_components), default=None |
| Initial value for the code, for warm restart. Only used if `code_init` |
| and `dict_init` are not None. |
| |
| dict_init : ndarray of shape (n_components, n_features), default=None |
| Initial values for the dictionary, for warm restart. Only used if |
| `code_init` and `dict_init` are not None. |
| |
| callback : callable, default=None |
| Callable that gets invoked every five iterations. |
| |
| .. versionadded:: 1.3 |
| |
| verbose : bool, default=False |
| To control the verbosity of the procedure. |
| |
| split_sign : bool, default=False |
| Whether to split the sparse feature vector into the concatenation of |
| its negative part and its positive part. This can improve the |
| performance of downstream classifiers. |
| |
| random_state : int, RandomState instance or None, default=None |
| Used for initializing the dictionary when ``dict_init`` is not |
| specified, randomly shuffling the data when ``shuffle`` is set to |
| ``True``, and updating the dictionary. Pass an int for reproducible |
| results across multiple function calls. |
| See :term:`Glossary <random_state>`. |
| |
| positive_code : bool, default=False |
| Whether to enforce positivity when finding the code. |
| |
| .. versionadded:: 0.20 |
| |
| positive_dict : bool, default=False |
| Whether to enforce positivity when finding the dictionary. |
| |
| .. versionadded:: 0.20 |
| |
| transform_max_iter : int, default=1000 |
| Maximum number of iterations to perform if `algorithm='lasso_cd'` or |
| `'lasso_lars'`. |
| |
| .. versionadded:: 0.22 |
| |
| Attributes |
| ---------- |
| components_ : ndarray of shape (n_components, n_features) |
| Dictionary atoms extracted from the data. |
|
| error_ : array |
| Vector of errors at each iteration. |
| |
| n_features_in_ : int |
| Number of features seen during :term:`fit`. |
| |
| .. versionadded:: 0.24 |
| |
| feature_names_in_ : ndarray of shape (`n_features_in_`,) |
| Names of features seen during :term:`fit`. Defined only when `X` |
| has feature names that are all strings. |
| |
| .. versionadded:: 1.0 |
| |
| n_iter_ : int |
| Number of iterations run. |
| |
| See Also |
| -------- |
| MiniBatchDictionaryLearning: A faster, less accurate, version of the |
| dictionary learning algorithm. |
| MiniBatchSparsePCA : Mini-batch Sparse Principal Components Analysis. |
| SparseCoder : Find a sparse representation of data from a fixed, |
| precomputed dictionary. |
| SparsePCA : Sparse Principal Components Analysis. |
| |
| References |
| ---------- |
| |
| J. Mairal, F. Bach, J. Ponce, G. Sapiro, 2009: Online dictionary learning |
| for sparse coding (https://www.di.ens.fr/~fbach/mairal_icml09.pdf) |
| |
| Examples |
| -------- |
| >>> import numpy as np |
| >>> from sklearn.datasets import make_sparse_coded_signal |
| >>> from sklearn.decomposition import DictionaryLearning |
| >>> X, dictionary, code = make_sparse_coded_signal( |
| ... n_samples=30, n_components=15, n_features=20, n_nonzero_coefs=10, |
| ... random_state=42, |
| ... ) |
| >>> dict_learner = DictionaryLearning( |
| ... n_components=15, transform_algorithm='lasso_lars', transform_alpha=0.1, |
| ... random_state=42, |
| ... ) |
| >>> X_transformed = dict_learner.fit(X).transform(X) |
| |
| We can check the level of sparsity of `X_transformed`: |
| |
| >>> np.mean(X_transformed == 0) |
| np.float64(0.52...) |
| |
| We can compare the average squared euclidean norm of the reconstruction |
| error of the sparse coded signal relative to the squared euclidean norm of |
| the original signal: |
| |
| >>> X_hat = X_transformed @ dict_learner.components_ |
| >>> np.mean(np.sum((X_hat - X) ** 2, axis=1) / np.sum(X ** 2, axis=1)) |
| np.float64(0.05...) |
| """ |
|
|
| _parameter_constraints: dict = { |
| "n_components": [Interval(Integral, 1, None, closed="left"), None], |
| "alpha": [Interval(Real, 0, None, closed="left")], |
| "max_iter": [Interval(Integral, 0, None, closed="left")], |
| "tol": [Interval(Real, 0, None, closed="left")], |
| "fit_algorithm": [StrOptions({"lars", "cd"})], |
| "transform_algorithm": [ |
| StrOptions({"lasso_lars", "lasso_cd", "lars", "omp", "threshold"}) |
| ], |
| "transform_n_nonzero_coefs": [Interval(Integral, 1, None, closed="left"), None], |
| "transform_alpha": [Interval(Real, 0, None, closed="left"), None], |
| "n_jobs": [Integral, None], |
| "code_init": [np.ndarray, None], |
| "dict_init": [np.ndarray, None], |
| "callback": [callable, None], |
| "verbose": ["verbose"], |
| "split_sign": ["boolean"], |
| "random_state": ["random_state"], |
| "positive_code": ["boolean"], |
| "positive_dict": ["boolean"], |
| "transform_max_iter": [Interval(Integral, 0, None, closed="left")], |
| } |
|
|
| def __init__( |
| self, |
| n_components=None, |
| *, |
| alpha=1, |
| max_iter=1000, |
| tol=1e-8, |
| fit_algorithm="lars", |
| transform_algorithm="omp", |
| transform_n_nonzero_coefs=None, |
| transform_alpha=None, |
| n_jobs=None, |
| code_init=None, |
| dict_init=None, |
| callback=None, |
| verbose=False, |
| split_sign=False, |
| random_state=None, |
| positive_code=False, |
| positive_dict=False, |
| transform_max_iter=1000, |
| ): |
| super().__init__( |
| transform_algorithm, |
| transform_n_nonzero_coefs, |
| transform_alpha, |
| split_sign, |
| n_jobs, |
| positive_code, |
| transform_max_iter, |
| ) |
| self.n_components = n_components |
| self.alpha = alpha |
| self.max_iter = max_iter |
| self.tol = tol |
| self.fit_algorithm = fit_algorithm |
| self.code_init = code_init |
| self.dict_init = dict_init |
| self.callback = callback |
| self.verbose = verbose |
| self.random_state = random_state |
| self.positive_dict = positive_dict |
|
|
| def fit(self, X, y=None): |
| """Fit the model from data in X. |
| |
| Parameters |
| ---------- |
| X : array-like of shape (n_samples, n_features) |
| Training vector, where `n_samples` is the number of samples |
| and `n_features` is the number of features. |
| |
| y : Ignored |
| Not used, present for API consistency by convention. |
| |
| Returns |
| ------- |
| self : object |
| Returns the instance itself. |
| """ |
| self.fit_transform(X) |
| return self |
|
|
| @_fit_context(prefer_skip_nested_validation=True) |
| def fit_transform(self, X, y=None): |
| """Fit the model from data in X and return the transformed data. |
| |
| Parameters |
| ---------- |
| X : array-like of shape (n_samples, n_features) |
| Training vector, where `n_samples` is the number of samples |
| and `n_features` is the number of features. |
| |
| y : Ignored |
| Not used, present for API consistency by convention. |
| |
| Returns |
| ------- |
| V : ndarray of shape (n_samples, n_components) |
| Transformed data. |
| """ |
| _check_positive_coding(method=self.fit_algorithm, positive=self.positive_code) |
|
|
| method = "lasso_" + self.fit_algorithm |
|
|
| random_state = check_random_state(self.random_state) |
| X = validate_data(self, X) |
|
|
| if self.n_components is None: |
| n_components = X.shape[1] |
| else: |
| n_components = self.n_components |
|
|
| V, U, E, self.n_iter_ = _dict_learning( |
| X, |
| n_components, |
| alpha=self.alpha, |
| tol=self.tol, |
| max_iter=self.max_iter, |
| method=method, |
| method_max_iter=self.transform_max_iter, |
| n_jobs=self.n_jobs, |
| code_init=self.code_init, |
| dict_init=self.dict_init, |
| callback=self.callback, |
| verbose=self.verbose, |
| random_state=random_state, |
| return_n_iter=True, |
| positive_dict=self.positive_dict, |
| positive_code=self.positive_code, |
| ) |
| self.components_ = U |
| self.error_ = E |
|
|
| return V |
|
|
| @property |
| def _n_features_out(self): |
| """Number of transformed output features.""" |
| return self.components_.shape[0] |
|
|
| def __sklearn_tags__(self): |
| tags = super().__sklearn_tags__() |
| tags.transformer_tags.preserves_dtype = ["float64", "float32"] |
| return tags |
|
|
|
|
| class MiniBatchDictionaryLearning(_BaseSparseCoding, BaseEstimator): |
| """Mini-batch dictionary learning. |
| |
| Finds a dictionary (a set of atoms) that performs well at sparsely |
| encoding the fitted data. |
| |
| Solves the optimization problem:: |
| |
| (U^*,V^*) = argmin 0.5 || X - U V ||_Fro^2 + alpha * || U ||_1,1 |
| (U,V) |
| with || V_k ||_2 <= 1 for all 0 <= k < n_components |
| |
| ||.||_Fro stands for the Frobenius norm and ||.||_1,1 stands for |
| the entry-wise matrix norm which is the sum of the absolute values |
| of all the entries in the matrix. |
| |
| Read more in the :ref:`User Guide <DictionaryLearning>`. |
| |
| Parameters |
| ---------- |
| n_components : int, default=None |
| Number of dictionary elements to extract. If `None`, then |
| ``n_components`` is set to ``n_features``. |
| |
| alpha : float, default=1 |
| Sparsity controlling parameter. |
| |
| max_iter : int, default=1_000 |
| Maximum number of iterations over the complete dataset before |
| stopping independently of any early stopping criterion heuristics. |
| |
| .. versionadded:: 1.1 |
| |
| fit_algorithm : {'lars', 'cd'}, default='lars' |
| The algorithm used: |
| |
| - `'lars'`: uses the least angle regression method to solve the lasso |
| problem (`linear_model.lars_path`) |
| - `'cd'`: uses the coordinate descent method to compute the |
| Lasso solution (`linear_model.Lasso`). Lars will be faster if |
| the estimated components are sparse. |
| |
| n_jobs : int, default=None |
| Number of parallel jobs to run. |
| ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. |
| ``-1`` means using all processors. See :term:`Glossary <n_jobs>` |
| for more details. |
| |
| batch_size : int, default=256 |
| Number of samples in each mini-batch. |
| |
| .. versionchanged:: 1.3 |
| The default value of `batch_size` changed from 3 to 256 in version 1.3. |
| |
| shuffle : bool, default=True |
| Whether to shuffle the samples before forming batches. |
| |
| dict_init : ndarray of shape (n_components, n_features), default=None |
| Initial value of the dictionary for warm restart scenarios. |
| |
| transform_algorithm : {'lasso_lars', 'lasso_cd', 'lars', 'omp', \ |
| 'threshold'}, default='omp' |
| Algorithm used to transform the data: |
| |
| - `'lars'`: uses the least angle regression method |
| (`linear_model.lars_path`); |
| - `'lasso_lars'`: uses Lars to compute the Lasso solution. |
| - `'lasso_cd'`: uses the coordinate descent method to compute the |
| Lasso solution (`linear_model.Lasso`). `'lasso_lars'` will be faster |
| if the estimated components are sparse. |
| - `'omp'`: uses orthogonal matching pursuit to estimate the sparse |
| solution. |
| - `'threshold'`: squashes to zero all coefficients less than alpha from |
| the projection ``dictionary * X'``. |
| |
| transform_n_nonzero_coefs : int, default=None |
| Number of nonzero coefficients to target in each column of the |
| solution. This is only used by `algorithm='lars'` and |
| `algorithm='omp'`. If `None`, then |
| `transform_n_nonzero_coefs=int(n_features / 10)`, clipped to the |
| range `[1, n_components]`. |
| |
| transform_alpha : float, default=None |
| If `algorithm='lasso_lars'` or `algorithm='lasso_cd'`, `alpha` is the |
| penalty applied to the L1 norm. |
| If `algorithm='threshold'`, `alpha` is the absolute value of the |
| threshold below which coefficients will be squashed to zero. |
| If `None`, defaults to `alpha`. |
| |
| .. versionchanged:: 1.2 |
| When None, default value changed from 1.0 to `alpha`. |
| |
| verbose : bool or int, default=False |
| To control the verbosity of the procedure. |
| |
| split_sign : bool, default=False |
| Whether to split the sparse feature vector into the concatenation of |
| its negative part and its positive part. This can improve the |
| performance of downstream classifiers. |
| |
| random_state : int, RandomState instance or None, default=None |
| Used for initializing the dictionary when ``dict_init`` is not |
| specified, randomly shuffling the data when ``shuffle`` is set to |
| ``True``, and updating the dictionary. Pass an int for reproducible |
| results across multiple function calls. |
| See :term:`Glossary <random_state>`. |
| |
| positive_code : bool, default=False |
| Whether to enforce positivity when finding the code. |
| |
| .. versionadded:: 0.20 |
| |
| positive_dict : bool, default=False |
| Whether to enforce positivity when finding the dictionary. |
| |
| .. versionadded:: 0.20 |
| |
| transform_max_iter : int, default=1000 |
| Maximum number of iterations to perform if `algorithm='lasso_cd'` or |
| `'lasso_lars'`. |
| |
| .. versionadded:: 0.22 |
| |
| callback : callable, default=None |
| A callable that gets invoked at the end of each iteration. |
| |
| .. versionadded:: 1.1 |
| |
| tol : float, default=1e-3 |
| Control early stopping based on the norm of the differences in the |
| dictionary between 2 steps. |
| |
| To disable early stopping based on changes in the dictionary, set |
| `tol` to 0.0. |
| |
| .. versionadded:: 1.1 |
| |
| max_no_improvement : int, default=10 |
| Control early stopping based on the number of consecutive mini-batches |
| that do not yield an improvement on the smoothed cost function. |
| |
| To disable convergence detection based on cost function, set |
| `max_no_improvement` to None. |
| |
| .. versionadded:: 1.1 |
| |
| Attributes |
| ---------- |
| components_ : ndarray of shape (n_components, n_features) |
| Components extracted from the data. |
| |
| n_features_in_ : int |
| Number of features seen during :term:`fit`. |
| |
| .. versionadded:: 0.24 |
| |
| feature_names_in_ : ndarray of shape (`n_features_in_`,) |
| Names of features seen during :term:`fit`. Defined only when `X` |
| has feature names that are all strings. |
| |
| .. versionadded:: 1.0 |
| |
| n_iter_ : int |
| Number of iterations over the full dataset. |
| |
| n_steps_ : int |
| Number of mini-batches processed. |
| |
| .. versionadded:: 1.1 |
| |
| See Also |
| -------- |
| DictionaryLearning : Find a dictionary that sparsely encodes data. |
| MiniBatchSparsePCA : Mini-batch Sparse Principal Components Analysis. |
| SparseCoder : Find a sparse representation of data from a fixed, |
| precomputed dictionary. |
| SparsePCA : Sparse Principal Components Analysis. |
| |
| References |
| ---------- |
| |
| J. Mairal, F. Bach, J. Ponce, G. Sapiro, 2009: Online dictionary learning |
| for sparse coding (https://www.di.ens.fr/~fbach/mairal_icml09.pdf) |
| |
| Examples |
| -------- |
| >>> import numpy as np |
| >>> from sklearn.datasets import make_sparse_coded_signal |
| >>> from sklearn.decomposition import MiniBatchDictionaryLearning |
| >>> X, dictionary, code = make_sparse_coded_signal( |
| ... n_samples=30, n_components=15, n_features=20, n_nonzero_coefs=10, |
| ... random_state=42) |
| >>> dict_learner = MiniBatchDictionaryLearning( |
| ... n_components=15, batch_size=3, transform_algorithm='lasso_lars', |
| ... transform_alpha=0.1, max_iter=20, random_state=42) |
| >>> X_transformed = dict_learner.fit_transform(X) |
| |
| We can check the level of sparsity of `X_transformed`: |
| |
| >>> np.mean(X_transformed == 0) > 0.5 |
| np.True_ |
| |
| We can compare the average squared Euclidean norm of the reconstruction |
| error of the sparse-coded signal relative to the squared Euclidean norm |
| of the original signal: |
| |
| >>> X_hat = X_transformed @ dict_learner.components_ |
| >>> np.mean(np.sum((X_hat - X) ** 2, axis=1) / np.sum(X ** 2, axis=1)) |
| np.float64(0.052...) |
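| |
| We can also learn the dictionary incrementally with `partial_fit`. A |
| minimal sketch, assuming we simply split `X` into mini-batches with |
| `np.array_split`: |
| |
| >>> incr_learner = MiniBatchDictionaryLearning(n_components=15, random_state=42) |
| >>> for batch in np.array_split(X, 10): |
| ...     incr_learner = incr_learner.partial_fit(batch) |
| >>> incr_learner.n_steps_ |
| 10 |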
| """ |
|
|
| _parameter_constraints: dict = { |
| "n_components": [Interval(Integral, 1, None, closed="left"), None], |
| "alpha": [Interval(Real, 0, None, closed="left")], |
| "max_iter": [Interval(Integral, 0, None, closed="left")], |
| "fit_algorithm": [StrOptions({"cd", "lars"})], |
| "n_jobs": [None, Integral], |
| "batch_size": [Interval(Integral, 1, None, closed="left")], |
| "shuffle": ["boolean"], |
| "dict_init": [None, np.ndarray], |
| "transform_algorithm": [ |
| StrOptions({"lasso_lars", "lasso_cd", "lars", "omp", "threshold"}) |
| ], |
| "transform_n_nonzero_coefs": [Interval(Integral, 1, None, closed="left"), None], |
| "transform_alpha": [Interval(Real, 0, None, closed="left"), None], |
| "verbose": ["verbose"], |
| "split_sign": ["boolean"], |
| "random_state": ["random_state"], |
| "positive_code": ["boolean"], |
| "positive_dict": ["boolean"], |
| "transform_max_iter": [Interval(Integral, 0, None, closed="left")], |
| "callback": [None, callable], |
| "tol": [Interval(Real, 0, None, closed="left")], |
| "max_no_improvement": [Interval(Integral, 0, None, closed="left"), None], |
| } |
|
|
| def __init__( |
| self, |
| n_components=None, |
| *, |
| alpha=1, |
| max_iter=1_000, |
| fit_algorithm="lars", |
| n_jobs=None, |
| batch_size=256, |
| shuffle=True, |
| dict_init=None, |
| transform_algorithm="omp", |
| transform_n_nonzero_coefs=None, |
| transform_alpha=None, |
| verbose=False, |
| split_sign=False, |
| random_state=None, |
| positive_code=False, |
| positive_dict=False, |
| transform_max_iter=1000, |
| callback=None, |
| tol=1e-3, |
| max_no_improvement=10, |
| ): |
| super().__init__( |
| transform_algorithm, |
| transform_n_nonzero_coefs, |
| transform_alpha, |
| split_sign, |
| n_jobs, |
| positive_code, |
| transform_max_iter, |
| ) |
| self.n_components = n_components |
| self.alpha = alpha |
| self.max_iter = max_iter |
| self.fit_algorithm = fit_algorithm |
| self.dict_init = dict_init |
| self.verbose = verbose |
| self.shuffle = shuffle |
| self.batch_size = batch_size |
| self.split_sign = split_sign |
| self.random_state = random_state |
| self.positive_dict = positive_dict |
| self.callback = callback |
| self.max_no_improvement = max_no_improvement |
| self.tol = tol |
|
|
| def _check_params(self, X): |
| |
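| # n_components: if unspecified, default to the number of features. |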
| self._n_components = self.n_components |
| if self._n_components is None: |
| self._n_components = X.shape[1] |
|
|
| |
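| # fit_algorithm: positivity is not supported by the LARS solver; map |
| # "cd"/"lars" to the corresponding Lasso variant used by _sparse_encode. |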
| _check_positive_coding(self.fit_algorithm, self.positive_code) |
| self._fit_algorithm = "lasso_" + self.fit_algorithm |
|
|
| |
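| # batch_size: cannot be larger than the number of samples. |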
| self._batch_size = min(self.batch_size, X.shape[0]) |
|
|
| def _initialize_dict(self, X, random_state): |
| """Initialization of the dictionary.""" |
| if self.dict_init is not None: |
| dictionary = self.dict_init |
| else: |
| |
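| # Initialize the dictionary with a randomized SVD of X, scaling each |
| # right singular vector by its singular value. |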
| _, S, dictionary = randomized_svd( |
| X, self._n_components, random_state=random_state |
| ) |
| dictionary = S[:, np.newaxis] * dictionary |
|
|
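| # Truncate or zero-pad so the dictionary has exactly n_components atoms. |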
| if self._n_components <= len(dictionary): |
| dictionary = dictionary[: self._n_components, :] |
| else: |
| dictionary = np.concatenate( |
| ( |
| dictionary, |
| np.zeros( |
| (self._n_components - len(dictionary), dictionary.shape[1]), |
| dtype=dictionary.dtype, |
| ), |
| ) |
| ) |
|
|
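| # Ensure the dictionary is a writeable, Fortran-ordered array with the |
| # same dtype as X. |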
| dictionary = check_array(dictionary, order="F", dtype=X.dtype, copy=False) |
| dictionary = np.require(dictionary, requirements="W") |
|
|
| return dictionary |
|
|
| def _update_inner_stats(self, X, code, batch_size, step): |
| """Update the inner stats inplace.""" |
| if step < batch_size - 1: |
| theta = (step + 1) * batch_size |
| else: |
| theta = batch_size**2 + step + 1 - batch_size |
| beta = (theta + 1 - batch_size) / (theta + 1) |
|
|
| self._A *= beta |
| self._A += code.T @ code / batch_size |
| self._B *= beta |
| self._B += X.T @ code / batch_size |
|
|
| def _minibatch_step(self, X, dictionary, random_state, step): |
| """Perform the update on the dictionary for one minibatch.""" |
| batch_size = X.shape[0] |
|
|
| |
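| # Compute the sparse code of this batch, keeping the dictionary fixed. |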
| code = _sparse_encode( |
| X, |
| dictionary, |
| algorithm=self._fit_algorithm, |
| alpha=self.alpha, |
| n_jobs=self.n_jobs, |
| positive=self.positive_code, |
| max_iter=self.transform_max_iter, |
| verbose=self.verbose, |
| ) |
|
|
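| # Cost of this batch: (0.5 * ||X - code @ D||_F^2 + alpha * ||code||_1) |
| # averaged over the batch. |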
| batch_cost = ( |
| 0.5 * ((X - code @ dictionary) ** 2).sum() |
| + self.alpha * np.sum(np.abs(code)) |
| ) / batch_size |
|
|
| |
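| # Update the inner sufficient statistics with this batch. |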
| self._update_inner_stats(X, code, batch_size, step) |
|
|
| |
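| # Update the dictionary in place, keeping the code fixed. |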
| _update_dict( |
| dictionary, |
| X, |
| code, |
| self._A, |
| self._B, |
| verbose=self.verbose, |
| random_state=random_state, |
| positive=self.positive_dict, |
| ) |
|
|
| return batch_cost |
|
|
| def _check_convergence( |
| self, X, batch_cost, new_dict, old_dict, n_samples, step, n_steps |
| ): |
| """Helper function to encapsulate the early stopping logic. |
| |
| Early stopping is based on two factors: |
| - A small change of the dictionary between two minibatch updates. This is |
| controlled by the tol parameter. |
| - No more improvement on a smoothed estimate of the objective function for |
| a certain number of consecutive minibatch updates. This is controlled by |
| the max_no_improvement parameter. |
| """ |
| batch_size = X.shape[0] |
|
|
| |
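| # Count steps starting from 1 for user-friendly verbose output. |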
| step = step + 1 |
|
|
| |
| |
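| # Ignore the first 100 steps, or the first epoch if it is shorter, to |
| # avoid initializing the smoothed cost with a poor early value. |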
| if step <= min(100, n_samples / batch_size): |
| if self.verbose: |
| print(f"Minibatch step {step}/{n_steps}: mean batch cost: {batch_cost}") |
| return False |
|
|
| |
| |
| |
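| # Compute an exponentially weighted average of the batch cost to monitor |
| # convergence while smoothing out the mini-batch stochastic variability. |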
| if self._ewa_cost is None: |
| self._ewa_cost = batch_cost |
| else: |
| alpha = batch_size / (n_samples + 1) |
| alpha = min(alpha, 1) |
| self._ewa_cost = self._ewa_cost * (1 - alpha) + batch_cost * alpha |
|
|
| if self.verbose: |
| print( |
| f"Minibatch step {step}/{n_steps}: mean batch cost: " |
| f"{batch_cost}, ewa cost: {self._ewa_cost}" |
| ) |
|
|
| |
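| # Early stopping based on the change of the dictionary between two steps. |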
| dict_diff = linalg.norm(new_dict - old_dict) / self._n_components |
| if self.tol > 0 and dict_diff <= self.tol: |
| if self.verbose: |
| print(f"Converged (small dictionary change) at step {step}/{n_steps}") |
| return True |
|
|
| |
| |
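| # Early stopping heuristic based on the lack of improvement of the |
| # smoothed cost function. |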
| if self._ewa_cost_min is None or self._ewa_cost < self._ewa_cost_min: |
| self._no_improvement = 0 |
| self._ewa_cost_min = self._ewa_cost |
| else: |
| self._no_improvement += 1 |
|
|
| if ( |
| self.max_no_improvement is not None |
| and self._no_improvement >= self.max_no_improvement |
| ): |
| if self.verbose: |
| print( |
| "Converged (lack of improvement in objective function) " |
| f"at step {step}/{n_steps}" |
| ) |
| return True |
|
|
| return False |
|
|
| @_fit_context(prefer_skip_nested_validation=True) |
| def fit(self, X, y=None): |
| """Fit the model from data in X. |
| |
| Parameters |
| ---------- |
| X : array-like of shape (n_samples, n_features) |
| Training vector, where `n_samples` is the number of samples |
| and `n_features` is the number of features. |
| |
| y : Ignored |
| Not used, present for API consistency by convention. |
| |
| Returns |
| ------- |
| self : object |
| Returns the instance itself. |
| """ |
| X = validate_data( |
| self, X, dtype=[np.float64, np.float32], order="C", copy=False |
| ) |
|
|
| self._check_params(X) |
| self._random_state = check_random_state(self.random_state) |
|
|
| dictionary = self._initialize_dict(X, self._random_state) |
| old_dict = dictionary.copy() |
|
|
| if self.shuffle: |
| X_train = X.copy() |
| self._random_state.shuffle(X_train) |
| else: |
| X_train = X |
|
|
| n_samples, n_features = X_train.shape |
|
|
| if self.verbose: |
| print("[dict_learning]") |
|
|
| |
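| # Inner sufficient statistics for the dictionary update |
| # (see _update_inner_stats). |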
| self._A = np.zeros( |
| (self._n_components, self._n_components), dtype=X_train.dtype |
| ) |
| self._B = np.zeros((n_features, self._n_components), dtype=X_train.dtype) |
|
|
| |
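| # Attributes to monitor convergence. |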
| self._ewa_cost = None |
| self._ewa_cost_min = None |
| self._no_improvement = 0 |
|
|
| batches = gen_batches(n_samples, self._batch_size) |
| batches = itertools.cycle(batches) |
| n_steps_per_iter = int(np.ceil(n_samples / self._batch_size)) |
| n_steps = self.max_iter * n_steps_per_iter |
|
|
| i = -1  # so that n_steps_ == 0 when max_iter == 0 (the loop never runs) |
|
|
| for i, batch in zip(range(n_steps), batches): |
| X_batch = X_train[batch] |
|
|
| batch_cost = self._minibatch_step( |
| X_batch, dictionary, self._random_state, i |
| ) |
|
|
| if self._check_convergence( |
| X_batch, batch_cost, dictionary, old_dict, n_samples, i, n_steps |
| ): |
| break |
|
|
| |
| |
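| # Invoke the user-provided callback, if any, after each mini-batch step. |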
| if self.callback is not None: |
| self.callback(locals()) |
|
|
| old_dict[:] = dictionary |
|
|
| self.n_steps_ = i + 1 |
| self.n_iter_ = np.ceil(self.n_steps_ / n_steps_per_iter) |
| self.components_ = dictionary |
|
|
| return self |
|
|
| @_fit_context(prefer_skip_nested_validation=True) |
| def partial_fit(self, X, y=None): |
| """Update the model using the data in X as a mini-batch. |
| |
| Parameters |
| ---------- |
| X : array-like of shape (n_samples, n_features) |
| Training vector, where `n_samples` is the number of samples |
| and `n_features` is the number of features. |
| |
| y : Ignored |
| Not used, present for API consistency by convention. |
| |
| Returns |
| ------- |
| self : object |
| Return the instance itself. |
| """ |
| has_components = hasattr(self, "components_") |
|
|
| X = validate_data( |
| self, X, dtype=[np.float64, np.float32], order="C", reset=not has_components |
| ) |
|
|
| if not has_components: |
| |
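| # First call to partial_fit: validate parameters and initialize the model. |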
| self._check_params(X) |
| self._random_state = check_random_state(self.random_state) |
|
|
| dictionary = self._initialize_dict(X, self._random_state) |
|
|
| self.n_steps_ = 0 |
|
|
| self._A = np.zeros((self._n_components, self._n_components), dtype=X.dtype) |
| self._B = np.zeros((X.shape[1], self._n_components), dtype=X.dtype) |
| else: |
| dictionary = self.components_ |
|
|
| self._minibatch_step(X, dictionary, self._random_state, self.n_steps_) |
|
|
| self.components_ = dictionary |
| self.n_steps_ += 1 |
|
|
| return self |
|
|
| @property |
| def _n_features_out(self): |
| """Number of transformed output features.""" |
| return self.components_.shape[0] |
|
|
| def __sklearn_tags__(self): |
| tags = super().__sklearn_tags__() |
| tags.transformer_tags.preserves_dtype = ["float64", "float32"] |
| return tags |
|
|