MVP / mvp /utils /eval.py
yzhouchen001's picture
model code
d9df210
from MassSpecGym.massspecgym.utils import MyopicMCES
import numpy as np
import tqdm
from multiprocessing import Pool
import os
import pandas as pd
class Compute_Myopic_MCES:
mces_compute = MyopicMCES()
def compute_mces(tar_cand):
target, cand = tar_cand
dist = Compute_Myopic_MCES.mces_compute(target, cand)
return (tar_cand, dist)
def compute_mces_parallel(target_cand_list, n_processes=25):
with Pool(processes=n_processes) as pool:
results = list(tqdm.tqdm(pool.imap(Compute_Myopic_MCES.compute_mces, target_cand_list), total=len(target_cand_list)))
return results
class Compute_Myopic_MCES_timeout:
mces_compute = MyopicMCES()
@staticmethod
def compute_mces(tar_cand):
target, cand = tar_cand
dist = Compute_Myopic_MCES.mces_compute(target, cand)
return (tar_cand, dist)
@staticmethod
def compute_mces_parallel(target_cand_list, n_processes=35, timeout=60): # timeout in seconds
results = []
with Pool(processes=n_processes) as pool:
async_results = [
pool.apply_async(Compute_Myopic_MCES.compute_mces, args=(tar_cand,))
for tar_cand in target_cand_list
]
for async_res in tqdm.tqdm(async_results, total=len(target_cand_list)):
try:
result = async_res.get(timeout=timeout)
except Exception as e:
# You can log the error or return a default value
result = (None, f"Timeout or error")
results.append(result)
return results
def get_result_files(exp_dir, spec_type, views_type):
files = os.listdir(exp_dir)
mass_result = ''
form_result = ''
for f in files:
try:
_, s, views = f.split('_')
except:
continue
if s == spec_type and views == views_type:
print(exp_dir / f)
files = os.listdir(exp_dir / f)
for fr in files:
if 'mass_result' in fr:
mass_result = exp_dir / f / fr
elif 'result' in fr:
form_result = exp_dir / f/ fr
return mass_result, form_result
# get target
def get_target(candidates, labels):
return np.array(candidates)[labels][0]
# get mol rank at 1
def get_top_cand(candidates, scores):
return candidates[np.argmax(scores)]
# split into hit rates
def convert_rank_to_hit_rates(row, rank_col ,top_k=[1,5,20]):
top_k_hits ={}
rank = row[rank_col]
for k in top_k:
if rank <= k:
top_k_hits[f'{rank_col}-hit_rate@{k}'] = 1
else:
top_k_hits[f'{rank_col}-hit_rate@{k}'] = 0
return pd.Series(top_k_hits)
#################### Rank aggregation #######################
from collections import defaultdict
import numpy as np
from scipy.stats import rankdata
def borda_count(candidates, score_lists, target):
scores = defaultdict(int)
N = len(candidates)
for score_list in score_lists:
ranked_list = sorted(zip(candidates, score_list), key=lambda x: x[1], reverse=True)
for rank, (mol, _) in enumerate(ranked_list, start=1):
scores[mol] += N - rank + 1
ranked_candidates = [mol for mol, _ in sorted(scores.items(), key=lambda x: x[1], reverse=True)]
return ranked_candidates.index(target) + 1 if target in ranked_candidates else None
def average_rank(candidates, score_lists, target):
rank_sums = defaultdict(list)
for score_list in score_lists:
ranked_list = sorted(zip(candidates, score_list), key=lambda x: x[1], reverse=True)
for rank, (mol, _) in enumerate(ranked_list, start=1):
rank_sums[mol].append(rank)
avg_ranks = {mol: np.mean(ranks) for mol, ranks in rank_sums.items()}
ranked_candidates = [mol for mol, _ in sorted(avg_ranks.items(), key=lambda x: x[1])]
return ranked_candidates.index(target) + 1 if target in ranked_candidates else None
def reciprocal_rank_aggregation(candidates, score_lists, target):
scores = defaultdict(float)
for score_list in score_lists:
ranked_list = sorted(zip(candidates, score_list), key=lambda x: x[1], reverse=True)
for rank, (mol, _) in enumerate(ranked_list, start=1):
scores[mol] += 1 / rank
ranked_candidates = [mol for mol, _ in sorted(scores.items(), key=lambda x: x[1], reverse=True)]
return ranked_candidates.index(target) + 1 if target in ranked_candidates else None
def weighted_voting(candidates, score_lists, weights, target):
scores = defaultdict(float)
for weight, score_list in zip(weights, score_lists):
ranked_list = sorted(zip(candidates, score_list), key=lambda x: x[1], reverse=True)
for rank, (mol, _) in enumerate(ranked_list, start=1):
scores[mol] += weight / rank
ranked_candidates = [mol for mol, _ in sorted(scores.items(), key=lambda x: x[1], reverse=True)]
return ranked_candidates.index(target) + 1 if target in ranked_candidates else None
def median_rank(candidates, score_lists, target):
rank_sums = defaultdict(list)
for score_list in score_lists:
ranked_list = sorted(zip(candidates, score_list), key=lambda x: x[1], reverse=True)
for rank, (mol, _) in enumerate(ranked_list, start=1):
rank_sums[mol].append(rank)
median_ranks = {mol: np.median(ranks) for mol, ranks in rank_sums.items()}
ranked_candidates = [mol for mol, _ in sorted(median_ranks.items(), key=lambda x: x[1])]
return ranked_candidates.index(target) + 1 if target in ranked_candidates else None
def score_based_aggregation(candidates, score_lists, target):
scores = defaultdict(list)
for score_list in score_lists:
for mol, score in zip(candidates, score_list):
scores[mol].append(score)
avg_scores = {mol: np.mean(vals) for mol, vals in scores.items()}
ranked_candidates = [mol for mol, _ in sorted(avg_scores.items(), key=lambda x: x[1], reverse=True)]
return ranked_candidates.index(target) + 1 if target in ranked_candidates else None