File size: 9,489 Bytes
db78256 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 |
import importlib
import importlib.util
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from pyrogram import Client
from pyrogram.handlers import CallbackQueryHandler, MessageHandler
from .. import LOGGER
from ..helper.telegram_helper.filters import CustomFilters
@dataclass
class PluginInfo:
    """Metadata record describing one plugin.

    ``name``, ``version``, ``author`` and ``description`` are required and
    are accepted positionally in that order; the remaining fields are
    optional.  Every list-valued field starts out as its own empty list.
    """

    # Required descriptive fields.
    name: str
    version: str
    author: str
    description: str

    # Flag flipped by the manager's enable/disable operations.
    enabled: bool = field(default=True)

    # Handler objects that the manager adds to / removes from the bot client.
    handlers: List[Any] = field(default_factory=list)

    # Command names; the manager matches these against module-level
    # ``<name>_command`` callables when registering handlers.
    commands: List[str] = field(default_factory=list)

    # Declared dependencies; not read by any code visible in this file.
    dependencies: List[str] = field(default_factory=list)
class PluginBase:
    """Base class that every plugin class must subclass.

    Subclasses are expected to provide a ``PLUGIN_INFO`` attribute and may
    override the async lifecycle hooks.  Each hook returns ``True`` to let
    the corresponding manager operation proceed and ``False`` to veto it;
    the default implementations always return ``True``.
    """

    PLUGIN_INFO: PluginInfo

    async def on_load(self) -> bool:
        """Hook awaited when the plugin is being loaded."""
        return True

    async def on_unload(self) -> bool:
        """Hook awaited when the plugin is being unloaded."""
        return True

    async def on_enable(self) -> bool:
        """Hook awaited when the plugin is being enabled."""
        return True

    async def on_disable(self) -> bool:
        """Hook awaited when the plugin is being disabled."""
        return True

    def register_command(self, command: str, handler_func, filters=None):
        """Build a ``MessageHandler`` for *command*.

        Args:
            command: Command name as a string, or an already-built filter
                object (used as-is).
            handler_func: Callback invoked by the handler.
            filters: Extra filter AND-ed with the command filter; defaults
                to ``CustomFilters.authorized``.

        Returns:
            The constructed ``MessageHandler``.  It is not added to any
            client here.
        """
        # Imported locally under an alias because the ``filters`` parameter
        # shadows the usual module name.
        from pyrogram import filters as pyrogram_filters

        if filters is None:
            filters = CustomFilters.authorized
        if isinstance(command, str):
            # A bare ``str`` cannot be combined with a filter via ``&``
            # (``str`` has no ``__and__``), so wrap it in a real command
            # filter.  case_sensitive=True matches how the manager builds
            # its auto-registered command handlers.
            command = pyrogram_filters.command(command, case_sensitive=True)
        return MessageHandler(handler_func, filters=command & filters)

    def register_callback(self, pattern: str, callback_func, filters=None):
        """Build a ``CallbackQueryHandler`` matching *pattern*.

        Args:
            pattern: Regular-expression string, or an already-built filter
                object (used as-is).
            callback_func: Callback invoked by the handler.
            filters: Extra filter AND-ed with the pattern filter; defaults
                to ``CustomFilters.authorized``.

        Returns:
            The constructed ``CallbackQueryHandler``.  It is not added to
            any client here.
        """
        from pyrogram import filters as pyrogram_filters

        if filters is None:
            filters = CustomFilters.authorized
        if isinstance(pattern, str):
            # Same reasoning as in register_command: ``str & filter``
            # raises TypeError, so turn the pattern into a regex filter.
            pattern = pyrogram_filters.regex(pattern)
        return CallbackQueryHandler(callback_func, filters=pattern & filters)
class PluginManager:
    """Load, unload, enable and disable plugins found in ``plugins/``.

    A plugin is a ``<name>.py`` file in ``self.plugins_dir`` that defines a
    ``PluginBase`` subclass carrying a ``PLUGIN_INFO`` attribute.  Loaded
    plugins are tracked by file stem in two parallel dicts:

    * ``self.plugins``        -- stem -> ``PluginInfo``
    * ``self.loaded_modules`` -- stem -> plugin *instance* (despite the name)
    """

    def __init__(self, bot: Optional[Client]):
        """Store the bot client and make sure the plugin directory exists.

        Args:
            bot: Client used for ``add_handler`` / ``remove_handler``.  May
                be ``None`` at construction time; it must be set before any
                handler is registered.
        """
        self.bot = bot
        self.plugins: Dict[str, PluginInfo] = {}
        self.loaded_modules: Dict[str, Any] = {}
        # Relative path: resolved against the process's working directory.
        self.plugins_dir = Path("plugins")
        self.plugins_dir.mkdir(exist_ok=True)

    def discover_plugins(self) -> List[str]:
        """Return the stems of all ``*.py`` files in the plugin directory.

        Files whose name starts with ``__`` (e.g. ``__init__.py``) are
        skipped.  The order is whatever ``Path.glob`` yields.
        """
        return [
            file.stem
            for file in self.plugins_dir.glob("*.py")
            if not file.name.startswith("__")
        ]

    def _refresh_commands(self):
        """Refresh the bot command list and the help text after a change.

        Best effort: any exception is logged and swallowed so that a refresh
        problem never fails the plugin operation that triggered it.
        """
        try:
            from ..helper.telegram_helper.bot_commands import BotCommands
            from ..helper.ext_utils.help_messages import (
                get_bot_commands,
                get_help_string,
            )

            BotCommands.refresh_commands()
            # NOTE(review): the key hard-codes the top-level package name
            # "bot"; if the package is imported under another name this
            # raises KeyError, which is caught and logged below.
            importlib.reload(sys.modules["bot.helper.ext_utils.help_messages"])
            # Re-import after the reload; fails loudly (into the except
            # below) if the reloaded module no longer defines these names.
            from ..helper.ext_utils.help_messages import BOT_COMMANDS, help_string  # noqa: F401

            # Mirror the refreshed values into this module's globals.
            globals()["BOT_COMMANDS"] = get_bot_commands()
            globals()["help_string"] = get_help_string()
            LOGGER.info("Bot commands and help refreshed")
        except Exception as e:
            LOGGER.error(f"Error refreshing commands: {e}", exc_info=True)

    @staticmethod
    def _find_plugin_class(module):
        """Return the plugin class defined by *module*, or ``None``.

        Classes defined in *module* itself win over ``PluginBase``
        subclasses that were merely imported into it (for example a shared
        intermediate base class); if none is defined locally, the first
        subclass in ``dir()`` order is used.
        """
        candidates = []
        for attr_name in dir(module):
            attr = getattr(module, attr_name)
            if (
                isinstance(attr, type)
                and issubclass(attr, PluginBase)
                and attr is not PluginBase
            ):
                candidates.append(attr)
        for candidate in candidates:
            if candidate.__module__ == module.__name__:
                return candidate
        return candidates[0] if candidates else None

    async def load_plugin(self, plugin_name: str) -> bool:
        """Import ``<plugin_name>.py``, instantiate its plugin and register it.

        Args:
            plugin_name: File stem of the plugin inside the plugin directory.

        Returns:
            ``True`` when the plugin was loaded and its handlers registered;
            ``False`` if it is already loaded, the file is missing, no valid
            plugin class is found, ``on_load`` vetoes, or any error occurs.
            On every failure path the manager's state and ``sys.modules``
            are left as they were before the call.
        """
        if plugin_name in self.loaded_modules:
            LOGGER.warning(f"Plugin {plugin_name} already loaded")
            return False

        plugin_path = self.plugins_dir / f"{plugin_name}.py"
        if not plugin_path.exists():
            LOGGER.error(f"Plugin file {plugin_name}.py not found")
            return False

        # Remember whatever currently lives under this name so that a failed
        # load neither leaves a half-initialised module behind nor clobbers
        # an unrelated module that happens to share the name.
        previous_module = sys.modules.get(plugin_name)
        plugin_info = None
        registering = False
        success = False
        try:
            spec = importlib.util.spec_from_file_location(plugin_name, plugin_path)
            if spec is None or spec.loader is None:
                LOGGER.error(f"Cannot create import spec for plugin {plugin_name}")
                return False
            module = importlib.util.module_from_spec(spec)
            # Must be in sys.modules before execution so that imports made
            # while the plugin body runs can resolve the module by name.
            sys.modules[plugin_name] = module
            spec.loader.exec_module(module)

            plugin_class = self._find_plugin_class(module)
            if plugin_class is None:
                LOGGER.error(f"No valid plugin class found in {plugin_name}")
                return False

            plugin_instance = plugin_class()
            if not hasattr(plugin_instance, "PLUGIN_INFO"):
                LOGGER.error(f"Plugin {plugin_name} missing PLUGIN_INFO")
                return False
            plugin_info = plugin_instance.PLUGIN_INFO

            if not await plugin_instance.on_load():
                LOGGER.error(f"Plugin {plugin_name} failed to load")
                return False

            # Register first, record second: a registration error must not
            # leave the plugin marked as loaded.
            registering = True
            self._register_handlers(plugin_instance, plugin_info, module)
            self.plugins[plugin_name] = plugin_info
            self.loaded_modules[plugin_name] = plugin_instance
            success = True
        except Exception as e:
            LOGGER.error(f"Error loading plugin {plugin_name}: {e}", exc_info=True)
            return False
        finally:
            if not success:
                if registering:
                    # Undo whatever part of the registration went through.
                    self._unregister_handlers(plugin_info)
                if previous_module is None:
                    sys.modules.pop(plugin_name, None)
                else:
                    sys.modules[plugin_name] = previous_module

        self._refresh_commands()
        LOGGER.info(f"Plugin {plugin_name} loaded successfully")
        return True

    async def unload_plugin(self, plugin_name: str) -> bool:
        """Unregister a loaded plugin's handlers and forget the plugin.

        Returns:
            ``True`` on success; ``False`` if the plugin is not loaded,
            ``on_unload`` vetoes, or an error occurs.
        """
        try:
            if plugin_name not in self.loaded_modules:
                LOGGER.error(f"Plugin {plugin_name} not loaded")
                return False

            plugin_instance = self.loaded_modules[plugin_name]
            plugin_info = self.plugins[plugin_name]
            if not await plugin_instance.on_unload():
                LOGGER.error(f"Plugin {plugin_name} failed to unload")
                return False

            self._unregister_handlers(plugin_info)
            del self.loaded_modules[plugin_name]
            del self.plugins[plugin_name]
            sys.modules.pop(plugin_name, None)
            self._refresh_commands()
            LOGGER.info(f"Plugin {plugin_name} unloaded successfully")
            return True
        except Exception as e:
            LOGGER.error(f"Error unloading plugin {plugin_name}: {e}", exc_info=True)
            return False

    async def reload_plugin(self, plugin_name: str) -> bool:
        """Unload the plugin if it is loaded, then load it again.

        Returns:
            The result of the final load; ``False`` if a loaded plugin
            could not be unloaded first (it then stays loaded).
        """
        try:
            # Only unload when there is something to unload; this avoids a
            # spurious "not loaded" error for a plugin that was never loaded.
            if plugin_name in self.loaded_modules:
                if not await self.unload_plugin(plugin_name):
                    return False
            return await self.load_plugin(plugin_name)
        except Exception as e:
            LOGGER.error(f"Error reloading plugin {plugin_name}: {e}", exc_info=True)
            return False

    async def enable_plugin(self, plugin_name: str) -> bool:
        """Mark a loaded plugin as enabled after its ``on_enable`` hook agrees.

        NOTE(review): this only flips ``PluginInfo.enabled`` and refreshes
        the command list; handlers are not (re-)registered here.
        """
        try:
            if plugin_name not in self.plugins:
                LOGGER.error(f"Plugin {plugin_name} not found")
                return False

            plugin_instance = self.loaded_modules[plugin_name]
            if not await plugin_instance.on_enable():
                LOGGER.error(f"Plugin {plugin_name} failed to enable")
                return False

            self.plugins[plugin_name].enabled = True
            self._refresh_commands()
            LOGGER.info(f"Plugin {plugin_name} enabled")
            return True
        except Exception as e:
            LOGGER.error(f"Error enabling plugin {plugin_name}: {e}", exc_info=True)
            return False

    async def disable_plugin(self, plugin_name: str) -> bool:
        """Mark a loaded plugin as disabled after its ``on_disable`` hook agrees.

        NOTE(review): this only flips ``PluginInfo.enabled`` and refreshes
        the command list; the plugin's handlers stay registered on the bot.
        """
        try:
            if plugin_name not in self.plugins:
                LOGGER.error(f"Plugin {plugin_name} not found")
                return False

            plugin_instance = self.loaded_modules[plugin_name]
            if not await plugin_instance.on_disable():
                LOGGER.error(f"Plugin {plugin_name} failed to disable")
                return False

            self.plugins[plugin_name].enabled = False
            self._refresh_commands()
            LOGGER.info(f"Plugin {plugin_name} disabled")
            return True
        except Exception as e:
            LOGGER.error(f"Error disabling plugin {plugin_name}: {e}", exc_info=True)
            return False

    def list_plugins(self) -> List[PluginInfo]:
        """Return the ``PluginInfo`` of every loaded plugin."""
        return list(self.plugins.values())

    def get_plugin_info(self, plugin_name: str) -> Optional[PluginInfo]:
        """Return the ``PluginInfo`` for *plugin_name*, or ``None`` if not loaded."""
        return self.plugins.get(plugin_name)

    def _register_handlers(
        self, plugin_instance: PluginBase, plugin_info: PluginInfo, module=None
    ):
        """Add the plugin's handlers to the bot.

        Two sources are registered:

        1. every handler already listed in ``plugin_info.handlers``;
        2. for each module-level callable named ``<cmd>_command`` whose
           ``<cmd>`` appears in ``plugin_info.commands``, a new
           ``MessageHandler`` restricted to authorized users.  These are
           appended to ``plugin_info.handlers`` so they get removed on unload.

        Args:
            plugin_instance: The instantiated plugin.
            plugin_info: The plugin's metadata record.
            module: The plugin's module object.  When omitted it is looked
                up from the instance's class, then (legacy behaviour) from
                ``sys.modules[plugin_info.name]``.
        """
        from pyrogram.filters import command

        for handler in plugin_info.handlers:
            self.bot.add_handler(handler)

        if module is None:
            # The module is registered in sys.modules under the file stem,
            # which need not equal the display name in PLUGIN_INFO, so
            # resolve it through the class first.
            module = sys.modules.get(
                type(plugin_instance).__module__
            ) or sys.modules.get(plugin_info.name)
        if not module:
            return

        suffix = "_command"
        for attr_name in dir(module):
            if not attr_name.endswith(suffix):
                continue
            attr = getattr(module, attr_name)
            if not callable(attr):
                continue
            # Strip only the trailing suffix; str.replace would also eat an
            # inner "_command" (e.g. "a_command_b_command" -> "a_b").
            cmd_name = attr_name[: -len(suffix)]
            if cmd_name not in plugin_info.commands:
                continue
            handler = MessageHandler(
                attr,
                filters=command(cmd_name, case_sensitive=True)
                & CustomFilters.authorized,
            )
            plugin_info.handlers.append(handler)
            self.bot.add_handler(handler)
            LOGGER.info(
                f"Registered command /{cmd_name} for plugin {plugin_info.name}"
            )

    def _unregister_handlers(self, plugin_info: PluginInfo):
        """Remove every handler in ``plugin_info.handlers`` from the bot.

        Removal is best effort: a failure for one handler is logged as a
        warning and the remaining handlers are still processed.
        """
        for handler in plugin_info.handlers:
            try:
                self.bot.remove_handler(handler)
            except Exception as e:
                LOGGER.warning(f"Error removing handler: {e}")
# Module-level manager instance, created at import time with no bot client
# (``None``).  Constructing it also creates the "plugins" directory if it
# does not exist yet.
# NOTE(review): handler registration calls ``self.bot.add_handler`` --
# presumably the real client is assigned to ``plugin_manager.bot`` elsewhere
# before any plugin is loaded; confirm against the startup code.
plugin_manager = PluginManager(None)


def get_plugin_manager() -> PluginManager:
    """Return the module-level ``PluginManager`` instance."""
    return plugin_manager
|