Mashrafi2827's picture
Create app.py
05dcf61 verified
raw
history blame
26.3 kB
import os
import json
import random
import datetime as dt
from typing import Dict, List, Tuple, Optional
import gradio as gr
# ------------------------------
# Paths (override with env vars)
# ------------------------------
# Input datasets (read-only): QA pairs and the validation reports they refer to.
QA_PATH = os.environ.get("QA_PATH", "./spatial_qa_output.json")
VALIDATION_PATH = os.environ.get("VALIDATION_PATH", "./validation_reports_output.json")
# Mutable state written by the app: who reviews what, and what has been saved.
ASSIGNMENTS_PATH = os.environ.get("ASSIGNMENTS_PATH", "/data/assignments.json")
PROGRESS_PATH = os.environ.get("PROGRESS_PATH", "/data/progress.json")
# Optional user list / default seed configuration.
USERS_PATH = os.environ.get("USERS_PATH", "./users.json")
# Directory that receives per-user result exports.
EXPORT_DIR = os.environ.get("EXPORT_DIR", "/data")
# ------------------------------
# Utilities
# ------------------------------
def _safe_read_json(path: str, default):
    """Load JSON from ``path``, falling back to ``default`` when it cannot be read.

    Args:
        path: Location of the JSON file.
        default: Value returned unchanged when the file is missing,
            unreadable, or does not contain valid JSON.

    Returns:
        The decoded JSON document, or ``default`` on failure.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file; ValueError covers
        # malformed JSON (json.JSONDecodeError) and undecodable bytes
        # (UnicodeDecodeError). Anything else is a programming error and is
        # allowed to surface instead of being masked by the default.
        return default
def _safe_write_json(path: str, obj):
    """Write ``obj`` to ``path`` as JSON via a temp file and ``os.replace``.

    The document is first written to ``path + ".tmp"`` and then moved over the
    target, so readers never observe a half-written file.

    Args:
        path: Destination file. May be a bare filename in the current directory.
        obj: JSON-serialisable object to store.

    Returns:
        True when the file was written, False when the filesystem refused
        (a warning is printed and callers are expected to keep working from
        memory).
    """
    tmp = path + ".tmp"
    try:
        parent = os.path.dirname(path)
        if parent:
            # os.makedirs("") raises FileNotFoundError, which used to make
            # every write to a bare filename report failure.
            os.makedirs(parent, exist_ok=True)
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(obj, f, indent=2, ensure_ascii=False)
        os.replace(tmp, path)
        return True
    except OSError as e:  # PermissionError and IOError are both OSError
        print(f"Warning: Could not write to {path}: {e}")
        print("Running in read-only mode - data will not persist between sessions")
        try:
            # Best-effort cleanup of a partially written temp file.
            os.remove(tmp)
        except OSError:
            pass
        return False
# ------------------------------
# In-memory storage for read-only environments
# ------------------------------
# Most recent assignments object passed to _set_assignments(); None until the first call.
_in_memory_assignments = None
# Most recent progress object passed to _set_progress(); None until the first call.
_in_memory_progress = None
# Set to False by the setters after the first failed disk write so later saves skip the file.
_file_write_enabled = True
def _get_assignments():
    """Return the current assignments mapping.

    The in-memory copy wins when one has been stored; otherwise the mapping is
    read from ASSIGNMENTS_PATH (an empty dict when that file is unavailable).
    """
    cached = _in_memory_assignments
    if cached is None:
        return _safe_read_json(ASSIGNMENTS_PATH, {})
    return cached
def _set_assignments(assignments):
    """Remember ``assignments`` in memory and, while disk writes still work, on disk.

    The first failed write disables all further file writes for this process.
    """
    global _in_memory_assignments, _file_write_enabled
    _in_memory_assignments = assignments
    if not _file_write_enabled:
        return
    if not _safe_write_json(ASSIGNMENTS_PATH, assignments):
        _file_write_enabled = False
def _get_progress():
    """Return the current progress mapping.

    The in-memory copy wins when one has been stored; otherwise the mapping is
    read from PROGRESS_PATH (an empty dict when that file is unavailable).
    """
    cached = _in_memory_progress
    if cached is None:
        return _safe_read_json(PROGRESS_PATH, {})
    return cached
def _set_progress(progress):
    """Remember ``progress`` in memory and, while disk writes still work, on disk.

    The first failed write disables all further file writes for this process.
    """
    global _in_memory_progress, _file_write_enabled
    _in_memory_progress = progress
    if not _file_write_enabled:
        return
    if not _safe_write_json(PROGRESS_PATH, progress):
        _file_write_enabled = False
# ------------------------------
# Load data
# ------------------------------
def load_data() -> Dict[str, Dict]:
    """Join the QA file with the validation reports.

    Reads QA_PATH and VALIDATION_PATH and keeps only the instance IDs present
    in both. QA instances without a validation report are skipped and reported
    in a single warning line.

    Returns:
        Dict keyed by instance_id with:
        - findings (str): "Findings_EN", else "Findings", else ""
        - impressions (str): "Impressions_EN", else "Impressions", else ""
        - qa_pairs (list of {'question', 'answer'}), whitespace-stripped

    Raises:
        OSError / ValueError: if either file cannot be opened or parsed.
        RuntimeError: if the two files share no instance ID.
    """
    with open(QA_PATH, "r", encoding="utf-8") as f:
        qa_data = json.load(f)
    with open(VALIDATION_PATH, "r", encoding="utf-8") as f:
        val_data = json.load(f)

    data = {}
    missing_in_val = []
    for inst_id, payload in qa_data.items():
        if inst_id not in val_data:
            missing_in_val.append(inst_id)
            continue
        report = val_data[inst_id]
        # Prefer the *_EN field, fall back to the untranslated one. str()
        # keeps a non-string value from crashing the .strip() below.
        findings = str(report.get("Findings_EN") or report.get("Findings") or "")
        impressions = str(report.get("Impressions_EN") or report.get("Impressions") or "")
        # `or []` tolerates an explicit null for "qa_pairs".
        pairs = payload.get("qa_pairs") or []
        data[inst_id] = {
            "findings": findings.strip(),
            "impressions": impressions.strip(),
            "qa_pairs": [
                {
                    "question": str(p.get("question", "")).strip(),
                    "answer": str(p.get("answer", "")).strip(),
                }
                for p in pairs
            ],
        }

    if missing_in_val:
        # Previously collected but never surfaced; a silent drop is hard to debug.
        print(f"Warning: skipped {len(missing_in_val)} QA instance(s) with no validation report.")
    if not data:
        raise RuntimeError("No overlapping instances between QA and Validation files. "
                           "Check the JSON files and their keys.")
    return data
# Dataset parsed once at import time; keyed by instance id.
DATA = load_data()
# Instance ids in sorted order, so seeded shuffles start from a fixed order.
INSTANCE_IDS = sorted(DATA)
def load_users() -> List[str]:
    """Return the configured usernames, or 20 generated defaults.

    Reads the "users" list from USERS_PATH. Entries are stringified and
    stripped; blank entries are dropped. The defaults ``user_01`` ..
    ``user_20`` are used when the file is missing, is not a JSON object, has
    no "users" list, or that list contains no usable name.
    """
    users_data = _safe_read_json(USERS_PATH, {})
    # Guard against a JSON root that is not an object (e.g. a bare list).
    configured = users_data.get("users") if isinstance(users_data, dict) else None
    if isinstance(configured, list):
        names = [str(user).strip() for user in configured if user and str(user).strip()]
        if names:
            return names
        # An empty list would make the import-time deal fail with
        # "Please provide at least one username", so fall through.
    return [f"user_{i+1:02d}" for i in range(20)]
def get_default_seed() -> int:
    """Return the "default_seed" stored in USERS_PATH as an int.

    Falls back to 42 when the file is missing, is not a JSON object, has no
    "default_seed" key, or the stored value cannot be converted to int.
    """
    fallback = 42
    users_data = _safe_read_json(USERS_PATH, {})
    if not isinstance(users_data, dict):
        return fallback
    try:
        return int(users_data.get("default_seed", fallback))
    except (TypeError, ValueError):
        # e.g. null or a non-numeric string in the config file
        return fallback
# Usernames and shuffle seed resolved once at import time; both are read from USERS_PATH.
DEFAULT_USERS = load_users()
DEFAULT_SEED = get_default_seed()
# ----------------------------------
# Assignment & Progress persistence
# ----------------------------------
def init_or_load_assignments(default_users: List[str], seed: int = 42) -> Dict[str, List[str]]:
    """Load existing assignments or create a balanced random split.

    When assignments already exist they are reused: instance ids that are no
    longer in INSTANCE_IDS are dropped (order preserved) and the cleaned
    mapping is stored again. ``default_users`` and ``seed`` are only used when
    nothing is stored yet.

    Returns:
        Mapping of username -> list of assigned instance ids.
    """
    assignments = _get_assignments()
    if not assignments:
        return create_assignments(default_users, seed)

    # Build the lookup once; testing against the list was O(len(INSTANCE_IDS))
    # for every assigned instance.
    known = set(INSTANCE_IDS)
    for user, insts in list(assignments.items()):
        assignments[user] = [x for x in insts if x in known]
    _set_assignments(assignments)
    return assignments
def create_assignments(usernames: List[str], seed: int, instances_per_user: int = 10) -> Dict[str, List[str]]:
    """Deal a fresh, seeded random split of instances and store it.

    Args:
        usernames: Reviewer names. Blank entries are dropped and duplicates
            are collapsed (first occurrence wins).
        seed: Seed for the shuffle; the same seed and user list give the same deal.
        instances_per_user: How many instances each user receives (default 10).

    Returns:
        Mapping of username -> list of assigned instance ids.

    Raises:
        gr.Error: if no username remains, ``instances_per_user`` is below 1,
            or there are not enough instances for everyone.
    """
    # dict.fromkeys de-duplicates while keeping order. A repeated name used to
    # overwrite its own bucket and silently lose those instances.
    usernames = list(dict.fromkeys(u.strip() for u in usernames if u and u.strip()))
    if not usernames:
        raise gr.Error("Please provide at least one username.")
    if instances_per_user < 1:
        raise gr.Error("instances_per_user must be at least 1.")

    total_instances_needed = len(usernames) * instances_per_user
    if total_instances_needed > len(INSTANCE_IDS):
        raise gr.Error(f"Not enough instances available. Need {total_instances_needed} but only have {len(INSTANCE_IDS)}.")

    insts = INSTANCE_IDS.copy()
    random.Random(int(seed)).shuffle(insts)
    # Take only the number of instances we need
    selected_insts = insts[:total_instances_needed]

    # Round-robin deal: user i gets every len(usernames)-th instance starting at i.
    n_users = len(usernames)
    assignments = {name: selected_insts[i::n_users] for i, name in enumerate(usernames)}
    _set_assignments(assignments)
    return assignments
# Assignments resolved at import time: stored ones are reused, otherwise a new deal is made for DEFAULT_USERS.
ASSIGNMENTS = init_or_load_assignments(DEFAULT_USERS, seed=DEFAULT_SEED)
def available_users() -> List[str]:
    """Return the usernames that currently have assignments, sorted."""
    current = _get_assignments()
    return sorted(current)
def load_progress() -> Dict:
    """Return the stored progress mapping (delegates to _get_progress)."""
    progress = _get_progress()
    return progress
# Progress mapping loaded at import time; mutated in place by the functions below.
PROGRESS = load_progress()
def _ensure_user_progress_struct(user: str):
    """Initialize user progress skeleton for new users/instances.

    Ensures PROGRESS[user][inst]["answers"] exists for every instance currently
    assigned to ``user`` and has exactly one slot per QA pair (missing slots
    are padded with None, surplus slots are trimmed). The result is then
    handed to _set_progress.
    """
    user_progress = PROGRESS.setdefault(user, {})
    # Use the live assignments rather than the import-time ASSIGNMENTS
    # snapshot. After an admin re-deal the snapshot is stale, and readers that
    # index progress[user][inst] for newly assigned instances hit a KeyError.
    for inst in _get_assignments().get(user, []):
        n = len(DATA[inst]["qa_pairs"])
        entry = user_progress.get(inst)
        if entry is None:
            user_progress[inst] = {"answers": [None] * n}
            continue
        answers = entry.get("answers") or []
        # pad or trim if needed ([None] * negative is simply [])
        entry["answers"] = answers[:n] + [None] * (n - len(answers))
    _set_progress(PROGRESS)
def save_eval(user: str, inst: str, q_idx: int,
              relevant_choice: Optional[str], correct_choice: Optional[str], note: str = "") -> str:
    """Persist evaluation for a single QA pair.

    Args:
        user: Reviewer name.
        inst: Instance id being reviewed.
        q_idx: Zero-based index of the QA pair within the instance.
        relevant_choice: Label chosen in the relevance radio, or None.
        correct_choice: Label chosen in the correctness radio, or None. Only
            taken into account when the question was marked relevant.
        note: Optional free-text comment; stored stripped.

    Returns:
        A status line describing what was saved, or a prompt to pick a user
        when ``user`` or ``inst`` is empty.

    Raises:
        ValueError: if ``q_idx`` is negative.
    """
    if not user or not inst:
        return "Select a user to begin."
    if q_idx < 0:
        # A negative index would silently overwrite a slot counted from the end.
        raise ValueError(f"q_idx must be >= 0, got {q_idx}")
    _ensure_user_progress_struct(user)
    if inst not in PROGRESS[user]:
        PROGRESS[user][inst] = {"answers": [None] * len(DATA[inst]["qa_pairs"])}

    # map choices to booleans
    # NOTE(review): these labels look mis-encoded, but they must stay identical
    # to the Radio choices declared in the UI, so they are kept verbatim.
    rel = {"βœ“ Relevant": True, "βœ— Not relevant": False}.get(relevant_choice)
    corr = None
    if rel is True:
        corr = {"βœ“ Correct": True, "βœ— Incorrect": False}.get(correct_choice)

    # utcnow() is deprecated; take an aware UTC time and drop tzinfo so the
    # stored text keeps the same "<naive ISO timestamp>Z" shape as before.
    saved_at = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    record = {
        "relevant": rel,
        "correct": corr,
        "note": (note or "").strip(),
        "saved_at": saved_at,
    }

    answers = PROGRESS[user][inst]["answers"]
    while q_idx >= len(answers):  # safety: grow the list if the slot is missing
        answers.append(None)
    answers[q_idx] = record
    _set_progress(PROGRESS)

    label = f"Saved: {user} β€’ {inst} β€’ Q{q_idx+1} β†’ relevant={rel}"
    if rel is True:
        label += f", correct={corr}"
    return label
def summarize_user(user: str) -> Tuple[str, List[List]]:
    """Build the progress summary for ``user``.

    Returns:
        A (text, rows) pair: ``text`` is a one-line completion summary and
        ``rows`` holds one ``[instance_id, answered, total]`` row per assigned
        instance. With no user selected a placeholder and an empty list are
        returned.
    """
    if not user:
        return ("β€”", [])
    _ensure_user_progress_struct(user)
    assigned = _get_assignments().get(user, [])
    progress = _get_progress()

    rows = []
    for inst in assigned:
        pair_count = len(DATA[inst]["qa_pairs"])
        saved = [a for a in progress[user][inst]["answers"] if a is not None]
        rows.append([inst, len(saved), pair_count])

    done = sum(row[1] for row in rows)
    total = sum(row[2] for row in rows)
    txt = f"Progress: {done} / {total} QA pairs completed across {len(assigned)} assigned instances."
    return txt, rows
def next_unfinished(user: str) -> Tuple[Optional[str], Optional[int]]:
    """Find the first QA pair ``user`` has not saved yet.

    Instances are scanned in assignment order. Returns
    ``(instance_id, q_index)`` with a zero-based index, or ``(None, None)``
    when every slot is filled.
    """
    _ensure_user_progress_struct(user)
    assigned = _get_assignments().get(user, [])
    progress = _get_progress()
    for inst in assigned:
        slots = progress[user][inst]["answers"]
        pending = next((i for i, a in enumerate(slots) if a is None), None)
        if pending is not None:
            return inst, pending
    return None, None
def first_unfinished_in_instance(user: str, inst: str) -> int:
    """Return the zero-based index of the first unsaved QA pair in ``inst``.

    Falls back to 0 when every pair of the instance already has a saved answer.
    """
    _ensure_user_progress_struct(user)
    slots = _get_progress()[user][inst]["answers"]
    return next((i for i, a in enumerate(slots) if a is None), 0)
def get_payload(inst: str, q_idx: int) -> Tuple[str, str, str, str, str]:
    """Collect the display fields for one QA pair.

    ``q_idx`` is clamped into the valid range of the instance. Returns
    ``(question, answer, findings, impressions, header)``; for an instance
    with no QA pairs the question and answer are empty strings and the header
    says so.
    """
    record = DATA[inst]
    pairs = record["qa_pairs"]
    findings = record["findings"]
    impressions = record["impressions"]
    n = len(pairs)

    if n == 0:
        return "", "", findings, impressions, f"{inst} β€” No questions (0/0)"

    q_idx = max(0, min(q_idx, n - 1))
    chosen = pairs[q_idx]
    header = f"{inst} β€” Question {q_idx+1} / {n}"
    return chosen["question"], chosen["answer"], findings, impressions, header
def export_user_results(user: str) -> str:
    """Write a CSV + JSON export for the selected user and return a status string.

    One row is produced per QA pair assigned to ``user``; pairs without a
    saved answer get None in the relevant/correct/note/saved_at columns.
    Files are written to EXPORT_DIR as ``results_<user>_<UTC timestamp>``.

    Returns:
        A message naming the written files; a message that there is nothing
        to export when the user has no QA pairs; or, when the files cannot be
        written, the first 1000 characters of the JSON export.
    """
    import csv

    if not user:
        return "Select a user to export results."
    _ensure_user_progress_struct(user)
    assignments = _get_assignments()
    progress = _get_progress()

    # Build flat rows
    rows = []
    for inst in assignments.get(user, []):
        pairs = DATA[inst]["qa_pairs"]
        answers = progress[user][inst]["answers"]
        for q_number, (pair, ans) in enumerate(zip(pairs, answers), start=1):
            saved = ans or {}  # unanswered slots are None
            rows.append({
                "user": user,
                "instance_id": inst,
                "q_index": q_number,
                "question": pair["question"],
                "answer": pair["answer"],
                "relevant": saved.get("relevant"),
                "correct": saved.get("correct"),
                "note": saved.get("note"),
                "saved_at": saved.get("saved_at"),
            })

    if not rows:
        # rows[0] used to raise IndexError here when building the CSV header.
        return f"Nothing to export: no QA pairs are assigned to {user}."

    # Usernames are typed into the admin box; keep them from altering the path.
    safe_user = "".join(c if c.isalnum() or c in "-_." else "_" for c in user)
    ts = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%d-%H%M%S")
    json_path = os.path.join(EXPORT_DIR, f"results_{safe_user}_{ts}.json")
    csv_path = os.path.join(EXPORT_DIR, f"results_{safe_user}_{ts}.csv")

    # Try to export to files, fall back to an inline preview if not possible
    try:
        if not _safe_write_json(json_path, rows):
            # The helper reports failure via its return value; don't claim
            # that a JSON file exists when it was never written.
            raise OSError(f"could not write {json_path}")
        with open(csv_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
            writer.writeheader()
            writer.writerows(rows)
    except OSError:
        json_str = json.dumps(rows, indent=2, ensure_ascii=False)
        return f"Export data (read-only mode):\n\n{json_str[:1000]}{'...' if len(json_str) > 1000 else ''}"
    return f"Exported {len(rows)} rows.\nJSON: {json_path}\nCSV: {csv_path}"
# ------------------------------
# Gradio UI
# ------------------------------
# Top-level UI definition: the components below are created while the `demo` Blocks context is open.
# NOTE(review): indentation was lost in this copy of the file, so the exact nesting of the
# `with` containers (Row / Column / Accordion) cannot be read off here — confirm against the running app.
with gr.Blocks(title="Spatial QA Validator", theme=gr.themes.Glass()) as demo:
# Check if running in read-only mode
# (evaluated once while the UI is built; a write failure that happens later does not update this banner)
read_only_status = ""
if not _file_write_enabled:
read_only_status = "\n\n⚠️ **Running in read-only mode** - Progress will not persist between sessions"
# Instruction text shown to reviewers; the read-only notice is appended when set.
gr.Markdown("## Spatial QA Validation Tool\n"
"Left: Findings & Impression (Ground Truth)\n\n"
"Right: Spatial QA pairs (Q/A) to validate.\n\n"
"For each Q/A:\n"
"1) Mark if the **Question is relevant** to the Findings/Impression.\n"
"2) If **relevant**, mark whether the **Answer is correct**.\n" + read_only_status)
with gr.Row():
# Control column: user selection, progress summary, navigation widgets, export.
with gr.Column(scale=1, min_width=260):
user_dd = gr.Dropdown(choices=available_users(), label="Select user", interactive=True)
load_btn = gr.Button("Load my queue", variant="primary")
progress_text = gr.Markdown("")
progress_table = gr.Dataframe(headers=["Instance", "Done", "Total"], row_count=0, interactive=False)
# Instance dropdown and question slider start hidden; the event handlers make them visible.
inst_dd = gr.Dropdown(choices=[], label="Assigned instance", interactive=True, visible=False)
q_slider = gr.Slider(1, 10, value=1, step=1, label="Question #", interactive=True, visible=False)
export_btn = gr.Button("Export my results")
with gr.Column(scale=2, min_width=600):
# Left panel: Findings/Impressions
with gr.Row():
with gr.Column(scale=1, elem_classes=["left-panel"]):
findings_tb = gr.Textbox(label="Findings (Ground Truth)", lines=16, interactive=False)
impressions_tb = gr.Textbox(label="Impression (Ground Truth)", lines=6, interactive=False)
# Second panel: the QA pair under review and the evaluation controls.
with gr.Column(scale=1, elem_classes=["left-panel"]):
header_md = gr.Markdown("")
question_md = gr.Markdown("")
answer_md = gr.Markdown("")
# The radio labels below are compared verbatim in save_eval and _relevant_changed; keep them in sync.
relevant_radio = gr.Radio(choices=["βœ“ Relevant", "βœ— Not relevant"],
label="1) Is the QUESTION relevant to Findings/Impression?",
interactive=True)
# Starts disabled; _relevant_changed enables it once the question is marked relevant.
correct_radio = gr.Radio(choices=["βœ“ Correct", "βœ— Incorrect"],
label="2) If relevant, is the ANSWER correct?",
interactive=False)
note_tb = gr.Textbox(label="Optional note", lines=2, placeholder="Any comments...")
with gr.Row():
save_btn = gr.Button("Save", variant="secondary")
save_next_btn = gr.Button("Save & Next", variant="primary")
skip_btn = gr.Button("Skip to next unfinished")
# Status line that the handlers use for save confirmations and navigation info.
nav_info = gr.Markdown("")
# Move Admin accordion below so user_dd exists before wiring its updates
with gr.Accordion("Setup (admin) β€” define users and (re)deal assignments", open=False):
with gr.Row():
users_csv = gr.Textbox(value=",".join(DEFAULT_USERS),
label="Usernames (comma-separated)",
lines=2)
seed_num = gr.Number(value=DEFAULT_SEED, precision=0,
label="Assignment Seed (change & Apply)")
apply_btn = gr.Button("Apply / (Re)create Assignments", variant="secondary")
assign_info = gr.Markdown()
def apply_users(u_csv, seed):
    """Re-deal assignments for the usernames typed into the admin box.

    Args:
        u_csv: Comma-separated usernames; blanks are ignored.
        seed: Shuffle seed from the number field; an empty value counts as 0.

    Returns:
        A dropdown update (new user choices, selection cleared) and a
        markdown summary of how many instances each user received.
    """
    names = [part.strip() for part in (u_csv or "").split(",") if part.strip()]
    dealt = create_assignments(names, int(seed or 0))

    # summarize counts
    per_user = {name: len(insts) for name, insts in dealt.items()}
    bullets = [f"- **{name}**: {per_user[name]} instances" for name in sorted(per_user)]
    table = "\n".join(bullets)
    total_assigned = sum(per_user.values())
    total_available = len(INSTANCE_IDS)
    unassigned = total_available - total_assigned

    summary = f"Assignments updated for {len(dealt)} users (10 instances each):\n{table}\n\n**Summary:** {total_assigned} instances assigned, {unassigned} instances left unassigned out of {total_available} total."
    return gr.update(choices=available_users(), value=None), summary


# user_dd already exists at this point, so it can be listed as an output directly
apply_btn.click(fn=apply_users, inputs=[users_csv, seed_num], outputs=[user_dd, assign_info])
# ---- Wiring functions ----
def _load_user(user: str):
    """Load the selected user's queue and jump to their next unfinished QA pair.

    Returns one value per output wired below: instance dropdown, question
    slider, findings, impressions, header, question, answer, both radios
    (reset), note (cleared), progress text, progress table and the nav line.

    Raises:
        gr.Error: if no user is selected.
    """
    if not user:
        raise gr.Error("Please select a user.")
    # Re-run the assignment loader (its return value is not used here)
    init_or_load_assignments(DEFAULT_USERS, seed=DEFAULT_SEED)
    _ensure_user_progress_struct(user)

    summary_text, summary_rows = summarize_user(user)
    # initial pointer = next unfinished overall
    inst, q_idx = next_unfinished(user)
    queue = _get_assignments().get(user, [])
    reset_relevant = gr.update(value=None, interactive=True)
    reset_correct = gr.update(value=None, interactive=False)

    if inst is None:
        # user is done
        return (
            gr.update(choices=queue, visible=True, value=None),
            gr.update(visible=False),
            "", "", "", "", "",
            reset_relevant,
            reset_correct,
            "",  # note
            summary_text, summary_rows, "All assigned QA pairs are completed. πŸŽ‰",
        )

    # populate content
    question, answer, findings, impressions, header = get_payload(inst, q_idx)
    n = len(DATA[inst]["qa_pairs"])
    has_questions = n > 0
    slider_update = gr.update(
        visible=has_questions,
        minimum=1,
        maximum=n if has_questions else 1,
        step=1,
        value=q_idx + 1 if has_questions else 1,
    )
    nav = f"{queue.index(inst)+1}/{len(queue)} β€’ Q{(q_idx+1) if n>0 else 0}/{n}"
    return (
        gr.update(choices=queue, visible=True, value=inst),
        slider_update,
        findings, impressions, f"**{header}**", f"**Q:** {question}", f"**A:** {answer}",
        reset_relevant,
        reset_correct,
        "",
        summary_text, summary_rows, nav,
    )


load_btn.click(
    fn=_load_user,
    inputs=[user_dd],
    outputs=[
        inst_dd, q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
        relevant_radio, correct_radio, note_tb, progress_text, progress_table, nav_info,
    ],
)
def _inst_changed(user: str, inst: str):
    """Show the first unfinished QA pair of the instance picked in the dropdown.

    Returns one value per output wired below: question slider, findings,
    impressions, header, question, answer, both radios (reset), note (cleared)
    and the nav line. The panel is cleared when nothing is selected or when
    the instance does not belong to the selected user's queue.
    """
    queue = _get_assignments().get(user, []) if user else []
    if not inst or inst not in queue:
        # The second condition covers a user switch without "Load my queue":
        # the dropdown then still holds another user's instance, which used to
        # raise KeyError / ValueError further down.
        return gr.update(visible=False), "", "", "", "", "", gr.update(interactive=True), gr.update(interactive=False), "", ""

    idx = first_unfinished_in_instance(user, inst)
    q, a, f, im, header = get_payload(inst, idx)
    n = len(DATA[inst]["qa_pairs"])
    slider_update = gr.update(
        visible=(n > 0),
        minimum=1,
        maximum=n if n > 0 else 1,
        step=1,
        value=(idx + 1 if n > 0 else 1),
    )
    return (
        slider_update,
        f, im, f"**{header}**", f"**Q:** {q}", f"**A:** {a}",
        gr.update(value=None, interactive=True),
        gr.update(value=None, interactive=False),
        "",
        f"{queue.index(inst)+1}/{len(queue)} β€’ Q{(idx+1) if n>0 else 0}/{n}",
    )


inst_dd.change(
    _inst_changed,
    inputs=[user_dd, inst_dd],
    outputs=[q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
             relevant_radio, correct_radio, note_tb, nav_info]
)
def _q_changed(inst: str, q_no: int):
    """Refresh header, question and answer when the slider moves; the note is cleared.

    ``q_no`` is the one-based slider value. With no instance selected all
    four outputs are blanked.
    """
    if not inst:
        return "", "", "", ""
    question, answer, _findings, _impressions, header = get_payload(inst, int(q_no) - 1)
    return f"**{header}**", f"**Q:** {question}", f"**A:** {answer}", ""


q_slider.change(
    fn=_q_changed,
    inputs=[inst_dd, q_slider],
    outputs=[header_md, question_md, answer_md, note_tb],
)
def _relevant_changed(rel_choice: Optional[str]):
    """Enable the correctness radio only while the question is marked relevant.

    Any other choice (including none) clears and disables it.
    """
    marked_relevant = rel_choice == "βœ“ Relevant"
    if not marked_relevant:
        # reset and disable
        return gr.update(value=None, interactive=False)
    return gr.update(interactive=True)


relevant_radio.change(fn=_relevant_changed, inputs=[relevant_radio], outputs=[correct_radio])
def _save(user: str, inst: str, q_no: int, rel_choice: Optional[str], corr_choice: Optional[str], note: str):
    """Save the current evaluation and refresh the progress summary.

    ``q_no`` is the one-based slider value. Returns the save status line, the
    progress text and the progress table rows.

    Raises:
        gr.Error: if user, instance or question number is missing.
    """
    if not user or not inst or not q_no:
        raise gr.Error("Missing user/instance/question selection.")
    status = save_eval(user, inst, int(q_no) - 1, rel_choice, corr_choice, note)
    summary_text, summary_rows = summarize_user(user)
    return status, summary_text, summary_rows


save_btn.click(
    fn=_save,
    inputs=[user_dd, inst_dd, q_slider, relevant_radio, correct_radio, note_tb],
    outputs=[nav_info, progress_text, progress_table],
)
def _save_and_next(user: str, inst: str, q_no: int, rel_choice: Optional[str], corr_choice: Optional[str], note: str):
    """Save the current evaluation, then move to the user's next unfinished QA pair.

    ``q_no`` is the one-based slider value. Returns one value per output wired
    below: instance dropdown, question slider, findings, impressions, header,
    question, answer, both radios (reset), note (cleared), status line,
    progress text and progress table rows.

    Raises:
        gr.Error: if user, instance or question number is missing.
    """
    if not (user and inst and q_no):
        raise gr.Error("Missing user/instance/question selection.")
    idx = int(q_no) - 1
    msg = save_eval(user, inst, idx, rel_choice, corr_choice, note)

    # jump to next unfinished (global)
    nxt_inst, nxt_idx = next_unfinished(user)
    txt, rows = summarize_user(user)
    if nxt_inst is None:
        return (
            # gr.update() like every other handler in this file; the
            # per-component Dropdown.update() classmethod is not available in
            # newer Gradio releases.
            gr.update(value=None),
            gr.update(visible=False),
            "", "", "",
            "", "",  # q/a
            gr.update(value=None, interactive=True),
            gr.update(value=None, interactive=False),
            "",
            f"{msg}\n\nAll assigned QA pairs are completed. πŸŽ‰",
            txt, rows
        )

    # else load that payload
    q, a, f, im, header = get_payload(nxt_inst, nxt_idx)
    n = len(DATA[nxt_inst]["qa_pairs"])
    slider_update = gr.update(
        visible=(n > 0),
        minimum=1,
        maximum=n if n > 0 else 1,
        step=1,
        value=(nxt_idx + 1 if n > 0 else 1),
    )
    return (
        gr.update(value=nxt_inst),
        slider_update,
        f, im, f"**{header}**",
        f"**Q:** {q}", f"**A:** {a}",
        gr.update(value=None, interactive=True),
        gr.update(value=None, interactive=False),
        "",
        msg,
        txt, rows
    )


save_next_btn.click(
    _save_and_next,
    inputs=[user_dd, inst_dd, q_slider, relevant_radio, correct_radio, note_tb],
    outputs=[inst_dd, q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
             relevant_radio, correct_radio, note_tb, nav_info, progress_text, progress_table]
)
def _skip_to_next(user: str):
    """Jump to the user's next unfinished QA pair without saving anything.

    Returns one value per output wired below: instance dropdown, question
    slider, findings, impressions, header, question, answer, both radios
    (reset), note (cleared), status line, progress text and progress table rows.

    Raises:
        gr.Error: if no user is selected.
    """
    if not user:
        raise gr.Error("Please select a user.")

    target_inst, target_idx = next_unfinished(user)
    summary_text, summary_rows = summarize_user(user)
    reset_relevant = gr.update(value=None, interactive=True)
    reset_correct = gr.update(value=None, interactive=False)

    if target_inst is None:
        return (
            gr.update(value=None),
            gr.update(visible=False),
            "", "", "",
            "", "",
            reset_relevant,
            reset_correct,
            "",
            "All assigned QA pairs are completed. πŸŽ‰",
            summary_text, summary_rows,
        )

    question, answer, findings, impressions, header = get_payload(target_inst, target_idx)
    n = len(DATA[target_inst]["qa_pairs"])
    has_questions = n > 0
    slider_update = gr.update(
        visible=has_questions,
        minimum=1,
        maximum=n if has_questions else 1,
        step=1,
        value=target_idx + 1 if has_questions else 1,
    )
    return (
        gr.update(value=target_inst),
        slider_update,
        findings, impressions, f"**{header}**",
        f"**Q:** {question}", f"**A:** {answer}",
        reset_relevant,
        reset_correct,
        "",
        "Jumped to next unfinished.",
        summary_text, summary_rows,
    )


skip_btn.click(
    fn=_skip_to_next,
    inputs=[user_dd],
    outputs=[
        inst_dd, q_slider, findings_tb, impressions_tb, header_md, question_md, answer_md,
        relevant_radio, correct_radio, note_tb, nav_info, progress_text, progress_table,
    ],
)
def _export(user: str):
    """Export the selected user's results and return the status text for the nav line."""
    return export_user_results(user)


export_btn.click(fn=_export, inputs=[user_dd], outputs=[nav_info])
if __name__ == "__main__":
    # Started as a script: launch with share=True and otherwise default settings
    # (no server_name or port is passed).
    launch_options = {"share": True}
    demo.launch(**launch_options)