File size: 4,437 Bytes
afce118 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# Copyright (c) 2023 EDM115
import os
from asyncio import get_running_loop
from functools import partial
import subprocess
from pykeyboard import InlineKeyboard
from pyrogram.types import InlineKeyboardButton
from unzipper import LOGGER
from unzipper.modules.bot_data import Messages
def __run_cmds_unzipper(command):
ext_cmd = subprocess.Popen(
command["cmd"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True
)
ext_out = ext_cmd.stdout.read()[:-1].decode("utf-8").rstrip('\n')
LOGGER.info(ext_out)
if ext_cmd.stderr:
ext_cmd.stderr.close()
if ext_cmd.stdout:
ext_cmd.stdout.close()
return ext_out
async def run_cmds_on_cr(func, **kwargs):
loop = get_running_loop()
return await loop.run_in_executor(None, partial(func, kwargs))
# Extract with 7z
async def _extract_with_7z_helper(path, archive_path, password=None):
if password:
command = f'7z x -o{path} -p"{password}" {archive_path} -y'
else:
command = f"7z x -o{path} {archive_path} -y"
return await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)
async def _test_with_7z_helper(archive_path):
command = f'7z t {archive_path} -p"IAmVeryProbablySureThatThisPasswordWillNeverBeUsedElseItsVeryStrangeAAAAAAAAAAAAAAAAAAA" -y' # skipcq: FLK-E501
return "Everything is Ok" in await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)
# Extract with zstd (for .zst files)
async def _extract_with_zstd(path, archive_path):
command = f"zstd -f --output-dir-flat {path} -d {archive_path}"
return await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)
# Main function to extract files
async def extr_files(path, archive_path, password=None):
file_path = os.path.splitext(archive_path)[1]
if file_path == ".zst":
os.mkdir(path)
return await _extract_with_zstd(path, archive_path)
return await _extract_with_7z_helper(path, archive_path, password)
# Split files
async def split_files(iinput, ooutput, size):
command = f'7z a -tzip -mx=0 "{ooutput}" "{iinput}" -v{size}b'
await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)
spdir = ooutput.replace("/" + ooutput.split("/")[-1], "")
return await get_files(spdir)
# Merge files
async def merge_files(iinput, ooutput, password=None):
if password:
command = f'7z x -o"{ooutput}" -p"{password}" "{iinput}" -y'
else:
command = f'7z x -o"{ooutput}" "{iinput}" -y'
return await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)
# Get files in directory as a list
async def get_files(path):
path_list = [val for sublist in [[os.path.join(i[0], j) for j in i[2]] for i in os.walk(path)] for val in sublist] # skipcq: FLK-E501
return sorted(path_list)
# Make keyboard
async def make_keyboard(paths, user_id, chat_id, unziphttp, rzfile=None):
num = 0
i_kbd = InlineKeyboard(row_width=1)
data = []
if unziphttp:
data.append(InlineKeyboardButton(
Messages.UP_ALL,
f"ext_a|{user_id}|{chat_id}|{unziphttp}|{rzfile}"
))
else:
data.append(InlineKeyboardButton(
Messages.UP_ALL,
f"ext_a|{user_id}|{chat_id}|{unziphttp}"
))
data.append(InlineKeyboardButton(Messages.CANCEL_IT, "cancel_dis"))
for file in paths:
if num > 96:
break
if unziphttp:
data.append(InlineKeyboardButton(
f"{num} - {os.path.basename(file)}".encode("utf-8").decode("utf-8"),
f"ext_f|{user_id}|{chat_id}|{num}|{unziphttp}|{rzfile}",
))
else:
data.append(InlineKeyboardButton(
f"{num} - {os.path.basename(file)}".encode("utf-8").decode("utf-8"),
f"ext_f|{user_id}|{chat_id}|{num}|{unziphttp}",
))
num += 1
i_kbd.add(*data)
return i_kbd
async def make_keyboard_empty(user_id, chat_id, unziphttp, rzfile=None):
i_kbd = InlineKeyboard(row_width=2)
data = []
if unziphttp:
data.append(InlineKeyboardButton(
Messages.UP_ALL,
f"ext_a|{user_id}|{chat_id}|{unziphttp}|{rzfile}"
))
else:
data.append(InlineKeyboardButton(
Messages.UP_ALL,
f"ext_a|{user_id}|{chat_id}|{unziphttp}"
))
data.append(InlineKeyboardButton(Messages.CANCEL_IT, "cancel_dis"))
i_kbd.add(*data)
return i_kbd
|