FatemaKotb's picture
CPR Partial Code Cleaning
4cdc77a
import cv2
import numpy as np
import os
from CPR_Module.Common.logging_config import cpr_logger
class WarningsOverlayer:
def __init__(self):
# Single drawer configuration
self.DRAWER_CONFIG = {
"base_position": (0.05, 0.15), # 5% from left, 15% from top
"vertical_spacing": 0.06 # 6% of frame height between warnings
}
# Warning config (colors only)
self.WARNING_CONFIG = {
# Posture Warnings
"Right arm bent!": {"color": (52, 110, 235)},
"Left arm bent!": {"color": (52, 110, 235)},
"Left hand not on chest!": {"color": (161, 127, 18)},
"Right hand not on chest!": {"color": (161, 127, 18)},
"Both hands not on chest!": {"color": (161, 127, 18)},
# Rate/Depth Warnings
"Depth too low!": {"color": (125, 52, 235)},
"Depth too high!": {"color": (125, 52, 235)},
"Rate too slow!": {"color": (235, 52, 214)},
"Rate too fast!": {"color": (235, 52, 214)}
}
def add_warnings_to_processed_video(self, video_output_path, sampling_interval_frames, rate_and_depth_warnings, posture_warnings):
"""Process both warning types with identical handling"""
cpr_logger.info("\n[POST-PROCESS] Starting warning overlay")
# Read processed video with original parameters
cap = cv2.VideoCapture(video_output_path)
if not cap.isOpened():
cpr_logger.info("[ERROR] Failed to open processed video")
return
# Get original video properties
original_fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))
processed_fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Create final writer with ORIGINAL codec and parameters
base = os.path.splitext(video_output_path)[0]
final_path = os.path.abspath(f"{base}_final.mp4")
writer = cv2.VideoWriter(final_path, original_fourcc, processed_fps, (width, height))
# Combine all warnings into unified list
all_warnings = []
for entry in posture_warnings:
if warnings := entry.get('posture_warnings'):
start = entry['start_frame'] // sampling_interval_frames
end = entry['end_frame'] // sampling_interval_frames
all_warnings.append((int(start), int(end), warnings))
for entry in rate_and_depth_warnings:
if warnings := entry.get('rate_and_depth_warnings'):
start = entry['start_frame'] // sampling_interval_frames
end = entry['end_frame'] // sampling_interval_frames
all_warnings.append((int(start), int(end), warnings))
# Screenshot tracking
os.makedirs("screenshots", exist_ok=True)
screenshot_counts = {}
# Warnings to skip for screenshots
skip_warnings = {
"Depth too low!",
"Depth too high!",
"Rate too slow!",
"Rate too fast!"
}
# Video processing loop
frame_idx = 0
while True:
ret, frame = cap.read()
if not ret:
break
active_warnings = []
for start, end, warnings in all_warnings:
if start <= frame_idx <= end:
active_warnings.extend(warnings)
# Draw and optionally screenshot active warnings
self._draw_warnings(frame, active_warnings)
for warning_text in set(active_warnings):
if warning_text in skip_warnings:
continue
count = screenshot_counts.get(warning_text, 0)
if count < 2:
screenshot_path = os.path.join("screenshots", f"{warning_text.replace(' ', '_')}_{frame_idx}.jpg")
cv2.imwrite(screenshot_path, frame)
screenshot_counts[warning_text] = count + 1
writer.write(frame)
frame_idx += 1
cap.release()
writer.release()
cpr_logger.info(f"\n[POST-PROCESS] Final output saved to: {final_path}")
def _draw_warnings(self, frame, active_warnings):
"""Draw all warnings in a single vertical drawer"""
frame_height = frame.shape[0]
frame_width = frame.shape[1]
# Calculate starting position
base_x = int(self.DRAWER_CONFIG["base_position"][0] * frame_width)
current_y = int(self.DRAWER_CONFIG["base_position"][1] * frame_height)
# Calculate spacing between warnings
y_spacing = int(self.DRAWER_CONFIG["vertical_spacing"] * frame_height)
# Draw all active warnings vertically
for warning_text in active_warnings:
if color := self.WARNING_CONFIG.get(warning_text, {}).get("color"):
# Draw warning at current position
self._draw_warning_banner(
frame=frame,
text=warning_text,
color=color,
position=(base_x, current_y))
# Move down for next warning
current_y += y_spacing
def _draw_warning_banner(self, frame, text, color, position):
"""Base drawing function for warning banners"""
(text_width, text_height), _ = cv2.getTextSize(
text, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
x, y = position
# Background rectangle
cv2.rectangle(frame,
(x - 10, y - text_height - 10),
(x + text_width + 10, y + 10),
color, -1)
# Text
cv2.putText(frame, text, (x, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)