"""WebSocket chat server that recommends music based on the mood of recent chat.

Clients connect to ``/chat/{username}``; every chat message is broadcast to
all connected clients and remembered in a short history.  A background task
periodically classifies that history with a zero-shot emotion model and
pushes a matching list of songs to every client.
"""

import asyncio
import json
import math
import os
import random
import sys
from collections import defaultdict
from collections import deque

import uvicorn
from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from transformers import pipeline

sys.path.append(".")

# Seconds between two recommendation pushes while at least one client is online.
RECOMMENDATION_INTERVAL_SECONDS = 60
# Seconds to wait before re-checking for clients when nobody is connected.
IDLE_POLL_SECONDS = 2
# Maximum number of songs returned per recommendation.
MAX_RECOMMENDATIONS = 3

app = FastAPI()

# Set by reset_recommendation_timer() to restart the recommendation countdown.
reset_timer = asyncio.Event()

# Strong reference to the background recommender task; the event loop only
# keeps weak references to tasks, so an unreferenced task may be collected.
_recommender_task = None

# app.mount("/", StaticFiles(directory="frontend", html=True), name="frontend")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Can be narrowed to specific domains for security.
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

emotion_classifier = pipeline(
    "zero-shot-classification",
    model="MarfinF/marfin_emotion",
    force_download=True,
)

# username -> WebSocket of the currently connected clients.
clients = {}
# The most recent chat messages; only these are used for mood detection.
chat_history = deque(maxlen=4)

# Emotion label -> genre key (into genre_to_song) and the caption sent to clients.
mood_to_genre = {
    "senang": {
        "genre": "pop",
        "word": "Poppin pops!",
    },
    "sedih": {
        "genre": "acoustic",
        "word": "Playing acoustic",
    },
    "marah": {
        "genre": "rock",
        "word": "Rock 'n play!!",
    },
    "cinta": {
        "genre": "romantic",
        "word": "Playing romantic",
    },
    "chill": {
        "genre": "chill",
        "word": "Chill!",
    },
}

# Genre key -> list of song URLs.
genre_to_song = {
    "pop": ["https://sounds.pond5.com/inspirational-motivational-uplifting-acoustic-positive-music-102770115_nw_prev.m4a"],
    "acoustic": ["https://sounds.pond5.com/sad-piano-music-063450274_nw_prev.m4a"],
    "rock": ["https://sounds.pond5.com/powerful-gritty-action-extreme-rock-music-136693652_nw_prev.m4a"],
    "romantic": ["https://sounds.pond5.com/bloom-sweet-tender-delicate-romantic-music-158563013_nw_prev.m4a"],
    "chill": ["https://sounds.pond5.com/fall-chill-music-088328584_nw_prev.m4a"],
}


def detect_emotion(text):
    """Classify *text* and return ``(label, score)`` of the first-ranked label.

    The candidate labels are "marah", "sedih", "senang" and "cinta"; the
    first entry of the classifier's ``labels``/``scores`` lists is returned.
    """
    labels = ["marah", "sedih", "senang", "cinta"]
    result = emotion_classifier(text, candidate_labels=labels)
    return result["labels"][0], result["scores"][0]


def get_recommendations_by_mood(genre):
    """Return up to three randomly ordered songs for *genre*.

    Unknown genres yield an empty list.  ``random.sample`` is used instead of
    shuffling in place so the shared ``genre_to_song`` lists are not mutated.
    """
    songs = genre_to_song.get(genre, [])
    return random.sample(songs, min(MAX_RECOMMENDATIONS, len(songs)))


def softmax(scores):
    """Return the softmax of *scores* as a list of floats.

    The maximum is subtracted before exponentiating so large scores cannot
    overflow; an empty input returns an empty list.
    """
    scores = list(scores)
    if not scores:
        return []
    peak = max(scores)
    exp_scores = [math.exp(score - peak) for score in scores]
    total = sum(exp_scores)
    return [exp_score / total for exp_score in exp_scores]


async def _send_to_all(payload):
    """Send the text *payload* to every connected client.

    Iterates over a snapshot because ``clients`` can change while a send is
    awaited.  A failing recipient is logged and skipped so it cannot abort
    the broadcast; its own connection handler removes it from ``clients``.
    """
    for name, client in list(clients.items()):
        try:
            await client.send_text(payload)
        except Exception as exc:
            print(f"failed to send to {name}: {exc}")


# 🔹 Broadcast User List
async def broadcast_user_list():
    """Send the list of connected usernames to every client."""
    message = json.dumps({
        "type": "user_list",
        "users": list(clients),
    })
    await _send_to_all(message)


def _dominant_emotion(emotions):
    """Return the label with the highest softmax of per-label summed scores.

    *emotions* is a non-empty iterable of ``(label, score)`` pairs.
    """
    # Group by emotion and sum the scores.
    emotion_score_sum = defaultdict(float)
    for label, score in emotions:
        emotion_score_sum[label] += score

    # Pair each label with its softmax score.
    labels = list(emotion_score_sum)
    softmax_scores = softmax(emotion_score_sum.values())
    softmax_result = list(zip(labels, softmax_scores))
    print("Softmax Result:", softmax_result)

    return max(softmax_result, key=lambda pair: pair[1])[0]


def _recommend_for_messages(messages):
    """Return ``(songs, caption)`` for the given chat messages.

    With no messages the "chill" genre is used.  Otherwise each message is
    classified and the dominant emotion selects the genre; emotions without
    a mapping fall back to "chill".  This runs the classifier synchronously.
    """
    if not messages:
        return get_recommendations_by_mood("chill"), "Chill!"

    print("chat history")
    print(messages)
    emotions = [detect_emotion(msg) for msg in messages]
    print("Detected Emotions:", emotions)

    most_common_emotion = _dominant_emotion(emotions)
    print("Dominant Emotion:", most_common_emotion)

    music = mood_to_genre.get(most_common_emotion, mood_to_genre["chill"])
    return get_recommendations_by_mood(music["genre"]), music["word"]


# 🔹 Periodic Music Recommender every 60 seconds
async def periodic_recommendation():
    """Push a music recommendation to all clients once per interval, forever.

    While nobody is connected the loop just polls.  Otherwise it counts down
    ``RECOMMENDATION_INTERVAL_SECONDS``; if ``reset_timer`` is set during the
    countdown, the countdown restarts.  Task cancellation is not swallowed,
    so the task can be stopped.
    """
    loop = asyncio.get_running_loop()
    while True:
        if not clients:
            # NOTE(review): the original indentation was lost; these two calls
            # are assumed to belong to the idle branch — confirm.
            await asyncio.sleep(IDLE_POLL_SECONDS)
            await broadcast_user_list()
            await reset_recommendation_timer()
            continue

        # Start a fresh countdown; any earlier reset request is absorbed here.
        reset_timer.clear()
        try:
            await asyncio.wait_for(
                reset_timer.wait(), timeout=RECOMMENDATION_INTERVAL_SECONDS
            )
        except asyncio.TimeoutError:
            pass  # Countdown completed without a reset.
        else:
            continue  # Timer was reset: restart the countdown.

        if not clients:  # Everyone left during the countdown.
            continue

        # Snapshot the history, then classify off the event loop so model
        # inference does not stall the websocket handlers.
        messages = chat_history.copy()
        try:
            music_recommendations, word = await loop.run_in_executor(
                None, _recommend_for_messages, messages
            )
        except Exception as exc:
            # Keep the recommender alive if a single classification fails.
            print(f"recommendation failed: {exc}")
            continue

        recommendation_response = {
            "recommendations": music_recommendations,
            "genre": word,
        }
        await _send_to_all(json.dumps(recommendation_response))


async def reset_recommendation_timer():
    """Call this function when you want to reset the timer to 60 seconds."""
    reset_timer.set()  # Consumed (and cleared) by periodic_recommendation().


@app.on_event("startup")
async def start_recommender():
    """Start the background recommender when the application starts."""
    global reset_timer, _recommender_task
    # Re-create the event inside the running loop: on older Python versions an
    # Event is bound to the loop that was current when it was constructed.
    reset_timer = asyncio.Event()
    _recommender_task = asyncio.create_task(periodic_recommendation())


@app.websocket("/chat/{username}")
async def chat_endpoint(websocket: WebSocket, username: str):
    """Handle one chat connection: register it, relay its messages, clean up.

    Each received frame is expected to be a JSON object with a "message"
    string and optionally a "username"; frames that do not match are ignored.
    """
    await websocket.accept()
    clients[username] = websocket
    print(f"{username} joined")
    await broadcast_user_list()

    try:
        while True:
            data = await websocket.receive_text()
            try:
                message_data = json.loads(data)
            except json.JSONDecodeError:
                continue  # Not JSON: ignore the frame, keep the connection.
            if not isinstance(message_data, dict):
                continue
            message = message_data.get("message")
            if not isinstance(message, str):
                continue

            chat_history.append(message)
            response = {
                "username": message_data.get("username", username),
                "message": message,
            }
            # Broadcast message to all clients
            await _send_to_all(json.dumps(response))
    except Exception as e:
        print(f"{username} disconnected: {e}")
    finally:
        # Only remove our own socket: a newer connection may have reused the
        # same username and replaced this entry.
        if clients.get(username) is websocket:
            del clients[username]
    await broadcast_user_list()


@app.get("/mp")
def read_root():
    """Serve the frontend's index page."""
    print("frontend")
    return FileResponse("frontend/index.html")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)