Spaces:
Running
Running
| import os | |
| import logging | |
| import requests | |
| import base64 | |
| import io | |
| import re | |
| import asyncio | |
| from datetime import datetime, timezone | |
| from dotenv import load_dotenv | |
| from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup | |
| from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters, ContextTypes | |
| from supabase import create_client, Client | |
| from flask import Flask | |
| import threading | |
# Load environment variables from a local .env file, if one exists.
load_dotenv()

# Configuration (every value is read from the environment).
TELEGRAM_TOKEN = os.getenv("TELEGRAM_CHAT_BOT_TOKEN")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
POLLINATIONS_KEY = os.getenv("POLLINATIONS_KEY", "")

# Pollinations API Endpoint
POLLINATIONS_CHAT_URL = "https://gen.pollinations.ai/v1/chat/completions"

# Streaming Configuration
STREAM_UPDATE_INTERVAL = 1.0  # Update message every 1.0 seconds (Safety: avoid 429 Flood Control)
MIN_CHUNK_SIZE = 40  # Reasonable chunk size

# Available Models, keyed by the identifier sent to the API.
# NOTE(review): the leading characters of "display" look mis-encoded
# (probably an emoji) -- confirm the file's encoding before shipping.
AVAILABLE_MODELS = {
    "openai": {
        "name": "openai",
        "display": "π§ GPT-5.1",
        "description": "Text generation, Web search",
        "supports_vision": False,
    },
}
DEFAULT_MODEL = "openai"

# Logging setup
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)

# Supabase client.
# Only build the client when both credentials are present: create_client()
# raises on a missing URL/key, which would abort the import before main()
# gets the chance to print its "not found" message and exit cleanly.
supabase: "Client | None" = (
    create_client(SUPABASE_URL, SUPABASE_KEY)
    if SUPABASE_URL and SUPABASE_KEY
    else None
)
async def get_or_create_user(user, chat_id: int) -> str:
    """Return the database id of the user for ``chat_id``, creating it if needed.

    An existing row gets ``'chat_bot'`` appended to ``bots_joined`` when it is
    missing. A missing row is created through the ``upsert_telegram_user`` RPC
    and then tagged with ``bots_joined = ['chat_bot']``.

    Returns the user's id, or ``None`` when any step raises (the error and
    traceback are logged).
    """
    try:
        logger.info(f"Getting or creating user for chat_id: {chat_id}")
        lookup = (
            supabase.table("telegram_users")
            .select("id, bots_joined")
            .eq("chat_id", chat_id)
            .execute()
        )

        if not lookup.data:
            # No row yet: create it via the RPC, then read back its id.
            logger.info(f"User not found, creating new user for chat_id: {chat_id}")
            rpc_args = {
                "p_chat_id": chat_id,
                "p_username": user.username,
                "p_first_name": user.first_name,
                "p_last_name": user.last_name,
                "p_language_code": user.language_code,
                "p_is_bot": user.is_bot,
            }
            supabase.rpc("upsert_telegram_user", rpc_args).execute()
            created = (
                supabase.table("telegram_users")
                .select("id")
                .eq("chat_id", chat_id)
                .execute()
            )
            new_uuid = created.data[0]['id']
            logger.info(f"New user created: {new_uuid}")
            supabase.table("telegram_users").update(
                {"bots_joined": ['chat_bot']}
            ).eq("id", new_uuid).execute()
            return new_uuid

        row = lookup.data[0]
        existing_uuid = row['id']
        joined = row.get('bots_joined') or []
        logger.info(f"User found: {existing_uuid}")
        if 'chat_bot' not in joined:
            joined.append('chat_bot')
            supabase.table("telegram_users").update(
                {"bots_joined": joined}
            ).eq("id", existing_uuid).execute()
            logger.info(f"Updated bots_joined for user {existing_uuid}")
        return existing_uuid
    except Exception as e:
        logger.error(f"Error get_or_create_user: {e}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        return None
async def get_chat_history(user_uuid: str, limit: int = 20):
    """Fetch the most recent ``limit`` chat messages for a user, oldest first.

    Rows are queried newest-first (so ``limit`` keeps the latest ones) and then
    reversed into chronological order. Returns an empty list when there is no
    history or when the query raises (the error is logged).
    """
    try:
        logger.info(f"Fetching chat history for user: {user_uuid}, limit: {limit}")
        query = (
            supabase.table("chat_history")
            .select("role, content, image_url")
            .eq("user_id", user_uuid)
            .order("created_at", desc=True)
            .limit(limit)
        )
        rows = query.execute().data
        history_count = len(rows) if rows else 0
        logger.info(f"Retrieved {history_count} messages from history")
        if not rows:
            return []
        return rows[::-1]
    except Exception as e:
        logger.error(f"Error fetching history: {e}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        return []
async def save_chat_message(user_uuid: str, role: str, content: str, model_used: str = None, image_url: str = None):
    """Insert one message into the ``chat_history`` table.

    Returns ``True`` when the insert (and the follow-up log line) succeed and
    ``False`` when anything raises; failures are logged with a traceback.
    """
    try:
        record = {
            "user_id": user_uuid,
            "role": role,
            "content": content,
            "model_used": model_used,
            "image_url": image_url,
        }
        content_length = len(content) if content else 0
        logger.info(f"Attempting to save message - User: {user_uuid}, Role: {role}, Content length: {content_length}")
        result = supabase.table("chat_history").insert(record).execute()
        saved_id = result.data[0]['id'] if result.data else 'unknown'
        logger.info(f"Message saved successfully - ID: {saved_id}")
        return True
    except Exception as e:
        logger.error(f"Error saving message: {e}")
        logger.error(f"Data attempted: user_id={user_uuid}, role={role}, model_used={model_used}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False
async def log_chat_generation(user_uuid: str, chat_id: int, model: str, prompt: str, response: str):
    """Record one generation event in ``chat_generation_logs``.

    Only the lengths of the prompt and the response are stored, not their
    text. Errors are logged and swallowed.
    """
    try:
        prompt_length = len(prompt) if prompt else 0
        response_length = len(response) if response else 0
        supabase.table("chat_generation_logs").insert(
            {
                "user_id": user_uuid,
                "chat_id": chat_id,
                "model_used": model,
                "prompt_length": prompt_length,
                "response_length": response_length,
            }
        ).execute()
        logger.info(f"Chat generation logged for user {user_uuid}")
    except Exception as e:
        logger.error(f"Error logging chat generation: {e}")
def build_messages_payload(history: list, new_message: str, image_url: str = None):
    """Build the ``messages`` array for the Pollinations chat API.

    Args:
        history: Prior messages as dicts with a ``role`` key and optional
            ``content`` / ``image_url`` keys, in chronological order.
        new_message: Text of the new user message.
        image_url: Optional image URL to attach to the new user message.

    Returns:
        A list of ``{"role", "content"}`` dicts. ``content`` is the plain text
        when the message has no image, otherwise a list of typed parts (a text
        part when there is text, followed by the image part).
    """

    def _content(text, image):
        # Text-only messages keep the simple string form.
        if not image:
            return text
        parts = []
        if text:
            parts.append({"type": "text", "text": text})
        # Always keep the image, even when there is no accompanying text
        # (previously an image-only history row was sent as content=None).
        parts.append({"type": "image_url", "image_url": {"url": image}})
        return parts

    messages = [
        {
            "role": msg['role'],
            "content": _content(msg.get('content'), msg.get('image_url')),
        }
        for msg in history
    ]
    messages.append({"role": "user", "content": _content(new_message, image_url)})
    return messages
def clean_markdown_for_telegram(text: str) -> str:
    """Convert a Markdown subset to Telegram-compatible HTML.

    Handles fenced code blocks, inline code, ``#`` headings (rendered bold),
    ``**bold**``, ``*italic*`` and ``* `` bullets. ``<``, ``>`` and ``&`` in
    the input are HTML-escaped so that arbitrary model output cannot break
    Telegram's HTML parser, and code spans are protected from the other
    rewrites.

    Returns an empty string for empty/None input.
    """
    import html

    if not text:
        return ""

    # NUL delimits the stash tokens below, so make sure the input has none.
    text = text.replace("\x00", "")
    stash = []

    def _protect(fragment):
        # Park finished HTML behind a token the later regexes cannot touch.
        stash.append(fragment)
        return f"\x00{len(stash) - 1}\x00"

    def _code_block(match):
        return _protect(f"<pre>{html.escape(match.group(2), quote=False)}</pre>")

    def _inline_code(match):
        return _protect(f"<code>{html.escape(match.group(1), quote=False)}</code>")

    # Fenced blocks must be extracted before inline code; otherwise the
    # inline pattern pairs up the fence's own backticks and corrupts the block.
    text = re.sub(r'```(\w+)?\n([\s\S]+?)\n```', _code_block, text)
    text = re.sub(r'`([^`]+)`', _inline_code, text)

    # Escape everything that is not code, before inserting our own tags.
    text = html.escape(text, quote=False)

    # Headings only count at the start of a line ("C# is" is not a heading).
    text = re.sub(r'^#{1,6}\s+(.+)$', r'<b>\1</b>', text, flags=re.MULTILINE)
    text = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text)

    # Hide list bullets so their "*" is not mistaken for an italic marker.
    BULLET_PLACEHOLDER = "___BULLET___"
    text = re.sub(r'^\*[ \t]', BULLET_PLACEHOLDER, text, flags=re.MULTILINE)
    text = re.sub(r'\*([^\*\n]+?)\*', r'<i>\1</i>', text)
    # NOTE(review): this literal looks mis-encoded (probably a bullet
    # character) -- confirm the file's encoding.
    text = text.replace(BULLET_PLACEHOLDER, 'β’ ')

    # Put the protected code fragments back.
    return re.sub(r'\x00(\d+)\x00', lambda m: stash[int(m.group(1))], text)
async def call_pollinations_api_streaming(messages: list, model: str, callback):
    """Call the Pollinations chat API with streaming enabled.

    The blocking ``requests`` calls (the POST itself and every read of the
    response stream) run in the event loop's default executor so other bot
    updates keep being served while the model is generating.

    Args:
        messages: List of message objects.
        model: Model name to use.
        callback: Async function awaited with the accumulated text after each
            received content chunk.

    Returns:
        The complete response text, or ``None`` when the request or the stream
        processing fails (the error is logged).
    """
    import json
    import traceback

    headers = {"Content-Type": "application/json"}
    if POLLINATIONS_KEY:
        headers["Authorization"] = f"Bearer {POLLINATIONS_KEY}"
    payload = {
        "model": model,
        "messages": messages,
        "stream": True,  # Enable streaming
    }

    loop = asyncio.get_running_loop()
    response = None
    try:
        logger.info(f"Calling Pollinations API (streaming) with model: {model}")
        response = await loop.run_in_executor(
            None,
            lambda: requests.post(
                POLLINATIONS_CHAT_URL,
                json=payload,
                headers=headers,
                stream=True,
                timeout=60,
            ),
        )
        response.raise_for_status()

        full_text = ""
        line_iter = response.iter_lines()
        exhausted = object()  # sentinel: avoids StopIteration crossing the executor
        while True:
            raw_line = await loop.run_in_executor(None, next, line_iter, exhausted)
            if raw_line is exhausted:
                break
            if not raw_line:
                continue
            line = raw_line.decode('utf-8')
            # SSE format: "data: {...}" (the space after the colon is optional).
            if not line.startswith('data:'):
                continue
            data_str = line[len('data:'):].strip()
            if data_str == '[DONE]':
                break
            try:
                data = json.loads(data_str)
            except json.JSONDecodeError:
                # Skip malformed JSON
                continue
            choices = data.get('choices') if isinstance(data, dict) else None
            if not choices:
                continue
            content = (choices[0].get('delta') or {}).get('content') or ''
            if content:
                full_text += content
                # Call callback with accumulated text
                await callback(full_text)

        logger.info(f"Streaming complete. Total length: {len(full_text)}")
        return full_text
    except requests.exceptions.RequestException as e:
        logger.error(f"API request error: {e}")
        return None
    except Exception as e:
        logger.error(f"Error in streaming API call: {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return None
    finally:
        # A streamed response holds its connection until it is closed.
        if response is not None:
            response.close()
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: register the user and send the welcome message.

    Replies through ``update.effective_message`` because command handlers
    also receive edited messages, for which ``update.message`` is ``None``.
    """
    message = update.effective_message
    user = update.effective_user
    chat_id = update.effective_chat.id

    # Called for its side effect (creating/updating the user row); the id
    # itself is not needed here.
    await get_or_create_user(user, chat_id)

    welcome_text = (
        "**Welcome to Icebox AI Chat!**\n\n"
        "I'm an AI assistant that can help you:\n"
        "β’ Answer questions\n"
        "β’ Write and summarize text\n"
        "β’ Search the web\n\n"
        "```\n"
        "π How to start:\n"
        "Just send any message.\n\n"
        "Examples:\n"
        "> Give me business ideas\n"
        "> Summarize this article\n"
        "> Find the latest AI news\n"
        "```\n\n"
        "**βοΈ Commands:**\n"
        "β’ /help β show help\n"
        "β’ /model β model info\n"
        "β’ /clear β delete chat history"
    )
    keyboard = [
        [InlineKeyboardButton("βοΈ Model Settings", callback_data="select_model")],
        [InlineKeyboardButton("ποΈ Clear Chat History", callback_data="clear_history")],
        [InlineKeyboardButton("ποΈ Create Image", url="https://t.me/iceboxai_bot")],
    ]
    reply_markup = InlineKeyboardMarkup(keyboard)
    await message.reply_text(welcome_text, reply_markup=reply_markup, parse_mode="Markdown")
async def model_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /model: show details of the model selected for this user.

    Falls back to ``DEFAULT_MODEL`` when the stored key is unknown. Replies
    through ``update.effective_message`` so edited commands (where
    ``update.message`` is ``None``) do not crash the handler.
    """
    current_model = context.user_data.get('model', DEFAULT_MODEL)
    model_info = AVAILABLE_MODELS.get(current_model, AVAILABLE_MODELS[DEFAULT_MODEL])
    vision_flag = "Yes β " if model_info['supports_vision'] else "No β"
    text = (
        "βοΈ **Current Model Information**\n"
        f"**Name:** {model_info['display']}\n"
        f"**Description:** {model_info['description']}\n"
        f"**Vision Support:** {vision_flag}\n"
        "**Streaming:** Enabled β‘"
    )
    keyboard = [
        [InlineKeyboardButton("π Change Model", callback_data="select_model")],
        [InlineKeyboardButton("Β« Back", callback_data="back_to_chat")],
    ]
    reply_markup = InlineKeyboardMarkup(keyboard)
    await update.effective_message.reply_text(text, reply_markup=reply_markup, parse_mode="Markdown")
async def handle_model_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle every inline-keyboard callback of the bot.

    Dispatches on ``callback_data``:
      * ``show_help``     - show the help text
      * ``select_model``  - list the available models
      * ``model_<key>``   - switch to the model ``<key>`` (validated)
      * ``back_to_chat``  - show the main chat screen
      * ``clear_history`` - delete the user's stored chat history

    Unrecognised callback data is acknowledged and otherwise ignored.
    """
    query = update.callback_query
    await query.answer()
    data = query.data

    if data == "show_help":
        current_model = context.user_data.get('model', DEFAULT_MODEL)
        model_info = AVAILABLE_MODELS.get(current_model, AVAILABLE_MODELS[DEFAULT_MODEL])
        vision_flag = "Yes β " if model_info['supports_vision'] else "No β"
        help_text = (
            "π **Icebox AI Chat Help**\n"
            "**Available commands:**\n"
            "/start - Start the bot\n"
            "/help - Show this help\n"
            "/image - Generate images with AI\n"
            "/model - View model information\n"
            "/clear - Clear chat history\n"
            "**How to use:**\n"
            "Send a text message to ask questions. The bot remembers conversation context and responses stream in real-time.\n"
            f"**Current model:** {model_info['display']}\n"
            f"**Vision support:** {vision_flag}\n"
            "**Streaming:** Enabled β‘\n"
            "**Tips:**\n"
            "β’ Chat history is saved, so the AI remembers previous conversations\n"
            "β’ Use /clear to start a new conversation without previous context\n"
            "β’ Watch responses appear in real-time as the AI generates them!"
        )
        keyboard = [
            [InlineKeyboardButton("Β« Back", callback_data="back_to_chat")],
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await query.edit_message_text(help_text, reply_markup=reply_markup, parse_mode="Markdown")

    elif data == "select_model":
        keyboard = []
        current_model = context.user_data.get('model', DEFAULT_MODEL)
        for model_key, model_info in AVAILABLE_MODELS.items():
            checkmark = "β " if model_key == current_model else ""
            button_text = f"{checkmark}{model_info['display']}"
            keyboard.append([InlineKeyboardButton(button_text, callback_data=f"model_{model_key}")])
        keyboard.append([InlineKeyboardButton("Β« Back", callback_data="back_to_chat")])
        reply_markup = InlineKeyboardMarkup(keyboard)
        await query.edit_message_text("βοΈ **Select Model:**", reply_markup=reply_markup, parse_mode="Markdown")

    elif data.startswith("model_"):
        # Strip only the leading prefix; str.replace would also eat any
        # later "model_" inside the key itself.
        model_key = data[len("model_"):]
        model_info = AVAILABLE_MODELS.get(model_key)
        if model_info is None:
            # Stale button or tampered callback data: keep the current
            # selection instead of storing a key that does not exist.
            logger.warning(f"Unknown model requested via callback: {model_key}")
            await query.edit_message_text("Error: Unknown model.")
            return
        context.user_data['model'] = model_key
        text = (
            "β **Model Updated**\n"
            f"Now using: {model_info['display']}\n"
            f"{model_info['description']}"
        )
        keyboard = [
            [InlineKeyboardButton("βοΈ Model Settings", callback_data="select_model")],
            [InlineKeyboardButton("Β« Back to Chat", callback_data="back_to_chat")],
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await query.edit_message_text(text, reply_markup=reply_markup, parse_mode="Markdown")

    elif data == "back_to_chat":
        current_model = context.user_data.get('model', DEFAULT_MODEL)
        model_info = AVAILABLE_MODELS.get(current_model, AVAILABLE_MODELS[DEFAULT_MODEL])
        text = (
            "π€ **Icebox AI Chat**\n"
            f"Active model: {model_info['display']}\n"
            "Streaming: Enabled β‘\n"
            "Send me a message to start chatting.\n"
            "**Commands:**\n"
            "/help - Show help & available commands\n"
            "/image - Generate images with AI\n"
            "/model - View model information\n"
            "/clear - Clear chat history"
        )
        keyboard = [
            [InlineKeyboardButton("π Help", callback_data="show_help")],
            [InlineKeyboardButton("ποΈ Clear Chat History", callback_data="clear_history")],
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await query.edit_message_text(text, reply_markup=reply_markup, parse_mode="Markdown")

    elif data == "clear_history":
        chat_id = update.effective_chat.id
        try:
            user_res = supabase.table("telegram_users").select("id").eq("chat_id", chat_id).execute()
            if user_res.data:
                user_uuid = user_res.data[0]['id']
                supabase.table("chat_history").delete().eq("user_id", user_uuid).execute()
                await query.edit_message_text(
                    "ποΈ **Chat history cleared!**\n\nSend a message to start a new conversation.",
                    parse_mode="Markdown"
                )
            else:
                await query.edit_message_text("Error: User not found.")
        except Exception as e:
            logger.error(f"Error clearing history: {e}")
            await query.edit_message_text("Error: Failed to clear chat history.")
async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle an incoming text message and stream the AI reply back.

    Flow: resolve the user, load recent history, send a placeholder message,
    stream the model output into that placeholder (rate limited by
    ``STREAM_UPDATE_INTERVAL``), persist both sides of the exchange, then
    replace the placeholder with the final formatted reply (split into
    several messages when it exceeds Telegram's length limit).
    """
    # Telegram rejects messages longer than this many characters.
    max_message_length = 4096

    # update.message is None for edited messages and channel posts, which a
    # TEXT MessageHandler also receives; effective_message covers them all.
    message = update.effective_message
    user = update.effective_user
    if message is None or not message.text or user is None:
        return
    chat_id = update.effective_chat.id
    message_text = message.text
    logger.info(f"Received message from chat_id: {chat_id}, user: {user.username}")

    # Get or create user
    user_uuid = await get_or_create_user(user, chat_id)
    if not user_uuid:
        logger.error(f"Failed to get user_uuid for chat_id: {chat_id}")
        await message.reply_text("Error: Unable to identify user.")
        return
    logger.info(f"User UUID: {user_uuid}")

    # Get current model
    current_model = context.user_data.get('model', DEFAULT_MODEL)
    logger.info(f"Using model: {current_model}")

    # Send initial typing action
    await context.bot.send_chat_action(chat_id=chat_id, action="typing")

    # Get chat history and build the request payload
    history = await get_chat_history(user_uuid, limit=20)
    messages = build_messages_payload(history, message_text)

    # Send initial placeholder message
    reply_message = await message.reply_text("β³ _Thinking..._", parse_mode="Markdown")

    # State shared with the streaming callback
    loop = asyncio.get_running_loop()
    last_update_time = loop.time()
    last_text = ""
    update_lock = asyncio.Lock()

    async def update_message(text):
        """Callback to update the placeholder with the streamed content."""
        nonlocal last_update_time, last_text
        current_time = loop.time()
        # Update only if enough time passed (Telegram has strict edit limits)
        if current_time - last_update_time < STREAM_UPDATE_INTERVAL:
            return
        async with update_lock:
            # Count this attempt against the rate limit even if it fails, so a
            # rejected edit is not retried on every following chunk.
            last_update_time = current_time
            try:
                # Clean and format text, then add the typing indicator
                display_text = clean_markdown_for_telegram(text) + " β"
                if len(display_text) > max_message_length:
                    # Too long for a single message; the final update below
                    # sends the complete reply in chunks.
                    return
                if display_text != last_text:
                    await reply_message.edit_text(display_text, parse_mode="HTML")
                    last_text = display_text
                # Keep showing typing action (only every 4 seconds to save resources)
                if 'last_action_time' not in context.user_data or (
                    current_time - context.user_data.get('last_action_time', 0) > 4
                ):
                    await context.bot.send_chat_action(chat_id=chat_id, action="typing")
                    context.user_data['last_action_time'] = current_time
            except Exception as e:
                # Ignore update errors (like message not modified)
                if "Message is not modified" not in str(e):
                    logger.warning(f"Error updating message: {e}")

    # Call streaming API
    ai_reply = await call_pollinations_api_streaming(messages, current_model, update_message)

    if not ai_reply:
        logger.error("No AI reply received from API")
        await reply_message.edit_text(
            "β οΈ Sorry, an error occurred while contacting the AI. Please try again."
        )
        return

    logger.info(f"Received complete AI reply, length: {len(ai_reply)}")
    # Persist both sides of the exchange, then log for analytics
    await save_chat_message(user_uuid, "user", message_text, model_used=current_model)
    await save_chat_message(user_uuid, "assistant", ai_reply, model_used=current_model)
    await log_chat_generation(user_uuid, chat_id, current_model, message_text, ai_reply)

    # Final update - remove typing indicator
    ai_reply_cleaned = clean_markdown_for_telegram(ai_reply)
    try:
        if len(ai_reply_cleaned) > max_message_length:
            # Delete placeholder and send in chunks
            await reply_message.delete()
            for i in range(0, len(ai_reply_cleaned), max_message_length):
                chunk = ai_reply_cleaned[i:i + max_message_length]
                try:
                    await message.reply_text(chunk, parse_mode="HTML")
                except Exception as e:
                    logger.warning(f"HTML parse error, sending as plain text: {e}")
                    await message.reply_text(chunk)
        else:
            await reply_message.edit_text(ai_reply_cleaned, parse_mode="HTML")
    except Exception as e:
        logger.warning(f"Error in final update: {e}")
        try:
            # Plain-text fallback: show the raw reply rather than text with
            # literal HTML tags in it.
            await reply_message.edit_text(ai_reply[:max_message_length])
        except Exception as fallback_error:
            logger.warning(f"Plain-text fallback also failed: {fallback_error}")
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /help: send the command overview and current model details.

    Replies through ``update.effective_message`` so edited commands (where
    ``update.message`` is ``None``) do not crash the handler.
    """
    current_model = context.user_data.get('model', DEFAULT_MODEL)
    model_info = AVAILABLE_MODELS.get(current_model, AVAILABLE_MODELS[DEFAULT_MODEL])
    vision_flag = "Yes β " if model_info['supports_vision'] else "No β"
    help_text = (
        "π **Icebox AI Chat Help**\n"
        "**Available commands:**\n"
        "β’ /start - Start the bot\n"
        "β’ /model - View model info\n"
        "β’ /image - Generate AI images\n"
        "β’ /clear - Clear chat history\n"
        "β’ /help - Show this help\n"
        "**How to use:**\n"
        "Send a text message to ask questions. The bot remembers conversation context and responses stream in real-time β‘\n"
        f"**Current model:** {model_info['display']}\n"
        f"**Vision support:** {vision_flag}\n"
        "**Streaming:** Enabled β‘\n"
        "**Tips:**\n"
        "β’ Chat history is saved, so the AI remembers previous conversations\n"
        "β’ Use /clear to start a new conversation without previous context\n"
        "β’ Watch responses appear in real-time as the AI generates them!\n"
    )
    await update.effective_message.reply_text(help_text, parse_mode="Markdown")
async def clear_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /clear: delete the stored chat history of the calling user.

    The user is looked up by chat id; when no row exists the user is asked to
    run /start first. Database errors are logged and reported to the user.
    Replies through ``update.effective_message`` so edited commands (where
    ``update.message`` is ``None``) do not crash the handler.
    """
    message = update.effective_message
    chat_id = update.effective_chat.id
    try:
        user_res = supabase.table("telegram_users").select("id").eq("chat_id", chat_id).execute()
        if not user_res.data:
            await message.reply_text("Error: User not found. Please type /start first.")
            return
        user_uuid = user_res.data[0]['id']
        supabase.table("chat_history").delete().eq("user_id", user_uuid).execute()
        await message.reply_text(
            "ποΈ **Chat history cleared!**\n\nSend a message to start a new conversation.",
            parse_mode="Markdown"
        )
    except Exception as e:
        logger.error(f"Error clearing history: {e}")
        await message.reply_text("Error: Failed to clear chat history.")
async def image_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /image command - redirect to the image generation bot.

    Replies through ``update.effective_message`` so edited commands (where
    ``update.message`` is ``None``) do not crash the handler.
    """
    # The underscore in the bot username is escaped: in legacy Markdown a
    # lone "_" opens an italic entity that is never closed, and Telegram
    # rejects the whole message with a parse error.
    image_text = (
        "ποΈ **Generate AI Images**\n"
        "To create AI-generated images, please use our dedicated image bot:\n"
        "π @iceboxai\\_bot\n"
        "Just start a chat with that bot and describe the image you want to generate!"
    )
    await update.effective_message.reply_text(image_text, parse_mode="Markdown")
def main():
    """Validate the configuration, build the Telegram application and poll.

    Returns early (after printing an error) when the bot token or the
    Supabase credentials are missing. Optional ``TELEGRAM_API_BASE_URL`` /
    ``TELEGRAM_API_FILE_URL`` environment variables point the bot at a custom
    Bot API server.
    """
    if not TELEGRAM_TOKEN:
        print("Error: TELEGRAM_CHAT_BOT_TOKEN not found in environment variables.")
        return
    if not (SUPABASE_URL and SUPABASE_KEY):
        print("Error: SUPABASE_URL or SUPABASE_KEY not found.")
        return

    custom_api_url = os.getenv("TELEGRAM_API_BASE_URL")
    custom_file_url = os.getenv("TELEGRAM_API_FILE_URL")

    # Increase timeout significantly for slow proxies (n8n/etc)
    from telegram.request import HTTPXRequest
    http_request = HTTPXRequest(
        connect_timeout=30.0,
        read_timeout=60.0,
        write_timeout=30.0,
        pool_timeout=30.0,
    )

    app_builder = Application.builder().token(TELEGRAM_TOKEN).request(http_request)
    if custom_api_url:
        app_builder.base_url(custom_api_url)
        print(f"Using Custom API URL: {custom_api_url}")
    if custom_file_url:
        app_builder.base_file_url(custom_file_url)
    application = app_builder.build()

    # Command handlers, registered in this order
    command_table = (
        ("start", start),
        ("model", model_command),
        ("help", help_command),
        ("clear", clear_command),
        ("image", image_command),
    )
    for command_name, command_callback in command_table:
        application.add_handler(CommandHandler(command_name, command_callback))

    # Callback query handler (inline keyboard buttons)
    application.add_handler(CallbackQueryHandler(handle_model_callback))

    # Message handler: any text that is not a command
    plain_text_filter = filters.TEXT & ~filters.COMMAND
    application.add_handler(MessageHandler(plain_text_filter, handle_text_message))

    print("Icebox AI Chat Bot (Streaming Enabled) is running...")
    application.run_polling()
def run_dummy_server():
    """Run a dummy Flask server on port 7860 for Hugging Face Spaces.

    The server exposes one route, ``/``, that answers 200 so the hosting
    platform can see the process is alive. The port can be overridden with
    the ``PORT`` environment variable.
    """
    app = Flask(__name__)

    # Without this registration the view is never reachable and every
    # request to the server gets a 404.
    @app.route("/")
    def health_check():
        return "Chat Bot is running!", 200

    port = int(os.getenv("PORT", 7860))
    app.run(host='0.0.0.0', port=port)
if __name__ == "__main__":
    # The health-check server runs in a daemon thread so it never keeps the
    # process alive once the bot itself stops.
    health_thread = threading.Thread(target=run_dummy_server, daemon=True)
    health_thread.start()
    main()