"""
Video Labeling Pipeline for GENMO Training Data

Automatically labels video footage to identify clips suitable for GENMO motion capture:
- Single person in frame (no multi-person scenes)
- Person consistently visible
- Filters out false positives (posters, stickers) via motion analysis

Usage:
    python label_videos.py --video path/to/video.mp4 --output labels.json
    python label_videos.py --video-dir path/to/videos/ --output labels.json
"""
|
|
| import os |
| import sys |
| import json |
| import argparse |
| import time |
| import cv2 |
| import torch |
| import numpy as np |
| from PIL import Image |
| from tqdm import tqdm |
| from dataclasses import dataclass, asdict |
| from typing import List, Dict, Optional, Tuple, Iterator, Callable |
| from collections import defaultdict |
|
|
| |
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "third_party", "GVHMR")) |
|
|
| from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection |
|
|
|
|
@dataclass
class Detection:
    """Single person detection in one sampled frame."""
    bbox_xyxy: List[float]  # [x1, y1, x2, y2] in pixel coordinates
    confidence: float       # detector score for this box
    area_pct: float         # box area as a fraction of the full frame area
|
|
|
|
@dataclass
class Segment:
    """A continuous segment of video with labeling info."""
    start_sec: float              # inclusive segment start, in seconds
    end_sec: float                # exclusive segment end, in seconds
    dynamic_persons: int          # moving-person detections counted in this segment
    static_detections: int        # static (poster/sticker-like) detections in this segment
    avg_confidence: float         # mean detector confidence over the segment
    avg_bbox_area_pct: float      # mean bbox area as a fraction of the frame
    bbox_variance: float          # reserved; _finalize_segment currently writes 0.0
    usable: bool                  # True when exactly one confident, large-enough person is present
    reason: Optional[str] = None  # why unusable: "no_person", "multiple_persons", "low_confidence", "person_too_small"
|
|
|
|
@dataclass
class TrackState:
    """Streaming track state with running variance (Welford accumulators)."""
    last_bbox: List[float]   # most recently matched bbox [x1, y1, x2, y2]
    last_ts: float           # timestamp (seconds) of the most recent match
    count: int               # number of observations folded into the running stats
    mean_center: np.ndarray  # running mean of the bbox center (x, y)
    m2_center: np.ndarray    # Welford M2 (sum of squared deviations) for the center
    mean_size: np.ndarray    # running mean of the bbox size (width, height)
    m2_size: np.ndarray      # Welford M2 for the size
|
|
|
|
class VitPoseValidator:
    """Validate that a bbox contains a complete person using ViTPose joints."""

    # COCO keypoint index groups. Indices 0-4 (face/head) are used only for the
    # head-visibility check and are excluded from the joint count below.
    HEAD_KP = {0, 1, 2, 3, 4}
    EXCLUDE_KP = {0, 1, 2, 3, 4}
    UPPER_KP = {5, 6, 7, 8, 9, 10}
    LOWER_KP = {11, 12, 13, 14, 15, 16}

    def __init__(
        self,
        config_path: str,
        ckpt_path: str,
        device: str,
        min_joints: int,
        conf_threshold: float,
        require_upper_lower: bool,
        min_vertical_span: float,
        require_head: bool = True
    ):
        """Load a ViTPose top-down model and store the completeness thresholds.

        Raises:
            RuntimeError: if mmpose cannot be imported or the config/checkpoint
                paths do not exist.
        """
        try:
            from mmpose.apis import init_model, inference_topdown
        except Exception as exc:
            raise RuntimeError(f"mmpose not available: {exc}") from exc

        if not os.path.exists(config_path):
            raise RuntimeError(f"ViTPose config not found: {config_path}")
        if not os.path.exists(ckpt_path):
            raise RuntimeError(f"ViTPose checkpoint not found: {ckpt_path}")

        self._inference_topdown = inference_topdown
        self.pose = init_model(config_path, ckpt_path, device=device)
        self.pose.eval()
        self.min_joints = int(min_joints)
        self.conf_threshold = float(conf_threshold)
        self.require_upper_lower = bool(require_upper_lower)
        self.min_vertical_span = float(min_vertical_span)
        self.require_head = bool(require_head)

    @torch.no_grad()
    def is_complete(self, frame_rgb: np.ndarray, bbox_xyxy: List[float]) -> bool:
        """Return True when the joints inside *bbox_xyxy* look like a full person.

        A detection passes when (depending on configuration) a head keypoint is
        visible, enough non-head joints clear the confidence threshold, both
        upper- and lower-body joints are present, and the confident joints span
        a minimum fraction of the bbox height.
        """
        x1, y1, x2, y2 = bbox_xyxy
        bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
        boxes = np.array([[x1, y1, x2, y2]], dtype=np.float32)

        results = self._inference_topdown(self.pose, bgr, bboxes=boxes)
        if not results:
            return False

        instances = results[0].pred_instances
        joint_scores = np.asarray(instances.keypoint_scores[0]).reshape(-1)
        joint_xy = np.asarray(instances.keypoints[0]).reshape(-1, 2)

        # Head visibility is checked over HEAD_KP even though those indices are
        # excluded from the joint count.
        head_visible = any(
            float(joint_scores[k]) >= self.conf_threshold
            for k in self.HEAD_KP
            if k < len(joint_scores)
        )

        # Indices of confident, non-head joints.
        confident = [
            idx for idx, score in enumerate(joint_scores)
            if idx not in self.EXCLUDE_KP and float(score) >= self.conf_threshold
        ]
        upper = sum(1 for idx in confident if idx in self.UPPER_KP)
        lower = sum(1 for idx in confident if idx in self.LOWER_KP)
        ys = [float(joint_xy[idx][1]) for idx in confident]

        if self.require_head and not head_visible:
            return False
        if len(confident) < self.min_joints:
            return False
        if self.require_upper_lower and (upper == 0 or lower == 0):
            return False
        if self.min_vertical_span > 0.0 and len(ys) >= 2:
            bbox_h = max(1.0, float(y2) - float(y1))
            if (max(ys) - min(ys)) / bbox_h < self.min_vertical_span:
                return False

        return True
|
|
|
|
class VideoLabeler:
    """Labels videos for GENMO training suitability."""

    # Tracks whose summed bbox center+size variance (squared pixels) falls below
    # this are treated as static false positives (posters, stickers).
    STATIC_VARIANCE_THRESHOLD = 50.0
    # Minimum average detector confidence for a one-second bin to be usable.
    MIN_CONFIDENCE = 0.4
    # Minimum bbox area as a fraction of the frame (per detection and per bin).
    MIN_BBOX_AREA_PCT = 0.01
    # NOTE(review): not referenced in the visible portion of this file — confirm usage.
    MAX_BBOX_JUMP_RATIO = 0.5
    # NOTE(review): not referenced in the visible portion of this file — confirm usage.
    MIN_SEGMENT_DURATION = 10.0
    # _dedupe_nearby defaults: merge boxes with intersection-over-smaller above this...
    DUPLICATE_OVERLAP_THRESHOLD = 0.1
    # ...or IoU above this...
    DUPLICATE_IOU_THRESHOLD = 0.2
    # ...provided box centers are within this fraction of the larger box diagonal.
    DUPLICATE_CENTER_RATIO = 0.75
    # Stricter center-distance fraction for merging on proximity alone (no overlap).
    DUPLICATE_CENTER_ONLY_RATIO = 0.35
    # Maximum area ratio between two boxes for the center-only merge to apply.
    DUPLICATE_AREA_RATIO = 3.0
    # In multi-person bins, drop detections below this absolute area fraction...
    MULTI_PERSON_MIN_AREA_PCT = 0.08
    # ...or below this fraction of the largest detection's area.
    MULTI_PERSON_REL_AREA = 0.35
    # Short "low_confidence" gaps shorter than this are smoothed into neighbors.
    LOW_CONF_SMOOTH_MAX_SEC = 2.0
|
|
| def __init__( |
| self, |
| sample_fps: float = 1.0, |
| debug_dir: Optional[str] = None, |
| debug_all: bool = False, |
| vitpose_filter: bool = True, |
| vitpose_filter_all: bool = True, |
| vitpose_min_joints: int = 4, |
| vitpose_conf_threshold: float = 0.3, |
| vitpose_require_upper_lower: bool = True, |
| vitpose_min_vertical_span: float = 0.35, |
| vitpose_config: Optional[str] = None, |
| vitpose_ckpt: Optional[str] = None |
| ): |
| self.sample_fps = sample_fps |
| self.debug_dir = debug_dir |
| self.debug_all = debug_all |
| self.vitpose_filter = vitpose_filter |
| self.vitpose_filter_all = vitpose_filter_all |
| self.vitpose_min_joints = vitpose_min_joints |
| self.vitpose_conf_threshold = vitpose_conf_threshold |
| self.vitpose_require_upper_lower = vitpose_require_upper_lower |
| self.vitpose_min_vertical_span = vitpose_min_vertical_span |
| self.vitpose_config = vitpose_config |
| self.vitpose_ckpt = vitpose_ckpt |
| self.device = "cuda" if torch.cuda.is_available() else "cpu" |
| self.vitpose_validator = None |
| |
| |
| if torch.cuda.is_available() and torch.cuda.get_device_properties(0).major >= 8: |
| self.dtype = torch.bfloat16 |
| else: |
| self.dtype = torch.float16 |
| |
| print(f"[Labeler] Device: {self.device}, Precision: {self.dtype}") |
| |
| |
| self._init_dino() |
| self._init_vitpose() |
|
|
| def _init_vitpose(self): |
| """Initialize ViTPose for validation if enabled.""" |
| if not self.vitpose_filter: |
| return |
|
|
| config_path = self.vitpose_config or os.path.join( |
| os.path.dirname(__file__), |
| "..", |
| "third_party", |
| "GVHMR", |
| "mmpose", |
| "configs", |
| "body_2d_keypoint", |
| "topdown_heatmap", |
| "coco", |
| "vitpose_huge_finetune.py" |
| ) |
| ckpt_path = self.vitpose_ckpt or os.path.join( |
| os.path.dirname(__file__), |
| "..", |
| "third_party", |
| "GVHMR", |
| "work_dirs", |
| "best_coco_AP_epoch_1.pth" |
| ) |
|
|
| try: |
| self.vitpose_validator = VitPoseValidator( |
| config_path=config_path, |
| ckpt_path=ckpt_path, |
| device=self.device, |
| min_joints=self.vitpose_min_joints, |
| conf_threshold=self.vitpose_conf_threshold, |
| require_upper_lower=self.vitpose_require_upper_lower, |
| min_vertical_span=self.vitpose_min_vertical_span |
| ) |
| print("[Labeler] ViTPose validation enabled") |
| except Exception as exc: |
| print(f"[Labeler] ViTPose validation disabled: {exc}") |
| self.vitpose_validator = None |
| |
| def _init_dino(self): |
| """Initialize Grounding DINO model.""" |
| model_id = "IDEA-Research/grounding-dino-tiny" |
| cache_dir = os.path.abspath( |
| os.path.join(os.path.dirname(__file__), "..", "third_party", "GVHMR", ".cache", "huggingface") |
| ) |
| os.makedirs(cache_dir, exist_ok=True) |
| |
| try: |
| self.processor = AutoProcessor.from_pretrained( |
| model_id, local_files_only=True, cache_dir=cache_dir |
| ) |
| self.model = AutoModelForZeroShotObjectDetection.from_pretrained( |
| model_id, local_files_only=True, cache_dir=cache_dir |
| ).to(self.device) |
| print("[Labeler] Loaded Grounding DINO from cache") |
| except Exception: |
| print("[Labeler] Downloading Grounding DINO...") |
| self.processor = AutoProcessor.from_pretrained(model_id, cache_dir=cache_dir) |
| self.model = AutoModelForZeroShotObjectDetection.from_pretrained( |
| model_id, cache_dir=cache_dir |
| ).to(self.device) |
| |
| self.text_prompt = "person." |
| self.box_threshold = 0.35 |
| self.text_threshold = 0.3 |
| |
    def _iter_sampled_frames(
        self,
        video_path: str,
        end_time: Optional[float] = None
    ) -> Tuple[Tuple[int, int], Optional[float], Iterator[Tuple[np.ndarray, float]]]:
        """Stream sampled frames at target FPS without loading all frames into memory.

        Args:
            video_path: path to the input video.
            end_time: optional cutoff in seconds; decoding stops there.

        Returns:
            ((width, height), duration_or_None, iterator) where the iterator
            yields (rgb_frame, timestamp_sec) pairs spaced 1/sample_fps apart.
        """
        import subprocess

        # Probe resolution/duration with ffprobe; fall back to OpenCV on failure.
        probe_cmd = [
            'ffprobe', '-v', 'error',
            '-select_streams', 'v:0',
            '-show_entries', 'stream=width,height,duration',
            '-of', 'csv=p=0',
            video_path
        ]
        try:
            result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
            parts = result.stdout.strip().split(',')
            width = int(parts[0])
            height = int(parts[1])
            # Some containers omit the stream duration, so this may stay None.
            duration = float(parts[2]) if len(parts) > 2 and parts[2] else None
        except Exception as e:
            print(f"[Labeler] ffprobe failed: {e}, falling back to OpenCV for metadata")
            cap = cv2.VideoCapture(video_path)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
            duration = cap.get(cv2.CAP_PROP_FRAME_COUNT) / fps if fps > 0 else None
            cap.release()

        if duration and end_time is not None:
            duration = min(duration, end_time)

        # Decode with ffmpeg, resampled to sample_fps, raw RGB24 on stdout.
        ffmpeg_cmd = [
            'ffmpeg', '-v', 'warning', '-nostdin',
            '-i', video_path,
            '-vf', f'fps={self.sample_fps}',
            '-f', 'rawvideo',
            '-pix_fmt', 'rgb24',
        ]
        if end_time is not None:
            ffmpeg_cmd += ['-t', str(end_time)]
        ffmpeg_cmd.append('pipe:1')

        print(f"[Labeler] Streaming frames at {self.sample_fps} fps using ffmpeg...")
        process = subprocess.Popen(
            ffmpeg_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL
        )

        # Bytes per decoded frame: width * height * 3 channels (RGB24).
        frame_size = width * height * 3

        def iterator():
            # Lazily read fixed-size frames from the pipe; timestamps are derived
            # from the output frame index since resampled frames are uniform.
            idx = 0
            try:
                while True:
                    raw = process.stdout.read(frame_size)
                    if raw is None or len(raw) < frame_size:
                        # EOF or a truncated trailing frame.
                        break
                    frame = np.frombuffer(raw, np.uint8).reshape((height, width, 3))
                    ts = idx / self.sample_fps
                    idx += 1
                    yield frame, ts
            finally:
                # Reap the ffmpeg child even if the consumer stops iterating early.
                if process.stdout:
                    process.stdout.close()
                process.wait()

        return (width, height), duration, iterator()
| |
| def _nms(self, detections: List[Detection], iou_threshold: float = 0.5) -> List[Detection]: |
| """Apply Non-Maximum Suppression to filter overlapping/contained detections.""" |
| if len(detections) <= 1: |
| return detections |
| |
| |
| sorted_dets = sorted(detections, key=lambda d: d.confidence, reverse=True) |
| |
| def compute_iou(box1, box2): |
| x1 = max(box1[0], box2[0]) |
| y1 = max(box1[1], box2[1]) |
| x2 = min(box1[2], box2[2]) |
| y2 = min(box1[3], box2[3]) |
| inter_area = max(0, x2 - x1) * max(0, y2 - y1) |
| box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) |
| box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) |
| union_area = box1_area + box2_area - inter_area |
| return inter_area / union_area if union_area > 0 else 0 |
|
|
| def compute_overlap_small(box1, box2): |
| """Intersection over smaller area; higher when one box sits on the same person.""" |
| x1 = max(box1[0], box2[0]) |
| y1 = max(box1[1], box2[1]) |
| x2 = min(box1[2], box2[2]) |
| y2 = min(box1[3], box2[3]) |
| inter_area = max(0, x2 - x1) * max(0, y2 - y1) |
| box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) |
| box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) |
| small_area = min(box1_area, box2_area) |
| return inter_area / small_area if small_area > 0 else 0 |
|
|
| def box_diag(box): |
| width = max(0, box[2] - box[0]) |
| height = max(0, box[3] - box[1]) |
| return np.sqrt(width * width + height * height) |
|
|
| def is_contained(box_small, box_large, threshold=0.7): |
| """Check if box_small is mostly contained within box_large.""" |
| x1 = max(box_small[0], box_large[0]) |
| y1 = max(box_small[1], box_large[1]) |
| x2 = min(box_small[2], box_large[2]) |
| y2 = min(box_small[3], box_large[3]) |
| inter_area = max(0, x2 - x1) * max(0, y2 - y1) |
| small_area = (box_small[2] - box_small[0]) * (box_small[3] - box_small[1]) |
| if small_area <= 0: |
| return False |
| return (inter_area / small_area) >= threshold |
|
|
| def is_near_duplicate(box1, box2, overlap_threshold=0.3, center_ratio=0.5): |
| """Suppress boxes that likely describe the same person with weak IoU.""" |
| overlap_small = compute_overlap_small(box1, box2) |
| if overlap_small < overlap_threshold: |
| return False |
| c1 = ((box1[0] + box1[2]) / 2, (box1[1] + box1[3]) / 2) |
| c2 = ((box2[0] + box2[2]) / 2, (box2[1] + box2[3]) / 2) |
| dist = np.sqrt((c1[0] - c2[0])**2 + (c1[1] - c2[1])**2) |
| max_diag = max(box_diag(box1), box_diag(box2)) |
| return dist <= (center_ratio * max_diag) |
| |
| keep = [] |
| while sorted_dets: |
| best = sorted_dets.pop(0) |
| keep.append(best) |
| |
| sorted_dets = [d for d in sorted_dets |
| if compute_iou(best.bbox_xyxy, d.bbox_xyxy) < iou_threshold |
| and not is_contained(d.bbox_xyxy, best.bbox_xyxy) |
| and not is_near_duplicate(best.bbox_xyxy, d.bbox_xyxy)] |
| |
| return keep |
|
|
| def _dedupe_nearby( |
| self, |
| detections: List[Detection], |
| overlap_threshold: Optional[float] = None, |
| iou_threshold: Optional[float] = None, |
| center_ratio: Optional[float] = None, |
| center_only_ratio: Optional[float] = None, |
| area_ratio: Optional[float] = None |
| ) -> List[Detection]: |
| """Merge nearby detections that likely describe the same person.""" |
| if len(detections) <= 1: |
| return detections |
|
|
| overlap_threshold = self.DUPLICATE_OVERLAP_THRESHOLD if overlap_threshold is None else overlap_threshold |
| iou_threshold = self.DUPLICATE_IOU_THRESHOLD if iou_threshold is None else iou_threshold |
| center_ratio = self.DUPLICATE_CENTER_RATIO if center_ratio is None else center_ratio |
| center_only_ratio = 0.0 if center_only_ratio is None else center_only_ratio |
| area_ratio = self.DUPLICATE_AREA_RATIO if area_ratio is None else area_ratio |
|
|
| def compute_iou(box1, box2): |
| x1 = max(box1[0], box2[0]) |
| y1 = max(box1[1], box2[1]) |
| x2 = min(box1[2], box2[2]) |
| y2 = min(box1[3], box2[3]) |
| inter_area = max(0, x2 - x1) * max(0, y2 - y1) |
| box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) |
| box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) |
| union_area = box1_area + box2_area - inter_area |
| return inter_area / union_area if union_area > 0 else 0 |
|
|
| def compute_overlap_small(box1, box2): |
| x1 = max(box1[0], box2[0]) |
| y1 = max(box1[1], box2[1]) |
| x2 = min(box1[2], box2[2]) |
| y2 = min(box1[3], box2[3]) |
| inter_area = max(0, x2 - x1) * max(0, y2 - y1) |
| box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) |
| box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) |
| small_area = min(box1_area, box2_area) |
| return inter_area / small_area if small_area > 0 else 0 |
|
|
| def box_diag(box): |
| width = max(0, box[2] - box[0]) |
| height = max(0, box[3] - box[1]) |
| return np.sqrt(width * width + height * height) |
|
|
| def center_distance(box1, box2): |
| c1 = ((box1[0] + box1[2]) / 2, (box1[1] + box1[3]) / 2) |
| c2 = ((box2[0] + box2[2]) / 2, (box2[1] + box2[3]) / 2) |
| return np.sqrt((c1[0] - c2[0])**2 + (c1[1] - c2[1])**2) |
|
|
| n = len(detections) |
| parent = list(range(n)) |
|
|
| def find(x): |
| while parent[x] != x: |
| parent[x] = parent[parent[x]] |
| x = parent[x] |
| return x |
|
|
| def union(a, b): |
| ra, rb = find(a), find(b) |
| if ra != rb: |
| parent[rb] = ra |
|
|
| def should_merge(box1, box2): |
| iou = compute_iou(box1, box2) |
| overlap_small = compute_overlap_small(box1, box2) |
| max_diag = max(box_diag(box1), box_diag(box2)) |
| if max_diag <= 0: |
| return False |
| dist = center_distance(box1, box2) |
| if iou >= iou_threshold or overlap_small >= overlap_threshold: |
| return dist <= (center_ratio * max_diag) |
| if center_only_ratio > 0.0: |
| area1 = max(0.0, (box1[2] - box1[0]) * (box1[3] - box1[1])) |
| area2 = max(0.0, (box2[2] - box2[0]) * (box2[3] - box2[1])) |
| if area1 <= 0 or area2 <= 0: |
| return False |
| ratio = max(area1, area2) / min(area1, area2) |
| if ratio <= area_ratio: |
| return dist <= (center_only_ratio * max_diag) |
| return False |
|
|
| for i in range(n): |
| box_i = detections[i].bbox_xyxy |
| for j in range(i + 1, n): |
| box_j = detections[j].bbox_xyxy |
| if should_merge(box_i, box_j): |
| union(i, j) |
|
|
| best_by_root = {} |
| for idx, det in enumerate(detections): |
| root = find(idx) |
| if root not in best_by_root or det.confidence > best_by_root[root].confidence: |
| best_by_root[root] = det |
|
|
| return list(best_by_root.values()) |
| |
| def _detect_frame(self, frame: np.ndarray, width: int, height: int) -> List[Detection]: |
| """Run DINO detection on a single frame.""" |
| frame_area = width * height |
| img = Image.fromarray(frame) |
|
|
| with torch.inference_mode(): |
| inputs = self.processor( |
| images=img, |
| text=self.text_prompt, |
| return_tensors="pt" |
| ).to(self.device) |
|
|
| outputs = self.model(**inputs) |
|
|
| results = self.processor.post_process_grounded_object_detection( |
| outputs, |
| inputs.input_ids, |
| threshold=self.box_threshold, |
| text_threshold=self.text_threshold, |
| target_sizes=[img.size[::-1]] |
| ) |
|
|
| frame_dets = [] |
| if len(results) > 0 and 'boxes' in results[0]: |
| boxes = results[0]['boxes'].cpu().numpy() |
| scores = results[0]['scores'].cpu().numpy() |
|
|
| for box, score in zip(boxes, scores): |
| x1, y1, x2, y2 = box |
| area = (x2 - x1) * (y2 - y1) |
| area_pct = area / frame_area |
|
|
| if area_pct < self.MIN_BBOX_AREA_PCT: |
| continue |
|
|
| frame_dets.append(Detection( |
| bbox_xyxy=[float(x1), float(y1), float(x2), float(y2)], |
| confidence=float(score), |
| area_pct=float(area_pct) |
| )) |
|
|
| frame_dets = self._nms(frame_dets, iou_threshold=0.5) |
| frame_dets = self._dedupe_nearby(frame_dets) |
| return frame_dets |
|
|
| def _detect_batch(self, frames: List[np.ndarray], width: int, height: int) -> List[List[Detection]]: |
| """Run DINO detection on a list of frames.""" |
| all_detections = [] |
| for frame in tqdm(frames, desc="DINO detection"): |
| all_detections.append(self._detect_frame(frame, width, height)) |
| return all_detections |
|
|
| def _save_debug_frame( |
| self, |
| frame: np.ndarray, |
| frame_idx: int, |
| timestamp: float, |
| detections: List[Detection], |
| out_dir: str, |
| save_all: bool = False |
| ) -> Optional[Dict]: |
| """Save a single debug frame with detection boxes drawn.""" |
| if not save_all and len(detections) <= 1: |
| return None |
|
|
| frame_dir = os.path.join(out_dir, "frames") |
| os.makedirs(frame_dir, exist_ok=True) |
|
|
| colors = [ |
| (0, 255, 0), |
| (0, 0, 255), |
| (255, 0, 0), |
| (0, 255, 255), |
| (255, 0, 255), |
| (255, 255, 0), |
| ] |
|
|
| frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) |
| h, w = frame_bgr.shape[:2] |
|
|
| for det_idx, det in enumerate(detections): |
| x1, y1, x2, y2 = det.bbox_xyxy |
| x1 = int(max(0, min(w - 1, round(x1)))) |
| y1 = int(max(0, min(h - 1, round(y1)))) |
| x2 = int(max(0, min(w - 1, round(x2)))) |
| y2 = int(max(0, min(h - 1, round(y2)))) |
|
|
| color = colors[det_idx % len(colors)] |
| cv2.rectangle(frame_bgr, (x1, y1), (x2, y2), color, 2) |
| label = f"{det_idx} {det.confidence:.2f}" |
| cv2.putText( |
| frame_bgr, |
| label, |
| (x1 + 4, max(10, y1 - 6)), |
| cv2.FONT_HERSHEY_SIMPLEX, |
| 0.5, |
| color, |
| 1, |
| cv2.LINE_AA |
| ) |
|
|
| filename = f"frame_{frame_idx:06d}_t{timestamp:.2f}_n{len(detections)}.jpg" |
| out_path = os.path.join(frame_dir, filename) |
| cv2.imwrite(out_path, frame_bgr) |
|
|
| return { |
| "frame_idx": frame_idx, |
| "timestamp": float(timestamp), |
| "num_detections": len(detections), |
| "image": filename, |
| "detections": [ |
| { |
| "bbox_xyxy": det.bbox_xyxy, |
| "confidence": det.confidence, |
| "area_pct": det.area_pct |
| } for det in detections |
| ] |
| } |
|
|
| def _filter_frame_detections_with_vitpose( |
| self, |
| frame: np.ndarray, |
| detections: List[Detection] |
| ) -> List[Detection]: |
| """Filter detections that look like partial people (head/limbs).""" |
| if not self.vitpose_validator or not detections: |
| return detections |
| if not self.vitpose_filter_all and len(detections) <= 1: |
| return detections |
|
|
| keep = [] |
| for det in detections: |
| if self.vitpose_validator.is_complete(frame, det.bbox_xyxy): |
| keep.append(det) |
| return keep |
| |
    def _build_tracks(self, detections: List[List[Detection]], timestamps: List[float],
                      img_width: int = 1920, img_height: int = 1080) -> Dict[int, Dict]:
        """Build detection tracks over time using center distance matching.

        IoU-based tracking fails at 1fps because the person moves too much.
        Instead, use center distance - match to the nearest previous detection.

        Returns:
            dict mapping track_id -> {'timestamps', 'bboxes', 'confidences', 'areas'}.
        """
        tracks = {}
        next_track_id = 0
        active_tracks = {}  # track_id -> last matched bbox

        # A match is allowed up to half the image diagonal between samples.
        img_diagonal = np.sqrt(img_width**2 + img_height**2)
        MAX_DISTANCE = img_diagonal * 0.5

        def bbox_center(box):
            return ((box[0] + box[2]) / 2, (box[1] + box[3]) / 2)

        def center_distance(box1, box2):
            c1 = bbox_center(box1)
            c2 = bbox_center(box2)
            return np.sqrt((c1[0] - c2[0])**2 + (c1[1] - c2[1])**2)

        for frame_idx, (frame_dets, ts) in enumerate(zip(detections, timestamps)):
            matched_tracks = set()
            unmatched_dets = list(range(len(frame_dets)))

            # Greedily extend each active track with its nearest unmatched detection.
            # NOTE(review): matching is greedy in dict-iteration order, so an earlier
            # track can claim a detection a later track fits better.
            for track_id, last_bbox in list(active_tracks.items()):
                best_dist = float('inf')
                best_det_idx = None

                for det_idx in unmatched_dets:
                    dist = center_distance(last_bbox, frame_dets[det_idx].bbox_xyxy)
                    if dist < best_dist and dist <= MAX_DISTANCE:
                        best_dist = dist
                        best_det_idx = det_idx

                if best_det_idx is not None:
                    det = frame_dets[best_det_idx]
                    tracks[track_id]['timestamps'].append(ts)
                    tracks[track_id]['bboxes'].append(det.bbox_xyxy)
                    tracks[track_id]['confidences'].append(det.confidence)
                    tracks[track_id]['areas'].append(det.area_pct)
                    active_tracks[track_id] = det.bbox_xyxy
                    matched_tracks.add(track_id)
                    unmatched_dets.remove(best_det_idx)

            # Any detection left unmatched starts a brand-new track.
            for det_idx in unmatched_dets:
                det = frame_dets[det_idx]
                tracks[next_track_id] = {
                    'timestamps': [ts],
                    'bboxes': [det.bbox_xyxy],
                    'confidences': [det.confidence],
                    'areas': [det.area_pct]
                }
                active_tracks[next_track_id] = det.bbox_xyxy
                next_track_id += 1

            # Retire tracks that have gone unmatched for too long (seconds).
            stale_threshold = 3.0
            for track_id in list(active_tracks.keys()):
                if track_id not in matched_tracks:
                    last_ts = tracks[track_id]['timestamps'][-1]
                    if ts - last_ts > stale_threshold:
                        del active_tracks[track_id]

        return tracks
|
|
| def _is_dynamic_track(self, track: TrackState) -> bool: |
| """Decide dynamic/static using running variance.""" |
| if track.count < 3: |
| return True |
| center_var = (track.m2_center / max(1, track.count - 1)).sum() |
| size_var = (track.m2_size / max(1, track.count - 1)).sum() |
| total_variance = center_var + size_var |
| return total_variance >= self.STATIC_VARIANCE_THRESHOLD |
|
|
| def _update_track_stats(self, track: TrackState, bbox_xyxy: List[float]) -> None: |
| """Update running mean/variance for a track.""" |
| x1, y1, x2, y2 = bbox_xyxy |
| center = np.array([(x1 + x2) * 0.5, (y1 + y2) * 0.5], dtype=np.float32) |
| size = np.array([max(1.0, x2 - x1), max(1.0, y2 - y1)], dtype=np.float32) |
|
|
| track.count += 1 |
| delta_c = center - track.mean_center |
| track.mean_center += delta_c / track.count |
| track.m2_center += delta_c * (center - track.mean_center) |
|
|
| delta_s = size - track.mean_size |
| track.mean_size += delta_s / track.count |
| track.m2_size += delta_s * (size - track.mean_size) |
| |
| def _classify_tracks(self, tracks: Dict[int, Dict]) -> Tuple[List[int], List[int]]: |
| """Classify tracks as dynamic (real person) or static (poster/sticker).""" |
| dynamic_tracks = [] |
| static_tracks = [] |
| |
| for track_id, track in tracks.items(): |
| bboxes = np.array(track['bboxes']) |
| |
| if len(bboxes) < 3: |
| |
| dynamic_tracks.append(track_id) |
| continue |
| |
| |
| centers = (bboxes[:, :2] + bboxes[:, 2:]) / 2 |
| center_variance = np.var(centers, axis=0).sum() |
| |
| |
| sizes = bboxes[:, 2:] - bboxes[:, :2] |
| size_variance = np.var(sizes, axis=0).sum() |
| |
| total_variance = center_variance + size_variance |
| |
| if total_variance < self.STATIC_VARIANCE_THRESHOLD: |
| static_tracks.append(track_id) |
| else: |
| dynamic_tracks.append(track_id) |
| |
| return dynamic_tracks, static_tracks |
| |
    def _create_segments(
        self,
        tracks: Dict[int, Dict],
        dynamic_tracks: List[int],
        static_tracks: List[int],
        timestamps: List[float]
    ) -> List[Segment]:
        """Create time segments with labeling info.

        Bins detections into one-second buckets, labels each second (usable /
        reason), and merges consecutive seconds that share the same label into
        Segment records.
        """
        if not timestamps:
            return []

        video_duration = timestamps[-1]
        segments = []

        # sec -> {'dynamic': {track_id: best Detection}, 'static': {track_ids}}
        time_bins = defaultdict(lambda: {'dynamic': {}, 'static': set()})

        for track_id in dynamic_tracks:
            track = tracks[track_id]
            for ts, bbox, conf, area in zip(
                track['timestamps'],
                track['bboxes'],
                track['confidences'],
                track['areas']
            ):
                sec = int(ts)
                det = Detection(
                    bbox_xyxy=list(bbox),
                    confidence=float(conf),
                    area_pct=float(area)
                )
                # Keep only the highest-confidence detection per track per second.
                existing = time_bins[sec]['dynamic'].get(track_id)
                if existing is None or det.confidence > existing.confidence:
                    time_bins[sec]['dynamic'][track_id] = det

        for track_id in static_tracks:
            track = tracks[track_id]
            for ts in track['timestamps']:
                sec = int(ts)
                time_bins[sec]['static'].add(track_id)

        # Walk every second of the video, labeling each and growing segments.
        import math
        max_sec = math.ceil(video_duration)
        current_segment = None

        for sec in range(max_sec + 1):
            bin_data = time_bins.get(sec, {'dynamic': {}, 'static': set()})
            detections = list(bin_data['dynamic'].values())
            # Collapse detections from different tracks that overlap the same person.
            detections = self._dedupe_nearby(
                detections,
                center_only_ratio=self.DUPLICATE_CENTER_ONLY_RATIO,
                area_ratio=self.DUPLICATE_AREA_RATIO
            )
            n_dynamic = len(detections)
            n_static = len(bin_data['static'])
            avg_conf = np.mean([d.confidence for d in detections]) if detections else 0.0
            avg_area = np.mean([d.area_pct for d in detections]) if detections else 0.0

            # A second is usable iff exactly one confident, large-enough person.
            usable = n_dynamic == 1 and avg_conf >= self.MIN_CONFIDENCE and avg_area >= self.MIN_BBOX_AREA_PCT
            reason = None
            if n_dynamic == 0:
                reason = "no_person"
            elif n_dynamic > 1:
                reason = "multiple_persons"
            elif avg_conf < self.MIN_CONFIDENCE:
                reason = "low_confidence"
            elif avg_area < self.MIN_BBOX_AREA_PCT:
                reason = "person_too_small"

            # Extend the current segment while the label is unchanged; otherwise flush.
            if current_segment is None:
                current_segment = {
                    'start_sec': sec,
                    'end_sec': sec + 1,
                    'n_dynamic': n_dynamic,
                    'n_static': n_static,
                    'confs': [d.confidence for d in detections],
                    'areas': [d.area_pct for d in detections],
                    'usable': usable,
                    'reason': reason
                }
            elif (current_segment['n_dynamic'] == n_dynamic and
                  current_segment['usable'] == usable and
                  current_segment['reason'] == reason):

                current_segment['end_sec'] = sec + 1
                current_segment['confs'].extend([d.confidence for d in detections])
                current_segment['areas'].extend([d.area_pct for d in detections])
            else:

                segments.append(self._finalize_segment(current_segment))
                current_segment = {
                    'start_sec': sec,
                    'end_sec': sec + 1,
                    'n_dynamic': n_dynamic,
                    'n_static': n_static,
                    'confs': [d.confidence for d in detections],
                    'areas': [d.area_pct for d in detections],
                    'usable': usable,
                    'reason': reason
                }

        if current_segment:
            segments.append(self._finalize_segment(current_segment))

        return segments
| |
| def _finalize_segment(self, seg_data: Dict) -> Segment: |
| """Convert segment data to Segment dataclass.""" |
| return Segment( |
| start_sec=float(seg_data['start_sec']), |
| end_sec=float(seg_data['end_sec']), |
| dynamic_persons=int(seg_data['n_dynamic']), |
| static_detections=int(seg_data['n_static']), |
| avg_confidence=float(np.mean(seg_data['confs'])) if seg_data['confs'] else 0.0, |
| avg_bbox_area_pct=float(np.mean(seg_data['areas'])) if seg_data['areas'] else 0.0, |
| bbox_variance=0.0, |
| usable=bool(seg_data['usable']), |
| reason=seg_data['reason'] |
| ) |
| |
| def label_video( |
| self, |
| video_path: str, |
| end_time: Optional[float] = None, |
| segment_writer: Optional[Callable[[Segment], None]] = None |
| ) -> Dict: |
| """Label a single video and return results.""" |
| print(f"\n[Labeler] Processing: {video_path}") |
| |
| |
| (width, height), duration, frame_iter = self._iter_sampled_frames(video_path, end_time=end_time) |
|
|
| frame_count = 0 |
| last_ts = None |
| debug_meta = [] |
| if self.debug_dir: |
| video_tag = os.path.splitext(os.path.basename(video_path))[0] |
| out_dir = os.path.join(self.debug_dir, video_tag) |
| os.makedirs(out_dir, exist_ok=True) |
| else: |
| out_dir = None |
|
|
| total_before = 0 |
| total_after = 0 |
|
|
| active_tracks: Dict[int, TrackState] = {} |
| next_track_id = 0 |
| img_diagonal = np.sqrt(width**2 + height**2) |
| max_distance = img_diagonal * 0.5 |
|
|
| current_sec = None |
| sec_dynamic: Dict[int, Detection] = {} |
| sec_static: set = set() |
| current_segment = None |
| segments = [] |
| usable_duration = 0.0 |
| total_segments = 0 |
| pending_segments: List[Dict] = [] |
|
|
| def emit_segment(seg_data: Dict): |
| nonlocal usable_duration, total_segments |
| segment = self._finalize_segment(seg_data) |
| total_segments += 1 |
| if segment.usable: |
| usable_duration += (segment.end_sec - segment.start_sec) |
| if segment_writer: |
| segment_writer(segment) |
| else: |
| segments.append(segment) |
|
|
| def should_merge_low_conf(prev_seg: Dict, mid_seg: Dict, next_seg: Dict) -> bool: |
| if mid_seg['reason'] != "low_confidence": |
| return False |
| if (mid_seg['end_sec'] - mid_seg['start_sec']) >= self.LOW_CONF_SMOOTH_MAX_SEC: |
| return False |
| return ( |
| prev_seg['n_dynamic'] == next_seg['n_dynamic'] |
| and prev_seg['usable'] == next_seg['usable'] |
| and prev_seg['reason'] == next_seg['reason'] |
| ) |
|
|
| def merge_triplet(prev_seg: Dict, mid_seg: Dict, next_seg: Dict) -> Dict: |
| return { |
| 'start_sec': prev_seg['start_sec'], |
| 'end_sec': next_seg['end_sec'], |
| 'n_dynamic': prev_seg['n_dynamic'], |
| 'n_static': max(prev_seg['n_static'], mid_seg['n_static'], next_seg['n_static']), |
| 'confs': prev_seg['confs'] + mid_seg['confs'] + next_seg['confs'], |
| 'areas': prev_seg['areas'] + mid_seg['areas'] + next_seg['areas'], |
| 'usable': prev_seg['usable'], |
| 'reason': prev_seg['reason'] |
| } |
|
|
| def queue_segment(seg_data: Dict): |
| pending_segments.append(seg_data) |
| while len(pending_segments) >= 3: |
| prev_seg, mid_seg, next_seg = pending_segments[0], pending_segments[1], pending_segments[2] |
| if should_merge_low_conf(prev_seg, mid_seg, next_seg): |
| merged = merge_triplet(prev_seg, mid_seg, next_seg) |
| pending_segments[:3] = [merged] |
| else: |
| emit_segment(pending_segments.pop(0)) |
|
|
| def flush_pending_segments(): |
| while pending_segments: |
| emit_segment(pending_segments.pop(0)) |
|
|
| def finalize_sec(sec_idx: int, dynamic_map: Dict[int, Detection], static_set: set): |
| nonlocal current_segment |
| detections_list = list(dynamic_map.values()) |
| detections_list = self._dedupe_nearby( |
| detections_list, |
| center_only_ratio=self.DUPLICATE_CENTER_ONLY_RATIO, |
| area_ratio=self.DUPLICATE_AREA_RATIO |
| ) |
| if len(detections_list) > 1: |
| max_area = max(d.area_pct for d in detections_list) |
| detections_list = [ |
| d for d in detections_list |
| if d.area_pct >= self.MULTI_PERSON_MIN_AREA_PCT |
| and d.area_pct >= (max_area * self.MULTI_PERSON_REL_AREA) |
| ] |
| n_dynamic = len(detections_list) |
| n_static = len(static_set) |
| avg_conf = np.mean([d.confidence for d in detections_list]) if detections_list else 0.0 |
| avg_area = np.mean([d.area_pct for d in detections_list]) if detections_list else 0.0 |
|
|
| usable = n_dynamic == 1 and avg_conf >= self.MIN_CONFIDENCE and avg_area >= self.MIN_BBOX_AREA_PCT |
| reason = None |
| if n_dynamic == 0: |
| reason = "no_person" |
| elif n_dynamic > 1: |
| reason = "multiple_persons" |
| elif avg_conf < self.MIN_CONFIDENCE: |
| reason = "low_confidence" |
| elif avg_area < self.MIN_BBOX_AREA_PCT: |
| reason = "person_too_small" |
|
|
| if current_segment is None: |
| current_segment = { |
| 'start_sec': sec_idx, |
| 'end_sec': sec_idx + 1, |
| 'n_dynamic': n_dynamic, |
| 'n_static': n_static, |
| 'confs': [d.confidence for d in detections_list], |
| 'areas': [d.area_pct for d in detections_list], |
| 'usable': usable, |
| 'reason': reason |
| } |
| elif (current_segment['n_dynamic'] == n_dynamic and |
| current_segment['usable'] == usable and |
| current_segment['reason'] == reason): |
| current_segment['end_sec'] = sec_idx + 1 |
| current_segment['confs'].extend([d.confidence for d in detections_list]) |
| current_segment['areas'].extend([d.area_pct for d in detections_list]) |
| else: |
| queue_segment(current_segment) |
| current_segment = { |
| 'start_sec': sec_idx, |
| 'end_sec': sec_idx + 1, |
| 'n_dynamic': n_dynamic, |
| 'n_static': n_static, |
| 'confs': [d.confidence for d in detections_list], |
| 'areas': [d.area_pct for d in detections_list], |
| 'usable': usable, |
| 'reason': reason |
| } |
|
|
| def finalize_missing_secs(start_sec: int, end_sec: int): |
| for missing_sec in range(start_sec, end_sec + 1): |
| finalize_sec(missing_sec, {}, set()) |
|
|
| start_time = time.time() |
| last_log = start_time |
| for idx, (frame, ts) in enumerate(frame_iter): |
| frame_count += 1 |
| last_ts = ts |
| frame_dets = self._detect_frame(frame, width, height) |
| total_before += len(frame_dets) |
| frame_dets = self._filter_frame_detections_with_vitpose(frame, frame_dets) |
| total_after += len(frame_dets) |
|
|
| |
| matched_tracks = set() |
| unmatched_dets = list(range(len(frame_dets))) |
| assignments: Dict[int, int] = {} |
|
|
| for track_id, track in list(active_tracks.items()): |
| best_dist = float('inf') |
| best_det_idx = None |
| for det_idx in unmatched_dets: |
| det = frame_dets[det_idx] |
| x1, y1, x2, y2 = det.bbox_xyxy |
| cx = (x1 + x2) * 0.5 |
| cy = (y1 + y2) * 0.5 |
| last = track.last_bbox |
| lx = (last[0] + last[2]) * 0.5 |
| ly = (last[1] + last[3]) * 0.5 |
| dist = np.sqrt((cx - lx)**2 + (cy - ly)**2) |
| if dist < best_dist and dist <= max_distance: |
| best_dist = dist |
| best_det_idx = det_idx |
|
|
| if best_det_idx is not None: |
| det = frame_dets[best_det_idx] |
| track.last_bbox = det.bbox_xyxy |
| track.last_ts = ts |
| self._update_track_stats(track, det.bbox_xyxy) |
| matched_tracks.add(track_id) |
| assignments[best_det_idx] = track_id |
| unmatched_dets.remove(best_det_idx) |
|
|
| for det_idx in unmatched_dets: |
| det = frame_dets[det_idx] |
| x1, y1, x2, y2 = det.bbox_xyxy |
| center = np.array([(x1 + x2) * 0.5, (y1 + y2) * 0.5], dtype=np.float32) |
| size = np.array([max(1.0, x2 - x1), max(1.0, y2 - y1)], dtype=np.float32) |
| active_tracks[next_track_id] = TrackState( |
| last_bbox=det.bbox_xyxy, |
| last_ts=ts, |
| count=1, |
| mean_center=center.copy(), |
| m2_center=np.zeros_like(center), |
| mean_size=size.copy(), |
| m2_size=np.zeros_like(size) |
| ) |
| assignments[det_idx] = next_track_id |
| matched_tracks.add(next_track_id) |
| next_track_id += 1 |
|
|
| |
| stale_threshold = 3.0 |
| for track_id in list(active_tracks.keys()): |
| if track_id not in matched_tracks: |
| last_ts = active_tracks[track_id].last_ts |
| if ts - last_ts > stale_threshold: |
| del active_tracks[track_id] |
|
|
| sec = int(ts) |
| if current_sec is None: |
| current_sec = sec |
| elif sec > current_sec: |
| finalize_sec(current_sec, sec_dynamic, sec_static) |
| if sec > current_sec + 1: |
| finalize_missing_secs(current_sec + 1, sec - 1) |
| sec_dynamic = {} |
| sec_static = set() |
| current_sec = sec |
|
|
| for det_idx, det in enumerate(frame_dets): |
| track_id = assignments.get(det_idx) |
| if track_id is None or track_id not in active_tracks: |
| continue |
| if self._is_dynamic_track(active_tracks[track_id]): |
| existing = sec_dynamic.get(track_id) |
| if existing is None or det.confidence > existing.confidence: |
| sec_dynamic[track_id] = det |
| else: |
| sec_static.add(track_id) |
|
|
| if out_dir: |
| meta = self._save_debug_frame( |
| frame, |
| idx, |
| ts, |
| frame_dets, |
| out_dir, |
| save_all=self.debug_all |
| ) |
| if meta: |
| debug_meta.append(meta) |
|
|
| now = time.time() |
| if now - last_log >= 30.0: |
| elapsed = now - start_time |
| fps = frame_count / elapsed if elapsed > 0 else 0.0 |
| if duration: |
| pct = min(100.0, (ts / duration) * 100.0) if duration > 0 else 0.0 |
| print(f"[Labeler] Progress: {frame_count} frames, t={ts:.1f}s ({pct:.1f}%), {fps:.2f} fps") |
| else: |
| print(f"[Labeler] Progress: {frame_count} frames, t={ts:.1f}s, {fps:.2f} fps") |
| last_log = now |
|
|
| if current_sec is not None: |
| finalize_sec(current_sec, sec_dynamic, sec_static) |
|
|
| if current_segment: |
| queue_segment(current_segment) |
| flush_pending_segments() |
|
|
| if self.vitpose_validator: |
| print(f"[Labeler] ViTPose filtered detections: {total_before} -> {total_after}") |
|
|
| if out_dir: |
| meta_path = os.path.join(out_dir, "detections.json") |
| with open(meta_path, "w") as f: |
| json.dump(debug_meta, f, indent=2) |
|
|
| if frame_count == 0: |
| return {'video': video_path, 'error': 'No frames extracted', 'segments': []} |
| |
| |
| tracks = self._build_tracks(detections, timestamps, width, height) |
| print(f"[Labeler] Found {len(tracks)} detection tracks") |
| |
| |
| dynamic_tracks, static_tracks = self._classify_tracks(tracks) |
| print(f"[Labeler] Dynamic (person): {len(dynamic_tracks)}, Static (poster/sticker): {len(static_tracks)}") |
| |
| |
| |
| total_duration = duration if duration is not None else (last_ts if last_ts is not None else 0) |
|
|
| print(f"[Labeler] Found {usable_duration:.0f}s usable ({total_duration:.0f}s total)") |
|
|
| return { |
| 'video': os.path.abspath(video_path), |
| 'total_duration_sec': total_duration, |
| 'usable_duration_sec': usable_duration, |
| 'num_segments': total_segments if segment_writer else len(segments), |
| 'segments': [asdict(s) for s in segments] if not segment_writer else [] |
| } |
|
|
|
|
def main():
    """CLI entry point: label a single video or a directory of videos.

    Writes either a single JSON document (default) or appends segments as
    JSON Lines when --stream-jsonl is passed or the output path ends in
    ``.jsonl``.
    """
    parser = argparse.ArgumentParser(description='Label videos for GENMO training suitability')
    parser.add_argument('--video', type=str, help='Path to a single video file')
    parser.add_argument('--video-dir', type=str, help='Path to directory containing videos')
    parser.add_argument('--output', type=str, required=True, help='Output JSON file path')
    parser.add_argument('--sample-fps', type=float, default=1.0, help='Frames per second to sample (default: 1.0)')
    parser.add_argument('--end-time', type=float, default=None, help='Only process first N seconds of video')
    parser.add_argument('--debug-dir', type=str, default=None, help='Directory to save debug frames with bboxes')
    parser.add_argument('--debug-all', action='store_true', help='Save debug frames for all detections (default: only multi-person frames)')
    parser.add_argument('--vitpose-filter', action='store_true', help='Filter detections using ViTPose joint visibility')
    parser.add_argument('--vitpose-filter-all', action='store_true', help='Apply ViTPose filtering to all frames')
    parser.add_argument('--vitpose-min-joints', type=int, default=4, help='Minimum visible joints (excluding face) to keep')
    parser.add_argument('--vitpose-conf-threshold', type=float, default=0.3, help='Minimum joint confidence for ViTPose')
    parser.add_argument('--vitpose-disable-upper-lower', action='store_true', help='Disable upper/lower body joint requirement')
    parser.add_argument('--vitpose-min-vertical-span', type=float, default=0.35, help='Min joint vertical span ratio within bbox')
    parser.add_argument('--vitpose-config', type=str, default=None, help='ViTPose config path')
    parser.add_argument('--vitpose-ckpt', type=str, default=None, help='ViTPose checkpoint path')
    parser.add_argument('--stream-jsonl', action='store_true', help='Stream segments as JSON Lines (append)')

    args = parser.parse_args()

    if not args.video and not args.video_dir:
        parser.error("Must specify either --video or --video-dir")

    # Collect input videos. BUGFIX: extension matching is now case-insensitive
    # so camera-produced names like ".MP4" are not silently skipped, and the
    # directory listing is sorted for a deterministic processing order
    # (os.listdir order is arbitrary).
    video_paths = []
    if args.video:
        video_paths.append(args.video)
    if args.video_dir:
        for fname in sorted(os.listdir(args.video_dir)):
            if fname.lower().endswith(('.mp4', '.avi', '.mov', '.mkv')):
                video_paths.append(os.path.join(args.video_dir, fname))

    print(f"[Labeler] Found {len(video_paths)} video(s) to process")

    labeler = VideoLabeler(
        sample_fps=args.sample_fps,
        debug_dir=args.debug_dir,
        debug_all=args.debug_all,
        vitpose_filter=args.vitpose_filter,
        vitpose_filter_all=args.vitpose_filter_all,
        vitpose_min_joints=args.vitpose_min_joints,
        vitpose_conf_threshold=args.vitpose_conf_threshold,
        vitpose_require_upper_lower=not args.vitpose_disable_upper_lower,
        vitpose_min_vertical_span=args.vitpose_min_vertical_span,
        vitpose_config=args.vitpose_config,
        vitpose_ckpt=args.vitpose_ckpt
    )

    results = {'videos': []}
    segment_writer = None
    output_path = args.output

    # JSONL streaming is enabled explicitly or inferred from the extension.
    stream_jsonl = args.stream_jsonl or output_path.endswith(".jsonl")

    if stream_jsonl:
        os.makedirs(os.path.dirname(os.path.abspath(output_path)) or ".", exist_ok=True)
        with open(output_path, "a") as f:
            def write_segment(segment: Segment, video_path: str):
                # One JSON object per line, flushed immediately so partial runs
                # still leave usable output on disk.
                payload = {'video': os.path.abspath(video_path)}
                payload.update(asdict(segment))
                f.write(json.dumps(payload) + "\n")
                f.flush()

            segment_writer = write_segment
            for video_path in video_paths:
                result = labeler.label_video(
                    video_path,
                    end_time=args.end_time,
                    # Bind video_path as a default to avoid late-binding bugs.
                    segment_writer=lambda seg, vp=video_path: segment_writer(seg, vp)
                )
                results['videos'].append(result)
    else:
        for video_path in video_paths:
            result = labeler.label_video(video_path, end_time=args.end_time)
            results['videos'].append(result)

    if not stream_jsonl:
        with open(args.output, 'w') as f:
            json.dump(results, f, indent=2)
        print(f"\n[Labeler] Results saved to: {args.output}")
    else:
        print(f"\n[Labeler] Segments appended to: {args.output}")

    # Aggregate summary across all processed videos.
    total_usable = sum(v.get('usable_duration_sec', 0) for v in results['videos'])
    total_duration = sum(v.get('total_duration_sec', 0) for v in results['videos'])
    print(f"[Labeler] Total usable: {total_usable/3600:.2f} hours / {total_duration/3600:.2f} hours")
|
|
|
|
# Script entry point — keeps the module importable without side effects.
if __name__ == '__main__':
    main()
|
|