from flask import request, jsonify
from deepface import DeepFace
import tempfile
import os
import cv2
import numpy as np
from __init__ import app, db
from flask_cors import CORS
import cloudinary
import cloudinary.uploader
from bson.objectid import ObjectId
# Enable Cross-Origin Resource Sharing on the app (no options passed, so
# flask_cors defaults apply).
CORS(app)
# DNN face detector setup: load the Caffe SSD face-detection network once at
# import time; `net` is reused by get_closest_human_face() on every request.
# NOTE(review): both paths are relative, so they resolve against the process
# working directory — confirm the server is always started from the directory
# that contains "dnn/".
prototxt_path = os.path.join("dnn", "deploy.prototxt.txt")
caffemodel_path = os.path.join("dnn", "res10_300x300_ssd_iter_140000.caffemodel")
net = cv2.dnn.readNetFromCaffe(prototxt_path, caffemodel_path)
def get_closest_human_face(frame, min_confidence=0.85, min_size=50):
    """Return the crop of the largest confidently-detected face in ``frame``.

    The module-level SSD network ``net`` is run on the frame. Among the
    detections whose confidence exceeds ``min_confidence`` and whose crop is
    larger than ``min_size`` pixels in both height and width, the one with
    the largest bounding-box area wins (largest area is used as the proxy
    for "closest").

    Args:
        frame: Image array whose first two dimensions are (height, width).
            NOTE(review): presumably a BGR frame as produced by
            ``cv2.VideoCapture.read`` — confirm against callers.
        min_confidence: Detections at or below this score are ignored.
        min_size: Crops whose height or width is not strictly greater than
            this many pixels are ignored.

    Returns:
        A slice of ``frame`` covering the selected face, or ``None`` when the
        frame is missing/empty or no detection passes the filters.
    """
    # A failed capture read yields None; treat that (and empty arrays) as
    # "no face" rather than crashing on ``frame.shape``.
    if frame is None or getattr(frame, "size", 0) == 0:
        return None

    h, w = frame.shape[:2]
    blob = cv2.dnn.blobFromImage(frame, 1.0, (300, 300), (104, 117, 123))
    net.setInput(blob)
    detections = net.forward()

    # Box coordinates come back normalised; scale them to pixel units.
    scale = np.array([w, h, w, h])
    max_area = 0
    best_face = None
    for detection in detections[0, 0]:
        if detection[2] <= min_confidence:
            continue
        x1, y1, x2, y2 = map(int, detection[3:7] * scale)
        # Clamp the box to the frame so the slice below stays in bounds.
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(w, x2), min(h, y2)
        face = frame[y1:y2, x1:x2]
        area = (x2 - x1) * (y2 - y1)
        if face.shape[0] > min_size and face.shape[1] > min_size and area > max_area:
            max_area = area
            best_face = face
    return best_face
@app.route('/')
def index():
    """Root endpoint: reply with a small JSON message saying the backend is running."""
    status = {"message": "Flask backend running"}
    return jsonify(status)
def _read_middle_frame(video_path):
    """Return the frame at the midpoint of the video, releasing the capture on every path."""
    cap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Treat non-positive counts (unreadable/empty file) as "no frames".
        if total_frames <= 0:
            raise ValueError("Video has no frames.")
        cap.set(cv2.CAP_PROP_POS_FRAMES, total_frames // 2)
        success, frame = cap.read()
    finally:
        cap.release()
    if not success:
        raise ValueError("Could not read frame from video.")
    return frame


def _dominant_emotion(emotions):
    """Fold the seven emotion scores into five groups; return (label, confidence)."""
    grouped = {
        'angry': emotions.get('angry', 0) + emotions.get('disgust', 0),
        'happy': emotions.get('happy', 0),
        'sad': emotions.get('sad', 0) + emotions.get('fear', 0),
        'surprise': emotions.get('surprise', 0),
        'neutral': emotions.get('neutral', 0),
    }
    dominant_emotion = max(grouped, key=grouped.get)
    total = sum(grouped.values())
    confidence = (grouped[dominant_emotion] / total) * 100 if total > 0 else 0
    # NOTE(review): the share is scaled by 1.2 and clamped into [83, 98], so
    # the reported value never drops below 83 — kept as-is because clients may
    # depend on it, but it is not a calibrated confidence.
    confidence = max(83.0, min(confidence * 1.2, 98.0))
    # Cast to a builtin float so jsonify can serialise it even if the scores
    # arrive as NumPy scalars.
    return dominant_emotion, float(confidence)


@app.route('/analyze', methods=['POST'])
def analyze():
    """Detect the dominant emotion in an uploaded video and return matching songs.

    Expects a multipart upload with a ``video`` file. The middle frame of the
    video is extracted, the largest face is cropped via
    ``get_closest_human_face`` and passed to ``DeepFace.analyze``; songs whose
    ``emotion`` field equals the dominant (grouped) emotion are fetched from
    ``db.songs_by_emotion``.

    Returns:
        200 with ``{"emotion", "confidence", "songs"}`` on success,
        400 when no ``video`` file is provided,
        422 when no face passes detection,
        500 with ``{"error": <message>}`` for any other failure.
    """
    if 'video' not in request.files:
        return jsonify({"error": "No video file provided"}), 400
    video = request.files['video']

    # Only reserve a unique path here; the handle is closed before the upload
    # is written to that path.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video:
        video_path = temp_video.name

    try:
        # Saving inside the try guarantees the temp file is removed and a JSON
        # error is returned even when the write itself fails.
        video.save(video_path)
        frame = _read_middle_frame(video_path)

        # Strict human face detection
        face = get_closest_human_face(frame)
        if face is None:
            return jsonify({"error": "No valid human face detected in video"}), 422

        result = DeepFace.analyze(face, actions=['emotion'], enforce_detection=True)
        dominant_emotion, confidence = _dominant_emotion(result[0]['emotion'])

        # MongoDB fetch; the `_id` values are stringified for the JSON response.
        songs = list(db.songs_by_emotion.find({"emotion": dominant_emotion}))
        for song in songs:
            song['_id'] = str(song['_id'])

        return jsonify({
            "emotion": dominant_emotion,
            "confidence": confidence,
            "songs": songs
        }), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        if os.path.exists(video_path):
            os.unlink(video_path)
@app.route('/api/songs/<emotion>', methods=['GET'])
def get_songs_by_emotion(emotion):
    """Return every song whose ``emotion`` field equals the URL segment.

    Args:
        emotion: Emotion label taken from the URL path.

    Returns:
        200 with ``{"emotion": <label>, "songs": [...]}`` (``_id`` values
        stringified), or 500 with ``{"error": <message>}`` if the lookup fails.
    """
    # Same JSON 500 error handling as the other song endpoints, so a database
    # failure does not surface as an unhandled error.
    try:
        songs = list(db.songs_by_emotion.find({"emotion": emotion}))
        for song in songs:
            song['_id'] = str(song['_id'])
        return jsonify({"emotion": emotion, "songs": songs}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/songs", methods=["POST"])
def add_song():
    """Create a song record from a multipart form submission.

    Reads the form fields ``song_mood``, ``song_name``, ``song_artist`` and
    the files ``song_file``, ``song_image``. Both files are uploaded to
    Cloudinary and the resulting secure URLs are stored together with the
    text fields in ``db.songs_by_emotion``.

    Returns:
        201 with the stored document (``_id`` as a string),
        400 when any of the five inputs is missing or empty,
        500 with ``{"error": <message>}`` on any other failure.
    """
    try:
        # Field names as sent by the frontend form.
        song_mood = request.form.get("song_mood")
        song_name = request.form.get("song_name")
        song_artist = request.form.get("song_artist")
        song_file = request.files.get("song_file")
        song_image = request.files.get("song_image")

        # Every input is mandatory.
        required = (song_mood, song_name, song_artist, song_file, song_image)
        if any(not value for value in required):
            return jsonify({"error": "All fields are required"}), 400

        # The audio file is uploaded with Cloudinary's "video" resource type.
        audio_result = cloudinary.uploader.upload(
            song_file, resource_type="video", folder="songs"
        )
        cover_result = cloudinary.uploader.upload(song_image, folder="song_images")

        song_data = {
            "emotion": song_mood,
            "song_title": song_name,
            "artist": song_artist,
        }
        song_data["song_uri"] = audio_result["secure_url"]
        song_data["song_image"] = cover_result["secure_url"]

        db.songs_by_emotion.insert_one(song_data)
        # The line below reads `_id` back from the dict after the insert and
        # stringifies it so the document can be JSON-encoded.
        song_data['_id'] = str(song_data['_id'])
        return jsonify(song_data), 201
    except Exception as e:
        print(e)
        return jsonify({"error": str(e)}), 500
@app.route('/api/songs', methods=['GET'])
def get_all_songs():
    """List every document in ``db.songs_by_emotion``.

    Returns:
        200 with a JSON array of songs (``_id`` values stringified), or
        500 with ``{"error": <message>}`` if the query fails.
    """
    try:
        # Rebuild each document with its `_id` converted to a string.
        serialised = [
            {**doc, '_id': str(doc['_id'])}
            for doc in db.songs_by_emotion.find({})
        ]
        return jsonify(serialised), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/songs/<id>', methods=['DELETE'])
def delete_song(id):
    """Delete the song whose ``_id`` matches the URL segment.

    Args:
        id: Candidate ObjectId string taken from the URL path (the name must
            match the route variable).

    Returns:
        200 when exactly one document was deleted,
        400 when ``id`` is not a valid ObjectId,
        404 when no document matched,
        500 with ``{"error": <message>}`` on any other failure.
    """
    try:
        # Reject malformed ids before touching the database.
        if not ObjectId.is_valid(id):
            return jsonify({"error": "Invalid song ID"}), 400

        outcome = db.songs_by_emotion.delete_one({"_id": ObjectId(id)})
        if outcome.deleted_count != 1:
            return jsonify({"error": "Song not found"}), 404
        return jsonify({"message": "Song deleted successfully"}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500