File size: 7,139 Bytes
cfbaa51 cca6a52 cfbaa51 1ca9b28 cca6a52 4cdc77a cca6a52 1ca9b28 cca6a52 1ca9b28 cca6a52 1ca9b28 4cdc77a cca6a52 1ca9b28 89a012c 1ca9b28 4cdc77a 1ca9b28 cca6a52 1ca9b28 4cdc77a 1ca9b28 4cdc77a 1ca9b28 4cdc77a cca6a52 1ca9b28 cca6a52 1ca9b28 4cdc77a 1ca9b28 4cdc77a 1ca9b28 89a012c 1ca9b28 4cdc77a 89a012c 1ca9b28 cca6a52 1ca9b28 89a012c 1ca9b28 4cdc77a 1ca9b28 4cdc77a 1ca9b28 4cdc77a 1ca9b28 4cdc77a 89a012c 1ca9b28 cca6a52 1ca9b28 4cdc77a 1ca9b28 4cdc77a 1ca9b28 89a012c cca6a52 4cdc77a 1ca9b28 4cdc77a 1ca9b28 89a012c cca6a52 1ca9b28 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
from ultralytics.utils.plotting import Annotator
from CPR_Module.Common.logging_config import cpr_logger
class RoleClassifier:
    """Classify roles of rescuer and patient based on detected keypoints and bounding boxes.

    A detection whose bounding box is taller than wide is treated as
    "vertical" (rescuer candidate); otherwise it is treated as "horizontal"
    (patient candidate). The rescuer is the vertical detection whose box
    center is closest to the chosen patient.
    """

    # A detection is discarded when its area exceeds the combined area of the
    # previous rescuer and patient boxes by more than this factor (20% tolerance).
    AREA_TOLERANCE = 1.2

    def __init__(self, proximity_thresh=0.3):
        """Store the configuration and reset the cached role results.

        Args:
            proximity_thresh: Stored on the instance; not read by any method
                of this class.
        """
        self.proximity_thresh = proximity_thresh
        self.rescuer_id = None
        # Read by draw_rescuer_and_patient(); expected to be set by the caller.
        self.rescuer_processed_results = None
        self.patient_processed_results = None

    def _calculate_verticality_score(self, bounding_box):
        """Calculate posture verticality score (0=horizontal, 1=vertical) using bounding box aspect ratio.

        Args:
            bounding_box: Sequence ``(x1, y1, x2, y2)``.

        Returns:
            1 if the box is strictly taller than wide, 0 otherwise (a square
            box counts as horizontal), or -1 when the box is degenerate or
            cannot be unpacked.
        """
        try:
            x1, y1, x2, y2 = bounding_box
            width = abs(x2 - x1)
            height = abs(y2 - y1)
        except (TypeError, ValueError) as e:
            cpr_logger.error(f"Verticality score calculation error: {e}")
            return -1

        # Handle edge cases with invalid dimensions
        if width == 0 or height == 0:
            return -1
        return 1 if height > width else 0

    def _calculate_bounding_box_center(self, bounding_box):
        """Calculate the center coordinates of an ``(x1, y1, x2, y2)`` bounding box."""
        x1, y1, x2, y2 = bounding_box
        return (x1 + x2) / 2, (y1 + y2) / 2

    def _calculate_distance(self, point1, point2):
        """Calculate Euclidean distance between two ``(x, y)`` points."""
        dx = point1[0] - point2[0]
        dy = point1[1] - point2[1]
        return (dx ** 2 + dy ** 2) ** 0.5

    def _calculate_bbox_areas(self, rescuer_bbox, patient_bbox):
        """Calculate areas of rescuer and patient bounding boxes.

        Returns:
            Tuple ``(rescuer_area, patient_area)``; a ``None`` box has area 0.
        """
        def compute_area(bbox):
            if bbox is None:
                return 0
            width = bbox[2] - bbox[0]  # x2 - x1
            height = bbox[3] - bbox[1]  # y2 - y1
            # Absolute value so swapped corners still give a positive area.
            return abs(width * height)

        return compute_area(rescuer_bbox), compute_area(patient_bbox)

    def _previous_area_threshold(self, prev_rescuer, prev_patient):
        """Return the combined area of the previous role boxes, or None if either is missing."""
        if not (prev_rescuer and prev_patient):
            return None
        rescuer_area, patient_area = self._calculate_bbox_areas(
            prev_rescuer["bounding_box"],
            prev_patient["bounding_box"],
        )
        return rescuer_area + patient_area

    def _process_detections(self, boxes_xywh, keypoints_xy, threshold):
        """Convert raw detections into result dicts, dropping oversized or malformed ones."""
        processed_results = []
        for i, (box, keypoints) in enumerate(zip(boxes_xywh, keypoints_xy)):
            # Best effort: one malformed detection must not discard the others.
            try:
                x_center, y_center, width, height = box

                # Skip if box exceeds area threshold (when threshold exists and is non-zero)
                if threshold:
                    box_area = width * height
                    if box_area > threshold * self.AREA_TOLERANCE:
                        cpr_logger.info(f"Filtered oversized box {i} (area: {box_area:.1f} > threshold: {threshold:.1f})")
                        continue

                # Convert box to [x1, y1, x2, y2] format
                half_width = width / 2
                half_height = height / 2
                bounding_box = [
                    x_center - half_width,   # x1
                    y_center - half_height,  # y1
                    x_center + half_width,   # x2
                    y_center + half_height,  # y2
                ]

                processed_results.append({
                    'original_index': i,
                    'bounding_box': bounding_box,
                    # The xywh box already carries its center; no need to
                    # recompute it from the derived corners.
                    'bounding_box_center': (x_center, y_center),
                    'verticality_score': self._calculate_verticality_score(bounding_box),
                    'keypoints': keypoints,
                })
            except Exception as e:
                cpr_logger.error(f"Error processing detection {i}: {e}")
                continue
        return processed_results

    def _select_patient(self, processed_results):
        """Pick the patient among horizontal detections, or None if there is none."""
        candidates = [res for res in processed_results if res['verticality_score'] == 0]
        if not candidates:
            return None
        # With several horizontal people the smallest center y wins (first one
        # on ties).
        # NOTE(review): in image coordinates the smallest y is usually the
        # topmost box; confirm this is the intended meaning of "lowest center".
        return min(candidates, key=lambda res: res['bounding_box_center'][1])

    def _select_rescuer(self, processed_results, patient):
        """Pick the vertical detection closest to the patient, or None."""
        if not patient:
            return None
        # The patient has score 0, so filtering on score 1 already excludes it.
        potential_rescuers = [res for res in processed_results if res['verticality_score'] == 1]
        if not potential_rescuers:
            return None
        patient_center = patient['bounding_box_center']
        return min(
            potential_rescuers,
            key=lambda res: self._calculate_distance(res['bounding_box_center'], patient_center),
        )

    def classify_roles(self, results, prev_rescuer_processed_results=None, prev_patient_processed_results=None):
        """Classify rescuer and patient roles based on detection results.

        Args:
            results: Detection result exposing ``boxes.xywh`` and
                ``keypoints.xy``, each supporting ``.cpu().numpy()``.
            prev_rescuer_processed_results: Previous rescuer result dict with a
                ``"bounding_box"`` entry, or None.
            prev_patient_processed_results: Previous patient result dict with a
                ``"bounding_box"`` entry, or None.

        Returns:
            Tuple ``(rescuer, patient)``. Each element is a dict with the keys
            ``original_index``, ``bounding_box``, ``bounding_box_center``,
            ``verticality_score`` and ``keypoints``, or None when that role
            could not be identified. A rescuer is only reported when a patient
            was found.
        """
        boxes = getattr(results, "boxes", None)
        keypoints = getattr(results, "keypoints", None)
        # Nothing to classify when the detector produced no boxes or keypoints.
        if boxes is None or keypoints is None:
            return None, None

        # Combined previous area is used to reject implausibly large boxes.
        threshold = self._previous_area_threshold(
            prev_rescuer_processed_results,
            prev_patient_processed_results,
        )
        processed_results = self._process_detections(
            boxes.xywh.cpu().numpy(),
            keypoints.xy.cpu().numpy(),
            threshold,
        )

        patient = self._select_patient(processed_results)
        rescuer = self._select_rescuer(processed_results, patient)
        return rescuer, patient

    def draw_rescuer_and_patient(self, frame):
        """Draw the stored rescuer and patient boxes and keypoints on ``frame``.

        Reads ``self.rescuer_processed_results`` and
        ``self.patient_processed_results``; a role whose stored result is empty
        or None is skipped. Drawing errors are logged and do not propagate.

        Returns:
            The value of ``Annotator.result()`` for the annotated frame.
        """
        annotator = Annotator(frame)

        roles = (
            ("rescuer", "Rescuer", (0, 255, 0), self.rescuer_processed_results),
            ("patient", "Patient", (0, 0, 255), self.patient_processed_results),
        )
        for role, label, color, role_results in roles:
            if not role_results:
                continue
            try:
                x1, y1, x2, y2 = map(int, role_results["bounding_box"])
                annotator.box_label((x1, y1, x2, y2), label, color=color)
                if "keypoints" in role_results:
                    annotator.kpts(role_results["keypoints"], shape=frame.shape[:2])
            except Exception as e:
                cpr_logger.error(f"Error drawing {role}: {str(e)}")

        return annotator.result()
|