Project-Red-Sword / tests /test_scanner.py
dia-gov's picture
Upload 433 files
5f491f6 verified
import unittest
import asyncio
import os
import shutil
from core.scanner import run_scan
from core.config import load_config
from core.plugin_manager import PluginManager
import logging
import re
class TestScanner(unittest.TestCase):
def setUp(self):
self.config_path = "test_config.yaml"
self.output_dir = "test_scan_reports"
self.log_dir = "test_logs"
self.create_test_config()
self.logger = logging.getLogger()
self.log_handler = TestLogHandler()
self.logger.addHandler(self.log_handler)
self.log_handler.setLevel(logging.DEBUG)
def tearDown(self):
if os.path.exists(self.output_dir):
shutil.rmtree(self.output_dir)
if os.path.exists(self.log_dir):
shutil.rmtree(self.log_dir)
if os.path.exists(self.config_path):
os.remove(self.config_path)
self.logger.removeHandler(self.log_handler)
def create_test_config(self):
test_config = {
'description': 'Test Network Security Audit',
'timeout_seconds': 3600,
'target_selection': {
'type': 'static',
'targets': ['127.0.0.1'],
},
'scan_modules': [
{
'name': 'nmap_port_scan',
'type': 'nmap',
'enabled': True,
'parameters': {
'ports': '80',
'timing_template': 3,
'scan_type': 'syn',
'os_detection': False,
'version_detection': False,
'script_scan': None
},
'post_processing': {
'report_format': 'json',
'save_to_file': 'test_nmap_scan_results.json'
}
},
{
'name': 'example_dependent_scan',
'type': 'example',
'enabled': True,
'parameters': {},
'post_processing': {
'report_format': 'txt',
'save_to_file': 'test_dependent_scan_results.txt'
}
},
{
'name': 'nikto_web_scan',
'type': 'nikto',
'enabled': True,
'parameters': {
'target_ports': '80,443',
'plugins': 'default',
'tuning': 'x0'
},
'post_processing': {
'report_format': 'txt',
'save_to_file': 'test_nikto_scan_results.txt'
}
},
{
'name': 'ssl_scan',
'type': 'sslscan',
'enabled': True,
'parameters': {
'target_ports': '443',
'ssl_protocols': 'TLSv1.2,TLSv1.3'
},
'post_processing': {
'report_format': 'csv',
'save_to_file': 'test_sslscan_results.csv'
}
}
],
'global_options': {
'log_level': 'DEBUG',
'output_directory': self.output_dir,
'concurrent_scans': 1,
'rate_limit_delay': 0.5,
'log_directory': self.log_dir
},
'security_roles': {
'red_team': {
'enabled_modules': ['nmap_port_scan', 'example_dependent_scan', 'nikto_web_scan', 'ssl_scan']
}
}
}
import yaml
with open(self.config_path, 'w') as f:
yaml.dump(test_config, f)
def test_run_scan(self):
asyncio.run(run_scan(config_path=self.config_path, role='red_team'))
self.assertTrue(os.path.exists(self.output_dir))
self.assertTrue(os.path.exists(os.path.join(self.output_dir, 'test_nmap_scan_results.json')))
self.assertTrue(os.path.exists(os.path.join(self.output_dir, 'test_dependent_scan_results.txt')))
self.assertTrue(os.path.exists(os.path.join(self.output_dir, 'test_nikto_scan_results.txt')))
self.assertTrue(os.path.exists(os.path.join(self.output_dir, 'test_sslscan_results.csv')))
self.assertTrue(os.path.exists(self.log_dir))
self.assertTrue(any(f.startswith('red_sword') for f in os.listdir(self.log_dir)))
log_messages = self.log_handler.messages
self.assertTrue(any(re.search(r"Nmap pre-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
self.assertTrue(any(re.search(r"Nmap post-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
self.assertTrue(any(re.search(r"Example dependent pre-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
self.assertTrue(any(re.search(r"Example dependent post-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
self.assertTrue(any(re.search(r"Nikto pre-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
self.assertTrue(any(re.search(r"Nikto post-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
self.assertTrue(any(re.search(r"SSLScan pre-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
self.assertTrue(any(re.search(r"SSLScan post-scan hook executed for 127\.0\.0\.1", msg) for msg in log_messages))
def test_plugin_dependencies(self):
plugin_manager = PluginManager()
self.assertIn('nmap_port_scan', plugin_manager.get_all_plugins())
self.assertIn('example_dependent_scan', plugin_manager.get_all_plugins())
self.assertIn('nikto_web_scan', plugin_manager.get_all_plugins())
self.assertIn('ssl_scan', plugin_manager.get_all_plugins())
def test_plugin_config_validation(self):
plugin_manager = PluginManager()
nmap_plugin = plugin_manager.get_plugin('nmap_port_scan')
with self.assertRaises(ValueError):
plugin_manager.validate_plugin_config({'ports': '80'}, nmap_plugin.config_schema)
plugin_manager.validate_plugin_config({'ports': '80', 'timing_template': 3}, nmap_plugin.config_schema)
class TestLogHandler(logging.Handler):
def __init__(self, *args, **kwargs):
self.messages = []
super().__init__(*args, **kwargs)
def emit(self, record):
self.messages.append(self.format(record))
if __name__ == '__main__':
unittest.main()