File size: 4,352 Bytes
7c89ed7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import time

from mozdef_util.bulk_queue import BulkQueue
from mozdef_util.query_models import SearchQuery, ExistsMatch

from tests.unit_test_suite import UnitTestSuite


class BulkQueueTest(UnitTestSuite):
    def setup(self):
        super().setup()

    def num_objects_saved(self):
        self.refresh(self.event_index_name)
        search_query = SearchQuery()
        search_query.add_must(ExistsMatch('keyname'))
        results = search_query.execute(self.es_client)
        return len(results['hits'])


class TestBasicInit(BulkQueueTest):

    def setup(self):
        super().setup()
        self.queue = BulkQueue(self.es_client)

    def test_threshold(self):
        assert self.queue.threshold == 10

    def test_size(self):
        assert self.queue.size() == 0

    def test_flush_time(self):
        assert self.queue.flush_time == 30


class TestInitWithThreshold(BulkQueueTest):

    def test_init_with_threshold(self):
        queue = BulkQueue(self.es_client, 100)
        assert queue.threshold == 100


class TestAdd(BulkQueueTest):

    def setup(self):
        super().setup()
        self.queue = BulkQueue(self.es_client, threshold=20)

    def test_basic_add(self):
        assert self.queue.size() == 0
        self.queue.add(index='events', body={'keyname', 'valuename'})
        assert self.queue.size() == 1
        assert self.queue.started() is False

    def test_add_exact_threshold(self):
        for num in range(0, 20):
            self.queue.add(index='events', body={'keyname': 'value' + str(num)})
        assert self.queue.size() == 0
        assert self.num_objects_saved() == 20
        assert self.queue.started() is False

    def test_add_over_threshold(self):
        for num in range(0, 21):
            self.queue.add(index='events', body={'keyname': 'value' + str(num)})
        assert self.num_objects_saved() == 20
        assert self.queue.size() == 1
        assert self.queue.started() is False

    def test_add_multiple_thresholds(self):
        for num in range(0, 201):
            self.queue.add(index='events', body={'keyname': 'value' + str(num)})
        assert self.num_objects_saved() == 200
        assert self.queue.size() == 1
        assert self.queue.started() is False


class TestTimer(BulkQueueTest):

    def test_basic_timer(self):
        queue = BulkQueue(self.es_client, flush_time=2)
        assert queue.started() is False
        queue.start_thread()
        assert queue.started() is True
        queue.add(index='events', body={'keyname': 'valuename'})
        assert queue.size() == 1
        time.sleep(3)
        assert queue.size() == 0
        queue.stop_thread()
        assert queue.started() is False

    def test_over_threshold(self):
        queue = BulkQueue(self.es_client, flush_time=3, threshold=10)
        queue.start_thread()
        for num in range(0, 201):
            queue.add(index='events', body={'keyname': 'value' + str(num)})
        assert self.num_objects_saved() == 200
        assert queue.size() == 1
        time.sleep(4)
        assert self.num_objects_saved() == 201
        assert queue.size() == 0
        queue.stop_thread()

    def test_two_iterations(self):
        queue = BulkQueue(self.es_client, flush_time=3, threshold=10)
        queue.start_thread()
        for num in range(0, 201):
            queue.add(index='events', body={'keyname': 'value' + str(num)})
        assert self.num_objects_saved() == 200
        assert queue.size() == 1
        time.sleep(3)
        assert self.num_objects_saved() == 201
        assert queue.size() == 0
        for num in range(0, 201):
            queue.add(index='events', body={'keyname': 'value' + str(num)})
        assert self.num_objects_saved() == 401
        time.sleep(3)
        assert self.num_objects_saved() == 402
        queue.stop_thread()

    def test_ten_iterations(self):
        queue = BulkQueue(self.es_client, flush_time=3, threshold=10)
        queue.start_thread()
        total_events = 0
        for num_rounds in range(0, 10):
            for num in range(0, 20):
                total_events += 1
                queue.add(index='events', body={'keyname': 'value' + str(num)})
            assert self.num_objects_saved() == total_events
        assert queue.size() == 0
        queue.stop_thread()
        assert self.num_objects_saved() == 200