Spaces:
Sleeping
Sleeping
| import asyncio | |
| import os | |
| import numpy as np | |
| from youtubesearchpython import VideosSearch,PlaylistsSearch | |
| import uvicorn | |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect,status | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from typing import Union,List | |
| from fastapi.responses import FileResponse | |
| from caesarmusic import CaesarMusic | |
| from fastapi.responses import StreamingResponse | |
# ASGI application object; the CORS middleware is attached to it further down,
# and uvicorn is pointed at it through the "main:app" import string.
app = FastAPI()
class CaesarAIMusicModel(BaseModel):
    """Request payload naming the music to fetch.

    The handlers in this module read all three fields by name.
    """

    # Artist name; joined with ``album`` to form the search query.
    artist: str
    # Album title. The "song" code path also passes this value on as the
    # title to fetch — presumably a song title in that case; confirm with callers.
    album: str
    # Selector compared against the literals "album" and "song".
    album_or_song: str
# Directory that contains this module. os.path.dirname is used instead of
# stripping a literal "main.py" suffix, so the value stays correct if the file
# is renamed and no longer depends on a backslash escape inside a string.
CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
# Folder the handlers in this module read song files from and clean up.
download_dir = f"{CURRENT_DIR}/Songs"
# Module-level CaesarMusic instance; the WebSocket and cleanup handlers below
# use it (the plain HTTP music handler creates its own instance instead).
caesarmusic = CaesarMusic(first_time_running=True)
# Attach CORS handling with every origin, method and header allowed.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive — confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # can alter with time
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def caesaraihome() -> str:
    """Return the plain-text greeting of the service."""
    greeting = "Welcome to CaesarAIMusic"
    return greeting
def caesarailogo():
    """Return a file response for CaesarAILogo.png located in CURRENT_DIR."""
    logo_path = f"{CURRENT_DIR}/CaesarAILogo.png"
    return FileResponse(logo_path)
def caesaraimusic(musicjson: CaesarAIMusicModel):
    """Fetch an album or a single song for the requested artist.

    Args:
        musicjson: Request payload; its ``artist``, ``album`` and
            ``album_or_song`` fields are read.

    Returns:
        * ``album_or_song == "album"``: a ``StreamingResponse`` (media type
          ``audio/mpeg``) wrapping ``stream_song`` of the extracted songs.
        * ``album_or_song == "song"``: the value returned by
          ``caesarmusicextract``, passed through unchanged.
        * Otherwise, or on any failure: a ``{"message": ...}`` dict.
    """
    payload = dict(musicjson)
    artist = payload["artist"]
    album = payload["album"]
    album_or_song = payload["album_or_song"]
    query = f"{artist} {album}"

    # Per-request client; this local name shadows the module-level instance.
    caesarmusic = CaesarMusic()
    try:
        if album_or_song == "album":
            search = PlaylistsSearch(query, limit=1)
            # First result whose link points at a playlist, or None when the
            # search produced nothing usable (previously an IndexError whose
            # text leaked to the client).
            playlisturl = next(
                (
                    entry["link"]
                    for entry in search.result()["result"]
                    if "playlist" in entry["link"]
                ),
                None,
            )
            if playlisturl is None:
                return {"message": "No song detected"}
            songs = caesarmusic.fetch_playslist_songs(artist, playlisturl)
        elif album_or_song == "song":
            songs = caesarmusic.caesarmusicfetch(artist, album)
        else:
            # Previously this fell through and implicitly returned None.
            return {
                "message": f"album_or_song must be 'album' or 'song', got {album_or_song!r}"
            }

        songzipresponse = caesarmusic.caesarmusicextract(songs)
        if songzipresponse == "No song detected":
            return {"message": "No song detected"}
        if album_or_song == "song":
            return songzipresponse
        return StreamingResponse(
            content=caesarmusic.stream_song(songzipresponse),
            status_code=status.HTTP_200_OK,
            media_type="audio/mpeg",
        )
    except Exception as ex:
        # Top-level boundary: report the failure to the caller as a message.
        return {"message": f"{type(ex)},{ex}"}
async def caesarmusicws(websocket: WebSocket):
    """Serve song downloads over a WebSocket, one JSON request at a time.

    Each received JSON message must contain ``artist``, ``album`` and
    ``album_or_song``. For every extracted song a JSON message with
    ``filename``, ``filebase64`` and a progress ``message`` is sent, followed
    by ``{"message": "all songs are downloaded"}`` once the request is done.

    The loop ends when the client disconnects or when an error is reported;
    a ``ValueError``/``FileNotFoundError`` is reported as "No song detected"
    after cleaning the download directory.
    """
    # listen for connections
    await websocket.accept()
    try:
        while True:
            musicjson = await websocket.receive_json()
            artist = musicjson["artist"]
            album = musicjson["album"]
            album_or_song = musicjson["album_or_song"]
            artist = artist.lower().strip()
            # Values containing "youtube" are treated as links and left as-is.
            if "youtube" not in album:
                album = album.lower().strip()
            songs_downloaded = 0

            if album_or_song == "album":
                if "youtube" in album:
                    songs = caesarmusic.fetch_playslist_songs(artist, album)
                else:
                    search = PlaylistsSearch(f"{artist} {album}", limit=4)
                    candidates = [
                        caesarmusic.fetch_playslist_songs(artist, playlist["link"])
                        for playlist in search.result()["result"]
                        if "playlist" in playlist["link"]
                    ]
                    # Keep the first playlist with the most songs. max() raises
                    # ValueError when nothing matched, which the handler below
                    # reports as "No song detected".
                    songs = max(candidates, key=len)
            elif album_or_song == "song":
                if "/watch?v=" in album:
                    songs = [{"link": album}]
                else:
                    songs = caesarmusic.caesarmusicfetch(artist, album)
            else:
                # Without this branch `songs` would be unbound on the first
                # request, or silently reuse the previous request's list.
                await websocket.send_json(
                    {"message": f"album_or_song must be 'album' or 'song', got {album_or_song!r}"}
                )
                continue

            for song, songtitle in caesarmusic.caesarmusicextractgenerator(songs):
                if song == "No song detected":
                    await websocket.send_json({"message": "No song detected"})
                    continue
                try:
                    # Remove the on-disk copy of this song (mp3 first, else mp4).
                    songtitlemp4 = songtitle.replace(".mp3", ".mp4")
                    downloaded = os.listdir(download_dir)
                    if songtitle in downloaded:
                        os.remove(os.path.join(download_dir, songtitle))
                    elif songtitlemp4 in downloaded:
                        os.remove(os.path.join(download_dir, songtitlemp4))
                except RuntimeError:
                    caesarmusic.clean_up_dir(download_dir, "mp3")
                    caesarmusic.clean_up_dir(download_dir, "mp4")
                # NOTE(review): the counter is incremented after the send, so the
                # progress text for the k-th song reads (k-1)/len — confirm the
                # client expects that before changing it.
                await websocket.send_json(
                    {
                        "filename": songtitle,
                        "filebase64": song,
                        "message": f"{songs_downloaded}/{len(songs)} - {round((songs_downloaded/len(songs))*100,2)}%",
                    }
                )
                songs_downloaded += 1

            print({"message": "all songs are downloaded"})
            await websocket.send_json({"message": "all songs are downloaded"})
    except WebSocketDisconnect:
        # The peer is gone; sending anything now would only raise again.
        print("Client disconnected")
    except (ValueError, FileNotFoundError) as vex:
        response = {"message": "No song detected", "error": f"{type(vex)}{vex}"}
        caesarmusic.clean_up_dir(download_dir, "mp3")
        caesarmusic.clean_up_dir(download_dir, "mp4")
        await websocket.send_json(response)
    except Exception as ex:
        try:
            await websocket.send_json({"message": f"{type(ex)},{ex}"})
        except (RuntimeError, WebSocketDisconnect):
            # Best effort only: the socket was already closed.
            print("Client disconnected")
async def caesarmusicsongload(songname):
    """Return a downloaded song from ``download_dir`` as a file response.

    Args:
        songname: File name of the song inside ``download_dir``. Presumably
            supplied by the client, so it is treated as untrusted.

    Returns:
        A ``FileResponse`` for the song, or a ``{"message": ...}`` dict when
        the name escapes ``download_dir``, the file does not exist, or any
        other error occurs.
    """
    try:
        base_dir = os.path.realpath(download_dir)
        song_path = os.path.realpath(os.path.join(base_dir, songname))
        # Reject names such as "../secret" that resolve outside the songs folder.
        if os.path.commonpath([base_dir, song_path]) != base_dir:
            return {"message": "Invalid song name"}
        # FileResponse only checks for the file while the response is being
        # sent, so a missing file has to be detected here to report it.
        if not os.path.isfile(song_path):
            return {"message": "Song not found"}
        return FileResponse(song_path)
    except Exception as rrx:
        return {"message": f"{type(rrx)},{rrx}"}
async def caesarcleanup():
    """Ask the shared CaesarMusic instance to clean ``download_dir``.

    ``clean_up_dir`` is invoked for the "mp3" and then the "mp4" extension.

    Returns:
        ``{"message": "directory is clean now"}`` when both calls succeed,
        ``{"message": "No need to remove"}`` when either call raises.
    """
    try:
        for extension in ("mp3", "mp4"):
            caesarmusic.clean_up_dir(download_dir, extension)
    except Exception:
        return {"message": "No need to remove"}
    return {"message": "directory is clean now"}
async def main():
    """Serve ``main:app`` with uvicorn on 0.0.0.0:7860 until the server exits."""
    server = uvicorn.Server(
        uvicorn.Config(
            "main:app",
            port=7860,
            log_level="info",
            host="0.0.0.0",
            reload=True,
        )
    )
    await server.serve()


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())