import copy
import time

import cv2
import numpy as np

from boxmot.motion.cmc.base_cmc import BaseCMC


class SOF(BaseCMC):
    """
    Sparse Optical Flow (SOF) camera-motion compensator.

    Estimates a 2x3 partial-affine warp between consecutive frames by
    tracking Shi-Tomasi corners with pyramidal Lucas-Kanade optical flow,
    then fitting the warp robustly with RANSAC. Modeled after a GMC
    implementation using the 'sparseOptFlow' method.
    """

    def __init__(self, scale=0.1):
        """
        Initialize the SOF motion compensator.

        Parameters
        ----------
        scale : float, optional
            Factor by which input frames are downscaled during
            preprocessing (``BaseCMC.preprocess`` — presumably resizes by
            this factor; confirm against the base class). Translation
            components of the estimated warp are rescaled back to the
            original resolution when ``scale < 1``. Defaults to 0.1.
        """
        self.scale = scale
        self.grayscale = True

        # Shi-Tomasi corner detection parameters (cv2.goodFeaturesToTrack).
        self.feature_params = dict(
            maxCorners=1000,
            qualityLevel=0.01,
            minDistance=1,
            blockSize=3,
            useHarrisDetector=False,
            k=0.04,
        )

        # Pyramidal Lucas-Kanade optical flow parameters.
        self.lk_params = dict(
            winSize=(21, 21),
            maxLevel=3,
            criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01),
        )

        # Internal tracking state, populated on the first processed frame.
        self.prevFrame = None
        self.prevKeyPoints = None
        self.initializedFirstFrame = False

    def apply(self, img, detections=None):
        """
        Estimate the warp between the previous frame and the current one.

        Parameters
        ----------
        img : np.ndarray
            The current input color image.
        detections : Any, optional
            Unused; present for API compatibility with other CMC methods.

        Returns
        -------
        np.ndarray
            The estimated 2x3 warp matrix. On the first frame, or if
            estimation fails, returns the identity warp.
        """
        frame_gray = self.preprocess(img)

        # Default transformation: identity.
        H = np.eye(2, 3, dtype=np.float32)

        # On the first frame, detect keypoints and initialize internal state.
        if not self.initializedFirstFrame:
            keypoints = cv2.goodFeaturesToTrack(
                frame_gray, mask=None, **self.feature_params
            )
            if keypoints is None:
                # Nothing to track yet; stay uninitialized and retry next frame.
                return H

            # Optional subpixel refinement of the detected corners.
            term_crit = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
            cv2.cornerSubPix(
                frame_gray,
                keypoints,
                winSize=(5, 5),
                zeroZone=(-1, -1),
                criteria=term_crit,
            )

            self.prevFrame = frame_gray.copy()
            self.prevKeyPoints = keypoints.copy()
            self.initializedFirstFrame = True
            return H

        # BUGFIX: a previous failed re-detection may have left an empty/None
        # keypoint set; feeding that into calcOpticalFlowPyrLK raises.
        # Re-detect and return identity instead of crashing.
        if self.prevKeyPoints is None or len(self.prevKeyPoints) == 0:
            keypoints = cv2.goodFeaturesToTrack(
                frame_gray, mask=None, **self.feature_params
            )
            self.prevFrame = frame_gray.copy()
            self.prevKeyPoints = keypoints
            return H

        # Track the previous keypoints into the current frame.
        nextKeypoints, status, err = cv2.calcOpticalFlowPyrLK(
            self.prevFrame, frame_gray, self.prevKeyPoints, None, **self.lk_params
        )

        # Keep only points that were successfully tracked (status == 1).
        tracked = status.ravel().astype(bool)
        valid_prev = self.prevKeyPoints[tracked]
        valid_next = nextKeypoints[tracked]

        if len(valid_prev) < 4:
            print("Warning: not enough matching points detected; redetecting keypoints.")
            # Too few matches to fit a warp; re-seed keypoints for the next
            # frame and fall back to the identity transform.
            keypoints = cv2.goodFeaturesToTrack(
                frame_gray, mask=None, **self.feature_params
            )
            self.prevFrame = frame_gray.copy()
            self.prevKeyPoints = keypoints  # may be None; handled on next call
            return H

        # Robustly estimate the partial-affine warp with RANSAC.
        H_est, inliers = cv2.estimateAffinePartial2D(
            valid_prev, valid_next, method=cv2.RANSAC
        )
        if H_est is None:
            H_est = H
        elif self.scale < 1:
            # Warp was estimated on the downscaled frame; rescale the
            # translation back to the original image resolution.
            H_est[0, 2] /= self.scale
            H_est[1, 2] /= self.scale

        # Re-detect keypoints for the next iteration (fresher corners than
        # simply carrying the tracked ones forward); fall back to the
        # tracked points if detection fails.
        new_keypoints = cv2.goodFeaturesToTrack(
            frame_gray, mask=None, **self.feature_params
        )
        if new_keypoints is None:
            new_keypoints = valid_next

        self.prevFrame = frame_gray.copy()
        self.prevKeyPoints = new_keypoints.copy()
        return H_est


# ==============================================================================
# Example Usage
# ==============================================================================
def main():
    """Demonstrate SOF on two frames of the MOT17-mini sample sequence."""
    # Create an instance of the SOF class with a downscaling factor.
    sof_tracker = SOF(scale=0.3)

    # Load two consecutive frames for the example.
    prev_img = cv2.imread("assets/MOT17-mini/train/MOT17-13-FRCNN/img1/000001.jpg")
    curr_img = cv2.imread("assets/MOT17-mini/train/MOT17-13-FRCNN/img1/000005.jpg")

    # Process the first frame to initialize the tracker.
    _ = sof_tracker.apply(prev_img)

    # Process the next frame to compute the warp matrix.
    H = sof_tracker.apply(curr_img)
    print("Estimated warp matrix:\n", H)


if __name__ == "__main__":
    main()