# Fencing Scoreboard Clips - YOLO x AutoGluon (Gradio)
# ----------------------------------------------------
# Goal (AGENTS): Build a cohesive app that: upload video -> frame timestamps ->
# YOLO scoreboard detect + gray-mask background -> color feature timeseries ->
# AutoGluon Tabular detector -> multi-event 4s clips in a Gradio gallery.
#
# Plan (AGENTS):
# 1) Load YOLO weights from HF Hub; load AutoGluon Tabular predictor from HF Hub.
# 2) For each (skipped) frame: YOLO infer -> gray-mask non-scoreboard parts
#    (keep color inside any bbox with conf>=0.85), then compute red/green features.
# 3) Roll features to add z-scores/diffs. Predict with AG Tabular.
# 4) Find local events with persistence + spacing; group & cut (-2s, +2s).
# 5) Gradio UI: video in -> gallery of clips + status text out.
| import os, cv2, zipfile, shutil, tempfile, subprocess, pathlib | |
| import numpy as np, pandas as pd | |
| from typing import List, Tuple | |
| import gradio as gr | |
| # --- Patch for FastAI/AutoGluon deserialization --- | |
| import fastai.tabular.core as ftc | |
| from fastai.data.load import _FakeLoader, DataLoader | |
class TabWeightedDL(DataLoader):
    "Compatibility patch for missing dataloader class in old AutoGluon FastAI models."
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
# Register the shim under the module path that old fastai pickles reference,
# so unpickling resolves ftc.TabWeightedDL instead of failing.
ftc.TabWeightedDL = TabWeightedDL
# =====================================================
# Configuration
# =====================================================
YOLO_REPO_ID = "mastefan/fencing-scoreboard-yolov8"  # HF Hub repo holding YOLO weights
YOLO_FILENAME = "best.pt"                            # weights file inside that repo
AG_REPO_ID = "emkessle/2024-24679-fencing-touch-predictor"  # HF Hub repo with AutoGluon model
AG_ZIP_NAME = "autogluon_predictor_dir.zip"          # zipped TabularPredictor directory
FRAME_SKIP = 2    # analyze every Nth frame (raised adaptively for long videos)
KEEP_CONF = 0.85  # min YOLO confidence for a scoreboard bbox to stay in color
YOLO_CONF = 0.30  # YOLO inference confidence threshold
YOLO_IOU = 0.50   # YOLO inference NMS IoU threshold
CLIP_PAD_S = 2.0  # seconds of padding before/after each detected event
MIN_SEP_S = 1.2   # minimum spacing between event peaks (seconds)
GROUP_GAP_S = 1.5 # events closer than this collapse into one clip
DEBUG_MODE = False # set True to save debug images/CSVs
# =====================================================
# Dependency setup
# =====================================================
def _pip(pkgs):
    """Install *pkgs* into the current interpreter with pip (quiet mode)."""
    import subprocess, sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", *pkgs])

# Install-on-demand imports. Catch ImportError only: the original bare
# `except:` clauses would also swallow KeyboardInterrupt/SystemExit.
try:
    from ultralytics import YOLO
except ImportError:
    _pip(["ultralytics"])
    from ultralytics import YOLO
try:
    from autogluon.tabular import TabularPredictor
except ImportError:
    _pip(["autogluon.tabular"])
    from autogluon.tabular import TabularPredictor
try:
    from huggingface_hub import hf_hub_download
except ImportError:
    _pip(["huggingface_hub"])
    from huggingface_hub import hf_hub_download
# =====================================================
# Model loading
# =====================================================
# Local cache directory for artifacts downloaded from the Hugging Face Hub.
CACHE_DIR = pathlib.Path("hf_assets"); CACHE_DIR.mkdir(exist_ok=True)
def load_yolo_from_hub():
    """Download the scoreboard-detector weights from HF Hub and build a YOLO model."""
    weights_path = hf_hub_download(
        repo_id=YOLO_REPO_ID, filename=YOLO_FILENAME, cache_dir=CACHE_DIR
    )
    print(f"[INFO] Loaded YOLO weights from {weights_path}")
    return YOLO(weights_path)
def load_autogluon_tabular_from_hub():
    """Download, unzip, and load the AutoGluon TabularPredictor from HF Hub.

    FastAI submodels inside the predictor directory are deleted before loading
    because their pickles reference dataloader classes that may no longer
    exist, which breaks deserialization.

    Returns:
        The loaded ``TabularPredictor``.
    """
    z = hf_hub_download(repo_id=AG_REPO_ID, filename=AG_ZIP_NAME, cache_dir=CACHE_DIR)
    extract_dir = CACHE_DIR / "ag_predictor_native"
    # Start from a clean directory so stale files from a prior run can't mix in.
    if extract_dir.exists():
        shutil.rmtree(extract_dir)
    with zipfile.ZipFile(z, "r") as zip_ref:
        zip_ref.extractall(extract_dir)
    # --- delete fastai models before loading to avoid deserialization errors ---
    # Materialize the glob first: we mutate the tree while walking it.
    fastai_paths = list(extract_dir.rglob("*fastai*"))
    removed = 0
    for p in fastai_paths:
        try:
            if p.is_dir():
                shutil.rmtree(p)
            else:
                p.unlink()
            removed += 1
        except OSError as e:
            print(f"[WARN] Could not remove {p}: {e}")
    # Report only successful removals (the old code counted failed attempts too).
    print(f"[CLEANUP] Removed {removed} FastAI model files.")
    # Now load normally (no version check)
    from autogluon.tabular import TabularPredictor
    predictor = TabularPredictor.load(str(extract_dir), require_py_version_match=False)
    print(f"[INFO] Loaded AutoGluon predictor from {extract_dir}")
    return predictor
# Lazily-built singletons: models are loaded on first use, not at import time.
_YOLO = None
_AGP = None


def yolo():
    """Return the shared YOLO model, loading it on the first call."""
    global _YOLO
    if _YOLO is None:
        _YOLO = load_yolo_from_hub()
    return _YOLO


def ag_predictor():
    """Return the shared AutoGluon predictor, loading it on the first call."""
    global _AGP
    if _AGP is None:
        _AGP = load_autogluon_tabular_from_hub()
    return _AGP
# =====================================================
# Image + feature utilities
# =====================================================
# Debug-annotated frames are written here by isolate_scoreboard_color.
DEBUG_DIR = pathlib.Path("debug_frames"); DEBUG_DIR.mkdir(exist_ok=True)
def isolate_scoreboard_color(frame_bgr: np.ndarray,
                             conf: float = YOLO_CONF,
                             iou: float = YOLO_IOU,
                             keep_conf: float = KEEP_CONF,
                             debug: bool = False,
                             frame_id: int = None) -> np.ndarray:
    """
    Gray-mask everything in the frame except the detected scoreboard.

    Runs YOLO on the BGR frame, chooses the largest bbox whose confidence
    meets the primary threshold max(0.85, keep_conf) — falling back to
    max(0.70, primary - 0.03) — converts the whole frame to grayscale, then
    restores the chosen bbox region to full color. Boxes whose mean HSV
    saturation is below 30 are rejected as flat/neutral areas.

    Returns a 3-channel BGR image with the same shape as frame_bgr.
    """
    H, W = frame_bgr.shape[:2]
    # Start fully grayscale (re-expanded to 3 channels so the ROI can be pasted back)
    gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    gray = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
    primary_thr = max(0.85, keep_conf)
    fallback_thr = max(0.70, primary_thr - 0.03)
    chosen_box = None
    res = yolo().predict(frame_bgr, conf=conf, iou=iou, verbose=False)
    if len(res):
        r = res[0]
        if getattr(r, "boxes", None) is not None and len(r.boxes) > 0:
            boxes = r.boxes.xyxy.cpu().numpy()
            scores = r.boxes.conf.cpu().numpy()
            candidates = list(zip(boxes, scores))
            # Prefer largest box meeting primary threshold
            strong = [(b, s) for (b, s) in candidates if float(s) >= primary_thr]
            if strong:
                chosen_box, _ = max(strong, key=lambda bs: (bs[0][2]-bs[0][0]) * (bs[0][3]-bs[0][1]))
            else:
                # Fallback: largest box meeting fallback threshold
                medium = [(b, s) for (b, s) in candidates if float(s) >= fallback_thr]
                if medium:
                    chosen_box, _ = max(medium, key=lambda bs: (bs[0][2]-bs[0][0]) * (bs[0][3]-bs[0][1]))
    if chosen_box is not None:
        x1, y1, x2, y2 = [int(round(v)) for v in chosen_box]
        # Clamp to image bounds. NOTE(review): clamping to W-1/H-1 drops the last
        # pixel row/column from the slices below — confirm whether intended.
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(W-1, x2), min(H-1, y2)
        if x2 > x1 and y2 > y1:
            # Single safeguard: reject very low-saturation ROIs
            roi_color = frame_bgr[y1:y2, x1:x2]
            if roi_color.size > 0:
                hsv = cv2.cvtColor(roi_color, cv2.COLOR_BGR2HSV)
                sat_mean = hsv[:, :, 1].mean()
                if sat_mean < 30:
                    print(f"[WARN] Rejected bbox due to low saturation (mean={sat_mean:.1f})")
                    chosen_box = None
            # If accepted, restore whole bbox to color
            if chosen_box is not None:
                gray[y1:y2, x1:x2] = frame_bgr[y1:y2, x1:x2]
    # Optional debug save (drawn on a copy so the returned frame stays clean)
    if debug and frame_id is not None:
        dbg = gray.copy()
        if chosen_box is not None:
            x1, y1, x2, y2 = [int(round(v)) for v in chosen_box]
            cv2.rectangle(dbg, (x1, y1), (x2, y2), (0, 255, 0), 2)
        out_path = DEBUG_DIR / f"frame_{frame_id:06d}.jpg"
        cv2.imwrite(str(out_path), dbg)
        print(f"[DEBUG] Saved debug frame → {out_path}")
    return gray
| def _count_color_pixels(rgb, ch): | |
| R, G, B = rgb[:,:,0], rgb[:,:,1], rgb[:,:,2] | |
| if ch==0: mask=(R>150)&(R>1.2*G)&(R>1.2*B) | |
| else: mask=(G>100)&(G>1.05*R)&(G>1.05*B) | |
| return int(np.sum(mask)) | |
| def color_pixel_ratio(rgb,ch): return _count_color_pixels(rgb,ch)/(rgb.shape[0]*rgb.shape[1]+1e-9) | |
def rolling_z(series, win=40):
    """Robust rolling z-score: (x - rolling median) / rolling MAD.

    Windows need at least 5 samples; zero MADs are replaced with the smallest
    positive MAD (or 1.0 if there is none) so the division never explodes on
    flat stretches.
    """
    def _mad(window):
        center = np.median(window)
        return np.median(np.abs(window - center))

    rolled = series.rolling(win, min_periods=5)
    center = rolled.median()
    mad = rolled.apply(_mad, raw=True)
    positive = mad[mad > 0]
    fill = positive.min() if not positive.empty else 1.0
    mad = mad.replace(0, fill)
    return (series - center) / mad
# =====================================================
# Video feature extraction
# =====================================================
def extract_feature_timeseries(video_path: str, frame_skip: int = FRAME_SKIP, debug: bool = DEBUG_MODE):
    """Read a video and build a per-sampled-frame color-feature DataFrame.

    Every ``frame_skip``-th frame is scoreboard-masked and reduced to red/green
    pixel ratios; diffs and robust z-scores are appended for the predictor.

    Returns:
        (df, fps): df has columns frame_id, timestamp, red_ratio, green_ratio,
        red_diff, green_diff, z_red, z_green. df is empty if the video can't
        be opened or yields no frames.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return pd.DataFrame(), 0.0
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 1
    print(f"[INFO] Reading {total} frames @ {fps:.2f}fps ...")
    # Adaptive sampling: longer videos get a larger skip. BUGFIX: the largest
    # threshold must be tested first — the old code checked `> 1200` before
    # `> 2400`, which made the skip-6 branch unreachable.
    if total > 2400:
        frame_skip = 6
    elif total > 1200:
        frame_skip = 4
    else:
        frame_skip = FRAME_SKIP
    rec, idx = [], 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if idx % frame_skip == 0:
            ts = idx / fps
            masked = isolate_scoreboard_color(frame, debug=debug, frame_id=idx)
            rgb = cv2.cvtColor(masked, cv2.COLOR_BGR2RGB)
            rec.append({
                "frame_id": idx, "timestamp": ts,
                "red_ratio": color_pixel_ratio(rgb, 0),
                "green_ratio": color_pixel_ratio(rgb, 1),
            })
        idx += 1
    cap.release()
    df = pd.DataFrame(rec)
    if df.empty:
        return df, fps
    # Derived features consumed by the AutoGluon predictor.
    df["red_diff"] = df["red_ratio"].diff().fillna(0)
    df["green_diff"] = df["green_ratio"].diff().fillna(0)
    df["z_red"] = rolling_z(df["red_ratio"])
    df["z_green"] = rolling_z(df["green_ratio"])
    print(f"[INFO] Extracted {len(df)} processed frames.")
    return df, fps
# =====================================================
# Predictor & event logic
# =====================================================
def predict_scores(df):
    """Predict illumination likelihoods using AutoGluon regression ensemble.

    Averages per-model predictions (skipping models that fail) and rescales
    the mean to [0, 1] using the 5th/95th percentiles. Returns a Series of
    zeros if no submodel is usable.
    """
    feats = ["red_ratio", "green_ratio", "red_diff", "green_diff", "z_red", "z_green"]
    X = df[feats].copy()
    ag = ag_predictor()
    # Get model list (older AG versions have .model_names(), newer have .model_names)
    try:
        models_all = ag.model_names()
    except Exception:
        models_all = ag.model_names
    print(f"[INFO] Evaluating models: {models_all}")
    preds_total = []
    for m in models_all:
        try:
            print(f"[INFO] → Predicting with {m}")
            # NOTE(review): uses the private _learner API to address individual
            # submodels — may break across AutoGluon versions; verify on upgrade.
            y_pred = ag._learner.predict(X, model=m)
            preds_total.append(pd.Series(y_pred, name=m))
        except Exception as e:
            print(f"[WARN] Skipping model {m}: {e}")
    if not preds_total:
        print("[ERROR] No usable models, returning zeros.")
        return pd.Series(np.zeros(len(df)))
    # Average predictions from working models
    y_mean = pd.concat(preds_total, axis=1).mean(axis=1)
    # Normalize 0–1 for consistency (a zero inter-quantile range falls back to 1.0)
    rng = (y_mean.quantile(0.95) - y_mean.quantile(0.05)) or 1.0
    score = ((y_mean - y_mean.quantile(0.05)) / rng).clip(0, 1)
    print(f"[INFO] Used {len(preds_total)} valid submodels for regression scoring.")
    return score
def pick_events(df, score, fps):
    """Pick event timestamps from the score series.

    An event is a local maximum whose robust z-score stayed above 4.0 for at
    least 2 of the last 3 samples, spaced >= MIN_SEP_S apart; nearby events
    (within GROUP_GAP_S) are merged. Falls back to the global maximum if
    nothing qualifies.
    """
    z = rolling_z(score, 35)
    strong = z > 4.0
    # Persistence filter: the strong condition must hold in >=2 of 3 samples.
    keep = strong.rolling(3, min_periods=1).sum() >= 2
    min_dist = max(1, int(MIN_SEP_S * fps))
    values = score.values
    peaks = []
    last_peak = -min_dist
    for i in range(1, len(values) - 1):
        is_local_max = values[i] > values[i - 1] and values[i] > values[i + 1]
        if keep.iloc[i] and is_local_max and (i - last_peak) >= min_dist:
            peaks.append(float(df.iloc[i]["timestamp"]))
            last_peak = i
    if not peaks and len(values) > 0:
        peaks = [float(df.iloc[int(np.argmax(values))]["timestamp"])]
    grouped = []
    for t in sorted(peaks):
        if not grouped or (t - grouped[-1]) > GROUP_GAP_S:
            grouped.append(t)
    return grouped
| # ===================================================== | |
| # Clip utilities | |
| # ===================================================== | |
| def _probe_duration(video_path): | |
| try: | |
| import ffmpeg | |
| meta=ffmpeg.probe(video_path) | |
| return float(meta["format"]["duration"]) | |
| except: return 0.0 | |
def cut_clip(video_path, start, end, out_path):
    """Cut the [start, end] second range of video_path into out_path.

    Tries a fast stream-copy with the ffmpeg CLI first; if the binary is
    missing or the copy fails, falls back to a moviepy re-encode.
    Returns out_path.
    """
    try:
        cmd = ["ffmpeg", "-y", "-ss", str(start), "-to", str(end),
               "-i", video_path, "-c", "copy", out_path]
        sp = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if sp.returncode == 0 and os.path.exists(out_path):
            return out_path
    except Exception:
        # Narrowed from a bare `except:` (which would swallow KeyboardInterrupt);
        # deliberately best-effort — fall through to the moviepy path.
        pass
    from moviepy.editor import VideoFileClip
    clip = VideoFileClip(video_path).subclip(start, end)
    clip.write_videofile(out_path, codec="libx264", audio_codec="aac", verbose=False, logger=None)
    return out_path
def extract_score_clips(video_path: str, debug: bool = DEBUG_MODE):
    """Full pipeline: features -> scores -> events -> padded clips.

    Returns (clips, message) where clips is a list of (file_path, caption)
    tuples suitable for a Gradio gallery.
    """
    df, fps = extract_feature_timeseries(video_path, FRAME_SKIP, debug)
    if df.empty:
        return [], "No frames processed."
    score = predict_scores(df)
    events = pick_events(df, score, fps)
    print(f"[INFO] Detected {len(events)} potential events: {events}")
    # Prefer the probed container duration; fall back to the last sampled timestamp.
    duration = _probe_duration(video_path) or float(df["timestamp"].max() + CLIP_PAD_S + 0.5)
    base = os.path.splitext(os.path.basename(video_path))[0]
    clips = []
    for i, t in enumerate(events):
        clip_start = max(0, t - CLIP_PAD_S)
        clip_end = min(duration, t + CLIP_PAD_S)
        tmp = os.path.join(tempfile.gettempdir(), f"{base}_score_{i+1:02d}.mp4")
        print(f"[INFO] Cutting clip {i+1}: {clip_start:.2f}s→{clip_end:.2f}s")
        cut_clip(video_path, clip_start, clip_end, tmp)
        clips.append((tmp, f"Touch {i+1} @ {t:.2f}s"))
    return clips, f"✅ Detected {len(clips)} event(s)."
# =====================================================
# Progress GUI helpers
# =====================================================
# Inline CSS for the Gradio app: centered layout, a green progress bar, and a
# mirrored (scaleX(-1)) fencer emoji that rides along the bar.
CSS = """
.gradio-container {max-width:900px;margin:auto;}
.full-width{width:100%!important;}
.progress-bar{width:100%;height:30px;background:#e0e0e0;border-radius:15px;margin:15px 0;position:relative;overflow:hidden;}
.progress-fill{height:100%;background:#4CAF50;border-radius:15px;text-align:center;line-height:30px;color:white;font-weight:bold;transition:width .3s;}
.fencer{position:absolute;top:-5px;font-size:24px;transition:left .3s;transform:scaleX(-1);}
"""
| def _make_progress_bar(percent:int,final_text:str=None,label:str=""): | |
| text=f"{percent}%" if not final_text else final_text | |
| return f""" | |
| <div class="progress-bar"> | |
| <div id="progress-fill" class="progress-fill" style="width:{percent}%">{label} {text}</div> | |
| <div id="fencer" class="fencer" style="left:{percent}%">🤺</div> | |
| </div> | |
| """ | |
def run_with_progress(video_file):
    """Generator driving the UI: yields (clips, status_text, progress_html) tuples."""
    if not video_file:
        yield [], "Please upload a video.", _make_progress_bar(0)
        return
    print("[GUI] Starting processing...")
    yield [], "🔄 Extracting frames...", _make_progress_bar(20, "", "Pipeline")
    df, fps = extract_feature_timeseries(video_file, FRAME_SKIP, DEBUG_MODE)
    if df.empty:
        yield [], "❌ No frames processed!", _make_progress_bar(100, "No Frames ❌", "Pipeline")
        return
    # Coarse staged updates; the heavy work runs inside extract_score_clips.
    # NOTE(review): extract_score_clips recomputes the features extracted above —
    # kept for parity with the original flow.
    yield [], "🔄 YOLO masking...", _make_progress_bar(40, "", "Pipeline")
    yield [], "🔄 Feature analysis...", _make_progress_bar(60, "", "Pipeline")
    yield [], "🔄 Scoring...", _make_progress_bar(80, "", "Pipeline")
    clips, msg = extract_score_clips(video_file, DEBUG_MODE)
    final = _make_progress_bar(100, f"Detected {len(clips)} Touches ⚡", "Pipeline")
    print("[GUI] Finished.")
    yield clips, msg, final
# =====================================================
# Gradio interface
# =====================================================
with gr.Blocks(css=CSS,title="Fencing Scoreboard Detector") as demo:
    gr.Markdown("## 🤺 Fencing Score Detector\nUpload a fencing bout video and automatically detect scoreboard lights using YOLO + AutoGluon.")
    in_video=gr.Video(label="Upload Bout Video",elem_classes="full-width",height=400)
    run_btn=gr.Button("⚡ Detect Touches",elem_classes="full-width")
    # Hidden until processing starts; filled with _make_progress_bar HTML.
    progress_html=gr.HTML(value="",label="Progress",visible=False)
    status=gr.Markdown("Ready.")
    # Gallery stays hidden until at least one clip exists.
    gallery=gr.Gallery(label="Detected Clips",columns=1,height=400,visible=False)
    def wrapped_run(video_file):
        # Streams run_with_progress updates into the three output components,
        # toggling visibility of the progress bar and gallery as results arrive.
        print("[SYSTEM] User started detection.")
        yield [],"Processing started...",gr.update(value=_make_progress_bar(0),visible=True)
        for clips,msg,bar in run_with_progress(video_file):
            print(f"[SYSTEM] {msg}")
            yield gr.update(value=clips,visible=bool(clips)),msg,gr.update(value=bar,visible=True)
    run_btn.click(fn=wrapped_run,inputs=in_video,outputs=[gallery,status,progress_html])
if __name__=="__main__":
    demo.launch(debug=True)