import asyncio import random import string import time from aiogram import types, F from aiogram.filters import Command from aiogram.fsm.context import FSMContext from aiogram.types import ( Message, InlineKeyboardMarkup, InlineKeyboardButton, BufferedInputFile ) from config import ADMINS, logger from database import ( db_pool, ensure_user, update_user_status, get_top_buyers, get_manga_queue, get_manga_queue_count, save_manga_queue, get_weekly_stats, build_weekly_report_text, get_cached_file_id, save_cached_file_id ) from scraper import ( collect_manga_links, get_manga_chapters_api, fetch_chapter_images_api, download_images_as_zip_async, download_images_as_pdf_async, get_manga_slug ) from handlers import AdminState, safe_edit, safe_answer, back_kb # ========================= # CACHING CONTROL # ========================= caching_control = {"is_running": False, "stop_requested": False} # ========================= # ADMIN KEYBOARD # ========================= def get_admin_kb(): return InlineKeyboardMarkup(inline_keyboard=[ [InlineKeyboardButton(text="📊 Stats & Leaderboard", callback_data="admin_stats")], [ InlineKeyboardButton(text="💰 Add Coins", callback_data="admin_add_coins"), InlineKeyboardButton(text="🌟 Set VIP", callback_data="admin_set_vip"), ], [ InlineKeyboardButton(text="📡 Broadcast", callback_data="admin_broadcast"), InlineKeyboardButton(text="✉️ Send PM", callback_data="admin_pm"), ], [InlineKeyboardButton(text="🎟 Voucher Guide", callback_data="admin_voucher_guide")], [InlineKeyboardButton(text="🔗 Fetch Links (71 Pages)", callback_data="admin_fetch_links")], [ InlineKeyboardButton(text="🚀 Cache ZIP (Saved Links)", callback_data="admin_start_cache_saved_zip"), InlineKeyboardButton(text="📄 Cache PDF (Saved Links)", callback_data="admin_start_cache_saved_pdf"), ], [InlineKeyboardButton(text="« Back", callback_data="back_to_start")], ]) # ========================= # BACKGROUND CACHE TASK # ========================= async def background_cache_task(bot, admin_id: int, manga_url: str, file_format: str = "zip"): from database import get_manga_slug as _slug slug = _slug(manga_url) chapters = await get_manga_chapters_api(manga_url) if not chapters: return for chap in reversed(chapters): if caching_control["stop_requested"]: break ch_id = chap["id"] if get_cached_file_id(slug, ch_id, file_format): continue images = await fetch_chapter_images_api(ch_id) if not images: continue try: if file_format == "pdf": file_data = await download_images_as_pdf_async(images, slug, ch_id) file_name = f"{slug}_chapter-{chap['slug']}.pdf" else: file_data = await download_images_as_zip_async(images, slug, ch_id) file_name = f"{slug}_chapter-{chap['slug']}.zip" sent_msg = await bot.send_document( chat_id=admin_id, document=BufferedInputFile(file_data, filename=file_name), caption=f"⚙️ Cached ({file_format.upper()}): {slug} — {chap['title']}", disable_notification=True ) new_file_id = sent_msg.document.file_id save_cached_file_id(slug, ch_id, file_format, new_file_id) await asyncio.sleep(20) except Exception as e: logger.error(f"Error caching chapter {ch_id}: {e}") await asyncio.sleep(10) async def _fetch_and_save_links(bot, admin_id: int): async def progress_cb(page_num, count): try: await bot.send_message(admin_id, f"📄 تم سحب صفحة {page_num}/71 — روابط حتى الآن: {count}") except Exception: pass all_links = await collect_manga_links( start_page=1, end_page=71, stop_flag=caching_control, progress_cb=progress_cb ) save_manga_queue(all_links) try: await bot.send_message( admin_id, f"✅ *اكتمل سحب الروابط!*\n\n" f"🔗 إجمالي الروابط المحفوظة: *{len(all_links)}*\n\n" f"اضغط على زر ZIP أو PDF لبدء الأرشفة." ) except Exception: pass async def _cache_from_queue(bot, admin_id: int, manga_links: list, file_format: str = "zip"): caching_control["is_running"] = True caching_control["stop_requested"] = False total = len(manga_links) try: await bot.send_message(admin_id, f"🛰 بدأت الأرشفة ({file_format.upper()}): {total} مانجا في الطابور.") for index, manga_url in enumerate(manga_links, 1): if caching_control["stop_requested"]: await bot.send_message(admin_id, "🛑 تم إيقاف الأرشفة بناءً على طلبك.") break try: if index % 10 == 0 or index == 1: await bot.send_message(admin_id, f"📊 التقدم: {index}/{total}") await background_cache_task(bot, admin_id, manga_url, file_format=file_format) await asyncio.sleep(5) except Exception as e: logger.error(f"Error caching {manga_url}: {e}") finally: caching_control["is_running"] = False caching_control["stop_requested"] = False try: await bot.send_message(admin_id, f"✅ اكتملت عملية الأرشفة ({file_format.upper()}) من الطابور المحفوظ.") except Exception: pass # ========================= # REGISTER ADMIN HANDLERS # ========================= def register_admin_handlers(dp, bot): @dp.callback_query(F.data == "admin_btn") async def admin_panel(cb: types.CallbackQuery): if cb.from_user.id not in ADMINS: return await cb.answer("Unauthorized!", show_alert=True) await cb.answer() await safe_edit(cb.message, "⚙️ *Admin Panel*\n\nChoose an action:", reply_markup=get_admin_kb()) @dp.callback_query(F.data == "admin_stats") async def admin_stats(cb: types.CallbackQuery): if cb.from_user.id not in ADMINS: return await cb.answer("Unauthorized!", show_alert=True) await cb.answer() conn = db_pool.getconn() try: with conn.cursor() as c: c.execute("SELECT COUNT(*) FROM users") total_users = c.fetchone()[0] c.execute("SELECT COUNT(*) FROM opened_chapters") total_opened = c.fetchone()[0] c.execute("SELECT COUNT(*) FROM users WHERE vip_until > %s", (int(time.time()),)) total_vip = c.fetchone()[0] c.execute("SELECT COUNT(*) FROM manga_queue") queued_links = c.fetchone()[0] c.execute("SELECT COUNT(*) FROM chapter_cache") cached_chapters = c.fetchone()[0] finally: db_pool.putconn(conn) top = get_top_buyers(5) medals = ["🥇","🥈","🥉","4️⃣","5️⃣"] board = "\n".join( f"{medals[i]} {('@'+uname) if uname else uid} — *{cnt}* chapters" for i, (uid, uname, cnt) in enumerate(top) ) or "No data yet" await safe_edit( cb.message, f"📊 *Bot Stats*\n\n" f"👥 Total Users: *{total_users}*\n" f"🌟 VIP Users: *{total_vip}*\n" f"📖 Total Chapters Opened: *{total_opened}*\n" f"💾 Cached Chapters: *{cached_chapters}*\n" f"🔗 Queued Links: *{queued_links}*\n\n" f"🏆 *Top Buyers:*\n{board}", reply_markup=back_kb("admin_btn") ) @dp.callback_query(F.data == "admin_voucher_guide") async def admin_voucher_guide(cb: types.CallbackQuery): if cb.from_user.id not in ADMINS: return await cb.answer("Unauthorized!", show_alert=True) await safe_edit( cb.message, "🎟 *Voucher Creation Guide*\n\n" "• `/gen_code coins 1000` — Creates a 1000 coin code\n" "• `/gen_code vip 7` — Creates a 7-day VIP code\n" "• `/gen_code vip inf` — Creates a Lifetime VIP code\n", reply_markup=back_kb("admin_btn") ) @dp.callback_query(F.data == "admin_add_coins") async def admin_add_coins_btn(cb: types.CallbackQuery, state: FSMContext): if cb.from_user.id not in ADMINS: return await cb.answer("Unauthorized!", show_alert=True) await cb.answer() await state.set_state(AdminState.waiting_add_coins) await safe_edit( cb.message, "💰 *Add Coins*\n\nSend: `USER_ID AMOUNT [Optional Message]`\nExample: `12345 500 Enjoy!`", reply_markup=back_kb("admin_btn") ) @dp.message(AdminState.waiting_add_coins) async def admin_add_coins_input(m: Message, state: FSMContext): if m.from_user.id not in ADMINS: return try: parts = m.text.split(maxsplit=2) uid, amount = int(parts[0]), int(parts[1]) msg_to_user = parts[2] if len(parts) > 2 else "An admin has sent you some coins! 🎉" user = ensure_user(uid) update_user_status(uid, user[2] + amount) await state.clear() await safe_answer( m, f"✅ Added *{amount}* coins to `{uid}`\nNew balance: *{user[2] + amount}*", reply_markup=back_kb("admin_btn") ) try: await bot.send_message(uid, f"💰 *{amount} Coins Added!*\n\nMessage: _{msg_to_user}_") except Exception: pass except Exception: await m.answer("❌ Format: `USER_ID AMOUNT [MESSAGE]`") @dp.callback_query(F.data == "admin_set_vip") async def admin_set_vip_btn(cb: types.CallbackQuery, state: FSMContext): if cb.from_user.id not in ADMINS: return await cb.answer("Unauthorized!", show_alert=True) await cb.answer() await state.set_state(AdminState.waiting_vip) await safe_edit( cb.message, "🌟 *Set VIP*\n\nSend: `USER_ID DAYS [Message]` or `USER_ID inf [Message]`", reply_markup=back_kb("admin_btn") ) @dp.message(AdminState.waiting_vip) async def admin_vip_input(m: Message, state: FSMContext): if m.from_user.id not in ADMINS: return try: parts = m.text.split(maxsplit=2) uid, d = int(parts[0]), parts[1].lower() msg_to_user = parts[2] if len(parts) > 2 else "An admin has upgraded your account to VIP! 🌟" expire = 9999999999 if d == "inf" else int(time.time()) + int(d) * 86400 user = ensure_user(uid) update_user_status(uid, user[2], expire) await state.clear() vip_text = "Lifetime" if d == "inf" else f"{d} days" await safe_answer(m, f"✅ VIP set for `{uid}` — *{vip_text}*", reply_markup=back_kb("admin_btn")) try: await bot.send_message(uid, f"🌟 *VIP Status Activated!* ({vip_text})\n\nMessage: _{msg_to_user}_") except Exception: pass except Exception: await m.answer("❌ Format: `USER_ID DAYS/inf [MESSAGE]`") @dp.callback_query(F.data == "admin_pm") async def admin_pm_btn(cb: types.CallbackQuery, state: FSMContext): if cb.from_user.id not in ADMINS: return await cb.answer("Unauthorized!", show_alert=True) await cb.answer() await state.set_state(AdminState.waiting_pm) await safe_edit(cb.message, "✉️ *Send PM*\n\nSend: `USER_ID Your Message Here`", reply_markup=back_kb("admin_btn")) @dp.message(AdminState.waiting_pm) async def admin_pm_input(m: Message, state: FSMContext): if m.from_user.id not in ADMINS: return try: parts = m.text.split(maxsplit=1) uid = int(parts[0]) msg = parts[1] await state.clear() try: await bot.send_message(uid, f"📩 *Message from Admin:*\n\n{msg}") await safe_answer(m, f"✅ PM sent to `{uid}`", reply_markup=back_kb("admin_btn")) except Exception as e: await safe_answer(m, f"❌ Failed: {e}", reply_markup=back_kb("admin_btn")) except Exception: await m.answer("❌ Format: `USER_ID Message`") @dp.callback_query(F.data == "admin_broadcast") async def admin_broadcast_btn(cb: types.CallbackQuery, state: FSMContext): if cb.from_user.id not in ADMINS: return await cb.answer("Unauthorized!", show_alert=True) await cb.answer() await state.set_state(AdminState.waiting_broadcast) await safe_edit(cb.message, "📡 *Broadcast*\n\nSend your message now:", reply_markup=back_kb("admin_btn")) @dp.message(AdminState.waiting_broadcast) async def admin_broadcast_input(m: Message, state: FSMContext): if m.from_user.id not in ADMINS: return text = m.text.strip() await state.clear() conn = db_pool.getconn() try: with conn.cursor() as c: c.execute("SELECT user_id FROM users") users = c.fetchall() finally: db_pool.putconn(conn) sent = 0 msg_obj = await m.answer(f"📡 Sending to {len(users)} users...") for i, (uid,) in enumerate(users): try: await bot.send_message(uid, text) sent += 1 except Exception: pass if i % 20 == 0: try: await msg_obj.edit_text(f"📡 {i}/{len(users)}") except Exception: pass await msg_obj.edit_text(f"✅ Done: *{sent}/{len(users)}*", reply_markup=back_kb("admin_btn")) # ── Voucher generation ──────────────────────────────── def _gen_code(length=10): return ''.join(random.choices(string.ascii_uppercase + string.digits, k=length)) @dp.message(Command("gen_code")) async def generate_voucher(m: Message): if m.from_user.id not in ADMINS: return parts = m.text.split() if len(parts) < 3: return await m.answer("Usage:\n`/gen_code coins 100`\n`/gen_code vip 7`\n`/gen_code vip inf`") code_type = parts[1].lower() value = parts[2].lower() code = _gen_code() conn = db_pool.getconn() try: with conn.cursor() as c: if code_type == "coins": coins = int(value) c.execute("INSERT INTO vouchers (code, coins) VALUES (%s,%s)", (code, coins)) msg = f"🎟 Code Created!\n\nCode: `{code}`\n💰 Coins: *{coins}*" elif code_type == "vip": vip_days = -1 if value == "inf" else int(value) c.execute("INSERT INTO vouchers (code, vip_days) VALUES (%s,%s)", (code, vip_days)) msg = f"🎟 Code Created!\n\nCode: `{code}`\n🌟 VIP: *{'Lifetime' if value=='inf' else value + ' days'}*" else: return await m.answer("❌ Type must be `coins` or `vip`") conn.commit() finally: db_pool.putconn(conn) await m.answer(msg) @dp.message(Command("pm")) async def pm_cmd(m: Message): if m.from_user.id not in ADMINS: return try: parts = m.text.split(maxsplit=2) uid = int(parts[1]) msg = parts[2] await bot.send_message(uid, f"📩 *Message from Admin:*\n\n{msg}") await m.answer(f"✅ Sent to `{uid}`") except Exception: await m.answer("Usage: `/pm USER_ID Message`") # ── Fetch Links & Cache Buttons ─────────────────────── @dp.callback_query(F.data == "admin_fetch_links") async def admin_fetch_links_btn(cb: types.CallbackQuery): if cb.from_user.id not in ADMINS: return await cb.answer("Unauthorized!", show_alert=True) if caching_control["is_running"]: return await cb.answer("⚠️ عملية أرشفة تعمل الآن! أوقفها أولاً.", show_alert=True) await cb.answer("🔗 جاري سحب الروابط...", show_alert=True) await safe_edit( cb.message, "🔗 *جاري سحب روابط الـ 71 صفحة...*\n\n" "سيتم إرسال تحديث كل 10 صفحات.\n" "بعد الانتهاء اضغط زر الكاشينج.", reply_markup=back_kb("admin_btn") ) asyncio.create_task(_fetch_and_save_links(bot, cb.from_user.id)) @dp.callback_query(F.data.in_({"admin_start_cache_saved_zip", "admin_start_cache_saved_pdf"})) async def admin_start_cache_saved_btn(cb: types.CallbackQuery): if cb.from_user.id not in ADMINS: return await cb.answer("Unauthorized!", show_alert=True) if caching_control["is_running"]: return await cb.answer("⚠️ الأرشفة تعمل بالفعل! استخدم /stop_cache أولاً.", show_alert=True) queue = get_manga_queue() if not queue: return await cb.answer("❌ لا توجد روابط محفوظة! اسحب الروابط أولاً.", show_alert=True) file_format = "pdf" if cb.data.endswith("_pdf") else "zip" await cb.answer(f"🚀 بدأ الكاشينج ({file_format.upper()}) من الروابط المحفوظة...", show_alert=True) await safe_edit( cb.message, f"🚀 *بدأ الكاشينج!*\n\n" f"🔗 عدد الروابط: *{len(queue)}*\n" f"🗂 النوع: *{file_format.upper()}*\n" f"استخدم /stop_cache للإيقاف.", reply_markup=back_kb("admin_btn") ) asyncio.create_task(_cache_from_queue(bot, cb.from_user.id, queue, file_format=file_format)) @dp.message(Command("cache_site")) async def cache_site_cmd(m: Message): if m.from_user.id not in ADMINS: return if caching_control["is_running"]: return await m.answer("⚠️ الأرشفة تعمل! استخدم /stop_cache أولاً.") parts = m.text.split() start_p = int(parts[1]) if len(parts) >= 2 and parts[1].isdigit() else 1 end_p = int(parts[2]) if len(parts) >= 3 and parts[2].isdigit() else 71 queue = get_manga_queue() if not queue: await m.answer(f"🔗 لا توجد روابط محفوظة. سيتم سحبها أولاً من صفحة {start_p} إلى {end_p}.") asyncio.create_task(_fetch_and_save_links(bot, m.from_user.id)) else: asyncio.create_task(_cache_from_queue(bot, m.from_user.id, queue, file_format="zip")) await m.answer(f"🛰 بدأت الأرشفة ZIP من الروابط المحفوظة ({len(queue)} مانجا).") @dp.message(Command("stop_cache")) async def stop_cache_cmd(m: Message): if m.from_user.id not in ADMINS: return if not caching_control["is_running"]: return await m.answer("❌ لا توجد عملية أرشفة تعمل حالياً.") caching_control["stop_requested"] = True await m.answer("⏳ جاري طلب الإيقاف... سيتوقف بعد المهمة الحالية.")