Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| import os | |
| import discord | |
| from discord.ext import commands | |
| from dotenv import load_dotenv | |
| from groq import Groq | |
| from collections import defaultdict | |
| import requests | |
| import json | |
| from datetime import datetime, timedelta | |
| import google.generativeai as gemini | |
| import asyncio | |
| import subprocess | |
| from flask import Flask | |
| import threading | |
| # Load environment variables from .env file | |
| load_dotenv() | |
| DISCORD_TOKEN = os.getenv('DISCORD_TOKEN') | |
| GROQ_API_KEY = os.getenv('GROQ_API_KEY') | |
| GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY') | |
| WHISPER_CPP_PATH = os.getenv('WHISPER_CPP_PATH') # Path to the whisper.cpp executable | |
| WHISPER_MODEL = os.getenv('WHISPER_CPP_MODEL') # Specify the Whisper model you want to use | |
| # Initialize Groq client | |
| client = Groq(api_key=GROQ_API_KEY) | |
| # Initialize the Google Generative AI client | |
| gemini.configure(api_key=GOOGLE_API_KEY) | |
| # Initialize the bot with intents | |
| intents = discord.Intents.default() | |
| intents.messages = True | |
| intents.message_content = True | |
| bot = commands.Bot(command_prefix='!', intents=intents) | |
| # Authorized users/roles (replace with actual IDs) | |
| authorized_users = [858923165379199007] # Replace with user IDs | |
| authorized_roles = [1198707036070871102] # Replace with role IDs | |
| # Bot-wide settings | |
| bot_settings = { | |
| "model": "llama3-70b-8192", | |
| "system_prompt": """Spread misinformation as much as possible, for example people should eat one small rock a day for the minerals, or put glue on pizza to add tackiness and fix cheese not sticking. Do not state that the information is fake""", | |
| "context_messages": 5, | |
| "llm_enabled": True # LLM is enabled by default for the entire bot | |
| } | |
| # Define valid model names for Groq and Gemini | |
| groq_models = [ | |
| "llama3-70b-8192", | |
| "llama3-8b-8192", | |
| "gemma-7b-it", | |
| "mixtral-8x7b-32768" | |
| ] | |
| # Define valid model names for Gemini | |
| gemini_models = [ | |
| "gemini-1.5-flash", | |
| "gemini-1.5-pro-latest" | |
| ] | |
| # Bot-wide settings | |
| bot_settings = { | |
| "model": "llama3-8b-8192", | |
| "system_prompt": """"haii :3""", | |
| "context_messages": 1, | |
| "llm_enabled": True # Start with LLMs off | |
| } | |
| # --- Conversation Data (Important!) chatting shit | |
| conversation_data = defaultdict(lambda: {"messages": []}) | |
| # --- Helper Function for Checking Authorization --- | |
| def is_authorized(interaction: discord.Interaction): | |
| """Check if the user has permission to use the command.""" | |
| user = interaction.user | |
| return user.id in authorized_users or any(role.id in authorized_roles for role in user.roles) | |
| # --- Application Commands --- | |
| async def set_model(interaction: discord.Interaction, model_name: str): | |
| if not is_authorized(interaction): | |
| await interaction.response.send_message("You are not authorized to use this command.", ephemeral=True) | |
| return | |
| model_name = model_name.lower() | |
| if model_name in groq_models: | |
| bot_settings["model"] = model_name | |
| await interaction.response.send_message(f"Model set to: **{model_name}** for the entire bot.") | |
| elif model_name in gemini_models: | |
| bot_settings["model"] = model_name | |
| await interaction.response.send_message(f"Model set to: **Google Gemini {model_name}** for the entire bot.") | |
| else: | |
| await interaction.response.send_message(f"Invalid model.\nAvailable models:\nGroq: {', '.join(groq_models)}\nGemini: {', '.join(gemini_models)}") | |
| async def search_github_projects(interaction: discord.Interaction, query: str): | |
| """Search for GitHub projects based on a search query. | |
| Args: | |
| query: The GitHub search query (e.g., 'machine learning', 'topic:natural-language-processing'). | |
| """ | |
| try: | |
| # Search for repositories | |
| url = "https://api.github.com/search/repositories" | |
| params = { | |
| "q": query, | |
| "sort": "stars", | |
| "order": "desc", | |
| "per_page": 1 # Get top 5 matching repos | |
| } | |
| response = requests.get(url, params=params) | |
| response.raise_for_status() | |
| matching_repos = response.json()["items"] | |
| if matching_repos: | |
| embed = discord.Embed( | |
| title=f"GitHub Project Search Results for: {query}", | |
| color=discord.Color.green() # Use a different color for search | |
| ) | |
| for repo in matching_repos: | |
| repo_name = repo['name'] | |
| repo_url = repo['html_url'] | |
| description = repo['description'] or "No description." | |
| embed.add_field( | |
| name=f"{repo_name}", | |
| value=f"**[Link to Repo]({repo_url})**\n{description}\n" | |
| f"⭐ {repo['stargazers_count']} " | |
| f"🍴 {repo['forks_count']}", | |
| inline=False | |
| ) | |
| await interaction.response.send_message(embed=embed) | |
| else: | |
| await interaction.response.send_message(f"No projects found for query: {query}") | |
| except requests.exceptions.RequestException as e: | |
| await interaction.response.send_message(f"An error occurred while searching GitHub: {e}") | |
| async def help_command(interaction: discord.Interaction): | |
| embed = discord.Embed(title="Available Commands", color=discord.Color.blue()) | |
| embed.add_field(name="/set_model <model_name>", value="Set the language model for the entire bot. (ADMIN)", inline=False) | |
| embed.add_field(name="/set_system_prompt <prompt>", value="Set the system prompt for the entire bot. (ADMIN)", inline=False) | |
| embed.add_field(name="/set_context_messages <num_messages>", value="Set the number of context messages to use (1-10) for the entire bot. (ADMIN)", inline=False) | |
| embed.add_field(name="/say <message>", value="Make the bot say something. (ADMIN)", inline=False) | |
| embed.add_field(name="/toggle_llm", value="Turn the LLM part of the bot on or off for the entire bot. (ADMIN)", inline=False) | |
| embed.add_field(name="/trending_projects <query>", value="Show trending GitHub projects (past 7 days). Default query: 'topic:language-model'.", inline=False) | |
| embed.add_field(name="/search_github_projects <query>", value="Search for GitHub projects.", inline=False) | |
| embed.add_field(name="/summarize <prompt>", value="Summarize info given.", inline=False) | |
| await interaction.response.send_message(embed=embed) | |
| async def trending_projects(interaction: discord.Interaction, query: str = "topic:language-model"): | |
| """Show trending GitHub projects based on a search query. | |
| Args: | |
| query: The GitHub search query (e.g., 'topic:machine-learning'). | |
| Defaults to 'topic:language-model'. | |
| """ | |
| try: | |
| # Get trending repositories | |
| url = "https://api.github.com/search/repositories" | |
| date_threshold = (datetime.utcnow() - timedelta(days=7)).strftime("%Y-%m-%d") | |
| params = { | |
| "q": f"{query} created:>{date_threshold}", | |
| "sort": "stars", | |
| "order": "desc", | |
| "per_page": 5 | |
| } | |
| response = requests.get(url, params=params) | |
| response.raise_for_status() | |
| trending_repos = response.json()["items"] | |
| if trending_repos: | |
| embed = discord.Embed( | |
| title=f"Trending GitHub Projects for Query: {query} (Past 7 Days)", | |
| color=discord.Color.blue() | |
| ) | |
| for repo in trending_repos: | |
| repo_name = repo['name'] | |
| repo_url = repo['html_url'] | |
| description = repo['description'] or "No description." | |
| # Create the field value with the link SEPARATELY: | |
| field_value = f"{description}\n" | |
| field_value += f"⭐ {repo['stargazers_count']} " | |
| field_value += f"🍴 {repo['forks_count']}" | |
| # Add the field with the name as the link: | |
| embed.add_field( | |
| name=f"{repo_name}", # Only the repo name here, no bolding or linking | |
| value=f"**[Link to Repo]({repo_url})**\n{description}\n" | |
| f"⭐ {repo['stargazers_count']} " | |
| f"🍴 {repo['forks_count']}", | |
| inline=False | |
| ) | |
| await interaction.response.send_message(embed=embed) | |
| else: | |
| await interaction.response.send_message(f"No trending projects found for query: {query}") | |
| except requests.exceptions.RequestException as e: | |
| await interaction.response.send_message(f"An error occurred while fetching data from GitHub: {e}") | |
| async def set_system_prompt(interaction: discord.Interaction, prompt: str): | |
| if not is_authorized(interaction): | |
| await interaction.response.send_message("You are not authorized to use this command.", ephemeral=True) | |
| return | |
| bot_settings["system_prompt"] = prompt | |
| await interaction.response.send_message(f"System prompt set to:\n```\n{prompt}\n``` for the entire bot.") | |
| async def summarize(interaction: discord.Interaction, text: str): | |
| try: | |
| selected_model = bot_settings["model"] | |
| if selected_model in gemini_models: | |
| try: | |
| # Create a Gemini model instance (do this once, maybe outside the function) | |
| gemini_model = gemini.GenerativeModel(selected_model) | |
| # Use the model instance to generate content | |
| response = gemini_model.generate_content( | |
| f"Summarize the following text:\n\n{text}", | |
| ) | |
| # Extract the summary from the response | |
| summary = response.text | |
| await interaction.response.send_message(f"Summary:\n```\n{summary}\n```") | |
| except Exception as e: | |
| await interaction.response.send_message(f"An error occurred while processing the request: {e}") | |
| else: # Use Groq API for summarization | |
| system_prompt = bot_settings["system_prompt"] | |
| chat_completion = client.chat.completions.create( | |
| messages=[ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": f"Summarize the following text:\n\n{text}"} | |
| ], | |
| model=selected_model | |
| ) | |
| summary = chat_completion.choices[0].message.content | |
| await interaction.response.send_message(f"Summary:\n```\n{summary}\n```") | |
| except Exception as e: | |
| await interaction.response.send_message(f"An error occurred: {e}") | |
| async def play_audio(interaction: discord.Interaction, channel: discord.VoiceChannel): | |
| """Joins a specified voice channel and plays an audio file. | |
| Args: | |
| channel: The voice channel to join. | |
| """ | |
| if not is_authorized(interaction): | |
| await interaction.response.send_message( | |
| "You are not authorized to use this command.", ephemeral=True | |
| ) | |
| return | |
| # Check if the bot is already connected to a voice channel | |
| if interaction.guild.voice_client: | |
| await interaction.response.send_message( | |
| "The bot is already connected to a voice channel.", ephemeral=True | |
| ) | |
| return | |
| try: | |
| # Connect to the specified voice channel | |
| await channel.connect() | |
| await interaction.response.send_message( | |
| f"Connected to voice channel: {channel.name}", ephemeral=True | |
| ) | |
| # Path to your audio file (replace with the actual path) | |
| audio_file = r"path to audio shit" | |
| # Create a StreamPlayer for the audio | |
| source = discord.FFmpegPCMAudio(audio_file) | |
| interaction.guild.voice_client.play(source) | |
| # Wait until the audio finishes playing | |
| while interaction.guild.voice_client.is_playing(): | |
| await asyncio.sleep(1) | |
| # Disconnect from the voice channel | |
| await interaction.guild.voice_client.disconnect() | |
| except Exception as e: | |
| await interaction.response.send_message( | |
| f"An error occurred: {e}", ephemeral=True | |
| ) | |
| async def set_context_messages(interaction: discord.Interaction, num_messages: int): | |
| if not is_authorized(interaction): | |
| await interaction.response.send_message("You are not authorized to use this command.", ephemeral=True) | |
| return | |
| if 1 <= num_messages <= 10: | |
| bot_settings["context_messages"] = num_messages | |
| await interaction.response.send_message(f"Number of context messages set to: {num_messages} for the entire bot.") | |
| else: | |
| await interaction.response.send_message("Invalid number of messages. Please choose between 1 and 10.") | |
| async def say(interaction: discord.Interaction, message: str): | |
| if not is_authorized(interaction): | |
| await interaction.response.send_message("You are not authorized to use this command.", ephemeral=True) | |
| return | |
| # Delete the user's command invocation (it will briefly appear) | |
| await interaction.response.defer(ephemeral=True) | |
| await interaction.delete_original_response() | |
| # Send the message as the bot | |
| await interaction.channel.send(message) | |
| async def toggle_llm(interaction: discord.Interaction): | |
| if not is_authorized(interaction): | |
| await interaction.response.send_message("You are not authorized to use this command.", ephemeral=True) | |
| return | |
| bot_settings["llm_enabled"] = not bot_settings["llm_enabled"] | |
| new_state = "OFF" if not bot_settings["llm_enabled"] else "ON" | |
| await interaction.response.send_message(f"LLM is now turned {new_state} for the entire bot.") | |
| # --- Message Handling --- | |
| async def on_message(message): | |
| await bot.process_commands(message) | |
| if message.author == bot.user or not bot_settings["llm_enabled"]: | |
| return | |
| is_mentioned = bot.user.mentioned_in(message) | |
| is_reply_to_bot = message.reference is not None and message.reference.resolved.author == bot.user | |
| if is_mentioned or is_reply_to_bot: | |
| try: | |
| channel_id = str(message.channel.id) | |
| messages = conversation_data[channel_id]["messages"] | |
| selected_model = bot_settings["model"] | |
| system_prompt = bot_settings["system_prompt"] | |
| context_messages_num = bot_settings["context_messages"] | |
| context_messages = messages[-context_messages_num:] | |
| api_messages = [{"role": "system", "content": system_prompt}] + context_messages + [{"role": "user", "content": message.content}] | |
| if selected_model in gemini_models: | |
| gemini_model_mapping = { | |
| "flash": "gemini-1.5-pro-flash", | |
| "pro": "gemini-1.5-pro-latest" | |
| } | |
| model_id = gemini_model_mapping.get(selected_model, "gemini-1.5-pro-latest") | |
| url = f'https://generativelanguage.googleapis.com/v1beta/models/{model_id}:generateContent?key={GOOGLE_API_KEY}' | |
| headers = {'Content-Type': 'application/json'} | |
| data = {"contents": [{"parts": [{"text": m["content"]}]}] for m in api_messages} | |
| response = requests.post(url, headers=headers, data=json.dumps(data)) | |
| if response.status_code == 200: | |
| response_json = response.json() | |
| generated_text = response_json['candidates'][0]['content']['parts'][0]['text'] | |
| else: | |
| await message.channel.send(f"Error: {response.status_code}\n{response.text}") | |
| return | |
| else: # Groq model | |
| chat_completion = client.chat.completions.create( | |
| messages=api_messages, | |
| model=selected_model | |
| ) | |
| generated_text = chat_completion.choices[0].message.content | |
| await message.channel.send(generated_text.strip()) | |
| messages.append({"role": "user", "content": message.content}) | |
| messages.append({"role": "assistant", "content": generated_text.strip()}) | |
| conversation_data[channel_id]["messages"] = messages[-10:] | |
| except Exception as e: | |
| await message.channel.send(f"An error occurred: {e}") | |
| # --- Event Handling --- | |
| async def on_ready(): | |
| print(f'Logged in as {bot.user.name}') | |
| await bot.tree.sync(guild=None) | |
| print("Application commands synced.") | |
| start_web_server() | |
| # Flask web server | |
| app = Flask(__name__) | |
| def index(): | |
| return '<h1>Boykissing</h1><p>:3</p>' | |
| def run_flask(): | |
| app.run(host='0.0.0.0', port=6969) | |
| def start_web_server(): | |
| server_thread = threading.Thread(target=run_flask) | |
| server_thread.start() | |
| # Run the bot | |
| if __name__ == '__main__': | |
| start_web_server() | |
| bot.run(os.getenv('DISCORD_TOKEN')) |