# tgfs / WebStreamer / utils / bot_utils.py
# S VIVEGANANDAN
# Push latest changes from FileStreamBot-1
# b0bc1a0
# This file is a part of TG-FileStreamBot
from __future__ import annotations
from pyrogram.errors import UserNotParticipant
from pyrogram.enums.parse_mode import ParseMode
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, Message
from pyrogram.file_id import FileId, FileType, PHOTO_TYPES
from WebStreamer.utils.Translation import Language
from WebStreamer.utils.database import Database
from WebStreamer.utils.file_properties import get_media_file_size, get_name
from WebStreamer.utils.human_readable import humanbytes
from WebStreamer.vars import Var
# Shared database handle used by the user-validation helpers in this module.
db = Database(Var.DATABASE_URL, Var.SESSION_NAME)
async def is_user_joined(message: Message, lang) -> bool:
    """Check that the chat behind *message* may use the bot per the updates channel.

    Looks up ``message.chat.id`` in ``Var.UPDATES_CHANNEL``. A reply is sent to
    the user whenever the check fails.

    Args:
        message: Incoming message whose chat is checked for membership.
        lang: Translation object providing ``BAN_TEXT``.

    Returns:
        ``True`` when the membership lookup succeeds and the member is not
        banned; ``False`` when the member is banned, is not a participant,
        or the lookup fails for any other reason.
    """
    # Local import: pyrogram is already a dependency of this module, and this
    # keeps the fix self-contained.
    from pyrogram.enums import ChatMemberStatus

    try:
        user = await message._client.get_chat_member(Var.UPDATES_CHANNEL, message.chat.id)
        # Pyrogram v2 exposes the status as a ChatMemberStatus enum member;
        # comparing against the plain string "BANNED" never matched.
        if user.status == ChatMemberStatus.BANNED:
            await message.reply_text(
                text=lang.BAN_TEXT.format(Var.OWNER_ID),
                parse_mode=ParseMode.MARKDOWN,
                disable_web_page_preview=True
            )
            return False
    except UserNotParticipant:
        # Not a member yet: offer a join button instead of an error.
        await message.reply_text(
            text="<i>Jᴏɪɴ ᴍʏ ᴜᴘᴅᴀᴛᴇ ᴄʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴍᴇ 🔐</i>",
            reply_markup=InlineKeyboardMarkup(
                [[
                    InlineKeyboardButton("Jᴏɪɴ ɴᴏᴡ 🔓", url=f"https://t.me/{Var.UPDATES_CHANNEL}")
                ]]
            ),
            parse_mode=ParseMode.HTML
        )
        return False
    except Exception as exc:
        # Best-effort: still answer the user, but leave a trace of the cause
        # instead of discarding it silently.
        print(f"User validation failed: updates channel lookup error: {exc!r}")
        await message.reply_text(
            text=f"<i>Sᴏᴍᴇᴛʜɪɴɢ ᴡʀᴏɴɢ ᴄᴏɴᴛᴀᴄᴛ ᴍʏ ᴅᴇᴠᴇʟᴏᴘᴇʀ</i> <b><a href='https://t.me/{Var.UPDATES_CHANNEL}'>[ ᴄʟɪᴄᴋ ʜᴇʀᴇ ]</a></b>",
            parse_mode=ParseMode.HTML,
            disable_web_page_preview=True)
        return False
    return True
# Build the reply keyboard and caption text for a stored file
async def gen_link(m: Message, _id, name: list) -> tuple[InlineKeyboardMarkup, str]:
    """Return the inline keyboard and message text advertising a file's links.

    Args:
        m: Message carrying the media; used for language, file name and size.
        _id: Identifier appended to the ``watch/`` and ``dl/`` URLs.
        name: Sequence whose first two items are passed to the text template.

    Returns:
        A ``(reply_markup, stream_text)`` pair.
    """
    translations = Language(m)
    display_name = get_name(m)
    readable_size = humanbytes(get_media_file_size(m))

    watch_url = f"{Var.URL}watch/{_id}"
    download_url = f"{Var.URL}dl/{_id}"

    caption = translations.STREAM_MSG_TEXT.format(
        display_name,
        readable_size,
        download_url,
        watch_url,
        name[0],
        name[1],
    )

    # Single row: watch page on the left, direct download on the right.
    watch_button = InlineKeyboardButton("🖥STREAM", url=watch_url)
    download_button = InlineKeyboardButton("Dᴏᴡɴʟᴏᴀᴅ 📥", url=download_url)
    keyboard = InlineKeyboardMarkup([[watch_button, download_button]])

    return keyboard, caption
async def is_user_banned(message, lang) -> bool:
    """Tell whether the sender is banned, notifying them if so.

    Args:
        message: Incoming message; ``from_user.id`` is looked up in the database.
        lang: Translation object providing ``BAN_TEXT``.

    Returns:
        ``True`` if the database reports the sender as banned (a ban notice
        is sent as a reply), otherwise ``False``.
    """
    banned = await db.is_user_banned(message.from_user.id)
    if not banned:
        return False

    ban_notice = lang.BAN_TEXT.format(Var.OWNER_ID)
    await message.reply_text(
        text=ban_notice,
        parse_mode=ParseMode.MARKDOWN,
        disable_web_page_preview=True,
    )
    return True
async def is_user_exist(message: Message):
    """Register the sender on first contact and announce them in the bin channel.

    Does nothing when the database already returns a record for the sender.
    """
    sender_id = message.from_user.id
    if await db.get_user(sender_id):
        # Already known: nothing to add or announce.
        return

    await db.add_user(sender_id)
    announcement = f"**Nᴇᴡ Usᴇʀ Jᴏɪɴᴇᴅ:** \n\n__Mʏ Nᴇᴡ Fʀɪᴇɴᴅ__ [{message.from_user.first_name}](tg://user?id={message.from_user.id}) __Sᴛᴀʀᴛᴇᴅ Yᴏᴜʀ Bᴏᴛ !!__"
    await message._client.send_message(Var.BIN_CHANNEL, announcement)
async def is_user_accepted_tos(message: Message) -> bool:
    """Check whether the sender has accepted the Terms of Service.

    When acceptance is not recorded, the user is greeted and sent ``Var.TOS``
    together with an accept button.

    Args:
        message: Incoming message; ``from_user.id`` is looked up in the database.

    Returns:
        ``True`` if the stored record has a truthy ``agreed_to_tos`` entry,
        otherwise ``False``.
    """
    user = await db.get_user(message.from_user.id)

    # A missing record (falsy lookup result) counts as "not accepted" rather
    # than raising TypeError on the membership test.
    if user and "agreed_to_tos" in user and user["agreed_to_tos"]:
        return True

    await message.reply(f"Hi {message.from_user.mention},\nplease read and accept the Terms of Service to continue using the bot")
    await message.reply_text(
        Var.TOS,
        reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("I accept the TOS", callback_data=f"accepttos_{message.from_user.id}")]])
    )
    return False
async def is_allowed(message: Message):
    """Return whether the sender passes the ``Var.ALLOWED_USERS`` restriction.

    An empty/falsy ``Var.ALLOWED_USERS`` means no restriction. Otherwise the
    sender's id (as a string) or username must be listed; if neither is, a
    rejection reply is sent and ``False`` is returned.
    """
    allow_list = Var.ALLOWED_USERS
    if not allow_list:
        return True

    sender = message.from_user
    if str(sender.id) in allow_list or sender.username in allow_list:
        return True

    await message.reply("You are not in the allowed list of users who can use me.", quote=True)
    return False
async def validate_user(message: Message, lang=None) -> bool:
    """Run every access check for the sender of *message*, in order.

    Checks, in sequence: allow-list, registration (side effect only), Terms
    of Service (when ``Var.TOS`` is set), ban status, and updates-channel
    membership (when ``Var.FORCE_UPDATES_CHANNEL`` is set). The first failing
    check prints a diagnostic and stops the chain.

    Args:
        message: Incoming message to validate.
        lang: Optional translation object; built from *message* when falsy.

    Returns:
        ``True`` if all applicable checks pass, otherwise ``False``.
    """
    permitted = await is_allowed(message)
    if not permitted:
        print("User validation failed: Not in allowed users")
        return False

    await is_user_exist(message)

    if Var.TOS and not await is_user_accepted_tos(message):
        print("User validation failed: TOS not accepted")
        return False

    # The translation object is only needed from here on.
    lang = lang or Language(message)

    if await is_user_banned(message, lang):
        print("User validation failed: User banned")
        return False

    if Var.FORCE_UPDATES_CHANNEL and not await is_user_joined(message, lang):
        print("User validation failed: User not in update channel")
        return False

    return True
def file_format(file_id: str | FileId) -> str:
    """Map a file id to a human-readable media category label.

    Args:
        file_id: Either an encoded file-id string (decoded via
            ``FileId.decode``) or an already decoded ``FileId``.

    Returns:
        One of ``"Photo"``, ``"Voice"``, ``"Video"``, ``"Document"``,
        ``"Sticker"``, ``"Audio"`` or ``"Unknown"``.
    """
    decoded = FileId.decode(file_id) if isinstance(file_id, str) else file_id
    kind = decoded.file_type

    # Photo types are checked first, ahead of the per-type table below.
    if kind in PHOTO_TYPES:
        return "Photo"

    label_table = (
        ("Voice", (FileType.VOICE,)),
        ("Video", (FileType.VIDEO, FileType.ANIMATION, FileType.VIDEO_NOTE)),
        ("Document", (FileType.DOCUMENT,)),
        ("Sticker", (FileType.STICKER,)),
        ("Audio", (FileType.AUDIO,)),
    )
    for label, members in label_table:
        if kind in members:
            return label
    return "Unknown"