File size: 6,306 Bytes
7c89ed7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import os

from mozdef_util.plugin_set import PluginSet


class TestPluginSet(object):
    def setup(self):
        self.plugin_dir = os.path.join(os.path.dirname(__file__), 'test_plugins')
        self.plugin_set = PluginSet(self.plugin_dir)
        self.metadata = {
            'index': 'test',
        }

    def test_registered_plugins(self):
        total_num_plugins = 0
        for name in os.listdir(self.plugin_dir):
            if name.startswith('plugin'):
                total_num_plugins += 1
        assert len(self.plugin_set.enabled_plugins) == total_num_plugins

    def test_registered_plugins_specific_enabled_plugins(self):
        enabled_plugins = ['plugin1']
        plugin_set = PluginSet(self.plugin_dir, enabled_plugins)
        assert len(plugin_set.enabled_plugins) == 1

    def test_registered_plugins_other_enabled_plugins(self):
        enabled_plugins = ['someotherplugin']
        plugin_set = PluginSet(self.plugin_dir, enabled_plugins)
        assert len(plugin_set.enabled_plugins) == 0

    def test_run_plugins_empty_set_plugins(self):
        enabled_plugins = []
        plugin_set = PluginSet(self.plugin_dir, enabled_plugins)
        assert len(plugin_set.enabled_plugins) == 0

    def test_ordered_enabled_plugins(self):
        ordered_plugins = self.plugin_set.ordered_enabled_plugins
        assert ordered_plugins[0]['registration'] == ['apples']
        assert ordered_plugins[1]['registration'] == 'bananas'
        assert ordered_plugins[2]['registration'] == ['*']
        assert ordered_plugins[3]['registration'] == ['oranges']
        assert ordered_plugins[4]['registration'] == ['pears']
        assert ordered_plugins[5]['registration'] == ['grapes']
        assert ordered_plugins[6]['registration'] == ['somesecretvalue']
        assert ordered_plugins[7]['registration'] == ['*']

    def test_run_plugins_matching_key_plugin1(self):
        '''
            Checks to see that we can match on a key
        '''
        message = {'apples': 'sometext', 'otherkey': 'abcd'}

        assert 'unit_test_key' not in message
        parsed_message, parsed_metadata = self.plugin_set.run_plugins(message, self.metadata)
        # Verify that our first plugin ran and was able
        # to modify the message
        assert parsed_message['unit_test_key'] == 'apples'
        assert parsed_message == message
        assert parsed_metadata == self.metadata

    def test_run_plugins_matching_value_plugin1(self):
        '''
            Checks to see that we can match on a value
        '''
        message = {'test': 'apples', 'otherkey': 'abcd'}

        assert 'unit_test_key' not in message
        parsed_message, parsed_metadata = self.plugin_set.run_plugins(message, self.metadata)
        assert parsed_message['unit_test_key'] == 'apples'
        assert parsed_message == message
        assert parsed_metadata == self.metadata

    def test_run_plugins_matching_value_plugin2(self):
        '''
            Checks to see that we run a second plugin
            that can match individually of the first
            and can individually overwrite the same
            key plugin1 set
        '''
        message = {'bananas': 'test', 'otherkey': 'abcd'}

        assert 'unit_test_key' not in message
        parsed_message, parsed_metadata = self.plugin_set.run_plugins(message, self.metadata)
        assert parsed_message['unit_test_key'] == 'bananas'
        assert parsed_message == message
        assert parsed_metadata == self.metadata

    def test_run_plugins_matching_value_plugin3(self):
        message = {'test': 'oranges', 'otherkey': 'abcd'}

        assert 'unit_test_key' not in message
        assert 'plugin3_key' not in message
        parsed_message, parsed_metadata = self.plugin_set.run_plugins(message, self.metadata)
        assert parsed_message['plugin3_key'] == 'oranges'
        assert parsed_message == message
        assert parsed_metadata == self.metadata

    def test_run_plugins_matching_value_plugin4(self):
        '''
            Checks to see that a plugin can return None
            to signal the caller to drop the message
        '''
        message = {'test': 'pears', 'otherkey': 'abcd'}

        assert message is not None
        parsed_message, parsed_metadata = self.plugin_set.run_plugins(message, self.metadata)
        assert parsed_message is None
        # This is the interesting part here, because we return None
        # I couldn't get message to get assigned to None and have it bubble
        # up back here, so we have to deal with the return as the indicator for when to
        # drop an event
        assert message is not None
        assert parsed_metadata == self.metadata

    def test_run_plugins_matching_value_plugin5(self):
        '''
            Checks to see if we will can modify metadata in a plugin
        '''
        message = {'test': 'grapes', 'otherkey': 'abcd'}

        assert 'plugin5_key' not in self.metadata
        parsed_message, parsed_metadata = self.plugin_set.run_plugins(message, self.metadata)
        assert parsed_message == message
        assert self.metadata['plugin5_key'] == 'grapes'
        assert parsed_metadata['plugin5_key'] == 'grapes'

    def test_run_plugins_matching_value_plugin6(self):
        '''
            This unit tests checks to see if we modify a message
            in a plugin, that a plugin further down the 'line'
            can register and get the updated message
        '''
        message = {'apples': 'test', 'testkey': 'othervalue'}

        assert 'plugin6_key' not in message
        parsed_message, parsed_metadata = self.plugin_set.run_plugins(message, self.metadata)
        assert parsed_message['plugin6_key'] == 'plums'
        assert parsed_message == message
        assert parsed_metadata == self.metadata

    def test_run_plugins_matching_value_plugin7(self):
        '''
            Checks to see if we will match on any text using *
        '''
        message = {'test': 'testerson', 'otherkey': 'abcd'}

        assert 'plugin7_key' not in message
        parsed_message, parsed_metadata = self.plugin_set.run_plugins(message, self.metadata)
        assert parsed_message['plugin7_key'] == 'lime'
        assert parsed_message == message
        assert parsed_metadata == self.metadata