Spaces:
Running
Running
| # Authors: The MNE-Python contributors. | |
| # License: BSD-3-Clause | |
| # Copyright the MNE-Python contributors. | |
| import numpy as np | |
| from ..._fiff._digitization import DigPoint, _ensure_fiducials_head | |
| from ..._fiff.constants import FIFF | |
| from ..._fiff.meas_info import create_info | |
| from ..._fiff.pick import pick_info | |
| from ...transforms import rotation3d_align_z_axis | |
| from ...utils import _check_pandas_installed, warn | |
| _supported_megs = ["neuromag306"] | |
| _unit_dict = { | |
| "m": 1, | |
| "cm": 1e-2, | |
| "mm": 1e-3, | |
| "V": 1, | |
| "mV": 1e-3, | |
| "uV": 1e-6, | |
| "T": 1, | |
| "T/m": 1, | |
| "T/cm": 1e2, | |
| } | |
| NOINFO_WARNING = ( | |
| "Importing FieldTrip data without an info dict from the " | |
| "original file. Channel locations, orientations and types " | |
| "will be incorrect. The imported data cannot be used for " | |
| "source analysis, channel interpolation etc." | |
| ) | |
| def _validate_ft_struct(ft_struct): | |
| """Run validation checks on the ft_structure.""" | |
| if isinstance(ft_struct, list): | |
| raise RuntimeError("Loading of data in cell arrays is not supported") | |
| def _create_info(ft_struct, raw_info): | |
| """Create MNE info structure from a FieldTrip structure.""" | |
| if raw_info is None: | |
| warn(NOINFO_WARNING) | |
| sfreq = _set_sfreq(ft_struct) | |
| ch_names = ft_struct["label"] | |
| if raw_info: | |
| info = raw_info.copy() | |
| missing_channels = set(ch_names) - set(info["ch_names"]) | |
| if missing_channels: | |
| warn( | |
| "The following channels are present in the FieldTrip data " | |
| f"but cannot be found in the provided info: {missing_channels}.\n" | |
| "These channels will be removed from the resulting data!" | |
| ) | |
| missing_chan_idx = [ch_names.index(ch) for ch in missing_channels] | |
| new_chs = [ch for ch in ch_names if ch not in missing_channels] | |
| ch_names = new_chs | |
| ft_struct["label"] = ch_names | |
| if "trial" in ft_struct: | |
| ft_struct["trial"] = _remove_missing_channels_from_trial( | |
| ft_struct["trial"], missing_chan_idx | |
| ) | |
| if "avg" in ft_struct: | |
| if ft_struct["avg"].ndim == 2: | |
| ft_struct["avg"] = np.delete( | |
| ft_struct["avg"], missing_chan_idx, axis=0 | |
| ) | |
| with info._unlock(): | |
| info["sfreq"] = sfreq | |
| ch_idx = [info["ch_names"].index(ch) for ch in ch_names] | |
| pick_info(info, ch_idx, copy=False) | |
| else: | |
| info = create_info(ch_names, sfreq) | |
| chs, dig = _create_info_chs_dig(ft_struct) | |
| with info._unlock(update_redundant=True): | |
| info.update(chs=chs, dig=dig) | |
| return info | |
| def _remove_missing_channels_from_trial(trial, missing_chan_idx): | |
| if isinstance(trial, list): | |
| for idx_trial in range(len(trial)): | |
| trial[idx_trial] = _remove_missing_channels_from_trial( | |
| trial[idx_trial], missing_chan_idx | |
| ) | |
| elif isinstance(trial, np.ndarray): | |
| if trial.ndim == 2: | |
| trial = np.delete(trial, missing_chan_idx, axis=0) | |
| else: | |
| raise ValueError( | |
| '"trial" field of the FieldTrip structure has an unknown format.' | |
| ) | |
| return trial | |
| def _create_info_chs_dig(ft_struct): | |
| """Create the chs info field from the FieldTrip structure.""" | |
| all_channels = ft_struct["label"] | |
| ch_defaults = dict( | |
| coord_frame=FIFF.FIFFV_COORD_UNKNOWN, | |
| cal=1.0, | |
| range=1.0, | |
| unit_mul=FIFF.FIFF_UNITM_NONE, | |
| loc=np.array([0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1]), | |
| unit=FIFF.FIFF_UNIT_V, | |
| ) | |
| try: | |
| elec = ft_struct["elec"] | |
| except KeyError: | |
| elec = None | |
| try: | |
| grad = ft_struct["grad"] | |
| except KeyError: | |
| grad = None | |
| if elec is None and grad is None: | |
| warn( | |
| "The supplied FieldTrip structure does not have an elec or grad " | |
| "field. No channel locations will extracted and the kind of " | |
| "channel might be inaccurate." | |
| ) | |
| if "chanpos" not in (elec or grad or {"chanpos": None}): | |
| raise RuntimeError( | |
| "This file was created with an old version of FieldTrip. You can " | |
| "convert the data to the new version by loading it into FieldTrip " | |
| "and applying ft_selectdata with an empty cfg structure on it. " | |
| "Otherwise you can supply the Info field." | |
| ) | |
| chs = list() | |
| dig = list() | |
| counter = 0 | |
| for idx_chan, cur_channel_label in enumerate(all_channels): | |
| cur_ch = ch_defaults.copy() | |
| cur_ch["ch_name"] = cur_channel_label | |
| cur_ch["logno"] = idx_chan + 1 | |
| cur_ch["scanno"] = idx_chan + 1 | |
| if elec and cur_channel_label in elec["label"]: | |
| cur_ch = _process_channel_eeg(cur_ch, elec) | |
| assert cur_ch["coord_frame"] == FIFF.FIFFV_COORD_HEAD | |
| # Ref gets ident=0 and we don't have it, so start at 1 | |
| counter += 1 | |
| d = DigPoint( | |
| r=cur_ch["loc"][:3], | |
| coord_frame=FIFF.FIFFV_COORD_HEAD, | |
| kind=FIFF.FIFFV_POINT_EEG, | |
| ident=counter, | |
| ) | |
| dig.append(d) | |
| elif grad and cur_channel_label in grad["label"]: | |
| cur_ch = _process_channel_meg(cur_ch, grad) | |
| else: | |
| if cur_channel_label.startswith("EOG"): | |
| cur_ch["kind"] = FIFF.FIFFV_EOG_CH | |
| cur_ch["coil_type"] = FIFF.FIFFV_COIL_EEG | |
| elif cur_channel_label.startswith("ECG"): | |
| cur_ch["kind"] = FIFF.FIFFV_ECG_CH | |
| cur_ch["coil_type"] = FIFF.FIFFV_COIL_EEG_BIPOLAR | |
| elif cur_channel_label.startswith("STI"): | |
| cur_ch["kind"] = FIFF.FIFFV_STIM_CH | |
| cur_ch["coil_type"] = FIFF.FIFFV_COIL_NONE | |
| else: | |
| warn( | |
| f"Cannot guess the correct type of channel {cur_channel_label}. " | |
| "Making it a MISC channel." | |
| ) | |
| cur_ch["kind"] = FIFF.FIFFV_MISC_CH | |
| cur_ch["coil_type"] = FIFF.FIFFV_COIL_NONE | |
| chs.append(cur_ch) | |
| _ensure_fiducials_head(dig) | |
| return chs, dig | |
| def _set_sfreq(ft_struct): | |
| """Set the sample frequency.""" | |
| try: | |
| sfreq = ft_struct["fsample"] | |
| except KeyError: | |
| try: | |
| time = ft_struct["time"] | |
| except KeyError: | |
| raise ValueError("No Source for sfreq found") | |
| else: | |
| t1, t2 = float(time[0]), float(time[1]) | |
| sfreq = 1 / (t2 - t1) | |
| try: | |
| sfreq = float(sfreq) | |
| except TypeError: | |
| warn( | |
| "FieldTrip structure contained multiple sample rates, trying the " | |
| f"first of:\n{sfreq} Hz" | |
| ) | |
| sfreq = float(sfreq.ravel()[0]) | |
| return sfreq | |
| def _set_tmin(ft_struct): | |
| """Set the start time before the event in evoked data if possible.""" | |
| times = ft_struct["time"] | |
| time_check = all(times[i][0] == times[i - 1][0] for i, x in enumerate(times)) | |
| if time_check: | |
| tmin = times[0][0] | |
| else: | |
| raise RuntimeError( | |
| "Loading data with non-uniform times per epoch is not supported" | |
| ) | |
| return tmin | |
| def _create_events(ft_struct, trialinfo_column): | |
| """Create an event matrix from the FieldTrip structure.""" | |
| if "trialinfo" not in ft_struct: | |
| return None | |
| event_type = ft_struct["trialinfo"] | |
| event_number = range(len(event_type)) | |
| if trialinfo_column < 0: | |
| raise ValueError("trialinfo_column must be positive") | |
| available_ti_cols = 1 | |
| if event_type.ndim == 2: | |
| available_ti_cols = event_type.shape[1] | |
| if trialinfo_column > (available_ti_cols - 1): | |
| raise ValueError( | |
| "trialinfo_column is higher than the amount of columns in trialinfo." | |
| ) | |
| event_trans_val = np.zeros(len(event_type)) | |
| if event_type.ndim == 2: | |
| event_type = event_type[:, trialinfo_column] | |
| events = ( | |
| np.vstack([np.array(event_number), event_trans_val, event_type]).astype("int").T | |
| ) | |
| return events | |
| def _create_event_metadata(ft_struct): | |
| """Create event metadata from trialinfo.""" | |
| pandas = _check_pandas_installed(strict=False) | |
| if not pandas: | |
| warn( | |
| "The Pandas library is not installed. Not returning the original " | |
| "trialinfo matrix as metadata." | |
| ) | |
| return None | |
| metadata = pandas.DataFrame(ft_struct["trialinfo"]) | |
| return metadata | |
| def _process_channel_eeg(cur_ch, elec): | |
| """Convert EEG channel from FieldTrip to MNE. | |
| Parameters | |
| ---------- | |
| cur_ch: dict | |
| Channel specific dictionary to populate. | |
| elec: dict | |
| elec dict as loaded from the FieldTrip structure | |
| Returns | |
| ------- | |
| cur_ch: dict | |
| The original dict (cur_ch) with the added information | |
| """ | |
| all_labels = np.asanyarray(elec["label"]) | |
| chan_idx_in_elec = np.where(all_labels == cur_ch["ch_name"])[0][0] | |
| position = np.squeeze(elec["chanpos"][chan_idx_in_elec, :]) | |
| # chanunit = elec['chanunit'][chan_idx_in_elec] # not used/needed yet | |
| position_unit = elec["unit"] | |
| position = position * _unit_dict[position_unit] | |
| cur_ch["loc"] = np.hstack((position, np.zeros((9,)))) | |
| cur_ch["unit"] = FIFF.FIFF_UNIT_V | |
| cur_ch["kind"] = FIFF.FIFFV_EEG_CH | |
| cur_ch["coil_type"] = FIFF.FIFFV_COIL_EEG | |
| cur_ch["coord_frame"] = FIFF.FIFFV_COORD_HEAD | |
| return cur_ch | |
| def _process_channel_meg(cur_ch, grad): | |
| """Convert MEG channel from FieldTrip to MNE. | |
| Parameters | |
| ---------- | |
| cur_ch: dict | |
| Channel specific dictionary to populate. | |
| grad: dict | |
| grad dict as loaded from the FieldTrip structure | |
| Returns | |
| ------- | |
| dict: The original dict (cur_ch) with the added information | |
| """ | |
| all_labels = np.asanyarray(grad["label"]) | |
| chan_idx_in_grad = np.where(all_labels == cur_ch["ch_name"])[0][0] | |
| gradtype = grad["type"] | |
| chantype = grad["chantype"][chan_idx_in_grad] | |
| position_unit = grad["unit"] | |
| position = np.squeeze(grad["chanpos"][chan_idx_in_grad, :]) | |
| position = position * _unit_dict[position_unit] | |
| if gradtype == "neuromag306" and "tra" in grad and "coilpos" in grad: | |
| # Try to regenerate original channel pos. | |
| idx_in_coilpos = np.where(grad["tra"][chan_idx_in_grad, :] != 0)[0] | |
| cur_coilpos = grad["coilpos"][idx_in_coilpos, :] | |
| cur_coilpos = cur_coilpos * _unit_dict[position_unit] | |
| cur_coilori = grad["coilori"][idx_in_coilpos, :] | |
| if chantype == "megmag": | |
| position = cur_coilpos[0] - 0.0003 * cur_coilori[0] | |
| if chantype == "megplanar": | |
| tmp_pos = cur_coilpos - 0.0003 * cur_coilori | |
| position = np.average(tmp_pos, axis=0) | |
| original_orientation = np.squeeze(grad["chanori"][chan_idx_in_grad, :]) | |
| try: | |
| orientation = rotation3d_align_z_axis(original_orientation).T | |
| except AssertionError: | |
| orientation = np.eye(3) | |
| assert orientation.shape == (3, 3) | |
| orientation = orientation.flatten() | |
| # chanunit = grad['chanunit'][chan_idx_in_grad] # not used/needed yet | |
| cur_ch["loc"] = np.hstack((position, orientation)) | |
| cur_ch["kind"] = FIFF.FIFFV_MEG_CH | |
| if chantype == "megmag": | |
| cur_ch["coil_type"] = FIFF.FIFFV_COIL_POINT_MAGNETOMETER | |
| cur_ch["unit"] = FIFF.FIFF_UNIT_T | |
| elif chantype == "megplanar": | |
| cur_ch["coil_type"] = FIFF.FIFFV_COIL_VV_PLANAR_T1 | |
| cur_ch["unit"] = FIFF.FIFF_UNIT_T_M | |
| elif chantype == "refmag": | |
| cur_ch["coil_type"] = FIFF.FIFFV_COIL_MAGNES_REF_MAG | |
| cur_ch["unit"] = FIFF.FIFF_UNIT_T | |
| elif chantype == "refgrad": | |
| cur_ch["coil_type"] = FIFF.FIFFV_COIL_MAGNES_REF_GRAD | |
| cur_ch["unit"] = FIFF.FIFF_UNIT_T | |
| elif chantype == "meggrad": | |
| cur_ch["coil_type"] = FIFF.FIFFV_COIL_AXIAL_GRAD_5CM | |
| cur_ch["unit"] = FIFF.FIFF_UNIT_T | |
| else: | |
| raise RuntimeError(f"Unexpected coil type: {chantype}.") | |
| cur_ch["coord_frame"] = FIFF.FIFFV_COORD_HEAD | |
| return cur_ch | |