Spaces:
Running
Running
| # Ultroid - UserBot | |
| # Copyright (C) 2021-2025 TeamUltroid | |
| # | |
| # This file is a part of < https://github.com/TeamUltroid/Ultroid/ > | |
| # PLease read the GNU Affero General Public License in | |
| # <https://www.github.com/TeamUltroid/Ultroid/blob/main/LICENSE/>. | |
| import os | |
| import re | |
| try: | |
| from PIL import Image | |
| except ImportError: | |
| Image = None | |
| from telethon import Button | |
| from telethon.errors.rpcerrorlist import FilePartLengthInvalidError, MediaEmptyError | |
| from telethon.tl.types import DocumentAttributeAudio, DocumentAttributeVideo | |
| from telethon.tl.types import InputWebDocument as wb | |
| from pyUltroid.fns.helper import ( | |
| bash, | |
| fast_download, | |
| humanbytes, | |
| numerize, | |
| time_formatter, | |
| ) | |
| from pyUltroid.fns.ytdl import dler, get_buttons, get_formats | |
| from . import LOGS, asst, callback, in_pattern, udB | |
| try: | |
| from youtubesearchpython import VideosSearch | |
| except ImportError: | |
| LOGS.info("'youtubesearchpython' not installed!") | |
| VideosSearch = None | |
| ytt = "https://graph.org/file/afd04510c13914a06dd03.jpg" | |
| _yt_base_url = "https://www.youtube.com/watch?v=" | |
| BACK_BUTTON = {} | |
| async def _(event): | |
| try: | |
| string = event.text.split(" ", maxsplit=1)[1] | |
| except IndexError: | |
| fuk = event.builder.article( | |
| title="Search Something", | |
| thumb=wb(ytt, 0, "image/jpeg", []), | |
| text="**Yα΄α΄Tα΄Κα΄ Sα΄α΄Κα΄Κ**\n\nYou didn't search anything", | |
| buttons=Button.switch_inline( | |
| "Sα΄α΄Κα΄Κ AΙ’α΄ΙͺΙ΄", | |
| query="yt ", | |
| same_peer=True, | |
| ), | |
| ) | |
| await event.answer([fuk]) | |
| return | |
| results = [] | |
| search = VideosSearch(string, limit=50) | |
| nub = search.result() | |
| nibba = nub["result"] | |
| for v in nibba: | |
| ids = v["id"] | |
| link = _yt_base_url + ids | |
| title = v["title"] | |
| duration = v["duration"] | |
| views = v["viewCount"]["short"] | |
| publisher = v["channel"]["name"] | |
| published_on = v["publishedTime"] | |
| description = ( | |
| v["descriptionSnippet"][0]["text"] | |
| if v.get("descriptionSnippet") | |
| and len(v["descriptionSnippet"][0]["text"]) < 500 | |
| else "None" | |
| ) | |
| thumb = f"https://i.ytimg.com/vi/{ids}/hqdefault.jpg" | |
| text = f"**Title: [{title}]({link})**\n\n" | |
| text += f"`Description: {description}\n\n" | |
| text += f"γ Duration: {duration} γ\n" | |
| text += f"γ Views: {views} γ\n" | |
| text += f"γ Publisher: {publisher} γ\n" | |
| text += f"γ Published on: {published_on} γ`" | |
| desc = f"{title}\n{duration}" | |
| file = wb(thumb, 0, "image/jpeg", []) | |
| buttons = [ | |
| [ | |
| Button.inline("Audio", data=f"ytdl:audio:{ids}"), | |
| Button.inline("Video", data=f"ytdl:video:{ids}"), | |
| ], | |
| [ | |
| Button.switch_inline( | |
| "Sα΄α΄Κα΄Κ AΙ’α΄ΙͺΙ΄", | |
| query="yt ", | |
| same_peer=True, | |
| ), | |
| Button.switch_inline( | |
| "SΚα΄Κα΄", | |
| query=f"yt {string}", | |
| same_peer=False, | |
| ), | |
| ], | |
| ] | |
| BACK_BUTTON.update({ids: {"text": text, "buttons": buttons}}) | |
| results.append( | |
| await event.builder.article( | |
| type="photo", | |
| title=title, | |
| description=desc, | |
| thumb=file, | |
| content=file, | |
| text=text, | |
| include_media=True, | |
| buttons=buttons, | |
| ), | |
| ) | |
| await event.answer(results[:50]) | |
| async def _(e): | |
| _e = e.pattern_match.group(1).strip().decode("UTF-8") | |
| _lets_split = _e.split(":") | |
| _ytdl_data = await dler(e, _yt_base_url + _lets_split[1]) | |
| _data = get_formats(_lets_split[0], _lets_split[1], _ytdl_data) | |
| _buttons = get_buttons(_data) | |
| _text = ( | |
| "`Select Your Format.`" | |
| if _buttons | |
| else "`Error downloading from YouTube.\nTry Restarting your bot.`" | |
| ) | |
| await e.edit(_text, buttons=_buttons) | |
| async def _(event): | |
| url = event.pattern_match.group(1).strip().decode("UTF-8") | |
| lets_split = url.split(":") | |
| vid_id = lets_split[2] | |
| link = _yt_base_url + vid_id | |
| format = lets_split[1] | |
| try: | |
| ext = lets_split[3] | |
| except IndexError: | |
| ext = "mp3" | |
| if lets_split[0] == "audio": | |
| opts = { | |
| "format": "bestaudio", | |
| "addmetadata": True, | |
| "key": "FFmpegMetadata", | |
| "prefer_ffmpeg": True, | |
| "geo_bypass": True, | |
| "outtmpl": f"%(id)s.{ext}", | |
| "logtostderr": False, | |
| "postprocessors": [ | |
| { | |
| "key": "FFmpegExtractAudio", | |
| "preferredcodec": ext, | |
| "preferredquality": format, | |
| }, | |
| {"key": "FFmpegMetadata"}, | |
| ], | |
| } | |
| ytdl_data = await dler(event, link, opts, True) | |
| title = ytdl_data["title"] | |
| if ytdl_data.get("artist"): | |
| artist = ytdl_data["artist"] | |
| elif ytdl_data.get("creator"): | |
| artist = ytdl_data["creator"] | |
| elif ytdl_data.get("channel"): | |
| artist = ytdl_data["channel"] | |
| views = numerize(ytdl_data.get("view_count")) or 0 | |
| thumb, _ = await fast_download(ytdl_data["thumbnail"], filename=f"{vid_id}.jpg") | |
| likes = numerize(ytdl_data.get("like_count")) or 0 | |
| duration = ytdl_data.get("duration") or 0 | |
| description = ( | |
| ytdl_data["description"] | |
| if len(ytdl_data["description"]) < 100 | |
| else ytdl_data["description"][:100] | |
| ) | |
| description = description or "None" | |
| filepath = f"{vid_id}.{ext}" | |
| if not os.path.exists(filepath): | |
| filepath = f"{filepath}.{ext}" | |
| size = os.path.getsize(filepath) | |
| file, _ = await event.client.fast_uploader( | |
| filepath, | |
| filename=f"{title}.{ext}", | |
| show_progress=True, | |
| event=event, | |
| to_delete=True, | |
| ) | |
| attributes = [ | |
| DocumentAttributeAudio( | |
| duration=int(duration), | |
| title=title, | |
| performer=artist, | |
| ), | |
| ] | |
| elif lets_split[0] == "video": | |
| opts = { | |
| "format": str(format), | |
| "addmetadata": True, | |
| "key": "FFmpegMetadata", | |
| "prefer_ffmpeg": True, | |
| "geo_bypass": True, | |
| "outtmpl": f"%(id)s.{ext}", | |
| "logtostderr": False, | |
| "postprocessors": [{"key": "FFmpegMetadata"}], | |
| } | |
| ytdl_data = await dler(event, link, opts, True) | |
| title = ytdl_data["title"] | |
| if ytdl_data.get("artist"): | |
| artist = ytdl_data["artist"] | |
| elif ytdl_data.get("creator"): | |
| artist = ytdl_data["creator"] | |
| elif ytdl_data.get("channel"): | |
| artist = ytdl_data["channel"] | |
| views = numerize(ytdl_data.get("view_count")) or 0 | |
| thumb, _ = await fast_download(ytdl_data["thumbnail"], filename=f"{vid_id}.jpg") | |
| try: | |
| Image.open(thumb).save(thumb, "JPEG") | |
| except Exception as er: | |
| LOGS.exception(er) | |
| thumb = None | |
| description = ( | |
| ytdl_data["description"] | |
| if len(ytdl_data["description"]) < 100 | |
| else ytdl_data["description"][:100] | |
| ) | |
| likes = numerize(ytdl_data.get("like_count")) or 0 | |
| hi, wi = ytdl_data.get("height") or 720, ytdl_data.get("width") or 1280 | |
| duration = ytdl_data.get("duration") or 0 | |
| filepath = f"{vid_id}.mkv" | |
| if not os.path.exists(filepath): | |
| filepath = f"{filepath}.webm" | |
| size = os.path.getsize(filepath) | |
| file, _ = await event.client.fast_uploader( | |
| filepath, | |
| filename=f"{title}.mkv", | |
| show_progress=True, | |
| event=event, | |
| to_delete=True, | |
| ) | |
| attributes = [ | |
| DocumentAttributeVideo( | |
| duration=int(duration), | |
| w=wi, | |
| h=hi, | |
| supports_streaming=True, | |
| ), | |
| ] | |
| description = description if description != "" else "None" | |
| text = f"**Title: [{title}]({_yt_base_url}{vid_id})**\n\n" | |
| text += f"`π Description: {description}\n\n" | |
| text += f"γ Duration: {time_formatter(int(duration)*1000)} γ\n" | |
| text += f"γ Artist: {artist} γ\n" | |
| text += f"γ Views: {views} γ\n" | |
| text += f"γ Likes: {likes} γ\n" | |
| text += f"γ Size: {humanbytes(size)} γ`" | |
| button = Button.switch_inline("Search More", query="yt ", same_peer=True) | |
| try: | |
| await event.edit( | |
| text, | |
| file=file, | |
| buttons=button, | |
| attributes=attributes, | |
| thumb=thumb, | |
| ) | |
| except (FilePartLengthInvalidError, MediaEmptyError): | |
| file = await asst.send_message( | |
| udB.get_key("LOG_CHANNEL"), | |
| text, | |
| file=file, | |
| buttons=button, | |
| attributes=attributes, | |
| thumb=thumb, | |
| ) | |
| await event.edit(text, file=file.media, buttons=button) | |
| await bash(f"rm {vid_id}.jpg") | |
| async def ytdl_back(event): | |
| id_ = event.data_match.group(1).decode("utf-8") | |
| if not BACK_BUTTON.get(id_): | |
| return await event.answer("Query Expired! Search again π") | |
| await event.edit(**BACK_BUTTON[id_]) | |