| import re |
| from asyncio import gather, sleep |
| from contextlib import suppress |
| from os import path as ospath, walk |
| from re import sub |
| from secrets import token_hex |
| from shlex import split |
|
|
| from aiofiles.os import listdir, makedirs, remove, path as aiopath |
| from aioshutil import move, rmtree |
| from pyrogram.enums import ChatAction |
|
|
| from .. import ( |
| DOWNLOAD_DIR, |
| LOGGER, |
| cores, |
| cpu_eater_lock, |
| excluded_extensions, |
| intervals, |
| multi_tags, |
| task_dict, |
| task_dict_lock, |
| user_data, |
| ) |
| from ..core.config_manager import Config, BinConfig |
| from ..core.tg_client import TgClient |
| from .ext_utils.bot_utils import get_size_bytes, new_task, sync_to_async |
| from .ext_utils.bulk_links import extract_bulk_links |
| from .ext_utils.files_utils import ( |
| SevenZ, |
| get_base_name, |
| get_path_size, |
| is_archive, |
| is_archive_split, |
| is_first_archive_split, |
| split_file, |
| ) |
| from .ext_utils.links_utils import ( |
| is_gdrive_id, |
| is_gdrive_link, |
| is_rclone_path, |
| is_telegram_link, |
| is_mega_link, |
| ) |
| from .ext_utils.media_utils import ( |
| FFMpeg, |
| create_thumb, |
| get_document_type, |
| take_ss, |
| ) |
| from .ext_utils.metadata_utils import MetadataProcessor |
| from .mirror_leech_utils.gdrive_utils.list import GoogleDriveList |
| from .mirror_leech_utils.rclone_utils.list import RcloneList |
| from .mirror_leech_utils.status_utils.ffmpeg_status import FFmpegStatus |
| from .mirror_leech_utils.status_utils.sevenz_status import SevenZStatus |
| from .telegram_helper.bot_commands import BotCommands |
| from .telegram_helper.message_utils import ( |
| get_tg_link_message, |
| send_message, |
| send_status_message, |
| ) |
|
|
|
|
| class TaskConfig: |
def __init__(self):
    """Initialise the per-task state with its default values.

    ``self.message`` must already be set on the instance: the message id,
    the sender and the chat type are all read from it. The sender's saved
    settings are looked up in ``user_data`` by id (empty dict when absent).
    """
    message = self.message
    self.mid = message.id
    # Fall back to the sender chat when the message has no ``from_user``.
    self.user = message.from_user or message.sender_chat
    self.user_id = self.user.id
    self.user_dict = user_data.get(self.user_id, {})

    # Per-stream metadata settings may be stored as a dict or as a raw
    # string; strings are parsed, anything else becomes an empty dict.
    self.metadata_processor = MetadataProcessor()
    for setting in (
        "METADATA",
        "AUDIO_METADATA",
        "VIDEO_METADATA",
        "SUBTITLE_METADATA",
    ):
        raw = self.user_dict.get(setting, {})
        prefix = "default_metadata" if setting == "METADATA" else setting.lower()
        if isinstance(raw, dict):
            parsed = raw
        elif isinstance(raw, str):
            parsed = self.metadata_processor.parse_string(raw)
        else:
            parsed = {}
        setattr(self, f"{prefix}_dict", parsed)

    # Working directory and textual options.
    self.dir = f"{DOWNLOAD_DIR}{self.mid}"
    self.up_dir = self.link = self.up_dest = self.leech_dest = ""
    self.rc_flags = self.tag = self.name = self.subname = ""
    self.name_swap = self.thumbnail_layout = self.folder_name = ""

    # Numeric counters and sizes.
    self.split_size = self.max_split_size = 0
    self.multi = self.size = self.subsize = self.proceed_count = 0

    # Source / engine flags.
    self.is_leech = self.is_yt = self.is_qbit = self.is_mega = False
    self.is_nzb = self.is_jd = self.is_clone = self.is_uphoster = False
    self.is_gdrive = self.is_rclone = self.is_ytdlp = False

    # Transfer and post-processing options.
    self.equal_splits = self.user_transmission = self.hybrid_leech = False
    self.extract = self.compress = self.select = self.seed = False
    self.join = self.private_link = self.stop_duplicate = False
    self.sample_video = self.convert_audio = self.convert_video = False
    self.screen_shots = self.is_cancelled = False
    self.force_run = self.force_download = self.force_upload = False
    self.is_torrent = self.as_med = self.as_doc = self.is_file = False
    self.bot_trans = self.user_trans = False
    self.progress = True

    # Values that are filled in later.
    self.ffmpeg_cmds = self.metadata_title = self.chat_thread_id = None
    self.subproc = self.thumb = None
    self.excluded_extensions = []
    self.files_to_proceed = []
    self.is_super_chat = message.chat.type.name in (
        "SUPERGROUP",
        "CHANNEL",
        "FORUM",
    )
    self.source_url = None
    self.bot_pm = Config.BOT_PM or self.user_dict.get("BOT_PM")
    self.pm_msg = None
    self.file_details = {}
    self.mode = ()
|
|
def _set_mode_engine(self):
    """Derive ``source_url``, the source flags and the ``(in, out)`` mode tags.

    Sets ``self.source_url``, ``self.is_rclone``, ``self.is_gdrive``,
    ``self.is_mega`` and finally ``self.mode`` as a tuple of two
    ``#``-prefixed labels (input engine, output target).
    """
    link = self.link
    if len(link) > 0 and link.startswith("http"):
        self.source_url = link
    elif link:
        # Non-http links are wrapped in a Telegram share URL.
        self.source_url = f"https://t.me/share/url?url={link}"
    else:
        self.source_url = self.message.link

    # Output label: the first matching rule wins.
    if self.is_leech:
        out_label = "Leech"
    elif self.is_uphoster:
        out_label = "UphosterUpload"
    elif self.is_clone:
        out_label = "Clone"
    elif self.up_dest.startswith("mrcc:") or is_rclone_path(self.up_dest):
        out_label = "RClone"
    elif self.up_dest.startswith(("mtp:", "tp:", "sa:")) or is_gdrive_id(
        self.up_dest
    ):
        out_label = "GDrive"
    else:
        out_label = "UpHosters"
    out_mode = f"#{out_label}"
    if self.compress:
        out_mode += " (Zip)"
    elif self.extract:
        out_mode += " (Unzip)"

    self.is_rclone = is_rclone_path(self.link)
    if self.source_url:
        self.is_gdrive = is_gdrive_link(self.source_url)
        self.is_mega = is_mega_link(self.link)
    else:
        self.is_gdrive = False
        self.is_mega = False

    # Input label: again the first matching rule wins.
    if self.is_mega:
        in_label = "Mega"
    elif self.is_qbit:
        in_label = "qBit"
    elif self.is_nzb:
        in_label = "SABnzbd"
    elif self.is_jd:
        in_label = "JDown"
    elif self.is_rclone:
        in_label = "RCloneDL"
    elif self.is_ytdlp:
        in_label = "ytdlp"
    elif self.is_clone or self.is_gdrive:
        in_label = "GDrive"
    elif self.source_url and self.source_url != self.message.link:
        in_label = "Aria2"
    else:
        in_label = "TgMedia"
    in_mode = f"#{in_label}"

    self.mode = (in_mode, out_mode)
|
|
def get_token_path(self, dest):
    """Return the credential path to use for the Google Drive target *dest*.

    * ``mtp:`` prefix -> the per-user token ``tokens/<user_id>.pickle``
    * ``sa:`` prefix, or service accounts enabled in ``Config`` and no
      ``tp:`` prefix -> ``accounts``
    * anything else -> ``token.pickle``
    """
    if dest.startswith("mtp:"):
        return f"tokens/{self.user_id}.pickle"
    if dest.startswith("sa:"):
        return "accounts"
    if Config.USE_SERVICE_ACCOUNTS and not dest.startswith("tp:"):
        return "accounts"
    return "token.pickle"
|
|
def get_config_path(self, dest):
    """Return the rclone config file to use for *dest*.

    A ``mrcc:`` prefix selects the per-user file ``rclone/<user_id>.conf``;
    every other value uses ``rclone.conf``.
    """
    if dest.startswith("mrcc:"):
        return f"rclone/{self.user_id}.conf"
    return "rclone.conf"
|
|
async def is_token_exists(self, path, status):
    """Check that the credential file needed for *path* is present.

    *status* is ``"dl"`` for a download source or ``"up"`` for an upload
    destination. When an upload uses a per-user rclone config or a
    per-user Drive token, ``self.private_link`` is set to ``True``.

    Raises:
        ValueError: the required rclone config or Drive token file does
            not exist.
    """
    uploading = status == "up"

    if is_rclone_path(path):
        conf_file = self.get_config_path(path)
        if uploading and conf_file != "rclone.conf":
            self.private_link = True
        if not await aiopath.exists(conf_file):
            raise ValueError(f"Rclone Config: {conf_file} not Exists!")
        return

    is_drive_target = (status == "dl" and is_gdrive_link(path)) or (
        uploading and is_gdrive_id(path)
    )
    if not is_drive_target:
        return

    token_file = self.get_token_path(path)
    if uploading and token_file.startswith("tokens/"):
        self.private_link = True
    if not await aiopath.exists(token_file):
        raise ValueError(f"NO TOKEN! {token_file} not Exists!")
|
|
async def before_start(self):
    """Resolve effective settings and validate source/destination.

    Task arguments take precedence over the user's saved settings, which
    take precedence over the global ``Config`` values. The method then:

    * prefixes rclone / Drive links with the per-user marker when the user
      has ``USER_TOKENS`` enabled and verifies the credential file exists;
    * lets the user pick a path interactively when the link or destination
      is ``rcl`` / ``gdl``;
    * for mirror tasks, resolves and validates the upload destination;
    * for leech tasks, resolves the target chat, decides between bot /
      user-session / hybrid upload and computes split and thumbnail options.

    Raises:
        ValueError: missing credentials, missing or invalid upload
            destination, mismatching clone tools/tokens, an unreachable
            chat, or insufficient privileges in the target chat.
    """
    self.name_swap = (
        self.name_swap
        or self.user_dict.get("NAME_SWAP", False)
        or (Config.NAME_SWAP if "NAME_SWAP" not in self.user_dict else "")
    )
    if self.name_swap:
        # "a:b|c:d" -> [["a", "b"], ["c", "d"]]
        self.name_swap = [x.split(":") for x in self.name_swap.split("|")]
    self.excluded_extensions = self.user_dict.get("EXCLUDED_EXTENSIONS") or (
        excluded_extensions
        if "EXCLUDED_EXTENSIONS" not in self.user_dict
        else ["aria2", "!qB"]
    )
    if not self.rc_flags:
        if self.user_dict.get("RCLONE_FLAGS"):
            self.rc_flags = self.user_dict["RCLONE_FLAGS"]
        elif "RCLONE_FLAGS" not in self.user_dict and Config.RCLONE_FLAGS:
            self.rc_flags = Config.RCLONE_FLAGS

    # --- Source link -----------------------------------------------------
    if self.link not in ["rcl", "gdl"]:
        if not self.is_jd:
            if is_rclone_path(self.link):
                if not self.link.startswith("mrcc:") and self.user_dict.get(
                    "USER_TOKENS", False
                ):
                    self.link = f"mrcc:{self.link}"
                await self.is_token_exists(self.link, "dl")
            elif is_gdrive_link(self.link):
                if not self.link.startswith(
                    ("mtp:", "tp:", "sa:")
                ) and self.user_dict.get("USER_TOKENS", False):
                    self.link = f"mtp:{self.link}"
                await self.is_token_exists(self.link, "dl")
    elif self.link == "rcl":
        if not self.is_ytdlp and not self.is_jd:
            self.link = await RcloneList(self).get_rclone_path("rcd")
            # Anything that is not an rclone path is raised as the error text.
            if not is_rclone_path(self.link):
                raise ValueError(self.link)
    elif self.link == "gdl":
        if not self.is_ytdlp and not self.is_jd:
            self.link = await GoogleDriveList(self).get_target_id("gdd")
            if not is_gdrive_id(self.link):
                raise ValueError(self.link)

    self.user_transmission = TgClient.IS_PREMIUM_USER and (
        self.user_dict.get("USER_TRANSMISSION")
        or Config.USER_TRANSMISSION
        and "USER_TRANSMISSION" not in self.user_dict
    )

    # Named upload paths: translate an alias into its real destination.
    if self.user_dict.get("UPLOAD_PATHS", False):
        if self.up_dest in self.user_dict["UPLOAD_PATHS"]:
            self.up_dest = self.user_dict["UPLOAD_PATHS"][self.up_dest]
    elif "UPLOAD_PATHS" not in self.user_dict and Config.UPLOAD_PATHS:
        if self.up_dest in Config.UPLOAD_PATHS:
            self.up_dest = Config.UPLOAD_PATHS[self.up_dest]

    # A non-list ``ffmpeg_cmds`` holds preset keys; expand them into the
    # stored command lists.
    if self.ffmpeg_cmds and not isinstance(self.ffmpeg_cmds, list):
        if self.user_dict.get("FFMPEG_CMDS", None):
            ffmpeg_dict = self.user_dict["FFMPEG_CMDS"]
            self.ffmpeg_cmds = [
                value
                for key in list(self.ffmpeg_cmds)
                if key in ffmpeg_dict
                for value in ffmpeg_dict[key]
            ]
        elif "FFMPEG_CMDS" not in self.user_dict and Config.FFMPEG_CMDS:
            ffmpeg_dict = Config.FFMPEG_CMDS
            self.ffmpeg_cmds = [
                value
                for key in list(self.ffmpeg_cmds)
                if key in ffmpeg_dict
                for value in ffmpeg_dict[key]
            ]
        else:
            self.ffmpeg_cmds = None

    self.metadata_title = self.user_dict.get("METADATA")

    if not self.is_leech:
        # --- Mirror / clone / uphoster destination ------------------------
        self.stop_duplicate = (
            self.user_dict.get("STOP_DUPLICATE")
            or "STOP_DUPLICATE" not in self.user_dict
            and Config.STOP_DUPLICATE
        )
        default_upload = (
            self.user_dict.get("DEFAULT_UPLOAD", "") or Config.DEFAULT_UPLOAD
        )
        if not self.is_uphoster and (
            (not self.up_dest and default_upload == "rc") or self.up_dest == "rc"
        ):
            self.up_dest = self.user_dict.get("RCLONE_PATH") or Config.RCLONE_PATH
        elif not self.is_uphoster and (
            (not self.up_dest and default_upload == "gd") or self.up_dest == "gd"
        ):
            self.up_dest = self.user_dict.get("GDRIVE_ID") or Config.GDRIVE_ID

        if self.is_uphoster and not self.up_dest:
            # Every selected service needs a user or global credential.
            uphoster_service = self.user_dict.get("UPHOSTER_SERVICE", "gofile")
            services = uphoster_service.split(",")
            for service in services:
                if service == "gofile":
                    if not (
                        self.user_dict.get("GOFILE_TOKEN") or Config.GOFILE_API
                    ):
                        raise ValueError("No Gofile Token Found!")
                elif service == "buzzheavier":
                    if not (
                        self.user_dict.get("BUZZHEAVIER_TOKEN")
                        or Config.BUZZHEAVIER_API
                    ):
                        raise ValueError("No BuzzHeavier Token Found!")
                elif service == "pixeldrain":
                    if not (
                        self.user_dict.get("PIXELDRAIN_KEY")
                        or Config.PIXELDRAIN_KEY
                    ):
                        raise ValueError("No PixelDrain Key Found!")
            self.up_dest = "Uphoster"

        if not self.up_dest:
            raise ValueError("No Upload Destination!")

        if is_gdrive_id(self.up_dest):
            if not self.up_dest.startswith(
                ("mtp:", "tp:", "sa:")
            ) and self.user_dict.get("USER_TOKENS", False):
                self.up_dest = f"mtp:{self.up_dest}"
        elif is_rclone_path(self.up_dest):
            if not self.up_dest.startswith("mrcc:") and self.user_dict.get(
                "USER_TOKENS", False
            ):
                self.up_dest = f"mrcc:{self.up_dest}"
            self.up_dest = self.up_dest.strip("/")
        elif self.is_uphoster:
            pass
        else:
            raise ValueError("Wrong Upload Destination!")

        if self.up_dest not in ["rcl", "gdl"] and not self.is_uphoster:
            await self.is_token_exists(self.up_dest, "up")

        if self.up_dest == "rcl":
            if self.is_clone:
                if not is_rclone_path(self.link):
                    raise ValueError(
                        "You can't clone from different types of tools"
                    )
                config_path = self.get_config_path(self.link)
            else:
                config_path = None
            self.up_dest = await RcloneList(self).get_rclone_path(
                "rcu", config_path
            )
            if not is_rclone_path(self.up_dest):
                raise ValueError(self.up_dest)
        elif self.up_dest == "gdl":
            if self.is_clone:
                if not is_gdrive_link(self.link):
                    raise ValueError(
                        "You can't clone from different types of tools"
                    )
                token_path = self.get_token_path(self.link)
            else:
                token_path = None
            self.up_dest = await GoogleDriveList(self).get_target_id(
                "gdu", token_path
            )
            if not is_gdrive_id(self.up_dest):
                raise ValueError(self.up_dest)
        elif self.is_clone:
            # Source and destination must share the same credentials.
            if is_gdrive_link(self.link) and self.get_token_path(
                self.link
            ) != self.get_token_path(self.up_dest):
                raise ValueError("You must use the same token to clone!")
            elif is_rclone_path(self.link) and self.get_config_path(
                self.link
            ) != self.get_config_path(self.up_dest):
                raise ValueError("You must use the same config to clone!")
    else:
        # --- Leech destination --------------------------------------------
        self.leech_dest = self.up_dest or self.user_dict.get("LEECH_DUMP_CHAT")
        self.up_dest = Config.LEECH_DUMP_CHAT
        self.hybrid_leech = TgClient.IS_PREMIUM_USER and (
            self.user_dict.get("HYBRID_LEECH")
            or Config.HYBRID_LEECH
            and "HYBRID_LEECH" not in self.user_dict
        )
        if self.bot_trans:
            self.user_transmission = False
            self.hybrid_leech = False
        if self.user_trans:
            self.user_transmission = TgClient.IS_PREMIUM_USER
        if self.up_dest:
            if not isinstance(self.up_dest, int):
                # Optional "b:" / "u:" / "h:" prefix forces bot, user or
                # hybrid upload for this destination.
                if self.up_dest.startswith("b:"):
                    self.up_dest = self.up_dest.replace("b:", "", 1)
                    self.user_transmission = False
                    self.hybrid_leech = False
                elif self.up_dest.startswith("u:"):
                    self.up_dest = self.up_dest.replace("u:", "", 1)
                    self.user_transmission = TgClient.IS_PREMIUM_USER
                elif self.up_dest.startswith("h:"):
                    self.up_dest = self.up_dest.replace("h:", "", 1)
                    self.user_transmission = TgClient.IS_PREMIUM_USER
                    self.hybrid_leech = self.user_transmission
                if "|" in self.up_dest:
                    # "<chat>|<thread>": numeric parts become ints.
                    self.up_dest, self.chat_thread_id = list(
                        map(
                            lambda x: int(x) if x.lstrip("-").isdigit() else x,
                            self.up_dest.split("|", 1),
                        )
                    )
                elif self.up_dest.lstrip("-").isdigit():
                    self.up_dest = int(self.up_dest)
                elif self.up_dest.lower() == "pm":
                    self.up_dest = self.user_id

            if self.user_transmission:
                # The user session must reach the chat and be an admin with
                # the required rights; otherwise fall back to the bot.
                try:
                    chat = await TgClient.user.get_chat(self.up_dest)
                except Exception:
                    chat = None
                if chat is None:
                    self.user_transmission = False
                    self.hybrid_leech = False
                else:
                    uploader_id = TgClient.user.me.id
                    if chat.type.name not in [
                        "SUPERGROUP",
                        "CHANNEL",
                        "GROUP",
                        "FORUM",
                    ]:
                        self.user_transmission = False
                        self.hybrid_leech = False
                    else:
                        member = await chat.get_member(uploader_id)
                        # ``privileges`` is None for non-admin members.
                        privileges = member.privileges
                        if (
                            not privileges
                            or not privileges.can_manage_chat
                            or not privileges.can_delete_messages
                        ):
                            self.user_transmission = False
                            self.hybrid_leech = False

            if not self.user_transmission or self.hybrid_leech:
                # The bot itself uploads (alone or as the hybrid partner).
                try:
                    chat = await self.client.get_chat(self.up_dest)
                except Exception:
                    chat = None
                if chat is None:
                    if self.user_transmission:
                        self.hybrid_leech = False
                    else:
                        raise ValueError("Chat not found!")
                else:
                    uploader_id = self.client.me.id
                    if chat.type.name in [
                        "SUPERGROUP",
                        "CHANNEL",
                        "GROUP",
                        "FORUM",
                    ]:
                        member = await chat.get_member(uploader_id)
                        # ``privileges`` is None for non-admin members.
                        privileges = member.privileges
                        if (
                            not privileges
                            or not privileges.can_manage_chat
                            or not privileges.can_delete_messages
                        ):
                            if not self.user_transmission:
                                raise ValueError(
                                    "You don't have enough privileges in this chat!"
                                )
                            else:
                                self.hybrid_leech = False
                    else:
                        # Private chat: probe that the bot may message it.
                        try:
                            await self.client.send_chat_action(
                                self.up_dest, ChatAction.TYPING
                            )
                        except Exception:
                            raise ValueError("Start the bot and try again!")
        elif (
            self.user_transmission or self.hybrid_leech
        ) and not self.is_super_chat:
            self.user_transmission = False
            self.hybrid_leech = False
        if self.split_size:
            if self.split_size.isdigit():
                self.split_size = int(self.split_size)
            else:
                self.split_size = get_size_bytes(self.split_size)
        self.split_size = (
            self.split_size
            or self.user_dict.get("LEECH_SPLIT_SIZE")
            or Config.LEECH_SPLIT_SIZE
        )
        self.equal_splits = (
            self.user_dict.get("EQUAL_SPLITS")
            or Config.EQUAL_SPLITS
            and "EQUAL_SPLITS" not in self.user_dict
        )
        self.max_split_size = (
            TgClient.MAX_SPLIT_SIZE if self.user_transmission else 2097152000
        )
        self.split_size = min(self.split_size, self.max_split_size)

        if not self.as_doc:
            self.as_doc = (
                not self.as_med
                if self.as_med
                else (
                    self.user_dict.get("AS_DOCUMENT", False)
                    or Config.AS_DOCUMENT
                    and "AS_DOCUMENT" not in self.user_dict
                )
            )

        self.thumbnail_layout = (
            self.thumbnail_layout
            or self.user_dict.get("THUMBNAIL_LAYOUT", False)
            or (
                Config.THUMBNAIL_LAYOUT
                if "THUMBNAIL_LAYOUT" not in self.user_dict
                else ""
            )
        )

        # ``thumb`` defaults to None; only inspect it when a value was given.
        if (
            self.thumb
            and self.thumb != "none"
            and is_telegram_link(self.thumb)
        ):
            msg = (await get_tg_link_message(self.thumb))[0]
            self.thumb = (
                await create_thumb(msg) if msg.photo or msg.document else ""
            )
|
|
async def get_tag(self, text: list):
    """Work out the user tag shown in task messages.

    When the second element of *text* starts with ``"Tag: "`` it carries a
    display tag and a user id; that user is fetched and becomes the task
    owner (``self.user``, ``self.user_id``, ``self.user_dict``), and an
    attempt is made to unpin the message. Afterwards the tag is set from
    the owner's username, else its ``mention``, else its ``title``.
    """
    carries_tag = len(text) > 1 and text[1].startswith("Tag: ")
    if carries_tag:
        pieces = text[1].split("Tag: ")
        if len(pieces) >= 3:
            # The last piece is the id; everything before it is the tag.
            *tag_parts, id_ = pieces
            self.tag = " ".join(tag_parts)
        else:
            self.tag, id_ = pieces[1].split()
        owner = await self.client.get_users(int(id_))
        self.user = owner
        self.message.from_user = owner
        self.user_id = self.user.id
        self.user_dict = user_data.get(self.user_id, {})
        # Best effort: failing to unpin must not abort the task.
        with suppress(Exception):
            await self.message.unpin()

    user = self.user
    if not user:
        return
    username = user.username
    if username:
        self.tag = f"@{username}"
    elif hasattr(user, "mention"):
        self.tag = user.mention
    else:
        self.tag = user.title
|
|
@new_task
async def run_multi(self, input_list, obj):
    """Queue the next task of a multi (``-i``) or bulk run.

    Waits seven seconds, handles the end or cancellation of the run, then
    sends the follow-up command message and starts a new *obj* listener on
    it with the same engine flags.
    """
    await sleep(7)

    if self.multi <= 1:
        # Last item of the run: drop the cancel tag and stop.
        multi_tags.discard(self.multi_tag)
        return
    if not self.multi_tag:
        self.multi_tag = token_hex(3)
        multi_tags.add(self.multi_tag)

    if self.multi_tag and self.multi_tag not in multi_tags:
        # The tag was removed elsewhere, i.e. the run was cancelled.
        await send_message(
            self.message, f"{self.tag} Multi Task has been cancelled!"
        )
        await send_status_message(self.message)
        async with task_dict_lock:
            for fd_name in self.same_dir:
                self.same_dir[fd_name]["total"] -= self.multi
        return

    remaining = self.multi - 1
    if len(self.bulk) != 0:
        parts = input_list[:1]
        parts.append(f"{self.bulk[0]} -i {remaining} {self.options}")
        reply_target = self.message
    else:
        parts = [s.strip() for s in input_list]
        parts[parts.index("-i") + 1] = f"{remaining}"
        # Reply to the message following the one originally replied to.
        reply_target = await self.client.get_messages(
            chat_id=self.message.chat.id,
            message_ids=self.message.reply_to_message_id + 1,
        )

    msgts = " ".join(parts)
    if self.multi > 2:
        msgts += f"\n• <b>Cancel Multi:</b> <i>/{BotCommands.CancelTaskCommand[1]}_{self.multi_tag}</i>"
    nextmsg = await send_message(reply_target, msgts)

    # Re-fetch the sent message and attribute it to the task owner.
    nextmsg = await self.client.get_messages(
        chat_id=self.message.chat.id, message_ids=nextmsg.id
    )
    if self.message.from_user:
        nextmsg.from_user = self.user
    else:
        nextmsg.sender_chat = self.user
    if intervals["stopAll"]:
        return

    listener = obj(
        client=self.client,
        message=nextmsg,
        is_qbit=self.is_qbit,
        is_leech=self.is_leech,
        is_jd=self.is_jd,
        is_nzb=self.is_nzb,
        is_uphoster=self.is_uphoster,
        same_dir=self.same_dir,
        bulk=self.bulk,
        multi_tag=self.multi_tag,
        options=self.options,
    )
    await listener.new_event()
|
|
async def init_bulk(self, input_list, bulk_start, bulk_end, obj):
    """Start a bulk run from the links found in the replied message/file.

    Extracts the links, rewrites the command so that the first link is
    processed with ``-i <number of links>`` and the ``-b`` option removed,
    sends that command as a new message and starts an *obj* listener on it.

    Any failure is reported to the user with a generic usage hint; the
    underlying exception is logged so that the real cause is not lost.
    """
    if Config.DISABLE_BULK:
        await send_message(self.message, "Bulk downloads are currently disabled.")
        return
    try:
        self.bulk = await extract_bulk_links(self.message, bulk_start, bulk_end)
        if len(self.bulk) == 0:
            raise ValueError("Bulk Empty!")
        b_msg = input_list[:1]
        self.options = input_list[1:]
        index = self.options.index("-b")
        del self.options[index]
        if bulk_start or bulk_end:
            # "-b" was followed by a range argument; drop it as well.
            del self.options[index]
        self.options = " ".join(self.options)
        b_msg.append(f"{self.bulk[0]} -i {len(self.bulk)} {self.options}")
        msg = " ".join(b_msg)
        if len(self.bulk) > 2:
            self.multi_tag = token_hex(3)
            multi_tags.add(self.multi_tag)
            msg += f"\n• <b>Cancel Multi:</b> <i>/{BotCommands.CancelTaskCommand[1]}_{self.multi_tag}</i>"
        nextmsg = await send_message(self.message, msg)
        # Re-fetch the sent message and attribute it to the task owner.
        nextmsg = await self.client.get_messages(
            chat_id=self.message.chat.id, message_ids=nextmsg.id
        )
        if self.message.from_user:
            nextmsg.from_user = self.user
        else:
            nextmsg.sender_chat = self.user

        await obj(
            client=self.client,
            message=nextmsg,
            is_qbit=self.is_qbit,
            is_leech=self.is_leech,
            is_jd=self.is_jd,
            is_nzb=self.is_nzb,
            is_uphoster=self.is_uphoster,
            same_dir=self.same_dir,
            bulk=self.bulk,
            multi_tag=self.multi_tag,
            options=self.options,
        ).new_event()
    except Exception as e:
        # Keep the best-effort behaviour, but leave a trace of the cause.
        LOGGER.error(f"Bulk init failed: {e}")
        await send_message(
            self.message,
            "Reply to text file or to telegram message that have links seperated by new line!",
        )
|
|
async def proceed_extract(self, dl_path, gid):
    """Extract every archive found for this task.

    A string ``self.extract`` is used as the archive password. Archives are
    first collected into ``self.files_to_proceed``; if none are found
    *dl_path* is returned untouched. Otherwise the upload directory (or the
    task directory) is walked bottom-up, each archive is extracted, and the
    archive files of a directory are deleted when its last extraction
    returned code ``0``.

    Returns:
        * *dl_path* when there is nothing to extract, or for folder tasks;
        * the extraction folder for a single-file task whose extraction
          returned ``0``;
        * ``False`` when cancellation is noticed before extracting a file;
        * the extractor's return code when cancellation is noticed after a
          directory was processed.
    """

    def is_extractable(file_name):
        # First volume of a split set, or any non-.rar archive.
        return is_first_archive_split(file_name) or (
            is_archive(file_name)
            and not file_name.strip().lower().endswith(".rar")
        )

    pswd = self.extract if isinstance(self.extract, str) else ""
    self.files_to_proceed = []
    if self.is_file and is_archive(dl_path):
        self.files_to_proceed.append(dl_path)
    else:
        for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
            for file_ in files:
                if is_extractable(file_):
                    self.files_to_proceed.append(ospath.join(dirpath, file_))

    if not self.files_to_proceed:
        return dl_path

    sevenz = SevenZ(self)
    LOGGER.info(f"Extracting: {self.name}")
    async with task_dict_lock:
        task_dict[self.mid] = SevenZStatus(self, sevenz, gid, "Extract")

    # Defaults so the final return is well defined even when the walk below
    # yields no directory or no matching archive.
    code = 0
    t_path = dl_path
    for dirpath, _, files in await sync_to_async(
        walk, self.up_dir or self.dir, topdown=False
    ):
        code = 0
        for file_ in files:
            if self.is_cancelled:
                return False
            if is_extractable(file_):
                self.proceed_count += 1
                f_path = ospath.join(dirpath, file_)
                t_path = get_base_name(f_path) if self.is_file else dirpath
                if not self.is_file:
                    self.subname = file_
                code = await sevenz.extract(f_path, t_path, pswd)
        if self.is_cancelled:
            return code
        if code == 0:
            # Extraction succeeded: remove the archive volumes.
            for file_ in files:
                if is_archive_split(file_) or is_archive(file_):
                    del_path = ospath.join(dirpath, file_)
                    try:
                        await remove(del_path)
                    except Exception:
                        # A volume that cannot be removed cancels the task.
                        self.is_cancelled = True
    return t_path if self.is_file and code == 0 else dl_path
|
|
async def proceed_ffmpeg(self, dl_path, gid):
    """Run the configured ffmpeg commands over the downloaded content.

    Each entry of ``self.ffmpeg_cmds`` is shell-split and appended to a
    fixed ``taskset -c <cores> <ffmpeg> ...`` prefix. A ``-del`` token is
    stripped from the command and means "delete the input after a
    successful run". The argument following ``-i`` is a placeholder that
    selects which files the command applies to: a name ending in ``.video``
    or ``.audio`` selects by media type, a name without a dot selects all
    media files, anything else selects by file extension.

    For a single file the input is moved into a folder named after it
    before running; for a folder every matching file is processed in place.
    ``cpu_eater_lock`` is acquired once before the first run and released
    when the method exits.

    Returns:
        The (possibly new) path of the content, or ``False`` when the task
        was cancelled while walking a folder.
    """
    checked = False  # status object registered for this task
    lock_held = False  # True only once cpu_eater_lock was really acquired
    cmds = [
        [part.strip() for part in split(item) if part.strip()]
        for item in self.ffmpeg_cmds
    ]
    try:
        ffmpeg = FFMpeg(self)
        for ffmpeg_cmd in cmds:
            self.proceed_count = 0
            cmd = [
                "taskset",
                "-c",
                f"{cores}",
                BinConfig.FFMPEG_NAME,
                "-hide_banner",
                "-loglevel",
                "error",
                "-progress",
                "pipe:1",
            ] + ffmpeg_cmd
            if "-del" in cmd:
                cmd.remove("-del")
                delete_files = True
            else:
                delete_files = False
            index = cmd.index("-i")
            input_file = cmd[index + 1]
            if input_file.strip().endswith(".video"):
                ext = "video"
            elif input_file.strip().endswith(".audio"):
                ext = "audio"
            elif "." not in input_file:
                ext = "all"
            else:
                ext = ospath.splitext(input_file)[-1].lower()
            if await aiopath.isfile(dl_path):
                is_video, is_audio, _ = await get_document_type(dl_path)
                # A non-matching single file stops processing of all
                # remaining commands.
                if not is_video and not is_audio:
                    break
                elif is_video and ext == "audio":
                    break
                elif is_audio and not is_video and ext == "video":
                    break
                elif ext not in [
                    "all",
                    "audio",
                    "video",
                ] and not dl_path.strip().lower().endswith(ext):
                    break
                new_folder = ospath.splitext(dl_path)[0]
                if await aiopath.isfile(new_folder):
                    # Extension-less file: the folder name would collide.
                    new_folder = f"{new_folder}_temp"
                name = ospath.basename(dl_path)
                await makedirs(new_folder, exist_ok=True)
                file_path = f"{new_folder}/{name}"
                await move(dl_path, file_path)
                if not checked:
                    checked = True
                    async with task_dict_lock:
                        task_dict[self.mid] = FFmpegStatus(
                            self, ffmpeg, gid, "FFmpeg"
                        )
                    self.progress = False
                    await cpu_eater_lock.acquire()
                    lock_held = True
                    self.progress = True
                LOGGER.info(f"Running ffmpeg cmd for: {file_path}")
                var_cmd = cmd.copy()
                var_cmd[index + 1] = file_path
                self.subsize = self.size
                res = await ffmpeg.ffmpeg_cmds(var_cmd, file_path)
                if res:
                    if delete_files:
                        await remove(file_path)
                        if len(await listdir(new_folder)) == 1:
                            # Single output: move it next to the original
                            # location and drop the temporary folder.
                            folder = new_folder.rsplit("/", 1)[0]
                            self.name = ospath.basename(res[0])
                            if self.name.startswith("ffmpeg"):
                                # Strip everything up to the first dot.
                                self.name = self.name.split(".", 1)[-1]
                            dl_path = ospath.join(folder, self.name)
                            await move(res[0], dl_path)
                            await rmtree(new_folder)
                        else:
                            dl_path = new_folder
                            self.name = new_folder.rsplit("/", 1)[-1]
                    else:
                        dl_path = new_folder
                        self.name = new_folder.rsplit("/", 1)[-1]
                else:
                    # Nothing produced: restore the original layout.
                    await move(file_path, dl_path)
                    await rmtree(new_folder)
            else:
                for dirpath, _, files in await sync_to_async(
                    walk, dl_path, topdown=False
                ):
                    for file_ in files:
                        var_cmd = cmd.copy()
                        if self.is_cancelled:
                            return False
                        f_path = ospath.join(dirpath, file_)
                        is_video, is_audio, _ = await get_document_type(f_path)
                        if not is_video and not is_audio:
                            continue
                        elif is_video and ext == "audio":
                            continue
                        elif is_audio and not is_video and ext == "video":
                            continue
                        elif ext not in [
                            "all",
                            "audio",
                            "video",
                        ] and not f_path.strip().lower().endswith(ext):
                            continue
                        self.proceed_count += 1
                        var_cmd[index + 1] = f_path
                        if not checked:
                            checked = True
                            async with task_dict_lock:
                                task_dict[self.mid] = FFmpegStatus(
                                    self, ffmpeg, gid, "FFmpeg"
                                )
                            self.progress = False
                            await cpu_eater_lock.acquire()
                            lock_held = True
                            self.progress = True
                        LOGGER.info(f"Running ffmpeg cmd for: {f_path}")
                        self.subsize = await get_path_size(f_path)
                        self.subname = file_
                        res = await ffmpeg.ffmpeg_cmds(var_cmd, f_path)
                        if res and delete_files:
                            await remove(f_path)
                            if len(res) == 1:
                                file_name = ospath.basename(res[0])
                                if file_name.startswith("ffmpeg"):
                                    newname = file_name.split(".", 1)[-1]
                                    newres = ospath.join(dirpath, newname)
                                    await move(res[0], newres)
    finally:
        # Release only a lock this call actually holds; releasing after a
        # failed or cancelled acquire would raise or unlock another task.
        if lock_held:
            cpu_eater_lock.release()
    return dl_path
|
|
async def substitute(self, dl_path):
    """Rename downloaded file(s) according to ``self.name_swap``.

    ``self.name_swap`` is a list of ``[pattern, replacement, count, flag]``
    lists in which everything after the pattern is optional (defaults:
    empty replacement, count ``0`` = replace all, no regex flag). The swaps
    are applied to the file name without its extension, after removing any
    ``www...`` token.

    A file is left untouched when a swap is invalid, when the resulting
    name is empty or longer than 255 bytes, or when the name did not change.

    Returns:
        The new path for a single-file task, otherwise *dl_path*.
    """

    def perform_swap(name, swaps):
        # Returns the new file name, or False when the swap must be skipped.
        name, ext = ospath.splitext(name)
        name = sub(r"www\S+", "", name)
        for swap in swaps:
            # Pad the swap with defaults for the parts that were omitted.
            pattern, res, cnt, sen = (
                swap + ["", "0", "NOFLAG"][min(len(swap) - 1, 2) :]
            )[0:4]
            try:
                # Parsing the count inside the try turns a bad value into a
                # logged, skipped swap instead of an unhandled ValueError.
                cnt = int(cnt) if cnt else 0
                name = sub(
                    pattern,
                    res,
                    name,
                    count=cnt,
                    flags=getattr(re, sen.upper(), 0),
                )
            except Exception as e:
                LOGGER.error(
                    f"Swap Error: pattern: {pattern} res: {res}. Error: {e}"
                )
                return False
            if len(name.encode()) > 255:
                LOGGER.error(f"Substitute: {name} is too long")
                return False
        return name + ext

    if self.is_file:
        up_dir, name = dl_path.rsplit("/", 1)
        new_name = perform_swap(name, self.name_swap)
        if not new_name or new_name == name:
            return dl_path
        new_path = ospath.join(up_dir, new_name)
        await move(dl_path, new_path)
        return new_path

    for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
        for file_ in files:
            new_name = perform_swap(file_, self.name_swap)
            if not new_name or new_name == file_:
                continue
            await move(
                ospath.join(dirpath, file_), ospath.join(dirpath, new_name)
            )
    return dl_path
|
|
async def generate_screenshots(self, dl_path):
    """Create screenshots for the video file(s) under *dl_path*.

    The number of screenshots is ``int(self.screen_shots)`` when that
    option is a string, otherwise 10. For a single video file, the file
    and its screenshots are moved into a folder named after the file and
    that folder is returned; in every other case *dl_path* is returned.
    """
    shots_wanted = (
        int(self.screen_shots) if isinstance(self.screen_shots, str) else 10
    )

    if not self.is_file:
        LOGGER.info(f"Creating Screenshot for: {dl_path}")
        for root, _, names in await sync_to_async(walk, dl_path, topdown=False):
            for entry in names:
                candidate = ospath.join(root, entry)
                has_video = (await get_document_type(candidate))[0]
                if has_video:
                    await take_ss(candidate, shots_wanted)
        return dl_path

    if not (await get_document_type(dl_path))[0]:
        return dl_path

    LOGGER.info(f"Creating Screenshot for: {dl_path}")
    shots = await take_ss(dl_path, shots_wanted)
    if not shots:
        return dl_path

    target_dir = ospath.splitext(dl_path)[0]
    if await aiopath.isfile(target_dir):
        # Extension-less file: avoid clashing with the file itself.
        target_dir = f"{target_dir}_temp"
    base_name = ospath.basename(dl_path)
    await makedirs(target_dir, exist_ok=True)
    await gather(
        move(dl_path, f"{target_dir}/{base_name}"),
        move(shots, target_dir),
    )
    return target_dir
|
|
async def convert_media(self, dl_path, gid):
    """Convert video/audio files to the requested container formats.

    ``self.convert_video`` / ``self.convert_audio`` are whitespace
    separated specs: ``<target-ext> [<+|-> <ext> ...]``. With ``+`` only
    files having one of the listed extensions are converted, with ``-``
    only files having none of them; without a marker every file not
    already in the target format is converted. Files that carry video are
    never treated as audio.

    Returns:
        The converted file for a single-file task, ``False`` when a source
        file could not be removed after conversion (the task is then
        flagged as cancelled), otherwise *dl_path*.
    """

    def parse_spec(spec):
        # -> (target extension, "+" / "-" / "", dotted filter extensions)
        if not spec:
            return "", "", []
        tokens = spec.split()
        target = tokens[0].lower()
        if len(tokens) <= 2:
            return target, "", []
        marker = tokens[1].split()
        if "+" in marker:
            status = "+"
        elif "-" in marker:
            status = "-"
        else:
            status = ""
        return target, status, [f".{ext.lower()}" for ext in tokens[2:]]

    def should_convert(path, target, status, filters):
        lowered = path.strip().lower()
        if lowered.endswith(f".{target}"):
            return False
        if status == "+":
            return lowered.endswith(tuple(filters))
        if status == "-":
            return not lowered.endswith(tuple(filters))
        return True

    vext, vstatus, fvext = parse_spec(self.convert_video)
    aext, astatus, faext = parse_spec(self.convert_audio)

    self.files_to_proceed = {}
    if self.is_file:
        candidates = [dl_path]
    else:
        candidates = [
            ospath.join(root, entry)
            for root, _, names in await sync_to_async(
                walk, dl_path, topdown=False
            )
            for entry in names
        ]

    for candidate in candidates:
        is_video, is_audio, _ = await get_document_type(candidate)
        if is_video and vext and should_convert(candidate, vext, vstatus, fvext):
            self.files_to_proceed[candidate] = "video"
        elif (
            is_audio
            and aext
            and not is_video
            and should_convert(candidate, aext, astatus, faext)
        ):
            self.files_to_proceed[candidate] = "audio"
    del candidates

    if not self.files_to_proceed:
        return dl_path

    ffmpeg = FFMpeg(self)
    async with task_dict_lock:
        task_dict[self.mid] = FFmpegStatus(self, ffmpeg, gid, "Convert")
    self.progress = False
    async with cpu_eater_lock:
        self.progress = True
        for source, media_kind in self.files_to_proceed.items():
            self.proceed_count += 1
            LOGGER.info(f"Converting: {source}")
            if self.is_file:
                self.subsize = self.size
            else:
                self.subsize = await get_path_size(source)
                self.subname = ospath.basename(source)
            if media_kind == "video":
                converted = await ffmpeg.convert_video(source, vext)
            else:
                converted = await ffmpeg.convert_audio(source, aext)
            if not converted:
                continue
            try:
                await remove(source)
            except Exception:
                self.is_cancelled = True
                return False
            if self.is_file:
                return converted
    return dl_path
|
|
async def generate_sample_video(self, dl_path, gid):
    """Create a sample clip for each video found at ``dl_path``.

    ``self.sample_video`` may be a ``"<sample_duration>:<part_duration>"``
    string. A missing or empty field falls back to its default (60 and 4
    respectively); a non-string value uses both defaults.

    Args:
        dl_path: Path of the downloaded file or directory.
        gid: Task identifier passed on to ``FFmpegStatus``.

    Returns:
        For a single-file task whose sample was produced, the new folder
        holding the original file and its ``SAMPLE.``-prefixed companion;
        otherwise ``dl_path`` unchanged.
    """
    fields = (
        self.sample_video.split(":") if isinstance(self.sample_video, str) else []
    )
    # Both fields are optional: "", "60", "60:" and ":4" must all parse
    # without raising, so an empty field means "use the default".
    sample_duration = int(fields[0]) if fields and fields[0] else 60
    part_duration = int(fields[1]) if len(fields) > 1 and fields[1] else 4

    # Collect the videos to process as {full path: file name}.
    self.files_to_proceed = {}
    if self.is_file and (await get_document_type(dl_path))[0]:
        file_ = ospath.basename(dl_path)
        self.files_to_proceed[dl_path] = file_
    else:
        for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
            for file_ in files:
                f_path = ospath.join(dirpath, file_)
                if (await get_document_type(f_path))[0]:
                    self.files_to_proceed[f_path] = file_

    if self.files_to_proceed:
        ffmpeg = FFMpeg(self)
        async with task_dict_lock:
            task_dict[self.mid] = FFmpegStatus(self, ffmpeg, gid, "Sample Video")
        # The progress flag stays False until cpu_eater_lock is acquired.
        self.progress = False
        async with cpu_eater_lock:
            self.progress = True
            LOGGER.info(f"Creating Sample video: {self.name}")
            for f_path, file_ in self.files_to_proceed.items():
                self.proceed_count += 1
                if self.is_file:
                    self.subsize = self.size
                else:
                    self.subsize = await get_path_size(f_path)
                    self.subname = file_
                res = await ffmpeg.sample_video(
                    f_path, sample_duration, part_duration
                )
                if res and self.is_file:
                    # A single file is wrapped in a folder named after it so
                    # the original and its sample travel together.
                    new_folder = ospath.splitext(f_path)[0]
                    if await aiopath.isfile(new_folder):
                        # Extension-less file: the folder name would collide
                        # with the file itself.
                        new_folder = f"{new_folder}_temp"
                    await makedirs(new_folder, exist_ok=True)
                    await gather(
                        move(f_path, f"{new_folder}/{file_}"),
                        move(res, f"{new_folder}/SAMPLE.{file_}"),
                    )
                    return new_folder
    return dl_path
|
|
async def proceed_compress(self, dl_path, gid):
    """Zip ``dl_path`` with ``SevenZ`` and return the result of ``zip``.

    When leeching a single file, the file is first moved into a folder
    named after it (suffixed ``_temp`` if that name is already a file) and
    the task is switched to folder mode. ``self.compress`` is used as the
    archive password when it is a string; otherwise no password is passed.

    Args:
        dl_path: Path of the file or directory to archive.
        gid: Task identifier passed on to ``SevenZStatus``.
    """
    password = self.compress if isinstance(self.compress, str) else ""

    if self.is_leech and self.is_file:
        wrap_dir = ospath.splitext(dl_path)[0]
        if await aiopath.isfile(wrap_dir):
            wrap_dir = f"{wrap_dir}_temp"
        base_name = ospath.basename(dl_path)
        await makedirs(wrap_dir, exist_ok=True)
        relocated = f"{wrap_dir}/{base_name}"
        await move(dl_path, relocated)
        dl_path = relocated
        self.is_file = False

    # In both cases the archive sits next to the (possibly relocated) source.
    archive_path = f"{dl_path}.zip"

    archiver = SevenZ(self)
    async with task_dict_lock:
        task_dict[self.mid] = SevenZStatus(self, archiver, gid, "Zip")
    return await archiver.zip(dl_path, archive_path, password)
|
|
async def proceed_split(self, dl_path, gid):
    """Split every file at ``dl_path`` that is larger than ``self.split_size``.

    Video files are split with ``FFMpeg.split`` unless ``self.as_doc`` is
    set; everything else goes through ``split_file``. After a successful
    split (or when the file is at least ``self.max_split_size``) the source
    file is removed.

    Args:
        dl_path: Path of the downloaded file or directory.
        gid: Task identifier passed on to ``FFmpegStatus``.

    Returns:
        ``False`` when the task is cancelled, including when a source file
        cannot be removed after splitting; otherwise ``None``.
    """
    # Collect oversize files as {full path: [size, file name]}.
    self.files_to_proceed = {}
    if self.is_file:
        f_size = await get_path_size(dl_path)
        if f_size > self.split_size:
            self.files_to_proceed[dl_path] = [f_size, ospath.basename(dl_path)]
    else:
        for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
            for file_ in files:
                f_path = ospath.join(dirpath, file_)
                f_size = await get_path_size(f_path)
                if f_size > self.split_size:
                    self.files_to_proceed[f_path] = [f_size, file_]

    if not self.files_to_proceed:
        return None

    ffmpeg = FFMpeg(self)
    async with task_dict_lock:
        task_dict[self.mid] = FFmpegStatus(self, ffmpeg, gid, "Split")
    LOGGER.info(f"Splitting: {self.name}")
    for f_path, (f_size, file_) in self.files_to_proceed.items():
        self.proceed_count += 1
        if self.is_file:
            self.subsize = self.size
        else:
            self.subsize = f_size
            self.subname = file_
        # Ceiling division: number of parts needed at the configured limit.
        parts = -(-f_size // self.split_size)
        if self.equal_splits:
            # Ceiling of f_size / parts: covers the whole file in `parts`
            # pieces and, unlike quotient + remainder, can never exceed
            # self.split_size.
            split_size = -(-f_size // parts)
        else:
            split_size = self.split_size
        if not self.as_doc and (await get_document_type(f_path))[0]:
            self.progress = True
            res = await ffmpeg.split(f_path, file_, parts, split_size)
        else:
            self.progress = False
            res = await split_file(f_path, split_size, self)
        if self.is_cancelled:
            return False
        if res or f_size >= self.max_split_size:
            try:
                await remove(f_path)
            except Exception as e:
                # Stop right away rather than running another split that
                # would be discarded on the next cancellation check.
                LOGGER.error(f"Failed to remove {f_path} after splitting: {e}")
                self.is_cancelled = True
                return False
    return None
|
|
def parse_metadata_string(self, metadata_str):
    """Return ``self.metadata_processor.parse_string(metadata_str)``."""
    processor = self.metadata_processor
    parsed = processor.parse_string(metadata_str)
    return parsed
|
|
def merge_metadata_dicts(self, default_dict, cmd_dict):
    """Return ``self.metadata_processor.merge_dicts(default_dict, cmd_dict)``."""
    processor = self.metadata_processor
    merged = processor.merge_dicts(default_dict, cmd_dict)
    return merged
|
|