"""Gradio app that estimates a respiration rate from a video of a person.

Pipeline: locate a face with MediaPipe, derive a region below it, sum the
grayscale intensity of that region for every frame, band-pass filter the
resulting signal and count its peaks.
"""
import base64
import io
import math
import os

import cv2
import gradio as gr
import matplotlib.pyplot as plt
import mediapipe as mp
import numpy as np
from matplotlib.figure import Figure
from scipy import signal
from scipy.signal import find_peaks

# Initialize MediaPipe Face Detection
mp_face_detection = mp.solutions.face_detection

# Frame rate assumed when the container does not report a usable one.
DEFAULT_FPS = 30.0
# Pass band of the filter in Hz (0.16-0.5 Hz is 9.6-30 cycles per minute).
LOWCUT_HZ = 0.16
HIGHCUT_HZ = 0.5
FILTER_ORDER = 5


def _read_fps(cap):
    """Return the capture's frame rate, or DEFAULT_FPS if it is unusable."""
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps or math.isnan(fps) or fps <= 0:
        return DEFAULT_FPS
    return float(fps)


def convert_video_if_needed(video_path):
    """Convert an ``.avi`` video to ``.mp4`` and return the path to use.

    Paths that do not end in ``.avi`` are returned unchanged. Conversion is
    best-effort: if the source cannot be opened, the writer cannot be
    created, no frame is written, or any error occurs, the original path is
    returned so the caller can still try to process it.

    Args:
        video_path: Path of the uploaded video.

    Returns:
        Path of the converted ``.mp4`` file, or ``video_path`` itself.
    """
    if not video_path.lower().endswith('.avi'):
        return video_path

    output_path = video_path.rsplit('.', 1)[0] + '.mp4'
    cap = cv2.VideoCapture(video_path)
    out = None
    frames_written = 0
    try:
        if not cap.isOpened():
            return video_path
        frame_size = (
            int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
            int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
        )
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        # Keep the source frame rate so timing derived from the converted
        # file matches the original recording.
        out = cv2.VideoWriter(output_path, fourcc, _read_fps(cap), frame_size)
        if not out.isOpened():
            return video_path
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame)
            frames_written += 1
    except Exception as e:
        print(f"Error converting video: {e}")
        return video_path
    finally:
        # Always release the handles, including on the early returns above.
        cap.release()
        if out is not None:
            out.release()
    return output_path if frames_written else video_path


def _chest_roi(detection, frame_shape):
    """Return ``(x, y, w, h)`` of the region below a detected face, or None.

    The region is 0.75 face-widths wider than the face on each side, starts
    0.3 face-heights below the face box and is one face-height tall. It is
    clamped to the frame; None is returned when nothing of it lies inside.
    """
    ih, iw = frame_shape[:2]
    box = detection.location_data.relative_bounding_box
    x1, y1 = int(box.xmin * iw), int(box.ymin * ih)
    w, h = int(box.width * iw), int(box.height * ih)

    x3 = math.ceil(x1 - (0.75 * w))
    x4 = math.ceil(x1 + w + (0.75 * w))
    y3 = math.ceil(y1 + h + (0.3 * h))
    y4 = math.ceil(y3 + h)

    # Clamp: a negative start would make the slice wrap around, and a start
    # beyond the frame would produce an empty crop.
    x3, x4 = max(x3, 0), min(x4, iw)
    y3, y4 = max(y3, 0), min(y4, ih)
    if x4 <= x3 or y4 <= y3:
        return None
    return x3, y3, x4 - x3, y4 - y3


def _extract_intensity_signal(cap):
    """Read all frames and return ``(intensities, frames_read)``.

    Frames are skipped until a face yields a usable region; from that frame
    on (inclusive) the summed grayscale intensity of the region is recorded
    once per frame.
    """
    roi = None
    frames_read = 0
    intensities = []
    with mp_face_detection.FaceDetection(
            model_selection=1, min_detection_confidence=0.5) as face_detection:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames_read += 1

            if roi is None:
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = face_detection.process(rgb_frame)
                if results.detections:
                    # Only the first detection is used.
                    roi = _chest_roi(results.detections[0], frame.shape)
                if roi is None:
                    continue

            x, y, w, h = roi
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # Accumulate in float64 so large regions cannot overflow.
            intensities.append(
                float(gray[y:y + h, x:x + w].sum(dtype=np.float64)))
    return intensities, frames_read


def _bandpass(values, fps):
    """Zero-phase Butterworth band-pass of ``values`` sampled at ``fps``.

    Raises:
        ValueError: If the frame rate is too low for the pass band or the
            signal is too short to be filtered.
    """
    nyq = 0.5 * fps
    low = LOWCUT_HZ / nyq
    high = HIGHCUT_HZ / nyq
    if not 0 < low < high < 1:
        raise ValueError("Video frame rate is too low for respiration analysis.")

    # Second-order sections stay numerically stable for this narrow,
    # low-frequency band where the (b, a) form is ill-conditioned.
    sos = signal.butter(FILTER_ORDER, [low, high], btype='band', output='sos')
    # Upper bound on the edge padding sosfiltfilt needs by default.
    min_samples = 3 * (2 * sos.shape[0] + 1)
    if len(values) <= min_samples:
        raise ValueError(
            f"Video is too short to analyze: {len(values)} usable frames, "
            f"need more than {min_samples}.")
    return signal.sosfiltfilt(sos, np.asarray(values, dtype=np.float64))


def _render_plot(filtered, peaks):
    """Render the filtered signal with its peaks and return PNG bytes."""
    # A standalone Figure avoids pyplot's global state, which is not safe to
    # share between the worker threads of a web server.
    fig = Figure(figsize=(10, 6))
    ax = fig.subplots()
    ax.plot(filtered, color='orange', label='Filtered Signal')
    ax.plot(peaks, filtered[peaks], "x", color='red', label='Peaks')
    ax.set_title('Respiratory Signal Analysis', color='darkorange')
    ax.set_xlabel('Frames', color='darkorange')
    ax.set_ylabel('Intensity', color='darkorange')
    ax.grid(True, color='lightcoral', alpha=0.3)
    ax.legend()

    plot_output = io.BytesIO()
    fig.savefig(plot_output, format='png', facecolor='#001f3f',
                edgecolor='none')
    return plot_output.getvalue()


def process_video(video_path):
    """Process video to calculate respiration rate.

    Args:
        video_path: Path of the video file to analyze.

    Returns:
        A dict with the keys ``fps``, ``frames``, ``duration`` and
        ``respiration_rate`` (formatted strings), ``respiration_times``
        (peak times in seconds, measured from the first analyzed frame) and
        ``plot`` (PNG image bytes).

    Raises:
        ValueError: If the video cannot be opened, contains no frames, no
            usable face is found, or the signal cannot be filtered.
    """
    cap = cv2.VideoCapture(video_path, cv2.CAP_FFMPEG)
    try:
        if not cap.isOpened():
            raise ValueError(
                "Error opening video file. Please check the format or path.")
        fps = _read_fps(cap)
        reported = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        num_frames = int(reported) if reported and reported > 0 else 0
        intensities, frames_read = _extract_intensity_signal(cap)
    finally:
        cap.release()

    # Fall back to the frames actually decoded when the container metadata
    # does not carry a frame count.
    if num_frames <= 0:
        num_frames = frames_read
    time_length = num_frames / fps
    if time_length == 0.0:
        raise ValueError("Unable to determine video duration.")
    if not intensities:
        raise ValueError(
            "No face with a visible chest region was detected in the video.")

    # The signal has one sample per frame, so the sampling rate is the fps.
    filtered = _bandpass(intensities, fps)

    # Find peaks and calculate metrics
    peaks, _ = find_peaks(filtered)
    respiration_times = peaks / fps
    # Peaks are counted over the analyzed frames only, so the rate is
    # normalized by that span rather than by the whole video duration.
    analyzed_seconds = len(intensities) / fps
    rr = math.ceil((len(peaks) / analyzed_seconds) * 60)

    return {
        "fps": f"{fps:.2f}",
        "frames": str(num_frames),
        "respiration_times": respiration_times.tolist(),
        "duration": f"{time_length:.2f} seconds",
        "respiration_rate": f"{rr} breaths/minute",
        "plot": _render_plot(filtered, peaks)
    }


def _png_to_rgb_array(png_bytes):
    """Decode PNG bytes into an RGB array that ``gr.Image`` can display."""
    encoded = np.frombuffer(png_bytes, dtype=np.uint8)
    bgr = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
    return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)


def _error_outputs(message):
    """Build the six interface outputs for a failed run.

    The third output is a JSON component, which tries to parse plain strings
    as JSON, so the message is wrapped in a dict for that slot.
    """
    return [message, message, {"error": message}, message, message, None]


def process_input(video_path):
    """Handle input video processing with error handling.

    Args:
        video_path: Path supplied by the video input, or None.

    Returns:
        A list of six values matching the interface outputs: fps, frame
        count, respiration times, duration, respiration rate and plot image.
        On failure the text outputs carry the error message and the image
        is None.
    """
    try:
        if video_path is None:
            return _error_outputs("Please upload a video file.")
        if not os.path.exists(video_path):
            return _error_outputs("Video file not found.")

        # Convert video if needed
        video_path = convert_video_if_needed(video_path)
        result = process_video(video_path)
        return [
            result["fps"],
            result["frames"],
            result["respiration_times"],
            result["duration"],
            result["respiration_rate"],
            _png_to_rgb_array(result["plot"])
        ]
    except Exception as e:
        # UI boundary: report any failure to the user instead of crashing.
        return _error_outputs(f"Error processing video: {str(e)}")


# Create Gradio interface
interface = gr.Interface(
    fn=process_input,
    # No ``format`` is set: it takes a single target format string, not a
    # list, and .avi uploads are already handled by convert_video_if_needed.
    inputs=gr.Video(label="Upload or Stream Video"),
    outputs=[
        gr.Label(label="Frames per second"),
        gr.Label(label="Total number of frames"),
        gr.JSON(label="Respiration Times"),
        gr.Label(label="Time Length"),
        gr.Label(label="Respiration Rate (RR)"),
        gr.Image(label="Filtered Peaks Plot")
    ],
    title="Respiration Rate Analyzer",
    description="🫁 Analyze your respiration rate with ease!",
    theme="default",
    css="""
    body { background-color: #001f3f; color: #f0f8ff; }
    .output { font-size: 16px; color: #f0f8ff; }
    .label { font-weight: bold; color: #0074D9; }
    .plot { border: 2px solid #0074D9; border-radius: 10px; }
    .gradio-container { background-color: #002b57; }
    .output-markdown p { color: #f0f8ff !important; }
    """
)

if __name__ == "__main__":
    interface.launch()