# Fencing Scoreboard Clips - YOLO x AutoGluon (Gradio)
# ----------------------------------------------------
# Goal (AGENTS): Build a cohesive app that: upload video -> frame timestamps ->
# YOLO scoreboard detect + gray-mask background -> color feature timeseries ->
# AutoGluon Tabular detector -> multi-event 4s clips in a Gradio gallery.
#
# Plan (AGENTS):
# 1) Load YOLO weights from HF Hub; load AutoGluon Tabular predictor from HF Hub.
# 2) For each (skipped) frame: YOLO infer -> gray-mask non-scoreboard parts
#    (keep color inside any bbox with conf>=0.85), then compute red/green features.
# 3) Roll features to add z-scores/diffs. Predict with AG Tabular.
# 4) Find local events with persistence + spacing; group & cut (-2s, +2s).
# 5) Gradio UI: video in -> gallery of clips + status text out.

import os, cv2, zipfile, shutil, tempfile, subprocess, pathlib
import numpy as np, pandas as pd
from typing import List, Tuple
import gradio as gr

# --- Patch for FastAI/AutoGluon deserialization ---
import fastai.tabular.core as ftc
from fastai.data.load import _FakeLoader, DataLoader


class TabWeightedDL(DataLoader):
    "Compatibility patch for missing dataloader class in old AutoGluon FastAI models."

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


# Register the shim so pickled predictors that reference ftc.TabWeightedDL resolve.
ftc.TabWeightedDL = TabWeightedDL

# =====================================================
# Configuration
# =====================================================
YOLO_REPO_ID = "mastefan/fencing-scoreboard-yolov8"
YOLO_FILENAME = "best.pt"
AG_REPO_ID = "emkessle/2024-24679-fencing-touch-predictor"
AG_ZIP_NAME = "autogluon_predictor_dir.zip"
FRAME_SKIP = 2        # analyze every Nth frame (adapted to video length at runtime)
KEEP_CONF = 0.85      # bbox confidence required to keep the scoreboard in color
YOLO_CONF = 0.30      # YOLO inference confidence threshold
YOLO_IOU = 0.50       # YOLO NMS IoU threshold
CLIP_PAD_S = 2.0      # seconds of padding on each side of a detected touch
MIN_SEP_S = 1.2       # minimum separation between event peaks
GROUP_GAP_S = 1.5     # events closer than this collapse into one clip
DEBUG_MODE = False    # set True to save debug images/CSVs


# =====================================================
# Dependency setup
# =====================================================
def _pip(pkgs):
    """Install packages into the current interpreter (quiet)."""
    import subprocess, sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", *pkgs])


# Bootstrap heavy deps on first run. Narrowed from bare `except:` so that
# KeyboardInterrupt/SystemExit are not swallowed during import.
try:
    from ultralytics import YOLO
except ImportError:
    _pip(["ultralytics"]); from ultralytics import YOLO
try:
    from autogluon.tabular import TabularPredictor
except ImportError:
    _pip(["autogluon.tabular"]); from autogluon.tabular import TabularPredictor
try:
    from huggingface_hub import hf_hub_download
except ImportError:
    _pip(["huggingface_hub"]); from huggingface_hub import hf_hub_download

# =====================================================
# Model loading
# =====================================================
CACHE_DIR = pathlib.Path("hf_assets"); CACHE_DIR.mkdir(exist_ok=True)


def load_yolo_from_hub():
    """Download YOLO weights from the HF Hub and return a ready model."""
    w = hf_hub_download(repo_id=YOLO_REPO_ID, filename=YOLO_FILENAME, cache_dir=CACHE_DIR)
    print(f"[INFO] Loaded YOLO weights from {w}")
    return YOLO(w)


def load_autogluon_tabular_from_hub():
    """Download and load AutoGluon predictor, removing any FastAI submodels."""
    z = hf_hub_download(repo_id=AG_REPO_ID, filename=AG_ZIP_NAME, cache_dir=CACHE_DIR)
    extract_dir = CACHE_DIR / "ag_predictor_native"
    if extract_dir.exists():
        shutil.rmtree(extract_dir)
    with zipfile.ZipFile(z, "r") as zip_ref:
        zip_ref.extractall(extract_dir)
    # --- delete fastai models before loading to avoid deserialization errors ---
    fastai_dirs = list(extract_dir.rglob("*fastai*"))
    for p in fastai_dirs:
        try:
            if p.is_dir():
                shutil.rmtree(p)
            else:
                p.unlink()
        except Exception as e:
            print(f"[WARN] Could not remove {p}: {e}")
    print(f"[CLEANUP] Removed {len(fastai_dirs)} FastAI model files.")
    # Now load normally (no version check)
    from autogluon.tabular import TabularPredictor
    predictor = TabularPredictor.load(str(extract_dir), require_py_version_match=False)
    print(f"[INFO] Loaded AutoGluon predictor from {extract_dir}")
    return predictor


# Lazily-initialized singletons so models download only on first use.
_YOLO = None
_AGP = None


def yolo():
    # lazy load
    global _YOLO
    if _YOLO is None:
        _YOLO = load_yolo_from_hub()
    return _YOLO


def ag_predictor():
    # lazy load
    global _AGP
    if _AGP is None:
        _AGP = load_autogluon_tabular_from_hub()
    return _AGP


# =====================================================
# Image + feature utilities
# =====================================================
DEBUG_DIR = pathlib.Path("debug_frames"); DEBUG_DIR.mkdir(exist_ok=True)


def isolate_scoreboard_color(frame_bgr: np.ndarray,
                             conf: float = YOLO_CONF,
                             iou: float = YOLO_IOU,
                             keep_conf: float = KEEP_CONF,
                             debug: bool = False,
                             frame_id: int = None) -> np.ndarray:
    """
    Gray-mask everything except the detected scoreboard.

    - Choose the largest bbox among candidates meeting confidence.
    - Primary threshold: >= max(0.85, keep_conf)
    - Fallback threshold: >= max(0.70, primary - 0.03)
    - Entire chosen bbox is restored to color; everything else is grayscale.
    - Rejects low-saturation ROIs (flat/neutral areas).
    """
    H, W = frame_bgr.shape[:2]

    # Start fully grayscale (3-channel so we can paste color back in)
    gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    gray = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)

    primary_thr = max(0.85, keep_conf)
    fallback_thr = max(0.70, primary_thr - 0.03)
    chosen_box = None

    res = yolo().predict(frame_bgr, conf=conf, iou=iou, verbose=False)
    if len(res):
        r = res[0]
        if getattr(r, "boxes", None) is not None and len(r.boxes) > 0:
            boxes = r.boxes.xyxy.cpu().numpy()
            scores = r.boxes.conf.cpu().numpy()
            candidates = list(zip(boxes, scores))
            # Prefer largest box meeting primary threshold
            strong = [(b, s) for (b, s) in candidates if float(s) >= primary_thr]
            if strong:
                chosen_box, _ = max(strong, key=lambda bs: (bs[0][2]-bs[0][0]) * (bs[0][3]-bs[0][1]))
            else:
                # Fallback: largest box meeting fallback threshold
                medium = [(b, s) for (b, s) in candidates if float(s) >= fallback_thr]
                if medium:
                    chosen_box, _ = max(medium, key=lambda bs: (bs[0][2]-bs[0][0]) * (bs[0][3]-bs[0][1]))

    if chosen_box is not None:
        x1, y1, x2, y2 = [int(round(v)) for v in chosen_box]
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(W-1, x2), min(H-1, y2)
        if x2 > x1 and y2 > y1:
            # Single safeguard: reject very low-saturation ROIs
            roi_color = frame_bgr[y1:y2, x1:x2]
            if roi_color.size > 0:
                hsv = cv2.cvtColor(roi_color, cv2.COLOR_BGR2HSV)
                sat_mean = hsv[:, :, 1].mean()
                if sat_mean < 30:
                    print(f"[WARN] Rejected bbox due to low saturation (mean={sat_mean:.1f})")
                    chosen_box = None
            # If accepted, restore whole bbox to color
            if chosen_box is not None:
                gray[y1:y2, x1:x2] = frame_bgr[y1:y2, x1:x2]

    # Optional debug save
    if debug and frame_id is not None:
        dbg = gray.copy()
        if chosen_box is not None:
            x1, y1, x2, y2 = [int(round(v)) for v in chosen_box]
            cv2.rectangle(dbg, (x1, y1), (x2, y2), (0, 255, 0), 2)
        out_path = DEBUG_DIR / f"frame_{frame_id:06d}.jpg"
        cv2.imwrite(str(out_path), dbg)
        print(f"[DEBUG] Saved debug frame → {out_path}")

    return gray


def _count_color_pixels(rgb, ch):
    """Count pixels dominated by red (ch==0) or green (ch!=0)."""
    R, G, B = rgb[:, :, 0], rgb[:, :, 1], rgb[:, :, 2]
    if ch == 0:
        mask = (R > 150) & (R > 1.2*G) & (R > 1.2*B)
    else:
        mask = (G > 100) & (G > 1.05*R) & (G > 1.05*B)
    return int(np.sum(mask))


def color_pixel_ratio(rgb, ch):
    """Fraction of the image that is red/green; epsilon avoids div-by-zero."""
    return _count_color_pixels(rgb, ch) / (rgb.shape[0]*rgb.shape[1] + 1e-9)


def rolling_z(series, win=40):
    """Robust rolling z-score using the median/MAD instead of mean/std."""
    med = series.rolling(win, min_periods=5).median()
    mad = series.rolling(win, min_periods=5).apply(
        lambda x: np.median(np.abs(x - np.median(x))), raw=True)
    # Replace zero MAD with the smallest positive MAD (or 1.0) to keep z finite.
    mad = mad.replace(0, mad[mad > 0].min() if (mad > 0).any() else 1.0)
    return (series - med) / mad


# =====================================================
# Video feature extraction
# =====================================================
def extract_feature_timeseries(video_path: str, frame_skip: int = FRAME_SKIP,
                               debug: bool = DEBUG_MODE):
    """Read the video and return (feature DataFrame, fps)."""
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return pd.DataFrame(), 0.0
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 1
    print(f"[INFO] Reading {total} frames @ {fps:.2f}fps ...")

    # BUGFIX: widest threshold must be checked first — the original tested
    # `total > 1200` before `total > 2400`, making frame_skip=6 unreachable.
    if total > 2400:
        frame_skip = 6
    elif total > 1200:
        frame_skip = 4
    else:
        frame_skip = FRAME_SKIP

    rec, idx = [], 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if idx % frame_skip == 0:
            ts = idx / fps
            masked = isolate_scoreboard_color(frame, debug=debug, frame_id=idx)
            rgb = cv2.cvtColor(masked, cv2.COLOR_BGR2RGB)
            rec.append({
                "frame_id": idx, "timestamp": ts,
                "red_ratio": color_pixel_ratio(rgb, 0),
                "green_ratio": color_pixel_ratio(rgb, 1)
            })
        idx += 1
    cap.release()

    df = pd.DataFrame(rec)
    if df.empty:
        return df, fps
    df["red_diff"] = df["red_ratio"].diff().fillna(0)
    df["green_diff"] = df["green_ratio"].diff().fillna(0)
    df["z_red"] = rolling_z(df["red_ratio"])
    df["z_green"] = rolling_z(df["green_ratio"])
    print(f"[INFO] Extracted {len(df)} processed frames.")
    return df, fps


# =====================================================
# Predictor & event logic
# =====================================================
def predict_scores(df):
    """Predict illumination likelihoods using AutoGluon regression ensemble."""
    feats = ["red_ratio", "green_ratio", "red_diff", "green_diff", "z_red", "z_green"]
    X = df[feats].copy()
    ag = ag_predictor()

    # Get model list (older AG versions have .model_names(), newer have .model_names)
    try:
        models_all = ag.model_names()
    except Exception:
        models_all = ag.model_names
    print(f"[INFO] Evaluating models: {models_all}")

    preds_total = []
    for m in models_all:
        try:
            print(f"[INFO] → Predicting with {m}")
            # NOTE(review): relies on the private _learner API to score per-submodel.
            y_pred = ag._learner.predict(X, model=m)
            preds_total.append(pd.Series(y_pred, name=m))
        except Exception as e:
            print(f"[WARN] Skipping model {m}: {e}")

    if not preds_total:
        print("[ERROR] No usable models, returning zeros.")
        return pd.Series(np.zeros(len(df)))

    # Average predictions from working models
    y_mean = pd.concat(preds_total, axis=1).mean(axis=1)
    # Normalize 0–1 for consistency (robust range via 5th/95th percentiles)
    rng = (y_mean.quantile(0.95) - y_mean.quantile(0.05)) or 1.0
    score = ((y_mean - y_mean.quantile(0.05)) / rng).clip(0, 1)
    print(f"[INFO] Used {len(preds_total)} valid submodels for regression scoring.")
    return score


def pick_events(df, score, fps):
    """Find well-separated local peaks in the score series; return timestamps."""
    z = rolling_z(score, 35)
    strong = (z > 4.0)
    # persistence: at least 2 of the last 3 samples must be strong
    keep = strong.rolling(3, min_periods=1).sum() >= 2
    min_dist = max(1, int(MIN_SEP_S * fps))
    y = score.values
    out = []
    last = -min_dist
    for i in range(1, len(y) - 1):
        if keep.iloc[i] and y[i] > y[i-1] and y[i] > y[i+1] and (i - last) >= min_dist:
            out.append(float(df.iloc[i]["timestamp"]))
            last = i
    # Fallback: if nothing crossed the threshold, take the global maximum.
    if not out and len(y) > 0:
        out = [float(df.iloc[int(np.argmax(y))]["timestamp"])]
    # Merge events closer than GROUP_GAP_S into one.
    grouped = []
    for t in sorted(out):
        if (not grouped) or (t - grouped[-1]) > GROUP_GAP_S:
            grouped.append(t)
    return grouped


# =====================================================
# Clip utilities
# =====================================================
def _probe_duration(video_path):
    """Best-effort duration via ffmpeg-python probe; 0.0 on any failure."""
    try:
        import ffmpeg
        meta = ffmpeg.probe(video_path)
        return float(meta["format"]["duration"])
    except Exception:
        return 0.0


def cut_clip(video_path, start, end, out_path):
    """Cut [start, end] via stream-copy ffmpeg; fall back to moviepy re-encode."""
    try:
        cmd = ["ffmpeg", "-y", "-ss", str(start), "-to", str(end),
               "-i", video_path, "-c", "copy", out_path]
        sp = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if sp.returncode == 0 and os.path.exists(out_path):
            return out_path
    except Exception:
        pass  # ffmpeg missing or failed — fall through to moviepy
    from moviepy.editor import VideoFileClip
    clip = VideoFileClip(video_path).subclip(start, end)
    clip.write_videofile(out_path, codec="libx264", audio_codec="aac",
                         verbose=False, logger=None)
    return out_path


def extract_score_clips(video_path: str, debug: bool = DEBUG_MODE):
    """Full pipeline: features -> scores -> events -> padded clips."""
    df, fps = extract_feature_timeseries(video_path, FRAME_SKIP, debug)
    if df.empty:
        return [], "No frames processed."
    score = predict_scores(df)
    events = pick_events(df, score, fps)
    print(f"[INFO] Detected {len(events)} potential events: {events}")
    dur = _probe_duration(video_path) or float(df["timestamp"].max() + CLIP_PAD_S + 0.5)
    out = []
    base = os.path.splitext(os.path.basename(video_path))[0]
    for i, t in enumerate(events):
        s = max(0, t - CLIP_PAD_S)
        e = min(dur, t + CLIP_PAD_S)
        tmp = os.path.join(tempfile.gettempdir(), f"{base}_score_{i+1:02d}.mp4")
        print(f"[INFO] Cutting clip {i+1}: {s:.2f}s→{e:.2f}s")
        cut_clip(video_path, s, e, tmp)
        out.append((tmp, f"Touch {i+1} @ {t:.2f}s"))
    return out, f"✅ Detected {len(out)} event(s)."


# =====================================================
# Progress GUI helpers
# =====================================================
CSS = """
.gradio-container {max-width:900px;margin:auto;}
.full-width{width:100%!important;}
.progress-bar{width:100%;height:30px;background:#e0e0e0;border-radius:15px;margin:15px 0;position:relative;overflow:hidden;}
.progress-fill{height:100%;background:#4CAF50;border-radius:15px;text-align:center;line-height:30px;color:white;font-weight:bold;transition:width .3s;}
.fencer{position:absolute;top:-5px;font-size:24px;transition:left .3s;transform:scaleX(-1);}
"""


def _make_progress_bar(percent: int, final_text: str = None, label: str = ""):
    text = f"{percent}%" if not final_text else final_text
    # NOTE(review): the f-string body appears truncated in the source — it
    # returns only a newline; the HTML template using `text`/`percent`/`label`
    # seems to be missing. Preserved as-is; confirm against the original file.
    return f"""
"""


def run_with_progress(video_file):
    """Generator yielding (clips, status message, progress HTML) for Gradio."""
    if not video_file:
        yield [], "Please upload a video.", _make_progress_bar(0)
        return
    print("[GUI] Starting processing...")
    yield [], "🔄 Extracting frames...", _make_progress_bar(20, "", "Pipeline")
    df, fps = extract_feature_timeseries(video_file, FRAME_SKIP, DEBUG_MODE)
    if df.empty:
        yield [], "❌ No frames processed!", _make_progress_bar(100, "No Frames ❌", "Pipeline")
        return
    yield [], "🔄 YOLO masking...", _make_progress_bar(40, "", "Pipeline")
    yield [], "🔄 Feature analysis...", _make_progress_bar(60, "", "Pipeline")
    yield [], "🔄 Scoring...", _make_progress_bar(80, "", "Pipeline")
    clips, msg = extract_score_clips(video_file, DEBUG_MODE)
    final = _make_progress_bar(100, f"Detected {len(clips)} Touches ⚡", "Pipeline")
    print("[GUI] Finished.")
    yield clips, msg, final


# =====================================================
# Gradio interface
# =====================================================
with gr.Blocks(css=CSS, title="Fencing Scoreboard Detector") as demo:
    gr.Markdown("## 🤺 Fencing Score Detector\nUpload a fencing bout video and automatically detect scoreboard lights using YOLO + AutoGluon.")
    in_video = gr.Video(label="Upload Bout Video", elem_classes="full-width", height=400)
    run_btn = gr.Button("⚡ Detect Touches", elem_classes="full-width")
    progress_html = gr.HTML(value="", label="Progress", visible=False)
    status = gr.Markdown("Ready.")
    gallery = gr.Gallery(label="Detected Clips", columns=1, height=400, visible=False)

    def wrapped_run(video_file):
        """Wrap the pipeline generator, toggling component visibility."""
        print("[SYSTEM] User started detection.")
        yield [], "Processing started...", gr.update(value=_make_progress_bar(0), visible=True)
        for clips, msg, bar in run_with_progress(video_file):
            print(f"[SYSTEM] {msg}")
            yield gr.update(value=clips, visible=bool(clips)), msg, gr.update(value=bar, visible=True)

    run_btn.click(fn=wrapped_run, inputs=in_video, outputs=[gallery, status, progress_html])

if __name__ == "__main__":
    demo.launch(debug=True)