Spaces:
Runtime error
Runtime error
| import copy | |
| import random | |
| import torch | |
| from util.edges_utils import get_edges_alldirections_rev | |
| from util.math_utils import clip | |
| from util.mean_std import mean, std | |
| def data_to_cuda(samples, targets): | |
| return samples.to(torch.device("cuda")), [ | |
| {k: v if isinstance(v, str) else v.to(torch.device("cuda")) for k, v in t.items()} for t in targets | |
| ] | |
| def get_random_layer_targets(targets, gt_layer): | |
| random_targets = [] | |
| for batch_i, target_i in enumerate(targets): | |
| random_layer_targets_i = copy.deepcopy(target_i) | |
| if gt_layer[batch_i] != len(random_layer_targets_i["layer_indices"]) - 1: | |
| start = random_layer_targets_i["layer_indices"][gt_layer[batch_i]].item() | |
| end = random_layer_targets_i["layer_indices"][gt_layer[batch_i] + 1].item() | |
| else: | |
| start = random_layer_targets_i["layer_indices"][gt_layer[batch_i]].item() | |
| end = len(random_layer_targets_i["points"]) | |
| random_points_i = random_layer_targets_i["points"][start:end, :] | |
| random_edges_i = random_layer_targets_i["edges"][start:end] | |
| random_unnormalized_points_i = random_layer_targets_i["unnormalized_points"][start:end, :] | |
| random_layer_targets_i["points"] = random_points_i | |
| random_layer_targets_i["edges"] = random_edges_i | |
| random_layer_targets_i["unnormalized_points"] = random_unnormalized_points_i | |
| del random_layer_targets_i["layer_indices"] | |
| random_targets.append(random_layer_targets_i) | |
| return random_targets | |
| def random_layers(targets): | |
| return [random.randint(0, len(targets[i]["layer_indices"]) - 1) for i in range(len(targets))] | |
| def get_given_layers_random_region(targets, graphs): | |
| random_regions = [] | |
| for bs_i in range(len(targets)): | |
| # target | |
| targets_i = targets[bs_i] | |
| graphs_i = graphs[bs_i] | |
| # level 0: start | |
| start_i = tuple(targets_i["unnormalized_points"][0].tolist()) | |
| # sampled prob: for a neighborhood, each node is sampled by this probability | |
| # sampled_prob = 0.0001 | |
| # sampled_prob = random.random() | |
| sampled_prob = 0.5 | |
| # sampled_prob = 1 | |
| # sampled nodes | |
| sampled_points = {} | |
| for point_tensor in targets_i["unnormalized_points"]: | |
| pos = tuple(point_tensor.tolist()) | |
| sampled_points[pos] = 0 | |
| # edges of sampled nodes | |
| sampled_edges = [] | |
| # nodes number of subgraph | |
| # sampled_amount = random.randint(0, len(sampled_points) + 2) | |
| # if sampled_amount in [len(sampled_points) + 1]: | |
| # sampled_amount = 0 | |
| # if sampled_amount in [len(sampled_points) + 2]: | |
| # sampled_amount = len(sampled_points) | |
| sampled_amount = random.randint(0, len(sampled_points)) # TODO: 1~len(sampled_points) | |
| # Note that when sampled_prob = 1, the number of sampled nodes must be in 'layer_indices' or be the total number of points to ensure that the entire layers is sampled. | |
| # equal to BFS | |
| if sampled_prob == 1: | |
| l = targets_i["layer_indices"].tolist() | |
| l.append(len(sampled_points)) | |
| l.append(0) | |
| l.append(len(sampled_points)) | |
| sampled_amount = l[random.randint(0, len(l) - 1)] | |
| # start sampling | |
| if sampled_amount == 0: | |
| random_regions.append((sampled_points, sampled_edges)) | |
| continue | |
| sampled_points[start_i] = 1 | |
| if sampled_amount == 1: | |
| random_regions.append((sampled_points, sampled_edges)) | |
| continue | |
| max_iterations = max(1000, 10 * sampled_amount) # Ensure at least 1000 iterations | |
| iteration_count = 0 | |
| while sum(sampled_points.values()) < sampled_amount: | |
| iteration_count += 1 | |
| if iteration_count > max_iterations: | |
| print("Reached maximum iterations, breaking to avoid infinite loop.") | |
| break | |
| all_sampled_points = set([k for k, v in sampled_points.items() if v == 1]) | |
| all_sampled_points_adjs = set() | |
| for sampled_point in all_sampled_points: | |
| adj = set([(int(x[0]), int(x[1])) for x in graphs_i[sampled_point]]) | |
| all_sampled_points_adjs = all_sampled_points_adjs.union(adj) | |
| if (-1, -1) in all_sampled_points_adjs: | |
| all_sampled_points_adjs.remove((-1, -1)) | |
| all_sampled_points_adjs = list(all_sampled_points_adjs.difference(all_sampled_points)) | |
| if not all_sampled_points_adjs: # If no more adjacent points to sample, break | |
| print("No more adjacent points to sample, breaking the loop.") | |
| break | |
| # shuffle the last layer to let it uniform (no bias of sample order) | |
| random.shuffle(all_sampled_points_adjs) | |
| # determine whether to sample nodes in each neighborhood based on probability | |
| for all_sampled_points_adj_index, all_sampled_points_adj in enumerate(all_sampled_points_adjs): | |
| all_sampled_points = set([k for k, v in sampled_points.items() if v == 1]) | |
| if sum(sampled_points.values()) == sampled_amount: | |
| break | |
| else: | |
| if 1: | |
| if random.random() < sampled_prob: | |
| sampled_points[all_sampled_points_adj] = 1 | |
| # sample edges | |
| all_pos1s = graphs_i[all_sampled_points_adj] | |
| pos2 = all_sampled_points_adj | |
| for pos1 in all_pos1s: | |
| if pos1 in all_sampled_points: | |
| sampled_edges.append((pos1, pos2)) | |
| else: | |
| sampled_points[all_sampled_points_adj] = 0 | |
| random_regions.append((sampled_points, sampled_edges)) | |
| return random_regions | |
| def get_random_region_targets(given_layers, graphs, targets): | |
| random_region_targets = [] | |
| for bs_i in range(len(targets)): | |
| random_region_target = {} | |
| targets_i = targets[bs_i] | |
| graphs_i = graphs[bs_i] | |
| given_layers_i = given_layers[bs_i] | |
| sampled_points_i, sampled_edges_i = given_layers_i | |
| if sum(sampled_points_i.values()) == 0: | |
| random_region_target["edges"] = targets_i["edges"][:1] | |
| if "semantic_left_up" in targets_i: | |
| random_region_target["semantic_left_up"] = targets_i["semantic_left_up"][:1] | |
| random_region_target["semantic_right_up"] = targets_i["semantic_right_up"][:1] | |
| random_region_target["semantic_right_down"] = targets_i["semantic_right_down"][:1] | |
| random_region_target["semantic_left_down"] = targets_i["semantic_left_down"][:1] | |
| random_region_target["image_id"] = targets_i["image_id"] | |
| random_region_target["size"] = targets_i["size"] | |
| random_region_target["unnormalized_points"] = targets_i["unnormalized_points"][:1] | |
| random_region_target["points"] = targets_i["points"][:1] | |
| random_region_target["last_edges"] = torch.zeros( | |
| (1,), dtype=targets_i["edges"].dtype, device=targets_i["edges"].device | |
| ) | |
| random_region_target["this_edges"] = torch.zeros( | |
| (1,), dtype=targets_i["edges"].dtype, device=targets_i["edges"].device | |
| ) | |
| random_region_targets.append(random_region_target) | |
| elif 1 <= sum(sampled_points_i.values()) <= len(sampled_points_i) - 1: | |
| sampled_points_i_given = set([k for k, v in sampled_points_i.items() if v == 1]) | |
| unnormalized_points = [] | |
| for point, sampled_or_not in sampled_points_i.items(): | |
| if sampled_or_not == 0: | |
| adjs = graphs_i[point] | |
| for adj in adjs: | |
| if adj in sampled_points_i_given: | |
| unnormalized_points.append(point) | |
| break | |
| if len(unnormalized_points) == 0: | |
| random_region_target["edges"] = targets_i["edges"][:1] | |
| if "semantic_left_up" in targets_i: | |
| random_region_target["semantic_left_up"] = targets_i["semantic_left_up"][:1] | |
| random_region_target["semantic_right_up"] = targets_i["semantic_right_up"][:1] | |
| random_region_target["semantic_right_down"] = targets_i["semantic_right_down"][:1] | |
| random_region_target["semantic_left_down"] = targets_i["semantic_left_down"][:1] | |
| random_region_target["image_id"] = targets_i["image_id"] | |
| random_region_target["size"] = targets_i["size"] | |
| random_region_target["unnormalized_points"] = targets_i["unnormalized_points"][:1] | |
| random_region_target["points"] = targets_i["points"][:1] | |
| random_region_target["last_edges"] = torch.zeros( | |
| (1,), dtype=targets_i["edges"].dtype, device=targets_i["edges"].device | |
| ) | |
| random_region_target["this_edges"] = torch.zeros( | |
| (1,), dtype=targets_i["edges"].dtype, device=targets_i["edges"].device | |
| ) | |
| random_region_targets.append(random_region_target) | |
| continue | |
| indices_for_semantic = [] | |
| for unnormalized_point in unnormalized_points: | |
| for ind, every_point in enumerate(targets_i["unnormalized_points"]): | |
| every_point = tuple(every_point.tolist()) | |
| if ( | |
| abs(every_point[0] - unnormalized_point[0]) <= 2 | |
| and abs(every_point[1] - unnormalized_point[1]) <= 2 | |
| ): | |
| indices_for_semantic.append(ind) | |
| # assert len(unnormalized_points) == len(indices_for_semantic) | |
| semantic_left_up = [] | |
| semantic_right_up = [] | |
| semantic_right_down = [] | |
| semantic_left_down = [] | |
| if "semantic_left_up" in targets_i: | |
| for ind in indices_for_semantic: | |
| semantic_left_up.append(targets_i["semantic_left_up"][ind].item()) | |
| semantic_right_up.append(targets_i["semantic_right_up"][ind].item()) | |
| semantic_right_down.append(targets_i["semantic_right_down"][ind].item()) | |
| semantic_left_down.append(targets_i["semantic_left_down"][ind].item()) | |
| edges = [] | |
| for unnormalized_point in unnormalized_points: | |
| edge = "" | |
| adjs = graphs_i[unnormalized_point] | |
| for adj in adjs: | |
| if adj != (-1, -1): | |
| edge += "1" | |
| else: | |
| edge += "0" | |
| edge = get_edges_alldirections_rev(edge) | |
| edges.append(edge) | |
| last_edges = [] | |
| for unnormalized_point in unnormalized_points: | |
| last_edge = "" | |
| adjs = graphs_i[unnormalized_point] | |
| for adj in adjs: | |
| if adj in sampled_points_i_given: | |
| last_edge += "1" | |
| else: | |
| last_edge += "0" | |
| last_edge = get_edges_alldirections_rev(last_edge) | |
| last_edges.append(last_edge) | |
| this_edges = [] | |
| for unnormalized_point in unnormalized_points: | |
| this_edge = "" | |
| adjs = graphs_i[unnormalized_point] | |
| for adj in adjs: | |
| if adj in unnormalized_points: | |
| this_edge += "1" | |
| else: | |
| this_edge += "0" | |
| this_edge = get_edges_alldirections_rev(this_edge) | |
| this_edges.append(this_edge) | |
| random_region_target["edges"] = torch.tensor( | |
| edges, dtype=targets_i["edges"].dtype, device=targets_i["edges"].device | |
| ) | |
| if "semantic_left_up" in targets_i: | |
| random_region_target["semantic_left_up"] = torch.tensor( | |
| semantic_left_up, | |
| dtype=targets_i["semantic_left_up"].dtype, | |
| device=targets_i["semantic_left_up"].device, | |
| ) | |
| random_region_target["semantic_right_up"] = torch.tensor( | |
| semantic_right_up, | |
| dtype=targets_i["semantic_right_up"].dtype, | |
| device=targets_i["semantic_right_up"].device, | |
| ) | |
| random_region_target["semantic_right_down"] = torch.tensor( | |
| semantic_right_down, | |
| dtype=targets_i["semantic_right_down"].dtype, | |
| device=targets_i["semantic_right_down"].device, | |
| ) | |
| random_region_target["semantic_left_down"] = torch.tensor( | |
| semantic_left_down, | |
| dtype=targets_i["semantic_left_down"].dtype, | |
| device=targets_i["semantic_left_down"].device, | |
| ) | |
| random_region_target["image_id"] = targets_i["image_id"] | |
| random_region_target["size"] = targets_i["size"] | |
| # NEW | |
| # if len(unnormalized_points) == 0: | |
| # print("Warning: unnormalized_points is empty. Initializing to default value.") | |
| # unnormalized_points = torch.zeros((1, 2), dtype=targets_i['unnormalized_points'].dtype, | |
| # device=targets_i['unnormalized_points'].device) | |
| # else: | |
| # random_region_target['unnormalized_points'] = torch.tensor(unnormalized_points, | |
| # dtype=targets_i['unnormalized_points'].dtype, | |
| # device=targets_i['unnormalized_points'].device) | |
| random_region_target["unnormalized_points"] = torch.tensor( | |
| unnormalized_points, | |
| dtype=targets_i["unnormalized_points"].dtype, | |
| device=targets_i["unnormalized_points"].device, | |
| ) | |
| random_region_target["points"] = ( | |
| torch.tensor(unnormalized_points, dtype=targets_i["points"].dtype, device=targets_i["points"].device) | |
| / targets_i["size"] | |
| ) | |
| random_region_target["last_edges"] = torch.tensor( | |
| last_edges, dtype=targets_i["edges"].dtype, device=targets_i["edges"].device | |
| ) | |
| random_region_target["this_edges"] = torch.tensor( | |
| this_edges, dtype=targets_i["edges"].dtype, device=targets_i["edges"].device | |
| ) | |
| random_region_targets.append(random_region_target) | |
| else: | |
| random_region_target["edges"] = 16 * torch.ones( | |
| targets_i["edges"][:1].shape, dtype=targets_i["edges"].dtype, device=targets_i["edges"].device | |
| ) | |
| if "semantic_left_up" in targets_i: | |
| random_region_target["semantic_left_up"] = 11 * torch.ones( | |
| targets_i["semantic_left_up"][:1].shape, | |
| dtype=targets_i["semantic_left_up"].dtype, | |
| device=targets_i["semantic_left_up"].device, | |
| ) | |
| random_region_target["semantic_right_up"] = 11 * torch.ones( | |
| targets_i["semantic_right_up"][:1].shape, | |
| dtype=targets_i["semantic_right_up"].dtype, | |
| device=targets_i["semantic_right_up"].device, | |
| ) | |
| random_region_target["semantic_right_down"] = 11 * torch.ones( | |
| targets_i["semantic_right_down"][:1].shape, | |
| dtype=targets_i["semantic_right_down"].dtype, | |
| device=targets_i["semantic_right_down"].device, | |
| ) | |
| random_region_target["semantic_left_down"] = 11 * torch.ones( | |
| targets_i["semantic_left_down"][:1].shape, | |
| dtype=targets_i["semantic_left_down"].dtype, | |
| device=targets_i["semantic_left_down"].device, | |
| ) | |
| random_region_target["image_id"] = targets_i["image_id"] | |
| random_region_target["size"] = targets_i["size"] | |
| random_region_target["unnormalized_points"] = 505 * torch.ones( | |
| targets_i["unnormalized_points"][:1].shape, | |
| dtype=targets_i["unnormalized_points"][:1].dtype, | |
| device=targets_i["unnormalized_points"][:1].device, | |
| ) | |
| random_region_target["points"] = ( | |
| 505 | |
| * torch.ones( | |
| targets_i["unnormalized_points"][:1].shape, | |
| dtype=targets_i["points"][:1].dtype, | |
| device=targets_i["points"][:1].device, | |
| ) | |
| ) / targets_i["size"] | |
| random_region_target["last_edges"] = 16 * torch.ones( | |
| (1,), dtype=targets_i["edges"].dtype, device=targets_i["edges"].device | |
| ) | |
| random_region_target["this_edges"] = 16 * torch.ones( | |
| (1,), dtype=targets_i["edges"].dtype, device=targets_i["edges"].device | |
| ) | |
| random_region_targets.append(random_region_target) | |
| return random_region_targets | |
| def random_pertubation(sampled_points_i, sampled_edges_i): | |
| random_pertube_map = {} | |
| sigma = 2 | |
| pertube_threshold = 5 | |
| for sampled_point in sampled_points_i: | |
| random_pertube_map[sampled_point] = ( | |
| sampled_point[0] + clip(int(random.gauss(0, sigma)), -1 * pertube_threshold, pertube_threshold), | |
| sampled_point[1] + clip(int(random.gauss(0, sigma)), -1 * pertube_threshold, pertube_threshold), | |
| ) | |
| new_sampled_points_i = {} | |
| new_sampled_edges_i = [] | |
| for sampled_point in sampled_points_i: | |
| new_sampled_points_i[random_pertube_map[sampled_point]] = sampled_points_i[sampled_point] | |
| for pos1, pos2 in sampled_edges_i: | |
| new_sampled_edges_i.append((random_pertube_map[pos1], random_pertube_map[pos2])) | |
| return new_sampled_points_i, new_sampled_edges_i | |
| def draw_given_layers_on_tensors_random_region(given_layers, tensors, graphs): | |
| """draw 9*9 yellow squares and width 2 blue lines""" | |
| tensors_list = [] | |
| unnormalized_list = [] | |
| for i in range(len(given_layers)): | |
| temp_tensor = tensors[i] | |
| temp_tensor_0 = (temp_tensor[0] * std[0] + mean[0]) * 255 | |
| temp_tensor_1 = (temp_tensor[1] * std[1] + mean[1]) * 255 | |
| temp_tensor_2 = (temp_tensor[2] * std[2] + mean[2]) * 255 | |
| rectangle_radius = 5 | |
| # end sign | |
| endsign = (505, 505) | |
| valid_violet_endsign_up = endsign[1] - rectangle_radius | |
| valid_violet_endsign_down = endsign[1] + rectangle_radius | |
| valid_violet_endsign_left = endsign[0] - rectangle_radius | |
| valid_violet_endsign_right = endsign[0] + rectangle_radius | |
| temp_tensor_0[ | |
| valid_violet_endsign_up : valid_violet_endsign_down + 1, | |
| valid_violet_endsign_left : valid_violet_endsign_right + 1, | |
| ] = 255 | |
| temp_tensor_1[ | |
| valid_violet_endsign_up : valid_violet_endsign_down + 1, | |
| valid_violet_endsign_left : valid_violet_endsign_right + 1, | |
| ] = 0 | |
| temp_tensor_2[ | |
| valid_violet_endsign_up : valid_violet_endsign_down + 1, | |
| valid_violet_endsign_left : valid_violet_endsign_right + 1, | |
| ] = 255 | |
| sampled_points_i, sampled_edges_i = given_layers[i] | |
| sampled_points_i, sampled_edges_i = random_pertubation(sampled_points_i, sampled_edges_i) | |
| given_points = [k for k, v in sampled_points_i.items() if v == 1] | |
| for j, pos in enumerate(given_points): | |
| valid_yellow_pos_up = int(pos[1] - rectangle_radius) if (pos[1] - rectangle_radius) >= 0 else 0 | |
| valid_yellow_pos_down = ( | |
| int(pos[1] + rectangle_radius) | |
| if (pos[1] + rectangle_radius) < temp_tensor.shape[2] | |
| else temp_tensor.shape[2] - 1 | |
| ) | |
| valid_yellow_pos_left = int(pos[0] - rectangle_radius) if (pos[0] - rectangle_radius) >= 0 else 0 | |
| valid_yellow_pos_right = ( | |
| int(pos[0] + rectangle_radius) | |
| if (pos[0] + rectangle_radius) < temp_tensor.shape[1] | |
| else temp_tensor.shape[1] - 1 | |
| ) | |
| temp_tensor_0[ | |
| valid_yellow_pos_up : valid_yellow_pos_down + 1, valid_yellow_pos_left : valid_yellow_pos_right + 1 | |
| ] = 255 | |
| temp_tensor_1[ | |
| valid_yellow_pos_up : valid_yellow_pos_down + 1, valid_yellow_pos_left : valid_yellow_pos_right + 1 | |
| ] = 255 | |
| temp_tensor_2[ | |
| valid_yellow_pos_up : valid_yellow_pos_down + 1, valid_yellow_pos_left : valid_yellow_pos_right + 1 | |
| ] = 0 | |
| # draw blue lines | |
| line_width = 2 | |
| for edge in sampled_edges_i: | |
| pos1 = (int(edge[0][0]), int(edge[0][1])) | |
| pos2 = (int(edge[1][0]), int(edge[1][1])) | |
| if abs(pos1[0] - pos2[0]) < abs(pos1[1] - pos2[1]): | |
| if pos1[1] > pos2[1]: | |
| temp_tensor_0[ | |
| pos2[1] : pos1[1] + 1, | |
| int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 0 | |
| temp_tensor_1[ | |
| pos2[1] : pos1[1] + 1, | |
| int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 0 | |
| temp_tensor_2[ | |
| pos2[1] : pos1[1] + 1, | |
| int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 255 | |
| else: | |
| temp_tensor_0[ | |
| pos1[1] : pos2[1] + 1, | |
| int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 0 | |
| temp_tensor_1[ | |
| pos1[1] : pos2[1] + 1, | |
| int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 0 | |
| temp_tensor_2[ | |
| pos1[1] : pos2[1] + 1, | |
| int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 255 | |
| else: | |
| if pos1[0] > pos2[0]: | |
| temp_tensor_0[ | |
| int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos2[0] : pos1[0] + 1, | |
| ] = 0 | |
| temp_tensor_1[ | |
| int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos2[0] : pos1[0] + 1, | |
| ] = 0 | |
| temp_tensor_2[ | |
| int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos2[0] : pos1[0] + 1, | |
| ] = 255 | |
| else: | |
| temp_tensor_0[ | |
| int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos1[0] : pos2[0] + 1, | |
| ] = 0 | |
| temp_tensor_1[ | |
| int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos1[0] : pos2[0] + 1, | |
| ] = 0 | |
| temp_tensor_2[ | |
| int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos1[0] : pos2[0] + 1, | |
| ] = 255 | |
| unnormalized = torch.stack((temp_tensor_0, temp_tensor_1, temp_tensor_2), dim=0) | |
| unnormalized_list.append(unnormalized) | |
| temp_tensor_0_renorm = ((temp_tensor_0 / 255) - mean[0]) / std[0] | |
| temp_tensor_1_renorm = ((temp_tensor_1 / 255) - mean[1]) / std[1] | |
| temp_tensor_2_renorm = ((temp_tensor_2 / 255) - mean[2]) / std[2] | |
| temp_tensor = torch.stack([temp_tensor_0_renorm, temp_tensor_1_renorm, temp_tensor_2_renorm], dim=0) | |
| tensors_list.append(temp_tensor) | |
| return torch.stack(tensors_list, dim=0), torch.stack(unnormalized_list, dim=0) | |
| def initialize_tensors(tensors): | |
| tensors_list = [] | |
| unnormalized_list = [] | |
| for i in range(len(tensors)): | |
| temp_tensor = tensors[i] | |
| temp_tensor_0 = (temp_tensor[0] * std[0] + mean[0]) * 255 | |
| temp_tensor_1 = (temp_tensor[1] * std[1] + mean[1]) * 255 | |
| temp_tensor_2 = (temp_tensor[2] * std[2] + mean[2]) * 255 | |
| rectangle_radius = 5 # 4+1+4=9 | |
| # end sign (when predict this, AR iteration terminates) | |
| endsign = (505, 505) | |
| valid_violet_endsign_up = endsign[1] - rectangle_radius | |
| valid_violet_endsign_down = endsign[1] + rectangle_radius | |
| valid_violet_endsign_left = endsign[0] - rectangle_radius | |
| valid_violet_endsign_right = endsign[0] + rectangle_radius | |
| temp_tensor_0[ | |
| valid_violet_endsign_up : valid_violet_endsign_down + 1, | |
| valid_violet_endsign_left : valid_violet_endsign_right + 1, | |
| ] = 255 | |
| temp_tensor_1[ | |
| valid_violet_endsign_up : valid_violet_endsign_down + 1, | |
| valid_violet_endsign_left : valid_violet_endsign_right + 1, | |
| ] = 0 | |
| temp_tensor_2[ | |
| valid_violet_endsign_up : valid_violet_endsign_down + 1, | |
| valid_violet_endsign_left : valid_violet_endsign_right + 1, | |
| ] = 255 | |
| unnormalized = torch.stack((temp_tensor_0, temp_tensor_1, temp_tensor_2), dim=0) | |
| unnormalized_list.append(unnormalized) | |
| temp_tensor_0_renorm = ((temp_tensor_0 / 255) - mean[0]) / std[0] | |
| temp_tensor_1_renorm = ((temp_tensor_1 / 255) - mean[1]) / std[1] | |
| temp_tensor_2_renorm = ((temp_tensor_2 / 255) - mean[2]) / std[2] | |
| temp_tensor = torch.stack([temp_tensor_0_renorm, temp_tensor_1_renorm, temp_tensor_2_renorm], dim=0) | |
| tensors_list.append(temp_tensor) | |
| return torch.stack(tensors_list, dim=0), torch.stack(unnormalized_list, dim=0) | |
| def l1_dist(pos1, pos2): | |
| return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1]) | |
| def delete_graphs(targets): | |
| no_graph_targets = [] | |
| for target in targets: | |
| target_ = copy.deepcopy(target) | |
| del target_["graph"] | |
| no_graph_targets.append(target_) | |
| return no_graph_targets | |
| def delete_graphs_and_unnormpoints(targets): | |
| no_graph_targets = [] | |
| for target in targets: | |
| target_ = copy.deepcopy(target) | |
| del target_["graph"] | |
| del target_["unnormalized_points"] | |
| no_graph_targets.append(target_) | |
| return no_graph_targets | |
| def get_remove_point(this_preds, dist_threshold): | |
| for point1 in this_preds: | |
| for point2 in this_preds: | |
| # if point1 != point2: | |
| if not ( | |
| (point1["points"].tolist()[0] == point2["points"].tolist()[0]) | |
| and (point1["points"].tolist()[1] == point2["points"].tolist()[1]) | |
| ): | |
| dist_chebyshev = max( | |
| abs(point1["points"].tolist()[0] - point2["points"].tolist()[0]), | |
| abs(point1["points"].tolist()[1] - point2["points"].tolist()[1]), | |
| ) | |
| if dist_chebyshev <= dist_threshold: | |
| point1_confidence = point1["scores"].item() | |
| point2_confidence = point2["scores"].item() | |
| if point1_confidence < point2_confidence: | |
| return point1 | |
| elif point2_confidence < point1_confidence: | |
| return point2 | |
| else: | |
| return [point1, point2][random.randint(0, 1)] | |
| return None | |
| def point_inside(point, points_list): | |
| point1 = tuple(point["points"].tolist()) | |
| for point_i in points_list: | |
| point1_i = tuple(point_i["points"].tolist()) | |
| if point1 == point1_i: | |
| return True | |
| return False | |
| def remove_points(need_to_remove_in_last_edges, this_preds): | |
| result = [] | |
| for this_pred in this_preds: | |
| if not point_inside(this_pred, need_to_remove_in_last_edges): | |
| result.append(this_pred) | |
| return result | |
| def nms(this_preds): | |
| if len(this_preds) <= 1: | |
| return this_preds | |
| else: | |
| dist_threshold = 5 | |
| while True: | |
| remove_point = get_remove_point(this_preds, dist_threshold) | |
| if remove_point is None: | |
| break | |
| else: | |
| # this_preds.remove(remove_point) | |
| this_preds = remove_points([remove_point], this_preds) | |
| return this_preds | |
| def nms_givenpoints(this_preds, preds): | |
| if len(this_preds) == 0: | |
| return this_preds | |
| else: | |
| all_given_points = [] | |
| for given_points, given_last_edges, given_this_edges in preds: | |
| all_given_points.extend(given_points) | |
| if len(all_given_points) == 0: | |
| return this_preds | |
| this_preds_copy = copy.deepcopy(this_preds) | |
| dist_threshold = 5 | |
| for this_pred in this_preds_copy: | |
| for given_point in all_given_points: | |
| this_pred_pos = tuple(this_pred["points"].tolist()) | |
| given_point_pos = tuple(given_point["points"].tolist()) | |
| dist_chebyshev = max( | |
| abs(this_pred_pos[0] - given_point_pos[0]), abs(this_pred_pos[1] - given_point_pos[1]) | |
| ) | |
| if dist_chebyshev <= dist_threshold: | |
| this_preds = remove_points([this_pred], this_preds) | |
| break | |
| return this_preds | |
| def random_keep(this_preds): | |
| if len(this_preds) <= 1: | |
| return this_preds | |
| else: | |
| while True: | |
| random_keep_this_preds = [] | |
| for point in this_preds: | |
| # is_keep = random.random() < point['scores'].item() | |
| is_keep = random.random() < 1.01 | |
| # is_keep = random.random() < 0.5 | |
| if is_keep: | |
| random_keep_this_preds.append(point) | |
| if len(random_keep_this_preds) > 0: | |
| return random_keep_this_preds | |
| def is_stop(this_preds): | |
| if len(this_preds) == 0: | |
| return 1 # stop | |
| elif (len(this_preds) >= 1) and (16 in [p["edges"].item() for p in this_preds]): | |
| return 2 # normally terminate | |
| else: | |
| return 0 # not stop | |
| def draw_preds_on_tensors(preds, tensors): | |
| tensors_list = [] | |
| unnormalized_list = [] | |
| for i in range(len(tensors)): | |
| temp_tensor = tensors[i] | |
| temp_tensor_0 = (temp_tensor[0] * std[0] + mean[0]) * 255 | |
| temp_tensor_1 = (temp_tensor[1] * std[1] + mean[1]) * 255 | |
| temp_tensor_2 = (temp_tensor[2] * std[2] + mean[2]) * 255 | |
| rectangle_radius = 5 | |
| this_preds, last_edges, this_edges = preds[-1] | |
| for this_pred in this_preds: | |
| point = tuple([int(_) for _ in this_pred["points"].tolist()]) | |
| up = point[1] - rectangle_radius | |
| down = point[1] + rectangle_radius | |
| left = point[0] - rectangle_radius | |
| right = point[0] + rectangle_radius | |
| temp_tensor_0[up : down + 1, left : right + 1] = 255 | |
| temp_tensor_1[up : down + 1, left : right + 1] = 255 | |
| temp_tensor_2[up : down + 1, left : right + 1] = 0 | |
| line_width = 2 | |
| for last_edge in last_edges: | |
| pos1 = tuple([int(_) for _ in last_edge[0]["points"].tolist()]) | |
| pos2 = tuple([int(_) for _ in last_edge[1]["points"].tolist()]) | |
| if abs(pos1[0] - pos2[0]) < abs(pos1[1] - pos2[1]): | |
| if pos1[1] > pos2[1]: | |
| temp_tensor_0[ | |
| pos2[1] : pos1[1] + 1, | |
| int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 0 | |
| temp_tensor_1[ | |
| pos2[1] : pos1[1] + 1, | |
| int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 0 | |
| temp_tensor_2[ | |
| pos2[1] : pos1[1] + 1, | |
| int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 255 | |
| else: | |
| temp_tensor_0[ | |
| pos1[1] : pos2[1] + 1, | |
| int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 0 | |
| temp_tensor_1[ | |
| pos1[1] : pos2[1] + 1, | |
| int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 0 | |
| temp_tensor_2[ | |
| pos1[1] : pos2[1] + 1, | |
| int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 255 | |
| else: | |
| if pos1[0] > pos2[0]: | |
| temp_tensor_0[ | |
| int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos2[0] : pos1[0] + 1, | |
| ] = 0 | |
| temp_tensor_1[ | |
| int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos2[0] : pos1[0] + 1, | |
| ] = 0 | |
| temp_tensor_2[ | |
| int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos2[0] : pos1[0] + 1, | |
| ] = 255 | |
| else: | |
| temp_tensor_0[ | |
| int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos1[0] : pos2[0] + 1, | |
| ] = 0 | |
| temp_tensor_1[ | |
| int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos1[0] : pos2[0] + 1, | |
| ] = 0 | |
| temp_tensor_2[ | |
| int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos1[0] : pos2[0] + 1, | |
| ] = 255 | |
| for this_edge in this_edges: | |
| pos1 = tuple([int(_) for _ in this_edge[0]["points"].tolist()]) | |
| pos2 = tuple([int(_) for _ in this_edge[1]["points"].tolist()]) | |
| if abs(pos1[0] - pos2[0]) < abs(pos1[1] - pos2[1]): | |
| if pos1[1] > pos2[1]: | |
| temp_tensor_0[ | |
| pos2[1] : pos1[1] + 1, | |
| int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 0 | |
| temp_tensor_1[ | |
| pos2[1] : pos1[1] + 1, | |
| int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 0 | |
| temp_tensor_2[ | |
| pos2[1] : pos1[1] + 1, | |
| int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 255 | |
| else: | |
| temp_tensor_0[ | |
| pos1[1] : pos2[1] + 1, | |
| int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 0 | |
| temp_tensor_1[ | |
| pos1[1] : pos2[1] + 1, | |
| int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 0 | |
| temp_tensor_2[ | |
| pos1[1] : pos2[1] + 1, | |
| int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| ] = 255 | |
| else: | |
| if pos1[0] > pos2[0]: | |
| temp_tensor_0[ | |
| int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos2[0] : pos1[0] + 1, | |
| ] = 0 | |
| temp_tensor_1[ | |
| int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos2[0] : pos1[0] + 1, | |
| ] = 0 | |
| temp_tensor_2[ | |
| int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos2[0] : pos1[0] + 1, | |
| ] = 255 | |
| else: | |
| temp_tensor_0[ | |
| int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos1[0] : pos2[0] + 1, | |
| ] = 0 | |
| temp_tensor_1[ | |
| int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos1[0] : pos2[0] + 1, | |
| ] = 0 | |
| temp_tensor_2[ | |
| int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2) | |
| + int(line_width / 2) | |
| + 1, | |
| pos1[0] : pos2[0] + 1, | |
| ] = 255 | |
| unnormalized = torch.stack((temp_tensor_0, temp_tensor_1, temp_tensor_2), dim=0) | |
| unnormalized_list.append(unnormalized) | |
| temp_tensor_0_renorm = ((temp_tensor_0 / 255) - mean[0]) / std[0] | |
| temp_tensor_1_renorm = ((temp_tensor_1 / 255) - mean[1]) / std[1] | |
| temp_tensor_2_renorm = ((temp_tensor_2 / 255) - mean[2]) / std[2] | |
| temp_tensor = torch.stack([temp_tensor_0_renorm, temp_tensor_1_renorm, temp_tensor_2_renorm], dim=0) | |
| tensors_list.append(temp_tensor) | |
| return torch.stack(tensors_list, dim=0), torch.stack(unnormalized_list, dim=0) | |
| def edge_inside(edge, edges_list): | |
| edge_point1 = tuple(edge[0]["points"].tolist()) | |
| edge_point2 = tuple(edge[1]["points"].tolist()) | |
| for edge_i in edges_list: | |
| edge_i_point1 = tuple(edge_i[0]["points"].tolist()) | |
| edge_i_point2 = tuple(edge_i[1]["points"].tolist()) | |
| if ((edge_point1 == edge_i_point1) and (edge_point2 == edge_i_point2)) or ( | |
| (edge_point1 == edge_i_point2) and (edge_point2 == edge_i_point1) | |
| ): | |
| return True | |
| return False | |
| def remove_edge(edge, edges_list): | |
| result = [] | |
| edge_point1 = tuple(edge[0]["points"].tolist()) | |
| edge_point2 = tuple(edge[1]["points"].tolist()) | |
| for edge_i in edges_list: | |
| edge_i_point1 = tuple(edge_i[0]["points"].tolist()) | |
| edge_i_point2 = tuple(edge_i[1]["points"].tolist()) | |
| if (edge_point1 == edge_i_point1) and (edge_point2 == edge_i_point2): | |
| pass | |
| else: | |
| result.append(edge_i) | |
| return result | |
| def get_edges_amount(preds): | |
| count = 0 | |
| for this_preds, last_edges, this_edges in preds: | |
| count += len(last_edges) | |
| count += len(this_edges) | |
| return count | |
| def get_reserve_preds(results, keep_confidence_threshold, targets): | |
| reserve_preds = [] | |
| valid_label_indices_edges = torch.where(results["edges"] != 0)[0] | |
| valid_label_indices_scores = torch.where(results["scores"] <= keep_confidence_threshold)[0] | |
| valid_label_indices = torch.tensor( | |
| list(set(valid_label_indices_edges.tolist()).intersection(set(valid_label_indices_scores.tolist()))), | |
| dtype=valid_label_indices_edges.dtype, | |
| device=valid_label_indices_edges.device, | |
| ) | |
| for valid_label_indice in valid_label_indices: | |
| valid_results_i = {} | |
| valid_results_i["scores"] = results["scores"][valid_label_indice] | |
| valid_results_i["points"] = results["points"][valid_label_indice] | |
| valid_results_i["last_edges"] = results["last_edges"][valid_label_indice] | |
| valid_results_i["this_edges"] = results["this_edges"][valid_label_indice] | |
| valid_results_i["edges"] = results["edges"][valid_label_indice] | |
| valid_results_i["size"] = targets[0]["size"] | |
| reserve_preds.append(valid_results_i) | |
| return reserve_preds | |