|
|
import ast |
|
|
import functools |
|
|
import json |
|
|
import multiprocessing |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Tuple |
|
|
from datasets import load_dataset |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from scipy.spatial import KDTree |
|
|
from skimage.draw import line_aa |
|
|
from skimage.draw import line as sk_line |
|
|
from stqdm import stqdm as tqdm |
|
|
|
|
|
|
|
|
# TSV table: one row per segmentation category, one penalty column per embodiment.
PENALTY_SCORES_PATH = "./src/score_calculation/category_penalty.tsv"


# Mask2Former config; its "id2label" entry maps mask label ids to category names.
M2F_CONFIG_PATH = "./src/score_calculation/mask2former_config.json"


# Raw score that normalizes to 0; a raw score of 0 normalizes to 100 (see normalize_score).
BAD_SCORE_THRESHOLD = 3234.75
|
|
|
|
|
|
|
|
@functools.lru_cache(maxsize=4)
def create_penalty_lookup(embodiment: str) -> Dict[int, float]:
    """Creates a direct mapping from a category ID (`label_id`) to its penalty factor.

    Reads the per-category penalty TSV and the Mask2Former config once and
    joins them on category name. Cached per embodiment, since the backing
    files do not change at runtime.

    Args:
        embodiment: Column name in the penalty TSV selecting the penalty
            values for that embodiment.

    Returns:
        Dict mapping integer ``label_id`` to its scaled penalty factor.
    """
    penalty_values_df = pd.read_csv(PENALTY_SCORES_PATH, sep="\t")

    with open(M2F_CONFIG_PATH, "r") as f:
        config = json.load(f)
    id2label = {int(k): v for k, v in config["id2label"].items()}

    # Build the category -> penalty lookup once instead of filtering the
    # DataFrame inside the loop (O(labels * rows) -> O(labels)). Keeping the
    # first row per category matches the previous `.iloc[0]` behavior.
    category_to_penalty = (
        penalty_values_df.drop_duplicates(subset="category", keep="first")
        .set_index("category")[embodiment]
    )

    label_id_to_penalty = {}
    for label_id, category_name in id2label.items():
        # NOTE(review): the 0.8 factor's rationale is not documented here;
        # presumably a global softening of the raw penalties — confirm.
        label_id_to_penalty[label_id] = float(category_to_penalty[category_name]) * 0.8

    return label_id_to_penalty
|
|
|
|
|
def rasterize_gt_trace(
    gt_trace: List[List[float]], height: int, width: int
) -> np.ndarray:
    """Converts a line trace into a dense array of pixel coordinates."""
    points = np.array(gt_trace)
    pixels: List[Tuple[int, int]] = []

    if len(points) > 1:
        # Draw an anti-aliased segment between each consecutive pair of
        # points; trace coordinates are (x, y) while pixels are (row, col).
        for start, end in zip(points[:-1], points[1:]):
            rr, cc, _ = line_aa(
                int(round(start[1])),
                int(round(start[0])),
                int(round(end[1])),
                int(round(end[0])),
            )
            in_bounds = (rr >= 0) & (rr < height) & (cc >= 0) & (cc < width)
            pixels.extend(zip(rr[in_bounds], cc[in_bounds]))
    elif len(points) == 1:
        # Degenerate trace: a single point rasterizes to at most one pixel.
        row, col = int(round(points[0][1])), int(round(points[0][0]))
        if 0 <= row < height and 0 <= col < width:
            pixels.append((row, col))

    return np.array(pixels)
|
|
|
|
|
def create_penalty_mask(
    segmentation_mask: np.ndarray,
    gt_trace: List[List[float]],
    embodiment: str,
    distance_threshold: float = 35,
) -> np.ndarray:
    """Builds a per-pixel penalty map for penalized categories far from the trace.

    A pixel receives its category's penalty factor only when its nearest
    rasterized ground-truth pixel is farther away than `distance_threshold`;
    all other pixels stay at zero.
    """
    height, width = segmentation_mask.shape
    penalty_mask = np.zeros((height, width), dtype=float)

    # Nearest-neighbour structure over the rasterized ground-truth pixels.
    gt_tree = KDTree(rasterize_gt_trace(gt_trace, height, width))

    label_id_to_penalty = create_penalty_lookup(embodiment)

    # Flat indices of every pixel whose category carries a penalty.
    flat_labels = segmentation_mask.ravel()
    penalized_flat = np.flatnonzero(
        np.isin(flat_labels, list(label_id_to_penalty.keys()))
    )
    if penalized_flat.size == 0:
        return penalty_mask

    rows, cols = np.unravel_index(penalized_flat, (height, width))
    pixel_coords = np.column_stack((rows, cols))

    # Distance from each penalized pixel to its closest ground-truth pixel.
    nearest_dists, _ = gt_tree.query(pixel_coords)

    far_pixels = pixel_coords[nearest_dists > distance_threshold]
    if far_pixels.size > 0:
        far_rows, far_cols = far_pixels[:, 0], far_pixels[:, 1]
        far_labels = segmentation_mask[far_rows, far_cols]
        penalty_mask[far_rows, far_cols] = np.vectorize(label_id_to_penalty.get)(
            far_labels
        )

    return penalty_mask
|
|
|
|
|
def resample_to_match_length(
    trace_1: np.ndarray, trace_2: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
    """Resamples the shorter of two 2-D traces so both have equal length.

    The shorter trace is resampled by normalized arc-length interpolation
    to the length of the longer one. The traces are always returned in
    their original argument order.

    Args:
        trace_1: Array of shape (n, 2).
        trace_2: Array of shape (m, 2).

    Returns:
        Tuple (trace_1, trace_2) with equal lengths.

    Raises:
        ValueError: If either trace is empty.
    """
    if len(trace_1) == 0 or len(trace_2) == 0:
        raise ValueError("One of the traces is empty")
    if len(trace_1) == len(trace_2):
        return trace_1, trace_2

    trace_1_is_longer = len(trace_1) > len(trace_2)
    longer, shorter = (trace_1, trace_2) if trace_1_is_longer else (trace_2, trace_1)

    if len(shorter) == 1:
        # BUG FIX: `shorter * len(longer)` scaled the coordinates instead of
        # repeating the point; tile the single point to the target length.
        resampled = np.tile(shorter[0], (len(longer), 1))
    else:
        # Cumulative arc length of the shorter trace, normalized to [0, 1].
        segment_lengths = np.linalg.norm(np.diff(shorter, axis=0), axis=1)
        dists = np.concatenate(([0.0], np.cumsum(segment_lengths)))
        if dists[-1] == 0:
            # Degenerate trace (all points identical): avoid 0/0 -> NaN below.
            resampled = np.tile(shorter[0], (len(longer), 1))
        else:
            dists = dists / dists[-1]
            new_params = np.linspace(0, 1, len(longer))
            resampled = np.column_stack(
                [
                    np.interp(new_params, dists, shorter[:, 0]),
                    np.interp(new_params, dists, shorter[:, 1]),
                ]
            )

    # BUG FIX: always return in argument order (the old single-point branch
    # returned the traces swapped whenever trace_1 was the longer one).
    if trace_1_is_longer:
        return longer, resampled
    return resampled, longer
|
|
|
|
|
def calculate_semantic_penalty(
    prediction: np.ndarray, penalty_mask: np.ndarray
) -> float:
    """Mean penalty accumulated along the rasterized prediction trace.

    Each consecutive pair of prediction points is rasterized with a
    Bresenham line and the penalty-mask values under its in-bounds pixels
    are collected and averaged.

    Args:
        prediction: Array of (x, y) trace points.
        penalty_mask: 2-D per-pixel penalty map.

    Returns:
        Mean penalty over all in-bounds rasterized pixels; 0.0 when no
        pixels were rasterized (e.g. a single-point prediction).
    """
    # Hoisted out of the loop: the mask shape is invariant.
    height, width = penalty_mask.shape

    penalties = []
    for i in range(len(prediction) - 1):
        x1, y1 = int(round(prediction[i][0])), int(round(prediction[i][1]))
        x2, y2 = int(round(prediction[i + 1][0])), int(round(prediction[i + 1][1]))

        # skimage's line() takes (row, col) order, hence (y, x).
        rr, cc = sk_line(y1, x1, y2, x2)

        valid_indices = (rr >= 0) & (rr < height) & (cc >= 0) & (cc < width)
        penalties.extend(penalty_mask[rr[valid_indices], cc[valid_indices]].tolist())

    # BUG FIX: the annotation claimed List[float] but a scalar mean was
    # returned; also avoid np.mean([]) -> NaN (+ RuntimeWarning) when the
    # prediction has fewer than two points.
    if not penalties:
        return 0.0
    return float(np.mean(penalties))
|
|
|
|
|
def calculate_fde(prediction: np.ndarray, ground_truth: np.ndarray):
    """Final displacement error: Euclidean distance between the two endpoints."""
    final_offset = prediction[-1] - ground_truth[-1]
    return np.linalg.norm(final_offset)
|
|
|
|
|
def calculate_dtw(prediction: np.ndarray, ground_truth: np.ndarray):
    """Dynamic-time-warping distance between two 2-D traces.

    Classic O(n*m) DP over the pairwise Euclidean distance matrix, with no
    warping-window constraint.

    Args:
        prediction: Array of shape (n, 2).
        ground_truth: Array of shape (m, 2).

    Returns:
        The accumulated DTW cost between the two traces.
    """
    n, m = len(prediction), len(ground_truth)

    # Pairwise Euclidean distances in one vectorized pass instead of n*m
    # Python-level np.linalg.norm calls.
    diffs = prediction[:, None, :] - ground_truth[None, :, :]
    pairwise = np.linalg.norm(diffs, axis=-1)

    cost_matrix = np.full((n + 1, m + 1), np.inf)
    cost_matrix[0, 0] = 0

    for i in range(1, n + 1):
        for j in range(1, m + 1):
            # Cheapest way to reach (i, j): match, insertion, or deletion.
            cost_matrix[i, j] = pairwise[i - 1, j - 1] + min(
                cost_matrix[i - 1, j],
                cost_matrix[i, j - 1],
                cost_matrix[i - 1, j - 1],
            )

    return cost_matrix[n, m]
|
|
|
|
|
def normalize_score(score: float) -> float:
    """Maps a raw score onto a 0-100 scale.

    A raw score of 0 maps to 100, BAD_SCORE_THRESHOLD maps to 0, and raw
    scores above the threshold come out negative.
    """
    margin = BAD_SCORE_THRESHOLD - score
    return margin / BAD_SCORE_THRESHOLD * 100
|
|
|
|
|
def score(
    prediction: List[List[float]],
    ground_truths: List[List[List[float]]],
    segmentation_mask: np.ndarray,
    embodiment: str,
):
    """Scores a predicted trace against all acceptable ground truths.

    For each ground truth, the raw score is DTW + FDE + semantic penalty
    (computed on length-matched traces); the best (lowest) raw score is
    then normalized to the 0-100 scale.

    Args:
        prediction: Predicted trace as a list of (x, y) points.
        ground_truths: One or more acceptable ground-truth traces.
        segmentation_mask: 2-D label-id mask for the scene.
        embodiment: Embodiment key selecting the penalty column.

    Returns:
        The normalized score of the best-matching ground truth.
    """
    raw_scores = []
    for ground_truth in ground_truths:
        penalty_mask = create_penalty_mask(segmentation_mask, ground_truth, embodiment)

        # BUG FIX: use per-iteration locals. Previously `prediction` itself
        # was rebound to its resampled version, so every subsequent ground
        # truth was compared against a trace already resampled to fit an
        # earlier ground truth's length.
        pred_arr = np.array(prediction)
        gt_arr = np.array(ground_truth)

        if len(pred_arr) != len(gt_arr):
            pred_arr, gt_arr = resample_to_match_length(pred_arr, gt_arr)

        sem_penalty = calculate_semantic_penalty(pred_arr, penalty_mask)
        fde = calculate_fde(pred_arr, gt_arr)
        dtw = calculate_dtw(pred_arr, gt_arr)
        raw_scores.append(dtw + fde + sem_penalty)

    # A prediction only needs to match one acceptable ground truth well.
    best_raw_score = min(raw_scores)

    return normalize_score(best_raw_score)
|
|
|
|
|
def _initialize_worker(results_path, dataset_id, split_name):
    """Pool initializer: loads per-worker state into module globals.

    Each worker process gets its own copy of the results table and a
    closure (`_get_sample`) for O(1) dataset-row lookup by sample_id.
    """
    global _results_df, _get_sample

    _results_df = pd.read_csv(results_path, sep="\t")
    data_split = load_dataset(dataset_id)[split_name]

    # Precompute sample_id -> row index once so each lookup is O(1).
    id_to_index = {sid: i for i, sid in enumerate(data_split["sample_id"])}

    def _lookup(sample_id):
        return data_split[id_to_index[sample_id]]

    _get_sample = _lookup
|
|
|
|
|
def _score_chunk(indices: List[int]) -> List[Tuple[int, float]]:
    """Scores the result rows at `indices`; returns (row index, score) pairs.

    Relies on `_results_df` and `_get_sample` being set up by
    `_initialize_worker`. Empty predictions are scored as NaN; samples
    with hidden (None) ground truths raise.
    """
    chunk_scores = []
    for row_index in indices:
        row = _results_df.loc[row_index]

        sample = _get_sample(row["sample_id"])
        embodiment = row["embodiment"]
        prediction = json.loads(row["prediction"])
        ground_truths = sample["ground_truth"][embodiment]
        segmentation_mask = np.array(sample["segmentation_mask"])

        if ground_truths is None:
            raise ValueError(f"The sample {sample} has hidden ground-truths")

        if not prediction:
            # Nothing to score against: flag with NaN rather than crash.
            chunk_scores.append((row_index, np.nan))
            continue

        chunk_scores.append(
            (row_index, score(prediction, ground_truths, segmentation_mask, embodiment))
        )

    return chunk_scores
|
|
|
|
|
def score_predictions_parallel(results_path, dataset_id, split_name, num_processes=4):
    """Scores all predictions in `results_path` using a process pool.

    The results TSV is split into one contiguous index chunk per process;
    each worker loads its own copy of the data via `_initialize_worker`
    and returns (index, score) pairs that are merged back here.

    Args:
        results_path: Path to the TSV of predictions to score.
        dataset_id: Dataset identifier passed to `load_dataset`.
        split_name: Dataset split containing the ground truths.
        num_processes: Number of worker processes.

    Returns:
        A copy of the results DataFrame with a new "score" column.
    """
    results_df = pd.read_csv(results_path, sep="\t")

    total_rows = len(results_df)
    # BUG FIX: clamp to >= 1 — for an empty results file the old ceil
    # division gave chunk_size == 0 and range(0, 0, 0) raised ValueError.
    chunk_size = max(1, (total_rows + num_processes - 1) // num_processes)
    indices_chunks = [
        list(range(i, min(i + chunk_size, total_rows)))
        for i in range(0, total_rows, chunk_size)
    ]

    scored_df = results_df.copy()
    scored_df["score"] = np.nan
    with multiprocessing.Pool(
        processes=num_processes,
        initializer=_initialize_worker,
        initargs=(
            results_path,
            dataset_id,
            split_name,
        ),
    ) as pool:
        with tqdm(total=total_rows, desc="Scoring predictions") as pbar:
            # imap_unordered: merge chunks as soon as any worker finishes.
            for chunk_results in pool.imap_unordered(_score_chunk, indices_chunks):
                for idx, s in chunk_results:
                    scored_df.at[idx, "score"] = s
                pbar.update(len(chunk_results))

    return scored_df
|
|
|
|
|
def score_predictions(results_df, dataset):
    """Sequentially scores every prediction row in `results_df` against `dataset`.

    Empty predictions receive NaN; samples with hidden (None) ground
    truths raise.

    Returns:
        A copy of `results_df` with a "score" column appended.
    """
    # Precompute sample_id -> dataset index once so each lookup is O(1).
    id_to_index = {sid: i for i, sid in enumerate(dataset["sample_id"])}

    scores = []
    progress = tqdm(results_df.iterrows(), total=len(results_df), desc="Scoring predictions")
    for _, row in progress:
        sample = dataset[id_to_index[row["sample_id"]]]

        embodiment = row["embodiment"]
        prediction = json.loads(row["prediction"])
        ground_truths = sample["ground_truth"][embodiment]
        segmentation_mask = np.array(sample["segmentation_mask"])

        if ground_truths is None:
            raise ValueError(f"The sample {sample} has hidden ground-truths")

        if not prediction:
            # Nothing to score against: flag with NaN rather than crash.
            scores.append(np.nan)
            continue

        scores.append(score(prediction, ground_truths, segmentation_mask, embodiment))

    scored_df = results_df.copy()
    scored_df["score"] = scores

    return scored_df