"""Gradio app for scoring how well long/short/tag captions match an audio clip.

Each annotator enters an ID, is shown the samples assigned to that ID that
they have not scored yet, and submits a 1-10 score per caption type. Results
are appended to a local JSONL file and periodically uploaded to a Hugging Face
dataset repository.
"""

import json
import os
import time
from datetime import datetime

import gradio as gr
from huggingface_hub import HfApi

HF_DATASET_REPO = "Jazzcharles/audioverse_for_annotation"
SYNC_INTERVAL = 300  # seconds; 60-300 is recommended in a Space environment

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TMP_DIR = os.path.join(BASE_DIR, "gradio_tmp")
os.makedirs(TMP_DIR, exist_ok=True)
os.environ["GRADIO_TEMP_DIR"] = TMP_DIR

DATA_PATH = "data/samples_for_annotation_with_urls.json"
ASSIGN_PATH = "data/assignments.json"
RESULT_PATH = "results/results.jsonl"

os.makedirs("results", exist_ok=True)

# The client must exist before pull_results_from_hf() runs at import time.
api = HfApi()
_last_sync_time = 0


def pull_results_from_hf():
    """Download results.jsonl from the HF dataset repo to RESULT_PATH.

    Best effort: if the download fails (for example the remote file does not
    exist yet), the failure is printed and the local file is left untouched.
    """
    try:
        os.makedirs(os.path.dirname(RESULT_PATH), exist_ok=True)
        # With local_dir set, the file is placed at <local_dir>/results.jsonl,
        # which is RESULT_PATH.
        api.hf_hub_download(
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            filename="results.jsonl",
            local_dir=os.path.dirname(RESULT_PATH),
        )
        print("[INIT] Pulled results.jsonl from HF dataset.")
    except Exception as e:
        print("[INIT] No remote results.jsonl or pull failed:", e)


# ---- pull latest results from HF dataset ----
pull_results_from_hf()

with open(DATA_PATH, "r", encoding="utf-8") as f:
    SAMPLES = {x["id"]: x for x in json.load(f)}

with open(ASSIGN_PATH, "r", encoding="utf-8") as f:
    ASSIGN = json.load(f)


# ------------------------
# Utilities
# ------------------------
def get_user_samples(user):
    """Return the list of sample ids assigned to *user* (empty if unknown)."""
    return ASSIGN.get(user, [])


def save_result(record):
    """Append one annotation record to RESULT_PATH as a JSON line."""
    # ensure_ascii=False can emit non-ASCII text, so pin the file encoding.
    with open(RESULT_PATH, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")


def load_existing_results():
    """Return all well-formed records from RESULT_PATH, in file order.

    Returns an empty list when the file does not exist. Blank lines, lines
    that are not valid JSON and lines that are not JSON objects are skipped.
    """
    if not os.path.exists(RESULT_PATH):
        return []

    records = []
    with open(RESULT_PATH, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                # Tolerate a partially written or corrupted line.
                continue
            if isinstance(record, dict):
                records.append(record)
    return records


def get_user_done_ids(user):
    """Return ``{sample_id: last_record}`` for everything *user* has scored."""
    done = {}
    for r in load_existing_results():
        if r.get("annotator") == user and "sample_id" in r:
            done[r["sample_id"]] = r  # later records overwrite earlier ones
    return done


# ------------------------
# State
# ------------------------
def init_state(user):
    """Build the per-session state dict for *user*.

    ``pending_ids`` holds the assigned samples with no saved record yet, and
    ``idx`` is the position of the current sample within ``pending_ids``.
    """
    sample_ids = get_user_samples(user)
    done_map = get_user_done_ids(user)
    done_ids = set(done_map.keys())

    # Keep only the samples that have not been completed yet.
    pending_ids = [sid for sid in sample_ids if sid not in done_ids]

    return {
        "user": user,
        "sample_ids": sample_ids,
        "pending_ids": pending_ids,
        "done_map": done_map,
        "idx": 0,
    }


def sync_results_to_hf(force=False):
    """Upload RESULT_PATH to the HF dataset repo as results.jsonl.

    Uploads are throttled to one per SYNC_INTERVAL seconds unless *force* is
    true. Upload errors are printed, not raised.

    Returns:
        True if the file was uploaded; False if there was nothing to upload,
        the call was throttled, or the upload failed.
    """
    global _last_sync_time

    if not os.path.exists(RESULT_PATH):
        return False

    now = time.time()
    if not force and now - _last_sync_time < SYNC_INTERVAL:
        return False

    try:
        api.upload_file(
            path_or_fileobj=RESULT_PATH,
            path_in_repo="results.jsonl",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            commit_message=f"Sync results at {datetime.utcnow().isoformat()}",
        )
    except Exception as e:
        print("[SYNC ERROR]", e)
        return False

    _last_sync_time = now
    print("[SYNC] results.jsonl synced to HF dataset.")
    return True


# ------------------------
# Load sample
# ------------------------
def load_sample(state):
    """Return the display values for the current pending sample.

    Returns:
        ``(audio_url, long_caption, short_caption, tag_caption, status_msg)``.
        When no pending sample is left, the first four values are None and
        the message reports completion.
    """
    if state["idx"] >= len(state["pending_ids"]):
        return None, None, None, None, "All pending tasks completed."

    sid = state["pending_ids"][state["idx"]]
    sample = SAMPLES[sid]

    return (
        sample["audio_url"],
        sample["captions"]["long"],
        sample["captions"]["short"],
        sample["captions"]["tag"],
        f"Pending {state['idx']+1}/{len(state['pending_ids'])} (ID={sid})",
    )


# ------------------------
# Submit
# ------------------------
def submit(state, long_score, short_score, tag_score):
    """Save the scores for the current sample and advance to the next one.

    If there is no pending sample left, nothing is written and *state* is
    returned unchanged.
    """
    if state["idx"] >= len(state["pending_ids"]):
        return state

    sid = state["pending_ids"][state["idx"]]

    record = {
        "timestamp": datetime.utcnow().isoformat(),
        "annotator": state["user"],
        "sample_id": sid,
        "scores": {
            "long": long_score,
            "short": short_score,
            "tag": tag_score,
        },
    }

    save_result(record)

    # Throttled attempt to sync to the HF dataset.
    sync_results_to_hf()

    state["idx"] += 1
    return state


# ------------------------
# UI
# ------------------------
with gr.Blocks(title="Audio-Caption Matching Annotation") as demo:

    gr.Markdown("# Audio–Caption Matching Annotation")

    with gr.Row():
        user_input = gr.Textbox(label="Annotator ID", placeholder="e.g. annotator_1")
        start_btn = gr.Button("Start")

    sync_btn = gr.Button("Finish & Sync results to HF")
    sync_status = gr.Markdown()

    state = gr.State()

    status = gr.Markdown()
    audio = gr.Audio(label="Audio", type="filepath")

    with gr.Column():
        # ---------- LONG ----------
        gr.Markdown("## Long Caption")
        gr.Markdown(
            """
**Criteria**
1. **Event accuracy**: Are the sound events in the caption actually present in the audio?
2. **Completeness**: Does the caption miss any major audible events?
3. **Temporal consistency**: Does the sequence of events match the audio timeline?
4. **Acoustic detail**: Does the caption correctly reflect loudness, duration, tone, speed, environment?
"""
        )
        long_caption = gr.Textbox(label="Caption (Long)", interactive=False)
        long_score = gr.Radio(
            choices=[str(i) for i in range(1, 11)],
            label="Overall Score (1–10)",
            value=None,
        )

        # ---------- SHORT ----------
        gr.Markdown("## Short Caption")
        gr.Markdown(
            """
**Criteria**
1. **Event accuracy**: Are the sound events in the caption actually present in the audio?
2. **Completeness**: Does the caption miss any major audible events?
"""
        )
        short_caption = gr.Textbox(label="Caption (Short)", interactive=False)
        short_score = gr.Radio(
            choices=[str(i) for i in range(1, 11)],
            label="Overall Score (1–10)",
            value=None,
        )

        # ---------- TAG ----------
        gr.Markdown("## Tag")
        gr.Markdown(
            """
**Criteria**
1. **Event accuracy**: Are the sound events in the tags actually present in the audio?
2. **Completeness**: Does the tags miss any major audible events?
"""
        )
        tag_caption = gr.Textbox(label="Caption (Tag)", interactive=False)
        tag_score = gr.Radio(
            choices=[str(i) for i in range(1, 11)],
            label="Overall Score (1–10)",
            value=None,
        )

    submit_btn = gr.Button("Submit & Next")

    # ------------------------
    # Callbacks
    # ------------------------
    def on_start(user):
        """Create the session state for *user* and show their first sample."""
        user = (user or "").strip()
        if not user:
            return None, None, None, None, None, "Please enter your annotator ID."

        st = init_state(user)

        # First pending sample. Exactly one value is returned per component
        # in the `outputs` list below.
        audio_url, long_c, short_c, tag_c, msg = load_sample(st)
        return st, audio_url, long_c, short_c, tag_c, msg

    start_btn.click(
        on_start,
        inputs=[user_input],
        outputs=[state, audio, long_caption, short_caption, tag_caption, status],
    )

    def on_submit(st, l, s, t):
        """Validate the scores, save them and load the next sample.

        Always returns nine values, one per component in `outputs` below.
        """
        if st is None:
            # Submit was pressed before Start: nothing to score yet.
            return (
                st,
                gr.update(), gr.update(), gr.update(), gr.update(),
                "Please enter your annotator ID and click Start first.",
                gr.update(), gr.update(), gr.update(),
            )

        if st["idx"] >= len(st["pending_ids"]):
            # Nothing left to score; show the completion message only.
            audio_url, long_c, short_c, tag_c, msg = load_sample(st)
            return st, audio_url, long_c, short_c, tag_c, msg, None, None, None

        if l is None or s is None or t is None:
            # Keep the current sample and any scores already chosen; only
            # the status message changes.
            return (
                st,
                gr.update(), gr.update(), gr.update(), gr.update(),
                "Please score all captions before submitting.",
                gr.update(), gr.update(), gr.update(),
            )

        st = submit(st, l, s, t)
        audio_url, long_c, short_c, tag_c, msg = load_sample(st)

        # The last three None values clear the score radios.
        return (
            st,
            audio_url,
            long_c,
            short_c,
            tag_c,
            msg,
            None,  # long_score reset
            None,  # short_score reset
            None,  # tag_score reset
        )

    submit_btn.click(
        on_submit,
        inputs=[state, long_score, short_score, tag_score],
        outputs=[
            state,
            audio,
            long_caption,
            short_caption,
            tag_caption,
            status,
            long_score,
            short_score,
            tag_score,
        ],
    )

    def on_sync():
        """Force an upload of the results file and report the outcome."""
        if not os.path.exists(RESULT_PATH):
            return "No results to sync yet."
        if sync_results_to_hf(force=True):
            return "Results synced to the HF dataset."
        return "Sync failed. Please try again later."

    sync_btn.click(on_sync, inputs=None, outputs=[sync_status])

demo.launch()