Spaces:
Running
Running
| """Conversion tool from SQD to FIF. | |
| RawKIT class is adapted from Denis Engemann et al.'s mne_bti2fiff.py. | |
| """ | |
| # Authors: The MNE-Python contributors. | |
| # License: BSD-3-Clause | |
| # Copyright the MNE-Python contributors. | |
| from collections import OrderedDict, defaultdict | |
| from math import cos, sin | |
| from os import SEEK_CUR, PathLike | |
| from os import path as op | |
| from pathlib import Path | |
| import numpy as np | |
| from ..._fiff.constants import FIFF | |
| from ..._fiff.meas_info import _empty_info | |
| from ..._fiff.pick import pick_types | |
| from ..._fiff.utils import _mult_cal_one | |
| from ...epochs import BaseEpochs | |
| from ...event import read_events | |
| from ...transforms import Transform, als_ras_trans, apply_trans | |
| from ...utils import ( | |
| _check_fname, | |
| _check_option, | |
| _stamp_to_dt, | |
| fill_doc, | |
| logger, | |
| verbose, | |
| warn, | |
| ) | |
| from ..base import BaseRaw | |
| from .constants import KIT, LEGACY_AMP_PARAMS | |
| from .coreg import _set_dig_kit, read_mrk | |
| FLOAT64 = "<f8" | |
| UINT32 = "<u4" | |
| INT32 = "<i4" | |
| def _call_digitization(info, mrk, elp, hsp, kit_info, *, bad_coils=()): | |
| # Use values from kit_info only if all others are None | |
| if mrk is None and elp is None and hsp is None: | |
| mrk = kit_info.get("mrk", None) | |
| elp = kit_info.get("elp", None) | |
| hsp = kit_info.get("hsp", None) | |
| # prepare mrk | |
| if isinstance(mrk, list): | |
| mrk = [ | |
| read_mrk(marker) if isinstance(marker, str | Path | PathLike) else marker | |
| for marker in mrk | |
| ] | |
| mrk = np.mean(mrk, axis=0) | |
| # setup digitization | |
| if mrk is not None and elp is not None and hsp is not None: | |
| with info._unlock(): | |
| info["dig"], info["dev_head_t"], info["hpi_results"] = _set_dig_kit( | |
| mrk, | |
| elp, | |
| hsp, | |
| kit_info["eeg_dig"], | |
| bad_coils=bad_coils, | |
| ) | |
| elif mrk is not None or elp is not None or hsp is not None: | |
| raise ValueError( | |
| "mrk, elp and hsp need to be provided as a group (all or none)" | |
| ) | |
| return info | |
| class UnsupportedKITFormat(ValueError): | |
| """Our reader is not guaranteed to work with old files.""" | |
| def __init__(self, sqd_version, *args, **kwargs): | |
| self.sqd_version = sqd_version | |
| ValueError.__init__(self, *args, **kwargs) | |
| class RawKIT(BaseRaw): | |
| r"""Raw object from KIT SQD file. | |
| Parameters | |
| ---------- | |
| input_fname : path-like | |
| Path to the SQD file. | |
| %(kit_mrk)s | |
| %(kit_elp)s | |
| %(kit_hsp)s | |
| %(kit_stim)s | |
| %(kit_slope)s | |
| %(kit_stimthresh)s | |
| %(preload)s | |
| %(kit_stimcode)s | |
| allow_unknown_format : bool | |
| Force reading old data that is not officially supported. Alternatively, | |
| read and re-save the data with the KIT MEG Laboratory application. | |
| %(standardize_names)s | |
| %(kit_badcoils)s | |
| %(verbose)s | |
| Notes | |
| ----- | |
| ``elp`` and ``hsp`` are usually the exported text files (*.txt) from the | |
| Polhemus FastScan system. ``hsp`` refers to the headshape surface points. | |
| ``elp`` refers to the points in head-space that corresponds to the HPI | |
| points. | |
| If ``mrk``\, ``hsp`` or ``elp`` are :term:`array_like` inputs, then the | |
| numbers in xyz coordinates should be in units of meters. | |
| See Also | |
| -------- | |
| mne.io.Raw : Documentation of attributes and methods. | |
| """ | |
| _extra_attributes = ("read_stim_ch",) | |
| def __init__( | |
| self, | |
| input_fname, | |
| mrk=None, | |
| elp=None, | |
| hsp=None, | |
| stim=">", | |
| slope="-", | |
| stimthresh=1, | |
| preload=False, | |
| stim_code="binary", | |
| allow_unknown_format=False, | |
| standardize_names=None, | |
| *, | |
| bad_coils=(), | |
| verbose=None, | |
| ): | |
| logger.info(f"Extracting SQD Parameters from {input_fname}...") | |
| input_fname = op.abspath(input_fname) | |
| self.preload = False | |
| logger.info("Creating Raw.info structure...") | |
| info, kit_info = get_kit_info( | |
| input_fname, allow_unknown_format, standardize_names | |
| ) | |
| kit_info["slope"] = slope | |
| kit_info["stimthresh"] = stimthresh | |
| if kit_info["acq_type"] != KIT.CONTINUOUS: | |
| raise TypeError("SQD file contains epochs, not raw data. Wrong reader.") | |
| logger.info("Creating Info structure...") | |
| last_samps = [kit_info["n_samples"] - 1] | |
| self._raw_extras = [kit_info] | |
| _set_stimchannels(self, info, stim, stim_code) | |
| super().__init__( | |
| info, | |
| preload, | |
| last_samps=last_samps, | |
| filenames=[input_fname], | |
| raw_extras=self._raw_extras, | |
| verbose=verbose, | |
| ) | |
| self.info = _call_digitization( | |
| info=self.info, | |
| mrk=mrk, | |
| elp=elp, | |
| hsp=hsp, | |
| kit_info=kit_info, | |
| bad_coils=bad_coils, | |
| ) | |
| logger.info("Ready.") | |
| def read_stim_ch(self, buffer_size=1e5): | |
| """Read events from data. | |
| Parameter | |
| --------- | |
| buffer_size : int | |
| The size of chunk to by which the data are scanned. | |
| Returns | |
| ------- | |
| events : array, [samples] | |
| The event vector (1 x samples). | |
| """ | |
| buffer_size = int(buffer_size) | |
| start = int(self.first_samp) | |
| stop = int(self.last_samp + 1) | |
| pick = pick_types(self.info, meg=False, ref_meg=False, stim=True, exclude=[]) | |
| stim_ch = np.empty((1, stop), dtype=np.int64) | |
| for b_start in range(start, stop, buffer_size): | |
| b_stop = b_start + buffer_size | |
| x = self[pick, b_start:b_stop][0] | |
| stim_ch[:, b_start : b_start + x.shape[1]] = x | |
| return stim_ch | |
| def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): | |
| """Read a chunk of raw data.""" | |
| sqd = self._raw_extras[fi] | |
| nchan = sqd["nchan"] | |
| data_left = (stop - start) * nchan | |
| conv_factor = sqd["conv_factor"] | |
| n_bytes = sqd["dtype"].itemsize | |
| assert n_bytes in (2, 4) | |
| # Read up to 100 MB of data at a time. | |
| blk_size = min(data_left, (100000000 // n_bytes // nchan) * nchan) | |
| with open(self.filenames[fi], "rb", buffering=0) as fid: | |
| # extract data | |
| pointer = start * nchan * n_bytes | |
| fid.seek(sqd["dirs"][KIT.DIR_INDEX_RAW_DATA]["offset"] + pointer) | |
| stim = sqd["stim"] | |
| for blk_start in np.arange(0, data_left, blk_size) // nchan: | |
| blk_size = min(blk_size, data_left - blk_start * nchan) | |
| block = np.fromfile(fid, dtype=sqd["dtype"], count=blk_size) | |
| block = block.reshape(nchan, -1, order="F").astype(float) | |
| blk_stop = blk_start + block.shape[1] | |
| data_view = data[:, blk_start:blk_stop] | |
| block *= conv_factor | |
| # Create a synthetic stim channel | |
| if stim is not None: | |
| stim_ch = _make_stim_channel( | |
| block[stim, :], | |
| sqd["slope"], | |
| sqd["stimthresh"], | |
| sqd["stim_code"], | |
| stim, | |
| ) | |
| block = np.vstack((block, stim_ch)) | |
| _mult_cal_one(data_view, block, idx, cals, mult) | |
| # cals are all unity, so can be ignored | |
| def _set_stimchannels(inst, info, stim, stim_code): | |
| """Specify how the trigger channel is synthesized from analog channels. | |
| Has to be done before loading data. For a RawKIT instance that has been | |
| created with preload=True, this method will raise a | |
| NotImplementedError. | |
| Parameters | |
| ---------- | |
| %(info_not_none)s | |
| stim : list of int | '<' | '>' | |
| Can be submitted as list of trigger channels. | |
| If a list is not specified, the default triggers extracted from | |
| misc channels will be used with specified directionality. | |
| '<' means that largest values assigned to the first channel | |
| in sequence. | |
| '>' means the largest trigger assigned to the last channel | |
| in sequence. | |
| stim_code : 'binary' | 'channel' | |
| How to decode trigger values from stim channels. 'binary' read stim | |
| channel events as binary code, 'channel' encodes channel number. | |
| """ | |
| if inst.preload: | |
| raise NotImplementedError("Can't change stim channel after loading data") | |
| _check_option("stim_code", stim_code, ["binary", "channel"]) | |
| if stim is not None: | |
| if isinstance(stim, str): | |
| picks = _default_stim_chs(info) | |
| if stim == "<": | |
| stim = picks[::-1] | |
| elif stim == ">": | |
| stim = picks | |
| else: | |
| raise ValueError( | |
| f"stim needs to be list of int, '>' or '<', not {str(stim)!r}" | |
| ) | |
| else: | |
| stim = np.asarray(stim, int) | |
| if stim.max() >= inst._raw_extras[0]["nchan"]: | |
| raise ValueError( | |
| f"Got stim={stim}, but sqd file only has " | |
| f"{inst._raw_extras[0]['nchan']} channels." | |
| ) | |
| # modify info | |
| nchan = inst._raw_extras[0]["nchan"] + 1 | |
| info["chs"].append( | |
| dict( | |
| cal=KIT.CALIB_FACTOR, | |
| logno=nchan, | |
| scanno=nchan, | |
| range=1.0, | |
| unit=FIFF.FIFF_UNIT_NONE, | |
| unit_mul=FIFF.FIFF_UNITM_NONE, | |
| ch_name="STI 014", | |
| coil_type=FIFF.FIFFV_COIL_NONE, | |
| loc=np.full(12, np.nan), | |
| kind=FIFF.FIFFV_STIM_CH, | |
| coord_frame=FIFF.FIFFV_COORD_UNKNOWN, | |
| ) | |
| ) | |
| info._update_redundant() | |
| inst._raw_extras[0]["stim"] = stim | |
| inst._raw_extras[0]["stim_code"] = stim_code | |
| def _default_stim_chs(info): | |
| """Return default stim channels for SQD files.""" | |
| return pick_types(info, meg=False, ref_meg=False, misc=True, exclude=[])[:8] | |
| def _make_stim_channel(trigger_chs, slope, threshold, stim_code, trigger_values): | |
| """Create synthetic stim channel from multiple trigger channels.""" | |
| if slope == "+": | |
| trig_chs_bin = trigger_chs > threshold | |
| elif slope == "-": | |
| trig_chs_bin = trigger_chs < threshold | |
| else: | |
| raise ValueError("slope needs to be '+' or '-'") | |
| # trigger value | |
| if stim_code == "binary": | |
| trigger_values = 2 ** np.arange(len(trigger_chs)) | |
| elif stim_code != "channel": | |
| raise ValueError( | |
| f"stim_code must be 'binary' or 'channel', got {repr(stim_code)}" | |
| ) | |
| trig_chs = trig_chs_bin * trigger_values[:, np.newaxis] | |
| return np.array(trig_chs.sum(axis=0), ndmin=2) | |
| class EpochsKIT(BaseEpochs): | |
| """Epochs Array object from KIT SQD file. | |
| Parameters | |
| ---------- | |
| input_fname : path-like | |
| Path to the sqd file. | |
| events : array of int, shape (n_events, 3) | path-like | |
| The array of :term:`events`. The first column contains the event time | |
| in samples, with :term:`first_samp` included. The third column contains | |
| the event id. If a path, must yield a ``.txt`` file containing the | |
| events. | |
| If some events don't match the events of interest as specified by | |
| ``event_id``, they will be marked as ``IGNORED`` in the drop log. | |
| %(event_id)s | |
| tmin : float | |
| Start time before event. | |
| %(baseline_epochs)s | |
| %(reject_epochs)s | |
| %(flat)s | |
| %(epochs_reject_tmin_tmax)s | |
| %(kit_mrk)s | |
| %(kit_elp)s | |
| %(kit_hsp)s | |
| allow_unknown_format : bool | |
| Force reading old data that is not officially supported. Alternatively, | |
| read and re-save the data with the KIT MEG Laboratory application. | |
| %(standardize_names)s | |
| %(verbose)s | |
| Notes | |
| ----- | |
| ``elp`` and ``hsp`` are usually the exported text files (*.txt) from the | |
| Polhemus FastScan system. hsp refers to the headshape surface points. elp | |
| refers to the points in head-space that corresponds to the HPI points. | |
| Currently, '*.elp' and '*.hsp' files are NOT supported. | |
| See Also | |
| -------- | |
| mne.Epochs : Documentation of attributes and methods. | |
| """ | |
| def __init__( | |
| self, | |
| input_fname, | |
| events, | |
| event_id=None, | |
| tmin=0, | |
| baseline=None, | |
| reject=None, | |
| flat=None, | |
| reject_tmin=None, | |
| reject_tmax=None, | |
| mrk=None, | |
| elp=None, | |
| hsp=None, | |
| allow_unknown_format=False, | |
| standardize_names=None, | |
| verbose=None, | |
| ): | |
| if isinstance(events, str | PathLike | Path): | |
| events = read_events(events) | |
| input_fname = str( | |
| _check_fname(fname=input_fname, must_exist=True, overwrite="read") | |
| ) | |
| logger.info(f"Extracting KIT Parameters from {input_fname}...") | |
| self.info, kit_info = get_kit_info( | |
| input_fname, allow_unknown_format, standardize_names | |
| ) | |
| kit_info.update(input_fname=input_fname) | |
| self._raw_extras = [kit_info] | |
| self.filenames = [] | |
| if len(events) != self._raw_extras[0]["n_epochs"]: | |
| raise ValueError("Event list does not match number of epochs.") | |
| if self._raw_extras[0]["acq_type"] == KIT.EPOCHS: | |
| self._raw_extras[0]["data_length"] = KIT.INT | |
| else: | |
| raise TypeError( | |
| "SQD file contains raw data, not epochs or average. Wrong reader." | |
| ) | |
| if event_id is None: # convert to int to make typing-checks happy | |
| event_id = {str(e): int(e) for e in np.unique(events[:, 2])} | |
| for key, val in event_id.items(): | |
| if val not in events[:, 2]: | |
| raise ValueError(f"No matching events found for {key} (event id {val})") | |
| data = self._read_kit_data() | |
| assert data.shape == ( | |
| self._raw_extras[0]["n_epochs"], | |
| self.info["nchan"], | |
| self._raw_extras[0]["frame_length"], | |
| ) | |
| tmax = ((data.shape[2] - 1) / self.info["sfreq"]) + tmin | |
| super().__init__( | |
| self.info, | |
| data, | |
| events, | |
| event_id, | |
| tmin, | |
| tmax, | |
| baseline, | |
| reject=reject, | |
| flat=flat, | |
| reject_tmin=reject_tmin, | |
| reject_tmax=reject_tmax, | |
| filename=input_fname, | |
| verbose=verbose, | |
| ) | |
| self.info = _call_digitization( | |
| info=self.info, mrk=mrk, elp=elp, hsp=hsp, kit_info=kit_info | |
| ) | |
| logger.info("Ready.") | |
| def _read_kit_data(self): | |
| """Read epochs data. | |
| Returns | |
| ------- | |
| data : array, [channels x samples] | |
| the data matrix (channels x samples). | |
| times : array, [samples] | |
| returns the time values corresponding to the samples. | |
| """ | |
| info = self._raw_extras[0] | |
| epoch_length = info["frame_length"] | |
| n_epochs = info["n_epochs"] | |
| n_samples = info["n_samples"] | |
| input_fname = info["input_fname"] | |
| dtype = info["dtype"] | |
| nchan = info["nchan"] | |
| with open(input_fname, "rb", buffering=0) as fid: | |
| fid.seek(info["dirs"][KIT.DIR_INDEX_RAW_DATA]["offset"]) | |
| count = n_samples * nchan | |
| data = np.fromfile(fid, dtype=dtype, count=count) | |
| data = data.reshape((n_samples, nchan)).T | |
| data = data * info["conv_factor"] | |
| data = data.reshape((nchan, n_epochs, epoch_length)) | |
| data = data.transpose((1, 0, 2)) | |
| return data | |
| def _read_dir(fid): | |
| return dict( | |
| offset=np.fromfile(fid, UINT32, 1)[0], | |
| size=np.fromfile(fid, INT32, 1)[0], | |
| max_count=np.fromfile(fid, INT32, 1)[0], | |
| count=np.fromfile(fid, INT32, 1)[0], | |
| ) | |
| def _read_dirs(fid, verbose=None): | |
| dirs = list() | |
| dirs.append(_read_dir(fid)) | |
| for ii in range(dirs[0]["count"] - 1): | |
| logger.debug(f" KIT dir entry {ii} @ {fid.tell()}") | |
| dirs.append(_read_dir(fid)) | |
| assert len(dirs) == dirs[KIT.DIR_INDEX_DIR]["count"] | |
| return dirs | |
| def get_kit_info(rawfile, allow_unknown_format, standardize_names=None, verbose=None): | |
| """Extract all the information from the sqd/con file. | |
| Parameters | |
| ---------- | |
| rawfile : path-like | |
| KIT file to be read. | |
| allow_unknown_format : bool | |
| Force reading old data that is not officially supported. Alternatively, | |
| read and re-save the data with the KIT MEG Laboratory application. | |
| %(standardize_names)s | |
| %(verbose)s | |
| Returns | |
| ------- | |
| %(info_not_none)s | |
| sqd : dict | |
| A dict containing all the sqd parameter settings. | |
| """ | |
| sqd = dict() | |
| sqd["rawfile"] = rawfile | |
| unsupported_format = False | |
| with open(rawfile, "rb", buffering=0) as fid: # buffering=0 for np bug | |
| # | |
| # directories (0) | |
| # | |
| sqd["dirs"] = dirs = _read_dirs(fid) | |
| # | |
| # system (1) | |
| # | |
| fid.seek(dirs[KIT.DIR_INDEX_SYSTEM]["offset"]) | |
| # check file format version | |
| version, revision = np.fromfile(fid, INT32, 2) | |
| if version < 2 or (version == 2 and revision < 3): | |
| version_string = f"V{version}R{revision:03d}" | |
| if allow_unknown_format: | |
| unsupported_format = True | |
| warn(f"Force loading KIT format {version_string}") | |
| else: | |
| raise UnsupportedKITFormat( | |
| version_string, | |
| f"SQD file format {version_string} is not officially supported. " | |
| "Set allow_unknown_format=True to load it anyways.", | |
| ) | |
| sysid = np.fromfile(fid, INT32, 1)[0] | |
| # basic info | |
| system_name = _read_name(fid, n=128) | |
| # model name | |
| model_name = _read_name(fid, n=128) | |
| # channels | |
| sqd["nchan"] = channel_count = int(np.fromfile(fid, INT32, 1)[0]) | |
| comment = _read_name(fid, n=256) | |
| create_time, last_modified_time = np.fromfile(fid, INT32, 2) | |
| del last_modified_time | |
| fid.seek(KIT.INT * 3, SEEK_CUR) # reserved | |
| dewar_style = np.fromfile(fid, INT32, 1)[0] | |
| fid.seek(KIT.INT * 3, SEEK_CUR) # spare | |
| fll_type = np.fromfile(fid, INT32, 1)[0] | |
| fid.seek(KIT.INT * 3, SEEK_CUR) # spare | |
| trigger_type = np.fromfile(fid, INT32, 1)[0] | |
| fid.seek(KIT.INT * 3, SEEK_CUR) # spare | |
| adboard_type = np.fromfile(fid, INT32, 1)[0] | |
| fid.seek(KIT.INT * 29, SEEK_CUR) # reserved | |
| if version < 2 or (version == 2 and revision <= 3): | |
| adc_range = float(np.fromfile(fid, INT32, 1)[0]) | |
| else: | |
| adc_range = np.fromfile(fid, FLOAT64, 1)[0] | |
| adc_polarity, adc_allocated, adc_stored = np.fromfile(fid, INT32, 3) | |
| del adc_polarity | |
| system_name = system_name.replace("\x00", "") | |
| system_name = system_name.strip().replace("\n", "/") | |
| model_name = model_name.replace("\x00", "") | |
| model_name = model_name.strip().replace("\n", "/") | |
| full_version = f"V{version:d}R{revision:03d}" | |
| logger.debug("SQD file basic information:") | |
| logger.debug("Meg160 version = %s", full_version) | |
| logger.debug("System ID = %i", sysid) | |
| logger.debug("System name = %s", system_name) | |
| logger.debug("Model name = %s", model_name) | |
| logger.debug("Channel count = %i", channel_count) | |
| logger.debug("Comment = %s", comment) | |
| logger.debug("Dewar style = %i", dewar_style) | |
| logger.debug("FLL type = %i", fll_type) | |
| logger.debug("Trigger type = %i", trigger_type) | |
| logger.debug("A/D board type = %i", adboard_type) | |
| logger.debug("ADC range = +/-%s[V]", adc_range / 2.0) | |
| logger.debug("ADC allocate = %i[bit]", adc_allocated) | |
| logger.debug("ADC bit = %i[bit]", adc_stored) | |
| # MGH description: 'acquisition (megacq) VectorView system at NMR-MGH' | |
| description = f"{system_name} ({sysid}) {full_version} {model_name}" | |
| assert adc_allocated % 8 == 0 | |
| sqd["dtype"] = np.dtype(f"<i{adc_allocated // 8}") | |
| # check that we can read this file | |
| if fll_type not in KIT.FLL_SETTINGS: | |
| fll_types = sorted(KIT.FLL_SETTINGS.keys()) | |
| use_fll_type = fll_types[np.searchsorted(fll_types, fll_type) - 1] | |
| warn( | |
| "Unknown site filter settings (FLL) for system " | |
| f'"{system_name}" model "{model_name}" (ID {sysid}), will assume FLL ' | |
| f"{fll_type}->{use_fll_type}, check your data for correctness, " | |
| "including channel scales and filter settings!" | |
| ) | |
| fll_type = use_fll_type | |
| # | |
| # channel information (4) | |
| # | |
| chan_dir = dirs[KIT.DIR_INDEX_CHANNELS] | |
| chan_offset, chan_size = chan_dir["offset"], chan_dir["size"] | |
| sqd["channels"] = channels = [] | |
| exg_gains = list() | |
| for i in range(channel_count): | |
| fid.seek(chan_offset + chan_size * i) | |
| (channel_type,) = np.fromfile(fid, INT32, 1) | |
| # System 52 mislabeled reference channels as NULL. This was fixed | |
| # in system 53; not sure about 51... | |
| if sysid == 52 and i < 160 and channel_type == KIT.CHANNEL_NULL: | |
| channel_type = KIT.CHANNEL_MAGNETOMETER_REFERENCE | |
| if channel_type in KIT.CHANNELS_MEG: | |
| if channel_type not in KIT.CH_TO_FIFF_COIL: | |
| raise NotImplementedError( | |
| "KIT channel type {channel_type} can not be read. Please " | |
| "contact the mne-python developers." | |
| ) | |
| channels.append( | |
| { | |
| "type": channel_type, | |
| # (x, y, z, theta, phi) for all MEG channels. Some channel | |
| # types have additional information which we're not using. | |
| "loc": np.fromfile(fid, dtype=FLOAT64, count=5), | |
| } | |
| ) | |
| if channel_type in KIT.CHANNEL_NAME_NCHAR: | |
| fid.seek(16, SEEK_CUR) # misc fields | |
| channels[-1]["name"] = _read_name(fid, channel_type) | |
| elif channel_type in KIT.CHANNELS_MISC: | |
| (channel_no,) = np.fromfile(fid, INT32, 1) | |
| fid.seek(4, SEEK_CUR) | |
| name = _read_name(fid, channel_type) | |
| channels.append( | |
| { | |
| "type": channel_type, | |
| "no": channel_no, | |
| "name": name, | |
| } | |
| ) | |
| if channel_type in (KIT.CHANNEL_EEG, KIT.CHANNEL_ECG): | |
| offset = 6 if channel_type == KIT.CHANNEL_EEG else 8 | |
| fid.seek(offset, SEEK_CUR) | |
| exg_gains.append(np.fromfile(fid, FLOAT64, 1)[0]) | |
| elif channel_type == KIT.CHANNEL_NULL: | |
| channels.append({"type": channel_type}) | |
| else: | |
| raise OSError("Unknown KIT channel type: {channel_type}") | |
| exg_gains = np.array(exg_gains) | |
| # | |
| # Channel sensitivity information: (5) | |
| # | |
| # only sensor channels requires gain. the additional misc channels | |
| # (trigger channels, audio and voice channels) are passed | |
| # through unaffected | |
| fid.seek(dirs[KIT.DIR_INDEX_CALIBRATION]["offset"]) | |
| # (offset [Volt], gain [Tesla/Volt]) for each channel | |
| sensitivity = np.fromfile(fid, dtype=FLOAT64, count=channel_count * 2) | |
| sensitivity.shape = (channel_count, 2) | |
| channel_offset, channel_gain = sensitivity.T | |
| assert (channel_offset == 0).all() # otherwise we have a problem | |
| # | |
| # amplifier gain (7) | |
| # | |
| fid.seek(dirs[KIT.DIR_INDEX_AMP_FILTER]["offset"]) | |
| amp_data = np.fromfile(fid, INT32, 1)[0] | |
| if fll_type >= 100: # Kapper Type | |
| # gain: mask bit | |
| gain1 = (amp_data & 0x00007000) >> 12 | |
| gain2 = (amp_data & 0x70000000) >> 28 | |
| gain3 = (amp_data & 0x07000000) >> 24 | |
| amp_gain = KIT.GAINS[gain1] * KIT.GAINS[gain2] * KIT.GAINS[gain3] | |
| # filter settings | |
| hpf = (amp_data & 0x00000700) >> 8 | |
| lpf = (amp_data & 0x00070000) >> 16 | |
| bef = (amp_data & 0x00000003) >> 0 | |
| else: # Hanger Type | |
| # gain | |
| input_gain = (amp_data & 0x1800) >> 11 | |
| output_gain = (amp_data & 0x0007) >> 0 | |
| amp_gain = KIT.GAINS[input_gain] * KIT.GAINS[output_gain] | |
| # filter settings | |
| hpf = (amp_data & 0x007) >> 4 | |
| lpf = (amp_data & 0x0700) >> 8 | |
| bef = (amp_data & 0xC000) >> 14 | |
| hpf_options, lpf_options, bef_options = KIT.FLL_SETTINGS[fll_type] | |
| sqd["highpass"] = KIT.HPFS[hpf_options][hpf] | |
| sqd["lowpass"] = KIT.LPFS[lpf_options][lpf] | |
| sqd["notch"] = KIT.BEFS[bef_options][bef] | |
| # | |
| # Acquisition Parameters (8) | |
| # | |
| fid.seek(dirs[KIT.DIR_INDEX_ACQ_COND]["offset"]) | |
| (sqd["acq_type"],) = (acq_type,) = np.fromfile(fid, INT32, 1) | |
| (sqd["sfreq"],) = np.fromfile(fid, FLOAT64, 1) | |
| if acq_type == KIT.CONTINUOUS: | |
| # samples_count, = np.fromfile(fid, INT32, 1) | |
| fid.seek(KIT.INT, SEEK_CUR) | |
| (sqd["n_samples"],) = np.fromfile(fid, INT32, 1) | |
| elif acq_type == KIT.EVOKED or acq_type == KIT.EPOCHS: | |
| (sqd["frame_length"],) = np.fromfile(fid, INT32, 1) | |
| (sqd["pretrigger_length"],) = np.fromfile(fid, INT32, 1) | |
| (sqd["average_count"],) = np.fromfile(fid, INT32, 1) | |
| (sqd["n_epochs"],) = np.fromfile(fid, INT32, 1) | |
| if acq_type == KIT.EVOKED: | |
| sqd["n_samples"] = sqd["frame_length"] | |
| else: | |
| sqd["n_samples"] = sqd["frame_length"] * sqd["n_epochs"] | |
| else: | |
| raise OSError( | |
| f"Invalid acquisition type: {acq_type}. Your file is neither " | |
| "continuous nor epoched data." | |
| ) | |
| # | |
| # digitization information (12 and 26) | |
| # | |
| dig_dir = dirs[KIT.DIR_INDEX_DIG_POINTS] | |
| cor_dir = dirs[KIT.DIR_INDEX_COREG] | |
| dig = dict() | |
| hsp = list() | |
| if dig_dir["count"] > 0 and cor_dir["count"] > 0: | |
| # directories (0) | |
| fid.seek(dig_dir["offset"]) | |
| for _ in range(dig_dir["count"]): | |
| name = _read_name(fid, n=8).strip() | |
| # Sometimes there are mismatches (e.g., AFz vs AFZ) between | |
| # the channel name and its digitized, name, so let's be case | |
| # insensitive. It will also prevent collisions with HSP | |
| name = name.lower() | |
| rr = np.fromfile(fid, FLOAT64, 3) | |
| if name: | |
| assert name not in dig | |
| dig[name] = rr | |
| else: | |
| hsp.append(rr) | |
| # nasion, lpa, rpa, HPI in native space | |
| elp = [] | |
| for key in ( | |
| "fidnz", | |
| "fidt9", | |
| "fidt10", | |
| "hpi_1", | |
| "hpi_2", | |
| "hpi_3", | |
| "hpi_4", | |
| "hpi_5", | |
| ): | |
| if key in dig and np.isfinite(dig[key]).all(): | |
| elp.append(dig.pop(key)) | |
| elp = np.array(elp) | |
| hsp = np.array(hsp, float).reshape(-1, 3) | |
| if elp.shape not in ((6, 3), (7, 3), (8, 3)): | |
| raise RuntimeError(f"Fewer than 3 HPI coils found, got {len(elp) - 3}") | |
| # coregistration | |
| fid.seek(cor_dir["offset"]) | |
| mrk = np.zeros((elp.shape[0] - 3, 3)) | |
| meg_done = [True] * 5 | |
| for _ in range(cor_dir["count"]): | |
| done = np.fromfile(fid, INT32, 1)[0] | |
| fid.seek( | |
| 16 * KIT.DOUBLE + 16 * KIT.DOUBLE, # meg_to_mri # mri_to_meg | |
| SEEK_CUR, | |
| ) | |
| marker_count = np.fromfile(fid, INT32, 1)[0] | |
| if not done: | |
| continue | |
| assert marker_count >= len(mrk) | |
| for mi in range(len(mrk)): | |
| mri_type, meg_type, mri_done, this_meg_done = np.fromfile( | |
| fid, INT32, 4 | |
| ) | |
| del mri_type, meg_type, mri_done | |
| meg_done[mi] = bool(this_meg_done) | |
| fid.seek(3 * KIT.DOUBLE, SEEK_CUR) # mri_pos | |
| mrk[mi] = np.fromfile(fid, FLOAT64, 3) | |
| fid.seek(256, SEEK_CUR) # marker_file (char) | |
| if not all(meg_done): | |
| logger.info( | |
| f"Keeping {sum(meg_done)}/{len(meg_done)} HPI " | |
| "coils that were digitized" | |
| ) | |
| elp = elp[[True] * 3 + meg_done] | |
| mrk = mrk[meg_done] | |
| sqd.update(hsp=hsp, elp=elp, mrk=mrk) | |
| # precompute conversion factor for reading data | |
| if unsupported_format: | |
| if sysid not in LEGACY_AMP_PARAMS: | |
| raise OSError(f"Legacy parameters for system ID {sysid} unavailable.") | |
| adc_range, adc_stored = LEGACY_AMP_PARAMS[sysid] | |
| is_meg = np.array([ch["type"] in KIT.CHANNELS_MEG for ch in channels]) | |
| ad_to_volt = adc_range / (2.0**adc_stored) | |
| ad_to_tesla = ad_to_volt / amp_gain * channel_gain | |
| conv_factor = np.where(is_meg, ad_to_tesla, ad_to_volt) | |
| # XXX this is a bit of a hack. Should probably do this more cleanly at | |
| # some point... the 2 ** (adc_stored - 14) was empirically determined using | |
| # the test files with known amplitudes. The conv_factors need to be | |
| # replaced by these values otherwise we're off by a factor off 5000.0 | |
| # for the EEG data. | |
| is_exg = [ch["type"] in (KIT.CHANNEL_EEG, KIT.CHANNEL_ECG) for ch in channels] | |
| exg_gains /= 2.0 ** (adc_stored - 14) | |
| exg_gains[exg_gains == 0] = ad_to_volt | |
| conv_factor[is_exg] = exg_gains | |
| sqd["conv_factor"] = conv_factor[:, np.newaxis] | |
| # Create raw.info dict for raw fif object with SQD data | |
| info = _empty_info(float(sqd["sfreq"])) | |
| info.update( | |
| meas_date=_stamp_to_dt((create_time, 0)), | |
| lowpass=sqd["lowpass"], | |
| highpass=sqd["highpass"], | |
| kit_system_id=sysid, | |
| description=description, | |
| dev_head_t=Transform("meg", "head"), | |
| ) | |
| # Creates a list of dicts of meg channels for raw.info | |
| logger.info("Setting channel info structure...") | |
| info["chs"] = fiff_channels = [] | |
| channel_index = defaultdict(lambda: 0) | |
| sqd["eeg_dig"] = OrderedDict() | |
| for idx, ch in enumerate(channels, 1): | |
| if ch["type"] in KIT.CHANNELS_MEG: | |
| ch_name = ch.get("name", "") | |
| if ch_name == "" or standardize_names: | |
| ch_name = f"MEG {idx:03d}" | |
| # create three orthogonal vector | |
| # ch_angles[0]: theta, ch_angles[1]: phi | |
| theta, phi = np.radians(ch["loc"][3:]) | |
| x = sin(theta) * cos(phi) | |
| y = sin(theta) * sin(phi) | |
| z = cos(theta) | |
| vec_z = np.array([x, y, z]) | |
| vec_z /= np.linalg.norm(vec_z) | |
| vec_x = np.zeros(vec_z.size, dtype=np.float64) | |
| if vec_z[1] < vec_z[2]: | |
| if vec_z[0] < vec_z[1]: | |
| vec_x[0] = 1.0 | |
| else: | |
| vec_x[1] = 1.0 | |
| elif vec_z[0] < vec_z[2]: | |
| vec_x[0] = 1.0 | |
| else: | |
| vec_x[2] = 1.0 | |
| vec_x -= np.sum(vec_x * vec_z) * vec_z | |
| vec_x /= np.linalg.norm(vec_x) | |
| vec_y = np.cross(vec_z, vec_x) | |
| # transform to Neuromag like coordinate space | |
| vecs = np.vstack((ch["loc"][:3], vec_x, vec_y, vec_z)) | |
| vecs = apply_trans(als_ras_trans, vecs) | |
| unit = FIFF.FIFF_UNIT_T | |
| loc = vecs.ravel() | |
| else: | |
| ch_type_label = KIT.CH_LABEL[ch["type"]] | |
| channel_index[ch_type_label] += 1 | |
| ch_type_index = channel_index[ch_type_label] | |
| ch_name = ch.get("name", "") | |
| eeg_name = ch_name.lower() | |
| # some files have all EEG labeled as EEG | |
| if ch_name in ("", "EEG") or standardize_names: | |
| ch_name = f"{ch_type_label} {ch_type_index:03d}" | |
| unit = FIFF.FIFF_UNIT_V | |
| loc = np.zeros(12) | |
| if eeg_name and eeg_name in dig: | |
| loc[:3] = sqd["eeg_dig"][eeg_name] = dig[eeg_name] | |
| fiff_channels.append( | |
| dict( | |
| cal=KIT.CALIB_FACTOR, | |
| logno=idx, | |
| scanno=idx, | |
| range=KIT.RANGE, | |
| unit=unit, | |
| unit_mul=KIT.UNIT_MUL, | |
| ch_name=ch_name, | |
| coord_frame=FIFF.FIFFV_COORD_DEVICE, | |
| coil_type=KIT.CH_TO_FIFF_COIL[ch["type"]], | |
| kind=KIT.CH_TO_FIFF_KIND[ch["type"]], | |
| loc=loc, | |
| ) | |
| ) | |
| info._unlocked = False | |
| info._update_redundant() | |
| return info, sqd | |
| def _read_name(fid, ch_type=None, n=None): | |
| n = n if ch_type is None else KIT.CHANNEL_NAME_NCHAR[ch_type] | |
| return fid.read(n).split(b"\x00")[0].decode("utf-8") | |
| def read_raw_kit( | |
| input_fname, | |
| mrk=None, | |
| elp=None, | |
| hsp=None, | |
| stim=">", | |
| slope="-", | |
| stimthresh=1, | |
| preload=False, | |
| stim_code="binary", | |
| allow_unknown_format=False, | |
| standardize_names=False, | |
| *, | |
| bad_coils=(), | |
| verbose=None, | |
| ) -> RawKIT: | |
| r"""Reader function for Ricoh/KIT conversion to FIF. | |
| Parameters | |
| ---------- | |
| input_fname : path-like | |
| Path to the SQD file. | |
| %(kit_mrk)s | |
| %(kit_elp)s | |
| %(kit_hsp)s | |
| %(kit_stim)s | |
| %(kit_slope)s | |
| %(kit_stimthresh)s | |
| %(preload)s | |
| %(kit_stimcode)s | |
| allow_unknown_format : bool | |
| Force reading old data that is not officially supported. Alternatively, | |
| read and re-save the data with the KIT MEG Laboratory application. | |
| %(standardize_names)s | |
| %(kit_badcoils)s | |
| %(verbose)s | |
| Returns | |
| ------- | |
| raw : instance of RawKIT | |
| A Raw object containing KIT data. | |
| See :class:`mne.io.Raw` for documentation of attributes and methods. | |
| See Also | |
| -------- | |
| mne.io.Raw : Documentation of attributes and methods of RawKIT. | |
| Notes | |
| ----- | |
| ``elp`` and ``hsp`` are usually the exported text files (\*.txt) from the | |
| Polhemus FastScan system. ``hsp`` refers to the headshape surface points. | |
| ``elp`` refers to the points in head-space that corresponds to the HPI | |
| points. | |
| If ``mrk``\, ``hsp`` or ``elp`` are :term:`array_like` inputs, then the | |
| numbers in xyz coordinates should be in units of meters. | |
| """ | |
| return RawKIT( | |
| input_fname=input_fname, | |
| mrk=mrk, | |
| elp=elp, | |
| hsp=hsp, | |
| stim=stim, | |
| slope=slope, | |
| stimthresh=stimthresh, | |
| preload=preload, | |
| stim_code=stim_code, | |
| allow_unknown_format=allow_unknown_format, | |
| standardize_names=standardize_names, | |
| bad_coils=bad_coils, | |
| verbose=verbose, | |
| ) | |
| def read_epochs_kit( | |
| input_fname, | |
| events, | |
| event_id=None, | |
| mrk=None, | |
| elp=None, | |
| hsp=None, | |
| allow_unknown_format=False, | |
| standardize_names=False, | |
| verbose=None, | |
| ) -> EpochsKIT: | |
| """Reader function for Ricoh/KIT epochs files. | |
| Parameters | |
| ---------- | |
| input_fname : path-like | |
| Path to the SQD file. | |
| events : array of int, shape (n_events, 3) | path-like | |
| The array of :term:`events`. The first column contains the event time | |
| in samples, with :term:`first_samp` included. The third column contains | |
| the event id. If a path, must yield a ``.txt`` file containing the | |
| events. | |
| If some events don't match the events of interest as specified by | |
| ``event_id``, they will be marked as ``IGNORED`` in the drop log. | |
| %(event_id)s | |
| %(kit_mrk)s | |
| %(kit_elp)s | |
| %(kit_hsp)s | |
| allow_unknown_format : bool | |
| Force reading old data that is not officially supported. Alternatively, | |
| read and re-save the data with the KIT MEG Laboratory application. | |
| %(standardize_names)s | |
| %(verbose)s | |
| Returns | |
| ------- | |
| EpochsKIT : instance of BaseEpochs | |
| The epochs. | |
| See Also | |
| -------- | |
| mne.Epochs : Documentation of attributes and methods. | |
| Notes | |
| ----- | |
| .. versionadded:: 0.9.0 | |
| """ | |
| epochs = EpochsKIT( | |
| input_fname=input_fname, | |
| events=events, | |
| event_id=event_id, | |
| mrk=mrk, | |
| elp=elp, | |
| hsp=hsp, | |
| allow_unknown_format=allow_unknown_format, | |
| standardize_names=standardize_names, | |
| verbose=verbose, | |
| ) | |
| return epochs | |