"""
video_processor.py
==================
Processes a folder of dashcam videos:
  1. Splits each 20-second video into 3-second clips
  2. Runs the LK optical flow pipeline on each clip
  3. Encodes each clip as a 53-dim trajectory embedding
  4. Saves everything to a .pkl file

Usage:
    python video_processor.py
        --video_dir  /path/to/videos
        --output_pkl trajectory_db.pkl
        --max_videos 10
"""

import argparse
import os
import pickle
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
from tqdm import tqdm

from trajectory_extractor import (
    extract_clip_frames,
    compute_lk_flow_clip,
    encode_trajectory,
    CLIP_FRAMES,
    FPS,
    CLIP_SEC,
)

# ─────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────

SUPPORTED_EXTS = {".mp4", ".avi", ".mov", ".mkv", ".MP4", ".AVI", ".MOV"}

# Lower-cased view of SUPPORTED_EXTS so that matching is case-insensitive
# (e.g. ".MKV" or ".Mp4" are accepted as well).
_SUPPORTED_EXTS_LOWER = frozenset(ext.lower() for ext in SUPPORTED_EXTS)

# Minimum number of decoded frames a clip must have to be processed.
MIN_CLIP_FRAMES = 10
# Minimum number of lateral flow samples required to encode a clip.
MIN_FLOW_SAMPLES = 5


def get_video_files(video_dir: str, max_videos: Optional[int] = None) -> List[str]:
    """Return a sorted list of video file paths under a directory tree.

    The directory and all of its subfolders are searched. Only regular files
    whose extension (compared case-insensitively) is in ``SUPPORTED_EXTS``
    are returned.

    Args:
        video_dir: Root directory to search.
        max_videos: If not None, keep only the first ``max_videos`` paths of
            the sorted list. Negative values are treated as 0.

    Returns:
        Sorted list of matching paths as strings.
    """
    root = Path(video_dir)
    files = sorted(
        str(candidate)
        for candidate in root.rglob("*")
        # is_file() skips directories that happen to carry a video-like suffix.
        if candidate.is_file() and candidate.suffix.lower() in _SUPPORTED_EXTS_LOWER
    )
    if max_videos is not None:
        # Clamp so a negative limit cannot silently drop files from the end.
        files = files[:max(max_videos, 0)]
    return files


def get_video_total_frames(video_path: str) -> int:
    """Return the total frame count of a video as reported by OpenCV.

    Returns 0 when the video cannot be opened or the reported count is
    not positive.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return 0
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        # Always release the capture handle, even if the query raises.
        cap.release()
    return max(total, 0)


def get_clip_boundaries(
    total_frames: int,
    clip_frames: int = CLIP_FRAMES,
) -> List[Tuple[int, int]]:
    """Split a video into non-overlapping clips of ``clip_frames`` frames.

    A trailing partial clip is kept only if it is non-empty and at least
    ``clip_frames // 2`` frames long; otherwise it is discarded.

    Args:
        total_frames: Number of frames in the video.
        clip_frames: Number of frames per full clip; must be positive.

    Returns:
        List of ``(start_frame, end_frame)`` tuples with an exclusive end.

    Raises:
        ValueError: If ``clip_frames`` is not positive (the split would
            otherwise never terminate).
    """
    if clip_frames <= 0:
        raise ValueError(f"clip_frames must be positive, got {clip_frames}")

    boundaries = [
        (start, start + clip_frames)
        for start in range(0, total_frames - clip_frames + 1, clip_frames)
    ]

    # Include the last partial clip only if it's at least half a clip.
    start = len(boundaries) * clip_frames
    remaining = total_frames - start
    if remaining > 0 and remaining >= clip_frames // 2:
        boundaries.append((start, total_frames))

    return boundaries


# ─────────────────────────────────────────────────────────────
# Per-clip Processing
# ─────────────────────────────────────────────────────────────

def process_clip(video_path: str,
                 start_frame: int,
                 end_frame: int,
                 clip_id: str) -> Optional[Dict]:
    """Process a single clip and return its database entry.

    Extracts the frames, runs the LK optical flow pipeline, and encodes the
    resulting lateral/forward signals into a trajectory embedding.

    Args:
        video_path: Path of the source video.
        start_frame: First frame of the clip.
        end_frame: End frame of the clip (as produced by
            ``get_clip_boundaries``).
        clip_id: Identifier stored in the entry and used in error messages.

    Returns:
        The entry dict, or None if the clip has fewer than
        ``MIN_CLIP_FRAMES`` frames, fewer than ``MIN_FLOW_SAMPLES`` lateral
        samples, or if processing raises.
    """
    try:
        frames = extract_clip_frames(video_path, start_frame, end_frame)
        if len(frames) < MIN_CLIP_FRAMES:
            return None

        # Run LK optical flow pipeline
        flow_result = compute_lk_flow_clip(frames)
        lateral_signals = flow_result["lateral_signals"]
        forward_signals = flow_result["forward_signals"]

        if len(lateral_signals) < MIN_FLOW_SAMPLES:
            return None

        # Encode to 53-dim embedding
        encoding = encode_trajectory(lateral_signals, forward_signals)

        # Size-based emptiness check: a plain truthiness test raises on a
        # multi-element NumPy array, which would wrongly fail the clip.
        n_features = flow_result["n_features"]
        if n_features is not None and np.size(n_features) > 0:
            n_features_avg = float(np.mean(n_features))
        else:
            n_features_avg = 0.0

        # Timestamps use the pipeline-wide FPS constant, not per-file FPS.
        start_sec = start_frame / FPS
        end_sec = end_frame / FPS

        return {
            "clip_id": clip_id,
            "video_path": video_path,
            "video_name": Path(video_path).name,
            "start_frame": start_frame,
            "end_frame": end_frame,
            "start_sec": round(start_sec, 2),
            "end_sec": round(end_sec, 2),
            "embedding": encoding["embedding"],
            "turn_ratio": encoding["turn_ratio"],
            "peak_lateral": encoding["peak_lateral"],
            "direction": encoding["direction"],
            "trajectory_raw": encoding["trajectory_raw"],
            "lateral_signals": encoding["lateral_signals"],
            "n_frames": encoding["n_frames"],
            "n_features_avg": n_features_avg,
        }

    except Exception as e:
        # Deliberate best-effort boundary: one bad clip must not abort the
        # whole batch, so report it and let the caller count it as failed.
        print(f"  [ERROR] clip {clip_id}: {e}")
        return None


# ─────────────────────────────────────────────────────────────
# Main Processing Loop
# ─────────────────────────────────────────────────────────────
def _video_label(video_path: str, video_dir: str, ambiguous_stems: set) -> str:
    """Return the name used as the clip-id prefix for a video.

    Normally this is the file stem. When several discovered videos share a
    stem (same name in different subfolders, or same name with different
    extensions), the path relative to ``video_dir`` is used instead, with
    path separators replaced by "_", so clip ids do not collide.
    """
    path = Path(video_path)
    if path.stem not in ambiguous_stems:
        return path.stem
    try:
        relative = path.relative_to(video_dir)
    except ValueError:
        # Not under video_dir (should not happen for discovered files);
        # fall back to the full path so the label is still distinctive.
        relative = path
    return "_".join(relative.parts)


def _parse_max_videos(value: str) -> Optional[int]:
    """argparse type for --max_videos: an integer, or 'none'/'all' for no limit."""
    if value.strip().lower() in {"none", "all"}:
        return None
    # A ValueError from int() is turned into a usage error by argparse.
    return int(value)


def process_video_folder(
    video_dir: str,
    output_pkl: str,
    max_videos: Optional[int] = None,
    verbose: bool = True,
) -> Dict:
    """Process all videos in a folder and save the trajectory database.

    Every video found by ``get_video_files`` is split into clips, each clip
    is run through ``process_clip``, and the successful entries are pickled
    to ``output_pkl``.

    Args:
        video_dir: Root folder containing the videos.
        output_pkl: Path of the pickle file to write; missing parent
            directories are created.
        max_videos: Maximum number of videos to process, or None for all.
        verbose: If True, show a per-video progress bar and a line per clip.

    Returns:
        The database dict mapping clip_id to its entry dict.

    Raises:
        FileNotFoundError: If no video files are found in ``video_dir``.
    """
    video_files = get_video_files(video_dir, max_videos)

    if not video_files:
        raise FileNotFoundError(f"No video files found in: {video_dir}")

    print(f"\n{'='*60}")
    print("Trajectory Extraction Pipeline")
    print(f"{'='*60}")
    print(f"Video directory : {video_dir}")
    print(f"Videos to process: {len(video_files)}")
    print(f"Clip length      : {CLIP_SEC}s ({CLIP_FRAMES} frames)")
    print(f"Output pkl       : {output_pkl}")
    print(f"{'='*60}\n")

    # Files are discovered recursively, so two videos can share a stem.
    # Record such stems so their clip ids can be disambiguated instead of
    # silently overwriting each other in the database.
    seen_stems = set()
    ambiguous_stems = set()
    for vpath in video_files:
        stem = Path(vpath).stem
        if stem in seen_stems:
            ambiguous_stems.add(stem)
        seen_stems.add(stem)

    database = {}  # clip_id → entry dict
    stats = {
        "total_clips": 0,
        "failed_clips": 0,
        "left_clips": 0,
        "right_clips": 0,
        "straight_clips": 0,
    }

    for vid_idx, vpath in enumerate(video_files):
        vname = _video_label(vpath, video_dir, ambiguous_stems)
        total_frames = get_video_total_frames(vpath)
        clips = get_clip_boundaries(total_frames)

        print(f"[{vid_idx+1}/{len(video_files)}] {Path(vpath).name}")
        print(f"  Total frames: {total_frames} | Clips: {len(clips)}")

        for clip_idx, (start_f, end_f) in enumerate(
            tqdm(clips, desc="  Clips", leave=False, disable=not verbose)
        ):
            clip_id = f"{vname}__clip_{clip_idx:03d}"
            entry = process_clip(vpath, start_f, end_f, clip_id)

            if entry is None:
                stats["failed_clips"] += 1
                continue

            database[clip_id] = entry
            stats["total_clips"] += 1
            # Tolerate direction labels other than left/right/straight
            # rather than aborting the whole run with a KeyError.
            direction_key = f"{entry['direction']}_clips"
            stats[direction_key] = stats.get(direction_key, 0) + 1

            if verbose:
                tqdm.write(
                    f"  [{clip_idx}] {entry['start_sec']:.1f}s–{entry['end_sec']:.1f}s"
                    f" | {entry['direction']:8s}"
                    f" | turn_ratio={entry['turn_ratio']:+.3f}"
                    f" | features≈{entry['n_features_avg']:.0f}"
                )

        print()

    # ── Save to pkl ───────────────────────────────────────────
    os.makedirs(os.path.dirname(os.path.abspath(output_pkl)), exist_ok=True)
    with open(output_pkl, "wb") as f:
        pickle.dump(database, f, protocol=pickle.HIGHEST_PROTOCOL)

    # ── Summary ───────────────────────────────────────────────
    print(f"\n{'='*60}")
    print("Processing Complete")
    print(f"{'='*60}")
    print(f"Total clips saved : {stats['total_clips']}")
    print(f"Failed clips      : {stats['failed_clips']}")
    print(f"Left turns        : {stats['left_clips']}")
    print(f"Right turns       : {stats['right_clips']}")
    print(f"Straight          : {stats['straight_clips']}")
    print(f"Saved to          : {output_pkl}")
    print(f"{'='*60}\n")

    return database


# ─────────────────────────────────────────────────────────────
# CLI Entry Point
# ─────────────────────────────────────────────────────────────

def main() -> None:
    """Parse command-line arguments and run the extraction pipeline."""
    parser = argparse.ArgumentParser(description="Trajectory extraction pipeline")
    parser.add_argument(
        "--video_dir",
        type=str,
        default="/media/RTCIN15TBD/AllUsers/sjl3kor/trajectory_retrieval_system/data/videos",
        help="Path to folder containing dashcam videos",
    )
    parser.add_argument(
        "--output_pkl",
        type=str,
        default="trajectory_db.pkl",
        help="Output pickle file path",
    )
    parser.add_argument(
        "--max_videos",
        type=_parse_max_videos,
        default=10,
        help="Max number of videos to process (use None for all)",
    )
    args = parser.parse_args()

    process_video_folder(
        video_dir=args.video_dir,
        output_pkl=args.output_pkl,
        max_videos=args.max_videos,
    )


if __name__ == "__main__":
    main()