# --- START OF FILE bot_handlers/utils.py ---
"""Shared helpers for the bot handlers.

Contains welcome-card rendering, link detection, Redis-cached admin/owner
lookups, spam/deletion reporting, auto-warn enforcement, traffic control,
forbidden-word regex caching and the generic toggle/numeric setting wrappers.
"""
import asyncio
import logging
import io
import time
import re
import html
import random
from datetime import datetime, timedelta
from functools import lru_cache, wraps
from typing import Optional

import httpx
import orjson as json
from telegram import (
    Update,
    InlineKeyboardButton,
    InlineKeyboardMarkup,
    ChatPermissions,
    User,
    Chat,
)
from telegram.ext import ContextTypes
from telegram.constants import MessageEntityType, ParseMode
from telegram.error import Forbidden, BadRequest

import config
import db
from credentials import BOT_TOKEN
from .redis_manager import get_redis_client_for_chat, get_redis_client_for_user

logger = logging.getLogger(__name__)

# --- PILLOW IMPORT & SETUP ---
try:
    from PIL import Image, ImageDraw, ImageFont
    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False
    logger.warning("Pillow library not found. Welcome cards will be disabled.")

# Background colours for the "initial letter" avatar used when the user has
# no profile picture.
_AVATAR_COLORS = (
    (231, 76, 60),    # Red
    (52, 152, 219),   # Blue
    (46, 204, 113),   # Green
    (155, 89, 182),   # Purple
    (241, 196, 15),   # Yellow
    (230, 126, 34),   # Orange
)

# Entity types that has_link_in_text treats as a link.
_LINK_ENTITY_TYPES = (
    MessageEntityType.URL,
    MessageEntityType.TEXT_LINK,
    MessageEntityType.MENTION,
)

_ADMIN_STATUSES = ('administrator', 'creator')

# Cache lifetimes (seconds).
_ADMIN_CACHE_TTL = 1800          # 30 minutes
_OWNER_CACHE_TTL = 86400         # 24 hours
_USER_KNOWN_TTL = 259200         # 3 days
_USERNAME_TYPE_TTL = 86400       # 24 hours

# Traffic-control tuning.
_TRAFFIC_WINDOW_SECONDS = 5      # width of one counting bucket
_TRAFFIC_WINDOW_TTL = 10         # how long a bucket counter lives in Redis
_TRAFFIC_THRESHOLD = 5           # events per bucket that trigger the lock
_TRAFFIC_LOCK_FLAG_TTL = 40      # safety expiry of the "is_locked" flag
_TRAFFIC_BACKUP_TTL = 120        # lifetime of the permissions backup
_TRAFFIC_LOCK_SECONDS = 15       # how long the group stays locked

# Label prefixes stripped from a button label to obtain a feature name.
_LABEL_PREFIXES = ("🚫 ", "✅ ", "🛠️ ", "🚦 ")

_POSITIVE_WORDS = (
    'good', 'great', 'love', 'best', 'thanks', 'thank you', 'amazing', 'cool',
    'ধন্যবাদ', 'ভালো', 'সেরা', 'সুন্দর', 'ওসাম', 'লভব', 'জোশ',
    'धन्यवाद', 'अच्छा', 'बढ़िया', 'प्रेम', 'सुंदर',
)
_NEGATIVE_WORDS = (
    'bad', 'worst', 'scam', 'fake', 'hate', 'ugly', 'stupid', 'useless',
    'বাজে', 'খারাপ', 'ভুয়া', 'স্ক্যাম', 'ফালতু', 'ঘৃণা',
    'बकवास', 'खराब', 'झूठ', 'घटिया',
)

# Admin-panel categories and the commands listed under each.
_PANEL_COMMANDS = {
    "security": ["autowarn", "blockforwards", "antiduplicate", "spamscan",
                 "ocrscan", "linktomute", "blockbiolink", "traffic"],
    "management": ["addword", "delword", "wordlist", "maxlength", "autodelete",
                   "manageadmins", "allowusernames", "resetsettings"],
    "advanced": ["filter", "smartnotes", "welcomecard", "setwelcome",
                 "sentiment", "ticket", "trustscore"],
    "tools": ["shadow", "whois", "clean_ghosts", "grant", "whisper"],
}


def _as_text(value):
    """Return *value* decoded to ``str`` when Redis handed back ``bytes``."""
    if isinstance(value, bytes):
        return value.decode('utf-8', errors='replace')
    return value


# --- BLOCKING IMAGE FUNCTION (CPU BOUND) ---
def _generate_welcome_card_sync(user_name, profile_pic_bytes=None):
    """Render a 600x300 JPEG welcome card and return its bytes.

    Args:
        user_name: Display name; only the first 15 characters are drawn.
            ``None`` is treated as an empty name.
        profile_pic_bytes: Optional raw image bytes. When given, the picture
            is pasted as a round avatar; otherwise a coloured circle with the
            name's first letter is drawn.

    Returns:
        JPEG bytes, or ``None`` when Pillow is unavailable or rendering fails.
    """
    if not PIL_AVAILABLE:
        return None

    # A None name used to raise on slicing and silently disable the card.
    user_name = user_name or ""

    try:
        # 1. Background image.
        img = Image.new('RGB', (600, 300), color=(20, 20, 30))
        draw = ImageDraw.Draw(img)

        # 2. Decorative shapes.
        draw.ellipse((-50, -50, 150, 150), fill=(40, 40, 60))
        draw.ellipse((450, 150, 650, 350), fill=(40, 40, 60))

        # 3. Fonts: use font.ttf when it can be loaded, else Pillow's default.
        try:
            font_lg = ImageFont.truetype("font.ttf", 40)
            font_sm = ImageFont.truetype("font.ttf", 30)
            font_big_initial = ImageFont.truetype("font.ttf", 70)
        except OSError:
            font_lg = ImageFont.load_default()
            font_sm = ImageFont.load_default()
            font_big_initial = ImageFont.load_default()

        # --- Profile picture ---
        if profile_pic_bytes:
            # a. Picture available: paste it through a circular mask.
            p_img = Image.open(io.BytesIO(profile_pic_bytes)).convert("RGBA")
            p_img = p_img.resize((120, 120))

            mask = Image.new("L", (120, 120), 0)
            ImageDraw.Draw(mask).ellipse((0, 0, 120, 120), fill=255)

            img.paste(p_img, (240, 50), mask)
        else:
            # b. No picture: coloured circle with the name's first letter.
            bg_color = random.choice(_AVATAR_COLORS)
            draw.ellipse((240, 50, 360, 170), fill=bg_color)

            initial = user_name[0].upper() if user_name else "?"
            # anchor="mm" centres the glyph on the given point.
            draw.text((300, 110), initial, fill=(255, 255, 255),
                      anchor="mm", font=font_big_initial)

        # --- Caption text ---
        draw.text((300, 190), f"Welcome, {user_name[:15]}",
                  fill=(255, 255, 255), anchor="mm", font=font_lg)
        draw.text((300, 230), "To Our Community",
                  fill=(180, 180, 180), anchor="mm", font=font_sm)

        # Serialise to JPEG.
        bio = io.BytesIO()
        img.save(bio, 'JPEG')
        return bio.getvalue()
    except Exception as e:
        logger.error(f"Image Gen Error: {e}")
        return None


# --- ASYNC WRAPPER FOR IMAGE GEN ---
async def generate_welcome_card(user_name, profile_pic_bytes=None):
    """Run :func:`_generate_welcome_card_sync` in the default executor.

    Returns whatever the sync function returns (JPEG bytes or ``None``).
    """
    loop = asyncio.get_running_loop()
    # executor=None selects the loop's default executor.
    return await loop.run_in_executor(
        None, _generate_welcome_card_sync, user_name, profile_pic_bytes
    )


# --- HELPER FUNCTIONS ---
def get_text(key, lang="en"):
    """Look up a localized string.

    Falls back to the English table for an unknown *lang* and to ``_key_``
    when the key is missing from the selected table.
    """
    data = config.LANGUAGES.get(lang, config.LANGUAGES["en"])
    return data.get(key, f"_{key}_")


def has_link_in_text(obj) -> bool:
    """Return True when *obj* looks like it carries a link.

    *obj* may be a plain string (checked against
    ``config.LINK_PATTERN_COMPILED`` only) or a message-like object, for which
    text/caption, entities, inline keyboard and link-preview URL are checked.
    """
    if not obj:
        return False

    # 1. Plain text (e.g. a bio).
    if isinstance(obj, str):
        return bool(config.LINK_PATTERN_COMPILED.search(obj))

    # 2. Message text or caption.
    text = (getattr(obj, 'text', None) or getattr(obj, 'caption', None) or "").strip()
    if config.LINK_PATTERN_COMPILED.search(text):
        return True

    # 3. Entities (catches hidden links).
    entities = getattr(obj, 'entities', None) or getattr(obj, 'caption_entities', None) or []
    if any(entity.type in _LINK_ENTITY_TYPES for entity in entities):
        return True

    # 4. Inline buttons: any message carrying an inline keyboard is treated
    #    as containing a link.
    reply_markup = getattr(obj, 'reply_markup', None)
    if reply_markup and reply_markup.inline_keyboard:
        return True

    # 5. Link preview pointing at a URL that is not in the text itself.
    preview = getattr(obj, 'link_preview_options', None)
    if preview and preview.url:
        return True

    return False


async def delete_message_after_delay(context, chat_id, message_id, delay):
    """Delete a message after *delay* seconds (no-op when ``delay <= 0``).

    Deletion is best-effort: failures are logged at debug level only.
    """
    if delay <= 0:
        return
    await asyncio.sleep(delay)
    try:
        await context.bot.delete_message(chat_id=chat_id, message_id=message_id)
    except Exception as e:
        logger.debug("Delayed delete failed for %s/%s: %s", chat_id, message_id, e)


# --- CACHED ADMIN CHECK ---
async def is_group_admin(context, chat_id, user_id):
    """Return True when *user_id* is an administrator or creator of *chat_id*.

    The answer is cached in Redis for 30 minutes. Any API failure yields
    ``False``.
    """
    redis = get_redis_client_for_chat(context, chat_id)
    cache_key = f"is_admin:{chat_id}:{user_id}"

    # 1. Redis cache.
    if redis:
        try:
            cached = _as_text(await redis.get(cache_key))
        except Exception:
            cached = None
        if cached == "1":
            return True
        if cached == "0":
            return False

    # 2. Cache miss: ask the Bot API.
    try:
        member = await context.bot.get_chat_member(chat_id, user_id)
    except Exception as e:
        message = str(e)
        if ("Chat_admin_required" in message
                or "there is no administrators in the private chat" in message):
            return False
        logger.error(f"Admin Check Error: {e}")
        return False

    is_admin = member.status in _ADMIN_STATUSES

    # 3. Store for later. A cache failure must not change the answer.
    if redis:
        try:
            await redis.set(cache_key, "1" if is_admin else "0", ex=_ADMIN_CACHE_TTL)
        except Exception as e:
            logger.warning("Admin cache write failed: %s", e)

    return is_admin


# --- CACHED OWNER CHECK ---
async def get_group_owner_id(context, chat_id):
    """Return the user id of the chat's creator, or ``None`` if unknown.

    The id is cached in Redis for 24 hours.
    """
    redis = get_redis_client_for_chat(context, chat_id)
    cache_key = f"group_owner:{chat_id}"

    if redis:
        try:
            cached = await redis.get(cache_key)
            if cached:
                return int(cached)
        except Exception:
            pass

    try:
        admins = await context.bot.get_chat_administrators(chat_id)
    except Exception:
        return None

    for admin in admins:
        if admin.status == 'creator':
            owner_id = admin.user.id
            if redis:
                try:
                    await redis.set(cache_key, owner_id, ex=_OWNER_CACHE_TTL)
                except Exception:
                    pass
            return owner_id
    return None


# --- REPORT SYSTEM ---
async def send_spam_report(context, chat_id, user_id, reason, text_content):
    """Send a spam report with mute/cancel buttons.

    The report goes to ``REPORT_CHANNEL_ID`` from ``bot_data`` when set,
    otherwise to the group owner. Nothing is sent when ``enable_spamscan`` is
    off for the chat or no target can be determined.
    """
    settings = await db.get_all_settings(context, chat_id)
    if not settings.get('enable_spamscan', False):
        return

    report_target = context.bot_data.get('REPORT_CHANNEL_ID')
    if not report_target:
        report_target = await get_group_owner_id(context, chat_id)
    if not report_target:
        return

    try:
        chat = await context.bot.get_chat(chat_id)
        user = await context.bot.get_chat_member(chat_id, user_id)
        user_mention = user.user.mention_html()
        # The title is user-controlled and the message is sent as HTML.
        group_link = html.escape(chat.title or "")
        if chat.username:
            # Public group: link the title to its t.me address.
            group_link = f'<a href="https://t.me/{chat.username}">{group_link}</a>'
    except Exception:
        return

    lang = "en"
    msg = get_text("spamscan_group_owner_dm_notification_header", lang) + "\n\n"
    msg += get_text("spamscan_group_owner_dm_notification_body", lang).format(
        group_link_html=group_link, chat_id=chat_id) + "\n"
    msg += get_text("spamscan_group_owner_dm_user_info", lang).format(
        user_mention=user_mention, user_id=user_id) + "\n"
    msg += get_text("spamscan_group_owner_dm_reason", lang).format(
        reason_text=reason) + "\n\n"

    if text_content:
        # Escaped because this is raw user text inside an HTML message.
        # NOTE(review): markup around this excerpt looks stripped in this copy
        # of the file; confirm the intended wrapper tag.
        msg += "Message:\n" + f"\n{html.escape(text_content[:200])}\n" + "\n\n"

    msg += get_text("spamscan_group_owner_dm_action_info", lang)

    keyboard = InlineKeyboardMarkup([
        [InlineKeyboardButton(get_text("button_mute_user_very_short", lang),
                              callback_data=f"mute_user_perm_{user_id}_{chat_id}")],
        [InlineKeyboardButton(get_text("button_cancel_report_short", lang),
                              callback_data=f"cancel_report_{user_id}")],
    ])

    try:
        await context.bot.send_message(report_target, msg, reply_markup=keyboard,
                                       parse_mode=ParseMode.HTML)
    except Exception as e:
        logger.error(f"Failed to send report: {e}")


# --- SCORE & ACTIVITY UPDATE (MODIFIED: NO FLOATING SCORE) ---
async def calculate_trust_score(context, user_id):
    """Return the stored ``trust_score`` for *user_id*, defaulting to 100.

    100 is also returned when no DB pool is configured, the user has no
    stats row, or the lookup fails.
    """
    pool = context.bot_data.get('db_pool')
    if not pool:
        return 100
    try:
        user_data = await db.get_user_stats(pool, user_id)
        if user_data:
            return user_data.get('trust_score', 100)
        return 100
    except Exception as e:
        logger.error(f"Error calculating trust score: {e}")
        return 100


async def _register_user_if_unknown(context, pool, redis, chat_id, user_id):
    """Insert the user into the DB unless Redis already marks them as known."""
    is_known = False
    if redis:
        try:
            is_known = await redis.get(f"user_known:{user_id}")
        except Exception:
            is_known = False
    if is_known:
        return

    try:
        try:
            chat_member = await context.bot.get_chat_member(chat_id, user_id)
            fname = chat_member.user.first_name
            uname = chat_member.user.username
        except Exception:
            fname = "Unknown"
            uname = None

        await db._add_user_to_db_core(pool, user_id, fname, uname)
        if redis:
            await redis.set(f"user_known:{user_id}", "1", ex=_USER_KNOWN_TTL)
    except Exception as e:
        # Registration is best-effort; the warning flow continues regardless.
        logger.debug("User registration skipped for %s: %s", user_id, e)


async def _enforce_warn_limit(context, redis, chat_id, user_id):
    """Count a warning in Redis and punish the user once the limit is hit."""
    settings = await db.get_all_settings(context, chat_id)
    limit = settings.get('warn_limit', 0) or 0
    if limit <= 0:
        return

    window_min = settings.get('warn_time_window', 15)
    action = settings.get('warn_action', 'mute')

    warn_key = f"autowarn:{chat_id}:{user_id}"
    current_warns = await redis.incr(warn_key)
    if current_warns == 1:
        # First warning in this window starts the expiry clock.
        await redis.expire(warn_key, window_min * 60)

    if current_warns < limit:
        return

    try:
        user_member = await context.bot.get_chat_member(chat_id, user_id)
        if user_member.status not in _ADMIN_STATUSES:
            if action == 'kick':
                # Ban followed by unban removes the user without a lasting ban.
                await context.bot.ban_chat_member(chat_id, user_id)
                await context.bot.unban_chat_member(chat_id, user_id)
                action_text = "kicked 👢"
            elif action == 'ban':
                await context.bot.ban_chat_member(chat_id, user_id)
                action_text = "banned ⛔"
            else:
                permissions = ChatPermissions(can_send_messages=False)
                await context.bot.restrict_chat_member(chat_id, user_id, permissions)
                action_text = "muted (Permanent) 🔇"

            await context.bot.send_message(
                chat_id,
                f"⚠️ User reached warning limit ({limit}/{limit}). Action: {action_text}",
                parse_mode=ParseMode.HTML,
            )
            await redis.delete(warn_key)
    except Exception as e:
        logger.error(f"Auto-punish failed: {e}")


async def update_user_activity_score(context, chat_id, user_id, event, reason, content):
    """Record a moderation event for a user and apply auto-warn punishment.

    Ensures the user exists in the DB, and for ``'deletion'`` / ``'warning'``
    events increments the stored warning count and, when Redis is available,
    enforces the chat's ``warn_limit``. *reason* and *content* are accepted
    for interface compatibility and are not used here.
    """
    pool = context.bot_data.get('db_pool')
    if not pool:
        return

    redis = get_redis_client_for_chat(context, chat_id)
    await _register_user_if_unknown(context, pool, redis, chat_id, user_id)

    if event in ('deletion', 'warning'):
        await db.increment_user_warning_count(pool, user_id)
        if redis:
            await _enforce_warn_limit(context, redis, chat_id, user_id)
    # Note: Floating point score logic removed here.


# --- WRAPPERS ---
async def check_admin(update, context, func):
    """Run ``func(update, context)`` only for users allowed to manage the chat.

    Group owner and bot owner always pass. Other admins pass only when the
    chat's ``allow_admins_manage`` setting is on; non-admins are ignored
    silently. In private chats a "group only" notice is sent instead.
    """
    user = update.effective_user
    chat = update.effective_chat
    if user is None or chat is None:
        return
    message = update.effective_message

    if chat.type == 'private':
        lang = await db.get_user_lang_from_db(context, user.id)
        if message:
            await message.reply_text(get_text("group_only", lang),
                                     parse_mode=ParseMode.HTML)
        return

    settings = await db.get_all_settings(context, chat.id)
    allow_admins = settings.get('allow_admins_manage', False)

    owner_id = await get_group_owner_id(context, chat.id)
    bot_owner_id = context.bot_data.get('BOT_OWNER_ID')
    is_real_owner = user.id in (owner_id, bot_owner_id)

    if not is_real_owner:
        if not await is_group_admin(context, chat.id, user.id):
            return
        if not allow_admins:
            lang = await db.get_user_lang_from_db(context, user.id)
            if message:
                await message.reply_text(get_text("owner_only_command_text", lang),
                                         parse_mode=ParseMode.HTML)
            return

    await func(update, context)


def bot_owner_only(func):
    """Decorator: silently ignore the update unless sent by ``BOT_OWNER_ID``."""
    @wraps(func)
    async def wrapper(update, context, *args, **kwargs):
        user = update.effective_user
        if user is None or user.id != context.bot_data.get('BOT_OWNER_ID'):
            return
        return await func(update, context, *args, **kwargs)
    return wrapper


# --- KEYBOARDS ---
def get_initial_keyboard(lang="en"):
    """Return the start keyboard with a single "admin panel" button."""
    buttons = [[InlineKeyboardButton(get_text("admin_panel_button", lang),
                                     callback_data='show_admin_panel')]]
    return InlineKeyboardMarkup(buttons)


def get_admin_commands_keyboard(lang="en", category="main"):
    """Return the admin-panel keyboard for *category*.

    ``"main"`` yields the category menu; any other value yields that
    category's commands two per row, followed by a back button. An unknown
    category yields only the back button.
    """
    if category == "main":
        buttons = [
            [InlineKeyboardButton(get_text("btn_security", lang), callback_data="panel_security"),
             InlineKeyboardButton(get_text("btn_management", lang), callback_data="panel_management")],
            [InlineKeyboardButton(get_text("btn_advanced", lang), callback_data="panel_advanced"),
             InlineKeyboardButton(get_text("btn_tools", lang), callback_data="panel_tools")],
            [InlineKeyboardButton(get_text("back_button", lang), callback_data="back_to_start")],
        ]
        return InlineKeyboardMarkup(buttons)

    cmds = _PANEL_COMMANDS.get(category, [])
    buttons = [
        [InlineKeyboardButton(get_text(f"{cmd}_button_label", lang),
                              callback_data=f'show_cmd_example_{cmd}')
         for cmd in cmds[i:i + 2]]
        for i in range(0, len(cmds), 2)
    ]
    buttons.append([InlineKeyboardButton(get_text("btn_back_menu", lang),
                                         callback_data='show_admin_panel')])
    return InlineKeyboardMarkup(buttons)


# --- TRAFFIC CONTROL ---
async def check_traffic_control(context, chat_id):
    """Temporarily lock the chat when too many events arrive in one window.

    Each call increments a per-chat counter for the current 5-second bucket.
    On reaching the threshold the chat's permissions are backed up to Redis,
    sending is disabled for 15 seconds, and the backup is restored. Does
    nothing without Redis or while a lock is already in progress.
    """
    redis = get_redis_client_for_chat(context, chat_id)
    if not redis:
        return

    lock_flag = f"is_locked:{chat_id}"
    if await redis.get(lock_flag):
        return

    window_key = f"traffic_count:{chat_id}:{int(time.time() // _TRAFFIC_WINDOW_SECONDS)}"
    count = await redis.incr(window_key)
    if count == 1:
        await redis.expire(window_key, _TRAFFIC_WINDOW_TTL)

    if count < _TRAFFIC_THRESHOLD:
        return

    logger.info(f"🚨 Traffic Alert triggered for {chat_id}")
    await redis.set(lock_flag, "1", ex=_TRAFFIC_LOCK_FLAG_TTL)
    backup_key = f"backup_perms:{chat_id}"

    try:
        chat_obj = await context.bot.get_chat(chat_id)
        p = chat_obj.permissions
        # permissions may be absent; then only can_send_messages is restored.
        perms_dict = p.to_dict() if p else {}
        # The restore must always re-enable sending.
        perms_dict['can_send_messages'] = True

        # orjson.dumps returns bytes; store as text.
        await redis.set(backup_key, json.dumps(perms_dict).decode('utf-8'),
                        ex=_TRAFFIC_BACKUP_TTL)

        await context.bot.send_message(
            chat_id,
            "🚦 Traffic Jam!\nGroup locked for 15s to prevent spam.",
            parse_mode=ParseMode.HTML,
        )
        await context.bot.set_chat_permissions(
            chat_id, ChatPermissions(can_send_messages=False))

        await asyncio.sleep(_TRAFFIC_LOCK_SECONDS)

        backup_data = await redis.get(backup_key)
        if backup_data:
            restored = json.loads(backup_data)
            # Dropped before rebuilding ChatPermissions.
            # NOTE(review): presumably a legacy key the constructor rejects; confirm.
            restored.pop('can_send_media_messages', None)
            await context.bot.set_chat_permissions(chat_id, ChatPermissions(**restored))
            await context.bot.send_message(
                chat_id, "✅ Traffic Normal. Settings restored.",
                parse_mode=ParseMode.HTML)
    except Exception as e:
        logger.error(f"Traffic Control Error: {e}")
        # Last resort: never leave the group muted.
        try:
            await context.bot.set_chat_permissions(
                chat_id, ChatPermissions(can_send_messages=True))
        except Exception as unlock_error:
            logger.error("Traffic Control unlock failed: %s", unlock_error)
    finally:
        await redis.delete(lock_flag)


# --- REGEX CACHE ---
@lru_cache(maxsize=128)
def _compile_forbidden_regex(words_tuple):
    """Compile a case-insensitive regex matching any word in *words_tuple*.

    Longer words come first in the alternation; each match must be delimited
    by a non-word character or a string edge. Empty and non-string entries
    are ignored, since an empty alternative would match almost any text.

    Returns:
        The compiled pattern, or ``None`` when there is nothing to match or
        compilation fails.
    """
    words = [w for w in (words_tuple or ()) if isinstance(w, str) and w.strip()]
    if not words:
        return None
    alternatives = '|'.join(
        re.escape(word) for word in sorted(words, key=len, reverse=True))
    pattern = r'(?:^|\W)(' + alternatives + r')(?:$|\W)'
    try:
        return re.compile(pattern, re.IGNORECASE | re.UNICODE)
    except re.error:
        return None


def get_forbidden_words_regex(words: list):
    """Return the cached compiled regex for *words*, or ``None`` if empty."""
    if not words:
        return None
    return _compile_forbidden_regex(tuple(words))


async def get_settings_from_cache_and_update_regex(context, chat_id):
    """Fetch the chat settings and attach the forbidden-word regex.

    The compiled pattern (or ``None``) is stored under ``settings['fw_regex']``
    on the dict returned by ``db.get_all_settings``.
    """
    settings = await db.get_all_settings(context, chat_id)
    settings['fw_regex'] = get_forbidden_words_regex(settings.get('forbidden_words', []))
    return settings


# --- DELETION REPORT ---
async def send_deletion_report(context, user, chat, reason_key, content):
    """Post a "message deleted" report to ``REPORT_CHANNEL_ID``.

    Does nothing when no report channel is configured. The group title links
    to the public t.me address or an exported invite link when one can be
    obtained. Failures are logged and never raised.
    """
    report_channel_id = context.bot_data.get('REPORT_CHANNEL_ID')
    if not report_channel_id:
        return

    try:
        owner_id = await get_group_owner_id(context, chat.id)
        owner_lang = await db.get_user_lang_from_db(context, owner_id or user.id)
        reason_text = get_text(reason_key, lang=owner_lang)

        user_mention = f"{html.escape(user.first_name or '')}"
        title = html.escape(chat.title or "")

        try:
            if chat.username:
                invite_link = f"https://t.me/{chat.username}"
            else:
                invite_link = await chat.export_invite_link()
            group_link = f'<a href="{html.escape(invite_link)}">{title}</a>'
        except Exception:
            # No link available (e.g. missing rights): plain title.
            group_link = title

        excerpt = html.escape(str(content or "")[:500])
        # NOTE(review): markup in this template looks stripped in this copy of
        # the file; confirm the intended tags.
        report_message = (
            f"🗑️ Message Deleted\n\n"
            f"🏢 Group: {group_link} ({chat.id})\n"
            f"👤 User: {user_mention} ({user.id})\n"
            f"📝 Reason: {html.escape(reason_text)}\n"
            f"📜 Content:\n{excerpt}\n"
        )

        await context.bot.send_message(
            chat_id=report_channel_id,
            text=report_message,
            parse_mode=ParseMode.HTML,
            disable_web_page_preview=True,
        )
    except Exception as e:
        logger.error(f"Failed to send deletion report: {e}")


# --- MISC UTILS ---
async def is_channel_or_group_username(context, username) -> bool:
    """Return True when ``@username`` resolves to a channel or group.

    The result is cached in Redis for 24 hours. A ``BadRequest`` from the API
    is cached as "user"; other failures return ``False`` without caching so a
    transient error is not remembered.
    """
    if not username:
        return False
    username_clean = username.replace('@', '').strip()
    if not username_clean:
        return False

    redis_client = get_redis_client_for_user(context)
    cache_key = f"uname_type:{username_clean.lower()}"

    async def _remember(value):
        # Cache writes are best-effort and must never raise to the caller.
        if not redis_client:
            return
        try:
            await redis_client.set(cache_key, value, ex=_USERNAME_TYPE_TTL)
        except Exception as e:
            logger.debug("Username cache write failed: %s", e)

    if redis_client:
        try:
            cached_type = _as_text(await redis_client.get(cache_key))
            if cached_type:
                return cached_type == "channel"
        except Exception:
            pass

    try:
        chat_obj = await context.bot.get_chat(f"@{username_clean}")
    except BadRequest:
        await _remember("user")
        return False
    except Exception as e:
        logger.debug("Username lookup failed for %s: %s", username_clean, e)
        return False

    is_channel = chat_obj.type in ('channel', 'supergroup', 'group')
    await _remember("channel" if is_channel else "user")
    return is_channel


def analyze_sentiment(text: str) -> Optional[str]:
    """Classify *text* by keyword lookup.

    Returns ``'positive'`` if any positive keyword occurs as a substring,
    else ``'negative'`` if any negative keyword does, else ``None``.
    Positive keywords take precedence.
    """
    if not text:
        return None
    lowered = text.lower()

    if any(w in lowered for w in _POSITIVE_WORDS):
        return 'positive'
    if any(w in lowered for w in _NEGATIVE_WORDS):
        return 'negative'
    return None


# --- TOGGLE & NUMERIC WRAPPERS ---
async def toggle_setting_wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE,
                                 key: str, db_col: str, owner_only: bool = False):
    """Toggle a boolean chat setting, gated by :func:`check_admin`.

    Args:
        key: Prefix of the ``<key>_button_label`` text used as feature name.
        db_col: Settings column to update.
        owner_only: When True, only the group owner or the bot owner may
            change the setting.

    The first command argument ``on`` / ``off`` forces the state; otherwise
    the current value is flipped.
    """
    async def logic(u, c):
        chat_id = u.effective_chat.id
        user_id = u.effective_user.id
        reply_to = u.effective_message
        lang = await db.get_user_lang_from_db(c, user_id)

        # Extra protection for owner-only settings.
        if owner_only:
            owner_id = await get_group_owner_id(c, chat_id)
            bot_owner = c.bot_data.get('BOT_OWNER_ID')
            if user_id != owner_id and user_id != bot_owner:
                await reply_to.reply_html(get_text("owner_only_command_text", lang))
                return

        settings = await db.get_all_settings(c, chat_id)
        new_status = not settings.get(db_col, False)

        if c.args:
            arg = c.args[0].lower()
            if arg == 'on':
                new_status = True
            elif arg == 'off':
                new_status = False

        await db.update_setting_in_db(c, chat_id, db_col, new_status)
        await get_settings_from_cache_and_update_regex(c, chat_id)

        status_text = "feature_enabled" if new_status else "feature_disabled"
        feature_name = get_text(f"{key}_button_label", lang)
        for prefix in _LABEL_PREFIXES:
            feature_name = feature_name.replace(prefix, "")

        await reply_to.reply_html(get_text(status_text, lang).format(feature=feature_name))

    await check_admin(update, context, logic)


async def numeric_setting_wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE,
                                  key: str, db_col: str):
    """Set a non-negative integer chat setting, gated by :func:`check_admin`.

    Without arguments the ``<key>_example_full_text`` help is shown. A value
    of 0 is stored and reported with ``<key>_disabled``; other values are
    reported with ``<key>_set``.
    """
    async def logic(u, c):
        reply_to = u.effective_message
        lang = await db.get_user_lang_from_db(c, u.effective_user.id)
        if not c.args:
            return await reply_to.reply_html(get_text(f"{key}_example_full_text", lang))

        try:
            val = int(c.args[0])
            if val < 0:
                raise ValueError
        except (ValueError, TypeError):
            return await reply_to.reply_text("⚠️ Please enter a valid positive number.")

        await db.update_setting_in_db(c, u.effective_chat.id, db_col, val)

        if val == 0:
            await reply_to.reply_html(get_text(f"{key}_disabled", lang))
        else:
            txt = get_text(f"{key}_set", lang)
            if key == 'maxlength':
                txt = txt.format(length=val)
            elif key == 'autodelete':
                txt = txt.format(seconds=val)
            await reply_to.reply_html(txt)

    await check_admin(update, context, logic)