File size: 3,284 Bytes
7c89ed7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# Copyright (c) 2017 Mozilla Corporation

import pytz
import tzlocal
import datetime
import os
import sys


def utc_timezone():
    return pytz.timezone('UTC')


tzlocal.get_localzone = utc_timezone


class MockOptions():
    @property
    def mozdefhostname(self):
        return 'sample'


class TestKeyMapping():
    def teardown(self):
        sys.path.remove(self.mq_path)

    def setup(self):
        if 'lib' in sys.modules:
            del sys.modules['lib']
        self.mq_path = os.path.join(os.path.dirname(__file__), "../../mq/")
        sys.path.insert(0, self.mq_path)
        from mq import esworker_eventtask
        mock_options = MockOptions()
        esworker_eventtask.options = mock_options
        self.key_mapping = esworker_eventtask.keyMapping

    def test_syslog_dict(self):
        syslog_dict = {
            'CATEGORY': 'syslog',
            'DATE': 'Oct 27 14:01:12',
            'FACILITY': 'daemon',
            'HOST': 'ub_server',
            'HOST_FROM': '10.1.20.139',
            'LEGACY_MSGHDR': 'systemd[1]: ',
            'MESSAGE': 'Stopped Getty on tty1.',
            'PID': '1',
            'PRIORITY': 'info',
            'PROGRAM': 'systemd',
            'SEQNUM': '8',
            'SOURCE': 'syslog_tcp',
            'SOURCEIP': '10.1.20.139',
            'TAGS': '.source.syslog_tcp'
        }

        result = self.key_mapping(syslog_dict)
        assert result['processid'] == '1'
        assert result['processname'] == 'systemd'
        assert result['severity'] == 'INFO'
        assert result['mozdefhostname'] == 'sample'
        assert result['hostname'] == 'ub_server'
        assert result['summary'] == 'Stopped Getty on tty1.'
        assert result['source'] == 'syslog_tcp'
        assert result['receivedtimestamp'] != result['utctimestamp']
        expected_year = datetime.datetime.now().year
        assert result['utctimestamp'] == str(expected_year) + '-10-27T14:01:12+00:00'
        assert result['timestamp'] == str(expected_year) + '-10-27T14:01:12+00:00'
        assert result['details']['eventsourceipaddress'] == '10.1.20.139'
        assert result['tags'] == ['.source.syslog_tcp']
        assert result['category'] == 'syslog'

    def test_tags_list(self):
        tags_dict = {
            'tags': ['example1']
        }
        result = self.key_mapping(tags_dict)
        assert result['tags'] == ['example1']

    def test_details_nondict(self):
        message = {
            'summary': 'example summary',
            'payload': 'examplepayload',
            'details': 'somestring',
        }
        result = self.key_mapping(message)
        assert result['summary'] == 'example summary'
        assert sorted(result['details'].keys()) == ['message', 'payload']
        assert result['details']['message'] == 'somestring'
        assert result['details']['payload'] == 'examplepayload'

    def test_no_details(self):
        message = {
            'summary': 'example summary',
        }
        result = self.key_mapping(message)
        assert result['details'] == {}