# AudioLabelingApp / task_manager.py
# Uploaded by: sunnyzjx
# Commit message: Upload 7 files
# Commit: 099b013 (verified)
import gradio as gr
from data_processing import load_tasks
from annotation import save_annotations
# Task list loaded once at import time and shared by every function in this module.
# NOTE(review): presumably "random_reverse" randomly swaps the A/B order of some
# pairs and the fixed seed keeps that assignment reproducible between runs —
# confirm against data_processing.load_tasks.
tasks = load_tasks(comparison_mode="random_reverse", seed=42)
def get_current_task_with_annotations(annotation_results, user_current_task=0):
    """Return the display data for a task with any saved choice already styled.

    Intended for the initial page load, where earlier annotations may exist.

    Args:
        annotation_results: Mapping from task index to the recorded choice
            ("win", "lose" or "tie"). May be None or empty, in which case
            no highlight is applied.
        user_current_task: Zero-based index into the module-level ``tasks``.

    Returns:
        A 7-tuple ``(instruction, text, audioA_update, audioB_update,
        is_first_task, is_last_task, task_number)`` where the two audio
        entries are ``gr.update`` objects and ``task_number`` is one-based.
    """
    task = tasks[user_current_task]

    # A missing or empty mapping means nothing has been chosen yet.
    choice = annotation_results.get(user_current_task) if annotation_results else None

    # Each stored audio entry is re-ordered to ([1], [0]); the original code
    # labels the resulting pair as (rate, data).
    clip_a = (task["audioA"][1], task["audioA"][0])
    clip_b = (task["audioB"][1], task["audioB"][0])

    # "win" highlights A, "lose" highlights B, "tie" marks both; anything
    # else leaves both clips unstyled.
    is_tie = choice == "tie"
    class_a = "selected" if choice == "win" else ("tie-selected" if is_tie else "")
    class_b = "selected" if choice == "lose" else ("tie-selected" if is_tie else "")

    audio_a_update = gr.update(value=clip_a, elem_classes=class_a)
    audio_b_update = gr.update(value=clip_b, elem_classes=class_b)

    at_first = user_current_task == 0
    at_last = user_current_task == len(tasks) - 1
    return (
        task["instruction"],
        task["text"],
        audio_a_update,
        audio_b_update,
        at_first,
        at_last,
        user_current_task + 1,
    )
def get_current_task(user_current_task=0, annotation_results=None, styled=False):
    """Return the display data for a task, optionally with selection styling.

    The shape of the result depends on the arguments:

    * ``styled`` truthy AND ``annotation_results`` not None: a 7-tuple
      ``(instruction, text, audioA_update, audioB_update, is_first_task,
      is_last_task, task_number)`` whose audio entries are ``gr.update``
      objects carrying the highlight class for the saved choice.
    * otherwise (including ``styled=True`` with ``annotation_results=None``):
      a 9-tuple ``(instruction, text, audioA[0], audioA[1], audioB[0],
      audioB[1], is_first_task, is_last_task, task_number)`` with the raw
      audio fields.

    Args:
        user_current_task: Zero-based index into the module-level ``tasks``.
        annotation_results: Mapping from task index to the recorded choice
            ("win", "lose" or "tie"), or None.
        styled: Whether to request the styled 7-tuple form.
    """
    task = tasks[user_current_task]

    # Plain form: no styling requested, or no annotations to style from.
    if not styled or annotation_results is None:
        return (
            task["instruction"],
            task["text"],
            task["audioA"][0],
            task["audioA"][1],
            task["audioB"][0],
            task["audioB"][1],
            user_current_task == 0,
            user_current_task == len(tasks) - 1,
            user_current_task + 1,
        )

    saved_choice = annotation_results.get(user_current_task)

    # Each stored audio entry is re-ordered to ([1], [0]); the original code
    # labels the resulting pair as (rate, data).
    clip_a = (task["audioA"][1], task["audioA"][0])
    clip_b = (task["audioB"][1], task["audioB"][0])

    # Pick the (A, B) CSS class pair for the saved choice.
    if saved_choice == "win":
        css_pair = ("selected", "")
    elif saved_choice == "lose":
        css_pair = ("", "selected")
    elif saved_choice == "tie":
        css_pair = ("tie-selected", "tie-selected")
    else:
        css_pair = ("", "")

    audio_a_update = gr.update(value=clip_a, elem_classes=css_pair[0])
    audio_b_update = gr.update(value=clip_b, elem_classes=css_pair[1])

    return (
        task["instruction"],
        task["text"],
        audio_a_update,
        audio_b_update,
        user_current_task == 0,
        user_current_task == len(tasks) - 1,
        user_current_task + 1,
    )
def apply_selection_style(audioA, audioB, choice):
    """Build the pair of audio updates that highlight the given choice.

    Args:
        audioA: Current value of the A audio component; passed through
            unchanged as the update's ``value``.
        audioB: Current value of the B audio component; passed through
            unchanged as the update's ``value``.
        choice: "win" (A preferred), "lose" (B preferred), "tie", or any
            other value to clear the highlight on both clips.

    Returns:
        A 2-tuple ``(audioA_update, audioB_update)`` of ``gr.update`` objects.
    """
    # (choice label, (class for A, class for B))
    style_table = (
        ("win", ("selected", "")),
        ("lose", ("", "selected")),
        ("tie", ("tie-selected", "tie-selected")),
    )

    # Default: no highlight on either clip.
    class_a, class_b = "", ""
    for label, class_pair in style_table:
        if choice == label:
            class_a, class_b = class_pair
            break

    return (
        gr.update(value=audioA, elem_classes=class_a),
        gr.update(value=audioB, elem_classes=class_b),
    )
def select_result(choice, audioA, audioB, annotation_results, username, user_current_task):
    """Record a choice for the current task, persist it, and highlight the UI.

    Args:
        choice: The selected outcome; stored as-is under the task index and
            forwarded to ``apply_selection_style``.
        audioA: Current value of the A audio component.
        audioB: Current value of the B audio component.
        annotation_results: Mapping from task index to recorded choice. It is
            updated in place; None is treated as an empty mapping.
        username: Passed to ``save_annotations`` to identify the annotator.
        user_current_task: Zero-based index of the task being annotated.

    Returns:
        A 3-tuple ``(audioA_update, audioB_update, annotation_results)``.
    """
    # Sibling functions accept a missing annotation mapping; without this
    # guard the item assignment below would raise TypeError on None.
    if annotation_results is None:
        annotation_results = {}

    annotation_results[user_current_task] = choice

    # Auto-save after every selection so no annotation is lost.
    save_result = save_annotations(username, annotation_results, tasks)
    print(f"自动保存结果: {save_result}")

    audioA_update, audioB_update = apply_selection_style(audioA, audioB, choice)
    return audioA_update, audioB_update, annotation_results
def change_task(direction, annotation_results, username, user_current_task):
    """Move to the previous or next task and build the UI updates for it.

    Args:
        direction: "prev" or "next". Any other value, or a move past either
            end of the task list, keeps the current task.
        annotation_results: Mapping from task index to recorded choice; None
            is treated as an empty mapping.
        username: Name shown in the task-info banner.
        user_current_task: Zero-based index of the task currently shown.

    Returns:
        A 9-tuple ``(instruction, text, audioA_update, audioB_update,
        prev_button_update, next_button_update, task_info_update,
        annotation_results, new_task_index)``.
    """
    import html

    # get_current_task only returns the styled 7-tuple when it receives a
    # non-None mapping; passing None would yield 9 values and break the
    # unpacking below.
    if annotation_results is None:
        annotation_results = {}

    new_user_current_task = user_current_task
    if direction == "prev" and user_current_task > 0:
        new_user_current_task = user_current_task - 1
    elif direction == "next" and user_current_task < len(tasks) - 1:
        new_user_current_task = user_current_task + 1

    (
        inst,
        text,
        audioA_update,
        audioB_update,
        prev_disabled,
        next_disabled,
        task_num,
    ) = get_current_task(new_user_current_task, annotation_results, styled=True)

    total_tasks = get_total_tasks()

    # The username is embedded in HTML markup, so escape it to keep
    # characters such as "<" or "&" from being interpreted as markup.
    safe_username = html.escape(str(username))
    combined_task_info = f'<div class="user-task-info"><span>👤 当前用户: {safe_username}</span><span><strong>任务编号: {task_num} / {total_tasks}</strong></span></div>'

    return (
        inst, text,
        audioA_update,
        audioB_update,
        # Navigation buttons are disabled at the corresponding end of the list.
        gr.update(interactive=not prev_disabled),
        gr.update(interactive=not next_disabled),
        gr.update(value=combined_task_info),
        annotation_results,
        new_user_current_task,
    )
def get_total_tasks():
    """Return how many tasks are in the module-level ``tasks`` collection."""
    task_count = len(tasks)
    return task_count