"""Runtime plugin loading for the bot.

Defines the metadata record plugins publish (``PluginInfo``), the base class
they derive from (``PluginBase``) and the ``PluginManager`` that imports plugin
files from the ``plugins`` directory and attaches their handlers to the
pyrogram client.
"""

import importlib
import importlib.util
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional

from pyrogram import Client
from pyrogram import filters as pyrogram_filters
from pyrogram.handlers import CallbackQueryHandler, MessageHandler

from .. import LOGGER
from ..helper.telegram_helper.filters import CustomFilters

# Module-level functions whose name ends with this suffix are candidates for
# automatic command registration (see PluginManager._register_handlers).
_COMMAND_SUFFIX = "_command"


@dataclass
class PluginInfo:
    """Metadata a plugin publishes through ``PluginBase.PLUGIN_INFO``."""

    name: str
    version: str
    author: str
    description: str
    # Flipped by PluginManager.enable_plugin / disable_plugin.
    enabled: bool = True
    # Handlers attached to the client on load; auto-registered command
    # handlers are appended here so unload can remove them again.
    handlers: List[Any] = field(default_factory=list)
    # Command names eligible for automatic ``<name>_command`` registration.
    commands: List[str] = field(default_factory=list)
    # Not read anywhere in this module.
    dependencies: List[str] = field(default_factory=list)


class PluginBase:
    """Base class for plugins.

    Subclasses are expected to set ``PLUGIN_INFO``; the lifecycle hooks all
    default to returning ``True`` (success).
    """

    PLUGIN_INFO: PluginInfo

    async def on_load(self) -> bool:
        """Hook awaited while the plugin is being loaded; ``False`` aborts."""
        return True

    async def on_unload(self) -> bool:
        """Hook awaited before the plugin is unloaded; ``False`` aborts."""
        return True

    async def on_enable(self) -> bool:
        """Hook awaited before the plugin is marked enabled; ``False`` aborts."""
        return True

    async def on_disable(self) -> bool:
        """Hook awaited before the plugin is marked disabled; ``False`` aborts."""
        return True

    def register_command(self, command: str, handler_func, filters=None):
        """Build a ``MessageHandler`` for ``command``.

        Args:
            command: Command name. A value that is not a ``str`` is assumed to
                already be a filter object and is combined as-is.
            handler_func: Callback passed to ``MessageHandler``.
            filters: Extra filter; defaults to ``CustomFilters.authorized``.

        Returns:
            The handler. It is NOT added to any client here.
        """
        if filters is None:
            filters = CustomFilters.authorized
        # A bare string cannot be combined with a filter via ``&``; turn it
        # into a command filter first.
        if isinstance(command, str):
            command = pyrogram_filters.command(command)
        return MessageHandler(handler_func, filters=command & filters)

    def register_callback(self, pattern: str, callback_func, filters=None):
        """Build a ``CallbackQueryHandler`` for ``pattern``.

        Args:
            pattern: Regular expression for the callback data. A value that is
                not a ``str`` is assumed to already be a filter object.
            callback_func: Callback passed to ``CallbackQueryHandler``.
            filters: Extra filter; defaults to ``CustomFilters.authorized``.

        Returns:
            The handler. It is NOT added to any client here.
        """
        if filters is None:
            filters = CustomFilters.authorized
        # Same reasoning as register_command: wrap the raw string in a filter.
        if isinstance(pattern, str):
            pattern = pyrogram_filters.regex(pattern)
        return CallbackQueryHandler(callback_func, filters=pattern & filters)


class PluginManager:
    """Load, unload, enable and disable plugins found in ``plugins/``."""

    def __init__(self, bot: Optional[Client]):
        """Store the client and make sure the ``plugins`` directory exists.

        Args:
            bot: Client that handlers are attached to. The module-level
                instance below is created with ``None``.
        """
        self.bot = bot
        # plugin name (file stem) -> metadata
        self.plugins: Dict[str, PluginInfo] = {}
        # plugin name (file stem) -> plugin instance
        self.loaded_modules: Dict[str, Any] = {}
        # Relative path, so it resolves against the current working directory.
        self.plugins_dir = Path("plugins")
        self.plugins_dir.mkdir(exist_ok=True)

    def discover_plugins(self) -> List[str]:
        """Return the stems of all ``*.py`` files in the plugins directory.

        Files whose name starts with ``__`` (e.g. ``__init__.py``) are skipped.
        """
        return [
            path.stem
            for path in self.plugins_dir.glob("*.py")
            if not path.name.startswith("__")
        ]

    def _refresh_commands(self):
        """Rebuild the bot command list and help text.

        Refreshes ``BotCommands``, reloads the ``help_messages`` module and
        stores the regenerated values in THIS module's globals under
        ``BOT_COMMANDS`` and ``help_string``. Any failure is logged and
        swallowed so a refresh problem never fails the plugin operation that
        triggered it.
        """
        try:
            from ..helper.telegram_helper.bot_commands import BotCommands
            from ..helper.ext_utils import help_messages

            BotCommands.refresh_commands()
            # Reload via the module object (not a hard-coded dotted name) and
            # call the getters of the reloaded module.
            help_messages = importlib.reload(help_messages)
            globals()["BOT_COMMANDS"] = help_messages.get_bot_commands()
            globals()["help_string"] = help_messages.get_help_string()
            LOGGER.info("Bot commands and help refreshed")
        except Exception as e:
            LOGGER.error(f"Error refreshing commands: {e}", exc_info=True)

    async def load_plugin(self, plugin_name: str) -> bool:
        """Import ``plugins/<plugin_name>.py`` and activate its plugin class.

        The first ``PluginBase`` subclass found in the module (in ``dir()``
        order) is instantiated, its ``on_load`` hook awaited and its handlers
        registered. If any step fails, everything this call changed is rolled
        back so a later retry starts from a clean state.

        Returns:
            ``True`` on success; ``False`` if the plugin is already loaded, the
            file is missing, no usable class exists, ``on_load`` declined, or
            an exception occurred (it is logged, never raised).
        """
        module_installed = False
        previous_module = None
        loaded = False
        try:
            if plugin_name in self.loaded_modules:
                LOGGER.warning(f"Plugin {plugin_name} already loaded")
                return False

            plugin_path = self.plugins_dir / f"{plugin_name}.py"
            if not plugin_path.exists():
                LOGGER.error(f"Plugin file {plugin_name}.py not found")
                return False

            spec = importlib.util.spec_from_file_location(plugin_name, plugin_path)
            if spec is None or spec.loader is None:
                LOGGER.error(f"Cannot create import spec for plugin {plugin_name}")
                return False

            module = importlib.util.module_from_spec(spec)
            # Remember what the name pointed to so a failed load can restore it.
            previous_module = sys.modules.get(plugin_name)
            sys.modules[plugin_name] = module
            module_installed = True
            spec.loader.exec_module(module)

            plugin_class = next(
                (
                    candidate
                    for candidate in (getattr(module, attr) for attr in dir(module))
                    if isinstance(candidate, type)
                    and issubclass(candidate, PluginBase)
                    and candidate is not PluginBase
                ),
                None,
            )
            if plugin_class is None:
                LOGGER.error(f"No valid plugin class found in {plugin_name}")
                return False

            plugin_instance = plugin_class()
            if not hasattr(plugin_instance, "PLUGIN_INFO"):
                LOGGER.error(f"Plugin {plugin_name} missing PLUGIN_INFO")
                return False
            plugin_info = plugin_instance.PLUGIN_INFO

            if not await plugin_instance.on_load():
                LOGGER.error(f"Plugin {plugin_name} failed to load")
                return False

            self.plugins[plugin_name] = plugin_info
            self.loaded_modules[plugin_name] = plugin_instance
            self._register_handlers(plugin_instance, plugin_info, module)
            self._refresh_commands()
            LOGGER.info(f"Plugin {plugin_name} loaded successfully")
            loaded = True
            return True
        except Exception as e:
            LOGGER.error(f"Error loading plugin {plugin_name}: {e}", exc_info=True)
            return False
        finally:
            if module_installed and not loaded:
                self._rollback_load(plugin_name, previous_module)

    def _rollback_load(self, plugin_name: str, previous_module: Any) -> None:
        """Undo the state changes of a ``load_plugin`` call that failed."""
        plugin_info = self.plugins.pop(plugin_name, None)
        self.loaded_modules.pop(plugin_name, None)
        if plugin_info is not None:
            # Handlers may be only partially attached; removal errors are
            # logged as warnings by _unregister_handlers.
            self._unregister_handlers(plugin_info)
        if previous_module is not None:
            sys.modules[plugin_name] = previous_module
        else:
            sys.modules.pop(plugin_name, None)

    async def unload_plugin(self, plugin_name: str) -> bool:
        """Detach a loaded plugin's handlers and forget the plugin.

        Returns:
            ``True`` on success; ``False`` if the plugin is not loaded, its
            ``on_unload`` hook declined, or an exception occurred (logged).
        """
        try:
            if plugin_name not in self.loaded_modules:
                LOGGER.error(f"Plugin {plugin_name} not loaded")
                return False

            plugin_instance = self.loaded_modules[plugin_name]
            plugin_info = self.plugins[plugin_name]
            if not await plugin_instance.on_unload():
                LOGGER.error(f"Plugin {plugin_name} failed to unload")
                return False

            self._unregister_handlers(plugin_info)
            del self.loaded_modules[plugin_name]
            del self.plugins[plugin_name]
            # Drop the module so the next load re-executes the file.
            sys.modules.pop(plugin_name, None)
            self._refresh_commands()
            LOGGER.info(f"Plugin {plugin_name} unloaded successfully")
            return True
        except Exception as e:
            LOGGER.error(f"Error unloading plugin {plugin_name}: {e}", exc_info=True)
            return False

    async def reload_plugin(self, plugin_name: str) -> bool:
        """Unload the plugin if it is loaded, then load it again.

        Returns:
            The result of the load step, or ``False`` when a loaded plugin
            could not be unloaded first.
        """
        try:
            if plugin_name in self.loaded_modules:
                # A failed unload leaves the plugin loaded, so loading again
                # could only report "already loaded"; stop here instead.
                if not await self.unload_plugin(plugin_name):
                    return False
            return await self.load_plugin(plugin_name)
        except Exception as e:
            LOGGER.error(f"Error reloading plugin {plugin_name}: {e}", exc_info=True)
            return False

    async def enable_plugin(self, plugin_name: str) -> bool:
        """Await the plugin's ``on_enable`` hook and mark it enabled.

        Only the ``enabled`` flag changes; handlers are not attached or
        detached by this method.

        Returns:
            ``True`` on success, ``False`` if the plugin is unknown, the hook
            declined, or an exception occurred (logged).
        """
        return await self._set_enabled(plugin_name, True)

    async def disable_plugin(self, plugin_name: str) -> bool:
        """Await the plugin's ``on_disable`` hook and mark it disabled.

        Only the ``enabled`` flag changes; handlers are not attached or
        detached by this method.

        Returns:
            ``True`` on success, ``False`` if the plugin is unknown, the hook
            declined, or an exception occurred (logged).
        """
        return await self._set_enabled(plugin_name, False)

    async def _set_enabled(self, plugin_name: str, enabled: bool) -> bool:
        """Shared implementation of ``enable_plugin`` / ``disable_plugin``."""
        verb = "enable" if enabled else "disable"
        gerund = "enabling" if enabled else "disabling"
        try:
            if plugin_name not in self.plugins:
                LOGGER.error(f"Plugin {plugin_name} not found")
                return False

            plugin_instance = self.loaded_modules[plugin_name]
            hook = plugin_instance.on_enable if enabled else plugin_instance.on_disable
            if not await hook():
                LOGGER.error(f"Plugin {plugin_name} failed to {verb}")
                return False

            self.plugins[plugin_name].enabled = enabled
            self._refresh_commands()
            LOGGER.info(f"Plugin {plugin_name} {verb}d")
            return True
        except Exception as e:
            LOGGER.error(f"Error {gerund} plugin {plugin_name}: {e}", exc_info=True)
            return False

    def list_plugins(self) -> List[PluginInfo]:
        """Return the metadata of every loaded plugin."""
        return list(self.plugins.values())

    def get_plugin_info(self, plugin_name: str) -> Optional[PluginInfo]:
        """Return the metadata for ``plugin_name``, or ``None`` if not loaded."""
        return self.plugins.get(plugin_name)

    def _register_handlers(
        self,
        plugin_instance: PluginBase,
        plugin_info: PluginInfo,
        module: Any = None,
    ):
        """Attach a plugin's handlers to the client.

        First every handler already listed in ``plugin_info.handlers`` is
        added. Then each callable named ``<cmd>_command`` in the plugin module
        whose ``<cmd>`` appears in ``plugin_info.commands`` gets a
        ``MessageHandler`` (case-sensitive command filter combined with
        ``CustomFilters.authorized``); that handler is added to the client and
        appended to ``plugin_info.handlers``.

        Args:
            plugin_instance: The plugin object (not inspected here).
            plugin_info: Metadata whose ``handlers`` / ``commands`` drive the
                registration.
            module: The plugin's module. When omitted it is looked up in
                ``sys.modules`` by ``plugin_info.name``, which only works if
                that name equals the name the module was imported under.
        """
        for handler in plugin_info.handlers:
            self.bot.add_handler(handler)

        if module is None:
            module = sys.modules.get(plugin_info.name)
        if not module:
            return

        for attr_name in dir(module):
            if not attr_name.endswith(_COMMAND_SUFFIX):
                continue
            attr = getattr(module, attr_name)
            if not callable(attr):
                continue
            # Strip only the trailing suffix; str.replace would also remove
            # "_command" occurring earlier in the name.
            cmd_name = attr_name[: -len(_COMMAND_SUFFIX)]
            if cmd_name not in plugin_info.commands:
                continue
            handler = MessageHandler(
                attr,
                filters=pyrogram_filters.command(cmd_name, case_sensitive=True)
                & CustomFilters.authorized,
            )
            plugin_info.handlers.append(handler)
            self.bot.add_handler(handler)
            LOGGER.info(
                f"Registered command /{cmd_name} for plugin {plugin_info.name}"
            )

    def _unregister_handlers(self, plugin_info: PluginInfo):
        """Remove every handler in ``plugin_info.handlers`` from the client.

        Best effort: a failure to remove one handler is logged as a warning
        and the remaining handlers are still processed.
        """
        for handler in plugin_info.handlers:
            try:
                self.bot.remove_handler(handler)
            except Exception as e:
                LOGGER.warning(f"Error removing handler: {e}")


# Shared instance created at import time with no client attached.
plugin_manager = PluginManager(None)


def get_plugin_manager() -> PluginManager:
    """Return the module-level ``PluginManager`` instance."""
    return plugin_manager