File size: 5,860 Bytes
e1832f4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from pathlib import Path
import numpy as np
import argparse
from sklearn.gaussian_process import GaussianProcessRegressor as GPR
from sklearn.gaussian_process.kernels import RBF
from boxmot.utils import logger as LOGGER
import concurrent.futures
from tqdm import tqdm


def linear_interpolation(data: np.ndarray, interval: int) -> np.ndarray:
    """
    Fill short gaps in each track by linearly interpolating between detections.

    The first two columns of `data` are treated as frame number and object ID.
    When two consecutive rows of the same ID are more than 1 frame apart but
    less than `interval` frames apart, one row is generated for every missing
    frame; all remaining columns are interpolated linearly between the two rows.

    Parameters:
        data (np.ndarray): Input tracking results, one detection per row. A 1-D
            array is treated as a single row.
        interval (int): Gaps of `interval` frames or more are left unfilled.

    Returns:
        np.ndarray: Original and interpolated rows, ordered by object ID and then
        by frame. Empty input yields an empty 2-D array.
    """
    data = np.asarray(data)
    if data.size == 0:
        # Nothing to interpolate; keep the column count when it is known.
        n_cols = data.shape[1] if data.ndim == 2 else 0
        return np.empty((0, n_cols), dtype=data.dtype)
    if data.ndim == 1:
        # A lone detection (e.g. a one-line results file) is a single row.
        data = data[np.newaxis, :]

    # Order rows by object ID first and by frame within each ID
    # (np.lexsort uses its last key as the primary sort key).
    sorted_data = data[np.lexsort((data[:, 0], data[:, 1]))]
    result_rows = []
    previous_id = None
    previous_frame = None
    previous_row = None

    for row in sorted_data:
        current_frame, current_id = int(row[0]), int(row[1])
        same_track = previous_id is not None and current_id == previous_id
        if same_track and previous_frame + 1 < current_frame < previous_frame + interval:
            span = current_frame - previous_frame
            delta = row - previous_row
            for step in range(1, span):
                new_row = previous_row + delta * (step / span)
                # Pin the frame to its exact integer value: the float product
                # above can land just below it (e.g. 49 * (1 / 49)), and an
                # integer-formatted write would then truncate to the wrong frame.
                new_row[0] = previous_frame + step
                result_rows.append(new_row)
        result_rows.append(row)
        previous_id, previous_frame, previous_row = current_id, current_frame, row

    result_array = np.array(result_rows)
    # Re-sort so the output ordering (ID, then frame) is guaranteed.
    return result_array[np.lexsort((result_array[:, 0], result_array[:, 1]))]


def gaussian_smooth(data: np.ndarray, tau: float) -> np.ndarray:
    """
    Smooth columns 2 through 5 of every track with a Gaussian process.

    For each unique object ID (column 1), a Gaussian process regressor with a
    fixed-length-scale RBF kernel is fitted on the frame numbers (column 0)
    against columns 2 through 5, and those columns are replaced by the
    regressor's predictions at the same frames. Each output row consists of
    frame, ID, the smoothed columns, the original columns 6 and 7, and a
    trailing constant -1.

    Parameters:
        data (np.ndarray): Tracking results as a 2-D array, one detection per row.
        tau (float): Smoothing parameter; it sets the kernel length scale
            together with the number of rows in the track.

    Returns:
        np.ndarray: Smoothed tracking results, grouped by object ID in ascending
        order. Input without rows yields an empty 2-D array of the same width
        as a regular output row.
    """
    data = np.asarray(data)
    per_track = []
    for obj_id in np.unique(data[:, 1]):
        tracks = data[data[:, 1] == obj_id]
        num_tracks = len(tracks)
        # Length scale shrinks logarithmically as the track gets longer and is
        # clipped to the range [1 / tau, tau ** 2].
        length_scale = np.clip(tau * np.log(tau ** 3 / num_tracks), tau ** -1, tau ** 2)
        t = tracks[:, 0].reshape(-1, 1)
        kernel = RBF(length_scale, length_scale_bounds="fixed")
        gpr = GPR(kernel)

        # Fit all four target columns in one multi-output regression.
        smoothed_columns = gpr.fit(t, tracks[:, 2:6]).predict(t)

        # Assemble the whole track at once: frame and ID, smoothed columns,
        # untouched columns 6-7, and the constant -1 column.
        per_track.append(
            np.column_stack(
                (tracks[:, :2], smoothed_columns, tracks[:, 6:8], np.full(num_tracks, -1.0))
            )
        )

    if not per_track:
        # Keep the result two-dimensional so callers can still index columns.
        n_cols = 2 + data[:, 2:6].shape[1] + data[:, 6:8].shape[1] + 1
        return np.empty((0, n_cols))
    return np.vstack(per_track)


def process_file(file_path: Path, interval: int, tau: float):
    """
    Apply linear interpolation and Gaussian smoothing to one MOT results file.

    The file is read as comma-delimited numbers, processed, and overwritten in
    place. Files without any tracking results are skipped with a warning.

    Parameters:
        file_path (Path): Path to the tracking results file.
        interval (int): Interval for linear interpolation.
        tau (float): Smoothing parameter for Gaussian process.
    """
    LOGGER.info(f"Applying GSI to: {file_path}")
    # ndmin=2 keeps a one-line file as a single row; without it np.loadtxt
    # returns a 1-D array and the column indexing downstream fails.
    tracking_results = np.loadtxt(file_path, delimiter=',', ndmin=2)
    if tracking_results.size != 0:
        interpolated_results = linear_interpolation(tracking_results, interval)
        smoothed_results = gaussian_smooth(interpolated_results, tau)
        # NOTE(review): the file is read as comma-delimited but written back
        # space-delimited with integer formatting, so a second run over the
        # same file cannot parse it. Confirm which format consumers expect
        # before changing the delimiter.
        np.savetxt(file_path, smoothed_results, fmt='%d %d %d %d %d %d %d %d %d')
    else:
        LOGGER.warning(f'No tracking results in {file_path}. Skipping...')


def gsi(mot_results_folder: Path, interval: int = 20, tau: float = 10):
    """
    Run Gaussian Smoothed Interpolation (GSI) over every result file in a folder.

    Every file matching 'MOT*.txt' directly inside the folder is handed to
    `process_file` in a worker process. A failure in one file is logged and
    does not stop the remaining files.

    Parameters:
        mot_results_folder (Path): Folder containing the MOT result files.
        interval (int, optional): Maximum gap to perform interpolation. Defaults to 20.
        tau (float, optional): Smoothing parameter for Gaussian process. Defaults to 10.
    """
    mot_files = list(mot_results_folder.glob('MOT*.txt'))
    n_files = len(mot_files)
    LOGGER.info(f"Found {n_files} file(s) to process.")

    with concurrent.futures.ProcessPoolExecutor() as pool:
        # Remember which file each submitted job belongs to for error reporting.
        pending = {}
        for mot_file in mot_files:
            job = pool.submit(process_file, mot_file, interval, tau)
            pending[job] = mot_file

        finished_jobs = concurrent.futures.as_completed(pending)
        for job in tqdm(finished_jobs, total=n_files, desc="Processing files"):
            source = pending[job]
            try:
                # result() re-raises whatever the worker raised.
                job.result()
            except Exception as exc:
                LOGGER.error(f"Error processing file {source}: {exc}")


def main():
    """
    Command-line entry point: read the results folder from --path and run GSI on it.
    """
    cli = argparse.ArgumentParser(
        description='Apply Gaussian Smoothed Interpolation (GSI) to tracking results.'
    )
    cli.add_argument('--path', type=str, required=True, help='Path to MOT results folder')
    options = cli.parse_args()

    # gsi() is called with its default interval and tau.
    gsi(Path(options.path))


# Run the command-line interface only when this file is executed as a script,
# so that importing the module does not start any processing.
if __name__ == "__main__":
    main()