File size: 5,328 Bytes
77cbb2d
 
 
 
 
 
 
 
 
 
c06ad11
77cbb2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""
Simple Telegram bot with Gemini AI integration and MongoDB-backed conversation history
"""
import os
import logging
from datetime import datetime
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from dotenv import load_dotenv
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from pymongo import MongoClient

# Load environment variables
# NOTE: this must run before the os.getenv() calls below so that values
# supplied through a .env file are visible to them.
load_dotenv()

# Configure logging
# Root logger at INFO; each record carries timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

# Initialize MongoDB
# NOTE(review): os.getenv returns None when MONGODB_URI is unset and the value
# is passed to MongoClient unchecked -- presumably the driver then falls back
# to its default host; confirm that this is the intended behavior.
MONGODB_URI = os.getenv("MONGODB_URI")
mongo_client = MongoClient(MONGODB_URI)
db = mongo_client["telegram_bot"]
# One document per chat message (see save_message for the fields written).
conversations_collection = db["conversations"]

# Initialize Gemini
# NOTE(review): GOOGLE_API_KEY is not validated here either; a missing key is
# handed to the client as None.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-exp",
    google_api_key=GOOGLE_API_KEY,
    max_output_tokens=1024
)

# System prompt
# Sent as the first message of every request built in handle_message.
SYSTEM_PROMPT = """You are a helpful, friendly AI assistant in a Telegram chat.
Keep your responses concise and conversational. Use emojis occasionally to make
the conversation more engaging. Be supportive and understanding."""


def get_conversation_history(user_id: int, limit: int = 10):
    """Return the user's most recent stored messages as LangChain messages.

    Args:
        user_id: Value matched against the ``user_id`` field of each document.
        limit: Maximum number of documents to fetch (newest first).

    Returns:
        A list of ``HumanMessage`` / ``AIMessage`` objects in chronological
        order (oldest first). Documents whose ``role`` is neither ``"user"``
        nor ``"assistant"`` are skipped.
    """
    cursor = conversations_collection.find({"user_id": user_id})
    # Newest-first so that ``limit`` keeps the latest messages ...
    records = list(cursor.sort("timestamp", -1).limit(limit))
    # ... then flip to oldest-first, the order the model should read them in.
    records.reverse()

    history = []
    for record in records:
        role = record["role"]
        if role == "user":
            message_cls = HumanMessage
        elif role == "assistant":
            message_cls = AIMessage
        else:
            continue
        history.append(message_cls(content=record["content"]))

    return history


def save_message(user_id: int, username: str, role: str, content: str):
    """Insert one chat message into the conversations collection.

    Args:
        user_id: Identifier stored in the ``user_id`` field (used for lookups).
        username: Display name stored alongside the message.
        role: Author of the message; ``get_conversation_history`` recognises
            ``"user"`` and ``"assistant"``.
        content: Message text.
    """
    # Local import: the file-level import only brings in ``datetime``.
    from datetime import timezone

    conversations_collection.insert_one({
        "user_id": user_id,
        "username": username,
        "role": role,
        "content": content,
        # Timezone-aware "now" in UTC. ``datetime.utcnow()`` is deprecated
        # since Python 3.12 and yields a naive value that is easy to
        # misinterpret as local time.
        "timestamp": datetime.now(timezone.utc),
    })


async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: greet the sender and list the available commands.

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context (unused).
    """
    # ``update.message`` is None when the command arrives as an edited
    # message; ``effective_message`` covers both cases.
    message = update.effective_message
    if message is None:
        return

    user = update.effective_user
    # Fall back to a neutral greeting if the update carries no sender.
    first_name = user.first_name if user is not None else "there"

    welcome_message = (
        f"👋 Hello {first_name}!\n\n"
        "I'm a simple test bot powered by Google Gemini. "
        "Just send me a message and I'll respond!\n\n"
        "Commands:\n"
        "/start - Show this welcome message\n"
        "/clear - Clear your conversation history"
    )
    await message.reply_text(welcome_message)


async def clear_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /clear: delete every stored message of the sender.

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context (unused).
    """
    user = update.effective_user
    # ``update.message`` is None when the command arrives as an edited
    # message; ``effective_message`` covers both cases.
    message = update.effective_message
    if user is None or message is None:
        # Nothing to clear and nowhere to reply without a sender and message.
        return

    result = conversations_collection.delete_many({"user_id": user.id})
    await message.reply_text(
        f"✅ Cleared {result.deleted_count} messages from your history."
    )


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer a plain-text message with a Gemini-generated reply.

    Flow: load the sender's recent history, store the new user message, ask
    the model (system prompt + history + new message), store the answer and
    send it back. Any failure in that flow is logged and answered with a
    generic apology.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).
    """
    user = update.effective_user
    message = update.message
    # The text handler can also be triggered by updates that have no
    # ``message`` (e.g. edited messages) or no sender (e.g. channel posts);
    # ignore those instead of failing on a None attribute access.
    if message is None or user is None or not message.text:
        return

    user_message = message.text
    display_name = user.username or user.first_name

    logger.info("Message from %s (%s): %s", user.username, user.id, user_message)

    # Show typing indicator
    await message.chat.send_action("typing")

    try:
        # Read the history BEFORE storing the new message: otherwise the
        # message comes back as the last history entry and would be sent to
        # the model twice once it is appended below.
        history = get_conversation_history(user.id)
        save_message(user.id, display_name, "user", user_message)

        # Build messages list
        messages = [
            SystemMessage(content=SYSTEM_PROMPT),
            *history,
            HumanMessage(content=user_message),
        ]

        # Async call so the event loop is not blocked while the model answers.
        response = await llm.ainvoke(messages)
        ai_message = response.content

        # Telegram rejects empty messages; treat an unusable answer as an
        # error so the user gets the apology below instead of silence.
        if not isinstance(ai_message, str) or not ai_message.strip():
            raise ValueError("Model returned an empty or non-text response")

        # Save AI response
        save_message(user.id, display_name, "assistant", ai_message)

        # Telegram limits a single text message to 4096 characters, so longer
        # answers are sent as several consecutive replies.
        max_len = 4096
        for start in range(0, len(ai_message), max_len):
            await message.reply_text(ai_message[start:start + max_len])

        logger.info("Response sent to %s (%s)", user.username, user.id)

    except Exception as e:
        # Top-level boundary for this handler: log with traceback and tell
        # the user something went wrong.
        logger.exception("Error processing message: %s", e)
        await message.reply_text(
            "Sorry, I encountered an error processing your message. Please try again."
        )


async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Log the exception stored on ``context.error`` with the offending update."""
    failure = context.error
    # Passing the exception object as ``exc_info`` attaches its traceback.
    logger.error(f"Update {update} caused error {failure}", exc_info=failure)


def main():
    """Build the Telegram application, register all handlers and start polling.

    Logs an error and returns without starting anything when the
    ``TELEGRAM_BOT_TOKEN`` environment variable is missing or empty.
    """
    bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
    if not bot_token:
        logger.error("TELEGRAM_BOT_TOKEN not found in environment variables!")
        return

    # Assemble the application from the token.
    builder = Application.builder().token(bot_token)
    app = builder.build()

    # Registration order is kept: commands first, then free-text messages.
    handlers = (
        CommandHandler("start", start_command),
        CommandHandler("clear", clear_command),
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message),
    )
    for handler in handlers:
        app.add_handler(handler)
    app.add_error_handler(error_handler)

    # Runs until the process is stopped.
    logger.info("Starting bot...")
    app.run_polling(allowed_updates=Update.ALL_TYPES)


# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()