| import time |
|
|
| from mozdef_util.bulk_queue import BulkQueue |
| from mozdef_util.query_models import SearchQuery, ExistsMatch |
|
|
| from tests.unit_test_suite import UnitTestSuite |
|
|
|
|
| class BulkQueueTest(UnitTestSuite): |
| def setup(self): |
| super().setup() |
|
|
| def num_objects_saved(self): |
| self.refresh(self.event_index_name) |
| search_query = SearchQuery() |
| search_query.add_must(ExistsMatch('keyname')) |
| results = search_query.execute(self.es_client) |
| return len(results['hits']) |
|
|
|
|
| class TestBasicInit(BulkQueueTest): |
|
|
| def setup(self): |
| super().setup() |
| self.queue = BulkQueue(self.es_client) |
|
|
| def test_threshold(self): |
| assert self.queue.threshold == 10 |
|
|
| def test_size(self): |
| assert self.queue.size() == 0 |
|
|
| def test_flush_time(self): |
| assert self.queue.flush_time == 30 |
|
|
|
|
| class TestInitWithThreshold(BulkQueueTest): |
|
|
| def test_init_with_threshold(self): |
| queue = BulkQueue(self.es_client, 100) |
| assert queue.threshold == 100 |
|
|
|
|
| class TestAdd(BulkQueueTest): |
|
|
| def setup(self): |
| super().setup() |
| self.queue = BulkQueue(self.es_client, threshold=20) |
|
|
| def test_basic_add(self): |
| assert self.queue.size() == 0 |
| self.queue.add(index='events', body={'keyname', 'valuename'}) |
| assert self.queue.size() == 1 |
| assert self.queue.started() is False |
|
|
| def test_add_exact_threshold(self): |
| for num in range(0, 20): |
| self.queue.add(index='events', body={'keyname': 'value' + str(num)}) |
| assert self.queue.size() == 0 |
| assert self.num_objects_saved() == 20 |
| assert self.queue.started() is False |
|
|
| def test_add_over_threshold(self): |
| for num in range(0, 21): |
| self.queue.add(index='events', body={'keyname': 'value' + str(num)}) |
| assert self.num_objects_saved() == 20 |
| assert self.queue.size() == 1 |
| assert self.queue.started() is False |
|
|
| def test_add_multiple_thresholds(self): |
| for num in range(0, 201): |
| self.queue.add(index='events', body={'keyname': 'value' + str(num)}) |
| assert self.num_objects_saved() == 200 |
| assert self.queue.size() == 1 |
| assert self.queue.started() is False |
|
|
|
|
| class TestTimer(BulkQueueTest): |
|
|
| def test_basic_timer(self): |
| queue = BulkQueue(self.es_client, flush_time=2) |
| assert queue.started() is False |
| queue.start_thread() |
| assert queue.started() is True |
| queue.add(index='events', body={'keyname': 'valuename'}) |
| assert queue.size() == 1 |
| time.sleep(3) |
| assert queue.size() == 0 |
| queue.stop_thread() |
| assert queue.started() is False |
|
|
| def test_over_threshold(self): |
| queue = BulkQueue(self.es_client, flush_time=3, threshold=10) |
| queue.start_thread() |
| for num in range(0, 201): |
| queue.add(index='events', body={'keyname': 'value' + str(num)}) |
| assert self.num_objects_saved() == 200 |
| assert queue.size() == 1 |
| time.sleep(4) |
| assert self.num_objects_saved() == 201 |
| assert queue.size() == 0 |
| queue.stop_thread() |
|
|
| def test_two_iterations(self): |
| queue = BulkQueue(self.es_client, flush_time=3, threshold=10) |
| queue.start_thread() |
| for num in range(0, 201): |
| queue.add(index='events', body={'keyname': 'value' + str(num)}) |
| assert self.num_objects_saved() == 200 |
| assert queue.size() == 1 |
| time.sleep(3) |
| assert self.num_objects_saved() == 201 |
| assert queue.size() == 0 |
| for num in range(0, 201): |
| queue.add(index='events', body={'keyname': 'value' + str(num)}) |
| assert self.num_objects_saved() == 401 |
| time.sleep(3) |
| assert self.num_objects_saved() == 402 |
| queue.stop_thread() |
|
|
| def test_ten_iterations(self): |
| queue = BulkQueue(self.es_client, flush_time=3, threshold=10) |
| queue.start_thread() |
| total_events = 0 |
| for num_rounds in range(0, 10): |
| for num in range(0, 20): |
| total_events += 1 |
| queue.add(index='events', body={'keyname': 'value' + str(num)}) |
| assert self.num_objects_saved() == total_events |
| assert queue.size() == 0 |
| queue.stop_thread() |
| assert self.num_objects_saved() == 200 |
|
|