"""Gaussian Smoothed Interpolation (GSI) post-processing for MOT tracking results."""
from pathlib import Path
import numpy as np
import argparse
from sklearn.gaussian_process import GaussianProcessRegressor as GPR
from sklearn.gaussian_process.kernels import RBF
from boxmot.utils import logger as LOGGER
import concurrent.futures
from tqdm import tqdm
def linear_interpolation(data: np.ndarray, interval: int) -> np.ndarray:
    """
    Apply linear interpolation between rows in the tracking results.

    The function assumes the first two columns of ``data`` represent frame
    number and object ID. Interpolated rows are inserted when consecutive rows
    for the same ID have a gap of more than 1 frame but less than ``interval``
    frames.

    Parameters:
        data (np.ndarray): Input tracking results (one row per detection).
        interval (int): Maximum gap (exclusive) to perform interpolation.

    Returns:
        np.ndarray: Tracking results with interpolated rows included, sorted
        by ID and then by frame.
    """
    # Guard against empty input: lexsort/column indexing below would fail.
    if data.size == 0:
        return data
    # Accept a single row (np.loadtxt yields a 1-D array for one-line files).
    data = np.atleast_2d(data)
    # Sort primarily by ID, secondarily by frame (last lexsort key is primary).
    sorted_data = data[np.lexsort((data[:, 0], data[:, 1]))]
    result_rows = []
    previous_id = None
    previous_frame = None
    previous_row = None
    for row in sorted_data:
        current_frame, current_id = int(row[0]), int(row[1])
        if (previous_id is not None and current_id == previous_id
                and previous_frame + 1 < current_frame < previous_frame + interval):
            gap = current_frame - previous_frame - 1
            for i in range(1, gap + 1):
                # Linear interpolation for each missing frame
                new_row = previous_row + (row - previous_row) * (i / (current_frame - previous_frame))
                result_rows.append(new_row)
        result_rows.append(row)
        previous_id, previous_frame, previous_row = current_id, current_frame, row
    result_array = np.array(result_rows)
    # Resort so interpolated rows land in frame order within each ID
    return result_array[np.lexsort((result_array[:, 0], result_array[:, 1]))]
def gaussian_smooth(data: np.ndarray, tau: float) -> np.ndarray:
    """
    Apply Gaussian-process smoothing to columns 2 through 5 of the results.

    For each unique object ID in ``data``, columns 2-5 are smoothed with a
    Gaussian Process using a fixed-length-scale RBF kernel. Columns 6 and 7
    are carried over unchanged and a constant -1 is appended to every row.

    Parameters:
        data (np.ndarray): Tracking results.
        tau (float): Smoothing parameter.

    Returns:
        np.ndarray: Tracking results with smoothed columns.
    """
    output_rows = []
    for obj_id in np.unique(data[:, 1]):
        tracks = data[data[:, 1] == obj_id]
        # Length scale grows logarithmically with tau^3 / track length,
        # clipped to the interval [tau^-1, tau^2].
        length_scale = np.clip(tau * np.log(tau ** 3 / len(tracks)), tau ** -1, tau ** 2)
        t = tracks[:, 0].reshape(-1, 1)
        gpr = GPR(RBF(length_scale, length_scale_bounds="fixed"))
        gpr.fit(t, tracks[:, 2:6])
        # Smooth columns 2 to 5 simultaneously (requires a scikit-learn
        # version that supports multi-output GPR regression).
        smoothed = gpr.predict(t)
        for track_row, smooth_row in zip(tracks, smoothed):
            output_rows.append(
                np.concatenate(([track_row[0], obj_id], smooth_row, track_row[6:8], [-1]))
            )
    return np.array(output_rows)
def process_file(file_path: Path, interval: int, tau: float):
    """
    Process a single MOT results file in place: linear interpolation followed
    by Gaussian-process smoothing. The file is overwritten with the result.

    Parameters:
        file_path (Path): Path to the tracking results file (comma-delimited).
        interval (int): Maximum frame gap to interpolate over.
        tau (float): Smoothing parameter for the Gaussian process.
    """
    LOGGER.info(f"Applying GSI to: {file_path}")
    # ndmin=2 keeps a single-row file 2-D, so column indexing in
    # linear_interpolation / gaussian_smooth does not fail on it.
    tracking_results = np.loadtxt(file_path, delimiter=',', ndmin=2)
    if tracking_results.size != 0:
        interpolated_results = linear_interpolation(tracking_results, interval)
        smoothed_results = gaussian_smooth(interpolated_results, tau)
        # NOTE(review): output is space-delimited with integer formatting while
        # the input is comma-delimited floats, so a processed file cannot be
        # re-read by this function — confirm this round-trip loss is intended.
        np.savetxt(file_path, smoothed_results, fmt='%d %d %d %d %d %d %d %d %d')
    else:
        LOGGER.warning(f'No tracking results in {file_path}. Skipping...')
def gsi(mot_results_folder: Path, interval: int = 20, tau: float = 10):
    """
    Apply Gaussian Smoothed Interpolation (GSI) to every 'MOT*.txt' file in a
    folder, processing the files in parallel worker processes.

    Parameters:
        mot_results_folder (Path): Folder containing MOT result files.
        interval (int, optional): Maximum gap to interpolate. Defaults to 20.
        tau (float, optional): GP smoothing parameter. Defaults to 10.
    """
    tracking_files = list(mot_results_folder.glob('MOT*.txt'))
    LOGGER.info(f"Found {len(tracking_files)} file(s) to process.")
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Map each submitted future back to its source file for error reporting.
        pending = {
            executor.submit(process_file, path, interval, tau): path
            for path in tracking_files
        }
        progress = tqdm(
            concurrent.futures.as_completed(pending),
            total=len(tracking_files),
            desc="Processing files",
        )
        for finished in progress:
            source_path = pending[finished]
            try:
                finished.result()
            except Exception as e:
                LOGGER.error(f"Error processing file {source_path}: {e}")
def main():
    """
    Parse command line arguments and run the Gaussian Smoothed Interpolation
    process over the folder given by ``--path``.
    """
    parser = argparse.ArgumentParser(
        description='Apply Gaussian Smoothed Interpolation (GSI) to tracking results.'
    )
    parser.add_argument('--path', type=str, required=True, help='Path to MOT results folder')
    cli_args = parser.parse_args()
    gsi(Path(cli_args.path))


if __name__ == "__main__":
    main()