File size: 3,130 Bytes
b8e5043
a7c4301
b8e5043
a7c4301
 
b8e5043
a7c4301
b8e5043
a7c4301
b8e5043
a7c4301
b8e5043
 
a7c4301
 
 
b8e5043
a7c4301
b8e5043
a7c4301
b8e5043
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a7c4301
 
b8e5043
a7c4301
b8e5043
 
a7c4301
b8e5043
a7c4301
b8e5043
 
 
 
a7c4301
 
b8e5043
a7c4301
b8e5043
a7c4301
b8e5043
 
a7c4301
 
b8e5043
a7c4301
b8e5043
 
a7c4301
 
b8e5043
 
a7c4301
 
b8e5043
a7c4301
b8e5043
 
 
 
a7c4301
 
b8e5043
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""Plugin lifecycle manager."""

from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING

from loguru import logger

from plugins.registry import PluginRegistry, plugin_registry

if TYPE_CHECKING:
    from plugins.base_plugin import BasePlugin


class PluginManager:
    """Manages plugin lifecycle: activation, deactivation, tool aggregation."""

    def __init__(self, plugins_dir: Path, registry: PluginRegistry | None = None):
        self.plugins_dir = Path(plugins_dir)
        self.registry = registry or plugin_registry
        self.active_plugins: dict[str, BasePlugin] = {}

    async def discover_and_activate_all(self):
        """Discover all plugins and activate them."""
        self.registry.discover_and_load(self.plugins_dir)
        for name in self.registry.list_plugins():
            await self.activate_plugin(name)

    async def activate_plugin(self, plugin_name: str):
        """Activate a plugin by name."""
        if plugin_name in self.active_plugins:
            logger.debug(f"Plugin already active: {plugin_name}")
            return

        cls = self.registry.get_plugin_class(plugin_name)
        if cls is None:
            logger.warning(f"Plugin class not found: {plugin_name}")
            return

        try:
            instance = cls(context={})
            await instance.initialize()
            self.active_plugins[plugin_name] = instance
            logger.info(f"Activated plugin: {plugin_name}")
        except Exception as e:
            logger.error(f"Failed to activate plugin {plugin_name}: {e}")

    async def deactivate_plugin(self, plugin_name: str):
        """Deactivate a plugin by name."""
        if plugin_name not in self.active_plugins:
            return

        try:
            await self.active_plugins[plugin_name].terminate()
        except Exception as e:
            logger.warning(f"Error terminating plugin {plugin_name}: {e}")

        del self.active_plugins[plugin_name]
        logger.info(f"Deactivated plugin: {plugin_name}")

    def get_all_tools(self) -> list[dict]:
        """Aggregate tool definitions from all active plugins."""
        tools = []
        for plugin in self.active_plugins.values():
            tools.extend(plugin.get_tools())
        return tools

    async def execute_plugin_tool(self, tool_name: str, args: dict) -> str | None:
        """Try to execute a tool from any active plugin.

        Returns:
            The tool result string, or None if no plugin handles this tool.
        """
        for plugin in self.active_plugins.values():
            for tool_def in plugin.get_tools():
                if tool_def["name"] == tool_name:
                    return await plugin.execute_tool(tool_name, args)
        return None

    async def reload_plugins(self):
        """Deactivate all, re-discover, re-activate."""
        names = list(self.active_plugins.keys())
        for name in names:
            await self.deactivate_plugin(name)
        self.registry.plugin_classes.clear()
        self.registry.metadata.clear()
        await self.discover_and_activate_all()