Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import discord | |
| from discord.ext import commands, tasks | |
| from collections import defaultdict | |
| import json | |
| import os | |
| from dotenv import load_dotenv | |
| import os | |
| from huggingface_hub import HfApi, HfFolder | |
| from datetime import datetime, timedelta | |
| import pytz | |
| HfFolder.save_token(os.getenv("HUGGING_FACE_TOKEN")) # Save the token for authentication | |
| # Calculate the sleep time until the next midnight in IST | |
| ist_timezone = pytz.timezone('Asia/Kolkata') | |
| current_time = datetime.now(ist_timezone) | |
| next_midnight = current_time.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) | |
| sleep_time = int((next_midnight - current_time).total_seconds()) # Convert to integer | |
| api = HfApi() | |
| api.set_space_sleep_time(repo_id="bhagatsuryainatom/Vesperal", sleep_time=sleep_time) | |
| load_dotenv() | |
| intents = discord.Intents.default() | |
| intents.messages = True | |
| intents.message_content = True | |
| intents.voice_states = True | |
| intents.reactions = True | |
| bot = commands.Bot(command_prefix='!', intents=intents) | |
| # Place your bot's token here | |
| TOKEN = os.getenv('TOKEN') | |
| YOUR_CHANNEL_ID= os.getenv('YOUR_CHANNEL_ID') | |
| # Initialize the sessions and pending_messages dictionaries | |
| sessions = {} | |
| pending_messages = {} | |
| class Session: | |
| def __init__(self): | |
| self.camera_required = False | |
| self.time_without_video = defaultdict(int) | |
| self.warnings_given = defaultdict(int) | |
| self.exemptions = set() | |
| self.instructions_sent = False # Flag to track if instructions have been sent | |
| def to_dict(self): | |
| # Convert sets of Member IDs to lists for JSON serialization | |
| return { | |
| "camera_required": self.camera_required, | |
| "time_without_video": {str(member_id): time for member_id, time in self.time_without_video.items()}, | |
| "warnings_given": {str(member_id): warnings for member_id, warnings in self.warnings_given.items()}, | |
| "exemptions": list(self.exemptions) # Assuming this already stores member IDs | |
| } | |
| def from_dict(cls, data): | |
| session = cls() | |
| session.camera_required = data.get("camera_required", False) | |
| session.time_without_video = defaultdict(int, data.get("time_without_video", {})) | |
| session.warnings_given = defaultdict(int, data.get("warnings_given", {})) | |
| session.exemptions = set(data.get("exemptions", [])) | |
| session.instructions_sent = data.get("instructions_sent", False) | |
| return session | |
| def save_sessions(): | |
| with open('sessions.json', 'w') as f: | |
| json.dump({str(vc_id): session.to_dict() for vc_id, session in sessions.items()}, f, ensure_ascii=False, indent=4) | |
| def correct_and_load_json(path): | |
| try: | |
| with open(path, 'r') as file: | |
| return json.load(file) | |
| except json.JSONDecodeError: | |
| print("JSON format error detected. Attempting to correct.") | |
| with open(path, 'r') as file: | |
| file_content = file.read() | |
| corrected_content = file_content.replace("'", '"') # Replace single quotes with double quotes | |
| # Implement other corrections here as needed | |
| with open(path, 'w') as file: | |
| file.write(corrected_content) | |
| with open(path, 'r') as file: | |
| return json.load(file) # Attempt to load corrected content | |
| def load_sessions(): | |
| if os.path.exists('sessions.json') and os.path.getsize('sessions.json') > 0: | |
| try: | |
| session_data = correct_and_load_json('sessions.json') | |
| for vc_id, data in session_data.items(): | |
| sessions[int(vc_id)] = Session.from_dict(data) | |
| except json.JSONDecodeError: | |
| print("Failed to correct JSON file. Please check the file format manually.") | |
| else: | |
| print("Session file not found or is empty, starting fresh.") | |
| async def on_ready(): | |
| load_sessions() | |
| print(f'Logged in as {bot.user.name}') | |
| check_camera_status.start() | |
| # Ensure save_sessions_task starts properly within the on_ready event | |
| if not save_sessions_task.is_running(): | |
| save_sessions_task.start() | |
| async def test(ctx): | |
| await ctx.send("Test successful!") | |
| async def check_camera_status(): | |
| for vc_id, session in sessions.items(): | |
| if not session.camera_required: | |
| continue | |
| vc = bot.get_channel(vc_id) | |
| if vc is None: | |
| continue | |
| for member in vc.members: | |
| if member.id in session.exemptions: | |
| continue | |
| if not member.voice.self_video: | |
| session.time_without_video[member] += 30 | |
| if session.time_without_video[member] == 30: | |
| session.warnings_given[member] = 1 | |
| await member.send("Warning 1: Please turn on your camera within the next 30 seconds.") | |
| elif session.time_without_video[member] >= 60: | |
| session.warnings_given[member] = 2 | |
| await member.send("Final Warning: You are being removed for not turning on your camera.") | |
| await member.move_to(None) | |
| # Reset the member's session data | |
| session.time_without_video.pop(member, None) | |
| session.warnings_given.pop(member, None) | |
| else: | |
| # Reset the member's session data if their camera is on | |
| session.time_without_video.pop(member, None) | |
| session.warnings_given.pop(member, None) | |
| # Save sessions every 5 minutes | |
| async def save_sessions_task(): | |
| save_sessions() | |
| print("Sessions saved.") | |
| async def before_save_sessions(): | |
| await bot.wait_until_ready() | |
| async def on_voice_state_update(member, before, after): | |
| if member.bot: | |
| return | |
| if after.channel and (not before.channel or before.channel.id != after.channel.id): | |
| session = sessions.get(after.channel.id) | |
| if session is None: | |
| session = Session() | |
| sessions[after.channel.id] = session | |
| if any(role.name == "Admin" for role in member.roles): | |
| other_admins_in_channel = [m for m in after.channel.members if m != member and any(role.name == "Admin" for role in m.roles)] | |
| if not other_admins_in_channel: | |
| # Replace "YOUR_CHANNEL_ID" with your actual general channel ID | |
| general_channel_id = YOUR_CHANNEL_ID | |
| general_channel = member.guild.get_channel(int(general_channel_id)) | |
| if not general_channel: | |
| print(f"Couldn't find the channel with ID: {general_channel_id}.") | |
| return | |
| message_content = f"Admin {member.mention}, do you require cameras to be ON for this session in {after.channel.name}? React with 📷 for YES or ❌ for NO." | |
| try: | |
| message = await general_channel.send(message_content) | |
| pending_messages[message.id] = after.channel.id | |
| await message.add_reaction("📷") | |
| await message.add_reaction("❌") | |
| except discord.DiscordException as e: | |
| print(f"Failed to send message or add reactions due to: {e}") | |
| async def setperms(ctx, channel: discord.TextChannel = None): | |
| channel = channel or ctx.channel | |
| if not channel.permissions_for(ctx.guild.me).manage_channels: | |
| await ctx.send("I don't have permission to manage channels here.") | |
| return | |
| bot_member = ctx.guild.get_member(bot.user.id) | |
| overwrites = { | |
| ctx.guild.default_role: discord.PermissionOverwrite(read_messages=False), | |
| bot_member: discord.PermissionOverwrite(read_messages=True, send_messages=True, manage_messages=True, add_reactions=True) | |
| } | |
| try: | |
| await channel.set_permissions(ctx.guild.default_role, read_messages=False) | |
| await channel.set_permissions(bot_member, **overwrites) | |
| await ctx.send(f"Permissions set for {channel.mention} successfully!") | |
| except Exception as e: | |
| await ctx.send(f"Failed to set permissions for {channel.mention} due to: {e}") | |
| async def on_reaction_add(reaction, user): | |
| await process_reaction(reaction, user) | |
| async def on_reaction_remove(reaction, user): | |
| await process_reaction(reaction, user) | |
| async def process_reaction(reaction, user): | |
| if reaction.message.id in pending_messages and not user.bot: | |
| channel_id = pending_messages[reaction.message.id] | |
| session = sessions.get(channel_id) | |
| if session and any(role.name == "Admin" for role in user.roles): | |
| if str(reaction.emoji) == "📷": | |
| session.camera_required = True | |
| session.exemptions.clear() | |
| elif str(reaction.emoji) == "❌": | |
| session.camera_required = False | |
| session.exemptions.clear() | |
| # Edit the message to reflect the updated session status | |
| await reaction.message.edit(content=f"Camera requirement for {reaction.message.channel.guild.get_channel(channel_id).name} has been {'UPDATED to ON' if session.camera_required else 'UPDATED to OFF'}.") | |
| # Send instructions only if they haven't been sent before | |
| if not session.instructions_sent: | |
| follow_up_message = ( | |
| "**Next Steps for Admins:**\n" | |
| "- Use `#camoff @user` to exempt any specific user from the camera requirement.\n" | |
| "- Use `#camoff` to exempt yourself if actively participating in the session.\n" | |
| "- Use `#end` to conclude the session and reset all settings.\n" | |
| "**Note:** These commands work within the context of the current voice session." | |
| ) | |
| await reaction.message.channel.send(follow_up_message) | |
| session.instructions_sent = True # Update the flag to prevent re-sending | |
| # Cleanup | |
| del pending_messages[reaction.message.id] | |
| def dummy_function(input_text): | |
| return "This is a dummy function for the Gradio UI." | |
| iface = gr.Interface(fn=dummy_function, | |
| inputs=gr.Textbox(label="Input"), | |
| outputs=gr.Textbox(label="Output"), | |
| title="Discord Bot", | |
| description="This Space runs a Discord bot in the background.") | |
| async def on_message(message): | |
| # Skip bot messages and non-admin users | |
| if message.author.bot or not any(role.name == "Admin" for role in message.author.roles): | |
| await bot.process_commands(message) # Ensure other commands are still processed | |
| return | |
| # Process commands only if the author is in a voice channel | |
| if message.author.voice: | |
| vc_id = message.author.voice.channel.id | |
| session = sessions.get(vc_id) | |
| if not session: | |
| await message.channel.send("No active session found for this voice channel.") | |
| await bot.process_commands(message) | |
| return | |
| # Admin commands | |
| if "#camoff" in message.content: | |
| # If no users are mentioned, exempt the author; otherwise, exempt mentioned users | |
| users_to_exempt = message.mentions if message.mentions else [message.author] | |
| for user in users_to_exempt: | |
| session.exemptions.add(user.id) | |
| await message.channel.send(f"{user.display_name} is now exempt from the camera requirement.") | |
| elif "#end" in message.content: | |
| # Clear exemptions and reset camera requirement | |
| session.exemptions.clear() | |
| session.camera_required = False | |
| await message.channel.send("Session ended: Camera requirements and exemptions have been reset.") | |
| await bot.process_commands(message) # Ensure this is at the end to process other commands | |
| # Process other commands | |
| await bot.process_commands(message) | |
| async def before_camera_check(): | |
| await bot.wait_until_ready() | |
| bot.run(TOKEN) | |
| iface.launch(inline=False, share=True) | |