Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import random | |
| import datetime as dt | |
| from typing import Dict, List, Tuple, Optional | |
| import gradio as gr | |
| # ------------------------------ | |
| # Paths (override with env vars) | |
| # ------------------------------ | |
# Input datasets: QA pairs and the validation reports they are joined with in load_data().
QA_PATH = os.getenv("QA_PATH", "./spatial_qa_output.json")
VALIDATION_PATH = os.getenv("VALIDATION_PATH", "./validation_reports_output.json")
# State written by the app: per-user assignments and saved evaluations.
ASSIGNMENTS_PATH = os.getenv("ASSIGNMENTS_PATH", "/data/assignments.json")
PROGRESS_PATH = os.getenv("PROGRESS_PATH", "/data/progress.json")
# Optional roster file (keys "users" and "default_seed") and the export target directory.
USERS_PATH = os.getenv("USERS_PATH", "./users.json")
EXPORT_DIR = os.getenv("EXPORT_DIR", "/data")
| # ------------------------------ | |
| # Utilities | |
| # ------------------------------ | |
def _safe_read_json(path: str, default):
    """Load JSON from ``path``, returning ``default`` if anything goes wrong.

    Deliberately best-effort: a missing file, an unreadable file and malformed
    JSON all produce ``default`` instead of an exception.
    """
    try:
        with open(path, "r", encoding="utf-8") as src:
            loaded = json.load(src)
    except Exception:
        loaded = default
    return loaded
def _safe_write_json(path: str, obj):
    """Atomically write ``obj`` as JSON to ``path``.

    The data is written to ``path + ".tmp"`` and then moved over ``path`` with
    ``os.replace`` so readers never see a half-written file.

    Returns:
        True when the file was written, False when the filesystem refused the
        write (a warning is printed and callers fall back to in-memory state).
    """
    tmp = path + ".tmp"
    try:
        parent = os.path.dirname(path)
        # A bare filename has no directory part; os.makedirs("") would raise.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(obj, f, indent=2, ensure_ascii=False)
        os.replace(tmp, path)
        return True
    except OSError as e:  # PermissionError and IOError are OSError subclasses/aliases
        # Do not leave a stale temp file behind after a failed write.
        try:
            os.remove(tmp)
        except OSError:
            pass
        print(f"Warning: Could not write to {path}: {e}")
        print("Running in read-only mode - data will not persist between sessions")
        return False
| # ------------------------------ | |
| # In-memory storage for read-only environments | |
| # ------------------------------ | |
# Most recent values handed to the setters below; None until the first set,
# after which the getters return these instead of re-reading the JSON files.
_in_memory_assignments = None
_in_memory_progress = None
# Switched off by the setters after the first failed write so later saves skip the disk.
_file_write_enabled = True
def _get_assignments():
    """Return the current assignments, preferring the in-memory copy over the file."""
    cached = _in_memory_assignments
    if cached is None:
        # Nothing has been set yet in this process: fall back to the file (or {}).
        return _safe_read_json(ASSIGNMENTS_PATH, {})
    return cached
def _set_assignments(assignments):
    """Remember ``assignments`` in memory and, while writes still work, on disk."""
    global _in_memory_assignments, _file_write_enabled
    _in_memory_assignments = assignments
    if not _file_write_enabled:
        return
    # First failed write disables all further file writes for this process.
    if not _safe_write_json(ASSIGNMENTS_PATH, assignments):
        _file_write_enabled = False
def _get_progress():
    """Return the current progress, preferring the in-memory copy over the file."""
    cached = _in_memory_progress
    if cached is None:
        # Nothing has been set yet in this process: fall back to the file (or {}).
        return _safe_read_json(PROGRESS_PATH, {})
    return cached
def _set_progress(progress):
    """Remember ``progress`` in memory and, while writes still work, on disk."""
    global _in_memory_progress, _file_write_enabled
    _in_memory_progress = progress
    if not _file_write_enabled:
        return
    # First failed write disables all further file writes for this process.
    if not _safe_write_json(PROGRESS_PATH, progress):
        _file_write_enabled = False
| # ------------------------------ | |
| # Load data | |
| # ------------------------------ | |
def load_data() -> Dict[str, Dict]:
    """Join the QA file with the validation-report file.

    Returns:
        Dict keyed by instance id, each value holding
        - ``findings`` (str)
        - ``impressions`` (str)
        - ``qa_pairs`` (list of ``{'question', 'answer'}`` dicts, stripped strings)

    Raises:
        RuntimeError: if no instance id appears in both files.
        OSError / json.JSONDecodeError: if either input file cannot be read.
    """
    with open(QA_PATH, "r", encoding="utf-8") as f:
        qa_data = json.load(f)
    with open(VALIDATION_PATH, "r", encoding="utf-8") as f:
        val_data = json.load(f)

    data = {}
    missing_in_val = []
    for inst_id, payload in qa_data.items():
        if inst_id not in val_data:
            missing_in_val.append(inst_id)
            continue
        report = val_data[inst_id]
        # Prefer the English fields, fall back to the untranslated ones.
        findings = report.get("Findings_EN") or report.get("Findings") or ""
        impressions = report.get("Impressions_EN") or report.get("Impressions") or ""
        data[inst_id] = {
            # str() mirrors the QA normalisation so non-string values don't crash .strip()
            "findings": str(findings).strip(),
            "impressions": str(impressions).strip(),
            "qa_pairs": [
                {
                    "question": str(p.get("question", "")).strip(),
                    "answer": str(p.get("answer", "")).strip(),
                }
                for p in (payload.get("qa_pairs") or [])
            ],
        }

    if missing_in_val:
        # Surface skipped instances instead of dropping them silently.
        print(f"Warning: {len(missing_in_val)} QA instance(s) have no validation report and were skipped.")
    if not data:
        raise RuntimeError("No overlapping instances between QA and Validation files. "
                           "Check the JSON files and their keys.")
    return data
# Loaded once at import time. The ids are kept sorted, which gives
# create_assignments() a deterministic list to shuffle.
DATA = load_data()
INSTANCE_IDS = sorted(DATA)
def load_users() -> List[str]:
    """Return the usernames listed under ``"users"`` in the users JSON file.

    Entries are stringified and stripped; empty ones are dropped. When the file
    is missing, is not a JSON object, or has no ``"users"`` list, the default
    roster ``user_01`` … ``user_20`` is returned.
    """
    users_data = _safe_read_json(USERS_PATH, {})
    # Guard against a users file that holds a list/number/string instead of an object.
    listed = users_data.get("users") if isinstance(users_data, dict) else None
    if isinstance(listed, list):
        return [str(user).strip() for user in listed if user and str(user).strip()]
    # Fallback to default users
    return [f"user_{i+1:02d}" for i in range(20)]
def get_default_seed() -> int:
    """Return ``"default_seed"`` from the users JSON file as an int.

    Falls back to 42 when the file is missing, is not a JSON object, has no
    ``"default_seed"`` key, or the stored value cannot be converted to int.
    """
    fallback = 42
    users_data = _safe_read_json(USERS_PATH, {})
    if not isinstance(users_data, dict):
        return fallback
    try:
        return int(users_data.get("default_seed", fallback))
    except (TypeError, ValueError):
        # A malformed seed should not prevent the app from starting.
        return fallback
# Roster and seed resolved once at import; they seed the initial assignments
# and pre-fill the admin panel fields.
DEFAULT_USERS = load_users()
DEFAULT_SEED = get_default_seed()
| # ---------------------------------- | |
| # Assignment & Progress persistence | |
| # ---------------------------------- | |
def init_or_load_assignments(default_users: List[str], seed: int = 42) -> Dict[str, List[str]]:
    """Load existing assignments, or create a fresh seeded split if there are none.

    Existing assignments are pruned of instance ids that are no longer in
    ``INSTANCE_IDS`` (order of the remaining ids is preserved) and stored back.

    Args:
        default_users: usernames used only when new assignments must be created.
        seed: shuffle seed used only when new assignments must be created.
    """
    assignments = _get_assignments()
    if not assignments:
        return create_assignments(default_users, seed)

    # Build the lookup once: set membership instead of scanning the id list per item.
    known_ids = set(INSTANCE_IDS)
    for user, queue in list(assignments.items()):
        assignments[user] = [inst for inst in queue if inst in known_ids]
    _set_assignments(assignments)
    return assignments
def create_assignments(usernames: List[str], seed: int, instances_per_user: int = 10) -> Dict[str, List[str]]:
    """Deal a seeded random, equal-sized set of instances to each user and store it.

    Args:
        usernames: user names; blanks are dropped and duplicates collapsed
            (first occurrence wins) so no dealt bucket is silently overwritten.
        seed: seed for the shuffle, making the deal reproducible.
        instances_per_user: how many instances each user receives (default 10).

    Returns:
        Mapping of username to its list of instance ids.

    Raises:
        gr.Error: when no usernames remain or there are too few instances.
        ValueError: when ``instances_per_user`` is less than 1.
    """
    cleaned = (u.strip() for u in usernames if u and u.strip())
    usernames = list(dict.fromkeys(cleaned))  # de-duplicate, keep order
    if not usernames:
        raise gr.Error("Please provide at least one username.")
    if instances_per_user < 1:
        raise ValueError(f"instances_per_user must be at least 1, got {instances_per_user}")

    total_instances_needed = len(usernames) * instances_per_user
    if total_instances_needed > len(INSTANCE_IDS):
        raise gr.Error(f"Not enough instances available. Need {total_instances_needed} but only have {len(INSTANCE_IDS)}.")

    insts = INSTANCE_IDS.copy()
    random.Random(int(seed)).shuffle(insts)
    # Take only the number of instances we need
    selected = insts[:total_instances_needed]

    # Round-robin deal: user i receives every len(usernames)-th instance starting at i.
    stride = len(usernames)
    assignments = {name: selected[i::stride] for i, name in enumerate(usernames)}
    _set_assignments(assignments)
    return assignments
# Snapshot of the assignments taken at import time. It is not rebound when
# create_assignments() deals new ones later; _get_assignments() returns the current value.
ASSIGNMENTS = init_or_load_assignments(DEFAULT_USERS, seed=DEFAULT_SEED)
def available_users() -> List[str]:
    """Return the usernames that currently have assignments, sorted."""
    current = _get_assignments()
    return sorted(current)
def load_progress() -> Dict:
    """Return the stored progress mapping (thin wrapper over ``_get_progress``)."""
    stored = _get_progress()
    return stored
# Progress mapping loaded at import; mutated in place by
# _ensure_user_progress_struct() and save_eval(), which also persist it.
PROGRESS = load_progress()
def _ensure_user_progress_struct(user: str):
    """Make sure ``PROGRESS[user]`` has a correctly sized answers list per assigned instance.

    For every instance currently assigned to ``user`` the ``"answers"`` list is
    created, padded with ``None`` or trimmed so that it has exactly one slot per
    QA pair. The result is persisted through ``_set_progress``.
    """
    user_progress = PROGRESS.setdefault(user, {})
    # Read the *current* assignments: the module-level ASSIGNMENTS snapshot goes
    # stale as soon as an admin re-deals, which left new users without entries.
    for inst in _get_assignments().get(user, []):
        n = len(DATA[inst]["qa_pairs"])
        entry = user_progress.setdefault(inst, {})
        answers = list(entry.get("answers") or [])
        # Trim extras, then pad; [None] * negative is simply [].
        entry["answers"] = answers[:n] + [None] * (n - len(answers))
    _set_progress(PROGRESS)
def save_eval(user: str, inst: str, q_idx: int,
              relevant_choice: Optional[str], correct_choice: Optional[str], note: str = "") -> str:
    """Persist the evaluation of one QA pair and return a status line.

    Args:
        user: evaluator's username.
        inst: instance id the QA pair belongs to.
        q_idx: zero-based index of the QA pair within the instance.
        relevant_choice: radio label for relevance (or None if unanswered).
        correct_choice: radio label for correctness; only honoured when the
            question was marked relevant.
        note: optional free-text comment.

    Returns:
        A human-readable status message.

    Raises:
        ValueError: if ``q_idx`` is negative (it would otherwise overwrite a
            slot counted from the end of the answers list).
    """
    if not user or not inst:
        return "Select a user to begin."
    if q_idx < 0:
        raise ValueError(f"q_idx must be non-negative, got {q_idx}")

    _ensure_user_progress_struct(user)
    if inst not in PROGRESS[user]:
        PROGRESS[user][inst] = {"answers": [None] * len(DATA[inst]["qa_pairs"])}

    # Map radio labels to booleans; anything unrecognised stays None.
    rel = {"β Relevant": True, "β Not relevant": False}.get(relevant_choice)
    corr = None
    if rel is True:
        corr = {"β Correct": True, "β Incorrect": False}.get(correct_choice)

    record = {
        "relevant": rel,
        "correct": corr,
        "note": (note or "").strip(),
        # Same "<naive-iso>Z" format as before, without the deprecated utcnow().
        "saved_at": dt.datetime.now(dt.timezone.utc).replace(tzinfo=None).isoformat() + "Z",
    }

    answers = PROGRESS[user][inst]["answers"]
    while q_idx >= len(answers):  # safety
        answers.append(None)
    answers[q_idx] = record
    _set_progress(PROGRESS)

    label = f"Saved: {user} β’ {inst} β’ Q{q_idx+1} β relevant={rel}"
    if rel is True:
        label += f", correct={corr}"
    return label
def summarize_user(user: str) -> Tuple[str, List[List]]:
    """Build the progress summary line and per-instance table for ``user``.

    Returns:
        ``(text, rows)`` where each row is ``[instance_id, answered, total]``.
    """
    if not user:
        return ("β", [])
    _ensure_user_progress_struct(user)
    assignments = _get_assignments()
    progress = _get_progress()

    rows = []
    for inst in assignments.get(user, []):
        pair_count = len(DATA[inst]["qa_pairs"])
        saved = progress[user][inst]["answers"]
        answered = len([entry for entry in saved if entry is not None])
        rows.append([inst, answered, pair_count])

    done = sum(row[1] for row in rows)
    total = sum(row[2] for row in rows)
    txt = f"Progress: {done} / {total} QA pairs completed across {len(assignments.get(user, []))} assigned instances."
    return txt, rows
def next_unfinished(user: str) -> Tuple[Optional[str], Optional[int]]:
    """Locate the first unanswered QA pair in the user's queue.

    Returns:
        ``(instance_id, q_index)`` of the first ``None`` answer in assignment
        order, or ``(None, None)`` when everything is answered.
    """
    _ensure_user_progress_struct(user)
    assignments = _get_assignments()
    progress = _get_progress()
    for inst in assignments.get(user, []):
        saved = progress[user][inst]["answers"]
        pending = next((pos for pos, entry in enumerate(saved) if entry is None), None)
        if pending is not None:
            return inst, pending
    return None, None
def first_unfinished_in_instance(user: str, inst: str) -> int:
    """Return the index of the first unanswered QA pair in ``inst`` (0 if none is pending)."""
    _ensure_user_progress_struct(user)
    saved = _get_progress()[user][inst]["answers"]
    return next((pos for pos, entry in enumerate(saved) if entry is None), 0)
def get_payload(inst: str, q_idx: int) -> Tuple[str, str, str, str, str]:
    """Fetch what the UI shows for one QA pair.

    ``q_idx`` is clamped into the valid range. An instance without QA pairs
    yields empty question/answer strings and a "No questions" header.

    Returns:
        ``(question, answer, findings, impressions, header)``.
    """
    entry = DATA[inst]
    pairs = entry["qa_pairs"]
    count = len(pairs)
    findings, impressions = entry["findings"], entry["impressions"]

    if not count:
        return "", "", findings, impressions, f"{inst} β No questions (0/0)"

    pos = min(max(q_idx, 0), count - 1)
    chosen = pairs[pos]
    header = f"{inst} β Question {pos+1} / {count}"
    return chosen["question"], chosen["answer"], findings, impressions, header
def export_user_results(user: str) -> str:
    """Export the user's evaluations as JSON and CSV files under ``EXPORT_DIR``.

    One row is produced per assigned QA pair, answered or not. If the files
    cannot be written, the first 1000 characters of the JSON are returned in
    the status message instead.

    Returns:
        A status string: the written file paths, the read-only fallback
        preview, or a prompt to select a user.
    """
    if not user:
        return "Select a user to export results."
    _ensure_user_progress_struct(user)
    assignments = _get_assignments()
    progress = _get_progress()

    # Fixed column order so the CSV header can be written even with zero rows.
    fieldnames = ["user", "instance_id", "q_index", "question", "answer",
                  "relevant", "correct", "note", "saved_at"]
    rows = []
    for inst in assignments.get(user, []):
        pairs = DATA[inst]["qa_pairs"]
        answers = progress[user][inst]["answers"]
        for i, pair in enumerate(pairs):
            saved = answers[i] or {}  # unanswered slots are None
            rows.append({
                "user": user,
                "instance_id": inst,
                "q_index": i + 1,
                "question": pair["question"],
                "answer": pair["answer"],
                "relevant": saved.get("relevant"),
                "correct": saved.get("correct"),
                "note": saved.get("note"),
                "saved_at": saved.get("saved_at"),
            })

    import csv

    ts = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%d-%H%M%S")
    json_path = os.path.join(EXPORT_DIR, f"results_{user}_{ts}.json")
    csv_path = os.path.join(EXPORT_DIR, f"results_{user}_{ts}.csv")
    try:
        # _safe_write_json reports failure via its return value, not an exception.
        if not _safe_write_json(json_path, rows):
            raise OSError(f"could not write {json_path}")
        with open(csv_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)
    except OSError:
        # Fallback: return data as JSON string
        json_str = json.dumps(rows, indent=2, ensure_ascii=False)
        return f"Export data (read-only mode):\n\n{json_str[:1000]}{'...' if len(json_str) > 1000 else ''}"
    return f"Exported {len(rows)} rows.\nJSON: {json_path}\nCSV: {csv_path}"
| # ------------------------------ | |
| # Gradio UI | |
| # ------------------------------ | |
with gr.Blocks(title="Spatial QA Validator", theme=gr.themes.Glass()) as demo:
    # NOTE(review): the nesting of the layout below was reconstructed from a copy
    # of this file that had lost its indentation — confirm against the running UI.

    # ---- Small helpers shared by the event handlers ----
    def _slider_update(n: int, q_idx: int):
        """Question slider update for an instance with ``n`` QA pairs (hidden when n == 0)."""
        if n > 0:
            return gr.update(visible=True, minimum=1, maximum=n, step=1, value=q_idx + 1)
        return gr.update(visible=False, minimum=1, maximum=1, step=1, value=1)

    def _nav_position(user: str, inst: str, q_idx: int, n: int) -> str:
        """'instance i/N, question j/n' label for the navigation line."""
        queue = _get_assignments().get(user, [])
        return f"{queue.index(inst)+1}/{len(queue)} β’ Q{(q_idx+1) if n>0 else 0}/{n}"

    def _render_pair(inst: str, q_idx: int):
        """Values for (findings_tb, impressions_tb, header_md, question_md, answer_md)."""
        q, a, f, im, header = get_payload(inst, q_idx)
        return f, im, f"**{header}**", f"**Q:** {q}", f"**A:** {a}"

    def _cleared_inputs():
        """Reset values for (relevant_radio, correct_radio, note_tb)."""
        return (
            gr.update(value=None, interactive=True),
            gr.update(value=None, interactive=False),
            "",
        )

    # Check if running in read-only mode
    read_only_status = ""
    if not _file_write_enabled:
        read_only_status = "\n\nβ οΈ **Running in read-only mode** - Progress will not persist between sessions"
    gr.Markdown("## Spatial QA Validation Tool\n"
                "Left: Findings & Impression (Ground Truth)\n\n"
                "Right: Spatial QA pairs (Q/A) to validate.\n\n"
                "For each Q/A:\n"
                "1) Mark if the **Question is relevant** to the Findings/Impression.\n"
                "2) If **relevant**, mark whether the **Answer is correct**.\n" + read_only_status)

    with gr.Row():
        with gr.Column(scale=1, min_width=260):
            user_dd = gr.Dropdown(choices=available_users(), label="Select user", interactive=True)
            load_btn = gr.Button("Load my queue", variant="primary")
            progress_text = gr.Markdown("")
            progress_table = gr.Dataframe(headers=["Instance", "Done", "Total"], row_count=0, interactive=False)
            inst_dd = gr.Dropdown(choices=[], label="Assigned instance", interactive=True, visible=False)
            q_slider = gr.Slider(1, 10, value=1, step=1, label="Question #", interactive=True, visible=False)
            export_btn = gr.Button("Export my results")
        with gr.Column(scale=2, min_width=600):
            # Left panel: Findings/Impressions
            with gr.Row():
                with gr.Column(scale=1, elem_classes=["left-panel"]):
                    findings_tb = gr.Textbox(label="Findings (Ground Truth)", lines=16, interactive=False)
                    impressions_tb = gr.Textbox(label="Impression (Ground Truth)", lines=6, interactive=False)
                with gr.Column(scale=1, elem_classes=["left-panel"]):
                    header_md = gr.Markdown("")
                    question_md = gr.Markdown("")
                    answer_md = gr.Markdown("")
                    relevant_radio = gr.Radio(choices=["β Relevant", "β Not relevant"],
                                              label="1) Is the QUESTION relevant to Findings/Impression?",
                                              interactive=True)
                    correct_radio = gr.Radio(choices=["β Correct", "β Incorrect"],
                                             label="2) If relevant, is the ANSWER correct?",
                                             interactive=False)
                    note_tb = gr.Textbox(label="Optional note", lines=2, placeholder="Any comments...")
                    with gr.Row():
                        save_btn = gr.Button("Save", variant="secondary")
                        save_next_btn = gr.Button("Save & Next", variant="primary")
                        skip_btn = gr.Button("Skip to next unfinished")
                    nav_info = gr.Markdown("")

    # Move Admin accordion below so user_dd exists before wiring its updates
    with gr.Accordion("Setup (admin) β define users and (re)deal assignments", open=False):
        with gr.Row():
            users_csv = gr.Textbox(value=",".join(DEFAULT_USERS),
                                   label="Usernames (comma-separated)",
                                   lines=2)
            seed_num = gr.Number(value=DEFAULT_SEED, precision=0,
                                 label="Assignment Seed (change & Apply)")
        apply_btn = gr.Button("Apply / (Re)create Assignments", variant="secondary")
        assign_info = gr.Markdown()

    def apply_users(u_csv, seed):
        """Re-deal assignments for the given comma-separated users and report the counts."""
        usernames = [x.strip() for x in (u_csv or "").split(",") if x.strip()]
        new_assign = create_assignments(usernames, int(seed or 0))
        # summarize counts
        counts = {u: len(v) for u, v in new_assign.items()}
        table = "\n".join([f"- **{u}**: {counts[u]} instances" for u in sorted(counts)])
        total_assigned = sum(counts.values())
        total_available = len(INSTANCE_IDS)
        unassigned = total_available - total_assigned
        return (
            gr.update(choices=available_users(), value=None),
            f"Assignments updated for {len(new_assign)} users (10 instances each):\n{table}\n\n**Summary:** {total_assigned} instances assigned, {unassigned} instances left unassigned out of {total_available} total.",
        )

    # now that user_dd exists, reference it directly in outputs
    apply_btn.click(apply_users, inputs=[users_csv, seed_num], outputs=[user_dd, assign_info])

    # ---- Wiring functions ----
    def _load_user(user: str):
        """Load the user's queue and jump to their next unfinished QA pair."""
        if not user:
            raise gr.Error("Please select a user.")
        # Refresh assignments if changed by admin
        init_or_load_assignments(DEFAULT_USERS, seed=DEFAULT_SEED)
        _ensure_user_progress_struct(user)
        txt, rows = summarize_user(user)
        # initial pointer = next unfinished overall
        inst, q_idx = next_unfinished(user)
        queue = _get_assignments().get(user, [])
        if inst is None:
            # user is done
            return (
                gr.update(choices=queue, visible=True, value=None),
                gr.update(visible=False),
                "", "", "", "", "",
                *_cleared_inputs(),
                txt, rows, "All assigned QA pairs are completed. π",
            )
        n = len(DATA[inst]["qa_pairs"])
        return (
            gr.update(choices=queue, visible=True, value=inst),
            _slider_update(n, q_idx),
            *_render_pair(inst, q_idx),
            *_cleared_inputs(),
            txt, rows, _nav_position(user, inst, q_idx, n),
        )

    load_btn.click(
        _load_user,
        inputs=[user_dd],
        outputs=[inst_dd, q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
                 relevant_radio, correct_radio, note_tb, progress_text, progress_table, nav_info]
    )

    def _inst_changed(user: str, inst: str):
        """Show the first unfinished question of the newly selected instance."""
        if not user or not inst:
            return (gr.update(visible=False), "", "", "", "", "",
                    gr.update(interactive=True), gr.update(interactive=False), "", "")
        idx = first_unfinished_in_instance(user, inst)
        n = len(DATA[inst]["qa_pairs"])
        return (
            _slider_update(n, idx),
            *_render_pair(inst, idx),
            *_cleared_inputs(),
            _nav_position(user, inst, idx, n),
        )

    inst_dd.change(
        _inst_changed,
        inputs=[user_dd, inst_dd],
        outputs=[q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
                 relevant_radio, correct_radio, note_tb, nav_info]
    )

    def _q_changed(inst: str, q_no: int):
        """Show the QA pair selected on the slider (1-based) and clear the note."""
        if not inst:
            return "", "", "", ""
        q, a, _f, _im, header = get_payload(inst, int(q_no) - 1)
        return f"**{header}**", f"**Q:** {q}", f"**A:** {a}", ""

    q_slider.change(_q_changed, inputs=[inst_dd, q_slider],
                    outputs=[header_md, question_md, answer_md, note_tb])

    def _relevant_changed(rel_choice: Optional[str]):
        """Enable the correctness radio only when the question is marked relevant."""
        if rel_choice == "β Relevant":
            return gr.update(interactive=True)
        # reset and disable
        return gr.update(value=None, interactive=False)

    relevant_radio.change(_relevant_changed, inputs=[relevant_radio], outputs=[correct_radio])

    def _save(user: str, inst: str, q_no: int, rel_choice: Optional[str], corr_choice: Optional[str], note: str):
        """Save the current evaluation and refresh the progress summary."""
        if not (user and inst and q_no):
            raise gr.Error("Missing user/instance/question selection.")
        msg = save_eval(user, inst, int(q_no) - 1, rel_choice, corr_choice, note)
        txt, rows = summarize_user(user)
        return msg, txt, rows

    save_btn.click(
        _save,
        inputs=[user_dd, inst_dd, q_slider, relevant_radio, correct_radio, note_tb],
        outputs=[nav_info, progress_text, progress_table]
    )

    def _advance(user: str, done_msg: str, next_msg: str):
        """Jump to the user's next unfinished pair; shared by Save & Next and Skip.

        Returns values for: inst_dd, q_slider, findings_tb, impressions_tb,
        header_md, question_md, answer_md, relevant_radio, correct_radio,
        note_tb, nav_info, progress_text, progress_table.
        """
        inst, idx = next_unfinished(user)
        txt, rows = summarize_user(user)
        if inst is None:
            return (
                # gr.update works across Gradio versions; Component.update() was removed in 4.x.
                gr.update(value=None),
                gr.update(visible=False),
                "", "", "", "", "",
                *_cleared_inputs(),
                done_msg,
                txt, rows,
            )
        n = len(DATA[inst]["qa_pairs"])
        return (
            gr.update(value=inst),
            _slider_update(n, idx),
            *_render_pair(inst, idx),
            *_cleared_inputs(),
            next_msg,
            txt, rows,
        )

    def _save_and_next(user: str, inst: str, q_no: int, rel_choice: Optional[str], corr_choice: Optional[str], note: str):
        """Save the current evaluation, then move to the next unfinished pair."""
        if not (user and inst and q_no):
            raise gr.Error("Missing user/instance/question selection.")
        msg = save_eval(user, inst, int(q_no) - 1, rel_choice, corr_choice, note)
        return _advance(user, f"{msg}\n\nAll assigned QA pairs are completed. π", msg)

    save_next_btn.click(
        _save_and_next,
        inputs=[user_dd, inst_dd, q_slider, relevant_radio, correct_radio, note_tb],
        outputs=[inst_dd, q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
                 relevant_radio, correct_radio, note_tb, nav_info, progress_text, progress_table]
    )

    def _skip_to_next(user: str):
        """Move to the next unfinished pair without saving anything."""
        if not user:
            raise gr.Error("Please select a user.")
        return _advance(user, "All assigned QA pairs are completed. π", "Jumped to next unfinished.")

    skip_btn.click(
        _skip_to_next,
        inputs=[user_dd],
        outputs=[inst_dd, q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
                 relevant_radio, correct_radio, note_tb, nav_info, progress_text, progress_table]
    )

    def _export(user: str):
        """Export the user's results and show the status in the navigation line."""
        return export_user_results(user)

    export_btn.click(_export, inputs=[user_dd], outputs=[nav_info])
if __name__ == "__main__":
    # share=True requests a public Gradio share link; no server_name or port is
    # passed, so host and port stay at Gradio's defaults.
    demo.launch(share=True)