| | |
| | import argparse |
| | import json |
| | import pickle |
| | import random |
| | from itertools import product |
| | import numpy as np |
| | import torch |
| | import torch.nn as nn |
| | import torchvision.transforms as transforms |
| | from torch.utils.data import DataLoader |
| | from torchvision.datasets import ImageFolder |
| | from tqdm import tqdm |
| | from common.evaluation import Evaluator |
| | from model import chmnet |
| | from model.base.geometry import Geometry |
| |
|
| | from Utils import ( |
| | CosineCustomDataset, |
| | PairedLayer4Extractor, |
| | compute_spatial_similarity, |
| | generate_mask, |
| | normalize_array, |
| | get_transforms, |
| | arg_topK, |
| | ) |
| |
|
| | |
# Fixed seed so any random sampling downstream is reproducible.
random.seed(42)


def to_np(x):
    """Detach a torch tensor to a NumPy array on the CPU."""
    return x.data.to("cpu").numpy()


# CHM-Net configuration: PCK evaluation alphas, input resolution,
# kernel type, and checkpoint file to load.
chm_args = {
    "alpha": [0.05, 0.1],
    "img_size": 240,
    "ktype": "psi",
    "load": "pas_psi.pt",
}
| |
|
| |
|
class CHMGridTransfer:
    """Compute CHM keypoint correspondences and layer-4 cosine-similarity
    maps between one query image and every image in a support set.

    Pipeline (driven by ``build()``):
      1. ``find_correspondences`` — CHMNet transfers a fixed 7x7 source
         grid onto each support image.
      2. ``compute_embeddings`` — paired layer-4 feature maps for every
         (query, support) pair.
      3. ``compute_similarity_map`` — per-cell cosine similarity along the
         predicted correspondences.

    NOTE(review): the 240px image size and 7x7 grid are hard-coded below
    and must agree with ``chm_args['img_size']`` and the extractor's
    feature resolution — confirm before changing either.
    """

    def __init__(
        self,
        query_image,
        support_set,
        support_set_labels,
        train_folder,
        top_N,
        top_K,
        binarization_threshold,
        chm_source_transform,
        chm_target_transform,
        cosine_source_transform,
        cosine_target_transform,
        batch_size=64,
    ):
        # Re-ranking hyper-parameters: N candidates considered, K votes used.
        self.N = top_N
        self.K = top_K
        self.BS = batch_size

        # Separate transforms for the CHM branch and the cosine branch.
        self.chm_source_transform = chm_source_transform
        self.chm_target_transform = chm_target_transform
        self.cosine_source_transform = cosine_source_transform
        self.cosine_target_transform = cosine_target_transform

        # Populated by build() and its helper methods.
        self.source_embeddings = None
        self.target_embeddings = None
        self.correspondence_map = None
        self.similarity_maps = None
        self.reverse_similarity_maps = None
        self.transferred_points = None

        # Threshold used when binarizing the CC heatmap into a mask.
        self.binarization_threshold = binarization_threshold

        # NOTE(review): `train_folder` is accepted but never stored or used
        # in this class — confirm whether callers still need the parameter.
        self.q = query_image
        self.support_set = support_set
        self.labels_ss = support_set_labels

    def build(self):
        """Run the full pipeline: correspondences, embeddings, similarity maps."""
        # CHM branch: pair the query with every support image using the
        # CHM-specific transforms.
        test_ds = CosineCustomDataset(
            query_image=self.q,
            supporting_set=self.support_set,
            source_transform=self.chm_source_transform,
            target_transform=self.chm_target_transform,
        )
        test_dl = DataLoader(test_ds, batch_size=self.BS, shuffle=False)
        self.find_correspondences(test_dl)

        # Cosine branch: same pairs, but with the embedding transforms.
        test_ds = CosineCustomDataset(
            query_image=self.q,
            supporting_set=self.support_set,
            source_transform=self.cosine_source_transform,
            target_transform=self.cosine_target_transform,
        )
        test_dl = DataLoader(test_ds, batch_size=self.BS, shuffle=False)
        self.compute_embeddings(test_dl)
        self.compute_similarity_map()

    def find_correspondences(self, test_dl):
        """Transfer a fixed 7x7 source grid through CHMNet for each pair.

        Fills ``self.correspondence_map`` (per-pair list of 7x7 target cell
        indices) and ``self.transferred_points`` (raw predicted keypoints).
        """
        # CHMNet is loaded on CPU from the checkpoint named in chm_args.
        model = chmnet.CHMNet(chm_args["ktype"])
        model.load_state_dict(
            torch.load(chm_args["load"], map_location=torch.device("cpu"))
        )
        Evaluator.initialize(chm_args["alpha"])
        Geometry.initialize(img_size=chm_args["img_size"])

        grid_results = []
        transferred_points = []

        # 7x7 grid of source keypoints, evenly spaced with a 17px margin
        # inside a 240x240 image; transposed to shape (2, 49) = (xy, points).
        fixed_src_grid_points = list(
            product(
                np.linspace(1 + 17, 240 - 17 - 1, 7),
                np.linspace(1 + 17, 240 - 17 - 1, 7),
            )
        )
        fixed_src_grid_points = np.asarray(fixed_src_grid_points, dtype=np.float64).T

        with torch.no_grad():
            model.eval()
            for idx, batch in enumerate(tqdm(test_dl)):
                # Repeat the same 49 source points for every pair in the batch.
                keypoints = (
                    torch.tensor(fixed_src_grid_points)
                    .unsqueeze(0)
                    .repeat(batch["src_img"].shape[0], 1, 1)
                )
                n_pts = torch.tensor(
                    np.asarray(batch["src_img"].shape[0] * [49]), dtype=torch.long
                )

                corr_matrix = model(batch["src_img"], batch["trg_img"])
                prd_kps = Geometry.transfer_kps(
                    corr_matrix, keypoints, n_pts, normalized=False
                )
                transferred_points.append(prd_kps.cpu().numpy())
                for tgt_points in prd_kps:
                    tgt_grid = []
                    for x, y in zip(tgt_points[0], tgt_points[1]):
                        # Map coordinate to a 7-bin grid index.
                        # NOTE(review): assumes prd_kps coords are in [-1, 1]
                        # despite normalized=False — confirm against
                        # Geometry.transfer_kps.
                        tgt_grid.append(
                            [int(((x + 1) / 2.0) * 7), int(((y + 1) / 2.0) * 7)]
                        )
                    grid_results.append(tgt_grid)

        self.correspondence_map = grid_results
        self.transferred_points = np.vstack(transferred_points)

    def compute_embeddings(self, test_dl):
        """Extract paired layer-4 feature maps for every (query, support) pair."""
        paired_extractor = PairedLayer4Extractor()

        source_embeddings = []
        target_embeddings = []

        with torch.no_grad():
            for idx, batch in enumerate(test_dl):
                s_e, t_e = paired_extractor((batch["src_img"], batch["trg_img"]))

                source_embeddings.append(s_e)
                target_embeddings.append(t_e)

        # Concatenate across batches: one embedding per support image.
        self.source_embeddings = torch.cat(source_embeddings, axis=0)
        self.target_embeddings = torch.cat(target_embeddings, axis=0)

    def compute_similarity_map(self):
        """Cosine similarity per grid cell along the predicted correspondences.

        Builds a forward map indexed by source cell and a reverse map
        indexed by the corresponding target cell.
        """
        # dim=0: similarity along the channel dimension of each cell vector.
        CosSim = nn.CosineSimilarity(dim=0, eps=1e-6)

        similarity_maps = []
        rsimilarity_maps = []

        # Row-major enumeration of the 7x7 source grid cells.
        grid = []
        for i in range(7):
            for j in range(7):
                grid.append([i, j])

        for i in range(len(self.correspondence_map)):
            cosine_map = np.zeros((7, 7))
            reverse_cosine_map = np.zeros((7, 7))

            # S = source cell, T = its CHM-predicted target cell.
            for S, T in zip(grid, self.correspondence_map[i]):
                v1 = self.source_embeddings[i][:, S[0], S[1]]
                v2 = self.target_embeddings[i][:, T[0], T[1]]
                covalue = CosSim(v1, v2)
                cosine_map[S[0], S[1]] = covalue
                reverse_cosine_map[T[0], T[1]] = covalue

            similarity_maps.append(cosine_map)
            rsimilarity_maps.append(reverse_cosine_map)

        self.similarity_maps = similarity_maps
        self.reverse_similarity_maps = rsimilarity_maps

    def compute_score_using_cc(self):
        """Score each pair by the mean of its top-5 CC-masked similarities."""
        SIMS_source, SIMS_target = [], []
        for i in range(len(self.source_embeddings)):
            simA, simB = compute_spatial_similarity(
                to_np(self.source_embeddings[i]), to_np(self.target_embeddings[i])
            )

            SIMS_source.append(simA)
            SIMS_target.append(simB)

        # NOTE(review): SIMS_target is computed but unused below.
        SIMS_source = np.stack(SIMS_source, axis=0)

        top_cos_values = []

        for i in range(len(self.similarity_maps)):
            # Zero out cells where the binarized CC heatmap does not fire.
            cosine_value = np.multiply(
                self.similarity_maps[i],
                generate_mask(
                    normalize_array(SIMS_source[i]), t=self.binarization_threshold
                ),
            )
            top_5_indicies = np.argsort(cosine_value.T.reshape(-1))[::-1][:5]
            mean_of_top_5 = np.mean(
                [cosine_value.T.reshape(-1)[x] for x in top_5_indicies]
            )
            # NOTE(review): mean_of_top_5 is already a scalar; the outer
            # np.mean is a no-op.
            top_cos_values.append(np.mean(mean_of_top_5))

        return top_cos_values

    def compute_score_using_custom_points(self, selected_keypoint_masks):
        """Score each pair under a user-selected keypoint mask.

        NOTE(review): averaging over *all* sorted entries equals the plain
        mean of the masked map — the sort has no effect on the result.
        """
        top_cos_values = []

        for i in range(len(self.similarity_maps)):
            cosine_value = np.multiply(self.similarity_maps[i], selected_keypoint_masks)
            top_indicies = np.argsort(cosine_value.T.reshape(-1))[::-1]
            mean_of_tops = np.mean(
                [cosine_value.T.reshape(-1)[x] for x in top_indicies]
            )
            top_cos_values.append(np.mean(mean_of_tops))

        return top_cos_values

    def export(self):
        """Freeze all computed state into a ModifiableCHMResults object."""
        storage = {
            "N": self.N,
            "K": self.K,
            "source_embeddings": self.source_embeddings,
            "target_embeddings": self.target_embeddings,
            "correspondence_map": self.correspondence_map,
            "similarity_maps": self.similarity_maps,
            "T": self.binarization_threshold,
            "query": self.q,
            "support_set": self.support_set,
            "labels_for_support_set": self.labels_ss,
            "rsimilarity_maps": self.reverse_similarity_maps,
            "transferred_points": self.transferred_points,
        }

        return ModifiableCHMResults(storage)
| |
|
| |
|
class ModifiableCHMResults:
    """Re-score and re-rank a support set from exported CHMGridTransfer state.

    Scores come either from CC (spatial-similarity) masks
    (``compute_score_using_cc``) or from user-selected keypoint masks
    (``compute_score_using_custom_points``); the ``predict_*`` methods turn
    the scores into a majority-vote class prediction plus the top-K
    artifacts needed for visualization.
    """

    def __init__(self, storage):
        """Unpack the dict produced by ``CHMGridTransfer.export()``."""
        self.N = storage["N"]
        self.K = storage["K"]
        self.source_embeddings = storage["source_embeddings"]
        self.target_embeddings = storage["target_embeddings"]
        self.correspondence_map = storage["correspondence_map"]
        self.similarity_maps = storage["similarity_maps"]
        self.T = storage["T"]  # binarization threshold for CC masks
        self.q = storage["query"]
        self.support_set = storage["support_set"]
        self.labels_ss = storage["labels_for_support_set"]
        self.rsimilarity_maps = storage["rsimilarity_maps"]
        self.transferred_points = storage["transferred_points"]
        # Derived state, filled in by the scoring methods below.
        self.similarity_maps_masked = None
        self.SIMS_source = None
        self.SIMS_target = None
        self.masked_sim_values = []
        self.top_cos_values = []

    def compute_score_using_cc(self):
        """Score each pair by the mean of its top-5 CC-masked similarities.

        Side effects: caches SIMS_source/SIMS_target, the per-pair masked
        maps (masked_sim_values) and the scores (top_cos_values).

        Returns:
            list of per-pair scores (higher = more similar).
        """
        SIMS_source, SIMS_target = [], []
        for i in range(len(self.source_embeddings)):
            simA, simB = compute_spatial_similarity(
                to_np(self.source_embeddings[i]), to_np(self.target_embeddings[i])
            )
            SIMS_source.append(simA)
            SIMS_target.append(simB)

        self.SIMS_source = np.stack(SIMS_source, axis=0)
        self.SIMS_target = np.stack(SIMS_target, axis=0)

        # Bug fix: reset instead of appending to the instance list, so
        # calling this method twice no longer accumulates duplicate maps.
        self.masked_sim_values = []
        top_cos_values = []

        for i in range(len(self.similarity_maps)):
            # Keep only the cells where the binarized CC heatmap fires.
            masked = np.multiply(
                self.similarity_maps[i],
                generate_mask(normalize_array(self.SIMS_source[i]), t=self.T),
            )
            self.masked_sim_values.append(masked)
            flat = masked.T.reshape(-1)
            top_5 = np.argsort(flat)[::-1][:5]
            top_cos_values.append(np.mean(flat[top_5]))

        self.top_cos_values = top_cos_values

        return top_cos_values

    def compute_score_using_custom_points(self, selected_keypoint_masks):
        """Score each pair under a user-supplied keypoint mask.

        The original sort-then-average over *all* cells reduces to the
        plain mean of the masked map, so compute that directly.

        Side effects: caches the masked maps in similarity_maps_masked.
        """
        top_cos_values = []
        similarity_maps_masked = []

        for sim_map in self.similarity_maps:
            masked = np.multiply(sim_map, selected_keypoint_masks)
            similarity_maps_masked.append(masked)
            top_cos_values.append(np.mean(masked))

        self.similarity_maps_masked = similarity_maps_masked
        return top_cos_values

    def _rank_and_vote(self, top_cos_values):
        """Return (descending score order, majority top-K label, its vote count)."""
        order = np.argsort(top_cos_values)[::-1]
        votes = np.bincount([self.labels_ss[x] for x in order[: self.K]])
        return order, np.argmax(votes), np.max(votes)

    def predict_using_cc(self):
        """Majority-vote prediction over the top-K CC-masked scores.

        Returns:
            (topK_idx, prediction, predicted_folder_name, prediction_weight,
             topK files, reranked NN files, correspondence maps, similarity
             maps, reverse similarity maps, transferred points) — list
             results truncated to K entries.
        """
        top_cos_values = self.compute_score_using_cc()
        order, prediction, prediction_weight = self._rank_and_vote(top_cos_values)

        reranked_nns_files = [self.support_set[x] for x in order]

        # All support images of the predicted class, best-scoring first.
        topK_idx = [x for x in order if self.labels_ss[x] == prediction]
        topK_files = [self.support_set[x] for x in topK_idx]
        topK_cmaps = [self.correspondence_map[x] for x in topK_idx]
        topK_similarity_maps = [self.similarity_maps[x] for x in topK_idx]
        topK_rsimilarity_maps = [self.rsimilarity_maps[x] for x in topK_idx]
        topK_transfered_points = [self.transferred_points[x] for x in topK_idx]
        # NOTE(review): assumes '/'-separated paths with the class folder
        # second-to-last — confirm on Windows-style paths.
        predicted_folder_name = topK_files[0].split("/")[-2]

        return (
            topK_idx,
            prediction,
            predicted_folder_name,
            prediction_weight,
            topK_files[: self.K],
            reranked_nns_files[: self.K],
            topK_cmaps[: self.K],
            topK_similarity_maps[: self.K],
            topK_rsimilarity_maps[: self.K],
            topK_transfered_points[: self.K],
        )

    def predict_custom_pairs(self, selected_keypoint_masks):
        """Majority-vote prediction using a user-selected keypoint mask.

        Same contract as predict_using_cc, with two extra trailing items:
        the masked similarity maps for the top-K and the number of
        selected (non-zero) keypoints.
        """
        top_cos_values = self.compute_score_using_custom_points(selected_keypoint_masks)
        order, prediction, prediction_weight = self._rank_and_vote(top_cos_values)

        reranked_nns_files = [self.support_set[x] for x in order]

        topK_idx = [x for x in order if self.labels_ss[x] == prediction]
        topK_files = [self.support_set[x] for x in topK_idx]
        topK_cmaps = [self.correspondence_map[x] for x in topK_idx]
        topK_similarity_maps = [self.similarity_maps[x] for x in topK_idx]
        topK_rsimilarity_maps = [self.rsimilarity_maps[x] for x in topK_idx]
        topK_transferred_points = [self.transferred_points[x] for x in topK_idx]
        topK_masked_sims = [self.similarity_maps_masked[x] for x in topK_idx]
        predicted_folder_name = topK_files[0].split("/")[-2]

        # Number of keypoints the user actually selected.
        non_zero_mask = np.count_nonzero(selected_keypoint_masks)

        return (
            topK_idx,
            prediction,
            predicted_folder_name,
            prediction_weight,
            topK_files[: self.K],
            reranked_nns_files[: self.K],
            topK_cmaps[: self.K],
            topK_similarity_maps[: self.K],
            topK_rsimilarity_maps[: self.K],
            topK_transferred_points[: self.K],
            topK_masked_sims[: self.K],
            non_zero_mask,
        )
| |
|
| |
|
def export_visualizations_results(
    reranker_output,
    knn_predicted_label,
    knn_confidence,
    topK_knns,
    K=20,
    N=50,
    T=0.55,
    non_zero_mask=5,
):
    """Export all details for visualization and analysis.

    Runs CC-based prediction on `reranker_output` (a ModifiableCHMResults)
    and packages the kNN baseline, the CHM re-ranked prediction, and the
    top keypoint pairs per nearest neighbor into one dict.

    Args:
        reranker_output: ModifiableCHMResults with embeddings/maps computed.
        knn_predicted_label, knn_confidence, topK_knns: kNN baseline results.
        K, N, T: top-K votes, candidate-pool size, binarization threshold.
        non_zero_mask: number of keypoint pairs to export per neighbor
            (was a hard-coded local constant of 5; same default kept).

    Returns:
        dict of prediction metadata, masked similarity maps, and
        source/target keypoint arrays, keyed for the visualization code.
    """
    (
        topK_idx,
        p,
        pfn,
        pr,
        rfiles,
        reranked_nns,
        cmaps,
        sims,
        rsims,
        trns_kpts,
    ) = reranker_output.predict_using_cc()

    # Re-apply the CC mask (threshold T) to each top neighbor's similarity map.
    MASKED_COSINE_VALUES = [
        np.multiply(
            sims[X],
            generate_mask(
                normalize_array(reranker_output.SIMS_source[topK_idx[X]]), t=T
            ),
        )
        for X in range(len(sims))
    ]

    # 7x7 source grid in 240x240 pixel space with a 17px margin —
    # loop-invariant, so build it once (was rebuilt every iteration).
    axis_coords = np.linspace(1 + 17, 240 - 17 - 1, 7)
    point_list = list(product(axis_coords, axis_coords))

    list_of_source_points = []
    list_of_target_points = []

    for CK in range(len(sims)):
        target_keypoints = []
        topk_index = arg_topK(MASKED_COSINE_VALUES[CK], topK=non_zero_mask)

        for i in range(non_zero_mask):
            # NOTE(review): assumes transferred keypoints are in [-1, 1];
            # map them to 240px image coordinates.
            x, y = trns_kpts[CK].T[topk_index[i]]
            Ptarget = int(((x + 1) / 2.0) * 240), int(((y + 1) / 2.0) * 240)
            target_keypoints.append(Ptarget)

        list_of_source_points.append(np.asarray([point_list[x] for x in topk_index]))
        list_of_target_points.append(np.asarray(target_keypoints))

    detailed_output = {
        "q": reranker_output.q,
        "K": K,
        "N": N,
        "knn-prediction": knn_predicted_label,
        "knn-prediction-confidence": knn_confidence,
        "knn-nearest-neighbors": topK_knns,
        "chm-prediction": pfn,
        "chm-prediction-confidence": pr,
        "chm-nearest-neighbors": rfiles,
        "chm-nearest-neighbors-all": reranked_nns,
        "correspondance_map": cmaps,
        "masked_cos_values": MASKED_COSINE_VALUES,
        "src-keypoints": list_of_source_points,
        "tgt-keypoints": list_of_target_points,
        "non_zero_mask": non_zero_mask,
        "transferred_kpoints": trns_kpts,
    }

    return detailed_output
| |
|
| |
|
def chm_classify_and_visualize(
    query_image, kNN_results, support, TRAIN_SET, N=50, K=20, T=0.55, BS=64
):
    """Re-rank a query's kNN results with CHM and bundle visualization data.

    `support` is a (files, labels) pair; `kNN_results` is the baseline
    (predicted label, confidence, top-K neighbors) triple. Returns the
    detailed-output dict from export_visualizations_results.
    """
    knn_predicted_label, knn_confidence, topK_knns = kNN_results
    chm_src_t, chm_tgt_t, cos_src_t, cos_tgt_t = get_transforms("single", chm_args)
    support_files, support_labels = support[0], support[1]

    reranker = CHMGridTransfer(
        query_image=query_image,
        support_set=support_files,
        support_set_labels=support_labels,
        train_folder=TRAIN_SET,
        top_N=N,
        top_K=K,
        binarization_threshold=T,
        chm_source_transform=chm_src_t,
        chm_target_transform=chm_tgt_t,
        cosine_source_transform=cos_src_t,
        cosine_target_transform=cos_tgt_t,
        batch_size=BS,
    )

    # Run the full CHM pipeline, then freeze the results for scoring.
    reranker.build()
    exported_reranker = reranker.export()

    return export_visualizations_results(
        exported_reranker,
        knn_predicted_label,
        knn_confidence,
        topK_knns,
        K,
        N,
        T,
    )
| |
|