Spaces:
Sleeping
Sleeping
| import pytest | |
| import time | |
| import tempfile | |
| import shutil | |
| import os | |
| from os import path | |
| from ding.framework import Parallel | |
| from ding.framework.task import task | |
| from ding.utils import DistributedWriter | |
| def main_distributed_writer(tempdir): | |
| with task.start(): | |
| time.sleep(task.router.node_id * 1) # Sleep 0 and 1, write to different files | |
| tblogger = DistributedWriter(tempdir).plugin(task.router, is_writer=(task.router.node_id == 0)) | |
| def _add_scalar(ctx): | |
| n = 10 | |
| for i in range(n): | |
| tblogger.add_scalar(str(task.router.node_id), task.router.node_id, ctx.total_step * n + i) | |
| task.use(_add_scalar) | |
| task.use(lambda _: time.sleep(0.2)) | |
| task.run(max_step=3) | |
| time.sleep(0.3 + (1 - task.router.node_id) * 2) | |
| def test_distributed_writer(): | |
| tempdir = path.join(tempfile.gettempdir(), "tblogger") | |
| try: | |
| Parallel.runner(n_parallel_workers=2)(main_distributed_writer, tempdir) | |
| assert path.exists(tempdir) | |
| assert len(os.listdir(tempdir)) == 1 | |
| finally: | |
| if path.exists(tempdir): | |
| shutil.rmtree(tempdir) | |