Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pandas as pd | |
| import os | |
| os.environ['GRADIO_TEMP_DIR'] = "cache" | |
| from PIL import Image | |
| # 数据和图像的路径 | |
| # file_idx = 1 | |
| # 获取当前文件的绝对路径 | |
| current_dir = os.path.dirname(os.path.abspath(__file__)) | |
| image_dir = os.path.join(current_dir, f"mixed_images_1000") | |
| # image|description|ins_idx|instruction|type|answer|ans_t|score | |
| data_path = os.path.join(current_dir, "data_1000", "score_answer_full_one_stop_{}.csv") | |
| progress_df = None | |
| df = None | |
| progress_file = None | |
| # 读取图像的函数 | |
| def load_image(image_name): | |
| global image_dir | |
| image_path = os.path.join(image_dir, image_name) | |
| return Image.open(image_path) | |
| def get_next_instance(username): | |
| # 检查用户进度 | |
| user_progress = progress_df[progress_df["username"] == username] | |
| # 获取下一个未评估的图像 | |
| completed_images = user_progress["image"].unique() if not user_progress.empty else [] | |
| remaining_images = df[~df["image"].isin(completed_images)]["image"].unique() | |
| if len(remaining_images) == 0: | |
| return None, None, "所有数据已评估完毕!" | |
| current_image = remaining_images[0] | |
| filtered_df = df[df['image'] == current_image] | |
| # 构建QA对字典 | |
| # "username", "image", "description", "ins_idx", "instruction", "type", "answer", "score_precise", "score_normal", "score_creative" | |
| qa_pairs = [] | |
| for idx in range(18): | |
| instance = filtered_df[filtered_df['ins_idx'] == idx] | |
| question = instance['instruction'].values[0] | |
| precise_answer = instance[instance['ans_t'] == 'precise']['answer'].values[0] | |
| normal_answer = instance[instance['ans_t'] == 'normal']['answer'].values[0] | |
| creative_answer = instance[instance['ans_t'] == 'creative']['answer'].values[0] | |
| qa_pairs.append({ | |
| "image": current_image, | |
| "instruction": question, | |
| "ins_idx": idx, | |
| "type": instance['type'].values[0], | |
| "precise_answer": precise_answer, | |
| "normal_answer": normal_answer, | |
| "creative_answer": creative_answer, | |
| "score_precise": None, # 让用户填写 | |
| "score_normal": None, # 让用户填写 | |
| "score_creative": None # 让用户填写 | |
| }) | |
| return load_image(current_image), qa_pairs | |
| def display_content(name): | |
| # global current_image_index | |
| # current_image_index = 0 # 重置为第一个图像 | |
| name, file_idx = name.split("_") | |
| assert file_idx is not None, "请填写文件编号!" | |
| # 初始化进度文件,如果不存在则创建 | |
| global progress_df, progress_file, df, data_path | |
| data_path = data_path.format(file_idx) | |
| df = pd.read_csv(data_path, sep="|") | |
| progress_file = os.path.join(current_dir, "val_score", f"ser_progress_{file_idx}.csv") | |
| if not os.path.exists(progress_file): | |
| # image|description|ins_idx|instruction|type|answer|ans_t|score | |
| progress_df = pd.DataFrame(columns=["username", "image", "description", "ins_idx", "instruction", "type", "answer", "score_precise", "score_normal", "score_creative", "score_ins"]) | |
| progress_df.to_csv(progress_file, index=False) | |
| else: | |
| progress_df = pd.read_csv(progress_file) | |
| image, qa_pairs = get_next_instance(name) | |
| # content = image_question_list[current_image_index] | |
| # 用户名 + 进度条 | |
| user_progress = progress_df[progress_df["username"] == name] | |
| user_completed_images = user_progress["image"].unique() if not user_progress.empty else [] | |
| completed_images = progress_df["image"].unique() if not progress_df.empty else [] | |
| remaining_images_num = len(df['image'].unique()) - len(completed_images) | |
| custom_text = f"欢迎, {name}!请选择以下问题的答案:(进度:完成{len(user_completed_images)}//剩余{remaining_images_num}//总{len(df['image'].unique())})" | |
| # 初始化所有问题的标签和选项 | |
| question_labels = [f"{i}. {qa_pairs[i]['type']}{qa_pairs[i]['instruction']}" for i in range(18)] | |
| question_updates = [] | |
| for i in range(18): | |
| if i == 14 or i == 17: | |
| question_updates.append(gr.update(label=f"precise: {qa_pairs[i]['precise_answer']}", choices=["2:保留但需重写", "3:保留"], value=None)) | |
| question_updates.append(gr.update(label=f"normal: {qa_pairs[i]['normal_answer']}", choices=["2:保留但需重写", "3:保留"], value=None)) | |
| question_updates.append(gr.update(label=f"creative: {qa_pairs[i]['creative_answer']}", choices=["2:保留但需重写", "3:保留"], value=None)) | |
| question_updates.append(gr.update(label="===========❤该问题是否保留?===========", choices=["2:保留但需重写", "3:保留"], value=None)) | |
| else: | |
| question_updates.append(gr.update(label=f"precise: {qa_pairs[i]['precise_answer']}", choices=["1:丢弃", "2:保留但需重写", "3:保留"], value=None)) | |
| question_updates.append(gr.update(label=f"normal: {qa_pairs[i]['normal_answer']}", choices=["1:丢弃", "2:保留但需重写", "3:保留"], value=None)) | |
| question_updates.append(gr.update(label=f"creative: {qa_pairs[i]['creative_answer']}", choices=["1:丢弃", "2:保留但需重写", "3:保留"], value=None)) | |
| question_updates.append(gr.update(label="===========❤该问题是否保留?===========", choices=["1:丢弃", "2:保留但需重写", "3:保留"], value=None)) | |
| return [custom_text, gr.update(visible=False), gr.update(visible=False), image, gr.update(visible=True)] + question_labels + question_updates | |
| def next_image(name, *selected_options): | |
| name, file_idx = name.split("_") | |
| # 获取当前图像和相关QA对 | |
| _, qa_pairs = get_next_instance(name) | |
| # "username", "image", "description", "ins_idx", "instruction", "type", | |
| # "answers", "score_precise", "score_normal", "score_creative" | |
| def convert_score(ori_text): | |
| if ori_text == "1:丢弃": | |
| return 1 | |
| elif ori_text == "2:保留但需重写": | |
| return 2 | |
| elif ori_text == "3:保留": | |
| return 3 | |
| else: | |
| return 0 | |
| # 保存用户评分数据 | |
| global progress_df, progress_file | |
| new_rows = [] | |
| for i in range(18): | |
| # 保存评分结果 | |
| new_rows.append({ | |
| "username": name, | |
| "image": qa_pairs[i]["image"], | |
| "description": "待完成", | |
| "ins_idx": qa_pairs[i]["ins_idx"], | |
| "instruction": qa_pairs[i]["instruction"], | |
| "type": qa_pairs[i]["type"], | |
| "answers": "待完成", | |
| "score_precise": convert_score(selected_options[i*4]), | |
| "score_normal": convert_score(selected_options[i*4+1]), | |
| "score_creative": convert_score(selected_options[i*4+2]), | |
| "score_ins": convert_score(selected_options[i*4+3]) | |
| }) | |
| progress_df = pd.concat([progress_df, pd.DataFrame(new_rows)], ignore_index=True) | |
| # 如果处理完所有图像,保存结果 | |
| print("正在保存数据...") | |
| progress_df.to_csv(progress_file, index=False) | |
| print(f"数据已保存到 {progress_file}") | |
| # 切换到下一张图像 | |
| image, qa_pairs = get_next_instance(name) | |
| if image is None: | |
| return [None, "所有数据已评估完毕!"] + [None] * 90 | |
| # 用户名 + 进度条 | |
| user_progress = progress_df[progress_df["username"] == name] | |
| user_completed_images = user_progress["image"].unique() if not user_progress.empty else [] | |
| completed_images = progress_df["image"].unique() if not progress_df.empty else [] | |
| remaining_images_num = len(df['image'].unique()) - len(completed_images) | |
| custom_text = f"欢迎, {name}!请选择以下问题的答案:(进度:完成{len(user_completed_images)}//剩余{remaining_images_num}//总{len(df['image'].unique())}) 评分数据:{progress_file})" | |
| question_labels = [f"{i}. {qa_pairs[i]['type']}{qa_pairs[i]['instruction']}" for i in range(18)] | |
| question_updates = [] | |
| for i in range(18): | |
| if i == 14 or i == 17: | |
| question_updates.append(gr.update(label=f"precise: {qa_pairs[i]['precise_answer']}", choices=["2:保留但需重写", "3:保留"], value=None)) | |
| question_updates.append(gr.update(label=f"normal: {qa_pairs[i]['normal_answer']}", choices=["2:保留但需重写", "3:保留"], value=None)) | |
| question_updates.append(gr.update(label=f"creative: {qa_pairs[i]['creative_answer']}", choices=["2:保留但需重写", "3:保留"], value=None)) | |
| question_updates.append(gr.update(label="===========❤该问题是否保留?===========", choices=["2:保留但需重写", "3:保留"], value=None)) | |
| else: | |
| question_updates.append(gr.update(label=f"precise: {qa_pairs[i]['precise_answer']}", choices=["1:丢弃", "2:保留但需重写", "3:保留"], value=None)) | |
| question_updates.append(gr.update(label=f"normal: {qa_pairs[i]['normal_answer']}", choices=["1:丢弃", "2:保留但需重写", "3:保留"], value=None)) | |
| question_updates.append(gr.update(label=f"creative: {qa_pairs[i]['creative_answer']}", choices=["1:丢弃", "2:保留但需重写", "3:保留"], value=None)) | |
| question_updates.append(gr.update(label="===========❤该问题是否保留?===========", choices=["1:丢弃", "2:保留但需重写", "3:保留"], value=None)) | |
| return [image, custom_text] + question_labels + question_updates | |
| def validate_input(*selected_options): | |
| # 展示选择的答案 | |
| # [a]: (0)-1, (1)-1, (2)-2,...., (6)-1 | |
| # [b]: (7)-1, (8)-1, (9)-2,...., (13)-1 | |
| # [c]: (14)-1 | |
| # [d]: (15)-1, (16)-1, (17)-1 | |
| ans_a = [f"({i})-{selected_options[i*4+3]}" for i in range(7)] | |
| ans_b = [f"({i})-{selected_options[i*4+3]}" for i in range(7, 14)] | |
| ans_c = [f"({i})-{selected_options[i*4+3]}" for i in range(14, 15)] | |
| ans_d = [f"({i})-{selected_options[i*4+3]}" for i in range(15, 18)] | |
| msg_ans = f""" | |
| 答案: | |
| [a]: {' '.join(ans_a)} | |
| [b]: {' '.join(ans_b)} | |
| [c]: {' '.join(ans_c)} | |
| [d]: {' '.join(ans_d)}""" | |
| # 验证用户输入的答案是否完整 | |
| for i in range(18): | |
| # if None in selected_options[i*4:i*4+4]: | |
| # return False, f"请填写所有问题的答案!第{i}个问题未填写!" | |
| if selected_options[i*4+3] is None: | |
| return False, f"请填写所有问题的答案!第{i}个问题未填写!\n{msg_ans}" | |
| a_num, b_num = 0, 0 | |
| for i in range(18): | |
| if i < 7 and selected_options[i*4+3] != "1:丢弃": | |
| a_num += 1 | |
| if 7 <= i < 14 and selected_options[i*4+3] != "1:丢弃": | |
| b_num += 1 | |
| if a_num != 2 or b_num != 2: | |
| return False, f"请保留两个[a]类问题和两个[b]类问题!([a]:{a_num}, [b]:{b_num})\n{msg_ans}" | |
| if (selected_options[15*4+3] == "1:丢弃" and selected_options[16*4+3] == "1:丢弃") or \ | |
| (selected_options[15*4+3] != "1:丢弃" and selected_options[16*4+3] != "1:丢弃"): | |
| return False, f"15,16问题中保留一个!\n{msg_ans}" | |
| return True, msg_ans | |
| # 交互函数,用于更新按钮状态和错误消息 | |
| def update_button_and_error_message(*selected_options): | |
| is_valid, error_message = validate_input(*selected_options) | |
| # return gr.update(visible=not is_valid), error_message | |
| return gr.update(interactive=is_valid), gr.update(value=error_message, visible=not is_valid) | |
| # 创建Gradio界面 | |
| with gr.Blocks() as demo: | |
| output_text = gr.Markdown() | |
| name_input = gr.Textbox(label="请输入你的名字,严格按照格式'姓名_文件编号'填写,如‘ysm_1’,目前只评估0和9", placeholder="姓名_文件编号,如‘ysm_1’") | |
| submit_button = gr.Button("确定") | |
| output_image = gr.Image() | |
| # 动态生成18个问题,每个问题包含4个子问题 | |
| option_radios = [] | |
| question_labels = [] | |
| for i in range(18): | |
| question_label = gr.Markdown(f"**问题**") | |
| question_labels.append(question_label) | |
| with gr.Column() as col: | |
| for j in range(4): | |
| radio = gr.Radio(choices=[], label=f"答案评分") | |
| option_radios.append(radio) | |
| next_button = gr.Button("下一张图像", visible=False) | |
| error_message = gr.Markdown(visible=False) | |
| # 提交按钮状态和错误消息绑定到验证函数 | |
| option_radios_values = [radio for radio in option_radios] | |
| for radio in option_radios_values: | |
| radio.change(update_button_and_error_message, inputs=option_radios_values, outputs=[next_button, error_message]) | |
| # 显示内容初始化,包括文本、图像和18个问题 | |
| submit_button.click(display_content, inputs=name_input, outputs=[output_text, name_input, submit_button, output_image, next_button] + question_labels + option_radios) | |
| # 下一张图像按钮,切换图像和问题 | |
| next_button.click(next_image, inputs=[name_input] + option_radios, outputs=[output_image, output_text] + question_labels + option_radios) | |
| demo.launch(share=True, debug=True) | |