Spaces:
Running
Running
| """Populate measurement info.""" | |
| # Authors: The MNE-Python contributors. | |
| # License: BSD-3-Clause | |
| # Copyright the MNE-Python contributors. | |
| import os.path as op | |
| from calendar import timegm | |
| from time import strptime | |
| import numpy as np | |
| from ..._fiff.constants import FIFF | |
| from ..._fiff.ctf_comp import _add_kind, _calibrate_comp | |
| from ..._fiff.meas_info import _empty_info | |
| from ..._fiff.write import get_new_file_id | |
| from ...annotations import Annotations | |
| from ...transforms import ( | |
| _coord_frame_name, | |
| apply_trans, | |
| combine_transforms, | |
| invert_transform, | |
| ) | |
| from ...utils import _clean_names, logger, warn | |
| from .constants import CTF | |
| _ctf_to_fiff = { | |
| CTF.CTFV_COIL_LPA: FIFF.FIFFV_POINT_LPA, | |
| CTF.CTFV_COIL_RPA: FIFF.FIFFV_POINT_RPA, | |
| CTF.CTFV_COIL_NAS: FIFF.FIFFV_POINT_NASION, | |
| } | |
| def _pick_isotrak_and_hpi_coils(res4, coils, t): | |
| """Pick the HPI coil locations given in device coordinates.""" | |
| if coils is None: | |
| return list(), list() | |
| dig = list() | |
| hpi_result = dict(dig_points=list()) | |
| n_coil_dev = 0 | |
| n_coil_head = 0 | |
| for p in coils: | |
| if p["valid"]: | |
| if p["kind"] in [CTF.CTFV_COIL_LPA, CTF.CTFV_COIL_RPA, CTF.CTFV_COIL_NAS]: | |
| kind = FIFF.FIFFV_POINT_CARDINAL | |
| ident = _ctf_to_fiff[p["kind"]] | |
| else: # CTF.CTFV_COIL_SPARE | |
| kind = FIFF.FIFFV_POINT_HPI | |
| ident = p["kind"] | |
| if p["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_DEVICE: | |
| if t is None or t["t_ctf_dev_dev"] is None: | |
| raise RuntimeError( | |
| "No coordinate transformation available for HPI coil locations" | |
| ) | |
| d = dict( | |
| kind=kind, | |
| ident=ident, | |
| r=apply_trans(t["t_ctf_dev_dev"], p["r"]), | |
| coord_frame=FIFF.FIFFV_COORD_UNKNOWN, | |
| ) | |
| hpi_result["dig_points"].append(d) | |
| n_coil_dev += 1 | |
| elif p["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_HEAD: | |
| if t is None or t["t_ctf_head_head"] is None: | |
| raise RuntimeError( | |
| "No coordinate transformation " | |
| "available for (virtual) Polhemus data" | |
| ) | |
| d = dict( | |
| kind=kind, | |
| ident=ident, | |
| r=apply_trans(t["t_ctf_head_head"], p["r"]), | |
| coord_frame=FIFF.FIFFV_COORD_HEAD, | |
| ) | |
| dig.append(d) | |
| n_coil_head += 1 | |
| if n_coil_head > 0: | |
| logger.info(" Polhemus data for %d HPI coils added", n_coil_head) | |
| if n_coil_dev > 0: | |
| logger.info( | |
| " Device coordinate locations for %d HPI coils added", n_coil_dev | |
| ) | |
| return dig, [hpi_result] | |
| def _convert_time(date_str, time_str): | |
| """Convert date and time strings to float time.""" | |
| if date_str == time_str == "": | |
| date_str = "01/01/1970" | |
| time_str = "00:00:00" | |
| logger.info( | |
| "No date or time found, setting to the start of the " | |
| "POSIX epoch (1970/01/01 midnight)" | |
| ) | |
| for fmt in ("%d/%m/%Y", "%d-%b-%Y", "%a, %b %d, %Y", "%Y/%m/%d"): | |
| try: | |
| date = strptime(date_str.strip(), fmt) | |
| except ValueError: | |
| pass | |
| else: | |
| break | |
| else: | |
| raise RuntimeError( | |
| f"Illegal date: {date_str}.\nIf the language of the date does not " | |
| "correspond to your local machine's language try to set the " | |
| "locale to the language of the date string:\n" | |
| 'locale.setlocale(locale.LC_ALL, "en_US")' | |
| ) | |
| for fmt in ("%H:%M:%S", "%H:%M"): | |
| try: | |
| time = strptime(time_str, fmt) | |
| except ValueError: | |
| pass | |
| else: | |
| break | |
| else: | |
| raise RuntimeError(f"Illegal time: {time_str}") | |
| # MNE-C uses mktime which uses local time, but here we instead decouple | |
| # conversion location from the process, and instead assume that the | |
| # acquisition was in GMT. This will be wrong for most sites, but at least | |
| # the value we obtain here won't depend on the geographical location | |
| # that the file was converted. | |
| res = timegm( | |
| ( | |
| date.tm_year, | |
| date.tm_mon, | |
| date.tm_mday, | |
| time.tm_hour, | |
| time.tm_min, | |
| time.tm_sec, | |
| date.tm_wday, | |
| date.tm_yday, | |
| date.tm_isdst, | |
| ) | |
| ) | |
| return res | |
| def _get_plane_vectors(ez): | |
| """Get two orthogonal vectors orthogonal to ez (ez will be modified).""" | |
| assert ez.shape == (3,) | |
| ez_len = np.sqrt(np.sum(ez * ez)) | |
| if ez_len == 0: | |
| raise RuntimeError("Zero length normal. Cannot proceed.") | |
| if np.abs(ez_len - np.abs(ez[2])) < 1e-5: # ez already in z-direction | |
| ex = np.array([1.0, 0.0, 0.0]) | |
| else: | |
| ex = np.zeros(3) | |
| if ez[1] < ez[2]: | |
| ex[0 if ez[0] < ez[1] else 1] = 1.0 | |
| else: | |
| ex[0 if ez[0] < ez[2] else 2] = 1.0 | |
| ez /= ez_len | |
| ex -= np.dot(ez, ex) * ez | |
| ex /= np.sqrt(np.sum(ex * ex)) | |
| ey = np.cross(ez, ex) | |
| return ex, ey | |
| def _at_origin(x): | |
| """Determine if a vector is at the origin.""" | |
| return np.sum(x * x) < 1e-8 | |
| def _check_comp_ch(cch, kind, desired=None): | |
| if desired is None: | |
| desired = cch["grad_order_no"] | |
| if cch["grad_order_no"] != desired: | |
| raise RuntimeError( | |
| f"{kind} channel with inconsistent compensation " | |
| f"grade {cch['grad_order_no']}, should be {desired}" | |
| ) | |
| return desired | |
| def _convert_channel_info(res4, t, use_eeg_pos): | |
| """Convert CTF channel information to fif format.""" | |
| nmeg = neeg = nstim = nmisc = nref = 0 | |
| chs = list() | |
| this_comp = None | |
| for k, cch in enumerate(res4["chs"]): | |
| cal = float(1.0 / (cch["proper_gain"] * cch["qgain"])) | |
| ch = dict( | |
| scanno=k + 1, | |
| range=1.0, | |
| cal=cal, | |
| loc=np.full(12, np.nan), | |
| unit_mul=FIFF.FIFF_UNITM_NONE, | |
| ch_name=cch["ch_name"][:15], | |
| coil_type=FIFF.FIFFV_COIL_NONE, | |
| ) | |
| del k | |
| chs.append(ch) | |
| # Create the channel position information | |
| if cch["sensor_type_index"] in ( | |
| CTF.CTFV_REF_MAG_CH, | |
| CTF.CTFV_REF_GRAD_CH, | |
| CTF.CTFV_MEG_CH, | |
| ): | |
| # Extra check for a valid MEG channel | |
| if ( | |
| np.sum(cch["coil"]["pos"][0] ** 2) < 1e-6 | |
| or np.sum(cch["coil"]["norm"][0] ** 2) < 1e-6 | |
| ): | |
| nmisc += 1 | |
| ch.update( | |
| logno=nmisc, | |
| coord_frame=FIFF.FIFFV_COORD_UNKNOWN, | |
| kind=FIFF.FIFFV_MISC_CH, | |
| unit=FIFF.FIFF_UNIT_V, | |
| ) | |
| text = "MEG" | |
| if cch["sensor_type_index"] != CTF.CTFV_MEG_CH: | |
| text += " ref" | |
| warn( | |
| f"{text} channel {ch['ch_name']} did not have position " | |
| "assigned, so it was changed to a MISC channel" | |
| ) | |
| continue | |
| ch["unit"] = FIFF.FIFF_UNIT_T | |
| # Set up the local coordinate frame | |
| r0 = cch["coil"]["pos"][0].copy() | |
| ez = cch["coil"]["norm"][0].copy() | |
| # It turns out that positive proper_gain requires swapping | |
| # of the normal direction | |
| if cch["proper_gain"] > 0.0: | |
| ez *= -1 | |
| # Check how the other vectors should be defined | |
| off_diag = False | |
| # Default: ex and ey are arbitrary in the plane normal to ez | |
| if cch["sensor_type_index"] == CTF.CTFV_REF_GRAD_CH: | |
| # The off-diagonal gradiometers are an exception: | |
| # | |
| # We use the same convention for ex as for Neuromag planar | |
| # gradiometers: ex pointing in the positive gradient direction | |
| diff = cch["coil"]["pos"][0] - cch["coil"]["pos"][1] | |
| size = np.sqrt(np.sum(diff * diff)) | |
| if size > 0.0: | |
| diff /= size | |
| # Is ez normal to the line joining the coils? | |
| if np.abs(np.dot(diff, ez)) < 1e-3: | |
| off_diag = True | |
| # Handle the off-diagonal gradiometer coordinate system | |
| r0 -= size * diff / 2.0 | |
| ex = diff | |
| ey = np.cross(ez, ex) | |
| else: | |
| ex, ey = _get_plane_vectors(ez) | |
| else: | |
| ex, ey = _get_plane_vectors(ez) | |
| # Transform into a Neuromag-like device coordinate system | |
| ch["loc"] = np.concatenate( | |
| [ | |
| apply_trans(t["t_ctf_dev_dev"], r0), | |
| apply_trans(t["t_ctf_dev_dev"], ex, move=False), | |
| apply_trans(t["t_ctf_dev_dev"], ey, move=False), | |
| apply_trans(t["t_ctf_dev_dev"], ez, move=False), | |
| ] | |
| ) | |
| del r0, ex, ey, ez | |
| # Set the coil type | |
| if cch["sensor_type_index"] == CTF.CTFV_REF_MAG_CH: | |
| ch["kind"] = FIFF.FIFFV_REF_MEG_CH | |
| ch["coil_type"] = FIFF.FIFFV_COIL_CTF_REF_MAG | |
| nref += 1 | |
| ch["logno"] = nref | |
| elif cch["sensor_type_index"] == CTF.CTFV_REF_GRAD_CH: | |
| ch["kind"] = FIFF.FIFFV_REF_MEG_CH | |
| if off_diag: | |
| ch["coil_type"] = FIFF.FIFFV_COIL_CTF_OFFDIAG_REF_GRAD | |
| else: | |
| ch["coil_type"] = FIFF.FIFFV_COIL_CTF_REF_GRAD | |
| nref += 1 | |
| ch["logno"] = nref | |
| else: | |
| this_comp = _check_comp_ch(cch, "Gradiometer", this_comp) | |
| ch["kind"] = FIFF.FIFFV_MEG_CH | |
| ch["coil_type"] = FIFF.FIFFV_COIL_CTF_GRAD | |
| nmeg += 1 | |
| ch["logno"] = nmeg | |
| # Encode the software gradiometer order | |
| ch["coil_type"] = int(ch["coil_type"] | (cch["grad_order_no"] << 16)) | |
| ch["coord_frame"] = FIFF.FIFFV_COORD_DEVICE | |
| elif cch["sensor_type_index"] == CTF.CTFV_EEG_CH: | |
| coord_frame = FIFF.FIFFV_COORD_HEAD | |
| if use_eeg_pos: | |
| # EEG electrode coordinates may be present but in the | |
| # CTF head frame | |
| ch["loc"][:3] = cch["coil"]["pos"][0] | |
| if not _at_origin(ch["loc"][:3]): | |
| if t["t_ctf_head_head"] is None: | |
| warn( | |
| f"EEG electrode ({ch['ch_name']}) location omitted because " | |
| "of missing HPI information" | |
| ) | |
| ch["loc"].fill(np.nan) | |
| coord_frame = FIFF.FIFFV_MNE_COORD_CTF_HEAD | |
| else: | |
| ch["loc"][:3] = apply_trans(t["t_ctf_head_head"], ch["loc"][:3]) | |
| neeg += 1 | |
| ch.update( | |
| logno=neeg, | |
| kind=FIFF.FIFFV_EEG_CH, | |
| unit=FIFF.FIFF_UNIT_V, | |
| coord_frame=coord_frame, | |
| coil_type=FIFF.FIFFV_COIL_EEG, | |
| ) | |
| elif cch["sensor_type_index"] == CTF.CTFV_STIM_CH: | |
| nstim += 1 | |
| ch.update( | |
| logno=nstim, | |
| coord_frame=FIFF.FIFFV_COORD_UNKNOWN, | |
| kind=FIFF.FIFFV_STIM_CH, | |
| unit=FIFF.FIFF_UNIT_V, | |
| ) | |
| else: | |
| nmisc += 1 | |
| ch.update( | |
| logno=nmisc, | |
| coord_frame=FIFF.FIFFV_COORD_UNKNOWN, | |
| kind=FIFF.FIFFV_MISC_CH, | |
| unit=FIFF.FIFF_UNIT_V, | |
| ) | |
| return chs | |
| def _comp_sort_keys(c): | |
| """Sort the compensation data.""" | |
| return (int(c["coeff_type"]), int(c["scanno"])) | |
| def _check_comp(comp): | |
| """Check that conversion to named matrices is possible.""" | |
| ref_sens = None | |
| kind = -1 | |
| for k, c_k in enumerate(comp): | |
| if c_k["coeff_type"] != kind: | |
| c_ref = c_k | |
| ref_sens = c_ref["sensors"] | |
| kind = c_k["coeff_type"] | |
| elif not c_k["sensors"] == ref_sens: | |
| raise RuntimeError("Cannot use an uneven compensation matrix") | |
| def _conv_comp(comp, first, last, chs): | |
| """Add a new converted compensation data item.""" | |
| ch_names = [c["ch_name"] for c in chs] | |
| n_col = comp[first]["ncoeff"] | |
| col_names = comp[first]["sensors"][:n_col] | |
| row_names = [comp[p]["sensor_name"] for p in range(first, last + 1)] | |
| mask = np.isin(col_names, ch_names) # missing channels excluded | |
| col_names = np.array(col_names)[mask].tolist() | |
| n_col = len(col_names) | |
| n_row = len(row_names) | |
| ccomp = dict(ctfkind=comp[first]["coeff_type"], save_calibrated=False) | |
| _add_kind(ccomp) | |
| data = np.empty((n_row, n_col)) | |
| for ii, coeffs in enumerate(comp[first : last + 1]): | |
| # Pick the elements to the matrix | |
| data[ii, :] = coeffs["coeffs"][mask] | |
| ccomp["data"] = dict( | |
| row_names=row_names, | |
| col_names=col_names, | |
| data=data, | |
| nrow=len(row_names), | |
| ncol=len(col_names), | |
| ) | |
| mk = ("proper_gain", "qgain") | |
| _calibrate_comp(ccomp, chs, row_names, col_names, mult_keys=mk, flip=True) | |
| return ccomp | |
| def _convert_comp_data(res4): | |
| """Convert the compensation data into named matrices.""" | |
| if res4["ncomp"] == 0: | |
| return | |
| # Sort the coefficients in our favorite order | |
| res4["comp"] = sorted(res4["comp"], key=_comp_sort_keys) | |
| # Check that all items for a given compensation type have the correct | |
| # number of channels | |
| _check_comp(res4["comp"]) | |
| # Create named matrices | |
| first = 0 | |
| kind = -1 | |
| comps = list() | |
| for k in range(len(res4["comp"])): | |
| if res4["comp"][k]["coeff_type"] != kind: | |
| if k > 0: | |
| comps.append(_conv_comp(res4["comp"], first, k - 1, res4["chs"])) | |
| kind = res4["comp"][k]["coeff_type"] | |
| first = k | |
| comps.append(_conv_comp(res4["comp"], first, k, res4["chs"])) | |
| return comps | |
| def _pick_eeg_pos(c): | |
| """Pick EEG positions.""" | |
| eeg = dict( | |
| coord_frame=FIFF.FIFFV_COORD_HEAD, | |
| assign_to_chs=False, | |
| labels=list(), | |
| ids=list(), | |
| rr=list(), | |
| kinds=list(), | |
| np=0, | |
| ) | |
| for ch in c["chs"]: | |
| if ch["kind"] == FIFF.FIFFV_EEG_CH and not _at_origin(ch["loc"][:3]): | |
| eeg["labels"].append(ch["ch_name"]) | |
| eeg["ids"].append(ch["logno"]) | |
| eeg["rr"].append(ch["loc"][:3]) | |
| eeg["kinds"].append(FIFF.FIFFV_POINT_EEG) | |
| eeg["np"] += 1 | |
| if eeg["np"] == 0: | |
| return None | |
| logger.info("Picked positions of %d EEG channels from channel info", eeg["np"]) | |
| return eeg | |
| def _add_eeg_pos(eeg, t, c): | |
| """Pick the (virtual) EEG position data.""" | |
| if eeg is None: | |
| return | |
| if t is None or t["t_ctf_head_head"] is None: | |
| raise RuntimeError( | |
| "No coordinate transformation available for EEG position data" | |
| ) | |
| eeg_assigned = 0 | |
| if eeg["assign_to_chs"]: | |
| for k in range(eeg["np"]): | |
| # Look for a channel name match | |
| for ch in c["chs"]: | |
| if ch["ch_name"].lower() == eeg["labels"][k].lower(): | |
| r0 = ch["loc"][:3] | |
| r0[:] = eeg["rr"][k] | |
| if eeg["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_HEAD: | |
| r0[:] = apply_trans(t["t_ctf_head_head"], r0) | |
| elif eeg["coord_frame"] != FIFF.FIFFV_COORD_HEAD: | |
| raise RuntimeError( | |
| "Illegal coordinate frame for EEG electrode " | |
| f"positions : {_coord_frame_name(eeg['coord_frame'])}" | |
| ) | |
| # Use the logical channel number as an identifier | |
| eeg["ids"][k] = ch["logno"] | |
| eeg["kinds"][k] = FIFF.FIFFV_POINT_EEG | |
| eeg_assigned += 1 | |
| break | |
| # Add these to the Polhemus data | |
| fid_count = eeg_count = extra_count = 0 | |
| for k in range(eeg["np"]): | |
| d = dict( | |
| r=eeg["rr"][k].copy(), | |
| kind=eeg["kinds"][k], | |
| ident=eeg["ids"][k], | |
| coord_frame=FIFF.FIFFV_COORD_HEAD, | |
| ) | |
| c["dig"].append(d) | |
| if eeg["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_HEAD: | |
| d["r"] = apply_trans(t["t_ctf_head_head"], d["r"]) | |
| elif eeg["coord_frame"] != FIFF.FIFFV_COORD_HEAD: | |
| raise RuntimeError( | |
| "Illegal coordinate frame for EEG electrode positions: " | |
| + _coord_frame_name(eeg["coord_frame"]) | |
| ) | |
| if eeg["kinds"][k] == FIFF.FIFFV_POINT_CARDINAL: | |
| fid_count += 1 | |
| elif eeg["kinds"][k] == FIFF.FIFFV_POINT_EEG: | |
| eeg_count += 1 | |
| else: | |
| extra_count += 1 | |
| if eeg_assigned > 0: | |
| logger.info( | |
| " %d EEG electrode locations assigned to channel info.", eeg_assigned | |
| ) | |
| for count, kind in zip( | |
| (fid_count, eeg_count, extra_count), | |
| ("fiducials", "EEG locations", "extra points"), | |
| ): | |
| if count > 0: | |
| logger.info(" %d %s added to Polhemus data.", count, kind) | |
| _filt_map = {CTF.CTFV_FILTER_LOWPASS: "lowpass", CTF.CTFV_FILTER_HIGHPASS: "highpass"} | |
| def _compose_meas_info(res4, coils, trans, eeg): | |
| """Create meas info from CTF data.""" | |
| info = _empty_info(res4["sfreq"]) | |
| # Collect all the necessary data from the structures read | |
| info["meas_id"] = get_new_file_id() | |
| info["meas_id"]["usecs"] = 0 | |
| info["meas_id"]["secs"] = _convert_time(res4["data_date"], res4["data_time"]) | |
| info["meas_date"] = (info["meas_id"]["secs"], info["meas_id"]["usecs"]) | |
| info["experimenter"] = res4["nf_operator"] | |
| info["subject_info"] = dict(his_id=res4["nf_subject_id"]) | |
| for filt in res4["filters"]: | |
| if filt["type"] in _filt_map: | |
| info[_filt_map[filt["type"]]] = filt["freq"] | |
| info["dig"], info["hpi_results"] = _pick_isotrak_and_hpi_coils(res4, coils, trans) | |
| if trans is not None: | |
| if len(info["hpi_results"]) > 0: | |
| info["hpi_results"][0]["coord_trans"] = trans["t_ctf_head_head"] | |
| if trans["t_dev_head"] is not None: | |
| info["dev_head_t"] = trans["t_dev_head"] | |
| info["dev_ctf_t"] = combine_transforms( | |
| trans["t_dev_head"], | |
| invert_transform(trans["t_ctf_head_head"]), | |
| FIFF.FIFFV_COORD_DEVICE, | |
| FIFF.FIFFV_MNE_COORD_CTF_HEAD, | |
| ) | |
| if trans["t_ctf_head_head"] is not None: | |
| info["ctf_head_t"] = trans["t_ctf_head_head"] | |
| info["chs"] = _convert_channel_info(res4, trans, eeg is None) | |
| info["comps"] = _convert_comp_data(res4) | |
| if eeg is None: | |
| # Pick EEG locations from chan info if not read from a separate file | |
| eeg = _pick_eeg_pos(info) | |
| _add_eeg_pos(eeg, trans, info) | |
| logger.info(" Measurement info composed.") | |
| info._unlocked = False | |
| info._update_redundant() | |
| return info | |
| def _read_bad_chans(directory, info): | |
| """Read Bad channel list and match to internal names.""" | |
| fname = op.join(directory, "BadChannels") | |
| if not op.exists(fname): | |
| return [] | |
| mapping = dict(zip(_clean_names(info["ch_names"]), info["ch_names"])) | |
| with open(fname) as fid: | |
| bad_chans = [mapping[f.strip()] for f in fid.readlines()] | |
| return bad_chans | |
| def _annotate_bad_segments(directory, start_time, meas_date): | |
| fname = op.join(directory, "bad.segments") | |
| if not op.exists(fname): | |
| return None | |
| # read in bad segment file | |
| onsets = [] | |
| durations = [] | |
| desc = [] | |
| with open(fname) as fid: | |
| for f in fid.readlines(): | |
| tmp = f.strip().split() | |
| desc.append(f"bad_{tmp[0]}") | |
| onsets.append(np.float64(tmp[1]) - start_time) | |
| durations.append(np.float64(tmp[2]) - np.float64(tmp[1])) | |
| # return None if there are no bad segments | |
| if len(onsets) == 0: | |
| return None | |
| return Annotations(onsets, durations, desc, meas_date) | |