telegram-test / bot.py
Ewan
Update LangChain imports to use langchain_core.messages
c06ad11
"""
Simple Telegram bot with Gemini AI integration
"""
import os
import logging
from datetime import datetime
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from dotenv import load_dotenv
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from pymongo import MongoClient
# Read settings from a .env file (if one exists) into the environment
load_dotenv()

# Logging: timestamp, logger name, severity, then the message itself
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# MongoDB: one shared client; chat turns are kept in
# the "conversations" collection of the "telegram_bot" database
MONGODB_URI = os.environ.get("MONGODB_URI")
mongo_client = MongoClient(MONGODB_URI)
db = mongo_client["telegram_bot"]
conversations_collection = db["conversations"]

# Gemini chat model, with replies capped at 1024 output tokens
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-exp",
    google_api_key=GOOGLE_API_KEY,
    max_output_tokens=1024,
)

# Persona instructions placed at the start of every model request
SYSTEM_PROMPT = """You are a helpful, friendly AI assistant in a Telegram chat.
Keep your responses concise and conversational. Use emojis occasionally to make
the conversation more engaging. Be supportive and understanding."""
def get_conversation_history(user_id: int, limit: int = 10):
    """Fetch a user's most recent stored messages as LangChain messages.

    Queries MongoDB for up to ``limit`` documents belonging to ``user_id``,
    newest first, and returns them oldest-first. Documents whose role is
    neither "user" nor "assistant" are left out of the result.
    """
    cursor = conversations_collection.find({"user_id": user_id})
    newest_first = list(cursor.sort("timestamp", -1).limit(limit))

    # The query yields newest-first; walk it backwards for chronological order
    history = []
    for doc in newest_first[::-1]:
        role = doc["role"]
        if role == "user":
            message_cls = HumanMessage
        elif role == "assistant":
            message_cls = AIMessage
        else:
            continue
        history.append(message_cls(content=doc["content"]))
    return history
def save_message(user_id: int, username: str, role: str, content: str):
    """Save a message to MongoDB.

    Args:
        user_id: Telegram user id the message belongs to.
        username: Display name stored alongside the message.
        role: Who produced the message; the history reader recognises
            "user" and "assistant".
        content: Message text.
    """
    # Local import: the file only imports ``datetime`` from the datetime module.
    from datetime import timezone

    conversations_collection.insert_one({
        "user_id": user_id,
        "username": username,
        "role": role,
        "content": content,
        # datetime.utcnow() is deprecated since Python 3.12 and returns a
        # naive value; an aware UTC datetime denotes the same instant.
        "timestamp": datetime.now(timezone.utc),
    })
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start command by replying with a welcome message.

    Args:
        update: Incoming Telegram update that carried the command.
        context: Handler context (unused).
    """
    # Command handlers also fire for edited messages, where update.message is
    # None; effective_message resolves to whichever message the update holds.
    message = update.effective_message
    if message is None:
        return

    # effective_user may be missing (e.g. channel posts); greet generically then.
    user = update.effective_user
    first_name = user.first_name if user is not None else "there"

    welcome_message = (
        f"👋 Hello {first_name}!\n\n"
        "I'm a simple test bot powered by Google Gemini. "
        "Just send me a message and I'll respond!\n\n"
        "Commands:\n"
        "/start - Show this welcome message\n"
        "/clear - Clear your conversation history"
    )
    await message.reply_text(welcome_message)
async def clear_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /clear command - delete the sender's stored conversation history.

    Args:
        update: Incoming Telegram update that carried the command.
        context: Handler context (unused).
    """
    # Command handlers also fire for edited messages, where update.message is
    # None; effective_message resolves to whichever message the update holds.
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None:
        # Without a sender there is no history to identify, so do nothing.
        return

    result = conversations_collection.delete_many({"user_id": user.id})
    await message.reply_text(
        f"✅ Cleared {result.deleted_count} messages from your history."
    )
def _response_text(content) -> str:
    """Flatten a chat-model ``content`` value (str or list of parts) to text."""
    if isinstance(content, str):
        return content
    pieces = []
    for part in content or []:
        if isinstance(part, str):
            pieces.append(part)
        elif isinstance(part, dict) and part.get("type") == "text":
            pieces.append(part.get("text", ""))
    return "".join(pieces)


def _split_for_telegram(text: str, limit: int = 4000) -> list:
    """Cut ``text`` into pieces of at most ``limit`` characters.

    The default stays a little below Telegram's 4096-character message cap.
    """
    return [text[start:start + limit] for start in range(0, len(text), limit)]


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle incoming text messages by replying with a Gemini response.

    Loads the sender's recent history, stores the new message, asks the model
    for a reply, stores that reply and sends it back. Any failure in that
    sequence is logged and answered with a generic apology.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).
    """
    message = update.message
    user = update.effective_user
    # The text filter also matches edited messages and channel posts, for
    # which update.message (or the sender) is None; those are ignored here so
    # that an edit does not create a second conversation turn.
    if message is None or user is None or not message.text:
        return

    user_message = message.text
    display_name = user.username or user.first_name
    logger.info(f"Message from {user.username} ({user.id}): {user_message}")

    # Show typing indicator
    await message.chat.send_action("typing")

    try:
        # Read history BEFORE storing the new message; otherwise the new
        # message is returned as part of the history and sent to the model twice.
        history = get_conversation_history(user.id)
        save_message(user.id, display_name, "user", user_message)

        # Build messages list: system prompt, prior turns, then the new message
        messages = [
            SystemMessage(content=SYSTEM_PROMPT),
            *history,
            HumanMessage(content=user_message),
        ]

        # Await the async variant so the event loop is not blocked while the
        # model is generating.
        response = await llm.ainvoke(messages)
        ai_message = _response_text(response.content)
        if not ai_message.strip():
            # Telegram rejects empty messages; route this to the apology path.
            raise ValueError("Model returned an empty response")

        # Save AI response
        save_message(user.id, display_name, "assistant", ai_message)

        # Send response, split up if it is too long for a single message
        for chunk in _split_for_telegram(ai_message):
            await message.reply_text(chunk)
        logger.info(f"Response sent to {user.username} ({user.id})")
    except Exception as e:
        logger.error(f"Error processing message: {e}", exc_info=True)
        await message.reply_text(
            "Sorry, I encountered an error processing your message. Please try again."
        )
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Log the update and the error it caused, including the traceback."""
    log_line = f"Update {update} caused error {context.error}"
    logger.error(log_line, exc_info=context.error)
def main():
    """Build the Telegram application, register handlers and start polling."""
    bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
    if not bot_token:
        # The application cannot be built without a token
        logger.error("TELEGRAM_BOT_TOKEN not found in environment variables!")
        return

    builder = Application.builder()
    application = builder.token(bot_token).build()

    # Registration order is kept: the two commands, then plain text messages
    plain_text = filters.TEXT & ~filters.COMMAND
    handlers = (
        CommandHandler("start", start_command),
        CommandHandler("clear", clear_command),
        MessageHandler(plain_text, handle_message),
    )
    for handler in handlers:
        application.add_handler(handler)
    application.add_error_handler(error_handler)

    logger.info("Starting bot...")
    application.run_polling(allowed_updates=Update.ALL_TYPES)


# Script entry point
if __name__ == "__main__":
    main()