Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import json | |
| import asyncio | |
| import psutil | |
| import discord | |
| import requests | |
| import socket | |
| import random | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| from discord.ext import commands, tasks | |
| from pydvpl.dvpl import compress_dvpl, decompress_dvpl, __LZ4_VERSION__ | |
| import pydvpl.version | |
| import pytgpt.phind as phind | |
| import pytgpt.opengpt as opengpt | |
| import pytgpt.blackboxai as blackboxai | |
| import subprocess | |
| import pytgpt | |
| from craiyon import Craiyon, craiyon_utils | |
| from io import BytesIO | |
| import base64 | |
| import craiyon | |
# ENV_LOADER ------------------------------------------------------ START
# Load variables from a local .env file before any os.getenv() lookup runs.
load_dotenv()
# ENV_LOADER ------------------------------------------------------ END
# DEFINES ------------------------------------------------------ START
BOT_VERSION = '1.4.5'

# Values read from the environment. os.getenv() yields a string, or None
# when the variable is not set.
TOKEN = os.getenv('DISCORD_TOKEN')
AUTHORIZED_USER_ID = os.getenv('AUTHORIZED_ID')
AUTHORIZED_AI_ACCESS_ID = os.getenv('AUTHORIZED_AI_ID')
MY_SERVER_ID = os.getenv('MY_SERVER_ID')
LOG_CHANNEL_ID = os.getenv('CHANNEL_ID')
FEEDBACK_CHANNEL_ID = os.getenv('FEEDBACK_CHANNEL_ID')

UPDATE_INTERVAL = 60
# 0 means "not recorded yet"; on_ready stores time.time() here.
activity_start_time = 0

# Folder layout used for feedback storage and for uploaded files.
FEEDBACK_FOLDER = "feedback"
ISSUES_FOLDER = os.path.join(FEEDBACK_FOLDER, "issues")
REQUESTS_FOLDER = os.path.join(FEEDBACK_FOLDER, "requests")
COMPRESS_FOLDER = "received_to_compress"
DECOMPRESS_FOLDER = "received_to_decompress"
# DEFINES ------------------------------------------------------ END
| # HELPER ------------------------------------------------------ START | |
| # FUNCTION ------------------------------------------------------ SPLIT | |
| # Ensure the existence of folders | |
def ensure_folders():
    """Create every working directory the bot reads from or writes to.

    Covers the feedback folder, its ``issues`` and ``requests`` children,
    and the two upload folders used by the compress/decompress commands.
    """
    for folder in (FEEDBACK_FOLDER, ISSUES_FOLDER, REQUESTS_FOLDER, COMPRESS_FOLDER, DECOMPRESS_FOLDER):
        # exist_ok=True replaces the exists()-then-makedirs() pair, which
        # could raise FileExistsError if the folder appeared in between.
        os.makedirs(folder, exist_ok=True)
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def clean_directory(ctx, directory):
    """Delete every regular file directly inside *directory* and report the result.

    Sub-directories are left untouched. Progress and outcome are reported by
    editing a "Cleaning..." reply sent to *ctx*.

    Args:
        ctx: Invocation context; must provide an awaitable ``reply``.
        directory: Path of the folder to empty.
    """
    # Sent before the try block: if the reply itself fails there is no status
    # message to edit, and the original error must propagate instead of being
    # masked by an unbound ``msg`` inside the handler.
    msg = await ctx.reply("Cleaning...")
    file_count = 0
    try:
        for filename in os.listdir(directory):
            file_path = os.path.join(directory, filename)
            if os.path.isfile(file_path):
                os.remove(file_path)
                file_count += 1
    except OSError as e:
        await msg.edit(content=f"An error occurred while cleaning files: {e}")
        return
    await msg.edit(content=f"All files cleaned successfully in {directory}. Total files cleaned: {file_count}.")
| # HELPER ------------------------------------------------------ END | |
# CORE_PREFIX ------------------------------------------------------ START
# Request every gateway intent.
bot_intents = discord.Intents.all()
# Create the bot; text commands are invoked with the ">>" prefix.
bot = commands.Bot(intents=bot_intents, command_prefix=">>")
# CORE_PREFIX ------------------------------------------------------ END
| # CORE ------------------------------------------------------ START | |
async def on_ready():
    """Initialise runtime state once the client reports it is ready.

    discord.py may dispatch ``on_ready`` more than once per process (for
    example after a reconnect), so every step here is safe to repeat.
    """
    global activity_start_time
    # Make sure the working folders exist before anything can use them.
    ensure_folders()
    # Record the start time only once; 0 is the "not recorded yet" sentinel,
    # and resetting it on a reconnect would make the reported uptime jump back.
    if activity_start_time == 0:
        activity_start_time = time.time()
    # Set initial activity to playing time
    await update_playing_time_activity()
    # Starting a loop that is already running raises RuntimeError.
    if not update_playing_time.is_running():
        update_playing_time.start()
    await bot.tree.sync()
    print(f'{bot.user} has connected to Discord!')
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def on_disconnect():
    """Log a gateway disconnect and try to mark the bot invisible.

    The presence update is best effort: it is attempted while the
    connection is reported as down, so a failure is logged instead of
    being raised out of the event handler.
    """
    print('Bot disconnected from Discord. Attempting to reconnect...')
    try:
        await bot.change_presence(status=discord.Status.invisible)
    except Exception as exc:  # event-handler boundary: log and carry on
        print(f'Could not update presence while disconnected: {exc}')
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def on_guild_join(guild):
    """Post a welcome embed when the bot is added to *guild*.

    The target is the first available of: the guild's public updates
    channel, its system channel, its first text channel. Nothing is sent
    when the guild has none of these.
    """
    embed = discord.Embed(
        title="Thank You for Adding PyDVPL Bot! 💛",
        description="We're excited to be a part of your server. If you have any questions or need assistance, feel free to reach out to us!",
        color=discord.Color.yellow()
    )
    embed.add_field(name="Feedback Commands:", value="`feedback` - Sends feedback to bot developers. (usage: >>feedback issues/requests <message>)", inline=False)
    embed.add_field(name="Help Commands:", value="Do `/help` or `>>help` to begin with PyDVPL!", inline=False)

    # Built lazily as one candidate list so a guild without any text channel
    # no longer fails on text_channels[0].
    candidates = [guild.public_updates_channel, guild.system_channel, *guild.text_channels]
    target = next((channel for channel in candidates if channel is not None), None)
    if target is None:
        return
    try:
        await target.send(embed=embed)
    except discord.Forbidden:
        # The bot may lack permission to post in the chosen channel.
        print(f'Could not send the welcome message in {guild} ({guild.id}): missing permissions.')
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def on_command(ctx):
    """Report each command invocation to the configured log channel.

    Logging is skipped when ``LOG_CHANNEL_ID`` is unset or not an integer,
    or when the channel cannot be found, so a configuration problem never
    raises for every command.
    """
    try:
        channel_id = int(LOG_CHANNEL_ID)
    except (TypeError, ValueError):
        # TypeError: variable unset (None); ValueError: not a number.
        return
    log_channel = bot.get_channel(channel_id)
    if log_channel is None:
        return
    if ctx.guild:
        log_message = f"Command `{ctx.command}` used by `{ctx.author} ({ctx.author.id})` in `{ctx.guild} ({ctx.guild.id})`"
    else:
        log_message = f"Command `{ctx.command}` used by `{ctx.author} ({ctx.author.id})` in {ctx.me.mention}'s `DM`"
    await log_channel.send(log_message)
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def on_command_error(ctx, error):
    """Tell the user about unknown commands; print any other command error."""
    unknown_command = isinstance(error, commands.CommandNotFound)
    if not unknown_command:
        print(f'An error occurred: {error}')
        return
    await ctx.reply("Sorry, I couldn't find that command. Use `/help` or `>>help` to see the list of available commands.")
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def on_error(event, *args, **kwargs):
    """Log an exception raised inside an event handler and switch to DND.

    Args:
        event: Name of the event whose handler raised.
        *args: Positional arguments the failing event was dispatched with.
            These are the event's own arguments, not the exception, and
            are empty for events such as ``on_ready``.
        **kwargs: Keyword arguments of the failing event.
    """
    import sys

    # The active exception is obtained from sys.exc_info(); indexing args[0]
    # printed an unrelated value and raised IndexError for events that are
    # dispatched without arguments.
    exc = sys.exc_info()[1]
    print(f'An error occurred during event {event}: {exc}')
    try:
        await bot.change_presence(status=discord.Status.dnd)
    except Exception as presence_error:  # never let the error handler itself raise
        print(f'Could not switch presence to DND: {presence_error}')
| # CORE ------------------------------------------------------ END | |
| # ACTIVITY ------------------------------------------------------ START | |
async def update_playing_time_activity():
    """Publish the elapsed time since ``activity_start_time`` as a "Playing" status.

    The status text is also stored in the module-level
    ``playing_time_activity`` variable.
    """
    global playing_time_activity
    elapsed_seconds = int(time.time() - activity_start_time)
    # Only whole hours and minutes are shown; the seconds part is dropped.
    total_minutes = elapsed_seconds // 60
    hours, minutes = divmod(total_minutes, 60)
    playing_time_activity = f"/help or >>help for commands list | online for {hours}h {minutes}m"
    status = discord.Game(name=playing_time_activity)
    await bot.change_presence(activity=status)
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def update_playing_time():
    """Refresh the uptime status by delegating to ``update_playing_time_activity``.

    NOTE(review): ``on_ready`` calls ``update_playing_time.start()``, so this
    coroutine is presumably wrapped by a ``tasks.loop`` decorator that is not
    visible here - confirm.
    """
    await update_playing_time_activity()
| # ACTIVITY ------------------------------------------------------ END | |
| # PYDVPL ------------------------------------------------------ START | |
async def compress_file(file_path):
    """Compress *file_path* with ``compress_dvpl`` into ``<file_path>.dvpl``.

    Returns:
        The path of the written ``.dvpl`` file, or ``None`` when any step
        fails (the error is printed).
    """
    try:
        with open(file_path, "rb") as source:
            raw_bytes = source.read()
        packed_bytes = compress_dvpl(raw_bytes)
        # Output sits next to the input, with ".dvpl" appended to the name.
        output_path = file_path + ".dvpl"
        with open(output_path, "wb") as sink:
            sink.write(packed_bytes)
    except Exception as e:
        print(f"Compression failed: {e}")
        return None
    return output_path
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def decompress_file(file_path):
    """Decompress *file_path* with ``decompress_dvpl``.

    The output path is *file_path* with its last extension removed.

    Returns:
        The path of the written file, or ``None`` when any step fails
        (the error is printed).
    """
    try:
        with open(file_path, "rb") as source:
            packed_bytes = source.read()
        raw_bytes = decompress_dvpl(packed_bytes)
        # Strip the trailing extension (".dvpl") to get the output name.
        output_path, _extension = os.path.splitext(file_path)
        with open(output_path, "wb") as sink:
            sink.write(raw_bytes)
    except Exception as e:
        print(f"Decompression failed: {e}")
        return None
    return output_path
| # PYDVPL ------------------------------------------------------ END | |
| # UTILITY USER_CMD ------------------------------------------------------ START | |
# Drop the command currently registered under "help" so the custom one
# below can take that name.
bot.remove_command("help")


async def help(ctx):
    """Reply with an embed listing the bot's commands, grouped by audience.

    NOTE(review): the function name shadows the ``help`` builtin; it is kept
    because the command registration (not visible here) presumably derives
    the command name from it - confirm before renaming.
    """
    embed = discord.Embed(title="Command List", description="Here's a list of available commands:", color=discord.Color.blue())
    # (field name, field text, inline) in display order.
    sections = (
        ("PyDVPL Prefix:", "`/` - Discord's slash command prefix.\n`>>` - PyDVPL default prefix.", True),
        ("Dvpl Commands:", "`compress` - Compresses files.\n`decompress` - Decompresses files.", False),
        # The coinflip entry previously had its closing backtick at the end of
        # the sentence, so the whole line rendered as code.
        ("User Utility Commands:", "`ping` - Checks the bot's latency.\n`invite` - Get the bot's invite link.\n`uptime` - Displays the bot's uptime.\n`clock` - Shows the current date and time.\n`chat` - Chat with AI LLM models like 'phind', 'chatgpt' & 'blackboxai'.\n`coinflip` - Plays a coinflip game with the bot", False),
        ("Premium User Commands:", "`chat` - Chat with AI LLM models like 'phind', 'chatgpt' & 'blackboxai'.\n`image` - Generate images with 'craiyon' AI.", False),
        ("Moderator Commands:", "`kick` - Kick a member from the server.\n`ban` - Ban a member from the server.\n`timeout` - Timeout a member for a specified duration.\n`purge` - Bulk deletes set amount of messages in guilds.", False),
        ("Admin Utility Commands:", "`stats` - Displays bot statistics.\n`list` - Lists server members.\n`say` - Make the bot say something.\n`announce` - Announces a message to a channel.\n`activity` - Sets bot activity status.\n`online` - Sets bot status to online.\n`dnd` - Sets bot status to Do Not Disturb.\n`offline` - Sets bot status to offline.\n`clean` - Cleans up bot's messages.\n`update` - Update a Python package using pip.\n`ip` - Shows the IP address of the current server.", False),
        ("Feedback Commands:", "`feedback` - Sends feedback to bot developers. (usage: >>feedback issues/requests <message>)", False),
    )
    for name, value, inline in sections:
        embed.add_field(name=name, value=value, inline=inline)
    # display_avatar always yields an asset (it falls back to the default
    # avatar), so no Embed.Empty fallback is needed; that constant no longer
    # exists in discord.py 2.x.
    embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
    embed.timestamp = ctx.message.created_at
    # Send the embed as an ephemeral message, visible only to the command invoker
    await ctx.reply(embed=embed, ephemeral=True)
| #await ctx.reply(f"The command `help` was executed by - {ctx.author.mention}") | |
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def compress(ctx, attachments: discord.Attachment):
    """Compress each attached file to ``.dvpl`` and reply with the result.

    Files already ending in ``.dvpl`` are skipped. Each upload is saved to
    ``COMPRESS_FOLDER``, compressed, sent back, and then removed from disk
    whether or not the operation succeeded.

    Args:
        ctx: Invocation context.
        attachments: Attachment supplied through the command parameter. It is
            used only when the invoking message carries no attachments
            (presumably the slash-command path - confirm against the command
            registration, which is not visible here).
    """
    uploads = ctx.message.attachments or ([attachments] if attachments else [])
    if not uploads:
        await ctx.reply("No file attached.")
        return

    def build_embed(title, description, color):
        # Every status embed shares the same footer and timestamp.
        embed = discord.Embed(title=title, description=description, color=color)
        embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
        embed.timestamp = ctx.message.created_at
        return embed

    for attachment in uploads:
        # basename(): an uploaded name must never point outside the work folder.
        file_path = os.path.join(COMPRESS_FOLDER, os.path.basename(attachment.filename))
        msg = await ctx.reply("Compressing...")
        # Check if the file is already in .dvpl format, if so, skip it
        if file_path.endswith(".dvpl"):
            await msg.edit(content=None, embed=build_embed(
                "Compression Skipped",
                f"File {attachment.filename} is already in .dvpl format.",
                discord.Color.gold(),
            ))
            continue
        compressed_file_path = None
        try:
            await attachment.save(file_path)
            compressed_file_path = await compress_file(file_path)
            if compressed_file_path:
                await msg.edit(content=None, embed=build_embed(
                    "Compression Successful",
                    f"File {attachment.filename} has been successfully compressed.",
                    discord.Color.green(),
                ))
                await ctx.reply(file=discord.File(compressed_file_path))
            else:
                await msg.edit(content=None, embed=build_embed(
                    "Compression Failed",
                    "Failed to compress the file.",
                    discord.Color.red(),
                ))
        finally:
            # Clean up on every path; previously a failed compression or a
            # failed upload left the files behind in the work folder.
            for leftover in (file_path, compressed_file_path):
                if leftover:
                    try:
                        os.remove(leftover)
                    except FileNotFoundError:
                        pass  # nothing was written for this path
async def decompress(ctx, attachments: discord.Attachment):
    """Decompress each attached ``.dvpl`` file and reply with the result.

    Files not ending in ``.dvpl`` are skipped. Each upload is saved to
    ``DECOMPRESS_FOLDER``, decompressed, sent back, and then removed from
    disk whether or not the operation succeeded.

    Args:
        ctx: Invocation context.
        attachments: Attachment supplied through the command parameter. It is
            used only when the invoking message carries no attachments
            (presumably the slash-command path - confirm against the command
            registration, which is not visible here).
    """
    uploads = ctx.message.attachments or ([attachments] if attachments else [])
    if not uploads:
        await ctx.reply("No file attached.")
        return

    def build_embed(title, description, color):
        # Every status embed shares the same footer and timestamp.
        embed = discord.Embed(title=title, description=description, color=color)
        embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
        embed.timestamp = ctx.message.created_at
        return embed

    for attachment in uploads:
        # basename(): an uploaded name must never point outside the work folder.
        file_path = os.path.join(DECOMPRESS_FOLDER, os.path.basename(attachment.filename))
        msg = await ctx.reply("Decompressing...")
        # Check if the file is not in .dvpl format, if so, skip it
        if not file_path.endswith(".dvpl"):
            await msg.edit(content=None, embed=build_embed(
                "Decompression Skipped",
                f"File {attachment.filename} is not in .dvpl format.",
                discord.Color.gold(),
            ))
            continue
        decompressed_file_path = None
        try:
            await attachment.save(file_path)
            decompressed_file_path = await decompress_file(file_path)
            if decompressed_file_path:
                await msg.edit(content=None, embed=build_embed(
                    "Decompression Successful",
                    f"File {attachment.filename} has been successfully decompressed.",
                    discord.Color.green(),
                ))
                await ctx.reply(file=discord.File(decompressed_file_path))
            else:
                await msg.edit(content=None, embed=build_embed(
                    "Decompression Failed",
                    "Failed to decompress the file.",
                    discord.Color.red(),
                ))
        finally:
            # Clean up on every path; previously a failed decompression or a
            # failed upload left the files behind in the work folder.
            for leftover in (file_path, decompressed_file_path):
                if leftover:
                    try:
                        os.remove(leftover)
                    except FileNotFoundError:
                        pass  # nothing was written for this path
| #await ctx.reply(f"The command `decompress` was executed by - {ctx.author.mention}") | |
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def ping(ctx):
    """Report the gateway latency and the time taken to send a reply."""
    sent_at = time.monotonic()
    msg = await ctx.reply("Pinging...")
    # bot.latency is in seconds; both figures are shown in milliseconds.
    ws_latency = bot.latency * 1000
    elapsed_time = (time.monotonic() - sent_at) * 1000
    summary = f"WebSocket latency: `{ws_latency:.2f}ms`\nResponse time: `{elapsed_time:.2f}ms`"
    embed = discord.Embed(title="Pong! 🏓", description=summary, color=discord.Color.blurple())
    avatar = ctx.author.display_avatar
    embed.set_footer(
        text=f"Requested by {ctx.author.display_name}",
        icon_url=avatar.url if avatar else discord.Embed.Empty,
    )
    embed.timestamp = ctx.message.created_at
    await msg.edit(content=None, embed=embed)
| #await ctx.reply(f"The command `ping` was executed by - {ctx.author.mention}") | |
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def pinger(ctx, target):
    """Resolve *target* and time a TCP connection to its port 80.

    The result (or the failure reason) is shown by editing a "Pinging..."
    reply.

    NOTE(review): *target* is user supplied, so this command lets any caller
    probe hosts reachable from the machine running the bot. Consider
    restricting who may use it.
    """
    def probe():
        # Blocking DNS + connect; executed in a worker thread (see below).
        ip_address = socket.gethostbyname(target)
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(5)
            started = time.monotonic()
            s.connect((ip_address, 80))
            rtt = (time.monotonic() - started) * 1000  # milliseconds
        return ip_address, rtt

    # Sent before any lookup so every handler below has a message to edit;
    # previously a DNS failure hit the handler with ``msg`` still unbound.
    msg = await ctx.reply("Pinging...")
    try:
        # Keep the event loop responsive during the lookup and the connect
        # attempt, which can block for up to the 5 second timeout.
        ip_address, rtt = await asyncio.get_running_loop().run_in_executor(None, probe)
    except socket.gaierror:
        embed = discord.Embed(
            title="Ping Result",
            description=f"Error: Unable to resolve {target} to an IP address. 🔴",
            color=discord.Color.red()
        )
    except OSError:
        # socket.error is an alias of OSError; covers timeouts and refusals.
        embed = discord.Embed(
            title="Ping Result",
            description=f"Error: {target} is unreachable. 🔴",
            color=discord.Color.red()
        )
    else:
        embed = discord.Embed(
            title="Ping Result",
            color=discord.Color.green()
        )
        embed.add_field(name="Target", value=target, inline=False)
        embed.add_field(name="IP Address", value=ip_address, inline=False)
        embed.add_field(name="Status", value="Successful 🟢", inline=False)
        embed.add_field(name="Latency", value=f"{rtt:.2f}ms", inline=False)
    embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
    embed.timestamp = ctx.message.created_at
    await msg.edit(content=None, embed=embed)
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def invite(ctx):
    """DM the caller a button linking to the bot's invite URL.

    A confirmation embed is posted in the invoking channel; if the DM is
    refused (``discord.Forbidden``) an error embed is posted instead.
    """
    def stamped(embed):
        # Shared footer/timestamp decoration for all three embeds.
        avatar = ctx.author.display_avatar
        embed.set_footer(
            text=f"Requested by {ctx.author.display_name}",
            icon_url=avatar.url if avatar else discord.Embed.Empty,
        )
        embed.timestamp = ctx.message.created_at
        return embed

    # Permissions requested by the generated invite URL.
    requested = discord.Permissions(
        read_messages=True,
        send_messages=True,
        embed_links=True,
        attach_files=True,
        read_message_history=True,
        mention_everyone=True,
        add_reactions=True,
        manage_guild=True,
        ban_members=True,
        kick_members=True,
        moderate_members=True,
        use_application_commands=True,
        manage_roles=True,
        administrator=True
    )
    invite_link = discord.utils.oauth_url(bot.user.id, permissions=requested, scopes=['bot'])

    dm_embed = stamped(discord.Embed(
        title="Invite Link",
        description="Click the button below to invite PyDVPL to your server",
        color=discord.Color.blue()
    ))
    # A link-style button inside a view carries the URL.
    view = discord.ui.View()
    view.add_item(discord.ui.Button(
        style=discord.ButtonStyle.link,
        label="Invite PyDVPL",
        url=invite_link
    ))

    try:
        await ctx.author.send(embed=dm_embed, view=view)
        confirmation = stamped(discord.Embed(
            title="Invite Link Sent",
            description=f"The invite link has been sent to your DM, {ctx.author.mention}.",
            color=discord.Color.green()
        ))
        await ctx.reply(embed=confirmation)
    except discord.Forbidden:
        failure = stamped(discord.Embed(
            title="Invite Link Sending Failed",
            description=f"Failed to send the invite link to your DM, {ctx.author.mention}. Please make sure your DMs are open.",
            color=discord.Color.red()
        ))
        await ctx.reply(embed=failure)
| #await ctx.reply(f"The command `invite` was executed by - {ctx.author.mention}") | |
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def feedback(ctx, category: str, *, message: str):
    """Store a feedback entry on disk and forward it to the feedback channel.

    Args:
        ctx: Invocation context.
        category: ``"requests"`` or ``"issues"`` (case-insensitive).
        message: Free-form feedback text.

    The entry is appended to ``<category>.json`` inside the matching
    feedback folder. The user is thanked once the entry is written; the
    forward to the channel identified by ``FEEDBACK_CHANNEL_ID`` happens
    afterwards and is skipped (with a console message) if that channel
    cannot be determined.
    """
    kind = category.lower()
    if kind not in ("requests", "issues"):
        await ctx.reply("Invalid category. Please choose 'requests' or 'issues'.")
        return
    feedback_data = {
        "author_id": ctx.author.id,
        "author_name": ctx.author.name,
        "timestamp": str(ctx.message.created_at),
        "message": message
    }
    folder_path = REQUESTS_FOLDER if kind == "requests" else ISSUES_FOLDER
    file_path = os.path.join(folder_path, f"{kind}.json")
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        data = []
    except json.JSONDecodeError as e:
        # An empty or damaged store must not make the command fail.
        print(f"Feedback store {file_path} is not valid JSON ({e}); starting a new list.")
        data = []
    if not isinstance(data, list):
        print(f"Feedback store {file_path} does not contain a list; starting a new list.")
        data = []
    data.append(feedback_data)
    # The folder may have been removed since start-up.
    os.makedirs(folder_path, exist_ok=True)
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4)
    await ctx.reply(f"Thank you for your {kind}. It has been recorded.")

    embed = discord.Embed(
        title=f"New {kind} feedback",
        description=f"**Author Name:** {feedback_data['author_name']}\n**Timestamp:** {feedback_data['timestamp']}\n**Message:** {feedback_data['message']}",
        color=discord.Color.yellow()
    )
    embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
    embed.timestamp = ctx.message.created_at

    try:
        feedback_channel = bot.get_channel(int(FEEDBACK_CHANNEL_ID))
    except (TypeError, ValueError):
        # TypeError: variable unset (None); ValueError: not a number.
        feedback_channel = None
    if feedback_channel is None:
        print("Feedback channel not found in the server.")
        return
    await feedback_channel.send(embed=embed)
| #await ctx.reply(f"The command `feedback` was executed by - {ctx.author.mention}") | |
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def uptime(ctx):
    """Reply with the time elapsed since ``activity_start_time`` was recorded."""
    bot_mention = ctx.me.mention
    # 0 is the initial value, meaning no start time has been stored yet.
    if activity_start_time == 0:
        await ctx.reply(f"{bot_mention}'s activity hasn't been set yet.")
        return
    elapsed = int(time.time() - activity_start_time)
    whole_minutes, seconds = divmod(elapsed, 60)
    hours, minutes = divmod(whole_minutes, 60)
    embed = discord.Embed(
        title="Bot Uptime",
        description=f"{bot_mention} has been playing for:",
        color=discord.Color.blue()
    )
    for label, amount in (("Hours", hours), ("Minutes", minutes), ("Seconds", seconds)):
        embed.add_field(name=label, value=amount)
    avatar = ctx.author.display_avatar
    embed.set_footer(
        text=f"Requested by {ctx.author.display_name}",
        icon_url=avatar.url if avatar else discord.Embed.Empty,
    )
    embed.timestamp = ctx.message.created_at
    await ctx.reply(embed=embed)
| #await ctx.reply(f"The command `uptime` was executed by - {ctx.author.mention}") | |
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def version(ctx):
    """Reply with an embed listing the bot version and its library versions."""
    def as_code_lines(entries):
        # One fenced "name: value" snippet per entry, joined by newlines.
        return "\n".join(f"```{name}: {value}```" for name, value in entries.items())

    core = {"PyDVPL Bot Version": f"{BOT_VERSION}"}
    libraries = {
        "PyDVPL Lib Version": f"{pydvpl.version.__version__}",
        "LZ4 PY Lib Version": f"{__LZ4_VERSION__}",
        "Discord PY Version": f"{discord.__version__}",
        "PyTGPT Lib Version": f"{pytgpt.__version__}",
        "Craiyon Lib Version": f"{craiyon.__version__}",
    }

    embed = discord.Embed(
        title="Bot Version",
        description="The current version of the bot and its dependencies.",
        color=discord.Color.gold()
    )
    embed.add_field(name="**CORE**", value=as_code_lines(core), inline=False)
    embed.add_field(name="**DEPENDENCIES**", value=as_code_lines(libraries), inline=False)
    avatar = ctx.author.display_avatar
    embed.set_footer(
        text=f"Requested by {ctx.author.display_name}",
        icon_url=avatar.url if avatar else discord.Embed.Empty
    )
    embed.timestamp = ctx.message.created_at
    await ctx.reply(embed=embed)
| #await ctx.reply(f"The command `version` was executed by - {ctx.author.mention}") | |
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def clock(ctx):
    """Reply with the current date and time as returned by ``datetime.now()``."""
    # No timezone is passed to now(), so the value is naive local time.
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    embed = discord.Embed(
        title="Current Date and Time",
        description=f"```\n{stamp}\n```",
        color=discord.Color.blue()
    )
    avatar = ctx.author.display_avatar
    embed.set_footer(
        text=f"Requested by {ctx.author.display_name}",
        icon_url=avatar.url if avatar else discord.Embed.Empty,
    )
    embed.timestamp = ctx.message.created_at
    await ctx.reply(embed=embed)
| #await ctx.reply(f"The command `clock` was executed by - {ctx.author.mention}") | |
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def chat(ctx, model, *, message):
    """Send *message* to the chosen AI provider and show the answer in embeds.

    Args:
        ctx: Invocation context.
        model: ``"phind"``, ``"chatgpt"`` or ``"blackboxai"`` (case-insensitive).
        message: Prompt forwarded to the provider.

    Only users whose ID appears in ``AUTHORIZED_AI_ACCESS_ID`` may use the
    command.
    """
    import re

    # Compare against whole IDs. A plain ``in`` test on the raw string is a
    # substring match (a short ID would match inside a longer one) and raises
    # TypeError when the variable is unset. Extracting digit runs works for
    # any separator used in the variable.
    allowed_ids = set(re.findall(r"\d+", AUTHORIZED_AI_ACCESS_ID or ""))
    if str(ctx.author.id) not in allowed_ids:
        await ctx.reply("You are not authorized to use this command. Only `@rifsxd`'s server boosters & special permitted users can enjoy free `ChatGPT` access.")
        return

    providers = {
        "phind": phind.PHIND,
        "chatgpt": opengpt.OPENGPT,
        "blackboxai": blackboxai.BLACKBOXAI,
    }
    # Named ``make_provider`` rather than ``bot`` so the module-level bot
    # object is not shadowed inside this function.
    make_provider = providers.get(model.lower())
    if make_provider is None:
        # Validated before the progress message so no "Responding..." is left dangling.
        await ctx.reply("Invalid model. Please choose 'phind' or 'chatgpt' or 'blackboxai'.")
        return

    msg = await ctx.reply("Responding...")
    try:
        # The provider call is blocking; run it in a worker thread so the
        # event loop keeps running while waiting for the answer.
        response = await asyncio.get_running_loop().run_in_executor(
            None, lambda: make_provider().chat(message)
        )
    except Exception as e:  # provider/network failure: report instead of hanging
        print(f"An error occurred while contacting the AI model: {e}")
        await msg.edit(content="An error occurred while contacting the AI model. Please try again later.")
        return

    # Embed field values are limited to 1024 characters, so splitting a
    # >2000 character answer into two fields was always rejected. Use the
    # description (4096 characters) and one embed per chunk instead.
    limit = 4096
    chunks = [response[start:start + limit] for start in range(0, len(response), limit)] or [response]
    for index, chunk in enumerate(chunks):
        embed = discord.Embed(
            title="Response" if index == 0 else "Response (continued)",
            description=chunk,
            color=discord.Color.green()
        )
        embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
        embed.timestamp = ctx.message.created_at
        if index == 0:
            await msg.edit(content=None, embed=embed)
        else:
            await ctx.send(embed=embed)
| #await ctx.reply(f"The command `chat` was executed by - {ctx.author.mention}") | |
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def image(ctx, *, prompt: str):
    """Generate images for *prompt* with Craiyon and attach them to the reply.

    Only users whose ID appears in ``AUTHORIZED_AI_ACCESS_ID`` may use the
    command. Any failure during generation or upload is printed and
    reported to the user with a generic message.
    """
    import re

    # Whole-ID comparison; a plain ``in`` test on the raw string matches
    # substrings and raises TypeError when the variable is unset.
    allowed_ids = set(re.findall(r"\d+", AUTHORIZED_AI_ACCESS_ID or ""))
    if str(ctx.author.id) not in allowed_ids:
        await ctx.reply("You are not authorized to use this command.")
        return
    # Inform the user that the generation process has started
    msg = await ctx.reply(f"Generating images for prompt: `{prompt}`. Please be patient...")
    try:
        generator = Craiyon()
        generated_images = await generator.async_generate(prompt)
        # Encode images to base64
        b64_list = await craiyon_utils.async_encode_base64(generated_images.images)
        files = [
            discord.File(BytesIO(base64.b64decode(image_b64)), filename=f"result{index}.webp")
            for index, image_b64 in enumerate(b64_list)
        ]
        # A single message accepts at most 10 files.
        files = files[:10]
        embed = discord.Embed(
            title="Generated Images",
            description=f"Successfully generated images for prompt: `{prompt}`.",
            color=discord.Color.green()
        )
        # discord.File has no ``url`` attribute, so the previous per-image
        # link fields raised AttributeError on every run and nothing was ever
        # uploaded. The files are attached to the edited message instead.
        embed.add_field(name="Images", value=f"{len(files)} attached")
        embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
        embed.timestamp = ctx.message.created_at
        await msg.edit(content=None, embed=embed, attachments=files)
    except Exception as e:
        # Print the specific exception message for debugging
        print(f"An error occurred while generating images: {e}")
        await ctx.reply("An error occurred while generating images. Please try again later.")
| #await ctx.reply(f"The command `image` was executed by - {ctx.author.mention}") | |
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def coinflip(ctx):
    """Play a coin flip: the caller picks a side by reacting, then the bot flips.

    The caller has 30 seconds to react with one of the two emojis on the
    game message. After a short countdown a side is chosen at random and
    the winner is shown in the embed footer.
    """
    emojis = ["🐈", "😸"]  # Add more emojis if needed
    embed = discord.Embed(title="Coin Flip", description="React with 🐈 for tails or 😸 for heads!", color=discord.Colour.yellow())
    embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
    embed.timestamp = ctx.message.created_at
    flip_msg = await ctx.send(embed=embed)
    # Add reactions for heads and tails
    for emoji in emojis:
        await flip_msg.add_reaction(emoji)

    def check(reaction, user):
        # The reaction must be on this game's message; without that test a
        # matching reaction by the caller on any other message was accepted.
        return (
            user == ctx.author
            and reaction.message.id == flip_msg.id
            and str(reaction.emoji) in emojis
        )

    try:
        reaction, _ = await bot.wait_for('reaction_add', timeout=30.0, check=check)
    except asyncio.TimeoutError:
        await ctx.send("You didn't react in time. Aborting...")
        return
    # Normalised to str so it compares cleanly with the random pick below.
    user_choice = str(reaction.emoji)

    await asyncio.sleep(1)
    for i in range(3, 0, -1):
        embed.description = f"Flipping the coin in {i}..."
        await flip_msg.edit(embed=embed)
        await asyncio.sleep(1)

    # Randomly choose the result
    result = random.choice(emojis)
    embed.description = f"The result is **{result}**!"
    winner = ctx.author.display_name if result == user_choice else "The bot"
    embed.set_footer(text=f"Winner: {winner}", icon_url=ctx.author.display_avatar.url)
    await flip_msg.edit(embed=embed)
| # UTILITY USER_CMD ------------------------------------------------------ END | |
| # UTILITY ADMIN_CMD ------------------------------------------------------ START | |
async def clean(ctx, clean_mode):
    """Empty one or more of the bot's working folders (authorized user only).

    Args:
        ctx: Command context of the invocation.
        clean_mode: One of 'compress', 'decompress', 'issues', 'requests',
            'feedback' (requests + issues) or 'dvpl' (decompress + compress);
            matched case-insensitively.
    """
    # Check if the author is authorized to use this command
    if str(ctx.author.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    # mode -> (folders to clean, in order; label shown in the embed)
    clean_targets = {
        'compress': ((COMPRESS_FOLDER,), "Compress Folder Cleaning"),
        'decompress': ((DECOMPRESS_FOLDER,), "Decompress Folder Cleaning"),
        'issues': ((ISSUES_FOLDER,), "Issues Folder Cleaning"),
        'requests': ((REQUESTS_FOLDER,), "Requests Folder Cleaning"),
        'feedback': ((REQUESTS_FOLDER, ISSUES_FOLDER), "Feedback Folders Cleaning"),
        'dvpl': ((DECOMPRESS_FOLDER, COMPRESS_FOLDER), "DVPL Folders Cleaning"),
    }

    target = clean_targets.get(clean_mode.lower())
    if target is None:
        # Red embed for the error case; the message lists every accepted mode.
        embed = discord.Embed(title="File Cleaning Command", color=0xff0000)
        embed.add_field(
            name="Error",
            value="Invalid clean mode. Please choose 'compress', 'decompress', 'issues', 'requests', 'feedback', or 'dvpl'.",
            inline=False,
        )
    else:
        folders, action_label = target
        for folder in folders:
            await clean_directory(ctx, folder)
        embed = discord.Embed(title="File Cleaning Command", color=0x00ff00)
        embed.add_field(name="Action", value=action_label, inline=False)
        embed.add_field(name="Status", value="Cleaning completed successfully.", inline=False)

    embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
    embed.timestamp = ctx.message.created_at
    await ctx.reply(embed=embed)
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def online(ctx):
    """Give the bot a "playing" activity and confirm with an embed.

    Restricted to the user whose ID matches AUTHORIZED_USER_ID; everyone
    else receives a refusal reply.
    """
    requester = ctx.author
    if str(requester.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    # Placeholder activity text (the real value is expected to be defined elsewhere).
    playing_time_activity = "Your default activity here"
    await bot.change_presence(activity=discord.Game(name=playing_time_activity))

    # Green confirmation embed sent back to the requester.
    footer_icon = requester.display_avatar.url if requester.display_avatar else discord.Embed.Empty
    confirmation = discord.Embed(title="Bot Presence Update", color=0x00ff00)
    confirmation.add_field(
        name="Status",
        value=f"Bot's presence set to online with activity: {playing_time_activity}",
        inline=False,
    )
    confirmation.set_footer(text=f"Requested by {requester.display_name}", icon_url=footer_icon)
    confirmation.timestamp = ctx.message.created_at
    await ctx.reply(embed=confirmation)
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def dnd(ctx):
    """Switch the bot's status to Do Not Disturb and confirm with an embed.

    Restricted to the user whose ID matches AUTHORIZED_USER_ID.
    """
    requester = ctx.author
    if str(requester.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    await bot.change_presence(status=discord.Status.dnd)

    # Red confirmation embed sent back to the requester.
    footer_icon = requester.display_avatar.url if requester.display_avatar else discord.Embed.Empty
    confirmation = discord.Embed(title="Bot Presence Update", color=0xff0000)
    confirmation.add_field(
        name="Status",
        value="Bot's presence set to Do Not Disturb",
        inline=False,
    )
    confirmation.set_footer(text=f"Requested by {requester.display_name}", icon_url=footer_icon)
    confirmation.timestamp = ctx.message.created_at
    await ctx.reply(embed=confirmation)
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def offline(ctx):
    """Switch the bot's status to offline and confirm with an embed.

    Restricted to the user whose ID matches AUTHORIZED_USER_ID.
    """
    requester = ctx.author
    if str(requester.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    await bot.change_presence(status=discord.Status.offline)

    # Black confirmation embed sent back to the requester.
    footer_icon = requester.display_avatar.url if requester.display_avatar else discord.Embed.Empty
    confirmation = discord.Embed(title="Bot Presence Update", color=0x000000)
    confirmation.add_field(
        name="Status",
        value="Bot's presence set to offline",
        inline=False,
    )
    confirmation.set_footer(text=f"Requested by {requester.display_name}", icon_url=footer_icon)
    confirmation.timestamp = ctx.message.created_at
    await ctx.reply(embed=confirmation)
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def activity(ctx, activity_type, *, activity_text=None):
    """Set the bot's activity to "listening", "watching" or "playing" some text.

    Restricted to the user whose ID matches AUTHORIZED_USER_ID. An unknown
    activity type or a missing text is answered with a red error embed.
    """
    requester = ctx.author
    if str(requester.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    def stamp(embed):
        # Shared footer + timestamp used by every embed this command sends.
        embed.set_footer(
            text=f"Requested by {requester.display_name}",
            icon_url=requester.display_avatar.url if requester.display_avatar else discord.Embed.Empty,
        )
        embed.timestamp = ctx.message.created_at
        return embed

    async def reject(title, detail):
        # Red error embed with a single "Error" field.
        failure = discord.Embed(title=title, color=0xff0000)
        failure.add_field(name="Error", value=detail, inline=False)
        await ctx.reply(embed=stamp(failure))

    # Accepted activity keywords and the ActivityType each one maps to.
    kinds = {
        'listening': discord.ActivityType.listening,
        'watching': discord.ActivityType.watching,
        'playing': discord.ActivityType.playing,
    }
    chosen = kinds.get(activity_type.lower())
    if chosen is None:
        await reject(
            "Invalid Activity Type",
            'Invalid activity type. Please choose either "listening", "playing" or "watching".',
        )
        return
    if not activity_text:
        await reject("Activity Text Missing", 'Please provide an activity text.')
        return

    presence = discord.Activity(type=chosen, name=activity_text)
    await bot.change_presence(activity=presence)

    # Green confirmation embed.
    confirmation = discord.Embed(title="Bot Activity Update", color=0x00ff00)
    confirmation.add_field(
        name="Status",
        value=f'Activity set to {activity_type.capitalize()} {activity_text}',
        inline=False,
    )
    await ctx.reply(embed=stamp(confirmation))
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def say(ctx, destination_type, *, id, message):
    """Send a plain-text message to a channel, a server or a user (authorized user only).

    Args:
        ctx: Command context of the invocation.
        destination_type: 'channel', 'server' or 'user' (case-insensitive).
        id: ID of the destination; must be convertible to int.
        message: Text to send.

    For 'server', the message goes to the first channel found among:
    a channel named "important-announcements", one named "announcements",
    the guild's public updates channel, its system channel, and finally its
    first text channel. The outcome is reported back as an embed reply.
    """
    # Check if the author is authorized to use this command
    if str(ctx.author.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    async def send_reply(reply):
        # Report the outcome to the invoker as an embed.
        embed = discord.Embed(title="Message Sent", color=0x00ff00)
        embed.add_field(name="Status", value=reply, inline=False)
        embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
        embed.timestamp = ctx.message.created_at
        await ctx.reply(embed=embed)

    target_kind = destination_type.lower()
    if target_kind not in ('channel', 'server', 'user'):
        await send_reply("Invalid destination type. Use either 'channel', 'server', or 'user'.")
        return

    # Only the int() conversion is guarded, so unrelated ValueErrors are not
    # misreported as a bad ID.
    try:
        target_id = int(id)
    except (TypeError, ValueError):
        await send_reply(f"{target_kind.capitalize()} ID must be a valid integer.")
        return

    if target_kind == 'channel':
        channel = bot.get_channel(target_id)
        if not channel:
            await send_reply("Channel not found.")
            return
        await channel.send(message)
        await send_reply(f"Message `{message}` sent to channel `{channel.name}`.")
        return

    if target_kind == 'server':
        server = bot.get_guild(target_id)
        if not server:
            await send_reply("Server not found.")
            return
        # Candidate channels in priority order. next(..., None) avoids the
        # IndexError that text_channels[0] raised for guilds without any
        # text channel, so the "nothing found" reply is actually reachable.
        candidates = (
            (discord.utils.get(server.channels, name="important-announcements"), "`important-announcements` channel"),
            (discord.utils.get(server.channels, name="announcements"), "`announcements` channel"),
            (server.public_updates_channel, "`public updates` channel"),
            (server.system_channel or next(iter(server.text_channels), None), "the default channel"),
        )
        for target, label in candidates:
            if target:
                await target.send(message)
                await send_reply(f"Message `{message}` sent to server `{server.name}` in {label}.")
                return
        await send_reply("No announcement channels found in the server. Also, no default `public updates` neither `default` channels found.")
        return

    # target_kind == 'user'
    user = bot.get_user(target_id)
    if not user:
        await send_reply("User not found.")
        return
    try:
        await user.send(message)
    except discord.Forbidden:
        await send_reply(f"Couldn't send message to `{user.display_name}`. Their DMs are closed.")
    else:
        await send_reply(f"Message `{message}` sent to user '{user.display_name}'.")
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def announce(ctx, *, message):
    """Broadcast an announcement embed to every server the bot is in (authorized user only).

    Args:
        ctx: Command context of the invocation.
        message: Announcement text, used as the embed description.

    Per server, the embed goes to the first channel found among: a channel
    named "important-announcements", one named "announcements", the guild's
    public updates channel, its system channel, and finally its first text
    channel. One reply per server reports where it was sent, that no
    channel was found, or that sending failed.
    """
    # Check if the author is authorized to use this command
    if str(ctx.author.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    # The embed is identical for every server, so build it once.
    embed = discord.Embed(title="Announcement!", description=message, color=discord.Color.yellow())
    embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
    embed.timestamp = ctx.message.created_at

    for server in bot.guilds:
        # Candidate channels in priority order. next(..., None) avoids the
        # IndexError that text_channels[0] raised for guilds without any
        # text channel.
        candidates = (
            (discord.utils.get(server.channels, name="important-announcements"), "`important-announcements`"),
            (discord.utils.get(server.channels, name="announcements"), "`announcements`"),
            (server.public_updates_channel, "`public updates`"),
            (server.system_channel or next(iter(server.text_channels), None), "`defaults`"),
        )
        target, label = next(((ch, lb) for ch, lb in candidates if ch), (None, None))
        if target is None:
            await ctx.reply(f"No suitable announcement channel found in server `{server.name}`.")
            continue

        try:
            await target.send(embed=embed)
        except discord.HTTPException as exc:
            # A failure in one server (e.g. missing permission) must not
            # stop the broadcast to the remaining servers.
            await ctx.reply(f"Failed to send announcement to server `{server.name}`: {exc}")
            continue
        await ctx.reply(f"Message `{message}` sent to server `{server.name}` in {label} channel.")
| # Delete the original command message | |
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def stats(ctx):
    """Reply with an embed of bot statistics: servers, members, uptime, latency, CPU and memory.

    Restricted to the user whose ID matches AUTHORIZED_USER_ID. A
    placeholder reply is sent first and then edited into the final embed.
    """
    requester = ctx.author
    if str(requester.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    placeholder = await ctx.reply("Fetching statistics...")

    # Bot-level counters.
    guild_count = len(bot.guilds)
    member_count = len(set(bot.get_all_members()))

    # Uptime is measured from the module-level activity_start_time.
    elapsed = int(time.time() - activity_start_time)
    minutes, seconds = divmod(elapsed, 60)
    hours, minutes = divmod(minutes, 60)
    days, hours = divmod(hours, 24)

    latency = round(bot.latency * 1000)  # milliseconds

    # Host vitals.
    cpu_usage = psutil.cpu_percent(interval=None)
    memory_usage = psutil.virtual_memory().percent

    # (field name, field value, inline) in display order.
    rows = (
        ("Servers", f"`{guild_count}`", True),
        ("Members", f"`{member_count}`", True),
        ("Uptime", f"`{days} days`, `{hours} hours`, `{minutes} minutes`, `{seconds} seconds`", False),
        ("Latency", f"`{latency}ms`", True),
        ("CPU Usage", f"`{cpu_usage}%`", True),
        ("Memory Usage", f"`{memory_usage}%`", True),
    )
    report = discord.Embed(title="Bot Statistics & Vitals", color=discord.Color.blue())
    for field_name, field_value, is_inline in rows:
        report.add_field(name=field_name, value=field_value, inline=is_inline)
    report.set_footer(
        text=f"Requested by {requester.display_name}",
        icon_url=requester.display_avatar.url if requester.display_avatar else discord.Embed.Empty,
    )
    report.timestamp = ctx.message.created_at

    # Swap the placeholder text for the finished embed.
    await placeholder.edit(content=None, embed=report)
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def list(ctx, list_type):
    """List the bot's servers or the stored feedback entries (authorized user only).

    Args:
        ctx: Command context of the invocation.
        list_type: 'servers', 'issues' or 'requests' (case-insensitive).

    NOTE(review): this function shadows the builtin ``list`` at module
    level; the name is kept because callers elsewhere depend on it.
    """
    # Check if the author is authorized to use this command
    if str(ctx.author.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    def stamp(embed):
        # display_avatar always resolves to an asset (falling back to the
        # default avatar). The "servers" branch previously used
        # `avatar`, which is None for default avatars, and then touched
        # discord.Embed.Empty, which no longer exists in discord.py 2.x.
        embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
        embed.timestamp = ctx.message.created_at
        return embed

    async def list_feedback(folder_path, title):
        # Each file in the folder holds a JSON array of feedback records.
        feedback_embed = discord.Embed(title=title, color=discord.Color.yellow())
        for file in os.listdir(folder_path):
            file_path = os.path.join(folder_path, file)
            with open(file_path, "r") as f:
                feedback_data = json.load(f)
            for feedback in feedback_data:
                feedback_embed.add_field(
                    name=f"Author: {feedback['author_name']} (ID: {feedback['author_id']})",
                    value=f"Timestamp: {feedback['timestamp']}\nMessage: {feedback['message']}",
                    inline=False,
                )
        if feedback_embed.fields:
            await ctx.reply(embed=stamp(feedback_embed))
        else:
            await ctx.reply("No feedback found.")

    mode = list_type.lower()
    if mode == 'servers':
        # One field per server: name, join date of the bot, and server ID.
        embed = discord.Embed(title="Servers List", color=discord.Color.green())
        for server in bot.guilds:
            joined_date = server.me.joined_at.strftime("%Y-%m-%d")
            embed.add_field(
                name=server.name,
                value=f"Joined ({joined_date})\nID: {server.id}",
                inline=False,
            )
        await ctx.reply(embed=stamp(embed))
    elif mode == 'issues':
        await list_feedback(ISSUES_FOLDER, "List of issues:")
    elif mode == 'requests':
        await list_feedback(REQUESTS_FOLDER, "List of requests:")
    else:
        await ctx.reply("Invalid list type. Please choose 'servers', 'issues', or 'requests'.")
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def update(ctx, pkg_name):
    """Upgrade a Python package with pip and report the outcome (authorized user only).

    Args:
        ctx: Command context of the invocation.
        pkg_name: Name of the package to upgrade.

    Success is decided by pip's exit code. pip also writes warnings and
    notices to stderr on successful runs, so non-empty stderr alone is not
    treated as a failure.
    """
    import sys  # local import: `sys` is not among the file's visible imports

    # Check if the author is authorized to use this command
    if str(ctx.author.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    # Sent before the try block so `msg` is always bound in the handlers below.
    msg = await ctx.reply(f"Updating package `{pkg_name}`...")

    try:
        # Run pip for the interpreter that is running the bot, without
        # blocking the event loop while it works.
        process = await asyncio.create_subprocess_exec(
            sys.executable, '-m', 'pip', 'install', pkg_name, '--upgrade',
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, error = await process.communicate()
    except Exception as e:
        # Could not even run pip (e.g. the process failed to start).
        embed = discord.Embed(title="An Error Occurred", description=f"An error occurred while updating package `{pkg_name}`: {e}", color=discord.Color.red())
        await msg.edit(embed=embed)
        return

    if process.returncode != 0:
        error_text = error.decode('utf-8', errors='replace').strip()
        # Embed descriptions are length-limited; keep the tail, where pip
        # prints the actual error.
        if len(error_text) > 3500:
            error_text = "..." + error_text[-3500:]
        embed = discord.Embed(title="Package Update Failed", description=f"Failed to update package `{pkg_name}`.\nError: {error_text}", color=discord.Color.red())
    else:
        embed = discord.Embed(title="Package Update Successful", description=f"Package `{pkg_name}` successfully updated.", color=discord.Color.green())

    embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
    embed.timestamp = ctx.message.created_at
    await msg.edit(embed=embed)
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def ip(ctx):
    """Look up the public IP address via api.ipify.org and report it (authorized user only).

    Args:
        ctx: Command context of the invocation.
    """
    # Check if the author is authorized to use this command
    if str(ctx.author.id) != AUTHORIZED_USER_ID:
        await ctx.reply("You are not authorized to use this command.")
        return

    # Sent before the try block so `msg` is always bound afterwards.
    msg = await ctx.reply("Fetching IP address...")

    try:
        # requests is blocking: run it in the default executor so the event
        # loop keeps running, and bound the wait with a timeout.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, lambda: requests.get('https://api.ipify.org', timeout=10)
        )
    except requests.RequestException as e:
        embed = discord.Embed(title="An Error Occurred", description=f"An error occurred while fetching IP address: {e}", color=discord.Color.red())
    else:
        if response.status_code == 200:
            embed = discord.Embed(title="IP Address", description=f"Your IP address is: `{response.text}`", color=discord.Color.green())
        else:
            embed = discord.Embed(title="Failed to Fetch IP Address", description="Failed to fetch IP address.", color=discord.Color.red())

    embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
    embed.timestamp = ctx.message.created_at
    await msg.edit(embed=embed)
| # UTILITY ADMIN_CMD ------------------------------------------------------ END | |
| # UTILITY MODERATOR_CMD ------------------------------------------------------ START | |
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def kick(ctx, member: discord.Member, *, reason=None):
    """Kick a member after notifying them by DM, then confirm with an embed.

    The invoker needs the kick_members permission (or be the guild owner)
    and a top role positioned above the target's top role.
    """
    invoker = ctx.author

    # Guard: permission to use the command at all.
    allowed = invoker.guild_permissions.kick_members or invoker == ctx.guild.owner
    if not allowed:
        await ctx.reply("You do not have permission to use this command.")
        return

    # Guard: invoker must outrank the target.
    if invoker.top_role.position <= member.top_role.position:
        await ctx.reply("You do not have permission to kick this member.")
        return

    # Best-effort DM to the target before removing them.
    notice = f"You have been kicked from the server `{ctx.guild.name}`"
    if reason:
        notice += f" for the following reason: `{reason}`"
    try:
        await member.send(notice)
    except discord.Forbidden:
        await ctx.reply(f"Failed to send a direct message to {member}.")

    await member.kick(reason=reason)

    # Confirmation embed for the invoker.
    confirmation = discord.Embed(
        title="Member Kicked",
        description=f"{member} has been kicked from the server.",
        color=discord.Color.green(),
    )
    confirmation.set_footer(
        text=f"Requested by {invoker.display_name}",
        icon_url=invoker.display_avatar.url if invoker.display_avatar else discord.Embed.Empty,
    )
    confirmation.timestamp = ctx.message.created_at
    await ctx.reply(embed=confirmation)
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def ban(ctx, member: discord.Member, *, reason=None):
    """Ban a member after notifying them by DM, then confirm with an embed.

    The invoker needs the ban_members permission (or be the guild owner)
    and a top role positioned above the target's top role.
    """
    invoker = ctx.author

    # Guard: permission to use the command at all.
    allowed = invoker.guild_permissions.ban_members or invoker == ctx.guild.owner
    if not allowed:
        await ctx.reply("You do not have permission to use this command.")
        return

    # Guard: invoker must outrank the target.
    if invoker.top_role.position <= member.top_role.position:
        await ctx.reply("You do not have permission to ban this member.")
        return

    # Best-effort DM to the target before banning them.
    notice = f"You have been banned from the server `{ctx.guild.name}`"
    if reason:
        notice += f" for the following reason: `{reason}`"
    try:
        await member.send(notice)
    except discord.Forbidden:
        await ctx.reply(f"Failed to send a direct message to {member}.")

    await member.ban(reason=reason)

    # Confirmation embed for the invoker.
    confirmation = discord.Embed(
        title="Member Banned",
        description=f"{member} has been banned from the server.",
        color=discord.Color.green(),
    )
    confirmation.set_footer(
        text=f"Requested by {invoker.display_name}",
        icon_url=invoker.display_avatar.url if invoker.display_avatar else discord.Embed.Empty,
    )
    confirmation.timestamp = ctx.message.created_at
    await ctx.reply(embed=confirmation)
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def timeout(ctx, member: discord.Member, duration: int, *, reason=None):
    """Temporarily strip a member's roles, then restore them after `duration` seconds.

    Args:
        ctx: Command context of the invocation.
        member: Member to time out.
        duration: Length of the timeout in seconds.
        reason: Optional reason, passed to the role edits and included in DMs.

    The invoker needs the manage_roles permission (or be the guild owner)
    and a top role positioned above the target's top role. The coroutine
    sleeps for the whole duration before restoring the roles.

    NOTE(review): roles with special handling (e.g. integration-managed
    roles) may not be removable/restorable via a role edit; confirm
    against the discord.py `Member.edit` documentation.
    """
    if not (ctx.author.guild_permissions.manage_roles or ctx.author == ctx.guild.owner):
        await ctx.reply("You do not have permission to use this command.")
        return
    if ctx.author.top_role.position <= member.top_role.position:
        await ctx.reply("You do not have permission to timeout this member.")
        return

    def make_embed(title, description, color):
        # Shared embed layout (footer + timestamp) for every reply below.
        embed = discord.Embed(title=title, description=description, color=color)
        embed.set_footer(text=f"Requested by {ctx.author.display_name}", icon_url=ctx.author.display_avatar.url)
        embed.timestamp = ctx.message.created_at
        return embed

    # Snapshot the roles BEFORE removing them. Re-reading member.roles after
    # the edit only yields the already-emptied list, so nothing would ever
    # be restored. The default @everyone role is not an assignable role.
    saved_roles = [role for role in member.roles if not role.is_default()]

    try:
        # Timeout the member by removing all roles
        await member.edit(roles=[], reason=reason)
    except Exception as e:
        await ctx.reply(embed=make_embed("Error", f"An error occurred while timing out `{member}`: {e}", discord.Color.red()))
        return

    await ctx.reply(embed=make_embed(
        "Member Timed Out",
        f"`{member}` has been timed out for `{duration} seconds`.\nReason: `{reason}`",
        discord.Color.green(),
    ))

    # Notify the member. A failed DM must not abort the command: the roles
    # are already removed and still have to be restored below.
    dm_message = f"You have been timed out in the server `{ctx.guild.name}` for `{duration} seconds`"
    if reason:
        dm_message += f" for the following reason: `{reason}`"
    try:
        await member.send(dm_message)
    except discord.HTTPException:
        await ctx.reply(embed=make_embed("Error", f"Failed to send DM to `{member}` to notify about the timeout.", discord.Color.red()))

    # Wait for the duration of the timeout
    await asyncio.sleep(duration)

    # Restore the roles captured before the timeout
    try:
        await member.edit(roles=saved_roles, reason=reason)
    except discord.HTTPException as e:
        print(f"An error occurred while restoring roles to `{member}`: {e}")
        return

    # Inform the member that their timeout has expired; the closing phrase
    # is appended whether or not a reason was given.
    dm_message = f"Your timeout in the server `{ctx.guild.name}` for `{duration} seconds`"
    if reason:
        dm_message += f" for the following reason: `{reason}`"
    dm_message += " has expired."
    try:
        await member.send(dm_message)
    except discord.Forbidden:
        await ctx.reply(embed=make_embed("Error", f"Failed to send DM to `{member}` to notify timeout expiration.", discord.Color.red()))
| # FUNCTION ------------------------------------------------------ SPLIT | |
async def purge(ctx, amount: int):
    """Bulk-delete up to ``amount`` messages sent before the invoking command.

    The command is rejected in direct messages, for invokers who neither hold
    the ``manage_roles`` guild permission nor own the guild, and for
    non-positive amounts. Progress and the final outcome are reported by
    replying to the invoking message and then editing that reply.

    Args:
        ctx: Invocation context of the command.
        amount: Maximum number of messages to delete; must be at least 1.
    """
    # Bulk deletion needs a guild channel; direct messages are rejected up front.
    if isinstance(ctx.channel, discord.DMChannel):
        embed = discord.Embed(title="Purge Error", description="Purging is not supported in direct messages.", color=discord.Color.red())
        await ctx.reply(embed=embed)
        return

    def build_embed(title, description, color):
        # Every guild-side response carries the same "requested by" footer and
        # the timestamp of the invoking message.
        embed = discord.Embed(title=title, description=description, color=color)
        embed.set_footer(
            text=f"Requested by {ctx.author.display_name}",
            icon_url=ctx.author.display_avatar.url if ctx.author.display_avatar else discord.Embed.Empty,
        )
        embed.timestamp = ctx.message.created_at
        return embed

    # NOTE(review): the gate is manage_roles (or guild owner); manage_messages
    # would be the more conventional permission for purging - confirm intent.
    is_allowed = ctx.author.guild_permissions.manage_roles or ctx.author == ctx.guild.owner
    if not is_allowed:
        # No progress message exists on this path, so reply directly instead
        # of editing one.
        await ctx.reply(embed=build_embed("Permission Denied", "You do not have permission to use this command.", discord.Color.red()))
        return

    # A non-positive limit deletes nothing, so report it instead of claiming success.
    if amount < 1:
        await ctx.reply(embed=build_embed("Purge Error", "The number of messages to purge must be at least 1.", discord.Color.red()))
        return

    status = None  # progress message; stays None if the initial reply fails
    try:
        status = await ctx.reply(f"Purging `{amount}` messages...")
        try:
            # before=ctx.message restricts the purge to messages older than the command
            deleted = await ctx.channel.purge(limit=amount, before=ctx.message)
            # Report how many messages were actually removed, which may be
            # fewer than requested.
            outcome = build_embed("Purge Successful", f"`{len(deleted)}` messages purged successfully.", discord.Color.green())
        except discord.Forbidden:
            # The bot lacks permission to delete messages in this channel
            outcome = build_embed("Purge Error", "I don't have permission to delete messages in this channel.", discord.Color.red())
        except discord.HTTPException:
            # Any other API failure while deleting
            outcome = build_embed("Purge Error", "Failed to purge messages due to an error.", discord.Color.red())
        # Replace the progress text with the final embed
        await status.edit(content="", embed=outcome)
    except Exception as e:
        failure = build_embed("Error", f"An error occurred: {e}", discord.Color.red())
        if status is None:
            # The progress message was never sent, so there is nothing to edit
            await ctx.reply(embed=failure)
        else:
            await status.edit(content="", embed=failure)
| # UTILITY MODERATOR_CMD ------------------------------------------------------ END | |
| # RUN PYDVPL ------------------------------------------------------ START | |
# TOKEN is read from the DISCORD_TOKEN environment variable and is None when
# the variable is unset; stop with a clear message instead of attempting to
# log in without a token.
if not TOKEN:
    raise SystemExit("DISCORD_TOKEN is not set; cannot start the bot.")
bot.run(TOKEN)
| # RUN PYDVPL ------------------------------------------------------ END |