testing / utils.py
no-name-here's picture
Upload 10 files
3f48026 verified
# Copyright (C) @TheSmartBisnu
# Channel: https://t.me/itsSmartDev
import asyncio
from time import time
from pyrogram import Client
from pyrogram.errors import FloodWait
from pyrogram.types import InputMediaPhoto, InputMediaVideo, InputMediaAudio, InputMediaDocument
from helpers.files import get_download_path, cleanup_download, get_readable_file_size
from helpers.msg import get_file_name
from logger import LOGGER
def progressArgs(action: str, progress_message, start_time: float):
"""Return kwargs tuple for Leaves.progress_for_pyrogram."""
return (action, progress_message, start_time)
async def send_media(
bot: Client,
message, # _MsgProxy or real pyrogram Message
media_path: str,
media_type: str,
caption: str,
progress_message,
start_time: float,
):
"""Upload a local file back to the user via Pyrogram MTProto."""
chat_id = message.chat.id
reply_to = message.id
send_map = {
"photo": bot.send_photo,
"video": bot.send_video,
"audio": bot.send_audio,
"document": bot.send_document,
}
sender = send_map.get(media_type, bot.send_document)
for attempt in range(2):
try:
await sender(
chat_id,
media_path,
caption=caption or "",
reply_to_message_id=reply_to,
)
return
except FloodWait as e:
wait_s = int(getattr(e, "value", 0) or 0)
LOGGER(__name__).warning(f"FloodWait while uploading: {wait_s}s")
if wait_s > 0 and attempt == 0:
await asyncio.sleep(wait_s + 1)
continue
raise
async def processMediaGroup(chat_message, bot: Client, message) -> bool:
"""
Download all items in a media group and send them as an album (or
individually if the album send fails).
Returns True if at least one item was sent.
"""
from pyrogram import Client as _Client
user_client = None
# Retrieve the user client from the message's chat context isn't possible
# without passing it in, so we import it from main at call time.
try:
from __main__ import user as user_client
except ImportError:
pass
if user_client is None:
LOGGER(__name__).error("processMediaGroup: could not obtain user client")
return False
chat_id = message.chat.id
reply_to = message.id
try:
group_messages = await user_client.get_media_group(
chat_message.chat.id, chat_message.id
)
except Exception as e:
LOGGER(__name__).error(f"Failed to get media group: {e}")
return False
if not group_messages:
return False
downloaded_paths = []
media_list = []
for idx, gm in enumerate(group_messages):
try:
filename = get_file_name(gm.id, gm)
download_path = get_download_path(reply_to, f"grp_{idx}_{filename}")
path = await gm.download(file_name=download_path)
if not path:
continue
downloaded_paths.append(path)
caption_text = ""
if idx == 0 and (gm.caption or gm.text):
from pyrogram.parser import Parser
caption_text = Parser.unparse(
gm.caption or gm.text or "",
gm.caption_entities or gm.entities or [],
is_html=False,
)
if gm.photo:
media_list.append(InputMediaPhoto(path, caption=caption_text))
elif gm.video:
media_list.append(InputMediaVideo(path, caption=caption_text))
elif gm.audio:
media_list.append(InputMediaAudio(path, caption=caption_text))
else:
media_list.append(InputMediaDocument(path, caption=caption_text))
except Exception as e:
LOGGER(__name__).error(f"Error downloading group item {gm.id}: {e}")
if not media_list:
for p in downloaded_paths:
cleanup_download(p)
return False
try:
await bot.send_media_group(chat_id, media_list, reply_to_message_id=reply_to)
except Exception as e:
LOGGER(__name__).warning(f"send_media_group failed ({e}), sending individually…")
for path in downloaded_paths:
try:
await bot.send_document(chat_id, path, reply_to_message_id=reply_to)
except Exception as ie:
LOGGER(__name__).error(f"Individual send failed: {ie}")
for p in downloaded_paths:
cleanup_download(p)
return True