| | from argparse import Namespace |
| | import os |
| | import torch |
| | import cv2 |
| | from time import time |
| | from pathlib import Path |
| | import matplotlib.cm as cm |
| | import numpy as np |
| |
|
| | from src.models.topic_fm import TopicFM |
| | from src import get_model_cfg |
| | from .base import Viz |
| | from src.utils.metrics import compute_symmetrical_epipolar_errors, compute_pose_errors |
| | from src.utils.plotting import draw_topics, draw_topicfm_demo, error_colormap |
| |
|
| |
|
| | class VizTopicFM(Viz): |
| | def __init__(self, args): |
| | super().__init__() |
| | if type(args) == dict: |
| | args = Namespace(**args) |
| |
|
| | self.match_threshold = args.match_threshold |
| | self.n_sampling_topics = args.n_sampling_topics |
| | self.show_n_topics = args.show_n_topics |
| |
|
| | |
| | conf = dict(get_model_cfg()) |
| | conf["match_coarse"]["thr"] = self.match_threshold |
| | conf["coarse"]["n_samples"] = self.n_sampling_topics |
| | print("model config: ", conf) |
| | self.model = TopicFM(config=conf) |
| | ckpt_dict = torch.load(args.ckpt) |
| | self.model.load_state_dict(ckpt_dict["state_dict"]) |
| | self.model = self.model.eval().to(self.device) |
| |
|
| | |
| | |
| | self.name = "TopicFM" |
| |
|
| | print(f"Initialize {self.name}") |
| |
|
| | def match_and_draw( |
| | self, |
| | data_dict, |
| | root_dir=None, |
| | ground_truth=False, |
| | measure_time=False, |
| | viz_matches=True, |
| | ): |
| | if measure_time: |
| | torch.cuda.synchronize() |
| | start = torch.cuda.Event(enable_timing=True) |
| | end = torch.cuda.Event(enable_timing=True) |
| | start.record() |
| | self.model(data_dict) |
| | if measure_time: |
| | torch.cuda.synchronize() |
| | end.record() |
| | torch.cuda.synchronize() |
| | self.time_stats.append(start.elapsed_time(end)) |
| |
|
| | kpts0 = data_dict["mkpts0_f"].cpu().numpy() |
| | kpts1 = data_dict["mkpts1_f"].cpu().numpy() |
| |
|
| | img_name0, img_name1 = list(zip(*data_dict["pair_names"]))[0] |
| | img0 = cv2.imread(os.path.join(root_dir, img_name0)) |
| | img1 = cv2.imread(os.path.join(root_dir, img_name1)) |
| | if str(data_dict["dataset_name"][0]).lower() == "scannet": |
| | img0 = cv2.resize(img0, (640, 480)) |
| | img1 = cv2.resize(img1, (640, 480)) |
| |
|
| | if viz_matches: |
| | saved_name = "_".join( |
| | [ |
| | img_name0.split("/")[-1].split(".")[0], |
| | img_name1.split("/")[-1].split(".")[0], |
| | ] |
| | ) |
| | folder_matches = os.path.join(root_dir, "{}_viz_matches".format(self.name)) |
| | if not os.path.exists(folder_matches): |
| | os.makedirs(folder_matches) |
| | path_to_save_matches = os.path.join( |
| | folder_matches, "{}.png".format(saved_name) |
| | ) |
| |
|
| | if ground_truth: |
| | compute_symmetrical_epipolar_errors( |
| | data_dict |
| | ) |
| | compute_pose_errors( |
| | data_dict |
| | ) |
| | epi_errors = data_dict["epi_errs"].cpu().numpy() |
| | R_errors, t_errors = data_dict["R_errs"][0], data_dict["t_errs"][0] |
| |
|
| | self.draw_matches( |
| | kpts0, |
| | kpts1, |
| | img0, |
| | img1, |
| | epi_errors, |
| | path=path_to_save_matches, |
| | R_errs=R_errors, |
| | t_errs=t_errors, |
| | ) |
| |
|
| | |
| | rel_pair_names = list(zip(*data_dict["pair_names"])) |
| | bs = data_dict["image0"].size(0) |
| | metrics = { |
| | |
| | "identifiers": ["#".join(rel_pair_names[b]) for b in range(bs)], |
| | "epi_errs": [ |
| | data_dict["epi_errs"][data_dict["m_bids"] == b].cpu().numpy() |
| | for b in range(bs) |
| | ], |
| | "R_errs": data_dict["R_errs"], |
| | "t_errs": data_dict["t_errs"], |
| | "inliers": data_dict["inliers"], |
| | } |
| | self.eval_stats.append({"metrics": metrics}) |
| | else: |
| | m_conf = 1 - data_dict["mconf"].cpu().numpy() |
| | self.draw_matches( |
| | kpts0, |
| | kpts1, |
| | img0, |
| | img1, |
| | m_conf, |
| | path=path_to_save_matches, |
| | conf_thr=0.4, |
| | ) |
| | if self.show_n_topics > 0: |
| | folder_topics = os.path.join( |
| | root_dir, "{}_viz_topics".format(self.name) |
| | ) |
| | if not os.path.exists(folder_topics): |
| | os.makedirs(folder_topics) |
| | draw_topics( |
| | data_dict, |
| | img0, |
| | img1, |
| | saved_folder=folder_topics, |
| | show_n_topics=self.show_n_topics, |
| | saved_name=saved_name, |
| | ) |
| |
|
| | def run_demo( |
| | self, dataloader, writer=None, output_dir=None, no_display=False, skip_frames=1 |
| | ): |
| | data_dict = next(dataloader) |
| |
|
| | frame_id = 0 |
| | last_image_id = 0 |
| | img0 = ( |
| | np.array(cv2.imread(str(data_dict["img_path"][0])), dtype=np.float32) / 255 |
| | ) |
| | frame_tensor = data_dict["img"].to(self.device) |
| | pair_data = {"image0": frame_tensor} |
| | last_frame = cv2.resize( |
| | img0, (frame_tensor.shape[-1], frame_tensor.shape[-2]), cv2.INTER_LINEAR |
| | ) |
| |
|
| | if output_dir is not None: |
| | print("==> Will write outputs to {}".format(output_dir)) |
| | Path(output_dir).mkdir(exist_ok=True) |
| |
|
| | |
| | if not no_display: |
| | window_name = "Topic-assisted Feature Matching" |
| | cv2.namedWindow(window_name, cv2.WINDOW_NORMAL) |
| | cv2.resizeWindow(window_name, (640 * 2, 480 * 2)) |
| | else: |
| | print("Skipping visualization, will not show a GUI.") |
| |
|
| | |
| | print( |
| | "==> Keyboard control:\n" |
| | "\tn: select the current frame as the reference image (left)\n" |
| | "\tq: quit" |
| | ) |
| |
|
| | |
| |
|
| | while True: |
| | frame_id += 1 |
| | if frame_id == len(dataloader): |
| | print("Finished demo_loftr.py") |
| | break |
| | data_dict = next(dataloader) |
| | if frame_id % skip_frames != 0: |
| | |
| | continue |
| |
|
| | stem0, stem1 = last_image_id, data_dict["id"][0].item() - 1 |
| | frame = ( |
| | np.array(cv2.imread(str(data_dict["img_path"][0])), dtype=np.float32) |
| | / 255 |
| | ) |
| |
|
| | frame_tensor = data_dict["img"].to(self.device) |
| | frame = cv2.resize( |
| | frame, |
| | (frame_tensor.shape[-1], frame_tensor.shape[-2]), |
| | interpolation=cv2.INTER_LINEAR, |
| | ) |
| | pair_data = {**pair_data, "image1": frame_tensor} |
| | self.model(pair_data) |
| |
|
| | total_n_matches = len(pair_data["mkpts0_f"]) |
| | mkpts0 = pair_data["mkpts0_f"].cpu().numpy() |
| | mkpts1 = pair_data["mkpts1_f"].cpu().numpy() |
| | mconf = pair_data["mconf"].cpu().numpy() |
| |
|
| | |
| | if len(mconf) > 0: |
| | mconf = 1 - mconf |
| |
|
| | |
| | |
| | color = error_colormap(mconf, thr=0.4, alpha=0.1) |
| |
|
| | text = [ |
| | f"Topics", |
| | "#Matches: {}".format(total_n_matches), |
| | ] |
| |
|
| | out = draw_topicfm_demo( |
| | pair_data, |
| | last_frame, |
| | frame, |
| | mkpts0, |
| | mkpts1, |
| | color, |
| | text, |
| | show_n_topics=4, |
| | path=None, |
| | ) |
| |
|
| | if not no_display: |
| | if writer is not None: |
| | writer.write(out) |
| | cv2.imshow("TopicFM Matches", out) |
| | key = chr(cv2.waitKey(10) & 0xFF) |
| | if key == "q": |
| | if writer is not None: |
| | writer.release() |
| | print("Exiting...") |
| | break |
| | elif key == "n": |
| | pair_data["image0"] = frame_tensor |
| | last_frame = frame |
| | last_image_id = data_dict["id"][0].item() - 1 |
| | frame_id_left = frame_id |
| |
|
| | elif output_dir is not None: |
| | stem = "matches_{:06}_{:06}".format(stem0, stem1) |
| | out_file = str(Path(output_dir, stem + ".png")) |
| | print("\nWriting image to {}".format(out_file)) |
| | cv2.imwrite(out_file, out) |
| | else: |
| | raise ValueError("output_dir is required when no display is given.") |
| |
|
| | cv2.destroyAllWindows() |
| | if writer is not None: |
| | writer.release() |
| |
|