import os

import cv2
import joblib
import numpy as np


class CameraMotionVisualizer:
    """Overlays the estimated camera-motion field on a frame."""

    @staticmethod
    def draw_motion_grid(frame, transform_matrix, grid_size=32):
        """Draw an arrow at each grid point showing where the 2x3 affine
        transform maps it; near-stationary points are skipped."""
        if transform_matrix is None:
            return frame

        h, w = frame.shape[:2]
        for y in range(0, h, grid_size):
            for x in range(0, w, grid_size):
                # Map the homogeneous point (x, y, 1) through the affine matrix.
                start = np.array([x, y, 1])
                end = np.dot(transform_matrix, start)
                # Only draw arrows for displacements larger than one pixel.
                if abs(end[0] - x) > 1 or abs(end[1] - y) > 1:
                    cv2.arrowedLine(
                        frame,
                        (int(x), int(y)),
                        (int(end[0]), int(end[1])),
                        (0, 255, 0),
                        1,
                        tipLength=0.2,
                    )
        return frame
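

# A minimal usage sketch: `frame` and `M` below are placeholders, with `M`
# assumed to be a 2x3 affine matrix such as the one returned by
# cv2.estimateAffinePartial2D (see estimate_camera_motion further down):
#
#   vis = CameraMotionVisualizer.draw_motion_grid(frame.copy(), M)
#   cv2.imwrite('motion_grid.png', vis)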


class SlidingWindowRefiner:
    """Refines a bounding box by scoring SIFT matches between a stored
    template and candidate windows around the previous position."""

    def __init__(self):
        self.sift = cv2.SIFT_create(nfeatures=2000)

        # FLANN with a KD-tree index is the standard matcher for SIFT's
        # float descriptors.
        FLANN_INDEX_KDTREE = 1
        index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
        search_params = dict(checks=50)
        self.flann = cv2.FlannBasedMatcher(index_params, search_params)

        # Search-window geometry: number of scales, spacing between scales,
        # window size relative to the bbox, and overlap between windows.
        self.scale_levels = 3
        self.scale_step = 1.2
        self.scale_factor = 2.0
        self.overlap = 0.3

        self.template = None
        self.template_kp = None
        self.template_desc = None

    def initialize_template(self, gray, bbox):
        """Store the initial object patch and its SIFT features."""
        x, y, w, h = map(int, bbox)
        self.template = gray[y:y+h, x:x+w].copy()
        self.template_kp, self.template_desc = self.sift.detectAndCompute(
            self.template, None
        )

    def generate_windows(self, img_shape, prev_bbox, transform_matrix=None):
        """Generate candidate windows around the previous bbox, optionally
        motion-compensated by the camera transform."""
        x, y, w, h = map(int, prev_bbox)

        # Shift the search center by the estimated camera motion.
        if transform_matrix is not None:
            center = np.array([[x + w/2, y + h/2, 1]]).T
            transformed = np.dot(transform_matrix, center)
            x = int(transformed[0] - w/2)
            y = int(transformed[1] - h/2)

        windows = []
        for scale in np.linspace(1/self.scale_step, self.scale_step, self.scale_levels):
            ww = int(w * self.scale_factor * scale)
            hh = int(h * self.scale_factor * scale)

            step_x = int(ww * (1 - self.overlap))
            step_y = int(hh * (1 - self.overlap))

            cx, cy = x + w // 2, y + h // 2

            for dy in range(-step_y, step_y + 1, max(1, step_y // 2)):
                for dx in range(-step_x, step_x + 1, max(1, step_x // 2)):
                    # Clamp each window to the image bounds.
                    wx = max(0, min(cx - ww // 2 + dx, img_shape[1] - ww))
                    wy = max(0, min(cy - hh // 2 + dy, img_shape[0] - hh))
                    windows.append((wx, wy, ww, hh))
        return windows

    def score_window(self, gray, window):
        """Score a window by the number and quality of SIFT matches against
        the stored template; higher is better, 0 means unusable."""
        if self.template_desc is None:
            return 0

        x, y, w, h = map(int, window)
        roi = gray[y:y+h, x:x+w]

        if roi.shape[0] < 20 or roi.shape[1] < 20:
            return 0

        # Resize to the template's (width, height) so feature scales match.
        roi = cv2.resize(roi, self.template.shape[::-1])
        kp, desc = self.sift.detectAndCompute(roi, None)

        # knnMatch with k=2 needs at least two descriptors on each side.
        if desc is None or len(desc) < 2:
            return 0
        if len(self.template_desc) < 2:
            return 0

        matches = self.flann.knnMatch(self.template_desc, desc, k=2)

        # Lowe's ratio test: keep matches whose best distance is clearly
        # smaller than the second-best.
        good = []
        for match_pair in matches:
            if len(match_pair) == 2:
                m, n = match_pair
                if m.distance < 0.7 * n.distance:
                    good.append(m)

        if not good:
            return 0

        # Weight the match count by average match quality (512 roughly
        # bounds SIFT descriptor distances).
        avg_dist = np.mean([m.distance for m in good])
        return len(good) * (1 - avg_dist / 512)
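
# A minimal refinement sketch: `gray0`/`gray1` and the bbox values are
# placeholders, assuming two consecutive grayscale frames:
#
#   refiner = SlidingWindowRefiner()
#   refiner.initialize_template(gray0, (100, 100, 64, 64))
#   windows = refiner.generate_windows(gray1.shape, (100, 100, 64, 64))
#   best = max(windows, key=lambda w: refiner.score_window(gray1, w))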


class ObjectTrackerInference:
    """End-to-end tracker: camera-motion estimation, sliding-window
    refinement, and pre-trained regression models for the final bbox."""

    def __init__(self, model_dir='models'):
        self.model_dir = model_dir

        print("Loading pre-trained models...")
        self.position_model = joblib.load(os.path.join(model_dir, 'position_model.joblib'))
        self.size_model = joblib.load(os.path.join(model_dir, 'size_model.joblib'))
        self.position_scaler = joblib.load(os.path.join(model_dir, 'position_scaler.joblib'))
        self.size_scaler = joblib.load(os.path.join(model_dir, 'size_scaler.joblib'))
        print("Models loaded successfully!")

        self.sift = cv2.SIFT_create(nfeatures=2000)

        # ORB + Hamming brute-force matching for fast camera-motion estimation.
        self.orb = cv2.ORB_create(nfeatures=1000)
        self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
        self.prev_frame = None
        self.prev_kp = None
        self.prev_desc = None

        self.window_refiner = SlidingWindowRefiner()
        self.template_initialized = False

    def estimate_camera_motion(self, frame):
        """Estimate frame-to-frame camera motion as a 2x3 partial affine
        transform; falls back to the identity when estimation fails."""
        if frame is None:
            return np.eye(2, 3, dtype=np.float32)

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        kp, desc = self.orb.detectAndCompute(gray, None)

        if self.prev_frame is None:
            self.prev_frame = gray
            self.prev_kp = kp
            self.prev_desc = desc
            return np.eye(2, 3, dtype=np.float32)

        if desc is None or self.prev_desc is None or len(desc) < 4 or len(self.prev_desc) < 4:
            return np.eye(2, 3, dtype=np.float32)

        matches = self.matcher.match(self.prev_desc, desc)

        if len(matches) < 4:
            return np.eye(2, 3, dtype=np.float32)

        # Keep the 50 best matches by descriptor distance.
        matches = sorted(matches, key=lambda x: x.distance)
        good_matches = matches[:min(len(matches), 50)]

        src_pts = np.float32([self.prev_kp[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
        dst_pts = np.float32([kp[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)

        # RANSAC-based fit of translation, rotation, and uniform scale.
        transform_matrix, _ = cv2.estimateAffinePartial2D(src_pts, dst_pts)

        if transform_matrix is None:
            transform_matrix = np.eye(2, 3, dtype=np.float32)

        self.prev_frame = gray
        self.prev_kp = kp
        self.prev_desc = desc

        return transform_matrix
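
    # Sketch of the transform convention: a point (px, py) in the previous
    # frame maps to the current frame via M @ [px, py, 1]. `px`/`py` are
    # placeholders:
    #
    #   M = self.estimate_camera_motion(frame)
    #   new_xy = M @ np.array([px, py, 1.0])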

    def local_binary_pattern(self, image, n_points=8, radius=1):
        """Compute a local binary pattern map, bilinearly interpolating the
        sample points on the circle. A pure-Python loop, so it is slow;
        skimage.feature.local_binary_pattern is a much faster equivalent."""
        rows, cols = image.shape
        output = np.zeros((rows, cols))

        for i in range(radius, rows-radius):
            for j in range(radius, cols-radius):
                center = image[i, j]
                pattern = 0

                for k in range(n_points):
                    # Sample n_points positions on a circle around (j, i).
                    angle = 2 * np.pi * k / n_points
                    x = j + radius * np.cos(angle)
                    y = i - radius * np.sin(angle)
                    x1, x2 = int(np.floor(x)), int(np.ceil(x))
                    y1, y2 = int(np.floor(y)), int(np.ceil(y))

                    # Bilinear interpolation between the four neighbors.
                    f11 = image[y1, x1]
                    f12 = image[y1, x2]
                    f21 = image[y2, x1]
                    f22 = image[y2, x2]

                    x_weight = x - x1
                    y_weight = y - y1

                    pixel_value = (f11 * (1-x_weight) * (1-y_weight) +
                                   f21 * (1-x_weight) * y_weight +
                                   f12 * x_weight * (1-y_weight) +
                                   f22 * x_weight * y_weight)

                    # Set bit k if the sampled value exceeds the center pixel.
                    pattern |= int(pixel_value > center) << k

                output[i, j] = pattern

        return output

    def extract_features(self, frame, bbox, transform_matrix=None):
        """Build the feature vector the regression models expect: truncated
        HOG, LBP statistics, camera-motion terms, and the bbox itself."""
        if frame is None:
            return None

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        x, y, w, h = map(int, bbox)

        # Clamp the bbox to the image bounds.
        x = max(0, min(x, gray.shape[1] - w))
        y = max(0, min(y, gray.shape[0] - h))
        w = min(w, gray.shape[1] - x)
        h = min(h, gray.shape[0] - y)

        roi = gray[y:y+h, x:x+w]
        if roi.size == 0:
            roi = gray

        roi = cv2.resize(roi, (64, 64))

        features = []

        # HOG over the 64x64 patch, truncated to the first 64 values.
        hog = cv2.HOGDescriptor((64, 64), (16, 16), (8, 8), (8, 8), 9)
        hog_features = hog.compute(roi)
        features.extend(hog_features.flatten()[:64])

        # Summary statistics of the LBP texture map.
        lbp = self.local_binary_pattern(roi, n_points=8, radius=1)
        features.extend([
            np.mean(lbp),
            np.std(lbp),
            *np.percentile(lbp, [25, 50, 75])
        ])

        # Camera-motion terms: diagonal (scale/rotation) and translation.
        if transform_matrix is not None:
            features.extend([
                transform_matrix[0, 0],
                transform_matrix[1, 1],
                transform_matrix[0, 2],
                transform_matrix[1, 2]
            ])
        else:
            features.extend([1, 1, 0, 0])

        features.extend([x, y, w, h])

        return np.array(features).reshape(1, -1)

    def predict_bbox(self, features):
        """Predict (x, y) with the position model and (w, h) with the size
        model, each on its own scaled copy of the features."""
        features_position = self.position_scaler.transform(features)
        features_size = self.size_scaler.transform(features)

        position_pred = self.position_model.predict(features_position)
        size_pred = self.size_model.predict(features_size)

        bbox = np.hstack([position_pred, size_pred])[0]

        return bbox

    def track_video(self, video_path, initial_bbox, output_path='output_tracked.mp4', fps=30):
        """Track the object given by initial_bbox through a video and write
        an annotated copy to output_path."""
        print(f"Processing video: {video_path}")

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")

        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"Video: {frame_width}x{frame_height}, {total_frames} frames")

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

        # Reset the motion-estimation state for this video.
        self.prev_frame = None
        self.prev_kp = None
        self.prev_desc = None

        current_bbox = initial_bbox
        frame_idx = 0

        print("Tracking object...")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            transform_matrix = self.estimate_camera_motion(frame)
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # Capture the template from the first frame.
            if not self.template_initialized:
                self.window_refiner.initialize_template(gray, current_bbox)
                self.template_initialized = True

            windows = self.window_refiner.generate_windows(
                frame.shape, current_bbox, transform_matrix
            )

            # Pick the candidate window with the best template-match score.
            best_score = -1
            best_window = None

            for win in windows:
                score = self.window_refiner.score_window(gray, win)
                xw, yw, ww, hh = map(int, win)
                cv2.rectangle(frame, (xw, yw), (xw+ww, yw+hh), (0, 255, 255), 1)
                if score > best_score:
                    best_score = score
                    best_window = win

            if best_window is not None:
                current_bbox = best_window

            # The final bbox comes from the regression models.
            features = self.extract_features(frame, current_bbox, transform_matrix)
            if features is not None:
                current_bbox = self.predict_bbox(features)

            frame = CameraMotionVisualizer.draw_motion_grid(frame, transform_matrix)

            x, y, w, h = map(int, current_bbox)
            cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 0, 255), 2)
            cv2.putText(frame, f'Frame: {frame_idx}', (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

            out.write(frame)
            frame_idx += 1

            if frame_idx % 30 == 0:
                print(f"Processed {frame_idx}/{total_frames} frames")

        cap.release()
        out.release()

        print(f"Tracking complete! Video saved to: {output_path}")
        return output_path
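

# A hedged usage sketch: the model directory, video path, and initial bbox
# below are placeholders, not values shipped with this script.
if __name__ == '__main__':
    tracker = ObjectTrackerInference(model_dir='models')
    # initial_bbox is (x, y, w, h) in pixels on the first frame.
    tracker.track_video('input.mp4', initial_bbox=(100, 100, 64, 64),
                        output_path='output_tracked.mp4', fps=30)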