import hashlib
import os
import pickle

import numpy as np
from sklearn.metrics import (accuracy_score, auc, f1_score, precision_recall_curve,
                             precision_score, recall_score, roc_auc_score, roc_curve)


def stable_long_hash(input_string):
    # Map a string to a stable, non-negative 63-bit integer via SHA-256
    # (unlike Python's built-in hash(), the result is identical across processes).
    hash_object = hashlib.sha256(input_string.encode())
    hex_digest = hash_object.hexdigest()
    int_hash = int(hex_digest, 16)
    long_long_hash = int_hash & ((1 << 63) - 1)
    return long_long_hash


def load_pkl(path):
    with open(path, 'rb') as f:
        return pickle.load(f)


def save_pkl(obj, path):
    with open(path, 'wb') as f:
        pickle.dump(obj, f)


def find_top_n(embeddings, n, index, data):
    # Retrieve the top-n nearest neighbours of each query embedding from `index`
    # and return the matching entries from the three parallel lists in `data`.
    if len(embeddings.shape) == 1:
        embeddings = embeddings.reshape(1, -1)
    top_ids_and_scores = index.search_knn(embeddings, n)
    data_ans = []
    for ids, _scores in top_ids_and_scores:
        data_now = []
        for doc_id in ids:
            data_now.append((data[0][int(doc_id)], data[1][int(doc_id)], data[2][int(doc_id)]))
        data_ans.append(data_now)
    return data_ans


def print_line(class_name, metrics, is_header=False):
    if is_header:
        line = f"| {'Class':<10} | " + " | ".join([f"{metric:<10}" for metric in metrics])
    else:
        line = f"| {class_name:<10} | " + " | ".join([f"{metrics[metric]:<10.3f}" for metric in metrics])
    print(line)
    if is_header:
        print('-' * len(line))


def calculate_per_class_metrics(classes, ground_truth, predictions):
    # Convert ground truth and predictions to numeric format.
    gt_numeric = np.array([int(gt) for gt in ground_truth])
    pred_numeric = np.array([int(pred) for pred in predictions])
    results = {}
    for i, class_name in enumerate(classes):
        # For each class, build one-vs-rest binary labels.
        gt_binary = (gt_numeric == i).astype(int)
        pred_binary = (pred_numeric == i).astype(int)
        # Calculate metrics, handling classes absent from predictions or ground truth.
        precision = precision_score(gt_binary, pred_binary, zero_division=0)
        recall = recall_score(gt_binary, pred_binary, zero_division=0)
        f1 = f1_score(gt_binary, pred_binary, zero_division=0)
        acc = np.mean(gt_binary == pred_binary)
        # Recall over all other classes pooled together as the 'rest'.
        rest_recall = recall_score(1 - gt_binary, 1 - pred_binary, zero_division=0)
        results[class_name] = {
            'Precision': precision,
            'Recall': recall,
            'F1 Score': f1,
            'Accuracy': acc,
            'Avg Recall (with rest)': (recall + rest_recall) / 2,
        }
    print_line("Metric", results[classes[0]], is_header=True)
    for class_name, metrics in results.items():
        print_line(class_name, metrics)
    overall_metrics = {metric_name: np.mean([metrics[metric_name] for metrics in results.values()])
                       for metric_name in results[classes[0]].keys()}
    print_line("Overall", overall_metrics)


def calculate_metrics(labels, preds):
    acc = accuracy_score(labels, preds)
    precision = precision_score(labels, preds, average='macro')
    recall = recall_score(labels, preds, average='macro')
    f1 = f1_score(labels, preds, average='macro')
    return acc, precision, recall, f1


def compute_three_recalls(labels, preds):
    # Per-class recalls (as percentages) for string labels: the '0' class is
    # reported as machine_rec and the '1' class as human_rec; a None prediction
    # is treated as an incorrect prediction.
    all_n, all_p, tn, tp = 0, 0, 0, 0
    for label, pred in zip(labels, preds):
        if label == '0':
            all_p += 1
        if label == '1':
            all_n += 1
        if pred is not None and label == pred == '0':
            tp += 1
        if pred is not None and label == pred == '1':
            tn += 1
    machine_rec = tp * 100 / all_p if all_p != 0 else 0
    human_rec = tn * 100 / all_n if all_n != 0 else 0
    avg_rec = (human_rec + machine_rec) / 2
    return (human_rec, machine_rec, avg_rec)
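# Illustrative example (not part of the original pipeline): compute_three_recalls
# and compute_metrics below expect string labels, report recall separately for the
# '0' and '1' classes, and count None predictions as mistakes, e.g.:
#   labels = ['0', '0', '1', '1']
#   preds  = ['0', '1', '1', None]
#   compute_three_recalls(labels, preds)  # -> (50.0, 50.0, 50.0)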
def compute_metrics(labels, preds, ids=None):
    # Note: None predictions are handled inside compute_three_recalls; the sklearn
    # metrics below assume preds contain no None values.
    if ids is not None:
        # Deduplicate labels and predictions for repeated ids (last occurrence wins).
        dict_labels, dict_preds = {}, {}
        for i in range(len(ids)):
            dict_labels[ids[i]] = labels[i]
            dict_preds[ids[i]] = preds[i]
        labels = list(dict_labels.values())
        preds = list(dict_preds.values())
    human_rec, machine_rec, avg_rec = compute_three_recalls(labels, preds)
    acc = accuracy_score(labels, preds)
    precision = precision_score(labels, preds, pos_label='1')
    recall = recall_score(labels, preds, pos_label='1')
    f1 = f1_score(labels, preds, pos_label='1')
    return (human_rec, machine_rec, avg_rec, acc, precision, recall, f1)


def evaluate_max_f1_metrics(test_labels, y_score):
    test_labels = np.array(test_labels)
    y_score = np.array(y_score)
    auroc = roc_auc_score(test_labels, y_score)
    precision, recall, thresholds = precision_recall_curve(test_labels, y_score, pos_label=1)
    pr_auc = auc(recall, precision)
    epsilon = 1e-6
    f1_scores = 2 * precision * recall / (precision + recall + epsilon)
    best_index = f1_scores.argmax()
    best_f1 = f1_scores[best_index]
    best_precision = precision[best_index]
    best_recall = recall[best_index]
    # precision_recall_curve returns one more point than thresholds; fall back to 1.0
    # if the best operating point is the final (recall == 0) point.
    threshold = thresholds[best_index] if best_index < len(thresholds) else 1.0
    y_pred_max_f1 = (y_score >= threshold).astype(int)
    acc = (y_pred_max_f1 == test_labels).mean()
    tp = ((y_pred_max_f1 == 1) & (test_labels == 1)).sum()
    fn = ((y_pred_max_f1 == 0) & (test_labels == 1)).sum()
    fp = ((y_pred_max_f1 == 1) & (test_labels == 0)).sum()
    tn = ((y_pred_max_f1 == 0) & (test_labels == 0)).sum()
    pos_recall = tp / (tp + fn + epsilon)  # recall for the positive class
    neg_recall = tn / (tn + fp + epsilon)  # recall for the negative class
    avg_recall = (pos_recall + neg_recall) / 2  # average recall across classes
    metric = {'auroc': auroc, 'pr_auc': pr_auc, 'F1': best_f1, 'Precision': best_precision,
              'Recall': best_recall, 'threshold': threshold, 'acc': acc, 'avg_recall': avg_recall,
              'pos_recall': pos_recall, 'neg_recall': neg_recall}
    return metric
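# Illustrative usage (synthetic inputs, assumed here for demonstration):
# evaluate_max_f1_metrics sweeps every operating point on the precision/recall
# curve, keeps the threshold with the highest F1, and reports accuracy and
# per-class recalls at that single threshold, e.g.:
#   evaluate_max_f1_metrics([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
#   # -> {'auroc': 0.75, 'F1': ~0.8, 'threshold': 0.35, ...}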
def evaluate_metrics(test_labels, y_score, threshold_param=-1, target_fpr=0.05):
    if isinstance(test_labels, list):
        test_labels = np.array(test_labels)
    if isinstance(y_score, list):
        y_score = np.array(y_score)
    if threshold_param != -1 and not (0 <= threshold_param <= 1):
        raise ValueError("Threshold must be between 0 and 1.")
    auroc = roc_auc_score(test_labels, y_score)
    precision, recall, thresholds = precision_recall_curve(test_labels, y_score, pos_label=1)
    pr_auc = auc(recall, precision)
    epsilon = 1e-6
    f1_scores = 2 * precision * recall / (precision + recall + epsilon)
    if threshold_param == -1:
        # No threshold given: pick the operating point that maximises F1.
        best_index = f1_scores.argmax()
        F1 = f1_scores[best_index]
        Precision = precision[best_index]
        Recall = recall[best_index]
        threshold = thresholds[best_index] if best_index < len(thresholds) else 1.0
    else:
        # Fixed threshold: report the curve point at the first threshold >= threshold_param.
        threshold = threshold_param
        index = np.where(thresholds >= threshold)[0][0]
        Precision = precision[index]
        Recall = recall[index]
        F1 = f1_scores[index]
    y_pred = (y_score >= threshold).astype(int)
    acc = (y_pred == test_labels).mean()
    tp = ((y_pred == 1) & (test_labels == 1)).sum()
    fn = ((y_pred == 0) & (test_labels == 1)).sum()
    fp = ((y_pred == 1) & (test_labels == 0)).sum()
    tn = ((y_pred == 0) & (test_labels == 0)).sum()
    pos_recall = tp / (tp + fn + epsilon)  # TPR
    neg_recall = tn / (tn + fp + epsilon)  # TNR
    avg_recall = (pos_recall + neg_recall) / 2
    # TPR at (approximately) the target false-positive rate.
    fpr, tpr, thds = roc_curve(test_labels, y_score)
    if len(fpr) > 0 and len(tpr) > 0:
        idx = np.argmin(np.abs(fpr - target_fpr))
        tpr_at_fpr = tpr[idx]
        tpr_at_fpr_threshold = thds[idx]
    else:
        tpr_at_fpr = 0.0
        tpr_at_fpr_threshold = None  # roc_curve produced no points
    metric = {'auroc': auroc, 'pr_auc': pr_auc, 'F1': F1, 'Precision': Precision, 'Recall': Recall,
              'threshold': threshold, 'acc': acc, 'avg_recall': avg_recall, 'pos_recall': pos_recall,
              'neg_recall': neg_recall, 'tpr_at_fpr': tpr_at_fpr, 'tpr_at_fpr_threshold': tpr_at_fpr_threshold}
    return metric


def load_datapath(path, include_adversarial=False, dataset_name='all', attack_type='all'):
    # Collect train/valid/test file paths laid out as <path>/<dataset>/<attack>/<split>.*
    data_path = {'train': [], 'valid': [], 'test': []}
    if dataset_name == 'all':
        datasets = os.listdir(path)
    elif dataset_name == 'M4':
        datasets = ['M4_monolingual', 'M4_multilingual']
    elif dataset_name == 'RAID_all':
        datasets = ['RAID', 'RAID_extra']
    else:
        datasets = [dataset_name]
    for dataset in datasets:
        dataset_path = os.path.join(path, dataset)
        if attack_type != 'all':
            dataset_path_list = [pth for pth in os.listdir(dataset_path) if attack_type in pth]
        else:
            dataset_path_list = os.listdir(dataset_path)
        for adv in dataset_path_list:
            # Unless adversarial variants are requested, keep only the 'no_attack' subset.
            if not include_adversarial and 'no_attack' not in adv:
                continue
            adv_path = os.path.join(dataset_path, adv)
            for data in os.listdir(adv_path):
                if 'train.' in data:
                    data_path['train'].append(os.path.join(adv_path, data))
                elif 'test.' in data:
                    data_path['test'].append(os.path.join(adv_path, data))
                elif 'valid.' in data:
                    data_path['valid'].append(os.path.join(adv_path, data))
    return data_path
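if __name__ == "__main__":
    # Minimal smoke test (added for illustration; the synthetic labels and scores
    # below are assumptions, not data used by the original pipeline).
    rng = np.random.default_rng(0)
    labels = np.concatenate([np.zeros(50, dtype=int), np.ones(50, dtype=int)])
    scores = np.clip(np.concatenate([rng.normal(0.3, 0.15, 50), rng.normal(0.7, 0.15, 50)]), 0, 1)
    print(evaluate_metrics(labels, scores))                       # threshold picked by max F1
    print(evaluate_metrics(labels, scores, threshold_param=0.5))  # fixed decision threshold
    print(compute_metrics([str(l) for l in labels], [str(int(s >= 0.5)) for s in scores]))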