| |
| from pathlib import Path |
| import json |
| from inspect import stack |
| from .ticker import IntervalTicker |
|
|
| _CURRENT_BEAT_STACK = [] |
|
|
|
|
| def get_heartbeat(): |
| """ |
| Returns: |
| The :class:`HeartBeat` object that's currently being used. |
| Throws an error if no :class:`EventStorage` is currently enabled. |
| """ |
| assert len( |
| _CURRENT_BEAT_STACK |
| ), "get_heartbeat() has to be called inside a 'with EventStorage(...)' context!" |
| return _CURRENT_BEAT_STACK[-1] |
|
|
|
|
| def get_tqdm_meter(pbar, format_dict): |
| format_dict['bar_format'] = "{r_bar}" |
| meter_str = pbar.format_meter(**format_dict) |
| meter_str = meter_str[2:] |
| return meter_str |
|
|
|
|
| def caller_info(n_stack_up): |
| info = stack()[1 + n_stack_up] |
| msg = f"{info.filename}:{info.lineno} - {info.function}" |
| return msg |
|
|
|
|
| class HeartBeat(): |
| def __init__( |
| self, pbar, write_interval=10, |
| output_dir="./", fname="heartbeat.json" |
| ): |
| self.pbar = pbar |
| self.fname = Path(output_dir) / fname |
| self.ticker = IntervalTicker(write_interval) |
| self.completed = False |
|
|
| |
| self.beat(force_write=True, n_stack_up=2) |
|
|
| def beat(self, force_write=False, n_stack_up=1): |
| on_write_period = self.ticker.tick() |
| if force_write or on_write_period: |
| stats = self.stats() |
| stats['caller'] = caller_info(n_stack_up) |
|
|
| with open(self.fname, "w") as f: |
| json.dump(stats, f) |
|
|
| def done(self): |
| self.completed = True |
| self.beat(force_write=True, n_stack_up=2) |
|
|
| def stats(self): |
| pbar = self.pbar |
| fdict = pbar.format_dict |
| stats = { |
| "beat": self.ticker.tick_str(), |
| "done": self.completed, |
| "meter": get_tqdm_meter(pbar, fdict), |
| "elapsed": int(fdict['elapsed']) |
| } |
| return stats |
|
|
| def __enter__(self): |
| _CURRENT_BEAT_STACK.append(self) |
| return self |
|
|
| def __exit__(self, exc_type, exc_val, exc_tb): |
| assert _CURRENT_BEAT_STACK[-1] == self |
| _CURRENT_BEAT_STACK.pop() |
|
|