# vr-hmr / scripts / label_videos.py
# (Hugging Face upload residue: user "zirobtc", "Upload folder using huggingface_hub", commit 7e120dd
#  — kept as comments so the module remains valid Python.)
#!/usr/bin/env python3
"""
Video Labeling Pipeline for GENMO Training Data
Automatically labels video footage to identify clips suitable for GENMO motion capture:
- Single person in frame (no multi-person scenes)
- Person consistently visible
- Filters out false positives (posters, stickers) via motion analysis
Usage:
python label_videos.py --video path/to/video.mp4 --output labels.json
python label_videos.py --video-dir path/to/videos/ --output labels.json
"""
import os
import sys
import json
import argparse
import time
import cv2
import torch
import numpy as np
from PIL import Image
from tqdm import tqdm
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Tuple, Iterator, Callable
from collections import defaultdict
# Add GVHMR to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "third_party", "GVHMR"))
from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection
@dataclass
class Detection:
    """Single person detection in one sampled frame."""
    bbox_xyxy: List[float]  # [x1, y1, x2, y2] in pixel coordinates
    confidence: float  # detector score for this box
    area_pct: float  # bbox area as percentage of frame (stored as a 0-1 fraction; see _detect_frame)
@dataclass
class Segment:
    """A continuous segment of video with labeling info.

    One Segment covers a run of whole seconds that share the same
    (person count, usability, reason) verdict.
    """
    start_sec: float
    end_sec: float
    dynamic_persons: int  # moving-person detections counted in this span
    static_detections: int  # static (poster/sticker-like) tracks seen in this span
    avg_confidence: float  # mean detector confidence over the span
    avg_bbox_area_pct: float  # mean bbox area as fraction of the frame
    bbox_variance: float  # currently always 0.0 (see _finalize_segment TODO)
    usable: bool  # True -> exactly one confident, large-enough person
    reason: Optional[str] = None  # why unusable: "no_person", "multiple_persons", "low_confidence", "person_too_small"
@dataclass
class TrackState:
    """Streaming track state with running (Welford) variance."""
    last_bbox: List[float]  # most recent bbox [x1, y1, x2, y2]
    last_ts: float  # timestamp (seconds) of the last match
    count: int  # number of observations folded into the statistics
    mean_center: np.ndarray  # running mean of bbox center (x, y)
    m2_center: np.ndarray  # Welford M2 accumulator for center variance
    mean_size: np.ndarray  # running mean of bbox (width, height)
    m2_size: np.ndarray  # Welford M2 accumulator for size variance
class VitPoseValidator:
    """Validate that a bbox contains a complete person using ViTPose joints.

    A crop passes when enough confident non-face joints are found, the head
    is visible (optional), both upper and lower body are represented
    (optional), and the confident joints span a minimum fraction of the
    bbox height.
    """
    # COCO-style keypoint index groups.
    HEAD_KP = {0, 1, 2, 3, 4}  # nose, eyes, ears - any one visible = head visible
    EXCLUDE_KP = {0, 1, 2, 3, 4}  # nose, eyes, ears (excluded from body joint count)
    UPPER_KP = {5, 6, 7, 8, 9, 10}  # shoulders, elbows, wrists
    LOWER_KP = {11, 12, 13, 14, 15, 16}  # hips, knees, ankles

    def __init__(
        self,
        config_path: str,
        ckpt_path: str,
        device: str,
        min_joints: int,
        conf_threshold: float,
        require_upper_lower: bool,
        min_vertical_span: float,
        require_head: bool = True
    ):
        try:
            from mmpose.apis import init_model, inference_topdown
        except Exception as exc:
            raise RuntimeError(f"mmpose not available: {exc}") from exc
        # Fail fast on missing model assets.
        for kind, path in (("config", config_path), ("checkpoint", ckpt_path)):
            if not os.path.exists(path):
                raise RuntimeError(f"ViTPose {kind} not found: {path}")
        self._inference_topdown = inference_topdown
        self.pose = init_model(config_path, ckpt_path, device=device)
        self.pose.eval()
        # Validation thresholds, normalized to native types.
        self.min_joints = int(min_joints)
        self.conf_threshold = float(conf_threshold)
        self.require_upper_lower = bool(require_upper_lower)
        self.min_vertical_span = float(min_vertical_span)
        self.require_head = bool(require_head)

    @torch.no_grad()
    def is_complete(self, frame_rgb: np.ndarray, bbox_xyxy: List[float]) -> bool:
        """Return True when the bbox crop looks like a complete person."""
        x1, y1, x2, y2 = bbox_xyxy
        frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
        results = self._inference_topdown(
            self.pose,
            frame_bgr,
            bboxes=np.array([[x1, y1, x2, y2]], dtype=np.float32)
        )
        if not results:
            return False
        pred = results[0].pred_instances
        scores = np.asarray(pred.keypoint_scores[0]).reshape(-1)
        keypoints = np.asarray(pred.keypoints[0]).reshape(-1, 2)
        # Indices of keypoints above the confidence threshold.
        confident = [i for i, s in enumerate(scores) if float(s) >= self.conf_threshold]
        if self.require_head and not any(i in self.HEAD_KP for i in confident):
            return False  # Head not visible
        # Face keypoints do not count toward the body-joint quota.
        body = [i for i in confident if i not in self.EXCLUDE_KP]
        if len(body) < self.min_joints:
            return False
        if self.require_upper_lower:
            has_upper = any(i in self.UPPER_KP for i in body)
            has_lower = any(i in self.LOWER_KP for i in body)
            if not (has_upper and has_lower):
                return False
        if self.min_vertical_span > 0.0 and len(body) >= 2:
            # Confident joints must span enough of the bbox height to rule
            # out head-only / limb-only detections.
            ys = [float(keypoints[i][1]) for i in body]
            bbox_h = max(1.0, float(y2) - float(y1))
            if (max(ys) - min(ys)) / bbox_h < self.min_vertical_span:
                return False
        return True
class VideoLabeler:
    """Labels videos for GENMO training suitability.

    Streams sampled frames, detects persons with Grounding DINO, optionally
    validates them with ViTPose, tracks them across samples, and emits
    per-second usability segments.
    """
    # Thresholds
    STATIC_VARIANCE_THRESHOLD = 50.0  # px² - below this = static object
    MIN_CONFIDENCE = 0.4  # minimum mean detector confidence per second
    MIN_BBOX_AREA_PCT = 0.01  # 1% of frame
    MAX_BBOX_JUMP_RATIO = 0.5  # max center movement as ratio of bbox size
    MIN_SEGMENT_DURATION = 10.0  # seconds
    DUPLICATE_OVERLAP_THRESHOLD = 0.1  # intersection-over-smaller-area to merge duplicates
    DUPLICATE_IOU_THRESHOLD = 0.2  # IoU to merge near-duplicates
    DUPLICATE_CENTER_RATIO = 0.75  # center distance / max diagonal to merge
    DUPLICATE_CENTER_ONLY_RATIO = 0.35  # stricter ratio when merging on centers alone
    DUPLICATE_AREA_RATIO = 3.0  # max area ratio allowed for center-only merging
    MULTI_PERSON_MIN_AREA_PCT = 0.08  # ignore tiny extra persons below this frame fraction
    MULTI_PERSON_REL_AREA = 0.35  # ...or below this fraction of the largest person's area
    LOW_CONF_SMOOTH_MAX_SEC = 2.0  # smooth over low-confidence gaps shorter than this
    def __init__(
        self,
        sample_fps: float = 1.0,
        debug_dir: Optional[str] = None,
        debug_all: bool = False,
        vitpose_filter: bool = True,  # Enabled by default to filter out animals/false positives
        vitpose_filter_all: bool = True,  # Validate all detections, not just multi-person frames
        vitpose_min_joints: int = 4,
        vitpose_conf_threshold: float = 0.3,
        vitpose_require_upper_lower: bool = True,
        vitpose_min_vertical_span: float = 0.35,
        vitpose_config: Optional[str] = None,
        vitpose_ckpt: Optional[str] = None
    ):
        """Configure the labeler and load the detection models.

        Args:
            sample_fps: Frames per second to sample from each video.
            debug_dir: If set, annotated debug frames are written under this dir.
            debug_all: Save debug frames for every frame, not only multi-person ones.
            vitpose_filter: Validate detections with ViTPose joint checks.
            vitpose_filter_all: Validate every detection, not just multi-person frames.
            vitpose_min_joints: Minimum confident body joints for a valid person.
            vitpose_conf_threshold: Keypoint confidence threshold.
            vitpose_require_upper_lower: Require joints in both body halves.
            vitpose_min_vertical_span: Minimum joint span as a fraction of bbox height.
            vitpose_config: Optional override path to the ViTPose config.
            vitpose_ckpt: Optional override path to the ViTPose checkpoint.
        """
        self.sample_fps = sample_fps
        self.debug_dir = debug_dir
        self.debug_all = debug_all
        self.vitpose_filter = vitpose_filter
        self.vitpose_filter_all = vitpose_filter_all
        self.vitpose_min_joints = vitpose_min_joints
        self.vitpose_conf_threshold = vitpose_conf_threshold
        self.vitpose_require_upper_lower = vitpose_require_upper_lower
        self.vitpose_min_vertical_span = vitpose_min_vertical_span
        self.vitpose_config = vitpose_config
        self.vitpose_ckpt = vitpose_ckpt
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.vitpose_validator = None
        # Precision: bf16 on Ampere+ GPUs, fp16 otherwise.
        # NOTE(review): fp16 is also selected on CPU-only hosts - confirm
        # downstream consumers of self.dtype tolerate that.
        if torch.cuda.is_available() and torch.cuda.get_device_properties(0).major >= 8:
            self.dtype = torch.bfloat16
        else:
            self.dtype = torch.float16
        print(f"[Labeler] Device: {self.device}, Precision: {self.dtype}")
        # Initialize Grounding DINO
        self._init_dino()
        self._init_vitpose()
def _init_vitpose(self):
"""Initialize ViTPose for validation if enabled."""
if not self.vitpose_filter:
return
config_path = self.vitpose_config or os.path.join(
os.path.dirname(__file__),
"..",
"third_party",
"GVHMR",
"mmpose",
"configs",
"body_2d_keypoint",
"topdown_heatmap",
"coco",
"vitpose_huge_finetune.py"
)
ckpt_path = self.vitpose_ckpt or os.path.join(
os.path.dirname(__file__),
"..",
"third_party",
"GVHMR",
"work_dirs",
"best_coco_AP_epoch_1.pth"
)
try:
self.vitpose_validator = VitPoseValidator(
config_path=config_path,
ckpt_path=ckpt_path,
device=self.device,
min_joints=self.vitpose_min_joints,
conf_threshold=self.vitpose_conf_threshold,
require_upper_lower=self.vitpose_require_upper_lower,
min_vertical_span=self.vitpose_min_vertical_span
)
print("[Labeler] ViTPose validation enabled")
except Exception as exc:
print(f"[Labeler] ViTPose validation disabled: {exc}")
self.vitpose_validator = None
    def _init_dino(self):
        """Initialize Grounding DINO model.

        Tries the local HuggingFace cache first (offline-friendly); on any
        failure falls back to downloading the weights into the same cache.
        """
        model_id = "IDEA-Research/grounding-dino-tiny"
        # Cache lives inside the vendored GVHMR tree so repeated runs stay offline.
        cache_dir = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "third_party", "GVHMR", ".cache", "huggingface")
        )
        os.makedirs(cache_dir, exist_ok=True)
        try:
            self.processor = AutoProcessor.from_pretrained(
                model_id, local_files_only=True, cache_dir=cache_dir
            )
            self.model = AutoModelForZeroShotObjectDetection.from_pretrained(
                model_id, local_files_only=True, cache_dir=cache_dir
            ).to(self.device)
            print("[Labeler] Loaded Grounding DINO from cache")
        except Exception:
            # Cache miss or corruption: fetch from the hub into the same cache dir.
            print("[Labeler] Downloading Grounding DINO...")
            self.processor = AutoProcessor.from_pretrained(model_id, cache_dir=cache_dir)
            self.model = AutoModelForZeroShotObjectDetection.from_pretrained(
                model_id, cache_dir=cache_dir
            ).to(self.device)
        # Detection prompt and post-processing thresholds.
        self.text_prompt = "person."
        self.box_threshold = 0.35  # Raised from 0.25 to reduce false positives (animals, etc.)
        self.text_threshold = 0.3
def _iter_sampled_frames(
self,
video_path: str,
end_time: Optional[float] = None
) -> Tuple[Tuple[int, int], Optional[float], Iterator[Tuple[np.ndarray, float]]]:
"""Stream sampled frames at target FPS without loading all frames into memory."""
import subprocess
probe_cmd = [
'ffprobe', '-v', 'error',
'-select_streams', 'v:0',
'-show_entries', 'stream=width,height,duration',
'-of', 'csv=p=0',
video_path
]
try:
result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
parts = result.stdout.strip().split(',')
width = int(parts[0])
height = int(parts[1])
duration = float(parts[2]) if len(parts) > 2 and parts[2] else None
except Exception as e:
print(f"[Labeler] ffprobe failed: {e}, falling back to OpenCV for metadata")
cap = cv2.VideoCapture(video_path)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
duration = cap.get(cv2.CAP_PROP_FRAME_COUNT) / fps if fps > 0 else None
cap.release()
if duration and end_time is not None:
duration = min(duration, end_time)
ffmpeg_cmd = [
'ffmpeg', '-v', 'warning', '-nostdin',
'-i', video_path,
'-vf', f'fps={self.sample_fps}',
'-f', 'rawvideo',
'-pix_fmt', 'rgb24',
]
if end_time is not None:
ffmpeg_cmd += ['-t', str(end_time)]
ffmpeg_cmd.append('pipe:1')
print(f"[Labeler] Streaming frames at {self.sample_fps} fps using ffmpeg...")
process = subprocess.Popen(
ffmpeg_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL
)
frame_size = width * height * 3
def iterator():
idx = 0
try:
while True:
raw = process.stdout.read(frame_size)
if raw is None or len(raw) < frame_size:
break
frame = np.frombuffer(raw, np.uint8).reshape((height, width, 3))
ts = idx / self.sample_fps
idx += 1
yield frame, ts
finally:
if process.stdout:
process.stdout.close()
process.wait()
return (width, height), duration, iterator()
def _nms(self, detections: List[Detection], iou_threshold: float = 0.5) -> List[Detection]:
"""Apply Non-Maximum Suppression to filter overlapping/contained detections."""
if len(detections) <= 1:
return detections
# Sort by confidence (highest first)
sorted_dets = sorted(detections, key=lambda d: d.confidence, reverse=True)
def compute_iou(box1, box2):
x1 = max(box1[0], box2[0])
y1 = max(box1[1], box2[1])
x2 = min(box1[2], box2[2])
y2 = min(box1[3], box2[3])
inter_area = max(0, x2 - x1) * max(0, y2 - y1)
box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
union_area = box1_area + box2_area - inter_area
return inter_area / union_area if union_area > 0 else 0
def compute_overlap_small(box1, box2):
"""Intersection over smaller area; higher when one box sits on the same person."""
x1 = max(box1[0], box2[0])
y1 = max(box1[1], box2[1])
x2 = min(box1[2], box2[2])
y2 = min(box1[3], box2[3])
inter_area = max(0, x2 - x1) * max(0, y2 - y1)
box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
small_area = min(box1_area, box2_area)
return inter_area / small_area if small_area > 0 else 0
def box_diag(box):
width = max(0, box[2] - box[0])
height = max(0, box[3] - box[1])
return np.sqrt(width * width + height * height)
def is_contained(box_small, box_large, threshold=0.7):
"""Check if box_small is mostly contained within box_large."""
x1 = max(box_small[0], box_large[0])
y1 = max(box_small[1], box_large[1])
x2 = min(box_small[2], box_large[2])
y2 = min(box_small[3], box_large[3])
inter_area = max(0, x2 - x1) * max(0, y2 - y1)
small_area = (box_small[2] - box_small[0]) * (box_small[3] - box_small[1])
if small_area <= 0:
return False
return (inter_area / small_area) >= threshold
def is_near_duplicate(box1, box2, overlap_threshold=0.3, center_ratio=0.5):
"""Suppress boxes that likely describe the same person with weak IoU."""
overlap_small = compute_overlap_small(box1, box2)
if overlap_small < overlap_threshold:
return False
c1 = ((box1[0] + box1[2]) / 2, (box1[1] + box1[3]) / 2)
c2 = ((box2[0] + box2[2]) / 2, (box2[1] + box2[3]) / 2)
dist = np.sqrt((c1[0] - c2[0])**2 + (c1[1] - c2[1])**2)
max_diag = max(box_diag(box1), box_diag(box2))
return dist <= (center_ratio * max_diag)
keep = []
while sorted_dets:
best = sorted_dets.pop(0)
keep.append(best)
# Remove detections that overlap OR are contained within the best detection
sorted_dets = [d for d in sorted_dets
if compute_iou(best.bbox_xyxy, d.bbox_xyxy) < iou_threshold
and not is_contained(d.bbox_xyxy, best.bbox_xyxy)
and not is_near_duplicate(best.bbox_xyxy, d.bbox_xyxy)]
return keep
def _dedupe_nearby(
self,
detections: List[Detection],
overlap_threshold: Optional[float] = None,
iou_threshold: Optional[float] = None,
center_ratio: Optional[float] = None,
center_only_ratio: Optional[float] = None,
area_ratio: Optional[float] = None
) -> List[Detection]:
"""Merge nearby detections that likely describe the same person."""
if len(detections) <= 1:
return detections
overlap_threshold = self.DUPLICATE_OVERLAP_THRESHOLD if overlap_threshold is None else overlap_threshold
iou_threshold = self.DUPLICATE_IOU_THRESHOLD if iou_threshold is None else iou_threshold
center_ratio = self.DUPLICATE_CENTER_RATIO if center_ratio is None else center_ratio
center_only_ratio = 0.0 if center_only_ratio is None else center_only_ratio
area_ratio = self.DUPLICATE_AREA_RATIO if area_ratio is None else area_ratio
def compute_iou(box1, box2):
x1 = max(box1[0], box2[0])
y1 = max(box1[1], box2[1])
x2 = min(box1[2], box2[2])
y2 = min(box1[3], box2[3])
inter_area = max(0, x2 - x1) * max(0, y2 - y1)
box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
union_area = box1_area + box2_area - inter_area
return inter_area / union_area if union_area > 0 else 0
def compute_overlap_small(box1, box2):
x1 = max(box1[0], box2[0])
y1 = max(box1[1], box2[1])
x2 = min(box1[2], box2[2])
y2 = min(box1[3], box2[3])
inter_area = max(0, x2 - x1) * max(0, y2 - y1)
box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
small_area = min(box1_area, box2_area)
return inter_area / small_area if small_area > 0 else 0
def box_diag(box):
width = max(0, box[2] - box[0])
height = max(0, box[3] - box[1])
return np.sqrt(width * width + height * height)
def center_distance(box1, box2):
c1 = ((box1[0] + box1[2]) / 2, (box1[1] + box1[3]) / 2)
c2 = ((box2[0] + box2[2]) / 2, (box2[1] + box2[3]) / 2)
return np.sqrt((c1[0] - c2[0])**2 + (c1[1] - c2[1])**2)
n = len(detections)
parent = list(range(n))
def find(x):
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(a, b):
ra, rb = find(a), find(b)
if ra != rb:
parent[rb] = ra
def should_merge(box1, box2):
iou = compute_iou(box1, box2)
overlap_small = compute_overlap_small(box1, box2)
max_diag = max(box_diag(box1), box_diag(box2))
if max_diag <= 0:
return False
dist = center_distance(box1, box2)
if iou >= iou_threshold or overlap_small >= overlap_threshold:
return dist <= (center_ratio * max_diag)
if center_only_ratio > 0.0:
area1 = max(0.0, (box1[2] - box1[0]) * (box1[3] - box1[1]))
area2 = max(0.0, (box2[2] - box2[0]) * (box2[3] - box2[1]))
if area1 <= 0 or area2 <= 0:
return False
ratio = max(area1, area2) / min(area1, area2)
if ratio <= area_ratio:
return dist <= (center_only_ratio * max_diag)
return False
for i in range(n):
box_i = detections[i].bbox_xyxy
for j in range(i + 1, n):
box_j = detections[j].bbox_xyxy
if should_merge(box_i, box_j):
union(i, j)
best_by_root = {}
for idx, det in enumerate(detections):
root = find(idx)
if root not in best_by_root or det.confidence > best_by_root[root].confidence:
best_by_root[root] = det
return list(best_by_root.values())
    def _detect_frame(self, frame: np.ndarray, width: int, height: int) -> List[Detection]:
        """Run DINO detection on a single frame.

        Drops boxes smaller than MIN_BBOX_AREA_PCT of the frame, then applies
        NMS and near-duplicate merging so each person yields one box.
        """
        frame_area = width * height
        img = Image.fromarray(frame)
        with torch.inference_mode():
            inputs = self.processor(
                images=img,
                text=self.text_prompt,
                return_tensors="pt"
            ).to(self.device)
            outputs = self.model(**inputs)
            results = self.processor.post_process_grounded_object_detection(
                outputs,
                inputs.input_ids,
                threshold=self.box_threshold,
                text_threshold=self.text_threshold,
                target_sizes=[img.size[::-1]]  # (height, width)
            )
        frame_dets = []
        if len(results) > 0 and 'boxes' in results[0]:
            boxes = results[0]['boxes'].cpu().numpy()
            scores = results[0]['scores'].cpu().numpy()
            for box, score in zip(boxes, scores):
                x1, y1, x2, y2 = box
                area = (x2 - x1) * (y2 - y1)
                area_pct = area / frame_area  # fraction of the frame, 0-1
                if area_pct < self.MIN_BBOX_AREA_PCT:
                    continue  # too small to be a usable subject
                frame_dets.append(Detection(
                    bbox_xyxy=[float(x1), float(y1), float(x2), float(y2)],
                    confidence=float(score),
                    area_pct=float(area_pct)
                ))
        # Collapse overlapping / duplicate boxes for the same person.
        frame_dets = self._nms(frame_dets, iou_threshold=0.5)
        frame_dets = self._dedupe_nearby(frame_dets)
        return frame_dets
def _detect_batch(self, frames: List[np.ndarray], width: int, height: int) -> List[List[Detection]]:
"""Run DINO detection on a list of frames."""
all_detections = []
for frame in tqdm(frames, desc="DINO detection"):
all_detections.append(self._detect_frame(frame, width, height))
return all_detections
    def _save_debug_frame(
        self,
        frame: np.ndarray,
        frame_idx: int,
        timestamp: float,
        detections: List[Detection],
        out_dir: str,
        save_all: bool = False
    ) -> Optional[Dict]:
        """Save a single debug frame with detection boxes drawn.

        By default only multi-person frames (>1 detection) are written.
        Returns a metadata dict describing the saved image, or None when
        the frame was skipped.
        """
        if not save_all and len(detections) <= 1:
            return None
        frame_dir = os.path.join(out_dir, "frames")
        os.makedirs(frame_dir, exist_ok=True)
        # Distinct BGR colors, cycled by detection index.
        colors = [
            (0, 255, 0),
            (0, 0, 255),
            (255, 0, 0),
            (0, 255, 255),
            (255, 0, 255),
            (255, 255, 0),
        ]
        frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        h, w = frame_bgr.shape[:2]
        for det_idx, det in enumerate(detections):
            x1, y1, x2, y2 = det.bbox_xyxy
            # Clamp coordinates into the image so drawing never goes out of bounds.
            x1 = int(max(0, min(w - 1, round(x1))))
            y1 = int(max(0, min(h - 1, round(y1))))
            x2 = int(max(0, min(w - 1, round(x2))))
            y2 = int(max(0, min(h - 1, round(y2))))
            color = colors[det_idx % len(colors)]
            cv2.rectangle(frame_bgr, (x1, y1), (x2, y2), color, 2)
            label = f"{det_idx} {det.confidence:.2f}"
            cv2.putText(
                frame_bgr,
                label,
                (x1 + 4, max(10, y1 - 6)),  # keep the label inside the frame
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                color,
                1,
                cv2.LINE_AA
            )
        # Filename encodes index, timestamp, and detection count for grepping.
        filename = f"frame_{frame_idx:06d}_t{timestamp:.2f}_n{len(detections)}.jpg"
        out_path = os.path.join(frame_dir, filename)
        cv2.imwrite(out_path, frame_bgr)
        # Metadata mirrors what was drawn, for a debug index JSON.
        return {
            "frame_idx": frame_idx,
            "timestamp": float(timestamp),
            "num_detections": len(detections),
            "image": filename,
            "detections": [
                {
                    "bbox_xyxy": det.bbox_xyxy,
                    "confidence": det.confidence,
                    "area_pct": det.area_pct
                } for det in detections
            ]
        }
def _filter_frame_detections_with_vitpose(
self,
frame: np.ndarray,
detections: List[Detection]
) -> List[Detection]:
"""Filter detections that look like partial people (head/limbs)."""
if not self.vitpose_validator or not detections:
return detections
if not self.vitpose_filter_all and len(detections) <= 1:
return detections
keep = []
for det in detections:
if self.vitpose_validator.is_complete(frame, det.bbox_xyxy):
keep.append(det)
return keep
def _build_tracks(self, detections: List[List[Detection]], timestamps: List[float],
img_width: int = 1920, img_height: int = 1080) -> Dict[int, Dict]:
"""Build detection tracks over time using center distance matching.
IoU-based tracking fails at 1fps because the person moves too much.
Instead, use center distance - match to the nearest previous detection.
"""
tracks = {} # track_id -> {timestamps, bboxes, confidences}
next_track_id = 0
active_tracks = {} # track_id -> last_bbox
# Max distance threshold: 50% of image diagonal (for fast motion at 1fps)
img_diagonal = np.sqrt(img_width**2 + img_height**2)
MAX_DISTANCE = img_diagonal * 0.5
def bbox_center(box):
return ((box[0] + box[2]) / 2, (box[1] + box[3]) / 2)
def center_distance(box1, box2):
c1 = bbox_center(box1)
c2 = bbox_center(box2)
return np.sqrt((c1[0] - c2[0])**2 + (c1[1] - c2[1])**2)
for frame_idx, (frame_dets, ts) in enumerate(zip(detections, timestamps)):
matched_tracks = set()
unmatched_dets = list(range(len(frame_dets)))
# Match detections to existing tracks by nearest center
for track_id, last_bbox in list(active_tracks.items()):
best_dist = float('inf')
best_det_idx = None
for det_idx in unmatched_dets:
dist = center_distance(last_bbox, frame_dets[det_idx].bbox_xyxy)
if dist < best_dist and dist <= MAX_DISTANCE:
best_dist = dist
best_det_idx = det_idx
if best_det_idx is not None:
det = frame_dets[best_det_idx]
tracks[track_id]['timestamps'].append(ts)
tracks[track_id]['bboxes'].append(det.bbox_xyxy)
tracks[track_id]['confidences'].append(det.confidence)
tracks[track_id]['areas'].append(det.area_pct)
active_tracks[track_id] = det.bbox_xyxy
matched_tracks.add(track_id)
unmatched_dets.remove(best_det_idx)
# Create new tracks for unmatched detections
for det_idx in unmatched_dets:
det = frame_dets[det_idx]
tracks[next_track_id] = {
'timestamps': [ts],
'bboxes': [det.bbox_xyxy],
'confidences': [det.confidence],
'areas': [det.area_pct]
}
active_tracks[next_track_id] = det.bbox_xyxy
next_track_id += 1
# Remove stale tracks (not seen in 3 seconds)
stale_threshold = 3.0
for track_id in list(active_tracks.keys()):
if track_id not in matched_tracks:
last_ts = tracks[track_id]['timestamps'][-1]
if ts - last_ts > stale_threshold:
del active_tracks[track_id]
return tracks
def _is_dynamic_track(self, track: TrackState) -> bool:
"""Decide dynamic/static using running variance."""
if track.count < 3:
return True
center_var = (track.m2_center / max(1, track.count - 1)).sum()
size_var = (track.m2_size / max(1, track.count - 1)).sum()
total_variance = center_var + size_var
return total_variance >= self.STATIC_VARIANCE_THRESHOLD
def _update_track_stats(self, track: TrackState, bbox_xyxy: List[float]) -> None:
"""Update running mean/variance for a track."""
x1, y1, x2, y2 = bbox_xyxy
center = np.array([(x1 + x2) * 0.5, (y1 + y2) * 0.5], dtype=np.float32)
size = np.array([max(1.0, x2 - x1), max(1.0, y2 - y1)], dtype=np.float32)
track.count += 1
delta_c = center - track.mean_center
track.mean_center += delta_c / track.count
track.m2_center += delta_c * (center - track.mean_center)
delta_s = size - track.mean_size
track.mean_size += delta_s / track.count
track.m2_size += delta_s * (size - track.mean_size)
def _classify_tracks(self, tracks: Dict[int, Dict]) -> Tuple[List[int], List[int]]:
"""Classify tracks as dynamic (real person) or static (poster/sticker)."""
dynamic_tracks = []
static_tracks = []
for track_id, track in tracks.items():
bboxes = np.array(track['bboxes'])
if len(bboxes) < 3:
# Too short to classify reliably - assume dynamic (real person)
dynamic_tracks.append(track_id)
continue
# Compute bbox center variance
centers = (bboxes[:, :2] + bboxes[:, 2:]) / 2 # (N, 2)
center_variance = np.var(centers, axis=0).sum() # px²
# Also check if bbox size changes (person moving closer/farther)
sizes = bboxes[:, 2:] - bboxes[:, :2] # (N, 2) widths and heights
size_variance = np.var(sizes, axis=0).sum()
total_variance = center_variance + size_variance
if total_variance < self.STATIC_VARIANCE_THRESHOLD:
static_tracks.append(track_id)
else:
dynamic_tracks.append(track_id)
return dynamic_tracks, static_tracks
    def _create_segments(
        self,
        tracks: Dict[int, Dict],
        dynamic_tracks: List[int],
        static_tracks: List[int],
        timestamps: List[float]
    ) -> List[Segment]:
        """Create time segments with labeling info.

        Bins every dynamic/static track observation into whole seconds,
        dedupes per-second detections, then merges consecutive seconds that
        share (person count, usability, reason) into Segment records.

        NOTE(review): unlike the streaming finalize_sec path in label_video,
        this batch path applies no MULTI_PERSON_* area filtering before
        counting persons - confirm whether that difference is intentional.
        """
        if not timestamps:
            return []
        video_duration = timestamps[-1]
        segments = []
        # Build per-second person count: sec -> {'dynamic': {track_id: best Detection}, 'static': {ids}}
        time_bins = defaultdict(lambda: {'dynamic': {}, 'static': set()})
        for track_id in dynamic_tracks:
            track = tracks[track_id]
            for ts, bbox, conf, area in zip(
                track['timestamps'],
                track['bboxes'],
                track['confidences'],
                track['areas']
            ):
                sec = int(ts)
                det = Detection(
                    bbox_xyxy=list(bbox),
                    confidence=float(conf),
                    area_pct=float(area)
                )
                # Keep only the most confident observation of each track per second.
                existing = time_bins[sec]['dynamic'].get(track_id)
                if existing is None or det.confidence > existing.confidence:
                    time_bins[sec]['dynamic'][track_id] = det
        for track_id in static_tracks:
            track = tracks[track_id]
            for ts in track['timestamps']:
                sec = int(ts)
                time_bins[sec]['static'].add(track_id)
        # Merge consecutive seconds with same characteristics
        # Use ceil to ensure we cover the full video duration without gaps
        import math
        max_sec = math.ceil(video_duration)
        current_segment = None
        for sec in range(max_sec + 1):
            # .get avoids materializing empty bins in the defaultdict.
            bin_data = time_bins.get(sec, {'dynamic': {}, 'static': set()})
            detections = list(bin_data['dynamic'].values())
            detections = self._dedupe_nearby(
                detections,
                center_only_ratio=self.DUPLICATE_CENTER_ONLY_RATIO,
                area_ratio=self.DUPLICATE_AREA_RATIO
            )
            n_dynamic = len(detections)
            n_static = len(bin_data['static'])
            avg_conf = np.mean([d.confidence for d in detections]) if detections else 0.0
            avg_area = np.mean([d.area_pct for d in detections]) if detections else 0.0
            # Determine usability: exactly one confident, large-enough person.
            usable = n_dynamic == 1 and avg_conf >= self.MIN_CONFIDENCE and avg_area >= self.MIN_BBOX_AREA_PCT
            reason = None
            if n_dynamic == 0:
                reason = "no_person"
            elif n_dynamic > 1:
                reason = "multiple_persons"
            elif avg_conf < self.MIN_CONFIDENCE:
                reason = "low_confidence"
            elif avg_area < self.MIN_BBOX_AREA_PCT:
                reason = "person_too_small"
            # Check if we should start a new segment
            if current_segment is None:
                current_segment = {
                    'start_sec': sec,
                    'end_sec': sec + 1,
                    'n_dynamic': n_dynamic,
                    'n_static': n_static,
                    'confs': [d.confidence for d in detections],
                    'areas': [d.area_pct for d in detections],
                    'usable': usable,
                    'reason': reason
                }
            elif (current_segment['n_dynamic'] == n_dynamic and
                  current_segment['usable'] == usable and
                  current_segment['reason'] == reason):
                # Extend current segment
                current_segment['end_sec'] = sec + 1
                current_segment['confs'].extend([d.confidence for d in detections])
                current_segment['areas'].extend([d.area_pct for d in detections])
            else:
                # Finish current segment and start new one
                segments.append(self._finalize_segment(current_segment))
                current_segment = {
                    'start_sec': sec,
                    'end_sec': sec + 1,
                    'n_dynamic': n_dynamic,
                    'n_static': n_static,
                    'confs': [d.confidence for d in detections],
                    'areas': [d.area_pct for d in detections],
                    'usable': usable,
                    'reason': reason
                }
        if current_segment:
            segments.append(self._finalize_segment(current_segment))
        # Keep all segments - the usability field indicates whether each segment is good for training
        # Note: MIN_SEGMENT_DURATION is used to determine which segments count toward usable_duration,
        # but all segments are included in output for complete coverage
        return segments
def _finalize_segment(self, seg_data: Dict) -> Segment:
"""Convert segment data to Segment dataclass."""
return Segment(
start_sec=float(seg_data['start_sec']),
end_sec=float(seg_data['end_sec']),
dynamic_persons=int(seg_data['n_dynamic']),
static_detections=int(seg_data['n_static']),
avg_confidence=float(np.mean(seg_data['confs'])) if seg_data['confs'] else 0.0,
avg_bbox_area_pct=float(np.mean(seg_data['areas'])) if seg_data['areas'] else 0.0,
bbox_variance=0.0, # TODO: compute if needed
usable=bool(seg_data['usable']), # Cast to native Python bool for JSON
reason=seg_data['reason']
)
def label_video(
self,
video_path: str,
end_time: Optional[float] = None,
segment_writer: Optional[Callable[[Segment], None]] = None
) -> Dict:
"""Label a single video and return results."""
print(f"\n[Labeler] Processing: {video_path}")
# Step 1: Stream sampled frames
(width, height), duration, frame_iter = self._iter_sampled_frames(video_path, end_time=end_time)
frame_count = 0
last_ts = None
debug_meta = []
if self.debug_dir:
video_tag = os.path.splitext(os.path.basename(video_path))[0]
out_dir = os.path.join(self.debug_dir, video_tag)
os.makedirs(out_dir, exist_ok=True)
else:
out_dir = None
total_before = 0
total_after = 0
active_tracks: Dict[int, TrackState] = {}
next_track_id = 0
img_diagonal = np.sqrt(width**2 + height**2)
max_distance = img_diagonal * 0.5
current_sec = None
sec_dynamic: Dict[int, Detection] = {}
sec_static: set = set()
current_segment = None
segments = []
usable_duration = 0.0
total_segments = 0
pending_segments: List[Dict] = []
def emit_segment(seg_data: Dict):
nonlocal usable_duration, total_segments
segment = self._finalize_segment(seg_data)
total_segments += 1
if segment.usable:
usable_duration += (segment.end_sec - segment.start_sec)
if segment_writer:
segment_writer(segment)
else:
segments.append(segment)
def should_merge_low_conf(prev_seg: Dict, mid_seg: Dict, next_seg: Dict) -> bool:
if mid_seg['reason'] != "low_confidence":
return False
if (mid_seg['end_sec'] - mid_seg['start_sec']) >= self.LOW_CONF_SMOOTH_MAX_SEC:
return False
return (
prev_seg['n_dynamic'] == next_seg['n_dynamic']
and prev_seg['usable'] == next_seg['usable']
and prev_seg['reason'] == next_seg['reason']
)
def merge_triplet(prev_seg: Dict, mid_seg: Dict, next_seg: Dict) -> Dict:
return {
'start_sec': prev_seg['start_sec'],
'end_sec': next_seg['end_sec'],
'n_dynamic': prev_seg['n_dynamic'],
'n_static': max(prev_seg['n_static'], mid_seg['n_static'], next_seg['n_static']),
'confs': prev_seg['confs'] + mid_seg['confs'] + next_seg['confs'],
'areas': prev_seg['areas'] + mid_seg['areas'] + next_seg['areas'],
'usable': prev_seg['usable'],
'reason': prev_seg['reason']
}
def queue_segment(seg_data: Dict):
pending_segments.append(seg_data)
while len(pending_segments) >= 3:
prev_seg, mid_seg, next_seg = pending_segments[0], pending_segments[1], pending_segments[2]
if should_merge_low_conf(prev_seg, mid_seg, next_seg):
merged = merge_triplet(prev_seg, mid_seg, next_seg)
pending_segments[:3] = [merged]
else:
emit_segment(pending_segments.pop(0))
def flush_pending_segments():
while pending_segments:
emit_segment(pending_segments.pop(0))
def finalize_sec(sec_idx: int, dynamic_map: Dict[int, Detection], static_set: set):
nonlocal current_segment
detections_list = list(dynamic_map.values())
detections_list = self._dedupe_nearby(
detections_list,
center_only_ratio=self.DUPLICATE_CENTER_ONLY_RATIO,
area_ratio=self.DUPLICATE_AREA_RATIO
)
if len(detections_list) > 1:
max_area = max(d.area_pct for d in detections_list)
detections_list = [
d for d in detections_list
if d.area_pct >= self.MULTI_PERSON_MIN_AREA_PCT
and d.area_pct >= (max_area * self.MULTI_PERSON_REL_AREA)
]
n_dynamic = len(detections_list)
n_static = len(static_set)
avg_conf = np.mean([d.confidence for d in detections_list]) if detections_list else 0.0
avg_area = np.mean([d.area_pct for d in detections_list]) if detections_list else 0.0
usable = n_dynamic == 1 and avg_conf >= self.MIN_CONFIDENCE and avg_area >= self.MIN_BBOX_AREA_PCT
reason = None
if n_dynamic == 0:
reason = "no_person"
elif n_dynamic > 1:
reason = "multiple_persons"
elif avg_conf < self.MIN_CONFIDENCE:
reason = "low_confidence"
elif avg_area < self.MIN_BBOX_AREA_PCT:
reason = "person_too_small"
if current_segment is None:
current_segment = {
'start_sec': sec_idx,
'end_sec': sec_idx + 1,
'n_dynamic': n_dynamic,
'n_static': n_static,
'confs': [d.confidence for d in detections_list],
'areas': [d.area_pct for d in detections_list],
'usable': usable,
'reason': reason
}
elif (current_segment['n_dynamic'] == n_dynamic and
current_segment['usable'] == usable and
current_segment['reason'] == reason):
current_segment['end_sec'] = sec_idx + 1
current_segment['confs'].extend([d.confidence for d in detections_list])
current_segment['areas'].extend([d.area_pct for d in detections_list])
else:
queue_segment(current_segment)
current_segment = {
'start_sec': sec_idx,
'end_sec': sec_idx + 1,
'n_dynamic': n_dynamic,
'n_static': n_static,
'confs': [d.confidence for d in detections_list],
'areas': [d.area_pct for d in detections_list],
'usable': usable,
'reason': reason
}
def finalize_missing_secs(start_sec: int, end_sec: int):
    """Label every second in the inclusive range [start_sec, end_sec] as empty.

    Used when frame sampling jumps over whole seconds: each skipped second is
    finalized with no dynamic detections and no static tracks.
    """
    sec = start_sec
    while sec <= end_sec:
        finalize_sec(sec, {}, set())
        sec += 1
start_time = time.time()
last_log = start_time
for idx, (frame, ts) in enumerate(frame_iter):
frame_count += 1
last_ts = ts
frame_dets = self._detect_frame(frame, width, height)
total_before += len(frame_dets)
frame_dets = self._filter_frame_detections_with_vitpose(frame, frame_dets)
total_after += len(frame_dets)
# Track matching
matched_tracks = set()
unmatched_dets = list(range(len(frame_dets)))
assignments: Dict[int, int] = {}
for track_id, track in list(active_tracks.items()):
best_dist = float('inf')
best_det_idx = None
for det_idx in unmatched_dets:
det = frame_dets[det_idx]
x1, y1, x2, y2 = det.bbox_xyxy
cx = (x1 + x2) * 0.5
cy = (y1 + y2) * 0.5
last = track.last_bbox
lx = (last[0] + last[2]) * 0.5
ly = (last[1] + last[3]) * 0.5
dist = np.sqrt((cx - lx)**2 + (cy - ly)**2)
if dist < best_dist and dist <= max_distance:
best_dist = dist
best_det_idx = det_idx
if best_det_idx is not None:
det = frame_dets[best_det_idx]
track.last_bbox = det.bbox_xyxy
track.last_ts = ts
self._update_track_stats(track, det.bbox_xyxy)
matched_tracks.add(track_id)
assignments[best_det_idx] = track_id
unmatched_dets.remove(best_det_idx)
for det_idx in unmatched_dets:
det = frame_dets[det_idx]
x1, y1, x2, y2 = det.bbox_xyxy
center = np.array([(x1 + x2) * 0.5, (y1 + y2) * 0.5], dtype=np.float32)
size = np.array([max(1.0, x2 - x1), max(1.0, y2 - y1)], dtype=np.float32)
active_tracks[next_track_id] = TrackState(
last_bbox=det.bbox_xyxy,
last_ts=ts,
count=1,
mean_center=center.copy(),
m2_center=np.zeros_like(center),
mean_size=size.copy(),
m2_size=np.zeros_like(size)
)
assignments[det_idx] = next_track_id
matched_tracks.add(next_track_id)
next_track_id += 1
# Remove stale tracks (not seen in 3 seconds)
stale_threshold = 3.0
for track_id in list(active_tracks.keys()):
if track_id not in matched_tracks:
last_ts = active_tracks[track_id].last_ts
if ts - last_ts > stale_threshold:
del active_tracks[track_id]
sec = int(ts)
if current_sec is None:
current_sec = sec
elif sec > current_sec:
finalize_sec(current_sec, sec_dynamic, sec_static)
if sec > current_sec + 1:
finalize_missing_secs(current_sec + 1, sec - 1)
sec_dynamic = {}
sec_static = set()
current_sec = sec
for det_idx, det in enumerate(frame_dets):
track_id = assignments.get(det_idx)
if track_id is None or track_id not in active_tracks:
continue
if self._is_dynamic_track(active_tracks[track_id]):
existing = sec_dynamic.get(track_id)
if existing is None or det.confidence > existing.confidence:
sec_dynamic[track_id] = det
else:
sec_static.add(track_id)
if out_dir:
meta = self._save_debug_frame(
frame,
idx,
ts,
frame_dets,
out_dir,
save_all=self.debug_all
)
if meta:
debug_meta.append(meta)
now = time.time()
if now - last_log >= 30.0:
elapsed = now - start_time
fps = frame_count / elapsed if elapsed > 0 else 0.0
if duration:
pct = min(100.0, (ts / duration) * 100.0) if duration > 0 else 0.0
print(f"[Labeler] Progress: {frame_count} frames, t={ts:.1f}s ({pct:.1f}%), {fps:.2f} fps")
else:
print(f"[Labeler] Progress: {frame_count} frames, t={ts:.1f}s, {fps:.2f} fps")
last_log = now
if current_sec is not None:
finalize_sec(current_sec, sec_dynamic, sec_static)
if current_segment:
queue_segment(current_segment)
flush_pending_segments()
if self.vitpose_validator:
print(f"[Labeler] ViTPose filtered detections: {total_before} -> {total_after}")
if out_dir:
meta_path = os.path.join(out_dir, "detections.json")
with open(meta_path, "w") as f:
json.dump(debug_meta, f, indent=2)
if frame_count == 0:
return {'video': video_path, 'error': 'No frames extracted', 'segments': []}
# Step 3: Build tracks
tracks = self._build_tracks(detections, timestamps, width, height)
print(f"[Labeler] Found {len(tracks)} detection tracks")
# Step 4: Classify tracks as dynamic/static
dynamic_tracks, static_tracks = self._classify_tracks(tracks)
print(f"[Labeler] Dynamic (person): {len(dynamic_tracks)}, Static (poster/sticker): {len(static_tracks)}")
# Step 5: Create segments
# Summary
total_duration = duration if duration is not None else (last_ts if last_ts is not None else 0)
print(f"[Labeler] Found {usable_duration:.0f}s usable ({total_duration:.0f}s total)")
return {
'video': os.path.abspath(video_path),
'total_duration_sec': total_duration,
'usable_duration_sec': usable_duration,
'num_segments': total_segments if segment_writer else len(segments),
'segments': [asdict(s) for s in segments] if not segment_writer else []
}
def main():
    """CLI entry point: label one video (or every video in a directory) for
    GENMO-training suitability and write the results as JSON or JSON Lines.

    Output modes:
        - default: a single JSON document with per-video segment lists.
        - --stream-jsonl (or an output path ending in .jsonl): segments are
          appended to the output file as they are produced, one JSON object
          per line, so partial results survive interruption.
    """
    parser = argparse.ArgumentParser(description='Label videos for GENMO training suitability')
    parser.add_argument('--video', type=str, help='Path to a single video file')
    parser.add_argument('--video-dir', type=str, help='Path to directory containing videos')
    parser.add_argument('--output', type=str, required=True, help='Output JSON file path')
    parser.add_argument('--sample-fps', type=float, default=1.0, help='Frames per second to sample (default: 1.0)')
    parser.add_argument('--end-time', type=float, default=None, help='Only process first N seconds of video')
    parser.add_argument('--debug-dir', type=str, default=None, help='Directory to save debug frames with bboxes')
    parser.add_argument('--debug-all', action='store_true', help='Save debug frames for all detections (default: only multi-person frames)')
    parser.add_argument('--vitpose-filter', action='store_true', help='Filter detections using ViTPose joint visibility')
    parser.add_argument('--vitpose-filter-all', action='store_true', help='Apply ViTPose filtering to all frames')
    parser.add_argument('--vitpose-min-joints', type=int, default=4, help='Minimum visible joints (excluding face) to keep')
    parser.add_argument('--vitpose-conf-threshold', type=float, default=0.3, help='Minimum joint confidence for ViTPose')
    parser.add_argument('--vitpose-disable-upper-lower', action='store_true', help='Disable upper/lower body joint requirement')
    parser.add_argument('--vitpose-min-vertical-span', type=float, default=0.35, help='Min joint vertical span ratio within bbox')
    parser.add_argument('--vitpose-config', type=str, default=None, help='ViTPose config path')
    parser.add_argument('--vitpose-ckpt', type=str, default=None, help='ViTPose checkpoint path')
    parser.add_argument('--stream-jsonl', action='store_true', help='Stream segments as JSON Lines (append)')
    args = parser.parse_args()
    if not args.video and not args.video_dir:
        parser.error("Must specify either --video or --video-dir")
    # Collect video paths
    video_paths = []
    if args.video:
        video_paths.append(args.video)
    if args.video_dir:
        # BUGFIX: match extensions case-insensitively (previously .MP4/.MOV
        # etc. were silently skipped) and sort the listing — os.listdir order
        # is arbitrary, so this makes processing order deterministic.
        for fname in sorted(os.listdir(args.video_dir)):
            if fname.lower().endswith(('.mp4', '.avi', '.mov', '.mkv')):
                video_paths.append(os.path.join(args.video_dir, fname))
    print(f"[Labeler] Found {len(video_paths)} video(s) to process")
    # Initialize labeler
    labeler = VideoLabeler(
        sample_fps=args.sample_fps,
        debug_dir=args.debug_dir,
        debug_all=args.debug_all,
        vitpose_filter=args.vitpose_filter,
        vitpose_filter_all=args.vitpose_filter_all,
        vitpose_min_joints=args.vitpose_min_joints,
        vitpose_conf_threshold=args.vitpose_conf_threshold,
        vitpose_require_upper_lower=not args.vitpose_disable_upper_lower,
        vitpose_min_vertical_span=args.vitpose_min_vertical_span,
        vitpose_config=args.vitpose_config,
        vitpose_ckpt=args.vitpose_ckpt
    )
    # Process each video
    results = {'videos': []}
    segment_writer = None
    output_path = args.output
    # A .jsonl output path implies streaming even without the flag.
    stream_jsonl = args.stream_jsonl or output_path.endswith(".jsonl")
    if stream_jsonl:
        os.makedirs(os.path.dirname(os.path.abspath(output_path)) or ".", exist_ok=True)
        # Append mode: reruns accumulate rather than overwrite prior segments.
        with open(output_path, "a") as f:
            def write_segment(segment: Segment, video_path: str):
                # Flush per segment so partial results survive a crash.
                payload = {'video': os.path.abspath(video_path)}
                payload.update(asdict(segment))
                f.write(json.dumps(payload) + "\n")
                f.flush()
            segment_writer = write_segment
            for video_path in video_paths:
                # Bind video_path as a default arg to avoid Python's
                # late-binding closure pitfall inside the loop.
                result = labeler.label_video(
                    video_path,
                    end_time=args.end_time,
                    segment_writer=lambda seg, vp=video_path: segment_writer(seg, vp)
                )
                results['videos'].append(result)
    else:
        for video_path in video_paths:
            result = labeler.label_video(video_path, end_time=args.end_time)
            results['videos'].append(result)
    # Save results
    if not stream_jsonl:
        with open(args.output, 'w') as f:
            json.dump(results, f, indent=2)
        print(f"\n[Labeler] Results saved to: {args.output}")
    else:
        print(f"\n[Labeler] Segments appended to: {args.output}")
    # Print summary
    total_usable = sum(v.get('usable_duration_sec', 0) for v in results['videos'])
    total_duration = sum(v.get('total_duration_sec', 0) for v in results['videos'])
    print(f"[Labeler] Total usable: {total_usable/3600:.2f} hours / {total_duration/3600:.2f} hours")


if __name__ == '__main__':
    main()