import hashlib
import os
import pickle

import numpy as np
from sklearn.metrics import (accuracy_score, auc, f1_score, precision_recall_curve,
                             precision_score, recall_score, roc_auc_score, roc_curve)
def stable_long_hash(input_string):
    """Deterministic 63-bit hash of a string (stable across runs, unlike hash())."""
    hash_object = hashlib.sha256(input_string.encode())
    hex_digest = hash_object.hexdigest()
    int_hash = int(hex_digest, 16)
    # Mask to 63 bits so the result fits in a signed 64-bit integer.
    return int_hash & ((1 << 63) - 1)
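
# Minimal usage sketch (inputs are made up): the point of stable_long_hash is
# that, unlike the builtin hash() under PYTHONHASHSEED randomization, it
# returns the same value in every process.
def _demo_stable_long_hash():
    seed = stable_long_hash("example-doc-001")
    assert seed == stable_long_hash("example-doc-001")  # deterministic
    assert 0 <= seed < 2 ** 63  # fits in a signed 64-bit integer
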
def load_pkl(path):
    with open(path, 'rb') as f:
        return pickle.load(f)

def save_pkl(obj, path):
    with open(path, 'wb') as f:
        pickle.dump(obj, f)
def find_top_n(embeddings, n, index, data):
    """Return the top-n neighbors for each query embedding.

    `index` must expose search_knn(embeddings, n) returning a list of
    (ids, scores) pairs; `data` is a triple of parallel lists indexed by id.
    """
    if len(embeddings.shape) == 1:
        embeddings = embeddings.reshape(1, -1)  # treat a single vector as one query
    top_ids_and_scores = index.search_knn(embeddings, n)
    data_ans = []
    for ids, scores in top_ids_and_scores:
        data_now = [(data[0][int(i)], data[1][int(i)], data[2][int(i)]) for i in ids]
        data_ans.append(data_now)
    return data_ans
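
# Hypothetical usage sketch: `index` is assumed to be a DPR-style dense
# indexer exposing search_knn(), and `data` a triple of parallel lists
# (e.g. texts, labels, sources); neither is defined in this file.
def _demo_find_top_n(index, data):
    query_emb = np.random.rand(768).astype(np.float32)  # one query vector
    neighbors = find_top_n(query_emb, n=5, index=index, data=data)
    for text, label, source in neighbors[0]:  # results for the first query
        print(label, source, str(text)[:80])
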
def print_line(class_name, metrics, is_header=False):
    """Print one row of a metrics table; with is_header, print the column names."""
    if is_header:
        line = f"| {'Class':<10} | " + " | ".join(f"{metric:<10}" for metric in metrics)
    else:
        line = f"| {class_name:<10} | " + " | ".join(f"{metrics[metric]:<10.3f}" for metric in metrics)
    print(line)
    if is_header:
        print('-' * len(line))
def calculate_per_class_metrics(classes, ground_truth, predictions):
    """Print one-vs-rest metrics per class; labels are stringified class indices."""
    # Convert ground truth and predictions to numeric format.
    gt_numeric = np.array([int(gt) for gt in ground_truth])
    pred_numeric = np.array([int(pred) for pred in predictions])
    results = {}
    for i, class_name in enumerate(classes):
        # Binarize labels for the current class vs. the rest.
        gt_binary = (gt_numeric == i).astype(int)
        pred_binary = (pred_numeric == i).astype(int)
        # zero_division=0 handles classes absent from predictions or ground truth.
        precision = precision_score(gt_binary, pred_binary, zero_division=0)
        recall = recall_score(gt_binary, pred_binary, zero_division=0)
        f1 = f1_score(gt_binary, pred_binary, zero_division=0)
        acc = np.mean(gt_binary == pred_binary)
        # Recall of the complementary ('rest') class.
        rest_recall = recall_score(1 - gt_binary, 1 - pred_binary, zero_division=0)
        results[class_name] = {
            'Precision': precision,
            'Recall': recall,
            'F1 Score': f1,
            'Accuracy': acc,
            'Avg Recall (with rest)': (recall + rest_recall) / 2,
        }
    print_line("Metric", results[classes[0]], is_header=True)
    for class_name, metrics in results.items():
        print_line(class_name, metrics)
    overall_metrics = {name: np.mean([m[name] for m in results.values()])
                       for name in results[classes[0]]}
    print_line("Overall", overall_metrics)
def calculate_metrics(labels, preds):
    acc = accuracy_score(labels, preds)
    precision = precision_score(labels, preds, average='macro')
    recall = recall_score(labels, preds, average='macro')
    f1 = f1_score(labels, preds, average='macro')
    return acc, precision, recall, f1
def compute_three_recalls(labels, preds):
    """Return (human_rec, machine_rec, avg_rec) as percentages.

    Label '0' feeds machine_rec and label '1' feeds human_rec; a None
    prediction counts as a miss for either class.
    """
    all_p, all_n, tp, tn = 0, 0, 0, 0
    for label, pred in zip(labels, preds):
        if label == '0':
            all_p += 1
        if label == '1':
            all_n += 1
        # A None prediction is treated as an incorrect prediction.
        if pred is not None and label == pred == '0':
            tp += 1
        if pred is not None and label == pred == '1':
            tn += 1
    machine_rec = tp * 100 / all_p if all_p != 0 else 0
    human_rec = tn * 100 / all_n if all_n != 0 else 0
    avg_rec = (human_rec + machine_rec) / 2
    return (human_rec, machine_rec, avg_rec)
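
# Usage sketch with toy labels: one miss per class, so both recalls are 50%.
def _demo_three_recalls():
    labels = ['0', '0', '1', '1']
    preds = ['0', '1', '1', None]  # None counts as a miss
    human_rec, machine_rec, avg_rec = compute_three_recalls(labels, preds)
    print(human_rec, machine_rec, avg_rec)  # 50.0 50.0 50.0
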
def compute_metrics(labels, preds, ids=None):
    if ids is not None:
        # Deduplicate repeated ids, keeping the last label/prediction seen.
        dict_labels, dict_preds = {}, {}
        for i in range(len(ids)):
            dict_labels[ids[i]] = labels[i]
            dict_preds[ids[i]] = preds[i]
        labels = list(dict_labels.values())
        preds = list(dict_preds.values())
    human_rec, machine_rec, avg_rec = compute_three_recalls(labels, preds)
    acc = accuracy_score(labels, preds)
    precision = precision_score(labels, preds, pos_label='1')
    recall = recall_score(labels, preds, pos_label='1')
    f1 = f1_score(labels, preds, pos_label='1')
    return (human_rec, machine_rec, avg_rec, acc, precision, recall, f1)
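
# Usage sketch: with ids given, repeated ids collapse to their last
# label/prediction before any metric is computed.
def _demo_compute_metrics():
    labels = ['1', '0', '1', '1']
    preds = ['1', '0', '0', '1']
    ids = ['a', 'b', 'c', 'c']  # 'c' keeps only its last entry
    print(compute_metrics(labels, preds, ids=ids))
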
def evaluate_max_f1_metrics(test_labels, y_score):
    """Threshold-free metrics (AUROC, PR-AUC) plus metrics at the max-F1 threshold."""
    test_labels = np.array(test_labels)
    y_score = np.array(y_score)
    auroc = roc_auc_score(test_labels, y_score)
    precision, recall, thresholds = precision_recall_curve(test_labels, y_score, pos_label=1)
    pr_auc = auc(recall, precision)
    epsilon = 1e-6
    f1_scores = 2 * precision * recall / (precision + recall + epsilon)
    best_index = f1_scores.argmax()
    best_f1 = f1_scores[best_index]
    best_precision = precision[best_index]
    best_recall = recall[best_index]
    # precision/recall have one more entry than thresholds; the last point
    # corresponds to predicting nothing positive, hence threshold 1.0.
    threshold = thresholds[best_index] if best_index < len(thresholds) else 1.0
    y_pred_max_f1 = (y_score >= threshold).astype(int)
    acc = (y_pred_max_f1 == test_labels).mean()
    tp = ((y_pred_max_f1 == 1) & (test_labels == 1)).sum()
    fn = ((y_pred_max_f1 == 0) & (test_labels == 1)).sum()
    fp = ((y_pred_max_f1 == 1) & (test_labels == 0)).sum()
    tn = ((y_pred_max_f1 == 0) & (test_labels == 0)).sum()
    pos_recall = tp / (tp + fn + epsilon)  # recall for the positive class
    neg_recall = tn / (tn + fp + epsilon)  # recall for the negative class
    avg_recall = (pos_recall + neg_recall) / 2  # average recall across classes
    return {'auroc': auroc, 'pr_auc': pr_auc, 'F1': best_f1,
            'Precision': best_precision, 'Recall': best_recall,
            'threshold': threshold, 'acc': acc, 'avg_recall': avg_recall,
            'pos_recall': pos_recall, 'neg_recall': neg_recall}
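
# Usage sketch with toy scores: the returned dict mixes threshold-free
# metrics (auroc, pr_auc) with metrics at the max-F1 threshold.
def _demo_max_f1():
    labels = [0, 0, 1, 1, 1, 0]
    scores = [0.1, 0.4, 0.35, 0.8, 0.7, 0.2]
    m = evaluate_max_f1_metrics(labels, scores)
    print(m['threshold'], m['F1'], m['auroc'])
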
def evaluate_metrics(test_labels, y_score, threshold_param=-1, target_fpr=0.05):
    """Like evaluate_max_f1_metrics, but optionally at a fixed threshold,
    and additionally reports TPR at (approximately) target_fpr."""
    if isinstance(test_labels, list):
        test_labels = np.array(test_labels)
    if isinstance(y_score, list):
        y_score = np.array(y_score)
    if threshold_param != -1 and not (0 <= threshold_param <= 1):
        raise ValueError("Threshold must be between 0 and 1.")
    auroc = roc_auc_score(test_labels, y_score)
    precision, recall, thresholds = precision_recall_curve(test_labels, y_score, pos_label=1)
    pr_auc = auc(recall, precision)
    epsilon = 1e-6
    f1_scores = 2 * precision * recall / (precision + recall + epsilon)
    if threshold_param == -1:
        # Search the PR curve for the threshold that maximizes F1.
        best_index = f1_scores.argmax()
        F1 = f1_scores[best_index]
        Precision = precision[best_index]
        Recall = recall[best_index]
        threshold = thresholds[best_index] if best_index < len(thresholds) else 1.0
    else:
        # Evaluate at the first PR-curve point whose threshold >= threshold_param.
        threshold = threshold_param
        index = np.where(thresholds >= threshold)[0][0]
        Precision = precision[index]
        Recall = recall[index]
        F1 = f1_scores[index]
    y_pred = (y_score >= threshold).astype(int)
    acc = (y_pred == test_labels).mean()
    tp = ((y_pred == 1) & (test_labels == 1)).sum()
    fn = ((y_pred == 0) & (test_labels == 1)).sum()
    fp = ((y_pred == 1) & (test_labels == 0)).sum()
    tn = ((y_pred == 0) & (test_labels == 0)).sum()
    pos_recall = tp / (tp + fn + epsilon)  # TPR
    neg_recall = tn / (tn + fp + epsilon)  # TNR
    avg_recall = (pos_recall + neg_recall) / 2
    fpr, tpr, thds = roc_curve(test_labels, y_score)
    if len(fpr) > 0 and len(tpr) > 0:
        # Pick the ROC point closest to the target false-positive rate.
        idx = np.argmin(np.abs(fpr - target_fpr))
        tpr_at_fpr = tpr[idx]
        tpr_at_fpr_threshold = thds[idx]
    else:
        tpr_at_fpr = 0.0
        tpr_at_fpr_threshold = 1.0  # fallback so the key below is always defined
    return {'auroc': auroc, 'pr_auc': pr_auc, 'F1': F1, 'Precision': Precision,
            'Recall': Recall, 'threshold': threshold, 'acc': acc,
            'avg_recall': avg_recall, 'pos_recall': pos_recall,
            'neg_recall': neg_recall, 'tpr_at_fpr': tpr_at_fpr,
            'tpr_at_fpr_threshold': tpr_at_fpr_threshold}
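
# Usage sketch: threshold_param=-1 (default) searches for the max-F1
# threshold; a value in [0, 1] evaluates at that fixed threshold instead.
def _demo_evaluate_metrics():
    labels = [0, 0, 1, 1, 1, 0]
    scores = [0.1, 0.4, 0.35, 0.8, 0.7, 0.2]
    print(evaluate_metrics(labels, scores)['threshold'])  # searched
    print(evaluate_metrics(labels, scores, threshold_param=0.5)['F1'])
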
def load_datapath(path, include_adversarial=False, dataset_name='all', attack_type='all'):
    """Collect train/valid/test file paths under path/<dataset>/<attack_dir>/."""
    data_path = {'train': [], 'valid': [], 'test': []}
    if dataset_name == 'all':
        datasets = os.listdir(path)
    elif dataset_name == 'M4':
        datasets = ['M4_monolingual', 'M4_multilingual']
    elif dataset_name == 'RAID_all':
        datasets = ['RAID', 'RAID_extra']
    else:
        datasets = [dataset_name]
    for dataset in datasets:
        dataset_path = os.path.join(path, dataset)
        if attack_type != 'all':
            dataset_path_list = [pth for pth in os.listdir(dataset_path) if attack_type in pth]
        else:
            dataset_path_list = os.listdir(dataset_path)
        for adv in dataset_path_list:
            # Skip adversarial subsets unless explicitly requested.
            if not include_adversarial and 'no_attack' not in adv:
                continue
            adv_path = os.path.join(dataset_path, adv)
            for data in os.listdir(adv_path):
                if 'train.' in data:
                    data_path['train'].append(os.path.join(adv_path, data))
                elif 'test.' in data:
                    data_path['test'].append(os.path.join(adv_path, data))
                elif 'valid.' in data:
                    data_path['valid'].append(os.path.join(adv_path, data))
    return data_path
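
# Usage sketch, assuming (as the loops above imply) a directory layout like
#   <path>/<dataset>/<attack_dir>/{train,valid,test}.<ext>
# where non-adversarial splits live in directories named with 'no_attack'.
def _demo_load_datapath():
    paths = load_datapath('data/', include_adversarial=False,
                          dataset_name='RAID', attack_type='all')
    print(len(paths['train']), len(paths['valid']), len(paths['test']))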