import argparse
import numpy as np
from sklearn.metrics import matthews_corrcoef, confusion_matrix, f1_score

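# Example invocations (the script name and file paths below are placeholders,
# not taken from the original repository; only the flags come from the parser
# defined in main()):
#   python compute_result.py --task scan --pred_path pred_results.npy \
#       --label_path labels.npy --metric max --slide 3 --bound 0.5
#   python compute_result.py --task 690 --pred_path 690_results.txt \
#       --index auc --num_results 10 --num_worst 10
#   python compute_result.py --task mouse --pred_path mouse_results.txt \
#       --index acc --num_worst 10
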
def generate_pred(predict_results, i, slide, metric="max"):
    """Aggregate the `slide` window-level scores of example `i` into one score."""
    results = predict_results[i * slide:(i + 1) * slide]

    if metric == "max":
        pred = max(results)
    elif metric == "mean":
        pred = np.mean(results)
    elif metric == "second-max":
        pred = np.sort(results)[-2]
    else:
        raise ValueError("Unknown metric: %s" % metric)

    return pred

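# Worked example for generate_pred with hypothetical scores: given slide=3 and
# predict_results = [0.2, 0.9, 0.4, 0.1, 0.3, 0.2], example i=0 covers
# [0.2, 0.9, 0.4], so "max" yields 0.9, "mean" yields 0.5 and "second-max"
# yields 0.4; example i=1 covers [0.1, 0.3, 0.2].
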
def Compute_scan(args):
    # Window-level scores and per-example labels saved as .npy arrays.
    predict_results = np.load(args.pred_path)
    labels = np.load(args.label_path)
    labels = list(labels.astype(int))

    # Aggregate each example's windows and threshold the score at args.bound.
    results = []
    for i in range(len(labels)):
        pred = generate_pred(predict_results, i, args.slide, args.metric)
        if pred >= args.bound:
            results.append(1)
        else:
            results.append(0)

    f1 = f1_score(y_true=labels, y_pred=results)
    mcc = matthews_corrcoef(labels, results)
    tn, fp, fn, tp = confusion_matrix(labels, results).ravel()

    count = 0
    for i in range(len(results)):
        if results[i] == labels[i]:
            count += 1

    print("number of examples: " + str(len(labels)))
    print("number of positive examples: " + str(sum(labels)))
    print("number of negative examples: " + str(len(labels) - sum(labels)))
    print("f1: " + str(f1))
    print("mcc: " + str(mcc))
    print("accuracy: " + str(float(count) / len(results)))
    print("tn: " + str(tn))
    print("fp: " + str(fp))
    print("fn: " + str(fn))
    print("tp: " + str(tp))

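# A minimal programmatic sketch of the "scan" task (paths are hypothetical),
# equivalent to calling the CLI with the corresponding flags:
#   from argparse import Namespace
#   Compute_scan(Namespace(pred_path="preds.npy", label_path="labels.npy",
#                          slide=3, metric="max", bound=0.5))
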
def Compute_mouse(args):
    # Each line is expected to hold: <task_name> <acc> <auc> <aupr> <f1> <mcc> <precision> <recall>.
    with open(args.pred_path, "r") as result_file:
        results = result_file.readlines()
    print(len(results))

    # Group consecutive lines that belong to the same task.
    all_preds = []
    current_preds = []
    for result in results:
        scores = result.split()
        scores = [scores[0]] + [float(s) for s in scores[1:8]]
        if current_preds == [] or scores[0] == current_preds[0][0]:
            current_preds.append(scores)
        else:
            all_preds.append(current_preds)
            current_preds = [scores]
    all_preds.append(current_preds)

    print("Number of tasks: %d" % len(all_preds))

    def get_acc(val):
        return val[1]

    def get_auc(val):
        return val[2]

    tasks = []
    acc = []
    auc = []
    aupr = []
    f1 = []
    mcc = []
    precision = []
    recall = []

    for pred in all_preds:
        if len(pred) < 10:
            print("Short %s : %d" % (pred[0][0], len(pred)))

        # Sort the task's runs by the chosen index, best last.
        if args.index == "acc":
            pred.sort(key=get_acc)
        elif args.index == "auc":
            pred.sort(key=get_auc)
        else:
            raise ValueError("Unknown index: %s" % args.index)

        # Among runs tied with the best primary value, prefer one with higher auc;
        # if none qualifies, BEST stays -1 and the last (best-sorted) run is used.
        BEST = -1
        for i in range(len(pred)):
            if pred[i][1] == pred[-1][1] and pred[i][2] > pred[-1][2]:
                BEST = i
        tasks.append(pred[0][0])

        best_pred = pred[BEST]
        acc.append(best_pred[1])
        auc.append(best_pred[2])
        aupr.append(best_pred[3])
        f1.append(best_pred[4])
        mcc.append(best_pred[5])
        precision.append(best_pred[6])
        recall.append(best_pred[7])

    acc_ave = np.mean(acc)
    auc_ave = np.mean(auc)
    aupr_ave = np.mean(aupr)
    f1_ave = np.mean(f1)
    mcc_ave = np.mean(mcc)
    precision_ave = np.mean(precision)
    recall_ave = np.mean(recall)

    print("acc: " + str(acc_ave))
    print("auc: " + str(auc_ave))
    print("aupr: " + str(aupr_ave))
    print("f1: " + str(f1_ave))
    print("mcc: " + str(mcc_ave))
    print("precision: " + str(precision_ave))
    print("recall: " + str(recall_ave))

    # Report the tasks with the lowest auc.
    ranks = np.argsort(auc)[:args.num_worst]
    print("Top %d worst tasks: " % (args.num_worst))
    for i in ranks:
        print(tasks[i] + " %.3f %.3f" % (acc[i], auc[i]))

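# Input sketch for Compute_690, inferred from the parsing below (only columns
# 0, 1, 2, 4 and 5 are actually read; the meaning of the skipped column 3 is an
# assumption, possibly aupr). A hypothetical line:
#   task_A 0.912 0.955 0.948 0.901 0.823
# i.e. <task_name> <acc> <auc> <skipped> <f1> <mcc>, with args.num_results
# consecutive lines per task.
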
def Compute_690(args):
    with open(args.pred_path, "r") as result_file:
        results = result_file.readlines()

    preds = []
    for result in results:
        scores = result.split()
        # Keep task name, acc, auc, f1 and mcc; column 3 of each line is skipped.
        preds.append([scores[0], float(scores[1]), float(scores[2]), float(scores[4]), float(scores[5])])

    num_results = args.num_results

    num_example = len(preds) // num_results
    print("Num of tasks: %d" % num_example)

    def get_acc(val):
        return val[1]

    def get_auc(val):
        return val[2]

    def get_f1(val):
        return val[3]

    def get_mcc(val):
        return val[4]

    tasks = []
    acc = []
    auc = []
    f1 = []
    mcc = []

    for i in range(num_example):
        tasks.append(preds[i * num_results][0])

        # Keep the best of the task's num_results runs according to args.index.
        current_preds = preds[i * num_results:(i + 1) * num_results]
        if args.index == "acc":
            current_preds.sort(key=get_acc)
        elif args.index == "auc":
            current_preds.sort(key=get_auc)
        elif args.index == "f1":
            current_preds.sort(key=get_f1)
        elif args.index == "mcc":
            current_preds.sort(key=get_mcc)
        else:
            raise ValueError("Unknown index: %s" % args.index)
        best_pred = current_preds[-1]
        acc.append(best_pred[1])
        auc.append(best_pred[2])
        f1.append(best_pred[3])
        mcc.append(best_pred[4])

    acc_ave = np.mean(acc)
    auc_ave = np.mean(auc)
    f1_ave = np.mean(f1)
    mcc_ave = np.mean(mcc)

    print("acc: " + str(acc_ave))
    print("auc: " + str(auc_ave))
    print("f1: " + str(f1_ave))
    print("mcc: " + str(mcc_ave))

    # Report the tasks with the lowest auc.
    ranks = np.argsort(auc)[:args.num_worst]
    print("Top %d worst tasks: " % (args.num_worst))
    for i in ranks:
        print(tasks[i] + " %.3f %.3f" % (acc[i], auc[i]))

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--bound",
        default=0.5,
        type=float,
        help="Decision threshold above which a prediction counts as positive (scan)",
    )
    parser.add_argument(
        "--pred_path",
        default=None,
        type=str,
        help="The path of the predicted result",
    )
    parser.add_argument(
        "--label_path",
        default=None,
        type=str,
        help="The path of the label",
    )
    parser.add_argument(
        "--metric",
        default="max",
        type=str,
        help="The metric for aggregating the predicted result (scan)",
    )
    parser.add_argument(
        "--slide",
        default=3,
        type=int,
        help="How many 500s to use for the predicted result of 1000 (scan)",
    )
    parser.add_argument(
        "--task",
        default="scan",
        type=str,
        help="Which task to compute the result for",
    )
    parser.add_argument(
        "--index",
        default="acc",
        type=str,
        help="Which index to sort results by (690/mouse)",
    )
    parser.add_argument(
        "--num_results",
        default=10,
        type=int,
        help="Number of results for each task (690)",
    )
    parser.add_argument(
        "--num_worst",
        default=10,
        type=int,
        help="Number of worst tasks to print out (690/mouse)",
    )

    args = parser.parse_args()

    if args.task == "scan":
        Compute_scan(args)
    elif args.task == "690":
        Compute_690(args)
    elif args.task == "mouse":
        Compute_mouse(args)
    else:
        raise ValueError("Unknown task: %s" % args.task)


if __name__ == "__main__":
    main()