|
|
from asyncio import Event |
|
|
from time import time |
|
|
|
|
|
from ... import ( |
|
|
LOGGER, |
|
|
bot_cache, |
|
|
non_queued_dl, |
|
|
non_queued_up, |
|
|
queue_dict_lock, |
|
|
queued_dl, |
|
|
queued_up, |
|
|
user_data, |
|
|
) |
|
|
from ...core.config_manager import Config |
|
|
from ..mirror_leech_utils.gdrive_utils.search import GoogleDriveSearch |
|
|
from ..telegram_helper.filters import CustomFilters |
|
|
from ..telegram_helper.tg_utils import check_botpm, forcesub, verify_token |
|
|
from .bot_utils import get_telegraph_list, sync_to_async, safe_int |
|
|
from .files_utils import get_base_name, check_storage_threshold |
|
|
from .links_utils import is_gdrive_id |
|
|
from .status_utils import get_readable_time, get_readable_file_size, get_specific_tasks |
|
|
|
|
|
|
|
|
async def stop_duplicate_check(listener):
    """Check whether the task's final name already exists in the Gdrive upload
    destination.

    Returns a ``(msg, button)`` pair with a telegraph listing of the duplicates
    when a match is found, otherwise ``(False, None)``.
    """
    # Skip entirely when the check is not applicable: leech/select tasks,
    # non-Gdrive destinations, mtp: destinations, or stop-duplicate disabled.
    skip = (
        isinstance(listener.up_dest, int)
        or listener.is_leech
        or listener.select
        or not is_gdrive_id(listener.up_dest)
        or (listener.up_dest.startswith("mtp:") and listener.stop_duplicate)
        or not listener.stop_duplicate
        or listener.same_dir
    )
    if skip:
        return False, None

    search_name = listener.name
    LOGGER.info(f"Checking File/Folder if already in Drive: {search_name}")

    # The uploaded name differs from the download name when the task
    # compresses (adds .zip) or extracts (strips the archive suffix).
    if listener.compress:
        search_name = f"{search_name}.zip"
    elif listener.extract:
        try:
            search_name = get_base_name(search_name)
        except Exception:
            # Name has no recognizable archive suffix; nothing to search for.
            search_name = None

    if search_name is None:
        return False, None

    telegraph_content, contents_no = await sync_to_async(
        GoogleDriveSearch(stop_dup=True, no_multi=listener.is_clone).drive_list,
        search_name,
        listener.up_dest,
        listener.user_id,
    )
    if telegraph_content:
        button = await get_telegraph_list(telegraph_content)
        msg = f"File/Folder is already available in Drive.\nHere are {contents_no} list results:"
        return msg, button

    return False, None
|
|
|
|
|
|
|
|
async def check_running_tasks(listener, state="dl"):
    """Decide whether a task may start now or must wait in the queue.

    Args:
        listener: the task's listener; ``listener.mid`` identifies the task.
        state: ``"dl"`` for the download phase, ``"up"`` for the upload phase.

    Returns:
        ``(is_over_limit, event)`` — when the task is over a queue limit,
        ``event`` is an ``asyncio.Event`` the caller should wait on before
        starting; otherwise ``event`` is ``None`` and the task is already
        registered as running.
    """
    all_limit = safe_int(Config.QUEUE_ALL)
    # Per-state limit: downloads and uploads are capped independently.
    state_limit = (
        safe_int(Config.QUEUE_DOWNLOAD)
        if state == "dl"
        else safe_int(Config.QUEUE_UPLOAD)
    )
    event = None
    is_over_limit = False
    async with queue_dict_lock:
        # A task moving to the upload phase no longer occupies a download slot.
        if state == "up" and listener.mid in non_queued_dl:
            non_queued_dl.remove(listener.mid)
        # Limits are bypassed for force-run / force-upload / force-download tasks.
        if (
            (all_limit or state_limit)
            and not listener.force_run
            and not (listener.force_upload and state == "up")
            and not (listener.force_download and state == "dl")
        ):
            dl_count = len(non_queued_dl)
            up_count = len(non_queued_up)
            t_count = dl_count if state == "dl" else up_count
            # Over the limit when either the global cap is reached (and no
            # per-state headroom remains) or the per-state cap is reached.
            is_over_limit = (
                all_limit
                and dl_count + up_count >= all_limit
                and (not state_limit or t_count >= state_limit)
            ) or (state_limit and t_count >= state_limit)
            if is_over_limit:
                # Park the task: start_from_queued() sets this event later.
                event = Event()
                if state == "dl":
                    queued_dl[listener.mid] = event
                else:
                    queued_up[listener.mid] = event
        if not is_over_limit:
            # Register the task as actively running in its phase.
            if state == "up":
                non_queued_up.add(listener.mid)
            else:
                non_queued_dl.add(listener.mid)

    return is_over_limit, event
|
|
|
|
|
|
|
|
async def start_dl_from_queued(mid: int):
    """Release the queued download `mid`: wake its waiter and mark it running.

    Caller must hold `queue_dict_lock` and guarantee `mid` is in `queued_dl`.
    """
    waiter = queued_dl[mid]
    waiter.set()
    del queued_dl[mid]
    non_queued_dl.add(mid)
|
|
|
|
|
|
|
|
async def start_up_from_queued(mid: int):
    """Release the queued upload `mid`: wake its waiter and mark it running.

    Caller must hold `queue_dict_lock` and guarantee `mid` is in `queued_up`.
    """
    waiter = queued_up[mid]
    waiter.set()
    del queued_up[mid]
    non_queued_up.add(mid)
|
|
|
|
|
|
|
|
async def start_from_queued():
    """Release queued tasks up to the configured queue limits.

    Uploads are released before downloads. When ``QUEUE_ALL`` is set, the
    global cap is enforced together with the per-state caps; otherwise each
    per-state cap is enforced on its own (no cap means release everything).
    """
    if all_limit := safe_int(Config.QUEUE_ALL):
        dl_limit = safe_int(Config.QUEUE_DOWNLOAD)
        up_limit = safe_int(Config.QUEUE_UPLOAD)
        async with queue_dict_lock:
            dl = len(non_queued_dl)
            up = len(non_queued_up)
            all_ = dl + up
            if all_ < all_limit:
                # f_tasks = free global slots remaining.
                f_tasks = all_limit - all_
                if queued_up and (not up_limit or up < up_limit):
                    for index, mid in enumerate(list(queued_up.keys()), start=1):
                        await start_up_from_queued(mid)
                        f_tasks -= 1
                        # Stop when global slots run out or the upload cap is hit.
                        if f_tasks == 0 or (up_limit and index >= up_limit - up):
                            break
                if queued_dl and (not dl_limit or dl < dl_limit) and f_tasks != 0:
                    for index, mid in enumerate(list(queued_dl.keys()), start=1):
                        await start_dl_from_queued(mid)
                        if (dl_limit and index >= dl_limit - dl) or index == f_tasks:
                            break
        return

    # FIX: read limits through safe_int(), consistent with every other limit
    # read in this module — a string-typed config value would otherwise break
    # the comparisons/arithmetic below.
    if up_limit := safe_int(Config.QUEUE_UPLOAD):
        async with queue_dict_lock:
            up = len(non_queued_up)
            if queued_up and up < up_limit:
                f_tasks = up_limit - up
                for index, mid in enumerate(list(queued_up.keys()), start=1):
                    await start_up_from_queued(mid)
                    if index == f_tasks:
                        break
    else:
        # No upload cap: release every queued upload.
        async with queue_dict_lock:
            if queued_up:
                for mid in list(queued_up.keys()):
                    await start_up_from_queued(mid)

    if dl_limit := safe_int(Config.QUEUE_DOWNLOAD):
        async with queue_dict_lock:
            dl = len(non_queued_dl)
            if queued_dl and dl < dl_limit:
                f_tasks = dl_limit - dl
                for index, mid in enumerate(list(queued_dl.keys()), start=1):
                    await start_dl_from_queued(mid)
                    if index == f_tasks:
                        break
    else:
        # No download cap: release every queued download.
        async with queue_dict_lock:
            if queued_dl:
                for mid in list(queued_dl.keys()):
                    await start_dl_from_queued(mid)
|
|
|
|
|
|
|
|
async def limit_checker(listener, yt_playlist=0):
    """Check the task's size (or playlist count) against the configured limits.

    Args:
        listener: the task's listener (provides ``size``, task-type flags, tag).
        yt_playlist: number of playlist entries for a yt-dlp task (0 = not a
            playlist).

    Returns:
        An HTML message describing the breached limit, or ``None`` when the
        task is within limits (or the user is sudo).
    """
    LOGGER.info("Checking Size Limit...")
    if await CustomFilters.sudo("", listener.message):
        LOGGER.info("SUDO User. Skipping Size Limit...")
        return

    # FIX: removed unused `user_id` local (was assigned, never read).
    size = listener.size

    async def recurr_limits(limits):
        """Return a breach message for the first applicable limit, else ""."""
        nonlocal yt_playlist, size
        limit_exceeded = ""
        for condition, attr, name in limits:
            # Only the first limit whose condition matches AND is configured
            # is evaluated; the loop always stops there (one category per task).
            if condition and (limit := getattr(Config, attr, 0)):
                if attr == "PLAYLIST_LIMIT":
                    # Playlist limit is a count, not a size.
                    if yt_playlist >= limit:
                        limit_exceeded = f"┠ <b>{name} Limit Count</b> → {limit}"
                else:
                    # Size limits are configured in GiB.
                    byte_limit = limit * 1024**3
                    if size >= byte_limit:
                        limit_exceeded = f"┠ <b>{name} Limit</b> → {get_readable_file_size(byte_limit)}"

                # FIX: log only when the limit was actually breached; the log
                # previously fired for any applicable limit, breached or not.
                if limit_exceeded:
                    LOGGER.info(
                        f"{name} Limit Breached: {listener.name} & Size: {get_readable_file_size(size)}"
                    )
                break
        return limit_exceeded

    limits = [
        (listener.is_torrent or listener.is_qbit, "TORRENT_LIMIT", "Torrent"),
        (listener.is_mega, "MEGA_LIMIT", "Mega"),
        (listener.is_gdrive, "GD_DL_LIMIT", "GDriveDL"),
        (listener.is_clone, "CLONE_LIMIT", "Clone"),
        (listener.is_jd, "JD_LIMIT", "JDownloader"),
        (listener.is_nzb, "NZB_LIMIT", "SABnzbd"),
        (listener.is_rclone, "RC_DL_LIMIT", "RCloneDL"),
        (listener.is_ytdlp, "YTDLP_LIMIT", "YT-DLP"),
        (bool(yt_playlist), "PLAYLIST_LIMIT", "Playlist"),
        # Fallback category when no specific task type matched above.
        (True, "DIRECT_LIMIT", "Direct"),
    ]
    limit_exceeded = await recurr_limits(limits)

    if not limit_exceeded:
        # Secondary limits that apply on top of the task-type limits.
        extra_limits = [
            (listener.is_leech, "LEECH_LIMIT", "Leech"),
            (listener.compress, "ARCHIVE_LIMIT", "Archive"),
            (listener.extract, "EXTRACT_LIMIT", "Extract"),
        ]
        limit_exceeded = await recurr_limits(extra_limits)

    if Config.STORAGE_LIMIT and not listener.is_clone:
        limit = Config.STORAGE_LIMIT * 1024**3
        # Archive/extract tasks need roughly double the space transiently.
        if not await check_storage_threshold(
            size, limit, any([listener.compress, listener.extract])
        ):
            limit_exceeded = f"┠ <b>Threshold Storage Limit</b> → {get_readable_file_size(limit)}"

    if limit_exceeded:
        return limit_exceeded + f"\n┖ <b>Task By</b> → {listener.tag}"
|
|
|
|
|
|
|
|
""" |
|
|
class UsageChecks: # TODO: Dynamic Check for All Task |
|
|
|
|
|
class DailyUsageChecks: |
|
|
""" |
|
|
|
|
|
|
|
|
async def user_interval_check(user_id):
    """Enforce the per-user minimum interval between tasks.

    Args:
        user_id: Telegram user id.

    Returns:
        Remaining wait time in seconds when the user is still inside the
        ``USER_TIME_INTERVAL`` window, otherwise ``None`` (and the user's
        last-task timestamp is refreshed).
    """
    interval_map = bot_cache.setdefault("time_interval", {})
    # FIX: take one timestamp instead of calling time() twice — the remaining
    # wait is now computed against the same "now" that passed the check, so it
    # cannot drift (or even go slightly negative) between the two calls.
    now = time()
    uti = Config.USER_TIME_INTERVAL
    last = interval_map.get(user_id, False)
    if last and (elapsed := now - last) < uti:
        return uti - elapsed
    interval_map[user_id] = now
    return None
|
|
|
|
|
|
|
|
async def pre_task_check(message):
    """Run all pre-task gate checks for an incoming command message.

    Checks (skipped for sudo users and the RSS chat): force-subscribe,
    bot-PM reachability, per-user time interval, bot-wide and per-user
    concurrent task caps, and token verification.

    Returns:
        ``(final_msg, button)`` when at least one check failed (``final_msg``
        is the combined HTML report), otherwise ``(None, None)`` — except for
        the sudo/RSS early exits, which return ``([], None)``.
    """
    LOGGER.info("Running Pre Task Checks ...")
    msg = []
    button = None
    if await CustomFilters.sudo("", message):
        return msg, button
    user_id = (message.from_user or message.sender_chat).id
    if Config.RSS_CHAT and user_id == int(Config.RSS_CHAT):
        return msg, button
    user_dict = user_data.get(user_id, {})
    # Force-sub and bot-PM checks only matter outside the bot's own PM chat.
    if message.chat.type != message.chat.type.BOT:
        if ids := Config.FORCE_SUB_IDS:
            _msg, button = await forcesub(message, ids, button)
            if _msg:
                msg.append(_msg)
        if Config.BOT_PM or user_dict.get("BOT_PM"):
            _msg, button = await check_botpm(message, button)
            if _msg:
                msg.append(_msg)
    if (uti := Config.USER_TIME_INTERVAL) and (
        ut := await user_interval_check(user_id)
    ):
        msg.append(
            f"┠ <b>Waiting Time</b> → {get_readable_time(ut)}\n┠ <i>User's Time Interval Restrictions</i> → {get_readable_time(uti)}"
        )
    bmax_tasks = safe_int(user_dict.get("bmax_tasks", Config.BOT_MAX_TASKS))
    if bmax_tasks > 0 and len(await get_specific_tasks("All", False)) >= bmax_tasks:
        msg.append(
            f"┠ Max Concurrent Bot's Tasks Limit exceeded.\n┠ Bot Tasks Limit : {bmax_tasks} task"
        )

    maxtask = safe_int(user_dict.get("maxtask", Config.USER_MAX_TASKS))
    if maxtask > 0 and len(await get_specific_tasks("All", user_id)) >= maxtask:
        msg.append(
            f"┠ Max Concurrent User's Task(s) Limit exceeded! \n┠ User Task Limit : {maxtask} tasks"
        )

    token_msg, button = await verify_token(user_id, button)
    if token_msg is not None:
        msg.append(token_msg)

    if msg:
        # NOTE(review): from_user may be None for channel posts (user_id above
        # falls back to sender_chat) — confirm whether that path can reach here.
        username = message.from_user.mention
        final_msg = f"⌬ <b>Task Checks :</b>\n│\n┟ <b>Name</b> → {username}\n┃\n"
        # FIX: replaced a manual += loop with an unused enumerate index by a
        # single join — same output, no quadratic string building.
        final_msg += "".join(f"{m_part}\n" for m_part in msg)
        if button is not None:
            button = button.build_menu(2)
        return final_msg, button

    return None, None
|
|
|