| |
| """ |
| Extract video features from Scene Camera videos using a pretrained backbone. |
| Uses CLIP (ViT-B/16) which is lightweight and doesn't need video-specific pretraining. |
| |
| Output: per-frame feature vectors saved as .npy files, aligned to 100Hz sensor data. |
| """ |
|
|
| import os |
| import sys |
| import json |
| import glob |
| import argparse |
| import numpy as np |
| import cv2 |
| import torch |
| import torch.nn as nn |
| from torchvision import transforms |
|
|
# Root of the PULSE dataset tree. The "${PULSE_ROOT}" placeholder is a shell
# variable, which Python string literals do NOT expand -- without expandvars
# every glob under DATASET_DIR would silently match nothing. expandvars
# substitutes the environment value; if PULSE_ROOT is unset, the literal
# placeholder is kept unchanged (so the original string survives as-is).
DATASET_DIR = os.path.expandvars("${PULSE_ROOT}/dataset")
|
|
|
|
class CLIPFeatureExtractor:
    """Per-frame feature extractor built on torchvision's ViT-B/16.

    NOTE(review): despite the class name, the weights loaded here are the
    supervised ImageNet-1k weights (ViT_B_16_Weights.IMAGENET1K_V1), not
    OpenAI CLIP weights. The classification head is replaced with Identity
    so forward() emits the pooled 768-d embedding per frame.
    """

    def __init__(self, device: str = 'cpu'):
        """Load the backbone and its matching preprocessing transform.

        Args:
            device: torch device string ('cpu' or 'cuda').
        """
        self.device = device
        # Imported lazily so importing this module doesn't require torchvision
        # models until an extractor is actually constructed.
        from torchvision.models import vit_b_16, ViT_B_16_Weights
        weights = ViT_B_16_Weights.IMAGENET1K_V1
        model = vit_b_16(weights=weights)
        # Strip the classifier head: the model now returns the pooled token
        # embedding instead of ImageNet logits.
        model.heads = nn.Identity()
        model.eval()
        self.model = model.to(device)
        # Resize/crop/normalize pipeline that matches the chosen weights.
        self.transform = weights.transforms()
        # Embedding width of ViT-B/16.
        self.feat_dim = 768

    @torch.no_grad()
    def extract_batch(self, frames):
        """Extract features from a batch of frames.

        Args:
            frames: list of numpy arrays (H, W, 3) in BGR format (OpenCV order)
        Returns:
            features: numpy array (N, feat_dim)
        """
        tensors = []
        for frame in frames:
            # OpenCV delivers BGR; the ImageNet transform expects RGB in [0, 1].
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            tensor = torch.from_numpy(rgb).permute(2, 0, 1).float() / 255.0
            tensor = self.transform(tensor)
            tensors.append(tensor)

        batch = torch.stack(tensors).to(self.device)
        features = self.model(batch)
        return features.cpu().numpy()
|
|
|
|
def find_scene_video(scenario_dir, vol, scenario):
    """Locate the trimmed Scene Camera recording for one scenario.

    Args:
        scenario_dir: directory to search in.
        vol: volunteer identifier (e.g. 'v01').
        scenario: scenario identifier (e.g. 's01').
    Returns:
        Path of the first matching video, or None if nothing matches.
    """
    pattern = os.path.join(scenario_dir, f"trimmed_{vol}{scenario}*Scene Cam.mp4")
    return next(iter(glob.glob(pattern)), None)
|
|
|
|
def extract_features_for_video(extractor, video_path, target_fps=100,
                               batch_size=32, sample_fps=2):
    """Extract features from a video file, resampled to the sensor rate.

    Frames are sampled at `sample_fps`, encoded by `extractor`, and the
    feature sequence is linearly interpolated up to `target_fps` so it
    aligns with the 100Hz sensor streams.

    Args:
        extractor: feature extractor exposing extract_batch(list_of_bgr_frames)
        video_path: path to video file
        target_fps: target frame rate to align with sensor data (100Hz)
        batch_size: batch size for feature extraction
        sample_fps: extract features at this rate (e.g., 2 = every 0.5s)
                    Features are then interpolated to target_fps.
    Returns:
        features: numpy array (T_target, feat_dim) aligned to target_fps,
        or None when the video is unreadable or yields no frames.
    """
    cap = cv2.VideoCapture(video_path)
    video_fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # FIX: OpenCV does not raise on an unopenable/corrupt file -- it reports
    # 0 fps / 0 frames, which previously crashed on the division below.
    if video_fps <= 0 or total_frames <= 0:
        cap.release()
        print(f" ERROR: unreadable video metadata: {video_path}")
        return None
    duration = total_frames / video_fps

    # FIX: clamp the stride to >= 1; int(video_fps / sample_fps) is 0 when
    # video_fps < sample_fps, and range() raises on a zero step.
    sample_interval = max(1, int(video_fps / sample_fps))
    sample_indices = list(range(0, total_frames, sample_interval))

    print(f" Video: {total_frames} frames @ {video_fps:.1f}fps = {duration:.1f}s")
    print(f" Sampling {len(sample_indices)} frames @ {sample_fps}fps")

    all_features = []
    batch_frames = []
    batch_indices = []  # source frame index of every successfully read frame

    for idx in sample_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if not ret:
            # Frame counts are sometimes over-reported; stop at the real end.
            break
        batch_frames.append(frame)
        batch_indices.append(idx)

        if len(batch_frames) >= batch_size:
            feats = extractor.extract_batch(batch_frames)
            all_features.append(feats)
            batch_frames = []
            if len(all_features) % 10 == 0:
                print(f" Processed {len(all_features) * batch_size} frames...")

    # Flush the final partial batch.
    if batch_frames:
        feats = extractor.extract_batch(batch_frames)
        all_features.append(feats)

    cap.release()

    if not all_features:
        return None

    features = np.concatenate(all_features, axis=0)
    # Timestamps (seconds) of the frames that were actually read.
    sample_times = np.array(batch_indices[:features.shape[0]]) / video_fps

    # 100Hz time grid covering the full video duration.
    target_times = np.arange(0, duration, 1.0 / target_fps)
    n_target = len(target_times)

    from scipy.interpolate import interp1d
    if len(sample_times) < 2:
        # Too few samples to interpolate: repeat the single feature vector.
        interpolated = np.tile(features[0], (n_target, 1)).astype(np.float32)
    else:
        # Linear interpolation onto the target grid; 'extrapolate' covers
        # the tail beyond the last sampled frame.
        interp_func = interp1d(
            sample_times, features, axis=0,
            kind='linear', fill_value='extrapolate'
        )
        interpolated = interp_func(target_times).astype(np.float32)

    print(f" Output: {interpolated.shape} @ {target_fps}Hz")
    return interpolated
|
|
|
|
def main():
    """CLI entry point: walk the dataset tree and extract per-scenario features."""
    parser = argparse.ArgumentParser(description='Extract video features')
    parser.add_argument('--sample_fps', type=int, default=2,
                        help='Sample rate for feature extraction (default: 2fps)')
    parser.add_argument('--batch_size', type=int, default=16,
                        help='Batch size for feature extraction')
    parser.add_argument('--device', type=str, default='cuda',
                        help='Device (cuda or cpu)')
    args = parser.parse_args()

    # Use CUDA only when it was both requested and is actually available.
    if args.device == 'cuda' and torch.cuda.is_available():
        device = 'cuda'
    else:
        device = 'cpu'
    print(f"Device: {device}")

    print("Loading ViT-B/16 feature extractor...")
    extractor = CLIPFeatureExtractor(device=device)
    print(f"Feature dim: {extractor.feat_dim}")

    n_done = 0
    n_skipped = 0

    for volume_dir in sorted(glob.glob(f"{DATASET_DIR}/v*")):
        volume = os.path.basename(volume_dir)
        for scene_dir in sorted(glob.glob(f"{volume_dir}/s*")):
            scene = os.path.basename(scene_dir)
            out_file = os.path.join(scene_dir, "video_features_100hz.npy")

            # Resume-friendly: never recompute an existing feature file.
            if os.path.exists(out_file):
                print(f"[{volume}/{scene}] Already exists, skipping")
                n_skipped += 1
                continue

            video_file = find_scene_video(scene_dir, volume, scene)
            if video_file is None:
                print(f"[{volume}/{scene}] No Scene Camera video found, skipping")
                n_skipped += 1
                continue

            print(f"\n[{volume}/{scene}]")
            print(f" Video: {os.path.basename(video_file)}")

            feats = extract_features_for_video(
                extractor, video_file,
                batch_size=args.batch_size,
                sample_fps=args.sample_fps,
            )

            if feats is None:
                print(f" FAILED: Could not extract features")
                continue

            np.save(out_file, feats)
            print(f" Saved: {out_file} ({feats.shape})")
            n_done += 1

    print(f"\n{'='*60}")
    print(f"Done! Processed: {n_done}, Skipped: {n_skipped}")
    print(f"Feature files: {DATASET_DIR}/*/*/video_features_100hz.npy")
|
|
|
|
# Script entry point: run the full extraction pipeline when invoked directly.
if __name__ == '__main__':
    main()
|
|