Spaces:
Running
Running
| # Authors: The MNE-Python contributors. | |
| # License: BSD-3-Clause | |
| # Copyright the MNE-Python contributors. | |
| from collections import OrderedDict | |
| from pathlib import Path | |
| import numpy as np | |
| from ..._fiff.meas_info import create_info | |
| from ...evoked import EvokedArray | |
| from ...utils import fill_doc, logger, verbose | |
# NOTE(review): fill_doc and verbose are imported at the top of this file and the
# docstring below uses the %(verbose)s placeholder, so the decorators appear to
# have been dropped from this definition — restored here; confirm against upstream.
@fill_doc
@verbose
def read_evoked_besa(fname, verbose=None):
    """Reader function for BESA ``.avr`` or ``.mul`` files.

    When a ``.elp`` sidecar file is present, it will be used to determine
    electrode information.

    Parameters
    ----------
    fname : path-like
        Path to the ``.avr`` or ``.mul`` file.
    %(verbose)s

    Returns
    -------
    ev : Evoked
        The evoked data in the .avr or .mul file.
    """
    fname = Path(fname)
    # Dispatch on the file extension; each BESA export format has its own parser.
    if fname.suffix == ".avr":
        return _read_evoked_besa_avr(fname, verbose)
    elif fname.suffix == ".mul":
        return _read_evoked_besa_mul(fname, verbose)
    else:
        raise ValueError("Filename must end in either .avr or .mul")
def _read_evoked_besa_avr(fname, verbose):
    """Create EvokedArray from a BESA .avr file.

    Parameters
    ----------
    fname : Path
        Path to the ``.avr`` file.
    verbose : bool | str | int | None
        Verbosity level, forwarded to the EvokedArray constructor.

    Returns
    -------
    ev : EvokedArray
        The evoked data contained in the file. Rows of the file are channels,
        columns are time samples.
    """
    with open(fname) as f:
        header = f.readline().strip()

        # There are two versions of .avr files. The old style, generated by
        # BESA 1, 2 and 3 does not define Nchan and does not have channel names
        # in the file.
        new_style = "Nchan=" in header
        if new_style:
            ch_names = f.readline().strip().split()
        else:
            ch_names = None

    fields = _parse_header(header)
    # New-style files have two header lines (fields + channel names).
    data = np.loadtxt(fname, skiprows=2 if new_style else 1, ndmin=2)
    # Channel types come from an optional .elp sidecar file (or None).
    ch_types = _read_elp_sidecar(fname)

    # Consolidate channel names
    if new_style:
        if len(ch_names) != len(data):
            raise RuntimeError(
                "Mismatch between the number of channel names defined in "
                f"the .avr file ({len(ch_names)}) and the number of rows "
                f"in the data matrix ({len(data)})."
            )
    else:
        # Determine channel names from the .elp sidecar file
        if ch_types is not None:
            ch_names = list(ch_types.keys())
            if len(ch_names) != len(data):
                raise RuntimeError(
                    "Mismatch between the number of channels "
                    f"defined in the .avr file ({len(data)}) "
                    f"and .elp file ({len(ch_names)})."
                )
        else:
            logger.info(
                "No .elp file found and no channel names present in "
                "the .avr file. Falling back to generic names. "
            )
            ch_names = [f"CH{i + 1:02d}" for i in range(len(data))]

    # Consolidate channel types
    if ch_types is None:
        logger.info("Marking all channels as EEG.")
        ch_types = ["eeg"] * len(ch_names)
    else:
        ch_types = [ch_types[ch] for ch in ch_names]

    # Go over all the header fields and make sure they are all defined to
    # something sensible.
    if "Npts" in fields:
        fields["Npts"] = int(fields["Npts"])
        if fields["Npts"] != data.shape[1]:
            # BUG FIX: logger.warn is a deprecated alias (removed in Py 3.13);
            # use logger.warning.
            logger.warning(
                f"The size of the data matrix ({data.shape}) does not "
                f'match the "Npts" field ({fields["Npts"]}).'
            )
    if "Nchan" in fields:
        fields["Nchan"] = int(fields["Nchan"])
        if fields["Nchan"] != data.shape[0]:
            logger.warning(
                f"The size of the data matrix ({data.shape}) does not "
                f'match the "Nchan" field ({fields["Nchan"]}).'
            )
    if "DI" in fields:
        # DI is the sampling interval in milliseconds.
        fields["DI"] = float(fields["DI"])
    else:
        raise RuntimeError(
            'No "DI" field present. Could not determine sampling frequency.'
        )
    if "TSB" in fields:
        # TSB is the time of the first sample in milliseconds.
        fields["TSB"] = float(fields["TSB"])
    else:
        fields["TSB"] = 0
    if "SB" in fields:
        # SB is the scaling factor (bins per microvolt).
        fields["SB"] = float(fields["SB"])
    else:
        fields["SB"] = 1.0
    if "SegmentName" not in fields:
        fields["SegmentName"] = ""

    # Build the Evoked object based on the header fields.
    # BUG FIX: pass the consolidated ch_types instead of hardcoding "eeg",
    # otherwise the types read from the .elp sidecar file were discarded.
    info = create_info(ch_names, sfreq=1000 / fields["DI"], ch_types=ch_types)
    return EvokedArray(
        data / fields["SB"] / 1e6,  # convert bins to volts
        info,
        tmin=fields["TSB"] / 1000,  # ms -> s
        comment=fields["SegmentName"],
        verbose=verbose,
    )
def _read_evoked_besa_mul(fname, verbose):
    """Create EvokedArray from a BESA .mul file.

    Parameters
    ----------
    fname : Path
        Path to the ``.mul`` file.
    verbose : bool | str | int | None
        Verbosity level, forwarded to the EvokedArray constructor.

    Returns
    -------
    ev : EvokedArray
        The evoked data contained in the file. Unlike .avr files, rows of a
        .mul file are time samples and columns are channels, hence the
        transpose below.
    """
    with open(fname) as f:
        header = f.readline().strip()
        ch_names = f.readline().strip().split()

    fields = _parse_header(header)
    data = np.loadtxt(fname, skiprows=2, ndmin=2)
    if len(ch_names) != data.shape[1]:
        raise RuntimeError(
            "Mismatch between the number of channel names "
            f"defined in the .mul file ({len(ch_names)}) "
            "and the number of columns in the data matrix "
            f"({data.shape[1]})."
        )

    # Consolidate channel types
    ch_types = _read_elp_sidecar(fname)
    if ch_types is None:
        logger.info("Marking all channels as EEG.")
        ch_types = ["eeg"] * len(ch_names)
    else:
        ch_types = [ch_types[ch] for ch in ch_names]

    # Go over all the header fields and make sure they are all defined to
    # something sensible.
    if "TimePoints" in fields:
        fields["TimePoints"] = int(fields["TimePoints"])
        if fields["TimePoints"] != data.shape[0]:
            # BUG FIX: logger.warn is a deprecated alias (removed in Py 3.13);
            # use logger.warning.
            logger.warning(
                f"The size of the data matrix ({data.shape}) does not "
                f'match the "TimePoints" field ({fields["TimePoints"]}).'
            )
    if "Channels" in fields:
        fields["Channels"] = int(fields["Channels"])
        if fields["Channels"] != data.shape[1]:
            logger.warning(
                f"The size of the data matrix ({data.shape}) does not "
                f'match the "Channels" field ({fields["Channels"]}).'
            )
    if "SamplingInterval[ms]" in fields:
        fields["SamplingInterval[ms]"] = float(fields["SamplingInterval[ms]"])
    else:
        raise RuntimeError(
            'No "SamplingInterval[ms]" field present. Could '
            "not determine sampling frequency."
        )
    if "BeginSweep[ms]" in fields:
        fields["BeginSweep[ms]"] = float(fields["BeginSweep[ms]"])
    else:
        fields["BeginSweep[ms]"] = 0.0
    if "Bins/uV" in fields:
        fields["Bins/uV"] = float(fields["Bins/uV"])
    else:
        fields["Bins/uV"] = 1
    if "SegmentName" not in fields:
        fields["SegmentName"] = ""

    # Build the Evoked object based on the header fields.
    info = create_info(
        ch_names, sfreq=1000 / fields["SamplingInterval[ms]"], ch_types=ch_types
    )
    return EvokedArray(
        data.T / fields["Bins/uV"] / 1e6,  # channels x times, bins -> volts
        info,
        tmin=fields["BeginSweep[ms]"] / 1000,  # ms -> s
        comment=fields["SegmentName"],
        verbose=verbose,
    )
| def _parse_header(header): | |
| """Parse an .avr or .mul header string into name/val pairs. | |
| The header line looks like: | |
| Npts= 256 TSB= 0.000 DI= 4.000000 SB= 1.000 SC= 200.0 Nchan= 27 | |
| No consistent use of separation chars, so parsing this is a bit iffy. | |
| Parameters | |
| ---------- | |
| header : str | |
| The first line of the file. | |
| Returns | |
| ------- | |
| fields : dict | |
| The parsed header fields | |
| """ | |
| parts = header.split() # Splits on one or more spaces | |
| name_val_pairs = zip(parts[::2], parts[1::2]) | |
| return dict((name.replace("=", ""), val) for name, val in name_val_pairs) | |
def _read_elp_sidecar(fname):
    """Read a possible .elp sidecar file with electrode information.

    The reason we don't use the read_custom_montage for this is that we are
    interested in the channel types, which a DigMontage object does not provide
    us.

    Parameters
    ----------
    fname : Path
        The path of the .avr or .mul file. The corresponding .elp file will be
        derived from this path.

    Returns
    -------
    ch_types : OrderedDict | None
        If the sidecar file exists, return a dictionary mapping channel names
        to channel types. Otherwise returns ``None``.
    """
    fname_elp = fname.parent / (fname.stem + ".elp")
    if not fname_elp.exists():
        logger.info(f"No {fname_elp} file present containing electrode information.")
        return None

    logger.info(f"Reading electrode names and types from {fname_elp}")
    ch_types = OrderedDict()
    with open(fname_elp) as f:
        lines = f.readlines()
        # More than 3 columns means the first column holds the channel type.
        if len(lines[0].split()) > 3:
            # Channel types present
            for line in lines:
                ch_type, ch_name = line.split()[:2]
                ch_types[ch_name] = ch_type.lower()
        else:
            # No channel types present
            logger.info(
                "No channel types present in .elp file. Marking all channels as EEG."
            )
            for line in lines:
                # BUG FIX: line.split()[:1] produced a one-element *list*,
                # which is unhashable and cannot be used as a dict key.
                # Take the first token (the channel name) instead.
                ch_name = line.split()[0]
                ch_types[ch_name] = "eeg"
    return ch_types