# NOTE(review): the original file header contained build-page extraction
# residue ("Spaces:" / "Build error"); preserved as a comment so the module
# stays importable.
| from __future__ import annotations | |
| import csv | |
| import copy | |
| import hashlib | |
| import json | |
| import os | |
| import random | |
| import re | |
| import subprocess | |
| import time | |
| import uuid | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Tuple | |
| from filelock import FileLock | |
| try: | |
| import imageio_ffmpeg | |
| except ImportError: # pragma: no cover - optional runtime dependency | |
| imageio_ffmpeg = None | |
# Fallback display labels for the three study methods; used when the study
# config does not provide an explicit display_name.
METHOD_FALLBACK_LABELS = {
    "anyact": "AnyAct (ours)",
    "vlm_hy_motion": "VLM+HY-Motion",
    "echomotion": "EchoMotion",
}

# Default A/B comparison pairs (ours vs. each baseline), in presentation order.
PAIRWISE_METHOD_PAIRS: List[Tuple[str, str]] = [
    ("anyact", "vlm_hy_motion"),
    ("anyact", "echomotion"),
]

# Canonical answer values stored for each metric.
CHOICE_OPTIONS = ["ResultA", "ResultB"]
CHOICE_RESULT_A = "ResultA"
CHOICE_RESULT_B = "ResultB"
CHOICE_TIE = "Tie"

# Exact column order of results/responses.csv; every exported row is
# normalized to precisely these keys (see _normalize_canonical_row).
CSV_COLUMNS = [
    "participant_id",
    "consent",
    "study_id",
    "study_title",
    "question_id",
    "question_position",
    "total_questions",
    "case_id",
    "case_title",
    "source_key",
    "pair_id",
    "result_a_method",
    "result_b_method",
    "left_method",
    "right_method",
    "reference_video",
    "result_a_video",
    "result_b_video",
    "left_video",
    "right_video",
    "answer_similarity",
    "answer_similarity_method",
    "answer_similarity_video",
    "answer_quality",
    "answer_quality_method",
    "answer_quality_video",
    "answer_preference",
    "answer_preference_method",
    "answer_preference_video",
    "answered_at",
    "duration_seconds",
    "session_hash",
    "user_agent",
    "started_at",
    "updated_at",
]
def now_iso() -> str:
    """Return the current local time as a timezone-aware ISO-8601 string (second precision)."""
    current = datetime.now().astimezone()
    return current.isoformat(timespec="seconds")
def get_results_dir(project_root: Path) -> Path:
    """Resolve the directory where study results are persisted.

    Priority: USER_STUDY_RESULTS_DIR env var, then /data when running on a
    Hugging Face Space (SPACE_ID set) and /data exists, then <project_root>/results.
    """
    override = os.environ.get("USER_STUDY_RESULTS_DIR", "").strip()
    if override:
        return Path(override).expanduser().resolve()
    if os.environ.get("SPACE_ID"):
        space_data_dir = Path("/data")
        if space_data_dir.exists():
            return (space_data_dir / "user_study_results").resolve()
    return (project_root / "results").resolve()
def ensure_runtime_dirs(project_root: Path) -> None:
    """Create the results directory tree and seed empty response files.

    Idempotent: existing directories and files are left untouched.
    """
    results_dir = get_results_dir(project_root)
    for path in [
        results_dir,
        results_dir / "participants",
        results_dir / "participants_archive",
        results_dir / "plots",
        results_dir / "locks",
    ]:
        path.mkdir(parents=True, exist_ok=True)
    responses_csv = results_dir / "responses.csv"
    # Seed the CSV with a header row so downstream readers always see columns.
    if not responses_csv.exists():
        with responses_csv.open("w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(handle, fieldnames=CSV_COLUMNS)
            writer.writeheader()
    responses_jsonl = results_dir / "responses.jsonl"
    responses_jsonl.touch(exist_ok=True)
def generate_participant_id() -> str:
    """Return a fresh random participant identifier (UUID4 string)."""
    return f"{uuid.uuid4()}"
def sanitize_participant_id(raw_value: str | None) -> str:
    """Replace any character outside [A-Za-z0-9_-] with '_' and cap at 80 chars."""
    stripped = (raw_value or "").strip()
    safe = re.sub(r"[^A-Za-z0-9_-]", "_", stripped)
    return safe[:80]
def humanize_source_key(source_key: str) -> str:
    """Turn a snake_case source key into a Title Case label."""
    spaced = source_key.replace("_", " ").strip()
    return spaced.title()
def stable_int_seed(text: str) -> int:
    """Derive a deterministic 64-bit integer seed from *text* via SHA-256."""
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    # First 8 bytes big-endian == first 16 hex chars parsed as base-16.
    return int.from_bytes(digest[:8], "big")
def build_pair_id(method_a: str, method_b: str) -> str:
    """Canonical identifier for an ordered method pair."""
    return "_vs_".join((method_a, method_b))
def normalize_choice_value(raw_value: Any) -> str:
    """Map free-form answer text onto the canonical choice constants.

    Unrecognized non-empty values are returned stripped but otherwise unchanged;
    None/blank inputs become "".
    """
    if raw_value is None:
        return ""
    cleaned = str(raw_value).strip()
    if not cleaned:
        return ""
    # Collapse whitespace/underscores/dashes and lowercase before matching.
    compact = re.sub(r"[\s_-]+", "", cleaned).lower()
    aliases = {
        "left": CHOICE_RESULT_A,
        "resulta": CHOICE_RESULT_A,
        "a": CHOICE_RESULT_A,
        "right": CHOICE_RESULT_B,
        "resultb": CHOICE_RESULT_B,
        "b": CHOICE_RESULT_B,
        "tie": CHOICE_TIE,
        "equal": CHOICE_TIE,
        "same": CHOICE_TIE,
    }
    return aliases.get(compact, cleaned)
| def _sync_result_slot_fields(row: Dict[str, Any], case: Dict[str, Any] | None = None) -> Dict[str, Any]: | |
| result_a_method = str(row.get("result_a_method") or row.get("left_method") or "").strip() | |
| result_b_method = str(row.get("result_b_method") or row.get("right_method") or "").strip() | |
| result_a_video = str(row.get("result_a_video") or row.get("left_video") or "").strip() | |
| result_b_video = str(row.get("result_b_video") or row.get("right_video") or "").strip() | |
| if case is not None: | |
| method_videos = case.get("method_videos", {}) | |
| if result_a_method in method_videos: | |
| result_a_video = str(method_videos[result_a_method]) | |
| if result_b_method in method_videos: | |
| result_b_video = str(method_videos[result_b_method]) | |
| if case.get("reference_video"): | |
| row["reference_video"] = case["reference_video"] | |
| row["result_a_method"] = result_a_method | |
| row["result_b_method"] = result_b_method | |
| row["left_method"] = result_a_method | |
| row["right_method"] = result_b_method | |
| row["result_a_video"] = result_a_video | |
| row["result_b_video"] = result_b_video | |
| row["left_video"] = result_a_video | |
| row["right_video"] = result_b_video | |
| return row | |
def _resolve_choice_targets(row: Dict[str, Any], raw_choice: Any) -> tuple[str, str]:
    """Return the (method, video) that *raw_choice* points at, or ("", "") for tie/blank/unknown."""
    choice = normalize_choice_value(raw_choice)
    slot_fields = {
        CHOICE_RESULT_A: (("result_a_method", "left_method"), ("result_a_video", "left_video")),
        CHOICE_RESULT_B: (("result_b_method", "right_method"), ("result_b_video", "right_video")),
    }
    if choice not in slot_fields:
        return "", ""
    (primary_method, legacy_method), (primary_video, legacy_video) = slot_fields[choice]
    method = str(row.get(primary_method) or row.get(legacy_method) or "").strip()
    video = str(row.get(primary_video) or row.get(legacy_video) or "").strip()
    return method, video
def upgrade_response_row_schema(row: Dict[str, Any], case: Dict[str, Any] | None = None) -> Dict[str, Any]:
    """Bring an answer row up to the current schema, in place.

    Synchronizes A/B vs. left/right fields, canonicalizes the three metric
    answers, and fills the derived ``*_method`` / ``*_video`` columns.
    Returns the mutated row.
    """
    upgraded = row
    _sync_result_slot_fields(upgraded, case=case)
    for metric_key in ("answer_similarity", "answer_quality", "answer_preference"):
        choice = normalize_choice_value(upgraded.get(metric_key))
        if choice:
            upgraded[metric_key] = choice
        elif metric_key not in upgraded:
            # Ensure the column exists even when unanswered.
            upgraded[metric_key] = ""
        selected_method, selected_video = _resolve_choice_targets(upgraded, upgraded.get(metric_key))
        upgraded[f"{metric_key}_method"] = selected_method
        upgraded[f"{metric_key}_video"] = selected_video
    return upgraded
def _ensure_cache_dir(project_root: Path, name: str) -> Path:
    """Return results_dir/<name>, creating it (and parents) if needed."""
    cache_dir = get_results_dir(project_root) / name
    cache_dir.mkdir(parents=True, exist_ok=True)
    return cache_dir


def _web_video_cache_dir(project_root: Path) -> Path:
    """Cache directory for browser-playable converted video copies."""
    return _ensure_cache_dir(project_root, "web_video_cache")


def _thumbnail_cache_dir(project_root: Path) -> Path:
    """Cache directory for extracted video thumbnails."""
    return _ensure_cache_dir(project_root, "thumbnail_cache")


def _synced_video_cache_dir(project_root: Path) -> Path:
    """Cache directory for duration/fps-synchronized study video trios."""
    return _ensure_cache_dir(project_root, "synced_video_cache")
def _probe_video_stream(video_path: Path) -> Dict[str, str]:
    """Best-effort probe of the first video stream's codec name and pixel format.

    Parses ffmpeg's stderr banner (ffmpeg -i with no output exits non-zero but
    still prints stream info). Returns {} when ffmpeg is unavailable or no
    "Video:" line is found; "pixel_format" may be "" when not reported.
    """
    if imageio_ffmpeg is None:
        return {}
    ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
    result = subprocess.run(
        [ffmpeg_exe, "-i", str(video_path)],
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="ignore",
    )
    stderr_text = result.stderr or ""
    # Bug fix: the previous single pattern used a lazy ".*?" followed by an
    # OPTIONAL group, so the pixel-format group could never participate in the
    # match and pixel_format was always "". Capture the rest of the "Video:"
    # line and search it for the pixel format separately.
    line_match = re.search(r"Video:\s*([^\s,(]+)([^\n]*)", stderr_text)
    if not line_match:
        return {}
    codec_name = (line_match.group(1) or "").strip().lower()
    pixel_match = re.search(r"(yuv[a-zA-Z0-9]+)", line_match.group(2) or "")
    pixel_format = (pixel_match.group(1).strip().lower() if pixel_match else "")
    return {
        "codec_name": codec_name,
        "pixel_format": pixel_format,
    }
| def _parse_duration_to_seconds(duration_text: str) -> float: | |
| hours, minutes, seconds = duration_text.split(":") | |
| return int(hours) * 3600 + int(minutes) * 60 + float(seconds) | |
def _probe_video_timing(video_path: Path) -> Dict[str, float]:
    """Best-effort probe of a video's duration and frame rate via ffmpeg.

    Returns a dict that may contain "duration_seconds" and/or "fps"; empty
    when ffmpeg is unavailable or nothing could be parsed from stderr.
    """
    if imageio_ffmpeg is None:
        return {}
    ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
    # ffmpeg -i with no output file exits non-zero; we only read its banner.
    result = subprocess.run(
        [ffmpeg_exe, "-i", str(video_path)],
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="ignore",
    )
    stderr_text = result.stderr or ""
    duration_match = re.search(r"Duration:\s*(\d+:\d+:\d+(?:\.\d+)?)", stderr_text)
    fps_match = re.search(r"(\d+(?:\.\d+)?)\s+fps", stderr_text)
    if fps_match is None:
        # Some streams only report "tbr"; use it as a frame-rate fallback.
        fps_match = re.search(r"(\d+(?:\.\d+)?)\s+tbr", stderr_text)
    metadata: Dict[str, float] = {}
    if duration_match:
        metadata["duration_seconds"] = _parse_duration_to_seconds(duration_match.group(1))
    if fps_match:
        metadata["fps"] = float(fps_match.group(1))
    return metadata
| def _format_ffmpeg_fps(value: float) -> str: | |
| rounded = round(value) | |
| if abs(value - rounded) < 1e-6: | |
| return str(int(rounded)) | |
| return f"{value:.3f}".rstrip("0").rstrip(".") | |
def _sync_single_video_to_duration(
    source_path: Path,
    target_path: Path,
    target_duration: float,
    target_fps: float,
) -> None:
    """Re-encode one video to exactly *target_duration* seconds at *target_fps*.

    Shorter sources are padded by cloning their last frame (tpad); longer ones
    are trimmed. Audio is dropped. Raises RuntimeError when ffmpeg fails or
    produces no output file.
    """
    ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
    source_timing = _probe_video_timing(source_path)
    source_duration = float(source_timing.get("duration_seconds", 0.0) or 0.0)
    # Still-frame padding needed to reach the common duration (0 if already longer).
    pad_duration = max(0.0, target_duration - source_duration)
    fps_literal = _format_ffmpeg_fps(target_fps)
    filter_graph = (
        f"fps={fps_literal},"
        f"tpad=stop_mode=clone:stop_duration={pad_duration:.6f},"
        f"trim=duration={target_duration:.6f},"
        "setpts=PTS-STARTPTS"
    )
    command = [
        ffmpeg_exe,
        "-y",
        "-i",
        str(source_path),
        "-an",  # strip audio
        "-vf",
        filter_graph,
        "-c:v",
        "libx264",  # broadly browser-compatible codec
        "-preset",
        "veryfast",
        "-pix_fmt",
        "yuv420p",
        "-movflags",
        "+faststart",  # moov atom up front for progressive playback
        str(target_path),
    ]
    result = subprocess.run(
        command,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="ignore",
    )
    if result.returncode != 0 or not target_path.exists():
        raise RuntimeError(
            f"Failed to create synchronized study video: {source_path}\n{result.stderr}"
        )
def ensure_synchronized_study_videos(
    reference_video: str,
    left_video: str,
    right_video: str,
    project_root: Path,
    target_fps: float = 30.0,
) -> Dict[str, str]:
    """
    Create browser-playable synchronized copies for the three study videos.
    The shorter videos are padded by cloning their last frame so that all three
    outputs share the same fps and total duration. If synchronization fails for
    any reason, the original paths are returned to keep the study app usable.
    """
    raw_paths = {
        "reference_video": Path(reference_video).resolve(),
        "left_video": Path(left_video).resolve(),
        "right_video": Path(right_video).resolve(),
    }
    # Without ffmpeg (or with missing inputs) we cannot sync; fall back as-is.
    if imageio_ffmpeg is None or not all(path.exists() for path in raw_paths.values()):
        return {key: str(path) for key, path in raw_paths.items()}
    try:
        durations = []
        for path in raw_paths.values():
            timing = _probe_video_timing(path)
            durations.append(float(timing.get("duration_seconds", 0.0) or 0.0))
        target_duration = max(durations)
        if target_duration <= 0:
            # Probing failed for every input; nothing sensible to sync to.
            return {key: str(path) for key, path in raw_paths.items()}
        cache_dir = _synced_video_cache_dir(project_root)
        # Cache key covers paths, mtimes, sizes, and fps so stale trios rebuild.
        signature = hashlib.sha1(
            "::".join(
                [
                    "sync_v1",
                    f"fps={_format_ffmpeg_fps(target_fps)}",
                    *(
                        f"{path.as_posix()}::{path.stat().st_mtime_ns}::{path.stat().st_size}"
                        for path in raw_paths.values()
                    ),
                ]
            ).encode("utf-8")
        ).hexdigest()[:16]
        trio_dir = cache_dir / signature
        lock_path = trio_dir.with_suffix(".lock")
        # Serialize generation across processes hitting the same trio.
        with FileLock(str(lock_path)):
            trio_dir.mkdir(parents=True, exist_ok=True)
            output_paths = {
                "reference_video": trio_dir / "reference.mp4",
                "left_video": trio_dir / "left.mp4",
                "right_video": trio_dir / "right.mp4",
            }
            # Skip re-encoding when a complete cached trio already exists.
            ready = all(path.exists() and path.stat().st_size > 0 for path in output_paths.values())
            if not ready:
                for key, source_path in raw_paths.items():
                    _sync_single_video_to_duration(
                        source_path=source_path,
                        target_path=output_paths[key],
                        target_duration=target_duration,
                        target_fps=target_fps,
                    )
        return {key: str(path) for key, path in output_paths.items()}
    except Exception as exc:
        # Deliberate best-effort: a sync failure must not break the study UI.
        print(f"[warn] Falling back to original study videos because sync generation failed: {exc}")
        return {key: str(path) for key, path in raw_paths.items()}
def ensure_web_playable_video(video_path: str, project_root: Path) -> str:
    """Return a path to a browser-playable (h264/yuv420p MP4) copy of *video_path*.

    Already-compatible files are returned unchanged; otherwise a converted copy
    is produced and cached under the web video cache. Raises RuntimeError when
    the ffmpeg conversion fails.
    """
    source_path = Path(video_path).resolve()
    if not source_path.exists() or imageio_ffmpeg is None:
        return str(source_path)
    stream_info = _probe_video_stream(source_path)
    # h264 MP4 in yuv420p (or with unknown pixel format) is treated as playable.
    if (
        source_path.suffix.lower() == ".mp4"
        and stream_info.get("codec_name") == "h264"
        and (not stream_info.get("pixel_format") or stream_info.get("pixel_format") == "yuv420p")
    ):
        return str(source_path)
    cache_dir = _web_video_cache_dir(project_root)
    # Cache key tracks path + mtime + size so edited sources invalidate old copies.
    signature = hashlib.sha1(
        f"{source_path.as_posix()}::{source_path.stat().st_mtime_ns}::{source_path.stat().st_size}".encode("utf-8")
    ).hexdigest()[:12]
    target_path = cache_dir / f"{source_path.stem}_{signature}.mp4"
    lock_path = target_path.with_suffix(".lock")
    # Lock so concurrent requests do not run the same conversion twice.
    with FileLock(str(lock_path)):
        if target_path.exists() and target_path.stat().st_size > 0:
            return str(target_path)
        ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
        command = [
            ffmpeg_exe,
            "-y",
            "-i",
            str(source_path),
            "-an",
            "-c:v",
            "libx264",
            "-pix_fmt",
            "yuv420p",
            "-movflags",
            "+faststart",
            str(target_path),
        ]
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="ignore",
        )
        if result.returncode != 0 or not target_path.exists():
            raise RuntimeError(
                f"Failed to convert video for browser playback: {source_path}\n{result.stderr}"
            )
    return str(target_path)
def prepare_reference_videos_for_web(config: Dict[str, Any], project_root: Path) -> Dict[str, Any]:
    """Replace each case's reference video with a browser-playable copy, in place."""
    for case in config.get("cases", []):
        playable = ensure_web_playable_video(case["reference_video"], project_root)
        case["reference_video"] = playable
    return config
def ensure_video_thumbnail(
    video_path: str,
    project_root: Path,
    time_seconds: float = 0.8,
    width: int = 480,
) -> str:
    """Extract (and cache) a JPEG thumbnail from *video_path*.

    Grabs one frame at *time_seconds*, scaled to *width* pixels wide. Returns
    "" when the source is missing or ffmpeg is unavailable; raises RuntimeError
    when extraction fails.
    """
    source_path = Path(video_path).resolve()
    if not source_path.exists() or imageio_ffmpeg is None:
        return ""
    cache_dir = _thumbnail_cache_dir(project_root)
    # Cache key includes grab time and width so parameter changes regenerate.
    signature = hashlib.sha1(
        f"{source_path.as_posix()}::{source_path.stat().st_mtime_ns}::{source_path.stat().st_size}::{time_seconds}::{width}".encode(
            "utf-8"
        )
    ).hexdigest()[:12]
    target_path = cache_dir / f"{source_path.stem}_{signature}.jpg"
    lock_path = target_path.with_suffix(".lock")
    # Lock so concurrent requests do not extract the same thumbnail twice.
    with FileLock(str(lock_path)):
        if target_path.exists() and target_path.stat().st_size > 0:
            return str(target_path)
        ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
        command = [
            ffmpeg_exe,
            "-y",
            "-ss",
            str(time_seconds),
            "-i",
            str(source_path),
            "-frames:v",
            "1",
            "-vf",
            f"scale={width}:-1",  # keep aspect ratio
            "-q:v",
            "2",  # high JPEG quality
            str(target_path),
        ]
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="ignore",
        )
        if result.returncode != 0 or not target_path.exists():
            raise RuntimeError(
                f"Failed to extract thumbnail from video: {source_path}\n{result.stderr}"
            )
    return str(target_path)
| def _resolve_path(config_dir: Path, raw_path: str) -> Path: | |
| path = Path(raw_path) | |
| if path.is_absolute(): | |
| return path | |
| return (config_dir / path).resolve() | |
def _resolve_single_match(config_dir: Path, directory: str, pattern: str, source_key: str) -> Path:
    """Find exactly one file matching *pattern* (with {source_key} substituted).

    Raises FileNotFoundError for a missing directory or zero matches, and
    ValueError when the pattern is ambiguous (multiple matches).
    """
    base_dir = _resolve_path(config_dir, directory)
    if not base_dir.exists():
        raise FileNotFoundError(f"Configured directory does not exist: {base_dir}")
    resolved_pattern = pattern.format(source_key=source_key)
    hits = sorted(base_dir.glob(resolved_pattern))
    if len(hits) == 1:
        return hits[0].resolve()
    if not hits:
        raise FileNotFoundError(
            f"No video matched pattern '{resolved_pattern}' inside '{base_dir}' for source_key='{source_key}'."
        )
    match_str = ", ".join(str(match) for match in hits)
    raise ValueError(
        f"Pattern '{resolved_pattern}' for source_key='{source_key}' matched multiple files: {match_str}"
    )
def _normalize_case(
    raw_case: Dict[str, Any],
    raw_config: Dict[str, Any],
    config_dir: Path,
    method_ids: List[str],
) -> Dict[str, Any]:
    """Resolve one raw case entry into absolute, existence-checked video paths.

    A case may either carry explicit reference/method video paths, or rely on
    the config-level reference/method directory + glob patterns keyed by its
    source_key.
    """
    case_id = raw_case["case_id"]
    source_key = raw_case.get("source_key", case_id)
    case_title = raw_case.get("title") or humanize_source_key(source_key)
    if raw_case.get("reference_video") and raw_case.get("method_videos"):
        # Explicit per-case paths.
        reference_video = _resolve_path(config_dir, raw_case["reference_video"]).resolve()
        method_videos = {
            method_id: _resolve_path(config_dir, raw_case["method_videos"][method_id]).resolve()
            for method_id in method_ids
        }
    else:
        # Pattern-based discovery from the config-level directories.
        reference_cfg = raw_config["reference"]
        reference_video = _resolve_single_match(
            config_dir=config_dir,
            directory=reference_cfg["directory"],
            pattern=reference_cfg["glob"],
            source_key=source_key,
        )
        method_videos = {}
        for method_id in method_ids:
            method_cfg = raw_config["methods"][method_id]
            method_videos[method_id] = _resolve_single_match(
                config_dir=config_dir,
                directory=method_cfg["directory"],
                pattern=method_cfg["glob"],
                source_key=source_key,
            )
    # Fail fast so a broken config is caught at load time, not mid-study.
    for required_file in (reference_video, *method_videos.values()):
        if not required_file.exists():
            raise FileNotFoundError(f"Missing video file for case '{case_id}': {required_file}")
    return {
        "case_id": case_id,
        "source_key": source_key,
        "case_title": case_title,
        "reference_video": str(reference_video),
        "method_videos": {method_id: str(path) for method_id, path in method_videos.items()},
    }
def load_study_config(config_path: str | Path) -> Dict[str, Any]:
    """Load, validate, and normalize the study configuration JSON.

    Validates the fixed three-method setup, the pairwise comparison order,
    per-participant sampling limits, and the instruction case; resolves every
    case's video paths. Raises ValueError/FileNotFoundError on invalid config.
    """
    config_path = Path(config_path).resolve()
    config_dir = config_path.parent
    with config_path.open("r", encoding="utf-8") as handle:
        raw_config = json.load(handle)
    if "methods" not in raw_config or "cases" not in raw_config:
        raise ValueError("study_config.json must define both 'methods' and 'cases'.")
    method_ids = list(raw_config["methods"].keys())
    # This project hard-codes the three-method comparison design.
    if set(method_ids) != set(METHOD_FALLBACK_LABELS.keys()):
        raise ValueError(
            "This sample project expects exactly three methods: anyact, vlm_hy_motion, echomotion."
        )
    pair_order = raw_config.get("pair_order", [list(pair) for pair in PAIRWISE_METHOD_PAIRS])
    normalized_pairs: List[Tuple[str, str]] = []
    for raw_pair in pair_order:
        if len(raw_pair) != 2:
            raise ValueError(f"Each pair_order entry must contain exactly two methods: {raw_pair}")
        left, right = raw_pair
        if left not in method_ids or right not in method_ids:
            raise ValueError(f"Unknown method in pair_order: {raw_pair}")
        normalized_pairs.append((left, right))
    methods = {}
    for method_id, method_cfg in raw_config["methods"].items():
        methods[method_id] = {
            "display_name": method_cfg.get("display_name", METHOD_FALLBACK_LABELS[method_id]),
            "directory": method_cfg.get("directory", ""),
            "glob": method_cfg.get("glob", ""),
        }
    cases = [
        _normalize_case(
            raw_case=raw_case,
            raw_config=raw_config,
            config_dir=config_dir,
            method_ids=method_ids,
        )
        for raw_case in raw_config["cases"]
    ]
    # Per-pair sampling limits default to "all cases" when unspecified.
    raw_pair_limits = raw_config.get("per_participant_pair_limits", {})
    pair_sample_limits: Dict[str, int] = {}
    for method_a, method_b in normalized_pairs:
        pair_id = build_pair_id(method_a, method_b)
        raw_limit = raw_pair_limits.get(pair_id, len(cases))
        try:
            limit_value = int(raw_limit)
        except (TypeError, ValueError) as exc:
            raise ValueError(f"Invalid per_participant_pair_limits value for '{pair_id}': {raw_limit}") from exc
        if limit_value <= 0 or limit_value > len(cases):
            raise ValueError(
                f"per_participant_pair_limits['{pair_id}'] must be within [1, {len(cases)}], got {limit_value}."
            )
        pair_sample_limits[pair_id] = limit_value
    # Disjoint sampling assigns each case to at most one pair per participant,
    # which is only feasible when the limits fit inside the case pool.
    disjoint_case_sampling = bool(raw_config.get("disjoint_case_sampling", False))
    if disjoint_case_sampling and sum(pair_sample_limits.values()) > len(cases):
        raise ValueError(
            "disjoint_case_sampling=True requires the sum of per-participant pair limits "
            f"to be <= number of cases ({len(cases)})."
        )
    case_ids = {case["case_id"] for case in cases}
    instruction_case_id = raw_config.get("instruction_case_id")
    if instruction_case_id and instruction_case_id not in case_ids:
        raise ValueError(f"instruction_case_id='{instruction_case_id}' is not present in cases.")
    if not instruction_case_id:
        # Default the tutorial screen to the first case.
        instruction_case_id = cases[0]["case_id"]
    return {
        "study_id": raw_config.get("study_id", "anyact_user_study"),
        "study_title": raw_config.get("study_title", "Human Motion Reenactment User Study"),
        "question_order": raw_config.get("question_order", "shuffle_per_participant"),
        "allow_tie_option": raw_config.get("allow_tie_option", True),
        "pair_order": normalized_pairs,
        "pair_sample_limits": pair_sample_limits,
        "disjoint_case_sampling": disjoint_case_sampling,
        "question_bank_total": len(cases) * len(normalized_pairs),
        "participant_question_total": sum(pair_sample_limits.values()),
        "methods": methods,
        "cases": cases,
        "instruction_case_id": instruction_case_id,
        "config_path": str(config_path),
    }
def get_instruction_case(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return the case shown on the instruction/training screen.

    Raises KeyError when the configured instruction case id is absent.
    """
    target_case_id = config["instruction_case_id"]
    found = next(
        (case for case in config["cases"] if case["case_id"] == target_case_id),
        None,
    )
    if found is None:
        raise KeyError(f"Instruction case '{target_case_id}' was not found.")
    return found
def build_questions(config: Dict[str, Any], participant_id: str) -> List[Dict[str, Any]]:
    """Build the deterministic per-participant question list.

    All randomness (case sampling, A/B slot assignment, question order) is
    seeded from study_id + participant_id, so the same participant always
    gets the same questions across reloads.
    """
    questions: List[Dict[str, Any]] = []
    cases = list(config["cases"])
    pair_case_assignments: Dict[str, List[Dict[str, Any]]] = {}
    if config.get("disjoint_case_sampling"):
        # Disjoint mode: one global shuffle, then consecutive non-overlapping
        # slices so no case appears in two pairs for the same participant.
        shuffled_cases = list(cases)
        assignment_rng = random.Random(stable_int_seed(f"{config['study_id']}::{participant_id}::case_assignment"))
        assignment_rng.shuffle(shuffled_cases)
        cursor = 0
        for method_a, method_b in config["pair_order"]:
            pair_id = build_pair_id(method_a, method_b)
            sample_size = config["pair_sample_limits"][pair_id]
            selected_cases = shuffled_cases[cursor : cursor + sample_size]
            if len(selected_cases) != sample_size:
                raise ValueError(
                    f"Not enough unique cases to assign pair '{pair_id}'. Requested {sample_size}, got {len(selected_cases)}."
                )
            pair_case_assignments[pair_id] = selected_cases
            cursor += sample_size
    else:
        # Independent mode: each pair samples its own (possibly overlapping) cases.
        for method_a, method_b in config["pair_order"]:
            pair_id = build_pair_id(method_a, method_b)
            sample_size = config["pair_sample_limits"][pair_id]
            pair_rng = random.Random(stable_int_seed(f"{config['study_id']}::{participant_id}::{pair_id}::sample"))
            pair_case_assignments[pair_id] = pair_rng.sample(cases, sample_size)
    for method_a, method_b in config["pair_order"]:
        pair_id = build_pair_id(method_a, method_b)
        for case in pair_case_assignments[pair_id]:
            # Per-question coin flip decides which method lands in slot A,
            # seeded so the assignment is stable per participant/case/pair.
            order_rng = random.Random(
                stable_int_seed(f"{config['study_id']}::{participant_id}::{case['case_id']}::{method_a}::{method_b}")
            )
            result_a_method, result_b_method = (method_a, method_b)
            if order_rng.random() < 0.5:
                result_a_method, result_b_method = result_b_method, result_a_method
            questions.append(
                {
                    "case_id": case["case_id"],
                    "case_title": case["case_title"],
                    "source_key": case["source_key"],
                    "pair_id": pair_id,
                    "reference_video": case["reference_video"],
                    "result_a_method": result_a_method,
                    "result_b_method": result_b_method,
                    # Legacy aliases kept in sync with the A/B slots.
                    "left_method": result_a_method,
                    "right_method": result_b_method,
                    "result_a_video": case["method_videos"][result_a_method],
                    "result_b_video": case["method_videos"][result_b_method],
                    "left_video": case["method_videos"][result_a_method],
                    "right_video": case["method_videos"][result_b_method],
                }
            )
    if config["question_order"] == "shuffle_per_participant":
        shuffle_rng = random.Random(stable_int_seed(f"{config['study_id']}::{participant_id}::question_order"))
        shuffle_rng.shuffle(questions)
    # Number questions only after the final order is fixed.
    total_questions = len(questions)
    for index, question in enumerate(questions, start=1):
        question["question_number"] = index
        question["question_id"] = f"Q{index:03d}_{question['case_id']}_{question['pair_id']}"
        question["total_questions"] = total_questions
    return questions
def _state_path(project_root: Path, participant_id: str) -> Path:
    # Live per-participant session state file.
    return get_results_dir(project_root) / "participants" / f"{participant_id}.json"
def _archive_dir(project_root: Path) -> Path:
    # Snapshots of finished/replaced sessions are kept here.
    return get_results_dir(project_root) / "participants_archive"
def _lock_path(project_root: Path) -> Path:
    # Single cross-process lock guarding the results files.
    return get_results_dir(project_root) / "locks" / "results.lock"
def _responses_jsonl_path(project_root: Path) -> Path:
    # Append-only event log of answer records.
    return get_results_dir(project_root) / "responses.jsonl"
def _responses_csv_path(project_root: Path) -> Path:
    # Canonical CSV export regenerated from state files + JSONL log.
    return get_results_dir(project_root) / "responses.csv"
def _read_state_unlocked(project_root: Path, participant_id: str) -> Dict[str, Any] | None:
    """Load a participant's state JSON, or None when no state file exists.

    Caller is expected to hold the results lock.
    """
    path = _state_path(project_root, participant_id)
    if not path.exists():
        return None
    return json.loads(path.read_text(encoding="utf-8"))
| def _atomic_write_json(path: Path, data: Dict[str, Any]) -> None: | |
| temp_path = path.with_suffix(path.suffix + ".tmp") | |
| with temp_path.open("w", encoding="utf-8") as handle: | |
| json.dump(data, handle, ensure_ascii=False, indent=2) | |
| os.replace(temp_path, path) | |
def _write_state_unlocked(project_root: Path, state: Dict[str, Any]) -> None:
    """Persist a participant state dict atomically (caller holds the lock)."""
    _atomic_write_json(_state_path(project_root, state["participant_id"]), state)
def _archive_state_unlocked(project_root: Path, state: Dict[str, Any]) -> None:
    """Snapshot *state* into the archive directory with a timestamped filename."""
    archive_dir = _archive_dir(project_root)
    # ISO timestamps contain ':' and '+', which are unsafe in filenames.
    timestamp = re.sub(r"[^0-9A-Za-z_-]", "-", now_iso())
    filename = f"{state.get('participant_id', 'participant')}__{state.get('study_id', 'study')}__{timestamp}.json"
    _atomic_write_json(archive_dir / filename, state)
def _append_jsonl_unlocked(project_root: Path, payload: Dict[str, Any]) -> None:
    """Append one event record to the responses JSONL log (caller holds the lock)."""
    jsonl_path = _responses_jsonl_path(project_root)
    with jsonl_path.open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(payload, ensure_ascii=False) + "\n")
def _normalize_canonical_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Project an upgraded answer row onto exactly the CSV_COLUMNS schema."""
    source = upgrade_response_row_schema(row)
    normalized: Dict[str, Any] = {}
    for column in CSV_COLUMNS:
        normalized[column] = source.get(column, "")
    return normalized
| def _canonical_row_key(row: Dict[str, Any]) -> Tuple[str, str, str] | None: | |
| participant_id = str(row.get("participant_id", "")).strip() | |
| question_id = str(row.get("question_id", "")).strip() | |
| if not participant_id or not question_id: | |
| return None | |
| study_id = str(row.get("study_id", "")).strip() | |
| return study_id, participant_id, question_id | |
| def _canonical_row_sort_key(row: Dict[str, Any]) -> Tuple[str, str, str]: | |
| return ( | |
| str(row.get("event_saved_at") or row.get("answered_at") or ""), | |
| str(row.get("updated_at") or ""), | |
| str(row.get("answered_at") or ""), | |
| ) | |
def _merge_canonical_rows(*row_groups: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Deduplicate rows across sources, keeping the most recent copy of each.

    Rows without a usable identity key are dropped. Because the comparison is
    ``>=``, an equally-new row from a later group replaces the earlier one.
    The result is sorted deterministically for export.
    """
    merged_rows: Dict[Tuple[str, str, str], Tuple[Tuple[str, str, str], Dict[str, Any]]] = {}
    for rows in row_groups:
        for row in rows:
            row_key = _canonical_row_key(row)
            if row_key is None:
                continue
            sort_key = _canonical_row_sort_key(row)
            previous = merged_rows.get(row_key)
            if previous is None or sort_key >= previous[0]:
                merged_rows[row_key] = (sort_key, _normalize_canonical_row(row))
    canonical_rows = [payload for _, payload in merged_rows.values()]
    canonical_rows.sort(
        key=lambda row: (row.get("answered_at", ""), row.get("participant_id", ""), row.get("question_id", ""))
    )
    return canonical_rows
def _load_canonical_rows_from_jsonl_unlocked(project_root: Path) -> List[Dict[str, Any]]:
    """Replay the JSONL event log into the latest canonical row per question.

    Caller holds the results lock. Returns rows sorted deterministically.
    """
    jsonl_path = _responses_jsonl_path(project_root)
    if not jsonl_path.exists() or jsonl_path.stat().st_size <= 0:
        return []
    latest_rows: Dict[Tuple[str, str, str], Tuple[Tuple[str, str, str], Dict[str, Any]]] = {}
    with jsonl_path.open("r", encoding="utf-8") as handle:
        for line in handle:
            if not line.strip():
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                # A truncated trailing line should not make the whole study unreadable.
                continue
            row_key = _canonical_row_key(record)
            if row_key is None:
                continue
            sort_key = _canonical_row_sort_key(record)
            previous = latest_rows.get(row_key)
            # ">=" keeps the last-appended record among equally-timestamped events.
            if previous is None or sort_key >= previous[0]:
                latest_rows[row_key] = (sort_key, _normalize_canonical_row(record))
    rows = [payload for _, payload in latest_rows.values()]
    rows.sort(key=lambda row: (row.get("answered_at", ""), row.get("participant_id", ""), row.get("question_id", "")))
    return rows
def _load_canonical_rows_from_state_files_unlocked(project_root: Path) -> List[Dict[str, Any]]:
    """Collect answer rows from live and archived participant state files.

    Caller holds the results lock. Returns rows sorted deterministically;
    duplicates across live/archive are resolved later by _merge_canonical_rows.
    """
    rows: List[Dict[str, Any]] = []
    state_dirs = [
        get_results_dir(project_root) / "participants",
        _archive_dir(project_root),
    ]
    for state_dir in state_dirs:
        for state_path in sorted(state_dir.glob("*.json")):
            with state_path.open("r", encoding="utf-8") as handle:
                state = json.load(handle)
            for row in state.get("answers", {}).values():
                rows.append(_normalize_canonical_row(row))
    rows.sort(key=lambda row: (row.get("answered_at", ""), row.get("participant_id", ""), row.get("question_id", "")))
    return rows
def _all_canonical_rows_unlocked(project_root: Path) -> List[Dict[str, Any]]:
    """Collect the canonical, deduplicated answer rows from all persisted sources.

    Rows come from participant state files (live + archive) and the JSONL
    event log; _merge_canonical_rows keeps the newest copy of each row.
    """
    state_rows = _load_canonical_rows_from_state_files_unlocked(project_root)
    jsonl_rows = _load_canonical_rows_from_jsonl_unlocked(project_root)
    # _merge_canonical_rows already returns [] for empty input, so the former
    # "if merged_rows: ... else return []" branching was redundant.
    return _merge_canonical_rows(state_rows, jsonl_rows)
def _export_csv_unlocked(project_root: Path) -> None:
    """Atomically rewrite the aggregated responses CSV from canonical rows.

    Writes to a sibling ``.tmp`` file and swaps it into place with
    ``os.replace`` so readers never observe a half-written CSV. The temp
    file is removed if the export fails midway (the original leaked it).
    Keys not listed in ``CSV_COLUMNS`` are ignored so schema drift in a
    stored row cannot crash the export (DictWriter raises by default).

    Caller must already hold the study file lock.
    """
    csv_path = _responses_csv_path(project_root)
    temp_path = csv_path.with_suffix(".tmp")
    rows = _all_canonical_rows_unlocked(project_root)
    try:
        with temp_path.open("w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(handle, fieldnames=CSV_COLUMNS, extrasaction="ignore")
            writer.writeheader()
            writer.writerows(rows)
        os.replace(temp_path, csv_path)
    except BaseException:
        # Never leave a stale partial temp file behind on failure.
        temp_path.unlink(missing_ok=True)
        raise
def get_current_question(state: Dict[str, Any]) -> Dict[str, Any]:
    """Return the question dict the participant is currently viewing."""
    index = state["current_index"]
    return state["questions"][index]
def question_stable_key(question: Dict[str, Any]) -> str:
    """Return a config-independent identity key for a question (case + pair).

    Used to carry answers across config reloads, where question ids may change
    but the underlying case/pair combination does not.
    """
    case_part = question["case_id"]
    pair_part = question["pair_id"]
    return f"{case_part}::{pair_part}"
def refresh_state_video_paths(state: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    """Re-point question and answer rows at the current config's case data.

    Both the pending questions and the already-saved answer rows are upgraded
    in place via ``upgrade_response_row_schema``; rows whose case no longer
    exists in the config are left untouched. Returns the (mutated) state.
    """
    cases_by_id = {case["case_id"]: case for case in config["cases"]}
    all_rows = list(state.get("questions", [])) + list(state.get("answers", {}).values())
    for row in all_rows:
        matching_case = cases_by_id.get(row.get("case_id"))
        if not matching_case:
            continue
        upgrade_response_row_schema(row, case=matching_case)
    return state
def sync_state_with_config(state: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    """Reconcile a stored participant state with the current study config.

    Rebuilds the question list from ``config``, carries previously saved
    answers over to matching questions (matched via the stable
    ``case_id::pair_id`` key rather than the question id, which may change
    between config versions), restores the cursor to the first unanswered
    question, and re-derives completion status from the carried answers.

    Returns the mutated ``state`` with refreshed video paths.
    """
    new_questions = build_questions(config=config, participant_id=state["participant_id"])
    old_questions = state.get("questions", [])
    old_answers = state.get("answers", {})
    case_lookup = {case["case_id"]: case for case in config["cases"]}
    # Remember which question the participant was viewing, keyed by the
    # config-independent stable key so it survives reordering.
    old_current_key = None
    if old_questions:
        # Clamp the stored index in case the old question list shrank.
        old_index = min(max(int(state.get("current_index", 0)), 0), len(old_questions) - 1)
        old_current_key = question_stable_key(old_questions[old_index])
    old_questions_by_key = {
        question_stable_key(question): question
        for question in old_questions
        if question.get("case_id") and question.get("pair_id")
    }
    old_rows_by_key = {
        question_stable_key(answer_row): answer_row
        for answer_row in old_answers.values()
        if answer_row.get("case_id") and answer_row.get("pair_id")
    }
    synced_answers: Dict[str, Dict[str, Any]] = {}
    for question in new_questions:
        stable_key = question_stable_key(question)
        case = case_lookup.get(question["case_id"])
        previous_question = old_questions_by_key.get(stable_key)
        if previous_question:
            # Keep the participant's original A/B method assignment so a
            # config reload cannot silently swap which method is in which slot.
            question["result_a_method"] = previous_question.get("result_a_method") or previous_question.get("left_method")
            question["result_b_method"] = previous_question.get("result_b_method") or previous_question.get("right_method")
        _sync_result_slot_fields(question, case=case)
        previous_row = old_rows_by_key.get(stable_key)
        if not previous_row:
            continue
        # Re-stamp the carried-over answer with the new question's ids and
        # layout while preserving the participant's recorded choices.
        upgraded_row = {
            **previous_row,
            "study_id": config["study_id"],
            "study_title": config["study_title"],
            "question_id": question["question_id"],
            "question_position": question["question_number"],
            "total_questions": question["total_questions"],
            "case_id": question["case_id"],
            "case_title": question["case_title"],
            "source_key": question["source_key"],
            "pair_id": question["pair_id"],
            "result_a_method": question["result_a_method"],
            "result_b_method": question["result_b_method"],
            "left_method": question["left_method"],
            "right_method": question["right_method"],
            "reference_video": question["reference_video"],
            "result_a_video": question["result_a_video"],
            "result_b_video": question["result_b_video"],
            "left_video": question["left_video"],
            "right_video": question["right_video"],
        }
        synced_answers[question["question_id"]] = upgrade_response_row_schema(upgraded_row, case=case)
    current_index = 0
    if new_questions:
        # Soft-restore the previously viewed question when it still exists.
        if old_current_key is not None:
            matched_index = next(
                (index for index, question in enumerate(new_questions) if question_stable_key(question) == old_current_key),
                None,
            )
            if matched_index is not None:
                current_index = matched_index
        # Jump to the first question that has no carried-over answer; when
        # everything is answered, park the cursor on the last question.
        first_unanswered_index = next(
            (
                index
                for index, question in enumerate(new_questions)
                if question["question_id"] not in synced_answers
            ),
            None,
        )
        if first_unanswered_index is not None:
            current_index = first_unanswered_index
        else:
            current_index = len(new_questions) - 1
    state["study_id"] = config["study_id"]
    state["study_title"] = config["study_title"]
    state["questions"] = new_questions
    state["answers"] = synced_answers
    state["current_index"] = current_index
    # Completion status is re-derived: all questions answered == completed.
    if new_questions and len(synced_answers) == len(new_questions):
        state["completed_at"] = state.get("completed_at") or now_iso()
        state["status"] = "completed"
        state["current_question_started_at"] = None
    else:
        state["completed_at"] = None
        state["status"] = "in_progress"
        state["current_question_started_at"] = time.time()
    return refresh_state_video_paths(state, config)
def _upgrade_state_schema(state: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a deep copy of ``state`` migrated to the current schema.

    The copy is stamped with the current study title and has its video
    paths refreshed against the current config; the input is not mutated.
    """
    migrated = copy.deepcopy(state)
    migrated["study_title"] = config["study_title"]
    return refresh_state_video_paths(migrated, config)
def upgrade_existing_results_schema(project_root: Path, config: Dict[str, Any]) -> None:
    """Migrate every stored participant and archive state file to the current schema.

    Runs under the study-wide file lock, rewrites only files whose upgraded
    form actually differs, skips files that fail to parse, and finishes by
    regenerating the aggregated CSV export.
    """
    ensure_runtime_dirs(project_root)
    state_dirs = (
        get_results_dir(project_root) / "participants",
        _archive_dir(project_root),
    )
    with FileLock(str(_lock_path(project_root))):
        for state_dir in state_dirs:
            for state_path in sorted(state_dir.glob("*.json")):
                try:
                    state = json.loads(state_path.read_text(encoding="utf-8"))
                except json.JSONDecodeError:
                    # Leave corrupt files untouched rather than failing the migration.
                    continue
                migrated = _upgrade_state_schema(state, config)
                if migrated != state:
                    _atomic_write_json(state_path, migrated)
        _export_csv_unlocked(project_root)
def _new_participant_state(
    config: Dict[str, Any],
    participant_id: str,
    session_hash: str,
    user_agent: str,
) -> Dict[str, Any]:
    """Build a brand-new in-progress session state for a participant.

    Single source of truth for the state schema (the original duplicated
    this dict literal in two branches of ``create_or_resume_participant``).
    """
    timestamp = now_iso()
    return {
        "participant_id": participant_id,
        "consent": True,
        "study_id": config["study_id"],
        "study_title": config["study_title"],
        "created_at": timestamp,
        "started_at": timestamp,
        "updated_at": timestamp,
        "completed_at": None,
        "status": "in_progress",
        "session_hash": session_hash,
        "user_agent": user_agent,
        "current_index": 0,
        "current_question_started_at": time.time(),
        "questions": build_questions(config=config, participant_id=participant_id),
        "answers": {},
    }
def create_or_resume_participant(
    project_root: Path,
    config: Dict[str, Any],
    participant_id: str | None,
    request: Any = None,
) -> Tuple[Dict[str, Any], str]:
    """Create a fresh session for ``participant_id`` or resume a stored one.

    Returns ``(state, status)`` where status is ``"started"``, ``"resumed"``,
    or ``"completed"``. A stored state that belongs to a different study id is
    archived (when it has answers) and replaced with a fresh session. When no
    usable participant id is supplied, one is generated.
    """
    ensure_runtime_dirs(project_root)
    participant_id = sanitize_participant_id(participant_id)
    if not participant_id:
        participant_id = generate_participant_id()
    # Best-effort request metadata; ``request`` may be None (e.g. tests/CLI).
    session_hash = getattr(request, "session_hash", "") if request is not None else ""
    user_agent = request.headers.get("user-agent", "") if request is not None and getattr(request, "headers", None) else ""
    with FileLock(str(_lock_path(project_root))):
        existing_state = _read_state_unlocked(project_root, participant_id)
        if existing_state:
            if existing_state.get("study_id") != config["study_id"]:
                # Stored session belongs to a different study: preserve its
                # answers in the archive (if any) and start over.
                if existing_state.get("answers"):
                    _archive_state_unlocked(project_root, existing_state)
                fresh_state = _new_participant_state(config, participant_id, session_hash, user_agent)
                _write_state_unlocked(project_root, fresh_state)
                return fresh_state, "started"
            existing_state = sync_state_with_config(existing_state, config)
            if existing_state.get("completed_at"):
                # Already submitted: refresh metadata only, do not restart timing.
                existing_state["session_hash"] = session_hash or existing_state.get("session_hash", "")
                existing_state["user_agent"] = user_agent or existing_state.get("user_agent", "")
                existing_state["updated_at"] = now_iso()
                _write_state_unlocked(project_root, existing_state)
                return existing_state, "completed"
            existing_state["session_hash"] = session_hash or existing_state.get("session_hash", "")
            existing_state["user_agent"] = user_agent or existing_state.get("user_agent", "")
            existing_state["study_title"] = config["study_title"]
            existing_state["updated_at"] = now_iso()
            # Restart the per-question timer for the resumed question.
            existing_state["current_question_started_at"] = time.time()
            _write_state_unlocked(project_root, existing_state)
            return existing_state, "resumed"
        state = _new_participant_state(config, participant_id, session_hash, user_agent)
        _write_state_unlocked(project_root, state)
        return state, "started"
def move_question_pointer(
    project_root: Path,
    participant_id: str,
    question_token: str | None,
    direction: str,
) -> Tuple[Dict[str, Any], str]:
    """Move the participant's question cursor (only "previous" moves it).

    Returns ``(state, message)``; a non-empty message signals a stale page
    token or an already-submitted session. The per-question timer is reset
    and the state persisted on every successful call.

    Raises:
        ValueError: when no session exists for ``participant_id``.
    """
    with FileLock(str(_lock_path(project_root))):
        state = _read_state_unlocked(project_root, participant_id)
        if state is None:
            raise ValueError("Participant session could not be found.")
        if state.get("completed_at"):
            return state, "This study session has already been submitted."
        shown_question = get_current_question(state)
        is_stale = bool(question_token) and shown_question["question_id"] != question_token
        if is_stale:
            # The page the click came from is outdated; surface the real state.
            return state, "A newer page state was already loaded. Restored the latest progress."
        if direction == "previous" and state["current_index"] > 0:
            state["current_index"] -= 1
        state["current_question_started_at"] = time.time()
        state["updated_at"] = now_iso()
        _write_state_unlocked(project_root, state)
        return state, ""
def _build_response_row(
    state: Dict[str, Any],
    question: Dict[str, Any],
    answer_similarity: str,
    answer_quality: str,
    answer_preference: str,
    duration_seconds: float,
) -> Dict[str, Any]:
    """Assemble one canonical answer row for the current question.

    The row snapshots the question layout (which method/video sits in each
    slot) together with the participant's three normalized choices and the
    timing/session metadata, then runs it through the schema upgrader.
    """
    answered_at = now_iso()
    identity = {
        "participant_id": state["participant_id"],
        "consent": state.get("consent", True),
        "study_id": state["study_id"],
        "study_title": state["study_title"],
        "question_id": question["question_id"],
        "question_position": question["question_number"],
        "total_questions": question["total_questions"],
        "case_id": question["case_id"],
        "case_title": question["case_title"],
        "source_key": question["source_key"],
        "pair_id": question["pair_id"],
    }
    # Slot layout; older questions may lack result_* fields, so fall back
    # to the legacy left/right fields.
    layout = {
        "result_a_method": question.get("result_a_method") or question.get("left_method"),
        "result_b_method": question.get("result_b_method") or question.get("right_method"),
        "left_method": question["left_method"],
        "right_method": question["right_method"],
        "reference_video": question["reference_video"],
        "result_a_video": question.get("result_a_video") or question.get("left_video"),
        "result_b_video": question.get("result_b_video") or question.get("right_video"),
        "left_video": question["left_video"],
        "right_video": question["right_video"],
    }
    choices = {
        "answer_similarity": normalize_choice_value(answer_similarity),
        "answer_quality": normalize_choice_value(answer_quality),
        "answer_preference": normalize_choice_value(answer_preference),
    }
    meta = {
        "answered_at": answered_at,
        "duration_seconds": round(duration_seconds, 3),
        "session_hash": state.get("session_hash", ""),
        "user_agent": state.get("user_agent", ""),
        "started_at": state.get("started_at", ""),
        "updated_at": answered_at,
    }
    return upgrade_response_row_schema({**identity, **layout, **choices, **meta})
def save_current_answer(
    project_root: Path,
    participant_id: str,
    question_token: str,
    answer_similarity: str,
    answer_quality: str,
    answer_preference: str,
    action: str,
) -> Tuple[Dict[str, Any], str, str]:
    """Persist the answers for the current question and advance or finalize.

    ``action`` is ``"next"`` (advance when not on the last question) or
    ``"submit"``. Returns ``(state, message, status)`` with status one of
    ``"advanced"``, ``"completed"``, or ``"stale"``. Every save appends an
    event to the JSONL log and regenerates the CSV export.

    Raises:
        ValueError: for an unsupported action or a missing session.
    """
    if action not in {"next", "submit"}:
        raise ValueError(f"Unsupported action: {action}")
    with FileLock(str(_lock_path(project_root))):
        state = _read_state_unlocked(project_root, participant_id)
        if state is None:
            raise ValueError("Participant session could not be found.")
        if state.get("completed_at"):
            return state, "This study session has already been submitted.", "completed"
        current_question = get_current_question(state)
        if current_question["question_id"] != question_token:
            # The click came from an outdated page; don't overwrite newer state.
            return state, "A newer page state was already loaded. Restored the latest progress.", "stale"
        # Missing start timestamp yields 0.0 rather than a bogus duration.
        elapsed = max(0.0, time.time() - float(state.get("current_question_started_at") or time.time()))
        previous_row = state["answers"].get(question_token)
        response_row = _build_response_row(
            state=state,
            question=current_question,
            answer_similarity=answer_similarity,
            answer_quality=answer_quality,
            answer_preference=answer_preference,
            duration_seconds=elapsed,
        )
        state["answers"][question_token] = response_row
        state["updated_at"] = response_row["answered_at"]
        event_type = "answer_updated" if previous_row else "answer_saved"
        if action == "next" and state["current_index"] < len(state["questions"]) - 1:
            # More questions remain: advance and restart the per-question timer.
            state["current_index"] += 1
            state["current_question_started_at"] = time.time()
            status = "advanced"
            message = "Response saved."
        else:
            # Explicit submit, or "next" on the final question — both finalize
            # the session (the original duplicated this branch verbatim).
            state["completed_at"] = response_row["answered_at"]
            state["status"] = "completed"
            state["current_question_started_at"] = None
            status = "completed"
            message = "All responses have been submitted."
        _write_state_unlocked(project_root, state)
        _append_jsonl_unlocked(
            project_root,
            {
                "event_type": event_type,
                "event_saved_at": response_row["answered_at"],
                **response_row,
            },
        )
        _export_csv_unlocked(project_root)
        return state, message, status
def build_question_payload(state: Dict[str, Any]) -> Dict[str, Any]:
    """Build the UI payload for the question the participant is viewing.

    Includes progress markup, the reference/candidate video paths (with
    legacy left/right fallbacks), any previously saved answers for the
    question, and visibility flags for the previous/next/submit buttons.
    """
    question = get_current_question(state)
    answers = state.get("answers", {})
    saved = answers.get(question["question_id"], {})
    number = question["question_number"]
    total = question["total_questions"]
    progress_html = (
        f"<div class='progress-chip'>Question {number} / {total}</div>"
        f"<div class='meta-line'>Participant ID: <code>{state['participant_id']}</code></div>"
        f"<div class='meta-line'>Saved responses: {len(answers)} / {total}</div>"
    )
    return {
        "question_token": question["question_id"],
        "progress_markdown": progress_html,
        "instruction_markdown": (
            "Watch the reference clip and both anonymous candidates before answering all three questions."
        ),
        "reference_video": question["reference_video"],
        "result_a_video": question.get("result_a_video") or question["left_video"],
        "result_b_video": question.get("result_b_video") or question["right_video"],
        "left_video": question["left_video"],
        "right_video": question["right_video"],
        "answer_similarity": normalize_choice_value(saved.get("answer_similarity")),
        "answer_quality": normalize_choice_value(saved.get("answer_quality")),
        "answer_preference": normalize_choice_value(saved.get("answer_preference")),
        "show_previous": number > 1,
        "show_next": number < total,
        "show_submit": number == total,
    }
def build_completion_markdown(state: Dict[str, Any]) -> str:
    """Render the final thank-you markdown shown after the study is submitted.

    Falls back to the current time when ``completed_at`` is missing.
    """
    completed_at = state.get("completed_at") or now_iso()
    total_questions = len(state.get("questions", []))
    answered_count = len(state.get("answers", {}))
    return f"""
## Thank you for completing the study.
Your responses have been saved successfully.
- Participant ID: `{state["participant_id"]}`
- Saved answers: `{answered_count} / {total_questions}`
- Completed at: `{completed_at}`
You may now close this page.
""".strip()