Spaces:
Sleeping
Sleeping
File size: 7,500 Bytes
b467181 9ef5372 b5ebba0 9ef5372 b467181 b5ebba0 b467181 f59f0aa b5ebba0 b467181 ac062a1 b467181 ac062a1 b467181 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 | import asyncio
import os
import numpy as np
from youtubesearchpython import VideosSearch,PlaylistsSearch
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect,status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Union,List
from fastapi.responses import FileResponse
from caesarmusic import CaesarMusic
from fastapi.responses import StreamingResponse
app = FastAPI()
class CaesarAIMusicModel(BaseModel):
artist :str
album: str
album_or_song: str
CURRENT_DIR = os.path.realpath(__file__).replace(f"/main.py","").replace(f"\main.py","")
download_dir = f"{CURRENT_DIR}/Songs"
caesarmusic = CaesarMusic(first_time_running=True)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # can alter with time
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
def caesaraihome():
return "Welcome to CaesarAIMusic"
@app.get("/caesarailogo")
def caesarailogo():
return FileResponse(f"{CURRENT_DIR}/CaesarAILogo.png")
@app.post("/caesarmusic")
def caesaraimusic(musicjson : CaesarAIMusicModel):
musicjson = dict(musicjson)
artist = musicjson["artist"]
album = musicjson["album"]
album_or_song = musicjson["album_or_song"]
query = f"{artist} {album}"
print("Hi")
caesarmusic = CaesarMusic()
try:
if album_or_song == "album":
videosSearch = PlaylistsSearch(query,limit=1)
playlisturl = [playlist["link"] for playlist in videosSearch.result()["result"] if "playlist" in playlist["link"]][0]
songs = caesarmusic.fetch_playslist_songs(artist,playlisturl)
songzipresponse = caesarmusic.caesarmusicextract(songs)
if songzipresponse == "No song detected":
return {"message":"No song detected"}
elif songzipresponse != "No song detected":
streamed_songs = caesarmusic.stream_song(songzipresponse)
response = StreamingResponse(
content=streamed_songs,
status_code=status.HTTP_200_OK,
media_type="audio/mpeg",
)
return response
elif album_or_song == "song":
songs = caesarmusic.caesarmusicfetch(artist,album)
songzipresponse = caesarmusic.caesarmusicextract(songs)
if songzipresponse == "No song detected":
return {"message":"No song detected"}
elif songzipresponse != "No song detected":
return songzipresponse
except Exception as ex:
return {"message":f"{type(ex)},{ex}"}
@app.websocket("/caesarmusicws")
async def caesarmusicws(websocket: WebSocket):
# listen for connections
await websocket.accept()
try:
#caesarmusic.clean_up_dir(download_dir,"mp3")
#caesarmusic.clean_up_dir(download_dir,"mp4")
while True:
musicjson = await websocket.receive_json()
artist = musicjson["artist"]
album = musicjson["album"]
album_or_song = musicjson["album_or_song"]
artist = artist.lower().strip()
if "youtube" not in album:
album = album.lower().strip()
query = f"{artist} {album}"
songs_downloaded = 0
if album_or_song == "album":
if "youtube" not in album:
videosSearch = PlaylistsSearch(query,limit=4)
resultnum,result = [],[]
for playlist in videosSearch.result()["result"]:
if "playlist" in playlist["link"]:
songs = caesarmusic.fetch_playslist_songs(artist,playlist["link"])
resultnum.append(len(songs))
result.append(songs)
biggest_number = max(resultnum)
#print(result)
songs = result[resultnum.index(biggest_number)]
elif "youtube" in album:
songs = caesarmusic.fetch_playslist_songs(artist,album)
#playlisturl = [playlist["link"] for playlist in videosSearch.result()["result"] if "playlist" in playlist["link"]][0]
#songs = caesarmusic.fetch_playslist_songs(artist,playlisturl)
#print(songs[0])
elif album_or_song == "song":
if "/watch?v=" not in album:
songs = caesarmusic.caesarmusicfetch(artist,album)
elif "/watch?v=" in album:
songs = [{"link":album}]
for song,songtitle in caesarmusic.caesarmusicextractgenerator(songs):
if song == "No song detected":
await websocket.send_json({"message":"No song detected"})
elif song != "No song detected":
#await websocket.send_bytes(song) # sends the buffer as bytes
#await websocket.send_json({"message":f"{songs_downloaded}/{len(songs)} - {round((songs_downloaded/len(songs))*100,2)}%"}) # sends the buffer as bytes
try:
songtitlemp4 = songtitle.replace(".mp3",".mp4")
if songtitle in os.listdir(download_dir):
os.remove(os.path.join(download_dir, songtitle))
elif songtitlemp4 in os.listdir(download_dir):
os.remove(os.path.join(download_dir, songtitlemp4))
except RuntimeError as rex:
caesarmusic.clean_up_dir(download_dir,"mp3")
caesarmusic.clean_up_dir(download_dir,"mp4")
await websocket.send_json({"filename":songtitle,"filebase64":song,"message":f"{songs_downloaded}/{len(songs)} - {round((songs_downloaded/len(songs))*100,2)}%"})
songs_downloaded += 1
#caesarmusic.clean_up_dir(download_dir,"mp3")
#caesarmusic.clean_up_dir(download_dir,"mp4")
print({"message":"all songs are downloaded"})
#caesarmusic.first_time_running = False
await websocket.send_json({"message":"all songs are downloaded"})
except (ValueError,FileNotFoundError) as vex:
response = {"message":"No song detected","error":f"{type(vex)}{vex}"}
caesarmusic.clean_up_dir(download_dir,"mp3")
caesarmusic.clean_up_dir(download_dir,"mp4")
await websocket.send_json(response)
except Exception as ex:
print("Client disconnected")
await websocket.send_json({"message":f"{type(ex)},{ex}"})
@app.get("/caesarmusicsongload/{songname}")
async def caesarmusicsongload(songname):
try:
songfile = FileResponse(f"{download_dir}/{songname}")
return songfile
except Exception as rrx:
return {"message":f"{type(rrx)},{rrx}"}
@app.get("/caesarcleanup")
async def caesarcleanup():
try:
caesarmusic.clean_up_dir(download_dir,"mp3")
caesarmusic.clean_up_dir(download_dir,"mp4")
return {"message":"directory is clean now"}
except Exception as rrx:
return {"message":"No need to remove"}
async def main():
config = uvicorn.Config("main:app", port=7860, log_level="info",host="0.0.0.0",reload=True)
server = uvicorn.Server(config)
await server.serve()
if __name__ == "__main__":
asyncio.run(main())
|