IFMedTechdemo commited on
Commit
b2dc4cd
·
verified ·
1 Parent(s): 3c1a91c

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +79 -108
app.py CHANGED
@@ -5,50 +5,33 @@ from collections import deque
5
  import time
6
  import matplotlib.pyplot as plt
7
  from scipy import signal
8
- from datetime import datetime
9
 
10
- # Length of signal history to display (in samples)
11
  MAX_LEN = 500
12
- FPS = 30 # Approximate frames per second
13
- WINDOW_SIZE = 5 # seconds of data for HR estimation
14
 
15
- # Deques to store PPG samples and timestamps
16
  red_signal = deque(maxlen=MAX_LEN)
17
  green_signal = deque(maxlen=MAX_LEN)
18
  timestamps = deque(maxlen=MAX_LEN)
19
 
20
  def estimate_heart_rate(signal_data, fps=30, window_secs=5):
21
- """
22
- Estimate heart rate from PPG signal using FFT.
23
- Returns BPM (beats per minute)
24
- """
25
  if len(signal_data) < fps * window_secs:
26
  return None, None
27
-
28
- # Normalize the signal
29
  signal_array = np.array(list(signal_data)[-fps*window_secs:])
30
  if signal_array.std() == 0:
31
  return None, None
32
-
33
  signal_norm = (signal_array - signal_array.mean()) / signal_array.std()
34
-
35
- # Apply bandpass filter (40-200 BPM = 0.67-3.33 Hz)
36
  nyquist = fps / 2
37
  low = 0.67 / nyquist
38
  high = 3.33 / nyquist
39
-
40
  if low > 0 and high < 1:
41
  b, a = signal.butter(4, [low, high], btype='band')
42
  signal_filt = signal.filtfilt(b, a, signal_norm)
43
  else:
44
  signal_filt = signal_norm
45
-
46
- # FFT to find dominant frequency
47
  fft = np.fft.fft(signal_filt)
48
  freqs = np.fft.fftfreq(len(fft), d=1/fps)
49
  power = np.abs(fft) ** 2
50
-
51
- # Find peak in valid HR range (40-200 BPM)
52
  valid_range = (freqs >= 0.67) & (freqs <= 3.33)
53
  if valid_range.sum() > 0:
54
  peak_idx = np.argmax(power[valid_range])
@@ -58,150 +41,131 @@ def estimate_heart_rate(signal_data, fps=30, window_secs=5):
58
  else:
59
  bpm = None
60
  confidence = 0
61
-
62
  return bpm, confidence
63
 
64
  def calculate_signal_quality(signal_data):
65
- """
66
- Calculate signal quality based on variance and SNR
67
- Returns quality score 0-100
68
- """
69
  if len(signal_data) < 10:
70
  return 0
71
-
72
  data = np.array(list(signal_data))
73
  std = data.std()
74
  mean = data.mean()
75
-
76
- # Normalize standard deviation by mean
77
  if mean > 0:
78
- cv = (std / mean) * 100 # Coefficient of variation
79
- # Quality score: higher CV is better for PPG, but too high is noise
80
  quality = min(100, max(0, 100 - abs(cv - 10)))
81
  else:
82
  quality = 0
83
-
84
  return quality
85
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86
  def process_frame(frame, state):
87
- """
88
- frame: numpy array (BGR or RGB, Gradio passes RGB by default)
89
- state: dict holding rolling signals and session info
90
- """
91
  if frame is None:
92
  return None, state["red"], state["green"], state["time"]
93
-
94
  img = frame.copy()
95
  h, w, _ = img.shape
96
-
97
- # Define ROI: square with side = 1/5 of frame height, centered
98
  box_size = h // 5
99
  cx, cy = w // 2, h // 2
100
  x1 = cx - box_size // 2
101
  y1 = cy - box_size // 2
102
  x2 = cx + box_size // 2
103
  y2 = cy + box_size // 2
104
-
105
- # Clamp to frame bounds
106
- x1 = max(0, x1); y1 = max(0, y1)
107
- x2 = min(w, x2); y2 = min(h, y2)
108
-
109
- # Draw green bounding box
110
  cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
111
-
112
- # Extract ROI
113
  roi = img[y1:y2, x1:x2, :]
114
-
115
  if roi.size > 0:
116
- # Compute mean intensity for R and G channels (assuming RGB)
117
  roi_float = roi.astype(np.float32)
118
  red_mean = float(np.mean(roi_float[..., 0]))
119
  green_mean = float(np.mean(roi_float[..., 1]))
120
  else:
121
  red_mean = 0.0
122
  green_mean = 0.0
123
-
124
  t = time.time()
125
-
126
- # Update deques
127
  state["red"].append(red_mean)
128
  state["green"].append(green_mean)
129
  state["time"].append(t)
130
  state["frame_count"] += 1
131
-
132
- # Convert to list for plotting
133
  red_list = list(state["red"])
134
  green_list = list(state["green"])
135
  time_list = list(state["time"])
136
-
137
- # Normalize time to start at 0
138
  if len(time_list) > 0:
139
  t0 = time_list[0]
140
  time_list = [ti - t0 for ti in time_list]
141
-
142
- # Update session duration
143
  state["session_duration"] = t - state["session_start"]
144
-
145
  return img, red_list, green_list, time_list
146
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
147
  def snap(frame, state):
148
- """
149
- Wrapper for Gradio streaming.
150
- Gradio passes latest frame, we update state and return:
151
- - video frame with bounding box
152
- - matplotlib figure with both signals and stats
153
- """
154
  img, red_list, green_list, time_list = process_frame(frame, state)
155
-
156
- # Estimate heart rate from red signal (more reliable)
157
  bpm, confidence = estimate_heart_rate(state["red"], fps=FPS, window_secs=WINDOW_SIZE)
158
-
159
- # Calculate signal quality
160
  red_quality = calculate_signal_quality(state["red"])
161
  green_quality = calculate_signal_quality(state["green"])
162
-
163
- # Create matplotlib figure with stats
164
- fig = plt.figure(figsize=(10, 6))
165
- gs = fig.add_gridspec(2, 2, hspace=0.3, wspace=0.3)
166
-
167
- # Plot 1: PPG signals
168
- ax1 = fig.add_subplot(gs[0, :])
169
- ax1.plot(time_list, red_list, label='Red PPG', color='red', linewidth=1.5)
170
- ax1.plot(time_list, green_list, label='Green PPG', color='green', linewidth=1.5)
171
- ax1.set_xlabel('Time (s)')
172
- ax1.set_ylabel('Intensity')
173
- ax1.set_title('PPG Signals (Red & Green)')
174
- ax1.legend(loc='upper right')
175
- ax1.grid(True, alpha=0.3)
176
-
177
- # Plot 2: HR estimation
178
- ax2 = fig.add_subplot(gs[1, 0])
179
- if bpm is not None:
180
- ax2.text(0.5, 0.7, f'{bpm:.1f}', ha='center', va='top', fontsize=48, fontweight='bold', color='darkred', transform=ax2.transAxes)
181
- ax2.text(0.5, 0.3, 'BPM', ha='center', va='top', fontsize=16, fontweight='bold', color='darkred', transform=ax2.transAxes)
182
- ax2.text(0.5, 0.1, f'Conf: {confidence*100:.0f}%', ha='center', va='top', fontsize=10, color='gray', transform=ax2.transAxes)
183
- else:
184
- ax2.text(0.5, 0.5, 'Estimating...', ha='center', va='center', fontsize=14, color='gray', transform=ax2.transAxes)
185
- ax2.axis('off')
186
- ax2.set_title('Heart Rate')
187
-
188
- # Plot 3: Signal quality and session info
189
- ax3 = fig.add_subplot(gs[1, 1])
190
- stats_text = f'Signal Quality:\n Red: {red_quality:.0f}%\n Green: {green_quality:.0f}%\n\nSession Info:\n Frames: {state["frame_count"]}\n Duration: {state["session_duration"]:.1f}s\n FPS: {state["frame_count"]/max(state["session_duration"], 0.1):.1f}\n Samples: {len(state["red"])}/{MAX_LEN}'
191
- ax3.text(0.1, 0.5, stats_text, ha='left', va='center', fontsize=10, family='monospace', transform=ax3.transAxes)
192
- ax3.axis('off')
193
- ax3.set_title('Monitoring Stats')
194
-
195
- plt.close(fig)
196
-
197
- return img, fig, state
198
 
199
- # Create Gradio interface with continuous streaming
200
  with gr.Blocks(title="Finger Vital Sign Monitor") as demo:
201
  gr.Markdown("# Real-time Finger Vital Sign Monitoring")
202
  gr.Markdown("Place your finger on the camera and keep it steady. The system will continuously monitor PPG signals and estimate heart rate.")
203
 
204
- # Stateful object to store signals and session info
205
  state = gr.State({
206
  "red": red_signal,
207
  "green": green_signal,
@@ -217,13 +181,20 @@ with gr.Blocks(title="Finger Vital Sign Monitor") as demo:
217
  streaming=True,
218
  label="Webcam with ROI",
219
  )
220
- plot = gr.Plot(label="PPG Analysis")
 
 
 
 
 
 
 
 
221
 
222
- # When webcam streams, call snap continuously
223
  cam.stream(
224
  fn=snap,
225
  inputs=[cam, state],
226
- outputs=[cam, plot, state],
227
  )
228
 
229
  if __name__ == "__main__":
 
5
  import time
6
  import matplotlib.pyplot as plt
7
  from scipy import signal
 
8
 
 
9
  MAX_LEN = 500
10
+ FPS = 30
11
+ WINDOW_SIZE = 5
12
 
 
13
  red_signal = deque(maxlen=MAX_LEN)
14
  green_signal = deque(maxlen=MAX_LEN)
15
  timestamps = deque(maxlen=MAX_LEN)
16
 
17
  def estimate_heart_rate(signal_data, fps=30, window_secs=5):
 
 
 
 
18
  if len(signal_data) < fps * window_secs:
19
  return None, None
 
 
20
  signal_array = np.array(list(signal_data)[-fps*window_secs:])
21
  if signal_array.std() == 0:
22
  return None, None
 
23
  signal_norm = (signal_array - signal_array.mean()) / signal_array.std()
 
 
24
  nyquist = fps / 2
25
  low = 0.67 / nyquist
26
  high = 3.33 / nyquist
 
27
  if low > 0 and high < 1:
28
  b, a = signal.butter(4, [low, high], btype='band')
29
  signal_filt = signal.filtfilt(b, a, signal_norm)
30
  else:
31
  signal_filt = signal_norm
 
 
32
  fft = np.fft.fft(signal_filt)
33
  freqs = np.fft.fftfreq(len(fft), d=1/fps)
34
  power = np.abs(fft) ** 2
 
 
35
  valid_range = (freqs >= 0.67) & (freqs <= 3.33)
36
  if valid_range.sum() > 0:
37
  peak_idx = np.argmax(power[valid_range])
 
41
  else:
42
  bpm = None
43
  confidence = 0
 
44
  return bpm, confidence
45
 
46
  def calculate_signal_quality(signal_data):
 
 
 
 
47
  if len(signal_data) < 10:
48
  return 0
 
49
  data = np.array(list(signal_data))
50
  std = data.std()
51
  mean = data.mean()
 
 
52
  if mean > 0:
53
+ cv = (std / mean) * 100
 
54
  quality = min(100, max(0, 100 - abs(cv - 10)))
55
  else:
56
  quality = 0
 
57
  return quality
58
 
59
+ def find_peaks_simple(signal_data, fps=30, min_distance=0.4):
60
+ if len(signal_data) < int(fps * 1.0):
61
+ return []
62
+ data = np.array(signal_data)
63
+ data_norm = (data - data.mean()) / (data.std() + 1e-9)
64
+ min_dist = int(min_distance * fps)
65
+ peaks = []
66
+ for i in range(min_dist, len(data_norm) - min_dist):
67
+ if data_norm[i] > data_norm[i-1] and data_norm[i] > data_norm[i+1]:
68
+ if not peaks or i - peaks[-1] >= min_dist:
69
+ peaks.append(i)
70
+ return peaks
71
+
72
  def process_frame(frame, state):
 
 
 
 
73
  if frame is None:
74
  return None, state["red"], state["green"], state["time"]
 
75
  img = frame.copy()
76
  h, w, _ = img.shape
 
 
77
  box_size = h // 5
78
  cx, cy = w // 2, h // 2
79
  x1 = cx - box_size // 2
80
  y1 = cy - box_size // 2
81
  x2 = cx + box_size // 2
82
  y2 = cy + box_size // 2
83
+ x1 = max(0, x1)
84
+ y1 = max(0, y1)
85
+ x2 = min(w, x2)
86
+ y2 = min(h, y2)
 
 
87
  cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
 
 
88
  roi = img[y1:y2, x1:x2, :]
 
89
  if roi.size > 0:
 
90
  roi_float = roi.astype(np.float32)
91
  red_mean = float(np.mean(roi_float[..., 0]))
92
  green_mean = float(np.mean(roi_float[..., 1]))
93
  else:
94
  red_mean = 0.0
95
  green_mean = 0.0
 
96
  t = time.time()
 
 
97
  state["red"].append(red_mean)
98
  state["green"].append(green_mean)
99
  state["time"].append(t)
100
  state["frame_count"] += 1
 
 
101
  red_list = list(state["red"])
102
  green_list = list(state["green"])
103
  time_list = list(state["time"])
 
 
104
  if len(time_list) > 0:
105
  t0 = time_list[0]
106
  time_list = [ti - t0 for ti in time_list]
 
 
107
  state["session_duration"] = t - state["session_start"]
 
108
  return img, red_list, green_list, time_list
109
 
110
+ def make_plots(time_list, red_list, green_list):
111
+ """2-subplot layout like app_old.py"""
112
+ if len(time_list) < 10:
113
+ return None
114
+ time_array = np.array(time_list)
115
+ red_array = np.array(red_list)
116
+ green_array = np.array(green_list)
117
+ red_norm = (red_array - red_array.mean()) / (red_array.std() + 1e-9)
118
+ green_norm = (green_array - green_array.mean()) / (green_array.std() + 1e-9)
119
+ nyquist = FPS / 2
120
+ low = 0.67 / nyquist
121
+ high = 3.33 / nyquist
122
+ if low > 0 and high < 1:
123
+ b, a = signal.butter(4, [low, high], btype='band')
124
+ red_filt = signal.filtfilt(b, a, red_norm)
125
+ green_filt = signal.filtfilt(b, a, green_norm)
126
+ else:
127
+ red_filt = red_norm
128
+ green_filt = green_norm
129
+ hr_bpm, _ = estimate_heart_rate(green_array, fps=FPS, window_secs=WINDOW_SIZE)
130
+ peaks_idx = find_peaks_simple(green_filt, fps=FPS)
131
+ fig = plt.figure(figsize=(12, 4))
132
+ ax1 = fig.add_subplot(1, 2, 1)
133
+ ax2 = fig.add_subplot(1, 2, 2)
134
+ ax1.plot(time_array, red_filt, label='Red (filtered)', color='red', linewidth=1.5)
135
+ ax1.plot(time_array, green_filt, label='Green (filtered)', color='green', linewidth=1.5)
136
+ ax1.set_xlabel('Time (s)')
137
+ ax1.set_ylabel('Normalized Intensity')
138
+ ax1.set_title('Filtered PPG Signals (Last 5s)')
139
+ ax1.legend(loc='upper right')
140
+ ax1.grid(True, alpha=0.3)
141
+ ax2.plot(time_array, green_filt, label='Green (filtered)', color='green', linewidth=1.5)
142
+ if len(peaks_idx) > 0:
143
+ ax2.scatter(time_array[peaks_idx], green_filt[peaks_idx], color='red', s=50, label='Peaks', zorder=5)
144
+ hr_title = f'Peak Detection (HR ≈ {hr_bpm:.1f} bpm)' if hr_bpm is not None else 'Peak Detection'
145
+ ax2.set_xlabel('Time (s)')
146
+ ax2.set_ylabel('Normalized Intensity')
147
+ ax2.set_title(hr_title)
148
+ ax2.legend(loc='upper right')
149
+ ax2.grid(True, alpha=0.3)
150
+ fig.tight_layout()
151
+ return fig
152
+
153
  def snap(frame, state):
 
 
 
 
 
 
154
  img, red_list, green_list, time_list = process_frame(frame, state)
 
 
155
  bpm, confidence = estimate_heart_rate(state["red"], fps=FPS, window_secs=WINDOW_SIZE)
 
 
156
  red_quality = calculate_signal_quality(state["red"])
157
  green_quality = calculate_signal_quality(state["green"])
158
+ plot_fig = make_plots(time_list, red_list, green_list)
159
+ hr_text = f'{bpm:.1f} bpm' if bpm is not None else 'Measuring...'
160
+ red_quality_text = f'Red: {red_quality:.0f}%'
161
+ green_quality_text = f'Green: {green_quality:.0f}%'
162
+ stats_text = f'Frames: {state["frame_count"]} | Duration: {state["session_duration"]:.1f}s | Samples: {len(state["red"])}/{MAX_LEN}'
163
+ return img, plot_fig, hr_text, red_quality_text, green_quality_text, stats_text, state
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
164
 
 
165
  with gr.Blocks(title="Finger Vital Sign Monitor") as demo:
166
  gr.Markdown("# Real-time Finger Vital Sign Monitoring")
167
  gr.Markdown("Place your finger on the camera and keep it steady. The system will continuously monitor PPG signals and estimate heart rate.")
168
 
 
169
  state = gr.State({
170
  "red": red_signal,
171
  "green": green_signal,
 
181
  streaming=True,
182
  label="Webcam with ROI",
183
  )
184
+ plot = gr.Plot(label="PPG Analysis - Signals & Peaks")
185
+
186
+ with gr.Row():
187
+ hr_output = gr.Textbox(label="Heart Rate (BPM)", value="Measuring...", interactive=False)
188
+ red_quality_output = gr.Textbox(label="Red Channel Quality", value="Measuring...", interactive=False)
189
+ green_quality_output = gr.Textbox(label="Green Channel Quality", value="Measuring...", interactive=False)
190
+
191
+ with gr.Row():
192
+ stats_output = gr.Textbox(label="Session Statistics", value="Waiting for data...", interactive=False)
193
 
 
194
  cam.stream(
195
  fn=snap,
196
  inputs=[cam, state],
197
+ outputs=[cam, plot, hr_output, red_quality_output, green_quality_output, stats_output, state],
198
  )
199
 
200
  if __name__ == "__main__":