| | import logging |
| | from pyrogram import Client, emoji, filters |
| | from pyrogram.errors.exceptions.bad_request_400 import QueryIdInvalid |
| | from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, InlineQueryResultCachedDocument, InlineQuery |
| | from database.ia_filterdb import get_search_results |
| | from utils import is_subscribed, get_size, temp |
| | from info import CACHE_TIME, AUTH_USERS, AUTH_CHANNEL, CUSTOM_FILE_CAPTION |
| |
|
| | logger = logging.getLogger(__name__) |
| | cache_time = 0 if AUTH_USERS or AUTH_CHANNEL else CACHE_TIME |
| |
|
| | async def inline_users(query: InlineQuery): |
| | if AUTH_USERS: |
| | if query.from_user and query.from_user.id in AUTH_USERS: |
| | return True |
| | else: |
| | return False |
| | if query.from_user and query.from_user.id not in temp.BANNED_USERS: |
| | return True |
| | return False |
| |
|
| | @Client.on_inline_query() |
| | async def answer(bot, query): |
| | if not await inline_users(query): |
| | await query.answer(results=[], |
| | cache_time=0, |
| | switch_pm_text='okDa', |
| | switch_pm_parameter="hehe") |
| | return |
| |
|
| | if AUTH_CHANNEL and not await is_subscribed(bot, query): |
| | await query.answer(results=[], |
| | cache_time=0, |
| | switch_pm_text='You have to subscribe my channel to use the bot', |
| | switch_pm_parameter="subscribe") |
| | return |
| |
|
| | results = [] |
| | if '|' in query.query: |
| | string, file_type = query.query.split('|', maxsplit=1) |
| | string = string.strip() |
| | file_type = file_type.strip().lower() |
| | else: |
| | string = query.query.strip() |
| | file_type = None |
| |
|
| | offset = int(query.offset or 0) |
| | reply_markup = get_reply_markup(query=string) |
| | files, next_offset, total = await get_search_results(string, file_type=file_type, max_results=10, offset=offset) |
| | |
| | for file in files: |
| | title=file.file_name |
| | size=get_size(file.file_size) |
| | f_caption=file.caption |
| | if CUSTOM_FILE_CAPTION: |
| | try: |
| | f_caption=CUSTOM_FILE_CAPTION.format(mention=query.from_user.mention, file_name= '' if title is None else title, file_size='' if size is None else size, file_caption='' if f_caption is None else f_caption) |
| | except Exception as e: |
| | logger.exception(e) |
| | f_caption=f_caption |
| | if f_caption is None: |
| | f_caption = f"{file.file_name}" |
| | results.append( |
| | InlineQueryResultCachedDocument( |
| | title=file.file_name, |
| | document_file_id=file.file_id, |
| | caption=f_caption, |
| | description=f'Size: {get_size(file.file_size)}\nType: {file.file_type}', |
| | reply_markup=reply_markup)) |
| |
|
| | if results: |
| | switch_pm_text = f"{emoji.FILE_FOLDER} Results - {total}" |
| | if string: |
| | switch_pm_text += f" for {string}" |
| | try: |
| | await query.answer(results=results, |
| | is_personal = True, |
| | cache_time=cache_time, |
| | switch_pm_text=switch_pm_text, |
| | switch_pm_parameter="start", |
| | next_offset=str(next_offset)) |
| | except QueryIdInvalid: |
| | pass |
| | except Exception as e: |
| | logging.exception(str(e)) |
| | else: |
| | switch_pm_text = f'{emoji.CROSS_MARK} No Results' |
| | if string: |
| | switch_pm_text += f' for "{string}"' |
| |
|
| | await query.answer(results=[], |
| | is_personal = True, |
| | cache_time=cache_time, |
| | switch_pm_text=switch_pm_text, |
| | switch_pm_parameter="okay") |
| |
|
| |
|
| | def get_reply_markup(query): |
| | buttons = [[InlineKeyboardButton('⟳ ꜱᴇᴀʀᴄʜ ᴀɢᴀɪɴ', switch_inline_query_current_chat=query)]] |
| | return InlineKeyboardMarkup(buttons) |
| |
|
| |
|
| |
|
| |
|