File size: 5,593 Bytes
b0bc1a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# This file is a part of TG-FileStreamBot

from __future__ import annotations
from pyrogram.errors import UserNotParticipant
from pyrogram.enums.parse_mode import ParseMode
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, Message
from pyrogram.file_id import FileId, FileType, PHOTO_TYPES
from WebStreamer.utils.Translation import Language
from WebStreamer.utils.database import Database
from WebStreamer.utils.file_properties import get_media_file_size, get_name
from WebStreamer.utils.human_readable import humanbytes
from WebStreamer.vars import Var

db = Database(Var.DATABASE_URL, Var.SESSION_NAME)

async def is_user_joined(message:Message,lang) -> bool:
    try:
        user = await message._client.get_chat_member(Var.UPDATES_CHANNEL, message.chat.id)
        if user.status == "BANNED":
            await message.reply_text(
                text=lang.BAN_TEXT.format(Var.OWNER_ID),
                parse_mode=ParseMode.MARKDOWN,
                disable_web_page_preview=True
            )
            return False
    except UserNotParticipant:
        await message.reply_text(
            text="<i>Jᴏɪɴ ᴍʏ ᴜᴘᴅᴀᴛᴇ ᴄʜᴀɴɴᴇʟ ᴛᴏ ᴜsᴇ ᴍᴇ 🔐</i>",
            reply_markup=InlineKeyboardMarkup(
                [[
                    InlineKeyboardButton("Jᴏɪɴ ɴᴏᴡ 🔓", url=f"https://t.me/{Var.UPDATES_CHANNEL}")
                    ]]
            ),
            parse_mode=ParseMode.HTML
        )
        return False
    except Exception:
        await message.reply_text(
            text=f"<i>Sᴏᴍᴇᴛʜɪɴɢ ᴡʀᴏɴɢ ᴄᴏɴᴛᴀᴄᴛ ᴍʏ ᴅᴇᴠᴇʟᴏᴘᴇʀ</i> <b><a href='https://t.me/{Var.UPDATES_CHANNEL}'>[ ᴄʟɪᴄᴋ ʜᴇʀᴇ ]</a></b>",
            parse_mode=ParseMode.HTML,
            disable_web_page_preview=True)
        return False
    return True

# Generate Text, Stream Link, reply_markup
async def gen_link(m: Message, _id, name: list) -> tuple[InlineKeyboardMarkup, str]:
    """Generate Text for Stream Link, Reply Text and reply_markup"""
    lang = Language(m)
    file_name = get_name(m)
    file_size = humanbytes(get_media_file_size(m))
    page_link = f"{Var.URL}watch/{_id}"

    stream_link = f"{Var.URL}dl/{_id}"
    stream_text=lang.STREAM_MSG_TEXT.format(file_name, file_size, stream_link, page_link, name[0], name[1])
    reply_markup=InlineKeyboardMarkup(
        [
            [InlineKeyboardButton("🖥STREAM", url=page_link), InlineKeyboardButton("Dᴏᴡɴʟᴏᴀᴅ 📥", url=stream_link)]
            ]
        )

    return reply_markup, stream_text

async def is_user_banned(message, lang) -> bool:
    if await db.is_user_banned(message.from_user.id):
        await message.reply_text(
            text=lang.BAN_TEXT.format(Var.OWNER_ID),
            parse_mode=ParseMode.MARKDOWN,
            disable_web_page_preview=True
        )
        return True
    return False

async def is_user_exist(message: Message):
    if not bool(await db.get_user(message.from_user.id)):
        await db.add_user(message.from_user.id)
        await message._client.send_message(
            Var.BIN_CHANNEL,
            f"**Nᴇᴡ Usᴇʀ Jᴏɪɴᴇᴅ:** \n\n__Mʏ Nᴇᴡ Fʀɪᴇɴᴅ__ [{message.from_user.first_name}](tg://user?id={message.from_user.id}) __Sᴛᴀʀᴛᴇᴅ Yᴏᴜʀ Bᴏᴛ !!__"
        )

async def is_user_accepted_tos(message: Message) -> bool:
    user=await db.get_user(message.from_user.id)
    if not ("agreed_to_tos" in user) or not user["agreed_to_tos"]:
        await message.reply(f"Hi {message.from_user.mention},\nplease read and accept the Terms of Service to continue using the bot")
        await message.reply_text(
            Var.TOS,
            reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("I accept the TOS", callback_data=f"accepttos_{message.from_user.id}")]])
            )
        return False
    return True

async def is_allowed(message: Message):
    if Var.ALLOWED_USERS and not ((str(message.from_user.id) in Var.ALLOWED_USERS) or (message.from_user.username in Var.ALLOWED_USERS)):
        await message.reply("You are not in the allowed list of users who can use me.", quote=True)
        return False
    return True

async def validate_user(message: Message, lang=None) -> bool:
    if not await is_allowed(message):
        print("User validation failed: Not in allowed users")
        return False
    await is_user_exist(message)
    if Var.TOS:
        if not await is_user_accepted_tos(message):
            print("User validation failed: TOS not accepted")
            return False

    if not lang:
        lang = Language(message)
    if await is_user_banned(message, lang):
        print("User validation failed: User banned")
        return False
    if Var.FORCE_UPDATES_CHANNEL:
        if not await is_user_joined(message,lang):
            print("User validation failed: User not in update channel")
            return False
    return True

def file_format(file_id: str | FileId) -> str:
    if isinstance(file_id, str):
        file_id=FileId.decode(file_id)
    if file_id.file_type in PHOTO_TYPES:
        return "Photo"
    elif file_id.file_type == FileType.VOICE:
        return "Voice"
    elif file_id.file_type in (FileType.VIDEO, FileType.ANIMATION, FileType.VIDEO_NOTE):
        return "Video"
    elif file_id.file_type == FileType.DOCUMENT:
        return "Document"
    elif file_id.file_type == FileType.STICKER:
        return "Sticker"
    elif file_id.file_type == FileType.AUDIO:
        return "Audio"
    else:
        return "Unknown"