Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import json | |
| from datetime import datetime | |
| from huggingface_hub import hf_hub_download | |
| import random | |
| import csv | |
| # 全局变量存储数据集 | |
| VIDEO_DATA = None | |
| REPO_ID = "WenjiaWang/videoforuser" | |
| # 从Hugging Face dataset加载视频 | |
| def load_videos_from_huggingface(): | |
| global VIDEO_DATA | |
| try: | |
| print("正在加载数据集: WenjiaWang/videoforuser...") | |
| # 定义视频文件列表(手动指定,避免列出所有文件) | |
| # 根据你的数据集结构,这里列出所有场景和方法 | |
| scenes_methods = { | |
| "0O_YyxcC-kA_scene-35_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], | |
| "3IC8kkbZbsk_scene-61_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], | |
| "3JZFI0yKr6Y_scene-940_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], | |
| "9Ixk_PSj__Q_scene-92_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], | |
| "CEPKsNuSDiQ_scene-31_1": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], | |
| "CEPKsNuSDiQ_scene-50_1": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], | |
| "F4bBJG9UKL0_scene-35_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], | |
| "I3EG5SDZn1s_scene-101_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], | |
| "RTaUgpxLBag_scene-284_1": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], | |
| "c4OnR033eMk_scene-158_3": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"], | |
| } | |
| # 组织视频数据:按场景分组 | |
| VIDEO_DATA = {} | |
| for scene_name, methods in scenes_methods.items(): | |
| VIDEO_DATA[scene_name] = {} | |
| for method_name in methods: | |
| video_filename = f"{method_name}.mp4" | |
| video_path = f"{scene_name}/{video_filename}" | |
| try: | |
| # 下载视频文件到本地缓存 | |
| print(f"下载视频: {video_path}") | |
| local_path = hf_hub_download( | |
| repo_id=REPO_ID, | |
| filename=video_path, | |
| repo_type="dataset" | |
| ) | |
| # 存储视频信息 | |
| VIDEO_DATA[scene_name][method_name] = { | |
| 'path': video_path, | |
| 'local_path': local_path | |
| } | |
| print(f"✓ 已下载: {video_path} -> {local_path}") | |
| except Exception as e: | |
| print(f"✗ 下载失败: {video_path} - {e}") | |
| continue | |
| print(f"组织完成,共 {len(VIDEO_DATA)} 个场景") | |
| return True | |
| except Exception as e: | |
| print(f"加载数据集失败: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return False | |
| # 获取所有场景列表 | |
| def get_question_folders(): | |
| if VIDEO_DATA is None: | |
| success = load_videos_from_huggingface() | |
| if not success: | |
| return [] | |
| return sorted(list(VIDEO_DATA.keys())) | |
| # 获取某个场景的所有视频 | |
| def get_videos_for_question(scene_name): | |
| if VIDEO_DATA is None or scene_name not in VIDEO_DATA: | |
| return {}, {} | |
| scene_videos = VIDEO_DATA[scene_name] | |
| # 创建方法名到真实名称的映射 | |
| method_names = list(scene_videos.keys()) | |
| # 随机打乱顺序以匿名化 | |
| shuffled_methods = method_names.copy() | |
| random.shuffle(shuffled_methods) | |
| videos = {} | |
| method_mapping = {} | |
| for i, method_name in enumerate(shuffled_methods): | |
| display_name = f"Method {chr(65+i)}" # Method A, B, C, D | |
| # 获取视频数据 | |
| video_info = scene_videos[method_name] | |
| # 使用本地路径 | |
| videos[display_name] = video_info['local_path'] | |
| method_mapping[display_name] = method_name | |
| return videos, method_mapping | |
| # 保存评分数据 | |
| def save_ratings(scene_name, ratings_data, method_mapping): | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| # 将显示名称映射到真实方法名 | |
| mapped_ratings = {} | |
| for display_name, ratings in ratings_data.items(): | |
| real_method = method_mapping.get(display_name, display_name) | |
| mapped_ratings[real_method] = ratings | |
| # 读取现有数据 | |
| all_data = [] | |
| if os.path.exists("ratings_data.json"): | |
| try: | |
| with open("ratings_data.json", "r", encoding="utf-8") as f: | |
| all_data = json.load(f) | |
| except: | |
| all_data = [] | |
| # 添加新数据 | |
| entry = { | |
| "timestamp": timestamp, | |
| "scene": scene_name, | |
| "ratings": mapped_ratings | |
| } | |
| all_data.append(entry) | |
| # 保存数据 | |
| with open("ratings_data.json", "w", encoding="utf-8") as f: | |
| json.dump(all_data, f, ensure_ascii=False, indent=2) | |
| return f"✓ 评分已保存 / Ratings saved" | |
| # 创建Gradio界面 | |
| def create_video_survey_app(): | |
| # 预加载数据集 | |
| print("初始化应用...") | |
| load_videos_from_huggingface() | |
| question_folders = get_question_folders() | |
| if not question_folders: | |
| print("错误: 没有找到任何场景数据") | |
| return None | |
| print(f"找到 {len(question_folders)} 个场景") | |
| with gr.Blocks(title="视频生成质量用户研究", theme=gr.themes.Soft()) as demo: | |
| # 问卷内容区域 | |
| with gr.Column(visible=True) as survey_area: | |
| gr.Markdown("# 🎬 视频生成质量用户研究 / Video Generation Quality User Study") | |
| gr.Markdown(""" | |
| ### 说明 / Instructions: | |
| - 请观看每个视频并进行评分 / Please watch each video and rate them | |
| - **💡 提示:可以点击视频右下角全屏按钮观看清晰细节** / Tip: Click fullscreen button to view details | |
| - 评分标准 / Rating criteria: | |
| - **动态生成质量** / Dynamic Generation Quality: 视频中物体运动的流畅性和真实性 | |
| - **静态一致性** / Static Consistency: 视频中静态物体的稳定性和一致性 | |
| - **整体质量** / Overall Quality: 视频的整体观感 | |
| - 评分范围:1-5分(5分最好)/ Rating scale: 1-5 (5 = Best) | |
| - 完成所有评分后,点击底部的"提交所有评分"按钮 / After rating all videos, click "Submit All Ratings" at the bottom | |
| """) | |
| # 存储所有评分组件 | |
| all_scene_components = [] | |
| # 为每个场景创建界面 | |
| for scene_idx, scene_name in enumerate(question_folders): | |
| videos, method_mapping = get_videos_for_question(scene_name) | |
| with gr.Group(): | |
| gr.Markdown(f"## 场景 {scene_idx + 1} / Scene {scene_idx + 1}: `{scene_name}`") | |
| # 4个视频横向排列 | |
| with gr.Row(equal_height=True, variant="compact"): | |
| scene_ratings = [] | |
| for i in range(4): | |
| method_name = f"Method {chr(65+i)}" | |
| with gr.Column(scale=1, min_width=0): | |
| gr.Markdown(f"### 🎥 {method_name}") | |
| # 视频 - 使用 autoplay=False 确保一致性 | |
| if method_name in videos: | |
| video = gr.Video( | |
| value=videos[method_name], | |
| label="", | |
| height=400, | |
| autoplay=False, | |
| show_label=False | |
| ) | |
| else: | |
| video = gr.Video( | |
| value=None, | |
| label="", | |
| height=400, | |
| visible=False, | |
| show_label=False | |
| ) | |
| # 三个评分滑块(垂直排列) | |
| dynamic = gr.Slider( | |
| minimum=1, maximum=5, step=1, value=3, | |
| label="动态质量 / Dynamic" | |
| ) | |
| static = gr.Slider( | |
| minimum=1, maximum=5, step=1, value=3, | |
| label="静态一致性 / Static" | |
| ) | |
| overall = gr.Slider( | |
| minimum=1, maximum=5, step=1, value=3, | |
| label="整体质量 / Overall" | |
| ) | |
| scene_ratings.append({ | |
| "method": method_name, | |
| "dynamic": dynamic, | |
| "static": static, | |
| "overall": overall | |
| }) | |
| all_scene_components.append({ | |
| "scene_name": scene_name, | |
| "method_mapping": method_mapping, | |
| "ratings": scene_ratings | |
| }) | |
| # 提交按钮 | |
| gr.Markdown("---") | |
| submit_btn = gr.Button("📤 提交所有评分 / Submit All Ratings", variant="primary", size="lg") | |
| # 感谢页面(初始隐藏) | |
| with gr.Column(visible=False) as thank_you_page: | |
| gr.Markdown(""" | |
| # ✅ 提交成功!/ Submission Successful! | |
| ## 🎉 感谢您的参与!/ Thank you for participating! | |
| 您的评分已成功保存。您的反馈对我们非常重要! | |
| Your ratings have been saved successfully. Your feedback is very important to us! | |
| --- | |
| 您现在可以关闭此页面。 | |
| You may now close this page. | |
| """) | |
| status_output = gr.Markdown("") | |
| # 提交函数 | |
| def submit_all_ratings(*all_rating_values): | |
| try: | |
| # 解析所有评分 | |
| value_idx = 0 | |
| for scene_data in all_scene_components: | |
| scene_name = scene_data["scene_name"] | |
| method_mapping = scene_data["method_mapping"] | |
| ratings = {} | |
| for rating_comp in scene_data["ratings"]: | |
| method_name = rating_comp["method"] | |
| ratings[method_name] = { | |
| "dynamic_quality": all_rating_values[value_idx], | |
| "static_consistency": all_rating_values[value_idx + 1], | |
| "overall_quality": all_rating_values[value_idx + 2] | |
| } | |
| value_idx += 3 | |
| # 保存评分 | |
| save_ratings(scene_name, ratings, method_mapping) | |
| # 隐藏问卷,显示感谢页面 | |
| return gr.update(visible=False), gr.update(visible=True), "" | |
| except Exception as e: | |
| return gr.update(visible=True), gr.update(visible=False), f"❌ **保存失败** / Save failed: {str(e)}" | |
| # 收集所有评分输入 | |
| all_rating_inputs = [] | |
| for scene_data in all_scene_components: | |
| for rating_comp in scene_data["ratings"]: | |
| all_rating_inputs.extend([ | |
| rating_comp["dynamic"], | |
| rating_comp["static"], | |
| rating_comp["overall"] | |
| ]) | |
| # 绑定提交按钮 | |
| submit_btn.click( | |
| submit_all_ratings, | |
| inputs=all_rating_inputs, | |
| outputs=[survey_area, thank_you_page, status_output] | |
| ) | |
| return demo | |
| if __name__ == "__main__": | |
| app = create_video_survey_app() | |
| if app: | |
| # 添加密码保护的管理员结果查看和分析页面 | |
| with gr.Blocks() as admin_app: | |
| gr.Markdown("# 🔐 管理员登录") | |
| gr.Markdown("请输入管理员密码查看结果") | |
| # 管理员密码(从环境变量读取,保护隐私) | |
| # 在 Hugging Face Space 设置中添加 Secret: ADMIN_PASSWORD | |
| ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin123") # 默认密码仅用于本地测试 | |
| with gr.Column(visible=True) as login_box: | |
| password_input = gr.Textbox( | |
| label="密码", | |
| type="password", | |
| placeholder="请输入管理员密码" | |
| ) | |
| login_btn = gr.Button("🔓 登录", variant="primary") | |
| login_status = gr.Markdown("") | |
| with gr.Column(visible=False) as admin_content: | |
| gr.Markdown("# 📊 管理员 - 查看结果") | |
| gr.Markdown("查看所有用户提交的评分数据,并下载原始 JSON 文件") | |
| def load_results(): | |
| if os.path.exists("ratings_data.json"): | |
| with open("ratings_data.json", "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| # 格式化显示 | |
| formatted_text = f"📊 总共收集了 {len(data)} 条评分记录\n\n" | |
| formatted_text += "=" * 80 + "\n\n" | |
| for i, entry in enumerate(data, 1): | |
| formatted_text += f"【记录 {i}】\n" | |
| formatted_text += f"时间: {entry['timestamp']}\n" | |
| formatted_text += f"场景: {entry['scene']}\n" | |
| formatted_text += f"评分:\n" | |
| for method, ratings in entry['ratings'].items(): | |
| formatted_text += f" • {method}:\n" | |
| formatted_text += f" - 动态质量: {ratings['dynamic_quality']}\n" | |
| formatted_text += f" - 静态一致性: {ratings['static_consistency']}\n" | |
| formatted_text += f" - 整体质量: {ratings['overall_quality']}\n" | |
| formatted_text += "\n" + "-" * 80 + "\n\n" | |
| return formatted_text, "ratings_data.json" | |
| else: | |
| return "📭 暂无数据\n\n还没有用户提交评分。", None | |
| def get_statistics(): | |
| if not os.path.exists("ratings_data.json"): | |
| return "📭 暂无数据", None, None | |
| with open("ratings_data.json", "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| if not data: | |
| return "📭 暂无数据", None, None | |
| # 统计每个方法的平均分 | |
| method_stats = {} | |
| scene_stats = {} | |
| for entry in data: | |
| scene = entry['scene'] | |
| if scene not in scene_stats: | |
| scene_stats[scene] = {} | |
| for method, ratings in entry['ratings'].items(): | |
| if method not in method_stats: | |
| method_stats[method] = { | |
| 'dynamic': [], | |
| 'static': [], | |
| 'overall': [] | |
| } | |
| method_stats[method]['dynamic'].append(ratings['dynamic_quality']) | |
| method_stats[method]['static'].append(ratings['static_consistency']) | |
| method_stats[method]['overall'].append(ratings['overall_quality']) | |
| # 按场景统计 | |
| if method not in scene_stats[scene]: | |
| scene_stats[scene][method] = { | |
| 'dynamic': [], | |
| 'static': [], | |
| 'overall': [] | |
| } | |
| scene_stats[scene][method]['dynamic'].append(ratings['dynamic_quality']) | |
| scene_stats[scene][method]['static'].append(ratings['static_consistency']) | |
| scene_stats[scene][method]['overall'].append(ratings['overall_quality']) | |
| # 计算平均值和提交人数 | |
| num_submissions = len(data) | |
| stats_text = f"📈 统计分析\n\n" | |
| stats_text += f"👥 **提交人数**: {num_submissions}\n\n" | |
| stats_text += "=" * 80 + "\n\n" | |
| stats_text += "【总体统计 - 按方法】\n\n" | |
| for method, scores in sorted(method_stats.items()): | |
| avg_dynamic = sum(scores['dynamic']) / len(scores['dynamic']) | |
| avg_static = sum(scores['static']) / len(scores['static']) | |
| avg_overall = sum(scores['overall']) / len(scores['overall']) | |
| avg_total = (avg_dynamic + avg_static + avg_overall) / 3 | |
| stats_text += f"🎬 {method}\n" | |
| stats_text += f" 样本数: {len(scores['dynamic'])} (来自 {num_submissions} 人)\n" | |
| stats_text += f" 动态质量: {avg_dynamic:.2f}\n" | |
| stats_text += f" 静态一致性: {avg_static:.2f}\n" | |
| stats_text += f" 整体质量: {avg_overall:.2f}\n" | |
| stats_text += f" 综合平均: {avg_total:.2f}\n" | |
| stats_text += "\n" | |
| # 生成CSV文件 - 总体统计 | |
| csv_summary_path = "ratings_summary.csv" | |
| with open(csv_summary_path, 'w', newline='', encoding='utf-8-sig') as f: | |
| writer = csv.writer(f) | |
| writer.writerow(['方法', '样本数', '动态质量均分', '静态一致性均分', '整体质量均分', '综合均分']) | |
| for method, scores in sorted(method_stats.items()): | |
| avg_dynamic = sum(scores['dynamic']) / len(scores['dynamic']) | |
| avg_static = sum(scores['static']) / len(scores['static']) | |
| avg_overall = sum(scores['overall']) / len(scores['overall']) | |
| avg_total = (avg_dynamic + avg_static + avg_overall) / 3 | |
| writer.writerow([ | |
| method, | |
| len(scores['dynamic']), | |
| f"{avg_dynamic:.2f}", | |
| f"{avg_static:.2f}", | |
| f"{avg_overall:.2f}", | |
| f"{avg_total:.2f}" | |
| ]) | |
| # 生成CSV文件 - 详细数据 | |
| csv_detail_path = "ratings_detail.csv" | |
| with open(csv_detail_path, 'w', newline='', encoding='utf-8-sig') as f: | |
| writer = csv.writer(f) | |
| writer.writerow(['时间戳', '场景', '方法', '动态质量', '静态一致性', '整体质量']) | |
| for entry in data: | |
| timestamp = entry['timestamp'] | |
| scene = entry['scene'] | |
| for method, ratings in entry['ratings'].items(): | |
| writer.writerow([ | |
| timestamp, | |
| scene, | |
| method, | |
| ratings['dynamic_quality'], | |
| ratings['static_consistency'], | |
| ratings['overall_quality'] | |
| ]) | |
| return stats_text, csv_summary_path, csv_detail_path | |
| with gr.Tabs(): | |
| with gr.Tab("📊 统计分析"): | |
| with gr.Row(): | |
| stats_refresh_btn = gr.Button("🔄 刷新统计", variant="primary") | |
| clear_data_btn = gr.Button("🗑️ 清空所有数据", variant="stop") | |
| stats_display = gr.Textbox( | |
| label="统计结果", | |
| lines=25, | |
| max_lines=50, | |
| interactive=False | |
| ) | |
| with gr.Row(): | |
| csv_summary_file = gr.File(label="📥 下载汇总统计 CSV", interactive=False) | |
| csv_detail_file = gr.File(label="📥 下载详细数据 CSV", interactive=False) | |
| json_file = gr.File(label="📥 下载原始 JSON", interactive=False) | |
| gr.Markdown(""" | |
| **说明**: | |
| - **汇总统计 CSV**:每个方法的平均分和样本数 | |
| - **详细数据 CSV**:所有原始评分记录,可用于进一步分析 | |
| - **原始 JSON**:完整的原始数据 | |
| - **清空数据**:删除所有评分记录(谨慎操作!) | |
| """) | |
| # 清空数据函数 | |
| def clear_all_data(): | |
| try: | |
| if os.path.exists("ratings_data.json"): | |
| os.remove("ratings_data.json") | |
| return "✅ 所有数据已清空", None, None, None | |
| except Exception as e: | |
| return f"❌ 清空失败: {str(e)}", None, None, None | |
| # 获取统计和文件 | |
| def get_stats_and_files(): | |
| stats, csv_summary, csv_detail = get_statistics() | |
| json_path = "ratings_data.json" if os.path.exists("ratings_data.json") else None | |
| return stats, csv_summary, csv_detail, json_path | |
| stats_refresh_btn.click( | |
| get_stats_and_files, | |
| outputs=[stats_display, csv_summary_file, csv_detail_file, json_file] | |
| ) | |
| clear_data_btn.click( | |
| clear_all_data, | |
| outputs=[stats_display, csv_summary_file, csv_detail_file, json_file] | |
| ) | |
| # 登录验证 | |
| def check_password(password): | |
| if password == ADMIN_PASSWORD: | |
| stats, csv_summary, csv_detail = get_statistics() | |
| json_path = "ratings_data.json" if os.path.exists("ratings_data.json") else None | |
| return ( | |
| gr.Column(visible=False), # 隐藏登录框 | |
| gr.Column(visible=True), # 显示管理内容 | |
| "✅ 登录成功!", | |
| stats, | |
| csv_summary, | |
| csv_detail, | |
| json_path | |
| ) | |
| else: | |
| return ( | |
| gr.Column(visible=True), # 保持登录框显示 | |
| gr.Column(visible=False), # 隐藏管理内容 | |
| "❌ 密码错误,请重试", | |
| "", None, None, None | |
| ) | |
| login_btn.click( | |
| check_password, | |
| inputs=[password_input], | |
| outputs=[login_box, admin_content, login_status, stats_display, csv_summary_file, csv_detail_file, json_file] | |
| ) | |
| # 使用 TabbedInterface 组合两个界面 | |
| combined_app = gr.TabbedInterface( | |
| [app, admin_app], | |
| ["📝 用户问卷 / Survey", "🔐 管理员 / Admin"] | |
| ) | |
| combined_app.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| allowed_paths=[os.path.expanduser("~/.cache/huggingface")] | |
| ) | |
| else: | |
| print("应用初始化失败 / App initialization failed") | |