| import gradio as gr |
| import json |
| import os |
| import random |
| import glob |
| from datetime import datetime |
|
|
| |
| DATA_PATH = '/home/mshahidul/readctrl/data/extracting_subclaim/extracted_subclaims_syn_data_with_gs_summary_en.json' |
| KEY_DATA_PATH = '/home/mshahidul/readctrl/data/key_subclaims_testing/key_subclaims.json' |
| BASE_SAVE_DIR = '/home/mshahidul/readctrl/data/thresold_finding/' |
|
|
| |
| with open(DATA_PATH, 'r') as f: |
| data = json.load(f) |
|
|
| NUM_SAMPLES = 10 |
| random.seed(42) |
| all_possible_indices = list(range(len(data))) |
| shuffled_indices = random.sample(all_possible_indices, min(NUM_SAMPLES, len(data))) |
|
|
| with open(KEY_DATA_PATH, 'r') as f: |
| key_data = json.load(f) |
| key_lookup = {item['index']: item['llm_output'] for item in key_data} |
|
|
| |
| def get_user_dir(username): |
| if not username: return None |
| safe_name = "".join(x for x in username if x.isalnum()).lower() |
| user_path = os.path.join(BASE_SAVE_DIR, safe_name) |
| os.makedirs(user_path, exist_ok=True) |
| return user_path |
|
|
| def get_last_progress(username): |
| user_dir = get_user_dir(username) |
| files = glob.glob(os.path.join(user_dir, "seq*_*.json")) |
| if not files: return 0 |
| indices = [] |
| for f in files: |
| try: |
| indices.append(int(os.path.basename(f).split('_')[0].replace('seq', ''))) |
| except: continue |
| return min(max(indices) + 1, NUM_SAMPLES - 1) if indices else 0 |
|
|
| |
| def load_example(progress_index, username): |
| if not username: |
| return [gr.update(value="### β οΈ Please enter your name and click Login")] + [gr.skip()]*10 |
|
|
| if progress_index >= len(shuffled_indices): |
| return ["### π All Samples Complete!", "Done", [], "0%", "0%", "0%", gr.update(choices=[], value=[]), gr.update(choices=[], value=[]), gr.update(choices=[], value=[]), "Session Finished", progress_index] |
|
|
| actual_data_index = shuffled_indices[progress_index] |
| record = data[actual_data_index] |
| |
| random.seed(actual_data_index) |
| source_type = random.choice(["Full Original Text", "Gold Summary"]) |
| text_content, subclaims = (record['fulltext'], record['fulltext_subclaims']) if source_type == "Full Original Text" else (record['summary'], record['summary_subclaims']) |
| |
| user_dir = get_user_dir(username) |
| existing_files = glob.glob(os.path.join(user_dir, f"seq{progress_index}_*.json")) |
| |
| if existing_files: |
| with open(existing_files[0], 'r') as f: |
| saved = json.load(f) |
| low_val = saved['annotations']['low']['subclaims'] |
| int_val = saved['annotations']['intermediate']['subclaims'] |
| prof_val = saved['annotations']['proficient']['subclaims'] |
| status_msg = f"π [Sequence {progress_index}] Previously saved data loaded." |
| else: |
| key_items = key_lookup.get(actual_data_index, {}).get('key_source_text_subclaims' if source_type == "Full Original Text" else 'key_gold_summary_subclaims', []) |
| indices = [] |
| for item in key_items: |
| try: indices.append(int(item.get("source_subclaim_id" if source_type == "Full Original Text" else "gold_subclaim_id", "").split('-')[-1])) |
| except: continue |
| default_sel = [subclaims[i] for i in indices if 0 <= i < len(subclaims)] |
| low_val, int_val, prof_val = default_sel, default_sel, default_sel |
| status_msg = f"π [Sequence {progress_index}] New record loaded." |
|
|
| source_info = f"### Instance: {progress_index + 1}/{len(shuffled_indices)} | User: **{username}** | Source: **{source_type}**" |
| |
| |
| total = len(subclaims) if subclaims else 1 |
| l_p, i_p, p_p = f"{(len(low_val)/total*100):.1f}%", f"{(len(int_val)/total*100):.1f}%", f"{(len(prof_val)/total*100):.1f}%" |
|
|
| return [ |
| source_info, text_content, subclaims, l_p, i_p, p_p, |
| gr.update(choices=subclaims, value=low_val), |
| gr.update(choices=subclaims, value=int_val), |
| gr.update(choices=subclaims, value=prof_val), |
| status_msg, progress_index |
| ] |
|
|
| def handle_save(username, progress_index, source_info, low_sel, int_sel, prof_sel, subclaims): |
| if not username or username.strip() == "": |
| gr.Warning("User name missing! Please enter name.") |
| return "β Error: Username Required" |
| |
| if not (len(low_sel) <= len(int_sel) <= len(prof_sel)): |
| gr.Warning("Hierarchy Error: Selections must follow Low β Intermediate β Proficient.") |
| return "β Save Failed: Hierarchy Violation" |
|
|
| user_dir = get_user_dir(username) |
| actual_data_index = shuffled_indices[progress_index] |
| stype = "Full Original Text" if "Full Original Text" in source_info else "Gold Summary" |
| |
| |
| total_count = len(subclaims) if subclaims else 1 |
| low_pct_val = (len(low_sel) / total_count) * 100 |
| int_pct_val = (len(int_sel) / total_count) * 100 |
| prof_pct_val = (len(prof_sel) / total_count) * 100 |
|
|
| result = { |
| "annotator": username, |
| "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), |
| "progress_sequence": progress_index, |
| "original_data_index": actual_data_index, |
| "source_type": stype, |
| "total_subclaims": total_count, |
| "annotations": { |
| "low": { |
| "count": len(low_sel), |
| "percentage": f"{low_pct_val:.2f}%", |
| "subclaims": low_sel |
| }, |
| "intermediate": { |
| "count": len(int_sel), |
| "percentage": f"{int_pct_val:.2f}%", |
| "subclaims": int_sel |
| }, |
| "proficient": { |
| "count": len(prof_sel), |
| "percentage": f"{prof_pct_val:.2f}%", |
| "subclaims": prof_sel |
| } |
| } |
| } |
| |
| filename = f"seq{progress_index}_record{actual_data_index}.json" |
| file_path = os.path.join(user_dir, filename) |
| |
| with open(file_path, 'w') as f: |
| json.dump(result, f, indent=4) |
| |
| gr.Info(f"Record {progress_index + 1} saved successfully!") |
| return f"β
Last saved: {datetime.now().strftime('%H:%M:%S')}" |
|
|
| def navigate(direction, current_idx): |
| return max(0, min(current_idx + direction, NUM_SAMPLES - 1)) |
|
|
| def sync_logic(low, inter, prof, total, trigger_type): |
| if trigger_type == "low": |
| inter, prof = list(set(inter) | set(low)), list(set(prof) | set(inter) | set(low)) |
| elif trigger_type == "inter": |
| prof, low = list(set(prof) | set(inter)), list(set(low) & set(inter)) |
| else: |
| inter, low = list(set(inter) & set(prof)), list(set(low) & set(inter) & set(prof)) |
| |
| calc_pct = lambda x: f"{(len(x)/len(total)*100):.1f}%" if total else "0%" |
| return calc_pct(low), calc_pct(inter), calc_pct(prof), gr.update(value=low), gr.update(value=inter), gr.update(value=prof) |
|
|
| |
| with gr.Blocks(theme=gr.themes.Soft(), title="Medical Literacy Tool") as demo: |
| index_state = gr.State(0) |
| subclaim_list_state = gr.State([]) |
| |
| with gr.Row(): |
| with gr.Column(scale=2): |
| user_input = gr.Textbox(label="Annotator Name", placeholder="e.g., Shahidul", interactive=True) |
| load_btn = gr.Button("π Login / Resume Session", variant="primary") |
| with gr.Column(scale=3): |
| with gr.Accordion("π View Task Instructions", open=False): |
| try: |
| with open("/home/mshahidul/readctrl/code/interface/instructions", "r") as f: |
| gr.Markdown(f.read()) |
| except: |
| gr.Markdown("### Instructions\n- Adjust subclaims for literacy levels.\n- **Saving:** Overwrites previous edits for the same record.") |
|
|
| gr.HTML("<hr>") |
|
|
| with gr.Row(): |
| with gr.Column(scale=1, variant="panel"): |
| source_display = gr.Markdown("### Please login to begin.") |
| progress_bar = gr.Slider(label="Progress", minimum=0, maximum=NUM_SAMPLES-1, step=1, interactive=False) |
| text_viewer = gr.Textbox(label="Reference Text", interactive=False, lines=18) |
| save_status = gr.Markdown("Status: Waiting for login...") |
|
|
| with gr.Column(scale=2): |
| with gr.Row(): |
| with gr.Column(): |
| gr.Markdown("### π’ Low") |
| low_pct = gr.Label(label="Coverage", value="0%") |
| low_check = gr.CheckboxGroup(label="Subclaims", choices=[]) |
| with gr.Column(): |
| gr.Markdown("### π‘ Intermediate") |
| int_pct = gr.Label(label="Coverage", value="0%") |
| int_check = gr.CheckboxGroup(label="Subclaims", choices=[]) |
| with gr.Column(): |
| gr.Markdown("### π΄ Proficient") |
| prof_pct = gr.Label(label="Coverage", value="0%") |
| prof_check = gr.CheckboxGroup(label="Subclaims", choices=[]) |
|
|
| with gr.Row(): |
| prev_btn = gr.Button("β¬
οΈ Previous") |
| save_btn = gr.Button("πΎ Save Changes", variant="primary") |
| next_btn = gr.Button("Next β‘οΈ") |
|
|
| |
| load_btn.click(lambda u: (get_last_progress(u), f"Session for {u} active."), [user_input], [index_state, save_status]).then( |
| load_example, [index_state, user_input], |
| [source_display, text_viewer, subclaim_list_state, low_pct, int_pct, prof_pct, low_check, int_check, prof_check, save_status, progress_bar] |
| ) |
|
|
| save_btn.click(handle_save, [user_input, index_state, source_display, low_check, int_check, prof_check, subclaim_list_state], [save_status]) |
|
|
| next_btn.click(navigate, [gr.Number(1, visible=False), index_state], [index_state]).then( |
| load_example, [index_state, user_input], |
| [source_display, text_viewer, subclaim_list_state, low_pct, int_pct, prof_pct, low_check, int_check, prof_check, save_status, progress_bar] |
| ) |
| |
| prev_btn.click(navigate, [gr.Number(-1, visible=False), index_state], [index_state]).then( |
| load_example, [index_state, user_input], |
| [source_display, text_viewer, subclaim_list_state, low_pct, int_pct, prof_pct, low_check, int_check, prof_check, save_status, progress_bar] |
| ) |
|
|
| |
| low_check.input(lambda l,i,p,t: sync_logic(l,i,p,t,"low"), [low_check, int_check, prof_check, subclaim_list_state], [low_pct, int_pct, prof_pct, low_check, int_check, prof_check]) |
| int_check.input(lambda l,i,p,t: sync_logic(l,i,p,t,"inter"), [low_check, int_check, prof_check, subclaim_list_state], [low_pct, int_pct, prof_pct, low_check, int_check, prof_check]) |
| prof_check.input(lambda l,i,p,t: sync_logic(l,i,p,t,"prof"), [low_check, int_check, prof_check, subclaim_list_state], [low_pct, int_pct, prof_pct, low_check, int_check, prof_check]) |
|
|
| if __name__ == "__main__": |
| demo.launch(share=True) |