Spaces:
Paused
Paused
| # CRITICAL: Clear CUDA_VISIBLE_DEVICES BEFORE any imports | |
| # HF Spaces may set this to "0" dynamically, locking us to a single GPU | |
| # import os | |
| # if "CUDA_VISIBLE_DEVICES" in os.environ: | |
| # del os.environ["CUDA_VISIBLE_DEVICES"] | |
| import os | |
| import collections | |
| import logging | |
| import time | |
| from threading import Event, RLock, Thread | |
| from queue import Queue, Full, Empty | |
| from typing import Any, Dict, List, Optional, Sequence, Tuple | |
| import cv2 | |
| import numpy as np | |
| import torch | |
| from concurrent.futures import ThreadPoolExecutor | |
| from models.detectors.base import ObjectDetector | |
| from models.model_loader import load_detector, load_detector_on_device | |
| from models.segmenters.model_loader import load_segmenter, load_segmenter_on_device | |
| from models.depth_estimators.model_loader import load_depth_estimator, load_depth_estimator_on_device | |
| from utils.video import StreamingVideoWriter | |
| from utils.relevance import evaluate_relevance | |
| from utils.enrichment import run_enrichment | |
| from utils.schemas import AssessmentStatus | |
| from jobs.storage import set_track_data | |
| import tempfile | |
| import json as json_module | |
class AsyncVideoReader:
    """
    Async video reader that decodes frames in a background thread.

    This prevents GPU starvation on multi-GPU systems by prefetching frames
    while the main thread is busy dispatching work to GPUs.

    The reader is an iterator: each ``next()`` yields one decoded frame.
    Iteration stops when the video ends or ``close()`` is called; once stopped,
    further ``next()`` calls keep raising ``StopIteration`` instead of blocking.
    A decode error captured by the background thread is re-raised to the
    consumer.

    Attributes:
        video_path: Path given to the constructor.
        prefetch_size: Capacity of the prefetch queue.
        fps: Frame rate reported by OpenCV, or 30.0 when it reports 0.
        width: Frame width reported by OpenCV.
        height: Frame height reported by OpenCV.
        total_frames: Frame count reported by OpenCV.
    """

    def __init__(self, video_path: str, prefetch_size: int = 32):
        """
        Initialize async video reader.

        Args:
            video_path: Path to video file
            prefetch_size: Number of frames to prefetch (default 32)

        Raises:
            ValueError: If OpenCV cannot open ``video_path``.
        """
        self.video_path = video_path
        self.prefetch_size = prefetch_size

        # Open video to get metadata
        self._cap = cv2.VideoCapture(video_path)
        if not self._cap.isOpened():
            raise ValueError(f"Unable to open video: {video_path}")
        self.fps = self._cap.get(cv2.CAP_PROP_FPS) or 30.0
        self.width = int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.total_frames = int(self._cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Prefetch queue
        self._queue: Queue = Queue(maxsize=prefetch_size)
        self._error: Optional[Exception] = None
        self._finished = False
        # Set once the end-of-stream sentinel was consumed or close() ran.
        self._exhausted = False
        # Cooperative stop flag: the decoder thread is the only one that
        # touches self._cap after start-up, so close() never releases the
        # capture underneath a concurrent read().
        self._stop = Event()

        # Start decoder thread
        self._thread = Thread(target=self._decode_loop, daemon=True)
        self._thread.start()

    def _decode_loop(self):
        """Background thread that continuously decodes frames."""
        try:
            while not self._stop.is_set():
                success, frame = self._cap.read()
                if not success:
                    break
                self._queue.put(frame)  # Blocks when queue is full (backpressure)
        except Exception as e:
            self._error = e
            logging.error(f"AsyncVideoReader decode error: {e}")
        finally:
            self._cap.release()
            self._queue.put(None)  # Sentinel to signal end
            self._finished = True

    def __iter__(self):
        return self

    def __next__(self) -> np.ndarray:
        if self._error:
            raise self._error
        if self._exhausted:
            # The sentinel is only enqueued once; never wait for a second one.
            raise StopIteration
        frame = self._queue.get()
        if frame is None:
            self._exhausted = True
            # The decoder records its error before enqueuing the sentinel, so
            # re-check here to avoid reporting a failed decode as a clean end.
            if self._error:
                raise self._error
            raise StopIteration
        return frame

    def close(self):
        """Stop the decoder thread and release resources.

        Signals the decoder to stop, then drains the queue until the thread has
        exited (it may be blocked on ``put()`` and still has to enqueue its
        sentinel). The wait is bounded so a stalled ``read()`` cannot hang the
        caller; the daemon thread releases the capture whenever it returns.
        """
        self._stop.set()
        deadline = time.monotonic() + 5.0
        while self._thread.is_alive() and time.monotonic() < deadline:
            try:
                self._queue.get(timeout=0.05)
            except Empty:
                pass
        # Drop whatever is still buffered so the frames can be reclaimed.
        while True:
            try:
                self._queue.get_nowait()
            except Empty:
                break
        self._exhausted = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
| def _check_cancellation(job_id: Optional[str]) -> None: | |
| """Check if job has been cancelled and raise exception if so.""" | |
| if job_id is None: | |
| return | |
| from jobs.storage import get_job_storage | |
| from jobs.models import JobStatus | |
| job = get_job_storage().get(job_id) | |
| if job and job.status == JobStatus.CANCELLED: | |
| raise RuntimeError("Job cancelled by user") | |
| def _color_for_label(label: str) -> Tuple[int, int, int]: | |
| # Deterministic BGR color from label text. | |
| value = abs(hash(label)) % 0xFFFFFF | |
| blue = value & 0xFF | |
| green = (value >> 8) & 0xFF | |
| red = (value >> 16) & 0xFF | |
| return (blue, green, red) | |
def draw_boxes(
    frame: np.ndarray,
    boxes: np.ndarray,
    labels: Optional[Sequence[int]] = None,
    queries: Optional[Sequence[str]] = None,
    label_names: Optional[Sequence[str]] = None,
) -> np.ndarray:
    """Draw labelled bounding boxes on a copy of ``frame``.

    The caption for box ``i`` is resolved in this order:
    ``label_names[i]``; otherwise ``queries[labels[i]]`` (or ``label_<n>`` when
    that index is out of range); otherwise ``label_<i>``.

    Args:
        frame: Image to annotate; it is copied, not modified.
        boxes: Iterable of ``(x1, y1, x2, y2)`` boxes, or ``None``.
        labels: Optional per-box indices into ``queries``.
        queries: Optional query strings addressed by ``labels``.
        label_names: Optional per-box caption strings.

    Returns:
        The annotated copy (an unmodified copy when ``boxes`` is ``None``).
    """
    canvas = frame.copy()
    if boxes is None:
        return canvas

    for position, raw_box in enumerate(boxes):
        left, top, right, bottom = (int(value) for value in raw_box)

        # Resolve the caption text for this box.
        if label_names is not None and position < len(label_names):
            caption = label_names[position]
        elif labels is not None and position < len(labels) and queries is not None:
            query_idx = int(labels[position])
            in_range = 0 <= query_idx < len(queries)
            caption = queries[query_idx] if in_range else f"label_{query_idx}"
        else:
            caption = f"label_{position}"

        bgr = _color_for_label(caption)
        cv2.rectangle(canvas, (left, top), (right, bottom), bgr, thickness=2)
        if not caption:
            continue

        # Filled caption background plus white text, kept inside the top edge.
        font_face = cv2.FONT_HERSHEY_SIMPLEX
        scale, stroke, margin = 1.0, 2, 4
        (caption_w, caption_h), base_line = cv2.getTextSize(caption, font_face, scale, stroke)
        anchor_x = left
        anchor_y = max(top - 6, caption_h + margin)
        cv2.rectangle(
            canvas,
            (anchor_x, anchor_y - caption_h - margin),
            (anchor_x + caption_w + margin, anchor_y + base_line),
            bgr,
            thickness=-1,
        )
        cv2.putText(
            canvas,
            caption,
            (anchor_x + margin // 2, anchor_y - 2),
            font_face,
            scale,
            (255, 255, 255),
            stroke,
            lineType=cv2.LINE_AA,
        )
    return canvas
def draw_masks(
    frame: np.ndarray,
    masks: np.ndarray,
    alpha: float = 0.65,
    labels: Optional[Sequence[str]] = None,
) -> np.ndarray:
    """Blend coloured instance masks (with outlines and labels) onto a copy of ``frame``.

    Each mask is binarised (non-zero means foreground) and resized to the frame
    size with nearest-neighbour interpolation when needed. Its colour is added
    on top of the image with weight ``alpha``, its external contour is drawn,
    and, when a label is available, the text is placed at the first foreground
    pixel in row-major order.

    Args:
        frame: Image to annotate; it is copied, not modified.
        masks: Sequence of 2-D masks (for 3-D masks only index 0 along the
            first axis is used); ``None`` entries are skipped.
        alpha: Weight of the colour overlay.
        labels: Optional per-mask label strings.

    Returns:
        The annotated copy (an unmodified copy when there are no masks).
    """
    output = frame.copy()
    if masks is None or len(masks) == 0:
        return output
    frame_h, frame_w = output.shape[:2]
    for idx, mask in enumerate(masks):
        if mask is None:
            continue
        if mask.ndim == 3:
            mask = mask[0]
        mask_bool = mask.astype(bool)
        if mask_bool.shape[:2] != (frame_h, frame_w):
            # OpenCV has no boolean image type, so resizing a bool mask
            # directly fails. Binarise first, resize as uint8, convert back.
            # Nearest-neighbour keeps the values in {0, 1}.
            resized = cv2.resize(
                mask_bool.astype(np.uint8),
                (frame_w, frame_h),
                interpolation=cv2.INTER_NEAREST,
            )
            mask_bool = resized.astype(bool)
        overlay = np.zeros_like(output, dtype=np.uint8)
        label = None
        if labels and idx < len(labels):
            label = labels[idx]
        # Use a fallback key for consistent color even when no label text
        color_key = label if label else f"object_{idx}"
        color = _color_for_label(color_key)
        overlay[mask_bool] = color
        output = cv2.addWeighted(output, 1.0, overlay, alpha, 0)
        contours, _ = cv2.findContours(
            mask_bool.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        if contours:
            cv2.drawContours(output, contours, -1, color, thickness=2)
        # Only draw label text when explicit labels were provided
        if label:
            coords = np.column_stack(np.where(mask_bool))
            if coords.size:
                y, x = coords[0]
                font = cv2.FONT_HERSHEY_SIMPLEX
                font_scale = 1.0
                thickness = 2
                text_size, baseline = cv2.getTextSize(label, font, font_scale, thickness)
                text_w, text_h = text_size
                pad = 4
                text_x = int(x)
                text_y = max(int(y) - 6, text_h + pad)
                box_top_left = (text_x, text_y - text_h - pad)
                box_bottom_right = (text_x + text_w + pad, text_y + baseline)
                cv2.rectangle(output, box_top_left, box_bottom_right, color, thickness=-1)
                cv2.putText(
                    output,
                    label,
                    (text_x + pad // 2, text_y - 2),
                    font,
                    font_scale,
                    (255, 255, 255),
                    thickness,
                    lineType=cv2.LINE_AA,
                )
    return output
| def _build_detection_records( | |
| boxes: np.ndarray, | |
| scores: Sequence[float], | |
| labels: Sequence[int], | |
| queries: Sequence[str], | |
| label_names: Optional[Sequence[str]] = None, | |
| ) -> List[Dict[str, Any]]: | |
| detections: List[Dict[str, Any]] = [] | |
| for idx, box in enumerate(boxes): | |
| if label_names is not None and idx < len(label_names): | |
| label = label_names[idx] | |
| else: | |
| label_idx = int(labels[idx]) if idx < len(labels) else -1 | |
| if 0 <= label_idx < len(queries): | |
| label = queries[label_idx] | |
| else: | |
| label = f"label_{label_idx}" | |
| detections.append( | |
| { | |
| "label": label, | |
| "score": float(scores[idx]) if idx < len(scores) else 0.0, | |
| "bbox": [int(coord) for coord in box.tolist()], | |
| } | |
| ) | |
| return detections | |
| from utils.tracker import ByteTracker | |
class SpeedEstimator:
    """Heuristic per-track speed and heading estimate from bounding-box history.

    Speed is derived purely from centroid displacement in pixels using a fixed
    pixels-per-metre scale, so the values are only a rough indication.

    Args:
        fps: Video frame rate used to convert frame counts to seconds.
        pixels_per_meter: Assumed image scale (default 50 px = 1 m, very rough).
        window: Number of history entries spanned by one measurement.

    Raises:
        ValueError: If ``window`` is smaller than 2 or ``pixels_per_meter`` is
            not positive.
    """

    def __init__(self, fps: float = 30.0, pixels_per_meter: float = 50.0, window: int = 5):
        if window < 2:
            raise ValueError("window must be at least 2")
        if pixels_per_meter <= 0:
            raise ValueError("pixels_per_meter must be positive")
        self.fps = fps
        self.pixel_scale_map = {}  # label -> pixels_per_meter (heuristic)
        self.pixels_per_meter = pixels_per_meter
        self.window = window

    def estimate(self, detections: List[Dict[str, Any]]):
        """Annotate detections in place with speed and direction.

        Detections whose ``history`` holds fewer than ``window`` boxes are left
        untouched. Others receive ``speed_kph``, ``direction_clock`` (for
        example ``"3 o'clock"``) and ``angle_deg``.
        """
        for det in detections:
            history = det.get('history') or []
            if len(history) < self.window:
                continue

            curr = history[-1]
            prev = history[-self.window]

            # Centroids
            cx1 = (curr[0] + curr[2]) / 2
            cy1 = (curr[1] + curr[3]) / 2
            cx2 = (prev[0] + prev[2]) / 2
            cy2 = (prev[1] + prev[3]) / 2
            dx = cx1 - cx2
            dy = cy1 - cy2
            dist_px = float(np.sqrt(dx ** 2 + dy ** 2))

            # NOTE(review): `window` samples span window-1 frame intervals if
            # history has one entry per frame; the original divisor is kept.
            # Confirm against the tracker before changing it.
            dt = self.window / self.fps
            speed_mps = (dist_px / self.pixels_per_meter) / dt
            det['speed_kph'] = speed_mps * 3.6

            # Direction in screen space: 0 is right, 90 is down.
            angle = float(np.degrees(np.arctan2(dy, dx)))
            # Map to a clock face where 12 is up (-90 deg), 3 is right, 6 is
            # down and 9 is left. Round BEFORE wrapping so values just past 12
            # become "12 o'clock" rather than "0 o'clock".
            clock_hour = int(round((angle + 90.0) / 30.0)) % 12 or 12
            det['direction_clock'] = f"{clock_hour} o'clock"
            det['angle_deg'] = angle  # 0 is right, 90 is down (screen space)
class IncrementalDepthStats:
    """Thread-safe incremental depth range estimator.

    Collects depth statistics frame-by-frame so the expensive pre-scan
    (opening a second video reader) can be eliminated. Until the first
    successful ``update()`` the range defaults to ``(0.0, 1.0)``; afterwards it
    is the running minimum of per-frame 1st percentiles and the running maximum
    of per-frame 99th percentiles.

    ``warmup_frames`` is stored for callers but does not change the reported
    range: values seen during warm-up are used as they are (they may just be
    less stable).

    Note: ``range`` is a regular method, not a property; call ``stats.range()``.
    """

    def __init__(self, warmup_frames: int = 30):
        self._lock = RLock()
        self._warmup = warmup_frames
        self._count = 0
        self._global_min = float("inf")
        self._global_max = float("-inf")

    def update(self, depth_map: np.ndarray) -> None:
        """Fold one depth map into the running range.

        ``None``, empty maps and maps without any finite value are ignored.
        """
        if depth_map is None or depth_map.size == 0:
            return
        finite = depth_map[np.isfinite(depth_map)]
        if finite.size == 0:
            return
        # Percentiles instead of min/max so single outlier pixels do not
        # stretch the range. Computed outside the lock.
        lo = float(np.percentile(finite, 1))
        hi = float(np.percentile(finite, 99))
        with self._lock:
            self._global_min = min(self._global_min, lo)
            self._global_max = max(self._global_max, hi)
            self._count += 1

    def range(self) -> Tuple[float, float]:
        """Return the current ``(low, high)`` depth range.

        Returns ``(0.0, 1.0)`` while no frame has been recorded, regardless of
        ``warmup_frames``, so the infinite initial bounds are never exposed. A
        degenerate range is widened to ``(low, low + 1.0)``.
        """
        with self._lock:
            if self._count == 0:
                return (0.0, 1.0)
            lo, hi = self._global_min, self._global_max
        if abs(hi - lo) < 1e-6:
            hi = lo + 1.0
        return (lo, hi)
| _MODEL_LOCKS: Dict[str, RLock] = {} | |
| _MODEL_LOCKS_GUARD = RLock() | |
| _DEPTH_SCALE = float(os.getenv("DEPTH_SCALE", "25.0")) | |
| def _get_model_lock(kind: str, name: str) -> RLock: | |
| key = f"{kind}:{name}" | |
| with _MODEL_LOCKS_GUARD: | |
| lock = _MODEL_LOCKS.get(key) | |
| if lock is None: | |
| lock = RLock() | |
| _MODEL_LOCKS[key] = lock | |
| return lock | |
| def _attach_depth_metrics( | |
| frame: np.ndarray, | |
| detections: List[Dict[str, Any]], | |
| depth_estimator_name: Optional[str], | |
| depth_scale: float, # No longer used for distance calculation | |
| estimator_instance: Optional[Any] = None, | |
| ) -> None: | |
| """Attach relative depth values for visualization only. GPT handles distance estimation.""" | |
| if not detections or (not depth_estimator_name and not estimator_instance): | |
| return | |
| from models.depth_estimators.model_loader import load_depth_estimator | |
| if estimator_instance: | |
| estimator = estimator_instance | |
| # Use instance lock if available, or create one | |
| if hasattr(estimator, "lock"): | |
| lock = estimator.lock | |
| else: | |
| # Fallback (shouldn't happen with our new setup but safe) | |
| lock = _get_model_lock("depth", estimator.name) | |
| else: | |
| estimator = load_depth_estimator(depth_estimator_name) | |
| lock = _get_model_lock("depth", estimator.name) | |
| with lock: | |
| depth_result = estimator.predict(frame) | |
| depth_map = depth_result.depth_map | |
| if depth_map is None or depth_map.size == 0: | |
| return | |
| height, width = depth_map.shape[:2] | |
| raw_depths: List[Tuple[Dict[str, Any], float]] = [] | |
| for det in detections: | |
| det["depth_rel"] = None # Relative depth for visualization only | |
| bbox = det.get("bbox") | |
| if not bbox or len(bbox) < 4: | |
| continue | |
| x1, y1, x2, y2 = [int(coord) for coord in bbox[:4]] | |
| x1 = max(0, min(width - 1, x1)) | |
| y1 = max(0, min(height - 1, y1)) | |
| x2 = max(x1 + 1, min(width, x2)) | |
| y2 = max(y1 + 1, min(height, y2)) | |
| patch = depth_map[y1:y2, x1:x2] | |
| if patch.size == 0: | |
| continue | |
| # Center crop (50%) to avoid background | |
| h_p, w_p = patch.shape | |
| cy, cx = h_p // 2, w_p // 2 | |
| dy, dx = h_p // 4, w_p // 4 | |
| center_patch = patch[cy - dy : cy + dy, cx - dx : cx + dx] | |
| # Fallback to full patch if center is empty (unlikely) | |
| if center_patch.size == 0: | |
| center_patch = patch | |
| finite = center_patch[np.isfinite(center_patch)] | |
| if finite.size == 0: | |
| continue | |
| depth_raw = float(np.median(finite)) | |
| if depth_raw > 1e-6: | |
| raw_depths.append((det, depth_raw)) | |
| if not raw_depths: | |
| return | |
| # Compute relative depth (0-1) for visualization only | |
| all_raw = [d[1] for d in raw_depths] | |
| min_raw, max_raw = min(all_raw), max(all_raw) | |
| denom = max(max_raw - min_raw, 1e-6) | |
| for det, depth_raw in raw_depths: | |
| # Inverted: higher raw = closer = lower rel value (0=close, 1=far) | |
| det["depth_rel"] = 1.0 - ((depth_raw - min_raw) / denom) | |
def infer_frame(
    frame: np.ndarray,
    queries: Sequence[str],
    detector_name: Optional[str] = None,
    depth_estimator_name: Optional[str] = None,
    depth_scale: float = 1.0,
    detector_instance: Optional[ObjectDetector] = None,
    depth_estimator_instance: Optional[Any] = None,
) -> Tuple[np.ndarray, List[Dict[str, Any]]]:
    """Run detection (and optional depth) on one frame and draw the boxes.

    Args:
        frame: Image to analyse.
        queries: Text queries; an empty sequence falls back to ``["object"]``.
        detector_name: Detector to load when no instance is given.
        depth_estimator_name: Depth estimator to load for depth metrics.
        depth_scale: Forwarded to the depth helper.
        detector_instance: Already-loaded detector to use instead of loading.
        depth_estimator_instance: Already-loaded depth estimator.

    Returns:
        ``(annotated_frame, detections)``.

    Raises:
        Exception: Any detection failure is logged and re-raised. Depth
            failures are logged and do not abort the call.
    """
    detector = detector_instance if detector_instance else load_detector(detector_name)
    text_queries = list(queries) or ["object"]

    try:
        # Instance-level lock wins; otherwise share one lock per detector name.
        model_lock = (
            detector.lock
            if hasattr(detector, "lock")
            else _get_model_lock("detector", detector.name)
        )
        with model_lock:
            result = detector.predict(frame, text_queries)

        detections = _build_detection_records(
            result.boxes, result.scores, result.labels, text_queries, result.label_names
        )

        wants_depth = depth_estimator_name or depth_estimator_instance
        if wants_depth:
            try:
                _attach_depth_metrics(
                    frame,
                    detections,
                    depth_estimator_name,
                    depth_scale,
                    estimator_instance=depth_estimator_instance,
                )
            except Exception:
                logging.exception("Depth estimation failed for frame")

        # Captions shown on the frame: label plus GPT distance when present,
        # e.g. "car 12m".
        display_labels = []
        for det in detections:
            caption = det["label"]
            if det.get("gpt_distance_m") is not None:
                depth_str = f"{int(det['gpt_distance_m'])}m"
                caption = f"{caption} {depth_str}"
                logging.debug("Object '%s' at %s (bbox: %s)", caption, depth_str, det['bbox'])
            display_labels.append(caption)
    except Exception:
        logging.exception("Inference failed for queries %s", text_queries)
        raise

    annotated = draw_boxes(
        frame,
        result.boxes,
        labels=None,  # Use custom labels
        queries=None,
        label_names=display_labels,
    )
    return annotated, detections
| def _build_display_label(det): | |
| """Build display label with GPT distance if available.""" | |
| label = det["label"] | |
| if det.get("gpt_distance_m") is not None: | |
| label = f"{label} {int(det['gpt_distance_m'])}m" | |
| return label | |
| def _attach_depth_from_result(detections, depth_result, depth_scale): | |
| """Attach relative depth values for visualization only. GPT handles distance estimation.""" | |
| depth_map = depth_result.depth_map | |
| if depth_map is None or depth_map.size == 0: return | |
| height, width = depth_map.shape[:2] | |
| raw_depths = [] | |
| for det in detections: | |
| det["depth_rel"] = None # Relative depth for visualization only | |
| bbox = det.get("bbox") | |
| if not bbox or len(bbox) < 4: continue | |
| x1, y1, x2, y2 = [int(coord) for coord in bbox[:4]] | |
| x1 = max(0, min(width - 1, x1)) | |
| y1 = max(0, min(height - 1, y1)) | |
| x2 = max(x1 + 1, min(width, x2)) | |
| y2 = max(y1 + 1, min(height, y2)) | |
| patch = depth_map[y1:y2, x1:x2] | |
| if patch.size == 0: continue | |
| h_p, w_p = patch.shape | |
| cy, cx = h_p // 2, w_p // 2 | |
| dy, dx = h_p // 4, w_p // 4 | |
| center_patch = patch[cy - dy : cy + dy, cx - dx : cx + dx] | |
| if center_patch.size == 0: center_patch = patch | |
| finite = center_patch[np.isfinite(center_patch)] | |
| if finite.size == 0: continue | |
| depth_raw = float(np.median(finite)) | |
| if depth_raw > 1e-6: | |
| raw_depths.append((det, depth_raw)) | |
| if not raw_depths: return | |
| # Compute relative depth (0-1) for visualization only | |
| all_raw = [d[1] for d in raw_depths] | |
| min_raw, max_raw = min(all_raw), max(all_raw) | |
| denom = max(max_raw - min_raw, 1e-6) | |
| for det, depth_raw in raw_depths: | |
| # Inverted: higher raw = closer = lower rel value (0=close, 1=far) | |
| det["depth_rel"] = 1.0 - ((depth_raw - min_raw) / denom) | |
def infer_segmentation_frame(
    frame: np.ndarray,
    text_queries: Optional[List[str]] = None,
    segmenter_name: Optional[str] = None,
    segmenter_instance: Optional[Any] = None,
) -> Tuple[np.ndarray, Any]:
    """Segment one frame and draw the resulting masks.

    Args:
        frame: Image to segment.
        text_queries: Optional text prompts passed to the segmenter.
        segmenter_name: Segmenter to load when no instance is supplied.
        segmenter_instance: Already-loaded segmenter to use instead of loading.

    Returns:
        ``(annotated_frame, result)`` where ``result`` is whatever the
        segmenter's ``predict`` returned.
    """
    if segmenter_instance:
        segmenter = segmenter_instance
        # Use instance lock if available
        if hasattr(segmenter, "lock"):
            lock = segmenter.lock
        else:
            lock = _get_model_lock("segmenter", segmenter.name)
    else:
        segmenter = load_segmenter(segmenter_name)
        lock = _get_model_lock("segmenter", segmenter.name)

    with lock:
        result = segmenter.predict(frame, text_prompts=text_queries)

    # Prefer per-instance names reported by the segmenter (the same source
    # process_first_frame uses). Labelling mask i with query i is only right
    # when masks happen to line up with the query list.
    instance_names = getattr(result, "label_names", None)
    if instance_names is not None and len(instance_names) > 0:
        labels = list(instance_names)
    else:
        labels = list(text_queries or [])
        if len(labels) == 1:
            # A single prompt applies to every mask.
            masks = result.masks if result.masks is not None else []
            labels = labels * len(masks)

    return draw_masks(frame, result.masks, labels=labels), result
def extract_first_frame(video_path: str) -> Tuple[np.ndarray, float, int, int]:
    """Decode the first frame of a video together with basic metadata.

    Args:
        video_path: Path of the video to open.

    Returns:
        ``(frame, fps, width, height)``; ``fps`` is ``0.0`` when OpenCV does
        not report a frame rate.

    Raises:
        ValueError: If the video cannot be opened or yields no frame.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError("Unable to open video.")
        fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        success, frame = cap.read()
    finally:
        # Release on every path, including when a metadata/read call raises.
        cap.release()
    if not success or frame is None:
        raise ValueError("Video decode produced zero frames.")
    return frame, fps, width, height
def process_first_frame(
    video_path: str,
    queries: List[str],
    mode: str,
    detector_name: Optional[str] = None,
    segmenter_name: Optional[str] = None,
) -> Tuple[np.ndarray, List[Dict[str, Any]]]:
    """Lightweight first-frame processing: detection + rendering only.

    GPT, depth, and LLM relevance are handled later in the async pipeline
    (writer enrichment thread), avoiding 2-8s synchronous startup delay.

    Args:
        video_path: Video whose first frame is processed.
        queries: Text queries for the detector or segmenter.
        mode: ``"segmentation"`` selects the segmenter; any other value runs
            detection.
        detector_name: Detector used in detection mode.
        segmenter_name: Segmenter used in segmentation mode.

    Returns:
        (processed_frame, detections) — all detections tagged UNASSESSED.
    """
    frame, _, _, _ = extract_first_frame(video_path)

    if mode != "segmentation":
        processed, detections = infer_frame(
            frame, queries, detector_name=detector_name
        )
        # Tag all detections as unassessed — GPT runs later in enrichment thread
        for det in detections:
            det["assessment_status"] = AssessmentStatus.UNASSESSED
        return processed, detections

    processed, seg_result = infer_segmentation_frame(
        frame, text_queries=queries, segmenter_name=segmenter_name
    )
    detections = []
    seg_boxes = seg_result.boxes
    if seg_boxes is not None and len(seg_boxes) > 0:
        names = seg_result.label_names or queries or []
        seg_scores = seg_result.scores
        for position, box in enumerate(seg_boxes):
            has_score = seg_scores is not None and position < len(seg_scores)
            detections.append({
                "label": names[position] if position < len(names) else "object",
                "bbox": [int(c) for c in box],
                "score": float(seg_scores[position]) if has_score else 1.0,
                # Provisional id from box order: T01, T02, ...
                "track_id": f"T{position + 1:02d}",
                "assessment_status": AssessmentStatus.UNASSESSED,
            })
    return processed, detections
| def run_inference( | |
| input_video_path: str, | |
| output_video_path: str, | |
| queries: List[str], | |
| max_frames: Optional[int] = None, | |
| detector_name: Optional[str] = None, | |
| job_id: Optional[str] = None, | |
| depth_estimator_name: Optional[str] = None, | |
| depth_scale: float = 1.0, | |
| enable_gpt: bool = True, | |
| stream_queue: Optional[Queue] = None, | |
| mission_spec=None, # Optional[MissionSpecification] | |
| first_frame_gpt_results: Optional[Dict[str, Any]] = None, | |
| first_frame_detections: Optional[List[Dict[str, Any]]] = None, | |
| ) -> Tuple[str, List[List[Dict[str, Any]]]]: | |
| # 1. Setup Video Reader | |
| try: | |
| reader = AsyncVideoReader(input_video_path) | |
| except ValueError: | |
| logging.exception("Failed to open video at %s", input_video_path) | |
| raise | |
| fps = reader.fps | |
| width = reader.width | |
| height = reader.height | |
| total_frames = reader.total_frames | |
| if max_frames is not None: | |
| total_frames = min(total_frames, max_frames) | |
| # 2. Defaults and Config | |
| if not queries: | |
| queries = ["person", "car", "truck", "motorcycle", "bicycle", "bus", "train", "airplane"] | |
| logging.info("No queries provided, using defaults: %s", queries) | |
| logging.info("Detection queries: %s", queries) | |
| active_detector = detector_name or "yolo11" | |
| # Parallel Model Loading | |
| num_gpus = torch.cuda.device_count() | |
| detectors = [] | |
| depth_estimators = [] | |
| if num_gpus > 0: | |
| logging.info("Detected %d GPUs. Loading models in parallel...", num_gpus) | |
| def load_models_on_gpu(gpu_id: int): | |
| device_str = f"cuda:{gpu_id}" | |
| try: | |
| det = load_detector_on_device(active_detector, device_str) | |
| det.lock = RLock() | |
| depth = None | |
| if depth_estimator_name: | |
| depth = load_depth_estimator_on_device(depth_estimator_name, device_str) | |
| depth.lock = RLock() | |
| return (gpu_id, det, depth) | |
| except Exception as e: | |
| logging.error(f"Failed to load models on GPU {gpu_id}: {e}") | |
| raise | |
| with ThreadPoolExecutor(max_workers=num_gpus) as loader_pool: | |
| futures = [loader_pool.submit(load_models_on_gpu, i) for i in range(num_gpus)] | |
| results = [f.result() for f in futures] | |
| # Sort by GPU ID to ensure consistent indexing | |
| results.sort(key=lambda x: x[0]) | |
| for _, det, depth in results: | |
| detectors.append(det) | |
| depth_estimators.append(depth) | |
| else: | |
| logging.info("No GPUs detected. Loading CPU models...") | |
| det = load_detector(active_detector) | |
| det.lock = RLock() | |
| detectors.append(det) | |
| if depth_estimator_name: | |
| depth = load_depth_estimator(depth_estimator_name) | |
| depth.lock = RLock() | |
| depth_estimators.append(depth) | |
| else: | |
| depth_estimators.append(None) | |
| # 4. Incremental Depth Stats (replaces expensive pre-scan) | |
| depth_stats = IncrementalDepthStats(warmup_frames=30) if depth_estimator_name else None | |
| # queue_in: (frame_idx, frame_data) | |
| # queue_out: (frame_idx, processed_frame, detections) | |
| queue_in = Queue(maxsize=16) | |
| # Tuning for A10: buffer at least 32 frames per GPU (batch size) | |
| # GPT Latency Buffer: GPT takes ~3s. At 30fps, that's 90 frames. We need to absorb this burst. | |
| queue_out_max = max(128, (len(detectors) if detectors else 1) * 32) | |
| queue_out = Queue(maxsize=queue_out_max) | |
| # 6. Worker Function (Unified) | |
| # Robustness: Define flag early so workers can see it | |
| writer_finished = False | |
| def worker_task(gpu_idx: int): | |
| logging.info(f"Worker {gpu_idx} started. PID: {os.getpid()}") | |
| detector_instance = detectors[gpu_idx] | |
| depth_instance = depth_estimators[gpu_idx] if gpu_idx < len(depth_estimators) else None # Handle mismatched lists safely | |
| batch_size = detector_instance.max_batch_size if detector_instance.supports_batch else 1 | |
| batch_accum = [] # List[Tuple[idx, frame]] | |
| def flush_batch(): | |
| if not batch_accum: return | |
| logging.info(f"Worker {gpu_idx} flushing batch of {len(batch_accum)} frames") | |
| indices = [item[0] for item in batch_accum] | |
| frames = [item[1] for item in batch_accum] | |
| # --- UNIFIED INFERENCE --- | |
| # Separate frame 0 if we have cached detections (avoid re-detecting) | |
| cached_frame0 = None | |
| detect_indices = indices | |
| detect_frames = frames | |
| if first_frame_detections is not None and 0 in indices: | |
| f0_pos = indices.index(0) | |
| cached_frame0 = (indices[f0_pos], frames[f0_pos]) | |
| detect_indices = indices[:f0_pos] + indices[f0_pos+1:] | |
| detect_frames = frames[:f0_pos] + frames[f0_pos+1:] | |
| logging.info("Worker %d: reusing cached detections for frame 0", gpu_idx) | |
| # Run detection batch (excluding frame 0 if cached) | |
| det_results_map = {} | |
| if detect_frames: | |
| try: | |
| if detector_instance.supports_batch: | |
| with detector_instance.lock: | |
| raw_results = detector_instance.predict_batch(detect_frames, queries) | |
| else: | |
| with detector_instance.lock: | |
| raw_results = [detector_instance.predict(f, queries) for f in detect_frames] | |
| for di, dr in zip(detect_indices, raw_results): | |
| det_results_map[di] = dr | |
| except BaseException as e: | |
| logging.exception("Batch detection crashed with critical error") | |
| for di in detect_indices: | |
| det_results_map[di] = None | |
| # Run depth batch (if enabled) — always for all frames | |
| depth_results = [None] * len(frames) | |
| if depth_instance and depth_estimator_name: | |
| try: | |
| with depth_instance.lock: | |
| if depth_instance.supports_batch: | |
| depth_results = depth_instance.predict_batch(frames) | |
| else: | |
| depth_results = [depth_instance.predict(f) for f in frames] | |
| except BaseException as e: | |
| logging.exception("Batch depth crashed with critical error") | |
| # Update incremental depth stats | |
| if depth_stats is not None: | |
| for dep_res in depth_results: | |
| if dep_res and dep_res.depth_map is not None: | |
| depth_stats.update(dep_res.depth_map) | |
| # --- POST PROCESSING --- | |
| batch_det_summary = [] | |
| for i, (idx, frame, dep_res) in enumerate(zip(indices, frames, depth_results)): | |
| # 1. Detections — use cached for frame 0 if available | |
| detections = [] | |
| if cached_frame0 is not None and idx == 0: | |
| detections = [d.copy() for d in first_frame_detections] | |
| else: | |
| d_res = det_results_map.get(idx) | |
| if d_res: | |
| detections = _build_detection_records( | |
| d_res.boxes, d_res.scores, d_res.labels, queries, d_res.label_names | |
| ) | |
| batch_det_summary.append((idx, len(detections))) | |
| # 2. Frame Rendering | |
| processed = frame.copy() | |
| # A. Render Depth Heatmap (if enabled) | |
| if dep_res and dep_res.depth_map is not None: | |
| ds_min, ds_max = depth_stats.range if depth_stats else (0.0, 1.0) | |
| processed = colorize_depth_map(dep_res.depth_map, ds_min, ds_max) | |
| try: | |
| _attach_depth_from_result(detections, dep_res, depth_scale) | |
| except: pass | |
| # 3. Output | |
| while True: | |
| try: | |
| queue_out.put((idx, processed, detections), timeout=1.0) | |
| break | |
| except Full: | |
| # Robustness: Check if writer is dead | |
| if writer_finished: | |
| raise RuntimeError("Writer thread died unexpectedly") | |
| if job_id: _check_cancellation(job_id) | |
| total_dets = sum(c for _, c in batch_det_summary) | |
| if total_dets == 0 or indices[0] % 90 == 0: | |
| logging.info("Worker %d batch [frames %s]: %d total detections %s", | |
| gpu_idx, | |
| f"{indices[0]}-{indices[-1]}", | |
| total_dets, | |
| [(idx, cnt) for idx, cnt in batch_det_summary if cnt > 0]) | |
| batch_accum.clear() | |
| logging.info(f"Worker {gpu_idx} finished flushing batch") | |
| while True: | |
| try: | |
| item = queue_in.get(timeout=2.0) | |
| except Empty: | |
| # Periodic check for cancellation if main thread is slow | |
| if job_id: _check_cancellation(job_id) | |
| continue | |
| try: | |
| if item is None: | |
| logging.info(f"Worker {gpu_idx} received sentinel. Flushing and exiting.") | |
| flush_batch() | |
| break | |
| frame_idx, frame_data = item | |
| # logging.info(f"Worker {gpu_idx} got frame {frame_idx}") # Verbose | |
| if frame_idx % 30 == 0: | |
| logging.info("Processing frame %d on device %s", frame_idx, "cpu" if num_gpus==0 else f"cuda:{gpu_idx}") | |
| batch_accum.append((frame_idx, frame_data)) | |
| if len(batch_accum) >= batch_size: | |
| flush_batch() | |
| except BaseException as e: | |
| logging.exception(f"Worker {gpu_idx} CRASHED processing frame. Recovering...") | |
| # Emit empty/failed frames for the batch to keep sequence alive | |
| for idx, frm in batch_accum: | |
| try: | |
| # Fallback: Return original frame with empty detections | |
| queue_out.put((idx, frm, []), timeout=5.0) | |
| logging.info(f"Emitted fallback frame {idx}") | |
| except: | |
| pass | |
| batch_accum.clear() | |
| finally: | |
| queue_in.task_done() | |
| logging.info(f"Worker {gpu_idx} thread exiting normally.") | |
| # 6. Start Workers | |
| workers = [] | |
| num_workers = len(detectors) | |
| # If using CPU, maybe use more threads? No, CPU models usually multithread internally. | |
| # If using GPU, 1 thread per GPU is efficient. | |
| for i in range(num_workers): | |
| t = Thread(target=worker_task, args=(i,), daemon=True) | |
| t.start() | |
| workers.append(t) | |
| # 7. Start Writer / Output Collection (Main Thread or separate) | |
| # We will run writer logic in the main thread after feeding is done? | |
| # No, we must write continuously. | |
| all_detections_map = {} | |
| # writer_finished initialized earlier | |
| # writer_finished = False | |
| # --- GPT Enrichment Thread (non-blocking) --- | |
| # Runs LLM relevance + GPT threat assessment off the writer's critical path. | |
| gpt_enrichment_queue = Queue(maxsize=4) | |
| _relevance_refined = Event() | |
def enrichment_thread_fn(tracker_ref):
    """Run GPT/LLM enrichment off the writer's critical path.

    Consumes ``(frame_idx, frame_data, gpt_dets, mission_spec)`` tuples from
    ``gpt_enrichment_queue`` until a ``None`` sentinel is received. For each
    tuple ``run_enrichment`` is called; when it returns a truthy result the
    detection dicts are handed to ``tracker_ref.inject_metadata``.

    A failure on one item is logged with its traceback and does not stop the
    thread, so a later item (or the shutdown sentinel) is still consumed.

    Args:
        tracker_ref: Tracker object exposing ``inject_metadata(dets)``.
    """
    # iter(callable, sentinel) ends the loop as soon as the writer enqueues None.
    for item in iter(gpt_enrichment_queue.get, None):
        frame_idx, frame_data, gpt_dets, ms = item
        try:
            gpt_res = run_enrichment(
                frame_idx, frame_data, gpt_dets, ms,
                first_frame_gpt_results=first_frame_gpt_results,
                job_id=job_id,
                relevance_refined_event=_relevance_refined,
            )
            if gpt_res:
                # NOTE(review): presumably run_enrichment annotates gpt_dets in
                # place, which is why the dets (not gpt_res) are injected — confirm.
                tracker_ref.inject_metadata(gpt_dets)
                logging.info("Enrichment: GPT results injected into tracker for frame %d", frame_idx)
        except Exception as e:
            # logging.exception keeps the traceback; the message text is unchanged.
            logging.exception("Enrichment thread failed for frame %d: %s", frame_idx, e)
| def writer_loop(): | |
| nonlocal writer_finished | |
| next_idx = 0 | |
| buffer = {} | |
| # Initialize Tracker & Speed Estimator | |
| tracker = ByteTracker(frame_rate=fps) | |
| speed_est = SpeedEstimator(fps=fps) | |
| gpt_submitted = False # GPT enrichment submitted once for frame 0 | |
| # Start enrichment thread | |
| enrich_thread = Thread(target=enrichment_thread_fn, args=(tracker,), daemon=True) | |
| enrich_thread.start() | |
| try: | |
| with StreamingVideoWriter(output_video_path, fps, width, height) as writer: | |
| while next_idx < total_frames: | |
| # Fetch from queue | |
| try: | |
| while next_idx not in buffer: | |
| # Backpressure: bound the reorder buffer to prevent memory blowup | |
| if len(buffer) > 128: | |
| logging.warning("Writer reorder buffer too large (%d items), applying backpressure (waiting for frame %d)...", len(buffer), next_idx) | |
| time.sleep(0.05) | |
| item = queue_out.get(timeout=1.0) # wait | |
| idx, p_frame, dets = item | |
| buffer[idx] = (p_frame, dets) | |
| # Write next_idx | |
| p_frame, dets = buffer.pop(next_idx) | |
| # --- SEQUENTIAL TRACKING --- | |
| # Run tracker FIRST so detections get real track_id from ByteTracker | |
| pre_track_count = len(dets) | |
| dets = tracker.update(dets) | |
| if (next_idx % 30 == 0) or (pre_track_count > 0 and len(dets) == 0): | |
| logging.info("Writer frame %d: %d detections in -> %d tracked out", | |
| next_idx, pre_track_count, len(dets)) | |
| speed_est.estimate(dets) | |
| # --- RELEVANCE GATE (deterministic, fast — stays in writer) --- | |
| if mission_spec: | |
| if (mission_spec.parse_mode == "LLM_EXTRACTED" | |
| and not _relevance_refined.is_set()): | |
| # LLM post-filter hasn't run yet — pass all through | |
| for d in dets: | |
| d["mission_relevant"] = True | |
| d["relevance_reason"] = "pending_llm_postfilter" | |
| gpt_dets = dets | |
| else: | |
| # Normal deterministic gate (with refined or FAST_PATH classes) | |
| for d in dets: | |
| decision = evaluate_relevance(d, mission_spec.relevance_criteria) | |
| d["mission_relevant"] = decision.relevant | |
| d["relevance_reason"] = decision.reason | |
| if not decision.relevant: | |
| logging.info( | |
| json_module.dumps({ | |
| "event": "relevance_decision", | |
| "track_id": d.get("track_id"), | |
| "label": d.get("label"), | |
| "relevant": False, | |
| "reason": decision.reason, | |
| "required_classes": mission_spec.relevance_criteria.required_classes, | |
| "frame": next_idx, | |
| }) | |
| ) | |
| gpt_dets = [d for d in dets if d.get("mission_relevant", True)] | |
| else: | |
| for d in dets: | |
| d["mission_relevant"] = None | |
| gpt_dets = dets | |
| # --- GPT ENRICHMENT (non-blocking, offloaded to enrichment thread) --- | |
| if enable_gpt and gpt_dets and not gpt_submitted: | |
| # Tag as pending — enrichment thread will update to ASSESSED later | |
| for d in gpt_dets: | |
| d["assessment_status"] = AssessmentStatus.PENDING_GPT | |
| try: | |
| gpt_enrichment_queue.put( | |
| (next_idx, p_frame.copy(), gpt_dets, mission_spec), | |
| timeout=1.0, | |
| ) | |
| gpt_submitted = True | |
| logging.info("Writer: offloaded GPT enrichment for frame %d", next_idx) | |
| except Full: | |
| logging.warning("GPT enrichment queue full, skipping frame 0 GPT") | |
| # Tag unassessed detections (INV-6) | |
| for d in dets: | |
| if "assessment_status" not in d: | |
| d["assessment_status"] = AssessmentStatus.UNASSESSED | |
| # --- RENDER BOXES & OVERLAYS --- | |
| if dets: | |
| display_boxes = np.array([d['bbox'] for d in dets]) | |
| display_labels = [] | |
| for d in dets: | |
| if d.get("mission_relevant") is False: | |
| display_labels.append("") | |
| continue | |
| lbl = d.get('label', 'obj') | |
| display_labels.append(lbl) | |
| p_frame = draw_boxes(p_frame, display_boxes, label_names=display_labels) | |
| writer.write(p_frame) | |
| if stream_queue: | |
| try: | |
| from jobs.streaming import publish_frame as _publish | |
| if job_id: | |
| _publish(job_id, p_frame) | |
| else: | |
| stream_queue.put(p_frame, timeout=0.01) | |
| except: | |
| pass | |
| all_detections_map[next_idx] = dets | |
| # Store tracks for frontend access | |
| if job_id: | |
| set_track_data(job_id, next_idx, dets) | |
| next_idx += 1 | |
| if next_idx % 30 == 0: | |
| logging.debug("Wrote frame %d/%d", next_idx, total_frames) | |
| except Empty: | |
| # Normal when waiting for out-of-order worker output. | |
| if job_id: | |
| _check_cancellation(job_id) | |
| if not any(w.is_alive() for w in workers) and queue_out.empty(): | |
| logging.error( | |
| "Workers stopped unexpectedly while waiting for frame %d.", | |
| next_idx, | |
| ) | |
| break | |
| continue | |
| except Exception: | |
| logging.exception("Writer loop processing error at index %d", next_idx) | |
| if job_id: | |
| _check_cancellation(job_id) | |
| if not any(w.is_alive() for w in workers) and queue_out.empty(): | |
| logging.error( | |
| "Workers stopped unexpectedly while writer handled frame %d.", | |
| next_idx, | |
| ) | |
| break | |
| continue | |
| except Exception as e: | |
| logging.exception("Writer loop failed") | |
| finally: | |
| logging.info("Writer loop finished. Wrote %d frames (target %d)", next_idx, total_frames) | |
| # Shut down enrichment thread | |
| try: | |
| gpt_enrichment_queue.put(None, timeout=5.0) | |
| enrich_thread.join(timeout=30) | |
| except Exception: | |
| logging.warning("Enrichment thread shutdown timed out") | |
| writer_finished = True | |
| writer_thread = Thread(target=writer_loop, daemon=True) | |
| writer_thread.start() | |
| # 8. Feed Frames (Main Thread) | |
| # 8. Feed Frames (Main Thread) | |
| try: | |
| frames_fed = 0 | |
| reader_iter = iter(reader) | |
| while True: | |
| _check_cancellation(job_id) | |
| if max_frames is not None and frames_fed >= max_frames: | |
| break | |
| try: | |
| frame = next(reader_iter) | |
| except StopIteration: | |
| break | |
| queue_in.put((frames_fed, frame)) # Blocks if full | |
| frames_fed += 1 | |
| logging.info("Feeder finished. Fed %d frames (expected %d)", frames_fed, total_frames) | |
| # Update total_frames to actual count so writer knows when to stop | |
| if frames_fed != total_frames: | |
| logging.info("Updating total_frames from %d to %d (actual fed)", total_frames, frames_fed) | |
| total_frames = frames_fed | |
| # Signal workers to stop | |
| for _ in range(num_workers): | |
| try: | |
| queue_in.put(None, timeout=5.0) # Using timeout to prevent infinite block | |
| except Full: | |
| logging.warning("Failed to send stop signal to a worker (queue full)") | |
| # Wait for queue to process | |
| queue_in.join() | |
| except Exception as e: | |
| logging.exception("Feeding frames failed") | |
| # Ensure we try to signal workers even on error | |
| for _ in range(num_workers): | |
| try: | |
| queue_in.put_nowait(None) | |
| except Full: pass | |
| raise | |
| finally: | |
| reader.close() | |
| # Wait for writer | |
| writer_thread.join() | |
| # Sort detections | |
| sorted_detections = [] | |
| # If we crashed early, we return what we have | |
| max_key = max(all_detections_map.keys()) if all_detections_map else -1 | |
| for i in range(max_key + 1): | |
| sorted_detections.append(all_detections_map.get(i, [])) | |
| logging.info("Inference complete. Output: %s", output_video_path) | |
| return output_video_path, sorted_detections | |
def _gsam2_render_frame(
    frame_dir: str,
    frame_names: List[str],
    frame_idx: int,
    frame_objects: Dict,
    height: int,
    width: int,
    masks_only: bool = False,
    frame_store=None,
) -> np.ndarray:
    """Compose one GSAM2 output frame from its tracked objects (CPU only).

    The base image is a copy of ``frame_store.get_bgr(frame_idx)`` when a
    *frame_store* is supplied; otherwise it is read with ``cv2.imread`` from
    ``frame_dir/frame_names[frame_idx]``. Object masks are overlaid first and
    boxes with class labels are drawn afterwards. With *masks_only* set the
    box pass is skipped so the writer thread can draw boxes later with
    enriched (GPT) labels.

    Returns:
        The rendered frame. An all-black ``(height, width, 3)`` uint8 image is
        returned when no frame could be loaded, and the unmodified frame when
        *frame_objects* is empty.
    """
    if frame_store is None:
        frame = cv2.imread(os.path.join(frame_dir, frame_names[frame_idx]))
    else:
        # Copy first: the drawing helpers below mutate the image.
        frame = frame_store.get_bgr(frame_idx).copy()

    if frame is None:
        return np.zeros((height, width, 3), dtype=np.uint8)
    if not frame_objects:
        return frame

    expected_hw = (height, width)
    overlay_masks: List[np.ndarray] = []
    rects: List[List[int]] = []
    rect_labels: List[str] = []

    for info in frame_objects.values():
        raw_mask = info.mask
        class_name = info.class_name

        if raw_mask is not None:
            if isinstance(raw_mask, torch.Tensor):
                raw_mask = raw_mask.cpu().numpy()
            bool_mask = np.asarray(raw_mask).astype(bool)
            if bool_mask.shape[:2] != expected_hw:
                # Nearest-neighbour keeps the resized mask strictly binary.
                resized = cv2.resize(
                    bool_mask.astype(np.uint8),
                    (width, height),
                    interpolation=cv2.INTER_NEAREST,
                )
                bool_mask = resized.astype(bool)
            overlay_masks.append(bool_mask)

        # An all-zero rectangle is treated as "no box for this object".
        corners = [info.x1, info.y1, info.x2, info.y2]
        if any(c != 0 for c in corners):
            rects.append(corners)
            rect_labels.append(class_name)

    if overlay_masks:
        # Always pass labels=None here; label text is drawn by draw_boxes
        # below to avoid duplicate label rendering.
        frame = draw_masks(frame, np.stack(overlay_masks), labels=None)

    if rects and not masks_only:
        frame = draw_boxes(frame, np.array(rects), label_names=rect_labels)

    return frame
| def run_grounded_sam2_tracking( | |
| input_video_path: str, | |
| output_video_path: str, | |
| queries: List[str], | |
| max_frames: Optional[int] = None, | |
| segmenter_name: Optional[str] = None, | |
| job_id: Optional[str] = None, | |
| stream_queue: Optional[Queue] = None, | |
| step: int = 20, | |
| enable_gpt: bool = False, | |
| mission_spec=None, # Optional[MissionSpecification] | |
| first_frame_gpt_results: Optional[Dict[str, Any]] = None, | |
| _perf_metrics: Optional[Dict[str, float]] = None, | |
| _perf_lock=None, | |
| num_maskmem: Optional[int] = None, | |
| detector_name: Optional[str] = None, | |
| _ttfs_t0: Optional[float] = None, | |
| ) -> str: | |
| """Run Grounded-SAM-2 video tracking pipeline. | |
| Uses multi-GPU data parallelism when multiple GPUs are available. | |
| Falls back to single-GPU ``process_video`` otherwise. | |
| """ | |
| import copy | |
| import shutil | |
| from contextlib import nullcontext | |
| from PIL import Image as PILImage | |
| from utils.video import extract_frames_to_jpeg_dir | |
| from utils.frame_store import SharedFrameStore, MemoryBudgetExceeded | |
| from models.segmenters.grounded_sam2 import MaskDictionary, ObjectInfo, LazyFrameObjects | |
| active_segmenter = segmenter_name or "GSAM2-L" | |
def _ttfs(msg):
    """Log a timing checkpoint relative to ``_ttfs_t0``; no-op when it is None."""
    if _ttfs_t0 is None:
        return
    elapsed = time.perf_counter() - _ttfs_t0
    logging.info("[TTFS:%s] +%.1fs %s", job_id, elapsed, msg)
| _ttfs("enter run_grounded_sam2_tracking") | |
| logging.info( | |
| "Grounded-SAM-2 tracking: segmenter=%s, queries=%s, step=%d", | |
| active_segmenter, queries, step, | |
| ) | |
| # 1. Load frames — prefer in-memory SharedFrameStore, fall back to JPEG dir | |
| _use_frame_store = True | |
| frame_store = None | |
| _t_ext = time.perf_counter() | |
| try: | |
| frame_store = SharedFrameStore(input_video_path, max_frames=max_frames) | |
| fps, width, height = frame_store.fps, frame_store.width, frame_store.height | |
| total_frames = len(frame_store) | |
| frame_names = [f"{i:06d}.jpg" for i in range(total_frames)] | |
| # Write single dummy JPEG for init_state bootstrapping | |
| dummy_frame_dir = tempfile.mkdtemp(prefix="gsam2_dummy_") | |
| cv2.imwrite(os.path.join(dummy_frame_dir, "000000.jpg"), frame_store.get_bgr(0)) | |
| frame_dir = dummy_frame_dir | |
| logging.info("SharedFrameStore: %d frames in memory (dummy dir: %s)", total_frames, frame_dir) | |
| except MemoryBudgetExceeded: | |
| logging.info("Memory budget exceeded, falling back to JPEG extraction") | |
| _use_frame_store = False | |
| frame_store = None | |
| frame_dir = tempfile.mkdtemp(prefix="gsam2_frames_") | |
| frame_names, fps, width, height = extract_frames_to_jpeg_dir( | |
| input_video_path, frame_dir, max_frames=max_frames, | |
| ) | |
| total_frames = len(frame_names) | |
| try: | |
| if _perf_metrics is not None: | |
| _t_e2e = time.perf_counter() | |
| if torch.cuda.is_available(): | |
| torch.cuda.reset_peak_memory_stats() | |
| _perf_metrics["frame_extraction_ms"] = (time.perf_counter() - _t_ext) * 1000.0 | |
| _ttfs(f"frame_extraction done ({total_frames} frames, in_memory={_use_frame_store})") | |
| logging.info("Loaded %d frames (in_memory=%s)", total_frames, _use_frame_store) | |
| num_gpus = torch.cuda.device_count() | |
| # ================================================================== | |
| # Phase 5: Parallel rendering + sequential video writing | |
| # (Hoisted above tracking so render pipeline starts before tracking | |
| # completes — segments are fed incrementally via callback / queue.) | |
| # ================================================================== | |
| _check_cancellation(job_id) | |
| render_in: Queue = Queue(maxsize=32) | |
| render_out: Queue = Queue(maxsize=128) | |
| render_done = False | |
| num_render_workers = min(4, os.cpu_count() or 1) | |
def _render_worker():
    """Render frames pulled from ``render_in`` and push them to ``render_out``.

    Each item is ``(frame_index, frame_objects)``; a ``None`` item ends the
    worker. ``LazyFrameObjects`` are materialized here so the GPU->CPU copy
    happens in the render thread. When GPT enrichment is enabled the frame
    objects travel with the rendered frame so the writer can draw boxes
    itself; otherwise an empty dict is sent.

    If rendering fails, a blank frame is delivered for that index. The
    fallback uses the same retry loop as the normal path: the writer consumes
    frames strictly in order, so silently dropping an index after a timeout
    would leave it waiting for a frame that never arrives.
    """

    def _deliver(payload):
        """Queue *payload*, retrying while full; return False once the writer is done."""
        while True:
            try:
                render_out.put(payload, timeout=1.0)
                return True
            except Full:
                if render_done:
                    return False

    while True:
        item = render_in.get()
        if item is None:
            break
        fidx, fobjs = item
        try:
            # Deferred GPU->CPU: materialize lazy objects in render thread
            if isinstance(fobjs, LazyFrameObjects):
                fobjs = fobjs.materialize()

            if _perf_metrics is not None:
                _t_r = time.perf_counter()

            frm = _gsam2_render_frame(
                frame_dir, frame_names, fidx, fobjs,
                height, width,
                masks_only=enable_gpt,
                frame_store=frame_store,
            )

            if _perf_metrics is not None:
                _r_ms = (time.perf_counter() - _t_r) * 1000.0
                if _perf_lock:
                    with _perf_lock:
                        _perf_metrics["render_total_ms"] += _r_ms
                else:
                    _perf_metrics["render_total_ms"] += _r_ms

            payload = (fidx, frm, fobjs) if enable_gpt else (fidx, frm, {})
        except Exception:
            logging.exception("Render failed for frame %d", fidx)
            blank = np.zeros((height, width, 3), dtype=np.uint8)
            payload = (fidx, blank, {})

        if not _deliver(payload):
            # Writer has finished; nobody will read further frames.
            return
| r_workers = [ | |
| Thread(target=_render_worker, daemon=True) | |
| for _ in range(num_render_workers) | |
| ] | |
| for t in r_workers: | |
| t.start() | |
| # --- ObjectInfo → detection dict adapter --- | |
def _objectinfo_to_dets(frame_objects_dict):
    """Convert an ``{object_id: info}`` mapping into a list of detection dicts.

    Each *info* must expose ``class_name``, ``x1``, ``y1``, ``x2`` and ``y2``.
    The score is fixed at 1.0 and the track id is the object id formatted as
    ``T`` followed by at least two digits (e.g. ``T07``).
    """
    return [
        {
            "label": info.class_name,
            "bbox": [info.x1, info.y1, info.x2, info.y2],
            "score": 1.0,
            "track_id": f"T{obj_id:02d}",
            "instance_id": obj_id,
        }
        for obj_id, info in frame_objects_dict.items()
    ]
| # --- GPT enrichment thread (when enabled) --- | |
| gpt_enrichment_queue: Queue = Queue(maxsize=4) | |
| gpt_data_by_track: Dict[str, Dict] = {} | |
| gpt_data_lock = RLock() | |
| _relevance_refined = Event() | |
def _gsam2_enrichment_thread_fn():
    """Run GPT enrichment for GSAM2 tracking and publish the results.

    Consumes ``(frame_idx, frame_data, gpt_dets, mission_spec)`` tuples from
    ``gpt_enrichment_queue`` until a ``None`` sentinel is received. For each
    tuple ``run_enrichment`` is called; a truthy result is expected to be a
    mapping keyed by track id. Per-track results are stored in
    ``gpt_data_by_track`` (under ``gpt_data_lock``) for the writer to merge
    into later frames, and, when a ``job_id`` is set, merged into the job's
    stored ``first_frame_detections`` so frontend polling picks them up.

    Failures are logged with traceback and never stop the thread.
    """
    # iter(callable, sentinel) ends the loop when the writer enqueues None.
    for item in iter(gpt_enrichment_queue.get, None):
        frame_idx, frame_data, gpt_dets, ms = item
        try:
            gpt_res = run_enrichment(
                frame_idx, frame_data, gpt_dets, ms,
                first_frame_gpt_results=first_frame_gpt_results,
                job_id=job_id,
                relevance_refined_event=_relevance_refined,
            )
            if not gpt_res:
                continue

            # GSAM2-specific: store results in per-track dict
            for d in gpt_dets:
                tid = d.get("track_id")
                if tid and tid in gpt_res:
                    merged = dict(gpt_res[tid])
                    merged["gpt_raw"] = gpt_res[tid]
                    merged["assessment_frame_index"] = frame_idx
                    merged.setdefault("assessment_status", AssessmentStatus.ASSESSED)
                    with gpt_data_lock:
                        gpt_data_by_track[tid] = merged
            with gpt_data_lock:
                stored_tracks = len(gpt_data_by_track)
            logging.info("GSAM2 enrichment: GPT results stored for %d tracks", stored_tracks)

            # Persist GPT-enriched detections to job storage so
            # frontend polling (/detect/status) picks them up.
            if job_id:
                try:
                    from jobs.storage import get_job_storage as _gjs
                    _st = _gjs().get(job_id)
                    if _st and _st.first_frame_detections:
                        # First detection per track id, matching a first-match scan.
                        first_by_tid = {}
                        for d in gpt_dets:
                            first_by_tid.setdefault(d.get("track_id"), d)

                        for det in _st.first_frame_detections:
                            tid = det.get("track_id")
                            with gpt_data_lock:
                                payload = gpt_data_by_track.get(tid)
                            if payload:
                                det.update(payload)
                            # Also sync relevance from gpt_dets
                            # NOTE(review): applied to every stored detection, not only
                            # those with a GPT payload — confirm against the original nesting.
                            src = first_by_tid.get(tid)
                            if src:
                                if "mission_relevant" in src:
                                    det["mission_relevant"] = src["mission_relevant"]
                                if "relevance_reason" in src:
                                    det["relevance_reason"] = src["relevance_reason"]

                        _gjs().update(
                            job_id,
                            first_frame_detections=_st.first_frame_detections,
                            first_frame_gpt_results=gpt_res,
                        )
                        logging.info(
                            "GSAM2 enrichment: updated first_frame_detections in job storage for %s",
                            job_id,
                        )
                except Exception:
                    logging.exception(
                        "GSAM2 enrichment: failed to update job storage for %s", job_id
                    )
        except Exception as e:
            # logging.exception keeps the traceback; the message text is unchanged.
            logging.exception("GSAM2 enrichment thread failed for frame %d: %s", frame_idx, e)
| # Shared streaming state (publisher ↔ writer) | |
| _stream_deque: collections.deque = collections.deque(maxlen=200) | |
| _stream_lock = RLock() | |
| _stream_writer_done = Event() | |
| def _writer_loop(): | |
| nonlocal render_done | |
| _first_deposit = False | |
| next_idx = 0 | |
| buf: Dict[int, Tuple] = {} | |
| # Per-track bbox history (replaces ByteTracker for GSAM2) | |
| track_history: Dict[int, List] = {} | |
| speed_est = SpeedEstimator(fps=fps) if enable_gpt else None | |
| gpt_submitted = False | |
| # Start enrichment thread when GPT enabled | |
| enrich_thread = None | |
| if enable_gpt: | |
| enrich_thread = Thread(target=_gsam2_enrichment_thread_fn, daemon=True) | |
| enrich_thread.start() | |
| try: | |
| with StreamingVideoWriter( | |
| output_video_path, fps, width, height | |
| ) as writer: | |
| # --- Write + stream (publisher handles pacing) --- | |
| while next_idx < total_frames: | |
| try: | |
| while next_idx not in buf: | |
| if len(buf) > 128: | |
| logging.warning( | |
| "Render reorder buffer large (%d), " | |
| "waiting for frame %d", | |
| len(buf), next_idx, | |
| ) | |
| time.sleep(0.05) | |
| idx, frm, fobjs = render_out.get(timeout=1.0) | |
| buf[idx] = (frm, fobjs) | |
| frm, fobjs = buf.pop(next_idx) | |
| # --- GPT enrichment path --- | |
| if enable_gpt and fobjs: | |
| dets = _objectinfo_to_dets(fobjs) | |
| # Maintain per-track bbox history (30-frame window) | |
| for det in dets: | |
| iid = det["instance_id"] | |
| track_history.setdefault(iid, []).append(det["bbox"]) | |
| if len(track_history[iid]) > 30: | |
| track_history[iid].pop(0) | |
| # Store an immutable per-frame snapshot. | |
| det["history"] = list(track_history[iid]) | |
| # Speed estimation | |
| if speed_est: | |
| speed_est.estimate(dets) | |
| # Relevance gate | |
| if mission_spec: | |
| if (mission_spec.parse_mode == "LLM_EXTRACTED" | |
| and not _relevance_refined.is_set()): | |
| for d in dets: | |
| d["mission_relevant"] = True | |
| d["relevance_reason"] = "pending_llm_postfilter" | |
| gpt_dets = dets | |
| else: | |
| for d in dets: | |
| decision = evaluate_relevance(d, mission_spec.relevance_criteria) | |
| d["mission_relevant"] = decision.relevant | |
| d["relevance_reason"] = decision.reason | |
| gpt_dets = [d for d in dets if d.get("mission_relevant", True)] | |
| else: | |
| for d in dets: | |
| d["mission_relevant"] = None | |
| gpt_dets = dets | |
| # GPT enrichment (one-shot, first frame with detections) | |
| if gpt_dets and not gpt_submitted: | |
| for d in gpt_dets: | |
| d["assessment_status"] = AssessmentStatus.PENDING_GPT | |
| try: | |
| gpt_enrichment_queue.put( | |
| ( | |
| next_idx, | |
| frm.copy(), | |
| copy.deepcopy(gpt_dets), | |
| mission_spec, | |
| ), | |
| timeout=1.0, | |
| ) | |
| gpt_submitted = True | |
| logging.info("GSAM2 writer: offloaded GPT enrichment for frame %d", next_idx) | |
| except Full: | |
| logging.warning("GSAM2 GPT enrichment queue full, skipping") | |
| # Merge persistent GPT data | |
| for det in dets: | |
| tid = det["track_id"] | |
| with gpt_data_lock: | |
| gpt_payload = gpt_data_by_track.get(tid) | |
| if gpt_payload: | |
| det.update(gpt_payload) | |
| det["assessment_status"] = AssessmentStatus.ASSESSED | |
| elif "assessment_status" not in det: | |
| det["assessment_status"] = AssessmentStatus.UNASSESSED | |
| # Build enriched display labels | |
| display_labels = [] | |
| for d in dets: | |
| if d.get("mission_relevant") is False: | |
| display_labels.append("") | |
| continue | |
| lbl = d.get("label", "obj") | |
| if d.get("gpt_distance_m") is not None: | |
| try: | |
| lbl = f"{lbl} {int(float(d['gpt_distance_m']))}m" | |
| except (TypeError, ValueError): | |
| pass | |
| display_labels.append(lbl) | |
| # Draw boxes on mask-rendered frame | |
| if dets: | |
| boxes = np.array([d["bbox"] for d in dets]) | |
| frm = draw_boxes(frm, boxes, label_names=display_labels) | |
| # Store tracks for frontend | |
| if job_id: | |
| set_track_data(job_id, next_idx, copy.deepcopy(dets)) | |
| elif enable_gpt: | |
| # No objects this frame — still store empty track data | |
| if job_id: | |
| set_track_data(job_id, next_idx, []) | |
| if _perf_metrics is not None: | |
| _t_w = time.perf_counter() | |
| # Write to video file (always, single copy) | |
| writer.write(frm) | |
| if _perf_metrics is not None: | |
| _perf_metrics["writer_total_ms"] += (time.perf_counter() - _t_w) * 1000.0 | |
| # --- Deposit frame for stream publisher --- | |
| if stream_queue or job_id: | |
| with _stream_lock: | |
| _stream_deque.append(frm) | |
| if not _first_deposit: | |
| _first_deposit = True | |
| _ttfs("first_frame_deposited_to_deque") | |
| next_idx += 1 | |
| if next_idx % 30 == 0: | |
| logging.info( | |
| "Rendered frame %d / %d", | |
| next_idx, total_frames, | |
| ) | |
| except Empty: | |
| if job_id: | |
| _check_cancellation(job_id) | |
| if not any(t.is_alive() for t in r_workers) and render_out.empty(): | |
| logging.error( | |
| "Render workers stopped while waiting " | |
| "for frame %d", next_idx, | |
| ) | |
| break | |
| continue | |
| finally: | |
| render_done = True | |
| _stream_writer_done.set() | |
| # Shut down enrichment thread | |
| if enrich_thread: | |
| try: | |
| gpt_enrichment_queue.put(None, timeout=5.0) | |
| enrich_thread.join(timeout=30) | |
| except Exception: | |
| logging.warning("GSAM2 enrichment thread shutdown timed out") | |
def _stream_publisher_thread():
    """Adaptive-rate publisher: reads from _stream_deque, publishes at measured pace.

    Phase 1 waits up to ``STARTUP_WAIT`` seconds (or until the writer is done)
    and derives an initial streaming rate from how many frames accumulated.
    Phase 2 pops frames one at a time, publishes each and sleeps one frame
    interval. While the deque is empty it re-publishes the last frame every
    ``HEARTBEAT_INTERVAL`` seconds, and it stops once the writer is done and
    the deque is drained. The rate is nudged about once per
    ``ADJUST_INTERVAL`` according to the deque fill level. The last frame is
    published once more on exit.
    """
    from jobs.streaming import publish_frame as _pub

    STARTUP_WAIT = 5.0  # max seconds to accumulate before streaming
    MIN_FPS = 2.0
    MAX_FPS = 30.0
    HEARTBEAT_INTERVAL = 0.5  # re-publish last frame if deque empty
    LOW_WATER = 10
    HIGH_WATER = 50
    ADJUST_INTERVAL = 1.0

    def _emit(img):
        """Send *img* via publish_frame when a job id exists, else via stream_queue."""
        if job_id:
            _pub(job_id, img)
        elif stream_queue:
            try:
                stream_queue.put(img, timeout=0.01)
            except Exception:
                # Best-effort: a full or broken consumer queue drops the frame.
                pass

    newest = None
    published = 0

    # --- Phase 1: startup accumulation ---
    t_start = time.perf_counter()
    while (time.perf_counter() - t_start) < STARTUP_WAIT and not _stream_writer_done.is_set():
        time.sleep(0.1)

    with _stream_lock:
        accumulated = len(_stream_deque)
    elapsed = max(time.perf_counter() - t_start, 0.1)
    if accumulated > 0:
        r_prod = accumulated / elapsed
    else:
        r_prod = 10.0
    r_stream = max(MIN_FPS, min(MAX_FPS, 0.85 * r_prod))
    logging.info(
        "Stream publisher started: R_prod=%.1f fps, R_stream=%.1f fps, "
        "accumulated=%d frames in %.1fs",
        r_prod, r_stream, accumulated, elapsed,
    )
    _ttfs(f"publisher: startup_wait done ({accumulated} frames in {elapsed:.1f}s)")

    # --- Phase 2: adaptive streaming ---
    last_adjust = time.perf_counter()
    last_publish_time = 0.0
    while True:
        frame_interval = 1.0 / r_stream

        # Try to pop a frame
        with _stream_lock:
            frame = _stream_deque.popleft() if _stream_deque else None

        if frame is None:
            # Deque empty — check termination
            if _stream_writer_done.is_set():
                with _stream_lock:
                    drained = not _stream_deque
                if drained:
                    break
                # A frame landed between the two checks; pick it up immediately.
                continue

            # Heartbeat: re-publish last frame to keep MJPEG alive
            now = time.perf_counter()
            if newest is not None and (now - last_publish_time) >= HEARTBEAT_INTERVAL:
                _emit(newest)
                last_publish_time = now
            time.sleep(0.05)
        else:
            newest = frame
            _emit(frame)
            if published == 0:
                _ttfs("first_publish_frame")
            published += 1
            last_publish_time = time.perf_counter()
            time.sleep(frame_interval)

        # Adaptive rate adjustment (every ~1s)
        now = time.perf_counter()
        if now - last_adjust >= ADJUST_INTERVAL:
            with _stream_lock:
                level = len(_stream_deque)
            if level < LOW_WATER:
                r_stream = max(MIN_FPS, r_stream * 0.9)
            elif level > HIGH_WATER:
                r_stream = min(MAX_FPS, r_stream * 1.05)
            last_adjust = now

    # Publish final frame
    if newest is not None:
        _emit(newest)
    logging.info("Stream publisher finished: published %d frames", published)
| writer_thread = Thread(target=_writer_loop, daemon=True) | |
| writer_thread.start() | |
| _publisher_thread = None | |
| if stream_queue or job_id: | |
| _publisher_thread = Thread(target=_stream_publisher_thread, daemon=True) | |
| _publisher_thread.start() | |
| _ttfs("writer+publisher threads started") | |
| # ================================================================== | |
| # Phase 1-4: Tracking (single-GPU fallback vs multi-GPU pipeline) | |
| # Segments are fed incrementally to render_in as they complete. | |
| # ================================================================== | |
| try: | |
| if num_gpus <= 1: | |
| # ---------- Single-GPU fallback ---------- | |
| device_str = "cuda:0" if torch.cuda.is_available() else "cpu" | |
| _seg_kw = {"num_maskmem": num_maskmem} if num_maskmem is not None else {} | |
| if detector_name is not None: | |
| _seg_kw["detector_name"] = detector_name | |
| if _perf_metrics is not None: | |
| _t_load = time.perf_counter() | |
| segmenter = load_segmenter_on_device(active_segmenter, device_str, **_seg_kw) | |
| _check_cancellation(job_id) | |
| if _perf_metrics is not None: | |
| _perf_metrics["model_load_ms"] = (time.perf_counter() - _t_load) * 1000.0 | |
| segmenter._perf_metrics = _perf_metrics | |
| segmenter._perf_lock = None | |
| _ttfs(f"model loaded ({active_segmenter})") | |
| if _perf_metrics is not None: | |
| _t_track = time.perf_counter() | |
| def _feed_segment(seg_frames): | |
| """Fallback for empty/carry-forward segments (already CPU).""" | |
| for fidx in sorted(seg_frames.keys()): | |
| render_in.put((fidx, seg_frames[fidx])) | |
| def _feed_segment_gpu(segment_output): | |
| """Feed LazyFrameObjects into render_in (GPU->CPU deferred).""" | |
| # Deduplicate: frame_indices has one entry per (frame, obj) | |
| seen = set() | |
| for fi in segment_output.frame_indices: | |
| if fi not in seen: | |
| seen.add(fi) | |
| render_in.put((fi, LazyFrameObjects(segment_output, fi))) | |
| _ttfs("process_video started") | |
| tracking_results = segmenter.process_video( | |
| frame_dir, frame_names, queries, | |
| on_segment=_feed_segment, | |
| on_segment_output=_feed_segment_gpu, | |
| _ttfs_t0=_ttfs_t0, | |
| _ttfs_job_id=job_id, | |
| frame_store=frame_store, | |
| ) | |
| if _perf_metrics is not None: | |
| _perf_metrics["tracking_total_ms"] = (time.perf_counter() - _t_track) * 1000.0 | |
| logging.info( | |
| "Single-GPU tracking complete: %d frames", | |
| len(tracking_results), | |
| ) | |
| else: | |
| # ---------- Multi-GPU pipeline ---------- | |
| logging.info( | |
| "Multi-GPU GSAM2 tracking: %d GPUs, %d frames, step=%d", | |
| num_gpus, total_frames, step, | |
| ) | |
| # Phase 1: Load one segmenter per GPU (parallel) | |
| if _perf_metrics is not None: | |
| _t_load = time.perf_counter() | |
| segmenters = [] | |
| with ThreadPoolExecutor(max_workers=num_gpus) as pool: | |
| _seg_kw_multi = {"num_maskmem": num_maskmem} if num_maskmem is not None else {} | |
| if detector_name is not None: | |
| _seg_kw_multi["detector_name"] = detector_name | |
| futs = [ | |
| pool.submit( | |
| load_segmenter_on_device, | |
| active_segmenter, | |
| f"cuda:{i}", | |
| **_seg_kw_multi, | |
| ) | |
| for i in range(num_gpus) | |
| ] | |
| segmenters = [f.result() for f in futs] | |
| logging.info("Loaded %d segmenters", len(segmenters)) | |
| if _perf_metrics is not None: | |
| _perf_metrics["model_load_ms"] = (time.perf_counter() - _t_load) * 1000.0 | |
| import threading as _th | |
| _actual_lock = _perf_lock or _th.Lock() | |
| for seg in segmenters: | |
| seg._perf_metrics = _perf_metrics | |
| seg._perf_lock = _actual_lock | |
| _ttfs(f"model loaded ({active_segmenter}, {num_gpus} GPUs)") | |
| # Phase 2: Init SAM2 models/state per GPU (parallel) | |
| if _perf_metrics is not None: | |
| _t_init = time.perf_counter() | |
| if frame_store is not None: | |
| # Models are lazy-loaded; ensure at least one is ready so we | |
| # can read image_size. Phase 1 (load_segmenter_on_device) | |
| # only constructs the object — _video_predictor is still None. | |
| segmenters[0]._ensure_models_loaded() | |
| sam2_img_size = segmenters[0]._video_predictor.image_size | |
| # Pre-create the shared adapter (validates memory budget) | |
| shared_adapter = frame_store.sam2_adapter(image_size=sam2_img_size) | |
| _REQUIRED_KEYS = {"images", "num_frames", "video_height", "video_width", "cached_features"} | |
| def _init_seg_state(seg): | |
| seg._ensure_models_loaded() | |
| state = seg._video_predictor.init_state( | |
| video_path=frame_dir, # dummy dir with 1 JPEG | |
| offload_video_to_cpu=True, | |
| async_loading_frames=False, # 1 dummy frame, instant | |
| ) | |
| # Validate expected keys exist before patching | |
| missing = _REQUIRED_KEYS - set(state.keys()) | |
| if missing: | |
| raise RuntimeError(f"SAM2 init_state missing expected keys: {missing}") | |
| # CRITICAL: Clear cached_features BEFORE patching images | |
| # init_state caches dummy frame 0's backbone features — must evict | |
| state["cached_features"] = {} | |
| # Patch in real frame data | |
| state["images"] = shared_adapter | |
| state["num_frames"] = total_frames | |
| state["video_height"] = height | |
| state["video_width"] = width | |
| return state | |
| else: | |
| def _init_seg_state(seg): | |
| seg._ensure_models_loaded() | |
| return seg._video_predictor.init_state( | |
| video_path=frame_dir, | |
| offload_video_to_cpu=True, | |
| async_loading_frames=True, | |
| ) | |
| with ThreadPoolExecutor(max_workers=len(segmenters)) as pool: | |
| futs = [pool.submit(_init_seg_state, seg) for seg in segmenters] | |
| inference_states = [f.result() for f in futs] | |
| if _perf_metrics is not None: | |
| _perf_metrics["init_state_ms"] = (time.perf_counter() - _t_init) * 1000.0 | |
| _t_track = time.perf_counter() | |
| _ttfs("multi-GPU tracking started") | |
| # Phase 3: Parallel segment processing (queue-based workers) | |
| segments = list(range(0, total_frames, step)) | |
| num_total_segments = len(segments) | |
| seg_queue_in: Queue = Queue() | |
| seg_queue_out: Queue = Queue() | |
| for i, start_idx in enumerate(segments): | |
| seg_queue_in.put((i, start_idx)) | |
| for _ in segmenters: | |
| seg_queue_in.put(None) # sentinel | |
| iou_thresh = segmenters[0].iou_threshold | |
| def _segment_worker(gpu_idx: int): | |
| seg = segmenters[gpu_idx] | |
| state = inference_states[gpu_idx] | |
| device_type = seg.device.split(":")[0] | |
| ac = ( | |
| torch.autocast(device_type=device_type, dtype=torch.bfloat16) | |
| if device_type == "cuda" | |
| else nullcontext() | |
| ) | |
| with ac: | |
| while True: | |
| if job_id: | |
| try: | |
| _check_cancellation(job_id) | |
| except RuntimeError as e: | |
| if "cancelled" in str(e).lower(): | |
| logging.info( | |
| "Segment worker %d cancelled.", | |
| gpu_idx, | |
| ) | |
| break | |
| raise | |
| item = seg_queue_in.get() | |
| if item is None: | |
| break | |
| seg_idx, start_idx = item | |
| try: | |
| logging.info( | |
| "GPU %d processing segment %d (frame %d)", | |
| gpu_idx, seg_idx, start_idx, | |
| ) | |
| if frame_store is not None: | |
| image = frame_store.get_pil_rgb(start_idx) | |
| else: | |
| img_path = os.path.join( | |
| frame_dir, frame_names[start_idx] | |
| ) | |
| with PILImage.open(img_path) as pil_img: | |
| image = pil_img.convert("RGB") | |
| if job_id: | |
| _check_cancellation(job_id) | |
| masks, boxes, labels = seg.detect_keyframe( | |
| image, queries, | |
| ) | |
| if masks is None: | |
| seg_queue_out.put( | |
| (seg_idx, start_idx, None, {}) | |
| ) | |
| continue | |
| mask_dict = MaskDictionary() | |
| mask_dict.add_new_frame_annotation( | |
| mask_list=masks, | |
| box_list=( | |
| boxes.clone() | |
| if torch.is_tensor(boxes) | |
| else torch.tensor(boxes) | |
| ), | |
| label_list=labels, | |
| ) | |
| segment_output = seg.propagate_segment( | |
| state, start_idx, mask_dict, step, | |
| ) | |
| seg_queue_out.put( | |
| (seg_idx, start_idx, mask_dict, segment_output) | |
| ) | |
| except RuntimeError as e: | |
| if "cancelled" in str(e).lower(): | |
| logging.info( | |
| "Segment worker %d cancelled.", | |
| gpu_idx, | |
| ) | |
| break | |
| raise | |
| except Exception: | |
| logging.exception( | |
| "Segment %d failed on GPU %d", | |
| seg_idx, gpu_idx, | |
| ) | |
| seg_queue_out.put( | |
| (seg_idx, start_idx, None, {}) | |
| ) | |
| seg_workers = [] | |
| for i in range(num_gpus): | |
| t = Thread( | |
| target=_segment_worker, args=(i,), daemon=True, | |
| ) | |
| t.start() | |
| seg_workers.append(t) | |
| # Phase 4: Streaming reconciliation — process segments in order | |
| # as they arrive, feeding render_in incrementally. | |
| _recon_accum_ms = 0.0 | |
| global_id_counter = 0 | |
| sam2_masks = MaskDictionary() | |
| tracking_results: Dict[int, Dict[int, ObjectInfo]] = {} | |
| def _mask_to_cpu(mask): | |
| """Normalize mask to CPU tensor (still used for keyframe mask_dict).""" | |
| if torch.is_tensor(mask): | |
| return mask.detach().cpu() | |
| return mask | |
| next_seg_idx = 0 | |
| segment_buffer: Dict[int, Tuple] = {} | |
| while next_seg_idx < num_total_segments: | |
| try: | |
| seg_idx, start_idx, mask_dict, segment_output = seg_queue_out.get(timeout=1.0) | |
| except Empty: | |
| if job_id: | |
| _check_cancellation(job_id) | |
| # Check if all segment workers are still alive | |
| if not any(t.is_alive() for t in seg_workers) and seg_queue_out.empty(): | |
| logging.error( | |
| "All segment workers stopped while waiting for segment %d", | |
| next_seg_idx, | |
| ) | |
| break | |
| continue | |
| segment_buffer[seg_idx] = (start_idx, mask_dict, segment_output) | |
| # Process contiguous ready segments in order | |
| while next_seg_idx in segment_buffer: | |
| start_idx, mask_dict, segment_output = segment_buffer.pop(next_seg_idx) | |
| if mask_dict is None or not mask_dict.labels: | |
| # No detections — carry forward previous masks | |
| for fi in range( | |
| start_idx, min(start_idx + step, total_frames) | |
| ): | |
| if fi not in tracking_results: | |
| tracking_results[fi] = ( | |
| { | |
| k: ObjectInfo( | |
| instance_id=v.instance_id, | |
| mask=v.mask, | |
| class_name=v.class_name, | |
| x1=v.x1, y1=v.y1, | |
| x2=v.x2, y2=v.y2, | |
| ) | |
| for k, v in sam2_masks.labels.items() | |
| } | |
| if sam2_masks.labels | |
| else {} | |
| ) | |
| render_in.put((fi, tracking_results.get(fi, {}))) | |
| if next_seg_idx == 0: | |
| _ttfs("first_segment_reconciled (multi-GPU, no detections)") | |
| next_seg_idx += 1 | |
| continue | |
| # Normalize keyframe masks to CPU before cross-GPU IoU matching. | |
| if _perf_metrics is not None: | |
| _t_rc = time.perf_counter() | |
| for info in mask_dict.labels.values(): | |
| info.mask = _mask_to_cpu(info.mask) | |
| # IoU match + get local→global remapping | |
| global_id_counter, remapping = ( | |
| mask_dict.update_masks_with_remapping( | |
| tracking_dict=sam2_masks, | |
| iou_threshold=iou_thresh, | |
| objects_count=global_id_counter, | |
| ) | |
| ) | |
| if not mask_dict.labels: | |
| if _perf_metrics is not None: | |
| _recon_accum_ms += (time.perf_counter() - _t_rc) * 1000.0 | |
| for fi in range( | |
| start_idx, min(start_idx + step, total_frames) | |
| ): | |
| tracking_results[fi] = {} | |
| render_in.put((fi, {})) | |
| if next_seg_idx == 0: | |
| _ttfs("first_segment_reconciled (multi-GPU, empty masks)") | |
| next_seg_idx += 1 | |
| continue | |
| # Materialize ONLY the last frame for IoU tracking continuity | |
| last_fi = segment_output.last_frame_idx() | |
| if last_fi is not None: | |
| last_objs = segment_output.frame_to_object_dict( | |
| last_fi, remapping=remapping, to_cpu=True, | |
| ) | |
| tracking_results[last_fi] = last_objs | |
| sam2_masks = MaskDictionary() | |
| sam2_masks.labels = copy.deepcopy(last_objs) | |
| if last_objs: | |
| first_info = next(iter(last_objs.values())) | |
| if first_info.mask is not None: | |
| m = first_info.mask | |
| sam2_masks.mask_height = ( | |
| m.shape[-2] if m.ndim >= 2 else 0 | |
| ) | |
| sam2_masks.mask_width = ( | |
| m.shape[-1] if m.ndim >= 2 else 0 | |
| ) | |
| if _perf_metrics is not None: | |
| _recon_accum_ms += (time.perf_counter() - _t_rc) * 1000.0 | |
| # Feed LazyFrameObjects to render — GPU->CPU deferred to render workers | |
| seen_fi: set = set() | |
| for fi in segment_output.frame_indices: | |
| if fi not in seen_fi: | |
| seen_fi.add(fi) | |
| render_in.put(( | |
| fi, | |
| LazyFrameObjects(segment_output, fi, remapping), | |
| )) | |
| if next_seg_idx == 0: | |
| _ttfs("first_segment_reconciled (multi-GPU)") | |
| next_seg_idx += 1 | |
| for t in seg_workers: | |
| t.join() | |
| if _perf_metrics is not None: | |
| _perf_metrics["id_reconciliation_ms"] = _recon_accum_ms | |
| _perf_metrics["tracking_total_ms"] = (time.perf_counter() - _t_track) * 1000.0 | |
| logging.info( | |
| "Multi-GPU reconciliation complete: %d frames, %d objects", | |
| len(tracking_results), global_id_counter, | |
| ) | |
| finally: | |
| # Sentinels for render workers — always sent even on error/cancellation | |
| for _ in r_workers: | |
| try: | |
| render_in.put(None, timeout=5.0) | |
| except Full: | |
| pass | |
| for t in r_workers: | |
| t.join() | |
| writer_thread.join() | |
| if _publisher_thread is not None: | |
| _publisher_thread.join(timeout=15) | |
| if _perf_metrics is not None: | |
| _perf_metrics["end_to_end_ms"] = (time.perf_counter() - _t_e2e) * 1000.0 | |
| if torch.cuda.is_available(): | |
| _perf_metrics["gpu_peak_mem_mb"] = torch.cuda.max_memory_allocated() / (1024 * 1024) | |
| logging.info("Grounded-SAM-2 output written to: %s", output_video_path) | |
| return output_video_path | |
| finally: | |
| try: | |
| shutil.rmtree(frame_dir) | |
| logging.info("Cleaned up temp frame dir: %s", frame_dir) | |
| except Exception: | |
| logging.warning("Failed to clean up temp frame dir: %s", frame_dir) | |
def colorize_depth_map(
    depth_map: np.ndarray,
    global_min: float,
    global_max: float,
) -> np.ndarray:
    """
    Render a depth map as an 8-bit TURBO-colormapped image.

    The depth values are normalized against a fixed [global_min, global_max]
    range so that colors stay comparable from frame to frame.

    Args:
        depth_map: HxW float32 depth in meters. The input is copied and never
            modified in place.
        global_min: Lower bound of the normalization range (minimum depth
            across the entire video).
        global_max: Upper bound of the normalization range (maximum depth
            across the entire video).

    Returns:
        HxWx3 uint8 color image produced by cv2.applyColorMap.
        NOTE(review): this was previously documented as RGB, but OpenCV
        colormaps are normally emitted in BGR channel order — confirm
        against the callers before relying on either ordering.
    """
    import cv2

    # Work on a private copy; non-finite entries are overwritten below.
    depth = np.array(depth_map, copy=True)
    is_finite = np.isfinite(depth)

    bounds_usable = np.isfinite(global_min) and np.isfinite(global_max)
    if not bounds_usable:
        # Either bound is NaN/inf: derive both bounds from this frame instead.
        if is_finite.any():
            # The 1st/99th percentiles (computed in float64) keep a few
            # extreme pixels from stretching the range.
            valid = depth[is_finite].astype(np.float64, copy=False)
            global_min = float(np.percentile(valid, 1))
            global_max = float(np.percentile(valid, 99))
        else:
            # No usable pixel at all: fall back to a unit range.
            global_min = 0.0
            global_max = 1.0

    # NaN/inf pixels are drawn as if they sat at the minimum depth.
    depth[~is_finite] = global_min

    span = global_max - global_min
    if span < 1e-6:
        # Degenerate (uniform) range: avoid dividing by ~0, map all to level 0.
        levels = np.zeros_like(depth, dtype=np.uint8)
    else:
        # Clamp outliers into the range, then scale to 0..255 (truncating).
        clamped = np.clip(depth, global_min, global_max)
        levels = ((clamped - global_min) / span * 255).astype(np.uint8)

    return cv2.applyColorMap(levels, cv2.COLORMAP_TURBO)