FatemaKotb's picture
CPR Partial Code Cleaning
4cdc77a
from ultralytics.utils.plotting import Annotator
from CPR_Module.Common.logging_config import cpr_logger
class RoleClassifier:
"""Classify roles of rescuer and patient based on detected keypoints and bounding boxes."""
def __init__(self, proximity_thresh=0.3):
self.proximity_thresh = proximity_thresh
self.rescuer_id = None
self.rescuer_processed_results = None
self.patient_processed_results = None
def _calculate_verticality_score(self, bounding_box):
"""Calculate posture verticality score (0=horizontal, 1=vertical) using bounding box aspect ratio."""
try:
x1, y1, x2, y2 = bounding_box
width = abs(x2 - x1)
height = abs(y2 - y1)
# Handle edge cases with invalid dimensions
if width == 0 or height == 0:
return -1
return 1 if height > width else 0
except (TypeError, ValueError) as e:
cpr_logger.error(f"Verticality score calculation error: {e}")
return -1
def _calculate_bounding_box_center(self, bounding_box):
"""Calculate the center coordinates of a bounding box."""
x1, y1, x2, y2 = bounding_box
return (x1 + x2) / 2, (y1 + y2) / 2
def _calculate_distance(self, point1, point2):
"""Calculate Euclidean distance between two points"""
return ((point1[0]-point2[0])**2 + (point1[1]-point2[1])**2)**0.5
def _calculate_bbox_areas(self, rescuer_bbox, patient_bbox):
""" Calculate areas of rescuer and patient bounding boxes."""
def compute_area(bbox):
if bbox is None:
return 0
width = bbox[2] - bbox[0] # x2 - x1
height = bbox[3] - bbox[1] # y2 - y1
return abs(width * height) # Absolute value to handle negative coordinates
return compute_area(rescuer_bbox), compute_area(patient_bbox)
def classify_roles(self, results, prev_rescuer_processed_results=None, prev_patient_processed_results=None):
"""Classify rescuer and patient roles based on detection results."""
processed_results = []
# Calculate combined area threshold if previous boxes exist
threshold = None
if prev_rescuer_processed_results and prev_patient_processed_results:
prev_rescuer_bbox = prev_rescuer_processed_results["bounding_box"]
prev_patient_bbox = prev_patient_processed_results["bounding_box"]
rescuer_area, patient_area = self._calculate_bbox_areas(prev_rescuer_bbox, prev_patient_bbox)
threshold = rescuer_area + patient_area
for i, (box, keypoints) in enumerate(zip(results.boxes.xywh.cpu().numpy(), results.keypoints.xy.cpu().numpy())):
try:
# Convert box to [x1,y1,x2,y2] format
x_center, y_center, width, height = box
bounding_box = [
x_center - width/2, # x1
y_center - height/2, # y1
x_center + width/2, # x2
y_center + height/2 # y2
]
# Skip if box exceeds area threshold (when threshold exists)
if threshold:
box_area = width * height
if box_area > threshold * 1.2: # 20% tolerance
cpr_logger.info(f"Filtered oversized box {i} (area: {box_area:.1f} > threshold: {threshold:.1f})")
continue
# Calculate features
verticality_score = self._calculate_verticality_score(bounding_box)
#!We already have the center coordinates from the bounding box, no need to recalculate it.
bounding_box_center = self._calculate_bounding_box_center(bounding_box)
# Store valid results
processed_results.append({
'original_index': i,
'bounding_box': bounding_box,
'bounding_box_center': bounding_box_center,
'verticality_score': verticality_score,
'keypoints': keypoints,
})
except Exception as e:
cpr_logger.error(f"Error processing detection {i}: {e}")
continue
# Identify the patient (horizontal posture)
patient_candidates = [res for res in processed_results if res['verticality_score'] == 0]
# If more than one horizontal person, select person with lowest center (likely lying down)
if len(patient_candidates) > 1:
patient_candidates = sorted(patient_candidates, key=lambda x: x['bounding_box_center'][1])[:1] # Sort by y-coordinate
patient = patient_candidates[0] if patient_candidates else None
# Identify the rescuer
rescuer = None
if patient:
# Find vertical people who aren't the patient
potential_rescuers = [
res for res in processed_results
if res['verticality_score'] == 1
#! Useless condition because the patient was horizontal
and res['original_index'] != patient['original_index']
]
if potential_rescuers:
# Select rescuer closest to patient
rescuer = min(potential_rescuers,
key=lambda x: self._calculate_distance(
x['bounding_box_center'],
patient['bounding_box_center']))
return rescuer, patient
def draw_rescuer_and_patient(self, frame):
# Create annotator object
annotator = Annotator(frame)
# Draw rescuer
if self.rescuer_processed_results:
try:
x1, y1, x2, y2 = map(int, self.rescuer_processed_results["bounding_box"])
annotator.box_label((x1, y1, x2, y2), "Rescuer", color=(0, 255, 0))
if "keypoints" in self.rescuer_processed_results:
keypoints = self.rescuer_processed_results["keypoints"]
annotator.kpts(keypoints, shape=frame.shape[:2])
except Exception as e:
cpr_logger.error(f"Error drawing rescuer: {str(e)}")
# Draw patient
if self.patient_processed_results:
try:
x1, y1, x2, y2 = map(int, self.patient_processed_results["bounding_box"])
annotator.box_label((x1, y1, x2, y2), "Patient", color=(0, 0, 255))
if "keypoints" in self.patient_processed_results:
keypoints = self.patient_processed_results["keypoints"]
annotator.kpts(keypoints, shape=frame.shape[:2])
except Exception as e:
cpr_logger.error(f"Error drawing patient: {str(e)}")
return annotator.result()