"""Plugin lifecycle manager."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from loguru import logger
from plugins.registry import PluginRegistry, plugin_registry
if TYPE_CHECKING:
from plugins.base_plugin import BasePlugin
class PluginManager:
    """Manage plugin lifecycle: activation, deactivation, tool aggregation.

    Plugin classes are looked up by name in a registry. Successfully
    initialized instances are kept in ``active_plugins``, keyed by plugin
    name, until they are deactivated.
    """

    def __init__(
        self, plugins_dir: Path, registry: PluginRegistry | None = None
    ) -> None:
        """Create a manager for the plugins located under *plugins_dir*.

        Args:
            plugins_dir: Directory passed to the registry for discovery.
            registry: Registry to use. The module-level ``plugin_registry``
                is used only when this is ``None``.
        """
        self.plugins_dir = Path(plugins_dir)
        # Compare against None explicitly: a registry object that happens to
        # be falsy (e.g. it defines __len__ and is still empty) must not be
        # silently replaced by the global registry.
        self.registry = registry if registry is not None else plugin_registry
        # Plugin name -> initialized plugin instance.
        self.active_plugins: dict[str, BasePlugin] = {}

    async def discover_and_activate_all(self) -> None:
        """Discover plugins in ``plugins_dir`` and activate each listed one."""
        self.registry.discover_and_load(self.plugins_dir)
        for name in self.registry.list_plugins():
            await self.activate_plugin(name)

    async def activate_plugin(self, plugin_name: str) -> None:
        """Instantiate and initialize the plugin registered as *plugin_name*.

        Does nothing (beyond logging) when the plugin is already active or
        when the registry has no class for that name. Any exception raised
        while constructing or initializing the plugin is logged and
        swallowed, and the plugin is not recorded as active.
        """
        if plugin_name in self.active_plugins:
            logger.debug(f"Plugin already active: {plugin_name}")
            return

        plugin_cls = self.registry.get_plugin_class(plugin_name)
        if plugin_cls is None:
            logger.warning(f"Plugin class not found: {plugin_name}")
            return

        # Keep the guarded region limited to the calls into plugin code.
        try:
            instance = plugin_cls(context={})
            await instance.initialize()
        except Exception as e:
            logger.error(f"Failed to activate plugin {plugin_name}: {e}")
            return

        self.active_plugins[plugin_name] = instance
        logger.info(f"Activated plugin: {plugin_name}")

    async def deactivate_plugin(self, plugin_name: str) -> None:
        """Terminate the active plugin *plugin_name* and forget it.

        Unknown or inactive names are ignored. Termination is best-effort:
        an exception from ``terminate()`` is logged as a warning and the
        plugin is removed from ``active_plugins`` regardless.
        """
        if plugin_name not in self.active_plugins:
            return

        try:
            await self.active_plugins[plugin_name].terminate()
        except Exception as e:
            logger.warning(f"Error terminating plugin {plugin_name}: {e}")

        # pop() with a default instead of ``del`` so removal cannot raise if
        # the entry disappeared while terminate() was being awaited.
        self.active_plugins.pop(plugin_name, None)
        logger.info(f"Deactivated plugin: {plugin_name}")

    def get_all_tools(self) -> list[dict]:
        """Return the tool definitions of all active plugins in one list.

        Definitions appear in the iteration order of ``active_plugins``.
        """
        tools: list[dict] = []
        for plugin in self.active_plugins.values():
            tools.extend(plugin.get_tools())
        return tools

    async def execute_plugin_tool(self, tool_name: str, args: dict) -> str | None:
        """Execute *tool_name* on the first active plugin that declares it.

        Args:
            tool_name: Value matched against each tool definition's
                ``"name"`` entry.
            args: Arguments forwarded unchanged to the plugin's
                ``execute_tool``.

        Returns:
            The tool result, or None if no plugin handles this tool.
        """
        for plugin in self.active_plugins.values():
            # .get() rather than ["name"]: a definition without a name
            # cannot match, and must not abort the search with a KeyError
            # before other plugins have been checked.
            declares_tool = any(
                tool_def.get("name") == tool_name
                for tool_def in plugin.get_tools()
            )
            if declares_tool:
                return await plugin.execute_tool(tool_name, args)
        return None

    async def reload_plugins(self) -> None:
        """Deactivate all active plugins, reset the registry, and rediscover."""
        # Snapshot the names first: deactivate_plugin mutates active_plugins.
        for name in list(self.active_plugins):
            await self.deactivate_plugin(name)

        self.registry.plugin_classes.clear()
        self.registry.metadata.clear()
        await self.discover_and_activate_all()
|