from __future__ import annotations import csv import copy import hashlib import json import os import random import re import subprocess import time import uuid from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Tuple from filelock import FileLock try: import imageio_ffmpeg except ImportError: # pragma: no cover - optional runtime dependency imageio_ffmpeg = None METHOD_FALLBACK_LABELS = { "anyact": "AnyAct (ours)", "vlm_hy_motion": "VLM+HY-Motion", "echomotion": "EchoMotion", } PAIRWISE_METHOD_PAIRS: List[Tuple[str, str]] = [ ("anyact", "vlm_hy_motion"), ("anyact", "echomotion"), ] CHOICE_OPTIONS = ["ResultA", "ResultB"] CHOICE_RESULT_A = "ResultA" CHOICE_RESULT_B = "ResultB" CHOICE_TIE = "Tie" CSV_COLUMNS = [ "participant_id", "consent", "study_id", "study_title", "question_id", "question_position", "total_questions", "case_id", "case_title", "source_key", "pair_id", "result_a_method", "result_b_method", "left_method", "right_method", "reference_video", "result_a_video", "result_b_video", "left_video", "right_video", "answer_similarity", "answer_similarity_method", "answer_similarity_video", "answer_quality", "answer_quality_method", "answer_quality_video", "answer_preference", "answer_preference_method", "answer_preference_video", "answered_at", "duration_seconds", "session_hash", "user_agent", "started_at", "updated_at", ] def now_iso() -> str: return datetime.now().astimezone().isoformat(timespec="seconds") def get_results_dir(project_root: Path) -> Path: explicit_dir = os.environ.get("USER_STUDY_RESULTS_DIR", "").strip() if explicit_dir: return Path(explicit_dir).expanduser().resolve() if os.environ.get("SPACE_ID"): space_data_dir = Path("/data") if space_data_dir.exists(): return (space_data_dir / "user_study_results").resolve() return (project_root / "results").resolve() def ensure_runtime_dirs(project_root: Path) -> None: results_dir = get_results_dir(project_root) for path in [ results_dir, results_dir / "participants", results_dir / "participants_archive", results_dir / "plots", results_dir / "locks", ]: path.mkdir(parents=True, exist_ok=True) responses_csv = results_dir / "responses.csv" if not responses_csv.exists(): with responses_csv.open("w", newline="", encoding="utf-8") as handle: writer = csv.DictWriter(handle, fieldnames=CSV_COLUMNS) writer.writeheader() responses_jsonl = results_dir / "responses.jsonl" responses_jsonl.touch(exist_ok=True) def generate_participant_id() -> str: return str(uuid.uuid4()) def sanitize_participant_id(raw_value: str | None) -> str: cleaned = re.sub(r"[^A-Za-z0-9_-]", "_", (raw_value or "").strip()) return cleaned[:80] def humanize_source_key(source_key: str) -> str: return source_key.replace("_", " ").strip().title() def stable_int_seed(text: str) -> int: digest = hashlib.sha256(text.encode("utf-8")).hexdigest() return int(digest[:16], 16) def build_pair_id(method_a: str, method_b: str) -> str: return f"{method_a}_vs_{method_b}" def normalize_choice_value(raw_value: Any) -> str: if raw_value is None: return "" cleaned = str(raw_value).strip() if not cleaned: return "" compact = re.sub(r"[\s_-]+", "", cleaned).lower() if compact in {"left", "resulta", "a"}: return CHOICE_RESULT_A if compact in {"right", "resultb", "b"}: return CHOICE_RESULT_B if compact in {"tie", "equal", "same"}: return CHOICE_TIE return cleaned def _sync_result_slot_fields(row: Dict[str, Any], case: Dict[str, Any] | None = None) -> Dict[str, Any]: result_a_method = str(row.get("result_a_method") or row.get("left_method") or "").strip() result_b_method = str(row.get("result_b_method") or row.get("right_method") or "").strip() result_a_video = str(row.get("result_a_video") or row.get("left_video") or "").strip() result_b_video = str(row.get("result_b_video") or row.get("right_video") or "").strip() if case is not None: method_videos = case.get("method_videos", {}) if result_a_method in method_videos: result_a_video = str(method_videos[result_a_method]) if result_b_method in method_videos: result_b_video = str(method_videos[result_b_method]) if case.get("reference_video"): row["reference_video"] = case["reference_video"] row["result_a_method"] = result_a_method row["result_b_method"] = result_b_method row["left_method"] = result_a_method row["right_method"] = result_b_method row["result_a_video"] = result_a_video row["result_b_video"] = result_b_video row["left_video"] = result_a_video row["right_video"] = result_b_video return row def _resolve_choice_targets(row: Dict[str, Any], raw_choice: Any) -> tuple[str, str]: normalized_choice = normalize_choice_value(raw_choice) if normalized_choice == CHOICE_RESULT_A: return ( str(row.get("result_a_method") or row.get("left_method") or "").strip(), str(row.get("result_a_video") or row.get("left_video") or "").strip(), ) if normalized_choice == CHOICE_RESULT_B: return ( str(row.get("result_b_method") or row.get("right_method") or "").strip(), str(row.get("result_b_video") or row.get("right_video") or "").strip(), ) return "", "" def upgrade_response_row_schema(row: Dict[str, Any], case: Dict[str, Any] | None = None) -> Dict[str, Any]: upgraded = row _sync_result_slot_fields(upgraded, case=case) for metric_key in ["answer_similarity", "answer_quality", "answer_preference"]: normalized_choice = normalize_choice_value(upgraded.get(metric_key)) if normalized_choice: upgraded[metric_key] = normalized_choice elif metric_key not in upgraded: upgraded[metric_key] = "" selected_method, selected_video = _resolve_choice_targets(upgraded, upgraded.get(metric_key)) upgraded[f"{metric_key}_method"] = selected_method upgraded[f"{metric_key}_video"] = selected_video return upgraded def _web_video_cache_dir(project_root: Path) -> Path: cache_dir = get_results_dir(project_root) / "web_video_cache" cache_dir.mkdir(parents=True, exist_ok=True) return cache_dir def _thumbnail_cache_dir(project_root: Path) -> Path: cache_dir = get_results_dir(project_root) / "thumbnail_cache" cache_dir.mkdir(parents=True, exist_ok=True) return cache_dir def _synced_video_cache_dir(project_root: Path) -> Path: cache_dir = get_results_dir(project_root) / "synced_video_cache" cache_dir.mkdir(parents=True, exist_ok=True) return cache_dir def _probe_video_stream(video_path: Path) -> Dict[str, str]: if imageio_ffmpeg is None: return {} ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe() result = subprocess.run( [ffmpeg_exe, "-i", str(video_path)], capture_output=True, text=True, encoding="utf-8", errors="ignore", ) stderr_text = result.stderr or "" match = re.search(r"Video:\s*([^\s,(]+).*?(yuv[a-zA-Z0-9]+)?", stderr_text) if not match: return {} codec_name = (match.group(1) or "").strip().lower() pixel_format = (match.group(2) or "").strip().lower() return { "codec_name": codec_name, "pixel_format": pixel_format, } def _parse_duration_to_seconds(duration_text: str) -> float: hours, minutes, seconds = duration_text.split(":") return int(hours) * 3600 + int(minutes) * 60 + float(seconds) def _probe_video_timing(video_path: Path) -> Dict[str, float]: if imageio_ffmpeg is None: return {} ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe() result = subprocess.run( [ffmpeg_exe, "-i", str(video_path)], capture_output=True, text=True, encoding="utf-8", errors="ignore", ) stderr_text = result.stderr or "" duration_match = re.search(r"Duration:\s*(\d+:\d+:\d+(?:\.\d+)?)", stderr_text) fps_match = re.search(r"(\d+(?:\.\d+)?)\s+fps", stderr_text) if fps_match is None: fps_match = re.search(r"(\d+(?:\.\d+)?)\s+tbr", stderr_text) metadata: Dict[str, float] = {} if duration_match: metadata["duration_seconds"] = _parse_duration_to_seconds(duration_match.group(1)) if fps_match: metadata["fps"] = float(fps_match.group(1)) return metadata def _format_ffmpeg_fps(value: float) -> str: rounded = round(value) if abs(value - rounded) < 1e-6: return str(int(rounded)) return f"{value:.3f}".rstrip("0").rstrip(".") def _sync_single_video_to_duration( source_path: Path, target_path: Path, target_duration: float, target_fps: float, ) -> None: ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe() source_timing = _probe_video_timing(source_path) source_duration = float(source_timing.get("duration_seconds", 0.0) or 0.0) pad_duration = max(0.0, target_duration - source_duration) fps_literal = _format_ffmpeg_fps(target_fps) filter_graph = ( f"fps={fps_literal}," f"tpad=stop_mode=clone:stop_duration={pad_duration:.6f}," f"trim=duration={target_duration:.6f}," "setpts=PTS-STARTPTS" ) command = [ ffmpeg_exe, "-y", "-i", str(source_path), "-an", "-vf", filter_graph, "-c:v", "libx264", "-preset", "veryfast", "-pix_fmt", "yuv420p", "-movflags", "+faststart", str(target_path), ] result = subprocess.run( command, capture_output=True, text=True, encoding="utf-8", errors="ignore", ) if result.returncode != 0 or not target_path.exists(): raise RuntimeError( f"Failed to create synchronized study video: {source_path}\n{result.stderr}" ) def ensure_synchronized_study_videos( reference_video: str, left_video: str, right_video: str, project_root: Path, target_fps: float = 30.0, ) -> Dict[str, str]: """ Create browser-playable synchronized copies for the three study videos. The shorter videos are padded by cloning their last frame so that all three outputs share the same fps and total duration. If synchronization fails for any reason, the original paths are returned to keep the study app usable. """ raw_paths = { "reference_video": Path(reference_video).resolve(), "left_video": Path(left_video).resolve(), "right_video": Path(right_video).resolve(), } if imageio_ffmpeg is None or not all(path.exists() for path in raw_paths.values()): return {key: str(path) for key, path in raw_paths.items()} try: durations = [] for path in raw_paths.values(): timing = _probe_video_timing(path) durations.append(float(timing.get("duration_seconds", 0.0) or 0.0)) target_duration = max(durations) if target_duration <= 0: return {key: str(path) for key, path in raw_paths.items()} cache_dir = _synced_video_cache_dir(project_root) signature = hashlib.sha1( "::".join( [ "sync_v1", f"fps={_format_ffmpeg_fps(target_fps)}", *( f"{path.as_posix()}::{path.stat().st_mtime_ns}::{path.stat().st_size}" for path in raw_paths.values() ), ] ).encode("utf-8") ).hexdigest()[:16] trio_dir = cache_dir / signature lock_path = trio_dir.with_suffix(".lock") with FileLock(str(lock_path)): trio_dir.mkdir(parents=True, exist_ok=True) output_paths = { "reference_video": trio_dir / "reference.mp4", "left_video": trio_dir / "left.mp4", "right_video": trio_dir / "right.mp4", } ready = all(path.exists() and path.stat().st_size > 0 for path in output_paths.values()) if not ready: for key, source_path in raw_paths.items(): _sync_single_video_to_duration( source_path=source_path, target_path=output_paths[key], target_duration=target_duration, target_fps=target_fps, ) return {key: str(path) for key, path in output_paths.items()} except Exception as exc: print(f"[warn] Falling back to original study videos because sync generation failed: {exc}") return {key: str(path) for key, path in raw_paths.items()} def ensure_web_playable_video(video_path: str, project_root: Path) -> str: source_path = Path(video_path).resolve() if not source_path.exists() or imageio_ffmpeg is None: return str(source_path) stream_info = _probe_video_stream(source_path) if ( source_path.suffix.lower() == ".mp4" and stream_info.get("codec_name") == "h264" and (not stream_info.get("pixel_format") or stream_info.get("pixel_format") == "yuv420p") ): return str(source_path) cache_dir = _web_video_cache_dir(project_root) signature = hashlib.sha1( f"{source_path.as_posix()}::{source_path.stat().st_mtime_ns}::{source_path.stat().st_size}".encode("utf-8") ).hexdigest()[:12] target_path = cache_dir / f"{source_path.stem}_{signature}.mp4" lock_path = target_path.with_suffix(".lock") with FileLock(str(lock_path)): if target_path.exists() and target_path.stat().st_size > 0: return str(target_path) ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe() command = [ ffmpeg_exe, "-y", "-i", str(source_path), "-an", "-c:v", "libx264", "-pix_fmt", "yuv420p", "-movflags", "+faststart", str(target_path), ] result = subprocess.run( command, capture_output=True, text=True, encoding="utf-8", errors="ignore", ) if result.returncode != 0 or not target_path.exists(): raise RuntimeError( f"Failed to convert video for browser playback: {source_path}\n{result.stderr}" ) return str(target_path) def prepare_reference_videos_for_web(config: Dict[str, Any], project_root: Path) -> Dict[str, Any]: for case in config.get("cases", []): case["reference_video"] = ensure_web_playable_video(case["reference_video"], project_root) return config def ensure_video_thumbnail( video_path: str, project_root: Path, time_seconds: float = 0.8, width: int = 480, ) -> str: source_path = Path(video_path).resolve() if not source_path.exists() or imageio_ffmpeg is None: return "" cache_dir = _thumbnail_cache_dir(project_root) signature = hashlib.sha1( f"{source_path.as_posix()}::{source_path.stat().st_mtime_ns}::{source_path.stat().st_size}::{time_seconds}::{width}".encode( "utf-8" ) ).hexdigest()[:12] target_path = cache_dir / f"{source_path.stem}_{signature}.jpg" lock_path = target_path.with_suffix(".lock") with FileLock(str(lock_path)): if target_path.exists() and target_path.stat().st_size > 0: return str(target_path) ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe() command = [ ffmpeg_exe, "-y", "-ss", str(time_seconds), "-i", str(source_path), "-frames:v", "1", "-vf", f"scale={width}:-1", "-q:v", "2", str(target_path), ] result = subprocess.run( command, capture_output=True, text=True, encoding="utf-8", errors="ignore", ) if result.returncode != 0 or not target_path.exists(): raise RuntimeError( f"Failed to extract thumbnail from video: {source_path}\n{result.stderr}" ) return str(target_path) def _resolve_path(config_dir: Path, raw_path: str) -> Path: path = Path(raw_path) if path.is_absolute(): return path return (config_dir / path).resolve() def _resolve_single_match(config_dir: Path, directory: str, pattern: str, source_key: str) -> Path: base_dir = _resolve_path(config_dir, directory) if not base_dir.exists(): raise FileNotFoundError(f"Configured directory does not exist: {base_dir}") resolved_pattern = pattern.format(source_key=source_key) matches = sorted(base_dir.glob(resolved_pattern)) if not matches: raise FileNotFoundError( f"No video matched pattern '{resolved_pattern}' inside '{base_dir}' for source_key='{source_key}'." ) if len(matches) > 1: match_str = ", ".join(str(match) for match in matches) raise ValueError( f"Pattern '{resolved_pattern}' for source_key='{source_key}' matched multiple files: {match_str}" ) return matches[0].resolve() def _normalize_case( raw_case: Dict[str, Any], raw_config: Dict[str, Any], config_dir: Path, method_ids: List[str], ) -> Dict[str, Any]: case_id = raw_case["case_id"] source_key = raw_case.get("source_key", case_id) case_title = raw_case.get("title") or humanize_source_key(source_key) if raw_case.get("reference_video") and raw_case.get("method_videos"): reference_video = _resolve_path(config_dir, raw_case["reference_video"]).resolve() method_videos = { method_id: _resolve_path(config_dir, raw_case["method_videos"][method_id]).resolve() for method_id in method_ids } else: reference_cfg = raw_config["reference"] reference_video = _resolve_single_match( config_dir=config_dir, directory=reference_cfg["directory"], pattern=reference_cfg["glob"], source_key=source_key, ) method_videos = {} for method_id in method_ids: method_cfg = raw_config["methods"][method_id] method_videos[method_id] = _resolve_single_match( config_dir=config_dir, directory=method_cfg["directory"], pattern=method_cfg["glob"], source_key=source_key, ) missing_files = [reference_video, *method_videos.values()] for path in missing_files: if not path.exists(): raise FileNotFoundError(f"Missing video file for case '{case_id}': {path}") return { "case_id": case_id, "source_key": source_key, "case_title": case_title, "reference_video": str(reference_video), "method_videos": {method_id: str(path) for method_id, path in method_videos.items()}, } def load_study_config(config_path: str | Path) -> Dict[str, Any]: config_path = Path(config_path).resolve() config_dir = config_path.parent with config_path.open("r", encoding="utf-8") as handle: raw_config = json.load(handle) if "methods" not in raw_config or "cases" not in raw_config: raise ValueError("study_config.json must define both 'methods' and 'cases'.") method_ids = list(raw_config["methods"].keys()) if set(method_ids) != set(METHOD_FALLBACK_LABELS.keys()): raise ValueError( "This sample project expects exactly three methods: anyact, vlm_hy_motion, echomotion." ) pair_order = raw_config.get("pair_order", [list(pair) for pair in PAIRWISE_METHOD_PAIRS]) normalized_pairs: List[Tuple[str, str]] = [] for raw_pair in pair_order: if len(raw_pair) != 2: raise ValueError(f"Each pair_order entry must contain exactly two methods: {raw_pair}") left, right = raw_pair if left not in method_ids or right not in method_ids: raise ValueError(f"Unknown method in pair_order: {raw_pair}") normalized_pairs.append((left, right)) methods = {} for method_id, method_cfg in raw_config["methods"].items(): methods[method_id] = { "display_name": method_cfg.get("display_name", METHOD_FALLBACK_LABELS[method_id]), "directory": method_cfg.get("directory", ""), "glob": method_cfg.get("glob", ""), } cases = [ _normalize_case( raw_case=raw_case, raw_config=raw_config, config_dir=config_dir, method_ids=method_ids, ) for raw_case in raw_config["cases"] ] raw_pair_limits = raw_config.get("per_participant_pair_limits", {}) pair_sample_limits: Dict[str, int] = {} for method_a, method_b in normalized_pairs: pair_id = build_pair_id(method_a, method_b) raw_limit = raw_pair_limits.get(pair_id, len(cases)) try: limit_value = int(raw_limit) except (TypeError, ValueError) as exc: raise ValueError(f"Invalid per_participant_pair_limits value for '{pair_id}': {raw_limit}") from exc if limit_value <= 0 or limit_value > len(cases): raise ValueError( f"per_participant_pair_limits['{pair_id}'] must be within [1, {len(cases)}], got {limit_value}." ) pair_sample_limits[pair_id] = limit_value disjoint_case_sampling = bool(raw_config.get("disjoint_case_sampling", False)) if disjoint_case_sampling and sum(pair_sample_limits.values()) > len(cases): raise ValueError( "disjoint_case_sampling=True requires the sum of per-participant pair limits " f"to be <= number of cases ({len(cases)})." ) case_ids = {case["case_id"] for case in cases} instruction_case_id = raw_config.get("instruction_case_id") if instruction_case_id and instruction_case_id not in case_ids: raise ValueError(f"instruction_case_id='{instruction_case_id}' is not present in cases.") if not instruction_case_id: instruction_case_id = cases[0]["case_id"] return { "study_id": raw_config.get("study_id", "anyact_user_study"), "study_title": raw_config.get("study_title", "Human Motion Reenactment User Study"), "question_order": raw_config.get("question_order", "shuffle_per_participant"), "allow_tie_option": raw_config.get("allow_tie_option", True), "pair_order": normalized_pairs, "pair_sample_limits": pair_sample_limits, "disjoint_case_sampling": disjoint_case_sampling, "question_bank_total": len(cases) * len(normalized_pairs), "participant_question_total": sum(pair_sample_limits.values()), "methods": methods, "cases": cases, "instruction_case_id": instruction_case_id, "config_path": str(config_path), } def get_instruction_case(config: Dict[str, Any]) -> Dict[str, Any]: target_case_id = config["instruction_case_id"] for case in config["cases"]: if case["case_id"] == target_case_id: return case raise KeyError(f"Instruction case '{target_case_id}' was not found.") def build_questions(config: Dict[str, Any], participant_id: str) -> List[Dict[str, Any]]: questions: List[Dict[str, Any]] = [] cases = list(config["cases"]) pair_case_assignments: Dict[str, List[Dict[str, Any]]] = {} if config.get("disjoint_case_sampling"): shuffled_cases = list(cases) assignment_rng = random.Random(stable_int_seed(f"{config['study_id']}::{participant_id}::case_assignment")) assignment_rng.shuffle(shuffled_cases) cursor = 0 for method_a, method_b in config["pair_order"]: pair_id = build_pair_id(method_a, method_b) sample_size = config["pair_sample_limits"][pair_id] selected_cases = shuffled_cases[cursor : cursor + sample_size] if len(selected_cases) != sample_size: raise ValueError( f"Not enough unique cases to assign pair '{pair_id}'. Requested {sample_size}, got {len(selected_cases)}." ) pair_case_assignments[pair_id] = selected_cases cursor += sample_size else: for method_a, method_b in config["pair_order"]: pair_id = build_pair_id(method_a, method_b) sample_size = config["pair_sample_limits"][pair_id] pair_rng = random.Random(stable_int_seed(f"{config['study_id']}::{participant_id}::{pair_id}::sample")) pair_case_assignments[pair_id] = pair_rng.sample(cases, sample_size) for method_a, method_b in config["pair_order"]: pair_id = build_pair_id(method_a, method_b) for case in pair_case_assignments[pair_id]: order_rng = random.Random( stable_int_seed(f"{config['study_id']}::{participant_id}::{case['case_id']}::{method_a}::{method_b}") ) result_a_method, result_b_method = (method_a, method_b) if order_rng.random() < 0.5: result_a_method, result_b_method = result_b_method, result_a_method questions.append( { "case_id": case["case_id"], "case_title": case["case_title"], "source_key": case["source_key"], "pair_id": pair_id, "reference_video": case["reference_video"], "result_a_method": result_a_method, "result_b_method": result_b_method, "left_method": result_a_method, "right_method": result_b_method, "result_a_video": case["method_videos"][result_a_method], "result_b_video": case["method_videos"][result_b_method], "left_video": case["method_videos"][result_a_method], "right_video": case["method_videos"][result_b_method], } ) if config["question_order"] == "shuffle_per_participant": shuffle_rng = random.Random(stable_int_seed(f"{config['study_id']}::{participant_id}::question_order")) shuffle_rng.shuffle(questions) total_questions = len(questions) for index, question in enumerate(questions, start=1): question["question_number"] = index question["question_id"] = f"Q{index:03d}_{question['case_id']}_{question['pair_id']}" question["total_questions"] = total_questions return questions def _state_path(project_root: Path, participant_id: str) -> Path: return get_results_dir(project_root) / "participants" / f"{participant_id}.json" def _archive_dir(project_root: Path) -> Path: return get_results_dir(project_root) / "participants_archive" def _lock_path(project_root: Path) -> Path: return get_results_dir(project_root) / "locks" / "results.lock" def _responses_jsonl_path(project_root: Path) -> Path: return get_results_dir(project_root) / "responses.jsonl" def _responses_csv_path(project_root: Path) -> Path: return get_results_dir(project_root) / "responses.csv" def _read_state_unlocked(project_root: Path, participant_id: str) -> Dict[str, Any] | None: path = _state_path(project_root, participant_id) if not path.exists(): return None with path.open("r", encoding="utf-8") as handle: return json.load(handle) def _atomic_write_json(path: Path, data: Dict[str, Any]) -> None: temp_path = path.with_suffix(path.suffix + ".tmp") with temp_path.open("w", encoding="utf-8") as handle: json.dump(data, handle, ensure_ascii=False, indent=2) os.replace(temp_path, path) def _write_state_unlocked(project_root: Path, state: Dict[str, Any]) -> None: _atomic_write_json(_state_path(project_root, state["participant_id"]), state) def _archive_state_unlocked(project_root: Path, state: Dict[str, Any]) -> None: archive_dir = _archive_dir(project_root) timestamp = re.sub(r"[^0-9A-Za-z_-]", "-", now_iso()) filename = f"{state.get('participant_id', 'participant')}__{state.get('study_id', 'study')}__{timestamp}.json" _atomic_write_json(archive_dir / filename, state) def _append_jsonl_unlocked(project_root: Path, payload: Dict[str, Any]) -> None: jsonl_path = _responses_jsonl_path(project_root) with jsonl_path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(payload, ensure_ascii=False) + "\n") def _normalize_canonical_row(row: Dict[str, Any]) -> Dict[str, Any]: upgraded_row = upgrade_response_row_schema(row) return {column: upgraded_row.get(column, "") for column in CSV_COLUMNS} def _canonical_row_key(row: Dict[str, Any]) -> Tuple[str, str, str] | None: participant_id = str(row.get("participant_id", "")).strip() question_id = str(row.get("question_id", "")).strip() if not participant_id or not question_id: return None study_id = str(row.get("study_id", "")).strip() return study_id, participant_id, question_id def _canonical_row_sort_key(row: Dict[str, Any]) -> Tuple[str, str, str]: return ( str(row.get("event_saved_at") or row.get("answered_at") or ""), str(row.get("updated_at") or ""), str(row.get("answered_at") or ""), ) def _merge_canonical_rows(*row_groups: List[Dict[str, Any]]) -> List[Dict[str, Any]]: merged_rows: Dict[Tuple[str, str, str], Tuple[Tuple[str, str, str], Dict[str, Any]]] = {} for rows in row_groups: for row in rows: row_key = _canonical_row_key(row) if row_key is None: continue sort_key = _canonical_row_sort_key(row) previous = merged_rows.get(row_key) if previous is None or sort_key >= previous[0]: merged_rows[row_key] = (sort_key, _normalize_canonical_row(row)) canonical_rows = [payload for _, payload in merged_rows.values()] canonical_rows.sort( key=lambda row: (row.get("answered_at", ""), row.get("participant_id", ""), row.get("question_id", "")) ) return canonical_rows def _load_canonical_rows_from_jsonl_unlocked(project_root: Path) -> List[Dict[str, Any]]: jsonl_path = _responses_jsonl_path(project_root) if not jsonl_path.exists() or jsonl_path.stat().st_size <= 0: return [] latest_rows: Dict[Tuple[str, str, str], Tuple[Tuple[str, str, str], Dict[str, Any]]] = {} with jsonl_path.open("r", encoding="utf-8") as handle: for line in handle: if not line.strip(): continue try: record = json.loads(line) except json.JSONDecodeError: # A truncated trailing line should not make the whole study unreadable. continue row_key = _canonical_row_key(record) if row_key is None: continue sort_key = _canonical_row_sort_key(record) previous = latest_rows.get(row_key) if previous is None or sort_key >= previous[0]: latest_rows[row_key] = (sort_key, _normalize_canonical_row(record)) rows = [payload for _, payload in latest_rows.values()] rows.sort(key=lambda row: (row.get("answered_at", ""), row.get("participant_id", ""), row.get("question_id", ""))) return rows def _load_canonical_rows_from_state_files_unlocked(project_root: Path) -> List[Dict[str, Any]]: rows: List[Dict[str, Any]] = [] state_dirs = [ get_results_dir(project_root) / "participants", _archive_dir(project_root), ] for state_dir in state_dirs: for state_path in sorted(state_dir.glob("*.json")): with state_path.open("r", encoding="utf-8") as handle: state = json.load(handle) for row in state.get("answers", {}).values(): rows.append(_normalize_canonical_row(row)) rows.sort(key=lambda row: (row.get("answered_at", ""), row.get("participant_id", ""), row.get("question_id", ""))) return rows def _all_canonical_rows_unlocked(project_root: Path) -> List[Dict[str, Any]]: state_rows = _load_canonical_rows_from_state_files_unlocked(project_root) jsonl_rows = _load_canonical_rows_from_jsonl_unlocked(project_root) merged_rows = _merge_canonical_rows(state_rows, jsonl_rows) if merged_rows: return merged_rows return [] def _export_csv_unlocked(project_root: Path) -> None: csv_path = _responses_csv_path(project_root) temp_path = csv_path.with_suffix(".tmp") rows = _all_canonical_rows_unlocked(project_root) with temp_path.open("w", newline="", encoding="utf-8") as handle: writer = csv.DictWriter(handle, fieldnames=CSV_COLUMNS) writer.writeheader() for row in rows: writer.writerow(row) os.replace(temp_path, csv_path) def get_current_question(state: Dict[str, Any]) -> Dict[str, Any]: return state["questions"][state["current_index"]] def question_stable_key(question: Dict[str, Any]) -> str: return f"{question['case_id']}::{question['pair_id']}" def refresh_state_video_paths(state: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]: case_lookup = {case["case_id"]: case for case in config["cases"]} for question in state.get("questions", []): case = case_lookup.get(question.get("case_id")) if not case: continue upgrade_response_row_schema(question, case=case) for answer in state.get("answers", {}).values(): case = case_lookup.get(answer.get("case_id")) if not case: continue upgrade_response_row_schema(answer, case=case) return state def sync_state_with_config(state: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]: new_questions = build_questions(config=config, participant_id=state["participant_id"]) old_questions = state.get("questions", []) old_answers = state.get("answers", {}) case_lookup = {case["case_id"]: case for case in config["cases"]} old_current_key = None if old_questions: old_index = min(max(int(state.get("current_index", 0)), 0), len(old_questions) - 1) old_current_key = question_stable_key(old_questions[old_index]) old_questions_by_key = { question_stable_key(question): question for question in old_questions if question.get("case_id") and question.get("pair_id") } old_rows_by_key = { question_stable_key(answer_row): answer_row for answer_row in old_answers.values() if answer_row.get("case_id") and answer_row.get("pair_id") } synced_answers: Dict[str, Dict[str, Any]] = {} for question in new_questions: stable_key = question_stable_key(question) case = case_lookup.get(question["case_id"]) previous_question = old_questions_by_key.get(stable_key) if previous_question: question["result_a_method"] = previous_question.get("result_a_method") or previous_question.get("left_method") question["result_b_method"] = previous_question.get("result_b_method") or previous_question.get("right_method") _sync_result_slot_fields(question, case=case) previous_row = old_rows_by_key.get(stable_key) if not previous_row: continue upgraded_row = { **previous_row, "study_id": config["study_id"], "study_title": config["study_title"], "question_id": question["question_id"], "question_position": question["question_number"], "total_questions": question["total_questions"], "case_id": question["case_id"], "case_title": question["case_title"], "source_key": question["source_key"], "pair_id": question["pair_id"], "result_a_method": question["result_a_method"], "result_b_method": question["result_b_method"], "left_method": question["left_method"], "right_method": question["right_method"], "reference_video": question["reference_video"], "result_a_video": question["result_a_video"], "result_b_video": question["result_b_video"], "left_video": question["left_video"], "right_video": question["right_video"], } synced_answers[question["question_id"]] = upgrade_response_row_schema(upgraded_row, case=case) current_index = 0 if new_questions: if old_current_key is not None: matched_index = next( (index for index, question in enumerate(new_questions) if question_stable_key(question) == old_current_key), None, ) if matched_index is not None: current_index = matched_index first_unanswered_index = next( ( index for index, question in enumerate(new_questions) if question["question_id"] not in synced_answers ), None, ) if first_unanswered_index is not None: current_index = first_unanswered_index else: current_index = len(new_questions) - 1 state["study_id"] = config["study_id"] state["study_title"] = config["study_title"] state["questions"] = new_questions state["answers"] = synced_answers state["current_index"] = current_index if new_questions and len(synced_answers) == len(new_questions): state["completed_at"] = state.get("completed_at") or now_iso() state["status"] = "completed" state["current_question_started_at"] = None else: state["completed_at"] = None state["status"] = "in_progress" state["current_question_started_at"] = time.time() return refresh_state_video_paths(state, config) def _upgrade_state_schema(state: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]: upgraded_state = copy.deepcopy(state) upgraded_state["study_title"] = config["study_title"] return refresh_state_video_paths(upgraded_state, config) def upgrade_existing_results_schema(project_root: Path, config: Dict[str, Any]) -> None: ensure_runtime_dirs(project_root) state_dirs = [ get_results_dir(project_root) / "participants", _archive_dir(project_root), ] with FileLock(str(_lock_path(project_root))): for state_dir in state_dirs: for state_path in sorted(state_dir.glob("*.json")): try: with state_path.open("r", encoding="utf-8") as handle: state = json.load(handle) except json.JSONDecodeError: continue upgraded_state = _upgrade_state_schema(state, config) if upgraded_state != state: _atomic_write_json(state_path, upgraded_state) _export_csv_unlocked(project_root) def create_or_resume_participant( project_root: Path, config: Dict[str, Any], participant_id: str | None, request: Any = None, ) -> Tuple[Dict[str, Any], str]: ensure_runtime_dirs(project_root) participant_id = sanitize_participant_id(participant_id) if not participant_id: participant_id = generate_participant_id() session_hash = getattr(request, "session_hash", "") if request is not None else "" user_agent = request.headers.get("user-agent", "") if request is not None and getattr(request, "headers", None) else "" with FileLock(str(_lock_path(project_root))): existing_state = _read_state_unlocked(project_root, participant_id) if existing_state: if existing_state.get("study_id") != config["study_id"]: if existing_state.get("answers"): _archive_state_unlocked(project_root, existing_state) timestamp = now_iso() fresh_state = { "participant_id": participant_id, "consent": True, "study_id": config["study_id"], "study_title": config["study_title"], "created_at": timestamp, "started_at": timestamp, "updated_at": timestamp, "completed_at": None, "status": "in_progress", "session_hash": session_hash, "user_agent": user_agent, "current_index": 0, "current_question_started_at": time.time(), "questions": build_questions(config=config, participant_id=participant_id), "answers": {}, } _write_state_unlocked(project_root, fresh_state) return fresh_state, "started" existing_state = sync_state_with_config(existing_state, config) if existing_state.get("completed_at"): existing_state["session_hash"] = session_hash or existing_state.get("session_hash", "") existing_state["user_agent"] = user_agent or existing_state.get("user_agent", "") existing_state["updated_at"] = now_iso() _write_state_unlocked(project_root, existing_state) return existing_state, "completed" existing_state["session_hash"] = session_hash or existing_state.get("session_hash", "") existing_state["user_agent"] = user_agent or existing_state.get("user_agent", "") existing_state["study_title"] = config["study_title"] existing_state["updated_at"] = now_iso() existing_state["current_question_started_at"] = time.time() _write_state_unlocked(project_root, existing_state) return existing_state, "resumed" timestamp = now_iso() state = { "participant_id": participant_id, "consent": True, "study_id": config["study_id"], "study_title": config["study_title"], "created_at": timestamp, "started_at": timestamp, "updated_at": timestamp, "completed_at": None, "status": "in_progress", "session_hash": session_hash, "user_agent": user_agent, "current_index": 0, "current_question_started_at": time.time(), "questions": build_questions(config=config, participant_id=participant_id), "answers": {}, } _write_state_unlocked(project_root, state) return state, "started" def move_question_pointer( project_root: Path, participant_id: str, question_token: str | None, direction: str, ) -> Tuple[Dict[str, Any], str]: with FileLock(str(_lock_path(project_root))): state = _read_state_unlocked(project_root, participant_id) if state is None: raise ValueError("Participant session could not be found.") if state.get("completed_at"): return state, "This study session has already been submitted." current_question = get_current_question(state) if question_token and current_question["question_id"] != question_token: return state, "A newer page state was already loaded. Restored the latest progress." if direction == "previous" and state["current_index"] > 0: state["current_index"] -= 1 state["current_question_started_at"] = time.time() state["updated_at"] = now_iso() _write_state_unlocked(project_root, state) return state, "" def _build_response_row( state: Dict[str, Any], question: Dict[str, Any], answer_similarity: str, answer_quality: str, answer_preference: str, duration_seconds: float, ) -> Dict[str, Any]: timestamp = now_iso() response_row = { "participant_id": state["participant_id"], "consent": state.get("consent", True), "study_id": state["study_id"], "study_title": state["study_title"], "question_id": question["question_id"], "question_position": question["question_number"], "total_questions": question["total_questions"], "case_id": question["case_id"], "case_title": question["case_title"], "source_key": question["source_key"], "pair_id": question["pair_id"], "result_a_method": question.get("result_a_method") or question.get("left_method"), "result_b_method": question.get("result_b_method") or question.get("right_method"), "left_method": question["left_method"], "right_method": question["right_method"], "reference_video": question["reference_video"], "result_a_video": question.get("result_a_video") or question.get("left_video"), "result_b_video": question.get("result_b_video") or question.get("right_video"), "left_video": question["left_video"], "right_video": question["right_video"], "answer_similarity": normalize_choice_value(answer_similarity), "answer_quality": normalize_choice_value(answer_quality), "answer_preference": normalize_choice_value(answer_preference), "answered_at": timestamp, "duration_seconds": round(duration_seconds, 3), "session_hash": state.get("session_hash", ""), "user_agent": state.get("user_agent", ""), "started_at": state.get("started_at", ""), "updated_at": timestamp, } return upgrade_response_row_schema(response_row) def save_current_answer( project_root: Path, participant_id: str, question_token: str, answer_similarity: str, answer_quality: str, answer_preference: str, action: str, ) -> Tuple[Dict[str, Any], str, str]: if action not in {"next", "submit"}: raise ValueError(f"Unsupported action: {action}") with FileLock(str(_lock_path(project_root))): state = _read_state_unlocked(project_root, participant_id) if state is None: raise ValueError("Participant session could not be found.") if state.get("completed_at"): return state, "This study session has already been submitted.", "completed" current_question = get_current_question(state) if current_question["question_id"] != question_token: return state, "A newer page state was already loaded. Restored the latest progress.", "stale" elapsed = max(0.0, time.time() - float(state.get("current_question_started_at") or time.time())) previous_row = state["answers"].get(question_token) response_row = _build_response_row( state=state, question=current_question, answer_similarity=answer_similarity, answer_quality=answer_quality, answer_preference=answer_preference, duration_seconds=elapsed, ) state["answers"][question_token] = response_row state["updated_at"] = response_row["answered_at"] event_type = "answer_updated" if previous_row else "answer_saved" if action == "next": if state["current_index"] < len(state["questions"]) - 1: state["current_index"] += 1 state["current_question_started_at"] = time.time() status = "advanced" message = "Response saved." else: state["completed_at"] = response_row["answered_at"] state["status"] = "completed" state["current_question_started_at"] = None status = "completed" message = "All responses have been submitted." else: state["completed_at"] = response_row["answered_at"] state["status"] = "completed" state["current_question_started_at"] = None status = "completed" message = "All responses have been submitted." _write_state_unlocked(project_root, state) _append_jsonl_unlocked( project_root, { "event_type": event_type, "event_saved_at": response_row["answered_at"], **response_row, }, ) _export_csv_unlocked(project_root) return state, message, status def build_question_payload(state: Dict[str, Any]) -> Dict[str, Any]: question = get_current_question(state) saved_answers = state.get("answers", {}).get(question["question_id"], {}) answered_count = len(state.get("answers", {})) return { "question_token": question["question_id"], "progress_markdown": ( f"