import gradio as gr import os import json from datetime import datetime from datasets import load_dataset import random # 全局变量存储数据集 DATASET = None VIDEO_DATA = None # 从Hugging Face dataset加载视频 def load_videos_from_huggingface(): global DATASET, VIDEO_DATA try: print("正在加载数据集: WenjiaWang/videoforuser...") DATASET = load_dataset("WenjiaWang/videoforuser", split="train") print(f"成功加载数据集,共 {len(DATASET)} 个视频") # 组织视频数据:按场景分组 VIDEO_DATA = {} for idx, item in enumerate(DATASET): # 获取视频路径信息 if 'video' in item: video_path = item['video'] elif 'path' in item: video_path = item['path'] else: print(f"警告: 第 {idx} 项没有视频路径字段") continue # 从路径中提取场景名和方法名 # 假设路径格式类似: "videos/scene_name/method.mp4" path_parts = video_path.split('/') if len(path_parts) >= 2: scene_name = path_parts[-2] # 倒数第二部分是场景名 file_name = path_parts[-1] # 最后部分是文件名 # 提取方法名 method_name = file_name.replace('.mp4', '') if scene_name not in VIDEO_DATA: VIDEO_DATA[scene_name] = {} # 存储视频信息(包括在dataset中的索引) VIDEO_DATA[scene_name][method_name] = { 'index': idx, 'path': video_path, 'item': item } print(f"组织完成,共 {len(VIDEO_DATA)} 个场景") return True except Exception as e: print(f"加载数据集失败: {e}") import traceback traceback.print_exc() return False # 获取所有场景列表 def get_question_folders(): if VIDEO_DATA is None: success = load_videos_from_huggingface() if not success: return [] return sorted(list(VIDEO_DATA.keys())) # 获取某个场景的所有视频 def get_videos_for_question(scene_name): if VIDEO_DATA is None or scene_name not in VIDEO_DATA: return {}, {} scene_videos = VIDEO_DATA[scene_name] # 创建方法名到真实名称的映射 method_names = list(scene_videos.keys()) # 随机打乱顺序以匿名化 shuffled_methods = method_names.copy() random.shuffle(shuffled_methods) videos = {} method_mapping = {} for i, method_name in enumerate(shuffled_methods): display_name = f"Method {chr(65+i)}" # Method A, B, C, D # 获取视频数据 video_info = scene_videos[method_name] video_item = video_info['item'] # 从dataset item中获取视频文件 if 'video' in video_item: videos[display_name] = video_item['video'] # 这应该是视频文件路径或对象 method_mapping[display_name] = method_name return videos, method_mapping # 保存评分数据 def save_ratings(scene_name, ratings_data, method_mapping): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # 将显示名称映射到真实方法名 mapped_ratings = {} for display_name, ratings in ratings_data.items(): real_method = method_mapping.get(display_name, display_name) mapped_ratings[real_method] = ratings # 读取现有数据 all_data = [] if os.path.exists("ratings_data.json"): try: with open("ratings_data.json", "r", encoding="utf-8") as f: all_data = json.load(f) except: all_data = [] # 添加新数据 entry = { "timestamp": timestamp, "scene": scene_name, "ratings": mapped_ratings } all_data.append(entry) # 保存数据 with open("ratings_data.json", "w", encoding="utf-8") as f: json.dump(all_data, f, ensure_ascii=False, indent=2) return f"✓ 评分已保存 / Ratings saved" # 创建Gradio界面 def create_video_survey_app(): # 预加载数据集 print("初始化应用...") load_videos_from_huggingface() question_folders = get_question_folders() if not question_folders: print("错误: 没有找到任何场景数据") return None print(f"找到 {len(question_folders)} 个场景") with gr.Blocks(title="视频生成质量用户研究", theme=gr.themes.Soft()) as demo: gr.Markdown("# 🎬 视频生成质量用户研究 / Video Generation Quality User Study") gr.Markdown(""" ### 说明 / Instructions: - 请观看每个视频并进行评分 / Please watch each video and rate them - 评分标准 / Rating criteria: - **动态生成质量** / Dynamic Generation Quality: 视频中物体运动的流畅性和真实性 - **静态一致性** / Static Consistency: 视频中静态物体的稳定性和一致性 - **整体质量** / Overall Quality: 视频的整体观感 - 评分范围:1-5分(5分最好)/ Rating scale: 1-5 (5 = Best) """) # 状态变量 current_question_idx = gr.State(0) current_method_mapping = gr.State({}) # 进度显示 with gr.Row(): prev_btn = gr.Button("⬅️ 上一题 / Previous", size="sm") question_text = gr.Markdown(f"**场景 1 / {len(question_folders)}**") next_btn = gr.Button("下一题 / Next ➡️", size="sm", variant="primary") status_text = gr.Textbox(label="状态 / Status", interactive=False, visible=False) # 视频显示区域(4个视频) video_components = [] rating_components = [] for i in range(4): method_name = f"Method {chr(65+i)}" with gr.Group(): gr.Markdown(f"### 🎥 {method_name}") video = gr.Video(label="", height=300) video_components.append(video) with gr.Row(): dynamic = gr.Slider( minimum=1, maximum=5, step=1, value=3, label="动态质量 / Dynamic Quality", info="1=差 / Poor, 5=优秀 / Excellent" ) static = gr.Slider( minimum=1, maximum=5, step=1, value=3, label="静态一致性 / Static Consistency", info="1=差 / Poor, 5=优秀 / Excellent" ) overall = gr.Slider( minimum=1, maximum=5, step=1, value=3, label="整体质量 / Overall Quality", info="1=差 / Poor, 5=优秀 / Excellent" ) rating_components.append({ "dynamic": dynamic, "static": static, "overall": overall }) # 更新问题显示 def update_question(question_idx, save_previous=False, prev_ratings=None, prev_mapping=None): if question_idx < 0: question_idx = 0 if question_idx >= len(question_folders): question_idx = len(question_folders) - 1 # 如果需要,保存上一题的评分 save_msg = "" if save_previous and prev_ratings and prev_mapping: prev_scene = question_folders[question_idx - 1] if question_idx > 0 else None if prev_scene: save_msg = save_ratings(prev_scene, prev_ratings, prev_mapping) scene_name = question_folders[question_idx] videos, method_mapping = get_videos_for_question(scene_name) # 更新视频显示 video_updates = [] for i in range(4): method_name = f"Method {chr(65+i)}" if method_name in videos: video_updates.append(gr.Video(value=videos[method_name], visible=True)) else: video_updates.append(gr.Video(value=None, visible=False)) # 重置评分 rating_updates = [gr.Slider(value=3) for _ in range(12)] # 4个视频 x 3个评分 question_markdown = f"**场景 {question_idx + 1} / {len(question_folders)}**: `{scene_name}`" return ( [question_idx, method_mapping, question_markdown, save_msg] + video_updates + rating_updates ) # 收集当前评分 def collect_ratings(*rating_values): ratings = {} for i in range(4): method_name = f"Method {chr(65+i)}" base_idx = i * 3 ratings[method_name] = { "dynamic_quality": rating_values[base_idx], "static_consistency": rating_values[base_idx + 1], "overall_quality": rating_values[base_idx + 2] } return ratings # 下一题按钮 def on_next(question_idx, method_mapping, *rating_values): # 收集当前评分 current_ratings = collect_ratings(*rating_values) # 保存当前评分 scene_name = question_folders[question_idx] save_msg = save_ratings(scene_name, current_ratings, method_mapping) # 移动到下一题 new_idx = question_idx + 1 if new_idx >= len(question_folders): return [ question_idx, method_mapping, f"**✅ 所有场景已完成!/ All scenes completed!**", save_msg + "\n🎉 感谢参与!/ Thank you for participating!" ] + [gr.Video()] * 4 + [gr.Slider(value=3)] * 12 return update_question(new_idx) # 上一题按钮 def on_prev(question_idx, *args): new_idx = question_idx - 1 if new_idx < 0: new_idx = 0 return update_question(new_idx) # 收集所有评分组件 all_rating_inputs = [] for comp in rating_components: all_rating_inputs.extend([comp["dynamic"], comp["static"], comp["overall"]]) # 绑定事件 next_btn.click( on_next, inputs=[current_question_idx, current_method_mapping] + all_rating_inputs, outputs=[ current_question_idx, current_method_mapping, question_text, status_text ] + video_components + all_rating_inputs ) prev_btn.click( on_prev, inputs=[current_question_idx] + all_rating_inputs, outputs=[ current_question_idx, current_method_mapping, question_text, status_text ] + video_components + all_rating_inputs ) # 初始化第一个问题 demo.load( lambda: update_question(0), outputs=[ current_question_idx, current_method_mapping, question_text, status_text ] + video_components + all_rating_inputs ) return demo if __name__ == "__main__": app = create_video_survey_app() if app: app.launch(server_name="0.0.0.0", server_port=7860, share=False) else: print("应用初始化失败 / App initialization failed")