|
|
|
|
|
|
|
|
import warnings |
|
|
import csv |
|
|
import math |
|
|
import numpy as np |
|
|
|
|
|
|
|
|
# Four times the machine epsilon of Python floats. quaternion_matrix() uses it
# as the cut-off below which a squared quaternion norm is treated as zero, and
# affine_matrix_from_points() adds it to the quaternion before normalizing.
_EPS = np.finfo(float).eps * 4.0
|
|
|
|
|
|
|
|
def read_csv(filename, header=True, print_header=False):
    '''Read camera poses from the csv file <filename> into a nested dict.

    The first row is taken as the header when <header> is True; it maps the
    column names to their positions and must contain 'dataset', 'scene',
    'image', 'rotation_matrix' and 'translation_vector'. The last two columns
    hold 9 and 3 numbers respectively, separated by ';'.

    <print_header> - print the header row while skipping it.

    Returns data[dataset][scene][image] = {'R': 3x3 array, 't': 3-vector,
    'c': -R.T @ t}.

    Blank rows are skipped, as tth_from_csv does. Note that when <header> is
    False no column name is ever registered, so the column lookups raise
    KeyError on the first non-blank row.'''

    data = {}
    label_idx = {}

    with open(filename, newline='\n') as csvfile:
        csv_lines = csv.reader(csvfile, delimiter=',')
        for row in csv_lines:
            if header:
                # only the very first row is consumed as header
                header = False
                label_idx = {name: i for i, name in enumerate(row)}
                if print_header:
                    print(f'Skipping header for file {filename}: {row}')
                continue

            if not row:
                # csv.reader yields [] for a blank line; indexing it would
                # raise IndexError, so skip it
                continue

            dataset = row[label_idx['dataset']]
            scene = row[label_idx['scene']]
            image = row[label_idx['image']]
            R = np.array([float(x) for x in row[label_idx['rotation_matrix']].split(';')]).reshape(3, 3)
            t = np.array([float(x) for x in row[label_idx['translation_vector']].split(';')]).reshape(3)
            c = -R.T @ t

            # create the dataset / scene levels on first use
            data.setdefault(dataset, {}).setdefault(scene, {})[image] = {'R': R, 't': t, 'c': c}

    return data
|
|
|
|
|
|
|
|
def quaternion_matrix(quaternion):
    '''Build the 4x4 homogeneous rotation matrix of <quaternion>.

    The quaternion has four components with the scalar part first. It does not
    need to be normalized; when its squared norm is below _EPS the identity
    matrix is returned.'''

    quat = np.array(quaternion, dtype=np.float64, copy=True)
    sq_norm = np.dot(quat, quat)
    if sq_norm < _EPS:
        # degenerate (near-zero) quaternion: no rotation
        return np.identity(4)

    # scaling by sqrt(2 / |q|^2) makes every outer-product entry equal to
    # twice the product of the normalized components
    quat *= math.sqrt(2.0 / sq_norm)
    prod = np.outer(quat, quat)

    matrix = np.identity(4)
    matrix[0, :3] = (
        1.0 - prod[2, 2] - prod[3, 3],
        prod[1, 2] - prod[3, 0],
        prod[1, 3] + prod[2, 0],
    )
    matrix[1, :3] = (
        prod[1, 2] + prod[3, 0],
        1.0 - prod[1, 1] - prod[3, 3],
        prod[2, 3] - prod[1, 0],
    )
    matrix[2, :3] = (
        prod[1, 3] - prod[2, 0],
        prod[2, 3] + prod[1, 0],
        1.0 - prod[1, 1] - prod[2, 2],
    )
    return matrix
|
|
|
|
|
|
|
|
def mAA_on_cameras(err, thresholds, n, skip_top_thresholds, to_dec=3):
    '''Mean of the per-threshold accuracies mAA_i over the thresholds in
    <thresholds>, leaving out the first <skip_top_thresholds> of them, with

    mAA_i = max(0, sum(err_i < th_i) - <to_dec>) / (n - <to_dec>)

    <err> holds one row per camera and one column per threshold, <n> is the
    number of ground-truth cameras. Returns 0 when no threshold has more than
    <to_dec> cameras below it.'''

    kept_ths = thresholds[skip_top_thresholds:]
    below = err[:, skip_top_thresholds:] < np.asarray(kept_ths)[np.newaxis, ...]

    # cameras under each threshold, discounted by <to_dec> and clipped at zero
    surplus = np.maximum(np.sum(below, axis=0) - to_dec, 0)
    numerator = np.sum(surplus)

    if numerator == 0:
        return 0
    return numerator / (len(kept_ths) * (n - to_dec))
|
|
|
|
|
|
|
|
def mAA_on_cameras_per_th(err, thresholds, n, to_dec=3):
    '''Per-threshold variant of mAA_on_cameras: returns one value
    max(0, sum(err_i < th_i) - <to_dec>) / (n - <to_dec>) for every threshold
    instead of their mean. Meant for score_all_ext with per_th=True.'''

    ths_row = np.asarray(thresholds)[np.newaxis, ...]
    surplus = np.sum(err < ths_row, axis=0) - to_dec
    return np.maximum(surplus, 0) / (n - to_dec)
|
|
|
|
|
|
|
|
def check_data(gt_data, user_data, print_error=False):
    '''Validate the ground-truth and submission dicts, returning True only if
    every dataset of <gt_data> passes all of the checks below.

    <gt_data> - an image name must not appear in more than one scene of the
    same dataset
    <user_data> - each GT dataset must be present and its images, taken over
    all of its scenes, must be exactly the GT images of that dataset, each
    listed once
    <print_error> - emit a warning describing the first failure
    *ATTENTION: must be disabled when called from score_all_ext to avoid
    possible data leaks!*'''

    for dataset, gt_scenes in gt_data.items():
        # GT image names of this dataset that the submission still has to list
        pending = set()
        for gt_images in gt_scenes.values():
            for image in gt_images:
                if image in pending:
                    if print_error:
                        warnings.warn(f'image {image} found duplicated in the GT dataset {dataset}')
                    return False
                pending.add(image)

        if dataset not in user_data:
            if print_error:
                warnings.warn(f'dataset {dataset} not found in submission')
            return False

        for user_images in user_data[dataset].values():
            for image in user_images:
                if image not in pending:
                    # unknown to the GT, or already consumed by an earlier entry
                    if print_error:
                        warnings.warn(f'image {image} does not belong to the GT dataset {dataset}')
                    return False
                pending.remove(image)

        if pending:
            if print_error:
                warnings.warn(f'submission dataset {dataset} missing some GT images')
            return False

    return True
|
|
|
|
|
|
|
|
def register_by_Horn(ev_coord, gt_coord, ransac_threshold, inl_cf, strict_cf):
    '''Return the best similarity transforms T that registers 3D points pt_ev in <ev_coord> to
    the corresponding ones pt_gt in <gt_coord> according to a RANSAC-like approach for each
    threshold value th in <ransac_threshold>.

    Given th, each triplet of 3D correspondences is examined if not already present as strict inlier,
    a correspondence is a strict inlier if err_best < <strict_cf> * th, where err_best is its registration
    error for the best model so far.
    The minimal model given by the triplet is then refined using also its inliers if their total is greater
    than <inl_cf> * ninl_best, where ninl_best is the number of inliers for the best model so far. Inliers
    are 3D correspondences (pt_ev, pt_gt) for which the Euclidean distance |pt_gt-T*pt_ev| is less than th.

    <ev_coord>, <gt_coord> - arrays with one column per correspondence; columns of <ev_coord>
    containing non-finite values are dropped from both arrays
    <ransac_threshold> - 1D array of r thresholds

    Returns a dict with, n being the number of kept correspondences:
    "valid_cams" - boolean mask over the input columns marking the kept ones
    "no_inl" - (1, r) inlier count of the best model per threshold
    "err" - (n, r) Euclidean registration errors under the best model per threshold (inf if none was found)
    "triplets_used" - (3, r) indexes of the triplet that led to the best model per threshold
    "transf_matrix" - (r, 4, 4) best transformation per threshold'''

    # keep only the correspondences whose evaluated coordinates are all finite
    idx_cams = np.all(np.isfinite(ev_coord), axis=0)
    ev_coord = ev_coord[:, idx_cams]
    gt_coord = gt_coord[:, idx_cams]

    # initialization: thresholds become a (1, r) row so they broadcast against per-camera columns
    n = ev_coord.shape[1]
    r = ransac_threshold.shape[0]
    ransac_threshold = np.expand_dims(ransac_threshold, axis=0)
    ransac_threshold2 = ransac_threshold**2
    # homogeneous coordinates, so the first three rows of a 4x4 transform can be applied directly
    ev_coord_1 = np.vstack((ev_coord, np.ones(n)))

    # per-threshold state of the best model found so far
    max_no_inl = np.zeros((1, r))
    best_inl_err = np.full(r, np.inf)
    best_transf_matrix = np.zeros((r, 4, 4))
    best_err = np.full((n, r), np.inf)
    strict_inl = np.full((n, r), False)
    triplets_used = np.zeros((3, r))

    # exhaustive scan of all the triplets ii < jj < kk
    for ii in range(n-2):
        for jj in range(ii+1, n-1):
            for kk in range(jj+1, n):
                i = [ii, jj, kk]
                # NOTE: triplets_used_now is filled in but not read anywhere else in this function
                triplets_used_now = np.full((n), False)
                triplets_used_now[i] = True

                # skip the triplet when its three points are strict inliers of the best model
                # for every threshold
                if np.all(strict_inl[i]):
                    continue

                # minimal model from the triplet alone (usesvd=False selects the quaternion-based solver)
                transf_matrix = affine_matrix_from_points(ev_coord[:, i], gt_coord[:, i], usesvd=False)

                # apply the transformation to all the evaluated points
                rotranslated = np.matmul(transf_matrix[:3], ev_coord_1)

                # squared errors, (n, r) inlier flags and inlier count per threshold
                err = np.sum((rotranslated - gt_coord)**2, axis=0)
                inl = np.expand_dims(err, axis=1) < ransac_threshold2
                no_inl = np.sum(inl, axis=0)

                # thresholds worth a refinement: more than 2 inliers and more than <inl_cf> times
                # the inliers of the current best model
                to_ref = np.squeeze(((no_inl > 2) & (no_inl > max_no_inl * inl_cf)), axis=0)
                for q in np.argwhere(to_ref):
                    qq = q[0]
                    if np.any(np.all((np.expand_dims(inl[:, qq], axis=1) == inl[:, :qq]), axis=0)):
                        # same inlier set as a lower-index threshold of this triplet:
                        # the refined model would be the same
                        continue

                    # refined model from all the inliers of threshold qq (default SVD-based solver)
                    transf_matrix = affine_matrix_from_points(ev_coord[:, inl[:, qq]], gt_coord[:, inl[:, qq]])

                    # apply the refined transformation to all the evaluated points
                    rotranslated = np.matmul(transf_matrix[:3], ev_coord_1)

                    # squared errors of the refined model; err_ref_sum adds them over all the points,
                    # not over the inliers only
                    err_ref = np.sum((rotranslated - gt_coord)**2, axis=0)
                    err_ref_sum = np.sum(err_ref, axis=0)
                    err_ref = np.expand_dims(err_ref, axis=1)
                    inl_ref = err_ref < ransac_threshold2
                    no_inl_ref = np.sum(inl_ref, axis=0)

                    # the refined model replaces the best one for every threshold where it has more
                    # inliers, or as many inliers and a lower total squared error
                    to_update = np.squeeze((no_inl_ref > max_no_inl) | ((no_inl_ref == max_no_inl) & (err_ref_sum < best_inl_err)), axis=0)
                    if np.any(to_update):
                        triplets_used[0, to_update] = ii
                        triplets_used[1, to_update] = jj
                        triplets_used[2, to_update] = kk
                        max_no_inl[:, to_update] = no_inl_ref[to_update]
                        # stored errors are Euclidean distances (square root of the squared errors)
                        best_err[:, to_update] = np.sqrt(err_ref)
                        best_inl_err[to_update] = err_ref_sum
                        strict_inl[:, to_update] = (best_err[:, to_update] < strict_cf * ransac_threshold[:, to_update])
                        best_transf_matrix[to_update] = transf_matrix

    best_model = {
        "valid_cams": idx_cams,
        "no_inl": max_no_inl,
        "err": best_err,
        "triplets_used": triplets_used,
        "transf_matrix": best_transf_matrix}
    return best_model
|
|
|
|
|
|
|
|
def _center_points(points):
    '''Shift <points> in place so that their centroid is the origin and return
    the homogeneous matrix performing that translation.'''
    dims = points.shape[0]
    offset = -np.mean(points, axis=1)
    translation = np.identity(dims + 1)
    translation[:dims, dims] = offset
    points += offset.reshape(dims, 1)
    return translation


def affine_matrix_from_points(v0, v1, shear=False, scale=True, usesvd=True):
    '''Return the homogeneous matrix registering point set <v0> onto <v1>.

    <v0> and <v1> are (ndims, npoints) arrays of non-homogeneous coordinates
    with the same shape, ndims >= 2 and npoints >= ndims; ValueError is raised
    otherwise. The inputs are copied, not modified.

    <shear> True - a general affine transformation is estimated through an
    SVD of the stacked, centred point sets; <scale> and <usesvd> are ignored.
    <shear> False - a similarity transformation is returned, or a
    rigid/Euclidean one when <scale> is also False. The rotation comes from
    an SVD of the correlation matrix (Kabsch) when <usesvd> is True or
    ndims != 3, otherwise from the quaternion-based method by Horn, which is
    the slower of the two in this implementation.

    The returned (ndims + 1, ndims + 1) matrix performs rotation, translation
    and, if requested, uniform scaling.'''

    src = np.array(v0, dtype=np.float64, copy=True)
    dst = np.array(v1, dtype=np.float64, copy=True)

    ndims = src.shape[0]
    if ndims < 2 or src.shape[1] < ndims or src.shape != dst.shape:
        raise ValueError("input arrays are of wrong shape or type")

    # move both centroids to the origin, remembering the translations
    to_origin_src = _center_points(src)
    to_origin_dst = _center_points(dst)

    if shear:
        # general affine map from the leading right singular vectors
        stacked = np.concatenate((src, dst), axis=0)
        _, _, vh = np.linalg.svd(stacked.T)
        basis = vh[:ndims].T
        upper = basis[:ndims]
        lower = basis[ndims: 2 * ndims]
        linear = np.dot(lower, np.linalg.pinv(upper))
        linear = np.concatenate((linear, np.zeros((ndims, 1))), axis=1)
        M = np.vstack((linear, ((0.0,) * ndims) + (1.0,)))
    elif usesvd or ndims != 3:
        # rotation from the SVD of the correlation matrix
        u, _, vh = np.linalg.svd(np.dot(dst, src.T))
        rotation = np.dot(u, vh)
        if np.linalg.det(rotation) < 0.0:
            # a reflection came out: flip the last singular direction
            rotation -= np.outer(u[:, ndims - 1], vh[ndims - 1, :] * 2.0)
        M = np.identity(ndims + 1)
        M[:ndims, :ndims] = rotation
    else:
        # rotation as the unit quaternion given by the eigenvector of N with
        # the largest eigenvalue; only the lower triangle of N is filled in
        xx, yy, zz = np.sum(src * dst, axis=1)
        xy, yz, zx = np.sum(src * np.roll(dst, -1, axis=0), axis=1)
        xz, yx, zy = np.sum(src * np.roll(dst, -2, axis=0), axis=1)
        N = [
            [xx + yy + zz, 0.0, 0.0, 0.0],
            [yz - zy, xx - yy - zz, 0.0, 0.0],
            [zx - xz, xy + yx, yy - xx - zz, 0.0],
            [xy - yx, zx + xz, yz + zy, zz - xx - yy],
        ]
        eigvals, eigvecs = np.linalg.eigh(N)
        quat = eigvecs[:, np.argmax(eigvals)]
        quat /= np.linalg.norm(quat + _EPS)
        M = quaternion_matrix(quat)

    if scale and not shear:
        # uniform scale: square root of the ratio of the summed squared
        # coordinates of the centred point sets
        ratio = np.sum(dst * dst) / np.sum(src * src)
        M[:ndims, :ndims] *= math.sqrt(ratio)

    # undo the centring: source to origin, transform, origin to target centroid
    M = np.dot(np.linalg.inv(to_origin_dst), np.dot(M, to_origin_src))
    M /= M[ndims, ndims]

    return M
|
|
|
|
|
|
|
|
def tth_from_csv(csv_file):
    '''Load the per-scene thresholds stored in the csv file <csv_file>.

    The first row is the header and must name the 'dataset', 'scene' and
    'thresholds' columns; the latter holds numbers separated by ';'. Blank
    rows are skipped.

    Returns (tth, count) with tth[dataset][scene] the threshold array and
    count the number of thresholds per scene. ValueError is raised unless
    all the scenes share one and the same count.'''

    per_scene = {}
    counts = []
    columns = None
    with open(csv_file, newline='\n') as handle:
        for fields in csv.reader(handle, delimiter=','):
            if columns is None:
                # first row: remember where each column lives
                columns = {name: pos for pos, name in enumerate(fields)}
                continue
            if not fields:
                continue

            dataset = fields[columns['dataset']]
            scene = fields[columns['scene']]
            values = np.array([float(item) for item in fields[columns['thresholds']].split(';')])
            counts.append(len(values))
            per_scene.setdefault(dataset, {})[scene] = values

    if len(set(counts)) != 1:
        raise ValueError(f'Number of thresholds vary per scene: {list(set(counts))}')

    return per_scene, counts[0]
|
|
|
|
|
|
|
|
def generate_mask_all_public(gt_data):
    '''Return a mask with the same dataset / scene / image nesting as
    <gt_data> in which every image is flagged True.'''
    return {
        dataset: {
            scene: dict.fromkeys(gt_data[dataset][scene], True)
            for scene in gt_data[dataset]
        }
        for dataset in gt_data
    }
|
|
|
|
|
|
|
|
def fuse_score(mAA_score, cluster_score, combo_mode):
    '''Combine <mAA_score> and <cluster_score> into a single score.

    <combo_mode> selects how:
    'harmonic' - harmonic mean, 0 when the two scores sum to 0
    'geometric' - geometric mean
    'arithmetic' - arithmetic mean
    'mAA' - <mAA_score> alone
    'clusterness' - <cluster_score> alone

    Raises ValueError for any other <combo_mode>.'''

    if combo_mode == 'harmonic':
        total = mAA_score + cluster_score
        if total == 0:
            # avoid the 0 / 0 of the harmonic mean formula
            return 0
        return 2 * mAA_score * cluster_score / total
    if combo_mode == 'geometric':
        return (mAA_score * cluster_score) ** 0.5
    if combo_mode == 'arithmetic':
        return (mAA_score + cluster_score) * 0.5
    if combo_mode == 'mAA':
        return mAA_score
    if combo_mode == 'clusterness':
        return cluster_score

    # an unknown mode used to fall through to an unbound local variable
    raise ValueError(f'unknown combo_mode: {combo_mode!r}')
|
|
|
|
|
|
|
|
def get_clusterness_score(best_cluster, best_user_scene_sum):
    '''Ratio between the sum of <best_cluster> and the sum of
    <best_user_scene_sum>; 0 when the latter sum is 0.'''
    denominator = np.sum(best_user_scene_sum)
    if denominator == 0:
        return 0
    return np.sum(best_cluster) / denominator
|
|
|
|
|
|
|
|
def get_mAA_score(best_gt_scene_sum, best_gt_scene, thresholds, dataset, best_model, best_err, skip_top_thresholds, to_dec, lt):
    '''mAA of <dataset> accumulated over the scenes listed in <best_gt_scene>.

    For every scene with a non-empty entry in <best_model>, the cameras whose
    error in <best_err> is below each threshold (the first
    <skip_top_thresholds> thresholds excluded) are counted, <to_dec> is
    subtracted from each count and negative results are clipped to 0. The
    total is divided by <lt> * (sum(<best_gt_scene_sum>) - number of scenes *
    <to_dec>), where <lt> is the number of thresholds in use; 0 is returned
    when that denominator is not positive.'''

    total_cams = np.sum(best_gt_scene_sum)

    hits = 0
    for idx, scene in enumerate(best_gt_scene):
        # looked up for every scene, including those without a model
        scene_ths = thresholds[dataset][scene]

        if len(best_model[idx]) >= 1:
            kept = np.asarray(scene_ths[skip_top_thresholds:])[np.newaxis, ...]
            below = best_err[idx][:, skip_top_thresholds:] < kept
            hits = hits + np.sum(np.maximum(np.sum(below, axis=0) - to_dec, 0))

    denominator = max(0, lt * (total_cams - len(best_gt_scene) * to_dec))
    return 0 if denominator == 0 else hits / denominator
|
|
|
|
|
|
|
|
def read_mask_csv(mask_filename='split_mask.csv'):
    '''IMC2025 read split labels.

    The first row of <mask_filename> is the header and must name the
    'dataset', 'scene', 'image' and 'mask' columns. Blank rows are skipped,
    as tth_from_csv does.

    Returns data[dataset][scene][image] = label, where label is True only
    when the 'mask' field is exactly the string 'True'.'''

    data = {}
    label_idx = {}
    with open(mask_filename, newline='\n') as csvfile:
        csv_lines = csv.reader(csvfile, delimiter=',')

        header = True
        for row in csv_lines:
            if header:
                header = False
                label_idx = {name: i for i, name in enumerate(row)}
                continue

            if not row:
                # csv.reader yields [] for a blank line; indexing it would
                # raise IndexError, so skip it
                continue

            dataset = row[label_idx['dataset']]
            scene = row[label_idx['scene']]
            image = row[label_idx['image']]
            label = row[label_idx['mask']] == 'True'

            # create the dataset / scene levels on first use
            data.setdefault(dataset, {}).setdefault(scene, {})[image] = label

    return data
|
|
|
|
|
|
|
|
def score(
    *,
    gt_csv,
    user_csv,
    thresholds_csv,
    mask_csv=None,
    combo_mode='harmonic',
    inl_cf=0,
    strict_cf=-1,
    skip_top_thresholds=2,
    to_dec=3,
    verbose=False,
):
    '''Compute the score of a submission against the ground truth.

    <gt_csv>/<user_csv> - gt/submission csv files, read by read_csv;
    <thresholds_csv> - csv file with the per-scene thresholds, read by tth_from_csv;
    <mask_csv> - optional csv file with the per-image split labels, read by read_mask_csv;
    when not given every GT image is flagged True;
    <combo_mode> - how to mix the mAA score and the clusterness score, passed to fuse_score;
    <inl_cf>, <strict_cf> - parameters passed to register_by_Horn;
    <skip_top_thresholds>, <to_dec> - parameters passed to the mAA computation;
    <verbose> - print the scores per dataset and their average.

    Three variants of every quantity are computed: on all the images, on the images
    whose mask is True ("_mask_a", printed as public split) and on those whose mask is
    False ("_mask_b", printed as private split).

    Returns ((final_score, final_score_mask_a, final_score_mask_b),
    (scene_score_dict, scene_score_dict_mask_a, scene_score_dict_mask_b)): the final
    scores are percentages averaged over the datasets, the dicts map each dataset to
    its percentage score; the two masked dicts are None when <mask_csv> is None.

    An AssertionError is raised when check_data rejects the input data.'''

    gt_data = read_csv(gt_csv)
    user_data = read_csv(user_csv)

    # NOTE: this validation is an assert statement, so it is not executed under python -O
    assert check_data(gt_data, user_data, print_error=True)

    # pct is the fraction of mask entries flagged True over all the datasets;
    # the division raises ZeroDivisionError if the mask has no image at all
    mask = read_mask_csv(mask_csv) if mask_csv else generate_mask_all_public(gt_data)
    one_mask = 0
    all_mask = 0
    for dataset in mask:
        for scene in mask[dataset]:
            one_mask = one_mask + sum([1 for image in mask[dataset][scene] if mask[dataset][scene][image]])
            all_mask = all_mask + len(mask[dataset][scene])
    pct = one_mask / all_mask

    # lt is the number of thresholds actually used
    thresholds, th_n = tth_from_csv(thresholds_csv)
    lt = th_n - skip_top_thresholds

    # per-dataset results on all the images
    stat_score = []
    stat_mAA = []
    stat_clusterness = []

    # per-dataset results on the images with mask True
    stat_score_mask_a = []
    stat_mAA_mask_a = []
    stat_clusterness_mask_a = []

    # per-dataset results on the images with mask False
    stat_score_mask_b = []
    stat_mAA_mask_b = []
    stat_clusterness_mask_b = []

    for dataset in gt_data.keys():
        gt_dataset = gt_data[dataset]
        user_dataset = user_data[dataset]

        # number of GT scenes (table rows) and of submitted scenes (table columns)
        lg = len(gt_dataset)
        lu = len(user_dataset)

        # tables over every (GT scene, user scene) pair; -1 marks the cells left unfilled,
        # i.e. the pairs involving an 'outliers' scene
        model_table = []
        err_table = []
        mAA_table = np.full((lg, lu), -1).astype(float)
        cluster_table = np.full((lg, lu), -1).astype(int)
        gt_scene_sum_table = np.full((lg, lu), -1).astype(np.float64)
        user_scene_sum_table = np.full((lg, lu), -1).astype(np.float64)

        # same tables restricted to the images with mask True
        err_table_mask_a = []
        mAA_table_mask_a = np.full((lg, lu), -1).astype(float)
        cluster_table_mask_a = np.full((lg, lu), -1).astype(int)
        gt_scene_sum_table_mask_a = np.full((lg, lu), -1).astype(np.float64)
        user_scene_sum_table_mask_a = np.full((lg, lu), -1).astype(np.float64)

        # same tables restricted to the images with mask False
        err_table_mask_b = []
        mAA_table_mask_b = np.full((lg, lu), -1).astype(float)
        cluster_table_mask_b = np.full((lg, lu), -1).astype(int)
        gt_scene_sum_table_mask_b = np.full((lg, lu), -1).astype(np.float64)
        user_scene_sum_table_mask_b = np.full((lg, lu), -1).astype(np.float64)

        # values of the user scene selected for each GT scene
        best_gt_scene = []
        best_user_scene = []
        best_model = []
        best_err = []
        best_mAA = np.zeros(lg)
        best_cluster = np.zeros(lg)
        best_gt_scene_sum = np.zeros(lg)
        best_user_scene_sum = np.zeros(lg)

        # as above, images with mask True
        best_err_mask_a = []
        best_mAA_mask_a = np.zeros(lg)
        best_cluster_mask_a = np.zeros(lg)
        best_gt_scene_sum_mask_a = np.zeros(lg)
        best_user_scene_sum_mask_a = np.zeros(lg)

        # as above, images with mask False
        best_err_mask_b = []
        best_mAA_mask_b = np.zeros(lg)
        best_cluster_mask_b = np.zeros(lg)
        best_gt_scene_sum_mask_b = np.zeros(lg)
        best_user_scene_sum_mask_b = np.zeros(lg)

        # fill the tables, one row per GT scene
        # NOTE: gt_scene_list is filled in but not read afterwards in this function
        gt_scene_list = []
        for i, gt_scene in enumerate(gt_dataset.keys()):
            gt_scene_list.append(gt_scene)

            model_row = []
            err_row = []
            err_row_mask_a = []
            err_row_mask_b = []

            # rebuilt for every GT scene; the list left by the last row is used in the selection below
            user_scene_list = []
            for j, user_scene in enumerate(user_dataset.keys()):
                user_scene_list.append(user_scene)

                # no registration for the 'outliers' scenes: empty placeholders keep the rows aligned
                if (gt_scene == 'outliers') or (user_scene == 'outliers'):
                    model_row.append([])
                    err_row.append([])
                    err_row_mask_a.append([])
                    err_row_mask_b.append([])
                    continue

                ths = thresholds[dataset][gt_scene]

                gt_cams = gt_data[dataset][gt_scene]
                user_cams = user_data[dataset][user_scene]

                # number of GT images in the scene: all, with mask True, with mask False
                m = len(gt_cams)
                m_mask_a = np.sum([mask[dataset][gt_scene][image] for image in mask[dataset][gt_scene].keys()])
                m_mask_b = np.sum([not mask[dataset][gt_scene][image] for image in mask[dataset][gt_scene].keys()])

                # images of the GT scene that are also in the user scene
                good_cams = []
                for image_path in gt_cams.keys():
                    if image_path in user_cams.keys():
                        good_cams.append(image_path)

                # mask flag of each common image
                good_cams_mask = []
                for image in good_cams:
                    good_cams_mask.append(mask[dataset][gt_scene][image])
                good_cams_mask_a = np.asarray(good_cams_mask)

                # negated mask flag of each common image
                good_cams_mask = []
                for image in good_cams:
                    good_cams_mask.append(not mask[dataset][gt_scene][image])
                good_cams_mask_b = np.asarray(good_cams_mask)

                # number of common images: all, with mask True, with mask False
                n = len(good_cams)
                n_mask_a = np.sum(good_cams_mask_a)
                n_mask_b = np.sum(good_cams_mask_b)

                # camera centers of the common images, one column per image
                u_cameras = np.zeros((3, n))
                g_cameras = np.zeros((3, n))

                ii = 0
                for k in good_cams:
                    u_cameras[:, ii] = user_cams[k]['c']
                    g_cameras[:, ii] = gt_cams[k]['c']
                    ii += 1

                # register the user camera centers onto the GT ones, for every threshold
                model = register_by_Horn(u_cameras, g_cameras, np.asarray(ths), inl_cf, strict_cf)

                # mAA on all the images, normalized by the number of GT images of the scene
                mAA = mAA_on_cameras(model["err"], ths, m, skip_top_thresholds, to_dec)

                # mAA on the mask True images: the error rows are those of the valid cameras whose
                # mask is True, and <to_dec> is rescaled by the fraction of mask True images
                if (len(model['valid_cams']) == 0) or (len(good_cams_mask_a) == 0): mAA_mask_a = np.float64(0.0)
                else: mAA_mask_a = mAA_on_cameras(model["err"][good_cams_mask_a[model['valid_cams']]], ths, m_mask_a, skip_top_thresholds, to_dec * pct)

                # same for the mask False images, with <to_dec> rescaled by the complementary fraction
                if (len(model['valid_cams']) == 0) or (len(good_cams_mask_b) == 0): mAA_mask_b = np.float64(0.0)
                else: mAA_mask_b = mAA_on_cameras(model["err"][good_cams_mask_b[model['valid_cams']]], ths, m_mask_b, skip_top_thresholds, to_dec * (1 - pct))

                len_user_scene = len(user_data[dataset][user_scene])

                # mask flag of every image of the dataset, regardless of its scene
                aux_masked = {}
                masked_dataset = mask[dataset]
                for scene in masked_dataset.keys():
                    for image in masked_dataset[scene]:
                        aux_masked[image] = masked_dataset[scene][image]

                # mask flags of the images of the user scene that are listed in the mask
                user_data_masked = []
                for image in user_data[dataset][user_scene]:
                    if (image in aux_masked): user_data_masked.append(aux_masked[image])

                # number of images in the user scene with mask True / False
                # NOTE(review): with an empty user_data_masked np.asarray gives a float array and
                # ~ raises TypeError - confirm a user scene always has an image listed in the mask
                len_user_scene_mask_a = np.sum(np.asarray(user_data_masked))
                len_user_scene_mask_b = np.sum(~np.asarray(user_data_masked))

                # store the results of this scene pair, all images
                err_row.append(model["err"])
                mAA_table[i, j] = mAA
                cluster_table[i, j] = n
                gt_scene_sum_table[i, j] = m
                user_scene_sum_table[i, j] = len_user_scene

                # error rows of the mask True / mask False images, empty when nothing is available
                if (len(model['valid_cams']) == 0) or (len(good_cams_mask_a) == 0): err_row_mask_a.append(np.zeros((0, th_n)))
                else: err_row_mask_a.append(model["err"][good_cams_mask_a[model['valid_cams']]])

                if (len(model['valid_cams']) == 0) or (len(good_cams_mask_b) == 0): err_row_mask_b.append(np.zeros((0, th_n)))
                else: err_row_mask_b.append(model["err"][good_cams_mask_b[model['valid_cams']]])

                # store the results of this scene pair, mask True images
                mAA_table_mask_a[i, j] = mAA_mask_a
                cluster_table_mask_a[i, j] = n_mask_a
                gt_scene_sum_table_mask_a[i, j] = m_mask_a
                user_scene_sum_table_mask_a[i, j] = len_user_scene_mask_a

                # store the results of this scene pair, mask False images
                mAA_table_mask_b[i, j] = mAA_mask_b
                cluster_table_mask_b[i, j] = n_mask_b
                gt_scene_sum_table_mask_b[i, j] = m_mask_b
                user_scene_sum_table_mask_b[i, j] = len_user_scene_mask_b

                model_row.append(model)

            model_table.append(model_row)
            err_table.append(err_row)
            err_table_mask_a.append(err_row_mask_a)
            err_table_mask_b.append(err_row_mask_b)

        # for each GT scene select the user scene sharing the most images with it, ties broken
        # by the highest mAA (np.lexsort uses its last key as the primary one); the selection is
        # made on the all-images tables and reused for both the masked variants
        for i, gt_scene in enumerate(gt_dataset.keys()):
            best_ind = np.lexsort((-mAA_table[i], -cluster_table[i]))[0]
            best_gt_scene.append(gt_scene)
            best_user_scene.append(user_scene_list[best_ind])
            best_model.append(model_table[i][best_ind])

            # all images
            best_err.append(err_table[i][best_ind])
            best_mAA[i] = mAA_table[i, best_ind]
            best_cluster[i] = cluster_table[i, best_ind]
            best_gt_scene_sum[i] = gt_scene_sum_table[i, best_ind]
            best_user_scene_sum[i] = user_scene_sum_table[i, best_ind]

            # mask True images
            best_err_mask_a.append(err_table_mask_a[i][best_ind])
            best_mAA_mask_a[i] = mAA_table_mask_a[i, best_ind]
            best_cluster_mask_a[i] = cluster_table_mask_a[i, best_ind]
            best_gt_scene_sum_mask_a[i] = gt_scene_sum_table_mask_a[i, best_ind]
            best_user_scene_sum_mask_a[i] = user_scene_sum_table_mask_a[i, best_ind]

            # mask False images
            best_err_mask_b.append(err_table_mask_b[i][best_ind])
            best_mAA_mask_b[i] = mAA_table_mask_b[i, best_ind]
            best_cluster_mask_b[i] = cluster_table_mask_b[i, best_ind]
            best_gt_scene_sum_mask_b[i] = gt_scene_sum_table_mask_b[i, best_ind]
            best_user_scene_sum_mask_b[i] = user_scene_sum_table_mask_b[i, best_ind]

        # the GT 'outliers' scene, if any, takes no part in the scores: find its position
        outlier_idx = -1
        for i, scene in enumerate(best_gt_scene):
            if scene == 'outliers':
                outlier_idx = i
                break

        # and drop its entry from all the best_* containers
        if outlier_idx > -1:
            best_gt_scene.pop(outlier_idx)
            best_user_scene.pop(outlier_idx)
            best_model.pop(outlier_idx)

            # all images
            best_err.pop(outlier_idx)
            best_mAA = np.delete(best_mAA, outlier_idx)
            best_cluster = np.delete(best_cluster, outlier_idx)
            best_gt_scene_sum = np.delete(best_gt_scene_sum, outlier_idx)
            best_user_scene_sum = np.delete(best_user_scene_sum, outlier_idx)

            # mask True images
            best_err_mask_a.pop(outlier_idx)
            best_mAA_mask_a = np.delete(best_mAA_mask_a, outlier_idx)
            best_cluster_mask_a = np.delete(best_cluster_mask_a, outlier_idx)
            best_gt_scene_sum_mask_a = np.delete(best_gt_scene_sum_mask_a, outlier_idx)
            best_user_scene_sum_mask_a = np.delete(best_user_scene_sum_mask_a, outlier_idx)

            # mask False images
            best_err_mask_b.pop(outlier_idx)
            best_mAA_mask_b = np.delete(best_mAA_mask_b, outlier_idx)
            best_cluster_mask_b = np.delete(best_cluster_mask_b, outlier_idx)
            best_gt_scene_sum_mask_b = np.delete(best_gt_scene_sum_mask_b, outlier_idx)
            best_user_scene_sum_mask_b = np.delete(best_user_scene_sum_mask_b, outlier_idx)

        # clusterness: common images of the selected pairs over the sizes of the selected user scenes
        cluster_score = get_clusterness_score(best_cluster, best_user_scene_sum)
        cluster_score_mask_a = get_clusterness_score(best_cluster_mask_a, best_user_scene_sum_mask_a)
        cluster_score_mask_b = get_clusterness_score(best_cluster_mask_b, best_user_scene_sum_mask_b)

        # mAA of the dataset, recomputed from the errors of the selected pairs
        mAA_score = get_mAA_score(best_gt_scene_sum, best_gt_scene, thresholds, dataset, best_model, best_err, skip_top_thresholds, to_dec, lt)
        mAA_score_mask_a = get_mAA_score(best_gt_scene_sum_mask_a, best_gt_scene, thresholds, dataset, best_model, best_err_mask_a, skip_top_thresholds, to_dec * pct, lt)
        mAA_score_mask_b = get_mAA_score(best_gt_scene_sum_mask_b, best_gt_scene, thresholds, dataset, best_model, best_err_mask_b, skip_top_thresholds, to_dec * (1 - pct), lt)

        # combine mAA and clusterness (this local name shadows the function name inside the body)
        score = fuse_score(mAA_score, cluster_score, combo_mode)
        score_mask_a = fuse_score(mAA_score_mask_a, cluster_score_mask_a, combo_mode)
        score_mask_b = fuse_score(mAA_score_mask_b, cluster_score_mask_b, combo_mode)

        if verbose:
            print(f'{dataset}: score={score * 100:.2f}% (mAA={mAA_score * 100:.2f}%, clusterness={cluster_score * 100:.2f}%)')
            if mask_csv:
                print(f'\tPublic split: score={score_mask_a * 100:.2f}% (mAA={mAA_score_mask_a * 100:.2f}%, clusterness={cluster_score_mask_a * 100:.2f}%)')
                print(f'\tPrivate split: score={score_mask_b * 100:.2f}% (mAA={mAA_score_mask_b * 100:.2f}%, clusterness={cluster_score_mask_b * 100:.2f}%)')

        # collect the dataset results, all images
        stat_mAA.append(mAA_score)
        stat_clusterness.append(cluster_score)
        stat_score.append(score)

        # mask True images
        stat_mAA_mask_a.append(mAA_score_mask_a)
        stat_clusterness_mask_a.append(cluster_score_mask_a)
        stat_score_mask_a.append(score_mask_a)

        # mask False images
        stat_mAA_mask_b.append(mAA_score_mask_b)
        stat_clusterness_mask_b.append(cluster_score_mask_b)
        stat_score_mask_b.append(score_mask_b)

    # averages over the datasets, as percentages: all images
    final_score = 100 * np.mean(stat_score)
    final_mAA = 100 * np.mean(stat_mAA)
    final_clusterness = 100 * np.mean(stat_clusterness)

    # mask True images
    final_score_mask_a = 100 * np.mean(stat_score_mask_a)
    final_mAA_mask_a = 100 * np.mean(stat_mAA_mask_a)
    final_clusterness_mask_a = 100 * np.mean(stat_clusterness_mask_a)

    # mask False images
    final_score_mask_b = 100 * np.mean(stat_score_mask_b)
    final_mAA_mask_b = 100 * np.mean(stat_mAA_mask_b)
    final_clusterness_mask_b = 100 * np.mean(stat_clusterness_mask_b)

    if verbose:
        print(f'Average over all datasets: score={final_score:.2f}% (mAA={final_mAA:.2f}%, clusterness={final_clusterness:.2f}%)')
        if mask_csv:
            print(f'\tPublic split: score={final_score_mask_a:.2f}% (mAA={final_mAA_mask_a:.2f}%, clusterness={final_clusterness_mask_a:.2f}%)')
            print(f'\tPrivate split: score={final_score_mask_b:.2f}% (mAA={final_mAA_mask_b:.2f}%, clusterness={final_clusterness_mask_b:.2f}%)')

    # percentage score of each dataset; the stat lists follow the iteration order of gt_data
    scene_score_dict = {dataset: score * 100 for dataset, score in zip(gt_data, stat_score)}
    scene_score_dict_mask_a = None if mask_csv is None else {dataset: score * 100 for dataset, score in zip(gt_data, stat_score_mask_a)}
    scene_score_dict_mask_b = None if mask_csv is None else {dataset: score * 100 for dataset, score in zip(gt_data, stat_score_mask_b)}

    return (
        (final_score, final_score_mask_a, final_score_mask_b),
        (scene_score_dict, scene_score_dict_mask_a, scene_score_dict_mask_b)
    )
|
|
|