| | |
| | from pathlib import Path |
| | import json |
| | import os |
| | from contextlib import contextmanager |
| | from .ticker import IntervalTicker |
| |
|
| |
|
| | _CURRENT_STORAGE_STACK = [] |
| |
|
| |
|
| | def get_event_storage(): |
| | """ |
| | Returns: |
| | The :class:`EventStorage` object that's currently being used. |
| | Throws an error if no :class:`EventStorage` is currently enabled. |
| | """ |
| | assert len( |
| | _CURRENT_STORAGE_STACK |
| | ), "get_event_storage() has to be called inside a 'with EventStorage(...)' context!" |
| | return _CURRENT_STORAGE_STACK[-1] |
| |
|
| |
|
| | def read_lined_json(fname): |
| | with Path(fname).open('r') as f: |
| | for line in f: |
| | item = json.loads(line) |
| | yield item |
| |
|
| |
|
| | def read_stats(dirname, key): |
| | if dirname is None or not (fname := Path(dirname) / "history.json").is_file(): |
| | return [], [] |
| | stats = read_lined_json(fname) |
| | stats = list(filter(lambda x: key in x, stats)) |
| | xs = [e['iter'] for e in stats] |
| | ys = [e[key] for e in stats] |
| | return xs, ys |
| |
|
| |
|
| | class EventStorage(): |
| | def __init__(self, output_dir="./hotdog", start_iter=0, flush_period=60): |
| | self.iter = start_iter |
| | self.ticker = IntervalTicker(flush_period) |
| | self.history = [] |
| | self._current_prefix = "" |
| | self._init_curr_buffer_() |
| |
|
| | self.output_dir = output_dir |
| | self.writable = False |
| |
|
| | def _open(self): |
| | if self.writable: |
| | output_dir = Path(self.output_dir) |
| | if not output_dir.is_dir(): |
| | output_dir.mkdir(parents=True, exist_ok=True) |
| | json_fname = output_dir / 'history.json' |
| |
|
| | self._file_handle = json_fname.open('a', encoding='utf8') |
| | self.output_dir = output_dir |
| |
|
| | def _init_curr_buffer_(self): |
| | self.curr_buffer = {'iter': self.iter} |
| |
|
| | def step(self, flush=False): |
| | self.history.append(self.curr_buffer) |
| |
|
| | on_flush_period = self.ticker.tick() |
| | if flush or on_flush_period: |
| | self.flush_history() |
| |
|
| | self.iter += 1 |
| | self._init_curr_buffer_() |
| |
|
| | def flush_history(self): |
| | if self.writable: |
| | for item in self.history: |
| | line = json.dumps(item, sort_keys=True, ensure_ascii=False) + "\n" |
| | self._file_handle.write(line) |
| | self._file_handle.flush() |
| | self.history = [] |
| |
|
| | def full_key(self, key): |
| | assert isinstance(key, str) |
| | name = self._current_prefix + key |
| | return name |
| |
|
| | def put(self, key, val): |
| | key = self.full_key(key) |
| | assert isinstance(val, (int, float, str)) |
| | if isinstance(val, float): |
| | val = round(val, 3) |
| | self.curr_buffer[key] = val |
| |
|
| | def put_scalars(self, **kwargs): |
| | for k, v in kwargs.items(): |
| | self.put(k, v) |
| |
|
| | def put_artifact(self, key, ext,p, save_func): |
| | if not self.writable: |
| | return |
| | p=p.replace(" ","_") |
| | os.makedirs(self.output_dir / key, exist_ok=True) |
| | fname = (self.output_dir / key / f"step_{self.iter}_{p}").with_suffix(ext) |
| | fname = str(fname) |
| |
|
| | |
| | |
| | |
| | save_func(fname) |
| | self.put(key, fname) |
| | return fname |
| |
|
| | def close(self): |
| | self.flush_history() |
| | if self.writable: |
| | self._file_handle.close() |
| |
|
| | def get_last(self): |
| | if len(self.history) > 0: |
| | last = self.history[-1] |
| | return last |
| |
|
| | def __enter__(self): |
| | if len(_CURRENT_STORAGE_STACK) > 0: |
| | parent = _CURRENT_STORAGE_STACK[-1] |
| | root, dirname = parent.output_dir, self.output_dir |
| | if root is not None and dirname is not None: |
| | child_dir = parent.output_dir / f"{self.output_dir}_{parent.iter}" |
| | self.output_dir = child_dir |
| | parent.put(str(dirname), str(child_dir)) |
| |
|
| | if self.output_dir is not None: |
| | self.writable = True |
| | self._open() |
| |
|
| | _CURRENT_STORAGE_STACK.append(self) |
| | return self |
| |
|
| | def __exit__(self, exc_type, exc_val, exc_tb): |
| | assert _CURRENT_STORAGE_STACK[-1] == self |
| | _CURRENT_STORAGE_STACK.pop() |
| | self.close() |
| |
|