| |
| import os |
| import threading |
| import time |
| from collections import deque |
|
|
| import cv2 |
| import gradio as gr |
| import numpy as np |
| import torch |
|
|
| from bbox3d_utils import BBox3DEstimator, BirdEyeView |
| from depth_model import DepthEstimator |
| from detection_model import ObjectDetector |
|
|
|
|
# Human-readable mode labels shown in the Gradio mode selector.
DEPTH_MODE = "Depth V2 Realtime (CPU)"
DEPTH_BALANCED_MODE = "Depth V2 Balanced (CPU)"
DEPTH_QUALITY_MODE = "Depth V2 Quality (CPU)"
GEOMETRY_MODE = "Geometry Depth Realtime (CPU)"
FAST_MODE = "Fast Detect (CPU)"
ULTRA_FAST_MODE = "Ultra Fast Detect (CPU)"

# Order of choices presented in the UI radio control.
MODE_OPTIONS = [
    DEPTH_MODE,
    DEPTH_BALANCED_MODE,
    DEPTH_QUALITY_MODE,
    GEOMETRY_MODE,
    FAST_MODE,
    ULTRA_FAST_MODE,
]

# Per-mode tuning profiles consumed by RealtimeEngine.
#   depth_source:   "depth_v2" = neural depth net, "geometry" = size-prior
#                   formula, "none" = 2D detection only
#   max_side:       longest frame side (px) used for inference resizing
#   depth_side:     longest side fed to the depth network (0 = unused)
#   depth_interval: schedule the async depth job every N frames (0 = never)
#   detect_every:   run the detector every N input frames; cached detections
#                   are reused in between
#   conf_floor:     minimum confidence enforced when auto-optimize is on
#   allow_tracking: whether the Tracking checkbox may take effect
#   allow_bev:      whether the Bird Eye View checkbox may take effect
#   max_det:        detector max-detections cap
#   hud:            short label drawn in the on-frame HUD
MODE_PROFILES = {
    DEPTH_MODE: {
        "depth_source": "depth_v2",
        "max_side": 640,
        "depth_side": 384,
        "depth_interval": 3,
        "detect_every": 2,
        "conf_floor": 0.22,
        "allow_tracking": False,
        "allow_bev": True,
        "max_det": 120,
        "hud": "Depth Realtime",
    },
    DEPTH_BALANCED_MODE: {
        "depth_source": "depth_v2",
        "max_side": 576,
        "depth_side": 320,
        "depth_interval": 4,
        "detect_every": 3,
        "conf_floor": 0.25,
        "allow_tracking": False,
        "allow_bev": True,
        "max_det": 100,
        "hud": "Depth Balanced",
    },
    DEPTH_QUALITY_MODE: {
        "depth_source": "depth_v2",
        "max_side": 768,
        "depth_side": 512,
        "depth_interval": 1,
        "detect_every": 1,
        "conf_floor": 0.20,
        "allow_tracking": False,
        "allow_bev": True,
        "max_det": 150,
        "hud": "Depth Quality",
    },
    GEOMETRY_MODE: {
        "depth_source": "geometry",
        "max_side": 640,
        "depth_side": 0,
        "depth_interval": 0,
        "detect_every": 3,
        "conf_floor": 0.25,
        "allow_tracking": False,
        "allow_bev": True,
        "max_det": 120,
        "hud": "Geometry Depth",
    },
    FAST_MODE: {
        "depth_source": "none",
        "max_side": 640,
        "depth_side": 0,
        "depth_interval": 0,
        "detect_every": 4,
        "conf_floor": 0.30,
        "allow_tracking": False,
        "allow_bev": False,
        "max_det": 100,
        "hud": "Fast Detect",
    },
    ULTRA_FAST_MODE: {
        "depth_source": "none",
        "max_side": 256,
        "depth_side": 0,
        "depth_interval": 0,
        "detect_every": 6,
        "conf_floor": 0.40,
        "allow_tracking": False,
        "allow_bev": False,
        "max_det": 35,
        "hud": "Ultra Fast",
    },
}
|
|
|
|
| def _configure_cpu_runtime(): |
| cpu_count = max(1, os.cpu_count() or 1) |
| thread_count = min(4, cpu_count) |
| os.environ.setdefault("OMP_NUM_THREADS", str(thread_count)) |
| os.environ.setdefault("MKL_NUM_THREADS", str(thread_count)) |
| torch.set_num_threads(thread_count) |
| if hasattr(torch, "set_num_interop_threads"): |
| torch.set_num_interop_threads(max(1, thread_count // 2)) |
|
|
|
|
class GeometryDepthEstimator:
    """Monocular distance estimator based on simple pinhole geometry.

    Blends two cues per detection: (a) apparent pixel height versus a
    per-class real-world height prior and (b) position of the box bottom
    relative to an assumed ground plane, then applies aspect/edge
    heuristics. Returns a normalized depth in [0, 1] plus an absolute
    distance in meters, EMA-smoothed per tracked object id.

    Fix vs. previous version: ``depth_history`` is now bounded — tracker
    ids grow monotonically over a long session, so the smoothing cache
    would otherwise leak memory.
    """

    # Upper bound on cached per-object smoothing entries.
    _MAX_HISTORY = 512

    def __init__(self):
        # Rough real-world object heights in meters, used as size priors.
        self.class_heights_m = {
            "person": 1.70,
            "car": 1.52,
            "truck": 3.00,
            "bus": 3.00,
            "motorcycle": 1.40,
            "bicycle": 1.40,
            "dog": 0.60,
            "cat": 0.35,
            "potted plant": 0.60,
            "plant": 0.60,
            "chair": 0.90,
            "bottle": 0.25,
        }
        # object_id -> last smoothed distance (meters); insertion-ordered.
        self.depth_history = {}
        # EMA weight on the previous estimate (higher = smoother, laggier).
        self.depth_smoothing = 0.65

    def _class_height(self, class_name):
        """Return the height prior (m) for *class_name*, 1.60 m fallback.

        Exact (case-insensitive) match first, then substring match so
        variant labels still hit a prior.
        """
        key = class_name.lower()
        if key in self.class_heights_m:
            return self.class_heights_m[key]
        for name, height in self.class_heights_m.items():
            if name in key:
                return height
        return 1.60

    def estimate(self, bbox, class_name, frame_shape, object_id=None):
        """Estimate distance for one detection.

        Args:
            bbox: (x1, y1, x2, y2) in pixels.
            class_name: detector label, used to pick a height prior.
            frame_shape: frame shape tuple; only [:2] (h, w) is used.
            object_id: optional tracker id enabling temporal smoothing.

        Returns:
            (depth_norm, distance_m): normalized depth in [0, 1] and the
            clipped absolute distance in meters ([0.8, 25.0]).
        """
        h, w = frame_shape[:2]
        x1, y1, x2, y2 = [float(v) for v in bbox]

        bbox_h = max(2.0, y2 - y1)  # clamp to avoid division blow-ups
        bbox_w = max(2.0, x2 - x1)
        bottom_y = min(float(h - 1), max(0.0, y2))
        center_x = (x1 + x2) * 0.5

        # Assumed intrinsics/extrinsics: focal ~0.95*w, principal point at
        # mid-height, camera mounted ~1.55 m above ground — heuristic
        # defaults, not calibrated values.
        fx = 0.95 * float(w)
        cy = 0.50 * float(h)
        camera_height_m = 1.55
        object_height_m = self._class_height(class_name)

        # Cue 1: similar triangles on apparent height.
        dist_size = (fx * object_height_m) / bbox_h

        # Cue 2: ground-plane intersection of the box bottom.
        pixel_to_ground = max(2.0, bottom_y - cy)
        dist_ground = (camera_height_m * fx) / pixel_to_ground

        # Wide boxes (partial occlusion / oblique view) read slightly closer.
        aspect = bbox_w / bbox_h
        aspect_factor = 1.0 - min(0.20, max(0.0, aspect - 0.6) * 0.08)

        # Objects near the frame edge are pushed slightly farther (lens-edge
        # heuristic); rel_x is 0 at center, 1 at the edge.
        rel_x = abs((center_x / max(1.0, float(w))) - 0.5) * 2.0
        side_factor = 1.0 + 0.10 * rel_x

        distance_m = (0.72 * dist_size + 0.28 * dist_ground) * aspect_factor * side_factor
        distance_m = float(np.clip(distance_m, 0.8, 25.0))

        if object_id is not None:
            key = int(object_id)
            prev = self.depth_history.get(key)
            if prev is not None:
                distance_m = self.depth_smoothing * prev + (1.0 - self.depth_smoothing) * distance_m
            self.depth_history[key] = distance_m
            # Bound the cache: evict oldest-inserted ids beyond the cap.
            if len(self.depth_history) > self._MAX_HISTORY:
                overflow = len(self.depth_history) - self._MAX_HISTORY
                for stale in list(self.depth_history)[:overflow]:
                    del self.depth_history[stale]

        # Map ~1–10 m onto [0, 1] for colorized-depth style consumers.
        depth_norm = float(np.clip((distance_m - 1.0) / 9.0, 0.0, 1.0))
        return depth_norm, distance_m
|
|
|
|
class RealtimeEngine:
    """CPU realtime pipeline: detection + optional depth + 3D rendering.

    Thread-safety model:
      * ``self.lock`` serializes :meth:`process` (one frame at a time).
      * ``self.depth_lock`` guards the async depth cache and job flag.
      * ``self.live_lock`` guards the lightweight live-preview tracker
        state read/written by :meth:`render_live_preview`.
    Depth inference runs in a daemon thread so the stream never blocks
    on the depth network.
    """

    def __init__(self):
        _configure_cpu_runtime()
        self.lock = threading.Lock()        # serializes process()
        self.depth_lock = threading.Lock()  # guards depth cache / job flag
        self.detector = None                # lazily built ObjectDetector
        self.depth_estimator = None         # lazily built DepthEstimator
        self.geometry_depth = GeometryDepthEstimator()
        self.bbox3d_estimator = BBox3DEstimator()
        self.bev = BirdEyeView(scale=55, size=(260, 260))
        self.latency_ms = deque(maxlen=30)  # rolling window for HUD FPS/latency
        self.depth_input_side = 384         # longest side fed to the depth net
        self.frame_idx = 0                  # counter for depth-job scheduling
        self.input_idx = 0                  # counter for detector scheduling
        self.cached_depth_map = None        # last completed depth map (depth_lock)
        self.depth_job_running = False      # True while a depth thread is active
        self.depth_last_update = 0.0        # time.time() of last depth publish
        self.last_detections = []           # cache reused when detector is skipped
        self.last_class_names = {}
        self.last_detect_mode = None        # mode the cache was produced under
        self.last_detect_shape = None       # (h, w) the cache was produced at
        self.live_lock = threading.Lock()   # guards the live-preview state below
        self.live_boxes = []                # dicts: bbox/score/class_name/object_id
        self.live_prev_gray = None          # previous grayscale frame for LK flow
        self.live_prev_pts = None           # tracked box centers, (N, 1, 2) float32
        self.live_shape = None              # (h, w) the live state refers to
        self.live_hud = "Live"

    @staticmethod
    def _profile(mode):
        """Resolve a mode label to its profile; unknown modes fall back to DEPTH_MODE."""
        return MODE_PROFILES.get(mode, MODE_PROFILES[DEPTH_MODE])

    @staticmethod
    def _class_name(class_names, class_id):
        """Look up a class label from a dict or sequence mapping; fall back to str(id)."""
        if isinstance(class_names, dict):
            return class_names.get(class_id, str(class_id))
        if isinstance(class_names, (list, tuple)) and 0 <= class_id < len(class_names):
            return class_names[class_id]
        return str(class_id)

    @staticmethod
    def _color_for_class(class_name):
        """Return a fixed BGR color per class family (substring matched); white default."""
        name = class_name.lower()
        if "car" in name or "vehicle" in name:
            return (0, 0, 255)
        if "truck" in name or "bus" in name:
            return (0, 165, 255)
        if "person" in name:
            return (0, 255, 0)
        if "bicycle" in name or "motorcycle" in name:
            return (255, 0, 0)
        if "potted plant" in name or "plant" in name:
            return (0, 255, 255)
        return (255, 255, 255)

    @staticmethod
    def _resize_for_inference(frame, max_side):
        """Downscale *frame* so its longest side is <= max_side (never upscale).

        Sides are floored at 32 px to keep downstream models happy.
        """
        h, w = frame.shape[:2]
        longest = max(h, w)
        if longest <= max_side:
            return frame
        scale = max_side / float(longest)
        new_w = max(32, int(w * scale))
        new_h = max(32, int(h * scale))
        return cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA)

    @staticmethod
    def _overlay_corner(base, overlay, size_ratio=0.26, anchor="tl"):
        """Paste a scaled *overlay* into a corner of *base*, in place.

        anchor: "tl" (default), "tr", "bl" or "br". The overlay keeps its
        aspect ratio, is capped at half of base in each dimension, and a
        1 px white border is drawn around it. Mutates *base*; returns None.
        """
        h, w = base.shape[:2]
        target_h = max(64, int(h * size_ratio))
        target_w = int((overlay.shape[1] / max(1, overlay.shape[0])) * target_h)
        target_w = max(64, min(target_w, w // 2))
        target_h = min(target_h, h // 2)
        resized = cv2.resize(overlay, (target_w, target_h), interpolation=cv2.INTER_AREA)

        if anchor == "tr":
            x0, y0 = w - target_w, 0
        elif anchor == "bl":
            x0, y0 = 0, h - target_h
        elif anchor == "br":
            x0, y0 = w - target_w, h - target_h
        else:
            x0, y0 = 0, 0

        base[y0 : y0 + target_h, x0 : x0 + target_w] = resized
        cv2.rectangle(base, (x0, y0), (x0 + target_w, y0 + target_h), (255, 255, 255), 1)

    def _draw_hud(self, frame, mode_name, extra=""):
        """Draw the status line (mode, FPS derived from mean latency) onto *frame*."""
        mean_latency = float(np.mean(self.latency_ms)) if self.latency_ms else 0.0
        fps = (1000.0 / mean_latency) if mean_latency > 0 else 0.0
        text = f"{mode_name} | CPU | FPS {fps:.1f} | Latency {mean_latency:.1f} ms"
        if extra:
            text = f"{text} | {extra}"
        cv2.putText(frame, text, (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.60, (0, 0, 255), 2)

    def _should_run_detector(self, mode, profile, frame_shape):
        """Decide whether to run the detector or reuse cached detections.

        Runs when the profile demands every frame, the cache is empty or
        stale (mode/resolution changed), or the frame counter hits the
        profile's detect_every stride.
        """
        detect_every = int(profile.get("detect_every", 1))
        if detect_every <= 1:
            return True
        if not self.last_detections:
            return True
        if self.last_detect_mode != mode:
            return True
        if self.last_detect_shape != frame_shape[:2]:
            return True
        return (self.input_idx % detect_every) == 0

    def _set_live_state_from_detections(self, frame_bgr, detections, class_names, hud_name):
        """Seed the live-preview tracker from fresh detector output.

        Stores clipped boxes, their center points (as LK flow seeds) and
        the grayscale frame under live_lock for render_live_preview.
        """
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
        boxes = []
        pts = []
        h, w = frame_bgr.shape[:2]

        for bbox, score, class_id, obj_id in detections:
            x1, y1, x2, y2 = [float(v) for v in bbox]
            # Clip to the frame; drop degenerate boxes.
            x1 = float(np.clip(x1, 0, w - 1))
            x2 = float(np.clip(x2, 0, w - 1))
            y1 = float(np.clip(y1, 0, h - 1))
            y2 = float(np.clip(y2, 0, h - 1))
            if x2 <= x1 or y2 <= y1:
                continue
            boxes.append(
                {
                    "bbox": [x1, y1, x2, y2],
                    "score": float(score),
                    "class_name": self._class_name(class_names, class_id),
                    "object_id": obj_id,
                }
            )
            # One tracking point per box: its center.
            pts.append([(x1 + x2) * 0.5, (y1 + y2) * 0.5])

        pts_arr = None
        if pts:
            # Shape (N, 1, 2) float32 as required by calcOpticalFlowPyrLK.
            pts_arr = np.array(pts, dtype=np.float32).reshape(-1, 1, 2)

        with self.live_lock:
            self.live_boxes = boxes
            self.live_prev_gray = gray
            self.live_prev_pts = pts_arr
            self.live_shape = frame_bgr.shape[:2]
            self.live_hud = hud_name

    def render_live_preview(self, frame_rgb, mode, auto_optimize, max_side):
        """Produce a cheap preview frame by shifting cached boxes with LK flow.

        Runs no detector/depth model; it translates the last known boxes
        by per-box sparse optical flow, draws them, overlays depth if one
        is cached, and updates the live state for the next call.

        Returns:
            (rgb_frame, has_boxes): annotated frame at the original size
            and whether any cached boxes were available.
        """
        profile = self._profile(mode)
        if auto_optimize:
            effective_max_side = int(profile["max_side"])
            hud_name = profile["hud"]
        else:
            effective_max_side = int(max_side)
            hud_name = profile["hud"]

        orig_h, orig_w = frame_rgb.shape[:2]
        frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
        frame_bgr = self._resize_for_inference(frame_bgr, effective_max_side)
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)

        # Snapshot live state under the lock; work on copies outside it.
        with self.live_lock:
            boxes = [dict(item) for item in self.live_boxes]
            prev_gray = None if self.live_prev_gray is None else self.live_prev_gray.copy()
            prev_pts = None if self.live_prev_pts is None else self.live_prev_pts.copy()
            live_shape = self.live_shape
            cached_hud = self.live_hud

        # Only track if the cached state matches this frame's resolution.
        if boxes and prev_gray is not None and prev_pts is not None and live_shape == frame_bgr.shape[:2]:
            try:
                next_pts, status, _ = cv2.calcOpticalFlowPyrLK(
                    prev_gray,
                    gray,
                    prev_pts,
                    None,
                    winSize=(17, 17),
                    maxLevel=2,
                    criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 20, 0.03),
                )
                if next_pts is not None and status is not None:
                    h, w = frame_bgr.shape[:2]
                    # Translate each box by its tracked center's displacement.
                    for i, box in enumerate(boxes):
                        if i >= len(next_pts) or i >= len(prev_pts):
                            continue
                        if int(status[i][0]) == 0:
                            continue  # flow lost for this point; keep box as-is
                        dx = float(next_pts[i][0][0] - prev_pts[i][0][0])
                        dy = float(next_pts[i][0][1] - prev_pts[i][0][1])
                        x1, y1, x2, y2 = box["bbox"]
                        x1 = float(np.clip(x1 + dx, 0, w - 1))
                        x2 = float(np.clip(x2 + dx, 0, w - 1))
                        y1 = float(np.clip(y1 + dy, 0, h - 1))
                        y2 = float(np.clip(y2 + dy, 0, h - 1))
                        if x2 > x1 and y2 > y1:
                            box["bbox"] = [x1, y1, x2, y2]
                    prev_pts = next_pts
            except Exception:
                # Best-effort: a flow failure just leaves boxes untranslated.
                pass

        out = frame_bgr.copy()
        for box in boxes:
            x1, y1, x2, y2 = [int(v) for v in box["bbox"]]
            class_name = box["class_name"]
            score = box["score"]
            obj_id = box["object_id"]
            color = self._color_for_class(class_name)
            cv2.rectangle(out, (x1, y1), (x2, y2), color, 2)
            if obj_id is not None:
                label = f"{class_name} #{obj_id} {score:.2f}"
            else:
                label = f"{class_name} {score:.2f}"
            cv2.putText(out, label, (x1, max(18, y1 - 8)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        # Depth overlay: reuse the async cache (depth_v2) or paint a cheap
        # box-mask pseudo-depth (geometry); fast modes skip it entirely.
        depth_source = profile.get("depth_source", "none")
        if depth_source == "depth_v2" and self.depth_estimator is not None:
            depth_map, depth_busy, _ = self._get_depth_snapshot()
            if depth_map is not None and depth_map.shape[:2] == out.shape[:2]:
                depth_colored = self.depth_estimator.colorize_depth(depth_map)
                self._overlay_corner(out, depth_colored, size_ratio=0.24, anchor="tl")
                extra = "live tracker | depth async" if depth_busy else "live tracker | depth ready"
            else:
                extra = "live tracker | depth warming"
        elif depth_source == "geometry":
            pseudo_depth = np.zeros(out.shape[:2], dtype=np.float32)
            for box in boxes:
                x1, y1, x2, y2 = [int(v) for v in box["bbox"]]
                x1 = max(0, min(out.shape[1] - 1, x1))
                x2 = max(0, min(out.shape[1], x2))
                y1 = max(0, min(out.shape[0] - 1, y1))
                y2 = max(0, min(out.shape[0], y2))
                if x2 > x1 and y2 > y1:
                    pseudo_depth[y1:y2, x1:x2] = max(0.25, pseudo_depth[y1:y2, x1:x2].max())
            pseudo_depth = cv2.GaussianBlur(pseudo_depth, (17, 17), 0)
            depth_colored = cv2.applyColorMap((pseudo_depth * 255).astype(np.uint8), cv2.COLORMAP_INFERNO)
            self._overlay_corner(out, depth_colored, size_ratio=0.24, anchor="tl")
            extra = "live tracker | formula depth"
        else:
            extra = "live tracker"

        self._draw_hud(out, cached_hud or hud_name, extra=extra)

        # Persist the (possibly flow-advanced) state for the next preview.
        with self.live_lock:
            self.live_boxes = boxes
            self.live_prev_gray = gray
            self.live_prev_pts = None if not boxes else prev_pts
            self.live_shape = frame_bgr.shape[:2]
            self.live_hud = hud_name

        if out.shape[:2] != (orig_h, orig_w):
            out = cv2.resize(out, (orig_w, orig_h), interpolation=cv2.INTER_LINEAR)

        return cv2.cvtColor(out, cv2.COLOR_BGR2RGB), bool(boxes)

    def _ensure_detector(self):
        """Lazily construct the YOLO detector (nano, CPU) on first use."""
        if self.detector is None:
            self.detector = ObjectDetector(
                model_size="nano",
                conf_thres=0.25,
                iou_thres=0.45,
                classes=None,
                device="cpu",
            )
            self.detector.model.overrides["max_det"] = 120

    def _ensure_depth(self):
        """Lazily construct the depth network (small, CPU) on first use."""
        if self.depth_estimator is None:
            self.depth_estimator = DepthEstimator(model_size="small", device="cpu")

    def _get_depth_snapshot(self):
        """Return (depth_map_copy_or_None, job_running, last_update_ts) atomically."""
        with self.depth_lock:
            depth_map = None if self.cached_depth_map is None else self.cached_depth_map.copy()
            busy = self.depth_job_running
            updated = self.depth_last_update
            return depth_map, busy, updated

    def _start_depth_job(self, frame_bgr):
        """Kick off one async depth inference on a copy of *frame_bgr*.

        No-op if a depth job is already in flight. The worker publishes
        the result (resized back to the frame's size) under depth_lock
        and always clears the running flag, even on failure.
        """
        with self.depth_lock:
            if self.depth_job_running:
                return
            self.depth_job_running = True

        job_frame = frame_bgr.copy()
        target_h, target_w = job_frame.shape[:2]

        def worker():
            try:
                self._ensure_depth()
                depth_input = self._resize_for_inference(job_frame, self.depth_input_side)
                depth_map = self.depth_estimator.estimate_depth(depth_input)
                if depth_map.shape[:2] != (target_h, target_w):
                    depth_map = cv2.resize(depth_map, (target_w, target_h), interpolation=cv2.INTER_LINEAR)
                with self.depth_lock:
                    self.cached_depth_map = depth_map
                    self.depth_last_update = time.time()
            finally:
                # Always release the slot so the next frame can schedule.
                with self.depth_lock:
                    self.depth_job_running = False

        threading.Thread(target=worker, daemon=True).start()

    def _build_sparse_depth_map(self, shape, boxes_3d):
        """Paint each box's depth_value into a blurred float map (visualization only)."""
        h, w = shape[:2]
        depth_map = np.zeros((h, w), dtype=np.float32)
        for box in boxes_3d:
            x1, y1, x2, y2 = [int(v) for v in box["bbox_2d"]]
            x1 = max(0, min(w - 1, x1))
            x2 = max(0, min(w, x2))
            y1 = max(0, min(h - 1, y1))
            y2 = max(0, min(h, y2))
            if x2 <= x1 or y2 <= y1:
                continue
            # Overlapping boxes keep the larger (nearer-looking) value.
            depth_map[y1:y2, x1:x2] = max(float(box["depth_value"]), float(depth_map[y1:y2, x1:x2].max()))
        depth_map = cv2.GaussianBlur(depth_map, (21, 21), 0)
        return depth_map

    def _build_boxes_3d(self, detections, class_names, frame_shape, depth_map=None, method_prefix="geometry"):
        """Attach a depth estimate to each detection, producing 3D box dicts.

        With a *depth_map*: small/articulated classes sample the box
        center, others a region median; depth is mapped to ~1–10 m.
        Without one: falls back to the geometry formula estimator.
        Also prunes stale per-object trackers in the 3D estimator.
        """
        boxes_3d = []
        active_ids = []

        for detection in detections:
            bbox, score, class_id, obj_id = detection
            class_name = self._class_name(class_names, class_id)

            if depth_map is not None:
                if class_name.lower() in ["person", "cat", "dog"]:
                    center_x = int((bbox[0] + bbox[2]) / 2.0)
                    center_y = int((bbox[1] + bbox[3]) / 2.0)
                    depth_value = self.depth_estimator.get_depth_at_point(depth_map, center_x, center_y)
                    depth_method = f"{method_prefix}-center"
                else:
                    depth_value = self.depth_estimator.get_depth_in_region(depth_map, bbox, method="median")
                    depth_method = f"{method_prefix}-median"
                # Linear remap of normalized depth to meters (same 1–10 m
                # convention as GeometryDepthEstimator).
                distance_m = 1.0 + float(depth_value) * 9.0
            else:
                depth_value, distance_m = self.geometry_depth.estimate(
                    bbox=bbox,
                    class_name=class_name,
                    frame_shape=frame_shape,
                    object_id=obj_id,
                )
                depth_method = f"{method_prefix}-formula"

            boxes_3d.append(
                {
                    "bbox_2d": bbox,
                    "depth_value": float(np.clip(depth_value, 0.0, 1.0)),
                    "depth_method": depth_method,
                    "distance_m": float(distance_m),
                    "class_name": class_name,
                    "object_id": obj_id,
                    "score": score,
                }
            )

            if obj_id is not None:
                active_ids.append(obj_id)

        # Drop 3D trackers for ids no longer present.
        self.bbox3d_estimator.cleanup_trackers(active_ids)
        return boxes_3d

    def _draw_boxes(self, frame_bgr, boxes_3d):
        """Render all 3D boxes onto a copy of the frame and return it."""
        out = frame_bgr.copy()
        for box_3d in boxes_3d:
            color = self._color_for_class(box_3d["class_name"])
            out = self.bbox3d_estimator.draw_box_3d(out, box_3d, color=color)
        return out

    def _render_depth_mode(self, frame_bgr, detections, class_names, enable_bev, depth_interval, hud_name):
        """Render a frame using the async neural depth map.

        Schedules a depth job on the configured interval (and immediately
        if no map is cached yet). While the first map is still computing,
        falls back to geometry depth with a "warming up" overlay.
        """
        self.frame_idx += 1
        if self.frame_idx % max(1, depth_interval) == 0:
            self._start_depth_job(frame_bgr)

        depth_map, depth_busy, depth_updated = self._get_depth_snapshot()
        if depth_map is None:
            # Nothing cached yet: make sure a job is in flight.
            self._start_depth_job(frame_bgr)

        if depth_map is not None:
            boxes_3d = self._build_boxes_3d(
                detections=detections,
                class_names=class_names,
                frame_shape=frame_bgr.shape,
                depth_map=depth_map,
                method_prefix="depthv2",
            )
            depth_colored = self.depth_estimator.colorize_depth(depth_map)
            extra = f"depth@{int(depth_updated)}"
        else:
            # Warm-up path: geometry estimates + synthetic depth overlay.
            boxes_3d = self._build_boxes_3d(
                detections=detections,
                class_names=class_names,
                frame_shape=frame_bgr.shape,
                depth_map=None,
                method_prefix="geo-fallback",
            )
            depth_map = self._build_sparse_depth_map(frame_bgr.shape, boxes_3d)
            depth_colored = cv2.applyColorMap((depth_map * 255).astype(np.uint8), cv2.COLORMAP_INFERNO)
            cv2.putText(depth_colored, "Depth warming up...", (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            extra = "depth async loading"

        result_frame = self._draw_boxes(frame_bgr, boxes_3d)

        if enable_bev:
            self.bev.reset()
            for box in boxes_3d:
                self.bev.draw_box(box)
            self._overlay_corner(result_frame, self.bev.get_image(), size_ratio=0.30, anchor="bl")

        self._overlay_corner(result_frame, depth_colored, size_ratio=0.24, anchor="tl")
        self._draw_hud(result_frame, hud_name, extra=("depth busy" if depth_busy else extra))
        return result_frame

    def _render_geometry_mode(self, frame_bgr, detections, class_names, enable_bev, hud_name):
        """Render a frame using formula-based geometry depth (no neural depth)."""
        boxes_3d = self._build_boxes_3d(
            detections=detections,
            class_names=class_names,
            frame_shape=frame_bgr.shape,
            depth_map=None,
            method_prefix="geometry",
        )
        result_frame = self._draw_boxes(frame_bgr, boxes_3d)

        # Synthetic depth overlay built from the per-box estimates.
        pseudo_depth = self._build_sparse_depth_map(frame_bgr.shape, boxes_3d)
        depth_colored = cv2.applyColorMap((pseudo_depth * 255).astype(np.uint8), cv2.COLORMAP_INFERNO)
        self._overlay_corner(result_frame, depth_colored, size_ratio=0.24, anchor="tl")

        if enable_bev:
            self.bev.reset()
            for box in boxes_3d:
                self.bev.draw_box(box)
            self._overlay_corner(result_frame, self.bev.get_image(), size_ratio=0.30, anchor="bl")

        self._draw_hud(result_frame, hud_name, extra="formula depth")
        return result_frame

    def _render_fast_mode(self, frame_bgr, hud_name, detections, class_names):
        """Render plain 2D boxes + labels (no depth, no BEV) for the fast modes."""
        out = frame_bgr.copy()
        for bbox, score, class_id, obj_id in detections:
            x1, y1, x2, y2 = [int(v) for v in bbox]
            class_name = self._class_name(class_names, class_id)
            color = self._color_for_class(class_name)
            cv2.rectangle(out, (x1, y1), (x2, y2), color, 2)
            if obj_id is not None:
                label = f"{class_name} #{obj_id} {score:.2f}"
            else:
                label = f"{class_name} {score:.2f}"
            cv2.putText(out, label, (x1, max(18, y1 - 8)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        self._draw_hud(out, hud_name)
        return out

    def process(
        self,
        frame_rgb,
        mode,
        conf_threshold,
        iou_threshold,
        enable_tracking,
        enable_bev,
        auto_optimize,
        max_side,
        depth_interval,
    ):
        """Process one RGB frame end-to-end and return the annotated RGB frame.

        Serialized by ``self.lock``. Applies the mode profile (optionally
        overridden by the manual sliders when auto_optimize is off), runs
        or reuses detections, dispatches to the per-mode renderer, and
        records latency for the HUD. Returns None for a None input.
        """
        if frame_rgb is None:
            return None

        with self.lock:
            start = time.perf_counter()
            profile = self._profile(mode)
            self.input_idx += 1

            self._ensure_detector()
            effective_conf = float(conf_threshold)
            if auto_optimize:
                # Never go below the profile's confidence floor.
                effective_conf = max(effective_conf, float(profile.get("conf_floor", effective_conf)))
            self.detector.model.overrides["conf"] = effective_conf
            self.detector.model.overrides["iou"] = float(iou_threshold)
            self.detector.model.overrides["max_det"] = int(profile["max_det"])

            if auto_optimize:
                effective_max_side = int(profile["max_side"])
                effective_depth_interval = int(profile["depth_interval"])
                if profile["depth_side"] > 0:
                    self.depth_input_side = int(profile["depth_side"])
            else:
                # Manual mode: honor the UI sliders directly.
                effective_max_side = int(max_side)
                effective_depth_interval = max(1, int(depth_interval))

            # UI toggles are gated by what the profile allows.
            effective_tracking = bool(enable_tracking and profile["allow_tracking"])
            effective_bev = bool(enable_bev and profile["allow_bev"])

            orig_h, orig_w = frame_rgb.shape[:2]
            frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
            frame_bgr = self._resize_for_inference(frame_bgr, effective_max_side)
            self.detector.model.overrides["imgsz"] = int(max(frame_bgr.shape[:2]))

            run_detector = self._should_run_detector(mode, profile, frame_bgr.shape)
            if run_detector:
                _, detections = self.detector.detect(frame_bgr, track=effective_tracking)
                class_names = self.detector.get_class_names()
                # Refresh both the reuse cache and the live-preview tracker.
                self.last_detections = detections
                self.last_class_names = class_names
                self.last_detect_mode = mode
                self.last_detect_shape = frame_bgr.shape[:2]
                self._set_live_state_from_detections(frame_bgr, detections, class_names, profile["hud"])
            else:
                detections = self.last_detections
                class_names = self.last_class_names

            # Dispatch to the renderer for this mode's depth source.
            depth_source = profile["depth_source"]
            if depth_source == "depth_v2":
                out_bgr = self._render_depth_mode(
                    frame_bgr=frame_bgr,
                    detections=detections,
                    class_names=class_names,
                    enable_bev=effective_bev,
                    depth_interval=effective_depth_interval,
                    hud_name=profile["hud"],
                )
            elif depth_source == "geometry":
                out_bgr = self._render_geometry_mode(
                    frame_bgr=frame_bgr,
                    detections=detections,
                    class_names=class_names,
                    enable_bev=effective_bev,
                    hud_name=profile["hud"],
                )
            else:
                out_bgr = self._render_fast_mode(
                    frame_bgr=frame_bgr,
                    hud_name=profile["hud"],
                    detections=detections,
                    class_names=class_names,
                )

            # Restore the caller's original resolution.
            if out_bgr.shape[:2] != (orig_h, orig_w):
                out_bgr = cv2.resize(out_bgr, (orig_w, orig_h), interpolation=cv2.INTER_LINEAR)

            elapsed_ms = (time.perf_counter() - start) * 1000.0
            self.latency_ms.append(elapsed_ms)
            return cv2.cvtColor(out_bgr, cv2.COLOR_BGR2RGB)
|
|
|
|
# Module-level singleton engine shared by the stream callbacks below.
engine = RealtimeEngine()
|
|
|
|
class AsyncStreamRunner:
    """Runs the heavy engine.process() off the Gradio stream-callback path.

    The callback submits the newest frame (silently replacing any stale
    pending frame — only the latest matters for a live stream) and
    immediately returns a cheap preview, so the UI never blocks on model
    inference. A single daemon worker thread drains the pending slot.
    """

    def __init__(self, realtime_engine):
        self.engine = realtime_engine
        self.lock = threading.Lock()  # guards pending_job / latest_output
        self.pending_job = None       # newest (frame, kwargs) awaiting processing
        self.latest_output = None     # last fully processed output frame
        self.worker = threading.Thread(target=self._loop, daemon=True)
        self.worker.start()

    def _loop(self):
        """Worker thread: forever take the newest pending job and process it."""
        while True:
            job = None
            with self.lock:
                if self.pending_job is not None:
                    job = self.pending_job
                    self.pending_job = None

            if job is None:
                # Short poll: low latency without spinning a full core.
                time.sleep(0.004)
                continue

            frame, kwargs = job
            try:
                output = self.engine.process(frame_rgb=frame, **kwargs)
            except Exception as exc:
                # Render the failure into the frame instead of killing the
                # worker thread — the stream keeps running.
                output = np.zeros((360, 640, 3), dtype=np.uint8)
                cv2.putText(output, "Runtime error", (20, 70), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
                cv2.putText(output, str(exc)[:90], (20, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)

            with self.lock:
                self.latest_output = output

    def step(self, frame, **kwargs):
        """Submit *frame* for async processing; return the best frame to show now.

        Preference order: the live optical-flow preview when it has boxes,
        else the last completed full output, else the raw preview with a
        loading notice. Returns None for a None input frame.
        """
        if frame is None:
            return None

        frame_copy = frame.copy()
        with self.lock:
            self.pending_job = (frame_copy, kwargs)
            last = None if self.latest_output is None else self.latest_output.copy()

        preview, has_live_boxes = self.engine.render_live_preview(
            frame_rgb=frame_copy,
            mode=kwargs.get("mode", DEPTH_MODE),
            auto_optimize=kwargs.get("auto_optimize", True),
            max_side=kwargs.get("max_side", 640),
        )
        if has_live_boxes:
            return preview
        if last is not None:
            return last

        cv2.putText(preview, "Loading model/inference...", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
        return preview
|
|
|
|
# Background runner that decouples heavy processing from the stream callback.
async_runner = AsyncStreamRunner(engine)
|
|
|
|
def process_frame(
    frame,
    mode,
    conf_threshold,
    iou_threshold,
    enable_tracking,
    enable_bev,
    auto_optimize,
    max_side,
    depth_interval,
):
    """Gradio stream callback: forward one webcam frame to the async runner."""
    stream_kwargs = {
        "mode": mode,
        "conf_threshold": conf_threshold,
        "iou_threshold": iou_threshold,
        "enable_tracking": enable_tracking,
        "enable_bev": enable_bev,
        "auto_optimize": auto_optimize,
        "max_side": max_side,
        "depth_interval": depth_interval,
    }
    return async_runner.step(frame=frame, **stream_kwargs)
|
|
|
|
# --- Gradio UI wiring -------------------------------------------------------
with gr.Blocks(title="YOLO-3D Realtime CPU (HF Space)") as demo:
    gr.Markdown(
        """
        # YOLO-3D Realtime CPU
        Detection is always live.
        Depth V2 runs asynchronously so the stream does not freeze.
        """
    )

    # Mode selection and feature toggles.
    with gr.Row():
        mode = gr.Radio(
            choices=MODE_OPTIONS,
            value=DEPTH_MODE,
            label="Inference Mode",
        )
        auto_optimize = gr.Checkbox(value=True, label="Auto Optimize By Mode")
        enable_tracking = gr.Checkbox(value=False, label="Tracking")
        enable_bev = gr.Checkbox(value=False, label="Bird Eye View (Depth/Geometry)")

    # Manual tuning sliders (take effect when Auto Optimize is off; the
    # confidence slider is floored by the profile when it is on).
    with gr.Row():
        conf_threshold = gr.Slider(0.10, 0.80, value=0.25, step=0.05, label="Confidence")
        iou_threshold = gr.Slider(0.20, 0.80, value=0.45, step=0.05, label="IoU")
        max_side = gr.Slider(320, 960, value=640, step=32, label="Max Inference Side")
        depth_interval = gr.Slider(1, 8, value=3, step=1, label="Depth Refresh (frames)")

    # Webcam input next to the annotated output stream.
    with gr.Row():
        webcam = gr.Image(sources=["webcam"], streaming=True, type="numpy", label="Webcam")
        output = gr.Image(streaming=True, type="numpy", label="Output")

    # ~30 ms polling; "always_last" drops stale frames under load, and the
    # concurrency limit of 1 keeps the single CPU pipeline uncontended.
    webcam.stream(
        fn=process_frame,
        inputs=[
            webcam,
            mode,
            conf_threshold,
            iou_threshold,
            enable_tracking,
            enable_bev,
            auto_optimize,
            max_side,
            depth_interval,
        ],
        outputs=output,
        show_progress="hidden",
        trigger_mode="always_last",
        stream_every=0.03,
        concurrency_limit=1,
    )
|
|
|
|
if __name__ == "__main__":
    # Small request queue bounds memory if clients outpace the CPU pipeline.
    demo.queue(max_size=6).launch()
|
|