| """FUT World Cup Coach — custom frontend on gradio.Server (Off-Brand quest). |
| |
| A bespoke football-stadium HTML/JS frontend (index.html) talks to Gradio's backend engine: |
| @app.api() runs RF-DETR detection on ZeroGPU, renders the analysed video, and calls the |
| Nemotron-VL coach on Modal (MiniCPM-V on ZeroGPU as the local fallback) — all behind |
| Gradio's queue + gradio_client, hosted on a ZeroGPU Space. |
| """ |
| import glob |
| import json |
| import os |
| import shutil |
| import subprocess |
|
|
| from gradio import Server |
| from gradio.data_classes import FileData |
| from fastapi.responses import HTMLResponse, FileResponse, JSONResponse |
| import spaces |
| from huggingface_hub import hf_hub_download |
|
|
| from futheros import pipeline as P |
| from futheros import coach as C |
| from futheros.render import render, pose_card_img |
|
|
| HF_MODEL = "build-small-hackathon/kicky-ai-rfdetr-seg" |
| RENDER_DIR = "/tmp/app_renders"; os.makedirs(RENDER_DIR, exist_ok=True) |
| HERE = os.path.dirname(os.path.abspath(__file__)) |
| GT = json.load(open(os.path.join(HERE, "labels/goal_labels.json"))) |
| LEG = json.load(open(os.path.join(HERE, "labels/leg_labels.json"))) |
| EXAMPLES = [os.path.basename(p) for p in sorted(glob.glob(os.path.join(HERE, "examples/*.mp4")))] |
|
|
| app = Server() |
| |
| |
| |
| _CKPT = hf_hub_download(HF_MODEL, "checkpoint_best_ema.pth") |
| try: |
| _MODEL = P.rfdetr_model(_CKPT, "small") |
| except Exception as _e: |
| print("eager detector build failed, will lazy-load:", _e, flush=True); _MODEL = None |
|
|
|
|
| def _model(): |
| global _MODEL |
| if _MODEL is None: |
| _MODEL = P.rfdetr_model(_CKPT, "small") |
| return _MODEL |
|
|
|
|
| @spaces.GPU(duration=120) |
| def _detect(path): |
| return P.detect_rfdetr(path, _model()) |
|
|
|
|
| |
| |
| MINICPM_REPO = "openbmb/MiniCPM-V-4.6" |
| _MINICPM = None |
| |
| |
| try: |
| from huggingface_hub import snapshot_download |
| snapshot_download(MINICPM_REPO) |
| except Exception as _e: |
| print("MiniCPM prefetch skipped:", _e, flush=True) |
|
|
|
|
| def _minicpm(): |
| global _MINICPM |
| if _MINICPM is None: |
| import torch |
| from transformers import AutoModelForImageTextToText, AutoProcessor |
| proc = AutoProcessor.from_pretrained(MINICPM_REPO, trust_remote_code=True) |
| mdl = AutoModelForImageTextToText.from_pretrained( |
| MINICPM_REPO, torch_dtype=torch.bfloat16, trust_remote_code=True).eval() |
| _MINICPM = (mdl, proc) |
| return _MINICPM |
|
|
|
|
| @spaces.GPU(duration=120) |
| def _coach_offline(frame_paths, sys_text, user_text): |
| import torch |
| mdl, proc = _minicpm(); mdl = mdl.to("cuda") |
| content = [{"type": "image", "url": os.path.abspath(p)} for p in frame_paths] |
| content.append({"type": "text", "text": user_text}) |
| messages = [{"role": "system", "content": [{"type": "text", "text": sys_text}]}, |
| {"role": "user", "content": content}] |
| inputs = proc.apply_chat_template( |
| messages, tokenize=True, add_generation_prompt=True, return_dict=True, |
| return_tensors="pt", downsample_mode="16x", max_slice_nums=9).to("cuda") |
| with torch.no_grad(): |
| gen = mdl.generate(**inputs, downsample_mode="16x", max_new_tokens=320, do_sample=False) |
| trimmed = [o[len(i):] for i, o in zip(inputs.input_ids, gen)] |
| return proc.batch_decode(trimmed, skip_special_tokens=True)[0] |
|
|
|
|
| |
| |
| _PENDING = {} |
|
|
|
|
| @app.api() |
| def analyse(video: FileData, hint: str = "") -> dict: |
| """RF-DETR detect (ZeroGPU) -> goal physics + pose + speed -> rendered video + stats. |
| Returns fast; the AI coach runs separately via /coach so it never blocks the results.""" |
| src = video["path"] |
| |
| name = os.path.splitext(os.path.basename(hint or src))[0] or "clip" |
| path = f"/tmp/in_{name}_{os.getpid()}.mp4" |
| shutil.copyfile(src, path) |
| tr = _detect(path) |
| res = P.analyse(tr, path) |
| out = f"{RENDER_DIR}/{name}.mp4" |
| try: |
| render(path, tr, res, out) |
| video_out = out |
| except Exception as e: |
| print("render failed, falling back to raw clip:", e, flush=True) |
| video_out = path |
| pose = pose_card_img(path, tr, res, f"{RENDER_DIR}/{name}_pose.png") |
| |
| _PENDING[name] = {"payload": C.build_payload(res), |
| "frames_b64": C.key_frames_b64(path, res), |
| "frame_paths": C.key_frame_paths(path, res, RENDER_DIR)} |
| return { |
| "key": name, |
| "video": FileData(path=video_out), |
| "pose": FileData(path=pose) if pose else None, |
| "goal": res.goal, |
| "goal_time": round(res.goal_time, 1) if res.goal_time else None, |
| "foot": res.foot, |
| "speed": round(res.speed_kmh) if res.speed_kmh else None, |
| "gt_goal": GT.get(f"5/{name}.mp4"), "gt_leg": LEG.get(f"5/{name}.mp4"), |
| } |
|
|
|
|
| @app.api() |
| def coach(key: str, mode: str = "online") -> dict: |
| """Run the AI coach for an earlier /analyse result. |
| mode="online" -> Nemotron-Nano-VL on Modal · mode="offline" -> MiniCPM-V-4.6 on this Space's GPU.""" |
| d = _PENDING.get(key) |
| if d is None: |
| return {"coach": None, "coach_model": None} |
| if mode == "offline": |
| coach_model = "MiniCPM-V-4.6 · OpenBMB · on-Space GPU (ZeroGPU)" |
| try: |
| coaching = C._clean(_coach_offline(d["frame_paths"], C.SYS, C.user_msg(d["payload"]))) |
| except Exception as e: |
| print("offline coach failed:", e, flush=True); coaching = None |
| else: |
| coach_model = "Nemotron-Nano-VL · online (Modal GPU)" |
| try: |
| coaching = C.get_coaching(d["payload"], d["frames_b64"]) |
| except C.CoachUnavailable: |
| coaching = None |
| return {"coach": coaching, "coach_model": coach_model} |
|
|
|
|
| @app.get("/api/examples") |
| async def api_examples(): |
| return JSONResponse(EXAMPLES) |
|
|
|
|
| @app.get("/examples/{name}") |
| async def serve_example(name: str): |
| path = os.path.join(HERE, "examples", os.path.basename(name)) |
| if not os.path.isfile(path): |
| return JSONResponse({"error": "not found"}, status_code=404) |
| return FileResponse(path, media_type="video/mp4") |
|
|
|
|
| @app.get("/thumb/{name}") |
| async def thumb(name: str): |
| src = os.path.join(HERE, "examples", os.path.basename(name)) |
| if not os.path.isfile(src): |
| return JSONResponse({"error": "not found"}, status_code=404) |
| out = f"/tmp/thumb_{os.path.basename(name)}.jpg" |
| if not os.path.exists(out): |
| subprocess.run(["ffmpeg", "-y", "-v", "error", "-ss", "1.2", "-i", src, |
| "-vframes", "1", "-vf", "scale=360:-1", out], check=False) |
| return FileResponse(out, media_type="image/jpeg") |
|
|
|
|
| @app.get("/") |
| async def home(): |
| with open(os.path.join(HERE, "index.html"), encoding="utf-8") as f: |
| return HTMLResponse(f.read(), headers={"Cache-Control": "no-store, max-age=0"}) |
|
|
|
|
| app.launch(show_error=True, allowed_paths=[RENDER_DIR, "/tmp"]) |
|
|