Spaces:
Running
Running
import argparse
import bisect
import json
import math
from dataclasses import dataclass
from typing import Optional

import cv2
import mediapipe as mp
import numpy as np
from mediapipe.tasks.python import vision
from mediapipe.tasks.python.core.base_options import BaseOptions
# Short alias for the running-mode enum passed to every landmarker.
RunningMode = vision.RunningMode

# Minimum landmark visibility for a pose landmark to be used in posture
# metrics; landmarks whose visibility is None are accepted as well.
VISIBILITY_THRESHOLD = 0.5

# Pose landmark indices (BlazePose, 33 points)
POSE_IDX = {
    "left_shoulder": 11,
    "right_shoulder": 12,
    "left_hip": 23,
    "right_hip": 24,
}

# Face mesh indices.  The two outer eye corners give the face scale used to
# normalise distances; the nose tip is used as the head position.
NOSE_TIP_IDX = 1
LEFT_EYE_OUTER_IDX = 33
RIGHT_EYE_OUTER_IDX = 263

# ARKit blendshapes averaged into the per-frame blink score.
BLINK_BLENDSHAPES = ["eyeBlinkLeft", "eyeBlinkRight"]

# Brow tension: browInnerUp is the primary anxiety/tension indicator
BROW_TENSION_BLENDSHAPES = [
    "browDownLeft",  # anger
    "browDownRight",
    "browInnerUp",  # worry / nervousness
]

# Fingertip indices only (more accurate than palm centroid)
FINGERTIP_INDICES = [4, 8, 12, 16, 20]
| # --------------------------------------------------------------------------- | |
| # Geometry helpers | |
| # --------------------------------------------------------------------------- | |
def euclidean(p1, p2) -> float:
    """Return the straight-line distance between two points.

    Both points may be any array-like of matching shape; the result is a
    plain Python float.
    """
    delta = np.array(p1) - np.array(p2)
    distance = np.linalg.norm(delta)
    return float(distance)
def rotation_matrix_to_euler_angles(R: np.ndarray):
    """Convert a 3x3 rotation matrix to (pitch, yaw, roll) in degrees.

    When the first-column norm in the XY plane is nearly zero the
    decomposition is degenerate; roll is then fixed to 0 and pitch is
    taken from the second row instead.
    """
    planar_norm = math.sqrt(R[0, 0] ** 2 + R[1, 0] ** 2)

    # Yaw uses the same expression in the regular and the degenerate case.
    yaw_rad = math.atan2(-R[2, 0], planar_norm)

    if planar_norm < 1e-6:
        pitch_rad = math.atan2(-R[1, 2], R[1, 1])
        roll_rad = 0.0
    else:
        pitch_rad = math.atan2(R[2, 1], R[2, 2])
        roll_rad = math.atan2(R[1, 0], R[0, 0])

    return (
        math.degrees(pitch_rad),
        math.degrees(yaw_rad),
        math.degrees(roll_rad),
    )
def angle_from_horizontal(p_left, p_right) -> float:
    """Return the signed angle, in degrees, of the segment p_left -> p_right
    measured against the x axis (0 when both points share the same y)."""
    rise = p_right[1] - p_left[1]
    run = p_right[0] - p_left[0]
    angle_rad = math.atan2(rise, run)
    return math.degrees(angle_rad)
def angle_from_vertical(p_top, p_bottom) -> float:
    """Return the signed angle, in degrees, of the segment p_top -> p_bottom
    measured against the y axis (0 when both points share the same x)."""
    sideways = p_bottom[0] - p_top[0]
    downward = p_bottom[1] - p_top[1]
    # Arguments are swapped relative to the usual atan2(dy, dx) so that the
    # angle is taken from the vertical axis rather than the horizontal one.
    angle_rad = math.atan2(sideways, downward)
    return math.degrees(angle_rad)
def blendshape_score(blendshapes, names) -> Optional[float]:
    """Average the scores of the named blendshape categories.

    ``blendshapes`` is an iterable of objects exposing ``category_name`` and
    ``score``.  Names that are not present are ignored.  Returns ``None``
    when ``blendshapes`` is empty/None or none of ``names`` is present.
    """
    if not blendshapes:
        return None

    score_by_name = {}
    for category in blendshapes:
        score_by_name[category.category_name] = category.score

    selected = [score_by_name[name] for name in names if name in score_by_name]
    if not selected:
        return None
    return float(np.mean(selected))
| # --------------------------------------------------------------------------- | |
| # Per-frame raw metrics container | |
| # --------------------------------------------------------------------------- | |
@dataclass
class FrameMetrics:
    """Raw measurements extracted from a single processed video frame.

    Only ``timestamp`` is required; every other field defaults to
    "not detected" (``False`` / ``None``) and is filled in when the
    corresponding detector produces a result for the frame.

    The ``@dataclass`` decorator is required: the analyzer constructs
    instances with ``FrameMetrics(timestamp=...)``, which needs the
    generated ``__init__``.
    """

    # Frame time in seconds from the start of the video.
    timestamp: float

    # Which detectors produced a usable result for this frame.
    face_detected: bool = False
    pose_detected: bool = False
    hand_detected: bool = False

    # Mean eye-blink blendshape score, and whether a blink was registered
    # on this frame.
    blink_score: Optional[float] = None
    is_blink_frame: bool = False

    # Mean brow-tension blendshape score.
    brow_tension_score: Optional[float] = None

    # Head orientation in degrees and the gaze flag derived from it.
    looking_at_camera: Optional[bool] = None
    yaw: Optional[float] = None
    pitch: Optional[float] = None

    # Posture angles in degrees.
    shoulder_tilt_deg: Optional[float] = None
    torso_lean_deg: Optional[float] = None

    # Nose-tip position in pixels and outer-eye distance in pixels.
    head_x: Optional[float] = None
    head_y: Optional[float] = None
    face_scale: Optional[float] = None

    # Closest fingertip-centroid distance to the nose, in face-scale units,
    # and whether it is within the face-touch threshold.
    hand_to_face_ratio: Optional[float] = None
    is_face_touch: bool = False
| # --------------------------------------------------------------------------- | |
| # Main analyzer | |
| # --------------------------------------------------------------------------- | |
class BodyLanguageAnalyzer:
    """Analyse a video for body-language signals using MediaPipe landmarkers.

    ``process_video`` runs pose, face and hand landmarkers on the video,
    stores one ``FrameMetrics`` per processed frame, detects blinks with a
    per-video calibrated threshold, and aggregates everything into
    fixed-length time windows plus an overall summary.
    """

    def __init__(
        self,
        pose_model_path: str,
        face_model_path: str,
        hand_model_path: str,
        calibration_seconds: float = 5.0,
        window_seconds: float = 1.0,
        blink_score_threshold: float = 0.35,  # sensible default for eyeBlinkLeft/Right
        blink_min_consec_frames: int = 2,
        gaze_yaw_threshold_deg: float = 20.0,
        gaze_pitch_threshold_deg: float = 15.0,
        face_touch_distance_ratio: float = 2.5,
        posture_deviation_threshold_deg: float = 10.0,
        process_every_n_frames: int = 1,
    ):
        """Store the analysis configuration.

        Args:
            pose_model_path: Path to the pose landmarker model file.
            face_model_path: Path to the face landmarker model file.
            hand_model_path: Path to the hand landmarker model file.
            calibration_seconds: Length of the initial span used for the
                posture baseline.
            window_seconds: Length of each aggregation window; must be > 0.
            blink_score_threshold: Fallback blink threshold used when there
                are too few samples to calibrate one.
            blink_min_consec_frames: Minimum run of closed-eye frames that
                counts as a blink.
            gaze_yaw_threshold_deg: Maximum |yaw| still counted as looking
                at the camera.
            gaze_pitch_threshold_deg: Maximum |pitch| still counted as
                looking at the camera.
            face_touch_distance_ratio: Maximum hand-to-nose distance, in
                face-scale units, that counts as a face touch.
            posture_deviation_threshold_deg: Deviation from the baseline
                above which a window is flagged as poor posture.
            process_every_n_frames: Analyse every n-th frame (values below
                1 are treated as 1).

        Raises:
            ValueError: If ``window_seconds`` is not a positive number.
        """
        # Fail fast: a non-positive window would otherwise only surface as
        # a ZeroDivisionError (or an empty report) after the whole video
        # has already been processed.
        if not window_seconds > 0:
            raise ValueError(
                f"window_seconds must be positive, got {window_seconds!r}")

        self.pose_model_path = pose_model_path
        self.face_model_path = face_model_path
        self.hand_model_path = hand_model_path
        self.calibration_seconds = calibration_seconds
        self.window_seconds = window_seconds
        self.blink_score_threshold = blink_score_threshold
        self.blink_min_consec_frames = blink_min_consec_frames
        self.gaze_yaw_threshold_deg = gaze_yaw_threshold_deg
        self.gaze_pitch_threshold_deg = gaze_pitch_threshold_deg
        self.face_touch_distance_ratio = face_touch_distance_ratio
        self.posture_deviation_threshold_deg = posture_deviation_threshold_deg
        self.process_every_n_frames = max(1, process_every_n_frames)

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _mean_or_none(values) -> Optional[float]:
        """Return the mean of ``values`` as a float, or None when empty."""
        return float(np.mean(values)) if values else None

    @staticmethod
    def _deviation_from(values, baseline_value) -> Optional[float]:
        """Return mean(values) - baseline, or None if either is missing."""
        if not values or baseline_value is None:
            return None
        return float(np.mean(values)) - baseline_value

    # ------------------------------------------------------------------
    def _build_landmarkers(self):
        """Create the pose, face and hand landmarkers in VIDEO mode.

        Returns:
            A ``(pose, face, hand)`` tuple of landmarker objects.  The
            caller is responsible for closing them.

        If creating a later landmarker fails, the ones already created are
        closed before the exception propagates.
        """
        created = []
        try:
            created.append(
                vision.PoseLandmarker.create_from_options(
                    vision.PoseLandmarkerOptions(
                        base_options=BaseOptions(
                            model_asset_path=self.pose_model_path),
                        running_mode=RunningMode.VIDEO,
                    )
                )
            )
            created.append(
                vision.FaceLandmarker.create_from_options(
                    vision.FaceLandmarkerOptions(
                        base_options=BaseOptions(
                            model_asset_path=self.face_model_path),
                        running_mode=RunningMode.VIDEO,
                        output_face_blendshapes=True,
                        output_facial_transformation_matrixes=True,
                    )
                )
            )
            created.append(
                vision.HandLandmarker.create_from_options(
                    vision.HandLandmarkerOptions(
                        base_options=BaseOptions(
                            model_asset_path=self.hand_model_path),
                        running_mode=RunningMode.VIDEO,
                        num_hands=2,
                    )
                )
            )
        except Exception:
            # Do not leak the landmarkers that were created successfully.
            for landmarker in created:
                landmarker.close()
            raise
        pose, face, hand = created
        return pose, face, hand

    # ------------------------------------------------------------------
    # Calibrate blink threshold per-person from the first seconds
    # ------------------------------------------------------------------
    def _calibrate_blink_threshold(
            self, frames: "list[FrameMetrics]") -> float:
        """Derive a per-video blink threshold from the first 10 seconds.

        The eyeBlinkLeft/Right blendshape is HIGH when the eye is closed
        (approaching 1.0) and LOW when it is open, so a blink shows up as
        a spike above the open-eye baseline.

        Strategy:
            1. Collect blink scores from frames with timestamp <= 10 s.
            2. threshold = mean + 1.5 * std of those scores.
            3. Clamp to [0.25, 0.70]: never so low that noise triggers it,
               never so high that real blinks are missed.

        Returns:
            The calibrated threshold, or ``self.blink_score_threshold``
            when fewer than 10 scores are available.
        """
        cutoff = 10.0
        scores = [
            f.blink_score
            for f in frames
            if f.timestamp <= cutoff and f.blink_score is not None
        ]
        if len(scores) < 10:
            return self.blink_score_threshold  # not enough data -> fallback

        threshold = float(np.mean(scores)) + 1.5 * float(np.std(scores))
        return float(np.clip(threshold, 0.25, 0.70))

    # ------------------------------------------------------------------
    # Blink detection over the collected frames
    # ------------------------------------------------------------------
    def _detect_blinks(self, frames, threshold: float) -> "list[float]":
        """Mark blink frames and return the blink timestamps.

        A blink is registered on the first open-eye frame that follows a
        run of at least ``self.blink_min_consec_frames`` closed-eye frames
        (``blink_score >= threshold``).  Frames without a blink score are
        skipped: they neither extend nor reset the current run.  A closed
        run still in progress at the end of ``frames`` is not counted.

        Every frame's ``is_blink_frame`` is reset before detection.
        """
        blink_timestamps = []
        closed_run = 0
        for fm in frames:
            fm.is_blink_frame = False
            if fm.blink_score is None:
                continue
            if fm.blink_score >= threshold:
                closed_run += 1
                continue
            # Eye is open again: the preceding closed run decides.
            if closed_run >= self.blink_min_consec_frames:
                blink_timestamps.append(fm.timestamp)
                fm.is_blink_frame = True
            closed_run = 0
        return blink_timestamps

    # ------------------------------------------------------------------
    # Classify head movement as stable / natural / nervous
    # ------------------------------------------------------------------
    def _classify_head_movement(
            self, displacements: "list[float]") -> str:
        """Classify a window's head displacements.

        Returns:
            "stable"  - no data, or mean displacement below 0.02;
            "nervous" - more than 30 % of displacements exceed 0.05;
            "natural" - anything else.
        """
        if not displacements:
            return "stable"
        if float(np.mean(displacements)) < 0.02:
            return "stable"
        rapid_moves = sum(1 for d in displacements if d > 0.05)
        if rapid_moves / len(displacements) > 0.3:
            return "nervous"
        return "natural"

    # ------------------------------------------------------------------
    # Per-frame extraction
    # ------------------------------------------------------------------
    def _fill_face_metrics(self, fm, face_result, w, h) -> None:
        """Fill blink, brow, head-position and gaze fields from the first face."""
        if not face_result.face_landmarks:
            return
        fm.face_detected = True
        fl = face_result.face_landmarks[0]
        blendshapes = (
            face_result.face_blendshapes[0]
            if face_result.face_blendshapes else None
        )
        fm.blink_score = blendshape_score(blendshapes, BLINK_BLENDSHAPES)
        fm.brow_tension_score = blendshape_score(
            blendshapes, BROW_TENSION_BLENDSHAPES)

        # Outer-eye distance in pixels: the unit for normalised distances.
        fm.face_scale = euclidean(
            (fl[LEFT_EYE_OUTER_IDX].x * w, fl[LEFT_EYE_OUTER_IDX].y * h),
            (fl[RIGHT_EYE_OUTER_IDX].x * w, fl[RIGHT_EYE_OUTER_IDX].y * h),
        )
        fm.head_x = fl[NOSE_TIP_IDX].x * w
        fm.head_y = fl[NOSE_TIP_IDX].y * h

        if face_result.facial_transformation_matrixes:
            matrix = face_result.facial_transformation_matrixes[0]
            pitch, yaw, _roll = rotation_matrix_to_euler_angles(
                matrix[:3, :3])
            fm.yaw, fm.pitch = yaw, pitch
            fm.looking_at_camera = (
                abs(yaw) <= self.gaze_yaw_threshold_deg
                and abs(pitch) <= self.gaze_pitch_threshold_deg
            )

    def _fill_pose_metrics(self, fm, pose_result, w, h) -> None:
        """Fill shoulder tilt and torso lean from the first detected pose."""
        if not pose_result.pose_landmarks:
            return
        pl = pose_result.pose_landmarks[0]

        def vis_ok(i):
            v = pl[i].visibility
            return v is None or v >= VISIBILITY_THRESHOLD

        def pixel(i):
            return (pl[i].x * w, pl[i].y * h)

        if not (vis_ok(POSE_IDX["left_shoulder"])
                and vis_ok(POSE_IDX["right_shoulder"])):
            return
        fm.pose_detected = True
        ls = pixel(POSE_IDX["left_shoulder"])
        rs = pixel(POSE_IDX["right_shoulder"])
        fm.shoulder_tilt_deg = angle_from_horizontal(ls, rs)

        # Torso lean additionally needs both hips.
        if (vis_ok(POSE_IDX["left_hip"])
                and vis_ok(POSE_IDX["right_hip"])):
            lh = pixel(POSE_IDX["left_hip"])
            rh = pixel(POSE_IDX["right_hip"])
            shoulder_mid = ((ls[0] + rs[0]) / 2, (ls[1] + rs[1]) / 2)
            hip_mid = ((lh[0] + rh[0]) / 2, (lh[1] + rh[1]) / 2)
            fm.torso_lean_deg = angle_from_vertical(shoulder_mid, hip_mid)

    def _fill_hand_metrics(self, fm, hand_result, w, h) -> None:
        """Fill hand presence and the closest hand-to-face distance ratio.

        Must run after ``_fill_face_metrics``: the ratio is only computed
        when the frame already has a head position and a face scale.
        """
        if not hand_result.hand_landmarks:
            return
        fm.hand_detected = True
        if not (fm.face_detected
                and fm.head_x is not None
                and fm.face_scale):
            return

        head = (fm.head_x, fm.head_y)
        ratios = []
        for hand_pts in hand_result.hand_landmarks:
            fingertips = [hand_pts[i] for i in FINGERTIP_INDICES]
            cx = float(np.mean([p.x for p in fingertips])) * w
            cy = float(np.mean([p.y for p in fingertips])) * h
            ratios.append(euclidean((cx, cy), head) / fm.face_scale)

        if ratios:
            closest = min(ratios)
            fm.hand_to_face_ratio = closest
            fm.is_face_touch = closest <= self.face_touch_distance_ratio

    def _analyze_frame(self, frame, timestamp, pose_lm, face_lm, hand_lm):
        """Run all three landmarkers on one BGR frame and collect its metrics."""
        timestamp_ms = int(timestamp * 1000)
        h, w = frame.shape[:2]
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)

        pose_result = pose_lm.detect_for_video(mp_image, timestamp_ms)
        face_result = face_lm.detect_for_video(mp_image, timestamp_ms)
        hand_result = hand_lm.detect_for_video(mp_image, timestamp_ms)

        fm = FrameMetrics(timestamp=timestamp)
        self._fill_face_metrics(fm, face_result, w, h)
        self._fill_pose_metrics(fm, pose_result, w, h)
        # Hands last: the face-touch ratio depends on the face fields.
        self._fill_hand_metrics(fm, hand_result, w, h)
        return fm

    # ------------------------------------------------------------------
    def process_video(self, video_path: str) -> dict:
        """Analyse ``video_path`` and return the full report.

        Returns:
            A dict with keys ``fps``, ``duration_seconds``,
            ``calibration_baseline``, ``calibrated_blink_threshold``,
            ``time_series`` and ``summary``.

        Raises:
            IOError: If the video file cannot be opened.
        """
        cap = cv2.VideoCapture(video_path)
        landmarkers = ()
        raw_frames = []
        frame_idx = 0

        # Everything that can fail sits inside the try so that the capture
        # and any landmarkers are released on every exit path, including a
        # failure while loading the models.
        try:
            if not cap.isOpened():
                raise IOError(f"Could not open video file: {video_path}")
            fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
            landmarkers = self._build_landmarkers()
            pose_lm, face_lm, hand_lm = landmarkers

            # Pass 1: collect raw metrics for every n-th frame.
            while True:
                ok, frame = cap.read()
                if not ok:
                    break
                if frame_idx % self.process_every_n_frames == 0:
                    raw_frames.append(
                        self._analyze_frame(
                            frame, frame_idx / fps,
                            pose_lm, face_lm, hand_lm))
                frame_idx += 1
        finally:
            cap.release()
            for landmarker in landmarkers:
                landmarker.close()

        # Pass 2: calibrate the blink threshold, then detect blinks.
        calibrated_threshold = self._calibrate_blink_threshold(raw_frames)
        blink_timestamps = self._detect_blinks(
            raw_frames, calibrated_threshold)

        # Baseline uses median (robust to nervous first seconds)
        baseline = self._compute_baseline(raw_frames)
        time_series = self._aggregate_windows(
            raw_frames, blink_timestamps, baseline)
        summary = self._compute_summary(
            time_series, blink_timestamps, raw_frames,
            calibrated_threshold)

        return {
            "fps": fps,
            "duration_seconds": frame_idx / fps if fps else None,
            "calibration_baseline": baseline,
            "calibrated_blink_threshold": calibrated_threshold,
            "time_series": time_series,
            "summary": summary,
        }

    # ------------------------------------------------------------------
    # Baseline uses median (robust to outliers in first few seconds)
    # ------------------------------------------------------------------
    def _compute_baseline(
            self, frames: "list[FrameMetrics]") -> dict:
        """Return median posture angles over the calibration span.

        Only frames with ``timestamp <= self.calibration_seconds`` are
        used.  ``samples_used`` reports the number of shoulder samples.
        """
        cutoff = self.calibration_seconds
        early = [f for f in frames if f.timestamp <= cutoff]
        shoulder_vals = [
            f.shoulder_tilt_deg for f in early
            if f.shoulder_tilt_deg is not None
        ]
        torso_vals = [
            f.torso_lean_deg for f in early
            if f.torso_lean_deg is not None
        ]
        return {
            "shoulder_tilt_deg": (
                float(np.median(shoulder_vals))
                if shoulder_vals else None),
            "torso_lean_deg": (
                float(np.median(torso_vals))
                if torso_vals else None),
            "samples_used": len(shoulder_vals),
        }

    # ------------------------------------------------------------------
    def _aggregate_windows(
            self, frames, blink_timestamps, baseline) -> "list[dict]":
        """Aggregate frame metrics into consecutive fixed-length windows.

        Args:
            frames: Frame metrics in ascending timestamp order.
            blink_timestamps: Blink times in ascending order.
            baseline: Output of ``_compute_baseline``.

        Returns:
            One dict per non-empty window ``[start, start + window_seconds)``.

        Both inputs must be sorted by time (``process_video`` produces them
        that way): window membership is located by binary search instead
        of rescanning every frame for every window.
        """
        if not frames:
            return []

        timestamps = [f.timestamp for f in frames]
        n_windows = int(timestamps[-1] // self.window_seconds) + 1
        threshold = self.posture_deviation_threshold_deg

        time_series = []
        # Carried across windows so the first frame of a window is compared
        # with the last valid head position of the previous one.
        prev_head_pos = None

        for w_idx in range(n_windows):
            w_start = w_idx * self.window_seconds
            w_end = w_start + self.window_seconds

            # Frames with w_start <= timestamp < w_end.
            first = bisect.bisect_left(timestamps, w_start)
            last = bisect.bisect_left(timestamps, w_end)
            window_frames = frames[first:last]
            if not window_frames:
                continue

            looking_flags = [
                f.looking_at_camera for f in window_frames
                if f.looking_at_camera is not None
            ]
            eye_contact_pct = (
                float(np.mean(looking_flags) * 100)
                if looking_flags else None)

            shoulder_dev = self._deviation_from(
                [f.shoulder_tilt_deg for f in window_frames
                 if f.shoulder_tilt_deg is not None],
                baseline.get("shoulder_tilt_deg"))
            torso_dev = self._deviation_from(
                [f.torso_lean_deg for f in window_frames
                 if f.torso_lean_deg is not None],
                baseline.get("torso_lean_deg"))
            poor_posture = (
                (shoulder_dev is not None and abs(shoulder_dev) > threshold)
                or (torso_dev is not None and abs(torso_dev) > threshold)
            )

            # Frame-to-frame nose displacement in face-scale units.
            displacements = []
            for f in window_frames:
                if f.head_x is None or not f.face_scale:
                    continue
                if prev_head_pos is not None:
                    displacements.append(
                        euclidean((f.head_x, f.head_y), prev_head_pos)
                        / f.face_scale)
                prev_head_pos = (f.head_x, f.head_y)

            brow_vals = [
                f.brow_tension_score for f in window_frames
                if f.brow_tension_score is not None
            ]
            blinks_in_window = (
                bisect.bisect_left(blink_timestamps, w_end)
                - bisect.bisect_left(blink_timestamps, w_start))

            time_series.append({
                "window_start": round(w_start, 2),
                "window_end": round(w_end, 2),
                "eye_contact_pct": eye_contact_pct,
                "shoulder_deviation_deg": shoulder_dev,
                "torso_deviation_deg": torso_dev,
                "poor_posture_flag": poor_posture,
                "head_movement_score": self._mean_or_none(displacements),
                "head_movement_type": self._classify_head_movement(
                    displacements),
                "brow_tension_score": self._mean_or_none(brow_vals),
                "face_touch_count": sum(
                    1 for f in window_frames if f.is_face_touch),
                "blink_count": blinks_in_window,
            })
        return time_series

    # ------------------------------------------------------------------
    def _compute_summary(
            self, time_series, blink_timestamps,
            frames, calibrated_threshold: float) -> dict:
        """Collapse the window series and raw frames into overall figures.

        Averages skip windows where the metric is None.  The dominant head
        movement type is the most frequent one; ties go to the type that
        appears first in the series, so the result is reproducible.
        """
        duration_min = (frames[-1].timestamp / 60.0) if frames else 0.0

        def present(key):
            return [w[key] for w in time_series if w[key] is not None]

        # Insertion-ordered counts make the tie-break deterministic (a set
        # of strings iterates in hash order, which can differ between runs).
        type_counts = {}
        for movement in present("head_movement_type"):
            type_counts[movement] = type_counts.get(movement, 0) + 1
        dominant_movement = (
            max(type_counts, key=type_counts.get)
            if type_counts else "stable")

        def detected_pct(attr):
            if not frames:
                return None
            return float(np.mean([getattr(f, attr) for f in frames]) * 100)

        return {
            "avg_eye_contact_pct": self._mean_or_none(
                present("eye_contact_pct")),
            "poor_posture_window_pct": (
                float(
                    np.mean([w["poor_posture_flag"]
                             for w in time_series]) * 100)
                if time_series else None),
            "avg_head_movement_score": self._mean_or_none(
                present("head_movement_score")),
            "dominant_head_movement_type": dominant_movement,
            "avg_brow_tension_score": self._mean_or_none(
                present("brow_tension_score")),
            "total_face_touch_events": sum(
                w["face_touch_count"] for w in time_series),
            "blink_rate_per_minute": (
                len(blink_timestamps) / duration_min
                if duration_min > 0 else None),
            "calibrated_blink_threshold": calibrated_threshold,
            "frames_with_face_detected_pct": detected_pct("face_detected"),
            "frames_with_pose_detected_pct": detected_pct("pose_detected"),
            "frames_with_hand_detected_pct": detected_pct("hand_detected"),
        }
| # --------------------------------------------------------------------------- | |
| # CLI entry point | |
| # --------------------------------------------------------------------------- | |
def main():
    """Command-line entry point.

    Parses the arguments, runs the analyzer on the given video, writes the
    full report as JSON to the output path, and prints the summary section
    to stdout.
    """
    cli = argparse.ArgumentParser(
        description="Mock interview body language analyzer (MediaPipe Tasks API)")
    cli.add_argument("video_path", help="Path to the interview video file")
    # The three model files are all mandatory.
    for model_flag in ("--pose-model", "--face-model", "--hand-model"):
        cli.add_argument(model_flag, required=True)
    cli.add_argument("-o", "--output", default="body_language_report.json")
    cli.add_argument("--calibration-seconds", type=float, default=5.0)
    cli.add_argument("--window-seconds", type=float, default=1.0)
    cli.add_argument("--process-every-n-frames", type=int, default=1)
    options = cli.parse_args()

    report = BodyLanguageAnalyzer(
        pose_model_path=options.pose_model,
        face_model_path=options.face_model,
        hand_model_path=options.hand_model,
        calibration_seconds=options.calibration_seconds,
        window_seconds=options.window_seconds,
        process_every_n_frames=options.process_every_n_frames,
    ).process_video(options.video_path)

    with open(options.output, "w", encoding="utf-8") as report_file:
        json.dump(report, report_file, indent=2, ensure_ascii=False)

    print(f"Analysis complete. Report written to {options.output}")
    summary_text = json.dumps(report["summary"], indent=2)
    print(summary_text)
# Run the CLI only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()