# Exported from a web file viewer. Reported file size: 7,500 bytes.
# Revisions shown in the viewer's blame gutter: b467181, 9ef5372, b5ebba0, f59f0aa, ac062a1.
import asyncio
import os
import numpy as np
from youtubesearchpython import VideosSearch,PlaylistsSearch
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect,status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Union,List
from fastapi.responses import FileResponse
from caesarmusic import CaesarMusic
from fastapi.responses import StreamingResponse
# ASGI application object; the route decorators in this module register handlers on it.
app = FastAPI()
# Request body for the POST /caesarmusic endpoint.
class CaesarAIMusicModel(BaseModel):
    # Artist name; joined with `album` to build the playlist search query.
    artist :str
    # Album title (album mode) or the second argument given to caesarmusicfetch (song mode).
    album: str
    # Mode selector; the endpoint branches on the values "album" and "song".
    album_or_song: str
# Directory that contains this module. Derived with os.path.dirname so it is
# correct regardless of the module's file name or the platform's separator
# (the previous string replacement only worked for a file called main.py).
CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
# Folder the song files are looked up in and removed from by the handlers.
download_dir = f"{CURRENT_DIR}/Songs"

# Module-level CaesarMusic instance shared by the websocket and cleanup handlers.
caesarmusic = CaesarMusic(first_time_running=True)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # can alter with time
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/")
def caesaraihome():
    """Return the greeting string served at the API root."""
    greeting = "Welcome to CaesarAIMusic"
    return greeting
@app.get("/caesarailogo")
def caesarailogo():
    """Return the CaesarAILogo.png file located under CURRENT_DIR."""
    logo_path = f"{CURRENT_DIR}/CaesarAILogo.png"
    return FileResponse(logo_path)


@app.post("/caesarmusic")
def caesaraimusic(musicjson: CaesarAIMusicModel):
    """Fetch an album or a single song for the requested artist.

    Args:
        musicjson: Request body with ``artist``, ``album`` and
            ``album_or_song`` ("album" or "song").

    Returns:
        * album mode: a ``StreamingResponse`` (``audio/mpeg``) built from
          ``stream_song``;
        * song mode: whatever ``caesarmusicextract`` returned;
        * ``{"message": ...}`` when nothing was found, the mode is unknown,
          or any exception was raised while processing.
    """
    artist = musicjson.artist
    album = musicjson.album
    album_or_song = musicjson.album_or_song

    # A fresh helper per request; the module-level instance is left untouched.
    music = CaesarMusic()
    try:
        if album_or_song == "album":
            search = PlaylistsSearch(f"{artist} {album}", limit=1)
            playlist_links = [
                playlist["link"]
                for playlist in search.result()["result"]
                if "playlist" in playlist["link"]
            ]
            if not playlist_links:
                # Report an empty search the same way as an empty extraction
                # instead of leaking an IndexError to the client.
                return {"message": "No song detected"}

            songs = music.fetch_playslist_songs(artist, playlist_links[0])
            songzipresponse = music.caesarmusicextract(songs)
            if songzipresponse == "No song detected":
                return {"message": "No song detected"}
            return StreamingResponse(
                content=music.stream_song(songzipresponse),
                status_code=status.HTTP_200_OK,
                media_type="audio/mpeg",
            )

        if album_or_song == "song":
            songs = music.caesarmusicfetch(artist, album)
            songzipresponse = music.caesarmusicextract(songs)
            if songzipresponse == "No song detected":
                return {"message": "No song detected"}
            return songzipresponse

        # Previously an unknown mode fell through and produced a bare null body.
        return {
            "message": f"album_or_song must be 'album' or 'song', got {album_or_song!r}"
        }
    except Exception as ex:
        # Endpoint boundary: report the failure to the caller as a message.
        return {"message": f"{type(ex)},{ex}"}
async def _send_json_if_open(websocket: WebSocket, payload: dict) -> None:
    """Send ``payload`` as JSON, ignoring the failure if the socket is already closed."""
    try:
        await websocket.send_json(payload)
    except (WebSocketDisconnect, RuntimeError):
        # The peer is gone, so there is nobody left to notify.
        pass


def _remove_downloaded_song(songtitle: str) -> None:
    """Remove the file named ``songtitle`` (or its ``.mp4`` counterpart) from download_dir.

    Only the first of the two names found in the directory is removed. If the
    targeted removal fails, fall back to ``caesarmusic.clean_up_dir`` for the
    mp3 and mp4 extensions.
    """
    songtitlemp4 = songtitle.replace(".mp3", ".mp4")
    try:
        present = os.listdir(download_dir)
        for candidate in (songtitle, songtitlemp4):
            if candidate in present:
                os.remove(os.path.join(download_dir, candidate))
                break
    except (OSError, RuntimeError):
        # os.listdir/os.remove raise OSError; RuntimeError is kept for
        # compatibility with the previous handler.
        caesarmusic.clean_up_dir(download_dir, "mp3")
        caesarmusic.clean_up_dir(download_dir, "mp4")


@app.websocket("/caesarmusicws")
async def caesarmusicws(websocket: WebSocket):
    """Serve album/song download requests over a websocket.

    Each incoming JSON message must contain ``artist``, ``album`` and
    ``album_or_song`` ("album" or "song"). In album mode ``album`` is either a
    search term or a value containing "youtube", which is passed straight to
    ``fetch_playslist_songs``. In song mode a value containing ``/watch?v=``
    is used directly as the song link.

    For every item yielded by ``caesarmusicextractgenerator`` the client gets
    either ``{"message": "No song detected"}`` or a payload with ``filename``,
    ``filebase64`` and a progress ``message``. Each request ends with
    ``{"message": "all songs are downloaded"}``, after which the handler waits
    for the next request on the same connection.

    A ``ValueError``/``FileNotFoundError`` or any other exception ends the
    session after a best-effort error message; a client disconnect ends it
    silently.
    """
    await websocket.accept()

    try:
        while True:
            musicjson = await websocket.receive_json()
            artist = musicjson["artist"]
            album = musicjson["album"]
            album_or_song = musicjson["album_or_song"]

            artist = artist.lower().strip()
            is_youtube_reference = "youtube" in album
            if not is_youtube_reference:
                # URLs are case-sensitive, so only plain search terms are normalised.
                album = album.lower().strip()
            query = f"{artist} {album}"

            # NOTE(review): the search/download calls below look synchronous and
            # would then block the event loop — confirm whether to offload them.
            if album_or_song == "album":
                if is_youtube_reference:
                    songs = caesarmusic.fetch_playslist_songs(artist, album)
                else:
                    playlist_search = PlaylistsSearch(query, limit=4)
                    candidates = [
                        caesarmusic.fetch_playslist_songs(artist, playlist["link"])
                        for playlist in playlist_search.result()["result"]
                        if "playlist" in playlist["link"]
                    ]
                    # Pick the playlist with the most songs (first one on ties).
                    # An empty candidate list raises ValueError, which the
                    # handler below reports as "No song detected".
                    songs = max(candidates, key=len)
            elif album_or_song == "song":
                if "/watch?v=" in album:
                    songs = [{"link": album}]
                else:
                    songs = caesarmusic.caesarmusicfetch(artist, album)
            else:
                # Without this branch `songs` would be unbound on the first
                # request or silently reused from the previous one.
                await websocket.send_json(
                    {"message": f"album_or_song must be 'album' or 'song', got {album_or_song!r}"}
                )
                continue

            songs_downloaded = 0
            for song, songtitle in caesarmusic.caesarmusicextractgenerator(songs):
                if song == "No song detected":
                    await websocket.send_json({"message": "No song detected"})
                    continue

                # The song is already held in `song`; drop its on-disk copy.
                _remove_downloaded_song(songtitle)

                # Progress counts the songs sent before this one, so the first
                # payload reports 0/N.
                await websocket.send_json({"filename":songtitle,"filebase64":song,"message":f"{songs_downloaded}/{len(songs)} - {round((songs_downloaded/len(songs))*100,2)}%"})
                songs_downloaded += 1

            print({"message":"all songs are downloaded"})
            await websocket.send_json({"message":"all songs are downloaded"})

    except WebSocketDisconnect:
        # Normal termination: never try to send on a socket the client closed.
        print("Client disconnected")
    except (ValueError,FileNotFoundError) as vex:
        response = {"message":"No song detected","error":f"{type(vex)}{vex}"}
        caesarmusic.clean_up_dir(download_dir,"mp3")
        caesarmusic.clean_up_dir(download_dir,"mp4")
        await _send_json_if_open(websocket, response)
    except Exception as ex:
        print(f"caesarmusicws failed: {type(ex)},{ex}")
        await _send_json_if_open(websocket, {"message":f"{type(ex)},{ex}"})
@app.get("/caesarmusicsongload/{songname}")
async def caesarmusicsongload(songname: str):
    """Return the file named ``songname`` from download_dir.

    Args:
        songname: Bare file name (no directory components) of the song.

    Returns:
        A ``FileResponse`` for the file, or ``{"message": ...}`` when the name
        is not a plain file name, the file does not exist, or building the
        response fails.
    """
    # Only plain file names are accepted so the lookup cannot leave download_dir.
    if os.path.basename(songname) != songname:
        return {"message": "Invalid song name"}

    song_path = f"{download_dir}/{songname}"
    # FileResponse only stats the file while the response is being sent, so a
    # missing file has to be detected here to be reported as a message.
    if not os.path.isfile(song_path):
        return {"message": "Song not found"}

    try:
        return FileResponse(song_path)
    except Exception as rrx:
        return {"message":f"{type(rrx)},{rrx}"}

@app.get("/caesarcleanup")
async def caesarcleanup():
    """Run ``clean_up_dir`` on download_dir for the mp3 and mp4 extensions.

    Returns a confirmation message, or "No need to remove" if either call raised.
    """
    try:
        for extension in ("mp3", "mp4"):
            caesarmusic.clean_up_dir(download_dir, extension)
    except Exception:
        return {"message":"No need to remove"}
    return {"message":"directory is clean now"}


async def main():
    """Build a uvicorn server for ``main:app`` on 0.0.0.0:7860 and serve it."""
    # NOTE(review): reload=True may have no effect when uvicorn.Server is
    # driven directly instead of through uvicorn.run — confirm.
    server = uvicorn.Server(
        uvicorn.Config(
            "main:app",
            host="0.0.0.0",
            port=7860,
            log_level="info",
            reload=True,
        )
    )
    await server.serve()

# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())