| | import gradio as gr |
| | import json |
| | import os |
| | import random |
| | import glob |
| | from datetime import datetime |
| |
|
| | |
# JSON file loaded at import time into `data`; its records are read through the
# 'fulltext', 'fulltext_subclaims', 'summary' and 'summary_subclaims' keys.
DATA_PATH = '/home/mshahidul/readctrl/data/extracting_subclaim/extracted_subclaims_syn_data_with_gs_summary_en.json'
# JSON file loaded at import time into `key_data`; each item is read through
# its 'index' and 'llm_output' keys to build `key_lookup`.
KEY_DATA_PATH = '/home/mshahidul/readctrl/data/key_subclaims_testing/key_subclaims.json'
# Root directory under which get_user_dir() creates one folder per annotator.
BASE_SAVE_DIR = '/home/mshahidul/readctrl/data/thresold_finding/'
| |
|
| | |
# Load the annotation records. The encoding is pinned to UTF-8 so the result
# does not depend on the locale's default encoding.
with open(DATA_PATH, 'r', encoding='utf-8') as f:
    data = json.load(f)

# Fixed-seed sample of record indices: every run (and therefore every
# annotator) works through the same records in the same order.
NUM_SAMPLES = 10
random.seed(42)
all_possible_indices = list(range(len(data)))
shuffled_indices = random.sample(all_possible_indices, min(NUM_SAMPLES, len(data)))

# Map each key-subclaim item's 'index' to its 'llm_output' for quick lookup
# when pre-selecting default subclaims.
with open(KEY_DATA_PATH, 'r', encoding='utf-8') as f:
    key_data = json.load(f)
key_lookup = {item['index']: item['llm_output'] for item in key_data}
| |
|
| | |
def get_user_dir(username):
    """Return the save directory for *username*, creating it if needed.

    The folder name is the username reduced to its alphanumeric characters
    and lower-cased, placed under ``BASE_SAVE_DIR``. A falsy username
    returns ``None`` and creates nothing.

    NOTE(review): a username without any alphanumeric character reduces to
    an empty folder name, so the returned path is ``BASE_SAVE_DIR`` itself.
    """
    if username:
        kept_chars = [ch for ch in username if ch.isalnum()]
        folder_name = "".join(kept_chars).lower()
        target = os.path.join(BASE_SAVE_DIR, folder_name)
        os.makedirs(target, exist_ok=True)
        return target
    return None
| |
|
def get_last_progress(username):
    """Return the sequence position at which *username* should resume.

    Scans the annotator's directory for files named ``seq<N>_*.json`` and
    returns the position after the highest ``N`` found, capped at
    ``NUM_SAMPLES - 1``. Returns 0 when there is no usable username or no
    saved file.
    """
    user_dir = get_user_dir(username)
    if user_dir is None:
        # get_user_dir() returns None for an empty username; joining a path
        # onto None would raise TypeError, so start from the beginning.
        return 0

    indices = []
    for path in glob.glob(os.path.join(user_dir, "seq*_*.json")):
        prefix = os.path.basename(path).split('_')[0]
        try:
            indices.append(int(prefix.replace('seq', '')))
        except ValueError:
            # File matches the glob but does not carry a numeric sequence.
            continue

    if not indices:
        return 0
    return min(max(indices) + 1, NUM_SAMPLES - 1)
| |
|
| | |
def load_example(progress_index, username):
    """Build the UI values for the sample at *progress_index*.

    Returns a list of 11 values, in this order: header markdown, reference
    text, subclaim list, low/intermediate/proficient coverage strings, the
    three checkbox-group updates, a status message and the progress index.

    With no username only the header is updated. When *progress_index* is
    past the last sampled record a "complete" screen is returned. Otherwise
    the record is shown with either the annotator's previously saved
    selections or defaults derived from ``key_lookup``.
    """
    if not username:
        return [gr.update(value="### ⚠️ Please enter your name and click Login")] + [gr.skip()] * 10

    # The index is used for list indexing and file names, so it must be an
    # int even if an upstream component delivered a float.
    progress_index = int(progress_index)

    # NOTE(review): the lone "π" in the strings below looks like a
    # mis-decoded emoji; the original glyph cannot be recovered from here.
    if progress_index >= len(shuffled_indices):
        return [
            "### π All Samples Complete!", "Done", [], "0%", "0%", "0%",
            gr.update(choices=[], value=[]),
            gr.update(choices=[], value=[]),
            gr.update(choices=[], value=[]),
            "Session Finished", progress_index,
        ]

    actual_data_index = shuffled_indices[progress_index]
    record = data[actual_data_index]

    # A private generator seeded with the record index gives the same
    # per-record choice on every call without reseeding the global RNG.
    source_type = random.Random(actual_data_index).choice(["Full Original Text", "Gold Summary"])
    from_fulltext = source_type == "Full Original Text"
    if from_fulltext:
        text_content, subclaims = record['fulltext'], record['fulltext_subclaims']
    else:
        text_content, subclaims = record['summary'], record['summary_subclaims']

    user_dir = get_user_dir(username)
    existing_files = glob.glob(os.path.join(user_dir, f"seq{progress_index}_*.json"))

    if existing_files:
        with open(existing_files[0], 'r', encoding='utf-8') as f:
            saved = json.load(f)
        annotations = saved['annotations']
        low_val = annotations['low']['subclaims']
        int_val = annotations['intermediate']['subclaims']
        prof_val = annotations['proficient']['subclaims']
        status_msg = f"π [Sequence {progress_index}] Previously saved data loaded."
    else:
        claims_key = 'key_source_text_subclaims' if from_fulltext else 'key_gold_summary_subclaims'
        id_key = "source_subclaim_id" if from_fulltext else "gold_subclaim_id"
        key_items = key_lookup.get(actual_data_index, {}).get(claims_key, [])
        indices = []
        for item in key_items:
            try:
                # The id's last '-'-separated part is used as a subclaim index.
                indices.append(int(item.get(id_key, "").split('-')[-1]))
            except (AttributeError, TypeError, ValueError):
                # Malformed or missing id: skip this key item.
                continue
        default_sel = [subclaims[i] for i in indices if 0 <= i < len(subclaims)]
        # Separate copies so the three levels never share one list object.
        low_val, int_val, prof_val = list(default_sel), list(default_sel), list(default_sel)
        status_msg = f"π [Sequence {progress_index}] New record loaded."

    source_info = f"### Instance: {progress_index + 1}/{len(shuffled_indices)} | User: **{username}** | Source: **{source_type}**"

    # Guard the denominator so an empty subclaim list yields 0.0%.
    total = len(subclaims) if subclaims else 1
    l_p = f"{(len(low_val) / total * 100):.1f}%"
    i_p = f"{(len(int_val) / total * 100):.1f}%"
    p_p = f"{(len(prof_val) / total * 100):.1f}%"

    return [
        source_info, text_content, subclaims, l_p, i_p, p_p,
        gr.update(choices=subclaims, value=low_val),
        gr.update(choices=subclaims, value=int_val),
        gr.update(choices=subclaims, value=prof_val),
        status_msg, progress_index,
    ]
| |
|
def handle_save(username, progress_index, source_info, low_sel, int_sel, prof_sel, subclaims):
    """Validate the three selections and write them to the annotator's file.

    The record is written to ``seq<progress_index>_record<data index>.json``
    in the annotator's directory, overwriting any earlier save for the same
    position. Returns a status string for the UI; on a validation failure a
    Gradio warning is raised and nothing is written.
    """
    # NOTE(review): the lone "β" in the messages below looks like a
    # mis-decoded symbol; the original glyph cannot be recovered from here.
    if not username or username.strip() == "":
        gr.Warning("User name missing! Please enter name.")
        return "β Error: Username Required"

    low_sel = list(low_sel or [])
    int_sel = list(int_sel or [])
    prof_sel = list(prof_sel or [])

    # Check real nesting, not just sizes: equal-sized but different
    # selections would otherwise pass as a valid hierarchy.
    if not (set(low_sel) <= set(int_sel) <= set(prof_sel)):
        gr.Warning("Hierarchy Error: Selections must follow Low β Intermediate β Proficient.")
        return "β Save Failed: Hierarchy Violation"

    progress_index = int(progress_index)
    if not 0 <= progress_index < len(shuffled_indices):
        # Possible when fewer records were sampled than NUM_SAMPLES allows
        # navigation to reach; indexing would raise IndexError.
        gr.Warning("No sample is loaded at this position; nothing was saved.")
        return "Save Failed: No Sample Loaded"

    user_dir = get_user_dir(username)
    actual_data_index = shuffled_indices[progress_index]
    stype = "Full Original Text" if "Full Original Text" in source_info else "Gold Summary"

    # Record the true number of subclaims; only the percentage denominator
    # is guarded against zero.
    total_count = len(subclaims) if subclaims else 0
    denominator = total_count or 1

    def _level(selection):
        """Build the stored entry for one literacy level."""
        return {
            "count": len(selection),
            "percentage": f"{(len(selection) / denominator) * 100:.2f}%",
            "subclaims": selection,
        }

    saved_at = datetime.now()
    result = {
        "annotator": username,
        "timestamp": saved_at.strftime("%Y-%m-%d %H:%M:%S"),
        "progress_sequence": progress_index,
        "original_data_index": actual_data_index,
        "source_type": stype,
        "total_subclaims": total_count,
        "annotations": {
            "low": _level(low_sel),
            "intermediate": _level(int_sel),
            "proficient": _level(prof_sel),
        },
    }

    filename = f"seq{progress_index}_record{actual_data_index}.json"
    file_path = os.path.join(user_dir, filename)

    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=4)

    gr.Info(f"Record {progress_index + 1} saved successfully!")
    return f"✅ Last saved: {saved_at.strftime('%H:%M:%S')}"
| |
|
def navigate(direction, current_idx):
    """Return *current_idx* moved by *direction*, kept within 0..NUM_SAMPLES-1."""
    upper_bound = NUM_SAMPLES - 1
    candidate = current_idx + direction
    if candidate > upper_bound:
        candidate = upper_bound
    if candidate <= 0:
        candidate = 0
    return candidate
| |
|
def sync_logic(low, inter, prof, total, trigger_type):
    """Re-establish low ⊆ intermediate ⊆ proficient after one group changed.

    *trigger_type* names the group the user just edited ("low", "inter", or
    anything else for proficient). The edited group is kept as is: groups
    above it gain its items, and groups below it lose items it no longer
    contains.

    Returns the three coverage strings followed by the three checkbox
    updates, in low / intermediate / proficient order. Selections come back
    in the order of *total* instead of arbitrary set order.
    """
    low = list(low or [])
    inter = list(inter or [])
    prof = list(prof or [])
    low_set, inter_set, prof_set = set(low), set(inter), set(prof)

    if trigger_type == "low":
        inter_set |= low_set
        prof_set |= inter_set
    elif trigger_type == "inter":
        prof_set |= inter_set
        low_set &= inter_set
    else:
        inter_set &= prof_set
        low_set &= inter_set

    # Canonical order: the record's subclaim list first, then any selected
    # value not present in it, each kept once.
    ordering = list(dict.fromkeys([*(total or []), *low, *inter, *prof]))

    def _in_order(chosen):
        """Return the members of *chosen* in canonical order."""
        return [claim for claim in ordering if claim in chosen]

    def _coverage(chosen):
        """Format the share of *total* covered by *chosen*."""
        return f"{(len(chosen) / len(total) * 100):.1f}%" if total else "0%"

    low_out = _in_order(low_set)
    inter_out = _in_order(inter_set)
    prof_out = _in_order(prof_set)
    return (
        _coverage(low_out), _coverage(inter_out), _coverage(prof_out),
        gr.update(value=low_out), gr.update(value=inter_out), gr.update(value=prof_out),
    )
| |
|
| | |
# Layout and event wiring for the annotation UI.
# NOTE(review): button/accordion labels that still start with a lone "π" look
# like mis-decoded emoji; the original glyph cannot be recovered from here.
with gr.Blocks(theme=gr.themes.Soft(), title="Medical Literacy Tool") as demo:
    # Position within shuffled_indices of the sample currently shown.
    index_state = gr.State(0)
    # Full subclaim list of the current sample (denominator for coverage).
    subclaim_list_state = gr.State([])

    with gr.Row():
        with gr.Column(scale=2):
            user_input = gr.Textbox(label="Annotator Name", placeholder="e.g., Shahidul", interactive=True)
            load_btn = gr.Button("π Login / Resume Session", variant="primary")
        with gr.Column(scale=3):
            with gr.Accordion("π View Task Instructions", open=False):
                # Fall back to built-in text only when the instructions file
                # cannot be read; other errors are no longer swallowed.
                try:
                    with open("/home/mshahidul/readctrl/code/interface/instructions", "r") as f:
                        instructions_md = f.read()
                except (OSError, UnicodeDecodeError):
                    instructions_md = "### Instructions\n- Adjust subclaims for literacy levels.\n- **Saving:** Overwrites previous edits for the same record."
                gr.Markdown(instructions_md)

    gr.HTML("<hr>")

    with gr.Row():
        with gr.Column(scale=1, variant="panel"):
            source_display = gr.Markdown("### Please login to begin.")
            progress_bar = gr.Slider(label="Progress", minimum=0, maximum=NUM_SAMPLES-1, step=1, interactive=False)
            text_viewer = gr.Textbox(label="Reference Text", interactive=False, lines=18)
            save_status = gr.Markdown("Status: Waiting for login...")

        with gr.Column(scale=2):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### 🟢 Low")
                    low_pct = gr.Label(label="Coverage", value="0%")
                    low_check = gr.CheckboxGroup(label="Subclaims", choices=[])
                with gr.Column():
                    gr.Markdown("### 🟡 Intermediate")
                    int_pct = gr.Label(label="Coverage", value="0%")
                    int_check = gr.CheckboxGroup(label="Subclaims", choices=[])
                with gr.Column():
                    gr.Markdown("### 🔴 Proficient")
                    prof_pct = gr.Label(label="Coverage", value="0%")
                    prof_check = gr.CheckboxGroup(label="Subclaims", choices=[])

            # NOTE(review): the nesting of this button row was inferred;
            # confirm it belongs under the right-hand column.
            with gr.Row():
                prev_btn = gr.Button("⬅️ Previous")
                save_btn = gr.Button("💾 Save Changes", variant="primary")
                next_btn = gr.Button("Next ➡️")

    # Components refreshed by load_example(), in its return order.
    example_outputs = [
        source_display, text_viewer, subclaim_list_state,
        low_pct, int_pct, prof_pct,
        low_check, int_check, prof_check,
        save_status, progress_bar,
    ]
    # Inputs and outputs shared by the three hierarchy-sync handlers.
    sync_inputs = [low_check, int_check, prof_check, subclaim_list_state]
    sync_outputs = [low_pct, int_pct, prof_pct, low_check, int_check, prof_check]

    load_btn.click(
        lambda u: (get_last_progress(u), f"Session for {u} active."),
        [user_input], [index_state, save_status],
    ).then(load_example, [index_state, user_input], example_outputs)

    save_btn.click(
        handle_save,
        [user_input, index_state, source_display, low_check, int_check, prof_check, subclaim_list_state],
        [save_status],
    )

    # The step is passed as a plain int instead of through a hidden Number
    # component, so the stored index stays an int and no invisible
    # components are added to the page.
    next_btn.click(lambda idx: navigate(1, idx), [index_state], [index_state]).then(
        load_example, [index_state, user_input], example_outputs
    )
    prev_btn.click(lambda idx: navigate(-1, idx), [index_state], [index_state]).then(
        load_example, [index_state, user_input], example_outputs
    )

    # Only user edits (.input) trigger the sync, not programmatic updates.
    low_check.input(lambda l, i, p, t: sync_logic(l, i, p, t, "low"), sync_inputs, sync_outputs)
    int_check.input(lambda l, i, p, t: sync_logic(l, i, p, t, "inter"), sync_inputs, sync_outputs)
    prof_check.input(lambda l, i, p, t: sync_logic(l, i, p, t, "prof"), sync_inputs, sync_outputs)
| |
|
# Start the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    # share=True requests Gradio's public share link in addition to the
    # local server.
    demo.launch(share=True)