"""Gaze, blink, concentration and emotion analysis API.

Exposes a FastAPI app with a single analysis endpoint (``POST /analyze``)
that takes an uploaded image, runs MediaPipe FaceMesh on it to estimate
iris position / eye openness, runs DeepFace on the detected face crop to
estimate the dominant emotion, and returns the results as JSON.
"""

import base64
import json
import logging
import os
from io import BytesIO
from typing import Sequence, Tuple

import cv2
import mediapipe as mp
import numpy as np
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image

# Set DeepFace home directory to writable location.
# This must happen BEFORE deepface is imported, hence the late import below.
os.environ["DEEPFACE_HOME"] = "/tmp/.deepface"

from deepface import DeepFace  # noqa: E402

logger = logging.getLogger(__name__)

app = FastAPI()

# NOTE(review): a wildcard origin combined with allow_credentials=True is a
# very permissive CORS policy; confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize MediaPipe.
# NOTE(review): FaceMesh is created without static_image_mode, while the
# endpoint processes independent uploads -- confirm that the tracking mode is
# what is wanted here.
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    refine_landmarks=True,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
)

mp_face_detection = mp.solutions.face_detection
face_detection = mp_face_detection.FaceDetection(min_detection_confidence=0.5)

# Eye and iris landmark indices.
LEFT_EYE = [33, 160, 158, 133, 153, 144]
RIGHT_EYE = [362, 385, 387, 263, 373, 380]
LEFT_IRIS = [468, 469, 470, 471, 472]
RIGHT_IRIS = [473, 474, 475, 476, 477]

# Relative iris positions inside [GAZE_CENTER_MIN, GAZE_CENTER_MAX] count as
# "looking at the center" on that axis.
GAZE_CENTER_MIN = 0.35
GAZE_CENTER_MAX = 0.65
# Average eye aspect ratio below this value is reported as a blink.
EAR_BLINK_THRESHOLD = 0.25
# The nose must be within this fraction of the image width from the image
# center for the head pose to score 1.0.
HEAD_CENTER_RADIUS_RATIO = 0.3
# Face crops must be strictly larger than this (in pixels, both dimensions)
# before they are sent to DeepFace.
MIN_FACE_SIZE_PX = 50


def eye_aspect_ratio(landmarks, eye_points: Sequence[int], image_w: int, image_h: int) -> float:
    """Return the eye aspect ratio (EAR) for one eye.

    The six landmarks named by ``eye_points`` are converted to integer pixel
    coordinates and combined as ``(|p1 - p5| + |p2 - p4|) / (2 * |p0 - p3|)``.

    Args:
        landmarks: Indexable collection of landmarks exposing ``.x`` / ``.y``,
            which are multiplied by the image size to obtain pixels.
        eye_points: Six landmark indices describing the eye.
        image_w: Image width in pixels.
        image_h: Image height in pixels.

    Returns:
        The aspect ratio, or ``0.0`` when the two points used for the
        denominator coincide (which would otherwise divide by zero).
    """
    pts = [
        np.array((int(landmarks[idx].x * image_w), int(landmarks[idx].y * image_h)))
        for idx in eye_points
    ]
    span_a = np.linalg.norm(pts[1] - pts[5])
    span_b = np.linalg.norm(pts[2] - pts[4])
    width = np.linalg.norm(pts[0] - pts[3])
    if width == 0:
        # Degenerate eye (e.g. a tiny face after integer truncation): avoid
        # producing inf/NaN and treat the eye as closed.
        return 0.0
    return float((span_a + span_b) / (2.0 * width))


def get_iris_position_2d(
    landmarks,
    iris_points: Sequence[int],
    eye_points: Sequence[int],
    image_w: int,
    image_h: int,
) -> Tuple[float, float]:
    """Return the iris position relative to the eye box, each axis in [0, 1].

    The iris center is ``landmarks[iris_points[0]]``. The eye box is spanned
    horizontally by ``eye_points[0]`` / ``eye_points[3]`` and vertically by
    ``eye_points[1]`` / ``eye_points[4]``.

    Returns:
        ``(horizontal, vertical)``; an axis whose extent is not positive, or
        any failure while reading the landmarks, yields the neutral ``0.5``.
    """
    try:
        iris_center = landmarks[iris_points[0]]
        iris_x = iris_center.x * image_w
        iris_y = iris_center.y * image_h

        eye_left = landmarks[eye_points[0]].x * image_w
        eye_right = landmarks[eye_points[3]].x * image_w
        eye_top = landmarks[eye_points[1]].y * image_h
        eye_bottom = landmarks[eye_points[4]].y * image_h

        eye_width = eye_right - eye_left
        eye_height = eye_bottom - eye_top

        horizontal_pos = 0.5
        if eye_width > 0:
            horizontal_pos = max(0, min(1, (iris_x - eye_left) / eye_width))

        vertical_pos = 0.5
        if eye_height > 0:
            vertical_pos = max(0, min(1, (iris_y - eye_top) / eye_height))

        return horizontal_pos, vertical_pos
    except Exception:
        # Best-effort: fall back to a centered gaze. Unlike a bare ``except``
        # this no longer swallows KeyboardInterrupt / SystemExit.
        return 0.5, 0.5


def get_gaze_direction(h_pos: float, v_pos: float) -> str:
    """Map relative iris positions to a label such as ``"LEFT + UP"``.

    Returns ``"CENTER"`` when both axes fall inside the central band.
    """
    directions = []
    if h_pos < GAZE_CENTER_MIN:
        directions.append("LEFT")
    elif h_pos > GAZE_CENTER_MAX:
        directions.append("RIGHT")

    if v_pos < GAZE_CENTER_MIN:
        directions.append("UP")
    elif v_pos > GAZE_CENTER_MAX:
        directions.append("DOWN")

    return " + ".join(directions) if directions else "CENTER"


def get_gaze_score(
    left_h_pos: float, left_v_pos: float, right_h_pos: float, right_v_pos: float
) -> float:
    """Score the averaged gaze of both eyes.

    Each axis contributes 1.0 when the average position lies in the central
    band and 0.5 otherwise; the result is the mean of the two axis scores.
    """
    avg_h_pos = (left_h_pos + right_h_pos) / 2.0
    avg_v_pos = (left_v_pos + right_v_pos) / 2.0
    h_score = 1.0 if GAZE_CENTER_MIN <= avg_h_pos <= GAZE_CENTER_MAX else 0.5
    v_score = 1.0 if GAZE_CENTER_MIN <= avg_v_pos <= GAZE_CENTER_MAX else 0.5
    return (h_score + v_score) / 2.0


def get_head_pose_score(landmarks, image_w: int, image_h: int) -> float:
    """Return 1.0 when landmark 1 is near the image center, else 0.0.

    "Near" means the pixel distance from the image center is below
    ``HEAD_CENTER_RADIUS_RATIO * image_w``.
    """
    nose = landmarks[1]
    offset = np.array([nose.x * image_w - image_w / 2, nose.y * image_h - image_h / 2])
    distance = np.linalg.norm(offset)
    return 1.0 if distance < HEAD_CENTER_RADIUS_RATIO * image_w else 0.0


def compute_concentration_score(gaze: float, head_pose: float, blink: bool) -> float:
    """Combine the partial scores into a 0-100 value rounded to 2 decimals.

    Weights: 50% gaze, 30% head pose, 20% "eyes open" (0 while blinking).
    """
    score = 0.5 * gaze + 0.3 * head_pose + 0.2 * (0 if blink else 1)
    return round(score * 100, 2)


def analyze_emotion(frame) -> str:
    """Return the dominant emotion of the first sufficiently large face.

    Args:
        frame: Image array as produced by ``cv2.imdecode`` (converted from
            BGR to RGB before face detection).

    Returns:
        DeepFace's ``dominant_emotion`` for the first detection whose clipped
        bounding box exceeds ``MIN_FACE_SIZE_PX`` in both dimensions, or
        ``"neutral"`` when there is no such face or the analysis fails.
    """
    try:
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = face_detection.process(frame_rgb)
        ih, iw = frame.shape[:2]

        for detection in results.detections or ():
            box = detection.location_data.relative_bounding_box
            # Clip the box to the image so the crop below is valid.
            x = max(0, int(box.xmin * iw))
            y = max(0, int(box.ymin * ih))
            w = min(int(box.width * iw), iw - x)
            h = min(int(box.height * ih), ih - y)
            if w <= MIN_FACE_SIZE_PX or h <= MIN_FACE_SIZE_PX:
                continue

            face_img = frame[y:y + h, x:x + w]
            emotion_result = DeepFace.analyze(
                face_img, actions=['emotion'], enforce_detection=False
            )
            # DeepFace may return either a list of results or a single dict.
            if isinstance(emotion_result, list):
                return emotion_result[0]['dominant_emotion']
            return emotion_result['dominant_emotion']
    except Exception:
        # Emotion is best-effort: log with traceback and fall back below.
        logger.exception("Error analyzing emotion")

    return "neutral"


def _analyze_face_landmarks(landmarks, image_w: int, image_h: int) -> dict:
    """Compute the gaze/blink/concentration response fields for one face."""
    left_ear = eye_aspect_ratio(landmarks, LEFT_EYE, image_w, image_h)
    right_ear = eye_aspect_ratio(landmarks, RIGHT_EYE, image_w, image_h)
    avg_ear = (left_ear + right_ear) / 2

    left_h_pos, left_v_pos = get_iris_position_2d(
        landmarks, LEFT_IRIS, LEFT_EYE, image_w, image_h
    )
    right_h_pos, right_v_pos = get_iris_position_2d(
        landmarks, RIGHT_IRIS, RIGHT_EYE, image_w, image_h
    )

    avg_direction = get_gaze_direction(
        (left_h_pos + right_h_pos) / 2, (left_v_pos + right_v_pos) / 2
    )

    blink = avg_ear < EAR_BLINK_THRESHOLD
    gaze_score = get_gaze_score(left_h_pos, left_v_pos, right_h_pos, right_v_pos)
    head_score = get_head_pose_score(landmarks, image_w, image_h)
    concentration = compute_concentration_score(gaze_score, head_score, blink)

    return {
        "gaze_direction": avg_direction,
        "concentration_score": float(concentration),
        "blinking": bool(blink),
        "gaze_positions": {
            "left_eye": {"horizontal": float(left_h_pos), "vertical": float(left_v_pos)},
            "right_eye": {"horizontal": float(right_h_pos), "vertical": float(right_v_pos)},
        },
    }


@app.post("/analyze")
async def analyze_image(file: UploadFile = File(...)):
    """Analyze an uploaded image for emotion, gaze, blink and concentration.

    Only the first face reported by FaceMesh is analyzed. When no face mesh
    is found the response keeps its defaults (``face_detected`` False,
    ``gaze_direction`` "UNKNOWN", score 0.0).

    Raises:
        HTTPException: 400 when the upload is empty or cannot be decoded as
            an image; 500 for any other processing failure.
    """
    try:
        # Read image
        contents = await file.read()
        if not contents:
            # Reject empty uploads up front instead of letting the decoder
            # fail and surface as a 500.
            raise HTTPException(status_code=400, detail="Invalid image format")

        nparr = np.frombuffer(contents, np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if frame is None:
            raise HTTPException(status_code=400, detail="Invalid image format")

        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image_h, image_w, _ = frame.shape

        # Process face mesh
        results = face_mesh.process(frame_rgb)

        # Analyze emotion
        emotion = analyze_emotion(frame)

        response_data = {
            "emotion": emotion,
            "face_detected": False,
            "gaze_direction": "UNKNOWN",
            "concentration_score": 0.0,
            "blinking": False,
            "gaze_positions": {
                "left_eye": {"horizontal": 0.5, "vertical": 0.5},
                "right_eye": {"horizontal": 0.5, "vertical": 0.5},
            },
        }

        if results.multi_face_landmarks:
            # Only the first detected face is reported.
            landmarks = results.multi_face_landmarks[0].landmark
            response_data["face_detected"] = True
            response_data.update(_analyze_face_landmarks(landmarks, image_w, image_h))

        return response_data

    except HTTPException:
        # Let deliberate HTTP errors (e.g. the 400s above) through unchanged
        # instead of rewrapping them as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Processing error: {str(e)}") from e


@app.get("/")
async def root():
    """Return a short banner identifying the service."""
    return {"message": "Gaze and Emotion Detection API"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)