anas
Initial deployment of Raster2Seq floor plan vectorization API
fadb92b
import copy
import random
import torch
from util.edges_utils import get_edges_alldirections_rev
from util.math_utils import clip
from util.mean_std import mean, std
def data_to_cuda(samples, targets):
return samples.to(torch.device("cuda")), [
{k: v if isinstance(v, str) else v.to(torch.device("cuda")) for k, v in t.items()} for t in targets
]
def get_random_layer_targets(targets, gt_layer):
random_targets = []
for batch_i, target_i in enumerate(targets):
random_layer_targets_i = copy.deepcopy(target_i)
if gt_layer[batch_i] != len(random_layer_targets_i["layer_indices"]) - 1:
start = random_layer_targets_i["layer_indices"][gt_layer[batch_i]].item()
end = random_layer_targets_i["layer_indices"][gt_layer[batch_i] + 1].item()
else:
start = random_layer_targets_i["layer_indices"][gt_layer[batch_i]].item()
end = len(random_layer_targets_i["points"])
random_points_i = random_layer_targets_i["points"][start:end, :]
random_edges_i = random_layer_targets_i["edges"][start:end]
random_unnormalized_points_i = random_layer_targets_i["unnormalized_points"][start:end, :]
random_layer_targets_i["points"] = random_points_i
random_layer_targets_i["edges"] = random_edges_i
random_layer_targets_i["unnormalized_points"] = random_unnormalized_points_i
del random_layer_targets_i["layer_indices"]
random_targets.append(random_layer_targets_i)
return random_targets
def random_layers(targets):
return [random.randint(0, len(targets[i]["layer_indices"]) - 1) for i in range(len(targets))]
def get_given_layers_random_region(targets, graphs):
random_regions = []
for bs_i in range(len(targets)):
# target
targets_i = targets[bs_i]
graphs_i = graphs[bs_i]
# level 0: start
start_i = tuple(targets_i["unnormalized_points"][0].tolist())
# sampled prob: for a neighborhood, each node is sampled by this probability
# sampled_prob = 0.0001
# sampled_prob = random.random()
sampled_prob = 0.5
# sampled_prob = 1
# sampled nodes
sampled_points = {}
for point_tensor in targets_i["unnormalized_points"]:
pos = tuple(point_tensor.tolist())
sampled_points[pos] = 0
# edges of sampled nodes
sampled_edges = []
# nodes number of subgraph
# sampled_amount = random.randint(0, len(sampled_points) + 2)
# if sampled_amount in [len(sampled_points) + 1]:
# sampled_amount = 0
# if sampled_amount in [len(sampled_points) + 2]:
# sampled_amount = len(sampled_points)
sampled_amount = random.randint(0, len(sampled_points)) # TODO: 1~len(sampled_points)
# Note that when sampled_prob = 1, the number of sampled nodes must be in 'layer_indices' or be the total number of points to ensure that the entire layers is sampled.
# equal to BFS
if sampled_prob == 1:
l = targets_i["layer_indices"].tolist()
l.append(len(sampled_points))
l.append(0)
l.append(len(sampled_points))
sampled_amount = l[random.randint(0, len(l) - 1)]
# start sampling
if sampled_amount == 0:
random_regions.append((sampled_points, sampled_edges))
continue
sampled_points[start_i] = 1
if sampled_amount == 1:
random_regions.append((sampled_points, sampled_edges))
continue
max_iterations = max(1000, 10 * sampled_amount) # Ensure at least 1000 iterations
iteration_count = 0
while sum(sampled_points.values()) < sampled_amount:
iteration_count += 1
if iteration_count > max_iterations:
print("Reached maximum iterations, breaking to avoid infinite loop.")
break
all_sampled_points = set([k for k, v in sampled_points.items() if v == 1])
all_sampled_points_adjs = set()
for sampled_point in all_sampled_points:
adj = set([(int(x[0]), int(x[1])) for x in graphs_i[sampled_point]])
all_sampled_points_adjs = all_sampled_points_adjs.union(adj)
if (-1, -1) in all_sampled_points_adjs:
all_sampled_points_adjs.remove((-1, -1))
all_sampled_points_adjs = list(all_sampled_points_adjs.difference(all_sampled_points))
if not all_sampled_points_adjs: # If no more adjacent points to sample, break
print("No more adjacent points to sample, breaking the loop.")
break
# shuffle the last layer to let it uniform (no bias of sample order)
random.shuffle(all_sampled_points_adjs)
# determine whether to sample nodes in each neighborhood based on probability
for all_sampled_points_adj_index, all_sampled_points_adj in enumerate(all_sampled_points_adjs):
all_sampled_points = set([k for k, v in sampled_points.items() if v == 1])
if sum(sampled_points.values()) == sampled_amount:
break
else:
if 1:
if random.random() < sampled_prob:
sampled_points[all_sampled_points_adj] = 1
# sample edges
all_pos1s = graphs_i[all_sampled_points_adj]
pos2 = all_sampled_points_adj
for pos1 in all_pos1s:
if pos1 in all_sampled_points:
sampled_edges.append((pos1, pos2))
else:
sampled_points[all_sampled_points_adj] = 0
random_regions.append((sampled_points, sampled_edges))
return random_regions
def get_random_region_targets(given_layers, graphs, targets):
random_region_targets = []
for bs_i in range(len(targets)):
random_region_target = {}
targets_i = targets[bs_i]
graphs_i = graphs[bs_i]
given_layers_i = given_layers[bs_i]
sampled_points_i, sampled_edges_i = given_layers_i
if sum(sampled_points_i.values()) == 0:
random_region_target["edges"] = targets_i["edges"][:1]
if "semantic_left_up" in targets_i:
random_region_target["semantic_left_up"] = targets_i["semantic_left_up"][:1]
random_region_target["semantic_right_up"] = targets_i["semantic_right_up"][:1]
random_region_target["semantic_right_down"] = targets_i["semantic_right_down"][:1]
random_region_target["semantic_left_down"] = targets_i["semantic_left_down"][:1]
random_region_target["image_id"] = targets_i["image_id"]
random_region_target["size"] = targets_i["size"]
random_region_target["unnormalized_points"] = targets_i["unnormalized_points"][:1]
random_region_target["points"] = targets_i["points"][:1]
random_region_target["last_edges"] = torch.zeros(
(1,), dtype=targets_i["edges"].dtype, device=targets_i["edges"].device
)
random_region_target["this_edges"] = torch.zeros(
(1,), dtype=targets_i["edges"].dtype, device=targets_i["edges"].device
)
random_region_targets.append(random_region_target)
elif 1 <= sum(sampled_points_i.values()) <= len(sampled_points_i) - 1:
sampled_points_i_given = set([k for k, v in sampled_points_i.items() if v == 1])
unnormalized_points = []
for point, sampled_or_not in sampled_points_i.items():
if sampled_or_not == 0:
adjs = graphs_i[point]
for adj in adjs:
if adj in sampled_points_i_given:
unnormalized_points.append(point)
break
if len(unnormalized_points) == 0:
random_region_target["edges"] = targets_i["edges"][:1]
if "semantic_left_up" in targets_i:
random_region_target["semantic_left_up"] = targets_i["semantic_left_up"][:1]
random_region_target["semantic_right_up"] = targets_i["semantic_right_up"][:1]
random_region_target["semantic_right_down"] = targets_i["semantic_right_down"][:1]
random_region_target["semantic_left_down"] = targets_i["semantic_left_down"][:1]
random_region_target["image_id"] = targets_i["image_id"]
random_region_target["size"] = targets_i["size"]
random_region_target["unnormalized_points"] = targets_i["unnormalized_points"][:1]
random_region_target["points"] = targets_i["points"][:1]
random_region_target["last_edges"] = torch.zeros(
(1,), dtype=targets_i["edges"].dtype, device=targets_i["edges"].device
)
random_region_target["this_edges"] = torch.zeros(
(1,), dtype=targets_i["edges"].dtype, device=targets_i["edges"].device
)
random_region_targets.append(random_region_target)
continue
indices_for_semantic = []
for unnormalized_point in unnormalized_points:
for ind, every_point in enumerate(targets_i["unnormalized_points"]):
every_point = tuple(every_point.tolist())
if (
abs(every_point[0] - unnormalized_point[0]) <= 2
and abs(every_point[1] - unnormalized_point[1]) <= 2
):
indices_for_semantic.append(ind)
# assert len(unnormalized_points) == len(indices_for_semantic)
semantic_left_up = []
semantic_right_up = []
semantic_right_down = []
semantic_left_down = []
if "semantic_left_up" in targets_i:
for ind in indices_for_semantic:
semantic_left_up.append(targets_i["semantic_left_up"][ind].item())
semantic_right_up.append(targets_i["semantic_right_up"][ind].item())
semantic_right_down.append(targets_i["semantic_right_down"][ind].item())
semantic_left_down.append(targets_i["semantic_left_down"][ind].item())
edges = []
for unnormalized_point in unnormalized_points:
edge = ""
adjs = graphs_i[unnormalized_point]
for adj in adjs:
if adj != (-1, -1):
edge += "1"
else:
edge += "0"
edge = get_edges_alldirections_rev(edge)
edges.append(edge)
last_edges = []
for unnormalized_point in unnormalized_points:
last_edge = ""
adjs = graphs_i[unnormalized_point]
for adj in adjs:
if adj in sampled_points_i_given:
last_edge += "1"
else:
last_edge += "0"
last_edge = get_edges_alldirections_rev(last_edge)
last_edges.append(last_edge)
this_edges = []
for unnormalized_point in unnormalized_points:
this_edge = ""
adjs = graphs_i[unnormalized_point]
for adj in adjs:
if adj in unnormalized_points:
this_edge += "1"
else:
this_edge += "0"
this_edge = get_edges_alldirections_rev(this_edge)
this_edges.append(this_edge)
random_region_target["edges"] = torch.tensor(
edges, dtype=targets_i["edges"].dtype, device=targets_i["edges"].device
)
if "semantic_left_up" in targets_i:
random_region_target["semantic_left_up"] = torch.tensor(
semantic_left_up,
dtype=targets_i["semantic_left_up"].dtype,
device=targets_i["semantic_left_up"].device,
)
random_region_target["semantic_right_up"] = torch.tensor(
semantic_right_up,
dtype=targets_i["semantic_right_up"].dtype,
device=targets_i["semantic_right_up"].device,
)
random_region_target["semantic_right_down"] = torch.tensor(
semantic_right_down,
dtype=targets_i["semantic_right_down"].dtype,
device=targets_i["semantic_right_down"].device,
)
random_region_target["semantic_left_down"] = torch.tensor(
semantic_left_down,
dtype=targets_i["semantic_left_down"].dtype,
device=targets_i["semantic_left_down"].device,
)
random_region_target["image_id"] = targets_i["image_id"]
random_region_target["size"] = targets_i["size"]
# NEW
# if len(unnormalized_points) == 0:
# print("Warning: unnormalized_points is empty. Initializing to default value.")
# unnormalized_points = torch.zeros((1, 2), dtype=targets_i['unnormalized_points'].dtype,
# device=targets_i['unnormalized_points'].device)
# else:
# random_region_target['unnormalized_points'] = torch.tensor(unnormalized_points,
# dtype=targets_i['unnormalized_points'].dtype,
# device=targets_i['unnormalized_points'].device)
random_region_target["unnormalized_points"] = torch.tensor(
unnormalized_points,
dtype=targets_i["unnormalized_points"].dtype,
device=targets_i["unnormalized_points"].device,
)
random_region_target["points"] = (
torch.tensor(unnormalized_points, dtype=targets_i["points"].dtype, device=targets_i["points"].device)
/ targets_i["size"]
)
random_region_target["last_edges"] = torch.tensor(
last_edges, dtype=targets_i["edges"].dtype, device=targets_i["edges"].device
)
random_region_target["this_edges"] = torch.tensor(
this_edges, dtype=targets_i["edges"].dtype, device=targets_i["edges"].device
)
random_region_targets.append(random_region_target)
else:
random_region_target["edges"] = 16 * torch.ones(
targets_i["edges"][:1].shape, dtype=targets_i["edges"].dtype, device=targets_i["edges"].device
)
if "semantic_left_up" in targets_i:
random_region_target["semantic_left_up"] = 11 * torch.ones(
targets_i["semantic_left_up"][:1].shape,
dtype=targets_i["semantic_left_up"].dtype,
device=targets_i["semantic_left_up"].device,
)
random_region_target["semantic_right_up"] = 11 * torch.ones(
targets_i["semantic_right_up"][:1].shape,
dtype=targets_i["semantic_right_up"].dtype,
device=targets_i["semantic_right_up"].device,
)
random_region_target["semantic_right_down"] = 11 * torch.ones(
targets_i["semantic_right_down"][:1].shape,
dtype=targets_i["semantic_right_down"].dtype,
device=targets_i["semantic_right_down"].device,
)
random_region_target["semantic_left_down"] = 11 * torch.ones(
targets_i["semantic_left_down"][:1].shape,
dtype=targets_i["semantic_left_down"].dtype,
device=targets_i["semantic_left_down"].device,
)
random_region_target["image_id"] = targets_i["image_id"]
random_region_target["size"] = targets_i["size"]
random_region_target["unnormalized_points"] = 505 * torch.ones(
targets_i["unnormalized_points"][:1].shape,
dtype=targets_i["unnormalized_points"][:1].dtype,
device=targets_i["unnormalized_points"][:1].device,
)
random_region_target["points"] = (
505
* torch.ones(
targets_i["unnormalized_points"][:1].shape,
dtype=targets_i["points"][:1].dtype,
device=targets_i["points"][:1].device,
)
) / targets_i["size"]
random_region_target["last_edges"] = 16 * torch.ones(
(1,), dtype=targets_i["edges"].dtype, device=targets_i["edges"].device
)
random_region_target["this_edges"] = 16 * torch.ones(
(1,), dtype=targets_i["edges"].dtype, device=targets_i["edges"].device
)
random_region_targets.append(random_region_target)
return random_region_targets
def random_pertubation(sampled_points_i, sampled_edges_i):
random_pertube_map = {}
sigma = 2
pertube_threshold = 5
for sampled_point in sampled_points_i:
random_pertube_map[sampled_point] = (
sampled_point[0] + clip(int(random.gauss(0, sigma)), -1 * pertube_threshold, pertube_threshold),
sampled_point[1] + clip(int(random.gauss(0, sigma)), -1 * pertube_threshold, pertube_threshold),
)
new_sampled_points_i = {}
new_sampled_edges_i = []
for sampled_point in sampled_points_i:
new_sampled_points_i[random_pertube_map[sampled_point]] = sampled_points_i[sampled_point]
for pos1, pos2 in sampled_edges_i:
new_sampled_edges_i.append((random_pertube_map[pos1], random_pertube_map[pos2]))
return new_sampled_points_i, new_sampled_edges_i
def draw_given_layers_on_tensors_random_region(given_layers, tensors, graphs):
"""draw 9*9 yellow squares and width 2 blue lines"""
tensors_list = []
unnormalized_list = []
for i in range(len(given_layers)):
temp_tensor = tensors[i]
temp_tensor_0 = (temp_tensor[0] * std[0] + mean[0]) * 255
temp_tensor_1 = (temp_tensor[1] * std[1] + mean[1]) * 255
temp_tensor_2 = (temp_tensor[2] * std[2] + mean[2]) * 255
rectangle_radius = 5
# end sign
endsign = (505, 505)
valid_violet_endsign_up = endsign[1] - rectangle_radius
valid_violet_endsign_down = endsign[1] + rectangle_radius
valid_violet_endsign_left = endsign[0] - rectangle_radius
valid_violet_endsign_right = endsign[0] + rectangle_radius
temp_tensor_0[
valid_violet_endsign_up : valid_violet_endsign_down + 1,
valid_violet_endsign_left : valid_violet_endsign_right + 1,
] = 255
temp_tensor_1[
valid_violet_endsign_up : valid_violet_endsign_down + 1,
valid_violet_endsign_left : valid_violet_endsign_right + 1,
] = 0
temp_tensor_2[
valid_violet_endsign_up : valid_violet_endsign_down + 1,
valid_violet_endsign_left : valid_violet_endsign_right + 1,
] = 255
sampled_points_i, sampled_edges_i = given_layers[i]
sampled_points_i, sampled_edges_i = random_pertubation(sampled_points_i, sampled_edges_i)
given_points = [k for k, v in sampled_points_i.items() if v == 1]
for j, pos in enumerate(given_points):
valid_yellow_pos_up = int(pos[1] - rectangle_radius) if (pos[1] - rectangle_radius) >= 0 else 0
valid_yellow_pos_down = (
int(pos[1] + rectangle_radius)
if (pos[1] + rectangle_radius) < temp_tensor.shape[2]
else temp_tensor.shape[2] - 1
)
valid_yellow_pos_left = int(pos[0] - rectangle_radius) if (pos[0] - rectangle_radius) >= 0 else 0
valid_yellow_pos_right = (
int(pos[0] + rectangle_radius)
if (pos[0] + rectangle_radius) < temp_tensor.shape[1]
else temp_tensor.shape[1] - 1
)
temp_tensor_0[
valid_yellow_pos_up : valid_yellow_pos_down + 1, valid_yellow_pos_left : valid_yellow_pos_right + 1
] = 255
temp_tensor_1[
valid_yellow_pos_up : valid_yellow_pos_down + 1, valid_yellow_pos_left : valid_yellow_pos_right + 1
] = 255
temp_tensor_2[
valid_yellow_pos_up : valid_yellow_pos_down + 1, valid_yellow_pos_left : valid_yellow_pos_right + 1
] = 0
# draw blue lines
line_width = 2
for edge in sampled_edges_i:
pos1 = (int(edge[0][0]), int(edge[0][1]))
pos2 = (int(edge[1][0]), int(edge[1][1]))
if abs(pos1[0] - pos2[0]) < abs(pos1[1] - pos2[1]):
if pos1[1] > pos2[1]:
temp_tensor_0[
pos2[1] : pos1[1] + 1,
int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 0
temp_tensor_1[
pos2[1] : pos1[1] + 1,
int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 0
temp_tensor_2[
pos2[1] : pos1[1] + 1,
int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 255
else:
temp_tensor_0[
pos1[1] : pos2[1] + 1,
int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 0
temp_tensor_1[
pos1[1] : pos2[1] + 1,
int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 0
temp_tensor_2[
pos1[1] : pos2[1] + 1,
int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 255
else:
if pos1[0] > pos2[0]:
temp_tensor_0[
int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2)
+ int(line_width / 2)
+ 1,
pos2[0] : pos1[0] + 1,
] = 0
temp_tensor_1[
int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2)
+ int(line_width / 2)
+ 1,
pos2[0] : pos1[0] + 1,
] = 0
temp_tensor_2[
int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2)
+ int(line_width / 2)
+ 1,
pos2[0] : pos1[0] + 1,
] = 255
else:
temp_tensor_0[
int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2)
+ int(line_width / 2)
+ 1,
pos1[0] : pos2[0] + 1,
] = 0
temp_tensor_1[
int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2)
+ int(line_width / 2)
+ 1,
pos1[0] : pos2[0] + 1,
] = 0
temp_tensor_2[
int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2)
+ int(line_width / 2)
+ 1,
pos1[0] : pos2[0] + 1,
] = 255
unnormalized = torch.stack((temp_tensor_0, temp_tensor_1, temp_tensor_2), dim=0)
unnormalized_list.append(unnormalized)
temp_tensor_0_renorm = ((temp_tensor_0 / 255) - mean[0]) / std[0]
temp_tensor_1_renorm = ((temp_tensor_1 / 255) - mean[1]) / std[1]
temp_tensor_2_renorm = ((temp_tensor_2 / 255) - mean[2]) / std[2]
temp_tensor = torch.stack([temp_tensor_0_renorm, temp_tensor_1_renorm, temp_tensor_2_renorm], dim=0)
tensors_list.append(temp_tensor)
return torch.stack(tensors_list, dim=0), torch.stack(unnormalized_list, dim=0)
def initialize_tensors(tensors):
tensors_list = []
unnormalized_list = []
for i in range(len(tensors)):
temp_tensor = tensors[i]
temp_tensor_0 = (temp_tensor[0] * std[0] + mean[0]) * 255
temp_tensor_1 = (temp_tensor[1] * std[1] + mean[1]) * 255
temp_tensor_2 = (temp_tensor[2] * std[2] + mean[2]) * 255
rectangle_radius = 5 # 4+1+4=9
# end sign (when predict this, AR iteration terminates)
endsign = (505, 505)
valid_violet_endsign_up = endsign[1] - rectangle_radius
valid_violet_endsign_down = endsign[1] + rectangle_radius
valid_violet_endsign_left = endsign[0] - rectangle_radius
valid_violet_endsign_right = endsign[0] + rectangle_radius
temp_tensor_0[
valid_violet_endsign_up : valid_violet_endsign_down + 1,
valid_violet_endsign_left : valid_violet_endsign_right + 1,
] = 255
temp_tensor_1[
valid_violet_endsign_up : valid_violet_endsign_down + 1,
valid_violet_endsign_left : valid_violet_endsign_right + 1,
] = 0
temp_tensor_2[
valid_violet_endsign_up : valid_violet_endsign_down + 1,
valid_violet_endsign_left : valid_violet_endsign_right + 1,
] = 255
unnormalized = torch.stack((temp_tensor_0, temp_tensor_1, temp_tensor_2), dim=0)
unnormalized_list.append(unnormalized)
temp_tensor_0_renorm = ((temp_tensor_0 / 255) - mean[0]) / std[0]
temp_tensor_1_renorm = ((temp_tensor_1 / 255) - mean[1]) / std[1]
temp_tensor_2_renorm = ((temp_tensor_2 / 255) - mean[2]) / std[2]
temp_tensor = torch.stack([temp_tensor_0_renorm, temp_tensor_1_renorm, temp_tensor_2_renorm], dim=0)
tensors_list.append(temp_tensor)
return torch.stack(tensors_list, dim=0), torch.stack(unnormalized_list, dim=0)
def l1_dist(pos1, pos2):
return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])
def delete_graphs(targets):
no_graph_targets = []
for target in targets:
target_ = copy.deepcopy(target)
del target_["graph"]
no_graph_targets.append(target_)
return no_graph_targets
def delete_graphs_and_unnormpoints(targets):
no_graph_targets = []
for target in targets:
target_ = copy.deepcopy(target)
del target_["graph"]
del target_["unnormalized_points"]
no_graph_targets.append(target_)
return no_graph_targets
def get_remove_point(this_preds, dist_threshold):
for point1 in this_preds:
for point2 in this_preds:
# if point1 != point2:
if not (
(point1["points"].tolist()[0] == point2["points"].tolist()[0])
and (point1["points"].tolist()[1] == point2["points"].tolist()[1])
):
dist_chebyshev = max(
abs(point1["points"].tolist()[0] - point2["points"].tolist()[0]),
abs(point1["points"].tolist()[1] - point2["points"].tolist()[1]),
)
if dist_chebyshev <= dist_threshold:
point1_confidence = point1["scores"].item()
point2_confidence = point2["scores"].item()
if point1_confidence < point2_confidence:
return point1
elif point2_confidence < point1_confidence:
return point2
else:
return [point1, point2][random.randint(0, 1)]
return None
def point_inside(point, points_list):
point1 = tuple(point["points"].tolist())
for point_i in points_list:
point1_i = tuple(point_i["points"].tolist())
if point1 == point1_i:
return True
return False
def remove_points(need_to_remove_in_last_edges, this_preds):
result = []
for this_pred in this_preds:
if not point_inside(this_pred, need_to_remove_in_last_edges):
result.append(this_pred)
return result
def nms(this_preds):
if len(this_preds) <= 1:
return this_preds
else:
dist_threshold = 5
while True:
remove_point = get_remove_point(this_preds, dist_threshold)
if remove_point is None:
break
else:
# this_preds.remove(remove_point)
this_preds = remove_points([remove_point], this_preds)
return this_preds
def nms_givenpoints(this_preds, preds):
if len(this_preds) == 0:
return this_preds
else:
all_given_points = []
for given_points, given_last_edges, given_this_edges in preds:
all_given_points.extend(given_points)
if len(all_given_points) == 0:
return this_preds
this_preds_copy = copy.deepcopy(this_preds)
dist_threshold = 5
for this_pred in this_preds_copy:
for given_point in all_given_points:
this_pred_pos = tuple(this_pred["points"].tolist())
given_point_pos = tuple(given_point["points"].tolist())
dist_chebyshev = max(
abs(this_pred_pos[0] - given_point_pos[0]), abs(this_pred_pos[1] - given_point_pos[1])
)
if dist_chebyshev <= dist_threshold:
this_preds = remove_points([this_pred], this_preds)
break
return this_preds
def random_keep(this_preds):
if len(this_preds) <= 1:
return this_preds
else:
while True:
random_keep_this_preds = []
for point in this_preds:
# is_keep = random.random() < point['scores'].item()
is_keep = random.random() < 1.01
# is_keep = random.random() < 0.5
if is_keep:
random_keep_this_preds.append(point)
if len(random_keep_this_preds) > 0:
return random_keep_this_preds
def is_stop(this_preds):
if len(this_preds) == 0:
return 1 # stop
elif (len(this_preds) >= 1) and (16 in [p["edges"].item() for p in this_preds]):
return 2 # normally terminate
else:
return 0 # not stop
def draw_preds_on_tensors(preds, tensors):
tensors_list = []
unnormalized_list = []
for i in range(len(tensors)):
temp_tensor = tensors[i]
temp_tensor_0 = (temp_tensor[0] * std[0] + mean[0]) * 255
temp_tensor_1 = (temp_tensor[1] * std[1] + mean[1]) * 255
temp_tensor_2 = (temp_tensor[2] * std[2] + mean[2]) * 255
rectangle_radius = 5
this_preds, last_edges, this_edges = preds[-1]
for this_pred in this_preds:
point = tuple([int(_) for _ in this_pred["points"].tolist()])
up = point[1] - rectangle_radius
down = point[1] + rectangle_radius
left = point[0] - rectangle_radius
right = point[0] + rectangle_radius
temp_tensor_0[up : down + 1, left : right + 1] = 255
temp_tensor_1[up : down + 1, left : right + 1] = 255
temp_tensor_2[up : down + 1, left : right + 1] = 0
line_width = 2
for last_edge in last_edges:
pos1 = tuple([int(_) for _ in last_edge[0]["points"].tolist()])
pos2 = tuple([int(_) for _ in last_edge[1]["points"].tolist()])
if abs(pos1[0] - pos2[0]) < abs(pos1[1] - pos2[1]):
if pos1[1] > pos2[1]:
temp_tensor_0[
pos2[1] : pos1[1] + 1,
int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 0
temp_tensor_1[
pos2[1] : pos1[1] + 1,
int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 0
temp_tensor_2[
pos2[1] : pos1[1] + 1,
int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 255
else:
temp_tensor_0[
pos1[1] : pos2[1] + 1,
int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 0
temp_tensor_1[
pos1[1] : pos2[1] + 1,
int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 0
temp_tensor_2[
pos1[1] : pos2[1] + 1,
int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 255
else:
if pos1[0] > pos2[0]:
temp_tensor_0[
int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2)
+ int(line_width / 2)
+ 1,
pos2[0] : pos1[0] + 1,
] = 0
temp_tensor_1[
int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2)
+ int(line_width / 2)
+ 1,
pos2[0] : pos1[0] + 1,
] = 0
temp_tensor_2[
int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2)
+ int(line_width / 2)
+ 1,
pos2[0] : pos1[0] + 1,
] = 255
else:
temp_tensor_0[
int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2)
+ int(line_width / 2)
+ 1,
pos1[0] : pos2[0] + 1,
] = 0
temp_tensor_1[
int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2)
+ int(line_width / 2)
+ 1,
pos1[0] : pos2[0] + 1,
] = 0
temp_tensor_2[
int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2)
+ int(line_width / 2)
+ 1,
pos1[0] : pos2[0] + 1,
] = 255
for this_edge in this_edges:
pos1 = tuple([int(_) for _ in this_edge[0]["points"].tolist()])
pos2 = tuple([int(_) for _ in this_edge[1]["points"].tolist()])
if abs(pos1[0] - pos2[0]) < abs(pos1[1] - pos2[1]):
if pos1[1] > pos2[1]:
temp_tensor_0[
pos2[1] : pos1[1] + 1,
int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 0
temp_tensor_1[
pos2[1] : pos1[1] + 1,
int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 0
temp_tensor_2[
pos2[1] : pos1[1] + 1,
int((pos1[0] + pos2[0]) / 2) - int(line_width / 2) : int((pos1[0] + pos2[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 255
else:
temp_tensor_0[
pos1[1] : pos2[1] + 1,
int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 0
temp_tensor_1[
pos1[1] : pos2[1] + 1,
int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 0
temp_tensor_2[
pos1[1] : pos2[1] + 1,
int((pos2[0] + pos1[0]) / 2) - int(line_width / 2) : int((pos2[0] + pos1[0]) / 2)
+ int(line_width / 2)
+ 1,
] = 255
else:
if pos1[0] > pos2[0]:
temp_tensor_0[
int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2)
+ int(line_width / 2)
+ 1,
pos2[0] : pos1[0] + 1,
] = 0
temp_tensor_1[
int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2)
+ int(line_width / 2)
+ 1,
pos2[0] : pos1[0] + 1,
] = 0
temp_tensor_2[
int((pos1[1] + pos2[1]) / 2) - int(line_width / 2) : int((pos1[1] + pos2[1]) / 2)
+ int(line_width / 2)
+ 1,
pos2[0] : pos1[0] + 1,
] = 255
else:
temp_tensor_0[
int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2)
+ int(line_width / 2)
+ 1,
pos1[0] : pos2[0] + 1,
] = 0
temp_tensor_1[
int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2)
+ int(line_width / 2)
+ 1,
pos1[0] : pos2[0] + 1,
] = 0
temp_tensor_2[
int((pos2[1] + pos1[1]) / 2) - int(line_width / 2) : int((pos2[1] + pos1[1]) / 2)
+ int(line_width / 2)
+ 1,
pos1[0] : pos2[0] + 1,
] = 255
unnormalized = torch.stack((temp_tensor_0, temp_tensor_1, temp_tensor_2), dim=0)
unnormalized_list.append(unnormalized)
temp_tensor_0_renorm = ((temp_tensor_0 / 255) - mean[0]) / std[0]
temp_tensor_1_renorm = ((temp_tensor_1 / 255) - mean[1]) / std[1]
temp_tensor_2_renorm = ((temp_tensor_2 / 255) - mean[2]) / std[2]
temp_tensor = torch.stack([temp_tensor_0_renorm, temp_tensor_1_renorm, temp_tensor_2_renorm], dim=0)
tensors_list.append(temp_tensor)
return torch.stack(tensors_list, dim=0), torch.stack(unnormalized_list, dim=0)
def edge_inside(edge, edges_list):
edge_point1 = tuple(edge[0]["points"].tolist())
edge_point2 = tuple(edge[1]["points"].tolist())
for edge_i in edges_list:
edge_i_point1 = tuple(edge_i[0]["points"].tolist())
edge_i_point2 = tuple(edge_i[1]["points"].tolist())
if ((edge_point1 == edge_i_point1) and (edge_point2 == edge_i_point2)) or (
(edge_point1 == edge_i_point2) and (edge_point2 == edge_i_point1)
):
return True
return False
def remove_edge(edge, edges_list):
result = []
edge_point1 = tuple(edge[0]["points"].tolist())
edge_point2 = tuple(edge[1]["points"].tolist())
for edge_i in edges_list:
edge_i_point1 = tuple(edge_i[0]["points"].tolist())
edge_i_point2 = tuple(edge_i[1]["points"].tolist())
if (edge_point1 == edge_i_point1) and (edge_point2 == edge_i_point2):
pass
else:
result.append(edge_i)
return result
def get_edges_amount(preds):
count = 0
for this_preds, last_edges, this_edges in preds:
count += len(last_edges)
count += len(this_edges)
return count
def get_reserve_preds(results, keep_confidence_threshold, targets):
reserve_preds = []
valid_label_indices_edges = torch.where(results["edges"] != 0)[0]
valid_label_indices_scores = torch.where(results["scores"] <= keep_confidence_threshold)[0]
valid_label_indices = torch.tensor(
list(set(valid_label_indices_edges.tolist()).intersection(set(valid_label_indices_scores.tolist()))),
dtype=valid_label_indices_edges.dtype,
device=valid_label_indices_edges.device,
)
for valid_label_indice in valid_label_indices:
valid_results_i = {}
valid_results_i["scores"] = results["scores"][valid_label_indice]
valid_results_i["points"] = results["points"][valid_label_indice]
valid_results_i["last_edges"] = results["last_edges"][valid_label_indice]
valid_results_i["this_edges"] = results["this_edges"][valid_label_indice]
valid_results_i["edges"] = results["edges"][valid_label_indice]
valid_results_i["size"] = targets[0]["size"]
reserve_preds.append(valid_results_i)
return reserve_preds