Spaces:
No application file
No application file
| """ | |
| Real-Time Interview Recording and Violation Detection System | |
| UPDATED VERSION: | |
| - Fixed cv2.FONT_HERSHEY_BOLD error (use FONT_HERSHEY_SIMPLEX) | |
| - Captures violation images | |
| - Continues to next question after violation | |
| - Stores violation metadata for display in results | |
| """ | |
| import cv2 | |
| import numpy as np | |
| import threading | |
| import time | |
| import tempfile | |
| import os | |
| import speech_recognition as sr | |
| import warnings | |
| from collections import deque | |
| warnings.filterwarnings('ignore') | |
| os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' | |
| class RecordingSystem: | |
| """Handles video/audio recording with real-time violation detection""" | |
| def __init__(self, models_dict): | |
| """ | |
| Initialize recording system with loaded models | |
| Args: | |
| models_dict: Dictionary containing pre-loaded AI models | |
| """ | |
| self.models = models_dict | |
| self.violation_detected = False | |
| self.violation_reason = "" | |
| # Frame boundaries (for sitting position: left, right, top only) | |
| self.frame_margin = 50 # pixels from edge | |
| # Position adjustment tracking | |
| self.position_adjusted = False | |
| self.baseline_environment = None # Store initial environment scan | |
| # Violation storage directory | |
| self.violation_images_dir = tempfile.mkdtemp(prefix="violations_") | |
| # Initialize pose detection if available | |
| try: | |
| import mediapipe as mp | |
| self.mp_pose = mp.solutions.pose | |
| self.pose_detector = self.mp_pose.Pose( | |
| static_image_mode=False, | |
| model_complexity=1, | |
| smooth_landmarks=True, | |
| min_detection_confidence=0.5, | |
| min_tracking_confidence=0.5 | |
| ) | |
| self.pose_available = True | |
| except: | |
| self.pose_detector = None | |
| self.pose_available = False | |
| def save_violation_image(self, frame, question_number, violation_reason): | |
| """ | |
| Save an image of the violation for later display | |
| FIXED: Changed cv2.FONT_HERSHEY_BOLD to cv2.FONT_HERSHEY_SIMPLEX | |
| Args: | |
| frame: BGR image frame showing the violation | |
| question_number: Current question number | |
| violation_reason: Description of the violation | |
| Returns: | |
| Path to saved violation image | |
| """ | |
| try: | |
| # Create filename with timestamp | |
| timestamp = int(time.time() * 1000) | |
| filename = f"violation_q{question_number}_{timestamp}.jpg" | |
| filepath = os.path.join(self.violation_images_dir, filename) | |
| # Add violation text overlay to image | |
| overlay_frame = frame.copy() | |
| h, w = overlay_frame.shape[:2] | |
| # Add semi-transparent red overlay | |
| red_overlay = overlay_frame.copy() | |
| cv2.rectangle(red_overlay, (0, 0), (w, h), (0, 0, 255), -1) | |
| overlay_frame = cv2.addWeighted(overlay_frame, 0.7, red_overlay, 0.3, 0) | |
| # Add thick red border | |
| cv2.rectangle(overlay_frame, (0, 0), (w-1, h-1), (0, 0, 255), 10) | |
| # Add violation text with background - FIXED FONT | |
| text = "VIOLATION DETECTED" | |
| cv2.rectangle(overlay_frame, (0, 0), (w, 80), (0, 0, 0), -1) | |
| cv2.putText(overlay_frame, text, (w//2 - 200, 50), | |
| cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3) # FIXED: Was FONT_HERSHEY_BOLD | |
| # Add violation reason at bottom | |
| cv2.rectangle(overlay_frame, (0, h-100), (w, h), (0, 0, 0), -1) | |
| # Split long violation text into multiple lines | |
| words = violation_reason.split() | |
| lines = [] | |
| current_line = "" | |
| for word in words: | |
| test_line = current_line + " " + word if current_line else word | |
| if len(test_line) > 50: | |
| lines.append(current_line) | |
| current_line = word | |
| else: | |
| current_line = test_line | |
| if current_line: | |
| lines.append(current_line) | |
| # Draw violation reason lines | |
| y_offset = h - 90 | |
| for line in lines[:2]: # Max 2 lines | |
| cv2.putText(overlay_frame, line, (10, y_offset), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) | |
| y_offset += 30 | |
| # Save image | |
| cv2.imwrite(filepath, overlay_frame) | |
| return filepath | |
| except Exception as e: | |
| print(f"Error saving violation image: {e}") | |
| return None | |
| def scan_environment(self, frame): | |
| """ | |
| Scan and catalog the environment before test starts | |
| """ | |
| if self.models['yolo'] is None: | |
| return {'objects': [], 'positions': []} | |
| try: | |
| results = self.models['yolo'].predict(frame, conf=0.25, verbose=False) | |
| environment_data = { | |
| 'objects': [], | |
| 'positions': [], | |
| 'person_position': None | |
| } | |
| if results and len(results) > 0: | |
| names = self.models['yolo'].names | |
| boxes = results[0].boxes | |
| for box in boxes: | |
| cls_id = int(box.cls[0]) | |
| obj_name = names[cls_id] | |
| x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() | |
| environment_data['objects'].append(obj_name) | |
| environment_data['positions'].append({ | |
| 'name': obj_name, | |
| 'bbox': (int(x1), int(y1), int(x2), int(y2)), | |
| 'center': (int((x1+x2)/2), int((y1+y2)/2)) | |
| }) | |
| if obj_name == 'person': | |
| environment_data['person_position'] = (int((x1+x2)/2), int((y1+y2)/2)) | |
| return environment_data | |
| except Exception as e: | |
| return {'objects': [], 'positions': []} | |
| def detect_new_objects(self, frame): | |
| """ | |
| Detect NEW objects that weren't in baseline environment | |
| """ | |
| if self.models['yolo'] is None or self.baseline_environment is None: | |
| return False, [] | |
| try: | |
| results = self.models['yolo'].predict(frame, conf=0.25, verbose=False) | |
| if results and len(results) > 0: | |
| names = self.models['yolo'].names | |
| boxes = results[0].boxes | |
| current_objects = [] | |
| for box in boxes: | |
| cls_id = int(box.cls[0]) | |
| obj_name = names[cls_id] | |
| x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() | |
| current_center = (int((x1+x2)/2), int((y1+y2)/2)) | |
| current_objects.append({ | |
| 'name': obj_name, | |
| 'center': current_center, | |
| 'bbox': (int(x1), int(y1), int(x2), int(y2)) | |
| }) | |
| baseline_objects = self.baseline_environment['positions'] | |
| new_items = [] | |
| for curr_obj in current_objects: | |
| if curr_obj['name'] == 'person': | |
| continue | |
| is_baseline = False | |
| for base_obj in baseline_objects: | |
| if curr_obj['name'] == base_obj['name']: | |
| dist = np.sqrt( | |
| (curr_obj['center'][0] - base_obj['center'][0])**2 + | |
| (curr_obj['center'][1] - base_obj['center'][1])**2 | |
| ) | |
| if dist < 100: | |
| is_baseline = True | |
| break | |
| if not is_baseline: | |
| new_items.append(curr_obj['name']) | |
| if new_items: | |
| return True, list(set(new_items)) | |
| return False, [] | |
| except Exception as e: | |
| return False, [] | |
| def detect_suspicious_movements(self, frame): | |
| """Detect suspicious hand movements""" | |
| if self.models['hands'] is None: | |
| return False, "" | |
| rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| h, w = frame.shape[:2] | |
| try: | |
| hand_results = self.models['hands'].process(rgb_frame) | |
| if hand_results.multi_hand_landmarks: | |
| for hand_landmarks in hand_results.multi_hand_landmarks: | |
| wrist = hand_landmarks.landmark[0] | |
| index_tip = hand_landmarks.landmark[8] | |
| wrist_y = wrist.y * h | |
| tip_y = index_tip.y * h | |
| if wrist_y > h * 0.75: | |
| return True, "Hand movement below desk level detected" | |
| if wrist_y < h * 0.15: | |
| return True, "Suspicious hand movement at top of frame" | |
| except Exception as e: | |
| pass | |
| return False, "" | |
| def calculate_eye_gaze(self, face_landmarks, frame_shape): | |
| """Calculate if eyes are looking at camera""" | |
| h, w = frame_shape[:2] | |
| left_eye_indices = [468, 469, 470, 471, 472] | |
| right_eye_indices = [473, 474, 475, 476, 477] | |
| left_eye_center = [33, 133, 157, 158, 159, 160, 161, 163, 144, 145, 153, 154, 155] | |
| right_eye_center = [362, 263, 387, 386, 385, 384, 398, 382, 381, 380, 373, 374, 390] | |
| landmarks = face_landmarks.landmark | |
| left_iris_x = np.mean([landmarks[i].x for i in left_eye_indices if i < len(landmarks)]) | |
| left_eye_x = np.mean([landmarks[i].x for i in left_eye_center if i < len(landmarks)]) | |
| right_iris_x = np.mean([landmarks[i].x for i in right_eye_indices if i < len(landmarks)]) | |
| right_eye_x = np.mean([landmarks[i].x for i in right_eye_center if i < len(landmarks)]) | |
| left_gaze_ratio = (left_iris_x - left_eye_x) if left_iris_x and left_eye_x else 0 | |
| right_gaze_ratio = (right_iris_x - right_eye_x) if right_iris_x and right_eye_x else 0 | |
| avg_gaze = (left_gaze_ratio + right_gaze_ratio) / 2 | |
| return abs(avg_gaze) < 0.02 | |
| def estimate_head_pose(self, face_landmarks, frame_shape): | |
| """Estimate head pose angles""" | |
| h, w = frame_shape[:2] | |
| landmarks_3d = np.array([(lm.x * w, lm.y * h, lm.z) for lm in face_landmarks.landmark]) | |
| required_indices = [1, 33, 263, 61, 291] | |
| image_points = np.array([landmarks_3d[i] for i in required_indices], dtype="double") | |
| model_points = np.array([ | |
| (0.0, 0.0, 0.0), (-30.0, -125.0, -30.0), | |
| (30.0, -125.0, -30.0), (-60.0, -70.0, -60.0), | |
| (60.0, -70.0, -60.0) | |
| ]) | |
| focal_length = w | |
| center = (w / 2, h / 2) | |
| camera_matrix = np.array([ | |
| [focal_length, 0, center[0]], | |
| [0, focal_length, center[1]], | |
| [0, 0, 1] | |
| ], dtype="double") | |
| dist_coeffs = np.zeros((4, 1)) | |
| success, rotation_vector, _ = cv2.solvePnP( | |
| model_points, image_points, camera_matrix, | |
| dist_coeffs, flags=cv2.SOLVEPNP_ITERATIVE | |
| ) | |
| if success: | |
| rmat, _ = cv2.Rodrigues(rotation_vector) | |
| pose_mat = cv2.hconcat((rmat, rotation_vector)) | |
| _, _, _, _, _, _, euler = cv2.decomposeProjectionMatrix(pose_mat) | |
| yaw, pitch, roll = [float(a) for a in euler] | |
| return yaw, pitch, roll | |
| return 0, 0, 0 | |
| def detect_blink(self, face_landmarks): | |
| """Detect if eye is blinking""" | |
| upper_lid = face_landmarks.landmark[159] | |
| lower_lid = face_landmarks.landmark[145] | |
| eye_openness = abs(upper_lid.y - lower_lid.y) | |
| return eye_openness < 0.01 | |
| def analyze_lighting(self, frame): | |
| """Analyze lighting conditions""" | |
| gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
| mean_brightness = np.mean(gray) | |
| std_brightness = np.std(gray) | |
| if mean_brightness < 60: | |
| return "Too Dark", mean_brightness | |
| elif mean_brightness > 200: | |
| return "Too Bright", mean_brightness | |
| elif std_brightness < 25: | |
| return "Low Contrast", mean_brightness | |
| else: | |
| return "Good", mean_brightness | |
| def check_frame_boundaries(self, frame, face_box): | |
| """Check if person is within frame boundaries""" | |
| if face_box is None: | |
| return False, "No face detected", "NO_FACE" | |
| h, w = frame.shape[:2] | |
| margin = self.frame_margin | |
| x, y, fw, fh = face_box | |
| face_center_x = x + fw // 2 | |
| face_top = y | |
| face_left = x | |
| face_right = x + fw | |
| if face_left < margin: | |
| return False, "Person too close to LEFT edge", "LEFT_VIOLATION" | |
| if face_right > (w - margin): | |
| return False, "Person too close to RIGHT edge", "RIGHT_VIOLATION" | |
| if face_top < margin: | |
| return False, "Person too close to TOP edge", "TOP_VIOLATION" | |
| return True, "Within boundaries", "OK" | |
| def detect_person_outside_frame(self, frame): | |
| """Detect if any person/living being is outside boundaries""" | |
| if self.models['yolo'] is None: | |
| return False, "", "" | |
| h, w = frame.shape[:2] | |
| margin = self.frame_margin | |
| try: | |
| results = self.models['yolo'].predict(frame, conf=0.4, verbose=False) | |
| if results and len(results) > 0: | |
| names = self.models['yolo'].names | |
| boxes = results[0].boxes | |
| living_beings = ['person', 'cat', 'dog', 'bird', 'horse', 'sheep', 'cow', | |
| 'elephant', 'bear', 'zebra', 'giraffe'] | |
| for i, box in enumerate(boxes): | |
| cls_id = int(box.cls[0]) | |
| obj_name = names[cls_id] | |
| if obj_name.lower() in living_beings: | |
| x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() | |
| if x1 < margin or x2 < margin: | |
| return True, obj_name, "LEFT" | |
| if x1 > (w - margin) or x2 > (w - margin): | |
| return True, obj_name, "RIGHT" | |
| if y1 < margin or y2 < margin: | |
| return True, obj_name, "TOP" | |
| except Exception as e: | |
| pass | |
| return False, "", "" | |
| def detect_multiple_bodies(self, frame, num_faces): | |
| """Detect multiple bodies using pose and hand detection""" | |
| rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| body_count = 0 | |
| detected_parts = [] | |
| if self.pose_available and self.pose_detector: | |
| try: | |
| pose_results = self.pose_detector.process(rgb_frame) | |
| if pose_results.pose_landmarks: | |
| body_count += 1 | |
| detected_parts.append("body") | |
| landmarks = pose_results.pose_landmarks.landmark | |
| visible_shoulders = sum(1 for idx in [11, 12] | |
| if landmarks[idx].visibility > 0.5) | |
| visible_elbows = sum(1 for idx in [13, 14] | |
| if landmarks[idx].visibility > 0.5) | |
| if visible_shoulders > 2 or visible_elbows > 2: | |
| return True, "Multiple body parts detected (extra shoulders/arms)", body_count + 1 | |
| except Exception as e: | |
| pass | |
| if self.models['hands'] is not None: | |
| try: | |
| hand_results = self.models['hands'].process(rgb_frame) | |
| if hand_results.multi_hand_landmarks: | |
| num_hands = len(hand_results.multi_hand_landmarks) | |
| if num_hands > 2: | |
| detected_parts.append(f"{num_hands} hands") | |
| return True, f"Multiple persons detected ({num_hands} hands visible)", 2 | |
| if num_hands == 2: | |
| hand1 = hand_results.multi_hand_landmarks[0].landmark[0] | |
| hand2 = hand_results.multi_hand_landmarks[1].landmark[0] | |
| distance = np.sqrt((hand1.x - hand2.x)**2 + (hand1.y - hand2.y)**2) | |
| if distance > 0.7: | |
| detected_parts.append("widely separated hands") | |
| return True, "Suspicious hand positions (possible multiple persons)", 2 | |
| except Exception as e: | |
| pass | |
| if num_faces == 1 and body_count > 1: | |
| return True, "Body parts from multiple persons detected", 2 | |
| if num_faces > 1: | |
| return True, f"Multiple persons detected ({num_faces} faces)", num_faces | |
| return False, "", max(num_faces, body_count) | |
| def detect_hands_outside_main_person(self, frame, face_box): | |
| """Detect hands outside main person's area""" | |
| if self.models['hands'] is None or face_box is None: | |
| return False, "" | |
| rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| h, w = frame.shape[:2] | |
| try: | |
| hand_results = self.models['hands'].process(rgb_frame) | |
| if hand_results.multi_hand_landmarks: | |
| x, y, fw, fh = face_box | |
| expected_left = max(0, x - fw) | |
| expected_right = min(w, x + fw * 2) | |
| expected_top = max(0, y - fh) | |
| expected_bottom = min(h, y + fh * 4) | |
| for hand_landmarks in hand_results.multi_hand_landmarks: | |
| hand_x = hand_landmarks.landmark[0].x * w | |
| hand_y = hand_landmarks.landmark[0].y * h | |
| if (hand_x < expected_left - 50 or hand_x > expected_right + 50 or | |
| hand_y < expected_top - 50 or hand_y > expected_bottom + 50): | |
| return True, "Hand detected outside main person's area" | |
| except Exception as e: | |
| pass | |
| return False, "" | |
| def has_skin_tone(self, region): | |
| """Check if region contains skin-like colors""" | |
| if region.size == 0: | |
| return False | |
| hsv = cv2.cvtColor(region, cv2.COLOR_BGR2HSV) | |
| lower_skin1 = np.array([0, 20, 70], dtype=np.uint8) | |
| upper_skin1 = np.array([20, 255, 255], dtype=np.uint8) | |
| lower_skin2 = np.array([0, 20, 0], dtype=np.uint8) | |
| upper_skin2 = np.array([20, 150, 255], dtype=np.uint8) | |
| mask1 = cv2.inRange(hsv, lower_skin1, upper_skin1) | |
| mask2 = cv2.inRange(hsv, lower_skin2, upper_skin2) | |
| mask = cv2.bitwise_or(mask1, mask2) | |
| skin_ratio = np.sum(mask > 0) / mask.size | |
| return skin_ratio > 0.3 | |
| def detect_intrusion_at_edges(self, frame, face_box): | |
| """Detect body parts intruding from frame edges""" | |
| if face_box is None: | |
| return False, "" | |
| h, w = frame.shape[:2] | |
| x, y, fw, fh = face_box | |
| edge_width = 80 | |
| left_region = frame[:, :edge_width] | |
| right_region = frame[:, w-edge_width:] | |
| top_left = frame[:edge_width, :w//3] | |
| top_right = frame[:edge_width, 2*w//3:] | |
| face_center_x = x + fw // 2 | |
| face_far_from_left = face_center_x > w * 0.3 | |
| face_far_from_right = face_center_x < w * 0.7 | |
| if face_far_from_left and self.has_skin_tone(left_region): | |
| if self.models['hands']: | |
| rgb_region = cv2.cvtColor(left_region, cv2.COLOR_BGR2RGB) | |
| try: | |
| result = self.models['hands'].process(rgb_region) | |
| if result.multi_hand_landmarks: | |
| return True, "Body part detected at left edge (another person)" | |
| except: | |
| pass | |
| if face_far_from_right and self.has_skin_tone(right_region): | |
| if self.models['hands']: | |
| rgb_region = cv2.cvtColor(right_region, cv2.COLOR_BGR2RGB) | |
| try: | |
| result = self.models['hands'].process(rgb_region) | |
| if result.multi_hand_landmarks: | |
| return True, "Body part detected at right edge (another person)" | |
| except: | |
| pass | |
| if y > h * 0.2: | |
| if self.has_skin_tone(top_left) or self.has_skin_tone(top_right): | |
| return True, "Body part detected at top edge (another person)" | |
| return False, "" | |
| def draw_frame_boundaries(self, frame): | |
| """Draw visible frame boundaries""" | |
| h, w = frame.shape[:2] | |
| margin = self.frame_margin | |
| overlay = frame.copy() | |
| cv2.line(overlay, (margin, 0), (margin, h), (0, 255, 0), 3) | |
| cv2.line(overlay, (w - margin, 0), (w - margin, h), (0, 255, 0), 3) | |
| cv2.line(overlay, (0, margin), (w, margin), (0, 255, 0), 3) | |
| cv2.rectangle(overlay, (margin, margin), (w - margin, h), (0, 255, 0), 2) | |
| frame_with_boundaries = cv2.addWeighted(frame, 0.7, overlay, 0.3, 0) | |
| corner_size = 30 | |
| cv2.line(frame_with_boundaries, (margin, margin), (margin + corner_size, margin), (0, 255, 0), 3) | |
| cv2.line(frame_with_boundaries, (margin, margin), (margin, margin + corner_size), (0, 255, 0), 3) | |
| cv2.line(frame_with_boundaries, (w - margin, margin), (w - margin - corner_size, margin), (0, 255, 0), 3) | |
| cv2.line(frame_with_boundaries, (w - margin, margin), (w - margin, margin + corner_size), (0, 255, 0), 3) | |
| cv2.putText(frame_with_boundaries, "Stay within GREEN boundaries", | |
| (w//2 - 200, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) | |
| return frame_with_boundaries | |
| def pre_test_setup_phase(self, ui_callbacks, timeout=60): | |
| """ | |
| ONE-TIME pre-test setup phase with environment scanning | |
| """ | |
| if self.position_adjusted: | |
| return True | |
| cap = cv2.VideoCapture(0) | |
| if not cap.isOpened(): | |
| return False | |
| start_time = time.time() | |
| position_ok_counter = 0 | |
| required_stable_frames = 30 | |
| ui_callbacks['countdown_update']("📸 ONE-TIME SETUP: Adjust your position within the GREEN frame") | |
| while (time.time() - start_time) < timeout: | |
| ret, frame = cap.read() | |
| if not ret: | |
| continue | |
| rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| h, w = frame.shape[:2] | |
| frame_with_boundaries = self.draw_frame_boundaries(frame) | |
| face_box = None | |
| is_ready = False | |
| status_message = "Detecting face..." | |
| status_color = (255, 165, 0) | |
| if self.models['face_mesh'] is not None: | |
| face_results = self.models['face_mesh'].process(rgb_frame) | |
| if face_results.multi_face_landmarks: | |
| num_faces = len(face_results.multi_face_landmarks) | |
| if num_faces > 1: | |
| status_message = "⚠️ Multiple faces detected! Only ONE person allowed" | |
| status_color = (0, 0, 255) | |
| position_ok_counter = 0 | |
| elif num_faces == 1: | |
| face_landmarks = face_results.multi_face_landmarks[0] | |
| landmarks_2d = np.array([(lm.x * w, lm.y * h) for lm in face_landmarks.landmark]) | |
| x_coords = landmarks_2d[:, 0] | |
| y_coords = landmarks_2d[:, 1] | |
| face_box = (int(np.min(x_coords)), int(np.min(y_coords)), | |
| int(np.max(x_coords) - np.min(x_coords)), | |
| int(np.max(y_coords) - np.min(y_coords))) | |
| within_bounds, boundary_msg, boundary_status = self.check_frame_boundaries(frame, face_box) | |
| outside_detected, obj_type, location = self.detect_person_outside_frame(frame) | |
| if outside_detected: | |
| status_message = f"⚠️ {obj_type.upper()} detected outside frame ({location} side)!" | |
| status_color = (0, 0, 255) | |
| position_ok_counter = 0 | |
| elif not within_bounds: | |
| status_message = f"⚠️ {boundary_msg} - Please adjust!" | |
| status_color = (0, 0, 255) | |
| position_ok_counter = 0 | |
| if boundary_status == "LEFT_VIOLATION": | |
| cv2.rectangle(frame_with_boundaries, (0, 0), (self.frame_margin, h), (0, 0, 255), -1) | |
| elif boundary_status == "RIGHT_VIOLATION": | |
| cv2.rectangle(frame_with_boundaries, (w - self.frame_margin, 0), (w, h), (0, 0, 255), -1) | |
| elif boundary_status == "TOP_VIOLATION": | |
| cv2.rectangle(frame_with_boundaries, (0, 0), (w, self.frame_margin), (0, 0, 255), -1) | |
| else: | |
| position_ok_counter += 1 | |
| progress = min(100, int((position_ok_counter / required_stable_frames) * 100)) | |
| status_message = f"✅ Good position! Hold steady... {progress}%" | |
| status_color = (0, 255, 0) | |
| if position_ok_counter >= required_stable_frames: | |
| is_ready = True | |
| else: | |
| status_message = "❌ No face detected - Please position yourself in frame" | |
| status_color = (0, 0, 255) | |
| position_ok_counter = 0 | |
| overlay_height = 140 | |
| overlay = frame_with_boundaries.copy() | |
| cv2.rectangle(overlay, (0, h - overlay_height), (w, h), (0, 0, 0), -1) | |
| frame_with_boundaries = cv2.addWeighted(frame_with_boundaries, 0.7, overlay, 0.3, 0) | |
| cv2.putText(frame_with_boundaries, status_message, (10, h - 110), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.6, status_color, 2) | |
| cv2.putText(frame_with_boundaries, "Instructions:", (10, h - 80), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) | |
| cv2.putText(frame_with_boundaries, "• Keep your face within GREEN boundaries", (10, h - 60), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) | |
| cv2.putText(frame_with_boundaries, "• Ensure no one else is visible", (10, h - 40), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) | |
| cv2.putText(frame_with_boundaries, "• Remove all unauthorized items from view", (10, h - 20), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1) | |
| ui_callbacks['video_update'](cv2.resize(frame_with_boundaries, (640, 480))) | |
| elapsed = int(time.time() - start_time) | |
| ui_callbacks['timer_update'](f"⏱️ Setup time: {elapsed}s / {timeout}s") | |
| if is_ready: | |
| ui_callbacks['countdown_update']("🔍 Scanning environment... Please stay still") | |
| time.sleep(1) | |
| baseline_frames = [] | |
| for _ in range(10): | |
| ret, scan_frame = cap.read() | |
| if ret: | |
| baseline_frames.append(scan_frame) | |
| time.sleep(0.1) | |
| if baseline_frames: | |
| self.baseline_environment = self.scan_environment(baseline_frames[len(baseline_frames)//2]) | |
| success_frame = frame_with_boundaries.copy() | |
| cv2.rectangle(success_frame, (0, 0), (w, h), (0, 255, 0), 10) | |
| cv2.putText(success_frame, "SETUP COMPLETE!", | |
| (w//2 - 180, h//2 - 20), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0), 3) | |
| cv2.putText(success_frame, "Test will begin shortly...", | |
| (w//2 - 180, h//2 + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) | |
| ui_callbacks['video_update'](cv2.resize(success_frame, (640, 480))) | |
| time.sleep(3) | |
| cap.release() | |
| ui_callbacks['countdown_update']('') | |
| self.position_adjusted = True | |
| return True | |
| time.sleep(0.03) | |
| cap.release() | |
| ui_callbacks['countdown_update']('⚠️ Setup timeout - Please try again') | |
| return False | |
| def record_interview(self, question_data, duration, ui_callbacks): | |
| """ | |
| DEPRECATED: Use record_continuous_interview() instead | |
| Kept for backward compatibility | |
| """ | |
| result = self.record_continuous_interview([question_data], duration, ui_callbacks) | |
| if isinstance(result, dict) and 'questions_results' in result: | |
| if result['questions_results']: | |
| first_result = result['questions_results'][0] | |
| first_result['video_path'] = result.get('session_video_path', '') | |
| first_result['violation_detected'] = len(first_result.get('violations', [])) > 0 | |
| first_result['violation_reason'] = first_result['violations'][0]['reason'] if first_result.get('violations') else '' | |
| return first_result | |
| return {"error": "Recording failed"} | |
| def record_audio_to_file(self, duration, path): | |
| """Record audio to WAV file""" | |
| r = sr.Recognizer() | |
| try: | |
| with sr.Microphone() as source: | |
| r.adjust_for_ambient_noise(source, duration=0.6) | |
| audio = r.record(source, duration=duration) | |
| with open(path, "wb") as f: | |
| f.write(audio.get_wav_data()) | |
| return path | |
| except: | |
| return None | |
| def transcribe_audio(self, path): | |
| """Transcribe audio file to text""" | |
| r = sr.Recognizer() | |
| try: | |
| with sr.AudioFile(path) as source: | |
| audio = r.record(source) | |
| text = r.recognize_google(audio) | |
| return text if text.strip() else "[Could not understand audio]" | |
| except sr.UnknownValueError: | |
| return "[Could not understand audio]" | |
| except sr.RequestError: | |
| return "[Speech recognition service unavailable]" | |
| except: | |
| return "[Could not understand audio]" | |
| def record_continuous_interview(self, questions_list, duration_per_question, ui_callbacks): | |
| """ | |
| Record ALL questions continuously - continues even if violations occur | |
| Captures violation images and stores them for display in results | |
| """ | |
| # ========== PRE-TEST SETUP ========== | |
| ui_callbacks['status_update']("**🔧 Initializing test environment...**") | |
| setup_success = self.pre_test_setup_phase(ui_callbacks, timeout=90) | |
| if not setup_success: | |
| return {"error": "Setup phase failed or timeout"} | |
| # ========== INSTRUCTIONS ========== | |
| ui_callbacks['countdown_update']("✅ Setup complete! Please read the instructions...") | |
| ui_callbacks['status_update'](f""" | |
| **📋 TEST INSTRUCTIONS:** | |
| - You will answer **{len(questions_list)} questions** continuously | |
| - Each question has **{duration_per_question} seconds** to answer | |
| - **Important:** Even if a violation is detected, the interview will continue | |
| - All violations will be reviewed at the end | |
| - Stay within boundaries and maintain focus throughout | |
| **The test will begin in 10 seconds...** | |
| """) | |
| time.sleep(10) | |
| # ========== START RECORDING ========== | |
| all_results = [] | |
| for i in range(3, 0, -1): | |
| ui_callbacks['countdown_update'](f"🎬 Test starts in {i}...") | |
| time.sleep(1) | |
| ui_callbacks['countdown_update']('') | |
| cap = cv2.VideoCapture(0) | |
| if not cap.isOpened(): | |
| return {"error": "Unable to access camera"} | |
| session_video_temp = tempfile.NamedTemporaryFile(delete=False, suffix=".avi") | |
| session_video_path = session_video_temp.name | |
| session_video_temp.close() | |
| fourcc = cv2.VideoWriter_fourcc(*"XVID") | |
| out = cv2.VideoWriter(session_video_path, fourcc, 15.0, (640, 480)) | |
| session_start_time = time.time() | |
| session_violations = [] | |
| # ========== LOOP THROUGH ALL QUESTIONS ========== | |
| for q_idx, question_data in enumerate(questions_list): | |
| ui_callbacks['countdown_update'](f"📝 Question {q_idx + 1} of {len(questions_list)}") | |
| question_text = question_data.get('question', 'No question text') | |
| question_tip = question_data.get('tip', 'Speak clearly and confidently') | |
| ui_callbacks['question_update'](q_idx + 1, question_text, question_tip) | |
| ui_callbacks['status_update'](f""" | |
| **⏱️ Recording Question {q_idx + 1}** | |
| Time to answer: **{duration_per_question} seconds** | |
| """) | |
| for i in range(3, 0, -1): | |
| ui_callbacks['timer_update'](f"⏱️ Starting in {i}s...") | |
| time.sleep(1) | |
| audio_temp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") | |
| audio_path = audio_temp.name | |
| audio_temp.close() | |
| audio_thread = threading.Thread( | |
| target=lambda path=audio_path: self.record_audio_to_file(duration_per_question, path), | |
| daemon=True | |
| ) | |
| audio_thread.start() | |
| # Question recording state | |
| question_start_time = time.time() | |
| frames = [] | |
| question_violations = [] # Store violations for THIS question | |
| no_face_start = None | |
| look_away_start = None | |
| eye_contact_frames = 0 | |
| total_frames = 0 | |
| blink_count = 0 | |
| prev_blink = False | |
| face_box = None | |
| # ========== RECORDING LOOP FOR THIS QUESTION ========== | |
| while (time.time() - question_start_time) < duration_per_question: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| out.write(frame) | |
| frames.append(frame.copy()) | |
| rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| h, w, _ = frame.shape | |
| total_frames += 1 | |
| lighting_status, brightness = self.analyze_lighting(frame) | |
| num_faces = 0 | |
| looking_at_camera = False | |
| attention_status = "No Face" | |
| # ========== FACE DETECTION & VIOLATION CHECKS ========== | |
| if self.models['face_mesh'] is not None: | |
| face_results = self.models['face_mesh'].process(rgb_frame) | |
| if face_results.multi_face_landmarks: | |
| num_faces = len(face_results.multi_face_landmarks) | |
| # Check multiple bodies | |
| is_multi_body, multi_msg, body_count = self.detect_multiple_bodies(frame, num_faces) | |
| if is_multi_body: | |
| violation_img_path = self.save_violation_image(frame, q_idx + 1, multi_msg) | |
| question_violations.append({ | |
| 'reason': multi_msg, | |
| 'timestamp': time.time() - question_start_time, | |
| 'image_path': violation_img_path | |
| }) | |
| # Continue to next question instead of breaking | |
| break | |
| if num_faces > 1: | |
| violation_msg = f"Multiple persons detected ({num_faces} faces)" | |
| violation_img_path = self.save_violation_image(frame, q_idx + 1, violation_msg) | |
| question_violations.append({ | |
| 'reason': violation_msg, | |
| 'timestamp': time.time() - question_start_time, | |
| 'image_path': violation_img_path | |
| }) | |
| break | |
| elif num_faces == 1: | |
| no_face_start = None | |
| face_landmarks = face_results.multi_face_landmarks[0] | |
| try: | |
| landmarks_2d = np.array([(lm.x * w, lm.y * h) for lm in face_landmarks.landmark]) | |
| x_coords = landmarks_2d[:, 0] | |
| y_coords = landmarks_2d[:, 1] | |
| face_box = (int(np.min(x_coords)), int(np.min(y_coords)), | |
| int(np.max(x_coords) - np.min(x_coords)), | |
| int(np.max(y_coords) - np.min(y_coords))) | |
| # Check boundaries | |
| within_bounds, boundary_msg, boundary_status = self.check_frame_boundaries(frame, face_box) | |
| if not within_bounds: | |
| violation_img_path = self.save_violation_image(frame, q_idx + 1, boundary_msg) | |
| question_violations.append({ | |
| 'reason': boundary_msg, | |
| 'timestamp': time.time() - question_start_time, | |
| 'image_path': violation_img_path | |
| }) | |
| break | |
| # Check person outside frame | |
| outside_detected, obj_type, location = self.detect_person_outside_frame(frame) | |
| if outside_detected: | |
| violation_msg = f"{obj_type.upper()} detected outside frame ({location} side)" | |
| violation_img_path = self.save_violation_image(frame, q_idx + 1, violation_msg) | |
| question_violations.append({ | |
| 'reason': violation_msg, | |
| 'timestamp': time.time() - question_start_time, | |
| 'image_path': violation_img_path | |
| }) | |
| break | |
| # Check intrusions | |
| is_intrusion, intrusion_msg = self.detect_intrusion_at_edges(frame, face_box) | |
| if is_intrusion: | |
| violation_img_path = self.save_violation_image(frame, q_idx + 1, intrusion_msg) | |
| question_violations.append({ | |
| 'reason': intrusion_msg, | |
| 'timestamp': time.time() - question_start_time, | |
| 'image_path': violation_img_path | |
| }) | |
| break | |
| # Check hands outside | |
| is_hand_violation, hand_msg = self.detect_hands_outside_main_person(frame, face_box) | |
| if is_hand_violation: | |
| violation_img_path = self.save_violation_image(frame, q_idx + 1, hand_msg) | |
| question_violations.append({ | |
| 'reason': hand_msg, | |
| 'timestamp': time.time() - question_start_time, | |
| 'image_path': violation_img_path | |
| }) | |
| break | |
| # Suspicious movements | |
| is_suspicious, sus_msg = self.detect_suspicious_movements(frame) | |
| if is_suspicious: | |
| violation_img_path = self.save_violation_image(frame, q_idx + 1, sus_msg) | |
| question_violations.append({ | |
| 'reason': sus_msg, | |
| 'timestamp': time.time() - question_start_time, | |
| 'image_path': violation_img_path | |
| }) | |
| break | |
| yaw, pitch, roll = self.estimate_head_pose(face_landmarks, frame.shape) | |
| gaze_centered = self.calculate_eye_gaze(face_landmarks, frame.shape) | |
| is_blink = self.detect_blink(face_landmarks) | |
| if is_blink and not prev_blink: | |
| blink_count += 1 | |
| prev_blink = is_blink | |
| head_looking_forward = abs(yaw) <= 20 and abs(pitch) <= 20 | |
| if head_looking_forward and gaze_centered: | |
| look_away_start = None | |
| looking_at_camera = True | |
| eye_contact_frames += 1 | |
| attention_status = "Looking at Camera ✓" | |
| else: | |
| if look_away_start is None: | |
| look_away_start = time.time() | |
| attention_status = "Looking Away" | |
| else: | |
| elapsed = time.time() - look_away_start | |
| if elapsed > 2.0: | |
| violation_msg = "Looking away for >2 seconds" | |
| violation_img_path = self.save_violation_image(frame, q_idx + 1, violation_msg) | |
| question_violations.append({ | |
| 'reason': violation_msg, | |
| 'timestamp': time.time() - question_start_time, | |
| 'image_path': violation_img_path | |
| }) | |
| break | |
| else: | |
| attention_status = f"Looking Away ({elapsed:.1f}s)" | |
| except: | |
| attention_status = "Face Error" | |
| else: | |
| if no_face_start is None: | |
| no_face_start = time.time() | |
| attention_status = "No Face Visible" | |
| else: | |
| elapsed = time.time() - no_face_start | |
| if elapsed > 2.0: | |
| violation_msg = "No face visible for >2 seconds" | |
| violation_img_path = self.save_violation_image(frame, q_idx + 1, violation_msg) | |
| question_violations.append({ | |
| 'reason': violation_msg, | |
| 'timestamp': time.time() - question_start_time, | |
| 'image_path': violation_img_path | |
| }) | |
| break | |
| else: | |
| attention_status = f"No Face ({elapsed:.1f}s)" | |
| # Check for new objects | |
| if total_frames % 20 == 0: | |
| new_detected, new_items = self.detect_new_objects(frame) | |
| if new_detected: | |
| violation_msg = f"New item(s) brought into view: {', '.join(new_items)}" | |
| violation_img_path = self.save_violation_image(frame, q_idx + 1, violation_msg) | |
| question_violations.append({ | |
| 'reason': violation_msg, | |
| 'timestamp': time.time() - question_start_time, | |
| 'image_path': violation_img_path | |
| }) | |
| break | |
| # Display frame | |
| overlay = frame.copy() | |
| cv2.rectangle(overlay, (0, 0), (w, 120), (0, 0, 0), -1) | |
| frame_display = cv2.addWeighted(frame, 0.6, overlay, 0.4, 0) | |
| # Show violation warning if any occurred | |
| status_color = (0, 255, 0) if len(question_violations) == 0 else (0, 165, 255) | |
| violation_text = f" | ⚠️ {len(question_violations)} violation(s)" if question_violations else "" | |
| cv2.putText(frame_display, f"Q{q_idx+1}/{len(questions_list)} - {attention_status}{violation_text}", (10, 30), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.6, status_color, 2) | |
| cv2.putText(frame_display, f"Lighting: {lighting_status}", (10, 60), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) | |
| cv2.putText(frame_display, f"Eye Contact: {int((eye_contact_frames/max(total_frames,1))*100)}%", (10, 90), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) | |
| elapsed_q = time.time() - question_start_time | |
| remaining = max(0, int(duration_per_question - elapsed_q)) | |
| cv2.putText(frame_display, f"Time: {remaining}s", (10, 115), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) | |
| ui_callbacks['video_update'](cv2.resize(frame_display, (480, 360))) | |
| eye_contact_pct = (eye_contact_frames / max(total_frames, 1)) * 100 | |
| status_text = f""" | |
| **Question {q_idx + 1} of {len(questions_list)}** | |
| 👁️ **Eye Contact:** {eye_contact_pct:.1f}% | |
| 😴 **Blinks:** {blink_count} | |
| 💡 **Lighting:** {lighting_status} | |
| ⚠️ **Status:** {attention_status} | |
| """ | |
| if question_violations: | |
| status_text += f"\n\n⚠️ **Violations in this question:** {len(question_violations)}" | |
| ui_callbacks['status_update'](status_text) | |
| overall_progress = (q_idx + (elapsed_q / duration_per_question)) / len(questions_list) | |
| overall_progress = max(0.0, min(1.0, overall_progress)) | |
| ui_callbacks['progress_update'](overall_progress) | |
| ui_callbacks['timer_update'](f"🎥 Q{q_idx+1}/{len(questions_list)} - {remaining}s remaining") | |
| time.sleep(0.05) | |
| # Wait for audio | |
| audio_thread.join(timeout=duration_per_question + 5) | |
| # Transcribe | |
| transcript = "" | |
| if os.path.exists(audio_path): | |
| transcript = self.transcribe_audio(audio_path) | |
| # Add violations to session list | |
| if question_violations: | |
| session_violations.extend([f"Q{q_idx+1}: {v['reason']}" for v in question_violations]) | |
| # Store results for this question | |
| question_result = { | |
| 'question_number': q_idx + 1, | |
| 'question_text': question_data.get('question', ''), | |
| 'audio_path': audio_path, | |
| 'frames': frames, | |
| 'violations': question_violations, # Now includes image paths | |
| 'violation_detected': len(question_violations) > 0, | |
| 'eye_contact_pct': (eye_contact_frames / max(total_frames, 1)) * 100, | |
| 'blink_count': blink_count, | |
| 'face_box': face_box, | |
| 'transcript': transcript, | |
| 'lighting_status': lighting_status | |
| } | |
| all_results.append(question_result) | |
| # Show message and continue to next question | |
| if question_violations: | |
| ui_callbacks['countdown_update'](f"⚠️ Violation detected in Q{q_idx + 1}! Continuing to next question in 3s...") | |
| time.sleep(3) | |
| elif q_idx < len(questions_list) - 1: | |
| ui_callbacks['countdown_update'](f"✅ Question {q_idx + 1} complete! Next question in 3s...") | |
| time.sleep(3) | |
| # Cleanup | |
| cap.release() | |
| out.release() | |
| # Clear UI | |
| ui_callbacks['video_update'](None) | |
| ui_callbacks['progress_update'](1.0) | |
| # Final message | |
| total_violations = sum(len(r.get('violations', [])) for r in all_results) | |
| if total_violations > 0: | |
| ui_callbacks['countdown_update'](f"⚠️ TEST COMPLETED WITH {total_violations} VIOLATION(S)") | |
| ui_callbacks['status_update'](f"**⚠️ {total_violations} violation(s) detected across all questions. Review results below.**") | |
| else: | |
| ui_callbacks['countdown_update']("✅ TEST COMPLETED SUCCESSFULLY!") | |
| ui_callbacks['status_update']("**All questions answered with no violations. Processing results...**") | |
| ui_callbacks['timer_update']("") | |
| # Return comprehensive results | |
| return { | |
| 'questions_results': all_results, | |
| 'session_video_path': session_video_path, | |
| 'total_questions': len(questions_list), | |
| 'completed_questions': len(all_results), | |
| 'session_violations': session_violations, | |
| 'total_violations': total_violations, | |
| 'violation_images_dir': self.violation_images_dir, | |
| 'session_duration': time.time() - session_start_time | |
| } | |
| #### | |