Spaces:
Runtime error
Runtime error
| import unittest | |
| import asyncio | |
| import os | |
| import shutil | |
| from core.scanner import run_scan | |
| from core.config import load_config | |
| from core.plugin_manager import PluginManager | |
| import logging | |
| import re | |
class TestScanner(unittest.TestCase):
    """Tests for ``run_scan`` and the plugin manager.

    Every test gets a freshly written YAML config plus an in-memory log
    handler attached to the root logger. The config file, the report
    directory and the log directory are created relative to the current
    working directory and removed again after each test.

    NOTE(review): ``test_run_scan`` presumably needs the external nmap,
    nikto and sslscan tools to be available -- confirm before running in CI.
    """

    def setUp(self):
        """Write the test config and start capturing root-logger records."""
        self.config_path = "test_config.yaml"
        self.output_dir = "test_scan_reports"
        self.log_dir = "test_logs"
        self.create_test_config()

        self.logger = logging.getLogger()
        self.log_handler = TestLogHandler()
        # NOTE(review): only the handler's level is set; the root logger's
        # own level is left untouched, so capturing DEBUG/INFO records
        # presumably relies on run_scan configuring logging -- confirm.
        self.log_handler.setLevel(logging.DEBUG)
        self.logger.addHandler(self.log_handler)
        # Cleanups run even when tearDown raises (for example when a
        # directory cannot be deleted), so the handler can never leak onto
        # the root logger and pollute later tests.
        self.addCleanup(self.logger.removeHandler, self.log_handler)

    def tearDown(self):
        """Delete the report directory, the log directory and the config file."""
        for directory in (self.output_dir, self.log_dir):
            if os.path.exists(directory):
                shutil.rmtree(directory)
        if os.path.exists(self.config_path):
            os.remove(self.config_path)

    def create_test_config(self):
        """Dump the scan configuration used by the tests to ``self.config_path``.

        The config declares a single static target (127.0.0.1), four enabled
        scan modules (nmap, example, nikto, sslscan), global options that
        point at ``self.output_dir`` / ``self.log_dir``, and a ``red_team``
        role that enables all four modules.
        """
        # Imported locally: PyYAML is only needed to write this file.
        import yaml

        test_config = {
            'description': 'Test Network Security Audit',
            'timeout_seconds': 3600,
            'target_selection': {
                'type': 'static',
                'targets': ['127.0.0.1'],
            },
            'scan_modules': [
                {
                    'name': 'nmap_port_scan',
                    'type': 'nmap',
                    'enabled': True,
                    'parameters': {
                        'ports': '80',
                        'timing_template': 3,
                        'scan_type': 'syn',
                        'os_detection': False,
                        'version_detection': False,
                        'script_scan': None
                    },
                    'post_processing': {
                        'report_format': 'json',
                        'save_to_file': 'test_nmap_scan_results.json'
                    }
                },
                {
                    'name': 'example_dependent_scan',
                    'type': 'example',
                    'enabled': True,
                    'parameters': {},
                    'post_processing': {
                        'report_format': 'txt',
                        'save_to_file': 'test_dependent_scan_results.txt'
                    }
                },
                {
                    'name': 'nikto_web_scan',
                    'type': 'nikto',
                    'enabled': True,
                    'parameters': {
                        'target_ports': '80,443',
                        'plugins': 'default',
                        'tuning': 'x0'
                    },
                    'post_processing': {
                        'report_format': 'txt',
                        'save_to_file': 'test_nikto_scan_results.txt'
                    }
                },
                {
                    'name': 'ssl_scan',
                    'type': 'sslscan',
                    'enabled': True,
                    'parameters': {
                        'target_ports': '443',
                        'ssl_protocols': 'TLSv1.2,TLSv1.3'
                    },
                    'post_processing': {
                        'report_format': 'csv',
                        'save_to_file': 'test_sslscan_results.csv'
                    }
                }
            ],
            'global_options': {
                'log_level': 'DEBUG',
                'output_directory': self.output_dir,
                'concurrent_scans': 1,
                'rate_limit_delay': 0.5,
                'log_directory': self.log_dir
            },
            'security_roles': {
                'red_team': {
                    'enabled_modules': ['nmap_port_scan', 'example_dependent_scan', 'nikto_web_scan', 'ssl_scan']
                }
            }
        }
        with open(self.config_path, 'w', encoding='utf-8') as f:
            yaml.dump(test_config, f)

    def test_run_scan(self):
        """Run a full ``red_team`` scan and check reports, log files and hooks."""
        asyncio.run(run_scan(config_path=self.config_path, role='red_team'))

        # Every module's post-processing step must have written its report.
        self.assertTrue(os.path.exists(self.output_dir))
        expected_reports = (
            'test_nmap_scan_results.json',
            'test_dependent_scan_results.txt',
            'test_nikto_scan_results.txt',
            'test_sslscan_results.csv',
        )
        for report_name in expected_reports:
            # subTest reports each missing file separately instead of
            # stopping at the first one.
            with self.subTest(report=report_name):
                report_path = os.path.join(self.output_dir, report_name)
                self.assertTrue(
                    os.path.exists(report_path),
                    f"missing report file: {report_path}",
                )

        # At least one log file with the expected prefix must exist.
        self.assertTrue(os.path.exists(self.log_dir))
        self.assertTrue(any(f.startswith('red_sword') for f in os.listdir(self.log_dir)))

        # Each module must have logged both its pre- and post-scan hook.
        log_messages = self.log_handler.messages
        expected_hook_patterns = (
            r"Nmap pre-scan hook executed for 127\.0\.0\.1",
            r"Nmap post-scan hook executed for 127\.0\.0\.1",
            r"Example dependent pre-scan hook executed for 127\.0\.0\.1",
            r"Example dependent post-scan hook executed for 127\.0\.0\.1",
            r"Nikto pre-scan hook executed for 127\.0\.0\.1",
            r"Nikto post-scan hook executed for 127\.0\.0\.1",
            r"SSLScan pre-scan hook executed for 127\.0\.0\.1",
            r"SSLScan post-scan hook executed for 127\.0\.0\.1",
        )
        for pattern in expected_hook_patterns:
            with self.subTest(pattern=pattern):
                self.assertTrue(
                    any(re.search(pattern, msg) for msg in log_messages),
                    f"no captured log record matches {pattern!r}",
                )

    def test_plugin_dependencies(self):
        """Check that all four scan plugins are known to a new PluginManager."""
        plugin_manager = PluginManager()
        for plugin_name in (
            'nmap_port_scan',
            'example_dependent_scan',
            'nikto_web_scan',
            'ssl_scan',
        ):
            with self.subTest(plugin=plugin_name):
                # get_all_plugins() is called once per check, exactly as
                # before, so this stays correct whatever it returns.
                self.assertIn(plugin_name, plugin_manager.get_all_plugins())

    def test_plugin_config_validation(self):
        """Check nmap config validation for one rejected and one accepted config."""
        plugin_manager = PluginManager()
        nmap_plugin = plugin_manager.get_plugin('nmap_port_scan')
        # A config carrying only 'ports' is expected to be rejected.
        with self.assertRaises(ValueError):
            plugin_manager.validate_plugin_config({'ports': '80'}, nmap_plugin.config_schema)
        # With 'timing_template' supplied as well, validation must not raise.
        # This call has to sit outside the assertRaises block: inside it, it
        # would be unreachable after the first call raises.
        plugin_manager.validate_plugin_config({'ports': '80', 'timing_template': 3}, nmap_plugin.config_schema)
class TestLogHandler(logging.Handler):
    """Logging handler that keeps every formatted record in memory.

    Formatted records are appended, in emission order, to the ``messages``
    list so that tests can inspect what was logged.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``logging.Handler`` and start empty."""
        super().__init__(*args, **kwargs)
        self.messages = list()

    def emit(self, record):
        """Format ``record`` with this handler's formatter and store the text."""
        rendered = self.format(record)
        self.messages.append(rendered)
# Run the tests in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()