| |
| import json |
| import os |
| import tempfile |
| import unittest |
|
|
| from detectron2.utils.events import ( |
| CommonMetricPrinter, |
| EventStorage, |
| JSONWriter, |
| get_event_storage, |
| has_event_storage, |
| ) |
|
|
|
|
| class TestEventWriter(unittest.TestCase): |
| def testScalar(self): |
| with tempfile.TemporaryDirectory( |
| prefix="detectron2_tests" |
| ) as dir, EventStorage() as storage: |
| json_file = os.path.join(dir, "test.json") |
| writer = JSONWriter(json_file) |
| for k in range(60): |
| storage.put_scalar("key", k, smoothing_hint=False) |
| if (k + 1) % 20 == 0: |
| writer.write() |
| storage.step() |
| writer.close() |
| with open(json_file) as f: |
| data = [json.loads(l) for l in f] |
| self.assertTrue([int(k["key"]) for k in data] == [19, 39, 59]) |
|
|
| def testScalarMismatchedPeriod(self): |
| with tempfile.TemporaryDirectory( |
| prefix="detectron2_tests" |
| ) as dir, EventStorage() as storage: |
| json_file = os.path.join(dir, "test.json") |
|
|
| writer = JSONWriter(json_file) |
| for k in range(60): |
| if k % 17 == 0: |
| storage.put_scalar("key2", k, smoothing_hint=False) |
| storage.put_scalar("key", k, smoothing_hint=False) |
| if (k + 1) % 20 == 0: |
| writer.write() |
| storage.step() |
| writer.close() |
| with open(json_file) as f: |
| data = [json.loads(l) for l in f] |
| self.assertTrue([int(k.get("key2", 0)) for k in data] == [17, 0, 34, 0, 51, 0]) |
| self.assertTrue([int(k.get("key", 0)) for k in data] == [0, 19, 0, 39, 0, 59]) |
| self.assertTrue([int(k["iteration"]) for k in data] == [17, 19, 34, 39, 51, 59]) |
|
|
| def testPrintETA(self): |
| with EventStorage() as s: |
| p1 = CommonMetricPrinter(10) |
| p2 = CommonMetricPrinter() |
|
|
| s.put_scalar("time", 1.0) |
| s.step() |
| s.put_scalar("time", 1.0) |
| s.step() |
|
|
| with self.assertLogs("detectron2.utils.events") as logs: |
| p1.write() |
| self.assertIn("eta", logs.output[0]) |
|
|
| with self.assertLogs("detectron2.utils.events") as logs: |
| p2.write() |
| self.assertNotIn("eta", logs.output[0]) |
|
|
| def testPrintNonLosses(self): |
| with EventStorage() as s: |
| p1 = CommonMetricPrinter(10) |
| p2 = CommonMetricPrinter() |
|
|
| s.put_scalar("time", 1.0) |
| s.put_scalar("[metric]bn_stat", 1.0) |
| s.step() |
| s.put_scalar("time", 1.0) |
| s.put_scalar("[metric]bn_stat", 1.0) |
| s.step() |
|
|
| with self.assertLogs("detectron2.utils.events") as logs: |
| p1.write() |
| self.assertIn("[metric]bn_stat", logs.output[0]) |
|
|
| with self.assertLogs("detectron2.utils.events") as logs: |
| p2.write() |
| self.assertIn("[metric]bn_stat", logs.output[0]) |
|
|
| def testSmoothingWithWindowSize(self): |
| with tempfile.TemporaryDirectory( |
| prefix="detectron2_tests" |
| ) as dir, EventStorage() as storage: |
| json_file = os.path.join(dir, "test.json") |
| writer = JSONWriter(json_file, window_size=10) |
| for k in range(20): |
| storage.put_scalar("key1", k, smoothing_hint=True) |
| if (k + 1) % 2 == 0: |
| storage.put_scalar("key2", k, smoothing_hint=True) |
| if (k + 1) % 5 == 0: |
| storage.put_scalar("key3", k, smoothing_hint=True) |
| if (k + 1) % 10 == 0: |
| writer.write() |
| storage.step() |
|
|
| num_samples = {k: storage.count_samples(k, 10) for k in ["key1", "key2", "key3"]} |
| self.assertEqual(num_samples, {"key1": 10, "key2": 5, "key3": 2}) |
| writer.close() |
| with open(json_file) as f: |
| data = [json.loads(l) for l in f] |
| self.assertEqual([k["key1"] for k in data], [4.5, 14.5]) |
| self.assertEqual([k["key2"] for k in data], [5, 15]) |
| self.assertEqual([k["key3"] for k in data], [6.5, 16.5]) |
|
|
| def testEventStorage(self): |
| self.assertFalse(has_event_storage()) |
| with EventStorage() as storage: |
| self.assertTrue(has_event_storage()) |
| self.assertEqual(storage, get_event_storage()) |
| self.assertFalse(has_event_storage()) |
|
|