"""Extract random frames from CCTV clips for YOLO fine-tuning. Design choices: - CLIP-LEVEL split: entire clips go to train or val, never mixed. Prevents frame-pair leakage (two near-duplicate frames in train AND val would inflate val metrics). - Sample evenly across time within each clip (not clustered at start). - Skip corrupted / blown-out frames (variance-based filter). """ from __future__ import annotations import argparse, json, random from pathlib import Path import cv2 import numpy as np def usable_frame(img) -> bool: if img is None or img.size == 0: return False # Variance-based corruption filter — blown-out HEVC frames have near-zero variance return float(img.var()) > 300.0 def main(): ap = argparse.ArgumentParser() ap.add_argument('--clips-root', default='/home/azureuser/helmet_v5/data/t4_clips') ap.add_argument('--out', default='/home/azureuser/helmet_v5/data/yolo_ft_frames') ap.add_argument('--frames-per-clip', type=int, default=150) ap.add_argument('--val-clip-ratio', type=float, default=0.15) ap.add_argument('--seed', type=int, default=42) args = ap.parse_args() random.seed(args.seed) out = Path(args.out) (out / 'train').mkdir(parents=True, exist_ok=True) (out / 'val').mkdir(parents=True, exist_ok=True) clip_dirs = [d for d in Path(args.clips_root).iterdir() if (d / 'full.mp4').exists() or (d / 'clean.mp4').exists()] clip_dirs.sort(); random.shuffle(clip_dirs) n_val = max(1, int(len(clip_dirs) * args.val_clip_ratio)) val_clips = set(d.name for d in clip_dirs[:n_val]) print(f'[split] clips={len(clip_dirs)} val={len(val_clips)}: {sorted(val_clips)}') manifest = [] for d in clip_dirs: mp4 = d / 'clean.mp4' if not mp4.exists(): mp4 = d / 'full.mp4' cap = cv2.VideoCapture(str(mp4)) n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = cap.get(cv2.CAP_PROP_FPS) or 10 if n_frames <= 0: cap.release(); continue split = 'val' if d.name in val_clips else 'train' # Sample frame indices evenly across the clip target = args.frames_per_clip wanted = sorted(random.sample(range(n_frames), min(target * 3, n_frames))) saved_in_clip = 0 for fi in wanted: if saved_in_clip >= target: break cap.set(cv2.CAP_PROP_POS_FRAMES, fi) ok, img = cap.read() if not ok or not usable_frame(img): continue name = f'{d.name}__f{fi:06d}.jpg' cv2.imwrite(str(out / split / name), img, [cv2.IMWRITE_JPEG_QUALITY, 92]) manifest.append({'clip': d.name, 'frame': fi, 'split': split, 'file': f'{split}/{name}', 'time_sec': fi / fps}) saved_in_clip += 1 cap.release() print(f' {d.name} ({split}): saved {saved_in_clip}/{target}') (out / 'manifest.jsonl').write_text('\n'.join(json.dumps(r) for r in manifest)) n_tr = sum(1 for r in manifest if r['split'] == 'train') n_val = sum(1 for r in manifest if r['split'] == 'val') print(f'\n[done] total saved: train={n_tr}, val={n_val}, output: {out}') if __name__ == '__main__': main()