#!/usr/bin/env python3
"""
rPPG Heart Rate Estimation using OpenCV and POS algorithm
"""
import os
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
import gradio as gr
import cv2
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
from scipy.fft import fft, fftfreq
import tempfile
import time
from tqdm import tqdm
class SimpleRPPG:
def __init__(self, min_bpm=45, max_bpm=180):
self.min_bpm = min_bpm
self.max_bpm = max_bpm
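        # The frontal-face Haar cascade ships with opencv-python under cv2.data.haarcascades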
self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
def detect_faces(self, frame):
"""Detect faces using OpenCV Haar cascades"""
gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
# Try multiple parameter sets for better detection
param_sets = [
{"scaleFactor": 1.1, "minNeighbors": 5, "minSize": (50, 50)},
{"scaleFactor": 1.05, "minNeighbors": 3, "minSize": (30, 30)},
{"scaleFactor": 1.2, "minNeighbors": 6, "minSize": (60, 60)},
]
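        # Return the results from the first parameter set that finds anything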
for params in param_sets:
faces = self.face_cascade.detectMultiScale(gray, **params)
if len(faces) > 0:
return faces
return []
def extract_roi_signal(self, frame, face_box):
"""Extract ROI and compute mean RGB values"""
x, y, w, h = face_box
        # Crop the central face region (roughly forehead and cheeks): keep
        # 20-70% of the box vertically and 15-85% horizontally to avoid
        # hair, eyes, and background pixels
roi_y1 = y + int(0.2 * h)
roi_y2 = y + int(0.7 * h)
roi_x1 = x + int(0.15 * w)
roi_x2 = x + int(0.85 * w)
roi = frame[roi_y1:roi_y2, roi_x1:roi_x2]
if roi.size == 0:
return None
# Calculate mean RGB values
mean_rgb = np.mean(roi, axis=(0, 1))
return mean_rgb
def pos_algorithm(self, rgb_signals, fps):
"""POS (Plane-Orthogonal-to-Skin) algorithm"""
        if len(rgb_signals) < int(fps):  # need at least ~1 second of data
            return None, None
rgb_signals = np.array(rgb_signals)
        # Temporally normalize each channel by its mean (guard against division by zero)
        mean_rgb = np.mean(rgb_signals, axis=0)
        normalized_rgb = rgb_signals / (mean_rgb + 1e-9)
# POS algorithm
X1 = normalized_rgb[:, 0] - normalized_rgb[:, 1] # R - G
X2 = normalized_rgb[:, 0] + normalized_rgb[:, 1] - 2 * normalized_rgb[:, 2] # R + G - 2B
        # Band-pass filter both projections to the plausible heart-rate band
        low_freq = self.min_bpm / 60.0
        high_freq = min(self.max_bpm / 60.0, 0.45 * fps)  # stay below Nyquist
        if high_freq <= low_freq:
            return None, None
        sos = signal.butter(4, [low_freq, high_freq], btype='band', fs=fps, output='sos')
        X1_filtered = signal.sosfilt(sos, X1)
        X2_filtered = signal.sosfilt(sos, X2)
        # POS combination: alpha balances the two projections by their spread
        alpha = np.std(X1_filtered) / (np.std(X2_filtered) + 1e-9)
        pulse_signal = X1_filtered - alpha * X2_filtered
return pulse_signal, self.estimate_heart_rate(pulse_signal, fps)
def estimate_heart_rate(self, pulse_signal, fps):
"""Estimate heart rate using FFT"""
if len(pulse_signal) < fps: # Need at least 1 second
return None
        # Apply a Hann window to reduce spectral leakage
windowed_signal = pulse_signal * signal.windows.hann(len(pulse_signal))
# FFT
freqs = fftfreq(len(windowed_signal), 1/fps)
fft_values = np.abs(fft(windowed_signal))
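        # Frequency resolution is fps / N: e.g. a 10 s window at 30 fps gives
        # 30 / 300 = 0.1 Hz bins, i.e. roughly 6 BPM granularity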
# Find frequency range corresponding to heart rate
min_freq = self.min_bpm / 60.0
max_freq = self.max_bpm / 60.0
valid_indices = (freqs >= min_freq) & (freqs <= max_freq)
if not np.any(valid_indices):
return None
valid_freqs = freqs[valid_indices]
valid_fft = fft_values[valid_indices]
# Find peak frequency
peak_idx = np.argmax(valid_fft)
peak_freq = valid_freqs[peak_idx]
heart_rate = peak_freq * 60.0
        # Heuristic confidence: ratio of the spectral peak to the band's mean
        # magnitude, scaled by an ad-hoc factor of 10 and clipped to [0, 1]
        confidence = np.max(valid_fft) / np.mean(valid_fft)
        confidence = min(confidence / 10.0, 1.0)
return {"hr": heart_rate, "confidence": confidence}
def process_video(self, video_path, window_seconds=10.0, step_seconds=2.0, conf_threshold=0.3, progress_callback=None):
"""Process video and extract heart rate"""
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if fps <= 0 or total_frames <= 0:
            cap.release()
            return [], [], []
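        # Sliding-window analysis: each window spans window_seconds and
        # consecutive windows start step_seconds apart, so they overlap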
window_frames = int(window_seconds * fps)
step_frames = int(step_seconds * fps)
results_time = []
results_hr = []
results_conf = []
# Console progress bar
pbar = tqdm(total=total_frames, desc="Processing video", unit="frames")
        # Quick scan of the first ~10 seconds to confirm a face is present
        if progress_callback:
            progress_callback(0.1, "🔍 Detecting face...")
face_found = False
        for i in range(0, min(300, total_frames), 30):  # sample every 30th frame of the first 300 (~10 s at 30 fps)
cap.set(cv2.CAP_PROP_POS_FRAMES, i)
ret, frame = cap.read()
if ret:
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
faces = self.detect_faces(rgb_frame)
if len(faces) > 0:
face_found = True
if progress_callback:
progress_callback(0.15, f"✅ 在第 {i} 幀 ({i/fps:.1f}秒) 檢測到人臉!")
break
if not face_found:
if progress_callback:
progress_callback(0.15, "⚠️ 未檢測到人臉,繼續處理...")
# Reset to beginning and process in chunks
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
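        # One chunk per window start position; the last window must fit fully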
estimated_chunks = max(1, (total_frames - window_frames) // step_frames + 1)
pbar.reset(total=estimated_chunks)
pbar.set_description("Processing chunks")
processed_chunks = 0
        # Slide the analysis window across the video in steps of step_frames
for chunk_start in range(0, total_frames - window_frames + 1, step_frames):
            chunk_signals = []  # per-frame mean RGB values for this window
# Read a batch of frames for this chunk
cap.set(cv2.CAP_PROP_POS_FRAMES, chunk_start)
batch_frames = []
# Read all frames for this window at once
for i in range(window_frames):
ret, frame = cap.read()
if not ret:
break
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
batch_frames.append(rgb_frame)
# Detect face only in the first frame of the batch
if len(batch_frames) > 0:
faces = self.detect_faces(batch_frames[0])
if len(faces) > 0:
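                    # Use the largest detected face (by bounding-box area)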
current_face_box = max(faces, key=lambda x: x[2] * x[3])
# Extract signals from all frames using the same face box
for rgb_frame in batch_frames:
rgb_signal = self.extract_roi_signal(rgb_frame, current_face_box)
if rgb_signal is not None:
                            chunk_signals.append(rgb_signal)
            # Process this chunk if we have at least ~1 second of samples
            if len(chunk_signals) >= fps:
                pulse_signal, hr_result = self.pos_algorithm(chunk_signals, fps)
if hr_result is not None and hr_result["hr"] > 0 and hr_result["confidence"] >= conf_threshold:
t_sec = (chunk_start + window_frames // 2) / fps # Center time of window
results_time.append(t_sec)
results_hr.append(hr_result["hr"])
results_conf.append(hr_result["confidence"])
print(f"✅ Chunk {processed_chunks + 1}: HR = {hr_result['hr']:.1f} BPM at {t_sec:.1f}s")
processed_chunks += 1
pbar.update(1)
            # Update Gradio progress
            if progress_callback:
                progress_val = 0.15 + (processed_chunks / estimated_chunks) * 0.7
                if len(results_hr) > 0:
                    progress_callback(progress_val, f"💓 Found {len(results_hr)} heart rate measurements")
                else:
                    progress_callback(progress_val, f"Processing chunk {processed_chunks}/{estimated_chunks}...")
# Early termination if we have enough successful measurements
if len(results_hr) >= 10: # Stop if we have 10 good measurements
print(f"✅ Early termination: Found {len(results_hr)} measurements")
break
cap.release()
pbar.close() # Close console progress bar
        if progress_callback:
            progress_callback(1.0, f"Done! Found {len(results_hr)} heart rate measurements")
return results_time, results_hr, results_conf
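# A minimal programmatic usage sketch (hypothetical file path), bypassing the UI:
#   rppg = SimpleRPPG(min_bpm=45, max_bpm=180)
#   ts, hr, cf = rppg.process_video("face_video.mp4", window_seconds=10.0, step_seconds=2.0)
#   for t, bpm in zip(ts, hr):
#       print(f"{t:.1f}s -> {bpm:.1f} BPM")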
def quick_face_check(video_path, progress=None):
"""Quick face detection check"""
    if not video_path:
        return "Please upload a video file first"
    cap = cv2.VideoCapture(video_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    if fps <= 0 or total_frames <= 0:
        cap.release()
        return "Could not read the video file"
    if progress:
        progress(0.1, "🎬 Checking video...")
    # Load the OpenCV face detector
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
# Console progress bar for face detection
face_pbar = tqdm(total=total_frames//15, desc="Face detection", unit="frames")
face_detected = False
face_found_at_frame = None
    for i in range(0, total_frames, 15):  # check every 15th frame
cap.set(cv2.CAP_PROP_POS_FRAMES, i)
ret, frame = cap.read()
if ret:
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # Try multiple parameter sets
param_sets = [
{"scaleFactor": 1.1, "minNeighbors": 5, "minSize": (30, 30)},
{"scaleFactor": 1.05, "minNeighbors": 3, "minSize": (20, 20)},
{"scaleFactor": 1.2, "minNeighbors": 6, "minSize": (40, 40)},
]
faces_found = False
for params in param_sets:
faces = face_cascade.detectMultiScale(gray, **params)
if len(faces) > 0:
faces_found = True
face_detected = True
face_found_at_frame = i
time_stamp = i / fps
if progress:
progress(0.8, f"✅ 在第 {i} 幀 ({time_stamp:.1f}秒) 檢測到 {len(faces)} 個人臉!")
break
if faces_found:
break
face_pbar.update(1) # Update console progress bar
        # Report detection progress periodically
        if progress and i % 150 == 0:
            detection_progress = 0.1 + min((i / total_frames) * 0.7, 0.7)
            current_time = i / fps
            progress(detection_progress, f"🔍 Detecting faces... checked up to {current_time:.1f}s")
cap.release()
face_pbar.close() # Close console progress bar
    if face_detected:
        success_msg = f"✅ Success! Face detected at frame {face_found_at_frame} ({face_found_at_frame/fps:.1f}s)"
        if progress:
            progress(1.0, success_msg)
        return success_msg + "\n\n💡 This video is suitable for heart rate analysis!"
    else:
        fail_msg = "❌ No face detected anywhere in the video"
        if progress:
            progress(1.0, fail_msg)
        return fail_msg + "\n\n📋 Suggestions:\n• Make sure the video contains a clear, frontal face\n• Check that the lighting is adequate\n• Avoid excessive head movement"
def analyze_video(video_path, method, window, step, min_bpm, max_bpm, conf, progress=gr.Progress()):
    """Gradio handler: run the rPPG analysis and return summary, plot, and CSV"""
    if not video_path:
        return "Please upload a video file", None, None
    # Note: `method` is currently informational; only POS is implemented
    start_time = time.time()
    print(f"🚀 Processing video: {video_path}")
# Initialize rPPG processor
rppg = SimpleRPPG(min_bpm=min_bpm, max_bpm=max_bpm)
# Process video
ts, hr, cf = rppg.process_video(
video_path,
window_seconds=window,
step_seconds=step,
conf_threshold=conf,
progress_callback=progress
)
processing_time = time.time() - start_time
print(f"⏱️ 處理完成!耗時: {processing_time:.1f} 秒,找到 {len(hr)} 個心率測量值")
    if not hr:
        return f"No heart rate data detected. Processing time: {processing_time:.1f}s", None, None
# Create CSV
csv_content = "time_sec,hr_bpm,confidence\n"
for a, b, c in zip(ts, hr, cf):
csv_content += f"{a:.2f},{b:.2f},{c:.3f}\n"
# Create plot
plt.figure(figsize=(10, 4))
plt.plot(ts, hr, 'b-', linewidth=2)
plt.xlabel("Time (s)")
plt.ylabel("Heart Rate (bpm)")
plt.title(f"Heart Rate Estimation (Avg: {np.mean(hr):.1f} BPM)")
plt.grid(True)
plt.tight_layout()
    # Save plot to a temp file (delete=False so Gradio can serve it afterwards)
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
plt.savefig(tmp.name, dpi=150, bbox_inches='tight')
plot_path = tmp.name
plt.close()
# Save CSV to temp file
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as tmp:
tmp.write(csv_content)
csv_path = tmp.name
result_msg = f"✅ 成功分析!\n平均心率: {np.mean(hr):.1f} BPM\n測量點數: {len(hr)}\n處理時間: {processing_time:.1f} 秒"
return result_msg, plot_path, csv_path
# Gradio interface
with gr.Blocks(title="rPPG Heart Rate Analysis") as demo:
gr.Markdown("# rPPG Heart Rate Analysis")
gr.Markdown("Upload a video to estimate heart rate using computer vision.")
with gr.Tabs():
with gr.Tab("Heart Rate Analysis"):
with gr.Row():
with gr.Column():
video_input = gr.Video(label="Upload Video")
with gr.Row():
method_select = gr.Dropdown(
choices=["POS"],
value="POS",
label="Method"
)
conf_slider = gr.Slider(
minimum=0.0,
maximum=1.0,
value=0.3,
step=0.1,
label="Confidence Threshold"
)
with gr.Row():
window_slider = gr.Slider(
minimum=5.0,
maximum=30.0,
value=10.0,
step=1.0,
label="Window (sec)"
)
step_slider = gr.Slider(
minimum=0.5,
maximum=5.0,
value=2.0,
step=0.5,
label="Step (sec)"
)
with gr.Row():
min_bpm = gr.Slider(
minimum=30,
maximum=100,
value=45,
step=5,
label="Min BPM"
)
max_bpm = gr.Slider(
minimum=100,
maximum=200,
value=180,
step=5,
label="Max BPM"
)
process_btn = gr.Button("Process Video", variant="primary", size="lg")
with gr.Column():
result_text = gr.Textbox(label="Results", lines=4)
plot_output = gr.Image(label="Heart Rate Plot")
csv_output = gr.File(label="Download CSV Data")
with gr.Tab("Face Detection Test"):
with gr.Row():
with gr.Column():
test_video_input = gr.Video(label="Upload Video for Face Test")
check_btn = gr.Button("Test Face Detection", variant="secondary", size="lg")
with gr.Column():
check_result = gr.Textbox(label="Face Detection Results", lines=8)
# Connect functions
process_btn.click(
        fn=analyze_video,
inputs=[video_input, method_select, window_slider, step_slider, min_bpm, max_bpm, conf_slider],
outputs=[result_text, plot_output, csv_output],
show_progress=True
)
check_btn.click(
fn=quick_face_check,
inputs=[test_video_input],
outputs=[check_result],
show_progress=True
)
if __name__ == "__main__":
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False
)