EarthLoc2
/
image-matching-models
/matching
/third_party
/accelerated_features
/modules
/eval
/megadepth1500.py
| """ | |
| "XFeat: Accelerated Features for Lightweight Image Matching, CVPR 2024." | |
| https://www.verlab.dcc.ufmg.br/descriptors/xfeat_cvpr24/ | |
| Camera pose metrics adapted from LoFTR https://github.com/zju3dv/LoFTR/blob/master/src/utils/metrics.py | |
The main difference is the use of poselib instead of OpenCV's vanilla RANSAC for E_mat, which is more stable and MUCH faster.
| """ | |
| import argparse, glob, sys, os, time | |
| import torch | |
| from torch.utils.data import Dataset, DataLoader | |
| import cv2 | |
| import numpy as np | |
| import poselib | |
| import json | |
| import copy | |
| import tqdm | |
| # Disable scientific notation | |
| np.set_printoptions(suppress=True) | |
class MegaDepth1500(Dataset):
    """
    Streamlined MegaDepth-1500 dataloader. The camera poses & metadata are stored in a formatted json for facilitating
    the download of the dataset and to keep the setup as simple as possible.
    """
    def __init__(self, json_file, root_dir):
        """
        Args:
            json_file: path to the JSON file holding pair names, sizes, poses & intrinsics.
            root_dir: root directory containing the MegaDepth test images.
        """
        # Load the info & calibration from the JSON
        with open(json_file, 'r') as f:
            self.data = json.load(f)

        self.root_dir = root_dir

        if not os.path.exists(self.root_dir):
            raise RuntimeError(
                f"Dataset {self.root_dir} does not exist! \n \
                  > If you didn't download the dataset, use the downloader tool: python3 -m modules.dataset.download -h")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        data = copy.deepcopy(self.data[idx])

        h1, w1 = data['size0_hw']
        h2, w2 = data['size1_hw']

        # Here we resize the images to max_dim = 1200, as described in the paper, and adjust the image such that it is divisible by 32
        # following the protocol of the LoFTR's Dataloader (intrinsics are corrected accordingly).
        # For adapting this with different resolution, you would need to re-scale intrinsics below.
        path0 = f"{self.root_dir}/{data['pair_names'][0]}"
        path1 = f"{self.root_dir}/{data['pair_names'][1]}"
        img0 = cv2.imread(path0)
        img1 = cv2.imread(path1)
        # Bug fix: cv2.imread silently returns None on a missing or unreadable file,
        # which previously crashed later inside cv2.resize with a cryptic error.
        if img0 is None:
            raise FileNotFoundError(f"Could not read image: {path0}")
        if img1 is None:
            raise FileNotFoundError(f"Could not read image: {path1}")

        # cv2.resize expects (width, height)
        image0 = cv2.resize(img0, (w1, h1))
        image1 = cv2.resize(img1, (w2, h2))

        data['image0'] = torch.tensor(image0.astype(np.float32)/255).permute(2,0,1)
        data['image1'] = torch.tensor(image1.astype(np.float32)/255).permute(2,0,1)

        # All non-metadata entries (poses, intrinsics, scales) become float32 tensors.
        for k, v in data.items():
            if k not in ('dataset_name', 'scene_id', 'pair_id', 'pair_names', 'size0_hw', 'size1_hw', 'image0', 'image1'):
                data[k] = torch.tensor(np.array(v, dtype=np.float32))

        return data
| ################################# Metrics ##################################### | |
def relative_pose_error(T_0to1, R, t, ignore_gt_t_thr=0.0):
    """
    Angular pose errors between an estimated pose (R, t) and the ground-truth
    relative transform T_0to1 (4x4). Returns (t_err, R_err) in degrees.
    """
    # Translation: angle between the estimated and GT translation directions.
    t_gt = T_0to1[:3, 3]
    denom = np.linalg.norm(t) * np.linalg.norm(t_gt)
    cos_t = np.clip(np.dot(t, t_gt) / denom, -1.0, 1.0)
    t_err = np.rad2deg(np.arccos(cos_t))
    t_err = np.minimum(t_err, 180 - t_err)  # handle E ambiguity (sign of t)

    # Pairs with (near) zero GT translation are pure rotation — optionally ignored.
    if np.linalg.norm(t_gt) < ignore_gt_t_thr:
        t_err = 0

    # Rotation: geodesic angle from the trace of R^T * R_gt.
    R_gt = T_0to1[:3, :3]
    cos_r = np.clip((np.trace(R.T @ R_gt) - 1) / 2, -1.0, 1.0)  # clip handles numerical errors
    R_err = np.rad2deg(np.abs(np.arccos(cos_r)))

    return t_err, R_err
def intrinsics_to_camera(K):
    """Convert a 3x3 pinhole intrinsics matrix into a poselib camera dict."""
    fx, fy = K[0, 0], K[1, 1]
    cx, cy = K[0, 2], K[1, 2]
    return {
        "model": "PINHOLE",
        # Image size approximated as twice the principal point.
        "width": int(2 * cx),
        "height": int(2 * cy),
        "params": [fx, fy, cx, cy],
    }
def estimate_pose_poselib(kpts0, kpts1, K0, K1, thresh, conf=0.99999):
    """
    Estimate the relative pose between two views with poselib's robust
    essential-matrix estimator. Returns ((R, t, inlier_mask), (kpts0, kpts1)).
    """
    ransac_opts = {
        "max_epipolar_error": thresh,
        "success_prob": conf,
        "min_iterations": 20,
        "max_iterations": 1_000,
    }
    pose, info = poselib.estimate_relative_pose(
        kpts0, kpts1,
        intrinsics_to_camera(K0),
        intrinsics_to_camera(K1),
        ransac_opts,
    )
    inlier_mask = np.array(info["inliers"])
    return (pose.R, pose.t, inlier_mask), (kpts0, kpts1)
def tensor2bgr(t):
    """Convert a (1,C,H,W) float tensor in [0,1] to an (H,W,C) uint8 image array."""
    img = t.cpu()[0].permute(1, 2, 0).numpy()
    return (img * 255).astype(np.uint8)
def compute_pose_error(pair):
    """
    Estimate the relative pose from the 2D-2D matches in `pair` and update the
    dict in place with angular pose errors.

    Input:
        pair (dict): {
            "pts0": ndarray (N,2)
            "pts1": ndarray (N,2)
            "K0": tensor (1,3,3)
            "K1": tensor (1,3,3)
            "T_0to1": tensor (1,4,4)
            "ransac_thr": float, optional (defaults to 1.0 px)
        }
    Update:
        pair (dict): {
            "R_err": float (degrees)
            "t_err": float (degrees)
            "inliers": np.ndarray RANSAC inlier mask
        }
    """
    pixel_thr = 1.0 if 'ransac_thr' not in pair else pair['ransac_thr']
    conf = 0.99999
    # Defaults in case pose estimation fails.
    pair.update({'R_err': np.inf, 't_err': np.inf, 'inliers': []})

    pts0 = pair['pts0']
    pts1 = pair['pts1']
    K0 = pair['K0'].cpu().numpy()[0]
    K1 = pair['K1'].cpu().numpy()[0]
    T_0to1 = pair['T_0to1'].cpu().numpy()[0]

    ret, corrs = estimate_pose_poselib(pts0, pts1, K0, K1, pixel_thr, conf=conf)

    if ret is not None:
        R, t, inliers = ret
        t_err, R_err = relative_pose_error(T_0to1, R, t, ignore_gt_t_thr=0.0)
        pair['R_err'] = R_err
        pair['t_err'] = t_err
        # Bug fix: the inlier mask was unpacked but never stored, so pair['inliers']
        # always stayed [] despite the docstring promising the mask is updated.
        pair['inliers'] = inliers
def error_auc(errors, thresholds=[5, 10, 20]):
    """
    Compute the pose-error AUC at each threshold.

    Args:
        errors (list): [N,] per-pair angular errors (degrees)
        thresholds (list): cutoff angles in degrees
    Returns:
        dict mapping 'auc@<t>' to the normalized area under the recall curve.
    """
    # Prepend a zero so the recall curve starts at the origin.
    errs = [0] + sorted(errors)
    recall = list(np.linspace(0, 1, len(errs)))

    aucs = []
    for threshold in thresholds:
        cut = np.searchsorted(errs, threshold)
        # Extend the curve flat out to the threshold before integrating.
        ys = recall[:cut] + [recall[cut - 1]]
        xs = errs[:cut] + [threshold]
        aucs.append(np.trapz(ys, xs) / threshold)

    return {f'auc@{t}': auc for t, auc in zip(thresholds, aucs)}
def compute_maa(pairs, thresholds=[5, 10, 20]):
    """Print pose AUC and mean accuracy of max(R_err, t_err) over all pairs."""
    print("auc / mAcc on %d pairs" % (len(pairs)))
    # Each pair is scored by the worse of its rotation / translation error.
    errors = [max(p['t_err'], p['R_err']) for p in pairs]

    d_err_auc = error_auc(errors)
    for k, v in d_err_auc.items():
        print(k, ': ', '%.1f'%(v*100))

    errors = np.array(errors)
    for t in thresholds:
        acc = (errors <= t).sum() / len(errors)
        print("mAcc@%d: %.1f "%(t, acc*100))
def run_pose_benchmark(matcher_fn, loader, ransac_thr=2.5):
    """
    Run relative pose estimation benchmark using a specified matcher function and data loader.

    Parameters
    ----------
    matcher_fn : callable
        The matching function to be evaluated for pose estimation. It should accept two np.array RGB images (H,W,3)
        and return mkpts_0, mkpts_1 which are np.array(N,2) matching coordinates.
    loader : iterable
        Data loader that provides batches of data. Each batch should contain two images, along
        with their groundtruth camera poses.
    ransac_thr : float, optional, default=2.5
        The RANSAC threshold for considering a point as an inlier in pixels.
    """
    # Bug fix / cleanup: removed dead locals `d_error` and `cnt` from the original
    # loop — both were written but never read.
    pairs = []
    for d in tqdm.tqdm(loader):
        src_pts, dst_pts = matcher_fn(tensor2bgr(d['image0']), tensor2bgr(d['image1']))
        # delete images to avoid OOM, happens in low mem machines
        del d['image0']
        del d['image1']
        # rescale kpts back to the original image resolution
        src_pts = src_pts * d['scale0'].numpy()
        dst_pts = dst_pts * d['scale1'].numpy()
        d.update({"pts0": src_pts, "pts1": dst_pts, 'ransac_thr': ransac_thr})
        compute_pose_error(d)  # updates d in place with R_err / t_err / inliers
        pairs.append(d)

    compute_maa(pairs)
def parse_args():
    """Parse command-line options for the MegaDepth-1500 pose benchmark."""
    ap = argparse.ArgumentParser(description="Run pose benchmark with matcher")
    ap.add_argument(
        '--dataset-dir', type=str, required=True,
        help="Path to MegaDepth dataset root")
    ap.add_argument(
        '--matcher', type=str, choices=['xfeat', 'xfeat-star', 'alike'], default='xfeat',
        help="Matcher to use (xfeat or alike)")
    ap.add_argument(
        '--ransac-thr', type=float, default=2.5,
        help="RANSAC threshold value in pixels (default: 2.5)")
    return ap.parse_args()
if __name__ == '__main__':
    args = parse_args()

    dataset = MegaDepth1500(
        json_file='./assets/megadepth_1500.json',
        root_dir=args.dataset_dir + "/megadepth_test_1500",
    )
    loader = DataLoader(dataset, batch_size=1, shuffle=False)

    if args.matcher == 'xfeat':
        print("Running benchmark for XFeat..")
        from modules.xfeat import XFeat
        matcher = XFeat().match_xfeat
    elif args.matcher == 'xfeat-star':
        from modules.xfeat import XFeat
        print("Running benchmark for XFeat*..")
        matcher = XFeat(top_k = 10_000).match_xfeat_star
    elif args.matcher == 'alike':
        from third_party import alike_wrapper as alike
        print("Running benchmark for ALIKE..")
        matcher = alike.match_alike
    else:
        matcher = None  # unreachable: argparse `choices` rejects anything else

    if matcher is not None:
        run_pose_benchmark(matcher_fn=matcher, loader=loader, ransac_thr=args.ransac_thr)