# mockInterview / Video_Analysis.py
# Mock interview body language analyzer (MediaPipe Tasks API).
# Source revision: "initial commit" (3b7703c).
import argparse
import json
import math
from dataclasses import dataclass
from typing import Optional
import cv2
import numpy as np
import mediapipe as mp
from mediapipe.tasks.python import vision
from mediapipe.tasks.python.core.base_options import BaseOptions
# Module-level alias for vision.RunningMode; the landmarkers in this file are
# all configured with RunningMode.VIDEO.
RunningMode = vision.RunningMode
# Minimum pose-landmark visibility for a landmark to be used in posture metrics.
VISIBILITY_THRESHOLD = 0.5

# Pose landmark indices (BlazePose, 33 points)
POSE_IDX = dict(
    left_shoulder=11,
    right_shoulder=12,
    left_hip=23,
    right_hip=24,
)

# Face mesh indices
NOSE_TIP_IDX = 1
LEFT_EYE_OUTER_IDX = 33
RIGHT_EYE_OUTER_IDX = 263

# ARKit blendshapes averaged into the per-frame blink score
BLINK_BLENDSHAPES = [f"eyeBlink{side}" for side in ("Left", "Right")]

# Brow tension: browInnerUp is the primary anxiety/tension indicator
BROW_TENSION_BLENDSHAPES = [
    "browDownLeft",   # anger
    "browDownRight",
    "browInnerUp",    # worry / nervousness
]

# Fingertip indices only (more accurate than palm centroid)
FINGERTIP_INDICES = list(range(4, 21, 4))
# ---------------------------------------------------------------------------
# Geometry helpers
# ---------------------------------------------------------------------------
def euclidean(p1, p2) -> float:
    """Return the Euclidean distance between two points.

    Both points are converted to NumPy arrays, so any same-length numeric
    sequences (tuples, lists, arrays) are accepted.
    """
    delta = np.subtract(p1, p2)
    distance = np.linalg.norm(delta)
    return float(distance)
def rotation_matrix_to_euler_angles(R: np.ndarray):
    """Convert a 3x3 rotation matrix to Euler angles in degrees.

    Returns a 3-tuple of the rotations about the x, y and z axes, which
    this module treats as (pitch, yaw, roll).  When the matrix is close
    to gimbal lock (the x/y magnitude of the first column is below 1e-6)
    the z rotation is reported as 0 and the x rotation is derived from
    the second row instead.
    """
    planar_norm = math.sqrt(R[0, 0] ** 2 + R[1, 0] ** 2)

    # The y rotation uses the same formula in both regimes.
    about_y = math.atan2(-R[2, 0], planar_norm)

    if planar_norm < 1e-6:
        about_x = math.atan2(-R[1, 2], R[1, 1])
        about_z = 0.0
    else:
        about_x = math.atan2(R[2, 1], R[2, 2])
        about_z = math.atan2(R[1, 0], R[0, 0])

    return (
        math.degrees(about_x),
        math.degrees(about_y),
        math.degrees(about_z),
    )
def angle_from_horizontal(p_left, p_right) -> float:
    """Return the tilt of the line through two points, in degrees.

    The line is treated as undirected: the result is folded into the
    half-open range (-90, 90], so swapping the two points does not change
    the answer and a level line is always 0.

    Why the fold matters: the raw ``atan2(dy, dx)`` is direction
    dependent.  When the first point lies to the image-right of the
    second (dx < 0), a nearly level line yields values close to +180 or
    -180 that flip sign with tiny changes in dy, which makes means and
    medians of the angle meaningless.

    Args:
        p_left: (x, y) of the first point.
        p_right: (x, y) of the second point.

    Returns:
        Tilt angle in degrees within (-90, 90].
    """
    dx = p_right[0] - p_left[0]
    dy = p_right[1] - p_left[1]
    angle = math.degrees(math.atan2(dy, dx))

    # Fold the directed angle (-180, 180] onto the undirected range (-90, 90].
    if angle > 90.0:
        angle -= 180.0
    elif angle <= -90.0:
        angle += 180.0
    return angle
def angle_from_vertical(p_top, p_bottom) -> float:
    """Return the angle, in degrees, between the top->bottom vector and the +y axis.

    A point directly below ``p_top`` (larger y) gives 0; a positive x
    offset of the bottom point gives a positive angle.
    """
    top_x, top_y = p_top[0], p_top[1]
    bottom_x, bottom_y = p_bottom[0], p_bottom[1]
    lean_radians = math.atan2(bottom_x - top_x, bottom_y - top_y)
    return math.degrees(lean_radians)
def blendshape_score(blendshapes, names) -> Optional[float]:
    """Average the scores of the named blendshape categories.

    Args:
        blendshapes: iterable of objects exposing ``category_name`` and
            ``score`` attributes, or a falsy value when none are available.
        names: category names to average; names that are absent from
            ``blendshapes`` are ignored.

    Returns:
        The mean score of the categories that were found, or ``None`` when
        ``blendshapes`` is empty/None or none of ``names`` is present.
    """
    if not blendshapes:
        return None

    score_by_name = {}
    for category in blendshapes:
        score_by_name[category.category_name] = category.score

    selected = [score_by_name[name] for name in names if name in score_by_name]
    if not selected:
        return None
    return float(np.mean(selected))
# ---------------------------------------------------------------------------
# Per-frame raw metrics container
# ---------------------------------------------------------------------------
@dataclass
class FrameMetrics:
    """Raw measurements extracted from a single processed video frame.

    Only ``timestamp`` is required; every other field defaults to
    "not detected" (``False`` / ``None``) and is filled in by the analyzer
    when the corresponding landmarks are available.
    """

    # Time of the frame in seconds.
    timestamp: float

    # --- detection flags --------------------------------------------------
    face_detected: bool = False
    pose_detected: bool = False
    hand_detected: bool = False

    # --- face: blendshape-derived scores ------------------------------------
    # Mean of the blink blendshape scores (higher = eyes more closed).
    blink_score: Optional[float] = None
    # Set during blink detection, after the blink threshold is calibrated.
    is_blink_frame: bool = False
    # Mean of the brow-tension blendshape scores.
    brow_tension_score: Optional[float] = None

    # --- face: head orientation ---------------------------------------------
    looking_at_camera: Optional[bool] = None
    yaw: Optional[float] = None
    pitch: Optional[float] = None

    # --- pose -----------------------------------------------------------------
    shoulder_tilt_deg: Optional[float] = None
    torso_lean_deg: Optional[float] = None

    # --- face: position and size in pixels -----------------------------------
    # Nose-tip position.
    head_x: Optional[float] = None
    head_y: Optional[float] = None
    # Distance between the two outer-eye landmarks; used to normalise distances.
    face_scale: Optional[float] = None

    # --- hands ----------------------------------------------------------------
    # Smallest hand-to-nose distance divided by face_scale.
    hand_to_face_ratio: Optional[float] = None
    is_face_touch: bool = False
# ---------------------------------------------------------------------------
# Main analyzer
# ---------------------------------------------------------------------------
class BodyLanguageAnalyzer:
def __init__(
self,
pose_model_path: str,
face_model_path: str,
hand_model_path: str,
calibration_seconds: float = 5.0,
window_seconds: float = 1.0,
blink_score_threshold: float = 0.35, # sensible default for eyeBlinkLeft/Right
blink_min_consec_frames: int = 2,
gaze_yaw_threshold_deg: float = 20.0,
gaze_pitch_threshold_deg: float = 15.0,
face_touch_distance_ratio: float = 2.5,
posture_deviation_threshold_deg: float = 10.0,
process_every_n_frames: int = 1,
):
self.pose_model_path = pose_model_path
self.face_model_path = face_model_path
self.hand_model_path = hand_model_path
self.calibration_seconds = calibration_seconds
self.window_seconds = window_seconds
self.blink_score_threshold = blink_score_threshold
self.blink_min_consec_frames = blink_min_consec_frames
self.gaze_yaw_threshold_deg = gaze_yaw_threshold_deg
self.gaze_pitch_threshold_deg = gaze_pitch_threshold_deg
self.face_touch_distance_ratio = face_touch_distance_ratio
self.posture_deviation_threshold_deg = posture_deviation_threshold_deg
self.process_every_n_frames = max(1, process_every_n_frames)
# ------------------------------------------------------------------
def _build_landmarkers(self):
pose = vision.PoseLandmarker.create_from_options(
vision.PoseLandmarkerOptions(
base_options=BaseOptions(model_asset_path=self.pose_model_path),
running_mode=RunningMode.VIDEO,
)
)
face = vision.FaceLandmarker.create_from_options(
vision.FaceLandmarkerOptions(
base_options=BaseOptions(model_asset_path=self.face_model_path),
running_mode=RunningMode.VIDEO,
output_face_blendshapes=True,
output_facial_transformation_matrixes=True,
)
)
hand = vision.HandLandmarker.create_from_options(
vision.HandLandmarkerOptions(
base_options=BaseOptions(model_asset_path=self.hand_model_path),
running_mode=RunningMode.VIDEO,
num_hands=2,
)
)
return pose, face, hand
# ------------------------------------------------------------------
# Calibrate blink threshold per-person from the first N seconds
# ------------------------------------------------------------------
def _calibrate_blink_threshold(
self, frames: list[FrameMetrics]) -> float:
"""
FIX: The eyeBlinkLeft/Right blendshape is HIGH when eye is CLOSED
(approaching 1.0 = fully closed) and LOW when eye is open (≈0.0–0.2).
Strategy:
1. Collect blink scores from the first 10 s (mostly open-eye baseline).
2. Compute mean of open-eye scores.
3. Set threshold = mean + 1.5 * std → catches spikes above normal open-eye level.
4. Clamp to [0.25, 0.70] for safety.
This means "is_closed = blink_score >= threshold" is correct:
a spike in the blink score above the open-eye baseline = blink.
"""
cutoff = 10.0
scores = [
f.blink_score
for f in frames
if f.timestamp <= cutoff and f.blink_score is not None
]
if len(scores) < 10:
return self.blink_score_threshold # not enough data → fallback
mean = float(np.mean(scores))
std = float(np.std(scores))
# Open-eye scores are low (≈0.05–0.15). A blink = spike above that.
# mean + 1.5*std gives a threshold that is clearly above normal noise.
threshold = mean + 1.5 * std
# Clamp: never lower than 0.25 (avoid noise triggers),
# never higher than 0.70 (would miss real blinks).
return float(np.clip(threshold, 0.25, 0.70))
# ------------------------------------------------------------------
# Classify head movement as stable / natural / nervous
# ------------------------------------------------------------------
def _classify_head_movement(
self, displacements: list[float]) -> str:
"""
Distinguish between:
stable — barely any movement
natural — occasional deliberate nods / turns
nervous — frequent small rapid movements
"""
if not displacements:
return "stable"
mean_disp = float(np.mean(displacements))
rapid_moves = sum(1 for d in displacements if d > 0.05)
frequency = rapid_moves / len(displacements)
if mean_disp < 0.02:
return "stable"
elif frequency > 0.3:
return "nervous"
else:
return "natural"
# ------------------------------------------------------------------
def process_video(self, video_path: str) -> dict:
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise IOError(f"Could not open video file: {video_path}")
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
pose_lm, face_lm, hand_lm = self._build_landmarkers()
raw_frames: list[FrameMetrics] = []
blink_timestamps: list[float] = []
# ── Pass 1: collect all frames ───────────────────────────────
frame_idx = 0
try:
while True:
ok, frame = cap.read()
if not ok:
break
if frame_idx % self.process_every_n_frames != 0:
frame_idx += 1
continue
timestamp = frame_idx / fps
timestamp_ms = int(timestamp * 1000)
h, w = frame.shape[:2]
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
mp_image = mp.Image(
image_format=mp.ImageFormat.SRGB, data=rgb)
pose_result = pose_lm.detect_for_video(
mp_image, timestamp_ms)
face_result = face_lm.detect_for_video(
mp_image, timestamp_ms)
hand_result = hand_lm.detect_for_video(
mp_image, timestamp_ms)
fm = FrameMetrics(timestamp=timestamp)
# ── Face ──────────────────────────────────────────────
if face_result.face_landmarks:
fm.face_detected = True
fl = face_result.face_landmarks[0]
blendshapes = (
face_result.face_blendshapes[0]
if face_result.face_blendshapes else None
)
fm.blink_score = blendshape_score(
blendshapes, BLINK_BLENDSHAPES)
fm.brow_tension_score = blendshape_score(
blendshapes, BROW_TENSION_BLENDSHAPES)
fm.face_scale = euclidean(
(fl[LEFT_EYE_OUTER_IDX].x * w,
fl[LEFT_EYE_OUTER_IDX].y * h),
(fl[RIGHT_EYE_OUTER_IDX].x * w,
fl[RIGHT_EYE_OUTER_IDX].y * h),
)
fm.head_x = fl[NOSE_TIP_IDX].x * w
fm.head_y = fl[NOSE_TIP_IDX].y * h
if face_result.facial_transformation_matrixes:
matrix = (
face_result.facial_transformation_matrixes[0])
rotation = matrix[:3, :3]
pitch, yaw, _roll = (
rotation_matrix_to_euler_angles(rotation))
fm.yaw, fm.pitch = yaw, pitch
fm.looking_at_camera = (
abs(yaw) <= self.gaze_yaw_threshold_deg
and abs(pitch) <= self.gaze_pitch_threshold_deg
)
# ── Pose ──────────────────────────────────────────────
if pose_result.pose_landmarks:
pl = pose_result.pose_landmarks[0]
def vis_ok(i):
v = pl[i].visibility
return v is None or v >= VISIBILITY_THRESHOLD
if (vis_ok(POSE_IDX["left_shoulder"])
and vis_ok(POSE_IDX["right_shoulder"])):
fm.pose_detected = True
ls = (pl[POSE_IDX["left_shoulder"]].x * w,
pl[POSE_IDX["left_shoulder"]].y * h)
rs = (pl[POSE_IDX["right_shoulder"]].x * w,
pl[POSE_IDX["right_shoulder"]].y * h)
fm.shoulder_tilt_deg = angle_from_horizontal(
ls, rs)
if (vis_ok(POSE_IDX["left_hip"])
and vis_ok(POSE_IDX["right_hip"])):
lh = (pl[POSE_IDX["left_hip"]].x * w,
pl[POSE_IDX["left_hip"]].y * h)
rh = (pl[POSE_IDX["right_hip"]].x * w,
pl[POSE_IDX["right_hip"]].y * h)
shoulder_mid = (
(ls[0] + rs[0]) / 2,
(ls[1] + rs[1]) / 2,
)
hip_mid = (
(lh[0] + rh[0]) / 2,
(lh[1] + rh[1]) / 2,
)
fm.torso_lean_deg = angle_from_vertical(
shoulder_mid, hip_mid)
# ── Hands ─────────────────────────────────────────────
if hand_result.hand_landmarks:
fm.hand_detected = True
if (fm.face_detected
and fm.head_x is not None
and fm.face_scale):
min_ratio = None
for hand_pts in hand_result.hand_landmarks:
fingertips = [
hand_pts[i] for i in FINGERTIP_INDICES]
cx = float(
np.mean([p.x for p in fingertips])) * w
cy = float(
np.mean([p.y for p in fingertips])) * h
dist = euclidean(
(cx, cy), (fm.head_x, fm.head_y))
ratio = dist / fm.face_scale
if min_ratio is None or ratio < min_ratio:
min_ratio = ratio
if min_ratio is not None:
fm.hand_to_face_ratio = min_ratio
fm.is_face_touch = (
min_ratio <= self.face_touch_distance_ratio)
raw_frames.append(fm)
frame_idx += 1
finally:
cap.release()
pose_lm.close()
face_lm.close()
hand_lm.close()
# ── FIX: calibrate blink threshold then re-detect blinks ─────
calibrated_threshold = self._calibrate_blink_threshold(raw_frames)
below_threshold_run = 0
for fm in raw_frames:
fm.is_blink_frame = False # reset
if fm.blink_score is not None:
# eyeBlinkLeft/Right is HIGH when closed → spike = blink
is_closed = fm.blink_score >= calibrated_threshold
if is_closed:
below_threshold_run += 1
else:
# Transition: was closed for ≥ N frames → count as blink
if below_threshold_run >= self.blink_min_consec_frames:
blink_timestamps.append(fm.timestamp)
fm.is_blink_frame = True
below_threshold_run = 0
# Baseline uses median (robust to nervous first seconds)
baseline = self._compute_baseline(raw_frames)
time_series = self._aggregate_windows(
raw_frames, blink_timestamps, baseline)
summary = self._compute_summary(
time_series, blink_timestamps, raw_frames,
calibrated_threshold)
return {
"fps": fps,
"duration_seconds": frame_idx / fps if fps else None,
"calibration_baseline": baseline,
"calibrated_blink_threshold": calibrated_threshold,
"time_series": time_series,
"summary": summary,
}
# ------------------------------------------------------------------
# Baseline uses median (robust to outliers in first few seconds)
# ------------------------------------------------------------------
def _compute_baseline(
self, frames: list[FrameMetrics]) -> dict:
cutoff = self.calibration_seconds
shoulder_vals = [
f.shoulder_tilt_deg for f in frames
if f.timestamp <= cutoff
and f.shoulder_tilt_deg is not None
]
torso_vals = [
f.torso_lean_deg for f in frames
if f.timestamp <= cutoff
and f.torso_lean_deg is not None
]
return {
"shoulder_tilt_deg": (
float(np.median(shoulder_vals))
if shoulder_vals else None),
"torso_lean_deg": (
float(np.median(torso_vals))
if torso_vals else None),
"samples_used": len(shoulder_vals),
}
# ------------------------------------------------------------------
def _aggregate_windows(
self, frames, blink_timestamps, baseline) -> list[dict]:
if not frames:
return []
total_duration = frames[-1].timestamp
n_windows = int(total_duration // self.window_seconds) + 1
time_series = []
prev_head_pos = None
for w_idx in range(n_windows):
w_start = w_idx * self.window_seconds
w_end = w_start + self.window_seconds
window_frames = [
f for f in frames
if w_start <= f.timestamp < w_end
]
if not window_frames:
continue
looking_flags = [
f.looking_at_camera for f in window_frames
if f.looking_at_camera is not None
]
eye_contact_pct = (
float(np.mean(looking_flags) * 100)
if looking_flags else None)
shoulder_vals = [
f.shoulder_tilt_deg for f in window_frames
if f.shoulder_tilt_deg is not None
]
torso_vals = [
f.torso_lean_deg for f in window_frames
if f.torso_lean_deg is not None
]
shoulder_dev = (
float(np.mean(shoulder_vals))
- baseline["shoulder_tilt_deg"]
if shoulder_vals
and baseline.get("shoulder_tilt_deg") is not None
else None
)
torso_dev = (
float(np.mean(torso_vals)) - baseline["torso_lean_deg"]
if torso_vals
and baseline.get("torso_lean_deg") is not None
else None
)
poor_posture = (
(shoulder_dev is not None
and abs(shoulder_dev)
> self.posture_deviation_threshold_deg)
or (torso_dev is not None
and abs(torso_dev)
> self.posture_deviation_threshold_deg)
)
displacements = []
for f in window_frames:
if f.head_x is not None and f.face_scale:
if prev_head_pos is not None:
disp = (
euclidean(
(f.head_x, f.head_y), prev_head_pos)
/ f.face_scale
)
displacements.append(disp)
prev_head_pos = (f.head_x, f.head_y)
head_movement_score = (
float(np.mean(displacements))
if displacements else None)
head_movement_type = self._classify_head_movement(displacements)
brow_vals = [
f.brow_tension_score for f in window_frames
if f.brow_tension_score is not None
]
brow_tension = (
float(np.mean(brow_vals)) if brow_vals else None)
face_touch_count = sum(
1 for f in window_frames if f.is_face_touch)
blinks_in_window = sum(
1 for t in blink_timestamps
if w_start <= t < w_end)
time_series.append({
"window_start": round(w_start, 2),
"window_end": round(w_end, 2),
"eye_contact_pct": eye_contact_pct,
"shoulder_deviation_deg": shoulder_dev,
"torso_deviation_deg": torso_dev,
"poor_posture_flag": poor_posture,
"head_movement_score": head_movement_score,
"head_movement_type": head_movement_type,
"brow_tension_score": brow_tension,
"face_touch_count": face_touch_count,
"blink_count": blinks_in_window,
})
return time_series
# ------------------------------------------------------------------
def _compute_summary(
self, time_series, blink_timestamps,
frames, calibrated_threshold: float) -> dict:
duration_min = (
frames[-1].timestamp / 60.0) if frames else 0.0
eye_contact_vals = [
w["eye_contact_pct"] for w in time_series
if w["eye_contact_pct"] is not None
]
head_movement_vals = [
w["head_movement_score"] for w in time_series
if w["head_movement_score"] is not None
]
brow_vals = [
w["brow_tension_score"] for w in time_series
if w["brow_tension_score"] is not None
]
movement_types = [
w["head_movement_type"] for w in time_series
if w["head_movement_type"] is not None
]
dominant_movement = (
max(set(movement_types), key=movement_types.count)
if movement_types else "stable"
)
return {
"avg_eye_contact_pct": (
float(np.mean(eye_contact_vals))
if eye_contact_vals else None),
"poor_posture_window_pct": (
float(
np.mean([w["poor_posture_flag"]
for w in time_series]) * 100)
if time_series else None),
"avg_head_movement_score": (
float(np.mean(head_movement_vals))
if head_movement_vals else None),
"dominant_head_movement_type": dominant_movement,
"avg_brow_tension_score": (
float(np.mean(brow_vals))
if brow_vals else None),
"total_face_touch_events": sum(
w["face_touch_count"] for w in time_series),
"blink_rate_per_minute": (
len(blink_timestamps) / duration_min
if duration_min > 0 else None),
"calibrated_blink_threshold": calibrated_threshold,
"frames_with_face_detected_pct": (
float(np.mean(
[f.face_detected for f in frames]) * 100)
if frames else None),
"frames_with_pose_detected_pct": (
float(np.mean(
[f.pose_detected for f in frames]) * 100)
if frames else None),
"frames_with_hand_detected_pct": (
float(np.mean(
[f.hand_detected for f in frames]) * 100)
if frames else None),
}
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main():
    """Command-line entry point.

    Parses the video path, the three model paths and the optional tuning
    flags, runs the analyzer, writes the full report as JSON to the output
    path and prints the summary section to stdout.
    """
    parser = argparse.ArgumentParser(
        description="Mock interview body language analyzer (MediaPipe Tasks API)")
    parser.add_argument("video_path", help="Path to the interview video file")
    for model_flag in ("--pose-model", "--face-model", "--hand-model"):
        parser.add_argument(model_flag, required=True)
    parser.add_argument("-o", "--output", default="body_language_report.json")
    for seconds_flag, default_seconds in (
            ("--calibration-seconds", 5.0),
            ("--window-seconds", 1.0)):
        parser.add_argument(seconds_flag, type=float, default=default_seconds)
    parser.add_argument("--process-every-n-frames", type=int, default=1)
    args = parser.parse_args()

    analyzer_options = {
        "pose_model_path": args.pose_model,
        "face_model_path": args.face_model,
        "hand_model_path": args.hand_model,
        "calibration_seconds": args.calibration_seconds,
        "window_seconds": args.window_seconds,
        "process_every_n_frames": args.process_every_n_frames,
    }
    result = BodyLanguageAnalyzer(**analyzer_options).process_video(
        args.video_path)

    with open(args.output, "w", encoding="utf-8") as report_file:
        json.dump(result, report_file, indent=2, ensure_ascii=False)

    print(f"Analysis complete. Report written to {args.output}")
    print(json.dumps(result["summary"], indent=2))


if __name__ == "__main__":
    main()