import asyncio
import html
import logging
import os
import threading
import time
from datetime import datetime
from typing import Dict, List, Any

from telegram import Update, Bot
from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes, filters

# Module-level logger used throughout this file. Creating it does not depend
# on the root configuration below, so the order of the two statements is free.
logger = logging.getLogger(__name__)

# Configure the root logger at INFO level as an import-time side effect.
# basicConfig() is a no-op when the root logger already has handlers.
logging.basicConfig(level=logging.INFO)

class TelegramBot:
    """
    Telegram bot integration for the AI second brain.

    Forwards incoming Telegram messages to the assistant agent, replies with
    the agent's answer and its sources, and records handled messages in an
    in-memory history. Polling runs on a daemon thread that owns its own
    asyncio event loop.
    """

    # Seconds stop() waits for the polling thread to finish shutting down.
    _STOP_TIMEOUT_SECONDS = 10

    def __init__(self, agent, token=None, allowed_user_ids=None):
        """
        Initialize the Telegram bot.

        Args:
            agent: Object providing ``query(text)`` and
                ``add_conversation_to_memory(query, answer)``.
            token: Telegram bot token; falls back to the
                ``TELEGRAM_BOT_TOKEN`` environment variable.
            allowed_user_ids: Telegram user IDs allowed to use the bot, either
                a collection of ints or a comma-separated string. ``None`` or
                empty means everyone is allowed.

        Raises:
            ValueError: If a comma-separated ID string contains a non-integer.
        """
        self.agent = agent
        self.token = token or os.getenv("TELEGRAM_BOT_TOKEN")
        self.allowed_user_ids = self._parse_allowed_user_ids(allowed_user_ids)
        # Appended to by handle_message(); never trimmed.
        self.message_history = []

        self.application = None
        self.bot_thread = None
        # Event loop owned by the polling thread; stop() needs it to signal
        # the application from another thread.
        self._loop = None

        logger.info("Telegram bot initialized")

    @staticmethod
    def _parse_allowed_user_ids(allowed_user_ids):
        """Normalize the allow-list: falsy -> [], "1, 2" -> [1, 2], else as given."""
        if not allowed_user_ids:
            return []
        if isinstance(allowed_user_ids, str):
            parts = (part.strip() for part in allowed_user_ids.split(","))
            return [int(part) for part in parts if part]
        return allowed_user_ids

    @staticmethod
    def _format_sources(sources):
        """
        Render the sources list as a Telegram HTML message.

        File names and source labels are HTML-escaped so characters such as
        ``_``, ``*``, ``<`` or ``&`` cannot break Telegram's entity parsing.
        """
        lines = [
            "- {} ({})".format(
                html.escape(str(src["file_name"]), quote=False),
                html.escape(str(src["source"]), quote=False),
            )
            for src in sources
        ]
        return "<b>Sources:</b>\n" + "\n".join(lines)

    def _is_authorized(self, update):
        """Return True if the update's sender may use the bot."""
        if not self.allowed_user_ids:
            return True
        user = update.effective_user
        return user is not None and user.id in self.allowed_user_ids

    async def _reply_with_response(self, message, response):
        """Send the agent's answer, followed by its sources when present."""
        await message.reply_text(response["answer"])

        # .get(): a response without "sources" must not fail after the answer
        # has already been delivered.
        sources = response.get("sources")
        if sources:
            await message.reply_text(self._format_sources(sources), parse_mode="HTML")

    async def start_command(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
        """Handle the /start command by greeting the user."""
        # effective_message also covers edited messages, where update.message
        # is None.
        message = update.effective_message
        if message is None:
            return
        user = update.effective_user
        user_name = user.first_name if user else "there"
        await message.reply_text(
            f"Hello {user_name}! I'm your AI Second Brain assistant. Ask me anything or use /help to see available commands."
        )

    async def help_command(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
        """Handle the /help command by listing the available commands."""
        message = update.effective_message
        if message is None:
            return
        help_text = (
            "\n"
            "*AI Second Brain Commands*\n"
            "- Just send me a message with your question\n"
            "- /search query - Search your knowledge base\n"
            "- /help - Show this help message\n"
        )
        await message.reply_text(help_text, parse_mode='Markdown')

    async def search_command(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
        """Handle the /search command: run the arguments as a query and reply."""
        message = update.effective_message
        if message is None:
            return

        if not self._is_authorized(update):
            await message.reply_text("You're not authorized to use this bot.")
            return

        query = ' '.join(context.args or [])
        if not query:
            await message.reply_text("Please provide a search query: /search your query here")
            return

        await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")

        try:
            response = await self.process_query(query)
            await self._reply_with_response(message, response)
        except Exception as e:
            # Handler boundary: log with traceback and tell the user.
            logger.exception("Error processing search: %s", e)
            await message.reply_text(f"Error processing your search: {str(e)}")

    async def handle_message(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
        """Handle plain text messages: query the agent, record history, reply."""
        message = update.effective_message
        if message is None:
            return

        if not self._is_authorized(update):
            await message.reply_text("You're not authorized to use this bot.")
            return

        query = message.text
        user = update.effective_user  # None for e.g. channel posts

        await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")

        try:
            response = await self.process_query(query)

            # Recorded before replying, so a failed send is still in history.
            self.message_history.append({
                "user": (user.username or str(user.id)) if user else None,
                "user_id": user.id if user else None,
                "query": query,
                "response": response["answer"],
                "timestamp": datetime.now().isoformat(),
                "chat_id": update.effective_chat.id,
            })

            await self._reply_with_response(message, response)
        except Exception as e:
            # Handler boundary: log with traceback and tell the user.
            logger.exception("Error processing message: %s", e)
            await message.reply_text(f"I encountered an error: {str(e)}")

    async def error_handler(self, update, context):
        """Log an error raised while handling an update and notify the chat."""
        logger.error(
            "Error: %s - caused by update %s",
            context.error,
            update,
            exc_info=context.error,
        )

        # `update` may be None or an object without chat information.
        chat = getattr(update, "effective_chat", None)
        if chat:
            await context.bot.send_message(
                chat_id=chat.id,
                text="Sorry, an error occurred while processing your message."
            )

    async def process_query(self, query):
        """
        Run a query through the agent and store the exchange in its memory.

        The agent calls are synchronous, so they run in the default executor
        to keep the event loop responsive.

        Args:
            query: The user's question.

        Returns:
            The response object returned by ``agent.query``.
        """
        loop = asyncio.get_running_loop()

        response = await loop.run_in_executor(None, self.agent.query, query)

        if "answer" in response:
            await loop.run_in_executor(
                None,
                self.agent.add_conversation_to_memory,
                query,
                response["answer"],
            )

        return response

    def setup_application(self):
        """Build the Telegram application and register all handlers."""
        self.application = Application.builder().token(self.token).build()

        self.application.add_handler(CommandHandler("start", self.start_command))
        self.application.add_handler(CommandHandler("help", self.help_command))
        self.application.add_handler(CommandHandler("search", self.search_command))

        # Any text that is not a command is treated as a question.
        self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))

        self.application.add_error_handler(self.error_handler)

        logger.info("Telegram application set up successfully")

    def start(self):
        """
        Start polling for updates on a background daemon thread.

        Returns:
            True if the bot thread was started (or is already running),
            False if no token is configured or setup failed.
        """
        if not self.token:
            logger.error("Telegram bot token not found")
            return False

        # A second poller on the same token would conflict with the first.
        if self.bot_thread is not None and self.bot_thread.is_alive():
            logger.warning("Telegram bot is already running")
            return True

        try:
            self.setup_application()

            def run_bot():
                # The thread needs its own loop; keep a reference so stop()
                # can schedule the shutdown on it.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                self._loop = loop
                try:
                    # stop_signals=None: signal handlers can only be installed
                    # from the main thread.
                    self.application.run_polling(stop_signals=None)
                except Exception:
                    logger.exception("Telegram bot polling stopped unexpectedly")
                finally:
                    self._loop = None

            self.bot_thread = threading.Thread(target=run_bot, daemon=True)
            self.bot_thread.start()

            logger.info("Telegram bot started in background thread")
            return True
        except Exception as e:
            logger.error(f"Error starting Telegram bot: {e}")
            return False

    def stop(self):
        """
        Stop the Telegram bot.

        The application lives on the polling thread's event loop, so shutdown
        is requested on that loop with ``Application.stop_running()`` (needs
        python-telegram-bot >= 20.5). ``run_polling()`` then performs the
        actual stop/shutdown itself. A second loop on the calling thread is
        deliberately not used, because asyncio objects are not thread-safe.

        Returns:
            True if the bot was set up and has stopped (or a stop was
            requested from the bot thread itself), False if it was never set
            up or did not stop within the timeout.
        """
        application = self.application
        if not application:
            return False

        logger.info("Stopping Telegram bot...")

        loop, thread = self._loop, self.bot_thread
        if loop is not None and thread is not None and thread.is_alive():
            try:
                loop.call_soon_threadsafe(application.stop_running)
            except RuntimeError:
                # The loop was closed between the liveness check and the call.
                logger.debug("Telegram bot event loop already closed")

            # Joining the current thread would raise.
            if thread is not threading.current_thread():
                thread.join(timeout=self._STOP_TIMEOUT_SECONDS)
                if thread.is_alive():
                    logger.warning(
                        "Telegram bot did not stop within %s seconds",
                        self._STOP_TIMEOUT_SECONDS,
                    )
                    return False

        logger.info("Telegram bot stopped")
        return True

    def get_message_history(self):
        """Return the list of recorded message exchanges (the live list, not a copy)."""
        return self.message_history