Spaces:
Running
Running
| """Read .res4 files.""" | |
| # Authors: The MNE-Python contributors. | |
| # License: BSD-3-Clause | |
| # Copyright the MNE-Python contributors. | |
| import os.path as op | |
| import numpy as np | |
| from ...utils import logger | |
| from .constants import CTF | |
| def _make_ctf_name(directory, extra, raise_error=True): | |
| """Make a CTF name.""" | |
| fname = op.join(directory, op.basename(directory)[:-3] + "." + extra) | |
| found = True | |
| if not op.isfile(fname): | |
| if raise_error: | |
| raise OSError(f"Standard file {fname} not found") | |
| found = False | |
| return fname, found | |
| def _read_double(fid, n=1): | |
| """Read a double.""" | |
| return np.fromfile(fid, ">f8", n) | |
| def _read_string(fid, n_bytes, decode=True): | |
| """Read string.""" | |
| s0 = fid.read(n_bytes) | |
| s = s0.split(b"\x00")[0] | |
| return s.decode("utf-8") if decode else s | |
| def _read_ustring(fid, n_bytes): | |
| """Read unsigned character string.""" | |
| return np.fromfile(fid, ">B", n_bytes) | |
| def _read_int2(fid): | |
| """Read int from short.""" | |
| return _auto_cast(np.fromfile(fid, ">i2", 1)[0]) | |
| def _read_int(fid): | |
| """Read a 32-bit integer.""" | |
| return np.fromfile(fid, ">i4", 1)[0] | |
| def _move_to_next(fid, byte=8): | |
| """Move to next byte boundary.""" | |
| now = fid.tell() | |
| if now % byte != 0: | |
| now = now - (now % byte) + byte | |
| fid.seek(now, 0) | |
| def _read_filter(fid): | |
| """Read filter information.""" | |
| f = dict() | |
| f["freq"] = _read_double(fid)[0] | |
| f["class"] = _read_int(fid) | |
| f["type"] = _read_int(fid) | |
| f["npar"] = _read_int2(fid) | |
| f["pars"] = _read_double(fid, f["npar"]) | |
| return f | |
| def _read_comp_coeff(fid, d): | |
| """Read compensation coefficients.""" | |
| # Read the coefficients and initialize | |
| d["ncomp"] = _read_int2(fid) | |
| d["comp"] = list() | |
| # Read each record | |
| dt = np.dtype( | |
| [ | |
| ("sensor_name", "S32"), | |
| ("coeff_type", ">i4"), | |
| ("d0", ">i4"), | |
| ("ncoeff", ">i2"), | |
| ("sensors", f"S{CTF.CTFV_SENSOR_LABEL}", CTF.CTFV_MAX_BALANCING), | |
| ("coeffs", ">f8", CTF.CTFV_MAX_BALANCING), | |
| ] | |
| ) | |
| comps = np.fromfile(fid, dt, d["ncomp"]) | |
| for k in range(d["ncomp"]): | |
| comp = dict() | |
| d["comp"].append(comp) | |
| comp["sensor_name"] = comps["sensor_name"][k].split(b"\x00")[0].decode("utf-8") | |
| comp["coeff_type"] = comps["coeff_type"][k].item() | |
| comp["ncoeff"] = comps["ncoeff"][k].item() | |
| comp["sensors"] = [ | |
| s.split(b"\x00")[0].decode("utf-8") | |
| for s in comps["sensors"][k][: comp["ncoeff"]] | |
| ] | |
| comp["coeffs"] = comps["coeffs"][k][: comp["ncoeff"]] | |
| comp["scanno"] = d["ch_names"].index(comp["sensor_name"]) | |
| def _read_res4(dsdir): | |
| """Read the magical res4 file.""" | |
| # adapted from read_res4.c | |
| name, _ = _make_ctf_name(dsdir, "res4") | |
| res = dict() | |
| with open(name, "rb") as fid: | |
| # Read the fields | |
| res["head"] = _read_string(fid, 8) | |
| res["appname"] = _read_string(fid, 256) | |
| res["origin"] = _read_string(fid, 256) | |
| res["desc"] = _read_string(fid, 256) | |
| res["nave"] = _read_int2(fid) | |
| res["data_time"] = _read_string(fid, 255) | |
| res["data_date"] = _read_string(fid, 255) | |
| # Seems that date and time can be swapped | |
| # (are they entered manually?!) | |
| if "/" in res["data_time"] and ":" in res["data_date"]: | |
| data_date = res["data_date"] | |
| res["data_date"] = res["data_time"] | |
| res["data_time"] = data_date | |
| res["nsamp"] = _read_int(fid) | |
| res["nchan"] = _read_int2(fid) | |
| _move_to_next(fid, 8) | |
| res["sfreq"] = _read_double(fid)[0] | |
| res["epoch_time"] = _read_double(fid)[0] | |
| res["no_trials"] = _read_int2(fid) | |
| _move_to_next(fid, 4) | |
| res["pre_trig_pts"] = _read_int(fid) | |
| res["no_trials_done"] = _read_int2(fid) | |
| res["no_trials_bst_message_windowlay"] = _read_int2(fid) | |
| _move_to_next(fid, 4) | |
| res["save_trials"] = _read_int(fid) | |
| res["primary_trigger"] = fid.read(1) | |
| res["secondary_trigger"] = [ | |
| fid.read(1) for k in range(CTF.CTFV_MAX_AVERAGE_BINS) | |
| ] | |
| res["trigger_polarity_mask"] = fid.read(1) | |
| res["trigger_mode"] = _read_int2(fid) | |
| _move_to_next(fid, 4) | |
| res["accept_reject"] = _read_int(fid) | |
| res["run_time_bst_message_windowlay"] = _read_int2(fid) | |
| _move_to_next(fid, 4) | |
| res["zero_head"] = _read_int(fid) | |
| _move_to_next(fid, 4) | |
| res["artifact_mode"] = _read_int(fid) | |
| _read_int(fid) # padding | |
| res["nf_run_name"] = _read_string(fid, 32) | |
| res["nf_run_title"] = _read_string(fid, 256) | |
| res["nf_instruments"] = _read_string(fid, 32) | |
| res["nf_collect_descriptor"] = _read_string(fid, 32) | |
| res["nf_subject_id"] = _read_string(fid, 32) | |
| res["nf_operator"] = _read_string(fid, 32) | |
| if len(res["nf_operator"]) == 0: | |
| res["nf_operator"] = None | |
| res["nf_sensor_file_name"] = _read_ustring(fid, 60) | |
| _move_to_next(fid, 4) | |
| res["rdlen"] = _read_int(fid) | |
| fid.seek(CTF.FUNNY_POS, 0) | |
| if res["rdlen"] > 0: | |
| res["run_desc"] = _read_string(fid, res["rdlen"]) | |
| # Filters | |
| res["nfilt"] = _read_int2(fid) | |
| res["filters"] = list() | |
| for k in range(res["nfilt"]): | |
| res["filters"].append(_read_filter(fid)) | |
| # Channel information (names, then data) | |
| res["ch_names"] = list() | |
| for k in range(res["nchan"]): | |
| ch_name = _read_string(fid, 32) | |
| res["ch_names"].append(ch_name) | |
| _coil_dt = np.dtype( | |
| [ | |
| ("pos", ">f8", 3), | |
| ("d0", ">f8"), | |
| ("norm", ">f8", 3), | |
| ("d1", ">f8"), | |
| ("turns", ">i2"), | |
| ("d2", ">i4"), | |
| ("d3", ">i2"), | |
| ("area", ">f8"), | |
| ] | |
| ) | |
| _ch_dt = np.dtype( | |
| [ | |
| ("sensor_type_index", ">i2"), | |
| ("original_run_no", ">i2"), | |
| ("coil_type", ">i4"), | |
| ("proper_gain", ">f8"), | |
| ("qgain", ">f8"), | |
| ("io_gain", ">f8"), | |
| ("io_offset", ">f8"), | |
| ("num_coils", ">i2"), | |
| ("grad_order_no", ">i2"), | |
| ("d0", ">i4"), | |
| ("coil", _coil_dt, CTF.CTFV_MAX_COILS), | |
| ("head_coil", _coil_dt, CTF.CTFV_MAX_COILS), | |
| ] | |
| ) | |
| chs = np.fromfile(fid, _ch_dt, res["nchan"]) | |
| for coil in (chs["coil"], chs["head_coil"]): | |
| coil["pos"] /= 100.0 | |
| coil["area"] *= 1e-4 | |
| # convert to dict | |
| chs = [dict(zip(chs.dtype.names, x)) for x in chs] | |
| for ch in chs: | |
| for key, val in ch.items(): | |
| ch[key] = _auto_cast(val) | |
| res["chs"] = chs | |
| for k in range(res["nchan"]): | |
| res["chs"][k]["ch_name"] = res["ch_names"][k] | |
| # The compensation coefficients | |
| _read_comp_coeff(fid, res) | |
| logger.info(" res4 data read.") | |
| return res | |
| def _auto_cast(x): | |
| # Upcast scalars | |
| if isinstance(x, np.ScalarType): | |
| if x.dtype.kind == "i": | |
| if x.dtype != np.int64: | |
| x = x.astype(np.int64) | |
| elif x.dtype.kind == "f": | |
| if x.dtype != np.float64: | |
| x = x.astype(np.float64) | |
| return x | |