| | import math |
| | import asyncio |
| | import logging |
| | from pyrogram import filters, Client |
| | from pyrogram.types import BotCommand |
| | from pyrogram.enums.parse_mode import ParseMode |
| | from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, ReplyKeyboardMarkup, KeyboardButton, Message, InlineQueryResultArticle, InputTextMessageContent, InlineQueryResultPhoto |
| | from pyrogram.types import InlineQueryResultVideo, InlineQueryResultAudio, InlineQueryResultCachedDocument,WebAppInfo |
| | |
| | from FileStream import __version__ |
| | from FileStream.bot import FileStream |
| | from FileStream.config import Telegram |
| | from FileStream.Database import Database |
| | from FileStream.Exceptions import FileNotFound |
| | from FileStream.utils.FileProcessors.human_readable import humanbytes |
| | from FileStream.utils.FileProcessors.bot_utils import gen_linkx, verify_user, verify_users |
| | from FileStream.utils.FileProcessors.translation import LANG, BUTTON |
| |
|
| | |
| |
|
# Module-level database handle shared by every handler defined in this file.
db = Database(
    Telegram.DATABASE_URL,
    Telegram.SESSION_NAME,
)
| |
|
| |
|
| | |
@FileStream.on_message((filters.command('search') & filters.private) | (filters.regex("ꜱᴇᴀʀᴄʜ") & filters.private))
@verify_users
async def search_files(bot: Client, message: Message, response):
    """Reply with a button that starts an inline search in the current chat.

    Registered for private messages that are either the ``/search`` command
    or match the "ꜱᴇᴀʀᴄʜ" regex.

    Args:
        bot: The client that received the message (unused here).
        message: The incoming message; the reply is sent to it.
        response: Unused here; presumably supplied by ``verify_users`` --
            confirm against that decorator.
    """
    # An empty query string makes Telegram insert only the bot's username
    # into the input field, leaving the user free to type the search text.
    search_button = InlineKeyboardButton('Search', switch_inline_query_current_chat='')
    keyboard = InlineKeyboardMarkup([[search_button]])
    await message.reply(text="ꜱᴇᴀʀᴄʜ ɪɴ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ", reply_markup=keyboard)
| |
|
# Inline ``web_app`` buttons are only accepted by Telegram in private chats
# between a user and the bot, so the handler is limited to private messages
# (like every other command handler in this file) instead of failing with an
# API error when the command is used in a group or channel.
@FileStream.on_message(filters.command("webview") & filters.private)
@verify_users
async def send_webview_button(bot: Client, message: Message, response):
    """Reply to ``/webview`` with an inline button that opens a Web App.

    Args:
        bot: The client that received the message (unused here).
        message: The incoming command message; the reply is sent to it.
        response: Unused here; presumably supplied by ``verify_users`` --
            confirm against that decorator.
    """
    webapp_button = InlineKeyboardMarkup(
        [
            [InlineKeyboardButton("Open WebApp", web_app=WebAppInfo(url="https://youtube.com"))]
        ]
    )

    await message.reply_text(
        "Click the button below to open the WebApp:",
        reply_markup=webapp_button,
    )
| | |
@FileStream.on_message((filters.command('files') & filters.private) | (filters.regex("ᴍʏ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ") & filters.private))
@verify_users
async def my_files(bot: Client, message: Message, response):
    """Send the first page of the sender's files as an inline keyboard.

    Registered for private messages that are either the ``/files`` command
    or match the "ᴍʏ ᴘᴜʙʟɪᴄ ꜰɪʟᴇꜱ" regex.  The reply is a photo captioned with
    the total file count and carrying one button per listed file.

    Args:
        bot: The client that received the message (unused here).
        message: The incoming message; its sender's id selects the files.
        response: Unused here; presumably supplied by ``verify_users`` --
            confirm against that decorator.
    """
    # [1, 10] is presumably [page number, page size] -- confirm against
    # Database.find_files.
    cursor, total_files = await db.find_files(message.from_user.id, [1, 10])

    # One single-button row per file returned by the async cursor.
    keyboard_rows = [
        [
            InlineKeyboardButton(
                f"📦 {doc['file']['caption']}",
                callback_data=f"myfile_{doc['_id']}_{1}",
            )
        ]
        async for doc in cursor
    ]

    # Navigation row is only shown when there is more than one page of ten.
    if total_files > 10:
        page_count = math.ceil(total_files / 10)
        keyboard_rows.append([
            InlineKeyboardButton("◄", callback_data="N/A"),
            InlineKeyboardButton(f"1/{page_count}", callback_data="N/A"),
            InlineKeyboardButton("►", callback_data="userfiles_2"),
        ])

    # Placeholder row when nothing at all was added above.
    if not keyboard_rows:
        keyboard_rows.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])

    keyboard_rows.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])

    await message.reply_photo(
        photo=Telegram.FILE_PIC,
        caption=f"Total files: {total_files}",
        reply_markup=InlineKeyboardMarkup(keyboard_rows),
    )
| |
|
| |
|
| | |
@FileStream.on_message((filters.command('myfiles') & filters.private) | (filters.regex("ᴘʀɪᴠᴀᴛᴇ ꜱᴘᴀᴄᴇ") & filters.private))
@verify_users
async def my_privfiles(bot: Client, message: Message, response):
    """Send the first page of the sender's private files as an inline keyboard.

    Registered for private messages that are either the ``/myfiles`` command
    or match the "ᴘʀɪᴠᴀᴛᴇ ꜱᴘᴀᴄᴇ" regex.  The reply is a photo captioned with
    the total file count and carrying one button per listed file.

    Args:
        bot: The client that received the message (unused here).
        message: The incoming message; its sender's id selects the files.
        response: Unused here; presumably supplied by ``verify_users`` --
            confirm against that decorator.
    """
    # [1, 10] is presumably [page number, page size] -- confirm against
    # Database.find_private_files.
    cursor, total_files = await db.find_private_files(message.from_user.id, [1, 10])

    # One single-button row per file returned by the async cursor.
    keyboard_rows = [
        [
            InlineKeyboardButton(
                f" 📦 {doc['file']['caption']}",
                callback_data=f"myprivfile_{doc['_id']}_{1}",
            )
        ]
        async for doc in cursor
    ]

    # Navigation row is only shown when there is more than one page of ten.
    if total_files > 10:
        page_count = math.ceil(total_files / 10)
        # NOTE(review): the "next" button reuses "userfiles_2", the same
        # callback the public /files listing uses, while the per-file buttons
        # above use the "myprivfile_" prefix.  This may page into the public
        # list -- confirm against the callback-query handlers.
        keyboard_rows.append([
            InlineKeyboardButton("◄", callback_data="N/A"),
            InlineKeyboardButton(f"1/{page_count}", callback_data="N/A"),
            InlineKeyboardButton("►", callback_data="userfiles_2"),
        ])

    # Placeholder row when nothing at all was added above.
    if not keyboard_rows:
        keyboard_rows.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])

    keyboard_rows.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])

    await message.reply_photo(
        photo=Telegram.FILE_PIC,
        caption=f"Total files: {total_files}",
        reply_markup=InlineKeyboardMarkup(keyboard_rows),
    )
| |
|
| |
|
| | |
@FileStream.on_message((filters.command('filebank') & filters.private) | (filters.regex("ꜰɪʟᴇ ʙᴀɴᴋ") & filters.private))
@verify_users
async def my_filebank(bot: Client, message: Message, response):
    """Send the first page of the file bank as an inline keyboard.

    Registered for private messages that are either the ``/filebank`` command
    or match the "ꜰɪʟᴇ ʙᴀɴᴋ" regex.  Senders for whom ``db.is_admin`` is
    truthy are listed from ``db.find_all_files``; everyone else is listed
    from ``db.find_all_public_files``.

    Args:
        bot: The client that received the message (unused here).
        message: The incoming message; its sender's id is checked for admin.
        response: Unused here; presumably supplied by ``verify_users`` --
            confirm against that decorator.
    """
    sender_is_admin = await db.is_admin(message.from_user.id)
    finder = db.find_all_files if sender_is_admin else db.find_all_public_files
    # [1, 10] is presumably [page number, page size] -- confirm against the
    # Database finder methods.
    cursor, total_files = await finder([1, 10])

    # One single-button row per file returned by the async cursor.
    keyboard_rows = [
        [
            InlineKeyboardButton(
                f"📦 {doc['file']['caption']}",
                callback_data=f"allfile_{doc['_id']}_{1}",
            )
        ]
        async for doc in cursor
    ]

    # Navigation row is only shown when there is more than one page of ten.
    if total_files > 10:
        page_count = math.ceil(total_files / 10)
        keyboard_rows.append([
            InlineKeyboardButton("◄", callback_data="N/A"),
            InlineKeyboardButton(f"1/{page_count}", callback_data="N/A"),
            InlineKeyboardButton("►", callback_data="userallfiles_2"),
        ])

    # Placeholder row when nothing at all was added above.
    if not keyboard_rows:
        keyboard_rows.append([InlineKeyboardButton("ᴇᴍᴘᴛʏ", callback_data="N/A")])

    keyboard_rows.append([InlineKeyboardButton("ᴄʟᴏsᴇ", callback_data="close")])

    await message.reply_photo(
        photo=Telegram.FILE_PIC,
        caption=f"Total files: {total_files}",
        reply_markup=InlineKeyboardMarkup(keyboard_rows),
    )
| |
|
| |
|
@FileStream.on_inline_query()
async def handle_inline_query(client, query):
    """Answer an inline query with cached-document results from the database.

    The query text may take the form ``"<text> | <file type>"``; everything
    after the first ``|`` is stripped, lower-cased and passed to the search as
    ``file_type``.  Without a ``|`` the whole query is the search text and
    ``file_type`` is ``None``.

    Args:
        client: The client that received the inline query (unused here).
        query: The inline query; ``query.query`` and ``query.offset`` are
            read and ``query.answer`` is awaited exactly once.
    """
    # Split on the first "|" only, so the file-type part keeps any later "|".
    text, separator, type_part = query.query.partition('|')
    text = text.strip()
    file_type = type_part.strip().lower() if separator else None

    # An empty offset (first page) is treated as 0.
    offset = int(query.offset or 0)
    files, next_offset = await db.get_search_results(
        text,
        file_type=file_type,
        max_results=10,
        offset=offset,
    )

    results = [
        InlineQueryResultCachedDocument(
            title=entry['file']['file_name'],
            document_file_id=entry['file']['file_id'],
            caption=entry['file']['file_name'] or "",
            description=f"Size: {humanbytes(entry['file']['file_size'])}\nType:{entry['file']['mime_type']} ",
            reply_markup=InlineKeyboardMarkup(
                [[InlineKeyboardButton('Search', switch_inline_query_current_chat='')]]
            ),
        )
        for entry in files
    ]

    # Nothing found: answer with an empty list and no continuation offset.
    if not results:
        switch_pm_text = 'No results'
        if text:
            switch_pm_text += f' for "{text}"'
        await query.answer(
            results=[],
            cache_time=300,
            switch_pm_text=switch_pm_text,
            switch_pm_parameter="okay",
        )
        return

    switch_pm_text = "Results"
    if text:
        switch_pm_text += f" for {text}"

    # NOTE(review): next_offset is stringified as-is; if the database ever
    # returns None here the client would be sent the literal "None" -- confirm
    # the contract of Database.get_search_results.
    await query.answer(
        results=results,
        cache_time=300,
        switch_pm_text=switch_pm_text,
        switch_pm_parameter="start",
        next_offset=str(next_offset),
    )
| | |
| |
|