Spaces:
Sleeping
Sleeping
| """ | |
| VideoEval Movie-Level 问卷应用(Hugging Face Spaces) | |
| 仅保留 Movie-Level 评测,并支持方法级别统计输出。 | |
| """ | |
| import json | |
| import os | |
| import threading | |
| import html | |
| import shutil | |
| import random | |
| from collections import defaultdict | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import gradio as gr | |
| from huggingface_hub import CommitScheduler, HfApi, snapshot_download | |
# Path configuration (as requested by the user).
# On Spaces, files that live inside the Space repository (next to app.py) are the
# preferred data source.
APP_DIR = Path(__file__).resolve().parent
LOCAL_INPUT_DIR = APP_DIR / "user_study_input"
LOCAL_OUTPUT_DIR = APP_DIR / "user_study_results"
# Alternative locations under /data (see _try_use_data_volume_layout).
DATA_INPUT_DIR = Path("/data/user_study_input")
DATA_OUTPUT_DIR = Path("/data/user_study_results")
# Hub dataset repositories used for input download and result upload.
DATA_REPO_ID = os.environ.get("DATA_REPO_ID", "MemDirector/user_study_input")
RESULTS_REPO_ID = os.environ.get("RESULTS_REPO_ID", "MemDirector/user_study_results")
# NOTE(review): HF_TOKEN is not referenced anywhere else in the visible code;
# _load_hf_token() reads the environment variable directly.
HF_TOKEN = os.environ.get("HF_TOKEN", None)
HF_TOKEN_FILE = os.environ.get("HF_TOKEN_FILE", "/data/.secrets/hf_token")
SPACE_MODE = os.environ.get("SPACE_MODE", "repo_first")  # repo_first / data_first / hub_only
# Active paths; these defaults are overwritten by _set_paths() during storage init.
ROOT_DIR = APP_DIR
INPUT_DIR = LOCAL_INPUT_DIR
OUTPUT_DIR = LOCAL_OUTPUT_DIR
STORY_DIR = INPUT_DIR / "clip_movie_story"
VIDEO_DIR = INPUT_DIR / "video"
# Import-time side effect: make sure the default output directory exists.
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
# Set by init_space_storage() when a CommitScheduler could be created.
scheduler: Optional[CommitScheduler] = None
hf_api = HfApi()
| def _load_hf_token() -> Optional[str]: | |
| """ | |
| 安全读取 HF token: | |
| 1) 优先 HF_TOKEN 环境变量(建议在 Spaces Secrets 配置) | |
| 2) 其次读取服务端文件 HF_TOKEN_FILE | |
| """ | |
| env_token = os.environ.get("HF_TOKEN", "").strip() | |
| if env_token: | |
| return env_token | |
| token_file = Path(HF_TOKEN_FILE) | |
| if token_file.exists(): | |
| try: | |
| file_token = token_file.read_text(encoding="utf-8").strip() | |
| if file_token: | |
| return file_token | |
| except Exception as e: | |
| print(f"[INIT] failed to read HF token file: {e}") | |
| return None | |
| def _set_paths(input_dir: Path, output_dir: Path) -> None: | |
| global INPUT_DIR, OUTPUT_DIR, STORY_DIR, VIDEO_DIR, ROOT_DIR | |
| INPUT_DIR = input_dir | |
| OUTPUT_DIR = output_dir | |
| STORY_DIR = INPUT_DIR / "clip_movie_story" | |
| VIDEO_DIR = INPUT_DIR / "video" | |
| ROOT_DIR = INPUT_DIR.parent | |
| OUTPUT_DIR.mkdir(parents=True, exist_ok=True) | |
def _try_use_local_repo_layout() -> bool:
    """
    Use the ``user_study_input`` folder shipped inside the Space repository.

    This is the "already uploaded, just run" case. Returns True when the folder
    exists and the paths were switched to it, False otherwise.
    """
    if not LOCAL_INPUT_DIR.exists():
        return False
    _set_paths(LOCAL_INPUT_DIR, LOCAL_OUTPUT_DIR)
    return True
def _try_use_data_volume_layout() -> bool:
    """
    Use ``/data/user_study_input`` when the /data persistent volume provides it.

    Returns True when the folder exists and the paths were switched to it,
    False otherwise.
    """
    if not DATA_INPUT_DIR.exists():
        return False
    _set_paths(DATA_INPUT_DIR, DATA_OUTPUT_DIR)
    return True
def _try_download_from_hub() -> bool:
    """
    Last-resort data source: download the input dataset repo from the Hub.

    The snapshot is stored under ``APP_DIR/.hf_space_cache``. Two repo layouts
    are accepted:
      A) ``clip_movie_story/`` and ``video/`` directly at the repo root;
      B) a ``user_study_input/`` sub-directory.

    Returns True when a usable layout was found and the paths were switched to
    it; False when DATA_REPO_ID is empty, the download fails, or neither layout
    is present.
    """
    if not DATA_REPO_ID:
        return False

    cache_dir = APP_DIR / ".hf_space_cache"
    wanted_patterns = [
        "clip_movie_story/**",
        "video/**",
        "user_study_input/**",
        "user_study_results/**",
    ]
    try:
        snapshot_download(
            repo_id=DATA_REPO_ID,
            repo_type="dataset",
            local_dir=str(cache_dir),
            token=_load_hf_token(),
            allow_patterns=wanted_patterns,
        )
    except Exception as e:
        print(f"[INIT] snapshot_download failed: {e}")
        return False

    has_flat_layout = all((cache_dir / sub).exists() for sub in ("clip_movie_story", "video"))
    nested_input = cache_dir / "user_study_input"
    if has_flat_layout:
        resolved_input = cache_dir
    elif nested_input.exists():
        resolved_input = nested_input
    else:
        return False

    # Results always go to the folder inside the Space repo so they are not
    # buried in the download cache where users would struggle to find them.
    _set_paths(resolved_input, LOCAL_OUTPUT_DIR)
    return True
def init_space_storage() -> None:
    """
    Select the input/output directories and start background result syncing.

    SPACE_MODE decides the order in which data sources are tried:
      - ``hub_only``:   Hub download only;
      - ``data_first``: /data volume, then Space repo folder, then Hub download;
      - anything else:  Space repo folder, then /data volume, then Hub download.

    Afterwards, if RESULTS_REPO_ID is set, a CommitScheduler is created to keep
    writing OUTPUT_DIR back to ``user_study_results`` in that dataset repo.
    A scheduler failure is logged and does not abort start-up.
    """
    global scheduler

    if SPACE_MODE == "hub_only":
        strategies = (_try_download_from_hub,)
    elif SPACE_MODE == "data_first":
        strategies = (
            _try_use_data_volume_layout,
            _try_use_local_repo_layout,
            _try_download_from_hub,
        )
    else:
        strategies = (
            _try_use_local_repo_layout,
            _try_use_data_volume_layout,
            _try_download_from_hub,
        )
    # any() stops at the first strategy that succeeds, like the `or` chain it replaces.
    ok = any(strategy() for strategy in strategies)
    print(f"[INIT] storage init mode={SPACE_MODE}, success={ok}, input={INPUT_DIR}, output={OUTPUT_DIR}")

    if not RESULTS_REPO_ID:
        return
    try:
        scheduler = CommitScheduler(
            repo_id=RESULTS_REPO_ID,
            repo_type="dataset",
            folder_path=str(OUTPUT_DIR),
            path_in_repo="user_study_results",
            every=3,
            token=_load_hf_token(),
        )
    except Exception as e:
        print(f"[INIT] CommitScheduler init failed: {e}")
    else:
        print(f"[INIT] CommitScheduler enabled: {RESULTS_REPO_ID}")
# Resolve storage paths (and start the CommitScheduler) at import time.
init_space_storage()
# Movie-level criteria: only the six aggregated metrics are kept.
# Each entry is (key, display name, description shown to the evaluator).
MOVIE_CRITERIA: List[Tuple[str, str, str]] = [
    ("NS", "叙事与剧本", "考察剧情是否忠于文本设定,情节推进是否自然连贯、易于理解。"),
    ("AT", "视听与技术", "考察画面清晰度、角色稳定性、物理合理性及音频表现的综合质量。"),
    ("AE", "美学与表现力", "考察镜头设计、构图与风格表达是否具有层次感与艺术表现力。"),
    ("RF", "节奏与流动性", "考察剪辑快慢、段落衔接与音画同步是否顺畅,整体节奏是否舒适。"),
    ("EE", "情感与参与度", "考察作品是否能有效调动情绪,让观众产生共鸣并保持观看投入。"),
    ("OE", "整体体验", "考察作为完整短片的综合观感,包括完成度、可看性与整体吸引力。"),
]
# Metric keys in the order defined by MOVIE_CRITERIA.
BASE_METRIC_KEYS = [k for k, _, _ in MOVIE_CRITERIA]
# Side A (left) is always MemDirector and side B (right) is always Seedance2.0;
# the order in which pairs are shown is still randomized by the shuffle at the
# end of build_pending_samples().
FIXED_A_METHOD = "MemDirector"
FIXED_B_METHOD = "Seedance2.0"
# Held by the submit handler while it syncs, saves, aggregates and uploads results.
SAVE_LOCK = threading.Lock()
# Stylesheet passed to gr.Blocks(css=...). Selectors target the element ids and
# classes assigned in create_app() and build_sample_brief_html()
# (#hero, .panel, .center-panel, .section-head, .hint, .metric-card,
# .sample-card, #submit-btn).
CUSTOM_CSS = """
.gradio-container {
    max-width: 1300px !important;
    margin-left: auto !important;
    margin-right: auto !important;
    background: linear-gradient(180deg, #f8fbff 0%, #eef4ff 100%) !important;
}
#hero {
    border: 1px solid #d9e5ff;
    border-radius: 20px;
    padding: 24px 26px;
    background: linear-gradient(135deg, #ffffff 0%, #f2f7ff 50%, #eaf2ff 100%);
    margin-bottom: 12px;
    box-shadow: 0 12px 30px rgba(57, 94, 174, 0.12);
}
#hero h1 {
    margin: 0 0 8px 0;
    font-size: 2rem;
    color: #1b2a4a;
}
#hero p {
    margin: 0;
    color: #41557f;
}
.panel {
    border: 1px solid #dbe6fb !important;
    border-radius: 16px !important;
    padding: 16px !important;
    background: #ffffff !important;
    box-shadow: 0 8px 20px rgba(30, 78, 158, 0.08);
}
.center-panel {
    max-width: 980px;
    margin-left: auto !important;
    margin-right: auto !important;
}
.section-head {
    border: 1px solid #d7e5ff;
    border-radius: 12px;
    background: linear-gradient(180deg, #f7fbff 0%, #eef5ff 100%);
    padding: 10px 14px;
    margin-bottom: 12px;
    color: #233a63;
    font-weight: 700;
}
.hint {
    color: #6480ad;
    font-size: 0.9rem;
}
.metric-card {
    border: 1px solid #dbe7fb !important;
    border-radius: 14px !important;
    padding: 18px 18px 14px 18px !important;
    background: #fcfeff !important;
    box-shadow: 0 6px 16px rgba(38, 84, 160, 0.06);
    margin-bottom: 8px !important;
}
.metric-card p {
    margin-top: 6px !important;
    margin-bottom: 10px !important;
}
.sample-card {
    border: 1px solid #deebff;
    border-radius: 14px;
    padding: 14px 16px;
    background: #f9fcff;
}
.sample-card h3 {
    margin: 0 0 8px 0;
    color: #273f68;
}
.sample-card .sid {
    margin-bottom: 10px;
    color: #3f5f94;
}
.sample-card .story-title {
    margin: 0 0 6px 0;
    color: #2b4674;
    font-weight: 600;
}
.sample-card .story-body {
    margin: 0;
    color: #334f7c;
    white-space: pre-wrap;
    line-height: 1.6;
}
#submit-btn,
#submit-btn button {
    min-height: 44px !important;
    width: min(520px, 92vw) !important;
    font-size: 1.08rem !important;
    font-weight: 700 !important;
    padding: 0.45rem 1.2rem !important;
    margin: 0 auto !important;
    display: block !important;
}
"""
| def _safe_read_text(path: Path) -> str: | |
| if not path.exists(): | |
| return "" | |
| return path.read_text(encoding="utf-8-sig").strip() | |
def load_dataset_index() -> List[Dict[str, Any]]:
    """
    Scan the input directories and list the samples available for evaluation.

    Expected layout: ``VIDEO_DIR/<method>/<story>/*.mp4`` plus
    ``STORY_DIR/<story>.txt``. Only one video is kept per (method, story):
    the first ``.mp4`` in sorted order. Stories without a text file get an
    empty ``story_text``.
    """
    story_texts: Dict[str, str] = {}
    for txt_path in sorted(STORY_DIR.glob("*.txt")):
        story_texts[txt_path.stem] = _safe_read_text(txt_path)

    index: List[Dict[str, Any]] = []
    if not VIDEO_DIR.exists():
        return index

    for method_dir in sorted(d for d in VIDEO_DIR.iterdir() if d.is_dir()):
        for story_dir in sorted(d for d in method_dir.iterdir() if d.is_dir()):
            videos = sorted(story_dir.glob("*.mp4"))
            if not videos:
                continue
            # Each method/story is rated once: take the first video in sorted order.
            chosen = videos[0]
            index.append(
                {
                    "sample_id": f"{method_dir.name}__{story_dir.name}__{chosen.stem}",
                    "method": method_dir.name,
                    "story_name": story_dir.name,
                    "video_name": chosen.name,
                    "video_path": str(chosen.resolve()),
                    "story_text": story_texts.get(story_dir.name, ""),
                }
            )
    return index
def load_evaluated_method_story_pairs(output_dir: Optional[Path] = None) -> set:
    """
    Collect the (method, story_name) combinations that already have results.

    Reads every ``*.json`` below ``<output_dir>/raw_results``. Two payload
    shapes are recognised:
      - the current one written by save_single_result():
        ``{"pair": {"story_name": ..., "A": {"method": ...}, "B": {"method": ...}}}``
        which yields one entry per side;
      - the older single-sample one: ``{"sample": {"method": ..., "story_name": ...}}``.

    Args:
        output_dir: results root to scan; defaults to the module-level OUTPUT_DIR.

    Returns:
        A set of ``(method, story_name)`` tuples. Unreadable, non-JSON or
        unexpectedly shaped files are skipped.
    """
    base_dir = OUTPUT_DIR if output_dir is None else output_dir
    raw_root = base_dir / "raw_results"
    evaluated: set = set()
    if not raw_root.exists():
        return evaluated

    for fp in raw_root.rglob("*.json"):
        try:
            with open(fp, "r", encoding="utf-8-sig") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # ValueError covers both JSON syntax errors and undecodable bytes.
            continue
        if not isinstance(data, dict):
            continue

        # Older single-sample payload.
        legacy = data.get("sample")
        if isinstance(legacy, dict):
            method = legacy.get("method")
            story_name = legacy.get("story_name")
            if method and story_name:
                evaluated.add((method, story_name))

        # Current A/B payload: both compared methods count as evaluated.
        pair = data.get("pair")
        if isinstance(pair, dict):
            story_name = pair.get("story_name")
            for side in ("A", "B"):
                side_info = pair.get(side)
                method = side_info.get("method") if isinstance(side_info, dict) else None
                if method and story_name:
                    evaluated.add((method, story_name))
    return evaluated
def sync_results_from_hub_to_local() -> None:
    """
    Mirror the remote results repo into the local OUTPUT_DIR.

    The remote copy is treated as authoritative: after a successful download,
    the local ``raw_results`` folder and ``method_aggregates.json`` are replaced
    by what the remote ``user_study_results/`` folder contains (and removed if
    the remote has none).

    The local files are only touched once the download has succeeded. If the
    pull fails, the existing local results are left in place so that a
    transient network error cannot wipe them and cause the next aggregate to be
    computed from a single submission.

    Does nothing when RESULTS_REPO_ID is empty.
    """
    if not RESULTS_REPO_ID:
        return

    sync_root = APP_DIR / ".hf_results_sync_cache"
    local_raw = OUTPUT_DIR / "raw_results"
    local_agg = OUTPUT_DIR / "method_aggregates.json"

    # Start from an empty cache so files deleted remotely cannot linger locally
    # and be mistaken for existing results.
    if sync_root.exists():
        shutil.rmtree(sync_root)

    try:
        snapshot_download(
            repo_id=RESULTS_REPO_ID,
            repo_type="dataset",
            local_dir=str(sync_root),
            token=_load_hf_token(),
            allow_patterns=["user_study_results/**"],
            force_download=True,
        )
    except Exception as e:
        # Keep whatever is stored locally; see docstring.
        print(f"[SYNC] pull results repo failed: {e}")
        return

    # Download succeeded: drop stale local data before mirroring the remote state.
    if local_raw.exists():
        shutil.rmtree(local_raw)
    if local_agg.exists():
        local_agg.unlink()

    remote_results_root = sync_root / "user_study_results"
    if not remote_results_root.exists():
        return

    remote_raw = remote_results_root / "raw_results"
    if remote_raw.exists():
        shutil.copytree(remote_raw, local_raw)

    remote_agg = remote_results_root / "method_aggregates.json"
    if remote_agg.exists():
        local_agg.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(remote_agg, local_agg)
def build_pending_samples() -> List[Dict[str, Any]]:
    """
    Build the pool of A/B comparison pairs.

    For every story that has a video from both fixed methods, one pair is
    produced with A = FIXED_A_METHOD and B = FIXED_B_METHOD. The pairs are then
    shuffled and numbered with an anonymous ``anon_id`` (``id_001``, ...).
    """
    side_fields = ("method", "video_name", "video_path", "sample_id")

    # story_name -> method -> sample (a later sample for the same method wins).
    methods_by_story: Dict[str, Dict[str, Dict[str, Any]]] = {}
    for entry in load_dataset_index():
        methods_by_story.setdefault(entry["story_name"], {})[entry["method"]] = entry

    pairs: List[Dict[str, Any]] = []
    for story_name, by_method in methods_by_story.items():
        side_a = by_method.get(FIXED_A_METHOD)
        side_b = by_method.get(FIXED_B_METHOD)
        if side_a and side_b:
            pairs.append(
                {
                    "pair_id": f"{story_name}__{FIXED_A_METHOD}_vs_{FIXED_B_METHOD}",
                    "story_name": story_name,
                    "story_text": side_a.get("story_text", "") or side_b.get("story_text", ""),
                    "A": {field: side_a[field] for field in side_fields},
                    "B": {field: side_b[field] for field in side_fields},
                }
            )

    random.shuffle(pairs)
    for position, pair in enumerate(pairs, start=1):
        pair["anon_id"] = f"id_{position:03d}"
    return pairs
def build_data_diagnostics(samples: List[Dict[str, Any]]) -> str:
    """Return a Markdown summary of the storage configuration and the number of samples."""
    rows = [
        f"**SPACE_MODE**: `{SPACE_MODE}`",
        f"**DATA_REPO_ID**: `{DATA_REPO_ID}`",
        f"**RESULTS_REPO_ID**: `{RESULTS_REPO_ID}`",
        f"**ROOT_DIR**: `{ROOT_DIR}`",
        f"**INPUT_DIR exists**: `{INPUT_DIR.exists()}`",
        f"**STORY_DIR exists**: `{STORY_DIR.exists()}`",
        f"**VIDEO_DIR exists**: `{VIDEO_DIR.exists()}`",
        f"**Pending samples**: `{len(samples)}`",
    ]
    return " \n".join(rows)
def compute_derived(scores: Dict[str, float]) -> Dict[str, float]:
    """
    Compute the derived scores CL, CRH and AVG from the six base metrics.

    ``scores`` must contain the keys NS, AT, AE, RF, EE and OE.

      CL  = (2*NS + 3*AT) / 5 + 0.5*AE
      CRH = (AT + 2*RF + EE + OE) / 5 + 0.5*AE
      AVG = (2*NS + 4*AT + 2*AE + 2*RF + EE + OE) / 12
    """
    ns, at, ae = scores["NS"], scores["AT"], scores["AE"]
    rf, ee, oe = scores["RF"], scores["EE"], scores["OE"]

    aesthetic_bonus = 0.5 * ae
    weighted_total = 2 * ns + 4 * at + 2 * ae + 2 * rf + ee + oe
    return {
        "CL": ((2 * ns + 3 * at) / 5.0) + aesthetic_bonus,
        "CRH": ((at + 2 * rf + ee + oe) / 5.0) + aesthetic_bonus,
        "AVG": weighted_total / 12.0,
    }
def save_single_result(
    sample: Dict[str, Any],
    evaluator_id: str,
    metric_choice: Dict[str, str],
    method_scores: Dict[str, Dict[str, float]],
    summary: str,
) -> Path:
    """
    Write one A/B comparison questionnaire result to disk.

    The file goes to ``OUTPUT_DIR/raw_results/<story_name>/`` and is named
    ``<pair_id>_<evaluator_id>_<timestamp>.json``. The timestamp includes
    microseconds: with second resolution, two submissions for the same pair and
    evaluator within one second would silently overwrite each other.

    Args:
        sample: the pair dict (needs ``story_name`` and ``pair_id``); stored under ``"pair"``.
        evaluator_id: identifier of the evaluator, used in the file name and payload.
        metric_choice: metric key -> chosen option.
        method_scores: method name -> metric key -> score.
        summary: free-text summary from the evaluator.

    Returns:
        Path of the written JSON file.
    """
    # One clock reading so the file name and the payload timestamp always agree.
    now = datetime.now()
    ts = now.strftime("%Y%m%d_%H%M%S_%f")

    result_dir = OUTPUT_DIR / "raw_results" / sample["story_name"]
    result_dir.mkdir(parents=True, exist_ok=True)
    out_path = result_dir / f"{sample['pair_id']}_{evaluator_id}_{ts}.json"

    payload = {
        "timestamp": now.isoformat(),
        "evaluator_id": evaluator_id,
        "pair": sample,
        "metric_choice": metric_choice,
        "method_scores": method_scores,
        "method_derived": {m: compute_derived(v) for m, v in method_scores.items()},
        "summary": summary,
    }
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)
    return out_path
def recompute_method_aggregates() -> Path:
    """
    Recompute per-method mean scores and write ``method_aggregates.json``.

    Every ``*.json`` below ``OUTPUT_DIR/raw_results`` is read. For each method
    entry in a file's ``method_scores`` that provides all base metrics, the base
    scores and the derived CL/CRH/AVG values are added to that method's totals.

    Files that cannot be read or parsed, and entries with missing or
    non-numeric scores, are skipped instead of raising. This function runs on
    every submission, so one corrupt file must not block all later submissions.

    Returns:
        Path of the written aggregate file.
    """
    raw_root = OUTPUT_DIR / "raw_results"
    method_scores: Dict[str, Dict[str, List[float]]] = defaultdict(lambda: defaultdict(list))
    method_count: Dict[str, int] = defaultdict(int)

    if raw_root.exists():
        # Sorted so the summation order (and thus the rounded means) is reproducible.
        for fp in sorted(raw_root.rglob("*.json")):
            try:
                with open(fp, "r", encoding="utf-8-sig") as f:
                    data = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers JSON syntax errors and undecodable bytes.
                print(f"[AGG] skip unreadable result file {fp}: {e}")
                continue

            pair_method_scores = data.get("method_scores") if isinstance(data, dict) else None
            if not isinstance(pair_method_scores, dict):
                continue

            for method, scores in pair_method_scores.items():
                if not isinstance(scores, dict):
                    continue
                try:
                    base = {k: float(scores[k]) for k in BASE_METRIC_KEYS}
                except (KeyError, TypeError, ValueError):
                    # Incomplete or non-numeric entry: ignore it.
                    continue
                method_count[method] += 1
                for k, value in base.items():
                    method_scores[method][k].append(value)
                for d_key, d_val in compute_derived(base).items():
                    method_scores[method][d_key].append(float(d_val))

    agg = {
        "updated_at": datetime.now().isoformat(),
        "metric_keys": BASE_METRIC_KEYS,
        "derived_keys": ["CL", "CRH", "AVG"],
        "methods": {},
    }
    for method in sorted(method_scores):
        metric_avg = {
            key: round(sum(vals) / len(vals), 4) if vals else None
            for key, vals in method_scores[method].items()
        }
        agg["methods"][method] = {
            "num_submissions": method_count[method],
            "avg_scores": metric_avg,
        }

    out_path = OUTPUT_DIR / "method_aggregates.json"
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(agg, f, ensure_ascii=False, indent=2)
    return out_path
def push_result_files_to_hub(single_path: Path, agg_path: Path) -> Optional[str]:
    """
    Upload the freshly written result files to RESULTS_REPO_ID right away.

    This avoids relying only on the periodic CommitScheduler. The single result
    is uploaded first, then the aggregate file; both keep their path relative to
    OUTPUT_DIR below ``user_study_results/`` in the dataset repo.

    Returns:
        None on success, otherwise a string describing why the upload failed.
    """
    if not RESULTS_REPO_ID:
        return "未配置 RESULTS_REPO_ID。"

    token = _load_hf_token()
    if not token:
        return "未配置 HF_TOKEN,无法写入 Hugging Face 远程仓库。"

    try:
        for local_file in (single_path, agg_path):
            rel_path = local_file.relative_to(OUTPUT_DIR).as_posix()
            hf_api.upload_file(
                path_or_fileobj=str(local_file),
                path_in_repo=f"user_study_results/{rel_path}",
                repo_id=RESULTS_REPO_ID,
                repo_type="dataset",
                token=token,
            )
    except Exception as e:
        return str(e)
    return None
def build_sample_brief_html(sample: Dict[str, Any], index: int, total: int) -> str:
    """
    Render the story description card for one sample as an HTML snippet.

    The story text is HTML-escaped; a placeholder message is shown when the
    sample has no story text. ``index`` and ``total`` are accepted but not used
    in the output.
    """
    story_text = sample.get("story_text")
    if not story_text:
        story_text = "(未找到对应 story 文本,请检查 clip_movie_story 下是否有同名 txt)"
    fragments = [
        "<div class='sample-card'>",
        "<div class='story-title'>剧情描述</div>",
        f"<p class='story-body'>{html.escape(story_text)}</p>",
        "</div>",
    ]
    return "".join(fragments)
def create_app():
    """
    Build the Gradio Blocks UI for the movie-level A/B questionnaire.

    The page shows videos A and B of the first pending pair with its story
    text, one three-way radio (A better / B better / tie) per base metric, an
    optional free-text summary and a submit button. Submitting stores the
    result, recomputes the aggregates, uploads both files, then clears the form
    and shows a freshly shuffled pair.

    NOTE(review): the source this was recovered from had lost its indentation;
    which widgets sit inside the centered column is a best guess — confirm
    against the deployed layout.

    Returns:
        The constructed ``gr.Blocks`` app (not launched).
    """
    samples = build_pending_samples()
    with gr.Blocks(
        title="VideoEval Movie-Level Evaluation",
        css=CUSTOM_CSS,
        theme=gr.themes.Soft(primary_hue="blue", secondary_hue="cyan", neutral_hue="slate"),
    ) as app:
        gr.HTML(
            """
            <div id="hero">
            <h1>VideoEval · Movie-Level Evaluation</h1>
            <p>统一电影级评测问卷,支持方法级均分统计(含 CL / CRH / AVG)</p>
            </div>
            """
        )
        # Per-session state: the current (shuffled) pair list and the outcome of the last submit.
        samples_state = gr.State(samples)
        submit_ok_state = gr.State(True)
        with gr.Row():
            with gr.Column(elem_classes=["panel", "center-panel"]):
                gr.HTML("<div class='section-head' style='text-align:center;'>1) 视频与剧情</div>")
                with gr.Row():
                    video_a = gr.Video(label="A", value=samples[0]["A"]["video_path"] if samples else None, height=360)
                    video_b = gr.Video(label="B", value=samples[0]["B"]["video_path"] if samples else None, height=360)
                sample_info = gr.HTML(
                    "<div class='sample-card'><p class='story-body'>无可用样本</p></div>"
                    if not samples else build_sample_brief_html(samples[0], 0, len(samples))
                )
                status = gr.Markdown("")
                gr.Markdown("## 2) 对比评分(A好 / B好 / 平手)")
                # One radio widget per base metric, keyed by metric key.
                score_widgets: Dict[str, gr.Radio] = {}
                metric_groups = {
                    "I. 叙事与剧本 (NS)": ["NS"],
                    "II. 视听与技术 (AT)": ["AT"],
                    "III. 美学与表现力 (AE)": ["AE"],
                    "IV. 节奏与流动性 (RF)": ["RF"],
                    "V. 情感与参与度 (EE)": ["EE"],
                    "VI. 整体体验 (OE)": ["OE"],
                }
                criteria_map = {k: (name, desc) for k, name, desc in MOVIE_CRITERIA}
                for section_title, keys in metric_groups.items():
                    with gr.Accordion(section_title, open=True):
                        for key in keys:
                            name, desc = criteria_map[key]
                            with gr.Group(elem_classes=["metric-card"]):
                                gr.Markdown(f"**{key} · {name}**")
                                gr.Markdown(f"<span class='hint'>{desc}</span>")
                                score_widgets[key] = gr.Radio(choices=["A好", "B好", "平手"], label=key)
                final_summary = gr.Textbox(label="Final Summary(可选)", lines=4, placeholder="总结 A/B 的主要优缺点")
                submit_btn = gr.Button("提交", variant="primary", elem_id="submit-btn")

        def _submit(summary: str, curr_samples: List[Dict[str, Any]], *score_vals):
            """Validate the choices, persist the result and return (status message, success flag)."""
            if not curr_samples:
                msg = "❌ 没有可提交样本。"
                gr.Warning(msg)
                return msg, False
            # The sample picker was removed from the page, so the submission always
            # refers to the first sample, which is the one currently displayed.
            sample = curr_samples[0]
            evaluator_id = "anonymous"
            a_method = sample["A"]["method"]
            b_method = sample["B"]["method"]
            method_scores: Dict[str, Dict[str, float]] = {
                a_method: {k: 0.0 for k in BASE_METRIC_KEYS},
                b_method: {k: 0.0 for k in BASE_METRIC_KEYS},
            }
            metric_choice: Dict[str, str] = {}
            # score_vals arrive in BASE_METRIC_KEYS order (see submit_inputs below).
            for i, key in enumerate(BASE_METRIC_KEYS):
                raw_score = score_vals[i] if i < len(score_vals) else None
                if raw_score in (None, "", []):
                    msg = f"❌ 请为 `{key}` 打分。"
                    gr.Warning(msg)
                    # Only the toast is shown here; the status text is returned empty.
                    return "", False
                if isinstance(raw_score, str) and raw_score.strip().lower() in {"none", "null", "[]"}:
                    msg = f"❌ 请为 `{key}` 打分。"
                    gr.Warning(msg)
                    return "", False
                choice = str(raw_score).strip()
                if choice not in {"A好", "B好", "平手"}:
                    msg = f"❌ `{key}` 的选择无效,请重新选择 A好/B好/平手。"
                    gr.Warning(msg)
                    return msg, False
                metric_choice[key] = choice
                # Winner gets 1.0, loser 0.0; a tie gives 0.5 to both.
                if choice == "A好":
                    method_scores[a_method][key] = 1.0
                    method_scores[b_method][key] = 0.0
                elif choice == "B好":
                    method_scores[a_method][key] = 0.0
                    method_scores[b_method][key] = 1.0
                else:
                    method_scores[a_method][key] = 0.5
                    method_scores[b_method][key] = 0.5
            with SAVE_LOCK:
                # Pull the latest remote results first so that, with repeated
                # submissions allowed, the averages cover every submission.
                sync_results_from_hub_to_local()
                single_path = save_single_result(sample, evaluator_id, metric_choice, method_scores, summary or "")
                agg_path = recompute_method_aggregates()
                push_err = push_result_files_to_hub(single_path, agg_path)
            if push_err:
                msg = f"❌ 结果已本地保存,但写入远程 `{RESULTS_REPO_ID}` 失败:{push_err}"
                gr.Warning(msg)
                return msg, False
            _ = (single_path, agg_path)
            return "", True

        def _refresh_on_load() -> Tuple[Any, Any, str, str, List[Dict[str, Any]]]:
            """On page load: rebuild the pair list and show its first pair."""
            refreshed_samples = build_pending_samples()
            if not refreshed_samples:
                return None, None, "<div class='sample-card'><p class='story-body'>无可用样本(需要同剧情下至少两个方法)</p></div>", "", refreshed_samples
            first = refreshed_samples[0]
            return (
                first["A"]["video_path"],
                first["B"]["video_path"],
                build_sample_brief_html(first, 0, len(refreshed_samples)),
                "",
                refreshed_samples,
            )

        def _refresh_after_submit(
            submit_ok: bool,
            submit_msg: str,
            curr_video_a: Any,
            curr_video_b: Any,
            curr_info: str,
            curr_samples: List[Dict[str, Any]],
        ) -> Tuple[Any, Any, str, str, List[Dict[str, Any]]]:
            """After a submit: keep the page on failure, otherwise show a new pair."""
            submit_msg = (submit_msg or "").strip()
            # On a failed submit, do not refresh the sample/story; keep the page as it is.
            if not submit_ok:
                return curr_video_a, curr_video_b, curr_info, submit_msg, curr_samples
            refreshed_samples = build_pending_samples()
            if not refreshed_samples:
                status_msg = submit_msg
                return None, None, "<div class='sample-card'><p class='story-body'>无可用样本(需要同剧情下至少两个方法)</p></div>", status_msg, refreshed_samples
            first = refreshed_samples[0]
            status_msg = submit_msg
            return (
                first["A"]["video_path"],
                first["B"]["video_path"],
                build_sample_brief_html(first, 0, len(refreshed_samples)),
                status_msg,
                refreshed_samples,
            )

        def _clear_scores_after_submit(submit_ok: bool) -> Tuple[Any, ...]:
            """Return updates for the summary box followed by one per metric radio."""
            # On a failed submit keep the inputs so the user can complete them and resubmit.
            if not submit_ok:
                keeps: List[Any] = [gr.update()]
                keeps.extend(gr.update() for _ in BASE_METRIC_KEYS)
                return tuple(keeps)
            # On success clear all scores and the summary so the previous
            # sample's input is not carried over.
            clears: List[Any] = [gr.update(value="")]
            clears.extend(gr.update(value=None) for _ in BASE_METRIC_KEYS)
            return tuple(clears)

        # Input order must match _submit's signature: summary, samples, then the
        # radios in BASE_METRIC_KEYS order.
        submit_inputs = [final_summary, samples_state]
        for key in BASE_METRIC_KEYS:
            submit_inputs.append(score_widgets[key])
        submit_evt = submit_btn.click(_submit, inputs=submit_inputs, outputs=[status, submit_ok_state])
        # Both follow-ups are chained to the submit event itself.
        submit_evt.then(
            _clear_scores_after_submit,
            inputs=[submit_ok_state],
            outputs=[final_summary] + [score_widgets[k] for k in BASE_METRIC_KEYS],
        )
        submit_evt.then(
            _refresh_after_submit,
            inputs=[submit_ok_state, status, video_a, video_b, sample_info, samples_state],
            outputs=[video_a, video_b, sample_info, status, samples_state],
        )
        app.load(
            _refresh_on_load,
            outputs=[video_a, video_b, sample_info, status, samples_state],
        )
    return app
# Build the app at import time and expose it as the module-level `demo`.
demo = create_app()
if __name__ == "__main__":
    # Pass the input directory (which holds the videos) to Gradio's
    # allowed_paths when it exists; otherwise pass None.
    allowed_paths = [str(INPUT_DIR.resolve())] if INPUT_DIR.exists() else None
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
        allowed_paths=allowed_paths,
    )