Mozdef / tests /mozdef_util /test_bulk_queue.py
ineso22's picture
Upload folder using huggingface_hub
7c89ed7 verified
import time
from mozdef_util.bulk_queue import BulkQueue
from mozdef_util.query_models import SearchQuery, ExistsMatch
from tests.unit_test_suite import UnitTestSuite
class BulkQueueTest(UnitTestSuite):
def setup(self):
super().setup()
def num_objects_saved(self):
self.refresh(self.event_index_name)
search_query = SearchQuery()
search_query.add_must(ExistsMatch('keyname'))
results = search_query.execute(self.es_client)
return len(results['hits'])
class TestBasicInit(BulkQueueTest):
def setup(self):
super().setup()
self.queue = BulkQueue(self.es_client)
def test_threshold(self):
assert self.queue.threshold == 10
def test_size(self):
assert self.queue.size() == 0
def test_flush_time(self):
assert self.queue.flush_time == 30
class TestInitWithThreshold(BulkQueueTest):
def test_init_with_threshold(self):
queue = BulkQueue(self.es_client, 100)
assert queue.threshold == 100
class TestAdd(BulkQueueTest):
def setup(self):
super().setup()
self.queue = BulkQueue(self.es_client, threshold=20)
def test_basic_add(self):
assert self.queue.size() == 0
self.queue.add(index='events', body={'keyname', 'valuename'})
assert self.queue.size() == 1
assert self.queue.started() is False
def test_add_exact_threshold(self):
for num in range(0, 20):
self.queue.add(index='events', body={'keyname': 'value' + str(num)})
assert self.queue.size() == 0
assert self.num_objects_saved() == 20
assert self.queue.started() is False
def test_add_over_threshold(self):
for num in range(0, 21):
self.queue.add(index='events', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == 20
assert self.queue.size() == 1
assert self.queue.started() is False
def test_add_multiple_thresholds(self):
for num in range(0, 201):
self.queue.add(index='events', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == 200
assert self.queue.size() == 1
assert self.queue.started() is False
class TestTimer(BulkQueueTest):
def test_basic_timer(self):
queue = BulkQueue(self.es_client, flush_time=2)
assert queue.started() is False
queue.start_thread()
assert queue.started() is True
queue.add(index='events', body={'keyname': 'valuename'})
assert queue.size() == 1
time.sleep(3)
assert queue.size() == 0
queue.stop_thread()
assert queue.started() is False
def test_over_threshold(self):
queue = BulkQueue(self.es_client, flush_time=3, threshold=10)
queue.start_thread()
for num in range(0, 201):
queue.add(index='events', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == 200
assert queue.size() == 1
time.sleep(4)
assert self.num_objects_saved() == 201
assert queue.size() == 0
queue.stop_thread()
def test_two_iterations(self):
queue = BulkQueue(self.es_client, flush_time=3, threshold=10)
queue.start_thread()
for num in range(0, 201):
queue.add(index='events', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == 200
assert queue.size() == 1
time.sleep(3)
assert self.num_objects_saved() == 201
assert queue.size() == 0
for num in range(0, 201):
queue.add(index='events', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == 401
time.sleep(3)
assert self.num_objects_saved() == 402
queue.stop_thread()
def test_ten_iterations(self):
queue = BulkQueue(self.es_client, flush_time=3, threshold=10)
queue.start_thread()
total_events = 0
for num_rounds in range(0, 10):
for num in range(0, 20):
total_events += 1
queue.add(index='events', body={'keyname': 'value' + str(num)})
assert self.num_objects_saved() == total_events
assert queue.size() == 0
queue.stop_thread()
assert self.num_objects_saved() == 200