import json
import re
import os
from datetime import datetime
from glob import glob
from argparse import Namespace

import gradio as gr

from autoreview import const
from autoreview.config import AgentConfig
from autoreview.agent import Player
from autoreview.backends import BACKEND_REGISTRY
from autoreview.environments import PaperReview
from autoreview.paper_review_arena import PaperReviewArena
from autoreview.utility.experiment_utils import initialize_players
from autoreview.paper_review_player import PaperExtractorPlayer, Reviewer
from autoreview.role_descriptions import (
    get_reviewer_description,
    get_paper_extractor_config,
)
from autoreview.paper_type_classifier import PaperTypeClassifier

from llama_index.readers.file.docs import PDFReader

# This file handles the front-end interaction: it builds the page, reads the
# user's configuration from it, passes that configuration to the back end, and
# displays the results in the corresponding UI modules as they arrive.

# Stylesheet handed to ``gr.Blocks(css=...)``. The ``#header``,
# ``#col-container``, ``#col-chatbox`` and ``#col-config`` selectors match the
# ``elem_id`` values assigned to components when the page is built.
css = """
/* 全局样式 */
* {
    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}

/* 主容器 */
#col-container {
    max-width: 95%;
    margin: 0 auto;
    display: flex;
    flex-direction: column;
    background: #e3f2fd;
    min-height: 100vh;
    padding: 20px;
    border-radius: 15px;
    box-shadow: 0 20px 40px rgba(0,0,0,0.1);
}

/* 头部样式 */
#header {
    text-align: center;
    background: rgba(255,255,255,0.95);
    padding: 30px;
    border-radius: 15px;
    margin-bottom: 20px;
    box-shadow: 0 10px 30px rgba(0,0,0,0.1);
}

#header h1 {
    color: #2c3e50;
    font-size: 2.5em;
    margin: 0 0 10px 0;
    font-weight: 700;
}

#header p {
    color: #7f8c8d;
    font-size: 1.1em;
    margin: 0;
}

/* 聊天区域 */
#col-chatbox {
    flex: 1;
    max-height: min(900px, 100%);
    background: rgba(255,255,255,0.95);
    border-radius: 15px;
    padding: 20px;
    box-shadow: 0 10px 30px rgba(0,0,0,0.1);
}

/* 配置区域 */
#col-config {
    background: rgba(255,255,255,0.95);
    border-radius: 15px;
    padding: 20px;
    box-shadow: 0 10px 30px rgba(0,0,0,0.1);
    margin-left: 20px;
}

/* 标签样式 */
#label {
    font-size: 2em;
    padding: 0.5em;
    margin: 0;
    color: #2c3e50;
}

/* 消息样式 */
.message {
    font-size: 1.1em;
    line-height: 1.6;
}

.message-wrap {
    max-height: min(700px, 100vh);
}

/* 按钮样式 */
button {
    background: #FFFFFF;
    border: none;
    color: black;
    padding: 12px 24px;
    border-radius: 25px;
    font-weight: 600;
    transition: all 0.3s ease;
}

button:hover {
    transform: translateY(-2px);
}

/* 下拉框样式 */
select, .gr-dropdown {
    border-radius: 10px;
    border: 2px solid #e0e0e0;
    padding: 8px 12px;
    transition: all 0.3s ease;
}

select:focus, .gr-dropdown:focus {
    border-color: #FFFFFF;
    box-shadow: 0 0 0 3px rgba(100, 181, 246, 0.1);
}

/* 文本框样式 */
textarea, .gr-textbox {
    border-radius: 10px;
    border: 2px solid #e0e0e0;
    padding: 12px;
    transition: all 0.3s ease;
}

textarea:focus, .gr-textbox:focus {
    border-color: #FFFFFF;
    box-shadow: 0 0 0 3px rgba(100, 181, 246, 0.1);
}

/* 滑块样式 */
.gr-slider {
    border-radius: 10px;
}

/* 标签页样式 */
.tabs {
    background: rgba(255,255,255,0.9);
    border-radius: 10px;
    padding: 15px;
    margin-bottom: 15px;
}

/* 文件上传区域 */
.gr-file, .file-upload, .file-upload-area, .file-drop-zone {
    border: 2px dashed #FFFFFF;
    border-radius: 15px;
    padding: 20px;
    text-align: center;
    background: rgba(100, 181, 246, 0.05);
    transition: all 0.3s ease;
    color: white !important;
}

.gr-file:hover, .file-upload:hover, .file-upload-area:hover, .file-drop-zone:hover {
    background: rgba(100, 181, 246, 0.1);
    border-color: #42a5f5;
    color: white !important;
}

/* 文件上传组件内的文本 */
.gr-file *, .file-upload *, .file-upload-area *, .file-drop-zone * {
    color: white !important;
}

/* 响应式设计 */
@media (max-width: 768px) {
    #col-container {
        max-width: 100%;
        padding: 10px;
    }

    #col-config {
        margin-left: 0;
        margin-top: 20px;
    }
}
"""

# Disabled CSS overrides, kept for reference only (not part of ``css``):
# .wrap {min-width: min(640px, 100vh)}
# #env-desc {max-height: 100px; overflow-y: auto;}
# .textarea {height: 100px; max-height: 100px;}
# #chatbot-tab-all {height: 750px; max-height: min(750px, 100%);}
# #chatbox {height: min(750px, 100%); max-height: min(750px, 100%);}
# #chatbox.block {height: 730px}
# .wrap {max-height: 680px;}
# .scroll-hide {overflow-y: scroll; max-height: 100px;}

DEBUG = False
# Backend type written into every reviewer's backend configuration.
DEFAULT_BACKEND = "openai-chat"
# Number of reviewer tabs/chatbots that are created.
MAX_NUM_PLAYERS = 3  # Changed from 5 to 3 - only reviewers
# Number of reviewer tabs that are visible when the page loads.
DEFAULT_NUM_PLAYERS = 3  # Changed from 5 to 3 - only reviewers
# Mirrors ``arena.environment.phase_index`` after the most recent step.
CURRENT_STEP_INDEX = 0

# SECURITY: the API key must be supplied through the environment and never be
# committed to source control. A key was previously hard-coded here; treat that
# key as leaked and revoke it. The base URL is only a default, so an externally
# configured value is respected.
os.environ.setdefault('OPENAI_BASEURL', 'https://api2.aigcbest.top/v1')
if not os.environ.get('OPENAI_API_KEY'):
    print("Warning: OPENAI_API_KEY is not set. Export it before starting a review.")


def load_examples():
    """Load example configurations from ``examples/*.json``.

    Returns:
        dict: maps each example's ``"name"`` field to the parsed JSON object.
        Files that cannot be read or parsed, or that have no ``"name"`` field,
        are reported on stdout and skipped.
    """
    example_configs = {}
    for example_file in glob("examples/*.json"):
        try:
            with open(example_file, encoding="utf-8") as f:
                example = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError; one
            # broken example file must not prevent the whole app from starting.
            print(f"Example {example_file} could not be loaded ({e}). Skipping.")
            continue
        try:
            example_configs[example["name"]] = example
        except (KeyError, TypeError):
            # TypeError: the top-level JSON value is not an object (e.g. a list).
            print(f"Example {example_file} is missing a name field. Skipping.")
    return example_configs


EXAMPLE_REGISTRY = load_examples()


def auto_detect_paper_type(pdf_file):
    """Automatically detect the paper type of an uploaded PDF file.

    Args:
        pdf_file: the value of the upload component. Either ``None``, an object
            exposing the file path as ``.name``, or the file path itself.

    Returns:
        The value returned by ``PaperTypeClassifier.classify_paper`` for the
        first 2000 characters of the extracted text, or ``"Technical Paper"``
        when no file is given or any step fails.
    """
    default_type = "Technical Paper"
    if pdf_file is None:
        return default_type

    try:
        from pathlib import Path

        # Accept both a tempfile-like wrapper (``.name``) and a plain path.
        document_path = Path(getattr(pdf_file, "name", pdf_file))
        documents = PDFReader().load_data(file=document_path)

        # Combine the text of all loaded documents, separated by spaces.
        paper_content = "".join(doc.text + " " for doc in documents)

        classifier = PaperTypeClassifier()
        return classifier.classify_paper(paper_content[:2000], use_llm=True)
    except Exception as e:
        # Best effort: detection is a convenience, so fall back to the default.
        print(f"Error in auto-detecting paper type: {e}")
        return default_type


def update_all_paper_types(detected_type, *paper_type_boxes):
    """Return one paper-type value per dropdown passed in.

    Args:
        detected_type: either a bare paper type, or a status text such as
            ``"**Paper Type:** Technical Paper (auto-detected)"`` from which the
            type is extracted.
        *paper_type_boxes: the dropdowns to update; only their count is used.

    Returns:
        list: the paper type repeated ``len(paper_type_boxes)`` times.
    """
    if isinstance(detected_type, str) and "**Paper Type:**" in detected_type:
        # Keep what sits between the marker and the opening parenthesis.
        type_part = detected_type.split("**Paper Type:**")[1].split("(")[0].strip()
    else:
        type_part = detected_type

    return [type_part] * len(paper_type_boxes)


def update_paper_type_status(detected_type):
    """Return the Markdown status line announcing an auto-detected paper type."""
    return f"**Paper Type:** {detected_type} (auto-detected)"


def get_player_components(name, visible):
    """Build the settings widgets for one reviewer.

    Must be called inside an active Gradio layout context.

    Args:
        name: reviewer name; stored in a hidden textbox and used in headings.
        visible: visibility of the description box, the accordion and sliders.

    Returns:
        list: ``[role_name, Expertise_config, paper_type_box, accordion,
        temperature, max_tokens]``. The reviewer description textbox is created
        but not returned.
    """
    with gr.Row():
        with gr.Column():
            # Hidden textbox that carries the reviewer name as a component value.
            role_name = gr.Textbox(
                lines=1,
                show_label=False,
                interactive=True,
                visible=False,
                value=name,
            )

            gr.Markdown(f"### 👨‍💼 {name} Settings")

            with gr.Row():
                Expertise_config = gr.Dropdown(
                    choices=[
                        "Machine Learning",
                        "Natural Language Processing",
                        "Computer Vision",
                        "Robotics",
                        "Data Science",
                        "Software Engineering",
                        "Bioinformatics",
                        "Computational Biology",
                        "Genomics",
                        "Proteomics",
                        "Systems Biology",
                        "Synthetic Biology",
                        "Biomedical Engineering",
                        "Neuroscience",
                        "Drug Discovery",
                        "Medical Imaging",
                        "General",
                    ],
                    interactive=True,
                    label="🎯 Expertise Area",
                    show_label=True,
                    value="General",
                    allow_custom_value=True,
                )

                paper_type_box = gr.Dropdown(
                    choices=[
                        "Technical Paper",
                        "Survey Paper",
                        "Application Paper",
                        "Dataset Paper",
                        "Tool Paper",
                    ],
                    interactive=True,
                    label="📝 Paper Type",
                    show_label=True,
                    value="Technical Paper",
                )

            role_desc = gr.Textbox(
                lines=8,
                max_lines=8,
                show_label=True,
                label="📋 Reviewer Description",
                interactive=True,
                visible=visible,
                autoscroll=False,
                value=get_reviewer_description(),
            )

            def update_role_desc(Expertise_config, paper_type):
                """Regenerate the description for the selected expertise/type."""
                # Always use reviewer_write_reviews since only Phase I is run.
                phase = 'reviewer_write_reviews'
                return get_reviewer_description(
                    expertise=Expertise_config, paper_type=paper_type, phase=phase
                )

            Expertise_config.select(
                fn=update_role_desc,
                inputs=[Expertise_config, paper_type_box],
                outputs=[role_desc],
            )
            paper_type_box.select(
                fn=update_role_desc,
                inputs=[Expertise_config, paper_type_box],
                outputs=[role_desc],
            )

        with gr.Column():
            with gr.Accordion(
                f"⚙️ {name} Configuration", open=True, visible=visible
            ) as accordion:
                temperature = gr.Slider(
                    minimum=0.,
                    maximum=2.0,
                    step=0.1,
                    interactive=True,
                    visible=visible,
                    label="🌡️ Temperature (Creativity)",
                    value=1.0,
                )
                max_tokens = gr.Slider(
                    minimum=10,
                    maximum=600,
                    step=10,
                    interactive=True,
                    visible=visible,
                    label="📏 Max Tokens",
                    value=600,
                )

    return [role_name, Expertise_config, paper_type_box, accordion, temperature, max_tokens]


def get_empty_state():
    """Return a fresh ``gr.State`` holding ``{"arena": None}``."""
    return gr.State({"arena": None})


with gr.Blocks(css=css) as demo:
    state = get_empty_state()
    all_components = []

    with gr.Column(elem_id="col-container"):
        gr.Markdown(
            "\n".join(
                [
                    "# 🤖 AI Paper Review System",
                    "## Intelligent Manuscript Evaluation with Multi-Agent Reviewers",
                    "",
                    "**Transform your paper review process with AI-powered reviewers "
                    "that provide comprehensive feedback.**",
                    "",
                    "---",
                ]
            ),
            elem_id="header",
        )

        with gr.Row():
            with gr.Column(elem_id="col-chatbox"):
                gr.Markdown("### 💬 Review Progress")

                # One chatbot tab per reviewer.
                player_chatbots = []
                for i in range(MAX_NUM_PLAYERS):
                    player_name = f"Reviewer {i + 1}"
                    with gr.Tab(f"👨‍💼 {player_name}", visible=(i < DEFAULT_NUM_PLAYERS)):
                        player_chatbot = gr.Chatbot(
                            elem_id=f"chatbox-{i}",
                            visible=i < DEFAULT_NUM_PLAYERS,
                            label=player_name,
                            show_label=False,
                            height=600,  # FIXME: this parameter is not working
                        )
                        player_chatbots.append(player_chatbot)

                all_components += [*player_chatbots]

            with gr.Column(elem_id="col-config"):
                gr.Markdown("### ⚙️ Configuration Panel")

                # players_idx2comp[i] holds reviewer i's components in the order
                # returned by get_player_components, followed by its Tab.
                all_players_components, players_idx2comp = [], {}
                with gr.Blocks():
                    for i in range(MAX_NUM_PLAYERS):
                        player_name = f"Reviewer {i + 1}"
                        with gr.Tab(
                            f"👨‍💼 {player_name}", visible=(i < DEFAULT_NUM_PLAYERS)
                        ) as tab:
                            player_comps = get_player_components(
                                player_name, visible=(i < DEFAULT_NUM_PLAYERS)
                            )

                        players_idx2comp[i] = player_comps + [tab]
                        all_players_components += player_comps + [tab]

                all_components += all_players_components

                gr.Markdown("### 📄 Upload Your Paper")
                upload_file_box = gr.File(
                    visible=True,
                    height=120,
                    label="Choose PDF file",
                    file_types=[".pdf"],
                    file_count="single",
                )

                with gr.Row():
                    auto_detect_btn = gr.Button(
                        "🔍 Auto-detect Paper Type", variant="secondary", size="sm"
                    )
                    paper_type_status = gr.Markdown(
                        "**Paper Type:** Technical Paper (default)"
                    )

                with gr.Row():
                    btn_step = gr.Button("🚀 Start Review", variant="primary", size="lg")
                    btn_restart = gr.Button("🔄 Reset", variant="secondary", size="lg")

                all_components += [
                    upload_file_box,
                    auto_detect_btn,
                    paper_type_status,
                    btn_step,
                    btn_restart,
                ]

    def _convert_to_chatbot_output(all_messages, display_recv=False):
        """Convert arena messages into Gradio Chatbot ``[user, bot]`` pairs.

        Args:
            all_messages: iterable of objects exposing ``agent_name``,
                ``content`` and ``visible_to``.
            display_recv: when true, the recipients are shown after the name.

        Returns:
            list: one ``[user_message, bot_message]`` pair per usable message.
            Moderator messages fill the user slot, all others the bot slot.
            Returns ``[]`` if anything goes wrong during conversion.
        """
        chatbot_output = []
        try:
            for i, message in enumerate(all_messages):
                # Skip entries that are None or not message-like.
                if not hasattr(message, 'agent_name') or not hasattr(message, 'content'):
                    print(f"Warning: Invalid message at index {i}: {message}")
                    continue

                agent_name, msg, recv = (
                    message.agent_name,
                    message.content,
                    str(message.visible_to),
                )

                # Skip empty or None content.
                if not msg or msg.strip() == "":
                    print(f"Warning: Empty message from {agent_name}")
                    continue

                # Collapse each run of newlines into a single HTML line break.
                # NOTE(review): the replacement literal was garbled in the
                # received source; "<br>" is assumed - confirm.
                new_msg = re.sub(r"\n+", "<br>", msg.strip())

                if display_recv:
                    new_msg = f"**{agent_name} (-> {recv})**: {new_msg}"
                else:
                    new_msg = f"**{agent_name}**: {new_msg}"

                if agent_name == "Moderator":
                    chatbot_output.append([new_msg, None])
                else:
                    chatbot_output.append([None, new_msg])
        except Exception as e:
            # Best effort: a rendering problem must not break the review run.
            print(f"Error in _convert_to_chatbot_output: {e}")
            print(f"all_messages type: {type(all_messages)}")
            print(f"all_messages content: {all_messages}")
            return []

        return chatbot_output

    def _create_arena_config_from_components(all_comps: dict):
        """Build a ``PaperReviewArena`` from the current component values.

        Args:
            all_comps: maps each input component to its current value.

        Returns:
            The arena, containing one ``Reviewer`` per reviewer tab followed by
            a ``PaperExtractorPlayer`` for the uploaded PDF.
        """
        env_desc = const.GLOBAL_PROMPT
        paper_pdf_path = all_comps[upload_file_box]

        # Step 1: Initialize the players
        num_players = MAX_NUM_PLAYERS

        # Placeholder values; these fields can be ignored for the demo.
        conference = "EMNLP2024"
        paper_decision = "Accept"
        data_dir = ''
        paper_id = "12345"

        args = Namespace(
            openai_client_type="openai",
            experiment_name="test",
            max_num_words=16384,
        )

        # Phase I: only reviewers are active.
        players = []

        experiment_setting = {
            "paper_id": paper_id,
            "paper_decision": paper_decision,
            "players": {
                # Paper Extractor is a special player; its settings are empty.
                "Paper Extractor": [{}],
                # One settings dict is appended below for each reviewer.
                "Reviewer": [],
            },
        }

        backend_type = DEFAULT_BACKEND

        for i in range(num_players):
            # Read this reviewer's values, skipping the layout-only components
            # (Accordion, Tab), which carry no value.
            role_name, expertise_config, paper_type, temperature, max_tokens = (
                all_comps[c]
                for c in players_idx2comp[i]
                if not isinstance(c, (gr.Accordion, gr.Tab))
            )

            # Fixed defaults for attributes that are not exposed in the UI.
            experiment_setting["players"]['Reviewer'].append({
                "is_benign": None,
                "is_knowledgeable": None,
                "is_responsible": None,
                "knows_authors": 'unfamous',
            })

            # The description textbox is not among the components returned by
            # get_player_components, so the description is rebuilt from the
            # selected expertise and paper type.
            role_desc = get_reviewer_description(
                expertise=expertise_config, paper_type=paper_type
            )

            player_config = AgentConfig(**{
                "name": role_name,
                "role_desc": role_desc,
                "global_prompt": env_desc,
                "backend": {
                    "backend_type": backend_type,
                    "temperature": temperature,
                    "max_tokens": max_tokens,
                },
            })

            player = Reviewer(
                data_dir=data_dir, conference=conference, args=args, **player_config
            )
            players.append(player)

        # Add the paper extractor after the reviewers.
        paper_extractor_config = get_paper_extractor_config(max_tokens=2048)
        paper_extractor = PaperExtractorPlayer(
            paper_pdf_path=paper_pdf_path,
            data_dir=data_dir,
            paper_id=paper_id,
            paper_decision=paper_decision,
            args=args,
            conference=conference,
            **paper_extractor_config,
        )
        players.append(paper_extractor)

        player_names = [player.name for player in players]

        # Step 2: Initialize the environment
        env = PaperReview(
            player_names=player_names,
            paper_decision=paper_decision,
            paper_id=paper_id,
            args=args,
            experiment_setting=experiment_setting,
        )

        # Step 3: Initialize the Arena
        arena = PaperReviewArena(
            players=players, environment=env, args=args, global_prompt=env_desc
        )
        return arena

    def step_game(all_comps: dict):
        """Run one arena step and stream the resulting UI updates.

        First yields an update that disables both buttons, then (after the
        step) an update with the refreshed reviewer chatbots and buttons. The
        arena is created on the first call and cached in the session state.

        Args:
            all_comps: maps each input component to its current value.

        Raises:
            Exception: whatever arena creation or ``arena.step()`` raises; the
            buttons are re-enabled before the error propagates.
        """
        global CURRENT_STEP_INDEX
        yield {
            btn_step: gr.update(value="Running...", interactive=False),
            btn_restart: gr.update(interactive=False),
        }

        cur_state = all_comps[state]

        try:
            # Create the arena lazily on the first step.
            if cur_state["arena"] is None:
                cur_state["arena"] = _create_arena_config_from_components(all_comps)
            arena = cur_state["arena"]

            # TODO: run continuously
            timestep = arena.step()
        except Exception as e:
            print(f"Error while running review step: {e}")
            # Do not leave the UI stuck on a disabled "Running..." button.
            yield {
                btn_step: gr.update(value="Retry", interactive=True),
                btn_restart: gr.update(interactive=True),
                state: cur_state,
            }
            raise

        CURRENT_STEP_INDEX = int(arena.environment.phase_index)

        # Always build the button/state update so the final yield is defined
        # even when the step returned nothing.
        finished = bool(timestep) and bool(timestep.terminal)
        update_dict = {
            btn_step: gr.update(value="Next Step", interactive=not finished),
            btn_restart: gr.update(interactive=True),
            state: cur_state,
        }

        if timestep:
            # Reviewer names follow the "Reviewer <n>" pattern used for tabs.
            player_name_to_chatbot = {
                f"Reviewer {idx + 1}": chatbot
                for idx, chatbot in enumerate(player_chatbots)
            }

            for player in arena.players:
                player_name = player.name
                chatbot = player_name_to_chatbot.get(player_name)
                if chatbot is None:
                    # Players without a chatbot tab (e.g. the paper extractor).
                    continue
                try:
                    player_messages = arena.environment.get_messages_from_player(
                        player_name
                    )
                    update_dict[chatbot] = _convert_to_chatbot_output(player_messages)
                except Exception as e:
                    print(f"Error updating player {player_name} chatbot: {e}")
                    # Show an empty chat rather than failing the whole update.
                    update_dict[chatbot] = []

        # TODO (Ahren): auto run - keep stepping until timestep.terminal.
        yield update_dict

    def restart_game(all_comps: dict):
        """Drop the current arena and reset the UI to its initial state.

        Args:
            all_comps: maps each input component to its current value.
        """
        global CURRENT_STEP_INDEX
        CURRENT_STEP_INDEX = 0

        cur_state = all_comps[state]
        cur_state["arena"] = None

        # Clear the previous run's reviews and lock the buttons while resetting.
        first_update = {chatbot: [] for chatbot in player_chatbots}
        first_update.update({
            btn_restart: gr.update(interactive=False),
            btn_step: gr.update(interactive=False),
            state: cur_state,
        })
        yield first_update

        yield {
            # Same label the button is created with.
            btn_step: gr.update(value="🚀 Start Review", interactive=True),
            btn_restart: gr.update(interactive=True),
            upload_file_box: gr.update(value=None),
            state: cur_state,
        }

    # Layout-only components carry no value and cannot be event inputs.
    all_components = [
        comp for comp in all_components if not isinstance(comp, (gr.Accordion, gr.Tab))
    ]

    def _disable_step_button(state):
        """Disable the step button once settings change after a run started."""
        if state["arena"] is not None:
            return gr.update(interactive=False)
        return gr.update()

    # If any Textbox, Slider, Checkbox, Dropdown or Radio changes while an
    # arena exists, the step button is disabled.
    for comp in all_components:
        if (
            isinstance(comp, (gr.Textbox, gr.Slider, gr.Checkbox, gr.Dropdown, gr.Radio))
            and comp is not upload_file_box
        ):
            comp.change(_disable_step_button, state, btn_step)

    def update_all_reviewer_descriptions(paper_type, *reviewer_comps):
        """Return an update that disables the step button.

        Currently not bound to any event.
        """
        return gr.update(interactive=False)

    # Paper type dropdown (index 2 of each reviewer's components) of every
    # reviewer that is visible by default.
    all_paper_type_boxes = [
        players_idx2comp[i][2]
        for i in range(min(MAX_NUM_PLAYERS, DEFAULT_NUM_PLAYERS))
    ]

    def auto_detect_and_update(pdf_file):
        """Detect the paper type and return dropdown values plus status text."""
        detected_type = auto_detect_paper_type(pdf_file)
        if pdf_file is None:
            # Nothing was analysed (e.g. the upload was cleared by a reset).
            status_text = f"**Paper Type:** {detected_type} (default)"
        else:
            status_text = update_paper_type_status(detected_type)
        # One value per dropdown, followed by the status line.
        return [detected_type] * len(all_paper_type_boxes) + [status_text]

    # Auto-detect when a file is uploaded or cleared.
    upload_file_box.change(
        fn=auto_detect_and_update,
        inputs=[upload_file_box],
        outputs=all_paper_type_boxes + [paper_type_status],
    )

    # Manual auto-detect button.
    auto_detect_btn.click(
        fn=auto_detect_and_update,
        inputs=[upload_file_box],
        outputs=all_paper_type_boxes + [paper_type_status],
    )

    # Passing the inputs as a set makes Gradio call the handler with a single
    # dict that maps each component to its value.
    btn_step.click(
        step_game,
        set(all_components + [state]),
        [*player_chatbots, btn_step, btn_restart, state, upload_file_box],
    )
    btn_restart.click(
        restart_game,
        set(all_components + [state]),
        [*player_chatbots, btn_step, btn_restart, state, upload_file_box],
    )

demo.queue()

if __name__ == "__main__":
    demo.launch()