| | import gradio as gr |
| | import json |
| | import os |
| | import random |
| | from datetime import datetime |
| |
|
| | |
# --- Input / output locations ------------------------------------------------
DATA_PATH = '/home/mshahidul/readctrl/data/extracting_subclaim/extracted_subclaims_syn_data_with_gs_summary_en.json'
KEY_DATA_PATH = '/home/mshahidul/readctrl/data/key_subclaims_testing/key_subclaims.json'
BASE_SAVE_DIR = '/home/mshahidul/readctrl/data/thresold_finding/'

# --- Per-launch output folder ------------------------------------------------
# The folder name is derived from the launch time (date plus hour) and the
# folder is created eagerly at import time.
session_folder_name = datetime.now().strftime("%Y-%m-%d_%Hh")
SESSION_PATH = os.path.join(BASE_SAVE_DIR, session_folder_name)
os.makedirs(SESSION_PATH, exist_ok=True)

# --- Records and the fixed sample of them used in this session ---------------
# JSON is read as UTF-8 explicitly so the result does not depend on the
# machine's locale default encoding.
with open(DATA_PATH, 'r', encoding='utf-8') as f:
    data = json.load(f)

NUM_SAMPLES = 10
# Fixed seed: every launch draws the same sample in the same order.
random.seed(42)
all_possible_indices = list(range(len(data)))
# Never ask for more records than exist.
shuffled_indices = random.sample(all_possible_indices, min(NUM_SAMPLES, len(data)))

# --- Key-subclaim lookup -----------------------------------------------------
with open(KEY_DATA_PATH, 'r', encoding='utf-8') as f:
    key_data = json.load(f)

# Maps each entry's 'index' value to its 'llm_output' value.
key_lookup = {item['index']: item['llm_output'] for item in key_data}
| |
|
| | |
def get_key_indices(index, source_type):
    """Return the numeric subclaim indices flagged as "key" for one record.

    Args:
        index: Key into the module-level ``key_lookup`` mapping.
        source_type: ``"Full Original Text"`` selects the source-text key
            subclaims; any other value selects the gold-summary ones.

    Returns:
        A list of ints, one per key item whose id ends in an integer after
        the last ``-`` (an id with no ``-`` is parsed as a whole). Items
        with a missing or unparsable id are skipped. An unknown ``index``
        yields an empty list.
    """
    if index not in key_lookup:
        return []

    is_fulltext = source_type == "Full Original Text"
    key_field = 'key_source_text_subclaims' if is_fulltext else 'key_gold_summary_subclaims'
    id_key = "source_subclaim_id" if is_fulltext else "gold_subclaim_id"

    # `or []` also covers a field that is present but null in the JSON.
    key_items = key_lookup[index].get(key_field) or []

    indices = []
    for item in key_items:
        # Coerce to str so a numeric or null id cannot raise AttributeError
        # on .split(); a non-numeric tail falls through to ValueError.
        raw_id = str(item.get(id_key, ""))
        try:
            indices.append(int(raw_id.split('-')[-1]))
        except ValueError:
            continue
    return indices
| |
|
def load_example(progress_index):
    """Build the UI values for one position in the session's sample.

    Args:
        progress_index: Zero-based position in ``shuffled_indices``.

    Returns:
        A 10-item list: header markdown, reference text, the full subclaim
        list, three coverage labels (always ``"0%"`` here), three checkbox
        group updates (all pre-selected with the key subclaims), and an
        empty status message. When ``progress_index`` is past the end of
        the sample, the list instead carries a completion notice and empty
        checkbox groups.
    """
    total = len(shuffled_indices)

    if progress_index >= total:
        return [
            gr.update(value="### 🎉 Session Complete!"),
            # Report the number actually sampled, which can be smaller than
            # NUM_SAMPLES when the dataset has fewer records.
            gr.update(value=f"You have finished your set of {total} records."),
            [], "0%", "0%", "0%",
            gr.update(choices=[], value=[]),
            gr.update(choices=[], value=[]),
            gr.update(choices=[], value=[]),
            "",
        ]

    actual_data_index = shuffled_indices[progress_index]
    record = data[actual_data_index]

    # A private generator seeded with the record index makes the same choice
    # for the same record every time, without reseeding the process-wide
    # `random` state from inside a request handler.
    rng = random.Random(actual_data_index)
    source_type = rng.choice(["Full Original Text", "Gold Summary"])

    if source_type == "Full Original Text":
        text_content = record['fulltext']
        subclaims = record['fulltext_subclaims']
    else:
        text_content = record['summary']
        subclaims = record['summary_subclaims']

    source_info = f"### Instance: {progress_index + 1}/{total} | Source: **{source_type}**"

    # Key indices outside the subclaim list are ignored rather than raising.
    key_indices = get_key_indices(actual_data_index, source_type)
    pre_selected = [subclaims[idx] for idx in key_indices if 0 <= idx < len(subclaims)]

    return [
        source_info, text_content, subclaims, "0%", "0%", "0%",
        gr.update(choices=subclaims, value=pre_selected),
        gr.update(choices=subclaims, value=pre_selected),
        gr.update(choices=subclaims, value=pre_selected),
        "",
    ]


def _ordered_union(primary, extra):
    """Return the unique items of *primary*, then the new items of *extra*."""
    return list(dict.fromkeys([*primary, *extra]))


def _ordered_intersection(primary, allowed):
    """Return the unique items of *primary* that also occur in *allowed*."""
    allowed_set = set(allowed)
    return [item for item in dict.fromkeys(primary) if item in allowed_set]


def sync_from_low(low, inter, prof, total_list):
    """Propagate a change in the Low group upward.

    Everything selected in Low is added to Intermediate, and everything in
    the resulting Intermediate is added to Proficient. Existing selection
    order is kept so the result is deterministic.
    """
    new_inter = _ordered_union(inter, low)
    new_prof = _ordered_union(prof, new_inter)
    return update_ui_components(low, new_inter, new_prof, total_list)


def sync_from_inter(low, inter, prof, total_list):
    """Propagate a change in the Intermediate group both ways.

    Intermediate selections are added to Proficient; Low is trimmed to the
    items still selected in Intermediate.
    """
    new_prof = _ordered_union(prof, inter)
    new_low = _ordered_intersection(low, inter)
    return update_ui_components(new_low, inter, new_prof, total_list)


def sync_from_prof(low, inter, prof, total_list):
    """Propagate a change in the Proficient group downward.

    Intermediate and Low are each trimmed to the items still selected in
    Proficient.
    """
    new_inter = _ordered_intersection(inter, prof)
    new_low = _ordered_intersection(low, prof)
    return update_ui_components(new_low, new_inter, prof, total_list)


def update_ui_components(low, inter, prof, total_list):
    """Compute coverage labels and package the three selections for the UI.

    Args:
        low, inter, prof: Current selections for each level.
        total_list: All subclaims of the current record.

    Returns:
        A 7-tuple: three percentage strings, a status message, and the three
        selections. With an empty ``total_list`` the percentages are
        ``"0%"``, the message is empty and the selections are returned
        as-is; otherwise the selections are wrapped in ``gr.update``.
    """
    if not total_list:
        return "0%", "0%", "0%", "", low, inter, prof

    denominator = len(total_list)
    l_pct = len(low) / denominator * 100
    i_pct = len(inter) / denominator * 100
    p_pct = len(prof) / denominator * 100

    msg = "✅ Hierarchy Enforced: Low ⊆ Intermediate ⊆ Proficient"

    return (
        f"{l_pct:.1f}%", f"{i_pct:.1f}%", f"{p_pct:.1f}%", msg,
        gr.update(value=low), gr.update(value=inter), gr.update(value=prof),
    )
| |
|
def save_and_next(username, progress_index, source_info, low_sel, int_sel, prof_sel, subclaims):
    """Validate and save the current annotation, then load the next record.

    Args:
        username: Annotator name typed in the UI; must be non-blank.
        progress_index: Position (0, 1, 2, ...) in the module-level
            ``shuffled_indices`` list.
        source_info: Header markdown of the current record; the source type
            is recovered from it.
        low_sel, int_sel, prof_sel: Selected subclaims per level.
        subclaims: All subclaims of the current record.

    Returns:
        An 11-item list: the progress index to store, followed by the ten
        values in the shape produced by ``load_example``. On a validation
        failure the record and selections stay in place and only the status
        message changes.
    """
    # Past the end of the sample: nothing to save, re-render the end screen.
    if progress_index >= len(shuffled_indices):
        return [progress_index] + load_example(progress_index)

    def stay_on_record(message):
        """Keep the current record and selections; only set the status text."""
        return [
            progress_index, source_info,
            gr.skip(), gr.skip(), gr.skip(), gr.skip(), gr.skip(),
            gr.update(value=low_sel), gr.update(value=int_sel), gr.update(value=prof_sel),
            message,
        ]

    if not username or not username.strip():
        gr.Warning("Action Required: Please enter your name before submitting!")
        return stay_on_record("⚠️ **Error:** Please enter your name.")

    # Check real containment, not just sizes: selections of growing size
    # that do not contain one another would otherwise be saved.
    if not (set(low_sel) <= set(int_sel) <= set(prof_sel)):
        gr.Warning("DATA NOT SAVED! The selection does not follow the hierarchy: Low ≤ Intermediate ≤ Proficient.")
        return stay_on_record("⚠️ **Error:** Selection sequence is invalid. Please adjust before saving.")

    actual_data_index = shuffled_indices[progress_index]

    # Re-create the session folder in case it was removed after startup.
    try:
        os.makedirs(SESSION_PATH, exist_ok=True)
    except OSError as e:
        # gr.Error only shows when raised; gr.Warning displays the message
        # while still letting this handler return its outputs.
        gr.Warning(f"Critical Error: Could not create directory {SESSION_PATH}. Error: {e}")
        return [progress_index] + load_example(progress_index)

    now = datetime.now()
    timestamp_str = now.strftime("%Y%m%d_%H%M%S")
    # Keep only alphanumeric characters so the name is safe in a filename.
    safe_username = "".join(x for x in username if x.isalnum())

    filename = f"recordID{actual_data_index}_seq{progress_index}_{safe_username}_{timestamp_str}.json"
    file_path = os.path.join(SESSION_PATH, filename)

    stype = "Full Original Text" if "Full Original Text" in source_info else "Gold Summary"

    def summarize(selection):
        """Count, items and coverage fraction (0 when there are no subclaims)."""
        return {
            "count": len(selection),
            "subclaims": selection,
            "pct": len(selection) / len(subclaims) if subclaims else 0,
        }

    result = {
        "annotator": username,
        "timestamp": now.strftime("%Y-%m-%d %H:%M:%S"),
        "progress_sequence": progress_index,
        "original_data_index": actual_data_index,
        "source_type": stype,
        "annotations": {
            "low": summarize(low_sel),
            "intermediate": summarize(int_sel),
            "proficient": summarize(prof_sel),
        },
    }

    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=4)

    gr.Info(f"Success! Record {actual_data_index} saved (Item {progress_index + 1} of {len(shuffled_indices)}).")

    return [progress_index + 1] + load_example(progress_index + 1)
| |
|
| | |
# --- UI layout and event wiring ----------------------------------------------
with gr.Blocks(theme=gr.themes.Soft(), title="Medical Literacy Annotation Tool") as demo:
    # Per-session state: position in the sample and the current subclaim list.
    index_state = gr.State(0)
    subclaim_list_state = gr.State([])

    # Instructions are optional: fall back to a plain heading when the file is
    # missing or unreadable, but let unrelated errors surface.
    try:
        with open("/home/mshahidul/readctrl/code/interface/instructions", "r", encoding="utf-8") as f:
            instructions_text = f.read()
    except (OSError, UnicodeDecodeError):
        instructions_text = "# Medical Annotation Task"

    gr.Markdown(instructions_text)

    with gr.Row():
        # Left panel: annotator name and the reference text.
        with gr.Column(scale=1, variant="panel"):
            user_input = gr.Textbox(label="Annotator Name", placeholder="e.g., mshahidul", interactive=True)
            gr.HTML("<hr>")
            source_display = gr.Markdown("### Initializing...")
            text_viewer = gr.Textbox(label="Reference Text", interactive=False, lines=15)

        # Right panel: one checkbox column per literacy level.
        with gr.Column(scale=2):
            hierarchy_warning = gr.Markdown(value="", visible=True)

            with gr.Row():
                with gr.Column():
                    gr.Markdown("### 🟢 Low")
                    low_pct = gr.Label(label="Coverage", value="0%")
                    low_check = gr.CheckboxGroup(label="Subclaims", choices=[])

                with gr.Column():
                    gr.Markdown("### 🟡 Intermediate")
                    int_pct = gr.Label(label="Coverage", value="0%")
                    int_check = gr.CheckboxGroup(label="Subclaims", choices=[])

                with gr.Column():
                    gr.Markdown("### 🔴 Proficient")
                    prof_pct = gr.Label(label="Coverage", value="0%")
                    prof_check = gr.CheckboxGroup(label="Subclaims", choices=[])

            submit_btn = gr.Button("Submit & Next Record", variant="primary", size="lg")

    # Populate the first record when the page loads.
    demo.load(
        load_example,
        [index_state],
        [source_display, text_viewer, subclaim_list_state,
         low_pct, int_pct, prof_pct,
         low_check, int_check, prof_check, hierarchy_warning],
    )

    # Each checkbox group re-syncs the other two when the user edits it.
    low_check.input(
        sync_from_low,
        [low_check, int_check, prof_check, subclaim_list_state],
        [low_pct, int_pct, prof_pct, hierarchy_warning, low_check, int_check, prof_check],
    )

    int_check.input(
        sync_from_inter,
        [low_check, int_check, prof_check, subclaim_list_state],
        [low_pct, int_pct, prof_pct, hierarchy_warning, low_check, int_check, prof_check],
    )

    prof_check.input(
        sync_from_prof,
        [low_check, int_check, prof_check, subclaim_list_state],
        [low_pct, int_pct, prof_pct, hierarchy_warning, low_check, int_check, prof_check],
    )

    submit_btn.click(
        save_and_next,
        [user_input, index_state, source_display, low_check, int_check, prof_check, subclaim_list_state],
        [index_state, source_display, text_viewer, subclaim_list_state,
         low_pct, int_pct, prof_pct,
         low_check, int_check, prof_check, hierarchy_warning],
    )


if __name__ == "__main__":
    demo.launch(share=True)