| """ | |
| Script for an iterative scheme. | |
| Assumptions: | |
| - complete pariwise comparisons available, i.e. evaluations are cheap | |
| - | |
| """ | |
import pandas as pd
import numpy as np
from tqdm import tqdm
from selfrank.algos.metrics import mapk, rank_biased_overlap
from selfrank.algos.plots import plot_ranks
import logging
from typing import List, Callable, Optional
import random

logger = logging.getLogger(__name__)

tol = 0.001


class LLM_Model:
    """Lightweight wrapper that gives a model a name and a stable ordering."""

    def __init__(self, model_name, all_model_data):
        self.model_name = model_name
        # all_model_data is currently unused; the parameter is kept for interface compatibility

    def name(self):
        return self.model_name

    def __eq__(self, other):
        return self.name() == other.name()

    def __lt__(self, other):
        return self.name() < other.name()


class SelfRankGreedy:
    def __init__(self, MODELS: List, evaluator: Callable, true_ranking: Optional[List] = None, show_progress: Optional[bool] = False):
        self.MODELS = MODELS
        self.N = len(MODELS)
        self.evaluate = evaluator
        self.true_ranking = true_ranking
        self.show_progress = show_progress
        self.df = None
        self.DEBUG = False
        self.model_eval = None
        self.cnt = 0

    def getEvaluation(self, a, b, c, df, eval_arr, modelsList):
        '''
        Model c evaluates a against b.
        Checks whether the comparison is already cached in eval_arr;
        if not, evaluates it and stores the result.
        '''
        idx_a = modelsList.index(a)
        idx_b = modelsList.index(b)
        idx_c = modelsList.index(c)
        val = eval_arr[idx_c, idx_a, idx_b]  # stores c's evaluation of a vs. b
        if val > -1:
            return val
        else:
            val = self.evaluate(a, b, c, df)
            eval_arr[idx_c, idx_a, idx_b] = val
            eval_arr[idx_c, idx_b, idx_a] = 1 - val
            return val
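
    # A worked example of the caching convention (model names illustrative): if
    # judge "m3" scores "m1" as winning 70% of instances against "m2", then
    # eval_arr[i3, i1, i2] == 0.7 and eval_arr[i3, i2, i1] == 0.3, so each
    # (judge, pair) combination is evaluated at most once.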

    def __evaluateModelTriplet(self, df, triplet, eval_arr, modelsList):
        model1 = triplet[0]
        model2 = triplet[1]
        model3 = triplet[2]
        res = np.array([0, 0, 0])  # win counts for model1, model2, model3
        m1_cmp_2_3 = self.getEvaluation(a=model2.name(), b=model3.name(), c=model1.name(), df=df, eval_arr=eval_arr, modelsList=modelsList)  # model1 compares model2 vs. model3
        m2_cmp_1_3 = self.getEvaluation(a=model1.name(), b=model3.name(), c=model2.name(), df=df, eval_arr=eval_arr, modelsList=modelsList)  # model2 compares model1 vs. model3
        m3_cmp_1_2 = self.getEvaluation(a=model1.name(), b=model2.name(), c=model3.name(), df=df, eval_arr=eval_arr, modelsList=modelsList)  # model3 compares model1 vs. model2
        if m1_cmp_2_3 >= 0.5:
            res[1] += 1
        else:
            res[2] += 1
        if m2_cmp_1_3 >= 0.5:
            res[0] += 1
        else:
            res[2] += 1
        if m3_cmp_1_2 >= 0.5:
            res[0] += 1
        else:
            res[1] += 1
        zipped_pairs = zip(res.tolist(), triplet)
        z = [(x, y, x.name()) for y, x in sorted(zipped_pairs, reverse=True)]  # (model, win count, name), best first
        return z
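
    # Worked example of the triplet vote: with triplet (A, B, C), if A judges
    # B > C, B judges A > C, and C judges A > B, the win counts are
    # res = [2, 1, 0], which sorts to the ranking [A, B, C].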

    def __printNames(self, ll):
        print([i.name() for i in ll])

    def __evaluateModels(self, df, evaluators, modelsToBeEvaluated, eval_arr, modelsList):
        # rewritten method to allow usage with the updated code
        # modelsToBeEvaluated can have 2 or 3 models only; evaluators has exactly 1 model.
        # Use the evaluator to rank the models in modelsToBeEvaluated and return them in order.
        if len(evaluators) > 1:
            raise Exception("Expected exactly one evaluator.")
        if len(modelsToBeEvaluated) > 3 or len(modelsToBeEvaluated) < 2:
            raise Exception("Expected 2 or 3 models to be evaluated.")
        if len(modelsToBeEvaluated) == 2:
            r = self.getEvaluation(a=modelsToBeEvaluated[0].name(), b=modelsToBeEvaluated[1].name(), c=evaluators[0].name(), df=df, eval_arr=eval_arr, modelsList=modelsList)
            if r >= 0.5:
                return [modelsToBeEvaluated[0], modelsToBeEvaluated[1]]
            else:
                return [modelsToBeEvaluated[1], modelsToBeEvaluated[0]]
        if len(modelsToBeEvaluated) == 3:
            r01 = self.getEvaluation(a=modelsToBeEvaluated[0].name(), b=modelsToBeEvaluated[1].name(), c=evaluators[0].name(), df=df, eval_arr=eval_arr, modelsList=modelsList)
            r12 = self.getEvaluation(a=modelsToBeEvaluated[1].name(), b=modelsToBeEvaluated[2].name(), c=evaluators[0].name(), df=df, eval_arr=eval_arr, modelsList=modelsList)
            r02 = self.getEvaluation(a=modelsToBeEvaluated[0].name(), b=modelsToBeEvaluated[2].name(), c=evaluators[0].name(), df=df, eval_arr=eval_arr, modelsList=modelsList)
            res = np.array([0, 0, 0])
            if r01 >= 0.5:
                res[0] += 1
            else:
                res[1] += 1
            if r12 >= 0.5:
                res[1] += 1
            else:
                res[2] += 1
            if r02 >= 0.5:
                res[0] += 1
            else:
                res[2] += 1
            zipped_pairs = zip(res.tolist(), modelsToBeEvaluated)
            z = [x for y, x in sorted(zipped_pairs, reverse=True)]
            return z

    def __rankModels(self, df, eval_arr, modelsList, triplet, prev_model_ranking, unrankedModelList, rankedModelList, bottomModelList):
        if len(triplet) < 3:
            return [], list(triplet), []
        self.cnt = self.cnt + 1
        model_ranking = self.__evaluateModelTriplet(df, triplet, eval_arr, modelsList)
        if self.DEBUG:
            print("Cnt: ", self.cnt)
            print("\n\n\nFIRST")
            self.__printNames(triplet)
            self.__printNames(unrankedModelList)
            self.__printNames(rankedModelList)
            self.__printNames(bottomModelList)
            print(model_ranking)
            print(prev_model_ranking)
            print("END FIRST")
        first_rank = model_ranking[0][1]
        second_rank = model_ranking[1][1]
        third_rank = model_ranking[2][1]
        if first_rank == 2:  # first model is better than the other two
            if len(unrankedModelList) == 0 and len(bottomModelList) == 0:  # CASE 1
                # no more unranked models left to consider and none in bottomModelList,
                # so add the models in rank order to rankedModelList
                if second_rank == 1 and third_rank == 0:
                    if self.DEBUG:
                        print('CASE 1a')
                    rankedModelList.extend([model_ranking[0][0], model_ranking[1][0], model_ranking[2][0]])
                elif second_rank == 0 and third_rank == 0:
                    if self.DEBUG:
                        print('CASE 1b')
                    rankedModelList.append(model_ranking[0][0])
                    # use the current best model to rank the bottom 2 and add them to rankedModelList in order
                    z = self.__evaluateModels(df, [rankedModelList[0]], [model_ranking[1][0], model_ranking[2][0]], eval_arr, modelsList)
                    rankedModelList.extend(z)
                else:
                    raise Exception("Error: Should not have occurred CASE 1")
                if self.DEBUG:
                    self.__printNames(rankedModelList)
                return [], rankedModelList, []
            if len(unrankedModelList) == 0 and len(bottomModelList) == 1:  # CASE 2
                # no more unranked models left to consider and only one model in bottomModelList
                if second_rank == 1 and third_rank == 0:
                    # so add the models in rank order to rankedModelList
                    if self.DEBUG:
                        print('CASE 2a')
                    rankedModelList.extend([model_ranking[0][0], model_ranking[1][0]])
                    # TODO: use the top model in rankedModelList to rank the two models below and add them according to that ranking
                    z = self.__evaluateModels(df, [rankedModelList[0]], [model_ranking[2][0], bottomModelList[0]], eval_arr, modelsList)
                    rankedModelList.extend(z)
                    if self.DEBUG:
                        self.__printNames(rankedModelList)
                    return [], rankedModelList, []
                elif second_rank == 0 and third_rank == 0:
                    if self.DEBUG:
                        print('CASE 2b')
                    rankedModelList.append(model_ranking[0][0])
                    modelsToCompare = [model_ranking[1][0], model_ranking[2][0], bottomModelList[0]]
                    if self.DEBUG:
                        self.__printNames(tuple(modelsToCompare))
                        self.__printNames(rankedModelList)
                    return self.__rankModels(df, eval_arr, modelsList, tuple(modelsToCompare), model_ranking, [], rankedModelList, [])
                else:
                    raise Exception("Error: Should not have occurred CASE 2")
            if len(unrankedModelList) == 0 and len(bottomModelList) > 1:  # CASE 3
                # no more unranked models left to consider, but there are at least 2 models in bottomModelList
                if second_rank == 1 and third_rank == 0:
                    if self.DEBUG:
                        print('CASE 3a')
                    rankedModelList.extend([model_ranking[0][0], model_ranking[1][0]])  # add top two models to ranked list
                    bottomModelList.append(model_ranking[2][0])  # add worst model to bottomModelList
                elif second_rank == 0 and third_rank == 0:
                    if self.DEBUG:
                        print('CASE 3b')
                    rankedModelList.append(model_ranking[0][0])  # add top model to ranked list
                    bottomModelList.extend([model_ranking[1][0], model_ranking[2][0]])  # add bottom two models to bottomModelList
                else:
                    raise Exception("Error: Should not have occurred CASE 3")
                modelsToCompare = random.sample(bottomModelList, 3)
                bottomModelList = [i for i in bottomModelList if i not in modelsToCompare]
                if self.DEBUG:
                    self.__printNames(tuple(modelsToCompare))
                    self.__printNames(bottomModelList)
                    self.__printNames(rankedModelList)
                    print([])
                return self.__rankModels(df, eval_arr, modelsList, tuple(modelsToCompare), model_ranking, bottomModelList, rankedModelList, [])
            # CASE 4: len(unrankedModelList) > 0
            # Check the previous ranking against the current one: if the model ranked first or
            # second previously is now ranked last, move all bottom models back to unranked and
            # recurse with a new triplet.
            # if (prev_model_ranking is not None) and ((prev_model_ranking[0][0] == model_ranking[2][0]) or (prev_model_ranking[1][0] == model_ranking[2][0])):
            #     unrankedModelList.extend(bottomModelList)
            #     if self.DEBUG:
            #         print('Case 4a NEW ONE')
            #         self.__printNames(triplet)
            #         self.__printNames(unrankedModelList)
            #         self.__printNames(rankedModelList)
            #         self.__printNames([])
            #     return self.__rankModels(df, eval_arr, modelsList, triplet, None, unrankedModelList, rankedModelList, [])
            if second_rank == 1 and third_rank == 0:
                if self.DEBUG:
                    print('CASE 4a')
                bottomModelList.append(model_ranking[2][0])  # add worst model to bottomModelList
                newModel = random.sample(unrankedModelList, 1)
                unrankedModelList.remove(newModel[0])
                triplet = (model_ranking[0][0], model_ranking[1][0], newModel[0])
                if self.DEBUG:
                    self.__printNames(triplet)
                    self.__printNames(unrankedModelList)
                    self.__printNames(rankedModelList)
                    self.__printNames(bottomModelList)
                return self.__rankModels(df, eval_arr, modelsList, triplet, model_ranking, unrankedModelList, rankedModelList, bottomModelList)
            elif second_rank == 0 and third_rank == 0:
                # if unrankedModelList has 2 or more elements, put both the 2nd and 3rd models
                # into bottom; if unrankedModelList has only one, then randomly choose one of
                # the two and put it in bottom
                if len(unrankedModelList) > 1:
                    if self.DEBUG:
                        print('CASE 4b')
                    bottomModelList.append(model_ranking[2][0])
                    bottomModelList.append(model_ranking[1][0])
                    newModels = random.sample(unrankedModelList, 2)
                    triplet = (model_ranking[0][0],) + tuple(newModels)
                    unrankedModelList.remove(newModels[0])
                    unrankedModelList.remove(newModels[1])
                    if self.DEBUG:
                        self.__printNames(triplet)
                        self.__printNames(unrankedModelList)
                        self.__printNames(rankedModelList)
                        self.__printNames(bottomModelList)
                    return self.__rankModels(df, eval_arr, modelsList, triplet, model_ranking, unrankedModelList, rankedModelList, bottomModelList)
                else:
                    if self.DEBUG:
                        print('CASE 4c')
                    # unrankedModelList has exactly one element and the scores are (2, 0, 0):
                    # add both 0-scored models to bottom, build a triplet from the winner, the
                    # one model from unranked, and one sampled from bottom, then recurse with
                    # bottom as the new unranked list
                    newModel = random.sample(unrankedModelList, 1)
                    unrankedModelList.remove(newModel[0])
                    bottomModelList.append(model_ranking[2][0])  # add third model to bottomModelList
                    bottomModelList.append(model_ranking[1][0])  # add second model to bottomModelList
                    newBottomModel = random.sample(bottomModelList, 1)
                    bottomModelList.remove(newBottomModel[0])
                    triplet = (model_ranking[0][0], newModel[0], newBottomModel[0])
                    if self.DEBUG:
                        self.__printNames(triplet)
                        self.__printNames(unrankedModelList)
                        self.__printNames(rankedModelList)
                        self.__printNames(bottomModelList)
                    return self.__rankModels(df, eval_arr, modelsList, triplet, model_ranking, bottomModelList, rankedModelList, [])
            else:
                raise Exception("Error: Should not have occurred CASE 4")
        else:
            # the three models could not be fully ordered (tied scores)
            if len(unrankedModelList) == 0 and len(bottomModelList) == 0:  # CASE 1
                # use the top model from rankedModelList to rank the three and append to the ranked list in order
                if self.DEBUG:
                    print('CASE ELSE_1')
                z = self.__evaluateModels(df, [rankedModelList[0]], list(triplet), eval_arr, modelsList)
                if self.DEBUG:
                    self.__printNames(z)
                rankedModelList.extend(z)
                if self.DEBUG:
                    self.__printNames(rankedModelList)
                return [], rankedModelList, []
            if len(unrankedModelList) == 0 and len(bottomModelList) == 1:  # CASE 2
                if self.DEBUG:
                    print('CASE ELSE_2')
                # use the top model from rankedModelList (if any) to rank the three and append to
                # the ranked list in order; then add the sole model from bottomModelList
                if len(rankedModelList) > 0:
                    z = self.__evaluateModels(df, [rankedModelList[0]], list(triplet), eval_arr, modelsList)
                else:
                    z = list(triplet)
                if self.DEBUG:
                    self.__printNames(z)
                rankedModelList.extend(z)
                rankedModelList.append(bottomModelList[0])
                if self.DEBUG:
                    self.__printNames(rankedModelList)
                return [], rankedModelList, []
            if len(unrankedModelList) == 0 and len(bottomModelList) > 1:  # CASE 3
                # scores here are a three-way tie (1, 1, 1)
                if self.DEBUG:
                    print('CASE ELSE_3')
                # use the top model from rankedModelList (if any) to rank the three and add the top 2 to the ranked list in order
                if len(rankedModelList) > 0:
                    z = self.__evaluateModels(df, [rankedModelList[0]], list(triplet), eval_arr, modelsList)
                else:
                    z = list(triplet)
                if self.DEBUG:
                    self.__printNames(z)
                rankedModelList.append(z[0])
                rankedModelList.append(z[1])
                bottomModelList.append(z[2])
                # sample 3 from bottom to create the next triplet and recurse with bottom as the new unranked list
                newModels = random.sample(bottomModelList, 3)
                for mod in newModels:
                    bottomModelList.remove(mod)
                if self.DEBUG:
                    self.__printNames(tuple(newModels))
                    self.__printNames(unrankedModelList)
                    self.__printNames(rankedModelList)
                    self.__printNames(bottomModelList)
                return self.__rankModels(df, eval_arr, modelsList, tuple(newModels), model_ranking, bottomModelList, rankedModelList, [])
            # CASE 4: len(unrankedModelList) > 0
            # the three models tied (1, 1, 1), i.e. they are indistinguishable
            # Check the previous ranking against the current one: if the model ranked first or
            # second previously is now ranked last, move all bottom models back to unranked and
            # recurse with a new triplet.
            # if (prev_model_ranking is not None) and ((prev_model_ranking[0][0] == model_ranking[2][0]) or (prev_model_ranking[1][0] == model_ranking[2][0])):
            #     unrankedModelList.extend(bottomModelList)
            #     if self.DEBUG:
            #         print('Case ELSE_4 NEW ONE')
            #         self.__printNames(triplet)
            #         self.__printNames(unrankedModelList)
            #         self.__printNames(rankedModelList)
            #         self.__printNames([])
            #     return self.__rankModels(df, eval_arr, modelsList, triplet, None, unrankedModelList, rankedModelList, [])
            # move one of the triplet models to bottomModelList, pull a random model from
            # unrankedModelList into the triplet, and rank again
            if first_rank == second_rank and first_rank == third_rank:
                if self.DEBUG:
                    print('CASE ELSE_4a')
                # use the top model from rankedModelList (if any) to rank the three and move the
                # third one to bottomModelList; then create a triplet from the top 2 and one from unranked
                if len(rankedModelList) > 0:
                    z = self.__evaluateModels(df, [rankedModelList[0]], list(triplet), eval_arr, modelsList)
                else:
                    z = list(triplet)
                if self.DEBUG:
                    print('z: ', z)
                    self.__printNames(z)
                bottomModelList.append(z[2])
                newModel = random.sample(unrankedModelList, 1)
                unrankedModelList.remove(newModel[0])
                triplet = (z[0], z[1], newModel[0])
                if self.DEBUG:
                    print('triplet:', triplet)
                    self.__printNames(triplet)
                    self.__printNames(unrankedModelList)
                    self.__printNames(rankedModelList)
                    self.__printNames(bottomModelList)
            else:  # one or two models scored 0
                # if only one model scored 0, add it to bottom and replace it with one from
                # unranked; if two scored 0, replace both with unranked models when unranked
                # has more than one, otherwise move both to bottom and rebuild the triplet
                if second_rank == 1:  # then only the third is 0
                    if self.DEBUG:
                        print('CASE ELSE_4b')
                    newModel = random.sample(unrankedModelList, 1)
                    unrankedModelList.remove(newModel[0])
                    bottomModelList.append(model_ranking[2][0])
                    triplet = (model_ranking[0][0], model_ranking[1][0], newModel[0])
                else:  # both second and third are zero
                    if len(unrankedModelList) > 1:
                        if self.DEBUG:
                            print('CASE ELSE_4c')
                        bottomModelList.append(model_ranking[2][0])
                        bottomModelList.append(model_ranking[1][0])
                        newModels = random.sample(unrankedModelList, 2)
                        triplet = (model_ranking[0][0],) + tuple(newModels)
                        unrankedModelList.remove(newModels[0])
                        unrankedModelList.remove(newModels[1])
                    else:
                        if self.DEBUG:
                            print('CASE ELSE_4d')
                        # unrankedModelList has exactly one element and the scores are (1, 0, 0):
                        # add both 0-scored models to bottom, create a triplet from the winner,
                        # the one model from unranked, and one sampled from bottom, then recurse
                        # with bottom as the new unranked list
                        bottomModelList.append(model_ranking[2][0])
                        bottomModelList.append(model_ranking[1][0])
                        newModels = random.sample(unrankedModelList, 1)
                        unrankedModelList.remove(newModels[0])
                        newBottomModels = random.sample(bottomModelList, 1)
                        bottomModelList.remove(newBottomModels[0])
                        triplet = (model_ranking[0][0], newModels[0], newBottomModels[0])
                        if self.DEBUG:
                            self.__printNames(triplet)
                            self.__printNames(unrankedModelList)
                            self.__printNames(rankedModelList)
                            self.__printNames(bottomModelList)
                        return self.__rankModels(df, eval_arr, modelsList, triplet, model_ranking, bottomModelList, rankedModelList, [])
        if self.DEBUG:
            self.__printNames(triplet)
            self.__printNames(unrankedModelList)
            self.__printNames(rankedModelList)
            self.__printNames(bottomModelList)
        return self.__rankModels(df, eval_arr, modelsList, triplet, model_ranking, unrankedModelList, rankedModelList, bottomModelList)

    def __printRanks(self, ll):
        print([{i.name(): r} for r, i in enumerate(ll)])

    def __estimate_rankings(self, df, numIter=1, modelSubset=None, numModels=None):
        rankedLists = []
        if modelSubset is not None:
            model_list = modelSubset
        elif numModels is not None:
            model_list = random.sample(self.MODELS.copy(), numModels)
        else:
            model_list = self.MODELS.copy()
        nModels = len(model_list)
        # cache of pairwise evaluations; -1.0 marks "not yet evaluated" (a float fill
        # value keeps fractional evaluation scores from being truncated to integers)
        self.model_eval = np.full((nModels, nModels, nModels), -1.0)
        for it in tqdm(range(numIter)):
            shuffled_list = model_list.copy()
            random.shuffle(shuffled_list)
            t = random.sample(shuffled_list, 3)
            u = [i for i in shuffled_list if i not in t]
            t = [LLM_Model(i, df) for i in t]
            u = [LLM_Model(i, df) for i in u]
            _, rankedList, _ = self.__rankModels(df, self.model_eval, model_list, tuple(t), None, u, [], [])
            rankedLists.append(rankedList)
        estimated_ranking_lists = []
        ranks = []
        for rl in rankedLists:
            estimated_ranking = {i.name(): r + 1 for r, i in enumerate(rl)}
            rank = [estimated_ranking[name] for name in model_list]
            estimated_ranking_lists.append(estimated_ranking)
            ranks.append(rank)
        average_estimated_scores = sorted(zip(np.mean(np.array(ranks), axis=0), model_list))
        average_estimated_ranking = [mod for rnk, mod in average_estimated_scores]
        return model_list, estimated_ranking_lists, average_estimated_ranking, average_estimated_scores
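
    # Illustrative sketch of the averaging step: with two iterations ranking three
    # models as [A, B, C] and [B, A, C], the mean ranks are A=1.5, B=1.5, C=3.0;
    # sorting those means (ties broken by model name) yields [A, B, C].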

    def fit(self, df: pd.DataFrame):
        """
        df: DataFrame where each row is a benchmark instance,
        and there is a column with the output of each model
        """
        assert set(self.MODELS) == set(df.columns), "Benchmark data models inconsistent with models to be ranked."
        self.df = df
        # Estimate the ranks
        _, _, average_estimated_ranking, _ = self.__estimate_rankings(self.df, numIter=1)
        self.ranking = average_estimated_ranking
        logger.info(f"Estimated 'greedy' ranks (best to worst): {self.ranking}")
        return self.ranking  # best to worst

    def measure(self, metric='rbo', k=5, p=0.95) -> float:
        """
        Report metrics comparing the estimated ranking to the true ranking.
        """
        if metric not in ['rbo', 'mapk']:
            raise ValueError(f"Metric {metric} not supported (use 'rbo'/'mapk').")
        if hasattr(self, 'ranking'):
            if self.true_ranking is not None:
                if metric == 'mapk':
                    if k > len(self.true_ranking):
                        logger.warning(f"MAPk metric is for k={len(self.true_ranking)}, and not k={k}.")
                    actual = [self.true_ranking[:k]]
                    pred = [self.ranking[:k]]
                    return mapk(actual, pred, k=k)
                elif metric == 'rbo':
                    return rank_biased_overlap(self.true_ranking, self.ranking, p=p)
                else:
                    raise ValueError(f"Metric {metric} not understood.")
            else:
                raise ValueError("True ranking not available for metric calculation.")
        else:
            raise ValueError("Ranking not estimated. Run 'fit' first.")

    def plot(self, caselabel="output"):
        if hasattr(self, 'ranking') and (self.true_ranking is not None):
            return plot_ranks(self.true_ranking, self.ranking, "actual", "estimated", caselabel)
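

# Minimal usage sketch (not part of the library API): exercises the class end to
# end on synthetic benchmark data. The model names and toy_evaluator below are
# illustrative assumptions, not the project's real evaluator: judge c scores a
# vs. b by the fraction of instances where c agrees with a, among rows where c
# agrees with exactly one of a and b.
if __name__ == "__main__":
    rng = random.Random(0)
    models = [f"m{i}" for i in range(6)]  # hypothetical model names
    # Synthetic benchmark: the correct answer is always 0, and model m_i answers
    # correctly with probability 0.9 - 0.1 * i, so the true ranking is m0 > ... > m5.
    data = {
        m: [0 if rng.random() < 0.9 - 0.1 * i else 1 for _ in range(300)]
        for i, m in enumerate(models)
    }
    bench = pd.DataFrame(data)

    def toy_evaluator(a: str, b: str, c: str, df: pd.DataFrame) -> float:
        """Fraction of instances where judge c agrees with a, among rows where
        c agrees with exactly one of a and b (0.5 if no such rows exist)."""
        a_only = ((df[a] == df[c]) & (df[b] != df[c])).sum()
        b_only = ((df[b] == df[c]) & (df[a] != df[c])).sum()
        decided = a_only + b_only
        return a_only / decided if decided > 0 else 0.5

    ranker = SelfRankGreedy(models, toy_evaluator, true_ranking=models)
    print("Estimated ranking:", ranker.fit(bench))
    print("RBO vs. true ranking:", ranker.measure(metric='rbo'))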