import os import importlib import logging import inspect import json from jsonschema import validate, exceptions, Draft7Validator PLUGIN_DIR = "scan_modules" class PluginManager: """Manages the loading and execution of scan modules (plugins).""" def __init__(self): self.plugins = {} self.load_plugins() def load_plugins(self): """Loads all available plugins from the plugin directory.""" for filename in os.listdir(PLUGIN_DIR): if filename.endswith("_module.py"): module_name = filename[:-3] module_path = os.path.join(PLUGIN_DIR, filename) try: spec = importlib.util.spec_from_file_location(module_name, module_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) if hasattr(module, 'Plugin'): plugin_class = module.Plugin dependencies = getattr(plugin_class, 'dependencies', []) config_schema = getattr(plugin_class, 'config_schema', None) if self.check_dependencies(dependencies): plugin_instance = plugin_class() if config_schema: try: Draft7Validator.check_schema(config_schema) plugin_instance.config_schema = config_schema except exceptions.SchemaError as e: logging.error(f"Invalid config schema for plugin '{plugin_instance.name}': {e}") continue self.plugins[plugin_instance.name] = plugin_instance logging.info(f"Plugin '{plugin_instance.name}' loaded successfully.") else: logging.warning(f"Plugin '{module_name}' dependencies not met, skipping.") else: logging.warning(f"Module '{module_name}' does not have a 'Plugin' class.") except Exception as e: logging.error(f"Error loading plugin '{module_name}': {e}") def check_dependencies(self, dependencies): """Checks if all plugin dependencies are met.""" for dep in dependencies: if dep not in self.plugins: logging.warning(f"Dependency '{dep}' not found.") return False return True def get_plugin(self, plugin_name): """Returns a plugin instance by name.""" return self.plugins.get(plugin_name) def get_all_plugins(self): """Returns a dictionary of all loaded plugins.""" return self.plugins def run_plugin(self, plugin_name, target, config, output_dir): """Runs a specific plugin.""" plugin = self.get_plugin(plugin_name) if plugin: try: if hasattr(plugin, 'config_schema'): self.validate_plugin_config(config['parameters'], plugin.config_schema) plugin.run_scan(target, config, output_dir) except Exception as e: logging.error(f"Error running plugin '{plugin_name}': {e}") else: logging.warning(f"Plugin '{plugin_name}' not found.") def validate_plugin_config(self, config_params, config_schema): """Validates plugin configuration against its schema.""" try: validate(instance=config_params, schema=config_schema) except exceptions.ValidationError as e: logging.error(f"Plugin configuration validation error: {e}") raise ValueError(f"Plugin configuration validation error: {e}")