# cylinder / app.py  (Hugging Face Space)
# Uploaded by eho69 - commit "Create app.py", 56884a9 (verified)
from __future__ import annotations
import math
import os
import sys
import time
import warnings
from functools import lru_cache
from pathlib import Path
from typing import Optional
import cv2
import numpy as np
import gradio as gr
# Silence every warning category process-wide (no category/module filter is
# given); presumably to keep third-party deprecation noise out of the logs.
warnings.filterwarnings("ignore")
# ── ZeroGPU / spaces compatibility ───────────────────────────────────────────
# When the `spaces` package is missing, install a stand-in whose `GPU`
# decorator is a no-op, so `@spaces.GPU` / `@spaces.GPU(duration=...)` can be
# written unconditionally further down.
try:
    import spaces  # Hugging Face ZeroGPU
except ImportError:
    HF_SPACES = False

    class _FakeSpaces:
        """Minimal replacement exposing only a pass-through ``GPU`` decorator."""

        @staticmethod
        def GPU(fn=None, duration=60):
            """Return *fn* unchanged, or an identity decorator when called with options."""

            def _identity(wrapped):
                return wrapped

            return _identity if fn is None else fn

    spaces = _FakeSpaces()  # type: ignore
else:
    HF_SPACES = True
# ── Device ────────────────────────────────────────────────────────────────────
# torch is mandatory: abort start-up with an install hint if it is missing.
try:
    import torch
except ImportError:
    sys.exit("[FATAL] pip install torch")

# Intra-op threads = all logical CPUs (4 if the count is unknown);
# inter-op threads = half of that, but never fewer than one.
_ncpu = os.cpu_count() or 4
torch.set_num_threads(_ncpu)
torch.set_num_interop_threads(max(1, _ncpu // 2))

if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
# ── Env-tunable constants ─────────────────────────────────────────────────────
# Path / name of the YOLO segmentation weights handed to ultralytics.
YOLO_MODEL = os.getenv("YOLO_MODEL", "best (4).pt")
# Hugging Face model id used for the depth-estimation pipeline.
DEPTH_MODEL = os.getenv(
    "DEPTH_MODEL",
    "depth-anything/Depth-Anything-V2-Metric-Indoor-Small-hf",
)
# Side length (px) of the square image fed to the depth model.
DEPTH_SIZE = int(os.getenv("DEPTH_SIZE", "308"))
# Directory that receives the annotated output videos.
SNAP_DIR = Path(os.getenv("SNAP_DIR", "snapshots"))
# parents=True so a nested SNAP_DIR (e.g. "/data/out/snaps") also works.
SNAP_DIR.mkdir(parents=True, exist_ok=True)
# ══════════════════════════════════════════════════════════════════════════════
# Model singletons — loaded exactly once per worker process
# ══════════════════════════════════════════════════════════════════════════════
@lru_cache(maxsize=1)
def _get_yolo():
    """Return the shared YOLO segmentation model, loading it on first use.

    The ``lru_cache`` wrapper memoises the single zero-argument call, so the
    weights named by ``YOLO_MODEL`` are loaded only the first time.

    Raises:
        RuntimeError: if the ``ultralytics`` package cannot be imported.
    """
    try:
        from ultralytics import YOLO as _YoloCls
    except ImportError:
        raise RuntimeError("pip install ultralytics")

    print(f"[model] Loading YOLO {YOLO_MODEL} …", flush=True)
    detector = _YoloCls(YOLO_MODEL)
    print("[model] YOLO ready.", flush=True)
    return detector
@lru_cache(maxsize=1)
def _get_depth():
    """Return the shared depth-estimation pipeline, building it on first use.

    Sets ``TRANSFORMERS_NO_TF`` / ``TRANSFORMERS_NO_FLAX`` to ``"1"`` (unless
    already defined) before importing ``transformers``, then builds a
    ``"depth-estimation"`` pipeline for ``DEPTH_MODEL`` on ``DEVICE``.

    Raises:
        RuntimeError: if the ``transformers`` package cannot be imported.
    """
    for flag in ("TRANSFORMERS_NO_TF", "TRANSFORMERS_NO_FLAX"):
        os.environ.setdefault(flag, "1")

    try:
        from transformers import pipeline as hf_pipeline
    except ImportError:
        raise RuntimeError("pip install transformers accelerate")

    short_name = DEPTH_MODEL.split('/')[-1]
    print(f"[model] Loading Depth {short_name} …", flush=True)
    depth_pipe = hf_pipeline("depth-estimation", model=DEPTH_MODEL, device=DEVICE)
    print("[model] Depth ready.", flush=True)
    return depth_pipe
# ── Names helper ──────────────────────────────────────────────────────────────
@lru_cache(maxsize=1)
def _class_names() -> dict[int, str]:
    """Return the YOLO model's class-id to class-name table with normalised types.

    Keys are coerced to ``int`` and values to ``str``; the result is cached.
    """
    lookup: dict[int, str] = {}
    for key, label in _get_yolo().names.items():
        lookup[int(key)] = str(label)
    return lookup
# ══════════════════════════════════════════════════════════════════════════════
# Unit formatting
# ══════════════════════════════════════════════════════════════════════════════
def _fmt_len(m: float) -> str:
if m <= 0: return "β€”"
if m < 0.01: return f"{m * 1000:.1f} mm"
if m < 1.0: return f"{m * 100:.2f} cm"
return f"{m:.3f} m"
def _fmt_vol(m3: float) -> str:
if m3 <= 0: return "β€”"
ml = m3 * 1e6
if ml < 1000: return f"{ml:.1f} ml"
if ml < 1e6: return f"{ml / 1000:.3f} L"
return f"{m3:.4f} mΒ³"
# ══════════════════════════════════════════════════════════════════════════════
# Depth inference helper
# ══════════════════════════════════════════════════════════════════════════════
def _depth_infer(frame_bgr: np.ndarray) -> np.ndarray:
    """Run the depth model on *frame_bgr* and return a per-pixel depth map.

    The frame is resized to ``DEPTH_SIZE x DEPTH_SIZE``, converted to RGB and
    passed to the depth pipeline; the result is resized back to the input
    resolution.

    Returns:
        float32 array with the same (H, W) as the input.  Values come from the
        pipeline's raw ``predicted_depth`` output (metres for a metric model).
    """
    from PIL import Image as PILImage

    h, w = frame_bgr.shape[:2]
    small = cv2.resize(frame_bgr, (DEPTH_SIZE, DEPTH_SIZE))
    rgb = cv2.cvtColor(small, cv2.COLOR_BGR2RGB)
    out = _get_depth()(PILImage.fromarray(rgb))

    # Per the Transformers depth-estimation pipeline documentation, the
    # "depth" entry is a PIL visualisation image rescaled to 0-255, so it
    # carries no metric scale.  "predicted_depth" holds the raw model output.
    pred = out.get("predicted_depth") if hasattr(out, "get") else None
    if pred is not None:
        if hasattr(pred, "detach"):  # torch.Tensor -> numpy
            pred = pred.detach().float().cpu().numpy()
        d_small = np.squeeze(np.asarray(pred, dtype=np.float32))
    else:
        # Fallback for pipeline outputs that expose no raw tensor.
        d_small = np.array(out["depth"], dtype=np.float32)

    d_small = np.ascontiguousarray(d_small, dtype=np.float32)
    return cv2.resize(d_small, (w, h), interpolation=cv2.INTER_LINEAR)
def _depth_sample(depth: np.ndarray, mask: np.ndarray, pct: float = 25.0) -> float:
"""
Robust depth estimate inside mask.
25th-percentile suppresses background bleed at mask edges.
"""
vals = depth[mask > 0]
if not len(vals):
return 0.0
lo, hi = np.percentile(vals, [5, 95])
vals = vals[(vals >= lo) & (vals <= hi)]
return float(np.percentile(vals, pct)) if len(vals) else 0.0
# ══════════════════════════════════════════════════════════════════════════════
# Cylinder geometry
# ══════════════════════════════════════════════════════════════════════════════
def _ellipse_from_mask(mask: np.ndarray):
    """Fit an ellipse to the largest external contour of *mask*.

    Returns ``(cx, cy, major_px, minor_px, angle_deg)``, or ``None`` when the
    mask has no contour, the largest contour has fewer than five points, or
    OpenCV rejects the fit.
    """
    contours, _hierarchy = cv2.findContours(
        mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return None

    largest = max(contours, key=cv2.contourArea)
    if len(largest) < 5:  # fitEllipse needs at least five points
        return None

    try:
        centre, axes, angle = cv2.fitEllipse(largest)
    except cv2.error:
        return None

    cx, cy = centre
    axis_a, axis_b = axes
    return cx, cy, max(axis_a, axis_b), min(axis_a, axis_b), angle
def _measure_cylinder(
    mask: np.ndarray,
    box: list,
    depth_map: np.ndarray,
    fy: float,
    aspect_thresh: float,
) -> dict:
    """Estimate cylinder height, diameter and volume from a segmentation mask.

    Args:
        mask: per-pixel object mask; pixels ``> 0`` belong to the object.
        box: axis-aligned bounding box ``[x1, y1, x2, y2]`` in pixels.
        depth_map: depth map with the same (H, W) as *mask*.
        fy: focal length in pixels.
        aspect_thresh: bbox height/width ratio at or above which the object
            is labelled ``"upright"`` (otherwise ``"on-side"``).

    Returns:
        ``{}`` when the mask has fewer than five pixels.  Otherwise a dict
        with pixel measurements (``cx``, ``cy``, ``H_px``, ``D_px``), depth
        ``Z_m``, ``orientation``, ``aspect``, ``rect_pts``, ``ellipse`` and
        the metric keys ``H_m``, ``D_m``, ``R_m``, ``V_m3`` plus display
        strings ``H_str``, ``D_str``, ``V_str``.  The metric keys are
        ``None`` when depth or focal length is unusable.
    """
    ys, xs = np.where(mask > 0)
    if len(xs) < 5:
        return {}

    pts = np.column_stack([xs, ys]).astype(np.float32)
    # Fit the rotated rectangle once and reuse it for the corner points.
    rect = cv2.minAreaRect(pts)
    (cx, cy), (side_a, side_b), _ = rect
    long_px, short_px = max(side_a, side_b), min(side_a, side_b)
    rect_pts = cv2.boxPoints(rect).astype(np.int32)

    ell = _ellipse_from_mask(mask)

    x1, y1, x2, y2 = [int(v) for v in box]
    aspect = (y2 - y1) / max(x2 - x1, 1)
    # The orientation label is informational only: the long side of the
    # rotated rectangle is always taken as the cylinder height/length.
    orientation = "upright" if aspect >= aspect_thresh else "on-side"

    H_px = long_px
    D_px = short_px
    # Use the ellipse minor axis as a tighter diameter estimate, but ignore a
    # degenerate fit (zero / NaN) that would wipe out the diameter.
    if ell and ell[3] > 0:
        D_px = min(D_px, ell[3])

    Z = _depth_sample(depth_map, mask, pct=25.0)

    res: dict = dict(
        cx=int(cx), cy=int(cy),
        H_px=H_px, D_px=D_px,
        Z_m=Z,
        orientation=orientation,
        aspect=aspect,
        rect_pts=rect_pts,
        ellipse=ell,
    )

    if Z > 0.02 and fy > 0:
        # Pinhole model: metric size = pixel size / focal length * depth.
        H_m = (H_px / fy) * Z
        D_m = (D_px / fy) * Z
        R_m = D_m / 2.0
        V_m3 = math.pi * R_m ** 2 * H_m
        res.update(
            H_m=H_m, D_m=D_m, R_m=R_m, V_m3=V_m3,
            H_str=_fmt_len(H_m),
            D_str=_fmt_len(D_m),
            V_str=_fmt_vol(V_m3),
        )
    else:
        # No usable depth / focal length: report pixel sizes only.  R_m is
        # included so both branches expose the same key set.
        res.update(
            H_m=None, D_m=None, R_m=None, V_m3=None,
            H_str=f"{int(H_px)} px",
            D_str=f"{int(D_px)} px",
            V_str="need depth",
        )
    return res
# ══════════════════════════════════════════════════════════════════════════════
# Drawing
# ══════════════════════════════════════════════════════════════════════════════
_PAL: dict[int, tuple] = {}
def _colour(cls: int) -> tuple:
if cls not in _PAL:
np.random.seed(cls * 17 + 5)
_PAL[cls] = tuple(int(v) for v in np.random.randint(80, 235, 3))
return _PAL[cls]
def _overlay_text(text: str) -> str:
    """Reduce *text* to printable ASCII so ``cv2.putText`` can render it."""
    import re

    # Hershey fonts draw unsupported glyphs as "?", so map the known symbols
    # first and collapse any other non-ASCII run into a single "-".
    text = text.replace("\u2014", "-").replace("\u00b3", "3")
    return re.sub(r"[^\x20-\x7e]+", "-", text)


def _annotate(
    frame: np.ndarray,
    dets: list,
    names: dict,
    mask_alpha: float = 0.35,
) -> np.ndarray:
    """Draw masks, boxes, fitted shapes and measurement text onto *frame*.

    Args:
        frame: image to draw on; it is modified in place and also returned.
        dets: detection dicts with ``box``, ``cls``, ``conf`` and optionally
            ``mask``, ``meas``, ``tid`` and ``best``.
        names: class-id to class-name mapping for the labels.
        mask_alpha: weight of the coloured mask overlay added to the frame.

    Returns:
        The same *frame* array, annotated.
    """
    h, w = frame.shape[:2]
    font = cv2.FONT_HERSHEY_SIMPLEX
    fs = max(0.40, w / 1800)  # font scale grows with frame width

    for d in dets:
        c = _colour(d["cls"])
        x1, y1, x2, y2 = (int(v) for v in d["box"])
        mask = d.get("mask")
        meas = d.get("meas") or {}

        # ── segmentation mask + contour ───────────────────────────────
        if mask is not None:
            overlay = np.zeros_like(frame)
            overlay[mask > 0] = c
            cv2.addWeighted(overlay, mask_alpha, frame, 1.0, 0, frame)
            cnts, _ = cv2.findContours(
                mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            cv2.drawContours(frame, cnts, -1, c, 1)

        # ── oriented bounding rect ────────────────────────────────────
        if "rect_pts" in meas:
            cv2.drawContours(frame, [meas["rect_pts"]], 0, c, 2)

        # ── fitted ellipse ────────────────────────────────────────────
        # Colour (255, 255, 60) is light cyan when the frame is BGR, which is
        # how _run_frame calls this function.
        if meas.get("ellipse"):
            ecx, ecy, ema, emi, ean = meas["ellipse"]
            cv2.ellipse(frame,
                        (int(ecx), int(ecy)),
                        (max(1, int(ema / 2)), max(1, int(emi / 2))),
                        ean, 0, 360, (255, 255, 60), 1)

        # ── bbox ──────────────────────────────────────────────────────
        cv2.rectangle(frame, (x1, y1), (x2, y2), c, 1)

        # ── label: class + track-id + conf ────────────────────────────
        name = names.get(d["cls"], str(d["cls"]))
        ori = meas.get("orientation", "")
        tid = d.get("tid", "?")
        label = _overlay_text(
            f"{name}#{tid} {d.get('best', d['conf']):.0%} [{ori}]")
        (lw, lh), _ = cv2.getTextSize(label, font, fs, 1)
        ly = max(y1 - 4, lh + 4)  # keep the label inside the top edge
        cv2.rectangle(frame, (x1, ly - lh - 4), (x1 + lw + 4, ly + 2), c, -1)
        cv2.putText(frame, label, (x1 + 2, ly - 2),
                    font, fs, (255, 255, 255), 1, cv2.LINE_AA)

        # ── measurement block ─────────────────────────────────────────
        if meas:
            cx_ = meas.get("cx", (x1 + x2) // 2)
            cy_ = meas.get("cy", (y1 + y2) // 2)
            Z = meas.get("Z_m", 0)
            lines = [
                ("H: " + meas.get("H_str", "-"), (0, 220, 255)),
                ("D: " + meas.get("D_str", "-"), (255, 200, 0)),
                ("V: " + meas.get("V_str", "-"), (0, 255, 140)),
                (f"Z: {Z:.2f} m" if Z else "Z: -", (180, 180, 180)),
            ]
            for i, (ln, col_txt) in enumerate(lines):
                ln = _overlay_text(ln)
                (tw, th), _ = cv2.getTextSize(ln, font, fs * 0.82, 1)
                # Centre on the object, clamped to the frame bounds.
                tx = max(0, min(w - tw - 6, cx_ - tw // 2))
                ty = min(h - 6, cy_ + (i + 1) * (th + 6))
                cv2.rectangle(frame, (tx - 2, ty - th - 2),
                              (tx + tw + 2, ty + 2), (12, 12, 12), -1)
                cv2.putText(frame, ln, (tx, ty - 1),
                            font, fs * 0.82,
                            col_txt, 1, cv2.LINE_AA)
    return frame
# ══════════════════════════════════════════════════════════════════════════════
# Core inference — single frame
# ══════════════════════════════════════════════════════════════════════════════
def _run_frame(
    frame_bgr: np.ndarray,
    fy: float,
    conf: float,
    iou: float,
    imgsz: int,
    aspect_thresh: float,
    mask_alpha: float,
) -> tuple[np.ndarray, list[dict]]:
    """Run detection, depth estimation and measurement on one BGR frame.

    Args:
        frame_bgr: input frame in BGR channel order.
        fy: focal length in pixels.
        conf, iou, imgsz: passed to ``YOLO.predict``.
        aspect_thresh: upright / on-side threshold for the cylinder model.
        mask_alpha: mask overlay weight used when annotating.

    Returns:
        ``(annotated_bgr, records)`` where *annotated_bgr* is an annotated
        copy of the input and *records* holds one display dict per detection.
    """
    placeholder = "—"  # shown for any value that could not be measured
    yolo = _get_yolo()
    names = _class_names()
    h, w = frame_bgr.shape[:2]

    # ── YOLO segmentation ─────────────────────────────────────────────
    results = yolo.predict(
        source=frame_bgr, conf=conf, iou=iou,
        imgsz=imgsz, verbose=False
    )[0]

    dets: list[dict] = []
    if results.boxes is not None and len(results.boxes):
        for i, (box, conf_v, cls) in enumerate(zip(
            results.boxes.xyxy.cpu().numpy(),
            results.boxes.conf.cpu().numpy(),
            results.boxes.cls.cpu().numpy().astype(int),
        )):
            d: dict = {"box": box.tolist(), "conf": float(conf_v),
                       "cls": int(cls), "mask": None, "tid": i + 1,
                       "best": float(conf_v)}
            if results.masks is not None:
                try:
                    raw = results.masks.data[i].cpu().numpy()
                    rsz = cv2.resize(raw, (w, h),
                                     interpolation=cv2.INTER_NEAREST)
                    d["mask"] = (rsz > 0.5).astype(np.uint8)
                except Exception as exc:
                    # Best effort: keep the detection without a mask, but
                    # report the failure instead of hiding it.
                    print(f"[warn] mask {i} skipped: {exc}",
                          file=sys.stderr, flush=True)
            dets.append(d)

    # ── Depth estimation (skipped when nothing was detected) ──────────
    depth_map: Optional[np.ndarray] = None
    if dets:
        depth_map = _depth_infer(frame_bgr)

    # ── Per-object cylinder measurement ──────────────────────────────
    records: list[dict] = []
    for d in dets:
        if d.get("mask") is not None and depth_map is not None:
            meas = _measure_cylinder(
                d["mask"], d["box"], depth_map, fy, aspect_thresh)
        else:
            meas = {}
        d["meas"] = meas
        records.append({
            "class": names.get(d["cls"], str(d["cls"])),
            "id": d["tid"],
            "conf": f"{d['conf']:.0%}",
            "orientation": meas.get("orientation", placeholder),
            "H": meas.get("H_str", placeholder),
            "D": meas.get("D_str", placeholder),
            "Volume": meas.get("V_str", placeholder),
            "Z (m)": f"{meas['Z_m']:.2f}" if meas.get("Z_m") else placeholder,
        })

    # ── Annotate a copy so the caller's frame stays untouched ─────────
    annotated = _annotate(frame_bgr.copy(), dets, names, mask_alpha)
    return annotated, records
# ══════════════════════════════════════════════════════════════════════════════
# Gradio callbacks — decorated with @spaces.GPU for ZeroGPU Spaces
# ══════════════════════════════════════════════════════════════════════════════
@spaces.GPU(duration=60)
def infer_image(
    image_rgb: np.ndarray,
    fy: float,
    vfov: float,
    conf: float,
    iou: float,
    imgsz: int,
    aspect_thresh: float,
    mask_alpha: float,
) -> tuple:
    """Gradio image-tab callback: measure the cylinders in one RGB image.

    Args:
        image_rgb: input image as an RGB array, or ``None`` if nothing was
            supplied.
        fy: focal length in pixels; ``None`` or a non-positive value means
            "derive it from *vfov* and the image height".
        vfov: vertical field of view in degrees, used only when *fy* is auto.
        conf, iou, imgsz, aspect_thresh, mask_alpha: forwarded to
            ``_run_frame``.

    Returns:
        ``(annotated_rgb, records)``, or ``(None, [])`` when there is no image.
    """
    if image_rgb is None:
        return None, []

    # A cleared Number field arrives as None; treat it like 0 (auto).
    if fy is None or fy <= 0:
        h = image_rgb.shape[0]
        fy = h / (2.0 * math.tan(math.radians(vfov / 2.0)))

    # Gradio hands over RGB; the OpenCV pipeline works in BGR.
    frame_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
    ann_bgr, records = _run_frame(
        frame_bgr, fy, conf, iou, imgsz, aspect_thresh, mask_alpha)
    ann_rgb = cv2.cvtColor(ann_bgr, cv2.COLOR_BGR2RGB)
    return ann_rgb, records
@spaces.GPU(duration=120)
def infer_video(
    video_path: str,
    fy: float,
    vfov: float,
    conf: float,
    iou: float,
    imgsz: int,
    aspect_thresh: float,
    mask_alpha: float,
    stride: int,
) -> tuple:
    """Gradio video-tab callback: annotate a video and collect measurements.

    Every *stride*-th frame (1-based frame index divisible by *stride*) goes
    through the full pipeline.  The frames in between are written using the
    most recent annotated frame, or the raw frame if none exists yet.

    Args:
        video_path: path of the input video; empty / ``None`` returns early.
        fy: focal length in pixels; ``None`` or non-positive means "derive it
            from *vfov* and the frame height".
        vfov: vertical field of view in degrees, used only when *fy* is auto.
        conf, iou, imgsz, aspect_thresh, mask_alpha: forwarded to
            ``_run_frame``.
        stride: process every N-th frame; values below 1 are treated as 1.

    Returns:
        ``(output_video_path, records)``; each record carries the ``frame``
        index it came from.  On failure to open the input or the output,
        returns ``(None, [{"error": ...}])``.
    """
    if not video_path:
        return None, []

    # Sliders may deliver floats; 0 would make the modulo below divide by zero.
    stride = max(1, int(stride or 1))

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None, [{"error": "Cannot open video"}]

    writer = None
    all_records: list[dict] = []
    try:
        fps_src = cap.get(cv2.CAP_PROP_FPS) or 25.0
        fw = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        fh = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if fy is None or fy <= 0:
            fy = fh / (2.0 * math.tan(math.radians(vfov / 2.0)))

        out_path = str(SNAP_DIR / f"out_{int(time.time())}.mp4")
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(out_path, fourcc, fps_src, (fw, fh))
        if not writer.isOpened():
            return None, [{"error": "Cannot open video writer"}]

        frame_idx = 0
        last_ann = None
        while True:
            ret, frame_bgr = cap.read()
            if not ret:
                break
            frame_idx += 1
            if frame_idx % stride == 0:
                ann_bgr, records = _run_frame(
                    frame_bgr, fy, conf, iou, imgsz, aspect_thresh, mask_alpha)
                last_ann = ann_bgr
                for r in records:
                    r["frame"] = frame_idx
                all_records.extend(records)
            else:
                # Reuse last annotation for non-processed frames (saves compute)
                ann_bgr = last_ann if last_ann is not None else frame_bgr
            writer.write(ann_bgr)
    finally:
        # Release the handles even if inference raises mid-video.
        cap.release()
        if writer is not None:
            writer.release()
    return out_path, all_records
# ══════════════════════════════════════════════════════════════════════════════
# Gradio UI
# ══════════════════════════════════════════════════════════════════════════════
# Markdown shown at the top of the app.  Blank lines separate the heading,
# the summary, the formula line and the tip so each renders as its own block.
DESCRIPTION = """
# 🔭 Cylinder Volume Estimator

**YOLO-seg** detects objects → **Depth Anything V2** measures metric depth →
**Pinhole geometry** computes real-world Height, Diameter & Volume.

`V = π × (D/2)² × H` &nbsp;&nbsp;&nbsp;&nbsp; `H = (H_px / fy) × Z` &nbsp;&nbsp;&nbsp;&nbsp; `D = (W_px / fy) × Z`

> **Tip:** For best accuracy set *Focal length fy* from your camera spec,
> or use the VFOV slider for auto-estimation.
"""
def _sidebar():
    """Build the shared camera / model controls inside an accordion.

    Must be called inside an active ``gr.Blocks`` context.  Returns the seven
    components in the order the inference callbacks expect them:
    ``(fy, vfov, conf, iou, imgsz, aspect_threshold, mask_alpha)``.
    """
    with gr.Accordion("📷 Camera & Model Settings", open=True):
        fy = gr.Number(label="Focal length fy (px) — 0 = auto from VFOV",
                       value=0, minimum=0, maximum=5000, step=10,
                       info="Camera focal length in pixels. 0 = auto-estimated.")
        vfov = gr.Slider(30, 120, value=60, step=1,
                         label="Vertical FOV (°) — used when fy = 0")
        conf = gr.Slider(0.20, 0.90, value=0.40, step=0.05,
                         label="YOLO confidence threshold")
        iou = gr.Slider(0.20, 0.80, value=0.45, step=0.05,
                        label="YOLO NMS IoU threshold")
        imgsz = gr.Dropdown([160, 224, 320, 416, 512], value=320,
                            label="YOLO inference size (px) — lower = faster")
        asp = gr.Slider(0.8, 3.0, value=1.2, step=0.1,
                        label="Aspect threshold (H/W upright vs on-side)")
        alpha = gr.Slider(0.10, 0.70, value=0.35, step=0.05,
                          label="Mask overlay alpha")
    return fy, vfov, conf, iou, imgsz, asp, alpha
# Top-level UI: one tab for single images and one for videos.  Each tab gets
# its own set of controls from _sidebar() and wires its button to the
# matching inference callback.
with gr.Blocks(
    title="Cylinder Volume Estimator",
    theme=gr.themes.Base(
        primary_hue="cyan",
        secondary_hue="slate",
        font=[gr.themes.GoogleFont("IBM Plex Mono"), "monospace"],
    ),
) as demo:
    gr.Markdown(DESCRIPTION)
    with gr.Tabs():
        # ── Image tab ─────────────────────────────────────────────────────────
        with gr.Tab("🖼️ Image"):
            with gr.Row():
                with gr.Column(scale=1):
                    img_in = gr.Image(label="Input image", type="numpy",
                                      sources=["upload", "webcam"])
                    # Camera / model controls
                    i_fy, i_vfov, i_conf, i_iou, i_imgsz, i_asp, i_alpha = _sidebar()
                    img_btn = gr.Button("▶ Estimate", variant="primary", size="lg")
                with gr.Column(scale=1):
                    img_out = gr.Image(label="Annotated output", type="numpy")
                    img_tbl = gr.DataFrame(
                        label="Measurements",
                        headers=["class", "id", "conf", "orientation",
                                 "H", "D", "Volume", "Z (m)"],
                        wrap=True,
                    )
            img_btn.click(
                fn=infer_image,
                inputs=[img_in, i_fy, i_vfov, i_conf, i_iou, i_imgsz, i_asp, i_alpha],
                outputs=[img_out, img_tbl],
                api_name="infer_image",
            )
        # ── Video tab ─────────────────────────────────────────────────────────
        with gr.Tab("🎬 Video"):
            with gr.Row():
                with gr.Column(scale=1):
                    vid_in = gr.Video(label="Input video")
                    v_fy, v_vfov, v_conf, v_iou, v_imgsz, v_asp, v_alpha = _sidebar()
                    v_stride = gr.Slider(1, 10, value=4, step=1,
                                         label="Process every N frames (stride)")
                    vid_btn = gr.Button("▶ Process video", variant="primary", size="lg")
                    gr.Markdown(
                        "> ⚠️ Free-tier Spaces run on CPU — keep videos short (<30 s) "
                        "or increase stride to reduce latency."
                    )
                with gr.Column(scale=1):
                    vid_out = gr.Video(label="Annotated video")
                    vid_tbl = gr.DataFrame(
                        label="Measurements (all frames)",
                        headers=["frame", "class", "id", "conf", "orientation",
                                 "H", "D", "Volume", "Z (m)"],
                        wrap=True,
                    )
            vid_btn.click(
                fn=infer_video,
                inputs=[vid_in, v_fy, v_vfov, v_conf, v_iou, v_imgsz,
                        v_asp, v_alpha, v_stride],
                outputs=[vid_out, vid_tbl],
                api_name="infer_video",
            )
    # Blank lines around the table and between the two notes keep each one a
    # separate Markdown block.
    gr.Markdown("""
---
**Formula reference**

| Symbol | Meaning |
|--------|---------|
| `fy` | Vertical focal length in pixels |
| `Z` | Metric depth from Depth-Anything-V2 (metres) |
| `H_px` | Pixel height of the detected cylinder |
| `D_px` | Pixel diameter (minor axis of fitted ellipse) |
| `H` | Real height = `(H_px / fy) × Z` |
| `D` | Real diameter = `(D_px / fy) × Z` |
| `V` | Volume = `π × (D/2)² × H` |

*Upright* : bbox H/W ≥ aspect threshold → long axis = cylinder height

*On-side* : bbox H/W < aspect threshold → long axis = cylinder length
""")
# Script entry point: start the Gradio server when the file is run directly.
if __name__ == "__main__":
    launch_options = {
        "server_name": "0.0.0.0",  # required for HF Spaces
        "server_port": 7860,
        "show_error": True,
    }
    demo.launch(**launch_options)