# YOLO-3D / app.py
# Max CPU realtime: optical-flow live preview and aggressive detector throttling
# (author: unknown; commit 9a33183)
#!/usr/bin/env python3
import os
import threading
import time
from collections import deque
import cv2
import gradio as gr
import numpy as np
import torch
from bbox3d_utils import BBox3DEstimator, BirdEyeView
from depth_model import DepthEstimator
from detection_model import ObjectDetector
# Human-readable mode labels shown in the Gradio mode selector.
DEPTH_MODE = "Depth V2 Realtime (CPU)"
DEPTH_BALANCED_MODE = "Depth V2 Balanced (CPU)"
DEPTH_QUALITY_MODE = "Depth V2 Quality (CPU)"
GEOMETRY_MODE = "Geometry Depth Realtime (CPU)"
FAST_MODE = "Fast Detect (CPU)"
ULTRA_FAST_MODE = "Ultra Fast Detect (CPU)"
# Order here is the order the radio control presents to the user.
MODE_OPTIONS = [
    DEPTH_MODE,
    DEPTH_BALANCED_MODE,
    DEPTH_QUALITY_MODE,
    GEOMETRY_MODE,
    FAST_MODE,
    ULTRA_FAST_MODE,
]
# Per-mode tuning profile consumed by RealtimeEngine:
#   depth_source   - "depth_v2" (async NN depth), "geometry" (formula-only),
#                    or "none" (2D boxes only)
#   max_side       - longest side (px) frames are downscaled to for inference
#   depth_side     - longest side (px) fed to the depth network (0 = unused)
#   depth_interval - refresh async depth every N processed frames (0 = never)
#   detect_every   - run the detector every N incoming frames (others reuse
#                    cached detections)
#   conf_floor     - minimum confidence enforced when "Auto Optimize" is on
#   allow_tracking - whether the UI tracking toggle is honored in this mode
#   allow_bev      - whether the bird-eye-view overlay is honored
#   max_det        - detector max detections per frame
#   hud            - short label drawn in the on-frame HUD
MODE_PROFILES = {
    DEPTH_MODE: {
        "depth_source": "depth_v2",
        "max_side": 640,
        "depth_side": 384,
        "depth_interval": 3,
        "detect_every": 2,
        "conf_floor": 0.22,
        "allow_tracking": False,
        "allow_bev": True,
        "max_det": 120,
        "hud": "Depth Realtime",
    },
    DEPTH_BALANCED_MODE: {
        "depth_source": "depth_v2",
        "max_side": 576,
        "depth_side": 320,
        "depth_interval": 4,
        "detect_every": 3,
        "conf_floor": 0.25,
        "allow_tracking": False,
        "allow_bev": True,
        "max_det": 100,
        "hud": "Depth Balanced",
    },
    DEPTH_QUALITY_MODE: {
        "depth_source": "depth_v2",
        "max_side": 768,
        "depth_side": 512,
        "depth_interval": 1,
        "detect_every": 1,
        "conf_floor": 0.20,
        "allow_tracking": False,
        "allow_bev": True,
        "max_det": 150,
        "hud": "Depth Quality",
    },
    GEOMETRY_MODE: {
        "depth_source": "geometry",
        "max_side": 640,
        "depth_side": 0,
        "depth_interval": 0,
        "detect_every": 3,
        "conf_floor": 0.25,
        "allow_tracking": False,
        "allow_bev": True,
        "max_det": 120,
        "hud": "Geometry Depth",
    },
    FAST_MODE: {
        "depth_source": "none",
        "max_side": 640,
        "depth_side": 0,
        "depth_interval": 0,
        "detect_every": 4,
        "conf_floor": 0.30,
        "allow_tracking": False,
        "allow_bev": False,
        "max_det": 100,
        "hud": "Fast Detect",
    },
    ULTRA_FAST_MODE: {
        "depth_source": "none",
        "max_side": 256,
        "depth_side": 0,
        "depth_interval": 0,
        "detect_every": 6,
        "conf_floor": 0.40,
        "allow_tracking": False,
        "allow_bev": False,
        "max_det": 35,
        "hud": "Ultra Fast",
    },
}
def _configure_cpu_runtime():
cpu_count = max(1, os.cpu_count() or 1)
thread_count = min(4, cpu_count)
os.environ.setdefault("OMP_NUM_THREADS", str(thread_count))
os.environ.setdefault("MKL_NUM_THREADS", str(thread_count))
torch.set_num_threads(thread_count)
if hasattr(torch, "set_num_interop_threads"):
torch.set_num_interop_threads(max(1, thread_count // 2))
class GeometryDepthEstimator:
    """Monocular distance estimation from 2D boxes alone (no neural depth).

    Blends a pinhole size prior (assumed real-world class heights) with a
    ground-plane cue, applies aspect-ratio and horizontal-offset heuristics,
    and temporally smooths the result per tracked object ID.
    """

    # Cap on per-object smoothing history; long sessions churn through many
    # tracker IDs and the dict previously grew without bound (memory leak).
    _HISTORY_LIMIT = 512

    def __init__(self):
        # Typical real-world object heights in metres for the size prior.
        self.class_heights_m = {
            "person": 1.70,
            "car": 1.52,
            "truck": 3.00,
            "bus": 3.00,
            "motorcycle": 1.40,
            "bicycle": 1.40,
            "dog": 0.60,
            "cat": 0.35,
            "potted plant": 0.60,
            "plant": 0.60,
            "chair": 0.90,
            "bottle": 0.25,
        }
        # object_id -> last smoothed distance (metres).
        self.depth_history = {}
        # EMA weight on the previous distance (higher = smoother but laggier).
        self.depth_smoothing = 0.65

    def _class_height(self, class_name):
        """Return the assumed height (m) for class_name; 1.60 m if unknown."""
        key = class_name.lower()
        if key in self.class_heights_m:
            return self.class_heights_m[key]
        # Substring fallback so e.g. "small dog" still matches "dog".
        for name, height in self.class_heights_m.items():
            if name in key:
                return height
        return 1.60

    def estimate(self, bbox, class_name, frame_shape, object_id=None):
        """Estimate distance for one detection.

        Args:
            bbox: (x1, y1, x2, y2) in pixels.
            class_name: detector class label (used for the height prior).
            frame_shape: frame (h, w, ...) tuple.
            object_id: optional tracker ID; enables temporal smoothing.

        Returns:
            (depth_norm, distance_m): normalized depth in [0, 1] and the
            distance in metres clipped to [0.8, 25.0].
        """
        h, w = frame_shape[:2]
        x1, y1, x2, y2 = [float(v) for v in bbox]
        bbox_h = max(2.0, y2 - y1)
        bbox_w = max(2.0, x2 - x1)
        bottom_y = min(float(h - 1), max(0.0, y2))
        center_x = (x1 + x2) * 0.5
        # Rough pinhole intrinsics: focal ~0.95*w, horizon at mid-height.
        fx = 0.95 * float(w)
        cy = 0.50 * float(h)
        camera_height_m = 1.55
        object_height_m = self._class_height(class_name)
        # Size-based distance: d = f * H / h
        dist_size = (fx * object_height_m) / bbox_h
        # Ground-plane cue from vertical position relative to horizon.
        pixel_to_ground = max(2.0, bottom_y - cy)
        dist_ground = (camera_height_m * fx) / pixel_to_ground
        # Box aspect ratio prior: very wide boxes are often closer.
        aspect = bbox_w / bbox_h
        aspect_factor = 1.0 - min(0.20, max(0.0, aspect - 0.6) * 0.08)
        # Horizontal offset cue: objects at far sides are usually slightly farther.
        rel_x = abs((center_x / max(1.0, float(w))) - 0.5) * 2.0
        side_factor = 1.0 + 0.10 * rel_x
        distance_m = (0.72 * dist_size + 0.28 * dist_ground) * aspect_factor * side_factor
        distance_m = float(np.clip(distance_m, 0.8, 25.0))
        if object_id is not None:
            key = int(object_id)
            prev = self.depth_history.get(key)
            if prev is not None:
                distance_m = self.depth_smoothing * prev + (1.0 - self.depth_smoothing) * distance_m
            elif len(self.depth_history) >= self._HISTORY_LIMIT:
                # Fix: evict the oldest entry (dicts preserve insertion order)
                # so the history cannot grow without bound across IDs.
                self.depth_history.pop(next(iter(self.depth_history)))
            self.depth_history[key] = distance_m
        depth_norm = float(np.clip((distance_m - 1.0) / 9.0, 0.0, 1.0))
        return depth_norm, distance_m
class RealtimeEngine:
    """Core per-frame pipeline: detection, optional depth, 3D boxes, HUD.

    Threading model (order and lock scopes are load-bearing):
      - ``self.lock`` serializes ``process()`` (full pipeline, one frame at a
        time).
      - ``self.depth_lock`` guards the async depth-job state
        (``cached_depth_map`` / ``depth_job_running`` / ``depth_last_update``).
      - ``self.live_lock`` guards the optical-flow live-preview state
        (``live_boxes`` / ``live_prev_gray`` / ``live_prev_pts``), which is
        written both by ``process()`` and by ``render_live_preview()``.
    Depth V2 inference runs on a background thread; the latest finished depth
    map is reused until a new one arrives, so the stream never blocks on it.
    """

    def __init__(self):
        _configure_cpu_runtime()
        self.lock = threading.Lock()
        self.depth_lock = threading.Lock()
        # Heavy models are created lazily on first use (see _ensure_*).
        self.detector = None
        self.depth_estimator = None
        self.geometry_depth = GeometryDepthEstimator()
        self.bbox3d_estimator = BBox3DEstimator()
        self.bev = BirdEyeView(scale=55, size=(260, 260))
        # Rolling latency window for the HUD FPS estimate.
        self.latency_ms = deque(maxlen=30)
        self.depth_input_side = 384
        self.frame_idx = 0
        self.input_idx = 0
        # Async depth-job state (guarded by depth_lock).
        self.cached_depth_map = None
        self.depth_job_running = False
        self.depth_last_update = 0.0
        # Last detector output, reused on throttled frames.
        self.last_detections = []
        self.last_class_names = {}
        self.last_detect_mode = None
        self.last_detect_shape = None
        # Live-preview state (guarded by live_lock).
        self.live_lock = threading.Lock()
        self.live_boxes = []
        self.live_prev_gray = None
        self.live_prev_pts = None
        self.live_shape = None
        self.live_hud = "Live"

    @staticmethod
    def _profile(mode):
        """Return the tuning profile for mode, defaulting to DEPTH_MODE."""
        return MODE_PROFILES.get(mode, MODE_PROFILES[DEPTH_MODE])

    @staticmethod
    def _class_name(class_names, class_id):
        """Resolve class_id via a dict or list mapping; fall back to str(id)."""
        if isinstance(class_names, dict):
            return class_names.get(class_id, str(class_id))
        if isinstance(class_names, (list, tuple)) and 0 <= class_id < len(class_names):
            return class_names[class_id]
        return str(class_id)

    @staticmethod
    def _color_for_class(class_name):
        """BGR drawing color for a class; white for anything unrecognized."""
        name = class_name.lower()
        if "car" in name or "vehicle" in name:
            return (0, 0, 255)
        if "truck" in name or "bus" in name:
            return (0, 165, 255)
        if "person" in name:
            return (0, 255, 0)
        if "bicycle" in name or "motorcycle" in name:
            return (255, 0, 0)
        if "potted plant" in name or "plant" in name:
            return (0, 255, 255)
        return (255, 255, 255)

    @staticmethod
    def _resize_for_inference(frame, max_side):
        """Downscale so the longest side is <= max_side; never upscale."""
        h, w = frame.shape[:2]
        longest = max(h, w)
        if longest <= max_side:
            return frame
        scale = max_side / float(longest)
        # Floor of 32 px keeps the frame usable by downstream models.
        new_w = max(32, int(w * scale))
        new_h = max(32, int(h * scale))
        return cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA)

    @staticmethod
    def _overlay_corner(base, overlay, size_ratio=0.26, anchor="tl"):
        """Paste a resized overlay into a corner of base (mutates base).

        anchor is one of "tl", "tr", "bl", "br"; any other value means
        top-left. The overlay keeps its aspect ratio, is capped at half of
        base in each dimension, and gets a 1 px white border.
        """
        h, w = base.shape[:2]
        target_h = max(64, int(h * size_ratio))
        target_w = int((overlay.shape[1] / max(1, overlay.shape[0])) * target_h)
        target_w = max(64, min(target_w, w // 2))
        target_h = min(target_h, h // 2)
        resized = cv2.resize(overlay, (target_w, target_h), interpolation=cv2.INTER_AREA)
        if anchor == "tr":
            x0, y0 = w - target_w, 0
        elif anchor == "bl":
            x0, y0 = 0, h - target_h
        elif anchor == "br":
            x0, y0 = w - target_w, h - target_h
        else:
            x0, y0 = 0, 0
        base[y0 : y0 + target_h, x0 : x0 + target_w] = resized
        cv2.rectangle(base, (x0, y0), (x0 + target_w, y0 + target_h), (255, 255, 255), 1)

    def _draw_hud(self, frame, mode_name, extra=""):
        """Draw the mode/FPS/latency banner in the frame's top-left corner."""
        mean_latency = float(np.mean(self.latency_ms)) if self.latency_ms else 0.0
        fps = (1000.0 / mean_latency) if mean_latency > 0 else 0.0
        text = f"{mode_name} | CPU | FPS {fps:.1f} | Latency {mean_latency:.1f} ms"
        if extra:
            text = f"{text} | {extra}"
        cv2.putText(frame, text, (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.60, (0, 0, 255), 2)

    def _should_run_detector(self, mode, profile, frame_shape):
        """Decide whether to run the detector this frame or reuse the cache.

        Always runs when throttling is off, the cache is empty, or the mode
        or frame size changed; otherwise runs every ``detect_every`` frames.
        """
        detect_every = int(profile.get("detect_every", 1))
        if detect_every <= 1:
            return True
        if not self.last_detections:
            return True
        if self.last_detect_mode != mode:
            return True
        if self.last_detect_shape != frame_shape[:2]:
            return True
        return (self.input_idx % detect_every) == 0

    def _set_live_state_from_detections(self, frame_bgr, detections, class_names, hud_name):
        """Publish fresh detections as the optical-flow tracking baseline.

        Stores clamped boxes, their center points (as LK seed points), and
        the grayscale frame under live_lock for render_live_preview().
        """
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
        boxes = []
        pts = []
        h, w = frame_bgr.shape[:2]
        for bbox, score, class_id, obj_id in detections:
            x1, y1, x2, y2 = [float(v) for v in bbox]
            x1 = float(np.clip(x1, 0, w - 1))
            x2 = float(np.clip(x2, 0, w - 1))
            y1 = float(np.clip(y1, 0, h - 1))
            y2 = float(np.clip(y2, 0, h - 1))
            if x2 <= x1 or y2 <= y1:
                continue
            boxes.append(
                {
                    "bbox": [x1, y1, x2, y2],
                    "score": float(score),
                    "class_name": self._class_name(class_names, class_id),
                    "object_id": obj_id,
                }
            )
            # One LK tracking point per box: its center.
            pts.append([(x1 + x2) * 0.5, (y1 + y2) * 0.5])
        pts_arr = None
        if pts:
            pts_arr = np.array(pts, dtype=np.float32).reshape(-1, 1, 2)
        with self.live_lock:
            self.live_boxes = boxes
            self.live_prev_gray = gray
            self.live_prev_pts = pts_arr
            self.live_shape = frame_bgr.shape[:2]
            self.live_hud = hud_name

    def render_live_preview(self, frame_rgb, mode, auto_optimize, max_side):
        """Cheap per-frame preview: shift cached boxes by LK optical flow.

        Runs between full detector passes. Returns (rgb_frame, has_boxes);
        has_boxes tells the caller whether the preview shows anything live.
        """
        profile = self._profile(mode)
        if auto_optimize:
            effective_max_side = int(profile["max_side"])
            hud_name = profile["hud"]
        else:
            effective_max_side = int(max_side)
            hud_name = profile["hud"]
        orig_h, orig_w = frame_rgb.shape[:2]
        frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
        frame_bgr = self._resize_for_inference(frame_bgr, effective_max_side)
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
        # Snapshot shared state under the lock; work on copies outside it.
        with self.live_lock:
            boxes = [dict(item) for item in self.live_boxes]
            prev_gray = None if self.live_prev_gray is None else self.live_prev_gray.copy()
            prev_pts = None if self.live_prev_pts is None else self.live_prev_pts.copy()
            live_shape = self.live_shape
            cached_hud = self.live_hud
        # Only track when the cached state matches this frame's resolution.
        if boxes and prev_gray is not None and prev_pts is not None and live_shape == frame_bgr.shape[:2]:
            try:
                next_pts, status, _ = cv2.calcOpticalFlowPyrLK(
                    prev_gray,
                    gray,
                    prev_pts,
                    None,
                    winSize=(17, 17),
                    maxLevel=2,
                    criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 20, 0.03),
                )
                if next_pts is not None and status is not None:
                    h, w = frame_bgr.shape[:2]
                    for i, box in enumerate(boxes):
                        if i >= len(next_pts) or i >= len(prev_pts):
                            continue
                        if int(status[i][0]) == 0:
                            continue
                        # Translate the whole box by its center-point flow.
                        dx = float(next_pts[i][0][0] - prev_pts[i][0][0])
                        dy = float(next_pts[i][0][1] - prev_pts[i][0][1])
                        x1, y1, x2, y2 = box["bbox"]
                        x1 = float(np.clip(x1 + dx, 0, w - 1))
                        x2 = float(np.clip(x2 + dx, 0, w - 1))
                        y1 = float(np.clip(y1 + dy, 0, h - 1))
                        y2 = float(np.clip(y2 + dy, 0, h - 1))
                        if x2 > x1 and y2 > y1:
                            box["bbox"] = [x1, y1, x2, y2]
                    prev_pts = next_pts
            except Exception:
                # Best-effort preview: on any LK failure keep the old boxes.
                pass
        out = frame_bgr.copy()
        for box in boxes:
            x1, y1, x2, y2 = [int(v) for v in box["bbox"]]
            class_name = box["class_name"]
            score = box["score"]
            obj_id = box["object_id"]
            color = self._color_for_class(class_name)
            cv2.rectangle(out, (x1, y1), (x2, y2), color, 2)
            if obj_id is not None:
                label = f"{class_name} #{obj_id} {score:.2f}"
            else:
                label = f"{class_name} {score:.2f}"
            cv2.putText(out, label, (x1, max(18, y1 - 8)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        depth_source = profile.get("depth_source", "none")
        if depth_source == "depth_v2" and self.depth_estimator is not None:
            # Reuse the async depth map if its resolution matches this frame.
            depth_map, depth_busy, _ = self._get_depth_snapshot()
            if depth_map is not None and depth_map.shape[:2] == out.shape[:2]:
                depth_colored = self.depth_estimator.colorize_depth(depth_map)
                self._overlay_corner(out, depth_colored, size_ratio=0.24, anchor="tl")
                extra = "live tracker | depth async" if depth_busy else "live tracker | depth ready"
            else:
                extra = "live tracker | depth warming"
        elif depth_source == "geometry":
            # Cheap pseudo depth map: flat patches at the tracked boxes.
            pseudo_depth = np.zeros(out.shape[:2], dtype=np.float32)
            for box in boxes:
                x1, y1, x2, y2 = [int(v) for v in box["bbox"]]
                x1 = max(0, min(out.shape[1] - 1, x1))
                x2 = max(0, min(out.shape[1], x2))
                y1 = max(0, min(out.shape[0] - 1, y1))
                y2 = max(0, min(out.shape[0], y2))
                if x2 > x1 and y2 > y1:
                    pseudo_depth[y1:y2, x1:x2] = max(0.25, pseudo_depth[y1:y2, x1:x2].max())
            pseudo_depth = cv2.GaussianBlur(pseudo_depth, (17, 17), 0)
            depth_colored = cv2.applyColorMap((pseudo_depth * 255).astype(np.uint8), cv2.COLORMAP_INFERNO)
            self._overlay_corner(out, depth_colored, size_ratio=0.24, anchor="tl")
            extra = "live tracker | formula depth"
        else:
            extra = "live tracker"
        self._draw_hud(out, cached_hud or hud_name, extra=extra)
        # Publish this frame as the new flow baseline for the next call.
        with self.live_lock:
            self.live_boxes = boxes
            self.live_prev_gray = gray
            self.live_prev_pts = None if not boxes else prev_pts
            self.live_shape = frame_bgr.shape[:2]
            self.live_hud = hud_name
        if out.shape[:2] != (orig_h, orig_w):
            out = cv2.resize(out, (orig_w, orig_h), interpolation=cv2.INTER_LINEAR)
        return cv2.cvtColor(out, cv2.COLOR_BGR2RGB), bool(boxes)

    def _ensure_detector(self):
        """Lazily build the YOLO detector on first use (CPU, nano model)."""
        if self.detector is None:
            self.detector = ObjectDetector(
                model_size="nano",
                conf_thres=0.25,
                iou_thres=0.45,
                classes=None,
                device="cpu",
            )
            self.detector.model.overrides["max_det"] = 120

    def _ensure_depth(self):
        """Lazily build the Depth V2 estimator on first use (CPU, small)."""
        if self.depth_estimator is None:
            self.depth_estimator = DepthEstimator(model_size="small", device="cpu")

    def _get_depth_snapshot(self):
        """Return (depth_map copy or None, job_running, last_update_ts)."""
        with self.depth_lock:
            depth_map = None if self.cached_depth_map is None else self.cached_depth_map.copy()
            busy = self.depth_job_running
            updated = self.depth_last_update
        return depth_map, busy, updated

    def _start_depth_job(self, frame_bgr):
        """Kick off one async depth inference; no-op if a job is running."""
        with self.depth_lock:
            if self.depth_job_running:
                return
            self.depth_job_running = True
        job_frame = frame_bgr.copy()
        target_h, target_w = job_frame.shape[:2]

        def worker():
            # Runs on a daemon thread; finally always clears the busy flag
            # (an exception here kills only this thread, not the app).
            try:
                self._ensure_depth()
                depth_input = self._resize_for_inference(job_frame, self.depth_input_side)
                depth_map = self.depth_estimator.estimate_depth(depth_input)
                # Upsample back so the map aligns pixel-for-pixel with the frame.
                if depth_map.shape[:2] != (target_h, target_w):
                    depth_map = cv2.resize(depth_map, (target_w, target_h), interpolation=cv2.INTER_LINEAR)
                with self.depth_lock:
                    self.cached_depth_map = depth_map
                    self.depth_last_update = time.time()
            finally:
                with self.depth_lock:
                    self.depth_job_running = False

        threading.Thread(target=worker, daemon=True).start()

    def _build_sparse_depth_map(self, shape, boxes_3d):
        """Rasterize per-box depth values into a blurred pseudo depth map."""
        h, w = shape[:2]
        depth_map = np.zeros((h, w), dtype=np.float32)
        for box in boxes_3d:
            x1, y1, x2, y2 = [int(v) for v in box["bbox_2d"]]
            x1 = max(0, min(w - 1, x1))
            x2 = max(0, min(w, x2))
            y1 = max(0, min(h - 1, y1))
            y2 = max(0, min(h, y2))
            if x2 <= x1 or y2 <= y1:
                continue
            # Overlapping boxes keep the larger (nearer-looking) value.
            depth_map[y1:y2, x1:x2] = max(float(box["depth_value"]), float(depth_map[y1:y2, x1:x2].max()))
        depth_map = cv2.GaussianBlur(depth_map, (21, 21), 0)
        return depth_map

    def _build_boxes_3d(self, detections, class_names, frame_shape, depth_map=None, method_prefix="geometry"):
        """Attach a depth estimate to each detection.

        With a depth_map, samples the map (center point for person/cat/dog,
        region median otherwise); without one, falls back to the geometric
        formula. Also prunes stale 3D trackers for IDs no longer present.
        Returns a list of dicts consumed by draw_box_3d / BEV.
        """
        boxes_3d = []
        active_ids = []
        for detection in detections:
            bbox, score, class_id, obj_id = detection
            class_name = self._class_name(class_names, class_id)
            if depth_map is not None:
                if class_name.lower() in ["person", "cat", "dog"]:
                    center_x = int((bbox[0] + bbox[2]) / 2.0)
                    center_y = int((bbox[1] + bbox[3]) / 2.0)
                    depth_value = self.depth_estimator.get_depth_at_point(depth_map, center_x, center_y)
                    depth_method = f"{method_prefix}-center"
                else:
                    depth_value = self.depth_estimator.get_depth_in_region(depth_map, bbox, method="median")
                    depth_method = f"{method_prefix}-median"
                # Map normalized depth onto 1-10 m (assumes depth_value in
                # [0, 1] from the estimator -- TODO confirm against DepthEstimator).
                distance_m = 1.0 + float(depth_value) * 9.0
            else:
                depth_value, distance_m = self.geometry_depth.estimate(
                    bbox=bbox,
                    class_name=class_name,
                    frame_shape=frame_shape,
                    object_id=obj_id,
                )
                depth_method = f"{method_prefix}-formula"
            boxes_3d.append(
                {
                    "bbox_2d": bbox,
                    "depth_value": float(np.clip(depth_value, 0.0, 1.0)),
                    "depth_method": depth_method,
                    "distance_m": float(distance_m),
                    "class_name": class_name,
                    "object_id": obj_id,
                    "score": score,
                }
            )
            if obj_id is not None:
                active_ids.append(obj_id)
        self.bbox3d_estimator.cleanup_trackers(active_ids)
        return boxes_3d

    def _draw_boxes(self, frame_bgr, boxes_3d):
        """Render all 3D boxes onto a copy of the frame and return it."""
        out = frame_bgr.copy()
        for box_3d in boxes_3d:
            color = self._color_for_class(box_3d["class_name"])
            out = self.bbox3d_estimator.draw_box_3d(out, box_3d, color=color)
        return out

    def _render_depth_mode(self, frame_bgr, detections, class_names, enable_bev, depth_interval, hud_name):
        """Render a frame using the async Depth V2 map (geometry fallback).

        Schedules a depth refresh every depth_interval frames; while the
        first map is still computing, geometry depth fills in.
        """
        self.frame_idx += 1
        if self.frame_idx % max(1, depth_interval) == 0:
            self._start_depth_job(frame_bgr)
        depth_map, depth_busy, depth_updated = self._get_depth_snapshot()
        if depth_map is None:
            # Cold start: make sure a job is in flight even off-interval.
            self._start_depth_job(frame_bgr)
        if depth_map is not None:
            boxes_3d = self._build_boxes_3d(
                detections=detections,
                class_names=class_names,
                frame_shape=frame_bgr.shape,
                depth_map=depth_map,
                method_prefix="depthv2",
            )
            depth_colored = self.depth_estimator.colorize_depth(depth_map)
            extra = f"depth@{int(depth_updated)}"
        else:
            boxes_3d = self._build_boxes_3d(
                detections=detections,
                class_names=class_names,
                frame_shape=frame_bgr.shape,
                depth_map=None,
                method_prefix="geo-fallback",
            )
            depth_map = self._build_sparse_depth_map(frame_bgr.shape, boxes_3d)
            depth_colored = cv2.applyColorMap((depth_map * 255).astype(np.uint8), cv2.COLORMAP_INFERNO)
            cv2.putText(depth_colored, "Depth warming up...", (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            extra = "depth async loading"
        result_frame = self._draw_boxes(frame_bgr, boxes_3d)
        if enable_bev:
            self.bev.reset()
            for box in boxes_3d:
                self.bev.draw_box(box)
            self._overlay_corner(result_frame, self.bev.get_image(), size_ratio=0.30, anchor="bl")
        self._overlay_corner(result_frame, depth_colored, size_ratio=0.24, anchor="tl")
        self._draw_hud(result_frame, hud_name, extra=("depth busy" if depth_busy else extra))
        return result_frame

    def _render_geometry_mode(self, frame_bgr, detections, class_names, enable_bev, hud_name):
        """Render a frame using formula-only depth (no neural network)."""
        boxes_3d = self._build_boxes_3d(
            detections=detections,
            class_names=class_names,
            frame_shape=frame_bgr.shape,
            depth_map=None,
            method_prefix="geometry",
        )
        result_frame = self._draw_boxes(frame_bgr, boxes_3d)
        pseudo_depth = self._build_sparse_depth_map(frame_bgr.shape, boxes_3d)
        depth_colored = cv2.applyColorMap((pseudo_depth * 255).astype(np.uint8), cv2.COLORMAP_INFERNO)
        self._overlay_corner(result_frame, depth_colored, size_ratio=0.24, anchor="tl")
        if enable_bev:
            self.bev.reset()
            for box in boxes_3d:
                self.bev.draw_box(box)
            self._overlay_corner(result_frame, self.bev.get_image(), size_ratio=0.30, anchor="bl")
        self._draw_hud(result_frame, hud_name, extra="formula depth")
        return result_frame

    def _render_fast_mode(self, frame_bgr, hud_name, detections, class_names):
        """Render plain 2D boxes and labels only (no depth, no BEV)."""
        out = frame_bgr.copy()
        for bbox, score, class_id, obj_id in detections:
            x1, y1, x2, y2 = [int(v) for v in bbox]
            class_name = self._class_name(class_names, class_id)
            color = self._color_for_class(class_name)
            cv2.rectangle(out, (x1, y1), (x2, y2), color, 2)
            if obj_id is not None:
                label = f"{class_name} #{obj_id} {score:.2f}"
            else:
                label = f"{class_name} {score:.2f}"
            cv2.putText(out, label, (x1, max(18, y1 - 8)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        self._draw_hud(out, hud_name)
        return out

    def process(
        self,
        frame_rgb,
        mode,
        conf_threshold,
        iou_threshold,
        enable_tracking,
        enable_bev,
        auto_optimize,
        max_side,
        depth_interval,
    ):
        """Run the full pipeline on one RGB frame and return the RGB result.

        Serialized by self.lock. Applies the mode profile (or the manual
        sliders when auto_optimize is off), throttles the detector per
        _should_run_detector, dispatches to the mode-specific renderer, and
        records latency for the HUD. Returns None for a None frame.
        """
        if frame_rgb is None:
            return None
        with self.lock:
            start = time.perf_counter()
            profile = self._profile(mode)
            self.input_idx += 1
            self._ensure_detector()
            effective_conf = float(conf_threshold)
            if auto_optimize:
                # Profile enforces a confidence floor; the slider can only raise it.
                effective_conf = max(effective_conf, float(profile.get("conf_floor", effective_conf)))
            self.detector.model.overrides["conf"] = effective_conf
            self.detector.model.overrides["iou"] = float(iou_threshold)
            self.detector.model.overrides["max_det"] = int(profile["max_det"])
            if auto_optimize:
                effective_max_side = int(profile["max_side"])
                effective_depth_interval = int(profile["depth_interval"])
                if profile["depth_side"] > 0:
                    self.depth_input_side = int(profile["depth_side"])
            else:
                effective_max_side = int(max_side)
                effective_depth_interval = max(1, int(depth_interval))
            # Both the toggle and the profile gate must allow the feature.
            effective_tracking = bool(enable_tracking and profile["allow_tracking"])
            effective_bev = bool(enable_bev and profile["allow_bev"])
            orig_h, orig_w = frame_rgb.shape[:2]
            frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
            frame_bgr = self._resize_for_inference(frame_bgr, effective_max_side)
            self.detector.model.overrides["imgsz"] = int(max(frame_bgr.shape[:2]))
            run_detector = self._should_run_detector(mode, profile, frame_bgr.shape)
            if run_detector:
                _, detections = self.detector.detect(frame_bgr, track=effective_tracking)
                class_names = self.detector.get_class_names()
                # Cache for throttled frames and seed the live preview tracker.
                self.last_detections = detections
                self.last_class_names = class_names
                self.last_detect_mode = mode
                self.last_detect_shape = frame_bgr.shape[:2]
                self._set_live_state_from_detections(frame_bgr, detections, class_names, profile["hud"])
            else:
                detections = self.last_detections
                class_names = self.last_class_names
            depth_source = profile["depth_source"]
            if depth_source == "depth_v2":
                out_bgr = self._render_depth_mode(
                    frame_bgr=frame_bgr,
                    detections=detections,
                    class_names=class_names,
                    enable_bev=effective_bev,
                    depth_interval=effective_depth_interval,
                    hud_name=profile["hud"],
                )
            elif depth_source == "geometry":
                out_bgr = self._render_geometry_mode(
                    frame_bgr=frame_bgr,
                    detections=detections,
                    class_names=class_names,
                    enable_bev=effective_bev,
                    hud_name=profile["hud"],
                )
            else:
                out_bgr = self._render_fast_mode(
                    frame_bgr=frame_bgr,
                    hud_name=profile["hud"],
                    detections=detections,
                    class_names=class_names,
                )
            if out_bgr.shape[:2] != (orig_h, orig_w):
                out_bgr = cv2.resize(out_bgr, (orig_w, orig_h), interpolation=cv2.INTER_LINEAR)
            elapsed_ms = (time.perf_counter() - start) * 1000.0
            self.latency_ms.append(elapsed_ms)
            return cv2.cvtColor(out_bgr, cv2.COLOR_BGR2RGB)
# Single shared engine instance; its internal lock serializes processing.
engine = RealtimeEngine()
class AsyncStreamRunner:
    """Decouple the Gradio stream from the heavy pipeline.

    ``step()`` submits the frame to a background worker and returns
    immediately with either the optical-flow live preview or the worker's
    latest finished output. Only the newest frame is kept pending, so the
    worker never falls behind the stream.
    """

    def __init__(self, realtime_engine):
        self.engine = realtime_engine
        self.lock = threading.Lock()
        self.pending_job = None  # newest (frame, kwargs) awaiting processing
        self.latest_output = None  # last fully rendered frame from the worker
        # Fix: the worker previously busy-polled with time.sleep(0.004);
        # an Event lets it block until work actually arrives.
        self.job_ready = threading.Event()
        self.worker = threading.Thread(target=self._loop, daemon=True)
        self.worker.start()

    def _loop(self):
        """Worker loop: process the newest pending frame, cache the result."""
        while True:
            self.job_ready.wait()
            with self.lock:
                job = self.pending_job
                self.pending_job = None
                self.job_ready.clear()
            if job is None:
                continue
            frame, kwargs = job
            try:
                output = self.engine.process(frame_rgb=frame, **kwargs)
            except Exception as exc:
                # Never let the worker die: render the error into a frame.
                output = np.zeros((360, 640, 3), dtype=np.uint8)
                cv2.putText(output, "Runtime error", (20, 70), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
                cv2.putText(output, str(exc)[:90], (20, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
            with self.lock:
                self.latest_output = output

    def step(self, frame, **kwargs):
        """Submit one frame and return the best frame available right now.

        Preference order: live optical-flow preview (when it has boxes),
        the worker's latest finished output, then the raw preview with a
        loading banner. Returns None for a None frame.
        """
        if frame is None:
            return None
        frame_copy = frame.copy()
        with self.lock:
            self.pending_job = (frame_copy, kwargs)
            last = None if self.latest_output is None else self.latest_output.copy()
        self.job_ready.set()
        preview, has_live_boxes = self.engine.render_live_preview(
            frame_rgb=frame_copy,
            mode=kwargs.get("mode", DEPTH_MODE),
            auto_optimize=kwargs.get("auto_optimize", True),
            max_side=kwargs.get("max_side", 640),
        )
        if has_live_boxes:
            return preview
        if last is not None:
            return last
        cv2.putText(preview, "Loading model/inference...", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
        return preview
# Shared runner bridging the Gradio stream callback and the engine worker.
async_runner = AsyncStreamRunner(engine)
def process_frame(
    frame,
    mode,
    conf_threshold,
    iou_threshold,
    enable_tracking,
    enable_bev,
    auto_optimize,
    max_side,
    depth_interval,
):
    """Gradio stream callback: hand one webcam frame to the async runner."""
    stream_kwargs = {
        "mode": mode,
        "conf_threshold": conf_threshold,
        "iou_threshold": iou_threshold,
        "enable_tracking": enable_tracking,
        "enable_bev": enable_bev,
        "auto_optimize": auto_optimize,
        "max_side": max_side,
        "depth_interval": depth_interval,
    }
    return async_runner.step(frame=frame, **stream_kwargs)
# ---- Gradio UI wiring ----
with gr.Blocks(title="YOLO-3D Realtime CPU (HF Space)") as demo:
    gr.Markdown(
        """
    # YOLO-3D Realtime CPU
    Detection is always live.
    Depth V2 runs asynchronously so the stream does not freeze.
    """
    )
    with gr.Row():
        # Mode + feature toggles; profile gates may still override toggles.
        mode = gr.Radio(
            choices=MODE_OPTIONS,
            value=DEPTH_MODE,
            label="Inference Mode",
        )
        auto_optimize = gr.Checkbox(value=True, label="Auto Optimize By Mode")
        enable_tracking = gr.Checkbox(value=False, label="Tracking")
        enable_bev = gr.Checkbox(value=False, label="Bird Eye View (Depth/Geometry)")
    with gr.Row():
        # Manual tuning sliders; max_side/depth_interval apply only when
        # auto-optimize is off.
        conf_threshold = gr.Slider(0.10, 0.80, value=0.25, step=0.05, label="Confidence")
        iou_threshold = gr.Slider(0.20, 0.80, value=0.45, step=0.05, label="IoU")
        max_side = gr.Slider(320, 960, value=640, step=32, label="Max Inference Side")
        depth_interval = gr.Slider(1, 8, value=3, step=1, label="Depth Refresh (frames)")
    with gr.Row():
        webcam = gr.Image(sources=["webcam"], streaming=True, type="numpy", label="Webcam")
        output = gr.Image(streaming=True, type="numpy", label="Output")
    # ~33 fps polling; "always_last" drops stale frames instead of queueing.
    webcam.stream(
        fn=process_frame,
        inputs=[
            webcam,
            mode,
            conf_threshold,
            iou_threshold,
            enable_tracking,
            enable_bev,
            auto_optimize,
            max_side,
            depth_interval,
        ],
        outputs=output,
        show_progress="hidden",
        trigger_mode="always_last",
        stream_every=0.03,
        concurrency_limit=1,
    )
if __name__ == "__main__":
    # Small request queue keeps webcam latency bounded under load.
    demo.queue(max_size=6).launch()