Spaces:
Runtime error
Runtime error
| import os | |
| import cv2 | |
| import numpy as np | |
| import base64 | |
| import json | |
| from io import BytesIO | |
| from PIL import Image | |
| import tensorflow as tf | |
| from tensorflow import keras | |
| from fastapi import FastAPI, Request, Form, File, UploadFile | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| import uvicorn | |
| import requests | |
# Keras model used for emotion prediction. It stays None until load_model()
# assigns the loaded model to this module-level name.
emotion_model = None

# The FastAPI application object that the middleware below is attached to.
app = FastAPI()
def load_model():
    """Download the emotion model from Google Drive and load it into memory.

    The file is streamed to a temporary ``.part`` file and moved to its final
    location only after the download completes, so an interrupted transfer
    never leaves a truncated model at the final path. On success the loaded
    Keras model is stored in the module-level ``emotion_model``.

    Raises:
        Exception: if Google Drive does not answer with HTTP 200, or answers
            with an HTML page instead of the model file. Network errors and
            timeouts raised by ``requests`` propagate unchanged.
    """
    global emotion_model

    file_id = "1ist4U_0oCmHe7atGZvnxRCDMEGXCqqke"  # Replace with your own Drive file ID
    url = f"https://drive.google.com/uc?export=download&id={file_id}"
    output_path = "/tmp/moodDetection.keras"
    partial_path = output_path + ".part"

    print("π₯ Mengunduh model dari Google Drive...")
    # stream=True avoids holding the whole model in memory; the timeout
    # (connect, read) keeps a stalled connection from hanging forever.
    with requests.get(url, stream=True, timeout=(10, 60)) as response:
        if response.status_code != 200:
            raise Exception("β Gagal mengunduh model dari Google Drive")

        # Drive may reply 200 with an HTML page (for example a confirmation
        # or sign-in page) instead of the file. Saving that page would only
        # fail later, inside Keras, with a far less helpful error.
        content_type = response.headers.get("Content-Type", "")
        if content_type.lower().startswith("text/html"):
            raise Exception(
                "Google Drive returned an HTML page instead of the model file; "
                "check that the file is shared publicly and can be downloaded directly"
            )

        with open(partial_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)

    # Publish the file under its final name only once it is complete.
    os.replace(partial_path, output_path)

    print("β Model berhasil diunduh, memuat ke memory...")
    emotion_model = keras.models.load_model(output_path)
    print("β Model siap digunakan!")
# Class names indexed by the position of the model's highest-scoring output.
emotion_labels = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']

# Frontal-face Haar cascade loaded from the data directory bundled with OpenCV.
face_cascade = cv2.CascadeClassifier(
    cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
)

# NOTE(review): wildcard origins are combined with allow_credentials=True.
# Confirm this is intended; listing the allowed origins explicitly is the
# safer configuration when credentials are accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# In-memory history per session id: {"emotions": [...], "stress_levels": [...]}.
# NOTE(review): entries are never removed in the code visible here, so this
# grows for the lifetime of the process.
sessions = {}
# Stress score reported for each emotion label; labels missing from this
# table fall back to 50 at the lookup site.
_STRESS_BY_EMOTION = {
    'angry': 85, 'disgust': 65, 'fear': 70,
    'sad': 75, 'surprise': 45, 'neutral': 30, 'happy': 15,
}


def _decode_gray_image(image_data):
    """Decode a base64 data-URL string into a 2-D grayscale pixel array."""
    base64_data = image_data.split(',')[1]
    image_bytes = base64.b64decode(base64_data)
    print("Successfully decoded base64 data")
    img = Image.open(BytesIO(image_bytes))
    if img.mode == "L":
        return np.array(img)
    # Normalise every other mode (RGBA, palette, CMYK, ...) to RGB first so
    # the result is always single-channel; otherwise a 4-channel PNG would
    # reach the 48x48x1 reshape below and fail.
    return cv2.cvtColor(np.array(img.convert("RGB")), cv2.COLOR_RGB2GRAY)


def _classify_face(gray, box):
    """Run the emotion model on one face box; return (label, confidence)."""
    x, y, w, h = box
    face_roi = gray[y:y + h, x:x + w]
    resized_face = cv2.resize(face_roi, (48, 48))
    normalized_face = resized_face / 255.0
    reshaped_face = normalized_face.reshape(1, 48, 48, 1)
    # Load the model on first use: nothing visible in this module calls
    # load_model(), and predicting on None would raise AttributeError.
    if emotion_model is None:
        load_model()
    prediction = emotion_model.predict(reshaped_face)
    emotion_idx = int(np.argmax(prediction[0]))
    return emotion_labels[emotion_idx], float(prediction[0][emotion_idx])


def _analyze_frame(image_data, session_id):
    """Detect a face, classify it, record the sample and build the payload."""
    gray = _decode_gray_image(image_data)
    print(f"Processed image shape: {gray.shape}")
    faces = face_cascade.detectMultiScale(gray, 1.3, 5)
    print(f"Detected {len(faces)} faces")
    if len(faces) == 0:
        return {
            "emotion": "unknown",
            "confidence": 0,
            "stressLevel": 0,
            "faceDetected": False
        }

    # Only the first detection is analysed.
    x, y, w, h = faces[0]
    emotion, confidence = _classify_face(gray, (x, y, w, h))
    stress_level = _STRESS_BY_EMOTION.get(emotion, 50)

    history = sessions.setdefault(session_id, {"emotions": [], "stress_levels": []})
    history["emotions"].append(emotion)
    history["stress_levels"].append(stress_level)

    return {
        "emotion": emotion,
        "confidence": confidence,
        "stressLevel": stress_level,
        "faceDetected": True,
        # Cast to built-in int so the values are JSON-serialisable.
        "faceRegion": {"x": int(x), "y": int(y), "width": int(w), "height": int(h)}
    }


async def detect_emotion(request: Request):
    """Classify the emotion of the first face found in a posted image.

    Reads the form fields ``image`` (a string containing a base64 data URL)
    and ``sessionId`` (optional, defaults to ``"default"``). When a face is
    detected, the emotion and its stress score are appended to the session
    history in ``sessions``.

    Returns:
        A JSONResponse carrying one of:
        - emotion, confidence, stressLevel, faceDetected and faceRegion when
          a face was found;
        - an "unknown" payload with ``faceDetected`` False when none was;
        - ``{"error": ...}`` for invalid input or any processing failure.
        Failures are reported in the body; the status code is left at its
        default rather than set to an error code.

    NOTE(review): no route decorator is visible on this handler here;
    confirm where it is registered on ``app``.
    """
    import traceback  # local import: not part of the module-level imports seen here

    try:
        form = await request.form()
        image_data = form.get("image")
        print(f"Received request with sessionId: {form.get('sessionId', 'not provided')}")
        print(f"Image data received: {bool(image_data)}")
        # The isinstance check rejects file uploads cleanly instead of
        # raising TypeError on the substring test.
        if isinstance(image_data, str) and "base64" in image_data:
            try:
                result = _analyze_frame(image_data, form.get("sessionId", "default"))
            except Exception as e:
                print(f"Error processing image: {str(e)}")
                traceback.print_exc()
                result = {"error": f"Image processing error: {str(e)}"}
        else:
            result = {"error": "Invalid image data"}
    except Exception as e:
        print(f"Request handling error: {str(e)}")
        traceback.print_exc()
        result = {"error": f"Server error: {str(e)}"}

    response = JSONResponse(content=result)
    # Kept from the original handler: explicit CORS headers on this response.
    response.headers["Access-Control-Allow-Origin"] = "*"
    response.headers["Access-Control-Allow-Credentials"] = "true"
    return response
async def session_report(session_id: str):
    """Summarise the emotions and stress levels recorded for one session.

    Returns a JSONResponse with the dominant emotion, per-emotion counts,
    average/min/max stress level and the number of recorded frames. An
    unknown session id yields a 404 with ``{"error": "Session not found"}``;
    a session without recorded emotions yields ``{"error": "No data in session"}``.

    NOTE(review): no route decorator is visible on this handler here;
    confirm where it is registered on ``app``.
    """
    try:
        history = sessions[session_id]
    except KeyError:
        return JSONResponse(content={"error": "Session not found"}, status_code=404)

    observed = history["emotions"]
    if not observed:
        return JSONResponse(content={"error": "No data in session"})

    # Count occurrences in first-seen order; on a tie max() keeps the
    # emotion that was seen first.
    tally = {}
    for label in observed:
        tally[label] = tally.get(label, 0) + 1

    levels = history["stress_levels"]
    summary = {
        "dominantEmotion": max(tally, key=tally.get),
        "emotionCounts": tally,
        "averageStressLevel": round(sum(levels) / len(levels), 2),
        "minStressLevel": min(levels),
        "maxStressLevel": max(levels),
        "totalFrames": len(observed),
    }
    return JSONResponse(content=summary)
# Start a local development server when this file is executed directly.
if __name__ == "__main__":
    # "app:app" is uvicorn's "<module>:<attribute>" import string; it assumes
    # this file is importable as the module `app` — TODO confirm the filename.
    uvicorn.run(
        "app:app",
        host="127.0.0.1",
        port=8080,
        reload=True,
    )