Spaces:
Running on T4
Running on T4
| # app.py – DivPol Creativity Study (Hugging Face Space) | |
| # --------------------------------------------------------- | |
| # UI: Original two-panel layout (GPT chat left, scoring right) | |
| # Scoring: 3 embedding models × pool distance → z-score → Φ(z) → average × 100 | |
| # - New responses are chunked (~300 chars each) | |
| # - Each chunk is scored against the AI reference pool (pre-parsed chunks) | |
| # - Chunk percentiles are averaged within each model, then averaged across models | |
| # Flow: Prolific ID → 3 tasks (randomised) × 5 submissions → Qualtrics redirect | |
| # --------------------------------------------------------- | |
| # | |
| # CHANGELOG (v4 – Feb 2026) | |
| # -------------------------- | |
| # [INSTR] New welcome + instruction text per participant script | |
| # [CONT] "Continue" button between tasks (manual advance) | |
| # [TERM] "Prompt X/3" → "Task X/3"; "Round/Attempt" → "Submission" | |
| # [LABEL] Score bar: "divergence" → "distinctiveness" | |
| # [CHAR] Submission blocked outside 300–600 chars; live counter in status box | |
| # [TABLE] mpnet/noinstruct/gist columns removed; "DivPol" → "Distinctiveness Score" | |
| # [REDIR] Qualtrics redirect on study completion (placeholder URL) | |
| # --------------------------------------------------------- | |
| import os, json, random, hashlib, threading, csv as _csv, re | |
| from pathlib import Path | |
| from typing import List, Dict, Any | |
| from datetime import datetime, timezone | |
| import numpy as np | |
| import pandas as pd | |
| import gradio as gr | |
| from scipy.stats import norm | |
| import torch | |
| from transformers import AutoTokenizer, AutoModel | |
| try: | |
| from openai import OpenAI | |
| _HAS_OPENAI = True | |
| except Exception: | |
| _HAS_OPENAI = False | |
| # ============================================================ | |
| # Config | |
| # ============================================================ | |
| QUALTRICS_REDIRECT_URL = "https://georgetown.az1.qualtrics.com/jfe/form/SV_9FaybvTxkZOaVFk" | |
| CHAR_MIN = 300 | |
| CHAR_MAX = 600 | |
| SUBMISSIONS_PER_TASK = 5 | |
| # Column list for the history table | |
| HIST_COLUMNS = ["Submission", "Response Preview", "Distinctiveness Score"] | |
| OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini") | |
| EMB_MODELS = [ | |
| "sentence-transformers/all-mpnet-base-v2", | |
| "avsolatorio/NoInstruct-small-Embedding-v0", | |
| "avsolatorio/GIST-Embedding-v0", | |
| ] | |
| DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| # ============================================================ | |
| # Paths | |
| # ============================================================ | |
| BASE_DIR = Path(__file__).resolve().parent | |
| PERSISTENT_DIR = Path("/data") | |
| if PERSISTENT_DIR.exists(): | |
| CACHE_DIR = PERSISTENT_DIR / "cache" | |
| DATA_DIR = PERSISTENT_DIR / "responses" | |
| os.environ["HF_HOME"] = str(PERSISTENT_DIR / ".huggingface") | |
| else: | |
| CACHE_DIR = BASE_DIR / "cache" | |
| DATA_DIR = BASE_DIR / "data" | |
| CACHE_DIR.mkdir(exist_ok=True) | |
| DATA_DIR.mkdir(exist_ok=True) | |
| # ============================================================ | |
| # Tasks | |
| # ============================================================ | |
| PROMPTS = { | |
| "car": { | |
| "name": "Car Safety Feature", | |
| "text": ( | |
| "Create a new feature for a car that would help keep " | |
| "drivers and pedestrians safe." | |
| ), | |
| "ref_file": "car_xgb_reference_plus_responses.xlsx", | |
| }, | |
| "teambuilding": { | |
| "name": "Team Building Activity", | |
| "text": ( | |
| "What are some ways to do teambuilding on video conferencing, " | |
| "with each person only needing a piece of paper and a rubber band?" | |
| ), | |
| "ref_file": "teambuilding_xgb_reference_plus_responses.xlsx", | |
| }, | |
| "routine": { | |
| "name": "Morning Routine", | |
| "text": ( | |
| "Design a 20-minute morning routine that helps someone who " | |
| "wants to start their day in a better mindset." | |
| ), | |
| "ref_file": "routine_xgb_reference_plus_responses.xlsx", | |
| }, | |
| } | |
| # ============================================================ | |
| # Instruction text [INSTR] | |
| # ============================================================ | |
| WELCOME_TEXT = """\ | |
| <div style="padding: 14px 20px; border: 2px solid #ccc; border-radius: 10px; | |
| margin-bottom: 12px; line-height: 1.65;"> | |
| <div style="font-size: 1.25em; font-weight: bold;"> | |
| 👋 Welcome! To start, please enter your Prolific ID and click the "Start Study" button. | |
| </div> | |
| </div> | |
| """ | |
| INSTRUCTION_TEXT = """\ | |
| <b>On the left panel,</b> you will work with an AI chatbot. You may send messages \ | |
| to the chatbot by typing and clicking the "Send" button, and you will receive \ | |
| interactive responses. You may interact with it as many or as few times as you like \ | |
| (but please use it at least once per task).<br><br>\ | |
| <b>On the right panel,</b> the "Sketchpad" allows you to draft and organize your response \ | |
| (please aim for <b>300–600 characters</b>. Your submission will be blocked otherwise). \ | |
| When you're ready, click <b>"Copy to Submission Box"</b> to move your response to the \ | |
| submission box, then click <b>"Submit and Score."</b><br><br>\ | |
| You will complete <b>five submissions</b> per task, for a total of <b>three tasks</b>. \ | |
| After each submission, you will receive a Distinctiveness Score out of 100:<br><br>\ | |
| • A score closer to <b>0</b> means your response is <b>very similar</b> \ | |
| to a typical AI-generated response.<br>\ | |
| • A score of <b>50</b> indicates your response is <b>moderately similar</b> \ | |
| to a typical AI-generated response.<br>\ | |
| • A score closer to <b>100</b> means your response is <b>very different</b> \ | |
| from a typical AI-generated response.<br><br>\ | |
| Your goal is to refine and develop your ideas so your submissions become increasingly \ | |
| distinct from the AI response. You will be able to see your response history and scores \ | |
| in a table after each submission, for a total of five submissions per task.<br><br>\ | |
| Once you finish one task, click the <b>"Continue to the Next Task"</b> button to move on to the next.\ | |
| """ | |
| def make_prompt_html(prompt_cfg): | |
| return ( | |
| f'<div style="margin: 8px 0;">' | |
| f'<div style="font-size: 1.56em; font-weight: bold;">📝 {prompt_cfg["name"]}</div>' | |
| f'<div style="font-size: 1.3em; margin-top: 8px; padding: 10px 16px; ' | |
| f'border-left: 4px solid #888;">{prompt_cfg["text"]}</div>' | |
| f'<div style="font-size: 1.08em; margin-top: 12px; ' | |
| f'line-height: 1.7; padding: 12px 16px; ' | |
| f'border-radius: 8px;">{INSTRUCTION_TEXT}</div>' | |
| f'</div>' | |
| ) | |
| # ============================================================ | |
| # Text cleaning & chunking (from parser_utils.py) | |
| # ============================================================ | |
| def clean_fun(html_string): | |
| html_string = re.sub(r'&\(\d\)', ' ', html_string) | |
| html_string = re.sub(r'&\d+;', ' ', html_string) | |
| html_string = re.sub(r'\\', '', html_string) | |
| _remove_patterns = [ | |
| b'\xc3\x83\xc6\x92\xc3\x82\xc2\xa2', | |
| b'\xc3\x83\xc6\x92', | |
| b'\xc3\x83\xc2\xa2\xc3\x82\xc2\xac', | |
| b'\xc3\x83\xe2\x80\xa6', | |
| b'\xc3\x83\xe2\x80\x9a\xc3\x82\xc2\xa6', | |
| b'\xc3\x83\xc6\x92\xc3\x82\xe2\x80\x9a', | |
| b'\xc3\x83\xc2\xa2\xc3\x82\xe2\x80\x9a\xc3\x82\xc2\xac', | |
| b'\xc3\x83\xc6\x92\xc3\x82\xc2\xa2\xc3\x83\xc2\xa2\xc3\x82\xe2\x80\x9a\xc3\x82\xc2\xac\xc3\x83\xc2\xa2\xc3\x82\xe2\x80\x9e\xc3\x82\xc2\xa2', | |
| b'\xc3\x83\xe2\x80\xa6\xc3\x82\xe2\x80\x9c', | |
| ] | |
| for pat in _remove_patterns: | |
| html_string = html_string.replace(pat.decode('utf-8', errors='replace'), '') | |
| _apostrophe_patterns = [ | |
| b'\xc3\x83\xc6\x92\xc3\x82\xc2\xa2\xc3\x83\xc2\xa2\xc3\x82\xc2\xac\xc3\x83\xc2\xa2\xc3\x82\xc2\xa2', | |
| b'\xc3\x83\xe2\x80\x9a\xc3\x82\xc2\xb4', | |
| ] | |
| for pat in _apostrophe_patterns: | |
| html_string = html_string.replace(pat.decode('utf-8', errors='replace'), "'") | |
| html_string = html_string.replace('`', "'") | |
| html_string = re.sub(r'\u009d', '', html_string) | |
| html_string = re.sub(r'<.*?>', '', html_string) | |
| return html_string | |
| class CustomChunkTokenizer: | |
| def __init__(self, chunk_size=300, direction='forward', clean_text=True, min_chunks=2): | |
| self.chunk_size = chunk_size | |
| self.direction = direction.lower() | |
| self.clean_text = clean_text | |
| self.min_chunks = min_chunks | |
| def _remove_emojis_and_symbols(self, text): | |
| emoji_pattern = re.compile( | |
| "[" | |
| "\U0001F600-\U0001F64F" | |
| "\U0001F300-\U0001F5FF" | |
| "\U0001F680-\U0001F6FF" | |
| "\U0001F1E0-\U0001F1FF" | |
| "\U00002702-\U000027B0" | |
| "\U000024C2-\U0001F251" | |
| "\U0001F900-\U0001F9FF" | |
| "\U0001FA70-\U0001FAFF" | |
| "]+", flags=re.UNICODE | |
| ) | |
| text = emoji_pattern.sub('', text) | |
| text = re.sub(r'[*#@$%^&+=<>|~`]', '', text) | |
| return text | |
| def _clean_markdown(self, text): | |
| text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE) | |
| text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) | |
| text = re.sub(r'__(.+?)__', r'\1', text) | |
| text = re.sub(r'\*(.+?)\*', r'\1', text) | |
| text = re.sub(r'_(.+?)_', r'\1', text) | |
| text = re.sub(r'^\s*[\*\-\+]\s+', '', text, flags=re.MULTILINE) | |
| text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE) | |
| text = re.sub(r'```.*?```', '', text, flags=re.DOTALL) | |
| text = re.sub(r'`(.+?)`', r'\1', text) | |
| text = re.sub(r'[•◦▪▸‣▫▹►‣◁○■□▢▣▤▥▦▧▨▩◘◙◉◎]', '', text) | |
| text = re.sub(r'^[\s\-\*_]{3,}\s*$', '', text, flags=re.MULTILINE) | |
| text = re.sub(r'\s+---+\s+', ' ', text) | |
| return text | |
| def _preprocess_text(self, text): | |
| if not self.clean_text: | |
| return text | |
| text = self._clean_markdown(text) | |
| text = self._remove_emojis_and_symbols(text) | |
| text = re.sub(r'\s+', ' ', text) | |
| text = text.strip() | |
| return text | |
| def tokenize(self, text): | |
| processed_text = self._preprocess_text(text) | |
| chunks = self._create_chunks(text) | |
| if len(chunks) == 0: | |
| return [processed_text] | |
| return chunks if len(chunks) >= self.min_chunks else [processed_text] | |
| def _create_chunks(self, text): | |
| text = self._preprocess_text(text) | |
| words = text.split() | |
| if self.direction == 'backward': | |
| words = words[::-1] | |
| chunks = [] | |
| current_chunk = "" | |
| for word in words: | |
| test_chunk = current_chunk + (" " if current_chunk else "") + word | |
| if len(test_chunk) <= self.chunk_size: | |
| current_chunk = test_chunk | |
| else: | |
| if not current_chunk: | |
| current_chunk = word | |
| else: | |
| chunks.append(current_chunk) | |
| current_chunk = word | |
| if current_chunk: | |
| if len(current_chunk) < 300 and len(chunks) > 0: | |
| chunks[-1] = chunks[-1] + " " + current_chunk | |
| else: | |
| chunks.append(current_chunk) | |
| if self.direction == 'backward': | |
| chunks = [' '.join(chunk.split()[::-1]) for chunk in chunks[::-1]] | |
| return chunks | |
| def clean_punctuation(sentence): | |
| return re.sub(r'[.?!*]', ' ', sentence) | |
| def clean_new_response(essay): | |
| sent_tokenizer = CustomChunkTokenizer( | |
| chunk_size=300, direction='forward', clean_text=True, min_chunks=2 | |
| ) | |
| essay = clean_fun(essay) | |
| sents = sent_tokenizer.tokenize(essay) | |
| sents = [clean_punctuation(s) for s in sents] | |
| return sents | |
| # ============================================================ | |
| # Embedding models (lazy singletons) | |
| # ============================================================ | |
| _models: Dict[str, Any] = {} | |
| _model_lock = threading.Lock() | |
| def _load_model(model_name: str): | |
| if model_name not in _models: | |
| with _model_lock: | |
| if model_name not in _models: | |
| print(f"[model] Loading {model_name} …", flush=True) | |
| tok = AutoTokenizer.from_pretrained(model_name) | |
| mdl = AutoModel.from_pretrained( | |
| model_name, output_hidden_states=True | |
| ).to(DEVICE) | |
| mdl.eval() | |
| _models[model_name] = (tok, mdl) | |
| return _models[model_name] | |
| def embed_texts(texts: List[str], model_name: str, | |
| batch_size: int = 64) -> np.ndarray: | |
| tok, mdl = _load_model(model_name) | |
| parts = [] | |
| total_batches = (len(texts) + batch_size - 1) // batch_size | |
| for batch_idx, i in enumerate(range(0, len(texts), batch_size)): | |
| batch = texts[i: i + batch_size] | |
| if total_batches > 1: | |
| print(f" [embed] batch {batch_idx+1}/{total_batches} " | |
| f"({i+len(batch)}/{len(texts)} texts)", flush=True) | |
| enc = tok(batch, padding="max_length", truncation=True, return_tensors="pt") | |
| enc = {k: v.to(DEVICE) for k, v in enc.items()} | |
| with torch.no_grad(): | |
| out = mdl(**enc) | |
| h = out.hidden_states[-1][:, 0, :] | |
| parts.append(h.cpu().numpy().astype(np.float32)) | |
| return np.vstack(parts) | |
| def embed_single(text: str, model_name: str) -> np.ndarray: | |
| return embed_texts([text], model_name)[0] | |
| # ============================================================ | |
| # Baseline | |
| # ============================================================ | |
| _baselines: Dict[str, Dict[str, Any]] = {} | |
| def _baseline_key(prompt_key: str, model_name: str) -> str: | |
| return f"{prompt_key}__{model_name.replace('/', '__')}" | |
| def _compute_baseline(prompt_key: str, model_name: str) -> Dict[str, Any]: | |
| bkey = _baseline_key(prompt_key, model_name) | |
| npz = CACHE_DIR / f"{bkey}_pool_embs.npz" | |
| jsn = CACHE_DIR / f"{bkey}_zparams.json" | |
| if npz.exists() and jsn.exists(): | |
| data = np.load(npz) | |
| stats = json.loads(jsn.read_text()) | |
| print(f"[{prompt_key}|{model_name}] Loaded cached baseline " | |
| f"(N={stats['n_pool']}, M={stats['z_mean']:.6f}, SD={stats['z_sd']:.6f})") | |
| return {"pool_embs": data["pool_embs"], "z_mean": stats["z_mean"], | |
| "z_sd": stats["z_sd"], "n_pool": stats["n_pool"]} | |
| ref_path = BASE_DIR / PROMPTS[prompt_key]["ref_file"] | |
| df = pd.read_excel(ref_path) | |
| ai_df = df[~df['respondent'].str.contains('human')] | |
| pool_sentences = ai_df['sentence'].astype(str).tolist() | |
| print(f"[{prompt_key}|{model_name}] Embedding {len(pool_sentences)} AI pool chunks …", | |
| flush=True) | |
| pool_embs = embed_texts(pool_sentences, model_name) | |
| print(f"[{prompt_key}|{model_name}] Computing pairwise distances …", flush=True) | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| sims = cosine_similarity(pool_embs) | |
| dists = 1.0 - sims | |
| lower = np.tril(dists, k=-1) | |
| vals = lower[lower != 0] | |
| z_mean = float(np.mean(vals)) | |
| z_sd = float(np.std(vals)) | |
| np.savez_compressed(str(npz), pool_embs=pool_embs.astype(np.float32)) | |
| jsn.write_text(json.dumps({"z_mean": z_mean, "z_sd": z_sd, | |
| "n_pool": len(pool_sentences)})) | |
| print(f"[{prompt_key}|{model_name}] Baseline: N={len(pool_sentences)}, " | |
| f"M={z_mean:.6f}, SD={z_sd:.6f}") | |
| return {"pool_embs": pool_embs.astype(np.float32), "z_mean": z_mean, | |
| "z_sd": z_sd, "n_pool": len(pool_sentences)} | |
| def get_baseline(prompt_key: str, model_name: str) -> Dict[str, Any]: | |
| bkey = _baseline_key(prompt_key, model_name) | |
| if bkey not in _baselines: | |
| _baselines[bkey] = _compute_baseline(prompt_key, model_name) | |
| return _baselines[bkey] | |
| # ============================================================ | |
| # Scoring | |
| # ============================================================ | |
| def _score_chunk_one_model(chunk_embedding, prompt_key, model_name): | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| bl = get_baseline(prompt_key, model_name) | |
| sims = cosine_similarity(chunk_embedding.reshape(1, -1), bl["pool_embs"])[0] | |
| dists = 1.0 - sims | |
| mean_dist = float(np.mean(dists)) | |
| z = (mean_dist - bl["z_mean"]) / bl["z_sd"] if bl["z_sd"] > 0 else 0.0 | |
| return float(norm.cdf(z)) * 100 | |
| def score_text(text: str, prompt_key: str) -> Dict[str, float]: | |
| text = (text or "").strip() | |
| if not text: | |
| return {"mpnet": 0.0, "noinstruct": 0.0, "gist": 0.0, "final": 0.0} | |
| chunks = clean_new_response(text) | |
| result = {} | |
| model_averages = [] | |
| for model_name, short in zip(EMB_MODELS, ["mpnet", "noinstruct", "gist"]): | |
| scores = [_score_chunk_one_model(embed_single(c, model_name), prompt_key, model_name) | |
| for c in chunks] | |
| avg = float(np.mean(scores)) | |
| result[short] = round(avg, 1) | |
| model_averages.append(avg) | |
| result["final"] = round(float(np.mean(model_averages)), 1) | |
| return result | |
| # ============================================================ | |
| # Data persistence (dual-write: primary + backup) | |
| # ============================================================ | |
| _csv_lock = threading.Lock() | |
| # Backup always writes to app directory as second copy | |
| BACKUP_DIR = BASE_DIR / "data_backup" | |
| BACKUP_DIR.mkdir(exist_ok=True) | |
| def _write_csv(csv_path, row): | |
| """Append a row to a CSV file, creating header if needed.""" | |
| write_header = not csv_path.exists() | |
| with open(csv_path, "a", newline="", encoding="utf-8") as f: | |
| w = _csv.DictWriter(f, fieldnames=list(row.keys())) | |
| if write_header: | |
| w.writeheader() | |
| w.writerow(row) | |
| def _write_json(json_path, row, chat_history, task_order): | |
| """Append a response to a per-participant JSON file.""" | |
| data = json.loads(json_path.read_text()) if json_path.exists() else { | |
| "prolific_id": row["prolific_id"], "started": row["timestamp"], | |
| "task_order": task_order, "responses": [] | |
| } | |
| json_row = dict(row) | |
| json_row["chat_history"] = chat_history # native list, not JSON string | |
| data["responses"].append(json_row) | |
| json_path.write_text(json.dumps(data, indent=2)) | |
| def save_response(prolific_id, prompt_key, submission_num, task_index, | |
| response_text, scores, task_order, | |
| sketchpad_text="", chat_history=None): | |
| chat_history = chat_history or [] | |
| chat_json = json.dumps(chat_history, ensure_ascii=False) | |
| row = { | |
| "timestamp": datetime.now(timezone.utc).isoformat(), | |
| "prolific_id": prolific_id, | |
| "task_key": prompt_key, | |
| "task_index": task_index + 1, | |
| "submission": submission_num, | |
| "sketchpad_text": sketchpad_text, | |
| "response_text": response_text, | |
| "mpnet": scores["mpnet"], | |
| "noinstruct": scores["noinstruct"], | |
| "gist": scores["gist"], | |
| "divpol_score": scores["final"], | |
| "task_order": json.dumps(task_order), | |
| "chat_history": chat_json, | |
| } | |
| with _csv_lock: | |
| # Primary write (persistent storage) | |
| try: | |
| _write_csv(DATA_DIR / "responses.csv", row) | |
| _write_json(DATA_DIR / f"{prolific_id}.json", | |
| row, chat_history, task_order) | |
| except Exception as e: | |
| print(f"[WARN] Primary save failed: {e}", flush=True) | |
| # Backup write (app directory — separate copy) | |
| try: | |
| _write_csv(BACKUP_DIR / "responses.csv", row) | |
| _write_json(BACKUP_DIR / f"{prolific_id}.json", | |
| row, chat_history, task_order) | |
| except Exception as e: | |
| print(f"[WARN] Backup save failed: {e}", flush=True) | |
| # ============================================================ | |
| # OpenAI chat | |
| # ============================================================ | |
| def chat_reply(history, user_msg, system_prompt): | |
| history = history or [] | |
| user_msg = (user_msg or "").strip() | |
| if not user_msg: | |
| return history, "" | |
| if not _HAS_OPENAI: | |
| history.append({"role": "user", "content": user_msg}) | |
| history.append({"role": "assistant", | |
| "content": f"(OpenAI not installed) {user_msg}"}) | |
| return history, "" | |
| api_key = os.getenv("OPENAI_API_KEY", "").strip() | |
| if not api_key: | |
| history.append({"role": "user", "content": user_msg}) | |
| history.append({"role": "assistant", | |
| "content": "OPENAI_API_KEY is missing in Space Secrets."}) | |
| return history, "" | |
| client = OpenAI(api_key=api_key) | |
| messages = [] | |
| sys_p = (system_prompt or "").strip() | |
| if sys_p: | |
| messages.append({"role": "system", "content": sys_p}) | |
| messages.extend(history) | |
| messages.append({"role": "user", "content": user_msg}) | |
| try: | |
| resp = client.responses.create(model=OPENAI_MODEL, input=messages, temperature=0.7) | |
| answer = (resp.output_text or "").strip() | |
| except Exception: | |
| resp = client.chat.completions.create(model=OPENAI_MODEL, messages=messages, | |
| temperature=0.7) | |
| answer = resp.choices[0].message.content.strip() | |
| return history + [{"role": "user", "content": user_msg}, | |
| {"role": "assistant", "content": answer}], "" | |
| def chat_clear(): | |
| return [] | |
| # ============================================================ | |
| # Score visual | |
| # ============================================================ | |
| def make_score_visual(score): | |
| pct = max(0, min(100, score)) | |
| if pct < 25: | |
| color, label = "#e74c3c", "Low distinctiveness" | |
| elif pct < 45: | |
| color, label = "#e67e22", "Below average distinctiveness" | |
| elif pct < 55: | |
| color, label = "#f1c40f", "Average distinctiveness" | |
| elif pct < 75: | |
| color, label = "#2ecc71", "Above average distinctiveness" | |
| else: | |
| color, label = "#27ae60", "High distinctiveness" | |
| return f""" | |
| <div style="margin: 12px 0;"> | |
| <div style="display: flex; justify-content: space-between; | |
| font-size: 13px; color: #aaa; margin-bottom: 2px;"> | |
| <span>0 – Very similar to AI</span> | |
| <span>100 – Very different from AI</span> | |
| </div> | |
| <div style="position: relative; width: 100%; height: 28px; | |
| background: linear-gradient(to right, #e74c3c, #e67e22, #f1c40f, #2ecc71, #27ae60); | |
| border-radius: 6px; overflow: visible;"> | |
| <div style="position: absolute; left: {pct}%; | |
| top: -2px; transform: translateX(-50%); | |
| width: 4px; height: 32px; | |
| background: white; border-radius: 2px; | |
| box-shadow: 0 0 4px rgba(0,0,0,0.5);"></div> | |
| </div> | |
| <div style="text-align: center; margin-top: 6px;"> | |
| <span style="font-size: 22px; font-weight: bold; color: {color};">{pct:.1f}</span> | |
| <span style="font-size: 14px; color: #ccc; margin-left: 8px;">{label}</span> | |
| </div> | |
| </div> | |
| """ | |
| # ============================================================ | |
| # Qualtrics redirect HTML | |
| # ============================================================ | |
| def make_redirect_html(prolific_id): | |
| url = f"{QUALTRICS_REDIRECT_URL}?PROLIFIC_PID={prolific_id}" | |
| return ( | |
| f'<div style="text-align:center; margin-top: 24px;">' | |
| f'<p style="font-size:1.1em; color:#ccc;">Study complete — thank you!</p>' | |
| f'<p style="font-size:0.95em; color:#aaa;">You will be redirected to the survey ' | |
| f'shortly. If not, ' | |
| f'<a href="{url}" target="_blank" style="color:#4ea6dc;">click here</a>.</p>' | |
| f'<script>setTimeout(function(){{window.location.href="{url}";}}, 3000);</script>' | |
| f'</div>' | |
| ) | |
| # ============================================================ | |
| # Character count helper | |
| # ============================================================ | |
| def char_count_status(text): | |
| n = len((text or "").strip()) | |
| if n == 0: | |
| return "Character count: 0 / 600 (minimum 300)" | |
| elif n < CHAR_MIN: | |
| return f"⚠️ Too short: {n} / {CHAR_MAX} characters (minimum {CHAR_MIN})" | |
| elif n > CHAR_MAX: | |
| return f"⚠️ Too long: {n} / {CHAR_MAX} characters (maximum {CHAR_MAX})" | |
| else: | |
| return f"✅ {n} / {CHAR_MAX} characters (within 300–600 limit)" | |
| # ============================================================ | |
| # UI | |
| # ============================================================ | |
| with gr.Blocks( | |
| theme=gr.themes.Soft(), | |
| analytics_enabled=False, | |
| css=""" | |
| a[href*="huggingface.co/spaces"] { display: none !important; } | |
| footer { display: none !important; } | |
| .built-with { display: none !important; } | |
| """ | |
| ) as demo: | |
| # ── State ── | |
| st_prolific = gr.State("") | |
| st_order = gr.State([]) | |
| st_pidx = gr.State(0) | |
| st_submission = gr.State(1) | |
| st_responses = gr.State([]) | |
| st_current_prompt = gr.State("car") | |
| st_history = gr.State([]) | |
| st_task_complete = gr.State(False) # [CONT] tracks whether current task is done | |
| sys_prompt = gr.State("You are a helpful assistant.") | |
| # ── Welcome message ── [INSTR] | |
| welcome_html = gr.HTML(value=WELCOME_TEXT) | |
| # ── Top bar: Prolific ID + Start ── | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| tb_prolific = gr.Textbox( | |
| label="Prolific ID", | |
| placeholder="Enter your Prolific ID to begin…", | |
| max_lines=1, interactive=True, | |
| ) | |
| with gr.Column(scale=1): | |
| btn_start = gr.Button("Start Study", variant="primary") | |
| md_status_bar = gr.Markdown("") | |
| md_prompt_display = gr.HTML(value="") | |
| with gr.Row(): | |
| # LEFT: Chat | |
| with gr.Column(scale=1): | |
| gr.Markdown("## Chat with AI") | |
| chatbot = gr.Chatbot(label="Chat", type="messages", height=520) | |
| chat_input = gr.Textbox( | |
| label="Message", placeholder="Talk to the bot…", lines=2) | |
| with gr.Row(): | |
| send_btn = gr.Button("Send", variant="primary") | |
| clear_btn = gr.Button("Clear") | |
| # RIGHT: Response + scoring | |
| with gr.Column(scale=1): | |
| gr.Markdown("## Your Response") | |
| sketchpad = gr.Textbox( | |
| label="📝 Sketchpad (draft your response here)", | |
| lines=8, placeholder="Draft your ideas here…", | |
| ) | |
| copy_btn = gr.Button("⬇ Copy to Submission Box", size="sm") | |
| submission_box = gr.Textbox( | |
| label="📨 Final Submission", | |
| lines=5, | |
| placeholder="Your final response goes here (300–600 characters).", | |
| ) | |
| score_btn = gr.Button("Submit and Score", variant="primary") | |
| # [CONT] Continue button — hidden until a task's 5 submissions are done | |
| continue_btn = gr.Button( | |
| "➡️ Continue to Next Task", variant="primary", visible=False | |
| ) | |
| score_status = gr.Textbox( | |
| label="Status", | |
| value="Character count: 0 / 600 (minimum 300)", | |
| interactive=False, | |
| ) | |
| score_visual = gr.HTML(value="") | |
| redirect_html = gr.HTML(value="") | |
| history_df = gr.Dataframe( | |
| label="Submission History", | |
| headers=HIST_COLUMNS, | |
| datatype=["number", "str", "number"], | |
| interactive=False, | |
| wrap=True, | |
| ) | |
| # ---------------------------- | |
| # CALLBACKS | |
| # ---------------------------- | |
| def copy_to_submission(sketch_text): | |
| return sketch_text or "" | |
| copy_btn.click(fn=copy_to_submission, inputs=[sketchpad], outputs=[submission_box]) | |
| submission_box.change( | |
| fn=char_count_status, | |
| inputs=[submission_box], | |
| outputs=[score_status], | |
| ) | |
| def start_study(prolific_id): | |
| pid = (prolific_id or "").strip() | |
| empty_hist = pd.DataFrame( | |
| columns=HIST_COLUMNS) | |
| if len(pid) < 3: | |
| return (pid, [], 0, 1, [], "car", [], False, | |
| "⚠️ **Please enter a valid Prolific ID (at least 3 characters).**", | |
| "", empty_hist, "", "", | |
| gr.update(visible=True), # score_btn visible | |
| gr.update(visible=False), # continue_btn hidden | |
| "", # welcome hidden after start | |
| gr.update(), # btn_start stays enabled | |
| gr.update()) # tb_prolific stays editable | |
| rng = random.Random(hashlib.md5(pid.encode()).hexdigest()) | |
| order = list(PROMPTS.keys()) | |
| rng.shuffle(order) | |
| pk = order[0] | |
| status = (f"**Task 1 / 3 · Submission 1 / {SUBMISSIONS_PER_TASK}** · " | |
| f"Participant: `{pid}`") | |
| return (pid, order, 0, 1, [], pk, [], False, | |
| status, make_prompt_html(PROMPTS[pk]), empty_hist, "", "", | |
| gr.update(visible=True), # score_btn | |
| gr.update(visible=False), # continue_btn | |
| "", # welcome hidden | |
| gr.update(interactive=False, variant="secondary"), # disable start btn | |
| gr.update(interactive=False)) # lock prolific ID | |
| btn_start.click( | |
| fn=start_study, | |
| inputs=[tb_prolific], | |
| outputs=[ | |
| st_prolific, st_order, st_pidx, st_submission, st_responses, | |
| st_current_prompt, st_history, st_task_complete, | |
| md_status_bar, md_prompt_display, history_df, | |
| score_visual, redirect_html, | |
| score_btn, continue_btn, | |
| welcome_html, | |
| btn_start, tb_prolific, | |
| ], | |
| ) | |
| def do_score(text, sketchpad_text, chat_history, | |
| prompt_key, prolific_id, order, pidx, submission, | |
| responses, history, task_complete): | |
| text = (text or "").strip() | |
| sketchpad_text = (sketchpad_text or "").strip() | |
| empty_hist = pd.DataFrame( | |
| columns=HIST_COLUMNS) | |
| if not prolific_id: | |
| cur_hist = pd.DataFrame(history) if history else empty_hist | |
| return (cur_hist, "⚠️ Enter Prolific ID and click Start Study first.", | |
| responses, submission, pidx, prompt_key, history, False, | |
| gr.update(), gr.update(), gr.update(), gr.update(), | |
| gr.update(visible=True), gr.update(visible=False), | |
| gr.update(), gr.update()) | |
| # Block if task already complete (waiting for Continue click) | |
| if task_complete: | |
| cur_hist = pd.DataFrame(history) if history else empty_hist | |
| return (cur_hist, | |
| "✅ Task complete! Click **Continue to Next Task** to proceed.", | |
| responses, submission, pidx, prompt_key, history, True, | |
| gr.update(), gr.update(), gr.update(), gr.update(), | |
| gr.update(visible=False), gr.update(visible=True), | |
| gr.update(), gr.update()) | |
| if not text: | |
| cur_hist = pd.DataFrame(history) if history else empty_hist | |
| return (cur_hist, char_count_status(text), | |
| responses, submission, pidx, prompt_key, history, False, | |
| gr.update(), gr.update(), gr.update(), gr.update(), | |
| gr.update(visible=True), gr.update(visible=False), | |
| gr.update(), gr.update()) | |
| # Enforce character limits | |
| n = len(text) | |
| if n < CHAR_MIN: | |
| cur_hist = pd.DataFrame(history) if history else empty_hist | |
| return (cur_hist, | |
| f"⚠️ Too short: {n} characters. Please write at least {CHAR_MIN}.", | |
| responses, submission, pidx, prompt_key, history, False, | |
| gr.update(), gr.update(), gr.update(), gr.update(), | |
| gr.update(visible=True), gr.update(visible=False), | |
| gr.update(), gr.update()) | |
| if n > CHAR_MAX: | |
| cur_hist = pd.DataFrame(history) if history else empty_hist | |
| return (cur_hist, | |
| f"⚠️ Too long: {n} characters. Please keep to {CHAR_MAX} or fewer.", | |
| responses, submission, pidx, prompt_key, history, False, | |
| gr.update(), gr.update(), gr.update(), gr.update(), | |
| gr.update(visible=True), gr.update(visible=False), | |
| gr.update(), gr.update()) | |
| # Score | |
| scores = score_text(text, prompt_key) | |
| sc = scores["final"] | |
| save_response(prolific_id, prompt_key, submission, pidx, text, scores, order, | |
| sketchpad_text=sketchpad_text, chat_history=chat_history) | |
| # History row | |
| preview = text[:80] + "…" if len(text) > 80 else text | |
| row = {"Submission": submission, "Response Preview": preview, | |
| "Distinctiveness Score": sc} | |
| new_history = history + [row] | |
| new_responses = responses + [{"task_key": prompt_key, "submission": submission, | |
| "response_text": text, "score": sc}] | |
| visual_html = make_score_visual(sc) | |
| hist_df = pd.DataFrame(new_history) if new_history else empty_hist | |
| status_msg = f"✅ Scored {n} chars → Distinctiveness Score = {sc:.1f} / 100" | |
| # Check if this was the last submission for this task | |
| if submission >= SUBMISSIONS_PER_TASK: | |
| # Task is done — show Continue button, hide Submit button | |
| new_task_complete = True | |
| new_pidx = pidx | |
| new_pk = prompt_key | |
| new_submission = submission # keep at 5 | |
| # Check if this was the LAST task entirely | |
| if pidx + 1 >= len(order): | |
| bar = (f"✅ **Study complete!** You submitted {len(new_responses)} " | |
| f"responses. Redirecting to survey…") | |
| return (hist_df, "Study complete — redirecting to survey.", | |
| new_responses, new_submission, new_pidx, new_pk, | |
| new_history, False, | |
| bar, gr.update(), visual_html, make_redirect_html(prolific_id), | |
| gr.update(visible=False), gr.update(visible=False), | |
| "", "") | |
| bar = (f"**Task {pidx + 1} / 3 · " | |
| f"Submission {submission} / {SUBMISSIONS_PER_TASK}** · " | |
| f"Participant: `{prolific_id}` — " | |
| f"✅ **Task complete!** Click Continue when ready.") | |
| return (hist_df, status_msg, | |
| new_responses, new_submission, new_pidx, new_pk, | |
| new_history, new_task_complete, | |
| bar, gr.update(), visual_html, "", | |
| gr.update(visible=False), gr.update(visible=True), | |
| "", "") | |
| else: | |
| # More submissions remain in this task | |
| new_submission = submission + 1 | |
| bar = (f"**Task {pidx + 1} / 3 · " | |
| f"Submission {new_submission} / {SUBMISSIONS_PER_TASK}** · " | |
| f"Participant: `{prolific_id}`") | |
| return (hist_df, status_msg, | |
| new_responses, new_submission, pidx, prompt_key, | |
| new_history, False, | |
| bar, gr.update(), visual_html, "", | |
| gr.update(visible=True), gr.update(visible=False), | |
| "", "") | |
| score_outputs = [ | |
| history_df, score_status, | |
| st_responses, st_submission, st_pidx, st_current_prompt, | |
| st_history, st_task_complete, | |
| md_status_bar, md_prompt_display, score_visual, redirect_html, | |
| score_btn, continue_btn, | |
| sketchpad, submission_box, | |
| ] | |
| score_btn.click( | |
| fn=do_score, | |
| inputs=[submission_box, sketchpad, chatbot, | |
| st_current_prompt, st_prolific, | |
| st_order, st_pidx, st_submission, st_responses, | |
| st_history, st_task_complete], | |
| outputs=score_outputs, | |
| ) | |
| submission_box.submit( | |
| fn=do_score, | |
| inputs=[submission_box, sketchpad, chatbot, | |
| st_current_prompt, st_prolific, | |
| st_order, st_pidx, st_submission, st_responses, | |
| st_history, st_task_complete], | |
| outputs=score_outputs, | |
| ) | |
| # ── [CONT] Continue button: advance to next task ── | |
| def do_continue(prolific_id, order, pidx, responses): | |
| new_pidx = pidx + 1 | |
| empty_hist = pd.DataFrame( | |
| columns=HIST_COLUMNS) | |
| if new_pidx >= len(order): | |
| # Shouldn't happen (button hidden on final task), but handle gracefully | |
| bar = (f"✅ **Study complete!** You submitted {len(responses)} " | |
| f"responses. Redirecting to survey…") | |
| return (1, new_pidx, order[pidx], [], False, | |
| bar, "", empty_hist, "", "", | |
| make_redirect_html(prolific_id), | |
| gr.update(visible=False), gr.update(visible=False), | |
| [], "") | |
| new_pk = order[new_pidx] | |
| bar = (f"**Task {new_pidx + 1} / 3 · " | |
| f"Submission 1 / {SUBMISSIONS_PER_TASK}** · " | |
| f"Participant: `{prolific_id}`") | |
| return (1, new_pidx, new_pk, [], False, | |
| bar, make_prompt_html(PROMPTS[new_pk]), | |
| empty_hist, "", "", | |
| "", | |
| gr.update(visible=True), gr.update(visible=False), | |
| [], "") | |
| continue_btn.click( | |
| fn=do_continue, | |
| inputs=[st_prolific, st_order, st_pidx, st_responses], | |
| outputs=[ | |
| st_submission, st_pidx, st_current_prompt, st_history, st_task_complete, | |
| md_status_bar, md_prompt_display, | |
| history_df, score_visual, score_status, | |
| redirect_html, | |
| score_btn, continue_btn, | |
| chatbot, submission_box, | |
| ], | |
| ) | |
| # Chat controls | |
| send_btn.click(fn=chat_reply, inputs=[chatbot, chat_input, sys_prompt], | |
| outputs=[chatbot, chat_input]) | |
| clear_btn.click(fn=chat_clear, inputs=None, outputs=[chatbot]) | |
| # ── Admin data download panel ── | |
| # Set ADMIN_PASSWORD in Space Secrets to enable | |
| with gr.Accordion("Admin", open=False): | |
| with gr.Row(): | |
| admin_pw = gr.Textbox( | |
| label="Password", type="password", | |
| placeholder="Enter password…", scale=2, | |
| ) | |
| admin_btn = gr.Button("Submit", variant="primary", scale=1) | |
| admin_status = gr.Markdown("") | |
| with gr.Row(): | |
| csv_download = gr.File(label="📄 File 1", visible=False) | |
| json_download = gr.File(label="📦 File 2", visible=False) | |
| def admin_download(password): | |
| import zipfile, io, tempfile | |
| expected = os.getenv("ADMIN_PASSWORD", "").strip() | |
| if not expected: | |
| return ("⚠️ `ADMIN_PASSWORD` not set in Space Secrets.", | |
| gr.update(visible=False), gr.update(visible=False)) | |
| if password.strip() != expected: | |
| return ("❌ Incorrect password.", | |
| gr.update(visible=False), gr.update(visible=False)) | |
| outputs = [] | |
| # Find CSV — try primary, then backup | |
| csv_path = DATA_DIR / "responses.csv" | |
| if not csv_path.exists(): | |
| csv_path = BACKUP_DIR / "responses.csv" | |
| if csv_path.exists(): | |
| outputs.append(("csv", csv_path)) | |
| # Zip all JSON files from primary or backup | |
| json_dir = DATA_DIR if any(DATA_DIR.glob("*.json")) else BACKUP_DIR | |
| json_files = sorted(json_dir.glob("*.json")) | |
| csv_out = gr.update(visible=False) | |
| json_out = gr.update(visible=False) | |
| if not outputs and not json_files: | |
| return ("⚠️ No response data found yet.", | |
| csv_out, json_out) | |
| if outputs: | |
| csv_out = gr.update(value=str(outputs[0][1]), visible=True) | |
| if json_files: | |
| tmp = tempfile.NamedTemporaryFile( | |
| suffix=".zip", delete=False, dir=str(BACKUP_DIR)) | |
| with zipfile.ZipFile(tmp.name, "w", zipfile.ZIP_DEFLATED) as zf: | |
| for jf in json_files: | |
| zf.write(jf, jf.name) | |
| json_out = gr.update(value=tmp.name, visible=True) | |
| n_csv = sum(1 for _ in open(csv_path)) - 1 if csv_path.exists() else 0 | |
| return (f"✅ Authenticated. **{n_csv} submissions** in CSV, " | |
| f"**{len(json_files)} participant files** in JSON.", | |
| csv_out, json_out) | |
| admin_btn.click( | |
| fn=admin_download, | |
| inputs=[admin_pw], | |
| outputs=[admin_status, csv_download, json_download], | |
| ) | |
| # ============================================================ | |
| # Preload baselines (background) | |
| # ============================================================ | |
| def _preload(): | |
| import time | |
| total_start = time.time() | |
| for pk in ["car", "teambuilding", "routine"]: | |
| for mi, model_name in enumerate(EMB_MODELS): | |
| try: | |
| print(f"\n{'='*60}", flush=True) | |
| print(f"[preload] {pk} | model {mi+1}/3: {model_name.split('/')[-1]}", | |
| flush=True) | |
| t0 = time.time() | |
| get_baseline(pk, model_name) | |
| print(f"[preload] Done in {time.time()-t0:.1f}s", flush=True) | |
| except Exception as e: | |
| print(f"[WARN] Failed to preload {pk}|{model_name}: {e}", flush=True) | |
| print(f"\n[preload] All baselines ready in {time.time()-total_start:.1f}s", flush=True) | |
| threading.Thread(target=_preload, daemon=True).start() | |
| if __name__ == "__main__": | |
| demo.launch(show_error=True, show_api=False, | |
| allowed_paths=[str(DATA_DIR), str(BACKUP_DIR)]) |