| """Input and output utilities. |
| |
| The central concept in the I/O infrastructure is a "frame" - a dataclass that represents one atomic |
| configuration that contains data of different kinds. Atomic units are used in the frame object itself, |
| unless explicitly stated otherwise. Units specified by the file format are used in the files themselves. |
| """ |
|
|
# Public interface of this module; everything else is an implementation detail.
__all__ = [
    # types and test helpers
    'AnyPath',
    'get_fn_test',
    'Frame',
    # files and directories
    'open_safe',
    'working_directory',
    'temporary_directory',
    'to_file',
    'from_file',
    # reading, writing and combining frames
    'read_frames',
    'write_frames',
    'merge_frames',
]
|
|
| import os |
| import shutil |
| from collections import defaultdict |
| from contextlib import contextmanager |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Optional, Sequence, Union |
|
|
| import numpy as np |
|
|
| from ..utilities import AMLIOError |
|
|
|
|
| |
# Registry of I/O functions: the first key is the file format name, the second
# is the operation ("read" or "write"). The entry of a format can also hold the
# file name "extension" registered for it.
formats = defaultdict(dict)

# Mapping from file name extension to file format name, used to detect the
# format from a file name.
ext2fmt = {}

# Anything that is accepted as a path on the file system.
AnyPath = Union[str, Path]
|
|
|
|
def get_fn_test(filename):
    """Build the absolute path of a file in the test data directory.

    The test data directory is `tests/data`, located relative to the directory
    two levels above this source file.

    Arguments:
        filename: name of file in the test data directory, no path

    Returns:
        resolved `Path` of the requested file (the file need not exist)
    """
    dir_package = Path(__file__).parent.parent
    dir_data = dir_package / '..' / 'tests' / 'data'
    return (dir_data / filename).resolve()
|
|
|
|
def register_io(fformat: str, operation: str, extension: Union[str, None] = None):
    """Decorator to register an I/O operation for a specific file format.

    Optionally, the function can also register a file name extension for automatic
    detection of file format from file name.

    All arguments are checked before the registries are modified, so a failed
    registration leaves both `formats` and `ext2fmt` untouched. The decorated
    function is returned unchanged, so its name remains usable in the module
    that defines it.

    Arguments:
        fformat: name of file format
        operation: I/O operation - "read" or "write"
        extension: file name extension or `None`

    Raises:
        ValueError: when the decorator is applied and `operation` is not recognized
            or `extension` is already registered for a different file format
    """

    def decorator(function):
        if operation not in ('read', 'write'):
            raise ValueError('Unrecognized operation. Allowed values: "read", "write".')

        if extension is not None:
            # One format may register the same extension more than once (typically
            # for reading and for writing), only a clash between formats is an error.
            if ext2fmt.get(extension, fformat) != fformat:
                raise ValueError(f'Attempted to register the same file extension ({extension}) twice.')
            ext2fmt[extension] = fformat
            formats[fformat]['extension'] = extension

        formats[fformat][operation] = function

        # Hand the function back, otherwise the decorated name would become `None`.
        return function

    return decorator
|
|
|
|
@dataclass(eq=False)
class Frame:
    """Container for all kinds of data that one frame can carry.

    Frames are what the I/O routines and the data structures pass to each other.
    Every field defaults to `None`, meaning that this kind of data is not set or
    not available. No comparison operator is generated, at least for now, because
    comparing NumPy arrays is more involved.
    """

    # Atom names; `update` requires them to agree with those of the other frame.
    names: Optional[Sequence] = None

    # The remaining kinds of data; `update` can fill in or overwrite any of them.
    positions: Optional[np.ndarray] = None
    cell: Optional[np.ndarray] = None
    comment: Optional[str] = None
    energy: Optional[float] = None
    forces: Optional[np.ndarray] = None

    def update(self, other: 'Frame', force: bool = False):
        """Take over data from another frame, in place.

        Data that `other` does not have (`None`) is never transferred. Data that
        this frame already has is only replaced if `force` is true.

        Arguments:
            other: another frame
            force: whether to overwrite data

        Raises:
            ValueError: if `other` has atom names and they differ from those of this frame
        """

        # Atom names are never copied, they only have to be consistent.
        names_other = other.names
        if names_other is not None and self.names != names_other:
            raise ValueError('Inconsistent atom names.')

        for key in ('positions', 'cell', 'comment', 'energy', 'forces'):
            value = getattr(other, key)
            if value is None:
                # nothing to transfer for this kind of data
                continue
            if force or getattr(self, key) is None:
                setattr(self, key, value)
|
|
|
|
def open_safe(filename, mode='r', buffering=-1, verbose=False):
    """Open a file like `open` does, but never overwrite an existing one.

    When a file is opened for writing and `filename` already exists, the existing
    file is first renamed to `#<name>#<i>#` in the same directory, using the
    lowest `i` for which such a backup is not present yet. Opening for reading
    or appending is passed straight through to `open`.

    Arguments:
        filename: name of file to open
        mode: file open mode, must start with "r", "w" or "a"
        buffering: passed through to `open`
        verbose: whether to print to standard output what backup was performed

    Returns:
        an open file

    Raises:
        NotImplementedError: if `mode` starts with any other character
    """

    kind = mode[0]
    if kind not in ('w', 'r', 'a'):
        raise NotImplementedError(f'Unsupported file open mode: {mode:s}.')

    if kind == 'w':
        target = Path(filename)

        # walk through backup names until one is found that is not taken
        backup = target
        counter = 0
        while backup.exists():
            backup = backup.with_name(f'#{target.name:s}#{counter:d}#')
            counter += 1

        # the names only differ if the target itself exists
        if backup != target:
            target.rename(backup)
            if verbose:
                print(f'Backup performed: {target} -> {backup}\n')

        filename = target

    return open(filename, mode, buffering)
|
|
|
|
@contextmanager
def working_directory(directory):
    """Run the body of the context with `directory` as the working directory.

    The working directory that was current on entry is restored on exit, also
    when the body raises. This is useful especially for testing. Python 3.11
    added `contextlib.chdir` for the same purpose; this helper does not depend
    on it.

    Arguments:
        directory: directory to change to
    """

    # remember where we are, as an absolute path, so that we can get back
    dir_previous = Path.cwd()

    try:
        os.chdir(directory)
        yield
    finally:
        os.chdir(dir_previous)
|
|
|
|
@contextmanager
def temporary_directory(directory: AnyPath, parents: bool = False, keep: bool = False):
    """Create a directory that lives for the duration of the context.

    The directory must not exist beforehand. On leaving the context it is removed
    together with its contents, unless the user asks to keep it. Parent directories
    created due to `parents` are not removed.

    Arguments:
        directory: directory to create
        parents: whether to create parents as well
        keep: whether to keep directory after exiting context

    Yields:
        the created directory as a `Path`

    Raises:
        AMLIOError: if `directory` already exists
    """

    path = Path(directory)

    # refuse to reuse anything that is already there
    if path.exists():
        raise AMLIOError(f'Unable to create directory, already exists: {path.absolute()}')

    path.mkdir(parents=parents)

    remove = not keep
    try:
        yield path
    finally:
        # clean up even if the body of the context raised
        if remove:
            shutil.rmtree(path)
|
|
|
|
def from_file(fn_in, binary=False):
    """Return the complete contents of a file.

    By default, the file is read as a text file, which gives a string.
    If `binary` is true, it is read as a binary file, which gives bytes.

    Arguments:
        fn_in: name of file to read
        binary: whether to read the file in binary mode

    Returns:
        contents of the file as `str` or `bytes`
    """

    mode = 'rb' if binary else 'r'
    with open(fn_in, mode) as stream:
        return stream.read()
|
|
|
|
def to_file(data, fn_out, binary=False, verbose=False):
    """Write a variable to a file.

    The file is opened with `open_safe`, so an existing file of the same name is
    backed up rather than overwritten.

    Arguments:
        data: what to write, typically a string, or bytes if `binary` is true
        fn_out: name of output file
        binary: whether to write the file in binary mode
        verbose: whether to report the creation of a backup file
    """

    mode = 'wb' if binary else 'w'
    with open_safe(fn_out, mode, verbose=verbose) as stream:
        stream.write(data)
|
|
|
|
def get_io_operation(fn, fformat, operation):
    """Select I/O function for given file format.

    If `fformat` is `None`, the file format is detected from the extension
    of `fn` using the registered extensions. The registries are only read,
    asking for an unknown format does not modify them.

    Arguments:
        fn: name of file to operate on
        fformat: name of file format, or `None` to detect it from `fn`
        operation: I/O operation - "read" or "write"

    Returns:
        function to read or write one frame

    Raises:
        ValueError: if `operation` is not recognized or if the file format
            does not support it
        KeyError: if the format has to be detected and the extension of `fn`
            is not registered
    """

    if operation not in ('read', 'write'):
        raise ValueError('Unrecognized operation. Allowed values: "read", "write".')

    # determine the file format from the file name extension, without the dot
    if fformat is None:
        extension = Path(fn).suffix[1:]
        try:
            fformat = ext2fmt[extension]
        except KeyError:
            msg = f'Extension "{extension:s}" not registered for file format detection.'
            raise KeyError(msg) from None

    # `formats` is a `defaultdict`: indexing it with an unknown format would
    # insert an empty entry as a side effect, `get` performs a pure lookup.
    try:
        return formats.get(fformat, {})[operation]
    except KeyError:
        msg = f'File format "{fformat}" not supported for operation "{operation}".'
        raise ValueError(msg) from None
|
|
|
|
def read_frames(fn_in, fformat=None, **kwargs):
    """Iterate over a trajectory file, returning all data for each frame.

    The format of the file is given by `fformat` or inferred from the file
    extension if `fformat` is `None`. The file is opened in text mode and the
    reading function of the format is called on it repeatedly; the iteration
    ends when that function returns `None`.

    Arguments:
        fn_in: name of input file
        fformat: format of the file, or `None`
        kwargs: passed through to the reading function of the format

    Yields:
        one frame at a time, as returned by the reading function
    """

    reader = get_io_operation(fn_in, fformat, 'read')

    # the file stays open for as long as the generator is being consumed
    with open(fn_in) as stream:
        frame = reader(stream, **kwargs)
        while frame is not None:
            yield frame
            frame = reader(stream, **kwargs)
|
|
|
|
def write_frames(fn_out, frames, fformat=None):
    """Write frames to file.

    The format of the file is given by `fformat` or inferred from the file
    extension if `fformat` is `None`. The output file is opened with `open_safe`,
    so an existing file of the same name is backed up rather than overwritten.

    Arguments:
        fn_out: name of output file
        frames: iterator over `Frame` objects
        fformat: format of the file, or `None`
    """

    # look up the writer first, so that an unsupported format touches no files
    writer = get_io_operation(fn_out, fformat, 'write')

    with open_safe(fn_out, 'w') as stream:
        for frame in frames:
            writer(stream, frame)
|
|
|
|
def merge_frames(frames, *frames_others, force: bool = False):
    """Merge frames from multiple sources.

    Each frame from `frames` is updated in place, using its `update` method,
    with the frame at the same position in each of the other sources, in the
    order in which they are given, and then yielded.

    The length of the result will be determined by the length of `frames`,
    the other sources have to be at least as long as that. Any iterable is
    accepted as a source, not only an iterator.

    Arguments:
        frames: iterable over `Frame` objects
        frames_others: more iterables over `Frame` instances
        force: whether to overwrite data

    Yields:
        `Frame` objects

    Raises:
        RuntimeError: if one of `frames_others` runs out before `frames` does
    """

    # `iter` returns an iterator unchanged, so nothing changes for iterators,
    # while sequences such as lists of frames become usable with `next`.
    others = [iter(frames_extra) for frames_extra in frames_others]

    for frame in frames:
        for i_other, frames_extra in enumerate(others):
            try:
                frame_extra = next(frames_extra)
            except StopIteration:
                # A `StopIteration` escaping from a generator is turned into an
                # uninformative `RuntimeError` by Python; say what went wrong instead.
                msg = f'Unable to merge frames, additional source {i_other:d} is shorter than `frames`.'
                raise RuntimeError(msg) from None
            frame.update(frame_extra, force=force)
        yield frame
|
|