import json
import os
import os.path as osp
import random
import re
from collections import Counter
from typing import Union, List, Dict, Tuple

import numpy as np
import pandas as pd

from agentreview import const
from agentreview.utility.general_utils import check_cwd, set_seed

def generate_num_papers_to_accept(n, batch_number, shuffle=True):
    """Distribute `n` acceptances as evenly as possible across `batch_number` batches.

    `n` may be a float (e.g. acceptance_rate * num_papers); fractional papers
    are truncated by the integer casts below.
    """

    # Base value: the minimum number of acceptances per batch
    base_value = int(n // batch_number)

    # Number of batches that receive one extra acceptance
    remainder = int(n % batch_number)

    array = []
    for i in range(batch_number):
        if i < remainder:
            array.append(base_value + 1)
        else:
            array.append(base_value)

    if shuffle:
        random.shuffle(array)

    return array
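
# A quick sanity check (doctest-style; `shuffle=False` keeps the output deterministic):
#
#     >>> generate_num_papers_to_accept(10, 3, shuffle=False)
#     [4, 3, 3]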

def get_papers_accepted_by_llm(llm_ac_decisions, acceptance_rate: float) -> list:
    """Return the IDs of papers accepted by the LLM area chairs.

    Each batch maps paper IDs to ranks (1 = best); the top-ranked papers in
    each batch are accepted, subject to the overall acceptance rate.
    """
    papers_accepted_by_llm = []

    num_papers = sum(len(batch) for batch in llm_ac_decisions)

    if num_papers == 0:
        raise ValueError("No papers found in batch")

    num_papers_to_accept = generate_num_papers_to_accept(n=acceptance_rate * num_papers,
                                                         batch_number=len(llm_ac_decisions))

    for idx_batch, batch in enumerate(llm_ac_decisions):
        # Sort papers in this batch by rank, ascending (rank 1 first)
        tups = sorted(batch.items(), key=lambda x: x[1])
        paper_ids = [int(paper_id) for paper_id, rank in tups]
        papers_accepted_by_llm += paper_ids[:num_papers_to_accept[idx_batch]]

    return papers_accepted_by_llm
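
# Example with hypothetical batches of rankings (paper ID -> rank, 1 = best).
# With 6 papers and a 0.5 acceptance rate, 3 papers are accepted in total; the
# [2, 1] split across batches is shuffled, so either {7, 9, 12} or {4, 9, 12}
# may be returned:
#
#     batches = [{"12": 1, "7": 2, "3": 3}, {"9": 1, "4": 2, "8": 3}]
#     accepted = get_papers_accepted_by_llm(batches, acceptance_rate=0.5)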

def get_paper_decision_mapping(data_dir: str, conference: str, verbose: bool = False):
    """Build (or load cached) mappings between paper IDs and their decisions."""
    paper_id2decision, paper_decision2ids = {}, {}

    path_paper_id2decision = os.path.join(data_dir, conference, "id2decision.json")
    path_paper_decision2ids = os.path.join(data_dir, conference, "decision2ids.json")

    if osp.exists(path_paper_id2decision) and osp.exists(path_paper_decision2ids):
        with open(path_paper_id2decision, 'r', encoding='utf-8') as f:
            paper_id2decision = json.load(f)
        with open(path_paper_decision2ids, 'r', encoding='utf-8') as f:
            paper_decision2ids = json.load(f)

        # JSON keys are always strings; convert them back to int paper IDs
        paper_id2decision = {int(k): v for k, v in paper_id2decision.items()}

        if verbose:
            print(f"Loaded {len(paper_id2decision)} paper IDs to decisions from {path_paper_id2decision}")

    else:
        PAPER_DECISIONS = get_all_paper_decisions(conference)

        for paper_decision in PAPER_DECISIONS:
            paper_ids = os.listdir(os.path.join(data_dir, conference, "notes", paper_decision))
            paper_ids = sorted(
                [int(paper_id.split(".json")[0]) for paper_id in paper_ids if paper_id.endswith(".json")])

            paper_id2decision.update({paper_id: paper_decision for paper_id in paper_ids})
            paper_decision2ids[paper_decision] = paper_ids

            if verbose:
                print(f"{paper_decision}: {len(paper_ids)} papers")

        with open(path_paper_id2decision, 'w', encoding='utf-8') as f:
            json.dump(paper_id2decision, f, indent=2)
        with open(path_paper_decision2ids, 'w', encoding='utf-8') as f:
            json.dump(paper_decision2ids, f, indent=2)

    return paper_id2decision, paper_decision2ids
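
# Directory layout this function assumes (inferred from the listing logic
# above; the concrete names are illustrative):
#
#     <data_dir>/<conference>/notes/<decision>/<paper_id>.json
#
# e.g. data/ICLR2023/notes/Accept-oral/123.json, where <decision> is one of the
# categories returned by get_all_paper_decisions(conference).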

def project_setup():
    check_cwd()

    import warnings
    warnings.simplefilter(action='ignore', category=FutureWarning)

    # pandas is already imported at module level as `pd`
    pd.set_option('display.max_rows', 40)
    pd.set_option('display.max_columns', 20)
    set_seed(42)

def get_next_review_id(path: str) -> int:
    """Return the smallest unused review ID in `path`.

    Assumes the files in `path` are named like `<prefix>_<id>.json`
    (e.g. `review_3.json`).
    """
    existing_review_ids = sorted([int(x.split('.json')[0].split('_')[1]) for x in os.listdir(path)])

    next_review_id = 1
    while next_review_id in existing_review_ids:
        next_review_id += 1

    print(f"Next review ID: {next_review_id}")
    return next_review_id
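
# Sketch of the behavior (the `review_` prefix and directory contents are
# hypothetical): with review_1.json and review_3.json present, the first free
# slot is 2:
#
#     >>> # os.listdir(path) == ["review_1.json", "review_3.json"]
#     >>> # get_next_review_id(path) -> 2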

def filter_paper_ids_from_initial_experiments(sampled_paper_ids: List[int]):
    """Drop paper IDs that were already used in the initial experiments."""
    with open("outputs/paper_ids_initial_experiments.json") as f:
        paper_ids_initial_experiments = json.load(f)

    sampled_paper_ids = sorted(set(sampled_paper_ids) - set(paper_ids_initial_experiments))
    return sampled_paper_ids

def get_paper_review_and_rebuttal_dir(reviewer_type: str, conference: str, model_name: str, paper_id: int = None):
    if reviewer_type == "NoOverallScore":
        reviewer_type = "BASELINE"

    path = (f"outputs/paper_review_and_rebuttal"
            f"/{conference}/"
            f"{get_model_name_short(model_name)}/{reviewer_type}")

    if paper_id is not None:
        path += f"/{paper_id}"

    return path

def get_rebuttal_dir(output_dir: str,
                     paper_id: Union[str, int, None],
                     experiment_name: str,
                     model_name: str,
                     conference: str):
    path = os.path.join(output_dir, "paper_review", conference, get_model_name_short(model_name),
                        experiment_name)

    if paper_id is not None:
        path += f"/{paper_id}"

    return path

def print_colored(text, color='red'):
    # ANSI foreground color codes for terminals
    foreground_colors = {
        'black': 30,
        'red': 31,
        'green': 32,
        'yellow': 33,
        'blue': 34,
        'magenta': 35,
        'cyan': 36,
        'white': 37,
    }

    try:
        # `get_ipython` is only defined inside Jupyter/IPython. We use its
        # presence to decide whether we are running in a notebook.
        get_ipython
        print(text)  # Plain text in Jupyter
    except NameError:
        # Not in Jupyter: print with ANSI color codes
        color_code = foreground_colors.get(color, 31)  # Default to red if color not found
        print(f"\033[{color_code}m{text}\033[0m")

def get_ac_decision_path(output_dir: str, conference: str, model_name: str, ac_scoring_method: str,
                         experiment_name: str):
    ac_decision_dir = os.path.join(output_dir, "decisions", conference,
                                   get_model_name_short(model_name),
                                   f"decisions_thru_{ac_scoring_method}")
    os.makedirs(ac_decision_dir, exist_ok=True)

    if isinstance(experiment_name, str):
        ac_decision_dir += f"/decision_{experiment_name}.json"

    return ac_decision_dir

def load_metareview(paper_id: int, **kwargs):
    """Load the AC's metareview for a paper, or None if it is missing or malformed."""
    rebuttal_dir = get_rebuttal_dir(paper_id=paper_id, **kwargs)
    path = f"{rebuttal_dir}/{paper_id}.json"

    if not osp.exists(path):
        print(f"Not Found: {path}")
        return None

    try:
        with open(path) as f:
            reviews = json.load(f)

        # The metareview is the last message in the conversation and must come
        # from the area chair (agent name starting with "AC")
        metareview = reviews["messages"][-1]
        if not metareview["agent_name"].startswith("AC"):
            return None

        return metareview['content']

    except FileNotFoundError:
        return None

def get_reviewer_type_from_profile(profile: dict):
    """
    Get a short name for the reviewer's type from the reviewer's experiment profile.

    At most one of `is_benign`, `is_knowledgeable`, and `is_responsible` may be
    set to True or False; the others must be None.

    Input:
        {
            'is_benign': True,
            'is_knowledgeable': None,
            'is_responsible': None,
            'provides_numeric_rating': True
        }
    Output:
        "benign"

    Input:
        {
            'is_benign': False,
            'is_knowledgeable': None,
            'is_responsible': None,
            'provides_numeric_rating': True
        }
    Output:
        "malicious"

    Input:
        {
            'is_benign': None,
            'is_knowledgeable': None,
            'is_responsible': None,
            'provides_numeric_rating': True
        }
    Output:
        "BASELINE"
    """

    reviewer_attributes = Counter([profile[k] for k in ["is_benign", 'is_knowledgeable', 'is_responsible']])

    assert (reviewer_attributes[True] <= 1 and reviewer_attributes[False] <= 1) and reviewer_attributes[None] >= 2, \
        "A reviewer can have at most one of these properties set to True or False"

    # NOTE: We compare against False explicitly (rather than using `not ...`)
    # because each property can also be None, which must fall through to the
    # next branch.
    if profile['is_benign']:
        return "benign"
    elif profile['is_benign'] is False:
        return "malicious"
    elif profile['is_knowledgeable']:
        return "knowledgeable"
    elif profile['is_knowledgeable'] is False:
        return "unknowledgeable"
    elif profile['is_responsible']:
        return "responsible"
    elif profile['is_responsible'] is False:
        return "irresponsible"
    elif profile['provides_numeric_rating'] is False:
        return "NoOverallScore"
    elif profile.get('knows_authors') == "famous":
        return "authors_are_famous"
    elif profile.get('knows_authors') == "unfamous":
        return "authors_are_unfamous"
    else:
        return "BASELINE"

def get_ac_type_from_profile(profile: dict):
    """
    Get a short name for the area chair's type from their profile in the experiment setting.

    Not implemented yet; always returns None.
    """
    return None

def format_metareviews(metareviews: List[str], paper_ids: List[int]):
    """Concatenate metareviews into a single string, one block per paper."""
    metareviews_formatted = ""

    for paper_id, metareview in zip(paper_ids, metareviews):
        # Collapse runs of consecutive newlines into a single newline
        metareview = re.sub(r'\n+', '\n', metareview)
        metareviews_formatted += (f"Paper ID: {paper_id}\nMetareview: "
                                  f"{metareview}\n{'-' * 5}\n")

    return metareviews_formatted
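
# Sketch of the output format for two hypothetical papers:
#
#     Paper ID: 17
#     Metareview: <metareview text>
#     -----
#     Paper ID: 42
#     Metareview: <metareview text>
#     -----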

def get_all_paper_decisions(conference: str) -> List[str]:
    """Return the decision categories used by `conference`."""
    if conference in ["ICLR2019", "ICLR2018"]:
        return const.PAPER_DECISIONS_ICLR2019
    else:
        return const.PAPER_DECISIONS

def get_paper_ids_of_known_authors(data_dir: str, conference: str, num_papers: int, decision: str = None):
    # `get_paper_decision_mapping` requires the data directory as well as the
    # conference name, so it is threaded through here.
    paper_id2decision, paper_decision2ids = get_paper_decision_mapping(data_dir, conference)
    paper_ids_of_famous_authors = paper_decision2ids[decision][:num_papers]
    return paper_ids_of_famous_authors

def get_experiment_names(conference: str = "ICLR2023"):
    experiment_names = ["BASELINE"]

    # The following are settings for reviewer types

    # Varying reviewer commitment
    experiment_names += ["responsible_Rx1"]
    experiment_names += ["irresponsible_Rx1"]

    # Varying reviewer intention
    experiment_names += ["benign_Rx1"]
    experiment_names += ["malicious_Rx1"]

    # Varying reviewer knowledgeability
    experiment_names += ["knowledgeable_Rx1"]
    experiment_names += ["unknowledgeable_Rx1"]

    # The following are settings for AC types
    experiment_names += ["conformist_ACx1", "authoritarian_ACx1", "inclusive_ACx1"]

    # These settings are only enabled for ICLR2023
    if conference == "ICLR2023":
        experiment_names += ["no_rebuttal"]
        experiment_names += ["no_overall_score"]
        experiment_names += ["malicious_Rx2"]
        experiment_names += ["malicious_Rx3"]
        experiment_names += ["irresponsible_Rx2"]
        experiment_names += ["irresponsible_Rx3"]
        experiment_names += ["authors_are_famous_Rx1"]
        experiment_names += ["authors_are_famous_Rx2"]
        experiment_names += ["authors_are_famous_Rx3"]

    return experiment_names

def load_llm_ac_decisions_as_array(
        output_dir: str,
        experiment_name: str,
        ac_scoring_method: str,
        acceptance_rate: float,
        conference: str,
        model_name: str,
        num_papers_per_area_chair: int
) -> Tuple[np.ndarray, np.ndarray]:
    """Loads and processes LLM-generated area chair (AC) decisions for an experiment.

    Args:
        output_dir (str): Directory containing experiment outputs.
        experiment_name (str): Name of the experiment.
        ac_scoring_method (str): Method used for AC scoring ('ranking' or 'recommendation').
        acceptance_rate (float): Acceptance rate for the conference.
        conference (str): Name of the conference.
        model_name (str): Model name used to generate AC decisions.
        num_papers_per_area_chair (int): Number of papers assigned to each area chair.

    Returns:
        Tuple[np.ndarray, np.ndarray]: An array of decisions (True for accept, False for reject)
        and an array of paper IDs in the order processed.

    Raises:
        NotImplementedError: If `ac_scoring_method` is not 'ranking' or 'recommendation'.
    """
    print("=" * 30)
    print(f"Experiment Name: {experiment_name}")

    llm_ac_decisions = load_llm_ac_decisions(
        output_dir=output_dir,
        conference=conference,
        model_name=model_name,
        ac_scoring_method=ac_scoring_method,
        experiment_name=experiment_name,
        num_papers_per_area_chair=num_papers_per_area_chair
    )

    paper_ids = sorted(
        int(paper_id) for batch in llm_ac_decisions for paper_id in batch
    )

    if ac_scoring_method == "ranking":
        if len(paper_ids) != len(set(paper_ids)):
            raise ValueError(f"Duplicate paper_ids found in the AC decisions: {Counter(paper_ids)}")
        papers_accepted_by_llm = get_papers_accepted_by_llm(llm_ac_decisions, acceptance_rate)
        decisions_llm = np.array([paper_id in papers_accepted_by_llm for paper_id in paper_ids])

    elif ac_scoring_method == "recommendation":
        llm_ac_decisions_flat = {int(k): v for batch in llm_ac_decisions for k, v in batch.items()}
        decisions_llm = np.array(
            [llm_ac_decisions_flat[paper_id].startswith("Accept") for paper_id in paper_ids]
        )

    else:
        raise NotImplementedError(f"Scoring method '{ac_scoring_method}' not implemented.")

    return decisions_llm, np.array(paper_ids)
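
# Usage sketch (all argument values are hypothetical):
#
#     decisions, paper_ids = load_llm_ac_decisions_as_array(
#         output_dir="outputs",
#         experiment_name="BASELINE",
#         ac_scoring_method="ranking",
#         acceptance_rate=0.32,
#         conference="ICLR2023",
#         model_name="gpt-4-turbo",
#         num_papers_per_area_chair=10,
#     )
#     print(f"{decisions.sum()} of {len(paper_ids)} papers accepted")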

def load_llm_ac_decisions(
        output_dir: str,
        conference: str,
        model_name: str,
        ac_scoring_method: str,
        experiment_name: str,
        num_papers_per_area_chair: int
) -> List[Dict[str, str]]:
    """Loads LLM-generated area chair (AC) decisions from a specified path.

    Args:
        output_dir (str): Directory containing experiment outputs.
        conference (str): Name of the conference.
        model_name (str): Model name used to generate AC decisions.
        ac_scoring_method (str): Method used for AC scoring ('ranking' or 'recommendation').
        experiment_name (str): Name of the experiment.
        num_papers_per_area_chair (int): Number of papers assigned to each area chair.

    Returns:
        List[Dict[str, str]]: List of batches, where each batch maps paper IDs to decisions.

    Raises:
        AssertionError: If a non-final batch has a paper count different from `num_papers_per_area_chair`.
    """
    path = get_ac_decision_path(
        output_dir=output_dir,
        conference=conference,
        model_name=model_name,
        ac_scoring_method=ac_scoring_method,
        experiment_name=experiment_name
    )

    if osp.exists(path):
        with open(path, 'r', encoding='utf-8') as file:
            ac_decision = json.load(file)
        print(f"Loaded {len(ac_decision)} batches of existing AC decisions from {path}")
    else:
        ac_decision = []
        print(f"No existing AC decisions found at {path}")

    ac_decision = [batch for batch in ac_decision if batch]  # Remove empty batches

    # Every batch except the last must be full; the final batch may be smaller
    for i, batch in enumerate(ac_decision):
        if i != len(ac_decision) - 1:
            if len(batch) != num_papers_per_area_chair:
                raise AssertionError(
                    f"Batch {i} has {len(batch)} papers, expected {num_papers_per_area_chair} for non-final batches."
                )

    return ac_decision
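
# Shape of the decision file (inferred from how batches are consumed above;
# the concrete IDs and strings here are hypothetical):
#
#     # ac_scoring_method == "ranking": paper ID -> rank within the batch
#     [{"12": 1, "7": 2}, {"9": 1, "4": 2}]
#
#     # ac_scoring_method == "recommendation": paper ID -> decision string
#     [{"12": "Accept (oral)", "7": "Reject"}]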

def write_to_excel(data, file_path, sheet_name):
    """
    Write data to an Excel file.

    Parameters:
        data (pd.DataFrame): The data to write to the Excel file.
        file_path (str): The path to the Excel file.
        sheet_name (str): The name of the sheet to write to.
    """
    if os.path.exists(file_path):
        # The file exists: append to it, replacing the sheet if it is already there
        with pd.ExcelWriter(file_path, mode='a', engine='openpyxl', if_sheet_exists='replace') as writer:
            data.to_excel(writer, sheet_name=sheet_name, index=False)
    else:
        # The file does not exist: create it
        with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
            data.to_excel(writer, sheet_name=sheet_name, index=False)
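
# Usage sketch (the DataFrame contents and path are hypothetical):
#
#     df = pd.DataFrame({"paper_id": [12, 7], "decision": ["Accept", "Reject"]})
#     write_to_excel(df, "outputs/decisions.xlsx", sheet_name="BASELINE")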

def save_llm_ac_decisions(ac_decisions: List[dict], **kwargs):
    path = get_ac_decision_path(**kwargs)
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(ac_decisions, f, indent=2)

def get_model_name_short(name: str):
    """
    Convert long model names (e.g. `gpt-35-turbo`) to short model names (e.g. `gpt-35`).

    Args:
        name (str): long model name

    Returns:
        str: short model name
    """
    assert name.startswith('gpt-'), f"Unexpected model name: {name}"
    return '-'.join(name.split('-')[:2])

def get_reviewer_types_from_experiment_name(experiment_name: str):
    """Map an experiment name to the types of its three reviewers."""
    if experiment_name in ["BASELINE", 'inclusive_ACx1', 'authoritarian_ACx1', 'conformist_ACx1',
                           "no_rebuttal"]:
        reviewer_types = ["BASELINE", "BASELINE", "BASELINE"]
    elif experiment_name == "benign_Rx1":
        reviewer_types = ["benign", "BASELINE", "BASELINE"]
    elif experiment_name == "benign_Rx2":
        reviewer_types = ["benign", "benign", "BASELINE"]
    elif experiment_name == "malicious_Rx1":
        reviewer_types = ["malicious", "BASELINE", "BASELINE"]
    elif experiment_name == "malicious_Rx2":
        reviewer_types = ["malicious", "malicious", "BASELINE"]
    elif experiment_name == "malicious_Rx3":
        reviewer_types = ["malicious", "malicious", "malicious"]
    elif experiment_name == "knowledgeable_Rx1":
        reviewer_types = ["knowledgeable", "BASELINE", "BASELINE"]
    elif experiment_name == "unknowledgeable_Rx1":
        reviewer_types = ["unknowledgeable", "BASELINE", "BASELINE"]
    elif experiment_name == "responsible_Rx1":
        reviewer_types = ["responsible", "BASELINE", "BASELINE"]
    elif experiment_name == "irresponsible_Rx1":
        reviewer_types = ["irresponsible", "BASELINE", "BASELINE"]
    elif experiment_name == "irresponsible_Rx2":
        reviewer_types = ["irresponsible", "irresponsible", "BASELINE"]
    elif experiment_name == "irresponsible_Rx3":
        reviewer_types = ["irresponsible", "irresponsible", "irresponsible"]
    elif experiment_name in ["no_overall_score"]:
        reviewer_types = ["NoOverallScore", "NoOverallScore", "NoOverallScore"]
    elif experiment_name in ["authors_are_famous_Rx1", "authors_are_famous_Rx1_no_rebuttal"]:
        reviewer_types = ["authors_are_famous", "BASELINE", "BASELINE"]
    elif experiment_name in ["authors_are_famous_Rx2", "authors_are_famous_Rx2_no_rebuttal"]:
        reviewer_types = ["authors_are_famous", "authors_are_famous", "BASELINE"]
    elif experiment_name in ["authors_are_famous_Rx3", "authors_are_famous_Rx3_no_rebuttal"]:
        reviewer_types = ["authors_are_famous", "authors_are_famous", "authors_are_famous"]
    else:
        raise NotImplementedError(f"Unknown experiment name: {experiment_name}")

    return reviewer_types