# anyact-user-study / study_utils.py
# csuvla's picture
# Upload app code and config
# 9d416e4 verified
from __future__ import annotations
import csv
import copy
import hashlib
import json
import os
import random
import re
import subprocess
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Tuple
from filelock import FileLock
try:
import imageio_ffmpeg
except ImportError: # pragma: no cover - optional runtime dependency
imageio_ffmpeg = None
# Display labels used when the study config gives a method no "display_name".
METHOD_FALLBACK_LABELS = dict(
    anyact="AnyAct (ours)",
    vlm_hy_motion="VLM+HY-Motion",
    echomotion="EchoMotion",
)

# Default head-to-head comparisons, used when the config has no "pair_order".
PAIRWISE_METHOD_PAIRS: List[Tuple[str, str]] = [
    ("anyact", baseline) for baseline in ("vlm_hy_motion", "echomotion")
]

# Canonical answer values stored for every metric.
CHOICE_RESULT_A = "ResultA"
CHOICE_RESULT_B = "ResultB"
CHOICE_TIE = "Tie"
CHOICE_OPTIONS = [CHOICE_RESULT_A, CHOICE_RESULT_B]

# Column order of the exported responses.csv; rows are projected onto these keys.
CSV_COLUMNS = [
    # participant and study identification
    *("participant_id", "consent", "study_id", "study_title"),
    # question identification
    *("question_id", "question_position", "total_questions"),
    *("case_id", "case_title", "source_key", "pair_id"),
    # which method / video occupied each slot (left/right are legacy aliases)
    *("result_a_method", "result_b_method", "left_method", "right_method"),
    *("reference_video", "result_a_video", "result_b_video", "left_video", "right_video"),
    # per-metric answer plus the method / video the answer resolves to
    *("answer_similarity", "answer_similarity_method", "answer_similarity_video"),
    *("answer_quality", "answer_quality_method", "answer_quality_video"),
    *("answer_preference", "answer_preference_method", "answer_preference_video"),
    # timing and session bookkeeping
    *("answered_at", "duration_seconds", "session_hash", "user_agent"),
    *("started_at", "updated_at"),
]
def now_iso() -> str:
    """Return the current local time as an ISO-8601 string with UTC offset, to the second."""
    local_now = datetime.now().astimezone()
    return local_now.isoformat(timespec="seconds")
def get_results_dir(project_root: Path) -> Path:
    """Return the absolute directory where study results are stored.

    Resolution order:
      1. the ``USER_STUDY_RESULTS_DIR`` environment variable, when non-blank;
      2. ``/data/user_study_results`` when ``SPACE_ID`` is set and ``/data`` exists;
      3. ``<project_root>/results`` otherwise.
    """
    override = os.environ.get("USER_STUDY_RESULTS_DIR", "").strip()
    if override:
        return Path(override).expanduser().resolve()

    persistent_root = Path("/data")
    if os.environ.get("SPACE_ID") and persistent_root.exists():
        return (persistent_root / "user_study_results").resolve()

    return (project_root / "results").resolve()
def ensure_runtime_dirs(project_root: Path) -> None:
    """Create the results directory tree and empty response files if they are missing.

    An existing ``responses.csv`` is left untouched; a new one receives only the
    header row. ``responses.jsonl`` is created empty when absent.
    """
    root = get_results_dir(project_root)
    subdirectories = ("participants", "participants_archive", "plots", "locks")
    for directory in (root, *(root / name for name in subdirectories)):
        directory.mkdir(parents=True, exist_ok=True)

    csv_file = root / "responses.csv"
    if not csv_file.exists():
        with csv_file.open("w", newline="", encoding="utf-8") as handle:
            csv.DictWriter(handle, fieldnames=CSV_COLUMNS).writeheader()

    (root / "responses.jsonl").touch(exist_ok=True)
def generate_participant_id() -> str:
    """Return a fresh random participant identifier (the string form of a UUID4)."""
    fresh_id = uuid.uuid4()
    return str(fresh_id)
def sanitize_participant_id(raw_value: str | None) -> str:
    """Return a filesystem-safe participant id.

    Surrounding whitespace is stripped, every character outside
    ``[A-Za-z0-9_-]`` becomes ``_``, and the result is capped at 80 characters.
    ``None`` or an empty value yields ``""``.
    """
    text = raw_value.strip() if raw_value else ""
    return re.sub(r"[^A-Za-z0-9_-]", "_", text)[:80]
def humanize_source_key(source_key: str) -> str:
    """Turn an underscore-separated key into a title-cased, space-separated label."""
    spaced = " ".join(source_key.split("_"))
    return spaced.strip().title()
def stable_int_seed(text: str) -> int:
    """Derive a process-independent 64-bit integer seed from *text*.

    The seed is the first 8 bytes of the SHA-256 digest of the UTF-8 encoded
    text, read as a big-endian unsigned integer.
    """
    leading_bytes = hashlib.sha256(text.encode("utf-8")).digest()[:8]
    return int.from_bytes(leading_bytes, "big")
def build_pair_id(method_a: str, method_b: str) -> str:
    """Return the identifier of a method comparison, ``"<method_a>_vs_<method_b>"``."""
    return "{}_vs_{}".format(method_a, method_b)
def normalize_choice_value(raw_value: Any) -> str:
    """Map a stored or submitted choice onto its canonical spelling.

    Whitespace, underscores and hyphens are ignored and matching is
    case-insensitive. Recognised aliases map to ``CHOICE_RESULT_A``,
    ``CHOICE_RESULT_B`` or ``CHOICE_TIE``. ``None`` and blank values give ``""``;
    any other value is returned stripped but otherwise unchanged.
    """
    text = "" if raw_value is None else str(raw_value).strip()
    if not text:
        return ""

    token = re.sub(r"[\s_-]+", "", text).lower()
    alias_table = (
        (("left", "resulta", "a"), CHOICE_RESULT_A),
        (("right", "resultb", "b"), CHOICE_RESULT_B),
        (("tie", "equal", "same"), CHOICE_TIE),
    )
    for aliases, canonical in alias_table:
        if token in aliases:
            return canonical
    return text
def _sync_result_slot_fields(row: Dict[str, Any], case: Dict[str, Any] | None = None) -> Dict[str, Any]:
    """Make the ``result_a/b_*`` fields and their legacy ``left/right_*`` aliases agree.

    The ``result_*`` value wins when present; otherwise the legacy value is used.
    When *case* is given, video paths are refreshed from ``case["method_videos"]``
    for methods listed there, and a truthy ``case["reference_video"]`` overwrites
    the row's reference video. *row* is modified in place and also returned.
    """

    def _first_text(primary_key: str, legacy_key: str) -> str:
        return str(row.get(primary_key) or row.get(legacy_key) or "").strip()

    method_a = _first_text("result_a_method", "left_method")
    method_b = _first_text("result_b_method", "right_method")
    video_a = _first_text("result_a_video", "left_video")
    video_b = _first_text("result_b_video", "right_video")

    if case is not None:
        videos_by_method = case.get("method_videos", {})
        if method_a in videos_by_method:
            video_a = str(videos_by_method[method_a])
        if method_b in videos_by_method:
            video_b = str(videos_by_method[method_b])
        if case.get("reference_video"):
            row["reference_video"] = case["reference_video"]

    # Keyword order matters: it fixes the insertion order of keys new to the row.
    row.update(
        result_a_method=method_a,
        result_b_method=method_b,
        left_method=method_a,
        right_method=method_b,
        result_a_video=video_a,
        result_b_video=video_b,
        left_video=video_a,
        right_video=video_b,
    )
    return row
def _resolve_choice_targets(row: Dict[str, Any], raw_choice: Any) -> tuple[str, str]:
    """Return the ``(method, video)`` that a choice refers to within *row*.

    The choice is normalized first. Result A / Result B resolve to the row's
    ``result_a_*`` / ``result_b_*`` fields, falling back to the legacy
    ``left_*`` / ``right_*`` fields. Any other choice gives ``("", "")``.
    """
    slot_fields = {
        CHOICE_RESULT_A: (("result_a_method", "left_method"), ("result_a_video", "left_video")),
        CHOICE_RESULT_B: (("result_b_method", "right_method"), ("result_b_video", "right_video")),
    }
    fields = slot_fields.get(normalize_choice_value(raw_choice))
    if fields is None:
        return "", ""

    (method_key, legacy_method_key), (video_key, legacy_video_key) = fields
    chosen_method = str(row.get(method_key) or row.get(legacy_method_key) or "").strip()
    chosen_video = str(row.get(video_key) or row.get(legacy_video_key) or "").strip()
    return chosen_method, chosen_video
def upgrade_response_row_schema(row: Dict[str, Any], case: Dict[str, Any] | None = None) -> Dict[str, Any]:
    """Bring a question or answer row up to the current schema, in place.

    Slot fields are synchronized (optionally against *case*), each metric answer
    is rewritten to its canonical choice value, and the derived
    ``<metric>_method`` / ``<metric>_video`` fields are recomputed. Callers in
    this module rely on the in-place mutation; the same dict is also returned.
    """
    _sync_result_slot_fields(row, case=case)

    for metric_key in ("answer_similarity", "answer_quality", "answer_preference"):
        canonical = normalize_choice_value(row.get(metric_key))
        if canonical:
            row[metric_key] = canonical
        else:
            # A present-but-blank value is kept as is; only a missing key is filled.
            row.setdefault(metric_key, "")

        method_name, video_path = _resolve_choice_targets(row, row.get(metric_key))
        row[f"{metric_key}_method"] = method_name
        row[f"{metric_key}_video"] = video_path

    return row
def _web_video_cache_dir(project_root: Path) -> Path:
    """Return the cache directory for browser-playable transcodes, creating it if needed."""
    target = get_results_dir(project_root).joinpath("web_video_cache")
    target.mkdir(parents=True, exist_ok=True)
    return target
def _thumbnail_cache_dir(project_root: Path) -> Path:
    """Return the cache directory for extracted video thumbnails, creating it if needed."""
    target = get_results_dir(project_root).joinpath("thumbnail_cache")
    target.mkdir(parents=True, exist_ok=True)
    return target
def _synced_video_cache_dir(project_root: Path) -> Path:
    """Return the cache directory for duration-synchronized video trios, creating it if needed."""
    target = get_results_dir(project_root).joinpath("synced_video_cache")
    target.mkdir(parents=True, exist_ok=True)
    return target
def _probe_video_stream(video_path: Path) -> Dict[str, str]:
    """Return the codec name and pixel format of the first video stream.

    The information is scraped from the stderr banner of ``ffmpeg -i <path>``.

    Returns:
        ``{"codec_name": ..., "pixel_format": ...}`` with lower-cased values, or
        ``{}`` when ``imageio_ffmpeg`` is unavailable or no ``Video:`` stream
        line is found. ``pixel_format`` is ``""`` when no ``yuv*`` format token
        appears on the stream line.
    """
    if imageio_ffmpeg is None:
        return {}
    ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
    result = subprocess.run(
        [ffmpeg_exe, "-i", str(video_path)],
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="ignore",
    )
    stderr_text = result.stderr or ""
    # The pixel format sits later on the same line as the codec, e.g.
    #   "Video: h264 (High) (avc1 / 0x31637661), yuv420p(tv, bt709), 1280x720".
    # The lazy wildcard must be inside the optional group: written as
    # ".*?(yuv...)?" the optional group always matches empty and never captures.
    match = re.search(r"Video:\s*([^\s,(]+)(?:.*?\b(yuv[a-zA-Z0-9]+))?", stderr_text)
    if not match:
        return {}
    codec_name = (match.group(1) or "").strip().lower()
    pixel_format = (match.group(2) or "").strip().lower()
    return {
        "codec_name": codec_name,
        "pixel_format": pixel_format,
    }
def _parse_duration_to_seconds(duration_text: str) -> float:
    """Convert an ``HH:MM:SS[.fff]`` duration string into seconds.

    Raises:
        ValueError: if the text does not have exactly three ``:``-separated
            numeric fields.
    """
    hour_part, minute_part, second_part = duration_text.split(":")
    whole_seconds = 3600 * int(hour_part) + 60 * int(minute_part)
    return whole_seconds + float(second_part)
def _probe_video_timing(video_path: Path) -> Dict[str, float]:
    """Return the duration and frame rate parsed from ``ffmpeg -i <path>``.

    The result may contain ``"duration_seconds"`` and/or ``"fps"``; a key is
    omitted when its value cannot be found in ffmpeg's stderr output. The frame
    rate falls back to the ``tbr`` value when no ``fps`` value is printed.
    Returns ``{}`` when ``imageio_ffmpeg`` is unavailable.
    """
    if imageio_ffmpeg is None:
        return {}

    probe = subprocess.run(
        [imageio_ffmpeg.get_ffmpeg_exe(), "-i", str(video_path)],
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="ignore",
    )
    banner = probe.stderr or ""

    timing: Dict[str, float] = {}

    duration_hit = re.search(r"Duration:\s*(\d+:\d+:\d+(?:\.\d+)?)", banner)
    if duration_hit:
        timing["duration_seconds"] = _parse_duration_to_seconds(duration_hit.group(1))

    fps_hit = re.search(r"(\d+(?:\.\d+)?)\s+fps", banner) or re.search(r"(\d+(?:\.\d+)?)\s+tbr", banner)
    if fps_hit:
        timing["fps"] = float(fps_hit.group(1))

    return timing
def _format_ffmpeg_fps(value: float) -> str:
    """Format a frame rate for an ffmpeg filter argument.

    Values within 1e-6 of an integer are printed as that integer. Anything else
    is printed with up to three decimals and trailing zeros removed.
    """
    nearest = round(value)
    if abs(value - nearest) >= 1e-6:
        return f"{value:.3f}".rstrip("0").rstrip(".")
    return str(int(nearest))
def _sync_single_video_to_duration(
    source_path: Path,
    target_path: Path,
    target_duration: float,
    target_fps: float,
) -> None:
    """Re-encode one video to a fixed frame rate and exact duration.

    A source shorter than *target_duration* is padded by cloning its last frame;
    the output is then trimmed to *target_duration*. Audio is dropped and the
    result is H.264 / yuv420p with ``+faststart``.

    ffmpeg writes to a sibling ``*.partial<suffix>`` file that is renamed onto
    *target_path* only after a successful run. A failed or interrupted encode
    therefore never leaves a truncated file at *target_path*, which callers
    would otherwise accept as a valid cache entry.

    Raises:
        RuntimeError: if ffmpeg exits non-zero or produces no output file.
    """
    ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
    source_timing = _probe_video_timing(source_path)
    source_duration = float(source_timing.get("duration_seconds", 0.0) or 0.0)
    pad_duration = max(0.0, target_duration - source_duration)
    fps_literal = _format_ffmpeg_fps(target_fps)
    filter_graph = (
        f"fps={fps_literal},"
        f"tpad=stop_mode=clone:stop_duration={pad_duration:.6f},"
        f"trim=duration={target_duration:.6f},"
        "setpts=PTS-STARTPTS"
    )
    # Keep the original suffix last so ffmpeg still infers the container from it.
    partial_path = target_path.with_name(f"{target_path.stem}.partial{target_path.suffix}")
    command = [
        ffmpeg_exe,
        "-y",
        "-i",
        str(source_path),
        "-an",
        "-vf",
        filter_graph,
        "-c:v",
        "libx264",
        "-preset",
        "veryfast",
        "-pix_fmt",
        "yuv420p",
        "-movflags",
        "+faststart",
        str(partial_path),
    ]
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="ignore",
        )
        if result.returncode != 0 or not partial_path.exists():
            raise RuntimeError(
                f"Failed to create synchronized study video: {source_path}\n{result.stderr}"
            )
        os.replace(partial_path, target_path)
    finally:
        # After a successful rename the partial file is already gone; after a
        # failure this removes whatever ffmpeg managed to write.
        try:
            partial_path.unlink()
        except OSError:
            pass
def ensure_synchronized_study_videos(
    reference_video: str,
    left_video: str,
    right_video: str,
    project_root: Path,
    target_fps: float = 30.0,
) -> Dict[str, str]:
    """
    Create browser-playable synchronized copies for the three study videos.

    The shorter videos are padded by cloning their last frame so that all three
    outputs share the same fps and total duration. If synchronization fails for
    any reason, the original paths are returned to keep the study app usable.

    Outputs are cached in a directory named after a hash of the target fps and
    each source's path, mtime and size, so a changed source gets a new cache
    entry. A per-entry file lock serializes generation.

    Returns:
        A dict with keys ``"reference_video"``, ``"left_video"`` and
        ``"right_video"`` mapping to the paths to serve. These are either the
        synchronized copies or the resolved originals.
    """
    raw_paths = {
        "reference_video": Path(reference_video).resolve(),
        "left_video": Path(left_video).resolve(),
        "right_video": Path(right_video).resolve(),
    }
    # Without ffmpeg, or with any source missing, nothing can be synchronized.
    if imageio_ffmpeg is None or not all(path.exists() for path in raw_paths.values()):
        return {key: str(path) for key, path in raw_paths.items()}
    try:
        durations = []
        for path in raw_paths.values():
            timing = _probe_video_timing(path)
            durations.append(float(timing.get("duration_seconds", 0.0) or 0.0))
        # The longest clip defines the common duration.
        target_duration = max(durations)
        if target_duration <= 0:
            # No duration could be probed for any clip.
            return {key: str(path) for key, path in raw_paths.items()}
        cache_dir = _synced_video_cache_dir(project_root)
        # "sync_v1" versions the cache-key layout itself.
        signature = hashlib.sha1(
            "::".join(
                [
                    "sync_v1",
                    f"fps={_format_ffmpeg_fps(target_fps)}",
                    *(
                        f"{path.as_posix()}::{path.stat().st_mtime_ns}::{path.stat().st_size}"
                        for path in raw_paths.values()
                    ),
                ]
            ).encode("utf-8")
        ).hexdigest()[:16]
        trio_dir = cache_dir / signature
        lock_path = trio_dir.with_suffix(".lock")
        with FileLock(str(lock_path)):
            trio_dir.mkdir(parents=True, exist_ok=True)
            output_paths = {
                "reference_video": trio_dir / "reference.mp4",
                "left_video": trio_dir / "left.mp4",
                "right_video": trio_dir / "right.mp4",
            }
            # The entry counts as ready only when all three outputs exist and are non-empty.
            ready = all(path.exists() and path.stat().st_size > 0 for path in output_paths.values())
            if not ready:
                for key, source_path in raw_paths.items():
                    _sync_single_video_to_duration(
                        source_path=source_path,
                        target_path=output_paths[key],
                        target_duration=target_duration,
                        target_fps=target_fps,
                    )
        return {key: str(path) for key, path in output_paths.items()}
    except Exception as exc:
        # Deliberate best-effort: any failure degrades to the unsynchronized originals.
        print(f"[warn] Falling back to original study videos because sync generation failed: {exc}")
        return {key: str(path) for key, path in raw_paths.items()}
def ensure_web_playable_video(video_path: str, project_root: Path) -> str:
    """Return the path of a browser-playable version of *video_path*.

    The source is returned unchanged (as a resolved path) when it does not
    exist, when ``imageio_ffmpeg`` is unavailable, or when it is already an
    ``.mp4`` with an H.264 stream whose probed pixel format is empty or
    ``yuv420p``. Otherwise it is transcoded once to H.264 / yuv420p into the web
    video cache, keyed by the source's path, mtime and size.

    ffmpeg writes to a sibling ``*.partial.mp4`` file that is renamed onto the
    cache path only after a successful run. A failed or interrupted transcode
    therefore cannot leave a truncated file that later calls would serve from
    the cache.

    Raises:
        RuntimeError: if ffmpeg exits non-zero or produces no output file.
    """
    source_path = Path(video_path).resolve()
    if not source_path.exists() or imageio_ffmpeg is None:
        return str(source_path)
    stream_info = _probe_video_stream(source_path)
    if (
        source_path.suffix.lower() == ".mp4"
        and stream_info.get("codec_name") == "h264"
        and (not stream_info.get("pixel_format") or stream_info.get("pixel_format") == "yuv420p")
    ):
        return str(source_path)
    cache_dir = _web_video_cache_dir(project_root)
    signature = hashlib.sha1(
        f"{source_path.as_posix()}::{source_path.stat().st_mtime_ns}::{source_path.stat().st_size}".encode("utf-8")
    ).hexdigest()[:12]
    target_path = cache_dir / f"{source_path.stem}_{signature}.mp4"
    lock_path = target_path.with_suffix(".lock")
    # Keep ".mp4" last so ffmpeg still infers the container from the name.
    partial_path = target_path.with_name(f"{target_path.stem}.partial{target_path.suffix}")
    with FileLock(str(lock_path)):
        if target_path.exists() and target_path.stat().st_size > 0:
            return str(target_path)
        ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
        command = [
            ffmpeg_exe,
            "-y",
            "-i",
            str(source_path),
            "-an",
            "-c:v",
            "libx264",
            "-pix_fmt",
            "yuv420p",
            "-movflags",
            "+faststart",
            str(partial_path),
        ]
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="ignore",
            )
            if result.returncode != 0 or not partial_path.exists():
                raise RuntimeError(
                    f"Failed to convert video for browser playback: {source_path}\n{result.stderr}"
                )
            os.replace(partial_path, target_path)
        finally:
            # Already gone after a successful rename; otherwise drop the leftovers.
            try:
                partial_path.unlink()
            except OSError:
                pass
    return str(target_path)
def prepare_reference_videos_for_web(config: Dict[str, Any], project_root: Path) -> Dict[str, Any]:
    """Replace every case's reference video with a browser-playable path.

    The cases in *config* are updated in place and the same config is returned.
    """
    for case in config.get("cases", []):
        playable_path = ensure_web_playable_video(case["reference_video"], project_root)
        case["reference_video"] = playable_path
    return config
def ensure_video_thumbnail(
    video_path: str,
    project_root: Path,
    time_seconds: float = 0.8,
    width: int = 480,
) -> str:
    """Return the path of a cached JPEG thumbnail for *video_path*.

    One frame is grabbed at *time_seconds* and scaled to *width* pixels wide
    with the aspect ratio preserved. The cache key covers the source's path,
    mtime and size together with *time_seconds* and *width*.

    ffmpeg writes to a sibling ``*.partial.jpg`` file that is renamed onto the
    cache path only after a successful run, so a failed extraction cannot leave
    a broken image that later calls would serve from the cache.

    Returns:
        The thumbnail path, or ``""`` when the source does not exist or
        ``imageio_ffmpeg`` is unavailable.

    Raises:
        RuntimeError: if ffmpeg exits non-zero or produces no output file.
    """
    source_path = Path(video_path).resolve()
    if not source_path.exists() or imageio_ffmpeg is None:
        return ""
    cache_dir = _thumbnail_cache_dir(project_root)
    signature = hashlib.sha1(
        f"{source_path.as_posix()}::{source_path.stat().st_mtime_ns}::{source_path.stat().st_size}::{time_seconds}::{width}".encode(
            "utf-8"
        )
    ).hexdigest()[:12]
    target_path = cache_dir / f"{source_path.stem}_{signature}.jpg"
    lock_path = target_path.with_suffix(".lock")
    # Keep ".jpg" last so ffmpeg still infers the image format from the name.
    partial_path = target_path.with_name(f"{target_path.stem}.partial{target_path.suffix}")
    with FileLock(str(lock_path)):
        if target_path.exists() and target_path.stat().st_size > 0:
            return str(target_path)
        ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()
        command = [
            ffmpeg_exe,
            "-y",
            "-ss",
            str(time_seconds),
            "-i",
            str(source_path),
            "-frames:v",
            "1",
            "-vf",
            f"scale={width}:-1",
            "-q:v",
            "2",
            str(partial_path),
        ]
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="ignore",
            )
            if result.returncode != 0 or not partial_path.exists():
                raise RuntimeError(
                    f"Failed to extract thumbnail from video: {source_path}\n{result.stderr}"
                )
            os.replace(partial_path, target_path)
        finally:
            # Already gone after a successful rename; otherwise drop the leftovers.
            try:
                partial_path.unlink()
            except OSError:
                pass
    return str(target_path)
def _resolve_path(config_dir: Path, raw_path: str) -> Path:
    """Interpret *raw_path* relative to the config directory.

    An absolute path is returned as given (not normalized). A relative path is
    joined onto *config_dir* and resolved.
    """
    candidate = Path(raw_path)
    return candidate if candidate.is_absolute() else (config_dir / candidate).resolve()
def _resolve_single_match(config_dir: Path, directory: str, pattern: str, source_key: str) -> Path:
    """Find the single file in *directory* matching *pattern* for *source_key*.

    ``{source_key}`` in *pattern* is substituted before globbing.

    Raises:
        FileNotFoundError: if the directory does not exist or nothing matches.
        ValueError: if more than one file matches.
    """
    search_root = _resolve_path(config_dir, directory)
    if not search_root.exists():
        raise FileNotFoundError(f"Configured directory does not exist: {search_root}")

    resolved_pattern = pattern.format(source_key=source_key)
    candidates = sorted(search_root.glob(resolved_pattern))

    if len(candidates) == 1:
        return candidates[0].resolve()
    if not candidates:
        raise FileNotFoundError(
            f"No video matched pattern '{resolved_pattern}' inside '{search_root}' for source_key='{source_key}'."
        )
    listing = ", ".join(str(candidate) for candidate in candidates)
    raise ValueError(
        f"Pattern '{resolved_pattern}' for source_key='{source_key}' matched multiple files: {listing}"
    )
def _normalize_case(
    raw_case: Dict[str, Any],
    raw_config: Dict[str, Any],
    config_dir: Path,
    method_ids: List[str],
) -> Dict[str, Any]:
    """Resolve one configured case into absolute, verified video paths.

    A case that supplies both ``reference_video`` and ``method_videos`` uses
    those paths directly. Otherwise each video is located by globbing the
    directory/pattern configured under ``reference`` and ``methods``.

    Returns:
        A dict with ``case_id``, ``source_key``, ``case_title``,
        ``reference_video`` and ``method_videos`` (method id -> path), all paths
        as strings.

    Raises:
        FileNotFoundError: if any resolved video does not exist.
    """
    case_id = raw_case["case_id"]
    source_key = raw_case.get("source_key", case_id)
    case_title = raw_case.get("title") or humanize_source_key(source_key)

    has_explicit_paths = raw_case.get("reference_video") and raw_case.get("method_videos")
    if has_explicit_paths:
        reference_video = _resolve_path(config_dir, raw_case["reference_video"]).resolve()
        method_videos = {}
        for method_id in method_ids:
            explicit_path = raw_case["method_videos"][method_id]
            method_videos[method_id] = _resolve_path(config_dir, explicit_path).resolve()
    else:
        reference_cfg = raw_config["reference"]
        reference_video = _resolve_single_match(
            config_dir=config_dir,
            directory=reference_cfg["directory"],
            pattern=reference_cfg["glob"],
            source_key=source_key,
        )
        method_videos = {}
        for method_id in method_ids:
            method_cfg = raw_config["methods"][method_id]
            method_videos[method_id] = _resolve_single_match(
                config_dir=config_dir,
                directory=method_cfg["directory"],
                pattern=method_cfg["glob"],
                source_key=source_key,
            )

    # Report the first video that is absent on disk, reference first.
    first_missing = next(
        (video for video in (reference_video, *method_videos.values()) if not video.exists()),
        None,
    )
    if first_missing is not None:
        raise FileNotFoundError(f"Missing video file for case '{case_id}': {first_missing}")

    return {
        "case_id": case_id,
        "source_key": source_key,
        "case_title": case_title,
        "reference_video": str(reference_video),
        "method_videos": {method_id: str(video) for method_id, video in method_videos.items()},
    }
def load_study_config(config_path: str | Path) -> Dict[str, Any]:
    """Load, validate and normalize the study configuration JSON.

    Relative video paths and directories are interpreted relative to the
    directory containing the config file.

    Returns:
        The normalized config: study metadata, ``pair_order`` as tuples,
        ``pair_sample_limits`` per pair id, resolved ``cases``, method display
        info, question totals and ``instruction_case_id``.

    Raises:
        ValueError: if required sections are missing, ``cases`` is empty, the
            method set differs from the expected three, ``pair_order`` is
            malformed, a per-participant limit is invalid or out of range,
            disjoint sampling is over-subscribed, or ``instruction_case_id`` is
            not a known case.
        FileNotFoundError: if a configured directory or video is missing.
    """
    config_path = Path(config_path).resolve()
    config_dir = config_path.parent
    with config_path.open("r", encoding="utf-8") as handle:
        raw_config = json.load(handle)
    if "methods" not in raw_config or "cases" not in raw_config:
        raise ValueError("study_config.json must define both 'methods' and 'cases'.")
    # Fail early and clearly. Without this an empty case list surfaces later as
    # a confusing "[1, 0]" limit error, or as an IndexError on cases[0].
    if not raw_config["cases"]:
        raise ValueError("study_config.json must define at least one case in 'cases'.")
    method_ids = list(raw_config["methods"].keys())
    if set(method_ids) != set(METHOD_FALLBACK_LABELS.keys()):
        raise ValueError(
            "This sample project expects exactly three methods: anyact, vlm_hy_motion, echomotion."
        )
    pair_order = raw_config.get("pair_order", [list(pair) for pair in PAIRWISE_METHOD_PAIRS])
    normalized_pairs: List[Tuple[str, str]] = []
    for raw_pair in pair_order:
        if len(raw_pair) != 2:
            raise ValueError(f"Each pair_order entry must contain exactly two methods: {raw_pair}")
        left, right = raw_pair
        if left not in method_ids or right not in method_ids:
            raise ValueError(f"Unknown method in pair_order: {raw_pair}")
        normalized_pairs.append((left, right))
    methods = {}
    for method_id, method_cfg in raw_config["methods"].items():
        methods[method_id] = {
            "display_name": method_cfg.get("display_name", METHOD_FALLBACK_LABELS[method_id]),
            "directory": method_cfg.get("directory", ""),
            "glob": method_cfg.get("glob", ""),
        }
    cases = [
        _normalize_case(
            raw_case=raw_case,
            raw_config=raw_config,
            config_dir=config_dir,
            method_ids=method_ids,
        )
        for raw_case in raw_config["cases"]
    ]
    # Each pair defaults to showing every case unless a smaller limit is configured.
    raw_pair_limits = raw_config.get("per_participant_pair_limits", {})
    pair_sample_limits: Dict[str, int] = {}
    for method_a, method_b in normalized_pairs:
        pair_id = build_pair_id(method_a, method_b)
        raw_limit = raw_pair_limits.get(pair_id, len(cases))
        try:
            limit_value = int(raw_limit)
        except (TypeError, ValueError) as exc:
            raise ValueError(f"Invalid per_participant_pair_limits value for '{pair_id}': {raw_limit}") from exc
        if limit_value <= 0 or limit_value > len(cases):
            raise ValueError(
                f"per_participant_pair_limits['{pair_id}'] must be within [1, {len(cases)}], got {limit_value}."
            )
        pair_sample_limits[pair_id] = limit_value
    # Disjoint sampling hands every case to at most one pair, so the limits must fit.
    disjoint_case_sampling = bool(raw_config.get("disjoint_case_sampling", False))
    if disjoint_case_sampling and sum(pair_sample_limits.values()) > len(cases):
        raise ValueError(
            "disjoint_case_sampling=True requires the sum of per-participant pair limits "
            f"to be <= number of cases ({len(cases)})."
        )
    case_ids = {case["case_id"] for case in cases}
    instruction_case_id = raw_config.get("instruction_case_id")
    if instruction_case_id and instruction_case_id not in case_ids:
        raise ValueError(f"instruction_case_id='{instruction_case_id}' is not present in cases.")
    if not instruction_case_id:
        # Default to the first case; `cases` is non-empty thanks to the guard above.
        instruction_case_id = cases[0]["case_id"]
    return {
        "study_id": raw_config.get("study_id", "anyact_user_study"),
        "study_title": raw_config.get("study_title", "Human Motion Reenactment User Study"),
        "question_order": raw_config.get("question_order", "shuffle_per_participant"),
        "allow_tie_option": raw_config.get("allow_tie_option", True),
        "pair_order": normalized_pairs,
        "pair_sample_limits": pair_sample_limits,
        "disjoint_case_sampling": disjoint_case_sampling,
        "question_bank_total": len(cases) * len(normalized_pairs),
        "participant_question_total": sum(pair_sample_limits.values()),
        "methods": methods,
        "cases": cases,
        "instruction_case_id": instruction_case_id,
        "config_path": str(config_path),
    }
def get_instruction_case(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return the case whose id equals ``config["instruction_case_id"]``.

    Raises:
        KeyError: if no case in ``config["cases"]`` has that id.
    """
    wanted_id = config["instruction_case_id"]
    found = next((case for case in config["cases"] if case["case_id"] == wanted_id), None)
    if found is None:
        raise KeyError(f"Instruction case '{wanted_id}' was not found.")
    return found
def build_questions(config: Dict[str, Any], participant_id: str) -> List[Dict[str, Any]]:
    """Build the ordered question list for one participant.

    Every random decision uses a ``random.Random`` seeded from the study id,
    the participant id and a purpose-specific suffix. The same participant
    therefore gets the same questions for the same config.

    For each pair in ``config["pair_order"]`` a subset of cases is chosen, sized
    by ``config["pair_sample_limits"]``. Each (case, pair) then gets a random
    A/B slot assignment, and the list is shuffled when
    ``config["question_order"] == "shuffle_per_participant"``.

    Returns:
        Question dicts carrying case info, slot methods and videos (with legacy
        ``left_*`` / ``right_*`` aliases), plus ``question_number``,
        ``question_id`` and ``total_questions``.

    Raises:
        ValueError: in disjoint mode, when there are too few cases left to give
            a pair its requested sample size.
    """
    questions: List[Dict[str, Any]] = []
    cases = list(config["cases"])
    pair_case_assignments: Dict[str, List[Dict[str, Any]]] = {}
    if config.get("disjoint_case_sampling"):
        # Disjoint mode: shuffle once, then deal consecutive slices to the pairs
        # so that no case is shown for more than one pair.
        shuffled_cases = list(cases)
        assignment_rng = random.Random(stable_int_seed(f"{config['study_id']}::{participant_id}::case_assignment"))
        assignment_rng.shuffle(shuffled_cases)
        cursor = 0
        for method_a, method_b in config["pair_order"]:
            pair_id = build_pair_id(method_a, method_b)
            sample_size = config["pair_sample_limits"][pair_id]
            selected_cases = shuffled_cases[cursor : cursor + sample_size]
            if len(selected_cases) != sample_size:
                raise ValueError(
                    f"Not enough unique cases to assign pair '{pair_id}'. Requested {sample_size}, got {len(selected_cases)}."
                )
            pair_case_assignments[pair_id] = selected_cases
            cursor += sample_size
    else:
        # Independent mode: each pair samples from the full case list with its own seed.
        for method_a, method_b in config["pair_order"]:
            pair_id = build_pair_id(method_a, method_b)
            sample_size = config["pair_sample_limits"][pair_id]
            pair_rng = random.Random(stable_int_seed(f"{config['study_id']}::{participant_id}::{pair_id}::sample"))
            pair_case_assignments[pair_id] = pair_rng.sample(cases, sample_size)
    for method_a, method_b in config["pair_order"]:
        pair_id = build_pair_id(method_a, method_b)
        for case in pair_case_assignments[pair_id]:
            # Per-(case, pair) seed: decides which method lands in slot A.
            order_rng = random.Random(
                stable_int_seed(f"{config['study_id']}::{participant_id}::{case['case_id']}::{method_a}::{method_b}")
            )
            result_a_method, result_b_method = (method_a, method_b)
            if order_rng.random() < 0.5:
                result_a_method, result_b_method = result_b_method, result_a_method
            questions.append(
                {
                    "case_id": case["case_id"],
                    "case_title": case["case_title"],
                    "source_key": case["source_key"],
                    "pair_id": pair_id,
                    "reference_video": case["reference_video"],
                    "result_a_method": result_a_method,
                    "result_b_method": result_b_method,
                    # left/right mirror result_a/result_b (legacy field names).
                    "left_method": result_a_method,
                    "right_method": result_b_method,
                    "result_a_video": case["method_videos"][result_a_method],
                    "result_b_video": case["method_videos"][result_b_method],
                    "left_video": case["method_videos"][result_a_method],
                    "right_video": case["method_videos"][result_b_method],
                }
            )
    if config["question_order"] == "shuffle_per_participant":
        shuffle_rng = random.Random(stable_int_seed(f"{config['study_id']}::{participant_id}::question_order"))
        shuffle_rng.shuffle(questions)
    total_questions = len(questions)
    # Number the questions after any shuffle; the id embeds the final 1-based position.
    for index, question in enumerate(questions, start=1):
        question["question_number"] = index
        question["question_id"] = f"Q{index:03d}_{question['case_id']}_{question['pair_id']}"
        question["total_questions"] = total_questions
    return questions
def _state_path(project_root: Path, participant_id: str) -> Path:
    """Return the path of a participant's live state file, ``participants/<id>.json``."""
    participants_dir = get_results_dir(project_root) / "participants"
    return participants_dir / f"{participant_id}.json"
def _archive_dir(project_root: Path) -> Path:
    """Return the directory that holds archived participant states."""
    return get_results_dir(project_root).joinpath("participants_archive")
def _lock_path(project_root: Path) -> Path:
    """Return the path of the lock file used to guard the results directory."""
    return get_results_dir(project_root).joinpath("locks", "results.lock")
def _responses_jsonl_path(project_root: Path) -> Path:
    """Return the path of the append-only ``responses.jsonl`` log."""
    return get_results_dir(project_root).joinpath("responses.jsonl")
def _responses_csv_path(project_root: Path) -> Path:
    """Return the path of the exported ``responses.csv`` file."""
    return get_results_dir(project_root).joinpath("responses.csv")
def _read_state_unlocked(project_root: Path, participant_id: str) -> Dict[str, Any] | None:
    """Load a participant's state file, or return ``None`` when it does not exist.

    The function takes no lock itself.
    """
    state_file = _state_path(project_root, participant_id)
    if state_file.exists():
        with state_file.open("r", encoding="utf-8") as handle:
            return json.load(handle)
    return None
def _atomic_write_json(path: Path, data: Dict[str, Any]) -> None:
    """Write *data* as JSON to *path* by replacing the file in one step.

    The JSON is first written to ``<path>.tmp`` and then moved over *path* with
    ``os.replace``, so readers never observe a half-written file. If
    serialization or the move fails, the temporary file is removed and the
    exception propagates; an existing *path* is left untouched.
    """
    temp_path = path.with_suffix(path.suffix + ".tmp")
    try:
        with temp_path.open("w", encoding="utf-8") as handle:
            json.dump(data, handle, ensure_ascii=False, indent=2)
        os.replace(temp_path, path)
    except BaseException:
        # Do not leave a partial "<name>.tmp" behind next to the real state file.
        try:
            temp_path.unlink()
        except OSError:
            pass
        raise
def _write_state_unlocked(project_root: Path, state: Dict[str, Any]) -> None:
    """Persist *state* to the state file of ``state["participant_id"]``.

    The function takes no lock itself.
    """
    destination = _state_path(project_root, state["participant_id"])
    _atomic_write_json(destination, state)
def _archive_state_unlocked(project_root: Path, state: Dict[str, Any]) -> None:
    """Save a timestamped copy of *state* in the archive directory.

    The file is named ``<participant_id>__<study_id>__<timestamp>.json``; the
    timestamp is the current ISO time with filename-unsafe characters replaced
    by ``-``. The function takes no lock itself.
    """
    archive_root = _archive_dir(project_root)
    safe_stamp = re.sub(r"[^0-9A-Za-z_-]", "-", now_iso())
    owner = state.get('participant_id', 'participant')
    study = state.get('study_id', 'study')
    _atomic_write_json(archive_root / f"{owner}__{study}__{safe_stamp}.json", state)
def _append_jsonl_unlocked(project_root: Path, payload: Dict[str, Any]) -> None:
    """Append *payload* as one JSON line to ``responses.jsonl``.

    The function takes no lock itself.
    """
    log_file = _responses_jsonl_path(project_root)
    with log_file.open("a", encoding="utf-8") as handle:
        serialized = json.dumps(payload, ensure_ascii=False)
        handle.write(f"{serialized}\n")
def _normalize_canonical_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Upgrade *row* to the current schema and project it onto ``CSV_COLUMNS``.

    Missing columns are filled with ``""`` and keys outside ``CSV_COLUMNS`` are
    dropped from the returned dict. The input row is upgraded in place.
    """
    current = upgrade_response_row_schema(row)
    canonical: Dict[str, Any] = {}
    for column in CSV_COLUMNS:
        canonical[column] = current.get(column, "")
    return canonical
def _canonical_row_key(row: Dict[str, Any]) -> Tuple[str, str, str] | None:
    """Return the ``(study_id, participant_id, question_id)`` identity of a row.

    Returns ``None`` when the participant id or the question id is blank. Such
    rows cannot be deduplicated.
    """

    def _text(field: str) -> str:
        return str(row.get(field, "")).strip()

    participant_id = _text("participant_id")
    question_id = _text("question_id")
    if participant_id and question_id:
        return _text("study_id"), participant_id, question_id
    return None
def _canonical_row_sort_key(row: Dict[str, Any]) -> Tuple[str, str, str]:
    """Return the recency key used to choose between duplicate rows.

    The key is ``(event_saved_at or answered_at, updated_at, answered_at)``,
    each as a string with missing values as ``""``. A larger key is newer.
    """
    answered_at = row.get("answered_at")
    saved_at = row.get("event_saved_at") or answered_at
    return (
        str(saved_at or ""),
        str(row.get("updated_at") or ""),
        str(answered_at or ""),
    )
def _merge_canonical_rows(*row_groups: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Merge several row lists, keeping the newest row for each identity.

    Rows without an identity (see ``_canonical_row_key``) are dropped. When two
    rows have equal recency keys, the one seen later wins, so later groups take
    precedence. The result is sorted by ``(answered_at, participant_id,
    question_id)``.
    """
    newest: Dict[Tuple[str, str, str], Tuple[Tuple[str, str, str], Dict[str, Any]]] = {}
    for group in row_groups:
        for candidate in group:
            identity = _canonical_row_key(candidate)
            if identity is None:
                continue
            recency = _canonical_row_sort_key(candidate)
            # Only a strictly older candidate loses to what is already stored.
            if identity in newest and recency < newest[identity][0]:
                continue
            newest[identity] = (recency, _normalize_canonical_row(candidate))

    return sorted(
        (payload for _, payload in newest.values()),
        key=lambda row: (row.get("answered_at", ""), row.get("participant_id", ""), row.get("question_id", "")),
    )
def _load_canonical_rows_from_jsonl_unlocked(project_root: Path) -> List[Dict[str, Any]]:
    """Read ``responses.jsonl`` and return the newest canonical row per identity.

    Blank and undecodable lines are skipped, as are records without an identity.
    The result is sorted by ``(answered_at, participant_id, question_id)``. A
    missing or empty log gives ``[]``. The function takes no lock itself.
    """
    log_file = _responses_jsonl_path(project_root)
    if not log_file.exists() or log_file.stat().st_size <= 0:
        return []

    newest: Dict[Tuple[str, str, str], Tuple[Tuple[str, str, str], Dict[str, Any]]] = {}
    with log_file.open("r", encoding="utf-8") as handle:
        for raw_line in handle:
            if not raw_line.strip():
                continue
            try:
                record = json.loads(raw_line)
            except json.JSONDecodeError:
                # A truncated trailing line should not make the whole study unreadable.
                continue
            identity = _canonical_row_key(record)
            if identity is None:
                continue
            recency = _canonical_row_sort_key(record)
            # Only a strictly older record loses to what is already stored.
            if identity in newest and recency < newest[identity][0]:
                continue
            newest[identity] = (recency, _normalize_canonical_row(record))

    return sorted(
        (payload for _, payload in newest.values()),
        key=lambda row: (row.get("answered_at", ""), row.get("participant_id", ""), row.get("question_id", "")),
    )
def _load_canonical_rows_from_state_files_unlocked(project_root: Path) -> List[Dict[str, Any]]:
    """Collect canonical answer rows from all live and archived state files.

    State files that cannot be decoded as JSON, or whose content is not an
    object with an ``answers`` mapping, are skipped. One damaged file therefore
    cannot abort the whole export. This matches how undecodable state files and
    JSONL lines are handled elsewhere in this module.

    The result is sorted by ``(answered_at, participant_id, question_id)``. The
    function takes no lock itself.
    """
    rows: List[Dict[str, Any]] = []
    state_dirs = [
        get_results_dir(project_root) / "participants",
        _archive_dir(project_root),
    ]
    for state_dir in state_dirs:
        for state_path in sorted(state_dir.glob("*.json")):
            try:
                with state_path.open("r", encoding="utf-8") as handle:
                    state = json.load(handle)
            except json.JSONDecodeError:
                # Skip the damaged file and keep exporting everything else.
                continue
            if not isinstance(state, dict):
                continue
            answers = state.get("answers") or {}
            if not isinstance(answers, dict):
                continue
            for row in answers.values():
                rows.append(_normalize_canonical_row(row))
    rows.sort(key=lambda row: (row.get("answered_at", ""), row.get("participant_id", ""), row.get("question_id", "")))
    return rows
def _all_canonical_rows_unlocked(project_root: Path) -> List[Dict[str, Any]]:
    """Return every known response row, deduplicated across state files and the JSONL log.

    State-file rows are merged first and JSONL rows second, so a JSONL row wins
    when both have equal recency. The function takes no lock itself.
    """
    from_state_files = _load_canonical_rows_from_state_files_unlocked(project_root)
    from_jsonl = _load_canonical_rows_from_jsonl_unlocked(project_root)
    return _merge_canonical_rows(from_state_files, from_jsonl)
def _export_csv_unlocked(project_root: Path) -> None:
    """Rewrite ``responses.csv`` from all canonical rows.

    The CSV is written to a ``.tmp`` sibling and then moved over the real file
    with ``os.replace``. The function takes no lock itself.
    """
    destination = _responses_csv_path(project_root)
    scratch = destination.with_suffix(".tmp")
    canonical_rows = _all_canonical_rows_unlocked(project_root)

    with scratch.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=CSV_COLUMNS)
        writer.writeheader()
        writer.writerows(canonical_rows)

    os.replace(scratch, destination)
def get_current_question(state: Dict[str, Any]) -> Dict[str, Any]:
    """Return the question at ``state["current_index"]``."""
    questions = state["questions"]
    return questions[state["current_index"]]
def question_stable_key(question: Dict[str, Any]) -> str:
    """Return ``"<case_id>::<pair_id>"``, a key that does not depend on question order."""
    return "{}::{}".format(question["case_id"], question["pair_id"])
def refresh_state_video_paths(state: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    """Refresh the video paths and schema of every question and answer in *state*.

    Each record whose ``case_id`` is present in ``config["cases"]`` is upgraded
    in place against that case. Records for unknown cases are left untouched.
    The same state dict is returned.
    """
    cases_by_id = {case["case_id"]: case for case in config["cases"]}

    def _refresh(records: Any) -> None:
        for record in records:
            matching_case = cases_by_id.get(record.get("case_id"))
            if matching_case:
                upgrade_response_row_schema(record, case=matching_case)

    _refresh(state.get("questions", []))
    _refresh(state.get("answers", {}).values())
    return state
def sync_state_with_config(state: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    """Rebuild a participant's questions from *config* and carry existing answers over.

    Questions are regenerated with ``build_questions``. An old question or
    answer is matched to a new question by its stable key
    (``case_id::pair_id``). A matched question keeps its previous A/B slot
    assignment. A matched answer is re-keyed to the new question id and
    refreshed with the new question's metadata. Answers with no matching new
    question are dropped.

    *state* is modified in place (questions, answers, current index, status and
    timing fields) and returned after a final video-path refresh.
    """
    new_questions = build_questions(config=config, participant_id=state["participant_id"])
    old_questions = state.get("questions", [])
    old_answers = state.get("answers", {})
    case_lookup = {case["case_id"]: case for case in config["cases"]}
    old_current_key = None
    if old_questions:
        # Clamp the stored index into the valid range before reading the old question.
        old_index = min(max(int(state.get("current_index", 0)), 0), len(old_questions) - 1)
        old_current_key = question_stable_key(old_questions[old_index])
    old_questions_by_key = {
        question_stable_key(question): question
        for question in old_questions
        if question.get("case_id") and question.get("pair_id")
    }
    old_rows_by_key = {
        question_stable_key(answer_row): answer_row
        for answer_row in old_answers.values()
        if answer_row.get("case_id") and answer_row.get("pair_id")
    }
    synced_answers: Dict[str, Dict[str, Any]] = {}
    for question in new_questions:
        stable_key = question_stable_key(question)
        case = case_lookup.get(question["case_id"])
        previous_question = old_questions_by_key.get(stable_key)
        if previous_question:
            # Keep the A/B assignment the participant already saw, so recorded
            # answers still point at the same methods.
            question["result_a_method"] = previous_question.get("result_a_method") or previous_question.get("left_method")
            question["result_b_method"] = previous_question.get("result_b_method") or previous_question.get("right_method")
            _sync_result_slot_fields(question, case=case)
        previous_row = old_rows_by_key.get(stable_key)
        if not previous_row:
            continue
        # Start from the old answer and overwrite all question-derived fields.
        upgraded_row = {
            **previous_row,
            "study_id": config["study_id"],
            "study_title": config["study_title"],
            "question_id": question["question_id"],
            "question_position": question["question_number"],
            "total_questions": question["total_questions"],
            "case_id": question["case_id"],
            "case_title": question["case_title"],
            "source_key": question["source_key"],
            "pair_id": question["pair_id"],
            "result_a_method": question["result_a_method"],
            "result_b_method": question["result_b_method"],
            "left_method": question["left_method"],
            "right_method": question["right_method"],
            "reference_video": question["reference_video"],
            "result_a_video": question["result_a_video"],
            "result_b_video": question["result_b_video"],
            "left_video": question["left_video"],
            "right_video": question["right_video"],
        }
        synced_answers[question["question_id"]] = upgrade_response_row_schema(upgraded_row, case=case)
    current_index = 0
    if new_questions:
        if old_current_key is not None:
            matched_index = next(
                (index for index, question in enumerate(new_questions) if question_stable_key(question) == old_current_key),
                None,
            )
            if matched_index is not None:
                current_index = matched_index
        # NOTE(review): the branch below always reassigns current_index, so the
        # matched_index result above never takes effect. Confirm whether resuming
        # at the previous question was intended.
        first_unanswered_index = next(
            (
                index
                for index, question in enumerate(new_questions)
                if question["question_id"] not in synced_answers
            ),
            None,
        )
        if first_unanswered_index is not None:
            current_index = first_unanswered_index
        else:
            # Everything is answered: park on the last question.
            current_index = len(new_questions) - 1
    state["study_id"] = config["study_id"]
    state["study_title"] = config["study_title"]
    state["questions"] = new_questions
    state["answers"] = synced_answers
    state["current_index"] = current_index
    if new_questions and len(synced_answers) == len(new_questions):
        # Preserve an existing completion timestamp when there is one.
        state["completed_at"] = state.get("completed_at") or now_iso()
        state["status"] = "completed"
        state["current_question_started_at"] = None
    else:
        state["completed_at"] = None
        state["status"] = "in_progress"
        state["current_question_started_at"] = time.time()
    return refresh_state_video_paths(state, config)
def _upgrade_state_schema(state: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    """Return an upgraded copy of ``state`` aligned with ``config``.

    The input is deep-copied first, so ``state`` itself is never modified and
    the caller can compare the result against it. The copy receives the study
    title from ``config`` and is then handed to ``refresh_state_video_paths``,
    whose return value is the result.
    """
    migrated = copy.deepcopy(state)
    migrated.update(study_title=config["study_title"])
    return refresh_state_video_paths(migrated, config)
def upgrade_existing_results_schema(project_root: Path, config: Dict[str, Any]) -> None:
    """Upgrade every stored participant state file and regenerate the CSV export.

    Each ``*.json`` file in the ``participants`` results directory and in the
    directory returned by ``_archive_dir`` is loaded, passed through
    ``_upgrade_state_schema`` and rewritten only when the upgrade changed it.
    Afterwards the CSV export is rebuilt. Everything after directory setup
    runs while holding the results ``FileLock``.

    Files that cannot be read as a JSON object are skipped rather than
    aborting the whole upgrade: invalid JSON, bytes that are not valid UTF-8,
    and valid JSON whose top-level value is not an object (for example
    ``null`` or ``[]``).

    Args:
        project_root: Project directory used to locate results, archive and
            lock paths.
        config: Study configuration; ``config["study_title"]`` is read by the
            per-state upgrade.
    """
    ensure_runtime_dirs(project_root)
    state_dirs = [
        get_results_dir(project_root) / "participants",
        _archive_dir(project_root),
    ]
    with FileLock(str(_lock_path(project_root))):
        for state_dir in state_dirs:
            # Sorted so files are processed in a deterministic order.
            for state_path in sorted(state_dir.glob("*.json")):
                try:
                    with state_path.open("r", encoding="utf-8") as handle:
                        state = json.load(handle)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    # Best effort: an unreadable file must not block the rest.
                    continue
                if not isinstance(state, dict):
                    # Valid JSON but not a state object; the upgrade step
                    # assigns by key and would raise on it.
                    continue
                upgraded_state = _upgrade_state_schema(state, config)
                if upgraded_state != state:
                    _atomic_write_json(state_path, upgraded_state)
        _export_csv_unlocked(project_root)
def create_or_resume_participant(
    project_root: Path,
    config: Dict[str, Any],
    participant_id: str | None,
    request: Any = None,
) -> Tuple[Dict[str, Any], str]:
    """Start a new participant session or reopen the stored one.

    The participant id is passed through ``sanitize_participant_id``; when
    nothing usable remains, a new id is generated. The stored state is then
    read and handled under the results ``FileLock``:

    * No stored state, or a stored state whose ``study_id`` differs from
      ``config["study_id"]``: a fresh state is written and ``"started"`` is
      returned. A mismatching state that already holds answers is archived
      first.
    * Stored state for the same study: it is synchronised with ``config``,
      its session metadata is refreshed and it is written back. The status is
      ``"completed"`` when ``completed_at`` is set, otherwise ``"resumed"``
      (which also restarts the current-question timer).

    Args:
        project_root: Project directory used to locate results and lock paths.
        config: Study configuration; ``study_id`` and ``study_title`` are read
            here.
        participant_id: Requested id, or ``None`` / empty to have one generated.
        request: Optional request object. Its ``session_hash`` attribute and
            ``headers["user-agent"]`` entry are recorded when available.

    Returns:
        ``(state, status)`` where ``status`` is ``"started"``, ``"resumed"`` or
        ``"completed"``.
    """
    ensure_runtime_dirs(project_root)
    participant_id = sanitize_participant_id(participant_id)
    if not participant_id:
        participant_id = generate_participant_id()

    session_hash = getattr(request, "session_hash", "") if request is not None else ""
    user_agent = (
        request.headers.get("user-agent", "")
        if request is not None and getattr(request, "headers", None)
        else ""
    )

    with FileLock(str(_lock_path(project_root))):
        existing_state = _read_state_unlocked(project_root, participant_id)

        if not existing_state or existing_state.get("study_id") != config["study_id"]:
            # Answers given for a different study are archived before the
            # state file is overwritten; an empty old state is just replaced.
            if existing_state and existing_state.get("answers"):
                _archive_state_unlocked(project_root, existing_state)
            fresh_state = _new_participant_state(config, participant_id, session_hash, user_agent)
            _write_state_unlocked(project_root, fresh_state)
            return fresh_state, "started"

        existing_state = sync_state_with_config(existing_state, config)
        # Keep the previously stored values when this request carries none.
        existing_state["session_hash"] = session_hash or existing_state.get("session_hash", "")
        existing_state["user_agent"] = user_agent or existing_state.get("user_agent", "")
        if existing_state.get("completed_at"):
            outcome = "completed"
        else:
            existing_state["study_title"] = config["study_title"]
            existing_state["current_question_started_at"] = time.time()
            outcome = "resumed"
        existing_state["updated_at"] = now_iso()
        _write_state_unlocked(project_root, existing_state)
        return existing_state, outcome


def _new_participant_state(
    config: Dict[str, Any],
    participant_id: str,
    session_hash: str,
    user_agent: str,
) -> Dict[str, Any]:
    """Build the initial in-progress state for a participant starting the study."""
    timestamp = now_iso()
    return {
        "participant_id": participant_id,
        "consent": True,
        "study_id": config["study_id"],
        "study_title": config["study_title"],
        "created_at": timestamp,
        "started_at": timestamp,
        "updated_at": timestamp,
        "completed_at": None,
        "status": "in_progress",
        "session_hash": session_hash,
        "user_agent": user_agent,
        "current_index": 0,
        "current_question_started_at": time.time(),
        "questions": build_questions(config=config, participant_id=participant_id),
        "answers": {},
    }
def move_question_pointer(
    project_root: Path,
    participant_id: str,
    question_token: str | None,
    direction: str,
) -> Tuple[Dict[str, Any], str]:
    """Step the participant's question pointer back by one when requested.

    Runs under the results ``FileLock``. A completed session, or a non-empty
    ``question_token`` that does not match the current question, returns the
    stored state unchanged together with an explanatory message. Otherwise the
    pointer is decremented when ``direction`` is ``"previous"`` and it is not
    already at the first question; the question timer and ``updated_at`` are
    then refreshed and the state is written back.

    Returns:
        ``(state, message)``; ``message`` is empty when the request was applied.

    Raises:
        ValueError: No stored state exists for ``participant_id``.
    """
    with FileLock(str(_lock_path(project_root))):
        session = _read_state_unlocked(project_root, participant_id)
        if session is None:
            raise ValueError("Participant session could not be found.")
        if session.get("completed_at"):
            return session, "This study session has already been submitted."

        displayed = get_current_question(session)
        # A missing/empty token means the caller did not pin a page state.
        token_is_stale = bool(question_token) and displayed["question_id"] != question_token
        if token_is_stale:
            return session, "A newer page state was already loaded. Restored the latest progress."

        can_step_back = direction == "previous" and session["current_index"] > 0
        if can_step_back:
            session["current_index"] = session["current_index"] - 1

        session["current_question_started_at"] = time.time()
        session["updated_at"] = now_iso()
        _write_state_unlocked(project_root, session)
        return session, ""
def _build_response_row(
    state: Dict[str, Any],
    question: Dict[str, Any],
    answer_similarity: str,
    answer_quality: str,
    answer_preference: str,
    duration_seconds: float,
) -> Dict[str, Any]:
    """Assemble the stored record for one answered question.

    Combines participant fields from ``state``, descriptive fields from
    ``question``, the three answers (each passed through
    ``normalize_choice_value``) and timing metadata. ``answered_at`` and
    ``updated_at`` share one timestamp taken at call time, and the duration is
    rounded to milliseconds. The finished row is passed through
    ``upgrade_response_row_schema`` before being returned.
    """
    saved_at = now_iso()

    # Participant / study identity.
    row: Dict[str, Any] = {
        "participant_id": state["participant_id"],
        "consent": state.get("consent", True),
        "study_id": state["study_id"],
        "study_title": state["study_title"],
    }

    # Question description. The result_a/result_b entries fall back to the
    # left/right values when the question does not carry them.
    row.update(
        question_id=question["question_id"],
        question_position=question["question_number"],
        total_questions=question["total_questions"],
        case_id=question["case_id"],
        case_title=question["case_title"],
        source_key=question["source_key"],
        pair_id=question["pair_id"],
        result_a_method=question.get("result_a_method") or question.get("left_method"),
        result_b_method=question.get("result_b_method") or question.get("right_method"),
        left_method=question["left_method"],
        right_method=question["right_method"],
        reference_video=question["reference_video"],
        result_a_video=question.get("result_a_video") or question.get("left_video"),
        result_b_video=question.get("result_b_video") or question.get("right_video"),
        left_video=question["left_video"],
        right_video=question["right_video"],
    )

    # The participant's three choices.
    submitted = (
        ("answer_similarity", answer_similarity),
        ("answer_quality", answer_quality),
        ("answer_preference", answer_preference),
    )
    for column, raw_choice in submitted:
        row[column] = normalize_choice_value(raw_choice)

    # Timing and session metadata.
    row["answered_at"] = saved_at
    row["duration_seconds"] = round(duration_seconds, 3)
    row["session_hash"] = state.get("session_hash", "")
    row["user_agent"] = state.get("user_agent", "")
    row["started_at"] = state.get("started_at", "")
    row["updated_at"] = saved_at

    return upgrade_response_row_schema(row)
def save_current_answer(
    project_root: Path,
    participant_id: str,
    question_token: str,
    answer_similarity: str,
    answer_quality: str,
    answer_preference: str,
    action: str,
) -> Tuple[Dict[str, Any], str, str]:
    """Store the answers for the current question and advance or finish.

    Runs under the results ``FileLock``. A session that is already completed,
    or a ``question_token`` that does not match the current question, returns
    the stored state untouched with status ``"completed"`` or ``"stale"``.
    Otherwise the response row is recorded (overwriting any earlier row for
    the same question), and then:

    * ``action == "next"`` with questions remaining: the pointer moves forward
      and the question timer restarts (status ``"advanced"``).
    * ``action == "next"`` on the last question, or ``action == "submit"``:
      the session is marked completed (status ``"completed"``).

    The state file is written, the response is appended to the JSONL log and
    the CSV export is regenerated.

    Returns:
        ``(state, message, status)``.

    Raises:
        ValueError: ``action`` is neither ``"next"`` nor ``"submit"``, or no
            stored state exists for ``participant_id``.
    """
    if action not in {"next", "submit"}:
        raise ValueError(f"Unsupported action: {action}")

    with FileLock(str(_lock_path(project_root))):
        session = _read_state_unlocked(project_root, participant_id)
        if session is None:
            raise ValueError("Participant session could not be found.")
        if session.get("completed_at"):
            return session, "This study session has already been submitted.", "completed"

        active_question = get_current_question(session)
        if active_question["question_id"] != question_token:
            return session, "A newer page state was already loaded. Restored the latest progress.", "stale"

        # With no recorded start time the duration collapses to zero.
        started_at = float(session.get("current_question_started_at") or time.time())
        elapsed = max(0.0, time.time() - started_at)

        answers = session["answers"]
        had_earlier_answer = bool(answers.get(question_token))
        row = _build_response_row(
            state=session,
            question=active_question,
            answer_similarity=answer_similarity,
            answer_quality=answer_quality,
            answer_preference=answer_preference,
            duration_seconds=elapsed,
        )
        saved_at = row["answered_at"]
        answers[question_token] = row
        session["updated_at"] = saved_at

        if action == "next" and session["current_index"] < len(session["questions"]) - 1:
            session["current_index"] += 1
            session["current_question_started_at"] = time.time()
            status, message = "advanced", "Response saved."
        else:
            # Either an explicit submit or "next" on the final question.
            session["completed_at"] = saved_at
            session["status"] = "completed"
            session["current_question_started_at"] = None
            status, message = "completed", "All responses have been submitted."

        _write_state_unlocked(project_root, session)

        event = {
            "event_type": "answer_updated" if had_earlier_answer else "answer_saved",
            "event_saved_at": saved_at,
        }
        event.update(row)
        _append_jsonl_unlocked(project_root, event)
        _export_csv_unlocked(project_root)
        return session, message, status
def build_question_payload(state: Dict[str, Any]) -> Dict[str, Any]:
    """Build the display payload for the participant's current question.

    The payload holds the question token, an HTML progress snippet, a fixed
    instruction line, the video entries, any answers already saved for this
    question (passed through ``normalize_choice_value``) and the visibility
    flags for the previous / next / submit controls.
    """
    question = get_current_question(state)
    all_answers = state.get("answers", {})
    saved_answers = all_answers.get(question["question_id"], {})
    answered_count = len(all_answers)

    progress_parts = [
        f"<div class='progress-chip'>Question {question['question_number']} / {question['total_questions']}</div>",
        f"<div class='meta-line'>Participant ID: <code>{state['participant_id']}</code></div>",
        f"<div class='meta-line'>Saved responses: {answered_count} / {question['total_questions']}</div>",
    ]

    payload: Dict[str, Any] = {
        "question_token": question["question_id"],
        "progress_markdown": "".join(progress_parts),
        "instruction_markdown": (
            "Watch the reference clip and both anonymous candidates before answering all three questions."
        ),
        "reference_video": question["reference_video"],
        # result_a / result_b fall back to the left / right clips when absent.
        "result_a_video": question.get("result_a_video") or question["left_video"],
        "result_b_video": question.get("result_b_video") or question["right_video"],
        "left_video": question["left_video"],
        "right_video": question["right_video"],
    }

    for field in ("answer_similarity", "answer_quality", "answer_preference"):
        payload[field] = normalize_choice_value(saved_answers.get(field))

    position = question["question_number"]
    total = question["total_questions"]
    payload["show_previous"] = position > 1
    payload["show_next"] = position < total
    payload["show_submit"] = position == total
    return payload
def build_completion_markdown(state: Dict[str, Any]) -> str:
    """Render the thank-you markdown shown once a session is finished.

    Reports the participant id, how many answers are stored out of how many
    questions, and the completion time. When the state has no ``completed_at``
    value the current time is shown instead.
    """
    answered_count = len(state.get("answers", {}))
    total_questions = len(state.get("questions", []))
    completed_at = state.get("completed_at")
    if not completed_at:
        completed_at = now_iso()
    return f"""
## Thank you for completing the study.
Your responses have been saved successfully.
- Participant ID: `{state["participant_id"]}`
- Saved answers: `{answered_count} / {total_questions}`
- Completed at: `{completed_at}`
You may now close this page.
""".strip()