| import gradio as gr |
| import json |
| import os |
| from datetime import datetime |
|
|
|
|
def sanitize_username(username: str) -> str:
    """Reduce a username to characters that are safe inside a file name.

    Surrounding whitespace is stripped, then every character that is not
    alphanumeric, an underscore, or a hyphen is dropped.

    Args:
        username: Raw text typed by the annotator; may be empty or None.

    Returns:
        The filtered name, or "" when the input is falsy or nothing survives.
    """
    if not username:
        return ""
    extra_allowed = {"_", "-"}
    kept_chars = [
        char
        for char in username.strip()
        if char.isalnum() or char in extra_allowed
    ]
    return "".join(kept_chars)
|
|
def get_user_session_file(username):
    """Return the path of the ratings JSON file for ``username``.

    The name is sanitized first, so the resulting file always sits directly
    inside ``SAVE_DIR`` and is called ``ratings_<sanitized name>.json``.
    """
    file_name = "ratings_" + sanitize_username(username) + ".json"
    return os.path.join(SAVE_DIR, file_name)
|
|
# Language being annotated; it selects the short code used to name the
# directory where ratings are saved.
language = "Bengali"

# NOTE(review): "ch" and "be" are not the ISO 639-1 codes for Chinese ("zh")
# and Bengali ("bn"). They are kept unchanged because the save directory name
# is built from this code, so renaming would orphan any existing ratings.
_LANGUAGE_CODES = {
    "Chinese": "ch",
    "Hindi": "hi",
    "Bengali": "be",
}

if language not in _LANGUAGE_CODES:
    # An explicit raise still fires under `python -O`, where an
    # `assert False` would be stripped and leave `language_code` undefined.
    raise ValueError(f"Unsupported language: {language!r}")
language_code = _LANGUAGE_CODES[language]
|
|
|
|
| |
# Translated records (English -> Bangla, going by the file name). Only the
# first 50 entries are used for annotation.
# NOTE(review): this path is fixed and does not depend on `language`.
TRANSLATION_PATH = "/home/mshahidul/readctrl/data/translated_data/translation_english2bangla_v1.json"
with open(TRANSLATION_PATH, encoding="utf-8") as f:
    translation_dataset = json.load(f)[:50]
|
|
| |
# Source records; only the first 50 are kept, mirroring the slice applied to
# the translated records.
SRC_PATH = "/home/mshahidul/readctrl/data/testing_data_gs/multiclinsum_gs_train_en.json"
with open(SRC_PATH, encoding="utf-8") as f:
    src_dataset = json.load(f)[:50]
|
|
| |
# Pair each source record with the translation at the same position. The
# shorter of the two lists decides how many samples exist.
# NOTE(review): pairing is positional; the "id" is taken from the translation
# record only and is not checked against the source record.
dataset = [
    {
        "src_fulltext": source_item["fulltext"],
        "translated_fulltext": translated_item["fulltext_translated"]["translated_medical_note"],
        "id": translated_item["id"],
    }
    for source_item, translated_item in zip(src_dataset, translation_dataset)
]
|
|
| |
# Directory that holds one ratings file per annotator, separated by language
# code. It is created at import time if it does not exist yet.
SAVE_DIR = os.path.join(
    "/home/mshahidul/readctrl/data/translated_data/rating_info",
    language_code,
)
os.makedirs(SAVE_DIR, exist_ok=True)


# Not read by any code visible in this file; per-user paths are produced by
# get_user_session_file() instead.
SESSION_FILE = None
|
|
# Descriptions of the three rating levels, worst to best.
_RATING_LABELS = (
    "Poor (Incorrect/Nonsense)",
    "Fair (Understandable but awkward)",
    "Good (Accurate/Perfect)",
)

# (display label, stored value) pairs offered in the rating dropdown; the
# stored value is the 1-based score.
RATING_OPTIONS = [
    (f"{score} - {label}", score)
    for score, label in enumerate(_RATING_LABELS, start=1)
]
|
|
# Extra CSS handed to gr.Blocks: one rule per line, wrapped in a leading and a
# trailing newline.
custom_css = "\n".join(
    [
        "",
        ".small-header { font-size: 0.85rem !important; font-weight: 600; margin-bottom: -10px; color: #555; }",
        ".nav-row { background-color: #f9f9f9; padding: 10px; border-radius: 8px; margin-bottom: 15px; }",
        "",
    ]
)
|
|
def save_rating_to_json(data_item, username):
    """Insert or replace one rating record in the user's session file.

    The session file may hold either a bare list of records or a dict with a
    "records" list. It is always rewritten in the dict form with "username",
    "updated_at" and "records" keys.

    Args:
        data_item: Record dict to store. Its "index" value is the key used to
            decide whether an existing record is replaced.
        username: Annotator name; sanitized before it is used in the path.

    Raises:
        OSError: If the session file cannot be read or written.
    """
    session_file = get_user_session_file(username)

    stored = []
    if os.path.exists(session_file):
        with open(session_file, "r", encoding="utf-8") as f:
            try:
                stored = json.load(f)
            except json.JSONDecodeError:
                # Unreadable content is treated as "no records yet".
                stored = []

    if isinstance(stored, dict):
        records = stored.get("records", [])
    else:
        records = stored
    if not isinstance(records, list):
        # Same fallback as load_user_records(): anything that is not a list
        # (for example "records": null) counts as empty instead of crashing.
        records = []

    # Replace the record that has the same index, or append a new one.
    new_index = data_item.get("index")
    for position, existing in enumerate(records):
        if isinstance(existing, dict) and existing.get("index") == new_index:
            records[position] = data_item
            break
    else:
        records.append(data_item)

    payload = {
        "username": sanitize_username(username) or username,
        "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "records": records,
    }

    # Write to a sibling temp file and swap it in, so an interrupted write
    # cannot truncate the only copy of the ratings collected so far.
    temp_file = f"{session_file}.tmp"
    with open(temp_file, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=4)
    os.replace(temp_file, session_file)
|
|
|
|
def load_user_records(username):
    """Return the list of rating records saved for ``username``.

    Accepts both file layouts: a dict with a "records" list, or a bare list.
    A missing file, an unreadable file, or content that does not yield a list
    all result in an empty list.
    """
    session_file = get_user_session_file(username)
    if not os.path.exists(session_file):
        return []

    try:
        with open(session_file, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except Exception:
        # Best effort: any read or parse failure means "no records".
        return []

    records = loaded.get("records", []) if isinstance(loaded, dict) else loaded
    if isinstance(records, list):
        return records
    return []
|
|
def load_example(index):
    """Build the UI values for the sample at ``index``.

    ``index`` is clamped into ``[0, len(dataset) - 1]`` before use.

    Returns:
        A 7-tuple in the order the click handlers list their outputs:
        (source text, translated text, rating reset to None, 0-based index,
        progress label, progress percentage, 1-based sample number).
    """
    sample_count = len(dataset)
    position = max(0, min(index, sample_count - 1))
    sample = dataset[position]

    # Progress counts the samples that come before the one being shown.
    percent_done = (position / sample_count) * 100
    label = f"Sample {position + 1} of {sample_count} ({percent_done:.1f}%)"

    return (
        sample["src_fulltext"],
        sample["translated_fulltext"],
        None,
        position,
        label,
        percent_done,
        position + 1,
    )
|
|
def get_last_index_for_user(username):
    """Return the lowest dataset index the user has not rated yet.

    Despite its name, this is the resume position rather than the last rated
    index. Returns 0 for an empty username and ``len(dataset)`` when every
    sample already has a record.
    """
    if not username:
        return 0

    # Only records that are dicts carrying an integer "index" count as done.
    rated = {
        record["index"]
        for record in load_user_records(username)
        if isinstance(record, dict) and isinstance(record.get("index"), int)
    }

    sample_count = len(dataset)
    return next(
        (position for position in range(sample_count) if position not in rated),
        sample_count,
    )
|
|
|
|
def load_example_or_done(index):
    """Return UI values for ``index``, or a completion view past the end.

    For ``index >= len(dataset)`` the result has the same 7-tuple shape as
    load_example(): both text fields show the completion banner, the index
    and sample number equal the sample count, and progress is 100.
    Otherwise the call is delegated to load_example().
    """
    sample_count = len(dataset)
    if index >= sample_count:
        banner = "✅ ALL DONE"
        summary = f"✅ Completed all {sample_count} samples"
        return (banner, banner, None, sample_count, summary, 100, sample_count)
    return load_example(index)
|
|
def next_item(index, rating, src_txt, eng_txt, username):
    """Save the rating for the current sample and move to the next unrated one.

    Args:
        index: 0-based index of the sample currently shown.
        rating: Value chosen in the rating dropdown, or None if unset.
        src_txt: Source text currently displayed.
        eng_txt: Translated text currently displayed.
        username: Raw username typed by the annotator.

    Returns:
        The 7-tuple of UI values from load_example_or_done() for the first
        sample this user has not rated yet.

    Raises:
        gr.Error: If there is no sample at ``index``, no rating is selected,
            or the username is empty or has no usable characters.
    """
    # In the "all done" view the index equals len(dataset) and the text boxes
    # hold the completion banner; saving then would store a bogus record.
    if not 0 <= index < len(dataset):
        raise gr.Error("There is no sample at this position to rate.")
    if rating is None:
        raise gr.Error("Please select a rating before proceeding!")
    if not username:
        raise gr.Error("Please enter your username!")
    safe_user = sanitize_username(username)
    if not safe_user:
        raise gr.Error("Username must contain letters/numbers (optionally _ or -).")

    record = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "index": index,
        "src_text": src_txt,
        "translated_text": eng_txt,
        "rating": rating,
        "username": safe_user,
    }
    save_rating_to_json(record, safe_user)
    gr.Info(f"Saved record {index + 1} for {safe_user}.")

    # Continue with the first unrated sample rather than index + 1, so that
    # re-rating an earlier sample returns the user to where they left off.
    next_idx = get_last_index_for_user(safe_user)
    return load_example_or_done(next_idx)
|
|
def jump_to_instance(target_index):
    """Show the sample whose 1-based number is ``target_index``.

    Numbers outside ``1..len(dataset)`` are clamped to the nearest valid
    sample, so jumping past the end shows the last sample instead of the
    "completed" view.

    Args:
        target_index: 1-based sample number from the jump field, or None.

    Returns:
        The 7-tuple of UI values from load_example_or_done().

    Raises:
        gr.Error: If no number was entered.
    """
    # NOTE(review): an emptied Number field is expected to arrive as None;
    # without this check the subtraction below would raise TypeError.
    if target_index is None:
        raise gr.Error("Please enter a sample number to jump to.")
    sample_number = max(1, min(int(target_index), len(dataset)))
    return load_example_or_done(sample_number - 1)
|
|
# Build the annotation UI and wire its buttons to the handlers above.
# NOTE(review): the stored copy of this file had lost its indentation, so the
# Row/Column nesting below is a reconstruction — confirm against the running
# layout.
with gr.Blocks(css=custom_css) as demo:
    username_box = gr.Textbox(label="Enter your username", value="", interactive=True)
    login_btn = gr.Button("Start/Resume Session", variant="primary")
    # 0-based index of the sample currently shown.
    current_index = gr.State(0)
    total_count = len(dataset)
    gr.Markdown(f"### Translation Quality Annotation")
    with gr.Row(elem_classes="nav-row"):
        with gr.Column(scale=2):
            progress_bar = gr.Slider(label="Progress", minimum=0, maximum=100, value=0, interactive=False)
            progress_display = gr.Markdown(f"Sample 1 of {total_count} (0.0%)")
        with gr.Column(scale=1):
            jump_input = gr.Number(label="Jump to Sample #", value=1, precision=0)
            jump_btn = gr.Button("Go", size="sm")
    with gr.Row():
        with gr.Column():
            gr.Markdown("##### Source Fulltext (English)")
            # The first sample is shown before anyone logs in.
            src_display = gr.Textbox(value=dataset[0]["src_fulltext"], interactive=False, lines=12, show_label=False)
        with gr.Column():
            gr.Markdown("##### Fulltext Translation (Bangla)")
            eng_display = gr.Textbox(value=dataset[0]["translated_fulltext"], interactive=False, lines=12, show_label=False)
    rating_dropdown = gr.Dropdown(choices=RATING_OPTIONS, label="Select Rating")
    with gr.Row():
        prev_btn = gr.Button("⬅ Previous (Review)", variant="secondary")
        submit_btn = gr.Button("Save & Next ➡", variant="primary")


    def login_user(username):
        """Validate the username and load the user's first unrated sample."""
        safe_user = sanitize_username(username)
        if not safe_user:
            raise gr.Error("Please enter a valid username (letters/numbers, _ or -).")
        idx = get_last_index_for_user(safe_user)
        return load_example_or_done(idx)


    # Every handler returns the same 7-tuple, so all four share one output
    # order: source, translation, rating, index, progress text, progress bar,
    # jump field.
    login_btn.click(
        fn=login_user,
        inputs=[username_box],
        outputs=[src_display, eng_display, rating_dropdown, current_index, progress_display, progress_bar, jump_input]
    )


    submit_btn.click(
        fn=next_item,
        inputs=[current_index, rating_dropdown, src_display, eng_display, username_box],
        outputs=[src_display, eng_display, rating_dropdown, current_index, progress_display, progress_bar, jump_input]
    )


    # Step back one sample; load_example() clamps a negative index to 0.
    prev_btn.click(
        fn=lambda idx: load_example_or_done(idx - 1),
        inputs=[current_index],
        outputs=[src_display, eng_display, rating_dropdown, current_index, progress_display, progress_bar, jump_input]
    )


    # The jump field is 1-based; jump_to_instance() converts it to 0-based.
    jump_btn.click(
        fn=jump_to_instance,
        inputs=[jump_input],
        outputs=[src_display, eng_display, rating_dropdown, current_index, progress_display, progress_bar, jump_input]
    )


if __name__ == "__main__":
    # NOTE(review): share=True is understood to request a publicly reachable
    # Gradio link — confirm that exposure is intended for this data.
    demo.launch(share=True)