Spaces:
Runtime error
Runtime error
| """ | |
| MTXtyle - Telegram bot for a clothing and accessories store | |
| """ | |
import asyncio
import html
import logging
import os
import shutil
from io import BytesIO

import openpyxl
from telegram import (
    InlineKeyboardButton,
    InlineKeyboardMarkup,
    KeyboardButton,
    ReplyKeyboardMarkup,
    ReplyKeyboardRemove,
    Update,
)
from telegram.ext import (
    Application,
    CallbackQueryHandler,
    CommandHandler,
    ConversationHandler,
    MessageHandler,
    filters,
)

from config import *
from database import *
# Logging configuration: INFO and above goes to both bot.log and the console.
logging.basicConfig(
    handlers=[
        logging.FileHandler("bot.log"),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by every handler in this file.
logger = logging.getLogger(__name__)
| # ==================== MIDDLEWARE (Rate Limiting) ==================== | |
| import time | |
| from collections import defaultdict | |
class ThrottlingMiddleware:
    """Per-user rate limiter that remembers when each user was last let through.

    ``limit`` is the minimum number of seconds that must pass between two
    accepted updates from the same user.

    NOTE(review): ``__call__`` returns ``None`` both when an update is accepted
    and when it is dropped, so by itself it cannot stop later handlers from
    running. Confirm how the instance is registered before relying on it.
    """

    def __init__(self, limit=1.0):
        self.rate_limit = limit
        # Maps user id -> monotonic timestamp of the last accepted update.
        self.last_time = defaultdict(float)

    async def __call__(self, update: "Update", context):
        """Record the update's timestamp unless the user is still inside the limit."""
        user = update.effective_user
        if not user:
            return
        # A monotonic clock is immune to wall-clock adjustments (NTP steps,
        # manual changes) that would otherwise distort the measured interval.
        now = time.monotonic()
        # ``.get`` avoids treating the defaultdict's 0.0 as a real timestamp,
        # which could throttle a user's very first update.
        previous = self.last_time.get(user.id)
        if previous is not None and now - previous < self.rate_limit:
            # Too fast: keep the old timestamp and drop out.
            return
        self.last_time[user.id] = now
# ConversationHandler states; each conversation uses its own numeric range.
# Product creation (admin)
ADD_NAME, ADD_DESC, ADD_PRICE, ADD_CATEGORY, ADD_PHOTO = 0, 1, 2, 3, 4
# Checkout
ORDER_NAME, ORDER_PHONE, ORDER_ADDRESS = 10, 11, 12
# Product editing (admin)
EDIT_FIELD, EDIT_VALUE = 20, 21
# Broadcast (admin)
BROADCAST_MSG = 40
# Search
SEARCH_QUERY = 30
| # ==================== HELPERS ==================== | |
def is_admin(user_id):
    """Return whether ``user_id`` is contained in ``ADMIN_IDS``."""
    admins = ADMIN_IDS
    return user_id in admins
def format_price(price):
    """Format ``price`` with space-separated thousands followed by ``CURRENCY``."""
    grouped = format(price, ",")
    labelled = f"{grouped} {CURRENCY}"
    # Commas are swapped for spaces across the whole string, currency included.
    return labelled.replace(",", " ")
def main_menu_keyboard():
    """Build the main-menu inline keyboard: three rows of two buttons each."""
    layout = (
        (("🛍 Katalog", "catalog"), ("🔍 Qidirish", "search")),
        (("🛒 Savat", "cart"), ("📦 Buyurtmalarim", "my_orders")),
        (("📞 Bog'lanish", "contact"), ("ℹ️ Yordam", "help")),
    )
    rows = [
        [InlineKeyboardButton(label, callback_data=data) for label, data in row]
        for row in layout
    ]
    return InlineKeyboardMarkup(rows)
| # ==================== START & MENU ==================== | |
async def start_command(update: Update, context):
    """Register the user and show the welcome text with the main menu.

    Works for both message updates and callback queries: a callback query is
    answered and its message is edited in place; otherwise the welcome text is
    sent as a reply.
    """
    # Local import: this helper is not part of the file's import block.
    from telegram.helpers import escape_markdown

    user = update.effective_user
    await add_user(user.id, user.username, user.first_name)
    # NOTE(review): while ADMIN_IDS is an empty list, the first user to reach
    # this handler is promoted to admin. Confirm this bootstrap is intended.
    if ADMIN_IDS == []:
        ADMIN_IDS.append(user.id)
        logger.info(f"Admin belgilandi: {user.id}")

    # The first name is user-controlled; unescaped "_", "*", "`" or "[" would
    # make Telegram reject the Markdown message.
    first_name = escape_markdown(user.first_name or "", version=1)
    text = (
        f"Assalomu alaykum, {first_name}! 👋\n\n"
        f"🏪 **{STORE_NAME}** ga xush kelibsiz!\n\n"
        f"{STORE_DESCRIPTION}\n\n"
        f"Quyidagi tugmalardan birini tanlang:"
    )
    query = update.callback_query
    if query:
        # Answer first so the client stops showing the button's loading state.
        await query.answer()
        await query.edit_message_text(text, reply_markup=main_menu_keyboard(), parse_mode="Markdown")
    else:
        # effective_message also covers edited messages, where update.message is None.
        await update.effective_message.reply_text(text, reply_markup=main_menu_keyboard(), parse_mode="Markdown")
| # ==================== CATALOG ==================== | |
async def show_categories(update: Update, context):
    """Replace the current message with one button per category plus a back button."""
    query = update.callback_query
    await query.answer()

    rows = []
    for category in await get_categories():
        # Only the total is used; a one-item page is requested just to obtain it.
        _, product_total = await get_products_by_category(category["id"], 0, 1)
        label = f"{category['emoji']} {category['name']} ({product_total})"
        rows.append([InlineKeyboardButton(label, callback_data=f"cat_{category['id']}")])
    rows.append([InlineKeyboardButton("🔙 Orqaga", callback_data="main_menu")])

    await query.edit_message_text(
        "📂 **Kategoriyalarni tanlang:**",
        reply_markup=InlineKeyboardMarkup(rows),
        parse_mode="Markdown",
    )
async def show_products(update: Update, context):
    """Show one page of a category's products with view, add-to-cart and paging buttons.

    Callback data is ``cat_<category_id>`` or ``cat_<category_id>_<page>``
    (page defaults to 0). A missing category is treated like an empty one.
    """
    query = update.callback_query
    await query.answer()

    parts = query.data.split("_")
    category_id = int(parts[1])
    page = int(parts[2]) if len(parts) > 2 else 0

    async def respond(text, rows, parse_mode=None):
        # The product detail view can be a photo message whose back button
        # leads here; a photo has no text to edit, so it is replaced instead.
        markup = InlineKeyboardMarkup(rows)
        if query.message.photo:
            await query.message.delete()
            await query.message.chat.send_message(text, reply_markup=markup, parse_mode=parse_mode)
        else:
            await query.edit_message_text(text, reply_markup=markup, parse_mode=parse_mode)

    products, total = await get_products_by_category(category_id, page, PRODUCTS_PER_PAGE)
    category = await get_category_by_id(category_id)
    back_row = [InlineKeyboardButton("🔙 Kategoriyalar", callback_data="catalog")]

    if not products or not category:
        await respond("😔 Bu kategoriyada hozircha mahsulot yo'q.", [back_row])
        return

    # Names come from the database; HTML mode with escaping keeps characters
    # such as "_" or "*" from breaking the message.
    text = f"{html.escape(str(category['emoji']))} <b>{html.escape(str(category['name']))}</b>\n\n"
    buttons = []
    for product in products:
        price = html.escape(format_price(product["price"]))
        text += f"📌 <b>{html.escape(str(product['name']))}</b>\n💰 {price}\n\n"
        buttons.append([
            InlineKeyboardButton(f"👁 {product['name']}", callback_data=f"prod_{product['id']}"),
            InlineKeyboardButton("🛒 Qo'shish", callback_data=f"addcart_{product['id']}"),
        ])

    # Pagination row: previous / "page of pages" / next.
    total_pages = (total + PRODUCTS_PER_PAGE - 1) // PRODUCTS_PER_PAGE
    nav_buttons = []
    if page > 0:
        nav_buttons.append(InlineKeyboardButton("⬅️", callback_data=f"cat_{category_id}_{page-1}"))
    nav_buttons.append(InlineKeyboardButton(f"{page+1}/{total_pages}", callback_data="noop"))
    if page < total_pages - 1:
        nav_buttons.append(InlineKeyboardButton("➡️", callback_data=f"cat_{category_id}_{page+1}"))
    buttons.append(nav_buttons)
    buttons.append(back_row)

    await respond(text, buttons, parse_mode="HTML")
async def show_product_detail(update: Update, context):
    """Show a single product (name, description, price, stock state).

    Callback data is ``prod_<product_id>``. When the product has a photo, or the
    current message is a photo, the current message is deleted and a new one is
    sent; otherwise the message text is edited in place.
    """
    query = update.callback_query
    await query.answer()

    product_id = int(query.data.split("_")[1])
    product = await get_product_by_id(product_id)
    if not product:
        await query.edit_message_text("❌ Mahsulot topilmadi.")
        return

    # Name and description are free text from the database; escaping them
    # keeps special characters from breaking the formatted message.
    text = (
        f"📌 <b>{html.escape(str(product['name']))}</b>\n\n"
        f"📝 {html.escape(str(product['description']))}\n\n"
        f"💰 Narxi: <b>{html.escape(format_price(product['price']))}</b>\n"
        f"{'✅ Mavjud' if product['in_stock'] else '❌ Tugagan'}"
    )
    markup = InlineKeyboardMarkup([
        [InlineKeyboardButton("🛒 Savatga qo'shish", callback_data=f"addcart_{product_id}")],
        [InlineKeyboardButton("🔙 Orqaga", callback_data=f"cat_{product['category_id']}")],
    ])

    photo_id = product.get("photo_id")
    if photo_id:
        # A text message cannot be turned into a photo, so it is replaced.
        await query.message.delete()
        await query.message.chat.send_photo(
            photo=photo_id, caption=text, reply_markup=markup, parse_mode="HTML"
        )
    elif query.message.photo:
        # A photo message has no text to edit, so it is replaced as well.
        await query.message.delete()
        await query.message.chat.send_message(text, reply_markup=markup, parse_mode="HTML")
    else:
        await query.edit_message_text(text, reply_markup=markup, parse_mode="HTML")
| # ==================== CART ==================== | |
async def add_to_cart_handler(update: Update, context):
    """Add one unit of the product named in ``addcart_<id>`` to the caller's cart."""
    callback = update.callback_query
    product_id = int(callback.data.split("_")[1])
    buyer_id = callback.from_user.id
    await add_to_cart(buyer_id, product_id, 1)
    # The alert doubles as the answer to the callback query.
    await callback.answer("✅ Savatga qo'shildi!", show_alert=True)
async def show_cart(update: Update, context):
    """Render the caller's cart with per-item quantity controls and the total.

    An empty cart shows a short notice with catalog and menu buttons instead.
    """
    query = update.callback_query
    await query.answer()
    user_id = query.from_user.id

    cart = await get_cart(user_id)
    if not cart:
        buttons = [[InlineKeyboardButton("🛍 Katalog", callback_data="catalog"),
                    InlineKeyboardButton("🔙 Menyu", callback_data="main_menu")]]
        await query.edit_message_text("🛒 Savatingiz bo'sh.", reply_markup=InlineKeyboardMarkup(buttons))
        return

    total = await get_cart_total(user_id)
    text = "🛒 <b>Savatingiz:</b>\n\n"
    buttons = []
    for item in cart:
        subtotal = item["price"] * item["quantity"]
        # Item names are escaped so special characters cannot break the message.
        text += (
            f"📌 {html.escape(str(item['name']))}\n"
            f" {item['quantity']} x {html.escape(format_price(item['price']))}"
            f" = {html.escape(format_price(subtotal))}\n\n"
        )
        product_id = item["product_id"]
        buttons.append([
            InlineKeyboardButton("➖", callback_data=f"cartminus_{product_id}"),
            InlineKeyboardButton(f"{item['quantity']} dona", callback_data="noop"),
            InlineKeyboardButton("➕", callback_data=f"cartplus_{product_id}"),
            InlineKeyboardButton("🗑", callback_data=f"cartdel_{product_id}"),
        ])
    text += f"💰 <b>Jami: {html.escape(format_price(total))}</b>"

    buttons.append([InlineKeyboardButton("✅ Buyurtma berish", callback_data="checkout")])
    buttons.append([InlineKeyboardButton("🗑 Savatni tozalash", callback_data="clear_cart"),
                    InlineKeyboardButton("🔙 Menyu", callback_data="main_menu")])
    await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(buttons), parse_mode="HTML")
async def cart_plus(update: Update, context):
    """Increase the quantity of the product in ``cartplus_<id>`` by one, then redraw the cart."""
    callback = update.callback_query
    target_id = int(callback.data.split("_")[1])
    await add_to_cart(callback.from_user.id, target_id, 1)
    # show_cart answers the callback query and refreshes the message.
    await show_cart(update, context)
async def cart_minus(update: Update, context):
    """Call ``add_to_cart`` with -1 for the product in ``cartminus_<id>``, then redraw the cart."""
    callback = update.callback_query
    target_id = int(callback.data.split("_")[1])
    await add_to_cart(callback.from_user.id, target_id, -1)
    # show_cart answers the callback query and refreshes the message.
    await show_cart(update, context)
async def cart_delete(update: Update, context):
    """Remove the product in ``cartdel_<id>`` from the caller's cart, then redraw the cart."""
    callback = update.callback_query
    target_id = int(callback.data.split("_")[1])
    await remove_from_cart(callback.from_user.id, target_id)
    # show_cart answers the callback query and refreshes the message.
    await show_cart(update, context)
async def clear_cart_handler(update: Update, context):
    """Empty the caller's cart, show a confirmation alert and redraw the cart view."""
    callback = update.callback_query
    owner_id = callback.from_user.id
    await clear_cart(owner_id)
    await callback.answer("🗑 Savat tozalandi!", show_alert=True)
    await show_cart(update, context)
| # ==================== CHECKOUT ==================== | |
async def checkout_start(update: Update, context):
    """Start the checkout conversation by asking for the customer's name.

    Returns ``ORDER_NAME`` to continue, or ends the conversation when the cart
    is empty.
    """
    query = update.callback_query
    await query.answer()

    cart = await get_cart(query.from_user.id)
    if not cart:
        await query.edit_message_text("🛒 Savatingiz bo'sh!")
        return ConversationHandler.END

    # Label fixed: "Bekor qilis" was a typo for "Bekor qilish".
    keyboard = [[InlineKeyboardButton("🔙 Bekor qilish", callback_data="cancel_checkout")]]
    await query.edit_message_text(
        "📝 **Buyurtma berish**\n\nIsmingizni kiriting:",
        parse_mode="Markdown",
        reply_markup=InlineKeyboardMarkup(keyboard),
    )
    return ORDER_NAME
async def checkout_name(update: Update, context):
    """Store the customer's name and ask for a phone number.

    A ``cancel_checkout`` callback cancels the order. Any other callback, or a
    message without text, keeps the conversation in ``ORDER_NAME``.
    """
    query = update.callback_query
    if query:
        if query.data == "cancel_checkout":
            await checkout_cancel(update, context)
            return ConversationHandler.END
        # Some other button was pressed: there is no message to read a name from.
        await query.answer()
        return ORDER_NAME

    message = update.message
    name = (message.text or "").strip() if message else ""
    if not name:
        # Non-text input (photo, sticker, ...) must not be stored as the name.
        if message:
            await message.reply_text("Ismingizni kiriting:")
        return ORDER_NAME

    context.user_data["order_name"] = name
    contact_btn = KeyboardButton("📱 Raqamni yuborish", request_contact=True)
    reply_markup = ReplyKeyboardMarkup([[contact_btn]], resize_keyboard=True, one_time_keyboard=True)
    await message.reply_text(
        "📱 Telefon raqamingizni kiriting:\n(Masalan: +998901234567)\nYoki pastdagi tugmani bosing:",
        reply_markup=reply_markup,
    )
    return ORDER_PHONE
async def checkout_phone(update: Update, context):
    """Store the customer's phone number and ask for the delivery address.

    A shared contact is accepted as-is. Typed text is stripped of spaces,
    dashes and parentheses and must then be 9-15 digits with an optional
    leading ``+``; otherwise the user is asked again.
    """
    import re

    message = update.message
    if message.contact:
        phone = message.contact.phone_number
    else:
        # ``text`` is None for non-text messages; treat that as invalid input.
        phone = re.sub(r"[\s\-()]", "", message.text or "")
        # fullmatch, unlike match with "$", does not accept a trailing newline.
        if not re.fullmatch(r"\+?[0-9]{9,15}", phone):
            await message.reply_text("❌ Noto'g'ri format! Qaytadan kiriting yoki tugmani bosing:")
            return ORDER_PHONE

    context.user_data["order_phone"] = phone
    await message.reply_text("📍 Yetkazib berish manzilini kiriting:", reply_markup=ReplyKeyboardRemove())
    return ORDER_ADDRESS
async def checkout_address(update: Update, context):
    """Create the order from the collected data, confirm it and notify admins.

    Ends the conversation in every case except a message without text, which
    re-prompts for the address.
    """
    address = update.message.text
    if not address:
        await update.message.reply_text("📍 Yetkazib berish manzilini kiriting:")
        return ORDER_ADDRESS

    user_id = update.effective_user.id
    full_name = context.user_data.get("order_name", "")
    phone = context.user_data.get("order_phone", "")

    order_id = await create_order(user_id, full_name, phone, address)
    if not order_id:
        await update.message.reply_text("❌ Xatolik! Savatingiz bo'sh.", reply_markup=main_menu_keyboard())
        return ConversationHandler.END

    order = await get_order(order_id)
    items = await get_order_items(order_id)

    # Name, phone and address are typed by the customer. They are escaped so
    # that characters like "_" or "*" cannot make Telegram reject the message,
    # which would also skip the admin notification below.
    customer_block = (
        f"👤 {html.escape(str(full_name))}\n"
        f"📱 {html.escape(str(phone))}\n"
        f"📍 {html.escape(str(address))}\n"
    )
    total_text = html.escape(format_price(order["total_price"]))

    text = f"✅ <b>Buyurtma #{order_id} qabul qilindi!</b>\n\n{customer_block}\n📦 Mahsulotlar:\n"
    for item in items:
        line_total = html.escape(format_price(item["price"] * item["quantity"]))
        text += f" • {html.escape(str(item['product_name']))} x{item['quantity']} — {line_total}\n"
    text += f"\n💰 <b>Jami: {total_text}</b>\n\n"
    text += "📞 Tez orada siz bilan bog'lanamiz!"
    await update.message.reply_text(text, reply_markup=main_menu_keyboard(), parse_mode="HTML")

    # Notify admins; the message is identical for all of them, so build it once.
    admin_text = f"🆕 <b>Yangi buyurtma #{order_id}!</b>\n\n{customer_block}💰 Jami: {total_text}\n\n"
    for item in items:
        admin_text += f" • {html.escape(str(item['product_name']))} x{item['quantity']}\n"
    admin_markup = InlineKeyboardMarkup([[
        InlineKeyboardButton("✅ Qabul qilish", callback_data=f"orderstatus_{order_id}_qabul_qilindi"),
        InlineKeyboardButton("❌ Bekor", callback_data=f"orderstatus_{order_id}_bekor_qilindi"),
    ]])
    for admin_id in ADMIN_IDS:
        try:
            await context.bot.send_message(admin_id, admin_text, reply_markup=admin_markup, parse_mode="HTML")
        except Exception as e:
            # Best effort: one unreachable admin must not block the others.
            logger.error(f"Admin xabar xato: {e}")
    return ConversationHandler.END
async def checkout_cancel(update: Update, context):
    """Cancel the checkout: remove the reply keyboard and show the main menu.

    Works for both callback queries and messages, and ends the conversation.
    """
    query = update.callback_query
    if query:
        # Answer so the client stops showing the button's loading state.
        await query.answer()
    # effective_message is the callback's message or the (possibly edited)
    # incoming message, so one code path serves both cases.
    message = update.effective_message
    # Two messages are needed: one to remove the reply keyboard and one to
    # attach the inline main-menu keyboard.
    await message.reply_text("❌ Buyurtma bekor qilindi.", reply_markup=ReplyKeyboardRemove())
    await message.reply_text("🏠 Asosiy menyu:", reply_markup=main_menu_keyboard())
    return ConversationHandler.END
| # ==================== MY ORDERS ==================== | |
async def show_my_orders(update: Update, context):
    """List up to ten of the caller's orders with total price and status."""
    query = update.callback_query
    await query.answer()

    orders = await get_user_orders(query.from_user.id)
    back_markup = InlineKeyboardMarkup(
        [[InlineKeyboardButton("🔙 Menyu", callback_data="main_menu")]]
    )
    if not orders:
        await query.edit_message_text("📦 Sizda hali buyurtmalar yo'q.", reply_markup=back_markup)
        return

    lines = []
    for order in orders[:10]:
        # Unknown status keys are shown verbatim.
        status_label = ORDER_STATUSES.get(order["status"], order["status"])
        lines.append(f"🔹 #{order['id']} — {format_price(order['total_price'])} — {status_label}\n")
    body = "📦 **Buyurtmalaringiz:**\n\n" + "".join(lines)
    await query.edit_message_text(body, reply_markup=back_markup, parse_mode="Markdown")
| # ==================== SEARCH ==================== | |
async def search_start(update: Update, context):
    """Prompt the user for a product name and move to the ``SEARCH_QUERY`` state."""
    callback = update.callback_query
    await callback.answer()
    prompt = "🔍 Mahsulot nomini yozing:"
    await callback.edit_message_text(prompt)
    return SEARCH_QUERY
async def search_results(update: Update, context):
    """Search products by the text the user sent and list up to ten matches.

    Always ends the search conversation.
    """
    query_text = update.message.text
    products = await search_products(query_text)
    if not products:
        # This message is sent as plain text, so no escaping is needed.
        await update.message.reply_text(
            f"😔 \"{query_text}\" bo'yicha hech narsa topilmadi.",
            reply_markup=main_menu_keyboard(),
        )
        return ConversationHandler.END

    # The query is typed by the user and the names come from the database;
    # both are escaped so they cannot break the formatted message.
    text = f"🔍 <b>Natijalar: \"{html.escape(str(query_text), quote=False)}\"</b>\n\n"
    buttons = []
    for product in products[:10]:
        text += f"📌 {html.escape(str(product['name']))} — {html.escape(format_price(product['price']))}\n"
        buttons.append([
            InlineKeyboardButton(f"👁 {product['name']}", callback_data=f"prod_{product['id']}"),
            InlineKeyboardButton("🛒", callback_data=f"addcart_{product['id']}"),
        ])
    buttons.append([InlineKeyboardButton("🔙 Menyu", callback_data="main_menu")])
    await update.message.reply_text(text, reply_markup=InlineKeyboardMarkup(buttons), parse_mode="HTML")
    return ConversationHandler.END
| # ==================== CONTACT & HELP ==================== | |
async def show_contact(update: Update, context):
    """Show the store's contact information (``CONTACT_INFO``) with a back-to-menu button."""
    callback = update.callback_query
    await callback.answer()
    menu_markup = InlineKeyboardMarkup(
        [[InlineKeyboardButton("🔙 Menyu", callback_data="main_menu")]]
    )
    body = f"📞 **Bog'lanish**\n\n{CONTACT_INFO}\n\nSavollaringiz bo'lsa, yozing!"
    await callback.edit_message_text(body, reply_markup=menu_markup, parse_mode="Markdown")
async def show_help(update: Update, context):
    """Show the static usage instructions with a back-to-menu button."""
    callback = update.callback_query
    await callback.answer()

    steps = [
        "1️⃣ 🛍 Katalogdan mahsulot tanlang\n",
        "2️⃣ 🛒 Savatga qo'shing\n",
        "3️⃣ ✅ Buyurtma bering\n",
        "4️⃣ 📞 Biz siz bilan bog'lanamiz\n\n",
    ]
    body = (
        "ℹ️ **Qanday foydalanish:**\n\n"
        + "".join(steps)
        + "🔍 Qidirish orqali kerakli mahsulotni topishingiz mumkin!"
    )
    menu_markup = InlineKeyboardMarkup(
        [[InlineKeyboardButton("🔙 Menyu", callback_data="main_menu")]]
    )
    await callback.edit_message_text(body, reply_markup=menu_markup, parse_mode="Markdown")
| # ==================== ADMIN ==================== | |
def _admin_panel_keyboard():
    """Build the admin panel keyboard shared by the command and the callback entry."""
    return InlineKeyboardMarkup([
        [InlineKeyboardButton("➕ Mahsulot qo'shish", callback_data="admin_add"),
         InlineKeyboardButton("📋 Mahsulotlar", callback_data="admin_products")],
        [InlineKeyboardButton("📦 Buyurtmalar", callback_data="admin_orders"),
         InlineKeyboardButton("📊 Statistika", callback_data="admin_stats")],
        [InlineKeyboardButton("📢 Broadcast", callback_data="admin_broadcast"),
         InlineKeyboardButton("📂 Export Excel", callback_data="admin_export")],
        [InlineKeyboardButton("🔙 Menyu", callback_data="main_menu")],
    ])


async def admin_command(update: Update, context):
    """Send the admin panel as a new message; non-admins get a refusal."""
    # effective_message also covers edited messages, where update.message is None.
    message = update.effective_message
    if not is_admin(update.effective_user.id):
        await message.reply_text("⛔ Sizda admin huquqi yo'q!")
        return
    await message.reply_text("⚙️ **Admin Panel**", reply_markup=_admin_panel_keyboard(), parse_mode="Markdown")


async def admin_panel_callback(update: Update, context):
    """Show the admin panel by editing the current message; non-admins get an alert."""
    query = update.callback_query
    if not is_admin(query.from_user.id):
        await query.answer("⛔ Admin emas!", show_alert=True)
        return
    await query.answer()
    await query.edit_message_text("⚙️ **Admin Panel**", reply_markup=_admin_panel_keyboard(), parse_mode="Markdown")
| # Admin: Mahsulot qo'shish | |
async def admin_add_start(update: Update, context):
    """Start the add-product conversation (admins only) by asking for the product name."""
    callback = update.callback_query
    caller_is_admin = is_admin(callback.from_user.id)
    if caller_is_admin:
        await callback.answer()
        await callback.edit_message_text("📝 Mahsulot nomini kiriting:")
        return ADD_NAME
    # Non-admins only get an alert and the conversation never starts.
    await callback.answer("⛔", show_alert=True)
    return ConversationHandler.END
async def admin_add_name(update: Update, context):
    """Start a fresh product draft with the given name and ask for the description."""
    message = update.message
    # Any earlier draft stored under "new_product" is replaced.
    draft = {"name": message.text}
    context.user_data["new_product"] = draft
    await message.reply_text("📝 Tavsifini kiriting:")
    return ADD_DESC
async def admin_add_desc(update: Update, context):
    """Store the description on the product draft and ask for the price."""
    message = update.message
    draft = context.user_data["new_product"]
    draft["description"] = message.text
    await message.reply_text("💰 Narxini kiriting (faqat raqam, so'mda):")
    return ADD_PRICE
async def admin_add_price(update: Update, context):
    """Parse the price, store it on the product draft and ask for a category.

    Spaces and commas are ignored. Input that is not a non-negative integer
    (including messages without text) re-prompts and stays in ``ADD_PRICE``.
    """
    raw = (update.message.text or "").replace(" ", "").replace(",", "")
    try:
        price = int(raw)
    except ValueError:
        price = None
    # A negative price is never valid for a product.
    if price is None or price < 0:
        await update.message.reply_text("❌ Noto'g'ri raqam! Qaytadan kiriting:")
        return ADD_PRICE
    context.user_data["new_product"]["price"] = price

    categories = await get_categories()
    buttons = [
        [InlineKeyboardButton(f"{cat['emoji']} {cat['name']}", callback_data=f"newcat_{cat['id']}")]
        for cat in categories
    ]
    await update.message.reply_text("📂 Kategoriyani tanlang:", reply_markup=InlineKeyboardMarkup(buttons))
    return ADD_CATEGORY
async def admin_add_category(update: Update, context):
    """Store the category chosen via ``newcat_<id>`` on the draft and ask for a photo."""
    callback = update.callback_query
    await callback.answer()
    chosen_id = int(callback.data.split("_")[1])
    draft = context.user_data["new_product"]
    draft["category_id"] = chosen_id
    await callback.edit_message_text("📷 Mahsulot rasmini yuboring (yoki /skip bosing):")
    return ADD_PHOTO
async def admin_add_photo(update: Update, context):
    """Finish the add-product conversation with a photo or ``/skip`` and save the product.

    Anything other than a photo or the exact text ``/skip`` re-prompts.
    """
    message = update.message
    if message.photo:
        # The last entry of message.photo is the one whose file_id is stored.
        chosen_photo = message.photo[-1].file_id
    elif message.text == "/skip":
        chosen_photo = ""
    else:
        await message.reply_text("📷 Rasm yuboring yoki /skip bosing:")
        return ADD_PHOTO

    draft = context.user_data["new_product"]
    draft["photo_id"] = chosen_photo
    new_id = await add_product(
        draft["name"],
        draft["description"],
        draft["price"],
        draft["category_id"],
        draft.get("photo_id", ""),
    )
    summary = (
        f"✅ Mahsulot qo'shildi!\n\n"
        f"📌 {draft['name']}\n💰 {format_price(draft['price'])}\n🆔 ID: {new_id}"
    )
    await message.reply_text(summary, reply_markup=main_menu_keyboard())
    return ConversationHandler.END
async def admin_add_cancel(update: Update, context):
    """Abort the add-product conversation and show the main menu."""
    menu = main_menu_keyboard()
    await update.message.reply_text("❌ Bekor qilindi.", reply_markup=menu)
    return ConversationHandler.END
| # Admin: Mahsulotlar ro'yxati | |
async def admin_show_products(update: Update, context):
    """List up to twenty products with edit and delete buttons (admins only)."""
    query = update.callback_query
    if not is_admin(query.from_user.id):
        await query.answer("⛔", show_alert=True)
        return
    await query.answer()

    products = await get_all_products()
    back_row = [InlineKeyboardButton("🔙 Admin", callback_data="admin_panel")]
    if not products:
        await query.edit_message_text("📋 Mahsulotlar yo'q.", reply_markup=InlineKeyboardMarkup([back_row]))
        return

    text = "📋 <b>Mahsulotlar:</b>\n\n"
    buttons = []
    for product in products[:20]:
        stock = "✅" if product["in_stock"] else "❌"
        # Names are escaped so special characters cannot break the listing.
        text += (
            f"{stock} #{product['id']} {html.escape(str(product['name']))}"
            f" — {html.escape(format_price(product['price']))}\n"
        )
        buttons.append([
            InlineKeyboardButton(f"✏️ #{product['id']}", callback_data=f"aedit_{product['id']}"),
            InlineKeyboardButton(f"🗑 #{product['id']}", callback_data=f"adel_{product['id']}"),
        ])
    buttons.append(back_row)
    await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(buttons), parse_mode="HTML")
| # Admin: Mahsulotni o'chirish | |
async def admin_delete_product(update: Update, context):
    """Delete the product named in ``adel_<id>`` (admins only) and redraw the product list."""
    callback = update.callback_query
    if not is_admin(callback.from_user.id):
        await callback.answer("⛔", show_alert=True)
        return

    target_id = int(callback.data.split("_")[1])
    target = await get_product_by_id(target_id)
    if target:
        await delete_product(target_id)
        await callback.answer(f"🗑 '{target['name']}' o'chirildi!", show_alert=True)
    # The list is refreshed whether or not the product still existed.
    await admin_show_products(update, context)
| # Admin: Mahsulotni tahrirlash | |
async def admin_edit_product(update: Update, context):
    """Start editing the product named in ``aedit_<id>`` (admins only).

    Remembers the product id in ``user_data`` and shows the field chooser.
    Returns ``EDIT_FIELD``, or ends the conversation for non-admins and
    unknown products.
    """
    query = update.callback_query
    if not is_admin(query.from_user.id):
        await query.answer("⛔", show_alert=True)
        return ConversationHandler.END
    await query.answer()

    product_id = int(query.data.split("_")[1])
    product = await get_product_by_id(product_id)
    if not product:
        await query.edit_message_text("❌ Mahsulot topilmadi.")
        return ConversationHandler.END

    context.user_data["edit_product_id"] = product_id
    buttons = [
        [InlineKeyboardButton("📌 Nom", callback_data="editf_name"),
         InlineKeyboardButton("💰 Narx", callback_data="editf_price")],
        [InlineKeyboardButton("📝 Tavsif", callback_data="editf_description"),
         InlineKeyboardButton("📷 Rasm", callback_data="editf_photo")],
        [InlineKeyboardButton("🔄 Stock", callback_data=f"togglestock_{product_id}")],
        [InlineKeyboardButton("🔙 Orqaga", callback_data="admin_products")],
    ]
    # The name is escaped so special characters cannot break the bold header.
    text = f"✏️ <b>Tahrirlash: {html.escape(str(product['name']))}</b>\n\nNimani o'zgartirmoqchisiz?"
    await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(buttons), parse_mode="HTML")
    return EDIT_FIELD
async def admin_edit_field(update: Update, context):
    """Remember which field (from ``editf_<field>``) is being edited and ask for its new value."""
    callback = update.callback_query
    await callback.answer()

    chosen = callback.data.split("_")[1]
    context.user_data["edit_field"] = chosen

    prompts = {
        "name": "Yangi nom",
        "price": "Yangi narx (raqam)",
        "description": "Yangi tavsif",
    }
    # Fields without a dedicated prompt fall back to the raw field name.
    prompt = prompts[chosen] if chosen in prompts else chosen
    await callback.edit_message_text(f"✏️ {prompt} kiriting:")
    return EDIT_VALUE
async def admin_edit_value(update: Update, context):
    """Apply the new value for the field chosen earlier in the edit conversation.

    ``photo`` requires a photo message; every other field requires text, and
    ``price`` must parse as a non-negative integer. Invalid input re-prompts
    and stays in ``EDIT_VALUE``.
    """
    message = update.message
    field = context.user_data.get("edit_field")
    product_id = context.user_data.get("edit_product_id")
    if field is None or product_id is None:
        # Conversation data is missing, so there is nothing to update.
        await message.reply_text("❌ Mahsulot topilmadi.", reply_markup=main_menu_keyboard())
        return ConversationHandler.END

    if field == "photo":
        if not message.photo:
            await message.reply_text("📷 Rasm yuboring!")
            return EDIT_VALUE
        field, value = "photo_id", message.photo[-1].file_id
    else:
        text = message.text
        if not text:
            # A photo or other non-text message must not overwrite a text
            # field with None.
            await message.reply_text("✏️ Matn yuboring!")
            return EDIT_VALUE
        if field == "price":
            try:
                value = int(text.replace(" ", "").replace(",", ""))
            except ValueError:
                value = None
            if value is None or value < 0:
                await message.reply_text("❌ Noto'g'ri raqam!")
                return EDIT_VALUE
        else:
            value = text

    await update_product(product_id, **{field: value})
    await message.reply_text("✅ Yangilandi!", reply_markup=main_menu_keyboard())
    return ConversationHandler.END
| # Admin: Stock toggle | |
async def admin_toggle_stock(update: Update, context):
    """Flip the ``in_stock`` flag of the product in ``togglestock_<id>`` (admins only)."""
    query = update.callback_query
    if not is_admin(query.from_user.id):
        await query.answer("⛔", show_alert=True)
        return

    product_id = int(query.data.split("_")[1])
    product = await get_product_by_id(product_id)
    if not product:
        # Always answer the callback, otherwise the button keeps showing a
        # loading state on the client.
        await query.answer("❌ Mahsulot topilmadi.", show_alert=True)
        return

    new_stock = 0 if product["in_stock"] else 1
    await update_product(product_id, in_stock=new_stock)
    status = "✅ Mavjud" if new_stock else "❌ Tugagan"
    await query.answer(f"Status: {status}", show_alert=True)
| # Admin: Buyurtmalar | |
async def admin_show_orders(update: Update, context):
    """List up to fifteen orders with a detail button each (admins only)."""
    query = update.callback_query
    if not is_admin(query.from_user.id):
        await query.answer("⛔", show_alert=True)
        return
    await query.answer()

    orders = await get_all_orders()
    back_row = [InlineKeyboardButton("🔙 Admin", callback_data="admin_panel")]
    if not orders:
        await query.edit_message_text("📦 Buyurtmalar yo'q.", reply_markup=InlineKeyboardMarkup([back_row]))
        return

    text = "📦 <b>Buyurtmalar:</b>\n\n"
    buttons = []
    for order in orders[:15]:
        status = ORDER_STATUSES.get(order["status"], order["status"])
        # Customer names are user input; escape them so they cannot break the message.
        text += (
            f"#{order['id']} | {html.escape(str(order['full_name']))}"
            f" | {html.escape(format_price(order['total_price']))}"
            f" | {html.escape(str(status))}\n"
        )
        buttons.append([InlineKeyboardButton(f"👁 #{order['id']}", callback_data=f"orderdetail_{order['id']}")])
    buttons.append(back_row)
    await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(buttons), parse_mode="HTML")
async def admin_order_detail(update: Update, context):
    """Show one order with its items and a button for every other status (admins only).

    The order id is the second ``_``-separated part of the callback data, so
    both ``orderdetail_<id>`` and ``orderstatus_<id>_<status>`` work.
    """
    query = update.callback_query
    if not is_admin(query.from_user.id):
        await query.answer("⛔", show_alert=True)
        return
    await query.answer()

    order_id = int(query.data.split("_")[1])
    order = await get_order(order_id)
    if not order:
        await query.edit_message_text("❌ Buyurtma topilmadi.")
        return
    # Items are only fetched once the order is known to exist.
    items = await get_order_items(order_id)

    status = ORDER_STATUSES.get(order["status"], order["status"])
    # Customer fields are user input; escape them so they cannot break the message.
    text = (
        f"📦 <b>Buyurtma #{order_id}</b>\n\n"
        f"👤 {html.escape(str(order['full_name']))}\n"
        f"📱 {html.escape(str(order['phone']))}\n"
        f"📍 {html.escape(str(order['address']))}\n"
        f"📊 Status: {html.escape(str(status))}\n"
        f"💰 Jami: {html.escape(format_price(order['total_price']))}\n\n"
        f"📋 Mahsulotlar:\n"
    )
    for item in items:
        line_total = html.escape(format_price(item["price"] * item["quantity"]))
        text += f" • {html.escape(str(item['product_name']))} x{item['quantity']} — {line_total}\n"

    buttons = [
        [InlineKeyboardButton(label, callback_data=f"orderstatus_{order_id}_{key}")]
        for key, label in ORDER_STATUSES.items()
        if key != order["status"]
    ]
    buttons.append([InlineKeyboardButton("🔙 Buyurtmalar", callback_data="admin_orders")])
    await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(buttons), parse_mode="HTML")
async def admin_change_order_status(update: Update, context):
    """Set a new status on an order, tell the customer, then redraw the order detail.

    Callback data is ``orderstatus_<order_id>_<status_key>``; the status key
    may itself contain underscores.
    """
    query = update.callback_query
    if not is_admin(query.from_user.id):
        await query.answer("⛔", show_alert=True)
        return

    parts = query.data.split("_")
    order_id = int(parts[1])
    # Everything after the id is the status key, re-joined with underscores.
    new_status = "_".join(parts[2:])

    await update_order_status(order_id, new_status)
    status_text = ORDER_STATUSES.get(new_status, new_status)
    await query.answer(f"Status: {status_text}", show_alert=True)

    # Notify the customer. Failure is logged but does not stop the admin flow.
    order = await get_order(order_id)
    if order:
        try:
            await context.bot.send_message(
                order["user_id"],
                f"📦 Buyurtma #{order_id} statusi: {status_text}",
            )
        except Exception as e:
            logger.warning("Could not notify user %s about order %s: %s", order["user_id"], order_id, e)
    await admin_order_detail(update, context)
| # Admin: Statistika | |
async def admin_show_stats(update: Update, context):
    """Show the counters returned by ``get_stats`` (admins only)."""
    callback = update.callback_query
    if not is_admin(callback.from_user.id):
        await callback.answer("⛔", show_alert=True)
        return
    await callback.answer()

    stats = await get_stats()
    report_lines = [
        f"📊 **Statistika**\n\n",
        f"📦 Mahsulotlar: {stats['products_count']}\n",
        f"🛒 Buyurtmalar: {stats['orders_count']}\n",
        f"🆕 Yangi buyurtmalar: {stats['new_orders']}\n",
        f"👥 Mijozlar: {stats['unique_customers']}\n",
        f"💰 Umumiy daromad: {format_price(stats['total_revenue'])}",
    ]
    report = "".join(report_lines)
    back_markup = InlineKeyboardMarkup(
        [[InlineKeyboardButton("🔙 Admin", callback_data="admin_panel")]]
    )
    await callback.edit_message_text(report, reply_markup=back_markup, parse_mode="Markdown")
| # Admin: Export Excel | |
def _format_order_timestamp(created_at):
    """Render an order timestamp for the spreadsheet; non-numeric values pass through as text."""
    if isinstance(created_at, (int, float)):
        return time.strftime('%Y-%m-%d %H:%M', time.localtime(created_at))
    return "" if created_at is None else str(created_at)


async def admin_export(update: Update, context):
    """Send all orders (one row per order item) as an Excel file (admins only).

    The workbook is built in memory. Any failure is logged with its traceback
    and reported to the admin with a generic error message.
    """
    query = update.callback_query
    if not is_admin(query.from_user.id):
        await query.answer("⛔", show_alert=True)
        return
    await query.answer("⏳ Fayl tayyorlanmoqda...")

    try:
        orders = await get_all_orders_with_items()
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = "Buyurtmalar"
        ws.append(["ID", "Sana", "Mijoz", "Telefon", "Manzil", "Mahsulot", "Soni", "Narx", "Status", "Jami"])
        for o in orders:
            ws.append([
                o["id"], _format_order_timestamp(o["created_at"]),
                o["full_name"], o["phone"], o["address"],
                o["product_name"], o["quantity"], o["price"],
                o["status"], o["total_price"],
            ])

        bio = BytesIO()
        wb.save(bio)
        # Rewind so the upload starts at the beginning of the file.
        bio.seek(0)
        await query.message.reply_document(
            document=bio,
            filename=f"orders_{time.strftime('%Y%m%d')}.xlsx",
            caption="📂 Buyurtmalar tarixi",
        )
    except Exception:
        # Top-level boundary for this handler: keep the traceback in the log.
        logger.exception("Export error")
        await query.message.reply_text("❌ Xatolik yuz berdi.")
| # Admin: Broadcast | |
async def admin_broadcast_start(update: Update, context):
    """Start the broadcast conversation (admins only) by asking for the message to send."""
    callback = update.callback_query
    caller_is_admin = is_admin(callback.from_user.id)
    if caller_is_admin:
        await callback.answer()
        # The prompt is sent as a new message; the panel message is left as is.
        await callback.message.reply_text("📢 Xabarni yuboring (matn, rasm yoki forward):")
        return BROADCAST_MSG
    await callback.answer("⛔", show_alert=True)
    return ConversationHandler.END
async def admin_broadcast_send(update: Update, context):
    """Copy the admin's message to every user and report how many got it.

    Runs in the ``BROADCAST_MSG`` state of the broadcast conversation. The
    incoming message is copied to each id returned by ``get_all_users()``.
    A status message is refreshed after every 10th successful delivery and
    finally replaced by a summary.

    Args:
        update: Incoming Telegram update carrying the message to broadcast.
        context: Handler context (unused).

    Returns:
        ``ConversationHandler.END`` - the conversation always finishes here.
    """
    users = await get_all_users()
    total = len(users)
    count = 0
    status_msg = await update.message.reply_text(f"⏳ Yuborilmoqda... (0/{total})")
    for uid in users:
        try:
            await update.message.copy(uid)
        except Exception as e:
            # Best effort: one unreachable user must not abort the broadcast,
            # but the failure is logged rather than silently dropped.
            logger.warning("Broadcast to %s failed: %s", uid, e)
            continue
        count += 1
        # Refresh progress only when the counter actually reached a new
        # multiple of 10; re-sending unchanged text is rejected by Telegram.
        if count % 10 == 0:
            try:
                await status_msg.edit_text(f"⏳ Yuborilmoqda... ({count}/{total})")
            except Exception as e:
                # The progress display is cosmetic; never let it stop delivery.
                logger.debug("Broadcast progress update failed: %s", e)
    await status_msg.edit_text(f"✅ Xabar {count} ta foydalanuvchiga yuborildi.", reply_markup=main_menu_keyboard())
    return ConversationHandler.END
async def admin_broadcast_cancel(update: Update, context):
    """Abort the broadcast conversation and show the main menu keyboard."""
    reply = update.message.reply_text
    await reply("❌ Bekor qilindi.", reply_markup=main_menu_keyboard())
    return ConversationHandler.END
| # Admin: Backup | |
async def admin_backup(update: Update, context):
    """Copy the database file into ``backups/`` and send the copy to the admin.

    Registered for the ``/backup`` command. Calls from non-admin users are
    ignored silently. The copy is named ``store_backup_<unix time>.db``.

    Args:
        update: Incoming Telegram update for the command.
        context: Handler context (unused).

    Returns:
        None.
    """
    if not is_admin(update.effective_user.id):
        return
    # effective_message also covers an edited command message, for which
    # update.message is None.
    message = update.effective_message
    try:
        os.makedirs("backups", exist_ok=True)
        backup_path = f"backups/store_backup_{int(time.time())}.db"
        # NOTE(review): a plain file copy of a database that is being written
        # to may be inconsistent - confirm this is acceptable for this store.
        shutil.copy(DATABASE_PATH, backup_path)
        # Close the file handle once the upload finished instead of leaking it.
        with open(backup_path, "rb") as backup_file:
            await message.reply_document(
                document=backup_file,
                caption=f"✅ Baza nusxalandi: {backup_path}"
            )
    except Exception as e:
        # logger.exception keeps the traceback for later diagnosis.
        logger.exception("Backup error: %s", e)
        await message.reply_text("❌ Backup xatolik.")
| # ==================== NOOP ==================== | |
async def noop_callback(update: Update, context):
    """Acknowledge a button press that has no action attached to it."""
    pressed = update.callback_query
    await pressed.answer()
| # ==================== MAIN ==================== | |
def main():
    """Build the bot application, register every handler and start polling.

    Conversation handlers are added before the plain command and callback
    handlers, so an update that belongs to a running conversation reaches
    that conversation first. ``init_db()`` runs through the application's
    ``post_init`` hook, i.e. inside the event loop that ``run_polling()``
    manages, before polling starts.
    """

    async def _init_database(application):
        """Run the database initialisation once the application is initialised."""
        await init_db()

    # post_init replaces asyncio.get_event_loop().run_until_complete(...):
    # calling get_event_loop() without a running loop is deprecated and no
    # longer creates a loop implicitly on recent Python versions.
    app = (
        Application.builder()
        .token(BOT_TOKEN)
        .post_init(_init_database)
        .build()
    )

    # Checkout conversation
    checkout_conv = ConversationHandler(
        entry_points=[CallbackQueryHandler(checkout_start, pattern="^checkout$")],
        states={
            ORDER_NAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, checkout_name)],
            ORDER_PHONE: [MessageHandler(filters.TEXT & ~filters.COMMAND, checkout_phone)],
            ORDER_ADDRESS: [MessageHandler(filters.TEXT & ~filters.COMMAND, checkout_address)],
        },
        fallbacks=[CommandHandler("cancel", checkout_cancel)],
    )

    # Admin add product conversation
    admin_add_conv = ConversationHandler(
        entry_points=[CallbackQueryHandler(admin_add_start, pattern="^admin_add$")],
        states={
            ADD_NAME: [MessageHandler(filters.TEXT & ~filters.COMMAND, admin_add_name)],
            ADD_DESC: [MessageHandler(filters.TEXT & ~filters.COMMAND, admin_add_desc)],
            ADD_PRICE: [MessageHandler(filters.TEXT & ~filters.COMMAND, admin_add_price)],
            ADD_CATEGORY: [CallbackQueryHandler(admin_add_category, pattern="^newcat_")],
            ADD_PHOTO: [
                MessageHandler(filters.PHOTO, admin_add_photo),
                CommandHandler("skip", admin_add_photo),
            ],
        },
        fallbacks=[CommandHandler("cancel", admin_add_cancel)],
    )

    # Admin edit product conversation
    admin_edit_conv = ConversationHandler(
        entry_points=[CallbackQueryHandler(admin_edit_product, pattern=r"^aedit_\d+$")],
        states={
            EDIT_FIELD: [
                CallbackQueryHandler(admin_edit_field, pattern="^editf_"),
                CallbackQueryHandler(admin_toggle_stock, pattern="^togglestock_"),
                CallbackQueryHandler(admin_show_products, pattern="^admin_products$"),
            ],
            EDIT_VALUE: [
                MessageHandler(filters.TEXT & ~filters.COMMAND, admin_edit_value),
                MessageHandler(filters.PHOTO, admin_edit_value),
            ],
        },
        fallbacks=[CommandHandler("cancel", admin_add_cancel)],
    )

    # Search conversation
    search_conv = ConversationHandler(
        entry_points=[CallbackQueryHandler(search_start, pattern="^search$")],
        states={
            SEARCH_QUERY: [MessageHandler(filters.TEXT & ~filters.COMMAND, search_results)],
        },
        fallbacks=[CommandHandler("cancel", checkout_cancel)],
    )

    # Admin broadcast conversation
    admin_broadcast_conv = ConversationHandler(
        entry_points=[CallbackQueryHandler(admin_broadcast_start, pattern="^admin_broadcast$")],
        states={
            BROADCAST_MSG: [
                MessageHandler(filters.ALL & ~filters.COMMAND, admin_broadcast_send)
            ],
        },
        fallbacks=[CommandHandler("cancel", admin_broadcast_cancel)],
    )

    # Add conversations first
    app.add_handler(checkout_conv)
    app.add_handler(admin_add_conv)
    app.add_handler(admin_edit_conv)
    app.add_handler(admin_broadcast_conv)
    app.add_handler(search_conv)

    # Commands
    app.add_handler(CommandHandler("start", start_command))
    app.add_handler(CommandHandler("admin", admin_command))
    app.add_handler(CommandHandler("backup", admin_backup))

    # Callback handlers
    app.add_handler(CallbackQueryHandler(start_command, pattern="^main_menu$"))
    app.add_handler(CallbackQueryHandler(show_categories, pattern="^catalog$"))
    app.add_handler(CallbackQueryHandler(show_products, pattern=r"^cat_\d+"))
    app.add_handler(CallbackQueryHandler(show_product_detail, pattern=r"^prod_\d+$"))
    app.add_handler(CallbackQueryHandler(add_to_cart_handler, pattern=r"^addcart_\d+$"))
    app.add_handler(CallbackQueryHandler(show_cart, pattern="^cart$"))
    app.add_handler(CallbackQueryHandler(cart_plus, pattern=r"^cartplus_\d+$"))
    app.add_handler(CallbackQueryHandler(cart_minus, pattern=r"^cartminus_\d+$"))
    app.add_handler(CallbackQueryHandler(cart_delete, pattern=r"^cartdel_\d+$"))
    app.add_handler(CallbackQueryHandler(clear_cart_handler, pattern="^clear_cart$"))
    app.add_handler(CallbackQueryHandler(show_my_orders, pattern="^my_orders$"))
    app.add_handler(CallbackQueryHandler(show_contact, pattern="^contact$"))
    app.add_handler(CallbackQueryHandler(show_help, pattern="^help$"))
    app.add_handler(CallbackQueryHandler(admin_panel_callback, pattern="^admin_panel$"))
    app.add_handler(CallbackQueryHandler(admin_show_products, pattern="^admin_products$"))
    app.add_handler(CallbackQueryHandler(admin_delete_product, pattern=r"^adel_\d+$"))
    app.add_handler(CallbackQueryHandler(admin_show_orders, pattern="^admin_orders$"))
    app.add_handler(CallbackQueryHandler(admin_order_detail, pattern=r"^orderdetail_\d+$"))
    app.add_handler(CallbackQueryHandler(admin_change_order_status, pattern=r"^orderstatus_"))
    app.add_handler(CallbackQueryHandler(admin_show_stats, pattern="^admin_stats$"))
    app.add_handler(CallbackQueryHandler(admin_export, pattern="^admin_export$"))
    app.add_handler(CallbackQueryHandler(noop_callback, pattern="^noop$"))

    logger.info(f"🤖 {STORE_NAME} bot ishga tushdi!")
    # The database is initialised by the post_init hook before polling begins.
    app.run_polling(drop_pending_updates=True)
# Script entry point: start the bot only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()