| |
| |
| |
| |
|
|
| import copy |
|
|
| from mq.plugins.parse_sshd import message |
|
|
| accept_message = {} |
| accept_message['utctimestamp'] = '2017-08-24T22:49:42+00:00' |
| accept_message['timestamp'] = '2017-08-24T22:49:42+00:00' |
| accept_message['receivedtimestamp'] = '2017-08-24T22:49:42+00:00' |
| accept_message['category'] = 'syslog' |
| accept_message['processid'] = '0' |
| accept_message['processname'] = 'sshd' |
| accept_message['severity'] = '7' |
| accept_message['hostname'] = 'syslog1.private.scl3.mozilla.com' |
| accept_message['mozdefhostname'] = 'mozdef4.private.scl3.mozilla.com' |
| accept_message['eventsource'] = 'systemlogs' |
| accept_message['details'] = {} |
| accept_message['details']['processid'] = '5413' |
| accept_message['details']['sourceipv4address'] = '10.22.74.208' |
| accept_message['details']['hostname'] = 'mysuperhost.somewhere.com' |
| accept_message['details']['program'] = 'sshd' |
| accept_message['details']['sourceipaddress'] = '10.22.74.208' |
| accept_message['details']['sourceport'] = '37486' |
|
|
|
|
| |
| class TestSSHDAcceptedMessageV1(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'Accepted publickey for user1 from 10.22.74.208 port 26388 ssh2: RSA 1f:c9:4c:90:bc:fb:72:c7:4d:02:da:07:ed:fe:07:ac' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['username'] == 'user1' |
| assert retmessage['details']['rsakeyfingerprint'] == '1f:c9:4c:90:bc:fb:72:c7:4d:02:da:07:ed:fe:07:ac' |
| assert retmessage['details']['authmethod'] == 'publickey' |
| assert retmessage['details']['sourceport'] == '26388' |
| assert retmessage['details']['authstatus'] == 'Accepted' |
| assert retmessage['details']['sourceipaddress'] == '10.22.74.208' |
|
|
| |
|
|
|
|
| class TestSSHDAcceptedMessageV2(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'Accepted publickey for user1@domainname.com from 10.22.248.134 port 52216 ssh2: RSA SHA256:1fPhSawXQzFDrJoN2uSos2nGg3wS3oGp15x8/HR+pBc' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['username'] == 'user1@domainname.com' |
| assert retmessage['details']['rsakeyfingerprint'] == 'SHA256:1fPhSawXQzFDrJoN2uSos2nGg3wS3oGp15x8/HR+pBc' |
| assert retmessage['details']['authmethod'] == 'publickey' |
| assert retmessage['details']['sourceport'] == '52216' |
| assert retmessage['details']['authstatus'] == 'Accepted' |
| assert retmessage['details']['sourceipaddress'] == '10.22.248.134' |
|
|
|
|
| |
| class TestSSHDAcceptedMessageV3(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'Accepted publickey for user1@domainname.com from 10.22.74.208 port 26388 ssh2: RSA 1f:c9:4c:90:bc:fb:72:c7:4d:02:da:07:ed:fe:07:ac' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['username'] == 'user1@domainname.com' |
| assert retmessage['details']['rsakeyfingerprint'] == '1f:c9:4c:90:bc:fb:72:c7:4d:02:da:07:ed:fe:07:ac' |
| assert retmessage['details']['authmethod'] == 'publickey' |
| assert retmessage['details']['sourceport'] == '26388' |
| assert retmessage['details']['authstatus'] == 'Accepted' |
| assert retmessage['details']['sourceipaddress'] == '10.22.74.208' |
|
|
|
|
| |
| class TestSSHDAcceptedMessageV4(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'Accepted publickey for user1 from 10.22.74.208 port 26388 ssh2' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['username'] == 'user1' |
| assert retmessage['details']['rsakeyfingerprint'] is None |
| assert retmessage['details']['authmethod'] == 'publickey' |
| assert retmessage['details']['sourceport'] == '26388' |
| assert retmessage['details']['authstatus'] == 'Accepted' |
| assert retmessage['details']['sourceipaddress'] == '10.22.74.208' |
|
|
|
|
| |
| class TestSSHDPAMSessionOpenedMessageV1(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'pam_unix(sshd:session): session opened for user user1 by (uid=0)' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['username'] == 'user1' |
|
|
|
|
| |
| class TestSSHDPAMSessionClosedMessageV1(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'pam_unix(sshd:session): session closed for user user1' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['username'] == 'user1' |
|
|
|
|
| |
| class TestSSHDPostponedMessageV1(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'Postponed publickey for user1 from 10.22.75.209 port 37486 ssh2' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['username'] == 'user1' |
| assert retmessage['details']['authmethod'] == 'publickey' |
|
|
|
|
| |
| class TestSSHDPostponedMessageV2(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'Postponed publickey for user1 from 10.22.75.209 port 37486 ssh2 [preauth]' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['username'] == 'user1' |
| assert retmessage['details']['authmethod'] == 'publickey' |
|
|
|
|
| |
| class TestSSHDPostponedMessageV3(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'Postponed publickey for user1@somewhere.com from 10.22.74.208 port 37486 ssh2 [preauth]' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['username'] == 'user1@somewhere.com' |
| assert retmessage['details']['authmethod'] == 'publickey' |
|
|
|
|
| |
| class TestSSHDStartingSessionV1(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'Starting session: command for user1 from 10.22.128.93 port 51748' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['username'] == 'user1' |
| assert retmessage['details']['sessiontype'] == 'command' |
| assert retmessage['details']['sourceipaddress'] == '10.22.128.93' |
| assert retmessage['details']['sourceport'] == '51748' |
|
|
|
|
| |
| class TestSSHDStartingSessionV2(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'Starting session: shell on pts/0 for user2 from 10.22.252.6 port 59983' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['username'] == 'user2' |
| assert retmessage['details']['sessiontype'] == 'shell' |
| assert retmessage['details']['sourceipaddress'] == '10.22.252.6' |
| assert retmessage['details']['sourceport'] == '59983' |
| assert retmessage['details']['device'] == 'pts/0' |
|
|
|
|
| |
| class TestSSHDUnauthorizedUserV1(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'Invalid user user@loftydreams.com from 10.22.75.209' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['username'] == 'user@loftydreams.com' |
| assert retmessage['details']['sourceipaddress'] == '10.22.75.209' |
|
|
|
|
| |
| class TestSSHDUnauthorizedUsertV2(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'input_userauth_request: invalid user user1 [preauth]' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['username'] == 'user1' |
|
|
|
|
| |
| class TestSSHDReceivedDisconnectV1(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'Received disconnect from 10.22.75.209: 2103: Bye Bye [preauth]' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['sourceipaddress'] == '10.22.75.209' |
| assert retmessage['details']['sourceport'] == '2103' |
|
|
|
|
| |
| class TestSSHDReceivedDisconnectV2(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'Received disconnect from 10.22.75.209: 2103: Normal Shutdown, Thank you for playing [preauth]' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['sourceipaddress'] == '10.22.75.209' |
| assert retmessage['details']['sourceport'] == '2103' |
|
|
|
|
| |
| class TestSSHDReceivedDisconnectV3(): |
| def setup(self): |
|
|
| self.msgobj = message() |
| self.msg = copy.deepcopy(accept_message) |
| self.msg['summary'] = 'Received disconnect from 10.22.75.209: 2103: [preauth]' |
|
|
| def test_onMessage(self): |
| metadata = {} |
|
|
| (retmessage, retmeta) = self.msgobj.onMessage(self.msg, metadata) |
|
|
| assert retmessage is not None |
| assert retmeta is not None |
| assert retmessage['details']['sourceipaddress'] == '10.22.75.209' |
| assert retmessage['details']['sourceport'] == '2103' |
|
|