# yuki-sui's picture
# Upload 169 files
# ed71b0e verified
"""
Plugin loader for dynamically loading scanner plugins.
Supports loading plugins from:
- Built-in plugins package
- Custom plugin directories
- Module paths
"""
from __future__ import annotations

import importlib
import importlib.util
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Type

from .base import ScannerPlugin, PluginRegistry
class PluginLoader:
    """
    Load and manage scanner plugins from various sources.

    Supports:
    - Loading built-in plugins from the ``builtin`` directory next to this file
    - Loading plugins from custom directories
    - Loading plugins from module paths

    Caches kept per loader instance:
    - ``_loaded_modules``: module name -> module object
    - ``_module_plugins``: module path -> plugin instance (``load_from_module``)
    - ``_loaded_plugins``: built-in plugin name -> plugin instance

    Note: only built-in plugins are tracked in ``_loaded_plugins``; plugins
    loaded through ``load_from_directory`` or ``load_from_module`` are returned
    to the caller but not reported by ``get_loaded_plugin_names``.
    """

    # Package prefix under which built-in plugin files are registered in
    # sys.modules.
    _BUILTIN_PACKAGE = "security_gateway.plugins.builtin"

    def __init__(self, registry: Optional[PluginRegistry] = None):
        """
        Initialize loader.

        Args:
            registry: PluginRegistry to load plugins into. If None, creates
                a new registry.
        """
        # Compare against None explicitly: a registry object that is falsy
        # while empty (e.g. one defining __len__) must not be replaced.
        self.registry = registry if registry is not None else PluginRegistry()
        self._loaded_modules: Dict[str, Any] = {}  # module name -> module object
        self._module_plugins: Dict[str, ScannerPlugin] = {}  # module path -> plugin
        self._loaded_plugins: Dict[str, ScannerPlugin] = {}  # builtin name -> plugin
        self._builtin_loaded: bool = False  # Flag: builtin plugins loaded

    @staticmethod
    def _iter_plugin_files(directory: Path) -> List[Path]:
        """Return the non-private ``*.py`` files of *directory* in sorted order."""
        # Sorting makes load (and therefore registration) order deterministic;
        # raw glob order depends on the filesystem.
        return sorted(
            path for path in directory.glob("*.py") if not path.name.startswith("_")
        )

    @staticmethod
    def _same_source(module: Any, filepath: Path) -> bool:
        """Return True if *module* was loaded from *filepath*."""
        module_file = getattr(module, "__file__", None)
        if not module_file:
            return False
        try:
            return Path(module_file).resolve() == Path(filepath).resolve()
        except OSError:
            return False

    def load_builtin_plugins(self) -> Dict[str, ScannerPlugin]:
        """
        Load all built-in plugins from the ``builtin`` directory.

        Uses caching to avoid reloading plugins that are already loaded.

        Returns:
            Dict mapping plugin name (file stem) -> loaded plugin instance.
            Empty if the directory does not exist or nothing could be loaded.
        """
        # Return cached plugins if already loaded
        if self._builtin_loaded and self._loaded_plugins:
            print(f"[PluginLoader] Using cached builtin plugins ({len(self._loaded_plugins)} plugins)")
            return self._loaded_plugins.copy()

        plugins: Dict[str, ScannerPlugin] = {}
        builtin_dir = Path(__file__).parent / "builtin"
        if not builtin_dir.exists():
            print(f"[PluginLoader] Builtin plugins directory not found: {builtin_dir}")
            return plugins

        for plugin_file in self._iter_plugin_files(builtin_dir):
            module_name = plugin_file.stem

            # Reuse an instance loaded by an earlier (partial) run.
            cached = self._loaded_plugins.get(module_name)
            if cached is not None:
                plugins[module_name] = cached
                print(f"[PluginLoader] Using cached plugin: {module_name}")
                continue

            try:
                plugin = self._load_plugin_from_file(
                    plugin_file,
                    f"{self._BUILTIN_PACKAGE}.{module_name}",
                )
            except Exception as e:
                # One broken plugin must not prevent the others from loading.
                print(f"[PluginLoader] Failed to load builtin plugin '{module_name}': {e}")
                continue

            if plugin is not None:
                plugins[module_name] = plugin
                self._loaded_plugins[module_name] = plugin
                print(f"[PluginLoader] Loaded builtin plugin: {module_name}")

        # Only mark as loaded when something was found, so that an empty
        # result is retried on the next call.
        if plugins:
            self._builtin_loaded = True
        return plugins

    def load_from_directory(self, directory: str) -> Dict[str, ScannerPlugin]:
        """
        Load plugins from a directory.

        Scans the directory for ``.py`` files (skipping names starting with
        ``_``) and attempts to obtain a ScannerPlugin from each one. Files
        that fail to load are reported and skipped.

        Args:
            directory: Path to directory containing plugins

        Returns:
            Dict mapping plugin name (file stem) -> loaded plugin instance

        Raises:
            ValueError: If *directory* does not exist or is not a directory.
        """
        dir_path = Path(directory)
        if not dir_path.is_dir():
            raise ValueError(f"Plugin directory not found: {directory}")

        plugins: Dict[str, ScannerPlugin] = {}
        for plugin_file in self._iter_plugin_files(dir_path):
            module_name = plugin_file.stem
            try:
                plugin = self._load_plugin_from_file(
                    plugin_file,
                    f"custom_plugin_{module_name}",
                )
            except Exception as e:
                print(f"[PluginLoader] Failed to load plugin '{module_name}': {e}")
                continue

            if plugin is not None:
                plugins[module_name] = plugin
                print(f"[PluginLoader] Loaded plugin from {directory}: {module_name}")
        return plugins

    def load_from_module(self, module_path: str) -> Optional[ScannerPlugin]:
        """
        Load a plugin from a module path.

        Example: "security_gateway.plugins.builtin.path_traversal"

        Args:
            module_path: Full Python module path

        Returns:
            Loaded ScannerPlugin instance, or None if the module could not be
            imported or contains no plugin. Repeated calls for the same path
            return the same instance.
        """
        try:
            cached = self._module_plugins.get(module_path)
            if cached is not None:
                return cached

            module = importlib.import_module(module_path)
            plugin = self._extract_plugin_from_module(module)
            if plugin is not None:
                # Keep module objects and plugin instances in separate caches:
                # _load_plugin_from_file expects _loaded_modules to hold modules.
                self._loaded_modules[module_path] = module
                self._module_plugins[module_path] = plugin
                print(f"[PluginLoader] Loaded plugin from module: {module_path}")
            return plugin
        except ImportError as e:
            print(f"[PluginLoader] Failed to import module '{module_path}': {e}")
            return None
        except Exception as e:
            print(f"[PluginLoader] Failed to load plugin from module '{module_path}': {e}")
            return None

    def _load_plugin_from_file(self, filepath: Path, module_name: str) -> Optional[ScannerPlugin]:
        """
        Load a single plugin from a Python file.

        Uses module caching to avoid re-executing a file that was already
        loaded under the same module name.

        Args:
            filepath: Path to .py file
            module_name: Name to give the loaded module (also its sys.modules key)

        Returns:
            Loaded ScannerPlugin instance, or None if the file could not be
            loaded or contains no plugin
        """
        # A cache hit is only valid when it refers to the same source file;
        # two directories may both contain e.g. "foo.py" and map to one name.
        cached_module = self._loaded_modules.get(module_name)
        if cached_module is not None and self._same_source(cached_module, filepath):
            return self._extract_plugin_from_module(cached_module)

        try:
            spec = importlib.util.spec_from_file_location(module_name, filepath)
            if not spec or not spec.loader:
                return None

            module = importlib.util.module_from_spec(spec)
            had_previous = module_name in sys.modules
            previous = sys.modules.get(module_name)
            # The module must be in sys.modules while it executes.
            sys.modules[module_name] = module
            try:
                spec.loader.exec_module(module)
            except BaseException:
                # Do not leave a half-initialised module importable.
                if had_previous:
                    sys.modules[module_name] = previous
                else:
                    sys.modules.pop(module_name, None)
                raise

            # Cache the loaded module
            self._loaded_modules[module_name] = module
            return self._extract_plugin_from_module(module)
        except Exception as e:
            print(f"[PluginLoader] Error loading file {filepath}: {e}")
            return None

    def _extract_plugin_from_module(self, module: Any) -> Optional[ScannerPlugin]:
        """
        Extract ScannerPlugin instance from a loaded module.

        Looks for:
        1. A 'plugin' attribute that is a ScannerPlugin instance
        2. A ScannerPlugin subclass, which is instantiated without arguments.
           Classes defined in the module itself are tried before classes the
           module merely imported.

        Args:
            module: Loaded Python module

        Returns:
            ScannerPlugin instance, or None if not found
        """
        # Check for 'plugin' attribute
        candidate = getattr(module, "plugin", None)
        if candidate is not None and isinstance(candidate, ScannerPlugin):
            return candidate

        # Collect ScannerPlugin subclasses, separating the module's own
        # classes from imported ones (e.g. a shared base class).
        own_name = getattr(module, "__name__", None)
        local_classes = []
        imported_classes = []
        for attr_name in dir(module):
            attr = getattr(module, attr_name, None)
            if not isinstance(attr, type):
                continue
            if attr is ScannerPlugin or not issubclass(attr, ScannerPlugin):
                continue
            if getattr(attr, "__module__", None) == own_name:
                local_classes.append((attr_name, attr))
            else:
                imported_classes.append((attr_name, attr))

        for attr_name, plugin_cls in local_classes + imported_classes:
            try:
                return plugin_cls()
            except Exception as e:
                # Try the next candidate (e.g. this one may be abstract).
                print(f"[PluginLoader] Failed to instantiate {attr_name}: {e}")
        return None

    def register_plugin(self, plugin: ScannerPlugin, auto_discover: bool = True) -> bool:
        """
        Register a plugin with the registry.

        Args:
            plugin: ScannerPlugin instance
            auto_discover: Currently unused; accepted for interface
                compatibility.

        Returns:
            True if registered, False if the registry rejected the plugin by
            raising ValueError
        """
        try:
            self.registry.register(plugin)
            metadata = plugin.get_metadata()
            print(f"[PluginLoader] Registered plugin: {metadata.name} v{metadata.version}")
            return True
        except ValueError as e:
            print(f"[PluginLoader] Registration failed: {e}")
            return False

    def _register_all(self, plugins: Dict[str, ScannerPlugin]) -> int:
        """Register every plugin in *plugins*; return how many succeeded."""
        return sum(1 for plugin in plugins.values() if self.register_plugin(plugin))

    def load_and_register_builtin(self) -> int:
        """
        Load and register all built-in plugins.

        Returns:
            Number of plugins successfully loaded and registered
        """
        return self._register_all(self.load_builtin_plugins())

    def load_and_register_from_directory(self, directory: str) -> int:
        """
        Load and register plugins from a directory.

        Args:
            directory: Path to plugin directory

        Returns:
            Number of plugins successfully loaded and registered

        Raises:
            ValueError: If *directory* does not exist or is not a directory.
        """
        return self._register_all(self.load_from_directory(directory))

    def get_registry(self) -> PluginRegistry:
        """Get the plugin registry."""
        return self.registry

    def get_cache_status(self) -> Dict[str, Any]:
        """
        Get current cache status and statistics.

        Returns:
            Dict with cache metrics: counts and names of cached modules and
            built-in plugins, the builtin-loaded flag, and the number of
            registered and enabled plugins reported by the registry
        """
        return {
            "loaded_modules": len(self._loaded_modules),
            "loaded_plugins": len(self._loaded_plugins),
            "builtin_loaded": self._builtin_loaded,
            "registered_plugins": len(self.registry.get_all_plugins()),
            "enabled_plugins": len(self.registry.get_enabled_plugins()),
            "module_names": list(self._loaded_modules.keys()),
            "plugin_names": list(self._loaded_plugins.keys()),
        }

    def clear_caches(self) -> None:
        """
        Clear all plugin and module caches.

        WARNING: This removes cached instances and forces reload on next load.
        Use only for testing or plugin reload scenarios.

        Only this loader's own caches are cleared; entries in ``sys.modules``
        and plugins already registered in the registry are left untouched.
        """
        self._loaded_modules.clear()
        self._module_plugins.clear()
        self._loaded_plugins.clear()
        self._builtin_loaded = False
        print("[PluginLoader] Plugin caches cleared")

    def get_loaded_plugin_names(self) -> List[str]:
        """Get list of all loaded (built-in) plugin names."""
        return list(self._loaded_plugins.keys())

    def is_plugin_loaded(self, plugin_name: str) -> bool:
        """Check if a specific (built-in) plugin is loaded."""
        return plugin_name in self._loaded_plugins