| | """
|
| | Plugin loader for dynamically loading scanner plugins.
|
| |
|
| | Supports loading plugins from:
|
| | - Built-in plugins package
|
| | - Custom plugin directories
|
| | - Module paths
|
| | """
|
| |
|
| | from __future__ import annotations
|
| |
|
| | import importlib
|
| | import importlib.util
|
| | import sys
|
| | from pathlib import Path
|
| | from typing import Dict, List, Optional, Type
|
| |
|
| | from .base import ScannerPlugin, PluginRegistry
|
| |
|
| |
|
| | class PluginLoader:
|
| | """
|
| | Load and manage scanner plugins from various sources.
|
| |
|
| | Supports:
|
| | - Loading built-in plugins from plugins.builtin package
|
| | - Loading plugins from custom directories
|
| | - Loading plugins from module paths
|
| | - Discovering plugins automatically
|
| |
|
| | Features:
|
| | - Module-level caching to avoid reloading
|
| | - Plugin metadata caching
|
| | - Efficient registry lookups
|
| | """
|
| |
|
| | def __init__(self, registry: Optional[PluginRegistry] = None):
|
| | """
|
| | Initialize loader.
|
| |
|
| | Args:
|
| | registry: PluginRegistry to load plugins into. If None, creates new registry.
|
| | """
|
| | self.registry = registry or PluginRegistry()
|
| | self._loaded_modules: Dict[str, any] = {}
|
| | self._loaded_plugins: Dict[str, ScannerPlugin] = {}
|
| | self._builtin_loaded: bool = False
|
| |
|
| | def load_builtin_plugins(self) -> Dict[str, ScannerPlugin]:
|
| | """
|
| | Load all built-in plugins from the builtin package.
|
| |
|
| | Uses caching to avoid reloading plugins that are already loaded.
|
| |
|
| | Returns:
|
| | Dict mapping plugin name -> loaded plugin instance
|
| | """
|
| |
|
| | if self._builtin_loaded and self._loaded_plugins:
|
| | print(f"[PluginLoader] Using cached builtin plugins ({len(self._loaded_plugins)} plugins)")
|
| | return self._loaded_plugins.copy()
|
| |
|
| | plugins = {}
|
| | builtin_dir = Path(__file__).parent / "builtin"
|
| |
|
| | if not builtin_dir.exists():
|
| | print(f"[PluginLoader] Builtin plugins directory not found: {builtin_dir}")
|
| | return plugins
|
| |
|
| | for plugin_file in builtin_dir.glob("*.py"):
|
| | if plugin_file.name.startswith("_"):
|
| | continue
|
| |
|
| | module_name = plugin_file.stem
|
| |
|
| |
|
| | if module_name in self._loaded_plugins:
|
| | plugins[module_name] = self._loaded_plugins[module_name]
|
| | print(f"[PluginLoader] Using cached plugin: {module_name}")
|
| | continue
|
| |
|
| | try:
|
| | plugin = self._load_plugin_from_file(
|
| | plugin_file,
|
| | f"security_gateway.plugins.builtin.{module_name}",
|
| | )
|
| | if plugin:
|
| | plugins[module_name] = plugin
|
| |
|
| | self._loaded_plugins[module_name] = plugin
|
| | print(f"[PluginLoader] Loaded builtin plugin: {module_name}")
|
| | except Exception as e:
|
| | print(f"[PluginLoader] Failed to load builtin plugin '{module_name}': {e}")
|
| |
|
| |
|
| | if plugins:
|
| | self._builtin_loaded = True
|
| |
|
| | return plugins
|
| |
|
| | def load_from_directory(self, directory: str) -> Dict[str, ScannerPlugin]:
|
| | """
|
| | Load plugins from a directory.
|
| |
|
| | Scans directory for .py files and attempts to instantiate ScannerPlugin subclasses.
|
| |
|
| | Args:
|
| | directory: Path to directory containing plugins
|
| |
|
| | Returns:
|
| | Dict mapping plugin name -> loaded plugin instance
|
| | """
|
| | plugins = {}
|
| | dir_path = Path(directory)
|
| |
|
| | if not dir_path.exists() or not dir_path.is_dir():
|
| | raise ValueError(f"Plugin directory not found: {directory}")
|
| |
|
| | for plugin_file in dir_path.glob("*.py"):
|
| | if plugin_file.name.startswith("_"):
|
| | continue
|
| |
|
| | module_name = plugin_file.stem
|
| | try:
|
| | plugin = self._load_plugin_from_file(
|
| | plugin_file,
|
| | f"custom_plugin_{module_name}",
|
| | )
|
| | if plugin:
|
| | plugins[module_name] = plugin
|
| | print(f"[PluginLoader] Loaded plugin from {directory}: {module_name}")
|
| | except Exception as e:
|
| | print(f"[PluginLoader] Failed to load plugin '{module_name}': {e}")
|
| |
|
| | return plugins
|
| |
|
| | def load_from_module(self, module_path: str) -> Optional[ScannerPlugin]:
|
| | """
|
| | Load a plugin from a module path.
|
| |
|
| | Example: "security_gateway.plugins.builtin.path_traversal"
|
| |
|
| | Args:
|
| | module_path: Full Python module path
|
| |
|
| | Returns:
|
| | Loaded ScannerPlugin instance, or None if not found
|
| | """
|
| | try:
|
| | if module_path in self._loaded_modules:
|
| | return self._loaded_modules[module_path]
|
| |
|
| | module = importlib.import_module(module_path)
|
| | plugin = self._extract_plugin_from_module(module)
|
| |
|
| | if plugin:
|
| | self._loaded_modules[module_path] = plugin
|
| | print(f"[PluginLoader] Loaded plugin from module: {module_path}")
|
| |
|
| | return plugin
|
| | except ImportError as e:
|
| | print(f"[PluginLoader] Failed to import module '{module_path}': {e}")
|
| | return None
|
| | except Exception as e:
|
| | print(f"[PluginLoader] Failed to load plugin from module '{module_path}': {e}")
|
| | return None
|
| |
|
| | def _load_plugin_from_file(self, filepath: Path, module_name: str) -> Optional[ScannerPlugin]:
|
| | """
|
| | Load a single plugin from a Python file.
|
| |
|
| | Uses module caching to avoid reloading the same module.
|
| |
|
| | Args:
|
| | filepath: Path to .py file
|
| | module_name: Name to give the loaded module
|
| |
|
| | Returns:
|
| | Loaded ScannerPlugin instance, or None if not found
|
| | """
|
| |
|
| | if module_name in self._loaded_modules:
|
| | return self._extract_plugin_from_module(self._loaded_modules[module_name])
|
| |
|
| | try:
|
| | spec = importlib.util.spec_from_file_location(module_name, filepath)
|
| | if not spec or not spec.loader:
|
| | return None
|
| |
|
| | module = importlib.util.module_from_spec(spec)
|
| | sys.modules[module_name] = module
|
| | spec.loader.exec_module(module)
|
| |
|
| |
|
| | self._loaded_modules[module_name] = module
|
| |
|
| | return self._extract_plugin_from_module(module)
|
| | except Exception as e:
|
| | print(f"[PluginLoader] Error loading file {filepath}: {e}")
|
| | return None
|
| |
|
| | def _extract_plugin_from_module(self, module: any) -> Optional[ScannerPlugin]:
|
| | """
|
| | Extract ScannerPlugin instance from a loaded module.
|
| |
|
| | Looks for:
|
| | 1. A 'plugin' attribute that is a ScannerPlugin instance
|
| | 2. A class that inherits from ScannerPlugin and instantiates it
|
| |
|
| | Args:
|
| | module: Loaded Python module
|
| |
|
| | Returns:
|
| | ScannerPlugin instance, or None if not found
|
| | """
|
| |
|
| | if hasattr(module, "plugin") and isinstance(module.plugin, ScannerPlugin):
|
| | return module.plugin
|
| |
|
| |
|
| | for attr_name in dir(module):
|
| | attr = getattr(module, attr_name)
|
| | if (
|
| | isinstance(attr, type)
|
| | and issubclass(attr, ScannerPlugin)
|
| | and attr is not ScannerPlugin
|
| | ):
|
| | try:
|
| | return attr()
|
| | except Exception as e:
|
| | print(f"[PluginLoader] Failed to instantiate {attr_name}: {e}")
|
| |
|
| | return None
|
| |
|
| | def register_plugin(self, plugin: ScannerPlugin, auto_discover: bool = True) -> bool:
|
| | """
|
| | Register a plugin with the registry.
|
| |
|
| | Args:
|
| | plugin: ScannerPlugin instance
|
| | auto_discover: If True, automatically load dependencies
|
| |
|
| | Returns:
|
| | True if registered, False if already registered
|
| | """
|
| | try:
|
| | self.registry.register(plugin)
|
| | metadata = plugin.get_metadata()
|
| | print(f"[PluginLoader] Registered plugin: {metadata.name} v{metadata.version}")
|
| | return True
|
| | except ValueError as e:
|
| | print(f"[PluginLoader] Registration failed: {e}")
|
| | return False
|
| |
|
| | def load_and_register_builtin(self) -> int:
|
| | """
|
| | Load and register all built-in plugins.
|
| |
|
| | Returns:
|
| | Number of plugins successfully loaded and registered
|
| | """
|
| | plugins = self.load_builtin_plugins()
|
| | count = 0
|
| | for plugin in plugins.values():
|
| | if self.register_plugin(plugin):
|
| | count += 1
|
| | return count
|
| |
|
| | def load_and_register_from_directory(self, directory: str) -> int:
|
| | """
|
| | Load and register plugins from a directory.
|
| |
|
| | Args:
|
| | directory: Path to plugin directory
|
| |
|
| | Returns:
|
| | Number of plugins successfully loaded and registered
|
| | """
|
| | plugins = self.load_from_directory(directory)
|
| | count = 0
|
| | for plugin in plugins.values():
|
| | if self.register_plugin(plugin):
|
| | count += 1
|
| | return count
|
| |
|
| | def get_registry(self) -> PluginRegistry:
|
| | """Get the plugin registry."""
|
| | return self.registry
|
| |
|
| | def get_cache_status(self) -> Dict[str, any]:
|
| | """
|
| | Get current cache status and statistics.
|
| |
|
| | Returns:
|
| | Dict with cache metrics
|
| | """
|
| | return {
|
| | "loaded_modules": len(self._loaded_modules),
|
| | "loaded_plugins": len(self._loaded_plugins),
|
| | "builtin_loaded": self._builtin_loaded,
|
| | "registered_plugins": len(self.registry.get_all_plugins()),
|
| | "enabled_plugins": len(self.registry.get_enabled_plugins()),
|
| | "module_names": list(self._loaded_modules.keys()),
|
| | "plugin_names": list(self._loaded_plugins.keys()),
|
| | }
|
| |
|
| | def clear_caches(self) -> None:
|
| | """
|
| | Clear all plugin and module caches.
|
| |
|
| | WARNING: This removes cached instances and forces reload on next load.
|
| | Use only for testing or plugin reload scenarios.
|
| | """
|
| | self._loaded_modules.clear()
|
| | self._loaded_plugins.clear()
|
| | self._builtin_loaded = False
|
| | print("[PluginLoader] Plugin caches cleared")
|
| |
|
| | def get_loaded_plugin_names(self) -> List[str]:
|
| | """Get list of all loaded plugin names."""
|
| | return list(self._loaded_plugins.keys())
|
| |
|
| | def is_plugin_loaded(self, plugin_name: str) -> bool:
|
| | """Check if a specific plugin is loaded."""
|
| | return plugin_name in self._loaded_plugins
|
| |
|