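"""Body language analysis service built on MediaPipe Holistic.

Tracks posture (shoulder alignment, forward lean, head tilt), gestures
(hand movement, self-touch, crossed arms), and movement (fidgeting, posture
shifts) per frame, and aggregates them into interview-oriented assessments.
"""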
import cv2
import mediapipe as mp
import numpy as np
import time
from collections import deque
import math
import json
import os
from pathlib import Path
from app.services.processing.eye_contact_analyzer import EyeContactAnalyzer, analyze_eye_contact
from app.utils.device_utils import get_available_device
# Initialize device once at module level
DEVICE = get_available_device()
class BodyLanguageAnalyzer:
def __init__(self, history_size=100):
"""
Initialize the body language analyzer for interview assessment.
Args:
history_size: Number of frames to keep in history for rolling metrics
"""
# Initialize MediaPipe Pose and Holistic
self.mp_holistic = mp.solutions.holistic
self.mp_drawing = mp.solutions.drawing_utils
self.mp_drawing_styles = mp.solutions.drawing_styles
self.holistic = self.mp_holistic.Holistic(
min_detection_confidence=0.5,
min_tracking_confidence=0.5,
static_image_mode=False
)
# Stats tracking
self.history_size = history_size
self.total_frames = 0
self.start_time = time.time()
# Posture tracking
self.shoulder_alignment_history = deque(maxlen=history_size)
self.lean_forward_history = deque(maxlen=history_size)
self.head_tilt_history = deque(maxlen=history_size)
# Gesture tracking
self.hand_movement_history = deque(maxlen=history_size)
self.self_touch_history = deque(maxlen=history_size)
self.crossing_arms_history = deque(maxlen=history_size)
# Movement tracking
self.fidgeting_history = deque(maxlen=history_size)
self.pose_shift_history = deque(maxlen=history_size)
# Previous frame landmarks for movement detection
self.prev_pose_landmarks = None
self.prev_face_landmarks = None
self.prev_left_hand_landmarks = None
self.prev_right_hand_landmarks = None
# Threshold values
self.thresholds = {
'shoulder_alignment': 0.05, # Shoulder height difference ratio
'lean_forward': 0.4, # Forward lean threshold
            'head_tilt': 0.1,          # Head tilt threshold in radians (~5.7 degrees)
'hand_movement': 0.03, # Hand movement threshold
'self_touch': 0.1, # Self-touch proximity threshold
'crossing_arms': 0.15, # Arms crossing threshold
'fidgeting': 0.02, # Fidgeting movement threshold
'pose_shift': 0.05 # Major posture shift threshold
}
# Current state
self.current_state = {
'shoulder_misalignment': 0,
'leaning_forward': 0,
'head_tilted': 0,
'hand_movement': 0,
'self_touching': 0,
'arms_crossed': 0,
'fidgeting': 0,
'pose_shifting': 0,
'last_pose_shift': 0
}
def reset_stats(self):
"""Reset all statistics for a new session."""
self.shoulder_alignment_history.clear()
self.lean_forward_history.clear()
self.head_tilt_history.clear()
self.hand_movement_history.clear()
self.self_touch_history.clear()
self.crossing_arms_history.clear()
self.fidgeting_history.clear()
self.pose_shift_history.clear()
self.total_frames = 0
self.start_time = time.time()
self.prev_pose_landmarks = None
self.prev_face_landmarks = None
self.prev_left_hand_landmarks = None
self.prev_right_hand_landmarks = None
def _calculate_distance(self, point1, point2):
"""Calculate Euclidean distance between two 3D points."""
return math.sqrt((point1.x - point2.x)**2 +
(point1.y - point2.y)**2 +
(point1.z - point2.z)**2)
def _calculate_angle(self, point1, point2, point3):
"""Calculate angle between three points."""
vector1 = np.array([point1.x - point2.x, point1.y - point2.y, point1.z - point2.z])
vector2 = np.array([point3.x - point2.x, point3.y - point2.y, point3.z - point2.z])
# Normalize vectors
norm1 = np.linalg.norm(vector1)
norm2 = np.linalg.norm(vector2)
if norm1 > 0 and norm2 > 0:
vector1 = vector1 / norm1
vector2 = vector2 / norm2
# Calculate dot product and angle
dot_product = np.clip(np.dot(vector1, vector2), -1.0, 1.0)
angle = np.arccos(dot_product)
return np.degrees(angle)
return 0
def _calculate_landmark_movement(self, current_landmark, previous_landmark):
"""Calculate movement between current and previous landmark position."""
if current_landmark is None or previous_landmark is None:
return 0
return self._calculate_distance(current_landmark, previous_landmark)
def _analyze_shoulder_alignment(self, pose_landmarks):
"""Analyze shoulder alignment (level shoulders vs. one higher than the other)."""
if pose_landmarks:
left_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_SHOULDER]
right_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_SHOULDER]
# Calculate shoulder height difference (y-axis)
height_diff = abs(left_shoulder.y - right_shoulder.y)
# Normalize by shoulder width
shoulder_width = abs(left_shoulder.x - right_shoulder.x)
if shoulder_width > 0:
normalized_diff = height_diff / shoulder_width
self.shoulder_alignment_history.append(normalized_diff)
# Update current state
self.current_state['shoulder_misalignment'] = (
normalized_diff > self.thresholds['shoulder_alignment'])
return normalized_diff
return 0
def _analyze_lean_forward(self, pose_landmarks):
"""Analyze if the person is leaning forward."""
if pose_landmarks:
# Use shoulder and hip positions to determine lean
left_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_SHOULDER]
right_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_SHOULDER]
left_hip = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_HIP]
right_hip = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_HIP]
            # Average shoulder and hip depths (MediaPipe z decreases toward the camera)
            shoulder_z = (left_shoulder.z + right_shoulder.z) / 2
            hip_z = (left_hip.z + right_hip.z) / 2
            # Forward lean: shoulders closer to the camera than the hips,
            # normalized by torso height so the metric is scale-invariant
            shoulder_hip_y_diff = abs((left_shoulder.y + right_shoulder.y) / 2 -
                                      (left_hip.y + right_hip.y) / 2)
            lean_forward = (hip_z - shoulder_z) / max(shoulder_hip_y_diff, 0.1)
# Track history
self.lean_forward_history.append(lean_forward)
# Update current state
self.current_state['leaning_forward'] = (
lean_forward > self.thresholds['lean_forward'])
return lean_forward
return 0
def _analyze_head_tilt(self, face_landmarks):
"""Analyze head tilt (left/right)."""
if face_landmarks:
# Use eye and ear positions to determine head tilt
left_eye = face_landmarks.landmark[33] # Left eye outer corner
right_eye = face_landmarks.landmark[263] # Right eye outer corner
# Calculate tilt angle from horizontal
angle = math.atan2(right_eye.y - left_eye.y, right_eye.x - left_eye.x)
tilt = abs(angle)
# Track history
self.head_tilt_history.append(tilt)
# Update current state
self.current_state['head_tilted'] = (
tilt > self.thresholds['head_tilt'])
return tilt
return 0
def _analyze_hand_movement(self, left_hand, right_hand):
"""Analyze hand movement and gestures."""
movement = 0
# Check left hand movement
if left_hand and self.prev_left_hand_landmarks:
# Use wrist as reference point
left_movement = self._calculate_landmark_movement(
left_hand.landmark[0], # Wrist landmark
self.prev_left_hand_landmarks.landmark[0]
)
movement = max(movement, left_movement)
# Check right hand movement
if right_hand and self.prev_right_hand_landmarks:
# Use wrist as reference point
right_movement = self._calculate_landmark_movement(
right_hand.landmark[0], # Wrist landmark
self.prev_right_hand_landmarks.landmark[0]
)
movement = max(movement, right_movement)
# Track history
self.hand_movement_history.append(movement)
# Update current state
self.current_state['hand_movement'] = (
movement > self.thresholds['hand_movement'])
return movement
def _analyze_self_touch(self, pose_landmarks, left_hand, right_hand, face_landmarks):
"""Detect if hands are touching face, hair, or other body parts."""
self_touch = 0
if face_landmarks:
# Check left hand to face proximity
if left_hand:
left_index_tip = left_hand.landmark[8] # Index finger tip
nose_tip = face_landmarks.landmark[4]
left_to_face_dist = self._calculate_distance(left_index_tip, nose_tip)
self_touch = max(self_touch, 1.0 - min(left_to_face_dist * 5, 1.0))
# Check right hand to face proximity
if right_hand:
right_index_tip = right_hand.landmark[8] # Index finger tip
nose_tip = face_landmarks.landmark[4]
right_to_face_dist = self._calculate_distance(right_index_tip, nose_tip)
self_touch = max(self_touch, 1.0 - min(right_to_face_dist * 5, 1.0))
# Track history
self.self_touch_history.append(self_touch)
# Update current state
self.current_state['self_touching'] = (
self_touch > self.thresholds['self_touch'])
return self_touch
def _analyze_crossing_arms(self, pose_landmarks):
"""Detect if arms are crossed."""
crossing_score = 0
if pose_landmarks:
# Get key landmarks
left_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_SHOULDER]
right_shoulder = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_SHOULDER]
left_elbow = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_ELBOW]
right_elbow = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_ELBOW]
left_wrist = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.LEFT_WRIST]
right_wrist = pose_landmarks.landmark[self.mp_holistic.PoseLandmark.RIGHT_WRIST]
            # Check if the wrists cross the body's center line. This heuristic
            # assumes a selfie-style (mirrored) frame, in which the subject's
            # left side appears on the image left.
            center_x = (left_shoulder.x + right_shoulder.x) / 2
            left_wrist_right_of_center = left_wrist.x > center_x
            right_wrist_left_of_center = right_wrist.x < center_x
            elbows_down = (left_elbow.y > left_shoulder.y and
                           right_elbow.y > right_shoulder.y)
            # Simple heuristic for crossed arms
            if left_wrist_right_of_center and right_wrist_left_of_center and elbows_down:
                # How far each wrist has crossed, normalized by the half shoulder
                # width (small epsilon guards against division by zero)
                left_cross_amount = (left_wrist.x - center_x) / max(abs(right_shoulder.x - center_x), 1e-6)
                right_cross_amount = (center_x - right_wrist.x) / max(abs(center_x - left_shoulder.x), 1e-6)
                crossing_score = min(1.0, (left_cross_amount + right_cross_amount) / 2)
# Track history
self.crossing_arms_history.append(crossing_score)
# Update current state
self.current_state['arms_crossed'] = (
crossing_score > self.thresholds['crossing_arms'])
return crossing_score
def _analyze_fidgeting(self, pose_landmarks, left_hand, right_hand):
"""Detect small repetitive movements (fidgeting)."""
fidgeting_score = 0
# Check for small hand movements
if self.prev_left_hand_landmarks and left_hand:
# Calculate average movement of all finger joints
total_movement = 0
count = 0
for i in range(21): # 21 hand landmarks
if i < len(left_hand.landmark) and i < len(self.prev_left_hand_landmarks.landmark):
movement = self._calculate_landmark_movement(
left_hand.landmark[i],
self.prev_left_hand_landmarks.landmark[i]
)
total_movement += movement
count += 1
if count > 0:
avg_movement = total_movement / count
fidgeting_score = max(fidgeting_score, avg_movement)
# Similar for right hand
if self.prev_right_hand_landmarks and right_hand:
total_movement = 0
count = 0
for i in range(21): # 21 hand landmarks
if i < len(right_hand.landmark) and i < len(self.prev_right_hand_landmarks.landmark):
movement = self._calculate_landmark_movement(
right_hand.landmark[i],
self.prev_right_hand_landmarks.landmark[i]
)
total_movement += movement
count += 1
if count > 0:
avg_movement = total_movement / count
fidgeting_score = max(fidgeting_score, avg_movement)
# Track history
self.fidgeting_history.append(fidgeting_score)
# Update current state - fidgeting is when movement is small but persistent
self.current_state['fidgeting'] = (
fidgeting_score > self.thresholds['fidgeting'] and
fidgeting_score < self.thresholds['hand_movement'])
return fidgeting_score
def _analyze_pose_shift(self, pose_landmarks):
"""Detect major posture shifts."""
pose_shift = 0
if pose_landmarks and self.prev_pose_landmarks:
# Calculate average movement of all upper body landmarks
upper_body_landmarks = [
self.mp_holistic.PoseLandmark.LEFT_SHOULDER,
self.mp_holistic.PoseLandmark.RIGHT_SHOULDER,
self.mp_holistic.PoseLandmark.LEFT_ELBOW,
self.mp_holistic.PoseLandmark.RIGHT_ELBOW,
self.mp_holistic.PoseLandmark.LEFT_WRIST,
self.mp_holistic.PoseLandmark.RIGHT_WRIST,
self.mp_holistic.PoseLandmark.LEFT_HIP,
self.mp_holistic.PoseLandmark.RIGHT_HIP
]
total_movement = 0
for landmark_idx in upper_body_landmarks:
movement = self._calculate_landmark_movement(
pose_landmarks.landmark[landmark_idx],
self.prev_pose_landmarks.landmark[landmark_idx]
)
total_movement += movement
pose_shift = total_movement / len(upper_body_landmarks)
# Track history
self.pose_shift_history.append(pose_shift)
# Update current state
current_time = time.time()
if pose_shift > self.thresholds['pose_shift']:
self.current_state['pose_shifting'] = 1
self.current_state['last_pose_shift'] = current_time
elif current_time - self.current_state['last_pose_shift'] > 3: # Reset after 3 seconds
self.current_state['pose_shifting'] = 0
return pose_shift
def process_frame(self, frame, annotate=False):
"""
Process a single frame to analyze body language.
Args:
frame: The video frame (BGR format)
annotate: Whether to draw annotations on the frame
        Returns:
            tuple: (frame_metrics, frame)
                - frame_metrics: dict of body language metrics for this frame
                - frame: annotated copy if annotate=True, otherwise the original frame
        """
self.total_frames += 1
frame_metrics = {
'timestamp': time.time(),
'frame_number': self.total_frames
}
# Convert to RGB for MediaPipe
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Process the frame
results = self.holistic.process(frame_rgb)
# Make a copy for annotations if needed
if annotate:
annotated_frame = frame.copy()
else:
annotated_frame = frame
# Analyze different aspects of body language
if results.pose_landmarks:
# Posture analysis
shoulder_alignment = self._analyze_shoulder_alignment(results.pose_landmarks)
lean_forward = self._analyze_lean_forward(results.pose_landmarks)
frame_metrics['shoulder_alignment'] = shoulder_alignment
frame_metrics['lean_forward'] = lean_forward
# Arms crossed analysis
crossing_arms = self._analyze_crossing_arms(results.pose_landmarks)
frame_metrics['crossing_arms'] = crossing_arms
# Pose shift analysis
pose_shift = self._analyze_pose_shift(results.pose_landmarks)
frame_metrics['pose_shift'] = pose_shift
if results.face_landmarks:
# Head tilt analysis
head_tilt = self._analyze_head_tilt(results.face_landmarks)
frame_metrics['head_tilt'] = head_tilt
# Hand movement and gestures
hand_movement = self._analyze_hand_movement(
results.left_hand_landmarks,
results.right_hand_landmarks
)
frame_metrics['hand_movement'] = hand_movement
# Self-touch detection
self_touch = self._analyze_self_touch(
results.pose_landmarks,
results.left_hand_landmarks,
results.right_hand_landmarks,
results.face_landmarks
)
frame_metrics['self_touch'] = self_touch
# Fidgeting detection
fidgeting = self._analyze_fidgeting(
results.pose_landmarks,
results.left_hand_landmarks,
results.right_hand_landmarks
)
frame_metrics['fidgeting'] = fidgeting
# Store current landmarks for next frame comparison
self.prev_pose_landmarks = results.pose_landmarks
self.prev_face_landmarks = results.face_landmarks
self.prev_left_hand_landmarks = results.left_hand_landmarks
self.prev_right_hand_landmarks = results.right_hand_landmarks
# Add current state to metrics
for key, value in self.current_state.items():
if key != 'last_pose_shift': # Skip timestamp
frame_metrics[key] = value
# Draw annotations if requested
if annotate:
# Draw pose landmarks
if results.pose_landmarks:
self.mp_drawing.draw_landmarks(
annotated_frame,
results.pose_landmarks,
self.mp_holistic.POSE_CONNECTIONS,
landmark_drawing_spec=self.mp_drawing_styles.get_default_pose_landmarks_style()
)
# Draw face landmarks
if results.face_landmarks:
self.mp_drawing.draw_landmarks(
annotated_frame,
results.face_landmarks,
self.mp_holistic.FACEMESH_TESSELATION,
landmark_drawing_spec=None,
connection_drawing_spec=self.mp_drawing_styles.get_default_face_mesh_tesselation_style()
)
# Draw hand landmarks
if results.left_hand_landmarks:
self.mp_drawing.draw_landmarks(
annotated_frame,
results.left_hand_landmarks,
self.mp_holistic.HAND_CONNECTIONS,
landmark_drawing_spec=self.mp_drawing_styles.get_default_hand_landmarks_style(),
connection_drawing_spec=self.mp_drawing_styles.get_default_hand_connections_style()
)
if results.right_hand_landmarks:
self.mp_drawing.draw_landmarks(
annotated_frame,
results.right_hand_landmarks,
self.mp_holistic.HAND_CONNECTIONS,
landmark_drawing_spec=self.mp_drawing_styles.get_default_hand_landmarks_style(),
connection_drawing_spec=self.mp_drawing_styles.get_default_hand_connections_style()
)
# Draw body language status on the frame
y_pos = 30
font_scale = 0.6
# Draw posture status
if self.current_state['shoulder_misalignment']:
cv2.putText(annotated_frame, "Uneven Shoulders", (20, y_pos),
cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), 2)
y_pos += 25
if self.current_state['leaning_forward']:
cv2.putText(annotated_frame, "Leaning Forward", (20, y_pos),
cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 255, 0), 2)
y_pos += 25
if self.current_state['head_tilted']:
cv2.putText(annotated_frame, "Head Tilted", (20, y_pos),
cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), 2)
y_pos += 25
# Draw gesture status
if self.current_state['hand_movement']:
cv2.putText(annotated_frame, "Gesturing", (20, y_pos),
cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 255, 0), 2)
y_pos += 25
if self.current_state['self_touching']:
cv2.putText(annotated_frame, "Self-Touching", (20, y_pos),
cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), 2)
y_pos += 25
if self.current_state['arms_crossed']:
cv2.putText(annotated_frame, "Arms Crossed", (20, y_pos),
cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), 2)
y_pos += 25
# Draw movement status
if self.current_state['fidgeting']:
cv2.putText(annotated_frame, "Fidgeting", (20, y_pos),
cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), 2)
y_pos += 25
if self.current_state['pose_shifting']:
cv2.putText(annotated_frame, "Shifting Posture", (20, y_pos),
cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), 2)
y_pos += 25
return frame_metrics, annotated_frame
def get_stats(self):
"""
Get comprehensive body language statistics.
Returns:
dict: Statistics about body language
"""
current_time = time.time()
total_duration = current_time - self.start_time
# Calculate stats for different metrics
stats = {
'total_frames': self.total_frames,
'total_duration_seconds': total_duration,
# Posture stats
'shoulder_misalignment_percentage': self._calculate_percentage(
[1 if x > self.thresholds['shoulder_alignment'] else 0
for x in self.shoulder_alignment_history]),
'leaning_forward_percentage': self._calculate_percentage(
[1 if x > self.thresholds['lean_forward'] else 0
for x in self.lean_forward_history]),
'head_tilt_percentage': self._calculate_percentage(
[1 if x > self.thresholds['head_tilt'] else 0
for x in self.head_tilt_history]),
# Gesture stats
'hand_movement_percentage': self._calculate_percentage(
[1 if x > self.thresholds['hand_movement'] else 0
for x in self.hand_movement_history]),
'self_touch_percentage': self._calculate_percentage(
[1 if x > self.thresholds['self_touch'] else 0
for x in self.self_touch_history]),
'arms_crossed_percentage': self._calculate_percentage(
[1 if x > self.thresholds['crossing_arms'] else 0
for x in self.crossing_arms_history]),
# Movement stats
'fidgeting_percentage': self._calculate_percentage(
[1 if (x > self.thresholds['fidgeting'] and x < self.thresholds['hand_movement']) else 0
for x in self.fidgeting_history]),
'pose_shifts_count': sum([1 if x > self.thresholds['pose_shift'] else 0
for x in self.pose_shift_history]),
# Average intensity (when present)
'avg_shoulder_misalignment': self._calculate_average(
[x for x in self.shoulder_alignment_history if x > self.thresholds['shoulder_alignment']]),
'avg_lean_forward': self._calculate_average(
[x for x in self.lean_forward_history if x > self.thresholds['lean_forward']]),
'avg_head_tilt': self._calculate_average(
[x for x in self.head_tilt_history if x > self.thresholds['head_tilt']]),
'avg_hand_movement': self._calculate_average(
[x for x in self.hand_movement_history if x > self.thresholds['hand_movement']]),
'avg_self_touch': self._calculate_average(
[x for x in self.self_touch_history if x > self.thresholds['self_touch']]),
'avg_arms_crossed': self._calculate_average(
[x for x in self.crossing_arms_history if x > self.thresholds['crossing_arms']]),
'avg_fidgeting': self._calculate_average(
[x for x in self.fidgeting_history if x > self.thresholds['fidgeting']
and x < self.thresholds['hand_movement']])
}
# Calculate pose shifts per minute
if total_duration > 0:
stats['pose_shifts_per_minute'] = stats['pose_shifts_count'] / (total_duration / 60)
else:
stats['pose_shifts_per_minute'] = 0
return stats
def _calculate_percentage(self, binary_list):
"""Calculate percentage of True/1 values in a list."""
if len(binary_list) == 0:
return 0
return sum(binary_list) / len(binary_list) * 100
def _calculate_average(self, values_list):
"""Calculate average of values in a list."""
if len(values_list) == 0:
return 0
return sum(values_list) / len(values_list)
def get_interview_assessment(self):
"""
Analyze body language patterns in the context of an interview.
Returns:
dict: Assessment of body language with interview-specific insights
"""
stats = self.get_stats()
# Initialize assessment
assessment = {
'confidence_score': 0, # 0-10 scale
'engagement_score': 0, # 0-10 scale
'comfort_score': 0, # 0-10 scale
'overall_score': 0, # 0-10 scale
'strengths': [],
'areas_for_improvement': [],
'recommendations': []
}
# CONFIDENCE SCORE
confidence_base = 7 # Start from a neutral-positive point
# Positive indicators of confidence
if stats['leaning_forward_percentage'] > 40:
confidence_base += 1
assessment['strengths'].append('Shows engagement by leaning forward')
if stats['hand_movement_percentage'] > 30 and stats['hand_movement_percentage'] < 70:
confidence_base += 1
assessment['strengths'].append('Uses appropriate hand gestures to emphasize points')
# Negative indicators
if stats['shoulder_misalignment_percentage'] > 30:
confidence_base -= 1
assessment['areas_for_improvement'].append('Uneven shoulders may convey tension')
assessment['recommendations'].append('Practice maintaining level shoulders')
if stats['self_touch_percentage'] > 30:
confidence_base -= 2
assessment['areas_for_improvement'].append('Frequent self-touching can signal nervousness')
assessment['recommendations'].append('Be mindful of touching your face or hair during interviews')
if stats['fidgeting_percentage'] > 40:
confidence_base -= 2
assessment['areas_for_improvement'].append('Fidgeting can distract from your message')
assessment['recommendations'].append('Practice stillness or channel energy into purposeful gestures')
if stats['arms_crossed_percentage'] > 50:
confidence_base -= 1
assessment['areas_for_improvement'].append('Frequently crossed arms can appear defensive')
assessment['recommendations'].append('Try to maintain a more open posture during interviews')
# Clamp confidence score to 0-10 range
assessment['confidence_score'] = max(0, min(10, confidence_base))
# ENGAGEMENT SCORE
engagement_base = 5 # Start from a neutral point
# Positive indicators of engagement
if stats['leaning_forward_percentage'] > 50:
engagement_base += 2
if 'Shows engagement by leaning forward' not in assessment['strengths']:
assessment['strengths'].append('Shows engagement by leaning forward')
if stats['hand_movement_percentage'] > 40:
engagement_base += 1
if 'Uses appropriate hand gestures to emphasize points' not in assessment['strengths']:
assessment['strengths'].append('Uses appropriate hand gestures to emphasize points')
# Negative indicators
if stats['pose_shifts_per_minute'] > 3:
engagement_base -= 1
assessment['areas_for_improvement'].append('Frequent posture shifts may indicate restlessness')
assessment['recommendations'].append('Work on maintaining a stable but comfortable posture')
if stats['arms_crossed_percentage'] > 60:
engagement_base -= 2
if 'Frequently crossed arms can appear defensive' not in assessment['areas_for_improvement']:
assessment['areas_for_improvement'].append('Crossed arms can signal disengagement or defensiveness')
# Clamp engagement score to 0-10 range
assessment['engagement_score'] = max(0, min(10, engagement_base))
# COMFORT SCORE
comfort_base = 6 # Start from a slightly positive point
# Negative indicators of comfort
if stats['fidgeting_percentage'] > 30:
comfort_base -= 1
if 'Fidgeting can distract from your message' not in assessment['areas_for_improvement']:
assessment['areas_for_improvement'].append('Fidgeting indicates nervousness or discomfort')
if stats['self_touch_percentage'] > 40:
comfort_base -= 1
if 'Frequent self-touching can signal nervousness' not in assessment['areas_for_improvement']:
assessment['areas_for_improvement'].append('Self-touching often indicates anxiety or discomfort')
if stats['pose_shifts_count'] > (stats['total_duration_seconds'] / 20): # More than 1 shift per 20 seconds
comfort_base -= 1
if 'Frequent posture shifts may indicate restlessness' not in assessment['areas_for_improvement']:
assessment['areas_for_improvement'].append('Frequent posture adjustments suggest discomfort')
assessment['recommendations'].append('Find a comfortable seated position before the interview')
# Positive indicators of comfort
if stats['shoulder_misalignment_percentage'] < 20:
comfort_base += 1
assessment['strengths'].append('Maintains balanced, relaxed shoulder posture')
if stats['fidgeting_percentage'] < 15 and stats['self_touch_percentage'] < 15:
comfort_base += 2
assessment['strengths'].append('Appears calm and composed through minimal nervous movements')
# Clamp comfort score to 0-10 range
assessment['comfort_score'] = max(0, min(10, comfort_base))
# OVERALL SCORE - weighted average of the three component scores
assessment['overall_score'] = (
assessment['confidence_score'] * 0.4 +
assessment['engagement_score'] * 0.4 +
assessment['comfort_score'] * 0.2
)
# Add general recommendations if none were added
if not assessment['recommendations']:
assessment['recommendations'] = [
'Practice interviews with video recording to observe your body language',
'Focus on maintaining an open, engaged posture',
'Use purposeful hand gestures to emphasize key points'
]
# Add general strengths if none were identified
if not assessment['strengths']:
assessment['strengths'] = [
'Shows baseline appropriate interview body language',
'Maintains basic professional demeanor'
]
return assessment
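
# Usage sketch (illustrative only): drive BodyLanguageAnalyzer directly over a
# sequence of frames, then pull the rolling statistics and the interview-style
# assessment at the end of the session. `frames` is a hypothetical iterable of
# BGR numpy arrays (e.g. decoded video frames).
def _example_session_report(frames):
    analyzer = BodyLanguageAnalyzer(history_size=300)
    for frame in frames:
        analyzer.process_frame(frame)  # updates rolling histories in place
    stats = analyzer.get_stats()                  # percentages and averages
    report = analyzer.get_interview_assessment()  # 0-10 scores + recommendations
    return stats, report
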
def analyze_body_language(frame, analyzer=None, annotate=False):
"""
Analyze body language in a single frame.
Args:
frame: The video frame (BGR format)
analyzer: An existing BodyLanguageAnalyzer instance, or None to create a new one
annotate: Whether to annotate the frame with visualization
Returns:
tuple: (metrics, analyzer, annotated_frame)
- metrics: Dictionary of body language metrics for this frame
- analyzer: The BodyLanguageAnalyzer instance (new or updated)
- annotated_frame: The frame with annotations if requested
"""
if analyzer is None:
analyzer = BodyLanguageAnalyzer()
metrics, annotated_frame = analyzer.process_frame(frame, annotate)
return metrics, analyzer, annotated_frame
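
# Usage sketch (illustrative only): run analyze_body_language on a live webcam
# feed, reusing the returned analyzer so rolling metrics accumulate across
# frames. Assumes a camera at index 0 and a display environment for cv2.imshow.
def _example_webcam_loop(camera_index=0):
    cap = cv2.VideoCapture(camera_index)
    analyzer = None  # created on the first call, then reused
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            metrics, analyzer, annotated = analyze_body_language(frame, analyzer, annotate=True)
            cv2.imshow("Body Language (sketch)", annotated)
            if cv2.waitKey(1) & 0xFF == ord('q'):  # press 'q' to quit
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()
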
class InterviewAnalyzer:
"""
Combined analyzer for comprehensive interview assessment including
eye contact and body language.
"""
def __init__(self):
self.eye_contact_analyzer = EyeContactAnalyzer()
self.body_language_analyzer = BodyLanguageAnalyzer()
self.total_frames = 0
self.start_time = time.time()
self.frame_metrics = []
def reset(self):
"""Reset all analyzers for a new session."""
self.eye_contact_analyzer.reset_stats()
self.body_language_analyzer.reset_stats()
self.total_frames = 0
self.start_time = time.time()
self.frame_metrics = []
def process_frame(self, frame, annotate=False):
"""
Process a frame through both eye contact and body language analyzers.
Args:
frame: The video frame (BGR format)
annotate: Whether to annotate the frame with visualization
Returns:
tuple: (combined_metrics, annotated_frame)
"""
self.total_frames += 1
# Process with eye contact analyzer
eye_metrics, _, _ = analyze_eye_contact(frame, self.eye_contact_analyzer, False)
# Process with body language analyzer
body_metrics, body_frame = self.body_language_analyzer.process_frame(frame, annotate)
# Combine metrics
combined_metrics = {**eye_metrics, **body_metrics}
combined_metrics['frame_number'] = self.total_frames
combined_metrics['timestamp'] = time.time()
# Store frame metrics for later analysis
self.frame_metrics.append(combined_metrics)
return combined_metrics, body_frame
def get_comprehensive_assessment(self):
"""
Get a comprehensive assessment combining eye contact and body language insights.
Returns:
dict: Combined assessment with overall interview performance metrics
"""
# Get individual assessments
eye_contact_stats = self.eye_contact_analyzer.get_stats()
eye_contact_assessment = self.eye_contact_analyzer.get_interview_assessment()
body_language_stats = self.body_language_analyzer.get_stats()
body_language_assessment = self.body_language_analyzer.get_interview_assessment()
# Create combined assessment
assessment = {
'overall_score': (eye_contact_assessment['score'] * 0.4 +
body_language_assessment['overall_score'] * 0.6),
'eye_contact': {
'score': eye_contact_assessment['score'],
'patterns': eye_contact_assessment['patterns'],
'recommendations': eye_contact_assessment['recommendations']
},
'body_language': {
'confidence_score': body_language_assessment['confidence_score'],
'engagement_score': body_language_assessment['engagement_score'],
'comfort_score': body_language_assessment['comfort_score'],
'strengths': body_language_assessment['strengths'],
'areas_for_improvement': body_language_assessment['areas_for_improvement'],
'recommendations': body_language_assessment['recommendations']
},
'key_statistics': {
'total_duration_seconds': time.time() - self.start_time,
'total_frames': self.total_frames,
'eye_contact_percentage': eye_contact_stats['eye_contact_percentage'],
'longest_eye_contact_seconds': eye_contact_stats['longest_eye_contact_seconds'],
'average_contact_duration_seconds': eye_contact_stats['average_contact_duration_seconds'],
'shoulder_misalignment_percentage': body_language_stats['shoulder_misalignment_percentage'],
'leaning_forward_percentage': body_language_stats['leaning_forward_percentage'],
'head_tilt_percentage': body_language_stats['head_tilt_percentage'],
'arms_crossed_percentage': body_language_stats['arms_crossed_percentage'],
'self_touch_percentage': body_language_stats['self_touch_percentage'],
'fidgeting_percentage': body_language_stats['fidgeting_percentage'],
'pose_shifts_per_minute': body_language_stats['pose_shifts_per_minute']
},
'processing_info': {
'device_used': DEVICE
}
}
# Generate overall assessment text
if assessment['overall_score'] >= 8.5:
assessment['overall_assessment'] = "Excellent interview presence. Your body language and eye contact project confidence and engagement."
elif assessment['overall_score'] >= 7:
assessment['overall_assessment'] = "Strong interview presence with some minor areas for improvement."
elif assessment['overall_score'] >= 5.5:
assessment['overall_assessment'] = "Adequate interview presence with several areas that could be strengthened."
else:
assessment['overall_assessment'] = "Your interview presence needs significant improvement to make a positive impression."
return assessment
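
# Usage sketch (illustrative only): feed a recorded interview through the
# combined analyzer and produce one report covering both eye contact and body
# language. `video_path` is assumed to point at a readable video file.
def _example_combined_analysis(video_path):
    analyzer = InterviewAnalyzer()
    cap = cv2.VideoCapture(video_path)
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        analyzer.process_frame(frame)  # accumulates per-frame metrics internally
    cap.release()
    return analyzer.get_comprehensive_assessment()
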
def example_interview_assessment():
"""
Generate an example interview assessment for demonstration purposes.
Returns:
dict: Example assessment
"""
assessment = {
'overall_score': 7.8,
'overall_assessment': "Strong interview presence with some minor areas for improvement.",
'eye_contact': {
'score': 8.0,
'patterns': ["Good eye contact maintained throughout most of the interview"],
'recommendations': ["Slightly reduce the intensity of eye contact in some moments"]
},
'body_language': {
'confidence_score': 7.5,
'engagement_score': 8.0,
'comfort_score': 7.0,
'strengths': [
"Good upright posture",
"Appropriate hand gestures",
"Engaged facial expressions"
],
'areas_for_improvement': [
"Occasional fidgeting",
"Some tension in shoulders"
],
'recommendations': [
"Practice relaxation techniques before interviews",
"Be mindful of hand movements when nervous",
"Maintain balanced posture throughout"
]
},
'key_statistics': {
'total_duration_seconds': 300.0,
'total_frames': 9000,
'eye_contact_percentage': 65.0,
'longest_eye_contact_seconds': 8.5,
'average_contact_duration_seconds': 4.2,
            'shoulder_misalignment_percentage': 25.0,  # consistent with "some tension in shoulders"
'leaning_forward_percentage': 40.0,
'head_tilt_percentage': 15.0,
'arms_crossed_percentage': 10.0,
'self_touch_percentage': 25.0,
'fidgeting_percentage': 30.0,
'pose_shifts_per_minute': 2.5
},
'processing_info': {
'device_used': DEVICE
}
}
print("\n=== EXAMPLE INTERVIEW ASSESSMENT ===")
print(f"Overall Score: {assessment['overall_score']}/10")
print(f"Assessment: {assessment['overall_assessment']}")
print("\nEYE CONTACT:")
print(f"Score: {assessment['eye_contact']['score']}/10")
for pattern in assessment['eye_contact']['patterns']:
print(f"- {pattern}")
print("\nBODY LANGUAGE:")
print(f"Confidence Score: {assessment['body_language']['confidence_score']}/10")
print(f"Engagement Score: {assessment['body_language']['engagement_score']}/10")
print(f"Comfort Score: {assessment['body_language']['comfort_score']}/10")
print("\nSTRENGTHS:")
for strength in assessment['body_language']['strengths']:
print(f"+ {strength}")
print("\nAREAS FOR IMPROVEMENT:")
for area in assessment['body_language']['areas_for_improvement']:
print(f"- {area}")
print("\nPRIORITY RECOMMENDATIONS:")
for i, rec in enumerate(assessment['body_language']['recommendations'], 1):
print(f"{i}. {rec}")
return assessment
def analyze_video_file(video_path, display_video=False, save_results=False):
"""
Analyze body language in a video file and get statistics.
Args:
video_path: Path to the video file
display_video: Whether to display the video during analysis
save_results: Whether to save results to a JSON file
Returns:
dict: Body language statistics and assessment
"""
# Open the video file
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"Error: Could not open video file {video_path}")
return None
# Get video properties
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = frame_count / fps if fps > 0 else 0
# Initialize analyzer
analyzer = BodyLanguageAnalyzer()
frame_number = 0
# Process each frame
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# Process the frame
metrics, analyzer, annotated_frame = analyze_body_language(frame, analyzer, display_video)
        # Track progress
        frame_number += 1
# Display the frame if requested
if display_video:
cv2.imshow("Body Language Analysis", annotated_frame)
# Break if 'q' is pressed
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# Clean up
cap.release()
if display_video:
cv2.destroyAllWindows()
# Get statistics and assessment
stats = analyzer.get_stats()
assessment = analyzer.get_interview_assessment()
# Combine results
results = {
"video_info": {
"path": video_path,
"frames": frame_count,
"fps": fps,
"duration_seconds": duration,
"device_used": DEVICE
},
"body_language_stats": stats,
"assessment": assessment
}
# Save results if requested
if save_results:
from datetime import datetime
output_dir = os.path.join(os.path.dirname(video_path), "results")
os.makedirs(output_dir, exist_ok=True)
output_file = f"{output_dir}/{Path(video_path).stem}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_body_language_analysis.json"
with open(output_file, 'w') as f:
json.dump(results, f, indent=4)
return results
if __name__ == "__main__":
example_interview_assessment()
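    # To analyze a real recording instead (the path below is hypothetical):
    # analyze_video_file("path/to/interview.mp4", display_video=False, save_results=True)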