# usiddiquee
# hi
# e1832f4
import cv2
import numpy as np
import copy
import time
from boxmot.motion.cmc.base_cmc import BaseCMC
class SOF(BaseCMC):
    """
    Sparse Optical Flow (SOF) camera-motion compensation.

    Estimates a 2x3 affine warp between consecutive frames by tracking
    Shi-Tomasi corners with pyramidal Lucas-Kanade optical flow and fitting
    a partial affine transform (rotation, uniform scale, translation) with
    RANSAC. Modeled after a GMC implementation using the 'sparseOptFlow'
    method.
    """

    def __init__(self, scale=0.1):
        """
        Initialize the SOF estimator.

        Parameters
        ----------
        scale : float, optional
            Factor by which input frames are downscaled before processing
            (presumably applied by ``BaseCMC.preprocess`` — confirm in the
            base class). Defaults to 0.1.
        """
        self.scale = scale
        self.grayscale = True
        # Shi-Tomasi corner-detection parameters for cv2.goodFeaturesToTrack.
        self.feature_params = dict(
            maxCorners=1000,
            qualityLevel=0.01,
            minDistance=1,
            blockSize=3,
            useHarrisDetector=False,
            k=0.04,
        )
        # Pyramidal Lucas-Kanade optical flow parameters.
        self.lk_params = dict(
            winSize=(21, 21),
            maxLevel=3,
            criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01),
        )
        self.prevFrame = None        # previous preprocessed (grayscale) frame
        self.prevKeyPoints = None    # keypoints detected in the previous frame
        self.initializedFirstFrame = False

    def apply(self, img, detections=None):
        """
        Estimate the 2x3 affine warp between the previous frame and ``img``.

        Parameters
        ----------
        img : np.ndarray
            The current input color image.
        detections : Any, optional
            Unused; kept for API compatibility with other CMC methods.

        Returns
        -------
        np.ndarray
            The estimated 2x3 warp matrix. Identity on the first frame or
            when estimation fails.
        """
        # Grayscale conversion (and, per self.scale, downscaling) is
        # delegated to BaseCMC.preprocess.
        frame_gray = self.preprocess(img)

        # Default transformation: identity.
        H = np.eye(2, 3, dtype=np.float32)

        # First frame, or recovery after all keypoints were lost: detect
        # fresh keypoints and return the identity warp.
        if (
            not self.initializedFirstFrame
            or self.prevKeyPoints is None
            or len(self.prevKeyPoints) == 0
        ):
            keypoints = cv2.goodFeaturesToTrack(frame_gray, mask=None, **self.feature_params)
            if keypoints is None:
                return H
            # Optional subpixel refinement (modifies `keypoints` in place).
            term_crit = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
            cv2.cornerSubPix(frame_gray, keypoints, winSize=(5, 5), zeroZone=(-1, -1), criteria=term_crit)
            self.prevFrame = frame_gray.copy()
            self.prevKeyPoints = keypoints.copy()
            self.initializedFirstFrame = True
            return H

        # Track the previous keypoints into the current frame.
        nextKeypoints, status, _err = cv2.calcOpticalFlowPyrLK(
            self.prevFrame, frame_gray, self.prevKeyPoints, None, **self.lk_params
        )

        # Keep only successfully tracked correspondences (vectorized mask
        # instead of a Python loop over `status`).
        tracked = status.reshape(-1).astype(bool)
        valid_prev = self.prevKeyPoints[tracked]
        valid_next = nextKeypoints[tracked]

        if len(valid_prev) < 4:
            print("Warning: not enough matching points detected; redetecting keypoints.")
            # Too few matches: re-detect keypoints for the current frame.
            # If detection fails we leave prevKeyPoints as None so the next
            # call re-initializes. (The original stored an empty array here,
            # which crashed cv2.calcOpticalFlowPyrLK on the following frame.)
            keypoints = cv2.goodFeaturesToTrack(frame_gray, mask=None, **self.feature_params)
            self.prevFrame = frame_gray.copy()
            self.prevKeyPoints = keypoints
            return H

        # Robust affine estimation (rotation + uniform scale + translation).
        H_est, _inliers = cv2.estimateAffinePartial2D(valid_prev, valid_next, method=cv2.RANSAC)
        if H_est is None:
            H_est = H
        elif self.scale < 1:
            # The warp was measured on the downscaled frame, so its
            # translation is in downscaled pixels; rescale back to the
            # original image resolution.
            H_est[0, 2] /= self.scale
            H_est[1, 2] /= self.scale

        # Re-detect keypoints for the next iteration (falling back to the
        # tracked ones if detection fails) to avoid gradual keypoint loss.
        new_keypoints = cv2.goodFeaturesToTrack(frame_gray, mask=None, **self.feature_params)
        if new_keypoints is None:
            new_keypoints = valid_next
        self.prevFrame = frame_gray.copy()
        self.prevKeyPoints = new_keypoints.copy()

        return H_est
# ==============================================================================
# Example Usage
# ==============================================================================
def main():
    """Demonstrate SOF on two example frames from the MOT17-mini assets."""
    # Create an instance of the SOF class with a downscaling factor.
    sof_tracker = SOF(scale=0.3)

    # Load two (non-consecutive) frames from the example sequence.
    prev_path = "assets/MOT17-mini/train/MOT17-13-FRCNN/img1/000001.jpg"
    curr_path = "assets/MOT17-mini/train/MOT17-13-FRCNN/img1/000005.jpg"
    prev_img = cv2.imread(prev_path)
    curr_img = cv2.imread(curr_path)

    # cv2.imread returns None (no exception) when a file is missing or
    # unreadable; fail fast with a clear message instead of letting
    # SOF.apply crash on a None frame.
    if prev_img is None or curr_img is None:
        raise FileNotFoundError(
            f"Could not read example frames: {prev_path!r}, {curr_path!r}"
        )

    # Process the first frame to initialize the tracker state.
    _ = sof_tracker.apply(prev_img)
    # Now process the next frame to compute the warp matrix.
    H = sof_tracker.apply(curr_img)
    print("Estimated warp matrix:\n", H)
    # Optionally, you can visualize the transformation (overlay, etc.)


if __name__ == "__main__":
    main()