Spaces:
Running
Running
| """Reading tools from EDF, EDF+, BDF, and GDF.""" | |
| # Authors: The MNE-Python contributors. | |
| # License: BSD-3-Clause | |
| # Copyright the MNE-Python contributors. | |
| import os | |
| import re | |
| from datetime import date, datetime, timedelta, timezone | |
| from enum import Enum | |
| from pathlib import Path | |
| import numpy as np | |
| from scipy.interpolate import interp1d | |
| from ..._edf.open import _gdf_edf_get_fid | |
| from ..._fiff.constants import FIFF | |
| from ..._fiff.meas_info import _empty_info, _unique_channel_names | |
| from ..._fiff.utils import _blk_read_lims, _mult_cal_one | |
| from ...annotations import Annotations | |
| from ...filter import resample | |
| from ...fixes import read_from_file_or_buffer | |
| from ...utils import ( | |
| _check_fname, | |
| _file_like, | |
| _validate_type, | |
| fill_doc, | |
| logger, | |
| verbose, | |
| warn, | |
| ) | |
| from ..base import BaseRaw, _get_scaling | |
| class FileType(Enum): | |
| """Enumeration to differentiate files when the extension is not known.""" | |
| GDF = 1 | |
| EDF = 2 | |
| BDF = 3 | |
| # common channel type names mapped to internal ch types | |
| CH_TYPE_MAPPING = { | |
| "EEG": FIFF.FIFFV_EEG_CH, | |
| "SEEG": FIFF.FIFFV_SEEG_CH, | |
| "ECOG": FIFF.FIFFV_ECOG_CH, | |
| "DBS": FIFF.FIFFV_DBS_CH, | |
| "EOG": FIFF.FIFFV_EOG_CH, | |
| "ECG": FIFF.FIFFV_ECG_CH, | |
| "EMG": FIFF.FIFFV_EMG_CH, | |
| "BIO": FIFF.FIFFV_BIO_CH, | |
| "RESP": FIFF.FIFFV_RESP_CH, | |
| "TEMP": FIFF.FIFFV_TEMPERATURE_CH, | |
| "MISC": FIFF.FIFFV_MISC_CH, | |
| "SAO2": FIFF.FIFFV_BIO_CH, | |
| "STIM": FIFF.FIFFV_STIM_CH, | |
| } | |
| class RawEDF(BaseRaw): | |
| """Raw object from EDF, EDF+ file. | |
| Parameters | |
| ---------- | |
| input_fname : path-like | file-like | |
| Path to the EDF, EDF+ file. If a file-like object is provided, | |
| preloading must be used. | |
| .. versionchanged:: 1.10 | |
| Added support for file-like objects | |
| eog : list or tuple | |
| Names of channels or list of indices that should be designated EOG | |
| channels. Values should correspond to the electrodes in the file. | |
| Default is None. | |
| misc : list or tuple | |
| Names of channels or list of indices that should be designated MISC | |
| channels. Values should correspond to the electrodes in the file. | |
| Default is None. | |
| stim_channel : ``'auto'`` | str | list of str | int | list of int | |
| Defaults to ``'auto'``, which means that channels named ``'status'`` or | |
| ``'trigger'`` (case insensitive) are set to STIM. If str (or list of | |
| str), all channels matching the name(s) are set to STIM. If int (or | |
| list of ints), the channels corresponding to the indices are set to | |
| STIM. | |
| exclude : list of str | |
| Channel names to exclude. This can help when reading data with | |
| different sampling rates to avoid unnecessary resampling. | |
| infer_types : bool | |
| If True, try to infer channel types from channel labels. If a channel | |
| label starts with a known type (such as 'EEG') followed by a space and | |
| a name (such as 'Fp1'), the channel type will be set accordingly, and | |
| the channel will be renamed to the original label without the prefix. | |
| For unknown prefixes, the type will be 'EEG' and the name will not be | |
| modified. If False, do not infer types and assume all channels are of | |
| type 'EEG'. | |
| .. versionadded:: 0.24.1 | |
| include : list of str | str | |
| Channel names to be included. A str is interpreted as a regular | |
| expression. 'exclude' must be empty if include is assigned. | |
| .. versionadded:: 1.1 | |
| %(preload)s | |
| %(units_edf_bdf_io)s | |
| %(encoding_edf)s | |
| %(exclude_after_unique)s | |
| %(verbose)s | |
| See Also | |
| -------- | |
| mne.io.Raw : Documentation of attributes and methods. | |
| mne.io.read_raw_edf : Recommended way to read EDF/EDF+ files. | |
| Notes | |
| ----- | |
| %(edf_resamp_note)s | |
| Biosemi devices trigger codes are encoded in 16-bit format, whereas system | |
| codes (CMS in/out-of range, battery low, etc.) are coded in bits 16-23 of | |
| the status channel (see http://www.biosemi.com/faq/trigger_signals.htm). | |
| To retrieve correct event values (bits 1-16), one could do: | |
| >>> events = mne.find_events(...) # doctest:+SKIP | |
| >>> events[:, 2] &= (2**16 - 1) # doctest:+SKIP | |
| The above operation can be carried out directly in :func:`mne.find_events` | |
| using the ``mask`` and ``mask_type`` parameters (see | |
| :func:`mne.find_events` for more details). | |
| It is also possible to retrieve system codes, but no particular effort has | |
| been made to decode these in MNE. In case it is necessary, for instance to | |
| check the CMS bit, the following operation can be carried out: | |
| >>> cms_bit = 20 # doctest:+SKIP | |
| >>> cms_high = (events[:, 2] & (1 << cms_bit)) != 0 # doctest:+SKIP | |
| It is worth noting that in some special cases, it may be necessary to shift | |
| event values in order to retrieve correct event triggers. This depends on | |
| the triggering device used to perform the synchronization. For instance, in | |
| some files events need to be shifted by 8 bits: | |
| >>> events[:, 2] >>= 8 # doctest:+SKIP | |
| TAL channels called 'EDF Annotations' are parsed and | |
| extracted annotations are stored in raw.annotations. Use | |
| :func:`mne.events_from_annotations` to obtain events from these | |
| annotations. | |
| If channels named 'status' or 'trigger' are present, they are considered as | |
| STIM channels by default. Use func:`mne.find_events` to parse events | |
| encoded in such analog stim channels. | |
| """ | |
| def __init__( | |
| self, | |
| input_fname, | |
| eog=None, | |
| misc=None, | |
| stim_channel="auto", | |
| exclude=(), | |
| infer_types=False, | |
| preload=False, | |
| include=None, | |
| units=None, | |
| encoding="utf8", | |
| exclude_after_unique=False, | |
| *, | |
| verbose=None, | |
| ): | |
| if not _file_like(input_fname): | |
| logger.info(f"Extracting EDF parameters from {input_fname}...") | |
| input_fname = os.path.abspath(input_fname) | |
| info, edf_info, orig_units = _get_info( | |
| input_fname, | |
| stim_channel, | |
| eog, | |
| misc, | |
| exclude, | |
| infer_types, | |
| FileType.EDF, | |
| include, | |
| exclude_after_unique, | |
| ) | |
| logger.info("Creating raw.info structure...") | |
| edf_info["blob"] = input_fname if _file_like(input_fname) else None | |
| _validate_type(units, (str, None, dict), "units") | |
| if units is None: | |
| units = dict() | |
| elif isinstance(units, str): | |
| units = {ch_name: units for ch_name in info["ch_names"]} | |
| for k, (this_ch, this_unit) in enumerate(orig_units.items()): | |
| if this_ch not in units: | |
| continue | |
| if this_unit not in ("", units[this_ch]): | |
| raise ValueError( | |
| f"Unit for channel {this_ch} is present in the file as " | |
| f"{repr(this_unit)}, cannot overwrite it with the units " | |
| f"argument {repr(units[this_ch])}." | |
| ) | |
| if this_unit == "": | |
| orig_units[this_ch] = units[this_ch] | |
| ch_type = edf_info["ch_types"][k] | |
| scaling = _get_scaling(ch_type.lower(), orig_units[this_ch]) | |
| edf_info["units"][k] /= scaling | |
| # Raw attributes | |
| last_samps = [edf_info["nsamples"] - 1] | |
| super().__init__( | |
| info, | |
| preload, | |
| filenames=[_path_from_fname(input_fname)], | |
| raw_extras=[edf_info], | |
| last_samps=last_samps, | |
| orig_format="int", | |
| orig_units=orig_units, | |
| verbose=verbose, | |
| ) | |
| # Read annotations from file and set it | |
| if len(edf_info["tal_idx"]) > 0: | |
| # Read TAL data exploiting the header info (no regexp) | |
| idx = np.empty(0, int) | |
| tal_data = self._read_segment_file( | |
| np.empty((0, self.n_times)), | |
| idx, | |
| 0, | |
| 0, | |
| int(self.n_times), | |
| np.ones((len(idx), 1)), | |
| None, | |
| ) | |
| annotations = _read_annotations_edf( | |
| tal_data[0], | |
| ch_names=info["ch_names"], | |
| encoding=encoding, | |
| ) | |
| self.set_annotations(annotations, on_missing="warn") | |
| def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): | |
| """Read a chunk of raw data.""" | |
| return _read_segment_file( | |
| data, | |
| idx, | |
| fi, | |
| start, | |
| stop, | |
| self._raw_extras[fi], | |
| self.filenames[fi] | |
| if self._raw_extras[fi]["blob"] is None | |
| else self._raw_extras[fi]["blob"], | |
| cals, | |
| mult, | |
| ) | |
| def _path_from_fname(fname) -> Path | None: | |
| if isinstance(fname, str | Path): | |
| return Path(fname) | |
| # Try to get a filename from the file-like object | |
| try: | |
| return Path(fname.name) | |
| except Exception: | |
| return None | |
| class RawBDF(BaseRaw): | |
| """Raw object from BDF file. | |
| Parameters | |
| ---------- | |
| input_fname : path-like | file-like | |
| Path to the BDF file. If a file-like object is provided, | |
| preloading must be used. | |
| .. versionchanged:: 1.10 | |
| Added support for file-like objects | |
| eog : list or tuple | |
| Names of channels or list of indices that should be designated EOG | |
| channels. Values should correspond to the electrodes in the file. | |
| Default is None. | |
| misc : list or tuple | |
| Names of channels or list of indices that should be designated MISC | |
| channels. Values should correspond to the electrodes in the file. | |
| Default is None. | |
| stim_channel : ``'auto'`` | str | list of str | int | list of int | |
| Defaults to ``'auto'``, which means that channels named ``'status'`` or | |
| ``'trigger'`` (case insensitive) are set to STIM. If str (or list of | |
| str), all channels matching the name(s) are set to STIM. If int (or | |
| list of ints), the channels corresponding to the indices are set to | |
| STIM. | |
| exclude : list of str | |
| Channel names to exclude. This can help when reading data with | |
| different sampling rates to avoid unnecessary resampling. | |
| infer_types : bool | |
| If True, try to infer channel types from channel labels. If a channel | |
| label starts with a known type (such as 'EEG') followed by a space and | |
| a name (such as 'Fp1'), the channel type will be set accordingly, and | |
| the channel will be renamed to the original label without the prefix. | |
| For unknown prefixes, the type will be 'EEG' and the name will not be | |
| modified. If False, do not infer types and assume all channels are of | |
| type 'EEG'. | |
| .. versionadded:: 0.24.1 | |
| include : list of str | str | |
| Channel names to be included. A str is interpreted as a regular | |
| expression. 'exclude' must be empty if include is assigned. | |
| .. versionadded:: 1.1 | |
| %(preload)s | |
| %(units_edf_bdf_io)s | |
| %(encoding_edf)s | |
| %(exclude_after_unique)s | |
| %(verbose)s | |
| See Also | |
| -------- | |
| mne.io.Raw : Documentation of attributes and methods. | |
| mne.io.read_raw_bdf : Recommended way to read BDF files. | |
| Notes | |
| ----- | |
| %(edf_resamp_note)s | |
| Biosemi devices trigger codes are encoded in 16-bit format, whereas system | |
| codes (CMS in/out-of range, battery low, etc.) are coded in bits 16-23 of | |
| the status channel (see http://www.biosemi.com/faq/trigger_signals.htm). | |
| To retrieve correct event values (bits 1-16), one could do: | |
| >>> events = mne.find_events(...) # doctest:+SKIP | |
| >>> events[:, 2] &= (2**16 - 1) # doctest:+SKIP | |
| The above operation can be carried out directly in :func:`mne.find_events` | |
| using the ``mask`` and ``mask_type`` parameters (see | |
| :func:`mne.find_events` for more details). | |
| It is also possible to retrieve system codes, but no particular effort has | |
| been made to decode these in MNE. In case it is necessary, for instance to | |
| check the CMS bit, the following operation can be carried out: | |
| >>> cms_bit = 20 # doctest:+SKIP | |
| >>> cms_high = (events[:, 2] & (1 << cms_bit)) != 0 # doctest:+SKIP | |
| It is worth noting that in some special cases, it may be necessary to shift | |
| event values in order to retrieve correct event triggers. This depends on | |
| the triggering device used to perform the synchronization. For instance, in | |
| some files events need to be shifted by 8 bits: | |
| >>> events[:, 2] >>= 8 # doctest:+SKIP | |
| TAL channels called 'BDF Annotations' are parsed and | |
| extracted annotations are stored in raw.annotations. Use | |
| :func:`mne.events_from_annotations` to obtain events from these | |
| annotations. | |
| If channels named 'status' or 'trigger' are present, they are considered as | |
| STIM channels by default. Use func:`mne.find_events` to parse events | |
| encoded in such analog stim channels. | |
| """ | |
| def __init__( | |
| self, | |
| input_fname, | |
| eog=None, | |
| misc=None, | |
| stim_channel="auto", | |
| exclude=(), | |
| infer_types=False, | |
| preload=False, | |
| include=None, | |
| units=None, | |
| encoding="utf8", | |
| exclude_after_unique=False, | |
| *, | |
| verbose=None, | |
| ): | |
| if not _file_like(input_fname): | |
| logger.info(f"Extracting BDF parameters from {input_fname}...") | |
| input_fname = os.path.abspath(input_fname) | |
| info, edf_info, orig_units = _get_info( | |
| input_fname, | |
| stim_channel, | |
| eog, | |
| misc, | |
| exclude, | |
| infer_types, | |
| FileType.BDF, | |
| include, | |
| exclude_after_unique, | |
| ) | |
| logger.info("Creating raw.info structure...") | |
| edf_info["blob"] = input_fname if _file_like(input_fname) else None | |
| _validate_type(units, (str, None, dict), "units") | |
| if units is None: | |
| units = dict() | |
| elif isinstance(units, str): | |
| units = {ch_name: units for ch_name in info["ch_names"]} | |
| for k, (this_ch, this_unit) in enumerate(orig_units.items()): | |
| if this_ch not in units: | |
| continue | |
| if this_unit not in ("", units[this_ch]): | |
| raise ValueError( | |
| f"Unit for channel {this_ch} is present in the file as " | |
| f"{repr(this_unit)}, cannot overwrite it with the units " | |
| f"argument {repr(units[this_ch])}." | |
| ) | |
| if this_unit == "": | |
| orig_units[this_ch] = units[this_ch] | |
| ch_type = edf_info["ch_types"][k] | |
| scaling = _get_scaling(ch_type.lower(), orig_units[this_ch]) | |
| edf_info["units"][k] /= scaling | |
| # Raw attributes | |
| last_samps = [edf_info["nsamples"] - 1] | |
| super().__init__( | |
| info, | |
| preload, | |
| filenames=[_path_from_fname(input_fname)], | |
| raw_extras=[edf_info], | |
| last_samps=last_samps, | |
| orig_format="int", | |
| orig_units=orig_units, | |
| verbose=verbose, | |
| ) | |
| # Read annotations from file and set it | |
| if len(edf_info["tal_idx"]) > 0: | |
| # Read TAL data exploiting the header info (no regexp) | |
| idx = np.empty(0, int) | |
| tal_data = self._read_segment_file( | |
| np.empty((0, self.n_times)), | |
| idx, | |
| 0, | |
| 0, | |
| int(self.n_times), | |
| np.ones((len(idx), 1)), | |
| None, | |
| ) | |
| annotations = _read_annotations_edf( | |
| tal_data[0], | |
| ch_names=info["ch_names"], | |
| encoding=encoding, | |
| ) | |
| self.set_annotations(annotations, on_missing="warn") | |
| def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): | |
| """Read a chunk of raw data.""" | |
| return _read_segment_file( | |
| data, | |
| idx, | |
| fi, | |
| start, | |
| stop, | |
| self._raw_extras[fi], | |
| self.filenames[fi] | |
| if self._raw_extras[fi]["blob"] is None | |
| else self._raw_extras[fi]["blob"], | |
| cals, | |
| mult, | |
| ) | |
| class RawGDF(BaseRaw): | |
| """Raw object from GDF file. | |
| Parameters | |
| ---------- | |
| input_fname : path-like | file-like | |
| Path to the GDF file. If a file-like object is provided, | |
| preloading must be used. | |
| .. versionchanged:: 1.10 | |
| Added support for file-like objects | |
| eog : list or tuple | |
| Names of channels or list of indices that should be designated EOG | |
| channels. Values should correspond to the electrodes in the file. | |
| Default is None. | |
| misc : list or tuple | |
| Names of channels or list of indices that should be designated MISC | |
| channels. Values should correspond to the electrodes in the file. | |
| Default is None. | |
| stim_channel : ``'auto'`` | str | list of str | int | list of int | |
| Defaults to 'auto', which means that channels named 'status' or | |
| 'trigger' (case insensitive) are set to STIM. If str (or list of str), | |
| all channels matching the name(s) are set to STIM. If int (or list of | |
| ints), channels corresponding to the indices are set to STIM. | |
| exclude : list of str | |
| Channel names to exclude. This can help when reading data with | |
| different sampling rates to avoid unnecessary resampling. | |
| .. versionadded:: 0.24.1 | |
| include : list of str | str | |
| Channel names to be included. A str is interpreted as a regular | |
| expression. 'exclude' must be empty if include is assigned. | |
| .. versionadded:: 1.1 | |
| %(preload)s | |
| %(verbose)s | |
| See Also | |
| -------- | |
| mne.io.Raw : Documentation of attributes and methods. | |
| mne.io.read_raw_gdf : Recommended way to read GDF files. | |
| Notes | |
| ----- | |
| If channels named 'status' or 'trigger' are present, they are considered as | |
| STIM channels by default. Use func:`mne.find_events` to parse events | |
| encoded in such analog stim channels. | |
| """ | |
| def __init__( | |
| self, | |
| input_fname, | |
| eog=None, | |
| misc=None, | |
| stim_channel="auto", | |
| exclude=(), | |
| preload=False, | |
| include=None, | |
| verbose=None, | |
| ): | |
| if not _file_like(input_fname): | |
| logger.info(f"Extracting GDF parameters from {input_fname}...") | |
| input_fname = os.path.abspath(input_fname) | |
| info, edf_info, orig_units = _get_info( | |
| input_fname, | |
| stim_channel, | |
| eog, | |
| misc, | |
| exclude, | |
| True, | |
| FileType.GDF, | |
| include, | |
| ) | |
| logger.info("Creating raw.info structure...") | |
| edf_info["blob"] = input_fname if _file_like(input_fname) else None | |
| # Raw attributes | |
| last_samps = [edf_info["nsamples"] - 1] | |
| super().__init__( | |
| info, | |
| preload, | |
| filenames=[_path_from_fname(input_fname)], | |
| raw_extras=[edf_info], | |
| last_samps=last_samps, | |
| orig_format="int", | |
| orig_units=orig_units, | |
| verbose=verbose, | |
| ) | |
| # Read annotations from file and set it | |
| onset, duration, desc = _get_annotations_gdf(edf_info, self.info["sfreq"]) | |
| self.set_annotations( | |
| Annotations( | |
| onset=onset, duration=duration, description=desc, orig_time=None | |
| ) | |
| ) | |
| def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): | |
| """Read a chunk of raw data.""" | |
| return _read_segment_file( | |
| data, | |
| idx, | |
| fi, | |
| start, | |
| stop, | |
| self._raw_extras[fi], | |
| self.filenames[fi] | |
| if self._raw_extras[fi]["blob"] is None | |
| else self._raw_extras[fi]["blob"], | |
| cals, | |
| mult, | |
| ) | |
| def _read_ch(fid, subtype, samp, dtype_byte, dtype=None): | |
| """Read a number of samples for a single channel.""" | |
| # BDF | |
| if subtype == "bdf": | |
| ch_data = read_from_file_or_buffer(fid, dtype=dtype, count=samp * dtype_byte) | |
| ch_data = ch_data.reshape(-1, 3).astype(INT32) | |
| ch_data = (ch_data[:, 0]) + (ch_data[:, 1] << 8) + (ch_data[:, 2] << 16) | |
| # 24th bit determines the sign | |
| ch_data[ch_data >= (1 << 23)] -= 1 << 24 | |
| # GDF data and EDF data | |
| else: | |
| ch_data = read_from_file_or_buffer(fid, dtype=dtype, count=samp) | |
| return ch_data | |
| def _read_segment_file(data, idx, fi, start, stop, raw_extras, filenames, cals, mult): | |
| """Read a chunk of raw data.""" | |
| n_samps = raw_extras["n_samps"] | |
| buf_len = int(raw_extras["max_samp"]) | |
| dtype = raw_extras["dtype_np"] | |
| dtype_byte = raw_extras["dtype_byte"] | |
| data_offset = raw_extras["data_offset"] | |
| stim_channel_idxs = raw_extras["stim_channel_idxs"] | |
| orig_sel = raw_extras["sel"] | |
| tal_idx = raw_extras.get("tal_idx", np.empty(0, int)) | |
| subtype = raw_extras["subtype"] | |
| cal = raw_extras["cal"] | |
| offsets = raw_extras["offsets"] | |
| gains = raw_extras["units"] | |
| read_sel = np.concatenate([orig_sel[idx], tal_idx]) | |
| tal_data = [] | |
| # only try to read the stim channel if it's not None and it's | |
| # actually one of the requested channels | |
| idx_arr = np.arange(idx.start, idx.stop) if isinstance(idx, slice) else idx | |
| # We could read this one EDF block at a time, which would be this: | |
| ch_offsets = np.cumsum(np.concatenate([[0], n_samps]), dtype=np.int64) | |
| block_start_idx, r_lims, _ = _blk_read_lims(start, stop, buf_len) | |
| # But to speed it up, we really need to read multiple blocks at once, | |
| # Otherwise we can end up with e.g. 18,181 chunks for a 20 MB file! | |
| # Let's do ~10 MB chunks: | |
| n_per = max(10 * 1024 * 1024 // (ch_offsets[-1] * dtype_byte), 1) | |
| with _gdf_edf_get_fid(filenames, buffering=0) as fid: | |
| # Extract data | |
| start_offset = data_offset + block_start_idx * ch_offsets[-1] * dtype_byte | |
| # first read everything into the `ones` array. For channels with | |
| # lower sampling frequency, there will be zeros left at the end of the | |
| # row. Ignore TAL/annotations channel and only store `orig_sel` | |
| ones = np.zeros((len(orig_sel), data.shape[-1]), dtype=data.dtype) | |
| # save how many samples have already been read per channel | |
| n_smp_read = [0 for _ in range(len(orig_sel))] | |
| # read data in chunks | |
| for ai in range(0, len(r_lims), n_per): | |
| block_offset = ai * ch_offsets[-1] * dtype_byte | |
| n_read = min(len(r_lims) - ai, n_per) | |
| fid.seek(start_offset + block_offset, 0) | |
| # Read and reshape to (n_chunks_read, ch0_ch1_ch2_ch3...) | |
| many_chunk = _read_ch( | |
| fid, subtype, ch_offsets[-1] * n_read, dtype_byte, dtype | |
| ).reshape(n_read, -1) | |
| r_sidx = r_lims[ai][0] | |
| r_eidx = buf_len * (n_read - 1) + r_lims[ai + n_read - 1][1] | |
| # loop over selected channels, ci=channel selection | |
| for ii, ci in enumerate(read_sel): | |
| # This now has size (n_chunks_read, n_samp[ci]) | |
| ch_data = many_chunk[:, ch_offsets[ci] : ch_offsets[ci + 1]].copy() | |
| # annotation channel has to be treated separately | |
| if ci in tal_idx: | |
| tal_data.append(ch_data) | |
| continue | |
| orig_idx = idx_arr[ii] | |
| ch_data = ch_data * cal[orig_idx] | |
| ch_data += offsets[orig_idx] | |
| ch_data *= gains[orig_idx] | |
| assert ci == orig_sel[orig_idx] | |
| if n_samps[ci] != buf_len: | |
| if orig_idx in stim_channel_idxs: | |
| # Stim channel will be interpolated | |
| old = np.linspace(0, 1, n_samps[ci] + 1, True) | |
| new = np.linspace(0, 1, buf_len, False) | |
| ch_data = np.append(ch_data, np.zeros((len(ch_data), 1)), -1) | |
| ch_data = interp1d(old, ch_data, kind="zero", axis=-1)(new) | |
| elif orig_idx in stim_channel_idxs: | |
| ch_data = np.bitwise_and(ch_data.astype(int), 2**17 - 1) | |
| one_i = ch_data.ravel()[r_sidx:r_eidx] | |
| # note how many samples have been read | |
| smp_read = n_smp_read[orig_idx] | |
| ones[orig_idx, smp_read : smp_read + len(one_i)] = one_i | |
| n_smp_read[orig_idx] += len(one_i) | |
| # resample channels with lower sample frequency | |
| # skip if no data was requested, ie. only annotations were read | |
| if any(n_smp_read) > 0: | |
| # expected number of samples, equals maximum sfreq | |
| smp_exp = data.shape[-1] | |
| # resample data after loading all chunks to prevent edge artifacts | |
| resampled = False | |
| for i, smp_read in enumerate(n_smp_read): | |
| # nothing read, nothing to resample | |
| if smp_read == 0: | |
| continue | |
| # upsample if n_samples is lower than from highest sfreq | |
| if smp_read != smp_exp: | |
| # sanity check that we read exactly how much we expected | |
| assert (ones[i, smp_read:] == 0).all() | |
| ones[i, :] = resample( | |
| ones[i, :smp_read].astype(np.float64), | |
| smp_exp, | |
| smp_read, | |
| npad=0, | |
| axis=-1, | |
| ) | |
| resampled = True | |
| # give warning if we resampled a subselection | |
| if resampled and raw_extras["nsamples"] != (stop - start): | |
| warn( | |
| "Loading an EDF with mixed sampling frequencies and " | |
| "preload=False will result in edge artifacts. " | |
| "It is recommended to use preload=True." | |
| "See also https://github.com/mne-tools/mne-python/issues/10635" | |
| ) | |
| _mult_cal_one(data[:, :], ones, idx, cals, mult) | |
| if len(tal_data) > 1: | |
| tal_data = np.concatenate([tal.ravel() for tal in tal_data]) | |
| tal_data = tal_data[np.newaxis, :] | |
| return tal_data | |
| def _read_header( | |
| fname, | |
| exclude, | |
| infer_types, | |
| file_type, | |
| include=None, | |
| exclude_after_unique=False, | |
| ): | |
| """Unify EDF, BDF and GDF _read_header call. | |
| Parameters | |
| ---------- | |
| fname : str | |
| Path to the EDF+, BDF, or GDF file or file-like object. | |
| exclude : list of str | str | |
| Channel names to exclude. This can help when reading data with | |
| different sampling rates to avoid unnecessary resampling. A str is | |
| interpreted as a regular expression. | |
| infer_types : bool | |
| If True, try to infer channel types from channel labels. If a channel | |
| label starts with a known type (such as 'EEG') followed by a space and | |
| a name (such as 'Fp1'), the channel type will be set accordingly, and | |
| the channel will be renamed to the original label without the prefix. | |
| For unknown prefixes, the type will be 'EEG' and the name will not be | |
| modified. If False, do not infer types and assume all channels are of | |
| type 'EEG'. | |
| include : list of str | str | |
| Channel names to be included. A str is interpreted as a regular | |
| expression. 'exclude' must be empty if include is assigned. | |
| %(exclude_after_unique)s | |
| Returns | |
| ------- | |
| (edf_info, orig_units) : tuple | |
| """ | |
| if file_type in (FileType.BDF, FileType.EDF): | |
| return _read_edf_header( | |
| fname, | |
| exclude, | |
| infer_types, | |
| file_type, | |
| include, | |
| exclude_after_unique, | |
| ) | |
| elif file_type == FileType.GDF: | |
| return _read_gdf_header(fname, exclude, include), None | |
| else: | |
| raise NotImplementedError("Only GDF, EDF, and BDF files are supported.") | |
| def _get_info( | |
| fname, | |
| stim_channel, | |
| eog, | |
| misc, | |
| exclude, | |
| infer_types, | |
| file_type, | |
| include=None, | |
| exclude_after_unique=False, | |
| ): | |
| """Extract information from EDF+, BDF or GDF file.""" | |
| eog = eog if eog is not None else [] | |
| misc = misc if misc is not None else [] | |
| edf_info, orig_units = _read_header( | |
| fname, exclude, infer_types, file_type, include, exclude_after_unique | |
| ) | |
| # XXX: `tal_ch_names` to pass to `_check_stim_channel` should be computed | |
| # from `edf_info['ch_names']` and `edf_info['tal_idx']` but 'tal_idx' | |
| # contains stim channels that are not TAL. | |
| stim_channel_idxs, _ = _check_stim_channel(stim_channel, edf_info["ch_names"]) | |
| sel = edf_info["sel"] # selection of channels not excluded | |
| ch_names = edf_info["ch_names"] # of length len(sel) | |
| if "ch_types" in edf_info: | |
| ch_types = edf_info["ch_types"] # of length len(sel) | |
| else: | |
| ch_types = [None] * len(sel) | |
| if len(sel) == 0: # only want stim channels | |
| n_samps = edf_info["n_samps"][[0]] | |
| else: | |
| n_samps = edf_info["n_samps"][sel] | |
| nchan = edf_info["nchan"] | |
| physical_ranges = edf_info["physical_max"] - edf_info["physical_min"] | |
| cals = edf_info["digital_max"] - edf_info["digital_min"] | |
| bad_idx = np.where((~np.isfinite(cals)) | (cals == 0))[0] | |
| if len(bad_idx) > 0: | |
| warn( | |
| "Scaling factor is not defined in following channels:\n" | |
| + ", ".join(ch_names[i] for i in bad_idx) | |
| ) | |
| cals[bad_idx] = 1 | |
| bad_idx = np.where(physical_ranges == 0)[0] | |
| if len(bad_idx) > 0: | |
| warn( | |
| "Physical range is not defined in following channels:\n" | |
| + ", ".join(ch_names[i] for i in bad_idx) | |
| ) | |
| physical_ranges[bad_idx] = 1 | |
| # Creates a list of dicts of eeg channels for raw.info | |
| logger.info("Setting channel info structure...") | |
| chs = list() | |
| pick_mask = np.ones(len(ch_names)) | |
| chs_without_types = list() | |
| for idx, ch_name in enumerate(ch_names): | |
| chan_info = {} | |
| chan_info["cal"] = 1.0 | |
| chan_info["logno"] = idx + 1 | |
| chan_info["scanno"] = idx + 1 | |
| chan_info["range"] = 1.0 | |
| chan_info["unit_mul"] = FIFF.FIFF_UNITM_NONE | |
| chan_info["ch_name"] = ch_name | |
| chan_info["unit"] = FIFF.FIFF_UNIT_V | |
| chan_info["coord_frame"] = FIFF.FIFFV_COORD_HEAD | |
| chan_info["coil_type"] = FIFF.FIFFV_COIL_EEG | |
| chan_info["kind"] = FIFF.FIFFV_EEG_CH | |
| # montage can't be stored in EDF so channel locs are unknown: | |
| chan_info["loc"] = np.full(12, np.nan) | |
| # if the edf info contained channel type information | |
| # set it now | |
| ch_type = ch_types[idx] | |
| if ch_type is not None and ch_type in CH_TYPE_MAPPING: | |
| chan_info["kind"] = CH_TYPE_MAPPING.get(ch_type) | |
| if ch_type not in ["EEG", "ECOG", "SEEG", "DBS"]: | |
| chan_info["coil_type"] = FIFF.FIFFV_COIL_NONE | |
| pick_mask[idx] = False | |
| # if user passes in explicit mapping for eog, misc and stim | |
| # channels set them here | |
| if ch_name in eog or idx in eog or idx - nchan in eog: | |
| chan_info["coil_type"] = FIFF.FIFFV_COIL_NONE | |
| chan_info["kind"] = FIFF.FIFFV_EOG_CH | |
| pick_mask[idx] = False | |
| elif ch_name in misc or idx in misc or idx - nchan in misc: | |
| chan_info["coil_type"] = FIFF.FIFFV_COIL_NONE | |
| chan_info["kind"] = FIFF.FIFFV_MISC_CH | |
| pick_mask[idx] = False | |
| elif idx in stim_channel_idxs: | |
| chan_info["coil_type"] = FIFF.FIFFV_COIL_NONE | |
| chan_info["unit"] = FIFF.FIFF_UNIT_NONE | |
| chan_info["kind"] = FIFF.FIFFV_STIM_CH | |
| pick_mask[idx] = False | |
| chan_info["ch_name"] = ch_name | |
| ch_names[idx] = chan_info["ch_name"] | |
| edf_info["units"][idx] = 1 | |
| elif ch_type not in CH_TYPE_MAPPING: | |
| chs_without_types.append(ch_name) | |
| chs.append(chan_info) | |
| # warn if channel type was not inferable | |
| if len(chs_without_types): | |
| msg = ( | |
| "Could not determine channel type of the following channels, " | |
| f"they will be set as EEG:\n{', '.join(chs_without_types)}" | |
| ) | |
| logger.info(msg) | |
| edf_info["stim_channel_idxs"] = stim_channel_idxs | |
| if any(pick_mask): | |
| picks = [item for item, mask in zip(range(nchan), pick_mask) if mask] | |
| edf_info["max_samp"] = max_samp = n_samps[picks].max() | |
| else: | |
| edf_info["max_samp"] = max_samp = n_samps.max() | |
| # Info structure | |
| # ------------------------------------------------------------------------- | |
| not_stim_ch = [x for x in range(n_samps.shape[0]) if x not in stim_channel_idxs] | |
| if len(not_stim_ch) == 0: # only loading stim channels | |
| not_stim_ch = list(range(len(n_samps))) | |
| sfreq = ( | |
| np.take(n_samps, not_stim_ch).max() | |
| * edf_info["record_length"][1] | |
| / edf_info["record_length"][0] | |
| ) | |
| del n_samps | |
| info = _empty_info(sfreq) | |
| info["meas_date"] = edf_info["meas_date"] | |
| info["chs"] = chs | |
| info["ch_names"] = ch_names | |
| # Subject information | |
| info["subject_info"] = {} | |
| # String subject identifier | |
| if edf_info["subject_info"].get("id") is not None: | |
| info["subject_info"]["his_id"] = edf_info["subject_info"]["id"] | |
| # Subject sex (0=unknown, 1=male, 2=female) | |
| if edf_info["subject_info"].get("sex") is not None: | |
| if edf_info["subject_info"]["sex"] == "M": | |
| info["subject_info"]["sex"] = 1 | |
| elif edf_info["subject_info"]["sex"] == "F": | |
| info["subject_info"]["sex"] = 2 | |
| else: | |
| info["subject_info"]["sex"] = 0 | |
| # Subject names (first, middle, last). | |
| if edf_info["subject_info"].get("name") is not None: | |
| sub_names = edf_info["subject_info"]["name"].split("_") | |
| if len(sub_names) < 2 or len(sub_names) > 3: | |
| info["subject_info"]["last_name"] = edf_info["subject_info"]["name"] | |
| elif len(sub_names) == 2: | |
| info["subject_info"]["first_name"] = sub_names[0] | |
| info["subject_info"]["last_name"] = sub_names[1] | |
| else: | |
| info["subject_info"]["first_name"] = sub_names[0] | |
| info["subject_info"]["middle_name"] = sub_names[1] | |
| info["subject_info"]["last_name"] = sub_names[2] | |
| # Birthday in (year, month, day) format. | |
| if isinstance(edf_info["subject_info"].get("birthday"), datetime): | |
| info["subject_info"]["birthday"] = date( | |
| edf_info["subject_info"]["birthday"].year, | |
| edf_info["subject_info"]["birthday"].month, | |
| edf_info["subject_info"]["birthday"].day, | |
| ) | |
| # Handedness (1=right, 2=left, 3=ambidextrous). | |
| if edf_info["subject_info"].get("hand") is not None: | |
| info["subject_info"]["hand"] = int(edf_info["subject_info"]["hand"]) | |
| # Height in meters. | |
| if edf_info["subject_info"].get("height") is not None: | |
| info["subject_info"]["height"] = float(edf_info["subject_info"]["height"]) | |
| # Weight in kilograms. | |
| if edf_info["subject_info"].get("weight") is not None: | |
| info["subject_info"]["weight"] = float(edf_info["subject_info"]["weight"]) | |
| # Remove values after conversion to help with in-memory anonymization | |
| for key in ("subject_info", "meas_date"): | |
| del edf_info[key] | |
| # Filter settings | |
| if filt_ch_idxs := [x for x in range(len(sel)) if x not in stim_channel_idxs]: | |
| _set_prefilter(info, edf_info, filt_ch_idxs, "highpass") | |
| _set_prefilter(info, edf_info, filt_ch_idxs, "lowpass") | |
| if np.isnan(info["lowpass"]): | |
| info["lowpass"] = info["sfreq"] / 2.0 | |
| if info["highpass"] > info["lowpass"]: | |
| warn( | |
| f"Highpass cutoff frequency {info['highpass']} is greater " | |
| f"than lowpass cutoff frequency {info['lowpass']}, " | |
| "setting values to 0 and Nyquist." | |
| ) | |
| info["highpass"] = 0.0 | |
| info["lowpass"] = info["sfreq"] / 2.0 | |
| # Some keys to be consistent with FIF measurement info | |
| info["description"] = None | |
| edf_info["nsamples"] = int(edf_info["n_records"] * max_samp) | |
| info._unlocked = False | |
| info._update_redundant() | |
| # Later used for reading | |
| edf_info["cal"] = physical_ranges / cals | |
| # physical dimension in µV | |
| edf_info["offsets"] = ( | |
| edf_info["physical_min"] - edf_info["digital_min"] * edf_info["cal"] | |
| ) | |
| del edf_info["physical_min"] | |
| del edf_info["digital_min"] | |
| if edf_info["subtype"] == "bdf": | |
| edf_info["cal"][stim_channel_idxs] = 1 | |
| edf_info["offsets"][stim_channel_idxs] = 0 | |
| edf_info["units"][stim_channel_idxs] = 1 | |
| return info, edf_info, orig_units | |
| def _parse_prefilter_string(prefiltering): | |
| """Parse prefilter string from EDF+ and BDF headers.""" | |
| filter_types = ["HP", "LP"] | |
| filter_strings = {t: [] for t in filter_types} | |
| for filt in prefiltering: | |
| for t in filter_types: | |
| matches = re.findall(rf"{t}:\s*([a-zA-Z0-9,.]+)(Hz)?", filt) | |
| value = "" | |
| for match in matches: | |
| if match[0]: | |
| value = match[0].replace("Hz", "").replace(",", ".") | |
| filter_strings[t].append(value) | |
| return np.array(filter_strings["HP"]), np.array(filter_strings["LP"]) | |
| def _prefilter_float(filt): | |
| if isinstance(filt, int | float | np.number): | |
| return filt | |
| if filt == "DC": | |
| return 0.0 | |
| if filt.replace(".", "", 1).isdigit(): | |
| return float(filt) | |
| return np.nan | |
| def _set_prefilter(info, edf_info, ch_idxs, key): | |
| value = 0 | |
| if len(values := edf_info.get(key, [])): | |
| values = [x for i, x in enumerate(values) if i in ch_idxs] | |
| if len(np.unique(values)) > 1: | |
| warn( | |
| f"Channels contain different {key} filters. " | |
| f"{'Highest' if key == 'highpass' else 'Lowest'} filter " | |
| "setting will be stored." | |
| ) | |
| if key == "highpass": | |
| value = np.nanmax([_prefilter_float(x) for x in values]) | |
| else: | |
| value = np.nanmin([_prefilter_float(x) for x in values]) | |
| else: | |
| value = _prefilter_float(values[0]) | |
| if not np.isnan(value) and value != 0: | |
| info[key] = value | |
| def _edf_str(x): | |
| return x.decode("latin-1").split("\x00")[0] | |
| def _edf_str_num(x): | |
| return _edf_str(x).replace(",", ".") | |
| def _read_edf_header( | |
| fname, | |
| exclude, | |
| infer_types, | |
| file_type, | |
| include=None, | |
| exclude_after_unique=False, | |
| ): | |
| """Read header information from EDF+ or BDF file.""" | |
| edf_info = {"events": []} | |
| with _gdf_edf_get_fid(fname) as fid: | |
| fid.read(8) # version (unused here) | |
| # patient ID | |
| patient = {} | |
| id_info = fid.read(80).decode("latin-1").rstrip() | |
| id_info = id_info.split(" ") | |
| if len(id_info): | |
| patient["id"] = id_info[0] | |
| if len(id_info) >= 4: | |
| try: | |
| birthdate = datetime.strptime(id_info[2], "%d-%b-%Y") | |
| except ValueError: | |
| birthdate = "X" | |
| patient["sex"] = id_info[1] | |
| patient["birthday"] = birthdate | |
| patient["name"] = id_info[3] | |
| if len(id_info) > 4: | |
| for info in id_info[4:]: | |
| if "=" in info: | |
| key, value = info.split("=") | |
| err = f"patient {key} info cannot be {value}, skipping." | |
| if key in ["weight", "height"]: | |
| try: | |
| patient[key] = float(value) | |
| except ValueError: | |
| logger.debug(err) | |
| continue | |
| elif key in ["hand"]: | |
| try: | |
| patient[key] = int(value) | |
| except ValueError: | |
| logger.debug(err) | |
| continue | |
| else: | |
| warn(f"Invalid patient information {key}") | |
| # Recording ID | |
| rec_info = fid.read(80).decode("latin-1").rstrip().split(" ") | |
| # if the measurement date is available in the recording info, it's used instead | |
| # of the file's meas_date since it contains all 4 digits of the year. | |
| meas_date = None | |
| if len(rec_info) == 5: | |
| try: | |
| meas_date = datetime.strptime(rec_info[1], "%d-%b-%Y") | |
| except Exception: | |
| meas_date = None | |
| else: | |
| fid.read(8) # skip the file's meas_date | |
| if meas_date is None: | |
| try: | |
| meas_date = fid.read(8).decode("latin-1") | |
| day, month, year = (int(x) for x in meas_date.split(".")) | |
| year = year + 2000 if year < 85 else year + 1900 | |
| meas_date = datetime(year, month, day) | |
| except Exception: | |
| meas_date = None | |
| if meas_date is not None: | |
| # try to get the hour/minute/sec from the recording info | |
| try: | |
| meas_time = fid.read(8).decode("latin-1") | |
| hour, minute, second = (int(x) for x in meas_time.split(".")) | |
| except Exception: | |
| hour, minute, second = 0, 0, 0 | |
| meas_date = meas_date.replace( | |
| hour=hour, minute=minute, second=second, tzinfo=timezone.utc | |
| ) | |
| else: | |
| fid.read(8) # skip the file's measurement time | |
| warn("Invalid measurement date encountered in the header.") | |
| try: | |
| header_nbytes = int(_edf_str(fid.read(8))) | |
| except ValueError: | |
| raise ValueError( | |
| f"Bad {'EDF' if file_type == FileType.EDF else 'BDF'} file provided." | |
| ) | |
| # The following 44 bytes sometimes identify the file type, but this is | |
| # not guaranteed. Therefore, we skip this field and use the file_type | |
| # to determine the subtype (EDF or BDF, which differ in the | |
| # number of bytes they use for the data records; EDF uses 2 bytes | |
| # whereas BDF uses 3 bytes). | |
| fid.read(44) | |
| subtype = file_type | |
| n_records = int(_edf_str(fid.read(8))) | |
| record_length = float(_edf_str(fid.read(8))) | |
| record_length = np.array([record_length, 1.0]) # in seconds | |
| if record_length[0] == 0: | |
| record_length[0] = 1.0 | |
| warn( | |
| "Header information is incorrect for record length. Default " | |
| "record length set to 1.\nIt is possible that this file only" | |
| " contains annotations and no signals. In that case, please " | |
| "use mne.read_annotations() to load these annotations." | |
| ) | |
| nchan = int(_edf_str(fid.read(4))) | |
| channels = list(range(nchan)) | |
| # read in 16 byte labels and strip any extra spaces at the end | |
| ch_labels = [fid.read(16).strip().decode("latin-1") for _ in channels] | |
| # get channel names and optionally channel type | |
| # EDF specification contains 16 bytes that encode channel names, | |
| # optionally prefixed by a string representing channel type separated | |
| # by a space | |
| if infer_types: | |
| ch_types, ch_names = [], [] | |
| for ch_label in ch_labels: | |
| ch_type, ch_name = "EEG", ch_label # default to EEG | |
| parts = ch_label.split(" ") | |
| if len(parts) > 1: | |
| if parts[0].upper() in CH_TYPE_MAPPING: | |
| ch_type = parts[0].upper() | |
| ch_name = " ".join(parts[1:]) | |
| logger.info( | |
| f"Channel '{ch_label}' recognized as type " | |
| f"{ch_type} (renamed to '{ch_name}')." | |
| ) | |
| ch_types.append(ch_type) | |
| ch_names.append(ch_name) | |
| else: | |
| ch_types, ch_names = ["EEG"] * nchan, ch_labels | |
| tal_idx = _find_tal_idx(ch_names) | |
| if exclude_after_unique: | |
| # make sure channel names are unique | |
| ch_names = _unique_channel_names(ch_names) | |
| exclude = _find_exclude_idx(ch_names, exclude, include) | |
| exclude = np.concatenate([exclude, tal_idx]) | |
| sel = np.setdiff1d(np.arange(len(ch_names)), exclude) | |
| for ch in channels: | |
| fid.read(80) # transducer | |
| units = [fid.read(8).strip().decode("latin-1") for ch in channels] | |
| edf_info["units"] = list() | |
| for i, unit in enumerate(units): | |
| if i in exclude: | |
| continue | |
| # allow μ (greek mu), µ (micro symbol) and μ (sjis mu) codepoints | |
| if unit in ("\u03bcV", "\u00b5V", "\x83\xcaV", "uV"): | |
| edf_info["units"].append(1e-6) | |
| elif unit == "mV": | |
| edf_info["units"].append(1e-3) | |
| else: | |
| edf_info["units"].append(1) | |
| edf_info["units"] = np.array(edf_info["units"], float) | |
| ch_names = [ch_names[idx] for idx in sel] | |
| ch_types = [ch_types[idx] for idx in sel] | |
| units = [units[idx] for idx in sel] | |
| if not exclude_after_unique: | |
| # make sure channel names are unique | |
| ch_names = _unique_channel_names(ch_names) | |
| orig_units = dict(zip(ch_names, units)) | |
| physical_min = np.array([float(_edf_str_num(fid.read(8))) for ch in channels])[ | |
| sel | |
| ] | |
| physical_max = np.array([float(_edf_str_num(fid.read(8))) for ch in channels])[ | |
| sel | |
| ] | |
| digital_min = np.array([float(_edf_str_num(fid.read(8))) for ch in channels])[ | |
| sel | |
| ] | |
| digital_max = np.array([float(_edf_str_num(fid.read(8))) for ch in channels])[ | |
| sel | |
| ] | |
| prefiltering = np.array([_edf_str(fid.read(80)).strip() for ch in channels]) | |
| highpass, lowpass = _parse_prefilter_string(prefiltering) | |
| # number of samples per record | |
| n_samps = np.array([int(_edf_str(fid.read(8))) for ch in channels]) | |
| # Populate edf_info | |
| edf_info.update( | |
| ch_names=ch_names, | |
| ch_types=ch_types, | |
| data_offset=header_nbytes, | |
| digital_max=digital_max, | |
| digital_min=digital_min, | |
| highpass=highpass, | |
| sel=sel, | |
| lowpass=lowpass, | |
| meas_date=meas_date, | |
| n_records=n_records, | |
| n_samps=n_samps, | |
| nchan=nchan, | |
| subject_info=patient, | |
| physical_max=physical_max, | |
| physical_min=physical_min, | |
| record_length=record_length, | |
| subtype="bdf" if subtype == FileType.BDF else "edf", | |
| tal_idx=tal_idx, | |
| ) | |
| fid.read(32 * nchan).decode() # reserved | |
| assert fid.tell() == header_nbytes | |
| fid.seek(0, 2) | |
| n_bytes = fid.tell() | |
| n_data_bytes = n_bytes - header_nbytes | |
| total_samps = ( | |
| n_data_bytes // 3 if subtype == FileType.BDF else n_data_bytes // 2 | |
| ) | |
| read_records = total_samps // np.sum(n_samps) | |
| if n_records != read_records: | |
| warn( | |
| "Number of records from the header does not match the file " | |
| "size (perhaps the recording was not stopped before exiting)." | |
| " Inferring from the file size." | |
| ) | |
| edf_info["n_records"] = read_records | |
| del n_records | |
| if subtype == FileType.BDF: | |
| edf_info["dtype_byte"] = 3 # 24-bit (3 byte) integers | |
| edf_info["dtype_np"] = UINT8 | |
| else: | |
| edf_info["dtype_byte"] = 2 # 16-bit (2 byte) integers | |
| edf_info["dtype_np"] = INT16 | |
| return edf_info, orig_units | |
| INT8 = "<i1" | |
| UINT8 = "<u1" | |
| INT16 = "<i2" | |
| UINT16 = "<u2" | |
| INT32 = "<i4" | |
| UINT32 = "<u4" | |
| INT64 = "<i8" | |
| UINT64 = "<u8" | |
| FLOAT32 = "<f4" | |
| FLOAT64 = "<f8" | |
| GDFTYPE_NP = ( | |
| None, | |
| INT8, | |
| UINT8, | |
| INT16, | |
| UINT16, | |
| INT32, | |
| UINT32, | |
| INT64, | |
| UINT64, | |
| None, | |
| None, | |
| None, | |
| None, | |
| None, | |
| None, | |
| None, | |
| FLOAT32, | |
| FLOAT64, | |
| ) | |
| GDFTYPE_BYTE = tuple(np.dtype(x).itemsize if x is not None else 0 for x in GDFTYPE_NP) | |
| def _check_dtype_byte(types): | |
| assert sum(GDFTYPE_BYTE) == 42 | |
| dtype_byte = [GDFTYPE_BYTE[t] for t in types] | |
| dtype_np = [GDFTYPE_NP[t] for t in types] | |
| if len(np.unique(dtype_byte)) > 1: | |
| # We will not read it properly, so this should be an error | |
| raise RuntimeError("Reading multiple data types not supported") | |
| return dtype_np[0], dtype_byte[0] | |
| def _read_gdf_header(fname, exclude, include=None): | |
| """Read GDF 1.x and GDF 2.x header info.""" | |
| edf_info = dict() | |
| events = None | |
| with _gdf_edf_get_fid(fname) as fid: | |
| try: | |
| version = fid.read(8).decode() | |
| edf_info["type"] = edf_info["subtype"] = version[:3] | |
| edf_info["number"] = float(version[4:]) | |
| except ValueError: | |
| raise ValueError("Bad GDF file provided.") | |
| meas_date = None | |
| # GDF 1.x | |
| # --------------------------------------------------------------------- | |
| if edf_info["number"] < 1.9: | |
| # patient ID | |
| pid = fid.read(80).decode("latin-1") | |
| pid = pid.split(" ", 2) | |
| patient = {} | |
| if len(pid) >= 2: | |
| patient["id"] = pid[0] | |
| patient["name"] = pid[1] | |
| # Recording ID | |
| meas_id = {} | |
| meas_id["recording_id"] = _edf_str(fid.read(80)).strip() | |
| # date | |
| tm = _edf_str(fid.read(16)).strip() | |
| try: | |
| if tm[14:16] == " ": | |
| tm = tm[:14] + "00" + tm[16:] | |
| meas_date = datetime( | |
| int(tm[0:4]), | |
| int(tm[4:6]), | |
| int(tm[6:8]), | |
| int(tm[8:10]), | |
| int(tm[10:12]), | |
| int(tm[12:14]), | |
| int(tm[14:16]) * pow(10, 4), | |
| tzinfo=timezone.utc, | |
| ) | |
| except Exception: | |
| pass | |
| header_nbytes = read_from_file_or_buffer(fid, INT64, 1)[0] | |
| meas_id["equipment"] = read_from_file_or_buffer(fid, UINT8, 8)[0] | |
| meas_id["hospital"] = read_from_file_or_buffer(fid, UINT8, 8)[0] | |
| meas_id["technician"] = read_from_file_or_buffer(fid, UINT8, 8)[0] | |
| fid.seek(20, 1) # 20bytes reserved | |
| n_records = read_from_file_or_buffer(fid, INT64, 1)[0] | |
| # record length in seconds | |
| record_length = read_from_file_or_buffer(fid, UINT32, 2) | |
| if record_length[0] == 0: | |
| record_length[0] = 1.0 | |
| warn( | |
| "Header information is incorrect for record length. " | |
| "Default record length set to 1." | |
| ) | |
| nchan = int(read_from_file_or_buffer(fid, UINT32, 1)[0]) | |
| channels = list(range(nchan)) | |
| ch_names = [_edf_str(fid.read(16)).strip() for ch in channels] | |
| exclude = _find_exclude_idx(ch_names, exclude, include) | |
| sel = np.setdiff1d(np.arange(len(ch_names)), exclude) | |
| fid.seek(80 * len(channels), 1) # transducer | |
| units = [_edf_str(fid.read(8)).strip() for ch in channels] | |
| edf_info["units"] = list() | |
| for i, unit in enumerate(units): | |
| if i in exclude: | |
| continue | |
| if unit[:2] == "uV": | |
| edf_info["units"].append(1e-6) | |
| else: | |
| edf_info["units"].append(1) | |
| edf_info["units"] = np.array(edf_info["units"], float) | |
| ch_names = [ch_names[idx] for idx in sel] | |
| physical_min = read_from_file_or_buffer(fid, FLOAT64, len(channels)) | |
| physical_max = read_from_file_or_buffer(fid, FLOAT64, len(channels)) | |
| digital_min = read_from_file_or_buffer(fid, INT64, len(channels)) | |
| digital_max = read_from_file_or_buffer(fid, INT64, len(channels)) | |
| prefiltering = [_edf_str(fid.read(80)) for ch in channels] | |
| highpass, lowpass = _parse_prefilter_string(prefiltering) | |
| # n samples per record | |
| n_samps = read_from_file_or_buffer(fid, INT32, len(channels)) | |
| # channel data type | |
| dtype = read_from_file_or_buffer(fid, INT32, len(channels)) | |
| # total number of bytes for data | |
| bytes_tot = np.sum( | |
| [GDFTYPE_BYTE[t] * n_samps[i] for i, t in enumerate(dtype)] | |
| ) | |
| # Populate edf_info | |
| dtype_np, dtype_byte = _check_dtype_byte(dtype) | |
| edf_info.update( | |
| bytes_tot=bytes_tot, | |
| ch_names=ch_names, | |
| data_offset=header_nbytes, | |
| digital_min=digital_min, | |
| digital_max=digital_max, | |
| dtype_byte=dtype_byte, | |
| dtype_np=dtype_np, | |
| exclude=exclude, | |
| highpass=highpass, | |
| sel=sel, | |
| lowpass=lowpass, | |
| meas_date=meas_date, | |
| meas_id=meas_id, | |
| n_records=n_records, | |
| n_samps=n_samps, | |
| nchan=nchan, | |
| subject_info=patient, | |
| physical_max=physical_max, | |
| physical_min=physical_min, | |
| record_length=record_length, | |
| ) | |
| fid.seek(32 * edf_info["nchan"], 1) # reserved | |
| assert fid.tell() == header_nbytes | |
| # Event table | |
| # ----------------------------------------------------------------- | |
| etp = header_nbytes + n_records * edf_info["bytes_tot"] | |
| # skip data to go to event table | |
| fid.seek(etp) | |
| etmode = read_from_file_or_buffer(fid, UINT8, 1)[0] | |
| if etmode in (1, 3): | |
| sr = read_from_file_or_buffer(fid, UINT8, 3).astype(np.uint32) | |
| event_sr = sr[0] | |
| for i in range(1, len(sr)): | |
| event_sr = event_sr + sr[i] * 2 ** (i * 8) | |
| n_events = read_from_file_or_buffer(fid, UINT32, 1)[0] | |
| pos = ( | |
| read_from_file_or_buffer(fid, UINT32, n_events) - 1 | |
| ) # 1-based inds | |
| typ = read_from_file_or_buffer(fid, UINT16, n_events) | |
| if etmode == 3: | |
| chn = read_from_file_or_buffer(fid, UINT16, n_events) | |
| dur = read_from_file_or_buffer(fid, UINT32, n_events) | |
| else: | |
| chn = np.zeros(n_events, dtype=np.int32) | |
| dur = np.ones(n_events, dtype=UINT32) | |
| np.maximum(dur, 1, out=dur) | |
| events = [n_events, pos, typ, chn, dur] | |
| # GDF 2.x | |
| # --------------------------------------------------------------------- | |
| else: | |
| # FIXED HEADER | |
| handedness = ("Unknown", "Right", "Left", "Equal") | |
| gender = ("Unknown", "Male", "Female") | |
| scale = ("Unknown", "No", "Yes", "Corrected") | |
| # date | |
| pid = fid.read(66).decode() | |
| pid = pid.split(" ", 2) | |
| patient = {} | |
| if len(pid) >= 2: | |
| patient["id"] = pid[0] | |
| patient["name"] = pid[1] | |
| fid.seek(10, 1) # 10bytes reserved | |
| # Smoking / Alcohol abuse / drug abuse / medication | |
| sadm = read_from_file_or_buffer(fid, UINT8, 1)[0] | |
| patient["smoking"] = scale[sadm % 4] | |
| patient["alcohol_abuse"] = scale[(sadm >> 2) % 4] | |
| patient["drug_abuse"] = scale[(sadm >> 4) % 4] | |
| patient["medication"] = scale[(sadm >> 6) % 4] | |
| patient["weight"] = read_from_file_or_buffer(fid, UINT8, 1)[0] | |
| if patient["weight"] == 0 or patient["weight"] == 255: | |
| patient["weight"] = None | |
| patient["height"] = read_from_file_or_buffer(fid, UINT8, 1)[0] | |
| if patient["height"] == 0 or patient["height"] == 255: | |
| patient["height"] = None | |
| # Gender / Handedness / Visual Impairment | |
| ghi = read_from_file_or_buffer(fid, UINT8, 1)[0] | |
| patient["sex"] = gender[ghi % 4] | |
| patient["handedness"] = handedness[(ghi >> 2) % 4] | |
| patient["visual"] = scale[(ghi >> 4) % 4] | |
| # Recording identification | |
| meas_id = {} | |
| meas_id["recording_id"] = _edf_str(fid.read(64)).strip() | |
| vhsv = read_from_file_or_buffer(fid, UINT8, 4) | |
| loc = {} | |
| if vhsv[3] == 0: | |
| loc["vertpre"] = 10 * int(vhsv[0] >> 4) + int(vhsv[0] % 16) | |
| loc["horzpre"] = 10 * int(vhsv[1] >> 4) + int(vhsv[1] % 16) | |
| loc["size"] = 10 * int(vhsv[2] >> 4) + int(vhsv[2] % 16) | |
| else: | |
| loc["vertpre"] = 29 | |
| loc["horzpre"] = 29 | |
| loc["size"] = 29 | |
| loc["version"] = 0 | |
| loc["latitude"] = ( | |
| float(read_from_file_or_buffer(fid, UINT32, 1)[0]) / 3600000 | |
| ) | |
| loc["longitude"] = ( | |
| float(read_from_file_or_buffer(fid, UINT32, 1)[0]) / 3600000 | |
| ) | |
| loc["altitude"] = float(read_from_file_or_buffer(fid, INT32, 1)[0]) / 100 | |
| meas_id["loc"] = loc | |
| meas_date = read_from_file_or_buffer(fid, UINT64, 1)[0] | |
| if meas_date != 0: | |
| meas_date = datetime(1, 1, 1, tzinfo=timezone.utc) + timedelta( | |
| meas_date * pow(2, -32) - 367 | |
| ) | |
| else: | |
| meas_date = None | |
| birthday = read_from_file_or_buffer(fid, UINT64, 1).tolist()[0] | |
| if birthday == 0: | |
| birthday = datetime(1, 1, 1, tzinfo=timezone.utc) | |
| else: | |
| birthday = datetime(1, 1, 1, tzinfo=timezone.utc) + timedelta( | |
| birthday * pow(2, -32) - 367 | |
| ) | |
| patient["birthday"] = birthday | |
| if patient["birthday"] != datetime(1, 1, 1, 0, 0, tzinfo=timezone.utc): | |
| today = datetime.now(tz=timezone.utc) | |
| patient["age"] = today.year - patient["birthday"].year | |
| # fudge the day by -1 if today happens to be a leap day | |
| day = 28 if today.month == 2 and today.day == 29 else today.day | |
| today = today.replace(year=patient["birthday"].year, day=day) | |
| if today < patient["birthday"]: | |
| patient["age"] -= 1 | |
| else: | |
| patient["age"] = None | |
| header_nbytes = read_from_file_or_buffer(fid, UINT16, 1)[0] * 256 | |
| fid.seek(6, 1) # 6 bytes reserved | |
| meas_id["equipment"] = read_from_file_or_buffer(fid, UINT8, 8) | |
| meas_id["ip"] = read_from_file_or_buffer(fid, UINT8, 6) | |
| patient["headsize"] = read_from_file_or_buffer(fid, UINT16, 3) | |
| patient["headsize"] = np.asarray(patient["headsize"], np.float32) | |
| patient["headsize"] = np.ma.masked_array( | |
| patient["headsize"], np.equal(patient["headsize"], 0), None | |
| ).filled() | |
| ref = read_from_file_or_buffer(fid, FLOAT32, 3) | |
| gnd = read_from_file_or_buffer(fid, FLOAT32, 3) | |
| n_records = read_from_file_or_buffer(fid, INT64, 1)[0] | |
| # record length in seconds | |
| record_length = read_from_file_or_buffer(fid, UINT32, 2) | |
| if record_length[0] == 0: | |
| record_length[0] = 1.0 | |
| warn( | |
| "Header information is incorrect for record length. " | |
| "Default record length set to 1." | |
| ) | |
| nchan = int(read_from_file_or_buffer(fid, UINT16, 1)[0]) | |
| fid.seek(2, 1) # 2bytes reserved | |
| # Channels (variable header) | |
| channels = list(range(nchan)) | |
| ch_names = [_edf_str(fid.read(16)).strip() for ch in channels] | |
| exclude = _find_exclude_idx(ch_names, exclude, include) | |
| sel = np.setdiff1d(np.arange(len(ch_names)), exclude) | |
| fid.seek(80 * len(channels), 1) # reserved space | |
| fid.seek(6 * len(channels), 1) # phys_dim, obsolete | |
| """The Physical Dimensions are encoded as int16, according to: | |
| - Units codes : | |
| https://sourceforge.net/p/biosig/svn/HEAD/tree/trunk/biosig/doc/units.csv | |
| - Decimal factors codes: | |
| https://sourceforge.net/p/biosig/svn/HEAD/tree/trunk/biosig/doc/DecimalFactors.txt | |
| """ # noqa | |
| units = read_from_file_or_buffer(fid, UINT16, len(channels)).tolist() | |
| unitcodes = np.array(units[:]) | |
| edf_info["units"] = list() | |
| for i, unit in enumerate(units): | |
| if i in exclude: | |
| continue | |
| if unit == 4275: # microvolts | |
| edf_info["units"].append(1e-6) | |
| elif unit == 4274: # millivolts | |
| edf_info["units"].append(1e-3) | |
| elif unit == 512: # dimensionless | |
| edf_info["units"].append(1) | |
| elif unit == 0: | |
| edf_info["units"].append(1) # unrecognized | |
| else: | |
| warn( | |
| f"Unsupported physical dimension for channel {i} " | |
| "(assuming dimensionless). Please contact the " | |
| "MNE-Python developers for support." | |
| ) | |
| edf_info["units"].append(1) | |
| edf_info["units"] = np.array(edf_info["units"], float) | |
| ch_names = [ch_names[idx] for idx in sel] | |
| physical_min = read_from_file_or_buffer(fid, FLOAT64, len(channels)) | |
| physical_max = read_from_file_or_buffer(fid, FLOAT64, len(channels)) | |
| digital_min = read_from_file_or_buffer(fid, FLOAT64, len(channels)) | |
| digital_max = read_from_file_or_buffer(fid, FLOAT64, len(channels)) | |
| fid.seek(68 * len(channels), 1) # obsolete | |
| lowpass = read_from_file_or_buffer(fid, FLOAT32, len(channels)) | |
| highpass = read_from_file_or_buffer(fid, FLOAT32, len(channels)) | |
| notch = read_from_file_or_buffer(fid, FLOAT32, len(channels)) | |
| # number of samples per record | |
| n_samps = read_from_file_or_buffer(fid, INT32, len(channels)) | |
| # data type | |
| dtype = read_from_file_or_buffer(fid, INT32, len(channels)) | |
| channel = {} | |
| channel["xyz"] = [ | |
| read_from_file_or_buffer(fid, FLOAT32, 3)[0] for ch in channels | |
| ] | |
| if edf_info["number"] < 2.19: | |
| impedance = read_from_file_or_buffer(fid, UINT8, len(channels)).astype( | |
| float | |
| ) | |
| impedance[impedance == 255] = np.nan | |
| channel["impedance"] = pow(2, impedance / 8) | |
| fid.seek(19 * len(channels), 1) # reserved | |
| else: | |
| tmp = read_from_file_or_buffer(fid, FLOAT32, 5 * len(channels)) | |
| tmp = tmp[::5] | |
| fZ = tmp[:] | |
| impedance = tmp[:] | |
| # channels with no voltage (code 4256) data | |
| ch = [unitcodes & 65504 != 4256][0] | |
| impedance[np.where(ch)] = None | |
| # channel with no impedance (code 4288) data | |
| ch = [unitcodes & 65504 != 4288][0] | |
| fZ[np.where(ch)[0]] = None | |
| assert fid.tell() == header_nbytes | |
| # total number of bytes for data | |
| bytes_tot = np.sum( | |
| [GDFTYPE_BYTE[t] * n_samps[i] for i, t in enumerate(dtype)] | |
| ) | |
| # Populate edf_info | |
| dtype_np, dtype_byte = _check_dtype_byte(dtype) | |
| edf_info.update( | |
| bytes_tot=bytes_tot, | |
| ch_names=ch_names, | |
| data_offset=header_nbytes, | |
| dtype_byte=dtype_byte, | |
| dtype_np=dtype_np, | |
| digital_min=digital_min, | |
| digital_max=digital_max, | |
| exclude=exclude, | |
| gnd=gnd, | |
| highpass=highpass, | |
| sel=sel, | |
| impedance=impedance, | |
| lowpass=lowpass, | |
| meas_date=meas_date, | |
| meas_id=meas_id, | |
| n_records=n_records, | |
| n_samps=n_samps, | |
| nchan=nchan, | |
| notch=notch, | |
| subject_info=patient, | |
| physical_max=physical_max, | |
| physical_min=physical_min, | |
| record_length=record_length, | |
| ref=ref, | |
| ) | |
| # EVENT TABLE | |
| # ----------------------------------------------------------------- | |
| etp = ( | |
| edf_info["data_offset"] + edf_info["n_records"] * edf_info["bytes_tot"] | |
| ) | |
| fid.seek(etp) # skip data to go to event table | |
| etmode = fid.read(1) | |
| if isinstance(etmode, (bytes, bytearray)) and len(etmode) == 1: | |
| etmode = np.frombuffer(etmode, dtype=UINT8).tolist()[0] | |
| if edf_info["number"] < 1.94: | |
| sr = read_from_file_or_buffer(fid, UINT8, 3) | |
| event_sr = sr[0] | |
| for i in range(1, len(sr)): | |
| event_sr = event_sr + sr[i] * 2 ** (i * 8) | |
| n_events = read_from_file_or_buffer(fid, UINT32, 1)[0] | |
| else: | |
| ne = read_from_file_or_buffer(fid, UINT8, 3) | |
| n_events = sum(int(ne[i]) << (i * 8) for i in range(len(ne))) | |
| event_sr = read_from_file_or_buffer(fid, FLOAT32, 1)[0] | |
| pos = ( | |
| read_from_file_or_buffer(fid, UINT32, n_events) - 1 | |
| ) # 1-based inds | |
| typ = read_from_file_or_buffer(fid, UINT16, n_events) | |
| if etmode == 3: | |
| chn = read_from_file_or_buffer(fid, UINT16, n_events) | |
| dur = read_from_file_or_buffer(fid, UINT32, n_events) | |
| else: | |
| chn = np.zeros(n_events, dtype=np.uint32) | |
| dur = np.ones(n_events, dtype=np.uint32) | |
| np.maximum(dur, 1, out=dur) | |
| events = [n_events, pos, typ, chn, dur] | |
| edf_info["event_sfreq"] = event_sr | |
| edf_info.update(events=events, sel=np.arange(len(edf_info["ch_names"]))) | |
| return edf_info | |
| def _check_stim_channel( | |
| stim_channel, | |
| ch_names, | |
| tal_ch_names=("EDF Annotations", "BDF Annotations"), | |
| ): | |
| """Check that the stimulus channel exists in the current datafile.""" | |
| DEFAULT_STIM_CH_NAMES = ["status", "trigger"] | |
| if stim_channel is None or stim_channel is False: | |
| return [], [] | |
| if stim_channel is True: # convenient aliases | |
| stim_channel = "auto" | |
| elif isinstance(stim_channel, str): | |
| if stim_channel == "auto": | |
| if "auto" in ch_names: | |
| warn( | |
| RuntimeWarning, | |
| "Using `stim_channel='auto'` when auto" | |
| " also corresponds to a channel name is ambiguous." | |
| " Please use `stim_channel=['auto']`.", | |
| ) | |
| else: | |
| valid_stim_ch_names = DEFAULT_STIM_CH_NAMES | |
| else: | |
| valid_stim_ch_names = [stim_channel.lower()] | |
| elif isinstance(stim_channel, int): | |
| valid_stim_ch_names = [ch_names[stim_channel].lower()] | |
| elif isinstance(stim_channel, list): | |
| if all([isinstance(s, str) for s in stim_channel]): | |
| valid_stim_ch_names = [s.lower() for s in stim_channel] | |
| elif all([isinstance(s, int) for s in stim_channel]): | |
| valid_stim_ch_names = [ch_names[s].lower() for s in stim_channel] | |
| else: | |
| raise ValueError("Invalid stim_channel") | |
| else: | |
| raise ValueError("Invalid stim_channel") | |
| # Forbid the synthesis of stim channels from TAL Annotations | |
| tal_ch_names_found = [ | |
| ch for ch in valid_stim_ch_names if ch in [t.lower() for t in tal_ch_names] | |
| ] | |
| if len(tal_ch_names_found): | |
| _msg = ( | |
| "The synthesis of the stim channel is not supported since 0.18. Please " | |
| f"remove {tal_ch_names_found} from `stim_channel` and use " | |
| "`mne.events_from_annotations` instead." | |
| ) | |
| raise ValueError(_msg) | |
| ch_names_low = [ch.lower() for ch in ch_names] | |
| found = list(set(valid_stim_ch_names) & set(ch_names_low)) | |
| if not found: | |
| return [], [] | |
| else: | |
| stim_channel_idxs = [ch_names_low.index(f) for f in found] | |
| names = [ch_names[idx] for idx in stim_channel_idxs] | |
| return stim_channel_idxs, names | |
| def _find_exclude_idx(ch_names, exclude, include=None): | |
| """Find indices of all channels to exclude. | |
| If there are several channels called "A" and we want to exclude "A", then | |
| add (the index of) all "A" channels to the exclusion list. | |
| """ | |
| if include: # find other than include channels | |
| if exclude: | |
| raise ValueError( | |
| f"'exclude' must be empty if 'include' is assigned. Got {exclude}." | |
| ) | |
| if isinstance(include, str): # regex for channel names | |
| indices_include = [] | |
| for idx, ch in enumerate(ch_names): | |
| if re.match(include, ch): | |
| indices_include.append(idx) | |
| indices = np.setdiff1d(np.arange(len(ch_names)), indices_include) | |
| return indices | |
| # list of channel names | |
| return [idx for idx, ch in enumerate(ch_names) if ch not in include] | |
| if isinstance(exclude, str): # regex for channel names | |
| indices = [] | |
| for idx, ch in enumerate(ch_names): | |
| if re.match(exclude, ch): | |
| indices.append(idx) | |
| return indices | |
| # list of channel names | |
| return [idx for idx, ch in enumerate(ch_names) if ch in exclude] | |
| def _find_tal_idx(ch_names): | |
| # Annotations / TAL Channels | |
| accepted_tal_ch_names = ["EDF Annotations", "BDF Annotations"] | |
| tal_channel_idx = np.where(np.isin(ch_names, accepted_tal_ch_names))[0] | |
| return tal_channel_idx | |
| def _check_args(input_fname, preload, target_ext): | |
| if not _file_like(input_fname): | |
| input_fname = _check_fname(fname=input_fname, overwrite="read", must_exist=True) | |
| ext = input_fname.suffix[1:].lower() | |
| if ext != target_ext: | |
| raise NotImplementedError( | |
| f"Only {target_ext.upper()} files are supported, got {ext}." | |
| ) | |
| else: | |
| if not preload: | |
| raise ValueError("preload must be used with file-like objects") | |
| def read_raw_edf( | |
| input_fname, | |
| eog=None, | |
| misc=None, | |
| stim_channel="auto", | |
| exclude=(), | |
| infer_types=False, | |
| include=None, | |
| preload=False, | |
| units=None, | |
| encoding="utf8", | |
| exclude_after_unique=False, | |
| *, | |
| verbose=None, | |
| ) -> RawEDF: | |
| """Reader function for EDF and EDF+ files. | |
| Parameters | |
| ---------- | |
| input_fname : path-like | |
| Path to the EDF or EDF+ file or EDF/EDF+ file itself. If a file-like | |
| object is provided, preload must be used. | |
| .. versionchanged:: 1.10 | |
| Added support for file-like objects | |
| eog : list or tuple | |
| Names of channels or list of indices that should be designated EOG | |
| channels. Values should correspond to the electrodes in the file. | |
| Default is None. | |
| misc : list or tuple | |
| Names of channels or list of indices that should be designated MISC | |
| channels. Values should correspond to the electrodes in the file. | |
| Default is None. | |
| stim_channel : ``'auto'`` | str | list of str | int | list of int | |
| Defaults to ``'auto'``, which means that channels named ``'status'`` or | |
| ``'trigger'`` (case insensitive) are set to STIM. If str (or list of | |
| str), all channels matching the name(s) are set to STIM. If int (or | |
| list of ints), channels corresponding to the indices are set to STIM. | |
| exclude : list of str | str | |
| Channel names to exclude. This can help when reading data with | |
| different sampling rates to avoid unnecessary resampling. A str is | |
| interpreted as a regular expression. | |
| infer_types : bool | |
| If True, try to infer channel types from channel labels. If a channel | |
| label starts with a known type (such as 'EEG') followed by a space and | |
| a name (such as 'Fp1'), the channel type will be set accordingly, and | |
| the channel will be renamed to the original label without the prefix. | |
| For unknown prefixes, the type will be 'EEG' and the name will not be | |
| modified. If False, do not infer types and assume all channels are of | |
| type 'EEG'. | |
| .. versionadded:: 0.24.1 | |
| include : list of str | str | |
| Channel names to be included. A str is interpreted as a regular | |
| expression. 'exclude' must be empty if include is assigned. | |
| .. versionadded:: 1.1 | |
| %(preload)s | |
| %(units_edf_bdf_io)s | |
| %(encoding_edf)s | |
| %(exclude_after_unique)s | |
| %(verbose)s | |
| Returns | |
| ------- | |
| raw : instance of RawEDF | |
| The raw instance. | |
| See :class:`mne.io.Raw` for documentation of attributes and methods. | |
| See Also | |
| -------- | |
| mne.io.read_raw_bdf : Reader function for BDF files. | |
| mne.io.read_raw_gdf : Reader function for GDF files. | |
| mne.export.export_raw : Export function for EDF files. | |
| mne.io.Raw : Documentation of attributes and methods of RawEDF. | |
| Notes | |
| ----- | |
| %(edf_resamp_note)s | |
| It is worth noting that in some special cases, it may be necessary to shift | |
| event values in order to retrieve correct event triggers. This depends on | |
| the triggering device used to perform the synchronization. For instance, in | |
| some files events need to be shifted by 8 bits: | |
| >>> events[:, 2] >>= 8 # doctest:+SKIP | |
| TAL channels called 'EDF Annotations' are parsed and extracted annotations | |
| are stored in raw.annotations. Use :func:`mne.events_from_annotations` to | |
| obtain events from these annotations. | |
| If channels named 'status' or 'trigger' are present, they are considered as | |
| STIM channels by default. Use func:`mne.find_events` to parse events | |
| encoded in such analog stim channels. | |
| The EDF specification allows optional storage of channel types in the | |
| prefix of the signal label for each channel. For example, ``EEG Fz`` | |
| implies that ``Fz`` is an EEG channel and ``MISC E`` would imply ``E`` is | |
| a MISC channel. However, there is no standard way of specifying all | |
| channel types. MNE-Python will try to infer the channel type, when such a | |
| string exists, defaulting to EEG, when there is no prefix or the prefix is | |
| not recognized. | |
| The following prefix strings are mapped to MNE internal types: | |
| - 'EEG': 'eeg' | |
| - 'SEEG': 'seeg' | |
| - 'ECOG': 'ecog' | |
| - 'DBS': 'dbs' | |
| - 'EOG': 'eog' | |
| - 'ECG': 'ecg' | |
| - 'EMG': 'emg' | |
| - 'BIO': 'bio' | |
| - 'RESP': 'resp' | |
| - 'MISC': 'misc' | |
| - 'SAO2': 'bio' | |
| The EDF specification allows storage of subseconds in measurement date. | |
| However, this reader currently sets subseconds to 0 by default. | |
| """ | |
| _check_args(input_fname, preload, "edf") | |
| return RawEDF( | |
| input_fname=input_fname, | |
| eog=eog, | |
| misc=misc, | |
| stim_channel=stim_channel, | |
| exclude=exclude, | |
| infer_types=infer_types, | |
| preload=preload, | |
| include=include, | |
| units=units, | |
| encoding=encoding, | |
| exclude_after_unique=exclude_after_unique, | |
| verbose=verbose, | |
| ) | |
| def read_raw_bdf( | |
| input_fname, | |
| eog=None, | |
| misc=None, | |
| stim_channel="auto", | |
| exclude=(), | |
| infer_types=False, | |
| include=None, | |
| preload=False, | |
| units=None, | |
| encoding="utf8", | |
| exclude_after_unique=False, | |
| *, | |
| verbose=None, | |
| ) -> RawBDF: | |
| """Reader function for BDF files. | |
| Parameters | |
| ---------- | |
| input_fname : path-like | file-like | |
| Path to the BDF file of BDF file itself. If a file-like object is | |
| provided, preload must be used. | |
| .. versionchanged:: 1.10 | |
| Added support for file-like objects | |
| eog : list or tuple | |
| Names of channels or list of indices that should be designated EOG | |
| channels. Values should correspond to the electrodes in the file. | |
| Default is None. | |
| misc : list or tuple | |
| Names of channels or list of indices that should be designated MISC | |
| channels. Values should correspond to the electrodes in the file. | |
| Default is None. | |
| stim_channel : ``'auto'`` | str | list of str | int | list of int | |
| Defaults to ``'auto'``, which means that channels named ``'status'`` or | |
| ``'trigger'`` (case insensitive) are set to STIM. If str (or list of | |
| str), all channels matching the name(s) are set to STIM. If int (or | |
| list of ints), channels corresponding to the indices are set to STIM. | |
| exclude : list of str | str | |
| Channel names to exclude. This can help when reading data with | |
| different sampling rates to avoid unnecessary resampling. A str is | |
| interpreted as a regular expression. | |
| infer_types : bool | |
| If True, try to infer channel types from channel labels. If a channel | |
| label starts with a known type (such as 'EEG') followed by a space and | |
| a name (such as 'Fp1'), the channel type will be set accordingly, and | |
| the channel will be renamed to the original label without the prefix. | |
| For unknown prefixes, the type will be 'EEG' and the name will not be | |
| modified. If False, do not infer types and assume all channels are of | |
| type 'EEG'. | |
| .. versionadded:: 0.24.1 | |
| include : list of str | str | |
| Channel names to be included. A str is interpreted as a regular | |
| expression. 'exclude' must be empty if include is assigned. | |
| .. versionadded:: 1.1 | |
| %(preload)s | |
| %(units_edf_bdf_io)s | |
| %(encoding_edf)s | |
| %(exclude_after_unique)s | |
| %(verbose)s | |
| Returns | |
| ------- | |
| raw : instance of RawEDF | |
| The raw instance. | |
| See :class:`mne.io.Raw` for documentation of attributes and methods. | |
| See Also | |
| -------- | |
| mne.io.read_raw_edf : Reader function for EDF and EDF+ files. | |
| mne.io.read_raw_gdf : Reader function for GDF files. | |
| mne.io.Raw : Documentation of attributes and methods of RawEDF. | |
| Notes | |
| ----- | |
| :class:`mne.io.Raw` only stores signals with matching sampling frequencies. | |
| Therefore, if mixed sampling frequency signals are requested, all signals | |
| are upsampled to the highest loaded sampling frequency. In this case, using | |
| preload=True is recommended, as otherwise, edge artifacts appear when | |
| slices of the signal are requested. | |
| Biosemi devices trigger codes are encoded in 16-bit format, whereas system | |
| codes (CMS in/out-of range, battery low, etc.) are coded in bits 16-23 of | |
| the status channel (see http://www.biosemi.com/faq/trigger_signals.htm). | |
| To retrieve correct event values (bits 1-16), one could do: | |
| >>> events = mne.find_events(...) # doctest:+SKIP | |
| >>> events[:, 2] &= (2**16 - 1) # doctest:+SKIP | |
| The above operation can be carried out directly in :func:`mne.find_events` | |
| using the ``mask`` and ``mask_type`` parameters (see | |
| :func:`mne.find_events` for more details). | |
| It is also possible to retrieve system codes, but no particular effort has | |
| been made to decode these in MNE. In case it is necessary, for instance to | |
| check the CMS bit, the following operation can be carried out: | |
| >>> cms_bit = 20 # doctest:+SKIP | |
| >>> cms_high = (events[:, 2] & (1 << cms_bit)) != 0 # doctest:+SKIP | |
| It is worth noting that in some special cases, it may be necessary to shift | |
| event values in order to retrieve correct event triggers. This depends on | |
| the triggering device used to perform the synchronization. For instance, in | |
| some files events need to be shifted by 8 bits: | |
| >>> events[:, 2] >>= 8 # doctest:+SKIP | |
| TAL channels called 'BDF Annotations' are parsed and extracted annotations | |
| are stored in raw.annotations. Use :func:`mne.events_from_annotations` to | |
| obtain events from these annotations. | |
| If channels named 'status' or 'trigger' are present, they are considered as | |
| STIM channels by default. Use func:`mne.find_events` to parse events | |
| encoded in such analog stim channels. | |
| """ | |
| _check_args(input_fname, preload, "bdf") | |
| return RawBDF( | |
| input_fname=input_fname, | |
| eog=eog, | |
| misc=misc, | |
| stim_channel=stim_channel, | |
| exclude=exclude, | |
| infer_types=infer_types, | |
| preload=preload, | |
| include=include, | |
| units=units, | |
| encoding=encoding, | |
| exclude_after_unique=exclude_after_unique, | |
| verbose=verbose, | |
| ) | |
| def read_raw_gdf( | |
| input_fname, | |
| eog=None, | |
| misc=None, | |
| stim_channel="auto", | |
| exclude=(), | |
| include=None, | |
| preload=False, | |
| verbose=None, | |
| ) -> RawGDF: | |
| """Reader function for GDF files. | |
| Parameters | |
| ---------- | |
| input_fname : path-like | file-like | |
| Path to the GDF file or GDF file itself. If a file-like object is | |
| provided, preload must be used. | |
| .. versionchanged:: 1.10 | |
| Added support for file-like objects | |
| eog : list or tuple | |
| Names of channels or list of indices that should be designated EOG | |
| channels. Values should correspond to the electrodes in the file. | |
| Default is None. | |
| misc : list or tuple | |
| Names of channels or list of indices that should be designated MISC | |
| channels. Values should correspond to the electrodes in the file. | |
| Default is None. | |
| stim_channel : ``'auto'`` | str | list of str | int | list of int | |
| Defaults to ``'auto'``, which means that channels named ``'status'`` or | |
| ``'trigger'`` (case insensitive) are set to STIM. If str (or list of | |
| str), all channels matching the name(s) are set to STIM. If int (or | |
| list of ints), channels corresponding to the indices are set to STIM. | |
| exclude : list of str | str | |
| Channel names to exclude. This can help when reading data with | |
| different sampling rates to avoid unnecessary resampling. A str is | |
| interpreted as a regular expression. | |
| include : list of str | str | |
| Channel names to be included. A str is interpreted as a regular | |
| expression. 'exclude' must be empty if include is assigned. | |
| %(preload)s | |
| %(verbose)s | |
| Returns | |
| ------- | |
| raw : instance of RawGDF | |
| The raw instance. | |
| See :class:`mne.io.Raw` for documentation of attributes and methods. | |
| See Also | |
| -------- | |
| mne.io.read_raw_edf : Reader function for EDF and EDF+ files. | |
| mne.io.read_raw_bdf : Reader function for BDF files. | |
| mne.io.Raw : Documentation of attributes and methods of RawGDF. | |
| Notes | |
| ----- | |
| If channels named 'status' or 'trigger' are present, they are considered as | |
| STIM channels by default. Use func:`mne.find_events` to parse events | |
| encoded in such analog stim channels. | |
| """ | |
| _check_args(input_fname, preload, "gdf") | |
| return RawGDF( | |
| input_fname=input_fname, | |
| eog=eog, | |
| misc=misc, | |
| stim_channel=stim_channel, | |
| exclude=exclude, | |
| preload=preload, | |
| include=include, | |
| verbose=verbose, | |
| ) | |
| def _read_annotations_edf(annotations, ch_names=None, encoding="utf8"): | |
| """Annotation File Reader. | |
| Parameters | |
| ---------- | |
| annotations : ndarray (n_chans, n_samples) | str | |
| Channel data in EDF+ TAL format or path to annotation file. | |
| ch_names : list of string | |
| List of channels' names. | |
| %(encoding_edf)s | |
| Returns | |
| ------- | |
| annot : instance of Annotations | |
| The annotations. | |
| """ | |
| pat = "([+-]\\d+\\.?\\d*)(\x15(\\d+\\.?\\d*))?(\x14.*?)\x14\x00" | |
| if isinstance(annotations, str | Path): | |
| with open(annotations, "rb") as annot_file: | |
| triggers = re.findall(pat.encode(), annot_file.read()) | |
| triggers = [tuple(map(lambda x: x.decode(encoding), t)) for t in triggers] | |
| else: | |
| tals = bytearray() | |
| annotations = np.atleast_2d(annotations) | |
| for chan in annotations: | |
| this_chan = chan.ravel() | |
| if this_chan.dtype == INT32: # BDF | |
| this_chan = this_chan.view(dtype=UINT8) | |
| this_chan = this_chan.reshape(-1, 4) | |
| # Why only keep the first 3 bytes as BDF values | |
| # are stored with 24 bits (not 32) | |
| this_chan = this_chan[:, :3].ravel() | |
| # As ravel() returns a 1D array we can add all values at once | |
| tals.extend(this_chan) | |
| else: | |
| this_chan = chan.astype(np.int64) | |
| # Exploit np vectorized processing | |
| tals.extend(np.uint8([this_chan % 256, this_chan // 256]).flatten("F")) | |
| try: | |
| triggers = re.findall(pat, tals.decode(encoding)) | |
| except UnicodeDecodeError as e: | |
| raise Exception( | |
| "Encountered invalid byte in at least one annotations channel." | |
| " You might want to try setting \"encoding='latin1'\"." | |
| ) from e | |
| events = {} | |
| offset = 0.0 | |
| for k, ev in enumerate(triggers): | |
| onset = float(ev[0]) + offset | |
| duration = float(ev[2]) if ev[2] else 0 | |
| for description in ev[3].split("\x14")[1:]: | |
| if description: | |
| if ( | |
| "@@" in description | |
| and ch_names is not None | |
| and description.split("@@")[1] in ch_names | |
| ): | |
| description, ch_name = description.split("@@") | |
| key = f"{onset}_{duration}_{description}" | |
| else: | |
| ch_name = None | |
| key = f"{onset}_{duration}_{description}" | |
| if key in events: | |
| key += f"_{k}" # make key unique | |
| if key in events and ch_name: | |
| events[key][3] += (ch_name,) | |
| else: | |
| events[key] = [ | |
| onset, | |
| duration, | |
| description, | |
| (ch_name,) if ch_name else (), | |
| ] | |
| elif k == 0: | |
| # The startdate/time of a file is specified in the EDF+ header | |
| # fields 'startdate of recording' and 'starttime of recording'. | |
| # These fields must indicate the absolute second in which the | |
| # start of the first data record falls. So, the first TAL in | |
| # the first data record always starts with +0.X, indicating | |
| # that the first data record starts a fraction, X, of a second | |
| # after the startdate/time that is specified in the EDF+ | |
| # header. If X=0, then the .X may be omitted. | |
| offset = -onset | |
| if events: | |
| onset, duration, description, annot_ch_names = zip(*events.values()) | |
| else: | |
| onset, duration, description, annot_ch_names = list(), list(), list(), list() | |
| assert len(onset) == len(duration) == len(description) == len(annot_ch_names) | |
| return Annotations( | |
| onset=onset, | |
| duration=duration, | |
| description=description, | |
| orig_time=None, | |
| ch_names=annot_ch_names, | |
| ) | |
| def _get_annotations_gdf(edf_info, sfreq): | |
| onset, duration, desc = list(), list(), list() | |
| events = edf_info.get("events", None) | |
| # Annotations in GDF: events are stored as the following | |
| # list: `events = [n_events, pos, typ, chn, dur]` where pos is the | |
| # latency, dur is the duration in samples. They both are | |
| # numpy.ndarray | |
| if events is not None and events[1].shape[0] > 0: | |
| onset = events[1] / sfreq | |
| duration = events[4] / sfreq | |
| desc = events[2] | |
| return onset, duration, desc | |