Spaces:
Paused
Paused
| """Per-frame GPU/CPU profiling for detection and segmentation pipelines. | |
| Provides CUDA event-based timing and decomposed profiling for | |
| transformers-based and opaque (YOLO) detectors. Runs in a dedicated | |
| single-threaded path for accurate, reproducible measurements. | |
| """ | |
| import logging | |
| import statistics | |
| import time | |
| from dataclasses import dataclass, field | |
| from typing import List, Optional, Sequence | |
| import cv2 | |
| import numpy as np | |
| import torch | |
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Detector names whose predict() splits into processor -> model -> post_process,
# so each phase can be timed separately.
_DECOMPOSABLE_DETECTORS = {
    "detr_resnet50",
    "grounding_dino",
}

# YOLO-based detector names whose predict() is a single opaque call.
_OPAQUE_DETECTORS = {
    "yolo11",
    "drone_yolo",
}
@dataclass
class TimingStats:
    """Aggregate statistics for a set of measurements (in ms).

    All fields default to zero, so ``TimingStats()`` represents "no samples".
    """

    min_ms: float = 0.0
    max_ms: float = 0.0
    mean_ms: float = 0.0
    std_ms: float = 0.0  # sample standard deviation; 0.0 with fewer than 2 samples
    p50_ms: float = 0.0
    p95_ms: float = 0.0
    p99_ms: float = 0.0
    count: int = 0

    @staticmethod
    def from_samples(samples: List[float]) -> "TimingStats":
        """Build statistics from raw timing samples.

        Percentiles are picked directly from the sorted samples at index
        ``int(n * q)`` (no interpolation), so for an even number of samples
        ``p50_ms`` is the upper of the two middle values.

        Args:
            samples: Measurements in milliseconds; may be empty.

        Returns:
            A populated ``TimingStats``, or an all-zero one when ``samples``
            is empty.
        """
        if not samples:
            return TimingStats()
        ordered = sorted(samples)
        n = len(ordered)
        return TimingStats(
            min_ms=ordered[0],
            max_ms=ordered[-1],
            mean_ms=statistics.mean(ordered),
            # statistics.stdev() needs at least two data points.
            std_ms=statistics.stdev(ordered) if n > 1 else 0.0,
            p50_ms=ordered[n // 2],
            # int(n * q) < n for q < 1, so these indexes are always in range.
            p95_ms=ordered[int(n * 0.95)],
            p99_ms=ordered[int(n * 0.99)],
            count=n,
        )
@dataclass
class FrameTiming:
    """Timing breakdown for a single frame (all values in ms)."""

    frame_idx: int = 0
    decode_ms: float = 0.0
    preprocess_ms: float = 0.0   # CPU: image processor / resize
    transfer_ms: float = 0.0     # CPU->GPU data transfer; negative when not separable
    gpu_kernel_ms: float = 0.0   # GPU model forward pass
    postprocess_ms: float = 0.0  # CPU: post-processing + NMS
    total_ms: float = 0.0
    num_detections: int = 0
@dataclass
class ProfilingResult:
    """Full profiling result for a video."""

    detector_name: str = ""
    mode: str = ""  # set to "detection" or "segmentation" by the runner functions
    total_frames: int = 0
    warmup_frames: int = 0
    profiled_frames: int = 0
    video_resolution: str = ""  # formatted as "<width>x<height>"
    video_fps: float = 0.0

    # Per-frame timings
    frame_timings: List[FrameTiming] = field(default_factory=list)

    # Aggregate stats (one TimingStats per FrameTiming phase)
    decode_stats: TimingStats = field(default_factory=TimingStats)
    preprocess_stats: TimingStats = field(default_factory=TimingStats)
    transfer_stats: TimingStats = field(default_factory=TimingStats)
    gpu_kernel_stats: TimingStats = field(default_factory=TimingStats)
    postprocess_stats: TimingStats = field(default_factory=TimingStats)
    total_stats: TimingStats = field(default_factory=TimingStats)

    # GPU memory
    gpu_peak_memory_mb: float = 0.0
    gpu_allocated_mb: float = 0.0

    # Throughput
    avg_fps: float = 0.0
    avg_detections_per_frame: float = 0.0
class CudaTimer:
    """GPU stopwatch built on a pair of CUDA timing events.

    ``start()`` and ``stop()`` only record an event on the current CUDA
    stream; the wait for the GPU happens inside ``elapsed_ms()``.
    """

    def __init__(self):
        # Both events must be created with timing enabled to be measurable.
        self._start, self._end = (
            torch.cuda.Event(enable_timing=True) for _ in range(2)
        )

    def start(self):
        """Record the opening event."""
        self._start.record()

    def stop(self):
        """Record the closing event."""
        self._end.record()

    def elapsed_ms(self) -> float:
        """Wait for the closing event, then return start-to-stop time in ms."""
        closing = self._end
        closing.synchronize()
        return self._start.elapsed_time(closing)
def _profile_decomposed(detector, frame: np.ndarray, queries: Sequence[str]) -> FrameTiming:
    """Profile a transformers-based detector with decomposed phases.

    Works for DETR and Grounding DINO where we can separate:
    processor(image) -> .to(device) -> model(**inputs) -> post_process()

    Args:
        detector: Detector exposing ``processor``, ``model``, ``device`` and
            ``_parse_single_result``. A ``_build_prompt`` attribute selects the
            text-prompted preprocessing path and a ``_post_process`` attribute
            selects the detector's own post-processing; otherwise the
            processor's ``post_process_object_detection`` is called with
            ``detector.score_threshold``.
        frame: Image converted with ``cv2.COLOR_BGR2RGB`` before preprocessing.
        queries: Text queries; only used when ``_build_prompt`` exists.

    Returns:
        The per-phase timing for this frame. If the detector has no
        ``processor`` only ``preprocess_ms`` is filled in. When CUDA is
        unavailable no CUDA events are created: ``transfer_ms`` and
        ``gpu_kernel_ms`` stay 0 and ``total_ms`` is the host wall-clock time
        of the whole call, so it still includes the forward pass.
    """
    timing = FrameTiming()
    # CUDA events cannot be created/recorded without a CUDA runtime; checking
    # up front keeps CPU-only profiling from crashing.
    use_cuda_events = torch.cuda.is_available()

    # 1. Preprocess (CPU)
    wall_start = time.perf_counter()
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    if not hasattr(detector, "processor"):
        timing.preprocess_ms = (time.perf_counter() - wall_start) * 1000
        return timing
    processor = detector.processor
    if hasattr(detector, "_build_prompt"):
        # Grounding DINO
        prompt = detector._build_prompt(queries)
        inputs = processor(images=frame_rgb, text=prompt, return_tensors="pt")
    else:
        # DETR
        inputs = processor(images=frame_rgb, return_tensors="pt")
    timing.preprocess_ms = (time.perf_counter() - wall_start) * 1000

    # 2. Transfer to GPU
    transfer_timer = CudaTimer() if use_cuda_events else None
    if transfer_timer is not None:
        transfer_timer.start()
    inputs = {key: value.to(detector.device) for key, value in inputs.items()}
    if transfer_timer is not None:
        transfer_timer.stop()
        timing.transfer_ms = transfer_timer.elapsed_ms()

    # 3. GPU forward pass
    kernel_timer = CudaTimer() if use_cuda_events else None
    if kernel_timer is not None:
        kernel_timer.start()
    with torch.no_grad():
        outputs = detector.model(**inputs)
    if kernel_timer is not None:
        kernel_timer.stop()
        timing.gpu_kernel_ms = kernel_timer.elapsed_ms()

    # 4. Post-process (CPU)
    t_post = time.perf_counter()
    target_sizes = torch.tensor([frame.shape[:2]], device=detector.device)
    if hasattr(detector, "_post_process"):
        # Grounding DINO
        processed_list = detector._post_process(outputs, inputs["input_ids"], target_sizes)
    else:
        # DETR
        processed_list = detector.processor.post_process_object_detection(
            outputs, threshold=detector.score_threshold, target_sizes=target_sizes,
        )
    result = detector._parse_single_result(processed_list[0])
    wall_end = time.perf_counter()
    timing.postprocess_ms = (wall_end - t_post) * 1000
    timing.num_detections = len(result.boxes)

    if use_cuda_events:
        timing.total_ms = (
            timing.preprocess_ms
            + timing.transfer_ms
            + timing.gpu_kernel_ms
            + timing.postprocess_ms
        )
    else:
        # The GPU phases were not measured, so summing the phases would drop
        # the forward pass; use the host clock for the end-to-end figure.
        timing.total_ms = (wall_end - wall_start) * 1000
    return timing
def _profile_opaque(detector, frame: np.ndarray, queries: Sequence[str]) -> FrameTiming:
    """Profile an opaque detector (YOLO) where internals aren't separable.

    The whole ``detector.predict(frame, queries)`` call is bracketed by CUDA
    events and by a host wall-clock measurement.

    Args:
        detector: Detector exposing ``predict(frame, queries)`` whose result
            has a ``boxes`` attribute.
        frame: Frame passed straight to ``predict``.
        queries: Queries passed straight to ``predict``.

    Returns:
        A FrameTiming where ``gpu_kernel_ms`` is the CUDA-event time of the
        whole call, ``postprocess_ms`` is the non-negative remainder of the
        wall-clock time, ``preprocess_ms`` is 0, ``transfer_ms`` is -1.0 (not
        separable) and ``total_ms`` is the wall-clock time. When CUDA is
        unavailable no CUDA events are created and ``gpu_kernel_ms`` is 0.
    """
    timing = FrameTiming()
    # CUDA events cannot be created/recorded without a CUDA runtime; checking
    # up front keeps CPU-only profiling from crashing.
    cuda_timer = CudaTimer() if torch.cuda.is_available() else None

    # Wrap entire predict() with CUDA events
    t0 = time.perf_counter()
    if cuda_timer is not None:
        cuda_timer.start()
    result = detector.predict(frame, queries)
    if cuda_timer is not None:
        cuda_timer.stop()
    wall_ms = (time.perf_counter() - t0) * 1000

    timing.gpu_kernel_ms = cuda_timer.elapsed_ms() if cuda_timer is not None else 0.0
    timing.preprocess_ms = 0.0  # Included in gpu_kernel
    timing.transfer_ms = -1.0   # Not separable
    timing.postprocess_ms = max(0.0, wall_ms - timing.gpu_kernel_ms)
    timing.total_ms = wall_ms
    timing.num_detections = len(result.boxes)
    return timing
def _benchmark_decode_ms(video_path: str) -> float:
    """Return the wall-clock time (ms) of one read from a fresh capture, or 0.0 if it cannot open."""
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return 0.0
        t0 = time.perf_counter()
        cap.read()
        return (time.perf_counter() - t0) * 1000
    finally:
        cap.release()


def run_profiled_detection(
    video_path: str,
    detector_name: str,
    queries: List[str],
    max_frames: int = 100,
    warmup_frames: int = 5,
) -> ProfilingResult:
    """Run profiled detection on a video file.

    Single-threaded profiling path (not injected into the multi-threaded
    production pipeline) for accurate, reproducible measurements.

    Args:
        video_path: Path of the video to read.
        detector_name: Name passed to ``load_detector``; names listed in
            ``_DECOMPOSABLE_DETECTORS`` are profiled phase by phase, every
            other name is profiled as one opaque ``predict()`` call.
        queries: Text queries forwarded to the detector.
        max_frames: Upper bound on frames consumed, warm-up frames included.
        warmup_frames: Leading frames that are run but not recorded.

    Returns:
        A ProfilingResult with per-frame timings and aggregate statistics.
        Frames arrive already decoded from the reader, so ``decode_ms`` is an
        approximation: one frame read is timed on a fresh ``cv2.VideoCapture``
        and that value is copied to every profiled frame.
    """
    from models.model_loader import load_detector
    from utils.video import VideoReader

    result = ProfilingResult(
        detector_name=detector_name,
        mode="detection",
        warmup_frames=warmup_frames,
    )

    # Load detector
    detector = load_detector(detector_name)
    device = getattr(detector, "device", None)
    has_cuda = device is not None and str(device).startswith("cuda")
    if not has_cuda:
        logger.warning("No CUDA device found for profiling; GPU timings will be 0")

    # Pick the profiling strategy once; warm-up and measured frames share it.
    if detector_name in _DECOMPOSABLE_DETECTORS:
        profile_frame = _profile_decomposed
    else:
        profile_frame = _profile_opaque

    frame_timings: List[FrameTiming] = []
    frame_idx = 0

    # Open video; the finally clause closes it even if profiling raises.
    reader = VideoReader(video_path)
    try:
        result.video_resolution = f"{reader.width}x{reader.height}"
        result.video_fps = reader.fps

        # Reset CUDA peak memory
        if has_cuda:
            torch.cuda.reset_peak_memory_stats()
            torch.cuda.synchronize()

        for frame in reader:
            if frame_idx >= max_frames:
                break

            if frame_idx < warmup_frames:
                # Warmup: run prediction but don't record
                profile_frame(detector, frame, queries)
                frame_idx += 1
                continue

            timing = profile_frame(detector, frame, queries)
            timing.frame_idx = frame_idx

            if not frame_timings:
                # First recorded frame: benchmark the decode cost once.
                timing.decode_ms = _benchmark_decode_ms(video_path)
            else:
                # Later frames reuse the decode cost of the first recorded frame.
                timing.decode_ms = frame_timings[0].decode_ms

            frame_timings.append(timing)
            frame_idx += 1
    finally:
        reader.close()

    # Aggregate results
    result.total_frames = frame_idx
    result.profiled_frames = len(frame_timings)
    result.frame_timings = frame_timings

    if frame_timings:
        result.decode_stats = TimingStats.from_samples([t.decode_ms for t in frame_timings])
        result.preprocess_stats = TimingStats.from_samples([t.preprocess_ms for t in frame_timings])
        # Negative transfer_ms marks "not separable" and is left out of the stats.
        transfer_samples = [t.transfer_ms for t in frame_timings if t.transfer_ms >= 0]
        result.transfer_stats = TimingStats.from_samples(transfer_samples)
        result.gpu_kernel_stats = TimingStats.from_samples([t.gpu_kernel_ms for t in frame_timings])
        result.postprocess_stats = TimingStats.from_samples([t.postprocess_ms for t in frame_timings])
        result.total_stats = TimingStats.from_samples([t.total_ms for t in frame_timings])

        mean_total_ms = result.total_stats.mean_ms
        result.avg_fps = 1000.0 / mean_total_ms if mean_total_ms > 0 else 0.0
        result.avg_detections_per_frame = statistics.mean([t.num_detections for t in frame_timings])

    # GPU memory
    if has_cuda:
        torch.cuda.synchronize()
        result.gpu_peak_memory_mb = round(torch.cuda.max_memory_allocated() / (1024 ** 2), 1)
        result.gpu_allocated_mb = round(torch.cuda.memory_allocated() / (1024 ** 2), 1)

    return result
def run_profiled_segmentation(
    video_path: str,
    segmenter_name: str,
    queries: List[str],
    max_frames: int = 100,
    step: int = 60,
    num_maskmem: Optional[int] = None,
) -> ProfilingResult:
    """Run profiled segmentation (GSAM2) on a video file.

    Profiles the GSAM2 stages: GDINO keyframe detection,
    SAM2 image prediction, SAM2 video propagation.

    The tracker only reports aggregate stage totals, so the per-frame timings
    in the result are synthetic: each total is spread evenly over the frames
    it applies to. Frames whose index is a multiple of ``step`` are treated as
    keyframes and get the GDINO average as ``preprocess_ms`` and the SAM2
    image average as ``gpu_kernel_ms``; all other frames get the SAM2 video
    average as ``gpu_kernel_ms``. Render time becomes ``postprocess_ms`` and
    frame extraction time becomes ``decode_ms``.

    Args:
        video_path: Path of the video to process.
        segmenter_name: Forwarded to ``run_grounded_sam2_tracking``.
        queries: Text queries forwarded to the tracker.
        max_frames: Upper bound on frames; also caps ``total_frames``.
        step: Keyframe interval; must be at least 1.
        num_maskmem: Forwarded to the tracker unchanged.

    Returns:
        A ProfilingResult in "segmentation" mode. The raw tracker metrics are
        attached as ``result._gsam2_metrics``.

    Raises:
        ValueError: If ``step`` is below 1 or the video cannot be opened.
        Exception: Anything raised by the tracker is logged and re-raised.
    """
    import tempfile
    import os
    import threading

    # step is used as a divisor below; reject bad values before the
    # expensive tracking run instead of failing afterwards.
    if step < 1:
        raise ValueError(f"step must be a positive integer, got {step}")

    result = ProfilingResult(
        detector_name=segmenter_name,
        mode="segmentation",
        warmup_frames=0,
    )

    # Open video for metadata
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError(f"Cannot open video: {video_path}")
        result.video_fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        result.video_resolution = f"{width}x{height}"
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        cap.release()
    # Clamp at 0 so a negative reported frame count cannot leak into the result.
    result.total_frames = max(0, min(total, max_frames))

    has_cuda = torch.cuda.is_available()
    if has_cuda:
        torch.cuda.reset_peak_memory_stats()
        torch.cuda.synchronize()

    # Run GSAM2 with perf metrics
    metrics = {
        "end_to_end_ms": 0.0,
        "frame_extraction_ms": 0.0,
        "model_load_ms": 0.0,
        "init_state_ms": 0.0,
        "tracking_total_ms": 0.0,
        "gdino_total_ms": 0.0,
        "sam_image_total_ms": 0.0,
        "sam_video_total_ms": 0.0,
        "id_reconciliation_ms": 0.0,
        "render_total_ms": 0.0,
        "writer_total_ms": 0.0,
        "gpu_peak_mem_mb": 0.0,
    }
    lock = threading.Lock()

    # The rendered video is not needed; it goes to a temp file that is
    # removed whether or not tracking succeeds.
    fd, output_path = tempfile.mkstemp(prefix="profile_seg_", suffix=".mp4")
    os.close(fd)

    try:
        from inference import run_grounded_sam2_tracking

        run_grounded_sam2_tracking(
            video_path,
            output_path,
            queries,
            segmenter_name=segmenter_name,
            step=step,
            enable_gpt=False,
            max_frames=max_frames,
            _perf_metrics=metrics,
            _perf_lock=lock,
            num_maskmem=num_maskmem,
        )
    except Exception as e:
        logger.error("Profiled segmentation failed: %s", e)
        raise
    finally:
        try:
            os.remove(output_path)
        except OSError:
            pass

    # Convert GSAM2 metrics to FrameTiming-like structure
    n_frames = result.total_frames

    # Create synthetic per-frame timings from aggregate metrics
    if n_frames > 0:
        # Ceiling division: matches the number of indexes in range(n_frames)
        # with i % step == 0, so the averages below sum back to the totals.
        n_keyframes = (n_frames + step - 1) // step
        avg_gdino = metrics["gdino_total_ms"] / n_keyframes
        avg_sam_img = metrics["sam_image_total_ms"] / n_keyframes
        avg_sam_vid = metrics["sam_video_total_ms"] / max(1, n_frames - n_keyframes)
        avg_render = metrics["render_total_ms"] / n_frames
        avg_decode = metrics["frame_extraction_ms"] / n_frames

        for i in range(n_frames):
            ft = FrameTiming(frame_idx=i)
            if i % step == 0:
                # Keyframe
                ft.preprocess_ms = avg_gdino
                ft.gpu_kernel_ms = avg_sam_img
            else:
                ft.gpu_kernel_ms = avg_sam_vid
            ft.postprocess_ms = avg_render
            ft.decode_ms = avg_decode
            ft.total_ms = ft.decode_ms + ft.preprocess_ms + ft.gpu_kernel_ms + ft.postprocess_ms
            result.frame_timings.append(ft)

    result.profiled_frames = len(result.frame_timings)

    if result.frame_timings:
        timings = result.frame_timings
        result.decode_stats = TimingStats.from_samples([t.decode_ms for t in timings])
        result.preprocess_stats = TimingStats.from_samples([t.preprocess_ms for t in timings])
        result.gpu_kernel_stats = TimingStats.from_samples([t.gpu_kernel_ms for t in timings])
        result.postprocess_stats = TimingStats.from_samples([t.postprocess_ms for t in timings])
        result.total_stats = TimingStats.from_samples([t.total_ms for t in timings])
        mean_total_ms = result.total_stats.mean_ms
        result.avg_fps = 1000.0 / mean_total_ms if mean_total_ms > 0 else 0.0

    # Additional GSAM2-specific metrics stored as metadata
    result._gsam2_metrics = metrics  # type: ignore[attr-defined]

    if has_cuda:
        torch.cuda.synchronize()
        # Report whichever peak is larger: the one seen here or the tracker's own.
        result.gpu_peak_memory_mb = max(
            round(torch.cuda.max_memory_allocated() / (1024 ** 2), 1),
            metrics.get("gpu_peak_mem_mb", 0),
        )
        result.gpu_allocated_mb = round(torch.cuda.memory_allocated() / (1024 ** 2), 1)

    return result