File size: 6,005 Bytes
89a012c
 
 
 
cfbaa51
89a012c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import cv2
import numpy as np
import os

from CPR_Module.Common.logging_config import cpr_logger

class WarningsOverlayer:
    """Overlay CPR warning banners onto an already-processed video.

    The overlayer re-reads a processed video, draws the warnings that are
    active on each frame in a single vertical "drawer", saves a limited number
    of screenshots per posture warning, and writes the annotated frames to a
    new ``<name>_final.mp4`` file next to the input.
    """

    # Rate/depth warnings are drawn on the video but never screenshotted.
    _SCREENSHOT_SKIP = frozenset({
        "Depth too low!",
        "Depth too high!",
        "Rate too slow!",
        "Rate too fast!",
    })
    # Upper bound on saved screenshots for each distinct warning text.
    _MAX_SCREENSHOTS_PER_WARNING = 2
    # Screenshot directory, relative to the current working directory.
    _SCREENSHOT_DIR = "screenshots"

    def __init__(self):
        # Single drawer configuration (fractions of the frame size)
        self.DRAWER_CONFIG = {
            "base_position": (0.05, 0.15),  # 5% from left, 15% from top
            "vertical_spacing": 0.06  # 6% of frame height between warnings
        }

        # Warning config (colors only); texts missing here are never drawn
        self.WARNING_CONFIG = {
            # Posture Warnings
            "Right arm bent!": {"color": (52, 110, 235)},
            "Left arm bent!": {"color": (52, 110, 235)},
            "Left hand not on chest!": {"color": (161, 127, 18)},
            "Right hand not on chest!": {"color": (161, 127, 18)},
            "Both hands not on chest!": {"color": (161, 127, 18)},

            # Rate/Depth Warnings
            "Depth too low!": {"color": (125, 52, 235)},
            "Depth too high!": {"color": (125, 52, 235)},
            "Rate too slow!": {"color": (235, 52, 214)},
            "Rate too fast!": {"color": (235, 52, 214)}
        }

    @staticmethod
    def _collect_warning_spans(entries, key, sampling_interval_frames):
        """Convert warning entries into ``(start, end, warnings)`` spans.

        Args:
            entries: Iterable of dicts holding ``'start_frame'``,
                ``'end_frame'`` and, optionally, ``key``. ``None`` is treated
                as an empty iterable.
            key: Name of the dict item that stores the warning texts.
            sampling_interval_frames: Divisor that maps the entries' frame
                numbers onto frame indices of the processed video.

        Returns:
            A list of ``(start, end, warnings)`` tuples with integer,
            inclusive bounds. Entries whose warnings are missing or empty
            are skipped.
        """
        spans = []
        for entry in entries or ():
            if warnings := entry.get(key):
                start = entry['start_frame'] // sampling_interval_frames
                end = entry['end_frame'] // sampling_interval_frames
                spans.append((int(start), int(end), warnings))
        return spans

    @staticmethod
    def _active_warnings_at(frame_idx, spans):
        """Return the warnings active at ``frame_idx`` without duplicates.

        Overlapping spans may carry the same warning text; each text is
        reported once, in first-seen order, so a banner is never stacked
        several times in the drawer.
        """
        active = {}
        for start, end, warnings in spans:
            if start <= frame_idx <= end:
                # A dict keeps insertion order and drops repeated texts.
                active.update(dict.fromkeys(warnings))
        return list(active)

    def add_warnings_to_processed_video(self, video_output_path, sampling_interval_frames, rate_and_depth_warnings, posture_warnings):
        """Draw posture and rate/depth warnings onto the processed video.

        Both warning types are handled identically: each entry contributes an
        inclusive frame span during which its warning texts are drawn.

        Args:
            video_output_path: Path of the processed video to read. The result
                is written to ``<same path without extension>_final.mp4``.
            sampling_interval_frames: Divisor applied to every entry's
                ``'start_frame'`` / ``'end_frame'`` to obtain frame indices of
                the processed video.
            rate_and_depth_warnings: Entries whose texts are stored under
                ``'rate_and_depth_warnings'``.
            posture_warnings: Entries whose texts are stored under
                ``'posture_warnings'``.

        Returns:
            None. Failure to open the input video or the output writer is
            logged and the method returns without producing the final file.
        """
        cpr_logger.info("\n[POST-PROCESS] Starting warning overlay")

        # Read processed video with original parameters
        cap = cv2.VideoCapture(video_output_path)
        if not cap.isOpened():
            cap.release()
            cpr_logger.info("[ERROR] Failed to open processed video")
            return

        writer = None
        try:
            # Get original video properties
            original_fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))
            processed_fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Create final writer with ORIGINAL codec and parameters
            base = os.path.splitext(video_output_path)[0]
            final_path = os.path.abspath(f"{base}_final.mp4")
            writer = cv2.VideoWriter(final_path, original_fourcc, processed_fps, (width, height))
            if not writer.isOpened():
                # Without this check every write would be silently dropped
                # and the success message below would be misleading.
                cpr_logger.info(f"[ERROR] Failed to open video writer for: {final_path}")
                return

            # Combine all warnings into unified list
            all_warnings = self._collect_warning_spans(
                posture_warnings, 'posture_warnings', sampling_interval_frames)
            all_warnings += self._collect_warning_spans(
                rate_and_depth_warnings, 'rate_and_depth_warnings', sampling_interval_frames)

            # Screenshot tracking
            os.makedirs(self._SCREENSHOT_DIR, exist_ok=True)
            screenshot_counts = {}

            # Video processing loop
            frame_idx = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                active_warnings = self._active_warnings_at(frame_idx, all_warnings)

                # Draw first so that screenshots include the banners
                self._draw_warnings(frame, active_warnings)

                for warning_text in active_warnings:
                    if warning_text in self._SCREENSHOT_SKIP:
                        continue
                    count = screenshot_counts.get(warning_text, 0)
                    if count < self._MAX_SCREENSHOTS_PER_WARNING:
                        screenshot_path = os.path.join(
                            self._SCREENSHOT_DIR,
                            f"{warning_text.replace(' ', '_')}_{frame_idx}.jpg")
                        cv2.imwrite(screenshot_path, frame)
                        screenshot_counts[warning_text] = count + 1

                writer.write(frame)
                frame_idx += 1
        finally:
            # Release the capture/writer handles even if processing raised.
            cap.release()
            if writer is not None:
                writer.release()

        cpr_logger.info(f"\n[POST-PROCESS] Final output saved to: {final_path}")

    def _draw_warnings(self, frame, active_warnings):
        """Draw all warnings in a single vertical drawer.

        Warnings without a color in ``WARNING_CONFIG`` are skipped and do not
        consume a slot in the drawer. ``frame`` is modified in place.
        """
        frame_height = frame.shape[0]
        frame_width = frame.shape[1]

        # Calculate starting position
        base_x = int(self.DRAWER_CONFIG["base_position"][0] * frame_width)
        current_y = int(self.DRAWER_CONFIG["base_position"][1] * frame_height)

        # Calculate spacing between warnings
        y_spacing = int(self.DRAWER_CONFIG["vertical_spacing"] * frame_height)

        # Draw all active warnings vertically
        for warning_text in active_warnings:
            if color := self.WARNING_CONFIG.get(warning_text, {}).get("color"):
                # Draw warning at current position
                self._draw_warning_banner(
                    frame=frame,
                    text=warning_text,
                    color=color,
                    position=(base_x, current_y))

                # Move down for next warning
                current_y += y_spacing

    def _draw_warning_banner(self, frame, text, color, position):
        """Base drawing function for warning banners.

        Draws a filled rectangle in ``color`` sized to the text (with a 10 px
        margin) and renders ``text`` in white on top of it. ``position`` is
        the ``(x, y)`` text origin passed to ``cv2.putText``.
        """
        (text_width, text_height), _ = cv2.getTextSize(
            text, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)

        x, y = position
        # Background rectangle
        cv2.rectangle(frame,
                      (x - 10, y - text_height - 10),
                      (x + text_width + 10, y + 10),
                      color, -1)
        # Text
        cv2.putText(frame, text, (x, y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)