Spaces:
Sleeping
Sleeping
Peiran
Persist evals to /data CSV and upload per-submission JSONL to dataset repo (peiranli0930/VisEval); add UI feedback
43656b3
| import csv | |
| import itertools | |
| import json | |
| import os | |
| import uuid | |
| from datetime import datetime | |
| from io import BytesIO | |
| from typing import Dict, List, Tuple | |
| import gradio as gr | |
| try: | |
| from huggingface_hub import HfApi | |
| except Exception: # optional dependency at runtime | |
| HfApi = None # type: ignore | |
# Directory that contains this script; task data folders are resolved against it.
BASE_DIR = os.path.dirname(__file__)

# Root of the persistent local storage inside HF Spaces.
# Overridable through the PERSIST_DIR environment variable.
PERSIST_DIR = os.getenv("PERSIST_DIR", "/data")

# Per-task settings keyed by the task name shown in the UI:
#   folder       -- sub-directory (relative to BASE_DIR / PERSIST_DIR) holding the task data
#   score_fields -- (field key, human-readable label) pairs, one per scoring criterion
TASK_CONFIG = {
    "Scene Composition & Object Insertion": dict(
        folder="scene_composition_and_object_insertion",
        score_fields=[
            ("physical_interaction_fidelity_score", "物理交互保真度 (Physical Interaction Fidelity)"),
            ("optical_effect_accuracy_score", "光学效应准确度 (Optical Effect Accuracy)"),
            ("semantic_functional_alignment_score", "语义/功能对齐度 (Semantic/Functional Alignment)"),
            ("overall_photorealism_score", "整体真实感 (Overall Photorealism)"),
        ],
    ),
}
def _csv_path_for_task(task_name: str, filename: str) -> str:
    """Return the path of ``filename`` inside the data folder of ``task_name``.

    The folder name comes from ``TASK_CONFIG`` and is joined onto ``BASE_DIR``.
    A task name missing from ``TASK_CONFIG`` raises ``KeyError``.
    """
    return os.path.join(BASE_DIR, TASK_CONFIG[task_name]["folder"], filename)
def _resolve_image_path(path: str) -> str:
    """Return ``path`` unchanged when absolute, otherwise anchored at ``BASE_DIR``."""
    if os.path.isabs(path):
        return path
    return os.path.join(BASE_DIR, path)
def _load_task_rows(task_name: str) -> List[Dict[str, str]]:
    """Read every row of the task's ``results.csv`` as a dict keyed by column name.

    Raises:
        FileNotFoundError: if the task's ``results.csv`` does not exist.
    """
    results_path = _csv_path_for_task(task_name, "results.csv")
    if os.path.exists(results_path):
        with open(results_path, newline="", encoding="utf-8") as handle:
            return list(csv.DictReader(handle))
    raise FileNotFoundError(f"未找到任务 {task_name} 的结果文件: {results_path}")
def _build_image_pairs(rows: List[Dict[str, str]], task_name: str) -> List[Dict[str, str]]:
    """Build every cross-model comparison pair for a task.

    Rows are grouped by ``(test_id, org_img)``. Within each group, every
    2-combination of rows whose ``model_name`` differs becomes one pair.
    All image paths are prefixed with the task folder from ``TASK_CONFIG``.

    Args:
        rows: result rows; each must provide ``test_id``, ``org_img``,
            ``model_name``, ``res`` and ``path``.
        task_name: key into ``TASK_CONFIG``.

    Returns:
        The pairs sorted by test id, then by the two model names. Numeric
        test ids come first in numeric order, followed by non-numeric ids
        in lexicographic order.
    """
    grouped: Dict[Tuple[str, str], List[Dict[str, str]]] = {}
    for row in rows:
        grouped.setdefault((row["test_id"], row["org_img"]), []).append(row)

    folder = TASK_CONFIG[task_name]["folder"]
    pairs: List[Dict[str, str]] = []
    for (test_id, org_img), entries in grouped.items():
        for model_a, model_b in itertools.combinations(entries, 2):
            if model_a["model_name"] == model_b["model_name"]:
                continue
            pairs.append(
                {
                    "test_id": test_id,
                    "org_img": os.path.join(folder, org_img),
                    "model1_name": model_a["model_name"],
                    "model1_res": model_a["res"],
                    "model1_path": os.path.join(folder, model_a["path"]),
                    "model2_name": model_b["model_name"],
                    "model2_res": model_b["res"],
                    "model2_path": os.path.join(folder, model_b["path"]),
                }
            )

    def sort_key(item: Dict[str, str]):
        test_id = item["test_id"]
        # Always return the same tuple shape so numeric and non-numeric ids
        # can be compared with each other; a bare int-vs-str comparison
        # would raise TypeError during the sort.
        try:
            id_key = (0, int(test_id), "")
        except (TypeError, ValueError):
            id_key = (1, 0, str(test_id))
        return (id_key, item["model1_name"], item["model2_name"])

    pairs.sort(key=sort_key)
    return pairs
def load_task(task_name: str):
    """Load the results of ``task_name`` and return its list of comparison pairs.

    Raises:
        gr.Error: if no task is selected or the task yields no pairs.
    """
    if not task_name:
        raise gr.Error("请先选择任务。")
    pairs = _build_image_pairs(_load_task_rows(task_name), task_name)
    if pairs:
        return pairs
    raise gr.Error("没有找到可评测的图片对,请检查数据文件。")
def _format_pair_header(_pair: Dict[str, str]) -> str:
    """Return the header text for a pair.

    The header is deliberately empty so the UI does not reveal which models
    produced the images.
    """
    neutral_header = ""
    return neutral_header
def _build_eval_row(pair: Dict[str, str], scores: Dict[str, int]) -> Dict[str, object]:
    """Assemble the record persisted for one submitted evaluation.

    Args:
        pair: the image pair that was rated; its identifying fields are
            copied into the record.
        scores: score columns, merged in after the pair fields.

    Returns:
        A new dict: ``eval_date`` (naive UTC timestamp in ISO format), then
        the pair fields, then the scores. ``pair`` and ``scores`` are not
        modified.
    """
    # Function-scope import: the file-level import only brings in ``datetime``.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware
    # "now" and drop the tzinfo so the stored string keeps the same
    # offset-free format as previously written rows.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    row: Dict[str, object] = {"eval_date": timestamp}
    for field in (
        "test_id",
        "model1_name",
        "model2_name",
        "org_img",
        "model1_res",
        "model2_res",
        "model1_path",
        "model2_path",
    ):
        row[field] = pair[field]
    row.update(scores)
    return row
def _local_persist_csv_path(task_name: str) -> str:
    """Return the path of the task's ``evaluation_results.csv`` under ``PERSIST_DIR``."""
    return os.path.join(
        PERSIST_DIR,
        TASK_CONFIG[task_name]["folder"],
        "evaluation_results.csv",
    )
def _append_local_persist_csv(task_name: str, row: Dict[str, object]) -> bool:
    """Append one evaluation row to the task's persistent CSV (best effort).

    The target directory is created on demand, and a header line is written
    when the file is new or empty.

    Args:
        task_name: key into ``TASK_CONFIG``; selects the CSV location.
        row: record to append; its keys must be among the fixed column set.

    Returns:
        True when the row was written, False when creating the directory or
        writing the file failed. Failures are logged, not raised.
    """
    # Function-scope import: the file does not import ``logging`` at the top.
    import logging

    csv_path = _local_persist_csv_path(task_name)
    fieldnames = [
        "eval_date",
        "test_id",
        "model1_name",
        "model2_name",
        "org_img",
        "model1_res",
        "model2_res",
        "model1_path",
        "model2_path",
        "model1_physical_interaction_fidelity_score",
        "model1_optical_effect_accuracy_score",
        "model1_semantic_functional_alignment_score",
        "model1_overall_photorealism_score",
        "model2_physical_interaction_fidelity_score",
        "model2_optical_effect_accuracy_score",
        "model2_semantic_functional_alignment_score",
        "model2_overall_photorealism_score",
    ]
    try:
        # Directory creation is inside the try: a missing or read-only
        # PERSIST_DIR must yield False instead of crashing the submit handler.
        os.makedirs(os.path.dirname(csv_path), exist_ok=True)
        # An existing but empty file still needs its header line.
        needs_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
        with open(csv_path, "a", newline="", encoding="utf-8") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
            if needs_header:
                writer.writeheader()
            writer.writerow(row)
        return True
    except Exception:
        # Deliberately broad: persistence is best effort. Log the failure so
        # it can be diagnosed rather than vanishing silently.
        logging.getLogger(__name__).exception("Failed to append evaluation row to %s", csv_path)
        return False
def _upload_eval_record_to_dataset(task_name: str, row: Dict[str, object]) -> bool:
    """Commit one evaluation record as a single-line JSONL file to a dataset repo.

    The repo id is read from the ``EVAL_REPO_ID`` environment variable and
    defaults to 'peiranli0930/VisEval'. The token is read from ``HF_TOKEN``
    or ``HUGGINGFACEHUB_API_TOKEN``.

    Returns:
        True when the commit was created. False when ``huggingface_hub`` is
        unavailable, no token or repo id is configured, or anything raised
        while preparing or creating the commit.
    """
    if HfApi is None:
        return False

    token = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN")
    repo_id = os.getenv("EVAL_REPO_ID", "peiranli0930/VisEval")
    if not (token and repo_id):
        return False

    try:
        from huggingface_hub import CommitOperationAdd

        api = HfApi(token=token)
        date_prefix = datetime.utcnow().strftime("%Y-%m-%d")
        folder = TASK_CONFIG[task_name]["folder"]
        # A random UUID per submission keeps concurrent uploads from colliding.
        uid = str(uuid.uuid4())
        record_line = json.dumps(row, ensure_ascii=False) + "\n"
        add_record = CommitOperationAdd(
            path_in_repo=f"submissions/{folder}/{date_prefix}/{uid}.jsonl",
            path_or_fileobj=BytesIO(record_line.encode("utf-8")),
        )
        api.create_commit(
            repo_id=repo_id,
            repo_type="dataset",
            operations=[add_record],
            commit_message=f"Add eval {folder} {row.get('test_id')} {uid}",
        )
    except Exception:
        return False
    return True
def on_task_change(task_name: str, _state_pairs: List[Dict[str, str]]):
    """Load ``task_name`` and return the UI updates that show its first pair.

    The returned tuple holds, in order: the pair list (for the state), the
    index-slider update, the header update, the three image paths, eight
    default scores (four for A, four for B), and the feedback update.
    """
    pairs = load_task(task_name)
    total = len(pairs)
    first = pairs[0]

    # The index slider is only shown when there is more than one pair.
    slider_update = gr.update(value=0, minimum=0, maximum=total - 1, visible=total > 1)
    header_update = gr.update(value=_format_pair_header(first))
    image_paths = [
        _resolve_image_path(first[field])
        for field in ("org_img", "model1_path", "model2_path")
    ]
    summary_update = gr.update(value=f"共 {total} 个待评测的图片对。")

    return (
        pairs,
        slider_update,
        header_update,
        *image_paths,
        *([3] * 8),
        summary_update,
    )
def on_pair_navigate(index: int, pairs: List[Dict[str, str]]):
    """Show the pair at ``index`` (clamped into range) and reset all scores to 3.

    Raises:
        gr.Error: if no pairs are loaded.
    """
    if not pairs:
        raise gr.Error("请先选择任务。")

    last = len(pairs) - 1
    position = int(index)
    if position < 0:
        position = 0
    elif position > last:
        position = last

    current = pairs[position]
    default_scores = (3,) * 8  # four for A followed by four for B
    return (
        gr.update(value=position),
        gr.update(value=_format_pair_header(current)),
        _resolve_image_path(current["org_img"]),
        _resolve_image_path(current["model1_path"]),
        _resolve_image_path(current["model2_path"]),
        *default_scores,
    )
def on_submit(
    task_name: str,
    index: int,
    pairs: List[Dict[str, str]],
    a_physical_score: int,
    a_optical_score: int,
    a_semantic_score: int,
    a_overall_score: int,
    b_physical_score: int,
    b_optical_score: int,
    b_semantic_score: int,
    b_overall_score: int,
):
    """Persist the scores for the current pair and advance to the next one.

    The record is appended to the local CSV and uploaded to the dataset
    repo. Both are best effort, and the feedback message reports each
    outcome.

    Args:
        task_name: selected task (key into ``TASK_CONFIG``).
        index: position of the rated pair; coerced to ``int`` and clamped
            into the valid range.
        pairs: pairs loaded for the task.
        a_*_score / b_*_score: the four scores for model A and model B.

    Returns:
        A tuple of updates for the index slider, header, three images,
        eight score sliders (reset to 3) and the feedback box. On the last
        pair the slider, header and images are left unchanged.

    Raises:
        gr.Error: if no task is selected or no pairs are loaded.
    """
    if not task_name:
        raise gr.Error("请先选择任务。")
    if not pairs:
        raise gr.Error("当前任务没有加载任何图片对。")

    # The slider value may arrive as a float or out of range; normalise it
    # before using it as a list index.
    last_index = len(pairs) - 1
    index = max(0, min(int(index), last_index))
    pair = pairs[index]

    score_map = {
        # Model A
        "model1_physical_interaction_fidelity_score": int(a_physical_score),
        "model1_optical_effect_accuracy_score": int(a_optical_score),
        "model1_semantic_functional_alignment_score": int(a_semantic_score),
        "model1_overall_photorealism_score": int(a_overall_score),
        # Model B
        "model2_physical_interaction_fidelity_score": int(b_physical_score),
        "model2_optical_effect_accuracy_score": int(b_optical_score),
        "model2_semantic_functional_alignment_score": int(b_semantic_score),
        "model2_overall_photorealism_score": int(b_overall_score),
    }
    row = _build_eval_row(pair, score_map)
    ok_local = _append_local_persist_csv(task_name, row)
    ok_hub = _upload_eval_record_to_dataset(task_name, row)

    # Only claim the result was saved when at least one channel succeeded.
    lead = "已保存" if (ok_local or ok_hub) else "未能保存"
    info = f"{lead} Test ID {pair['test_id']} 的评价结果。"
    info += " 本地持久化" + ("成功" if ok_local else "失败") + "。"
    info += " 上传Hub" + ("成功" if ok_hub else "失败") + "。"

    next_index = min(index + 1, last_index)
    if next_index != index:
        pair = pairs[next_index]
        header = _format_pair_header(pair)
        return (
            gr.update(value=next_index),
            gr.update(value=header),
            _resolve_image_path(pair["org_img"]),
            _resolve_image_path(pair["model1_path"]),
            _resolve_image_path(pair["model2_path"]),
            3, 3, 3, 3,
            3, 3, 3, 3,
            gr.update(value=info + f" 自动跳转到下一组({next_index + 1}/{len(pairs)})。"),
        )

    # Already on the last pair: keep the view, only reset the scores.
    return (
        gr.update(),
        gr.update(),
        gr.update(),
        gr.update(),
        gr.update(),
        3, 3, 3, 3,
        3, 3, 3, 3,
        gr.update(value=info + " 已经是最后一组。"),
    )
# UI definition: task picker and pair index on top, the original image below,
# then the two model outputs side by side, each with its four score sliders.
with gr.Blocks(title="VisArena Human Evaluation") as demo:
    gr.Markdown(
        """
        # VisArena Human Evaluation
        请选择任务并对模型生成的图像进行评分。每项评分范围为 **1(效果极差)** 到 **5(效果极佳)**。
        """
    )

    with gr.Row():
        task_selector = gr.Dropdown(
            label="Task",
            choices=list(TASK_CONFIG.keys()),
            interactive=True,
            value="Scene Composition & Object Insertion",
        )
        # Hidden until a task with more than one pair is loaded.
        index_slider = gr.Slider(
            label="Pair Index",
            value=0,
            minimum=0,
            maximum=0,
            step=1,
            interactive=True,
            visible=False,
        )

    pair_state = gr.State([])
    pair_header = gr.Markdown("")

    # Layout: original on top, two outputs below with their own sliders.
    with gr.Row():
        with gr.Column(scale=12):
            orig_image = gr.Image(type="filepath", label="原图 Original", interactive=False)

    with gr.Row():
        with gr.Column(scale=6):
            model1_image = gr.Image(type="filepath", label="模型 A 输出", interactive=False)
            a_physical_input = gr.Slider(minimum=1, maximum=5, value=3, step=1, label="A: 物理交互保真度")
            a_optical_input = gr.Slider(minimum=1, maximum=5, value=3, step=1, label="A: 光学效应准确度")
            a_semantic_input = gr.Slider(minimum=1, maximum=5, value=3, step=1, label="A: 语义/功能对齐度")
            a_overall_input = gr.Slider(minimum=1, maximum=5, value=3, step=1, label="A: 整体真实感")
        with gr.Column(scale=6):
            model2_image = gr.Image(type="filepath", label="模型 B 输出", interactive=False)
            b_physical_input = gr.Slider(minimum=1, maximum=5, value=3, step=1, label="B: 物理交互保真度")
            b_optical_input = gr.Slider(minimum=1, maximum=5, value=3, step=1, label="B: 光学效应准确度")
            b_semantic_input = gr.Slider(minimum=1, maximum=5, value=3, step=1, label="B: 语义/功能对齐度")
            b_overall_input = gr.Slider(minimum=1, maximum=5, value=3, step=1, label="B: 整体真实感")

    submit_button = gr.Button("Submit Evaluation", variant="primary")
    feedback_box = gr.Markdown("")

    # Component groups shared by the event bindings below. Their order must
    # match the order of the values returned by the handlers.
    _score_inputs = [
        a_physical_input,
        a_optical_input,
        a_semantic_input,
        a_overall_input,
        b_physical_input,
        b_optical_input,
        b_semantic_input,
        b_overall_input,
    ]
    _pair_view = [pair_header, orig_image, model1_image, model2_image, *_score_inputs]
    _task_outputs = [pair_state, index_slider, *_pair_view, feedback_box]

    # Event bindings
    task_selector.change(
        fn=on_task_change,
        inputs=[task_selector, pair_state],
        outputs=list(_task_outputs),
    )
    index_slider.release(
        fn=on_pair_navigate,
        inputs=[index_slider, pair_state],
        outputs=[index_slider, *_pair_view],
    )
    submit_button.click(
        fn=on_submit,
        inputs=[task_selector, index_slider, pair_state, *_score_inputs],
        outputs=[index_slider, *_pair_view, feedback_box],
    )

    # Auto-load default task on startup
    demo.load(
        fn=on_task_change,
        inputs=[task_selector, pair_state],
        outputs=list(_task_outputs),
    )
# Script entry point: enable request queuing, then start the Gradio server.
if __name__ == "__main__":
    queued_app = demo.queue()
    queued_app.launch()