import os import time import json import asyncio import psutil import discord import requests import socket import random from datetime import datetime from dotenv import load_dotenv from discord.ext import commands, tasks from pydvpl.dvpl import compress_dvpl, decompress_dvpl, __LZ4_VERSION__ import pydvpl.version import pytgpt.phind as phind import pytgpt.opengpt as opengpt import pytgpt.blackboxai as blackboxai import subprocess import pytgpt from craiyon import Craiyon, craiyon_utils from io import BytesIO import base64 import craiyon # ENV_LOADER ------------------------------------------------------ START load_dotenv() # ENV_LOADER ------------------------------------------------------ END # DEFINES ------------------------------------------------------ START BOT_VERSION = '1.4.5' AUTHORIZED_USER_ID = os.getenv('AUTHORIZED_ID') TOKEN = os.getenv('DISCORD_TOKEN') LOG_CHANNEL_ID = os.getenv('CHANNEL_ID') AUTHORIZED_AI_ACCESS_ID = os.getenv('AUTHORIZED_AI_ID') MY_SERVER_ID = os.getenv('MY_SERVER_ID') FEEDBACK_CHANNEL_ID = os.getenv('FEEDBACK_CHANNEL_ID') UPDATE_INTERVAL = 60 activity_start_time = 0 # Define folder paths FEEDBACK_FOLDER = "feedback" ISSUES_FOLDER = os.path.join(FEEDBACK_FOLDER, "issues") REQUESTS_FOLDER = os.path.join(FEEDBACK_FOLDER, "requests") COMPRESS_FOLDER= os.path.join("received_to_compress") DECOMPRESS_FOLDER= os.path.join("received_to_decompress") # DEFINES ------------------------------------------------------ END # HELPER ------------------------------------------------------ START # FUNCTION ------------------------------------------------------ SPLIT # Ensure the existence of folders def ensure_folders(): for folder in [FEEDBACK_FOLDER, ISSUES_FOLDER, REQUESTS_FOLDER, COMPRESS_FOLDER, DECOMPRESS_FOLDER]: if not os.path.exists(folder): os.makedirs(folder) # FUNCTION ------------------------------------------------------ SPLIT async def clean_directory(ctx, directory): try: msg = await ctx.reply("Cleaning...") file_count = 0 for 
filename in os.listdir(directory): file_path = os.path.join(directory, filename) if os.path.isfile(file_path): os.remove(file_path) file_count += 1 await msg.edit(content=f"All files cleaned successfully in {directory}. Total files cleaned: {file_count}.") except Exception as e: await msg.edit(content=f"An error occurred while cleaning files: {e}") # HELPER ------------------------------------------------------ END # CORE_PREFIX ------------------------------------------------------ START # Set up bot command prefix bot_intents = discord.Intents.all() # Initialize the bot with intents bot = commands.Bot(command_prefix=">>", intents=bot_intents) # CORE_PREFIX ------------------------------------------------------ END # CORE ------------------------------------------------------ START @bot.event async def on_ready(): global activity_start_time activity_start_time = time.time() # Set initial activity to playing time await update_playing_time_activity() # Start the task to update playing time update_playing_time.start() await bot.tree.sync() print(f'{bot.user} has connected to Discord!') ensure_folders() # FUNCTION ------------------------------------------------------ SPLIT @bot.event async def on_disconnect(): print('Bot disconnected from Discord. Attempting to reconnect...') await bot.change_presence(status=discord.Status.invisible) # FUNCTION ------------------------------------------------------ SPLIT @bot.event async def on_guild_join(guild): embed = discord.Embed( title="Thank You for Adding PyDVPL Bot! 💛", description="We're excited to be a part of your server. If you have any questions or need assistance, feel free to reach out to us!", color=discord.Color.yellow() ) embed.add_field(name="Feedback Commands:", value="`feedback` - Sends feedback to bot developers. 
(usage: >>feedback issues/requests )", inline=False) embed.add_field(name="Help Commands:", value="Do `/help` or `>>help` to begin with PyDVPL!", inline=False) # embed.set_footer(value="Do `/help` or `>>help` to begin with PyDVPL!") # Check for a suitable channel to send the message public_updates_channel = guild.public_updates_channel default_channel = guild.system_channel or guild.text_channels[0] # Default to the first text channel if public_updates_channel: await public_updates_channel.send(embed=embed) else: if default_channel: await default_channel.send(embed=embed) # FUNCTION ------------------------------------------------------ SPLIT @bot.event async def on_command(ctx): # Log command usage log_channel = bot.get_channel(int(LOG_CHANNEL_ID)) if log_channel: if ctx.guild: log_message = f"Command `{ctx.command}` used by `{ctx.author} ({ctx.author.id})` in `{ctx.guild} ({ctx.guild.id})`" else: log_message = f"Command `{ctx.command}` used by `{ctx.author} ({ctx.author.id})` in {ctx.me.mention}'s `DM`" await log_channel.send(log_message) # FUNCTION ------------------------------------------------------ SPLIT @bot.event async def on_command_error(ctx, error): if isinstance(error, commands.CommandNotFound): await ctx.reply("Sorry, I couldn't find that command. 
Use `/help` or `>>help` to see the list of available commands.") else: print(f'An error occurred: {error}') # FUNCTION ------------------------------------------------------ SPLIT @bot.event async def on_error(event, *args): print(f'An error occurred during event {event}: {args[0]}') await bot.change_presence(status=discord.Status.dnd) # CORE ------------------------------------------------------ END # ACTIVITY ------------------------------------------------------ START async def update_playing_time_activity(): global activity_start_time global playing_time_activity current_time = time.time() uptime_seconds = int(current_time - activity_start_time) minutes, seconds = divmod(uptime_seconds, 60) hours, minutes = divmod(minutes, 60) playing_time_activity = f"/help or >>help for commands list | online for {hours}h {minutes}m" #seconds {seconds}s await bot.change_presence(activity=discord.Game(name=playing_time_activity)) # FUNCTION ------------------------------------------------------ SPLIT @tasks.loop(seconds=UPDATE_INTERVAL) async def update_playing_time(): await update_playing_time_activity() # ACTIVITY ------------------------------------------------------ END # PYDVPL ------------------------------------------------------ START async def compress_file(file_path): try: with open(file_path, "rb") as f: file_data = f.read() compressed_data = compress_dvpl(file_data) compressed_file_path = file_path + ".dvpl" # Example: Add .dvpl extension with open(compressed_file_path, "wb") as f: f.write(compressed_data) return compressed_file_path except Exception as e: print(f"Compression failed: {e}") return None # FUNCTION ------------------------------------------------------ SPLIT async def decompress_file(file_path): try: with open(file_path, "rb") as f: file_data = f.read() decompressed_data = decompress_dvpl(file_data) decompressed_file_path = os.path.splitext(file_path)[0] # Example: Remove .dvpl extension with open(decompressed_file_path, "wb") as f: 
f.write(decompressed_data) return decompressed_file_path except Exception as e: print(f"Decompression failed: {e}") return None # PYDVPL ------------------------------------------------------ END # UTILITY USER_CMD ------------------------------------------------------ START bot.remove_command("help") @bot.hybrid_command(help="Displays the list of available commands.") async def help(ctx): # Create an embed to display the list of available commands embed = discord.Embed(title="Command List", description="Here's a list of available commands:", color=discord.Color.blue()) # Add fields for different categories of commands with their descriptions embed.add_field(name="PyDVPL Prefix:", value="`/` - Discord's slash command prefix.\n`>>` - PyDVPL default prefix.") embed.add_field(name="Dvpl Commands:", value="`compress` - Compresses files.\n`decompress` - Decompresses files.", inline=False) embed.add_field(name="User Utility Commands:", value="`ping` - Checks the bot's latency.\n`invite` - Get the bot's invite link.\n`uptime` - Displays the bot's uptime.\n`clock` - Shows the current date and time.\n`chat` - Chat with AI LLM models like 'phind', 'chatgpt' & 'blackboxai'.\n`coinflip - Plays a coinflip game with the bot`", inline=False) embed.add_field(name="Premium User Commands:", value="`chat` - Chat with AI LLM models like 'phind', 'chatgpt' & 'blackboxai'.\n`image` - Generate images with 'craiyon' AI.", inline=False) embed.add_field(name="Moderator Commands:", value="`kick` - Kick a member from the server.\n`ban` - Ban a member from the server.\n`timeout` - Timeout a member for a specified duration.\n`purge` - Bulk deletes set amount of messages in guilds.", inline=False) embed.add_field(name="Admin Utility Commands:", value="`stats` - Displays bot statistics.\n`list` - Lists server members.\n`say` - Make the bot say something.\n`announce` - Announces a message to a channel.\n`activity` - Sets bot activity status.\n`online` - Sets bot status to online.\n`dnd` - Sets bot 
status to Do Not Disturb.\n`offline` - Sets bot status to offline.\n`clean` - Cleans up bot's messages.\n`update` - Update a Python package using pip.\n`ip` - Shows the IP address of the current server.", inline=False) embed.add_field(name="Feedback Commands:", value="`feedback` - Sends feedback to bot developers. (usage: >>feedback issues/requests )", inline=False) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at # Send the embed as an ephemeral message, visible only to the command invoker await ctx.reply(embed=embed, ephemeral=True) #await ctx.reply(f"The command `help` was executed by - {ctx.author.mention}") # FUNCTION ------------------------------------------------------ SPLIT @bot.hybrid_command(help="Compresses attached files.") async def compress(ctx , attachments: discord.Attachment): attachments = ctx.message.attachments if not attachments: await ctx.reply("No file attached.") return for attachment in attachments: file_path = os.path.join(COMPRESS_FOLDER, attachment.filename) msg = await ctx.reply("Compressing...") # Check if the file is already in .dvpl format, if so, skip it if file_path.endswith(".dvpl"): embed = discord.Embed( title="Compression Skipped", description=f"File {attachment.filename} is already in .dvpl format.", color=discord.Color.gold() ) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await msg.edit(content=None, embed=embed) continue await attachment.save(file_path) compressed_file_path = await compress_file(file_path) if compressed_file_path: embed = discord.Embed( title="Compression Successful", description=f"File {attachment.filename} has been successfully compressed.", color=discord.Color.green() ) embed.set_footer(text=f"Requested 
by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await msg.edit(content=None, embed=embed) await ctx.reply(file=discord.File(compressed_file_path)) os.remove(file_path) # Delete the original file os.remove(compressed_file_path) # Delete the compressed file else: embed = discord.Embed( title="Compression Failed", description="Failed to compress the file.", color=discord.Color.red() ) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await msg.edit(content=None, embed=embed) @bot.hybrid_command(help="Decompresses attached files.") async def decompress(ctx, attachments: discord.Attachment): attachments = ctx.message.attachments if not attachments: await ctx.reply("No file attached.") return for attachment in attachments: file_path = os.path.join(DECOMPRESS_FOLDER, attachment.filename) msg = await ctx.reply("Decompressing...") # Check if the file is not in .dvpl format, if so, skip it if not file_path.endswith(".dvpl"): embed = discord.Embed( title="Decompression Skipped", description=f"File {attachment.filename} is not in .dvpl format.", color=discord.Color.gold() ) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await msg.edit(content=None, embed=embed) continue await attachment.save(file_path) decompressed_file_path = await decompress_file(file_path) if decompressed_file_path: embed = discord.Embed( title="Decompression Successful", description=f"File {attachment.filename} has been successfully decompressed.", color=discord.Color.green() ) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if 
ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await msg.edit(content=None, embed=embed) await ctx.reply(file=discord.File(decompressed_file_path)) os.remove(file_path) # Delete the original file os.remove(decompressed_file_path) # Delete the decompressed file else: embed = discord.Embed( title="Decompression Failed", description="Failed to decompress the file.", color=discord.Color.red() ) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await msg.edit(content=None, embed=embed) #await ctx.reply(f"The command `decompress` was executed by - {ctx.author.mention}") # FUNCTION ------------------------------------------------------ SPLIT @bot.hybrid_command(help="Pings to check bot's latency") async def ping(ctx): start_time = time.monotonic() msg = await ctx.reply("Pinging...") # Measure WebSocket latency ws_latency = bot.latency * 1000 # Calculate elapsed time since sending the message end_time = time.monotonic() elapsed_time = (end_time - start_time) * 1000 # Create an embedded message embed = discord.Embed( title="Pong! 
🏓", description=f"WebSocket latency: `{ws_latency:.2f}ms`\nResponse time: `{elapsed_time:.2f}ms`", color=discord.Color.blurple() ) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await msg.edit(content=None, embed=embed) #await ctx.reply(f"The command `ping` was executed by - {ctx.author.mention}") # FUNCTION ------------------------------------------------------ SPLIT @bot.hybrid_command(help="Pings domain names and ip addresses.") async def pinger(ctx, target): try: # Attempt to resolve the target to an IP address ip_address = socket.gethostbyname(target) msg = await ctx.reply("Pinging...") # Create a socket object s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Set a timeout for the connection attempt s.settimeout(5) # Record the start time start_time = time.time() # Connect to the target IP address on port 80 (HTTP) s.connect((ip_address, 80)) # Calculate the round trip time (RTT) rtt = (time.time() - start_time) * 1000 # Convert to milliseconds # Close the socket s.close() # Create an embedded message embed = discord.Embed( title="Ping Result", color=discord.Color.green() ) embed.add_field(name="Target", value=target, inline=False) embed.add_field(name="IP Address", value=ip_address, inline=False) # Add IP address field embed.add_field(name="Status", value="Successful 🟢", inline=False) embed.add_field(name="Latency", value=f"{rtt:.2f}ms", inline=False) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at # Send the embedded message await msg.edit(content=None, embed=embed) except socket.gaierror: # Create an embedded message for resolution error embed = discord.Embed( title="Ping Result", description=f"Error: Unable to resolve {target} to an IP address. 
🔴", color=discord.Color.red() ) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await msg.edit(content=None, embed=embed) except socket.error: # Create an embedded message for unreachable error embed = discord.Embed( title="Ping Result", description=f"Error: {target} is unreachable. 🔴", color=discord.Color.red() ) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await msg.edit(content=None, embed=embed) # FUNCTION ------------------------------------------------------ SPLIT @bot.hybrid_command(help="Generates an invite link for the bot and sends it to your DM.") async def invite(ctx): permissions = discord.Permissions( read_messages=True, send_messages=True, embed_links=True, attach_files=True, read_message_history=True, mention_everyone=True, add_reactions=True, manage_guild=True, ban_members=True, kick_members=True, moderate_members=True, use_application_commands=True, manage_roles=True, administrator=True ) client_id = bot.user.id invite_link = discord.utils.oauth_url(client_id, permissions=permissions, scopes=['bot']) # Create an embedded message for the invite link invite_embed = discord.Embed( title="Invite Link", description=f"Click the button below to invite PyDVPL to your server", color=discord.Color.blue() ) invite_embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) invite_embed.timestamp = ctx.message.created_at # Create a button for the invite link invite_button = discord.ui.Button( style=discord.ButtonStyle.link, label="Invite PyDVPL", url=invite_link ) # Create a View to hold the button view = discord.ui.View() view.add_item(invite_button) # Send 
the embed with the button to the user's DM try: await ctx.author.send( embed=invite_embed, view=view ) invite_confirmation_embed = discord.Embed( title="Invite Link Sent", description=f"The invite link has been sent to your DM, {ctx.author.mention}.", color=discord.Color.green() ) invite_confirmation_embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) invite_confirmation_embed.timestamp = ctx.message.created_at await ctx.reply(embed=invite_confirmation_embed) except discord.Forbidden: invite_error_embed = discord.Embed( title="Invite Link Sending Failed", description=f"Failed to send the invite link to your DM, {ctx.author.mention}. Please make sure your DMs are open.", color=discord.Color.red() ) invite_error_embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) invite_error_embed.timestamp = ctx.message.created_at await ctx.reply(embed=invite_error_embed) #await ctx.reply(f"The command `invite` was executed by - {ctx.author.mention}") # FUNCTION ------------------------------------------------------ SPLIT @bot.hybrid_command(help="Send feedback or report issues", usage=" ") async def feedback(ctx, category: str, *, message: str): if category.lower() not in ["requests", "issues"]: await ctx.reply("Invalid category. 
Please choose 'requests' or 'issues'.") return feedback_data = { "author_id": ctx.author.id, "author_name": ctx.author.name, "timestamp": str(ctx.message.created_at), "message": message } folder_path = REQUESTS_FOLDER if category.lower() == "requests" else ISSUES_FOLDER file_path = os.path.join(folder_path, f"{category.lower()}.json") try: with open(file_path, "r") as f: data = json.load(f) except FileNotFoundError: data = [] data.append(feedback_data) with open(file_path, "w") as f: json.dump(data, f, indent=4) await ctx.reply(f"Thank you for your {category.lower()}. It has been recorded.") # Create an Embed object for the feedback data embed = discord.Embed( title=f"New {category.lower()} feedback", description=f"**Author Name:** {feedback_data['author_name']}\n**Timestamp:** {feedback_data['timestamp']}\n**Message:** {feedback_data['message']}", color=discord.Color.yellow() ) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at # Send the embed to the specified server's feedback channel feedback_channel_id = bot.get_channel(int(FEEDBACK_CHANNEL_ID)) if feedback_channel_id: if feedback_channel_id: await feedback_channel_id.send(embed=embed) else: print("Feedback channel not found in the server.") else: print("Server not found.") #await ctx.reply(f"The command `feedback` was executed by - {ctx.author.mention}") # FUNCTION ------------------------------------------------------ SPLIT @bot.hybrid_command(help="Show how long the bot has been playing the current game.") async def uptime(ctx): global activity_start_time if activity_start_time == 0: bot_mention = ctx.me.mention await ctx.reply(f"{bot_mention}'s activity hasn't been set yet.") return current_time = time.time() uptime_seconds = int(current_time - activity_start_time) minutes, seconds = divmod(uptime_seconds, 60) hours, minutes = divmod(minutes, 60) bot_mention = 
ctx.me.mention # Create an embedded message for the uptime embed = discord.Embed( title="Bot Uptime", description=f"{bot_mention} has been playing for:", color=discord.Color.blue() ) embed.add_field(name="Hours", value=hours) embed.add_field(name="Minutes", value=minutes) embed.add_field(name="Seconds", value=seconds) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await ctx.reply(embed=embed) #await ctx.reply(f"The command `uptime` was executed by - {ctx.author.mention}") # FUNCTION ------------------------------------------------------ SPLIT @bot.hybrid_command(help="Shows the bot's version number.") async def version(ctx): # Create an embed message to display the bot version embed = discord.Embed( title="Bot Version", description="The current version of the bot and its dependencies.", color=discord.Color.gold() # Changed color to gold for a more visually appealing look ) bot_ver = { f"PyDVPL Bot Version": f"{BOT_VERSION}", } bot_ver_formatted = "\n".join([f"```{name}: {value}```" for name, value in bot_ver.items()]) # Bot version embed.add_field(name="**CORE**", value=bot_ver_formatted, inline=False) # Changed to bold and capitalized "CORE" # Add dependencies as child fields dependencies = { f"PyDVPL Lib Version": f"{pydvpl.version.__version__}", f"LZ4 PY Lib Version": f"{__LZ4_VERSION__}", f"Discord PY Version": f"{discord.__version__}", f"PyTGPT Lib Version": f"{pytgpt.__version__}", f"Craiyon Lib Version": f"{craiyon.__version__}" } # Format and layout dependencies dependencies_formatted = "\n".join([f"```{name}: {value}```" for name, value in dependencies.items()]) embed.add_field(name="**DEPENDENCIES**", value=dependencies_formatted, inline=False) # Changed to bold and capitalized "DEPENDENCIES" embed.set_footer( text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if 
ctx.author.display_avatar else discord.Embed.Empty ) embed.timestamp = ctx.message.created_at # Send the embed message await ctx.reply(embed=embed) #await ctx.reply(f"The command `version` was executed by - {ctx.author.mention}") # FUNCTION ------------------------------------------------------ SPLIT @bot.hybrid_command(help="Shows the current date and time.") async def clock(ctx): # Get the current date and time now = datetime.now() # Format the date and time formatted_now = now.strftime("%Y-%m-%d %H:%M:%S") # Create an embed message to display the current date and time embed = discord.Embed( title="Current Date and Time", description=f"```\n{formatted_now}\n```", color=discord.Color.blue() ) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at # Send the embed message await ctx.reply(embed=embed) #await ctx.reply(f"The command `clock` was executed by - {ctx.author.mention}") # FUNCTION ------------------------------------------------------ SPLIT @bot.hybrid_command(help="Chat with AI LLM models like 'phind', 'chatgpt', & 'blackboxai'.") async def chat(ctx, model, *, message): if str(ctx.author.id) not in AUTHORIZED_AI_ACCESS_ID: await ctx.reply("You are not authorized to use this command. Only `@rifsxd`'s server boosters & special permitted users can enjoy free `ChatGPT` access.") return msg = await ctx.reply("Responding...") if model.lower() == "phind": bot = phind.PHIND() elif model.lower() == "chatgpt": bot = opengpt.OPENGPT() elif model.lower() == "blackboxai": bot = blackboxai.BLACKBOXAI() else: await ctx.reply("Invalid model. 
Please choose 'phind' or 'chatgpt' or 'blackboxai'.") return response = bot.chat(message) if len(response) > 2000: half_length = len(response) // 2 first_half = response[:half_length] second_half = response[half_length:] embed = discord.Embed( title="Response", color=discord.Color.green() ) embed.add_field(name="First Half", value=f"{first_half}-") embed.add_field(name="Second Half", value=f"-{second_half}") embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await msg.edit(content=None, embed=embed) else: embed = discord.Embed( title="Response", description=response, color=discord.Color.green() ) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await msg.edit(content=None, embed=embed) #await ctx.reply(f"The command `chat` was executed by - {ctx.author.mention}") # FUNCTION ------------------------------------------------------ SPLIT @bot.hybrid_command(help="Generate images with 'craiyon' ai.") async def image(ctx, *, prompt: str): # Check if the user is authorized to use the command if str(ctx.author.id) not in AUTHORIZED_AI_ACCESS_ID: await ctx.reply("You are not authorized to use this command.") return # Inform the user that the generation process has started msg = await ctx.reply(f"Generating images for prompt: `{prompt}`. 
Please be patient...") try: generator = Craiyon() generated_images = await generator.async_generate(prompt) # Encode images to base64 b64_list = await craiyon_utils.async_encode_base64(generated_images.images) # Prepare images for sending images = [] for index, image_b64 in enumerate(b64_list): img_bytes = BytesIO(base64.b64decode(image_b64)) image_file = discord.File(img_bytes, filename=f"result{index}.webp") images.append(image_file) embed = discord.Embed( title="Generated Images", description=f"Sucessfully generated images for prompt: `{prompt}`.", color=discord.Color.green() ) # Add a field for each generated image for index, image in enumerate(images): embed.add_field(name=f"Image {index+1}", value=f"[Click here to view]({image.url})") embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await msg.edit(content=None, embed=embed) except Exception as e: # Print the specific exception message for debugging print(f"An error occurred while generating images: {e}") await ctx.reply("An error occurred while generating images. 
Please try again later.") #await ctx.reply(f"The command `image` was executed by - {ctx.author.mention}") # FUNCTION ------------------------------------------------------ SPLIT @bot.command(help="Plays a coinflip game with the bot") async def coinflip(ctx): emojis = ["🐈", "😸"] # Add more emojis if needed # Create an embedded message embed = discord.Embed(title="Coin Flip", description="React with 🐈 for tails or 😸 for heads!", color=discord.Colour.yellow()) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at flip_msg = await ctx.send(embed=embed) # Add reactions for heads and tails for emoji in emojis: await flip_msg.add_reaction(emoji) # Wait for reactions def check(reaction, user): return user == ctx.author and str(reaction.emoji) in emojis try: reaction, _ = await bot.wait_for('reaction_add', timeout=30.0, check=check) user_choice = reaction.emoji except asyncio.TimeoutError: await ctx.send("You didn't react in time. Aborting...") return await asyncio.sleep(1) for i in range(3, 0, -1): embed.description = f"Flipping the coin in {i}..." await flip_msg.edit(embed=embed) await asyncio.sleep(1) # Randomly choose the result result = random.choice(emojis) embed.description = f"The result is **{result}**!" 
if result == user_choice: winner = ctx.author.display_name else: winner = "The bot" embed.set_footer(text=f"Winner: {winner}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) await flip_msg.edit(embed=embed) # UTILITY USER_CMD ------------------------------------------------------ END # UTILITY ADMIN_CMD ------------------------------------------------------ START @bot.hybrid_command(help="Cleans files in specific directories") async def clean(ctx, clean_mode): # Check if the author is authorized to use this command if str(ctx.author.id) != AUTHORIZED_USER_ID: await ctx.reply("You are not authorized to use this command.") return embed = discord.Embed(title="File Cleaning Command", color=0x00ff00) # Green color for the embed if clean_mode.lower() == 'compress': await clean_directory(ctx, COMPRESS_FOLDER) embed.add_field(name="Action", value="Compress Folder Cleaning", inline=False) elif clean_mode.lower() == 'decompress': await clean_directory(ctx, DECOMPRESS_FOLDER) embed.add_field(name="Action", value="Decompress Folder Cleaning", inline=False) elif clean_mode.lower() == 'issues': await clean_directory(ctx, ISSUES_FOLDER) embed.add_field(name="Action", value="Issues Folder Cleaning", inline=False) elif clean_mode.lower() == 'requests': await clean_directory(ctx, REQUESTS_FOLDER) embed.add_field(name="Action", value="Requests Folder Cleaning", inline=False) elif clean_mode.lower() == 'feedback': await clean_directory(ctx, REQUESTS_FOLDER) await clean_directory(ctx, ISSUES_FOLDER) embed.add_field(name="Action", value="Feedback Folders Cleaning", inline=False) elif clean_mode.lower() == 'dvpl': await clean_directory(ctx, DECOMPRESS_FOLDER) await clean_directory(ctx, COMPRESS_FOLDER) embed.add_field(name="Action", value="DVPL Folders Cleaning", inline=False) else: embed.add_field(name="Error", value="Invalid clean mode. 
Please choose 'compress', 'decompress', 'issues', or 'requests'.", inline=False) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await ctx.reply(embed=embed) return embed.add_field(name="Status", value="Cleaning completed successfully.", inline=False) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await ctx.reply(embed=embed) # FUNCTION ------------------------------------------------------ SPLIT @bot.hybrid_command(help="Set bot's presence to online") async def online(ctx): # Check if the author is authorized to use this command if str(ctx.author.id) != AUTHORIZED_USER_ID: await ctx.reply("You are not authorized to use this command.") return # Your playing_time_activity variable should be defined somewhere playing_time_activity = "Your default activity here" # Change the bot's presence await bot.change_presence(activity=discord.Game(name=playing_time_activity)) # Prepare an embed for response embed = discord.Embed(title="Bot Presence Update", color=0x00ff00) # Green color for the embed embed.add_field(name="Status", value="Bot's presence set to online with activity: " + playing_time_activity, inline=False) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at # Send the embed as a reply await ctx.reply(embed=embed) # FUNCTION ------------------------------------------------------ SPLIT @bot.hybrid_command(help="Set bot's presence to Do Not Disturb") async def dnd(ctx): # Check if the author is authorized to use this command if str(ctx.author.id) != AUTHORIZED_USER_ID: await ctx.reply("You are not authorized to use this 
command.") return # Change the bot's presence to Do Not Disturb await bot.change_presence(status=discord.Status.dnd) # Prepare an embed for response embed = discord.Embed(title="Bot Presence Update", color=0xff0000) # Red color for the embed embed.add_field(name="Status", value="Bot's presence set to Do Not Disturb", inline=False) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at # Send the embed as a reply await ctx.reply(embed=embed) # FUNCTION ------------------------------------------------------ SPLIT @bot.hybrid_command(help="Set bot's presence to offline") async def offline(ctx): # Check if the author is authorized to use this command if str(ctx.author.id) != AUTHORIZED_USER_ID: await ctx.reply("You are not authorized to use this command.") return # Change the bot's presence to offline await bot.change_presence(status=discord.Status.offline) # Prepare an embed for response embed = discord.Embed(title="Bot Presence Update", color=0x000000) # Black color for the embed embed.add_field(name="Status", value="Bot's presence set to offline", inline=False) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at # Send the embed as a reply await ctx.reply(embed=embed) # FUNCTION ------------------------------------------------------ SPLIT @bot.hybrid_command(help="Set bot's activity") async def activity(ctx, activity_type, *, activity_text=None): # Check if the author is authorized to use this command if str(ctx.author.id) != AUTHORIZED_USER_ID: await ctx.reply("You are not authorized to use this command.") return # Check if a valid activity type is provided valid_activity_types = ['listening', 'watching', 'playing'] if activity_type.lower() not in valid_activity_types: embed = 
discord.Embed(title="Invalid Activity Type", color=0xff0000) # Red color for the embed embed.add_field(name="Error", value='Invalid activity type. Please choose either "listening", "playing" or "watching".', inline=False) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await ctx.reply(embed=embed) return # Check if activity text is provided if not activity_text: embed = discord.Embed(title="Activity Text Missing", color=0xff0000) # Red color for the embed embed.add_field(name="Error", value='Please provide an activity text.', inline=False) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at await ctx.reply(embed=embed) return # Determine the activity based on the provided type if activity_type.lower() == 'listening': activity = discord.Activity(type=discord.ActivityType.listening, name=activity_text) elif activity_type.lower() == 'watching': activity = discord.Activity(type=discord.ActivityType.watching, name=activity_text) elif activity_type.lower() == 'playing': activity = discord.Activity(type=discord.ActivityType.playing, name=activity_text) # Change the bot's presence await bot.change_presence(activity=activity) # Prepare an embed for response embed = discord.Embed(title="Bot Activity Update", color=0x00ff00) # Green color for the embed embed.add_field(name="Status", value=f'Activity set to {activity_type.capitalize()} {activity_text}', inline=False) embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty) embed.timestamp = ctx.message.created_at # Send the embed as a reply await ctx.reply(embed=embed) # FUNCTION 
# ------------------------------------------------------ SPLIT


def _stamp_embed(embed, ctx):
    """Attach the "Requested by ..." footer and the invocation timestamp.

    Returns the same embed so the call can be used inline.
    """
    # display_avatar falls back to the default avatar, so neither a None
    # check nor the discord.Embed.Empty sentinel (gone in discord.py 2.0)
    # is needed here.
    embed.set_footer(
        text=f"Requested by {ctx.author.display_name}",
        icon_url=ctx.author.display_avatar.url,
    )
    embed.timestamp = ctx.message.created_at
    return embed


def _pick_announcement_channel(server):
    """Choose the channel of *server* an announcement should go to.

    Preference order: a text channel named "important-announcements", one
    named "announcements", the public updates channel, the system channel,
    then the first text channel.

    Returns (channel, kind). kind is the channel name for the first two
    choices, "public updates" for the third and "default" for the last two.
    Returns (None, None) when the server offers none of them.
    """
    for channel_name in ("important-announcements", "announcements"):
        channel = discord.utils.get(server.text_channels, name=channel_name)
        if channel:
            return channel, channel_name
    if server.public_updates_channel:
        return server.public_updates_channel, "public updates"
    fallback = server.system_channel
    if fallback is None and server.text_channels:
        # Guarded: indexing an empty text_channels list raised IndexError.
        fallback = server.text_channels[0]
    if fallback:
        return fallback, "default"
    return None, None


@bot.hybrid_command(help="Send messages to specific channels or users")
async def say(ctx, destination_type, id, *, message):
    """Send *message* to a channel, a server or a user (owner only).

    destination_type: 'channel', 'server' or 'user' (case-insensitive).
    id: numeric id of the destination.
    message: text to deliver; consumes the rest of the command line.

    `id` is a plain positional parameter: discord.py gives "consume rest"
    behaviour to the first keyword-only parameter and stops parsing there,
    so with `*, id, message` the message was never filled in.
    """
    if str(ctx.author.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    async def send_reply(reply):
        """Report the outcome to the invoker in a stamped embed."""
        embed = discord.Embed(title="Message Sent", color=0x00ff00)
        embed.add_field(name="Status", value=reply, inline=False)
        await ctx.reply(embed=_stamp_embed(embed, ctx))

    destination = destination_type.lower()
    if destination not in ('channel', 'server', 'user'):
        await send_reply("Invalid destination type. Use either 'channel', 'server', or 'user'.")
        return

    try:
        target_id = int(id)
    except ValueError:
        # Yields "Channel ID ...", "Server ID ..." or "User ID ...".
        await send_reply(f"{destination.capitalize()} ID must be a valid integer.")
        return

    try:
        if destination == 'channel':
            channel = bot.get_channel(target_id)
            if not channel:
                await send_reply("Channel not found.")
                return
            await channel.send(message)
            await send_reply(f"Message `{message}` sent to channel `{channel.name}`.")
        elif destination == 'server':
            server = bot.get_guild(target_id)
            if not server:
                await send_reply("Server not found.")
                return
            channel, kind = _pick_announcement_channel(server)
            if channel is None:
                await send_reply("No announcement channels found in the server. Also, no default `public updates` neither `default` channels found.")
                return
            await channel.send(message)
            where = "the default channel" if kind == "default" else f"`{kind}` channel"
            await send_reply(f"Message `{message}` sent to server `{server.name}` in {where}.")
        else:
            user = bot.get_user(target_id)
            if not user:
                await send_reply("User not found.")
                return
            try:
                await user.send(message)
            except discord.Forbidden:
                await send_reply(f"Couldn't send message to `{user.display_name}`. Their DMs are closed.")
                return
            await send_reply(f"Message `{message}` sent to user '{user.display_name}'.")
    except discord.Forbidden:
        # The bot lacks permission to post in the selected channel.
        await send_reply("I don't have permission to send messages there.")


# FUNCTION ------------------------------------------------------ SPLIT
@bot.hybrid_command(help="Send announcements to all servers")
async def announce(ctx, *, message):
    """Post *message* as an embed in every server the bot is in (owner only).

    The target channel of each server is chosen by
    _pick_announcement_channel. The invoker gets one status reply per
    server; a failure in one server does not stop the others.
    """
    if str(ctx.author.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    embed = discord.Embed(title="Announcement!", description=message, color=discord.Color.yellow())
    _stamp_embed(embed, ctx)

    for server in bot.guilds:
        channel, kind = _pick_announcement_channel(server)
        if channel is None:
            await ctx.reply(f"No suitable announcement channel found in server `{server.name}`.")
            continue
        try:
            await channel.send(embed=embed)
        except discord.HTTPException as e:
            # Forbidden is an HTTPException; report it and keep going.
            await ctx.reply(f"Failed to send the announcement to server `{server.name}`: {e}")
            continue
        where = "`defaults` channel" if kind == "default" else f"`{kind}` channel"
        await ctx.reply(f"Message `{message}` sent to server `{server.name}` in {where}.")


# FUNCTION ------------------------------------------------------ SPLIT
@bot.hybrid_command(help="Show statistics about bot's health and connection.")
async def stats(ctx):
    """Show server/member counts, uptime, latency, CPU and memory usage (owner only)."""
    if str(ctx.author.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    msg = await ctx.reply("Fetching statistics...")

    guild_count = len(bot.guilds)
    member_count = len(set(bot.get_all_members()))

    # Uptime is measured from activity_start_time, which on_ready sets.
    elapsed = int(time.time() - activity_start_time)
    uptime_minutes, uptime_seconds = divmod(elapsed, 60)
    uptime_hours, uptime_minutes = divmod(uptime_minutes, 60)
    uptime_days, uptime_hours = divmod(uptime_hours, 24)

    latency = round(bot.latency * 1000)  # seconds -> milliseconds
    cpu_usage = psutil.cpu_percent(interval=None)
    memory_usage = psutil.virtual_memory().percent

    embed = discord.Embed(title="Bot Statistics & Vitals", color=discord.Color.blue())
    embed.add_field(name="Servers", value=f"`{guild_count}`", inline=True)
    embed.add_field(name="Members", value=f"`{member_count}`", inline=True)
    embed.add_field(
        name="Uptime",
        value=f"`{uptime_days} days`, `{uptime_hours} hours`, `{uptime_minutes} minutes`, `{uptime_seconds} seconds`",
        inline=False,
    )
    embed.add_field(name="Latency", value=f"`{latency}ms`", inline=True)
    embed.add_field(name="CPU Usage", value=f"`{cpu_usage}%`", inline=True)
    embed.add_field(name="Memory Usage", value=f"`{memory_usage}%`", inline=True)

    await msg.edit(content=None, embed=_stamp_embed(embed, ctx))


# FUNCTION ------------------------------------------------------ SPLIT
def _truncate(text, limit):
    """Return str(text) shortened to at most *limit* characters."""
    text = str(text)
    return text if len(text) <= limit else text[:limit - 3] + "..."


async def _reply_with_fields(ctx, title, color, fields):
    """Reply with (name, value) fields, 25 per embed.

    Discord rejects embeds with more than 25 fields, field names longer
    than 256 characters or values longer than 1024, so long lists are split
    over several replies and oversized texts are shortened. An empty
    *fields* list still produces one (empty) embed.
    """
    page_size = 25
    for start in range(0, max(len(fields), 1), page_size):
        embed = discord.Embed(title=title, color=color)
        for name, value in fields[start:start + page_size]:
            embed.add_field(name=_truncate(name, 256), value=_truncate(value, 1024), inline=False)
        await ctx.reply(embed=_stamp_embed(embed, ctx))


async def _list_feedback(ctx, folder_path, title):
    """Reply with every feedback entry stored as JSON under *folder_path*.

    Each file is expected to hold a list of objects with the keys
    'author_name', 'author_id', 'timestamp' and 'message'.
    """
    fields = []
    for file_name in sorted(os.listdir(folder_path)):
        file_path = os.path.join(folder_path, file_name)
        if not os.path.isfile(file_path):
            continue
        with open(file_path, "r") as f:
            feedback_data = json.load(f)
        for feedback in feedback_data:
            fields.append((
                f"Author: {feedback['author_name']} (ID: {feedback['author_id']})",
                f"Timestamp: {feedback['timestamp']}\nMessage: {feedback['message']}",
            ))

    if fields:
        await _reply_with_fields(ctx, title, discord.Color.yellow(), fields)
    else:
        await ctx.reply("No feedback found.")


# The command keeps its Discord name "list", but the Python function is named
# list_command so the module-level builtin `list` is no longer overwritten.
@bot.hybrid_command(name="list", help="List issues, requests, or servers")
async def list_command(ctx, list_type):
    """List the bot's servers or the stored feedback (owner only).

    list_type: 'servers', 'issues' or 'requests' (case-insensitive).
    """
    if str(ctx.author.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    requested = list_type.lower()
    if requested == 'servers':
        fields = []
        for server in bot.guilds:
            joined_at = server.me.joined_at
            # joined_at can be missing; avoid calling strftime on None.
            joined_date = joined_at.strftime("%Y-%m-%d") if joined_at else "unknown"
            fields.append((server.name, f"Joined ({joined_date})\nID: {server.id}"))
        await _reply_with_fields(ctx, "Servers List", discord.Color.green(), fields)
    elif requested == 'issues':
        await _list_feedback(ctx, ISSUES_FOLDER, "List of issues:")
    elif requested == 'requests':
        await _list_feedback(ctx, REQUESTS_FOLDER, "List of requests:")
    else:
        await ctx.reply("Invalid list type. Please choose 'servers', 'issues', or 'requests'.")


# FUNCTION ------------------------------------------------------ SPLIT
@bot.hybrid_command(help="Update a Python package using pip.")
async def update(ctx, pkg_name):
    """Upgrade *pkg_name* with pip and report the result (owner only).

    pip is run as `<this interpreter> -m pip` so the package lands in the
    environment the bot runs in. Success is decided by pip's exit code;
    pip also writes warnings to stderr on successful runs.
    """
    import sys  # local import: only this command needs it

    if str(ctx.author.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    msg = await ctx.reply(f"Updating package `{pkg_name}`...")
    command = [sys.executable, '-m', 'pip', 'install', pkg_name, '--upgrade']

    try:
        # pip can run for a long time; run it in a worker thread so the
        # event loop (heartbeats, other commands) keeps running.
        loop = asyncio.get_running_loop()
        completed = await loop.run_in_executor(
            None,
            lambda: subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE),
        )
    except Exception as e:
        embed = discord.Embed(
            title="An Error Occurred",
            description=f"An error occurred while updating package `{pkg_name}`: {e}",
            color=discord.Color.red(),
        )
    else:
        if completed.returncode != 0:
            error_text = completed.stderr.decode('utf-8', errors='replace')
            # Keep the tail of the output: embed descriptions are limited
            # to 4096 characters and pip prints the real error last.
            embed = discord.Embed(
                title="Package Update Failed",
                description=f"Failed to update package `{pkg_name}`.\nError: {error_text[-3500:]}",
                color=discord.Color.red(),
            )
        else:
            embed = discord.Embed(
                title="Package Update Successful",
                description=f"Package `{pkg_name}` successfully updated.",
                color=discord.Color.green(),
            )

    await msg.edit(embed=_stamp_embed(embed, ctx))


# FUNCTION ------------------------------------------------------ SPLIT
@bot.hybrid_command(help="Shows the IP address of the current server.")
async def ip(ctx):
    """Report the public IP address of the machine running the bot (owner only).

    The address is fetched from the ipify API.
    """
    if str(ctx.author.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    msg = await ctx.reply("Fetching IP address...")

    try:
        # requests is blocking: run it in a worker thread, with a timeout so
        # a dead connection cannot hang the command forever.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: requests.get('https://api.ipify.org', timeout=10),
        )
    except requests.RequestException as e:
        embed = discord.Embed(
            title="An Error Occurred",
            description=f"An error occurred while fetching IP address: {e}",
            color=discord.Color.red(),
        )
    else:
        if response.status_code == 200:
            embed = discord.Embed(
                title="IP Address",
                description=f"Your IP address is: `{response.text}`",
                color=discord.Color.green(),
            )
        else:
            embed = discord.Embed(
                title="Failed to Fetch IP Address",
                description="Failed to fetch IP address.",
                color=discord.Color.red(),
            )

    await msg.edit(embed=_stamp_embed(embed, ctx))


# UTILITY ADMIN_CMD
# ------------------------------------------------------ END
# UTILITY MODERATOR_CMD ------------------------------------------------------ START
# FUNCTION ------------------------------------------------------ SPLIT


def _stamp_embed(embed, ctx):
    """Attach the "Requested by ..." footer and the invocation timestamp.

    Returns the same embed so the call can be used inline.
    """
    # display_avatar falls back to the default avatar, so neither a None
    # check nor the discord.Embed.Empty sentinel (gone in discord.py 2.0)
    # is needed here.
    embed.set_footer(
        text=f"Requested by {ctx.author.display_name}",
        icon_url=ctx.author.display_avatar.url,
    )
    embed.timestamp = ctx.message.created_at
    return embed


def _outranks(ctx, member):
    """Return True when the invoker may act on *member*.

    The guild owner always may; anyone else needs a strictly higher top
    role. Without the owner shortcut an owner holding no roles would fail
    the position comparison against every member.
    """
    if ctx.author == ctx.guild.owner:
        return True
    return ctx.author.top_role.position > member.top_role.position


async def _dm_then_remove(ctx, member, reason, *, verb, past, allowed, remove):
    """Shared flow of kick and ban.

    verb / past: "kick"/"kicked" or "ban"/"banned", used in the messages.
    allowed: whether the invoker holds the matching guild permission.
    remove: coroutine function performing the removal, called as
        remove(reason=reason).
    """
    if not (allowed or ctx.author == ctx.guild.owner):
        await ctx.reply("You do not have permission to use this command.")
        return
    if not _outranks(ctx, member):
        await ctx.reply(f"You do not have permission to {verb} this member.")
        return

    # DM first: once removed, the member may no longer share a server with
    # the bot and could not be messaged.
    dm_message = f"You have been {past} from the server `{ctx.guild.name}`"
    if reason:
        dm_message += f" for the following reason: `{reason}`"
    try:
        await member.send(dm_message)
    except discord.HTTPException:
        # Closed DMs (Forbidden) or any other delivery failure must not
        # stop the moderation action itself.
        await ctx.reply(f"Failed to send a direct message to {member}.")

    try:
        await remove(reason=reason)
    except discord.Forbidden:
        await ctx.reply(f"I don't have permission to {verb} {member}.")
        return

    embed = discord.Embed(
        title=f"Member {past.capitalize()}",
        description=f"{member} has been {past} from the server.",
        color=discord.Color.green(),
    )
    await ctx.reply(embed=_stamp_embed(embed, ctx))


@bot.hybrid_command(help="Kick a member from the server.")
async def kick(ctx, member: discord.Member, *, reason=None):
    """Kick *member*, optionally telling them the *reason* by DM.

    The invoker needs the kick_members permission (or to be the guild
    owner) and must outrank the member.
    """
    if ctx.guild is None:
        # In DMs ctx.author is a User without guild_permissions.
        await ctx.reply("This command can only be used in a server.")
        return
    await _dm_then_remove(
        ctx, member, reason,
        verb="kick", past="kicked",
        allowed=ctx.author.guild_permissions.kick_members,
        remove=member.kick,
    )


# FUNCTION ------------------------------------------------------ SPLIT
@bot.hybrid_command(help="Ban a member from the server.")
async def ban(ctx, member: discord.Member, *, reason=None):
    """Ban *member*, optionally telling them the *reason* by DM.

    The invoker needs the ban_members permission (or to be the guild
    owner) and must outrank the member.
    """
    if ctx.guild is None:
        await ctx.reply("This command can only be used in a server.")
        return
    await _dm_then_remove(
        ctx, member, reason,
        verb="ban", past="banned",
        allowed=ctx.author.guild_permissions.ban_members,
        remove=member.ban,
    )


# FUNCTION ------------------------------------------------------ SPLIT
@bot.hybrid_command(help="Timeout a member for a specified duration.")
async def timeout(ctx, member: discord.Member, duration: int, *, reason=None):
    """Strip *member*'s roles for *duration* seconds, then give them back.

    The invoker needs the manage_roles permission (or to be the guild
    owner) and must outrank the member.

    NOTE: the restore step runs inside this command's coroutine, so roles
    are not restored if the bot restarts during the wait.
    """
    if ctx.guild is None:
        await ctx.reply("This command can only be used in a server.")
        return
    if not (ctx.author.guild_permissions.manage_roles or ctx.author == ctx.guild.owner):
        await ctx.reply("You do not have permission to use this command.")
        return
    if not _outranks(ctx, member):
        await ctx.reply("You do not have permission to timeout this member.")
        return
    if duration <= 0:
        await ctx.reply("Timeout duration must be a positive number of seconds.")
        return

    # Snapshot the roles BEFORE removing them. Reading member.roles after
    # the edit gives the already-stripped list, which is why restoring from
    # it gave nothing back. @everyone is implicit and is left out.
    saved_roles = [role for role in member.roles if not role.is_default()]

    try:
        await member.edit(roles=[], reason=reason)
    except discord.HTTPException as e:
        embed = discord.Embed(
            title="Error",
            description=f"An error occurred while timing out `{member}`: {e}",
            color=discord.Color.red(),
        )
        await ctx.reply(embed=_stamp_embed(embed, ctx))
        return

    embed = discord.Embed(
        title="Member Timed Out",
        description=f"`{member}` has been timed out for `{duration} seconds`.\nReason: `{reason}`",
        color=discord.Color.green(),
    )
    await ctx.reply(embed=_stamp_embed(embed, ctx))

    dm_message = f"You have been timed out in the server `{ctx.guild.name}` for `{duration} seconds`"
    if reason:
        dm_message += f" for the following reason: `{reason}`"
    try:
        await member.send(dm_message)
    except discord.HTTPException:
        # A failed DM must not abort the command: the roles are already
        # removed and still have to be restored below.
        await ctx.reply(f"Failed to send a direct message to {member}.")

    await asyncio.sleep(duration)

    try:
        await member.edit(roles=saved_roles, reason=reason)
    except discord.HTTPException as e:
        print(f"An error occurred while restoring roles to `{member}`: {e}")
        return

    expiry_message = f"Your timeout in the server `{ctx.guild.name}` for `{duration} seconds`"
    if reason:
        expiry_message += f" for the following reason: `{reason}`"
    # Appended unconditionally; it used to be dropped when no reason was given.
    expiry_message += " has expired."
    try:
        await member.send(expiry_message)
    except discord.Forbidden:
        embed = discord.Embed(
            title="Error",
            description=f"Failed to send DM to `{member}` to notify timeout expiration.",
            color=discord.Color.red(),
        )
        await ctx.reply(embed=_stamp_embed(embed, ctx))


# FUNCTION ------------------------------------------------------ SPLIT
@bot.hybrid_command(help="Bulk deletes set amount of messages in guilds.")
async def purge(ctx, amount: int):
    """Delete up to *amount* messages sent before the command in this channel.

    Only works in servers. The invoker needs the manage_roles permission
    or must be the guild owner.
    """
    if ctx.guild is None or isinstance(ctx.channel, discord.DMChannel):
        embed = discord.Embed(
            title="Purge Error",
            description="Purging is not supported in direct messages.",
            color=discord.Color.red(),
        )
        await ctx.reply(embed=embed)
        return

    if not (ctx.author.guild_permissions.manage_roles or ctx.author == ctx.guild.owner):
        embed = discord.Embed(
            title="Permission Denied",
            description="You do not have permission to use this command.",
            color=discord.Color.red(),
        )
        # Reply directly: no progress message exists yet on this path
        # (editing it raised UnboundLocalError).
        await ctx.reply(embed=_stamp_embed(embed, ctx))
        return

    if amount < 1:
        embed = discord.Embed(
            title="Purge Error",
            description="The amount of messages to purge must be at least 1.",
            color=discord.Color.red(),
        )
        await ctx.reply(embed=_stamp_embed(embed, ctx))
        return

    msg = await ctx.reply(f"Purging `{amount}` messages...")

    try:
        # before=ctx.message keeps the command and the progress reply.
        deleted = await ctx.channel.purge(limit=amount, before=ctx.message)
    except discord.Forbidden:
        embed = discord.Embed(
            title="Purge Error",
            description="I don't have permission to delete messages in this channel.",
            color=discord.Color.red(),
        )
    except discord.HTTPException:
        embed = discord.Embed(
            title="Purge Error",
            description="Failed to purge messages due to an error.",
            color=discord.Color.red(),
        )
    except Exception as e:
        embed = discord.Embed(
            title="Error",
            description=f"An error occurred: {e}",
            color=discord.Color.red(),
        )
    else:
        # Report what was really deleted, which can be fewer than requested.
        embed = discord.Embed(
            title="Purge Successful",
            description=f"`{len(deleted)}` messages purged successfully.",
            color=discord.Color.green(),
        )

    # Replace the progress text with the result embed.
    await msg.edit(content="", embed=_stamp_embed(embed, ctx))


# UTILITY MODERATOR_CMD ------------------------------------------------------ END

# RUN PYDVPL ------------------------------------------------------ START
bot.run(TOKEN)
# RUN PYDVPL ------------------------------------------------------ END