Spaces:
Runtime error
Runtime error
File size: 6,737 Bytes
5f491f6 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 | import unittest
import asyncio
import os
import shutil
from core.scanner import run_scan
from core.config import load_config
from core.plugin_manager import PluginManager
import logging
import re
class TestScanner(unittest.TestCase):
def setUp(self):
self.config_path = "test_config.yaml"
self.output_dir = "test_scan_reports"
self.log_dir = "test_logs"
self.create_test_config()
self.logger = logging.getLogger()
self.log_handler = TestLogHandler()
self.logger.addHandler(self.log_handler)
self.log_handler.setLevel(logging.DEBUG)
def tearDown(self):
if os.path.exists(self.output_dir):
shutil.rmtree(self.output_dir)
if os.path.exists(self.log_dir):
shutil.rmtree(self.log_dir)
if os.path.exists(self.config_path):
os.remove(self.config_path)
self.logger.removeHandler(self.log_handler)
def create_test_config(self):
test_config = {
'description': 'Test Network Security Audit',
'timeout_seconds': 3600,
'target_selection': {
'type': 'static',
'targets': ['127.0.0.1'],
},
'scan_modules': [
{
'name': 'nmap_port_scan',
'type': 'nmap',
'enabled': True,
'parameters': {
'ports': '80',
'timing_template': 3,
'scan_type': 'syn',
'os_detection': False,
'version_detection': False,
'script_scan': None
},
'post_processing': {
'report_format': 'json',
'save_to_file': 'test_nmap_scan_results.json'
}
},
{
'name': 'example_dependent_scan',
'type': 'example',
'enabled': True,
'parameters': {},
'post_processing': {
'report_format': 'txt',
'save_to_file': 'test_dependent_scan_results.txt'
}
},
{
'name': 'nikto_web_scan',
'type': 'nikto',
'enabled': True,
'parameters': {
'target_ports': '80,443',
'plugins': 'default',
'tuning': 'x0'
},
'post_processing': {
'report_format': 'txt',
'save_to_file': 'test_nikto_scan_results.txt'
}
},
{
'name': 'ssl_scan',
'type': 'sslscan',
'enabled': True,
'parameters': {
'target_ports': '443',
'ssl_protocols': 'TLSv1.2,TLSv1.3'
},
'post_processing': {
'report_format': 'csv',
'save_to_file': 'test_sslscan_results.csv'
}
}
],
'global_options': {
'log_level': 'DEBUG',
'output_directory': self.output_dir,
'concurrent_scans': 1,
'rate_limit_delay': 0.5,
'log_directory': self.log_dir
},
'security_roles': {
'red_team': {
'enabled_modules': ['nmap_port_scan', 'example_dependent_scan', 'nikto_web_scan', 'ssl_scan']
}
}
}
import yaml
with open(self.config_path, 'w') as f:
yaml.dump(test_config, f)
def test_run_scan(self):
asyncio.run(run_scan(config_path=self.config_path, role='red_team'))
self.assertTrue(os.path.exists(self.output_dir))
self.assertTrue(os.path.exists(os.path.join(self.output_dir, 'test_nmap_scan_results.json')))
self.assertTrue(os.path.exists(os.path.join(self.output_dir, 'test_dependent_scan_results.txt')))
self.assertTrue(os.path.exists(os.path.join(self.output_dir, 'test_nikto_scan_results.txt')))
self.assertTrue(os.path.exists(os.path.join(self.output_dir, 'test_sslscan_results.csv')))
self.assertTrue(os.path.exists(self.log_dir))
self.assertTrue(any(f.startswith('red_sword') for f in os.listdir(self.log_dir)))
log_messages = self.log_handler.messages
self.assertTrue(any(re.search(r"Nmap pre-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
self.assertTrue(any(re.search(r"Nmap post-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
self.assertTrue(any(re.search(r"Example dependent pre-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
self.assertTrue(any(re.search(r"Example dependent post-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
self.assertTrue(any(re.search(r"Nikto pre-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
self.assertTrue(any(re.search(r"Nikto post-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
self.assertTrue(any(re.search(r"SSLScan pre-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
self.assertTrue(any(re.search(r"SSLScan post-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
def test_plugin_dependencies(self):
plugin_manager = PluginManager()
self.assertIn('nmap_port_scan', plugin_manager.get_all_plugins())
self.assertIn('example_dependent_scan', plugin_manager.get_all_plugins())
self.assertIn('nikto_web_scan', plugin_manager.get_all_plugins())
self.assertIn('ssl_scan', plugin_manager.get_all_plugins())
def test_plugin_config_validation(self):
plugin_manager = PluginManager()
nmap_plugin = plugin_manager.get_plugin('nmap_port_scan')
with self.assertRaises(ValueError):
plugin_manager.validate_plugin_config({'ports': '80'}, nmap_plugin.config_schema)
plugin_manager.validate_plugin_config({'ports': '80', 'timing_template': 3}, nmap_plugin.config_schema)
class TestLogHandler(logging.Handler):
def __init__(self, *args, **kwargs):
self.messages = []
super().__init__(*args, **kwargs)
def emit(self, record):
self.messages.append(self.format(record))
if __name__ == '__main__':
unittest.main()
|