Spaces:
Runtime error
Runtime error
| import numpy as np | |
| import cv2 | |
| import torch | |
| # Note that the coordinates passed to the model must not exceed 256. | |
| # xy range 256 | |
| def pdf2(sigma_matrix, grid): | |
| """Calculate PDF of the bivariate Gaussian distribution. | |
| Args: | |
| sigma_matrix (ndarray): with the shape (2, 2) | |
| grid (ndarray): generated by :func:`mesh_grid`, | |
| with the shape (K, K, 2), K is the kernel size. | |
| Returns: | |
| kernel (ndarrray): un-normalized kernel. | |
| """ | |
| inverse_sigma = np.linalg.inv(sigma_matrix) | |
| kernel = np.exp(-0.5 * np.sum(np.dot(grid, inverse_sigma) * grid, 2)) | |
| return kernel | |
| def mesh_grid(kernel_size): | |
| """Generate the mesh grid, centering at zero. | |
| Args: | |
| kernel_size (int): | |
| Returns: | |
| xy (ndarray): with the shape (kernel_size, kernel_size, 2) | |
| xx (ndarray): with the shape (kernel_size, kernel_size) | |
| yy (ndarray): with the shape (kernel_size, kernel_size) | |
| """ | |
| ax = np.arange(-kernel_size // 2 + 1.0, kernel_size // 2 + 1.0) | |
| xx, yy = np.meshgrid(ax, ax) | |
| xy = np.hstack( | |
| ( | |
| xx.reshape((kernel_size * kernel_size, 1)), | |
| yy.reshape(kernel_size * kernel_size, 1), | |
| ) | |
| ).reshape(kernel_size, kernel_size, 2) | |
| return xy, xx, yy | |
| def sigma_matrix2(sig_x, sig_y, theta): | |
| """Calculate the rotated sigma matrix (two dimensional matrix). | |
| Args: | |
| sig_x (float): | |
| sig_y (float): | |
| theta (float): Radian measurement. | |
| Returns: | |
| ndarray: Rotated sigma matrix. | |
| """ | |
| d_matrix = np.array([[sig_x**2, 0], [0, sig_y**2]]) | |
| u_matrix = np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]]) | |
| return np.dot(u_matrix, np.dot(d_matrix, u_matrix.T)) | |
| def bivariate_Gaussian(kernel_size, sig_x, sig_y, theta, grid=None, isotropic=True): | |
| """Generate a bivariate isotropic or anisotropic Gaussian kernel. | |
| In the isotropic mode, only `sig_x` is used. `sig_y` and `theta` is ignored. | |
| Args: | |
| kernel_size (int): | |
| sig_x (float): | |
| sig_y (float): | |
| theta (float): Radian measurement. | |
| grid (ndarray, optional): generated by :func:`mesh_grid`, | |
| with the shape (K, K, 2), K is the kernel size. Default: None | |
| isotropic (bool): | |
| Returns: | |
| kernel (ndarray): normalized kernel. | |
| """ | |
| if grid is None: | |
| grid, _, _ = mesh_grid(kernel_size) | |
| if isotropic: | |
| sigma_matrix = np.array([[sig_x**2, 0], [0, sig_x**2]]) | |
| else: | |
| sigma_matrix = sigma_matrix2(sig_x, sig_y, theta) | |
| kernel = pdf2(sigma_matrix, grid) | |
| kernel = kernel / np.sum(kernel) | |
| return kernel | |
| size = 99 | |
| sigma = 10 | |
| blur_kernel = bivariate_Gaussian(size, sigma, sigma, 0, grid=None, isotropic=True) | |
| blur_kernel = blur_kernel / blur_kernel[size // 2, size // 2] | |
| canvas_width, canvas_height = 256, 256 | |
| def get_flow(points, optical_flow, video_len): | |
| for i in range(video_len - 1): | |
| p = points[i] | |
| p1 = points[i + 1] | |
| optical_flow[i + 1, p[1], p[0], 0] = p1[0] - p[0] | |
| optical_flow[i + 1, p[1], p[0], 1] = p1[1] - p[1] | |
| return optical_flow | |
| def process_points(points, frames=49): | |
| defualt_points = [[128, 128]] * frames | |
| if len(points) < 2: | |
| return defualt_points | |
| elif len(points) >= frames: | |
| skip = len(points) // frames | |
| return points[::skip][: frames - 1] + points[-1:] | |
| else: | |
| insert_num = frames - len(points) | |
| insert_num_dict = {} | |
| interval = len(points) - 1 | |
| n = insert_num // interval | |
| m = insert_num % interval | |
| for i in range(interval): | |
| insert_num_dict[i] = n | |
| for i in range(m): | |
| insert_num_dict[i] += 1 | |
| res = [] | |
| for i in range(interval): | |
| insert_points = [] | |
| x0, y0 = points[i] | |
| x1, y1 = points[i + 1] | |
| delta_x = x1 - x0 | |
| delta_y = y1 - y0 | |
| for j in range(insert_num_dict[i]): | |
| x = x0 + (j + 1) / (insert_num_dict[i] + 1) * delta_x | |
| y = y0 + (j + 1) / (insert_num_dict[i] + 1) * delta_y | |
| insert_points.append([int(x), int(y)]) | |
| res += points[i : i + 1] + insert_points | |
| res += points[-1:] | |
| return res | |
| def read_points_from_list(traj_list, video_len=16, reverse=False): | |
| points = [] | |
| for point in traj_list: | |
| if isinstance(point, str): | |
| x, y = point.strip().split(",") | |
| else: | |
| x, y = point[0], point[1] | |
| points.append((int(x), int(y))) | |
| if reverse: | |
| points = points[::-1] | |
| if len(points) > video_len: | |
| skip = len(points) // video_len | |
| points = points[::skip] | |
| points = points[:video_len] | |
| return points | |
| def read_points_from_file(file, video_len=16, reverse=False): | |
| with open(file, "r") as f: | |
| lines = f.readlines() | |
| points = [] | |
| for line in lines: | |
| x, y = line.strip().split(",") | |
| points.append((int(x), int(y))) | |
| if reverse: | |
| points = points[::-1] | |
| if len(points) > video_len: | |
| skip = len(points) // video_len | |
| points = points[::skip] | |
| points = points[:video_len] | |
| return points | |
| def process_traj(trajs_list, num_frames, video_size, device="cpu"): | |
| if trajs_list and trajs_list[0] and (not isinstance(trajs_list[0][0], (list, tuple))): | |
| tmp = trajs_list | |
| trajs_list = [tmp] | |
| optical_flow = np.zeros((num_frames, video_size[0], video_size[1], 2), dtype=np.float32) | |
| processed_points = [] | |
| for traj_list in trajs_list: | |
| points = read_points_from_list(traj_list, video_len=num_frames) | |
| xy_range = 256 | |
| h, w = video_size | |
| points = process_points(points, num_frames) | |
| points = [[int(w * x / xy_range), int(h * y / xy_range)] for x, y in points] | |
| optical_flow = get_flow(points, optical_flow, video_len=num_frames) | |
| processed_points.append(points) | |
| print(f"received {len(trajs_list)} trajectorie(s)") | |
| for i in range(1, num_frames): | |
| optical_flow[i] = cv2.filter2D(optical_flow[i], -1, blur_kernel) | |
| optical_flow = torch.tensor(optical_flow).to(device) | |
| return optical_flow, processed_points | |
| def add_provided_traj(traj_name): | |
| global traj_list | |
| traj_list = PROVIDED_TRAJS[traj_name] | |
| traj_str = [f"{traj}" for traj in traj_list] | |
| return ", ".join(traj_str) | |
| def scale_traj_list_to_256(traj_list, canvas_width, canvas_height): | |
| scale_x = 256 / canvas_width | |
| scale_y = 256 / canvas_height | |
| scaled_traj_list = [[int(x * scale_x), int(y * scale_y)] for x, y in traj_list] | |
| return scaled_traj_list |