mastefan committed on
Commit
10b5e29
·
verified ·
1 Parent(s): 20726c8

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +30 -251
app.py CHANGED
@@ -17,9 +17,6 @@
17
  # -*- coding: utf-8 -*-
18
  # Fencing Scoreboard Clips - YOLO x AutoGluon (Gradio)
19
 
20
- # -*- coding: utf-8 -*-
21
- # Fencing Scoreboard Clips - YOLO x AutoGluon (Gradio)
22
-
23
  import os, sys, zipfile, shutil, subprocess, tempfile, pathlib
24
  from typing import List, Tuple
25
  import uuid
@@ -29,6 +26,14 @@ import pandas as pd
29
  import cv2
30
  import gradio as gr
31
 
 
 
 
 
 
 
 
 
32
def _pip(pkgs: List[str]):
    """Install the given packages quietly with pip in the current interpreter."""
    import subprocess, sys
    cmd = [sys.executable, "-m", "pip", "install", "--quiet"]
    subprocess.check_call(cmd + list(pkgs))
@@ -82,6 +87,8 @@ CACHE_DIR = pathlib.Path("hf_assets")
82
  CACHE_DIR.mkdir(parents=True, exist_ok=True)
83
 
84
  DEBUG_DIR = pathlib.Path("debug_frames")
 
 
85
  DEBUG_DIR.mkdir(exist_ok=True)
86
 
87
  # ----------------
@@ -98,8 +105,11 @@ def load_autogluon_tabular_from_hub() -> TabularPredictor:
98
  shutil.rmtree(extract_dir)
99
  with zipfile.ZipFile(z, "r") as zip_ref:
100
  zip_ref.extractall(extract_dir)
101
- return TabularPredictor.load(str(extract_dir), require_py_version_match=False)
102
-
 
 
 
103
 
104
  _YOLO = None
105
  _AG_PRED = None
@@ -162,155 +172,19 @@ def isolate_scoreboard_color(frame_bgr: np.ndarray,
162
  if x2 > x1 and y2 > y1:
163
  gray[y1:y2, x1:x2] = frame_bgr[y1:y2, x1:x2]
164
 
165
- if debug and frame_id is not None:
166
  dbg = gray.copy()
167
  if chosen_box is not None:
168
  x1, y1, x2, y2 = [int(round(v)) for v in chosen_box]
169
  cv2.rectangle(dbg, (x1,y1), (x2,y2), (0,255,0), 2)
170
  out_path = DEBUG_DIR / f"frame_{frame_id:06d}.jpg"
171
  cv2.imwrite(str(out_path), dbg)
172
- print(f"[DEBUG] Saved debug frame → {out_path}")
173
 
174
  return gray
175
 
176
def color_pixel_ratio(rgb: np.ndarray, ch: int) -> float:
    """Fraction of pixels dominated by red (ch == 0) or green (any other ch).

    `rgb` is an H x W x 3 image in RGB channel order.
    """
    r = rgb[:, :, 0]
    g = rgb[:, :, 1]
    b = rgb[:, :, 2]
    if ch == 0:
        # Bright red: red channel above 150 and well above both others.
        dominant = (r > 150) & (r > 1.2 * g) & (r > 1.2 * b)
    else:
        # Green uses looser thresholds than red.
        dominant = (g > 100) & (g > 1.05 * r) & (g > 1.05 * b)
    # Tiny epsilon keeps the division safe for a degenerate empty image.
    return np.sum(dominant) / (rgb.shape[0] * rgb.shape[1] + 1e-9)
183
-
184
def rolling_z(series: pd.Series, win: int = 45) -> pd.Series:
    """Robust rolling z-score using the trailing median and MAD."""
    rolled = series.rolling(win, min_periods=5)
    center = rolled.median()
    # Median absolute deviation per window (raw=True passes ndarrays to the lambda).
    spread = rolled.apply(
        lambda w: np.median(np.abs(w - np.median(w))), raw=True
    )
    # A zero MAD would divide by zero: substitute the smallest positive MAD,
    # or 1.0 when every window is flat.
    fallback = spread[spread > 0].min() if (spread > 0).any() else 1.0
    spread = spread.replace(0, fallback)
    return (series - center) / spread
191
-
192
# ----------------------------
# Video → features
# ----------------------------
def extract_feature_timeseries(video_path: str,
                               frame_skip: int = FRAME_SKIP,
                               debug: bool = False) -> Tuple[pd.DataFrame, float]:
    """Sample frames from `video_path` and compute per-frame color features.

    Every `frame_skip`-th frame is masked to the scoreboard region and its
    red/green pixel ratios recorded, then frame-to-frame deltas and rolling
    z-scores are derived. Returns (feature DataFrame, fps); the DataFrame is
    empty when the video cannot be opened.
    """
    print("[INFO] Starting frame extraction...")
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return pd.DataFrame(), 0.0

    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0  # guard against a 0/None fps report
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    rows = []
    idx = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if idx % frame_skip == 0:
            masked = isolate_scoreboard_color(frame, debug=debug, frame_id=idx)
            rgb = cv2.cvtColor(masked, cv2.COLOR_BGR2RGB)
            rows.append({
                "frame_id": idx,
                "timestamp": idx / fps,
                "red_ratio": color_pixel_ratio(rgb, 0),
                "green_ratio": color_pixel_ratio(rgb, 1),
            })
        idx += 1
    cap.release()

    df = pd.DataFrame(rows)
    print(f"[INFO] Processed {len(df)} frames out of {total_frames} (fps={fps:.2f})")
    if df.empty:
        return df, fps

    # Derived features: frame-to-frame deltas and robust rolling z-scores.
    df["red_diff"] = df["red_ratio"].diff().fillna(0)
    df["green_diff"] = df["green_ratio"].diff().fillna(0)
    df["z_red"] = rolling_z(df["red_ratio"])
    df["z_green"] = rolling_z(df["green_ratio"])

    if debug:
        out_csv = DEBUG_DIR / f"features_{uuid.uuid4().hex}.csv"
        df.to_csv(out_csv, index=False)
        print(f"[DEBUG] Saved features CSV → {out_csv}")

    return df, fps
242
-
243
# ----------------------------
# Predictor + event picking
# ----------------------------
def predict_scores(df: pd.DataFrame) -> pd.Series:
    """Score each feature row with the AutoGluon model.

    Prefers the positive-class probability when the predictor exposes one;
    otherwise falls back to hard predictions rescaled to [0, 1] using the
    5th/95th percentiles.
    """
    feat_cols = ["red_ratio", "green_ratio", "red_diff", "green_diff", "z_red", "z_green"]
    X = df[feat_cols].copy()

    # Try probabilities first. The original computed .predict(X) unconditionally
    # before this branch, wasting a full model inference whenever predict_proba
    # succeeded.
    try:
        proba = ag_predictor().predict_proba(X)
        if isinstance(proba, pd.DataFrame) and (1 in proba.columns):
            return proba[1]
    except Exception:
        pass  # fall through to hard predictions

    s = pd.Series(ag_predictor().predict(X)).astype(float)
    lo, hi = s.quantile(0.05), s.quantile(0.95)
    rng = (hi - lo) or 1.0  # avoid division by zero on flat predictions
    return ((s - lo) / rng).clip(0, 1)
259
-
260
def pick_events(df: pd.DataFrame, score: pd.Series, fps: float,
                min_start_guard_s: float = 1.0,
                guard_enable_min_duration_s: float = 6.0) -> List[float]:
    """Turn the per-frame score series into a sorted list of event timestamps.

    A candidate must be a local peak, clear an adaptive raw- or z-score
    threshold, and sit at least ~1 s of frames after the previous kept peak.
    The start-of-video guard applies only to videos at least
    `guard_enable_min_duration_s` long. When nothing qualifies, the global
    maximum is used as a fallback; finally, events closer than GROUP_GAP_S
    are merged.
    """
    max_score = score.max()
    raw_cutoff = 0.7 * max_score if max_score > 0 else 0.4
    z = rolling_z(score, win=45)
    max_z = z.max()
    z_cutoff = max(2.0, 0.6 * max_z)

    print(f"[DEBUG] Predictor score stats: min={score.min():.3f}, max={max_score:.3f}, mean={score.mean():.3f}")
    print(f"[DEBUG] Adaptive thresholds: raw>{raw_cutoff:.3f}, z>{z_cutoff:.2f}")

    duration_est = float(df["timestamp"].max()) if not df.empty else 0.0
    enforce_guard = duration_est >= guard_enable_min_duration_s
    min_dist_frames = max(1, int(1.0 * max(1.0, fps)))

    y = score.values
    out_times: List[float] = []
    last_kept = -min_dist_frames
    for i in range(1, len(y) - 1):
        if not (y[i] > y[i - 1] and y[i] > y[i + 1]):
            continue  # not a local peak
        if (i - last_kept) < min_dist_frames:
            continue  # too close to the previously kept peak
        if not ((z.iloc[i] > z_cutoff) or (y[i] > raw_cutoff)):
            continue  # below both adaptive thresholds
        ts = float(df.iloc[i]["timestamp"])
        if enforce_guard and ts < min_start_guard_s:
            continue  # inside the start-of-video guard window
        out_times.append(ts)
        last_kept = i

    # Fallback: take the single strongest frame when nothing passed.
    if not out_times and len(y) > 0:
        best_idx = int(np.argmax(y))
        ts_best = float(df.iloc[best_idx]["timestamp"])
        if (not enforce_guard) or (ts_best >= min_start_guard_s):
            out_times = [ts_best]
            print(f"[DEBUG] Fallback → using global max at {ts_best:.2f}s")

    out_times.sort()
    grouped: List[float] = []
    for t in out_times:
        if (not grouped) or (t - grouped[-1]) > GROUP_GAP_S:
            grouped.append(t)
    print(f"[DEBUG] Final detected events: {grouped}")
    return grouped
301
-
302
# ----------------------------
# Clip helpers
# ----------------------------
def _probe_duration(video_path: str) -> float:
    """Return the container duration of `video_path` in seconds via ffprobe.

    Returns 0.0 when ffmpeg-python is unavailable or probing fails, so
    callers can fall back to a timestamp-derived estimate.
    """
    try:
        if ffmpeg is None:
            raise RuntimeError("ffmpeg-python not available")
        meta = ffmpeg.probe(video_path)
        return float(meta["format"]["duration"])
    except Exception:
        # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
        # are no longer swallowed.
        return 0.0
313
-
314
  def cut_clip(video_path: str, start: float, end: float, out_path: str) -> str:
315
  try:
316
  cmd = ["ffmpeg", "-y", "-ss", str(max(0, start)), "-to", str(max(start, end)),
@@ -326,18 +200,18 @@ def cut_clip(video_path: str, start: float, end: float, out_path: str) -> str:
326
  return out_path
327
 
328
  # ----------------------------
329
- # Orchestrator
330
  # ----------------------------
331
- def extract_score_clips(video_path: str, debug: bool = False) -> Tuple[List[Tuple[str, str]], str]:
332
  print("[INFO] Running full detection pipeline...")
 
333
  df, fps = extract_feature_timeseries(video_path, frame_skip=FRAME_SKIP, debug=debug)
334
  if df.empty:
335
  return [], "No frames processed."
336
 
337
  score = predict_scores(df)
338
  if score.max() <= 1e-6:
339
- print("[WARN] Flat scores from predictor (possible YOLO miss or feature mismatch).")
340
- return [], "⚠️ No scoreboard detected or illumination scores flat. Please check video or model."
341
 
342
  events = pick_events(df, score, fps)
343
  if not events:
@@ -347,11 +221,10 @@ def extract_score_clips(video_path: str, debug: bool = False) -> Tuple[List[Tupl
347
  if duration <= 0:
348
  duration = float(df["timestamp"].max() + CLIP_PAD_S + 0.5)
349
 
350
- clips = []
351
  base = os.path.splitext(os.path.basename(video_path))[0]
352
  for i, t in enumerate(events):
353
- s = t - CLIP_PAD_S
354
- e = t + CLIP_PAD_S
355
  if s < 0:
356
  e = min(duration, e - s)
357
  s = 0
@@ -360,108 +233,14 @@ def extract_score_clips(video_path: str, debug: bool = False) -> Tuple[List[Tupl
360
  e = duration
361
  clip_path = os.path.join(tempfile.gettempdir(), f"{base}_score_{i+1:02d}.mp4")
362
  cut_clip(video_path, s, e, clip_path)
363
- label = f"Touch {i+1} @ {t:.2f}s"
364
- clips.append((clip_path, label))
365
-
366
- return clips, f"✅ Detected {len(clips)} event(s)."
367
-
368
# ----------------------------
# Gradio UI
# ----------------------------
# CSS for the Gradio app: centers the layout, and styles the custom progress
# bar with a mirrored fencer emoji riding its leading edge.
# NOTE(review): in-string indentation reconstructed conventionally; CSS is
# whitespace-insensitive, but confirm against the original file.
CSS = """
.gradio-container {max-width: 900px; margin: auto;}
.header {text-align: center; margin-bottom: 20px;}
.full-width {width: 100% !important;}
.progress-bar {
    width: 100%;
    height: 30px;
    background-color: #e0e0e0;
    border-radius: 15px;
    margin: 15px 0;
    position: relative;
    overflow: hidden;
}
.progress-fill {
    height: 100%;
    background-color: #4CAF50;
    border-radius: 15px;
    text-align: center;
    line-height: 30px;
    color: white;
    font-weight: bold;
    transition: width 0.3s;
}
.fencer {
    position: absolute;
    top: -5px;
    font-size: 24px;
    transition: left 0.3s;
    transform: scaleX(-1);
}
"""
402
-
403
- def _make_progress_bar(percent: int, final_text: str = None):
404
- text = f"{percent}%" if not final_text else final_text
405
- return f"""
406
- <div class="progress-bar">
407
- <div id="progress-fill" class="progress-fill" style="width:{percent}%">{text}</div>
408
- <div id="fencer" class="fencer" style="left:{percent}%">🤺</div>
409
- </div>
410
- """
411
-
412
def run_with_progress(video_file):
    """Generator driving the UI: yields (clips, status_markdown, progress_html)."""
    if not video_file:
        yield [], "Please upload a video file.", gr.update(visible=False)
        return

    yield [], "🔄 Extracting frames...", _make_progress_bar(20)
    # NOTE(review): features are computed here only to detect the empty case;
    # extract_score_clips() re-extracts them below.
    features, _fps = extract_feature_timeseries(video_file, frame_skip=FRAME_SKIP, debug=False)
    if features.empty:
        yield [], "❌ No frames processed!", _make_progress_bar(100, "No Frames ❌")
        return

    yield [], "🔄 Scoring & detecting touches...", _make_progress_bar(80)
    clips, message = extract_score_clips(video_file, debug=False)

    label = f"Detected {len(clips)} Touches ⚡" if clips else "No Touches"
    yield clips, message, _make_progress_bar(100, label)
430
-
431
# Assemble the Gradio Blocks app: upload widget, run button, streamed
# progress bar, and a gallery of detected clips.
with gr.Blocks(css=CSS, title="Fencing Scoreboard Detector") as demo:
    with gr.Row(elem_classes="header"):
        gr.Markdown(
            "## 🤺 Fencing Score Detector\n"
            "Upload a fencing bout video. The system detects scoreboard lights "
            "(YOLO + AutoGluon) and returns highlight clips around each scoring event."
        )

    in_video = gr.Video(label="Upload Bout Video", elem_classes="full-width", height=400)
    run_btn = gr.Button("⚡ Detect Touches", elem_classes="full-width")

    progress_html = gr.HTML(value="", label="Processing Progress", visible=False)
    status = gr.Markdown("Ready.")
    gallery = gr.Gallery(
        label="Detected Clips",
        columns=1,
        height=400,
        preview=True,
        allow_preview=True,
        show_download_button=True,
        visible=False,
    )

    def wrapped_run(video_file):
        # Reset the gallery and reveal the progress bar, then stream
        # updates from the detection pipeline generator.
        yield (
            gr.update(value=[], visible=False),
            "Processing started...",
            gr.update(value=_make_progress_bar(0), visible=True),
        )
        for clips, msg, bar in run_with_progress(video_file):
            yield (
                gr.update(value=clips, visible=bool(clips)),
                msg,
                gr.update(value=bar, visible=True),
            )

    run_btn.click(
        fn=wrapped_run,
        inputs=in_video,
        outputs=[gallery, status, progress_html],
    )

if __name__ == "__main__":
    demo.launch(debug=True)
 
17
  # -*- coding: utf-8 -*-
18
  # Fencing Scoreboard Clips - YOLO x AutoGluon (Gradio)
19
 
 
 
 
20
  import os, sys, zipfile, shutil, subprocess, tempfile, pathlib
21
  from typing import List, Tuple
22
  import uuid
 
26
  import cv2
27
  import gradio as gr
28
 
29
+ # ----------------
30
+ # Flags
31
+ # ----------------
32
+ DEBUG_SAVE_FRAMES = False # disable debug frames by default
33
+
34
+ # ----------------
35
+ # Utility
36
+ # ----------------
37
  def _pip(pkgs: List[str]):
38
  import subprocess, sys
39
  subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", *pkgs])
 
87
  CACHE_DIR.mkdir(parents=True, exist_ok=True)
88
 
89
  DEBUG_DIR = pathlib.Path("debug_frames")
90
+ if DEBUG_DIR.exists():
91
+ shutil.rmtree(DEBUG_DIR) # wipe old debug frames at startup
92
  DEBUG_DIR.mkdir(exist_ok=True)
93
 
94
  # ----------------
 
105
  shutil.rmtree(extract_dir)
106
  with zipfile.ZipFile(z, "r") as zip_ref:
107
  zip_ref.extractall(extract_dir)
108
+ return TabularPredictor.load(
109
+ str(extract_dir),
110
+ require_py_version_match=False,
111
+ require_version_match=False
112
+ )
113
 
114
  _YOLO = None
115
  _AG_PRED = None
 
172
  if x2 > x1 and y2 > y1:
173
  gray[y1:y2, x1:x2] = frame_bgr[y1:y2, x1:x2]
174
 
175
+ if DEBUG_SAVE_FRAMES and debug and frame_id is not None:
176
  dbg = gray.copy()
177
  if chosen_box is not None:
178
  x1, y1, x2, y2 = [int(round(v)) for v in chosen_box]
179
  cv2.rectangle(dbg, (x1,y1), (x2,y2), (0,255,0), 2)
180
  out_path = DEBUG_DIR / f"frame_{frame_id:06d}.jpg"
181
  cv2.imwrite(str(out_path), dbg)
 
182
 
183
  return gray
184
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
  # ----------------------------
186
  # Clip helpers
187
  # ----------------------------
 
 
 
 
 
 
 
 
 
188
  def cut_clip(video_path: str, start: float, end: float, out_path: str) -> str:
189
  try:
190
  cmd = ["ffmpeg", "-y", "-ss", str(max(0, start)), "-to", str(max(start, end)),
 
200
  return out_path
201
 
202
  # ----------------------------
203
+ # Orchestrator (with cleanup)
204
  # ----------------------------
205
+ def extract_score_clips(video_path: str, debug: bool = False):
206
  print("[INFO] Running full detection pipeline...")
207
+ from moviepy.editor import VideoFileClip
208
  df, fps = extract_feature_timeseries(video_path, frame_skip=FRAME_SKIP, debug=debug)
209
  if df.empty:
210
  return [], "No frames processed."
211
 
212
  score = predict_scores(df)
213
  if score.max() <= 1e-6:
214
+ return [], "⚠️ No scoreboard detected or illumination scores flat."
 
215
 
216
  events = pick_events(df, score, fps)
217
  if not events:
 
221
  if duration <= 0:
222
  duration = float(df["timestamp"].max() + CLIP_PAD_S + 0.5)
223
 
224
+ clips, kept_paths = [], []
225
  base = os.path.splitext(os.path.basename(video_path))[0]
226
  for i, t in enumerate(events):
227
+ s, e = t - CLIP_PAD_S, t + CLIP_PAD_S
 
228
  if s < 0:
229
  e = min(duration, e - s)
230
  s = 0
 
233
  e = duration
234
  clip_path = os.path.join(tempfile.gettempdir(), f"{base}_score_{i+1:02d}.mp4")
235
  cut_clip(video_path, s, e, clip_path)
236
+ clips.append((clip_path, f"Touch {i+1} @ {t:.2f}s"))
237
+ kept_paths.append(clip_path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
238
 
239
+ # cleanup: delete unused temp clips
240
+ for f in pathlib.Path(tempfile.gettempdir()).glob(f"{base}_score_*.mp4"):
241
+ if str(f) not in kept_paths:
242
+ try: f.unlink()
243
+ except: pass
 
 
 
244
 
245
+ return clips, f"✅ Detected {len(clips)} event(s)."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
246