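from mq.plugins.vulnerability import message


class TestVulnerabilityMessageV1():
    """Exercise the vulnerability plugin with the older message layout.

    The fixture carries a single ``vuln`` dict and no ``version`` field.
    """

    # NOTE(review): nose-style ``setup``; recent pytest releases expect
    # ``setup_method`` instead -- confirm which runner this suite uses, because
    # a fixture that never runs makes every test fail on a missing attribute.
    def setup(self):
        """Build a fresh plugin instance and a complete sample message."""
        self.msgobj = message()
        # Key order is kept exactly as the fixture was originally assembled, in
        # case the expected ids below depend on how the message is serialised.
        self.msg = {
            'description': 'system vulnerability management automation',
            'utctimestamp': '2015-01-21T15:33:51.136378+00:00',
            'sourcename': 'development',
            'asset': {
                'assetid': 23,
                'ipv4address': '1.2.3.4',
                'macaddress': '',
                'hostname': 'hostname.mozilla.com',
            },
            'vuln': {
                'status': 'new',
                'vulnid': 'nexpose:43883',
                'title': 'RHSA-2013:1475: postgresql and postgresql84 security update',
                'discovery_time': 1421845863,
                'age_days': 32.7,
                'known_malware': False,
                'known_exploits': False,
                'cvss': 8.5,
                # NOTE(review): 'CVE-2013-022' is not a well-formed CVE
                # identifier (the sequence part needs at least four digits).
                # Left untouched because the expected ids may be derived from
                # this content -- confirm before correcting it.
                'cves': ['CVE-2013-022', 'CVE-2013-1900'],
            },
        }

    def test_onMessage(self):
        """onMessage returns a message and metadata carrying the expected id."""
        metadata = {}
        self.msg['type'] = 'vulnerability'
        retmessage, retmeta = self.msgobj.onMessage(self.msg, metadata)
        assert retmessage is not None
        assert retmeta is not None
        assert retmeta['id'] == '793fd2bebd558dd8e358d8e80dd8cdc2'

    def test_calculate_id(self):
        """calculate_id yields the same id that onMessage reports."""
        handler = self.msgobj.get_handler(self.msg)
        # The comparison used to be a bare expression with no ``assert``, so
        # this test could never fail and a change in id derivation would have
        # gone unnoticed.
        assert handler.calculate_id(self.msg) == '793fd2bebd558dd8e358d8e80dd8cdc2'

    def test_validate_correct(self):
        """A complete message passes validation."""
        assert self.msgobj.get_handler(self.msg).validate(self.msg) is True

    def test_validate_incorrect(self):
        """A message without 'utctimestamp' is rejected."""
        del self.msg['utctimestamp']
        assert self.msgobj.get_handler(self.msg).validate(self.msg) is False

    def test_validate_incorrect_vuln(self):
        """A message whose 'vuln' dict lacks 'age_days' is rejected."""
        del self.msg['vuln']['age_days']
        assert self.msgobj.get_handler(self.msg).validate(self.msg) is False


class TestVulnerabilityMessageV2():
    """Exercise the vulnerability plugin with a ``version`` 2 scan message.

    The fixture carries a ``vulnerabilities`` list instead of a single
    ``vuln`` dict.
    """

    # NOTE(review): nose-style ``setup``; recent pytest releases expect
    # ``setup_method`` instead -- confirm which runner this suite uses.
    def setup(self):
        """Build a fresh plugin instance and a sample version-2 message."""
        self.msgobj = message()
        # Key order is kept exactly as the fixture was originally assembled, in
        # case the expected id below depends on how the message is serialised.
        self.msg = {
            'scan_start': '2016-11-21T19:18:28+00:00',
            'scan_end': '2016-11-21T19:20:31+00:00',
            'description': 'scanapi runscan mozdef emitter',
            'utctimestamp': '2016-11-21T22:18:31.399746+00:00',
            'zone': 'scl3',
            'sourcename': 'scanapi',
            'version': 2,
            'vulnerabilities': [
                {
                    'name': 'RHEL 6 : kernel (RHSA-2016:2006)',
                    'vulnerable_packages': [
                        'kernel-2.6.32-642.4.2.el6',
                        'kernel-devel-2.6.32-642.4.2.el6',
                        'kernel-firmware-2.6.32-642.4.2.el6',
                        'kernel-headers-2.6.32-642.4.2.el6',
                    ],
                    # Scanner output, reproduced verbatim.
                    'output': (
                        '\nRemote package installed : kernel-2.6.32-642.4.2.el6\n'
                        'Should be : kernel-2.6.32-642.6.1.el6\n\n'
                        'Remote package installed : kernel-devel-2.6.32-642.4.2.el6\n'
                        'Should be : kernel-devel-2.6.32-642.6.1.el6\n'
                        '\nRemote package installed : kernel-firmware-2.6.32-642.4.2.el6\n'
                        'Should be : kernel-firmware-2.6.32-642.6.1.el6\n\n'
                        'Remote package installed : kernel-headers-2.6.32-642.4.2.el6\n'
                        'Should be : kernel-headers-2.6.32-642.6.1.el6\n\n'
                    ),
                    'cve': 'CVE-2016-4470',
                    'cvss': 7.2,
                    'risk': 'high',
                },
            ],
            'asset': {
                'os': 'Linux Kernel 2.6.32-642.4.2.el6.x86_64 on Red Hat Enterprise Linux Server release 6.8 (Santiago)',
                'hostname': 'hostname.mozilla.com',
                'ipaddress': '1.2.3.4',
            },
        }

    def test_onMessage(self):
        """onMessage returns a message and metadata carrying the expected id."""
        metadata = {}
        self.msg['type'] = 'vulnerability'
        retmessage, retmeta = self.msgobj.onMessage(self.msg, metadata)
        assert retmessage is not None
        assert retmeta is not None
        assert retmeta['id'] == 'dc057651c89b7064ae9d8b140ab12d40'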