dragxd's picture
Initial commit: Push project to Hugging Face
db78256
from asyncio import create_subprocess_exec, gather, sleep, wait_for
from asyncio.subprocess import PIPE
from configparser import RawConfigParser
from json import loads
from logging import getLogger
from random import randrange
from re import findall as re_findall
from aiofiles import open as aiopen
from aiofiles.os import listdir, makedirs, path as aiopath
from contextlib import suppress
from ....core.config_manager import Config, BinConfig
from ...ext_utils.bot_utils import cmd_exec, sync_to_async
from ...ext_utils.files_utils import (
count_files_and_folders,
get_mime_type,
)
LOGGER = getLogger(__name__)
class RcloneTransferHelper:
def __init__(self, listener):
self._listener = listener
self._proc = None
self._transferred_size = "0 B"
self._eta = "-"
self._percentage = "0%"
self._speed = "0 B/s"
self._size = "0 B"
self._is_download = False
self._is_upload = False
self._sa_count = 1
self._sa_index = 0
self._sa_number = 0
self._use_service_accounts = Config.USE_SERVICE_ACCOUNTS
self._rclone_select = False
@property
def transferred_size(self):
return self._transferred_size
@property
def percentage(self):
return self._percentage
@property
def speed(self):
return self._speed
@property
def eta(self):
return self._eta
@property
def size(self):
return self._size
async def _progress(self):
while not (
self._proc.returncode is not None
or self._proc.stdout.at_eof()
or self._listener.is_cancelled
):
try:
data = await wait_for(self._proc.stdout.readline(), 60)
except Exception:
break
if not data:
break
data = data.decode().strip()
if data := re_findall(
r"Transferred:\s+([\d.]+\s*\w+)\s+/\s+([\d.]+\s*\w+),\s+([\d.]+%)\s*,\s+([\d.]+\s*\w+/s),\s+ETA\s+([\dwdhms]+)",
data,
):
(
self._transferred_size,
self._size,
self._percentage,
self._speed,
self._eta,
) = data[0]
await sleep(0.05)
def _switch_service_account(self):
if self._sa_index == self._sa_number - 1:
self._sa_index = 0
else:
self._sa_index += 1
self._sa_count += 1
remote = f"sa{self._sa_index:03}"
LOGGER.info(f"Switching to {remote} remote")
return remote
async def _create_rc_sa(self, remote, remote_opts):
sa_conf_dir = "rclone_sa"
sa_conf_file = f"{sa_conf_dir}/{remote}.conf"
if await aiopath.isfile(sa_conf_file):
return sa_conf_file
await makedirs(sa_conf_dir, exist_ok=True)
if gd_id := remote_opts.get("team_drive"):
option = "team_drive"
elif gd_id := remote_opts.get("root_folder_id"):
option = "root_folder_id"
else:
self._use_service_accounts = False
return "rclone.conf"
files = await listdir("accounts")
text = "".join(
f"[sa{i:03}]\ntype = drive\nscope = drive\nservice_account_file = accounts/{sa}\n{option} = {gd_id}\n\n"
for i, sa in enumerate(files)
)
async with aiopen(sa_conf_file, "w") as f:
await f.write(text)
return sa_conf_file
async def _start_download(self, cmd, remote_type):
self._proc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE)
await self._progress()
_, stderr = await self._proc.communicate()
return_code = self._proc.returncode
if self._listener.is_cancelled:
return
if return_code == 0:
await self._listener.on_download_complete()
elif return_code != -9:
error = stderr.decode().strip()
if not error and remote_type == "drive" and self._use_service_accounts:
error = "Mostly your service accounts don't have access to this drive!"
LOGGER.error(error)
if (
self._sa_number != 0
and remote_type == "drive"
and "RATE_LIMIT_EXCEEDED" in error
and self._use_service_accounts
):
if self._sa_count < self._sa_number:
remote = self._switch_service_account()
cmd[6] = f"{remote}:{cmd[6].split(':', 1)[1]}"
if self._listener.is_cancelled:
return
return await self._start_download(cmd, remote_type)
else:
LOGGER.info(
f"Reached maximum number of service accounts switching, which is {self._sa_count}"
)
await self._listener.on_download_error(error[:4000])
return
async def download(self, remote, config_path, path):
self._is_download = True
try:
remote_opts = await self._get_remote_options(config_path, remote)
except Exception as err:
await self._listener.on_download_error(str(err))
return
remote_type = remote_opts["type"]
if (
remote_type == "drive"
and self._use_service_accounts
and config_path == "rclone.conf"
and await aiopath.isdir("accounts")
and not remote_opts.get("service_account_file")
):
config_path = await self._create_rc_sa(remote, remote_opts)
if config_path != "rclone.conf":
sa_files = await listdir("accounts")
self._sa_number = len(sa_files)
self._sa_index = randrange(self._sa_number)
remote = f"sa{self._sa_index:03}"
LOGGER.info(f"Download with service account {remote}")
cmd = self._get_updated_command(
config_path, f"{remote}:{self._listener.link}", path, "copy"
)
if remote_type == "drive" and not self._listener.rc_flags:
cmd.extend(
(
"--drive-acknowledge-abuse",
"--drive-chunk-size",
"128M",
"--tpslimit",
"1",
"--tpslimit-burst",
"1",
"--transfers",
"1",
)
)
await self._start_download(cmd, remote_type)
async def _get_gdrive_link(self, config_path, destination, mime_type):
epath = destination.rsplit("/", 1)[0] if mime_type == "Folder" else destination
cmd = [
BinConfig.RCLONE_NAME,
"lsjson",
"--fast-list",
"--no-mimetype",
"--no-modtime",
"--config",
config_path,
epath,
]
res, err, code = await cmd_exec(cmd)
if code == 0:
result = loads(res)
fid = next(
(r["ID"] for r in result if r["Path"] == self._listener.name), "err"
)
link = (
f"https://drive.google.com/drive/folders/{fid}"
if mime_type == "Folder"
else f"https://drive.google.com/uc?id={fid}&export=download"
)
elif code != -9:
LOGGER.error(
f"while getting drive link. Path: {destination}. Stderr: {err}"
)
link = ""
return link
async def _start_upload(self, cmd, remote_type):
self._proc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE)
await self._progress()
_, stderr = await self._proc.communicate()
return_code = self._proc.returncode
if self._listener.is_cancelled:
return False
if return_code == -9:
return False
elif return_code == 0:
return True
else:
error = stderr.decode().strip()
LOGGER.error(error)
if (
self._sa_number != 0
and remote_type == "drive"
and "RATE_LIMIT_EXCEEDED" in error
and self._use_service_accounts
):
if self._sa_count < self._sa_number:
remote = self._switch_service_account()
cmd[7] = f"{remote}:{cmd[7].split(':', 1)[1]}"
return (
False
if self._listener.is_cancelled
else await self._start_upload(cmd, remote_type)
)
else:
LOGGER.info(
f"Reached maximum number of service accounts switching, which is {self._sa_count}"
)
await self._listener.on_upload_error(error[:4000])
return False
async def upload(self, path):
self._is_upload = True
rc_path = self._listener.up_dest
if rc_path.startswith("mrcc:"):
rc_path = rc_path.split("mrcc:", 1)[1]
oconfig_path = f"rclone/{self._listener.user_id}.conf"
else:
oconfig_path = "rclone.conf"
oremote, rc_path = rc_path.split(":", 1)
if await aiopath.isdir(path):
mime_type = "Folder"
folders, files = await count_files_and_folders(path)
rc_path += f"/{self._listener.name}" if rc_path else self._listener.name
else:
mime_type = await sync_to_async(get_mime_type, path)
folders = 0
files = 1
try:
remote_opts = await self._get_remote_options(oconfig_path, oremote)
except Exception as err:
await self._listener.on_upload_error(str(err))
return
remote_type = remote_opts["type"]
fremote = oremote
fconfig_path = oconfig_path
if (
remote_type == "drive"
and self._use_service_accounts
and fconfig_path == "rclone.conf"
and await aiopath.isdir("accounts")
and not remote_opts.get("service_account_file")
):
fconfig_path = await self._create_rc_sa(oremote, remote_opts)
if fconfig_path != "rclone.conf":
sa_files = await listdir("accounts")
self._sa_number = len(sa_files)
self._sa_index = randrange(self._sa_number)
fremote = f"sa{self._sa_index:03}"
LOGGER.info(f"Upload with service account {fremote}")
cmd = self._get_updated_command(
fconfig_path, path, f"{fremote}:{rc_path}", "move"
)
if remote_type == "drive" and not self._listener.rc_flags:
cmd.extend(
(
"--tpslimit",
"1",
"--tpslimit-burst",
"1",
"--transfers",
"1",
)
)
result = await self._start_upload(cmd, remote_type)
if not result:
return
if mime_type == "Folder":
destination = f"{oremote}:{rc_path}"
elif rc_path:
destination = f"{oremote}:{rc_path}/{self._listener.name}"
else:
destination = f"{oremote}:{self._listener.name}"
if remote_type == "drive":
link = await self._get_gdrive_link(oconfig_path, destination, mime_type)
else:
cmd = [
BinConfig.RCLONE_NAME,
"link",
"--config",
oconfig_path,
destination,
]
res, err, code = await cmd_exec(cmd)
if code == 0:
link = res
elif code != -9:
LOGGER.error(f"while getting link. Path: {destination} | Stderr: {err}")
link = ""
if self._listener.is_cancelled:
return
LOGGER.info(f"Upload Done. Path: {destination}")
await self._listener.on_upload_complete(
link, files, folders, mime_type, destination
)
return
async def clone(self, config_path, src_remote, src_path, mime_type, method):
destination = self._listener.up_dest
dst_remote, dst_path = destination.split(":", 1)
try:
src_remote_opts, dst_remote_opt = await gather(
self._get_remote_options(config_path, src_remote),
self._get_remote_options(config_path, dst_remote),
)
except Exception as err:
await self._listener.on_upload_error(str(err))
return None, None
src_remote_type, dst_remote_type = (
src_remote_opts["type"],
dst_remote_opt["type"],
)
cmd = self._get_updated_command(
config_path, f"{src_remote}:{src_path}", destination, method
)
if not self._listener.rc_flags and src_remote_type == "drive":
cmd.extend(
(
"--drive-acknowledge-abuse",
"--tpslimit",
"3",
"--tpslimit-burst",
"1",
"--transfers",
"3",
)
)
self._proc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE)
await self._progress()
_, stderr = await self._proc.communicate()
return_code = self._proc.returncode
if self._listener.is_cancelled:
return None, None
if return_code == -9:
return None, None
elif return_code == 0:
if mime_type != "Folder":
destination += (
f"/{self._listener.name}" if dst_path else self._listener.name
)
if dst_remote_type == "drive":
link = await self._get_gdrive_link(config_path, destination, mime_type)
return (
(None, None) if self._listener.is_cancelled else (link, destination)
)
else:
cmd = [
BinConfig.RCLONE_NAME,
"link",
"--config",
config_path,
destination,
]
res, err, code = await cmd_exec(cmd)
if self._listener.is_cancelled:
return None, None
if code == 0:
return res, destination
elif code != -9:
LOGGER.error(
f"while getting link. Path: {destination} | Stderr: {err}"
)
return None, destination
else:
error = stderr.decode().strip()
LOGGER.error(error)
await self._listener.on_upload_error(error[:4000])
return None, None
def _get_updated_command(
self,
config_path,
source,
destination,
method,
):
if source.split(":")[-1].startswith("rclone_select"):
source = f"{source.split(':')[0]}:"
self._rclone_select = True
else:
ext = "*.{" + ",".join(self._listener.excluded_extensions) + "}"
cmd = [
BinConfig.RCLONE_NAME,
method,
"--fast-list",
"--config",
config_path,
"-P",
source,
destination,
"-L",
"--retries-sleep",
"3s",
"--ignore-case",
"--low-level-retries",
"1",
"-M",
]
if self._rclone_select:
cmd.extend(("--files-from", self._listener.link))
else:
cmd.extend(("--exclude", ext))
if rcflags := self._listener.rc_flags:
rcflags = rcflags.split("|")
for flag in rcflags:
if ":" in flag:
key, value = map(str.strip, flag.split(":", 1))
cmd.extend((key, value))
elif len(flag) > 0:
cmd.append(flag.strip())
return cmd
@staticmethod
async def _get_remote_options(config_path, remote):
config = RawConfigParser()
async with aiopen(config_path, "r") as f:
contents = await f.read()
config.read_string(contents)
options = config.options(remote)
return {opt: config.get(remote, opt) for opt in options}
async def cancel_task(self):
self._listener.is_cancelled = True
if self._proc is not None:
with suppress(Exception):
self._proc.kill()
if self._is_download:
LOGGER.info(f"Cancelling Download: {self._listener.name}")
await self._listener.on_download_error("Stopped by user!")
elif self._is_upload:
LOGGER.info(f"Cancelling Upload: {self._listener.name}")
await self._listener.on_upload_error("your upload has been stopped!")
else:
LOGGER.info(f"Cancelling Clone: {self._listener.name}")
await self._listener.on_upload_error("your clone has been stopped!")