|
|
|
|
|
import os |
|
|
import pathlib |
|
|
import re |
|
|
import shutil |
|
|
import subprocess |
|
|
import asyncio |
|
|
from time import time |
|
|
|
|
|
from pyrogram.errors import FloodWait |
|
|
|
|
|
from config import Config |
|
|
from unzipper import LOGGER |
|
|
from unzipper import unzipperbot |
|
|
from unzipper.helpers.database import get_upload_mode |
|
|
from unzipper.helpers.unzip_help import extentions_list, progress_urls |
|
|
from unzipper.helpers.unzip_help import progress_for_pyrogram |
|
|
from unzipper.modules.bot_data import Messages |
|
|
from unzipper.modules.ext_script.custom_thumbnail import thumb_exists |
|
|
|
|
|
|
|
|
|
|
|
async def run_shell_cmds(command): |
|
|
run = subprocess.Popen( |
|
|
command, |
|
|
stdout=subprocess.PIPE, |
|
|
stderr=subprocess.PIPE, |
|
|
shell=True |
|
|
) |
|
|
shell_output = run.stdout.read()[:-1].decode("utf-8").rstrip('\n') |
|
|
LOGGER.info(shell_output) |
|
|
if run.stderr: |
|
|
run.stderr.close() |
|
|
if run.stdout: |
|
|
run.stdout.close() |
|
|
return shell_output |
|
|
|
|
|
|
|
|
|
|
|
async def get_size(doc_f): |
|
|
try: |
|
|
fsize = os.stat(doc_f).st_size |
|
|
return fsize |
|
|
except: |
|
|
return -1 |
|
|
|
|
|
|
|
|
|
|
|
async def send_file(unzip_bot, c_id, doc_f, query, full_path, log_msg, split): |
|
|
fsize = await get_size(doc_f) |
|
|
if fsize == -1 or fsize == 0: |
|
|
try: |
|
|
await unzipperbot.send_message(c_id, Messages.EMPTY_FILE.format(os.path.basename(doc_f))) |
|
|
except: |
|
|
pass |
|
|
return |
|
|
try: |
|
|
ul_mode = await get_upload_mode(c_id) |
|
|
fname = os.path.basename(doc_f) |
|
|
fext = ((pathlib.Path(os.path.abspath(doc_f)).suffix).casefold().replace(".", "")) |
|
|
thumbornot = await thumb_exists(c_id) |
|
|
upmsg = await unzipperbot.send_message(c_id, Messages.PROCESSING2) |
|
|
if ul_mode == "media" and fext in extentions_list["audio"]: |
|
|
if thumbornot: |
|
|
thumb_image = Config.THUMB_LOCATION + "/" + str(c_id) + ".jpg" |
|
|
sentfile = await unzip_bot.send_audio( |
|
|
chat_id=c_id, |
|
|
audio=doc_f, |
|
|
caption=Messages.EXT_CAPTION.format(fname), |
|
|
thumb=thumb_image, |
|
|
progress=progress_for_pyrogram, |
|
|
progress_args=( |
|
|
Messages.TRY_UP.format(fname), |
|
|
upmsg, |
|
|
time(), |
|
|
unzip_bot, |
|
|
), |
|
|
) |
|
|
else: |
|
|
sentfile = await unzip_bot.send_audio( |
|
|
chat_id=c_id, |
|
|
audio=doc_f, |
|
|
caption=Messages.EXT_CAPTION.format(fname), |
|
|
progress=progress_for_pyrogram, |
|
|
progress_args=( |
|
|
Messages.TRY_UP.format(fname), |
|
|
upmsg, |
|
|
time(), |
|
|
unzip_bot, |
|
|
), |
|
|
) |
|
|
elif ul_mode == "media" and fext in extentions_list["photo"]: |
|
|
|
|
|
sentfile = await unzip_bot.send_photo( |
|
|
chat_id=c_id, |
|
|
photo=doc_f, |
|
|
caption=Messages.EXT_CAPTION.format(fname), |
|
|
progress=progress_for_pyrogram, |
|
|
progress_args=( |
|
|
Messages.TRY_UP.format(fname), |
|
|
upmsg, |
|
|
time(), |
|
|
unzip_bot, |
|
|
), |
|
|
) |
|
|
elif ul_mode == "media" and fext in extentions_list["video"]: |
|
|
vid_duration = await run_shell_cmds( |
|
|
f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 {doc_f}" |
|
|
) |
|
|
if thumbornot: |
|
|
thumb_image = Config.THUMB_LOCATION + "/" + str(c_id) + ".jpg" |
|
|
sentfile = await unzip_bot.send_video( |
|
|
chat_id=c_id, |
|
|
video=doc_f, |
|
|
caption=Messages.EXT_CAPTION.format(fname), |
|
|
duration=int(vid_duration) if vid_duration.isnumeric() else 0, |
|
|
thumb=thumb_image, |
|
|
progress=progress_for_pyrogram, |
|
|
progress_args=( |
|
|
Messages.TRY_UP.format(fname), |
|
|
upmsg, |
|
|
time(), |
|
|
unzip_bot, |
|
|
), |
|
|
) |
|
|
else: |
|
|
thmb_pth = ( |
|
|
f"{Config.THUMB_LOCATION}/thumbnail_{os.path.basename(doc_f)}.jpg" |
|
|
) |
|
|
if os.path.exists(thmb_pth): |
|
|
os.remove(thmb_pth) |
|
|
try: |
|
|
await run_shell_cmds( |
|
|
f"ffmpeg -ss 00:00:00.00 -i {doc_f} -vf 'scale=320:320:force_original_aspect_ratio=decrease' -vframes 1 {thmb_pth}" |
|
|
) |
|
|
except Exception as e: |
|
|
LOGGER.warning(e) |
|
|
shutil.copy(Config.BOT_THUMB, thmb_pth) |
|
|
try: |
|
|
sentfile = await unzip_bot.send_video( |
|
|
chat_id=c_id, |
|
|
video=doc_f, |
|
|
caption=Messages.EXT_CAPTION.format(fname), |
|
|
duration=int(vid_duration) if vid_duration.isnumeric() else 0, |
|
|
thumb=str(thmb_pth), |
|
|
progress=progress_for_pyrogram, |
|
|
progress_args=( |
|
|
Messages.TRY_UP.format(fname), |
|
|
upmsg, |
|
|
time(), |
|
|
unzip_bot, |
|
|
), |
|
|
) |
|
|
try: |
|
|
os.remove(thmb_pth) |
|
|
except: |
|
|
pass |
|
|
except: |
|
|
try: |
|
|
sentfile = await unzip_bot.send_video( |
|
|
chat_id=c_id, |
|
|
video=doc_f, |
|
|
caption=Messages.EXT_CAPTION.format(fname), |
|
|
duration=0, |
|
|
thumb=str(Config.BOT_THUMB), |
|
|
progress=progress_for_pyrogram, |
|
|
progress_args=( |
|
|
Messages.TRY_UP.format(fname), |
|
|
upmsg, |
|
|
time(), |
|
|
unzip_bot, |
|
|
), |
|
|
) |
|
|
except: |
|
|
sentfile = await unzip_bot.send_document( |
|
|
chat_id=c_id, |
|
|
document=doc_f, |
|
|
caption=Messages.EXT_CAPTION.format(fname), |
|
|
force_document=True, |
|
|
progress=progress_for_pyrogram, |
|
|
progress_args=( |
|
|
Messages.TRY_UP.format(fname), |
|
|
upmsg, |
|
|
time(), |
|
|
unzip_bot, |
|
|
), |
|
|
) |
|
|
else: |
|
|
if thumbornot: |
|
|
thumb_image = Config.THUMB_LOCATION + "/" + str(c_id) + ".jpg" |
|
|
sentfile = await unzip_bot.send_document( |
|
|
chat_id=c_id, |
|
|
document=doc_f, |
|
|
thumb=thumb_image, |
|
|
caption=Messages.EXT_CAPTION.format(fname), |
|
|
force_document=True, |
|
|
progress=progress_for_pyrogram, |
|
|
progress_args=( |
|
|
Messages.TRY_UP.format(fname), |
|
|
upmsg, |
|
|
time(), |
|
|
unzip_bot, |
|
|
), |
|
|
) |
|
|
else: |
|
|
sentfile = await unzip_bot.send_document( |
|
|
chat_id=c_id, |
|
|
document=doc_f, |
|
|
caption=Messages.EXT_CAPTION.format(fname), |
|
|
force_document=True, |
|
|
progress=progress_for_pyrogram, |
|
|
progress_args=( |
|
|
Messages.TRY_UP.format(fname), |
|
|
upmsg, |
|
|
time(), |
|
|
unzip_bot, |
|
|
), |
|
|
) |
|
|
await upmsg.delete() |
|
|
os.remove(doc_f) |
|
|
except FloodWait as f: |
|
|
await asyncio.sleep(f.value) |
|
|
return await send_file(unzip_bot, c_id, doc_f, query, full_path, log_msg, split) |
|
|
except FileNotFoundError: |
|
|
try: |
|
|
await unzipperbot.send_message(c_id, Messages.CANT_FIND.format(os.path.basename(doc_f))) |
|
|
except: |
|
|
pass |
|
|
return |
|
|
except BaseException as e: |
|
|
LOGGER.error(e) |
|
|
shutil.rmtree(full_path) |
|
|
|
|
|
|
|
|
async def forward_file(message, cid): |
|
|
try: |
|
|
await unzipperbot.copy_message( |
|
|
chat_id=cid, |
|
|
from_chat_id=message.chat.id, |
|
|
message_id=message.id, |
|
|
) |
|
|
except FloodWait as f: |
|
|
await asyncio.sleep(f.value) |
|
|
return await forward_file(message, cid) |
|
|
|
|
|
|
|
|
async def send_url_logs(unzip_bot, c_id, doc_f, source, message): |
|
|
try: |
|
|
u_file_size = os.stat(doc_f).st_size |
|
|
if Config.TG_MAX_SIZE < int(u_file_size): |
|
|
await unzip_bot.send_message( |
|
|
chat_id=c_id, |
|
|
text=Messages.TOO_LARGE |
|
|
) |
|
|
return |
|
|
fname = os.path.basename(doc_f) |
|
|
await unzip_bot.send_document( |
|
|
chat_id=c_id, |
|
|
document=doc_f, |
|
|
caption=Messages.LOG_CAPTION.format(fname, source), |
|
|
progress=progress_urls, |
|
|
progress_args=( |
|
|
Messages.CHECK_MSG, |
|
|
message, |
|
|
time(), |
|
|
), |
|
|
) |
|
|
except FloodWait as f: |
|
|
await asyncio.sleep(f.value) |
|
|
return send_url_logs(unzip_bot, c_id, doc_f, source, message) |
|
|
except FileNotFoundError: |
|
|
await unzip_bot.send_message( |
|
|
chat_id=Config.LOGS_CHANNEL, |
|
|
text=Messages.ARCHIVE_GONE, |
|
|
) |
|
|
except BaseException: |
|
|
pass |
|
|
|
|
|
|
|
|
async def merge_splitted_archives(user_id, path): |
|
|
cmd = f"cd {path} && cat * > MERGED_{user_id}.zip" |
|
|
await run_shell_cmds(cmd) |
|
|
|
|
|
|
|
|
|
|
|
async def rm_mark_chars(text: str): |
|
|
return re.sub("[*`_]", "", text) |
|
|
|
|
|
|
|
|
|
|
|
async def answer_query( |
|
|
query, message_text: str, answer_only: bool = False, unzip_client=None, buttons=None |
|
|
): |
|
|
try: |
|
|
if answer_only: |
|
|
await query.answer(await rm_mark_chars(message_text), show_alert=True) |
|
|
else: |
|
|
await query.message.edit(message_text, reply_markup=buttons) |
|
|
except: |
|
|
try: |
|
|
if unzip_client: |
|
|
await unzip_client.send_message( |
|
|
chat_id=query.message.chat.id, text=message_text, reply_markup=buttons |
|
|
) |
|
|
else: |
|
|
await unzipperbot.send_message( |
|
|
chat_id=query.message.chat.id, text=message_text, reply_markup=buttons |
|
|
) |
|
|
except: |
|
|
pass |
|
|
|