Spaces:
Running
Running
| # Authors: The MNE-Python contributors. | |
| # License: BSD-3-Clause | |
| # Copyright the MNE-Python contributors. | |
| from gzip import GzipFile | |
| from io import SEEK_SET, BytesIO | |
| from pathlib import Path | |
| import numpy as np | |
| from scipy.sparse import issparse | |
| from ..utils import _check_fname, _file_like, _validate_type, logger, verbose, warn | |
| from .constants import FIFF | |
| from .tag import Tag, _call_dict_names, _matrix_info, _read_tag_header, read_tag | |
| from .tree import dir_tree_find, make_dir_tree | |
| class _NoCloseRead: | |
| """Create a wrapper that will not close when used as a context manager.""" | |
| def __init__(self, fid): | |
| self.fid = fid | |
| def __enter__(self): | |
| return self.fid | |
| def __exit__(self, type_, value, traceback): | |
| return | |
| def close(self): | |
| return | |
| def seek(self, offset, whence=SEEK_SET): | |
| return self.fid.seek(offset, whence) | |
| def read(self, size=-1): | |
| return self.fid.read(size) | |
| def _fiff_get_fid(fname): | |
| """Open a FIF file with no additional parsing.""" | |
| if _file_like(fname): | |
| logger.debug("Using file-like I/O") | |
| fid = _NoCloseRead(fname) | |
| fid.seek(0) | |
| else: | |
| _validate_type(fname, Path, "fname", extra="or file-like") | |
| if fname.suffixes[-1] == ".gz": | |
| logger.debug("Using gzip I/O") | |
| fid = GzipFile(fname, "rb") # Open in binary mode | |
| else: | |
| logger.debug("Using normal I/O") | |
| fid = open(fname, "rb") # Open in binary mode | |
| return fid | |
| def _get_next_fname(fid, fname, tree): | |
| """Get the next filename in split files.""" | |
| _validate_type(fname, (Path, None), "fname") | |
| nodes_list = dir_tree_find(tree, FIFF.FIFFB_REF) | |
| next_fname = None | |
| for nodes in nodes_list: | |
| next_fname = None | |
| for ent in nodes["directory"]: | |
| if ent.kind == FIFF.FIFF_REF_ROLE: | |
| tag = read_tag(fid, ent.pos) | |
| role = int(tag.data.item()) | |
| if role != FIFF.FIFFV_ROLE_NEXT_FILE: | |
| next_fname = None | |
| break | |
| if ent.kind not in (FIFF.FIFF_REF_FILE_NAME, FIFF.FIFF_REF_FILE_NUM): | |
| continue | |
| # If we can't resolve it, assume/hope it's in the current directory | |
| if fname is None: | |
| fname = Path().resolve() | |
| if ent.kind == FIFF.FIFF_REF_FILE_NAME: | |
| tag = read_tag(fid, ent.pos) | |
| next_fname = fname.parent / tag.data | |
| if ent.kind == FIFF.FIFF_REF_FILE_NUM: | |
| # Some files don't have the name, just the number. So | |
| # we construct the name from the current name. | |
| if next_fname is not None: | |
| continue | |
| next_num = read_tag(fid, ent.pos).data.item() | |
| base = fname.name | |
| idx = base.find(".") | |
| idx2 = base.rfind("-") | |
| num_str = base[idx2 + 1 : idx] | |
| if not num_str.isdigit(): | |
| idx2 = -1 | |
| if idx2 < 0 and next_num == 1: | |
| # this is the first file, which may not be numbered | |
| next_fname = ( | |
| fname.parent / f"{base[:idx]}-{next_num:d}.{base[idx + 1 :]}" | |
| ) | |
| continue | |
| next_fname = ( | |
| fname.parent / f"{base[:idx2]}-{next_num:d}.{base[idx + 1 :]}" | |
| ) | |
| if next_fname is not None: | |
| break | |
| return next_fname | |
| def fiff_open(fname, preload=False, verbose=None): | |
| """Open a FIF file. | |
| Parameters | |
| ---------- | |
| fname : path-like | fid | |
| Name of the fif file, or an opened file (will seek back to 0). | |
| preload : bool | |
| If True, all data from the file is read into a memory buffer. This | |
| requires more memory, but can be faster for I/O operations that require | |
| frequent seeks. | |
| %(verbose)s | |
| Returns | |
| ------- | |
| fid : file | |
| The file descriptor of the open file. | |
| tree : fif tree | |
| The tree is a complex structure filled with dictionaries, | |
| lists and tags. | |
| directory : list | |
| A list of tags. | |
| """ | |
| fid = _fiff_get_fid(fname) | |
| try: | |
| return _fiff_open(fname, fid, preload) | |
| except Exception: | |
| fid.close() | |
| raise | |
| def _fiff_open(fname, fid, preload): | |
| # do preloading of entire file | |
| if preload: | |
| # note that StringIO objects instantiated this way are read-only, | |
| # but that's okay here since we are using mode "rb" anyway | |
| with fid as fid_old: | |
| fid = BytesIO(fid_old.read()) | |
| tag = _read_tag_header(fid, 0) | |
| # Check that this looks like a fif file | |
| prefix = f"file {repr(fname)} does not" | |
| if tag.kind != FIFF.FIFF_FILE_ID: | |
| raise ValueError(f"{prefix} start with a file id tag") | |
| if tag.type != FIFF.FIFFT_ID_STRUCT: | |
| raise ValueError(f"{prefix} start with a file id tag") | |
| if tag.size != 20: | |
| raise ValueError(f"{prefix} start with a file id tag") | |
| tag = read_tag(fid, tag.next_pos) | |
| if tag.kind != FIFF.FIFF_DIR_POINTER: | |
| raise ValueError(f"{prefix} have a directory pointer") | |
| # Read or create the directory tree | |
| logger.debug(f" Creating tag directory for {fname}...") | |
| dirpos = int(tag.data.item()) | |
| read_slow = True | |
| if dirpos > 0: | |
| dir_tag = read_tag(fid, dirpos) | |
| if dir_tag is None or dir_tag.data is None: | |
| fid.seek(0, 2) # move to end of file | |
| size = fid.tell() | |
| extra = "" if size > dirpos else f" > file size {size}" | |
| warn( | |
| "FIF tag directory missing at the end of the file " | |
| f"(at byte {dirpos}{extra}), possibly corrupted file: {fname}" | |
| ) | |
| else: | |
| directory = dir_tag.data | |
| read_slow = False | |
| if read_slow: | |
| pos = 0 | |
| fid.seek(pos, 0) | |
| directory = list() | |
| while pos is not None: | |
| tag = _read_tag_header(fid, pos) | |
| if tag is None: | |
| break # HACK : to fix file ending with empty tag... | |
| pos = tag.next_pos | |
| directory.append(tag) | |
| tree, _ = make_dir_tree(fid, directory, indent=1) | |
| logger.debug("[done]") | |
| # Back to the beginning | |
| fid.seek(0) | |
| return fid, tree, directory | |
| def show_fiff( | |
| fname, | |
| indent=" ", | |
| read_limit=np.inf, | |
| max_str=30, | |
| output=str, | |
| tag=None, | |
| *, | |
| show_bytes=False, | |
| verbose=None, | |
| ): | |
| """Show FIFF information. | |
| This function is similar to mne_show_fiff. | |
| Parameters | |
| ---------- | |
| fname : path-like | |
| Filename to evaluate. | |
| indent : str | |
| How to indent the lines. | |
| read_limit : int | |
| Max number of bytes of data to read from a tag. Can be np.inf | |
| to always read all data (helps test read completion). | |
| max_str : int | |
| Max number of characters of string representation to print for | |
| each tag's data. | |
| output : type | |
| Either str or list. str is a convenience output for printing. | |
| tag : int | None | |
| Provide information about this tag. If None (default), all information | |
| is shown. | |
| show_bytes : bool | |
| If True (default False), print the byte offsets of each tag. | |
| %(verbose)s | |
| Returns | |
| ------- | |
| contents : str | |
| The contents of the file. | |
| """ | |
| if output not in [list, str]: | |
| raise ValueError("output must be list or str") | |
| if isinstance(tag, str): # command mne show_fiff passes string | |
| tag = int(tag) | |
| fname = _check_fname(fname, "read", True) | |
| f, tree, _ = fiff_open(fname) | |
| # This gets set to 0 (unknown) by fiff_open, but FIFFB_ROOT probably | |
| # makes more sense for display | |
| tree["block"] = FIFF.FIFFB_ROOT | |
| with f as fid: | |
| out = _show_tree( | |
| fid, | |
| tree, | |
| indent=indent, | |
| level=0, | |
| read_limit=read_limit, | |
| max_str=max_str, | |
| tag_id=tag, | |
| show_bytes=show_bytes, | |
| ) | |
| if output is str: | |
| out = "\n".join(out) | |
| return out | |
| def _find_type(value, fmts=("FIFF_",), exclude=("FIFF_UNIT",)): | |
| """Find matching values.""" | |
| value = int(value) | |
| vals = [ | |
| k | |
| for k, v in FIFF.items() | |
| if v == value | |
| and any(fmt in k for fmt in fmts) | |
| and not any(exc in k for exc in exclude) | |
| ] | |
| if len(vals) == 0: | |
| vals = ["???"] | |
| return vals | |
| def _show_tree( | |
| fid, | |
| tree, | |
| indent, | |
| level, | |
| read_limit, | |
| max_str, | |
| tag_id, | |
| *, | |
| show_bytes=False, | |
| ): | |
| """Show FIFF tree.""" | |
| this_idt = indent * level | |
| next_idt = indent * (level + 1) | |
| # print block-level information | |
| found_types = "/".join(_find_type(tree["block"], fmts=["FIFFB_"])) | |
| out = [f"{this_idt}{str(int(tree['block'])).ljust(4)} = {found_types}"] | |
| tag_found = False | |
| if tag_id is None or out[0].strip().startswith(str(tag_id)): | |
| tag_found = True | |
| if tree["directory"] is not None: | |
| kinds = [ent.kind for ent in tree["directory"]] + [-1] | |
| types = [ent.type for ent in tree["directory"]] | |
| sizes = [ent.size for ent in tree["directory"]] | |
| poss = [ent.pos for ent in tree["directory"]] | |
| counter = 0 | |
| good = True | |
| for k, kn, size, pos, type_ in zip(kinds[:-1], kinds[1:], sizes, poss, types): | |
| if not tag_found and k != tag_id: | |
| continue | |
| tag = Tag(kind=k, type=type_, size=size, next=FIFF.FIFFV_NEXT_NONE, pos=pos) | |
| if read_limit is None or size <= read_limit: | |
| try: | |
| tag = read_tag(fid, pos) | |
| except Exception: | |
| good = False | |
| if kn == k: | |
| # don't print if the next item is the same type (count 'em) | |
| counter += 1 | |
| else: | |
| if show_bytes: | |
| at = f" @{pos}" | |
| else: | |
| at = "" | |
| # find the tag type | |
| this_type = _find_type(k, fmts=["FIFF_"]) | |
| # prepend a count if necessary | |
| prepend = "x" + str(counter + 1) + ": " if counter > 0 else "" | |
| postpend = "" | |
| # print tag data nicely | |
| if tag.data is not None: | |
| postpend = " = " + str(tag.data)[:max_str] | |
| if isinstance(tag.data, np.ndarray): | |
| if tag.data.size > 1: | |
| postpend += " ... array size=" + str(tag.data.size) | |
| elif isinstance(tag.data, dict): | |
| postpend += " ... dict len=" + str(len(tag.data)) | |
| elif isinstance(tag.data, str): | |
| postpend += " ... str len=" + str(len(tag.data)) | |
| elif isinstance(tag.data, list | tuple): | |
| postpend += " ... list len=" + str(len(tag.data)) | |
| elif issparse(tag.data): | |
| postpend += ( | |
| f" ... sparse ({tag.data.__class__.__name__}) shape=" | |
| f"{tag.data.shape}" | |
| ) | |
| else: | |
| postpend += " ... type=" + str(type(tag.data)) | |
| postpend = ">" * 20 + f"BAD @{pos}" if not good else postpend | |
| matrix_info = _matrix_info(tag) | |
| if matrix_info is not None: | |
| _, type_, _, _ = matrix_info | |
| type_ = _call_dict_names.get(type_, f"?{type_}?") | |
| this_type = "/".join(this_type) | |
| out += [ | |
| f"{next_idt}{prepend}{str(k).ljust(4)} = " | |
| f"{this_type}{at} ({size}b {type_}) {postpend}" | |
| ] | |
| out[-1] = out[-1].replace("\n", "¶") | |
| counter = 0 | |
| good = True | |
| if tag_id in kinds: | |
| tag_found = True | |
| if not tag_found: | |
| out = [""] | |
| level = -1 # removes extra indent | |
| # deal with children | |
| for branch in tree["children"]: | |
| out += _show_tree( | |
| fid, | |
| branch, | |
| indent, | |
| level + 1, | |
| read_limit, | |
| max_str, | |
| tag_id, | |
| show_bytes=show_bytes, | |
| ) | |
| return out | |