Spaces:
Sleeping
Sleeping
| """ | |
| video_processor.py | |
| ================== | |
| Processes a folder of dashcam videos: | |
| 1. Splits each 20-second video into 3-second clips | |
| 2. Runs the LK optical flow pipeline on each clip | |
| 3. Encodes each clip as a 53-dim trajectory embedding | |
| 4. Saves everything to a .pkl file | |
| Usage: | |
| python video_processor.py \ | |
| --video_dir /path/to/videos \ | |
| --output_pkl trajectory_db.pkl \ | |
| --max_videos 10 | |
| """ | |
| import os | |
| import cv2 | |
| import pickle | |
| import argparse | |
| import numpy as np | |
| from pathlib import Path | |
| from tqdm import tqdm | |
| from typing import Dict, List, Optional | |
| from trajectory_extractor import ( | |
| extract_clip_frames, | |
| compute_lk_flow_clip, | |
| encode_trajectory, | |
| CLIP_FRAMES, | |
| FPS, | |
| CLIP_SEC, | |
| ) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Helpers | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| SUPPORTED_EXTS = {".mp4", ".avi", ".mov", ".mkv", ".MP4", ".AVI", ".MOV"} | |
| # def get_video_files(video_dir: str, max_videos: Optional[int] = None) -> List[str]: | |
| # """Return sorted list of video file paths from directory.""" | |
| # video_dir = Path(video_dir) | |
| # files = [ | |
| # str(f) for f in sorted(video_dir.iterdir()) | |
| # if f.suffix in SUPPORTED_EXTS | |
| # ] | |
| # if max_videos is not None: | |
| # files = files[:max_videos] | |
| # return files | |
| def get_video_files(video_dir: str, max_videos: Optional[int] = None) -> List[str]: | |
| """Return sorted list of video file paths from directory and all subfolders.""" | |
| video_dir = Path(video_dir) | |
| files = sorted([ | |
| str(f) for f in video_dir.rglob("*") | |
| if f.suffix in SUPPORTED_EXTS | |
| ]) | |
| if max_videos is not None: | |
| files = files[:max_videos] | |
| return files | |
| def get_video_total_frames(video_path: str) -> int: | |
| """Return total frame count of a video.""" | |
| cap = cv2.VideoCapture(video_path) | |
| total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| cap.release() | |
| return total | |
| def get_clip_boundaries(total_frames: int, clip_frames: int = CLIP_FRAMES) -> List[tuple]: | |
| """ | |
| Split video into non-overlapping 3-second clips. | |
| Returns list of (start_frame, end_frame) tuples. | |
| Last partial clip is discarded if < 1.5 seconds. | |
| """ | |
| boundaries = [] | |
| start = 0 | |
| while start + clip_frames <= total_frames: | |
| boundaries.append((start, start + clip_frames)) | |
| start += clip_frames | |
| # Include last partial clip only if it's at least half a clip | |
| remaining = total_frames - start | |
| if remaining >= clip_frames // 2: | |
| boundaries.append((start, total_frames)) | |
| return boundaries | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Per-clip Processing | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def process_clip(video_path: str, start_frame: int, end_frame: int, clip_id: str) -> Optional[Dict]: | |
| """ | |
| Process a single clip and return its database entry. | |
| Returns None if clip is too short or processing fails. | |
| """ | |
| try: | |
| frames = extract_clip_frames(video_path, start_frame, end_frame) | |
| if len(frames) < 10: | |
| return None | |
| # Run LK optical flow pipeline | |
| flow_result = compute_lk_flow_clip(frames) | |
| lateral_signals = flow_result["lateral_signals"] | |
| forward_signals = flow_result["forward_signals"] | |
| if len(lateral_signals) < 5: | |
| return None | |
| # Encode to 53-dim embedding | |
| encoding = encode_trajectory(lateral_signals, forward_signals) | |
| start_sec = start_frame / FPS | |
| end_sec = end_frame / FPS | |
| return { | |
| "clip_id" : clip_id, | |
| "video_path" : video_path, | |
| "video_name" : Path(video_path).name, | |
| "start_frame" : start_frame, | |
| "end_frame" : end_frame, | |
| "start_sec" : round(start_sec, 2), | |
| "end_sec" : round(end_sec, 2), | |
| "embedding" : encoding["embedding"], | |
| "turn_ratio" : encoding["turn_ratio"], | |
| "peak_lateral" : encoding["peak_lateral"], | |
| "direction" : encoding["direction"], | |
| "trajectory_raw" : encoding["trajectory_raw"], | |
| "lateral_signals": encoding["lateral_signals"], | |
| "n_frames" : encoding["n_frames"], | |
| "n_features_avg" : float(np.mean(flow_result["n_features"])) if flow_result["n_features"] else 0.0, | |
| } | |
| except Exception as e: | |
| print(f" [ERROR] clip {clip_id}: {e}") | |
| return None | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Main Processing Loop | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def process_video_folder( | |
| video_dir: str, | |
| output_pkl: str, | |
| max_videos: Optional[int] = None, | |
| verbose: bool = True, | |
| ) -> Dict: | |
| """ | |
| Process all videos in a folder and save trajectory database to pkl. | |
| Returns the database dict. | |
| """ | |
| video_files = get_video_files(video_dir, max_videos) | |
| if not video_files: | |
| raise FileNotFoundError(f"No video files found in: {video_dir}") | |
| print(f"\n{'='*60}") | |
| print(f"Trajectory Extraction Pipeline") | |
| print(f"{'='*60}") | |
| print(f"Video directory : {video_dir}") | |
| print(f"Videos to process: {len(video_files)}") | |
| print(f"Clip length : {CLIP_SEC}s ({CLIP_FRAMES} frames)") | |
| print(f"Output pkl : {output_pkl}") | |
| print(f"{'='*60}\n") | |
| database = {} # clip_id β entry dict | |
| stats = { | |
| "total_clips" : 0, | |
| "failed_clips" : 0, | |
| "left_clips" : 0, | |
| "right_clips" : 0, | |
| "straight_clips": 0, | |
| } | |
| for vid_idx, vpath in enumerate(video_files): | |
| vname = Path(vpath).stem | |
| total_frames = get_video_total_frames(vpath) | |
| clips = get_clip_boundaries(total_frames) | |
| print(f"[{vid_idx+1}/{len(video_files)}] {Path(vpath).name}") | |
| print(f" Total frames: {total_frames} | Clips: {len(clips)}") | |
| for clip_idx, (start_f, end_f) in enumerate( | |
| tqdm(clips, desc=f" Clips", leave=False, disable=not verbose) | |
| ): | |
| clip_id = f"{vname}__clip_{clip_idx:03d}" | |
| entry = process_clip(vpath, start_f, end_f, clip_id) | |
| if entry is None: | |
| stats["failed_clips"] += 1 | |
| continue | |
| database[clip_id] = entry | |
| stats["total_clips"] += 1 | |
| stats[f"{entry['direction']}_clips"] += 1 | |
| if verbose: | |
| tqdm.write( | |
| f" [{clip_idx}] {entry['start_sec']:.1f}sβ{entry['end_sec']:.1f}s" | |
| f" | {entry['direction']:8s}" | |
| f" | turn_ratio={entry['turn_ratio']:+.3f}" | |
| f" | featuresβ{entry['n_features_avg']:.0f}" | |
| ) | |
| print() | |
| # ββ Save to pkl βββββββββββββββββββββββββββββββββββββββββββ | |
| os.makedirs(os.path.dirname(os.path.abspath(output_pkl)), exist_ok=True) | |
| with open(output_pkl, "wb") as f: | |
| pickle.dump(database, f, protocol=pickle.HIGHEST_PROTOCOL) | |
| # ββ Summary βββββββββββββββββββββββββββββββββββββββββββββββ | |
| print(f"\n{'='*60}") | |
| print(f"Processing Complete") | |
| print(f"{'='*60}") | |
| print(f"Total clips saved : {stats['total_clips']}") | |
| print(f"Failed clips : {stats['failed_clips']}") | |
| print(f"Left turns : {stats['left_clips']}") | |
| print(f"Right turns : {stats['right_clips']}") | |
| print(f"Straight : {stats['straight_clips']}") | |
| print(f"Saved to : {output_pkl}") | |
| print(f"{'='*60}\n") | |
| return database | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CLI Entry Point | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser(description="Trajectory extraction pipeline") | |
| parser.add_argument( | |
| "--video_dir", | |
| type=str, | |
| default="/media/RTCIN15TBD/AllUsers/sjl3kor/trajectory_retrieval_system/data/videos", | |
| help="Path to folder containing dashcam videos", | |
| ) | |
| parser.add_argument( | |
| "--output_pkl", | |
| type=str, | |
| default="trajectory_db.pkl", | |
| help="Output pickle file path", | |
| ) | |
| parser.add_argument( | |
| "--max_videos", | |
| type=int, | |
| default=10, | |
| help="Max number of videos to process (use None for all)", | |
| ) | |
| args = parser.parse_args() | |
| process_video_folder( | |
| video_dir = args.video_dir, | |
| output_pkl = args.output_pkl, | |
| max_videos = args.max_videos, | |
| ) | |