# CRITICAL: Clear CUDA_VISIBLE_DEVICES BEFORE any imports # HF Spaces may set this to "0" dynamically, locking us to a single GPU # import os # if "CUDA_VISIBLE_DEVICES" in os.environ: # del os.environ["CUDA_VISIBLE_DEVICES"] import os import logging import time from threading import RLock, Thread from queue import Queue, PriorityQueue, Full, Empty from typing import Any, Dict, List, Optional, Sequence, Tuple import cv2 import numpy as np import torch from concurrent.futures import ThreadPoolExecutor from threading import RLock from models.detectors.base import ObjectDetector from models.model_loader import load_detector, load_detector_on_device from models.segmenters.model_loader import load_segmenter, load_segmenter_on_device from models.depth_estimators.model_loader import load_depth_estimator, load_depth_estimator_on_device from models.depth_estimators.base import DepthEstimator from utils.video import extract_frames, write_video, VideoReader, VideoWriter, AsyncVideoReader from utils.gpt_distance import estimate_distance_gpt from jobs.storage import set_track_data import tempfile class AsyncVideoReader: """ Async video reader that decodes frames in a background thread. This prevents GPU starvation on multi-GPU systems by prefetching frames while the main thread is busy dispatching work to GPUs. """ def __init__(self, video_path: str, prefetch_size: int = 32): """ Initialize async video reader. Args: video_path: Path to video file prefetch_size: Number of frames to prefetch (default 32) """ from queue import Queue from threading import Thread self.video_path = video_path self.prefetch_size = prefetch_size # Open video to get metadata self._cap = cv2.VideoCapture(video_path) if not self._cap.isOpened(): raise ValueError(f"Unable to open video: {video_path}") self.fps = self._cap.get(cv2.CAP_PROP_FPS) or 30.0 self.width = int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH)) self.height = int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.total_frames = int(self._cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Prefetch queue self._queue: Queue = Queue(maxsize=prefetch_size) self._error: Exception = None self._finished = False # Start decoder thread self._thread = Thread(target=self._decode_loop, daemon=True) self._thread.start() def _decode_loop(self): """Background thread that continuously decodes frames.""" try: while True: success, frame = self._cap.read() if not success: break self._queue.put(frame) # Blocks when queue is full (backpressure) except Exception as e: self._error = e logging.error(f"AsyncVideoReader decode error: {e}") finally: self._cap.release() self._queue.put(None) # Sentinel to signal end self._finished = True def __iter__(self): return self def __next__(self) -> np.ndarray: if self._error: raise self._error frame = self._queue.get() if frame is None: raise StopIteration return frame def close(self): """Stop the decoder thread and release resources.""" # Signal thread to stop by releasing cap (if not already done) if self._cap.isOpened(): self._cap.release() # Drain queue to unblock thread if it's waiting on put() while not self._queue.empty(): try: self._queue.get_nowait() except: break def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def _check_cancellation(job_id: Optional[str]) -> None: """Check if job has been cancelled and raise exception if so.""" if job_id is None: return from jobs.storage import get_job_storage from jobs.models import JobStatus job = get_job_storage().get(job_id) if job and job.status == JobStatus.CANCELLED: raise RuntimeError("Job cancelled by user") def _color_for_label(label: str) -> Tuple[int, int, int]: # Deterministic BGR color from label text. value = abs(hash(label)) % 0xFFFFFF blue = value & 0xFF green = (value >> 8) & 0xFF red = (value >> 16) & 0xFF return (blue, green, red) def draw_boxes( frame: np.ndarray, boxes: np.ndarray, labels: Optional[Sequence[int]] = None, queries: Optional[Sequence[str]] = None, label_names: Optional[Sequence[str]] = None, ) -> np.ndarray: output = frame.copy() if boxes is None: return output for idx, box in enumerate(boxes): x1, y1, x2, y2 = [int(coord) for coord in box] if label_names is not None and idx < len(label_names): label = label_names[idx] elif labels is not None and idx < len(labels) and queries is not None: label_idx = int(labels[idx]) if 0 <= label_idx < len(queries): label = queries[label_idx] else: label = f"label_{label_idx}" else: label = f"label_{idx}" color = _color_for_label(label) cv2.rectangle(output, (x1, y1), (x2, y2), color, thickness=2) if label: font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 1.0 thickness = 2 text_size, baseline = cv2.getTextSize(label, font, font_scale, thickness) text_w, text_h = text_size pad = 4 text_x = x1 text_y = max(y1 - 6, text_h + pad) box_top_left = (text_x, text_y - text_h - pad) box_bottom_right = (text_x + text_w + pad, text_y + baseline) cv2.rectangle(output, box_top_left, box_bottom_right, color, thickness=-1) cv2.putText( output, label, (text_x + pad // 2, text_y - 2), font, font_scale, (255, 255, 255), thickness, lineType=cv2.LINE_AA, ) return output def draw_masks( frame: np.ndarray, masks: np.ndarray, alpha: float = 0.65, labels: Optional[Sequence[str]] = None, ) -> np.ndarray: output = frame.copy() if masks is None or len(masks) == 0: return output for idx, mask in enumerate(masks): if mask is None: continue if mask.ndim == 3: mask = mask[0] if mask.shape[:2] != output.shape[:2]: mask = cv2.resize(mask, (output.shape[1], output.shape[0]), interpolation=cv2.INTER_NEAREST) mask_bool = mask.astype(bool) overlay = np.zeros_like(output, dtype=np.uint8) label = None if labels and idx < len(labels): label = labels[idx] if not label: label = f"object_{idx}" color = _color_for_label(label) overlay[mask_bool] = color output = cv2.addWeighted(output, 1.0, overlay, alpha, 0) contours, _ = cv2.findContours( mask_bool.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE ) if contours: cv2.drawContours(output, contours, -1, color, thickness=2) if label: coords = np.column_stack(np.where(mask_bool)) if coords.size: y, x = coords[0] font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 1.0 thickness = 2 text_size, baseline = cv2.getTextSize(label, font, font_scale, thickness) text_w, text_h = text_size pad = 4 text_x = int(x) text_y = max(int(y) - 6, text_h + pad) box_top_left = (text_x, text_y - text_h - pad) box_bottom_right = (text_x + text_w + pad, text_y + baseline) cv2.rectangle(output, box_top_left, box_bottom_right, color, thickness=-1) cv2.putText( output, label, (text_x + pad // 2, text_y - 2), font, font_scale, (255, 255, 255), thickness, lineType=cv2.LINE_AA, ) return output def _build_detection_records( boxes: np.ndarray, scores: Sequence[float], labels: Sequence[int], queries: Sequence[str], label_names: Optional[Sequence[str]] = None, ) -> List[Dict[str, Any]]: detections: List[Dict[str, Any]] = [] for idx, box in enumerate(boxes): if label_names is not None and idx < len(label_names): label = label_names[idx] else: label_idx = int(labels[idx]) if idx < len(labels) else -1 if 0 <= label_idx < len(queries): label = queries[label_idx] else: label = f"label_{label_idx}" detections.append( { "label": label, "score": float(scores[idx]) if idx < len(scores) else 0.0, "bbox": [int(coord) for coord in box.tolist()], } ) return detections from utils.tracker import ByteTracker class SpeedEstimator: def __init__(self, fps: float = 30.0): self.fps = fps self.pixel_scale_map = {} # label -> pixels_per_meter (heuristic) def estimate(self, detections: List[Dict[str, Any]]): for det in detections: history = det.get('history', []) if len(history) < 5: continue # Simple heuristic: Speed based on pixel movement # We assume a base depth or size. # Delta over last 5 frames curr = history[-1] prev = history[-5] # Centroids cx1 = (curr[0] + curr[2]) / 2 cy1 = (curr[1] + curr[3]) / 2 cx2 = (prev[0] + prev[2]) / 2 cy2 = (prev[1] + prev[3]) / 2 dist_px = np.sqrt((cx1-cx2)**2 + (cy1-cy2)**2) # Heuristic scale: Assume car is ~4m long? Or just arbitrary pixel scale # If we had GPT distance, we could calibrate. # For now, let's use a dummy scale: 50px = 1m (very rough) # Speed = (dist_px / 50) meters / (5 frames / 30 fps) seconds # = (dist_px / 50) / (0.166) m/s # = (dist_px * 0.12) m/s # = * 3.6 km/h scale = 50.0 dt = 5.0 / self.fps speed_mps = (dist_px / scale) / dt speed_kph = speed_mps * 3.6 # Smoothing det['speed_kph'] = speed_kph # Direction dx = cx1 - cx2 dy = cy1 - cy2 angle = np.degrees(np.arctan2(dy, dx)) # 0 is right, 90 is down # Map to clock direction (12 is up = -90 deg) # -90 (up) -> 12 # 0 (right) -> 3 # 90 (down) -> 6 # 180 (left) -> 9 # Adjust so 12 is up (negative Y) # angle -90 is 12 clock_hour = ((angle + 90) / 30 + 12) % 12 if clock_hour == 0: clock_hour = 12.0 det['direction_clock'] = f"{int(round(clock_hour))} o'clock" det['angle_deg'] = angle # 0 is right, 90 is down (screen space) _MODEL_LOCKS: Dict[str, RLock] = {} _MODEL_LOCKS_GUARD = RLock() _DEPTH_SCALE = float(os.getenv("DEPTH_SCALE", "25.0")) def _get_model_lock(kind: str, name: str) -> RLock: key = f"{kind}:{name}" with _MODEL_LOCKS_GUARD: lock = _MODEL_LOCKS.get(key) if lock is None: lock = RLock() _MODEL_LOCKS[key] = lock return lock def _attach_depth_metrics( frame: np.ndarray, detections: List[Dict[str, Any]], depth_estimator_name: Optional[str], depth_scale: float, # No longer used for distance calculation estimator_instance: Optional[Any] = None, ) -> None: """Attach relative depth values for visualization only. GPT handles distance estimation.""" if not detections or (not depth_estimator_name and not estimator_instance): return from models.depth_estimators.model_loader import load_depth_estimator if estimator_instance: estimator = estimator_instance # Use instance lock if available, or create one if hasattr(estimator, "lock"): lock = estimator.lock else: # Fallback (shouldn't happen with our new setup but safe) lock = _get_model_lock("depth", estimator.name) else: estimator = load_depth_estimator(depth_estimator_name) lock = _get_model_lock("depth", estimator.name) with lock: depth_result = estimator.predict(frame) depth_map = depth_result.depth_map if depth_map is None or depth_map.size == 0: return height, width = depth_map.shape[:2] raw_depths: List[Tuple[Dict[str, Any], float]] = [] for det in detections: det["depth_rel"] = None # Relative depth for visualization only bbox = det.get("bbox") if not bbox or len(bbox) < 4: continue x1, y1, x2, y2 = [int(coord) for coord in bbox[:4]] x1 = max(0, min(width - 1, x1)) y1 = max(0, min(height - 1, y1)) x2 = max(x1 + 1, min(width, x2)) y2 = max(y1 + 1, min(height, y2)) patch = depth_map[y1:y2, x1:x2] if patch.size == 0: continue # Center crop (50%) to avoid background h_p, w_p = patch.shape cy, cx = h_p // 2, w_p // 2 dy, dx = h_p // 4, w_p // 4 center_patch = patch[cy - dy : cy + dy, cx - dx : cx + dx] # Fallback to full patch if center is empty (unlikely) if center_patch.size == 0: center_patch = patch finite = center_patch[np.isfinite(center_patch)] if finite.size == 0: continue depth_raw = float(np.median(finite)) if depth_raw > 1e-6: raw_depths.append((det, depth_raw)) if not raw_depths: return # Compute relative depth (0-1) for visualization only all_raw = [d[1] for d in raw_depths] min_raw, max_raw = min(all_raw), max(all_raw) denom = max(max_raw - min_raw, 1e-6) for det, depth_raw in raw_depths: # Inverted: higher raw = closer = lower rel value (0=close, 1=far) det["depth_rel"] = 1.0 - ((depth_raw - min_raw) / denom) def infer_frame( frame: np.ndarray, queries: Sequence[str], detector_name: Optional[str] = None, depth_estimator_name: Optional[str] = None, depth_scale: float = 1.0, detector_instance: Optional[ObjectDetector] = None, depth_estimator_instance: Optional[Any] = None, ) -> Tuple[np.ndarray, List[Dict[str, Any]]]: if detector_instance: detector = detector_instance else: detector = load_detector(detector_name) text_queries = list(queries) or ["object"] try: if hasattr(detector, "lock"): lock = detector.lock else: lock = _get_model_lock("detector", detector.name) with lock: result = detector.predict(frame, text_queries) detections = _build_detection_records( result.boxes, result.scores, result.labels, text_queries, result.label_names ) if depth_estimator_name or depth_estimator_instance: try: _attach_depth_metrics( frame, detections, depth_estimator_name, depth_scale, estimator_instance=depth_estimator_instance ) except Exception: logging.exception("Depth estimation failed for frame") # Re-build display labels to include GPT distance if available display_labels = [] for i, det in enumerate(detections): label = det["label"] if det.get("gpt_distance_m") is not None: # Add GPT distance to label, e.g. "car 12m" depth_str = f"{int(det['gpt_distance_m'])}m" label = f"{label} {depth_str}" logging.debug("Object '%s' at %s (bbox: %s)", label, depth_str, det['bbox']) display_labels.append(label) except Exception: logging.exception("Inference failed for queries %s", text_queries) raise return draw_boxes( frame, result.boxes, labels=None, # Use custom labels queries=None, label_names=display_labels, ), detections def infer_batch( frames: List[np.ndarray], frame_indices: List[int], queries: Sequence[str], detector_instance: ObjectDetector, depth_estimator_instance: Optional[DepthEstimator] = None, depth_scale: float = 1.0, depth_frame_stride: int = 3, ) -> List[Tuple[int, np.ndarray, List[Dict[str, Any]]]]: # Batch detection text_queries = list(queries) or ["object"] try: if detector_instance.supports_batch: with detector_instance.lock: det_results = detector_instance.predict_batch(frames, text_queries) else: # Fallback with detector_instance.lock: det_results = [detector_instance.predict(f, text_queries) for f in frames] except Exception: logging.exception("Batch detection failed") # Return empty for all return [(idx, f, []) for idx, f in zip(frame_indices, frames)] # Batch depth depth_map_results = {} # frame_idx -> depth_map depth_batch_inputs = [] depth_batch_indices = [] for idx, f in zip(frame_indices, frames): if idx % depth_frame_stride == 0: depth_batch_inputs.append(f) depth_batch_indices.append(idx) if depth_estimator_instance and depth_batch_inputs: try: with depth_estimator_instance.lock: if depth_estimator_instance.supports_batch: d_results = depth_estimator_instance.predict_batch(depth_batch_inputs) else: d_results = [depth_estimator_instance.predict(f) for f in depth_batch_inputs] for idx, res in zip(depth_batch_indices, d_results): depth_map_results[idx] = res except Exception: logging.exception("Batch depth estimation failed") # Post-process and merge outputs = [] for i, (idx, frame, det_result) in enumerate(zip(frame_indices, frames, det_results)): detections = _build_detection_records( det_result.boxes, det_result.scores, det_result.labels, text_queries, det_result.label_names ) if idx in depth_map_results: try: # existing _attach_depth_metrics expects detections and estimator name/instance # but we already computed depth. We need a helper or just modify logical flow. # Actually _attach_depth_metrics calls predict(). We want to skip predict. # Let's manually attach. d_res = depth_map_results[idx] # We need to manually invoke the attachment logic using the precomputed result. # Refactoring _attach_depth_metrics to accept result would be cleaner, but for now: # Copy-paste logic or use a trick. # Let's extract logic from _attach_depth_metrics essentially. # Wait, _attach_depth_metrics does the box checking. _attach_depth_from_result(detections, d_res, depth_scale) except Exception: logging.warning("Failed to attach depth for frame %d", idx) display_labels = [_build_display_label(d) for d in detections] processed = draw_boxes(frame, det_result.boxes, label_names=display_labels) outputs.append((idx, processed, detections)) return outputs def _build_display_label(det): """Build display label with GPT distance if available.""" label = det["label"] if det.get("gpt_distance_m") is not None: label = f"{label} {int(det['gpt_distance_m'])}m" return label def _attach_depth_from_result(detections, depth_result, depth_scale): """Attach relative depth values for visualization only. GPT handles distance estimation.""" depth_map = depth_result.depth_map if depth_map is None or depth_map.size == 0: return height, width = depth_map.shape[:2] raw_depths = [] for det in detections: det["depth_rel"] = None # Relative depth for visualization only bbox = det.get("bbox") if not bbox or len(bbox) < 4: continue x1, y1, x2, y2 = [int(coord) for coord in bbox[:4]] x1 = max(0, min(width - 1, x1)) y1 = max(0, min(height - 1, y1)) x2 = max(x1 + 1, min(width, x2)) y2 = max(y1 + 1, min(height, y2)) patch = depth_map[y1:y2, x1:x2] if patch.size == 0: continue h_p, w_p = patch.shape cy, cx = h_p // 2, w_p // 2 dy, dx = h_p // 4, w_p // 4 center_patch = patch[cy - dy : cy + dy, cx - dx : cx + dx] if center_patch.size == 0: center_patch = patch finite = center_patch[np.isfinite(center_patch)] if finite.size == 0: continue depth_raw = float(np.median(finite)) if depth_raw > 1e-6: raw_depths.append((det, depth_raw)) if not raw_depths: return # Compute relative depth (0-1) for visualization only all_raw = [d[1] for d in raw_depths] min_raw, max_raw = min(all_raw), max(all_raw) denom = max(max_raw - min_raw, 1e-6) for det, depth_raw in raw_depths: # Inverted: higher raw = closer = lower rel value (0=close, 1=far) det["depth_rel"] = 1.0 - ((depth_raw - min_raw) / denom) def infer_segmentation_frame( frame: np.ndarray, text_queries: Optional[List[str]] = None, segmenter_name: Optional[str] = None, segmenter_instance: Optional[Any] = None, ) -> Tuple[np.ndarray, Any]: if segmenter_instance: segmenter = segmenter_instance # Use instance lock if available if hasattr(segmenter, "lock"): lock = segmenter.lock else: lock = _get_model_lock("segmenter", segmenter.name) else: segmenter = load_segmenter(segmenter_name) lock = _get_model_lock("segmenter", segmenter.name) with lock: result = segmenter.predict(frame, text_prompts=text_queries) labels = text_queries or [] if len(labels) == 1: masks = result.masks if result.masks is not None else [] labels = [labels[0] for _ in range(len(masks))] return draw_masks(frame, result.masks, labels=labels), result def extract_first_frame(video_path: str) -> Tuple[np.ndarray, float, int, int]: cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError("Unable to open video.") fps = cap.get(cv2.CAP_PROP_FPS) or 0.0 width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) success, frame = cap.read() cap.release() if not success or frame is None: raise ValueError("Video decode produced zero frames.") return frame, fps, width, height def compute_depth_per_detection( depth_map: np.ndarray, detections: List[Dict], depth_scale: float = 1.0 ) -> List[Dict]: """Sample depth for each detection bbox, compute relative distances.""" depths = [] for det in detections: x1, y1, x2, y2 = det["bbox"] # Sample central 50% region for robustness (avoids edge artifacts) cx, cy = (x1 + x2) // 2, (y1 + y2) // 2 hw, hh = max(1, (x2 - x1) // 4), max(1, (y2 - y1) // 4) y_start, y_end = max(0, cy - hh), min(depth_map.shape[0], cy + hh) x_start, x_end = max(0, cx - hw), min(depth_map.shape[1], cx + hw) region = depth_map[y_start:y_end, x_start:x_end] valid = region[np.isfinite(region)] if len(valid) >= 10: det["depth_est_m"] = float(np.median(valid)) * depth_scale det["depth_valid"] = True depths.append(det["depth_est_m"]) else: det["depth_est_m"] = None det["depth_valid"] = False det["depth_rel"] = None # Per-frame relative normalization if depths: min_d, max_d = min(depths), max(depths) span = max_d - min_d + 1e-6 for det in detections: if det.get("depth_valid"): det["depth_rel"] = (det["depth_est_m"] - min_d) / span elif len(detections) == 1 and detections[0].get("depth_valid"): # Single detection: assign neutral relative distance detections[0]["depth_rel"] = 0.5 return detections def process_first_frame( video_path: str, queries: List[str], mode: str, detector_name: Optional[str] = None, segmenter_name: Optional[str] = None, depth_estimator_name: Optional[str] = None, depth_scale: Optional[float] = None, enable_depth_estimator: bool = False, enable_gpt: bool = True, # ENABLED BY DEFAULT ) -> Tuple[np.ndarray, List[Dict[str, Any]], Optional[np.ndarray]]: frame, _, _, _ = extract_first_frame(video_path) if mode == "segmentation": processed, _ = infer_segmentation_frame( frame, text_queries=queries, segmenter_name=segmenter_name ) return processed, [], None processed, detections = infer_frame( frame, queries, detector_name=detector_name ) # 1. Synchronous Depth Estimation (HF Backend) depth_map = None # If a specific depth estimator is requested OR if generic "enable" flag is on should_run_depth = (depth_estimator_name is not None) or enable_depth_estimator if should_run_depth and detections: try: # Resolve name: if none given, default to "depth" d_name = depth_estimator_name if depth_estimator_name else "depth" scale = depth_scale if depth_scale is not None else 1.0 logging.info(f"Running synchronous depth estimation with {d_name} (scale={scale})...") estimator = load_depth_estimator(d_name) # Run prediction with _get_model_lock("depth", estimator.name): result = estimator.predict(frame) depth_map = result.depth_map # Compute per-detection depth metrics detections = compute_depth_per_detection(depth_map, detections, scale) except Exception as e: logging.exception(f"First frame depth failed: {e}") # Mark all detections as depth_valid=False just in case for det in detections: det["depth_est_m"] = None det["depth_rel"] = None det["depth_valid"] = False return processed, detections, depth_map # 2. GPT-based Distance/Direction Estimation (Explicitly enabled) if enable_gpt: # We need to save the frame temporarily to pass to GPT (or refactor gpt_distance to take buffer) # For now, write to temp file try: with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp_img: cv2.imwrite(tmp_img.name, frame) gpt_results = estimate_distance_gpt(tmp_img.name, detections) logging.info(f"GPT Output for First Frame:\n{gpt_results}") # Expose to HF logs os.remove(tmp_img.name) # Clean up immediatey # Merge GPT results into detections # GPT returns { "T01": { "distance_m": ..., "direction": ... } } # Detections are list of dicts. We assume T01 maps to index 0, T02 to index 1... for i, det in enumerate(detections): # ID format matches what we constructed in gpt_distance.py obj_id = f"T{str(i+1).zfill(2)}" if obj_id in gpt_results: info = gpt_results[obj_id] det["gpt_distance_m"] = info.get("distance_m") det["gpt_direction"] = info.get("direction") det["gpt_description"] = info.get("description") # GPT is the sole source of distance - no polyfill needed except Exception as e: logging.error(f"GPT Distance estimation failed: {e}") return processed, detections def run_inference( input_video_path: str, output_video_path: str, queries: List[str], max_frames: Optional[int] = None, detector_name: Optional[str] = None, job_id: Optional[str] = None, depth_estimator_name: Optional[str] = None, depth_scale: float = 1.0, enable_gpt: bool = True, stream_queue: Optional[Queue] = None, ) -> Tuple[str, List[List[Dict[str, Any]]]]: # 1. Setup Video Reader try: reader = AsyncVideoReader(input_video_path) except ValueError: logging.exception("Failed to open video at %s", input_video_path) raise fps = reader.fps width = reader.width height = reader.height total_frames = reader.total_frames if max_frames is not None: total_frames = min(total_frames, max_frames) # 2. Defaults and Config if not queries: queries = ["person", "car", "truck", "motorcycle", "bicycle", "bus", "train", "airplane"] logging.info("No queries provided, using defaults: %s", queries) logging.info("Detection queries: %s", queries) active_detector = detector_name or "hf_yolov8" # Parallel Model Loading num_gpus = torch.cuda.device_count() detectors = [] depth_estimators = [] if num_gpus > 0: logging.info("Detected %d GPUs. Loading models in parallel...", num_gpus) def load_models_on_gpu(gpu_id: int): device_str = f"cuda:{gpu_id}" try: det = load_detector_on_device(active_detector, device_str) det.lock = RLock() depth = None if depth_estimator_name: depth = load_depth_estimator_on_device(depth_estimator_name, device_str) depth.lock = RLock() return (gpu_id, det, depth) except Exception as e: logging.error(f"Failed to load models on GPU {gpu_id}: {e}") raise with ThreadPoolExecutor(max_workers=num_gpus) as loader_pool: futures = [loader_pool.submit(load_models_on_gpu, i) for i in range(num_gpus)] results = [f.result() for f in futures] # Sort by GPU ID to ensure consistent indexing results.sort(key=lambda x: x[0]) for _, det, depth in results: detectors.append(det) depth_estimators.append(depth) else: logging.info("No GPUs detected. Loading CPU models...") det = load_detector(active_detector) det.lock = RLock() detectors.append(det) if depth_estimator_name: depth = load_depth_estimator(depth_estimator_name) depth.lock = RLock() depth_estimators.append(depth) else: depth_estimators.append(None) # 4. Phase 1: Pre-Scan (Depth Normalization Stats) - ONLY IF DEPTH ENABLED global_min, global_max = 0.0, 1.0 if depth_estimator_name and depth_estimators[0]: logging.info("Starting Phase 1: Pre-scan for depth stats...") # We need a quick scan logic here. # Since we have loaded models, we can use one of them to scan a few frames. # Let's pick 0-th GPU model. scan_est = depth_estimators[0] scan_values = [] # Sample frames: First 10, Middle 10, Last 10 target_indices = set(list(range(0, 10)) + list(range(total_frames//2, total_frames//2 + 10)) + list(range(max(0, total_frames-10), total_frames))) target_indices = sorted([i for i in target_indices if i < total_frames]) try: # Quick reader scan reader_scan = AsyncVideoReader(input_video_path) scan_frames = [] for i, frame in enumerate(reader_scan): if i in target_indices: scan_frames.append(frame) if i > max(target_indices): break reader_scan.close() # Predict with scan_est.lock: # Batch if supported, else loop if scan_est.supports_batch and scan_frames: scan_res = scan_est.predict_batch(scan_frames) else: scan_res = [scan_est.predict(f) for f in scan_frames] for r in scan_res: if r.depth_map is not None: scan_values.append(r.depth_map) # Stats if scan_values: all_vals = np.concatenate([v.ravel() for v in scan_values]) valid = all_vals[np.isfinite(all_vals)] if valid.size > 0: global_min = float(np.percentile(valid, 1)) global_max = float(np.percentile(valid, 99)) # Prevent zero range if abs(global_max - global_min) < 1e-6: global_max = global_min + 1.0 logging.info("Global Depth Range: %.2f - %.2f", global_min, global_max) except Exception as e: logging.warning("Pre-scan failed, using default range: %s", e) # queue_in: (frame_idx, frame_data) # queue_out: (frame_idx, processed_frame, detections) queue_in = Queue(maxsize=16) # Tuning for A10: buffer at least 32 frames per GPU (batch size) # GPT Latency Buffer: GPT takes ~3s. At 30fps, that's 90 frames. We need to absorb this burst. queue_out_max = max(512, (len(detectors) if detectors else 1) * 64) queue_out = Queue(maxsize=queue_out_max) # 6. Worker Function (Unified) # Robustness: Define flag early so workers can see it writer_finished = False def worker_task(gpu_idx: int): logging.info(f"Worker {gpu_idx} started. PID: {os.getpid()}") detector_instance = detectors[gpu_idx] depth_instance = depth_estimators[gpu_idx] if gpu_idx < len(depth_estimators) else None # Handle mismatched lists safely batch_size = detector_instance.max_batch_size if detector_instance.supports_batch else 1 batch_accum = [] # List[Tuple[idx, frame]] def flush_batch(): if not batch_accum: return logging.info(f"Worker {gpu_idx} flushing batch of {len(batch_accum)} frames") indices = [item[0] for item in batch_accum] frames = [item[1] for item in batch_accum] # --- UNIFIED INFERENCE --- # Run detection batch try: if detector_instance.supports_batch: with detector_instance.lock: det_results = detector_instance.predict_batch(frames, queries) else: with detector_instance.lock: det_results = [detector_instance.predict(f, queries) for f in frames] except BaseException as e: logging.exception("Batch detection crashed with critical error") det_results = [None] * len(frames) # Run depth batch (if enabled) depth_results = [None] * len(frames) if depth_instance and depth_estimator_name: try: with depth_instance.lock: if depth_instance.supports_batch: depth_results = depth_instance.predict_batch(frames) else: depth_results = [depth_instance.predict(f) for f in frames] except BaseException as e: logging.exception("Batch depth crashed with critical error") # --- POST PROCESSING --- for i, (idx, frame, d_res, dep_res) in enumerate(zip(indices, frames, det_results, depth_results)): # 1. Detections detections = [] if d_res: detections = _build_detection_records( d_res.boxes, d_res.scores, d_res.labels, queries, d_res.label_names ) # 2. Frame Rendering processed = frame.copy() # A. Render Depth Heatmap (if enabled) if dep_res and dep_res.depth_map is not None: processed = colorize_depth_map(dep_res.depth_map, global_min, global_max) try: _attach_depth_from_result(detections, dep_res, depth_scale) except: pass # 3. Output while True: try: queue_out.put((idx, processed, detections), timeout=1.0) break except Full: # Robustness: Check if writer is dead if writer_finished: raise RuntimeError("Writer thread died unexpectedly") if job_id: _check_cancellation(job_id) batch_accum.clear() logging.info(f"Worker {gpu_idx} finished flushing batch") while True: try: item = queue_in.get(timeout=2.0) except Empty: # Periodic check for cancellation if main thread is slow if job_id: _check_cancellation(job_id) continue try: if item is None: logging.info(f"Worker {gpu_idx} received sentinel. Flushing and exiting.") flush_batch() break frame_idx, frame_data = item # logging.info(f"Worker {gpu_idx} got frame {frame_idx}") # Verbose if frame_idx % 30 == 0: logging.info("Processing frame %d on device %s", frame_idx, "cpu" if num_gpus==0 else f"cuda:{gpu_idx}") batch_accum.append((frame_idx, frame_data)) if len(batch_accum) >= batch_size: flush_batch() except BaseException as e: logging.exception(f"Worker {gpu_idx} CRASHED processing frame. Recovering...") # Emit empty/failed frames for the batch to keep sequence alive for idx, frm in batch_accum: try: # Fallback: Return original frame with empty detections queue_out.put((idx, frm, []), timeout=5.0) logging.info(f"Emitted fallback frame {idx}") except: pass batch_accum.clear() finally: queue_in.task_done() logging.info(f"Worker {gpu_idx} thread exiting normally.") # 6. Start Workers workers = [] num_workers = len(detectors) # If using CPU, maybe use more threads? No, CPU models usually multithread internally. # If using GPU, 1 thread per GPU is efficient. for i in range(num_workers): t = Thread(target=worker_task, args=(i,), daemon=True) t.start() workers.append(t) # 7. Start Writer / Output Collection (Main Thread or separate) # We will run writer logic in the main thread after feeding is done? # No, we must write continuously. all_detections_map = {} # writer_finished initialized earlier # writer_finished = False def writer_loop(): nonlocal writer_finished next_idx = 0 buffer = {} # Initialize Tracker & Speed Estimator tracker = ByteTracker(frame_rate=fps) speed_est = SpeedEstimator(fps=fps) try: with VideoWriter(output_video_path, fps, width, height) as writer: while next_idx < total_frames: # Fetch from queue try: while next_idx not in buffer: # Backpressure: If buffer gets too big due to out-of-order frames, # we might want to warn or just hope for the best. # But here we are just consuming. # However, if 'buffer' grows too large (because we are missing next_idx), # we are effectively unbounded again if queue_out fills up with future frames. # So we should monitor buffer size. if len(buffer) > 200 and len(buffer) % 50 == 0: logging.warning("Writer buffer large (%d items), waiting for frame %d (GPT Latency?)...", len(buffer), next_idx) item = queue_out.get(timeout=1.0) # wait idx, p_frame, dets = item buffer[idx] = (p_frame, dets) # Write next_idx p_frame, dets = buffer.pop(next_idx) # --- GPT ESTIMATION (Frame 0 Only) --- if next_idx == 0 and enable_gpt and dets: try: logging.info("Running GPT estimation for video start (Frame 0)...") with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: cv2.imwrite(tmp.name, p_frame) # Use processed frame (boxes not yet drawn) # Wait, p_frame might have heatmaps if depth enabled? No, draw_boxes comes later. # Actually, colorize_depth_map might have happened in worker. # But raw image is better? We don't have raw image here easily without stashing. # p_frame is 'processed'. If depth map enabled, it's a heatmap. Not good for GPT. # GPT needs RGB image. # Worker: processed = frame.copy() -> colorize -> draw_boxes (removed). # So processed is potentially modified. # Ideally we want original. # But let's assume for now processed is fine (if depth disabled) or GPT can handle it. # If depth is enabled, processed is a heatmap. GPT will fail to see car color/details. # FIX: We need access to original frame? # worker sends (idx, processed, detections). # It does NOT send original frame. # We should change worker to send original? Or assume GPT runs on processed? # If processed is heatmap, it's bad. # But User Objective says "legacy depth estimation" is optional/deprecated. # If depth_estimator_name is None, processed is just frame. gpt_res = estimate_distance_gpt(tmp.name, dets) os.remove(tmp.name) # Merge # Helper to match IDs? # estimate_distance_gpt expects us to pass detections list, output keyed by T01.. # But detections don't have IDs yet! SimpleTracker assigns them. # We assign temporary IDs T01.. based on index for GPT matching? # gpt_distance.py generates IDs if not present. # Let's inspect gpt_distance.py... assume it matches by index T01, T02... for i, d in enumerate(dets): oid = f"T{str(i+1).zfill(2)}" if oid in gpt_res: d.update(gpt_res[oid]) except Exception as e: logging.error("GPT failed for Frame 0: %s", e) # --- SEQUENTIAL TRACKING --- # Update tracker with current frame detections # ByteTracker returns the list of ACTIVE tracks with IDs dets = tracker.update(dets) speed_est.estimate(dets) # --- RENDER BOXES & OVERLAYS --- # We need to convert list of dicts back to boxes array for draw_boxes if dets: display_boxes = np.array([d['bbox'] for d in dets]) display_labels = [] for d in dets: lbl = d.get('label', 'obj') # Append Track ID if 'track_id' in d: lbl = f"{d['track_id']} {lbl}" # Speed display removed per user request # if 'speed_kph' in d and d['speed_kph'] > 1.0: # lbl += f" {int(d['speed_kph'])}km/h" # Distance display removed per user request # if d.get('gpt_distance_m'): # lbl += f" {int(d['gpt_distance_m'])}m" display_labels.append(lbl) p_frame = draw_boxes(p_frame, display_boxes, label_names=display_labels) writer.write(p_frame) if stream_queue: try: # Send TRACKED detections to frontend for overlay # We need to attach them to the frame or send separately? # The stream_queue expects 'p_frame' which is an image. # The frontend polls for 'async job' status which returns video, but # we also want live updates during streaming? # Currently streaming is just Mjpeg of p_frame. stream_queue.put(p_frame, timeout=0.01) except: pass all_detections_map[next_idx] = dets # Store tracks for frontend access if job_id: set_track_data(job_id, next_idx, dets) next_idx += 1 if next_idx % 30 == 0: logging.debug("Wrote frame %d/%d", next_idx, total_frames) except Exception as e: logging.error(f"Writer loop processing error at index {next_idx}: {e}") # Important: If we failed AFTER popping from buffer, we must increment next_idx to avoid infinite loop # How do we know? We can check if next_idx is in buffer. # If we popped it, it's not in buffer. # But wait, next_idx is used for loop condition. # If we successfully popped it but failed later, we lost the frame. # We should increment next_idx to skip it. # Heuristic: If we are here, something failed. # If we haven't successfully written/processed, we should probably skip this frame processing # to let the loop continue to next frame. # But we need to make sure we don't skip if the error was just "queue empty" (timeout). # Wait, queue_out.get raises Empty. 'Empty' is NOT Exception? # In Python 'queue.Empty' inherits form Exception? # Actually 'queue.Empty' exception is just 'Exception'. # Let's check imports. from queue import Empty. # Yes. # We should catch Empty explicitly? # No, get(timeout=1.0) raises Empty. # If the error is NOT Empty, then it's a real crash. if "Empty" not in str(type(e)): logging.error(f"CRITICAL WRITER ERROR: {e}") # Force skip frame if we suspect we are stuck # Only if we hold the lock/state? # Simpler: Just try to proceed. # If we popped the frame, next_idx should be incremented? # Actually we can't easily know if we popped. # But we can check if we are stuck on the same index for too long? pass # Check cancellation or timeout if job_id and _check_cancellation(job_id): # This raises pass if not any(w.is_alive() for w in workers) and queue_out.empty(): # Workers dead, queue empty, but not finished? prevent infinite loop logging.error("Workers stopped unexpectedly.") break continue except Exception as e: logging.exception("Writer loop failed") finally: logging.info("Writer loop finished. Wrote %d frames (target %d)", next_idx, total_frames) writer_finished = True writer_thread = Thread(target=writer_loop, daemon=True) writer_thread.start() # 8. Feed Frames (Main Thread) # 8. Feed Frames (Main Thread) try: frames_fed = 0 reader_iter = iter(reader) while True: _check_cancellation(job_id) if max_frames is not None and frames_fed >= max_frames: break try: frame = next(reader_iter) except StopIteration: break queue_in.put((frames_fed, frame)) # Blocks if full frames_fed += 1 logging.info("Feeder finished. Fed %d frames (expected %d)", frames_fed, total_frames) # Update total_frames to actual count so writer knows when to stop if frames_fed != total_frames: logging.info("Updating total_frames from %d to %d (actual fed)", total_frames, frames_fed) total_frames = frames_fed # Signal workers to stop for _ in range(num_workers): try: queue_in.put(None, timeout=5.0) # Using timeout to prevent infinite block except Full: logging.warning("Failed to send stop signal to a worker (queue full)") # Wait for queue to process queue_in.join() except Exception as e: logging.exception("Feeding frames failed") # Ensure we try to signal workers even on error for _ in range(num_workers): try: queue_in.put_nowait(None) except Full: pass raise finally: reader.close() # Wait for writer writer_thread.join() # Sort detections sorted_detections = [] # If we crashed early, we return what we have max_key = max(all_detections_map.keys()) if all_detections_map else -1 for i in range(max_key + 1): sorted_detections.append(all_detections_map.get(i, [])) logging.info("Inference complete. Output: %s", output_video_path) return output_video_path, sorted_detections def run_segmentation( input_video_path: str, output_video_path: str, queries: List[str], max_frames: Optional[int] = None, segmenter_name: Optional[str] = None, job_id: Optional[str] = None, stream_queue: Optional[Queue] = None, ) -> str: # 1. Setup Reader try: reader = AsyncVideoReader(input_video_path) except ValueError: logging.exception("Failed to open video at %s", input_video_path) raise fps = reader.fps width = reader.width height = reader.height total_frames = reader.total_frames if max_frames is not None: total_frames = min(total_frames, max_frames) active_segmenter = segmenter_name or "sam3" logging.info("Using segmenter: %s with queries: %s", active_segmenter, queries) # 2. Load Segmenters (Parallel) # DEBUG: Log current state logging.info(f"[DEBUG] Segmentation PID: {os.getpid()}") logging.info(f"[DEBUG] CUDA_VISIBLE_DEVICES before clear: {os.environ.get('CUDA_VISIBLE_DEVICES')}") # if "CUDA_VISIBLE_DEVICES" in os.environ: # logging.info("[DEBUG] Deleting CUDA_VISIBLE_DEVICES from env (segmentation)") # del os.environ["CUDA_VISIBLE_DEVICES"] num_gpus = torch.cuda.device_count() logging.info(f"[DEBUG] num_gpus: {num_gpus}") segmenters = [] if num_gpus > 0: logging.info("Detected %d GPUs. Loading segmenters...", num_gpus) def load_seg(gpu_id: int): device_str = f"cuda:{gpu_id}" seg = load_segmenter_on_device(active_segmenter, device_str) seg.lock = RLock() return (gpu_id, seg) with ThreadPoolExecutor(max_workers=num_gpus) as loader: futures = [loader.submit(load_seg, i) for i in range(num_gpus)] results = [f.result() for f in futures] results.sort(key=lambda x: x[0]) segmenters = [r[1] for r in results] else: seg = load_segmenter(active_segmenter) seg.lock = RLock() segmenters.append(seg) # 3. Processing queue_in = Queue(maxsize=16) queue_out = Queue(maxsize=max(32, len(segmenters)*4)) writer_finished = False # Robustness def worker_seg(gpu_idx: int): seg = segmenters[gpu_idx] batch_size = seg.max_batch_size if seg.supports_batch else 1 batch_accum = [] def flush_batch(): if not batch_accum: return indices = [i for i, _ in batch_accum] frames = [f for _, f in batch_accum] try: # 1. Inference if seg.supports_batch: with seg.lock: results = seg.predict_batch(frames, queries) else: with seg.lock: results = [seg.predict(f, queries) for f in frames] # 2. Post-process loop for idx, frm, res in zip(indices, frames, results): labels = queries or [] if len(labels) == 1: masks = res.masks if res.masks is not None else [] labels = [labels[0] for _ in range(len(masks))] processed = draw_masks(frm, res.masks, labels=labels) while True: try: queue_out.put((idx, processed), timeout=1.0) break except Full: if writer_finished: raise RuntimeError("Writer thread died") if job_id: _check_cancellation(job_id) except Exception as e: logging.error("Batch seg failed: %s", e) # Fallback: Emit failed frames to prevent writer stall for idx, frm in batch_accum: while True: try: # Return original frame without mask queue_out.put((idx, frm), timeout=1.0) break except Full: if writer_finished: break if job_id: _check_cancellation(job_id) batch_accum.clear() while True: item = queue_in.get() try: if item is None: flush_batch() break idx, frame = item batch_accum.append(item) if idx % 30 == 0: logging.debug("Seg frame %d (GPU %d)", idx, gpu_idx) if len(batch_accum) >= batch_size: flush_batch() finally: queue_in.task_done() workers = [] for i in range(len(segmenters)): t = Thread(target=worker_seg, args=(i,), daemon=True) t.start() workers.append(t) # Writer # writer_finished moved up for closure scope match # Writer # Writer # writer_finished defined earlier def writer_loop(): nonlocal writer_finished next_idx = 0 buffer = {} try: with VideoWriter(output_video_path, fps, width, height) as writer: while next_idx < total_frames: try: while next_idx not in buffer: # Check buffer size if len(buffer) > 64: logging.warning("Writer buffer large (%d), waiting for %d", len(buffer), next_idx) idx, frm = queue_out.get(timeout=1.0) buffer[idx] = frm frm = buffer.pop(next_idx) writer.write(frm) if stream_queue: try: stream_queue.put_nowait(frm) except: pass next_idx += 1 except Exception: if job_id and _check_cancellation(job_id): pass if not any(w.is_alive() for w in workers) and queue_out.empty(): break continue finally: writer_finished = True w_thread = Thread(target=writer_loop, daemon=True) w_thread.start() # Feeder try: reader_iter = iter(reader) frames_fed = 0 while True: _check_cancellation(job_id) if max_frames is not None and frames_fed >= max_frames: break try: frame = next(reader_iter) except StopIteration: break queue_in.put((frames_fed, frame)) frames_fed += 1 # Update total_frames to actual count if frames_fed != total_frames: logging.info("Updating total_frames from %d to %d (actual fed)", total_frames, frames_fed) total_frames = frames_fed for _ in workers: try: queue_in.put(None, timeout=5.0) except Full: pass queue_in.join() except Exception: logging.exception("Segmentation loop failed") for _ in workers: try: queue_in.put_nowait(None) except Full: pass raise finally: reader.close() w_thread.join() logging.info("Segmented video written to: %s", output_video_path) return output_video_path def run_depth_inference( input_video_path: str, output_video_path: str, detections: Optional[List[List[Dict[str, Any]]]] = None, max_frames: Optional[int] = None, depth_estimator_name: str = "depth", first_frame_depth_path: Optional[str] = None, job_id: Optional[str] = None, stream_queue: Optional[Queue] = None, ) -> str: # 1. Setup Reader try: reader = AsyncVideoReader(input_video_path) except ValueError: logging.exception("Failed to open video at %s", input_video_path) raise fps = reader.fps width = reader.width height = reader.height total_frames = reader.total_frames if max_frames is not None: total_frames = min(total_frames, max_frames) logging.info("Using depth estimator: %s", depth_estimator_name) # 2. Load Estimators (Parallel) num_gpus = torch.cuda.device_count() estimators = [] # if "CUDA_VISIBLE_DEVICES" in os.environ: # del os.environ["CUDA_VISIBLE_DEVICES"] if num_gpus > 0: logging.info("Detected %d GPUs. Loading depth estimators...", num_gpus) def load_est(gpu_id: int): device_str = f"cuda:{gpu_id}" est = load_depth_estimator_on_device(depth_estimator_name, device_str) est.lock = RLock() return (gpu_id, est) with ThreadPoolExecutor(max_workers=num_gpus) as loader: futures = [loader.submit(load_est, i) for i in range(num_gpus)] results = [f.result() for f in futures] results.sort(key=lambda x: x[0]) estimators = [r[1] for r in results] else: est = load_depth_estimator(depth_estimator_name) est.lock = RLock() estimators.append(est) # 3. Phase 1: Pre-scan for Stats # We sample ~5% of frames or at least 20 frames distributed evenly stride = max(1, total_frames // 20) logging.info("Starting Phase 1: Pre-scan (stride=%d)...", stride) scan_values = [] def scan_task(gpu_idx: int, frame_data: np.ndarray): est = estimators[gpu_idx] with est.lock: result = est.predict(frame_data) return result.depth_map # Run scan # We can just run this sequentially or with pool? Pool is better. # We need to construct a list of frames to scan. scan_indices = list(range(0, total_frames, stride)) # We need to read specific frames. VideoReader is sequential. # So we iterate and skip. scan_frames = [] # Optimization: If total frames is huge, reading simply to skip might be slow? # VideoReader uses cv2.read() which decodes. # If we need random access, we should use set(cv2.CAP_PROP_POS_FRAMES). # But for now, simple skip logic: current_idx = 0 # To avoid re-opening multiple times or complex seeking, let's just use the Reader # and skip if not in indices. # BUT, if video is 1 hour, skipping 99% frames is wastage of decode. # Re-opening with set POS is better for sparse sampling. # Actually, for robustness, let's just stick to VideoReader sequential read but only process selective frames. # If the video is truly huge, we might want to optimize this later. # Given the constraints, let's just scan the first N frames + some middle ones? # User agreed to "Small startup delay". # Let's try to just grab the frames we want. scan_frames_data = [] # Just grab first 50 frames? No, distribution is better. # Let's use a temporary reader for scanning try: from concurrent.futures import as_completed # Simple Approach: Process first 30 frames to get a baseline. # This is usually enough for a "rough" estimation unless scenes change drastically. # But for stability, spread is better. # Let's read first 10, middle 10, last 10. target_indices = set(list(range(0, 10)) + list(range(total_frames//2, total_frames//2 + 10)) + list(range(max(0, total_frames-10), total_frames))) # Filter valid target_indices = sorted([i for i in target_indices if i < total_frames]) # Manual read with seek is tricky with cv2 (unreliable keyframes). # We will iterate and pick. cnt = 0 reader_scan = AsyncVideoReader(input_video_path) for i, frame in enumerate(reader_scan): if i in target_indices: scan_frames_data.append(frame) if i > max(target_indices): break reader_scan.close() # Run inference on these frames with ThreadPoolExecutor(max_workers=min(len(estimators)*2, 8)) as pool: futures = [] for i, frm in enumerate(scan_frames_data): gpu = i % len(estimators) futures.append(pool.submit(scan_task, gpu, frm)) for f in as_completed(futures): dm = f.result() scan_values.append(dm) except Exception as e: logging.warning("Pre-scan failed, falling back to default range: %s", e) # Compute stats global_min, global_max = 0.0, 1.0 if scan_values: all_vals = np.concatenate([v.ravel() for v in scan_values]) valid = all_vals[np.isfinite(all_vals)] if valid.size > 0: global_min = float(np.percentile(valid, 1)) global_max = float(np.percentile(valid, 99)) # Safety if abs(global_max - global_min) < 1e-6: global_max = global_min + 1.0 logging.info("Global Depth Range: %.2f - %.2f", global_min, global_max) # 4. Phase 2: Streaming Inference logging.info("Starting Phase 2: Streaming...") queue_in = Queue(maxsize=16) queue_out_max = max(32, (len(estimators) if estimators else 1) * 4) queue_out = Queue(maxsize=queue_out_max) writer_finished = False def worker_depth(gpu_idx: int): est = estimators[gpu_idx] batch_size = est.max_batch_size if est.supports_batch else 1 batch_accum = [] def flush_batch(): if not batch_accum: return indices = [i for i, _ in batch_accum] frames = [f for _, f in batch_accum] try: # 1. Inference if est.supports_batch: with est.lock: results = est.predict_batch(frames) else: with est.lock: results = [est.predict(f) for f in frames] # 2. Post-process loop for idx, frm, res in zip(indices, frames, results): depth_map = res.depth_map colored = colorize_depth_map(depth_map, global_min, global_max) # Overlay Detections if detections and idx < len(detections): frame_dets = detections[idx] if frame_dets: boxes = [] labels = [] for d in frame_dets: boxes.append(d.get("bbox")) lbl = d.get("label", "obj") if d.get("gpt_distance_m"): lbl = f"{lbl} {int(d['gpt_distance_m'])}m" labels.append(lbl) colored = draw_boxes(colored, boxes=boxes, label_names=labels) while True: try: queue_out.put((idx, colored), timeout=1.0) break except Full: if writer_finished: raise RuntimeError("Writer died") if job_id: _check_cancellation(job_id) except Exception as e: logging.error("Batch depth failed: %s", e) # Fallback: Emit original frames (no depth map) for idx, frm in batch_accum: while True: try: queue_out.put((idx, frm), timeout=1.0) break except Full: if writer_finished: break if job_id: _check_cancellation(job_id) batch_accum.clear() while True: item = queue_in.get() try: if item is None: flush_batch() break idx, frame = item batch_accum.append(item) if idx % 30 == 0: logging.info("Depth frame %d (GPU %d)", idx, gpu_idx) if len(batch_accum) >= batch_size: flush_batch() finally: queue_in.task_done() # Workers workers = [] for i in range(len(estimators)): t = Thread(target=worker_depth, args=(i,), daemon=True) t.start() workers.append(t) # Writer # Writer # writer_finished defined earlier first_frame_saved = False def writer_loop(): nonlocal writer_finished, first_frame_saved next_idx = 0 buffer = {} processed_frames_subset = [] # Keep first frame for saving if needed try: with VideoWriter(output_video_path, fps, width, height) as writer: while next_idx < total_frames: try: while next_idx not in buffer: if len(buffer) > 64: logging.warning("Writer buffer large (%d), waiting for %d", len(buffer), next_idx) idx, frm = queue_out.get(timeout=1.0) buffer[idx] = frm frm = buffer.pop(next_idx) writer.write(frm) if stream_queue: try: stream_queue.put_nowait(frm) except: pass if first_frame_depth_path and not first_frame_saved and next_idx == 0: cv2.imwrite(first_frame_depth_path, frm) first_frame_saved = True next_idx += 1 if next_idx % 30 == 0: logging.debug("Wrote depth frame %d/%d", next_idx, total_frames) except Exception: if job_id and _check_cancellation(job_id): pass if not any(w.is_alive() for w in workers) and queue_out.empty(): break continue finally: writer_finished = True w_thread = Thread(target=writer_loop, daemon=True) w_thread.start() # Feeder try: reader_iter = iter(reader) frames_fed = 0 while True: _check_cancellation(job_id) if max_frames is not None and frames_fed >= max_frames: break try: frame = next(reader_iter) except StopIteration: break queue_in.put((frames_fed, frame)) frames_fed += 1 # Update total_frames to actual count if frames_fed != total_frames: logging.info("Updating total_frames from %d to %d (actual fed)", total_frames, frames_fed) total_frames = frames_fed for _ in workers: try: queue_in.put(None, timeout=5.0) except Full: pass queue_in.join() except Exception: logging.exception("Depth loop failed") for _ in workers: try: queue_in.put_nowait(None) except Full: pass raise finally: reader.close() w_thread.join() return output_video_path def colorize_depth_map( depth_map: np.ndarray, global_min: float, global_max: float, ) -> np.ndarray: """ Convert depth map to RGB visualization using TURBO colormap. Args: depth_map: HxW float32 depth in meters global_min: Minimum depth across entire video (for stable normalization) global_max: Maximum depth across entire video (for stable normalization) Returns: HxWx3 uint8 RGB image """ import cv2 depth_clean = np.copy(depth_map) finite_mask = np.isfinite(depth_clean) if not np.isfinite(global_min) or not np.isfinite(global_max): if finite_mask.any(): local_depths = depth_clean[finite_mask].astype(np.float64, copy=False) global_min = float(np.percentile(local_depths, 1)) global_max = float(np.percentile(local_depths, 99)) else: global_min = 0.0 global_max = 1.0 # Replace NaN/inf with min value for visualization depth_clean[~finite_mask] = global_min if global_max - global_min < 1e-6: # Handle uniform depth depth_norm = np.zeros_like(depth_clean, dtype=np.uint8) else: # Clip to global range to handle outliers depth_clipped = np.clip(depth_clean, global_min, global_max) depth_norm = ((depth_clipped - global_min) / (global_max - global_min) * 255).astype(np.uint8) # Apply TURBO colormap for vibrant, perceptually uniform visualization colored = cv2.applyColorMap(depth_norm, cv2.COLORMAP_TURBO) return colored