import hashlib
import os
import pickle

import numpy as np
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score,
                             precision_recall_curve, auc, roc_auc_score, roc_curve)

def stable_long_hash(input_string):
    # Deterministic 63-bit integer hash of a string (stable across processes,
    # unlike the built-in hash()).
    hash_object = hashlib.sha256(input_string.encode())
    hex_digest = hash_object.hexdigest()
    int_hash = int(hex_digest, 16)
    long_long_hash = int_hash & ((1 << 63) - 1)
    return long_long_hash

def load_pkl(path):
    with open(path, 'rb') as f:
        return pickle.load(f)


def save_pkl(obj, path):
    with open(path, 'wb') as f:
        pickle.dump(obj, f)



def find_top_n(embeddings, n, index, data):
    # Accept a single embedding vector or a batch of embeddings.
    if len(embeddings.shape) == 1:
        embeddings = embeddings.reshape(1, -1)
    top_ids_and_scores = index.search_knn(embeddings, n)
    data_ans = []
    for ids, scores in top_ids_and_scores:
        # Map each retrieved id back to its (text, label, source-style) triple in `data`.
        data_now = [(data[0][int(id_)], data[1][int(id_)], data[2][int(id_)]) for id_ in ids]
        data_ans.append(data_now)
    return data_ans
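
# Illustrative usage of find_top_n (the names below are assumptions, not part of this
# module): `index` must expose search_knn(embeddings, n) returning a (ids, scores) pair
# per query, and `data` is a tuple of three parallel lists indexed by those ids.
#   query_emb = np.random.rand(768).astype(np.float32)            # single query vector
#   neighbours = find_top_n(query_emb, 5, index, (texts, labels, sources))
#   # neighbours[0] -> list of 5 (text, label, source) tuples for the query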


    

def print_line(class_name, metrics, is_header=False):
    if is_header:
        line = f"| {'Class':<10} | " + " | ".join([f"{metric:<10}" for metric in metrics])
    else:
        line = f"| {class_name:<10} | " + " | ".join([f"{metrics[metric]:<10.3f}" for metric in metrics])
    print(line)
    if is_header:
        print('-' * len(line))

def calculate_per_class_metrics(classes, ground_truth, predictions):
    # Convert ground truth and predictions to numeric format
    gt_numeric = np.array([int(gt) for gt in ground_truth])
    pred_numeric = np.array([int(pred) for pred in predictions])

    results = {}
    for i, class_name in enumerate(classes):
        # For each class, calculate the 'vs rest' binary labels
        gt_binary = (gt_numeric == i).astype(int)
        pred_binary = (pred_numeric == i).astype(int)

        # Calculate metrics, handling cases where a class is not present in predictions or ground truth
        precision = precision_score(gt_binary, pred_binary, zero_division=0)
        recall = recall_score(gt_binary, pred_binary, zero_division=0)
        f1 = f1_score(gt_binary, pred_binary, zero_division=0)
        acc = np.mean(gt_binary == pred_binary)
        # Calculate recall for all other classes as 'rest'
        rest_recall = recall_score(1 - gt_binary, 1 - pred_binary, zero_division=0)

        results[class_name] = {
            'Precision': precision,
            'Recall': recall,
            'F1 Score': f1,
            'Accuracy': acc,
            'Avg Recall (with rest)': (recall + rest_recall) / 2
        }

    print_line("Metric", results[classes[0]], is_header=True)
    for class_name, metrics in results.items():
        print_line(class_name, metrics)
    overall_metrics = {metric_name: np.mean([metrics[metric_name] for metrics in results.values()]) for metric_name in results[classes[0]].keys()}
    print_line("Overall", overall_metrics)

def calculate_metrics(labels, preds):
    acc = accuracy_score(labels, preds)
    precision = precision_score(labels, preds, average='macro')
    recall = recall_score(labels, preds, average='macro')
    f1 = f1_score(labels, preds, average='macro')
    return acc, precision, recall, f1

def compute_three_recalls(labels, preds):
    # Label convention (from the variable names below): '0' = machine-generated,
    # '1' = human-written. A None prediction counts as incorrect for both classes.
    all_n, all_p, tn, tp = 0, 0, 0, 0
    for label, pred in zip(labels, preds):
        if label == '0':
            all_p += 1
        if label == '1':
            all_n += 1
        if pred is not None and label == pred == '0':
            tp += 1
        if pred is not None and label == pred == '1':
            tn += 1
    machine_rec = tp * 100 / all_p if all_p != 0 else 0
    human_rec = tn * 100 / all_n if all_n != 0 else 0
    avg_rec = (human_rec + machine_rec) / 2
    return (human_rec, machine_rec, avg_rec)
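
# Illustrative call: labels and predictions are the strings '0' (machine) and '1' (human);
# a None prediction simply counts against the recall of its true class.
#   compute_three_recalls(['0', '0', '1', '1'], ['0', None, '1', '0'])
#   # -> (50.0, 50.0, 50.0): human recall, machine recall and their average, in percent.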


def compute_metrics(labels, preds, ids=None):
    # A None prediction is treated as incorrect by compute_three_recalls; uncomment the
    # line below to map None to '0' instead before scoring.
    # preds = ['0' if pred is None else pred for pred in preds]
    if ids is not None:
        # Deduplicate labels and predictions for repeated ids (the last occurrence wins).
        dict_labels, dict_preds = {}, {}
        for i in range(len(ids)):
            dict_labels[ids[i]] = labels[i]
            dict_preds[ids[i]] = preds[i]
        labels = list(dict_labels.values())
        preds = list(dict_preds.values())

    human_rec, machine_rec, avg_rec = compute_three_recalls(labels, preds)
    acc = accuracy_score(labels, preds)
    precision = precision_score(labels, preds, pos_label='1')
    recall = recall_score(labels, preds, pos_label='1')
    f1 = f1_score(labels, preds, pos_label='1')
    return (human_rec, machine_rec, avg_rec, acc, precision, recall, f1)
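
# Illustrative call: when `ids` is supplied, repeated ids are collapsed so each example is
# scored once (the last occurrence of an id wins); pos_label='1' scores the human class as
# positive for precision/recall/F1.
#   compute_metrics(['0', '1', '1'], ['0', '1', '1'], ids=['a', 'b', 'c'])
#   # -> (human_rec, machine_rec, avg_rec, acc, precision, recall, f1)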

def evaluate_max_f1_metrics(test_labels, y_score):
    test_labels = np.array(test_labels)
    y_score = np.array(y_score)

    auroc = roc_auc_score(test_labels, y_score)
    precision, recall, thresholds = precision_recall_curve(test_labels, y_score, pos_label=1)
    pr_auc = auc(recall, precision)
    epsilon = 1e-6
    f1_scores = 2 * precision * recall / (precision + recall+epsilon)
    best_index = f1_scores.argmax()
    best_f1 = f1_scores[best_index]
    best_precision = precision[best_index]
    best_recall = recall[best_index]
    
    threshold = thresholds[best_index] if best_index < len(thresholds) else 1.0
    y_pred_max_f1 = (y_score >= threshold).astype(int)
    
    acc = (y_pred_max_f1 == test_labels).mean()
    tp = sum((y_pred_max_f1 == 1) & (test_labels == 1))
    fn = sum((y_pred_max_f1 == 0) & (test_labels == 1))
    fp = sum((y_pred_max_f1 == 1) & (test_labels == 0))
    tn = sum((y_pred_max_f1 == 0) & (test_labels == 0))

    pos_recall = tp / (tp + fn + epsilon)  # recall for the positive class
    neg_recall = tn / (tn + fp + epsilon)  # recall for the negative class
    avg_recall = (pos_recall + neg_recall) / 2  # average recall across classes
    
    metric = {'auroc': auroc, 'pr_auc': pr_auc, 'F1': best_f1, 'Precision': best_precision,
              'Recall': best_recall, 'threshold': threshold, 'acc': acc, 'avg_recall': avg_recall,
              'pos_recall': pos_recall, 'neg_recall': neg_recall}
    return metric

def evaluate_metrics(test_labels, y_score, threshold_param=-1, target_fpr=0.05):
    if isinstance(test_labels, list):
        test_labels = np.array(test_labels)
    if isinstance(y_score, list):
        y_score = np.array(y_score)

    if threshold_param != -1:
        if not (0 <= threshold_param <= 1):
            raise ValueError("Threshold must be between 0 and 1.")

    auroc = roc_auc_score(test_labels, y_score)

    precision, recall, thresholds = precision_recall_curve(test_labels, y_score, pos_label=1)
    pr_auc = auc(recall, precision)

    epsilon = 1e-6
    f1_scores = 2 * precision * recall / (precision + recall + epsilon)
    

    if threshold_param == -1:
        best_index = f1_scores.argmax()
        F1 = f1_scores[best_index]
        Precision = precision[best_index]
        Recall = recall[best_index]
        threshold = thresholds[best_index] if best_index < len(thresholds) else 1.0
    else:
        threshold = threshold_param
        # Use the first PR-curve operating point whose threshold is at or above the requested
        # one; fall back to the last point if the requested threshold exceeds every score.
        candidates = np.where(thresholds >= threshold)[0]
        index = candidates[0] if len(candidates) > 0 else len(thresholds) - 1
        Precision = precision[index]
        Recall = recall[index]
        F1 = f1_scores[index]


    y_pred = (y_score >= threshold).astype(int)
    acc = (y_pred == test_labels).mean()
    
    tp = ((y_pred == 1) & (test_labels == 1)).sum()
    fn = ((y_pred == 0) & (test_labels == 1)).sum()
    fp = ((y_pred == 1) & (test_labels == 0)).sum()
    tn = ((y_pred == 0) & (test_labels == 0)).sum()

    pos_recall = tp / (tp + fn + epsilon)  # TPR
    neg_recall = tn / (tn + fp + epsilon)  # TNR
    avg_recall = (pos_recall + neg_recall) / 2

    # TPR at (approximately) the target FPR, read off the ROC curve.
    fpr, tpr, thds = roc_curve(test_labels, y_score)
    if len(fpr) > 0 and len(tpr) > 0:
        idx = np.argmin(np.abs(fpr - target_fpr))
        tpr_at_fpr = tpr[idx]
        tpr_at_fpr_threshold = thds[idx]
    else:
        tpr_at_fpr = 0.0
        tpr_at_fpr_threshold = 1.0

    metric = {'auroc': auroc, 'pr_auc': pr_auc, 'F1': F1, 'Precision': Precision, 'Recall': Recall,
              'threshold': threshold, 'acc': acc, 'avg_recall': avg_recall, 'pos_recall': pos_recall,
              'neg_recall': neg_recall, 'tpr_at_fpr': tpr_at_fpr, 'tpr_at_fpr_threshold': tpr_at_fpr_threshold}

    return metric
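
# Illustrative call (scores are made up): with the default threshold_param=-1 the threshold
# that maximises F1 on the precision-recall curve is used; pass a value in [0, 1] to score
# at a fixed threshold instead.
#   m = evaluate_metrics([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
#   m['auroc'], m['F1'], m['threshold'], m['tpr_at_fpr']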


def load_datapath(path, include_adversarial=False, dataset_name='all', attack_type='all'):
    # Expected layout: <path>/<dataset>/<attack_dir>/{train,valid,test}.* where
    # non-adversarial attack directories contain 'no_attack' in their name.
    data_path = {'train': [], 'valid': [], 'test': []}
    if dataset_name == 'all':
        datasets = os.listdir(path)
    elif dataset_name == 'M4':
        datasets = ['M4_monolingual', 'M4_multilingual']
    elif dataset_name == 'RAID_all':
        datasets = ['RAID', 'RAID_extra']
    else:
        datasets = [dataset_name]
    for dataset in datasets:
        dataset_path = os.path.join(path, dataset)
        if attack_type != 'all':
            dataset_path_list = [pth for pth in os.listdir(dataset_path) if attack_type in pth]
        else:
            dataset_path_list = os.listdir(dataset_path)
        for adv in dataset_path_list:
            # Skip adversarial splits unless explicitly requested.
            if not include_adversarial and 'no_attack' not in adv:
                continue
            adv_path = os.path.join(dataset_path, adv)
            for data in os.listdir(adv_path):
                if 'train.' in data:
                    data_path['train'].append(os.path.join(adv_path, data))
                elif 'test.' in data:
                    data_path['test'].append(os.path.join(adv_path, data))
                elif 'valid.' in data:
                    data_path['valid'].append(os.path.join(adv_path, data))
    return data_path
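
# Illustrative directory layout assumed by load_datapath (file names below are examples;
# any extension works since matching is on the 'train.' / 'valid.' / 'test.' prefixes):
#   <path>/RAID/no_attack/train.jsonl
#   <path>/RAID/paraphrase_attack/test.jsonl
#   paths = load_datapath('/data/detection', include_adversarial=False, dataset_name='RAID')
#   paths['train'], paths['valid'], paths['test']   # lists of split file paths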