Spaces:
Running
Running
| import os | |
| import re | |
| import json | |
| import uuid | |
| import random | |
| from pathlib import Path | |
| from datetime import datetime, timezone | |
| import gradio as gr | |
| from huggingface_hub import HfApi | |
| # ---------------------------- | |
| # Config | |
| # ---------------------------- | |
# Videos are looked up under STUDY_ROOT/<category>/good and .../bad.
STUDY_ROOT = Path("study")
CATEGORIES = ["search_t2v", "search_i2v", "opt_t2v", "opt_i2v"]

# Hub credentials and target repo come from the environment; both may be unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
RESULTS_REPO_ID = os.environ.get("RESULTS_REPO_ID")

# Without a token there is no Hub client at all.
api = None if not HF_TOKEN else HfApi(token=HF_TOKEN)

# Every response is written here as a JSON file.
LOCAL_RESULTS_DIR = Path("local_results")
LOCAL_RESULTS_DIR.mkdir(exist_ok=True)
| # ---------------------------- | |
| # Pair discovery | |
| # ---------------------------- | |
def normalize_stem(path: Path) -> str:
    """
    Reduce a video file name to the key used to pair good/bad clips.

    A stem shaped like ``<digits>_good`` or ``<digits>_bad`` (for example
    ``0001_good.mp4`` / ``0001_bad.mp4``) collapses to the digit run
    (``0001``). Any other stem is returned unchanged.

    Args:
        path: Path whose ``stem`` (file name without the last suffix) is
            inspected.

    Returns:
        The pairing key as a string.
    """
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, so a stem ending in a newline would be accepted.
    m = re.fullmatch(r"(\d+)_(good|bad)", path.stem)
    return m.group(1) if m else path.stem
def _list_videos(directory):
    """Return the regular files in *directory* ending in .mp4, .webm or .ogg."""
    return [
        p
        for ext in ("mp4", "webm", "ogg")
        for p in directory.glob(f"*.{ext}")
        # A glob pattern also matches directories named like videos; skip them.
        if p.is_file()
    ]


def _group_by_key(paths):
    """Map each normalized stem to the sorted list of paths that share it."""
    groups = {}
    for p in paths:
        groups.setdefault(normalize_stem(p), []).append(p)
    return {key: sorted(members) for key, members in groups.items()}


def build_pairs():
    """
    Discover matched good/bad video pairs for every study category.

    For each entry of CATEGORIES, the files under
    ``STUDY_ROOT/<category>/good`` and ``STUDY_ROOT/<category>/bad`` are
    grouped by ``normalize_stem``. Keys present on both sides are paired
    index-by-index after sorting each side.

    Returns:
        A list of dicts with keys ``category``, ``pair_id``, ``key``,
        ``good_path``, ``bad_path`` (absolute path strings), ``good_file``
        and ``bad_file`` (bare file names). The list is empty when nothing
        matches.
    """
    all_pairs = []
    for category in CATEGORIES:
        good_map = _group_by_key(_list_videos(STUDY_ROOT / category / "good"))
        bad_map = _group_by_key(_list_videos(STUDY_ROOT / category / "bad"))

        for key in sorted(good_map.keys() & bad_map.keys()):
            # zip stops at the shorter side, so surplus clips are left unpaired.
            for i, (good, bad) in enumerate(zip(good_map[key], bad_map[key])):
                all_pairs.append(
                    {
                        "category": category,
                        "pair_id": f"{category}::{key}::{i}",
                        "key": key,
                        "good_path": str(good.resolve()),
                        "bad_path": str(bad.resolve()),
                        "good_file": good.name,
                        "bad_file": bad.name,
                    }
                )
    return all_pairs
# Pair discovery runs once at import time; with no pairs the app has nothing to show.
PAIRS = build_pairs()
if not PAIRS:
    raise RuntimeError("No matched good/bad pairs found.")
| # ---------------------------- | |
| # Video helper | |
| # ---------------------------- | |
def video_value(path_str: str):
    """
    Validate a video path and return it in absolute form.

    Args:
        path_str: Path to a video file, as a string.

    Returns:
        The resolved absolute path as a string.

    Raises:
        FileNotFoundError: If the path does not exist or is not a regular
            file (for example, a directory).
    """
    path = Path(path_str)
    # is_file() rather than exists(): a directory would pass exists() but is
    # not something the video component can play.
    if not path.is_file():
        raise FileNotFoundError(f"Video file not found: {path}")
    return str(path.resolve())
| # ---------------------------- | |
| # Sampling logic | |
| # ---------------------------- | |
def sample_pair(seen_pair_ids):
    """
    Draw a random pair and randomly decide which clip goes on the left.

    Pairs whose ``pair_id`` is not in *seen_pair_ids* are preferred; once
    every pair has been seen, the draw falls back to the full PAIRS list.

    Returns:
        A copy of the chosen pair dict extended with ``left_path``,
        ``right_path``, ``left_hidden_label``, ``right_hidden_label`` and
        ``good_on_left``.
    """
    already_seen = set(seen_pair_ids or [])
    candidates = [p for p in PAIRS if p["pair_id"] not in already_seen]
    if not candidates:
        candidates = PAIRS

    pair = random.choice(candidates)
    good_on_left = random.choice([True, False])

    # The hidden labels double as the prefix of the matching "<label>_path" key.
    left_label, right_label = ("good", "bad") if good_on_left else ("bad", "good")

    return {
        **pair,
        "left_path": pair[f"{left_label}_path"],
        "right_path": pair[f"{right_label}_path"],
        "left_hidden_label": left_label,
        "right_hidden_label": right_label,
        "good_on_left": good_on_left,
    }
| # ---------------------------- | |
| # Hub / persistence helpers | |
| # ---------------------------- | |
def local_save(record):
    """
    Write *record* as indented UTF-8 JSON into LOCAL_RESULTS_DIR.

    The file is named after ``record['response_id']``; its path is returned
    as a string.
    """
    target = LOCAL_RESULTS_DIR / f"{record['response_id']}.json"
    with target.open("w", encoding="utf-8") as fh:
        json.dump(record, fh, ensure_ascii=False, indent=2)
    return str(target)
def check_hub_setup():
    """
    Check whether responses can be uploaded to the Hub dataset repo.

    Returns:
        A warning string when RESULTS_REPO_ID or HF_TOKEN is unset, or when
        looking up the dataset repo raises; None when the lookup succeeds.
    """
    if not RESULTS_REPO_ID:
        warning = "RESULTS_REPO_ID is not set. Responses will be saved locally only."
    elif not HF_TOKEN:
        warning = "HF_TOKEN is not set. Responses will be saved locally only."
    else:
        try:
            api.repo_info(repo_id=RESULTS_REPO_ID, repo_type="dataset")
        except Exception as exc:
            # Any failure here is reported as a warning rather than raised.
            warning = f"Hub dataset check failed: {type(exc).__name__}: {exc}"
        else:
            warning = None
    return warning


# Computed once at import time and shown to participants when a session starts.
HUB_WARNING = check_hub_setup()
def save_response(record):
    """
    Persist one response locally and, when configured, on the Hub.

    The local copy is attempted first. The upload is attempted afterwards
    when RESULTS_REPO_ID, HF_TOKEN and the API client are all available.
    A failure of either step is reported in the returned dict instead of
    being raised, so the UI callback never sees an exception from here.

    Args:
        record: JSON-serializable dict containing at least ``response_id``
            and a ``timestamp`` string whose first ten characters are used
            as the remote folder name.

    Returns:
        dict with keys ``ok`` (bool), ``saved_local`` (bool),
        ``saved_hub`` (bool) and ``message`` (str).
    """
    try:
        local_path = local_save(record)
    except Exception as e:
        # Boundary handler: report the failure instead of crashing the callback.
        saved_local = False
        local_msg = f"Local save failed: {type(e).__name__}: {e}"
    else:
        saved_local = True
        local_msg = f"Saved locally to {local_path}"

    if not RESULTS_REPO_ID or not HF_TOKEN or api is None:
        return {
            "ok": saved_local,
            "saved_local": saved_local,
            "saved_hub": False,
            "message": f"{local_msg}. Hub upload is not configured.",
        }

    try:
        remote_path = f"responses/{record['timestamp'][:10]}/{record['response_id']}.json"
        # Upload the serialized bytes directly: no scratch file is written to
        # a hard-coded /tmp path, so nothing is left behind to clean up.
        payload = json.dumps(record, ensure_ascii=False, indent=2).encode("utf-8")
        api.upload_file(
            path_or_fileobj=payload,
            path_in_repo=remote_path,
            repo_id=RESULTS_REPO_ID,
            repo_type="dataset",
        )
    except Exception as e:
        hub_error = f"Hub upload failed: {type(e).__name__}: {e}"
        separator = ", but " if saved_local else ". "
        return {
            "ok": False,
            "saved_local": saved_local,
            "saved_hub": False,
            "message": f"{local_msg}{separator}{hub_error}",
        }

    hub_msg = f"Saved to dataset repo: {remote_path}"
    return {
        "ok": True,
        "saved_local": saved_local,
        "saved_hub": True,
        "message": hub_msg if saved_local else f"{local_msg}. {hub_msg}",
    }
| # ---------------------------- | |
| # Gradio callbacks | |
| # ---------------------------- | |
def start_session():
    """
    Begin a new participant session and load the first video pair.

    Returns:
        An 11-tuple, in the order wired to the start button's outputs:
        left video path, right video path, participant label text,
        participant id, seen-pair list, current pair dict, two None values
        that clear the radios, the status message, and two visibility
        updates (hide the start button, show the submit button).
    """
    participant_id = str(uuid.uuid4())
    seen_pair_ids = []
    current = sample_pair(seen_pair_ids)

    status_parts = ["Study loaded. Watch both videos and answer the questions below."]
    if HUB_WARNING:
        status_parts.append(f"Warning: {HUB_WARNING}")
    status_msg = "\n\n".join(status_parts)

    left_video = video_value(current["left_path"])
    right_video = video_value(current["right_path"])

    return (
        left_video,
        right_video,
        f"Participant ID: {participant_id}",
        participant_id,
        seen_pair_ids,
        current,
        None,
        None,
        status_msg,
        gr.update(visible=False),  # hide the start button
        gr.update(visible=True),  # show "Submit and continue"
    )
def submit_and_next(
    plausible_answer,
    quality_answer,
    participant_id,
    seen_pair_ids,
    current,
):
    """
    Record the answers for the current pair and advance to the next one.

    Args:
        plausible_answer: Selected value of the physical-plausibility radio,
            or None when unanswered.
        quality_answer: Selected value of the visual-quality radio, or None
            when unanswered.
        participant_id: Session identifier created by ``start_session``.
        seen_pair_ids: ``pair_id`` values already answered in this session.
        current: The pair dict produced by ``sample_pair``, or None when no
            pair has been loaded.

    Returns:
        A 5-tuple: left video path, right video path, updated seen-pair
        list, the pair now on screen, and a status message. When no pair is
        loaded or an answer is missing, nothing is saved and the current
        pair is kept.
    """
    # State may arrive as None; copy so the caller's list is never mutated.
    seen_pair_ids = list(seen_pair_ids or [])

    if current is None:
        return (None, None, seen_pair_ids, current, "No current pair loaded.")

    if plausible_answer is None or quality_answer is None:
        return (
            video_value(current["left_path"]),
            video_value(current["right_path"]),
            seen_pair_ids,
            current,
            "Please answer both questions before continuing.",
        )

    timestamp = datetime.now(timezone.utc).isoformat()
    record = {
        "response_id": str(uuid.uuid4()),
        "timestamp": timestamp,
        "participant_id": participant_id,
        "category": current["category"],
        "pair_id": current["pair_id"],
        "pair_key": current["key"],
        "left_video": current["left_path"],
        "right_video": current["right_path"],
        "good_video": current["good_path"],
        "bad_video": current["bad_path"],
        "good_on_left": current["good_on_left"],
        "left_hidden_label": current["left_hidden_label"],
        "right_hidden_label": current["right_hidden_label"],
        "physical_plausibility_answer": plausible_answer,
        "visual_quality_answer": quality_answer,
    }
    save_result = save_response(record)

    # Append only when new: keeps the list duplicate-free and in answer order.
    if current["pair_id"] not in seen_pair_ids:
        seen_pair_ids.append(current["pair_id"])
    next_pair = sample_pair(seen_pair_ids)

    status_msg = save_result["message"]
    # Only promise a local copy when the save result actually reports one.
    if save_result.get("saved_local") and not save_result["saved_hub"]:
        status_msg += "\n\nYour response was still saved locally."

    return (
        video_value(next_pair["left_path"]),
        video_value(next_pair["right_path"]),
        seen_pair_ids,
        next_pair,
        status_msg,
    )
| # ---------------------------- | |
| # UI | |
| # ---------------------------- | |
# Page layout and event wiring for the study.
with gr.Blocks(title="Human Study") as demo:
    gr.Markdown(
        """
# Human Study
Please answer as many video pairs as you can.
You do not need to finish the full study. Each response is saved automatically when you click **Submit and continue**, so you may stop at any time.
### Physical plausibility
- Watch both videos and choose the one that is relatively better in terms of physical realism.
- Sometimes both videos may be imperfect, but please select the one with better physics than the other.
- Pay attention to object motion, interactions, and overall realism.
- Please avoid choosing **No preference** for physical plausibility as much as possible.
### Visual quality
- Choose the video with better visual quality.
- Focus on overall visual appearance and rendering quality.
"""
    )
    participant_label = gr.Markdown()

    # The two clips are shown side by side; which one is "good" is randomized
    # per pair by sample_pair.
    with gr.Row():
        video_left = gr.Video(
            label="Left",
            interactive=False,
            format="mp4",
        )
        video_right = gr.Video(
            label="Right",
            interactive=False,
            format="mp4",
        )

    plausible = gr.Radio(
        choices=["Left", "Right", "No preference"],
        label="Which video is more physically plausible?",
    )
    quality = gr.Radio(
        choices=["Left", "Right", "No preference"],
        label="Which video has better visual quality?",
    )
    status = gr.Markdown()

    # Label fixed from the typo "Study study".
    start_btn = gr.Button("Start study", visible=True)
    next_btn = gr.Button("Submit and continue", visible=False)

    # Per-session state carried between callbacks.
    participant_id_state = gr.State()
    seen_pair_ids_state = gr.State([])
    current_pair_state = gr.State()

    # Output order must match the tuple returned by start_session.
    start_btn.click(
        fn=start_session,
        outputs=[
            video_left,
            video_right,
            participant_label,
            participant_id_state,
            seen_pair_ids_state,
            current_pair_state,
            plausible,
            quality,
            status,
            start_btn,
            next_btn,
        ],
    )

    # Output order must match the tuple returned by submit_and_next.
    # NOTE(review): the chained step clears both radios after every click,
    # including when submit_and_next only returned the "Please answer both
    # questions" message, so a single answer already given is discarded.
    next_btn.click(
        fn=submit_and_next,
        inputs=[
            plausible,
            quality,
            participant_id_state,
            seen_pair_ids_state,
            current_pair_state,
        ],
        outputs=[
            video_left,
            video_right,
            seen_pair_ids_state,
            current_pair_state,
            status,
        ],
    ).then(
        lambda: (None, None),
        outputs=[plausible, quality],
    )

demo.launch()