| from __future__ import annotations |
|
|
| import math |
| import os |
| import sys |
| import time |
| import warnings |
| from functools import lru_cache |
| from pathlib import Path |
| from typing import Optional |
|
|
| import cv2 |
| import numpy as np |
| import gradio as gr |
|
|
# Suppress every Python warning for the whole process (no category or module filter is given).
warnings.filterwarnings("ignore")
|
|
| |
# Optional Hugging Face `spaces` package: use the real module when it can be
# imported, otherwise bind `spaces` to a stand-in whose `GPU` decorator is a no-op.
try:
    import spaces
except ImportError:
    HF_SPACES = False

    class _FakeSpaces:
        """Stand-in for the `spaces` module exposing a pass-through ``GPU`` decorator."""

        @staticmethod
        def GPU(fn=None, duration=60):
            """Support both ``@spaces.GPU`` and ``@spaces.GPU(duration=...)``.

            Called with a function, that function is returned unchanged;
            called without one, a decorator that returns its argument
            unchanged is produced.  *duration* is accepted and ignored.
            """
            def _passthrough(func):
                return func

            return _passthrough if fn is None else fn

    spaces = _FakeSpaces()
else:
    HF_SPACES = True
|
|
| |
| try: |
| import torch |
| _ncpu = os.cpu_count() or 4 |
| torch.set_num_threads(_ncpu) |
| torch.set_num_interop_threads(max(1, _ncpu // 2)) |
| DEVICE = "cuda" if torch.cuda.is_available() else "cpu" |
| except ImportError: |
| sys.exit("[FATAL] pip install torch") |
|
|
| |
# --- Runtime configuration; every value can be overridden by an environment variable ---
# Path / name of the YOLO weights file.
YOLO_MODEL = os.getenv("YOLO_MODEL", "best (4).pt")
# Model identifier given to the depth-estimation pipeline.
DEPTH_MODEL = os.getenv(
    "DEPTH_MODEL",
    "depth-anything/Depth-Anything-V2-Metric-Indoor-Small-hf",
)
# Side length (px) of the square image fed to the depth model.
DEPTH_SIZE = int(os.getenv("DEPTH_SIZE", "308"))
# Directory that receives generated output files.
SNAP_DIR = Path(os.getenv("SNAP_DIR", "snapshots"))
# parents=True: a nested SNAP_DIR (e.g. "/data/run1/snaps") must not fail at
# import time just because an intermediate directory does not exist yet.
SNAP_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
| |
| |
| |
|
|
@lru_cache(maxsize=1)
def _get_yolo():
    """Return the YOLO model built from ``YOLO_MODEL``.

    The model is constructed on the first call only; ``lru_cache`` hands the
    same instance back afterwards.  Raises ``RuntimeError`` when the
    ``ultralytics`` package cannot be imported.
    """
    try:
        from ultralytics import YOLO as yolo_cls
    except ImportError:
        raise RuntimeError("pip install ultralytics")

    print(f"[model] Loading YOLO  {YOLO_MODEL} β¦", flush=True)
    detector = yolo_cls(YOLO_MODEL)
    print("[model] YOLO ready.", flush=True)
    return detector
|
|
|
|
@lru_cache(maxsize=1)
def _get_depth():
    """Return the Hugging Face depth-estimation pipeline for ``DEPTH_MODEL``.

    Built once on ``DEVICE`` and then served from the ``lru_cache``.
    Raises ``RuntimeError`` when ``transformers`` cannot be imported.
    """
    # Set the TF / Flax opt-out flags only if the environment has not set them.
    for flag in ("TRANSFORMERS_NO_TF", "TRANSFORMERS_NO_FLAX"):
        os.environ.setdefault(flag, "1")

    try:
        from transformers import pipeline as hf_pipeline
    except ImportError:
        raise RuntimeError("pip install transformers accelerate")

    short_name = DEPTH_MODEL.split('/')[-1]
    print(f"[model] Loading Depth {short_name} β¦", flush=True)
    estimator = hf_pipeline("depth-estimation", model=DEPTH_MODEL, device=DEVICE)
    print("[model] Depth ready.", flush=True)
    return estimator
|
|
|
|
| |
@lru_cache(maxsize=1)
def _class_names() -> dict[int, str]:
    """Return the YOLO model's ``names`` mapping with int keys and str values (cached)."""
    raw_names = _get_yolo().names
    return {int(idx): str(label) for idx, label in raw_names.items()}
|
|
|
|
| |
| |
| |
|
|
def _fmt_len(m: float) -> str:
    """Format a length given in metres as mm, cm or m text.

    Non-positive input yields the placeholder string.  Below 0.01 the value
    is shown in millimetres (1 decimal), below 1.0 in centimetres
    (2 decimals), otherwise in metres (3 decimals).
    """
    if m <= 0:
        return "β"
    # (exclusive upper bound, multiplier, decimals, unit), smallest unit first.
    for limit, factor, digits, unit in ((0.01, 1000, 1, "mm"), (1.0, 100, 2, "cm")):
        if m < limit:
            return f"{m * factor:.{digits}f} {unit}"
    return f"{m:.3f} m"
|
|
|
|
def _fmt_vol(m3: float) -> str:
    """Format a volume given in cubic metres as ml, L or cubic-metre text.

    Non-positive input yields the placeholder string.  Under 1000 ml the
    value is shown in millilitres (1 decimal), under 1e6 ml in litres
    (3 decimals), otherwise in cubic metres (4 decimals).
    """
    if m3 <= 0:
        text = "β"
    else:
        millilitres = m3 * 1e6
        if millilitres < 1000:
            text = f"{millilitres:.1f} ml"
        elif millilitres < 1e6:
            text = f"{millilitres / 1000:.3f} L"
        else:
            text = f"{m3:.4f} mΒ³"
    return text
|
|
|
|
| |
| |
| |
|
|
def _depth_infer(frame_bgr: np.ndarray) -> np.ndarray:
    """
    Run the depth model on *frame_bgr*.

    The frame is resized to ``DEPTH_SIZE`` x ``DEPTH_SIZE``, converted to RGB
    and passed to the depth pipeline as a PIL image.  The model's raw
    prediction is then resized back to the input resolution.

    Returns a float32 depth map with the same (H, W) as the input, in the
    model's own output units (metres for a metric depth checkpoint).
    """
    from PIL import Image as PILImage

    h, w = frame_bgr.shape[:2]
    small = cv2.resize(frame_bgr, (DEPTH_SIZE, DEPTH_SIZE))
    rgb = cv2.cvtColor(small, cv2.COLOR_BGR2RGB)
    out = _get_depth()(PILImage.fromarray(rgb))

    # Use the raw tensor, not out["depth"]: the pipeline's "depth" entry is an
    # 8-bit image rescaled to 0-255 for display, so its values are relative
    # and would make every downstream metric measurement wrong.
    pred = out["predicted_depth"]
    # .float() covers half-precision outputs; squeeze() drops singleton
    # batch/channel axes so a 2-D map is left for cv2.resize.
    d_small = pred.detach().float().squeeze().cpu().numpy()
    d_small = np.ascontiguousarray(d_small, dtype=np.float32)
    return cv2.resize(d_small, (w, h), interpolation=cv2.INTER_LINEAR)
|
|
|
|
def _depth_sample(depth: np.ndarray, mask: np.ndarray, pct: float = 25.0) -> float:
    """
    Robust depth estimate over the pixels where *mask* is positive.

    Values outside the 5th-95th percentile band are discarded first, then
    the *pct*-th percentile of what remains is returned.  Returns 0.0 when
    the mask selects nothing or nothing survives the trimming.
    """
    inside = depth[mask > 0]
    if len(inside) == 0:
        return 0.0

    lower, upper = np.percentile(inside, [5, 95])
    keep = (inside >= lower) & (inside <= upper)
    trimmed = inside[keep]
    if len(trimmed) == 0:
        return 0.0
    return float(np.percentile(trimmed, pct))
|
|
|
|
| |
| |
| |
|
|
def _ellipse_from_mask(mask: np.ndarray):
    """Fit an ellipse to the largest external contour of *mask*.

    Returns ``(cx, cy, major_px, minor_px, angle_deg)`` or ``None`` when
    there is no contour, the largest contour has fewer than 5 points, or
    OpenCV rejects the fit.  ``major_px`` / ``minor_px`` are full axis
    lengths; ``angle_deg`` is the rotation of the *major* axis, so the tuple
    can be drawn with ``cv2.ellipse(img, centre, (major/2, minor/2), angle, ...)``.
    """
    # findContours returns 2 values on OpenCV 4 and 3 on OpenCV 3; the
    # contour list is the second-to-last element in both layouts.
    cnts = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
    if not cnts:
        return None
    cnt = max(cnts, key=cv2.contourArea)
    if len(cnt) < 5:
        # fewer than 5 points: treated as not fittable
        return None
    try:
        (cx, cy), (axis_a, axis_b), angle = cv2.fitEllipse(cnt)
    except cv2.error:
        return None

    # fitEllipse reports the angle of its first axis.  When the axes are
    # swapped to (major, minor) order the angle must turn by 90 degrees too,
    # otherwise the ellipse is drawn perpendicular to the fitted one.
    if axis_a < axis_b:
        axis_a, axis_b = axis_b, axis_a
        angle = (angle + 90.0) % 180.0
    return cx, cy, axis_a, axis_b, angle
|
|
|
|
def _measure_cylinder(
    mask: np.ndarray,
    box: list,
    depth_map: np.ndarray,
    fy: float,
    aspect_thresh: float,
) -> dict:
    """
    Measure one segmented object as a cylinder.

    The long side of the mask's minimum-area rectangle is taken as the height
    (``H_px``) and the short side as the diameter (``D_px``); the diameter is
    capped by the fitted ellipse's minor axis when a fit exists.  With a
    usable depth ``Z`` (> 0.02) and ``fy`` > 0, pixel sizes are converted with
    ``size = px / fy * Z`` and the volume is ``pi * (D/2)**2 * H``.

    Args:
        mask: per-pixel mask; pixels > 0 belong to the object.
        box: ``[x1, y1, x2, y2]`` bounding box, used only for the aspect ratio.
        depth_map: depth image sampled inside the mask.
        fy: focal length in pixels.
        aspect_thresh: bbox height/width ratio at or above which the object
            is labelled ``"upright"`` (otherwise ``"on-side"``).

    Returns:
        ``{}`` when the mask has fewer than 5 foreground pixels, otherwise a
        dict with keys ``cx, cy, H_px, D_px, Z_m, orientation, aspect,
        rect_pts, ellipse, H_m, D_m, R_m, V_m3, H_str, D_str, V_str``.  The
        metric values are ``None`` when depth or focal length is unusable.
    """
    ys, xs = np.where(mask > 0)
    if len(xs) < 5:
        return {}

    # Minimum-area rectangle of the mask pixels, computed once and reused
    # for both the side lengths and the corner points.
    pts = np.column_stack([xs, ys]).astype(np.float32)
    rect = cv2.minAreaRect(pts)
    (cx, cy), (side_a, side_b), _ = rect
    rw, rh = max(side_a, side_b), min(side_a, side_b)
    rect_pts = cv2.boxPoints(rect).astype(np.int32)

    ell = _ellipse_from_mask(mask)

    # Orientation is a label derived from the bbox aspect ratio only; it
    # does not change which rectangle side is used as height.
    x1, y1, x2, y2 = [int(v) for v in box]
    aspect = (y2 - y1) / max(x2 - x1, 1)
    orientation = "upright" if aspect >= aspect_thresh else "on-side"

    H_px = rw
    D_px = rh
    if ell:
        # ell[3] is the ellipse's minor axis length.
        D_px = min(D_px, ell[3])

    Z = _depth_sample(depth_map, mask, pct=25.0)
    res: dict = dict(
        cx=int(cx), cy=int(cy),
        H_px=H_px, D_px=D_px,
        Z_m=Z,
        orientation=orientation,
        aspect=aspect,
        rect_pts=rect_pts,
        ellipse=ell,
    )

    if Z > 0.02 and fy > 0:
        H_m = (H_px / fy) * Z
        D_m = (D_px / fy) * Z
        R_m = D_m / 2.0
        V_m3 = math.pi * R_m ** 2 * H_m
        res.update(
            H_m=H_m, D_m=D_m, R_m=R_m, V_m3=V_m3,
            H_str=_fmt_len(H_m),
            D_str=_fmt_len(D_m),
            V_str=_fmt_vol(V_m3),
        )
    else:
        # No usable depth / focal length: report pixel sizes only.  R_m is
        # included so both branches return the same set of keys.
        res.update(
            H_m=None, D_m=None, R_m=None, V_m3=None,
            H_str=f"{int(H_px)} px",
            D_str=f"{int(D_px)} px",
            V_str="need depth",
        )
    return res
|
|
|
|
| |
| |
| |
|
|
# Cache of class id -> colour tuple, filled lazily by _colour().
_PAL: dict[int, tuple] = {}


def _colour(cls: int) -> tuple:
    """Return a stable 3-int colour for class id *cls* (each channel in 80..234).

    The colour is drawn from a private ``RandomState`` seeded with
    ``cls * 17 + 5`` and memoised in ``_PAL``.  A private generator is used
    so that the process-wide NumPy random state is left untouched; the
    values are the same as seeding the global generator with that seed.
    """
    if cls not in _PAL:
        rng = np.random.RandomState(cls * 17 + 5)
        _PAL[cls] = tuple(int(v) for v in rng.randint(80, 235, 3))
    return _PAL[cls]
|
|
|
|
def _annotate(
    frame: np.ndarray,
    dets: list,
    names: dict,
    mask_alpha: float = 0.35,
) -> np.ndarray:
    """Draw every detection onto *frame* in place and return it.

    Per detection, in this order: tinted mask plus its outline, the
    minimum-area rectangle, the fitted ellipse, the bounding box, a filled
    label bar, and (when measurements exist) four text rows for H, D, V, Z
    stacked below the measurement centre.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    img_h, img_w = frame.shape[:2]
    # Text scale grows with frame width but never drops below 0.40.
    scale = max(0.40, img_w / 1800)
    small_scale = scale * 0.82

    for det in dets:
        colour = _colour(det["cls"])
        left, top, right, bottom = map(int, det["box"])
        seg = det.get("mask")
        meas = det.get("meas") or {}

        # Mask tint (added on top of the frame) and a 1-px contour.
        if seg is not None:
            tint = np.zeros_like(frame)
            tint[seg > 0] = colour
            cv2.addWeighted(tint, mask_alpha, frame, 1.0, 0, frame)
            outlines, _ = cv2.findContours(
                seg, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            cv2.drawContours(frame, outlines, -1, colour, 1)

        # Rotated minimum-area rectangle.
        if "rect_pts" in meas:
            cv2.drawContours(frame, [meas["rect_pts"]], 0, colour, 2)

        # Fitted ellipse; stored axes are full lengths, cv2.ellipse wants halves.
        fitted = meas.get("ellipse")
        if fitted:
            e_cx, e_cy, e_major, e_minor, e_angle = fitted
            half_axes = (max(1, int(e_major / 2)), max(1, int(e_minor / 2)))
            cv2.ellipse(frame, (int(e_cx), int(e_cy)), half_axes,
                        e_angle, 0, 360, (255, 255, 60), 1)

        # Axis-aligned bounding box.
        cv2.rectangle(frame, (left, top), (right, bottom), colour, 1)

        # Label bar, kept inside the top edge of the frame.
        cls_id = det["cls"]
        title = names.get(cls_id, str(cls_id))
        orient = meas.get("orientation", "")
        track = det.get("tid", "?")
        score = det.get("best", det["conf"])
        caption = f"{title}#{track} {score:.0%} [{orient}]"
        (cap_w, cap_h), _ = cv2.getTextSize(caption, font, scale, 1)
        base_y = max(top - 4, cap_h + 4)
        cv2.rectangle(frame, (left, base_y - cap_h - 4),
                      (left + cap_w + 4, base_y + 2), colour, -1)
        cv2.putText(frame, caption, (left + 2, base_y - 2),
                    font, scale, (255, 255, 255), 1, cv2.LINE_AA)

        if not meas:
            continue

        # Measurement rows, centred on the object and clamped to the frame.
        mid_x = meas.get("cx", (left + right) // 2)
        mid_y = meas.get("cy", (top + bottom) // 2)
        depth_z = meas.get("Z_m", 0)
        z_text = f"Z: {depth_z:.2f} m" if depth_z else "Z: β"
        rows = [
            ("H: " + meas.get("H_str", "β"), (0, 220, 255)),
            ("D: " + meas.get("D_str", "β"), (255, 200, 0)),
            ("V: " + meas.get("V_str", "β"), (0, 255, 140)),
            (z_text, (180, 180, 180)),
        ]
        for row_no, (text, text_colour) in enumerate(rows, start=1):
            (txt_w, txt_h), _ = cv2.getTextSize(text, font, small_scale, 1)
            org_x = max(0, min(img_w - txt_w - 6, mid_x - txt_w // 2))
            org_y = min(img_h - 6, mid_y + row_no * (txt_h + 6))
            cv2.rectangle(frame, (org_x - 2, org_y - txt_h - 2),
                          (org_x + txt_w + 2, org_y + 2), (12, 12, 12), -1)
            cv2.putText(frame, text, (org_x, org_y - 1),
                        font, small_scale, text_colour, 1, cv2.LINE_AA)
    return frame
|
|
|
|
| |
| |
| |
|
|
def _run_frame(
    frame_bgr: np.ndarray,
    fy: float,
    conf: float,
    iou: float,
    imgsz: int,
    aspect_thresh: float,
    mask_alpha: float,
) -> tuple[np.ndarray, list[dict]]:
    """
    Full pipeline on one BGR frame: YOLO-seg -> depth -> measurement -> drawing.

    Args:
        frame_bgr: input frame; it is not modified (a copy is annotated).
        fy: focal length in pixels used for the metric conversion.
        conf, iou, imgsz: forwarded to ``yolo.predict``.
        aspect_thresh: forwarded to ``_measure_cylinder``.
        mask_alpha: mask overlay weight forwarded to ``_annotate``.

    Returns:
        ``(annotated_BGR, records)`` where *records* holds one display dict
        per detection with keys ``class, id, conf, orientation, H, D,
        Volume, Z (m)``.
    """
    yolo = _get_yolo()
    names = _class_names()
    h, w = frame_bgr.shape[:2]

    # Single-image prediction: only the first result is relevant.
    results = yolo.predict(
        source=frame_bgr, conf=conf, iou=iou,
        imgsz=imgsz, verbose=False
    )[0]

    dets: list[dict] = []
    if results.boxes is not None and len(results.boxes):
        for i, (box, conf_v, cls) in enumerate(zip(
            results.boxes.xyxy.cpu().numpy(),
            results.boxes.conf.cpu().numpy(),
            results.boxes.cls.cpu().numpy().astype(int),
        )):
            d: dict = {"box": box.tolist(), "conf": float(conf_v),
                       "cls": int(cls), "mask": None, "tid": i + 1,
                       "best": float(conf_v)}
            if results.masks is not None:
                try:
                    # .float(): cv2.resize cannot handle half-precision
                    # arrays, which would otherwise drop the mask.
                    raw = results.masks.data[i].float().cpu().numpy()
                    rsz = cv2.resize(raw, (w, h),
                                     interpolation=cv2.INTER_NEAREST)
                    d["mask"] = (rsz > 0.5).astype(np.uint8)
                except Exception as exc:
                    # Best effort: keep the detection without a mask, but
                    # say so instead of failing silently.
                    print(f"[warn] mask for detection {i + 1} skipped: {exc}",
                          file=sys.stderr, flush=True)
            dets.append(d)

    # Depth is only consumed for detections that carry a mask, so skip the
    # (expensive) depth model when there is none.
    depth_map: Optional[np.ndarray] = None
    if any(d["mask"] is not None for d in dets):
        depth_map = _depth_infer(frame_bgr)

    records: list[dict] = []
    for d in dets:
        if d.get("mask") is not None and depth_map is not None:
            meas = _measure_cylinder(
                d["mask"], d["box"], depth_map, fy, aspect_thresh)
        else:
            meas = {}
        d["meas"] = meas
        records.append({
            "class": names.get(d["cls"], str(d["cls"])),
            "id": d["tid"],
            "conf": f"{d['conf']:.0%}",
            "orientation": meas.get("orientation", "β"),
            "H": meas.get("H_str", "β"),
            "D": meas.get("D_str", "β"),
            "Volume": meas.get("V_str", "β"),
            "Z (m)": f"{meas['Z_m']:.2f}" if meas.get("Z_m") else "β",
        })

    # Draw on a copy so the caller's frame stays untouched.
    annotated = _annotate(frame_bgr.copy(), dets, names, mask_alpha)

    return annotated, records
|
|
|
|
| |
| |
| |
|
|
@spaces.GPU(duration=60)
def infer_image(
    image_rgb: np.ndarray,
    fy: float,
    vfov: float,
    conf: float,
    iou: float,
    imgsz: int,
    aspect_thresh: float,
    mask_alpha: float,
) -> tuple:
    """
    Gradio image tab callback.

    Gradio passes RGB numpy arrays; OpenCV works in BGR internally, so the
    image is converted on the way in and on the way out.

    Args:
        image_rgb: input image, or ``None`` when nothing was supplied.
        fy: focal length in pixels; ``None`` or a value <= 0 means "derive it
            from *vfov* and the image height".
        vfov: vertical field of view in degrees, used only for that fallback.
        conf, iou, imgsz, aspect_thresh, mask_alpha: forwarded to ``_run_frame``.

    Returns:
        ``(annotated_rgb, records)``, or ``(None, [])`` without an input image.
    """
    if image_rgb is None:
        return None, []

    # `not fy` also covers None, which a cleared Number field delivers and
    # which would raise TypeError in a bare `fy <= 0` comparison.
    if not fy or fy <= 0:
        h = image_rgb.shape[0]
        fy = h / (2.0 * math.tan(math.radians(vfov / 2.0)))

    frame_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
    ann_bgr, records = _run_frame(
        frame_bgr, fy, conf, iou, imgsz, aspect_thresh, mask_alpha)
    ann_rgb = cv2.cvtColor(ann_bgr, cv2.COLOR_BGR2RGB)
    return ann_rgb, records
|
|
|
|
@spaces.GPU(duration=120)
def infer_video(
    video_path: str,
    fy: float,
    vfov: float,
    conf: float,
    iou: float,
    imgsz: int,
    aspect_thresh: float,
    mask_alpha: float,
    stride: int,
) -> tuple:
    """
    Process a video file; return (annotated_video_path, aggregated_records).

    Every *stride*-th frame, starting with the first, runs through
    ``_run_frame`` (YOLO and depth); the frames in between reuse the most
    recent annotated frame.  Each record gets a leading ``frame`` key holding
    the 1-based index of the frame it came from.

    Args:
        video_path: path of the input video; empty / ``None`` returns ``(None, [])``.
        fy: focal length in pixels; ``None`` or <= 0 derives it from *vfov*
            and the frame height.
        vfov: vertical field of view in degrees, used only for that fallback.
        conf, iou, imgsz, aspect_thresh, mask_alpha: forwarded to ``_run_frame``.
        stride: frame step; values below 1 (or ``None``) are treated as 1.

    Returns:
        ``(out_path, records)`` on success, or ``(None, [{"error": ...}])``
        when the input cannot be opened or the output cannot be created.
    """
    if not video_path:
        return None, []

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None, [{"error": "Cannot open video"}]

    writer = None
    all_records: list[dict] = []
    try:
        fps_src = cap.get(cv2.CAP_PROP_FPS) or 25.0
        fw = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        fh = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # `not fy` also covers None from a cleared Number field.
        if not fy or fy <= 0:
            fy = fh / (2.0 * math.tan(math.radians(vfov / 2.0)))

        # Guard the modulo below against 0 / None / float strides.
        stride = max(1, int(stride or 1))

        # Nanosecond timestamp: two requests in the same second must not
        # write to the same file.
        out_path = str(SNAP_DIR / f"out_{time.time_ns()}.mp4")
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(out_path, fourcc, fps_src, (fw, fh))
        if not writer.isOpened():
            return None, [{"error": "Cannot create output video"}]

        frame_idx = 0
        last_ann = None

        while True:
            ret, frame_bgr = cap.read()
            if not ret:
                break
            frame_idx += 1

            # (frame_idx - 1): process frame 1, 1 + stride, ... so clips
            # shorter than `stride` still get measured and the leading
            # frames are annotated.
            if (frame_idx - 1) % stride == 0:
                ann_bgr, records = _run_frame(
                    frame_bgr, fy, conf, iou, imgsz, aspect_thresh, mask_alpha)
                last_ann = ann_bgr
                # "frame" first, matching the column order of the results table.
                all_records.extend({"frame": frame_idx, **r} for r in records)
            else:
                # Skipped frame: repeat the latest annotated frame.
                ann_bgr = last_ann if last_ann is not None else frame_bgr

            writer.write(ann_bgr)
    finally:
        # Release both handles even when a frame raises.
        cap.release()
        if writer is not None:
            writer.release()
    return out_path, all_records
|
|
|
|
| |
| |
| |
|
|
# Markdown shown at the top of the app via gr.Markdown(DESCRIPTION).
DESCRIPTION = """
# π Cylinder Volume Estimator
**YOLO-seg** detects objects β **Depth Anything V2** measures metric depth β
**Pinhole geometry** computes real-world Height, Diameter & Volume.

`V = Ο Γ (D/2)Β² Γ H`   `H = (H_px / fy) Γ Z`   `D = (W_px / fy) Γ Z`

> **Tip:** For best accuracy set *Focal length fy* from your camera spec,
> or use the VFOV slider for auto-estimation.
"""
|
|
def _sidebar():
    """Build the camera / model settings accordion shared by both tabs.

    Returns the created components in the order
    ``(fy, vfov, conf, iou, imgsz, aspect_threshold, mask_alpha)``.
    """
    with gr.Accordion("π· Camera & Model Settings", open=True):
        focal = gr.Number(
            label="Focal length fy (px)  β  0 = auto from VFOV",
            value=0,
            minimum=0,
            maximum=5000,
            step=10,
            info="Camera focal length in pixels. 0 = auto-estimated.",
        )
        fov = gr.Slider(
            30, 120, value=60, step=1,
            label="Vertical FOV (Β°)  β  used when fy = 0",
        )
        conf_thr = gr.Slider(
            0.20, 0.90, value=0.40, step=0.05,
            label="YOLO confidence threshold",
        )
        iou_thr = gr.Slider(
            0.20, 0.80, value=0.45, step=0.05,
            label="YOLO NMS IoU threshold",
        )
        size = gr.Dropdown(
            [160, 224, 320, 416, 512], value=320,
            label="YOLO inference size (px)  β  lower = faster",
        )
        aspect = gr.Slider(
            0.8, 3.0, value=1.2, step=0.1,
            label="Aspect threshold  (H/W upright vs on-side)",
        )
        overlay_alpha = gr.Slider(
            0.10, 0.70, value=0.35, step=0.05,
            label="Mask overlay alpha",
        )
    return focal, fov, conf_thr, iou_thr, size, aspect, overlay_alpha
|
|
|
|
# Gradio UI: header markdown, an Image tab and a Video tab (each with its own
# copy of the shared settings accordion), and a formula reference footer.
with gr.Blocks(
    title="Cylinder Volume Estimator",
    theme=gr.themes.Base(
        primary_hue="cyan",
        secondary_hue="slate",
        font=[gr.themes.GoogleFont("IBM Plex Mono"), "monospace"],
    ),
) as demo:

    gr.Markdown(DESCRIPTION)

    with gr.Tabs():

        # --- Image tab: one image in, annotated image + measurement table out ---
        with gr.Tab("πΌοΈ Image"):
            with gr.Row():
                with gr.Column(scale=1):
                    img_in = gr.Image(label="Input image", type="numpy",
                                      sources=["upload", "webcam"])

                    i_fy, i_vfov, i_conf, i_iou, i_imgsz, i_asp, i_alpha = _sidebar()
                    img_btn = gr.Button("βΆ Estimate", variant="primary", size="lg")

                with gr.Column(scale=1):
                    img_out = gr.Image(label="Annotated output", type="numpy")
                    img_tbl = gr.DataFrame(
                        label="Measurements",
                        headers=["class","id","conf","orientation",
                                 "H","D","Volume","Z (m)"],
                        wrap=True,
                    )

            # Input order must match infer_image's positional parameters.
            img_btn.click(
                fn=infer_image,
                inputs=[img_in, i_fy, i_vfov, i_conf, i_iou, i_imgsz, i_asp, i_alpha],
                outputs=[img_out, img_tbl],
                api_name="infer_image",
            )

        # --- Video tab: same controls plus a frame stride slider ---
        with gr.Tab("π¬ Video"):
            with gr.Row():
                with gr.Column(scale=1):
                    vid_in = gr.Video(label="Input video")
                    v_fy, v_vfov, v_conf, v_iou, v_imgsz, v_asp, v_alpha = _sidebar()
                    v_stride = gr.Slider(1, 10, value=4, step=1,
                                         label="Process every N frames (stride)")
                    vid_btn = gr.Button("βΆ Process video", variant="primary", size="lg")
                    gr.Markdown(
                        "> β οΈ Free-tier Spaces run on CPU β keep videos short (<30 s) "
                        "or increase stride to reduce latency."
                    )

                with gr.Column(scale=1):
                    vid_out = gr.Video(label="Annotated video")
                    vid_tbl = gr.DataFrame(
                        label="Measurements (all frames)",
                        headers=["frame","class","id","conf","orientation",
                                 "H","D","Volume","Z (m)"],
                        wrap=True,
                    )

            # Input order must match infer_video's positional parameters.
            vid_btn.click(
                fn=infer_video,
                inputs=[vid_in, v_fy, v_vfov, v_conf, v_iou, v_imgsz,
                        v_asp, v_alpha, v_stride],
                outputs=[vid_out, vid_tbl],
                api_name="infer_video",
            )

    # Footer: symbol table for the formulas used by the measurement code.
    gr.Markdown("""
---
**Formula reference**

| Symbol | Meaning |
|--------|---------|
| `fy` | Vertical focal length in pixels |
| `Z` | Metric depth from Depth-Anything-V2 (metres) |
| `H_px` | Pixel height of the detected cylinder |
| `D_px` | Pixel diameter (minor axis of fitted ellipse) |
| `H` | Real height = `(H_px / fy) Γ Z` |
| `D` | Real diameter = `(D_px / fy) Γ Z` |
| `V` | Volume = `Ο Γ (D/2)Β² Γ H` |

*Upright* : bbox H/W β₯ aspect threshold β long axis = cylinder height
*On-side* : bbox H/W < aspect threshold β long axis = cylinder length
""")
|
|
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)