import asyncio
import logging
import os
import re
import threading
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone

import redis.asyncio as redis
import uvicorn
from fastapi import FastAPI

# --- OPTIMIZATION: uvloop setup (for faster performance) ---
# uvloop is optional; when it is not installed the stock asyncio loop is used.
try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass

from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ChatMember, ChatPermissions
from telegram.ext import (
    Application, CommandHandler, MessageHandler, ChatJoinRequestHandler,
    filters, ChatMemberHandler, ContextTypes, CallbackQueryHandler
)

# Imports from this project's own modules
from credentials import BOT_TOKEN, BOT_OWNER_ID, REPORT_CHANNEL_ID, REDIS_URL_1, REDIS_URL_2
import config
from config import logger, BOT_VERSION, TELEGRAM_API_CONCURRENCY_LIMIT
from db import create_db_pool_with_retry, setup_database
import db
from bot_handlers.batch import BatchDeleter
from bot_handlers import utils

# --- 1. Hugging Face health check server (port 7860) ---
app_web = FastAPI()


@app_web.get("/")
def read_root():
    """Health-check endpoint: report that the bot is alive plus its version."""
    return {"status": "Bot is alive and running!", "version": BOT_VERSION}


def run_web_server(host="0.0.0.0", port=7860):
    """Serve the health-check app with uvicorn (blocks the calling thread).

    Args:
        host: Interface to bind to; defaults to all interfaces.
        port: TCP port to listen on; 7860 is the Hugging Face default port.
    """
    uvicorn.run(app_web, host=host, port=port)


# --- 2. Global error handler ---
async def global_error_handler(update: object, context: ContextTypes.DEFAULT_TYPE):
    """Log any exception raised while an update was being handled.

    Args:
        update: The update being processed when the error occurred (unused).
        context: Handler context; ``context.error`` holds the exception.
    """
    logger.error("Exception while handling an update:", exc_info=context.error)


# --- 3.
# Background tasks ---

# Strong references to fire-and-forget report tasks. The event loop only keeps
# weak references to tasks, so an unreferenced task can be garbage-collected
# before it finishes.
_bio_report_tasks = set()


class _BioReportContext:
    """Minimal stand-in for a handler context: exposes ``bot`` and ``bot_data``."""

    def __init__(self, bot, bot_data):
        self.bot = bot
        self.bot_data = bot_data


async def user_stats_flush_loop(app):
    """Flush buffered user stats to the database every five minutes, forever.

    A failed flush is logged and retried on the next cycle instead of
    terminating the loop.
    """
    while True:
        await asyncio.sleep(300)  # flush to the DB once every 5 minutes
        try:
            await db.flush_user_stats_to_db(app)
        except Exception as e:
            logger.error(f"User stats flush error: {e}")


async def check_expired_trials_loop(app):
    """Once a minute, remove members whose VIP trial expired and purge the rows.

    Each expired, non-safe member is banned and immediately unbanned (which
    removes them from the chat without leaving a permanent ban). Afterwards
    every row whose ``expiry_date`` has passed is deleted from ``vip_trials``.
    """
    while True:
        await asyncio.sleep(60)
        pool = app.bot_data.get('db_pool')
        if not pool:
            continue
        try:
            async with pool.acquire() as conn:
                now_utc = datetime.now(timezone.utc)
                expired_vips = await conn.fetch(
                    "SELECT chat_id, user_id FROM vip_trials WHERE expiry_date < $1 AND is_safe = FALSE",
                    now_utc,
                )
                for row in expired_vips:
                    try:
                        await app.bot.ban_chat_member(row['chat_id'], row['user_id'])
                        await app.bot.unban_chat_member(row['chat_id'], row['user_id'])
                        await asyncio.sleep(0.5)
                    except Exception as e:
                        # Best effort: one failed removal must not block the rest.
                        # `Exception` (not a bare except) lets cancellation through.
                        logger.warning(
                            f"VIP Monitor: could not remove user {row['user_id']} "
                            f"from chat {row['chat_id']}: {e}"
                        )
                await conn.execute("DELETE FROM vip_trials WHERE expiry_date < $1", now_utc)
        except Exception as e:
            logger.error(f"❌ VIP Monitor Error: {e}")


async def recovery_traffic_locks(app):
    """On startup, restore permissions of any group that still has a backup.

    For every known group, look up ``backup_perms:<chat_id>`` in that chat's
    Redis client and, when present, hand it to ``restore_group_permissions``.
    A failure for one chat is logged and does not stop the remaining chats.
    """
    pool = app.bot_data.get('db_pool')
    if not pool:
        return

    # Function-local imports, as in the original code, resolved once rather
    # than on every loop iteration.
    from bot_handlers.redis_manager import get_redis_client_for_chat
    from bot_handlers.utils import restore_group_permissions

    try:
        group_ids = await db.get_all_group_ids(pool)
    except Exception as e:
        logger.error(f"Traffic lock recovery: could not load group ids: {e}")
        return

    for chat_id in group_ids:
        try:
            redis_client = get_redis_client_for_chat(app, chat_id)
            if not redis_client:
                continue
            backup = await redis_client.get(f"backup_perms:{chat_id}")
            if backup:
                await restore_group_permissions(app, chat_id, backup)
        except Exception as e:
            logger.error(f"Traffic lock recovery failed for chat {chat_id}: {e}")


def _profile_violation(full_user):
    """Return the reason a profile is considered bad, or ``None`` if it is fine.

    A linked personal channel is always a violation; otherwise the combined
    first name, last name and bio are checked for links and spam phrases.
    """
    bio_text = getattr(full_user, 'bio', "") or ""
    first_name = full_user.first_name or ""
    last_name = full_user.last_name or ""
    full_profile_text = f"{first_name} {last_name} {bio_text}".strip()

    if getattr(full_user, 'personal_chat', None):
        return "Personal Channel Linked"
    if full_profile_text and (
        utils.has_link_in_text(full_profile_text)
        or config.SPAM_PHRASES_COMPILED.search(full_profile_text)
    ):
        return "Link or Spam words in Bio/Name"
    return None


async def _process_bio_item(app, data):
    """Check one queued user's profile, act on it, and release the user's lock."""
    chat_id, user_id, msg_id, settings, user_obj = data
    clients = app.bot_data.get('redis_clients')
    redis_client = clients[0] if clients else None

    try:
        # Short pause between profile lookups to stay under the rate limit.
        await asyncio.sleep(0.1)

        full_user = await app.bot.get_chat(user_id)
        reason_text = _profile_violation(full_user)

        if reason_text is None:
            if redis_client:
                await redis_client.set(f"bio_status:{user_id}", "safe", ex=1600)
            return

        if redis_client:
            await redis_client.set(f"bio_status:{user_id}", "bad", ex=1800)

        # Delete the offending message (best effort).
        try:
            await app.bot.delete_message(chat_id, msg_id)
        except Exception as e:
            logger.debug(f"Bio Task: could not delete message {msg_id} in chat {chat_id}: {e}")

        # Send the deletion report (optional, best effort).
        try:
            from bot_handlers.core import _background_processing
            report_ctx = _BioReportContext(app.bot, app.bot_data)
            chat_obj = await app.bot.get_chat(chat_id)
            report = asyncio.create_task(
                _background_processing(
                    report_ctx, user_obj, chat_obj, settings,
                    ['censor_reason_bio_link'], reason_text,
                )
            )
            _bio_report_tasks.add(report)
            report.add_done_callback(_bio_report_tasks.discard)
        except Exception as e:
            logger.warning(f"Bio Task: could not start report for user {user_id}: {e}")
    except Exception as e:
        logger.warning(f"Bio Task: profile check failed for user {user_id}: {e}")
    finally:
        # Always release the per-user lock once the check is over.
        if redis_client:
            await redis_client.delete(f"bio_lock:{user_id}")


# Background bio checker task (runs inside the main bot process)
async def background_bio_task(app):
    """Consume ``app.bot_data['bio_queue']`` forever, checking one profile at a time.

    Each queue item is a ``(chat_id, user_id, msg_id, settings, user_obj)``
    tuple. ``task_done()`` is called for every item taken from the queue, even
    when processing or the lock release fails.
    """
    while True:
        try:
            queue = app.bot_data['bio_queue']
            data = await queue.get()
            try:
                await _process_bio_item(app, data)
            finally:
                queue.task_done()
        except Exception as e:
            logger.error(f"Bio Task Error: {e}")


# --- 4.
# Join request handler ---
async def handle_join_request(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Record a chat join request in Redis and in the ``vip_trials`` table.

    When the requester has a username, ``join_cache:<username>`` is set to
    their user id for 30 days. When a DB pool is available, a ``vip_trials``
    row marked safe with a 10-minute expiry is inserted or refreshed. Both
    writes are best effort: failures are logged and never raised.
    """
    req = update.chat_join_request
    user, chat = req.from_user, req.chat
    pool = context.bot_data.get('db_pool')

    if user.username:
        clean_username = user.username.lower().replace("@", "")
        clients = context.bot_data.get('redis_clients')
        if clients:
            try:
                await clients[0].set(f"join_cache:{clean_username}", user.id, ex=2592000)
            except Exception as e:
                logger.warning(f"Join request: cache write failed for user {user.id}: {e}")

    if pool:
        try:
            temp_expiry = datetime.now(timezone.utc) + timedelta(minutes=10)
            async with pool.acquire() as conn:
                await conn.execute("""
                    INSERT INTO vip_trials (chat_id, user_id, expiry_date, joined_at, is_safe)
                    VALUES ($1, $2, $3, CURRENT_TIMESTAMP, TRUE)
                    ON CONFLICT (chat_id, user_id)
                    DO UPDATE SET expiry_date = EXCLUDED.expiry_date, is_safe = TRUE
                """, chat.id, user.id, temp_expiry)
        except Exception as e:
            logger.warning(
                f"Join request: vip_trials write failed for user {user.id} in chat {chat.id}: {e}"
            )


# --- 5. Core handler wrappers ---
# Each wrapper imports its target at call time rather than at module import.
async def w_bot_status(u, c):
    """Forward a bot-status update to ``core.handle_bot_status_change``."""
    from bot_handlers.core import handle_bot_status_change
    await handle_bot_status_change(u, c)


async def w_member_status(u, c):
    """Forward a member-status update to ``core.handle_member_status_change``."""
    from bot_handlers.core import handle_member_status_change
    await handle_member_status_change(u, c)


async def w_msg(u, c):
    """Forward a message update to ``core.handle_message``."""
    from bot_handlers.core import handle_message
    await handle_message(u, c)


async def w_edit_msg(u, c):
    """Forward an edited-message update to ``core.handle_edited_message``."""
    from bot_handlers.core import handle_edited_message
    await handle_edited_message(u, c)


async def w_callback(u, c):
    """Forward a callback query to ``callbacks.w_callback``."""
    from bot_handlers import callbacks
    await callbacks.w_callback(u, c)


# Strong references to detached tasks; the event loop itself only holds weak
# references, so an unreferenced task may be collected before it completes.
_detached_tasks = set()


async def _record_bot_start(pool, user):
    """Upsert the user as having started the bot; log instead of raising."""
    try:
        await pool.execute("""
            INSERT INTO users (user_id, first_name, username, has_started_bot, last_active_on)
            VALUES ($1, $2, $3, TRUE, CURRENT_TIMESTAMP)
            ON CONFLICT (user_id) DO UPDATE SET has_started_bot = TRUE
        """, user.id, user.first_name, user.username)
    except Exception as e:
        logger.warning(f"/start: could not record user {user.id}: {e}")


async def safe_start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/start`` in private chats: record the user and offer languages.

    The database write runs as a detached task so the reply is not delayed.
    Nothing happens outside private chats.
    """
    # Local import keeps this block self-contained; needed to escape the name.
    from html import escape

    user = update.effective_user
    chat = update.effective_chat
    # effective_message also covers an edited /start, where update.message is None.
    message = update.effective_message
    if user is None or chat is None or message is None or chat.type != 'private':
        return

    pool = context.bot_data.get('db_pool')
    if pool:
        task = asyncio.create_task(_record_bot_start(pool, user))
        _detached_tasks.add(task)
        task.add_done_callback(_detached_tasks.discard)

    keyboard = InlineKeyboardMarkup([
        [InlineKeyboardButton("English 🇬🇧", callback_data='lang_en'),
         InlineKeyboardButton("বাংলা 🇧🇩", callback_data='lang_bn')],
        [InlineKeyboardButton("हिन्दी 🇮🇳", callback_data='lang_hi')]
    ])
    # The reply is parsed as HTML, so a name containing '<' or '&' must be
    # escaped or Telegram rejects the whole message.
    safe_name = escape(user.first_name or "")
    await message.reply_html(f"👋 Hello {safe_name}!\n\nPlease choose language:", reply_markup=keyboard)


# --- 6. POST INIT & STOP ---
async def post_init(app: Application):
    """Create shared resources in ``app.bot_data`` and start background tasks.

    Sets up the DB pool and schema, connects every configured Redis URL
    (unreachable ones are logged and skipped), creates the semaphores, batch
    deleter, thread pool and bio queue, then launches the background loops.
    The created tasks are kept in ``app.bot_data['background_tasks']`` so they
    stay referenced and can be cancelled on shutdown.
    """
    app.bot_data['db_pool'] = await create_db_pool_with_retry()
    await setup_database(app.bot_data['db_pool'])

    app.bot_data['redis_clients'] = []
    for i, url in enumerate([REDIS_URL_1, REDIS_URL_2], 1):
        if not url:
            continue
        try:
            r = redis.Redis.from_url(url, decode_responses=True)
            await r.ping()
        except Exception as e:
            logger.warning(f"Redis {i} connection failed: {e}")
            continue
        app.bot_data['redis_clients'].append(r)
        logger.info(f"✅ Redis {i} Connected!")

    app.bot_data['api_semaphore'] = asyncio.Semaphore(TELEGRAM_API_CONCURRENCY_LIMIT)
    app.bot_data['ocr_semaphore'] = asyncio.Semaphore(2)
    app.bot_data['batch_deleter'] = BatchDeleter(app.bot)
    app.bot_data['proc_pool'] = ThreadPoolExecutor(max_workers=4)
    app.bot_data['BOT_OWNER_ID'] = int(BOT_OWNER_ID) if BOT_OWNER_ID else None
    app.bot_data['REPORT_CHANNEL_ID'] = int(REPORT_CHANNEL_ID) if REPORT_CHANNEL_ID else None

    # Create the queue first: the bio task reads it as soon as it starts.
    app.bot_data['bio_queue'] = asyncio.Queue()
    app.bot_data['background_tasks'] = [
        asyncio.create_task(background_bio_task(app)),
        asyncio.create_task(check_expired_trials_loop(app)),
        asyncio.create_task(user_stats_flush_loop(app)),
        asyncio.create_task(recovery_traffic_locks(app)),
    ]


async def post_stop(app: Application):
    """Flush pending stats, stop background tasks and close shared resources.

    Every step is attempted even if an earlier one fails, so a failed flush
    cannot leave the DB pool or Redis connections open.
    """
    try:
        await db.flush_user_stats_to_db(app)
    except Exception as e:
        logger.error(f"Shutdown: final user stats flush failed: {e}")

    # Stop the loops before closing the resources they use. The wait is
    # bounded so a task that ignores cancellation cannot hang shutdown.
    tasks = app.bot_data.get('background_tasks') or []
    for task in tasks:
        task.cancel()
    if tasks:
        await asyncio.wait(tasks, timeout=5)

    if executor := app.bot_data.get('proc_pool'):
        executor.shutdown(wait=False)

    if p := app.bot_data.get('db_pool'):
        try:
            await p.close()
        except Exception as e:
            logger.error(f"Shutdown: closing DB pool failed: {e}")

    for r in app.bot_data.get('redis_clients', []):
        try:
            await r.close()
        except Exception as e:
            logger.error(f"Shutdown: closing Redis client failed: {e}")


# --- 7.
# Main function ---
def main():
    """Build the bot application, register all handlers and start polling.

    Also starts the health-check web server in a daemon thread. Returns
    immediately (after logging an error) when no bot token is configured.
    """
    if not BOT_TOKEN:
        logger.error("BOT_TOKEN is not set; the bot cannot start.")
        return

    app = Application.builder().token(BOT_TOKEN).post_init(post_init).post_stop(post_stop).build()
    app.add_error_handler(global_error_handler)

    from bot_handlers import admin_commands as ac
    from bot_handlers import extra_commands as ec
    from bot_handlers import owner_commands as oc

    # (command, callback) pairs, registered in this order.
    command_table = (
        ("start", safe_start_command),
        # Admin commands
        ("grant", ac.vip_trial_command),
        ("whois", ac.whois_command),
        ("whisper", ac.whisper_command),
        ("setwarn", ac.setwarn_command),
        ("addword", ac.add_word_command),
        ("delword", ac.del_word_command),
        ("wordlist", ac.wordlist_command),
        ("resetsettings", ac.reset_settings_command),
        # Extra commands
        ("filter", ec.filter_command),
        ("stop", ec.stop_filter_command),
        ("save", ec.save_note_command),
        ("setwelcome", ec.set_welcome_command),
        ("welcomecard", ec.welcome_card_command),
        ("shadow", ec.shadow_command),
        ("clean_ghosts", ec.clean_ghosts_command),
        ("linktomute", ec.linktomute_command),
        ("spamscan", ec.spamscan_command),
        ("antiduplicate", ec.antiduplicate_command),
        ("blockforwards", ec.blockforwards_command),
        ("blockbiolink", ec.blockbiolink_command),
        ("ocrscan", ec.ocrscan_command),
        ("traffic", ec.traffic_command),
        ("manageadmins", ec.manageadmins_command),
        ("allowusernames", ec.allowusernames_command),
        ("maxlength", ec.maxlength_command),
        ("autodelete", ec.autodelete_command),
        ("sentiment", ec.sentiment_command),
        ("ticket", ec.ticket_command),
        ("trustscore", ec.trustscore_command),
        ("fixdb", ec.fix_database_command),
        # Owner commands
        ("broadcast", oc.broadcast_command),
        ("bc", oc.broadcast_groups_command),
        ("stats", oc.stats_command),
        ("activity", oc.check_activity_command),
        ("get_dead_groups", oc.get_dead_groups_command),
        ("purge_dead_groups", oc.purge_dead_groups_command),
        ("backup", oc.backup_database_command),
        ("sban", oc.sban_command),
        ("sdel", oc.sdel_command),
        ("smute", oc.smute_command),
    )
    for command, callback in command_table:
        app.add_handler(CommandHandler(command, callback))

    app.add_handler(CallbackQueryHandler(w_callback))
    app.add_handler(ChatJoinRequestHandler(handle_join_request))
    # Within one handler group only the first matching handler runs, and
    # filters.ALL also matches edited messages. The edited-message handler is
    # therefore registered before the catch-all so that edits reach w_edit_msg.
    app.add_handler(MessageHandler(filters.UpdateType.EDITED_MESSAGE, w_edit_msg))
    app.add_handler(MessageHandler(filters.ALL & ~filters.COMMAND, w_msg))
    app.add_handler(ChatMemberHandler(w_bot_status, ChatMemberHandler.MY_CHAT_MEMBER))
    app.add_handler(ChatMemberHandler(w_member_status, ChatMemberHandler.CHAT_MEMBER))

    # Daemon thread: the health-check server must not keep the process alive.
    threading.Thread(target=run_web_server, daemon=True).start()
    logger.info("Bot is polling silently...")
    app.run_polling(
        allowed_updates=[
            "message", "callback_query", "chat_member",
            "my_chat_member", "chat_join_request", "edited_message",
        ]
    )


if __name__ == '__main__':
    main()