# Spaces: Sleeping
import csv
import itertools
import os
from datetime import datetime, timezone
from typing import Dict, List, Tuple

import gradio as gr
# Directory that contains this script; task folders and image paths are
# joined onto it by the path helpers in this module.
BASE_DIR = os.path.dirname(__file__)

# Registry of evaluation tasks, keyed by the name shown in the task dropdown.
# "folder" is the task's data directory (joined onto BASE_DIR); "score_fields"
# pairs each score column name with its bilingual display label.
TASK_CONFIG = {
    "Scene Composition & Object Insertion": dict(
        folder="scene_composition_and_object_insertion",
        score_fields=[
            (
                "physical_interaction_fidelity_score",
                "物理交互保真度 (Physical Interaction Fidelity)",
            ),
            (
                "optical_effect_accuracy_score",
                "光学效应准确度 (Optical Effect Accuracy)",
            ),
            (
                "semantic_functional_alignment_score",
                "语义/功能对齐度 (Semantic/Functional Alignment)",
            ),
            (
                "overall_photorealism_score",
                "整体真实感 (Overall Photorealism)",
            ),
        ],
    ),
}
def _csv_path_for_task(task_name: str, filename: str) -> str:
    """Return the path of ``filename`` inside the data folder of a task.

    Args:
        task_name: Key into ``TASK_CONFIG``.
        filename: File name to place under the task's folder.

    Returns:
        ``BASE_DIR`` joined with the task's configured folder and ``filename``.

    Raises:
        KeyError: If ``task_name`` is not a key of ``TASK_CONFIG``.
    """
    return os.path.join(BASE_DIR, TASK_CONFIG[task_name]["folder"], filename)
def _resolve_image_path(path: str) -> str:
    """Return ``path`` unchanged if absolute, otherwise joined onto ``BASE_DIR``."""
    if os.path.isabs(path):
        return path
    return os.path.join(BASE_DIR, path)
def _load_task_rows(task_name: str) -> List[Dict[str, str]]:
    """Read every row of the task's ``results.csv`` as a dict.

    Args:
        task_name: Key into ``TASK_CONFIG`` identifying the task.

    Returns:
        One dict per CSV data row, keyed by the CSV header names.

    Raises:
        FileNotFoundError: If the task's ``results.csv`` does not exist.
    """
    results_file = _csv_path_for_task(task_name, "results.csv")
    if os.path.exists(results_file):
        with open(results_file, newline="", encoding="utf-8") as handle:
            return list(csv.DictReader(handle))
    raise FileNotFoundError(f"未找到任务 {task_name} 的结果文件: {results_file}")
def _build_image_pairs(rows: List[Dict[str, str]], task_name: str) -> List[Dict[str, str]]:
    """Build every pairwise model comparison for each (test_id, org_img) group.

    Rows are grouped by ``(test_id, org_img)``. Within each group every
    2-combination of rows becomes one pair, except combinations whose two rows
    share the same ``model_name``. Image paths are prefixed with the task's
    configured folder.

    Args:
        rows: Result rows; each must provide ``test_id``, ``org_img``,
            ``model_name``, ``res`` and ``path``.
        task_name: Key into ``TASK_CONFIG``.

    Returns:
        Pair dicts sorted by test id, then by the two model names. Test ids
        that parse as integers sort numerically and come before ids that do
        not parse, which sort lexicographically.

    Raises:
        KeyError: If a row lacks a required column or ``task_name`` is unknown.
    """
    grouped: Dict[Tuple[str, str], List[Dict[str, str]]] = {}
    for row in rows:
        grouped.setdefault((row["test_id"], row["org_img"]), []).append(row)

    folder = TASK_CONFIG[task_name]["folder"]
    pairs: List[Dict[str, str]] = []
    for (test_id, org_img), entries in grouped.items():
        for model_a, model_b in itertools.combinations(entries, 2):
            # Comparing a model against itself is meaningless.
            if model_a["model_name"] == model_b["model_name"]:
                continue
            pairs.append(
                {
                    "test_id": test_id,
                    "org_img": os.path.join(folder, org_img),
                    "model1_name": model_a["model_name"],
                    "model1_res": model_a["res"],
                    "model1_path": os.path.join(folder, model_a["path"]),
                    "model2_name": model_b["model_name"],
                    "model2_res": model_b["res"],
                    "model2_path": os.path.join(folder, model_b["path"]),
                }
            )

    def sort_key(item: Dict[str, str]):
        test_id = item["test_id"]
        # Tag the key with its kind so an int id is never compared against a
        # str id; mixing the two raw values would raise TypeError during sort.
        try:
            test_id_key = (0, int(test_id), "")
        except ValueError:
            test_id_key = (1, 0, test_id)
        return (test_id_key, item["model1_name"], item["model2_name"])

    pairs.sort(key=sort_key)
    return pairs
def load_task(task_name: str):
    """Load the result rows of a task and return its comparison pairs.

    Args:
        task_name: Key into ``TASK_CONFIG``; must be non-empty.

    Returns:
        The non-empty list of pair dicts produced by ``_build_image_pairs``.

    Raises:
        gr.Error: If no task is selected, the task's results file is missing,
            or no comparable pair could be built from it.
    """
    if not task_name:
        raise gr.Error("请先选择任务。")
    try:
        rows = _load_task_rows(task_name)
    except FileNotFoundError as err:
        # Re-raise as gr.Error so the message reaches the user the same way
        # as the other validation failures in this function.
        raise gr.Error(str(err)) from err
    pairs = _build_image_pairs(rows, task_name)
    if not pairs:
        raise gr.Error("没有找到可评测的图片对,请检查数据文件。")
    return pairs
def _format_pair_header(pair: Dict[str, str]) -> str:
    """Return the three-line Markdown header describing a comparison pair.

    The lines show the test id, then model A and model B, each model with its
    ``res`` value in parentheses. Lines are joined by a space plus a newline.
    """
    header_lines = [
        f"**Test ID:** {pair['test_id']}",
        f"**Model A:** {pair['model1_name']} ({pair['model1_res']})",
        f"**Model B:** {pair['model2_name']} ({pair['model2_res']})",
    ]
    return " \n".join(header_lines)
def _append_evaluation(task_name: str, pair: Dict[str, str], scores: Dict[str, int]) -> None:
    """Append one evaluation record to the task's ``evaluation_results.csv``.

    The task folder is created if needed. The header row is written when the
    file is missing or empty, so a pre-created empty file still ends up with
    a header.

    Args:
        task_name: Key into ``TASK_CONFIG``; selects the output folder.
        pair: Pair dict as produced by ``_build_image_pairs``.
        scores: Score values keyed by score column name. Keys must be among
            the four score columns listed below, since ``csv.DictWriter``
            rejects unknown keys by default.

    Raises:
        KeyError: If ``pair`` lacks one of the copied columns. This is raised
            before the file is touched.
    """
    csv_path = _csv_path_for_task(task_name, "evaluation_results.csv")
    os.makedirs(os.path.dirname(csv_path), exist_ok=True)

    pair_columns = [
        "test_id",
        "model1_name",
        "model2_name",
        "org_img",
        "model1_res",
        "model2_res",
        "model1_path",
        "model2_path",
    ]
    fieldnames = [
        "eval_date",
        *pair_columns,
        "physical_interaction_fidelity_score",
        "optical_effect_accuracy_score",
        "semantic_functional_alignment_score",
        "overall_photorealism_score",
    ]

    # Build the full row before opening the file so a malformed pair cannot
    # leave a header-only or partially written file behind.
    # The timestamp is UTC with the tzinfo stripped, which renders exactly like
    # the deprecated datetime.utcnow() did (naive ISO-8601, no offset suffix).
    row = {"eval_date": datetime.now(timezone.utc).replace(tzinfo=None).isoformat()}
    for column in pair_columns:
        row[column] = pair[column]
    row.update(scores)

    needs_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
    with open(csv_path, "a", newline="", encoding="utf-8") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        writer.writerow(row)
def on_task_change(task_name: str, _state_pairs: List[Dict[str, str]]):
    """Load a task and return UI values that show its first pair.

    Args:
        task_name: Task selected in the dropdown.
        _state_pairs: Previously loaded pairs; accepted but not used.

    Returns:
        A tuple of: the loaded pairs, an index-slider update (reset to 0,
        ranged to the pair count, hidden when there is only one pair), the
        header update, the three resolved image paths, four default scores
        of 3, and a feedback update stating the pair count.
    """
    pairs = load_task(task_name)
    total = len(pairs)
    first = pairs[0]
    slider_update = gr.update(value=0, minimum=0, maximum=total - 1, visible=total > 1)
    header_update = gr.update(value=_format_pair_header(first))
    images = [
        _resolve_image_path(first[column])
        for column in ("org_img", "model1_path", "model2_path")
    ]
    return (
        pairs,
        slider_update,
        header_update,
        *images,
        3,
        3,
        3,
        3,
        gr.update(value=f"共 {total} 个待评测的图片对。"),
    )
def on_pair_navigate(index: int, pairs: List[Dict[str, str]]):
    """Return UI values that show the pair at ``index``.

    ``index`` is converted to ``int`` and clamped into the valid range.

    Args:
        index: Requested pair position from the slider.
        pairs: Currently loaded pairs.

    Returns:
        A tuple of: the slider update (clamped index), the header update, the
        three resolved image paths, and four default scores of 3.

    Raises:
        gr.Error: If no pairs are loaded.
    """
    if not pairs:
        raise gr.Error("请先选择任务。")
    last = len(pairs) - 1
    position = min(max(int(index), 0), last)
    current = pairs[position]
    reset_scores = (3, 3, 3, 3)
    return (
        gr.update(value=position),
        gr.update(value=_format_pair_header(current)),
        _resolve_image_path(current["org_img"]),
        _resolve_image_path(current["model1_path"]),
        _resolve_image_path(current["model2_path"]),
        *reset_scores,
    )
def on_submit(
    task_name: str,
    index: int,
    pairs: List[Dict[str, str]],
    physical_score: int,
    optical_score: int,
    semantic_score: int,
    overall_score: int,
):
    """Save the scores for the current pair and advance to the next one.

    Args:
        task_name: Task selected in the dropdown.
        index: Position of the pair being scored. It is converted to ``int``
            and clamped into range, matching ``on_pair_navigate``.
        pairs: Currently loaded pairs.
        physical_score: Physical interaction fidelity score.
        optical_score: Optical effect accuracy score.
        semantic_score: Semantic/functional alignment score.
        overall_score: Overall photorealism score.

    Returns:
        A tuple of: slider update, header update, three image values, four
        default scores of 3, and a feedback update. On the last pair the
        slider, header and images are left unchanged (empty updates).

    Raises:
        gr.Error: If no task is selected or no pairs are loaded.
    """
    if not task_name:
        raise gr.Error("请先选择任务。")
    if not pairs:
        raise gr.Error("当前任务没有加载任何图片对。")

    # The slider value may arrive as a float or be stale relative to the
    # loaded pairs; normalise it before using it as a list subscript.
    last_index = len(pairs) - 1
    index = max(0, min(int(index), last_index))
    pair = pairs[index]

    score_map = {
        "physical_interaction_fidelity_score": int(physical_score),
        "optical_effect_accuracy_score": int(optical_score),
        "semantic_functional_alignment_score": int(semantic_score),
        "overall_photorealism_score": int(overall_score),
    }
    _append_evaluation(task_name, pair, score_map)

    info = f"已保存 Test ID {pair['test_id']} 的评价结果。"
    default_scores = (3, 3, 3, 3)
    next_index = min(index + 1, last_index)

    if next_index == index:
        # Already on the last pair: keep the current view, only reset scores.
        return (
            gr.update(),
            gr.update(),
            gr.update(),
            gr.update(),
            gr.update(),
            *default_scores,
            gr.update(value=info + " 已经是最后一组。"),
        )

    next_pair = pairs[next_index]
    return (
        gr.update(value=next_index),
        gr.update(value=_format_pair_header(next_pair)),
        _resolve_image_path(next_pair["org_img"]),
        _resolve_image_path(next_pair["model1_path"]),
        _resolve_image_path(next_pair["model2_path"]),
        *default_scores,
        gr.update(value=info + f" 自动跳转到下一组({next_index + 1}/{len(pairs)})。"),
    )
# Build the evaluation UI and wire its events.
# NOTE(review): the nesting of components inside rows/columns is reconstructed
# from statement order -- confirm against the intended layout.
with gr.Blocks(title="VisArena Human Evaluation") as demo:
    gr.Markdown(
        """
# VisArena Human Evaluation
请选择任务并对模型生成的图像进行评分。每项评分范围为 **1(效果极差)** 到 **5(效果极佳)**。
"""
    )

    # Task picker and pair navigation.
    with gr.Row():
        task_selector = gr.Dropdown(
            label="Task",
            choices=list(TASK_CONFIG.keys()),
            interactive=True,
            value="Scene Composition & Object Insertion",
        )
        # Starts hidden with an empty range; on_task_change sets the real range.
        index_slider = gr.Slider(
            label="Pair Index",
            value=0,
            minimum=0,
            maximum=0,
            step=1,
            interactive=True,
            visible=False,
        )

    pair_state = gr.State([])
    pair_header = gr.Markdown("")

    # Original image next to the two model outputs.
    with gr.Row():
        with gr.Column(scale=1):
            orig_image = gr.Image(type="filepath", label="原图 Original", interactive=False)
        with gr.Column(scale=1):
            model1_image = gr.Image(type="filepath", label="模型 A 输出", interactive=False)
        with gr.Column(scale=1):
            model2_image = gr.Image(type="filepath", label="模型 B 输出", interactive=False)

    # Four score sliders, each from 1 to 5 and defaulting to 3.
    with gr.Row():
        with gr.Column():
            physical_input = gr.Slider(
                minimum=1,
                maximum=5,
                value=3,
                step=1,
                label="物理交互保真度 (Physical Interaction Fidelity)",
            )
            optical_input = gr.Slider(
                minimum=1,
                maximum=5,
                value=3,
                step=1,
                label="光学效应准确度 (Optical Effect Accuracy)",
            )
        with gr.Column():
            semantic_input = gr.Slider(
                minimum=1,
                maximum=5,
                value=3,
                step=1,
                label="语义/功能对齐度 (Semantic/Functional Alignment)",
            )
            overall_input = gr.Slider(
                minimum=1,
                maximum=5,
                value=3,
                step=1,
                label="整体真实感 (Overall Photorealism)",
            )

    submit_button = gr.Button("Submit Evaluation", variant="primary")
    feedback_box = gr.Markdown("")

    # Component groups shared by several event bindings.
    score_sliders = [physical_input, optical_input, semantic_input, overall_input]
    pair_view = [pair_header, orig_image, model1_image, model2_image]
    navigate_outputs = [index_slider, *pair_view, *score_sliders]
    task_binding = dict(
        fn=on_task_change,
        inputs=[task_selector, pair_state],
        outputs=[pair_state, *navigate_outputs, feedback_box],
    )

    # Event bindings
    task_selector.change(**task_binding)
    index_slider.release(
        fn=on_pair_navigate,
        inputs=[index_slider, pair_state],
        outputs=navigate_outputs,
    )
    submit_button.click(
        fn=on_submit,
        inputs=[task_selector, index_slider, pair_state, *score_sliders],
        outputs=[*navigate_outputs, feedback_box],
    )

    # Auto-load default task on startup
    demo.load(**task_binding)
# Launch the app only when this file is executed as a script.
if __name__ == "__main__":
    queued_demo = demo.queue()
    queued_demo.launch()