File size: 4,352 Bytes
7c89ed7 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 | import time
from mozdef_util.bulk_queue import BulkQueue
from mozdef_util.query_models import SearchQuery, ExistsMatch
from tests.unit_test_suite import UnitTestSuite
class BulkQueueTest(UnitTestSuite):
def setup(self):
super().setup()
def num_objects_saved(self):
self.refresh(self.event_index_name)
search_query = SearchQuery()
search_query.add_must(ExistsMatch('keyname'))
results = search_query.execute(self.es_client)
return len(results['hits'])
class TestBasicInit(BulkQueueTest):
def setup(self):
super().setup()
self.queue = BulkQueue(self.es_client)
def test_threshold(self):
assert self.queue.threshold == 10
def test_size(self):
assert self.queue.size() == 0
def test_flush_time(self):
assert self.queue.flush_time == 30
class TestInitWithThreshold(BulkQueueTest):
def test_init_with_threshold(self):
queue = BulkQueue(self.es_client, 100)
assert queue.threshold == 100
class TestAdd(BulkQueueTest):
def setup(self):
super().setup()
self.queue = BulkQueue(self.es_client, threshold=20)
def test_basic_add(self):
assert self.queue.size() == 0
self.queue.add(index='events', body={'keyname', 'valuename'})
assert self.queue.size() == 1
assert self.queue.started() is False
def test_add_exact_threshold(self):
for num in range(0, 20):
self.queue.add(index='events', body={'keyname': 'value' + str(num)})
assert self.queue.size() == 0
assert self.num_objects_saved() == 20
assert self.queue.started() is False
def test_add_over_threshold(self):
for num in range(0, 21):
self.queue.add(index='events', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == 20
assert self.queue.size() == 1
assert self.queue.started() is False
def test_add_multiple_thresholds(self):
for num in range(0, 201):
self.queue.add(index='events', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == 200
assert self.queue.size() == 1
assert self.queue.started() is False
class TestTimer(BulkQueueTest):
def test_basic_timer(self):
queue = BulkQueue(self.es_client, flush_time=2)
assert queue.started() is False
queue.start_thread()
assert queue.started() is True
queue.add(index='events', body={'keyname': 'valuename'})
assert queue.size() == 1
time.sleep(3)
assert queue.size() == 0
queue.stop_thread()
assert queue.started() is False
def test_over_threshold(self):
queue = BulkQueue(self.es_client, flush_time=3, threshold=10)
queue.start_thread()
for num in range(0, 201):
queue.add(index='events', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == 200
assert queue.size() == 1
time.sleep(4)
assert self.num_objects_saved() == 201
assert queue.size() == 0
queue.stop_thread()
def test_two_iterations(self):
queue = BulkQueue(self.es_client, flush_time=3, threshold=10)
queue.start_thread()
for num in range(0, 201):
queue.add(index='events', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == 200
assert queue.size() == 1
time.sleep(3)
assert self.num_objects_saved() == 201
assert queue.size() == 0
for num in range(0, 201):
queue.add(index='events', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == 401
time.sleep(3)
assert self.num_objects_saved() == 402
queue.stop_thread()
def test_ten_iterations(self):
queue = BulkQueue(self.es_client, flush_time=3, threshold=10)
queue.start_thread()
total_events = 0
for num_rounds in range(0, 10):
for num in range(0, 20):
total_events += 1
queue.add(index='events', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == total_events
assert queue.size() == 0
queue.stop_thread()
assert self.num_objects_saved() == 200
|