Spaces:
Running
Running
| # | |
| # Authors: The MNE-Python contributors. | |
| # License: BSD-3-Clause | |
| # Copyright the MNE-Python contributors. | |
| import re | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| import numpy as np | |
| from ..._fiff._digitization import _make_dig_points | |
| from ..._fiff.constants import FIFF | |
| from ..._fiff.meas_info import create_info | |
| from ..._fiff.tag import _coil_trans_to_loc | |
| from ..._fiff.utils import _mult_cal_one, _read_segments_file | |
| from ...annotations import annotations_from_events | |
| from ...epochs import Epochs | |
| from ...surface import _normal_orth | |
| from ...transforms import ( | |
| Transform, | |
| _angle_between_quats, | |
| apply_trans, | |
| combine_transforms, | |
| get_ras_to_neuromag_trans, | |
| invert_transform, | |
| rot_to_quat, | |
| ) | |
| from ...utils import ( | |
| _on_missing, | |
| _soft_import, | |
| catch_logging, | |
| logger, | |
| verbose, | |
| warn, | |
| ) | |
| from ..base import BaseRaw | |
| from ..ctf.trans import _quaternion_align | |
# Recognized filename suffixes for the Curry file family. The .dat/.dap/.rs3
# triple belongs to the legacy Curry 7 format; .cdt with .cdt.dpa/.cdt.dpo
# headers belongs to Curry 8/9 (see _get_curry_version).
CURRY_SUFFIX_DATA = [".cdt", ".dat"]  # data files
CURRY_SUFFIX_HDR = [".cdt.dpa", ".cdt.dpo", ".dap"]  # header/parameter files
CURRY_SUFFIX_LABELS = [".cdt.dpa", ".cdt.dpo", ".rs3"]  # channel label files
def _get_curry_version(fname):
    """Check out the curry file version.

    Parameters
    ----------
    fname : path-like
        Path to a Curry data file; the corresponding header file is
        located automatically.

    Returns
    -------
    version : str | None
        ``"Curry 7"`` for legacy ``.dap`` headers, ``"Curry 8"`` or
        ``"Curry 9"`` according to the ``FileVersion`` entry in the header,
        or ``None`` if the version cannot be determined.
    """
    fname_hdr = _check_curry_header_filename(_check_curry_filename(fname))
    # legacy Curry 7 recordings use the .dap header suffix
    if ".dap" in str(fname_hdr):
        return "Curry 7"
    # newer versions store an explicit "FileVersion = <number>" header entry;
    # only the first digit (major version) is relevant here
    match = re.search(r"FileVersion\s*=\s*([0-9]+)", fname_hdr.read_text())
    if match is None:  # malformed header: no FileVersion entry
        return None
    major = match.group(1)[0]
    return {"8": "Curry 8", "9": "Curry 9"}.get(major)
def _check_curry_filename(fname):
    """Resolve *fname* to an existing Curry data file (.cdt or .dat).

    Accepts the data file itself, a path with an extra suffix appended, or a
    path missing the data suffix; raises FileNotFoundError otherwise.
    """
    candidate = Path(fname).expanduser()
    resolved = None
    # direct hit: path already carries a data suffix
    if candidate.suffix in CURRY_SUFFIX_DATA:
        resolved = candidate
    else:
        stripped = candidate.with_suffix("")
        if stripped.exists() and stripped.suffix in CURRY_SUFFIX_DATA:
            # e.g. "rec.cdt.dpa" was passed -> strip back to "rec.cdt"
            resolved = stripped
        else:
            # e.g. "rec" or "rec.dpa" was passed -> try appending data suffixes
            for suffix in CURRY_SUFFIX_DATA:
                sibling = candidate.with_suffix(suffix)
                if sibling.exists():
                    resolved = sibling
                    break
    # final check
    if resolved is None or not resolved.exists():
        raise FileNotFoundError(
            f"no curry data file found (.dat or .cdt), checked {resolved or candidate}"
        )
    return resolved
def _check_curry_header_filename(fname):
    """Locate the Curry header file (.cdt.dpa/.cdt.dpo/.dap) matching *fname*."""
    base = Path(fname)
    # first existing candidate among the known header suffixes, else None
    fname_hdr = next(
        (base.with_suffix(s) for s in CURRY_SUFFIX_HDR if base.with_suffix(s).exists()),
        None,
    )
    # final check (also require the data file itself to exist)
    if fname_hdr is None or not base.exists():
        raise FileNotFoundError(
            f"no corresponding header file found {CURRY_SUFFIX_HDR}"
        )
    return fname_hdr
def _check_curry_labels_filename(fname):
    """Locate the Curry labels file (.cdt.dpa/.cdt.dpo/.rs3) matching *fname*.

    Raises FileNotFoundError when no labels file exists next to the data file
    or when the data file itself is missing.
    """
    fname_in = Path(fname)
    fname_labels = None
    # try suffixes
    for lbl_suff in CURRY_SUFFIX_LABELS:
        if fname_in.with_suffix(lbl_suff).exists():
            fname_labels = fname_in.with_suffix(lbl_suff)
            break
    # final check
    if not fname_labels or not fname_in.exists():
        # FIX: the message previously listed CURRY_SUFFIX_HDR, which reports
        # the wrong set of suffixes for a *labels* lookup
        raise FileNotFoundError(
            f"no corresponding labels file found {CURRY_SUFFIX_LABELS}"
        )
    return fname_labels
| def _check_curry_sfreq_consistency(fname_hdr): | |
| content_hdr = fname_hdr.read_text() | |
| stime = float( | |
| re.compile(r"SampleTimeUsec\s*=\s*.+").search(content_hdr).group(0).split()[-1] | |
| ) | |
| sfreq = float( | |
| re.compile(r"SampleFreqHz\s*=\s*.+").search(content_hdr).group(0).split()[-1] | |
| ) | |
| if stime == 0: | |
| raise ValueError("Header file indicates a sampling interval of 0µs.") | |
| if not np.isclose(1e6 / stime, sfreq): | |
| warn( | |
| f"Sample distance ({stime}µs) and sample frequency ({sfreq}Hz) in header " | |
| "file do not match! sfreq will be derived from sample distance." | |
| ) | |
def _get_curry_meas_info(fname):
    """Read measurement date, data encoding and amplifier info from the header.

    Note that the time zone information is not stored in the Curry info
    file, and it seems the start time info is in the local timezone
    of the acquisition system (which is unknown); therefore, just set
    the timezone to be UTC. If the user knows otherwise, they can
    change it later. (Some Curry files might include StartOffsetUTCMin,
    but its presence is unpredictable, so we won't rely on it.)
    """
    fname_hdr = _check_curry_header_filename(fname)
    content_hdr = fname_hdr.read_text()

    def _int_field(key):
        # "Key = <signed int>" -> int value
        return int(re.search(rf"{key}\s*=\s*-?\d+", content_hdr).group(0).split()[-1])

    # read meas_date
    date_fields = (
        "StartYear",
        "StartMonth",
        "StartDay",
        "StartHour",
        "StartMin",
        "StartSec",
        "StartMillisec",
    )
    date_parts = [_int_field(key) for key in date_fields]
    try:
        meas_date = datetime(
            *date_parts[:-1],
            date_parts[-1] * 1000,  # -> microseconds
            timezone.utc,
        )
    except Exception:
        # header carries sentinel/invalid values -> no measurement date
        meas_date = None
    # read datatype
    byteorder = (
        re.search(r"DataByteOrder\s*=\s*[A-Z]+", content_hdr).group().split()[-1]
    )
    is_ascii = byteorder == "ASCII"
    # amplifier info
    # TODO - PRIVACY
    # seems like there can be identifiable information (serial numbers, dates).
    # MNE anonymization functions only overwrite "serial" and "site", though
    # TODO - FUTURE ENHANCEMENT
    # # there can be filter details in AmplifierInfo, too
    amp_line = re.search(r"AmplifierInfo\s*=.*\n", content_hdr).group()
    amp_info = amp_line.strip("\n").split("= ")[-1].strip()
    # model="", serial="", site=""
    device_info = dict(serial=amp_info) if amp_info != "" else None
    return meas_date, is_ascii, device_info
def _get_curry_recording_type(fname):
    """Classify a Curry file as ``"raw"``, ``"epochs"`` or ``"evoked"``.

    Based on the epoch-info matrix returned by curryreader: empty means a
    continuous (raw) recording; otherwise column 0 holds per-epoch average
    counts — all ones means single trials (epochs), anything else evoked.
    """
    _soft_import("curryreader", "read recording modality")
    import curryreader

    epochinfo = curryreader.read(str(fname), plotdata=0, verbosity=1)["epochinfo"]
    if epochinfo.size == 0:
        return "raw"
    averages = epochinfo[:, 0]
    return "epochs" if (averages == 1).all() else "evoked"
def _get_curry_epoch_info(fname):
    """Build the keyword arguments needed to construct Epochs from *fname*."""
    _soft_import("curryreader", "read epoch info")
    _soft_import("pandas", "dataframe integration")
    import curryreader
    import pandas as pd

    # use curry-python-reader
    currydata = curryreader.read(str(fname), plotdata=0, verbosity=1)
    # get epoch info
    sfreq = currydata["info"]["samplingfreq"]
    n_samples = currydata["info"]["samples"]
    epochlabels = currydata["epochlabels"]
    n_epochs = len(epochlabels)
    epochinfo = currydata["epochinfo"]
    epochtypes = epochinfo[:, 2].astype(int).tolist()
    epochmetainfo = pd.DataFrame(
        epochinfo[:, -4:], columns=["accept", "correct", "response", "response time"]
    )
    # create mne events: one per epoch, epochs laid out back-to-back
    onsets = [i * n_samples for i in range(n_epochs)]
    events = np.array([onsets, [0] * n_epochs, epochtypes]).T
    return dict(
        events=events,
        event_id=dict(zip(epochlabels, epochtypes)),
        tmin=0.0,
        tmax=(n_samples - 1) / sfreq,
        baseline=None,
        detrend=None,
        verbose=False,
        metadata=epochmetainfo,
        reject_by_annotation=False,
        reject=None,
    )
def _get_curry_meg_normals(fname):
    """Read MEG coil normals from the labels file, one row per channel."""
    fname_lbl = _check_curry_labels_filename(fname)
    lines = fname_lbl.read_text().split("\n")
    # indices of the NORMALS START_LIST / END_LIST marker lines, in pairs
    markers = [
        i
        for i, line in enumerate(lines)
        if "NORMALS" in line and ("START_LIST" in line or "END_LIST" in line)
    ]
    assert len(markers) % 2 == 0
    rows = []
    for i_start, i_stop in zip(markers[::2], markers[1::2]):
        rows.extend(line.split("\t") for line in lines[i_start + 1 : i_stop])
    return np.array([[float(value.strip()) for value in row] for row in rows])
def _extract_curry_info(fname):
    """Collect recording, channel, landmark and event info from a Curry file.

    Returns a 13-tuple:
    ``(sfreq, n_samples, ch_names, ch_types, ch_pos, landmarks,
    landmarkslabels, hpimatrix, events, event_desc, orig_format,
    orig_units, cals)``.
    """
    _soft_import("curryreader", "read file header")
    import curryreader

    # check if sfreq values make sense
    fname_hdr = _check_curry_header_filename(fname)
    _check_curry_sfreq_consistency(fname_hdr)
    # use curry-python-reader
    currydata = curryreader.read(str(fname), plotdata=0, verbosity=1)
    # basic info
    sfreq = currydata["info"]["samplingfreq"]
    n_samples = currydata["info"]["samples"]
    if n_samples != currydata["data"].shape[0]:  # normal in epoched data
        # trust the actual data shape over the header sample count
        n_samples = currydata["data"].shape[0]
        if _get_curry_recording_type(fname) == "raw":
            warn(
                "sample count from header doesn't match actual data! "
                "file corrupted? will use data shape"
            )
    # channel information
    n_ch = currydata["info"]["channels"]
    ch_names = currydata["labels"]
    ch_pos = currydata["sensorpos"]
    landmarks = currydata["landmarks"]
    if not isinstance(landmarks, np.ndarray):
        landmarks = np.array(landmarks)
    landmarkslabels = currydata["landmarkslabels"]
    hpimatrix = currydata["hpimatrix"]
    # a single HPI measurement comes back 1-D; normalize to 2-D (row = sample)
    if isinstance(currydata["hpimatrix"], np.ndarray) and hpimatrix.ndim == 1:
        hpimatrix = hpimatrix[np.newaxis, :]
    # data
    orig_format = "int"
    # curryreader.py always reads float32, but this is probably just numpy.
    # legacy MNE code states int.
    # events
    events = currydata["events"]
    annotations = currydata["annotations"]
    assert len(annotations) == len(events)
    if len(events) > 0:
        # map each numeric event type to its first non-empty annotation text,
        # falling back to the stringified type code
        event_desc = dict()
        for k, v in zip(events[:, 1], annotations):
            if int(k) not in event_desc.keys():
                event_desc[int(k)] = v.strip() if (v.strip() != "") else str(int(k))
    else:
        event_desc = None
    # impedance measurements
    # moved to standalone def; see read_impedances_curry
    # impedances = currydata["impedances"]
    # get other essential info not provided by curryreader
    # channel types and units: one DEVICE_PARAMETERS section per channel group
    ch_types, units = [], []
    ch_groups = fname_hdr.read_text().split("DEVICE_PARAMETERS")[1::2]
    for ch_group in ch_groups:
        # collapse all whitespace so "Key = value" token splitting is stable
        ch_group = re.compile(r"\s+").sub(" ", ch_group).strip()
        groupid = ch_group.split()[0]
        unit = ch_group.split("DataUnit = ")[1].split()[0]
        n_ch_group = int(ch_group.split("NumChanThisGroup = ")[1].split()[0])
        ch_type = (
            "mag" if ("MAG" in groupid) else "misc" if ("OTHER" in groupid) else "eeg"
        )
        # combine info
        ch_types += [ch_type] * n_ch_group
        units += [unit] * n_ch_group
    # This for Git issue #8391. In some cases, the 'labels' (.rs3 file will
    # list channels that are not actually saved in the datafile (such as the
    # 'Ref' channel). These channels are denoted in the 'info' (.dap) file
    # in the CHAN_IN_FILE section with a '0' as their index.
    #
    # current curryreader cannot cope with this - loads the list of channels solely
    # based on their order, so can be false. fix it here!
    if not len(ch_types) == len(units) == len(ch_names) == n_ch:
        # read relevant info
        fname_lbl = _check_curry_labels_filename(fname)
        lbl = fname_lbl.read_text().split("START_LIST")
        ch_names_full = []
        for i in range(1, len(lbl)):
            # the section name precedes START_LIST; collect lines until END
            if "LABELS" in lbl[i - 1].split()[-1]:
                for ll in lbl[i].split("\n")[1:]:
                    if "LABELS" not in ll:
                        ch_names_full.append(ll.strip())
                    else:
                        break
        hdr = fname_hdr.read_text().split("START_LIST")
        chaninfile_full = []
        for i in range(1, len(hdr)):
            if "CHAN_IN_FILE" in hdr[i - 1].split()[-1]:
                for ll in hdr[i].split("\n")[1:]:
                    if "CHAN_IN_FILE" not in ll:
                        chaninfile_full.append(int(ll.strip()))
                    else:
                        break
        # drop channels with chan_in_file==0, account for order
        i_drop = [i for i, ich in enumerate(chaninfile_full) if ich == 0]
        ch_names = [
            ch_names_full[i] for i in np.argsort(chaninfile_full) if i not in i_drop
        ]
        ch_pos = np.array(
            [
                ch_pos[i]
                for i in np.argsort(chaninfile_full)
                if (i not in i_drop) and (i < len(ch_pos))
            ]
        )
        ch_types = [ch_types[i] for i in np.argsort(chaninfile_full) if i not in i_drop]
        units = [units[i] for i in np.argsort(chaninfile_full) if i not in i_drop]
    assert len(ch_types) == len(units) == len(ch_names) == n_ch
    assert len(ch_pos) == ch_types.count("eeg") + ch_types.count("mag")
    # finetune channel types (e.g. stim, eog etc might be identified by name)
    # TODO - FUTURE ENHANCEMENT
    # scale data to SI units
    orig_units = dict(zip(ch_names, units))
    # fT -> T and uV -> V; everything else assumed to already be SI
    cals = [
        1.0 / 1e15 if (u == "fT") else 1.0 / 1e6 if (u == "uV") else 1.0 for u in units
    ]
    return (
        sfreq,
        n_samples,
        ch_names,
        ch_types,
        ch_pos,
        landmarks,
        landmarkslabels,
        hpimatrix,
        events,
        event_desc,
        orig_format,
        orig_units,
        cals,
    )
def _read_annotations_curry(fname, sfreq="auto"):
    r"""Read events from Curry event files.

    Parameters
    ----------
    fname : path-like
        The filename.
    sfreq : float | 'auto'
        The sampling frequency in the file. If set to 'auto' then the
        ``sfreq`` is taken from the fileheader.

    Returns
    -------
    annot : instance of Annotations | None
        The annotations.
    """
    fname = _check_curry_filename(fname)
    (sfreq_fromfile, _, _, _, _, _, _, _, events, event_desc, _, _, _) = (
        _extract_curry_info(fname)
    )
    if sfreq == "auto":
        sfreq = sfreq_fromfile
    elif np.isreal(sfreq):
        if float(sfreq) != float(sfreq_fromfile):
            # FIX: the second literal was missing the f-prefix, so
            # "{sfreq_fromfile}" was emitted verbatim instead of the value
            warn(
                f"provided sfreq ({sfreq} Hz) does not match freq from fileheader "
                f"({sfreq_fromfile} Hz)!"
            )
    else:
        raise ValueError("'sfreq' must be numeric or 'auto'")
    if isinstance(events, np.ndarray):  # if there are events
        events = events.astype("int")
        # curryreader columns: sample, etype, startsample, endsample ->
        # MNE layout: sample, duration (end - start), event type
        events = np.insert(events, 1, np.diff(events[:, 2:]).flatten(), axis=1)[:, :3]
        return annotations_from_events(events, sfreq, event_desc=event_desc)
    else:
        warn("no event annotations found")
        return None
def _set_chanloc_curry(
    inst, ch_types, ch_pos, landmarks, landmarkslabels, hpimatrix, on_bad_hpi_match
):
    """Fill channel locations, digitization and dev<->head transform in-place.

    NOTE(review): ``ch_pos`` and ``landmarks`` are divided by 1000 in place
    (presumably mm -> m — confirm against curryreader output), which mutates
    the caller's arrays.
    """
    ch_names = inst.info["ch_names"]
    # scale ch_pos to m?!
    ch_pos /= 1000.0
    landmarks /= 1000.0
    # channel locations
    # what about misc without pos? can they mess things up if unordered?
    # NOTE(review): the ">=" assert is subsumed by the "==" assert below;
    # indexing ch_pos[i] by the enumerate index assumes mag/eeg channels
    # precede any misc channels — TODO confirm
    assert len(ch_pos) >= (ch_types.count("mag") + ch_types.count("eeg"))
    assert len(ch_pos) == (ch_types.count("mag") + ch_types.count("eeg"))
    ch_pos_meg = {
        ch_names[i]: ch_pos[i, :3] for i, t in enumerate(ch_types) if t == "mag"
    }
    ch_pos_eeg = {
        ch_names[i]: ch_pos[i, :3] for i, t in enumerate(ch_types) if t == "eeg"
    }
    # landmarks and headshape
    # FIX: one of the test files (c,rfDC*.cdt) names landmarks differently:
    NAS_NAMES = ["nasion", "nas"]
    LPA_NAMES = ["left ear", "lpa"]
    RPA_NAMES = ["right ear", "rpa"]
    # normalize landmark names to the canonical Nas/LPA/RPA spelling
    landmarkslabels = [
        "Nas"
        if (ll.lower() in NAS_NAMES)
        else "LPA"
        if (ll.lower() in LPA_NAMES)
        else "RPA"
        if (ll.lower() in RPA_NAMES)
        else ll
        for ll in landmarkslabels
    ]
    landmark_dict = dict(zip(landmarkslabels, landmarks))
    for k in ["Nas", "RPA", "LPA"]:
        if k not in landmark_dict.keys():
            landmark_dict[k] = None
    # NOTE(review): "[1-99]" is a character class equal to [1-9]; it does not
    # match two-digit numbers as a range (a second digit is simply ignored)
    if len(landmarkslabels) > 0:
        hpi_pos = landmarks[
            [i for i, n in enumerate(landmarkslabels) if re.match("HPI.?[1-99]", n)],
            :,
        ]
    else:
        hpi_pos = None
    if len(landmarkslabels) > 0:
        # headshape points ("H<number>"); this pattern also matches HPI labels
        hsp_pos = landmarks[
            [i for i, n in enumerate(landmarkslabels) if re.match("H.?[1-99]", n)], :
        ]
    else:
        hsp_pos = None
    # cardinal points present only if at least one of Nas/LPA/RPA was found
    has_cards = (
        False
        if (
            isinstance(landmark_dict["Nas"], type(None))
            and isinstance(landmark_dict["LPA"], type(None))
            and isinstance(landmark_dict["RPA"], type(None))
        )
        else True
    )
    has_hpi = True if isinstance(hpi_pos, np.ndarray) else False
    add_missing_fiducials = not has_cards  # raises otherwise
    dig = _make_dig_points(
        nasion=landmark_dict["Nas"],
        lpa=landmark_dict["LPA"],
        rpa=landmark_dict["RPA"],
        hpi=hpi_pos,
        extra_points=hsp_pos,
        dig_ch_pos=ch_pos_eeg,
        coord_frame="head",
        add_missing_fiducials=add_missing_fiducials,
    )
    with inst.info._unlock():
        inst.info["dig"] = dig
    # loc transformation for meg sensors (taken from previous version)
    if len(ch_pos_meg) > 0:
        R = np.eye(4)
        R[[0, 1], [0, 1]] = -1  # rotate 180 deg
        # shift down and back
        # (chosen by eyeballing to make the helmet look roughly correct)
        R[:3, 3] = [0.0, -0.015, -0.12]
        curry_dev_dev_t = Transform("ctf_meg", "meg", R)
        ch_normals_meg = _get_curry_meg_normals(inst.filenames[0])
        assert len(ch_normals_meg) == len(ch_pos_meg)
    else:
        curry_dev_dev_t, ch_normals_meg = None, None
    # fill up chanlocs
    assert len(ch_names) == len(ch_types) >= len(ch_pos)
    for i, (ch_name, ch_type, ch_loc) in enumerate(zip(ch_names, ch_types, ch_pos)):
        assert inst.info["ch_names"][i] == ch_name
        ch = inst.info["chs"][i]
        if ch_type == "eeg":
            with inst.info._unlock():
                ch["loc"][:3] = ch_loc[:3]
                ch["coord_frame"] = FIFF.FIFFV_COORD_HEAD
        elif ch_type == "mag":
            # transform mode
            pos = ch_loc[:3]  # just the inner coil for MEG
            pos = apply_trans(curry_dev_dev_t, pos)
            nn = ch_normals_meg[i]
            assert np.isclose(np.linalg.norm(nn), 1.0, atol=1e-4)
            nn /= np.linalg.norm(nn)
            nn = apply_trans(curry_dev_dev_t, nn, move=False)
            # build a coil transform from position + normal orientation
            trans = np.eye(4)
            trans[:3, 3] = pos
            trans[:3, :3] = _normal_orth(nn).T
            with inst.info._unlock():
                ch["loc"] = _coil_trans_to_loc(trans)
                # TODO: We should figure out if all files are Compumedics,
                # and even then figure out if it's adult or child
                ch["coil_type"] = FIFF.FIFFV_COIL_COMPUMEDICS_ADULT_GRAD
                ch["coord_frame"] = FIFF.FIFFV_COORD_DEVICE
        elif ch_type == "misc":
            pass
        else:
            raise NotImplementedError
    # TODO - REVIEW NEEDED
    # do we need further transformations for MEG channel positions?
    # the testfiles i got look good to me..
    _make_trans_dig(
        inst.info,
        curry_dev_dev_t,
        landmark_dict,
        has_cards,
        has_hpi,
        hpimatrix,
        on_bad_hpi_match,
    )
def _make_trans_dig(
    info,
    curry_dev_dev_t,
    landmark_dict,
    has_cards,
    has_hpi,
    chpidata,
    on_bad_hpi_match,
):
    """Compose ``info["dev_head_t"]`` from cardinal landmarks and HPI coils.

    Leaves the transform as None (with a log message saying why) when the
    required digitization points are not all available.
    """
    cards = {
        FIFF.FIFFV_POINT_LPA: landmark_dict["LPA"],
        FIFF.FIFFV_POINT_NASION: landmark_dict["Nas"],
        FIFF.FIFFV_POINT_RPA: landmark_dict["RPA"],
    }
    # Coordinate frame transformations and definitions
    no_msg = "Leaving device<->head transform as None"
    info["dev_head_t"] = None
    lm = [v for v in landmark_dict.values() if isinstance(v, np.ndarray)]
    if len(lm) == 0:
        # no dig
        logger.info(no_msg + " (no landmarks found)")
        return
    if has_cards and has_hpi:  # have all three
        logger.info("Composing device<->head transformation from dig points")
        # HPI coil positions from the digitization ("unknown" frame) ...
        hpi_u = np.array(
            [d["r"] for d in info["dig"] if d["kind"] == FIFF.FIFFV_POINT_HPI], float
        )
        # ... and from the first cHPI measurement row (device frame)
        hpi_c = np.ascontiguousarray(chpidata[0][: len(hpi_u), 1:4])
        bad_hpi_match = False
        try:
            with catch_logging() as log:
                unknown_curry_t = _quaternion_align(
                    "unknown",
                    "ctf_meg",
                    hpi_u.astype("float64"),
                    hpi_c.astype("float64"),
                    1e-2,
                )
        except RuntimeError:
            # retry with a 10x looser tolerance, but remember the poor fit
            bad_hpi_match = True
            with catch_logging() as log:
                unknown_curry_t = _quaternion_align(
                    "unknown",
                    "ctf_meg",
                    hpi_u.astype("float64"),
                    hpi_c.astype("float64"),
                    1e-1,
                )
        logger.info(log.getvalue())
        angle = np.rad2deg(
            _angle_between_quats(
                np.zeros(3), rot_to_quat(unknown_curry_t["trans"][:3, :3])
            )
        )
        dist = 1000 * np.linalg.norm(unknown_curry_t["trans"][:3, 3])
        logger.info(f" Fit a {angle:0.1f}° rotation, {dist:0.1f} mm translation")
        if bad_hpi_match:
            _on_missing(
                on_bad_hpi_match,
                "Poor HPI matching (see log above)!",
                name="on_bad_hpi_match",
            )
        # chain: unknown -> curry device -> meg device
        unknown_dev_t = combine_transforms(
            unknown_curry_t, curry_dev_dev_t, "unknown", "meg"
        )
        unknown_head_t = Transform(
            "unknown",
            "head",
            get_ras_to_neuromag_trans(
                *(
                    cards[key]
                    for key in (
                        FIFF.FIFFV_POINT_NASION,
                        FIFF.FIFFV_POINT_LPA,
                        FIFF.FIFFV_POINT_RPA,
                    )
                )
            ),
        )
        with info._unlock():
            info["dev_head_t"] = combine_transforms(
                invert_transform(unknown_dev_t), unknown_head_t, "meg", "head"
            )
        # move all digitization points into the head coordinate frame
        for d in info["dig"]:
            d.update(
                coord_frame=FIFF.FIFFV_COORD_HEAD,
                r=apply_trans(unknown_head_t, d["r"]),
            )
    else:
        if has_cards:
            no_msg += " (no .hpi file found)"
        elif has_hpi:
            no_msg += " (not all cardinal points found)"
        else:
            no_msg += " (neither cardinal points nor .hpi file found)"
        logger.info(no_msg)
def read_raw_curry(
    fname, preload=False, on_bad_hpi_match="warn", verbose=None
) -> "RawCurry":
    """Read raw data from Curry files.

    .. versionchanged:: 1.11
       This function now requires ``curryreader`` to be installed.

    Parameters
    ----------
    fname : path-like
        Path to a valid curry file.
    %(preload)s
    %(on_bad_hpi_match)s
    %(verbose)s

    Returns
    -------
    raw : instance of RawCurry
        A Raw object containing Curry data.
        See :class:`mne.io.Raw` for documentation of attributes and methods.

    See Also
    --------
    mne.io.Raw : Documentation of attributes and methods of RawCurry.
    """
    fname = _check_curry_filename(fname)
    _check_curry_sfreq_consistency(_check_curry_header_filename(fname))
    rectype = _get_curry_recording_type(fname)
    inst = RawCurry(fname, preload, on_bad_hpi_match, verbose)
    if rectype in ("epochs", "evoked"):
        inst = Epochs(inst, **_get_curry_epoch_info(fname))
        if rectype == "evoked":
            raise NotImplementedError  # not sure this is even supported format
    return inst
class RawCurry(BaseRaw):
    """Raw object from Curry file.

    Parameters
    ----------
    fname : path-like
        Path to a valid curry file.
    %(preload)s
    %(on_bad_hpi_match)s
    %(verbose)s

    See Also
    --------
    mne.io.Raw : Documentation of attributes and methods.
    """

    def __init__(self, fname, preload=False, on_bad_hpi_match="warn", verbose=None):
        fname = _check_curry_filename(fname)
        (
            sfreq,
            n_samples,
            ch_names,
            ch_types,
            ch_pos,
            landmarks,
            landmarkslabels,
            hpimatrix,
            events,
            event_desc,
            orig_format,
            orig_units,
            cals,
        ) = _extract_curry_info(fname)
        meas_date, is_ascii, device_info = _get_curry_meas_info(fname)
        # construct info
        info = create_info(ch_names=ch_names, sfreq=sfreq, ch_types=ch_types)
        info["device_info"] = device_info
        # create raw object
        last_samps = [n_samples - 1]
        # is_ascii steers _read_segment_file between text and binary parsing
        raw_extras = dict(is_ascii=is_ascii)
        super().__init__(
            info,
            preload=False,
            filenames=[fname],
            last_samps=last_samps,
            orig_format=orig_format,
            raw_extras=[raw_extras],
            orig_units=orig_units,
            verbose=verbose,
        )
        # set meas_date
        self.set_meas_date(meas_date)
        # scale data to SI units
        self._cals = np.array(cals)
        if isinstance(preload, bool | np.bool_) and preload:
            self.load_data()
        # set events / annotations
        # format from curryreader: sample, etype, startsample, endsample
        if isinstance(events, np.ndarray):  # if there are events
            events = events.astype("int")
            events = np.insert(events, 1, np.diff(events[:, 2:]).flatten(), axis=1)[
                :, :3
            ]
            annot = annotations_from_events(events, sfreq, event_desc=event_desc)
            self.set_annotations(annot)
        # add HPI data (if present)
        # TODO - FUTURE ENHANCEMENT
        # from curryreader docstring:
        # "HPI-coil measurements matrix (Orion-MEG only) where every row is:
        # [measurementsample, dipolefitflag, x, y, z, deviation]"
        #
        # that's incorrect, though. it rather seems to be:
        # [sample, dipole_1, x_1, y_1, z_1, dev_1, ..., dipole_n, x_n, ...]
        # for all n coils.
        #
        # Do not implement cHPI reader for now.
        # Can be used for dev-head transform, though!
        if not isinstance(hpimatrix, list):
            # warn("cHPI data found, but reader not implemented.")
            hpisamples = hpimatrix[:, 0]
            # 5 values per coil after the leading sample column
            n_coil = int((hpimatrix.shape[1] - 1) / 5)
            hpimatrix = hpimatrix[:, 1:].reshape(hpimatrix.shape[0], n_coil, 5) / 1000
            logger.info(f"found {len(hpisamples)} cHPI samples for {n_coil} coils")
        # add sensor locations
        # TODO - REVIEW NEEDED
        assert len(self.info["ch_names"]) == len(ch_types) >= len(ch_pos)
        _set_chanloc_curry(
            inst=self,
            ch_types=ch_types,
            ch_pos=ch_pos,
            landmarks=landmarks,
            landmarkslabels=landmarkslabels,
            hpimatrix=hpimatrix,
            on_bad_hpi_match=on_bad_hpi_match,
        )

    def _read_segment_file(self, data, idx, fi, start, stop, cals, mult):
        """Read a chunk of raw data."""
        if self._raw_extras[fi]["is_ascii"]:
            if isinstance(idx, slice):
                idx = np.arange(idx.start, idx.stop)
            # ASCII data: one text row per sample, so skiprows/max_rows select
            # the requested range; transpose to (n_channels, n_times)
            block = np.loadtxt(
                self.filenames[0], skiprows=start, max_rows=stop - start, ndmin=2
            ).T
            _mult_cal_one(data, block, idx, cals, mult)
        else:
            # binary data are little-endian float32
            _read_segments_file(
                self, data, idx, fi, start, stop, cals, mult, dtype="<f4"
            )
def read_impedances_curry(fname, verbose=None):
    """Read impedance measurements from Curry files.

    Parameters
    ----------
    fname : path-like
        Path to a valid curry file.
    %(verbose)s

    Returns
    -------
    ch_names : list
        A list object containing channel names
    impedances : np.ndarray
        An array containing up to 10 impedance measurements for all recorded channels.
    """
    _soft_import("curryreader", "read impedances")
    import curryreader

    # use curry-python-reader to load data
    fname = _check_curry_filename(fname)
    _check_curry_sfreq_consistency(_check_curry_header_filename(fname))
    currydata = curryreader.read(str(fname), plotdata=0, verbosity=1)
    impedances = currydata["impedances"]
    ch_names = currydata["labels"]
    # get impedance measurement times
    # TODO - FUTURE ENHANCEMENT
    # info can be in the files (as events or IMPEDANCE_TIMES)
    # inconsistently, though, and low priority
    # print impedances
    logger.info("impedance measurements:")
    for row in impedances:
        logger.info({ch: float(imp) for ch, imp in zip(ch_names, row)})
    return ch_names, impedances