Spaces:
Running
Running
| # Authors: The MNE-Python contributors. | |
| # License: BSD-3-Clause | |
| # Copyright the MNE-Python contributors. | |
| import datetime as dt | |
| import re | |
| import numpy as np | |
| from ..._fiff.constants import FIFF | |
| from ..._fiff.meas_info import _merge_info, create_info | |
| from ..._fiff.utils import _mult_cal_one | |
| from ...utils import _check_fname, _check_option, fill_doc, logger, verbose, warn | |
| from ..base import BaseRaw | |
| from ..nirx.nirx import _read_csv_rows_cols | |
| def read_raw_hitachi(fname, preload=False, verbose=None) -> "RawHitachi": | |
| """Reader for a Hitachi fNIRS recording. | |
| Parameters | |
| ---------- | |
| %(hitachi_fname)s | |
| %(preload)s | |
| %(verbose)s | |
| Returns | |
| ------- | |
| raw : instance of RawHitachi | |
| A Raw object containing Hitachi data. | |
| See :class:`mne.io.Raw` for documentation of attributes and methods. | |
| See Also | |
| -------- | |
| mne.io.Raw : Documentation of attributes and methods of RawHitachi. | |
| Notes | |
| ----- | |
| %(hitachi_notes)s | |
| """ | |
| return RawHitachi(fname, preload, verbose=verbose) | |
| def _check_bad(cond, msg): | |
| if cond: | |
| raise RuntimeError(f"Could not parse file: {msg}") | |
| class RawHitachi(BaseRaw): | |
| """Raw object from a Hitachi fNIRS file. | |
| Parameters | |
| ---------- | |
| %(hitachi_fname)s | |
| %(preload)s | |
| %(verbose)s | |
| See Also | |
| -------- | |
| mne.io.Raw : Documentation of attributes and methods. | |
| Notes | |
| ----- | |
| %(hitachi_notes)s | |
| """ | |
| def __init__(self, fname, preload=False, *, verbose=None): | |
| if not isinstance(fname, list | tuple): | |
| fname = [fname] | |
| fname = list(fname) # our own list that we can modify | |
| for fi, this_fname in enumerate(fname): | |
| fname[fi] = _check_fname(this_fname, "read", True, f"fname[{fi}]") | |
| infos = list() | |
| probes = list() | |
| last_samps = list() | |
| S_offset = D_offset = 0 | |
| ignore_names = ["Time"] | |
| for this_fname in fname: | |
| info, extra, last_samp, offsets = _get_hitachi_info( | |
| this_fname, S_offset, D_offset, ignore_names | |
| ) | |
| ignore_names = list(set(ignore_names + info["ch_names"])) | |
| S_offset += offsets[0] | |
| D_offset += offsets[1] | |
| infos.append(info) | |
| probes.append(extra) | |
| last_samps.append(last_samp) | |
| # combine infos | |
| if len(fname) > 1: | |
| info = _merge_info(infos) | |
| else: | |
| info = infos[0] | |
| if len(set(last_samps)) != 1: | |
| raise RuntimeError( | |
| "All files must have the same number of samples, got: {last_samps}" | |
| ) | |
| last_samps = [last_samps[0]] | |
| raw_extras = [dict(probes=probes)] | |
| # One representative filename is good enough here | |
| # (additional filenames indicate temporal concat, not ch concat) | |
| super().__init__( | |
| info, | |
| preload, | |
| filenames=[fname[0]], | |
| last_samps=last_samps, | |
| raw_extras=raw_extras, | |
| verbose=verbose, | |
| ) | |
| def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): | |
| """Read a segment of data from a file.""" | |
| this_data = list() | |
| for this_probe in self._raw_extras[fi]["probes"]: | |
| this_data.append( | |
| _read_csv_rows_cols( | |
| this_probe["fname"], | |
| start, | |
| stop, | |
| this_probe["keep_mask"], | |
| this_probe["bounds"], | |
| sep=",", | |
| replace=lambda x: x.replace("\r", "\n") | |
| .replace("\n\n", "\n") | |
| .replace("\n", ",") | |
| .replace(":", ""), | |
| ).T | |
| ) | |
| this_data = np.concatenate(this_data, axis=0) | |
| _mult_cal_one(data, this_data, idx, cals, mult) | |
| return data | |
| def _get_hitachi_info(fname, S_offset, D_offset, ignore_names): | |
| logger.info(f"Loading {fname}") | |
| raw_extra = dict(fname=fname) | |
| info_extra = dict() | |
| subject_info = dict() | |
| ch_wavelengths = dict() | |
| fnirs_wavelengths = [None, None] | |
| meas_date = age = ch_names = sfreq = None | |
| with open(fname, "rb") as fid: | |
| lines = fid.read() | |
| lines = lines.decode("latin-1").rstrip("\r\n") | |
| oldlen = len(lines) | |
| assert len(lines) == oldlen | |
| bounds = [0] | |
| end = "\n" if "\n" in lines else "\r" | |
| bounds.extend(a.end() for a in re.finditer(end, lines)) | |
| bounds.append(len(lines)) | |
| lines = lines.split(end) | |
| assert len(bounds) == len(lines) + 1 | |
| line = lines[0].rstrip(",\r\n") | |
| _check_bad(line != "Header", "no header found") | |
| li = 0 | |
| mode = None | |
| for li, line in enumerate(lines[1:], 1): | |
| # Newer format has some blank lines | |
| if len(line) == 0: | |
| continue | |
| parts = line.rstrip(",\r\n").split(",") | |
| if len(parts) == 0: # some header lines are blank | |
| continue | |
| kind, parts = parts[0], parts[1:] | |
| if len(parts) == 0: | |
| parts = [""] # some fields (e.g., Comment) meaningfully blank | |
| if kind == "File Version": | |
| logger.info(f"Reading Hitachi fNIRS file version {parts[0]}") | |
| elif kind == "AnalyzeMode": | |
| _check_bad(parts != ["Continuous"], f"not continuous data ({parts})") | |
| elif kind == "Sampling Period[s]": | |
| sfreq = 1 / float(parts[0]) | |
| elif kind == "Exception": | |
| raise NotImplementedError(kind) | |
| elif kind == "Comment": | |
| info_extra["description"] = parts[0] | |
| elif kind == "ID": | |
| subject_info["his_id"] = parts[0] | |
| elif kind == "Name": | |
| if len(parts): | |
| name = parts[0].split(" ") | |
| if len(name): | |
| subject_info["first_name"] = name[0] | |
| subject_info["last_name"] = " ".join(name[1:]) | |
| elif kind == "Age": | |
| age = int(parts[0].rstrip("y")) | |
| elif kind == "Mode": | |
| mode = parts[0] | |
| elif kind in ("HPF[Hz]", "LPF[Hz]"): | |
| try: | |
| freq = float(parts[0]) | |
| except ValueError: | |
| pass | |
| else: | |
| info_extra[{"HPF[Hz]": "highpass", "LPF[Hz]": "lowpass"}[kind]] = freq | |
| elif kind == "Date": | |
| # 5/17/04 5:14 | |
| try: | |
| mdy, HM = parts[0].split(" ") | |
| H, M = HM.split(":") | |
| if len(H) == 1: | |
| H = f"0{H}" | |
| mdyHM = " ".join([mdy, ":".join([H, M])]) | |
| for fmt in ("%m/%d/%y %H:%M", "%Y/%m/%d %H:%M"): | |
| try: | |
| meas_date = dt.datetime.strptime(mdyHM, fmt) | |
| except Exception: | |
| pass | |
| else: | |
| break | |
| else: | |
| raise RuntimeError # unknown format | |
| except Exception: | |
| warn( | |
| "Extraction of measurement date failed. " | |
| "Please report this as a github issue. " | |
| "The date is being set to January 1st, 2000, " | |
| f"instead of {repr(parts[0])}" | |
| ) | |
| elif kind == "Sex": | |
| try: | |
| subject_info["sex"] = dict( | |
| female=FIFF.FIFFV_SUBJ_SEX_FEMALE, male=FIFF.FIFFV_SUBJ_SEX_MALE | |
| )[parts[0].lower()] | |
| except KeyError: | |
| pass | |
| elif kind == "Wave[nm]": | |
| fnirs_wavelengths[:] = [int(part) for part in parts] | |
| elif kind == "Wave Length": | |
| ch_regex = re.compile(r"^(.*)\(([0-9\.]+)\)$") | |
| for ent in parts: | |
| _, v = ch_regex.match(ent).groups() | |
| ch_wavelengths[ent] = float(v) | |
| elif kind == "Data": | |
| break | |
| fnirs_wavelengths = np.array(fnirs_wavelengths, int) | |
| assert len(fnirs_wavelengths) == 2 | |
| ch_names = lines[li + 1].rstrip(",\r\n").split(",") | |
| # cull to correct ones | |
| raw_extra["keep_mask"] = ~np.isin(ch_names, list(ignore_names)) | |
| for ci, ch_name in enumerate(ch_names): | |
| if re.match("Probe[0-9]+", ch_name): | |
| raw_extra["keep_mask"][ci] = False | |
| # set types | |
| ch_names = [ | |
| ch_name for ci, ch_name in enumerate(ch_names) if raw_extra["keep_mask"][ci] | |
| ] | |
| ch_types = [ | |
| "fnirs_cw_amplitude" if ch_name.startswith("CH") else "stim" | |
| for ch_name in ch_names | |
| ] | |
| # get locations | |
| nirs_names = [ | |
| ch_name | |
| for ch_name, ch_type in zip(ch_names, ch_types) | |
| if ch_type == "fnirs_cw_amplitude" | |
| ] | |
| n_nirs = len(nirs_names) | |
| assert n_nirs % 2 == 0 | |
| names = { | |
| "3x3": "ETG-100", | |
| "3x5": "ETG-7000", | |
| "4x4": "ETG-7000", | |
| "3x11": "ETG-4000", | |
| } | |
| _check_option("Hitachi mode", mode, sorted(names)) | |
| n_row, n_col = (int(x) for x in mode.split("x")) | |
| logger.info(f"Constructing pairing matrix for {names[mode]} ({mode})") | |
| pairs = _compute_pairs(n_row, n_col, n=1 + (mode == "3x3")) | |
| assert n_nirs == len(pairs) * 2 | |
| locs = np.zeros((len(ch_names), 12)) | |
| locs[:, :9] = np.nan | |
| idxs = np.where(np.array(ch_types, "U") == "fnirs_cw_amplitude")[0] | |
| for ii, idx in enumerate(idxs): | |
| ch_name = ch_names[idx] | |
| # Use the actual/accurate wavelength in loc | |
| acc_freq = ch_wavelengths[ch_name] | |
| locs[idx][9] = acc_freq | |
| # Rename channel based on standard naming scheme, using the | |
| # nominal wavelength | |
| sidx, didx = pairs[ii // 2] | |
| nom_freq = fnirs_wavelengths[np.argmin(np.abs(acc_freq - fnirs_wavelengths))] | |
| ch_names[idx] = f"S{S_offset + sidx + 1}_D{D_offset + didx + 1} {nom_freq}" | |
| offsets = np.array(pairs, int).max(axis=0) + 1 | |
| # figure out bounds | |
| bounds = raw_extra["bounds"] = bounds[li + 2 :] | |
| last_samp = len(bounds) - 2 | |
| if age is not None and meas_date is not None: | |
| subject_info["birthday"] = dt.date( | |
| meas_date.year - age, | |
| meas_date.month, | |
| meas_date.day, | |
| ) | |
| if meas_date is None: | |
| meas_date = dt.datetime(2000, 1, 1, 0, 0, 0) | |
| meas_date = meas_date.replace(tzinfo=dt.timezone.utc) | |
| if subject_info: | |
| info_extra["subject_info"] = subject_info | |
| # Create mne structure | |
| info = create_info(ch_names, sfreq, ch_types=ch_types) | |
| with info._unlock(): | |
| info.update(info_extra) | |
| info["meas_date"] = meas_date | |
| for li, loc in enumerate(locs): | |
| info["chs"][li]["loc"][:] = loc | |
| return info, raw_extra, last_samp, offsets | |
| def _compute_pairs(n_rows, n_cols, n=1): | |
| n_tot = n_rows * n_cols | |
| sd_idx = (np.arange(n_tot) // 2).reshape(n_rows, n_cols) | |
| d_bool = np.empty((n_rows, n_cols), bool) | |
| for ri in range(n_rows): | |
| d_bool[ri] = np.arange(ri, ri + n_cols) % 2 | |
| pairs = list() | |
| for ri in range(n_rows): | |
| # First iterate over connections within the row | |
| for ci in range(n_cols - 1): | |
| pair = (sd_idx[ri, ci], sd_idx[ri, ci + 1]) | |
| if d_bool[ri, ci]: # reverse | |
| pair = pair[::-1] | |
| pairs.append(pair) | |
| # Next iterate over row-row connections, if applicable | |
| if ri >= n_rows - 1: | |
| continue | |
| for ci in range(n_cols): | |
| pair = (sd_idx[ri, ci], sd_idx[ri + 1, ci]) | |
| if d_bool[ri, ci]: | |
| pair = pair[::-1] | |
| pairs.append(pair) | |
| if n > 1: | |
| assert n == 2 # only one supported for now | |
| pairs = np.array(pairs, int) | |
| second = pairs + pairs.max(axis=0) + 1 | |
| pairs = np.r_[pairs, second] | |
| pairs = tuple(tuple(row) for row in pairs) | |
| return tuple(pairs) | |