CaesarMusic / main.py
CaesarCloudSync
add songs
ac062a1
import asyncio
import os
import numpy as np
from youtubesearchpython import VideosSearch,PlaylistsSearch
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect,status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Union,List
from fastapi.responses import FileResponse
from caesarmusic import CaesarMusic
from fastapi.responses import StreamingResponse
app = FastAPI()
class CaesarAIMusicModel(BaseModel):
artist :str
album: str
album_or_song: str
CURRENT_DIR = os.path.realpath(__file__).replace(f"/main.py","").replace(f"\main.py","")
download_dir = f"{CURRENT_DIR}/Songs"
caesarmusic = CaesarMusic(first_time_running=True)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # can alter with time
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
def caesaraihome():
return "Welcome to CaesarAIMusic"
@app.get("/caesarailogo")
def caesarailogo():
return FileResponse(f"{CURRENT_DIR}/CaesarAILogo.png")
@app.post("/caesarmusic")
def caesaraimusic(musicjson : CaesarAIMusicModel):
musicjson = dict(musicjson)
artist = musicjson["artist"]
album = musicjson["album"]
album_or_song = musicjson["album_or_song"]
query = f"{artist} {album}"
print("Hi")
caesarmusic = CaesarMusic()
try:
if album_or_song == "album":
videosSearch = PlaylistsSearch(query,limit=1)
playlisturl = [playlist["link"] for playlist in videosSearch.result()["result"] if "playlist" in playlist["link"]][0]
songs = caesarmusic.fetch_playslist_songs(artist,playlisturl)
songzipresponse = caesarmusic.caesarmusicextract(songs)
if songzipresponse == "No song detected":
return {"message":"No song detected"}
elif songzipresponse != "No song detected":
streamed_songs = caesarmusic.stream_song(songzipresponse)
response = StreamingResponse(
content=streamed_songs,
status_code=status.HTTP_200_OK,
media_type="audio/mpeg",
)
return response
elif album_or_song == "song":
songs = caesarmusic.caesarmusicfetch(artist,album)
songzipresponse = caesarmusic.caesarmusicextract(songs)
if songzipresponse == "No song detected":
return {"message":"No song detected"}
elif songzipresponse != "No song detected":
return songzipresponse
except Exception as ex:
return {"message":f"{type(ex)},{ex}"}
@app.websocket("/caesarmusicws")
async def caesarmusicws(websocket: WebSocket):
# listen for connections
await websocket.accept()
try:
#caesarmusic.clean_up_dir(download_dir,"mp3")
#caesarmusic.clean_up_dir(download_dir,"mp4")
while True:
musicjson = await websocket.receive_json()
artist = musicjson["artist"]
album = musicjson["album"]
album_or_song = musicjson["album_or_song"]
artist = artist.lower().strip()
if "youtube" not in album:
album = album.lower().strip()
query = f"{artist} {album}"
songs_downloaded = 0
if album_or_song == "album":
if "youtube" not in album:
videosSearch = PlaylistsSearch(query,limit=4)
resultnum,result = [],[]
for playlist in videosSearch.result()["result"]:
if "playlist" in playlist["link"]:
songs = caesarmusic.fetch_playslist_songs(artist,playlist["link"])
resultnum.append(len(songs))
result.append(songs)
biggest_number = max(resultnum)
#print(result)
songs = result[resultnum.index(biggest_number)]
elif "youtube" in album:
songs = caesarmusic.fetch_playslist_songs(artist,album)
#playlisturl = [playlist["link"] for playlist in videosSearch.result()["result"] if "playlist" in playlist["link"]][0]
#songs = caesarmusic.fetch_playslist_songs(artist,playlisturl)
#print(songs[0])
elif album_or_song == "song":
if "/watch?v=" not in album:
songs = caesarmusic.caesarmusicfetch(artist,album)
elif "/watch?v=" in album:
songs = [{"link":album}]
for song,songtitle in caesarmusic.caesarmusicextractgenerator(songs):
if song == "No song detected":
await websocket.send_json({"message":"No song detected"})
elif song != "No song detected":
#await websocket.send_bytes(song) # sends the buffer as bytes
#await websocket.send_json({"message":f"{songs_downloaded}/{len(songs)} - {round((songs_downloaded/len(songs))*100,2)}%"}) # sends the buffer as bytes
try:
songtitlemp4 = songtitle.replace(".mp3",".mp4")
if songtitle in os.listdir(download_dir):
os.remove(os.path.join(download_dir, songtitle))
elif songtitlemp4 in os.listdir(download_dir):
os.remove(os.path.join(download_dir, songtitlemp4))
except RuntimeError as rex:
caesarmusic.clean_up_dir(download_dir,"mp3")
caesarmusic.clean_up_dir(download_dir,"mp4")
await websocket.send_json({"filename":songtitle,"filebase64":song,"message":f"{songs_downloaded}/{len(songs)} - {round((songs_downloaded/len(songs))*100,2)}%"})
songs_downloaded += 1
#caesarmusic.clean_up_dir(download_dir,"mp3")
#caesarmusic.clean_up_dir(download_dir,"mp4")
print({"message":"all songs are downloaded"})
#caesarmusic.first_time_running = False
await websocket.send_json({"message":"all songs are downloaded"})
except (ValueError,FileNotFoundError) as vex:
response = {"message":"No song detected","error":f"{type(vex)}{vex}"}
caesarmusic.clean_up_dir(download_dir,"mp3")
caesarmusic.clean_up_dir(download_dir,"mp4")
await websocket.send_json(response)
except Exception as ex:
print("Client disconnected")
await websocket.send_json({"message":f"{type(ex)},{ex}"})
@app.get("/caesarmusicsongload/{songname}")
async def caesarmusicsongload(songname):
try:
songfile = FileResponse(f"{download_dir}/{songname}")
return songfile
except Exception as rrx:
return {"message":f"{type(rrx)},{rrx}"}
@app.get("/caesarcleanup")
async def caesarcleanup():
try:
caesarmusic.clean_up_dir(download_dir,"mp3")
caesarmusic.clean_up_dir(download_dir,"mp4")
return {"message":"directory is clean now"}
except Exception as rrx:
return {"message":"No need to remove"}
async def main():
config = uvicorn.Config("main:app", port=7860, log_level="info",host="0.0.0.0",reload=True)
server = uvicorn.Server(config)
await server.serve()
if __name__ == "__main__":
asyncio.run(main())