seesaw112233 committed on
Commit
1e8a048
·
verified ·
1 Parent(s): 8d2db9a

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +241 -151
app.py CHANGED
@@ -4,12 +4,45 @@ import json
4
  import tempfile
5
  from dataclasses import dataclass
6
  from typing import Dict, List, Tuple, Optional
 
7
 
8
  import cv2
9
  import numpy as np
10
  import pandas as pd
11
  import gradio as gr
12
  import mediapipe as mp
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
 
14
 
15
  # -------------------------
@@ -53,70 +86,87 @@ def angle_3pts(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> Optional[float]:
53
  # -------------------------
54
  # MediaPipe indices
55
  # -------------------------
56
- # FaceMesh landmarks for EAR (common set)
57
  LEFT_EYE_EAR_IDX = [33, 160, 158, 133, 153, 144]
58
  RIGHT_EYE_EAR_IDX = [362, 385, 387, 263, 373, 380]
59
 
60
- # Pose landmark enum mapping (MediaPipe Pose)
61
- POSE = mp.solutions.pose
62
- POSE_LM = POSE.PoseLandmark
63
-
64
- # Key joints for limb movement/angles
65
- JOINTS = {
66
- "left_wrist": POSE_LM.LEFT_WRIST.value,
67
- "right_wrist": POSE_LM.RIGHT_WRIST.value,
68
- "left_ankle": POSE_LM.LEFT_ANKLE.value,
69
- "right_ankle": POSE_LM.RIGHT_ANKLE.value,
70
-
71
- "left_shoulder": POSE_LM.LEFT_SHOULDER.value,
72
- "right_shoulder": POSE_LM.RIGHT_SHOULDER.value,
73
- "left_elbow": POSE_LM.LEFT_ELBOW.value,
74
- "right_elbow": POSE_LM.RIGHT_ELBOW.value,
75
-
76
- "left_hip": POSE_LM.LEFT_HIP.value,
77
- "right_hip": POSE_LM.RIGHT_HIP.value,
78
- "left_knee": POSE_LM.LEFT_KNEE.value,
79
- "right_knee": POSE_LM.RIGHT_KNEE.value,
80
  }
81
 
82
 
83
  # -------------------------
84
- # Drawing
85
  # -------------------------
86
- mp_drawing = mp.solutions.drawing_utils
87
- mp_drawing_styles = mp.solutions.drawing_styles
88
- mp_face_mesh = mp.solutions.face_mesh
89
 
90
- def draw_pose(image_bgr, pose_results):
91
- if pose_results.pose_landmarks:
92
- mp_drawing.draw_landmarks(
93
- image_bgr,
94
- pose_results.pose_landmarks,
95
- POSE.POSE_CONNECTIONS,
96
- landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style(),
97
- )
98
 
99
- def draw_face(image_bgr, face_results, draw_full_mesh: bool = False):
100
- if not face_results.multi_face_landmarks:
 
101
  return
102
- for face_landmarks in face_results.multi_face_landmarks:
103
- if draw_full_mesh:
104
- # full mesh (dense) - heavier visually
105
- mp_drawing.draw_landmarks(
106
- image_bgr,
107
- face_landmarks,
108
- mp_face_mesh.FACEMESH_TESSELATION,
109
- landmark_drawing_spec=None,
110
- connection_drawing_spec=mp_drawing_styles.get_default_face_mesh_tesselation_style(),
111
- )
112
- # contours are enough for most
113
  mp_drawing.draw_landmarks(
114
- image_bgr,
115
- face_landmarks,
116
- mp_face_mesh.FACEMESH_CONTOURS,
117
  landmark_drawing_spec=None,
118
- connection_drawing_spec=mp_drawing_styles.get_default_face_mesh_contours_style(),
119
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
120
 
121
 
122
  # -------------------------
@@ -135,7 +185,6 @@ def update_blink(state: BlinkState, ear: Optional[float], thr: float, min_consec
135
  - when ear goes back above => blink end (count once)
136
  """
137
  if ear is None:
138
- # treat missing as no-update
139
  return state
140
 
141
  if ear < thr:
@@ -151,23 +200,25 @@ def update_blink(state: BlinkState, ear: Optional[float], thr: float, min_consec
151
 
152
 
153
  # -------------------------
154
- # Core processing
155
  # -------------------------
156
  def process_video(
157
  video_path: str,
158
- pose_model_complexity: int = 1,
 
159
  min_pose_det_conf: float = 0.5,
160
  min_pose_track_conf: float = 0.5,
161
- min_face_det_conf: float = 0.5,
162
  ear_threshold: float = 0.21,
163
  blink_min_consec: int = 2,
164
  draw_full_face_mesh: bool = False,
165
- max_frames: int = 0, # 0 => all
166
  ) -> Tuple[str, str, str, str]:
167
  """
168
- Returns:
169
- annotated_video_path, csv_path, json_path, report_md
170
  """
 
 
 
171
  cap = cv2.VideoCapture(video_path)
172
  if not cap.isOpened():
173
  raise RuntimeError("Cannot open video. Please upload a valid video file.")
@@ -179,7 +230,7 @@ def process_video(
179
  height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
180
  total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
181
 
182
- # output paths
183
  tmpdir = tempfile.mkdtemp(prefix="mp_analysis_")
184
  out_video = os.path.join(tmpdir, "annotated.mp4")
185
  out_csv = os.path.join(tmpdir, "per_frame_metrics.csv")
@@ -189,23 +240,42 @@ def process_video(
189
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
190
  writer = cv2.VideoWriter(out_video, fourcc, fps, (width, height))
191
 
192
- # MediaPipe init - using legacy API (works without model downloads)
193
- with mp.solutions.pose.Pose(
194
- static_image_mode=False,
195
- model_complexity=pose_model_complexity,
196
- enable_segmentation=False,
197
- min_detection_confidence=min_pose_det_conf,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
198
  min_tracking_confidence=min_pose_track_conf,
199
- ) as pose, mp_face_mesh.FaceMesh(
200
- static_image_mode=False,
201
- max_num_faces=1,
202
- refine_landmarks=True, # improves eye landmarks
203
- min_detection_confidence=min_face_det_conf,
204
- min_tracking_confidence=min_face_det_conf,
205
- ) as face_mesh:
206
 
207
  rows = []
208
- prev_pts = {} # for movement delta (normalized coordinates)
209
  left_blink = BlinkState()
210
  right_blink = BlinkState()
211
 
@@ -218,43 +288,53 @@ def process_video(
218
  if max_frames and frame_idx > max_frames:
219
  break
220
 
 
221
  frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
 
 
 
 
222
 
223
- pose_res = pose.process(frame_rgb)
224
- face_res = face_mesh.process(frame_rgb)
 
225
 
226
- # Extract face landmarks (pixel coords)
227
  face_pts: Dict[int, np.ndarray] = {}
228
- if face_res.multi_face_landmarks:
229
- lm = face_res.multi_face_landmarks[0].landmark
230
- for i in range(len(lm)):
231
- face_pts[i] = np.array([lm[i].x * width, lm[i].y * height], dtype=np.float32)
 
232
 
233
- # EAR
234
  left_ear = eye_aspect_ratio(face_pts, LEFT_EYE_EAR_IDX)
235
  right_ear = eye_aspect_ratio(face_pts, RIGHT_EYE_EAR_IDX)
236
 
237
  left_blink = update_blink(left_blink, left_ear, ear_threshold, blink_min_consec)
238
  right_blink = update_blink(right_blink, right_ear, ear_threshold, blink_min_consec)
239
 
240
- # Extract pose landmarks (normalized coords + pixel)
241
  pose_norm: Dict[str, Optional[np.ndarray]] = {}
242
  pose_px: Dict[str, Optional[np.ndarray]] = {}
243
- if pose_res.pose_landmarks:
244
- lms = pose_res.pose_landmarks.landmark
245
- for name, idx in JOINTS.items():
246
- if idx < len(lms):
247
- pose_norm[name] = np.array([lms[idx].x, lms[idx].y], dtype=np.float32)
248
- pose_px[name] = np.array([lms[idx].x * width, lms[idx].y * height], dtype=np.float32)
 
 
 
249
  else:
250
  pose_norm[name] = None
251
  pose_px[name] = None
252
  else:
253
- for name in JOINTS:
254
  pose_norm[name] = None
255
  pose_px[name] = None
256
 
257
- # Limb movement: per-frame displacement & speed (in normalized units)
258
  def movement_metrics(key: str):
259
  cur = pose_norm.get(key)
260
  if cur is None:
@@ -273,7 +353,7 @@ def process_video(
273
  la_d, la_v = movement_metrics("left_ankle")
274
  ra_d, ra_v = movement_metrics("right_ankle")
275
 
276
- # Joint angles (pixel coords for stability)
277
  def get_angle(a, b, c):
278
  if a is None or b is None or c is None:
279
  return None
@@ -285,19 +365,19 @@ def process_video(
285
  right_knee_ang = get_angle(pose_px["right_hip"], pose_px["right_knee"], pose_px["right_ankle"])
286
 
287
  # Draw overlays
288
- draw_pose(frame_bgr, pose_res)
289
- draw_face(frame_bgr, face_res, draw_full_mesh=draw_full_face_mesh)
290
 
291
  # HUD text
292
  hud_lines = [
293
- f"frame: {frame_idx}/{total_frames if total_frames>0 else '?'} fps:{fps:.1f}",
294
  f"EAR L:{left_ear:.3f}" if left_ear is not None else "EAR L:None",
295
  f"EAR R:{right_ear:.3f}" if right_ear is not None else "EAR R:None",
296
- f"Blink L:{left_blink.blink_count} R:{right_blink.blink_count}",
297
  ]
298
  y0 = 24
299
  for line in hud_lines:
300
- cv2.putText(frame_bgr, line, (12, y0), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
301
  y0 += 22
302
 
303
  writer.write(frame_bgr)
@@ -305,20 +385,16 @@ def process_video(
305
  rows.append({
306
  "frame": frame_idx,
307
  "time_s": (frame_idx - 1) / fps,
308
-
309
  "left_ear": left_ear,
310
  "right_ear": right_ear,
311
-
312
  "lw_disp": lw_d,
313
  "rw_disp": rw_d,
314
  "la_disp": la_d,
315
  "ra_disp": ra_d,
316
-
317
  "lw_speed": lw_v,
318
  "rw_speed": rw_v,
319
  "la_speed": la_v,
320
  "ra_speed": ra_v,
321
-
322
  "left_elbow_angle": left_elbow_ang,
323
  "right_elbow_angle": right_elbow_ang,
324
  "left_knee_angle": left_knee_ang,
@@ -337,7 +413,6 @@ def process_video(
337
  return {"mean": None, "min": None, "max": None}
338
  return {"mean": float(s2.mean()), "min": float(s2.min()), "max": float(s2.max())}
339
 
340
- # movement totals in normalized units (roughly proportional)
341
  summary = {
342
  "video": {
343
  "fps": float(fps),
@@ -383,34 +458,36 @@ def process_video(
383
  with open(out_json, "w", encoding="utf-8") as f:
384
  json.dump(summary, f, ensure_ascii=False, indent=2)
385
 
386
- report_md = f"""# MediaPipe Pose + FaceLandmarks Analysis Report
387
 
388
- ## Video Information
389
- - Resolution: {width} x {height}
390
  - FPS: {fps:.2f}
391
- - Frames Processed: {len(df)}
392
- - Duration (seconds): {summary["video"]["duration_s"]:.2f}
393
-
394
- ## Blink Analysis (EAR)
395
- - Threshold: {ear_threshold}
396
- - Minimum Consecutive Frames: {blink_min_consec}
397
- - Left Eye Blinks: {summary["blink"]["left_blinks"]} ({summary["blink"]["left_blinks_per_min"]:.2f} blinks/min)
398
- - Right Eye Blinks: {summary["blink"]["right_blinks"]} ({summary["blink"]["right_blinks_per_min"]:.2f} blinks/min)
399
- - Left Eye EAR: mean={summary["blink"]["left_ear_stats"]["mean"]} min={summary["blink"]["left_ear_stats"]["min"]} max={summary["blink"]["left_ear_stats"]["max"]}
400
- - Right Eye EAR: mean={summary["blink"]["right_ear_stats"]["mean"]} min={summary["blink"]["right_ear_stats"]["min"]} max={summary["blink"]["right_ear_stats"]["max"]}
401
-
402
- ## Limb Movement (normalized units)
403
- > Displacement/speed based on normalized coordinates (0~1), suitable for relative comparison and trend analysis.
404
- - Total Displacement (higher = more movement):
405
- - Left Wrist: {summary["limb_movement"]["total_disp"]["left_wrist"]:.6f}
406
- - Right Wrist: {summary["limb_movement"]["total_disp"]["right_wrist"]:.6f}
407
- - Left Ankle: {summary["limb_movement"]["total_disp"]["left_ankle"]:.6f}
408
- - Right Ankle: {summary["limb_movement"]["total_disp"]["right_ankle"]:.6f}
409
-
410
- ## Output Files
411
- - annotated.mp4: Video with Pose and FaceMesh overlays
412
- - per_frame_metrics.csv: Frame-by-frame metrics (EAR / displacement / speed / joint angles)
413
- - summary.json: Statistical summary
 
 
414
  """
415
  with open(out_report, "w", encoding="utf-8") as f:
416
  f.write(report_md)
@@ -423,16 +500,15 @@ def process_video(
423
  # -------------------------
424
  def ui_process(
425
  video,
426
- pose_model_complexity,
 
427
  min_pose_det_conf,
428
  min_pose_track_conf,
429
- min_face_det_conf,
430
  ear_threshold,
431
  blink_min_consec,
432
  draw_full_face_mesh,
433
  max_frames
434
  ):
435
- # video may be dict in some gradio versions
436
  if isinstance(video, dict) and "path" in video:
437
  video_path = video["path"]
438
  else:
@@ -441,64 +517,78 @@ def ui_process(
441
  try:
442
  out_video, out_csv, out_json, out_report = process_video(
443
  video_path=str(video_path),
444
- pose_model_complexity=int(pose_model_complexity),
 
445
  min_pose_det_conf=float(min_pose_det_conf),
446
  min_pose_track_conf=float(min_pose_track_conf),
447
- min_face_det_conf=float(min_face_det_conf),
448
  ear_threshold=float(ear_threshold),
449
  blink_min_consec=int(blink_min_consec),
450
  draw_full_face_mesh=bool(draw_full_face_mesh),
451
  max_frames=int(max_frames),
452
  )
453
 
454
- # Show report text + return files
455
  with open(out_report, "r", encoding="utf-8") as f:
456
  report_text = f.read()
457
 
458
  return out_video, out_csv, out_json, report_text
459
 
460
  except Exception as e:
461
- error_msg = f"# Error Processing Video\n\n{str(e)}"
 
462
  return None, None, None, error_msg
463
 
464
 
465
- demo = gr.Blocks(title="Video Pose + FaceLandmarks + Blink/Limb Analytics")
466
 
467
  with demo:
468
- gr.Markdown("## Upload Video → MediaPipe Pose + FaceMesh → Limb Movement & Blink Quantification (EAR)")
 
 
 
 
 
 
 
 
 
469
 
470
  with gr.Row():
471
- video_in = gr.Video(label="Upload Video")
472
 
473
- with gr.Accordion("Parameters (defaults work well)", open=False):
474
- pose_model_complexity = gr.Radio([0, 1, 2], value=1, label="Pose model_complexity (0=fast / 2=accurate)")
475
- min_pose_det_conf = gr.Slider(0.1, 0.9, value=0.5, step=0.05, label="Pose min_detection_confidence")
476
- min_pose_track_conf = gr.Slider(0.1, 0.9, value=0.5, step=0.05, label="Pose min_tracking_confidence")
477
- min_face_det_conf = gr.Slider(0.1, 0.9, value=0.5, step=0.05, label="Face min_detection_confidence")
 
 
 
478
 
479
- ear_threshold = gr.Slider(0.10, 0.35, value=0.21, step=0.01, label="Blink Threshold EAR (lower = stricter)")
480
- blink_min_consec = gr.Slider(1, 6, value=2, step=1, label="Blink Min Consecutive Frames (anti-jitter)")
 
481
 
482
- draw_full_face_mesh = gr.Checkbox(value=False, label="Overlay Full FaceMesh (denser/slower)")
483
- max_frames = gr.Number(value=0, precision=0, label="Max Frames to Process (0=all, set 300 for debugging)")
 
484
 
485
- run_btn = gr.Button("Start Analysis", variant="primary")
486
 
487
  with gr.Row():
488
- video_out = gr.Video(label="Output: Annotated Video")
489
  with gr.Row():
490
- csv_out = gr.File(label="Per-Frame Metrics CSV")
491
- json_out = gr.File(label="Summary JSON")
492
  report_out = gr.Markdown()
493
 
494
  run_btn.click(
495
  fn=ui_process,
496
  inputs=[
497
  video_in,
498
- pose_model_complexity,
 
499
  min_pose_det_conf,
500
  min_pose_track_conf,
501
- min_face_det_conf,
502
  ear_threshold,
503
  blink_min_consec,
504
  draw_full_face_mesh,
 
4
  import tempfile
5
  from dataclasses import dataclass
6
  from typing import Dict, List, Tuple, Optional
7
+ import urllib.request
8
 
9
  import cv2
10
  import numpy as np
11
  import pandas as pd
12
  import gradio as gr
13
  import mediapipe as mp
14
+ from mediapipe import solutions
15
+ from mediapipe.framework.formats import landmark_pb2
16
+ from mediapipe.tasks import python
17
+ from mediapipe.tasks.python import vision
18
+
19
+
20
+ # -------------------------
21
+ # Model download helper
22
+ # -------------------------
23
def download_models(models_dir: str = "/tmp/mediapipe_models"):
    """Ensure the MediaPipe face and pose landmarker model files exist locally.

    Every model that is missing (or present but empty) in ``models_dir`` is
    fetched from its hard-coded URL. The download is written to a uniquely
    named temporary file inside ``models_dir`` and renamed onto the final
    path only after it has finished, so an interrupted or failed download can
    never be mistaken for a valid cached model on the next call.

    Args:
        models_dir: Directory used as the model cache; created when missing.

    Returns:
        A ``(face_landmarker_path, pose_landmarker_path)`` tuple.

    Raises:
        OSError: If the cache directory cannot be created or a download fails
            (``urllib.error.URLError`` is a subclass of ``OSError``).
    """
    os.makedirs(models_dir, exist_ok=True)

    models = {
        "face_landmarker": {
            "url": "https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task",
            "path": os.path.join(models_dir, "face_landmarker.task"),
        },
        "pose_landmarker": {
            "url": "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_heavy/float16/1/pose_landmarker_heavy.task",
            "path": os.path.join(models_dir, "pose_landmarker_heavy.task"),
        },
    }

    for model_name, model_info in models.items():
        target = model_info["path"]
        # A zero-byte file is the residue of a failed transfer, not a model.
        if os.path.isfile(target) and os.path.getsize(target) > 0:
            continue

        print(f"Downloading {model_name}...")
        # Unique temp name in the same directory: the final rename stays
        # atomic and concurrent callers never write to the same partial file.
        fd, partial = tempfile.mkstemp(dir=models_dir, suffix=".part")
        os.close(fd)
        try:
            urllib.request.urlretrieve(model_info["url"], partial)
            os.replace(partial, target)
        finally:
            # Still present only when the download or rename failed.
            if os.path.exists(partial):
                os.remove(partial)
        print(f"✓ Downloaded {model_name}")

    return models["face_landmarker"]["path"], models["pose_landmarker"]["path"]
46
 
47
 
48
  # -------------------------
 
86
  # -------------------------
87
  # MediaPipe indices
88
  # -------------------------
89
+ # FaceMesh landmarks for EAR (same indices work for new API)
90
  LEFT_EYE_EAR_IDX = [33, 160, 158, 133, 153, 144]
91
  RIGHT_EYE_EAR_IDX = [362, 385, 387, 263, 373, 380]
92
 
93
+ # Pose landmark indices for new API
94
+ POSE_LANDMARKS = {
95
+ "left_wrist": 15,
96
+ "right_wrist": 16,
97
+ "left_ankle": 27,
98
+ "right_ankle": 28,
99
+ "left_shoulder": 11,
100
+ "right_shoulder": 12,
101
+ "left_elbow": 13,
102
+ "right_elbow": 14,
103
+ "left_hip": 23,
104
+ "right_hip": 24,
105
+ "left_knee": 25,
106
+ "right_knee": 26,
 
 
 
 
 
 
107
  }
108
 
109
 
110
  # -------------------------
111
+ # Drawing helpers for new API
112
  # -------------------------
113
+ mp_drawing = solutions.drawing_utils
114
+ mp_drawing_styles = solutions.drawing_styles
 
115
 
116
+ # Face mesh connections
117
+ FACEMESH_TESSELATION = solutions.face_mesh.FACEMESH_TESSELATION
118
+ FACEMESH_CONTOURS = solutions.face_mesh.FACEMESH_CONTOURS
119
+
120
+ # Pose connections
121
+ POSE_CONNECTIONS = solutions.pose.POSE_CONNECTIONS
 
 
122
 
123
def draw_face_landmarks(image, face_landmarks, draw_full_mesh=False):
    """Overlay one face's landmark connections on ``image``.

    Args:
        image: Image handed to ``mp_drawing.draw_landmarks`` as the drawing
            target.
        face_landmarks: Sequence of landmark objects exposing ``x``, ``y`` and
            ``z`` attributes, or ``None`` / empty when no face was detected.
        draw_full_mesh: When true, the tesselation connections are drawn
            before the contour connections.

    Returns:
        None. The function does nothing when there are no landmarks.
    """
    # No face detected. An empty sequence is rejected here as well, because a
    # landmark list with no points cannot satisfy any connection index.
    if face_landmarks is None or len(face_landmarks) == 0:
        return

    # The drawing utilities expect the protobuf landmark list, not the plain
    # landmark objects, so copy the coordinates across.
    face_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
    face_landmarks_proto.landmark.extend([
        landmark_pb2.NormalizedLandmark(x=lm.x, y=lm.y, z=lm.z)
        for lm in face_landmarks
    ])

    # (connections, connection style) pairs, in drawing order.
    passes = []
    if draw_full_mesh:
        passes.append((
            FACEMESH_TESSELATION,
            mp_drawing_styles.get_default_face_mesh_tesselation_style(),
        ))
    passes.append((
        FACEMESH_CONTOURS,
        mp_drawing_styles.get_default_face_mesh_contours_style(),
    ))

    for connections, connection_style in passes:
        mp_drawing.draw_landmarks(
            image=image,
            landmark_list=face_landmarks_proto,
            connections=connections,
            landmark_drawing_spec=None,  # connections only, no landmark dots
            connection_drawing_spec=connection_style,
        )
151
+
152
def draw_pose_landmarks(image, pose_landmarks):
    """Overlay one pose's landmarks and skeleton connections on ``image``.

    Args:
        image: Image handed to ``mp_drawing.draw_landmarks`` as the drawing
            target.
        pose_landmarks: Sequence of landmark objects exposing ``x``, ``y`` and
            ``z`` attributes, or ``None`` / empty when no pose was detected.

    Returns:
        None. The function does nothing when there are no landmarks.
    """
    # No pose detected. An empty sequence is rejected here as well, because a
    # landmark list with no points cannot satisfy any connection index.
    if pose_landmarks is None or len(pose_landmarks) == 0:
        return

    # The drawing utilities expect the protobuf landmark list, not the plain
    # landmark objects, so copy the coordinates across.
    pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
    pose_landmarks_proto.landmark.extend([
        landmark_pb2.NormalizedLandmark(x=lm.x, y=lm.y, z=lm.z)
        for lm in pose_landmarks
    ])

    mp_drawing.draw_landmarks(
        image=image,
        landmark_list=pose_landmarks_proto,
        connections=POSE_CONNECTIONS,
        landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style(),
    )
170
 
171
 
172
  # -------------------------
 
185
  - when ear goes back above => blink end (count once)
186
  """
187
  if ear is None:
 
188
  return state
189
 
190
  if ear < thr:
 
200
 
201
 
202
  # -------------------------
203
+ # Core processing with new API
204
  # -------------------------
205
  def process_video(
206
  video_path: str,
207
+ min_face_det_conf: float = 0.5,
208
+ min_face_track_conf: float = 0.5,
209
  min_pose_det_conf: float = 0.5,
210
  min_pose_track_conf: float = 0.5,
 
211
  ear_threshold: float = 0.21,
212
  blink_min_consec: int = 2,
213
  draw_full_face_mesh: bool = False,
214
+ max_frames: int = 0,
215
  ) -> Tuple[str, str, str, str]:
216
  """
217
+ Process video using new MediaPipe API with GPU support
 
218
  """
219
+ # Download models first
220
+ face_model_path, pose_model_path = download_models()
221
+
222
  cap = cv2.VideoCapture(video_path)
223
  if not cap.isOpened():
224
  raise RuntimeError("Cannot open video. Please upload a valid video file.")
 
230
  height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
231
  total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
232
 
233
+ # Output paths
234
  tmpdir = tempfile.mkdtemp(prefix="mp_analysis_")
235
  out_video = os.path.join(tmpdir, "annotated.mp4")
236
  out_csv = os.path.join(tmpdir, "per_frame_metrics.csv")
 
240
  fourcc = cv2.VideoWriter_fourcc(*"mp4v")
241
  writer = cv2.VideoWriter(out_video, fourcc, fps, (width, height))
242
 
243
+ # Create face landmarker with GPU delegate
244
+ base_options_face = python.BaseOptions(
245
+ model_asset_path=face_model_path,
246
+ delegate=python.BaseOptions.Delegate.GPU
247
+ )
248
+ face_options = vision.FaceLandmarkerOptions(
249
+ base_options=base_options_face,
250
+ running_mode=vision.RunningMode.VIDEO,
251
+ num_faces=1,
252
+ min_face_detection_confidence=min_face_det_conf,
253
+ min_face_presence_confidence=min_face_track_conf,
254
+ min_tracking_confidence=min_face_track_conf,
255
+ output_face_blendshapes=False,
256
+ output_facial_transformation_matrixes=False
257
+ )
258
+
259
+ # Create pose landmarker with GPU delegate
260
+ base_options_pose = python.BaseOptions(
261
+ model_asset_path=pose_model_path,
262
+ delegate=python.BaseOptions.Delegate.GPU
263
+ )
264
+ pose_options = vision.PoseLandmarkerOptions(
265
+ base_options=base_options_pose,
266
+ running_mode=vision.RunningMode.VIDEO,
267
+ num_poses=1,
268
+ min_pose_detection_confidence=min_pose_det_conf,
269
+ min_pose_presence_confidence=min_pose_track_conf,
270
  min_tracking_confidence=min_pose_track_conf,
271
+ output_segmentation_masks=False
272
+ )
273
+
274
+ with vision.FaceLandmarker.create_from_options(face_options) as face_landmarker, \
275
+ vision.PoseLandmarker.create_from_options(pose_options) as pose_landmarker:
 
 
276
 
277
  rows = []
278
+ prev_pts = {}
279
  left_blink = BlinkState()
280
  right_blink = BlinkState()
281
 
 
288
  if max_frames and frame_idx > max_frames:
289
  break
290
 
291
+ # Convert to RGB and create MediaPipe Image
292
  frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
293
+ mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)
294
+
295
+ # Timestamp in milliseconds
296
+ timestamp_ms = int((frame_idx - 1) * 1000 / fps)
297
 
298
+ # Process with new API
299
+ face_result = face_landmarker.detect_for_video(mp_image, timestamp_ms)
300
+ pose_result = pose_landmarker.detect_for_video(mp_image, timestamp_ms)
301
 
302
+ # Extract face landmarks
303
  face_pts: Dict[int, np.ndarray] = {}
304
+ face_landmarks = None
305
+ if face_result.face_landmarks:
306
+ face_landmarks = face_result.face_landmarks[0]
307
+ for i, lm in enumerate(face_landmarks):
308
+ face_pts[i] = np.array([lm.x * width, lm.y * height], dtype=np.float32)
309
 
310
+ # Calculate EAR
311
  left_ear = eye_aspect_ratio(face_pts, LEFT_EYE_EAR_IDX)
312
  right_ear = eye_aspect_ratio(face_pts, RIGHT_EYE_EAR_IDX)
313
 
314
  left_blink = update_blink(left_blink, left_ear, ear_threshold, blink_min_consec)
315
  right_blink = update_blink(right_blink, right_ear, ear_threshold, blink_min_consec)
316
 
317
+ # Extract pose landmarks
318
  pose_norm: Dict[str, Optional[np.ndarray]] = {}
319
  pose_px: Dict[str, Optional[np.ndarray]] = {}
320
+ pose_landmarks = None
321
+
322
+ if pose_result.pose_landmarks:
323
+ pose_landmarks = pose_result.pose_landmarks[0]
324
+ for name, idx in POSE_LANDMARKS.items():
325
+ if idx < len(pose_landmarks):
326
+ lm = pose_landmarks[idx]
327
+ pose_norm[name] = np.array([lm.x, lm.y], dtype=np.float32)
328
+ pose_px[name] = np.array([lm.x * width, lm.y * height], dtype=np.float32)
329
  else:
330
  pose_norm[name] = None
331
  pose_px[name] = None
332
  else:
333
+ for name in POSE_LANDMARKS:
334
  pose_norm[name] = None
335
  pose_px[name] = None
336
 
337
+ # Movement metrics
338
  def movement_metrics(key: str):
339
  cur = pose_norm.get(key)
340
  if cur is None:
 
353
  la_d, la_v = movement_metrics("left_ankle")
354
  ra_d, ra_v = movement_metrics("right_ankle")
355
 
356
+ # Joint angles
357
  def get_angle(a, b, c):
358
  if a is None or b is None or c is None:
359
  return None
 
365
  right_knee_ang = get_angle(pose_px["right_hip"], pose_px["right_knee"], pose_px["right_ankle"])
366
 
367
  # Draw overlays
368
+ draw_pose_landmarks(frame_bgr, pose_landmarks)
369
+ draw_face_landmarks(frame_bgr, face_landmarks, draw_full_mesh=draw_full_face_mesh)
370
 
371
  # HUD text
372
  hud_lines = [
373
+ f"Frame: {frame_idx}/{total_frames if total_frames>0 else '?'} FPS:{fps:.1f}",
374
  f"EAR L:{left_ear:.3f}" if left_ear is not None else "EAR L:None",
375
  f"EAR R:{right_ear:.3f}" if right_ear is not None else "EAR R:None",
376
+ f"Blinks L:{left_blink.blink_count} R:{right_blink.blink_count}",
377
  ]
378
  y0 = 24
379
  for line in hud_lines:
380
+ cv2.putText(frame_bgr, line, (12, y0), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
381
  y0 += 22
382
 
383
  writer.write(frame_bgr)
 
385
  rows.append({
386
  "frame": frame_idx,
387
  "time_s": (frame_idx - 1) / fps,
 
388
  "left_ear": left_ear,
389
  "right_ear": right_ear,
 
390
  "lw_disp": lw_d,
391
  "rw_disp": rw_d,
392
  "la_disp": la_d,
393
  "ra_disp": ra_d,
 
394
  "lw_speed": lw_v,
395
  "rw_speed": rw_v,
396
  "la_speed": la_v,
397
  "ra_speed": ra_v,
 
398
  "left_elbow_angle": left_elbow_ang,
399
  "right_elbow_angle": right_elbow_ang,
400
  "left_knee_angle": left_knee_ang,
 
413
  return {"mean": None, "min": None, "max": None}
414
  return {"mean": float(s2.mean()), "min": float(s2.min()), "max": float(s2.max())}
415
 
 
416
  summary = {
417
  "video": {
418
  "fps": float(fps),
 
458
  with open(out_json, "w", encoding="utf-8") as f:
459
  json.dump(summary, f, ensure_ascii=False, indent=2)
460
 
461
+ report_md = f"""# MediaPipe 面部+姿态分析报告 (GPU加速)
462
 
463
+ ## 视频信息
464
+ - 分辨率: {width} x {height}
465
  - FPS: {fps:.2f}
466
+ - 处理帧数: {len(df)}
467
+ - 时长: {summary["video"]["duration_s"]:.2f}
468
+
469
+ ## 眨眼分析 (EAR)
470
+ - 阈值: {ear_threshold}
471
+ - 最小连续帧: {blink_min_consec}
472
+ - 左眼眨眼: {summary["blink"]["left_blinks"]} ({summary["blink"]["left_blinks_per_min"]:.2f} /分钟)
473
+ - 右眼眨眼: {summary["blink"]["right_blinks"]} ({summary["blink"]["right_blinks_per_min"]:.2f} /分钟)
474
+ - 左眼EAR: 平均={summary["blink"]["left_ear_stats"]["mean"]} 最小={summary["blink"]["left_ear_stats"]["min"]} 最大={summary["blink"]["left_ear_stats"]["max"]}
475
+ - 右眼EAR: 平均={summary["blink"]["right_ear_stats"]["mean"]} 最小={summary["blink"]["right_ear_stats"]["min"]} 最大={summary["blink"]["right_ear_stats"]["max"]}
476
+
477
+ ## 肢体运动量 (归一化单位)
478
+ > 基于归一化坐标(0~1)计算,适合相对比较和趋势分析
479
+ - 累计位移 (数值越大=运动越多):
480
+ - 左手腕: {summary["limb_movement"]["total_disp"]["left_wrist"]:.6f}
481
+ - 右手腕: {summary["limb_movement"]["total_disp"]["right_wrist"]:.6f}
482
+ - 左脚踝: {summary["limb_movement"]["total_disp"]["left_ankle"]:.6f}
483
+ - 右脚踝: {summary["limb_movement"]["total_disp"]["right_ankle"]:.6f}
484
+
485
+ ## 输出文件
486
+ - annotated.mp4: 叠加了姿态和面部mesh的视频
487
+ - per_frame_metrics.csv: 逐帧指标
488
+ - summary.json: 统计汇总
489
+
490
+ **使用GPU加速处理 | 新版Face Landmarker API**
491
  """
492
  with open(out_report, "w", encoding="utf-8") as f:
493
  f.write(report_md)
 
500
  # -------------------------
501
  def ui_process(
502
  video,
503
+ min_face_det_conf,
504
+ min_face_track_conf,
505
  min_pose_det_conf,
506
  min_pose_track_conf,
 
507
  ear_threshold,
508
  blink_min_consec,
509
  draw_full_face_mesh,
510
  max_frames
511
  ):
 
512
  if isinstance(video, dict) and "path" in video:
513
  video_path = video["path"]
514
  else:
 
517
  try:
518
  out_video, out_csv, out_json, out_report = process_video(
519
  video_path=str(video_path),
520
+ min_face_det_conf=float(min_face_det_conf),
521
+ min_face_track_conf=float(min_face_track_conf),
522
  min_pose_det_conf=float(min_pose_det_conf),
523
  min_pose_track_conf=float(min_pose_track_conf),
 
524
  ear_threshold=float(ear_threshold),
525
  blink_min_consec=int(blink_min_consec),
526
  draw_full_face_mesh=bool(draw_full_face_mesh),
527
  max_frames=int(max_frames),
528
  )
529
 
 
530
  with open(out_report, "r", encoding="utf-8") as f:
531
  report_text = f.read()
532
 
533
  return out_video, out_csv, out_json, report_text
534
 
535
  except Exception as e:
536
+ import traceback
537
+ error_msg = f"# 处理视频时出错\n\n```\n{traceback.format_exc()}\n```"
538
  return None, None, None, error_msg
539
 
540
 
541
+ demo = gr.Blocks(title="视频姿态+面部分析 (GPU加速)")
542
 
543
  with demo:
544
+ gr.Markdown("""
545
+ ## 上传视频 → MediaPipe GPU加速 → 姿态+面部mesh追踪 + 眨眼/肢体运动分析
546
+
547
+ **特性:**
548
+ - ✅ GPU加速处理
549
+ - ✅ 新版Face Landmarker API (更精确的面部mesh)
550
+ - ✅ 眨眼检测 (EAR算法)
551
+ - ✅ 肢体运动量化
552
+ - ✅ 关节角度分析
553
+ """)
554
 
555
  with gr.Row():
556
+ video_in = gr.Video(label="上传视频")
557
 
558
+ with gr.Accordion("参数设置 (默认值通常就够用)", open=False):
559
+ gr.Markdown("### 面部检测参数")
560
+ min_face_det_conf = gr.Slider(0.1, 0.9, value=0.5, step=0.05, label="面部检测置信度阈值")
561
+ min_face_track_conf = gr.Slider(0.1, 0.9, value=0.5, step=0.05, label="面部追踪置信度阈值")
562
+
563
+ gr.Markdown("### 姿态检测参数")
564
+ min_pose_det_conf = gr.Slider(0.1, 0.9, value=0.5, step=0.05, label="姿态检测置信度阈值")
565
+ min_pose_track_conf = gr.Slider(0.1, 0.9, value=0.5, step=0.05, label="姿态追踪置信度阈值")
566
 
567
+ gr.Markdown("### 眨眼检测参数")
568
+ ear_threshold = gr.Slider(0.10, 0.35, value=0.21, step=0.01, label="眨眼阈值 (EAR, 越小越严格)")
569
+ blink_min_consec = gr.Slider(1, 6, value=2, step=1, label="眨眼最小连续帧数 (抗抖动)")
570
 
571
+ gr.Markdown("### 可视化选项")
572
+ draw_full_face_mesh = gr.Checkbox(value=False, label="绘制完整面部mesh (更密集,速度较慢)")
573
+ max_frames = gr.Number(value=0, precision=0, label="最多处理帧数 (0=全部处理,调试可设300)")
574
 
575
+ run_btn = gr.Button("🚀 开始分析 (GPU加速)", variant="primary", size="lg")
576
 
577
  with gr.Row():
578
+ video_out = gr.Video(label="输出: 标注后的视频")
579
  with gr.Row():
580
+ csv_out = gr.File(label="逐帧指标CSV")
581
+ json_out = gr.File(label="汇总JSON")
582
  report_out = gr.Markdown()
583
 
584
  run_btn.click(
585
  fn=ui_process,
586
  inputs=[
587
  video_in,
588
+ min_face_det_conf,
589
+ min_face_track_conf,
590
  min_pose_det_conf,
591
  min_pose_track_conf,
 
592
  ear_threshold,
593
  blink_min_consec,
594
  draw_full_face_mesh,