File size: 7,139 Bytes
cfbaa51
cca6a52
cfbaa51
1ca9b28
cca6a52
4cdc77a
cca6a52
 
 
 
1ca9b28
 
cca6a52
1ca9b28
 
cca6a52
1ca9b28
 
 
 
 
 
 
 
4cdc77a
cca6a52
1ca9b28
89a012c
1ca9b28
 
 
4cdc77a
1ca9b28
 
cca6a52
1ca9b28
 
 
 
 
4cdc77a
1ca9b28
 
 
4cdc77a
1ca9b28
 
 
 
 
 
4cdc77a
cca6a52
1ca9b28
cca6a52
1ca9b28
 
 
 
 
 
4cdc77a
1ca9b28
 
4cdc77a
1ca9b28
 
 
 
 
 
 
 
 
 
 
 
 
 
89a012c
1ca9b28
 
 
 
4cdc77a
89a012c
1ca9b28
 
 
 
 
 
 
 
 
 
cca6a52
1ca9b28
89a012c
1ca9b28
 
4cdc77a
 
1ca9b28
 
 
4cdc77a
1ca9b28
 
 
4cdc77a
1ca9b28
 
 
 
 
 
4cdc77a
89a012c
1ca9b28
 
cca6a52
1ca9b28
 
 
 
 
 
 
 
 
 
 
 
 
4cdc77a
1ca9b28
 
 
4cdc77a
1ca9b28
 
 
 
 
89a012c
cca6a52
4cdc77a
1ca9b28
 
 
4cdc77a
1ca9b28
 
 
 
 
89a012c
cca6a52
1ca9b28
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
from ultralytics.utils.plotting import Annotator

from CPR_Module.Common.logging_config import cpr_logger

class RoleClassifier:
    """Classify detected people as rescuer or patient.

    Classification relies only on bounding-box geometry: a box that is taller
    than wide is treated as an upright person (rescuer candidate), any other
    valid box as a lying person (patient candidate).
    """

    def __init__(self, proximity_thresh=0.3, area_tolerance=1.2):
        """Create a classifier.

        Args:
            proximity_thresh: Stored on the instance; not read by any method
                of this class.
            area_tolerance: Multiplier applied to the combined area of the
                previous rescuer and patient boxes. A detection whose area
                exceeds that product is discarded as oversized. The default
                of 1.2 corresponds to a 20% tolerance.
        """
        self.proximity_thresh = proximity_thresh
        self.area_tolerance = area_tolerance
        # Not assigned by any method of this class after construction.
        self.rescuer_id = None
        # Read by draw_rescuer_and_patient(); classify_roles() does not set
        # them, so they are presumably assigned by the caller.
        self.rescuer_processed_results = None
        self.patient_processed_results = None

    def _calculate_verticality_score(self, bounding_box):
        """Return a binary posture flag derived from the box aspect ratio.

        Args:
            bounding_box: Sequence ``(x1, y1, x2, y2)``.

        Returns:
            1 if the box is taller than wide (vertical), 0 otherwise
            (horizontal, including square boxes), and -1 if the box has zero
            width or height or cannot be unpacked/subtracted.
        """
        try:
            x1, y1, x2, y2 = bounding_box
            width = abs(x2 - x1)
            height = abs(y2 - y1)

            # Degenerate boxes carry no posture information.
            if width == 0 or height == 0:
                return -1

            return 1 if height > width else 0

        except (TypeError, ValueError) as e:
            cpr_logger.error(f"Verticality score calculation error: {e}")
            return -1

    def _calculate_bounding_box_center(self, bounding_box):
        """Return the ``(x, y)`` center of an ``(x1, y1, x2, y2)`` box."""
        x1, y1, x2, y2 = bounding_box
        return (x1 + x2) / 2, (y1 + y2) / 2

    def _calculate_distance(self, point1, point2):
        """Return the Euclidean distance between two ``(x, y)`` points."""
        return ((point1[0] - point2[0]) ** 2 + (point1[1] - point2[1]) ** 2) ** 0.5

    def _calculate_bbox_areas(self, rescuer_bbox, patient_bbox):
        """Return ``(rescuer_area, patient_area)`` for two boxes.

        Each box is ``(x1, y1, x2, y2)`` or ``None``; a ``None`` box has
        area 0.
        """
        def compute_area(bbox):
            if bbox is None:
                return 0
            width = bbox[2] - bbox[0]   # x2 - x1
            height = bbox[3] - bbox[1]  # y2 - y1
            # abs() keeps the area non-negative if the corners are swapped.
            return abs(width * height)

        return compute_area(rescuer_bbox), compute_area(patient_bbox)

    def classify_roles(self, results, prev_rescuer_processed_results=None, prev_patient_processed_results=None):
        """Pick the rescuer and the patient among the detections in ``results``.

        Args:
            results: Object exposing ``boxes.xywh`` and ``keypoints.xy``, each
                supporting ``.cpu().numpy()`` and yielding one row per
                detection (presumably an Ultralytics pose result; confirm
                against the caller). If ``results`` or either attribute is
                ``None`` nothing can be classified.
            prev_rescuer_processed_results: Optional dict with a
                ``"bounding_box"`` entry from a previous classification.
            prev_patient_processed_results: Same, for the patient. When both
                previous dicts are given, detections larger than the sum of
                the two previous box areas times ``area_tolerance`` are
                ignored.

        Returns:
            Tuple ``(rescuer, patient)``. Each element is a dict with keys
            ``original_index``, ``bounding_box`` (``[x1, y1, x2, y2]``),
            ``bounding_box_center``, ``verticality_score`` and ``keypoints``,
            or ``None`` when no suitable detection exists. A rescuer is only
            reported when a patient was found.
        """
        boxes = getattr(results, "boxes", None)
        keypoint_set = getattr(results, "keypoints", None)
        if boxes is None or keypoint_set is None:
            # No detections to work with: same outcome as "nobody found".
            return None, None

        # Combined area of the previous boxes, used to reject oversized boxes.
        threshold = None
        if prev_rescuer_processed_results and prev_patient_processed_results:
            rescuer_area, patient_area = self._calculate_bbox_areas(
                prev_rescuer_processed_results["bounding_box"],
                prev_patient_processed_results["bounding_box"],
            )
            threshold = rescuer_area + patient_area

        processed_results = []
        detections = zip(boxes.xywh.cpu().numpy(), keypoint_set.xy.cpu().numpy())
        for i, (box, keypoints) in enumerate(detections):
            # Best effort per detection: one malformed row must not abort the
            # classification of the remaining ones.
            try:
                x_center, y_center, width, height = box

                # A zero threshold (degenerate previous boxes) disables the filter.
                if threshold:
                    box_area = width * height
                    if box_area > threshold * self.area_tolerance:
                        cpr_logger.info(f"Filtered oversized box {i} (area: {box_area:.1f} > threshold: {threshold:.1f})")
                        continue

                # Convert center/size format to corner format [x1, y1, x2, y2].
                bounding_box = [
                    x_center - width / 2,
                    y_center - height / 2,
                    x_center + width / 2,
                    y_center + height / 2,
                ]

                processed_results.append({
                    'original_index': i,
                    'bounding_box': bounding_box,
                    # The xywh row already carries the center; no need to
                    # derive it again from the corners.
                    'bounding_box_center': (x_center, y_center),
                    'verticality_score': self._calculate_verticality_score(bounding_box),
                    'keypoints': keypoints,
                })

            except Exception as e:
                cpr_logger.error(f"Error processing detection {i}: {e}")
                continue

        # Patient: a horizontal box. With several horizontal boxes take the
        # one lowest in the frame. Image y grows downward, so "lowest" is the
        # largest y; a person on the floor sits below someone bent over them.
        patient_candidates = [res for res in processed_results if res['verticality_score'] == 0]
        patient = max(
            patient_candidates,
            key=lambda res: res['bounding_box_center'][1],
            default=None,
        )

        # Rescuer: the vertical box closest to the patient. Vertical boxes can
        # never be the patient, so no index comparison is required.
        rescuer = None
        if patient is not None:
            potential_rescuers = [res for res in processed_results if res['verticality_score'] == 1]
            rescuer = min(
                potential_rescuers,
                key=lambda res: self._calculate_distance(
                    res['bounding_box_center'],
                    patient['bounding_box_center']),
                default=None,
            )

        return rescuer, patient

    def _draw_person(self, annotator, frame, processed_results, label, color):
        """Draw one labelled box (and keypoints, if present) on ``annotator``."""
        if not processed_results:
            return
        try:
            x1, y1, x2, y2 = map(int, processed_results["bounding_box"])
            annotator.box_label((x1, y1, x2, y2), label, color=color)

            if "keypoints" in processed_results:
                annotator.kpts(processed_results["keypoints"], shape=frame.shape[:2])
        except Exception as e:
            # Drawing is cosmetic; log and keep whatever was drawn so far.
            cpr_logger.error(f"Error drawing {label.lower()}: {str(e)}")

    def draw_rescuer_and_patient(self, frame):
        """Annotate ``frame`` with the stored rescuer and patient results.

        Reads ``self.rescuer_processed_results`` and
        ``self.patient_processed_results``; a role whose stored result is
        empty or ``None`` is skipped.

        Args:
            frame: Image passed to ``Annotator``; must expose ``shape``.

        Returns:
            The value of ``Annotator.result()`` after drawing.
        """
        annotator = Annotator(frame)

        self._draw_person(annotator, frame, self.rescuer_processed_results, "Rescuer", (0, 255, 0))
        self._draw_person(annotator, frame, self.patient_processed_results, "Patient", (0, 0, 255))

        return annotator.result()