File size: 4,437 Bytes
afce118
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# Copyright (c) 2023 EDM115
import os
from asyncio import get_running_loop
from functools import partial
import subprocess

from pykeyboard import InlineKeyboard
from pyrogram.types import InlineKeyboardButton

from unzipper import LOGGER
from unzipper.modules.bot_data import Messages


def __run_cmds_unzipper(command):
    ext_cmd = subprocess.Popen(
        command["cmd"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=True
    )
    ext_out = ext_cmd.stdout.read()[:-1].decode("utf-8").rstrip('\n')
    LOGGER.info(ext_out)
    if ext_cmd.stderr:
        ext_cmd.stderr.close()
    if ext_cmd.stdout:
        ext_cmd.stdout.close()
    return ext_out


async def run_cmds_on_cr(func, **kwargs):
    loop = get_running_loop()
    return await loop.run_in_executor(None, partial(func, kwargs))


# Extract with 7z
async def _extract_with_7z_helper(path, archive_path, password=None):
    if password:
        command = f'7z x -o{path} -p"{password}" {archive_path} -y'
    else:
        command = f"7z x -o{path} {archive_path} -y"
    return await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)


async def _test_with_7z_helper(archive_path):
    command = f'7z t {archive_path} -p"IAmVeryProbablySureThatThisPasswordWillNeverBeUsedElseItsVeryStrangeAAAAAAAAAAAAAAAAAAA" -y'  # skipcq: FLK-E501
    return "Everything is Ok" in await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)


# Extract with zstd (for .zst files)
async def _extract_with_zstd(path, archive_path):
    command = f"zstd -f --output-dir-flat {path} -d {archive_path}"
    return await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)


# Main function to extract files
async def extr_files(path, archive_path, password=None):
    file_path = os.path.splitext(archive_path)[1]
    if file_path == ".zst":
        os.mkdir(path)
        return await _extract_with_zstd(path, archive_path)
    return await _extract_with_7z_helper(path, archive_path, password)


# Split files
async def split_files(iinput, ooutput, size):
    command = f'7z a -tzip -mx=0 "{ooutput}" "{iinput}" -v{size}b'
    await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)
    spdir = ooutput.replace("/" + ooutput.split("/")[-1], "")
    return await get_files(spdir)


# Merge files
async def merge_files(iinput, ooutput, password=None):
    if password:
        command = f'7z x -o"{ooutput}" -p"{password}" "{iinput}" -y'
    else:
        command = f'7z x -o"{ooutput}" "{iinput}" -y'
    return await run_cmds_on_cr(__run_cmds_unzipper, cmd=command)


# Get files in directory as a list
async def get_files(path):
    path_list = [val for sublist in [[os.path.join(i[0], j) for j in i[2]] for i in os.walk(path)] for val in sublist]  # skipcq: FLK-E501
    return sorted(path_list)


# Make keyboard
async def make_keyboard(paths, user_id, chat_id, unziphttp, rzfile=None):
    num = 0
    i_kbd = InlineKeyboard(row_width=1)
    data = []
    if unziphttp:
        data.append(InlineKeyboardButton(
            Messages.UP_ALL,
            f"ext_a|{user_id}|{chat_id}|{unziphttp}|{rzfile}"
        ))
    else:
        data.append(InlineKeyboardButton(
            Messages.UP_ALL,
            f"ext_a|{user_id}|{chat_id}|{unziphttp}"
        ))
    data.append(InlineKeyboardButton(Messages.CANCEL_IT, "cancel_dis"))
    for file in paths:
        if num > 96:
            break
        if unziphttp:
            data.append(InlineKeyboardButton(
                f"{num} - {os.path.basename(file)}".encode("utf-8").decode("utf-8"),
                f"ext_f|{user_id}|{chat_id}|{num}|{unziphttp}|{rzfile}",
            ))
        else:
            data.append(InlineKeyboardButton(
                f"{num} - {os.path.basename(file)}".encode("utf-8").decode("utf-8"),
                f"ext_f|{user_id}|{chat_id}|{num}|{unziphttp}",
            ))
        num += 1
    i_kbd.add(*data)
    return i_kbd


async def make_keyboard_empty(user_id, chat_id, unziphttp, rzfile=None):
    i_kbd = InlineKeyboard(row_width=2)
    data = []
    if unziphttp:
        data.append(InlineKeyboardButton(
            Messages.UP_ALL,
            f"ext_a|{user_id}|{chat_id}|{unziphttp}|{rzfile}"
        ))
    else:
        data.append(InlineKeyboardButton(
            Messages.UP_ALL,
            f"ext_a|{user_id}|{chat_id}|{unziphttp}"
        ))
    data.append(InlineKeyboardButton(Messages.CANCEL_IT, "cancel_dis"))
    i_kbd.add(*data)
    return i_kbd