Spaces:
Running
Running
| # Ultroid - UserBot | |
| # Copyright (C) 2021-2025 TeamUltroid | |
| # | |
| # This file is a part of < https://github.com/TeamUltroid/Ultroid/ > | |
| # Please read the GNU Affero General Public License in | |
| # <https://github.com/TeamUltroid/pyUltroid/blob/main/LICENSE>. | |
| import glob | |
| import os | |
| import re | |
| import time | |
| from telethon import Button | |
| try: | |
| from youtubesearchpython import Playlist, VideosSearch | |
| except ImportError: | |
| Playlist, VideosSearch = None, None | |
| from yt_dlp import YoutubeDL | |
| from .. import LOGS, udB | |
| from .helper import download_file, humanbytes, run_async, time_formatter | |
| from .tools import set_attributes | |
async def ytdl_progress(k, start_time, event):
    """Mirror a download-progress mapping into a Telegram status message.

    Args:
        k: Progress mapping. ``status``, ``filename``, ``total_bytes``,
            ``downloaded_bytes``, ``speed`` and ``eta`` are read from it.
            Presumably the dict yt-dlp hands to progress hooks - confirm
            against the caller.
        start_time: ``time.time()`` value the 10-second refresh intervals
            are measured from.
        event: Object whose awaitable ``edit(text)`` updates the message.

    Returns:
        The result of ``event.edit("error")`` when the status is
        ``"error"``; otherwise ``None`` once the status is no longer
        ``"downloading"``.
    """
    import asyncio  # function-scope import keeps this block self-contained

    if k["status"] == "error":
        return await event.edit("error")

    last_tick = None
    while k["status"] == "downloading":
        # One edit per 10-second interval since start_time (the first one
        # immediately) instead of editing on every spin of the loop.
        tick = int((time.time() - start_time) // 10)
        if tick != last_tick:
            last_tick = tick
            # Size, speed and ETA may be missing or None while unknown;
            # fall back to 0 rather than raising mid-download.
            # NOTE(review): assumes humanbytes/time_formatter accept 0.
            total = k.get("total_bytes") or k.get("total_bytes_estimate") or 0
            done = k.get("downloaded_bytes") or 0
            speed = k.get("speed") or 0
            eta = k.get("eta") or 0
            text = (
                f"`Downloading: {k['filename']}\n"
                + f"Total Size: {humanbytes(total)}\n"
                + f"Downloaded: {humanbytes(done)}\n"
                + f"Speed: {humanbytes(speed)}/s\n"
                + f"ETA: {time_formatter(eta * 1000)}`"
            )
            try:
                await event.edit(text)
            except Exception as ex:
                LOGS.error(f"ytdl_progress: {ex}")
        # Yield to the event loop so other tasks (including whatever
        # updates ``k``) can run; without this the loop busy-spins.
        await asyncio.sleep(1)
def get_yt_link(query):
    """Return the link of the first search hit for *query*.

    Runs ``VideosSearch(query, limit=1)`` and reads the ``"result"`` list
    of its ``result()`` mapping.

    Returns:
        The ``"link"`` value of the first hit, or ``None`` when the result
        list is empty.
    """
    hits = VideosSearch(query, limit=1).result()["result"]
    try:
        top_hit = hits[0]
    except IndexError:
        # No hits for this query.
        return None
    return top_hit["link"]
def _find_downloaded_media(video_id):
    """Return the path of the downloaded file for *video_id*, or ``None``.

    Globs the current directory for names starting with the id and skips
    anything ending in ``jpg`` (the thumbnail saved next to it). When
    several names match, the last one returned by ``glob`` wins.
    """
    found = None
    # Escape the id so glob metacharacters in it are matched literally.
    for path in glob.glob(f"{glob.escape(video_id)}*"):
        if not path.endswith("jpg"):
            found = path
    return found


def _rename_for_upload(downloaded, title):
    """Rename *downloaded* to ``title + extension`` and return the new path.

    If neither ``downloaded`` nor ``downloaded + extension`` can be renamed
    (for example the title is not a usable file name) but *downloaded*
    itself exists, it is left in place and its own path is returned.

    Raises:
        FileNotFoundError: when no source file exists at all.
    """
    ext = "." + downloaded.split(".")[-1]
    target = title + ext
    try:
        os.rename(downloaded, target)
    except FileNotFoundError:
        try:
            os.rename(downloaded + ext, target)
        except FileNotFoundError:
            if not os.path.exists(downloaded):
                raise
            return downloaded
    return target


def _remove_if_exists(path):
    """Delete *path*, ignoring the case where it is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


async def _upload_media(event, file, thumb, caption, reply_to):
    """Upload *file* to the event's chat with *thumb*, then delete the thumbnail."""
    attributes = await set_attributes(file)
    # NOTE(review): to_delete=True presumably removes the local media file
    # after upload - confirm against fast_uploader.
    res, _ = await event.client.fast_uploader(
        file, show_progress=True, event=event, to_delete=True
    )
    await event.client.send_file(
        event.chat_id,
        file=res,
        caption=caption,
        attributes=attributes,
        supports_streaming=True,
        thumb=thumb,
        reply_to=reply_to,
    )
    os.remove(thumb)


async def download_yt(event, link, ytd):
    """Download *link* via ``dler`` and upload the result(s) to the chat.

    Handles both a single video and a playlist (``info["_type"] ==
    "playlist"``). For every item the thumbnail is fetched, the downloaded
    file is located by its id, renamed after the title and uploaded as a
    reply to the message the event replied to (or to the event itself).
    The status message (*event*) is deleted at the end on a best-effort basis.

    Args:
        event: Telegram event used for status edits, uploads and replies.
        link: URL handed to ``dler``.
        ytd: Downloader option dict handed to ``dler``.

    Returns:
        ``None``. Returns early when ``dler`` yields no info, or when the
        single video's downloaded file cannot be found.
    """
    reply_to = event.reply_to_msg_id or event
    info = await dler(event, link, ytd, download=True)
    if not info:
        return

    if info.get("_type") == "playlist":
        total = info["playlist_count"]
        for num, entry in enumerate(info["entries"], start=1):
            video_id = entry["id"]
            title = entry["title"]
            thumb = video_id + ".jpg"
            await download_file(
                entry.get("thumbnail") or entry["thumbnails"][-1]["url"], thumb
            )
            downloaded = _find_downloaded_media(video_id)
            if not downloaded:
                # Nothing was downloaded for this entry: drop its thumbnail
                # and move on instead of abandoning the rest of the playlist.
                _remove_if_exists(thumb)
                continue
            file = _rename_for_upload(downloaded, title)
            if file.endswith(".part"):
                # Incomplete download: clean up, tell the user, and keep
                # going with the next entry (the message says "Ignoring").
                os.remove(file)
                os.remove(thumb)
                await event.client.send_message(
                    event.chat_id,
                    f"`[{num}/{total}]` `Invalid Video format.\nIgnoring that...`",
                )
                continue
            from_ = info["extractor"].split(":")[0]
            caption = f"`[{num}/{total}]` `{title}`\n\n`from {from_}`"
            await _upload_media(event, file, thumb, caption, reply_to)
        try:
            await event.delete()
        except Exception:
            # Best-effort: failing to delete the status message is harmless.
            # Exception (not BaseException) so task cancellation propagates.
            pass
        return

    # Single video. Long titles are shortened for the file name only; the
    # caption below still uses the full title.
    title = info["title"]
    if len(title) > 20:
        title = title[:17] + "..."
    video_id = info["id"]
    thumb = video_id + ".jpg"
    await download_file(
        info.get("thumbnail") or f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg",
        thumb,
    )
    downloaded = _find_downloaded_media(video_id)
    if not downloaded:
        _remove_if_exists(thumb)
        return
    file = _rename_for_upload(downloaded, title)
    caption = f"`{info['title']}`"
    await _upload_media(event, file, thumb, caption, reply_to)
    try:
        await event.delete()
    except Exception:
        # Best-effort, see the playlist branch.
        pass
| # ---------------YouTube Downloader Inline--------------- | |
| # @New-Dev0 @buddhhu @1danish-00 | |
def get_formats(type, id, data):
    """Build the list of selectable download formats for a video.

    Args:
        type: ``"audio"`` or ``"video"``; anything else yields ``[]``.
        id: Video id, copied into every entry as ``"ytid"``.
        data: Mapping with a ``"formats"`` list (only read for ``"video"``).
            Each format is read via ``format_id``, ``vcodec``, ``width``,
            ``height``, ``ext`` and optionally ``filesize``.

    Returns:
        A list of dicts. Audio entries carry ``ytid``, ``type``, ``id`` (the
        bitrate) and ``quality``. Video entries carry ``ytid``, ``type``,
        ``id`` (``"<format_id>+251"``), ``quality`` (``"<width>×<height>"``),
        ``size`` (video plus format-251 size in bytes) and ``ext``.
    """
    if type == "audio":
        return [
            {
                "ytid": id,
                "type": "audio",
                "id": bitrate,
                "quality": bitrate + "KBPS",
            }
            for bitrate in ("64", "128", "256", "320")
        ]
    if type != "video":
        return []

    telegram_limit = 2147483648  # Telegram's Limit of 2GB
    formats = data["formats"]
    # Every video entry is paired with format "251", so its size is added to
    # each video size. It is summed up front so the result does not depend
    # on where "251" sits in the list relative to the video formats.
    audio_size = sum(
        fmt.get("filesize") or 0 for fmt in formats if fmt["format_id"] == "251"
    )

    video = []
    for fmt in formats:
        if fmt["vcodec"] == "none":
            continue
        try:
            format_id = int(fmt["format_id"])
        except ValueError:
            # A non-numeric format id used to abort the whole listing;
            # skip just that format instead.
            continue
        size = audio_size + (fmt.get("filesize") or 0)
        if size >= telegram_limit:
            continue
        video.append(
            {
                "ytid": id,
                "type": "video",
                "id": str(format_id) + "+251",
                "quality": str(fmt["width"]) + "×" + str(fmt["height"]),
                "size": size,
                "ext": "mkv" if fmt["ext"] == "webm" else "mp4",
            }
        )
    return video
def get_buttons(listt):
    """Lay out inline buttons, two per row, for the given format entries.

    Args:
        listt: Non-empty list of format dicts with ``quality``, ``type``,
            ``id`` and ``ytid`` keys, plus optional ``size`` and ``ext``.

    Returns:
        A list of rows: tuples of one or two format buttons, followed by a
        final single-item list holding the "« Back" button.

    Raises:
        IndexError: if *listt* is empty (the video id is read from its
            first entry).
    """
    video_id = listt[0]["ytid"]

    flat = []
    for fmt in listt:
        label = f"[{fmt['quality']}"
        if fmt.get("size"):
            label += f" {humanbytes(fmt['size'])}]"
        else:
            label += "]"
        payload = f"ytdownload:{fmt['type']}:{fmt['id']}:{fmt['ytid']}"
        if fmt.get("ext"):
            payload += f":{fmt['ext']}"
        flat.append(Button.inline(text=label, data=payload))

    # Chunk into rows of two; an odd trailing button gets a row of its own.
    rows = [tuple(flat[start : start + 2]) for start in range(0, len(flat), 2)]
    rows.append([Button.inline("« Back", f"ytdl_back:{video_id}")])
    return rows
async def dler(event, url, opts: dict = None, download=False):
    """Fetch metadata for *url*, optionally downloading it first.

    Args:
        event: Object whose awaitable ``edit`` shows status and errors.
        url: URL passed to ``ytdownload`` / ``extract_info``.
        opts: Downloader option dict. ``quiet`` is set to ``True`` when
            absent, and ``username`` / ``password`` are filled from the
            ``YT_USERNAME`` / ``YT_PASSWORD`` keys of ``udB``. A dict passed
            by the caller is updated in place, as before; when omitted a
            fresh dict is used for each call.
        download: When true, ``ytdownload`` is awaited before the metadata
            is extracted.

    Returns:
        Whatever ``extract_info`` returns, or ``None`` if it raised (the
        error text is then written to *event*).
    """
    # A ``{}`` default would be shared, and mutated, across every call.
    if opts is None:
        opts = {}
    await event.edit("`Getting Data...`")
    if "quiet" not in opts:
        opts["quiet"] = True
    opts["username"] = udB.get_key("YT_USERNAME")
    opts["password"] = udB.get_key("YT_PASSWORD")
    if download:
        await ytdownload(url, opts)
    try:
        return await extract_info(url, opts)
    except Exception as e:
        await event.edit(f"{type(e)}: {e}")
        return None
# The caller in this file does ``await ytdownload(url, opts)``, which fails on
# a plain function's return value; ``run_async`` (imported from ``.helper``)
# is what makes the call awaitable.
# NOTE(review): presumably run_async runs the wrapped function off the event
# loop thread - confirm in .helper.
@run_async
def ytdownload(url, opts):
    """Download *url* with ``YoutubeDL(opts)``.

    Returns:
        The result of ``YoutubeDL.download([url])``, or ``None`` when the
        download raised; the exception is logged via ``LOGS.error`` rather
        than propagated.
    """
    try:
        return YoutubeDL(opts).download([url])
    except Exception as ex:
        LOGS.error(ex)
# The caller in this file does ``await extract_info(url, opts)``, which fails
# on a plain function's return value; ``run_async`` (imported from
# ``.helper``) is what makes the call awaitable.
# NOTE(review): presumably run_async runs the wrapped function off the event
# loop thread - confirm in .helper.
@run_async
def extract_info(url, opts):
    """Return ``YoutubeDL(opts).extract_info`` for *url* without downloading.

    Exceptions raised by the extractor propagate to the caller.
    """
    return YoutubeDL(opts).extract_info(url=url, download=False)
def get_videos_link(url):
    """Expand a playlist URL into a list of watch URLs.

    Args:
        url: URL expected to contain ``?list=<playlist id>``.

    Returns:
        A list of ``https://youtube.com/watch?v=<id>`` strings, one per
        playlist video whose link contains ``?v=<id>``. Empty when *url*
        has no ``?list=`` part.
    """
    playlist_match = re.search(r"\?list=([(\w+)\-]*)", url)
    if not playlist_match:
        return []

    links = []
    for vid in Playlist(playlist_match.group(1)).videos:
        video_match = re.search(r"\?v=([(\w+)\-]*)", vid["link"])
        if not video_match:
            # A link without "?v=" used to raise AttributeError on
            # ``None.group``; skip that entry and keep the rest.
            continue
        links.append(f"https://youtube.com/watch?v={video_match.group(1)}")
    return links