ehejin commited on
Commit
45b2cda
Β·
1 Parent(s): b2d734b

synced w/ personalized user study

Browse files
Files changed (6) hide show
  1. src/app.py +99 -99
  2. src/config.py +7 -3
  3. src/data.py +180 -19
  4. src/model.py +4 -8
  5. src/upload.py +7 -0
  6. study_config.yaml +9 -8
src/app.py CHANGED
@@ -34,45 +34,69 @@ def _init_submodule() -> None:
34
  raise RuntimeError("GH_TOKEN secret is not set.")
35
 
36
  import shutil
 
 
 
37
 
38
- # Aggressively remove any partial/corrupt lsp directory
39
  if _LSP_PATH.exists():
40
  shutil.rmtree(str(_LSP_PATH), ignore_errors=True)
41
- # Also nuke any leftover .git/modules/lsp entry
42
  git_modules = _BASE / ".git" / "modules" / "lsp"
43
  if git_modules.exists():
44
  shutil.rmtree(str(git_modules), ignore_errors=True)
45
 
46
- clone_url = f"https://ehejin:{token}@github.com/batu-el/lsp.git"
 
 
 
 
 
47
 
48
  for attempt in range(1, 4):
49
- print(f"[SUBMODULE] clone attempt {attempt}/3 ...")
50
- # Remove any partial clone from previous attempt
51
- if _LSP_PATH.exists():
52
- shutil.rmtree(str(_LSP_PATH), ignore_errors=True)
53
- result = subprocess.run(
54
- [
55
- "git", "clone",
56
- "--branch", "0412_train",
57
- "--depth", "1",
58
- clone_url,
59
- str(_LSP_PATH),
60
- ],
61
- capture_output=True, text=True,
62
- )
63
- print(f"[SUBMODULE] returncode: {result.returncode}")
64
- if result.stderr:
65
- # Scrub token from log
66
- print(f"[SUBMODULE] stderr: {result.stderr.replace(token, '***')}")
67
- if result.returncode == 0 and (_LSP_PATH / "src" / "prompts").exists():
68
- print("[SUBMODULE] clone succeeded.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
  break
70
- print(f"[SUBMODULE] attempt {attempt} failed, retrying...")
 
 
 
71
  else:
72
- raise RuntimeError(
73
- f"Failed to clone lsp after 3 attempts. "
74
- f"Last stderr: {result.stderr.replace(token, '***')}"
75
- )
76
 
77
  lsp_src = str(_LSP_PATH / "src")
78
  if lsp_src not in sys.path:
@@ -83,6 +107,23 @@ def _init_submodule() -> None:
83
 
84
  _init_submodule()
85
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86
  # ---------------------------------------------------------------------------
87
  # 2. App imports (only after submodule is initialised)
88
  # ---------------------------------------------------------------------------
@@ -108,15 +149,14 @@ from src.ui.screens_preference import screen_pair_intro
108
  # 3. Admin dashboard β€” visit ?admin=1
109
  # ---------------------------------------------------------------------------
110
  def _screen_admin(cfg: dict) -> None:
111
- """
112
- Coverage dashboard β€” visit ?admin=1 to see this.
113
- Always scans the HF repo directly β€” ignores local completions cache
114
- so the count reflects real accepted submissions only.
115
- """
116
  from src.data import (
117
- _load_pool, _pool_path, _data_dir,
118
- _load_reservations, _expire_reservations,
 
 
119
  )
 
120
 
121
  st.markdown("## πŸ“Š Study Coverage Dashboard")
122
  st.caption(
@@ -126,79 +166,43 @@ def _screen_admin(cfg: dict) -> None:
126
  )
127
 
128
  if st.button("πŸ”„ Refresh", type="primary"):
 
 
 
 
 
 
 
129
  st.rerun()
130
 
131
- hf_token = cfg.get("hf_token", "")
132
- output_repo = cfg.get("output_dataset_repo", "")
 
 
 
 
 
133
 
134
  for cat_cfg in cfg["categories"]:
135
- cat = cat_cfg["name"]
136
- pool = _load_pool(str(_pool_path(cat, cfg)))
137
  total = len(pool)
138
 
139
- # ── Scan HF directly (no cache) ──────────────────────────────────────
140
- hf_counts = {str(i): 0 for i in range(total)}
141
- n_json = 0
142
- if hf_token and output_repo:
143
- try:
144
- from huggingface_hub import HfApi
145
- api = HfApi(token=hf_token)
146
- files = list(api.list_repo_files(repo_id=output_repo, repo_type="dataset"))
147
- json_files = [f for f in files if f.startswith("json/") and f.endswith(".json")]
148
- n_json = len(json_files)
149
- # Build a pair_id β†’ pool_index lookup for fallback matching
150
- id_to_index = {}
151
- for i, p in enumerate(pool):
152
- pid = p.get("pair_id") or p.get("item_id", "")
153
- if pid:
154
- id_to_index[pid] = i
155
-
156
- for filepath in json_files:
157
- try:
158
- content = api.hf_hub_download(
159
- repo_id=output_repo,
160
- filename=filepath,
161
- repo_type="dataset",
162
- token=hf_token,
163
- )
164
- with open(content) as f:
165
- submission = json.load(f)
166
- for item in submission.get("items", []):
167
- if item.get("category") != cat:
168
- continue
169
- # Use _pool_index if present (new submissions),
170
- # fall back to pair_id/item_id matching (old submissions)
171
- idx = item.get("_pool_index")
172
- if idx is None:
173
- pid = item.get("pair_id") or item.get("item_id", "")
174
- idx = id_to_index.get(pid)
175
- if idx is not None:
176
- hf_counts[str(idx)] = hf_counts.get(str(idx), 0) + 1
177
- except Exception as e:
178
- st.warning(f"Could not parse {filepath}: {e}")
179
- except Exception as e:
180
- st.error(f"Could not scan HF repo: {e}")
181
 
182
- # ── Reservations (active in-progress users) ───────────────────────────
183
- reservations = _load_reservations(cfg)
184
- _expire_reservations(reservations)
185
  reserved_uncovered = sum(
186
- 1 for k, v in reservations.items()
187
- if hf_counts.get(k, 0) == 0
188
  )
189
-
190
- covered = sum(1 for v in hf_counts.values() if v >= 1)
191
- uncovered = total - covered
192
- truly_uncovered = uncovered - reserved_uncovered
193
 
194
  st.markdown(f"### {cat.capitalize()}")
195
- st.caption(f"{n_json} submission file(s) in HF repo")
196
-
197
  col1, col2, col3, col4 = st.columns(4)
198
  col1.metric("Total items", total)
199
  col2.metric("Covered βœ…", covered)
200
  col3.metric("In progress πŸ”„", reserved_uncovered,
201
- help="Reserved by active users β€” will complete soon")
202
  col4.metric("Still needed ⚠️", truly_uncovered,
203
  delta=f"-{truly_uncovered}" if truly_uncovered > 0 else None,
204
  delta_color="inverse")
@@ -206,15 +210,11 @@ def _screen_admin(cfg: dict) -> None:
206
  if truly_uncovered == 0 and reserved_uncovered == 0:
207
  st.success(f"βœ… All {total} items covered!")
208
  elif truly_uncovered == 0:
209
- st.info(
210
- f"πŸ”„ {reserved_uncovered} item(s) in progress β€” "
211
- f"waiting for active participants to finish."
212
- )
213
  else:
214
  st.warning(
215
  f"⚠️ {truly_uncovered} item(s) still need a participant. "
216
- f"Send more Prolific slots or wait for in-progress "
217
- f"reservations to expire (up to 80 min)."
218
  )
219
 
220
  st.markdown("---")
 
34
  raise RuntimeError("GH_TOKEN secret is not set.")
35
 
36
  import shutil
37
+ import tarfile
38
+ import urllib.request
39
+ import time as _time
40
 
41
+ # Clean any stale state
42
  if _LSP_PATH.exists():
43
  shutil.rmtree(str(_LSP_PATH), ignore_errors=True)
 
44
  git_modules = _BASE / ".git" / "modules" / "lsp"
45
  if git_modules.exists():
46
  shutil.rmtree(str(git_modules), ignore_errors=True)
47
 
48
+ # GitHub serves a tarball of any branch/tag/SHA at this URL.
49
+ # Pinned to a specific commit SHA so future lsp changes don't break us.
50
+ branch = "a71506e3b1fa74fa3427f8ab674fa68420ca42da"
51
+ tarball_url = f"https://api.github.com/repos/batu-el/lsp/tarball/{branch}"
52
+ tmp_tar = Path("/tmp/lsp.tar.gz")
53
+ tmp_extract = Path("/tmp/lsp_extract")
54
 
55
  for attempt in range(1, 4):
56
+ print(f"[SUBMODULE] tarball download attempt {attempt}/3 ...")
57
+ try:
58
+ req = urllib.request.Request(
59
+ tarball_url,
60
+ headers={
61
+ "Authorization": f"Bearer {token}",
62
+ "Accept": "application/vnd.github+json",
63
+ "User-Agent": "prolific-preferences",
64
+ },
65
+ )
66
+ with urllib.request.urlopen(req, timeout=60) as resp:
67
+ tmp_tar.write_bytes(resp.read())
68
+ print(f"[SUBMODULE] downloaded {tmp_tar.stat().st_size} bytes")
69
+
70
+ # Extract
71
+ if tmp_extract.exists():
72
+ shutil.rmtree(str(tmp_extract), ignore_errors=True)
73
+ tmp_extract.mkdir(parents=True)
74
+ with tarfile.open(str(tmp_tar)) as tar:
75
+ tar.extractall(str(tmp_extract))
76
+
77
+ # GitHub tarballs have a top-level dir like batu-el-lsp-abc123/
78
+ subdirs = [d for d in tmp_extract.iterdir() if d.is_dir()]
79
+ if not subdirs:
80
+ raise RuntimeError("tarball had no top-level directory")
81
+ top = subdirs[0]
82
+
83
+ # Verify the prompts dir is present
84
+ if not (top / "src" / "prompts").exists():
85
+ raise RuntimeError(f"src/prompts not found in extracted tarball at {top}")
86
+
87
+ # Move extracted contents to /app/lsp
88
+ shutil.copytree(str(top), str(_LSP_PATH))
89
+ tmp_tar.unlink(missing_ok=True)
90
+ shutil.rmtree(str(tmp_extract), ignore_errors=True)
91
+
92
+ print("[SUBMODULE] ready.")
93
  break
94
+ except Exception as e:
95
+ msg = str(e).replace(token, "***") if token else str(e)
96
+ print(f"[SUBMODULE] attempt {attempt} failed: {msg}")
97
+ _time.sleep(3)
98
  else:
99
+ raise RuntimeError(f"Failed to download lsp tarball after 3 attempts.")
 
 
 
100
 
101
  lsp_src = str(_LSP_PATH / "src")
102
  if lsp_src not in sys.path:
 
107
 
108
  _init_submodule()
109
 
110
+ # Wipe stale local state ONLY on the first container load (not on every Streamlit rerun).
111
+ # We use a marker file β€” once created, subsequent imports skip the wipe.
112
+ # Completions stay durable in HF; we re-scan HF fresh after wipe.
113
+ _data_root = _BASE / "data"
114
+ _data_root.mkdir(parents=True, exist_ok=True)
115
+ _wipe_marker = _data_root / ".startup_wiped"
116
+ if not _wipe_marker.exists():
117
+ for pattern in ("reservations.json", "local_completions_*.json", "completion_cache_*.json"):
118
+ for f in _data_root.glob(pattern):
119
+ try:
120
+ f.unlink()
121
+ print(f"[STARTUP] Wiped stale file: {f.name}")
122
+ except Exception as e:
123
+ print(f"[STARTUP] Could not wipe {f.name}: {e}")
124
+ _wipe_marker.touch()
125
+ print("[STARTUP] Marked container as wiped")
126
+
127
  # ---------------------------------------------------------------------------
128
  # 2. App imports (only after submodule is initialised)
129
  # ---------------------------------------------------------------------------
 
149
  # 3. Admin dashboard β€” visit ?admin=1
150
  # ---------------------------------------------------------------------------
151
  def _screen_admin(cfg: dict) -> None:
152
+ """Coverage dashboard β€” visit ?admin=1 to see this."""
 
 
 
 
153
  from src.data import (
154
+ _get_accepted_counts, _load_pool, _pool_path,
155
+ _load_reservations, _save_reservations,
156
+ _expire_reservations, _release_returned_reservations,
157
+ _reservation_lock_path,
158
  )
159
+ from filelock import FileLock
160
 
161
  st.markdown("## πŸ“Š Study Coverage Dashboard")
162
  st.caption(
 
166
  )
167
 
168
  if st.button("πŸ”„ Refresh", type="primary"):
169
+ # Invalidate caches so we re-scan HF and re-poll Prolific
170
+ from src.data import _data_dir
171
+ for f in _data_dir(cfg).glob("completion_cache*"):
172
+ f.unlink()
173
+ prolific_cache = _data_dir(cfg) / "prolific_returned_cache.json"
174
+ if prolific_cache.exists():
175
+ prolific_cache.unlink()
176
  st.rerun()
177
 
178
+ # Release expired + returned/timed-out reservations before displaying
179
+ lock = FileLock(str(_reservation_lock_path(cfg)), timeout=10)
180
+ with lock:
181
+ reservations = _load_reservations(cfg)
182
+ _expire_reservations(reservations)
183
+ _release_returned_reservations(reservations, cfg)
184
+ _save_reservations(reservations, cfg)
185
 
186
  for cat_cfg in cfg["categories"]:
187
+ cat = cat_cfg["name"]
188
+ pool = _load_pool(str(_pool_path(cat, cfg)))
189
  total = len(pool)
190
 
191
+ counts = _get_accepted_counts(cat, cfg)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
192
 
193
+ covered = sum(1 for v in counts.values() if v >= 1)
 
 
194
  reserved_uncovered = sum(
195
+ 1 for k in reservations
196
+ if counts.get(k, 0) == 0
197
  )
198
+ truly_uncovered = total - covered - reserved_uncovered
 
 
 
199
 
200
  st.markdown(f"### {cat.capitalize()}")
 
 
201
  col1, col2, col3, col4 = st.columns(4)
202
  col1.metric("Total items", total)
203
  col2.metric("Covered βœ…", covered)
204
  col3.metric("In progress πŸ”„", reserved_uncovered,
205
+ help="Reserved by active Prolific participants")
206
  col4.metric("Still needed ⚠️", truly_uncovered,
207
  delta=f"-{truly_uncovered}" if truly_uncovered > 0 else None,
208
  delta_color="inverse")
 
210
  if truly_uncovered == 0 and reserved_uncovered == 0:
211
  st.success(f"βœ… All {total} items covered!")
212
  elif truly_uncovered == 0:
213
+ st.info(f"πŸ”„ {reserved_uncovered} item(s) in progress.")
 
 
 
214
  else:
215
  st.warning(
216
  f"⚠️ {truly_uncovered} item(s) still need a participant. "
217
+ f"Send more Prolific slots."
 
218
  )
219
 
220
  st.markdown("---")
src/config.py CHANGED
@@ -16,9 +16,13 @@ def load_config() -> dict:
16
  cfg = yaml.safe_load(f)
17
 
18
  # Secrets come only from env vars, never from yaml
19
- cfg["hf_token"] = os.getenv("HF_TOKEN", "")
20
- cfg["tinker_api_key"] = os.getenv("TINKER_API_KEY", "")
21
- cfg["debug_mode"] = os.getenv("DEBUG_MODE", "false").lower() == "true"
 
 
 
 
22
 
23
  # Derived filesystem paths
24
  cfg["base_dir"] = str(BASE_DIR)
 
16
  cfg = yaml.safe_load(f)
17
 
18
  # Secrets come only from env vars, never from yaml
19
+ cfg["hf_token"] = os.getenv("HF_TOKEN", "")
20
+ cfg["tinker_api_key"] = os.getenv("TINKER_API_KEY", "")
21
+ cfg["prolific_api_token"] = os.getenv("PROLIFIC_API_TOKEN", "")
22
+ cfg["debug_mode"] = os.getenv("DEBUG_MODE", "false").lower() == "true"
23
+
24
+ # prolific_study_id is read from the yaml (non-secret, study-specific)
25
+ cfg.setdefault("prolific_study_id", "")
26
 
27
  # Derived filesystem paths
28
  cfg["base_dir"] = str(BASE_DIR)
src/data.py CHANGED
@@ -17,13 +17,19 @@ Rejected completions = JSON files moved to rejected/ by the admin.
17
  Reservations
18
  ------------
19
  When a user starts, their items are "reserved" in a local file for 80 min.
20
- Concurrent users (up to 5) each get a FileLock on the reservation file so they
21
  never receive the same items. Reservations expire automatically so abandoned
22
  sessions don't permanently block items.
23
 
 
 
 
 
24
  Dropout / rejection recovery
25
  -----------------------------
26
- - Dropout: reservation expires after 80 min β†’ item re-enters the pool.
 
 
27
  - Rejection: admin moves json/{worker}/{id}.json β†’ rejected/{worker}/{id}.json
28
  in the HF dataset repo. On next Space restart (or cache expiry) the item's
29
  accepted count drops to 0 and it gets re-assigned.
@@ -39,9 +45,10 @@ from filelock import FileLock
39
 
40
  from src.config import CATEGORY_TO_REPO
41
 
42
- POOL_SIZE = 50 # items selected per (study_type, category)
43
- RESERVATION_TTL = 60 * 80 # 80 min: 30 min expected + ~2.5x buffer
44
- COMPLETION_CACHE_TTL = 300 # re-scan HF repo every 5 minutes
 
45
 
46
 
47
  # ── Path helpers ──────────────────────────────────────────────────────────────
@@ -299,7 +306,6 @@ def record_completion(user_id: str, items: list, cfg: dict) -> None:
299
  Uses _pool_index stamped on each item at assignment time β€” no fuzzy matching.
300
  Called after successful HF upload AND by the simulation script.
301
  """
302
- # Group by category using the stamped _pool_category and _pool_index
303
  by_category: dict = {}
304
  for item in items:
305
  cat = item.get("_pool_category") or item.get("category", "")
@@ -341,28 +347,176 @@ def record_completion(user_id: str, items: list, cfg: dict) -> None:
341
  f"(user {user_id[:8]})")
342
 
343
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
344
  # ── Core assignment ───────────────────────────────────────────────────────────
345
 
346
  def _assign_from_category(category: str, n: int, user_id: str, cfg: dict) -> list:
347
  """
348
  Assign n items using least-coverage-first strategy.
349
 
350
- Priority order:
351
  1. Uncovered + unreserved (count=0, not reserved)
352
- 2. Uncovered + reserved by other (count=0, reserved β€” likely abandoned user)
353
  3. Covered + unreserved (count>0, not reserved)
354
  4. Covered + reserved by other (count>0, reserved)
355
 
356
- This ensures abandoned users' items get picked up by subsequent users
357
- rather than already-covered items being re-assigned.
 
 
 
 
 
358
  """
359
  pool = _load_pool(str(_pool_path(category, cfg)))
360
  accepted_counts = _get_accepted_counts(category, cfg)
361
  lock = FileLock(str(_reservation_lock_path(cfg)), timeout=10)
362
 
 
 
 
 
 
 
 
 
 
 
 
363
  with lock:
364
  reservations = _load_reservations(cfg)
365
  _expire_reservations(reservations)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
366
 
367
  def is_reserved_by_other(i):
368
  r = reservations.get(str(i))
@@ -373,16 +527,24 @@ def _assign_from_category(category: str, n: int, user_id: str, cfg: dict) -> lis
373
  reserved = int(is_reserved_by_other(i))
374
  return (count, reserved)
375
 
376
- # All indices sorted by (count, is_reserved_by_other)
377
- all_indices = sorted(range(len(pool)), key=sort_key)
378
  selected_indices = all_indices[:n]
379
 
380
- # Reserve selected items (overrides stale reservations from abandoned users)
381
- expiry = time.time() + RESERVATION_TTL
382
- for i in selected_indices:
383
- reservations[str(i)] = {"user_id": user_id, "expiry": expiry}
384
-
385
- _save_reservations(reservations, cfg)
 
 
 
 
 
 
 
 
 
386
 
387
  selected = []
388
  for i in selected_indices:
@@ -470,7 +632,6 @@ def assign_items(cfg: dict, user_id: str) -> list:
470
 
471
  def _make_item_slot(item: dict, study_type: str) -> dict:
472
  base = {
473
- # Preserve pool index and category for record_completion in upload.py
474
  "_pool_index": item.get("_pool_index"),
475
  "_pool_category": item.get("_pool_category", item.get("category", "")),
476
  "conversation": {
 
17
  Reservations
18
  ------------
19
  When a user starts, their items are "reserved" in a local file for 80 min.
20
+ Concurrent users each get a FileLock on the reservation file so they
21
  never receive the same items. Reservations expire automatically so abandoned
22
  sessions don't permanently block items.
23
 
24
+ Each reservation stores the user's prolific_pid so we can release their items
25
+ immediately when Prolific reports them as RETURNED or TIMED-OUT β€” no need to
26
+ wait for the 80-min TTL.
27
+
28
  Dropout / rejection recovery
29
  -----------------------------
30
+ - Dropout (voluntary return): Prolific marks RETURNED, we query the API and
31
+ release the reservation on the next assignment.
32
+ - Dropout (silent): reservation expires after 80 min β†’ item re-enters pool.
33
  - Rejection: admin moves json/{worker}/{id}.json β†’ rejected/{worker}/{id}.json
34
  in the HF dataset repo. On next Space restart (or cache expiry) the item's
35
  accepted count drops to 0 and it gets re-assigned.
 
45
 
46
  from src.config import CATEGORY_TO_REPO
47
 
48
+ POOL_SIZE = 50 # items selected per (study_type, category)
49
+ RESERVATION_TTL = 60 * 80 # 80 min: 30 min expected + ~2.5x buffer
50
+ COMPLETION_CACHE_TTL = 300 # re-scan HF repo every 5 minutes
51
+ PROLIFIC_POLL_CACHE_TTL = 120 # re-poll Prolific every 2 minutes
52
 
53
 
54
  # ── Path helpers ──────────────────────────────────────────────────────────────
 
306
  Uses _pool_index stamped on each item at assignment time β€” no fuzzy matching.
307
  Called after successful HF upload AND by the simulation script.
308
  """
 
309
  by_category: dict = {}
310
  for item in items:
311
  cat = item.get("_pool_category") or item.get("category", "")
 
347
  f"(user {user_id[:8]})")
348
 
349
 
350
+ # ── Prolific status polling ───────────────────────────────────────────────────
351
+
352
+ def _prolific_returned_pids(cfg: dict) -> set:
353
+ """
354
+ Query Prolific for participants who have RETURNED or TIMED-OUT from the
355
+ active study. Returns a set of their PIDs. Cached for PROLIFIC_POLL_CACHE_TTL.
356
+ """
357
+ token = cfg.get("prolific_api_token", "")
358
+ study_id = cfg.get("prolific_study_id", "")
359
+ if not token or not study_id:
360
+ return set()
361
+
362
+ cache_path = _data_dir(cfg) / "prolific_returned_cache.json"
363
+ now = time.time()
364
+
365
+ if cache_path.exists():
366
+ try:
367
+ with open(cache_path) as f:
368
+ c = json.load(f)
369
+ if now - c.get("timestamp", 0) < PROLIFIC_POLL_CACHE_TTL:
370
+ return set(c.get("returned_pids", []))
371
+ except Exception:
372
+ pass
373
+
374
+ returned = set()
375
+ try:
376
+ import requests
377
+ url = f"https://api.prolific.com/api/v1/studies/{study_id}/submissions/"
378
+ headers = {"Authorization": f"Token {token}"}
379
+ resp = requests.get(url, headers=headers, timeout=10)
380
+ resp.raise_for_status()
381
+ for sub in resp.json().get("results", []):
382
+ status = sub.get("status", "")
383
+ if status in ("RETURNED", "TIMED-OUT", "TIMED_OUT"):
384
+ pid = sub.get("participant_id") or sub.get("participant", "")
385
+ if pid:
386
+ returned.add(pid)
387
+ print(f"[PROLIFIC] Found {len(returned)} returned/timed-out participants")
388
+ except Exception as e:
389
+ print(f"[PROLIFIC] Could not query API: {e}")
390
+
391
+ try:
392
+ with open(cache_path, "w") as f:
393
+ json.dump({"timestamp": now, "returned_pids": list(returned)}, f)
394
+ except Exception:
395
+ pass
396
+
397
+ return returned
398
+
399
+
400
+ def _release_returned_reservations(reservations: dict, cfg: dict) -> None:
401
+ """
402
+ Remove reservations held by Prolific participants who have RETURNED or
403
+ TIMED-OUT. Mutates the reservations dict in place.
404
+ """
405
+ returned_pids = _prolific_returned_pids(cfg)
406
+ if not returned_pids:
407
+ return
408
+
409
+ released = []
410
+ for idx, r in list(reservations.items()):
411
+ pid = r.get("prolific_pid", "")
412
+ if pid and pid in returned_pids:
413
+ released.append(idx)
414
+ del reservations[idx]
415
+ if released:
416
+ print(f"[ASSIGN] Released {len(released)} reservations from returned/timed-out participants: {released}")
417
+
418
+
419
+ def all_items_covered(cfg: dict) -> bool:
420
+ """
421
+ Returns True if every item in every category has been accepted at least once.
422
+ Used for auto-pausing the Prolific study.
423
+ """
424
+ for cat_cfg in cfg["categories"]:
425
+ cat = cat_cfg["name"]
426
+ pool = _load_pool(str(_pool_path(cat, cfg)))
427
+ counts = _get_accepted_counts(cat, cfg)
428
+ for i in range(len(pool)):
429
+ if counts.get(str(i), 0) < 1:
430
+ return False
431
+ return True
432
+
433
+
434
+ def pause_prolific_study(cfg: dict) -> bool:
435
+ """
436
+ Call Prolific's API to pause the study. Returns True on success.
437
+ Requires prolific_api_token (env PROLIFIC_API_TOKEN) and prolific_study_id.
438
+ Idempotent β€” safe to call multiple times (Prolific treats repeated pauses as no-ops).
439
+ """
440
+ token = cfg.get("prolific_api_token", "")
441
+ study_id = cfg.get("prolific_study_id", "")
442
+ if not token or not study_id:
443
+ print("[PROLIFIC] Cannot auto-pause: no API token or study_id configured")
444
+ return False
445
+
446
+ # Idempotency marker so we don't spam the API on every completion after
447
+ # the first time all items are covered.
448
+ paused_marker = _data_dir(cfg) / ".prolific_paused"
449
+ if paused_marker.exists():
450
+ return True
451
+
452
+ try:
453
+ import requests
454
+ url = f"https://api.prolific.com/api/v1/studies/{study_id}/transition/"
455
+ headers = {"Authorization": f"Token {token}", "Content-Type": "application/json"}
456
+ resp = requests.post(url, headers=headers, json={"action": "PAUSE"}, timeout=10)
457
+ resp.raise_for_status()
458
+ paused_marker.touch()
459
+ print(f"[PROLIFIC] βœ… Study {study_id} paused automatically β€” all items covered.")
460
+ return True
461
+ except Exception as e:
462
+ print(f"[PROLIFIC] Could not auto-pause study: {e}")
463
+ return False
464
+
465
+
466
  # ── Core assignment ───────────────────────────────────────────────────────────
467
 
468
  def _assign_from_category(category: str, n: int, user_id: str, cfg: dict) -> list:
469
  """
470
  Assign n items using least-coverage-first strategy.
471
 
472
+ Priority order (via sort key):
473
  1. Uncovered + unreserved (count=0, not reserved)
474
+ 2. Uncovered + reserved by other (count=0, reserved)
475
  3. Covered + unreserved (count>0, not reserved)
476
  4. Covered + reserved by other (count>0, reserved)
477
 
478
+ Reservations are ONLY created for participants who come via Prolific
479
+ (i.e. have a non-empty prolific_pid in the URL). Non-Prolific visitors
480
+ (testers, previewers, direct-URL visitors) still get items assigned so
481
+ they can run through the study, but they don't hold reservations.
482
+
483
+ Reservations from participants who have RETURNED/TIMED-OUT on Prolific
484
+ are released BEFORE the sort, so their items are treated as unreserved.
485
  """
486
  pool = _load_pool(str(_pool_path(category, cfg)))
487
  accepted_counts = _get_accepted_counts(category, cfg)
488
  lock = FileLock(str(_reservation_lock_path(cfg)), timeout=10)
489
 
490
+ # Capture prolific_pid early so we can decide whether to reserve.
491
+ # Read from query_params directly β€” session_state.study_state doesn't
492
+ # exist yet during init_state, which is what calls this function.
493
+ prolific_pid = ""
494
+ try:
495
+ params = st.query_params
496
+ prolific_pid = params.get("PROLIFIC_PID", "") or ""
497
+ except Exception:
498
+ pass
499
+ is_prolific = bool(prolific_pid)
500
+
501
  with lock:
502
  reservations = _load_reservations(cfg)
503
  _expire_reservations(reservations)
504
+ _release_returned_reservations(reservations, cfg)
505
+
506
+ # If this Prolific PID already has reservations (e.g. they refreshed
507
+ # the tab, got a new user_id, and came back), release the old ones
508
+ # before creating new ones. Prevents the same participant from
509
+ # accumulating multiple reservations.
510
+ if is_prolific:
511
+ stale = [
512
+ idx for idx, r in list(reservations.items())
513
+ if r.get("prolific_pid") == prolific_pid
514
+ ]
515
+ for idx in stale:
516
+ del reservations[idx]
517
+ if stale:
518
+ print(f"[ASSIGN] Released {len(stale)} prior reservations "
519
+ f"for returning PID {prolific_pid}")
520
 
521
  def is_reserved_by_other(i):
522
  r = reservations.get(str(i))
 
527
  reserved = int(is_reserved_by_other(i))
528
  return (count, reserved)
529
 
530
+ all_indices = sorted(range(len(pool)), key=sort_key)
 
531
  selected_indices = all_indices[:n]
532
 
533
+ # Only reserve if this is a Prolific participant β€” keeps the
534
+ # admin "in progress" count accurate and stops testers/bouncers
535
+ # from blocking items for real users.
536
+ if is_prolific:
537
+ expiry = time.time() + RESERVATION_TTL
538
+ for i in selected_indices:
539
+ reservations[str(i)] = {
540
+ "user_id": user_id,
541
+ "prolific_pid": prolific_pid,
542
+ "expiry": expiry,
543
+ }
544
+ _save_reservations(reservations, cfg)
545
+ print(f"[ASSIGN] Reserved for Prolific PID {prolific_pid}")
546
+ else:
547
+ print(f"[ASSIGN] Non-Prolific visitor β€” no reservation created")
548
 
549
  selected = []
550
  for i in selected_indices:
 
632
 
633
  def _make_item_slot(item: dict, study_type: str) -> dict:
634
  base = {
 
635
  "_pool_index": item.get("_pool_index"),
636
  "_pool_category": item.get("_pool_category", item.get("category", "")),
637
  "conversation": {
src/model.py CHANGED
@@ -1,4 +1,4 @@
1
- """Tinker inference client. Model name is read from study_config.yaml."""
2
  import re
3
 
4
  import streamlit as st
@@ -33,14 +33,14 @@ def _get_tinker_clients(model_name: str, sampler_path: str = ""):
33
 
34
 
35
  def call_model(messages: list, cfg: dict) -> str:
36
- """
37
- Send a message list to Tinker and return cleaned response text.
38
- """
39
  model_name = cfg["model_name"]
40
  sampler_path = cfg.get("sampler_path", "")
41
  print(f"[MODEL] model_name={model_name} sampler_path={sampler_path or '(base)'}")
42
  print(f"[MODEL] num_messages={len(messages)}")
43
  print(f"[MODEL] roles={[m['role'] for m in messages]}")
 
 
44
 
45
  try:
46
  from tinker_cookbook import renderers as tinker_renderers
@@ -62,15 +62,11 @@ def call_model(messages: list, cfg: dict) -> str:
62
  parsed_message, _ = renderer.parse_response(result.sequences[0].tokens)
63
  content = tinker_renderers.format_content_as_string(parsed_message["content"])
64
 
65
- # Strip <think>…</think>
66
  content = re.sub(r"<think>.*?</think>", "", content, flags=re.DOTALL).strip()
67
- # Strip leaked control tokens
68
  content = re.sub(r"<\|[^|]*\|>", "", content).strip()
69
- # Truncate degenerate repetition: any 40+ char span repeated 5+ times
70
  match = re.search(r"(.{40,}?)\1{4,}", content, flags=re.DOTALL)
71
  if match:
72
  content = content[: match.start() + len(match.group(1))].strip()
73
- # Fallback if cleanup left nothing usable
74
  if not content or len(content.split()) < 3:
75
  raise ValueError("Model output cleanup yielded no usable content.")
76
 
 
1
+ """Tinker inference client. Supports both base models and fine-tuned checkpoints."""
2
  import re
3
 
4
  import streamlit as st
 
33
 
34
 
35
  def call_model(messages: list, cfg: dict) -> str:
36
+ """Send a message list to Tinker and return cleaned response text."""
 
 
37
  model_name = cfg["model_name"]
38
  sampler_path = cfg.get("sampler_path", "")
39
  print(f"[MODEL] model_name={model_name} sampler_path={sampler_path or '(base)'}")
40
  print(f"[MODEL] num_messages={len(messages)}")
41
  print(f"[MODEL] roles={[m['role'] for m in messages]}")
42
+ if messages:
43
+ print(f"[MODEL] system_prompt[:150]={messages[0]['content'][:150]}")
44
 
45
  try:
46
  from tinker_cookbook import renderers as tinker_renderers
 
62
  parsed_message, _ = renderer.parse_response(result.sequences[0].tokens)
63
  content = tinker_renderers.format_content_as_string(parsed_message["content"])
64
 
 
65
  content = re.sub(r"<think>.*?</think>", "", content, flags=re.DOTALL).strip()
 
66
  content = re.sub(r"<\|[^|]*\|>", "", content).strip()
 
67
  match = re.search(r"(.{40,}?)\1{4,}", content, flags=re.DOTALL)
68
  if match:
69
  content = content[: match.start() + len(match.group(1))].strip()
 
70
  if not content or len(content.split()) < 3:
71
  raise ValueError("Model output cleanup yielded no usable content.")
72
 
src/upload.py CHANGED
@@ -71,6 +71,13 @@ def save_and_upload(state: dict, cfg: dict) -> None:
71
  # Record completion locally β€” updates counts immediately without waiting
72
  # for an HF re-scan. Also invalidates the HF cache.
73
  record_completion(state.get("user_id", ""), state.get("items", []), cfg)
 
 
 
 
 
 
 
74
 
75
  # ── Write + upload CSV ────────────────────────────────────────────────────
76
  _save_and_upload_csv(state, cfg, hf_api, safe_worker, submission_id)
 
71
  # Record completion locally β€” updates counts immediately without waiting
72
  # for an HF re-scan. Also invalidates the HF cache.
73
  record_completion(state.get("user_id", ""), state.get("items", []), cfg)
74
+ # Auto-pause Prolific study if all items are now covered
75
+ try:
76
+ from src.data import all_items_covered, pause_prolific_study
77
+ if all_items_covered(cfg):
78
+ pause_prolific_study(cfg)
79
+ except Exception as e:
80
+ print(f"[SAVE] Auto-pause check failed: {e}")
81
 
82
  # ── Write + upload CSV ────────────────────────────────────────────────────
83
  _save_and_upload_csv(state, cfg, hf_api, safe_worker, submission_id)
study_config.yaml CHANGED
@@ -26,27 +26,28 @@ study_type: preference
26
  # The two counts must sum to pairs_per_user.
27
  categories:
28
  - name: movies
29
- count: 5
30
 
31
  model_variants:
32
  - name: base
33
  model_name: "meta-llama/Llama-3.1-8B-Instruct"
34
- sampler_path: "tinker://04f8057d-e987-56aa-ada6-d56cc1d1ae2d:train:0/sampler_weights/000150"
35
  prompt_variant:
36
- personalization: false
37
  detailed_instruction: true
38
- count: 5 # items using this variant for odd-numbered users
39
  # counts swap on alternating users:
40
 
41
  pair_selection_seed: 42 # Seed for reproducible 50-item pool selection per category
42
- pairs_per_user: 5 # Total items/pairs shown per participant
43
 
44
  # Chat constraints β€” both set to 3 so each participant has exactly 3 real exchanges.
45
  min_turns: 3 # Minimum exchanges before "done" button is enabled
46
  max_turns: 3 # Hard cap; input is disabled after this many exchanges
47
 
48
- # Prolific
49
- prolific_completion_code: "CBRCO395"
 
50
 
51
  # HuggingFace dataset repo where results (JSON + CSV) are uploaded
52
- output_dataset_repo: "ehejin/user_study-preference-base_DETAILED_checkpoint"
 
26
  # The two counts must sum to pairs_per_user.
27
  categories:
28
  - name: movies
29
+ count: 2
30
 
31
  model_variants:
32
  - name: base
33
  model_name: "meta-llama/Llama-3.1-8B-Instruct"
34
+ sampler_path: ""
35
  prompt_variant:
36
+ personalization: true
37
  detailed_instruction: true
38
+ count: 2 # items using this variant for odd-numbered users
39
  # counts swap on alternating users:
40
 
41
  pair_selection_seed: 42 # Seed for reproducible 50-item pool selection per category
42
+ pairs_per_user: 2 # Total items/pairs shown per participant
43
 
44
  # Chat constraints β€” both set to 3 so each participant has exactly 3 real exchanges.
45
  min_turns: 3 # Minimum exchanges before "done" button is enabled
46
  max_turns: 3 # Hard cap; input is disabled after this many exchanges
47
 
48
+ # Prolific
49
+ prolific_completion_code: "CREJ69QR"
50
+ prolific_study_id: "69cdb78670b55c986db0d736"
51
 
52
  # HuggingFace dataset repo where results (JSON + CSV) are uploaded
53
+ output_dataset_repo: "ehejin/user_study-preference-personalized_BASE"