rslbot / utils.py
pmrony's picture
Upload utils.py
95693c5 verified
# --- START OF FILE bot_handlers/utils.py ---
import asyncio
import logging
import io
import time
import re
import html
import httpx
import orjson as json
import random
from datetime import datetime, timedelta
from functools import lru_cache
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ChatPermissions, User, Chat
from telegram.ext import ContextTypes
from telegram.constants import ParseMode
from telegram.error import Forbidden, BadRequest
import config
import db
from credentials import BOT_TOKEN
from .redis_manager import get_redis_client_for_chat, get_redis_client_for_user
logger = logging.getLogger(__name__)
# --- PILLOW IMPORT & SETUP ---
try:
from PIL import Image, ImageDraw, ImageFont
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
logger.warning("Pillow library not found. Welcome cards will be disabled.")
# --- BLOCKING IMAGE FUNCTION (CPU BOUND) ---
def _generate_welcome_card_sync(user_name, profile_pic_bytes=None):
    """Render a 600x300 welcome card and return it as JPEG bytes.

    This is blocking Pillow work, so it is meant to be called from a worker
    thread rather than directly on the event loop.

    Args:
        user_name: Display name of the new member. ``None`` or an empty
            string is tolerated; the avatar placeholder then shows "?".
        profile_pic_bytes: Raw bytes of the member's profile picture, or
            ``None`` to draw a coloured circle with the name's initial.

    Returns:
        The encoded JPEG as ``bytes``, or ``None`` when Pillow is not
        installed or rendering fails for any reason.
    """
    if not PIL_AVAILABLE:
        return None

    # Normalise once so a missing name cannot break indexing/slicing below.
    display_name = user_name or ""

    try:
        # 1. Dark background canvas.
        img = Image.new('RGB', (600, 300), color=(20, 20, 30))
        draw = ImageDraw.Draw(img)

        # 2. Decorative circles in two opposite corners.
        draw.ellipse((-50, -50, 150, 150), fill=(40, 40, 60))
        draw.ellipse((450, 150, 650, 350), fill=(40, 40, 60))

        # 3. Fonts: use font.ttf when it is present, otherwise Pillow's default.
        try:
            font_lg = ImageFont.truetype("font.ttf", 40)
            font_sm = ImageFont.truetype("font.ttf", 30)
            font_big_initial = ImageFont.truetype("font.ttf", 70)  # large font for the initial
        except OSError:
            font_lg = font_sm = font_big_initial = ImageFont.load_default()

        # --- Avatar ---
        if profile_pic_bytes:
            # a. A picture is available: paste it through a circular mask.
            p_img = Image.open(io.BytesIO(profile_pic_bytes)).convert("RGBA")
            p_img = p_img.resize((120, 120))
            mask = Image.new("L", (120, 120), 0)
            ImageDraw.Draw(mask).ellipse((0, 0, 120, 120), fill=255)
            img.paste(p_img, (240, 50), mask)
        else:
            # b. No picture: coloured circle with the first letter of the name.
            colors = [
                (231, 76, 60),    # Red
                (52, 152, 219),   # Blue
                (46, 204, 113),   # Green
                (155, 89, 182),   # Purple
                (241, 196, 15),   # Yellow
                (230, 126, 34),   # Orange
            ]
            draw.ellipse((240, 50, 360, 170), fill=random.choice(colors))
            initial = display_name[0].upper() if display_name else "?"
            # anchor="mm" centres the glyph on the given point (middle-middle).
            draw.text((300, 110), initial, fill=(255, 255, 255), anchor="mm", font=font_big_initial)

        # --- Captions ---
        draw.text((300, 190), f"Welcome, {display_name[:15]}", fill=(255, 255, 255), anchor="mm", font=font_lg)
        draw.text((300, 230), "To Our Community", fill=(180, 180, 180), anchor="mm", font=font_sm)

        # Encode to JPEG in memory; getvalue() returns the whole buffer
        # regardless of the current stream position.
        bio = io.BytesIO()
        img.save(bio, 'JPEG')
        return bio.getvalue()
    except Exception as e:
        # Card generation is optional, so never let it break the caller.
        logger.exception("Image Gen Error: %s", e)
        return None
# --- ASYNC WRAPPER FOR IMAGE GEN ---
async def generate_welcome_card(user_name, profile_pic_bytes=None):
    """Produce the welcome card without blocking the event loop.

    The synchronous renderer is submitted to the running loop's default
    executor and its result (JPEG bytes or ``None``) is awaited.
    """
    running_loop = asyncio.get_running_loop()
    # Passing None as the executor selects the loop's default executor.
    render_job = running_loop.run_in_executor(
        None,
        _generate_welcome_card_sync,
        user_name,
        profile_pic_bytes,
    )
    return await render_job
# --- HELPER FUNCTIONS ---
from telegram.constants import MessageEntityType
def get_text(key, lang="en"):
    """Return the UI string for ``key`` in language ``lang``.

    An unknown language uses the English table. A key missing from the
    selected table yields the placeholder ``_<key>_``.
    """
    english_table = config.LANGUAGES["en"]
    table = config.LANGUAGES.get(lang, english_table)
    placeholder = f"_{key}_"
    return table.get(key, placeholder)
def has_link_in_text(obj) -> bool:
    """Tell whether a string or a message-like object carries a link.

    For a plain string only the configured link pattern is applied. For a
    message-like object the checks run in this order: text/caption pattern
    match, URL / text-link / mention entities, an inline keyboard, and a URL
    in the link-preview options.
    """
    if not obj:
        return False

    link_pattern = config.LINK_PATTERN_COMPILED

    # 1. Plain text (e.g. a bio) is checked directly.
    if isinstance(obj, str):
        return link_pattern.search(obj) is not None

    # 2. Message text or caption.
    body = (obj.text or obj.caption or "").strip()
    if link_pattern.search(body):
        return True

    # 3. Entities reveal links hidden behind other text.
    link_entity_types = (
        MessageEntityType.URL,
        MessageEntityType.TEXT_LINK,
        MessageEntityType.MENTION,
    )
    message_entities = obj.entities or obj.caption_entities or []
    if any(item.type in link_entity_types for item in message_entities):
        return True

    # 4. Inline buttons on the message are treated as a link.
    markup = getattr(obj, 'reply_markup', None)
    if markup and markup.inline_keyboard:
        return True

    # 5. A preview box may carry a URL even when the text does not.
    preview = getattr(obj, 'link_preview_options', None)
    if preview and preview.url:
        return True

    return False
async def delete_message_after_delay(context, chat_id, message_id, delay):
    """Delete a message after ``delay`` seconds, on a best-effort basis.

    Args:
        context: Callback context whose ``bot`` performs the deletion.
        chat_id: Chat that contains the message.
        message_id: Identifier of the message to delete.
        delay: Seconds to wait first; a value ``<= 0`` disables the deletion.

    Ordinary failures of the delete call are ignored. Task cancellation is
    deliberately NOT swallowed so that shutdown can proceed normally.
    """
    if delay <= 0:
        return
    await asyncio.sleep(delay)
    try:
        await context.bot.delete_message(chat_id=chat_id, message_id=message_id)
    except Exception:
        # `except Exception` (not a bare except) lets CancelledError propagate.
        logger.debug("Could not delete message %s in chat %s", message_id, chat_id, exc_info=True)
# --- 🔥 CACHED ADMIN CHECK (robust and reliable) ---
async def is_group_admin(context, chat_id, user_id):
    """Return whether ``user_id`` is an administrator or the creator of ``chat_id``.

    The answer is looked up in Redis first (key ``is_admin:<chat>:<user>``).
    On a miss the Bot API is queried and the result is cached for 30 minutes.

    Returns:
        ``True`` for administrators/creators, ``False`` otherwise and also
        when the API lookup fails.
    """
    redis = get_redis_client_for_chat(context, chat_id)
    cache_key = f"is_admin:{chat_id}:{user_id}"

    # 1. Redis cache. A cache failure just falls through to the API.
    if redis:
        try:
            cached = await redis.get(cache_key)
        except Exception:
            cached = None
            logger.debug("Admin cache read failed for %s", cache_key, exc_info=True)
        # Accept both str and bytes, whichever the client returns.
        if cached in ("1", b"1"):
            return True
        if cached in ("0", b"0"):
            return False

    # 2. Cache miss: ask the Bot API.
    try:
        member = await context.bot.get_chat_member(chat_id, user_id)
    except Exception as e:
        err = str(e)
        expected = (
            "Chat_admin_required" in err
            or "there is no administrators in the private chat" in err
        )
        if not expected:
            logger.error("Admin Check Error: %s", e)
        return False

    is_admin = member.status in ('administrator', 'creator')

    # 3. Cache for 30 minutes. A failed write must not change the answer,
    #    so it is handled separately from the API lookup above.
    if redis:
        try:
            await redis.set(cache_key, "1" if is_admin else "0", ex=1800)
        except Exception:
            logger.debug("Admin cache write failed for %s", cache_key, exc_info=True)

    return is_admin
# --- 🔥 CACHED OWNER CHECK ---
async def get_group_owner_id(context, chat_id):
    """Return the user id of the chat's creator, or ``None`` if unknown.

    The id is read from Redis (key ``group_owner:<chat>``) when present.
    Otherwise the administrator list is fetched from the Bot API, the member
    with status ``creator`` is located, and its id is cached for 24 hours.
    """
    redis = get_redis_client_for_chat(context, chat_id)
    cache_key = f"group_owner:{chat_id}"

    if redis:
        try:
            cached = await redis.get(cache_key)
            if cached:
                return int(cached)
        except Exception:
            # Unreadable or malformed cache entry: fall back to the API.
            logger.debug("Owner cache read failed for %s", cache_key, exc_info=True)

    try:
        admins = await context.bot.get_chat_administrators(chat_id)
        owner_id = next(
            (admin.user.id for admin in admins if admin.status == 'creator'),
            None,
        )
    except Exception:
        # `except Exception` keeps task cancellation propagating.
        logger.debug("Could not fetch administrators of %s", chat_id, exc_info=True)
        return None

    if owner_id is None:
        return None

    if redis:
        try:
            await redis.set(cache_key, owner_id, ex=86400)
        except Exception:
            logger.debug("Owner cache write failed for %s", cache_key, exc_info=True)

    return owner_id
# --- REPORT SYSTEM ---
async def send_spam_report(context, chat_id, user_id, reason, text_content):
    """Send a spam report with action buttons to the report channel or group owner.

    Nothing is sent unless the chat's ``enable_spamscan`` setting is on. The
    target is ``bot_data['REPORT_CHANNEL_ID']`` when configured, otherwise
    the group owner.

    Args:
        context: Callback context (bot and bot_data).
        chat_id: Chat in which the spam was detected.
        user_id: Offending user.
        reason: Reason text inserted into the report template as-is.
        text_content: Offending message text; the first 200 characters are
            quoted in the report. May be empty/``None``.
    """
    settings = await db.get_all_settings(context, chat_id)
    if not settings.get('enable_spamscan', False):
        return

    report_target = context.bot_data.get('REPORT_CHANNEL_ID')
    if not report_target:
        report_target = await get_group_owner_id(context, chat_id)
    if not report_target:
        return

    try:
        chat = await context.bot.get_chat(chat_id)
        user = await context.bot.get_chat_member(chat_id, user_id)
        user_mention = user.user.mention_html()
        # The message is sent with ParseMode.HTML, so the title must be
        # escaped or a "<" in it would break or alter the markup.
        safe_title = html.escape(chat.title or "")
        group_link = safe_title
        if chat.username:
            group_link = f"<a href='https://t.me/{chat.username}'>{safe_title}</a>"
    except Exception:
        logger.debug("Could not load chat/user for spam report", exc_info=True)
        return

    lang = "en"
    msg = get_text("spamscan_group_owner_dm_notification_header", lang) + "\n\n"
    msg += get_text("spamscan_group_owner_dm_notification_body", lang).format(group_link_html=group_link, chat_id=chat_id) + "\n"
    msg += get_text("spamscan_group_owner_dm_user_info", lang).format(user_mention=user_mention, user_id=user_id) + "\n"
    msg += get_text("spamscan_group_owner_dm_reason", lang).format(reason_text=reason) + "\n\n"
    if text_content:
        # Truncate first, then escape, so an entity is never cut in half.
        snippet = html.escape(text_content[:200])
        msg += "<b>Message:</b>\n" + f"<blockquote>{snippet}</blockquote>" + "\n\n"
    msg += get_text("spamscan_group_owner_dm_action_info", lang)

    keyboard = InlineKeyboardMarkup([
        [InlineKeyboardButton(get_text("button_mute_user_very_short", lang), callback_data=f"mute_user_perm_{user_id}_{chat_id}")],
        [InlineKeyboardButton(get_text("button_cancel_report_short", lang), callback_data=f"cancel_report_{user_id}")],
    ])

    try:
        await context.bot.send_message(report_target, msg, reply_markup=keyboard, parse_mode=ParseMode.HTML)
    except Exception as e:
        logger.error("Failed to send report: %s", e)
# --- SCORE & ACTIVITY UPDATE (MODIFIED: NO FLOATING SCORE) ---
async def calculate_trust_score(context, user_id):
    """Fetch the stored trust score of a user.

    Returns 100 when no database pool is configured, when the user has no
    stats row, when the row lacks ``trust_score``, or when the lookup fails.
    """
    fallback_score = 100

    db_pool = context.bot_data.get('db_pool')
    if not db_pool:
        return fallback_score

    try:
        stats = await db.get_user_stats(db_pool, user_id)
        if not stats:
            return fallback_score
        return stats.get('trust_score', fallback_score)
    except Exception as e:
        logger.error(f"Error calculating trust score: {e}")
        return fallback_score
async def _register_user_if_unknown(context, pool, redis, chat_id, user_id):
    """Insert the user into the database once, remembering it in Redis for 3 days."""
    known_key = f"user_known:{user_id}"

    is_known = False
    if redis:
        try:
            is_known = await redis.get(known_key)
        except Exception:
            # A cache outage only costs a redundant DB upsert.
            logger.debug("user_known lookup failed for %s", user_id, exc_info=True)
    if is_known:
        return

    try:
        try:
            chat_member = await context.bot.get_chat_member(chat_id, user_id)
            fname = chat_member.user.first_name
            uname = chat_member.user.username
        except Exception:
            fname = "Unknown"
            uname = None
        await db._add_user_to_db_core(pool, user_id, fname, uname)
        if redis:
            await redis.set(known_key, "1", ex=259200)
    except Exception:
        # Registration is best effort and must not block warning handling.
        logger.debug("Could not register user %s", user_id, exc_info=True)


async def _enforce_warn_limit(context, redis, chat_id, user_id):
    """Count a warning in the rolling window and punish once the limit is reached."""
    settings = await db.get_all_settings(context, chat_id)
    # `or 0` also covers a stored null, which would break the comparison.
    limit = settings.get('warn_limit') or 0
    if limit <= 0:
        return

    window_min = settings.get('warn_time_window', 15)
    action = settings.get('warn_action', 'mute')
    warn_key = f"autowarn:{chat_id}:{user_id}"

    current_warns = await redis.incr(warn_key)
    if current_warns == 1:
        # The first warning opens the window.
        await redis.expire(warn_key, window_min * 60)
    if current_warns < limit:
        return

    try:
        user_member = await context.bot.get_chat_member(chat_id, user_id)
        if user_member.status in ('administrator', 'creator'):
            return  # admins are never auto-punished

        if action == 'kick':
            # Ban followed by unban removes the user without a lasting ban.
            await context.bot.ban_chat_member(chat_id, user_id)
            await context.bot.unban_chat_member(chat_id, user_id)
            action_text = "kicked 👢"
        elif action == 'ban':
            await context.bot.ban_chat_member(chat_id, user_id)
            action_text = "banned ⛔"
        else:
            permissions = ChatPermissions(can_send_messages=False)
            await context.bot.restrict_chat_member(chat_id, user_id, permissions)
            action_text = "muted (Permanent) 🔇"

        await context.bot.send_message(
            chat_id,
            f"⚠️ User reached warning limit ({limit}/{limit}). Action: {action_text}",
            parse_mode=ParseMode.HTML
        )
        await redis.delete(warn_key)
    except Exception as e:
        logger.error("Auto-punish failed: %s", e)


async def update_user_activity_score(context, chat_id, user_id, event, reason, content):
    """Record a moderation event for a user and apply the auto-warn policy.

    Args:
        context: Callback context (bot and bot_data).
        chat_id: Chat in which the event happened.
        user_id: User the event refers to.
        event: Event name; only ``'deletion'`` and ``'warning'`` increase the
            warning counters.
        reason: Unused here; kept for interface compatibility.
        content: Unused here; kept for interface compatibility.

    Does nothing when no ``db_pool`` is configured in ``bot_data``. The
    rolling-window punishment requires a Redis client for the chat.
    """
    pool = context.bot_data.get('db_pool')
    if not pool:
        return

    redis = get_redis_client_for_chat(context, chat_id)
    await _register_user_if_unknown(context, pool, redis, chat_id, user_id)

    if event not in ('deletion', 'warning'):
        return

    await db.increment_user_warning_count(pool, user_id)
    if redis:
        await _enforce_warn_limit(context, redis, chat_id, user_id)
    # Note: Floating point score logic removed here.
# --- WRAPPERS ---
async def check_admin(update, context, func):
    """Run ``func(update, context)`` only if the sender may manage the group.

    Rules, in order:
      * Updates without a user or chat are ignored.
      * In private chats the "group_only" notice is sent and nothing runs.
      * The group creator and the bot owner are always allowed.
      * Other users must be group admins, and the chat's
        ``allow_admins_manage`` setting must be on; otherwise admins get the
        "owner_only_command_text" notice and non-admins are silently ignored.
    """
    user = update.effective_user
    chat = update.effective_chat
    # effective_message also covers edited messages, where update.message is None.
    message = update.effective_message
    if user is None or chat is None:
        return

    if chat.type == 'private':
        lang = await db.get_user_lang_from_db(context, user.id)
        if message:
            await message.reply_text(get_text("group_only", lang), parse_mode=ParseMode.HTML)
        return

    settings = await db.get_all_settings(context, chat.id)
    allow_admins = settings.get('allow_admins_manage', False)
    owner_id = await get_group_owner_id(context, chat.id)
    bot_owner_id = context.bot_data.get('BOT_OWNER_ID')

    is_real_owner = user.id in (owner_id, bot_owner_id)
    if not is_real_owner:
        if not await is_group_admin(context, chat.id, user.id):
            return
        if not allow_admins:
            lang = await db.get_user_lang_from_db(context, user.id)
            if message:
                await message.reply_text(get_text("owner_only_command_text", lang), parse_mode=ParseMode.HTML)
            return

    await func(update, context)
def bot_owner_only(func):
    """Decorator: run the handler only for the configured bot owner.

    The sender's id is compared with ``context.bot_data['BOT_OWNER_ID']``.
    For anyone else, or for updates that carry no user, the wrapped handler
    is skipped and ``None`` is returned.
    """
    from functools import wraps  # local import: only lru_cache is imported at file level

    @wraps(func)  # keep the handler's __name__/__doc__ for logging and introspection
    async def wrapper(update, context, *args, **kwargs):
        user = update.effective_user
        if user is None or user.id != context.bot_data.get('BOT_OWNER_ID'):
            return
        return await func(update, context, *args, **kwargs)

    return wrapper
# --- KEYBOARDS ---
def get_initial_keyboard(lang="en"):
    """Build the start keyboard: a single button that opens the admin panel."""
    panel_button = InlineKeyboardButton(
        get_text("admin_panel_button", lang),
        callback_data='show_admin_panel',
    )
    return InlineKeyboardMarkup([[panel_button]])
def get_admin_commands_keyboard(lang="en", category="main"):
    """Build the admin-panel keyboard for one category.

    ``"main"`` yields the category menu plus a back-to-start button. Any
    other category lists its commands two per row, followed by a button
    returning to the admin panel. An unknown category yields only that
    return button.
    """
    if category == "main":
        menu_rows = [
            [
                InlineKeyboardButton(get_text("btn_security", lang), callback_data="panel_security"),
                InlineKeyboardButton(get_text("btn_management", lang), callback_data="panel_management"),
            ],
            [
                InlineKeyboardButton(get_text("btn_advanced", lang), callback_data="panel_advanced"),
                InlineKeyboardButton(get_text("btn_tools", lang), callback_data="panel_tools"),
            ],
            [InlineKeyboardButton(get_text("back_button", lang), callback_data="back_to_start")],
        ]
        return InlineKeyboardMarkup(menu_rows)

    commands_by_category = {
        "security": ["autowarn", "blockforwards", "antiduplicate", "spamscan", "ocrscan", "linktomute", "blockbiolink", "traffic"],
        "management": ["addword", "delword", "wordlist", "maxlength", "autodelete", "manageadmins", "allowusernames", "resetsettings"],
        "advanced": ["filter", "smartnotes", "welcomecard", "setwelcome", "sentiment", "ticket", "trustscore"],
        "tools": ["shadow", "whois", "clean_ghosts", "grant", "whisper"],
    }
    command_names = commands_by_category.get(category, [])

    rows = []
    for start in range(0, len(command_names), 2):
        pair = command_names[start:start + 2]
        row = []
        for name in pair:
            row.append(
                InlineKeyboardButton(
                    get_text(f"{name}_button_label", lang),
                    callback_data=f'show_cmd_example_{name}',
                )
            )
        rows.append(row)

    rows.append([InlineKeyboardButton(get_text("btn_back_menu", lang), callback_data='show_admin_panel')])
    return InlineKeyboardMarkup(rows)
# --- TRAFFIC CONTROL ---
async def check_traffic_control(context, chat_id):
    """Temporarily lock the chat when messages arrive too fast.

    Each call counts one message in a 5-second bucket kept in Redis. When a
    bucket reaches 5, the chat's permissions are backed up, sending is
    disabled for 15 seconds, and the permissions are then restored. A Redis
    flag (``is_locked:<chat>``) prevents overlapping lock cycles.

    Does nothing when no Redis client is available for the chat.
    """
    redis = get_redis_client_for_chat(context, chat_id)
    if not redis:
        return

    lock_flag = f"is_locked:{chat_id}"
    if await redis.get(lock_flag):
        return

    window_key = f"traffic_count:{chat_id}:{int(time.time() // 5)}"
    count = await redis.incr(window_key)
    if count == 1:
        await redis.expire(window_key, 10)
    if count < 5:
        return

    logger.info(f"🚨 Traffic Alert triggered for {chat_id}")
    await redis.set(lock_flag, "1", ex=40)
    backup_key = f"backup_perms:{chat_id}"

    try:
        chat_obj = await context.bot.get_chat(chat_id)
        # permissions can be absent; then only "can send" is restored.
        perms_dict = chat_obj.permissions.to_dict() if chat_obj.permissions else {}
        perms_dict['can_send_messages'] = True
        await redis.set(backup_key, json.dumps(perms_dict).decode('utf-8'), ex=120)

        await context.bot.send_message(
            chat_id,
            "🚦 <b>Traffic Jam!</b>\nGroup locked for 15s to prevent spam.",
            parse_mode=ParseMode.HTML
        )
        await context.bot.set_chat_permissions(chat_id, ChatPermissions(can_send_messages=False))
        # NOTE(review): cancellation during this sleep skips the restore below;
        # confirm how shutdown is expected to unlock the chat.
        await asyncio.sleep(15)

        backup_data = await redis.get(backup_key)
        # Fall back to the in-memory copy so the chat is always unlocked,
        # even if the Redis backup is missing.
        restored = json.loads(backup_data) if backup_data else dict(perms_dict)
        restored.pop('can_send_media_messages', None)
        await context.bot.set_chat_permissions(chat_id, ChatPermissions(**restored))
        await context.bot.send_message(chat_id, "✅ <b>Traffic Normal.</b> Settings restored.", parse_mode=ParseMode.HTML)
    except Exception as e:
        logger.error("Traffic Control Error: %s", e)
        # Last resort: at least re-enable sending.
        try:
            await context.bot.set_chat_permissions(chat_id, ChatPermissions(can_send_messages=True))
        except Exception:
            logger.exception("Could not unlock chat %s after traffic control failure", chat_id)
    finally:
        await redis.delete(lock_flag)
# --- REGEX CACHE ---
@lru_cache(maxsize=128)
def _compile_forbidden_regex(words_tuple):
    """Compile (and cache) a case-insensitive matcher for the given words.

    Each word is escaped and must be delimited by a non-word character or
    the start/end of the text; the matched word is capture group 1. Longer
    words are tried first so that phrases win over their sub-words.

    Entries that are not strings, or that are empty or whitespace-only, are
    ignored: an empty alternative would match almost any text.

    Returns:
        A compiled pattern, or ``None`` when no usable word remains.
    """
    usable_words = [w for w in words_tuple if isinstance(w, str) and w.strip()]
    if not usable_words:
        return None

    usable_words.sort(key=len, reverse=True)
    alternation = '|'.join(re.escape(word) for word in usable_words)
    pattern = r'(?:^|\W)(' + alternation + r')(?:$|\W)'
    try:
        return re.compile(pattern, re.IGNORECASE | re.UNICODE)
    except re.error:
        return None


def get_forbidden_words_regex(words: list):
    """Return the compiled forbidden-word pattern for ``words``.

    Returns ``None`` for an empty/``None`` list or when no usable word is
    left after filtering. Compilation is cached per distinct word sequence.
    """
    if not words:
        return None
    return _compile_forbidden_regex(tuple(words))
async def get_settings_from_cache_and_update_regex(context, chat_id):
    """Load the chat settings and attach the forbidden-word pattern.

    The compiled pattern (or ``None``) is stored on the returned settings
    under the ``'fw_regex'`` key.
    """
    chat_settings = await db.get_all_settings(context, chat_id)
    word_list = chat_settings.get('forbidden_words', [])
    chat_settings['fw_regex'] = get_forbidden_words_regex(word_list)
    return chat_settings
# --- DELETION REPORT ---
async def send_deletion_report(context, user, chat, reason_key, content):
    """Post a "message deleted" report to the configured report channel.

    Args:
        context: Callback context (bot and bot_data).
        user: Author of the deleted message.
        chat: Chat the message was deleted from.
        reason_key: Translation key of the deletion reason.
        content: Text of the deleted message; the first 500 characters are
            quoted. ``None`` is treated as empty.

    Does nothing when ``bot_data['REPORT_CHANNEL_ID']`` is not set. Failures
    are logged and never raised to the caller.
    """
    report_channel_id = context.bot_data.get('REPORT_CHANNEL_ID')
    if not report_channel_id:
        return

    try:
        owner_id = await get_group_owner_id(context, chat.id)
        owner_lang = await db.get_user_lang_from_db(context, owner_id or user.id)
        reason_text = get_text(reason_key, lang=owner_lang)

        safe_name = html.escape(user.first_name or "")
        user_mention = f"<a href='tg://user?id={user.id}'>{safe_name}</a>"

        # A missing title must not abort the report.
        safe_title = html.escape(chat.title or str(chat.id))
        try:
            invite_link = await chat.export_invite_link() if not chat.username else f"https://t.me/{chat.username}"
            group_link = f'<a href="{invite_link}">{safe_title}</a>'
        except Exception:
            # No invite link available: show the plain title instead.
            group_link = safe_title

        # Truncate before escaping so an HTML entity is never cut in half.
        snippet = html.escape((content or "")[:500])

        report_message = (
            f"🗑️ <b>Message Deleted</b>\n\n"
            f"🏢 <b>Group:</b> {group_link} (<code>{chat.id}</code>)\n"
            f"👤 <b>User:</b> {user_mention} (<code>{user.id}</code>)\n"
            f"📝 <b>Reason:</b> {html.escape(reason_text)}\n"
            f"📜 <b>Content:</b> <blockquote>{snippet}</blockquote>"
        )
        await context.bot.send_message(chat_id=report_channel_id, text=report_message, parse_mode=ParseMode.HTML, disable_web_page_preview=True)
    except Exception as e:
        logger.error("Failed to send deletion report: %s", e)
# --- MISC UTILS ---
async def is_channel_or_group_username(context, username) -> bool:
    """Return whether ``@username`` belongs to a channel, supergroup or group.

    The classification is cached in Redis for 24 hours under
    ``uname_type:<lowercased name>`` as ``"channel"`` or ``"user"``. A name
    that cannot be resolved through the Bot API is classified as ``"user"``.

    Cache problems never affect the result: reads fall through to the API
    and failed writes are ignored.
    """
    username_clean = username.replace('@', '').strip()
    if not username_clean:
        return False

    redis_client = get_redis_client_for_user(context)
    cache_key = f"uname_type:{username_clean.lower()}"

    if redis_client:
        try:
            cached_type = await redis_client.get(cache_key)
            if cached_type:
                # Accept str or bytes, whichever the client returns.
                return cached_type in ("channel", b"channel")
        except Exception:
            logger.debug("Username cache read failed for %s", cache_key, exc_info=True)

    try:
        chat_obj = await context.bot.get_chat(f"@{username_clean}")
        is_channel = chat_obj.type in ('channel', 'supergroup', 'group')
    except Exception:
        # Unresolvable names are treated as ordinary users.
        is_channel = False

    # The write is kept outside the lookup's try block so that a cache
    # failure cannot turn a successful lookup into an error.
    if redis_client:
        try:
            await redis_client.set(cache_key, "channel" if is_channel else "user", ex=86400)
        except Exception:
            logger.debug("Username cache write failed for %s", cache_key, exc_info=True)

    return is_channel
def analyze_sentiment(text: str) -> str:
    """Classify text as ``'positive'`` or ``'negative'`` by keyword lookup.

    The text is lower-cased and searched for known keywords (English,
    Bengali, Hindi) as plain substrings. Positive keywords take precedence
    over negative ones. Returns ``None`` for empty input or when no keyword
    is found.
    """
    if not text:
        return None

    lowered = text.lower()

    keyword_groups = (
        ('positive', ['good', 'great', 'love', 'best', 'thanks', 'thank you', 'amazing', 'cool', 'ধন্যবাদ', 'ভালো', 'সেরা', 'সুন্দর', 'ওসাম', 'লভব', 'জোশ', 'धन्यवाद', 'अच्छा', 'बढ़िया', 'प्रेम', 'सुंदर']),
        ('negative', ['bad', 'worst', 'scam', 'fake', 'hate', 'ugly', 'stupid', 'useless', 'বাজে', 'খারাপ', 'ভুয়া', 'স্ক্যাম', 'ফালতু', 'ঘৃণা', 'बकवास', 'खराब', 'झूठ', 'घटिया']),
    )

    # Groups are checked in order, so a positive hit wins over a negative one.
    for label, keywords in keyword_groups:
        for keyword in keywords:
            if keyword in lowered:
                return label
    return None
# --- TOGGLE & NUMERIC WRAPPERS ---
async def toggle_setting_wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, key: str, db_col: str, owner_only: bool = False):
    """Handle an on/off setting command behind the admin check.

    With an ``on``/``off`` argument the setting is set explicitly; otherwise
    the stored value of ``db_col`` is flipped. The new value is saved, the
    settings are reloaded, and a confirmation naming the feature is sent.
    When ``owner_only`` is true, only the group owner or the bot owner may
    change the setting.
    """
    async def logic(u, c):
        chat_id = u.effective_chat.id
        sender_id = u.effective_user.id
        lang = await db.get_user_lang_from_db(c, sender_id)

        # Extra protection for owner-only settings.
        if owner_only:
            owner_id = await get_group_owner_id(c, chat_id)
            bot_owner = c.bot_data.get('BOT_OWNER_ID')
            allowed = sender_id == owner_id or sender_id == bot_owner
            if not allowed:
                await u.message.reply_html(get_text("owner_only_command_text", lang))
                return

        settings = await db.get_all_settings(c, chat_id)
        toggled = not settings.get(db_col, False)

        requested = c.args[0].lower() if c.args else None
        if requested == 'on':
            new_status = True
        elif requested == 'off':
            new_status = False
        else:
            new_status = toggled

        await db.update_setting_in_db(c, chat_id, db_col, new_status)
        await get_settings_from_cache_and_update_regex(c, chat_id)

        if new_status:
            status_text = "feature_enabled"
        else:
            status_text = "feature_disabled"

        # Strip the decorative emoji prefixes from the button label.
        feature_name = get_text(f"{key}_button_label", lang)
        for prefix in ("🚫 ", "✅ ", "🛠️ ", "🚦 "):
            feature_name = feature_name.replace(prefix, "")

        await u.message.reply_html(get_text(status_text, lang).format(feature=feature_name))

    await check_admin(update, context, logic)
async def numeric_setting_wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, key: str, db_col: str):
    """Handle a numeric setting command behind the admin check.

    Without an argument the command's usage text is shown. The first
    argument must be a non-negative integer; it is stored in ``db_col``.
    Zero sends the ``<key>_disabled`` text, any other value the
    ``<key>_set`` text.
    """
    async def logic(u, c):
        lang = await db.get_user_lang_from_db(c, u.effective_user.id)
        if not c.args:
            return await u.message.reply_html(get_text(f"{key}_example_full_text", lang))

        # Only a parse failure is expected here, so catch exactly that.
        try:
            val = int(c.args[0])
        except ValueError:
            val = -1
        if val < 0:
            return await u.message.reply_text("⚠️ Please enter a valid positive number.")

        await db.update_setting_in_db(c, u.effective_chat.id, db_col, val)

        if val == 0:
            await u.message.reply_html(get_text(f"{key}_disabled", lang))
        else:
            txt = get_text(f"{key}_set", lang)
            # Only these two templates carry a placeholder for the value.
            if key == 'maxlength':
                txt = txt.format(length=val)
            elif key == 'autodelete':
                txt = txt.format(seconds=val)
            await u.message.reply_html(txt)

    await check_admin(update, context, logic)