# chat_bot.py — Icebox AI Telegram chat bot (Pollinations chat API + Supabase persistence).
import asyncio
import base64
import io
import json
import logging
import os
import re
import threading
from datetime import datetime, timezone

import requests
from dotenv import load_dotenv
from flask import Flask
from supabase import create_client, Client
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters, ContextTypes
# Load environment variables from a local .env file (no-op if the file is absent).
load_dotenv()
# Configuration — all credentials come from the environment.
TELEGRAM_TOKEN = os.getenv("TELEGRAM_CHAT_BOT_TOKEN")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
POLLINATIONS_KEY = os.getenv("POLLINATIONS_KEY", "")  # optional; empty string disables the Authorization header
# Pollinations API Endpoint (OpenAI-compatible chat-completions route)
POLLINATIONS_CHAT_URL = "https://gen.pollinations.ai/v1/chat/completions"
# Streaming Configuration
STREAM_UPDATE_INTERVAL = 1.0 # Update message every 1.0 seconds (Safety: avoid 429 Flood Control)
MIN_CHUNK_SIZE = 40 # Reasonable chunk size
# Available Models — keys double as the model names sent to the API.
AVAILABLE_MODELS = {
    "openai": {
        "name": "openai",
        "display": "🧠 GPT-5.1",
        "description": "Text generation, Web search",
        "supports_vision": False
    }
}
DEFAULT_MODEL = "openai"
# Logging setup
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)
# Supabase Client
# NOTE(review): this runs at import time, before main() validates the env vars —
# a missing SUPABASE_URL/KEY will crash here, not at the friendly check below.
# Confirm deployment always provides both.
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
async def get_or_create_user(user, chat_id: int) -> str:
    """Return the Supabase UUID for a Telegram user, creating the row if needed.

    Also ensures 'chat_bot' is listed in the user's bots_joined array.
    Returns None if any database call fails.
    """
    try:
        logger.info(f"Getting or creating user for chat_id: {chat_id}")
        existing = supabase.table("telegram_users").select("id, bots_joined").eq("chat_id", chat_id).execute()
        if existing.data:
            record = existing.data[0]
            uuid = record['id']
            joined = record.get('bots_joined') or []
            logger.info(f"User found: {uuid}")
            # Tag the user as a chat_bot user exactly once.
            if 'chat_bot' not in joined:
                joined.append('chat_bot')
                supabase.table("telegram_users").update({
                    "bots_joined": joined
                }).eq("id", uuid).execute()
                logger.info(f"Updated bots_joined for user {uuid}")
            return uuid
        # No row yet: create via the upsert RPC, then re-read to get the UUID.
        logger.info(f"User not found, creating new user for chat_id: {chat_id}")
        rpc_params = {
            "p_chat_id": chat_id,
            "p_username": user.username,
            "p_first_name": user.first_name,
            "p_last_name": user.last_name,
            "p_language_code": user.language_code,
            "p_is_bot": user.is_bot,
        }
        supabase.rpc("upsert_telegram_user", rpc_params).execute()
        created = supabase.table("telegram_users").select("id").eq("chat_id", chat_id).execute()
        uuid = created.data[0]['id']
        logger.info(f"New user created: {uuid}")
        supabase.table("telegram_users").update({
            "bots_joined": ['chat_bot']
        }).eq("id", uuid).execute()
        return uuid
    except Exception as e:
        logger.error(f"Error get_or_create_user: {e}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        return None
async def get_chat_history(user_uuid: str, limit: int = 20):
    """Return up to `limit` recent messages for a user, oldest first.

    Returns an empty list on any database error.
    """
    try:
        logger.info(f"Fetching chat history for user: {user_uuid}, limit: {limit}")
        query = (
            supabase.table("chat_history")
            .select("role, content, image_url")
            .eq("user_id", user_uuid)
            .order("created_at", desc=True)
            .limit(limit)
        )
        rows = query.execute().data or []
        logger.info(f"Retrieved {len(rows)} messages from history")
        # Rows arrive newest-first; reverse so the model sees chronological order.
        return rows[::-1]
    except Exception as e:
        logger.error(f"Error fetching history: {e}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        return []
async def save_chat_message(user_uuid: str, role: str, content: str, model_used: str = None, image_url: str = None):
    """Insert one chat message row; return True on success, False on failure."""
    row = {
        "user_id": user_uuid,
        "role": role,
        "content": content,
        "model_used": model_used,
        "image_url": image_url,
    }
    try:
        logger.info(f"Attempting to save message - User: {user_uuid}, Role: {role}, Content length: {len(content) if content else 0}")
        result = supabase.table("chat_history").insert(row).execute()
        saved_id = result.data[0]['id'] if result.data else 'unknown'
        logger.info(f"Message saved successfully - ID: {saved_id}")
        return True
    except Exception as e:
        logger.error(f"Error saving message: {e}")
        logger.error(f"Data attempted: user_id={user_uuid}, role={role}, model_used={model_used}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False
async def log_chat_generation(user_uuid: str, chat_id: int, model: str, prompt: str, response: str):
    """Record a best-effort analytics row for one generation; never raises."""
    entry = {
        "user_id": user_uuid,
        "chat_id": chat_id,
        "model_used": model,
        "prompt_length": len(prompt) if prompt else 0,
        "response_length": len(response) if response else 0,
    }
    try:
        supabase.table("chat_generation_logs").insert(entry).execute()
        logger.info(f"Chat generation logged for user {user_uuid}")
    except Exception as e:
        logger.error(f"Error logging chat generation: {e}")
def build_messages_payload(history: list, new_message: str, image_url: str = None):
    """Build the OpenAI-style messages array for the Pollinations API.

    Args:
        history: Prior messages as dicts with 'role', 'content' and optional
            'image_url' keys, oldest first.
        new_message: The user's new text message.
        image_url: Optional image URL to attach to the new message.

    Returns:
        A list of {'role', 'content'} dicts. Content is a plain string for
        text-only messages, or a list of typed parts when an image is present.
    """
    messages = []
    for msg in history:
        if msg.get('image_url'):
            # Multimodal message: always use the structured-parts form so the
            # image survives even when the message has no text. (The previous
            # implementation required len(parts) > 1 and silently dropped
            # image-only history messages.)
            parts = []
            if msg.get('content'):
                parts.append({"type": "text", "text": msg['content']})
            parts.append({"type": "image_url", "image_url": {"url": msg['image_url']}})
            content = parts
        else:
            content = msg.get('content')
        messages.append({"role": msg['role'], "content": content})
    if image_url:
        new_content = [
            {"type": "text", "text": new_message},
            {"type": "image_url", "image_url": {"url": image_url}},
        ]
    else:
        new_content = new_message
    messages.append({"role": "user", "content": new_content})
    return messages
def clean_markdown_for_telegram(text: str) -> str:
    """Convert common Markdown to Telegram-compatible HTML.

    Code is handled first and protected with placeholders: fenced ``` blocks
    become <pre>, inline `code` becomes <code>, and neither is touched by the
    later heading/bold/italic rules. (Previously the inline-code rule ran
    before the fence rule, so it consumed the fence backticks and fenced
    blocks were never converted; formatting rules could also corrupt code
    contents.)
    """
    placeholders = []

    def _stash(html: str) -> str:
        # Replace a code span with an opaque token, restored at the end.
        placeholders.append(html)
        return f"___CODE_{len(placeholders) - 1}___"

    # Fenced code blocks first — ``` must be handled before single backticks.
    text = re.sub(
        r'```(\w+)?\n([\s\S]+?)\n```',
        lambda m: _stash(f"<pre>{m.group(2)}</pre>"),
        text,
    )
    # Inline code spans.
    text = re.sub(r'`([^`]+)`', lambda m: _stash(f"<code>{m.group(1)}</code>"), text)
    # Headings: ### before ## before # so longer markers win.
    text = re.sub(r'###\s+(.+)', r'<b>\1</b>', text)
    text = re.sub(r'##\s+(.+)', r'<b>\1</b>', text)
    text = re.sub(r'#\s+(.+)', r'<b>\1</b>', text)
    # Bold before italic so ** is not consumed as two italic markers.
    text = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text)
    # Protect list bullets ("* item") from the italic rule.
    BULLET_PLACEHOLDER = "___BULLET___"
    text = re.sub(r'^\*\s', BULLET_PLACEHOLDER, text, flags=re.MULTILINE)
    text = re.sub(r'\*([^\*\n]+?)\*', r'<i>\1</i>', text)
    text = text.replace(BULLET_PLACEHOLDER, '• ')
    # Restore the protected code spans.
    for i, html in enumerate(placeholders):
        text = text.replace(f"___CODE_{i}___", html)
    return text
async def call_pollinations_api_streaming(messages: list, model: str, callback):
    """
    Call Pollinations API with streaming (SSE) support.

    Args:
        messages: List of OpenAI-style message objects.
        model: Model name to use.
        callback: Async function invoked with the accumulated text after
            each content chunk arrives.
    Returns:
        Complete text response, or None on any failure.
    """
    try:
        headers = {"Content-Type": "application/json"}
        if POLLINATIONS_KEY:
            headers["Authorization"] = f"Bearer {POLLINATIONS_KEY}"
        payload = {
            "model": model,
            "messages": messages,
            "stream": True  # Enable streaming
        }
        logger.info(f"Calling Pollinations API (streaming) with model: {model}")
        # requests with stream=True lets us iterate the SSE body line by line.
        response = requests.post(
            POLLINATIONS_CHAT_URL,
            json=payload,
            headers=headers,
            stream=True,
            timeout=60
        )
        response.raise_for_status()
        full_text = ""
        # Each SSE line looks like "data: {json}"; "data: [DONE]" ends the stream.
        for raw_line in response.iter_lines():
            if not raw_line:
                continue
            line = raw_line.decode('utf-8')
            if not line.startswith('data: '):
                continue
            data_str = line[6:]  # Remove 'data: ' prefix
            if data_str.strip() == '[DONE]':
                break
            try:
                # json is imported at module level now (was re-imported per chunk).
                data = json.loads(data_str)
            except json.JSONDecodeError:
                # Skip malformed / keep-alive chunks.
                continue
            if 'choices' in data and len(data['choices']) > 0:
                delta = data['choices'][0].get('delta', {})
                content = delta.get('content', '')
                if content:
                    full_text += content
                    # Push the accumulated text to the UI callback.
                    await callback(full_text)
        logger.info(f"Streaming complete. Total length: {len(full_text)}")
        return full_text
    except requests.exceptions.RequestException as e:
        logger.error(f"API request error: {e}")
        return None
    except Exception as e:
        logger.error(f"Error in streaming API call: {e}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        return None
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: register the user and show the welcome menu."""
    tg_user = update.effective_user
    chat_id = update.effective_chat.id
    # Ensure the user exists in the database before they chat.
    await get_or_create_user(tg_user, chat_id)
    welcome_text = (
        "**Welcome to Icebox AI Chat!**\n\n"
        "I'm an AI assistant that can help you:\n"
        "• Answer questions\n"
        "• Write and summarize text\n"
        "• Search the web\n\n"
        "```\n"
        "📟 How to start:\n"
        "Just send any message.\n\n"
        "Examples:\n"
        "> Give me business ideas\n"
        "> Summarize this article\n"
        "> Find the latest AI news\n"
        "```\n\n"
        "**⚙️ Commands:**\n"
        "• /help — show help\n"
        "• /model — model info\n"
        "• /clear — delete chat history"
    )
    buttons = [
        [InlineKeyboardButton("⚙️ Model Settings", callback_data="select_model")],
        [InlineKeyboardButton("🗑️ Clear Chat History", callback_data="clear_history")],
        [InlineKeyboardButton("🏞️ Create Image", url="https://t.me/iceboxai_bot")],
    ]
    await update.message.reply_text(
        welcome_text,
        reply_markup=InlineKeyboardMarkup(buttons),
        parse_mode="Markdown",
    )
async def model_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /model: show details about the currently selected model."""
    selected = context.user_data.get('model', DEFAULT_MODEL)
    info = AVAILABLE_MODELS.get(selected, AVAILABLE_MODELS[DEFAULT_MODEL])
    vision = "Yes ✅" if info['supports_vision'] else "No ❌"
    text = f"""⚙️ **Current Model Information**
**Name:** {info['display']}
**Description:** {info['description']}
**Vision Support:** {vision}
**Streaming:** Enabled ⚡"""
    markup = InlineKeyboardMarkup([
        [InlineKeyboardButton("🔄 Change Model", callback_data="select_model")],
        [InlineKeyboardButton("« Back", callback_data="back_to_chat")],
    ])
    await update.message.reply_text(text, reply_markup=markup, parse_mode="Markdown")
async def handle_model_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Route every inline-keyboard callback: help, model picker, model
    selection, back-to-chat, and clear-history."""
    query = update.callback_query
    # Acknowledge immediately so the client stops showing the loading spinner.
    await query.answer()
    data = query.data
    if data == "show_help":
        # Help screen, personalized with the user's current model.
        current_model = context.user_data.get('model', DEFAULT_MODEL)
        model_info = AVAILABLE_MODELS.get(current_model, AVAILABLE_MODELS[DEFAULT_MODEL])
        help_text = f"""📚 **Icebox AI Chat Help**
**Available commands:**
/start - Start the bot
/help - Show this help
/image - Generate images with AI
/model - View model information
/clear - Clear chat history
**How to use:**
Send a text message to ask questions. The bot remembers conversation context and responses stream in real-time.
**Current model:** {model_info['display']}
**Vision support:** {"Yes ✅" if model_info['supports_vision'] else "No ❌"}
**Streaming:** Enabled ⚡
**Tips:**
• Chat history is saved, so the AI remembers previous conversations
• Use /clear to start a new conversation without previous context
• Watch responses appear in real-time as the AI generates them!"""
        keyboard = [
            [InlineKeyboardButton("« Back", callback_data="back_to_chat")]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await query.edit_message_text(help_text, reply_markup=reply_markup, parse_mode="Markdown")
    elif data == "select_model":
        # Model picker: one button per model, checkmark on the active one.
        keyboard = []
        current_model = context.user_data.get('model', DEFAULT_MODEL)
        for model_key, model_info in AVAILABLE_MODELS.items():
            checkmark = "✓ " if model_key == current_model else ""
            button_text = f"{checkmark}{model_info['display']}"
            keyboard.append([InlineKeyboardButton(button_text, callback_data=f"model_{model_key}")])
        keyboard.append([InlineKeyboardButton("« Back", callback_data="back_to_chat")])
        reply_markup = InlineKeyboardMarkup(keyboard)
        await query.edit_message_text("⚙️ **Select Model:**", reply_markup=reply_markup, parse_mode="Markdown")
    elif data.startswith("model_"):
        # A model was chosen; persist the choice in per-user context data.
        model_key = data.replace("model_", "")
        context.user_data['model'] = model_key
        # NOTE(review): direct indexing — an unknown model_key in a stale
        # callback would raise KeyError here; confirm callbacks can't outlive
        # AVAILABLE_MODELS changes.
        model_info = AVAILABLE_MODELS[model_key]
        text = f"""✅ **Model Updated**
Now using: {model_info['display']}
{model_info['description']}"""
        keyboard = [
            [InlineKeyboardButton("⚙️ Model Settings", callback_data="select_model")],
            [InlineKeyboardButton("« Back to Chat", callback_data="back_to_chat")]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await query.edit_message_text(text, reply_markup=reply_markup, parse_mode="Markdown")
    elif data == "back_to_chat":
        # Main chat screen summary.
        current_model = context.user_data.get('model', DEFAULT_MODEL)
        model_info = AVAILABLE_MODELS.get(current_model, AVAILABLE_MODELS[DEFAULT_MODEL])
        text = f"""🤖 **Icebox AI Chat**
Active model: {model_info['display']}
Streaming: Enabled ⚡
Send me a message to start chatting.
**Commands:**
/help - Show help & available commands
/image - Generate images with AI
/model - View model information
/clear - Clear chat history"""
        keyboard = [
            [InlineKeyboardButton("📚 Help", callback_data="show_help")],
            [InlineKeyboardButton("🗑️ Clear Chat History", callback_data="clear_history")]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await query.edit_message_text(text, reply_markup=reply_markup, parse_mode="Markdown")
    elif data == "clear_history":
        # Delete all chat_history rows for this chat's user.
        chat_id = update.effective_chat.id
        try:
            user_res = supabase.table("telegram_users").select("id").eq("chat_id", chat_id).execute()
            if user_res.data:
                user_uuid = user_res.data[0]['id']
                supabase.table("chat_history").delete().eq("user_id", user_uuid).execute()
                await query.edit_message_text(
                    "🗑️ **Chat history cleared!**\n\nSend a message to start a new conversation.",
                    parse_mode="Markdown"
                )
            else:
                await query.edit_message_text("Error: User not found.")
        except Exception as e:
            logger.error(f"Error clearing history: {e}")
            await query.edit_message_text("Error: Failed to clear chat history.")
async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle an incoming text message: look up the user, call the streaming
    API, and live-edit a placeholder Telegram message as chunks arrive."""
    user = update.effective_user
    chat_id = update.effective_chat.id
    message_text = update.message.text
    logger.info(f"Received message from chat_id: {chat_id}, user: {user.username}")
    # Get or create user
    user_uuid = await get_or_create_user(user, chat_id)
    if not user_uuid:
        logger.error(f"Failed to get user_uuid for chat_id: {chat_id}")
        await update.message.reply_text("Error: Unable to identify user.")
        return
    logger.info(f"User UUID: {user_uuid}")
    # Get current model
    current_model = context.user_data.get('model', DEFAULT_MODEL)
    logger.info(f"Using model: {current_model}")
    # Send initial typing action
    await context.bot.send_chat_action(chat_id=chat_id, action="typing")
    # Get chat history (last 20 messages, oldest first)
    history = await get_chat_history(user_uuid, limit=20)
    # Build messages payload
    messages = build_messages_payload(history, message_text)
    # Send initial placeholder message that will be edited as text streams in.
    reply_message = await update.message.reply_text("⏳ _Thinking..._", parse_mode="Markdown")
    # Closure state shared with the streaming callback below.
    last_update_time = asyncio.get_event_loop().time()
    last_text = ""
    update_lock = asyncio.Lock()
    async def update_message(text):
        """Callback to update message with streamed content.

        `text` is the full accumulated response so far, not a delta.
        """
        nonlocal last_update_time, last_text
        current_time = asyncio.get_event_loop().time()
        time_since_update = current_time - last_update_time
        text_diff = len(text) - len(last_text)  # NOTE(review): computed but never used
        # Update only if enough time passed (Telegram has strict edit limits)
        should_update = time_since_update >= STREAM_UPDATE_INTERVAL
        if should_update:
            async with update_lock:
                try:
                    # Clean and format text
                    cleaned_text = clean_markdown_for_telegram(text)
                    # Add typing indicator (a cursor glyph at the end)
                    display_text = cleaned_text + " ▌"
                    # Update message (with rate limiting protection)
                    if display_text != last_text:
                        await reply_message.edit_text(display_text, parse_mode="HTML")
                        last_text = display_text
                        last_update_time = current_time
                    # Keep showing typing action (only every 4 seconds to save resources)
                    if 'last_action_time' not in context.user_data or (current_time - context.user_data.get('last_action_time', 0) > 4):
                        await context.bot.send_chat_action(chat_id=chat_id, action="typing")
                        context.user_data['last_action_time'] = current_time
                except Exception as e:
                    # Ignore update errors (like message not modified)
                    if "Message is not modified" not in str(e):
                        logger.warning(f"Error updating message: {e}")
    # Call streaming API; update_message is invoked per content chunk.
    ai_reply = await call_pollinations_api_streaming(messages, current_model, update_message)
    if ai_reply:
        logger.info(f"Received complete AI reply, length: {len(ai_reply)}")
        # Save user message
        await save_chat_message(user_uuid, "user", message_text, model_used=current_model)
        # Save AI reply
        await save_chat_message(user_uuid, "assistant", ai_reply, model_used=current_model)
        # Log for analytics
        await log_chat_generation(user_uuid, chat_id, current_model, message_text, ai_reply)
        # Final update - remove typing indicator
        ai_reply_cleaned = clean_markdown_for_telegram(ai_reply)
        try:
            # Send final version without typing indicator
            if len(ai_reply_cleaned) > 4096:
                # Over Telegram's per-message limit: delete placeholder and send in chunks
                await reply_message.delete()
                for i in range(0, len(ai_reply_cleaned), 4096):
                    chunk = ai_reply_cleaned[i:i+4096]
                    try:
                        await update.message.reply_text(chunk, parse_mode="HTML")
                    except Exception as e:
                        # A chunk may split an HTML tag; fall back to plain text.
                        logger.warning(f"HTML parse error, sending as plain text: {e}")
                        await update.message.reply_text(chunk)
            else:
                await reply_message.edit_text(ai_reply_cleaned, parse_mode="HTML")
        except Exception as e:
            logger.warning(f"Error in final update: {e}")
            try:
                # Last resort: edit without HTML parsing.
                await reply_message.edit_text(ai_reply_cleaned)
            except:
                # Deliberate best-effort: the streamed text is already visible.
                pass
    else:
        logger.error("No AI reply received from API")
        await reply_message.edit_text(
            "⚠️ Sorry, an error occurred while contacting the AI. Please try again."
        )
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /help command: send the help text with the user's current model."""
    # Resolve the selected model, falling back to the default entry.
    current_model = context.user_data.get('model', DEFAULT_MODEL)
    model_info = AVAILABLE_MODELS.get(current_model, AVAILABLE_MODELS[DEFAULT_MODEL])
    help_text = f"""📚 **Icebox AI Chat Help**
**Available commands:**
• /start - Start the bot
• /model - View model info
• /image - Generate AI images
• /clear - Clear chat history
• /help - Show this help
**How to use:**
Send a text message to ask questions. The bot remembers conversation context and responses stream in real-time ⚡
**Current model:** {model_info['display']}
**Vision support:** {"Yes ✅" if model_info['supports_vision'] else "No ❌"}
**Streaming:** Enabled ⚡
**Tips:**
• Chat history is saved, so the AI remembers previous conversations
• Use /clear to start a new conversation without previous context
• Watch responses appear in real-time as the AI generates them!
"""
    await update.message.reply_text(help_text, parse_mode="Markdown")
async def clear_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /clear: delete all stored chat history for this chat's user."""
    chat_id = update.effective_chat.id
    try:
        lookup = supabase.table("telegram_users").select("id").eq("chat_id", chat_id).execute()
        if not lookup.data:
            await update.message.reply_text("Error: User not found. Please type /start first.")
            return
        uuid = lookup.data[0]['id']
        supabase.table("chat_history").delete().eq("user_id", uuid).execute()
        await update.message.reply_text(
            "🗑️ **Chat history cleared!**\n\nSend a message to start a new conversation.",
            parse_mode="Markdown"
        )
    except Exception as e:
        logger.error(f"Error clearing history: {e}")
        await update.message.reply_text("Error: Failed to clear chat history.")
async def image_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /image: point the user at the dedicated image-generation bot."""
    redirect_text = """🏞️ **Generate AI Images**
To create AI-generated images, please use our dedicated image bot:
👉 @iceboxai_bot
Just start a chat with that bot and describe the image you want to generate!"""
    await update.message.reply_text(redirect_text, parse_mode="Markdown")
def main():
    """Validate configuration, build the Telegram application, run polling."""
    if not TELEGRAM_TOKEN:
        print("Error: TELEGRAM_CHAT_BOT_TOKEN not found in environment variables.")
        return
    if not SUPABASE_URL or not SUPABASE_KEY:
        print("Error: SUPABASE_URL or SUPABASE_KEY not found.")
        return
    base_url = os.getenv("TELEGRAM_API_BASE_URL")
    base_file_url = os.getenv("TELEGRAM_API_FILE_URL")
    # Generous timeouts: the bot may sit behind a slow proxy (n8n etc.).
    from telegram.request import HTTPXRequest
    http_request = HTTPXRequest(
        connect_timeout=30.0,
        read_timeout=60.0,
        write_timeout=30.0,
        pool_timeout=30.0,
    )
    builder = Application.builder().token(TELEGRAM_TOKEN).request(http_request)
    if base_url:
        builder.base_url(base_url)
        print(f"Using Custom API URL: {base_url}")
    if base_file_url:
        builder.base_file_url(base_file_url)
    application = builder.build()
    # Slash-command handlers.
    for command, handler in (
        ("start", start),
        ("model", model_command),
        ("help", help_command),
        ("clear", clear_command),
        ("image", image_command),
    ):
        application.add_handler(CommandHandler(command, handler))
    # Inline-keyboard callbacks.
    application.add_handler(CallbackQueryHandler(handle_model_callback))
    # Plain text messages (everything that is not a command).
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message))
    print("Icebox AI Chat Bot (Streaming Enabled) is running...")
    application.run_polling()
def run_dummy_server():
    """Run a dummy Flask server on port 7860 for Hugging Face Spaces.

    Spaces require an open HTTP port; this serves a trivial health check.
    """
    app = Flask(__name__)

    @app.route('/')
    def health_check():
        return "Chat Bot is running!", 200

    app.run(host='0.0.0.0', port=int(os.getenv("PORT", 7860)))
if __name__ == "__main__":
    # Start dummy health-check server in a separate thread (daemon=True so it
    # exits with the process); the bot's blocking polling loop then runs here.
    threading.Thread(target=run_dummy_server, daemon=True).start()
    main()