Spaces:
Sleeping
Sleeping
| # Download youtube videos in python using pytube library | |
| import os | |
| # paste the YouTube video URL here | |
| import io | |
| import re | |
| import zipfile | |
| from io import BytesIO | |
| from youtubesearchpython import VideosSearch,PlaylistsSearch | |
| from fastapi.responses import StreamingResponse | |
| from moviepy.editor import * | |
| from caesarpytube import YouTube | |
| #from pytube import YouTube | |
| #from caesarpytube import YouTube | |
| from pytube.exceptions import PytubeError | |
| from typing import Generator | |
| #from youtubesearchpython import Playlist | |
| from pytube.contrib.playlist import Playlist | |
| import time | |
| import base64 | |
| class CaesarMusic: | |
| def __init__(self,download_dir=f"{os.getcwd()}/Songs",first_time_running=True) -> None: | |
| self.first_time_running = first_time_running | |
| self.download_dir= download_dir | |
| def fetch_playslist_songs(self,artist,playlisturl): | |
| playlist = Playlist(playlisturl) | |
| #songs = [song for song in playlist.videos if artist in song["channel"]["name"].lower()] | |
| #print(songs) | |
| #return songs | |
| return [{"link":url} for url in playlist] | |
| def download_mp4_song(self,url): | |
| # create a YouTube object and get the video stream | |
| youtube = YouTube(url) | |
| song_title = youtube.title + ".mp4" | |
| print(song_title) | |
| video_streams = youtube.streams.filter(res="720p") | |
| print(video_streams) | |
| # set the download path and download the video | |
| download_path = "D:\CaesarAI\CaesarAIAPI\CaesarAI\CaesarAIMusicLoad" | |
| video_streams.first().download(download_path) | |
| def download_mp3_song(self,url,DOWNLOAD_DIR=f"{os.getcwd()}/Songs"): | |
| if "Songs" not in os.listdir(os.getcwd()): | |
| os.mkdir(DOWNLOAD_DIR) | |
| # importing packages | |
| try: | |
| # url input from user | |
| #str(input("Enter the URL of the video you want to download: \n>> ")) | |
| #print("Starting...") | |
| yt = YouTube(url,use_oauth=True, allow_oauth_cache=True) | |
| if self.first_time_running == True: | |
| #time.sleep(40) | |
| self.first_time_running = False | |
| try: | |
| print('Downloading : {} with url : {}'.format(yt.title, yt.watch_url)) | |
| # extract only audio | |
| video = yt.streams.filter(only_audio=True).first() | |
| # check for destination to save file | |
| #print("Enter the destination (leave blank for current directory)") | |
| # download the file | |
| out_file = video.download(output_path=DOWNLOAD_DIR) | |
| # save the file | |
| base, ext = os.path.splitext(out_file) | |
| new_file = base + '.mp3' | |
| os.rename(out_file, new_file) | |
| # result of success | |
| #print(yt.title + " has been successfully downloaded.") | |
| except (RuntimeError,FileNotFoundError) as rex: | |
| print(rex) | |
| self.clean_up_dir(download_dir,"mp3") | |
| self.clean_up_dir(download_dir,"mp4") | |
| pass | |
| return new_file#yt.title | |
| except (FileExistsError,Exception) as fex: | |
| print(type(fex),fex) | |
| def zipfiles(self,filenames): | |
| iosongs = BytesIO() | |
| zip_filename = "songs.zip" | |
| with zipfile.ZipFile(iosongs, 'w') as zipMe: | |
| for f in filenames: | |
| if "mp3" in f: | |
| zipMe.write(f, compress_type=zipfile.ZIP_DEFLATED) | |
| zipMe.close() | |
| return StreamingResponse( | |
| iter([iosongs.getvalue()]), | |
| media_type="application/x-zip-compressed", | |
| headers = { "Content-Disposition":f"attachment;filename=%s" % zip_filename} | |
| ) | |
| def clean_up_dir(self,download_dir,ext=None): | |
| try: | |
| if ext: | |
| filelist = [i for i in os.listdir(download_dir) if ext in i] | |
| elif not ext: | |
| filelist =list(os.listdir(download_dir)) | |
| for f in filelist: | |
| os.remove(os.path.join(download_dir, f)) | |
| except FileNotFoundError as fex: | |
| pass | |
| def caesarmusicfetch(self,artist,album): | |
| self.clean_up_dir(self.download_dir) | |
| artist = artist.lower().strip() | |
| album = album.lower().strip() | |
| query = f"{artist} {album}" | |
| videosSearch = VideosSearch(query,limit=4) | |
| videos = [] | |
| for video in videosSearch.result()["result"]: | |
| if artist in video["channel"]["name"].lower(): | |
| if album in video["title"].lower(): | |
| videos.append(video) | |
| videosunique = [] | |
| [videosunique.append(item) for item in videos if item not in videosunique] | |
| return videosunique | |
| def caesarmusicextract(self,videos): | |
| songtitles = [] | |
| if videos != []: | |
| for video in videos: | |
| while True: | |
| try: | |
| songtitle = self.download_mp3_song(video["link"],self.download_dir) | |
| songtitles.append(songtitle) | |
| break | |
| except PytubeError as pex: | |
| continue | |
| songtitles= [f"Songs/{i}" for i in os.listdir(self.download_dir) if "mp3" in i] | |
| songzipresponse = self.zipfiles(songtitles) | |
| self.clean_up_dir(self.download_dir,"mp3") | |
| self.clean_up_dir(self.download_dir,"mp4") | |
| return songzipresponse | |
| elif videos == []: | |
| return "No song detected" | |
| def caesarmusicextractgenerator(self,videos) -> Generator: | |
| songtitles = [] | |
| if videos != []: | |
| for video in videos: | |
| while True: | |
| try: | |
| songtitle = self.download_mp3_song(video["link"],self.download_dir) | |
| songtitles.append(songtitle) | |
| break | |
| except PytubeError as pex: | |
| continue | |
| #print(songtitle) | |
| try: | |
| with open(f'{songtitle}', 'rb') as f: | |
| data = f.read() | |
| encoded_string = base64.b64encode(data).decode("utf-8") | |
| yield encoded_string,songtitle.replace(f"{os.getcwd()}/Songs\\","").replace(f"{os.getcwd()}/Songs/","") #data | |
| except FileNotFoundError as fex: | |
| print(fex) | |
| continue | |
| #songtitles= [f"Songs/{i}" for i in os.listdir(self.download_dir) if "mp3" in i] | |
| #songzipresponse = self.zipfiles(songtitles) | |
| #self.clean_up_dir(self.download_dir,"mp3") | |
| #self.clean_up_dir(self.download_dir,"mp4") | |
| elif videos == []: | |
| yield "No song detected" | |
| if __name__ == "__main__": | |
| def test2(): | |
| artist = artist.lower().strip() | |
| album = album.lower().strip() | |
| query = f"{artist} {album}" | |
| caesarmusic = CaesarMusic() | |
| videosSearch = PlaylistsSearch(query,limit=4) | |
| #print(videosSearch.result()["result"][1]) | |
| resultnum,result = [],[] | |
| for playlist in videosSearch.result()["result"]: | |
| if "playlist" in playlist["link"]: | |
| songs = caesarmusic.fetch_playslist_songs(artist,playlist["link"]) | |
| print(songs) | |
| resultnum.append(len(songs)) | |
| result.append(songs) | |
| biggest_number = max(resultnum) | |
| #songs = result[resultnum.index(biggest_number)] | |
| #for song in caesarmusic.caesarmusicextractgenerator(songs): | |
| # pass | |
| #playlisturl = [playlist["link"] for playlist in videosSearch.result()["result"] if "playlist" in playlist["link"]][0] | |
| #songs = caesarmusic.fetch_playslist_songs(artist,playlisturl) | |
| #print(len(songs)) | |
| #print(songs[0]) | |
| #for song in caesarmusic.caesarmusicextractgenerator(songs): | |
| # pass | |
| #print(song) | |
| download_dir = f"{os.getcwd()}/Songs" | |
| artist = "brent faiyaz" | |
| album = "sonder son" | |
| album_or_song = "album" | |
| query = f"{artist} {album}" | |
| caesarmusic = CaesarMusic() | |
| if album_or_song == "album": | |
| videosSearch = PlaylistsSearch(query,limit=1) | |
| playlisturl = [playlist["link"] for playlist in videosSearch.result()["result"] if "playlist" in playlist["link"]][0] | |
| songs = caesarmusic.fetch_playslist_songs(artist,playlisturl) | |
| #print(songs[0]) | |
| songzipresponse = caesarmusic.caesarmusicextract(songs) | |
| if songzipresponse == "No song detected": | |
| print({"message":"No song detected"}) | |
| elif songzipresponse != "No song detected": | |
| print(songzipresponse) | |
| elif album_or_song == "song": | |
| songs = caesarmusic.caesarmusicfetch(artist,album) | |
| #print(songs) | |
| songzipresponse = caesarmusic.caesarmusicextract(songs) | |
| if songzipresponse == "No song detected": | |
| print({"message":"No song detected"}) | |
| elif songzipresponse != "No song detected": | |
| print(songzipresponse) | |