| | from .. import LOGGER, user_data |
| | from ..helper.ext_utils.bot_utils import ( |
| | sync_to_async, |
| | get_telegraph_list, |
| | new_task, |
| | ) |
| | from ..helper.mirror_leech_utils.gdrive_utils.search import GoogleDriveSearch |
| | from ..helper.telegram_helper.button_build import ButtonMaker |
| | from ..helper.telegram_helper.message_utils import send_message, edit_message |
| |
|
| |
|
| | async def list_buttons(user_id, is_recursive=True, user_token=False): |
| | buttons = ButtonMaker() |
| | buttons.data_button( |
| | f"{'✅️' if user_token else '❌️'} User Token", |
| | f"list_types {user_id} ut {is_recursive} {user_token}", |
| | "header", |
| | ) |
| | buttons.data_button( |
| | f"{'✅️' if is_recursive else '❌️'} Recursive", |
| | f"list_types {user_id} rec {is_recursive} {user_token}", |
| | "header", |
| | ) |
| | buttons.data_button( |
| | "Folders", f"list_types {user_id} folders {is_recursive} {user_token}" |
| | ) |
| | buttons.data_button( |
| | "Files", f"list_types {user_id} files {is_recursive} {user_token}" |
| | ) |
| | buttons.data_button( |
| | "Both", f"list_types {user_id} both {is_recursive} {user_token}" |
| | ) |
| |
|
| | buttons.data_button("Cancel", f"list_types {user_id} cancel", "footer") |
| | return buttons.build_menu(2) |
| |
|
| |
|
| | async def _list_drive(key, message, item_type, is_recursive, user_token, user_id): |
| | LOGGER.info(f"GD Listing: {key}") |
| | if user_token: |
| | user_dict = user_data.get(user_id, {}) |
| | target_id = user_dict.get("GDRIVE_ID", "") or "" |
| | LOGGER.info(target_id) |
| | else: |
| | target_id = "" |
| | telegraph_content, contents_no = await sync_to_async( |
| | GoogleDriveSearch(is_recursive=is_recursive, item_type=item_type).drive_list, |
| | key, |
| | target_id, |
| | user_id, |
| | ) |
| | if telegraph_content: |
| | try: |
| | button = await get_telegraph_list(telegraph_content) |
| | except Exception as e: |
| | await edit_message(message, e) |
| | return |
| | msg = f"<b>Found {contents_no} result for <i>{key}</i></b>" |
| | await edit_message(message, msg, button) |
| | else: |
| | await edit_message(message, f"No result found for <i>{key}</i>") |
| |
|
| |
|
| | @new_task |
| | async def select_type(_, query): |
| | user_id = query.from_user.id |
| | message = query.message |
| | key = message.reply_to_message.text.split(maxsplit=1)[1].strip() |
| | data = query.data.split() |
| | if user_id != int(data[1]): |
| | return await query.answer(text="Not Yours!", show_alert=True) |
| | elif data[2] == "rec": |
| | await query.answer() |
| | is_recursive = not bool(eval(data[3])) |
| | buttons = await list_buttons(user_id, is_recursive, eval(data[4])) |
| | return await edit_message(message, "Choose list options:", buttons) |
| | elif data[2] == "ut": |
| | await query.answer() |
| | user_token = not bool(eval(data[4])) |
| | buttons = await list_buttons(user_id, eval(data[3]), user_token) |
| | return await edit_message(message, "Choose list options:", buttons) |
| | elif data[2] == "cancel": |
| | await query.answer() |
| | return await edit_message(message, "<i>List has been canceled!</i>") |
| | await query.answer() |
| | item_type = data[2] |
| | is_recursive = eval(data[3]) |
| | user_token = eval(data[4]) |
| | await edit_message(message, f"<b>Searching.. for <i>{key}</i></b>") |
| | await _list_drive(key, message, item_type, is_recursive, user_token, user_id) |
| |
|
| |
|
| | @new_task |
| | async def gdrive_search(_, message): |
| | if len(message.text.split()) == 1: |
| | return await send_message( |
| | message, "<i>Send a search query along with list command</i>" |
| | ) |
| | user_id = message.from_user.id |
| | buttons = await list_buttons(user_id) |
| | await send_message(message, "Choose list options:", buttons) |
| |
|