from __future__ import annotations import math import os import sys import time import warnings from functools import lru_cache from pathlib import Path from typing import Optional import cv2 import numpy as np import gradio as gr warnings.filterwarnings("ignore") # ── ZeroGPU / spaces compatibility ─────────────────────────────────────────── try: import spaces # Hugging Face ZeroGPU HF_SPACES = True except ImportError: HF_SPACES = False # Shim so we can write @spaces.GPU unconditionally below class _FakeSpaces: @staticmethod def GPU(fn=None, duration=60): if fn is not None: return fn def decorator(f): return f return decorator spaces = _FakeSpaces() # type: ignore # ── Device ──────────────────────────────────────────────────────────────────── try: import torch _ncpu = os.cpu_count() or 4 torch.set_num_threads(_ncpu) torch.set_num_interop_threads(max(1, _ncpu // 2)) DEVICE = "cuda" if torch.cuda.is_available() else "cpu" except ImportError: sys.exit("[FATAL] pip install torch") # ── Env-tunable constants ───────────────────────────────────────────────────── YOLO_MODEL = os.getenv("YOLO_MODEL", "best (4).pt") DEPTH_MODEL = os.getenv("DEPTH_MODEL", "depth-anything/Depth-Anything-V2-Metric-Indoor-Small-hf") DEPTH_SIZE = int(os.getenv("DEPTH_SIZE", "308")) SNAP_DIR = Path(os.getenv("SNAP_DIR", "snapshots")) SNAP_DIR.mkdir(exist_ok=True) # ══════════════════════════════════════════════════════════════════════════════ # Model singletons — loaded exactly once per worker process # ══════════════════════════════════════════════════════════════════════════════ @lru_cache(maxsize=1) def _get_yolo(): """Load YOLO-seg once; subsequent calls return the cached instance.""" try: from ultralytics import YOLO except ImportError: raise RuntimeError("pip install ultralytics") print(f"[model] Loading YOLO {YOLO_MODEL} …", flush=True) model = YOLO(YOLO_MODEL) print("[model] YOLO ready.", flush=True) return model @lru_cache(maxsize=1) def _get_depth(): """Load Depth-Anything-V2 once; subsequent calls return the cached instance.""" os.environ.setdefault("TRANSFORMERS_NO_TF", "1") os.environ.setdefault("TRANSFORMERS_NO_FLAX", "1") try: from transformers import pipeline as hf_pipeline except ImportError: raise RuntimeError("pip install transformers accelerate") print(f"[model] Loading Depth {DEPTH_MODEL.split('/')[-1]} …", flush=True) pipe = hf_pipeline("depth-estimation", model=DEPTH_MODEL, device=DEVICE) print("[model] Depth ready.", flush=True) return pipe # ── Names helper ────────────────────────────────────────────────────────────── @lru_cache(maxsize=1) def _class_names() -> dict[int, str]: return {int(k): str(v) for k, v in _get_yolo().names.items()} # ══════════════════════════════════════════════════════════════════════════════ # Unit formatting # ══════════════════════════════════════════════════════════════════════════════ def _fmt_len(m: float) -> str: if m <= 0: return "—" if m < 0.01: return f"{m * 1000:.1f} mm" if m < 1.0: return f"{m * 100:.2f} cm" return f"{m:.3f} m" def _fmt_vol(m3: float) -> str: if m3 <= 0: return "—" ml = m3 * 1e6 if ml < 1000: return f"{ml:.1f} ml" if ml < 1e6: return f"{ml / 1000:.3f} L" return f"{m3:.4f} m³" # ══════════════════════════════════════════════════════════════════════════════ # Depth inference helper # ══════════════════════════════════════════════════════════════════════════════ def _depth_infer(frame_bgr: np.ndarray) -> np.ndarray: """ Run depth model on *frame_bgr*. Returns float32 depth map in metres, same (H, W) as input. """ from PIL import Image as PILImage h, w = frame_bgr.shape[:2] small = cv2.resize(frame_bgr, (DEPTH_SIZE, DEPTH_SIZE)) rgb = cv2.cvtColor(small, cv2.COLOR_BGR2RGB) out = _get_depth()(PILImage.fromarray(rgb)) d_small = np.array(out["depth"], dtype=np.float32) return cv2.resize(d_small, (w, h), interpolation=cv2.INTER_LINEAR) def _depth_sample(depth: np.ndarray, mask: np.ndarray, pct: float = 25.0) -> float: """ Robust depth estimate inside mask. 25th-percentile suppresses background bleed at mask edges. """ vals = depth[mask > 0] if not len(vals): return 0.0 lo, hi = np.percentile(vals, [5, 95]) vals = vals[(vals >= lo) & (vals <= hi)] return float(np.percentile(vals, pct)) if len(vals) else 0.0 # ══════════════════════════════════════════════════════════════════════════════ # Cylinder geometry # ══════════════════════════════════════════════════════════════════════════════ def _ellipse_from_mask(mask: np.ndarray): """Return (cx, cy, major_px, minor_px, angle_deg) or None.""" cnts, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not cnts: return None cnt = max(cnts, key=cv2.contourArea) if len(cnt) < 5: return None try: (cx, cy), (ma, mi), angle = cv2.fitEllipse(cnt) return cx, cy, max(ma, mi), min(ma, mi), angle except cv2.error: return None def _measure_cylinder( mask: np.ndarray, box: list, depth_map: np.ndarray, fy: float, aspect_thresh: float, ) -> dict: ys, xs = np.where(mask > 0) if len(xs) < 5: return {} pts = np.column_stack([xs, ys]).astype(np.float32) (cx, cy), (rw, rh), _ = cv2.minAreaRect(pts) rw, rh = max(rw, rh), min(rw, rh) # rw = long axis rect_pts = cv2.boxPoints( cv2.minAreaRect(pts)).astype(np.int32) ell = _ellipse_from_mask(mask) x1, y1, x2, y2 = [int(v) for v in box] aspect = (y2 - y1) / max(x2 - x1, 1) orientation = "upright" if aspect >= aspect_thresh else "on-side" H_px = rw D_px = rh if ell: D_px = min(D_px, ell[3]) # use minor axis of ellipse as tighter D estimate Z = _depth_sample(depth_map, mask, pct=25.0) res: dict = dict( cx=int(cx), cy=int(cy), H_px=H_px, D_px=D_px, Z_m=Z, orientation=orientation, aspect=aspect, rect_pts=rect_pts, ellipse=ell, ) if Z > 0.02 and fy > 0: H_m = (H_px / fy) * Z D_m = (D_px / fy) * Z R_m = D_m / 2.0 V_m3 = math.pi * R_m ** 2 * H_m res.update( H_m=H_m, D_m=D_m, R_m=R_m, V_m3=V_m3, H_str=_fmt_len(H_m), D_str=_fmt_len(D_m), V_str=_fmt_vol(V_m3), ) else: res.update( H_m=None, D_m=None, V_m3=None, H_str=f"{int(H_px)} px", D_str=f"{int(D_px)} px", V_str="need depth", ) return res # ══════════════════════════════════════════════════════════════════════════════ # Drawing # ══════════════════════════════════════════════════════════════════════════════ _PAL: dict[int, tuple] = {} def _colour(cls: int) -> tuple: if cls not in _PAL: np.random.seed(cls * 17 + 5) _PAL[cls] = tuple(int(v) for v in np.random.randint(80, 235, 3)) return _PAL[cls] def _annotate( frame: np.ndarray, dets: list, names: dict, mask_alpha: float = 0.35, ) -> np.ndarray: """Draw masks, bounding boxes, ellipses, and measurement overlays.""" h, w = frame.shape[:2] fs = max(0.40, w / 1800) for d in dets: c = _colour(d["cls"]) x1, y1, x2, y2 = (int(v) for v in d["box"]) mask = d.get("mask") meas = d.get("meas") or {} # ── segmentation mask + contour ─────────────────────────────── if mask is not None: overlay = np.zeros_like(frame) overlay[mask > 0] = c cv2.addWeighted(overlay, mask_alpha, frame, 1.0, 0, frame) cnts, _ = cv2.findContours( mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) cv2.drawContours(frame, cnts, -1, c, 1) # ── oriented bounding rect ──────────────────────────────────── if "rect_pts" in meas: cv2.drawContours(frame, [meas["rect_pts"]], 0, c, 2) # ── fitted ellipse (yellow) ─────────────────────────────────── if "ellipse" in meas and meas["ellipse"]: ecx, ecy, ema, emi, ean = meas["ellipse"] cv2.ellipse(frame, (int(ecx), int(ecy)), (max(1, int(ema / 2)), max(1, int(emi / 2))), ean, 0, 360, (255, 255, 60), 1) # ── bbox ────────────────────────────────────────────────────── cv2.rectangle(frame, (x1, y1), (x2, y2), c, 1) # ── label: class + track-id + conf ──────────────────────────── name = names.get(d["cls"], str(d["cls"])) ori = meas.get("orientation", "") tid = d.get("tid", "?") label = f"{name}#{tid} {d.get('best', d['conf']):.0%} [{ori}]" (lw, lh), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fs, 1) ly = max(y1 - 4, lh + 4) cv2.rectangle(frame, (x1, ly - lh - 4), (x1 + lw + 4, ly + 2), c, -1) cv2.putText(frame, label, (x1 + 2, ly - 2), cv2.FONT_HERSHEY_SIMPLEX, fs, (255, 255, 255), 1, cv2.LINE_AA) # ── measurement block ───────────────────────────────────────── if meas: cx_ = meas.get("cx", (x1 + x2) // 2) cy_ = meas.get("cy", (y1 + y2) // 2) Z = meas.get("Z_m", 0) lines = [ ("H: " + meas.get("H_str", "—"), (0, 220, 255)), ("D: " + meas.get("D_str", "—"), (255, 200, 0)), ("V: " + meas.get("V_str", "—"), (0, 255, 140)), (f"Z: {Z:.2f} m" if Z else "Z: —", (180, 180, 180)), ] for i, (ln, col_txt) in enumerate(lines): (tw, th), _ = cv2.getTextSize( ln, cv2.FONT_HERSHEY_SIMPLEX, fs * 0.82, 1) tx = max(0, min(w - tw - 6, cx_ - tw // 2)) ty = min(h - 6, cy_ + (i + 1) * (th + 6)) cv2.rectangle(frame, (tx - 2, ty - th - 2), (tx + tw + 2, ty + 2), (12, 12, 12), -1) cv2.putText(frame, ln, (tx, ty - 1), cv2.FONT_HERSHEY_SIMPLEX, fs * 0.82, col_txt, 1, cv2.LINE_AA) return frame # ══════════════════════════════════════════════════════════════════════════════ # Core inference — single frame # ══════════════════════════════════════════════════════════════════════════════ def _run_frame( frame_bgr: np.ndarray, fy: float, conf: float, iou: float, imgsz: int, aspect_thresh: float, mask_alpha: float, ) -> tuple[np.ndarray, list[dict]]: """ Full pipeline on one BGR frame. Returns (annotated_BGR, list_of_measurement_dicts). """ yolo = _get_yolo() names = _class_names() h, w = frame_bgr.shape[:2] # ── YOLO segmentation ───────────────────────────────────────────── results = yolo.predict( source=frame_bgr, conf=conf, iou=iou, imgsz=imgsz, verbose=False )[0] dets: list[dict] = [] if results.boxes is not None and len(results.boxes): for i, (box, conf_v, cls) in enumerate(zip( results.boxes.xyxy.cpu().numpy(), results.boxes.conf.cpu().numpy(), results.boxes.cls.cpu().numpy().astype(int), )): d: dict = {"box": box.tolist(), "conf": float(conf_v), "cls": int(cls), "mask": None, "tid": i + 1, "best": float(conf_v)} if results.masks is not None: try: raw = results.masks.data[i].cpu().numpy() rsz = cv2.resize(raw, (w, h), interpolation=cv2.INTER_NEAREST) d["mask"] = (rsz > 0.5).astype(np.uint8) except Exception: pass dets.append(d) # ── Depth estimation ────────────────────────────────────────────── depth_map: Optional[np.ndarray] = None if dets: depth_map = _depth_infer(frame_bgr) # ── Per-object cylinder measurement ────────────────────────────── records: list[dict] = [] for d in dets: if d.get("mask") is not None and depth_map is not None: meas = _measure_cylinder( d["mask"], d["box"], depth_map, fy, aspect_thresh) else: meas = {} d["meas"] = meas records.append({ "class": names.get(d["cls"], str(d["cls"])), "id": d["tid"], "conf": f"{d['conf']:.0%}", "orientation": meas.get("orientation", "—"), "H": meas.get("H_str", "—"), "D": meas.get("D_str", "—"), "Volume": meas.get("V_str", "—"), "Z (m)": f"{meas['Z_m']:.2f}" if meas.get("Z_m") else "—", }) # ── Annotate frame ──────────────────────────────────────────────── annotated = _annotate(frame_bgr.copy(), dets, names, mask_alpha) return annotated, records # ══════════════════════════════════════════════════════════════════════════════ # Gradio callbacks — decorated with @spaces.GPU for ZeroGPU Spaces # ══════════════════════════════════════════════════════════════════════════════ @spaces.GPU(duration=60) def infer_image( image_rgb: np.ndarray, fy: float, vfov: float, conf: float, iou: float, imgsz: int, aspect_thresh: float, mask_alpha: float, ) -> tuple: """ Gradio image tab callback. Gradio passes RGB numpy arrays; OpenCV works in BGR internally. """ if image_rgb is None: return None, [] # Auto-compute fy if not overridden if fy <= 0: h = image_rgb.shape[0] fy = h / (2.0 * math.tan(math.radians(vfov / 2.0))) frame_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR) ann_bgr, records = _run_frame( frame_bgr, fy, conf, iou, imgsz, aspect_thresh, mask_alpha) ann_rgb = cv2.cvtColor(ann_bgr, cv2.COLOR_BGR2RGB) return ann_rgb, records @spaces.GPU(duration=120) def infer_video( video_path: str, fy: float, vfov: float, conf: float, iou: float, imgsz: int, aspect_thresh: float, mask_alpha: float, stride: int, ) -> tuple: """ Process a video file; return (annotated_video_path, aggregated_records). *stride* — process every Nth frame (reduces latency; HF free tier is CPU). Depth runs on every processed frame; YOLO on every processed frame. """ if not video_path: return None, [] cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return None, [{"error": "Cannot open video"}] fps_src = cap.get(cv2.CAP_PROP_FPS) or 25.0 fw = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) fh = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) if fy <= 0: fy = fh / (2.0 * math.tan(math.radians(vfov / 2.0))) out_path = str(SNAP_DIR / f"out_{int(time.time())}.mp4") fourcc = cv2.VideoWriter_fourcc(*"mp4v") writer = cv2.VideoWriter(out_path, fourcc, fps_src, (fw, fh)) all_records: list[dict] = [] frame_idx = 0 last_ann = None while True: ret, frame_bgr = cap.read() if not ret: break frame_idx += 1 if frame_idx % stride == 0: ann_bgr, records = _run_frame( frame_bgr, fy, conf, iou, imgsz, aspect_thresh, mask_alpha) last_ann = ann_bgr for r in records: r["frame"] = frame_idx all_records.extend(records) else: # Reuse last annotation for non-processed frames (saves compute) ann_bgr = last_ann if last_ann is not None else frame_bgr writer.write(ann_bgr) cap.release() writer.release() return out_path, all_records # ══════════════════════════════════════════════════════════════════════════════ # Gradio UI # ══════════════════════════════════════════════════════════════════════════════ DESCRIPTION = """ # 🔭 Cylinder Volume Estimator **YOLO-seg** detects objects → **Depth Anything V2** measures metric depth → **Pinhole geometry** computes real-world Height, Diameter & Volume. `V = π × (D/2)² × H`      `H = (H_px / fy) × Z`      `D = (W_px / fy) × Z` > **Tip:** For best accuracy set *Focal length fy* from your camera spec, > or use the VFOV slider for auto-estimation. """ def _sidebar(): """Shared camera / model controls.""" with gr.Accordion("📷 Camera & Model Settings", open=True): fy = gr.Number(label="Focal length fy (px) — 0 = auto from VFOV", value=0, minimum=0, maximum=5000, step=10, info="Camera focal length in pixels. 0 = auto-estimated.") vfov = gr.Slider(30, 120, value=60, step=1, label="Vertical FOV (°) — used when fy = 0") conf = gr.Slider(0.20, 0.90, value=0.40, step=0.05, label="YOLO confidence threshold") iou = gr.Slider(0.20, 0.80, value=0.45, step=0.05, label="YOLO NMS IoU threshold") imgsz = gr.Dropdown([160, 224, 320, 416, 512], value=320, label="YOLO inference size (px) — lower = faster") asp = gr.Slider(0.8, 3.0, value=1.2, step=0.1, label="Aspect threshold (H/W upright vs on-side)") alpha = gr.Slider(0.10, 0.70, value=0.35, step=0.05, label="Mask overlay alpha") return fy, vfov, conf, iou, imgsz, asp, alpha with gr.Blocks( title="Cylinder Volume Estimator", theme=gr.themes.Base( primary_hue="cyan", secondary_hue="slate", font=[gr.themes.GoogleFont("IBM Plex Mono"), "monospace"], ), ) as demo: gr.Markdown(DESCRIPTION) with gr.Tabs(): # ── Image tab ───────────────────────────────────────────────────────── with gr.Tab("🖼️ Image"): with gr.Row(): with gr.Column(scale=1): img_in = gr.Image(label="Input image", type="numpy", sources=["upload", "webcam"]) # Camera / model controls i_fy, i_vfov, i_conf, i_iou, i_imgsz, i_asp, i_alpha = _sidebar() img_btn = gr.Button("▶ Estimate", variant="primary", size="lg") with gr.Column(scale=1): img_out = gr.Image(label="Annotated output", type="numpy") img_tbl = gr.DataFrame( label="Measurements", headers=["class","id","conf","orientation", "H","D","Volume","Z (m)"], wrap=True, ) img_btn.click( fn=infer_image, inputs=[img_in, i_fy, i_vfov, i_conf, i_iou, i_imgsz, i_asp, i_alpha], outputs=[img_out, img_tbl], api_name="infer_image", ) # ── Video tab ───────────────────────────────────────────────────────── with gr.Tab("🎬 Video"): with gr.Row(): with gr.Column(scale=1): vid_in = gr.Video(label="Input video") v_fy, v_vfov, v_conf, v_iou, v_imgsz, v_asp, v_alpha = _sidebar() v_stride = gr.Slider(1, 10, value=4, step=1, label="Process every N frames (stride)") vid_btn = gr.Button("▶ Process video", variant="primary", size="lg") gr.Markdown( "> ⚠️ Free-tier Spaces run on CPU — keep videos short (<30 s) " "or increase stride to reduce latency." ) with gr.Column(scale=1): vid_out = gr.Video(label="Annotated video") vid_tbl = gr.DataFrame( label="Measurements (all frames)", headers=["frame","class","id","conf","orientation", "H","D","Volume","Z (m)"], wrap=True, ) vid_btn.click( fn=infer_video, inputs=[vid_in, v_fy, v_vfov, v_conf, v_iou, v_imgsz, v_asp, v_alpha, v_stride], outputs=[vid_out, vid_tbl], api_name="infer_video", ) gr.Markdown(""" --- **Formula reference** | Symbol | Meaning | |--------|---------| | `fy` | Vertical focal length in pixels | | `Z` | Metric depth from Depth-Anything-V2 (metres) | | `H_px` | Pixel height of the detected cylinder | | `D_px` | Pixel diameter (minor axis of fitted ellipse) | | `H` | Real height = `(H_px / fy) × Z` | | `D` | Real diameter = `(D_px / fy) × Z` | | `V` | Volume = `π × (D/2)² × H` | *Upright* : bbox H/W ≥ aspect threshold → long axis = cylinder height *On-side* : bbox H/W < aspect threshold → long axis = cylinder length """) if __name__ == "__main__": demo.launch( server_name="0.0.0.0", # required for HF Spaces server_port=7860, show_error=True, )