CaesarMusic / caesarmusic.py
CaesarCloudSync
add songs
3306588
# Download youtube videos in python using pytube library
import os
# paste the YouTube video URL here
import io
import re
import zipfile
from io import BytesIO
from youtubesearchpython import VideosSearch,PlaylistsSearch
from fastapi.responses import StreamingResponse
from moviepy.editor import *
from caesarpytube import YouTube
#from pytube import YouTube
#from caesarpytube import YouTube
from pytube.exceptions import PytubeError
from typing import Generator
#from youtubesearchpython import Playlist
from pytube.contrib.playlist import Playlist
import time
import base64
class CaesarMusic:
def __init__(self,download_dir=f"{os.getcwd()}/Songs",first_time_running=True) -> None:
self.first_time_running = first_time_running
self.download_dir= download_dir
def fetch_playslist_songs(self,artist,playlisturl):
playlist = Playlist(playlisturl)
#songs = [song for song in playlist.videos if artist in song["channel"]["name"].lower()]
#print(songs)
#return songs
return [{"link":url} for url in playlist]
def download_mp4_song(self,url):
# create a YouTube object and get the video stream
youtube = YouTube(url)
song_title = youtube.title + ".mp4"
print(song_title)
video_streams = youtube.streams.filter(res="720p")
print(video_streams)
# set the download path and download the video
download_path = "D:\CaesarAI\CaesarAIAPI\CaesarAI\CaesarAIMusicLoad"
video_streams.first().download(download_path)
def download_mp3_song(self,url,DOWNLOAD_DIR=f"{os.getcwd()}/Songs"):
if "Songs" not in os.listdir(os.getcwd()):
os.mkdir(DOWNLOAD_DIR)
# importing packages
try:
# url input from user
#str(input("Enter the URL of the video you want to download: \n>> "))
#print("Starting...")
yt = YouTube(url,use_oauth=True, allow_oauth_cache=True)
if self.first_time_running == True:
#time.sleep(40)
self.first_time_running = False
try:
print('Downloading : {} with url : {}'.format(yt.title, yt.watch_url))
# extract only audio
video = yt.streams.filter(only_audio=True).first()
# check for destination to save file
#print("Enter the destination (leave blank for current directory)")
# download the file
out_file = video.download(output_path=DOWNLOAD_DIR)
# save the file
base, ext = os.path.splitext(out_file)
new_file = base + '.mp3'
os.rename(out_file, new_file)
# result of success
#print(yt.title + " has been successfully downloaded.")
except (RuntimeError,FileNotFoundError) as rex:
print(rex)
self.clean_up_dir(download_dir,"mp3")
self.clean_up_dir(download_dir,"mp4")
pass
return new_file#yt.title
except (FileExistsError,Exception) as fex:
print(type(fex),fex)
def zipfiles(self,filenames):
iosongs = BytesIO()
zip_filename = "songs.zip"
with zipfile.ZipFile(iosongs, 'w') as zipMe:
for f in filenames:
if "mp3" in f:
zipMe.write(f, compress_type=zipfile.ZIP_DEFLATED)
zipMe.close()
return StreamingResponse(
iter([iosongs.getvalue()]),
media_type="application/x-zip-compressed",
headers = { "Content-Disposition":f"attachment;filename=%s" % zip_filename}
)
def clean_up_dir(self,download_dir,ext=None):
try:
if ext:
filelist = [i for i in os.listdir(download_dir) if ext in i]
elif not ext:
filelist =list(os.listdir(download_dir))
for f in filelist:
os.remove(os.path.join(download_dir, f))
except FileNotFoundError as fex:
pass
def caesarmusicfetch(self,artist,album):
self.clean_up_dir(self.download_dir)
artist = artist.lower().strip()
album = album.lower().strip()
query = f"{artist} {album}"
videosSearch = VideosSearch(query,limit=4)
videos = []
for video in videosSearch.result()["result"]:
if artist in video["channel"]["name"].lower():
if album in video["title"].lower():
videos.append(video)
videosunique = []
[videosunique.append(item) for item in videos if item not in videosunique]
return videosunique
def caesarmusicextract(self,videos):
songtitles = []
if videos != []:
for video in videos:
while True:
try:
songtitle = self.download_mp3_song(video["link"],self.download_dir)
songtitles.append(songtitle)
break
except PytubeError as pex:
continue
songtitles= [f"Songs/{i}" for i in os.listdir(self.download_dir) if "mp3" in i]
songzipresponse = self.zipfiles(songtitles)
self.clean_up_dir(self.download_dir,"mp3")
self.clean_up_dir(self.download_dir,"mp4")
return songzipresponse
elif videos == []:
return "No song detected"
def caesarmusicextractgenerator(self,videos) -> Generator:
songtitles = []
if videos != []:
for video in videos:
while True:
try:
songtitle = self.download_mp3_song(video["link"],self.download_dir)
songtitles.append(songtitle)
break
except PytubeError as pex:
continue
#print(songtitle)
try:
with open(f'{songtitle}', 'rb') as f:
data = f.read()
encoded_string = base64.b64encode(data).decode("utf-8")
yield encoded_string,songtitle.replace(f"{os.getcwd()}/Songs\\","").replace(f"{os.getcwd()}/Songs/","") #data
except FileNotFoundError as fex:
print(fex)
continue
#songtitles= [f"Songs/{i}" for i in os.listdir(self.download_dir) if "mp3" in i]
#songzipresponse = self.zipfiles(songtitles)
#self.clean_up_dir(self.download_dir,"mp3")
#self.clean_up_dir(self.download_dir,"mp4")
elif videos == []:
yield "No song detected"
if __name__ == "__main__":
def test2():
artist = artist.lower().strip()
album = album.lower().strip()
query = f"{artist} {album}"
caesarmusic = CaesarMusic()
videosSearch = PlaylistsSearch(query,limit=4)
#print(videosSearch.result()["result"][1])
resultnum,result = [],[]
for playlist in videosSearch.result()["result"]:
if "playlist" in playlist["link"]:
songs = caesarmusic.fetch_playslist_songs(artist,playlist["link"])
print(songs)
resultnum.append(len(songs))
result.append(songs)
biggest_number = max(resultnum)
#songs = result[resultnum.index(biggest_number)]
#for song in caesarmusic.caesarmusicextractgenerator(songs):
# pass
#playlisturl = [playlist["link"] for playlist in videosSearch.result()["result"] if "playlist" in playlist["link"]][0]
#songs = caesarmusic.fetch_playslist_songs(artist,playlisturl)
#print(len(songs))
#print(songs[0])
#for song in caesarmusic.caesarmusicextractgenerator(songs):
# pass
#print(song)
download_dir = f"{os.getcwd()}/Songs"
artist = "brent faiyaz"
album = "sonder son"
album_or_song = "album"
query = f"{artist} {album}"
caesarmusic = CaesarMusic()
if album_or_song == "album":
videosSearch = PlaylistsSearch(query,limit=1)
playlisturl = [playlist["link"] for playlist in videosSearch.result()["result"] if "playlist" in playlist["link"]][0]
songs = caesarmusic.fetch_playslist_songs(artist,playlisturl)
#print(songs[0])
songzipresponse = caesarmusic.caesarmusicextract(songs)
if songzipresponse == "No song detected":
print({"message":"No song detected"})
elif songzipresponse != "No song detected":
print(songzipresponse)
elif album_or_song == "song":
songs = caesarmusic.caesarmusicfetch(artist,album)
#print(songs)
songzipresponse = caesarmusic.caesarmusicextract(songs)
if songzipresponse == "No song detected":
print({"message":"No song detected"})
elif songzipresponse != "No song detected":
print(songzipresponse)