# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.

import datetime as dt
import glob as glob
import json
import os.path as op
import re as re
from configparser import ConfigParser, RawConfigParser

import numpy as np
from scipy.io import loadmat

from ..._fiff.constants import FIFF
from ..._fiff.meas_info import _format_dig_points, create_info
from ..._fiff.utils import _mult_cal_one
from ..._freesurfer import get_mni_fiducials
from ...annotations import Annotations
from ...transforms import _get_trans, apply_trans
from ...utils import (
    _check_fname,
    _check_option,
    _mask_to_onsets_offsets,
    _validate_type,
    fill_doc,
    logger,
    verbose,
    warn,
)
from ..base import BaseRaw
from ._localized_abbr import _localized_abbr


@fill_doc
def read_raw_nirx(
    fname, saturated="annotate", *, preload=False, encoding="latin-1", verbose=None
) -> "RawNIRX":
| """Reader for a NIRX fNIRS recording. | |
| Parameters | |
| ---------- | |
| fname : path-like | |
| Path to the NIRX data folder or header file. | |
| %(saturated)s | |
| %(preload)s | |
| %(encoding_nirx)s | |
| %(verbose)s | |
| Returns | |
| ------- | |
| raw : instance of RawNIRX | |
| A Raw object containing NIRX data. | |
| See :class:`mne.io.Raw` for documentation of attributes and methods. | |
| See Also | |
| -------- | |
| mne.io.Raw : Documentation of attributes and methods of RawNIRX. | |
| Notes | |
| ----- | |
| %(nirx_notes)s | |
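
    Examples
    --------
    A minimal sketch of loading a recording (``"nirx_data_folder"`` is a
    hypothetical path to a NIRX measurement directory containing the header
    and wavelength files)::

        import mne

        raw = mne.io.read_raw_nirx("nirx_data_folder", preload=True)
        # saturated periods are annotated as "BAD_SATURATED" by default
        print(raw.annotations)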
| """ | |
| return RawNIRX( | |
| fname, saturated, preload=preload, encoding=encoding, verbose=verbose | |
| ) | |
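# Helper to open NIRX text files (wavelength data, event files) with a
# latin-1 encoding.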
def _open(fname):
    return open(fname, encoding="latin-1")


@fill_doc
class RawNIRX(BaseRaw):
    """Raw object from a NIRX fNIRS file.

    Parameters
    ----------
    fname : path-like
        Path to the NIRX data folder or header file.
    %(saturated)s
    %(preload)s
    %(encoding_nirx)s
    %(verbose)s

    See Also
    --------
    mne.io.Raw : Documentation of attributes and methods.

    Notes
    -----
    %(nirx_notes)s
    """

    @verbose
    def __init__(
        self, fname, saturated, *, preload=False, encoding=None, verbose=None
    ):
        logger.info(f"Loading {fname}")
        _validate_type(fname, "path-like", "fname")
        _validate_type(saturated, str, "saturated")
        _check_option("saturated", saturated, ("annotate", "nan", "ignore"))
        fname = str(fname)
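        # A path to the header (.hdr) file is also accepted; normalize it to
        # the containing data directory.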
        if fname.endswith(".hdr"):
            fname = op.dirname(op.abspath(fname))
        fname = str(_check_fname(fname, "read", True, "fname", need_dir=True))
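        # Aurora (NIRSport2) recordings include a config.json file, whereas
        # NIRStar recordings use config.txt, so its presence identifies the
        # file format.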
        json_config = glob.glob(f"{fname}/*{'config.json'}")
        is_aurora = len(json_config)

        if is_aurora:
            # NIRSport2 devices using Aurora software
            keys = (
                "hdr",
                "config.json",
                "description.json",
                "wl1",
                "wl2",
                "probeInfo.mat",
                "tri",
            )
        else:
            # NIRScout devices and NIRSport1 devices
            keys = (
                "hdr",
                "inf",
                "set",
                "tpl",
                "wl1",
                "wl2",
                "config.txt",
                "probeInfo.mat",
            )

        n_dat = len(glob.glob(f"{fname}/*{'dat'}"))
        if n_dat != 1:
            warn(
                "A single dat file was expected in the specified path, "
                f"but got {n_dat}. This may indicate that the file "
                "structure has been modified since the measurement "
                "was saved."
            )

        # Check if required files exist and store names for later use
        files = dict()
        nan_mask = dict()
        for key in keys:
            files[key] = glob.glob(f"{fname}/*{key}")
            fidx = 0
            if len(files[key]) != 1:
                if key not in ("wl1", "wl2"):
                    raise RuntimeError(f"Need one {key} file, got {len(files[key])}")
                noidx = np.where(["nosatflags_" in op.basename(x) for x in files[key]])[
                    0
                ]
                if len(noidx) != 1 or len(files[key]) != 2:
                    raise RuntimeError(
                        f"Need one nosatflags and one standard {key} file, "
                        f"got {len(files[key])}"
                    )
                # Two files were found here: the standard one and one prefixed
                # with "nosatflags_"; the nosatflags file contains no NaNs.
                noidx = noidx[0]
                if saturated == "ignore":
                    # Ignore NaN and return values
                    fidx = noidx
                elif saturated == "nan":
                    # Return NaN
                    fidx = 0 if noidx == 1 else 1
                else:
                    assert saturated == "annotate"  # guaranteed above
                    fidx = noidx
                    nan_mask[key] = files[key][0 if noidx == 1 else 1]
            files[key] = files[key][fidx]

        # Read number of rows/samples of wavelength data
        with _open(files["wl1"]) as fid:
            last_sample = fid.read().count("\n") - 1

        # Read header file
        # The header file isn't compliant with configparser, so all the text
        # between "#" comment markers must be removed before passing it to the
        # parser.
        with open(files["hdr"], encoding=encoding) as f:
            hdr_str_all = f.read()
        hdr_str = re.sub("#.*?#", "", hdr_str_all, flags=re.DOTALL)
        if is_aurora:
            hdr_str = re.sub("(\\[DataStructure].*)", "", hdr_str, flags=re.DOTALL)
        hdr = RawConfigParser()
        hdr.read_string(hdr_str)

        # Check that the file format version is supported
        if is_aurora:
            # We may need to ease this requirement back
            if hdr["GeneralInfo"]["Version"] not in [
                "2021.4.0-34-ge9fdbbc8",
                "2021.9.0-5-g3eb32851",
                "2021.9.0-6-g14ef4a71",
            ]:
                warn(
                    "MNE has not been tested with Aurora version "
                    f"{hdr['GeneralInfo']['Version']}"
                )
        else:
            if hdr["GeneralInfo"]["NIRStar"] not in ['"15.0"', '"15.2"', '"15.3"']:
                raise RuntimeError(
                    "MNE does not support this NIRStar version"
                    f" ({hdr['GeneralInfo']['NIRStar']})"
                )
            if (
                "NIRScout" not in hdr["GeneralInfo"]["Device"]
                and "NIRSport" not in hdr["GeneralInfo"]["Device"]
            ):
                warn(
                    "Only import of data from NIRScout devices has been "
                    f"thoroughly tested. You are using a {hdr['GeneralInfo']['Device']}"
                    " device."
                )

        # Parse required header fields
        # Extract measurement date and time
        if is_aurora:
            datetime_str = hdr["GeneralInfo"]["Date"]
        else:
            datetime_str = hdr["GeneralInfo"]["Date"] + hdr["GeneralInfo"]["Time"]

        meas_date = None
        # Several formats have been observed so we try each in turn
        for loc, translations in _localized_abbr.items():
            do_break = False
            # So far we are lucky in that all the formats below, if they
            # include %a (weekday abbr), always come first. Thus we can use
            # a .split(), replace, and rejoin.
            loc_datetime_str = datetime_str.split(" ")
            for key, val in translations["weekday"].items():
                loc_datetime_str[0] = loc_datetime_str[0].replace(key, val)
            for ii in range(1, len(loc_datetime_str)):
                for key, val in translations["month"].items():
                    loc_datetime_str[ii] = loc_datetime_str[ii].replace(key, val)
            loc_datetime_str = " ".join(loc_datetime_str)
            logger.debug(f"Trying {loc} datetime: {loc_datetime_str}")
            for dt_code in [
                '"%a, %b %d, %Y""%H:%M:%S.%f"',
                '"%a %d %b %Y""%H:%M:%S.%f"',
                '"%a, %d %b %Y""%H:%M:%S.%f"',
                "%Y-%m-%d %H:%M:%S.%f",
                '"%Y年%m月%d日""%H:%M:%S.%f"',
            ]:
                try:
                    meas_date = dt.datetime.strptime(loc_datetime_str, dt_code)
                except ValueError:
                    pass
                else:
                    meas_date = meas_date.replace(tzinfo=dt.timezone.utc)
                    do_break = True
                    logger.debug(
                        f"Measurement date language {loc} detected: {dt_code}"
                    )
                    break
            if do_break:
                break
        if meas_date is None:
            warn(
                "Extraction of measurement date from NIRX file failed. "
                "This can be caused by files saved in certain locales "
                f"(currently only {list(_localized_abbr)} supported). "
                "Please report this as a github issue. "
                "The date is being set to January 1st, 2000, "
                f"instead of {repr(datetime_str)}."
            )
            meas_date = dt.datetime(2000, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc)

        # Extract frequencies of light used by machine
        if is_aurora:
            fnirs_wavelengths = [760, 850]
        else:
            fnirs_wavelengths = [
                int(s)
                for s in re.findall(r"(\d+)", hdr["ImagingParameters"]["Wavelengths"])
            ]

        # Extract source-detectors
        if is_aurora:
            sources = re.findall(r"(\d+)-\d+", hdr_str_all.split("\n")[-2])
            detectors = re.findall(r"\d+-(\d+)", hdr_str_all.split("\n")[-2])
            sources = [int(s) + 1 for s in sources]
            detectors = [int(d) + 1 for d in detectors]
        else:
            sources = np.asarray(
                [
                    int(s)
                    for s in re.findall(
                        r"(\d+)-\d+:\d+", hdr["DataStructure"]["S-D-Key"]
                    )
                ],
                int,
            )
            detectors = np.asarray(
                [
                    int(s)
                    for s in re.findall(
                        r"\d+-(\d+):\d+", hdr["DataStructure"]["S-D-Key"]
                    )
                ],
                int,
            )

        # Extract sampling rate
        if is_aurora:
            samplingrate = float(hdr["GeneralInfo"]["Sampling rate"])
        else:
            samplingrate = float(hdr["ImagingParameters"]["SamplingRate"])

        # Read participant information file
        if is_aurora:
            with open(files["description.json"]) as f:
                inf = json.load(f)
        else:
            inf = ConfigParser(allow_no_value=True)
            inf.read(files["inf"])
            inf = inf._sections["Subject Demographics"]

        # Store subject information from inf file in mne format
        # Note: NIRX also records "Study Type", "Experiment History",
        # "Additional Notes", "Contact Information" and this information
        # is currently discarded
        # NIRStar does not record an id, or handedness by default
        # The name field is used to populate the his_id variable.
        subject_info = {}
        if is_aurora:
            names = inf["subject"].split()
        else:
            names = inf["name"].replace('"', "").split()
        subject_info["his_id"] = "_".join(names)
        if len(names) > 0:
            subject_info["first_name"] = names[0].replace('"', "")
        if len(names) > 1:
            subject_info["last_name"] = names[-1].replace('"', "")
        if len(names) > 2:
            subject_info["middle_name"] = names[-2].replace('"', "")
        subject_info["sex"] = inf["gender"].replace('"', "")
        # Recode values
        if subject_info["sex"] in {"M", "Male", "1"}:
            subject_info["sex"] = FIFF.FIFFV_SUBJ_SEX_MALE
        elif subject_info["sex"] in {"F", "Female", "2"}:
            subject_info["sex"] = FIFF.FIFFV_SUBJ_SEX_FEMALE
        else:
            subject_info["sex"] = FIFF.FIFFV_SUBJ_SEX_UNKNOWN
| if inf["age"] != "": | |
| subject_info["birthday"] = dt.date( | |
| meas_date.year - int(inf["age"]), | |
| meas_date.month, | |
| meas_date.day, | |
| ) | |
| # Read information about probe/montage/optodes | |
| # A word on terminology used here: | |
| # Sources produce light | |
| # Detectors measure light | |
| # Sources and detectors are both called optodes | |
| # Each source - detector pair produces a channel | |
| # Channels are defined as the midpoint between source and detector | |
| mat_data = loadmat(files["probeInfo.mat"]) | |
| probes = mat_data["probeInfo"]["probes"][0, 0] | |
| requested_channels = probes["index_c"][0, 0] | |
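        # Dividing by 100 converts from cm (as stored in probeInfo.mat) to
        # meters, the MNE convention.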
        src_locs = probes["coords_s3"][0, 0] / 100.0
        det_locs = probes["coords_d3"][0, 0] / 100.0
        ch_locs = probes["coords_c3"][0, 0] / 100.0

        # These are all in MNI coordinates, so let's transform them to
        # the Neuromag head coordinate frame
        src_locs, det_locs, ch_locs, mri_head_t = _convert_fnirs_to_head(
            "fsaverage", "mri", "head", src_locs, det_locs, ch_locs
        )

        # Set up digitization
        dig = get_mni_fiducials("fsaverage", verbose=False)
        for fid in dig:
            fid["r"] = apply_trans(mri_head_t, fid["r"])
            fid["coord_frame"] = FIFF.FIFFV_COORD_HEAD
        for ii, ch_loc in enumerate(ch_locs, 1):
            dig.append(
                dict(
                    kind=FIFF.FIFFV_POINT_EEG,  # misnomer but probably okay
                    r=ch_loc,
                    ident=ii,
                    coord_frame=FIFF.FIFFV_COORD_HEAD,
                )
            )
        dig = _format_dig_points(dig)
        del mri_head_t

        # Determine requested channel indices
        # The wl1 and wl2 files include all possible source - detector pairs.
        # But most of these are not relevant. We want to extract only the
        # subset requested in the probe file
        req_ind = np.array([], int)
        for req_idx in range(requested_channels.shape[0]):
            sd_idx = np.where(
                (sources == requested_channels[req_idx][0])
                & (detectors == requested_channels[req_idx][1])
            )
            req_ind = np.concatenate((req_ind, sd_idx[0]))
        req_ind = req_ind.astype(int)
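
        # Build channel names, interleaving the two wavelengths for each
        # requested source-detector pair, e.g. "S1_D2 760" then "S1_D2 850".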
| snames = [f"S{sources[idx]}" for idx in req_ind] | |
| dnames = [f"_D{detectors[idx]}" for idx in req_ind] | |
| sdnames = [m + str(n) for m, n in zip(snames, dnames)] | |
| sd1 = [s + " " + str(fnirs_wavelengths[0]) for s in sdnames] | |
| sd2 = [s + " " + str(fnirs_wavelengths[1]) for s in sdnames] | |
| chnames = [val for pair in zip(sd1, sd2) for val in pair] | |

        # Create mne structure
        info = create_info(chnames, samplingrate, ch_types="fnirs_cw_amplitude")
        with info._unlock():
            info.update(subject_info=subject_info, dig=dig)
            info["meas_date"] = meas_date

        # Store channel, source, and detector locations
        # The channel location is stored in the first 3 entries of loc.
        # The source location is stored in the second 3 entries of loc.
        # The detector location is stored in the third 3 entries of loc.
        # NIRx NIRSite uses MNI coordinates.
        # Also encode the light frequency in the structure.
        for ch_idx2 in range(requested_channels.shape[0]):
            # Find source and store location
            src = int(requested_channels[ch_idx2, 0]) - 1
            # Find detector and store location
            det = int(requested_channels[ch_idx2, 1]) - 1
            # Store channel location as midpoint between source and detector.
            midpoint = (src_locs[src, :] + det_locs[det, :]) / 2
            for ii in range(2):
                ch_idx3 = ch_idx2 * 2 + ii
                info["chs"][ch_idx3]["loc"][3:6] = src_locs[src, :]
                info["chs"][ch_idx3]["loc"][6:9] = det_locs[det, :]
                info["chs"][ch_idx3]["loc"][:3] = midpoint
                info["chs"][ch_idx3]["loc"][9] = fnirs_wavelengths[ii]
                info["chs"][ch_idx3]["coord_frame"] = FIFF.FIFFV_COORD_HEAD

        # Extract the start/stop numbers for samples in the CSV. In theory the
        # sample bounds should just be 10 * the number of channels, but some
        # files have mixed \n and \n\r endings (!) so we can't rely on it, and
        # instead make a single pass over the entire file at the beginning so
        # that we know how to seek and read later.
        bounds = dict()
        for key in ("wl1", "wl2"):
            offset = 0
            bounds[key] = [offset]
            with open(files[key], "rb") as fid:
                for line in fid:
                    offset += len(line)
                    bounds[key].append(offset)
                assert offset == fid.tell()

        # Extras required for reading data
        raw_extras = {
            "sd_index": req_ind,
            "files": files,
            "bounds": bounds,
            "nan_mask": nan_mask,
        }

        # Get our saturated mask
        annot_mask = None
        for ki, key in enumerate(("wl1", "wl2")):
            if nan_mask.get(key, None) is None:
                continue
            mask = np.isnan(
                _read_csv_rows_cols(
                    nan_mask[key], 0, last_sample + 1, req_ind, {0: 0, 1: None}
                ).T
            )
            if saturated == "nan":
                nan_mask[key] = mask
            else:
                assert saturated == "annotate"
                if annot_mask is None:
                    annot_mask = np.zeros(
                        (len(info["ch_names"]) // 2, last_sample + 1), bool
                    )
                annot_mask |= mask
                nan_mask[key] = None  # shouldn't need again

        super().__init__(
            info,
            preload,
            filenames=[fname],
            last_samps=[last_sample],
            raw_extras=[raw_extras],
            verbose=verbose,
        )

        # make onset/duration/description
        onset, duration, description, ch_names = list(), list(), list(), list()
        if annot_mask is not None:
            for ci, mask in enumerate(annot_mask):
                on, dur = _mask_to_onsets_offsets(mask)
                on = on / info["sfreq"]
                dur = dur / info["sfreq"]
                dur -= on
                onset.extend(on)
                duration.extend(dur)
                description.extend(["BAD_SATURATED"] * len(on))
                ch_names.extend([self.ch_names[2 * ci : 2 * ci + 2]] * len(on))

        # Read triggers from event file
        if not is_aurora:
            files["tri"] = files["hdr"][:-3] + "evt"
        if op.isfile(files["tri"]):
            with _open(files["tri"]) as fid:
                t = [re.findall(r"(\d+)", line) for line in fid]
            if is_aurora:
                tf_idx, desc_idx = _determine_tri_idxs(t[0])
            for t_ in t:
                if is_aurora:
                    trigger_frame = float(t_[tf_idx])
                    desc = float(t_[desc_idx])
                else:
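                    # NIRStar .evt lines hold the sample number followed by a
                    # series of digit columns; these appear to be trigger bits,
                    # so join them, reverse, and parse as a binary number.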
                    binary_value = "".join(t_[1:])[::-1]
                    desc = float(int(binary_value, 2))
                    trigger_frame = float(t_[0])
                onset.append(trigger_frame / samplingrate)
                duration.append(1.0)  # No duration info stored in files
                description.append(desc)
                ch_names.append(list())
        annot = Annotations(onset, duration, description, ch_names=ch_names)
        self.set_annotations(annot)

    def _read_segment_file(self, data, idx, fi, start, stop, cals, mult):
        """Read a segment of data from a file.

        The NIRX machine records raw data as two different wavelengths.
        The returned data interleaves the wavelengths.
        """
        sd_index = self._raw_extras[fi]["sd_index"]
        wls = list()
        for key in ("wl1", "wl2"):
            d = _read_csv_rows_cols(
                self._raw_extras[fi]["files"][key],
                start,
                stop,
                sd_index,
                self._raw_extras[fi]["bounds"][key],
            ).T
            nan_mask = self._raw_extras[fi]["nan_mask"].get(key, None)
            if nan_mask is not None:
                d[nan_mask[:, start:stop]] = np.nan
            wls.append(d)

        # TODO: Make this more efficient by only indexing above what we need.
        # For now let's just construct the full data matrix and index.
        # Interleave wavelength 1 and 2 to match channel names:
        this_data = np.zeros((len(wls[0]) * 2, stop - start))
        this_data[0::2, :] = wls[0]
        this_data[1::2, :] = wls[1]
        _mult_cal_one(data, this_data, idx, cals, mult)
        return data


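# Read rows [start, stop) from a space-separated text file, seeking directly
# to the first requested row via the precomputed per-line byte offsets
# ("bounds"), then keep only the requested columns.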
def _read_csv_rows_cols(fname, start, stop, cols, bounds, sep=" ", replace=None):
    with open(fname, "rb") as fid:
        fid.seek(bounds[start])
        args = list()
        if bounds[1] is not None:
            args.append(bounds[stop] - bounds[start])
        data = fid.read(*args).decode("latin-1")
    if replace is not None:
        data = replace(data)
    x = np.fromstring(data, float, sep=sep)
    x.shape = (stop - start, -1)
    x = x[:, cols]
    return x


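# Move optode and channel locations from the "fro" frame to the "to" frame
# (here: fsaverage MRI/MNI space to the Neuromag head frame) and return the
# transform that was applied.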
def _convert_fnirs_to_head(trans, fro, to, src_locs, det_locs, ch_locs):
    mri_head_t, _ = _get_trans(trans, fro, to)
    src_locs = apply_trans(mri_head_t, src_locs)
    det_locs = apply_trans(mri_head_t, det_locs)
    ch_locs = apply_trans(mri_head_t, ch_locs)
    return src_locs, det_locs, ch_locs, mri_head_t


def _determine_tri_idxs(trigger):
    """Determine tri file indexes for frame and description."""
    if len(trigger) == 12:
        # Aurora version 2021.9.6 or greater
        trigger_frame_idx = 7
        desc_idx = 10
    elif len(trigger) == 9:
        # Aurora version 2021.9.5 or earlier
        trigger_frame_idx = 7
        desc_idx = 8
    else:
        raise RuntimeError("Unable to read trigger file.")
    return trigger_frame_idx, desc_idx