| | import numpy as np |
| | import os |
| | import torch |
| | import cv2 |
| | import csv |
| | from metric import * |
| | import metric |
| | import argparse |
| | from tqdm import tqdm |
| | import json |
| |
|
| |
|
# Device used for all tensor-based metric computation.
# NOTE(review): hard-coded to CUDA — running on a CPU-only host will fail
# when tensors are moved to this device.
device = 'cuda'

# Names of metric functions resolved via getattr() on the `metric` module.
# Order here defines the order of values in each per-sample result list.
eval_metrics = [
    "abs_relative_difference",
    "rmse_linear",
    "delta1_acc",
]
| |
|
| |
|
def depth2disparity(depth, return_mask=False):
    """Convert a depth map to disparity via element-wise 1/depth.

    Entries where depth <= 0 are left as 0 in the output (they cannot be
    inverted), and are marked False in the validity mask.

    Args:
        depth: ``torch.Tensor`` or ``np.ndarray`` of depth values.
        return_mask: if True, also return the boolean mask of entries
            where ``depth > 0`` (i.e. where the inversion was applied).

    Returns:
        disparity array/tensor of the same type and shape as ``depth``,
        optionally followed by the validity mask.

    Raises:
        TypeError: if ``depth`` is neither a torch Tensor nor a numpy array.
            (The original code left ``disparity`` unbound in that case and
            crashed later with a confusing NameError.)
    """
    if isinstance(depth, torch.Tensor):
        disparity = torch.zeros_like(depth)
    elif isinstance(depth, np.ndarray):
        disparity = np.zeros_like(depth)
    else:
        raise TypeError(
            f"depth must be a torch.Tensor or np.ndarray, got {type(depth).__name__}"
        )
    # Fixed typo: non_negtive_mask -> non_negative_mask.
    non_negative_mask = depth > 0
    disparity[non_negative_mask] = 1.0 / depth[non_negative_mask]
    if return_mask:
        return disparity, non_negative_mask
    return disparity
| |
|
| |
|
def resize_images(images, new_size):
    """Resize a batch of images to ``new_size`` (height, width).

    Args:
        images: array of shape (N, H, W, C). All frames share the same
            channel count.
        new_size: (target_height, target_width) tuple.

    Returns:
        float array of shape (N, target_height, target_width, C).

    Note: cv2.resize drops a trailing singleton channel (HxWx1 -> HxW),
    so single-channel frames get their channel axis restored explicitly.
    """
    batch, _, _, channels = images.shape
    target_h, target_w = new_size
    out = np.empty((batch, target_h, target_w, channels))

    for idx in range(batch):
        # cv2.resize takes (width, height) order.
        resized = cv2.resize(images[idx], (target_w, target_h))
        out[idx] = resized[..., None] if channels == 1 else resized

    return out
| | |
| |
|
def eval_single(
    pred_disp_path,
    gt_disp_path,
    seq_len=98,
    domain='depth',
    method_type="ours",
    dataset_max_depth=70,
):
    """Evaluate one predicted sequence against its ground truth.

    Loads the GT and predicted maps, least-squares-aligns the prediction
    (scale + shift) to the GT in the chosen ``domain``, then computes each
    metric named in ``eval_metrics`` over the valid frames.

    Args:
        pred_disp_path: path to the prediction .npz (key depends on
            ``method_type``: 'depth' for "ours", 'disparity' for
            "depth_anything").
        gt_disp_path: path to the GT .npz ('disparity' key, falling back
            to 'arr_0').
        seq_len: max number of frames to evaluate (clamped to the
            prediction length).
        domain: 'disp' to align in the GT's native domain, 'depth' to
            align against 1/GT.
        method_type: "ours" or "depth_anything".
        dataset_max_depth: upper clip/validity bound. BUGFIX: the previous
            default was the *string* "70", which raised a TypeError in the
            ``gt_disp < dataset_max_depth`` comparison whenever the caller
            relied on the default. Now an int.

    Returns:
        list of metric values, in ``eval_metrics`` order.

    Raises:
        ValueError: on an unrecognized ``method_type`` or ``domain``
            (previously these fell through and crashed later with
            NameError on an unbound variable).
    """
    # Load the GT archive once (was loaded up to three times before).
    gt_npz = np.load(gt_disp_path)
    gt_disp = gt_npz['disparity'] if 'disparity' in gt_npz.files else gt_npz['arr_0']

    if method_type == "ours":
        pred_disp = np.load(pred_disp_path)['depth']
    elif method_type == "depth_anything":
        pred_disp = np.load(pred_disp_path)['disparity']
    else:
        raise ValueError(f"Invalid method_type: {method_type!r}")

    # Never evaluate more frames than were predicted.
    if pred_disp.shape[0] < seq_len:
        seq_len = pred_disp.shape[0]

    # Resize predictions to the GT spatial resolution, then truncate both
    # sequences to seq_len. GT is (T, 1, H, W) -> take channel 0.
    pred_disp = resize_images(pred_disp[..., None], (gt_disp.shape[-2], gt_disp.shape[-1]))
    pred_disp = pred_disp[..., 0]
    pred_disp = pred_disp[:seq_len]
    gt_disp = gt_disp[:seq_len, 0]

    # Valid GT pixels: strictly positive and below the dataset clip bound.
    valid_mask = np.logical_and(
        (gt_disp > 1e-3),
        (gt_disp < dataset_max_depth),
    )
    pred_disp = np.clip(pred_disp, a_min=1e-3, a_max=None)
    pred_disp_masked = pred_disp[valid_mask].reshape((-1, 1))

    # Alignment target, depending on the chosen domain.
    # Fixed typo: gt_disp_maksed -> gt_masked.
    if domain == 'disp':
        gt_masked = gt_disp[valid_mask].reshape((-1, 1)).astype(np.float64)
    elif domain == 'depth':
        gt_masked = 1. / (gt_disp[valid_mask].reshape((-1, 1)).astype(np.float64) + 1e-8)
    else:
        raise ValueError(f"Invalid domain: {domain!r}")

    # Least-squares scale/shift alignment: solve [pred, 1] @ [s, t]^T = gt.
    _ones = np.ones_like(pred_disp_masked)
    A = np.concatenate([pred_disp_masked, _ones], axis=-1)
    X = np.linalg.lstsq(A, gt_masked, rcond=None)[0]
    scale, shift = X

    aligned_pred = scale * pred_disp + shift
    aligned_pred = np.clip(aligned_pred, a_min=1e-3, a_max=None)

    # Map the aligned prediction back into the GT's domain before scoring.
    if domain == 'disp':
        pred_depth = aligned_pred
    else:  # domain == 'depth' (anything else raised above)
        # Invert (1/x) to bring the depth-domain alignment back to the
        # GT's native values.
        pred_depth = depth2disparity(aligned_pred)
    gt_depth = gt_disp

    pred_depth = np.clip(
        pred_depth, a_min=1e-3, a_max=dataset_max_depth
    )

    # Resolve metric callables once per call.
    metric_funcs = [getattr(metric, _met) for _met in eval_metrics]

    pred_depth_ts = torch.from_numpy(pred_depth).to(device)
    gt_depth_ts = torch.from_numpy(gt_depth).to(device)
    valid_mask_ts = torch.from_numpy(valid_mask).to(device)

    # Drop frames with no valid pixels so per-frame metrics don't see
    # empty masks.
    n = valid_mask.sum((-1, -2))
    valid_frame = (n > 0)
    pred_depth_ts = pred_depth_ts[valid_frame]
    gt_depth_ts = gt_depth_ts[valid_frame]
    valid_mask_ts = valid_mask_ts[valid_frame]

    sample_metric = []
    for met_func in metric_funcs:
        sample_metric.append(met_func(pred_depth_ts, gt_depth_ts, valid_mask_ts).item())

    return sample_metric
| |
|
| |
|
| |
|
if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--seq_len",
        type=int,
        default=50,
        help="Max video frame length for evaluation."
    )

    parser.add_argument(
        "--domain",
        type=str,
        default="depth",
        choices=["depth", "disp"],
        help="Domain of metric calculation."
    )

    parser.add_argument(
        "--method_type",
        type=str,
        default="ours",
        choices=["ours", "depth_anything"],
        help="Choose the methods."
    )

    parser.add_argument(
        "--dataset_max_depth",
        type=int,
        default=70,
        help="Dataset max depth clip."
    )

    parser.add_argument(
        "--pred_disp_root",
        type=str,
        default="./demo_output",
        help="Predicted output directory."
    )

    parser.add_argument(
        "--gt_disp_root",
        type=str,
        required=True,
        help="GT depth directory."
    )

    parser.add_argument(
        "--dataset",
        type=str,
        required=True,
        help="Choose the datasets."
    )

    parser.add_argument(
        "--meta_path",
        type=str,
        required=True,
        help="Path of test dataset csv file."
    )

    args = parser.parse_args()

    SEQ_LEN = args.seq_len
    method_type = args.method_type
    # Both method types read predictions from the same layout (the former
    # if/else had two identical branches, collapsed here).
    pred_disp_root = os.path.join(args.pred_disp_root, f'results_{args.dataset}')
    domain = args.domain
    dataset_max_depth = args.dataset_max_depth
    saved_json_path = os.path.join(args.pred_disp_root, f"results_{args.dataset}.json")

    meta_path = args.meta_path

    # Redundant with argparse `choices`, kept as a defensive check.
    assert method_type in ["depth_anything", "ours"], "Invalid method type, must be in ['depth_anything', 'ours']"
    assert domain in ["depth", "disp"], "Invalid domain type, must be in ['depth', 'disp']"

    # Each CSV row describes one test sequence (needs a
    # 'filepath_disparity' column).
    with open(meta_path, mode="r", encoding="utf-8") as csvfile:
        csv_reader = csv.DictReader(csvfile)
        samples = list(csv_reader)

    results_all = []
    for i, sample in enumerate(tqdm(samples)):
        gt_disp_path = os.path.join(args.gt_disp_root, sample['filepath_disparity'])
        if method_type == "ours":
            pred_disp_path = os.path.join(pred_disp_root, sample['filepath_disparity'])
            pred_disp_path = pred_disp_path.replace("disparity", "rgb_left")

        if method_type == "depth_anything":
            pred_disp_path = os.path.join(pred_disp_root, sample['filepath_disparity'])
            pred_disp_path = pred_disp_path.replace("disparity", "rgb_left_depth")

        results_single = eval_single(
            pred_disp_path,
            gt_disp_path,
            seq_len=SEQ_LEN,
            domain=domain,
            method_type=method_type,
            dataset_max_depth=dataset_max_depth
        )

        results_all.append(results_single)

    # Aggregate: mean of each metric over all sequences.
    final_results = np.array(results_all)
    final_results_mean = np.mean(final_results, axis=0)
    print("")

    result_dict = {'name': method_type}
    # BUGFIX: the loop variable was previously named `metric`, shadowing
    # the imported `metric` module.
    for i, metric_name in enumerate(eval_metrics):
        # BUGFIX: cast np.float64 -> float, otherwise json.dump below
        # raises "Object of type float64 is not JSON serializable".
        result_dict[metric_name] = float(final_results_mean[i])
        print(f"{metric_name}: {final_results_mean[i]:04f}")

    # Also keep the raw per-sequence metric lists, keyed by GT file path.
    for i, results in enumerate(results_all):
        result_dict[samples[i]['filepath_disparity']] = results

    with open(saved_json_path, 'w') as f:
        json.dump(result_dict, f, indent=4)
    print("")
    print(f"Evaluation results json are saved to {saved_json_path}")
| | |
| |
|