import gradio as gr import os import json from datetime import datetime from huggingface_hub import hf_hub_download import random import csv # 全局变量存储数据集 VIDEO_DATA = None REPO_ID = "WenjiaWang/videoforuser" # 从Hugging Face dataset加载视频 def load_videos_from_huggingface(): global VIDEO_DATA try: print("正在加载数据集: WenjiaWang/videoforuser...") # 定义视频文件列表(手动指定,避免列出所有文件) # 根据你的数据集结构,这里列出所有场景和方法 scenes_methods = { "0O_YyxcC-kA_scene-35_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], "3IC8kkbZbsk_scene-61_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], "3JZFI0yKr6Y_scene-940_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], "9Ixk_PSj__Q_scene-92_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], "CEPKsNuSDiQ_scene-31_1": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], "CEPKsNuSDiQ_scene-50_1": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], "F4bBJG9UKL0_scene-35_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], "I3EG5SDZn1s_scene-101_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], "RTaUgpxLBag_scene-284_1": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], "c4OnR033eMk_scene-158_3": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], } # 组织视频数据:按场景分组 VIDEO_DATA = {} for scene_name, methods in scenes_methods.items(): VIDEO_DATA[scene_name] = {} for method_name in methods: video_filename = f"{method_name}.mp4" video_path = f"{scene_name}/{video_filename}" try: # 下载视频文件到本地缓存 print(f"下载视频: {video_path}") local_path = hf_hub_download( repo_id=REPO_ID, filename=video_path, repo_type="dataset" ) # 存储视频信息 VIDEO_DATA[scene_name][method_name] = { 'path': video_path, 'local_path': local_path } print(f"✓ 已下载: {video_path} -> {local_path}") except Exception as e: print(f"✗ 下载失败: {video_path} - {e}") continue print(f"组织完成,共 {len(VIDEO_DATA)} 个场景") return True except Exception as e: print(f"加载数据集失败: {e}") import traceback traceback.print_exc() return False # 获取所有场景列表 def get_question_folders(): if VIDEO_DATA is None: success = load_videos_from_huggingface() if not success: return [] return sorted(list(VIDEO_DATA.keys())) # 获取某个场景的所有视频 def get_videos_for_question(scene_name): if VIDEO_DATA is None or scene_name not in VIDEO_DATA: return {}, {} scene_videos = VIDEO_DATA[scene_name] # 创建方法名到真实名称的映射 method_names = list(scene_videos.keys()) # 随机打乱顺序以匿名化 shuffled_methods = method_names.copy() random.shuffle(shuffled_methods) videos = {} method_mapping = {} for i, method_name in enumerate(shuffled_methods): display_name = f"Method {chr(65+i)}" # Method A, B, C, D # 获取视频数据 video_info = scene_videos[method_name] # 使用本地路径 videos[display_name] = video_info['local_path'] method_mapping[display_name] = method_name return videos, method_mapping # 保存评分数据 def save_ratings(scene_name, ratings_data, method_mapping): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # 将显示名称映射到真实方法名 mapped_ratings = {} for display_name, ratings in ratings_data.items(): real_method = method_mapping.get(display_name, display_name) mapped_ratings[real_method] = ratings # 读取现有数据 all_data = [] if os.path.exists("ratings_data.json"): try: with open("ratings_data.json", "r", encoding="utf-8") as f: all_data = json.load(f) except: all_data = [] # 添加新数据 entry = { "timestamp": timestamp, "scene": scene_name, "ratings": mapped_ratings } all_data.append(entry) # 保存数据 with open("ratings_data.json", "w", encoding="utf-8") as f: json.dump(all_data, f, ensure_ascii=False, indent=2) return f"✓ 评分已保存 / Ratings saved" # 创建Gradio界面 def create_video_survey_app(): # 预加载数据集 print("初始化应用...") load_videos_from_huggingface() question_folders = get_question_folders() if not question_folders: print("错误: 没有找到任何场景数据") return None print(f"找到 {len(question_folders)} 个场景") with gr.Blocks(title="视频生成质量用户研究", theme=gr.themes.Soft()) as demo: # 问卷内容区域 with gr.Column(visible=True) as survey_area: gr.Markdown("# 🎬 视频生成质量用户研究 / Video Generation Quality User Study") gr.Markdown(""" ### 说明 / Instructions: - 请观看每个视频并进行评分 / Please watch each video and rate them - **💡 提示:可以点击视频右下角全屏按钮观看清晰细节** / Tip: Click fullscreen button to view details - 评分标准 / Rating criteria: - **动态生成质量** / Dynamic Generation Quality: 视频中物体运动的流畅性和真实性 - **静态一致性** / Static Consistency: 视频中静态物体的稳定性和一致性 - **整体质量** / Overall Quality: 视频的整体观感 - 评分范围:1-5分(5分最好)/ Rating scale: 1-5 (5 = Best) - 完成所有评分后,点击底部的"提交所有评分"按钮 / After rating all videos, click "Submit All Ratings" at the bottom """) # 存储所有评分组件 all_scene_components = [] # 为每个场景创建界面 for scene_idx, scene_name in enumerate(question_folders): videos, method_mapping = get_videos_for_question(scene_name) with gr.Group(): gr.Markdown(f"## 场景 {scene_idx + 1} / Scene {scene_idx + 1}: `{scene_name}`") # 4个视频横向排列 with gr.Row(equal_height=True, variant="compact"): scene_ratings = [] for i in range(4): method_name = f"Method {chr(65+i)}" with gr.Column(scale=1, min_width=0): gr.Markdown(f"### 🎥 {method_name}") # 视频 - 使用 autoplay=False 确保一致性 if method_name in videos: video = gr.Video( value=videos[method_name], label="", height=400, autoplay=False, show_label=False ) else: video = gr.Video( value=None, label="", height=400, visible=False, show_label=False ) # 三个评分滑块(垂直排列) dynamic = gr.Slider( minimum=1, maximum=5, step=1, value=3, label="动态质量 / Dynamic" ) static = gr.Slider( minimum=1, maximum=5, step=1, value=3, label="静态一致性 / Static" ) overall = gr.Slider( minimum=1, maximum=5, step=1, value=3, label="整体质量 / Overall" ) scene_ratings.append({ "method": method_name, "dynamic": dynamic, "static": static, "overall": overall }) all_scene_components.append({ "scene_name": scene_name, "method_mapping": method_mapping, "ratings": scene_ratings }) # 提交按钮 gr.Markdown("---") submit_btn = gr.Button("📤 提交所有评分 / Submit All Ratings", variant="primary", size="lg") # 感谢页面(初始隐藏) with gr.Column(visible=False) as thank_you_page: gr.Markdown(""" # ✅ 提交成功!/ Submission Successful! ## 🎉 感谢您的参与!/ Thank you for participating! 您的评分已成功保存。您的反馈对我们非常重要! Your ratings have been saved successfully. Your feedback is very important to us! --- 您现在可以关闭此页面。 You may now close this page. """) status_output = gr.Markdown("") # 提交函数 def submit_all_ratings(*all_rating_values): try: # 解析所有评分 value_idx = 0 for scene_data in all_scene_components: scene_name = scene_data["scene_name"] method_mapping = scene_data["method_mapping"] ratings = {} for rating_comp in scene_data["ratings"]: method_name = rating_comp["method"] ratings[method_name] = { "dynamic_quality": all_rating_values[value_idx], "static_consistency": all_rating_values[value_idx + 1], "overall_quality": all_rating_values[value_idx + 2] } value_idx += 3 # 保存评分 save_ratings(scene_name, ratings, method_mapping) # 隐藏问卷,显示感谢页面 return gr.update(visible=False), gr.update(visible=True), "" except Exception as e: return gr.update(visible=True), gr.update(visible=False), f"❌ **保存失败** / Save failed: {str(e)}" # 收集所有评分输入 all_rating_inputs = [] for scene_data in all_scene_components: for rating_comp in scene_data["ratings"]: all_rating_inputs.extend([ rating_comp["dynamic"], rating_comp["static"], rating_comp["overall"] ]) # 绑定提交按钮 submit_btn.click( submit_all_ratings, inputs=all_rating_inputs, outputs=[survey_area, thank_you_page, status_output] ) return demo if __name__ == "__main__": app = create_video_survey_app() if app: # 添加密码保护的管理员结果查看和分析页面 with gr.Blocks() as admin_app: gr.Markdown("# 🔐 管理员登录") gr.Markdown("请输入管理员密码查看结果") # 管理员密码(从环境变量读取,保护隐私) # 在 Hugging Face Space 设置中添加 Secret: ADMIN_PASSWORD ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin123") # 默认密码仅用于本地测试 with gr.Column(visible=True) as login_box: password_input = gr.Textbox( label="密码", type="password", placeholder="请输入管理员密码" ) login_btn = gr.Button("🔓 登录", variant="primary") login_status = gr.Markdown("") with gr.Column(visible=False) as admin_content: gr.Markdown("# 📊 管理员 - 查看结果") gr.Markdown("查看所有用户提交的评分数据,并下载原始 JSON 文件") def load_results(): if os.path.exists("ratings_data.json"): with open("ratings_data.json", "r", encoding="utf-8") as f: data = json.load(f) # 格式化显示 formatted_text = f"📊 总共收集了 {len(data)} 条评分记录\n\n" formatted_text += "=" * 80 + "\n\n" for i, entry in enumerate(data, 1): formatted_text += f"【记录 {i}】\n" formatted_text += f"时间: {entry['timestamp']}\n" formatted_text += f"场景: {entry['scene']}\n" formatted_text += f"评分:\n" for method, ratings in entry['ratings'].items(): formatted_text += f" • {method}:\n" formatted_text += f" - 动态质量: {ratings['dynamic_quality']}\n" formatted_text += f" - 静态一致性: {ratings['static_consistency']}\n" formatted_text += f" - 整体质量: {ratings['overall_quality']}\n" formatted_text += "\n" + "-" * 80 + "\n\n" return formatted_text, "ratings_data.json" else: return "📭 暂无数据\n\n还没有用户提交评分。", None def get_statistics(): if not os.path.exists("ratings_data.json"): return "📭 暂无数据", None, None with open("ratings_data.json", "r", encoding="utf-8") as f: data = json.load(f) if not data: return "📭 暂无数据", None, None # 统计每个方法的平均分 method_stats = {} scene_stats = {} for entry in data: scene = entry['scene'] if scene not in scene_stats: scene_stats[scene] = {} for method, ratings in entry['ratings'].items(): if method not in method_stats: method_stats[method] = { 'dynamic': [], 'static': [], 'overall': [] } method_stats[method]['dynamic'].append(ratings['dynamic_quality']) method_stats[method]['static'].append(ratings['static_consistency']) method_stats[method]['overall'].append(ratings['overall_quality']) # 按场景统计 if method not in scene_stats[scene]: scene_stats[scene][method] = { 'dynamic': [], 'static': [], 'overall': [] } scene_stats[scene][method]['dynamic'].append(ratings['dynamic_quality']) scene_stats[scene][method]['static'].append(ratings['static_consistency']) scene_stats[scene][method]['overall'].append(ratings['overall_quality']) # 计算平均值和提交人数 num_submissions = len(data) stats_text = f"📈 统计分析\n\n" stats_text += f"👥 **提交人数**: {num_submissions}\n\n" stats_text += "=" * 80 + "\n\n" stats_text += "【总体统计 - 按方法】\n\n" for method, scores in sorted(method_stats.items()): avg_dynamic = sum(scores['dynamic']) / len(scores['dynamic']) avg_static = sum(scores['static']) / len(scores['static']) avg_overall = sum(scores['overall']) / len(scores['overall']) avg_total = (avg_dynamic + avg_static + avg_overall) / 3 stats_text += f"🎬 {method}\n" stats_text += f" 样本数: {len(scores['dynamic'])} (来自 {num_submissions} 人)\n" stats_text += f" 动态质量: {avg_dynamic:.2f}\n" stats_text += f" 静态一致性: {avg_static:.2f}\n" stats_text += f" 整体质量: {avg_overall:.2f}\n" stats_text += f" 综合平均: {avg_total:.2f}\n" stats_text += "\n" # 生成CSV文件 - 总体统计 csv_summary_path = "ratings_summary.csv" with open(csv_summary_path, 'w', newline='', encoding='utf-8-sig') as f: writer = csv.writer(f) writer.writerow(['方法', '样本数', '动态质量均分', '静态一致性均分', '整体质量均分', '综合均分']) for method, scores in sorted(method_stats.items()): avg_dynamic = sum(scores['dynamic']) / len(scores['dynamic']) avg_static = sum(scores['static']) / len(scores['static']) avg_overall = sum(scores['overall']) / len(scores['overall']) avg_total = (avg_dynamic + avg_static + avg_overall) / 3 writer.writerow([ method, len(scores['dynamic']), f"{avg_dynamic:.2f}", f"{avg_static:.2f}", f"{avg_overall:.2f}", f"{avg_total:.2f}" ]) # 生成CSV文件 - 详细数据 csv_detail_path = "ratings_detail.csv" with open(csv_detail_path, 'w', newline='', encoding='utf-8-sig') as f: writer = csv.writer(f) writer.writerow(['时间戳', '场景', '方法', '动态质量', '静态一致性', '整体质量']) for entry in data: timestamp = entry['timestamp'] scene = entry['scene'] for method, ratings in entry['ratings'].items(): writer.writerow([ timestamp, scene, method, ratings['dynamic_quality'], ratings['static_consistency'], ratings['overall_quality'] ]) return stats_text, csv_summary_path, csv_detail_path with gr.Tabs(): with gr.Tab("📊 统计分析"): with gr.Row(): stats_refresh_btn = gr.Button("🔄 刷新统计", variant="primary") clear_data_btn = gr.Button("🗑️ 清空所有数据", variant="stop") stats_display = gr.Textbox( label="统计结果", lines=25, max_lines=50, interactive=False ) with gr.Row(): csv_summary_file = gr.File(label="📥 下载汇总统计 CSV", interactive=False) csv_detail_file = gr.File(label="📥 下载详细数据 CSV", interactive=False) json_file = gr.File(label="📥 下载原始 JSON", interactive=False) gr.Markdown(""" **说明**: - **汇总统计 CSV**:每个方法的平均分和样本数 - **详细数据 CSV**:所有原始评分记录,可用于进一步分析 - **原始 JSON**:完整的原始数据 - **清空数据**:删除所有评分记录(谨慎操作!) """) # 清空数据函数 def clear_all_data(): try: if os.path.exists("ratings_data.json"): os.remove("ratings_data.json") return "✅ 所有数据已清空", None, None, None except Exception as e: return f"❌ 清空失败: {str(e)}", None, None, None # 获取统计和文件 def get_stats_and_files(): stats, csv_summary, csv_detail = get_statistics() json_path = "ratings_data.json" if os.path.exists("ratings_data.json") else None return stats, csv_summary, csv_detail, json_path stats_refresh_btn.click( get_stats_and_files, outputs=[stats_display, csv_summary_file, csv_detail_file, json_file] ) clear_data_btn.click( clear_all_data, outputs=[stats_display, csv_summary_file, csv_detail_file, json_file] ) # 登录验证 def check_password(password): if password == ADMIN_PASSWORD: stats, csv_summary, csv_detail = get_statistics() json_path = "ratings_data.json" if os.path.exists("ratings_data.json") else None return ( gr.Column(visible=False), # 隐藏登录框 gr.Column(visible=True), # 显示管理内容 "✅ 登录成功!", stats, csv_summary, csv_detail, json_path ) else: return ( gr.Column(visible=True), # 保持登录框显示 gr.Column(visible=False), # 隐藏管理内容 "❌ 密码错误,请重试", "", None, None, None ) login_btn.click( check_password, inputs=[password_input], outputs=[login_box, admin_content, login_status, stats_display, csv_summary_file, csv_detail_file, json_file] ) # 使用 TabbedInterface 组合两个界面 combined_app = gr.TabbedInterface( [app, admin_app], ["📝 用户问卷 / Survey", "🔐 管理员 / Admin"] ) combined_app.launch( server_name="0.0.0.0", server_port=7860, share=False, allowed_paths=[os.path.expanduser("~/.cache/huggingface")] ) else: print("应用初始化失败 / App initialization failed")