NightPrince's picture
Update app.py
86e8714 verified
import cv2
import math
import numpy as np
from scipy import signal
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import io
import base64
import gradio as gr
import mediapipe as mp
import os
# Initialize MediaPipe Face Detection
mp_face_detection = mp.solutions.face_detection
def convert_video_if_needed(video_path):
"""Convert video to mp4 if it's in avi format"""
if video_path.lower().endswith('.avi'):
output_path = video_path.rsplit('.', 1)[0] + '.mp4'
try:
cap = cv2.VideoCapture(video_path)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, 30.0,
(int(cap.get(3)), int(cap.get(4))))
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
out.write(frame)
cap.release()
out.release()
return output_path
except Exception as e:
print(f"Error converting video: {e}")
return video_path
return video_path
def process_video(video_path):
"""Process video to calculate respiration rate"""
cap = cv2.VideoCapture(video_path, cv2.CAP_FFMPEG)
if not cap.isOpened():
raise ValueError("Error opening video file. Please check the format or path.")
points = (0, 0, 0, 0)
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps or fps == 0:
fps = 30.0 # Default FPS
num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if not num_frames or num_frames <= 0:
num_frames = 1 # Avoid division by zero
# Detect face for the first frame
with mp_face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.5) as face_detection:
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = face_detection.process(rgb_frame)
if results.detections:
for detection in results.detections:
bboxC = detection.location_data.relative_bounding_box
ih, iw, _ = frame.shape
x1, y1, w, h = int(bboxC.xmin * iw), int(bboxC.ymin * ih), \
int(bboxC.width * iw), int(bboxC.height * ih)
x3 = math.ceil(x1 - (0.75 * w))
x4 = math.ceil(x1 + w + (0.75 * w))
y3 = math.ceil(y1 + h + (0.3 * h))
y4 = math.ceil(y3 + h)
w = x4 - x3
h = y4 - y3
points = (x3, y3, w, h)
break
# Process frames to get intensity values
val_list = []
x, y, w, h = points
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
cropped_frame = frame[y:y + h, x:x + w]
int_frame = cv2.integral(cropped_frame)
val_list.append(int_frame[-1][-1])
cap.release()
# Apply Band-Pass-Filter
data = val_list
lowcut = 0.16
highcut = 0.5
fs = 30 # Sampling frequency
nyq = 0.5 * fs
low = lowcut / nyq
high = highcut / nyq
order = 5
b, a = signal.butter(order, [low, high], btype='band')
filtered = signal.filtfilt(b, a, data)
# Find peaks and calculate metrics
peaks, _ = find_peaks(filtered)
time_length = num_frames / fps if fps > 0 else 0.0
if time_length == 0.0:
raise ValueError("Unable to determine video duration.")
respiration_times = peaks / fps
rr = math.ceil((len(peaks) / time_length) * 60)
# Create plot
plt.figure(figsize=(10, 6))
plt.plot(filtered, color='orange', label='Filtered Signal')
plt.plot(peaks, filtered[peaks], "x", color='red', label='Peaks')
plt.title('Respiratory Signal Analysis', color='darkorange')
plt.xlabel('Frames', color='darkorange')
plt.ylabel('Intensity', color='darkorange')
plt.grid(True, color='lightcoral', alpha=0.3)
plt.legend()
# Save plot to BytesIO
plot_output = io.BytesIO()
plt.savefig(plot_output, format='png', facecolor='#001f3f', edgecolor='none')
plt.close()
plot_output.seek(0)
plot_img = plot_output.getvalue()
return {
"fps": f"{fps:.2f}",
"frames": str(num_frames),
"respiration_times": respiration_times.tolist(),
"duration": f"{time_length:.2f} seconds",
"respiration_rate": f"{rr} breaths/minute",
"plot": plot_img
}
def process_input(video_path):
"""Handle input video processing with error handling"""
try:
if video_path is None:
return ["Please upload a video file."] * 5 + [None]
if not os.path.exists(video_path):
return ["Video file not found."] * 5 + [None]
# Convert video if needed
video_path = convert_video_if_needed(video_path)
result = process_video(video_path)
return [
result["fps"],
result["frames"],
result["respiration_times"],
result["duration"],
result["respiration_rate"],
result["plot"]
]
except Exception as e:
error_msg = f"Error processing video: {str(e)}"
return [error_msg] * 5 + [None]
# Create Gradio interface
interface = gr.Interface(
fn=process_input,
inputs=gr.Video(
label="Upload or Stream Video",
format=["avi", "mp4", "webm", "mov"]
),
outputs=[
gr.Label(label="Frames per second"),
gr.Label(label="Total number of frames"),
gr.JSON(label="Respiration Times"),
gr.Label(label="Time Length"),
gr.Label(label="Respiration Rate (RR)"),
gr.Image(label="Filtered Peaks Plot")
],
title="Respiration Rate Analyzer",
description="🫁 Analyze your respiration rate with ease!",
theme="default",
css="""
body {
background-color: #001f3f;
color: #f0f8ff;
}
.output {
font-size: 16px;
color: #f0f8ff;
}
.label {
font-weight: bold;
color: #0074D9;
}
.plot {
border: 2px solid #0074D9;
border-radius: 10px;
}
.gradio-container {
background-color: #002b57;
}
.output-markdown p {
color: #f0f8ff !important;
}
"""
)
interface.launch()