# Jazzcharles - initial deploy (commit dbfa08c)
import gradio as gr
import json
import os
from datetime import datetime
from huggingface_hub import HfApi
import time
# --- Remote sync settings ---------------------------------------------------
HF_DATASET_REPO = "Jazzcharles/audioverse_for_annotation"
SYNC_INTERVAL = 300  # seconds between uploads; 60~300 is suggested on a Space

# --- Input / output files (relative to the working directory) ---------------
DATA_PATH = "data/samples_for_annotation_with_urls.json"
ASSIGN_PATH = "data/assignments.json"
RESULT_PATH = "results/results.jsonl"
os.makedirs("results", exist_ok=True)

# --- Gradio temp dir: a "gradio_tmp" folder next to this file ---------------
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TMP_DIR = os.path.join(BASE_DIR, "gradio_tmp")
os.environ["GRADIO_TEMP_DIR"] = TMP_DIR
os.makedirs(TMP_DIR, exist_ok=True)
def pull_results_from_hf():
    """
    Download results.jsonl from the HF dataset repo into the local results dir.

    The file is requested as ``results.jsonl`` and placed in the directory of
    RESULT_PATH. This is best-effort: any failure (no remote file yet,
    network or auth error) is printed and swallowed so the app can still
    start with whatever is already on disk.
    """
    target_dir = os.path.dirname(RESULT_PATH)
    try:
        os.makedirs(target_dir, exist_ok=True)
        # A client is created here because this function is called at import
        # time, before the module-level `api` object is assigned further down
        # the file. `hf_hub_download` is the HfApi download method; HfApi has
        # no `download_file`, so the previous call could never succeed.
        HfApi().hf_hub_download(
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            filename="results.jsonl",
            local_dir=target_dir,
        )
        print("[INIT] Pulled results.jsonl from HF dataset.")
    except Exception as e:
        print("[INIT] No remote results.jsonl or pull failed:", e)
# ---- pull latest results from HF dataset ----
pull_results_from_hf()

# Sample metadata, indexed by each entry's "id" for direct lookup.
# JSON is read as UTF-8 explicitly so decoding does not depend on the locale.
with open(DATA_PATH, "r", encoding="utf-8") as f:
    SAMPLES = {x["id"]: x for x in json.load(f)}

# Assignment table; queried elsewhere as ASSIGN.get(user, []).
with open(ASSIGN_PATH, "r", encoding="utf-8") as f:
    ASSIGN = json.load(f)
# ------------------------
# Utilities
# ------------------------
def get_user_samples(user):
    """Return the ASSIGN entry stored under `user`, or an empty list if absent."""
    fallback = []
    return ASSIGN.get(user, fallback)
def save_result(record):
    """Append `record` to RESULT_PATH as one JSON line.

    The line is written as UTF-8 because non-ASCII characters are kept
    verbatim (``ensure_ascii=False``) and the reader opens the file as UTF-8.
    """
    # Serialise first so a non-serialisable record fails before the file is
    # opened or touched.
    line = json.dumps(record, ensure_ascii=False) + "\n"
    with open(RESULT_PATH, "a", encoding="utf-8") as f:
        f.write(line)
def load_existing_results():
    """Read every parseable JSON line from RESULT_PATH.

    Returns:
        A list of decoded values in file order; an empty list when the file
        does not exist. Blank lines and lines that are not valid JSON are
        skipped.
    """
    if not os.path.exists(RESULT_PATH):
        return []
    records = []
    with open(RESULT_PATH, "r", encoding="utf-8") as f:
        for raw in f:
            raw = raw.strip()
            if not raw:
                continue
            try:
                records.append(json.loads(raw))
            except json.JSONDecodeError:
                # Skip only malformed rows; anything else (e.g. an interrupt)
                # should propagate instead of being silently swallowed.
                continue
    return records
def get_user_done_ids(user):
    """Map each sample id already annotated by `user` to its latest record.

    Returns:
        ``{sample_id: record}`` built from the stored results. When the same
        sample appears more than once for this annotator, the record that
        comes later in the file wins.
    """
    done = {}
    for rec in load_existing_results():
        # The results file can contain rows that parse as JSON but are not
        # complete records; skip them rather than raise on a missing key.
        if not isinstance(rec, dict):
            continue
        if "annotator" not in rec or "sample_id" not in rec:
            continue
        if rec["annotator"] == user:
            done[rec["sample_id"]] = rec  # later rows overwrite earlier ones
    return done
# ------------------------
# State
# ------------------------
def init_state(user):
    """Build the session state dict for `user`.

    The state holds the annotator name, every assigned sample id, the ids
    that still have no stored record for this annotator (in assignment
    order), the map of already stored records, and a cursor (`idx`) into the
    pending list that starts at 0.
    """
    assigned = get_user_samples(user)
    finished = get_user_done_ids(user)
    finished_ids = set(finished.keys())

    # Keep only the samples that have not been annotated yet.
    remaining = []
    for sid in assigned:
        if sid in finished_ids:
            continue
        remaining.append(sid)

    return dict(
        user=user,
        sample_ids=assigned,
        pending_ids=remaining,
        done_map=finished,
        idx=0,
    )
api = HfApi()
_last_sync_time = 0  # time.time() value of the last successful upload (0 = none yet)


def sync_results_to_hf(force=False):
    """Upload RESULT_PATH to the HF dataset repo as ``results.jsonl``.

    Does nothing when RESULT_PATH does not exist. Unless `force` is true, it
    also does nothing when fewer than SYNC_INTERVAL seconds have elapsed since
    the last successful upload. Upload errors are printed and swallowed; the
    throttle timestamp is only advanced after a successful upload.
    """
    global _last_sync_time

    if not os.path.exists(RESULT_PATH):
        return

    now = time.time()
    elapsed = now - _last_sync_time
    if elapsed < SYNC_INTERVAL and not force:
        return

    try:
        api.upload_file(
            path_or_fileobj=RESULT_PATH,
            path_in_repo="results.jsonl",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            commit_message=f"Sync results at {datetime.utcnow().isoformat()}",
        )
        _last_sync_time = now
        print("[SYNC] results.jsonl synced to HF dataset.")
    except Exception as e:
        print("[SYNC ERROR]", e)
# ------------------------
# Load sample
# ------------------------
def load_sample(state):
    """Return what the UI should display for the current pending sample.

    Returns:
        A 5-tuple ``(audio_url, long_caption, short_caption, tag_caption,
        status_text)``. When the cursor is past the end of the pending list
        the first four items are None and the status text reports completion.
    """
    pos = state["idx"]
    pending = state["pending_ids"]
    if pos >= len(pending):
        return None, None, None, None, "All pending tasks completed."

    sid = pending[pos]
    sample = SAMPLES[sid]
    audio_url = sample["audio_url"]
    captions = sample["captions"]
    long_c, short_c, tag_c = captions["long"], captions["short"], captions["tag"]
    progress = f"Pending {pos + 1}/{len(pending)} (ID={sid})"
    return audio_url, long_c, short_c, tag_c, progress
# ------------------------
# Submit
# ------------------------
def submit(state, long_score, short_score, tag_score):
    """Record the scores for the current pending sample and advance the cursor.

    Args:
        state: Session state dict (reads "user", "pending_ids", "idx").
        long_score, short_score, tag_score: Scores stored under
            ``record["scores"]`` for the long, short and tag captions.

    Returns:
        The same `state` object, with "idx" incremented when a record was
        written. If `state` is empty/None or the cursor is already past the
        last pending sample, nothing is written and `state` is returned
        unchanged (previously this raised TypeError / IndexError).
    """
    if not state or state["idx"] >= len(state["pending_ids"]):
        return state

    sid = state["pending_ids"][state["idx"]]
    record = {
        "timestamp": datetime.utcnow().isoformat(),
        "annotator": state["user"],
        "sample_id": sid,
        "scores": {
            "long": long_score,
            "short": short_score,
            "tag": tag_score,
        },
    }
    save_result(record)
    # Throttled upload attempt; it returns immediately when called too soon
    # after the previous successful sync.
    sync_results_to_hf()
    state["idx"] += 1
    return state
# ------------------------
# UI
# ------------------------
with gr.Blocks(title="Audio-Caption Matching Annotation") as demo:
    # NOTE(review): the Row/Column nesting below is reconstructed, because the
    # file carried no indentation - confirm it against the intended layout.
    gr.Markdown("# Audio–Caption Matching Annotation")

    with gr.Row():
        user_input = gr.Textbox(label="Annotator ID", placeholder="e.g. annotator_1")
        start_btn = gr.Button("Start")
        sync_btn = gr.Button("Finish & Sync results to HF")
    sync_status = gr.Markdown()

    state = gr.State()  # per-session dict produced by init_state()
    status = gr.Markdown()
    audio = gr.Audio(label="Audio", type="filepath")

    with gr.Column():
        # ---------- LONG ----------
        gr.Markdown("## Long Caption")
        gr.Markdown(
            """
**Criteria**
1. **Event accuracy**: Are the sound events in the caption actually present in the audio?
2. **Completeness**: Does the caption miss any major audible events?
3. **Temporal consistency**: Does the sequence of events match the audio timeline?
4. **Acoustic detail**: Does the caption correctly reflect loudness, duration, tone, speed, environment?
"""
        )
        long_caption = gr.Textbox(label="Caption (Long)", interactive=False)
        long_score = gr.Radio(
            choices=[str(i) for i in range(1, 11)],
            label="Overall Score (1–10)",
            value=None
        )

        # ---------- SHORT ----------
        gr.Markdown("## Short Caption")
        gr.Markdown(
            """
**Criteria**
1. **Event accuracy**: Are the sound events in the caption actually present in the audio?
2. **Completeness**: Does the caption miss any major audible events?
"""
        )
        short_caption = gr.Textbox(label="Caption (Short)", interactive=False)
        short_score = gr.Radio(
            choices=[str(i) for i in range(1, 11)],
            label="Overall Score (1–10)",
            value=None
        )

        # ---------- TAG ----------
        gr.Markdown("## Tag")
        gr.Markdown(
            """
**Criteria**
1. **Event accuracy**: Are the sound events in the tags actually present in the audio?
2. **Completeness**: Does the tags miss any major audible events?
"""
        )
        tag_caption = gr.Textbox(label="Caption (Tag)", interactive=False)
        tag_score = gr.Radio(
            choices=[str(i) for i in range(1, 11)],
            label="Overall Score (1–10)",
            value=None
        )

    submit_btn = gr.Button("Submit & Next")

    # ------------------------
    # Callbacks
    # ------------------------
    def on_start(user):
        """Create the session state for `user` and show the first pending sample.

        Returns exactly one value per component wired to start_btn:
        (state, audio, long_caption, short_caption, tag_caption, status).
        """
        st = init_state(user)
        audio_url, long_c, short_c, tag_c, msg = load_sample(st)
        # The previous version also returned a list of completed ids, but no
        # output component receives it, so the return arity did not match the
        # six outputs below.
        return st, audio_url, long_c, short_c, tag_c, msg

    start_btn.click(
        on_start,
        inputs=[user_input],
        outputs=[state, audio, long_caption, short_caption, tag_caption, status]
    )

    def _status_only(st, message):
        """Build submit_btn outputs that change nothing except the status text.

        A fresh gr.update() is used for each untouched component so the sample
        on screen and the scores already picked are left as they are.
        """
        return (
            st,
            gr.update(),  # audio
            gr.update(),  # long_caption
            gr.update(),  # short_caption
            gr.update(),  # tag_caption
            message,      # status
            gr.update(),  # long_score
            gr.update(),  # short_score
            gr.update(),  # tag_score
        )

    def on_submit(st, l, s, t):
        """Store the three scores for the current sample and load the next one.

        Always returns nine values, one per component wired to submit_btn.
        """
        if st is None:
            # Submit was clicked before Start, so there is no session yet.
            return _status_only(st, "Please enter an Annotator ID and click Start first.")

        finished = st["idx"] >= len(st["pending_ids"])
        if not finished:
            if l is None or s is None or t is None:
                return _status_only(st, "Please score all captions before submitting.")
            st = submit(st, l, s, t)

        # When everything is finished, load_sample returns the completion
        # message and empty fields; nothing is recorded in that case.
        audio_url, long_c, short_c, tag_c, msg = load_sample(st)
        # The trailing three None values clear the score radios.
        return (
            st,
            audio_url,
            long_c,
            short_c,
            tag_c,
            msg,
            None,  # long_score reset
            None,  # short_score reset
            None   # tag_score reset
        )

    submit_btn.click(
        on_submit,
        inputs=[state, long_score, short_score, tag_score],
        outputs=[
            state,
            audio,
            long_caption,
            short_caption,
            tag_caption,
            status,
            long_score,
            short_score,
            tag_score
        ]
    )

    def on_sync():
        """Force an upload of the results file, bypassing the sync throttle."""
        sync_results_to_hf(force=True)
        # sync_results_to_hf reports success or failure only on stdout, so the
        # UI can say no more than that an attempt was made.
        return f"Sync attempted at {datetime.utcnow().isoformat()} UTC (see the server log for the outcome)."

    # The button and status line were created but never connected to a handler.
    sync_btn.click(on_sync, inputs=None, outputs=[sync_status])

demo.launch()