""" app.py — Sports Observer Gradio app for Hugging Face Spaces. UI follows DESIGN.md "The Digital Observer" spec exactly: background #0b0e14 void black surface #161a21 primary workspace surface-high #1c2028 panels primary #a1ffc2 green accent secondary #00d2fd cyan accent tertiary #ff7350 orange alert on-surface #ecedf6 body text (never pure white) Space Grotesk headlines / Inter body / IBM Plex Mono data """ from __future__ import annotations import json import math import os import tempfile import traceback from collections import defaultdict, deque from pathlib import Path import cv2 import numpy as np import gradio as gr # ── Palette (BGR for OpenCV) ─────────────────────────────────────────────── PALETTE_BGR = [ (253,210,0),(194,255,161),(80,115,255),(187,212,0), (29,178,255),(134,219,61),(56,56,255),(255,115,100), (255,194,0),(49,210,207),(151,157,255),(23,204,146), (255,56,132),(31,112,255),(52,147,26),(255,56,203), (168,153,44),(200,149,255),(10,249,72),(133,0,82), ] PPM = 20.0 # pixels per metre # ══════════════════════════════════════════════════════════════════════════ # PIPELINE # ══════════════════════════════════════════════════════════════════════════ def process_video(video_path, conf, iou, show_traj, show_speed, traj_len, progress=gr.Progress()): if video_path is None: return None, None, '{"status":"waiting"}', _status("idle", "Upload a video to begin.") try: return _run(video_path, float(conf), float(iou), bool(show_traj), bool(show_speed), int(traj_len), progress) except Exception as exc: traceback.print_exc() return None, None, json.dumps({"error": str(exc)}), _status("error", str(exc)) def _find_working_fourcc(fps, W, H): """Try several codecs and return (fourcc, suffix) for the first one that works.""" import shutil candidates = [ ("avc1", ".mp4"), # H.264 — best for browsers ("H264", ".mp4"), ("X264", ".mp4"), ("mp4v", ".mp4"), # MPEG-4 fallback (needs re-encode for browser) ] for codec, ext in candidates: test_path = tempfile.mktemp(suffix=f"_test{ext}") try: fourcc = cv2.VideoWriter_fourcc(*codec) w = cv2.VideoWriter(test_path, fourcc, fps, (W, H)) if w.isOpened(): # Write a test frame to make sure it really works w.write(np.zeros((H, W, 3), dtype=np.uint8)) w.release() if Path(test_path).exists() and Path(test_path).stat().st_size > 0: Path(test_path).unlink(missing_ok=True) print(f"[Codec] Using {codec}") return fourcc, ext, codec w.release() except Exception: pass finally: Path(test_path).unlink(missing_ok=True) # absolute fallback print("[Codec] Falling back to mp4v") return cv2.VideoWriter_fourcc(*"mp4v"), ".mp4", "mp4v" def _reencode_for_browser(input_path, output_path): """Try to re-encode to H.264 with ffmpeg/ffmpeg.exe. Returns True on success.""" import subprocess, shutil # Check if ffmpeg is available ffmpeg_cmd = shutil.which("ffmpeg") if ffmpeg_cmd is None: print("[Encode] ffmpeg not found, skipping re-encode") return False try: result = subprocess.run( [ffmpeg_cmd, "-y", "-i", input_path, "-vcodec", "libx264", "-crf", "23", "-preset", "fast", "-movflags", "+faststart", output_path], capture_output=True, text=True, timeout=600, ) if result.returncode == 0 and Path(output_path).exists() and Path(output_path).stat().st_size > 0: print("[Encode] H.264 re-encode successful") return True else: print(f"[Encode] ffmpeg failed: {result.stderr[:300]}") return False except Exception as e: print(f"[Encode] ffmpeg error: {e}") return False def _run(video_path, conf, iou, show_traj, show_speed, traj_len, progress): from ultralytics import YOLO import supervision as sv # ── open video ──────────────────────────────────────────────────────── cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise RuntimeError("Cannot open video file.") W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = float(cap.get(cv2.CAP_PROP_FPS) or 30.0) total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 1 # ── writers ─────────────────────────────────────────────────────────── fourcc, ext, codec_name = _find_working_fourcc(fps, W, H) tmp_raw = tempfile.mktemp(suffix=f"_raw{ext}") tmp_out = tempfile.mktemp(suffix="_out.mp4") writer = cv2.VideoWriter(tmp_raw, fourcc, fps, (W, H)) if not writer.isOpened(): raise RuntimeError(f"Cannot create video writer with codec {codec_name}. " "Please install ffmpeg or an H.264-capable OpenCV build.") # ── model ───────────────────────────────────────────────────────────── model = YOLO("yolov8n.pt") # ── tracker (supervision 0.21 stable API) ───────────────────────────── byte_tracker = sv.ByteTrack( track_activation_threshold=conf, lost_track_buffer=max(60, int(fps * 3)), minimum_matching_threshold=0.80, frame_rate=int(fps), ) # ── state ───────────────────────────────────────────────────────────── trajs: dict = defaultdict(lambda: deque(maxlen=traj_len)) prev_c: dict = {} speeds: dict = {} hm_acc = np.zeros((H, W), dtype=np.float32) counts = [] fi = 0 progress(0, desc="Initialising…") while True: ret, frame = cap.read() if not ret: break # detect res = model(frame, conf=conf, iou=iou, classes=[0], verbose=False)[0] dets = sv.Detections.from_ultralytics(res) # track if len(dets) > 0: tracked = byte_tracker.update_with_detections(dets) else: tracked = sv.Detections.empty() out = frame.copy() # collect active tracks active_tracks = [] if tracked.tracker_id is not None and len(tracked) > 0: for i, tid in enumerate(tracked.tracker_id): if tid is None: continue tid = int(tid) x1, y1, x2, y2 = [int(v) for v in tracked.xyxy[i]] active_tracks.append({"id": tid, "box": (x1,y1,x2,y2)}) cx, cy = (x1+x2)//2, (y1+y2)//2 trajs[tid].append((cx, cy)) if 0 <= cy < H and 0 <= cx < W: cv2.circle(hm_acc, (cx, cy), 18, 1.0, -1) # speed EMA if show_speed and tid in prev_c: d = math.hypot(cx - prev_c[tid][0], cy - prev_c[tid][1]) spd = (d / PPM) * fps * 3.6 speeds[tid] = 0.7 * speeds.get(tid, spd) + 0.3 * spd prev_c[tid] = (cx, cy) # ── draw trajectories ───────────────────────────────────────────── if show_traj: ovl = out.copy() for tid, pts_dq in trajs.items(): pts = list(pts_dq) col = PALETTE_BGR[tid % len(PALETTE_BGR)] for j in range(1, len(pts)): a = j / max(len(pts), 1) c = tuple(int(v * a) for v in col) cv2.line(ovl, pts[j-1], pts[j], c, 2, cv2.LINE_AA) cv2.addWeighted(ovl, 0.70, out, 0.30, 0, out) # ── draw boxes + labels (DESIGN.md colours) ─────────────────────── for t in active_tracks: tid = t["id"] x1, y1, x2, y2 = t["box"] # secondary #00d2fd (BGR: 253,210,0) cv2.rectangle(out, (x1,y1), (x2,y2), (253,210,0), 2) s_str = f" {speeds[tid]:.0f}km/h" if (show_speed and tid in speeds) else "" label = f"#{tid}{s_str}" fs, tk = 0.45, 1 (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fs, tk) lx = x1 ly = max(y1 - 4, th + 6) # secondary-container #00677e (BGR: 126,103,0) cv2.rectangle(out, (lx, ly-th-4), (lx+tw+8, ly+2), (126,103,0), -1) # on-secondary-container #eefaff cv2.putText(out, label, (lx+4, ly-1), cv2.FONT_HERSHEY_SIMPLEX, fs, (255,250,238), tk, cv2.LINE_AA) # ── HUD ─────────────────────────────────────────────────────────── n = len(active_tracks) hud = f"SUBJECTS:{n:02d} FRAME:{fi:05d}" (hw, hh), _ = cv2.getTextSize(hud, cv2.FONT_HERSHEY_SIMPLEX, 0.47, 1) cv2.rectangle(out, (8,8), (hw+20, hh+16), (0,0,0), -1) # primary #a1ffc2 (BGR: 194,255,161) cv2.putText(out, hud, (13, hh+10), cv2.FONT_HERSHEY_SIMPLEX, 0.47, (194,255,161), 1, cv2.LINE_AA) writer.write(out) counts.append({"frame": fi, "count": n}) fi += 1 if fi % 25 == 0: progress(fi / total, desc=f"Frame {fi}/{total} · Subjects: {n}") cap.release() writer.release() # ── re-encode to browser-compatible H.264 if needed ─────────────────── final = tmp_raw if codec_name not in ("avc1", "H264", "X264"): # mp4v isn't browser-playable, try re-encoding with ffmpeg if _reencode_for_browser(tmp_raw, tmp_out): final = tmp_out Path(tmp_raw).unlink(missing_ok=True) else: # Last resort: serve the mp4v file as-is; Gradio may still handle it print("[Warning] Output video may not play in browser without ffmpeg. " "Install ffmpeg for best results: https://ffmpeg.org/download.html") final = tmp_raw # ── heatmap ─────────────────────────────────────────────────────────── hm_path = tempfile.mktemp(suffix="_hm.png") norm = cv2.normalize(hm_acc, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8) cv2.imwrite(hm_path, cv2.applyColorMap(norm, cv2.COLORMAP_JET)) uid = len(trajs) stats = json.dumps({ "total_frames" : fi, "unique_ids" : uid, "all_track_ids" : list(trajs.keys()), "fps" : round(fps, 2), "counts_over_time": counts[-300:], }, indent=2) return ( final, hm_path, stats, _status("ok", f"Complete · {fi} frames processed · {uid} unique IDs tracked"), ) def _status(kind: str, msg: str) -> str: cfg = { "ok": ("#a1ffc2", "SYSTEM NOMINAL"), "error": ("#ff716c", "SYSTEM ERROR"), "idle": ("#45484f", "STANDBY"), } col, prefix = cfg.get(kind, cfg["idle"]) dot_anim = "animation:pulse 2s ease-in-out infinite;" if kind == "ok" else "" return f"""