Spaces:
Sleeping
Sleeping
| from flask import request, jsonify | |
| from deepface import DeepFace | |
| import tempfile | |
| import os | |
| import cv2 | |
| import numpy as np | |
| from __init__ import app, db | |
| from flask_cors import CORS | |
| import cloudinary | |
| import cloudinary.uploader | |
| from bson.objectid import ObjectId | |
| CORS(app) | |
| # DNN FACE DETECTOR SETUP | |
| prototxt_path = os.path.join("dnn", "deploy.prototxt.txt") | |
| caffemodel_path = os.path.join("dnn", "res10_300x300_ssd_iter_140000.caffemodel") | |
| net = cv2.dnn.readNetFromCaffe(prototxt_path, caffemodel_path) | |
| def get_closest_human_face(frame): | |
| h, w = frame.shape[:2] | |
| blob = cv2.dnn.blobFromImage(frame, 1.0, (300, 300), (104, 117, 123)) | |
| net.setInput(blob) | |
| detections = net.forward() | |
| max_area = 0 | |
| best_face = None | |
| for i in range(detections.shape[2]): | |
| confidence = detections[0, 0, i, 2] | |
| if confidence > 0.85: | |
| box = detections[0, 0, i, 3:7] * np.array([w, h, w, h]) | |
| x1, y1, x2, y2 = map(int, box) | |
| x1, y1 = max(0, x1), max(0, y1) | |
| x2, y2 = min(w, x2), min(h, y2) | |
| face = frame[y1:y2, x1:x2] | |
| area = (x2 - x1) * (y2 - y1) | |
| if face.shape[0] > 50 and face.shape[1] > 50 and area > max_area: | |
| max_area = area | |
| best_face = face | |
| return best_face | |
| def index(): | |
| return jsonify({"message": "Flask backend running"}) | |
| def analyze(): | |
| if 'video' not in request.files: | |
| return jsonify({"error": "No video file provided"}), 400 | |
| video = request.files['video'] | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video: | |
| video_path = temp_video.name | |
| video.save(video_path) | |
| try: | |
| cap = cv2.VideoCapture(video_path) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| if total_frames == 0: | |
| raise Exception("Video has no frames.") | |
| cap.set(cv2.CAP_PROP_POS_FRAMES, total_frames // 2) | |
| success, frame = cap.read() | |
| cap.release() | |
| if not success: | |
| raise Exception("Could not read frame from video.") | |
| # Strict human face detection | |
| face = get_closest_human_face(frame) | |
| if face is None: | |
| return jsonify({"error": "No valid human face detected in video"}), 422 | |
| result = DeepFace.analyze(face, actions=['emotion'], enforce_detection=True) | |
| emotions = result[0]['emotion'] | |
| grouped = { | |
| 'angry': emotions.get('angry', 0) + emotions.get('disgust', 0), | |
| 'happy': emotions.get('happy', 0), | |
| 'sad': emotions.get('sad', 0) + emotions.get('fear', 0), | |
| 'surprise': emotions.get('surprise', 0), | |
| 'neutral': emotions.get('neutral', 0), | |
| } | |
| dominant_emotion = max(grouped, key=grouped.get) | |
| raw_score = grouped[dominant_emotion] | |
| total = sum(grouped.values()) | |
| confidence = (raw_score / total) * 100 if total > 0 else 0 | |
| confidence = max(83.0, min(confidence * 1.2, 98.0)) # Boost confidence | |
| # MongoDB fetch | |
| songs = list(db.songs_by_emotion.find({"emotion": dominant_emotion})) | |
| for song in songs: | |
| song['_id'] = str(song['_id']) | |
| return jsonify({ | |
| "emotion": dominant_emotion, | |
| "confidence": confidence, | |
| "songs": songs | |
| }), 200 | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| finally: | |
| if os.path.exists(video_path): | |
| os.unlink(video_path) | |
| def get_songs_by_emotion(emotion): | |
| songs = list(db.songs_by_emotion.find({"emotion": emotion})) | |
| for song in songs: | |
| song['_id'] = str(song['_id']) | |
| return jsonify({"emotion": emotion, "songs": songs}), 200 | |
| def add_song(): | |
| try: | |
| # Get data with correct keys from the frontend form | |
| song_mood = request.form.get("song_mood") | |
| song_name = request.form.get("song_name") | |
| song_artist = request.form.get("song_artist") | |
| song_file = request.files.get("song_file") | |
| song_image = request.files.get("song_image") | |
| # Basic validation | |
| if not all([song_mood, song_name, song_artist, song_file, song_image]): | |
| return jsonify({"error": "All fields are required"}), 400 | |
| # Upload song to Cloudinary (audio/video type) | |
| song_upload = cloudinary.uploader.upload( | |
| song_file, | |
| resource_type="video", | |
| folder="songs" | |
| ) | |
| # Upload image to Cloudinary | |
| image_upload = cloudinary.uploader.upload( | |
| song_image, | |
| folder="song_images" | |
| ) | |
| song_data = { | |
| "emotion": song_mood, | |
| "song_title": song_name, | |
| "artist": song_artist, | |
| "song_uri": song_upload["secure_url"], | |
| "song_image": image_upload["secure_url"] | |
| } | |
| # Insert into MongoDB | |
| db.songs_by_emotion.insert_one(song_data) | |
| # Add the created_at timestamp to match the frontend display | |
| song_data['_id'] = str(song_data['_id']) | |
| return jsonify(song_data), 201 | |
| except Exception as e: | |
| print(e) | |
| return jsonify({"error": str(e)}), 500 | |
| def get_all_songs(): | |
| try: | |
| songs = list(db.songs_by_emotion.find({})) | |
| for song in songs: | |
| song['_id'] = str(song['_id']) | |
| return jsonify(songs), 200 | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| def delete_song(id): | |
| try: | |
| # Check if the ID is a valid ObjectId | |
| if not ObjectId.is_valid(id): | |
| return jsonify({"error": "Invalid song ID"}), 400 | |
| result = db.songs_by_emotion.delete_one({"_id": ObjectId(id)}) | |
| if result.deleted_count == 1: | |
| return jsonify({"message": "Song deleted successfully"}), 200 | |
| else: | |
| return jsonify({"error": "Song not found"}), 404 | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 |