Spaces:
Running
Running
| """ | |
| This file is the main runtime orchestrator for one person inside one frame. | |
| It connects pose feature generation, posture classification, hand cropping, phone detection, | |
| and final fusion into one clean modular flow. This is the clean replacement for the large | |
| processOnePerson logic from your current project while keeping the same decision behavior. | |
| """ | |
| import time | |
| from pathlib import Path | |
| import cv2 | |
| import numpy as np | |
| from src.components.distraction_fusion import DistractionFusion | |
| from src.components.hand_cropper import HandCropper | |
| from src.components.phone_detector import PhoneDetector | |
| from src.components.pose_feature_generator import PoseFeatureGenerator | |
| from src.components.posture_detector import PostureDetector | |
| from src.config.constants import ( | |
| STATE_BACKSIDE, | |
| STATE_NOT_USING, | |
| STATE_OUT_OF_FRAME, | |
| STATE_SUSPICIOUS, | |
| STATE_TO_BE_CLASSIFIED, | |
| ) | |
| from src.entity.config_entity import ( | |
| InferenceConfig, | |
| MMPoseConfig, | |
| PhoneDetectorConfig, | |
| PostureModelConfig, | |
| ) | |
| from src.utils.logger import get_logger | |
| from src.utils.opencv_utils import ( | |
| crop_frame, | |
| relative_to_absolute, | |
| render_detection_rectangle, | |
| resize_frame_to_square, | |
| ) | |
| class RuntimeDetector: | |
| """ | |
| End-to-end runtime detector for one person. | |
| """ | |
| def __init__( | |
| self, | |
| mmpose_config: MMPoseConfig, | |
| posture_model_config: PostureModelConfig, | |
| phone_detector_config: PhoneDetectorConfig, | |
| inference_config: InferenceConfig, | |
| log_dir: Path | None = None, | |
| log_level: str = "INFO", | |
| ) -> None: | |
| self.mmpose_config = mmpose_config | |
| self.posture_model_config = posture_model_config | |
| self.phone_detector_config = phone_detector_config | |
| self.inference_config = inference_config | |
| self.logger = get_logger( | |
| self.__class__.__name__, log_dir=log_dir, level=log_level | |
| ) | |
| self.pose_feature_generator = PoseFeatureGenerator( | |
| mmpose_config=mmpose_config, | |
| posture_model_config=posture_model_config, | |
| log_dir=log_dir, | |
| log_level=log_level, | |
| ) | |
| self.posture_detector = PostureDetector( | |
| config=posture_model_config, | |
| log_dir=log_dir, | |
| log_level=log_level, | |
| ) | |
| self.hand_cropper = HandCropper( | |
| mmpose_config=mmpose_config, | |
| phone_detector_config=phone_detector_config, | |
| log_dir=log_dir, | |
| log_level=log_level, | |
| ) | |
| self.phone_detector = PhoneDetector( | |
| config=phone_detector_config, | |
| log_dir=log_dir, | |
| log_level=log_level, | |
| ) | |
| self.fusion = DistractionFusion(log_dir=log_dir, log_level=log_level) | |
| self.posture_detector.load() | |
| def _initial_state_checks( | |
| self, keypoints: np.ndarray, xyxy: np.ndarray | |
| ) -> tuple[int, str]: | |
| """ | |
| Reproduce your current lightweight state machine checks | |
| before posture classification. | |
| """ | |
| state = STATE_TO_BE_CLASSIFIED | |
| score_text = "" | |
| keypoint_score_threshold = ( | |
| self.inference_config.posture.keypoint_score_threshold | |
| ) | |
| missing_threshold = ( | |
| self.inference_config.posture.out_of_frame_missing_keypoints_threshold | |
| ) | |
| visible_keypoints = keypoints[:13, 2] | |
| if np.sum(visible_keypoints < keypoint_score_threshold) >= missing_threshold: | |
| return STATE_OUT_OF_FRAME, score_text | |
| left_shoulder_x = keypoints[self.mmpose_config.keypoints.left_shoulder_index][0] | |
| right_shoulder_x = keypoints[self.mmpose_config.keypoints.right_shoulder_index][ | |
| 0 | |
| ] | |
| left_shoulder_score = keypoints[ | |
| self.mmpose_config.keypoints.left_shoulder_index | |
| ][2] | |
| right_shoulder_score = keypoints[ | |
| self.mmpose_config.keypoints.right_shoulder_index | |
| ][2] | |
| backside_ratio = (left_shoulder_x - right_shoulder_x) / ( | |
| (xyxy[2] - xyxy[0]) + np.finfo(np.float32).eps | |
| ) | |
| if ( | |
| right_shoulder_score > keypoint_score_threshold | |
| and left_shoulder_score > keypoint_score_threshold | |
| and backside_ratio < self.inference_config.posture.backside_ratio_threshold | |
| ): | |
| numeric_value = ( | |
| (right_shoulder_score + left_shoulder_score) / 2.0 + 1.0 | |
| ) / 2.0 | |
| score_text = f"{numeric_value:.2f}" | |
| return STATE_BACKSIDE, score_text | |
| return state, score_text | |
| def _run_posture_stage(self, keypoints: np.ndarray) -> tuple[int, str]: | |
| """ | |
| Convert keypoints to pose features and run posture classifier. | |
| """ | |
| feature_tensor = self.pose_feature_generator.build_feature_tensor( | |
| keypoints, normalize=True | |
| ) | |
| posture_result = self.posture_detector.predict(feature_tensor) | |
| state = ( | |
| STATE_SUSPICIOUS if posture_result["class_signal"] == 1 else STATE_NOT_USING | |
| ) | |
| score_text = posture_result["score_text"] | |
| return state, score_text | |
| def _run_phone_stage( | |
| self, | |
| frame: np.ndarray, | |
| original_frame: np.ndarray, | |
| keypoints: np.ndarray, | |
| xyxy: np.ndarray, | |
| ) -> tuple[bool, np.ndarray | None]: | |
| """ | |
| Crop hands, run phone detection, and return phone status. | |
| """ | |
| use_trained_model = ( | |
| self.phone_detector_config.inference.use_trained_model_by_default | |
| ) | |
| primary_crop, secondary_crop, spare_ratio = ( | |
| self.hand_cropper.get_priority_hand_crops( | |
| frame=original_frame, | |
| keypoints=keypoints, | |
| xyxy=xyxy, | |
| ) | |
| ) | |
| for crop_index, crop_item in enumerate([primary_crop, secondary_crop]): | |
| if crop_item is None: | |
| continue | |
| hand_frame, hand_xyxy = crop_item | |
| if hand_frame is None or hand_xyxy is None or hand_frame.size == 0: | |
| continue | |
| subframe_wh = ( | |
| abs(hand_xyxy[2] - hand_xyxy[0]), | |
| abs(hand_xyxy[3] - hand_xyxy[1]), | |
| ) | |
| resized_hand = resize_frame_to_square( | |
| frame=hand_frame, | |
| edge_length=( | |
| self.phone_detector_config.inference.image_size | |
| if use_trained_model | |
| else 640 | |
| ), | |
| ratio_threshold=0.5625, | |
| ) | |
| rgb_hand = cv2.cvtColor(resized_hand, cv2.COLOR_BGR2RGB) | |
| phone_result = self.phone_detector.predict( | |
| rgb_hand, use_trained=use_trained_model | |
| ) | |
| render_detection_rectangle( | |
| frame=frame, | |
| text=f"Hand {crop_index}", | |
| xyxy=hand_xyxy, | |
| color="green" if not phone_result["detected"] else "pink", | |
| ) | |
| if phone_result["detected"]: | |
| relative_xyxy = np.array( | |
| phone_result["relative_xyxy"], dtype=np.float32 | |
| ) | |
| from_mother_wh = ( | |
| ( | |
| self.phone_detector_config.inference.image_size | |
| if use_trained_model | |
| else 640 | |
| ), | |
| ( | |
| self.phone_detector_config.inference.image_size | |
| if use_trained_model | |
| else 640 | |
| ), | |
| ) | |
| absolute_xyxy = relative_to_absolute( | |
| from_mother_wh=from_mother_wh, | |
| to_mother_wh=subframe_wh, | |
| from_child_xyxy=relative_xyxy, | |
| to_mother_xy=(hand_xyxy[0], hand_xyxy[1]), | |
| ) | |
| render_detection_rectangle( | |
| frame=frame, | |
| text=phone_result["text"], | |
| xyxy=absolute_xyxy, | |
| color="pink", | |
| ) | |
| return True, None | |
| if spare_ratio < self.inference_config.phone.spare_ratio_threshold: | |
| break | |
| return False, None | |
| def process_one_person( | |
| self, | |
| frame: np.ndarray, | |
| original_frame: np.ndarray, | |
| keypoints: np.ndarray, | |
| xyxy: np.ndarray, | |
| runtime_parameters: dict, | |
| ) -> dict: | |
| """ | |
| Main entrypoint for one person in one frame. | |
| Returns a structure similar to your current runtime response. | |
| """ | |
| start_posture = time.time() | |
| initial_state, initial_score_text = self._initial_state_checks(keypoints, xyxy) | |
| if initial_state in {STATE_OUT_OF_FRAME, STATE_BACKSIDE}: | |
| posture_state = initial_state | |
| posture_score_text = initial_score_text | |
| else: | |
| posture_state, posture_score_text = self._run_posture_stage(keypoints) | |
| posture_time = time.time() - start_posture | |
| phone_detected = False | |
| announced_face_frame = None | |
| face_xyxy = None | |
| phone_time = 0.0 | |
| if posture_state == STATE_SUSPICIOUS: | |
| start_phone = time.time() | |
| phone_detected, announced_face_frame = self._run_phone_stage( | |
| frame=frame, | |
| original_frame=original_frame, | |
| keypoints=keypoints, | |
| xyxy=xyxy, | |
| ) | |
| phone_time = time.time() - start_phone | |
| fusion_result = self.fusion.fuse( | |
| base_state=posture_state, | |
| posture_score_text=posture_score_text, | |
| phone_detected=phone_detected, | |
| ) | |
| render_detection_rectangle( | |
| frame=frame, | |
| text=f"{fusion_result['display_text']} {fusion_result['score_text']}".strip(), | |
| xyxy=xyxy, | |
| color=fusion_result["display_color"], | |
| ) | |
| # Optional face crop announce, similar to your old runtime logic | |
| if fusion_result["final_label"] == "distracted": | |
| face_center = keypoints[self.mmpose_config.keypoints.face_center_index][:2] | |
| bbox_width = abs(xyxy[2] - xyxy[0]) | |
| left_ear_x = keypoints[self.mmpose_config.keypoints.left_ear_index][0] | |
| right_ear_x = keypoints[self.mmpose_config.keypoints.right_ear_index][0] | |
| face_len = max( | |
| abs(int((right_ear_x - left_ear_x) * 1.1)), int(0.3 * bbox_width) | |
| ) | |
| face_frame, face_xyxy = crop_frame( | |
| original_frame, face_center, (face_len, face_len) | |
| ) | |
| if face_frame is not None and face_xyxy is not None: | |
| last_announce_time = runtime_parameters.get( | |
| "time_last_announce_face", 0.0 | |
| ) | |
| current_frame_time = runtime_parameters.get( | |
| "time_last_record_framerate", time.time() | |
| ) | |
| if ( | |
| current_frame_time - last_announce_time | |
| > self.inference_config.phone.face_announce_interval_seconds | |
| ): | |
| announced_face_frame = face_frame | |
| render_detection_rectangle( | |
| frame=frame, | |
| text="Face", | |
| xyxy=face_xyxy, | |
| color="white", | |
| ) | |
| result = { | |
| "performance": (posture_time, phone_time), | |
| "announced_face_frame": announced_face_frame, | |
| "posture": fusion_result["final_label"], | |
| "phone": phone_detected, | |
| "state": fusion_result["state"], | |
| "display_text": fusion_result["display_text"], | |
| "score_text": fusion_result["score_text"], | |
| # "face_xyxy": face_xyxy.tolist() if face_xyxy is not None else None, | |
| "face_xyxy": ( | |
| face_xyxy.tolist() | |
| if hasattr(face_xyxy, "tolist") | |
| else face_xyxy if face_xyxy is not None else None | |
| ), | |
| } | |
| self.logger.info("Runtime one-person result: %s", result) | |
| return result | |