from os import getcwd, path as ospath from re import search from shlex import split from aiofiles import open as aiopen from aiofiles.os import mkdir, path as aiopath, remove as aioremove from aiohttp import ClientSession from .. import LOGGER from ..core.tg_client import TgClient from ..helper.ext_utils.bot_utils import cmd_exec from ..helper.ext_utils.telegraph_helper import telegraph from ..helper.telegram_helper.bot_commands import BotCommands from ..helper.telegram_helper.message_utils import send_message, edit_message async def gen_mediainfo(message, link=None, media=None, mmsg=None): temp_send = await send_message(message, "Generating MediaInfo...") try: path = "mediainfo/" if not await aiopath.isdir(path): await mkdir(path) file_size = 0 if link: filename = search(".+/(.+)", link).group(1) des_path = ospath.join(path, filename) headers = { "user-agent": "Mozilla/5.0 (Linux; Android 12; 2201116PI) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Mobile Safari/537.36" } async with ClientSession() as session: async with session.get(link, headers=headers) as response: file_size = int(response.headers.get("Content-Length", 0)) async with aiopen(des_path, "wb") as f: async for chunk in response.content.iter_chunked(10000000): await f.write(chunk) break elif media: des_path = ospath.join(path, media.file_name) file_size = media.file_size if file_size <= 50000000: await mmsg.download(ospath.join(getcwd(), des_path)) else: async for chunk in TgClient.bot.stream_media(media, limit=5): async with aiopen(des_path, "ab") as f: await f.write(chunk) stdout, _, _ = await cmd_exec(split(f'mediainfo "{des_path}"')) tc = f"
"
trigger = False
else:
tc += line + "\n"
tc += "/{BotCommands.MediaInfoCommand[0]} or /{BotCommands.MediaInfoCommand[1]} [media]
By reply/sending download link:
/{BotCommands.MediaInfoCommand[0]} or /{BotCommands.MediaInfoCommand[1]} [link]
"""
# Dispatch on how the command was invoked. `and` binds tighter than `or`, so
# this branch runs when the command carries an argument, or when the replied-to
# message has text.
if len(message.command) > 1 or rply and rply.text:
# NOTE(review): whenever a reply exists its text is used, even if that text is
# empty/None and it was the argument in message.command[1] that satisfied the
# condition above; gen_mediainfo then receives a falsy link and no media.
# Probably intended: `rply.text if rply and rply.text else message.command[1]`
# — confirm before changing.
link = rply.text if rply else message.command[1]
return await gen_mediainfo(message, link)
# Reached only when there is no command argument and the reply has no text:
# pick the first non-None media attribute of the reply, in the order listed.
elif rply:
if file := next(
(
i
for i in [
rply.document,
rply.video,
rply.audio,
rply.voice,
rply.animation,
rply.video_note,
]
if i is not None
),
None,
):
# Hand over the media object together with the replied-to message.
return await gen_mediainfo(message, None, file, rply)
# The reply carries none of the media attributes listed above: send the help text.
else:
return await send_message(message, help_msg)
# Neither a command argument nor a reply: send the help text.
else:
return await send_message(message, help_msg)