"""Sparse Optical Flow (SOF) camera-motion compensation module.

NOTE(review): this span previously contained web-page scrape artifacts
(HF Spaces header, file size, line-number gutter) that made the file
invalid Python; replaced with a module docstring.
"""
import cv2
import numpy as np
import copy
import time
from boxmot.motion.cmc.base_cmc import BaseCMC
class SOF(BaseCMC):
    """
    Sparse Optical Flow (SOF) camera-motion compensation.

    Estimates a 2x3 affine warp between consecutive frames by tracking
    Shi-Tomasi corners with pyramidal Lucas-Kanade optical flow and then
    fitting a partial affine transform with RANSAC. Modeled after a GMC
    implementation using the 'sparseOptFlow' method.
    """

    def __init__(self, scale=0.1):
        """
        Initialize the SOF tracker.

        Parameters
        ----------
        scale : float, optional
            Factor by which input frames are downscaled before processing
            (applied by ``self.preprocess`` inherited from ``BaseCMC`` —
            TODO confirm). Defaults to 0.1. When ``scale < 1`` the
            estimated translation is rescaled back to original pixels.
        """
        self.scale = scale
        self.grayscale = True  # preprocess() is expected to convert frames to grayscale

        # Shi-Tomasi corner detection parameters (cv2.goodFeaturesToTrack).
        self.feature_params = dict(
            maxCorners=1000,
            qualityLevel=0.01,
            minDistance=1,
            blockSize=3,
            useHarrisDetector=False,
            k=0.04,
        )

        # Pyramidal Lucas-Kanade optical flow parameters.
        self.lk_params = dict(
            winSize=(21, 21),
            maxLevel=3,
            criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01),
        )

        self.prevFrame = None        # previous preprocessed grayscale frame
        self.prevKeyPoints = None    # keypoints detected/tracked in the previous frame
        self.initializedFirstFrame = False

    def apply(self, img, detections=None):
        """
        Estimate the 2x3 affine warp between the previous frame and ``img``.

        Parameters
        ----------
        img : np.ndarray
            The current input color image.
        detections : Any, optional
            Unused; kept for API compatibility with other CMC methods.

        Returns
        -------
        np.ndarray
            The estimated 2x3 warp matrix. If estimation fails, returns
            an identity matrix.
        """
        frame_gray = self.preprocess(img)

        # Default transformation: identity.
        H = np.eye(2, 3, dtype=np.float32)

        # (Re-)initialize on the first frame, or whenever the previous
        # keypoints were lost (e.g. re-detection failed last call) — passing
        # an empty/None point set to calcOpticalFlowPyrLK would raise.
        if (not self.initializedFirstFrame
                or self.prevKeyPoints is None
                or len(self.prevKeyPoints) == 0):
            keypoints = cv2.goodFeaturesToTrack(frame_gray, mask=None, **self.feature_params)
            if keypoints is None:
                return H
            # Optional subpixel refinement (modifies `keypoints` in place).
            term_crit = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
            cv2.cornerSubPix(frame_gray, keypoints, winSize=(5, 5), zeroZone=(-1, -1), criteria=term_crit)
            self.prevFrame = frame_gray.copy()
            self.prevKeyPoints = keypoints.copy()
            self.initializedFirstFrame = True
            return H

        # Track the previous keypoints into the current frame.
        nextKeypoints, status, _err = cv2.calcOpticalFlowPyrLK(
            self.prevFrame, frame_gray, self.prevKeyPoints, None, **self.lk_params
        )

        # Keep only successfully tracked point pairs (status == 1).
        tracked = status.ravel().astype(bool)
        valid_prev = self.prevKeyPoints[tracked]
        valid_next = nextKeypoints[tracked]

        if len(valid_prev) < 4:
            # Too few matches for a reliable affine fit; re-detect and bail out.
            print("Warning: not enough matching points detected; redetecting keypoints.")
            keypoints = cv2.goodFeaturesToTrack(frame_gray, mask=None, **self.feature_params)
            self.prevFrame = frame_gray.copy()
            self.prevKeyPoints = keypoints  # may be None; handled by the init guard next call
            return H

        # Robustly estimate the partial affine warp with RANSAC.
        H_est, _inliers = cv2.estimateAffinePartial2D(valid_prev, valid_next, method=cv2.RANSAC)
        if H_est is None:
            H_est = H
        elif self.scale < 1:
            # Frames were downscaled; rescale translation back to original pixels.
            H_est[0, 2] /= self.scale
            H_est[1, 2] /= self.scale

        # Prefer freshly detected keypoints for the next iteration; fall back
        # to the tracked ones if detection fails.
        new_keypoints = cv2.goodFeaturesToTrack(frame_gray, mask=None, **self.feature_params)
        self.prevFrame = frame_gray.copy()
        self.prevKeyPoints = new_keypoints if new_keypoints is not None else valid_next.copy()

        return H_est
# ==============================================================================
# Example Usage
# ==============================================================================
def main():
    """Demonstrate SOF on two frames of the MOT17-mini sample sequence."""
    # Create an instance of the SOF class with a downscaling factor.
    sof_tracker = SOF(scale=0.3)

    # Load two (non-consecutive) sample frames.
    prev_img = cv2.imread("assets/MOT17-mini/train/MOT17-13-FRCNN/img1/000001.jpg")
    curr_img = cv2.imread("assets/MOT17-mini/train/MOT17-13-FRCNN/img1/000005.jpg")
    # cv2.imread returns None (instead of raising) when a path is missing;
    # fail fast with a clear error rather than crashing inside the tracker.
    if prev_img is None or curr_img is None:
        raise FileNotFoundError(
            "Could not load sample frames from assets/MOT17-mini; "
            "check that the asset paths exist."
        )

    # First call initializes the tracker; second call computes the warp.
    _ = sof_tracker.apply(prev_img)
    H = sof_tracker.apply(curr_img)
    print("Estimated warp matrix:\n", H)
    # Optionally, visualize the transformation (overlay, etc.).
# Script entry point: run the demo only when executed directly, not on import.
if __name__ == "__main__":
    main()
# NOTE(review): removed stray "|" web-page scrape artifact that ended the file.