File size: 54,183 Bytes
e0a5cab a4a89bf e1d6d08 a4a89bf e1d6d08 a4a89bf e1d6d08 a4a89bf e1d6d08 a4a89bf e1d6d08 a4a89bf e1d6d08 a4a89bf e1d6d08 a4a89bf e1d6d08 a4a89bf e1d6d08 a4a89bf e1d6d08 a4a89bf e1d6d08 a4a89bf e1d6d08 9976f8b e1d6d08 9976f8b e1d6d08 9976f8b e1d6d08 9976f8b fb9f14e 9976f8b 03b7af6 9976f8b e1d6d08 9976f8b e1d6d08 9976f8b e1d6d08 9976f8b e1d6d08 9976f8b e1d6d08 9976f8b e1d6d08 9976f8b e1d6d08 9976f8b e1d6d08 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 |
import gradio as gr
import os
import random
import time
from datetime import datetime
from functools import partial
import json
import io
from huggingface_hub import HfApi
from huggingface_hub.hf_api import HfHubHTTPError
import traceback
from itertools import combinations, product
# ==== 全局配置 ====
# ---- 测试模式开关 ----
REPEAT_SINGLE_TARGET_FOR_TESTING = False # 设置为 True 以启用“重复单一目标图”测试模式
NUM_REPEATED_TRIALS_FOR_TESTING = 5 # 在该测试模式下,单个目标图片重复的次数 (原为20,改为5方便测试)
# ---- 常规配置 ----
BASE_IMAGE_DIR = "/data/images/images"
TARGET_DIR_BASENAME = "gt"
TARGET_DIR = os.path.join(BASE_IMAGE_DIR, TARGET_DIR_BASENAME)
METHOD_ROOTS = []
if os.path.exists(BASE_IMAGE_DIR):
try:
METHOD_ROOTS = [
os.path.join(BASE_IMAGE_DIR, d)
for d in os.listdir(BASE_IMAGE_DIR)
if os.path.isdir(os.path.join(BASE_IMAGE_DIR, d)) and \
d != TARGET_DIR_BASENAME and \
not d.startswith('.')
]
if not METHOD_ROOTS: print(f"警告:在 '{BASE_IMAGE_DIR}' 中没有找到有效的方法目录 (除了 '{TARGET_DIR_BASENAME}')。")
else: print(f"已识别的方法根目录 (原始): {METHOD_ROOTS}")
except Exception as e: print(f"错误:在扫描 '{BASE_IMAGE_DIR}' 时发生错误: {e}"); METHOD_ROOTS = []
else: print(f"警告:基础目录 '{BASE_IMAGE_DIR}' 不存在。将无法加载候选图片。")
SUBJECTS = ["subj01", "subj02", "subj05", "subj07"] # 修正了 "subj05," 的拼写
SENTINEL_TRIAL_INTERVAL = 20
NUM_TRIALS_PER_RUN = 100 # 正常运行时的每轮试验数
LOG_BATCH_SIZE = 20
DATASET_REPO_ID = "YanmHa/image-aligned-experiment-data"
BATCH_LOG_FOLDER = "run_logs_batch"
CSS = ".gr-block {margin-top: 4px !important; margin-bottom: 4px !important;} .compact_button { padding: 4px 8px; min-width: auto; }"
# ---- 测试模式的列表缩减逻辑 (仅当 REPEAT_SINGLE_TARGET_FOR_TESTING 为 True 时生效) ----
if REPEAT_SINGLE_TARGET_FOR_TESTING:
print(f"--- 特殊测试模式 (重复单一目标图) 已激活 ---")
NUM_TRIALS_PER_RUN = NUM_REPEATED_TRIALS_FOR_TESTING # 确保UI显示的数字和实际测试一致
print(f"测试模式:NUM_TRIALS_PER_RUN 已被设置为: {NUM_TRIALS_PER_RUN}")
if METHOD_ROOTS:
original_method_roots_count = len(METHOD_ROOTS)
METHOD_ROOTS = [METHOD_ROOTS[0]]
print(f"测试模式:METHOD_ROOTS 已从 {original_method_roots_count} 个缩减为仅包含第一个方法: {METHOD_ROOTS}")
else:
print("测试模式警告:METHOD_ROOTS 为空,无法缩减。")
if len(METHOD_ROOTS) == 1: # 只有一种方法时,确保至少有两个subject以形成对比
if len(SUBJECTS) >= 2:
original_subjects_count = len(SUBJECTS)
SUBJECTS = [SUBJECTS[0], SUBJECTS[1]]
print(f"测试模式:由于方法仅1种,SUBJECTS 已从 {original_subjects_count} 个缩减为前两个: {SUBJECTS}")
elif SUBJECTS:
print(f"测试模式:SUBJECTS 只有一个元素 ({SUBJECTS}),方法也只有一种。注意:可能无法形成候选对。")
else:
print("测试模式警告:SUBJECTS 为空。")
print(f"--- 特殊测试模式配置结束 ---")
else:
print(f"正常模式:使用完整配置。每轮目标试验数: {NUM_TRIALS_PER_RUN}")
print(f"方法根目录: {METHOD_ROOTS}")
print(f"Subjects: {SUBJECTS}")
# ==== 全局持久化历史记录 ====
PERSISTENT_STORAGE_BASE = "/data"
DATA_SUBDIR_NAME = "my_user_study_persistent_history"
if not os.path.exists(PERSISTENT_STORAGE_BASE):
try:
os.makedirs(PERSISTENT_STORAGE_BASE, exist_ok=True)
print(f"信息:基础持久化目录 '{PERSISTENT_STORAGE_BASE}' 尝试确保其存在。")
except Exception as e:
print(f"警告:操作基础持久化目录 '{PERSISTENT_STORAGE_BASE}' 时出现问题: {e}。")
full_subdir_path = os.path.join(PERSISTENT_STORAGE_BASE, DATA_SUBDIR_NAME)
if not os.path.exists(full_subdir_path):
try:
os.makedirs(full_subdir_path)
print(f"成功创建持久化子目录: {full_subdir_path}")
except Exception as e:
print(f"错误:创建持久化子目录 '{full_subdir_path}' 失败: {e}")
else:
print(f"信息:持久化子目录 '{full_subdir_path}' 已存在。")
GLOBAL_HISTORY_FILE = os.path.join(full_subdir_path, "global_experiment_shown_pairs.json")
if not (os.path.isdir(full_subdir_path) and os.access(full_subdir_path, os.W_OK)):
print(f"严重警告:持久化子目录 '{full_subdir_path}' 无效或不可写。")
print(f"全局历史文件将被加载/保存到: {GLOBAL_HISTORY_FILE}")
global_shown_pairs_cache = {}
global_history_has_unsaved_changes = False
exhausted_target_images = set()
def load_global_shown_pairs():
global global_shown_pairs_cache, global_history_has_unsaved_changes, exhausted_target_images
exhausted_target_images = set()
if not GLOBAL_HISTORY_FILE or not os.path.exists(GLOBAL_HISTORY_FILE):
print(f"信息:全局历史文件 '{GLOBAL_HISTORY_FILE}' 未找到或路径无效。将创建新的空历史记录。")
global_shown_pairs_cache = {}
global_history_has_unsaved_changes = False
return
try:
with open(GLOBAL_HISTORY_FILE, 'r', encoding='utf-8') as f:
content = f.read()
if not content.strip():
print(f"信息:全局历史文件 '{GLOBAL_HISTORY_FILE}' 为空。将使用空历史记录。")
global_shown_pairs_cache = {}
else:
data_from_file = json.loads(content)
global_shown_pairs_cache = {
target_img: {frozenset(pair) for pair in pairs_list}
for target_img, pairs_list in data_from_file.items()
}
print(f"已成功从 '{GLOBAL_HISTORY_FILE}' 加载全局已展示图片对历史。")
except json.JSONDecodeError as jde:
print(f"错误:加载全局历史文件 '{GLOBAL_HISTORY_FILE}' 失败 (JSON解析错误: {jde})。文件内容可能已损坏。将使用空历史记录。")
global_shown_pairs_cache = {}
except Exception as e:
print(f"错误:加载全局历史文件 '{GLOBAL_HISTORY_FILE}' 时发生其他错误: {e}。将使用空历史记录。")
global_shown_pairs_cache = {}
global_history_has_unsaved_changes = False
def save_global_shown_pairs():
global global_shown_pairs_cache, global_history_has_unsaved_changes
if not GLOBAL_HISTORY_FILE:
print("错误:GLOBAL_HISTORY_FILE 未定义。无法保存历史。")
return False
final_save_path = os.path.abspath(GLOBAL_HISTORY_FILE)
try:
parent_dir = os.path.dirname(final_save_path)
if not os.path.exists(parent_dir):
try:
os.makedirs(parent_dir, exist_ok=True)
print(f"信息: 为保存历史文件,创建了父目录 {parent_dir}")
except Exception as e_mkdir:
print(f"错误: 创建历史文件的父目录 {parent_dir} 失败: {e_mkdir}。保存可能失败。")
return False
data_to_save = {
target_img: [sorted(list(pair_fset)) for pair_fset in pairs_set]
for target_img, pairs_set in global_shown_pairs_cache.items()
}
temp_file_path = final_save_path + ".tmp"
with open(temp_file_path, 'w', encoding='utf-8') as f:
json.dump(data_to_save, f, ensure_ascii=False, indent=2)
os.replace(temp_file_path, final_save_path)
print(f"已成功将全局已展示图片对历史保存到 '{final_save_path}'。")
global_history_has_unsaved_changes = False
return True
except Exception as e:
print(f"错误:保存全局历史文件 '{final_save_path}' 失败: {e}")
return False
load_global_shown_pairs()
# ==== 加载所有可用的目标图片 ====
master_image_list = []
if os.path.exists(TARGET_DIR):
try:
master_image_list = sorted(
[f for f in os.listdir(TARGET_DIR) if f.lower().endswith((".jpg", ".png", ".jpeg"))],
key=lambda x: int(os.path.splitext(x)[0])
)
except ValueError:
master_image_list = sorted([f for f in os.listdir(TARGET_DIR) if f.lower().endswith((".jpg", ".png", ".jpeg"))])
if master_image_list: print(f"警告: '{TARGET_DIR}' 文件名非纯数字,按字母排序。")
if not master_image_list: print(f"警告:在 '{TARGET_DIR}' 中无有效图片。")
elif not os.path.exists(TARGET_DIR) and os.path.exists(BASE_IMAGE_DIR): print(f"错误:目标目录 '{TARGET_DIR}' 未找到。")
# ---- 测试模式:缩减 master_image_list (仅当 REPEAT_SINGLE_TARGET_FOR_TESTING 为 True 时生效) ----
if REPEAT_SINGLE_TARGET_FOR_TESTING:
if not master_image_list:
print(f"测试模式错误:master_image_list 为空,无法进行重复单一目标图测试。")
else:
original_first_image = master_image_list[0]
master_image_list = [original_first_image]
print(f"测试模式:master_image_list 已被缩减为原列表的第一个图像: {master_image_list}")
if not master_image_list:
print(f"关键错误:无目标图片可用 (master_image_list为空)。实验无法进行。")
# ==== 辅助函数 ====
# #############################################################################
# ############# 函数修改点:get_next_trial_info ################################
# #############################################################################
# #############################################################################
# ############# 函数修改点:get_next_trial_info ################################
# #############################################################################
def get_next_trial_info(current_trial_idx_in_run, current_run_image_list_for_trial, num_trials_in_this_run_for_trial):
global TARGET_DIR, METHOD_ROOTS, SUBJECTS, SENTINEL_TRIAL_INTERVAL
global global_shown_pairs_cache, global_history_has_unsaved_changes, exhausted_target_images
if not current_run_image_list_for_trial or current_trial_idx_in_run >= num_trials_in_this_run_for_trial:
return None, current_trial_idx_in_run
img_filename_original = current_run_image_list_for_trial[current_trial_idx_in_run]
target_full_path = os.path.join(TARGET_DIR, img_filename_original)
trial_number_for_display = current_trial_idx_in_run + 1
# ---- MODIFICATION START: 创建两个用于特定方法对比的候选池 ----
pool_image_flited = []
pool_reconed_image_color = []
# 这个独立的池用于“哨兵试验”,它需要从“任何”方法中随机抽取一个候选图
combined_pool_for_sentinel = []
for m_root_path in METHOD_ROOTS:
method_name = os.path.basename(m_root_path)
subjects_for_method = SUBJECTS
if method_name.lower() == "takagi":
if "subj01" in SUBJECTS:
subjects_for_method = ["subj01"]
else:
continue
for s_id in subjects_for_method:
base, ext = os.path.splitext(img_filename_original)
reconstructed_filename = f"{base}_0{ext}"
candidate_path = os.path.join(m_root_path, s_id, reconstructed_filename)
if os.path.exists(candidate_path):
internal_label = f"{method_name}/{s_id}/{reconstructed_filename}"
candidate_tuple = (internal_label, candidate_path)
# 为常规试验,将候选图放入对应的特定池中
if method_name == "image_flited":
pool_image_flited.append(candidate_tuple)
elif method_name == "generated_images_color":
pool_reconed_image_color.append(candidate_tuple)
# 将“所有”有效的候选图都添加到哨兵池中
combined_pool_for_sentinel.append(candidate_tuple)
# ---- MODIFICATION END: 候选池已填充完毕 ----
trial_info = {"image_id": img_filename_original, "target_path": target_full_path, "cur_no": trial_number_for_display, "is_sentinel": False,
"left_display_label": "N/A", "left_internal_label": "N/A", "left_path": None,
"right_display_label": "N/A", "right_internal_label": "N/A", "right_path": None}
is_potential_sentinel_trial = (trial_number_for_display > 0 and trial_number_for_display % SENTINEL_TRIAL_INTERVAL == 0)
if is_potential_sentinel_trial:
# 对于哨兵试验,我们从包含所有方法候选图的池中随机选择一个
if not combined_pool_for_sentinel:
print(f"警告:哨兵图 '{img_filename_original}' (trial {trial_number_for_display}) 无任何候选图。")
else:
print(f"生成哨兵试验 for '{img_filename_original}' (trial {trial_number_for_display})")
trial_info["is_sentinel"] = True
sentinel_candidate_target_tuple = ("目标图像", target_full_path)
random_reconstruction_candidate_tuple = random.choice(combined_pool_for_sentinel)
candidates_for_sentinel = [
(("目标图像", target_full_path), sentinel_candidate_target_tuple[0]),
(("重建图", random_reconstruction_candidate_tuple[1]), random_reconstruction_candidate_tuple[0])
]
random.shuffle(candidates_for_sentinel)
trial_info.update({
"left_display_label": candidates_for_sentinel[0][0][0], "left_path": candidates_for_sentinel[0][0][1], "left_internal_label": candidates_for_sentinel[0][1],
"right_display_label": candidates_for_sentinel[1][0][0], "right_path": candidates_for_sentinel[1][0][1], "right_internal_label": candidates_for_sentinel[1][1],
})
else: # 常规试验
# ---- MODIFICATION START: 新的检查与配对逻辑 ----
# 检查两个特定的池是否都有候选图
if not pool_image_flited or not pool_reconed_image_color:
print(f"警告:常规图 '{img_filename_original}' (trial {trial_number_for_display}) 候选不足以形成 'image_flited' vs 'reconed_image_color' 对。 "
f"('image_flited' 找到 {len(pool_image_flited)} 个, "
f"'reconed_image_color' 找到 {len(pool_reconed_image_color)} 个)。此试验无法进行。")
return None, current_trial_idx_in_run
target_global_history_set = global_shown_pairs_cache.setdefault(img_filename_original, set())
# 从两个特定池中各选一个,生成所有可能的配对
all_possible_pairs_in_pool = []
for c_flited, c_reconed in product(pool_image_flited, pool_reconed_image_color):
pair_labels_fset = frozenset({c_flited[0], c_reconed[0]})
all_possible_pairs_in_pool.append( ((c_flited, c_reconed), pair_labels_fset) )
# ---- MODIFICATION END: 新的配对逻辑已完成 ----
unseen_globally_pairs_with_data = [
item for item in all_possible_pairs_in_pool if item[1] not in target_global_history_set
]
selected_candidates_tuples = None
if unseen_globally_pairs_with_data:
chosen_pair_data_and_labels = random.choice(unseen_globally_pairs_with_data)
selected_candidates_tuples = chosen_pair_data_and_labels[0]
chosen_pair_frozenset = chosen_pair_data_and_labels[1]
target_global_history_set.add(chosen_pair_frozenset)
global_history_has_unsaved_changes = True
else:
print(f"警告:目标图 '{img_filename_original}' (trial {trial_number_for_display}): 所有 ({len(all_possible_pairs_in_pool)}) 个 'image_flited' vs 'reconed_image_color' 对均已在全局展示过。")
if all_possible_pairs_in_pool:
print(f"目标图 '{img_filename_original}' 将被标记为已耗尽,未来轮次中将被跳过。")
exhausted_target_images.add(img_filename_original)
return None, current_trial_idx_in_run
display_order_candidates = list(selected_candidates_tuples)
if random.random() > 0.5:
display_order_candidates = display_order_candidates[::-1]
trial_info.update({
"left_display_label": "候选图 1", "left_path": display_order_candidates[0][1], "left_internal_label": display_order_candidates[0][0],
"right_display_label": "候选图 2", "right_path": display_order_candidates[1][1], "right_internal_label": display_order_candidates[1][0],
})
return trial_info, current_trial_idx_in_run + 1
# ==== 批量保存用户选择日志函数 (保持不变) ====
def save_single_log_to_hf_dataset(log_entry, user_identifier_str):
global DATASET_REPO_ID, INDIVIDUAL_LOGS_FOLDER
if not isinstance(log_entry, dict):
print(f"错误:单个日志条目不是字典格式,无法保存:{log_entry}")
return False
current_user_id = user_identifier_str if user_identifier_str else "unknown_user_session"
identifier_safe = str(current_user_id).replace('.', '_').replace(':', '_').replace('/', '_')
print(f"用户 {identifier_safe} - 准备保存单条日志 for image {log_entry.get('image_id', 'Unknown')}...")
try:
token = os.getenv("HF_TOKEN")
if not token:
print("错误:环境变量 HF_TOKEN 未设置。无法保存单条日志到Dataset。")
return False
if not DATASET_REPO_ID:
print("错误:DATASET_REPO_ID 未配置。无法保存单条日志到Dataset。")
return False
api = HfApi(token=token)
image_id_safe_for_filename = os.path.splitext(log_entry.get("image_id", "unknown_img"))[0].replace('.', '_').replace(':', '_').replace('/', '_')
file_creation_timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
unique_filename = (f"run{log_entry.get('run_no', 'X')}_trial{log_entry.get('trial_sequence_in_run', 'Y')}_img{image_id_safe_for_filename}_{file_creation_timestamp_str}.json")
path_in_repo = f"{INDIVIDUAL_LOGS_FOLDER}/{identifier_safe}/{unique_filename}"
try:
json_content = json.dumps(log_entry, ensure_ascii=False, indent=2)
except Exception as json_err:
print(f"错误:序列化单条日志时出错: {log_entry}. 错误: {json_err}")
error_log_content = {"error": "serialization_failed_single", "original_data_keys": list(log_entry.keys()) if isinstance(log_entry, dict) else None, "timestamp": datetime.now().isoformat()}
json_content = json.dumps(error_log_content, ensure_ascii=False, indent=2)
log_bytes = json_content.encode('utf-8')
file_like_object = io.BytesIO(log_bytes)
print(f"准备上传单条日志文件: {path_in_repo} ({len(log_bytes)} bytes)")
api.upload_file(
path_or_fileobj=file_like_object,
path_in_repo=path_in_repo,
repo_id=DATASET_REPO_ID,
repo_type="dataset",
commit_message=(f"Log choice: img {log_entry.get('image_id', 'N/A')}, run {log_entry.get('run_no', 'N/A')}, trial {log_entry.get('trial_sequence_in_run', 'N/A')} by {identifier_safe}")
)
print(f"单条日志已成功保存到 HF Dataset: {DATASET_REPO_ID}/{path_in_repo}")
return True
except HfHubHTTPError as hf_http_error:
print(f"保存单条日志到 Hugging Face Dataset 时发生 HTTP 错误 (可能被限流或权限问题): {hf_http_error}")
traceback.print_exc()
return False
except Exception as e:
print(f"保存单条日志 (image {log_entry.get('image_id', 'Unknown')}, user {identifier_safe}) 到 Hugging Face Dataset 时发生严重错误: {e}")
traceback.print_exc()
return False
# ==== 批量保存用户选择日志函数 (确保返回 True/False) ====
def save_collected_logs_batch(list_of_log_entries, user_identifier_str, batch_identifier):
global DATASET_REPO_ID, BATCH_LOG_FOLDER
if not list_of_log_entries:
print("批量保存用户日志:没有累积的日志。")
return True
identifier_safe = str(user_identifier_str if user_identifier_str else "unknown_user_session").replace('.', '_').replace(':', '_').replace('/', '_').replace(' ', '_')
print(f"用户 {identifier_safe} - 准备批量保存 {len(list_of_log_entries)} 条选择日志 (批次标识: {batch_identifier})...")
try:
token = os.getenv("HF_TOKEN")
if not token:
print("错误:HF_TOKEN 未设置。无法批量保存选择日志。")
return False
if not DATASET_REPO_ID:
print("错误:DATASET_REPO_ID 未配置。无法批量保存选择日志。")
return False
api = HfApi(token=token)
timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
batch_filename = f"batch_user-{identifier_safe}_id-{batch_identifier}_{timestamp_str}_logs-{len(list_of_log_entries)}.jsonl"
path_in_repo = f"{BATCH_LOG_FOLDER}/{identifier_safe}/{batch_filename}"
jsonl_content = ""
for log_entry in list_of_log_entries:
try:
if isinstance(log_entry, dict): jsonl_content += json.dumps(log_entry, ensure_ascii=False) + "\n"
else: print(f"警告:批量保存选择日志时,条目非字典:{log_entry}")
except Exception as json_err:
print(f"错误:批量保存选择日志序列化单条时出错: {log_entry}. 错误: {json_err}")
jsonl_content += json.dumps({"error": "serialization_failed_in_batch_user_log", "original_data_preview": str(log_entry)[:100],"timestamp": datetime.now().isoformat()}, ensure_ascii=False) + "\n"
if not jsonl_content.strip():
print(f"用户 {identifier_safe} (批次 {batch_identifier}) 无可序列化选择日志。")
return True
log_bytes = jsonl_content.encode('utf-8')
file_like_object = io.BytesIO(log_bytes)
print(f"准备批量上传选择日志文件: {path_in_repo} ({len(log_bytes)} bytes)")
api.upload_file(
path_or_fileobj=file_like_object,
path_in_repo=path_in_repo,
repo_id=DATASET_REPO_ID,
repo_type="dataset",
commit_message=f"Batch user choice logs for {identifier_safe}, batch_id {batch_identifier} ({len(list_of_log_entries)} entries)"
)
print(f"批量选择日志已成功保存到 HF Dataset: {DATASET_REPO_ID}/{path_in_repo}")
return True
except HfHubHTTPError as hf_http_error:
print(f"批量保存选择日志到 Hugging Face Dataset 时发生 HTTP 错误 (可能被限流或权限问题): {hf_http_error}")
traceback.print_exc()
return False
except Exception as e:
print(f"批量保存选择日志 (user {identifier_safe}, batch_id {batch_identifier}) 失败: {e}")
traceback.print_exc()
return False
# ==== 主要的 Gradio 事件处理函数 ====
def process_experiment_step(
s_trial_idx_val, s_run_no_val, s_user_logs_val, s_current_trial_data_val, s_user_session_id_val,
s_current_run_image_list_val, s_num_trials_this_run_val,
action_type=None, choice_value=None, request: gr.Request = None
):
global master_image_list, NUM_TRIALS_PER_RUN, outputs_ui_components_definition, LOG_BATCH_SIZE
global REPEAT_SINGLE_TARGET_FOR_TESTING, NUM_REPEATED_TRIALS_FOR_TESTING
global exhausted_target_images, global_history_has_unsaved_changes
output_s_trial_idx = s_trial_idx_val; output_s_run_no = s_run_no_val
output_s_user_logs = list(s_user_logs_val); output_s_current_trial_data = dict(s_current_trial_data_val) if s_current_trial_data_val else {}
output_s_user_session_id = s_user_session_id_val; output_s_current_run_image_list = list(s_current_run_image_list_val)
output_s_num_trials_this_run = s_num_trials_this_run_val
user_ip_fallback = request.client.host if request else "unknown_ip"
user_identifier_for_logging = output_s_user_session_id if output_s_user_session_id else user_ip_fallback
len_ui_outputs = len(outputs_ui_components_definition)
def create_ui_error_tuple(message, progress_msg_text, stop_experiment=False):
btn_start_interactive = not stop_experiment
btn_choices_interactive = not stop_experiment
return (gr.update(visible=False),) * 3 + \
("", "") + \
(message, progress_msg_text) + \
(gr.update(interactive=btn_start_interactive), gr.update(interactive=btn_choices_interactive), gr.update(interactive=btn_choices_interactive)) + \
(gr.update(visible=False),)
def create_no_change_tuple(): return (gr.update(),) * len_ui_outputs
user_id_display_text = output_s_user_session_id if output_s_user_session_id else "用户ID待分配"
if action_type == "record_choice":
if output_s_current_trial_data.get("data") and output_s_current_trial_data["data"].get("left_internal_label"):
chosen_internal_label = (output_s_current_trial_data["data"]["left_internal_label"] if choice_value == "left" else output_s_current_trial_data["data"]["right_internal_label"])
parsed_chosen_method, parsed_chosen_subject, parsed_chosen_filename = "N/A", "N/A", "N/A"
if chosen_internal_label == "目标图像": parsed_chosen_method, parsed_chosen_subject, parsed_chosen_filename = "TARGET", "GT", output_s_current_trial_data["data"]["image_id"]
else:
parts = chosen_internal_label.split('/');
if len(parts) == 3: parsed_chosen_method, parsed_chosen_subject, parsed_chosen_filename = parts[0].strip(), parts[1].strip(), parts[2].strip()
elif len(parts) == 2: parsed_chosen_method, parsed_chosen_subject = parts[0].strip(), parts[1].strip()
elif len(parts) == 1: parsed_chosen_method = parts[0].strip()
log_entry = {
"timestamp": datetime.now().isoformat(), "user_identifier": user_identifier_for_logging, "run_no": output_s_run_no,
"image_id": output_s_current_trial_data["data"]["image_id"],
"left_internal_label": output_s_current_trial_data["data"]["left_internal_label"],
"right_internal_label": output_s_current_trial_data["data"]["right_internal_label"],
"chosen_side": choice_value, "chosen_internal_label": chosen_internal_label,
"chosen_method": parsed_chosen_method, "chosen_subject": parsed_chosen_subject, "chosen_filename": parsed_chosen_filename,
"trial_sequence_in_run": output_s_current_trial_data["data"]["cur_no"],
"is_sentinel": output_s_current_trial_data["data"]["is_sentinel"]
}
output_s_user_logs.append(log_entry)
print(f"用户 {user_identifier_for_logging} 记录选择 (img: {log_entry['image_id']})。当前批次日志数: {len(output_s_user_logs)}")
if len(output_s_user_logs) >= LOG_BATCH_SIZE:
print(f"累积用户选择日志达到 {LOG_BATCH_SIZE} 条,准备批量保存...")
batch_id_for_filename = f"run{output_s_run_no}_trialidx{output_s_trial_idx}_logcount{len(output_s_user_logs)}"
user_logs_save_success = save_collected_logs_batch(list(output_s_user_logs), user_identifier_for_logging, batch_id_for_filename)
if user_logs_save_success:
print("批量用户选择日志已成功(或尝试)保存,将清空累积的用户选择日志列表。")
output_s_user_logs = []
else:
print("严重错误:批量用户选择日志保存失败。实验无法继续。")
error_message_ui = "错误:日志保存失败,可能是网络问题或API限流。实验已停止,请联系管理员。"
progress_message_ui = f"用户ID: {user_id_display_text} | 实验因错误停止在第 {output_s_run_no} 轮,试验 {output_s_trial_idx+1}"
error_ui_updates = create_ui_error_tuple(error_message_ui, progress_message_ui, stop_experiment=True)
return output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, \
output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *error_ui_updates
if global_history_has_unsaved_changes:
print("检测到全局图片对历史自上次保存后有更新,将一并保存...")
if not save_global_shown_pairs():
print("严重错误:全局图片对历史保存失败。实验无法继续。")
error_message_ui = "错误:全局历史数据保存失败。实验已停止,请联系管理员。"
progress_message_ui = f"用户ID: {user_id_display_text} | 实验因错误停止在第 {output_s_run_no} 轮,试验 {output_s_trial_idx+1}"
error_ui_updates = create_ui_error_tuple(error_message_ui, progress_message_ui, stop_experiment=True)
return output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, \
output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *error_ui_updates
else:
print(f"用户 {user_identifier_for_logging} 错误:记录选择时当前试验数据为空或缺少internal_label!")
error_ui_updates = create_ui_error_tuple("记录选择时内部错误。", f"用户ID: {user_id_display_text} | 进度:{output_s_trial_idx}/{output_s_num_trials_this_run}", stop_experiment=False)
return output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *error_ui_updates
if action_type == "start_experiment":
is_first = (output_s_num_trials_this_run == 0 and output_s_trial_idx == 0 and output_s_run_no == 1)
is_completed_for_restart = (output_s_num_trials_this_run > 0 and output_s_trial_idx >= output_s_num_trials_this_run)
if is_completed_for_restart:
if output_s_user_logs:
print(f"轮次 {output_s_run_no-1} 结束,尝试保存剩余的 {len(output_s_user_logs)} 条用户选择日志...")
batch_id_for_filename = f"run{output_s_run_no-1}_final_logcount{len(output_s_user_logs)}"
if not save_collected_logs_batch(list(output_s_user_logs), user_identifier_for_logging, batch_id_for_filename):
print("严重错误:保存上一轮剩余用户选择日志失败。实验无法继续。")
error_message_ui = "错误:日志保存失败。实验已停止,请联系管理员。"
progress_message_ui = f"用户ID: {user_id_display_text} | 实验因错误停止"
error_ui_updates = create_ui_error_tuple(error_message_ui, progress_message_ui, stop_experiment=True)
return output_s_trial_idx, output_s_run_no-1, output_s_user_logs, output_s_current_trial_data, \
output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *error_ui_updates
output_s_user_logs = []
if global_history_has_unsaved_changes:
print("轮次结束,尝试保存全局图片对历史...")
if not save_global_shown_pairs():
print("严重错误:全局历史数据保存失败。实验无法继续。")
error_message_ui = "错误:全局历史数据保存失败。实验已停止,请联系管理员。"
progress_message_ui = f"用户ID: {user_id_display_text} | 实验因错误停止"
error_ui_updates = create_ui_error_tuple(error_message_ui, progress_message_ui, stop_experiment=True)
return output_s_trial_idx, output_s_run_no-1, output_s_user_logs, output_s_current_trial_data, \
output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *error_ui_updates
if is_first or is_completed_for_restart:
if is_completed_for_restart: output_s_run_no += 1
available_master_images = [img for img in master_image_list if img not in exhausted_target_images]
print(f"开始轮次 {output_s_run_no}: 从 {len(master_image_list)}个总目标图片中筛选,可用图片 {len(available_master_images)}个 (已排除 {len(exhausted_target_images)}个已耗尽图片).")
if not available_master_images:
msg = "所有目标图片的所有唯一图片对均已展示完毕!感谢您的参与。"
prog_text = f"用户ID: {user_id_display_text} | 实验完成!"
if output_s_user_logs:
print(f"最终轮次结束,尝试保存剩余的 {len(output_s_user_logs)} 条用户选择日志...")
batch_id_for_filename = f"run{output_s_run_no-1}_final_logcount{len(output_s_user_logs)}"
save_collected_logs_batch(list(output_s_user_logs), user_identifier_for_logging, batch_id_for_filename)
output_s_user_logs = []
if global_history_has_unsaved_changes:
print("实验最终结束,尝试保存全局图片对历史...")
save_global_shown_pairs()
ui_updates = list(create_ui_error_tuple(msg, prog_text, stop_experiment=True))
return 0, output_s_run_no, [], {}, output_s_user_session_id, [], 0, *tuple(ui_updates)
if REPEAT_SINGLE_TARGET_FOR_TESTING and available_master_images:
print(f"测试模式 (重复单一目标图) 已激活。")
single_image_to_repeat = available_master_images[0]
output_s_current_run_image_list = [single_image_to_repeat] * NUM_REPEATED_TRIALS_FOR_TESTING
output_s_num_trials_this_run = NUM_REPEATED_TRIALS_FOR_TESTING
print(f"测试模式:本轮将重复目标图片 '{single_image_to_repeat}' 共 {output_s_num_trials_this_run} 次。")
else:
num_really_avail = len(available_master_images)
current_run_max_trials = NUM_TRIALS_PER_RUN
run_size = min(num_really_avail, current_run_max_trials)
if run_size == 0:
error_ui = create_ui_error_tuple("错误: 可用图片采样数为0!", f"用户ID: {user_id_display_text} | 进度: 0/0", stop_experiment=False)
return 0, output_s_run_no, output_s_user_logs, {}, output_s_user_session_id, [], 0, *error_ui
output_s_current_run_image_list = random.sample(available_master_images, run_size)
output_s_num_trials_this_run = run_size
output_s_trial_idx = 0
output_s_current_trial_data = {}
if is_first:
timestamp_str = datetime.now().strftime('%Y%m%d%H%M%S%f'); random_val = random.randint(10000, 99999)
if not output_s_user_session_id:
output_s_user_session_id = f"user_{timestamp_str}_{random_val}"; user_identifier_for_logging = output_s_user_session_id
else:
user_identifier_for_logging = output_s_user_session_id
print(f"用户会话ID: {output_s_user_session_id}")
print(f"开始/继续轮次 {output_s_run_no} (用户ID: {output_s_user_session_id}). 本轮共 {output_s_num_trials_this_run} 个试验。")
else:
print(f"用户 {user_identifier_for_logging} 在第 {output_s_run_no} 轮,试验 {output_s_trial_idx} 点击开始,但轮次未完成。忽略。")
no_change_ui = create_no_change_tuple()
return output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *no_change_ui
current_actual_trial_index_for_get_next = output_s_trial_idx
if current_actual_trial_index_for_get_next >= output_s_num_trials_this_run and output_s_num_trials_this_run > 0:
prog_text = f"用户ID: {output_s_user_session_id} | 进度:{output_s_num_trials_this_run}/{output_s_num_trials_this_run} | 第 {output_s_run_no} 轮 🎉"
ui_updates = list(create_ui_error_tuple(f"🎉 第 {output_s_run_no} 轮完成!请点击“开始试验 / 下一轮”继续。", prog_text, stop_experiment=False))
ui_updates[7]=gr.update(interactive=True); ui_updates[8]=gr.update(interactive=False); ui_updates[9]=gr.update(interactive=False)
ui_updates[0]=gr.update(value=None,visible=False); ui_updates[1]=gr.update(value=None,visible=False); ui_updates[2]=gr.update(value=None,visible=False)
yield output_s_trial_idx, output_s_run_no, output_s_user_logs, {"data": None}, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *ui_updates; return
if not output_s_current_run_image_list or output_s_num_trials_this_run == 0:
error_ui = create_ui_error_tuple("错误: 无法加载试验图片 (列表为空或试验数为0)", f"用户ID: {user_id_display_text} | 进度: N/A", stop_experiment=False)
return output_s_trial_idx, output_s_run_no, output_s_user_logs, {"data": None}, output_s_user_session_id, [], 0, *error_ui
trial_info = None
next_s_trial_idx_for_state_loop = current_actual_trial_index_for_get_next
while next_s_trial_idx_for_state_loop < output_s_num_trials_this_run:
current_target_image_for_trial = output_s_current_run_image_list[next_s_trial_idx_for_state_loop]
if current_target_image_for_trial in exhausted_target_images:
print(f"信息:目标图 '{current_target_image_for_trial}' 已在全局耗尽列表中,跳过此试验。")
next_s_trial_idx_for_state_loop += 1
output_s_trial_idx = next_s_trial_idx_for_state_loop
continue
_trial_info_candidate, _returned_next_idx = get_next_trial_info(next_s_trial_idx_for_state_loop, output_s_current_run_image_list, output_s_num_trials_this_run)
if _trial_info_candidate is not None:
trial_info = _trial_info_candidate
output_s_trial_idx = _returned_next_idx
break
else:
print(f"信息:目标图 '{current_target_image_for_trial}' 无法生成有效试验。尝试列表中的下一个。")
next_s_trial_idx_for_state_loop +=1
output_s_trial_idx = next_s_trial_idx_for_state_loop
if trial_info is None:
print(f"轮次 {output_s_run_no} 中没有更多可用的有效试验了。结束本轮。")
if output_s_user_logs:
print(f"轮次 {output_s_run_no} 无更多有效试验,尝试保存剩余 {len(output_s_user_logs)} 条日志...")
batch_id_for_filename = f"run{output_s_run_no}_no_more_trials_logcount{len(output_s_user_logs)}"
if not save_collected_logs_batch(list(output_s_user_logs), user_identifier_for_logging, batch_id_for_filename):
print("严重错误:保存剩余日志失败。实验可能需要停止。")
output_s_user_logs = []
if global_history_has_unsaved_changes:
print("轮次无更多有效试验,尝试保存全局图片对历史...")
if not save_global_shown_pairs():
print("严重错误:全局历史数据保存失败。实验可能需要停止。")
prog_text = f"用户ID: {output_s_user_session_id} | 进度:{output_s_num_trials_this_run}/{output_s_num_trials_this_run} | 第 {output_s_run_no} 轮 (无更多可用试验)"
ui_updates = list(create_ui_error_tuple(f"第 {output_s_run_no} 轮因无更多可用试验而结束。请点击“开始试验 / 下一轮”。", prog_text, stop_experiment=False))
ui_updates[7]=gr.update(interactive=True); ui_updates[8]=gr.update(interactive=False); ui_updates[9]=gr.update(interactive=False)
ui_updates[0]=gr.update(value=None,visible=False); ui_updates[1]=gr.update(value=None,visible=False); ui_updates[2]=gr.update(value=None,visible=False)
yield output_s_num_trials_this_run, output_s_run_no, output_s_user_logs, {"data": None}, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *ui_updates; return
output_s_current_trial_data = {"data": trial_info}
prog_text = f"用户ID: {output_s_user_session_id} | 进度:{trial_info['cur_no']}/{output_s_num_trials_this_run} | 第 {output_s_run_no} 轮"
ui_show_target_updates = list(create_no_change_tuple())
ui_show_target_updates[0]=gr.update(value=trial_info["target_path"],visible=True); ui_show_target_updates[1]=gr.update(value=None,visible=False); ui_show_target_updates[2]=gr.update(value=None,visible=False)
ui_show_target_updates[3]=""; ui_show_target_updates[4]=""; ui_show_target_updates[5]="请观察原图…"; ui_show_target_updates[6]=prog_text
ui_show_target_updates[7]=gr.update(interactive=False); ui_show_target_updates[8]=gr.update(interactive=False); ui_show_target_updates[9]=gr.update(interactive=False)
yield output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *ui_show_target_updates
time.sleep(3)
ui_show_candidates_updates = list(create_no_change_tuple())
ui_show_candidates_updates[0]=gr.update(value=None,visible=False); ui_show_candidates_updates[1]=gr.update(value=trial_info["left_path"],visible=True); ui_show_candidates_updates[2]=gr.update(value=trial_info["right_path"],visible=True)
ui_show_candidates_updates[3]=gr.update(value=trial_info["left_display_label"], visible=True); ui_show_candidates_updates[4]=gr.update(value=trial_info["right_display_label"], visible=True)
ui_show_candidates_updates[5]="请选择更像原图的一张"; ui_show_candidates_updates[6]=prog_text
ui_show_candidates_updates[7]=gr.update(interactive=False); ui_show_candidates_updates[8]=gr.update(interactive=True); ui_show_candidates_updates[9]=gr.update(interactive=True)
yield output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *ui_show_candidates_updates
# ==== Gradio UI 定义 和 程序入口 ====
def handle_download_history_file():
global GLOBAL_HISTORY_FILE
if os.path.exists(GLOBAL_HISTORY_FILE):
try:
if os.path.getsize(GLOBAL_HISTORY_FILE) > 0:
print(f"准备提供文件下载: {GLOBAL_HISTORY_FILE}")
return GLOBAL_HISTORY_FILE, gr.update(value=f"点击上面的链接下载 '{os.path.basename(GLOBAL_HISTORY_FILE)}'")
else:
print(f"历史文件 '{GLOBAL_HISTORY_FILE}' 为空,不提供下载。")
return None, gr.update(value=f"提示: 历史文件 '{os.path.basename(GLOBAL_HISTORY_FILE)}' 当前为空。")
except Exception as e:
print(f"检查历史文件大小时出错 '{GLOBAL_HISTORY_FILE}': {e}")
return None, gr.update(value=f"错误: 检查历史文件时出错。")
else:
print(f"请求下载历史文件,但文件 '{GLOBAL_HISTORY_FILE}' 未找到。")
return None, gr.update(value=f"错误: JSON历史文件 '{os.path.basename(GLOBAL_HISTORY_FILE)}' 未找到。请先运行实验以生成数据并触发保存。")
welcome_page_markdown = """
## 欢迎加入实验!
您好!非常感谢您抽出宝贵时间参与我们的视觉偏好评估实验。您的选择将帮助我们改进重建算法,让机器生成的图像更贴近人类视觉体验!
1. **实验目的**:通过比较两幅 重建图像 与原始 目标图像 的相似度。
2. **操作流程**:
* 点击下方的「我已阅读并同意开始实验」按钮。
* 然后点击主实验界面的「开始试验 / 下一轮」按钮。
* 系统先展示一张 **目标图像**,持续 3 秒。
* 随后自动切换到 **两张重建图像**。
* 根据刚才的观察记忆,选出您认为与目标图像最相似的一张。
* 选择后系统会自动进入下一轮比较。
3. **温馨提示**:
* 请勿刷新或关闭页面,以免中断实验。
* 若图片加载稍有延迟,请耐心等待;持续异常可联系邮箱 yangminghan@bupt.edu.cn。
* 本实验将保护您的任何个人隐私信息,所有数据仅用于学术研究,请您认真选择和填写。
4. **奖励说明**:
* 完成全部轮次后,请截图记录您所完成的实验总数(可累积,页面左下角将显示进度,请保证截取到为您分配的ID,轮次)。
* 将截图发送至邮箱 yangminghan@bupt.edu.cn,我们将在核验后发放奖励。
再次感谢您的参与与支持!您每一次认真选择都对我们的研究意义重大。祝您一切顺利,实验愉快!
"""
def handle_agree_and_start(name, gender, age, education, request: gr.Request):
error_messages_list = []
if not name or str(name).strip() == "": error_messages_list.append("姓名 不能为空。")
if gender is None or str(gender).strip() == "": error_messages_list.append("性别 必须选择。")
if age is None: error_messages_list.append("年龄 不能为空。")
elif not (isinstance(age, (int, float)) and 1 <= age <= 120):
try: num_age = float(age);
except (ValueError, TypeError): error_messages_list.append("年龄必须是一个有效的数字。")
else:
if not (1 <= num_age <= 120): error_messages_list.append("年龄必须在 1 到 120 之间。")
if education is None or str(education).strip() == "其他": error_messages_list.append("学历 必须选择。")
if error_messages_list:
full_error_message = "请修正以下错误:\n" + "\n".join([f"- {msg}" for msg in error_messages_list])
print(f"用户输入验证失败: {full_error_message}")
return gr.update(), False, gr.update(visible=True), gr.update(visible=False), full_error_message
s_name = str(name).strip().replace(" ","_").replace("/","_").replace("\\","_")
s_gender = str(gender).strip().replace(" ","_").replace("/","_").replace("\\","_")
s_age = str(int(float(age)))
s_education = str(education).strip().replace(" ","_").replace("/","_").replace("\\","_")
user_id_str = f"N-{s_name}_G-{s_gender}_A-{s_age}_E-{s_education}"
print(f"用户信息收集完毕,生成用户ID: {user_id_str}")
return user_id_str, True, gr.update(visible=False), gr.update(visible=True), ""
with gr.Blocks(css=CSS, title="图像重建主观评估") as demo:
s_show_experiment_ui = gr.State(False); s_trial_index = gr.State(0); s_run_no = gr.State(1)
s_user_logs = gr.State([]); s_current_trial_data = gr.State({}); s_user_session_id = gr.State(None)
s_current_run_image_list = gr.State([]); s_num_trials_this_run = gr.State(0)
welcome_container = gr.Column(visible=True)
experiment_container = gr.Column(visible=False)
with welcome_container:
gr.Markdown(welcome_page_markdown)
with gr.Row(): user_name_input = gr.Textbox(label="请输入您的姓名或代号 (例如 张三 或 User001)", placeholder="例如:张三 -> ZS"); user_gender_input = gr.Radio(label="性别", choices=["男", "女"])
with gr.Row(): user_age_input = gr.Number(label="年龄 (请输入1-120的整数)", minimum=1, maximum=120, step=1); user_education_input = gr.Dropdown(label="学历", choices=["其他","初中及以下","高中(含中专)", "大专(含在读)", "本科(含在读)", "硕士(含在读)", "博士(含在读)"])
welcome_error_msg = gr.Markdown(value="")
btn_agree_and_start = gr.Button("我已阅读上述说明并同意参与实验")
with experiment_container:
gr.Markdown("## 🧠 图像重建主观评估实验"); gr.Markdown(f"每轮实验大约有 {NUM_TRIALS_PER_RUN} 次比较。")
with gr.Row():
with gr.Column(scale=1, min_width=300): left_img = gr.Image(label="左候选图", visible=False, height=400, interactive=False); left_lbl = gr.Textbox(label="左图信息", value="", visible=True, interactive=False, max_lines=1); btn_left = gr.Button("选择左图 (更相似)", interactive=False, elem_classes="compact_button")
with gr.Column(scale=1, min_width=300): right_img = gr.Image(label="右候选图", visible=False, height=400, interactive=False); right_lbl = gr.Textbox(label="右图信息",value="", visible=True, interactive=False, max_lines=1); btn_right = gr.Button("选择右图 (更相似)", interactive=False, elem_classes="compact_button")
with gr.Row(): target_img = gr.Image(label="目标图像 (观察3秒后消失)", visible=False, height=400, interactive=False)
with gr.Row(): status_text = gr.Markdown(value="请点击“开始试验 / 下一轮”按钮。")
with gr.Row(): progress_text = gr.Markdown()
with gr.Row():
btn_start = gr.Button("开始试验 / 下一轮")
btn_download_json = gr.Button("下载JSON历史记录")
json_download_output = gr.File(label="下载的文件会在此处提供", interactive=False)
file_out_placeholder = gr.File(label=" ", visible=False, interactive=False)
outputs_ui_components_definition = [
target_img, left_img, right_img, left_lbl, right_lbl, status_text, progress_text,
btn_start, btn_left, btn_right, file_out_placeholder
]
click_inputs_base = [
s_trial_index, s_run_no, s_user_logs, s_current_trial_data, s_user_session_id,
s_current_run_image_list, s_num_trials_this_run
]
event_outputs = [
s_trial_index, s_run_no, s_user_logs, s_current_trial_data, s_user_session_id,
s_current_run_image_list, s_num_trials_this_run, *outputs_ui_components_definition
]
btn_agree_and_start.click(fn=handle_agree_and_start, inputs=[user_name_input, user_gender_input, user_age_input, user_education_input], outputs=[s_user_session_id, s_show_experiment_ui, welcome_container, experiment_container, welcome_error_msg])
btn_start.click(fn=partial(process_experiment_step, action_type="start_experiment"), inputs=click_inputs_base, outputs=event_outputs, queue=True)
btn_left.click(fn=partial(process_experiment_step, action_type="record_choice", choice_value="left"), inputs=click_inputs_base, outputs=event_outputs, queue=True)
btn_right.click(fn=partial(process_experiment_step, action_type="record_choice", choice_value="right"), inputs=click_inputs_base, outputs=event_outputs, queue=True)
btn_download_json.click(fn=handle_download_history_file, inputs=None, outputs=[json_download_output, status_text])
if __name__ == "__main__":
if not master_image_list: print("\n关键错误:程序无法启动,因无目标图片。"); exit()
else:
print(f"从 '{TARGET_DIR}' 加载 {len(master_image_list)} 张目标图片。")
if not METHOD_ROOTS: print(f"警告: '{BASE_IMAGE_DIR}' 无候选方法子目录。")
if not SUBJECTS: print("警告: SUBJECTS 列表为空。")
print(f"用户选择日志保存到 Dataset: '{DATASET_REPO_ID}' 的 '{BATCH_LOG_FOLDER}/ 文件夹")
if not os.getenv("HF_TOKEN"): print("警告: HF_TOKEN 未设置。日志无法保存到Hugging Face Dataset。\n 请在 Space Secrets 中设置 HF_TOKEN。")
else: print("HF_TOKEN 已找到。")
print(f"全局图片对历史将从 '{GLOBAL_HISTORY_FILE}' 加载/保存到此文件。")
allowed_paths_list = []
image_base_dir_to_allow = BASE_IMAGE_DIR
if os.path.exists(image_base_dir_to_allow) and os.path.isdir(image_base_dir_to_allow):
allowed_paths_list.append(os.path.abspath(image_base_dir_to_allow))
else:
print(f"关键警告:图片基础目录 '{image_base_dir_to_allow}' 不存在或非目录。")
if os.path.exists(PERSISTENT_STORAGE_BASE) and os.path.isdir(PERSISTENT_STORAGE_BASE):
allowed_paths_list.append(os.path.abspath(PERSISTENT_STORAGE_BASE))
else:
print(f"警告:持久化存储基础目录 '{PERSISTENT_STORAGE_BASE}' 不存在。JSON历史文件下载可能受影响。")
try:
os.makedirs(PERSISTENT_STORAGE_BASE, exist_ok=True)
print(f"信息:已尝试创建目录 '{PERSISTENT_STORAGE_BASE}'。")
if os.path.exists(PERSISTENT_STORAGE_BASE) and os.path.isdir(PERSISTENT_STORAGE_BASE):
allowed_paths_list.append(os.path.abspath(PERSISTENT_STORAGE_BASE))
except Exception as e_mkdir_main:
print(f"错误:在 main 中创建目录 '{PERSISTENT_STORAGE_BASE}' 失败: {e_mkdir_main}")
final_allowed_paths = list(set(allowed_paths_list))
if final_allowed_paths:
print(f"Gradio demo.launch() 配置最终 allowed_paths: {final_allowed_paths}")
else:
print("警告:没有有效的 allowed_paths 被配置。Gradio文件访问可能受限。")
print("启动 Gradio 应用...")
if final_allowed_paths:
demo.launch(allowed_paths=final_allowed_paths)
else:
demo.launch() |