Gaze and Emotion Detection API — FastAPI service (Hugging Face Space)
import base64
import json
import os
from io import BytesIO

import cv2
import mediapipe as mp
import numpy as np
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image

# DeepFace stores downloaded model weights under $DEEPFACE_HOME; point it at a
# writable location BEFORE importing deepface (the import reads this variable).
os.environ["DEEPFACE_HOME"] = "/tmp/.deepface"
from deepface import DeepFace
app = FastAPI()

# Allow browser clients from any origin to call the API (webcam frames are
# posted from a separate front-end, so CORS must be wide open).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Initialize MediaPipe face-mesh (per-landmark geometry) and face-detection
# (bounding boxes for the emotion crop). Both are created once at import time
# and shared by all requests.
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    # refine_landmarks=True is required for the iris indices (468-477) used below
    refine_landmarks=True,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)
mp_face_detection = mp.solutions.face_detection
face_detection = mp_face_detection.FaceDetection(min_detection_confidence=0.5)

# Eye and iris landmark indices.
# Each eye list holds 6 landmarks ordered so that indices 0/3 are the
# horizontal corners and (1,5), (2,4) are vertical pairs — the order the
# EAR computation below relies on.
LEFT_EYE = [33, 160, 158, 133, 153, 144]
RIGHT_EYE = [362, 385, 387, 263, 373, 380]
LEFT_IRIS = [468, 469, 470, 471, 472]
RIGHT_IRIS = [473, 474, 475, 476, 477]
def eye_aspect_ratio(landmarks, eye_points, image_w, image_h):
    """Return the eye aspect ratio (EAR) for one eye.

    EAR = (‖p1−p5‖ + ‖p2−p4‖) / (2·‖p0−p3‖), where p0/p3 are the eye
    corners and (p1,p5), (p2,p4) are vertical landmark pairs. The value
    shrinks toward 0 as the eye closes.

    landmarks: indexable collection of objects with normalised .x/.y fields;
    eye_points: 6 landmark indices in the order described above.
    """
    # Normalised coordinates -> truncated pixel coordinates.
    pts = [
        (int(landmarks[i].x * image_w), int(landmarks[i].y * image_h))
        for i in eye_points
    ]
    vert_a = np.linalg.norm(np.subtract(pts[1], pts[5]))
    vert_b = np.linalg.norm(np.subtract(pts[2], pts[4]))
    horiz = np.linalg.norm(np.subtract(pts[0], pts[3]))
    return (vert_a + vert_b) / (2.0 * horiz)
def get_iris_position_2d(landmarks, iris_points, eye_points, image_w, image_h):
    """Return the iris centre's normalised (horizontal, vertical) position
    inside the eye's bounding box, each clamped to [0, 1].

    iris_points[0] is taken as the iris centre; eye_points[0]/[3] give the
    horizontal corners and eye_points[1]/[4] the top/bottom reference points.
    Falls back to (0.5, 0.5) — i.e. "centred" — if anything goes wrong
    (missing landmarks, malformed input).
    """
    try:
        iris = landmarks[iris_points[0]]
        iris_x = iris.x * image_w
        iris_y = iris.y * image_h

        eye_left = landmarks[eye_points[0]].x * image_w
        eye_right = landmarks[eye_points[3]].x * image_w
        eye_top = landmarks[eye_points[1]].y * image_h
        eye_bottom = landmarks[eye_points[4]].y * image_h

        eye_width = eye_right - eye_left
        eye_height = eye_bottom - eye_top

        # Degenerate (zero/negative-width) boxes report "centred".
        if eye_width > 0:
            horizontal_pos = max(0.0, min(1.0, (iris_x - eye_left) / eye_width))
        else:
            horizontal_pos = 0.5
        if eye_height > 0:
            vertical_pos = max(0.0, min(1.0, (iris_y - eye_top) / eye_height))
        else:
            vertical_pos = 0.5
        return horizontal_pos, vertical_pos
    # Was a bare `except:`, which also swallowed SystemExit/KeyboardInterrupt.
    except Exception:
        return 0.5, 0.5
def get_gaze_direction(h_pos, v_pos):
    """Map normalised iris coordinates to a coarse gaze label.

    h_pos/v_pos are in [0, 1]; values inside [0.35, 0.65] count as centred.
    Returns "CENTER", a single direction, or "H + V" (e.g. "LEFT + UP").
    """
    horizontal = "LEFT" if h_pos < 0.35 else ("RIGHT" if h_pos > 0.65 else None)
    vertical = "UP" if v_pos < 0.35 else ("DOWN" if v_pos > 0.65 else None)
    labels = [part for part in (horizontal, vertical) if part is not None]
    return " + ".join(labels) if labels else "CENTER"
def get_gaze_score(left_h_pos, left_v_pos, right_h_pos, right_v_pos):
    """Score how centred the gaze is, averaged over both eyes.

    Each axis contributes 1.0 when the eye-averaged position lies in the
    centred band [0.35, 0.65], otherwise 0.5; the result is the mean of the
    two axis scores (so it lies in {0.5, 0.75, 1.0}).
    """
    def axis_score(position):
        # 1.0 = centred on this axis, 0.5 = looking away.
        return 1.0 if 0.35 <= position <= 0.65 else 0.5

    mean_h = (left_h_pos + right_h_pos) / 2.0
    mean_v = (left_v_pos + right_v_pos) / 2.0
    return (axis_score(mean_h) + axis_score(mean_v)) / 2.0
def get_head_pose_score(landmarks, image_w, image_h):
    """Binary head-pose score from the nose landmark (index 1).

    Returns 1.0 when the nose lies within 0.3·image_w of the frame centre
    (head roughly facing the camera), else 0.0.
    """
    nose_tip = landmarks[1]
    dx = nose_tip.x * image_w - image_w / 2
    dy = nose_tip.y * image_h - image_h / 2
    offset = (dx * dx + dy * dy) ** 0.5
    return 1.0 if offset < 0.3 * image_w else 0.0
def compute_concentration_score(gaze, head_pose, blink):
    """Blend gaze, head-pose, and blink signals into a 0–100 score.

    Weights: 50% gaze score, 30% head-pose score, 20% eyes-open bonus
    (forfeited while blinking). Rounded to two decimals.
    """
    eyes_open = 0.0 if blink else 1.0
    weighted = 0.5 * gaze + 0.3 * head_pose + 0.2 * eyes_open
    return round(weighted * 100, 2)
def analyze_emotion(frame):
    """Return the dominant emotion label for the first large-enough face.

    Detects faces with MediaPipe, crops the first face bigger than 50×50 px,
    and classifies it with DeepFace. Any failure — or no suitable face —
    yields "neutral" (best-effort: errors are logged, never raised).
    """
    try:
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        detections = face_detection.process(rgb).detections
        if detections:
            frame_h, frame_w, _ = frame.shape
            for det in detections:
                box = det.location_data.relative_bounding_box
                # Relative bbox -> pixel bbox, clamped inside the frame.
                x = max(0, int(box.xmin * frame_w))
                y = max(0, int(box.ymin * frame_h))
                w = min(int(box.width * frame_w), frame_w - x)
                h = min(int(box.height * frame_h), frame_h - y)
                # Skip tiny crops — DeepFace is unreliable on them.
                if w > 50 and h > 50:
                    crop = frame[y:y + h, x:x + w]
                    result = DeepFace.analyze(crop,
                                              actions=['emotion'],
                                              enforce_detection=False)
                    # DeepFace may return one dict or a list of them.
                    if isinstance(result, list):
                        result = result[0]
                    return result['dominant_emotion']
    except Exception as e:
        print(f"Error analyzing emotion: {e}")
    return "neutral"
# NOTE(review): no route decorator was present anywhere in the file, so this
# endpoint was never registered — almost certainly lost in formatting; restored.
@app.post("/analyze")
async def analyze_image(file: UploadFile = File(...)):
    """Analyze an uploaded image: emotion, gaze direction, blink state, and a
    0–100 concentration score.

    Returns a JSON-serialisable dict; fields keep safe defaults (CENTER-ish
    gaze, score 0) when no face mesh is found. Raises HTTPException 400 for
    undecodable images and 500 for unexpected processing errors.
    """
    try:
        contents = await file.read()
        nparr = np.frombuffer(contents, np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if frame is None:
            raise HTTPException(status_code=400, detail="Invalid image format")

        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image_h, image_w, _ = frame.shape

        results = face_mesh.process(frame_rgb)
        emotion = analyze_emotion(frame)

        # Defaults reported when no face mesh is detected.
        response_data = {
            "emotion": emotion,
            "face_detected": False,
            "gaze_direction": "UNKNOWN",
            "concentration_score": 0.0,
            "blinking": False,
            "gaze_positions": {
                "left_eye": {"horizontal": 0.5, "vertical": 0.5},
                "right_eye": {"horizontal": 0.5, "vertical": 0.5}
            }
        }

        if results.multi_face_landmarks:
            # Only the first detected face is analyzed.
            landmarks = results.multi_face_landmarks[0].landmark
            response_data["face_detected"] = True

            # Blink detection via eye aspect ratio.
            left_ear = eye_aspect_ratio(landmarks, LEFT_EYE, image_w, image_h)
            right_ear = eye_aspect_ratio(landmarks, RIGHT_EYE, image_w, image_h)
            avg_ear = (left_ear + right_ear) / 2
            blink = avg_ear < 0.25

            # Iris positions within each eye, normalised to [0, 1].
            left_h_pos, left_v_pos = get_iris_position_2d(
                landmarks, LEFT_IRIS, LEFT_EYE, image_w, image_h)
            right_h_pos, right_v_pos = get_iris_position_2d(
                landmarks, RIGHT_IRIS, RIGHT_EYE, image_w, image_h)

            avg_direction = get_gaze_direction(
                (left_h_pos + right_h_pos) / 2, (left_v_pos + right_v_pos) / 2)

            gaze_score = get_gaze_score(left_h_pos, left_v_pos,
                                        right_h_pos, right_v_pos)
            head_score = get_head_pose_score(landmarks, image_w, image_h)
            concentration = compute_concentration_score(gaze_score, head_score, blink)

            response_data.update({
                "gaze_direction": avg_direction,
                "concentration_score": float(concentration),
                "blinking": bool(blink),
                "gaze_positions": {
                    "left_eye": {"horizontal": float(left_h_pos), "vertical": float(left_v_pos)},
                    "right_eye": {"horizontal": float(right_h_pos), "vertical": float(right_v_pos)}
                }
            })

        return response_data
    except HTTPException:
        # Bug fix: the 400 above was previously swallowed by the generic
        # handler below and re-raised as a 500. Let it propagate unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Processing error: {str(e)}")
# NOTE(review): route decorator restored — without it this handler is never
# registered and GET / returns 404.
@app.get("/")
async def root():
    """Health-check / landing endpoint identifying the service."""
    return {"message": "Gaze and Emotion Detection API"}
# Run the ASGI server when executed directly; port 7860 is the port
# Hugging Face Spaces exposes for web apps.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)