| import cv2 |
| import mediapipe as mp |
| import numpy as np |
| import pandas as pd |
| import time |
| from collections import deque |
| import math |
| import json |
| import os |
| from pathlib import Path |
| from app.services.processing.eye_contact_analyzer import EyeContactAnalyzer |
| from app.services.processing.eye_contact_analyzer import analyze_eye_contact |
| from app.utils.device_utils import get_available_device |
|
|
| |
# Device reported by the project helper; it is only echoed back in the
# 'device_used' fields of the assessment/result dicts built in this module.
DEVICE = get_available_device()
|
|
class BodyLanguageAnalyzer:
    """Derive interview-oriented body language metrics from MediaPipe Holistic.

    Each call to ``process_frame`` runs the holistic model on one frame and
    updates three kinds of state:

    * rolling per-metric histories (``deque`` objects bounded by
      ``history_size``) that back the percentage/average statistics,
    * ``current_state``, a dict of truthy/falsy flags describing the most
      recent observation of each cue,
    * the previous frame's landmarks, used for frame-to-frame movement.

    Landmark coordinates are consumed exactly as MediaPipe reports them
    (``.x``, ``.y``, ``.z`` attributes on each landmark).
    """

    # (current_state key, overlay text, BGR colour), in on-screen stacking order.
    _STATE_LABELS = (
        ('shoulder_misalignment', "Uneven Shoulders", (0, 0, 255)),
        ('leaning_forward', "Leaning Forward", (0, 255, 0)),
        ('head_tilted', "Head Tilted", (0, 0, 255)),
        ('hand_movement', "Gesturing", (0, 255, 0)),
        ('self_touching', "Self-Touching", (0, 0, 255)),
        ('arms_crossed', "Arms Crossed", (0, 0, 255)),
        ('fidgeting', "Fidgeting", (0, 0, 255)),
        ('pose_shifting', "Shifting Posture", (0, 0, 255)),
    )

    def __init__(self, history_size=100):
        """
        Initialize the body language analyzer for interview assessment.

        Args:
            history_size: Number of frames to keep in history for rolling metrics
        """
        self.mp_holistic = mp.solutions.holistic
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_drawing_styles = mp.solutions.drawing_styles

        self.holistic = self.mp_holistic.Holistic(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
            static_image_mode=False
        )

        self.history_size = history_size
        self.total_frames = 0
        self.start_time = time.time()
        # Session-wide count of frames whose pose shift exceeded the threshold.
        # Kept separately because pose_shift_history forgets old frames.
        self.pose_shift_total = 0

        # Posture histories.
        self.shoulder_alignment_history = deque(maxlen=history_size)
        self.lean_forward_history = deque(maxlen=history_size)
        self.head_tilt_history = deque(maxlen=history_size)

        # Gesture histories.
        self.hand_movement_history = deque(maxlen=history_size)
        self.self_touch_history = deque(maxlen=history_size)
        self.crossing_arms_history = deque(maxlen=history_size)

        # Movement histories.
        self.fidgeting_history = deque(maxlen=history_size)
        self.pose_shift_history = deque(maxlen=history_size)

        # Landmarks of the previous frame, for frame-to-frame movement.
        self.prev_pose_landmarks = None
        self.prev_face_landmarks = None
        self.prev_left_hand_landmarks = None
        self.prev_right_hand_landmarks = None

        # A metric above its threshold raises the matching current_state flag.
        self.thresholds = {
            'shoulder_alignment': 0.05,
            'lean_forward': 0.4,
            'head_tilt': 0.1,
            'hand_movement': 0.03,
            'self_touch': 0.1,
            'crossing_arms': 0.15,
            'fidgeting': 0.02,
            'pose_shift': 0.05
        }

        self.current_state = self._default_state()

    @staticmethod
    def _default_state():
        """Return the state flags of a session in which nothing was observed yet."""
        return {
            'shoulder_misalignment': 0,
            'leaning_forward': 0,
            'head_tilted': 0,
            'hand_movement': 0,
            'self_touching': 0,
            'arms_crossed': 0,
            'fidgeting': 0,
            'pose_shifting': 0,
            'last_pose_shift': 0
        }

    def close(self):
        """Release the resources held by the underlying Holistic solution."""
        self.holistic.close()

    def reset_stats(self):
        """Reset all statistics for a new session.

        Clears the histories, counters, previous-frame landmarks and the
        current state flags, so a new session does not start with cues
        (or a ``last_pose_shift`` timestamp) left over from the previous one.
        """
        self.shoulder_alignment_history.clear()
        self.lean_forward_history.clear()
        self.head_tilt_history.clear()
        self.hand_movement_history.clear()
        self.self_touch_history.clear()
        self.crossing_arms_history.clear()
        self.fidgeting_history.clear()
        self.pose_shift_history.clear()

        self.total_frames = 0
        self.pose_shift_total = 0
        self.start_time = time.time()
        self.prev_pose_landmarks = None
        self.prev_face_landmarks = None
        self.prev_left_hand_landmarks = None
        self.prev_right_hand_landmarks = None
        self.current_state = self._default_state()

    def _calculate_distance(self, point1, point2):
        """Calculate Euclidean distance between two 3D points."""
        return math.sqrt((point1.x - point2.x)**2 +
                         (point1.y - point2.y)**2 +
                         (point1.z - point2.z)**2)

    def _calculate_angle(self, point1, point2, point3):
        """Return the angle in degrees at ``point2`` between ``point1`` and ``point3``.

        Returns 0 when either arm of the angle has zero length.
        """
        vector1 = np.array([point1.x - point2.x, point1.y - point2.y, point1.z - point2.z])
        vector2 = np.array([point3.x - point2.x, point3.y - point2.y, point3.z - point2.z])

        norm1 = np.linalg.norm(vector1)
        norm2 = np.linalg.norm(vector2)
        if not (norm1 > 0 and norm2 > 0):
            return 0

        # Clip guards arccos against rounding slightly outside [-1, 1].
        cosine = np.clip(np.dot(vector1 / norm1, vector2 / norm2), -1.0, 1.0)
        return np.degrees(np.arccos(cosine))

    def _calculate_landmark_movement(self, current_landmark, previous_landmark):
        """Calculate movement between current and previous landmark position.

        Returns 0 when either landmark is ``None``.
        """
        if current_landmark is None or previous_landmark is None:
            return 0
        return self._calculate_distance(current_landmark, previous_landmark)

    def _analyze_shoulder_alignment(self, pose_landmarks):
        """Analyze shoulder alignment (level shoulders vs. one higher than the other).

        Returns the shoulder height difference divided by the horizontal
        shoulder width, or 0 when there is no pose or the width is zero
        (in which case nothing is recorded).
        """
        if pose_landmarks:
            left_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_SHOULDER]
            right_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_SHOULDER]

            height_diff = abs(left_shoulder.y - right_shoulder.y)
            shoulder_width = abs(left_shoulder.x - right_shoulder.x)
            if shoulder_width > 0:
                normalized_diff = height_diff / shoulder_width
                self.shoulder_alignment_history.append(normalized_diff)
                self.current_state['shoulder_misalignment'] = (
                    normalized_diff > self.thresholds['shoulder_alignment'])
                return normalized_diff

        return 0

    def _analyze_lean_forward(self, pose_landmarks):
        """Analyze if the person is leaning forward (towards the camera).

        The score is how much closer to the camera the shoulders are than the
        hips, relative to the torso height (floored at 0.1 to avoid blow-up).
        MediaPipe reports smaller ``z`` for points closer to the camera, so a
        forward lean makes ``hip_z - shoulder_z`` positive.

        Returns the score, or 0 when there is no pose.
        """
        if pose_landmarks:
            left_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_SHOULDER]
            right_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_SHOULDER]
            left_hip = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_HIP]
            right_hip = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_HIP]

            shoulder_z = (left_shoulder.z + right_shoulder.z) / 2
            hip_z = (left_hip.z + right_hip.z) / 2

            shoulder_hip_y_diff = abs((left_shoulder.y + right_shoulder.y)/2 -
                                      (left_hip.y + right_hip.y)/2)

            lean_forward = (hip_z - shoulder_z) / max(shoulder_hip_y_diff, 0.1)

            self.lean_forward_history.append(lean_forward)
            self.current_state['leaning_forward'] = (
                lean_forward > self.thresholds['lean_forward'])
            return lean_forward

        return 0

    def _analyze_head_tilt(self, face_landmarks):
        """Analyze head tilt (left/right).

        Returns the absolute angle, in radians, of the line from face
        landmark 33 to face landmark 263 relative to the image x axis,
        or 0 when there is no face.
        """
        if face_landmarks:
            left_eye = face_landmarks.landmark[33]
            right_eye = face_landmarks.landmark[263]

            angle = math.atan2(right_eye.y - left_eye.y, right_eye.x - left_eye.x)
            tilt = abs(angle)

            self.head_tilt_history.append(tilt)
            self.current_state['head_tilted'] = (
                tilt > self.thresholds['head_tilt'])
            return tilt

        return 0

    def _analyze_hand_movement(self, left_hand, right_hand):
        """Analyze hand movement and gestures.

        Returns the larger frame-to-frame displacement of hand landmark 0
        across both hands (0 when a hand or its previous position is missing).
        """
        movement = 0
        hand_pairs = (
            (left_hand, self.prev_left_hand_landmarks),
            (right_hand, self.prev_right_hand_landmarks),
        )
        for current, previous in hand_pairs:
            if current and previous:
                movement = max(movement, self._calculate_landmark_movement(
                    current.landmark[0], previous.landmark[0]))

        self.hand_movement_history.append(movement)
        self.current_state['hand_movement'] = (
            movement > self.thresholds['hand_movement'])
        return movement

    def _analyze_self_touch(self, pose_landmarks, left_hand, right_hand, face_landmarks):
        """Detect if hands are touching face, hair, or other body parts.

        Only the distance from each hand's landmark 8 to face landmark 4 is
        checked; ``pose_landmarks`` is accepted but not used. The score is
        ``1 - min(5 * distance, 1)``, so 1 means coincident points and 0 means
        a distance of 0.2 or more.

        NOTE(review): hand and face ``z`` values presumably have different
        origins in MediaPipe, which would make the 3D distance approximate -
        confirm whether a 2D distance is preferable.
        """
        self_touch = 0

        if face_landmarks:
            for hand in (left_hand, right_hand):
                if hand:
                    index_tip = hand.landmark[8]
                    nose_tip = face_landmarks.landmark[4]
                    gap = self._calculate_distance(index_tip, nose_tip)
                    self_touch = max(self_touch, 1.0 - min(gap * 5, 1.0))

        self.self_touch_history.append(self_touch)
        self.current_state['self_touching'] = (
            self_touch > self.thresholds['self_touch'])
        return self_touch

    def _analyze_crossing_arms(self, pose_landmarks):
        """Detect if arms are crossed.

        A wrist counts as crossed when it lies on the opposite side of the
        shoulder midline from its own shoulder. Measuring against the actual
        shoulder positions makes the check independent of which image side
        the left shoulder appears on. The score is the mean distance of both
        wrists past the midline, in units of half the shoulder width, capped
        at 1.0. It is 0 unless both wrists are crossed and both elbows are
        below their shoulders, or when the shoulders share the same x
        (no usable width).
        """
        crossing_score = 0

        if pose_landmarks:
            left_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_SHOULDER]
            right_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_SHOULDER]
            left_elbow = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_ELBOW]
            right_elbow = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_ELBOW]
            left_wrist = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_WRIST]
            right_wrist = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_WRIST]

            center_x = (left_shoulder.x + right_shoulder.x) / 2
            half_width = abs(left_shoulder.x - right_shoulder.x) / 2

            # Image y grows downwards, so a larger y means "below".
            elbows_down = (left_elbow.y > left_shoulder.y and
                           right_elbow.y > right_shoulder.y)

            if half_width > 0 and elbows_down:
                # +1 when the left shoulder is at larger x, -1 otherwise.
                left_side = 1 if left_shoulder.x > center_x else -1
                left_cross_amount = (center_x - left_wrist.x) * left_side / half_width
                right_cross_amount = (right_wrist.x - center_x) * left_side / half_width

                if left_cross_amount > 0 and right_cross_amount > 0:
                    crossing_score = min(1.0, (left_cross_amount + right_cross_amount) / 2)

        self.crossing_arms_history.append(crossing_score)
        self.current_state['arms_crossed'] = (
            crossing_score > self.thresholds['crossing_arms'])
        return crossing_score

    def _mean_hand_displacement(self, current_hand, previous_hand):
        """Return the mean per-landmark displacement over up to 21 paired landmarks."""
        pairs = list(zip(current_hand.landmark, previous_hand.landmark))[:21]
        if not pairs:
            return 0
        total_movement = 0
        for current, previous in pairs:
            total_movement += self._calculate_landmark_movement(current, previous)
        return total_movement / len(pairs)

    def _analyze_fidgeting(self, pose_landmarks, left_hand, right_hand):
        """Detect small repetitive movements (fidgeting).

        The score is the larger of the two hands' mean landmark displacement
        since the previous frame; ``pose_landmarks`` is accepted but not used.
        The fidgeting flag is raised only for movement above the fidgeting
        threshold and below the hand-movement (gesture) threshold.
        """
        fidgeting_score = 0
        hand_pairs = (
            (left_hand, self.prev_left_hand_landmarks),
            (right_hand, self.prev_right_hand_landmarks),
        )
        for current, previous in hand_pairs:
            if previous and current:
                fidgeting_score = max(
                    fidgeting_score, self._mean_hand_displacement(current, previous))

        self.fidgeting_history.append(fidgeting_score)
        self.current_state['fidgeting'] = (
            fidgeting_score > self.thresholds['fidgeting'] and
            fidgeting_score < self.thresholds['hand_movement'])
        return fidgeting_score

    def _analyze_pose_shift(self, pose_landmarks):
        """Detect major posture shifts.

        The score is the mean displacement of shoulders, elbows, wrists and
        hips since the previous frame (0 when either frame has no pose).
        A score above the threshold raises ``pose_shifting`` and bumps the
        session-wide shift counter; the flag is lowered again once more than
        3 seconds have passed since the last shift.
        """
        pose_shift = 0

        if pose_landmarks and self.prev_pose_landmarks:
            upper_body_landmarks = [
                self.mp_holistic.PoseLandmark.LEFT_SHOULDER,
                self.mp_holistic.PoseLandmark.RIGHT_SHOULDER,
                self.mp_holistic.PoseLandmark.LEFT_ELBOW,
                self.mp_holistic.PoseLandmark.RIGHT_ELBOW,
                self.mp_holistic.PoseLandmark.LEFT_WRIST,
                self.mp_holistic.PoseLandmark.RIGHT_WRIST,
                self.mp_holistic.PoseLandmark.LEFT_HIP,
                self.mp_holistic.PoseLandmark.RIGHT_HIP
            ]

            total_movement = 0
            for landmark_idx in upper_body_landmarks:
                total_movement += self._calculate_landmark_movement(
                    pose_landmarks.landmark[landmark_idx],
                    self.prev_pose_landmarks.landmark[landmark_idx]
                )
            pose_shift = total_movement / len(upper_body_landmarks)

        self.pose_shift_history.append(pose_shift)

        current_time = time.time()
        if pose_shift > self.thresholds['pose_shift']:
            self.pose_shift_total += 1
            self.current_state['pose_shifting'] = 1
            self.current_state['last_pose_shift'] = current_time
        elif current_time - self.current_state['last_pose_shift'] > 3:
            self.current_state['pose_shifting'] = 0

        return pose_shift

    def _draw_landmarks(self, annotated_frame, results):
        """Draw the detected pose, face mesh and hand landmarks onto the frame."""
        if results.pose_landmarks:
            self.mp_drawing.draw_landmarks(
                annotated_frame,
                results.pose_landmarks,
                self.mp_holistic.POSE_CONNECTIONS,
                landmark_drawing_spec=self.mp_drawing_styles.get_default_pose_landmarks_style()
            )

        if results.face_landmarks:
            self.mp_drawing.draw_landmarks(
                annotated_frame,
                results.face_landmarks,
                self.mp_holistic.FACEMESH_TESSELATION,
                landmark_drawing_spec=None,
                connection_drawing_spec=self.mp_drawing_styles.get_default_face_mesh_tesselation_style()
            )

        for hand_landmarks in (results.left_hand_landmarks, results.right_hand_landmarks):
            if hand_landmarks:
                self.mp_drawing.draw_landmarks(
                    annotated_frame,
                    hand_landmarks,
                    self.mp_holistic.HAND_CONNECTIONS,
                    landmark_drawing_spec=self.mp_drawing_styles.get_default_hand_landmarks_style(),
                    connection_drawing_spec=self.mp_drawing_styles.get_default_hand_connections_style()
                )

    def _draw_state_labels(self, annotated_frame):
        """Stack a text label on the frame for every currently raised state flag."""
        y_pos = 30
        font_scale = 0.6
        for state_key, text, color in self._STATE_LABELS:
            if self.current_state[state_key]:
                cv2.putText(annotated_frame, text, (20, y_pos),
                            cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, 2)
                y_pos += 25

    def process_frame(self, frame, annotate=False):
        """
        Process a single frame to analyze body language.

        Pose-derived metrics are only computed (and added to the returned
        dict) when a pose was detected, and head tilt only when a face was
        detected; hand movement, self-touch and fidgeting are computed on
        every frame. State flags of cues that could not be evaluated keep
        their previous value.

        Args:
            frame: The video frame (BGR format)
            annotate: Whether to draw annotations on the frame

        Returns:
            dict: Body language metrics for this frame
            frame: Annotated copy if annotate=True, otherwise the original frame
        """
        self.total_frames += 1
        frame_metrics = {
            'timestamp': time.time(),
            'frame_number': self.total_frames
        }

        # MediaPipe expects RGB input while OpenCV frames are BGR.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.holistic.process(frame_rgb)

        # Draw on a copy so the caller's frame is never modified.
        annotated_frame = frame.copy() if annotate else frame

        if results.pose_landmarks:
            frame_metrics['shoulder_alignment'] = self._analyze_shoulder_alignment(
                results.pose_landmarks)
            frame_metrics['lean_forward'] = self._analyze_lean_forward(
                results.pose_landmarks)
            frame_metrics['crossing_arms'] = self._analyze_crossing_arms(
                results.pose_landmarks)
            frame_metrics['pose_shift'] = self._analyze_pose_shift(
                results.pose_landmarks)

        if results.face_landmarks:
            frame_metrics['head_tilt'] = self._analyze_head_tilt(results.face_landmarks)

        frame_metrics['hand_movement'] = self._analyze_hand_movement(
            results.left_hand_landmarks,
            results.right_hand_landmarks
        )

        frame_metrics['self_touch'] = self._analyze_self_touch(
            results.pose_landmarks,
            results.left_hand_landmarks,
            results.right_hand_landmarks,
            results.face_landmarks
        )

        frame_metrics['fidgeting'] = self._analyze_fidgeting(
            results.pose_landmarks,
            results.left_hand_landmarks,
            results.right_hand_landmarks
        )

        # Remember this frame's landmarks only after every movement metric
        # has compared against the previous frame.
        self.prev_pose_landmarks = results.pose_landmarks
        self.prev_face_landmarks = results.face_landmarks
        self.prev_left_hand_landmarks = results.left_hand_landmarks
        self.prev_right_hand_landmarks = results.right_hand_landmarks

        # Expose the state flags; the shift timestamp is internal bookkeeping.
        for key, value in self.current_state.items():
            if key != 'last_pose_shift':
                frame_metrics[key] = value

        if annotate:
            self._draw_landmarks(annotated_frame, results)
            self._draw_state_labels(annotated_frame)

        return frame_metrics, annotated_frame

    def get_stats(self):
        """
        Get comprehensive body language statistics.

        Percentages and averages are computed over the rolling histories
        (at most ``history_size`` recent samples each); averages only include
        samples above the metric's threshold. ``pose_shifts_count`` and
        ``pose_shifts_per_minute`` cover the whole session since construction
        or the last ``reset_stats``, measured in wall-clock time.

        Returns:
            dict: Statistics about body language
        """
        total_duration = time.time() - self.start_time
        limits = self.thresholds

        def share_above(history, key):
            return self._calculate_percentage(
                [1 if value > limits[key] else 0 for value in history])

        def mean_above(history, key):
            return self._calculate_average(
                [value for value in history if value > limits[key]])

        def is_fidget(value):
            # Above the fidgeting floor but too small to be a deliberate gesture.
            return value > limits['fidgeting'] and value < limits['hand_movement']

        stats = {
            'total_frames': self.total_frames,
            'total_duration_seconds': total_duration,

            'shoulder_misalignment_percentage': share_above(
                self.shoulder_alignment_history, 'shoulder_alignment'),
            'leaning_forward_percentage': share_above(
                self.lean_forward_history, 'lean_forward'),
            'head_tilt_percentage': share_above(
                self.head_tilt_history, 'head_tilt'),

            'hand_movement_percentage': share_above(
                self.hand_movement_history, 'hand_movement'),
            'self_touch_percentage': share_above(
                self.self_touch_history, 'self_touch'),
            'arms_crossed_percentage': share_above(
                self.crossing_arms_history, 'crossing_arms'),

            'fidgeting_percentage': self._calculate_percentage(
                [1 if is_fidget(value) else 0 for value in self.fidgeting_history]),
            # Session-wide counter: the bounded history would undercount once
            # more than history_size frames have been processed.
            'pose_shifts_count': self.pose_shift_total,

            'avg_shoulder_misalignment': mean_above(
                self.shoulder_alignment_history, 'shoulder_alignment'),
            'avg_lean_forward': mean_above(self.lean_forward_history, 'lean_forward'),
            'avg_head_tilt': mean_above(self.head_tilt_history, 'head_tilt'),
            'avg_hand_movement': mean_above(self.hand_movement_history, 'hand_movement'),
            'avg_self_touch': mean_above(self.self_touch_history, 'self_touch'),
            'avg_arms_crossed': mean_above(self.crossing_arms_history, 'crossing_arms'),
            'avg_fidgeting': self._calculate_average(
                [value for value in self.fidgeting_history if is_fidget(value)])
        }

        if total_duration > 0:
            stats['pose_shifts_per_minute'] = stats['pose_shifts_count'] / (total_duration / 60)
        else:
            stats['pose_shifts_per_minute'] = 0

        return stats

    def _calculate_percentage(self, binary_list):
        """Calculate percentage of True/1 values in a list (0 for an empty list)."""
        if len(binary_list) == 0:
            return 0
        return sum(binary_list) / len(binary_list) * 100

    def _calculate_average(self, values_list):
        """Calculate average of values in a list (0 for an empty list)."""
        if len(values_list) == 0:
            return 0
        return sum(values_list) / len(values_list)

    def get_interview_assessment(self):
        """
        Analyze body language patterns in the context of an interview.

        Starts each of the confidence, engagement and comfort scores from a
        fixed base, adjusts it with rule-based bonuses and penalties driven
        by ``get_stats``, clamps it to 0..10, and combines the three into a
        weighted overall score (0.4 / 0.4 / 0.2).

        Returns:
            dict: Assessment of body language with interview-specific insights
        """
        stats = self.get_stats()

        assessment = {
            'confidence_score': 0,
            'engagement_score': 0,
            'comfort_score': 0,
            'overall_score': 0,
            'strengths': [],
            'areas_for_improvement': [],
            'recommendations': []
        }
        strengths = assessment['strengths']
        improvements = assessment['areas_for_improvement']
        recommendations = assessment['recommendations']

        # --- Confidence ---
        confidence_base = 7

        if stats['leaning_forward_percentage'] > 40:
            confidence_base += 1
            strengths.append('Shows engagement by leaning forward')

        if stats['hand_movement_percentage'] > 30 and stats['hand_movement_percentage'] < 70:
            confidence_base += 1
            strengths.append('Uses appropriate hand gestures to emphasize points')

        if stats['shoulder_misalignment_percentage'] > 30:
            confidence_base -= 1
            improvements.append('Uneven shoulders may convey tension')
            recommendations.append('Practice maintaining level shoulders')

        if stats['self_touch_percentage'] > 30:
            confidence_base -= 2
            improvements.append('Frequent self-touching can signal nervousness')
            recommendations.append('Be mindful of touching your face or hair during interviews')

        if stats['fidgeting_percentage'] > 40:
            confidence_base -= 2
            improvements.append('Fidgeting can distract from your message')
            recommendations.append('Practice stillness or channel energy into purposeful gestures')

        if stats['arms_crossed_percentage'] > 50:
            confidence_base -= 1
            improvements.append('Frequently crossed arms can appear defensive')
            recommendations.append('Try to maintain a more open posture during interviews')

        assessment['confidence_score'] = max(0, min(10, confidence_base))

        # --- Engagement ---
        # The membership checks below avoid repeating a point that the
        # confidence section already reported.
        engagement_base = 5

        if stats['leaning_forward_percentage'] > 50:
            engagement_base += 2
            if 'Shows engagement by leaning forward' not in strengths:
                strengths.append('Shows engagement by leaning forward')

        if stats['hand_movement_percentage'] > 40:
            engagement_base += 1
            if 'Uses appropriate hand gestures to emphasize points' not in strengths:
                strengths.append('Uses appropriate hand gestures to emphasize points')

        if stats['pose_shifts_per_minute'] > 3:
            engagement_base -= 1
            improvements.append('Frequent posture shifts may indicate restlessness')
            recommendations.append('Work on maintaining a stable but comfortable posture')

        if stats['arms_crossed_percentage'] > 60:
            engagement_base -= 2
            if 'Frequently crossed arms can appear defensive' not in improvements:
                improvements.append('Crossed arms can signal disengagement or defensiveness')

        assessment['engagement_score'] = max(0, min(10, engagement_base))

        # --- Comfort ---
        comfort_base = 6

        if stats['fidgeting_percentage'] > 30:
            comfort_base -= 1
            if 'Fidgeting can distract from your message' not in improvements:
                improvements.append('Fidgeting indicates nervousness or discomfort')

        if stats['self_touch_percentage'] > 40:
            comfort_base -= 1
            if 'Frequent self-touching can signal nervousness' not in improvements:
                improvements.append('Self-touching often indicates anxiety or discomfort')

        # More than one shift per 20 seconds of session time.
        if stats['pose_shifts_count'] > (stats['total_duration_seconds'] / 20):
            comfort_base -= 1
            if 'Frequent posture shifts may indicate restlessness' not in improvements:
                improvements.append('Frequent posture adjustments suggest discomfort')
                recommendations.append('Find a comfortable seated position before the interview')

        if stats['shoulder_misalignment_percentage'] < 20:
            comfort_base += 1
            strengths.append('Maintains balanced, relaxed shoulder posture')

        if stats['fidgeting_percentage'] < 15 and stats['self_touch_percentage'] < 15:
            comfort_base += 2
            strengths.append('Appears calm and composed through minimal nervous movements')

        assessment['comfort_score'] = max(0, min(10, comfort_base))

        assessment['overall_score'] = (
            assessment['confidence_score'] * 0.4 +
            assessment['engagement_score'] * 0.4 +
            assessment['comfort_score'] * 0.2
        )

        # Never return empty advice or strengths lists.
        if not assessment['recommendations']:
            assessment['recommendations'] = [
                'Practice interviews with video recording to observe your body language',
                'Focus on maintaining an open, engaged posture',
                'Use purposeful hand gestures to emphasize key points'
            ]

        if not assessment['strengths']:
            assessment['strengths'] = [
                'Shows baseline appropriate interview body language',
                'Maintains basic professional demeanor'
            ]

        return assessment
|
|
|
|
def analyze_body_language(frame, analyzer=None, annotate=False):
    """
    Run body language analysis on one frame, creating an analyzer on demand.

    Args:
        frame: The video frame (BGR format)
        analyzer: BodyLanguageAnalyzer to reuse; a fresh one is built when None
        annotate: Whether to annotate the frame with visualization

    Returns:
        tuple: (metrics, analyzer, annotated_frame)
            - metrics: Dictionary of body language metrics for this frame
            - analyzer: The analyzer that processed the frame, so callers can
              pass it back in for the next frame
            - annotated_frame: The frame returned by the analyzer
    """
    active = BodyLanguageAnalyzer() if analyzer is None else analyzer
    frame_metrics, output_frame = active.process_frame(frame, annotate)
    return frame_metrics, active, output_frame
|
|
|
|
class InterviewAnalyzer:
    """
    Facade that feeds every frame to both an eye contact analyzer and a body
    language analyzer and merges their results into one interview assessment.
    """

    # (minimum overall score, verdict) pairs, checked from best to worst.
    _VERDICTS = (
        (8.5, "Excellent interview presence. Your body language and eye contact project confidence and engagement."),
        (7, "Strong interview presence with some minor areas for improvement."),
        (5.5, "Adequate interview presence with several areas that could be strengthened."),
    )
    _FALLBACK_VERDICT = "Your interview presence needs significant improvement to make a positive impression."

    def __init__(self):
        self.eye_contact_analyzer = EyeContactAnalyzer()
        self.body_language_analyzer = BodyLanguageAnalyzer()
        self.total_frames = 0
        self.start_time = time.time()
        # One merged metrics dict per processed frame, in processing order.
        self.frame_metrics = []

    def reset(self):
        """Start a new session: reset both analyzers and drop collected metrics."""
        for sub_analyzer in (self.eye_contact_analyzer, self.body_language_analyzer):
            sub_analyzer.reset_stats()
        self.total_frames = 0
        self.start_time = time.time()
        self.frame_metrics = []

    def process_frame(self, frame, annotate=False):
        """
        Analyze one frame with both analyzers and record the merged metrics.

        Body language metrics win over eye contact metrics on key clashes,
        and 'frame_number' / 'timestamp' are overwritten with this
        analyzer's own values. Only the body language analyzer annotates.

        Args:
            frame: The video frame (BGR format)
            annotate: Whether to annotate the frame with visualization

        Returns:
            tuple: (combined_metrics, annotated_frame)
        """
        self.total_frames += 1

        eye_result = analyze_eye_contact(frame, self.eye_contact_analyzer, False)
        eye_metrics, _, _ = eye_result

        body_metrics, body_frame = self.body_language_analyzer.process_frame(frame, annotate)

        merged = {**eye_metrics}
        merged.update(body_metrics)
        merged['frame_number'] = self.total_frames
        merged['timestamp'] = time.time()

        self.frame_metrics.append(merged)
        return merged, body_frame

    def get_comprehensive_assessment(self):
        """
        Build the combined eye contact + body language assessment.

        The overall score weights the eye contact score at 0.4 and the body
        language overall score at 0.6, and is mapped to a textual verdict.

        Returns:
            dict: Combined assessment with overall interview performance metrics
        """
        eye_stats = self.eye_contact_analyzer.get_stats()
        eye_report = self.eye_contact_analyzer.get_interview_assessment()

        body_stats = self.body_language_analyzer.get_stats()
        body_report = self.body_language_analyzer.get_interview_assessment()

        overall = (eye_report['score'] * 0.4 +
                   body_report['overall_score'] * 0.6)

        eye_section = {
            key: eye_report[key]
            for key in ('score', 'patterns', 'recommendations')
        }
        body_section = {
            key: body_report[key]
            for key in ('confidence_score', 'engagement_score', 'comfort_score',
                        'strengths', 'areas_for_improvement', 'recommendations')
        }

        key_statistics = {
            'total_duration_seconds': time.time() - self.start_time,
            'total_frames': self.total_frames,
        }
        for key in ('eye_contact_percentage', 'longest_eye_contact_seconds',
                    'average_contact_duration_seconds'):
            key_statistics[key] = eye_stats[key]
        for key in ('shoulder_misalignment_percentage', 'leaning_forward_percentage',
                    'head_tilt_percentage', 'arms_crossed_percentage',
                    'self_touch_percentage', 'fidgeting_percentage',
                    'pose_shifts_per_minute'):
            key_statistics[key] = body_stats[key]

        assessment = {
            'overall_score': overall,
            'eye_contact': eye_section,
            'body_language': body_section,
            'key_statistics': key_statistics,
            'processing_info': {
                'device_used': DEVICE
            }
        }

        verdict = self._FALLBACK_VERDICT
        for minimum, text in self._VERDICTS:
            if overall >= minimum:
                verdict = text
                break
        assessment['overall_assessment'] = verdict

        return assessment
|
|
|
|
def example_interview_assessment():
    """
    Build a hard-coded sample assessment, print a readable summary of it,
    and return it.

    The values are fixed demonstration data; nothing is analyzed. The dict
    has the same layout as a comprehensive interview assessment.

    Returns:
        dict: Example assessment
    """
    eye_contact = {
        'score': 8.0,
        'patterns': ["Good eye contact maintained throughout most of the interview"],
        'recommendations': ["Slightly reduce the intensity of eye contact in some moments"]
    }
    body_language = {
        'confidence_score': 7.5,
        'engagement_score': 8.0,
        'comfort_score': 7.0,
        'strengths': [
            "Good upright posture",
            "Appropriate hand gestures",
            "Engaged facial expressions"
        ],
        'areas_for_improvement': [
            "Occasional fidgeting",
            "Some tension in shoulders"
        ],
        'recommendations': [
            "Practice relaxation techniques before interviews",
            "Be mindful of hand movements when nervous",
            "Maintain balanced posture throughout"
        ]
    }
    key_statistics = {
        'total_duration_seconds': 300.0,
        'total_frames': 9000,
        'eye_contact_percentage': 65.0,
        'longest_eye_contact_seconds': 8.5,
        'average_contact_duration_seconds': 4.2,
        'shoulder_misalignment_percentage': 85.0,
        'leaning_forward_percentage': 40.0,
        'head_tilt_percentage': 15.0,
        'arms_crossed_percentage': 10.0,
        'self_touch_percentage': 25.0,
        'fidgeting_percentage': 30.0,
        'pose_shifts_per_minute': 2.5
    }
    assessment = {
        'overall_score': 7.8,
        'overall_assessment': "Strong interview presence with some minor areas for improvement.",
        'eye_contact': eye_contact,
        'body_language': body_language,
        'key_statistics': key_statistics,
        'processing_info': {
            'device_used': DEVICE
        }
    }

    # Headline.
    print("\n=== EXAMPLE INTERVIEW ASSESSMENT ===")
    print(f"Overall Score: {assessment['overall_score']}/10")
    print(f"Assessment: {assessment['overall_assessment']}")

    # Eye contact section.
    print("\nEYE CONTACT:")
    print(f"Score: {eye_contact['score']}/10")
    for observed in eye_contact['patterns']:
        print(f"- {observed}")

    # Body language scores.
    print("\nBODY LANGUAGE:")
    for title, score_key in (("Confidence", 'confidence_score'),
                             ("Engagement", 'engagement_score'),
                             ("Comfort", 'comfort_score')):
        print(f"{title} Score: {body_language[score_key]}/10")

    # Bulleted lists share one layout: heading, then one prefixed line per item.
    for heading, bullet, list_key in (("\nSTRENGTHS:", "+", 'strengths'),
                                      ("\nAREAS FOR IMPROVEMENT:", "-", 'areas_for_improvement')):
        print(heading)
        for item in body_language[list_key]:
            print(f"{bullet} {item}")

    print("\nPRIORITY RECOMMENDATIONS:")
    position = 0
    for advice in body_language['recommendations']:
        position += 1
        print(f"{position}. {advice}")

    return assessment
|
|
|
|
def analyze_video_file(video_path, display_video=False, save_results=False):
    """
    Analyze body language in a video file and get statistics.

    Every frame is run through a fresh BodyLanguageAnalyzer. With
    ``display_video`` the annotated frames are shown in a window and pressing
    'q' stops the analysis early. The capture, any display window and the
    analyzer's MediaPipe graph are released even if processing fails.

    NOTE(review): the analyzer measures session duration with the wall
    clock, so 'pose_shifts_per_minute' presumably reflects processing time
    rather than video time - confirm whether that is intended.

    Args:
        video_path: Path to the video file
        display_video: Whether to display the video during analysis
        save_results: Whether to save results to a JSON file; the file goes
            into a "results" directory next to the video and is named
            "<video stem>_<timestamp>_body_language_analysis.json"

    Returns:
        dict: Video info, body language statistics and assessment, or None
        when the video cannot be opened
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video file {video_path}")
        return None

    analyzer = None
    try:
        # Container metadata; may not match the number of frames actually read.
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = frame_count / fps if fps > 0 else 0

        analyzer = BodyLanguageAnalyzer()

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            _, analyzer, annotated_frame = analyze_body_language(frame, analyzer, display_video)

            if display_video:
                cv2.imshow("Body Language Analysis", annotated_frame)
                # waitKey also lets the window repaint; 'q' aborts the run.
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        stats = analyzer.get_stats()
        assessment = analyzer.get_interview_assessment()
    finally:
        cap.release()
        if display_video:
            cv2.destroyAllWindows()
        if analyzer is not None:
            analyzer.holistic.close()

    results = {
        "video_info": {
            "path": video_path,
            "frames": frame_count,
            "fps": fps,
            "duration_seconds": duration,
            "device_used": DEVICE
        },
        "body_language_stats": stats,
        "assessment": assessment
    }

    if save_results:
        from datetime import datetime
        output_dir = os.path.join(os.path.dirname(video_path), "results")
        os.makedirs(output_dir, exist_ok=True)
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        output_file = f"{output_dir}/{Path(video_path).stem}_{stamp}_body_language_analysis.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=4)

    return results
|
|
|
|
# Running the module as a script prints the hard-coded example assessment;
# no video or camera input is read.
if __name__ == "__main__":
    example_interview_assessment()