File size: 6,005 Bytes
89a012c cfbaa51 89a012c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
import cv2
import numpy as np
import os
from CPR_Module.Common.logging_config import cpr_logger
class WarningsOverlayer:
    """Overlay warning banners onto an already-processed video.

    The overlayer reads a video, draws a coloured banner for every warning
    that is active on each frame, saves a limited number of screenshots per
    posture warning and writes the annotated frames to ``<name>_final.mp4``.
    """

    # Directory (relative to the current working directory) for screenshots.
    SCREENSHOT_DIR = "screenshots"
    # Maximum number of screenshots saved for any single warning text.
    MAX_SCREENSHOTS_PER_WARNING = 2
    # Warnings that are drawn on the video but never screenshotted.
    SCREENSHOT_SKIP_WARNINGS = frozenset({
        "Depth too low!",
        "Depth too high!",
        "Rate too slow!",
        "Rate too fast!",
    })

    # Banner text rendering parameters (font scale, stroke thickness, and the
    # padding in pixels between the text and the edge of its background box).
    BANNER_FONT_SCALE = 0.8
    BANNER_FONT_THICKNESS = 2
    BANNER_PADDING = 10
    BANNER_TEXT_COLOR = (255, 255, 255)

    def __init__(self):
        """Set up the drawer layout and the per-warning colour table."""
        # Single drawer configuration
        self.DRAWER_CONFIG = {
            "base_position": (0.05, 0.15),  # 5% from left, 15% from top
            "vertical_spacing": 0.06,       # 6% of frame height between warnings
        }

        # Warning config (colors only). Only warnings listed here are drawn;
        # the colour tuples are passed unchanged to cv2.rectangle.
        self.WARNING_CONFIG = {
            # Posture Warnings
            "Right arm bent!": {"color": (52, 110, 235)},
            "Left arm bent!": {"color": (52, 110, 235)},
            "Left hand not on chest!": {"color": (161, 127, 18)},
            "Right hand not on chest!": {"color": (161, 127, 18)},
            "Both hands not on chest!": {"color": (161, 127, 18)},
            # Rate/Depth Warnings
            "Depth too low!": {"color": (125, 52, 235)},
            "Depth too high!": {"color": (125, 52, 235)},
            "Rate too slow!": {"color": (235, 52, 214)},
            "Rate too fast!": {"color": (235, 52, 214)},
        }

    def add_warnings_to_processed_video(self, video_output_path, sampling_interval_frames, rate_and_depth_warnings, posture_warnings):
        """Process both warning types with identical handling.

        Args:
            video_output_path: Path of the processed video to annotate. The
                result is written next to it as ``<base>_final.mp4``.
            sampling_interval_frames: Divisor that converts each entry's
                ``start_frame`` / ``end_frame`` into indices of the frames
                read from ``video_output_path``.
            rate_and_depth_warnings: Iterable of dicts with ``start_frame``,
                ``end_frame`` and an optional ``rate_and_depth_warnings``
                value (assumed to be a list of warning strings).
            posture_warnings: Iterable of dicts with ``start_frame``,
                ``end_frame`` and an optional ``posture_warnings`` value
                (assumed to be a list of warning strings).

        Returns:
            None. Side effects: writes the annotated video, and saves
            screenshots under ``SCREENSHOT_DIR``.
        """
        cpr_logger.info("\n[POST-PROCESS] Starting warning overlay")

        # Read processed video with original parameters
        cap = cv2.VideoCapture(video_output_path)
        if not cap.isOpened():
            cpr_logger.info("[ERROR] Failed to open processed video")
            cap.release()
            return

        writer = None
        writer_ok = False
        try:
            # Get original video properties
            original_fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))
            processed_fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Create final writer with ORIGINAL codec and parameters
            base = os.path.splitext(video_output_path)[0]
            final_path = os.path.abspath(f"{base}_final.mp4")
            writer = cv2.VideoWriter(final_path, original_fourcc, processed_fps, (width, height))

            # A writer that failed to open silently drops every frame, so
            # report it instead of claiming success at the end. Processing
            # continues so screenshots are still produced.
            writer_ok = writer.isOpened()
            if not writer_ok:
                cpr_logger.info(f"[ERROR] Failed to open video writer for: {final_path}")

            # Combine all warnings into unified list (posture first)
            all_warnings = [
                *self._collect_warning_windows(
                    posture_warnings, 'posture_warnings', sampling_interval_frames),
                *self._collect_warning_windows(
                    rate_and_depth_warnings, 'rate_and_depth_warnings', sampling_interval_frames),
            ]

            # Screenshot tracking
            os.makedirs(self.SCREENSHOT_DIR, exist_ok=True)
            screenshot_counts = {}

            # Video processing loop
            frame_idx = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                active_warnings = self._active_warnings(all_warnings, frame_idx)

                # Draw first so that screenshots include the banners.
                self._draw_warnings(frame, active_warnings)
                self._save_screenshots(frame, frame_idx, active_warnings, screenshot_counts)

                if writer_ok:
                    writer.write(frame)
                frame_idx += 1
        finally:
            # Always release the native handles, even if a frame fails.
            cap.release()
            if writer is not None:
                writer.release()

        if writer_ok:
            cpr_logger.info(f"\n[POST-PROCESS] Final output saved to: {final_path}")

    @staticmethod
    def _collect_warning_windows(entries, warnings_key, sampling_interval_frames):
        """Convert warning entries into ``(start, end, warnings)`` windows.

        Entries whose ``warnings_key`` value is missing or empty are skipped.
        Start and end are floor-divided by ``sampling_interval_frames``.
        """
        windows = []
        for entry in entries:
            if warnings := entry.get(warnings_key):
                start = entry['start_frame'] // sampling_interval_frames
                end = entry['end_frame'] // sampling_interval_frames
                windows.append((int(start), int(end), warnings))
        return windows

    @staticmethod
    def _active_warnings(windows, frame_idx):
        """Return the distinct warnings active at ``frame_idx``, in first-seen order.

        Windows are inclusive on both ends, so neighbouring windows can both
        be active on a shared boundary frame; de-duplicating here prevents
        the same banner from being stacked twice.
        """
        active = {}
        for start, end, warnings in windows:
            if start <= frame_idx <= end:
                active.update(dict.fromkeys(warnings))
        return list(active)

    def _save_screenshots(self, frame, frame_idx, active_warnings, screenshot_counts):
        """Save the frame once per eligible active warning, up to the per-warning cap.

        ``screenshot_counts`` maps warning text to the number of screenshots
        already saved and is updated in place.
        """
        for warning_text in dict.fromkeys(active_warnings):
            if warning_text in self.SCREENSHOT_SKIP_WARNINGS:
                continue
            count = screenshot_counts.get(warning_text, 0)
            if count < self.MAX_SCREENSHOTS_PER_WARNING:
                screenshot_path = os.path.join(
                    self.SCREENSHOT_DIR,
                    f"{warning_text.replace(' ', '_')}_{frame_idx}.jpg")
                cv2.imwrite(screenshot_path, frame)
                screenshot_counts[warning_text] = count + 1

    def _draw_warnings(self, frame, active_warnings):
        """Draw all warnings in a single vertical drawer.

        Each distinct warning that has a colour in ``WARNING_CONFIG`` gets one
        banner; unknown warnings are ignored and do not leave a gap.
        """
        frame_height = frame.shape[0]
        frame_width = frame.shape[1]

        # Calculate starting position
        base_x = int(self.DRAWER_CONFIG["base_position"][0] * frame_width)
        current_y = int(self.DRAWER_CONFIG["base_position"][1] * frame_height)

        # Calculate spacing between warnings
        y_spacing = int(self.DRAWER_CONFIG["vertical_spacing"] * frame_height)

        # Draw each distinct warning once, preserving the incoming order.
        for warning_text in dict.fromkeys(active_warnings):
            if color := self.WARNING_CONFIG.get(warning_text, {}).get("color"):
                # Draw warning at current position
                self._draw_warning_banner(
                    frame=frame,
                    text=warning_text,
                    color=color,
                    position=(base_x, current_y))

                # Move down for next warning
                current_y += y_spacing

    def _draw_warning_banner(self, frame, text, color, position):
        """Base drawing function for warning banners.

        Draws a filled rectangle in ``color`` sized to the text plus padding,
        then the text itself on top, with ``position`` as the text origin
        passed to cv2.putText.
        """
        font = cv2.FONT_HERSHEY_SIMPLEX
        pad = self.BANNER_PADDING
        (text_width, text_height), _ = cv2.getTextSize(
            text, font, self.BANNER_FONT_SCALE, self.BANNER_FONT_THICKNESS)

        x, y = position

        # Background rectangle (thickness -1 fills the shape)
        cv2.rectangle(frame,
                      (x - pad, y - text_height - pad),
                      (x + text_width + pad, y + pad),
                      color, -1)

        # Text
        cv2.putText(frame, text, (x, y),
                    font, self.BANNER_FONT_SCALE,
                    self.BANNER_TEXT_COLOR, self.BANNER_FONT_THICKNESS)
|