| import discord |
| import asyncio |
| import logging |
| import os |
| from typing import Dict, List, Any |
| import threading |
| from discord.ext import commands |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| class DiscordBot: |
| """ |
| Discord bot integration for the AI second brain. |
| Handles message ingestion, responses, and synchronization with the main app. |
| """ |
| |
| def __init__(self, agent, token=None, channel_whitelist=None): |
| """ |
| Initialize the Discord bot. |
| |
| Args: |
| agent: The AssistantAgent instance to use for processing queries |
| token: Discord bot token (defaults to environment variable) |
| channel_whitelist: List of channel IDs to listen to (None for all) |
| """ |
| self.agent = agent |
| self.token = token or os.getenv("DISCORD_BOT_TOKEN") |
| self.channel_whitelist = channel_whitelist or [] |
| self.message_history = [] |
| |
| |
| intents = discord.Intents.default() |
| intents.message_content = True |
| self.client = commands.Bot(command_prefix="!", intents=intents) |
| |
| |
| self.setup_event_handlers() |
| |
| |
| self.bot_thread = None |
| |
| logger.info("Discord bot initialized") |
| |
| def setup_event_handlers(self): |
| """Register event handlers for the Discord client.""" |
| |
| @self.client.event |
| async def on_ready(): |
| logger.info(f"Discord bot logged in as {self.client.user}") |
| |
| @self.client.event |
| async def on_message(message): |
| |
| if message.author == self.client.user: |
| return |
| |
| |
| await self.client.process_commands(message) |
| |
| |
| if self.channel_whitelist and message.channel.id not in self.channel_whitelist: |
| return |
| |
| |
| is_dm = isinstance(message.channel, discord.DMChannel) |
| is_mentioned = self.client.user in message.mentions |
| |
| if is_dm or is_mentioned: |
| await self.process_message(message) |
| |
| |
| @self.client.command(name="help") |
| async def help_command(ctx): |
| help_text = """ |
| **AI Assistant Commands** |
| - Mention me with a question to get an answer |
| - Send me a DM with your query |
| - Use `!search <query>` to search your knowledge base |
| - Use `!upload` with an attachment to add to your knowledge base |
| """ |
| await ctx.send(help_text) |
| |
| |
| @self.client.command(name="search") |
| async def search_command(ctx, *, query): |
| async with ctx.typing(): |
| response = await self.process_query(query) |
| await ctx.send(response["answer"]) |
| |
| |
| if response["sources"]: |
| sources_text = "**Sources:**\n" + "\n".join([ |
| f"- {s['file_name']} ({s['source']})" |
| for s in response["sources"] |
| ]) |
| await ctx.send(sources_text) |
| |
| async def process_message(self, message): |
| """Process a Discord message and send a response.""" |
| |
| content = message.content |
| for mention in message.mentions: |
| content = content.replace(f'<@{mention.id}>', '').replace(f'<@!{mention.id}>', '') |
| |
| query = content.strip() |
| if not query: |
| await message.channel.send("How can I help you?") |
| return |
| |
| |
| async with message.channel.typing(): |
| |
| response = await self.process_query(query) |
| |
| |
| self.message_history.append({ |
| "user": str(message.author), |
| "query": query, |
| "response": response["answer"], |
| "timestamp": message.created_at.isoformat(), |
| "channel": str(message.channel) |
| }) |
| |
| |
| await message.channel.send(response["answer"]) |
| |
| |
| if response["sources"]: |
| sources_text = "**Sources:**\n" + "\n".join([ |
| f"- {s['file_name']} ({s['source']})" |
| for s in response["sources"] |
| ]) |
| await message.channel.send(sources_text) |
| |
| async def process_query(self, query): |
| """Process a query using the agent and return a response.""" |
| |
| loop = asyncio.get_event_loop() |
| response = await loop.run_in_executor(None, self.agent.query, query) |
| |
| |
| if "answer" in response: |
| await loop.run_in_executor( |
| None, |
| self.agent.add_conversation_to_memory, |
| query, |
| response["answer"] |
| ) |
| |
| return response |
| |
| def start(self): |
| """Start the Discord bot in a separate thread.""" |
| if not self.token: |
| logger.error("Discord bot token not found") |
| return False |
| |
| def run_bot(): |
| asyncio.set_event_loop(asyncio.new_event_loop()) |
| self.client.run(self.token) |
| |
| self.bot_thread = threading.Thread(target=run_bot, daemon=True) |
| self.bot_thread.start() |
| logger.info("Discord bot started in background thread") |
| return True |
| |
| def stop(self): |
| """Stop the Discord bot.""" |
| if self.client and self.client.is_ready(): |
| asyncio.run_coroutine_threadsafe(self.client.close(), self.client.loop) |
| logger.info("Discord bot stopped") |
| |
| def get_message_history(self): |
| """Get the message history.""" |
| return self.message_history |