import os, io, csv, json, random from datetime import datetime import gradio as gr from huggingface_hub import HfApi, hf_hub_download import subprocess, pathlib, hashlib import secrets, time, string from functools import lru_cache TOP_DIRS = [ "Hunyuan_videos", "Opensora_768", "RunwayGen4", "Wan2.2", "wan21_videos", ] def _choose_prefix_from_filename(base: str) -> str: best = None for d in TOP_DIRS: if ( base.startswith(d + "_") or base.startswith(d + "/") or base.startswith(d + "-") or base.startswith(d) ): if best is None or len(d) > len(best): best = d if best is not None: return best if "_" in base: return base.split("_")[0] return os.path.splitext(base)[0] def _extract_action_from_filename(base: str, prefix: str): after_prefix = base[len(prefix):] after_prefix = after_prefix.lstrip("_-") if not after_prefix: return None action = after_prefix.split("_")[0] return action or None def _patterns_for(prefix: str, action: str | None, base: str, root_prefix: str): """ root_prefix == "" -> 경로가 'Wan2.2/ThrowDiscus/file.mp4' root_prefix == "video_examples" -> 경로가 'video_examples/Wan2.2/ThrowDiscus/file.mp4' 즉 root_prefix를 앞에 optional하게 붙여서 두 세트를 만든다. 
""" # alt_prefix_videos: 'Wan2.2_videos' 같은 경우까지 백업 시도 alt_prefix_videos = prefix if prefix.endswith("_videos") else (prefix + "_videos") def rp(*parts): # parts를 '/'로 join, 그리고 root_prefix가 비어있지 않으면 그 앞에 붙이기 path = "/".join([p for p in parts if p]) if root_prefix: return root_prefix + "/" + path else: return path cand = [ rp(prefix, prefix, base), # Wan2.2/Wan2.2/file rp(prefix, alt_prefix_videos, base), # Wan2.2/Wan2.2_videos/file rp(prefix, base), # Wan2.2/file rp(base), # file at root ] if action: cand += [ rp(prefix, action, base), # Wan2.2/ThrowDiscus/file rp(prefix, prefix, action, base), # Wan2.2/Wan2.2/ThrowDiscus/file rp(prefix, action, f"{prefix}_{action}", base), # Wan2.2/ThrowDiscus/Wan2.2_ThrowDiscus/file rp(prefix, f"{prefix}_{action}", base), # Wan2.2/Wan2.2_ThrowDiscus/file ] # uniq 제거 uniq = [] for c in cand: if c not in uniq: uniq.append(c) return uniq def _candidate_relpaths(url_or_name: str): base = os.path.basename(url_or_name).strip() prefix = _choose_prefix_from_filename(base) action = _extract_action_from_filename(base, prefix) # 이제 두 root 세트를 합친다: # 1) 루트 바로 밑 (root_prefix = "") # 2) 루트 안의 video_examples/ 밑 (root_prefix = "video_examples") cands_root = _patterns_for(prefix, action, base, root_prefix="") cands_videos = _patterns_for(prefix, action, base, root_prefix="video_examples") # 합치고 중복 제거 full = [] for c in cands_root + cands_videos: if c not in full: full.append(c) return full @lru_cache(maxsize=2048) def get_local_video_path(hf_url_or_relpath: str) -> str: # if already local, just ensure it's muted if os.path.exists(hf_url_or_relpath): return ensure_muted_copy(hf_url_or_relpath) if not HF_TOKEN: raise RuntimeError("HF_TOKEN is not set or empty in this Space. 
@lru_cache(maxsize=2048)
def get_local_video_path(hf_url_or_relpath: str) -> str:
    """Resolve a video reference to a local mp4 path with the audio removed.

    An existing local path is used directly. Otherwise each path from
    ``_candidate_relpaths`` is tried against the dataset repo ``REPO_ID``
    until one downloads. Successful results are memoised per argument.

    Raises:
        RuntimeError: if ``HF_TOKEN`` is unset/empty, or if none of the
            candidate paths could be downloaded (the last download error is
            attached as ``__cause__``).
    """
    # Already local: just make sure the muted copy exists.
    if os.path.exists(hf_url_or_relpath):
        return ensure_muted_copy(hf_url_or_relpath)

    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN is not set or empty in this Space. Please add it as a secret.")

    candidates = _candidate_relpaths(hf_url_or_relpath)
    last_err = None
    for rel in candidates:
        try:
            local_path = hf_hub_download(
                repo_id=REPO_ID,
                filename=rel,
                repo_type=REPO_TYPE,
                token=HF_TOKEN,
                local_dir="/tmp",
                local_dir_use_symlinks=False,
            )
        except Exception as e:
            # Best effort by design: a miss on one layout just means
            # "try the next candidate"; the last error is reported below.
            last_err = e
            continue
        return ensure_muted_copy(local_path)

    raise RuntimeError(
        "Could not locate video in repo "
        + REPO_ID
        + ". Tried: "
        + ", ".join(candidates)
        + f". Last error was {type(last_err).__name__}: {last_err}"
    ) from last_err


# NOTE: the `selected_action` / `selected_phys` gr.State holders are created
# inside the `gr.Blocks` context where the UI is built; the extra module-level
# instances that used to live here were rebound there before any use.


def _recompute_save(pid_text: str, sel_a, sel_p):
    """Return the Save button update for the current PID / slider selections.

    The button is interactive (and "primary") only when *pid_text* is
    non-blank and both *sel_a* and *sel_p* are not ``None``.
    """
    ok = bool(pid_text and pid_text.strip()) and sel_a is not None and sel_p is not None
    return gr.update(interactive=ok, variant="primary" if ok else "secondary")


REWARD_FILE = "reward_codes.csv"  # reward-code log stored in the results dataset repo


def _read_codes_bytes():
    """Return the current content of ``REWARD_FILE``, or ``None`` if absent.

    ``None`` is returned only when the repo reports that the file does not
    exist. Any other failure (network error, auth error, ...) propagates:
    treating it as "no file yet" would make the following upload replace the
    whole log with a single row.
    """
    # Local import from the package this file already depends on.
    from huggingface_hub.utils import EntryNotFoundError, LocalEntryNotFoundError

    try:
        p = hf_hub_download(
            repo_id=RESULTS_REPO_ID,
            filename=REWARD_FILE,
            repo_type="dataset",
            token=HF_TOKEN,
            local_dir="/tmp",
            local_dir_use_symlinks=False,
        )
    except LocalEntryNotFoundError:
        # Subclass of EntryNotFoundError raised for connectivity/cache misses:
        # the remote file may well exist, so this is not "absent".
        raise
    except EntryNotFoundError:
        return None
    with open(p, "rb") as f:
        return f.read()


def _append_code(old_bytes, row):
    """Return the reward-code CSV bytes with *row* appended.

    Args:
        old_bytes: existing file content, or ``None``/empty for a new file
            (a header row is written first in that case).
        row: sequence of cell values for the new line.
    """
    text = old_bytes.decode("utf-8", errors="ignore") if old_bytes else ""
    s = io.StringIO()
    w = csv.writer(s)
    if not text:
        w.writerow(["ts_iso", "participant_id", "reward_code", "total_done"])
    else:
        s.write(text)
        if not text.endswith(("\n", "\r")):
            # Keep the new row from being glued onto an unterminated last line.
            s.write("\r\n")
    w.writerow(row)
    return s.getvalue().encode("utf-8")


def _persist_reward_code(pid: str, code: str, total_done: int):
    """Append one reward-code row to ``REWARD_FILE`` in the results repo.

    The file is downloaded, extended and re-uploaded as a whole.
    NOTE(review): two sessions finishing at the same time can presumably
    overwrite each other's row with this read-modify-write cycle — confirm
    whether that matters for payout reconciliation.
    """
    old = _read_codes_bytes()
    row = [datetime.utcnow().isoformat(), pid.strip(), code, int(total_done)]
    newb = _append_code(old, row)
    api.upload_file(
        path_or_fileobj=io.BytesIO(newb),
        path_in_repo=REWARD_FILE,
        repo_id=RESULTS_REPO_ID,
        repo_type="dataset",
        token=HF_TOKEN,
        commit_message="append reward code",
    )
def _gen_reward_code(pid: str, length: int = 10, forbid_ambiguous: bool = True) -> str:
    """Return a random reward code made of upper-case letters and digits.

    Characters are drawn from the OS CSPRNG, so the code does not depend on
    *pid* (the parameter is kept for interface compatibility).

    Args:
        pid: participant id; not used to derive the code.
        length: number of characters to generate.
        forbid_ambiguous: when true, use an alphabet without the look-alike
            characters ``I``, ``O``, ``0`` and ``1``.
    """
    if forbid_ambiguous:
        alphabet = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"
    else:
        alphabet = string.ascii_uppercase + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))


MUTED_CACHE_DIR = "/tmp/hf_video_cache_muted"
pathlib.Path(MUTED_CACHE_DIR).mkdir(parents=True, exist_ok=True)


def _sha1_8(s: str) -> str:
    """Return the first 8 hex digits of the SHA-1 of *s* (UTF-8 encoded)."""
    return hashlib.sha1(s.encode("utf-8")).hexdigest()[:8]


def ensure_muted_copy(src_path: str) -> str:
    """Return the path of an audio-free copy of the mp4 at *src_path*.

    The copy is produced with ``ffmpeg -c:v copy -an`` and cached in
    ``MUTED_CACHE_DIR`` under a name derived from *src_path*; an existing
    cached file is reused. If *src_path* is empty/missing, or the copy cannot
    be produced, *src_path* itself is returned unchanged.
    """
    if not src_path or not os.path.exists(src_path):
        return src_path

    digest = _sha1_8(src_path)
    out = os.path.join(MUTED_CACHE_DIR, digest + ".mp4")
    if os.path.exists(out):
        return out

    # Render to a unique temporary name and rename into place afterwards, so
    # a failed or concurrent ffmpeg run can never leave a truncated file at
    # the cached path (which would then be served on every later call).
    tmp = os.path.join(MUTED_CACHE_DIR, f"{digest}.{secrets.token_hex(4)}.part.mp4")
    try:
        pathlib.Path(MUTED_CACHE_DIR).mkdir(parents=True, exist_ok=True)
        subprocess.run(
            ["ffmpeg", "-y", "-i", src_path, "-c:v", "copy", "-an", tmp],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True,
        )
        os.replace(tmp, out)
        return out
    except Exception:
        # Documented fallback: muting is best effort, the original file is
        # still playable (ffmpeg missing, unreadable input, full disk, ...).
        return src_path
    finally:
        if os.path.exists(tmp):
            try:
                os.remove(tmp)
            except OSError:
                pass


FEEDBACK_FILE = "final_feedback.csv"


def _read_feedback_bytes():
    """Return the current content of ``FEEDBACK_FILE``, or ``None`` if absent.

    ``None`` is returned only when the repo reports that the file does not
    exist. Any other failure propagates, because treating it as "no file yet"
    would make the following upload replace all stored feedback.
    """
    # Local import from the package this file already depends on.
    from huggingface_hub.utils import EntryNotFoundError, LocalEntryNotFoundError

    try:
        p = hf_hub_download(
            repo_id=RESULTS_REPO_ID,
            filename=FEEDBACK_FILE,
            repo_type="dataset",
            token=HF_TOKEN,
            local_dir="/tmp",
            local_dir_use_symlinks=False,
        )
    except LocalEntryNotFoundError:
        # Connectivity/cache miss, not proof that the remote file is absent.
        raise
    except EntryNotFoundError:
        return None
    with open(p, "rb") as f:
        return f.read()


def _append_feedback(old_bytes, row):
    """Return the final-feedback CSV bytes with *row* appended.

    A header row is written first when *old_bytes* is ``None`` or empty.
    """
    text = old_bytes.decode("utf-8", errors="ignore") if old_bytes else ""
    s = io.StringIO()
    w = csv.writer(s)
    if not text:
        w.writerow(["ts_iso", "participant_id", "final_comment"])
    else:
        s.write(text)
        if not text.endswith(("\n", "\r")):
            # Keep the new row from being glued onto an unterminated last line.
            s.write("\r\n")
    w.writerow(row)
    return s.getvalue().encode("utf-8")
def push_final_feedback(participant_id: str, comment: str):
    """Append the optional closing comment to ``FEEDBACK_FILE``.

    Returns a ``gr.update`` carrying a visible status message in every case:
    missing participant id, empty comment (nothing is stored), missing
    configuration, success, or a failure description.
    """
    if not participant_id or not participant_id.strip():
        return gr.update(visible=True, value="❗ Missing participant ID.")

    text = "" if comment is None else str(comment).strip()
    if not text:
        # Nothing to store; just tell the user.
        return gr.update(visible=True, value="ℹ️ No comment entered — nothing to submit.")

    # Validate configuration before any network round-trip. The upload goes
    # to RESULTS_REPO_ID, so that is the value that has to be present (not
    # REPO_ID, which names the video repo).
    if not RESULTS_REPO_ID:
        return gr.update(visible=True, value="❗ RESULTS_REPO is not set.")
    if not HF_TOKEN:
        return gr.update(visible=True, value="❗ HF_TOKEN is missing. Set a write token for the dataset repo.")

    try:
        old = _read_feedback_bytes()
        row = [datetime.utcnow().isoformat(), participant_id.strip(), text]
        newb = _append_feedback(old, row)
        api.upload_file(
            path_or_fileobj=io.BytesIO(newb),
            path_in_repo=FEEDBACK_FILE,
            repo_id=RESULTS_REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message="append final feedback",
        )
        return gr.update(visible=True, value="✅ Thanks! Your feedback was submitted.")
    except Exception as e:
        # UI boundary: report the failure to the participant instead of
        # letting the event handler crash.
        return gr.update(
            visible=True,
            value=f"❌ Feedback save failed: {type(e).__name__}: {e}"
        )


# -------------------- Config --------------------
# Dataset repo the videos are downloaded from.
# NOTE(review): the environment variable is called RESULTS_REPO although this
# value selects the *video* repo; results go to RESULTS_REPO_ID below —
# confirm the naming is intended.
REPO_ID = os.getenv("RESULTS_REPO", "dghadiya/video_examples")
RESULTS_REPO_ID = "dghadiya/video_eval"  # where results, reward codes and feedback are stored
REPO_TYPE = "dataset"
HF_TOKEN = os.getenv("HF_TOKEN")
RESULTS_FILE = "results.csv"
TOTAL_PER_PARTICIPANT = 30  # target number of ratings per session

# videos.json entries look like:
# {"url": "...mp4", "id": "BodyWeightSquats__XXXX.mp4", "action": "BodyWeightSquats"}
with open("videos.json", "r", encoding="utf-8") as f:
    V = json.load(f)

api = HfApi()

# Participant instructions (wording kept as provided, including the bold
# markers).
# NOTE(review): the line breaks inside this Markdown string were reconstructed
# from a whitespace-collapsed copy — confirm the list layout against the
# deployed text.
INSTRUCTION_MD = """
**Task:** You will watch a series of **AI-generated videos**. For each video, your job is to rate:

1. **Action Consistency** - how well the person’s action matches the action specified as the "**Expected action**".
2. **Temporal Coherence** - how natural and physically possible the motion looks.

Some things to keep in mind:

- Try to **focus only** on the expected action and motion quality, and do **not** judge **video quality**, **attractiveness**, **background**, **camera motion**, or **objects**.
- For **physical plausibility**, look for **smooth and realistic** motion without impossible poses, missing limbs, or extreme stretching.
- Action consistency and physical plausibility are **not mutually exclusive** with each other.
- **Physically plausible motion does not imply correct depiction of action.**
- **A video cannot portray action consistency if it has physically impossible movements.**
- **0: indicates really poor depiction, 10: represents perfect, realistic depiction**
- The **Save & Next** button will be enabled **only after you have clicked or adjusted both sliders at least once**.
- You will be **paid** once **all the videos are viewed and rated**.
"""

# -------------------- Helper funcs --------------------
def _load_eval_counts():
    """Return ``{video_id: number of ratings}`` read from the results CSV.

    Every id found in ``V`` starts at 0; rows whose id is not in ``V`` are
    ignored. If the results file is missing or cannot be fetched, all counts
    stay at 0.
    """
    counts = {_get_video_id(v): 0 for v in V}

    try:
        b = _read_csv_bytes()
    except Exception:
        # The counts only steer which videos get scheduled first; a failed
        # fetch must not block a session, so fall back to "nothing rated".
        b = None
    if not b:
        return counts

    rows = list(csv.reader(io.StringIO(b.decode("utf-8", errors="ignore"))))
    if not rows:
        return counts

    # The first row is a header if it names a known column (new or legacy).
    header = rows[0]
    has_header = bool(header) and ("video_id" in header or "overall" in header)
    body = rows[1:] if has_header else rows

    # Without a "video_id" header fall back to the default layout, where the
    # id is the third column: ts, pid, video_id, ...
    vid_col = header.index("video_id") if "video_id" in header else 2

    for row in body:
        if len(row) > vid_col and row[vid_col] in counts:
            counts[row[vid_col]] += 1
    return counts


def _get_video_id(v: dict) -> str:
    """Return the entry's ``id``, or the file name of its ``url`` if unset."""
    vid = v.get("id")
    if vid:
        return vid
    return os.path.basename(v.get("url") or "")


def _read_csv_bytes():
    """Return the current content of ``RESULTS_FILE``, or ``None`` if absent.

    ``None`` is returned only when the repo reports that the file does not
    exist. Any other failure propagates, because treating it as "no file yet"
    would make the following upload replace all stored results.
    """
    # Local import from the package this file already depends on.
    from huggingface_hub.utils import EntryNotFoundError, LocalEntryNotFoundError

    try:
        p = hf_hub_download(
            repo_id=RESULTS_REPO_ID,
            filename=RESULTS_FILE,
            repo_type="dataset",
            token=HF_TOKEN,
            local_dir="/tmp",
            local_dir_use_symlinks=False,
        )
    except LocalEntryNotFoundError:
        # Connectivity/cache miss, not proof that the remote file is absent.
        raise
    except EntryNotFoundError:
        return None
    with open(p, "rb") as f:
        return f.read()


def _append(old_bytes, row):
    """Return the results CSV bytes with *row* appended.

    A header row with the two score columns is written first when
    *old_bytes* is ``None`` or empty.
    """
    text = old_bytes.decode("utf-8", errors="ignore") if old_bytes else ""
    s = io.StringIO()
    w = csv.writer(s)
    if not text:
        w.writerow(["ts_iso", "participant_id", "video_id", "action_consistency", "physical_plausibility", "notes"])
    else:
        s.write(text)
        if not text.endswith(("\n", "\r")):
            # Keep the new row from being glued onto an unterminated last line.
            s.write("\r\n")
    w.writerow(row)
    return s.getvalue().encode("utf-8")
def push(participant_id, video_id, action_score, phys_score, notes=""):
    """Append one rating row to ``RESULTS_FILE`` in the results repo.

    Args:
        participant_id: free-text id of the rater (required, non-blank).
        video_id: id of the rated video (required).
        action_score: action-consistency score, stored as ``float``.
        phys_score: physical-plausibility score, stored as ``float``.
        notes: optional free text.

    Returns:
        A ``gr.update`` carrying a visible status message (validation
        problem, success, or failure description).
    """
    if not participant_id or not participant_id.strip():
        return gr.update(visible=True, value="❗ Please enter your Participant ID before proceeding.")
    if not video_id or action_score is None or phys_score is None:
        return gr.update(visible=True, value="❗ Fill out all fields.")

    # Validate configuration before any network round-trip. The upload goes
    # to RESULTS_REPO_ID, so that is the value that has to be present.
    if not RESULTS_REPO_ID:
        return gr.update(visible=True, value="❗ RESULTS_REPO is not set.")
    if not HF_TOKEN:
        return gr.update(visible=True, value="❗ HF_TOKEN is missing. Set a write token for the dataset repo.")

    try:
        row = [
            datetime.utcnow().isoformat(),
            participant_id.strip(),
            video_id,
            float(action_score),
            float(phys_score),
            notes or "",
        ]
        old = _read_csv_bytes()
        newb = _append(old, row)
        api.upload_file(
            path_or_fileobj=io.BytesIO(newb),
            path_in_repo=RESULTS_FILE,
            repo_id=RESULTS_REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message="append",
        )
        return gr.update(visible=True, value="✅ Saved successfully!")
    except Exception as e:
        # UI boundary: report the failure to the participant instead of
        # letting the event handler crash.
        return gr.update(
            visible=True,
            value=f"❌ Save failed: {type(e).__name__}: {e}\n"
                  f"- Check HF_TOKEN permission\n- Check REPO_ID\n- Create dataset repo if missing"
        )


def _extract_action(v):
    """Return the expected action for a video entry.

    Uses the entry's ``action`` when set; otherwise the part of its ``id``
    before the first ``"__"`` and before the first ``"."``. Returns ``""``
    when neither is available.
    """
    action = v.get("action")
    if action:
        return action
    raw = v.get("id") or ""  # tolerate a missing or null id
    return raw.split("__")[0].split(".")[0]


def pick_one():
    """Return ``(url, expected_action)`` of a uniformly random entry of ``V``."""
    v = random.choice(V)
    return v["url"], _extract_action(v)
def _progress_html(done, total):
    """Return the progress indicator text ``"<done> / <total>"`` for the UI.

    Args:
        done: number of videos rated so far.
        total: number of videos in the session.
    """
    # Percentage complete; max(1, total) avoids a division by zero.
    # NOTE(review): pct is not referenced in the template visible here —
    # presumably it sized a progress bar in markup that is missing from this
    # copy of the file; confirm against the deployed source.
    pct = int(100 * done / max(1, total))
    return f"""
{done} / {total}
"""
def _build_order_with_anchor(total: int, anchor_idx: int, repeats: int, pool_size: int, min_gap: int = 1):
    """Build a random presentation order that repeats one anchor video.

    Args:
        total: length of the schedule (e.g. ``TOTAL_PER_PARTICIPANT``).
        anchor_idx: index of the anchor video in the pool.
        repeats: how many times the anchor appears (``0`` for none).
        pool_size: number of videos in the pool.
        min_gap: minimum number of other videos between two anchors
            (``1`` forbids adjacent anchors).

    Returns:
        A list of ``total`` pool indices containing the anchor exactly
        ``repeats`` times and ``total - repeats`` distinct other videos.

    Raises:
        ValueError: if the pool has too few non-anchor videos.
        RuntimeError: if the anchors cannot be placed with the requested gap.
    """
    assert repeats <= total, "repeats must be <= total"
    assert pool_size >= 1, "videos pool must be non-empty"

    # 1) Draw the non-anchor videos without duplicates. random.sample already
    #    returns them in random order.
    others_needed = total - repeats
    candidates = [i for i in range(pool_size) if i != anchor_idx]
    if len(candidates) < others_needed:
        raise ValueError("Not enough unique non-anchor videos to fill the schedule without duplication.")
    others = random.sample(candidates, k=others_needed)

    # 2) Choose the anchor positions: one per equally sized segment of the
    #    schedule, nudged until it is far enough from the anchors placed so far.
    anchor_positions = []

    def ok(pos):
        # "No adjacency" for min_gap == 1 means a distance of at least 2.
        return all(abs(pos - p) >= min_gap + 1 for p in anchor_positions)

    # Guard repeats == 0 (no anchors): the loop below is skipped entirely.
    segment = total // repeats if repeats > 0 else total
    for k in range(repeats):
        lo = k * segment
        hi = (k + 1) * segment if k < repeats - 1 else total  # last segment runs to the end
        start = random.randrange(lo, hi)

        # Probe outwards from the random start, alternating right/left.
        found = None
        for delta in range(segment):
            for sign in (+1, -1):
                pos = start + sign * delta
                if 0 <= pos < total and ok(pos):
                    found = pos
                    break
            if found is not None:
                break

        if found is None:
            # Last resort: first conflict-free slot anywhere in the schedule.
            found = next((pos for pos in range(total) if ok(pos)), None)
        if found is None:
            raise RuntimeError("Failed to place anchor without adjacency. Try different strategy or loosen min_gap.")
        anchor_positions.append(found)

    # 3) Put the anchor into its slots and fill the rest with the others.
    anchor_slots = set(anchor_positions)
    filler = iter(others)
    result = [anchor_idx if i in anchor_slots else next(filler) for i in range(total)]

    # Sanity checks on the finished schedule.
    assert len(result) == total
    for i in range(1, total):
        assert not (result[i] == anchor_idx and result[i - 1] == anchor_idx), "Adjacent anchors found."
    assert sum(1 for x in result if x == anchor_idx) == repeats, "Anchor count mismatch."
    return result


# -------------------- Example videos --------------------
# NOTE(review): the intro page passes "examples/..._good.mp4" paths to
# gr.Video directly, so this mapping looks unused — confirm before relying on
# the "_real" file names.
EXAMPLES = {
    "BodyWeightSquats": {
        "real": "examples/BodyWeightSquats_real.mp4",
        "bad": "examples/BodyWeightSquats_bad.mp4",
    },
    "WallPushUps": {
        "real": "examples/WallPushUps_real.mp4",
        "bad": "examples/WallPushUps_bad.mp4",
    },
}

# Stylesheet handed to gr.Blocks(css=...): white page background, transparent
# wrapper boxes inside the #intro / #eval groups, transparent video toolbar.
CLEAN_BG_CSS = r"""
/* 전체 페이지 배경을 흰색으로 (Spaces의 회색 배경 제거) */
html, body { background:#ffffff !important; }

/* Intro/Eval 섹션의 래퍼 박스만 투명 처리 (입력창/슬라이더 등 컨트롤은 놔둠) */
#intro .gr-panel, #intro .gr-group, #intro .gr-box, #intro .gr-row, #intro .gr-column,
#eval .gr-panel, #eval .gr-group, #eval .gr-box, #eval .gr-row, #eval .gr-column {
  background: transparent !important;
  box-shadow: none !important;
  border-color: transparent !important;
}

/* 비디오 카드 주변 툴바(작은 회색 박스)도 투명하게 */
#intro [data-testid="block-video"] .prose,
#eval [data-testid="block-video"] .prose {
  background: transparent !important;
  box-shadow: none !important;
  border-color: transparent !important;
}
"""
[data-testid="block-video"] .prose, #eval [data-testid="block-video"] .prose { background: transparent !important; box-shadow: none !important; border-color: transparent !important; } """ # -------------------- UI -------------------- with gr.Blocks(css=CLEAN_BG_CSS) as demo: # Blocks 안, 어디서든 한 번만 추가(권장 위치: Blocks 시작 직후) gr.HTML(""" """) order_state = gr.State(value=[]) # v4에서는 value= 권장 ptr_state = gr.State(value=0) cur_video_id = gr.State(value="") reward_code_state = gr.State(value="") # 완료 시 생성한 코드 저장(중복 생성 방지) selected_action = gr.State(value=None) # 아직 선택 없음 selected_phys = gr.State(value=None) # 아직 선택 없음 # ------------------ PAGE 1: Intro + Examples ------------------ # page_intro = gr.Group(visible=True) page_intro = gr.Group(visible=True, elem_id="intro") with page_intro: # gr.Markdown("## 🎯 Action Consistency Human Evaluation") gr.Markdown("## 🎯 Human Evaluation: Action Consistency & Temporal Coherence") gr.Markdown(INSTRUCTION_MD) understood = gr.Checkbox(label="I have read and understand the task.", value=False) start_btn = gr.Button("Yes, start", variant="secondary", interactive=False) def _toggle_start(checked: bool): return gr.update(interactive=checked, variant=("primary" if checked else "secondary")) understood.change(_toggle_start, inputs=understood, outputs=start_btn) # # Examples: Squats # with gr.Group(): # gr.Markdown("### Examples: BodyWeightSquats") # with gr.Row(): # with gr.Column(): # gr.Markdown("**Expected depiction of action**") # gr.Video(value=EX_CACHE["BodyWeightSquats"]["real"], height=240, autoplay=False, elem_id="ex_squats_real",) # with gr.Column(): # gr.Markdown("**Poorly generated action**") # gr.Video(value=EX_CACHE["BodyWeightSquats"]["bad"], height=240, autoplay=False) # if not (EX_CACHE["BodyWeightSquats"]["real"] and EX_CACHE["BodyWeightSquats"]["bad"]): # gr.Markdown("> ⚠️ Upload `examples/BodyWeightSquats_real.mp4` and `_bad.mp4` to show both samples.") # # Examples: WallPushUps # with gr.Group(): # gr.Markdown("### 
Examples: WallPushUps") # with gr.Row(): # with gr.Column(): # gr.Markdown("**Expected depiction of action**") # gr.Video(value=EX_CACHE["WallPushUps"]["real"], height=240, autoplay=False, elem_id="ex_wallpushups_real",) # with gr.Column(): # gr.Markdown("**Poorly generated action**") # gr.Video(value=EX_CACHE["WallPushUps"]["bad"], height=240, autoplay=False) # if not (EX_CACHE["WallPushUps"]["real"] and EX_CACHE["WallPushUps"]["bad"]): # gr.Markdown("> ⚠️ Upload `examples/WallPushUps_real.mp4` and `_bad.mp4` to show both samples.") # Examples: BodyWeightSquats with gr.Group(): gr.Markdown("### Examples: BodyWeightSquats") with gr.Row(): with gr.Column(): gr.Markdown("**Expected depiction of action**") gr.Video( value="examples/BodyWeightSquats_good.mp4", # 무음으로 만든 좋은 예시 height=240, autoplay=False, elem_id="ex_squats_real", ) with gr.Column(): gr.Markdown("**Poorly generated action**") gr.Video( value="examples/BodyWeightSquats_bad.mp4", # 나쁜 예시 height=240, autoplay=False, ) # Examples: WallPushUps with gr.Group(): gr.Markdown("### Examples: WallPushUps") with gr.Row(): with gr.Column(): gr.Markdown("**Expected depiction of action**") gr.Video( value="examples/WallPushups_good.mp4", # 무음으로 다시 만든 good 버전 height=240, autoplay=False, elem_id="ex_wallpushups_real", ) with gr.Column(): gr.Markdown("**Poorly generated action**") gr.Video( value="examples/WallPushups_bad.mp4", # bad 버전 height=240, autoplay=False, ) gr.HTML(""" """) # ------------------ PAGE 2: Evaluation ------------------ page_eval = gr.Group(visible=False, elem_id="eval") with page_eval: # PID 입력 with gr.Row(): pid = gr.Textbox(label="Participant ID (required)", placeholder="e.g., Youngsun-2025/10/24") # 지침(원문) + 비디오 + 진행바 / 오른쪽에 슬라이더 + Save&Next with gr.Row(): #equal_height=True with gr.Column(scale=1): gr.Markdown(INSTRUCTION_MD) # 교수님 문구 그대로 video = gr.Video(label="Video", height=360) progress = gr.HTML(_progress_html(0, TOTAL_PER_PARTICIPANT)) with gr.Column(scale=1): action_tb = 
gr.Textbox(label="Expected action", interactive=False) # NEW: two separate sliders score_action = gr.Slider( minimum=0.0, maximum=10.0, step=0.1, value=5.0, label="Action Consistency (0.0 - 10.0)" ) score_phys = gr.Slider( minimum=0.0, maximum=10.0, step=0.1, value=5.0, label="Temporal Coherence (0.0 - 10.0)" ) save_next = gr.Button("💾 Save & Next ▶", variant="secondary", interactive=False) status = gr.Markdown(visible=False) done_state = gr.State(0) # PID 입력에 따라 Save&Next 토글 def _toggle_by_pid(pid_text: str): enabled = bool(pid_text and pid_text.strip()) return gr.update(interactive=enabled, variant=("primary" if enabled else "secondary")) # pid.change(_toggle_by_pid, inputs=pid, outputs=save_next) def _on_action_release(val: float, pid_text: str, sel_p): # 사용자가 액션 슬라이더를 한 번이라도 놓으면 선택 완료 return val, _recompute_save(pid_text, val, sel_p) def _on_phys_release(val: float, pid_text: str, sel_a): return val, _recompute_save(pid_text, sel_a, val) # release: 마우스를 놓을 때 1회 확정 score_action.release(_on_action_release, inputs=[score_action, pid, selected_phys], outputs=[selected_action, save_next]) score_phys.release(_on_phys_release, inputs=[score_phys, pid, selected_action], outputs=[selected_phys, save_next]) # PID가 바뀌면 현재 선택 상태로 Save 버튼 재계산 pid.change(_recompute_save, inputs=[pid, selected_action, selected_phys], outputs=save_next) page_thanks = gr.Group(visible=False) with page_thanks: reward_msg = gr.Markdown(visible=False) reward_code_box = gr.Textbox(label="Your reward code (copy & paste)", interactive=False, visible=False) reward_pid_box = gr.Textbox(label="Your participant ID", interactive=False, visible=False) gr.Markdown("### Any comments (optional)") feedback_tb = gr.Textbox( label="Any comments (optional)", placeholder="Leave any feedback for the study organizers (optional).", lines=4 ) feedback_submit = gr.Button("Submit feedback") feedback_status = gr.Markdown(visible=False) # -------- 페이지 전환 & 첫 로드 -------- ANCHOR_IDX = 0 # videos.json의 맨 첫 비디오 ANCHOR_REPEATS 
# NOTE(review): these statements appear to belong inside the
# `with gr.Blocks(...) as demo:` context (the `start_btn.click` wiring needs
# it) — confirm the indentation level when splicing this back in.
ANCHOR_REPEATS = 5  # the anchor video is shown 5 times
MIN_GAP = 1  # anchors must never be adjacent


def _build_order_least_first_with_anchor(total:int, anchor_idx:int, repeats:int, min_gap:int=1):
    """
    Build the presentation order for one session.

    - Reads the per-video rating counts via ``_load_eval_counts``.
    - Places the anchor video ``repeats`` times, never on adjacent slots.
    - Fills the remaining slots, without duplicates, with the videos that
      have the fewest ratings so far (ties in random order).

    Returns a list of ``total`` indices into ``V``.
    Raises ValueError if ``V`` has too few non-anchor videos, RuntimeError if
    the anchors cannot be placed.
    """
    assert repeats <= total
    N = len(V)
    assert N >= 1

    # 0) index -> video id
    def vid_of(i): return _get_video_id(V[i])

    # 1) current cumulative rating counts
    counts = _load_eval_counts()

    # 2) non-anchor candidates sorted by ascending count; shuffling first and
    #    relying on the stable sort randomises the order among ties
    anchor_vid = vid_of(anchor_idx)  # NOTE(review): not used below
    candidates = [i for i in range(N) if i != anchor_idx]
    random.shuffle(candidates)
    candidates.sort(key=lambda i: counts.get(vid_of(i), 0))

    others_needed = total - repeats
    if len(candidates) < others_needed:
        raise ValueError("Not enough unique non-anchor videos to fill the schedule without duplication.")
    others = candidates[:others_needed]  # least-rated first, no duplicates

    # 3) shuffle the chosen videos so they are not shown in count order
    random.shuffle(others)

    # 4) place the anchors, one per segment of the schedule, never adjacent
    seq = [None] * total
    segment = total // repeats if repeats > 0 else total
    anchor_positions = []
    for k in range(repeats):
        lo = k * segment
        hi = (k + 1) * segment if k < repeats - 1 else total  # last segment runs to the end
        cand = random.randrange(lo, hi)

        def ok(pos):
            # far enough from every anchor placed so far
            return all(abs(pos - p) >= (min_gap + 1) for p in anchor_positions)

        # probe outwards from the random start, alternating right/left
        found = None
        for d in range(0, max(1, segment)):
            for sgn in (+1, -1):
                pos = cand + sgn * d
                if 0 <= pos < total and ok(pos):
                    found = pos
                    break
            if found is not None:
                break
        if found is None:
            # last resort: scan the whole schedule
            for pos in range(total):
                if ok(pos):
                    found = pos
                    break
        if found is None:
            raise RuntimeError("Failed to place anchor without adjacency.")
        anchor_positions.append(found)

    for pos in anchor_positions:
        seq[pos] = anchor_idx

    # 5) fill the free slots with the other videos, in order
    j = 0
    for i in range(total):
        if seq[i] is None:
            seq[i] = others[j]
            j += 1

    # 6) sanity checks
    assert sum(1 for x in seq if x == anchor_idx) == repeats
    for i in range(1, total):
        assert not (seq[i] == anchor_idx and seq[i-1] == anchor_idx), "Adjacent anchors found."
    return seq


def _start_and_load_first():
    """Start a session: build the order and load the first video.

    Returns one value per component listed in the ``start_btn.click``
    ``outputs`` below, in the same order.
    """
    total = TOTAL_PER_PARTICIPANT
    order = _build_order_least_first_with_anchor(
        total=total,
        anchor_idx=ANCHOR_IDX,
        repeats=ANCHOR_REPEATS,
        min_gap=MIN_GAP
    )
    first_idx = order[0]
    v0 = V[first_idx]

    # resolve the video to a local file path instead of passing the URL
    local_vid_path = get_local_video_path(v0["url"])

    return (
        gr.update(visible=False),                            # page_intro -> hide
        gr.update(visible=True),                             # page_eval -> show
        gr.update(visible=False),                            # page_thanks -> hide
        local_vid_path,                                      # video
        _extract_action(v0),                                 # action_tb
        5.0,                                                 # score_action default
        5.0,                                                 # score_phys default
        gr.update(visible=False, value=""),                  # status
        0,                                                   # done_state
        _progress_html(0, TOTAL_PER_PARTICIPANT),            # progress
        order,                                               # order_state
        1,                                                   # ptr_state (next index to show)
        _get_video_id(v0),                                   # cur_video_id
        "",                                                  # reward_code_state
        None,                                                # selected_action (nothing chosen yet)
        None,                                                # selected_phys (nothing chosen yet)
        gr.update(interactive=False, variant="secondary"),   # save_next locked
    )


start_btn.click(
    _start_and_load_first,
    inputs=[],
    outputs=[
        page_intro, page_eval, page_thanks,
        video, action_tb,
        score_action, score_phys,  # the two sliders
        status, done_state, progress,
        order_state, ptr_state, cur_video_id,
        reward_code_state,
        selected_action, selected_phys, save_next,
    ]
)
done_cnt, # _progress_html(done_cnt, TOTAL_PER_PARTICIPANT), # 5.0, # reset action slider # 5.0, # reset phys slider ✅ # ptr, # cur_vid, # reward_code, # gr.update(visible=False), # gr.update(visible=False), # gr.update(visible=False), # None, # ⬅️ selected_action reset # None, # ⬅️ selected_phys reset # gr.update(interactive=False, variant="secondary"), # ⬅️ lock button # ) # # save both scores # status_msg = push(participant_id, cur_vid, action_score, phys_score, "") # new_done = int(done_cnt) + 1 # if new_done >= TOTAL_PER_PARTICIPANT or ptr >= len(order): # code = reward_code.strip() or _gen_reward_code(participant_id, length=10) # try: # _persist_reward_code(participant_id, code, new_done) # except Exception: # pass # thanks_text = ( # "## 🎉 Thank you so much!\n" # "Your responses have been recorded. You may now close this window.\n\n" # "**Below are your reward code and ID.** Please copy them and submit them to **AMT** to receive your payment." # ) # return ( # gr.update(visible=False), # gr.update(visible=True), # status_msg, # None, # "", # TOTAL_PER_PARTICIPANT, # _progress_html(TOTAL_PER_PARTICIPANT, TOTAL_PER_PARTICIPANT), # 5.0, # reset action # 5.0, # reset phys # len(order), # cur_vid, # code, # gr.update(visible=True, value=thanks_text), # gr.update(visible=True, value=code), # gr.update(visible=True, value=participant_id.strip()), # None, # ⬅️ selected_action reset # None, # ⬅️ selected_phys reset # gr.update(interactive=False, variant="secondary"), # ) # # next video # next_idx = order[ptr] # v = V[next_idx] # next_vid = _get_video_id(v) # local_vid_path = get_local_video_path(v["url"]) # 새 비디오 로컬 경로 # return ( # gr.update(visible=True), # gr.update(visible=False), # status_msg, # local_vid_path, # v["url"], # _extract_action(v), # new_done, # _progress_html(new_done, TOTAL_PER_PARTICIPANT), # 5.0, # reset action # 5.0, # reset phys # ptr + 1, # next_vid, # reward_code, # gr.update(visible=False), # gr.update(visible=False), # 
gr.update(visible=False), # None, # ⬅️ selected_action reset # None, # ⬅️ selected_phys reset # gr.update(interactive=False, variant="secondary"), # ) # except Exception as e: # return ( # gr.update(visible=True), # gr.update(visible=False), # gr.update(visible=True, value=f"❌ Error: {type(e).__name__}: {e}"), # gr.update(), gr.update(), # done_cnt, # _progress_html(done_cnt, TOTAL_PER_PARTICIPANT), # 5.0, # 5.0, # ptr, # cur_vid, # reward_code, # gr.update(visible=False), # gr.update(visible=False), # gr.update(visible=False), # None, # ⬅️ reset on error too # None, # gr.update(interactive=False, variant="secondary"), # ) def save_and_next(participant_id, cur_vid, action_score, phys_score, done_cnt, order, ptr, reward_code): try: # ---------------------------------- # 0. Participant ID 없을 때 (에러 안내하고 그대로 stay) # ---------------------------------- if not participant_id or not participant_id.strip(): # done_cnt는 아직 state라서 그대로 넘기면 됨 (그건 이미 int일 수도 있고 str일 수도 있는데, # 그대로 보여주는 용도라 괜찮고, 어차피 progress_html도 그걸로 다시 그림) return ( # 1 page_eval gr.update(visible=True), # 2 page_thanks gr.update(visible=False), # 3 status gr.update(visible=True, value="❗ Please enter your Participant ID."), # 4 video (unchanged) gr.update(), # 5 action_tb (unchanged) gr.update(), # 6 done_state (keep same) done_cnt, # 7 progress html _progress_html(done_cnt, TOTAL_PER_PARTICIPANT), # 8 score_action reset 5.0, # 9 score_phys reset 5.0, # 10 ptr_state (unchanged) ptr, # 11 cur_video_id (unchanged) cur_vid, # 12 reward_code_state (unchanged) reward_code, # 13 reward_msg gr.update(visible=False), # 14 reward_code_box gr.update(visible=False), # 15 reward_pid_box gr.update(visible=False), # 16 selected_action reset None, # 17 selected_phys reset None, # 18 save_next button -> lock again, because not valid gr.update(interactive=False, variant="secondary"), ) # ---------------------------------- # 1. 
정상 저장 # ---------------------------------- status_msg = push(participant_id, cur_vid, action_score, phys_score, "") # done_cnt는 gr.State로 관리 중이었고, 이게 int로 와야 정상. # 하지만 이전 wiring 문제 때문에 문자열이 올 수도 있었음. 한 번 캐스팅 시도: try: cur_done_int = int(done_cnt) except Exception: # 만약 str이 오면 그냥 0으로부터 다시 센다 (안전빵 fallback) cur_done_int = 0 new_done = cur_done_int + 1 # ---------------------------------- # 2. 마지막 비디오 다 끝난 경우 (보상 코드 보여주고 종료 화면) # ---------------------------------- if new_done >= TOTAL_PER_PARTICIPANT or ptr >= len(order): code = reward_code.strip() if isinstance(reward_code, str) and reward_code.strip() else _gen_reward_code(participant_id, length=10) try: _persist_reward_code(participant_id, code, new_done) except Exception: pass thanks_text = ( "## 🎉 Thank you so much!\n" "Your responses have been recorded. You may now close this window.\n\n" "**Below are your reward code and ID.** Please copy them and submit them to **AMT** to receive your payment." ) return ( # 1 page_eval -> hide gr.update(visible=False), # 2 page_thanks -> show gr.update(visible=True), # 3 status status_msg, # 4 video -> no next video None, # 5 action_tb -> blank "", # 6 done_state -> final count TOTAL_PER_PARTICIPANT, # 7 progress -> 30/30 full _progress_html(TOTAL_PER_PARTICIPANT, TOTAL_PER_PARTICIPANT), # 8 score_action reset 5.0, # 9 score_phys reset 5.0, # 10 ptr_state -> end ptr len(order), # 11 cur_video_id -> keep current or empty cur_vid, # 12 reward_code_state -> store code code, # 13 reward_msg markdown -> visible with thank you gr.update(visible=True, value=thanks_text), # 14 reward_code_box -> visible code gr.update(visible=True, value=code), # 15 reward_pid_box -> visible pid gr.update(visible=True, value=participant_id.strip()), # 16 selected_action reset None, # 17 selected_phys reset None, # 18 save_next -> disable gr.update(interactive=False, variant="secondary"), ) # ---------------------------------- # 3. 
아직 끝 안 났으니까 다음 비디오 로드 # ---------------------------------- next_idx = order[ptr] v = V[next_idx] next_vid_id = _get_video_id(v) # 로컬 mp4 경로 (mute 사본) local_vid_path = get_local_video_path(v["url"]) return ( # 1 page_eval -> stay visible gr.update(visible=True), # 2 page_thanks -> hide gr.update(visible=False), # 3 status -> "✅ Saved successfully!" 같은 메시지 status_msg, # 4 video -> next local video file path local_vid_path, # 5 action_tb -> expected action text _extract_action(v), # 6 done_state -> incremented int new_done, # 7 progress -> updated bar _progress_html(new_done, TOTAL_PER_PARTICIPANT), # 8 score_action reset to mid 5.0, # 9 score_phys reset to mid 5.0, # 10 ptr_state -> ptr+1 ptr + 1, # 11 cur_video_id -> this video's ID/path for logging next_vid_id, # 12 reward_code_state -> unchanged so far reward_code, # 13 reward_msg -> hide at this stage gr.update(visible=False), # 14 reward_code_box -> hide gr.update(visible=False), # 15 reward_pid_box -> hide gr.update(visible=False), # 16 selected_action reset None, # 17 selected_phys reset None, # 18 save_next -> disable until sliders moved again gr.update(interactive=False, variant="secondary"), ) except Exception as e: # 에러 fallback return ( # 1 page_eval gr.update(visible=True), # 2 page_thanks gr.update(visible=False), # 3 status gr.update(visible=True, value=f"❌ Error: {type(e).__name__}: {e}"), # 4 video unchanged gr.update(), # 5 action_tb unchanged gr.update(), # 6 done_state unchanged done_cnt, # 7 progress use old done_cnt safely _progress_html(done_cnt if isinstance(done_cnt, int) else 0, TOTAL_PER_PARTICIPANT), # 8 score_action reset 5.0, # 9 score_phys reset 5.0, # 10 ptr_state unchanged ptr, # 11 cur_video_id unchanged cur_vid, # 12 reward_code_state unchanged reward_code, # 13 reward_msg hide gr.update(visible=False), # 14 reward_code_box hide gr.update(visible=False), # 15 reward_pid_box hide gr.update(visible=False), # 16 selected_action reset None, # 17 selected_phys reset None, # 18 save_next 
disable gr.update(interactive=False, variant="secondary"), ) save_next.click( save_and_next, inputs=[pid, cur_video_id, score_action, score_phys, done_state, order_state, ptr_state, reward_code_state], outputs=[ page_eval, page_thanks, status, video, action_tb, done_state, progress, score_action, score_phys, ptr_state, cur_video_id, reward_code_state, reward_msg, reward_code_box, reward_pid_box, selected_action, selected_phys, # ⬅️ 추가 save_next, # ⬅️ 버튼 상태 갱신 ] ) feedback_submit.click( push_final_feedback, # 완료 후 page_thanks에서 보여주는 participant ID 박스를 사용 (값이 채워져 있음) inputs=[reward_pid_box, feedback_tb], outputs=feedback_status ) if __name__ == "__main__": # demo.launch() demo.launch(ssr_mode=False)