userstudy / app.py
Wenjiawang0312
store
bce9ffb
import gradio as gr
import os
import json
from datetime import datetime
from huggingface_hub import hf_hub_download
import random
import csv
# 全局变量存储数据集
VIDEO_DATA = None
REPO_ID = "WenjiaWang/videoforuser"
# 从Hugging Face dataset加载视频
def load_videos_from_huggingface():
global VIDEO_DATA
try:
print("正在加载数据集: WenjiaWang/videoforuser...")
# 定义视频文件列表(手动指定,避免列出所有文件)
# 根据你的数据集结构,这里列出所有场景和方法
scenes_methods = {
"0O_YyxcC-kA_scene-35_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"],
"3IC8kkbZbsk_scene-61_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"],
"3JZFI0yKr6Y_scene-940_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"],
"9Ixk_PSj__Q_scene-92_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"],
"CEPKsNuSDiQ_scene-31_1": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"],
"CEPKsNuSDiQ_scene-50_1": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"],
"F4bBJG9UKL0_scene-35_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"],
"I3EG5SDZn1s_scene-101_2": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"],
"RTaUgpxLBag_scene-284_1": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"],
"c4OnR033eMk_scene-158_3": ["gen3c", "svc", "trajattn", "stage12_new_epoch-9"],
}
# 组织视频数据:按场景分组
VIDEO_DATA = {}
for scene_name, methods in scenes_methods.items():
VIDEO_DATA[scene_name] = {}
for method_name in methods:
video_filename = f"{method_name}.mp4"
video_path = f"{scene_name}/{video_filename}"
try:
# 下载视频文件到本地缓存
print(f"下载视频: {video_path}")
local_path = hf_hub_download(
repo_id=REPO_ID,
filename=video_path,
repo_type="dataset"
)
# 存储视频信息
VIDEO_DATA[scene_name][method_name] = {
'path': video_path,
'local_path': local_path
}
print(f"✓ 已下载: {video_path} -> {local_path}")
except Exception as e:
print(f"✗ 下载失败: {video_path} - {e}")
continue
print(f"组织完成,共 {len(VIDEO_DATA)} 个场景")
return True
except Exception as e:
print(f"加载数据集失败: {e}")
import traceback
traceback.print_exc()
return False
# 获取所有场景列表
def get_question_folders():
if VIDEO_DATA is None:
success = load_videos_from_huggingface()
if not success:
return []
return sorted(list(VIDEO_DATA.keys()))
# 获取某个场景的所有视频
def get_videos_for_question(scene_name):
if VIDEO_DATA is None or scene_name not in VIDEO_DATA:
return {}, {}
scene_videos = VIDEO_DATA[scene_name]
# 创建方法名到真实名称的映射
method_names = list(scene_videos.keys())
# 随机打乱顺序以匿名化
shuffled_methods = method_names.copy()
random.shuffle(shuffled_methods)
videos = {}
method_mapping = {}
for i, method_name in enumerate(shuffled_methods):
display_name = f"Method {chr(65+i)}" # Method A, B, C, D
# 获取视频数据
video_info = scene_videos[method_name]
# 使用本地路径
videos[display_name] = video_info['local_path']
method_mapping[display_name] = method_name
return videos, method_mapping
# 保存评分数据
def save_ratings(scene_name, ratings_data, method_mapping):
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# 将显示名称映射到真实方法名
mapped_ratings = {}
for display_name, ratings in ratings_data.items():
real_method = method_mapping.get(display_name, display_name)
mapped_ratings[real_method] = ratings
# 读取现有数据
all_data = []
if os.path.exists("ratings_data.json"):
try:
with open("ratings_data.json", "r", encoding="utf-8") as f:
all_data = json.load(f)
except:
all_data = []
# 添加新数据
entry = {
"timestamp": timestamp,
"scene": scene_name,
"ratings": mapped_ratings
}
all_data.append(entry)
# 保存数据
with open("ratings_data.json", "w", encoding="utf-8") as f:
json.dump(all_data, f, ensure_ascii=False, indent=2)
return f"✓ 评分已保存 / Ratings saved"
# 创建Gradio界面
def create_video_survey_app():
# 预加载数据集
print("初始化应用...")
load_videos_from_huggingface()
question_folders = get_question_folders()
if not question_folders:
print("错误: 没有找到任何场景数据")
return None
print(f"找到 {len(question_folders)} 个场景")
with gr.Blocks(title="视频生成质量用户研究", theme=gr.themes.Soft()) as demo:
# 问卷内容区域
with gr.Column(visible=True) as survey_area:
gr.Markdown("# 🎬 视频生成质量用户研究 / Video Generation Quality User Study")
gr.Markdown("""
### 说明 / Instructions:
- 请观看每个视频并进行评分 / Please watch each video and rate them
- **💡 提示:可以点击视频右下角全屏按钮观看清晰细节** / Tip: Click fullscreen button to view details
- 评分标准 / Rating criteria:
- **动态生成质量** / Dynamic Generation Quality: 视频中物体运动的流畅性和真实性
- **静态一致性** / Static Consistency: 视频中静态物体的稳定性和一致性
- **整体质量** / Overall Quality: 视频的整体观感
- 评分范围:1-5分(5分最好)/ Rating scale: 1-5 (5 = Best)
- 完成所有评分后,点击底部的"提交所有评分"按钮 / After rating all videos, click "Submit All Ratings" at the bottom
""")
# 存储所有评分组件
all_scene_components = []
# 为每个场景创建界面
for scene_idx, scene_name in enumerate(question_folders):
videos, method_mapping = get_videos_for_question(scene_name)
with gr.Group():
gr.Markdown(f"## 场景 {scene_idx + 1} / Scene {scene_idx + 1}: `{scene_name}`")
# 4个视频横向排列
with gr.Row(equal_height=True, variant="compact"):
scene_ratings = []
for i in range(4):
method_name = f"Method {chr(65+i)}"
with gr.Column(scale=1, min_width=0):
gr.Markdown(f"### 🎥 {method_name}")
# 视频 - 使用 autoplay=False 确保一致性
if method_name in videos:
video = gr.Video(
value=videos[method_name],
label="",
height=400,
autoplay=False,
show_label=False
)
else:
video = gr.Video(
value=None,
label="",
height=400,
visible=False,
show_label=False
)
# 三个评分滑块(垂直排列)
dynamic = gr.Slider(
minimum=1, maximum=5, step=1, value=3,
label="动态质量 / Dynamic"
)
static = gr.Slider(
minimum=1, maximum=5, step=1, value=3,
label="静态一致性 / Static"
)
overall = gr.Slider(
minimum=1, maximum=5, step=1, value=3,
label="整体质量 / Overall"
)
scene_ratings.append({
"method": method_name,
"dynamic": dynamic,
"static": static,
"overall": overall
})
all_scene_components.append({
"scene_name": scene_name,
"method_mapping": method_mapping,
"ratings": scene_ratings
})
# 提交按钮
gr.Markdown("---")
submit_btn = gr.Button("📤 提交所有评分 / Submit All Ratings", variant="primary", size="lg")
# 感谢页面(初始隐藏)
with gr.Column(visible=False) as thank_you_page:
gr.Markdown("""
# ✅ 提交成功!/ Submission Successful!
## 🎉 感谢您的参与!/ Thank you for participating!
您的评分已成功保存。您的反馈对我们非常重要!
Your ratings have been saved successfully. Your feedback is very important to us!
---
您现在可以关闭此页面。
You may now close this page.
""")
status_output = gr.Markdown("")
# 提交函数
def submit_all_ratings(*all_rating_values):
try:
# 解析所有评分
value_idx = 0
for scene_data in all_scene_components:
scene_name = scene_data["scene_name"]
method_mapping = scene_data["method_mapping"]
ratings = {}
for rating_comp in scene_data["ratings"]:
method_name = rating_comp["method"]
ratings[method_name] = {
"dynamic_quality": all_rating_values[value_idx],
"static_consistency": all_rating_values[value_idx + 1],
"overall_quality": all_rating_values[value_idx + 2]
}
value_idx += 3
# 保存评分
save_ratings(scene_name, ratings, method_mapping)
# 隐藏问卷,显示感谢页面
return gr.update(visible=False), gr.update(visible=True), ""
except Exception as e:
return gr.update(visible=True), gr.update(visible=False), f"❌ **保存失败** / Save failed: {str(e)}"
# 收集所有评分输入
all_rating_inputs = []
for scene_data in all_scene_components:
for rating_comp in scene_data["ratings"]:
all_rating_inputs.extend([
rating_comp["dynamic"],
rating_comp["static"],
rating_comp["overall"]
])
# 绑定提交按钮
submit_btn.click(
submit_all_ratings,
inputs=all_rating_inputs,
outputs=[survey_area, thank_you_page, status_output]
)
return demo
if __name__ == "__main__":
app = create_video_survey_app()
if app:
# 添加密码保护的管理员结果查看和分析页面
with gr.Blocks() as admin_app:
gr.Markdown("# 🔐 管理员登录")
gr.Markdown("请输入管理员密码查看结果")
# 管理员密码(从环境变量读取,保护隐私)
# 在 Hugging Face Space 设置中添加 Secret: ADMIN_PASSWORD
ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin123") # 默认密码仅用于本地测试
with gr.Column(visible=True) as login_box:
password_input = gr.Textbox(
label="密码",
type="password",
placeholder="请输入管理员密码"
)
login_btn = gr.Button("🔓 登录", variant="primary")
login_status = gr.Markdown("")
with gr.Column(visible=False) as admin_content:
gr.Markdown("# 📊 管理员 - 查看结果")
gr.Markdown("查看所有用户提交的评分数据,并下载原始 JSON 文件")
def load_results():
if os.path.exists("ratings_data.json"):
with open("ratings_data.json", "r", encoding="utf-8") as f:
data = json.load(f)
# 格式化显示
formatted_text = f"📊 总共收集了 {len(data)} 条评分记录\n\n"
formatted_text += "=" * 80 + "\n\n"
for i, entry in enumerate(data, 1):
formatted_text += f"【记录 {i}】\n"
formatted_text += f"时间: {entry['timestamp']}\n"
formatted_text += f"场景: {entry['scene']}\n"
formatted_text += f"评分:\n"
for method, ratings in entry['ratings'].items():
formatted_text += f" • {method}:\n"
formatted_text += f" - 动态质量: {ratings['dynamic_quality']}\n"
formatted_text += f" - 静态一致性: {ratings['static_consistency']}\n"
formatted_text += f" - 整体质量: {ratings['overall_quality']}\n"
formatted_text += "\n" + "-" * 80 + "\n\n"
return formatted_text, "ratings_data.json"
else:
return "📭 暂无数据\n\n还没有用户提交评分。", None
def get_statistics():
if not os.path.exists("ratings_data.json"):
return "📭 暂无数据", None, None
with open("ratings_data.json", "r", encoding="utf-8") as f:
data = json.load(f)
if not data:
return "📭 暂无数据", None, None
# 统计每个方法的平均分
method_stats = {}
scene_stats = {}
for entry in data:
scene = entry['scene']
if scene not in scene_stats:
scene_stats[scene] = {}
for method, ratings in entry['ratings'].items():
if method not in method_stats:
method_stats[method] = {
'dynamic': [],
'static': [],
'overall': []
}
method_stats[method]['dynamic'].append(ratings['dynamic_quality'])
method_stats[method]['static'].append(ratings['static_consistency'])
method_stats[method]['overall'].append(ratings['overall_quality'])
# 按场景统计
if method not in scene_stats[scene]:
scene_stats[scene][method] = {
'dynamic': [],
'static': [],
'overall': []
}
scene_stats[scene][method]['dynamic'].append(ratings['dynamic_quality'])
scene_stats[scene][method]['static'].append(ratings['static_consistency'])
scene_stats[scene][method]['overall'].append(ratings['overall_quality'])
# 计算平均值和提交人数
num_submissions = len(data)
stats_text = f"📈 统计分析\n\n"
stats_text += f"👥 **提交人数**: {num_submissions}\n\n"
stats_text += "=" * 80 + "\n\n"
stats_text += "【总体统计 - 按方法】\n\n"
for method, scores in sorted(method_stats.items()):
avg_dynamic = sum(scores['dynamic']) / len(scores['dynamic'])
avg_static = sum(scores['static']) / len(scores['static'])
avg_overall = sum(scores['overall']) / len(scores['overall'])
avg_total = (avg_dynamic + avg_static + avg_overall) / 3
stats_text += f"🎬 {method}\n"
stats_text += f" 样本数: {len(scores['dynamic'])} (来自 {num_submissions} 人)\n"
stats_text += f" 动态质量: {avg_dynamic:.2f}\n"
stats_text += f" 静态一致性: {avg_static:.2f}\n"
stats_text += f" 整体质量: {avg_overall:.2f}\n"
stats_text += f" 综合平均: {avg_total:.2f}\n"
stats_text += "\n"
# 生成CSV文件 - 总体统计
csv_summary_path = "ratings_summary.csv"
with open(csv_summary_path, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.writer(f)
writer.writerow(['方法', '样本数', '动态质量均分', '静态一致性均分', '整体质量均分', '综合均分'])
for method, scores in sorted(method_stats.items()):
avg_dynamic = sum(scores['dynamic']) / len(scores['dynamic'])
avg_static = sum(scores['static']) / len(scores['static'])
avg_overall = sum(scores['overall']) / len(scores['overall'])
avg_total = (avg_dynamic + avg_static + avg_overall) / 3
writer.writerow([
method,
len(scores['dynamic']),
f"{avg_dynamic:.2f}",
f"{avg_static:.2f}",
f"{avg_overall:.2f}",
f"{avg_total:.2f}"
])
# 生成CSV文件 - 详细数据
csv_detail_path = "ratings_detail.csv"
with open(csv_detail_path, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.writer(f)
writer.writerow(['时间戳', '场景', '方法', '动态质量', '静态一致性', '整体质量'])
for entry in data:
timestamp = entry['timestamp']
scene = entry['scene']
for method, ratings in entry['ratings'].items():
writer.writerow([
timestamp,
scene,
method,
ratings['dynamic_quality'],
ratings['static_consistency'],
ratings['overall_quality']
])
return stats_text, csv_summary_path, csv_detail_path
with gr.Tabs():
with gr.Tab("📊 统计分析"):
with gr.Row():
stats_refresh_btn = gr.Button("🔄 刷新统计", variant="primary")
clear_data_btn = gr.Button("🗑️ 清空所有数据", variant="stop")
stats_display = gr.Textbox(
label="统计结果",
lines=25,
max_lines=50,
interactive=False
)
with gr.Row():
csv_summary_file = gr.File(label="📥 下载汇总统计 CSV", interactive=False)
csv_detail_file = gr.File(label="📥 下载详细数据 CSV", interactive=False)
json_file = gr.File(label="📥 下载原始 JSON", interactive=False)
gr.Markdown("""
**说明**:
- **汇总统计 CSV**:每个方法的平均分和样本数
- **详细数据 CSV**:所有原始评分记录,可用于进一步分析
- **原始 JSON**:完整的原始数据
- **清空数据**:删除所有评分记录(谨慎操作!)
""")
# 清空数据函数
def clear_all_data():
try:
if os.path.exists("ratings_data.json"):
os.remove("ratings_data.json")
return "✅ 所有数据已清空", None, None, None
except Exception as e:
return f"❌ 清空失败: {str(e)}", None, None, None
# 获取统计和文件
def get_stats_and_files():
stats, csv_summary, csv_detail = get_statistics()
json_path = "ratings_data.json" if os.path.exists("ratings_data.json") else None
return stats, csv_summary, csv_detail, json_path
stats_refresh_btn.click(
get_stats_and_files,
outputs=[stats_display, csv_summary_file, csv_detail_file, json_file]
)
clear_data_btn.click(
clear_all_data,
outputs=[stats_display, csv_summary_file, csv_detail_file, json_file]
)
# 登录验证
def check_password(password):
if password == ADMIN_PASSWORD:
stats, csv_summary, csv_detail = get_statistics()
json_path = "ratings_data.json" if os.path.exists("ratings_data.json") else None
return (
gr.Column(visible=False), # 隐藏登录框
gr.Column(visible=True), # 显示管理内容
"✅ 登录成功!",
stats,
csv_summary,
csv_detail,
json_path
)
else:
return (
gr.Column(visible=True), # 保持登录框显示
gr.Column(visible=False), # 隐藏管理内容
"❌ 密码错误,请重试",
"", None, None, None
)
login_btn.click(
check_password,
inputs=[password_input],
outputs=[login_box, admin_content, login_status, stats_display, csv_summary_file, csv_detail_file, json_file]
)
# 使用 TabbedInterface 组合两个界面
combined_app = gr.TabbedInterface(
[app, admin_app],
["📝 用户问卷 / Survey", "🔐 管理员 / Admin"]
)
combined_app.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
allowed_paths=[os.path.expanduser("~/.cache/huggingface")]
)
else:
print("应用初始化失败 / App initialization failed")