Spaces:
Running
Running
| # Authors: The MNE-Python contributors. | |
| # License: BSD-3-Clause | |
| # Copyright the MNE-Python contributors. | |
| import datetime | |
| import glob | |
| import inspect | |
| import os | |
| import numpy as np | |
| from ..._fiff.meas_info import create_info | |
| from ..._fiff.utils import _mult_cal_one | |
| from ...annotations import Annotations | |
| from ...utils import _check_fname, _soft_import, fill_doc, logger, verbose | |
| from ..base import BaseRaw | |
| def read_raw_neuralynx( | |
| fname, *, preload=False, exclude_fname_patterns=None, verbose=None | |
| ) -> "RawNeuralynx": | |
| """Reader for Neuralynx files. | |
| Parameters | |
| ---------- | |
| fname : path-like | |
| Path to a folder with Neuralynx .ncs files. | |
| %(preload)s | |
| exclude_fname_patterns : list of str | |
| List of glob-like string patterns to exclude from channel list. | |
| Useful when not all channels have the same number of samples | |
| so you can read separate instances. | |
| %(verbose)s | |
| Returns | |
| ------- | |
| raw : instance of RawNeuralynx | |
| A Raw object containing Neuralynx data. | |
| See :class:`mne.io.Raw` for documentation of attributes and methods. | |
| See Also | |
| -------- | |
| mne.io.Raw : Documentation of attributes and methods of RawNeuralynx. | |
| Notes | |
| ----- | |
| Neuralynx files are read from disk using the `Neo package | |
| <http://neuralensemble.org/neo/>`__. | |
| Currently, only reading of the ``.ncs files`` is supported. | |
| ``raw.info["meas_date"]`` is read from the ``recording_opened`` property | |
| of the first ``.ncs`` file (i.e. channel) in the dataset (a warning is issued | |
| if files have different dates of acquisition). | |
| Channel-specific high and lowpass frequencies of online filters are determined | |
| based on the ``DspLowCutFrequency`` and ``DspHighCutFrequency`` header fields, | |
| respectively. If no filters were used for a channel, the default lowpass is set | |
| to the Nyquist frequency and the default highpass is set to 0. | |
| If channels have different high/low cutoffs, ``raw.info["highpass"]`` and | |
| ``raw.info["lowpass"]`` are then set to the maximum highpass and minimumlowpass | |
| values across channels, respectively. | |
| Other header variables can be inspected using Neo directly. For example:: | |
| from neo.io import NeuralynxIO # doctest: +SKIP | |
| fname = 'path/to/your/data' # doctest: +SKIP | |
| nlx_reader = NeuralynxIO(dirname=fname) # doctest: +SKIP | |
| print(nlx_reader.header) # doctest: +SKIP | |
| print(nlx_reader.file_headers.items()) # doctest: +SKIP | |
| """ | |
| return RawNeuralynx( | |
| fname, | |
| preload=preload, | |
| exclude_fname_patterns=exclude_fname_patterns, | |
| verbose=verbose, | |
| ) | |
| # Helper for neo change of exclude_filename -> exclude_filenames in 0.13.2 | |
| def _exclude_kwarg(exclude_fnames): | |
| from neo.io import NeuralynxIO | |
| key = "exclude_filename" | |
| if "exclude_filenames" in inspect.getfullargspec(NeuralynxIO).args: | |
| key += "s" | |
| return {key: exclude_fnames} | |
| class RawNeuralynx(BaseRaw): | |
| """RawNeuralynx class.""" | |
| def __init__( | |
| self, | |
| fname, | |
| *, | |
| preload=False, | |
| exclude_fname_patterns=None, | |
| verbose=None, | |
| ): | |
| fname = _check_fname(fname, "read", True, "fname", need_dir=True) | |
| _soft_import("neo", "Reading NeuralynxIO files", strict=True) | |
| from neo.io import NeuralynxIO | |
| logger.info(f"Checking files in {fname}") | |
| # construct a list of filenames to ignore | |
| exclude_fnames = None | |
| if exclude_fname_patterns: | |
| exclude_fnames = [] | |
| for pattern in exclude_fname_patterns: | |
| fnames = glob.glob(os.path.join(fname, pattern)) | |
| fnames = [os.path.basename(fname) for fname in fnames] | |
| exclude_fnames.extend(fnames) | |
| logger.info("Ignoring .ncs files:\n" + "\n".join(exclude_fnames)) | |
| # get basic file info from header, throw Error if NeuralynxIO can't parse | |
| try: | |
| nlx_reader = NeuralynxIO(dirname=fname, **_exclude_kwarg(exclude_fnames)) | |
| except ValueError as e: | |
| # give a more informative error message and what the user can do about it | |
| if "Incompatible section structures across streams" in str(e): | |
| raise ValueError( | |
| "It seems .ncs channels have different numbers of samples. " | |
| + "This is likely due to different sampling rates. " | |
| + "Try reading in only channels with uniform sampling rate " | |
| + "by excluding other channels with `exclude_fname_patterns` " | |
| + "input argument." | |
| + f"\nOriginal neo.NeuralynxRawIO ValueError:\n{e}" | |
| ) from None | |
| else: | |
| raise | |
| info = create_info( | |
| ch_types="seeg", | |
| ch_names=nlx_reader.header["signal_channels"]["name"].tolist(), | |
| sfreq=nlx_reader.get_signal_sampling_rate(), | |
| ) | |
| ncs_fnames = nlx_reader.ncs_filenames.values() | |
| ncs_hdrs = [ | |
| hdr | |
| for hdr_key, hdr in nlx_reader.file_headers.items() | |
| if hdr_key in ncs_fnames | |
| ] | |
| # if all files have the same recording_opened date, write it to info | |
| meas_dates = np.array([hdr["recording_opened"] for hdr in ncs_hdrs]) | |
| # to be sure, only write if all dates are the same | |
| meas_diff = [] | |
| for md in meas_dates: | |
| meas_diff.append((md - meas_dates[0]).total_seconds()) | |
| # tolerate a +/-1 second meas_date difference (arbitrary threshold) | |
| # else issue a warning | |
| warn_meas = (np.abs(meas_diff) > 1.0).any() | |
| if warn_meas: | |
| logger.warning( | |
| "Not all .ncs files have the same recording_opened date. " | |
| + "Writing meas_date based on the first .ncs file." | |
| ) | |
| # Neuarlynx allows channel specific low/highpass filters | |
| # if not enabled, assume default lowpass = nyquist, highpass = 0 | |
| default_lowpass = info["sfreq"] / 2 # nyquist | |
| default_highpass = 0 | |
| has_hp = [hdr["DSPLowCutFilterEnabled"] for hdr in ncs_hdrs] | |
| has_lp = [hdr["DSPHighCutFilterEnabled"] for hdr in ncs_hdrs] | |
| if not all(has_hp) or not all(has_lp): | |
| logger.warning( | |
| "Not all .ncs files have the same high/lowpass filter settings. " | |
| + "Assuming default highpass = 0, lowpass = nyquist." | |
| ) | |
| highpass_freqs = [ | |
| float(hdr["DspLowCutFrequency"]) | |
| if hdr["DSPLowCutFilterEnabled"] | |
| else default_highpass | |
| for hdr in ncs_hdrs | |
| ] | |
| lowpass_freqs = [ | |
| float(hdr["DspHighCutFrequency"]) | |
| if hdr["DSPHighCutFilterEnabled"] | |
| else default_lowpass | |
| for hdr in ncs_hdrs | |
| ] | |
| with info._unlock(): | |
| info["meas_date"] = meas_dates[0].astimezone(datetime.timezone.utc) | |
| info["highpass"] = np.max(highpass_freqs) | |
| info["lowpass"] = np.min(lowpass_freqs) | |
| # Neo reads only valid contiguous .ncs samples grouped as segments | |
| n_segments = nlx_reader.header["nb_segment"][0] | |
| block_id = 0 # assumes there's only one block of recording | |
| # get segment start/stop times | |
| start_times = np.array( | |
| [nlx_reader.segment_t_start(block_id, i) for i in range(n_segments)] | |
| ) | |
| stop_times = np.array( | |
| [nlx_reader.segment_t_stop(block_id, i) for i in range(n_segments)] | |
| ) | |
| # find discontinuous boundaries (of length n-1) | |
| next_start_times = start_times[1::] | |
| previous_stop_times = stop_times[:-1] | |
| seg_diffs = next_start_times - previous_stop_times | |
| # mark as discontinuous any two segments that have | |
| # start/stop delta larger than sampling period (1.5/sampling_rate) | |
| logger.info("Checking for temporal discontinuities in Neo data segments.") | |
| delta = 1.5 / info["sfreq"] | |
| gaps = seg_diffs > delta | |
| seg_gap_dict = {} | |
| logger.info( | |
| f"N = {gaps.sum()} discontinuous Neo segments detected " | |
| + f"with delta > {delta} sec. " | |
| + "Annotating gaps as BAD_ACQ_SKIP." | |
| if gaps.any() | |
| else "No discontinuities detected." | |
| ) | |
| gap_starts = stop_times[:-1][gaps] # gap starts at segment offset | |
| gap_stops = start_times[1::][gaps] # gap stops at segment onset | |
| # (n_gaps,) array of ints giving number of samples per inferred gap | |
| gap_n_samps = np.array( | |
| [ | |
| int(round(stop * info["sfreq"])) - int(round(start * info["sfreq"])) | |
| for start, stop in zip(gap_starts, gap_stops) | |
| ] | |
| ).astype(int) # force an int array (if no gaps, empty array is a float) | |
| # get sort indices for all segments (valid and gap) in ascending order | |
| all_starts_ids = np.argsort(np.concatenate([start_times, gap_starts])) | |
| # variable indicating whether each segment is a gap or not | |
| gap_indicator = np.concatenate( | |
| [ | |
| np.full(len(start_times), fill_value=0), | |
| np.full(len(gap_starts), fill_value=1), | |
| ] | |
| ) | |
| gap_indicator = gap_indicator[all_starts_ids].astype(bool) | |
| # store this in a dict to be passed to _raw_extras | |
| seg_gap_dict = { | |
| "gap_n_samps": gap_n_samps, | |
| "isgap": gap_indicator, # False (data segment) or True (gap segment) | |
| } | |
| valid_segment_sizes = [ | |
| nlx_reader.get_signal_size(block_id, i) for i in range(n_segments) | |
| ] | |
| sizes_sorted = np.concatenate([valid_segment_sizes, gap_n_samps])[ | |
| all_starts_ids | |
| ] | |
| # now construct an (n_samples,) indicator variable | |
| sample2segment = np.concatenate( | |
| [np.full(shape=(n,), fill_value=i) for i, n in enumerate(sizes_sorted)] | |
| ) | |
| # get the start sample index for each gap segment () | |
| gap_start_ids = np.cumsum(np.hstack([[0], sizes_sorted[:-1]]))[gap_indicator] | |
| # recreate time axis for gap annotations | |
| mne_times = np.arange(0, len(sample2segment)) / info["sfreq"] | |
| assert len(gap_start_ids) == len(gap_n_samps) | |
| annotations = Annotations( | |
| onset=[mne_times[onset_id] for onset_id in gap_start_ids], | |
| duration=[ | |
| mne_times[onset_id + (n - 1)] - mne_times[onset_id] | |
| for onset_id, n in zip(gap_start_ids, gap_n_samps) | |
| ], | |
| description=["BAD_ACQ_SKIP"] * len(gap_start_ids), | |
| ) | |
| super().__init__( | |
| info=info, | |
| last_samps=[sizes_sorted.sum() - 1], | |
| filenames=[fname], | |
| preload=preload, | |
| raw_extras=[ | |
| dict( | |
| smp2seg=sample2segment, | |
| exclude_fnames=exclude_fnames, | |
| segment_sizes=sizes_sorted, | |
| seg_gap_dict=seg_gap_dict, | |
| ) | |
| ], | |
| ) | |
| self.set_annotations(annotations) | |
| def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): | |
| """Read a chunk of raw data.""" | |
| from neo import AnalogSignal, Segment | |
| from neo.io import NeuralynxIO | |
| from neo.io.proxyobjects import AnalogSignalProxy | |
| # quantities is a dependency of neo so we are guaranteed it exists | |
| from quantities import Hz | |
| nlx_reader = NeuralynxIO( | |
| dirname=self.filenames[fi], | |
| **_exclude_kwarg(self._raw_extras[0]["exclude_fnames"]), | |
| ) | |
| neo_block = nlx_reader.read(lazy=True) | |
| # check that every segment has 1 associated neo.AnalogSignal() object | |
| # (not sure what multiple analogsignals per neo.Segment would mean) | |
| assert sum( | |
| [len(segment.analogsignals) for segment in neo_block[0].segments] | |
| ) == len(neo_block[0].segments) | |
| segment_sizes = self._raw_extras[fi]["segment_sizes"] | |
| # construct a (n_segments, 2) array of the first and last | |
| # sample index for each segment relative to the start of the recording | |
| seg_starts = [0] # first chunk starts at sample 0 | |
| seg_stops = [segment_sizes[0] - 1] | |
| for i in range(1, len(segment_sizes)): | |
| ons_new = ( | |
| seg_stops[i - 1] + 1 | |
| ) # current chunk starts one sample after the previous one | |
| seg_starts.append(ons_new) | |
| off_new = ( | |
| seg_stops[i - 1] + segment_sizes[i] | |
| ) # the last sample is len(chunk) samples after the previous ended | |
| seg_stops.append(off_new) | |
| start_stop_samples = np.stack([np.array(seg_starts), np.array(seg_stops)]).T | |
| first_seg = self._raw_extras[0]["smp2seg"][ | |
| start | |
| ] # segment containing start sample | |
| last_seg = self._raw_extras[0]["smp2seg"][ | |
| stop - 1 | |
| ] # segment containing stop sample | |
| # select all segments between the one that contains the start sample | |
| # and the one that contains the stop sample | |
| sel_samples_global = start_stop_samples[first_seg : last_seg + 1, :] | |
| # express end samples relative to segment onsets | |
| # to be used for slicing the arrays below | |
| sel_samples_local = sel_samples_global.copy() | |
| sel_samples_local[0:-1, 1] = ( | |
| sel_samples_global[0:-1, 1] - sel_samples_global[0:-1, 0] | |
| ) | |
| sel_samples_local[1::, 0] = ( | |
| 0 # now set the start sample for all segments after the first to 0 | |
| ) | |
| sel_samples_local[0, 0] = ( | |
| start - sel_samples_global[0, 0] | |
| ) # express start sample relative to segment onset | |
| sel_samples_local[-1, -1] = (stop - 1) - sel_samples_global[ | |
| -1, 0 | |
| ] # express stop sample relative to segment onset | |
| # array containing Segments | |
| segments_arr = np.array(neo_block[0].segments, dtype=object) | |
| # if gaps were detected, correctly insert gap Segments in between valid Segments | |
| gap_samples = self._raw_extras[fi]["seg_gap_dict"]["gap_n_samps"] | |
| gap_segments = [Segment(f"gap-{i}") for i in range(len(gap_samples))] | |
| # create AnalogSignal objects representing gap data filled with 0's | |
| sfreq = nlx_reader.get_signal_sampling_rate() | |
| n_chans = ( | |
| np.arange(idx.start, idx.stop, idx.step).size | |
| if type(idx) is slice | |
| else len(idx) # idx can be a slice or an np.array so check both | |
| ) | |
| for seg, n in zip(gap_segments, gap_samples): | |
| asig = AnalogSignal( | |
| signal=np.zeros((n, n_chans)), units="uV", sampling_rate=sfreq * Hz | |
| ) | |
| seg.analogsignals.append(asig) | |
| n_total_segments = len(neo_block[0].segments + gap_segments) | |
| segments_arr = np.zeros((n_total_segments,), dtype=object) | |
| # insert inferred gap segments at the right place in between valid segments | |
| isgap = self._raw_extras[0]["seg_gap_dict"]["isgap"] | |
| segments_arr[~isgap] = neo_block[0].segments | |
| segments_arr[isgap] = gap_segments | |
| # now load data for selected segments/channels via | |
| # neo.Segment.AnalogSignalProxy.load() or | |
| # pad directly as AnalogSignal.magnitude for any gap data | |
| all_data = np.concatenate( | |
| [ | |
| signal.load(channel_indexes=idx).magnitude[ | |
| samples[0] : samples[-1] + 1, : | |
| ] | |
| if isinstance(signal, AnalogSignalProxy) | |
| else signal.magnitude[samples[0] : samples[-1] + 1, :] | |
| for seg, samples in zip( | |
| segments_arr[first_seg : last_seg + 1], sel_samples_local | |
| ) | |
| for signal in seg.analogsignals | |
| ] | |
| ).T | |
| all_data *= 1e-6 # Convert uV to V | |
| n_channels = len(nlx_reader.header["signal_channels"]["name"]) | |
| block = np.zeros((n_channels, stop - start), dtype=data.dtype) | |
| block[idx] = all_data # shape = (n_channels, n_samples) | |
| # Then store the result where it needs to go | |
| _mult_cal_one(data, block, idx, cals, mult) | |