Spaces:
Running
Running
| # Authors: The MNE-Python contributors. | |
| # License: BSD-3-Clause | |
| # Copyright the MNE-Python contributors. | |
| import numpy as np | |
| from scipy.fft import irfft, rfft | |
| from .utils import ( | |
| _check_option, | |
| _explain_exception, | |
| fill_doc, | |
| get_config, | |
| logger, | |
| sizeof_fmt, | |
| verbose, | |
| warn, | |
| ) | |
| # Module-level state flag. Starts False; init_cuda() sets it to True only | |
| # after cupy imports successfully and a CUDA device has been selected. | |
| _cuda_capable = False | |
| def get_cuda_memory(kind="available"): | |
| """Get the amount of free memory for CUDA operations. | |
| Parameters | |
| ---------- | |
| kind : str | |
| Can be ``"available"`` or ``"total"``. | |
| Returns | |
| ------- | |
| memory : str | |
| The amount of available or total memory as a human-readable string. | |
| """ | |
| if not _cuda_capable: | |
| warn("CUDA not enabled, returning zero for memory") | |
| mem = 0 | |
| else: | |
| import cupy | |
| mem = cupy.cuda.runtime.memGetInfo()[dict(available=0, total=1)[kind]] | |
| return sizeof_fmt(mem) | |
| def init_cuda(ignore_config=False, verbose=None): | |
| """Initialize CUDA functionality. | |
| This function attempts to load the necessary interfaces | |
| (hardware connectivity) to run CUDA-based filtering. This | |
| function should only need to be run once per session. | |
| If the config var (set via mne.set_config or in ENV) | |
| MNE_USE_CUDA == 'true', this function will be executed when | |
| the first CUDA setup is performed. If this variable is not | |
| set, this function can be manually executed. | |
| Parameters | |
| ---------- | |
| ignore_config : bool | |
| If True, ignore the config value MNE_USE_CUDA and force init. | |
| %(verbose)s | |
| """ | |
| global _cuda_capable | |
| if _cuda_capable: | |
| return | |
| if not ignore_config and (get_config("MNE_USE_CUDA", "false").lower() != "true"): | |
| logger.info("CUDA not enabled in config, skipping initialization") | |
| return | |
| # Triage possible errors for informative messaging | |
| _cuda_capable = False | |
| try: | |
| import cupy # noqa | |
| except ImportError: | |
| warn("module cupy not found, CUDA not enabled") | |
| return | |
| device_id = int(get_config("MNE_CUDA_DEVICE", "0")) | |
| try: | |
| # Initialize CUDA | |
| _set_cuda_device(device_id, verbose) | |
| except Exception: | |
| warn( | |
| "so CUDA device could be initialized, likely a hardware error, " | |
| f"CUDA not enabled{_explain_exception()}" | |
| ) | |
| return | |
| _cuda_capable = True | |
| # Figure out limit for CUDA FFT calculations | |
| logger.info(f"Enabling CUDA with {get_cuda_memory()} available memory") | |
| def set_cuda_device(device_id, verbose=None): | |
| """Set the CUDA device temporarily for the current session. | |
| Parameters | |
| ---------- | |
| device_id : int | |
| Numeric ID of the CUDA-capable device you want MNE-Python to use. | |
| %(verbose)s | |
| """ | |
| if _cuda_capable: | |
| _set_cuda_device(device_id, verbose) | |
| elif get_config("MNE_USE_CUDA", "false").lower() == "true": | |
| init_cuda() | |
| _set_cuda_device(device_id, verbose) | |
| else: | |
| warn( | |
| "Could not set CUDA device because CUDA is not enabled; either " | |
| "run mne.cuda.init_cuda() first, or set the MNE_USE_CUDA config " | |
| 'variable to "true".' | |
| ) | |
| def _set_cuda_device(device_id, verbose=None): | |
| """Set the CUDA device.""" | |
| import cupy | |
| cupy.cuda.Device(device_id).use() | |
| logger.info(f"Now using CUDA device {device_id}") | |
| ############################################################################### | |
| # Repeated FFT multiplication | |
| def _setup_cuda_fft_multiply_repeated(n_jobs, h, n_fft, kind="FFT FIR filtering"): | |
| """Set up repeated CUDA FFT multiplication with a given filter. | |
| Parameters | |
| ---------- | |
| n_jobs : int | str | |
| If ``n_jobs='cuda'``, the function will attempt to set up for CUDA | |
| FFT multiplication. | |
| h : array | |
| The filtering function that will be used repeatedly. | |
| n_fft : int | |
| The number of points in the FFT. | |
| kind : str | |
| The kind to report to the user. | |
| Returns | |
| ------- | |
| n_jobs : int | |
| Sets n_jobs = 1 if n_jobs == 'cuda' was passed in, otherwise | |
| original n_jobs is passed. | |
| cuda_dict : dict | |
| Dictionary with the following CUDA-related variables: | |
| use_cuda : bool | |
| Whether CUDA should be used. | |
| fft_plan : instance of FFTPlan | |
| FFT plan to use in calculating the FFT. | |
| ifft_plan : instance of FFTPlan | |
| FFT plan to use in calculating the IFFT. | |
| x_fft : instance of gpuarray | |
| Empty allocated GPU space for storing the result of the | |
| frequency-domain multiplication. | |
| x : instance of gpuarray | |
| Empty allocated GPU space for the data to filter. | |
| h_fft : array | instance of gpuarray | |
| This will either be a gpuarray (if CUDA enabled) or ndarray. | |
| Notes | |
| ----- | |
| This function is designed to be used with fft_multiply_repeated(). | |
| """ | |
| cuda_dict = dict(n_fft=n_fft, rfft=rfft, irfft=irfft, h_fft=rfft(h, n=n_fft)) | |
| if isinstance(n_jobs, str): | |
| _check_option("n_jobs", n_jobs, ("cuda",)) | |
| n_jobs = 1 | |
| init_cuda() | |
| if _cuda_capable: | |
| import cupy | |
| try: | |
| # do the IFFT normalization now so we don't have to later | |
| h_fft = cupy.array(cuda_dict["h_fft"]) | |
| logger.info(f"Using CUDA for {kind}") | |
| except Exception as exp: | |
| logger.info( | |
| "CUDA not used, could not instantiate memory (arrays may be too " | |
| f'large: "{exp}"), falling back to n_jobs=None' | |
| ) | |
| cuda_dict.update(h_fft=h_fft, rfft=_cuda_upload_rfft, irfft=_cuda_irfft_get) | |
| else: | |
| logger.info( | |
| "CUDA not used, CUDA could not be initialized, " | |
| "falling back to n_jobs=None" | |
| ) | |
| return n_jobs, cuda_dict | |
| def _fft_multiply_repeated(x, cuda_dict): | |
| """Do FFT multiplication by a filter function (possibly using CUDA). | |
| Parameters | |
| ---------- | |
| h_fft : 1-d array or gpuarray | |
| The filtering array to apply. | |
| x : 1-d array | |
| The array to filter. | |
| n_fft : int | |
| The number of points in the FFT. | |
| cuda_dict : dict | |
| Dictionary constructed using setup_cuda_multiply_repeated(). | |
| Returns | |
| ------- | |
| x : 1-d array | |
| Filtered version of x. | |
| """ | |
| # do the fourier-domain operations | |
| x_fft = cuda_dict["rfft"](x, cuda_dict["n_fft"]) | |
| x_fft *= cuda_dict["h_fft"] | |
| x = cuda_dict["irfft"](x_fft, cuda_dict["n_fft"]) | |
| return x | |
| ############################################################################### | |
| # FFT Resampling | |
| def _setup_cuda_fft_resample(n_jobs, W, new_len): | |
| """Set up CUDA FFT resampling. | |
| Parameters | |
| ---------- | |
| n_jobs : int | str | |
| If n_jobs == 'cuda', the function will attempt to set up for CUDA | |
| FFT resampling. | |
| W : array | |
| The filtering function to be used during resampling. | |
| If n_jobs='cuda', this function will be shortened (since CUDA | |
| assumes FFTs of real signals are half the length of the signal) | |
| and turned into a gpuarray. | |
| new_len : int | |
| The size of the array following resampling. | |
| Returns | |
| ------- | |
| n_jobs : int | |
| Sets n_jobs = 1 if n_jobs == 'cuda' was passed in, otherwise | |
| original n_jobs is passed. | |
| cuda_dict : dict | |
| Dictionary with the following CUDA-related variables: | |
| use_cuda : bool | |
| Whether CUDA should be used. | |
| fft_plan : instance of FFTPlan | |
| FFT plan to use in calculating the FFT. | |
| ifft_plan : instance of FFTPlan | |
| FFT plan to use in calculating the IFFT. | |
| x_fft : instance of gpuarray | |
| Empty allocated GPU space for storing the result of the | |
| frequency-domain multiplication. | |
| x : instance of gpuarray | |
| Empty allocated GPU space for the data to resample. | |
| Notes | |
| ----- | |
| This function is designed to be used with fft_resample(). | |
| """ | |
| cuda_dict = dict(use_cuda=False, rfft=rfft, irfft=irfft) | |
| rfft_len_x = len(W) // 2 + 1 | |
| # fold the window onto inself (should be symmetric) and truncate | |
| W = W.copy() | |
| W[1:rfft_len_x] = (W[1:rfft_len_x] + W[::-1][: rfft_len_x - 1]) / 2.0 | |
| W = W[:rfft_len_x] | |
| if isinstance(n_jobs, str): | |
| _check_option("n_jobs", n_jobs, ("cuda",)) | |
| n_jobs = 1 | |
| init_cuda() | |
| if _cuda_capable: | |
| try: | |
| import cupy | |
| # do the IFFT normalization now so we don't have to later | |
| W = cupy.array(W) | |
| logger.info("Using CUDA for FFT resampling") | |
| except Exception: | |
| logger.info( | |
| "CUDA not used, could not instantiate memory " | |
| "(arrays may be too large), falling back to " | |
| "n_jobs=None" | |
| ) | |
| else: | |
| cuda_dict.update( | |
| use_cuda=True, rfft=_cuda_upload_rfft, irfft=_cuda_irfft_get | |
| ) | |
| else: | |
| logger.info( | |
| "CUDA not used, CUDA could not be initialized, " | |
| "falling back to n_jobs=None" | |
| ) | |
| cuda_dict["W"] = W | |
| return n_jobs, cuda_dict | |
| def _cuda_upload_rfft(x, n, axis=-1): | |
| """Upload and compute rfft.""" | |
| import cupy | |
| return cupy.fft.rfft(cupy.array(x), n=n, axis=axis) | |
| def _cuda_irfft_get(x, n, axis=-1): | |
| """Compute irfft and get.""" | |
| import cupy | |
| return cupy.fft.irfft(x, n=n, axis=axis).get() | |
| def _fft_resample(x, new_len, npads, to_removes, cuda_dict=None, pad="reflect_limited"): | |
| """Do FFT resampling with a filter function (possibly using CUDA). | |
| Parameters | |
| ---------- | |
| x : 1-d array | |
| The array to resample. Will be converted to float64 if necessary. | |
| new_len : int | |
| The size of the output array (before removing padding). | |
| npads : tuple of int | |
| Amount of padding to apply to the start and end of the | |
| signal before resampling. | |
| to_removes : tuple of int | |
| Number of samples to remove after resampling. | |
| cuda_dict : dict | |
| Dictionary constructed using setup_cuda_multiply_repeated(). | |
| %(pad_resample)s | |
| The default is ``'reflect_limited'``. | |
| .. versionadded:: 0.15 | |
| Returns | |
| ------- | |
| x : 1-d array | |
| Filtered version of x. | |
| """ | |
| cuda_dict = dict(use_cuda=False) if cuda_dict is None else cuda_dict | |
| # add some padding at beginning and end to make this work a little cleaner | |
| if x.dtype != np.float64: | |
| x = x.astype(np.float64) | |
| x = _smart_pad(x, npads, pad) | |
| old_len = len(x) | |
| shorter = new_len < old_len | |
| use_len = new_len if shorter else old_len | |
| x_fft = cuda_dict["rfft"](x, None) | |
| if use_len % 2 == 0: | |
| nyq = use_len // 2 | |
| x_fft[nyq : nyq + 1] *= 2 if shorter else 0.5 | |
| x_fft *= cuda_dict["W"] | |
| y = cuda_dict["irfft"](x_fft, new_len) | |
| # now let's trim it back to the correct size (if there was padding) | |
| if (to_removes > 0).any(): | |
| y = y[to_removes[0] : y.shape[0] - to_removes[1]] | |
| return y | |
| ############################################################################### | |
| # Misc | |
| # this has to go in mne.cuda instead of mne.filter to avoid import errors | |
| def _smart_pad(x, n_pad, pad="reflect_limited"): | |
| """Pad vector x.""" | |
| n_pad = np.asarray(n_pad) | |
| assert n_pad.shape == (2,) | |
| if (n_pad == 0).all(): | |
| return x | |
| elif (n_pad < 0).any(): | |
| raise RuntimeError("n_pad must be non-negative") | |
| if pad == "reflect_limited": | |
| l_z_pad = np.zeros(max(n_pad[0] - len(x) + 1, 0), dtype=x.dtype) | |
| r_z_pad = np.zeros(max(n_pad[1] - len(x) + 1, 0), dtype=x.dtype) | |
| out = np.concatenate( | |
| [ | |
| l_z_pad, | |
| 2 * x[0] - x[n_pad[0] : 0 : -1], | |
| x, | |
| 2 * x[-1] - x[-2 : -n_pad[1] - 2 : -1], | |
| r_z_pad, | |
| ] | |
| ) | |
| else: | |
| kwargs = dict() | |
| if pad == "reflect": | |
| kwargs["reflect_type"] = "odd" | |
| out = np.pad(x, (tuple(n_pad),), pad, **kwargs) | |
| return out | |