# Spaces: Sleeping
# (HuggingFace Spaces status header — non-code page residue, commented out
# so the module is valid Python.)
| import cv2 | |
| import math | |
| import numpy as np | |
| from scipy import signal | |
| from scipy.signal import find_peaks | |
| import matplotlib.pyplot as plt | |
| import io | |
| import base64 | |
| import gradio as gr | |
| import mediapipe as mp | |
| import os | |
# Initialize MediaPipe Face Detection
# Module-level handle to MediaPipe's face-detection solution; used in
# process_video() to locate the face and derive the chest ROI.
mp_face_detection = mp.solutions.face_detection
def convert_video_if_needed(video_path):
    """Convert an ``.avi`` video to ``.mp4``; return the (possibly new) path.

    Non-``.avi`` paths are returned unchanged. On any conversion error the
    original path is returned so the caller can still attempt processing.

    Args:
        video_path: filesystem path to the input video.

    Returns:
        Path to the ``.mp4`` copy on success, otherwise the original path.
    """
    if not video_path.lower().endswith('.avi'):
        return video_path
    output_path = video_path.rsplit('.', 1)[0] + '.mp4'
    cap = None
    out = None
    try:
        cap = cv2.VideoCapture(video_path)
        # BUG FIX: preserve the source frame rate instead of hard-coding
        # 30 fps, which altered playback speed for non-30fps sources.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            fps = 30.0  # sensible fallback when the container reports none
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame)
        return output_path
    except Exception as e:
        print(f"Error converting video: {e}")
        return video_path
    finally:
        # BUG FIX: release handles even when conversion fails part-way
        # through (the original leaked both on any exception).
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()
def process_video(video_path):
    """Estimate respiration rate from chest-region intensity changes.

    Pipeline: detect the face in the first frames to derive a chest ROI,
    sum the ROI's pixel intensities per frame, band-pass filter that
    signal in the breathing band (0.16-0.5 Hz), and count peaks.

    Args:
        video_path: path to a video readable by OpenCV/FFmpeg.

    Returns:
        dict with keys ``fps``, ``frames``, ``respiration_times``,
        ``duration``, ``respiration_rate`` and ``plot`` (PNG bytes).

    Raises:
        ValueError: if the video cannot be opened, no face is detected,
            or the clip is too short to filter.
    """
    cap = cv2.VideoCapture(video_path, cv2.CAP_FFMPEG)
    if not cap.isOpened():
        raise ValueError("Error opening video file. Please check the format or path.")
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps or fps <= 0:
        fps = 30.0  # Default FPS when the container reports none
    num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if num_frames <= 0:
        num_frames = 1  # Avoid division by zero for odd containers

    # --- Locate the chest ROI from the first frame containing a face ---
    points = None
    with mp_face_detection.FaceDetection(model_selection=1,
                                         min_detection_confidence=0.5) as face_detection:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = face_detection.process(rgb_frame)
            if results.detections:
                bboxC = results.detections[0].location_data.relative_bounding_box
                ih, iw, _ = frame.shape
                x1 = int(bboxC.xmin * iw)
                y1 = int(bboxC.ymin * ih)
                w = int(bboxC.width * iw)
                h = int(bboxC.height * ih)
                # Chest ROI: widen the face box by 75% on each side and
                # drop below the chin by 1.3 face-heights.
                x3 = math.ceil(x1 - (0.75 * w))
                x4 = math.ceil(x1 + w + (0.75 * w))
                y3 = math.ceil(y1 + h + (0.3 * h))
                y4 = math.ceil(y3 + h)
                # BUG FIX: clamp to the frame — negative indices would
                # wrap around and crop the wrong region.
                x3 = max(x3, 0)
                y3 = max(y3, 0)
                x4 = min(x4, iw)
                y4 = min(y4, ih)
                points = (x3, y3, x4 - x3, y4 - y3)
                # BUG FIX: stop scanning once a face is found. The
                # original only broke the inner for-loop, so detection
                # consumed every frame and the intensity pass below saw
                # an empty stream.
                break
    if points is None:
        cap.release()
        raise ValueError("No face detected in the video.")

    # BUG FIX: rewind — the detection loop consumed frames, so the
    # intensity pass must restart from frame 0 to cover the whole clip.
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

    # --- Per-frame summed intensity of the chest ROI ---
    val_list = []
    x, y, w, h = points
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        cropped_frame = gray[y:y + h, x:x + w]
        # Bottom-right cell of the integral image == sum of all pixels.
        val_list.append(cv2.integral(cropped_frame)[-1][-1])
    cap.release()

    # --- Band-pass filter in the breathing band (0.16-0.5 Hz) ---
    lowcut = 0.16
    highcut = 0.5
    fs = fps  # BUG FIX: use the real frame rate, not a hard-coded 30
    nyq = 0.5 * fs
    order = 5
    b, a = signal.butter(order, [lowcut / nyq, highcut / nyq], btype='band')
    # filtfilt requires strictly more samples than its default pad length
    # (3 * max(len(a), len(b))); fail with a clear message instead of a
    # cryptic scipy error on very short clips.
    padlen = 3 * max(len(a), len(b))
    if len(val_list) <= padlen:
        raise ValueError("Video too short for respiration analysis.")
    filtered = signal.filtfilt(b, a, val_list)

    # --- Peaks -> breaths per minute ---
    peaks, _ = find_peaks(filtered)
    time_length = num_frames / fps
    respiration_times = peaks / fps
    rr = math.ceil((len(peaks) / time_length) * 60)

    # --- Plot the filtered signal with detected peaks ---
    plt.figure(figsize=(10, 6))
    plt.plot(filtered, color='orange', label='Filtered Signal')
    plt.plot(peaks, filtered[peaks], "x", color='red', label='Peaks')
    plt.title('Respiratory Signal Analysis', color='darkorange')
    plt.xlabel('Frames', color='darkorange')
    plt.ylabel('Intensity', color='darkorange')
    plt.grid(True, color='lightcoral', alpha=0.3)
    plt.legend()
    plot_output = io.BytesIO()
    plt.savefig(plot_output, format='png', facecolor='#001f3f', edgecolor='none')
    plt.close()
    plot_output.seek(0)

    return {
        "fps": f"{fps:.2f}",
        "frames": str(num_frames),
        "respiration_times": respiration_times.tolist(),
        "duration": f"{time_length:.2f} seconds",
        "respiration_rate": f"{rr} breaths/minute",
        "plot": plot_output.getvalue()
    }
def process_input(video_path):
    """Validate the uploaded video and run the respiration analysis.

    Returns a 6-element list matching the Gradio outputs: fps, frame
    count, respiration times, duration, respiration rate, and the plot
    image. On any failure the first five slots hold the error message
    and the plot slot is None.
    """
    try:
        if video_path is None:
            return ["Please upload a video file."] * 5 + [None]
        if not os.path.exists(video_path):
            return ["Video file not found."] * 5 + [None]
        # Normalize the container first, then analyze.
        metrics = process_video(convert_video_if_needed(video_path))
        ordered_keys = (
            "fps",
            "frames",
            "respiration_times",
            "duration",
            "respiration_rate",
            "plot",
        )
        return [metrics[key] for key in ordered_keys]
    except Exception as exc:
        message = f"Error processing video: {str(exc)}"
        return [message] * 5 + [None]
# Create Gradio interface
# Wires process_input() to one video input and six outputs (five scalar
# panels plus the filtered-peaks plot image), then starts the app server.
interface = gr.Interface(
    fn=process_input,
    inputs=gr.Video(
        label="Upload or Stream Video",
        # NOTE(review): current Gradio versions expect `format` to be a
        # single string (e.g. "mp4"), not a list — confirm against the
        # installed gradio version.
        format=["avi", "mp4", "webm", "mov"]
    ),
    outputs=[
        gr.Label(label="Frames per second"),
        gr.Label(label="Total number of frames"),
        gr.JSON(label="Respiration Times"),
        gr.Label(label="Time Length"),
        gr.Label(label="Respiration Rate (RR)"),
        gr.Image(label="Filtered Peaks Plot")
    ],
    title="Respiration Rate Analyzer",
    description="🫁 Analyze your respiration rate with ease!",
    theme="default",
    # Dark navy theme to match the plot's facecolor (#001f3f).
    css="""
    body {
        background-color: #001f3f;
        color: #f0f8ff;
    }
    .output {
        font-size: 16px;
        color: #f0f8ff;
    }
    .label {
        font-weight: bold;
        color: #0074D9;
    }
    .plot {
        border: 2px solid #0074D9;
        border-radius: 10px;
    }
    .gradio-container {
        background-color: #002b57;
    }
    .output-markdown p {
        color: #f0f8ff !important;
    }
    """
)
interface.launch()