"""Filesystem helpers for the leech bot: archive detection, cleanup,
size accounting, file splitting/joining, and a 7z wrapper with progress."""
from aioshutil import rmtree as aiormtree, move
from asyncio import create_subprocess_exec, sleep, wait_for
from asyncio.subprocess import PIPE
from contextlib import suppress
from psutil import disk_usage
from os import path as ospath, readlink, walk
from re import I, escape, search as re_search, split as re_split
from aiofiles.os import (
listdir,
remove,
rmdir,
symlink,
makedirs as aiomakedirs,
path as aiopath,
readlink as aioreadlink,
)
from magic import Magic
from ... import DOWNLOAD_DIR, LOGGER
from ...core.torrent_manager import TorrentManager
from .bot_utils import cmd_exec, sync_to_async
from .exceptions import NotSupportedExtractionArchive
# Extensions 7z can handle; consumed by is_archive() and get_base_name().
ARCH_EXT = [
    ".tar.bz2",
    ".tar.gz",
    ".bz2",
    ".gz",
    ".tar.xz",
    ".tar",
    ".tbz2",
    ".tgz",
    ".lzma2",
    ".zip",
    ".7z",
    ".z",
    ".rar",
    ".iso",
    ".wim",
    ".cab",
    ".apm",
    ".arj",
    ".chm",
    ".cpio",
    ".cramfs",
    ".deb",
    ".dmg",
    ".fat",
    ".hfs",
    ".lzh",
    ".lzma",
    ".mbr",
    ".msi",
    ".mslz",
    ".nsis",
    ".ntfs",
    ".rpm",
    ".squashfs",
    ".udf",
    ".vhd",
    ".xar",
    ".zst",
    ".zstd",
    ".cbz",
    ".apfs",
    ".ar",
    ".qcow",
    ".macho",
    ".exe",
    ".dll",
    ".sys",
    ".pmd",
    ".swf",
    ".swfc",
    ".simg",
    ".vdi",
    ".vhdx",
    ".vmdk",
    ".gzip",
    ".lzma86",
    ".sha256",
    ".sha512",
    ".sha224",
    ".sha384",
    ".sha1",
    ".md5",
    ".crc32",
    ".crc64",
]

# Built once at import time so is_archive() doesn't reconstruct a tuple
# on every call (str.endswith requires a tuple, not a list).
_ARCH_SUFFIXES = tuple(ARCH_EXT)

# First volume of a split archive (.part1.rar / .7z.001 / .zip.001), or a
# standalone .rar that is NOT a .partN.rar continuation volume.
FIRST_SPLIT_REGEX = (
    r"\.part0*1\.rar$|\.7z\.0*1$|\.zip\.0*1$|^(?!.*\.part\d+\.rar$).*\.rar$"
)

# Any continuation volume of a split archive (.rNN / .7z.NNN / .zNN / ...).
SPLIT_REGEX = r"\.r\d+$|\.7z\.\d+$|\.z\d+$|\.zip\.\d+$|\.part\d+\.rar$"


def is_first_archive_split(file):
    """Return True if *file* names the first volume of a split archive."""
    return bool(re_search(FIRST_SPLIT_REGEX, file.lower(), I))


def is_archive(file):
    """Return True if *file* (whitespace-stripped) has a known archive extension."""
    return file.strip().lower().endswith(_ARCH_SUFFIXES)


def is_archive_split(file):
    """Return True if *file* names a continuation volume of a split archive."""
    return bool(re_search(SPLIT_REGEX, file.lower(), I))
async def clean_target(opath):
    """Delete *opath* — directory tree or single file — if it exists.

    Failures are logged and swallowed so callers never crash on cleanup.
    """
    if not await aiopath.exists(opath):
        return
    LOGGER.info(f"Cleaning Target: {opath}")
    try:
        if await aiopath.isdir(opath):
            await aiormtree(opath, ignore_errors=True)
        else:
            await remove(opath)
    except Exception as err:
        LOGGER.error(str(err))
async def clean_download(opath):
    """Remove the download directory tree at *opath*, logging any failure."""
    if not await aiopath.exists(opath):
        return
    LOGGER.info(f"Cleaning Download: {opath}")
    try:
        await aiormtree(opath, ignore_errors=True)
    except Exception as err:
        LOGGER.error(str(err))
async def clean_all():
    """Drop all torrents, wipe the download directory, and recreate it empty."""
    await TorrentManager.remove_all()
    LOGGER.info("Cleaning Download Directory")
    proc = await create_subprocess_exec("rm", "-rf", DOWNLOAD_DIR)
    await proc.wait()
    await aiomakedirs(DOWNLOAD_DIR, exist_ok=True)
async def clean_unwanted(opath):
    """Purge leftover hidden ``.parts`` files and ``.unwanted`` dirs under
    *opath*, then delete any directories left empty."""
    LOGGER.info(f"Cleaning unwanted files/folders: {opath}")
    for dirpath, _, filenames in await sync_to_async(walk, opath, topdown=False):
        for name in filenames:
            if name.startswith(".") and name.strip().endswith(".parts"):
                await remove(ospath.join(dirpath, name))
        if dirpath.strip().endswith(".unwanted"):
            await aiormtree(dirpath, ignore_errors=True)
    # Second bottom-up pass: prune directories emptied by the first pass.
    for dirpath, _, _ in await sync_to_async(walk, opath, topdown=False):
        if not await listdir(dirpath):
            await rmdir(dirpath)
async def check_storage_threshold(size, threshold, io_task=False, alloc=False):
    """Return True if enough free disk space remains for a task of *size* bytes.

    io_task doubles the requirement (source + result on disk at once);
    alloc means space is already reserved, so only *threshold* must remain.
    """
    free = (await sync_to_async(disk_usage, DOWNLOAD_DIR)).free
    if alloc:
        required = threshold
    elif io_task:
        required = threshold + size * 2
    else:
        required = threshold + size
    return free >= required
async def get_path_size(opath):
    """Return the total size in bytes of a file or directory tree at *opath*,
    resolving symlinks to measure their targets."""
    if await aiopath.isfile(opath):
        if await aiopath.islink(opath):
            opath = await aioreadlink(opath)
        return await aiopath.getsize(opath)
    total = 0
    for root, _, filenames in await sync_to_async(walk, opath):
        for name in filenames:
            target = ospath.join(root, name)
            if await aiopath.islink(target):
                target = await aioreadlink(target)
            total += await aiopath.getsize(target)
    return total
async def count_files_and_folders(opath):
    """Return ``(folder_count, file_count)`` for the tree rooted at *opath*."""
    folder_count = 0
    file_count = 0
    for _, dirnames, filenames in await sync_to_async(walk, opath):
        folder_count += len(dirnames)
        file_count += len(filenames)
    return folder_count, file_count
def get_base_name(orig_path):
    """Return *orig_path* with its archive extension stripped.

    Raises:
        NotSupportedExtractionArchive: if no known archive extension matches.
    """
    extension = next(
        (ext for ext in ARCH_EXT if orig_path.strip().lower().endswith(ext)), ""
    )
    if not extension:
        raise NotSupportedExtractionArchive("File format not supported for extraction")
    # escape() keeps regex metacharacters (the ".") literal — previously
    # ".7z$" etc. let "." match ANY character when used as a pattern.
    return re_split(f"{escape(extension)}$", orig_path, maxsplit=1, flags=I)[0]
async def create_recursive_symlink(source, destination):
    """Mirror *source* into *destination* as a directory tree of symlinks.

    Directories are recreated for real; files become symlinks. Errors are
    logged per-file so one failure doesn't abort the walk.
    """
    if ospath.isdir(source):
        await aiomakedirs(destination, exist_ok=True)
        for entry in await listdir(source):
            await create_recursive_symlink(
                ospath.join(source, entry), ospath.join(destination, entry)
            )
    elif ospath.isfile(source):
        try:
            await symlink(source, destination)
        except FileExistsError:
            LOGGER.error(f"Shortcut already exists: {destination}")
        except Exception as err:
            LOGGER.error(f"Error creating shortcut for {source}: {err}")
def get_mime_type(file_path):
    """Return the MIME type of *file_path* (symlinks resolved), defaulting
    to ``text/plain`` when libmagic reports nothing."""
    if ospath.islink(file_path):
        file_path = readlink(file_path)
    detected = Magic(mime=True).from_file(file_path)
    return detected or "text/plain"
async def remove_excluded_files(fpath, ee):
    """Delete every file under *fpath* whose name ends with an extension in
    *ee*, skipping the yt-dlp thumbnail directory."""
    excluded = tuple(ee)
    for root, _, filenames in await sync_to_async(walk, fpath):
        if root.strip().endswith("/yt-dlp-thumb"):
            continue
        for name in filenames:
            if name.strip().lower().endswith(excluded):
                await remove(ospath.join(root, name))
async def move_and_merge(source, destination, mid):
    """Move the contents of *source* into *destination*, merging directories.

    File-name clashes are resolved by prefixing the task id *mid*;
    aria2/qBittorrent control files are skipped entirely.
    """
    if not await aiopath.exists(destination):
        await aiomakedirs(destination, exist_ok=True)
    for entry in await listdir(source):
        entry = entry.strip()
        src_path = f"{source}/{entry}"
        dest_path = f"{destination}/{entry}"
        if await aiopath.isdir(src_path):
            if await aiopath.exists(dest_path):
                await move_and_merge(src_path, dest_path, mid)
            else:
                await move(src_path, dest_path)
            continue
        if entry.endswith((".aria2", ".!qB")):
            continue
        if await aiopath.exists(dest_path):
            dest_path = f"{destination}/{mid}-{entry}"
        await move(src_path, dest_path)
async def join_files(opath):
    """Concatenate ``name.001 / name.002 / ...`` split parts inside *opath*.

    A split set is detected by the presence of a second part (``*.0...02``)
    that is not a 7z/zip volume (7z joins those itself). On success the
    part files are deleted; on failure any partial output is removed.
    """
    files = await listdir(opath)
    results = []
    exists = False
    for file_ in files:
        if re_search(r"\.0+2$", file_) and await sync_to_async(
            get_mime_type, f"{opath}/{file_}"
        ) not in ["application/x-7z-compressed", "application/zip"]:
            exists = True
            final_name = file_.rsplit(".", 1)[0]
            fpath = f"{opath}/{final_name}"
            # Shell glob joins every "<name>.<suffix>" part in order.
            cmd = f'cat "{fpath}."* > "{fpath}"'
            _, stderr, code = await cmd_exec(cmd, True)
            if code != 0:
                LOGGER.error(f"Failed to join {final_name}, stderr: {stderr}")
                if await aiopath.isfile(fpath):
                    await remove(fpath)
            else:
                results.append(final_name)
    if not exists:
        LOGGER.warning("No files to join!")
    elif results:
        LOGGER.info("Join Completed!")
        for res in results:
            for file_ in files:
                # \.\d{2,}$ instead of the old \.0[0-9]+$, which required a
                # leading zero and therefore left parts >= ".100" undeleted.
                if re_search(rf"{escape(res)}\.\d{{2,}}$", file_):
                    await remove(f"{opath}/{file_}")
async def split_file(f_path, split_size, listener):
    """Split *f_path* into numbered ``<name>.NNN`` chunks of *split_size*
    bytes using GNU split.

    Returns False when the task is cancelled (including a SIGKILLed child);
    otherwise True — split errors are only logged.
    """
    if listener.is_cancelled:
        return False
    listener.subproc = await create_subprocess_exec(
        "split",
        "--numeric-suffixes=1",
        "--suffix-length=3",
        f"--bytes={split_size}",
        f_path,
        f"{f_path}.",
        stderr=PIPE,
    )
    _, stderr = await listener.subproc.communicate()
    code = listener.subproc.returncode
    if listener.is_cancelled:
        return False
    if code == -9:
        # Child was SIGKILLed: treat as cancellation.
        listener.is_cancelled = True
        return False
    if code != 0:
        try:
            stderr = stderr.decode().strip()
        except Exception:
            stderr = "Unable to decode the error!"
        LOGGER.error(f"{stderr}. Split Document: {f_path}")
    return True
class SevenZ:
    """Async wrapper around the 7z binary: extract/zip with live progress
    parsed from the child process's stdout."""
    def __init__(self, listener):
        # listener carries shared task state: subproc, subsize, is_cancelled,
        # split_size, equal_splits, is_leech.
        self._listener = listener
        self._processed_bytes = 0
        self._percentage = "0%"
    @property
    def processed_bytes(self):
        """Estimated bytes processed, derived from 7z's percentage output."""
        return self._processed_bytes
    @property
    def progress(self):
        """Last percentage string emitted by 7z, e.g. "42%"."""
        return self._percentage
    async def _sevenz_progress(self):
        """Follow 7z stdout: first capture the total size, then track '%'
        progress updates until the subprocess exits, is cancelled, or EOF.
        """
        # Phase 1: scan header lines for the archive/physical size so a
        # percentage can later be converted into processed bytes.
        pattern = r"(\d+)\s+bytes|Total Physical Size\s*=\s*(\d+)"
        while not (
            self._listener.subproc.returncode is not None
            or self._listener.is_cancelled
            or self._listener.subproc.stdout.at_eof()
        ):
            try:
                # Short timeout: when header output stops, fall through to
                # the byte-by-byte progress phase below.
                line = await wait_for(self._listener.subproc.stdout.readline(), 2)
            except Exception:
                break
            line = line.decode().strip()
            if match := re_search(pattern, line):
                self._listener.subsize = int(match[1] or match[2])
            await sleep(0.05)
        s = b""
        # Phase 2: 7z rewrites its progress line in place (no newlines), so
        # read one byte at a time and parse the buffer at each '%'.
        while not (
            self._listener.is_cancelled
            or self._listener.subproc.returncode is not None
            or self._listener.subproc.stdout.at_eof()
        ):
            try:
                char = await wait_for(self._listener.subproc.stdout.read(1), 60)
            except Exception:
                break
            if not char:
                break
            s += char
            if char == b"%":
                try:
                    self._percentage = s.decode().rsplit(" ", 1)[-1].strip()
                    self._processed_bytes = (
                        int(self._percentage.strip("%")) / 100
                    ) * self._listener.subsize
                except Exception:
                    # Unparseable chunk — reset progress rather than crash.
                    self._processed_bytes = 0
                    self._percentage = "0%"
                s = b""
            await sleep(0.05)
        # Reset so a finished/aborted run doesn't display stale progress.
        self._processed_bytes = 0
        self._percentage = "0%"
    async def extract(self, f_path, t_path, pswd):
        """Extract archive *f_path* into directory *t_path*.

        pswd: archive password, falsy for none.
        Returns the 7z exit code (0 on success) or False when cancelled.
        """
        cmd = [
            "7z",
            "x",
            f"-p{pswd}",
            f_path,
            f"-o{t_path}",
            "-aot",
            "-xr!@PaxHeader",
            "-bsp1",
            "-bse1",
            "-bb3",
        ]
        if not pswd:
            # Drop the empty "-p" switch entirely when no password is set.
            del cmd[2]
        if self._listener.is_cancelled:
            return False
        self._listener.subproc = await create_subprocess_exec(
            *cmd,
            stdout=PIPE,
            stderr=PIPE,
        )
        await self._sevenz_progress()
        _, stderr = await self._listener.subproc.communicate()
        code = self._listener.subproc.returncode
        if self._listener.is_cancelled:
            return False
        if code == -9:
            # SIGKILLed child: report as cancellation, not error.
            self._listener.is_cancelled = True
            return False
        elif code != 0:
            try:
                stderr = stderr.decode().strip()
            except Exception:
                stderr = "Unable to decode the error!"
            LOGGER.error(f"{stderr}. Unable to extract archive!. Path: {f_path}")
        return code
    async def zip(self, dl_path, up_path, pswd):
        """Archive *dl_path* to *up_path* with 7z in store mode (-mx=0).

        Splits into volumes for leech tasks that exceed split_size.
        Returns up_path on success (source is deleted), dl_path on failure,
        or False when cancelled.
        """
        size = await get_path_size(dl_path)
        if self._listener.equal_splits:
            # Ceiling-divide into equal parts; the remainder is folded into
            # each volume's size so the last part isn't oversized.
            parts = -(-size // self._listener.split_size)
            split_size = (size // parts) + (size % parts)
        else:
            split_size = self._listener.split_size
        cmd = [
            "7z",
            f"-v{split_size}b",
            "a",
            "-mx=0",
            f"-p{pswd}",
            up_path,
            dl_path,
            "-bsp1",
            "-bse1",
            "-bb3",
        ]
        if self._listener.is_leech and int(size) > self._listener.split_size:
            # Leech + oversize: keep the -v volume switch so 7z splits output.
            if not pswd:
                del cmd[4]
            LOGGER.info(f"Zip: orig_path: {dl_path}, zip_path: {up_path}.0*")
        else:
            # No splitting: drop -v (NOTE: later indexes shift down by one).
            del cmd[1]
            if not pswd:
                del cmd[3]
            LOGGER.info(f"Zip: orig_path: {dl_path}, zip_path: {up_path}")
        if self._listener.is_cancelled:
            return False
        self._listener.subproc = await create_subprocess_exec(
            *cmd, stdout=PIPE, stderr=PIPE
        )
        await self._sevenz_progress()
        _, stderr = await self._listener.subproc.communicate()
        code = self._listener.subproc.returncode
        if self._listener.is_cancelled:
            return False
        if code == -9:
            self._listener.is_cancelled = True
            return False
        elif code == 0:
            # Archive created — remove the original source path.
            await clean_target(dl_path)
            return up_path
        else:
            # Failure: delete partial output, log decoded stderr, and hand
            # the original path back so the task can continue with it.
            if await aiopath.exists(up_path):
                await remove(up_path)
            try:
                stderr = stderr.decode().strip()
            except Exception:
                stderr = "Unable to decode the error!"
            LOGGER.error(f"{stderr}. Unable to zip this path: {dl_path}")
            return dl_path