Spaces:
Runtime error
Runtime error
| # --- START OF FILE bot_handlers/utils.py --- | |
| import asyncio | |
| import logging | |
| import io | |
| import time | |
| import re | |
| import html | |
| import httpx | |
| import orjson as json | |
| import random | |
| from datetime import datetime, timedelta | |
| from functools import lru_cache | |
| from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ChatPermissions, User, Chat | |
| from telegram.ext import ContextTypes | |
| from telegram.constants import ParseMode | |
| from telegram.error import Forbidden, BadRequest | |
| import config | |
| import db | |
| from credentials import BOT_TOKEN | |
| from .redis_manager import get_redis_client_for_chat, get_redis_client_for_user | |
| logger = logging.getLogger(__name__) | |
| # --- PILLOW IMPORT & SETUP --- | |
| try: | |
| from PIL import Image, ImageDraw, ImageFont | |
| PIL_AVAILABLE = True | |
| except ImportError: | |
| PIL_AVAILABLE = False | |
| logger.warning("Pillow library not found. Welcome cards will be disabled.") | |
| # --- BLOCKING IMAGE FUNCTION (CPU BOUND) --- | |
def _generate_welcome_card_sync(user_name, profile_pic_bytes=None):
    """Render the 600x300 welcome card and return it as JPEG bytes.

    This is the blocking half of the welcome-card feature; async code should
    go through ``generate_welcome_card``.

    Args:
        user_name: Display name of the new member. ``None`` or an empty
            string is accepted; the avatar placeholder then shows "?".
        profile_pic_bytes: Raw bytes of the member's profile photo. When
            falsy, a coloured circle with the name's initial is drawn instead.

    Returns:
        The encoded JPEG as ``bytes``, or ``None`` when Pillow is not
        installed or rendering fails (the error is logged).
    """
    if not PIL_AVAILABLE:
        return None
    # The placeholder initial already tolerated a missing name, but the
    # welcome line sliced ``user_name`` directly and failed on ``None``.
    display_name = user_name or ""
    try:
        # 1. Dark background canvas.
        img = Image.new('RGB', (600, 300), color=(20, 20, 30))
        draw = ImageDraw.Draw(img)

        # 2. Two decorative circles that bleed off opposite corners.
        draw.ellipse((-50, -50, 150, 150), fill=(40, 40, 60))
        draw.ellipse((450, 150, 650, 350), fill=(40, 40, 60))

        # 3. Fonts: use font.ttf when it can be loaded, otherwise Pillow's default.
        try:
            font_lg = ImageFont.truetype("font.ttf", 40)
            font_sm = ImageFont.truetype("font.ttf", 30)
            font_big_initial = ImageFont.truetype("font.ttf", 70)  # large font for the initial
        except OSError:
            font_lg = font_sm = font_big_initial = ImageFont.load_default()

        # --- Avatar ---
        if profile_pic_bytes:
            # a. A photo is available: scale it and paste it through a round mask.
            with Image.open(io.BytesIO(profile_pic_bytes)) as source:
                p_img = source.convert("RGBA").resize((120, 120))
            mask = Image.new("L", (120, 120), 0)
            ImageDraw.Draw(mask).ellipse((0, 0, 120, 120), fill=255)
            img.paste(p_img, (240, 50), mask)
        else:
            # b. No photo: coloured circle with the first letter of the name.
            colors = [
                (231, 76, 60),    # Red
                (52, 152, 219),   # Blue
                (46, 204, 113),   # Green
                (155, 89, 182),   # Purple
                (241, 196, 15),   # Yellow
                (230, 126, 34),   # Orange
            ]
            draw.ellipse((240, 50, 360, 170), fill=random.choice(colors))
            initial = display_name[0].upper() if display_name else "?"
            # anchor="mm" centres the glyph on the given point.
            draw.text((300, 110), initial, fill=(255, 255, 255), anchor="mm", font=font_big_initial)

        # --- Captions ---
        draw.text((300, 190), f"Welcome, {display_name[:15]}", fill=(255, 255, 255), anchor="mm", font=font_lg)
        draw.text((300, 230), "To Our Community", fill=(180, 180, 180), anchor="mm", font=font_sm)

        # Encode to JPEG in memory.
        bio = io.BytesIO()
        img.save(bio, 'JPEG')
        return bio.getvalue()
    except Exception as e:
        logger.error(f"Image Gen Error: {e}")
        return None
| # --- ASYNC WRAPPER FOR IMAGE GEN --- | |
async def generate_welcome_card(user_name, profile_pic_bytes=None):
    """Build the welcome card without blocking the event loop.

    Hands ``_generate_welcome_card_sync`` to the running loop's default
    executor and returns whatever that function returns.
    """
    event_loop = asyncio.get_running_loop()
    render_job = event_loop.run_in_executor(
        None,  # None selects the loop's default executor
        _generate_welcome_card_sync,
        user_name,
        profile_pic_bytes,
    )
    return await render_job
| # --- HELPER FUNCTIONS --- | |
| from telegram.constants import MessageEntityType | |
def get_text(key, lang="en"):
    """Return the UI string stored under ``key`` for language ``lang``.

    Unknown languages fall back to the English table; a key missing from the
    selected table yields the placeholder ``_<key>_``.
    """
    languages = config.LANGUAGES
    table = languages.get(lang, languages["en"])
    placeholder = f"_{key}_"
    return table.get(key, placeholder)
def has_link_in_text(obj) -> bool:
    """Tell whether a string or a message-like object carries a link.

    ``obj`` may be a plain string (for example a bio) or an object exposing
    ``text``/``caption`` and ``entities``/``caption_entities``. Falsy input
    returns ``False``.
    """
    if not obj:
        return False

    link_pattern = config.LINK_PATTERN_COMPILED

    # 1. Plain text (or a bio): only the regex applies.
    if isinstance(obj, str):
        return link_pattern.search(obj) is not None

    # 2. Message text, falling back to the caption.
    body = obj.text or obj.caption or ""
    if link_pattern.search(body.strip()):
        return True

    # 3. Entities, to catch links that are not visible in the raw text.
    link_like = (MessageEntityType.URL, MessageEntityType.TEXT_LINK, MessageEntityType.MENTION)
    if any(ent.type in link_like for ent in (obj.entities or obj.caption_entities or [])):
        return True

    # 4. Inline buttons: any message that carries an inline keyboard counts
    #    as containing a link.
    markup = getattr(obj, 'reply_markup', None)
    if markup and markup.inline_keyboard:
        return True

    # 5. Link preview whose URL does not appear in the text itself.
    preview = getattr(obj, 'link_preview_options', None)
    return bool(preview and preview.url)
async def delete_message_after_delay(context, chat_id, message_id, delay):
    """Delete a message after ``delay`` seconds, on a best-effort basis.

    Args:
        context: Handler context; ``context.bot.delete_message`` is used.
        chat_id: Chat that contains the message.
        message_id: Message to delete.
        delay: Seconds to wait first. Values <= 0 make the call a no-op.

    Failures of the delete call are logged at debug level and otherwise
    ignored. Task cancellation is not swallowed.
    """
    if delay <= 0:
        return
    await asyncio.sleep(delay)
    try:
        await context.bot.delete_message(chat_id=chat_id, message_id=message_id)
    except Exception as exc:
        # ``except Exception`` (instead of a bare ``except``) lets
        # asyncio.CancelledError propagate so shutdown/cancellation still works.
        logger.debug("Delayed delete of message %s in chat %s failed: %s", message_id, chat_id, exc)
| # --- 🔥 CACHED ADMIN CHECK (শক্তিশালী ও নির্ভরযোগ্য) --- | |
async def is_group_admin(context, chat_id, user_id):
    """Return whether ``user_id`` is an administrator or the creator of ``chat_id``.

    The answer is cached in Redis under ``is_admin:<chat_id>:<user_id>`` for
    30 minutes. Redis problems never change the answer: a failed read falls
    through to the Bot API and a failed write is only logged.

    Returns:
        ``True``/``False``; ``False`` as well when the Bot API lookup fails.
    """
    redis = get_redis_client_for_chat(context, chat_id)
    cache_key = f"is_admin:{chat_id}:{user_id}"

    # 1. Cached answer, if any.
    if redis:
        try:
            cached = await redis.get(cache_key)
        except Exception as exc:
            logger.debug("Admin cache read failed for %s: %s", cache_key, exc)
            cached = None
        # Accept both str and bytes replies, whatever the client is configured to return.
        if isinstance(cached, bytes):
            cached = cached.decode("utf-8", "replace")
        if cached == "1":
            return True
        if cached == "0":
            return False

    # 2. No usable cache entry: ask Telegram.
    try:
        member = await context.bot.get_chat_member(chat_id, user_id)
    except Exception as e:
        if "Chat_admin_required" in str(e) or "there is no administrators in the private chat" in str(e):
            return False
        logger.error(f"Admin Check Error: {e}")
        return False
    is_admin = member.status in ('administrator', 'creator')

    # 3. Remember the answer. This is kept outside the API ``try`` so that a
    #    cache failure cannot turn a confirmed admin into ``False``.
    if redis:
        try:
            await redis.set(cache_key, "1" if is_admin else "0", ex=1800)  # 30-minute cache
        except Exception as exc:
            logger.warning("Admin cache write failed for %s: %s", cache_key, exc)
    return is_admin
| # --- 🔥 CACHED OWNER CHECK --- | |
async def get_group_owner_id(context, chat_id):
    """Return the user id of the chat's creator, or ``None`` if it cannot be found.

    The id is cached in Redis under ``group_owner:<chat_id>`` for 24 hours.
    Cache failures are logged and ignored; a failed administrator lookup is
    logged and yields ``None``.
    """
    redis = get_redis_client_for_chat(context, chat_id)
    cache_key = f"group_owner:{chat_id}"

    if redis:
        try:
            cached = await redis.get(cache_key)
            if cached:
                return int(cached)
        except Exception as exc:
            logger.debug("Owner cache read failed for %s: %s", cache_key, exc)

    try:
        admins = await context.bot.get_chat_administrators(chat_id)
    except Exception as exc:
        # ``except Exception`` rather than a bare ``except`` so cancellation
        # is not swallowed and the failure is visible in the log.
        logger.warning("Could not fetch administrators of chat %s: %s", chat_id, exc)
        return None

    owner_id = next((admin.user.id for admin in admins if admin.status == 'creator'), None)
    if owner_id is not None and redis:
        try:
            await redis.set(cache_key, owner_id, ex=86400)
        except Exception as exc:
            logger.debug("Owner cache write failed for %s: %s", cache_key, exc)
    return owner_id
| # --- REPORT SYSTEM --- | |
async def send_spam_report(context, chat_id, user_id, reason, text_content):
    """Send a spam report about ``user_id`` to the report channel or the group owner.

    Nothing is sent when the chat's ``enable_spamscan`` setting is off, when
    no report target can be determined, or when the chat/member lookup fails.

    Args:
        context: Handler context (bot, ``bot_data``).
        chat_id: Chat in which the spam was detected.
        user_id: Offending user.
        reason: Text substituted into the reason line as-is.
            NOTE(review): not escaped here because callers may pass
            pre-formatted HTML - confirm against callers.
        text_content: Offending message text; the first 200 characters are
            quoted in the report (HTML-escaped). May be empty/None.
    """
    settings = await db.get_all_settings(context, chat_id)
    if not settings.get('enable_spamscan', False):
        return

    report_target = context.bot_data.get('REPORT_CHANNEL_ID')
    if not report_target:
        report_target = await get_group_owner_id(context, chat_id)
    if not report_target:
        return

    try:
        chat = await context.bot.get_chat(chat_id)
        member = await context.bot.get_chat_member(chat_id, user_id)
    except Exception as exc:
        logger.warning("Spam report for chat %s skipped, lookup failed: %s", chat_id, exc)
        return

    user_mention = member.user.mention_html()
    # The message is sent with parse_mode=HTML, so user-controlled text must
    # be escaped or Telegram rejects the message (or renders injected markup).
    safe_title = html.escape(chat.title or "")
    group_link = safe_title
    if chat.username:
        group_link = f"<a href='https://t.me/{chat.username}'>{safe_title}</a>"

    lang = "en"
    msg = get_text("spamscan_group_owner_dm_notification_header", lang) + "\n\n"
    msg += get_text("spamscan_group_owner_dm_notification_body", lang).format(group_link_html=group_link, chat_id=chat_id) + "\n"
    msg += get_text("spamscan_group_owner_dm_user_info", lang).format(user_mention=user_mention, user_id=user_id) + "\n"
    msg += get_text("spamscan_group_owner_dm_reason", lang).format(reason_text=reason) + "\n\n"
    if text_content:
        # Slice first, then escape, so an entity such as "&amp;" is never cut in half.
        msg += "<b>Message:</b>\n" + f"<blockquote>{html.escape(text_content[:200])}</blockquote>" + "\n\n"
    msg += get_text("spamscan_group_owner_dm_action_info", lang)

    keyboard = InlineKeyboardMarkup([
        [InlineKeyboardButton(get_text("button_mute_user_very_short", lang), callback_data=f"mute_user_perm_{user_id}_{chat_id}")],
        [InlineKeyboardButton(get_text("button_cancel_report_short", lang), callback_data=f"cancel_report_{user_id}")],
    ])
    try:
        await context.bot.send_message(report_target, msg, reply_markup=keyboard, parse_mode=ParseMode.HTML)
    except Exception as e:
        logger.error(f"Failed to send report: {e}")
| # --- SCORE & ACTIVITY UPDATE (MODIFIED: NO FLOATING SCORE) --- | |
async def calculate_trust_score(context, user_id):
    """Return the stored ``trust_score`` of ``user_id``.

    Falls back to 100 when there is no ``db_pool`` in ``bot_data``, when the
    user has no stats row, when the row has no ``trust_score`` key, or when
    the lookup raises (the error is logged).
    """
    fallback = 100
    pool = context.bot_data.get('db_pool')
    if not pool:
        return fallback
    try:
        stats = await db.get_user_stats(pool, user_id)
        score = stats.get('trust_score', fallback) if stats else fallback
    except Exception as e:
        logger.error(f"Error calculating trust score: {e}")
        score = fallback
    return score
async def _register_user_if_unknown(context, pool, redis, chat_id, user_id):
    """Insert the user into the database unless Redis already marks them as known."""
    is_known = False
    if redis:
        try:
            is_known = await redis.get(f"user_known:{user_id}")
        except Exception as exc:
            logger.debug("user_known lookup failed for %s: %s", user_id, exc)
    if is_known:
        return

    try:
        chat_member = await context.bot.get_chat_member(chat_id, user_id)
        fname = chat_member.user.first_name
        uname = chat_member.user.username
    except Exception:
        # Profile lookup is optional; store a placeholder name instead.
        fname = "Unknown"
        uname = None

    try:
        await db._add_user_to_db_core(pool, user_id, fname, uname)
        if redis:
            await redis.set(f"user_known:{user_id}", "1", ex=259200)  # 3 days
    except Exception as exc:
        logger.warning("Could not register user %s: %s", user_id, exc)


async def _punish_warn_limit(context, chat_id, user_id, action, limit):
    """Kick, ban or mute a non-admin user who reached the warning limit and announce it."""
    user_member = await context.bot.get_chat_member(chat_id, user_id)
    if user_member.status in ('administrator', 'creator'):
        return
    if action == 'kick':
        # Ban followed by unban removes the user but lets them rejoin.
        await context.bot.ban_chat_member(chat_id, user_id)
        await context.bot.unban_chat_member(chat_id, user_id)
        action_text = "kicked 👢"
    elif action == 'ban':
        await context.bot.ban_chat_member(chat_id, user_id)
        action_text = "banned ⛔"
    else:
        permissions = ChatPermissions(can_send_messages=False)
        await context.bot.restrict_chat_member(chat_id, user_id, permissions)
        action_text = "muted (Permanent) 🔇"
    await context.bot.send_message(
        chat_id,
        f"⚠️ User reached warning limit ({limit}/{limit}). Action: {action_text}",
        parse_mode=ParseMode.HTML
    )


async def update_user_activity_score(context, chat_id, user_id, event, reason, content):
    """Record a moderation event for a user and enforce the auto-warn limit.

    Steps:
      1. Make sure the user exists in the database (skipped when Redis
         already holds ``user_known:<user_id>``).
      2. For ``'deletion'`` and ``'warning'`` events, increment the user's
         warning count in the database.
      3. When Redis is available and the chat's ``warn_limit`` setting is
         positive, count the event in ``autowarn:<chat_id>:<user_id>``
         (expiring after ``warn_time_window`` minutes) and, once the limit
         is reached, apply ``warn_action`` (kick / ban / mute by default).

    Args:
        context: Handler context (bot, ``bot_data['db_pool']``).
        chat_id: Chat where the event happened.
        user_id: Affected user.
        event: Event name; only ``'deletion'`` and ``'warning'`` count.
        reason: Unused here; kept for interface compatibility.
        content: Unused here; kept for interface compatibility.

    Does nothing when no database pool is configured.
    """
    pool = context.bot_data.get('db_pool')
    if not pool:
        return
    redis = get_redis_client_for_chat(context, chat_id)

    await _register_user_if_unknown(context, pool, redis, chat_id, user_id)

    if event not in ('deletion', 'warning'):
        return
    await db.increment_user_warning_count(pool, user_id)
    if not redis:
        return

    settings = await db.get_all_settings(context, chat_id)
    # ``or`` also covers settings stored as None, which would otherwise break
    # the numeric comparison / multiplication below.
    limit = settings.get('warn_limit') or 0
    if limit <= 0:
        return
    window_min = settings.get('warn_time_window') or 15
    action = settings.get('warn_action', 'mute')

    warn_key = f"autowarn:{chat_id}:{user_id}"
    current_warns = await redis.incr(warn_key)
    if current_warns == 1:
        # First warning in this window: start the expiry clock.
        await redis.expire(warn_key, window_min * 60)
    if current_warns < limit:
        return

    try:
        await _punish_warn_limit(context, chat_id, user_id, action, limit)
        # NOTE(review): the counter is reset whenever the limit was handled
        # without error, including for admins (who are never punished).
        # Confirm this matches the intended behaviour.
        await redis.delete(warn_key)
    except Exception as e:
        logger.error(f"Auto-punish failed: {e}")
| # --- WRAPPERS --- | |
async def check_admin(update, context, func):
    """Run ``func(update, context)`` only if the sender may manage the group.

    Rules, in order:
      * Updates without a user or chat are ignored.
      * In private chats the "group_only" text is sent and nothing runs.
      * The group creator and the bot owner (``bot_data['BOT_OWNER_ID']``)
        are always allowed.
      * Other users must be group admins, otherwise the call is ignored
        silently; admins are allowed only when the chat's
        ``allow_admins_manage`` setting is on, otherwise they receive the
        "owner_only_command_text" message.
    """
    user = update.effective_user
    chat = update.effective_chat
    # effective_message also covers updates where ``update.message`` is None
    # (e.g. edited messages), so the replies below cannot hit ``None``.
    message = update.effective_message
    if user is None or chat is None:
        return

    if chat.type == 'private':
        lang = await db.get_user_lang_from_db(context, user.id)
        if message:
            await message.reply_text(get_text("group_only", lang), parse_mode=ParseMode.HTML)
        return

    settings = await db.get_all_settings(context, chat.id)
    allow_admins = settings.get('allow_admins_manage', False)
    owner_id = await get_group_owner_id(context, chat.id)
    bot_owner_id = context.bot_data.get('BOT_OWNER_ID')

    is_real_owner = user.id in (owner_id, bot_owner_id)
    if not is_real_owner:
        if not await is_group_admin(context, chat.id, user.id):
            return
        if not allow_admins:
            lang = await db.get_user_lang_from_db(context, user.id)
            if message:
                await message.reply_text(get_text("owner_only_command_text", lang), parse_mode=ParseMode.HTML)
            return

    await func(update, context)
def bot_owner_only(func):
    """Decorator: run the async handler only for the configured bot owner.

    The wrapped handler is awaited when ``update.effective_user.id`` equals
    ``context.bot_data['BOT_OWNER_ID']``; otherwise the wrapper returns
    ``None`` without calling it. Updates without a user (``effective_user``
    is ``None``) are ignored as well.
    """
    from functools import wraps

    @wraps(func)  # keep the handler's __name__/__doc__ for logs and introspection
    async def wrapper(update, context, *args, **kwargs):
        user = update.effective_user
        if user is None or user.id != context.bot_data.get('BOT_OWNER_ID'):
            return
        return await func(update, context, *args, **kwargs)

    return wrapper
| # --- KEYBOARDS --- | |
def get_initial_keyboard(lang="en"):
    """Build the start keyboard: a single button that opens the admin panel."""
    panel_button = InlineKeyboardButton(
        get_text("admin_panel_button", lang),
        callback_data='show_admin_panel',
    )
    return InlineKeyboardMarkup([[panel_button]])
def get_admin_commands_keyboard(lang="en", category="main"):
    """Build the admin-panel keyboard for one category.

    ``"main"`` yields the four category buttons plus a back button. Any other
    category yields its command buttons two per row, followed by a
    back-to-menu button; an unknown category yields only that back button.
    """
    def make_button(text_key, data):
        return InlineKeyboardButton(get_text(text_key, lang), callback_data=data)

    if category == "main":
        rows = [
            [make_button("btn_security", "panel_security"),
             make_button("btn_management", "panel_management")],
            [make_button("btn_advanced", "panel_advanced"),
             make_button("btn_tools", "panel_tools")],
            [make_button("back_button", "back_to_start")],
        ]
        return InlineKeyboardMarkup(rows)

    commands_by_category = {
        "security": ["autowarn", "blockforwards", "antiduplicate", "spamscan", "ocrscan", "linktomute", "blockbiolink", "traffic"],
        "management": ["addword", "delword", "wordlist", "maxlength", "autodelete", "manageadmins", "allowusernames", "resetsettings"],
        "advanced": ["filter", "smartnotes", "welcomecard", "setwelcome", "sentiment", "ticket", "trustscore"],
        "tools": ["shadow", "whois", "clean_ghosts", "grant", "whisper"],
    }
    cmds = commands_by_category.get(category, [])

    rows = []
    for start in range(0, len(cmds), 2):
        pair = cmds[start:start + 2]
        rows.append([make_button(f"{cmd}_button_label", f'show_cmd_example_{cmd}') for cmd in pair])
    rows.append([make_button("btn_back_menu", 'show_admin_panel')])
    return InlineKeyboardMarkup(rows)
| # --- TRAFFIC CONTROL --- | |
async def check_traffic_control(context, chat_id):
    """Count one message for ``chat_id`` and lock the chat briefly on a burst.

    Messages are counted per 5-second bucket in Redis
    (``traffic_count:<chat_id>:<bucket>``). When a bucket reaches 5, the chat
    is set to ``can_send_messages=False`` for 15 seconds and the previous
    permissions are then restored. ``is_locked:<chat_id>`` prevents
    re-entrance while a lock is in progress and is removed at the end.

    Does nothing when no Redis client is available or a lock is active.
    """
    redis = get_redis_client_for_chat(context, chat_id)
    if not redis:
        return
    lock_flag = f"is_locked:{chat_id}"
    if await redis.get(lock_flag):
        return

    window_key = f"traffic_count:{chat_id}:{int(time.time() // 5)}"
    count = await redis.incr(window_key)
    if count == 1:
        await redis.expire(window_key, 10)
    if count < 5:
        return

    logger.info(f"🚨 Traffic Alert triggered for {chat_id}")
    await redis.set(lock_flag, "1", ex=40)
    backup_key = f"backup_perms:{chat_id}"
    try:
        chat_obj = await context.bot.get_chat(chat_id)
        perms_dict = chat_obj.permissions.to_dict()
        # The snapshot is what gets restored, so sending must be allowed in it.
        perms_dict['can_send_messages'] = True
        await redis.set(backup_key, json.dumps(perms_dict).decode('utf-8'), ex=120)

        await context.bot.send_message(
            chat_id,
            "🚦 <b>Traffic Jam!</b>\nGroup locked for 15s to prevent spam.",
            parse_mode=ParseMode.HTML
        )
        await context.bot.set_chat_permissions(chat_id, ChatPermissions(can_send_messages=False))
        await asyncio.sleep(15)

        backup_data = await redis.get(backup_key)
        # If the Redis copy is gone (expired, evicted, flushed) fall back to
        # the in-memory snapshot; otherwise the chat would stay locked forever.
        restored = json.loads(backup_data) if backup_data else perms_dict
        restored.pop('can_send_media_messages', None)
        await context.bot.set_chat_permissions(chat_id, ChatPermissions(**restored))
        await context.bot.send_message(chat_id, "✅ <b>Traffic Normal.</b> Settings restored.", parse_mode=ParseMode.HTML)
    except Exception as e:
        logger.error(f"Traffic Control Error: {e}")
        # Last resort: make sure members can at least write again.
        try:
            await context.bot.set_chat_permissions(chat_id, ChatPermissions(can_send_messages=True))
        except Exception as unlock_error:
            logger.error("Emergency unlock failed for chat %s: %s", chat_id, unlock_error)
    finally:
        await redis.delete(lock_flag)
| # --- REGEX CACHE --- | |
@lru_cache(maxsize=256)
def _compile_forbidden_regex(words_tuple):
    """Compile (and memoise) one case-insensitive regex matching any forbidden word.

    Args:
        words_tuple: Hashable tuple of words/phrases. Empty and
            whitespace-only entries are ignored, because an empty alternative
            would match next to almost any punctuation.

    Returns:
        A compiled pattern whose group 1 is the matched word, delimited by
        start/end of text or a non-word character; ``None`` when no usable
        word remains or the pattern cannot be compiled.
    """
    # dict.fromkeys de-duplicates while keeping a deterministic order.
    usable = list(dict.fromkeys(word for word in words_tuple if word and word.strip()))
    if not usable:
        return None
    # Longest first so that a phrase wins over a shorter word it starts with.
    alternatives = '|'.join(re.escape(word) for word in sorted(usable, key=len, reverse=True))
    pattern = r'(?:^|\W)(' + alternatives + r')(?:$|\W)'
    try:
        return re.compile(pattern, re.IGNORECASE | re.UNICODE)
    except re.error:
        return None


def get_forbidden_words_regex(words: list):
    """Return the forbidden-words regex for ``words``, or ``None`` if there are none.

    The list is converted to a tuple so the result can be served from the
    ``_compile_forbidden_regex`` cache.
    """
    if not words:
        return None
    return _compile_forbidden_regex(tuple(words))
async def get_settings_from_cache_and_update_regex(context, chat_id):
    """Load the chat's settings and attach the forbidden-words regex.

    The regex built from the ``forbidden_words`` entry (empty list when the
    key is absent) is stored under ``'fw_regex'`` in the returned dict.
    """
    chat_settings = await db.get_all_settings(context, chat_id)
    forbidden = chat_settings.get('forbidden_words', [])
    chat_settings['fw_regex'] = get_forbidden_words_regex(forbidden)
    return chat_settings
| # --- DELETION REPORT --- | |
async def send_deletion_report(context, user, chat, reason_key, content):
    """Post a "message deleted" report to the configured report channel.

    Args:
        context: Handler context; ``bot_data['REPORT_CHANNEL_ID']`` selects
            the target. Without it nothing is sent.
        user: Author of the deleted message (``id``, ``first_name`` are used).
        chat: Chat the message was deleted from.
        reason_key: Text key resolved with ``get_text`` in the group owner's
            language (the author's language when no owner is known).
        content: Deleted text; the first 500 characters are quoted. ``None``
            is treated as empty.

    Any failure is logged; the function never raises.
    """
    report_channel_id = context.bot_data.get('REPORT_CHANNEL_ID')
    if not report_channel_id:
        return
    try:
        owner_id = await get_group_owner_id(context, chat.id)
        owner_lang = await db.get_user_lang_from_db(context, owner_id or user.id)
        reason_text = get_text(reason_key, lang=owner_lang)
        user_mention = f"<a href='tg://user?id={user.id}'>{html.escape(user.first_name or '')}</a>"

        title_html = html.escape(chat.title or "")
        group_link = title_html  # plain title unless a link can be obtained
        try:
            if chat.username:
                invite_link = f"https://t.me/{chat.username}"
            else:
                # Exporting a link replaces the bot's previous primary link,
                # so reuse the existing one and only export when none exists.
                full_chat = await context.bot.get_chat(chat.id)
                invite_link = full_chat.invite_link or await chat.export_invite_link()
            group_link = f'<a href="{html.escape(invite_link)}">{title_html}</a>'
        except Exception as exc:
            logger.debug("No invite link available for chat %s: %s", chat.id, exc)

        report_message = (
            f"🗑️ <b>Message Deleted</b>\n\n"
            f"🏢 <b>Group:</b> {group_link} (<code>{chat.id}</code>)\n"
            f"👤 <b>User:</b> {user_mention} (<code>{user.id}</code>)\n"
            f"📝 <b>Reason:</b> {html.escape(reason_text)}\n"
            f"📜 <b>Content:</b> <blockquote>{html.escape((content or '')[:500])}</blockquote>"
        )
        await context.bot.send_message(chat_id=report_channel_id, text=report_message, parse_mode=ParseMode.HTML, disable_web_page_preview=True)
    except Exception as e:
        logger.error(f"Failed to send deletion report: {e}")
| # --- MISC UTILS --- | |
async def is_channel_or_group_username(context, username) -> bool:
    """Return whether ``@username`` resolves to a channel, supergroup or group.

    Results are cached in Redis under ``uname_type:<lowercased name>`` for
    24 hours as ``"channel"`` or ``"user"``. A negative result is cached only
    when Telegram rejects the lookup with ``BadRequest``; other errors
    (network, rate limit, ...) return ``False`` without poisoning the cache.
    Cache failures, or a missing Redis client, never affect the result.
    """
    username_clean = username.replace('@', '').strip()
    if not username_clean:
        return False

    redis_client = get_redis_client_for_user(context)
    cache_key = f"uname_type:{username_clean.lower()}"

    async def remember(kind):
        """Best-effort cache write."""
        if not redis_client:
            return
        try:
            await redis_client.set(cache_key, kind, ex=86400)
        except Exception as exc:
            logger.debug("Username cache write failed for %s: %s", cache_key, exc)

    if redis_client:
        try:
            cached_type = await redis_client.get(cache_key)
        except Exception as exc:
            logger.debug("Username cache read failed for %s: %s", cache_key, exc)
            cached_type = None
        # Accept both str and bytes replies from the client.
        if isinstance(cached_type, bytes):
            cached_type = cached_type.decode("utf-8", "replace")
        if cached_type:
            return cached_type == "channel"

    try:
        chat_obj = await context.bot.get_chat(f"@{username_clean}")
    except BadRequest:
        # Presumably "chat not found", i.e. the name is not a public chat the
        # bot can resolve - treat it as a user and remember that.
        await remember("user")
        return False
    except Exception as exc:
        logger.warning("Could not resolve @%s: %s", username_clean, exc)
        return False

    is_channel = chat_obj.type in ('channel', 'supergroup', 'group')
    await remember("channel" if is_channel else "user")
    return is_channel
def analyze_sentiment(text: str) -> str:
    """Classify ``text`` by keyword lookup.

    Returns ``'positive'`` if any positive keyword occurs as a substring of
    the lower-cased text, otherwise ``'negative'`` if any negative keyword
    does, otherwise ``None``. Falsy input returns ``None``. Positive keywords
    take precedence when both kinds are present.
    """
    if not text:
        return None

    lowered = text.lower()
    keyword_groups = (
        ('positive', ['good', 'great', 'love', 'best', 'thanks', 'thank you', 'amazing', 'cool', 'ধন্যবাদ', 'ভালো', 'সেরা', 'সুন্দর', 'ওসাম', 'লভব', 'জোশ', 'धन्यवाद', 'अच्छा', 'बढ़िया', 'प्रेम', 'सुंदर']),
        ('negative', ['bad', 'worst', 'scam', 'fake', 'hate', 'ugly', 'stupid', 'useless', 'বাজে', 'খারাপ', 'ভুয়া', 'স্ক্যাম', 'ফালতু', 'ঘৃণা', 'बकवास', 'खराब', 'झूठ', 'घटिया']),
    )
    # Groups are checked in order, so a positive hit wins over a negative one.
    for label, keywords in keyword_groups:
        for keyword in keywords:
            if keyword in lowered:
                return label
    return None
| # --- TOGGLE & NUMERIC WRAPPERS --- | |
async def toggle_setting_wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, key: str, db_col: str, owner_only: bool = False):
    """Flip (or explicitly set) a boolean chat setting, guarded by ``check_admin``.

    ``key`` selects the ``<key>_button_label`` text used in the confirmation;
    ``db_col`` is the settings column to change. A first command argument of
    ``on``/``off`` (case-insensitive) sets the value explicitly; anything
    else toggles the stored value. With ``owner_only`` the group owner or the
    bot owner is required, and other users get the owner-only message.
    """
    async def apply_toggle(u, c):
        chat_id = u.effective_chat.id
        sender_id = u.effective_user.id
        lang = await db.get_user_lang_from_db(c, sender_id)

        # Extra protection for owner-only settings.
        if owner_only:
            owner_id = await get_group_owner_id(c, chat_id)
            bot_owner = c.bot_data.get('BOT_OWNER_ID')
            if sender_id != owner_id and sender_id != bot_owner:
                await u.message.reply_html(get_text("owner_only_command_text", lang))
                return

        settings = await db.get_all_settings(c, chat_id)
        requested = c.args[0].lower() if c.args else None
        if requested == 'on':
            new_status = True
        elif requested == 'off':
            new_status = False
        else:
            new_status = not settings.get(db_col, False)

        await db.update_setting_in_db(c, chat_id, db_col, new_status)
        await get_settings_from_cache_and_update_regex(c, chat_id)

        # Strip the decorative emoji prefixes from the button label.
        feature_name = get_text(f"{key}_button_label", lang)
        for prefix in ("🚫 ", "✅ ", "🛠️ ", "🚦 "):
            feature_name = feature_name.replace(prefix, "")

        status_text = "feature_enabled" if new_status else "feature_disabled"
        await u.message.reply_html(get_text(status_text, lang).format(feature=feature_name))

    await check_admin(update, context, apply_toggle)
async def numeric_setting_wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, key: str, db_col: str):
    """Store a non-negative integer chat setting, guarded by ``check_admin``.

    Without arguments the ``<key>_example_full_text`` help is shown. The
    first argument must parse as an integer >= 0, otherwise a warning is
    sent and nothing is stored. ``0`` stores the value and replies with
    ``<key>_disabled``; any other value replies with ``<key>_set`` (formatted
    with ``length`` for ``maxlength`` and ``seconds`` for ``autodelete``).
    """
    async def logic(u, c):
        lang = await db.get_user_lang_from_db(c, u.effective_user.id)
        if not c.args:
            return await u.message.reply_html(get_text(f"{key}_example_full_text", lang))

        # Only parsing errors are expected here; a bare ``except`` would also
        # hide unrelated failures.
        try:
            val = int(c.args[0])
        except (TypeError, ValueError):
            val = -1
        if val < 0:
            return await u.message.reply_text("⚠️ Please enter a valid positive number.")

        await db.update_setting_in_db(c, u.effective_chat.id, db_col, val)
        if val == 0:
            await u.message.reply_html(get_text(f"{key}_disabled", lang))
        else:
            txt = get_text(f"{key}_set", lang)
            if key == 'maxlength':
                txt = txt.format(length=val)
            elif key == 'autodelete':
                txt = txt.format(seconds=val)
            await u.message.reply_html(txt)

    await check_admin(update, context, logic)