| |
|
|
| import asyncio |
| import logging |
| import io |
| import time |
| import re |
| import html |
| import httpx |
| import orjson as json |
| import random |
| from datetime import datetime, timedelta |
| from functools import lru_cache |
|
|
| from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ChatPermissions, User, Chat |
| from telegram.ext import ContextTypes |
| from telegram.constants import ParseMode |
| from telegram.error import Forbidden, BadRequest |
|
|
| import config |
| import db |
| from credentials import BOT_TOKEN |
| from .redis_manager import get_redis_client_for_chat, get_redis_client_for_user |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| try: |
| from PIL import Image, ImageDraw, ImageFont |
| PIL_AVAILABLE = True |
| except ImportError: |
| PIL_AVAILABLE = False |
| logger.warning("Pillow library not found. Welcome cards will be disabled.") |
|
|
| |
def _generate_welcome_card_sync(user_name, profile_pic_bytes=None):
    """Render a 600x300 JPEG welcome card and return it as bytes.

    Args:
        user_name: Display name of the new member. ``None`` or an empty
            value is tolerated; the greeting is then rendered without a name
            and the avatar placeholder shows "?".
        profile_pic_bytes: Optional raw image bytes of the member's profile
            picture. When absent, a coloured circle with the name's initial
            is drawn instead.

    Returns:
        The encoded JPEG as ``bytes``, or ``None`` when Pillow is unavailable
        or rendering fails for any reason.
    """
    if not PIL_AVAILABLE:
        return None
    try:
        # Normalise once so a missing name cannot break the slicing below.
        display_name = str(user_name) if user_name else ""

        # Dark canvas with two decorative circles bleeding off the corners.
        img = Image.new('RGB', (600, 300), color=(20, 20, 30))
        draw = ImageDraw.Draw(img)
        draw.ellipse((-50, -50, 150, 150), fill=(40, 40, 60))
        draw.ellipse((450, 150, 650, 350), fill=(40, 40, 60))

        try:
            font_lg = ImageFont.truetype("font.ttf", 40)
            font_sm = ImageFont.truetype("font.ttf", 30)
            font_big_initial = ImageFont.truetype("font.ttf", 70)
        except OSError:
            # font.ttf could not be opened: fall back to Pillow's built-in font.
            font_lg = ImageFont.load_default()
            font_sm = ImageFont.load_default()
            font_big_initial = ImageFont.load_default()

        if profile_pic_bytes:
            # Paste the profile picture through a circular mask.
            p_img = Image.open(io.BytesIO(profile_pic_bytes)).convert("RGBA")
            p_img = p_img.resize((120, 120))

            mask = Image.new("L", (120, 120), 0)
            draw_mask = ImageDraw.Draw(mask)
            draw_mask.ellipse((0, 0, 120, 120), fill=255)

            img.paste(p_img, (240, 50), mask)
        else:
            # No picture: draw a randomly coloured disc with the initial.
            colors = [
                (231, 76, 60),
                (52, 152, 219),
                (46, 204, 113),
                (155, 89, 182),
                (241, 196, 15),
                (230, 126, 34),
            ]
            bg_color = random.choice(colors)
            draw.ellipse((240, 50, 360, 170), fill=bg_color)

            initial = display_name[0].upper() if display_name else "?"
            draw.text((300, 110), initial, fill=(255, 255, 255), anchor="mm", font=font_big_initial)

        # Name is truncated to 15 characters before being drawn.
        draw.text((300, 190), f"Welcome, {display_name[:15]}", fill=(255, 255, 255), anchor="mm", font=font_lg)
        draw.text((300, 230), "To Our Community", fill=(180, 180, 180), anchor="mm", font=font_sm)

        bio = io.BytesIO()
        img.save(bio, 'JPEG')
        return bio.getvalue()

    except Exception as e:
        # Card generation is best-effort; keep the traceback for diagnosis.
        logger.exception(f"Image Gen Error: {e}")
        return None
|
|
| |
async def generate_welcome_card(user_name, profile_pic_bytes=None):
    """Produce the welcome card without blocking the event loop.

    The synchronous renderer is handed to the running loop's default
    executor; its result (JPEG bytes or ``None``) is returned unchanged.
    """
    running_loop = asyncio.get_running_loop()
    pending_card = running_loop.run_in_executor(
        None,
        _generate_welcome_card_sync,
        user_name,
        profile_pic_bytes,
    )
    return await pending_card
|
|
| |
|
|
| from telegram.constants import MessageEntityType |
|
|
def get_text(key, lang="en"):
    """Look up a UI string for ``lang``, falling back to the English table.

    When the key is missing from the chosen table, the placeholder
    ``_<key>_`` is returned instead.
    """
    all_languages = config.LANGUAGES
    english_table = all_languages["en"]
    table = all_languages.get(lang, english_table)
    placeholder = f"_{key}_"
    return table.get(key, placeholder)
|
|
def has_link_in_text(obj) -> bool:
    """Report whether a string or message-like object carries a link.

    For a plain string only the configured link pattern is consulted. For any
    other object the text/caption, its entities (URL, text link, mention),
    an attached inline keyboard and link-preview options are checked in that
    order.
    """
    if not obj:
        return False

    link_pattern = config.LINK_PATTERN_COMPILED

    # Plain strings: regex check only.
    if isinstance(obj, str):
        return link_pattern.search(obj) is not None

    # Message body or caption.
    body = (obj.text or obj.caption or "").strip()
    if link_pattern.search(body):
        return True

    # Entities that Telegram itself classified as link-like.
    link_like_types = (
        MessageEntityType.URL,
        MessageEntityType.TEXT_LINK,
        MessageEntityType.MENTION,
    )
    message_entities = obj.entities or obj.caption_entities or []
    if any(ent.type in link_like_types for ent in message_entities):
        return True

    # Any inline keyboard counts as a link carrier.
    markup = getattr(obj, 'reply_markup', None)
    if markup and markup.inline_keyboard:
        return True

    # Explicit link preview pointing at a URL.
    preview = getattr(obj, 'link_preview_options', None)
    if preview and preview.url:
        return True

    return False
async def delete_message_after_delay(context, chat_id, message_id, delay):
    """Delete a message after ``delay`` seconds, ignoring deletion failures.

    Args:
        context: Handler context exposing ``bot.delete_message``.
        chat_id: Chat the message lives in.
        message_id: Identifier of the message to delete.
        delay: Seconds to wait. A value of zero or less disables the deletion
            entirely and the function returns immediately.

    Deletion is best-effort: any ordinary error from the API is ignored.
    Task cancellation is NOT swallowed, so the task can be cancelled cleanly.
    """
    if delay <= 0:
        return
    await asyncio.sleep(delay)
    try:
        await context.bot.delete_message(chat_id=chat_id, message_id=message_id)
    except Exception:
        # Best-effort cleanup: the delete call failed; nothing further to do.
        # `Exception` (not a bare except) lets asyncio.CancelledError through.
        pass
|
|
| |
async def is_group_admin(context, chat_id, user_id):
    """Return True when ``user_id`` is an administrator or creator of the chat.

    The answer is cached in Redis (when a client is available) for 30 minutes
    under ``is_admin:<chat_id>:<user_id>`` as "1" or "0".

    Cache failures never change the answer: a failed read falls through to
    the Bot API, and a failed write is logged while the freshly fetched
    status is still returned. Any failure of the API lookup yields False.
    """
    redis = get_redis_client_for_chat(context, chat_id)
    cache_key = f"is_admin:{chat_id}:{user_id}"

    if redis:
        try:
            cached = await redis.get(cache_key)
        except Exception as e:
            logger.warning("Admin cache read failed for %s: %s", cache_key, e)
            cached = None
        # Accept both str and bytes replies; the client's decoding mode is
        # not visible from this module.
        if cached in ("1", b"1"):
            return True
        if cached in ("0", b"0"):
            return False

    try:
        member = await context.bot.get_chat_member(chat_id, user_id)
    except Exception as e:
        # These two messages are expected in some chats and not worth logging.
        if "Chat_admin_required" in str(e) or "there is no administrators in the private chat" in str(e):
            return False
        logger.error(f"Admin Check Error: {e}")
        return False

    is_admin = member.status in ('administrator', 'creator')

    if redis:
        try:
            await redis.set(cache_key, "1" if is_admin else "0", ex=1800)
        except Exception as e:
            # A cache outage must not turn a real admin into a non-admin.
            logger.warning("Admin cache write failed for %s: %s", cache_key, e)

    return is_admin
|
|
| |
async def get_group_owner_id(context, chat_id):
    """Return the user id of the chat's creator, or ``None`` if unknown.

    The id is cached in Redis (when available) for 24 hours under
    ``group_owner:<chat_id>``. Cache errors are ignored; a failure of the
    administrator lookup is logged and results in ``None``.
    """
    redis = get_redis_client_for_chat(context, chat_id)
    cache_key = f"group_owner:{chat_id}"

    if redis:
        try:
            cached = await redis.get(cache_key)
            if cached:
                return int(cached)
        except Exception:
            # Unreadable or malformed cache entry: fall back to the API.
            pass

    try:
        admins = await context.bot.get_chat_administrators(chat_id)
    except Exception as e:
        # `Exception` rather than a bare except so cancellation propagates.
        logger.warning("Could not fetch administrators for %s: %s", chat_id, e)
        return None

    for admin in admins:
        if admin.status != 'creator':
            continue
        owner_id = admin.user.id
        if redis:
            try:
                await redis.set(cache_key, owner_id, ex=86400)
            except Exception:
                # Caching is optional; the owner id is still returned.
                pass
        return owner_id
    return None
|
|
| |
|
|
async def send_spam_report(context, chat_id, user_id, reason, text_content):
    """Send a spam notification with action buttons to the report target.

    Nothing is sent unless the chat has ``enable_spamscan`` turned on. The
    target is ``bot_data['REPORT_CHANNEL_ID']`` when set, otherwise the group
    owner. The message is sent with HTML parse mode, so the chat title and
    the quoted message text are HTML-escaped before interpolation.

    Args:
        context: Handler context (bot, bot_data).
        chat_id: Chat in which the spam was detected.
        user_id: Offending user.
        reason: Text inserted into the "reason" template.
        text_content: Offending message text; only the first 200 characters
            are quoted. May be empty/None.
    """
    settings = await db.get_all_settings(context, chat_id)
    if not settings.get('enable_spamscan', False):
        return

    report_target = context.bot_data.get('REPORT_CHANNEL_ID')
    if not report_target:
        report_target = await get_group_owner_id(context, chat_id)
    if not report_target:
        return

    try:
        chat = await context.bot.get_chat(chat_id)
        user = await context.bot.get_chat_member(chat_id, user_id)
        user_mention = user.user.mention_html()
        # Titles are user-controlled; escape them for HTML parse mode.
        safe_title = html.escape(chat.title or "")
        group_link = safe_title
        if chat.username:
            group_link = f"<a href='https://t.me/{chat.username}'>{safe_title}</a>"
    except Exception as e:
        logger.warning("Spam report aborted, chat/user lookup failed: %s", e)
        return

    lang = "en"
    msg = get_text("spamscan_group_owner_dm_notification_header", lang) + "\n\n"
    msg += get_text("spamscan_group_owner_dm_notification_body", lang).format(group_link_html=group_link, chat_id=chat_id) + "\n"
    msg += get_text("spamscan_group_owner_dm_user_info", lang).format(user_mention=user_mention, user_id=user_id) + "\n"
    # NOTE(review): `reason` is inserted as-is; confirm callers never pass
    # untrusted text here, otherwise it needs escaping as well.
    msg += get_text("spamscan_group_owner_dm_reason", lang).format(reason_text=reason) + "\n\n"
    if text_content:
        # Truncate first, then escape, so an entity is never cut in half.
        quoted = html.escape(text_content[:200])
        msg += "<b>Message:</b>\n" + f"<blockquote>{quoted}</blockquote>" + "\n\n"
    msg += get_text("spamscan_group_owner_dm_action_info", lang)

    keyboard = InlineKeyboardMarkup([
        [InlineKeyboardButton(get_text("button_mute_user_very_short", lang), callback_data=f"mute_user_perm_{user_id}_{chat_id}")],
        [InlineKeyboardButton(get_text("button_cancel_report_short", lang), callback_data=f"cancel_report_{user_id}")],
    ])
    try:
        await context.bot.send_message(report_target, msg, reply_markup=keyboard, parse_mode=ParseMode.HTML)
    except Exception as e:
        logger.error(f"Failed to send report: {e}")
|
|
| |
|
|
async def calculate_trust_score(context, user_id):
    """Fetch the stored trust score for a user.

    Returns 100 when no database pool is configured, when the user has no
    stats row, when the row lacks ``trust_score``, or when the lookup fails.
    """
    db_pool = context.bot_data.get('db_pool')
    if not db_pool:
        return 100

    try:
        stats = await db.get_user_stats(db_pool, user_id)
        if not stats:
            return 100
        return stats.get('trust_score', 100)
    except Exception as e:
        logger.error(f"Error calculating trust score: {e}")
        return 100
|
|
async def update_user_activity_score(context, chat_id, user_id, event, reason, content):
    """Record a moderation event for a user and apply auto-warn punishment.

    Steps:
      1. Make sure the user exists in the database (tracked via a 3-day
         ``user_known:<user_id>`` Redis flag). Failures here are logged and
         do not stop processing.
      2. For ``'deletion'`` / ``'warning'`` events, bump the user's warning
         count in the database.
      3. When Redis is available and the chat's ``warn_limit`` is positive,
         count warnings in a ``warn_time_window``-minute window and, once
         the limit is reached, kick / ban / mute the user (admins are
         exempt), announce it, and reset the counter.

    Args:
        context: Handler context (bot, bot_data).
        chat_id: Chat the event happened in.
        user_id: User the event is about.
        event: Event name; only ``'deletion'`` and ``'warning'`` have effect.
        reason: Unused by this function.
        content: Unused by this function.
    """
    pool = context.bot_data.get('db_pool')
    if not pool:
        return
    redis = get_redis_client_for_chat(context, chat_id)

    is_known = False
    if redis:
        is_known = await redis.get(f"user_known:{user_id}")
    if not is_known:
        try:
            try:
                chat_member = await context.bot.get_chat_member(chat_id, user_id)
                fname = chat_member.user.first_name
                uname = chat_member.user.username
            except Exception:
                # Profile lookup is optional; store a placeholder instead.
                fname = "Unknown"
                uname = None
            await db._add_user_to_db_core(pool, user_id, fname, uname)
            if redis:
                await redis.set(f"user_known:{user_id}", "1", ex=259200)
        except Exception as e:
            # Registration is best-effort, but leave a trace of the failure.
            logger.warning("Could not register user %s: %s", user_id, e)

    if event not in ('deletion', 'warning'):
        return
    await db.increment_user_warning_count(pool, user_id)

    # The windowed auto-warn counter lives in Redis only.
    if not redis:
        return
    settings = await db.get_all_settings(context, chat_id)
    limit = settings.get('warn_limit', 0) or 0  # treat a stored None as "disabled"
    if limit <= 0:
        return

    window_min = settings.get('warn_time_window', 15)
    action = settings.get('warn_action', 'mute')
    warn_key = f"autowarn:{chat_id}:{user_id}"
    current_warns = await redis.incr(warn_key)
    if current_warns == 1:
        # First warning in this window starts the expiry clock.
        await redis.expire(warn_key, window_min * 60)
    if current_warns < limit:
        return

    try:
        user_member = await context.bot.get_chat_member(chat_id, user_id)
        if user_member.status not in ('administrator', 'creator'):
            if action == 'kick':
                # Ban followed by unban removes the user but lets them rejoin.
                await context.bot.ban_chat_member(chat_id, user_id)
                await context.bot.unban_chat_member(chat_id, user_id)
                action_text = "kicked 👢"
            elif action == 'ban':
                await context.bot.ban_chat_member(chat_id, user_id)
                action_text = "banned ⛔"
            else:
                permissions = ChatPermissions(can_send_messages=False)
                await context.bot.restrict_chat_member(chat_id, user_id, permissions)
                action_text = "muted (Permanent) 🔇"
            await context.bot.send_message(
                chat_id,
                f"⚠️ User reached warning limit ({limit}/{limit}). Action: {action_text}",
                parse_mode=ParseMode.HTML
            )
            await redis.delete(warn_key)
    except Exception as e:
        logger.error(f"Auto-punish failed: {e}")
| |
|
|
| |
|
|
async def check_admin(update, context, func):
    """Run ``func(update, context)`` only if the caller may manage the group.

    Rules:
      * In a private chat the "group only" text is sent and nothing runs.
      * The group creator and the bot owner (``bot_data['BOT_OWNER_ID']``)
        are always allowed.
      * Other users must be group admins, otherwise the call is ignored
        silently; admins are allowed only when the chat setting
        ``allow_admins_manage`` is on, otherwise they get the "owner only"
        text.

    Updates without a user or chat (nothing to authorise) are ignored.
    """
    user = update.effective_user
    chat = update.effective_chat
    if user is None or chat is None:
        return
    message = update.effective_message

    if chat.type == 'private':
        lang = await db.get_user_lang_from_db(context, user.id)
        if message:
            await message.reply_text(get_text("group_only", lang), parse_mode=ParseMode.HTML)
        return

    settings = await db.get_all_settings(context, chat.id)
    allow_admins = settings.get('allow_admins_manage', False)
    owner_id = await get_group_owner_id(context, chat.id)
    bot_owner_id = context.bot_data.get('BOT_OWNER_ID')

    is_real_owner = (user.id == owner_id) or (user.id == bot_owner_id)

    if not is_real_owner:
        # is_group_admin handles its own caching.
        if not await is_group_admin(context, chat.id, user.id):
            return

        if not allow_admins:
            lang = await db.get_user_lang_from_db(context, user.id)
            if message:
                await message.reply_text(get_text("owner_only_command_text", lang), parse_mode=ParseMode.HTML)
            return

    await func(update, context)
|
|
def bot_owner_only(func):
    """Decorator: run the handler only for the configured bot owner.

    The caller's id is compared with ``context.bot_data['BOT_OWNER_ID']``.
    For anyone else, or for updates that carry no user, the wrapped handler
    is not called and ``None`` is returned.

    The wrapper keeps the handler's ``__name__``/``__doc__`` so logs and
    introspection still show the real handler.
    """
    # Local import: the module only imports `lru_cache` from functools.
    from functools import wraps

    @wraps(func)
    async def wrapper(update, context, *args, **kwargs):
        caller = update.effective_user
        if caller is None or caller.id != context.bot_data.get('BOT_OWNER_ID'):
            return None
        return await func(update, context, *args, **kwargs)

    return wrapper
|
|
| |
|
|
def get_initial_keyboard(lang="en"):
    """Build the start keyboard: a single button that opens the admin panel."""
    panel_button = InlineKeyboardButton(
        get_text("admin_panel_button", lang),
        callback_data='show_admin_panel',
    )
    return InlineKeyboardMarkup([[panel_button]])
|
|
def get_admin_commands_keyboard(lang="en", category="main"):
    """Build the admin-panel keyboard for the requested category.

    ``"main"`` yields the four category buttons plus a back-to-start button.
    Any other category yields its command buttons, two per row, followed by
    a back-to-menu button; an unknown category yields only that back button.
    """
    def _button(text_key, data):
        return InlineKeyboardButton(get_text(text_key, lang), callback_data=data)

    if category == "main":
        return InlineKeyboardMarkup([
            [_button("btn_security", "panel_security"), _button("btn_management", "panel_management")],
            [_button("btn_advanced", "panel_advanced"), _button("btn_tools", "panel_tools")],
            [_button("back_button", "back_to_start")],
        ])

    commands_by_category = {
        "security": ["autowarn", "blockforwards", "antiduplicate", "spamscan", "ocrscan", "linktomute", "blockbiolink", "traffic"],
        "management": ["addword", "delword", "wordlist", "maxlength", "autodelete", "manageadmins", "allowusernames", "resetsettings"],
        "advanced": ["filter", "smartnotes", "welcomecard", "setwelcome", "sentiment", "ticket", "trustscore"],
        "tools": ["shadow", "whois", "clean_ghosts", "grant", "whisper"],
    }
    command_names = commands_by_category.get(category, [])

    # Lay the commands out in rows of two.
    rows = []
    for start in range(0, len(command_names), 2):
        pair = command_names[start:start + 2]
        rows.append([_button(f"{name}_button_label", f'show_cmd_example_{name}') for name in pair])

    rows.append([_button("btn_back_menu", 'show_admin_panel')])
    return InlineKeyboardMarkup(rows)
|
|
| |
|
|
async def check_traffic_control(context, chat_id):
    """Temporarily lock the chat when messages arrive too quickly.

    Each call increments a Redis counter for the current 5-second bucket.
    When the counter reaches 5, the chat is locked: current permissions are
    backed up to Redis, sending is disabled, the coroutine sleeps for 15
    seconds, and the backed-up permissions are restored. A
    ``is_locked:<chat_id>`` flag prevents re-entry while a lock is active and
    is always removed at the end.

    Does nothing when no Redis client is available.
    """
    redis = get_redis_client_for_chat(context, chat_id)
    if not redis:
        return

    lock_flag = f"is_locked:{chat_id}"
    if await redis.get(lock_flag):
        return

    # One counter per 5-second bucket; the key outlives its bucket briefly.
    window_key = f"traffic_count:{chat_id}:{int(time.time() // 5)}"
    count = await redis.incr(window_key)
    if count == 1:
        await redis.expire(window_key, 10)
    if count < 5:
        return

    logger.info(f"🚨 Traffic Alert triggered for {chat_id}")
    await redis.set(lock_flag, "1", ex=40)
    try:
        chat_obj = await context.bot.get_chat(chat_id)
        p = chat_obj.permissions
        perms_dict = p.to_dict()
        # The restored state must always allow sending again.
        perms_dict['can_send_messages'] = True
        await redis.set(f"backup_perms:{chat_id}", json.dumps(perms_dict).decode('utf-8'), ex=120)
        await context.bot.send_message(
            chat_id,
            "🚦 <b>Traffic Jam!</b>\nGroup locked for 15s to prevent spam.",
            parse_mode=ParseMode.HTML
        )
        await context.bot.set_chat_permissions(chat_id, ChatPermissions(can_send_messages=False))
        await asyncio.sleep(15)
        backup_data = await redis.get(f"backup_perms:{chat_id}")
        if backup_data:
            s = json.loads(backup_data)
            # This key is dropped before rebuilding ChatPermissions.
            s.pop('can_send_media_messages', None)
            restore_obj = ChatPermissions(**s)
            await context.bot.set_chat_permissions(chat_id, restore_obj)
        await context.bot.send_message(chat_id, "✅ <b>Traffic Normal.</b> Settings restored.", parse_mode=ParseMode.HTML)
    except Exception as e:
        logger.error(f"Traffic Control Error: {e}")
        # Emergency fallback: at least re-enable sending.
        try:
            await context.bot.set_chat_permissions(chat_id, ChatPermissions(can_send_messages=True))
        except Exception as unlock_error:
            # The chat may still be locked; make that visible in the logs.
            logger.error("Traffic Control: emergency unlock failed for %s: %s", chat_id, unlock_error)
    finally:
        await redis.delete(lock_flag)
|
|
| |
|
|
| @lru_cache(maxsize=128) |
| def _compile_forbidden_regex(words_tuple): |
| if not words_tuple: return None |
| pattern = r'(?:^|\W)(' + '|'.join(re.escape(word) for word in sorted(words_tuple, key=len, reverse=True)) + r')(?:$|\W)' |
| try: return re.compile(pattern, re.IGNORECASE | re.UNICODE) |
| except re.error: return None |
|
|
def get_forbidden_words_regex(words: list):
    """Return the cached compiled matcher for ``words``, or None if empty.

    The list is converted to a tuple so it can serve as a cache key.
    """
    return _compile_forbidden_regex(tuple(words)) if words else None
|
|
async def get_settings_from_cache_and_update_regex(context, chat_id):
    """Load the chat settings and attach the forbidden-words matcher.

    The compiled pattern (or None) is stored on the returned dict under
    ``'fw_regex'``.
    """
    chat_settings = await db.get_all_settings(context, chat_id)
    forbidden = chat_settings.get('forbidden_words', [])
    chat_settings['fw_regex'] = get_forbidden_words_regex(forbidden)
    return chat_settings
|
|
| |
|
|
async def send_deletion_report(context, user, chat, reason_key, content):
    """Post a "message deleted" report to the configured report channel.

    Nothing is sent when ``bot_data['REPORT_CHANNEL_ID']`` is unset. The
    reason text is localised in the group owner's language (falling back to
    the offending user's). All user-controlled values are HTML-escaped, and
    the quoted content is limited to 500 characters.

    Args:
        context: Handler context (bot, bot_data).
        user: The user whose message was deleted.
        chat: The chat the message was deleted from.
        reason_key: Translation key describing why it was deleted.
        content: Text of the deleted message; ``None`` is treated as empty.

    Any failure is logged; this function never raises to the caller for
    ordinary errors.
    """
    report_channel_id = context.bot_data.get('REPORT_CHANNEL_ID')
    if not report_channel_id:
        return
    try:
        owner_id = await get_group_owner_id(context, chat.id)
        owner_lang = await db.get_user_lang_from_db(context, owner_id or user.id)
        reason_text = get_text(reason_key, lang=owner_lang)

        # Missing names/titles must not abort the whole report.
        safe_name = html.escape(user.first_name or "")
        safe_title = html.escape(chat.title or "")
        user_mention = f"<a href='tg://user?id={user.id}'>{safe_name}</a>"

        try:
            invite_link = await chat.export_invite_link() if not chat.username else f"https://t.me/{chat.username}"
            group_link = f'<a href="{html.escape(invite_link)}">{safe_title}</a>'
        except Exception:
            # No link available (e.g. missing invite rights): plain title.
            group_link = f"{safe_title}"

        safe_content = html.escape((content or "")[:500])
        report_message = (
            f"🗑️ <b>Message Deleted</b>\n\n"
            f"🏢 <b>Group:</b> {group_link} (<code>{chat.id}</code>)\n"
            f"👤 <b>User:</b> {user_mention} (<code>{user.id}</code>)\n"
            f"📝 <b>Reason:</b> {html.escape(reason_text)}\n"
            f"📜 <b>Content:</b> <blockquote>{safe_content}</blockquote>"
        )
        await context.bot.send_message(chat_id=report_channel_id, text=report_message, parse_mode=ParseMode.HTML, disable_web_page_preview=True)
    except Exception as e:
        logger.error(f"Failed to send deletion report: {e}")
| |
| |
|
|
async def is_channel_or_group_username(context, username) -> bool:
    """Return True when ``@username`` belongs to a channel or a group.

    The classification ("channel" or "user") is cached in Redis for 24 hours
    under ``uname_type:<lowercased username>``. When the chat lookup fails,
    the name is treated (and cached) as a user.

    Redis is optional here: a missing client or a failing cache call never
    raises and never changes the result of a successful lookup.
    """
    username_clean = username.replace('@', '').strip()
    if not username_clean:
        return False

    redis_client = get_redis_client_for_user(context)
    cache_key = f"uname_type:{username_clean.lower()}"

    if redis_client:
        try:
            cached_type = await redis_client.get(cache_key)
            if isinstance(cached_type, bytes):
                # Client decoding mode is not visible here; accept bytes too.
                cached_type = cached_type.decode('utf-8', 'replace')
            if cached_type:
                return cached_type == "channel"
        except Exception:
            # Cache unavailable: fall through to the live lookup.
            pass

    try:
        chat_obj = await context.bot.get_chat(f"@{username_clean}")
        is_channel = chat_obj.type in ('channel', 'supergroup', 'group')
    except Exception:
        # Lookup failed (unknown name, private user, API error): not a channel.
        is_channel = False

    if redis_client:
        try:
            await redis_client.set(cache_key, "channel" if is_channel else "user", ex=86400)
        except Exception as e:
            logger.warning("Could not cache username type for %s: %s", cache_key, e)

    return is_channel
|
|
def analyze_sentiment(text: str) -> str:
    """Classify text as 'positive' or 'negative' by keyword substring match.

    The text is lowercased and scanned for English, Bengali and Hindi marker
    words. Positive markers take precedence over negative ones. Returns None
    for empty input or when no marker is found.
    """
    if not text:
        return None

    lowered = text.lower()
    positive_markers = (
        'good', 'great', 'love', 'best', 'thanks', 'thank you', 'amazing', 'cool',
        'ধন্যবাদ', 'ভালো', 'সেরা', 'সুন্দর', 'ওসাম', 'লভব', 'জোশ',
        'धन्यवाद', 'अच्छा', 'बढ़िया', 'प्रेम', 'सुंदर',
    )
    negative_markers = (
        'bad', 'worst', 'scam', 'fake', 'hate', 'ugly', 'stupid', 'useless',
        'বাজে', 'খারাপ', 'ভুয়া', 'স্ক্যাম', 'ফালতু', 'ঘৃণা',
        'बकवास', 'खराब', 'झूठ', 'घटिया',
    )

    # Positive markers are checked first, so they win on mixed text.
    for marker in positive_markers:
        if marker in lowered:
            return 'positive'
    for marker in negative_markers:
        if marker in lowered:
            return 'negative'
    return None
|
|
| |
|
|
async def toggle_setting_wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, key: str, db_col: str, owner_only: bool = False):
    """Shared handler body for on/off settings commands.

    Flips the boolean stored in ``db_col`` (or sets it explicitly when the
    first command argument is ``on``/``off``), persists it, refreshes the
    settings, and replies with the enabled/disabled text. With
    ``owner_only`` the change is restricted to the group owner and the bot
    owner. The whole operation runs behind ``check_admin``.
    """
    async def logic(u, c):
        group_id = u.effective_chat.id
        caller_id = u.effective_user.id
        lang = await db.get_user_lang_from_db(c, caller_id)

        # Optional stricter gate: only the group owner or the bot owner.
        if owner_only:
            group_owner = await get_group_owner_id(c, group_id)
            bot_owner = c.bot_data.get('BOT_OWNER_ID')
            if caller_id != group_owner and caller_id != bot_owner:
                await u.message.reply_html(get_text("owner_only_command_text", lang))
                return

        current = (await db.get_all_settings(c, group_id)).get(db_col, False)

        # Default is a toggle; an explicit on/off argument overrides it.
        new_status = not current
        if c.args:
            requested = c.args[0].lower()
            if requested == 'on':
                new_status = True
            elif requested == 'off':
                new_status = False

        await db.update_setting_in_db(c, group_id, db_col, new_status)
        await get_settings_from_cache_and_update_regex(c, group_id)

        # Strip decorative emoji prefixes from the button label.
        feature_name = get_text(f"{key}_button_label", lang)
        for prefix in ("🚫 ", "✅ ", "🛠️ ", "🚦 "):
            feature_name = feature_name.replace(prefix, "")

        if new_status:
            status_text = "feature_enabled"
        else:
            status_text = "feature_disabled"
        await u.message.reply_html(get_text(status_text, lang).format(feature=feature_name))

    await check_admin(update, context, logic)
|
|
async def numeric_setting_wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, key: str, db_col: str):
    """Shared handler body for settings that take a non-negative integer.

    Without an argument the usage example is shown. Otherwise the first
    argument is parsed as an integer >= 0 and stored in ``db_col``; ``0``
    is reported with the "<key>_disabled" text, any other value with the
    "<key>_set" text. The whole operation runs behind ``check_admin``.
    """
    async def logic(u, c):
        lang = await db.get_user_lang_from_db(c, u.effective_user.id)
        if not c.args:
            return await u.message.reply_html(get_text(f"{key}_example_full_text", lang))

        try:
            val = int(c.args[0])
            if val < 0:
                raise ValueError
        except (ValueError, TypeError):
            # Only parsing problems are expected here; anything else should surface.
            return await u.message.reply_text("⚠️ Please enter a valid positive number.")

        await db.update_setting_in_db(c, u.effective_chat.id, db_col, val)

        if val == 0:
            await u.message.reply_html(get_text(f"{key}_disabled", lang))
        else:
            msg_key = f"{key}_set"
            txt = get_text(msg_key, lang)
            # Only these two templates carry a placeholder to fill in.
            if key == 'maxlength':
                txt = txt.format(length=val)
            elif key == 'autodelete':
                txt = txt.format(seconds=val)
            await u.message.reply_html(txt)

    await check_admin(update, context, logic)