leech / bot /core /plugin_manager.py
dragxd's picture
Initial commit: Push project to Hugging Face
db78256
import importlib
import importlib.util
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from pyrogram import Client
from pyrogram.handlers import CallbackQueryHandler, MessageHandler
from .. import LOGGER
from ..helper.telegram_helper.filters import CustomFilters
@dataclass
class PluginInfo:
name: str
version: str
author: str
description: str
enabled: bool = True
handlers: List[Any] = field(default_factory=list)
commands: List[str] = field(default_factory=list)
dependencies: List[str] = field(default_factory=list)
class PluginBase:
PLUGIN_INFO: PluginInfo
async def on_load(self) -> bool:
return True
async def on_unload(self) -> bool:
return True
async def on_enable(self) -> bool:
return True
async def on_disable(self) -> bool:
return True
def register_command(self, command: str, handler_func, filters=None):
if filters is None:
filters = CustomFilters.authorized
return MessageHandler(handler_func, filters=command & filters)
def register_callback(self, pattern: str, callback_func, filters=None):
if filters is None:
filters = CustomFilters.authorized
return CallbackQueryHandler(callback_func, filters=pattern & filters)
class PluginManager:
def __init__(self, bot: Client):
self.bot = bot
self.plugins: Dict[str, PluginInfo] = {}
self.loaded_modules: Dict[str, Any] = {}
self.plugins_dir = Path("plugins")
self.plugins_dir.mkdir(exist_ok=True)
def discover_plugins(self) -> List[str]:
plugin_files = []
for file in self.plugins_dir.glob("*.py"):
if file.name.startswith("__"):
continue
plugin_files.append(file.stem)
return plugin_files
def _refresh_commands(self):
try:
from ..helper.telegram_helper.bot_commands import BotCommands
from ..helper.ext_utils.help_messages import (
get_bot_commands,
get_help_string,
)
import importlib
BotCommands.refresh_commands()
importlib.reload(sys.modules["bot.helper.ext_utils.help_messages"])
from ..helper.ext_utils.help_messages import BOT_COMMANDS, help_string
globals()["BOT_COMMANDS"] = get_bot_commands()
globals()["help_string"] = get_help_string()
LOGGER.info("Bot commands and help refreshed")
except Exception as e:
LOGGER.error(f"Error refreshing commands: {e}", exc_info=True)
async def load_plugin(self, plugin_name: str) -> bool:
try:
if plugin_name in self.loaded_modules:
LOGGER.warning(f"Plugin {plugin_name} already loaded")
return False
plugin_path = self.plugins_dir / f"{plugin_name}.py"
if not plugin_path.exists():
LOGGER.error(f"Plugin file {plugin_name}.py not found")
return False
spec = importlib.util.spec_from_file_location(plugin_name, plugin_path)
module = importlib.util.module_from_spec(spec)
sys.modules[plugin_name] = module
spec.loader.exec_module(module)
plugin_class = None
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (
isinstance(attr, type)
and issubclass(attr, PluginBase)
and attr != PluginBase
):
plugin_class = attr
break
if not plugin_class:
LOGGER.error(f"No valid plugin class found in {plugin_name}")
return False
plugin_instance = plugin_class()
if not hasattr(plugin_instance, "PLUGIN_INFO"):
LOGGER.error(f"Plugin {plugin_name} missing PLUGIN_INFO")
return False
plugin_info = plugin_instance.PLUGIN_INFO
if await plugin_instance.on_load():
self.plugins[plugin_name] = plugin_info
self.loaded_modules[plugin_name] = plugin_instance
self._register_handlers(plugin_instance, plugin_info)
self._refresh_commands()
LOGGER.info(f"Plugin {plugin_name} loaded successfully")
return True
else:
LOGGER.error(f"Plugin {plugin_name} failed to load")
return False
except Exception as e:
LOGGER.error(f"Error loading plugin {plugin_name}: {e}", exc_info=True)
return False
async def unload_plugin(self, plugin_name: str) -> bool:
try:
if plugin_name not in self.loaded_modules:
LOGGER.error(f"Plugin {plugin_name} not loaded")
return False
plugin_instance = self.loaded_modules[plugin_name]
plugin_info = self.plugins[plugin_name]
if await plugin_instance.on_unload():
self._unregister_handlers(plugin_info)
del self.loaded_modules[plugin_name]
del self.plugins[plugin_name]
if plugin_name in sys.modules:
del sys.modules[plugin_name]
self._refresh_commands()
LOGGER.info(f"Plugin {plugin_name} unloaded successfully")
return True
else:
LOGGER.error(f"Plugin {plugin_name} failed to unload")
return False
except Exception as e:
LOGGER.error(f"Error unloading plugin {plugin_name}: {e}", exc_info=True)
return False
async def reload_plugin(self, plugin_name: str) -> bool:
try:
await self.unload_plugin(plugin_name)
return await self.load_plugin(plugin_name)
except Exception as e:
LOGGER.error(f"Error reloading plugin {plugin_name}: {e}", exc_info=True)
return False
async def enable_plugin(self, plugin_name: str) -> bool:
try:
if plugin_name not in self.plugins:
LOGGER.error(f"Plugin {plugin_name} not found")
return False
plugin_instance = self.loaded_modules[plugin_name]
if await plugin_instance.on_enable():
self.plugins[plugin_name].enabled = True
self._refresh_commands()
LOGGER.info(f"Plugin {plugin_name} enabled")
return True
else:
LOGGER.error(f"Plugin {plugin_name} failed to enable")
return False
except Exception as e:
LOGGER.error(f"Error enabling plugin {plugin_name}: {e}", exc_info=True)
return False
async def disable_plugin(self, plugin_name: str) -> bool:
try:
if plugin_name not in self.plugins:
LOGGER.error(f"Plugin {plugin_name} not found")
return False
plugin_instance = self.loaded_modules[plugin_name]
if await plugin_instance.on_disable():
self.plugins[plugin_name].enabled = False
self._refresh_commands()
LOGGER.info(f"Plugin {plugin_name} disabled")
return True
else:
LOGGER.error(f"Plugin {plugin_name} failed to disable")
return False
except Exception as e:
LOGGER.error(f"Error disabling plugin {plugin_name}: {e}", exc_info=True)
return False
def list_plugins(self) -> List[PluginInfo]:
return list(self.plugins.values())
def get_plugin_info(self, plugin_name: str) -> Optional[PluginInfo]:
return self.plugins.get(plugin_name)
def _register_handlers(self, plugin_instance: PluginBase, plugin_info: PluginInfo):
from ..helper.telegram_helper.filters import CustomFilters
from pyrogram.filters import command
from pyrogram.handlers import MessageHandler
for handler in plugin_info.handlers:
self.bot.add_handler(handler)
module = sys.modules.get(plugin_info.name)
if module:
for attr_name in dir(module):
attr = getattr(module, attr_name)
if callable(attr) and attr_name.endswith("_command"):
cmd_name = attr_name.replace("_command", "")
if cmd_name in plugin_info.commands:
handler = MessageHandler(
attr,
filters=command(cmd_name, case_sensitive=True)
& CustomFilters.authorized,
)
plugin_info.handlers.append(handler)
self.bot.add_handler(handler)
LOGGER.info(
f"Registered command /{cmd_name} for plugin {plugin_info.name}"
)
def _unregister_handlers(self, plugin_info: PluginInfo):
for handler in plugin_info.handlers:
try:
self.bot.remove_handler(handler)
except Exception as e:
LOGGER.warning(f"Error removing handler: {e}")
plugin_manager = PluginManager(None)
def get_plugin_manager() -> PluginManager:
return plugin_manager