|
|
|
|
|
import os
import shlex
import subprocess
from asyncio import get_running_loop
from functools import partial

from pykeyboard import InlineKeyboard
from pyrogram.types import InlineKeyboardButton

from unzipper import LOGGER
from unzipper.modules.bot_data import Messages
|
|
|
|
|
|
|
|
def __run_cmds_unzipper(command):
    """Run a shell command to completion and return its captured stdout.

    Args:
        command: Mapping whose ``"cmd"`` entry is the shell command line to run.

    Returns:
        The command's standard output decoded as UTF-8 (undecodable bytes are
        replaced), with trailing newlines stripped. The text is also logged
        at INFO level.
    """
    # NOTE: the command goes through the shell (shell=True), so callers are
    # responsible for quoting anything they interpolate into it.
    with subprocess.Popen(
        command["cmd"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=True,
    ) as process:
        # communicate() drains stdout and stderr together and waits for the
        # child, so a full stderr pipe cannot stall it and no zombie is left.
        # stderr is captured but, as before, not used.
        raw_stdout, _ = process.communicate()

    # Strip only real trailing newlines; slicing off the last byte blindly
    # would lose data (or split a multibyte character) when there is none.
    ext_out = raw_stdout.decode("utf-8", errors="replace").rstrip("\n")
    LOGGER.info(ext_out)
    return ext_out
|
|
|
|
|
|
|
|
async def run_cmds_on_cr(func, **kwargs):
    """Run a blocking callable in the event loop's default executor.

    The keyword arguments are handed to ``func`` as a single positional
    dict (not unpacked), and the callable's return value is awaited and
    returned.
    """

    def _invoke():
        # func receives the whole kwargs mapping as its only argument.
        return func(kwargs)

    return await get_running_loop().run_in_executor(None, _invoke)
|
|
|
|
|
|
|
|
|
|
|
async def _extract_with_7z_helper(path, archive_path, password=None):
    """Extract an archive with ``7z x`` into ``path``.

    Args:
        path: Output directory, passed to 7z via ``-o``.
        archive_path: Archive file to extract.
        password: Optional archive password; a falsy value omits ``-p``.

    Returns:
        The captured stdout of the 7z invocation.
    """
    args = ["7z", "x", f"-o{path}"]
    if password:
        args.append(f"-p{password}")
    args += [archive_path, "-y"]
    # The command is executed through a shell, so each argument is quoted to
    # keep spaces and shell metacharacters in paths/passwords literal.
    # (shlex.quote targets POSIX shells.)
    command = " ".join(shlex.quote(str(arg)) for arg in args)
    return await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)
|
|
|
|
|
|
|
|
async def _test_with_7z_helper(archive_path):
    """Run ``7z t`` on an archive with a throwaway password.

    Args:
        archive_path: Archive file to test.

    Returns:
        True when the 7z output contains ``"Everything is Ok"``, else False.
        NOTE(review): presumably used to tell whether the archive opens
        without its real password -- confirm against the callers.
    """
    # archive_path is quoted because the command runs through a shell; the
    # dummy password is a fixed literal and is left exactly as it was.
    command = (
        f"7z t {shlex.quote(str(archive_path))} "
        '-p"IAmVeryProbablySureThatThisPasswordWillNeverBeUsedElseItsVeryStrangeAAAAAAAAAAAAAAAAAAA" -y'
    )
    return "Everything is Ok" in await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)
|
|
|
|
|
|
|
|
|
|
|
async def _extract_with_zstd(path, archive_path):
    """Decompress a zstd file into ``path`` using the ``zstd`` CLI.

    Args:
        path: Directory given to ``--output-dir-flat``.
        archive_path: The compressed file to decompress (``-d``).

    Returns:
        The captured stdout of the zstd invocation.
    """
    # Both paths are quoted because the command is run through a shell.
    command = (
        f"zstd -f --output-dir-flat {shlex.quote(str(path))} "
        f"-d {shlex.quote(str(archive_path))}"
    )
    return await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)
|
|
|
|
|
|
|
|
|
|
|
async def extr_files(path, archive_path, password=None):
    """Extract ``archive_path`` into ``path``, choosing the tool by extension.

    Files whose extension is exactly ``.zst`` are decompressed with zstd
    (the password is not used on that path); everything else is handed to 7z.

    Args:
        path: Destination directory.
        archive_path: Archive to extract.
        password: Optional password forwarded to the 7z extraction.

    Returns:
        The stdout text returned by the selected extraction helper.
    """
    extension = os.path.splitext(archive_path)[1]
    if extension == ".zst":
        # Create the output directory up front; tolerate an existing directory
        # and missing parents instead of raising.
        os.makedirs(path, exist_ok=True)
        return await _extract_with_zstd(path, archive_path)
    return await _extract_with_7z_helper(path, archive_path, password)
|
|
|
|
|
|
|
|
|
|
|
async def split_files(iinput, ooutput, size):
    """Split ``iinput`` into store-only zip volumes of ``size`` bytes via 7z.

    Args:
        iinput: File (or path) to add to the archive.
        ooutput: Archive path passed to 7z as the output name.
        size: Volume size in bytes (used as ``-v{size}b``).

    Returns:
        A sorted list of every file found under the directory containing
        ``ooutput`` (see ``get_files``).
    """
    args = ["7z", "a", "-tzip", "-mx=0", ooutput, iinput, f"-v{size}b"]
    # Quote each argument: the command is executed through a shell.
    command = " ".join(shlex.quote(str(arg)) for arg in args)
    await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)
    # Take the parent directory properly; removing "/<basename>" with
    # str.replace would also strip earlier occurrences inside the path.
    spdir = os.path.dirname(ooutput)
    return await get_files(spdir)
|
|
|
|
|
|
|
|
|
|
|
async def merge_files(iinput, ooutput, password=None):
    """Extract ``iinput`` with ``7z x`` into the ``ooutput`` directory.

    Args:
        iinput: Archive (e.g. the first volume of a split set) to extract.
        ooutput: Output directory, passed to 7z via ``-o``.
        password: Optional archive password; a falsy value omits ``-p``.

    Returns:
        The captured stdout of the 7z invocation.
    """
    args = ["7z", "x", f"-o{ooutput}"]
    if password:
        args.append(f"-p{password}")
    args += [iinput, "-y"]
    # shlex.quote instead of bare double quotes: double quotes still let the
    # shell expand $(...), backticks and break on an embedded '"'.
    command = " ".join(shlex.quote(str(arg)) for arg in args)
    return await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)
|
|
|
|
|
|
|
|
|
|
|
async def get_files(path):
    """Return a sorted list of the paths of all files found beneath ``path``.

    The tree is walked recursively with ``os.walk``; directories themselves
    are not included in the result.
    """
    collected = []
    for root, _subdirs, filenames in os.walk(path):
        collected.extend(os.path.join(root, name) for name in filenames)
    collected.sort()
    return collected
|
|
|
|
|
|
|
|
|
|
|
async def make_keyboard(paths, user_id, chat_id, unziphttp, rzfile=None):
    """Build a single-column inline keyboard listing extracted files.

    The keyboard starts with an "upload all" button and a cancel button,
    followed by one numbered button per entry in ``paths`` (at most 97
    entries, indices 0-96, are used).

    Args:
        paths: Iterable of file paths; only the basename is shown.
        user_id: Embedded in each button's callback data.
        chat_id: Embedded in each button's callback data.
        unziphttp: Embedded in the callback data; when truthy, ``rzfile`` is
            appended after it.
        rzfile: Extra value appended to the callback data when ``unziphttp``
            is truthy.

    Returns:
        The populated ``InlineKeyboard``.
    """
    # Shared tail of every callback payload: rzfile is included only when
    # unziphttp is truthy.
    tail = f"{unziphttp}|{rzfile}" if unziphttp else f"{unziphttp}"

    buttons = [
        InlineKeyboardButton(Messages.UP_ALL, f"ext_a|{user_id}|{chat_id}|{tail}"),
        InlineKeyboardButton(Messages.CANCEL_IT, "cancel_dis"),
    ]

    for index, file in enumerate(paths):
        # NOTE(review): the cap is presumably there to respect a Telegram
        # keyboard size limit -- confirm.
        if index > 96:
            break
        label = f"{index} - {os.path.basename(file)}".encode("utf-8").decode("utf-8")
        buttons.append(
            InlineKeyboardButton(label, f"ext_f|{user_id}|{chat_id}|{index}|{tail}")
        )

    keyboard = InlineKeyboard(row_width=1)
    keyboard.add(*buttons)
    return keyboard
|
|
|
|
|
|
|
|
async def make_keyboard_empty(user_id, chat_id, unziphttp, rzfile=None):
    """Build a two-column inline keyboard with only the global actions.

    The keyboard holds an "upload all" button and a cancel button, with no
    per-file entries.

    Args:
        user_id: Embedded in the "upload all" callback data.
        chat_id: Embedded in the "upload all" callback data.
        unziphttp: Embedded in the callback data; when truthy, ``rzfile`` is
            appended after it.
        rzfile: Extra value appended to the callback data when ``unziphttp``
            is truthy.

    Returns:
        The populated ``InlineKeyboard``.
    """
    if unziphttp:
        upload_all_payload = f"ext_a|{user_id}|{chat_id}|{unziphttp}|{rzfile}"
    else:
        upload_all_payload = f"ext_a|{user_id}|{chat_id}|{unziphttp}"

    keyboard = InlineKeyboard(row_width=2)
    keyboard.add(
        InlineKeyboardButton(Messages.UP_ALL, upload_all_payload),
        InlineKeyboardButton(Messages.CANCEL_IT, "cancel_dis"),
    )
    return keyboard
|
|
|