| |
|
|
| |
| |
| |
| |
|
|
| import pytz |
| import tzlocal |
| import datetime |
| import os |
| import sys |
|
|
|
|
def utc_timezone():
    """Return the pytz timezone object for 'UTC'.

    Takes no arguments so it can stand in for a zero-argument
    zone-lookup callable.
    """
    utc_zone = pytz.timezone('UTC')
    return utc_zone
|
|
|
|
# Swap tzlocal's zone lookup for the UTC stub as soon as this module is imported.
# NOTE(review): presumably this makes any code under test that asks tzlocal for
# the local zone see UTC, so the '+00:00' timestamp assertions below do not
# depend on the host's timezone -- confirm against the worker module.
tzlocal.get_localzone = utc_timezone
|
|
|
|
class MockOptions:
    """Minimal stand-in for the options object assigned onto the worker module."""

    # Read-only attribute that always yields the fixed hostname 'sample'.
    mozdefhostname = property(lambda self: 'sample')
|
|
|
|
class TestKeyMapping:
    """Exercise ``esworker_eventtask.keyMapping`` against sample messages.

    Each test feeds a plain dict to the mapping function and checks the
    fields of the dict it returns.

    NOTE(review): ``setup``/``teardown`` are nose-style hook names; recent
    pytest releases only call ``setup_method``/``teardown_method`` -- confirm
    which runner/version this suite targets before renaming.
    """

    def setup(self):
        """Make the mq package importable and grab ``keyMapping`` with stub options."""
        # Forget any cached top-level 'lib' module before importing the worker.
        # NOTE(review): presumably avoids picking up a different 'lib' package
        # loaded by another test -- confirm.
        sys.modules.pop('lib', None)

        test_dir = os.path.dirname(__file__)
        self.mq_path = os.path.join(test_dir, "../../mq/")
        sys.path.insert(0, self.mq_path)

        # Imported here (not at module top) because it relies on the sys.path
        # entry added just above.
        from mq import esworker_eventtask
        esworker_eventtask.options = MockOptions()
        self.key_mapping = esworker_eventtask.keyMapping

    def teardown(self):
        """Undo the sys.path change made in ``setup``."""
        sys.path.remove(self.mq_path)

    def test_syslog_dict(self):
        """An upper-case syslog-style message is mapped onto the expected fields."""
        raw_event = {
            'CATEGORY': 'syslog',
            'DATE': 'Oct 27 14:01:12',
            'FACILITY': 'daemon',
            'HOST': 'ub_server',
            'HOST_FROM': '10.1.20.139',
            'LEGACY_MSGHDR': 'systemd[1]: ',
            'MESSAGE': 'Stopped Getty on tty1.',
            'PID': '1',
            'PRIORITY': 'info',
            'PROGRAM': 'systemd',
            'SEQNUM': '8',
            'SOURCE': 'syslog_tcp',
            'SOURCEIP': '10.1.20.139',
            'TAGS': '.source.syslog_tcp'
        }

        mapped = self.key_mapping(raw_event)

        # Simple one-to-one field expectations, checked in a fixed order.
        expected_fields = [
            ('processid', '1'),
            ('processname', 'systemd'),
            ('severity', 'INFO'),
            ('mozdefhostname', 'sample'),
            ('hostname', 'ub_server'),
            ('summary', 'Stopped Getty on tty1.'),
            ('source', 'syslog_tcp'),
        ]
        for field_name, wanted in expected_fields:
            assert mapped[field_name] == wanted

        assert mapped['receivedtimestamp'] != mapped['utctimestamp']

        # The input date carries no year, so the expected value uses the current one.
        this_year = datetime.datetime.now().year
        expected_stamp = str(this_year) + '-10-27T14:01:12+00:00'
        assert mapped['utctimestamp'] == expected_stamp
        assert mapped['timestamp'] == expected_stamp

        assert mapped['details']['eventsourceipaddress'] == '10.1.20.139'
        assert mapped['tags'] == ['.source.syslog_tcp']
        assert mapped['category'] == 'syslog'

    def test_tags_list(self):
        """A ``tags`` value that is already a list comes back unchanged."""
        mapped = self.key_mapping({'tags': ['example1']})
        assert mapped['tags'] == ['example1']

    def test_details_nondict(self):
        """A string ``details`` value ends up under ``details['message']``."""
        event = {
            'summary': 'example summary',
            'payload': 'examplepayload',
            'details': 'somestring',
        }
        mapped = self.key_mapping(event)
        details = mapped['details']

        assert mapped['summary'] == 'example summary'
        assert sorted(details) == ['message', 'payload']
        assert details['message'] == 'somestring'
        assert details['payload'] == 'examplepayload'

    def test_no_details(self):
        """A message without ``details`` yields an empty ``details`` dict."""
        mapped = self.key_mapping({'summary': 'example summary'})
        assert mapped['details'] == {}
|
|