Mozdef / tests /mq /plugins /test_parse_sshd.py
ineso22's picture
Upload folder using huggingface_hub
7c89ed7 verified
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# Copyright (c) 2017 Mozilla Corporation
import copy
from mq.plugins.parse_sshd import message
accept_message = {}
accept_message['utctimestamp'] = '2017-08-24T22:49:42+00:00'
accept_message['timestamp'] = '2017-08-24T22:49:42+00:00'
accept_message['receivedtimestamp'] = '2017-08-24T22:49:42+00:00'
accept_message['category'] = 'syslog'
accept_message['processid'] = '0'
accept_message['processname'] = 'sshd'
accept_message['severity'] = '7'
accept_message['hostname'] = 'syslog1.private.scl3.mozilla.com'
accept_message['mozdefhostname'] = 'mozdef4.private.scl3.mozilla.com'
accept_message['eventsource'] = 'systemlogs'
accept_message['details'] = {}
accept_message['details']['processid'] = '5413'
accept_message['details']['sourceipv4address'] = '10.22.74.208'
accept_message['details']['hostname'] = 'mysuperhost.somewhere.com'
accept_message['details']['program'] = 'sshd'
accept_message['details']['sourceipaddress'] = '10.22.74.208'
accept_message['details']['sourceport'] = '37486'
# Short username, RSA fpr present
class TestSSHDAcceptedMessageV1():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'Accepted publickey for user1 from 10.22.74.208 port 26388 ssh2: RSA 1f:c9:4c:90:bc:fb:72:c7:4d:02:da:07:ed:fe:07:ac'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['username'] == 'user1'
assert retmessage['details']['rsakeyfingerprint'] == '1f:c9:4c:90:bc:fb:72:c7:4d:02:da:07:ed:fe:07:ac'
assert retmessage['details']['authmethod'] == 'publickey'
assert retmessage['details']['sourceport'] == '26388'
assert retmessage['details']['authstatus'] == 'Accepted'
assert retmessage['details']['sourceipaddress'] == '10.22.74.208'
# Long Username and SHA256 fpr present
class TestSSHDAcceptedMessageV2():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'Accepted publickey for user1@domainname.com from 10.22.248.134 port 52216 ssh2: RSA SHA256:1fPhSawXQzFDrJoN2uSos2nGg3wS3oGp15x8/HR+pBc'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['username'] == 'user1@domainname.com'
assert retmessage['details']['rsakeyfingerprint'] == 'SHA256:1fPhSawXQzFDrJoN2uSos2nGg3wS3oGp15x8/HR+pBc'
assert retmessage['details']['authmethod'] == 'publickey'
assert retmessage['details']['sourceport'] == '52216'
assert retmessage['details']['authstatus'] == 'Accepted'
assert retmessage['details']['sourceipaddress'] == '10.22.248.134'
# Long username
class TestSSHDAcceptedMessageV3():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'Accepted publickey for user1@domainname.com from 10.22.74.208 port 26388 ssh2: RSA 1f:c9:4c:90:bc:fb:72:c7:4d:02:da:07:ed:fe:07:ac'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['username'] == 'user1@domainname.com'
assert retmessage['details']['rsakeyfingerprint'] == '1f:c9:4c:90:bc:fb:72:c7:4d:02:da:07:ed:fe:07:ac'
assert retmessage['details']['authmethod'] == 'publickey'
assert retmessage['details']['sourceport'] == '26388'
assert retmessage['details']['authstatus'] == 'Accepted'
assert retmessage['details']['sourceipaddress'] == '10.22.74.208'
# Short username, RSA fpr missing
class TestSSHDAcceptedMessageV4():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'Accepted publickey for user1 from 10.22.74.208 port 26388 ssh2'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['username'] == 'user1'
assert retmessage['details']['rsakeyfingerprint'] is None
assert retmessage['details']['authmethod'] == 'publickey'
assert retmessage['details']['sourceport'] == '26388'
assert retmessage['details']['authstatus'] == 'Accepted'
assert retmessage['details']['sourceipaddress'] == '10.22.74.208'
# PAM session opened for user
class TestSSHDPAMSessionOpenedMessageV1():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'pam_unix(sshd:session): session opened for user user1 by (uid=0)'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['username'] == 'user1'
# PAM session closed for user
class TestSSHDPAMSessionClosedMessageV1():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'pam_unix(sshd:session): session closed for user user1'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['username'] == 'user1'
# Postponed preauth - short, simple username
class TestSSHDPostponedMessageV1():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'Postponed publickey for user1 from 10.22.75.209 port 37486 ssh2'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['username'] == 'user1'
assert retmessage['details']['authmethod'] == 'publickey'
# Postponed preauth - long, simple username
class TestSSHDPostponedMessageV2():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'Postponed publickey for user1 from 10.22.75.209 port 37486 ssh2 [preauth]'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['username'] == 'user1'
assert retmessage['details']['authmethod'] == 'publickey'
# Postponed preauth - long username
class TestSSHDPostponedMessageV3():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'Postponed publickey for user1@somewhere.com from 10.22.74.208 port 37486 ssh2 [preauth]'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['username'] == 'user1@somewhere.com'
assert retmessage['details']['authmethod'] == 'publickey'
# Starting session
class TestSSHDStartingSessionV1():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'Starting session: command for user1 from 10.22.128.93 port 51748'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['username'] == 'user1'
assert retmessage['details']['sessiontype'] == 'command'
assert retmessage['details']['sourceipaddress'] == '10.22.128.93'
assert retmessage['details']['sourceport'] == '51748'
# Starting session
class TestSSHDStartingSessionV2():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'Starting session: shell on pts/0 for user2 from 10.22.252.6 port 59983'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['username'] == 'user2'
assert retmessage['details']['sessiontype'] == 'shell'
assert retmessage['details']['sourceipaddress'] == '10.22.252.6'
assert retmessage['details']['sourceport'] == '59983'
assert retmessage['details']['device'] == 'pts/0'
# Invalid User - complex username
class TestSSHDUnauthorizedUserV1():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'Invalid user user@loftydreams.com from 10.22.75.209'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['username'] == 'user@loftydreams.com'
assert retmessage['details']['sourceipaddress'] == '10.22.75.209'
# Input Userauth Request
class TestSSHDUnauthorizedUsertV2():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'input_userauth_request: invalid user user1 [preauth]'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['username'] == 'user1'
# SSH Disconnect - Bye Bye
class TestSSHDReceivedDisconnectV1():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'Received disconnect from 10.22.75.209: 2103: Bye Bye [preauth]'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['sourceipaddress'] == '10.22.75.209'
assert retmessage['details']['sourceport'] == '2103'
# SSH Disconnect - normal shutdown
class TestSSHDReceivedDisconnectV2():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'Received disconnect from 10.22.75.209: 2103: Normal Shutdown, Thank you for playing [preauth]'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['sourceipaddress'] == '10.22.75.209'
assert retmessage['details']['sourceport'] == '2103'
# SSH Disconnect
class TestSSHDReceivedDisconnectV3():
def setup(self):
self.msgobj = message()
self.msg = copy.deepcopy(accept_message)
self.msg['summary'] = 'Received disconnect from 10.22.75.209: 2103: [preauth]'
def test_onMessage(self):
metadata = {}
(retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata)
assert retmessage is not None
assert retmeta is not None
assert retmessage['details']['sourceipaddress'] == '10.22.75.209'
assert retmessage['details']['sourceport'] == '2103'