# app.py – DivPol Creativity Study (Hugging Face Space)
# ---------------------------------------------------------
# UI: Original two-panel layout (GPT chat left, scoring right)
# Scoring: 3 embedding models × pool distance → z-score → Φ(z) → average × 100
# - New responses are chunked (~300 chars each)
# - Each chunk is scored against the AI reference pool (pre-parsed chunks)
# - Chunk percentiles are averaged within each model, then averaged across models
# Flow: Prolific ID → 3 tasks (randomised) × 5 submissions → Qualtrics redirect
# ---------------------------------------------------------
#
# CHANGELOG (v4 – Feb 2026)
# --------------------------
# [INSTR] New welcome + instruction text per participant script
# [CONT] "Continue" button between tasks (manual advance)
# [TERM] "Prompt X/3" → "Task X/3"; "Round/Attempt" → "Submission"
# [LABEL] Score bar: "divergence" → "distinctiveness"
# [CHAR] Submission blocked outside 300–600 chars; live counter in status box
# [TABLE] mpnet/noinstruct/gist columns removed; "DivPol" → "Distinctiveness Score"
# [REDIR] Qualtrics redirect on study completion (placeholder URL)
# ---------------------------------------------------------
import os, json, random, hashlib, threading, csv as _csv, re
from pathlib import Path
from typing import List, Dict, Any
from datetime import datetime, timezone
import numpy as np
import pandas as pd
import gradio as gr
from scipy.stats import norm
import torch
from transformers import AutoTokenizer, AutoModel
try:
from openai import OpenAI
_HAS_OPENAI = True
except Exception:
_HAS_OPENAI = False
# ============================================================
# Config
# ============================================================
# Base survey URL used by make_redirect_html, which appends the participant's
# Prolific ID as the PROLIFIC_PID query parameter.
QUALTRICS_REDIRECT_URL = "https://georgetown.az1.qualtrics.com/jfe/form/SV_9FaybvTxkZOaVFk"
# Accepted submission length in characters (checked on the stripped text in do_score).
CHAR_MIN = 300
CHAR_MAX = 600
# Number of scored submissions that completes one task.
SUBMISSIONS_PER_TASK = 5
# Column list for the history table
HIST_COLUMNS = ["Submission", "Response Preview", "Distinctiveness Score"]
# Chat model name; overridable through the OPENAI_MODEL environment variable.
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
# Embedding models used for scoring. score_text pairs these positionally with
# the short names "mpnet", "noinstruct", "gist", so the order matters.
EMB_MODELS = [
"sentence-transformers/all-mpnet-base-v2",
"avsolatorio/NoInstruct-small-Embedding-v0",
"avsolatorio/GIST-Embedding-v0",
]
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# ============================================================
# Paths
# ============================================================
BASE_DIR = Path(__file__).resolve().parent
# When /data exists, cache and responses go there; otherwise they live next to
# this file.
PERSISTENT_DIR = Path("/data")
if PERSISTENT_DIR.exists():
CACHE_DIR = PERSISTENT_DIR / "cache"
DATA_DIR = PERSISTENT_DIR / "responses"
# NOTE(review): HF_HOME is assigned here, after `transformers` was already
# imported at the top of the file. huggingface_hub presumably resolves its
# cache location at import time, so this may come too late to move the model
# cache — confirm, and if so set it before the imports.
os.environ["HF_HOME"] = str(PERSISTENT_DIR / ".huggingface")
else:
CACHE_DIR = BASE_DIR / "cache"
DATA_DIR = BASE_DIR / "data"
# Import-time side effect: both directories are created if missing.
CACHE_DIR.mkdir(exist_ok=True)
DATA_DIR.mkdir(exist_ok=True)
# ============================================================
# Tasks
# ============================================================
# Task definitions keyed by task id. Each entry holds:
#   "name"     – title shown by make_prompt_html
#   "text"     – the task prompt shown to the participant
#   "ref_file" – spreadsheet (resolved relative to BASE_DIR) that
#                _compute_baseline reads to build the reference pool
PROMPTS = {
"car": {
"name": "Car Safety Feature",
"text": (
"Create a new feature for a car that would help keep "
"drivers and pedestrians safe."
),
"ref_file": "car_xgb_reference_plus_responses.xlsx",
},
"teambuilding": {
"name": "Team Building Activity",
"text": (
"What are some ways to do teambuilding on video conferencing, "
"with each person only needing a piece of paper and a rubber band?"
),
"ref_file": "teambuilding_xgb_reference_plus_responses.xlsx",
},
"routine": {
"name": "Morning Routine",
"text": (
"Design a 20-minute morning routine that helps someone who "
"wants to start their day in a better mindset."
),
"ref_file": "routine_xgb_reference_plus_responses.xlsx",
},
}
# ============================================================
# Instruction text [INSTR]
# ============================================================
# Initial content of the welcome_html component; start_study replaces it with "".
WELCOME_TEXT = """\
👋 Welcome! To start, please enter your Prolific ID and click the "Start Study" button.
"""
# Instructions embedded by make_prompt_html. A trailing backslash joins a line
# with the next one, so only lines WITHOUT a trailing backslash end in a newline.
# NOTE(review): the numbers quoted in this text (300–600 characters, five
# submissions, three tasks) are literals; keep them in step with CHAR_MIN,
# CHAR_MAX, SUBMISSIONS_PER_TASK and PROMPTS.
INSTRUCTION_TEXT = """\
On the left panel, you will work with an AI chatbot. You may send messages \
to the chatbot by typing and clicking the "Send" button, and you will receive \
interactive responses. You may interact with it as many or as few times as you like \
(but please use it at least once per task).
\
On the right panel, the "Sketchpad" allows you to draft and organize your response \
(please aim for 300–600 characters. Your submission will be blocked otherwise). \
When you're ready, click "Copy to Submission Box" to move your response to the \
submission box, then click "Submit and Score."
\
You will complete five submissions per task, for a total of three tasks. \
After each submission, you will receive a Distinctiveness Score out of 100:
\
• A score closer to 0 means your response is very similar \
to a typical AI-generated response.
\
• A score of 50 indicates your response is moderately similar \
to a typical AI-generated response.
\
• A score closer to 100 means your response is very different \
from a typical AI-generated response.
\
Your goal is to refine and develop your ideas so your submissions become increasingly \
distinct from the AI response. You will be able to see your response history and scores \
in a table after each submission, for a total of five submissions per task.
\
Once you finish one task, click the "Continue to the Next Task" button to move on to the next.\
"""
# Build the HTML shown in md_prompt_display for one task: the task name, the
# task text and INSTRUCTION_TEXT, taken from a PROMPTS entry (`prompt_cfg`).
#
# NOTE(review): as it stands this function is not valid Python. The
# single-quoted f-strings below are split across lines and contain no tags, so
# the HTML markup that wrapped each piece appears to have been lost from this
# copy of the file. Restore the literals from version control rather than
# reconstructing them by hand.
# NOTE(review): the interpolated values are inserted without HTML escaping; they
# come from module constants here, so that is presumably fine — confirm if
# PROMPTS ever becomes externally editable.
def make_prompt_html(prompt_cfg):
return (
f''
f'
📝 {prompt_cfg["name"]}
'
f'
{prompt_cfg["text"]}
'
f'
{INSTRUCTION_TEXT}
'
f'
'
)
# ============================================================
# Text cleaning & chunking (from parser_utils.py)
# ============================================================
def clean_fun(html_string):
    """Strip entity-like codes, backslashes, mis-encoded byte runs and tags.

    The steps run in a fixed order:
      1. ``&(d)`` and ``&ddd;`` sequences become a single space;
      2. backslashes are dropped;
      3. a list of mis-encoded byte sequences is removed, or replaced by an
         apostrophe for the last two entries;
      4. backticks become apostrophes and U+009D is dropped;
      5. anything of the form ``<...>`` on a single line is removed.

    Returns the cleaned string.
    """
    text = re.sub(r'&\(\d\)', ' ', html_string)
    text = re.sub(r'&\d+;', ' ', text)
    text = re.sub(r'\\', '', text)

    # (raw UTF-8 bytes, replacement) pairs. Order matters: several sequences are
    # prefixes of others, so the list must be applied top to bottom.
    substitutions = (
        (b'\xc3\x83\xc6\x92\xc3\x82\xc2\xa2', ''),
        (b'\xc3\x83\xc6\x92', ''),
        (b'\xc3\x83\xc2\xa2\xc3\x82\xc2\xac', ''),
        (b'\xc3\x83\xe2\x80\xa6', ''),
        (b'\xc3\x83\xe2\x80\x9a\xc3\x82\xc2\xa6', ''),
        (b'\xc3\x83\xc6\x92\xc3\x82\xe2\x80\x9a', ''),
        (b'\xc3\x83\xc2\xa2\xc3\x82\xe2\x80\x9a\xc3\x82\xc2\xac', ''),
        (b'\xc3\x83\xc6\x92\xc3\x82\xc2\xa2\xc3\x83\xc2\xa2\xc3\x82\xe2\x80\x9a\xc3\x82\xc2\xac\xc3\x83\xc2\xa2\xc3\x82\xe2\x80\x9e\xc3\x82\xc2\xa2', ''),
        (b'\xc3\x83\xe2\x80\xa6\xc3\x82\xe2\x80\x9c', ''),
        (b'\xc3\x83\xc6\x92\xc3\x82\xc2\xa2\xc3\x83\xc2\xa2\xc3\x82\xc2\xac\xc3\x83\xc2\xa2\xc3\x82\xc2\xa2', "'"),
        (b'\xc3\x83\xe2\x80\x9a\xc3\x82\xc2\xb4', "'"),
    )
    for raw_bytes, replacement in substitutions:
        needle = raw_bytes.decode('utf-8', errors='replace')
        text = text.replace(needle, replacement)

    text = text.replace('`', "'")
    text = re.sub(r'\u009d', '', text)
    return re.sub(r'<.*?>', '', text)
class CustomChunkTokenizer:
    """Clean free text and split it into word-aligned chunks of bounded length.

    Args:
        chunk_size: maximum length in characters of a chunk while it is being
            filled (a single word longer than this still becomes a chunk).
        direction: ``'forward'`` fills chunks from the start of the text,
            ``'backward'`` from the end. Compared case-insensitively.
        clean_text: when False, preprocessing is skipped entirely.
        min_chunks: ``tokenize`` returns the chunk list only if it has at least
            this many chunks; otherwise it returns the whole cleaned text as a
            single item.
        min_tail_chars: a trailing chunk shorter than this is appended to the
            previous chunk instead of standing alone. Defaults to 300, the
            value that used to be hard-coded in ``_create_chunks``.
    """

    # All patterns are compiled once at class creation instead of on every call.
    # NOTE(review): the ``\U000024C2-\U0001F251`` range spans far more than
    # emoji (it includes, for example, the CJK ideograph block). It is kept
    # as-is; presumably the reference pool was parsed with the same rule —
    # confirm before narrowing it.
    _EMOJI_RE = re.compile(
        "["
        "\U0001F600-\U0001F64F"
        "\U0001F300-\U0001F5FF"
        "\U0001F680-\U0001F6FF"
        "\U0001F1E0-\U0001F1FF"
        "\U00002702-\U000027B0"
        "\U000024C2-\U0001F251"
        "\U0001F900-\U0001F9FF"
        "\U0001FA70-\U0001FAFF"
        "]+", flags=re.UNICODE
    )
    _SYMBOL_RE = re.compile(r'[*#@$%^&+=<>|~`]')
    _WHITESPACE_RE = re.compile(r'\s+')
    # (pattern, replacement) pairs applied strictly in this order.
    _MARKDOWN_RULES = (
        (re.compile(r'^#{1,6}\s+', flags=re.MULTILINE), ''),
        (re.compile(r'\*\*(.+?)\*\*'), r'\1'),
        (re.compile(r'__(.+?)__'), r'\1'),
        (re.compile(r'\*(.+?)\*'), r'\1'),
        (re.compile(r'_(.+?)_'), r'\1'),
        (re.compile(r'^\s*[\*\-\+]\s+', flags=re.MULTILINE), ''),
        (re.compile(r'^\s*\d+\.\s+', flags=re.MULTILINE), ''),
        (re.compile(r'```.*?```', flags=re.DOTALL), ''),
        (re.compile(r'`(.+?)`'), r'\1'),
        (re.compile(r'[•◦▪▸‣▫▹►‣◁○■□▢▣▤▥▦▧▨▩◘◙◉◎]'), ''),
        (re.compile(r'^[\s\-\*_]{3,}\s*$', flags=re.MULTILINE), ''),
        (re.compile(r'\s+---+\s+'), ' '),
    )

    def __init__(self, chunk_size=300, direction='forward', clean_text=True,
                 min_chunks=2, min_tail_chars=300):
        self.chunk_size = chunk_size
        self.direction = direction.lower()
        self.clean_text = clean_text
        self.min_chunks = min_chunks
        self.min_tail_chars = min_tail_chars

    def _remove_emojis_and_symbols(self, text):
        """Delete characters matched by the emoji ranges and the symbol set."""
        text = self._EMOJI_RE.sub('', text)
        return self._SYMBOL_RE.sub('', text)

    def _clean_markdown(self, text):
        """Strip markdown markers (headings, emphasis, lists, code, rules)."""
        for pattern, replacement in self._MARKDOWN_RULES:
            text = pattern.sub(replacement, text)
        return text

    def _preprocess_text(self, text):
        """Return the cleaned, whitespace-collapsed text (or ``text`` untouched
        when ``clean_text`` is False)."""
        if not self.clean_text:
            return text
        text = self._clean_markdown(text)
        text = self._remove_emojis_and_symbols(text)
        return self._WHITESPACE_RE.sub(' ', text).strip()

    def tokenize(self, text):
        """Return the chunks of ``text``, or ``[cleaned_text]`` when fewer than
        ``min_chunks`` chunks (including none) were produced."""
        processed_text = self._preprocess_text(text)
        # Chunk the already-cleaned text instead of cleaning the raw text twice.
        chunks = self._chunk_preprocessed(processed_text)
        if chunks and len(chunks) >= self.min_chunks:
            return chunks
        return [processed_text]

    def _create_chunks(self, text):
        """Preprocess raw ``text`` and split it into chunks."""
        return self._chunk_preprocessed(self._preprocess_text(text))

    def _chunk_preprocessed(self, text):
        """Greedily pack the whitespace-separated words of ``text`` into chunks."""
        backward = self.direction == 'backward'
        words = text.split()
        if backward:
            words = words[::-1]

        chunks = []
        current = ""
        for word in words:
            candidate = f"{current} {word}" if current else word
            if len(candidate) <= self.chunk_size:
                current = candidate
                continue
            # The word does not fit: close the running chunk (if any) and start
            # a new one with this word, even if the word alone is too long.
            if current:
                chunks.append(current)
            current = word

        if current:
            if len(current) < self.min_tail_chars and chunks:
                chunks[-1] = chunks[-1] + " " + current
            else:
                chunks.append(current)

        if backward:
            # Restore reading order: reverse the chunk list and each chunk's words.
            chunks = [' '.join(chunk.split()[::-1]) for chunk in chunks[::-1]]
        return chunks
def clean_punctuation(sentence):
    """Return ``sentence`` with every '.', '?', '!' and '*' replaced by a space."""
    to_space = str.maketrans(".?!*", "    ")
    return sentence.translate(to_space)
def clean_new_response(essay):
    """Turn a raw response into a list of cleaned, punctuation-free chunks.

    Pipeline: ``clean_fun`` -> ``CustomChunkTokenizer.tokenize`` (forward,
    300-character chunks, at least 2 chunks or else the whole text as one
    item) -> ``clean_punctuation`` on each item.
    """
    chunker = CustomChunkTokenizer(
        chunk_size=300, direction='forward', clean_text=True, min_chunks=2
    )
    pieces = chunker.tokenize(clean_fun(essay))
    return list(map(clean_punctuation, pieces))
# ============================================================
# Embedding models (lazy singletons)
# ============================================================
# model name -> (tokenizer, model); filled on first use by _load_model.
_models: Dict[str, Any] = {}
_model_lock = threading.Lock()


def _load_model(model_name: str):
    """Return the cached ``(tokenizer, model)`` pair for ``model_name``.

    On first request the tokenizer and model are loaded with
    ``from_pretrained``, the model is moved to ``DEVICE`` and put in eval mode.
    Loading happens under ``_model_lock`` with a second membership check inside
    the lock, so the pair is loaded only once.
    """
    cached = _models.get(model_name)
    if cached is not None:
        return cached

    with _model_lock:
        if model_name not in _models:
            print(f"[model] Loading {model_name} …", flush=True)
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModel.from_pretrained(
                model_name, output_hidden_states=True
            ).to(DEVICE)
            model.eval()
            _models[model_name] = (tokenizer, model)
        return _models[model_name]
def embed_texts(texts: List[str], model_name: str,
                batch_size: int = 64) -> np.ndarray:
    """Embed ``texts`` with ``model_name`` and return one float32 row per text.

    Texts are processed in batches of ``batch_size``. For every text the vector
    is the last hidden layer's state at sequence position 0. A progress line is
    printed per batch when more than one batch is needed.
    """
    tokenizer, model = _load_model(model_name)
    n_texts = len(texts)
    n_batches, remainder = divmod(n_texts, batch_size)
    if remainder:
        n_batches += 1

    rows = []
    for batch_no, start in enumerate(range(0, n_texts, batch_size), start=1):
        batch = texts[start: start + batch_size]
        if n_batches > 1:
            done = start + len(batch)
            print(f" [embed] batch {batch_no}/{n_batches} "
                  f"({done}/{n_texts} texts)", flush=True)
        encoded = tokenizer(batch, padding="max_length", truncation=True,
                            return_tensors="pt")
        encoded = {name: tensor.to(DEVICE) for name, tensor in encoded.items()}
        with torch.no_grad():
            output = model(**encoded)
        first_token = output.hidden_states[-1][:, 0, :]
        rows.append(first_token.cpu().numpy().astype(np.float32))
    return np.vstack(rows)
def embed_single(text: str, model_name: str) -> np.ndarray:
    """Embed one text and return its vector (row 0 of a one-item batch)."""
    batch_result = embed_texts([text], model_name)
    return batch_result[0]
# ============================================================
# Baseline
# ============================================================
# baseline key -> baseline dict; filled on first use by get_baseline.
_baselines: Dict[str, Dict[str, Any]] = {}


def _baseline_key(prompt_key: str, model_name: str) -> str:
    """Return the cache key for a (task, model) pair.

    The key is ``prompt_key`` and ``model_name`` joined by ``__``, with every
    ``/`` in the model name also turned into ``__`` so the key is usable as a
    file-name stem.
    """
    return "__".join((f"{prompt_key}", *model_name.split("/")))
def _compute_baseline(prompt_key: str, model_name: str) -> Dict[str, Any]:
    """Load or build the reference baseline for one (task, model) pair.

    The baseline is a dict with:
      ``pool_embs`` – float32 embeddings of the reference-pool sentences,
      ``z_mean`` / ``z_sd`` – mean and standard deviation of the pairwise
          cosine distances between distinct pool items,
      ``n_pool`` – number of pool sentences.

    If both cache files exist under ``CACHE_DIR`` they are loaded. Otherwise the
    task's ``ref_file`` is read, rows whose ``respondent`` contains ``'human'``
    are dropped, the remaining ``sentence`` values are embedded, the statistics
    are computed, and both cache files are written.
    """
    bkey = _baseline_key(prompt_key, model_name)
    npz = CACHE_DIR / f"{bkey}_pool_embs.npz"
    jsn = CACHE_DIR / f"{bkey}_zparams.json"

    if npz.exists() and jsn.exists():
        # Read the array inside a context manager so the archive is closed.
        with np.load(npz) as data:
            cached_embs = data["pool_embs"]
        stats = json.loads(jsn.read_text())
        print(f"[{prompt_key}|{model_name}] Loaded cached baseline "
              f"(N={stats['n_pool']}, M={stats['z_mean']:.6f}, SD={stats['z_sd']:.6f})")
        return {"pool_embs": cached_embs, "z_mean": stats["z_mean"],
                "z_sd": stats["z_sd"], "n_pool": stats["n_pool"]}

    ref_path = BASE_DIR / PROMPTS[prompt_key]["ref_file"]
    df = pd.read_excel(ref_path)
    ai_df = df[~df['respondent'].str.contains('human')]
    pool_sentences = ai_df['sentence'].astype(str).tolist()

    print(f"[{prompt_key}|{model_name}] Embedding {len(pool_sentences)} AI pool chunks …",
          flush=True)
    pool_embs = embed_texts(pool_sentences, model_name)

    print(f"[{prompt_key}|{model_name}] Computing pairwise distances …", flush=True)
    from sklearn.metrics.pairwise import cosine_similarity
    dists = 1.0 - cosine_similarity(pool_embs)
    # Take the strict lower triangle by index. Filtering on "!= 0" would also
    # discard genuine zero distances (duplicate pool items) and so bias the
    # mean and standard deviation.
    vals = dists[np.tril_indices(dists.shape[0], k=-1)]
    z_mean = float(np.mean(vals))
    z_sd = float(np.std(vals))

    pool_embs = pool_embs.astype(np.float32)
    np.savez_compressed(str(npz), pool_embs=pool_embs)
    jsn.write_text(json.dumps({"z_mean": z_mean, "z_sd": z_sd,
                               "n_pool": len(pool_sentences)}))
    print(f"[{prompt_key}|{model_name}] Baseline: N={len(pool_sentences)}, "
          f"M={z_mean:.6f}, SD={z_sd:.6f}")
    return {"pool_embs": pool_embs, "z_mean": z_mean,
            "z_sd": z_sd, "n_pool": len(pool_sentences)}
# Guards baseline construction: the preload thread and request handlers can ask
# for the same baseline at the same time.
_baseline_lock = threading.Lock()


def get_baseline(prompt_key: str, model_name: str) -> Dict[str, Any]:
    """Return the baseline for a (task, model) pair, building it on first use.

    Follows the same check / lock / re-check shape as ``_load_model`` so that a
    baseline is computed (and its cache files written) by one thread only;
    other threads asking for an unbuilt baseline wait for the lock.
    """
    bkey = _baseline_key(prompt_key, model_name)
    if bkey not in _baselines:
        with _baseline_lock:
            if bkey not in _baselines:
                _baselines[bkey] = _compute_baseline(prompt_key, model_name)
    return _baselines[bkey]
# ============================================================
# Scoring
# ============================================================
def _score_chunk_one_model(chunk_embedding, prompt_key, model_name):
    """Score one chunk embedding against the reference pool of one model.

    The chunk's mean cosine distance to all pool embeddings is standardised
    with the baseline's ``z_mean`` / ``z_sd`` (z is 0.0 when ``z_sd`` is not
    positive) and mapped through the normal CDF. Returns a float in 0–100.
    """
    from sklearn.metrics.pairwise import cosine_similarity
    baseline = get_baseline(prompt_key, model_name)

    query = chunk_embedding.reshape(1, -1)
    similarity_row = cosine_similarity(query, baseline["pool_embs"])[0]
    mean_dist = float(np.mean(1.0 - similarity_row))

    spread = baseline["z_sd"]
    if spread > 0:
        z = (mean_dist - baseline["z_mean"]) / spread
    else:
        z = 0.0
    return float(norm.cdf(z)) * 100
def score_text(text: str, prompt_key: str) -> Dict[str, float]:
    """Score a response for one task.

    Returns a dict with one entry per embedding model (``mpnet``,
    ``noinstruct``, ``gist``: the mean chunk score for that model, rounded to
    one decimal) and ``final`` (the mean of the unrounded per-model averages,
    rounded to one decimal). Empty or whitespace-only text scores 0.0 on every
    key without loading any model.
    """
    cleaned = (text or "").strip()
    if not cleaned:
        return {"mpnet": 0.0, "noinstruct": 0.0, "gist": 0.0, "final": 0.0}

    chunks = clean_new_response(cleaned)
    short_names = ("mpnet", "noinstruct", "gist")

    per_model = {}
    for model_name, short in zip(EMB_MODELS, short_names):
        chunk_scores = []
        for chunk in chunks:
            embedding = embed_single(chunk, model_name)
            chunk_scores.append(
                _score_chunk_one_model(embedding, prompt_key, model_name))
        per_model[short] = float(np.mean(chunk_scores))

    result = {short: round(average, 1) for short, average in per_model.items()}
    result["final"] = round(float(np.mean(list(per_model.values()))), 1)
    return result
# ============================================================
# Data persistence (dual-write: primary + backup)
# ============================================================
# Held by save_response around all primary and backup writes, so concurrent
# submissions do not interleave their appends.
_csv_lock = threading.Lock()
# Backup always writes to app directory as second copy
BACKUP_DIR = BASE_DIR / "data_backup"
# Import-time side effect: the backup directory is created if missing.
BACKUP_DIR.mkdir(exist_ok=True)
def _write_csv(csv_path, row):
    """Append ``row`` (a dict) to ``csv_path`` as one CSV record.

    When the file does not exist yet, a header line made of the dict's keys is
    written first. Values are written in the dict's key order.
    """
    is_new_file = not csv_path.exists()
    with open(csv_path, "a", newline="", encoding="utf-8") as handle:
        writer = _csv.writer(handle)
        if is_new_file:
            writer.writerow(list(row.keys()))
        writer.writerow(list(row.values()))
def _write_json(json_path, row, chat_history, task_order):
    """Append a response to a per-participant JSON file.

    A missing file is started with the participant id, the first row's
    timestamp as ``started``, the task order, and an empty ``responses`` list.
    The stored response is a copy of ``row`` whose ``chat_history`` is the
    native list rather than the JSON string used for the CSV.

    The document is written to a sibling ``*.tmp`` file and then moved over
    ``json_path`` with ``os.replace``. An interrupted write therefore cannot
    leave a truncated file that would make every later ``json.loads`` for this
    participant fail.
    """
    if json_path.exists():
        data = json.loads(json_path.read_text(encoding="utf-8"))
    else:
        data = {
            "prolific_id": row["prolific_id"], "started": row["timestamp"],
            "task_order": task_order, "responses": []
        }

    json_row = dict(row)
    json_row["chat_history"] = chat_history  # native list, not JSON string
    data["responses"].append(json_row)

    tmp_path = json_path.with_name(json_path.name + ".tmp")
    tmp_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
    os.replace(tmp_path, json_path)
def save_response(prolific_id, prompt_key, submission_num, task_index,
                  response_text, scores, task_order,
                  sketchpad_text="", chat_history=None):
    """Persist one scored submission to the primary and backup stores.

    Each store gets the row appended to ``responses.csv`` and to the
    participant's JSON file. Both writes happen under ``_csv_lock``. A failure
    in one store is logged and does not stop the other store from being tried.

    ``task_index`` is zero-based on input and stored one-based. ``scores`` must
    provide the keys ``mpnet``, ``noinstruct``, ``gist`` and ``final``.
    """
    chat_history = chat_history or []
    chat_json = json.dumps(chat_history, ensure_ascii=False)
    row = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "prolific_id": prolific_id,
        "task_key": prompt_key,
        "task_index": task_index + 1,
        "submission": submission_num,
        "sketchpad_text": sketchpad_text,
        "response_text": response_text,
        "mpnet": scores["mpnet"],
        "noinstruct": scores["noinstruct"],
        "gist": scores["gist"],
        "divpol_score": scores["final"],
        "task_order": json.dumps(task_order),
        "chat_history": chat_json,
    }

    # The ID is typed by the participant, so it must not be used as a file name
    # unchanged: "/" or ".." would let it point outside the data directory.
    # The row itself still records the ID exactly as entered.
    safe_id = re.sub(r"[^A-Za-z0-9_-]", "_", str(prolific_id)) or "unknown"
    json_name = f"{safe_id}.json"

    with _csv_lock:
        # Primary = persistent storage; Backup = separate copy in the app dir.
        for label, folder in (("Primary", DATA_DIR), ("Backup", BACKUP_DIR)):
            try:
                _write_csv(folder / "responses.csv", row)
                _write_json(folder / json_name, row, chat_history, task_order)
            except Exception as e:
                print(f"[WARN] {label} save failed: {e}", flush=True)
# ============================================================
# OpenAI chat
# ============================================================
def chat_reply(history, user_msg, system_prompt):
    """Send ``user_msg`` to the chat model and return the extended history.

    Returns ``(new_history, "")``; the empty string clears the input box.
    A blank message returns the history unchanged. When the OpenAI package or
    the ``OPENAI_API_KEY`` secret is missing, a placeholder assistant message
    is appended instead of calling the API. The caller's list is never
    modified in place.

    Raises whatever the OpenAI client raises if both API calls fail.
    """
    history = history or []
    user_msg = (user_msg or "").strip()
    if not user_msg:
        return history, ""

    user_turn = {"role": "user", "content": user_msg}

    def with_reply(content):
        return history + [user_turn, {"role": "assistant", "content": content}], ""

    if not _HAS_OPENAI:
        return with_reply(f"(OpenAI not installed) {user_msg}")
    api_key = os.getenv("OPENAI_API_KEY", "").strip()
    if not api_key:
        return with_reply("OPENAI_API_KEY is missing in Space Secrets.")

    client = OpenAI(api_key=api_key)
    messages = []
    sys_p = (system_prompt or "").strip()
    if sys_p:
        messages.append({"role": "system", "content": sys_p})
    # Forward only role/content: history entries coming back from the chatbot
    # component may carry extra keys that the API would not accept.
    messages.extend(
        {"role": turn["role"], "content": turn["content"]}
        for turn in history
        if isinstance(turn, dict) and "role" in turn and "content" in turn
    )
    messages.append(user_turn)

    try:
        resp = client.responses.create(model=OPENAI_MODEL, input=messages, temperature=0.7)
        answer = (resp.output_text or "").strip()
    except Exception:
        # Any failure of the first call falls back to the chat-completions call.
        resp = client.chat.completions.create(model=OPENAI_MODEL, messages=messages,
                                              temperature=0.7)
        # `content` can be None, so guard before stripping.
        answer = (resp.choices[0].message.content or "").strip()
    return with_reply(answer)
def chat_clear():
    """Return a new empty history, used to reset the chatbot component."""
    return list()
# ============================================================
# Score visual
# ============================================================
def make_score_visual(score):
    """Return the text block shown under the score for ``score``.

    The score is clamped to 0–100 and classified into one of five bands
    (<25, <45, <55, <75, otherwise); the result contains the two scale
    captions, the clamped score with one decimal, and the band label.

    NOTE(review): a colour is chosen per band but the returned string does not
    reference it, and the string contains no tags — presumably the HTML markup
    was lost from this copy of the file; confirm against version control.
    """
    pct = max(0, min(100, score))

    # (exclusive upper bound, colour, label), checked from lowest to highest.
    bands = (
        (25, "#e74c3c", "Low distinctiveness"),
        (45, "#e67e22", "Below average distinctiveness"),
        (55, "#f1c40f", "Average distinctiveness"),
        (75, "#2ecc71", "Above average distinctiveness"),
    )
    color, label = "#27ae60", "High distinctiveness"
    for upper_bound, band_color, band_label in bands:
        if pct < upper_bound:
            color, label = band_color, band_label
            break

    return f"""
0 – Very similar to AI
100 – Very different from AI
{pct:.1f}
{label}
"""
# ============================================================
# Qualtrics redirect HTML
# ============================================================
# Build the completion notice shown in redirect_html: a thank-you message and a
# link to QUALTRICS_REDIRECT_URL with the participant's ID as PROLIFIC_PID.
#
# NOTE(review): as it stands this function is not valid Python. The
# single-quoted f-strings below are split across lines, contain no tags, and
# never use `url`, so the HTML (and presumably the redirect mechanism itself)
# appears to have been lost from this copy of the file. Restore the literals
# from version control rather than reconstructing them by hand.
# NOTE(review): `prolific_id` is participant-typed text and is placed in the
# query string without URL-encoding; when restoring the markup, consider
# urllib.parse.quote for the URL and html.escape for anything put into HTML.
def make_redirect_html(prolific_id):
url = f"{QUALTRICS_REDIRECT_URL}?PROLIFIC_PID={prolific_id}"
return (
f''
f'
Study complete — thank you!
'
f'
You will be redirected to the survey '
f'shortly. If not, '
f'click here.
'
f''
f'
'
)
# ============================================================
# Character count helper
# ============================================================
def char_count_status(text):
    """Return the status-box message for the current submission text.

    The length is measured on the stripped text, matching the check in
    ``do_score``. Every number in the messages comes from ``CHAR_MIN`` and
    ``CHAR_MAX``, so the counter cannot drift from the enforced limits.
    """
    n = len((text or "").strip())
    if n == 0:
        return f"Character count: 0 / {CHAR_MAX} (minimum {CHAR_MIN})"
    if n < CHAR_MIN:
        return f"⚠️ Too short: {n} / {CHAR_MAX} characters (minimum {CHAR_MIN})"
    if n > CHAR_MAX:
        return f"⚠️ Too long: {n} / {CHAR_MAX} characters (maximum {CHAR_MAX})"
    return f"✅ {n} / {CHAR_MAX} characters (within {CHAR_MIN}–{CHAR_MAX} limit)"
# ============================================================
# UI
# ============================================================
# Page definition. The css hides the footer, the "built with" badge and links
# that point to huggingface.co/spaces.
with gr.Blocks(
theme=gr.themes.Soft(),
analytics_enabled=False,
css="""
a[href*="huggingface.co/spaces"] { display: none !important; }
footer { display: none !important; }
.built-with { display: none !important; }
"""
) as demo:
# ── State ──
# Per-session values threaded through the callbacks:
#   st_prolific        participant ID ("" until Start Study succeeds)
#   st_order           task keys in this participant's order
#   st_pidx            index into st_order of the current task
#   st_submission      1-based number of the next submission in the task
#   st_responses       all scored responses so far
#   st_current_prompt  key of the current task
#   st_history         rows of the submission-history table for the task
st_prolific = gr.State("")
st_order = gr.State([])
st_pidx = gr.State(0)
st_submission = gr.State(1)
st_responses = gr.State([])
st_current_prompt = gr.State("car")
st_history = gr.State([])
st_task_complete = gr.State(False) # [CONT] tracks whether current task is done
# System prompt passed to chat_reply by the Send button.
sys_prompt = gr.State("You are a helpful assistant.")
# ── Welcome message ── [INSTR]
welcome_html = gr.HTML(value=WELCOME_TEXT)
# ── Top bar: Prolific ID + Start ──
with gr.Row():
with gr.Column(scale=2):
tb_prolific = gr.Textbox(
label="Prolific ID",
placeholder="Enter your Prolific ID to begin…",
max_lines=1, interactive=True,
)
with gr.Column(scale=1):
btn_start = gr.Button("Start Study", variant="primary")
# Progress line ("Task x / 3 · Submission y / 5") and the task prompt.
md_status_bar = gr.Markdown("")
md_prompt_display = gr.HTML(value="")
with gr.Row():
# LEFT: Chat
with gr.Column(scale=1):
gr.Markdown("## Chat with AI")
chatbot = gr.Chatbot(label="Chat", type="messages", height=520)
chat_input = gr.Textbox(
label="Message", placeholder="Talk to the bot…", lines=2)
with gr.Row():
send_btn = gr.Button("Send", variant="primary")
clear_btn = gr.Button("Clear")
# RIGHT: Response + scoring
with gr.Column(scale=1):
gr.Markdown("## Your Response")
sketchpad = gr.Textbox(
label="📝 Sketchpad (draft your response here)",
lines=8, placeholder="Draft your ideas here…",
)
copy_btn = gr.Button("⬇ Copy to Submission Box", size="sm")
submission_box = gr.Textbox(
label="📨 Final Submission",
lines=5,
placeholder="Your final response goes here (300–600 characters).",
)
score_btn = gr.Button("Submit and Score", variant="primary")
# [CONT] Continue button — hidden until a task's 5 submissions are done
continue_btn = gr.Button(
"➡️ Continue to Next Task", variant="primary", visible=False
)
# NOTE(review): this initial value repeats the literal that
# char_count_status returns for empty text; keep the two in step.
score_status = gr.Textbox(
label="Status",
value="Character count: 0 / 600 (minimum 300)",
interactive=False,
)
score_visual = gr.HTML(value="")
redirect_html = gr.HTML(value="")
history_df = gr.Dataframe(
label="Submission History",
headers=HIST_COLUMNS,
datatype=["number", "str", "number"],
interactive=False,
wrap=True,
)
# ----------------------------
# CALLBACKS
# ----------------------------
def copy_to_submission(sketch_text):
    """Return the sketchpad text (``""`` for empty/None) for the submission box."""
    return sketch_text or ""


# The listener below only reacts to typing, so refresh the character counter
# explicitly once the copied text has landed in the submission box.
copy_btn.click(
    fn=copy_to_submission, inputs=[sketchpad], outputs=[submission_box]
).then(
    fn=char_count_status, inputs=[submission_box], outputs=[score_status]
)

# `.input` fires on user edits only. `.change` would also fire when do_score
# clears the box after a successful submission, and the counter message would
# then overwrite the "Scored …" status that do_score just wrote.
submission_box.input(
    fn=char_count_status,
    inputs=[submission_box],
    outputs=[score_status],
)
def start_study(prolific_id):
    """Validate the Prolific ID and initialise the session.

    Returns one value per component wired to ``btn_start.click``, in that
    order. A valid ID (at least 3 characters after stripping) gets a task
    order that is a deterministic shuffle seeded from the ID, the first task's
    prompt, and a locked ID field and Start button.

    An invalid ID leaves the session unstarted: the stored ID is ``""`` (so
    ``do_score`` keeps refusing submissions), the welcome text stays visible,
    and only the warning is shown.
    """
    pid = (prolific_id or "").strip()
    empty_hist = pd.DataFrame(columns=HIST_COLUMNS)

    if len(pid) < 3:
        return ("", [], 0, 1, [], "car", [], False,
                "⚠️ **Please enter a valid Prolific ID (at least 3 characters).**",
                "", empty_hist, "", "",
                gr.update(visible=True),    # score_btn visible
                gr.update(visible=False),   # continue_btn hidden
                gr.update(),                # welcome text stays as it is
                gr.update(),                # btn_start stays enabled
                gr.update())                # tb_prolific stays editable

    # Seeding from the ID gives the same participant the same order on every run.
    rng = random.Random(hashlib.md5(pid.encode()).hexdigest())
    order = list(PROMPTS.keys())
    rng.shuffle(order)
    pk = order[0]
    status = (f"**Task 1 / 3 · Submission 1 / {SUBMISSIONS_PER_TASK}** · "
              f"Participant: `{pid}`")
    return (pid, order, 0, 1, [], pk, [], False,
            status, make_prompt_html(PROMPTS[pk]), empty_hist, "", "",
            gr.update(visible=True),    # score_btn
            gr.update(visible=False),   # continue_btn
            "",                         # welcome hidden
            gr.update(interactive=False, variant="secondary"),  # disable start btn
            gr.update(interactive=False))                       # lock prolific ID
# Start button. The outputs list must stay in the same order as the tuple
# returned by start_study (18 values).
btn_start.click(
fn=start_study,
inputs=[tb_prolific],
outputs=[
st_prolific, st_order, st_pidx, st_submission, st_responses,
st_current_prompt, st_history, st_task_complete,
md_status_bar, md_prompt_display, history_df,
score_visual, redirect_html,
score_btn, continue_btn,
welcome_html,
btn_start, tb_prolific,
],
)
def do_score(text, sketchpad_text, chat_history,
             prompt_key, prolific_id, order, pidx, submission,
             responses, history, task_complete):
    """Validate, score and record one submission.

    Returns one value per component in ``score_outputs``, in that order:
    history table, status text, responses, submission number, task index,
    task key, history rows, task-complete flag, status bar, prompt display,
    score visual, redirect HTML, Submit button, Continue button, sketchpad,
    submission box.

    A submission is refused (no state change, only a status message) when the
    study has not been started, the current task is already complete, or the
    text is empty or outside ``CHAR_MIN``–``CHAR_MAX``. Otherwise it is scored,
    saved, added to the history, and both text boxes are cleared.

    After the last submission of the last task the task-complete flag is
    returned as True, so that a further Enter-submit from the text box cannot
    record extra rows.
    """
    text = (text or "").strip()
    sketchpad_text = (sketchpad_text or "").strip()
    empty_hist = pd.DataFrame(columns=HIST_COLUMNS)
    is_last_task = pidx + 1 >= len(order)

    def rejected(message, task_done=False, show_submit=True, show_continue=False):
        """Outputs that keep all state and only update status/button visibility."""
        cur_hist = pd.DataFrame(history) if history else empty_hist
        return (cur_hist, message,
                responses, submission, pidx, prompt_key, history, task_done,
                gr.update(), gr.update(), gr.update(), gr.update(),
                gr.update(visible=show_submit), gr.update(visible=show_continue),
                gr.update(), gr.update())

    if not prolific_id:
        return rejected("⚠️ Enter Prolific ID and click Start Study first.")

    # Block if task already complete (waiting for Continue click, or finished)
    if task_complete:
        if is_last_task:
            return rejected("Study complete — redirecting to survey.",
                            task_done=True, show_submit=False)
        return rejected(
            "✅ Task complete! Click **Continue to Next Task** to proceed.",
            task_done=True, show_submit=False, show_continue=True)

    if not text:
        return rejected(char_count_status(text))

    # Enforce character limits
    n = len(text)
    if n < CHAR_MIN:
        return rejected(
            f"⚠️ Too short: {n} characters. Please write at least {CHAR_MIN}.")
    if n > CHAR_MAX:
        return rejected(
            f"⚠️ Too long: {n} characters. Please keep to {CHAR_MAX} or fewer.")

    # Score and persist
    scores = score_text(text, prompt_key)
    sc = scores["final"]
    save_response(prolific_id, prompt_key, submission, pidx, text, scores, order,
                  sketchpad_text=sketchpad_text, chat_history=chat_history)

    # History row
    preview = text[:80] + "…" if len(text) > 80 else text
    new_history = history + [{"Submission": submission,
                              "Response Preview": preview,
                              "Distinctiveness Score": sc}]
    new_responses = responses + [{"task_key": prompt_key, "submission": submission,
                                  "response_text": text, "score": sc}]
    visual_html = make_score_visual(sc)
    hist_df = pd.DataFrame(new_history)
    status_msg = f"✅ Scored {n} chars → Distinctiveness Score = {sc:.1f} / 100"

    if submission < SUBMISSIONS_PER_TASK:
        # More submissions remain in this task
        new_submission = submission + 1
        bar = (f"**Task {pidx + 1} / 3 · "
               f"Submission {new_submission} / {SUBMISSIONS_PER_TASK}** · "
               f"Participant: `{prolific_id}`")
        return (hist_df, status_msg,
                new_responses, new_submission, pidx, prompt_key,
                new_history, False,
                bar, gr.update(), visual_html, "",
                gr.update(visible=True), gr.update(visible=False),
                "", "")

    if is_last_task:
        # Last submission of the last task: hide both buttons and redirect.
        bar = (f"✅ **Study complete!** You submitted {len(new_responses)} "
               f"responses. Redirecting to survey…")
        return (hist_df, "Study complete — redirecting to survey.",
                new_responses, submission, pidx, prompt_key,
                new_history, True,
                bar, gr.update(), visual_html, make_redirect_html(prolific_id),
                gr.update(visible=False), gr.update(visible=False),
                "", "")

    # Task is done — show Continue button, hide Submit button
    bar = (f"**Task {pidx + 1} / 3 · "
           f"Submission {submission} / {SUBMISSIONS_PER_TASK}** · "
           f"Participant: `{prolific_id}` — "
           f"✅ **Task complete!** Click Continue when ready.")
    return (hist_df, status_msg,
            new_responses, submission, pidx, prompt_key,
            new_history, True,
            bar, gr.update(), visual_html, "",
            gr.update(visible=False), gr.update(visible=True),
            "", "")
# Components written by do_score, in the order of its return tuple.
score_outputs = [
    history_df, score_status,
    st_responses, st_submission, st_pidx, st_current_prompt,
    st_history, st_task_complete,
    md_status_bar, md_prompt_display, score_visual, redirect_html,
    score_btn, continue_btn,
    sketchpad, submission_box,
]

# Components read by do_score, in the order of its parameters.
score_inputs = [
    submission_box, sketchpad, chatbot,
    st_current_prompt, st_prolific,
    st_order, st_pidx, st_submission, st_responses,
    st_history, st_task_complete,
]

# The Submit button and the submission box's own submit event both score.
for _register in (score_btn.click, submission_box.submit):
    _register(fn=do_score, inputs=score_inputs, outputs=score_outputs)
# ── [CONT] Continue button: advance to next task ──
def do_continue(prolific_id, order, pidx, responses):
    """Move the session to the next task, or finish if there is none.

    Returns one value per component wired to ``continue_btn.click``, in that
    order. In both cases the submission counter restarts at 1 and the history,
    score visual, status text, chat and submission box are cleared. When no
    task remains (not expected, since the button is hidden on the final task)
    the redirect HTML is shown and the Submit button is hidden.
    """
    next_idx = pidx + 1
    finished = next_idx >= len(order)
    blank_table = pd.DataFrame(columns=HIST_COLUMNS)

    if finished:
        status = (f"✅ **Study complete!** You submitted {len(responses)} "
                  f"responses. Redirecting to survey…")
        task_key = order[pidx]
        prompt_html = ""
        redirect = make_redirect_html(prolific_id)
    else:
        task_key = order[next_idx]
        status = (f"**Task {next_idx + 1} / 3 · "
                  f"Submission 1 / {SUBMISSIONS_PER_TASK}** · "
                  f"Participant: `{prolific_id}`")
        prompt_html = make_prompt_html(PROMPTS[task_key])
        redirect = ""

    return (1, next_idx, task_key, [], False,
            status, prompt_html,
            blank_table, "", "",
            redirect,
            gr.update(visible=not finished), gr.update(visible=False),
            [], "")
# Continue button. The outputs list must stay in the same order as the tuple
# returned by do_continue (15 values). The chatbot is among the outputs, so
# the chat is cleared when the next task starts.
continue_btn.click(
fn=do_continue,
inputs=[st_prolific, st_order, st_pidx, st_responses],
outputs=[
st_submission, st_pidx, st_current_prompt, st_history, st_task_complete,
md_status_bar, md_prompt_display,
history_df, score_visual, score_status,
redirect_html,
score_btn, continue_btn,
chatbot, submission_box,
],
)
# Chat controls
# Only the Send button is wired here; no submit listener is attached to chat_input.
send_btn.click(fn=chat_reply, inputs=[chatbot, chat_input, sys_prompt],
outputs=[chatbot, chat_input])
clear_btn.click(fn=chat_clear, inputs=None, outputs=[chatbot])
# ── Admin data download panel ──
# Set ADMIN_PASSWORD in Space Secrets to enable
with gr.Accordion("Admin", open=False):
with gr.Row():
admin_pw = gr.Textbox(
label="Password", type="password",
placeholder="Enter password…", scale=2,
)
admin_btn = gr.Button("Submit", variant="primary", scale=1)
admin_status = gr.Markdown("")
with gr.Row():
# Both file widgets stay hidden until admin_download returns a file for them.
csv_download = gr.File(label="📄 File 1", visible=False)
json_download = gr.File(label="📦 File 2", visible=False)
def admin_download(password):
    """Check the admin password and expose the collected data for download.

    Returns ``(status_markdown, csv_file_update, json_zip_update)``. The CSV is
    taken from the primary data directory, falling back to the backup. The
    per-participant JSON files (from whichever directory has any) are zipped
    into a new temporary archive inside ``BACKUP_DIR``. The count shown is the
    number of CSV records excluding the header.
    """
    import hmac
    import tempfile
    import zipfile

    expected = os.getenv("ADMIN_PASSWORD", "").strip()
    if not expected:
        return ("⚠️ `ADMIN_PASSWORD` not set in Space Secrets.",
                gr.update(visible=False), gr.update(visible=False))

    # Constant-time comparison; also tolerate a None value from the textbox.
    supplied = (password or "").strip()
    if not hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        return ("❌ Incorrect password.",
                gr.update(visible=False), gr.update(visible=False))

    # Find CSV — try primary, then backup
    csv_path = DATA_DIR / "responses.csv"
    if not csv_path.exists():
        csv_path = BACKUP_DIR / "responses.csv"
    have_csv = csv_path.exists()

    # Zip all JSON files from primary or backup
    json_dir = DATA_DIR if any(DATA_DIR.glob("*.json")) else BACKUP_DIR
    json_files = sorted(json_dir.glob("*.json"))

    csv_out = gr.update(visible=False)
    json_out = gr.update(visible=False)
    if not have_csv and not json_files:
        return ("⚠️ No response data found yet.", csv_out, json_out)

    if have_csv:
        csv_out = gr.update(value=str(csv_path), visible=True)

    if json_files:
        # Reserve a unique file name, then close the handle before zipping.
        with tempfile.NamedTemporaryFile(
                suffix=".zip", delete=False, dir=str(BACKUP_DIR)) as tmp:
            zip_path = tmp.name
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
            for jf in json_files:
                zf.write(jf, jf.name)
        json_out = gr.update(value=zip_path, visible=True)

    n_csv = 0
    if have_csv:
        # Count CSV records, not physical lines: response text can contain
        # newlines. chat_history cells can also exceed the csv module's default
        # field-size limit, so raise it before reading.
        _csv.field_size_limit(2**31 - 1)
        with open(csv_path, newline="", encoding="utf-8") as f:
            n_csv = max(sum(1 for _ in _csv.reader(f)) - 1, 0)

    return (f"✅ Authenticated. **{n_csv} submissions** in CSV, "
            f"**{len(json_files)} participant files** in JSON.",
            csv_out, json_out)
# Admin button. The outputs match admin_download's 3-tuple:
# (status text, CSV file, JSON zip).
admin_btn.click(
fn=admin_download,
inputs=[admin_pw],
outputs=[admin_status, csv_download, json_download],
)
# ============================================================
# Preload baselines (background)
# ============================================================
def _preload():
    """Build or load every (task, model) baseline, logging progress.

    Iterates over all keys of ``PROMPTS`` and all entries of ``EMB_MODELS``.
    A failure for one pair is logged and does not stop the remaining pairs.
    The closing line reports how many pairs failed instead of always claiming
    that everything is ready.
    """
    import time
    total_start = time.time()
    n_models = len(EMB_MODELS)
    failures = 0

    for pk in PROMPTS:
        for mi, model_name in enumerate(EMB_MODELS, start=1):
            try:
                print(f"\n{'='*60}", flush=True)
                print(f"[preload] {pk} | model {mi}/{n_models}: "
                      f"{model_name.split('/')[-1]}", flush=True)
                t0 = time.time()
                get_baseline(pk, model_name)
                print(f"[preload] Done in {time.time()-t0:.1f}s", flush=True)
            except Exception as e:
                failures += 1
                print(f"[WARN] Failed to preload {pk}|{model_name}: {e}", flush=True)

    elapsed = time.time() - total_start
    if failures:
        print(f"\n[preload] Finished with {failures} failed baseline(s) "
              f"in {elapsed:.1f}s", flush=True)
    else:
        print(f"\n[preload] All baselines ready in {elapsed:.1f}s", flush=True)
# Import-time side effect: baselines start loading in a daemon thread as soon
# as this module is imported, whether or not the app is launched below.
threading.Thread(target=_preload, daemon=True).start()
if __name__ == "__main__":
# allowed_paths lists the two data directories whose files admin_download
# hands to the gr.File components.
demo.launch(show_error=True, show_api=False,
allowed_paths=[str(DATA_DIR), str(BACKUP_DIR)])