import os import json import hashlib import random import threading import time from dataclasses import dataclass from typing import List, Dict, Any import gradio as gr from PIL import Image from huggingface_hub import HfApi, CommitOperationAdd # ---------------------- # Configuration # ---------------------- # --- HF Repo --- HF_RESULTS_REPO = os.getenv("HF_RESULTS_REPO") HF_RESULTS_REPO_TYPE = "dataset" HF_TOKEN = os.getenv("HF_TOKEN") _hf_api = HfApi(token=HF_TOKEN) # --- Main settings --- TARGET_PER_PERSON = 30 CONTACT_EMAIL = "ffallah@asu.edu" # --- Paths --- GT_MASKED_DIR = "data/gt_b" # Image 1 GT_UNMASKED_DIR = "data/adc_b" # Image 2 SR_DIR = "data/sr_b" # Image 3 ORIGINAL_DIR = "data/lr_b" # Image 4 IMAGE_5_DIR = "data/see_b" # Image 5 # --- Results --- RESULTS_DIR = "results" PROGRESS_PATH = os.path.join(RESULTS_DIR, "progress.json") ALL_RESULTS_JSONL = os.path.join(RESULTS_DIR, "all_results.jsonl") SAVE_PII = True WRITE_LOCK = threading.Lock() STRICT_ENFORCEMENT = False # ---------------------- # Data model # ---------------------- @dataclass class Sample: sample_id: str masked_gt_path: str # Image 1 unmasked_gt_path: str # Image 2 sr_path: str # Image 3 original_path: str # Image 4 image_5_path: str # Image 5 # ---------------------- # Helpers # ---------------------- # def ensure_sample_objects(samples_input): # """ # Accepts either: # - list[Sample] (already objects), or # - list[dict] (serialized Sample.__dict__) # Returns list[Sample]. # """ # if not samples_input: # return [] # if isinstance(samples_input, list): # if len(samples_input) == 0: # return [] # first = samples_input[0] # if isinstance(first, dict): # try: # return [Sample(**s) for s in samples_input] # except Exception: # # fall through to returning empty to avoid crashes # return [] # elif isinstance(first, Sample): # return samples_input # return [] def user_target_count(samples: List[Sample]) -> int: return min(len(samples), TARGET_PER_PERSON) def user_left_count(user_seen: List[str], samples: List[Sample]) -> int: target = user_target_count(samples) seen = set(user_seen or []) allowed_ids = {s.sample_id for s in samples} seen_in_allowed = len([sid for sid in seen if sid in allowed_ids]) return max(0, target - seen_in_allowed) def _ensure_private_repo(repo_id: str): try: _hf_api.repo_info(repo_id, repo_type=HF_RESULTS_REPO_TYPE) except Exception: _hf_api.create_repo(repo_id=repo_id, repo_type=HF_RESULTS_REPO_TYPE, private=True) def push_results_to_private_repo(uid: str): if not HF_TOKEN or not HF_RESULTS_REPO: return try: os.makedirs(RESULTS_DIR, exist_ok=True) user_file = os.path.join(RESULTS_DIR, f"{uid}.jsonl") ops = [ CommitOperationAdd( path_in_repo="results/all_results.jsonl", path_or_fileobj=ALL_RESULTS_JSONL ), CommitOperationAdd( path_in_repo=f"results/users/{uid}.jsonl", path_or_fileobj=user_file ), CommitOperationAdd( path_in_repo="results/progress.json", path_or_fileobj=PROGRESS_PATH ), ] _hf_api.create_commit( repo_id=HF_RESULTS_REPO, repo_type=HF_RESULTS_REPO_TYPE, operations=ops, commit_message="Update RTS eval results" ) except Exception as e: print("[WARN] push_results_to_private_repo failed:", e) def ensure_paths(): os.makedirs(RESULTS_DIR, exist_ok=True) for pth, name in [ (GT_MASKED_DIR, "GT_MASKED_DIR"), (GT_UNMASKED_DIR, "GT_UNMASKED_DIR"), (SR_DIR, "SR_DIR"), (ORIGINAL_DIR, "ORIGINAL_DIR"), (IMAGE_5_DIR, "IMAGE_5_DIR"), ]: if not os.path.isdir(pth): print(f"Warning: Directory '{pth}' for {name} not found.") def load_image(path: str) -> Image.Image: if not path or not os.path.exists(path): # return a simple placeholder image so UI doesn't crash return Image.new("RGB", (256, 256), color="gray") try: return Image.open(path).convert("RGB") except Exception: return Image.new("RGB", (256, 256), color="gray") def load_dataset( gt_masked_dir: str, gt_unmasked_dir: str, sr_dir: str, original_dir: str, image_5_dir: str, ) -> List[Sample]: """ Build samples only from the 5 folders. Each folder should have the same filenames. Example layout: data/gt_b/xxx.png data/adc_b/xxx.png data/sr_b/xxx.png data/lr_b/xxx.png data/see_b/xxx.png """ def list_images(dir_path: str) -> set: if not os.path.isdir(dir_path): print(f"Warning: directory not found: {dir_path}") return set() files = [] for f in os.listdir(dir_path): f_lower = f.lower() if f_lower.endswith((".png", ".jpg", ".jpeg", ".tif", ".tiff", ".bmp")): files.append(f) return set(files) masked_files = list_images(gt_masked_dir) unmasked_files = list_images(gt_unmasked_dir) sr_files = list_images(sr_dir) orig_files = list_images(original_dir) img5_files = list_images(image_5_dir) # Common filenames present in ALL 5 folders common_files = masked_files & unmasked_files & sr_files & orig_files & img5_files if not common_files: print("No common image files found in all 5 folders.") return [] # Optional: simple debug info print(f"Found {len(common_files)} common images.") samples: List[Sample] = [] for base_filename in sorted(common_files): sample_id = os.path.splitext(base_filename)[0] paths = { "masked": os.path.join(gt_masked_dir, base_filename), "unmasked": os.path.join(gt_unmasked_dir, base_filename), "sr": os.path.join(sr_dir, base_filename), "original": os.path.join(original_dir, base_filename), "img5": os.path.join(image_5_dir, base_filename), } # If STRICT_ENFORCEMENT is True, skip if any file missing if STRICT_ENFORCEMENT: if not all(os.path.exists(p) for p in paths.values()): missing = [k for k, v in paths.items() if not os.path.exists(v)] print(f"Skipping {base_filename}: missing in folders {missing}") continue samples.append( Sample( sample_id=sample_id, masked_gt_path=paths["masked"], unmasked_gt_path=paths["unmasked"], sr_path=paths["sr"], original_path=paths["original"], image_5_path=paths["img5"], ) ) return samples # ---------------------- # Progress & results I/O # ---------------------- def hash_user_id(name: str, email: str) -> str: norm = (name or "").strip().lower() + "|" + (email or "").strip().lower() return hashlib.sha256(norm.encode("utf-8")).hexdigest()[:16] def load_progress() -> Dict[str, Dict[str, Any]]: if not os.path.exists(PROGRESS_PATH): return {} try: with open(PROGRESS_PATH, "r", encoding="utf-8") as f: return json.load(f) except Exception: return {} def save_progress(progress: Dict[str, Dict[str, Any]]): with WRITE_LOCK: with open(PROGRESS_PATH, "w", encoding="utf-8") as f: json.dump(progress, f, ensure_ascii=False, indent=2) def append_jsonl(path: str, record: Dict[str, Any]): line = json.dumps(record, ensure_ascii=False) with WRITE_LOCK: with open(path, "a", encoding="utf-8") as f: f.write(line + "\n") # ---------------------- # LOGIC FOR CONVERTING SLIDERS TO RANK # ---------------------- def convert_scores_to_rank(s1, s2, s3, s4, s5) -> Dict[str, int]: scores = [ ("image_1", s1), ("image_2", s2), ("image_3", s3), ("image_4", s4), ("image_5", s5) ] scores.sort(key=lambda x: x[1], reverse=True) ranks = {} current_rank = 1 for img_key, score in scores: ranks[img_key] = current_rank current_rank += 1 return ranks # ---------------------- # App logic # ---------------------- def pick_next_index(user_seen: List[str], samples: List[Sample]) -> int: # FIX: define seen_set and use samples directly seen_set = set(user_seen or []) remaining = [i for i, s in enumerate(samples) if s.sample_id not in seen_set] if not remaining: return -1 return random.choice(remaining) def start_or_resume(name: str, email: str): if not name or not email: raise gr.Error("Please enter your name and email to begin.") ensure_paths() samples = load_dataset(GT_MASKED_DIR, GT_UNMASKED_DIR, SR_DIR, ORIGINAL_DIR, IMAGE_5_DIR) if not samples: raise gr.Error("No images found. Please check dataset configuration.") uid = hash_user_id(name, email) progress = load_progress() if uid not in progress: progress[uid] = {"seen": []} save_progress(progress) user_seen: List[str] = progress[uid].get("seen", []) left = user_left_count(user_seen, samples) # placeholder image to avoid Gradio trying to load None placeholder_img = Image.new("RGB", (256, 256), color="gray") # If the user has finished their target if left == 0 and len(user_seen) >= user_target_count(samples): status = ( f"Welcome back, {name}. You’ve completed all {user_target_count(samples)} images. 🎉\n" f"Your personal results file: {os.path.join(RESULTS_DIR, f'{uid}.jsonl')}" ) return ( uid, samples, user_seen, -1, placeholder_img, placeholder_img, placeholder_img, placeholder_img, placeholder_img, status, os.path.join(RESULTS_DIR, f"{uid}.jsonl"), gr.update(visible=False), gr.update(visible=True), gr.update(visible=True), ) idx = pick_next_index(user_seen, samples) if idx == -1: return ( uid, samples, user_seen, -1, placeholder_img, placeholder_img, placeholder_img, placeholder_img, placeholder_img, "No more new images available.", "", gr.update(visible=False), gr.update(visible=True), gr.update(visible=True) ) sample = samples[idx] status = ( f"Welcome, {name}. Personal progress — images left: {left} of {user_target_count(samples)}.\n" f"Current sample: {sample.sample_id}" ) os.makedirs(RESULTS_DIR, exist_ok=True) user_file_path = os.path.join(RESULTS_DIR, f"{uid}.jsonl") return ( uid, samples, user_seen, idx, load_image(sample.masked_gt_path), load_image(sample.unmasked_gt_path), load_image(sample.sr_path), load_image(sample.original_path), load_image(sample.image_5_path), status, user_file_path, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), ) def _save_record_and_progress( name: str, email: str, uid: str, samples: List[Sample], user_seen: List[str], idx: int, score_1: float, score_2: float, score_3: float, score_4: float, score_5: float, q1_notes: str, ): if not name or not email: raise gr.Error("Please enter your name and email.") # FIX: use samples directly if idx is None or idx < 0 or idx >= len(samples): return load_progress() rank_dict = convert_scores_to_rank(score_1, score_2, score_3, score_4, score_5) sample = samples[idx] progress = load_progress() progress.setdefault(uid, {"seen": []}) seen = set(progress[uid].get("seen", [])) if sample.sample_id in seen: return progress record: Dict[str, Any] = { "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "user_id": uid, "name": name if SAVE_PII else None, "email": email if SAVE_PII else None, "sample_id": sample.sample_id, "raw_scores": { "image_1": score_1, "image_2": score_2, "image_3": score_3, "image_4": score_4, "image_5": score_5, }, "responses": { "notes": q1_notes or "", "image_ranking": rank_dict, }, } os.makedirs(RESULTS_DIR, exist_ok=True) append_jsonl(os.path.join(RESULTS_DIR, f"{uid}.jsonl"), record) append_jsonl(ALL_RESULTS_JSONL, record) # start background push but don't let failures crash the app try: thread = threading.Thread(target=push_results_to_private_repo, args=(uid,)) thread.daemon = True thread.start() except Exception: pass seen.add(sample.sample_id) progress[uid]["seen"] = sorted(list(seen)) save_progress(progress) return progress # ---------------------- # Buttons # ---------------------- def submit_finish( name: str, email: str, uid: str, samples: List[Sample], user_seen: List[str], idx: int, s1: float, s2: float, s3: float, s4: float, s5: float, q1_notes: str ): try: _save_record_and_progress( name, email, uid, samples, user_seen, idx, s1, s2, s3, s4, s5, q1_notes ) except gr.Error: return ( user_seen, idx, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), ) return ( user_seen, idx, gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=""), gr.update(value="Finished!"), gr.update(value=5), gr.update(value=5), gr.update(value=5), gr.update(value=5), gr.update(value=5), gr.update(value=None), ) def pause_exit(user_seen, samples): return user_seen, samples def submit_next_image( name: str, email: str, uid: str, samples: List[Sample], user_seen: List[Sample], idx: int, s1: float, s2: float, s3: float, s4: float, s5: float, q1_notes: str ): try: progress = _save_record_and_progress( name, email, uid, samples, user_seen, idx, s1, s2, s3, s4, s5, q1_notes ) except gr.Error as e: raise e seen_list = progress.get(uid, {}).get("seen", []) left_after = user_left_count(seen_list, samples) target = user_target_count(samples) # placeholder image to avoid Gradio trying to load None placeholder_img = Image.new("RGB", (256, 256), color="gray") # If user reached the target, return placeholders for images and let the then() chain show thanks if left_after == 0: status = ( f"Saved! You’ve completed all {target} images. 🎉 " f"Click **Exit** to close this session." ) return ( seen_list, -1, placeholder_img, placeholder_img, placeholder_img, placeholder_img, placeholder_img, gr.update(value=status), gr.update(value=""), 5, 5, 5, 5, 5, ) idx_next = pick_next_index(seen_list, samples) if idx_next == -1: # no more images but target not met (rare). return placeholders too. return ( seen_list, -1, placeholder_img, placeholder_img, placeholder_img, placeholder_img, placeholder_img, "No more images.", "", 5, 5, 5, 5, 5, ) # FIX: define sample_next correctly sample_next = samples[idx_next] return ( seen_list, idx_next, load_image(sample_next.masked_gt_path), load_image(sample_next.unmasked_gt_path), load_image(sample_next.sr_path), load_image(sample_next.original_path), load_image(sample_next.image_5_path), gr.update(value=""), gr.update(value=""), 5, 5, 5, 5, 5, ) def to_thanks(name: str, user_seen: List[str], samples: List[Sample]): left = user_left_count(user_seen, samples) target = user_target_count(samples) if left > 0: msg = ( f"### ⏸️ Session Paused!\n\n" f"### ✅ Thanks, {name}! Your progress has been saved.\n\n" f"We’re grateful for your time and expertise. Our suggested target is " f"{TARGET_PER_PERSON} images per reviewer.\n\n" f"You have **{left}** images left.\n\n" f"You can close this tab and return whenever you like—just use the same Name and Email to **continue where you left off**.\n\n" f"If you have questions, issues, or suggestions, please email **{CONTACT_EMAIL}**.\n\n" f"Click **Start Again** to evaluate another image." ) else: msg = ( f"### ✅ All Done, {name}!\n\n" f"You’ve completed the target of **{target}** images. Your responses are securely saved.\n\n" f"We’re extremely grateful for your time and expertise. You are welcome to continue with more images if you wish, or you can finish here.\n\n" f"If you have questions, issues, or suggestions, please email **{CONTACT_EMAIL}**.\n\n" ) return gr.update(visible=False), gr.update(visible=True), gr.update(value=msg) def hide_thanks(): return gr.update(visible=False) def maybe_show_thanks(name: str, seen: List[str], samples: List[Sample]): if len(set(seen or [])) >= TARGET_PER_PERSON: return to_thanks(name, seen, samples) return gr.update(visible=True), gr.update(visible=False), gr.update() def reset_to_start(): return ( gr.update(value=""), # Clear Name gr.update(value=""), # Clear Email gr.update(visible=True), # Show Start Group gr.update(visible=True), # Show Intro gr.update(visible=False), # Hide Eval gr.update(visible=False), # Hide Thanks ) # ---------------------- # UI # ---------------------- with gr.Blocks(title="RTS Human Evaluation", theme=gr.themes.Soft()) as demo: intro_md = gr.Markdown( f""" # Retrogressive Thaw Slump (RTS) Human Evaluation ### 👋 Welcome, and thanks for lending your expertise! We’re inviting domain experts to help evaluate satellite image patches for RTS. --- ### 📋 Instructions * **Suggested target:** ~{TARGET_PER_PERSON} images per reviewer. * **The Task:** For each set, you will see 5 variations of the same satellite image. * **Rating:** Rate each image from **1 (Poor)** to **10 (Excellent)** based on how clearly the RTS feature (indicated by the **Red Box**) is depicted. ### ⏸️ Saving & Resuming * **Automatic Saving:** Your progress is saved automatically after every "Submit". * **Take a Break:** You can close this tab at any time. * **How to Resume:** Simply return here and enter the **exact same Name and Email**. The system will pick up exactly where you left off. --- **Questions or issues?** Email **{CONTACT_EMAIL}** — we appreciate your feedback and suggestions. **Ready?** Enter your details below to begin. """ ) # Hidden states state_uid = gr.State("") state_samples = gr.State([]) state_seen = gr.State([]) state_idx = gr.State(-1) with gr.Group() as start_group: with gr.Row(): name = gr.Textbox(label="Full name", placeholder="Jane Doe", autofocus=True) email = gr.Textbox(label="Email address", placeholder="jane@example.com") start_btn = gr.Button("Start / Resume", variant="primary") status = gr.Markdown("\n") eval_panel = gr.Group(visible=False) with eval_panel: gr.Markdown( """ Focus your attention on the area inside the **Red Box**. This marks the potential location of the RTS. Compare the five images below. Rate how clearly and realistically each image depicts the **RTS** feature. **Rating Scale (1 - 10):** * **10 (Excellent):** The RTS feature is sharp, distinct, and clearly visible. * **1 (Poor):** The RTS feature is blurry, distorted, or impossible to distinguish. """ ) with gr.Row(): with gr.Column(scale=1, min_width=150): gr.Markdown("
Image 1
") image_1 = gr.Image(show_label=False, interactive=False, height=256, show_download_button=False) score_1 = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Score (1-10)") with gr.Column(scale=1, min_width=150): gr.Markdown("
Image 2
") image_2 = gr.Image(show_label=False, interactive=False, height=256, show_download_button=False) score_2 = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Score (1-10)") with gr.Column(scale=1, min_width=150): gr.Markdown("
Image 3
") image_3 = gr.Image(show_label=False, interactive=False, height=256, show_download_button=False) score_3 = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Score (1-10)") with gr.Column(scale=1, min_width=150): gr.Markdown("
Image 4
") image_4 = gr.Image(show_label=False, interactive=False, height=256, show_download_button=False) score_4 = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Score (1-10)") with gr.Column(scale=1, min_width=150): gr.Markdown("
Image 5
") image_5 = gr.Image(show_label=False, interactive=False, height=256, show_download_button=False) score_5 = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Score (1-10)") notes_q1 = gr.Textbox( label="Notes (Optional)", lines=2, placeholder="If there are multiple RTS or ambiguities, please note here." ) with gr.Row(): submit_next_btn = gr.Button("Submit & Next Image", variant="primary") pause_exit_btn = gr.Button("Exit", variant="secondary") your_jsonl_path = gr.State() with gr.Group(visible=False) as thanks_group: thanks_md = gr.Markdown("### ✅ Thanks! Your responses were saved.\n\nClick **Start Again** to evaluate another image.") restart_btn = gr.Button("Start Again", variant="primary") # --- Wiring --- start_event = start_btn.click( start_or_resume, inputs=[name, email], outputs=[ state_uid, state_samples, state_seen, state_idx, image_1, image_2, image_3, image_4, image_5, status, your_jsonl_path, eval_panel, intro_md, start_group ], ) start_event.then(hide_thanks, inputs=None, outputs=[thanks_group]) # 1. When Pause is clicked, just pass the state through pause_event = pause_exit_btn.click( pause_exit, inputs=[state_seen, state_samples], outputs=[state_seen, state_samples], ) # 2. Then show the "Thanks/Resume" screen with the 'how many left' message pause_event.then( to_thanks, inputs=[name, state_seen, state_samples], outputs=[eval_panel, thanks_group, thanks_md], ) nextimg_event = submit_next_btn.click( submit_next_image, inputs=[name, email, state_uid, state_samples, state_seen, state_idx, score_1, score_2, score_3, score_4, score_5, notes_q1], outputs=[state_seen, state_idx, image_1, image_2, image_3, image_4, image_5, status, notes_q1, score_1, score_2, score_3, score_4, score_5], ) nextimg_event.then( maybe_show_thanks, inputs=[name, state_seen, state_samples], outputs=[eval_panel, thanks_group, thanks_md], ) restart_event = restart_btn.click( reset_to_start, inputs=[], outputs=[ name, email, start_group, intro_md, eval_panel, thanks_group ], ) if __name__ == "__main__": if HF_RESULTS_REPO: from huggingface_hub import snapshot_download try: snapshot_download( repo_id=HF_RESULTS_REPO, repo_type="dataset", local_dir=".", allow_patterns=["data/*", "results/*"], token=HF_TOKEN ) except Exception as e: print(f"Error reading from HF: {e}") ensure_paths() _ = load_dataset(GT_MASKED_DIR, GT_UNMASKED_DIR, SR_DIR, ORIGINAL_DIR, IMAGE_5_DIR) print("✅ Launching app.") demo.queue() demo.launch()