Spaces:
Runtime error
Runtime error
| import asyncio | |
| import os | |
| import re | |
| import json | |
| import httpx # Make sure to import httpx | |
| from typing import Union | |
| import yt_dlp | |
| from pyrogram.enums import MessageEntityType | |
| from pyrogram.types import Message | |
| from youtubesearchpython.__future__ import VideosSearch | |
| from Devine.utils.database import is_on_off | |
| from Devine.utils.formatters import time_to_seconds | |
| import tempfile | |
| async def shell_cmd(cmd): | |
| proc = await asyncio.create_subprocess_shell( | |
| cmd, | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE, | |
| ) | |
| out, errorz = await proc.communicate() | |
| if errorz: | |
| if "unavailable videos are hidden" in (errorz.decode("utf-8")).lower(): | |
| return out.decode("utf-8") | |
| else: | |
| return errorz.decode("utf-8") | |
| return out.decode("utf-8") | |
| class YouTubeAPI: | |
| def __init__(self): | |
| self.base = "https://www.youtube.com/watch?v=" | |
| self.regex = r"(?:youtube\.com|youtu\.be)" | |
| self.status = "https://www.youtube.com/oembed?url=" | |
| self.listbase = "https://youtube.com/playlist?list=" | |
| self.reg = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") | |
| async def exists(self, link: str, videoid: Union[bool, str] = None): | |
| if videoid: | |
| link = self.base + link | |
| if re.search(self.regex, link): | |
| return True | |
| else: | |
| return False | |
| async def url(self, message_1: Message) -> Union[str, None]: | |
| messages = [message_1] | |
| if message_1.reply_to_message: | |
| messages.append(message_1.reply_to_message) | |
| text = "" | |
| offset = None | |
| length = None | |
| for message in messages: | |
| if offset: | |
| break | |
| if message.entities: | |
| for entity in message.entities: | |
| if entity.type == MessageEntityType.URL: | |
| text = message.text or message.caption | |
| offset, length = entity.offset, entity.length | |
| break | |
| elif message.caption_entities: | |
| for entity in message.caption_entities: | |
| if entity.type == MessageEntityType.TEXT_LINK: | |
| return entity.url | |
| if offset in (None,): | |
| return None | |
| return text[offset : offset + length] | |
| async def details(self, link: str, videoid: Union[bool, str] = None): | |
| if videoid: | |
| link = self.base + link | |
| if "&" in link: | |
| link = link.split("&")[0] | |
| results = VideosSearch(link, limit=1) | |
| for result in (await results.next())["result"]: | |
| title = result["title"] | |
| duration_min = result["duration"] | |
| thumbnail = result["thumbnails"][0]["url"].split("?")[0] | |
| vidid = result["id"] | |
| if str(duration_min) == "None": | |
| duration_sec = 0 | |
| else: | |
| duration_sec = int(time_to_seconds(duration_min)) | |
| return title, duration_min, duration_sec, thumbnail, vidid | |
| async def title(self, link: str, videoid: Union[bool, str] = None): | |
| if videoid: | |
| link = self.base + link | |
| if "&" in link: | |
| link = link.split("&")[0] | |
| results = VideosSearch(link, limit=1) | |
| for result in (await results.next())["result"]: | |
| title = result["title"] | |
| return title | |
| async def duration(self, link: str, videoid: Union[bool, str] = None): | |
| if videoid: | |
| link = self.base + link | |
| if "&" in link: | |
| link = link.split("&")[0] | |
| results = VideosSearch(link, limit=1) | |
| for result in (await results.next())["result"]: | |
| duration = result["duration"] | |
| return duration | |
| async def thumbnail(self, link: str, videoid: Union[bool, str] = None): | |
| if videoid: | |
| link = self.base + link | |
| if "&" in link: | |
| link = link.split("&")[0] | |
| results = VideosSearch(link, limit=1) | |
| for result in (await results.next())["result"]: | |
| thumbnail = result["thumbnails"][0]["url"].split("?")[0] | |
| return thumbnail | |
| async def video(self, link: str, videoid: Union[bool, str] = None): | |
| if videoid: | |
| link = self.base + link | |
| if "&" in link: | |
| link = link.split("&")[0] | |
| proc = await asyncio.create_subprocess_exec( | |
| "yt-dlp", | |
| "-g", | |
| "-f", | |
| "best[height<=?720][width<=?1280]", | |
| f"{link}", | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE, | |
| ) | |
| stdout, stderr = await proc.communicate() | |
| if stdout: | |
| return 1, stdout.decode().split("\n")[0] | |
| else: | |
| return 0, stderr.decode() | |
| async def playlist(self, link, limit, user_id, videoid: Union[bool, str] = None): | |
| if videoid: | |
| link = self.listbase + link | |
| if "&" in link: | |
| link = link.split("&")[0] | |
| playlist = await shell_cmd( | |
| f"yt-dlp -i --get-id --flat-playlist --playlist-end {limit} --skip-download {link}" | |
| ) | |
| try: | |
| result = playlist.split("\n") | |
| for key in result: | |
| if key == "": | |
| result.remove(key) | |
| except: | |
| result = [] | |
| return result | |
| async def track(self, link: str, videoid: Union[bool, str] = None): | |
| if videoid: | |
| link = self.base + link | |
| if "&" in link: | |
| link = link.split("&")[0] | |
| results = VideosSearch(link, limit=1) | |
| for result in (await results.next())["result"]: | |
| title = result["title"] | |
| duration_min = result["duration"] | |
| vidid = result["id"] | |
| yturl = result["link"] | |
| thumbnail = result["thumbnails"][0]["url"].split("?")[0] | |
| track_details = { | |
| "title": title, | |
| "link": yturl, | |
| "vidid": vidid, | |
| "duration_min": duration_min, | |
| "thumb": thumbnail, | |
| } | |
| return track_details, vidid | |
| async def formats(self, link: str, videoid: Union[bool, str] = None): | |
| if videoid: | |
| link = self.base + link | |
| if "&" in link: | |
| link = link.split("&")[0] | |
| ytdl_opts = {"quiet": True} | |
| ydl = yt_dlp.YoutubeDL(ytdl_opts) | |
| with ydl: | |
| formats_available = [] | |
| r = ydl.extract_info(link, download=False) | |
| for format in r["formats"]: | |
| try: | |
| str(format["format"]) | |
| except: | |
| continue | |
| if not "dash" in str(format["format"]).lower(): | |
| try: | |
| format["format"] | |
| format["filesize"] | |
| format["format_id"] | |
| format["ext"] | |
| format["format_note"] | |
| except: | |
| continue | |
| formats_available.append( | |
| { | |
| "format": format["format"], | |
| "filesize": format["filesize"], | |
| "format_id": format["format_id"], | |
| "ext": format["ext"], | |
| "format_note": format["format_note"], | |
| "yturl": link, | |
| } | |
| ) | |
| return formats_available, link | |
| async def slider( | |
| self, | |
| link: str, | |
| query_type: int, | |
| videoid: Union[bool, str] = None, | |
| ): | |
| if videoid: | |
| link = self.base + link | |
| if "&" in link: | |
| link = link.split("&")[0] | |
| a = VideosSearch(link, limit=10) | |
| result = (await a.next()).get("result") | |
| title = result[query_type]["title"] | |
| duration_min = result[query_type]["duration"] | |
| vidid = result[query_type]["id"] | |
| thumbnail = result[query_type]["thumbnails"][0]["url"].split("?")[0] | |
| return title, duration_min, thumbnail, vidid | |
| async def get_video_info_from_ccndev(self, url: str, video: bool): | |
| api_url = "https://ccndev.live" | |
| params = { | |
| "query": url, | |
| "format": "video" if video else "audio", | |
| "download": True, | |
| "api_key": "J5zXWlEG2Cmz5c7cjJHkdoCCYGxtVRBc" | |
| } | |
| async with httpx.AsyncClient() as client: | |
| response = await client.get(api_url, params=params, timeout=150) | |
| if response.status_code == 200: | |
| return response.json() | |
| else: | |
| return {"status": "error", "message": "Failed to fetch data from Bitflow API."} | |
| async def download( | |
| self, | |
| link: str, | |
| mystic, | |
| video: Union[bool, str] = None, | |
| videoid: Union[bool, str] = None, | |
| songaudio: Union[bool, str] = None, | |
| songvideo: Union[bool, str] = None, | |
| format_id: Union[bool, str] = None, | |
| title: Union[bool, str] = None, | |
| ) -> str: | |
| if videoid: | |
| link = self.base + link | |
| if "&" in link: | |
| link = link.split("&")[0] | |
| loop = asyncio.get_running_loop() | |
| bitflow_info = await self.get_video_info_from_bitflow(link, video) | |
| def audio_dl(bitflow_info): | |
| temp_dir = tempfile.gettempdir() | |
| xyz = os.path.join(temp_dir, f"{bitflow_info['videoid']}.{bitflow_info['ext']}") | |
| url = bitflow_info['url'] | |
| # If the url is not a YouTube link, download directly | |
| if not (url.startswith('http') and ('youtube.com' in url or 'youtu.be' in url)): | |
| import httpx | |
| with httpx.Client() as client: | |
| r = client.get(url) | |
| with open(xyz, "wb") as f: | |
| f.write(r.content) | |
| return xyz | |
| ydl_optssx = { | |
| "format": "bestaudio/best", | |
| "outtmpl": xyz, | |
| "geo_bypass": True, | |
| "nocheckcertificate": True, | |
| "quiet": True, | |
| "no_warnings": True, | |
| } | |
| x = yt_dlp.YoutubeDL(ydl_optssx) | |
| if os.path.exists(xyz): | |
| return xyz | |
| x.download([url]) | |
| return xyz | |
| def video_dl(ccndev_info): | |
| temp_dir = tempfile.gettempdir() | |
| xyz = os.path.join(temp_dir, f"{bitflow_info['videoid']}.{bitflow_info['ext']}") | |
| url = bitflow_info['url'] | |
| # If the url is not a YouTube link, download directly | |
| if not (url.startswith('http') and ('youtube.com' in url or 'youtu.be' in url)): | |
| import httpx | |
| with httpx.Client() as client: | |
| r = client.get(url) | |
| with open(xyz, "wb") as f: | |
| f.write(r.content) | |
| return xyz | |
| ydl_optssx = { | |
| "format": "(bestvideo[height<=?720][width<=?1280][ext=mp4])+(bestaudio[ext=m4a])", | |
| "outtmpl": xyz, | |
| "geo_bypass": True, | |
| "nocheckcertificate": True, | |
| "quiet": True, | |
| "no_warnings": True, | |
| } | |
| x = yt_dlp.YoutubeDL(ydl_optssx) | |
| if os.path.exists(xyz): | |
| return xyz | |
| x.download([url]) | |
| return xyz | |
| def song_video_dl(): | |
| temp_dir = tempfile.gettempdir() | |
| fpath = os.path.join(temp_dir, f"{title}") | |
| formats = f"{format_id}+140" | |
| ydl_optssx = { | |
| "format": formats, | |
| "outtmpl": fpath, | |
| "geo_bypass": True, | |
| "nocheckcertificate": True, | |
| "quiet": True, | |
| "no_warnings": True, | |
| "prefer_ffmpeg": True, | |
| "merge_output_format": "mp4", | |
| } | |
| x = yt_dlp.YoutubeDL(ydl_optssx) | |
| x.download([link]) | |
| def song_audio_dl(): | |
| temp_dir = tempfile.gettempdir() | |
| fpath = os.path.join(temp_dir, f"{title}.%(ext)s") | |
| ydl_optssx = { | |
| "format": format_id, | |
| "outtmpl": fpath, | |
| "geo_bypass": True, | |
| "nocheckcertificate": True, | |
| "quiet": True, | |
| "no_warnings": True, | |
| "prefer_ffmpeg": True, | |
| "postprocessors": [ | |
| { | |
| "key": "FFmpegExtractAudio", | |
| "preferredcodec": "mp3", | |
| "preferredquality": "192", | |
| } | |
| ], | |
| } | |
| x = yt_dlp.YoutubeDL(ydl_optssx) | |
| x.download([link]) | |
| if songvideo: | |
| await loop.run_in_executor(None, song_video_dl) | |
| temp_dir = tempfile.gettempdir() | |
| fpath = os.path.join(temp_dir, f"{title}.mp4") | |
| return fpath | |
| elif songaudio: | |
| await loop.run_in_executor(None, song_audio_dl) | |
| temp_dir = tempfile.gettempdir() | |
| fpath = os.path.join(temp_dir, f"{title}.mp3") | |
| return fpath | |
| elif video: | |
| direct = True | |
| downloaded_file = await loop.run_in_executor(None, video_dl, bitflow_info) | |
| else: | |
| direct = True | |
| downloaded_file = await loop.run_in_executor(None, audio_dl, bitflow_info) | |
| return downloaded_file, direct |