| |
| import asyncio |
| import html |
| import logging |
| import re |
| import httpx |
| import io |
| from datetime import datetime, timedelta, timezone |
| from urllib.parse import quote |
|
|
| from telegram import Update, MessageOriginChannel, ChatMember, Chat, User, InlineKeyboardButton, InlineKeyboardMarkup, ChatPermissions, ReactionTypeEmoji |
| from telegram.ext import ContextTypes |
| from telegram.constants import ParseMode, MessageEntityType |
| from telegram.error import Forbidden, BadRequest |
|
|
| import config |
| import db |
| from . import utils |
| from .batch import BatchDeleter |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
|
|
async def extract_text_with_own_api(image_bytes: bytes, api_url: str) -> str:
    """Upload an image to the OCR endpoint and return the recognised text.

    The image is posted as a multipart ``file`` field (named ``image.jpg``,
    content type ``image/jpeg``) with a 60 second timeout.

    Args:
        image_bytes: Raw bytes of the image to analyse.
        api_url: URL of the OCR HTTP endpoint.

    Returns:
        The stripped ``extracted_text`` value from the JSON response, or an
        empty string when the endpoint does not reply with HTTP 200 or any
        error occurs. Errors are logged and never raised to the caller.
    """
    upload = {'file': ('image.jpg', image_bytes, 'image/jpeg')}
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(api_url, files=upload, timeout=60.0)
        if response.status_code != 200:
            return ""
        payload = response.json()
        return payload.get("extracted_text", "").strip()
    except Exception as e:
        logger.error(f"OCR API Error: {e}")
        return ""
|
|
async def scheduled_delete(context, chat_id, message_id, delay):
    """Delete a message after waiting ``delay`` seconds (best effort).

    Args:
        context: Callback context whose ``bot`` performs the deletion.
        chat_id: Chat that contains the message.
        message_id: Identifier of the message to delete.
        delay: Number of seconds to sleep before deleting.

    Ordinary failures (for example the message no longer existing) are
    ignored. Only ``Exception`` is caught, so cancelling the task while it
    sleeps propagates ``asyncio.CancelledError`` instead of being swallowed.
    """
    try:
        await asyncio.sleep(delay)
        await context.bot.delete_message(chat_id=chat_id, message_id=message_id)
    except Exception as e:
        logger.debug("Scheduled delete of message %s in chat %s failed: %s", message_id, chat_id, e)
|
|
| |
async def alert_missing_permissions(context, chat_id, user_id, trigger_msg_id=None):
    """Post a one-off warning that the bot cannot delete messages in a chat.

    Nothing is sent when no Redis client is available for the chat or when
    the ``perm_warn:{chat_id}`` flag is already set. After sending, the flag
    and the warning's message id are stored for one hour, together with the
    id of the message that triggered the warning when one is given.

    Args:
        context: Callback context giving access to the bot and shared data.
        chat_id: Chat in which the warning is posted.
        user_id: User whose stored language selects the warning text.
        trigger_msg_id: Optional id of the message that could not be deleted.

    Failures while sending or storing are logged and not raised.
    """
    redis = utils.get_redis_client_for_chat(context, chat_id)
    if not redis:
        return
    if await redis.get(f"perm_warn:{chat_id}"):
        return
    try:
        lang = await db.get_user_lang_from_db(context, user_id)
        msg_text = utils.get_text("deletion_fail_group_warning", lang)
        sent = await context.bot.send_message(chat_id, msg_text, parse_mode=ParseMode.HTML)

        await redis.set(f"perm_warn:{chat_id}", "1", ex=3600)
        await redis.set(f"perm_warn_msg_id:{chat_id}", str(sent.message_id), ex=3600)

        if trigger_msg_id:
            await redis.set(f"perm_trigger_msg_id:{chat_id}", str(trigger_msg_id), ex=3600)
    except Exception as e:
        # Best effort: the warning is informational, so only record the failure.
        logger.warning("Could not post missing-permission warning in chat %s: %s", chat_id, e)
|
|
| |
|
|
async def process_deletion_and_tasks(context, user, chat, msg_id, settings, reason_keys, content):
    """Delete an offending message, then schedule the follow-up processing.

    When a ``batch_deleter`` is registered in ``context.bot_data`` the
    deletion is delegated to it. Otherwise the message is deleted directly;
    a ``Forbidden`` error, or a ``BadRequest`` whose text does not contain
    "not found", schedules the missing-permission alert. In every case
    ``_background_processing`` is scheduled afterwards as a separate task.

    Args:
        context: Callback context giving access to the bot and shared data.
        user: User who sent the offending message.
        chat: Chat the message was sent in.
        msg_id: Identifier of the message to delete.
        settings: Chat settings passed on to the background processing.
        reason_keys: Text keys naming the reasons for the deletion.
        content: Offending content passed on to the background processing.
    """
    deleter = context.bot_data.get('batch_deleter')
    if not deleter:
        needs_alert = False
        try:
            await context.bot.delete_message(chat.id, msg_id)
        except BadRequest as err:
            # An already-deleted message is not a permission problem.
            needs_alert = "not found" not in str(err).lower()
        except Forbidden:
            needs_alert = True
        if needs_alert:
            asyncio.create_task(alert_missing_permissions(context, chat.id, user.id, msg_id))
    else:
        await deleter.delete(chat.id, msg_id)

    asyncio.create_task(_background_processing(context, user, chat, settings, reason_keys, content))
|
|
async def _background_processing(context, user, chat, settings, reason_keys, content):
    """Run the follow-up actions after a message has been removed.

    Steps, in order:
      1. Schedule a deletion report, unless the deletion was for a
         join/leave message or for a link to a bot. A report is only sent
         when the content mentions t.me/telegram.me or a forbidden word was
         one of the reasons.
      2. Update the user's activity score.
      3. Mute the user for 24 hours when ``mute_on_link_24_h`` is enabled and
         a link reason applies.
      4. Post a warning in the chat, at most once per user per 10 minutes
         (requires a Redis client); the warning is scheduled for deletion
         after ``config.CENSOR_WARNING_DELETE_SECONDS``.

    Args:
        context: Callback context giving access to the bot and shared data.
        user: User whose message was removed.
        chat: Chat the message was removed from.
        settings: Chat settings mapping.
        reason_keys: Text keys naming the reasons; the first one is used for
            the report and the warning text.
        content: Offending content that was removed.

    Failures of the mute and of the warning are logged and not raised.
    """
    redis_client = utils.get_redis_client_for_chat(context, chat.id)
    is_join_leave = 'censor_reason_join_leave' in reason_keys
    is_link_reason = 'censor_reason_link' in reason_keys or 'censor_reason_bio_link' in reason_keys
    is_bot_link = is_link_reason and bool(
        re.search(r'(@[\w_]+bot\b|(?:t\.me|telegram\.me)\/[\w_]+bot\b)', content, re.IGNORECASE)
    )

    if not is_join_leave and not is_bot_link:
        if re.search(r'(?:t\.me|telegram\.me)', content, re.IGNORECASE) or 'censor_reason_word' in reason_keys:
            asyncio.create_task(utils.send_deletion_report(context, user, chat, reason_keys[0], content))

    await utils.update_user_activity_score(context, chat.id, user.id, 'deletion', reason_keys, content)

    if settings.get('mute_on_link_24_h', False) and is_link_reason:
        try:
            mute_until = datetime.now(timezone.utc) + timedelta(hours=24)
            await context.bot.restrict_chat_member(
                chat.id, user.id, ChatPermissions(can_send_messages=False), until_date=mute_until
            )
        except Exception as e:
            logger.warning("Could not mute user %s in chat %s: %s", user.id, chat.id, e)

    if not redis_client:
        return

    cooldown_key = f"warn_cooldown:{chat.id}:{user.id}"
    if await redis_client.get(cooldown_key):
        return

    try:
        lang = await db.get_user_lang_from_db(context, user.id)
        reason_msg = utils.get_text(reason_keys[0], lang)
        warn_text = utils.get_text("censor_warning", lang).format(
            user_mention=user.mention_html(), reason=reason_msg, bot_username=context.bot.username
        )
        sent_warn = await context.bot.send_message(chat.id, warn_text, parse_mode=ParseMode.HTML)
        await redis_client.set(cooldown_key, "1", ex=600)
        asyncio.create_task(
            utils.delete_message_after_delay(
                context, chat.id, sent_warn.message_id, config.CENSOR_WARNING_DELETE_SECONDS
            )
        )
    except Exception as e:
        logger.warning("Could not send censor warning in chat %s: %s", chat.id, e)
|
|
async def process_image_with_ocr_in_background(context, chat_id, user_id, message_id, image_bytes):
    """Run OCR on a photo and remove the message if the text contains a link.

    The OCR request is made while holding the ``ocr_semaphore`` stored in
    ``context.bot_data``. The extracted text counts as a link when
    ``config.LINK_PATTERN_COMPILED`` matches it, or when any of
    ``config.LINK_INDICATOR_KEYWORDS`` occurs in the lower-cased text with
    all whitespace removed. In that case the deletion pipeline is scheduled
    with the reason ``censor_reason_link``.

    Args:
        context: Callback context giving access to the bot and shared data.
        chat_id: Chat the photo was posted in.
        user_id: User who posted the photo.
        message_id: Identifier of the photo message.
        image_bytes: Raw bytes of the downloaded photo.

    Any failure is logged and not raised, since this runs as a detached task.
    """
    try:
        ocr_semaphore = context.bot_data.get('ocr_semaphore')
        # Hold the semaphore only for the OCR request itself.
        async with ocr_semaphore:
            ocr_text = await extract_text_with_own_api(image_bytes, config.OCR_API_URL)
        if not ocr_text:
            return

        normalized_text = re.sub(r'\s+', '', ocr_text).lower()
        has_link = config.LINK_PATTERN_COMPILED.search(ocr_text) or any(
            indicator in normalized_text for indicator in config.LINK_INDICATOR_KEYWORDS
        )
        if not has_link:
            return

        settings = await utils.get_settings_from_cache_and_update_regex(context, chat_id)
        chat_obj = await context.bot.get_chat(chat_id)
        member = await context.bot.get_chat_member(chat_id, user_id)
        asyncio.create_task(
            process_deletion_and_tasks(
                context, member.user, chat_obj, message_id, settings, ['censor_reason_link'], ocr_text
            )
        )
    except Exception as e:
        logger.warning("OCR scan failed for message %s in chat %s: %s", message_id, chat_id, e)
|
|
| |
|
|
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Moderate a new or edited message.

    Processing order:
      1. Skip everything while a missing-permission warning is active for
         the chat, and raise that warning when the bot is not an admin.
      2. Private chats: only buffer user statistics.
      3. Ban bots that were added by a non-admin.
      4. Track the member and load settings; exempt users (group owner, bot
         owner, and admins when ``allow_admins_manage`` is set) stop here.
      5. Collect violations (links/mentions, forbidden words, channel
         forwards) and hand the message to the deletion pipeline.
      6. Spam-phrase alert, sentiment reaction, bio-link check, join/leave
         clean-up, keyword filters and ``#notes``.
      7. Scheduled auto-delete of media and background OCR scan of photos.

    Args:
        update: Incoming Telegram update.
        context: Callback context giving access to the bot and shared data.
    """
    msg, chat, user = update.effective_message, update.effective_chat, update.effective_user
    if not all([msg, chat, user]):
        return

    # While a permission warning is active for this chat, do no processing.
    redis = utils.get_redis_client_for_chat(context, chat.id)
    if redis and await redis.get(f"perm_warn:{chat.id}"):
        return

    is_bot_admin = await utils.is_group_admin(context, chat.id, context.bot.id)
    if not is_bot_admin and chat.type != 'private':
        asyncio.create_task(alert_missing_permissions(context, chat.id, user.id, msg.message_id))
        return

    if chat.type == 'private':
        if not user.is_bot and not msg.new_chat_members:
            asyncio.create_task(db.buffer_user_stats(context, user.id, user.first_name, user.username))
        return

    # --- Bots added by non-admins are banned and the join message removed ---
    if msg.new_chat_members:
        is_adder_admin = await utils.is_group_admin(context, chat.id, user.id)
        for member in msg.new_chat_members:
            if member.is_bot and member.id != context.bot.id and not is_adder_admin:
                try:
                    await context.bot.ban_chat_member(chat.id, member.id)
                    await msg.delete()
                    return
                except Exception as e:
                    logger.warning("Could not ban added bot %s in chat %s: %s", member.id, chat.id, e)

    is_edit = update.edited_message is not None
    asyncio.create_task(db.track_chat_member(context.bot_data.get('db_pool'), chat.id, user.id))
    if not is_edit:
        asyncio.create_task(db.buffer_user_stats(context, user.id, user.first_name, user.username))

    settings = await utils.get_settings_from_cache_and_update_regex(context, chat.id)
    owner_id = await utils.get_group_owner_id(context, chat.id)
    is_exempt = (
        (user.id == owner_id)
        or (user.id == context.bot_data.get('BOT_OWNER_ID'))
        or (settings.get('allow_admins_manage') and await utils.is_group_admin(context, chat.id, user.id))
    )

    # Traffic control is checked for every message, including exempt users'.
    if settings.get('enable_traffic_control', False):
        asyncio.create_task(utils.check_traffic_control(context, chat.id))
    if is_exempt:
        return

    text_content = (msg.text or msg.caption or "").strip()

    # --- Link / mention detection ---
    is_link = utils.has_link_in_text(msg)
    if not is_link:
        entities = msg.entities or msg.caption_entities or []
        has_mention_entity = any(
            ent.type in (MessageEntityType.MENTION, MessageEntityType.TEXT_MENTION) for ent in entities
        )
        if has_mention_entity:
            if not settings.get('allow_usernames', False):
                is_link = True
            else:
                # Usernames are allowed, except those of channels or groups.
                for mention in config.MENTION_PATTERN_COMPILED.findall(text_content):
                    if await utils.is_channel_or_group_username(context, mention):
                        is_link = True
                        break

    # --- Collect the reason keys of every violated rule ---
    reason_keys = []
    if is_link:
        reason_keys.append('censor_reason_link')
    if (fw_regex := settings.get('fw_regex')) and fw_regex.search(text_content):
        reason_keys.append('censor_reason_word')
    if settings.get('block_channel_forwards') and isinstance(msg.forward_origin, MessageOriginChannel):
        reason_keys.append('censor_reason_forward')

    if reason_keys:
        if redis:
            # Count violations in a 10 minute window; from the 11th on, mute.
            spam_key = f"spam_violation_counter:{chat.id}:{user.id}"
            current_count = await redis.incr(spam_key)
            if current_count == 1:
                await redis.expire(spam_key, 600)
            if current_count >= 11:
                try:
                    await context.bot.restrict_chat_member(
                        chat.id, user.id, ChatPermissions(can_send_messages=False)
                    )
                except Exception as e:
                    logger.warning("Could not mute repeat offender %s in chat %s: %s", user.id, chat.id, e)

        asyncio.create_task(
            process_deletion_and_tasks(context, user, chat, msg.message_id, settings, reason_keys, text_content)
        )
        return

    # --- Spam phrase scan: alert in the chat and let an admin decide ---
    if settings.get('enable_spamscan') and config.SPAM_PHRASES_COMPILED.search(text_content):
        try:
            if chat.username:
                jump_url = f"https://t.me/{chat.username}/{msg.message_id}"
            else:
                chat_id_clean = str(chat.id).replace("-100", "")
                jump_url = f"https://t.me/c/{chat_id_clean}/{msg.message_id}"

            mention_text = ""
            if owner_id:
                mention_text = f"📢 <b>Attention:</b> <a href='tg://user?id={owner_id}'>Admin</a>\n"

            warn_text = f"🚨 <b>Spam Alert!</b>\n{mention_text}⚠️ একটি স্প্যাম মেসেজ পাওয়া গেছে।"

            btns = [
                [InlineKeyboardButton("👀 View Message", url=jump_url)],
                [
                    InlineKeyboardButton(
                        "🔇 Mute & Delete", callback_data=f"spam_act_mute_{user.id}_{msg.message_id}"
                    ),
                    InlineKeyboardButton("❌ Cancel", callback_data="spam_act_cancel"),
                ],
            ]

            await context.bot.send_message(
                chat_id=chat.id,
                text=warn_text,
                parse_mode=ParseMode.HTML,
                reply_markup=InlineKeyboardMarkup(btns),
                disable_web_page_preview=True
            )
        except Exception as e:
            logger.error("Spam Scan Error: %s", e)
        return

    # --- Sentiment reaction ---
    if settings.get('enable_sentiment', False) and text_content:
        senti = await asyncio.to_thread(utils.analyze_sentiment, text_content)
        if senti:
            try:
                await msg.set_reaction(reaction=[ReactionTypeEmoji("❤️" if senti == 'positive' else "👎")])
            except Exception as e:
                logger.debug("Could not set reaction in chat %s: %s", chat.id, e)

    # --- Links in the sender's bio (result cached for 10 minutes) ---
    if settings.get('block_bio_links', True):
        bio_status_key = f"bio_status:{user.id}"
        cached_bio = await redis.get(bio_status_key) if redis else None
        # NOTE(review): this comparison assumes the Redis client returns str
        # rather than bytes — confirm the client's decode settings.
        if cached_bio == "bad":
            asyncio.create_task(
                process_deletion_and_tasks(
                    context, user, chat, msg.message_id, settings,
                    ['censor_reason_bio_link'], "Bio contains links (cached)"
                )
            )
            return
        if not cached_bio:
            async with context.bot_data.get('api_semaphore'):
                try:
                    profile = await context.bot.get_chat(user.id)
                    if profile.bio and utils.has_link_in_text(profile.bio):
                        if redis:
                            await redis.set(bio_status_key, "bad", ex=600)
                        asyncio.create_task(
                            process_deletion_and_tasks(
                                context, user, chat, msg.message_id, settings,
                                ['censor_reason_bio_link'], f"Bio: {profile.bio}"
                            )
                        )
                        return
                    if redis:
                        await redis.set(bio_status_key, "safe", ex=600)
                except Exception as e:
                    logger.debug("Bio check failed for user %s: %s", user.id, e)

    # --- Join / leave service messages ---
    if settings.get('delete_join_messages') and (msg.new_chat_members or msg.left_chat_member):
        asyncio.create_task(
            process_deletion_and_tasks(
                context, user, chat, msg.message_id, settings,
                ['censor_reason_join_leave'], "Join/Leave message"
            )
        )
        return

    # --- Keyword filters and #notes ---
    if text_content:
        lowered_text = text_content.lower()
        filters_cache = await db.get_chat_filters(context, chat.id)
        for keyword, fdata in filters_cache.items():
            if keyword in lowered_text:
                args = {'chat_id': chat.id, 'reply_to_message_id': msg.message_id}
                if fdata['type'] == 'photo':
                    await context.bot.send_photo(photo=fdata['file_id'], caption=fdata['text'], **args)
                else:
                    await context.bot.send_message(text=fdata['text'], parse_mode=ParseMode.HTML, **args)
                return
        if text_content.startswith('#'):
            note_name = text_content.split(' ')[0][1:].lower()
            if note_data := await db.get_note_from_db(context, chat.id, note_name):
                await context.bot.send_message(
                    chat.id, note_data['text'], parse_mode=ParseMode.HTML, reply_to_message_id=msg.message_id
                )

    # --- Scheduled auto-delete of media ---
    auto_del = settings.get('auto_delete_seconds', 0) or 0
    if auto_del > 0:
        is_media = (
            msg.photo or msg.video or msg.document or msg.voice
            or msg.audio or msg.sticker or msg.animation or msg.video_note
        )
        if is_media:
            asyncio.create_task(scheduled_delete(context, chat.id, msg.message_id, auto_del))

    # --- Background OCR scan of photos ---
    if settings.get('enable_ocr_scan') and msg.photo and config.OCR_API_URL:
        try:
            photo_file = await msg.photo[-1].get_file()
            photo_bytes = await photo_file.download_as_bytearray()
            asyncio.create_task(
                process_image_with_ocr_in_background(
                    context, chat.id, user.id, msg.message_id, bytes(photo_bytes)
                )
            )
        except Exception as e:
            logger.warning("Could not start OCR scan for message %s: %s", msg.message_id, e)
|
|
| |
|
|
async def handle_member_status_change(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Cache a member's new role and welcome users who just joined.

    For every ``chat_member`` update the member's admin flag is cached in
    Redis for 24 hours, their cached bio status is cleared and, when they
    became owner, the group owner id is cached as well.

    When a non-bot user becomes a member without previously being a member,
    administrator or owner, the user is stored in the database and a welcome
    message is sent: the previous welcome message is deleted, the configured
    (or default) text and buttons are used, and in ``card`` mode a generated
    welcome card is sent as a photo. The message is scheduled for deletion
    when ``delete_seconds`` is configured.

    Args:
        update: Incoming Telegram update.
        context: Callback context giving access to the bot and shared data.

    Errors while building or sending the welcome are logged and not raised.
    """
    if not update.chat_member:
        return
    chat = update.chat_member.chat
    user = update.chat_member.new_chat_member.user
    new_status = update.chat_member.new_chat_member.status
    old_status = update.chat_member.old_chat_member.status
    redis = utils.get_redis_client_for_chat(context, chat.id)

    if redis:
        is_admin_now = new_status in (ChatMember.ADMINISTRATOR, ChatMember.OWNER)
        await redis.set(f"is_admin:{chat.id}:{user.id}", "1" if is_admin_now else "0", ex=86400)
        await redis.delete(f"bio_status:{user.id}")
        if new_status == ChatMember.OWNER:
            await redis.set(f"group_owner:{chat.id}", str(user.id), ex=86400)

    was_present = old_status in (ChatMember.MEMBER, ChatMember.ADMINISTRATOR, ChatMember.OWNER)
    if new_status != ChatMember.MEMBER or was_present or user.is_bot:
        return

    await db._add_user_to_db_core(context.bot_data.get('db_pool'), user.id, user.first_name, user.username)

    try:
        settings = await utils.get_settings_from_cache_and_update_regex(context, chat.id)
        welcome_data = settings.get('welcome_data') or {}

        # Keep only one welcome message in the chat at a time.
        if redis:
            last_msg_id = await redis.get(f"last_welcome_msg:{chat.id}")
            if last_msg_id:
                try:
                    await context.bot.delete_message(chat.id, int(last_msg_id))
                except Exception as e:
                    logger.debug("Could not delete previous welcome in chat %s: %s", chat.id, e)

        raw_text = welcome_data.get('text') or (
            "👋 <b>Welcome {name} to our Group!</b>\n\n"
            "🛡️ I am your <b>Group Protector</b>. I will keep this chat safe from spam.\n"
            "📝 Please follow the rules to avoid being muted or banned."
        )
        welcome_text = raw_text.replace("{name}", user.mention_html())

        keyboard = []
        for i, btn in enumerate(welcome_data.get('buttons') or []):
            btn_name = btn['name']
            btn_content = btn['content'].strip()

            if btn_content.startswith('share:'):
                # Strip only the leading marker; the link itself may legitimately
                # contain the text "share:".
                link_to_share = btn_content[len('share:'):].strip()
                share_url = f"https://t.me/share/url?url={quote(link_to_share)}"
                keyboard.append([InlineKeyboardButton(btn_name, url=share_url)])
            elif btn_content.startswith('http'):
                keyboard.append([InlineKeyboardButton(btn_name, url=btn_content)])
            else:
                # Any other content is handled through a callback keyed by position.
                keyboard.append([InlineKeyboardButton(btn_name, callback_data=f"w_pop_{i}")])

        reply_markup = InlineKeyboardMarkup(keyboard) if keyboard else None
        sent = None

        if settings.get('welcome_mode', 'text') == 'card':
            pfp = None
            try:
                photos = await user.get_profile_photos(limit=1)
                if photos.total_count > 0:
                    pfp_file = await photos.photos[0][-1].get_file()
                    pfp = bytes(await pfp_file.download_as_bytearray())
            except Exception as e:
                # A missing profile photo must not prevent the welcome.
                logger.debug("Could not fetch profile photo of user %s: %s", user.id, e)
                pfp = None

            card = await utils.generate_welcome_card(user.first_name, pfp)
            if card:
                sent = await context.bot.send_photo(
                    chat.id, photo=card, caption=welcome_text,
                    parse_mode=ParseMode.HTML, reply_markup=reply_markup
                )

        # Plain text is both the default mode and the fallback when no card was produced.
        if not sent:
            sent = await context.bot.send_message(
                chat.id, text=welcome_text, parse_mode=ParseMode.HTML,
                disable_web_page_preview=True, reply_markup=reply_markup
            )

        if sent and redis:
            await redis.set(f"last_welcome_msg:{chat.id}", sent.message_id)

        ds = welcome_data.get('delete_seconds', 0) or 0
        if sent and ds > 0:
            asyncio.create_task(utils.delete_message_after_delay(context, chat.id, sent.message_id, ds))

    except Exception as e:
        logger.error("Welcome Error: %s", e)
|
|
| |
async def _delete_tracked_message(context, redis, chat_id, key):
    """Delete the message whose id is stored under ``key``, then drop the key."""
    stored_id = await redis.get(key)
    if not stored_id:
        return
    try:
        await context.bot.delete_message(chat_id, int(stored_id))
    except Exception as e:
        logger.debug("Could not delete tracked message (%s) in chat %s: %s", key, chat_id, e)
    await redis.delete(key)


async def handle_bot_status_change(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """React to a change of the bot's own membership in a chat.

    * Bot became member or administrator: the chat is stored in the database.
      On promotion to administrator the bot's admin flag is cached in Redis
      for 24 hours, the missing-permission flag is cleared, and both the
      earlier warning message and the message that triggered it are deleted.
    * Bot left or was banned: the chat row is removed from the ``chats`` table.

    Args:
        update: Incoming Telegram update.
        context: Callback context giving access to the bot and shared data.
    """
    result = update.my_chat_member
    if not result:
        return
    chat = result.chat
    new_status = result.new_chat_member.status
    pool = context.bot_data.get('db_pool')

    if new_status in (ChatMember.MEMBER, ChatMember.ADMINISTRATOR):
        await db._add_chat_to_db_core(pool, chat.id)
        if new_status != ChatMember.ADMINISTRATOR:
            return
        redis = utils.get_redis_client_for_chat(context, chat.id)
        if not redis:
            return

        await redis.set(f"is_admin:{chat.id}:{context.bot.id}", "1", ex=86400)
        await redis.delete(f"perm_warn:{chat.id}")
        await _delete_tracked_message(context, redis, chat.id, f"perm_warn_msg_id:{chat.id}")
        await _delete_tracked_message(context, redis, chat.id, f"perm_trigger_msg_id:{chat.id}")

    # ChatMember.BANNED is the current name of the "kicked" status; referencing
    # a separate ChatMember.KICKED attribute raises AttributeError on
    # python-telegram-bot v20+.
    elif new_status in (ChatMember.LEFT, ChatMember.BANNED):
        if pool:
            try:
                async with pool.acquire() as conn:
                    await conn.execute("DELETE FROM chats WHERE chat_id = $1", chat.id)
            except Exception as e:
                logger.warning("Could not remove chat %s from the database: %s", chat.id, e)
|
|
async def handle_edited_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Process an edited message with the same pipeline as a new message.

    Args:
        update: Incoming Telegram update.
        context: Callback context passed through unchanged.
    """
    await handle_message(update, context)
|
|
| |