| """ |
| Common data structures and utilities. |
| |
| directory: FastChat/fastchat/llm_judge/common.py |
| """ |
|
|
| import ast |
| import dataclasses |
| import glob |
| import json |
| import os |
| import re |
| import time |
| from typing import Optional |
|
|
| import openai |
| import anthropic |
|
|
| from fastchat.model.model_adapter import ( |
| get_conversation_template, |
| ANTHROPIC_MODEL_LIST, |
| OPENAI_MODEL_LIST, |
| ) |
|
|
| |
| API_MAX_RETRY = 16 |
| API_RETRY_SLEEP = 10 |
| API_ERROR_OUTPUT = "$ERROR$" |
|
|
| TIE_DELTA = 0.1 |
|
|
| |
| NEED_REF_CATS = ["math", "reasoning", "coding", "arena-hard-200"] |
|
|
| |
| two_score_pattern = re.compile("\[\[(\d+\.?\d*),\s?(\d+\.?\d*)\]\]") |
| two_score_pattern_backup = re.compile("\[(\d+\.?\d*),\s?(\d+\.?\d*)\]") |
| one_score_pattern = re.compile("\[\[(\d+\.?\d*)\]\]") |
| one_score_pattern_backup = re.compile("\[(\d+\.?\d*)\]") |
|
|
| |
| temperature_config = { |
| "writing": 0.7, |
| "roleplay": 0.7, |
| "extraction": 0.0, |
| "math": 0.0, |
| "coding": 0.0, |
| "reasoning": 0.0, |
| "stem": 0.1, |
| "humanities": 0.1, |
| "arena-hard-200": 0.0, |
| } |
|
|
| reverse_model_map = { |
| "model_1": "model_2", |
| "model_2": "model_1", |
| } |
|
|
|
|
| @dataclasses.dataclass |
| class Judge: |
| model_name: str |
| prompt_template: dict |
| ref_based: bool = False |
| multi_turn: bool = False |
|
|
|
|
| @dataclasses.dataclass |
| class MatchSingle: |
| question: dict |
| model: str |
| answer: dict |
| judge: Judge |
| ref_answer: dict = None |
| multi_turn: bool = False |
|
|
|
|
| @dataclasses.dataclass |
| class MatchPair: |
| question: dict |
| model_1: str |
| model_2: str |
| answer_1: dict |
| answer_2: dict |
| judge: Judge |
| ref_answer: dict = None |
| multi_turn: bool = False |
|
|
|
|
| def load_questions(question_file: str, begin: Optional[int], end: Optional[int]): |
| """Load questions from a file.""" |
| questions = [] |
| with open(question_file, "r") as ques_file: |
| for line in ques_file: |
| if line: |
| questions.append(json.loads(line)) |
| questions = questions[begin:end] |
| return questions |
|
|
|
|
| def load_model_answers(answer_dir: str): |
| """Load model answers. |
| |
| The return value is a python dict of type: |
| Dict[model_name: str -> Dict[question_id: int -> answer: dict]] |
| """ |
| filenames = glob.glob(os.path.join(answer_dir, "*.jsonl")) |
| filenames.sort() |
| model_answers = {} |
|
|
| for filename in filenames: |
| model_name = os.path.basename(filename)[:-6] |
| answer = {} |
| with open(filename) as fin: |
| for line in fin: |
| line = json.loads(line) |
| answer[line["question_id"]] = line |
| model_answers[model_name] = answer |
|
|
| return model_answers |
|
|
|
|
| def load_judge_prompts(prompt_file: str): |
| """Load judge prompts. |
| |
| The return value is a python dict of type: |
| Dict[judge_name: str -> dict] |
| """ |
| prompts = {} |
| with open(prompt_file) as fin: |
| for line in fin: |
| line = json.loads(line) |
| prompts[line["name"]] = line |
| return prompts |
|
|
|
|
| def run_judge_single(question, answer, judge, ref_answer, multi_turn=False): |
| kwargs = {} |
| model = judge.model_name |
| if ref_answer is not None: |
| kwargs["ref_answer_1"] = ref_answer["choices"][0]["turns"][0] |
| if multi_turn: |
| kwargs["ref_answer_2"] = ref_answer["choices"][0]["turns"][1] |
|
|
| if multi_turn: |
| user_prompt = judge.prompt_template["prompt_template"].format( |
| question_1=question["turns"][0], |
| question_2=question["turns"][1], |
| answer_1=answer["choices"][0]["turns"][0], |
| answer_2=answer["choices"][0]["turns"][1], |
| **kwargs, |
| ) |
| else: |
| user_prompt = judge.prompt_template["prompt_template"].format( |
| question=question["turns"][0], |
| answer=answer["choices"][0]["turns"][0], |
| **kwargs, |
| ) |
|
|
| rating = -1 |
|
|
| system_prompt = judge.prompt_template["system_prompt"] |
| conv = get_conversation_template(model) |
| conv.set_system_message(system_prompt) |
| conv.append_message(conv.roles[0], user_prompt) |
| conv.append_message(conv.roles[1], None) |
|
|
| if model in OPENAI_MODEL_LIST: |
| judgment = chat_completion_openai(model, conv, temperature=0, max_tokens=2048) |
| elif model in ANTHROPIC_MODEL_LIST: |
| judgment = chat_completion_anthropic( |
| model, conv, temperature=0, max_tokens=1024 |
| ) |
| else: |
| raise ValueError(f"Invalid judge model name: {model}") |
|
|
| if judge.prompt_template["output_format"] == "[[rating]]": |
| match = re.search(one_score_pattern, judgment) |
| if not match: |
| match = re.search(one_score_pattern_backup, judgment) |
|
|
| if match: |
| rating = ast.literal_eval(match.groups()[0]) |
| else: |
| rating = -1 |
| else: |
| raise ValueError( |
| f"invalid output format: {judge.prompt_template['output_format']}" |
| ) |
|
|
| return rating, user_prompt, judgment |
|
|
|
|
| def play_a_match_single(match: MatchSingle, output_file: str): |
| question, model, answer, judge, ref_answer, multi_turn = ( |
| match.question, |
| match.model, |
| match.answer, |
| match.judge, |
| match.ref_answer, |
| match.multi_turn, |
| ) |
|
|
| if judge.prompt_template["type"] == "single": |
| score, user_prompt, judgment = run_judge_single( |
| question, answer, judge, ref_answer, multi_turn=multi_turn |
| ) |
|
|
| question_id = question["question_id"] |
| turn = 1 if not multi_turn else 2 |
| result = { |
| "question_id": question_id, |
| "model": model, |
| "judge": (judge.model_name, judge.prompt_template["name"]), |
| "user_prompt": user_prompt, |
| "judgment": judgment, |
| "score": score, |
| "turn": turn, |
| "tstamp": time.time(), |
| } |
| print( |
| f"question: {question_id}, turn: {turn}, model: {model}, " |
| f"score: {score}, " |
| f"judge: {(judge.model_name, judge.prompt_template['name'])}" |
| ) |
| else: |
| raise ValueError(f"invalid judge type: {judge['type']}") |
|
|
| if output_file: |
| os.makedirs(os.path.dirname(output_file), exist_ok=True) |
| with open(output_file, "a") as fout: |
| fout.write(json.dumps(result) + "\n") |
|
|
| return result |
|
|
|
|
| def run_judge_pair(question, answer_a, answer_b, judge, ref_answer, multi_turn=False): |
| kwargs = {} |
| model = judge.model_name |
| if ref_answer is not None: |
| kwargs["ref_answer_1"] = ref_answer["choices"][0]["turns"][0] |
| if multi_turn: |
| kwargs["ref_answer_2"] = ref_answer["choices"][0]["turns"][1] |
|
|
| if multi_turn: |
| system_prompt = judge.prompt_template["system_prompt"] |
| user_prompt = judge.prompt_template["prompt_template"].format( |
| question_1=question["turns"][0], |
| question_2=question["turns"][1], |
| answer_a_1=answer_a["choices"][0]["turns"][0], |
| answer_b_1=answer_b["choices"][0]["turns"][0], |
| answer_a_2=answer_a["choices"][0]["turns"][1], |
| answer_b_2=answer_b["choices"][0]["turns"][1], |
| **kwargs, |
| ) |
| else: |
| system_prompt = judge.prompt_template["system_prompt"] |
| user_prompt = judge.prompt_template["prompt_template"].format( |
| question=question["turns"][0], |
| answer_a=answer_a["choices"][0]["turns"][0], |
| answer_b=answer_b["choices"][0]["turns"][0], |
| **kwargs, |
| ) |
|
|
| winner = "error" |
|
|
| conv = get_conversation_template(model) |
| conv.append_message(conv.roles[0], user_prompt) |
| conv.append_message(conv.roles[1], None) |
|
|
| if model in OPENAI_MODEL_LIST: |
| conv.set_system_message(system_prompt) |
| judgment = chat_completion_openai(model, conv, temperature=0, max_tokens=2048) |
| elif model in ANTHROPIC_MODEL_LIST: |
| if system_prompt != "You are a helpful assistant.": |
| user_prompt = "[Instruction]\n" + system_prompt + "\n\n" + user_prompt |
| conv.messages[0][1] = user_prompt |
| judgment = chat_completion_anthropic( |
| model, conv, temperature=0, max_tokens=1024 |
| ) |
| else: |
| raise ValueError(f"Invalid judge model name: {model}") |
|
|
| if judge.prompt_template["output_format"] == "[[A]]": |
| if "[[A]]" in judgment: |
| winner = "A" |
| elif "[[B]]" in judgment: |
| winner = "B" |
| elif "[[C]]" in judgment: |
| winner = "tie" |
| else: |
| winner = "error" |
| elif judge.prompt_template["output_format"] == "[[rating_a,rating_b]]": |
| match = re.search(two_score_pattern, judgment) |
| if not match: |
| match = re.search(two_score_pattern_backup, judgment) |
| if match: |
| scores = [ast.literal_eval(s.strip()) for s in match.groups()] |
| if abs(scores[0] - scores[1]) <= TIE_DELTA: |
| winner = "tie" |
| elif scores[0] > scores[1]: |
| winner = "A" |
| else: |
| winner = "B" |
| else: |
| winner = "error" |
| else: |
| raise ValueError( |
| f"invalid output format: {judge.prompt_template['output_format']}" |
| ) |
|
|
| return winner, user_prompt, judgment |
|
|
|
|
| def play_a_match_pair(match: MatchPair, output_file: str): |
| question, model_1, model_2, answer_1, answer_2, judge, ref_answer, multi_turn = ( |
| match.question, |
| match.model_1, |
| match.model_2, |
| match.answer_1, |
| match.answer_2, |
| match.judge, |
| match.ref_answer, |
| match.multi_turn, |
| ) |
|
|
| if judge.prompt_template["type"] == "pairwise": |
| g1_winner, g1_user_prompt, g1_judgment = run_judge_pair( |
| question, answer_1, answer_2, judge, ref_answer, multi_turn=multi_turn |
| ) |
| g2_winner, g2_user_prompt, g2_judgment = run_judge_pair( |
| question, answer_2, answer_1, judge, ref_answer, multi_turn=multi_turn |
| ) |
|
|
| g1_map = {"A": "model_1", "B": "model_2"} |
| g2_map = {"A": "model_2", "B": "model_1"} |
| g1_winner = g1_map.get(g1_winner, g1_winner) |
| g2_winner = g2_map.get(g2_winner, g2_winner) |
| question_id = question["question_id"] |
| turn = 1 if not multi_turn else 2 |
|
|
| result = { |
| "question_id": question_id, |
| "model_1": model_1, |
| "model_2": model_2, |
| "g1_winner": g1_winner, |
| "g2_winner": g2_winner, |
| "judge": (judge.model_name, judge.prompt_template["name"]), |
| "g1_user_prompt": g1_user_prompt, |
| "g1_judgment": g1_judgment, |
| "g2_user_prompt": g2_user_prompt, |
| "g2_judgment": g2_judgment, |
| "turn": turn, |
| "tstamp": time.time(), |
| } |
|
|
| print( |
| f"question: {question_id}, turn: {turn}, model_1: {model_1}, model_2: {model_2}, " |
| f"g1_winner: {g1_winner}, g2_winner: {g2_winner}, " |
| f"judge: {(judge.model_name, judge.prompt_template['name'])}" |
| ) |
| elif judge.prompt_template["type"] == "single": |
| m1_score, m1_user_prompt, m1_judgment = run_judge_single( |
| question, answer_1, judge |
| ) |
| m2_score, m2_user_prompt, m2_judgment = run_judge_single( |
| question, answer_2, judge |
| ) |
|
|
| if abs(m1_score - m2_score) <= TIE_DELTA: |
| winner = "tie" |
| elif m1_score > m2_score: |
| winner = "model_1" |
| else: |
| winner = "model_2" |
|
|
| question_id = question["question_id"] |
| result = { |
| "question_id": question_id, |
| "model_1": model_1, |
| "model_2": model_2, |
| "g1_winner": winner, |
| "g2_winner": winner, |
| "judge": (judge.model_name, judge.prompt_template["name"]), |
| "g1_user_prompt": m1_user_prompt, |
| "g1_judgment": m1_judgment, |
| "g2_user_prompt": m2_user_prompt, |
| "g2_judgment": m2_judgment, |
| "m1_score": m1_score, |
| "m2_score": m2_score, |
| "tstamp": time.time(), |
| } |
| print( |
| f"question: {question_id}, model_1: {model_1}, model_2: {model_2}, " |
| f"winner: {winner}, m1_score: {m1_score}, m2_score: {m2_score}, " |
| f"judge: {(judge.model_name, judge.prompt_template['name'])}" |
| ) |
| else: |
| raise ValueError(f"invalid judge type: {judge['type']}") |
|
|
| if output_file: |
| os.makedirs(os.path.dirname(output_file), exist_ok=True) |
| with open(output_file, "a") as fout: |
| fout.write(json.dumps(result) + "\n") |
|
|
| return result |
|
|
|
|
| def chat_completion_openai(model, conv, temperature, max_tokens, api_dict=None): |
| if api_dict is not None: |
| openai.api_base = "https://openrouter.ai/api/v1" |
| openai.api_key = api_dict["api_key"] |
| openai.api_base = "https://openrouter.ai/api/v1" |
| output = API_ERROR_OUTPUT |
| for _ in range(API_MAX_RETRY): |
| try: |
| messages = conv.to_openai_api_messages() |
| response = openai.ChatCompletion.create( |
| model="openai/gpt-5-nano", |
| messages=messages, |
| n=1, |
| temperature=temperature, |
| max_tokens=max_tokens, |
| extra_body={ |
| "reasoning": { |
| "effort": "minimal" |
| } |
| } |
| ) |
| output = response["choices"][0]["message"]["content"] |
| break |
| except openai.error.OpenAIError as e: |
| print(type(e), e) |
| time.sleep(API_RETRY_SLEEP) |
|
|
| return output |
|
|
|
|
| def chat_completion_openai_azure(model, conv, temperature, max_tokens, api_dict=None): |
| openai.api_type = "azure" |
| openai.api_version = "2023-07-01-preview" |
| if api_dict is not None: |
| openai.api_base = api_dict["api_base"] |
| openai.api_key = api_dict["api_key"] |
| else: |
| openai.api_base = os.environ["AZURE_OPENAI_ENDPOINT"] |
| openai.api_key = os.environ["AZURE_OPENAI_KEY"] |
|
|
| if "azure-" in model: |
| model = model[6:] |
|
|
| output = API_ERROR_OUTPUT |
| for _ in range(API_MAX_RETRY): |
| try: |
| messages = conv.to_openai_api_messages() |
| response = openai.ChatCompletion.create( |
| engine=model, |
| messages=messages, |
| n=1, |
| temperature=temperature, |
| max_tokens=max_tokens, |
| ) |
| output = response["choices"][0]["message"]["content"] |
| break |
| except openai.error.OpenAIError as e: |
| print(type(e), e) |
| time.sleep(API_RETRY_SLEEP) |
| except openai.error.InvalidRequestError as e: |
| print(type(e), e) |
| break |
| except KeyError: |
| print(response) |
| break |
|
|
| return output |
|
|
|
|
| def chat_completion_anthropic(model, conv, temperature, max_tokens, api_dict=None): |
| if api_dict is not None and "api_key" in api_dict: |
| api_key = api_dict["api_key"] |
| else: |
| api_key = os.environ["ANTHROPIC_API_KEY"] |
|
|
| output = API_ERROR_OUTPUT |
| for _ in range(API_MAX_RETRY): |
| try: |
| c = anthropic.Anthropic(api_key=api_key) |
| prompt = conv.get_prompt() |
| response = c.completions.create( |
| model=model, |
| prompt=prompt, |
| stop_sequences=[anthropic.HUMAN_PROMPT], |
| max_tokens_to_sample=max_tokens, |
| temperature=temperature, |
| ) |
| output = response.completion |
| break |
| except anthropic.APIError as e: |
| print(type(e), e) |
| time.sleep(API_RETRY_SLEEP) |
| return output.strip() |
|
|
|
|
| def chat_completion_palm(chat_state, model, conv, temperature, max_tokens): |
| from fastchat.serve.api_provider import init_palm_chat |
|
|
| assert model == "palm-2-chat-bison-001" |
|
|
| if chat_state is None: |
| chat_state = init_palm_chat("chat-bison@001") |
|
|
| parameters = { |
| "temperature": temperature, |
| "top_p": 0.8, |
| "top_k": 40, |
| "max_output_tokens": max_tokens, |
| } |
| output = API_ERROR_OUTPUT |
| for _ in range(API_MAX_RETRY): |
| try: |
| response = chat_state.send_message(conv.messages[-2][1], **parameters) |
| output = response.text |
| break |
| except Exception as e: |
| print(type(e), e) |
| time.sleep(API_RETRY_SLEEP) |
| return chat_state, output |
|
|
|
|
| def normalize_game_key_single(gamekey, result): |
| """Make the model names sorted in a game key.""" |
| qid, model_1, model_2 = gamekey |
| if model_1 < model_2: |
| return gamekey, result |
| else: |
| new_gamekey = (qid, model_2, model_1) |
| new_result = { |
| "winners": tuple(reverse_model_map.get(x, x) for x in result["winners"]), |
| "g1_judgment": result["g2_judgment"], |
| "g2_judgment": result["g1_judgment"], |
| } |
| return new_gamekey, new_result |
|
|
|
|
| def normalize_game_key_dict(judgment_dict): |
| """Make the model names sorted in the game keys.""" |
| ret = {} |
| for key, value in judgment_dict.items(): |
| new_key, new_value = normalize_game_key_single(key, value) |
| ret[new_key] = new_value |
| return ret |
|
|
|
|
| def load_pairwise_model_judgments(filename: str): |
| """Load model judgments. |
| |
| The return value is a dict of type: |
| Dict[judge: Tuple -> Dict[game_key: tuple -> game_result: dict] |
| """ |
| judge_dict = {} |
|
|
| for line in open(filename): |
| obj = json.loads(line) |
| judge = tuple(obj["judge"]) |
| qid, model_1, model_2 = obj["question_id"], obj["model_1"], obj["model_2"] |
|
|
| if judge not in judge_dict: |
| judge_dict[judge] = {} |
|
|
| if "winner" in obj: |
| winner = obj["winner"] |
| elif "g1_winner" in obj and "g2_winner" in obj: |
| g1_winner, g2_winner = obj["g1_winner"], obj["g2_winner"] |
| if g1_winner == g2_winner: |
| winner = g1_winner |
| else: |
| winner = "inconsistent" |
| else: |
| raise ValueError(f"Invalid keys: {list(obj.keys())}") |
|
|
| gamekey = (qid, model_1, model_2) |
| winners = (winner,) |
|
|
| judge_dict[judge][gamekey] = { |
| "winners": winners, |
| "g1_judgment": obj["g1_judgment"], |
| "g2_judgment": obj["g2_judgment"], |
| } |
|
|
| |
| normalized = {} |
| for judge, value in judge_dict.items(): |
| normalized[judge] = normalize_game_key_dict(value) |
| return normalized |
|
|
|
|
| def load_single_model_judgments(filename: str): |
| """Load model judgments. |
| |
| The return value is a dict of type: |
| Dict[judge: Tuple -> Dict[game_key: tuple -> game_result: dict] |
| """ |
| judge_dict = {} |
|
|
| for line in open(filename): |
| obj = json.loads(line) |
| judge = tuple(obj["judge"]) |
| qid, model = obj["question_id"], obj["model"] |
|
|
| if judge not in judge_dict: |
| judge_dict[judge] = {} |
|
|
| gamekey = (qid, model) |
|
|
| judge_dict[judge][gamekey] = { |
| "score": obj["score"], |
| "judgment": obj["judgment"], |
| } |
| return judge_dict |
|
|
|
|
| def resolve_pairwise_judgment_dict( |
| question, model_judgments_normal, model_judgments_math, multi_turn=False |
| ): |
| """Return the correct pairwise judge.""" |
| if multi_turn: |
| if question["category"] in NEED_REF_CATS: |
| return model_judgments_math[("gpt-4", "pair-math-v1-multi-turn")] |
| return model_judgments_normal[("gpt-4", "pair-v2-multi-turn")] |
|
|
| if question["category"] in NEED_REF_CATS: |
| return model_judgments_math[("gpt-4", "pair-math-v1")] |
| else: |
| return model_judgments_normal[("gpt-4", "pair-v2")] |
|
|
|
|
| def resolve_single_judgment_dict( |
| question, model_judgments_normal, model_judgments_math, multi_turn=False |
| ): |
| """Return the correct single answer grading judge.""" |
| if multi_turn: |
| if question["category"] in NEED_REF_CATS: |
| return model_judgments_math[("gpt-4", "single-math-v1-multi-turn")] |
| return model_judgments_normal[("gpt-4", "single-v1-multi-turn")] |
|
|
| if question["category"] in NEED_REF_CATS: |
| return model_judgments_math[("gpt-4", "single-math-v1")] |
| else: |
| return model_judgments_normal[("gpt-4", "single-v1")] |
|
|
|
|
| def get_pairwise_judge_explanation(gamekey, judgment_dict): |
| """Get model judge explanation.""" |
| try: |
| qid, model_1, model_2 = gamekey |
| if model_1 < model_2: |
| res = judgment_dict[gamekey] |
| g1_judgment, g2_judgment = res["g1_judgment"], res["g2_judgment"] |
| else: |
| new_gamekey = (qid, model_2, model_1) |
| res = judgment_dict[new_gamekey] |
|
|
| model_1, model_2 = model_1, model_2 |
| g1_judgment, g2_judgment = res["g2_judgment"], res["g1_judgment"] |
|
|
| return ( |
| f"**Game 1**. **A**: {model_1}, **B**: {model_2}\n\n" |
| f"**Judgment**: {g1_judgment}" |
| + f"\n\n`--------------------------`\n\n" |
| + f"**Game 2**. **A**: {model_2}, **B**: {model_1}\n\n" |
| f"**Judgment**: {g2_judgment}" |
| ) |
| except KeyError: |
| return "N/A" |
|
|
|
|
| def get_single_judge_explanation(gamekey, judgment_dict): |
| """Get model judge explanation.""" |
| try: |
| qid, model = gamekey |
|
|
| res = judgment_dict[gamekey] |
|
|
| g1_judgment = res["judgment"] |
| g1_score = res["score"] |
|
|
| return ( |
| f"**Game 1**. **A**: {model}, **Score**: {g1_score}\n\n" |
| f"**Judgment**: {g1_judgment}" |
| ) |
| except KeyError: |
| return "N/A" |
|
|
|
|
| def check_data(questions, model_answers, ref_answers, models, judges): |
| |
| for m in models: |
| assert m in model_answers, f"Missing model answer for {m}" |
| m_answer = model_answers[m] |
| for q in questions: |
| assert ( |
| q["question_id"] in m_answer |
| ), f"Missing model {m}'s answer to Question {q['question_id']}" |
| |
| for jg in judges.values(): |
| if not jg.ref_based: |
| continue |
| for q in questions: |
| if q["category"] not in NEED_REF_CATS: |
| continue |
| |
| |
| |
|
|
|
|
| def get_model_list(answer_dir): |
| file_paths = glob.glob(f"{answer_dir}/*.jsonl") |
| file_names = [os.path.splitext(os.path.basename(f))[0] for f in file_paths] |
| return file_names |