import cv2
import mediapipe as mp
import numpy as np
import time
from collections import deque
import math
import json
import os
from pathlib import Path

from app.services.processing.eye_contact_analyzer import EyeContactAnalyzer
from app.services.processing.eye_contact_analyzer import analyze_eye_contact
from app.utils.device_utils import get_available_device

# Initialize device once at module level
DEVICE = get_available_device()


class BodyLanguageAnalyzer:
    def __init__(self, history_size=100):
        """
        Initialize the body language analyzer for interview assessment.

        Args:
            history_size: Number of frames to keep in history for rolling metrics
        """
        # Initialize MediaPipe Holistic (pose, face, and hands)
        self.mp_holistic = mp.solutions.holistic
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_drawing_styles = mp.solutions.drawing_styles
        self.holistic = self.mp_holistic.Holistic(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
            static_image_mode=False
        )

        # Stats tracking
        self.history_size = history_size
        self.total_frames = 0
        self.start_time = time.time()

        # Posture tracking
        self.shoulder_alignment_history = deque(maxlen=history_size)
        self.lean_forward_history = deque(maxlen=history_size)
        self.head_tilt_history = deque(maxlen=history_size)

        # Gesture tracking
        self.hand_movement_history = deque(maxlen=history_size)
        self.self_touch_history = deque(maxlen=history_size)
        self.crossing_arms_history = deque(maxlen=history_size)

        # Movement tracking
        self.fidgeting_history = deque(maxlen=history_size)
        self.pose_shift_history = deque(maxlen=history_size)

        # Previous frame landmarks for movement detection
        self.prev_pose_landmarks = None
        self.prev_face_landmarks = None
        self.prev_left_hand_landmarks = None
        self.prev_right_hand_landmarks = None

        # Threshold values
        self.thresholds = {
            'shoulder_alignment': 0.05,  # Shoulder height difference ratio
            'lean_forward': 0.4,         # Forward lean threshold
            'head_tilt': 0.1,            # Head tilt angle threshold (radians)
            'hand_movement': 0.03,       # Hand movement threshold
            'self_touch': 0.1,           # Self-touch proximity threshold
            'crossing_arms': 0.15,       # Arms crossing threshold
            'fidgeting': 0.02,           # Fidgeting movement threshold
            'pose_shift': 0.05           # Major posture shift threshold
        }

        # Current state
        self.current_state = {
            'shoulder_misalignment': 0,
            'leaning_forward': 0,
            'head_tilted': 0,
            'hand_movement': 0,
            'self_touching': 0,
            'arms_crossed': 0,
            'fidgeting': 0,
            'pose_shifting': 0,
            'last_pose_shift': 0
        }

    def reset_stats(self):
        """Reset all statistics for a new session."""
        self.shoulder_alignment_history.clear()
        self.lean_forward_history.clear()
        self.head_tilt_history.clear()
        self.hand_movement_history.clear()
        self.self_touch_history.clear()
        self.crossing_arms_history.clear()
        self.fidgeting_history.clear()
        self.pose_shift_history.clear()
        self.total_frames = 0
        self.start_time = time.time()
        self.prev_pose_landmarks = None
        self.prev_face_landmarks = None
        self.prev_left_hand_landmarks = None
        self.prev_right_hand_landmarks = None

    def _calculate_distance(self, point1, point2):
        """Calculate Euclidean distance between two 3D points."""
        return math.sqrt((point1.x - point2.x)**2 +
                         (point1.y - point2.y)**2 +
                         (point1.z - point2.z)**2)

    def _calculate_angle(self, point1, point2, point3):
        """Calculate the angle (in degrees) at point2 formed by point1 and point3."""
        vector1 = np.array([point1.x - point2.x, point1.y - point2.y, point1.z - point2.z])
        vector2 = np.array([point3.x - point2.x, point3.y - point2.y, point3.z - point2.z])

        # Normalize vectors
        norm1 = np.linalg.norm(vector1)
        norm2 = np.linalg.norm(vector2)
        if norm1 > 0 and norm2 > 0:
            vector1 = vector1 / norm1
            vector2 = vector2 / norm2

            # Calculate dot product and angle
            dot_product = np.clip(np.dot(vector1, vector2), -1.0, 1.0)
            angle = np.arccos(dot_product)
            return np.degrees(angle)
        return 0
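    # Illustrative sanity check (hypothetical values, not part of the pipeline):
    # for point1 = (1, 0, 0), point2 = (0, 0, 0), point3 = (0, 1, 0), the two
    # difference vectors are orthogonal and _calculate_angle returns 90.0 degrees.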
    def _calculate_landmark_movement(self, current_landmark, previous_landmark):
        """Calculate movement between current and previous landmark position."""
        if current_landmark is None or previous_landmark is None:
            return 0
        return self._calculate_distance(current_landmark, previous_landmark)

    def _analyze_shoulder_alignment(self, pose_landmarks):
        """Analyze shoulder alignment (level shoulders vs. one higher than the other)."""
        if pose_landmarks:
            left_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_SHOULDER]
            right_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_SHOULDER]

            # Calculate shoulder height difference (y-axis)
            height_diff = abs(left_shoulder.y - right_shoulder.y)

            # Normalize by shoulder width
            shoulder_width = abs(left_shoulder.x - right_shoulder.x)
            if shoulder_width > 0:
                normalized_diff = height_diff / shoulder_width
                self.shoulder_alignment_history.append(normalized_diff)

                # Update current state
                self.current_state['shoulder_misalignment'] = (
                    normalized_diff > self.thresholds['shoulder_alignment'])
                return normalized_diff
        return 0

    def _analyze_lean_forward(self, pose_landmarks):
        """Analyze if the person is leaning forward."""
        if pose_landmarks:
            # Use shoulder and hip positions to determine lean
            left_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_SHOULDER]
            right_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_SHOULDER]
            left_hip = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_HIP]
            right_hip = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_HIP]

            # Calculate average shoulder and hip depths
            shoulder_z = (left_shoulder.z + right_shoulder.z) / 2
            hip_z = (left_hip.z + right_hip.z) / 2

            # Calculate lean (z-axis difference, normalized by torso height)
            shoulder_hip_y_diff = abs((left_shoulder.y + right_shoulder.y) / 2 -
                                      (left_hip.y + right_hip.y) / 2)
            lean_forward = (shoulder_z - hip_z) / max(shoulder_hip_y_diff, 0.1)

            # Track history
            self.lean_forward_history.append(lean_forward)

            # Update current state
            self.current_state['leaning_forward'] = (
                lean_forward > self.thresholds['lean_forward'])
            return lean_forward
        return 0
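    # Note on the z-axis: MediaPipe's pose z is a relative depth estimate
    # (smaller values are typically closer to the camera), so the sign and
    # scale of the lean metric above depend on the model version; the
    # 'lean_forward' threshold should be calibrated against real footage.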
    def _analyze_head_tilt(self, face_landmarks):
        """Analyze head tilt (left/right)."""
        if face_landmarks:
            # Use the two outer eye corners (FaceMesh indices 33 and 263);
            # only the tilt magnitude is used, so left/right labeling is immaterial
            left_eye = face_landmarks.landmark[33]
            right_eye = face_landmarks.landmark[263]

            # Calculate tilt angle from horizontal
            angle = math.atan2(right_eye.y - left_eye.y,
                               right_eye.x - left_eye.x)
            tilt = abs(angle)

            # Track history
            self.head_tilt_history.append(tilt)

            # Update current state
            self.current_state['head_tilted'] = (
                tilt > self.thresholds['head_tilt'])
            return tilt
        return 0

    def _analyze_hand_movement(self, left_hand, right_hand):
        """Analyze hand movement and gestures."""
        movement = 0

        # Check left hand movement
        if left_hand and self.prev_left_hand_landmarks:
            # Use wrist as reference point
            left_movement = self._calculate_landmark_movement(
                left_hand.landmark[0],  # Wrist landmark
                self.prev_left_hand_landmarks.landmark[0]
            )
            movement = max(movement, left_movement)

        # Check right hand movement
        if right_hand and self.prev_right_hand_landmarks:
            # Use wrist as reference point
            right_movement = self._calculate_landmark_movement(
                right_hand.landmark[0],  # Wrist landmark
                self.prev_right_hand_landmarks.landmark[0]
            )
            movement = max(movement, right_movement)

        # Track history
        self.hand_movement_history.append(movement)

        # Update current state
        self.current_state['hand_movement'] = (
            movement > self.thresholds['hand_movement'])
        return movement

    def _analyze_self_touch(self, pose_landmarks, left_hand, right_hand, face_landmarks):
        """Detect if hands are touching the face, hair, or other body parts."""
        self_touch = 0

        if face_landmarks:
            # Check left hand to face proximity
            if left_hand:
                left_index_tip = left_hand.landmark[8]  # Index finger tip
                nose_tip = face_landmarks.landmark[4]   # Nose tip
                left_to_face_dist = self._calculate_distance(left_index_tip, nose_tip)
                self_touch = max(self_touch, 1.0 - min(left_to_face_dist * 5, 1.0))

            # Check right hand to face proximity
            if right_hand:
                right_index_tip = right_hand.landmark[8]  # Index finger tip
                nose_tip = face_landmarks.landmark[4]
                right_to_face_dist = self._calculate_distance(right_index_tip, nose_tip)
                self_touch = max(self_touch, 1.0 - min(right_to_face_dist * 5, 1.0))

        # Track history
        self.self_touch_history.append(self_touch)

        # Update current state
        self.current_state['self_touching'] = (
            self_touch > self.thresholds['self_touch'])
        return self_touch

    def _analyze_crossing_arms(self, pose_landmarks):
        """Detect if arms are crossed."""
        crossing_score = 0

        if pose_landmarks:
            # Get key landmarks
            left_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_SHOULDER]
            right_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_SHOULDER]
            left_elbow = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_ELBOW]
            right_elbow = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_ELBOW]
            left_wrist = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_WRIST]
            right_wrist = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_WRIST]

            # Check if wrists are crossing the center line
            center_x = (left_shoulder.x + right_shoulder.x) / 2
            left_wrist_right_of_center = left_wrist.x > center_x
            right_wrist_left_of_center = right_wrist.x < center_x
            elbows_down = (left_elbow.y > left_shoulder.y and
                           right_elbow.y > right_shoulder.y)

            # Simple heuristic for crossed arms
            if left_wrist_right_of_center and right_wrist_left_of_center and elbows_down:
                # Calculate how far the wrists have crossed, normalized by half
                # the shoulder width (epsilon guards against degenerate poses)
                half_width = max(abs(left_shoulder.x - right_shoulder.x) / 2, 1e-6)
                left_cross_amount = (left_wrist.x - center_x) / half_width
                right_cross_amount = (center_x - right_wrist.x) / half_width
                crossing_score = min(1.0, (left_cross_amount + right_cross_amount) / 2)

        # Track history
        self.crossing_arms_history.append(crossing_score)

        # Update current state
        self.current_state['arms_crossed'] = (
            crossing_score > self.thresholds['crossing_arms'])
        return crossing_score
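    # Note: the crossed-arms heuristic reasons in image coordinates. Whether
    # MediaPipe's LEFT_* landmarks appear on the image's left or right depends
    # on camera mirroring, so selfie-view footage may need to be flipped
    # (e.g., cv2.flip(frame, 1)) before this check behaves as intended.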
    def _analyze_fidgeting(self, pose_landmarks, left_hand, right_hand):
        """Detect small repetitive movements (fidgeting)."""
        fidgeting_score = 0

        # Check for small left-hand movements
        if self.prev_left_hand_landmarks and left_hand:
            # Calculate average movement of all finger joints
            total_movement = 0
            count = 0
            for i in range(21):  # 21 hand landmarks
                if i < len(left_hand.landmark) and i < len(self.prev_left_hand_landmarks.landmark):
                    movement = self._calculate_landmark_movement(
                        left_hand.landmark[i],
                        self.prev_left_hand_landmarks.landmark[i]
                    )
                    total_movement += movement
                    count += 1
            if count > 0:
                avg_movement = total_movement / count
                fidgeting_score = max(fidgeting_score, avg_movement)

        # Same check for the right hand
        if self.prev_right_hand_landmarks and right_hand:
            total_movement = 0
            count = 0
            for i in range(21):  # 21 hand landmarks
                if i < len(right_hand.landmark) and i < len(self.prev_right_hand_landmarks.landmark):
                    movement = self._calculate_landmark_movement(
                        right_hand.landmark[i],
                        self.prev_right_hand_landmarks.landmark[i]
                    )
                    total_movement += movement
                    count += 1
            if count > 0:
                avg_movement = total_movement / count
                fidgeting_score = max(fidgeting_score, avg_movement)

        # Track history
        self.fidgeting_history.append(fidgeting_score)

        # Update current state - fidgeting is movement that is small but persistent
        self.current_state['fidgeting'] = (
            fidgeting_score > self.thresholds['fidgeting'] and
            fidgeting_score < self.thresholds['hand_movement'])
        return fidgeting_score

    def _analyze_pose_shift(self, pose_landmarks):
        """Detect major posture shifts."""
        pose_shift = 0

        if pose_landmarks and self.prev_pose_landmarks:
            # Calculate average movement of all upper body landmarks
            upper_body_landmarks = [
                self.mp_holistic.PoseLandmark.LEFT_SHOULDER,
                self.mp_holistic.PoseLandmark.RIGHT_SHOULDER,
                self.mp_holistic.PoseLandmark.LEFT_ELBOW,
                self.mp_holistic.PoseLandmark.RIGHT_ELBOW,
                self.mp_holistic.PoseLandmark.LEFT_WRIST,
                self.mp_holistic.PoseLandmark.RIGHT_WRIST,
                self.mp_holistic.PoseLandmark.LEFT_HIP,
                self.mp_holistic.PoseLandmark.RIGHT_HIP
            ]

            total_movement = 0
            for landmark_idx in upper_body_landmarks:
                movement = self._calculate_landmark_movement(
                    pose_landmarks.landmark[landmark_idx],
                    self.prev_pose_landmarks.landmark[landmark_idx]
                )
                total_movement += movement
            pose_shift = total_movement / len(upper_body_landmarks)

        # Track history
        self.pose_shift_history.append(pose_shift)

        # Update current state
        current_time = time.time()
        if pose_shift > self.thresholds['pose_shift']:
            self.current_state['pose_shifting'] = 1
            self.current_state['last_pose_shift'] = current_time
        elif current_time - self.current_state['last_pose_shift'] > 3:  # Reset after 3 seconds
            self.current_state['pose_shifting'] = 0

        return pose_shift
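    # Note: the movement-based detectors above compare per-frame landmark
    # displacement, so their effective sensitivity scales with frame rate;
    # thresholds tuned at ~30 fps will fire more readily on lower-fps video,
    # where more motion accumulates between consecutive frames.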
    def process_frame(self, frame, annotate=False):
        """
        Process a single frame to analyze body language.

        Args:
            frame: The video frame (BGR format)
            annotate: Whether to draw annotations on the frame

        Returns:
            tuple: (frame_metrics, frame) where frame_metrics is a dict of
                body language metrics for this frame, and frame is the
                annotated copy if annotate=True, otherwise the original frame
        """
        self.total_frames += 1
        frame_metrics = {
            'timestamp': time.time(),
            'frame_number': self.total_frames
        }

        # Convert to RGB for MediaPipe
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Process the frame
        results = self.holistic.process(frame_rgb)

        # Make a copy for annotations if needed
        if annotate:
            annotated_frame = frame.copy()
        else:
            annotated_frame = frame

        # Analyze different aspects of body language
        if results.pose_landmarks:
            # Posture analysis
            shoulder_alignment = self._analyze_shoulder_alignment(results.pose_landmarks)
            lean_forward = self._analyze_lean_forward(results.pose_landmarks)
            frame_metrics['shoulder_alignment'] = shoulder_alignment
            frame_metrics['lean_forward'] = lean_forward

            # Arms crossed analysis
            crossing_arms = self._analyze_crossing_arms(results.pose_landmarks)
            frame_metrics['crossing_arms'] = crossing_arms

            # Pose shift analysis
            pose_shift = self._analyze_pose_shift(results.pose_landmarks)
            frame_metrics['pose_shift'] = pose_shift

        if results.face_landmarks:
            # Head tilt analysis
            head_tilt = self._analyze_head_tilt(results.face_landmarks)
            frame_metrics['head_tilt'] = head_tilt

        # Hand movement and gestures (the helpers handle missing landmarks)
        hand_movement = self._analyze_hand_movement(
            results.left_hand_landmarks,
            results.right_hand_landmarks
        )
        frame_metrics['hand_movement'] = hand_movement

        # Self-touch detection
        self_touch = self._analyze_self_touch(
            results.pose_landmarks,
            results.left_hand_landmarks,
            results.right_hand_landmarks,
            results.face_landmarks
        )
        frame_metrics['self_touch'] = self_touch

        # Fidgeting detection
        fidgeting = self._analyze_fidgeting(
            results.pose_landmarks,
            results.left_hand_landmarks,
            results.right_hand_landmarks
        )
        frame_metrics['fidgeting'] = fidgeting

        # Store current landmarks for next frame comparison
        self.prev_pose_landmarks = results.pose_landmarks
        self.prev_face_landmarks = results.face_landmarks
        self.prev_left_hand_landmarks = results.left_hand_landmarks
        self.prev_right_hand_landmarks = results.right_hand_landmarks

        # Add current state to metrics
        for key, value in self.current_state.items():
            if key != 'last_pose_shift':  # Skip timestamp
                frame_metrics[key] = value

        # Draw annotations if requested
        if annotate:
            # Draw pose landmarks
            if results.pose_landmarks:
                self.mp_drawing.draw_landmarks(
                    annotated_frame,
                    results.pose_landmarks,
                    self.mp_holistic.POSE_CONNECTIONS,
                    landmark_drawing_spec=self.mp_drawing_styles.get_default_pose_landmarks_style()
                )

            # Draw face landmarks
            if results.face_landmarks:
                self.mp_drawing.draw_landmarks(
                    annotated_frame,
                    results.face_landmarks,
                    self.mp_holistic.FACEMESH_TESSELATION,
                    landmark_drawing_spec=None,
                    connection_drawing_spec=self.mp_drawing_styles.get_default_face_mesh_tesselation_style()
                )

            # Draw hand landmarks
            if results.left_hand_landmarks:
                self.mp_drawing.draw_landmarks(
                    annotated_frame,
                    results.left_hand_landmarks,
                    self.mp_holistic.HAND_CONNECTIONS,
                    landmark_drawing_spec=self.mp_drawing_styles.get_default_hand_landmarks_style(),
                    connection_drawing_spec=self.mp_drawing_styles.get_default_hand_connections_style()
                )
            if results.right_hand_landmarks:
                self.mp_drawing.draw_landmarks(
                    annotated_frame,
                    results.right_hand_landmarks,
                    self.mp_holistic.HAND_CONNECTIONS,
                    landmark_drawing_spec=self.mp_drawing_styles.get_default_hand_landmarks_style(),
                    connection_drawing_spec=self.mp_drawing_styles.get_default_hand_connections_style()
                )
            # Draw body language status on the frame
            y_pos = 30
            font_scale = 0.6

            # Draw posture status
            if self.current_state['shoulder_misalignment']:
                cv2.putText(annotated_frame, "Uneven Shoulders", (20, y_pos),
                            cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), 2)
                y_pos += 25
            if self.current_state['leaning_forward']:
                cv2.putText(annotated_frame, "Leaning Forward", (20, y_pos),
                            cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 255, 0), 2)
                y_pos += 25
            if self.current_state['head_tilted']:
                cv2.putText(annotated_frame, "Head Tilted", (20, y_pos),
                            cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), 2)
                y_pos += 25

            # Draw gesture status
            if self.current_state['hand_movement']:
                cv2.putText(annotated_frame, "Gesturing", (20, y_pos),
                            cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 255, 0), 2)
                y_pos += 25
            if self.current_state['self_touching']:
                cv2.putText(annotated_frame, "Self-Touching", (20, y_pos),
                            cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), 2)
                y_pos += 25
            if self.current_state['arms_crossed']:
                cv2.putText(annotated_frame, "Arms Crossed", (20, y_pos),
                            cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), 2)
                y_pos += 25

            # Draw movement status
            if self.current_state['fidgeting']:
                cv2.putText(annotated_frame, "Fidgeting", (20, y_pos),
                            cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), 2)
                y_pos += 25
            if self.current_state['pose_shifting']:
                cv2.putText(annotated_frame, "Shifting Posture", (20, y_pos),
                            cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), 2)
                y_pos += 25

        return frame_metrics, annotated_frame
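    # A typical frame_metrics dict when a person is detected (values
    # illustrative, not from a real run) looks like:
    # {'timestamp': ..., 'frame_number': 42, 'shoulder_alignment': 0.02,
    #  'lean_forward': 0.10, 'crossing_arms': 0.0, 'pose_shift': 0.01,
    #  'head_tilt': 0.05, 'hand_movement': 0.01, 'self_touch': 0.0,
    #  'fidgeting': 0.005, plus the boolean flags copied from current_state}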
    def get_stats(self):
        """
        Get comprehensive body language statistics.

        Returns:
            dict: Statistics about body language
        """
        current_time = time.time()
        total_duration = current_time - self.start_time

        # Calculate stats for different metrics
        stats = {
            'total_frames': self.total_frames,
            'total_duration_seconds': total_duration,

            # Posture stats
            'shoulder_misalignment_percentage': self._calculate_percentage(
                [1 if x > self.thresholds['shoulder_alignment'] else 0
                 for x in self.shoulder_alignment_history]),
            'leaning_forward_percentage': self._calculate_percentage(
                [1 if x > self.thresholds['lean_forward'] else 0
                 for x in self.lean_forward_history]),
            'head_tilt_percentage': self._calculate_percentage(
                [1 if x > self.thresholds['head_tilt'] else 0
                 for x in self.head_tilt_history]),

            # Gesture stats
            'hand_movement_percentage': self._calculate_percentage(
                [1 if x > self.thresholds['hand_movement'] else 0
                 for x in self.hand_movement_history]),
            'self_touch_percentage': self._calculate_percentage(
                [1 if x > self.thresholds['self_touch'] else 0
                 for x in self.self_touch_history]),
            'arms_crossed_percentage': self._calculate_percentage(
                [1 if x > self.thresholds['crossing_arms'] else 0
                 for x in self.crossing_arms_history]),

            # Movement stats (computed over the rolling history window)
            'fidgeting_percentage': self._calculate_percentage(
                [1 if (x > self.thresholds['fidgeting'] and
                       x < self.thresholds['hand_movement']) else 0
                 for x in self.fidgeting_history]),
            'pose_shifts_count': sum(
                [1 if x > self.thresholds['pose_shift'] else 0
                 for x in self.pose_shift_history]),

            # Average intensity (when present)
            'avg_shoulder_misalignment': self._calculate_average(
                [x for x in self.shoulder_alignment_history
                 if x > self.thresholds['shoulder_alignment']]),
            'avg_lean_forward': self._calculate_average(
                [x for x in self.lean_forward_history
                 if x > self.thresholds['lean_forward']]),
            'avg_head_tilt': self._calculate_average(
                [x for x in self.head_tilt_history
                 if x > self.thresholds['head_tilt']]),
            'avg_hand_movement': self._calculate_average(
                [x for x in self.hand_movement_history
                 if x > self.thresholds['hand_movement']]),
            'avg_self_touch': self._calculate_average(
                [x for x in self.self_touch_history
                 if x > self.thresholds['self_touch']]),
            'avg_arms_crossed': self._calculate_average(
                [x for x in self.crossing_arms_history
                 if x > self.thresholds['crossing_arms']]),
            'avg_fidgeting': self._calculate_average(
                [x for x in self.fidgeting_history
                 if x > self.thresholds['fidgeting'] and
                 x < self.thresholds['hand_movement']])
        }

        # Calculate pose shifts per minute
        if total_duration > 0:
            stats['pose_shifts_per_minute'] = stats['pose_shifts_count'] / (total_duration / 60)
        else:
            stats['pose_shifts_per_minute'] = 0

        return stats

    def _calculate_percentage(self, binary_list):
        """Calculate the percentage of True/1 values in a list."""
        if len(binary_list) == 0:
            return 0
        return sum(binary_list) / len(binary_list) * 100

    def _calculate_average(self, values_list):
        """Calculate the average of values in a list."""
        if len(values_list) == 0:
            return 0
        return sum(values_list) / len(values_list)

    def get_interview_assessment(self):
        """
        Analyze body language patterns in the context of an interview.

        Returns:
            dict: Assessment of body language with interview-specific insights
        """
        stats = self.get_stats()

        # Initialize assessment
        assessment = {
            'confidence_score': 0,  # 0-10 scale
            'engagement_score': 0,  # 0-10 scale
            'comfort_score': 0,     # 0-10 scale
            'overall_score': 0,     # 0-10 scale
            'strengths': [],
            'areas_for_improvement': [],
            'recommendations': []
        }

        # CONFIDENCE SCORE
        confidence_base = 7  # Start from a neutral-positive point

        # Positive indicators of confidence
        if stats['leaning_forward_percentage'] > 40:
            confidence_base += 1
            assessment['strengths'].append('Shows engagement by leaning forward')
        if 30 < stats['hand_movement_percentage'] < 70:
            confidence_base += 1
            assessment['strengths'].append('Uses appropriate hand gestures to emphasize points')

        # Negative indicators
        if stats['shoulder_misalignment_percentage'] > 30:
            confidence_base -= 1
            assessment['areas_for_improvement'].append('Uneven shoulders may convey tension')
            assessment['recommendations'].append('Practice maintaining level shoulders')
        if stats['self_touch_percentage'] > 30:
            confidence_base -= 2
            assessment['areas_for_improvement'].append('Frequent self-touching can signal nervousness')
            assessment['recommendations'].append('Be mindful of touching your face or hair during interviews')
        if stats['fidgeting_percentage'] > 40:
            confidence_base -= 2
            assessment['areas_for_improvement'].append('Fidgeting can distract from your message')
            assessment['recommendations'].append('Practice stillness or channel energy into purposeful gestures')
        if stats['arms_crossed_percentage'] > 50:
            confidence_base -= 1
            assessment['areas_for_improvement'].append('Frequently crossed arms can appear defensive')
            assessment['recommendations'].append('Try to maintain a more open posture during interviews')

        # Clamp confidence score to the 0-10 range
        assessment['confidence_score'] = max(0, min(10, confidence_base))

        # ENGAGEMENT SCORE
        engagement_base = 5  # Start from a neutral point

        # Positive indicators of engagement
        if stats['leaning_forward_percentage'] > 50:
            engagement_base += 2
            if 'Shows engagement by leaning forward' not in assessment['strengths']:
                assessment['strengths'].append('Shows engagement by leaning forward')
        if stats['hand_movement_percentage'] > 40:
            engagement_base += 1
            if 'Uses appropriate hand gestures to emphasize points' not in assessment['strengths']:
                assessment['strengths'].append('Uses appropriate hand gestures to emphasize points')

        # Negative indicators
        if stats['pose_shifts_per_minute'] > 3:
            engagement_base -= 1
            assessment['areas_for_improvement'].append('Frequent posture shifts may indicate restlessness')
            assessment['recommendations'].append('Work on maintaining a stable but comfortable posture')
        if stats['arms_crossed_percentage'] > 60:
            engagement_base -= 2
            if 'Frequently crossed arms can appear defensive' not in assessment['areas_for_improvement']:
                assessment['areas_for_improvement'].append('Crossed arms can signal disengagement or defensiveness')

        # Clamp engagement score to the 0-10 range
        assessment['engagement_score'] = max(0, min(10, engagement_base))

        # COMFORT SCORE
        comfort_base = 6  # Start from a slightly positive point

        # Negative indicators of comfort
        if stats['fidgeting_percentage'] > 30:
            comfort_base -= 1
            if 'Fidgeting can distract from your message' not in assessment['areas_for_improvement']:
                assessment['areas_for_improvement'].append('Fidgeting indicates nervousness or discomfort')
        if stats['self_touch_percentage'] > 40:
            comfort_base -= 1
            if 'Frequent self-touching can signal nervousness' not in assessment['areas_for_improvement']:
                assessment['areas_for_improvement'].append('Self-touching often indicates anxiety or discomfort')
        if stats['pose_shifts_count'] > (stats['total_duration_seconds'] / 20):  # More than 1 shift per 20 seconds
            comfort_base -= 1
            if 'Frequent posture shifts may indicate restlessness' not in assessment['areas_for_improvement']:
                assessment['areas_for_improvement'].append('Frequent posture adjustments suggest discomfort')
                assessment['recommendations'].append('Find a comfortable seated position before the interview')

        # Positive indicators of comfort
        if stats['shoulder_misalignment_percentage'] < 20:
            comfort_base += 1
            assessment['strengths'].append('Maintains balanced, relaxed shoulder posture')
        if stats['fidgeting_percentage'] < 15 and stats['self_touch_percentage'] < 15:
            comfort_base += 2
            assessment['strengths'].append('Appears calm and composed, with minimal nervous movements')

        # Clamp comfort score to the 0-10 range
        assessment['comfort_score'] = max(0, min(10, comfort_base))

        # OVERALL SCORE - weighted average of the three component scores
        assessment['overall_score'] = (
            assessment['confidence_score'] * 0.4 +
            assessment['engagement_score'] * 0.4 +
            assessment['comfort_score'] * 0.2
        )

        # Add general recommendations if none were added
        if not assessment['recommendations']:
            assessment['recommendations'] = [
                'Practice interviews with video recording to observe your body language',
                'Focus on maintaining an open, engaged posture',
                'Use purposeful hand gestures to emphasize key points'
            ]

        # Add general strengths if none were identified
        if not assessment['strengths']:
            assessment['strengths'] = [
                'Shows baseline appropriate interview body language',
                'Maintains basic professional demeanor'
            ]

        return assessment
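
# A minimal usage sketch for BodyLanguageAnalyzer (illustrative only; this
# helper is not called anywhere in the module). It assumes a local webcam at
# index 0 and shows the per-frame loop plus the end-of-session summary calls.
def _demo_body_language_webcam():  # Hypothetical helper, not part of the pipeline
    analyzer = BodyLanguageAnalyzer()
    cap = cv2.VideoCapture(0)
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            metrics, annotated = analyzer.process_frame(frame, annotate=True)
            cv2.imshow("Body Language Demo", annotated)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()
    print(json.dumps(analyzer.get_interview_assessment(), indent=2))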

def analyze_body_language(frame, analyzer=None, annotate=False):
    """
    Analyze body language in a single frame.

    Args:
        frame: The video frame (BGR format)
        analyzer: An existing BodyLanguageAnalyzer instance, or None to create a new one
        annotate: Whether to annotate the frame with visualization

    Returns:
        tuple: (metrics, analyzer, annotated_frame)
            - metrics: Dictionary of body language metrics for this frame
            - analyzer: The BodyLanguageAnalyzer instance (new or updated)
            - annotated_frame: The frame with annotations if requested
    """
    if analyzer is None:
        analyzer = BodyLanguageAnalyzer()

    metrics, analyzer_frame = analyzer.process_frame(frame, annotate)
    return metrics, analyzer, analyzer_frame
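
# A short sketch of the functional entry point on single frames
# (illustrative; "candidate.jpg" is a hypothetical file path):
#
#     frame = cv2.imread("candidate.jpg")
#     metrics, analyzer, _ = analyze_body_language(frame)
#     metrics, analyzer, _ = analyze_body_language(next_frame, analyzer)  # reuse state
#
# Reusing the returned analyzer across calls is what enables the
# movement-based metrics (fidgeting, pose shifts), which compare
# consecutive frames.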

class InterviewAnalyzer:
    """
    Combined analyzer for comprehensive interview assessment,
    including eye contact and body language.
    """

    def __init__(self):
        self.eye_contact_analyzer = EyeContactAnalyzer()
        self.body_language_analyzer = BodyLanguageAnalyzer()
        self.total_frames = 0
        self.start_time = time.time()
        self.frame_metrics = []

    def reset(self):
        """Reset all analyzers for a new session."""
        self.eye_contact_analyzer.reset_stats()
        self.body_language_analyzer.reset_stats()
        self.total_frames = 0
        self.start_time = time.time()
        self.frame_metrics = []

    def process_frame(self, frame, annotate=False):
        """
        Process a frame through both the eye contact and body language analyzers.

        Args:
            frame: The video frame (BGR format)
            annotate: Whether to annotate the frame with visualization

        Returns:
            tuple: (combined_metrics, annotated_frame)
        """
        self.total_frames += 1

        # Process with eye contact analyzer
        eye_metrics, _, _ = analyze_eye_contact(frame, self.eye_contact_analyzer, False)

        # Process with body language analyzer
        body_metrics, body_frame = self.body_language_analyzer.process_frame(frame, annotate)

        # Combine metrics
        combined_metrics = {**eye_metrics, **body_metrics}
        combined_metrics['frame_number'] = self.total_frames
        combined_metrics['timestamp'] = time.time()

        # Store frame metrics for later analysis
        self.frame_metrics.append(combined_metrics)

        return combined_metrics, body_frame

    def get_comprehensive_assessment(self):
        """
        Get a comprehensive assessment combining eye contact and body language insights.

        Returns:
            dict: Combined assessment with overall interview performance metrics
        """
        # Get individual assessments
        eye_contact_stats = self.eye_contact_analyzer.get_stats()
        eye_contact_assessment = self.eye_contact_analyzer.get_interview_assessment()
        body_language_stats = self.body_language_analyzer.get_stats()
        body_language_assessment = self.body_language_analyzer.get_interview_assessment()

        # Create combined assessment
        assessment = {
            'overall_score': (eye_contact_assessment['score'] * 0.4 +
                              body_language_assessment['overall_score'] * 0.6),
            'eye_contact': {
                'score': eye_contact_assessment['score'],
                'patterns': eye_contact_assessment['patterns'],
                'recommendations': eye_contact_assessment['recommendations']
            },
            'body_language': {
                'confidence_score': body_language_assessment['confidence_score'],
                'engagement_score': body_language_assessment['engagement_score'],
                'comfort_score': body_language_assessment['comfort_score'],
                'strengths': body_language_assessment['strengths'],
                'areas_for_improvement': body_language_assessment['areas_for_improvement'],
                'recommendations': body_language_assessment['recommendations']
            },
            'key_statistics': {
                'total_duration_seconds': time.time() - self.start_time,
                'total_frames': self.total_frames,
                'eye_contact_percentage': eye_contact_stats['eye_contact_percentage'],
                'longest_eye_contact_seconds': eye_contact_stats['longest_eye_contact_seconds'],
                'average_contact_duration_seconds': eye_contact_stats['average_contact_duration_seconds'],
                'shoulder_misalignment_percentage': body_language_stats['shoulder_misalignment_percentage'],
                'leaning_forward_percentage': body_language_stats['leaning_forward_percentage'],
                'head_tilt_percentage': body_language_stats['head_tilt_percentage'],
                'arms_crossed_percentage': body_language_stats['arms_crossed_percentage'],
                'self_touch_percentage': body_language_stats['self_touch_percentage'],
                'fidgeting_percentage': body_language_stats['fidgeting_percentage'],
                'pose_shifts_per_minute': body_language_stats['pose_shifts_per_minute']
            },
            'processing_info': {
                'device_used': DEVICE
            }
        }

        # Generate overall assessment text
        if assessment['overall_score'] >= 8.5:
            assessment['overall_assessment'] = ("Excellent interview presence. Your body language "
                                                "and eye contact project confidence and engagement.")
        elif assessment['overall_score'] >= 7:
            assessment['overall_assessment'] = "Strong interview presence with some minor areas for improvement."
        elif assessment['overall_score'] >= 5.5:
            assessment['overall_assessment'] = "Adequate interview presence with several areas that could be strengthened."
        else:
            assessment['overall_assessment'] = ("Your interview presence needs significant improvement "
                                                "to make a positive impression.")

        return assessment
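
# A minimal sketch of driving InterviewAnalyzer over a recorded clip
# (illustrative only; "interview.mp4" is a hypothetical path and this
# helper is not called anywhere in the module):
def _demo_interview_analysis():  # Hypothetical helper, not part of the pipeline
    analyzer = InterviewAnalyzer()
    cap = cv2.VideoCapture("interview.mp4")
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        analyzer.process_frame(frame)
    cap.release()
    return analyzer.get_comprehensive_assessment()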

def example_interview_assessment():
    """
    Generate an example interview assessment for demonstration purposes.

    Returns:
        dict: Example assessment
    """
    assessment = {
        'overall_score': 7.8,
        'overall_assessment': "Strong interview presence with some minor areas for improvement.",
        'eye_contact': {
            'score': 8.0,
            'patterns': ["Good eye contact maintained throughout most of the interview"],
            'recommendations': ["Slightly reduce the intensity of eye contact in some moments"]
        },
        'body_language': {
            'confidence_score': 7.5,
            'engagement_score': 8.0,
            'comfort_score': 7.0,
            'strengths': [
                "Good upright posture",
                "Appropriate hand gestures",
                "Engaged facial expressions"
            ],
            'areas_for_improvement': [
                "Occasional fidgeting",
                "Some tension in shoulders"
            ],
            'recommendations': [
                "Practice relaxation techniques before interviews",
                "Be mindful of hand movements when nervous",
                "Maintain balanced posture throughout"
            ]
        },
        'key_statistics': {
            'total_duration_seconds': 300.0,
            'total_frames': 9000,
            'eye_contact_percentage': 65.0,
            'longest_eye_contact_seconds': 8.5,
            'average_contact_duration_seconds': 4.2,
            'shoulder_misalignment_percentage': 85.0,
            'leaning_forward_percentage': 40.0,
            'head_tilt_percentage': 15.0,
            'arms_crossed_percentage': 10.0,
            'self_touch_percentage': 25.0,
            'fidgeting_percentage': 30.0,
            'pose_shifts_per_minute': 2.5
        },
        'processing_info': {
            'device_used': DEVICE
        }
    }

    print("\n=== EXAMPLE INTERVIEW ASSESSMENT ===")
    print(f"Overall Score: {assessment['overall_score']}/10")
    print(f"Assessment: {assessment['overall_assessment']}")

    print("\nEYE CONTACT:")
    print(f"Score: {assessment['eye_contact']['score']}/10")
    for pattern in assessment['eye_contact']['patterns']:
        print(f"- {pattern}")

    print("\nBODY LANGUAGE:")
    print(f"Confidence Score: {assessment['body_language']['confidence_score']}/10")
    print(f"Engagement Score: {assessment['body_language']['engagement_score']}/10")
    print(f"Comfort Score: {assessment['body_language']['comfort_score']}/10")

    print("\nSTRENGTHS:")
    for strength in assessment['body_language']['strengths']:
        print(f"+ {strength}")

    print("\nAREAS FOR IMPROVEMENT:")
    for area in assessment['body_language']['areas_for_improvement']:
        print(f"- {area}")

    print("\nPRIORITY RECOMMENDATIONS:")
    for i, rec in enumerate(assessment['body_language']['recommendations'], 1):
        print(f"{i}. {rec}")

    return assessment

def analyze_video_file(video_path, display_video=False, save_results=False):
    """
    Analyze body language in a video file and get statistics.

    Args:
        video_path: Path to the video file
        display_video: Whether to display the video during analysis
        save_results: Whether to save results to a JSON file

    Returns:
        dict: Body language statistics and assessment, or None if the
            video could not be opened
    """
    # Open the video file
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video file {video_path}")
        return None

    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = frame_count / fps if fps > 0 else 0

    # Initialize analyzer
    analyzer = BodyLanguageAnalyzer()
    frame_number = 0

    # Process each frame
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Process the frame
        metrics, analyzer, annotated_frame = analyze_body_language(frame, analyzer, display_video)

        # Track progress
        frame_number += 1

        # Display the frame if requested
        if display_video:
            cv2.imshow("Body Language Analysis", annotated_frame)

            # Break if 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    # Clean up
    cap.release()
    if display_video:
        cv2.destroyAllWindows()

    # Get statistics and assessment
    stats = analyzer.get_stats()
    assessment = analyzer.get_interview_assessment()

    # Combine results
    results = {
        "video_info": {
            "path": video_path,
            "frames": frame_count,
            "fps": fps,
            "duration_seconds": duration,
            "device_used": DEVICE
        },
        "body_language_stats": stats,
        "assessment": assessment
    }

    # Save results if requested
    if save_results:
        from datetime import datetime
        output_dir = os.path.join(os.path.dirname(video_path), "results")
        os.makedirs(output_dir, exist_ok=True)
        output_file = (f"{output_dir}/{Path(video_path).stem}_"
                       f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_body_language_analysis.json")
        with open(output_file, 'w') as f:
            json.dump(results, f, indent=4)

    return results


if __name__ == "__main__":
    example_interview_assessment()
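
# Example invocation of the file-based pipeline (hypothetical path shown):
#
#     results = analyze_video_file("interviews/session_01.mp4", save_results=True)
#     print(results["assessment"]["overall_score"])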