"""FastAPI service exposing CaesarMusic search/download over HTTP and WebSocket."""
import asyncio
import os
from typing import Union, List

import numpy as np
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from youtubesearchpython import VideosSearch, PlaylistsSearch

from caesarmusic import CaesarMusic

app = FastAPI()


class CaesarAIMusicModel(BaseModel):
    """Request body accepted by the POST /caesarmusic endpoint."""

    artist: str
    album: str
    # The handlers below recognise exactly two values: "album" or "song".
    album_or_song: str


# Directory that contains this script. dirname() replaces the original
# string replacement of "/main.py" / "\main.py", which relied on an invalid
# escape sequence and on the script being named main.py.
CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
download_dir = f"{CURRENT_DIR}/Songs"
# Shared client used by the WebSocket and clean-up endpoints.
caesarmusic = CaesarMusic(first_time_running=True)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # can alter with time
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
def caesaraihome():
    """Return a fixed welcome string."""
    return "Welcome to CaesarAIMusic"


@app.get("/caesarailogo")
def caesarailogo():
    """Serve CaesarAILogo.png from the directory containing this script."""
    return FileResponse(f"{CURRENT_DIR}/CaesarAILogo.png")


@app.post("/caesarmusic")
def caesaraimusic(musicjson: CaesarAIMusicModel):
    """Fetch an album or a single song for the given artist.

    For ``album_or_song == "album"`` the first playlist search hit is
    extracted and returned as an ``audio/mpeg`` streaming response. For
    ``"song"`` the extraction result is returned directly. In both cases a
    ``{"message": "No song detected"}`` payload is returned when extraction
    reports that sentinel. Any exception is reported as
    ``{"message": "<type>,<text>"}``. Any other ``album_or_song`` value falls
    through and returns ``None``.
    """
    artist = musicjson.artist
    album = musicjson.album
    album_or_song = musicjson.album_or_song
    query = f"{artist} {album}"
    # A fresh client per request; this intentionally shadows the module-level
    # instance, as the original code did.
    caesarmusic = CaesarMusic()
    try:
        if album_or_song == "album":
            playlist_search = PlaylistsSearch(query, limit=1)
            # [0] raises IndexError when no playlist link is found; that is
            # reported through the generic handler below.
            playlisturl = [
                playlist["link"]
                for playlist in playlist_search.result()["result"]
                if "playlist" in playlist["link"]
            ][0]
            songs = caesarmusic.fetch_playslist_songs(artist, playlisturl)
            songzipresponse = caesarmusic.caesarmusicextract(songs)
            if songzipresponse == "No song detected":
                return {"message": "No song detected"}
            streamed_songs = caesarmusic.stream_song(songzipresponse)
            return StreamingResponse(
                content=streamed_songs,
                status_code=status.HTTP_200_OK,
                media_type="audio/mpeg",
            )
        elif album_or_song == "song":
            songs = caesarmusic.caesarmusicfetch(artist, album)
            songzipresponse = caesarmusic.caesarmusicextract(songs)
            if songzipresponse == "No song detected":
                return {"message": "No song detected"}
            return songzipresponse
    except Exception as ex:
        return {"message": f"{type(ex)},{ex}"}


@app.websocket("/caesarmusicws")
async def caesarmusicws(websocket: WebSocket):
    """Serve download requests over a WebSocket, one JSON message per song.

    Each incoming JSON message must carry ``artist``, ``album`` and
    ``album_or_song``. ``album`` may also be a YouTube playlist URL (for
    albums) or a ``/watch?v=`` URL (for songs). For every extracted song a
    message with ``filename``, ``filebase64`` and a progress ``message`` is
    sent, followed by ``{"message": "all songs are downloaded"}``.

    A ``ValueError`` or ``FileNotFoundError`` is reported as
    ``"No song detected"`` after cleaning the download directory. Any other
    error is reported as ``"<type>,<text>"``. In both cases the handler then
    returns, which ends the connection.
    """
    # listen for connections
    await websocket.accept()
    try:
        while True:
            musicjson = await websocket.receive_json()
            artist = musicjson["artist"]
            album = musicjson["album"]
            album_or_song = musicjson["album_or_song"]
            artist = artist.lower().strip()
            # URLs are case-sensitive, so only normalise plain search terms.
            if "youtube" not in album:
                album = album.lower().strip()
            query = f"{artist} {album}"
            songs_downloaded = 0

            if album_or_song == "album":
                if "youtube" not in album:
                    playlist_search = PlaylistsSearch(query, limit=4)
                    candidates = [
                        caesarmusic.fetch_playslist_songs(artist, playlist["link"])
                        for playlist in playlist_search.result()["result"]
                        if "playlist" in playlist["link"]
                    ]
                    # Pick the playlist with the most songs (first one on a
                    # tie). max() raises ValueError when nothing matched,
                    # which is reported as "No song detected" below.
                    songs = max(candidates, key=len)
                else:
                    songs = caesarmusic.fetch_playslist_songs(artist, album)
            elif album_or_song == "song":
                if "/watch?v=" not in album:
                    songs = caesarmusic.caesarmusicfetch(artist, album)
                else:
                    songs = [{"link": album}]
            else:
                # Without this, `songs` is unbound on the first request and
                # silently reuses the previous request's list on later ones.
                raise ValueError(
                    f"album_or_song must be 'album' or 'song', got {album_or_song!r}"
                )

            for song, songtitle in caesarmusic.caesarmusicextractgenerator(songs):
                if song == "No song detected":
                    await websocket.send_json({"message": "No song detected"})
                else:
                    # The song payload is already in memory, so remove its
                    # file (mp3, or else the mp4 of the same name) from disk.
                    try:
                        songtitlemp4 = songtitle.replace(".mp3", ".mp4")
                        downloaded_files = os.listdir(download_dir)
                        if songtitle in downloaded_files:
                            os.remove(os.path.join(download_dir, songtitle))
                        elif songtitlemp4 in downloaded_files:
                            os.remove(os.path.join(download_dir, songtitlemp4))
                    except RuntimeError:
                        # NOTE(review): os.remove/os.listdir raise OSError,
                        # not RuntimeError, so this fallback looks unreachable
                        # unless CaesarMusic internals are involved. Confirm
                        # before widening.
                        caesarmusic.clean_up_dir(download_dir, "mp3")
                        caesarmusic.clean_up_dir(download_dir, "mp4")
                    # Progress is reported before the counter is incremented,
                    # so the first song is announced as "0/N".
                    await websocket.send_json(
                        {
                            "filename": songtitle,
                            "filebase64": song,
                            "message": f"{songs_downloaded}/{len(songs)} - {round((songs_downloaded/len(songs))*100,2)}%",
                        }
                    )
                    songs_downloaded += 1

            print({"message": "all songs are downloaded"})
            await websocket.send_json({"message": "all songs are downloaded"})
    except WebSocketDisconnect:
        # The peer is gone; sending anything now would only raise again.
        print("Client disconnected")
    except (ValueError, FileNotFoundError) as vex:
        response = {"message": "No song detected", "error": f"{type(vex)}{vex}"}
        caesarmusic.clean_up_dir(download_dir, "mp3")
        caesarmusic.clean_up_dir(download_dir, "mp4")
        await websocket.send_json(response)
    except Exception as ex:
        try:
            await websocket.send_json({"message": f"{type(ex)},{ex}"})
        except (RuntimeError, WebSocketDisconnect):
            # The socket was already closed while reporting the error.
            print("Client disconnected")


@app.get("/caesarmusicsongload/{songname}")
async def caesarmusicsongload(songname):
    """Serve a previously downloaded file from the Songs directory.

    Returns a ``{"message": ...}`` payload when the file does not exist.
    """
    # songname comes straight from the URL; keep only its final component so
    # it cannot address anything outside download_dir.
    safe_name = os.path.basename(songname)
    song_path = f"{download_dir}/{safe_name}"
    # FileResponse does not open the file when constructed, so a missing file
    # has to be detected here rather than by the try/except below.
    if not os.path.isfile(song_path):
        missing = FileNotFoundError(f"{safe_name} not found")
        return {"message": f"{type(missing)},{missing}"}
    try:
        return FileResponse(song_path)
    except Exception as rrx:
        return {"message": f"{type(rrx)},{rrx}"}


@app.get("/caesarcleanup")
async def caesarcleanup():
    """Remove mp3 and mp4 files from the Songs directory via CaesarMusic."""
    try:
        caesarmusic.clean_up_dir(download_dir, "mp3")
        caesarmusic.clean_up_dir(download_dir, "mp4")
        return {"message": "directory is clean now"}
    except Exception:
        return {"message": "No need to remove"}


async def main():
    """Run the app with uvicorn on 0.0.0.0:7860."""
    config = uvicorn.Config(
        "main:app", port=7860, log_level="info", host="0.0.0.0", reload=True
    )
    server = uvicorn.Server(config)
    await server.serve()


if __name__ == "__main__":
    asyncio.run(main())