Spaces:
Runtime error
Runtime error
"""
Simple Telegram bot with Gemini AI integration
"""
import logging
import os
from datetime import datetime, timezone

from dotenv import load_dotenv
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from pymongo import MongoClient
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
# Read settings from a local .env file (if any) into the process environment.
load_dotenv()

# Root logging setup: timestamp, logger name, level and message, INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# MongoDB: a single shared client; chat messages are stored in the
# "conversations" collection of the "telegram_bot" database.
# NOTE(review): when MONGODB_URI is unset, None is passed to MongoClient --
# confirm that the resulting default connection target is intended.
MONGODB_URI = os.environ.get("MONGODB_URI")
mongo_client = MongoClient(MONGODB_URI)
db = mongo_client["telegram_bot"]
conversations_collection = db["conversations"]

# Gemini chat model, shared by all handlers; replies are capped at 1024 tokens.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-exp",
    google_api_key=GOOGLE_API_KEY,
    max_output_tokens=1024,
)

# System prompt sent as the first message of every model request.
SYSTEM_PROMPT = (
    "You are a helpful, friendly AI assistant in a Telegram chat.\n"
    "Keep your responses concise and conversational. Use emojis occasionally to make\n"
    "the conversation more engaging. Be supportive and understanding."
)
def get_conversation_history(user_id: int, limit: int = 10):
    """Return the user's most recent stored messages as LangChain messages.

    Looks up documents in ``conversations_collection`` whose ``user_id``
    matches, newest first by ``timestamp`` and capped at ``limit``, then
    returns them in chronological (oldest-first) order.

    Documents with role ``"user"`` become ``HumanMessage`` and role
    ``"assistant"`` become ``AIMessage``; any other role is skipped.
    """
    cursor = conversations_collection.find({"user_id": user_id})
    newest_first = list(cursor.sort("timestamp", -1).limit(limit))

    history = []
    # Walk the newest-first result backwards to restore chronological order.
    for doc in newest_first[::-1]:
        role = doc["role"]
        if role == "user":
            message_type = HumanMessage
        elif role == "assistant":
            message_type = AIMessage
        else:
            continue
        history.append(message_type(content=doc["content"]))
    return history
def save_message(user_id: int, username: str, role: str, content: str):
    """Insert one chat message into the MongoDB conversations collection.

    Args:
        user_id: Telegram user id the message belongs to.
        username: Display name stored alongside the message.
        role: Author of the message; the history reader recognises
            ``"user"`` and ``"assistant"``.
        content: Message text.
    """
    conversations_collection.insert_one({
        "user_id": user_id,
        "username": username,
        "role": role,
        "content": content,
        # datetime.utcnow() is deprecated since Python 3.12 and yields a naive
        # value; an aware UTC datetime records the same instant explicitly.
        "timestamp": datetime.now(timezone.utc),
    })
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start by replying with a greeting and the command list.

    Does nothing when the update carries no message or no user to greet.
    """
    # CommandHandler also fires for edited messages, where update.message is
    # None; effective_message resolves to whichever message the update carries.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None:
        return

    welcome_message = (
        f"👋 Hello {user.first_name}!\n\n"
        "I'm a simple test bot powered by Google Gemini. "
        "Just send me a message and I'll respond!\n\n"
        "Commands:\n"
        "/start - Show this welcome message\n"
        "/clear - Clear your conversation history"
    )
    await message.reply_text(welcome_message)
async def clear_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /clear by deleting the sender's stored conversation history.

    Replies with the number of deleted documents. Does nothing when the
    update carries no message or no user.
    """
    # CommandHandler also fires for edited messages, where update.message is
    # None; effective_message resolves to whichever message the update carries.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None:
        return

    result = conversations_collection.delete_many({"user_id": user.id})
    await message.reply_text(
        f"✅ Cleared {result.deleted_count} messages from your history."
    )
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer an incoming text message with a Gemini-generated reply.

    Loads the sender's recent history, stores the new user message, asks the
    model for a reply, stores that reply and sends it back. Any failure after
    the typing indicator is logged and answered with a generic apology.

    Does nothing when the update carries no message or no user.
    """
    # MessageHandler also fires for edited messages and channel posts, where
    # update.message is None; effective_message covers all of these.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None:
        return

    user_message = message.text
    display_name = user.username or user.first_name
    logger.info("Message from %s (%s): %s", user.username, user.id, user_message)

    # Show typing indicator
    await message.chat.send_action("typing")

    try:
        # Read history BEFORE storing the new message: the new message is
        # appended explicitly below, so reading it back from the database as
        # well would send it to the model twice.
        history = get_conversation_history(user.id)
        save_message(user.id, display_name, "user", user_message)

        messages = [
            SystemMessage(content=SYSTEM_PROMPT),
            *history,
            HumanMessage(content=user_message),
        ]

        # Use the async entry point so the model call does not block the
        # event loop that serves every other chat.
        response = await llm.ainvoke(messages)
        ai_message = response.content
        if not ai_message:
            # Telegram rejects empty message text; do not store or send it.
            raise ValueError("Model returned an empty response")

        save_message(user.id, display_name, "assistant", ai_message)
        await message.reply_text(ai_message)
        logger.info("Response sent to %s (%s)", user.username, user.id)
    except Exception as e:
        # Top-level boundary for this handler: log with traceback and tell
        # the user something went wrong instead of failing silently.
        logger.exception("Error processing message: %s", e)
        await message.reply_text(
            "Sorry, I encountered an error processing your message. Please try again."
        )
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Log the update and the exception that was raised while handling it."""
    summary = f"Update {update} caused error {context.error}"
    # Passing the exception as exc_info attaches its traceback to the record.
    logger.error(summary, exc_info=context.error)
def main():
    """Build the Telegram application, register handlers and start polling.

    Logs an error and returns without starting when TELEGRAM_BOT_TOKEN is
    missing or empty.
    """
    bot_token = os.environ.get("TELEGRAM_BOT_TOKEN")
    if not bot_token:
        logger.error("TELEGRAM_BOT_TOKEN not found in environment variables!")
        return

    app = Application.builder().token(bot_token).build()

    # Registered in this order: the two commands, then plain text messages.
    handlers = (
        CommandHandler("start", start_command),
        CommandHandler("clear", clear_command),
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message),
    )
    for handler in handlers:
        app.add_handler(handler)
    app.add_error_handler(error_handler)

    logger.info("Starting bot...")
    app.run_polling(allowed_updates=Update.ALL_TYPES)


if __name__ == "__main__":
    main()