import math

from ultralytics.utils.plotting import Annotator

from CPR_Module.Common.logging_config import cpr_logger


class RoleClassifier:
    """Classify roles of rescuer and patient based on detected keypoints and bounding boxes.

    A detection whose bounding box is taller than it is wide is treated as
    "vertical" (rescuer candidate); any other non-degenerate box is treated
    as "horizontal" (patient candidate).

    Attributes:
        proximity_thresh: Value passed to the constructor. It is stored on
            the instance but not read by any method of this class.
        rescuer_id: Initialised to ``None``; not read by any method of this
            class.
        rescuer_processed_results: Detection dict drawn as "Rescuer" by
            :meth:`draw_rescuer_and_patient`. ``classify_roles`` does not
            assign it, so presumably the caller stores the result here.
        patient_processed_results: Same as above, drawn as "Patient".
    """

    # A detection is discarded when its area exceeds the combined area of the
    # previous rescuer and patient boxes multiplied by this factor.
    OVERSIZE_FACTOR = 1.2

    def __init__(self, proximity_thresh=0.3):
        """Store the configuration and reset the cached role results."""
        self.proximity_thresh = proximity_thresh
        self.rescuer_id = None
        self.rescuer_processed_results = None
        self.patient_processed_results = None

    def _calculate_verticality_score(self, bounding_box):
        """Calculate posture verticality score using the bounding box aspect ratio.

        Args:
            bounding_box: Four values ``(x1, y1, x2, y2)``.

        Returns:
            ``1`` when the box is taller than wide (vertical), ``0`` when it
            is at least as wide as tall (horizontal; a square box counts as
            horizontal), and ``-1`` when the box has zero width or height or
            cannot be unpacked into four numbers.
        """
        try:
            x1, y1, x2, y2 = bounding_box
            width = abs(x2 - x1)
            height = abs(y2 - y1)
        except (TypeError, ValueError) as e:
            cpr_logger.error(f"Verticality score calculation error: {e}")
            return -1

        # Degenerate boxes carry no orientation information.
        if width == 0 or height == 0:
            return -1

        return 1 if height > width else 0

    def _calculate_bounding_box_center(self, bounding_box):
        """Return the ``(x, y)`` center of an ``(x1, y1, x2, y2)`` bounding box."""
        x1, y1, x2, y2 = bounding_box
        return (x1 + x2) / 2, (y1 + y2) / 2

    def _calculate_distance(self, point1, point2):
        """Return the Euclidean distance between two points.

        Only the first two coordinates of each point are used.
        """
        return math.hypot(point1[0] - point2[0], point1[1] - point2[1])

    def _calculate_bbox_areas(self, rescuer_bbox, patient_bbox):
        """Calculate areas of rescuer and patient bounding boxes.

        Args:
            rescuer_bbox: ``(x1, y1, x2, y2)`` or ``None``.
            patient_bbox: ``(x1, y1, x2, y2)`` or ``None``.

        Returns:
            Tuple ``(rescuer_area, patient_area)``; a ``None`` box has area 0.
        """
        def compute_area(bbox):
            if bbox is None:
                return 0
            width = bbox[2] - bbox[0]
            height = bbox[3] - bbox[1]
            # abs() keeps the area non-negative even if corners are swapped.
            return abs(width * height)

        return compute_area(rescuer_bbox), compute_area(patient_bbox)

    def classify_roles(self, results, prev_rescuer_processed_results=None, prev_patient_processed_results=None):
        """Classify rescuer and patient roles based on detection results.

        Args:
            results: Detection result exposing ``boxes.xywh`` and
                ``keypoints.xy``, each convertible via ``.cpu().numpy()``
                into an iterable with one entry per detection.
            prev_rescuer_processed_results: Optional dict with a
                ``"bounding_box"`` entry for the previous rescuer.
            prev_patient_processed_results: Optional dict with a
                ``"bounding_box"`` entry for the previous patient.

        Returns:
            Tuple ``(rescuer, patient)``. Each element is either ``None`` or
            a dict with the keys ``original_index``, ``bounding_box``
            (``[x1, y1, x2, y2]``), ``bounding_box_center``,
            ``verticality_score`` and ``keypoints``. A rescuer is only
            selected when a patient was found.
        """
        boxes = getattr(results, "boxes", None)
        keypoints = getattr(results, "keypoints", None)
        # Without boxes or keypoints there is nothing to classify.
        if boxes is None or keypoints is None:
            return None, None

        # When both previous roles are known, their combined area is used to
        # reject detections that are implausibly large.
        threshold = None
        if prev_rescuer_processed_results and prev_patient_processed_results:
            rescuer_area, patient_area = self._calculate_bbox_areas(
                prev_rescuer_processed_results["bounding_box"],
                prev_patient_processed_results["bounding_box"],
            )
            threshold = rescuer_area + patient_area

        processed_results = []
        detections = zip(boxes.xywh.cpu().numpy(), keypoints.xy.cpu().numpy())
        for i, (box, person_keypoints) in enumerate(detections):
            try:
                # Convert center/size format to corner format.
                x_center, y_center, width, height = box
                bounding_box = [
                    x_center - width / 2,
                    y_center - height / 2,
                    x_center + width / 2,
                    y_center + height / 2,
                ]

                if threshold:
                    box_area = width * height
                    if box_area > threshold * self.OVERSIZE_FACTOR:
                        cpr_logger.info(f"Filtered oversized box {i} (area: {box_area:.1f} > threshold: {threshold:.1f})")
                        continue

                processed_results.append({
                    'original_index': i,
                    'bounding_box': bounding_box,
                    'bounding_box_center': self._calculate_bounding_box_center(bounding_box),
                    'verticality_score': self._calculate_verticality_score(bounding_box),
                    'keypoints': person_keypoints,
                })
            except Exception as e:
                # Best effort: one malformed detection must not abort the frame.
                cpr_logger.error(f"Error processing detection {i}: {e}")
                continue

        # Patient: a horizontal detection. With several candidates the one
        # with the smallest center y-coordinate wins (first one on ties).
        # NOTE(review): smallest y is the topmost box in image coordinates;
        # confirm this is the intended tie-break.
        patient_candidates = [
            res for res in processed_results if res['verticality_score'] == 0
        ]
        patient = None
        if patient_candidates:
            patient = min(patient_candidates, key=lambda res: res['bounding_box_center'][1])

        # Rescuer: the vertical detection closest to the patient's center.
        rescuer = None
        if patient:
            potential_rescuers = [
                res for res in processed_results
                if res['verticality_score'] == 1
                and res['original_index'] != patient['original_index']
            ]
            if potential_rescuers:
                rescuer = min(
                    potential_rescuers,
                    key=lambda res: self._calculate_distance(
                        res['bounding_box_center'],
                        patient['bounding_box_center'],
                    ),
                )

        return rescuer, patient

    @staticmethod
    def _draw_person(annotator, person, label, color, frame_shape):
        """Draw one labelled bounding box and, if present, its keypoints.

        Drawing errors are logged and swallowed so a failure on one person
        does not prevent the other from being drawn.
        """
        try:
            x1, y1, x2, y2 = map(int, person["bounding_box"])
            annotator.box_label((x1, y1, x2, y2), label, color=color)

            if "keypoints" in person:
                annotator.kpts(person["keypoints"], shape=frame_shape)
        except Exception as e:
            cpr_logger.error(f"Error drawing {label.lower()}: {e}")

    def draw_rescuer_and_patient(self, frame):
        """Annotate ``frame`` with the stored rescuer and patient detections.

        Reads ``self.rescuer_processed_results`` and
        ``self.patient_processed_results``; a role that is unset or empty is
        skipped.

        Returns:
            The value of ``Annotator(frame).result()`` after drawing.
        """
        annotator = Annotator(frame)
        roles = (
            (self.rescuer_processed_results, "Rescuer", (0, 255, 0)),
            (self.patient_processed_results, "Patient", (0, 0, 255)),
        )
        for person, label, color in roles:
            if person:
                self._draw_person(annotator, person, label, color, frame.shape[:2])

        return annotator.result()
|