"""Gradio tool for manually scoring image question/answer pairs.

An annotator logs in as ``<name>_<file index>``. For each image the tool shows
18 instructions, each with a precise / normal / creative answer, and asks for
four ratings per instruction (one per answer plus one for the instruction
itself). Ratings are appended to a per-file progress CSV.
"""
import gradio as gr
import pandas as pd
import os
os.environ['GRADIO_TEMP_DIR'] = "cache"
from PIL import Image

# Layout of the rating form.
NUM_QUESTIONS = 18
RADIOS_PER_QUESTION = 4  # precise, normal, creative, keep-the-question
NUM_RADIOS = NUM_QUESTIONS * RADIOS_PER_QUESTION
# Questions whose radios do not offer the "discard" choice.
NO_DISCARD_QUESTIONS = (14, 17)

# Radio choice texts (also what gets parsed back into numeric scores).
DISCARD = "1:丢弃"
REWRITE = "2:保留但需重写"
KEEP = "3:保留"
SCORE_BY_CHOICE = {DISCARD: 1, REWRITE: 2, KEEP: 3}

FINISHED_MESSAGE = "所有数据已评估完毕!"
KEEP_QUESTION_LABEL = "===========❤该问题是否保留?==========="

PROGRESS_COLUMNS = [
    "username", "image", "description", "ins_idx", "instruction", "type",
    "answer", "score_precise", "score_normal", "score_creative", "score_ins",
]

# Paths to the data and the images, resolved relative to this file.
current_dir = os.path.dirname(os.path.abspath(__file__))
image_dir = os.path.join(current_dir, "mixed_images_1000")
# Template for the "|"-separated data file; "{}" is the file index.
# Columns: image|description|ins_idx|instruction|type|answer|ans_t|score
data_path = os.path.join(current_dir, "data_1000", "score_answer_full_one_stop_{}.csv")

# Session state, populated by display_content().
progress_df = None
df = None
progress_file = None


def load_image(image_name):
    """Open the image called *image_name* from the image directory."""
    return Image.open(os.path.join(image_dir, image_name))


def _split_user_id(raw_name):
    """Split ``'<name>_<file index>'`` into ``(name, file_idx)``.

    The split happens at the last underscore, so names that themselves contain
    underscores are accepted.

    Raises:
        gr.Error: if either part is missing.
    """
    name, sep, file_idx = (raw_name or "").strip().rpartition("_")
    if not sep or not name or not file_idx:
        raise gr.Error("请填写文件编号!(格式:姓名_文件编号)")
    return name, file_idx


def _answer_of(instance, answer_type):
    """Return the answer text of the row in *instance* whose ans_t is *answer_type*."""
    return instance[instance['ans_t'] == answer_type]['answer'].values[0]


def get_next_instance(username):
    """Return the next image *username* has not scored yet, with its QA pairs.

    Returns:
        ``(image, qa_pairs)`` where ``qa_pairs`` is a list of 18 dicts (one per
        ``ins_idx``), or ``(None, None)`` when the user has scored every image.
    """
    user_progress = progress_df[progress_df["username"] == username]
    completed_images = user_progress["image"].unique() if not user_progress.empty else []
    remaining_images = df[~df["image"].isin(completed_images)]["image"].unique()
    if len(remaining_images) == 0:
        # Always a 2-tuple: both callers unpack exactly two values.
        return None, None

    current_image = remaining_images[0]
    image_rows = df[df['image'] == current_image]

    qa_pairs = []
    for idx in range(NUM_QUESTIONS):
        instance = image_rows[image_rows['ins_idx'] == idx]
        qa_pairs.append({
            "image": current_image,
            "instruction": instance['instruction'].values[0],
            "ins_idx": idx,
            "type": instance['type'].values[0],
            "precise_answer": _answer_of(instance, 'precise'),
            "normal_answer": _answer_of(instance, 'normal'),
            "creative_answer": _answer_of(instance, 'creative'),
            # Filled in by the annotator.
            "score_precise": None,
            "score_normal": None,
            "score_creative": None,
        })
    return load_image(current_image), qa_pairs


def _progress_text(name, show_progress_file=False):
    """Build the greeting / progress line shown above the image."""
    user_progress = progress_df[progress_df["username"] == name]
    user_completed_images = user_progress["image"].unique() if not user_progress.empty else []
    # NOTE(review): "remaining" subtracts images completed by ANY user, while the
    # next image is chosen per user - confirm this is the intended number.
    completed_images = progress_df["image"].unique() if not progress_df.empty else []
    total = len(df['image'].unique())
    remaining_images_num = total - len(completed_images)
    text = f"欢迎, {name}!请选择以下问题的答案:(进度:完成{len(user_completed_images)}//剩余{remaining_images_num}//总{total})"
    if show_progress_file:
        text += f" 评分数据:{progress_file})"
    return text


def _question_components(qa_pairs):
    """Return ``(labels, radio_updates)`` for the 18 questions of one image.

    ``labels`` has one markdown string per question; ``radio_updates`` has four
    ``gr.update`` objects per question, in form order.
    """
    labels = [
        f"{i}. {qa_pairs[i]['type']}{qa_pairs[i]['instruction']}"
        for i in range(NUM_QUESTIONS)
    ]
    updates = []
    for i in range(NUM_QUESTIONS):
        if i in NO_DISCARD_QUESTIONS:
            choices = [REWRITE, KEEP]
        else:
            choices = [DISCARD, REWRITE, KEEP]
        radio_labels = (
            f"precise: {qa_pairs[i]['precise_answer']}",
            f"normal: {qa_pairs[i]['normal_answer']}",
            f"creative: {qa_pairs[i]['creative_answer']}",
            KEEP_QUESTION_LABEL,
        )
        for radio_label in radio_labels:
            updates.append(gr.update(label=radio_label, choices=list(choices), value=None))
    return labels, updates


def display_content(name):
    """Log a user in and show the first image they still have to score.

    Args:
        name: text of the form ``'<name>_<file index>'``.

    Returns:
        Values for ``[output_text, name_input, submit_button, output_image,
        next_button] + question_labels + option_radios``.

    Raises:
        gr.Error: if *name* is malformed or the data file does not exist.
    """
    global progress_df, progress_file, df
    name, file_idx = _split_user_id(name)

    # Format into a local: the module-level template must stay intact so that a
    # later login with a different file index still resolves correctly.
    csv_path = data_path.format(file_idx)
    if not os.path.exists(csv_path):
        raise gr.Error(f"找不到数据文件:{csv_path}")
    df = pd.read_csv(csv_path, sep="|")

    # Create the progress file on first use, otherwise resume from it.
    progress_file = os.path.join(current_dir, "val_score", f"ser_progress_{file_idx}.csv")
    if not os.path.exists(progress_file):
        os.makedirs(os.path.dirname(progress_file), exist_ok=True)
        progress_df = pd.DataFrame(columns=PROGRESS_COLUMNS)
        progress_df.to_csv(progress_file, index=False)
    else:
        progress_df = pd.read_csv(progress_file)

    image, qa_pairs = get_next_instance(name)
    if image is None:
        # Nothing left for this user: keep the login widgets, hide "next".
        return [
            FINISHED_MESSAGE,
            gr.update(visible=True),
            gr.update(visible=True),
            None,
            gr.update(visible=False),
        ] + [None] * (NUM_QUESTIONS + NUM_RADIOS)

    question_labels, question_updates = _question_components(qa_pairs)
    return [
        _progress_text(name),
        gr.update(visible=False),
        gr.update(visible=False),
        image,
        gr.update(visible=True),
    ] + question_labels + question_updates


def next_image(name, *selected_options):
    """Save the ratings for the current image and show the next one.

    Args:
        name: text of the form ``'<name>_<file index>'``.
        *selected_options: the 72 radio values, four per question.

    Returns:
        Values for ``[output_image, output_text] + question_labels +
        option_radios``.
    """
    global progress_df
    name, _ = _split_user_id(name)
    finished = [None, FINISHED_MESSAGE] + [None] * (NUM_QUESTIONS + NUM_RADIOS)

    # The "current" image is the first one this user has not saved yet.
    _, qa_pairs = get_next_instance(name)
    if qa_pairs is None:
        return finished

    new_rows = []
    for i in range(NUM_QUESTIONS):
        base = i * RADIOS_PER_QUESTION
        new_rows.append({
            "username": name,
            "image": qa_pairs[i]["image"],
            "description": "待完成",
            "ins_idx": qa_pairs[i]["ins_idx"],
            "instruction": qa_pairs[i]["instruction"],
            "type": qa_pairs[i]["type"],
            # NOTE(review): the progress file is created with an "answer" column
            # but rows are written under "answers"; kept as-is so existing
            # progress files keep their schema - confirm which one is intended.
            "answers": "待完成",
            # Unanswered / unknown choices are stored as 0.
            "score_precise": SCORE_BY_CHOICE.get(selected_options[base], 0),
            "score_normal": SCORE_BY_CHOICE.get(selected_options[base + 1], 0),
            "score_creative": SCORE_BY_CHOICE.get(selected_options[base + 2], 0),
            "score_ins": SCORE_BY_CHOICE.get(selected_options[base + 3], 0),
        })
    progress_df = pd.concat([progress_df, pd.DataFrame(new_rows)], ignore_index=True)

    # Persist after every image so no work is lost.
    print("正在保存数据...")
    progress_df.to_csv(progress_file, index=False)
    print(f"数据已保存到 {progress_file}")

    # Move on to the next image.
    image, qa_pairs = get_next_instance(name)
    if image is None:
        return finished

    question_labels, question_updates = _question_components(qa_pairs)
    return [image, _progress_text(name, show_progress_file=True)] + question_labels + question_updates


def validate_input(*selected_options):
    """Check the "keep this question?" ratings against the selection rules.

    Only the fourth radio of each question is inspected. Rules, in order:
    every question is answered; exactly two of questions 0-6 ([a]) and exactly
    two of questions 7-13 ([b]) are not discarded; exactly one of questions 15
    and 16 is not discarded.

    Returns:
        ``(is_valid, message)``; the message always ends with a summary of the
        current selections.
    """
    keep_choice = [
        selected_options[i * RADIOS_PER_QUESTION + 3] for i in range(NUM_QUESTIONS)
    ]

    def summary(indices):
        return ' '.join(f"({i})-{keep_choice[i]}" for i in indices)

    msg_ans = (
        "\n答案:\n"
        f"[a]: {summary(range(7))}\n"
        f"[b]: {summary(range(7, 14))}\n"
        f"[c]: {summary(range(14, 15))}\n"
        f"[d]: {summary(range(15, 18))}"
    )

    # Every question needs a keep/discard decision.
    for i, choice in enumerate(keep_choice):
        if choice is None:
            return False, f"请填写所有问题的答案!第{i}个问题未填写!\n{msg_ans}"

    a_num = sum(1 for choice in keep_choice[:7] if choice != DISCARD)
    b_num = sum(1 for choice in keep_choice[7:14] if choice != DISCARD)
    if a_num != 2 or b_num != 2:
        return False, f"请保留两个[a]类问题和两个[b]类问题!([a]:{a_num}, [b]:{b_num})\n{msg_ans}"

    # Questions 15 and 16: exactly one must be kept.
    if (keep_choice[15] == DISCARD) == (keep_choice[16] == DISCARD):
        return False, f"15,16问题中保留一个!\n{msg_ans}"

    return True, msg_ans


def update_button_and_error_message(*selected_options):
    """Enable the "next" button only when the form is valid; else show why."""
    is_valid, error_message = validate_input(*selected_options)
    return gr.update(interactive=is_valid), gr.update(value=error_message, visible=not is_valid)


# Build the Gradio interface.
with gr.Blocks() as demo:
    output_text = gr.Markdown()
    name_input = gr.Textbox(label="请输入你的名字,严格按照格式'姓名_文件编号'填写,如‘ysm_1’,目前只评估0和9", placeholder="姓名_文件编号,如‘ysm_1’")
    submit_button = gr.Button("确定")
    output_image = gr.Image()

    # 18 questions, each with a markdown label and 4 radios.
    option_radios = []
    question_labels = []
    for i in range(NUM_QUESTIONS):
        question_label = gr.Markdown("**问题**")
        question_labels.append(question_label)
        with gr.Column() as col:
            for j in range(RADIOS_PER_QUESTION):
                radio = gr.Radio(choices=[], label="答案评分")
                option_radios.append(radio)

    next_button = gr.Button("下一张图像", visible=False)
    error_message = gr.Markdown(visible=False)

    # Re-validate the whole form whenever any radio changes.
    for radio in option_radios:
        radio.change(update_button_and_error_message, inputs=option_radios, outputs=[next_button, error_message])

    # Login: fills the text, the image and all 18 questions.
    submit_button.click(
        display_content,
        inputs=name_input,
        outputs=[output_text, name_input, submit_button, output_image, next_button] + question_labels + option_radios,
    )

    # "Next image": saves the ratings, then swaps the image and questions.
    next_button.click(
        next_image,
        inputs=[name_input] + option_radios,
        outputs=[output_image, output_text] + question_labels + option_radios,
    )

if __name__ == "__main__":
    demo.launch(share=True, debug=True)