# This file is a part of TG-FileStreamBot
from __future__ import annotations

from pyrogram.enums import ChatMemberStatus
from pyrogram.enums.parse_mode import ParseMode
from pyrogram.errors import UserNotParticipant
from pyrogram.file_id import FileId, FileType, PHOTO_TYPES
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, Message

from WebStreamer.utils.Translation import Language
from WebStreamer.utils.database import Database
from WebStreamer.utils.file_properties import get_media_file_size, get_name
from WebStreamer.utils.human_readable import humanbytes
from WebStreamer.vars import Var
# Module-level Database instance built from the configured database URL and
# session name; the user helpers in this module query it for user records.
db = Database(Var.DATABASE_URL, Var.SESSION_NAME)
async def is_user_joined(message: Message, lang) -> bool:
    """Check that the sender may use the bot with respect to the updates channel.

    Looks up ``message.chat.id`` as a member of ``Var.UPDATES_CHANNEL`` and
    replies to ``message`` with an explanation whenever access is refused.

    Args:
        message: Incoming message. Its client performs the membership lookup
            and the message itself is replied to on refusal.
        lang: Translation object providing ``BAN_TEXT``.

    Returns:
        ``True`` when the lookup succeeds and the member is not banned;
        ``False`` when the member is banned, has not joined the channel, or
        the lookup (or the ban reply) failed for any other reason.
    """
    try:
        user = await message._client.get_chat_member(Var.UPDATES_CHANNEL, message.chat.id)
        # The member status is a ChatMemberStatus enum member, which never
        # compares equal to a plain string such as "BANNED".
        if user.status == ChatMemberStatus.BANNED:
            await message.reply_text(
                text=lang.BAN_TEXT.format(Var.OWNER_ID),
                parse_mode=ParseMode.MARKDOWN,
                disable_web_page_preview=True,
            )
            return False
    except UserNotParticipant:
        # Not a member yet: offer a button that links to the channel.
        await message.reply_text(
            text="<i>Jᴏɪɴ ᴍʏ ᴜᴘᴅᴀᴛᴇ ᴄʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴍᴇ 🔐</i>",
            reply_markup=InlineKeyboardMarkup(
                [[
                    InlineKeyboardButton("Jᴏɪɴ ɴᴏᴡ 🔓", url=f"https://t.me/{Var.UPDATES_CHANNEL}")
                ]]
            ),
            parse_mode=ParseMode.HTML,
        )
        return False
    except Exception as exc:
        # Keep the best-effort reply to the user, but record the cause so an
        # unexpected failure is not invisible to whoever operates the bot.
        print(f"Update channel membership check failed: {exc!r}")
        await message.reply_text(
            text=f"<i>Sᴏᴍᴇᴛʜɪɴɢ ᴡʀᴏɴɢ ᴄᴏɴᴛᴀᴄᴛ ᴍʏ ᴅᴇᴠᴇʟᴏᴘᴇʀ</i> <b><a href='https://t.me/{Var.UPDATES_CHANNEL}'>[ ᴄʟɪᴄᴋ ʜᴇʀᴇ ]</a></b>",
            parse_mode=ParseMode.HTML,
            disable_web_page_preview=True,
        )
        return False
    return True
# Generate Text, Stream Link, reply_markup
async def gen_link(m: Message, _id, name: list) -> tuple[InlineKeyboardMarkup, str]:
    """Build the watch/download links for a file and the reply presenting them.

    Args:
        m: Message carrying the media; used to pick the language and to read
            the file name and size.
        _id: Identifier appended to the ``watch/`` and ``dl/`` paths.
        name: Sequence whose first two items are passed to the message
            template as its last two format arguments.

    Returns:
        A ``(reply_markup, stream_text)`` pair: the inline keyboard with the
        two link buttons, and the formatted message text.
    """
    import random

    lang = Language(m)
    file_name = get_name(m)
    file_size = humanbytes(get_media_file_size(m))

    # Choose the pool of base URLs: the multi-URL list first, then the worker
    # URL, then the plain URL as the last resort.
    if Var.MULTI_URLS:
        candidates = Var.MULTI_URLS
    elif Var.WORKER_URL:
        candidates = [Var.WORKER_URL]
    else:
        candidates = [Var.URL]

    root = random.choice(candidates)
    if not root.endswith("/"):
        root = f"{root}/"

    page_link = f"{root}watch/{_id}"
    stream_link = f"{root}dl/{_id}"

    stream_text = lang.STREAM_MSG_TEXT.format(
        file_name, file_size, stream_link, page_link, name[0], name[1]
    )

    watch_button = InlineKeyboardButton("🖥STREAM", url=page_link)
    download_button = InlineKeyboardButton("Dᴏᴡɴʟᴏᴀᴅ 📥", url=stream_link)
    reply_markup = InlineKeyboardMarkup([[watch_button, download_button]])

    return reply_markup, stream_text
async def is_user_banned(message, lang) -> bool:
    """Report whether the sender is banned, notifying them if so.

    Args:
        message: Incoming message; ``message.from_user.id`` is looked up in
            the database and the message is replied to when banned.
        lang: Translation object providing ``BAN_TEXT``.

    Returns:
        ``True`` if the database reports the sender as banned (after the ban
        notice has been sent), otherwise ``False``.
    """
    banned = await db.is_user_banned(message.from_user.id)
    if not banned:
        return False

    ban_notice = lang.BAN_TEXT.format(Var.OWNER_ID)
    await message.reply_text(
        text=ban_notice,
        parse_mode=ParseMode.MARKDOWN,
        disable_web_page_preview=True,
    )
    return True
async def is_user_exist(message: Message):
    """Register the sender if the database has no record of them yet.

    When the lookup for ``message.from_user.id`` comes back falsy, the user is
    added to the database and an announcement is sent to ``Var.BIN_CHANNEL``.
    Nothing happens for users that already have a record.
    """
    sender = message.from_user
    if await db.get_user(sender.id):
        return

    await db.add_user(sender.id)
    announcement = f"**Nᴇᴡ Usᴇʀ Jᴏɪɴᴇᴅ:** \n\n__Mʏ Nᴇᴡ Fʀɪᴇɴᴅ__ [{sender.first_name}](tg://user?id={sender.id}) __Sᴛᴀʀᴛᴇᴅ Yᴏᴜʀ Bᴏᴛ !!__"
    await message._client.send_message(Var.BIN_CHANNEL, announcement)
async def is_user_accepted_tos(message: Message) -> bool:
    """Check whether the sender has accepted the Terms of Service.

    If they have not, the sender is greeted, shown ``Var.TOS`` and offered an
    inline button whose callback data is ``accepttos_<user id>``.

    Args:
        message: Incoming message; ``message.from_user.id`` selects the user
            record and the message is replied to when acceptance is missing.

    Returns:
        ``True`` if the stored record has a truthy ``agreed_to_tos`` entry,
        otherwise ``False`` (after the prompt has been sent).
    """
    user = await db.get_user(message.from_user.id)
    # A missing record (falsy lookup result) counts as "not accepted" instead
    # of failing on the membership test below.
    accepted = bool(user) and "agreed_to_tos" in user and bool(user["agreed_to_tos"])
    if accepted:
        return True

    await message.reply(f"Hi {message.from_user.mention},\nplease read and accept the Terms of Service to continue using the bot")
    await message.reply_text(
        Var.TOS,
        reply_markup=InlineKeyboardMarkup(
            [[InlineKeyboardButton("I accept the TOS", callback_data=f"accepttos_{message.from_user.id}")]]
        ),
    )
    return False
async def is_allowed(message: Message):
    """Check the sender against the optional allow-list.

    An empty (falsy) ``Var.ALLOWED_USERS`` allows everyone. Otherwise the
    sender passes when either their id, as a string, or their username is
    found in it; anyone else gets a refusal reply quoting their message.

    Returns:
        ``True`` if the sender may use the bot, ``False`` otherwise.
    """
    allowed = Var.ALLOWED_USERS
    if not allowed:
        return True

    sender = message.from_user
    if str(sender.id) in allowed or sender.username in allowed:
        return True

    await message.reply("You are not in the allowed list of users who can use me.", quote=True)
    return False
async def validate_user(message: Message, lang=None) -> bool:
    """Run every access check for the sender of ``message``.

    The checks run in this order and stop at the first failure, printing the
    reason: allow-list, registration of new users, Terms of Service (only
    when ``Var.TOS`` is set), ban status, and updates-channel membership
    (only when ``Var.FORCE_UPDATES_CHANNEL`` is set).

    Args:
        message: Incoming message to validate.
        lang: Optional translation object; one is created from ``message``
            when omitted or falsy.

    Returns:
        ``True`` if all applicable checks pass, otherwise ``False``.
    """
    allowed = await is_allowed(message)
    if not allowed:
        print("User validation failed: Not in allowed users")
        return False

    # Make sure the sender has a database record before the later lookups.
    await is_user_exist(message)

    if Var.TOS and not await is_user_accepted_tos(message):
        print("User validation failed: TOS not accepted")
        return False

    lang = lang or Language(message)

    if await is_user_banned(message, lang):
        print("User validation failed: User banned")
        return False

    if Var.FORCE_UPDATES_CHANNEL and not await is_user_joined(message, lang):
        print("User validation failed: User not in update channel")
        return False

    return True
def file_format(file_id: str | FileId) -> str:
    """Return a human-readable media category for a file id.

    Args:
        file_id: Either an encoded file id string, which is decoded first, or
            an already decoded ``FileId``.

    Returns:
        One of ``"Photo"``, ``"Voice"``, ``"Video"``, ``"Document"``,
        ``"Sticker"``, ``"Audio"`` or ``"Unknown"``.
    """
    decoded = FileId.decode(file_id) if isinstance(file_id, str) else file_id
    kind = decoded.file_type

    # Photos are identified by membership in a group of types, so they are
    # handled before the one-to-one lookup below.
    if kind in PHOTO_TYPES:
        return "Photo"

    labels = {
        FileType.VOICE: "Voice",
        FileType.VIDEO: "Video",
        FileType.ANIMATION: "Video",
        FileType.VIDEO_NOTE: "Video",
        FileType.DOCUMENT: "Document",
        FileType.STICKER: "Sticker",
        FileType.AUDIO: "Audio",
    }
    return labels.get(kind, "Unknown")
|