# helmet-v5/tools/extract_ft_frames.py
# Uploaded by vivekvar
# Commit e90abd8 (verified): "Initial push: helmet v5 code + trained models"
"""Extract random frames from CCTV clips for YOLO fine-tuning.
Design choices:
- CLIP-LEVEL split: entire clips go to train or val, never mixed. Prevents
frame-pair leakage (two near-duplicate frames in train AND val would
inflate val metrics).
- Sample evenly across time within each clip (not clustered at start).
- Skip corrupted / blown-out frames (variance-based filter).
"""
from __future__ import annotations
import argparse, json, random
from pathlib import Path
import cv2
import numpy as np
def usable_frame(img, min_variance: float = 300.0) -> bool:
    """Return True when *img* looks like a real picture rather than a corrupted frame.

    Args:
        img: Decoded frame as a NumPy array (what ``cv2.VideoCapture.read``
            hands back), or ``None`` when decoding failed.
        min_variance: Pixel-value variance the frame must strictly exceed to
            be kept. Defaults to 300.0, the threshold this script has always
            used.

    Returns:
        ``False`` for ``None``, empty, or low-variance frames; ``True``
        otherwise.
    """
    if img is None or img.size == 0:
        return False
    # Variance-based corruption filter — blown-out HEVC frames have near-zero variance.
    return float(img.var()) > min_variance
def _stratified_candidates(n_frames: int, target: int, rng: random.Random,
                           tries_per_slot: int = 3) -> list[list[int]]:
    """Split a clip into equal time slots and draw a few random frame indices from each.

    Returns one list per slot, in chronological slot order. Each inner list
    holds up to ``tries_per_slot`` distinct frame indices from that slot, in
    random order, to be tried until one yields a usable frame. The result is
    empty when ``target`` or ``n_frames`` is not positive.
    """
    slots = min(target, n_frames)
    plan = []
    for i in range(slots):
        # Integer slot boundaries; every slot holds at least one frame
        # because slots <= n_frames.
        lo = i * n_frames // slots
        hi = (i + 1) * n_frames // slots
        plan.append(rng.sample(range(lo, hi), min(tries_per_slot, hi - lo)))
    return plan


def main():
    """Extract frames from every clip directory and write a JSONL manifest.

    Steps:
      1. Parse CLI arguments and validate ``--clips-root``.
      2. Assign whole clips to ``train`` or ``val`` (clip-level split), using
         a seeded shuffle so the split is reproducible.
      3. For each clip, save up to ``--frames-per-clip`` usable frames as
         JPEGs under ``<out>/<split>/``, one per time slot so the frames are
         spread over the whole clip.
      4. Write ``<out>/manifest.jsonl`` with one JSON record per saved frame.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--clips-root', default='/home/azureuser/helmet_v5/data/t4_clips')
    ap.add_argument('--out', default='/home/azureuser/helmet_v5/data/yolo_ft_frames')
    ap.add_argument('--frames-per-clip', type=int, default=150)
    ap.add_argument('--val-clip-ratio', type=float, default=0.15)
    ap.add_argument('--seed', type=int, default=42)
    args = ap.parse_args()

    clips_root = Path(args.clips_root)
    if not clips_root.is_dir():
        ap.error(f'--clips-root is not a directory: {clips_root}')

    # A private generator instead of the module-level one, so nothing else
    # that touches `random` can disturb the split or the sampling.
    rng = random.Random(args.seed)

    clip_dirs = [d for d in clips_root.iterdir()
                 if (d / 'full.mp4').exists() or (d / 'clean.mp4').exists()]
    if not clip_dirs:
        ap.error(f'no clip directories with full.mp4 / clean.mp4 under {clips_root}')

    # Sort first so the shuffle does not depend on filesystem listing order.
    clip_dirs.sort()
    rng.shuffle(clip_dirs)
    n_val_clips = max(1, int(len(clip_dirs) * args.val_clip_ratio))
    val_clips = {d.name for d in clip_dirs[:n_val_clips]}
    print(f'[split] clips={len(clip_dirs)} val={len(val_clips)}: {sorted(val_clips)}')

    out = Path(args.out)
    (out / 'train').mkdir(parents=True, exist_ok=True)
    (out / 'val').mkdir(parents=True, exist_ok=True)

    manifest = []
    target = args.frames_per_clip
    for d in clip_dirs:
        # Prefer the cleaned video when both variants exist.
        mp4 = d / 'clean.mp4'
        if not mp4.exists():
            mp4 = d / 'full.mp4'
        split = 'val' if d.name in val_clips else 'train'

        cap = cv2.VideoCapture(str(mp4))
        saved_in_clip = 0
        try:
            n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if cap.isOpened() else 0
            if n_frames <= 0:
                print(f' {d.name} ({split}): skipped, cannot read {mp4.name}')
                continue

            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:  # also catches NaN, which would poison time_sec
                fps = 10.0

            # One time slot per requested frame, a few candidates per slot.
            # Taking the first usable candidate of each slot spreads the saved
            # frames over the whole clip; taking the first `target` usable
            # frames of a sorted oversample would cluster them at the start.
            for candidates in _stratified_candidates(n_frames, target, rng):
                for fi in candidates:
                    cap.set(cv2.CAP_PROP_POS_FRAMES, fi)
                    ok, img = cap.read()
                    if not ok or not usable_frame(img):
                        continue
                    name = f'{d.name}__f{fi:06d}.jpg'
                    written = cv2.imwrite(str(out / split / name), img,
                                          [cv2.IMWRITE_JPEG_QUALITY, 92])
                    if not written:
                        # Never list a file in the manifest that is not on disk.
                        continue
                    manifest.append({'clip': d.name, 'frame': fi, 'split': split,
                                     'file': f'{split}/{name}', 'time_sec': fi / fps})
                    saved_in_clip += 1
                    break
        finally:
            cap.release()
        print(f' {d.name} ({split}): saved {saved_in_clip}/{target}')

    # JSON Lines: one record per line, each line newline-terminated.
    (out / 'manifest.jsonl').write_text(
        ''.join(json.dumps(r) + '\n' for r in manifest), encoding='utf-8')

    n_tr = sum(1 for r in manifest if r['split'] == 'train')
    n_val_frames = sum(1 for r in manifest if r['split'] == 'val')
    print(f'\n[done] total saved: train={n_tr}, val={n_val_frames}, output: {out}')
# Run the extraction only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()