Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import json | |
| import pandas as pd | |
| import random | |
| import shutil | |
| import time | |
| import collections | |
| from functools import wraps | |
| from filelock import FileLock | |
| from datasets import load_dataset, Audio | |
| from huggingface_hub import HfApi, hf_hub_download | |
| from multiprocessing import TimeoutError | |
| from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError | |
| # Load dataset from HuggingFace | |
| dataset = load_dataset("intersteller2887/Turing-test-dataset", split="train") | |
| dataset = dataset.cast_column("audio", Audio(decode=False)) # Prevent calling 'torchcodec' from newer version of 'datasets' | |
| # Huggingface space working directory: "/home/user/app" | |
| target_audio_dir = "/home/user/app/audio" | |
| os.makedirs(target_audio_dir, exist_ok=True) | |
| COUNT_JSON_PATH = "/home/user/app/count.json" | |
| COUNT_JSON_REPO_PATH = "submissions/count.json" # Output directory (Huggingface dataset directory) | |
| # Copy recordings to the working directory | |
| local_audio_paths = [] | |
| for item in dataset: | |
| src_path = item["audio"]["path"] | |
| if src_path and os.path.exists(src_path): | |
| filename = os.path.basename(src_path) | |
| dst_path = os.path.join(target_audio_dir, filename) | |
| if not os.path.exists(dst_path): | |
| shutil.copy(src_path, dst_path) | |
| local_audio_paths.append(dst_path) | |
| all_data_audio_paths = local_audio_paths | |
| # Take first file of the datasets as sample | |
| sample1_audio_path = local_audio_paths[0] | |
| print(sample1_audio_path) | |
| # ============================================================================== | |
| # Data Definition | |
| # ============================================================================== | |
| DIMENSIONS_DATA = [ | |
| { | |
| "title": "语义和语用特征", | |
| "audio": sample1_audio_path, | |
| "sub_dims": [ | |
| "记忆一致性:偏机器:出现上下文记忆不一致且无法察觉或修正(如遗忘掉关键信息坚持错误回答);偏人类:在短上下文中记忆一致,若出现记忆偏差也会提问确认", | |
| "逻辑连贯性:偏机器:逻辑转折生硬或自相矛盾(如:突然切换话题无过渡);偏人类:逻辑自然流畅", | |
| "读音正确性:偏机器:存在不自然的发音错误,常见多音字发音错误;偏人类:用字发音正确、自然,会结合语境正确使用常见多音字", | |
| "多语言混杂:偏机器:多语言混杂生硬,无语言切换逻辑;偏人类:说话多语言混杂往往和语境相关(专有名词、习惯用法),语言切换生硬卡顿,不自然", | |
| "语言不精确性:偏机器:回应通常不存在模糊表达,回答准确、肯定;偏人类:说话存在含糊表达:如“差不多”、“应该是吧”,且会出现自我打断或修正(“如不对不对”)的行为", | |
| "填充词使用:偏机器:很少使用填充词或填充词使用不自然;偏人类:在思考时经常使用填充词(如‘嗯’‘那个’)", | |
| "隐喻与语用用意:偏机器:表达直白,缺乏语义多样性,仅能字面理解语义;偏人类:使用隐喻、反语、委婉来表达多重含义" | |
| ], | |
| "reference_scores": [5, 5, 5, 0, 5, 5, 0] | |
| }, | |
| { | |
| "title": "非生理性副语言特征", | |
| "audio": sample1_audio_path, | |
| "sub_dims": [ | |
| "节奏:偏机器:说话几乎无停顿或停顿机械;偏人类:语速随语义起伏,偶尔卡顿或犹豫", | |
| "语调:偏机器:语调单一或变化过于规律,不符合语境;偏人类:在表达如疑问、惊讶、强调时,音调会自然上扬或下降", | |
| "重读:偏机器:没有在适当使用重读或出现强调部位异常;偏人类:有意识地重读重要词语,突出重点", | |
| "辅助性发声:偏机器:辅助性发声语境错误或机械化;偏人类:发出符合语境的非语言声音,如笑声、叹气等" | |
| ], | |
| "reference_scores": [5, 5, 5, 5] | |
| }, | |
| { | |
| "title": "生理性副语言特征", | |
| "audio": sample1_audio_path, | |
| "sub_dims": [ | |
| "微生理杂音:偏机器:语音过于干净,或发出不自然杂音(如电音);偏人类:说话存在呼吸声、口水音、气泡音等无意识发声,且自然地出现在说话中", | |
| "发音不稳定性:偏机器:发音过于清晰规则;偏人类:发音存在一定不规则性(诸如连读、颤音、含糊发音、鼻音等)", | |
| "口音:偏机器:口音生硬;偏人类:存在自然的地区口音或语音特征" | |
| ], | |
| "reference_scores": [5, 4, 4] | |
| }, | |
| { | |
| "title": "机械人格", | |
| "audio": sample1_audio_path, | |
| "sub_dims": [ | |
| "谄媚现象:偏机器:频繁同意、感谢、道歉,过度认同对方观点,缺乏真实互动感;偏人类:根据语境判断是否同意对方提出的请求或表达的观点,不总是表示同意或进行附和", | |
| "书面化表达:偏机器:回应句式工整、规范,用词过于正式、频繁列举、用词泛泛;偏人类:口语化,表达灵活多变" | |
| ], | |
| "reference_scores": [5, 5] | |
| }, | |
| { | |
| "title": "情感表达", | |
| "audio": sample1_audio_path, | |
| "sub_dims": [ | |
| "语义层面:偏机器:未能针对对方情绪作出正常的情感反应,或表达情感的词语空泛、脱离语境;偏人类:对悲伤、开心等语境有符合人类的情绪反应", | |
| "声学层面:偏机器:情感语调模式化或与语境不符;偏人类:音调、音量、节奏等声学特征随情绪动态变化" | |
| ], | |
| "reference_scores": [5, 5] | |
| } | |
| ] | |
| DIMENSION_TITLES = [d["title"] for d in DIMENSIONS_DATA] | |
| SPECIAL_KEYWORDS = ["多语言混杂", "隐喻与语用用意", "口音"] | |
| MAX_SUB_DIMS = max(len(d['sub_dims']) for d in DIMENSIONS_DATA) | |
| THE_SUB_DIMS = [d['sub_dims'] for d in DIMENSIONS_DATA] | |
| # ============================================================================== | |
| # Backend Function Definitions | |
| # ============================================================================== | |
| # This version did not place file reading into filelock, concurrent read could happen | |
| """def load_or_initialize_count_json(audio_paths): | |
| try: | |
| # Only try downloading if file doesn't exist yet | |
| if not os.path.exists(COUNT_JSON_PATH): | |
| downloaded_path = hf_hub_download( | |
| repo_id="intersteller2887/Turing-test-dataset", | |
| repo_type="dataset", | |
| filename=COUNT_JSON_REPO_PATH, | |
| token=os.getenv("HF_TOKEN") | |
| ) | |
| # Save it as COUNT_JSON_PATH so that the lock logic remains untouched | |
| with open(downloaded_path, "rb") as src, open(COUNT_JSON_PATH, "wb") as dst: | |
| dst.write(src.read()) | |
| except Exception as e: | |
| print(f"Could not download count.json from HuggingFace dataset: {e}") | |
| # Add filelock to /workspace/count.json | |
| lock_path = COUNT_JSON_PATH + ".lock" | |
| # Read of count.json will wait for 10 seconds until another thread involving releases it, and then add a lock to it | |
| with FileLock(lock_path, timeout=10): | |
| # If count.json exists: load into count_data | |
| # Else initialize count_data with orderedDict | |
| if os.path.exists(COUNT_JSON_PATH): | |
| with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f: | |
| count_data = json.load(f, object_pairs_hook=collections.OrderedDict) | |
| else: | |
| count_data = collections.OrderedDict() | |
| updated = False | |
| sample_audio_files = {os.path.basename(d["audio"]) for d in DIMENSIONS_DATA} | |
| # Guarantee that the sample recording won't be take into the pool | |
| # Update newly updated recordings into count.json | |
| for path in audio_paths: | |
| filename = os.path.basename(path) | |
| if filename not in count_data: | |
| if filename in sample_audio_files: | |
| count_data[filename] = 999 | |
| else: | |
| count_data[filename] = 0 | |
| updated = True | |
| if updated or not os.path.exists(COUNT_JSON_PATH): | |
| with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: | |
| json.dump(count_data, f, indent=4, ensure_ascii=False) | |
| return count_data""" | |
| # Function that load or initialize count.json | |
| # Function is called when user start a challenge, and this will load or initialize count.json to working directory | |
| # Initialize happens when count.json does not exist in the working directory as well as HuggingFace dataset | |
| # Load happens when count.json exists in HuggingFace dataset, and it's not loaded to the working directory yet | |
| # After load/initialize, all newly added audio files will be added to count.json with initial value of 0 | |
| # Load/Initialize will generate count.json in the working directory for all users under this space | |
| # This version also places file reading into filelock, and modified | |
| def load_or_initialize_count_json(audio_paths): | |
| # Add filelock to /workspace/count.json | |
| lock_path = COUNT_JSON_PATH + ".lock" | |
| with FileLock(lock_path, timeout=10): | |
| # If count.json does not exist in the working directory, try to download it from HuggingFace dataset | |
| if not os.path.exists(COUNT_JSON_PATH): | |
| try: | |
| # Save latest count.json to working directory | |
| downloaded_path = hf_hub_download( | |
| repo_id="intersteller2887/Turing-test-dataset", | |
| repo_type="dataset", | |
| filename=COUNT_JSON_REPO_PATH, | |
| token=os.getenv("HF_TOKEN") | |
| ) | |
| with open(downloaded_path, "rb") as src, open(COUNT_JSON_PATH, "wb") as dst: | |
| dst.write(src.read()) | |
| except Exception: | |
| pass | |
| # If count.json exists in the working directory: load into count_data for potential update | |
| if os.path.exists(COUNT_JSON_PATH): | |
| with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f: | |
| count_data = json.load(f, object_pairs_hook=collections.OrderedDict) | |
| # Else initialize count_data with orderedDict | |
| # This happens when there is no count.json (both working directory and HuggingFace dataset) | |
| else: | |
| count_data = collections.OrderedDict() | |
| updated = False | |
| sample_audio_files = {os.path.basename(d["audio"]) for d in DIMENSIONS_DATA} | |
| # Guarantee that the sample recording won't be take into the pool | |
| # Update newly updated recordings into count.json | |
| for path in audio_paths: | |
| filename = os.path.basename(path) | |
| if filename not in count_data: | |
| if filename in sample_audio_files: | |
| count_data[filename] = 999 | |
| else: | |
| count_data[filename] = 0 | |
| updated = True | |
| # Write updated count_data to /home/user/app/count.json | |
| if updated or not os.path.exists(COUNT_JSON_PATH): | |
| with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: | |
| json.dump(count_data, f, indent=4, ensure_ascii=False) | |
| return | |
| # Shorten the time of playing previous audio when reached next question | |
| def append_cache_buster(audio_path): | |
| return f"{audio_path}?t={int(time.time() * 1000)}" | |
| # Function that samples questions from avaliable question set | |
| # This version utilizes a given count_data to sample audio paths | |
| """def sample_audio_paths(audio_paths, count_data, k=5, max_count=1): # k for questions per test; max_count for question limit in total | |
| eligible_paths = [p for p in audio_paths if count_data.get(os.path.basename(p), 0) < max_count] | |
| if len(eligible_paths) < k: | |
| raise ValueError(f"可用音频数量不足(只剩 {len(eligible_paths)} 条 count<{max_count} 的音频),无法抽取 {k} 条") | |
| # Shuffule to avoid fixed selections resulted from directory structure | |
| selected = random.sample(eligible_paths, k) | |
| # Once sampled a test, update these questions immediately | |
| for path in selected: | |
| filename = os.path.basename(path) | |
| count_data[filename] = count_data.get(filename, 0) + 1 | |
| # Add filelock to /workspace/count.json | |
| lock_path = COUNT_JSON_PATH + ".lock" | |
| with FileLock(lock_path, timeout=10): | |
| with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: | |
| json.dump(count_data, f, indent=4, ensure_ascii=False) | |
| return selected, count_data""" | |
| # This version places file reading into filelock to guarantee correct update of count.json | |
| def sample_audio_paths(audio_paths, k=5, max_count=1): | |
| # Add filelock to /workspace/count.json | |
| lock_path = COUNT_JSON_PATH + ".lock" | |
| # Load newest count.json | |
| with FileLock(lock_path, timeout=10): | |
| with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f: | |
| count_data = json.load(f) | |
| eligible_paths = [ | |
| p for p in audio_paths | |
| if count_data.get(os.path.basename(p), 0) < max_count | |
| ] | |
| if len(eligible_paths) < k: | |
| raise ValueError(f"可用音频数量不足(只剩 {len(eligible_paths)} 条 count<{max_count} 的音频),无法抽取 {k} 条") | |
| selected = random.sample(eligible_paths, k) | |
| # Update count_data | |
| for path in selected: | |
| filename = os.path.basename(path) | |
| count_data[filename] = count_data.get(filename, 0) + 1 | |
| # Update count.json | |
| with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: | |
| json.dump(count_data, f, indent=4, ensure_ascii=False) | |
| # return selected, count_data | |
| # Keep count_data atomic | |
| return selected | |
| # ============================================================================== | |
| # Frontend Function Definitions | |
| # ============================================================================== | |
| # Save question_set in each user_data_state, preventing global sharing | |
| def start_challenge(user_data_state): | |
| load_or_initialize_count_json(all_data_audio_paths) | |
| # selected_audio_paths, updated_count_data = sample_audio_paths(all_data_audio_paths, k=5) | |
| # Keep count_data atomic | |
| selected_audio_paths = sample_audio_paths(all_data_audio_paths, k=5) | |
| question_set = [ | |
| {"audio": path, "desc": f"这是音频文件 {os.path.basename(path)} 的描述"} | |
| for path in selected_audio_paths | |
| ] | |
| user_data_state["question_set"] = question_set | |
| # count_data is not needed in the user data | |
| # user_data_state["updated_count_data"] = updated_count_data | |
| return gr.update(visible=False), gr.update(visible=True), user_data_state | |
| # This function toggles the visibility of the "其他(请注明)" input field based on the selected education choice | |
| def toggle_education_other(choice): | |
| is_other = (choice == "其他(请注明)") | |
| return gr.update(visible=is_other, interactive=is_other, value="") | |
| # This function checks if the user information is complete | |
| def check_info_complete(username, age, gender, education, education_other, ai_experience): | |
| if username.strip() and age and gender and education and ai_experience: | |
| if education == "其他(请注明)" and not education_other.strip(): | |
| return gr.update(interactive=False) | |
| return gr.update(interactive=True) | |
| return gr.update(interactive=False) | |
| # This function updates user_data and initializes the sample page (called when user submits their info) | |
| def show_sample_page_and_init(username, age, gender, education, education_other, ai_experience, user_data): | |
| final_edu = education_other if education == "其他(请注明)" else education | |
| user_data.update({ | |
| "username": username.strip(), | |
| "age": age, | |
| "gender": gender, | |
| "education": final_edu, | |
| "ai_experience": ai_experience | |
| }) | |
| first_dim_title = DIMENSION_TITLES[0] | |
| initial_updates = update_sample_view(first_dim_title) | |
| return [ | |
| gr.update(visible=False), gr.update(visible=True), user_data, first_dim_title | |
| ] + initial_updates | |
| def update_sample_view(dimension_title): | |
| dim_data = next((d for d in DIMENSIONS_DATA if d["title"] == dimension_title), None) | |
| if dim_data: | |
| audio_up = gr.update(value=dim_data["audio"]) | |
| # audio_up = gr.update(value=append_cache_buster(dim_data["audio"])) | |
| interactive_view_up = gr.update(visible=True) | |
| reference_view_up = gr.update(visible=False) | |
| reference_btn_up = gr.update(value="参考") | |
| sample_slider_ups = [] | |
| ref_slider_ups = [] | |
| scores = dim_data.get("reference_scores", []) | |
| for i in range(MAX_SUB_DIMS): | |
| if i < len(dim_data['sub_dims']): | |
| label = dim_data['sub_dims'][i] | |
| score = scores[i] if i < len(scores) else 0 | |
| sample_slider_ups.append(gr.update(visible=True, label=label, value=0)) | |
| ref_slider_ups.append(gr.update(visible=True, label=label, value=score)) | |
| else: | |
| sample_slider_ups.append(gr.update(visible=False, value=0)) | |
| ref_slider_ups.append(gr.update(visible=False, value=0)) | |
| return [audio_up, interactive_view_up, reference_view_up, reference_btn_up] + sample_slider_ups + ref_slider_ups | |
| empty_updates = [gr.update()] * 4 | |
| slider_empty_updates = [gr.update()] * (MAX_SUB_DIMS * 2) | |
| return empty_updates + slider_empty_updates | |
| def update_test_dimension_view(d_idx, selections): | |
| # dimension = DIMENSIONS_DATA[d_idx] | |
| slider_updates = [] | |
| dim_data = DIMENSIONS_DATA[d_idx] | |
| sub_dims = dim_data["sub_dims"] | |
| dim_title = dim_data["title"] | |
| existing_scores = selections.get(dim_data['title'], {}) | |
| progress_d = f"维度 {d_idx + 1} / {len(DIMENSIONS_DATA)}: **{dim_data['title']}**" | |
| for i in range(MAX_SUB_DIMS): | |
| if i < len(sub_dims): | |
| desc = sub_dims[i] | |
| # print(f"{desc} -> default value: {existing_scores.get(desc, 0)}") | |
| name = desc.split(":")[0].strip() | |
| default_value = 0 if name in SPECIAL_KEYWORDS else 1 | |
| value = existing_scores.get(desc, default_value) | |
| slider_updates.append(gr.update( | |
| visible=True, | |
| label=desc, | |
| minimum=default_value, | |
| maximum=5, | |
| step=1, | |
| value=value, | |
| interactive=True, | |
| )) | |
| # slider_updates.append(gr.update( | |
| # visible=True, | |
| # label=desc, | |
| # minimum=0 if name in SPECIAL_KEYWORDS else 1, | |
| # maximum=5, | |
| # value = existing_scores.get(desc, 0), | |
| # interactive=True, | |
| # )) | |
| else: | |
| slider_updates.append(gr.update(visible=False)) | |
| print(f"{desc} -> default value: {existing_scores.get(desc, 0)}") | |
| # for i in range(MAX_SUB_DIMS): | |
| # if i < len(dimension['sub_dims']): | |
| # sub_dim_label = dimension['sub_dims'][i] | |
| # value = existing_scores.get(sub_dim_label, 0) | |
| # slider_updates.append(gr.update(visible=True, label=sub_dim_label, value=value)) | |
| # else: | |
| # slider_updates.append(gr.update(visible=False, value=0)) | |
| prev_btn_update = gr.update(interactive=(d_idx > 0)) | |
| next_btn_update = gr.update( | |
| value="进入最终判断" if d_idx == len(DIMENSIONS_DATA) - 1 else "下一维度", | |
| interactive=True | |
| ) | |
| return [gr.update(value=progress_d), prev_btn_update, next_btn_update] + slider_updates | |
| def init_test_question(user_data, q_idx): | |
| d_idx = 0 | |
| question = user_data["question_set"][q_idx] | |
| progress_q = f"第 {q_idx + 1} / {len(user_data['question_set'])} 题" | |
| initial_updates = update_test_dimension_view(d_idx, {}) | |
| dim_title_update, prev_btn_update, next_btn_update = initial_updates[:3] | |
| slider_updates = initial_updates[3:] | |
| return ( | |
| gr.update(visible=False), | |
| gr.update(visible=True), | |
| gr.update(visible=False), | |
| gr.update(visible=False), | |
| q_idx, d_idx, {}, | |
| gr.update(value=progress_q), | |
| dim_title_update, | |
| gr.update(value=question['audio']), | |
| # gr.update(value=append_cache_buster(question['audio'])), | |
| prev_btn_update, | |
| next_btn_update, | |
| gr.update(value=None), # BUG FIX: Changed from "" to None to correctly clear the radio button | |
| gr.update(interactive=False), | |
| ) + tuple(slider_updates) | |
| def navigate_dimensions(direction, q_idx, d_idx, selections, *slider_values): | |
| current_dim_data = DIMENSIONS_DATA[d_idx] | |
| current_sub_dims = current_dim_data['sub_dims'] | |
| scores = {sub_dim: slider_values[i] for i, sub_dim in enumerate(current_sub_dims)} | |
| selections[current_dim_data['title']] = scores | |
| new_d_idx = d_idx + (1 if direction == "next" else -1) | |
| if direction == "next" and d_idx == len(DIMENSIONS_DATA) - 1: | |
| return ( | |
| gr.update(visible=False), | |
| gr.update(visible=True), | |
| q_idx, new_d_idx, selections, | |
| gr.update(), | |
| gr.update(), | |
| gr.update(), | |
| gr.update(interactive=True), | |
| gr.update(interactive=False), | |
| gr.update(interactive=False), | |
| gr.update(interactive=False), | |
| ) + (gr.update(),) * MAX_SUB_DIMS | |
| else: | |
| view_updates = update_test_dimension_view(new_d_idx, selections) | |
| dim_title_update, prev_btn_update, next_btn_update = view_updates[:3] | |
| slider_updates = view_updates[3:] | |
| return ( | |
| gr.update(), gr.update(), | |
| q_idx, new_d_idx, selections, | |
| gr.update(), | |
| dim_title_update, | |
| gr.update(), | |
| gr.update(), | |
| gr.update(), | |
| prev_btn_update, | |
| next_btn_update, | |
| ) + tuple(slider_updates) | |
| def toggle_reference_view(current): | |
| if current == "参考": | |
| return gr.update(visible=False), gr.update(visible=True), gr.update(value="返回") | |
| else: | |
| return gr.update(visible=True), gr.update(visible=False), gr.update(value="参考") | |
| def back_to_welcome(): | |
| return ( | |
| gr.update(visible=True), # welcome_page | |
| gr.update(visible=False), # info_page | |
| gr.update(visible=False), # sample_page | |
| gr.update(visible=False), # pretest_page | |
| gr.update(visible=False), # test_page | |
| gr.update(visible=False), # final_judgment_page | |
| gr.update(visible=False), # result_page | |
| {}, # user_data_state | |
| 0, # current_question_index | |
| 0, # current_test_dimension_index | |
| {}, # current_question_selections | |
| [] # test_results | |
| ) | |
| # ============================================================================== | |
| # Retry Function Definitions | |
| # ============================================================================== | |
| # Decorator function that allows to use ThreadPoolExecutor to retry a function with timeout | |
| def retry_with_timeout(max_retries=3, timeout=10, backoff=1): | |
| def decorator(func): | |
| def wrapper(*args, **kwargs): | |
| last_exception = None | |
| for attempt in range(max_retries): | |
| try: | |
| with ThreadPoolExecutor(max_workers=1) as executor: | |
| future = executor.submit(func, *args, **kwargs) | |
| try: | |
| result = future.result(timeout=timeout) | |
| return result | |
| except FutureTimeoutError: | |
| future.cancel() | |
| raise TimeoutError(f"Operation timed out after {timeout} seconds") | |
| except Exception as e: | |
| last_exception = e | |
| print(f"Attempt {attempt + 1} failed: {str(e)}") | |
| if attempt < max_retries - 1: | |
| time.sleep(backoff * (attempt + 1)) | |
| print(f"All {max_retries} attempts failed") | |
| if last_exception: | |
| raise last_exception | |
| raise Exception("Unknown error occurred") | |
| return wrapper | |
| return decorator | |
| def save_with_retry(all_results, user_data): | |
| # 尝试上传到Hugging Face Hub | |
| try: | |
| # 使用线程安全的保存方式 | |
| with ThreadPoolExecutor(max_workers=1) as executor: | |
| future = executor.submit(save_all_results_to_file, all_results, user_data) | |
| try: | |
| future.result(timeout=30) # 设置30秒超时 | |
| return True | |
| except FutureTimeoutError: | |
| future.cancel() | |
| print("上传超时") | |
| return False | |
| except Exception as e: | |
| print(f"上传到Hub失败: {e}") | |
| return False | |
| def save_locally_with_retry(data, filename, max_retries=3): | |
| for attempt in range(max_retries): | |
| try: | |
| with open(filename, 'w', encoding='utf-8') as f: | |
| json.dump(data, f, indent=4, ensure_ascii=False) | |
| return True | |
| except Exception as e: | |
| print(f"本地保存尝试 {attempt + 1} 失败: {e}") | |
| if attempt < max_retries - 1: | |
| time.sleep(1) | |
| return False | |
| def update_count_with_retry(count_data, question_set, max_retries=3): | |
| for attempt in range(max_retries): | |
| try: | |
| lock_path = COUNT_JSON_PATH + ".lock" | |
| with FileLock(lock_path, timeout=10): | |
| # Remove unfinished question(s) from count.json | |
| for question in question_set: | |
| filename = os.path.basename(question['audio']) | |
| if filename in count_data and count_data[filename] < 1: | |
| count_data[filename] = 0 # Mark unfinished data as 0 | |
| with open(COUNT_JSON_PATH, 'w', encoding='utf-8') as f: | |
| json.dump(count_data, f, indent=4, ensure_ascii=False) | |
| return True | |
| except Exception as e: | |
| print(f"Fail to update count.json {e} for {attempt + 1} time") | |
| if attempt < max_retries - 1: | |
| time.sleep(1) | |
| return False | |
| # ============================================================================== | |
| # Previous version of submit_question_and_advance | |
| """def submit_question_and_advance(q_idx, d_idx, selections, final_choice, all_results, user_data): | |
| # selections["final_choice"] = final_choice | |
| cleaned_selections = {} | |
| for dim_title, sub_scores in selections.items(): | |
| # if dim_title == "final_choice": # 去掉if判断 | |
| cleaned_selections["final_choice"] = final_choice | |
| # continue | |
| cleaned_sub_scores = {} | |
| for sub_dim, score in sub_scores.items(): | |
| cleaned_sub_scores[sub_dim] = None if score == 0 else score | |
| cleaned_selections[dim_title] = cleaned_sub_scores | |
| final_question_result = { | |
| "question_id": q_idx, | |
| "audio_file": user_data["question_set"][q_idx]['audio'], | |
| "selections": cleaned_selections | |
| } | |
| all_results.append(final_question_result) | |
| q_idx += 1 | |
| # If q_idx hasn't reached the last one | |
| if q_idx < len(user_data["question_set"]): | |
| init_q_updates = init_test_question(user_data, q_idx) # Case 1: jam happens when initialize next question | |
| return init_q_updates + (all_results, gr.update(value="")) | |
| # If q_idx has reached the last one | |
| else: | |
| result_str = "### 测试全部完成!\n\n你的提交结果概览:\n" | |
| for res in all_results: | |
| # result_str += f"\n#### 题目: {res['audio_file']}\n" | |
| result_str += f"##### 最终判断: **{res['selections'].get('final_choice', '未选择')}**\n" | |
| for dim_title, dim_data in res['selections'].items(): | |
| if dim_title == 'final_choice': continue | |
| result_str += f"- **{dim_title}**:\n" | |
| for sub_dim, score in dim_data.items(): | |
| result_str += f" - *{sub_dim[:20]}...*: {score}/5\n" | |
| # save_all_results_to_file(all_results, user_data) | |
| # save_all_results_to_file(all_results, user_data, count_data=updated_count_data) | |
| save_all_results_to_file(all_results, user_data, count_data=user_data.get("updated_count_data")) | |
| return ( | |
| gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), | |
| q_idx, d_idx, {}, | |
| gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), | |
| gr.update(), gr.update(), | |
| ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, result_str)""" | |
| # user_data now no further contain "updated_count_data", which should be read/write with filelock and be directly accessed from working directory | |
| def submit_question_and_advance(q_idx, d_idx, selections, final_choice, all_results, user_data): | |
| try: | |
| # 准备数据 | |
| cleaned_selections = {} | |
| for dim_title, sub_scores in selections.items(): | |
| cleaned_selections["final_choice"] = final_choice | |
| cleaned_sub_scores = {} | |
| for sub_dim, score in sub_scores.items(): | |
| cleaned_sub_scores[sub_dim] = None if score == 0 else score | |
| cleaned_selections[dim_title] = cleaned_sub_scores | |
| final_question_result = { | |
| "question_id": q_idx, | |
| "audio_file": user_data["question_set"][q_idx]['audio'], | |
| "selections": cleaned_selections | |
| } | |
| all_results.append(final_question_result) | |
| q_idx += 1 | |
| if q_idx < len(user_data["question_set"]): | |
| init_q_updates = init_test_question(user_data, q_idx) | |
| return init_q_updates + (all_results, gr.update(value="")) | |
| else: | |
| # 准备完整结果数据 | |
| result_str = "### 测试全部完成!\n\n你的提交结果概览:\n" | |
| for res in all_results: | |
| result_str += f"##### 最终判断: **{res['selections'].get('final_choice', '未选择')}**\n" | |
| for dim_title, dim_data in res['selections'].items(): | |
| if dim_title == 'final_choice': continue | |
| result_str += f"- **{dim_title}**:\n" | |
| for sub_dim, score in dim_data.items(): | |
| result_str += f" - *{sub_dim[:20]}...*: {score}/5\n" | |
| # 尝试上传(带重试) | |
| try: | |
| # success = save_with_retry(all_results, user_data, user_data.get("updated_count_data")) | |
| success = save_with_retry(all_results, user_data) | |
| except Exception as e: | |
| print(f"上传过程中发生错误: {e}") | |
| success = False | |
| if not success: | |
| # 上传失败,保存到本地 | |
| username = user_data.get("username", "anonymous") | |
| timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') | |
| local_filename = f"submission_{username}_{timestamp}.json" | |
| # 准备数据包 | |
| user_info_clean = { | |
| k: v for k, v in user_data.items() if k not in ["question_set"] | |
| } | |
| final_data_package = { | |
| "user_info": user_info_clean, | |
| "results": all_results | |
| } | |
| # 尝试保存到本地 | |
| local_success = save_locally_with_retry(final_data_package, local_filename) | |
| if local_success: | |
| result_str += f"\n\n⚠️ 上传失败,结果已保存到本地文件: {local_filename}" | |
| else: | |
| result_str += "\n\n❌ 上传失败且无法保存到本地文件,请联系管理员" | |
| # 更新count.json(剔除未完成的题目) | |
| try: | |
| with FileLock(COUNT_JSON_PATH + ".lock", timeout=5): | |
| with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f: | |
| count_data = json.load(f, object_pairs_hook=collections.OrderedDict) | |
| count_update_success = update_count_with_retry(count_data, user_data["question_set"]) | |
| except Exception as e: | |
| print(f"更新count.json失败: {e}") | |
| count_update_success = False | |
| if not count_update_success: | |
| result_str += "\n\n⚠️ 无法更新题目计数,请联系管理员" | |
| return ( | |
| gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), | |
| q_idx, d_idx, {}, | |
| gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), | |
| gr.update(), gr.update(), | |
| ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, result_str) | |
| except Exception as e: | |
| print(f"提交过程中发生错误: {e}") | |
| # 返回错误信息 | |
| error_msg = f"提交过程中发生错误: {str(e)}" | |
| return ( | |
| gr.update(), gr.update(), gr.update(), gr.update(), | |
| q_idx, d_idx, selections, | |
| gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), | |
| gr.update(), gr.update(), | |
| ) + (gr.update(),) * MAX_SUB_DIMS + (all_results, error_msg) | |
| """def save_all_results_to_file(all_results, user_data, count_data=None): | |
| repo_id = "intersteller2887/Turing-test-dataset" | |
| username = user_data.get("username", "user") | |
| timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') | |
| submission_filename = f"submissions_{username}_{timestamp}.json" | |
| user_info_clean = { | |
| k: v for k, v in user_data.items() if k not in ["question_set", "updated_count_data"] | |
| } | |
| final_data_package = { | |
| "user_info": user_info_clean, | |
| "results": all_results | |
| } | |
| json_string = json.dumps(final_data_package, ensure_ascii=False, indent=4) | |
| hf_token = os.getenv("HF_TOKEN") | |
| if not hf_token: | |
| print("HF_TOKEN not found. Cannot upload to the Hub.") | |
| return | |
| try: | |
| api = HfApi() | |
| # Upload submission file | |
| api.upload_file( | |
| path_or_fileobj=bytes(json_string, "utf-8"), | |
| path_in_repo=f"submissions/{submission_filename}", | |
| repo_id=repo_id, | |
| repo_type="dataset", | |
| token=hf_token, | |
| commit_message=f"Add new submission from {username}" | |
| ) | |
| print(f"上传成功: {submission_filename}") | |
| if count_data: | |
| with FileLock(COUNT_JSON_PATH + ".lock", timeout=10): | |
| with open(COUNT_JSON_PATH, "w", encoding="utf-8") as f: | |
| json.dump(count_data, f, indent=4, ensure_ascii=False) | |
| api.upload_file( | |
| path_or_fileobj=COUNT_JSON_PATH, | |
| path_in_repo=COUNT_JSON_REPO_PATH, | |
| repo_id=repo_id, | |
| repo_type="dataset", | |
| token=hf_token, | |
| commit_message=f"Update count.json after submission by {username}" | |
| ) | |
| except Exception as e: | |
| print(f"上传出错: {e}")""" | |
| def save_all_results_to_file(all_results, user_data): | |
| repo_id = "intersteller2887/Turing-test-dataset" | |
| username = user_data.get("username", "user") | |
| timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') | |
| submission_filename = f"submissions_{username}_{timestamp}.json" | |
| user_info_clean = { | |
| k: v for k, v in user_data.items() if k not in ["question_set"] | |
| } | |
| final_data_package = { | |
| "user_info": user_info_clean, | |
| "results": all_results | |
| } | |
| json_string = json.dumps(final_data_package, ensure_ascii=False, indent=4) | |
| hf_token = os.getenv("HF_TOKEN") | |
| if not hf_token: | |
| raise Exception("HF_TOKEN not found. Cannot upload to the Hub.") | |
| api = HfApi() | |
| # 上传提交文件(不再使用装饰器,直接调用) | |
| api.upload_file( | |
| path_or_fileobj=bytes(json_string, "utf-8"), | |
| path_in_repo=f"submissions/{submission_filename}", | |
| repo_id=repo_id, | |
| repo_type="dataset", | |
| token=hf_token, | |
| commit_message=f"Add new submission from {username}" | |
| ) | |
| try: | |
| with FileLock(COUNT_JSON_PATH + ".lock", timeout=5): | |
| with open(COUNT_JSON_PATH, "r", encoding="utf-8") as f: | |
| count_data_str = f.read() | |
| api.upload_file( | |
| path_or_fileobj=bytes(count_data_str, "utf-8"), | |
| path_in_repo=COUNT_JSON_REPO_PATH, | |
| repo_id=repo_id, | |
| repo_type="dataset", | |
| token=hf_token, | |
| commit_message=f"Update count.json after submission by {username}" | |
| ) | |
| except Exception as e: | |
| print(f"上传 count.json 失败: {e}") | |
| # ============================================================================== | |
| # Gradio 界面定义 (Gradio UI Definition) | |
| # ============================================================================== | |
| with gr.Blocks(theme=gr.themes.Soft(), css=".gradio-container {max-width: 960px !important}") as demo: | |
| user_data_state = gr.State({}) | |
| current_question_index = gr.State(0) | |
| current_test_dimension_index = gr.State(0) | |
| current_question_selections = gr.State({}) | |
| test_results = gr.State([]) | |
| welcome_page = gr.Column(visible=True) | |
| info_page = gr.Column(visible=False) | |
| sample_page = gr.Column(visible=False) | |
| pretest_page = gr.Column(visible=False) | |
| test_page = gr.Column(visible=False) | |
| final_judgment_page = gr.Column(visible=False) | |
| result_page = gr.Column(visible=False) | |
| pages = { | |
| "welcome": welcome_page, "info": info_page, "sample": sample_page, | |
| "pretest": pretest_page, "test": test_page, "final_judgment": final_judgment_page, | |
| "result": result_page | |
| } | |
| with welcome_page: | |
| gr.Markdown("# AI 识破者\n你将听到一系列对话,请判断哪个回应者是 AI。") | |
| start_btn = gr.Button("开始挑战", variant="primary") | |
| with info_page: | |
| gr.Markdown("## 请提供一些基本信息") | |
| username_input = gr.Textbox(label="用户名", placeholder="请输入你的昵称") | |
| age_input = gr.Radio(["18岁以下", "18-25岁", "26-35岁", "36-50岁", "50岁以上"], label="年龄") | |
| gender_input = gr.Radio(["男", "女", "其他"], label="性别") | |
| education_input = gr.Radio(["高中及以下", "本科", "硕士", "博士", "其他"], label="学历") | |
| education_other_input = gr.Textbox(label="请填写你的学历", visible=False, interactive=False) | |
| ai_experience_input = gr.Radio(["从未使用过", "偶尔接触(如看别人用)", "使用过几次,了解基本功能", "经常使用,有一定操作经验", "非常熟悉,深入使用过多个 AI 工具"], label="对 AI 工具的熟悉程度") | |
| submit_info_btn = gr.Button("提交并开始学习样例", variant="primary", interactive=False) | |
| with sample_page: | |
| gr.Markdown("## 样例分析\n请选择一个维度进行学习和打分练习。所有维度共用同一个样例音频。") | |
| sample_dimension_selector = gr.Radio(DIMENSION_TITLES, label="选择学习维度", value=DIMENSION_TITLES[0]) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| sample_audio = gr.Audio(label="样例音频", value=DIMENSIONS_DATA[0]["audio"]) | |
| with gr.Column(scale=2): | |
| with gr.Column(visible=True) as interactive_view: | |
| gr.Markdown("#### 请为以下特征打分 (0-5分。0-特征无体现;1-机器;3-特征无偏向;5-人类)") | |
| sample_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=True) for i in range(MAX_SUB_DIMS)] | |
| with gr.Column(visible=False) as reference_view: | |
| gr.Markdown("### 参考答案解析") | |
| reference_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=False) for i in range(MAX_SUB_DIMS)] | |
| with gr.Row(): | |
| reference_btn = gr.Button("参考") | |
| go_to_pretest_btn = gr.Button("我明白了,开始测试", variant="primary") | |
| with pretest_page: | |
| gr.Markdown("## 测试说明\n" | |
| "- 对于每一道题,你都需要对全部 **5 个维度** 进行评估。\n" | |
| "- 在每个维度下,请为出现的每个特征 **从0到5打分**。\n" | |
| "- **评分解释如下:**\n" | |
| " - **0 分:特征未体现** (有些特征一定会体现,所以按1到5打分);\n" | |
| " - **1 分:极度符合机器特征**;\n" | |
| " - **2 分:较为符合机器特征**;\n" | |
| " - **3 分:无明显人类或机器倾向**;\n" | |
| " - **4 分:较为符合人类特征**;\n" | |
| " - **5 分:极度符合人类特征**。\n" | |
| "- 完成所有维度后,请根据整体印象对回应方的身份做出做出“人类”或“机器人”的 **最终判断**。\n" | |
| "- 你可以使用“上一维度”和“下一维度”按钮在5个维度间自由切换和修改分数。\n" | |
| "## 特别注意\n" | |
| "- 我们希望您能判断每个维度上**回应者**的表现是**偏向人还是机器**,分数的大小反映回应者的语音类人的程度,而**不是**这个维度体现的程度多少\n(如读音正确也不代表是人类,读音错误也不代表是机器,您应当判断的是“听到的发音更偏向机器还是人类”)\n" | |
| "- 即使您一开始就已经很肯定回应方的身份,同样应当**独立地**对每个维度上回应方的表现进行细致的评判。比如您很肯定回应方是机器,也需要独立地对每个维度判断,而非简单地将每个维度归为偏机器。") | |
| go_to_test_btn = gr.Button("开始测试", variant="primary") | |
| with test_page: | |
| gr.Markdown("## 正式测试") | |
| question_progress_text = gr.Markdown() | |
| test_dimension_title = gr.Markdown() | |
| test_audio = gr.Audio(label="测试音频") | |
| gr.Markdown("--- \n ### 请为对话中的回应者(非发起者)针对以下特征打分 (0-5分。0-特征无体现;1-机器;3-特征无偏向;5-人类)") | |
| test_sliders = [gr.Slider(minimum=0, maximum=5, step=1, label=f"Sub-dim {i+1}", visible=False, interactive=True, show_label = True) for i in range(MAX_SUB_DIMS)] | |
| with gr.Row(): | |
| prev_dim_btn = gr.Button("上一维度") | |
| next_dim_btn = gr.Button("下一维度", variant="primary") | |
| with final_judgment_page: | |
| gr.Markdown("## 最终判断") | |
| gr.Markdown("您已完成对所有维度的评分。请根据您的综合印象,做出最终判断。") | |
| final_human_robot_radio = gr.Radio(["👤 人类", "🤖 机器人"], label="请判断回应者类型 (必填)") | |
| submit_final_answer_btn = gr.Button("提交本题答案", variant="primary", interactive=False) | |
| with result_page: | |
| gr.Markdown("## 测试完成") | |
| result_text = gr.Markdown() | |
| back_to_welcome_btn = gr.Button("返回主界面", variant="primary") | |
| # ============================================================================== | |
| # 事件绑定 (Event Binding) & IO 列表定义 | |
| # ============================================================================== | |
| sample_init_outputs = [ | |
| info_page, sample_page, user_data_state, sample_dimension_selector, | |
| sample_audio, interactive_view, reference_view, reference_btn | |
| ] + sample_sliders + reference_sliders | |
| test_init_outputs = [ | |
| pretest_page, test_page, final_judgment_page, result_page, | |
| current_question_index, current_test_dimension_index, current_question_selections, | |
| question_progress_text, test_dimension_title, test_audio, | |
| prev_dim_btn, next_dim_btn, | |
| final_human_robot_radio, submit_final_answer_btn, | |
| ] + test_sliders | |
| nav_inputs = [current_question_index, current_test_dimension_index, current_question_selections] + test_sliders | |
| nav_outputs = [ | |
| test_page, final_judgment_page, | |
| current_question_index, current_test_dimension_index, current_question_selections, | |
| question_progress_text, test_dimension_title, test_audio, | |
| final_human_robot_radio, submit_final_answer_btn, | |
| prev_dim_btn, next_dim_btn, | |
| ] + test_sliders | |
| full_outputs_with_results = test_init_outputs + [test_results, result_text] | |
| # start_btn.click(fn=start_challenge, outputs=[welcome_page, info_page]) | |
| start_btn.click( | |
| fn=start_challenge, | |
| inputs=[user_data_state], | |
| outputs=[welcome_page, info_page, user_data_state] | |
| ) | |
| for comp in [age_input, gender_input, education_input, education_other_input, ai_experience_input]: | |
| comp.change( | |
| fn=check_info_complete, | |
| inputs=[username_input, age_input, gender_input, education_input, education_other_input, ai_experience_input], | |
| outputs=submit_info_btn | |
| ) | |
| education_input.change(fn=toggle_education_other, inputs=education_input, outputs=education_other_input) | |
| submit_info_btn.click( | |
| fn=show_sample_page_and_init, | |
| inputs=[username_input, age_input, gender_input, education_input, education_other_input, ai_experience_input, user_data_state], | |
| outputs=sample_init_outputs | |
| ) | |
| sample_dimension_selector.change( | |
| fn=update_sample_view, | |
| inputs=sample_dimension_selector, | |
| outputs=[sample_audio, interactive_view, reference_view, reference_btn] + sample_sliders + reference_sliders | |
| ) | |
| reference_btn.click( | |
| fn=toggle_reference_view, | |
| inputs=reference_btn, | |
| outputs=[interactive_view, reference_view, reference_btn] | |
| ) | |
| go_to_pretest_btn.click(lambda: (gr.update(visible=False), gr.update(visible=True)), outputs=[sample_page, pretest_page]) | |
| go_to_test_btn.click( | |
| fn=lambda user: init_test_question(user, 0) + ([], gr.update()), | |
| inputs=[user_data_state], | |
| outputs=full_outputs_with_results | |
| ) | |
| prev_dim_btn.click( | |
| fn=lambda q,d,s, *sliders: navigate_dimensions("prev", q,d,s, *sliders), | |
| inputs=nav_inputs, outputs=nav_outputs | |
| ) | |
| next_dim_btn.click( | |
| fn=lambda q,d,s, *sliders: navigate_dimensions("next", q,d,s, *sliders), | |
| inputs=nav_inputs, outputs=nav_outputs | |
| ) | |
| final_human_robot_radio.change( | |
| fn=lambda choice: gr.update(interactive=bool(choice)), | |
| inputs=final_human_robot_radio, | |
| outputs=submit_final_answer_btn | |
| ) | |
| submit_final_answer_btn.click( | |
| fn=submit_question_and_advance, | |
| inputs=[current_question_index, current_test_dimension_index, current_question_selections, final_human_robot_radio, test_results, user_data_state], | |
| outputs=full_outputs_with_results | |
| ) | |
| back_to_welcome_btn.click(fn=back_to_welcome, outputs=list(pages.values()) + [user_data_state, current_question_index, current_test_dimension_index, current_question_selections, test_results]) | |
| # ============================================================================== | |
| # 程序入口 (Entry Point) | |
| # ============================================================================== | |
| if __name__ == "__main__": | |
| if not os.path.exists("audio"): | |
| os.makedirs("audio") | |
| if "SPACE_ID" in os.environ: | |
| print("Running in a Hugging Face Space, checking for audio files...") | |
| # all_files = [q["audio"] for q in QUESTION_SET] + [d["audio"] for d in DIMENSIONS_DATA] | |
| all_files = [d["audio"] for d in DIMENSIONS_DATA] | |
| for audio_file in set(all_files): | |
| if not os.path.exists(audio_file): | |
| print(f"⚠️ Warning: Audio file not found: {audio_file}") | |
| demo.launch(debug=True) |