""" Plugin loader for dynamically loading scanner plugins. Supports loading plugins from: - Built-in plugins package - Custom plugin directories - Module paths """ from __future__ import annotations import importlib import importlib.util import sys from pathlib import Path from typing import Dict, List, Optional, Type from .base import ScannerPlugin, PluginRegistry class PluginLoader: """ Load and manage scanner plugins from various sources. Supports: - Loading built-in plugins from plugins.builtin package - Loading plugins from custom directories - Loading plugins from module paths - Discovering plugins automatically Features: - Module-level caching to avoid reloading - Plugin metadata caching - Efficient registry lookups """ def __init__(self, registry: Optional[PluginRegistry] = None): """ Initialize loader. Args: registry: PluginRegistry to load plugins into. If None, creates new registry. """ self.registry = registry or PluginRegistry() self._loaded_modules: Dict[str, any] = {} # Cache loaded module objects self._loaded_plugins: Dict[str, ScannerPlugin] = {} # Cache loaded plugin instances self._builtin_loaded: bool = False # Flag: builtin plugins loaded def load_builtin_plugins(self) -> Dict[str, ScannerPlugin]: """ Load all built-in plugins from the builtin package. Uses caching to avoid reloading plugins that are already loaded. Returns: Dict mapping plugin name -> loaded plugin instance """ # Return cached plugins if already loaded if self._builtin_loaded and self._loaded_plugins: print(f"[PluginLoader] Using cached builtin plugins ({len(self._loaded_plugins)} plugins)") return self._loaded_plugins.copy() plugins = {} builtin_dir = Path(__file__).parent / "builtin" if not builtin_dir.exists(): print(f"[PluginLoader] Builtin plugins directory not found: {builtin_dir}") return plugins for plugin_file in builtin_dir.glob("*.py"): if plugin_file.name.startswith("_"): continue module_name = plugin_file.stem # Check if already loaded if module_name in self._loaded_plugins: plugins[module_name] = self._loaded_plugins[module_name] print(f"[PluginLoader] Using cached plugin: {module_name}") continue try: plugin = self._load_plugin_from_file( plugin_file, f"security_gateway.plugins.builtin.{module_name}", ) if plugin: plugins[module_name] = plugin # Cache the plugin instance self._loaded_plugins[module_name] = plugin print(f"[PluginLoader] Loaded builtin plugin: {module_name}") except Exception as e: print(f"[PluginLoader] Failed to load builtin plugin '{module_name}': {e}") # Mark builtin plugins as loaded if plugins: self._builtin_loaded = True return plugins def load_from_directory(self, directory: str) -> Dict[str, ScannerPlugin]: """ Load plugins from a directory. Scans directory for .py files and attempts to instantiate ScannerPlugin subclasses. Args: directory: Path to directory containing plugins Returns: Dict mapping plugin name -> loaded plugin instance """ plugins = {} dir_path = Path(directory) if not dir_path.exists() or not dir_path.is_dir(): raise ValueError(f"Plugin directory not found: {directory}") for plugin_file in dir_path.glob("*.py"): if plugin_file.name.startswith("_"): continue module_name = plugin_file.stem try: plugin = self._load_plugin_from_file( plugin_file, f"custom_plugin_{module_name}", ) if plugin: plugins[module_name] = plugin print(f"[PluginLoader] Loaded plugin from {directory}: {module_name}") except Exception as e: print(f"[PluginLoader] Failed to load plugin '{module_name}': {e}") return plugins def load_from_module(self, module_path: str) -> Optional[ScannerPlugin]: """ Load a plugin from a module path. Example: "security_gateway.plugins.builtin.path_traversal" Args: module_path: Full Python module path Returns: Loaded ScannerPlugin instance, or None if not found """ try: if module_path in self._loaded_modules: return self._loaded_modules[module_path] module = importlib.import_module(module_path) plugin = self._extract_plugin_from_module(module) if plugin: self._loaded_modules[module_path] = plugin print(f"[PluginLoader] Loaded plugin from module: {module_path}") return plugin except ImportError as e: print(f"[PluginLoader] Failed to import module '{module_path}': {e}") return None except Exception as e: print(f"[PluginLoader] Failed to load plugin from module '{module_path}': {e}") return None def _load_plugin_from_file(self, filepath: Path, module_name: str) -> Optional[ScannerPlugin]: """ Load a single plugin from a Python file. Uses module caching to avoid reloading the same module. Args: filepath: Path to .py file module_name: Name to give the loaded module Returns: Loaded ScannerPlugin instance, or None if not found """ # Check if module already loaded and cached if module_name in self._loaded_modules: return self._extract_plugin_from_module(self._loaded_modules[module_name]) try: spec = importlib.util.spec_from_file_location(module_name, filepath) if not spec or not spec.loader: return None module = importlib.util.module_from_spec(spec) sys.modules[module_name] = module spec.loader.exec_module(module) # Cache the loaded module self._loaded_modules[module_name] = module return self._extract_plugin_from_module(module) except Exception as e: print(f"[PluginLoader] Error loading file {filepath}: {e}") return None def _extract_plugin_from_module(self, module: any) -> Optional[ScannerPlugin]: """ Extract ScannerPlugin instance from a loaded module. Looks for: 1. A 'plugin' attribute that is a ScannerPlugin instance 2. A class that inherits from ScannerPlugin and instantiates it Args: module: Loaded Python module Returns: ScannerPlugin instance, or None if not found """ # Check for 'plugin' attribute if hasattr(module, "plugin") and isinstance(module.plugin, ScannerPlugin): return module.plugin # Look for ScannerPlugin subclasses for attr_name in dir(module): attr = getattr(module, attr_name) if ( isinstance(attr, type) and issubclass(attr, ScannerPlugin) and attr is not ScannerPlugin ): try: return attr() except Exception as e: print(f"[PluginLoader] Failed to instantiate {attr_name}: {e}") return None def register_plugin(self, plugin: ScannerPlugin, auto_discover: bool = True) -> bool: """ Register a plugin with the registry. Args: plugin: ScannerPlugin instance auto_discover: If True, automatically load dependencies Returns: True if registered, False if already registered """ try: self.registry.register(plugin) metadata = plugin.get_metadata() print(f"[PluginLoader] Registered plugin: {metadata.name} v{metadata.version}") return True except ValueError as e: print(f"[PluginLoader] Registration failed: {e}") return False def load_and_register_builtin(self) -> int: """ Load and register all built-in plugins. Returns: Number of plugins successfully loaded and registered """ plugins = self.load_builtin_plugins() count = 0 for plugin in plugins.values(): if self.register_plugin(plugin): count += 1 return count def load_and_register_from_directory(self, directory: str) -> int: """ Load and register plugins from a directory. Args: directory: Path to plugin directory Returns: Number of plugins successfully loaded and registered """ plugins = self.load_from_directory(directory) count = 0 for plugin in plugins.values(): if self.register_plugin(plugin): count += 1 return count def get_registry(self) -> PluginRegistry: """Get the plugin registry.""" return self.registry def get_cache_status(self) -> Dict[str, any]: """ Get current cache status and statistics. Returns: Dict with cache metrics """ return { "loaded_modules": len(self._loaded_modules), "loaded_plugins": len(self._loaded_plugins), "builtin_loaded": self._builtin_loaded, "registered_plugins": len(self.registry.get_all_plugins()), "enabled_plugins": len(self.registry.get_enabled_plugins()), "module_names": list(self._loaded_modules.keys()), "plugin_names": list(self._loaded_plugins.keys()), } def clear_caches(self) -> None: """ Clear all plugin and module caches. WARNING: This removes cached instances and forces reload on next load. Use only for testing or plugin reload scenarios. """ self._loaded_modules.clear() self._loaded_plugins.clear() self._builtin_loaded = False print("[PluginLoader] Plugin caches cleared") def get_loaded_plugin_names(self) -> List[str]: """Get list of all loaded plugin names.""" return list(self._loaded_plugins.keys()) def is_plugin_loaded(self, plugin_name: str) -> bool: """Check if a specific plugin is loaded.""" return plugin_name in self._loaded_plugins