| | import math |
| | import logging |
| | import asyncio |
| | import traceback |
| | from pyrogram import filters, Client |
| | from pyrogram.enums.parse_mode import ParseMode |
| | from pyrogram.types import BotCommand, Message, ReplyKeyboardMarkup, InlineKeyboardMarkup, InlineKeyboardButton |
| |
|
| | from FileStream import __version__ |
| | from FileStream.bot import FileStream |
| | from FileStream.Exceptions import FIleNotFound |
| | from FileStream.utils.FileProcessors.bot_utils import gen_linkx, verify_user, verify_users |
| | from FileStream.config import Telegram |
| | from FileStream.Database import Database |
| | from FileStream.utils.FileProcessors.translation import LANG, BUTTON |
| |
|
| | db = Database(Telegram.DATABASE_URL, Telegram.SESSION_NAME) |
| |
|
| |
|
| | async def menu(bot, message): |
| | await bot.send_message( |
| | chat_id=message.from_user.id, |
| | text="ꜱᴇʟᴇᴄᴛ ꜰʀᴏᴍ ᴍᴇɴᴜ", |
| | reply_markup=ReplyKeyboardMarkup( |
| | [[("ꜰɪʟᴇ ʙᴀɴᴋ"),("ꜱᴇᴀʀᴄʜ")], [("ᴍʏ ᴘʀɪᴠᴀᴛᴇ ꜱᴘᴀᴄᴇ"), ("ᴍʏ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ")]], |
| | one_time_keyboard=False, |
| | resize_keyboard=True, |
| | placeholder="🄼🄰🄸🄽 🄼🄴🄽🅄 👉")) |
| |
|
| |
|
| | @FileStream.on_message(filters.command('start') & filters.private) |
| | @verify_users |
| | async def start(bot: Client, message: Message, response): |
| | if response['status'] == "AUTHORIZED": |
| | usr_cmd = message.text.split("_")[-1] |
| | if usr_cmd == "/start": |
| | await bot.set_bot_commands([ |
| | BotCommand("start", "ꜱᴛᴀʀᴛ ᴛʜᴇ ʙᴏᴛ"), |
| | BotCommand("help", "ʙᴏᴛ ꜱᴇᴛᴛɪɴɢꜱ"), |
| | BotCommand("about", "ᴀʙᴏᴜᴛ ᴛʜᴇ ʙᴏᴛ"), |
| | BotCommand("search", "ꜱᴇᴀʀᴄʜ ɪɴ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ"), |
| | BotCommand("filebank", "ꜰɪʟᴇ ʙᴀɴᴋ"), |
| | BotCommand("files", "ᴍʏ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ"), |
| | BotCommand("myfiles", "ᴍʏ ᴘʀɪᴠᴀᴛᴇ ꜱᴘᴀᴄᴇ"), |
| | ]) |
| | await menu(bot, message) |
| | if Telegram.START_PIC: |
| | await message.reply_photo(photo=Telegram.START_PIC, |
| | caption=LANG.START_TEXT.format( |
| | message.from_user.mention, |
| | FileStream.username), |
| | parse_mode=ParseMode.HTML, |
| | reply_markup=BUTTON.START_BUTTONS) |
| | else: |
| | await message.reply_text(text=LANG.START_TEXT.format( |
| | message.from_user.mention, FileStream.username), |
| | parse_mode=ParseMode.HTML, |
| | disable_web_page_preview=True, |
| | reply_markup=BUTTON.START_BUTTONS) |
| | else: |
| | if "stream_" in message.text: |
| | try: |
| | file_check = await db.get_file(usr_cmd) |
| | file_id = str(file_check['_id']) |
| | if file_id == usr_cmd: |
| | reply_markup, stream_text = await gen_linkx( |
| | m=message, |
| | _id=file_id, |
| | name=[FileStream.username, FileStream.fname]) |
| | await message.reply_text(text=stream_text, |
| | parse_mode=ParseMode.HTML, |
| | disable_web_page_preview=True, |
| | reply_markup=reply_markup, |
| | quote=True) |
| |
|
| | except FIleNotFound as e: |
| | await message.reply_text("File Not Found") |
| | except Exception as e: |
| | await message.reply_text("Something Went Wrong") |
| | logging.error(e) |
| |
|
| | elif "file_" in message.text: |
| | try: |
| | file_check = await db.get_file(usr_cmd) |
| | db_id = str(file_check['_id']) |
| | file_id = file_check['file']['file_id'] |
| | file_name = file_check['file']['file_name'] |
| | if db_id == usr_cmd: |
| | filex = await message.reply_cached_media( |
| | file_id=file_id, caption=f'**{file_name}**') |
| | await asyncio.sleep(3600) |
| | try: |
| | await filex.delete() |
| | await message.delete() |
| | except Exception: |
| | pass |
| |
|
| | except FIleNotFound as e: |
| | await message.reply_text("**File Not Found**") |
| | except Exception as e: |
| | await message.reply_text("Something Went Wrong") |
| | logging.error(e) |
| |
|
| | else: |
| | await message.reply_text(f"**Invalid Command**") |
| |
|
| |
|
| | @FileStream.on_message(filters.private & filters.command(["about"])) |
| | async def about_handler(bot, message): |
| | if Telegram.START_PIC: |
| | await message.reply_photo(photo=Telegram.START_PIC, |
| | caption=LANG.ABOUT_TEXT.format( |
| | FileStream.fname, __version__), |
| | parse_mode=ParseMode.HTML, |
| | reply_markup=BUTTON.ABOUT_BUTTONS) |
| | else: |
| | await message.reply_text(text=LANG.ABOUT_TEXT.format( |
| | FileStream.fname, __version__), |
| | disable_web_page_preview=True, |
| | reply_markup=BUTTON.ABOUT_BUTTONS) |
| |
|
| |
|
| | @FileStream.on_message((filters.command('help')) & filters.private) |
| | async def help_handler(bot, message): |
| | if Telegram.START_PIC: |
| | await message.reply_photo(photo=Telegram.START_PIC, |
| | caption=LANG.HELP_TEXT.format(Telegram.OWNER_ID), |
| | parse_mode=ParseMode.HTML, |
| | reply_markup=BUTTON.HELP_BUTTONS) |
| | else: |
| | await message.reply_text(text=LANG.HELP_TEXT.format(Telegram.OWNER_ID), |
| | parse_mode=ParseMode.HTML, |
| | disable_web_page_preview=True, |
| | reply_markup=BUTTON.HELP_BUTTONS) |
| |
|