| | from aioshutil import rmtree as aiormtree, move |
| | from asyncio import create_subprocess_exec, sleep, wait_for |
| | from asyncio.subprocess import PIPE |
| | from contextlib import suppress |
| | from psutil import disk_usage |
| | from os import path as ospath, readlink, walk |
| | from re import I, escape, search as re_search, split as re_split |
| |
|
| | from aiofiles.os import ( |
| | listdir, |
| | remove, |
| | rmdir, |
| | symlink, |
| | makedirs as aiomakedirs, |
| | path as aiopath, |
| | readlink as aioreadlink, |
| | ) |
| | from magic import Magic |
| |
|
| | from ... import DOWNLOAD_DIR, LOGGER |
| | from ...core.torrent_manager import TorrentManager |
| | from .bot_utils import cmd_exec, sync_to_async |
| | from .exceptions import NotSupportedExtractionArchive |
| |
|
# Extensions 7z can open. Used to decide whether a download is extractable
# (is_archive) and which suffix to strip when naming the extraction target
# (get_base_name). Multi-part suffixes like ".tar.bz2" come first so they
# win over their shorter tails when matched in order.
ARCH_EXT = [
    ".tar.bz2",
    ".tar.gz",
    ".bz2",
    ".gz",
    ".tar.xz",
    ".tar",
    ".tbz2",
    ".tgz",
    ".lzma2",
    ".zip",
    ".7z",
    ".z",
    ".rar",
    ".iso",
    ".wim",
    ".cab",
    ".apm",
    ".arj",
    ".chm",
    ".cpio",
    ".cramfs",
    ".deb",
    ".dmg",
    ".fat",
    ".hfs",
    ".lzh",
    ".lzma",
    ".mbr",
    ".msi",
    ".mslz",
    ".nsis",
    ".ntfs",
    ".rpm",
    ".squashfs",
    ".udf",
    ".vhd",
    ".xar",
    ".zst",
    ".zstd",
    ".cbz",
    ".apfs",
    ".ar",
    ".qcow",
    ".macho",
    ".exe",
    ".dll",
    ".sys",
    ".pmd",
    ".swf",
    ".swfc",
    ".simg",
    ".vdi",
    ".vhdx",
    ".vmdk",
    ".gzip",
    ".lzma86",
    ".sha256",
    ".sha512",
    ".sha224",
    ".sha384",
    ".sha1",
    ".md5",
    ".crc32",
    ".crc64",
]

# Matches the FIRST volume of a split archive: ".part01.rar"-style rar
# volumes, ".7z.001"/".zip.001" numbered volumes, or a plain ".rar" that is
# not itself a ".partN.rar" continuation (negative lookahead).
FIRST_SPLIT_REGEX = (
    r"\.part0*1\.rar$|\.7z\.0*1$|\.zip\.0*1$|^(?!.*\.part\d+\.rar$).*\.rar$"
)

# Matches any split-archive volume (first or later): .rNN, .7z.NNN, .zNN,
# .zip.NNN, .partN.rar.
SPLIT_REGEX = r"\.r\d+$|\.7z\.\d+$|\.z\d+$|\.zip\.\d+$|\.part\d+\.rar$"
| |
|
| |
|
def is_first_archive_split(file):
    """Return True if *file* names the first volume of a split archive."""
    return re_search(FIRST_SPLIT_REGEX, file.lower(), I) is not None
| |
|
| |
|
def is_archive(file):
    """Return True if *file* ends with any known archive extension."""
    name = file.strip().lower()
    return name.endswith(tuple(ARCH_EXT))
| |
|
| |
|
def is_archive_split(file):
    """Return True if *file* names any volume of a split archive."""
    return re_search(SPLIT_REGEX, file.lower(), I) is not None
| |
|
| |
|
async def clean_target(opath):
    """Delete *opath* — a directory tree or single file — if it exists.

    Failures are logged rather than raised (best-effort cleanup).
    """
    if not await aiopath.exists(opath):
        return
    LOGGER.info(f"Cleaning Target: {opath}")
    try:
        if await aiopath.isdir(opath):
            await aiormtree(opath, ignore_errors=True)
        else:
            await remove(opath)
    except Exception as e:
        LOGGER.error(str(e))
| |
|
| |
|
async def clean_download(opath):
    """Recursively remove the download directory at *opath*, if present.

    Errors are logged and swallowed (best-effort cleanup).
    """
    if not await aiopath.exists(opath):
        return
    LOGGER.info(f"Cleaning Download: {opath}")
    try:
        await aiormtree(opath, ignore_errors=True)
    except Exception as e:
        LOGGER.error(str(e))
| |
|
| |
|
async def clean_all():
    """Drop all torrents, wipe DOWNLOAD_DIR entirely, then recreate it empty."""
    await TorrentManager.remove_all()
    LOGGER.info("Cleaning Download Directory")
    # `rm -rf` handles anything rmtree might choke on (dangling links, etc.).
    proc = await create_subprocess_exec("rm", "-rf", DOWNLOAD_DIR)
    await proc.wait()
    await aiomakedirs(DOWNLOAD_DIR, exist_ok=True)
| |
|
| |
|
async def clean_unwanted(opath):
    """Purge qBittorrent leftovers under *opath*.

    First pass (bottom-up): delete hidden ``.parts`` files and whole
    ``.unwanted`` directories. Second pass (bottom-up): remove any
    directories the first pass left empty.
    """
    LOGGER.info(f"Cleaning unwanted files/folders: {opath}")
    for dirpath, _, files in await sync_to_async(walk, opath, topdown=False):
        for name in files:
            if name.startswith(".") and name.strip().endswith(".parts"):
                await remove(ospath.join(dirpath, name))
        if dirpath.strip().endswith(".unwanted"):
            await aiormtree(dirpath, ignore_errors=True)
    # Re-walk: directory contents changed above, so emptiness must be rechecked.
    for dirpath, _, _ in await sync_to_async(walk, opath, topdown=False):
        if not await listdir(dirpath):
            await rmdir(dirpath)
| |
|
| |
|
async def check_storage_threshold(size, threshold, io_task=False, alloc=False):
    """Return True if the disk holding DOWNLOAD_DIR will keep at least
    *threshold* bytes free after accounting for a task of *size* bytes.

    io_task doubles the reservation (download + processed copy coexist);
    alloc skips the size reservation (space already allocated on disk).
    """
    usage = await sync_to_async(disk_usage, DOWNLOAD_DIR)
    if alloc:
        needed = 0
    elif io_task:
        needed = size * 2
    else:
        needed = size
    return usage.free >= threshold + needed
| |
|
| |
|
async def get_path_size(opath):
    """Return the total size in bytes of a file or directory tree.

    Symlinks are resolved so the size of the link target is counted,
    not the link itself.
    """

    async def _sized(p):
        # Resolve a symlink before stat'ing so we measure the real file.
        if await aiopath.islink(p):
            p = await aioreadlink(p)
        return await aiopath.getsize(p)

    if await aiopath.isfile(opath):
        return await _sized(opath)
    total = 0
    for root, _, files in await sync_to_async(walk, opath):
        for name in files:
            total += await _sized(ospath.join(root, name))
    return total
| |
|
| |
|
async def count_files_and_folders(opath):
    """Return ``(total_folders, total_files)`` for the tree rooted at *opath*."""
    folder_count = file_count = 0
    for _, dirnames, filenames in await sync_to_async(walk, opath):
        folder_count += len(dirnames)
        file_count += len(filenames)
    return folder_count, file_count
| |
|
| |
|
def get_base_name(orig_path):
    """Return *orig_path* with its archive extension stripped.

    The extension is matched case-insensitively against ARCH_EXT.

    Raises:
        NotSupportedExtractionArchive: if no known archive extension matches.
    """
    lowered = orig_path.strip().lower()
    extension = next((ext for ext in ARCH_EXT if lowered.endswith(ext)), "")
    if not extension:
        raise NotSupportedExtractionArchive("File format not supported for extraction")
    # escape() the extension before building the regex: otherwise every "."
    # is a wildcard, so e.g. ".7z" would also match "x7z" at the end.
    return re_split(f"{escape(extension)}$", orig_path, maxsplit=1, flags=I)[0]
| |
|
| |
|
async def create_recursive_symlink(source, destination):
    """Mirror *source* at *destination* as a symlink tree.

    Directories are recreated as real directories; each file inside becomes
    a symlink pointing back at the original. Non-file, non-dir sources are
    silently ignored.
    """
    if ospath.isdir(source):
        await aiomakedirs(destination, exist_ok=True)
        for entry in await listdir(source):
            await create_recursive_symlink(
                ospath.join(source, entry), ospath.join(destination, entry)
            )
    elif ospath.isfile(source):
        try:
            await symlink(source, destination)
        except FileExistsError:
            LOGGER.error(f"Shortcut already exists: {destination}")
        except Exception as e:
            LOGGER.error(f"Error creating shortcut for {source}: {e}")
| |
|
| |
|
def get_mime_type(file_path):
    """Return the MIME type of *file_path*, resolving symlinks first.

    Falls back to "text/plain" when detection returns nothing.
    """
    if ospath.islink(file_path):
        file_path = readlink(file_path)
    detected = Magic(mime=True).from_file(file_path)
    return detected or "text/plain"
| |
|
| |
|
async def remove_excluded_files(fpath, ee):
    """Delete every file under *fpath* whose name ends with an extension in
    *ee* (case-insensitive), skipping the yt-dlp thumbnail folder."""
    suffixes = tuple(ee)
    for root, _, files in await sync_to_async(walk, fpath):
        if root.strip().endswith("/yt-dlp-thumb"):
            continue
        for name in files:
            if name.strip().lower().endswith(suffixes):
                await remove(ospath.join(root, name))
| |
|
| |
|
async def move_and_merge(source, destination, mid):
    """Move everything in *source* into *destination*, merging recursively.

    Existing subdirectories are merged; a file-name clash is resolved by
    prefixing the incoming file with ``{mid}-``. Incomplete-download
    markers (``.aria2`` / ``.!qB``) are left behind.
    """
    if not await aiopath.exists(destination):
        await aiomakedirs(destination, exist_ok=True)
    for entry in await listdir(source):
        entry = entry.strip()
        src_path = f"{source}/{entry}"
        dest_path = f"{destination}/{entry}"
        if await aiopath.isdir(src_path):
            if await aiopath.exists(dest_path):
                await move_and_merge(src_path, dest_path, mid)
            else:
                await move(src_path, dest_path)
            continue
        if entry.endswith((".aria2", ".!qB")):
            continue
        if await aiopath.exists(dest_path):
            # Name clash: disambiguate with the task id.
            dest_path = f"{destination}/{mid}-{entry}"
        await move(src_path, dest_path)
| |
|
| |
|
async def join_files(opath):
    """Rejoin raw split parts inside directory *opath* into whole files.

    A file named ``name.002`` (any number of leading zeros) whose MIME type
    is not a 7z/zip volume marks a `split`-style set; all ``name.*`` parts
    are concatenated into ``name`` and, on success, the parts are deleted.
    7z/zip multi-volume archives are skipped — 7z handles those itself.
    """
    files = await listdir(opath)
    results = []
    exists = False
    for file_ in files:
        # ".002" (not ".001") is probed so single-part leftovers don't trigger
        # a join; the MIME check filters out genuine 7z/zip volume files.
        if re_search(r"\.0+2$", file_) and await sync_to_async(
            get_mime_type, f"{opath}/{file_}"
        ) not in ["application/x-7z-compressed", "application/zip"]:
            exists = True
            final_name = file_.rsplit(".", 1)[0]
            fpath = f"{opath}/{final_name}"
            # Shell glob expands the numbered parts in lexicographic order,
            # which equals numeric order for zero-padded suffixes.
            cmd = f'cat "{fpath}."* > "{fpath}"'
            _, stderr, code = await cmd_exec(cmd, True)
            if code != 0:
                LOGGER.error(f"Failed to join {final_name}, stderr: {stderr}")
                # Drop the partially written output so it is never uploaded.
                if await aiopath.isfile(fpath):
                    await remove(fpath)
            else:
                results.append(final_name)

    if not exists:
        LOGGER.warning("No files to join!")
    elif results:
        LOGGER.info("Join Completed!")
        # Only successful joins reach here: delete their redundant part files.
        for res in results:
            for file_ in files:
                if re_search(rf"{escape(res)}\.0[0-9]+$", file_):
                    await remove(f"{opath}/{file_}")
| |
|
| |
|
async def split_file(f_path, split_size, listener):
    """Split *f_path* into ``f_path.001``, ``f_path.002``, … chunks of
    *split_size* bytes using coreutils ``split``.

    Returns False when the task was cancelled (including SIGKILL of the
    subprocess), True otherwise; a non-zero exit is logged but still
    returns True.
    """
    if listener.is_cancelled:
        return False
    listener.subproc = await create_subprocess_exec(
        "split",
        "--numeric-suffixes=1",
        "--suffix-length=3",
        f"--bytes={split_size}",
        f_path,
        f"{f_path}.",
        stderr=PIPE,
    )
    _, stderr = await listener.subproc.communicate()
    return_code = listener.subproc.returncode
    if listener.is_cancelled:
        return False
    if return_code == -9:
        # Killed by SIGKILL: treat as an external cancellation.
        listener.is_cancelled = True
        return False
    if return_code != 0:
        try:
            stderr = stderr.decode().strip()
        except Exception:
            stderr = "Unable to decode the error!"
        LOGGER.error(f"{stderr}. Split Document: {f_path}")
    return True
| |
|
| |
|
class SevenZ:
    """Async wrapper around the ``7z`` CLI for extracting and creating
    archives, parsing live progress from the subprocess stdout.

    The listener object supplies/receives ``subproc``, ``subsize``,
    ``is_cancelled`` and the leech/split settings used by :meth:`zip`.
    """

    def __init__(self, listener):
        self._listener = listener
        # Estimated bytes processed so far (derived from the "%" output).
        self._processed_bytes = 0
        # Last percentage string reported by 7z, e.g. "42%".
        self._percentage = "0%"

    @property
    def processed_bytes(self):
        # Current estimate of processed bytes for status displays.
        return self._processed_bytes

    @property
    def progress(self):
        # Current percentage string for status displays.
        return self._percentage

    async def _sevenz_progress(self):
        """Consume 7z stdout while it runs and update progress state.

        Phase 1 reads whole lines until a total size is reported
        ("N bytes" or "Total Physical Size = N") and stores it in
        ``listener.subsize``. Phase 2 reads one byte at a time because 7z
        rewrites its "NN%" indicator in place without emitting newlines.
        """
        pattern = r"(\d+)\s+bytes|Total Physical Size\s*=\s*(\d+)"
        while not (
            self._listener.subproc.returncode is not None
            or self._listener.is_cancelled
            or self._listener.subproc.stdout.at_eof()
        ):
            try:
                # Short timeout: once headers are past, 7z stops emitting lines.
                line = await wait_for(self._listener.subproc.stdout.readline(), 2)
            except Exception:
                break
            line = line.decode().strip()
            if match := re_search(pattern, line):
                # Whichever alternative matched carries the total size.
                self._listener.subsize = int(match[1] or match[2])
            await sleep(0.05)
        s = b""
        while not (
            self._listener.is_cancelled
            or self._listener.subproc.returncode is not None
            or self._listener.subproc.stdout.at_eof()
        ):
            try:
                char = await wait_for(self._listener.subproc.stdout.read(1), 60)
            except Exception:
                break
            if not char:
                break
            s += char
            if char == b"%":
                # Buffer now ends with something like "... 42%": take the
                # last space-separated token as the percentage.
                try:
                    self._percentage = s.decode().rsplit(" ", 1)[-1].strip()
                    self._processed_bytes = (
                        int(self._percentage.strip("%")) / 100
                    ) * self._listener.subsize
                except Exception:
                    self._processed_bytes = 0
                    self._percentage = "0%"
                s = b""
            await sleep(0.05)

        # Reset so a finished task reports idle progress.
        self._processed_bytes = 0
        self._percentage = "0%"

    async def extract(self, f_path, t_path, pswd):
        """Extract archive *f_path* into directory *t_path*.

        Returns False on cancellation, otherwise the 7z exit code
        (0 means success); errors are logged.
        """
        cmd = [
            "7z",
            "x",
            f"-p{pswd}",
            f_path,
            f"-o{t_path}",
            "-aot",  # auto-rename existing files instead of overwriting
            "-xr!@PaxHeader",  # skip pax header pseudo-entries from tars
            "-bsp1",  # progress to stdout
            "-bse1",  # errors to stdout
            "-bb3",  # verbose logging
        ]
        if not pswd:
            del cmd[2]  # drop the empty "-p" switch
        if self._listener.is_cancelled:
            return False
        self._listener.subproc = await create_subprocess_exec(
            *cmd,
            stdout=PIPE,
            stderr=PIPE,
        )
        await self._sevenz_progress()
        _, stderr = await self._listener.subproc.communicate()
        code = self._listener.subproc.returncode
        if self._listener.is_cancelled:
            return False
        if code == -9:
            # SIGKILL: treat as an external cancellation.
            self._listener.is_cancelled = True
            return False
        elif code != 0:
            try:
                stderr = stderr.decode().strip()
            except Exception:
                stderr = "Unable to decode the error!"
            LOGGER.error(f"{stderr}. Unable to extract archive!. Path: {f_path}")
        return code

    async def zip(self, dl_path, up_path, pswd):
        """Archive *dl_path* into *up_path* (store-only, optionally split
        into volumes for leech tasks and password-protected).

        Returns *up_path* on success (after deleting the source),
        *dl_path* on failure, or False on cancellation.
        """
        size = await get_path_size(dl_path)
        if self._listener.equal_splits:
            # Ceiling division, then pad so the last part isn't tiny.
            parts = -(-size // self._listener.split_size)
            split_size = (size // parts) + (size % parts)
        else:
            split_size = self._listener.split_size
        cmd = [
            "7z",
            f"-v{split_size}b",  # volume size in bytes
            "a",
            "-mx=0",  # store only, no compression
            f"-p{pswd}",
            up_path,
            dl_path,
            "-bsp1",
            "-bse1",
            "-bb3",
        ]
        # NOTE: the del-by-index calls below depend on the exact cmd layout.
        if self._listener.is_leech and int(size) > self._listener.split_size:
            if not pswd:
                del cmd[4]  # drop empty "-p"; keep the "-v" volume switch
            LOGGER.info(f"Zip: orig_path: {dl_path}, zip_path: {up_path}.0*")
        else:
            del cmd[1]  # no splitting: drop the "-v" volume switch
            if not pswd:
                del cmd[3]  # drop empty "-p" (index shifted by the del above)
            LOGGER.info(f"Zip: orig_path: {dl_path}, zip_path: {up_path}")
        if self._listener.is_cancelled:
            return False
        self._listener.subproc = await create_subprocess_exec(
            *cmd, stdout=PIPE, stderr=PIPE
        )
        await self._sevenz_progress()
        _, stderr = await self._listener.subproc.communicate()
        code = self._listener.subproc.returncode
        if self._listener.is_cancelled:
            return False
        if code == -9:
            # SIGKILL: treat as an external cancellation.
            self._listener.is_cancelled = True
            return False
        elif code == 0:
            # Archive created: remove the original source path.
            await clean_target(dl_path)
            return up_path
        else:
            # Failed: remove the partial archive and report the source back.
            if await aiopath.exists(up_path):
                await remove(up_path)
            try:
                stderr = stderr.decode().strip()
            except Exception:
                stderr = "Unable to decode the error!"
            LOGGER.error(f"{stderr}. Unable to zip this path: {dl_path}")
        return dl_path
| |
|