Spaces:
Sleeping
Sleeping
| # Copyright (C) @TheSmartBisnu | |
| # Channel: https://t.me/itsSmartDev | |
| import asyncio | |
| from time import time | |
| from pyrogram import Client | |
| from pyrogram.errors import FloodWait | |
| from pyrogram.types import InputMediaPhoto, InputMediaVideo, InputMediaAudio, InputMediaDocument | |
| from helpers.files import get_download_path, cleanup_download, get_readable_file_size | |
| from helpers.msg import get_file_name | |
| from logger import LOGGER | |
def progressArgs(action: str, progress_message, start_time: float):
    """Bundle the progress-callback arguments into a positional tuple.

    Intended as the extra arguments for ``Leaves.progress_for_pyrogram``.
    The values are returned unmodified, in the order
    ``(action, progress_message, start_time)``.
    """
    bundled = action, progress_message, start_time
    return bundled
async def send_media(
    bot: Client,
    message,  # _MsgProxy or real pyrogram Message
    media_path: str,
    media_type: str,
    caption: str,
    progress_message,
    start_time: float,
):
    """Send a local file to the chat of ``message`` as a reply to it.

    ``media_type`` selects the bot method used ("photo", "video", "audio" or
    "document"); any other value is sent with ``bot.send_document``. A falsy
    ``caption`` is sent as an empty string.

    If the upload raises ``FloodWait`` with a positive wait value, the call
    sleeps for that many seconds plus one and retries exactly once. A second
    ``FloodWait``, or one without a positive wait value, is re-raised.

    ``progress_message`` and ``start_time`` are accepted but not used here.
    """
    target_chat = message.chat.id
    reply_id = message.id

    # Dispatch table from media type to the matching upload coroutine.
    uploaders = {
        "photo": bot.send_photo,
        "video": bot.send_video,
        "audio": bot.send_audio,
        "document": bot.send_document,
    }
    upload = uploaders.get(media_type, bot.send_document)

    already_retried = False
    while True:
        try:
            await upload(
                target_chat,
                media_path,
                caption=caption or "",
                reply_to_message_id=reply_id,
            )
        except FloodWait as flood:
            delay = int(getattr(flood, "value", 0) or 0)
            LOGGER(__name__).warning(f"FloodWait while uploading: {delay}s")
            # Only the first flood-wait with a usable delay earns a retry.
            if already_retried or delay <= 0:
                raise
            already_retried = True
            await asyncio.sleep(delay + 1)
        else:
            return
async def processMediaGroup(chat_message, bot: Client, message) -> bool:
    """
    Download all items in a media group and send them as an album (or
    individually, as documents, if the album send fails).

    The group is fetched with the user client, which is imported from
    ``__main__`` (as ``user``) at call time because it is not passed in.
    Every downloaded file is cleaned up before the function returns, even
    when sending is interrupted.

    Args:
        chat_message: Source message that belongs to the media group.
        bot: Client used to send the media back.
        message: Message being answered; supplies the target chat id and the
            reply-to message id.

    Returns:
        True if the album was sent, or if at least one item was sent by the
        individual fallback; False otherwise.
    """
    log = LOGGER(__name__)

    user_client = None
    # The user client cannot be derived from the message itself, so it is
    # looked up on the entry-point module at call time.
    try:
        from __main__ import user as user_client
    except ImportError:
        # Leave user_client as None; the failure is reported just below.
        user_client = None
    if user_client is None:
        log.error("processMediaGroup: could not obtain user client")
        return False

    chat_id = message.chat.id
    reply_to = message.id

    try:
        group_messages = await user_client.get_media_group(
            chat_message.chat.id, chat_message.id
        )
    except Exception as e:
        log.error(f"Failed to get media group: {e}")
        return False
    if not group_messages:
        return False

    downloaded_paths = []
    media_list = []
    try:
        for idx, gm in enumerate(group_messages):
            try:
                filename = get_file_name(gm.id, gm)
                download_path = get_download_path(reply_to, f"grp_{idx}_{filename}")
                path = await gm.download(file_name=download_path)
                if not path:
                    continue
                downloaded_paths.append(path)

                # Only the first item carries the caption of the album.
                caption_text = ""
                if idx == 0 and (gm.caption or gm.text):
                    from pyrogram.parser import Parser
                    caption_text = Parser.unparse(
                        gm.caption or gm.text or "",
                        gm.caption_entities or gm.entities or [],
                        is_html=False,
                    )

                if gm.photo:
                    media_list.append(InputMediaPhoto(path, caption=caption_text))
                elif gm.video:
                    media_list.append(InputMediaVideo(path, caption=caption_text))
                elif gm.audio:
                    media_list.append(InputMediaAudio(path, caption=caption_text))
                else:
                    media_list.append(InputMediaDocument(path, caption=caption_text))
            except Exception as e:
                # One broken item must not abort the rest of the group.
                log.error(f"Error downloading group item {gm.id}: {e}")

        if not media_list:
            return False

        try:
            await bot.send_media_group(chat_id, media_list, reply_to_message_id=reply_to)
        except Exception as e:
            log.warning(f"send_media_group failed ({e}), sending individually…")
        else:
            return True

        # Fallback: send each downloaded file on its own and report success
        # only if at least one of them actually went through.
        sent_any = False
        for path in downloaded_paths:
            try:
                await bot.send_document(chat_id, path, reply_to_message_id=reply_to)
            except Exception as ie:
                log.error(f"Individual send failed: {ie}")
            else:
                sent_any = True
        return sent_any
    finally:
        # Runs on every exit path (including cancellation) so downloaded
        # files never linger on disk.
        for p in downloaded_paths:
            cleanup_download(p)